import hashlib
import re
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Cookie, Depends, Header, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlmodel import Session, select

from app.config import settings

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
ALGORITHM = "HS256"


def create_access_token(subject: str) -> str:
    """Return a signed JWT whose ``sub`` claim is *subject*.

    The ``exp`` claim is set ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes
    from now; the token is signed with ``settings.SECRET_KEY`` using
    ``ALGORITHM``.
    """
    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12.
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
    )
    return jwt.encode(
        {"sub": subject, "exp": expire},
        settings.SECRET_KEY,
        algorithm=ALGORITHM,
    )


def hash_token(token: str) -> str:
    """Return the hex-encoded SHA-256 digest of *token*."""
    return hashlib.sha256(token.encode()).hexdigest()


def _is_past(moment: datetime) -> bool:
    """Return True when *moment* is strictly earlier than the current UTC time.

    Naive values are compared against naive UTC "now"; aware values are
    compared against aware UTC "now", so mixing the two never raises
    ``TypeError``.
    """
    now = datetime.now(timezone.utc)
    if moment.tzinfo is None:
        now = now.replace(tzinfo=None)
    return moment < now


def _validate_token(token: str) -> str:
    """Validate *token* and return the authenticated subject, or raise 401.

    The token is first decoded as a JWT. If decoding succeeds, its ``sub``
    claim is returned (401 when the claim is absent). If decoding fails, the
    SHA-256 hash of the token is looked up among active ``APIToken`` rows and
    ``"api:<name>"`` is returned for a match that has not expired.

    Raises:
        HTTPException: 401 when the JWT has no ``sub``, when the matching API
            token has expired (detail ``"Token expired"``), or when nothing
            matches.
    """
    # Only the decode call can raise JWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        payload = None

    if payload is not None:
        username: Optional[str] = payload.get("sub")
        if username is None:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
        return username

    # Fallback: check API token.
    # NOTE(review): imported lazily, presumably to avoid a circular import
    # between this module and app.db / app.models -- confirm.
    from app.db import get_session
    from app.models.models import APIToken

    token_hash = hash_token(token)

    # get_session() is consumed with next(), i.e. as a generator-style
    # dependency. Keep a reference to the generator and close it explicitly so
    # its cleanup runs after the query, not whenever the interpreter happens
    # to finalise an abandoned temporary.
    # NOTE(review): assumes get_session is a generator function (has .close()).
    session_gen = get_session()
    try:
        with next(session_gen) as session:
            api_token = session.exec(
                select(APIToken).where(
                    APIToken.token_hash == token_hash,
                    # "== True" builds the SQL expression; "is True" would not.
                    APIToken.is_active == True,  # noqa: E712
                )
            ).first()
            if api_token:
                if api_token.expires_at and _is_past(api_token.expires_at):
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="Token expired",
                    )
                return f"api:{api_token.name}"
    finally:
        session_gen.close()

    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)


def get_current_user_cookie_or_bearer(
    authorization: Optional[str] = Header(default=None),
    ws_token: Optional[str] = Cookie(default=None),
) -> str:
    """Accepts httpOnly cookie (SPA) or Bearer token (API clients / n8n).

    An ``Authorization`` header starting with ``bearer `` (case-insensitive)
    takes precedence; otherwise the ``ws_token`` cookie is used.

    Returns:
        The subject returned by ``_validate_token``.

    Raises:
        HTTPException: 401 when no non-empty token is supplied, or when
            ``_validate_token`` rejects it.
    """
    token: Optional[str] = None
    if authorization and authorization.lower().startswith("bearer "):
        # Everything after the first space is the credential.
        token = authorization.split(" ", 1)[1].strip()
    elif ws_token:
        token = ws_token

    if not token:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
    return _validate_token(token)


def get_current_user(
    authorization: Optional[str] = Header(default=None),
    ws_token: Optional[str] = Cookie(default=None),
) -> str:
    """SPA cookie or Bearer token. Single dependency for all v1 endpoints.

    Delegates to ``get_current_user_cookie_or_bearer`` with the same
    arguments and returns its result.
    """
    return get_current_user_cookie_or_bearer(authorization, ws_token)