Files
WealthySmart/backend/app/api/v1/endpoints/analytics.py
Carlos Escalante 792cef5006
All checks were successful
Deploy to VPS / deploy (push) Successful in 28s
Fix analytics case() bug, add privacy mode, add prod DB sync script
Fix SQLAlchemy case() import in monthly-trend endpoint. Add
data-sensitive attributes to Analytics charts and tables for privacy
blur. Add scripts/sync-db.sh for one-click prod-to-local PostgreSQL
sync. Remove SQLite artifacts from gitignore.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 20:10:58 -06:00

186 lines
5.1 KiB
Python

from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy import case
from sqlmodel import Session, func, select
from app.auth import get_current_user
from app.db import get_session
from app.models.models import Category, Transaction
from app.services.budget_projection import get_cycle_range
# Router for the analytics endpoints: routes registered on it carry the
# "/analytics" path prefix and the "analytics" tag.
router = APIRouter(prefix="/analytics", tags=["analytics"])
class CategorySpending(BaseModel):
    """One row of the /by-category response: spending for a single category."""

    # Grouping key taken from Transaction.category_id; may be None.
    category_id: Optional[int]
    # Name of the matching Category, or "Uncategorized" when none is found.
    category_name: str
    # Sum of Transaction.amount for the category, as a float.
    total: float
    # Number of transactions that went into ``total``.
    count: int
    # Share of the summed totals, in percent, rounded to one decimal place.
    percentage: float
class MonthlyTrend(BaseModel):
    """One row of the /monthly-trend response: totals for one billing cycle."""

    # Year and month that were passed to get_cycle_range for this cycle.
    year: int
    month: int
    # Display label of the form "<Mon> - <next Mon>", e.g. "Mar - Apr".
    label: str
    # Sum of amounts whose currency is "CRC" (0.0 when there are none).
    total_crc: float
    # Sum of amounts whose currency is "USD" (0.0 when there are none).
    total_usd: float
    # Number of transactions in the cycle, across all currencies.
    count: int
class DailySpending(BaseModel):
    """One row of the /daily-spending response: totals for one calendar day."""

    # String form of the database DATE(Transaction.date) value for the day.
    date: str
    # Sum of Transaction.amount for that day, as a float.
    total: float
    # Number of transactions recorded on that day.
    count: int
@router.get("/by-category", response_model=list[CategorySpending])
def spending_by_category(
    cycle_year: Optional[int] = None,
    cycle_month: Optional[int] = None,
    session: Session = Depends(get_session),
    _user: str = Depends(get_current_user),
):
    """Return "COMPRA" spending grouped by category, largest total first.

    Args:
        cycle_year: Year of the billing cycle to restrict the report to.
        cycle_month: Month of the billing cycle to restrict the report to.
        session: Database session injected by FastAPI.
        _user: Authenticated user; required only to enforce authentication.

    Returns:
        A list of ``CategorySpending`` sorted by ``total`` descending. The
        cycle filter is applied only when both ``cycle_year`` and
        ``cycle_month`` are supplied; the range from ``get_cycle_range`` is
        treated as start-inclusive, end-exclusive. Otherwise every "COMPRA"
        transaction is counted.
    """
    query = (
        select(
            Transaction.category_id,
            func.sum(Transaction.amount).label("total"),
            func.count().label("count"),
        )
        .where(Transaction.transaction_type == "COMPRA")
        .group_by(Transaction.category_id)
    )
    if cycle_year and cycle_month:
        start, end = get_cycle_range(cycle_year, cycle_month)
        query = query.where(Transaction.date >= start, Transaction.date < end)

    # Normalise every aggregate to float before doing any arithmetic. A
    # NUMERIC column makes SUM() come back as Decimal, and float / Decimal
    # raises TypeError; SUM() is also NULL when every amount in a group is
    # NULL, which is treated as zero here.
    # NOTE(review): amounts are added regardless of Transaction.currency,
    # whereas monthly_trend keeps CRC and USD apart - confirm this is intended.
    rows = [
        (category_id, float(total or 0), count)
        for category_id, total, count in session.exec(query).all()
    ]

    # Fall back to 1 so an empty or all-zero report never divides by zero.
    grand_total = sum(total for _, total, _ in rows) or 1.0

    results = []
    for category_id, total, count in rows:
        cat_name = "Uncategorized"
        if category_id:
            # NOTE(review): one primary-key lookup per category; could be
            # batched into a single query if the category list grows.
            cat = session.get(Category, category_id)
            if cat:
                cat_name = cat.name
        results.append(
            CategorySpending(
                category_id=category_id,
                category_name=cat_name,
                total=total,
                count=count,
                percentage=round(total / grand_total * 100, 1),
            )
        )
    return sorted(results, key=lambda x: x.total, reverse=True)
@router.get("/monthly-trend", response_model=list[MonthlyTrend])
def monthly_trend(
    months: int = 6,
    session: Session = Depends(get_session),
    _user: str = Depends(get_current_user),
):
    """Monthly spending totals using billing cycle boundaries (18th-18th).

    Starts at the current calendar month and steps back one month at a time
    for ``months`` cycles, asking ``get_cycle_range`` for each cycle's
    boundaries. "COMPRA" amounts are summed separately for CRC and USD, and
    a cycle without matching transactions reports zeros.

    Returns:
        One ``MonthlyTrend`` per cycle, oldest cycle first.
    """
    # Index 0 is a placeholder so a month number indexes its own abbreviation.
    month_abbr = (
        "", "Jan", "Feb", "Mar", "Apr", "May", "Jun",
        "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
    )

    def currency_sum(code):
        # Sum only the amounts in the given currency; COALESCE covers the
        # NULL that SUM() yields when no row matches the cycle at all.
        return func.coalesce(
            func.sum(
                case(
                    (Transaction.currency == code, Transaction.amount),
                    else_=0,
                )
            ),
            0,
        )

    today = datetime.now()
    year, month = today.year, today.month
    trend = []
    for _ in range(months):
        cycle_start, cycle_end = get_cycle_range(year, month)
        statement = select(
            func.count(),
            currency_sum("CRC"),
            currency_sum("USD"),
        ).where(
            Transaction.transaction_type == "COMPRA",
            Transaction.date >= cycle_start,
            Transaction.date < cycle_end,
        )
        aggregates = session.exec(statement).first()
        if aggregates:
            purchases = aggregates[0]
            crc_total = float(aggregates[1])
            usd_total = float(aggregates[2])
        else:
            purchases, crc_total, usd_total = 0, 0.0, 0.0

        # December wraps around to January for the label's closing month.
        following = month % 12 + 1
        trend.append(
            MonthlyTrend(
                year=year,
                month=month,
                label=f"{month_abbr[month]} - {month_abbr[following]}",
                total_crc=crc_total,
                total_usd=usd_total,
                count=purchases,
            )
        )

        # Step back one calendar month, rolling over the year in January.
        year, month = (year - 1, 12) if month == 1 else (year, month - 1)

    # Cycles were collected newest-first; callers get them oldest-first.
    trend.reverse()
    return trend
@router.get("/daily-spending", response_model=list[DailySpending])
def daily_spending(
    cycle_year: Optional[int] = None,
    cycle_month: Optional[int] = None,
    session: Session = Depends(get_session),
    _user: str = Depends(get_current_user),
):
    """Return per-day "COMPRA" totals in ascending date order.

    The report is limited to one billing cycle only when both ``cycle_year``
    and ``cycle_month`` are supplied; the range from ``get_cycle_range`` is
    treated as start-inclusive, end-exclusive. Otherwise every "COMPRA"
    transaction is included.

    Returns:
        One ``DailySpending`` per day that has at least one matching
        transaction.
    """
    # Built once and reused as the selected column, grouping key and sort key.
    day_expr = func.date(Transaction.date)

    statement = select(
        day_expr.label("day"),
        func.sum(Transaction.amount).label("total"),
        func.count().label("count"),
    ).where(Transaction.transaction_type == "COMPRA")

    if cycle_year and cycle_month:
        cycle_start, cycle_end = get_cycle_range(cycle_year, cycle_month)
        statement = statement.where(
            Transaction.date >= cycle_start,
            Transaction.date < cycle_end,
        )

    statement = statement.group_by(day_expr).order_by(day_expr)

    report = []
    for day, amount, purchases in session.exec(statement).all():
        report.append(
            DailySpending(date=str(day), total=float(amount), count=purchases)
        )
    return report