mirror of
https://github.com/escalante29/healthy-fit.git
synced 2026-03-21 10:48:46 +01:00
- Supplement tracking: CRUD endpoints, /today, /logs, Supplements page - Kettlebell workouts: session tracking, analytics endpoint, ActiveSession page - Calendar module: events CRUD, calendar components - Push notifications: VAPID keys, PushSubscription model, APScheduler reminders, service worker with push/notificationclick handlers, Profile notifications UI - PWA: vite-plugin-pwa, manifest, icons, service worker generation - Frontend: TypeScript types, API modules, ConfirmModal, toast notifications - Auth fixes: password hashing, nutrition endpoint auth - CLAUDE.md: project documentation and development guide Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
405 lines
14 KiB
Python
405 lines
14 KiB
Python
from collections import defaultdict
|
|
from datetime import datetime, timedelta
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from pydantic import BaseModel
|
|
from sqlmodel import Session, select
|
|
|
|
from app.ai.kettlebell import kettlebell_module
|
|
from app.api import deps
|
|
from app.models.kettlebell import KettlebellSession, KettlebellSetLog
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
class KettlebellGenerateRequest(BaseModel):
|
|
focus: str
|
|
duration_minutes: int
|
|
available_weights: list[float]
|
|
|
|
|
|
class LogSetRequest(BaseModel):
|
|
exercise_order: int
|
|
set_number: int
|
|
actual_reps: int
|
|
actual_weight_kg: float
|
|
actual_duration_seconds: int
|
|
perceived_effort: int
|
|
|
|
|
|
class CompleteSessionRequest(BaseModel):
|
|
notes: Optional[str] = None
|
|
|
|
|
|
class ExerciseProgression(BaseModel):
|
|
date: str
|
|
max_weight: float
|
|
avg_rpe: float
|
|
total_volume: float
|
|
|
|
|
|
class PersonalRecord(BaseModel):
|
|
exercise_name: str
|
|
max_weight: float
|
|
date: str
|
|
|
|
|
|
class WeeklyVolume(BaseModel):
|
|
week: str
|
|
sessions: int
|
|
total_volume: float
|
|
|
|
|
|
class AnalyticsResponse(BaseModel):
|
|
weekly_sessions: List[WeeklyVolume]
|
|
exercise_progressions: Dict[str, List[ExerciseProgression]]
|
|
personal_records: List[PersonalRecord]
|
|
avg_rpe_trend: List[Dict]
|
|
|
|
|
|
@router.get("/analytics", response_model=AnalyticsResponse)
|
|
def get_analytics(
|
|
current_user: deps.CurrentUser,
|
|
session: Session = Depends(deps.get_session),
|
|
) -> Any:
|
|
"""Get aggregated analytics across all completed sessions."""
|
|
completed_sessions = session.exec(
|
|
select(KettlebellSession)
|
|
.where(KettlebellSession.user_id == current_user.id)
|
|
.where(KettlebellSession.status == "completed")
|
|
.order_by(KettlebellSession.completed_at)
|
|
).all()
|
|
|
|
if not completed_sessions:
|
|
return AnalyticsResponse(
|
|
weekly_sessions=[],
|
|
exercise_progressions={},
|
|
personal_records=[],
|
|
avg_rpe_trend=[],
|
|
)
|
|
|
|
session_ids = [s.id for s in completed_sessions]
|
|
all_sets = session.exec(select(KettlebellSetLog).where(KettlebellSetLog.session_id.in_(session_ids))).all()
|
|
|
|
# Map session_id -> session
|
|
session_map = {s.id: s for s in completed_sessions}
|
|
# Map session_id -> exercise_order -> exercise_name
|
|
exercise_name_map: Dict[int, Dict[int, str]] = {}
|
|
for s in completed_sessions:
|
|
exercises = s.exercises.get("exercises", []) if s.exercises else []
|
|
exercise_name_map[s.id] = {ex["order"]: ex["name"] for ex in exercises if "order" in ex and "name" in ex}
|
|
|
|
# Weekly sessions + volume
|
|
weeks: Dict[str, Dict] = defaultdict(lambda: {"sessions": set(), "volume": 0.0})
|
|
for set_log in all_sets:
|
|
s = session_map.get(set_log.session_id)
|
|
if not s or not s.completed_at:
|
|
continue
|
|
week_start = s.completed_at - timedelta(days=s.completed_at.weekday())
|
|
week_key = week_start.strftime("%Y-%m-%d")
|
|
weeks[week_key]["sessions"].add(set_log.session_id)
|
|
volume = set_log.actual_reps * set_log.actual_weight_kg
|
|
weeks[week_key]["volume"] += volume
|
|
|
|
weekly_sessions = [
|
|
WeeklyVolume(week=k, sessions=len(v["sessions"]), total_volume=round(v["volume"], 1))
|
|
for k, v in sorted(weeks.items())
|
|
]
|
|
|
|
# Exercise progressions + PRs
|
|
exercise_data: Dict[str, List[Dict]] = defaultdict(list)
|
|
for set_log in all_sets:
|
|
s = session_map.get(set_log.session_id)
|
|
if not s or not s.completed_at:
|
|
continue
|
|
ex_name = exercise_name_map.get(set_log.session_id, {}).get(set_log.exercise_order)
|
|
if not ex_name:
|
|
continue
|
|
exercise_data[ex_name].append(
|
|
{
|
|
"date": s.completed_at.strftime("%Y-%m-%d"),
|
|
"weight": set_log.actual_weight_kg,
|
|
"rpe": set_log.perceived_effort,
|
|
"volume": set_log.actual_reps * set_log.actual_weight_kg,
|
|
}
|
|
)
|
|
|
|
exercise_progressions: Dict[str, List[ExerciseProgression]] = {}
|
|
personal_records: List[PersonalRecord] = []
|
|
|
|
for ex_name, sets in exercise_data.items():
|
|
by_date: Dict[str, Dict] = defaultdict(lambda: {"weights": [], "rpes": [], "volume": 0.0})
|
|
for s in sets:
|
|
by_date[s["date"]]["weights"].append(s["weight"])
|
|
by_date[s["date"]]["rpes"].append(s["rpe"])
|
|
by_date[s["date"]]["volume"] += s["volume"]
|
|
|
|
progressions = [
|
|
ExerciseProgression(
|
|
date=d,
|
|
max_weight=max(v["weights"]),
|
|
avg_rpe=round(sum(v["rpes"]) / len(v["rpes"]), 1),
|
|
total_volume=round(v["volume"], 1),
|
|
)
|
|
for d, v in sorted(by_date.items())
|
|
]
|
|
exercise_progressions[ex_name] = progressions
|
|
|
|
all_weights = [s["weight"] for s in sets]
|
|
max_w = max(all_weights)
|
|
pr_date = next(s["date"] for s in sorted(sets, key=lambda x: x["date"]) if s["weight"] == max_w)
|
|
personal_records.append(PersonalRecord(exercise_name=ex_name, max_weight=max_w, date=pr_date))
|
|
|
|
personal_records.sort(key=lambda x: x.max_weight, reverse=True)
|
|
|
|
# Avg RPE trend by session
|
|
avg_rpe_trend = []
|
|
sets_by_session: Dict[int, List] = defaultdict(list)
|
|
for set_log in all_sets:
|
|
sets_by_session[set_log.session_id].append(set_log.perceived_effort)
|
|
|
|
for s in completed_sessions:
|
|
rpes = sets_by_session.get(s.id, [])
|
|
if rpes and s.completed_at:
|
|
avg_rpe_trend.append(
|
|
{
|
|
"date": s.completed_at.strftime("%Y-%m-%d"),
|
|
"avg_rpe": round(sum(rpes) / len(rpes), 1),
|
|
"session_title": s.title,
|
|
}
|
|
)
|
|
|
|
return AnalyticsResponse(
|
|
weekly_sessions=weekly_sessions,
|
|
exercise_progressions=exercise_progressions,
|
|
personal_records=personal_records,
|
|
avg_rpe_trend=avg_rpe_trend,
|
|
)
|
|
|
|
|
|
@router.post("/generate", response_model=KettlebellSession)
|
|
def generate_session(
|
|
*,
|
|
current_user: deps.CurrentUser,
|
|
request: KettlebellGenerateRequest,
|
|
session: Session = Depends(deps.get_session),
|
|
) -> Any:
|
|
"""Generate a new kettlebell session using AI."""
|
|
try:
|
|
user_profile = (
|
|
f"Age: {current_user.age or 'unknown'}, "
|
|
f"Gender: {current_user.gender or 'unknown'}, "
|
|
f"Weight: {current_user.weight or 'unknown'} kg, "
|
|
f"Height: {current_user.height or 'unknown'} cm"
|
|
)
|
|
available_weights_kg = ", ".join(str(w) for w in sorted(request.available_weights))
|
|
|
|
generated = kettlebell_module(
|
|
user_profile=user_profile,
|
|
available_weights_kg=available_weights_kg,
|
|
focus=request.focus,
|
|
duration_minutes=request.duration_minutes,
|
|
)
|
|
|
|
s = generated.session
|
|
kb_session = KettlebellSession(
|
|
user_id=current_user.id,
|
|
title=s.title,
|
|
focus=s.focus,
|
|
exercises={"exercises": [ex.model_dump() for ex in s.exercises]},
|
|
total_duration_min=s.total_duration_min,
|
|
difficulty=s.difficulty,
|
|
notes=s.notes,
|
|
status="generated",
|
|
)
|
|
session.add(kb_session)
|
|
session.commit()
|
|
session.refresh(kb_session)
|
|
return kb_session
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.get("/", response_model=List[KettlebellSession])
|
|
def list_sessions(
|
|
current_user: deps.CurrentUser,
|
|
session: Session = Depends(deps.get_session),
|
|
) -> Any:
|
|
"""Get all kettlebell sessions for the current user."""
|
|
statement = (
|
|
select(KettlebellSession)
|
|
.where(KettlebellSession.user_id == current_user.id)
|
|
.order_by(KettlebellSession.created_at.desc())
|
|
)
|
|
return session.exec(statement).all()
|
|
|
|
|
|
@router.get("/{session_id}", response_model=KettlebellSession)
|
|
def get_session(
|
|
session_id: int,
|
|
current_user: deps.CurrentUser,
|
|
session: Session = Depends(deps.get_session),
|
|
) -> Any:
|
|
"""Get a single kettlebell session."""
|
|
kb_session = session.get(KettlebellSession, session_id)
|
|
if not kb_session:
|
|
raise HTTPException(status_code=404, detail="Session not found")
|
|
if kb_session.user_id != current_user.id:
|
|
raise HTTPException(status_code=403, detail="Not authorized")
|
|
return kb_session
|
|
|
|
|
|
@router.patch("/{session_id}/start", response_model=KettlebellSession)
|
|
def start_session(
|
|
session_id: int,
|
|
current_user: deps.CurrentUser,
|
|
session: Session = Depends(deps.get_session),
|
|
) -> Any:
|
|
"""Mark a session as in progress."""
|
|
kb_session = session.get(KettlebellSession, session_id)
|
|
if not kb_session:
|
|
raise HTTPException(status_code=404, detail="Session not found")
|
|
if kb_session.user_id != current_user.id:
|
|
raise HTTPException(status_code=403, detail="Not authorized")
|
|
kb_session.status = "in_progress"
|
|
kb_session.started_at = datetime.utcnow()
|
|
session.add(kb_session)
|
|
session.commit()
|
|
session.refresh(kb_session)
|
|
return kb_session
|
|
|
|
|
|
@router.post("/{session_id}/sets", response_model=KettlebellSetLog)
|
|
def log_set(
|
|
session_id: int,
|
|
request: LogSetRequest,
|
|
current_user: deps.CurrentUser,
|
|
session: Session = Depends(deps.get_session),
|
|
) -> Any:
|
|
"""Log a completed set."""
|
|
kb_session = session.get(KettlebellSession, session_id)
|
|
if not kb_session:
|
|
raise HTTPException(status_code=404, detail="Session not found")
|
|
if kb_session.user_id != current_user.id:
|
|
raise HTTPException(status_code=403, detail="Not authorized")
|
|
set_log = KettlebellSetLog(
|
|
session_id=session_id,
|
|
exercise_order=request.exercise_order,
|
|
set_number=request.set_number,
|
|
actual_reps=request.actual_reps,
|
|
actual_weight_kg=request.actual_weight_kg,
|
|
actual_duration_seconds=request.actual_duration_seconds,
|
|
perceived_effort=request.perceived_effort,
|
|
)
|
|
session.add(set_log)
|
|
session.commit()
|
|
session.refresh(set_log)
|
|
return set_log
|
|
|
|
|
|
@router.get("/{session_id}/sets", response_model=List[KettlebellSetLog])
|
|
def get_sets(
|
|
session_id: int,
|
|
current_user: deps.CurrentUser,
|
|
session: Session = Depends(deps.get_session),
|
|
) -> Any:
|
|
"""Get all logged sets for a session."""
|
|
kb_session = session.get(KettlebellSession, session_id)
|
|
if not kb_session:
|
|
raise HTTPException(status_code=404, detail="Session not found")
|
|
if kb_session.user_id != current_user.id:
|
|
raise HTTPException(status_code=403, detail="Not authorized")
|
|
statement = select(KettlebellSetLog).where(KettlebellSetLog.session_id == session_id)
|
|
return session.exec(statement).all()
|
|
|
|
|
|
@router.post("/{session_id}/retry", response_model=KettlebellSession)
|
|
def retry_session(
|
|
session_id: int,
|
|
current_user: deps.CurrentUser,
|
|
session: Session = Depends(deps.get_session),
|
|
) -> Any:
|
|
"""Clone an abandoned session as a fresh generated session."""
|
|
kb_session = session.get(KettlebellSession, session_id)
|
|
if not kb_session:
|
|
raise HTTPException(status_code=404, detail="Session not found")
|
|
if kb_session.user_id != current_user.id:
|
|
raise HTTPException(status_code=403, detail="Not authorized")
|
|
new_session = KettlebellSession(
|
|
user_id=current_user.id,
|
|
title=kb_session.title,
|
|
focus=kb_session.focus,
|
|
exercises=kb_session.exercises,
|
|
total_duration_min=kb_session.total_duration_min,
|
|
difficulty=kb_session.difficulty,
|
|
notes="",
|
|
status="generated",
|
|
)
|
|
session.add(new_session)
|
|
session.commit()
|
|
session.refresh(new_session)
|
|
return new_session
|
|
|
|
|
|
@router.patch("/{session_id}/abandon", response_model=KettlebellSession)
|
|
def abandon_session(
|
|
session_id: int,
|
|
current_user: deps.CurrentUser,
|
|
session: Session = Depends(deps.get_session),
|
|
) -> Any:
|
|
"""Mark a session as abandoned."""
|
|
kb_session = session.get(KettlebellSession, session_id)
|
|
if not kb_session:
|
|
raise HTTPException(status_code=404, detail="Session not found")
|
|
if kb_session.user_id != current_user.id:
|
|
raise HTTPException(status_code=403, detail="Not authorized")
|
|
kb_session.status = "abandoned"
|
|
session.add(kb_session)
|
|
session.commit()
|
|
session.refresh(kb_session)
|
|
return kb_session
|
|
|
|
|
|
@router.delete("/{session_id}", status_code=204)
|
|
def delete_session(
|
|
session_id: int,
|
|
current_user: deps.CurrentUser,
|
|
session: Session = Depends(deps.get_session),
|
|
) -> None:
|
|
"""Delete a session and all its set logs."""
|
|
kb_session = session.get(KettlebellSession, session_id)
|
|
if not kb_session:
|
|
raise HTTPException(status_code=404, detail="Session not found")
|
|
if kb_session.user_id != current_user.id:
|
|
raise HTTPException(status_code=403, detail="Not authorized")
|
|
statement = select(KettlebellSetLog).where(KettlebellSetLog.session_id == session_id)
|
|
for log in session.exec(statement).all():
|
|
session.delete(log)
|
|
session.flush()
|
|
session.delete(kb_session)
|
|
session.commit()
|
|
|
|
|
|
@router.patch("/{session_id}/complete", response_model=KettlebellSession)
|
|
def complete_session(
|
|
session_id: int,
|
|
request: CompleteSessionRequest,
|
|
current_user: deps.CurrentUser,
|
|
session: Session = Depends(deps.get_session),
|
|
) -> Any:
|
|
"""Mark a session as completed."""
|
|
kb_session = session.get(KettlebellSession, session_id)
|
|
if not kb_session:
|
|
raise HTTPException(status_code=404, detail="Session not found")
|
|
if kb_session.user_id != current_user.id:
|
|
raise HTTPException(status_code=403, detail="Not authorized")
|
|
kb_session.status = "completed"
|
|
kb_session.completed_at = datetime.utcnow()
|
|
if request.notes is not None:
|
|
kb_session.notes = request.notes
|
|
session.add(kb_session)
|
|
session.commit()
|
|
session.refresh(kb_session)
|
|
return kb_session
|