Convert all currencies to CRC and poll rates every 6h
All checks were successful
Deploy to VPS / deploy (push) Successful in 14s

Budget/transactions/salarios totals summed Transaction.amount directly,
so USD/EUR entries were treated as CRC and effectively disappeared from
the dashboard (the analytics fix in 9a80f2a only covered analytics).
Adds a shared get_converted_amount_expr() helper driven by the full
Currency enum — USD/EUR via ExchangeRate-API, BTC/XMR via CoinGecko —
and wires it into every func.sum(Transaction.amount) site.

Also starts a background task in the FastAPI lifespan that force-refreshes
every currency 4x/day, persisting USD to the DB and updating in-memory
caches for the rest. Failures are swallowed per-currency so a CoinGecko
outage cannot take out USD/EUR, and the last-known rate is always retained.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Carlos Escalante
2026-04-15 17:16:20 -06:00
parent 9a80f2a997
commit 94a8a894a6
6 changed files with 222 additions and 75 deletions

View File

@@ -1,12 +1,21 @@
import asyncio
import logging
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
import httpx
from sqlalchemy import case
from sqlmodel import Session, col, select
from app.config import settings
from app.db import engine
from app.models.models import ExchangeRate
logger = logging.getLogger(__name__)
# Scheduled refresh interval — 4x/day
REFRESH_INTERVAL_SECONDS = 6 * 3600
# BCCR indicators: 317 = buy, 318 = sell
BCCR_URL = "https://gee.bccr.fi.cr/Indicadores/Suscripciones/WS/wsindicadoreseconomicos.asmx/ObtenerIndicadoresEconomicos"
@@ -23,9 +32,13 @@ _cache: dict[str, tuple[ExchangeRate, datetime]] = {}
_last_known: ExchangeRate | None = None # survives cache expiry — always holds the last successful rate
CACHE_TTL = timedelta(hours=1)
# EUR/CRC mid-market rate cache
_eur_crc_cache: dict[str, tuple[float, datetime]] = {}
_last_known_eur_crc: float | None = None
# Generic X/CRC mid-market rate cache (by currency code)
_xcrc_cache: dict[str, tuple[float, datetime]] = {}
_last_known_xcrc: dict[str, float] = {}
# CoinGecko ids for supported crypto codes
_COINGECKO_IDS = {"BTC": "bitcoin", "XMR": "monero"}
COINGECKO_URL = "https://api.coingecko.com/api/v3/simple/price"
def _fetch_bccr_rate(indicator: int, date_str: str) -> float | None:
@@ -171,10 +184,10 @@ def get_current_rate(session: Session) -> ExchangeRate | None:
return None
def _fetch_eur_crc_mid() -> float | None:
"""Derive EUR/CRC mid-market rate from ExchangeRate-API (USD-based).
def _fetch_fiat_crc_mid(code: str) -> float | None:
"""Derive {code}/CRC mid-market rate from ExchangeRate-API (USD-based).
EUR/CRC = CRC_per_USD / EUR_per_USD
X/CRC = CRC_per_USD / X_per_USD
"""
try:
resp = httpx.get(EXCHANGERATE_API_URL, timeout=10)
@@ -182,32 +195,175 @@ def _fetch_eur_crc_mid() -> float | None:
data = resp.json()
if data.get("result") == "success":
crc = data["rates"].get("CRC")
eur = data["rates"].get("EUR")
if crc and eur:
return float(crc) / float(eur)
x = data["rates"].get(code)
if crc and x:
return float(crc) / float(x)
except Exception:
pass
return None
def get_eur_crc_rate() -> float | None:
"""Get current EUR→CRC mid-market rate (cached 1 hour)."""
global _last_known_eur_crc
def _fetch_crypto_crc(code: str) -> float | None:
"""Fetch {code}/CRC spot from CoinGecko."""
coin_id = _COINGECKO_IDS.get(code)
if not coin_id:
return None
try:
resp = httpx.get(
COINGECKO_URL,
params={"ids": coin_id, "vs_currencies": "crc"},
timeout=10,
)
resp.raise_for_status()
data = resp.json()
price = data.get(coin_id, {}).get("crc")
if price:
return float(price)
except Exception:
pass
return None
cached = _eur_crc_cache.get("current")
def get_crc_rate(code: str) -> float | None:
"""Get current {code}→CRC rate (cached 1 hour). Fiat via ExchangeRate-API, crypto via CoinGecko."""
if code == "CRC":
return 1.0
cached = _xcrc_cache.get(code)
if cached and datetime.utcnow() - cached[1] < CACHE_TTL:
return cached[0]
rate = _fetch_eur_crc_mid()
if code in _COINGECKO_IDS:
rate = _fetch_crypto_crc(code)
else:
rate = _fetch_fiat_crc_mid(code)
if rate is not None:
_eur_crc_cache["current"] = (rate, datetime.utcnow())
_last_known_eur_crc = rate
_xcrc_cache[code] = (rate, datetime.utcnow())
_last_known_xcrc[code] = rate
return rate
if _last_known_eur_crc:
return _last_known_eur_crc
return _last_known_xcrc.get(code)
return None
def get_crc_multipliers(session: Session) -> dict[str, float]:
"""Return {currency_code: CRC_multiplier} for every supported currency."""
from app.models.models import Currency
multipliers: dict[str, float] = {"CRC": 1.0}
usd_rate = get_current_rate(session)
if usd_rate:
multipliers["USD"] = usd_rate.sell_rate
for code in (c.value for c in Currency):
if code in multipliers:
continue
rate = get_crc_rate(code)
if rate is not None:
multipliers[code] = rate
return multipliers
def get_converted_amount_expr(session: Session):
"""Return a SQLAlchemy expression converting Transaction.amount to CRC.
Builds a CASE that multiplies by the per-currency CRC rate; CRC passes through.
Missing rates fall back to 1.0 (treat as CRC) rather than 0.0 so a transient
API outage does not silently zero out foreign-currency totals.
"""
from app.models.models import Transaction
multipliers = get_crc_multipliers(session)
whens = [
(Transaction.currency == code, Transaction.amount * mult)
for code, mult in multipliers.items()
if code != "CRC"
]
if not whens:
return Transaction.amount
return case(*whens, else_=Transaction.amount)
def _refresh_usd_rate() -> bool:
"""Force-fetch USD/CRC from APIs and persist to DB. Returns True on success."""
fetched = _fetch_rate_from_apis()
if fetched is None:
return False
buy, sell = fetched
with Session(engine) as session:
rate = ExchangeRate(date=datetime.utcnow(), buy_rate=buy, sell_rate=sell)
session.add(rate)
session.commit()
session.refresh(rate)
_remember(rate)
return True
def _refresh_other_rate(code: str) -> bool:
"""Force-fetch {code}/CRC and update in-memory cache. Returns True on success."""
if code in _COINGECKO_IDS:
rate = _fetch_crypto_crc(code)
else:
rate = _fetch_fiat_crc_mid(code)
if rate is None:
return False
_xcrc_cache[code] = (rate, datetime.utcnow())
_last_known_xcrc[code] = rate
return True
def refresh_all_rates() -> dict[str, bool]:
"""Force-refresh every supported currency.
Each currency is refreshed independently — one failure does not affect others.
On success the DB (for USD) and in-memory caches are updated. On failure the
previous value is retained via `_last_known_*` / stale-DB fallback, so callers
always see the most recent working rate.
"""
from app.models.models import Currency
results: dict[str, bool] = {}
try:
results["USD"] = _refresh_usd_rate()
except Exception:
logger.exception("USD rate refresh failed")
results["USD"] = False
for currency in Currency:
code = currency.value
if code in ("CRC", "USD"):
continue
try:
results[code] = _refresh_other_rate(code)
except Exception:
logger.exception("%s rate refresh failed", code)
results[code] = False
return results
async def refresh_rates_periodically(
interval_seconds: int = REFRESH_INTERVAL_SECONDS,
) -> None:
"""Background loop that refreshes all currency rates every `interval_seconds`.
Never raises — failures are logged and the last-known rates are retained.
Runs one refresh immediately on startup, then sleeps on the fixed interval.
"""
while True:
try:
report = await asyncio.to_thread(refresh_all_rates)
ok = sorted(k for k, v in report.items() if v)
failed = sorted(k for k, v in report.items() if not v)
logger.info(
"Exchange rate refresh complete: ok=%s failed=%s", ok, failed
)
except Exception:
logger.exception("Exchange rate refresh loop crashed")
await asyncio.sleep(interval_seconds)
def get_rate_history(session: Session, days: int = 30) -> list[ExchangeRate]: