mirror of
https://github.com/escalante29/WealthySmart.git
synced 2026-05-19 09:28:47 +02:00
Add pension PDF upload, parsing, and fund summary API
All checks were successful
Deploy to VPS / deploy (push) Successful in 48s
All checks were successful
Deploy to VPS / deploy (push) Successful in 48s
Backend: parse BAC pension statement PDFs (VOL, ROP, FCL) via pdftotext, store snapshots with duplicate detection, reject credit card statements. Endpoints: POST /upload, GET /snapshots, GET /fund-summary. Frontend: wire up drag-and-drop upload, load real balances and rendimientos from API, show upload results with error/duplicate feedback. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
121
backend/app/api/v1/endpoints/pensions.py
Normal file
121
backend/app/api/v1/endpoints/pensions.py
Normal file
@@ -0,0 +1,121 @@
|
||||
from fastapi import APIRouter, Depends, UploadFile
|
||||
from pydantic import BaseModel
|
||||
from sqlmodel import Session, select
|
||||
|
||||
from app.auth import get_current_user
|
||||
from app.db import get_session
|
||||
from app.models.models import Bank, PensionSnapshot, PensionSnapshotRead
|
||||
from app.services.pension_pdf import parse_pension_pdf
|
||||
|
||||
# Router for the pension endpoints; every route declared on it is prefixed with /pensions.
router = APIRouter(prefix="/pensions", tags=["pensions"])
|
||||
|
||||
|
||||
class PensionUploadResult(BaseModel):
    """Response body of the upload endpoint, summarising one upload request."""

    # Number of snapshots newly stored by the request.
    imported: int
    # Number of parsed snapshots skipped because the same fund/period was already stored.
    duplicates: int
    # One message per file that could not be read or parsed.
    errors: list[str]
    # The rows created by the request.
    snapshots: list[PensionSnapshotRead]
|
||||
|
||||
|
||||
@router.post("/upload", response_model=PensionUploadResult)
async def upload_pension_pdfs(
    files: list[UploadFile],
    session: Session = Depends(get_session),
    _user: str = Depends(get_current_user),
):
    """Parse uploaded pension statement PDFs and store one snapshot per fund.

    Each file is read and parsed independently: a file that fails to parse is
    reported in ``errors`` and does not abort the rest of the batch. A parsed
    snapshot is skipped (and counted in ``duplicates``) when a snapshot for the
    same fund, period_start and period_end already exists in the database or
    was already queued earlier in this same request.

    Args:
        files: The uploaded PDF files.
        session: Database session used for duplicate lookups and inserts.
        _user: Authenticated user; only required so the route is protected.

    Returns:
        PensionUploadResult with the imported/duplicate counts, the per-file
        error messages and the newly created rows.
    """
    import asyncio

    duplicates = 0
    errors: list[str] = []
    created: list[PensionSnapshot] = []
    # Keys queued during this request. The unique constraint covers
    # (fund, period_start, period_end); tracking them here catches in-batch
    # duplicates even if the session does not autoflush before the lookup.
    pending_keys: set[tuple] = set()

    for file in files:
        filename = file.filename or "unknown.pdf"
        try:
            pdf_bytes = await file.read()
            # parse_pension_pdf runs pdftotext through a blocking subprocess
            # call; run it in a worker thread so the event loop stays free.
            fund_snapshots = await asyncio.to_thread(
                parse_pension_pdf, pdf_bytes, filename
            )
        except ValueError as e:
            # Parser rejections already carry a user-facing message.
            errors.append(str(e))
            continue
        except Exception as e:
            errors.append(f"{filename}: {e}")
            continue

        for snap in fund_snapshots:
            fund = Bank(snap.fund)
            key = (fund, snap.period_start, snap.period_end)
            if key in pending_keys:
                duplicates += 1
                continue

            existing = session.exec(
                select(PensionSnapshot).where(
                    PensionSnapshot.fund == fund,
                    PensionSnapshot.period_start == snap.period_start,
                    PensionSnapshot.period_end == snap.period_end,
                )
            ).first()
            if existing:
                duplicates += 1
                continue

            row = PensionSnapshot(
                fund=fund,
                contract_number=snap.contract_number,
                period_start=snap.period_start,
                period_end=snap.period_end,
                saldo_anterior=snap.saldo_anterior,
                aportes=snap.aportes,
                rendimientos=snap.rendimientos,
                retiros=snap.retiros,
                traslados=snap.traslados,
                comision=snap.comision,
                correccion=snap.correccion,
                bonificacion=snap.bonificacion,
                saldo_final=snap.saldo_final,
                source_filename=filename,
            )
            session.add(row)
            created.append(row)
            pending_keys.add(key)

    if created:
        session.commit()
        # Reload so generated columns (id, created_at) are populated.
        for row in created:
            session.refresh(row)

    return PensionUploadResult(
        imported=len(created),
        duplicates=duplicates,
        errors=errors,
        snapshots=[PensionSnapshotRead.model_validate(r) for r in created],
    )
|
||||
|
||||
|
||||
@router.get("/snapshots", response_model=list[PensionSnapshotRead])
def get_snapshots(
    session: Session = Depends(get_session),
    _user: str = Depends(get_current_user),
):
    """Return every stored pension snapshot, latest period_end first, then by fund."""
    newest_first = PensionSnapshot.period_end.desc()  # type: ignore[union-attr]
    statement = select(PensionSnapshot).order_by(newest_first, PensionSnapshot.fund)
    return session.exec(statement).all()
|
||||
|
||||
|
||||
@router.get("/fund-summary", response_model=list[PensionSnapshotRead])
def get_fund_summary(
    session: Session = Depends(get_session),
    _user: str = Depends(get_current_user),
):
    """Return one snapshot per fund: the one with the most recent period_end."""
    statement = select(PensionSnapshot).order_by(
        PensionSnapshot.period_end.desc(),  # type: ignore[union-attr]
    )

    # Rows arrive newest first, so the first row seen for a fund is its latest;
    # setdefault keeps that first row and ignores the older ones.
    latest_by_fund: dict[str, PensionSnapshot] = {}
    for snapshot in session.exec(statement).all():
        latest_by_fund.setdefault(snapshot.fund.value, snapshot)

    return list(latest_by_fund.values())
|
||||
@@ -9,6 +9,7 @@ from app.api.v1.endpoints import (
|
||||
exchange_rate,
|
||||
import_transactions,
|
||||
notifications,
|
||||
pensions,
|
||||
salarios,
|
||||
settings,
|
||||
tokens,
|
||||
@@ -28,3 +29,4 @@ api_router.include_router(settings.router)
|
||||
api_router.include_router(budget.router)
|
||||
api_router.include_router(notifications.router)
|
||||
api_router.include_router(salarios.router)
|
||||
api_router.include_router(pensions.router)
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import enum
|
||||
from datetime import datetime
|
||||
from datetime import date, datetime
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy import JSON, Column
|
||||
from sqlalchemy import JSON, Column, UniqueConstraint
|
||||
from sqlmodel import Field, Relationship, SQLModel
|
||||
|
||||
|
||||
@@ -300,3 +300,36 @@ class PushSubscription(SQLModel, table=True):
|
||||
class PushSubscriptionCreate(SQLModel):
    # Non-table schema carrying a push endpoint and its keys.
    # NOTE(review): presumably the request payload for creating a PushSubscription — confirm against the endpoint.
    endpoint: str
    keys: dict  # {"p256dh": "...", "auth": "..."}
|
||||
|
||||
|
||||
# --- Pension Snapshot ---
|
||||
|
||||
|
||||
class PensionSnapshotBase(SQLModel):
    """Fields shared by the pension snapshot table model and its read schema.

    One snapshot holds the summary figures of a single fund for one statement
    period.
    """

    fund: Bank
    contract_number: str
    # First and last day of the statement period.
    period_start: date
    period_end: date
    # Summary amounts for the period; field names mirror the Spanish labels.
    # NOTE(review): money is stored as float — presumably colones; confirm
    # that float rounding is acceptable here.
    saldo_anterior: float
    aportes: float
    rendimientos: float
    retiros: float
    traslados: float
    comision: float
    correccion: float
    bonificacion: float
    saldo_final: float
    # Name of the file the snapshot was imported from.
    source_filename: str
|
||||
|
||||
|
||||
class PensionSnapshot(PensionSnapshotBase, table=True):
    """Database table storing one row per fund and statement period."""

    # A fund may have at most one snapshot for a given period.
    __table_args__ = (
        UniqueConstraint("fund", "period_start", "period_end"),
    )
    id: Optional[int] = Field(default=None, primary_key=True)
    # Filled in at object creation with datetime.utcnow(), i.e. a naive UTC timestamp.
    created_at: datetime = Field(default_factory=datetime.utcnow)
|
||||
|
||||
|
||||
class PensionSnapshotRead(PensionSnapshotBase):
    """Read schema for a stored snapshot: the base fields plus id and created_at."""

    id: int
    created_at: datetime
|
||||
|
||||
225
backend/app/services/pension_pdf.py
Normal file
225
backend/app/services/pension_pdf.py
Normal file
@@ -0,0 +1,225 @@
|
||||
"""Parse BAC San José Pensiones PDF statements into structured fund snapshots."""
|
||||
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
from dataclasses import dataclass
|
||||
from datetime import date
|
||||
|
||||
|
||||
@dataclass
class FundSnapshot:
    """Summary figures of one fund for one statement period, as parsed from a PDF."""

    fund: str  # "ROP", "FCL", or "VOL"
    contract_number: str
    # First and last day of the statement period.
    period_start: date
    period_end: date
    # Amounts taken from the statement summary; names mirror the Spanish labels.
    saldo_anterior: float
    aportes: float
    rendimientos: float
    retiros: float
    traslados: float
    comision: float
    correccion: float
    bonificacion: float
    saldo_final: float
|
||||
|
||||
|
||||
def _find_pdftotext() -> str:
|
||||
"""Find pdftotext binary, checking common install paths."""
|
||||
import os
|
||||
|
||||
cmd = shutil.which("pdftotext")
|
||||
if cmd:
|
||||
return cmd
|
||||
for path in ["/opt/homebrew/bin/pdftotext", "/usr/bin/pdftotext", "/usr/local/bin/pdftotext"]:
|
||||
if os.path.isfile(path):
|
||||
return path
|
||||
raise FileNotFoundError("pdftotext not found — install poppler-utils")
|
||||
|
||||
|
||||
def extract_text(pdf_bytes: bytes) -> str:
    """Convert PDF bytes to layout-preserving text using pdftotext.

    The bytes are written to a temporary file, pdftotext is run on it with
    ``-layout`` and the text it writes to stdout is returned.

    Args:
        pdf_bytes: Raw content of the PDF file.

    Returns:
        The extracted text.

    Raises:
        FileNotFoundError: if the pdftotext binary cannot be located.
        ValueError: if pdftotext exits with a non-zero status.
        subprocess.TimeoutExpired: if pdftotext runs longer than 30 seconds.
    """
    pdftotext_bin = _find_pdftotext()
    with tempfile.NamedTemporaryFile(suffix=".pdf") as f:
        f.write(pdf_bytes)
        f.flush()
        result = subprocess.run(
            # Ask for UTF-8 explicitly: the default output encoding differs
            # between pdftotext builds, and the parsers match on non-ASCII
            # text ("¢", accented labels).
            [pdftotext_bin, "-layout", "-enc", "UTF-8", f.name, "-"],
            capture_output=True,
            # Decode as UTF-8 regardless of the process locale; undecodable
            # bytes are replaced instead of aborting the whole extraction.
            encoding="utf-8",
            errors="replace",
            timeout=30,
        )
    if result.returncode != 0:
        raise ValueError(f"pdftotext failed: {result.stderr.strip()}")
    return result.stdout
|
||||
|
||||
|
||||
def detect_type(text: str) -> str:
    """Classify extracted statement text by the keywords it contains.

    Returns 'CREDIT_CARD', 'VOL', 'ROP_FCL' or 'UNKNOWN'. The checks run in
    that order, so text carrying a credit card marker is classified as
    'CREDIT_CARD' even if it also contains pension keywords.
    """
    credit_card_markers = ("MARCA DE TARJETA", "ESTADO DE CUENTA", "PAGO MÍNIMO")
    for marker in credit_card_markers:
        if marker in text:
            return "CREDIT_CARD"

    if "FONDO C VOLUNTARIO" in text:
        return "VOL"

    mentions_both_funds = "ROP" in text and "FCL" in text
    if "RÉGIMEN OBLIGATORIO" in text or mentions_both_funds:
        return "ROP_FCL"

    return "UNKNOWN"
|
||||
|
||||
|
||||
def _parse_amount(s: str) -> float:
|
||||
"""Parse '17,819,176.79' or '-12,693.13' into float."""
|
||||
cleaned = s.replace(",", "")
|
||||
return float(cleaned)
|
||||
|
||||
|
||||
def _find_amounts(line: str) -> list[float]:
    """Return every amount in *line* that is prefixed with ¢, in order of appearance."""
    amounts: list[float] = []
    for match in re.finditer(r"¢\s*(-?[\d,]+\.\d{2})", line):
        amounts.append(_parse_amount(match.group(1)))
    return amounts
|
||||
|
||||
|
||||
def _parse_period(text: str) -> tuple[date, date]:
|
||||
m = re.search(r"DEL\s+(\d{2}/\d{2}/\d{4})\s+AL\s+(\d{2}/\d{2}/\d{4})", text)
|
||||
if not m:
|
||||
raise ValueError("Could not find period dates (DEL ... AL ...)")
|
||||
start = date(int(m.group(1)[6:]), int(m.group(1)[3:5]), int(m.group(1)[:2]))
|
||||
end = date(int(m.group(2)[6:]), int(m.group(2)[3:5]), int(m.group(2)[:2]))
|
||||
return start, end
|
||||
|
||||
|
||||
def _extract_summary_value(text: str, label: str) -> list[float]:
    """Return the ¢ amounts from the first line that mentions *label* and has amounts.

    The label is matched case-insensitively anywhere in the line. Lines that
    mention the label but carry no amount are skipped. An empty list is
    returned when no line qualifies.
    """
    label_re = re.compile(re.escape(label) + r".*", re.IGNORECASE)
    for line in text.split("\n"):
        if label_re.search(line) is None:
            continue
        found = _find_amounts(line)
        if found:
            return found
    return []
|
||||
|
||||
|
||||
# Pairs of (label searched for in the statement text, snapshot field it fills).
_SUMMARY_FIELDS = [
    ("Saldo Anterior", "saldo_anterior"),
    ("Aportes", "aportes"),
    ("Rendimientos", "rendimientos"),
    ("Retiros", "retiros"),
    ("Traslados", "traslados"),
    ("Comisión de Administración", "comision"),
    ("Corrección de Imputaciones", "correccion"),
    ("Bonificación", "bonificacion"),
]
|
||||
|
||||
|
||||
def _find_final_balance(text: str, after_label: str = "Bonificación") -> list[float]:
    """Return the amounts on the first amount-bearing line below *after_label*.

    The final balance is printed without a label of its own, on a line that
    follows the last summary field (Bonificación, or Corrección for ROP+FCL).
    Lines containing the label itself are never used as the balance line.
    An empty list is returned when the label or a following amount is missing.
    """
    remaining = iter(text.split("\n"))

    # Advance to the first line that carries the label.
    for line in remaining:
        if after_label in line:
            break
    else:
        return []

    # Take the next line that has amounts, skipping repeats of the label.
    for line in remaining:
        if after_label in line:
            continue
        balance = _find_amounts(line)
        if balance:
            return balance
    return []
|
||||
|
||||
|
||||
def parse_vol(text: str) -> list[FundSnapshot]:
    """Build the single VOL snapshot from the text of a voluntary-fund statement.

    Summary fields that are not found default to 0.0, and the contract number
    defaults to an empty string.

    Raises:
        ValueError: if the statement period cannot be found.
    """
    period_start, period_end = _parse_period(text)

    contract_match = re.search(r"N°\s*Contrato:\s*(\S+)", text)
    contract = contract_match.group(1) if contract_match else ""

    def first_amount(label: str) -> float:
        found = _extract_summary_value(text, label)
        return found[0] if found else 0.0

    summary = {field: first_amount(label) for label, field in _SUMMARY_FIELDS}

    # The balance line normally follows Bonificación; otherwise look after Corrección.
    # NOTE(review): a balance that cannot be located is recorded as 0.0, not reported.
    finals = _find_final_balance(text, "Bonificación") or _find_final_balance(
        text, "Corrección de Imputaciones"
    )
    saldo_final = finals[0] if finals else 0.0

    snapshot = FundSnapshot(
        fund="VOL",
        contract_number=contract,
        period_start=period_start,
        period_end=period_end,
        saldo_final=saldo_final,
        **summary,
    )
    return [snapshot]
|
||||
|
||||
|
||||
def parse_rop_fcl(text: str) -> list[FundSnapshot]:
    """Build the ROP and FCL snapshots from the text of a combined statement.

    Each summary line is expected to carry the ROP amount first and the FCL
    amount second. A missing amount is recorded as 0.0, and a missing
    contract number as an empty string.

    Returns:
        A two-element list: the ROP snapshot followed by the FCL snapshot.

    Raises:
        ValueError: if the statement period cannot be found.
    """
    period_start, period_end = _parse_period(text)

    def contract_for(fund: str) -> str:
        if fund == "ROP":
            found = re.search(r"N°\s*Contrato\s*ROP:\s*(\S+)", text)
        else:
            found = re.search(r"N°\s*Contrato\s*FCL:\s*(\S+)", text)
        return found.group(1) if found else ""

    def rop_fcl_pair(amounts: list[float]) -> tuple[float, float]:
        # Pad with zeros so lines with fewer than two amounts still yield a pair.
        rop_value, fcl_value = (amounts + [0.0, 0.0])[:2]
        return rop_value, fcl_value

    rop_data: dict[str, float] = {}
    fcl_data: dict[str, float] = {}
    for label, field in _SUMMARY_FIELDS:
        rop_data[field], fcl_data[field] = rop_fcl_pair(
            _extract_summary_value(text, label)
        )

    # ROP+FCL statements have no Bonificación row; the balance follows Corrección.
    rop_final, fcl_final = rop_fcl_pair(
        _find_final_balance(text, "Corrección de Imputaciones")
    )

    rop_snapshot = FundSnapshot(
        fund="ROP",
        contract_number=contract_for("ROP"),
        period_start=period_start,
        period_end=period_end,
        saldo_final=rop_final,
        **rop_data,
    )
    fcl_snapshot = FundSnapshot(
        fund="FCL",
        contract_number=contract_for("FCL"),
        period_start=period_start,
        period_end=period_end,
        saldo_final=fcl_final,
        **fcl_data,
    )
    return [rop_snapshot, fcl_snapshot]
|
||||
|
||||
|
||||
def parse_pension_pdf(pdf_bytes: bytes, filename: str = "") -> list[FundSnapshot]:
    """Extract the text of a pension PDF and parse it into fund snapshots.

    Args:
        pdf_bytes: Raw content of the PDF.
        filename: Name used only in error messages.

    Returns:
        One snapshot for a VOL statement, or the ROP and FCL snapshots for a
        combined statement.

    Raises:
        ValueError: for credit card statements and for text that is not
            recognised as a pension statement.
    """
    text = extract_text(pdf_bytes)
    kind = detect_type(text)

    if kind == "CREDIT_CARD":
        raise ValueError(f"'{filename}' is a credit card statement, not a pension extract")
    if kind == "UNKNOWN":
        raise ValueError(f"'{filename}' is not a recognized BAC pension statement")

    parser = parse_vol if kind == "VOL" else parse_rop_fcl
    return parser(text)
|
||||
Reference in New Issue
Block a user