mirror of
https://github.com/escalante29/WealthySmart.git
synced 2026-05-19 11:28:49 +02:00
All checks were successful
Deploy to VPS / deploy (push) Successful in 14s
Budget/transactions/salarios totals summed Transaction.amount directly,
so USD/EUR entries were treated as CRC and effectively disappeared from
the dashboard (the analytics fix in 9a80f2a only covered analytics).
Adds a shared get_converted_amount_expr() helper driven by the full
Currency enum — USD/EUR via ExchangeRate-API, BTC/XMR via CoinGecko —
and wires it into every func.sum(Transaction.amount) site.
Also starts a background task in the FastAPI lifespan that force-refreshes
every currency 4x/day, persisting USD to the DB and updating in-memory
caches for the rest. Failures are swallowed per-currency so a CoinGecko
outage cannot take out USD/EUR, and the last-known rate is always retained.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
379 lines
12 KiB
Python
379 lines
12 KiB
Python
import asyncio
|
|
import logging
|
|
import xml.etree.ElementTree as ET
|
|
from datetime import datetime, timedelta
|
|
|
|
import httpx
|
|
from sqlalchemy import case
|
|
from sqlmodel import Session, col, select
|
|
|
|
from app.config import settings
|
|
from app.db import engine
|
|
from app.models.models import ExchangeRate
|
|
|
|
logger = logging.getLogger(__name__)

# Interval between scheduled background refreshes: every 6 hours (4x/day).
REFRESH_INTERVAL_SECONDS = 6 * 60 * 60

# BCCR web-service endpoint. Indicator 317 is the buy rate, 318 the sell rate.
BCCR_URL = "https://gee.bccr.fi.cr/Indicadores/Suscripciones/WS/wsindicadoreseconomicos.asmx/ObtenerIndicadoresEconomicos"

# Fallback sources. None needs an API key and all of them quote CRC.
EXCHANGERATE_API_URL = "https://open.er-api.com/v6/latest/USD"
CURRENCY_API_URL = "https://cdn.jsdelivr.net/npm/@fawazahmed0/currency-api@latest/v1/currencies/usd.json"
CURRENCY_API_FALLBACK_URL = "https://latest.currency-api.pages.dev/v1/currencies/usd.json"
FLOATRATES_URL = "https://www.floatrates.com/daily/usd.json"

# Approximate USD/CRC buy/sell spread: ~0.5% on each side of the mid-market rate.
_SPREAD = 0.005

# USD/CRC TTL cache: key -> (rate, time it was stored).
_cache: dict[str, tuple[ExchangeRate, datetime]] = {}
# Last successfully obtained USD/CRC rate; outlives the TTL cache.
_last_known: ExchangeRate | None = None
# Maximum age for which a cached or persisted rate counts as fresh.
CACHE_TTL = timedelta(hours=1)

# Generic X/CRC mid-market rates keyed by currency code: code -> (rate, stored at).
_xcrc_cache: dict[str, tuple[float, datetime]] = {}
# Last successfully fetched X/CRC rate per currency code.
_last_known_xcrc: dict[str, float] = {}

# CoinGecko coin ids for the supported crypto currency codes.
_COINGECKO_IDS = {"BTC": "bitcoin", "XMR": "monero"}
COINGECKO_URL = "https://api.coingecko.com/api/v3/simple/price"
|
|
|
|
|
|
def _fetch_bccr_rate(indicator: int, date_str: str) -> float | None:
    """Fetch a single indicator value from the BCCR API.

    Args:
        indicator: BCCR indicator code (317 = buy, 318 = sell).
        date_str: Date string sent as both ``FechaInicio`` and ``FechaFinal``;
            the caller in this module formats it as ``dd/mm/YYYY``.

    Returns:
        The first non-empty ``NUM_VALOR`` value in the XML response, as a
        float, or ``None`` when the request fails, the response cannot be
        parsed, or no usable value is present. Never raises.
    """
    try:
        params = {
            "Indicador": str(indicator),
            "FechaInicio": date_str,
            "FechaFinal": date_str,
            "Nombre": settings.BCCR_API_EMAIL or "WealthySmart",
            "SubNiveles": "N",
            "CorreoElectronico": settings.BCCR_API_EMAIL or "no-reply@example.com",
            "Token": settings.BCCR_API_TOKEN or "",
        }
        resp = httpx.get(BCCR_URL, params=params, timeout=10)
        resp.raise_for_status()

        root = ET.fromstring(resp.text)
        for node in root.iter():
            # Tags may carry an XML namespace prefix, so match on the suffix.
            if not node.tag.endswith("NUM_VALOR"):
                continue
            text = (node.text or "").strip()
            if text:
                # Accept a decimal comma as well as a decimal point.
                return float(text.replace(",", "."))
    except Exception as exc:
        # Best-effort source: the caller falls back to other providers, so
        # report the failure instead of hiding it, but do not propagate it.
        logger.warning(
            "BCCR indicator %s fetch failed for %s: %r", indicator, date_str, exc
        )
    return None
|
|
|
|
|
|
def _fetch_bccr() -> tuple[float, float] | None:
    """Try the official BCCR API for today's USD/CRC rates.

    Returns:
        ``(buy, sell)`` when both indicator 317 (buy) and 318 (sell) are
        available, otherwise ``None``.
    """
    # Local import: the module-level import only brings in datetime/timedelta.
    from datetime import timezone

    # Use a fixed UTC-6 calendar date (Costa Rica, no DST) instead of the
    # server's local date, so the request does not depend on where the app
    # is deployed.
    # NOTE(review): assumes BCCR keys its indicators by Costa Rican local
    # date; confirm against the BCCR web-service documentation.
    costa_rica = timezone(timedelta(hours=-6))
    today = datetime.now(costa_rica).strftime("%d/%m/%Y")

    buy = _fetch_bccr_rate(317, today)
    if buy is None:
        # A pair is required; skip the second blocking request.
        return None

    sell = _fetch_bccr_rate(318, today)
    if sell is None:
        return None

    return (buy, sell)
|
|
|
|
|
|
def _mid_to_buy_sell(mid: float) -> tuple[float, float]:
    """Approximate (buy, sell) rates by applying ``_SPREAD`` below and above ``mid``."""
    buy = mid * (1 - _SPREAD)
    sell = mid * (1 + _SPREAD)
    return (buy, sell)
|
|
|
|
|
|
def _fetch_exchangerate_api() -> tuple[float, float] | None:
    """Try ExchangeRate-API (open.er-api.com). No key required.

    Returns:
        Approximate ``(buy, sell)`` derived from the ``CRC`` entry of the
        USD-based rate table, or ``None`` when the request fails, the payload
        does not report ``result == "success"``, or no CRC rate is present.
        Never raises.
    """
    try:
        resp = httpx.get(EXCHANGERATE_API_URL, timeout=10)
        resp.raise_for_status()
        data = resp.json()
        if data.get("result") == "success":
            crc = data["rates"].get("CRC")
            if crc:
                return _mid_to_buy_sell(float(crc))
    except Exception as exc:
        # Best-effort source: log and let the caller try the next provider.
        logger.warning("ExchangeRate-API USD/CRC fetch failed: %r", exc)
    return None
|
|
|
|
|
|
def _fetch_currency_api() -> tuple[float, float] | None:
    """Try fawazahmed0/currency-api (CDN-hosted). No key required.

    The primary CDN URL is tried first, then the fallback mirror. A URL that
    errors or yields no CRC rate is skipped in favour of the next one.

    Returns:
        Approximate ``(buy, sell)`` derived from ``usd.crc`` in the payload,
        or ``None`` when neither URL produces a usable rate. Never raises.
    """
    for url in (CURRENCY_API_URL, CURRENCY_API_FALLBACK_URL):
        try:
            resp = httpx.get(url, timeout=10)
            resp.raise_for_status()
            data = resp.json()
            crc = data.get("usd", {}).get("crc")
            if crc:
                return _mid_to_buy_sell(float(crc))
        except Exception as exc:
            # Best-effort: record which mirror failed, then try the next one.
            logger.warning("currency-api fetch failed for %s: %r", url, exc)
            continue
    return None
|
|
|
|
|
|
def _fetch_floatrates() -> tuple[float, float] | None:
    """Try FloatRates. No key required.

    Returns:
        Approximate ``(buy, sell)`` derived from ``crc.rate`` in the payload,
        or ``None`` when the request fails or the entry is missing.
        Never raises.
    """
    try:
        resp = httpx.get(FLOATRATES_URL, timeout=10)
        resp.raise_for_status()
        data = resp.json()
        crc_data = data.get("crc")
        if crc_data and "rate" in crc_data:
            return _mid_to_buy_sell(float(crc_data["rate"]))
    except Exception as exc:
        # Best-effort source: log and report "no rate" to the caller.
        logger.warning("FloatRates USD/CRC fetch failed: %r", exc)
    return None
|
|
|
|
|
|
def _fetch_rate_from_apis() -> tuple[float, float] | None:
    """Query each USD/CRC source in priority order and return the first hit.

    Order: BCCR, ExchangeRate-API, currency-api, FloatRates. Returns the
    ``(buy, sell)`` pair from the first source that yields one, or ``None``
    when every source comes back empty.
    """
    sources = (
        _fetch_bccr,
        _fetch_exchangerate_api,
        _fetch_currency_api,
        _fetch_floatrates,
    )
    for source in sources:
        if (quote := source()) is not None:
            return quote
    return None
|
|
|
|
|
|
def _remember(rate: ExchangeRate) -> ExchangeRate:
    """Record ``rate`` as the last-known rate and as the fresh TTL-cache entry.

    Returns the same ``rate`` so callers can write ``return _remember(rate)``.
    """
    global _last_known
    _last_known = rate
    # Timestamp the cache entry with the moment it was stored.
    _cache["current"] = (rate, datetime.utcnow())
    return rate
|
|
|
|
|
|
def get_current_rate(session: Session) -> ExchangeRate | None:
    """Return the current USD/CRC rate, falling back to progressively staler sources.

    Lookup order:
      1. in-memory cache entry younger than ``CACHE_TTL``;
      2. newest DB row whose ``fetched_at`` is within ``CACHE_TTL``;
      3. the live API sources (a new row is persisted on success);
      4. newest DB row of any age;
      5. the last rate remembered in memory.

    Returns ``None`` only when all five come up empty.
    """
    # 1. Fresh in-memory entry.
    hit = _cache.get("current")
    if hit:
        cached_rate, cached_at = hit
        if datetime.utcnow() - cached_at < CACHE_TTL:
            return cached_rate

    # 2. Fresh row in the database.
    freshness_cutoff = datetime.utcnow() - CACHE_TTL
    recent_stmt = (
        select(ExchangeRate)
        .where(ExchangeRate.fetched_at > freshness_cutoff)
        .order_by(col(ExchangeRate.fetched_at).desc())
    )
    recent = session.exec(recent_stmt).first()
    if recent:
        return _remember(recent)

    # 3. Live fetch; persist the result so later calls can reuse it.
    fetched = _fetch_rate_from_apis()
    if fetched is not None:
        buy, sell = fetched
        fresh = ExchangeRate(date=datetime.utcnow(), buy_rate=buy, sell_rate=sell)
        session.add(fresh)
        session.commit()
        session.refresh(fresh)
        return _remember(fresh)

    # 4. Any row at all, however old.
    newest_stmt = select(ExchangeRate).order_by(col(ExchangeRate.fetched_at).desc())
    stale = session.exec(newest_stmt).first()
    if stale:
        return _remember(stale)

    # 5. Whatever was last remembered in memory, if anything.
    return _last_known if _last_known else None
|
|
|
|
|
|
def _fetch_fiat_crc_mid(code: str) -> float | None:
    """Derive the {code}/CRC mid-market rate from ExchangeRate-API (USD-based).

    X/CRC = CRC_per_USD / X_per_USD

    Args:
        code: Currency code to look up in the USD-based ``rates`` table.

    Returns:
        The derived rate, or ``None`` when the request fails, the payload
        does not report success, or either rate is missing or zero.
        Never raises.
    """
    try:
        resp = httpx.get(EXCHANGERATE_API_URL, timeout=10)
        resp.raise_for_status()
        data = resp.json()
        if data.get("result") == "success":
            crc = data["rates"].get("CRC")
            x = data["rates"].get(code)
            # The truthiness check also rules out a zero divisor.
            if crc and x:
                return float(crc) / float(x)
    except Exception as exc:
        # Best-effort: callers keep the last-known rate, so log and move on.
        logger.warning("ExchangeRate-API %s/CRC fetch failed: %r", code, exc)
    return None
|
|
|
|
|
|
def _fetch_crypto_crc(code: str) -> float | None:
    """Fetch the {code}/CRC spot price from CoinGecko.

    Args:
        code: Crypto currency code; must be a key of ``_COINGECKO_IDS``.

    Returns:
        The CRC price as a float, or ``None`` when the code is unsupported,
        the request fails, or the payload has no price. Never raises.
    """
    coin_id = _COINGECKO_IDS.get(code)
    if not coin_id:
        return None
    try:
        resp = httpx.get(
            COINGECKO_URL,
            params={"ids": coin_id, "vs_currencies": "crc"},
            timeout=10,
        )
        resp.raise_for_status()
        data = resp.json()
        price = data.get(coin_id, {}).get("crc")
        if price:
            return float(price)
    except Exception as exc:
        # Best-effort: callers keep the last-known rate, so log and move on.
        logger.warning("CoinGecko %s/CRC fetch failed: %r", code, exc)
    return None
|
|
|
|
|
|
def get_crc_rate(code: str) -> float | None:
    """Return the {code}→CRC rate, served from a ``CACHE_TTL`` cache when fresh.

    Codes listed in ``_COINGECKO_IDS`` are fetched from CoinGecko; all others
    go through ExchangeRate-API. When a fetch fails, the last successfully
    fetched rate for that code is returned, or ``None`` if there is none.
    """
    if code == "CRC":
        return 1.0

    entry = _xcrc_cache.get(code)
    if entry:
        cached_rate, cached_at = entry
        if datetime.utcnow() - cached_at < CACHE_TTL:
            return cached_rate

    fetch = _fetch_crypto_crc if code in _COINGECKO_IDS else _fetch_fiat_crc_mid
    rate = fetch(code)
    if rate is None:
        # Fetch failed: fall back to the last value seen for this code.
        return _last_known_xcrc.get(code)

    _xcrc_cache[code] = (rate, datetime.utcnow())
    _last_known_xcrc[code] = rate
    return rate
|
|
|
|
|
|
def get_crc_multipliers(session: Session) -> dict[str, float]:
    """Build a ``{currency_code: CRC_multiplier}`` map over the ``Currency`` enum.

    CRC maps to 1.0 and USD uses the sell rate from ``get_current_rate``.
    Every remaining code is resolved through ``get_crc_rate``. Codes for
    which no rate is available are left out of the result.
    """
    from app.models.models import Currency

    multipliers: dict[str, float] = {"CRC": 1.0}

    usd = get_current_rate(session)
    if usd:
        multipliers["USD"] = usd.sell_rate

    for member in Currency:
        code = member.value
        # Only look up codes not already filled in above.
        if code not in multipliers and (rate := get_crc_rate(code)) is not None:
            multipliers[code] = rate

    return multipliers
|
|
|
|
|
|
def get_converted_amount_expr(session: Session):
    """Build a SQLAlchemy expression that yields ``Transaction.amount`` in CRC.

    The expression is a CASE with one branch per non-CRC currency that has a
    known rate, multiplying the amount by that rate. Rows in CRC, and rows
    whose currency has no rate, fall through to the ELSE branch and keep
    their raw amount (i.e. a multiplier of 1.0 rather than 0.0), so a
    transient API outage does not zero out foreign-currency totals.
    """
    from app.models.models import Transaction

    branches = []
    for code, mult in get_crc_multipliers(session).items():
        if code == "CRC":
            continue
        branches.append((Transaction.currency == code, Transaction.amount * mult))

    if branches:
        return case(*branches, else_=Transaction.amount)
    # No foreign rates known: the plain column is already the answer.
    return Transaction.amount
|
|
|
|
|
|
def _refresh_usd_rate() -> bool:
    """Fetch USD/CRC from the API sources, bypassing caches, and store it in the DB.

    Returns ``True`` when a rate was fetched and persisted, ``False`` when no
    source produced one.
    """
    quote = _fetch_rate_from_apis()
    if quote is None:
        return False

    buy_rate, sell_rate = quote
    with Session(engine) as db:
        row = ExchangeRate(
            date=datetime.utcnow(), buy_rate=buy_rate, sell_rate=sell_rate
        )
        db.add(row)
        db.commit()
        # Reload the committed row before caching it.
        db.refresh(row)
        _remember(row)
    return True
|
|
|
|
|
|
def _refresh_other_rate(code: str) -> bool:
    """Fetch {code}/CRC, bypassing the cache, and store it in the in-memory caches.

    Returns ``True`` when a rate was fetched, ``False`` otherwise (the caches
    are left untouched in that case).
    """
    fetch = _fetch_crypto_crc if code in _COINGECKO_IDS else _fetch_fiat_crc_mid
    fresh = fetch(code)
    if fresh is None:
        return False

    _xcrc_cache[code] = (fresh, datetime.utcnow())
    _last_known_xcrc[code] = fresh
    return True
|
|
|
|
|
|
def refresh_all_rates() -> dict[str, bool]:
    """Force-refresh the rate of every member of the ``Currency`` enum except CRC.

    Each currency is attempted on its own, so an exception in one is logged
    and recorded as a failure without affecting the others. USD goes through
    ``_refresh_usd_rate`` (which writes to the DB); every other code goes
    through ``_refresh_other_rate`` (in-memory caches only). Failed refreshes
    do not overwrite previously stored values.

    Returns:
        ``{currency_code: succeeded}`` for USD and every other non-CRC code.
    """
    from app.models.models import Currency

    outcome: dict[str, bool] = {}

    try:
        usd_ok = _refresh_usd_rate()
    except Exception:
        logger.exception("USD rate refresh failed")
        usd_ok = False
    outcome["USD"] = usd_ok

    # CRC needs no rate and USD was handled above.
    remaining = [c.value for c in Currency if c.value not in ("CRC", "USD")]
    for code in remaining:
        try:
            outcome[code] = _refresh_other_rate(code)
        except Exception:
            logger.exception("%s rate refresh failed", code)
            outcome[code] = False

    return outcome
|
|
|
|
|
|
async def refresh_rates_periodically(
    interval_seconds: int = REFRESH_INTERVAL_SECONDS,
) -> None:
    """Run ``refresh_all_rates`` in a worker thread, then sleep, forever.

    The first refresh happens as soon as the coroutine starts; afterwards the
    loop sleeps ``interval_seconds`` between runs. Any ``Exception`` raised
    during a cycle is logged and the loop carries on.
    """
    while True:
        try:
            # refresh_all_rates does blocking HTTP/DB work; keep it off the event loop.
            report = await asyncio.to_thread(refresh_all_rates)
            by_outcome: dict[bool, list[str]] = {True: [], False: []}
            for code, succeeded in report.items():
                by_outcome[bool(succeeded)].append(code)
            logger.info(
                "Exchange rate refresh complete: ok=%s failed=%s",
                sorted(by_outcome[True]),
                sorted(by_outcome[False]),
            )
        except Exception:
            logger.exception("Exchange rate refresh loop crashed")
        await asyncio.sleep(interval_seconds)
|
|
|
|
|
|
def get_rate_history(session: Session, days: int = 30) -> list[ExchangeRate]:
    """Return the exchange rates dated within the last ``days`` days, newest first."""
    since = datetime.utcnow() - timedelta(days=days)
    stmt = (
        select(ExchangeRate)
        .where(ExchangeRate.date > since)
        .order_by(col(ExchangeRate.date).desc())
    )
    rows = session.exec(stmt).all()
    return list(rows)
|