"""Web Push subscription endpoints and the shared push delivery helper."""

import json
import logging

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from pywebpush import WebPushException, webpush
from sqlmodel import Session, select

from app.api import deps
from app.config import settings
from app.models.push_subscription import PushSubscription

logger = logging.getLogger(__name__)

router = APIRouter()

# Push-service response codes after which a subscription is deactivated.
_EXPIRED_STATUS_CODES = (404, 410)


class SubscriptionKeys(BaseModel):
    """Key material supplied by the client for one push subscription."""

    p256dh: str
    auth: str


class SubscribeRequest(BaseModel):
    """Request body for ``POST /subscribe``."""

    endpoint: str
    keys: SubscriptionKeys
    reminder_hour: int = 9
    reminder_minute: int = 0
    timezone: str = "UTC"


class UnsubscribeRequest(BaseModel):
    """Request body for ``DELETE /unsubscribe``."""

    endpoint: str


def send_push(
    subscriptions: list[PushSubscription],
    title: str,
    body: str,
    url: str = "/supplements",
    session: Session | None = None,
) -> None:
    """Send one notification to every subscription in *subscriptions*.

    Delivery is best-effort: a failure for one subscription is logged and
    does not stop delivery to the others, and nothing is raised to the caller.

    Args:
        subscriptions: Subscriptions to deliver to.
        title: Notification title placed in the JSON payload.
        body: Notification body placed in the JSON payload.
        url: Value of the payload's ``url`` key.
        session: Optional session. When given, subscriptions that the push
            service reports as 404/410 are marked inactive and added to it.
            This function does not commit; the caller is expected to.
    """
    payload = json.dumps({"title": title, "body": body, "url": url})

    for sub in subscriptions:
        try:
            webpush(
                subscription_info={
                    "endpoint": sub.endpoint,
                    "keys": {"p256dh": sub.p256dh, "auth": sub.auth},
                },
                data=payload,
                vapid_private_key=settings.VAPID_PRIVATE_KEY,
                vapid_claims={"sub": settings.VAPID_MAILTO},
            )
        except WebPushException as ex:
            response = ex.response
            # Compare against None explicitly: a requests.Response is falsy
            # for every 4xx/5xx status, so a plain truthiness test would
            # never let the 404/410 branch below run.
            status_code = response.status_code if response is not None else None

            if status_code in _EXPIRED_STATUS_CODES:
                sub.is_active = False
                if session is not None:
                    session.add(sub)
            else:
                # Keep going, but leave a trace instead of dropping the error.
                logger.warning(
                    "Web push delivery failed (status=%s): %s", status_code, ex
                )


@router.get("/vapid-public-key")
def get_vapid_public_key() -> dict:
    """Return the configured VAPID public key."""
    return {"public_key": settings.VAPID_PUBLIC_KEY}


@router.post("/subscribe", status_code=201)
def subscribe(
    data: SubscribeRequest,
    current_user: deps.CurrentUser,
    session: Session = Depends(deps.get_session),
) -> dict:
    """Create the subscription for an endpoint, or refresh the existing one.

    The lookup is by endpoint only, so a row that already exists for this
    endpoint is reassigned to the current user, gets its keys and reminder
    settings overwritten, and is reactivated.
    """
    existing = session.exec(
        select(PushSubscription).where(PushSubscription.endpoint == data.endpoint)
    ).first()

    if existing:
        existing.user_id = current_user.id
        existing.p256dh = data.keys.p256dh
        existing.auth = data.keys.auth
        existing.reminder_hour = data.reminder_hour
        existing.reminder_minute = data.reminder_minute
        existing.timezone = data.timezone
        existing.is_active = True
        session.add(existing)
    else:
        sub = PushSubscription(
            user_id=current_user.id,
            endpoint=data.endpoint,
            p256dh=data.keys.p256dh,
            auth=data.keys.auth,
            reminder_hour=data.reminder_hour,
            reminder_minute=data.reminder_minute,
            timezone=data.timezone,
        )
        session.add(sub)

    session.commit()
    return {"status": "subscribed"}


@router.delete("/unsubscribe")
def unsubscribe(
    data: UnsubscribeRequest,
    current_user: deps.CurrentUser,
    session: Session = Depends(deps.get_session),
) -> dict:
    """Deactivate the current user's subscription for the given endpoint.

    Only a row matching both the endpoint and the current user is touched.
    The response is the same whether or not such a row exists.
    """
    sub = session.exec(
        select(PushSubscription)
        .where(PushSubscription.endpoint == data.endpoint)
        .where(PushSubscription.user_id == current_user.id)
    ).first()

    if sub:
        sub.is_active = False
        session.add(sub)
        session.commit()

    return {"status": "unsubscribed"}


@router.post("/test")
def send_test_notification(
    current_user: deps.CurrentUser,
    session: Session = Depends(deps.get_session),
) -> dict:
    """Send a test notification to all of the current user's active subscriptions.

    Raises:
        HTTPException: 404 when the user has no active subscriptions.

    Returns:
        ``{"sent": n}`` where ``n`` is the number of subscriptions a delivery
        was attempted for; individual deliveries may still have failed.
    """
    subs = session.exec(
        select(PushSubscription)
        .where(PushSubscription.user_id == current_user.id)
        .where(PushSubscription.is_active == True)  # noqa: E712
    ).all()

    if not subs:
        raise HTTPException(status_code=404, detail="No active subscriptions")

    send_push(
        list(subs),
        title="Test Notification",
        body="Push notifications are working!",
        session=session,
    )
    # Persist any deactivations send_push recorded on the session.
    session.commit()
    return {"sent": len(subs)}