mirror of
https://github.com/escalante29/WealthySmart.git
synced 2026-05-19 08:48:48 +02:00
Backend now exposes /api/auth/login + /api/auth/logout setting an httpOnly ws_token cookie, and get_current_user accepts either the cookie (SPA) or a Bearer token (n8n/CLI). AuthContext probes the cookie via /api/v1/auth/me. Dockerfiles and compose files updated for the new agent service deps and CopilotKit dev sidecar. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
79 lines
2.6 KiB
Python
79 lines
2.6 KiB
Python
import hashlib
|
|
import re
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
|
|
from fastapi import Cookie, Depends, Header, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
from jose import JWTError, jwt
|
|
from sqlmodel import Session, select
|
|
|
|
from app.config import settings
|
|
|
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
|
|
|
|
ALGORITHM = "HS256"
|
|
|
|
|
|
def create_access_token(subject: str) -> str:
|
|
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
return jwt.encode({"sub": subject, "exp": expire}, settings.SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
|
|
def hash_token(token: str) -> str:
|
|
return hashlib.sha256(token.encode()).hexdigest()
|
|
|
|
|
|
def _validate_token(token: str) -> str:
|
|
"""Validate JWT and return subject, or raise 401."""
|
|
try:
|
|
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
|
|
username: str = payload.get("sub")
|
|
if username is None:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
|
|
return username
|
|
except JWTError:
|
|
pass
|
|
|
|
# Fallback: check API token
|
|
from app.db import get_session
|
|
from app.models.models import APIToken
|
|
|
|
token_hash = hash_token(token)
|
|
with next(get_session()) as session:
|
|
api_token = session.exec(
|
|
select(APIToken).where(
|
|
APIToken.token_hash == token_hash,
|
|
APIToken.is_active == True,
|
|
)
|
|
).first()
|
|
if api_token:
|
|
if api_token.expires_at and api_token.expires_at < datetime.utcnow():
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired")
|
|
return f"api:{api_token.name}"
|
|
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
|
|
|
|
|
|
def get_current_user_cookie_or_bearer(
|
|
authorization: Optional[str] = Header(default=None),
|
|
ws_token: Optional[str] = Cookie(default=None),
|
|
) -> str:
|
|
"""Accepts httpOnly cookie (SPA) or Bearer token (API clients / n8n)."""
|
|
token: Optional[str] = None
|
|
if authorization and authorization.lower().startswith("bearer "):
|
|
token = authorization.split(" ", 1)[1].strip()
|
|
elif ws_token:
|
|
token = ws_token
|
|
if not token:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
|
|
return _validate_token(token)
|
|
|
|
|
|
def get_current_user(
|
|
authorization: Optional[str] = Header(default=None),
|
|
ws_token: Optional[str] = Cookie(default=None),
|
|
) -> str:
|
|
"""SPA cookie or Bearer token. Single dependency for all v1 endpoints."""
|
|
return get_current_user_cookie_or_bearer(authorization, ws_token)
|