Files
WealthySmart/backend/app/services/exchange_rate.py
Carlos Escalante 94a8a894a6
All checks were successful
Deploy to VPS / deploy (push) Successful in 14s
Convert all currencies to CRC and poll rates every 6h
Budget/transactions/salarios totals summed Transaction.amount directly,
so USD/EUR entries were treated as CRC and effectively disappeared from
the dashboard (the analytics fix in 9a80f2a only covered analytics).
Adds a shared get_converted_amount_expr() helper driven by the full
Currency enum — USD/EUR via ExchangeRate-API, BTC/XMR via CoinGecko —
and wires it into every func.sum(Transaction.amount) site.

Also starts a background task in the FastAPI lifespan that force-refreshes
every currency 4x/day, persisting USD to the DB and updating in-memory
caches for the rest. Failures are swallowed per-currency so a CoinGecko
outage cannot take out USD/EUR, and the last-known rate is always retained.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 17:16:20 -06:00

379 lines
12 KiB
Python

import asyncio
import logging
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
import httpx
from sqlalchemy import case
from sqlmodel import Session, col, select
from app.config import settings
from app.db import engine
from app.models.models import ExchangeRate
# Module-level logger; used by the refresh functions in this module.
logger = logging.getLogger(__name__)

# Scheduled refresh interval — 4x/day
REFRESH_INTERVAL_SECONDS = 6 * 3600

# BCCR indicators: 317 = buy, 318 = sell
BCCR_URL = "https://gee.bccr.fi.cr/Indicadores/Suscripciones/WS/wsindicadoreseconomicos.asmx/ObtenerIndicadoresEconomicos"

# Fallback APIs (no API key required, all support CRC)
EXCHANGERATE_API_URL = "https://open.er-api.com/v6/latest/USD"
CURRENCY_API_URL = "https://cdn.jsdelivr.net/npm/@fawazahmed0/currency-api@latest/v1/currencies/usd.json"
CURRENCY_API_FALLBACK_URL = "https://latest.currency-api.pages.dev/v1/currencies/usd.json"
FLOATRATES_URL = "https://www.floatrates.com/daily/usd.json"

# Typical buy/sell spread for USD/CRC (~0.5% each side of mid-market).
# Applied by `_mid_to_buy_sell` to sources that only publish a mid-market rate.
_SPREAD = 0.005

# TTL cache for the USD/CRC rate. `_remember` stores the rate under the
# "current" key together with the UTC time at which it was cached.
_cache: dict[str, tuple[ExchangeRate, datetime]] = {}
# Survives cache expiry — holds the most recent rate passed to `_remember`.
_last_known: ExchangeRate | None = None
# Maximum age at which an entry in `_cache` / `_xcrc_cache` still counts as fresh.
CACHE_TTL = timedelta(hours=1)

# Generic X/CRC mid-market rate cache (by currency code); values are
# (rate, time cached).
_xcrc_cache: dict[str, tuple[float, datetime]] = {}
# Last successfully fetched X/CRC rate per currency code; has no expiry.
_last_known_xcrc: dict[str, float] = {}

# CoinGecko ids for supported crypto codes
_COINGECKO_IDS = {"BTC": "bitcoin", "XMR": "monero"}
COINGECKO_URL = "https://api.coingecko.com/api/v3/simple/price"
def _fetch_bccr_rate(indicator: int, date_str: str) -> float | None:
    """Fetch a single economic indicator value from the BCCR web service.

    Args:
        indicator: BCCR indicator code (317 = USD buy, 318 = USD sell).
        date_str: Date string sent as both ``FechaInicio`` and ``FechaFinal``
            (the caller in this module formats it as ``dd/mm/YYYY``).

    Returns:
        The first non-empty ``NUM_VALOR`` value in the XML response as a
        float, or ``None`` when the request fails, the response cannot be
        parsed, or no value is present. Never raises: this is a best-effort
        source and callers fall back to other providers on ``None``.
    """
    try:
        params = {
            "Indicador": str(indicator),
            "FechaInicio": date_str,
            "FechaFinal": date_str,
            "Nombre": settings.BCCR_API_EMAIL or "WealthySmart",
            "SubNiveles": "N",
            "CorreoElectronico": settings.BCCR_API_EMAIL or "no-reply@example.com",
            "Token": settings.BCCR_API_TOKEN or "",
        }
        resp = httpx.get(BCCR_URL, params=params, timeout=10)
        resp.raise_for_status()
        root = ET.fromstring(resp.text)
        for node in root.iter():
            # Tags may be namespace-qualified, so match on the suffix only.
            if not node.tag.endswith("NUM_VALOR"):
                continue
            text = (node.text or "").strip()
            if not text:
                # Empty element: keep scanning instead of failing the parse.
                continue
            # Accept a decimal comma as well as a decimal point.
            return float(text.replace(",", "."))
    except Exception as exc:
        # Log only the exception type: httpx status-error messages embed the
        # request URL, and here the query string carries the API token.
        logger.warning(
            "BCCR fetch failed for indicator %s (%s)",
            indicator,
            type(exc).__name__,
        )
    return None
def _fetch_bccr() -> tuple[float, float] | None:
    """Query the BCCR official API for today's USD buy and sell rates.

    Returns:
        ``(buy, sell)`` when both indicators (317 and 318) were retrieved,
        otherwise ``None``.
    """
    # The query date comes from the process's local clock.
    # NOTE(review): presumably BCCR expects Costa Rica local dates — confirm
    # the server timezone matches.
    query_date = datetime.now().strftime("%d/%m/%Y")
    buy_rate = _fetch_bccr_rate(317, query_date)
    sell_rate = _fetch_bccr_rate(318, query_date)
    if buy_rate is None or sell_rate is None:
        return None
    return (buy_rate, sell_rate)
def _mid_to_buy_sell(mid: float) -> tuple[float, float]:
    """Approximate a ``(buy, sell)`` pair from a single mid-market rate.

    The buy rate is ``mid`` reduced by ``_SPREAD`` and the sell rate is
    ``mid`` increased by ``_SPREAD`` (both as fractions of ``mid``).
    """
    buy = mid * (1 - _SPREAD)
    sell = mid * (1 + _SPREAD)
    return (buy, sell)
def _fetch_exchangerate_api() -> tuple[float, float] | None:
    """Try ExchangeRate-API (open.er-api.com) for USD/CRC. No key required.

    Returns:
        An approximate ``(buy, sell)`` pair derived from the published
        mid-market CRC-per-USD rate, or ``None`` when the request fails, the
        payload does not report ``result == "success"``, or no CRC rate is
        present. Never raises; failures are logged so an outage is visible.
    """
    try:
        resp = httpx.get(EXCHANGERATE_API_URL, timeout=10)
        resp.raise_for_status()
        data = resp.json()
        if data.get("result") == "success":
            crc = data["rates"].get("CRC")
            # A missing or zero rate is treated as "no data".
            if crc:
                return _mid_to_buy_sell(float(crc))
    except Exception as exc:
        # Best-effort source: record why it failed, then let the caller
        # fall through to the next provider.
        logger.warning("ExchangeRate-API USD/CRC fetch failed: %s", exc)
    return None
def _fetch_currency_api() -> tuple[float, float] | None:
    """Try fawazahmed0/currency-api (CDN-hosted) for USD/CRC. No key required.

    The primary CDN URL is tried first, then the fallback mirror.

    Returns:
        An approximate ``(buy, sell)`` pair derived from the first URL that
        yields a non-zero ``usd.crc`` value, or ``None`` if neither does.
        Never raises; each failed URL is logged and the next one is tried.
    """
    for url in (CURRENCY_API_URL, CURRENCY_API_FALLBACK_URL):
        try:
            resp = httpx.get(url, timeout=10)
            resp.raise_for_status()
            data = resp.json()
            crc = data.get("usd", {}).get("crc")
            # A missing or zero rate is treated as "no data"; try the next URL.
            if crc:
                return _mid_to_buy_sell(float(crc))
        except Exception as exc:
            # Best-effort source: record which mirror failed and move on.
            logger.warning("currency-api USD/CRC fetch failed for %s: %s", url, exc)
    return None
def _fetch_floatrates() -> tuple[float, float] | None:
    """Try FloatRates for USD/CRC. No key required.

    Returns:
        An approximate ``(buy, sell)`` pair derived from the ``crc.rate``
        field of the response, or ``None`` when the request fails or the
        field is absent. Never raises; failures are logged.
    """
    try:
        resp = httpx.get(FLOATRATES_URL, timeout=10)
        resp.raise_for_status()
        data = resp.json()
        crc_data = data.get("crc")
        if crc_data and "rate" in crc_data:
            return _mid_to_buy_sell(float(crc_data["rate"]))
    except Exception as exc:
        # Best-effort source: record why it failed instead of hiding it.
        logger.warning("FloatRates USD/CRC fetch failed: %s", exc)
    return None
def _fetch_rate_from_apis() -> tuple[float, float] | None:
    """Return ``(buy, sell)`` from the first source that answers.

    Sources are tried in priority order:
    BCCR → ExchangeRate-API → currency-api → FloatRates.
    Returns ``None`` when every source comes back empty.
    """
    sources = (
        _fetch_bccr,
        _fetch_exchangerate_api,
        _fetch_currency_api,
        _fetch_floatrates,
    )
    for source in sources:
        # Stop at the first provider that yields a rate; later ones are not called.
        if (rates := source()) is not None:
            return rates
    return None
def _remember(rate: ExchangeRate) -> ExchangeRate:
    """Record ``rate`` as the current USD/CRC rate and hand it back.

    Updates the non-expiring ``_last_known`` holder and stamps the rate into
    the TTL cache under the ``"current"`` key with the current UTC time.
    """
    global _last_known
    _last_known = rate
    _cache["current"] = (rate, datetime.utcnow())
    return rate
def get_current_rate(session: Session) -> ExchangeRate | None:
    """Resolve the current USD/CRC rate through a chain of fallbacks.

    Lookup order:
      1. in-memory cache entry younger than ``CACHE_TTL``;
      2. newest DB row whose ``fetched_at`` is within ``CACHE_TTL``;
      3. a fresh fetch from the external APIs (persisted to the DB);
      4. newest DB row of any age;
      5. the last rate remembered in memory.

    Returns ``None`` only when every step comes up empty.
    """
    # 1. Fresh memory cache
    entry = _cache.get("current")
    if entry and datetime.utcnow() - entry[1] < CACHE_TTL:
        return entry[0]

    newest_first = col(ExchangeRate.fetched_at).desc()

    # 2. Fresh DB rate
    freshness_cutoff = datetime.utcnow() - CACHE_TTL
    recent_query = (
        select(ExchangeRate)
        .where(ExchangeRate.fetched_at > freshness_cutoff)
        .order_by(newest_first)
    )
    recent = session.exec(recent_query).first()
    if recent:
        return _remember(recent)

    # 3. Try all API sources and persist the result
    fetched = _fetch_rate_from_apis()
    if fetched is not None:
        buy_rate, sell_rate = fetched
        new_rate = ExchangeRate(
            date=datetime.utcnow(), buy_rate=buy_rate, sell_rate=sell_rate
        )
        session.add(new_rate)
        session.commit()
        session.refresh(new_rate)
        return _remember(new_rate)

    # 4. Stale DB rate (any age)
    stale = session.exec(select(ExchangeRate).order_by(newest_first)).first()
    if stale:
        return _remember(stale)

    # 5. Last known in-memory rate (survives even if the DB is empty)
    return _last_known if _last_known else None
def _fetch_fiat_crc_mid(code: str) -> float | None:
    """Derive the {code}/CRC mid-market rate from ExchangeRate-API (USD-based).

    The API publishes rates per USD, so the cross rate is computed as
    ``X/CRC = CRC_per_USD / X_per_USD``.

    Args:
        code: Currency code to look up in the API's ``rates`` mapping.

    Returns:
        CRC per one unit of ``code``, or ``None`` when the request fails, the
        payload does not report success, or either rate is missing or zero.
        Never raises; failures are logged.
    """
    try:
        resp = httpx.get(EXCHANGERATE_API_URL, timeout=10)
        resp.raise_for_status()
        data = resp.json()
        if data.get("result") == "success":
            crc = data["rates"].get("CRC")
            x = data["rates"].get(code)
            # Both must be present and non-zero (also guards the division).
            if crc and x:
                return float(crc) / float(x)
    except Exception as exc:
        # Best-effort source: record why it failed; the caller keeps the
        # last-known rate.
        logger.warning("ExchangeRate-API %s/CRC fetch failed: %s", code, exc)
    return None
def _fetch_crypto_crc(code: str) -> float | None:
    """Fetch the {code}/CRC spot price from CoinGecko.

    Args:
        code: Crypto currency code; must be a key of ``_COINGECKO_IDS``.

    Returns:
        The CRC price of one unit of ``code``, or ``None`` when the code is
        not supported, the request fails, or no price is returned.
        Never raises; failures are logged.
    """
    coin_id = _COINGECKO_IDS.get(code)
    if not coin_id:
        return None
    try:
        resp = httpx.get(
            COINGECKO_URL,
            params={"ids": coin_id, "vs_currencies": "crc"},
            timeout=10,
        )
        resp.raise_for_status()
        data = resp.json()
        price = data.get(coin_id, {}).get("crc")
        # A missing or zero price is treated as "no data".
        if price:
            return float(price)
    except Exception as exc:
        # Best-effort source: record why it failed; the caller keeps the
        # last-known rate.
        logger.warning("CoinGecko %s/CRC fetch failed: %s", code, exc)
    return None
def get_crc_rate(code: str) -> float | None:
"""Get current {code}→CRC rate (cached 1 hour). Fiat via ExchangeRate-API, crypto via CoinGecko."""
if code == "CRC":
return 1.0
cached = _xcrc_cache.get(code)
if cached and datetime.utcnow() - cached[1] < CACHE_TTL:
return cached[0]
if code in _COINGECKO_IDS:
rate = _fetch_crypto_crc(code)
else:
rate = _fetch_fiat_crc_mid(code)
if rate is not None:
_xcrc_cache[code] = (rate, datetime.utcnow())
_last_known_xcrc[code] = rate
return rate
return _last_known_xcrc.get(code)
def get_crc_multipliers(session: Session) -> dict[str, float]:
    """Build a ``{currency_code: CRC_multiplier}`` map for the ``Currency`` enum.

    CRC is always ``1.0``. USD uses the sell rate from ``get_current_rate``
    when one is available. Every remaining enum member is resolved through
    ``get_crc_rate``; codes with no rate are omitted from the result.
    """
    from app.models.models import Currency

    multipliers: dict[str, float] = {"CRC": 1.0}

    usd = get_current_rate(session)
    if usd:
        multipliers["USD"] = usd.sell_rate

    for currency in Currency:
        code = currency.value
        if code not in multipliers:
            crc_per_unit = get_crc_rate(code)
            if crc_per_unit is not None:
                multipliers[code] = crc_per_unit
    return multipliers
def get_converted_amount_expr(session: Session):
    """Return a SQLAlchemy expression that yields ``Transaction.amount`` in CRC.

    The expression is a CASE with one branch per non-CRC currency that has a
    known multiplier, multiplying the amount by that currency's CRC rate.
    Rows in CRC, and rows whose currency has no rate, pass through unchanged
    (i.e. are treated as CRC) rather than becoming 0.0, so a transient API
    outage does not silently zero out foreign-currency totals.
    """
    from app.models.models import Transaction

    rates = get_crc_multipliers(session)
    branches = []
    for currency_code, factor in rates.items():
        if currency_code == "CRC":
            continue
        branches.append(
            (Transaction.currency == currency_code, Transaction.amount * factor)
        )

    if branches:
        return case(*branches, else_=Transaction.amount)
    # Nothing to convert: the bare column is already the CRC amount.
    return Transaction.amount
def _refresh_usd_rate() -> bool:
    """Force a USD/CRC fetch from the APIs and store the result.

    On success the rate is written to the DB in its own session and recorded
    in the in-memory caches via ``_remember``.

    Returns:
        ``True`` when a rate was fetched and stored, ``False`` when no API
        source produced one.
    """
    rates = _fetch_rate_from_apis()
    if rates is None:
        return False

    buy_rate, sell_rate = rates
    with Session(engine) as db:
        record = ExchangeRate(
            date=datetime.utcnow(), buy_rate=buy_rate, sell_rate=sell_rate
        )
        db.add(record)
        db.commit()
        # Reload the row's attributes after the commit, before caching it.
        db.refresh(record)
        _remember(record)
    return True
def _refresh_other_rate(code: str) -> bool:
    """Force a {code}/CRC fetch, bypassing the TTL cache, and cache the result.

    Returns:
        ``True`` when a rate was fetched and the in-memory caches were
        updated, ``False`` when the fetch produced nothing (caches are left
        untouched).
    """
    fetch = _fetch_crypto_crc if code in _COINGECKO_IDS else _fetch_fiat_crc_mid
    fresh = fetch(code)
    if fresh is not None:
        _xcrc_cache[code] = (fresh, datetime.utcnow())
        _last_known_xcrc[code] = fresh
        return True
    return False
def refresh_all_rates() -> dict[str, bool]:
    """Force-refresh every supported currency and report per-currency success.

    USD is refreshed first (persisted to the DB); every other non-CRC member
    of the ``Currency`` enum is then refreshed into the in-memory caches.
    Each refresh is isolated: an exception is logged and recorded as
    ``False`` without affecting the other currencies. A failed refresh does
    not touch previously stored values, so callers keep seeing the most
    recent working rate.

    Returns:
        Mapping of currency code to whether its refresh succeeded.
    """
    from app.models.models import Currency

    outcome: dict[str, bool] = {}

    try:
        usd_ok = _refresh_usd_rate()
    except Exception:
        logger.exception("USD rate refresh failed")
        usd_ok = False
    outcome["USD"] = usd_ok

    # CRC needs no rate and USD was handled above.
    pending = [c.value for c in Currency if c.value not in ("CRC", "USD")]
    for code in pending:
        try:
            refreshed = _refresh_other_rate(code)
        except Exception:
            logger.exception("%s rate refresh failed", code)
            refreshed = False
        outcome[code] = refreshed

    return outcome
async def refresh_rates_periodically(
    interval_seconds: int = REFRESH_INTERVAL_SECONDS,
) -> None:
    """Refresh all currency rates forever, pausing ``interval_seconds`` between runs.

    The first refresh runs immediately; each refresh executes in a worker
    thread via ``asyncio.to_thread``. Any ``Exception`` raised by a cycle is
    logged and the loop carries on, so previously stored rates stay in place.
    """
    while True:
        try:
            report = await asyncio.to_thread(refresh_all_rates)
            succeeded: list[str] = []
            failed: list[str] = []
            for code, was_ok in report.items():
                (succeeded if was_ok else failed).append(code)
            logger.info(
                "Exchange rate refresh complete: ok=%s failed=%s",
                sorted(succeeded),
                sorted(failed),
            )
        except Exception:
            logger.exception("Exchange rate refresh loop crashed")
        # Sleep outside the try so the interval applies after failures too.
        await asyncio.sleep(interval_seconds)
def get_rate_history(session: Session, days: int = 30) -> list[ExchangeRate]:
    """Return exchange rates dated within the last ``days`` days, newest first.

    Args:
        session: Open database session used to run the query.
        days: Size of the look-back window, measured back from the current
            UTC time.
    """
    oldest_allowed = datetime.utcnow() - timedelta(days=days)
    query = (
        select(ExchangeRate)
        .where(ExchangeRate.date > oldest_allowed)
        .order_by(col(ExchangeRate.date).desc())
    )
    rows = session.exec(query).all()
    return list(rows)