"""Authentication helpers: JWT access tokens with an API-token fallback."""

import hashlib
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlmodel import Session, select

from app.config import settings

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")

ALGORITHM = "HS256"


def _utcnow_naive() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Equivalent to the deprecated ``datetime.utcnow()``; the result stays
    naive so it compares the same way the original code did.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _unauthorized(detail: Optional[str] = None) -> HTTPException:
    """Build a 401 response that carries the ``WWW-Authenticate`` challenge.

    Args:
        detail: Optional message for the response body. When omitted the
            framework's default text for the status code is used.

    Returns:
        The exception instance, ready to be raised by the caller.
    """
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def create_access_token(subject: str) -> str:
    """Create a signed JWT for *subject*.

    The token carries ``sub`` (the subject) and ``exp`` (now plus
    ``settings.ACCESS_TOKEN_EXPIRE_MINUTES``) and is signed with
    ``settings.SECRET_KEY`` using ``ALGORITHM``.

    Args:
        subject: Value stored in the ``sub`` claim.

    Returns:
        The encoded JWT string.
    """
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
    )
    return jwt.encode(
        {"sub": subject, "exp": expire},
        settings.SECRET_KEY,
        algorithm=ALGORITHM,
    )


def hash_token(token: str) -> str:
    """Return the hex-encoded SHA-256 digest of *token*."""
    return hashlib.sha256(token.encode()).hexdigest()


def get_current_user(token: str = Depends(oauth2_scheme)) -> str:
    """Resolve the caller's identity from a bearer token.

    The token is first treated as a JWT. If it does not decode, its SHA-256
    hash is looked up among the active ``APIToken`` rows instead.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``.

    Returns:
        The JWT ``sub`` claim, or ``"api:<name>"`` for a matching API token.

    Raises:
        HTTPException: 401 when the JWT has no ``sub`` claim, when the
            matching API token has expired, or when nothing matches.
    """
    # Keep the try body to the decode call only, so that nothing else is
    # accidentally covered by the JWTError handler.
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        payload = None

    if payload is not None:
        username = payload.get("sub")
        if username is None:
            raise _unauthorized()
        return username

    # Fallback: check API token.
    # NOTE(review): these imports are function-local in the original,
    # presumably to avoid a circular import -- confirm before hoisting.
    from app.db import get_session
    from app.models.models import APIToken

    token_hash = hash_token(token)

    # Hold a reference to the generator and close it explicitly so its
    # cleanup runs deterministically instead of depending on when the
    # temporary generator object happens to be garbage-collected.
    session_gen = get_session()
    try:
        with next(session_gen) as session:
            api_token = session.exec(
                select(APIToken).where(
                    APIToken.token_hash == token_hash,
                    # Builds a SQL expression; must stay `==`, not `is`.
                    APIToken.is_active == True,  # noqa: E712
                )
            ).first()

            if not api_token:
                raise _unauthorized()

            if api_token.expires_at and api_token.expires_at < _utcnow_naive():
                raise _unauthorized("Token expired")

            return f"api:{api_token.name}"
    finally:
        session_gen.close()