import type { Pool } from 'pg'; import { prisma, tenantDb } from '../config/database.js'; import { calcularIngresosPorRegimen, calcularEgresosPorRegimen, calcularNcsEmitidasPorRegimen, calcularNcsRecibidasPorRegimen, calcularGastosNoDeduciblesEfectivoPorRegimen, } from './dashboard.service.js'; import { getResumenIva } from './impuestos.service.js'; import { upsertMetricaMensual, getPendingInvalidations, clearInvalidation, } from './metricas.service.js'; /** * Tanda A — Cimientos del sistema hot/cold de métricas pre-calculadas. * * Este módulo calcula las métricas mensuales agregando desde `cfdis` raw, y las * guarda en la tabla `metricas_mensuales` del tenant para que los consumers las * lean sin recomputar. Los consumers aún NO leen de la tabla (Tanda B), esto * solo llena y mantiene la tabla. * * Alcance Tanda A (campos poblados): * - ingresos_cobrados, egresos_pagados (flujo efectivo, respeta grupo de régimen) * - iva_trasladado_total, iva_acreditable, iva_retenido_cobrado, iva_resultado * - utilidad_realizada, flujo_entradas/salidas/neto * - cfdis_emitidos_count, cfdis_recibidos_count, cfdis_cancelados_count * * Fuera de alcance Tanda A (quedan en 0 — iteraciones futuras): * - Desglose IVA por tasa (16/8/0/exento) * - ISR causado/retenido/a_pagar (requiere tablas progresivas + coeficiente) * - IEPS trasladado/acreditable * - CxC/CxP saldo final + counts * - ingresos_devengados, egresos_devengados, utilidad_devengada (split PF vs PM) */ // ─────────────────────────────────────────────────────────────── // Compute para UN (contribuyente, anio, mes) // ─────────────────────────────────────────────────────────────── /** * Computa y hace upsert de métricas mensuales para un contribuyente en un mes. * Crea una fila por cada régimen detectado en los CFDIs del mes. Si no hay * CFDIs en el mes, no inserta nada (filas ausentes = mes sin actividad). */ export async function computeMetricaMensual( pool: Pool, tenantId: string, contribuyenteId: string, anio: number, mes: number, ): Promise<{ filasEscritas: number }> { const safeContrib = contribuyenteId.replace(/[^a-f0-9-]/gi, ''); const fi = `${anio}-${String(mes).padStart(2, '0')}-01`; const lastDay = new Date(anio, mes, 0).getDate(); const ff = `${anio}-${String(mes).padStart(2, '0')}-${String(lastDay).padStart(2, '0')}`; // DELETE del cache del periodo ANTES de llamar a calcular{Ingresos,Egresos}. // Crítico: esas funciones hacen read-through cache, así que si encuentran // filas en metricas_mensuales leen valores viejos y el recompute propaga // datos stale. Al borrar primero, el read-through no encuentra nada y cae // al path on-the-fly (que es lo que queremos al recomputar). await pool.query( `DELETE FROM metricas_mensuales WHERE contribuyente_id = $1 AND anio = $2 AND mes = $3`, [safeContrib, anio, mes], ); // Reusa la lógica canónica de los servicios existentes. Paso `_ignorados=[]` // para que NO filtre régimenes ignorados por el tenant — en la tabla // almacenamos todos los datos; el consumer decide si filtrar ignorados. const [ingresos, egresos, resumenIva, ncsEmitidas, ncsRecibidas, noDeducibles] = await Promise.all([ calcularIngresosPorRegimen(pool, tenantId, fi, ff, [], undefined, false, contribuyenteId), calcularEgresosPorRegimen(pool, tenantId, fi, ff, [], undefined, false, contribuyenteId), getResumenIva(pool, fi, ff, tenantId, false, contribuyenteId), calcularNcsEmitidasPorRegimen(pool, tenantId, fi, ff, [], undefined, false, contribuyenteId), calcularNcsRecibidasPorRegimen(pool, tenantId, fi, ff, [], undefined, false, contribuyenteId), calcularGastosNoDeduciblesEfectivoPorRegimen(pool, tenantId, fi, ff, [], undefined, false, contribuyenteId), ]); // Counts de CFDIs del mes (por régimen, usando FECHA_EFECTIVA del fix P) const { rows: countsRows } = await pool.query<{ regimen: string | null; direction: 'E' | 'R'; vigentes: string; cancelados: string; }>(` SELECT CASE WHEN type='EMITIDO' THEN regimen_fiscal_emisor ELSE regimen_fiscal_receptor END AS regimen, CASE WHEN type='EMITIDO' THEN 'E' ELSE 'R' END AS direction, COUNT(*) FILTER (WHERE status = 'Vigente') AS vigentes, COUNT(*) FILTER (WHERE status IN ('Cancelado','0')) AS cancelados FROM cfdis WHERE EXTRACT(YEAR FROM (CASE WHEN tipo_comprobante='P' THEN (fecha_pago_p - interval '1 hour') ELSE COALESCE(fecha_efectiva, fecha_emision - interval '1 hour') END)) = $1 AND EXTRACT(MONTH FROM (CASE WHEN tipo_comprobante='P' THEN (fecha_pago_p - interval '1 hour') ELSE COALESCE(fecha_efectiva, fecha_emision - interval '1 hour') END)) = $2 AND contribuyente_id = $3 GROUP BY 1, 2 `, [anio, mes, safeContrib]); // Indexa counts por régimen para lookup const emitidosPorReg = new Map(); const recibidosPorReg = new Map(); for (const r of countsRows) { const target = r.direction === 'E' ? emitidosPorReg : recibidosPorReg; target.set(r.regimen, { vigentes: Number(r.vigentes) || 0, cancelados: Number(r.cancelados) || 0, }); } // Régimenes a procesar = unión de los que aparecen en ingresos, egresos, // NCs emitidas/recibidas o counts. const regimenes = new Set(); ingresos.porRegimen.forEach(r => regimenes.add(r.regimenClave)); egresos.porRegimen.forEach(r => regimenes.add(r.regimenClave)); ncsEmitidas.porRegimen.forEach(r => regimenes.add(r.regimenClave)); ncsRecibidas.porRegimen.forEach(r => regimenes.add(r.regimenClave)); noDeducibles.porRegimen.forEach(r => regimenes.add(r.regimenClave)); emitidosPorReg.forEach((_, k) => { if (k) regimenes.add(k); }); recibidosPorReg.forEach((_, k) => { if (k) regimenes.add(k); }); // (DELETE ya se hizo al inicio, ver comentario arriba.) if (regimenes.size === 0) { return { filasEscritas: 0 }; } let filasEscritas = 0; for (const regimen of regimenes) { const ing = ingresos.porRegimen.find(r => r.regimenClave === regimen)?.monto || 0; const egr = egresos.porRegimen.find(r => r.regimenClave === regimen)?.monto || 0; const ivaTras = resumenIva.trasladadoPorRegimen.find(r => r.regimenClave === regimen)?.monto || 0; const ivaAcr = resumenIva.acreditablePorRegimen.find(r => r.regimenClave === regimen)?.monto || 0; const ivaRet = resumenIva.retenidoPorRegimen.find(r => r.regimenClave === regimen)?.monto || 0; const ivaResultado = ivaTras - ivaAcr - ivaRet; const emitidos = emitidosPorReg.get(regimen) || { vigentes: 0, cancelados: 0 }; const recibidos = recibidosPorReg.get(regimen) || { vigentes: 0, cancelados: 0 }; const ncsEm = ncsEmitidas.porRegimen.find(r => r.regimenClave === regimen)?.monto || 0; const ncsRec = ncsRecibidas.porRegimen.find(r => r.regimenClave === regimen)?.monto || 0; const noDed = noDeducibles.porRegimen.find(r => r.regimenClave === regimen)?.monto || 0; await upsertMetricaMensual(pool, contribuyenteId, anio, mes, regimen, { ivaTrasladadoTotal: ivaTras, ivaAcreditable: ivaAcr, ivaRetenidoCobrado: ivaRet, ivaResultado, ingresosCobrados: ing, egresosPagados: egr, utilidadRealizada: ing - egr, flujoEntradas: ing, flujoSalidas: egr, flujoNeto: ing - egr, ncsEmitidas: ncsEm, ncsRecibidas: ncsRec, gastosNoDeduciblesEfectivo: noDed, cfdisEmitidosCount: emitidos.vigentes, cfdisRecibidosCount: recibidos.vigentes, cfdisCanceladosCount: emitidos.cancelados + recibidos.cancelados, }); filasEscritas++; } return { filasEscritas }; } // ─────────────────────────────────────────────────────────────── // Backfill completo para un tenant // ─────────────────────────────────────────────────────────────── export interface BackfillOptions { /** Si es true, solo hace dry-run (log), no escribe. */ dryRun?: boolean; /** Año desde el cual backfillear. Default: año del CFDI más antiguo. */ desdeAnio?: number; /** Año hasta el cual (inclusive). Default: año actual - 1 (el año actual se calcula on-the-fly). */ hastaAnio?: number; } export interface BackfillResult { tenantId: string; contribuyentesProcesados: number; mesesProcesados: number; filasEscritas: number; errores: Array<{ contribuyenteId: string; anio: number; mes: number; error: string }>; } /** * Itera contribuyentes × años × meses y llena `metricas_mensuales`. Diseñado * para correrse una vez (bootstrap) o ad-hoc cuando se detecten huecos. */ export async function backfillTenant( tenantId: string, opts: BackfillOptions = {}, ): Promise { const tenant = await prisma.tenant.findUnique({ where: { id: tenantId }, select: { databaseName: true, rfc: true }, }); if (!tenant) throw new Error(`Tenant ${tenantId} no encontrado`); const pool = await tenantDb.getPool(tenantId, tenant.databaseName); const { rows: contribs } = await pool.query<{ entidad_id: string }>( `SELECT entidad_id FROM contribuyentes`, ); if (contribs.length === 0) { return { tenantId, contribuyentesProcesados: 0, mesesProcesados: 0, filasEscritas: 0, errores: [], }; } const currentYear = new Date().getFullYear(); const hastaAnio = opts.hastaAnio ?? currentYear - 1; // Para cada contribuyente, determinar rango de años desde el CFDI más antiguo const result: BackfillResult = { tenantId, contribuyentesProcesados: 0, mesesProcesados: 0, filasEscritas: 0, errores: [], }; for (const c of contribs) { const { rows: [rango] } = await pool.query<{ min_anio: number | null }>( `SELECT EXTRACT(YEAR FROM MIN(fecha_emision - interval '1 hour'))::int AS min_anio FROM cfdis WHERE contribuyente_id = $1`, [c.entidad_id], ); if (!rango?.min_anio) continue; // sin CFDIs, skip const desdeAnio = opts.desdeAnio ?? rango.min_anio; result.contribuyentesProcesados++; for (let anio = desdeAnio; anio <= hastaAnio; anio++) { for (let mes = 1; mes <= 12; mes++) { result.mesesProcesados++; if (opts.dryRun) continue; try { const { filasEscritas } = await computeMetricaMensual(pool, tenantId, c.entidad_id, anio, mes); result.filasEscritas += filasEscritas; } catch (err: any) { result.errores.push({ contribuyenteId: c.entidad_id, anio, mes, error: err?.message || String(err), }); } } } } return result; } // ─────────────────────────────────────────────────────────────── // Procesamiento de invalidaciones (cron) // ─────────────────────────────────────────────────────────────── export interface ProcessResult { procesadas: number; filasEscritas: number; errores: number; } /** * Lee `metricas_invalidaciones`, recomputa cada (contribuyente, anio, mes) * marcado, y limpia la invalidación al terminar. Fail-safe: si una entrada * falla, loguea y continúa con la siguiente. */ export async function processInvalidations(pool: Pool, tenantId: string): Promise { const pending = await getPendingInvalidations(pool); if (pending.length === 0) return { procesadas: 0, filasEscritas: 0, errores: 0 }; let procesadas = 0; let filasEscritas = 0; let errores = 0; for (const inv of pending) { try { const { filasEscritas: fe } = await computeMetricaMensual( pool, tenantId, inv.contribuyenteId, inv.anio, inv.mes, ); filasEscritas += fe; await clearInvalidation(pool, inv.contribuyenteId, inv.anio, inv.mes); procesadas++; } catch (err: any) { console.error( `[Metricas] Error computando (tenant=${tenantId}, contrib=${inv.contribuyenteId}, ${inv.anio}-${String(inv.mes).padStart(2, '0')}):`, err?.message || err, ); errores++; } } return { procesadas, filasEscritas, errores }; } /** * Itera todos los tenants activos y procesa sus invalidaciones pendientes. * Usado por el cron job. */ export async function processAllTenantsInvalidations(): Promise<{ tenantsRevisados: number; totalProcesadas: number; totalFilasEscritas: number; totalErrores: number; }> { const tenants = await prisma.tenant.findMany({ where: { active: true }, select: { id: true, databaseName: true }, }); let totalProcesadas = 0; let totalFilasEscritas = 0; let totalErrores = 0; for (const t of tenants) { try { const pool = await tenantDb.getPool(t.id, t.databaseName); const r = await processInvalidations(pool, t.id); totalProcesadas += r.procesadas; totalFilasEscritas += r.filasEscritas; totalErrores += r.errores; } catch (err: any) { console.error(`[Metricas] Error procesando tenant ${t.id}:`, err?.message || err); totalErrores++; } } return { tenantsRevisados: tenants.length, totalProcesadas, totalFilasEscritas, totalErrores, }; }