"""Budget projection helpers: recurring items, actual spending and balances."""

import calendar
from datetime import datetime

from sqlmodel import Session, func, select

from app.models.models import (
    BalanceOverride,
    Category,
    RecurringFrequency,
    RecurringItem,
    RecurringItemType,
    Transaction,
    TransactionSource,
    TransactionType,
)

MIN_YEAR = 2026
MAX_YEAR = 2030

# Fresh start: months before this are zeroed out
FRESH_START_YEAR = 2026
FRESH_START_MONTH = 3


def _amount_with_override(item: RecurringItem, month: int) -> float:
    """Return the per-month override for *item* if one exists, else its base amount."""
    overrides = item.override_amounts
    key = str(month)  # override_amounts is keyed by the month number as a string
    if overrides and key in overrides:
        return float(overrides[key])
    return item.amount


def get_effective_amount(item: RecurringItem, month: int, year: int) -> float | None:
    """Return the effective amount for a recurring item in a given month.

    Returns ``None`` when the item does not apply to that month (or when its
    frequency is not one of the handled values).
    """
    freq = item.frequency

    if freq == RecurringFrequency.MONTHLY:
        return _amount_with_override(item, month)

    if freq == RecurringFrequency.WEEKLY:
        # Count occurrences of the weekday in this month.
        # day_of_month stores day-of-week: 0=Monday
        # NOTE(review): a stored value above 6 raises IndexError here and a
        # negative one wraps around; presumably validated upstream - confirm.
        weekday = item.day_of_month if item.day_of_month is not None else 0
        weeks = calendar.monthcalendar(year, month)
        occurrences = sum(1 for week in weeks if week[weekday] != 0)
        return item.amount * occurrences

    if freq == RecurringFrequency.QUARTERLY:
        # Active in months 3, 6, 9, 12
        if month % 3 == 0:
            return _amount_with_override(item, month)
        return None

    if freq == RecurringFrequency.BIANNUAL:
        # Active in month_of_year and 6 months later (wrapping around the year)
        base = item.month_of_year or 1
        second = base + 6 if base <= 6 else base - 6
        if month in (base, second):
            return item.amount
        return None

    if freq == RecurringFrequency.YEARLY:
        # Defaults to December when no month_of_year is set
        if month == (item.month_of_year or 12):
            return item.amount
        return None

    return None


def _one_month_window(year: int, month: int, day: int) -> tuple[datetime, datetime]:
    """Return (start, end) from ``day`` of the given month to ``day`` of the next month."""
    start = datetime(year, month, day)
    if month == 12:
        # December rolls over into January of the following year
        end = datetime(year + 1, 1, day)
    else:
        end = datetime(year, month + 1, day)
    return start, end


def get_month_range(year: int, month: int) -> tuple[datetime, datetime]:
    """Return (start, end) for a calendar month; ``end`` is the 1st of the next month."""
    return _one_month_window(year, month, 1)


def get_cycle_range(year: int, month: int) -> tuple[datetime, datetime]:
    """Return (start, end) for billing cycle: month/18 to month+1/18."""
    return _one_month_window(year, month, 18)


def get_previous_cycle(year: int, month: int) -> tuple[int, int]:
    """Return (year, month) for the billing cycle preceding the given one."""
    if month == 1:
        return year - 1, 12
    return year, month - 1


def _credit_card_scopes(year: int, month: int) -> list[tuple]:
    """Return the WHERE-clause groups for credit card activity billed to a budget month.

    The CC billing cycle for budget month M is the cycle that *ends* around the
    18th of M, i.e. cycle (M-1): from (M-1)/18 to M/18. Two groups are returned,
    in this order:

    1. non-deferred transactions dated inside that cycle;
    2. transactions dated in the cycle before it that were flagged
       ``deferred_to_next_cycle``.
    """
    cycle_year, cycle_month = get_previous_cycle(year, month)
    cycle_start, cycle_end = get_cycle_range(cycle_year, cycle_month)
    prev_year, prev_month = get_previous_cycle(cycle_year, cycle_month)
    prev_start, prev_end = get_cycle_range(prev_year, prev_month)

    is_credit_card = Transaction.source == TransactionSource.CREDIT_CARD
    return [
        (
            Transaction.date >= cycle_start,
            Transaction.date < cycle_end,
            is_credit_card,
            Transaction.deferred_to_next_cycle == False,  # noqa: E712
        ),
        (
            Transaction.date >= prev_start,
            Transaction.date < prev_end,
            is_credit_card,
            Transaction.deferred_to_next_cycle == True,  # noqa: E712
        ),
    ]


def _calendar_scope(year: int, month: int, source_filter) -> tuple:
    """Return the WHERE-clause group for a calendar month restricted by ``source_filter``."""
    start, end = get_month_range(year, month)
    return (Transaction.date >= start, Transaction.date < end, source_filter)


def _budget_month_scopes(year: int, month: int) -> list[tuple]:
    """Return every scope feeding a budget month: both CC scopes, then non-CC calendar month."""
    return [
        *_credit_card_scopes(year, month),
        _calendar_scope(
            year, month, Transaction.source != TransactionSource.CREDIT_CARD
        ),
    ]


def _sum_amount(session: Session, scope: tuple, tx_type: TransactionType) -> float:
    """Return the summed amount of ``tx_type`` transactions in ``scope`` (0.0 if none)."""
    total = session.exec(
        select(func.coalesce(func.sum(Transaction.amount), 0)).where(
            *scope,
            Transaction.transaction_type == tx_type,
        )
    ).one()
    return float(total)


def _count_non_deposits(session: Session, scope: tuple) -> int:
    """Return how many transactions in ``scope`` are not deposits."""
    return session.exec(
        select(func.count()).where(
            *scope,
            Transaction.transaction_type != TransactionType.DEPOSITO,
        )
    ).one()


def _signed(tx_type: TransactionType, amount) -> float:
    """Return ``amount`` as a float, negated for refunds (DEVOLUCION)."""
    value = float(amount)
    if tx_type == TransactionType.DEVOLUCION:
        return -value
    return value


def _net_by_category(
    session: Session, scopes: list[tuple], *, categorized_only: bool
) -> dict[int | None, float]:
    """Return {category_id: net amount} accumulated over ``scopes``, in order.

    Deposits are excluded and refunds are subtracted. When ``categorized_only``
    is true, transactions without a category are left out; otherwise they are
    accumulated under the ``None`` key.
    """
    totals: dict[int | None, float] = {}
    for scope in scopes:
        filters = [
            *scope,
            Transaction.transaction_type != TransactionType.DEPOSITO,
        ]
        if categorized_only:
            filters.append(
                Transaction.category_id.is_not(None)  # type: ignore[union-attr]
            )
        rows = session.exec(
            select(
                Transaction.category_id,
                Transaction.transaction_type,
                func.sum(Transaction.amount),
            )
            .where(*filters)
            .group_by(Transaction.category_id, Transaction.transaction_type)
        ).all()
        for cat_id, tx_type, amount in rows:
            totals[cat_id] = totals.get(cat_id, 0) + _signed(tx_type, amount)
    return totals


def _net_uncategorized(session: Session, scope: tuple) -> float:
    """Return the net (purchases minus refunds) of uncategorized non-deposits in ``scope``."""
    rows = session.exec(
        select(Transaction.transaction_type, func.sum(Transaction.amount))
        .where(
            *scope,
            Transaction.category_id.is_(None),  # type: ignore[union-attr]
            Transaction.transaction_type != TransactionType.DEPOSITO,
        )
        .group_by(Transaction.transaction_type)
    ).all()
    total = 0.0
    for tx_type, amount in rows:
        total += _signed(tx_type, amount)
    return total


def compute_actuals_by_source(
    session: Session, year: int, month: int
) -> dict[str, dict]:
    """Query actual transaction totals grouped by source.

    Credit card uses billing cycle (18th-18th) with deferred logic.
    Every other source uses the calendar month (1st-1st).

    Returns a dict keyed by ``source.value``; each entry holds ``source``,
    ``total_compra``, ``total_devolucion``, ``net`` (purchases minus refunds)
    and ``count`` (number of non-deposit transactions).
    """
    results: dict[str, dict] = {}
    for source in TransactionSource:
        if source == TransactionSource.CREDIT_CARD:
            scopes = _credit_card_scopes(year, month)
        else:
            # Cash / Transfer: calendar month, no deferred logic
            scopes = [_calendar_scope(year, month, Transaction.source == source)]

        compra = sum(
            (_sum_amount(session, scope, TransactionType.COMPRA) for scope in scopes),
            0.0,
        )
        devolucion = sum(
            (
                _sum_amount(session, scope, TransactionType.DEVOLUCION)
                for scope in scopes
            ),
            0.0,
        )
        count = sum(_count_non_deposits(session, scope) for scope in scopes)

        results[source.value] = {
            "source": source.value,
            "total_compra": compra,
            "total_devolucion": devolucion,
            "net": compra - devolucion,
            "count": count,
        }

    return results


def compute_actuals_by_category(
    session: Session, year: int, month: int
) -> dict[int, float]:
    """Return {category_id: net_amount} for actual transactions.

    Credit card uses billing cycle (18th-18th) with deferred logic.
    Other sources use the calendar month (1st-1st). Uncategorized
    transactions and deposits are excluded; refunds are subtracted.
    """
    return _net_by_category(
        session, _budget_month_scopes(year, month), categorized_only=True
    )


def compute_cc_by_category(
    session: Session, year: int, month: int
) -> list[dict]:
    """Return credit card spending by category for the billing cycle.

    Each entry is ``{"category_name": ..., "amount": ...}`` with the amount
    rounded to 2 decimals. Categories whose net is zero or negative are
    dropped, and the list is sorted by amount, largest first.
    """
    totals = _net_by_category(
        session, _credit_card_scopes(year, month), categorized_only=False
    )

    result = []
    for cat_id, amount in totals.items():
        if amount <= 0:
            continue
        if cat_id is not None:
            cat = session.get(Category, cat_id)
            name = cat.name if cat else "Sin categoría"
        else:
            name = "Sin categoría"
        result.append({"category_name": name, "amount": round(amount, 2)})

    return sorted(result, key=lambda x: x["amount"], reverse=True)


def compute_monthly_projection(
    session: Session, year: int, month: int
) -> dict:
    """Compute full monthly projection with no-double-count logic.

    Active recurring items are combined with actual transactions: a recurring
    expense whose category has actuals is reported with the actual amount
    (the projected one is kept under ``projected_amount``). Actuals in
    categories not covered by a recurring expense, plus uncategorized
    actuals, are summed into ``uncovered_actual``.
    """
    items = session.exec(
        select(RecurringItem).where(RecurringItem.is_active == True)  # noqa: E712
    ).all()

    actuals_by_source = compute_actuals_by_source(session, year, month)
    actuals_by_category = compute_actuals_by_category(session, year, month)

    income_items: list[dict] = []
    expense_items: list[dict] = []
    savings_items: list[dict] = []

    total_income = 0.0
    total_fixed_expenses = 0.0
    total_savings = 0.0

    # Categories that have a recurring expense applying to this month
    covered_category_ids: set[int] = set()

    for item in items:
        effective = get_effective_amount(item, month, year)
        if effective is None:
            continue

        detail = {
            "id": item.id,
            "name": item.name,
            "amount": effective,
            "item_type": item.item_type.value,
            "frequency": item.frequency.value,
            "category_name": item.category.name if item.category else None,
            "category_id": item.category_id,
            "used_actual": False,
        }

        if item.item_type == RecurringItemType.INCOME:
            income_items.append(detail)
            total_income += effective
        elif item.item_type == RecurringItemType.EXPENSE:
            if item.category_id is not None:
                covered_category_ids.add(item.category_id)
            # No-double-count: if category has actuals, use actual instead
            # NOTE(review): when several recurring expenses share a category,
            # each one is replaced by the full category actual, so that actual
            # is added to the total once per item - confirm this is intended.
            if item.category_id and item.category_id in actuals_by_category:
                actual_amount = actuals_by_category[item.category_id]
                detail["amount"] = actual_amount
                detail["projected_amount"] = effective
                detail["used_actual"] = True
                total_fixed_expenses += actual_amount
            else:
                total_fixed_expenses += effective
            expense_items.append(detail)
        elif item.item_type == RecurringItemType.SAVINGS:
            savings_items.append(detail)
            total_savings += effective

    # Sum actuals for categories NOT covered by recurring items
    uncovered_actual = 0.0
    for cat_id, amount in actuals_by_category.items():
        if cat_id not in covered_category_ids:
            uncovered_actual += amount

    # Also add transactions with no category (hybrid ranges + deferred):
    # CC this cycle, CC deferred from the previous cycle, non-CC calendar month
    for scope in _budget_month_scopes(year, month):
        uncovered_actual += _net_uncategorized(session, scope)

    actual_credit_card = actuals_by_source.get("CREDIT_CARD", {}).get("net", 0)
    actual_cash = actuals_by_source.get("CASH", {}).get("net", 0)
    actual_transfers = actuals_by_source.get("TRANSFER", {}).get("net", 0)

    cc_by_category = compute_cc_by_category(session, year, month)

    gran_total = total_fixed_expenses + uncovered_actual
    # Savings are NOT deducted - they are already deducted from gross salary
    # (the income amounts are net, post-savings)
    net_balance = total_income - gran_total

    return {
        "year": year,
        "month": month,
        "projected_income": total_income,
        "projected_fixed_expenses": total_fixed_expenses,
        "projected_savings": total_savings,
        "actual_credit_card": actual_credit_card,
        "actual_cash": actual_cash,
        "actual_transfers": actual_transfers,
        "uncovered_actual": uncovered_actual,
        "gran_total_egresos": gran_total,
        "net_balance": net_balance,
        "income_items": income_items,
        "expense_items": expense_items,
        "savings_items": savings_items,
        "actuals_by_source": list(actuals_by_source.values()),
        "cc_by_category": cc_by_category,
    }


def _is_before_fresh_start(year: int, month: int) -> bool:
    """Return True when (year, month) falls before the fresh-start month."""
    return (year, month) < (FRESH_START_YEAR, FRESH_START_MONTH)


def _get_december_cumulative(session: Session, year: int) -> float:
    """Get the cumulative balance for December of a given year.

    A December balance override wins outright. Otherwise the year is replayed
    month by month on top of the previous year's December balance, applying
    any monthly overrides along the way. Months before the fresh start
    contribute nothing.
    """
    if _is_before_fresh_start(year, 12):
        # The whole year predates the fresh start: balance is zeroed out
        return 0.0

    # Check for an override first
    override = session.exec(
        select(BalanceOverride).where(
            BalanceOverride.year == year, BalanceOverride.month == 12
        )
    ).first()
    if override:
        return override.override_balance

    # Compute the full year to get December's cumulative
    overrides = session.exec(
        select(BalanceOverride).where(BalanceOverride.year == year)
    ).all()
    override_map = {o.month: o.override_balance for o in overrides}

    cumulative = 0.0
    if year > FRESH_START_YEAR:
        cumulative = _get_december_cumulative(session, year - 1)

    for m in range(1, 13):
        if _is_before_fresh_start(year, m):
            continue
        data = compute_monthly_projection(session, year, m)
        cumulative += data["net_balance"]
        if m in override_map:
            # An override replaces the running balance for that month
            cumulative = override_map[m]

    return cumulative


def compute_yearly_projection_with_cumulative(
    session: Session, year: int
) -> list[dict]:
    """Compute all 12 months with cumulative balance tracking.

    Each monthly projection is extended with ``carryover_balance``,
    ``cumulative_balance`` and ``balance_overridden``. Months before the
    fresh start report zero balances and do not affect the carryover.
    """
    overrides = session.exec(
        select(BalanceOverride).where(BalanceOverride.year == year)
    ).all()
    override_map = {o.month: o.override_balance for o in overrides}

    # Determine January carryover
    if year <= FRESH_START_YEAR:
        carryover = 0.0
    else:
        carryover = _get_december_cumulative(session, year - 1)

    months = []
    for m in range(1, 13):
        data = compute_monthly_projection(session, year, m)

        if _is_before_fresh_start(year, m):
            data["carryover_balance"] = 0.0
            data["cumulative_balance"] = 0.0
            data["balance_overridden"] = False
        else:
            data["carryover_balance"] = carryover
            cumulative = carryover + data["net_balance"]

            if m in override_map:
                cumulative = override_map[m]
                data["balance_overridden"] = True
            else:
                data["balance_overridden"] = False

            data["cumulative_balance"] = cumulative
            carryover = cumulative

        months.append(data)

    return months