mirror of
https://github.com/escalante29/WealthySmart.git
synced 2026-05-19 13:28:48 +02:00
All checks were successful
Deploy to VPS / deploy (push) Successful in 45s
- Expand Account model with account_type (pension, savings, liability, crypto), new banks/currencies (BTC, XMR, FCL, ROP, VOL, MEMP, MPAT, MORTGAGE), and next_payment field - Add exchange rate endpoint (BCCR integration), analytics endpoint, paste-import for transactions, and API token management - Add PWA manifest, service worker, and app icons - Redesign dashboard, transactions, transfers, and login pages with theme support - Add billing cycle selector, confirm dialog, and paste import modal components - One-time DB reset in deploy workflow for schema migration Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
54 lines
1.7 KiB
Python
54 lines
1.7 KiB
Python
import hashlib
|
|
from datetime import datetime, timedelta
|
|
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
from jose import JWTError, jwt
|
|
from sqlmodel import Session, select
|
|
|
|
from app.config import settings
|
|
|
|
# Bearer-token dependency consumed by ``get_current_user``; ``tokenUrl`` is
# the login route a client calls to obtain a token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
|
|
|
|
# Signing algorithm passed to both ``jwt.encode`` and ``jwt.decode`` in this
# module.
ALGORITHM = "HS256"
|
|
|
|
|
|
def create_access_token(subject: str) -> str:
    """Create a signed JWT access token for *subject*.

    The token carries a ``sub`` claim (the subject) and an ``exp`` claim set
    ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes from now, and is signed
    with ``settings.SECRET_KEY`` using ``ALGORITHM``.

    Args:
        subject: Value stored in the token's ``sub`` claim.

    Returns:
        The encoded JWT.
    """
    # Function-scope import: the module header only brings in ``datetime``
    # and ``timedelta``.
    from datetime import timezone

    # ``datetime.utcnow()`` is deprecated since Python 3.12; an aware UTC
    # timestamp denotes the same instant.
    lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    expire = datetime.now(timezone.utc) + lifetime
    claims = {"sub": subject, "exp": expire}
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
|
|
def hash_token(token: str) -> str:
    """Return the SHA-256 digest of *token* as a hexadecimal string."""
    hasher = hashlib.sha256()
    hasher.update(token.encode())
    return hasher.hexdigest()
|
|
|
|
|
|
def get_current_user(token: str = Depends(oauth2_scheme)) -> str:
    """Resolve the caller's identity from a bearer credential.

    The credential is first decoded as a JWT signed with
    ``settings.SECRET_KEY``. If decoding raises ``JWTError``, the SHA-256
    hash of the credential is looked up among active ``APIToken`` rows
    instead.

    Args:
        token: Bearer credential supplied by ``oauth2_scheme``.

    Returns:
        The JWT ``sub`` claim, or ``"api:<name>"`` for a matching API token.

    Raises:
        HTTPException: 401 if the JWT has no ``sub`` claim, if the matching
            API token has expired, or if the credential matches neither form.
    """
    # RFC 7235 requires a WWW-Authenticate challenge on every 401 response.
    challenge = {"WWW-Authenticate": "Bearer"}

    # Try JWT first. Only the decode call can raise JWTError, so it is the
    # only statement inside the ``try``.
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        # Not a usable JWT: fall through to the API-token lookup below.
        pass
    else:
        username = payload.get("sub")
        if username is None:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                headers=challenge,
            )
        return username

    # Fallback: check API token.
    # Imports are deferred to this point as in the original; presumably to
    # avoid a circular import with the db/models modules -- TODO confirm.
    from datetime import timezone

    from app.db import get_session
    from app.models.models import APIToken

    token_hash = hash_token(token)
    # NOTE(review): ``get_session()`` is advanced once with ``next`` and never
    # resumed; if it is a generator with cleanup after its ``yield``, that
    # cleanup only runs when the generator is garbage-collected -- confirm.
    with next(get_session()) as session:
        api_token = session.exec(
            select(APIToken).where(
                APIToken.token_hash == token_hash,
                # ``== True`` builds a SQL expression; ``is True`` would not.
                APIToken.is_active == True,  # noqa: E712
            )
        ).first()
        if api_token:
            # Naive UTC "now", equivalent to the deprecated
            # ``datetime.utcnow()``. Assumes ``expires_at`` is stored as
            # naive UTC -- TODO confirm against the model.
            now = datetime.now(timezone.utc).replace(tzinfo=None)
            if api_token.expires_at and api_token.expires_at < now:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Token expired",
                    headers=challenge,
                )
            return f"api:{api_token.name}"

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        headers=challenge,
    )
|