Files
WealthySmart/backend/app/auth.py
Carlos Escalante 140a75f706 Add cookie-based SPA auth and update container plumbing
Backend now exposes /api/auth/login + /api/auth/logout setting an
httpOnly ws_token cookie, and get_current_user accepts either the
cookie (SPA) or a Bearer token (n8n/CLI). AuthContext probes the
cookie via /api/v1/auth/me. Dockerfiles and compose files updated
for the new agent service deps and CopilotKit dev sidecar.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 22:02:02 -06:00

79 lines
2.6 KiB
Python

import hashlib
import re
from datetime import datetime, timedelta
from typing import Optional
from fastapi import Cookie, Depends, Header, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlmodel import Session, select
from app.config import settings
# OAuth2 password-bearer scheme whose token endpoint is the v1 login route.
# NOTE(review): no function visible in this file references this object —
# confirm it is used elsewhere before relying on or removing it.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
# Signing algorithm passed to both jwt.encode and jwt.decode in this module.
ALGORITHM = "HS256"
def create_access_token(subject: str) -> str:
    """Create a signed JWT access token for ``subject``.

    The token stores ``subject`` in its ``sub`` claim and an ``exp`` claim set
    ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes from now. It is signed
    with ``settings.SECRET_KEY`` using ``ALGORITHM``.

    Args:
        subject: Value to store in the ``sub`` claim.

    Returns:
        The encoded JWT as a string.
    """
    # Function-scope import: the module-level import only brings in
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    # ``datetime.utcnow()`` is deprecated since Python 3.12 and yields a naive
    # value; an aware UTC timestamp denotes the same instant unambiguously.
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
    )
    claims = {"sub": subject, "exp": expire}
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=ALGORITHM)
def hash_token(token: str) -> str:
    """Return the hex-encoded SHA-256 digest of ``token``.

    Args:
        token: Text to hash; it is encoded with ``str.encode()`` defaults.

    Returns:
        The 64-character lowercase hexadecimal digest.
    """
    hasher = hashlib.sha256()
    hasher.update(token.encode())
    return hasher.hexdigest()
def _validate_token(token: str) -> str:
    """Resolve a credential string to the identity it represents.

    The token is first decoded as a JWT signed with ``settings.SECRET_KEY``.
    If decoding fails with ``JWTError``, the SHA-256 hash of the token is
    looked up among active ``APIToken`` rows instead.

    Args:
        token: Raw credential taken from the request.

    Returns:
        The JWT ``sub`` claim, or ``"api:<name>"`` for a matching API token.

    Raises:
        HTTPException: 401 when the JWT carries no ``sub`` claim, when the
            matching API token has expired (detail ``"Token expired"``), or
            when the token is neither a valid JWT nor an active API token.
    """
    try:
        claims = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        # Not a decodable JWT: fall through to the API-token lookup below.
        pass
    else:
        subject: str = claims.get("sub")
        if subject is None:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
        return subject

    # Imported at call time rather than at module level.
    # NOTE(review): presumably to avoid a circular import — confirm.
    from app.db import get_session
    from app.models.models import APIToken

    digest = hash_token(token)
    with next(get_session()) as session:
        # ``== True`` is deliberate: the comparison is handed to ``where()``
        # as a query condition, not evaluated as a Python truth test.
        lookup = select(APIToken).where(
            APIToken.token_hash == digest,
            APIToken.is_active == True,
        )
        record = session.exec(lookup).first()
        if record:
            expiry = record.expires_at
            # NOTE(review): compared against a naive UTC "now"; this assumes
            # ``expires_at`` is stored as naive UTC — confirm.
            if expiry and expiry < datetime.utcnow():
                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired")
            return f"api:{record.name}"

    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
def get_current_user_cookie_or_bearer(
    authorization: Optional[str] = Header(default=None),
    ws_token: Optional[str] = Cookie(default=None),
) -> str:
    """Authenticate from a Bearer ``Authorization`` header or the ``ws_token`` cookie.

    A header whose value starts with ``bearer `` (case-insensitive) takes
    precedence over the cookie; the cookie is consulted only when no such
    header is present.

    Args:
        authorization: Value of the ``Authorization`` request header, if any.
        ws_token: Value of the ``ws_token`` cookie, if any.

    Returns:
        The identity string produced by ``_validate_token``.

    Raises:
        HTTPException: 401 when no non-empty token is supplied; anything
            ``_validate_token`` raises is propagated.
    """
    uses_bearer = bool(authorization) and authorization.lower().startswith("bearer ")

    token: Optional[str]
    if uses_bearer:
        # Everything after the first space is the credential.
        _scheme, _sep, credential = authorization.partition(" ")
        token = credential.strip()
    else:
        token = ws_token

    if not token:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
    return _validate_token(token)
def get_current_user(
    authorization: Optional[str] = Header(default=None),
    ws_token: Optional[str] = Cookie(default=None),
) -> str:
    """Return the authenticated identity for a request.

    Delegates to ``get_current_user_cookie_or_bearer``, so either the
    ``ws_token`` cookie or a Bearer ``Authorization`` header is accepted.
    The original docstring describes this as the single dependency for all
    v1 endpoints.

    Args:
        authorization: Value of the ``Authorization`` request header, if any.
        ws_token: Value of the ``ws_token`` cookie, if any.

    Returns:
        The identity string resolved from the supplied credential.
    """
    identity = get_current_user_cookie_or_bearer(
        authorization=authorization,
        ws_token=ws_token,
    )
    return identity