Files
WealthySmart/backend/app/auth.py
Carlos Escalante 0a8e00e227
All checks were successful
Deploy to VPS / deploy (push) Successful in 45s
Add accounts expansion, analytics, exchange rates, API tokens, PWA support, and UI overhaul
- Expand Account model with account_type (pension, savings, liability, crypto), new banks/currencies (BTC, XMR, FCL, ROP, VOL, MEMP, MPAT, MORTGAGE), and next_payment field
- Add exchange rate endpoint (BCCR integration), analytics endpoint, paste-import for transactions, and API token management
- Add PWA manifest, service worker, and app icons
- Redesign dashboard, transactions, transfers, and login pages with theme support
- Add billing cycle selector, confirm dialog, and paste import modal components
- One-time DB reset in deploy workflow for schema migration

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-21 18:23:47 -06:00

54 lines
1.7 KiB
Python

import hashlib
from datetime import datetime, timedelta
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlmodel import Session, select
from app.config import settings
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
ALGORITHM = "HS256"
def create_access_token(subject: str) -> str:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return jwt.encode({"sub": subject, "exp": expire}, settings.SECRET_KEY, algorithm=ALGORITHM)
def hash_token(token: str) -> str:
return hashlib.sha256(token.encode()).hexdigest()
def get_current_user(token: str = Depends(oauth2_scheme)) -> str:
# Try JWT first
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
return username
except JWTError:
pass
# Fallback: check API token
from app.db import get_session
from app.models.models import APIToken
token_hash = hash_token(token)
with next(get_session()) as session:
api_token = session.exec(
select(APIToken).where(
APIToken.token_hash == token_hash,
APIToken.is_active == True,
)
).first()
if api_token:
if api_token.expires_at and api_token.expires_at < datetime.utcnow():
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired")
return f"api:{api_token.name}"
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)