"""USD/CRC exchange-rate lookup backed by the BCCR web service.

Rates are served from an in-process cache first, then from the database,
and only then fetched from BCCR and persisted.
"""

import logging
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta, timezone

import httpx
from sqlmodel import Session, col, select

from app.config import settings
from app.models.models import ExchangeRate

logger = logging.getLogger(__name__)

# BCCR indicators: 317 = buy, 318 = sell
BCCR_BUY_INDICATOR = 317
BCCR_SELL_INDICATOR = 318
BCCR_URL = "https://gee.bccr.fi.cr/Indicadores/Suscripciones/WS/wsindicadoreseconomicos.asmx/ObtenerIndicadoresEconomicos"

# Maps a cache key to (rate, naive-UTC time at which the entry was stored).
_cache: dict[str, tuple[ExchangeRate, datetime]] = {}
CACHE_TTL = timedelta(hours=1)


def _utcnow() -> datetime:
    """Return the current UTC time as a naive datetime.

    Produces the same value as the deprecated ``datetime.utcnow()`` so that
    comparisons against cached and stored timestamps keep working unchanged.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _fetch_bccr_rate(indicator: int, date_str: str) -> float | None:
    """Fetch a single indicator value from the BCCR API.

    Args:
        indicator: BCCR indicator code (317 = buy, 318 = sell).
        date_str: Date sent as both ``FechaInicio`` and ``FechaFinal``;
            callers in this module format it as ``dd/mm/YYYY``.

    Returns:
        The first non-empty ``NUM_VALOR`` value in the response as a float,
        or ``None`` if the request fails, the response cannot be parsed, or
        no value is present. This function never raises: callers treat
        ``None`` as the signal to fall back to stored data.
    """
    try:
        params = {
            "Indicador": str(indicator),
            "FechaInicio": date_str,
            "FechaFinal": date_str,
            "Nombre": settings.BCCR_API_EMAIL or "WealthySmart",
            "SubNiveles": "N",
            "CorreoElectronico": settings.BCCR_API_EMAIL or "no-reply@example.com",
            "Token": settings.BCCR_API_TOKEN or "",
        }
        resp = httpx.get(BCCR_URL, params=params, timeout=10)
        resp.raise_for_status()

        root = ET.fromstring(resp.text)
        # Match on the tag suffix so any XML namespace prefix is ignored.
        for node in root.iter():
            if not node.tag.endswith("NUM_VALOR"):
                continue
            raw = (node.text or "").strip()
            if raw:
                # Accept a decimal comma as well as a decimal point.
                return float(raw.replace(",", "."))
    except Exception as exc:
        # Deliberate best-effort boundary: any failure becomes None.
        # Only the exception type is logged because httpx error messages can
        # include the full request URL, which carries the API token.
        logger.warning(
            "BCCR request for indicator %s on %s failed (%s)",
            indicator,
            date_str,
            type(exc).__name__,
        )
        return None

    logger.warning(
        "BCCR response for indicator %s on %s contained no NUM_VALOR value",
        indicator,
        date_str,
    )
    return None


def get_current_rate(session: Session) -> ExchangeRate | None:
    """Get the current USD/CRC rate.

    Lookup order:
      1. The in-memory cache entry, if stored less than ``CACHE_TTL`` ago.
      2. The newest database row whose ``fetched_at`` is within ``CACHE_TTL``.
      3. A fresh fetch of the buy and sell indicators from BCCR, which is
         persisted and cached.
      4. If the fetch fails, the newest database row regardless of age.

    Args:
        session: Database session used for the queries and for persisting a
            freshly fetched rate.

    Returns:
        The selected ``ExchangeRate``, or ``None`` when BCCR is unavailable
        and the database holds no rate at all.
    """
    now = _utcnow()

    # Check memory cache
    cached = _cache.get("current")
    if cached and now - cached[1] < CACHE_TTL:
        return cached[0]

    # Check DB for a recent rate
    db_rate = session.exec(
        select(ExchangeRate)
        .where(ExchangeRate.fetched_at > now - CACHE_TTL)
        .order_by(col(ExchangeRate.fetched_at).desc())
    ).first()
    if db_rate:
        # NOTE(review): the entry is stamped with the current time rather than
        # db_rate.fetched_at, so a row can be served for up to ~2x CACHE_TTL.
        _cache["current"] = (db_rate, now)
        return db_rate

    # Fetch from BCCR
    # NOTE(review): this uses the server's local date; confirm it matches the
    # calendar date BCCR expects when the server does not run on Costa Rica time.
    today = datetime.now().strftime("%d/%m/%Y")
    buy = _fetch_bccr_rate(BCCR_BUY_INDICATOR, today)
    # Both values are required, so skip the second request (and its timeout)
    # when the first one already failed.
    sell = _fetch_bccr_rate(BCCR_SELL_INDICATOR, today) if buy is not None else None

    if buy is None or sell is None:
        # Fallback: return most recent DB rate regardless of age
        return session.exec(
            select(ExchangeRate).order_by(col(ExchangeRate.fetched_at).desc())
        ).first()

    rate = ExchangeRate(
        date=_utcnow(),
        buy_rate=buy,
        sell_rate=sell,
    )
    session.add(rate)
    session.commit()
    session.refresh(rate)

    _cache["current"] = (rate, _utcnow())
    return rate


def get_rate_history(session: Session, days: int = 30) -> list[ExchangeRate]:
    """Get historical exchange rates.

    Args:
        session: Database session used for the query.
        days: Size of the look-back window, in days, counted from now (UTC).

    Returns:
        Rates whose ``date`` is later than the cutoff, newest first.
    """
    cutoff = _utcnow() - timedelta(days=days)
    return list(
        session.exec(
            select(ExchangeRate)
            .where(ExchangeRate.date > cutoff)
            .order_by(col(ExchangeRate.date).desc())
        ).all()
    )