Initial commit - Horux Despachos NL
This commit is contained in:
342
apps/api/src/services/metricas-compute.service.ts
Normal file
342
apps/api/src/services/metricas-compute.service.ts
Normal file
@@ -0,0 +1,342 @@
|
||||
import type { Pool } from 'pg';
|
||||
import { prisma, tenantDb } from '../config/database.js';
|
||||
import {
|
||||
calcularIngresosPorRegimen,
|
||||
calcularEgresosPorRegimen,
|
||||
calcularNcsEmitidasPorRegimen,
|
||||
calcularNcsRecibidasPorRegimen,
|
||||
calcularGastosNoDeduciblesEfectivoPorRegimen,
|
||||
} from './dashboard.service.js';
|
||||
import { getResumenIva } from './impuestos.service.js';
|
||||
import {
|
||||
upsertMetricaMensual,
|
||||
getPendingInvalidations,
|
||||
clearInvalidation,
|
||||
} from './metricas.service.js';
|
||||
|
||||
/**
|
||||
* Tanda A — Cimientos del sistema hot/cold de métricas pre-calculadas.
|
||||
*
|
||||
* Este módulo calcula las métricas mensuales agregando desde `cfdis` raw, y las
|
||||
* guarda en la tabla `metricas_mensuales` del tenant para que los consumers las
|
||||
* lean sin recomputar. Los consumers aún NO leen de la tabla (Tanda B), esto
|
||||
* solo llena y mantiene la tabla.
|
||||
*
|
||||
* Alcance Tanda A (campos poblados):
|
||||
* - ingresos_cobrados, egresos_pagados (flujo efectivo, respeta grupo de régimen)
|
||||
* - iva_trasladado_total, iva_acreditable, iva_retenido_cobrado, iva_resultado
|
||||
* - utilidad_realizada, flujo_entradas/salidas/neto
|
||||
* - cfdis_emitidos_count, cfdis_recibidos_count, cfdis_cancelados_count
|
||||
*
|
||||
* Fuera de alcance Tanda A (quedan en 0 — iteraciones futuras):
|
||||
* - Desglose IVA por tasa (16/8/0/exento)
|
||||
* - ISR causado/retenido/a_pagar (requiere tablas progresivas + coeficiente)
|
||||
* - IEPS trasladado/acreditable
|
||||
* - CxC/CxP saldo final + counts
|
||||
* - ingresos_devengados, egresos_devengados, utilidad_devengada (split PF vs PM)
|
||||
*/
|
||||
|
||||
// ───────────────────────────────────────────────────────────────
|
||||
// Compute para UN (contribuyente, anio, mes)
|
||||
// ───────────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Computa y hace upsert de métricas mensuales para un contribuyente en un mes.
|
||||
* Crea una fila por cada régimen detectado en los CFDIs del mes. Si no hay
|
||||
* CFDIs en el mes, no inserta nada (filas ausentes = mes sin actividad).
|
||||
*/
|
||||
export async function computeMetricaMensual(
|
||||
pool: Pool,
|
||||
tenantId: string,
|
||||
contribuyenteId: string,
|
||||
anio: number,
|
||||
mes: number,
|
||||
): Promise<{ filasEscritas: number }> {
|
||||
const safeContrib = contribuyenteId.replace(/[^a-f0-9-]/gi, '');
|
||||
const fi = `${anio}-${String(mes).padStart(2, '0')}-01`;
|
||||
const lastDay = new Date(anio, mes, 0).getDate();
|
||||
const ff = `${anio}-${String(mes).padStart(2, '0')}-${String(lastDay).padStart(2, '0')}`;
|
||||
|
||||
// DELETE del cache del periodo ANTES de llamar a calcular{Ingresos,Egresos}.
|
||||
// Crítico: esas funciones hacen read-through cache, así que si encuentran
|
||||
// filas en metricas_mensuales leen valores viejos y el recompute propaga
|
||||
// datos stale. Al borrar primero, el read-through no encuentra nada y cae
|
||||
// al path on-the-fly (que es lo que queremos al recomputar).
|
||||
await pool.query(
|
||||
`DELETE FROM metricas_mensuales WHERE contribuyente_id = $1 AND anio = $2 AND mes = $3`,
|
||||
[safeContrib, anio, mes],
|
||||
);
|
||||
|
||||
// Reusa la lógica canónica de los servicios existentes. Paso `_ignorados=[]`
|
||||
// para que NO filtre régimenes ignorados por el tenant — en la tabla
|
||||
// almacenamos todos los datos; el consumer decide si filtrar ignorados.
|
||||
const [ingresos, egresos, resumenIva, ncsEmitidas, ncsRecibidas, noDeducibles] = await Promise.all([
|
||||
calcularIngresosPorRegimen(pool, tenantId, fi, ff, [], undefined, false, contribuyenteId),
|
||||
calcularEgresosPorRegimen(pool, tenantId, fi, ff, [], undefined, false, contribuyenteId),
|
||||
getResumenIva(pool, fi, ff, tenantId, false, contribuyenteId),
|
||||
calcularNcsEmitidasPorRegimen(pool, tenantId, fi, ff, [], undefined, false, contribuyenteId),
|
||||
calcularNcsRecibidasPorRegimen(pool, tenantId, fi, ff, [], undefined, false, contribuyenteId),
|
||||
calcularGastosNoDeduciblesEfectivoPorRegimen(pool, tenantId, fi, ff, [], undefined, false, contribuyenteId),
|
||||
]);
|
||||
|
||||
// Counts de CFDIs del mes (por régimen, usando FECHA_EFECTIVA del fix P)
|
||||
const { rows: countsRows } = await pool.query<{
|
||||
regimen: string | null;
|
||||
direction: 'E' | 'R';
|
||||
vigentes: string;
|
||||
cancelados: string;
|
||||
}>(`
|
||||
SELECT
|
||||
CASE WHEN type='EMITIDO' THEN regimen_fiscal_emisor ELSE regimen_fiscal_receptor END AS regimen,
|
||||
CASE WHEN type='EMITIDO' THEN 'E' ELSE 'R' END AS direction,
|
||||
COUNT(*) FILTER (WHERE status = 'Vigente') AS vigentes,
|
||||
COUNT(*) FILTER (WHERE status IN ('Cancelado','0')) AS cancelados
|
||||
FROM cfdis
|
||||
WHERE EXTRACT(YEAR FROM (CASE WHEN tipo_comprobante='P' THEN fecha_pago_p ELSE fecha_emision END)) = $1
|
||||
AND EXTRACT(MONTH FROM (CASE WHEN tipo_comprobante='P' THEN fecha_pago_p ELSE fecha_emision END)) = $2
|
||||
AND contribuyente_id = $3
|
||||
GROUP BY 1, 2
|
||||
`, [anio, mes, safeContrib]);
|
||||
|
||||
// Indexa counts por régimen para lookup
|
||||
const emitidosPorReg = new Map<string | null, { vigentes: number; cancelados: number }>();
|
||||
const recibidosPorReg = new Map<string | null, { vigentes: number; cancelados: number }>();
|
||||
for (const r of countsRows) {
|
||||
const target = r.direction === 'E' ? emitidosPorReg : recibidosPorReg;
|
||||
target.set(r.regimen, {
|
||||
vigentes: Number(r.vigentes) || 0,
|
||||
cancelados: Number(r.cancelados) || 0,
|
||||
});
|
||||
}
|
||||
|
||||
// Régimenes a procesar = unión de los que aparecen en ingresos, egresos,
|
||||
// NCs emitidas/recibidas o counts.
|
||||
const regimenes = new Set<string>();
|
||||
ingresos.porRegimen.forEach(r => regimenes.add(r.regimenClave));
|
||||
egresos.porRegimen.forEach(r => regimenes.add(r.regimenClave));
|
||||
ncsEmitidas.porRegimen.forEach(r => regimenes.add(r.regimenClave));
|
||||
ncsRecibidas.porRegimen.forEach(r => regimenes.add(r.regimenClave));
|
||||
noDeducibles.porRegimen.forEach(r => regimenes.add(r.regimenClave));
|
||||
emitidosPorReg.forEach((_, k) => { if (k) regimenes.add(k); });
|
||||
recibidosPorReg.forEach((_, k) => { if (k) regimenes.add(k); });
|
||||
|
||||
// (DELETE ya se hizo al inicio, ver comentario arriba.)
|
||||
if (regimenes.size === 0) {
|
||||
return { filasEscritas: 0 };
|
||||
}
|
||||
|
||||
let filasEscritas = 0;
|
||||
for (const regimen of regimenes) {
|
||||
const ing = ingresos.porRegimen.find(r => r.regimenClave === regimen)?.monto || 0;
|
||||
const egr = egresos.porRegimen.find(r => r.regimenClave === regimen)?.monto || 0;
|
||||
const ivaTras = resumenIva.trasladadoPorRegimen.find(r => r.regimenClave === regimen)?.monto || 0;
|
||||
const ivaAcr = resumenIva.acreditablePorRegimen.find(r => r.regimenClave === regimen)?.monto || 0;
|
||||
const ivaRet = resumenIva.retenidoPorRegimen.find(r => r.regimenClave === regimen)?.monto || 0;
|
||||
const ivaResultado = ivaTras - ivaAcr - ivaRet;
|
||||
const emitidos = emitidosPorReg.get(regimen) || { vigentes: 0, cancelados: 0 };
|
||||
const recibidos = recibidosPorReg.get(regimen) || { vigentes: 0, cancelados: 0 };
|
||||
const ncsEm = ncsEmitidas.porRegimen.find(r => r.regimenClave === regimen)?.monto || 0;
|
||||
const ncsRec = ncsRecibidas.porRegimen.find(r => r.regimenClave === regimen)?.monto || 0;
|
||||
const noDed = noDeducibles.porRegimen.find(r => r.regimenClave === regimen)?.monto || 0;
|
||||
|
||||
await upsertMetricaMensual(pool, contribuyenteId, anio, mes, regimen, {
|
||||
ivaTrasladadoTotal: ivaTras,
|
||||
ivaAcreditable: ivaAcr,
|
||||
ivaRetenidoCobrado: ivaRet,
|
||||
ivaResultado,
|
||||
ingresosCobrados: ing,
|
||||
egresosPagados: egr,
|
||||
utilidadRealizada: ing - egr,
|
||||
flujoEntradas: ing,
|
||||
flujoSalidas: egr,
|
||||
flujoNeto: ing - egr,
|
||||
ncsEmitidas: ncsEm,
|
||||
ncsRecibidas: ncsRec,
|
||||
gastosNoDeduciblesEfectivo: noDed,
|
||||
cfdisEmitidosCount: emitidos.vigentes,
|
||||
cfdisRecibidosCount: recibidos.vigentes,
|
||||
cfdisCanceladosCount: emitidos.cancelados + recibidos.cancelados,
|
||||
});
|
||||
filasEscritas++;
|
||||
}
|
||||
|
||||
return { filasEscritas };
|
||||
}
|
||||
|
||||
// ───────────────────────────────────────────────────────────────
|
||||
// Backfill completo para un tenant
|
||||
// ───────────────────────────────────────────────────────────────
|
||||
|
||||
export interface BackfillOptions {
|
||||
/** Si es true, solo hace dry-run (log), no escribe. */
|
||||
dryRun?: boolean;
|
||||
/** Año desde el cual backfillear. Default: año del CFDI más antiguo. */
|
||||
desdeAnio?: number;
|
||||
/** Año hasta el cual (inclusive). Default: año actual - 1 (el año actual se calcula on-the-fly). */
|
||||
hastaAnio?: number;
|
||||
}
|
||||
|
||||
export interface BackfillResult {
|
||||
tenantId: string;
|
||||
contribuyentesProcesados: number;
|
||||
mesesProcesados: number;
|
||||
filasEscritas: number;
|
||||
errores: Array<{ contribuyenteId: string; anio: number; mes: number; error: string }>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Itera contribuyentes × años × meses y llena `metricas_mensuales`. Diseñado
|
||||
* para correrse una vez (bootstrap) o ad-hoc cuando se detecten huecos.
|
||||
*/
|
||||
export async function backfillTenant(
|
||||
tenantId: string,
|
||||
opts: BackfillOptions = {},
|
||||
): Promise<BackfillResult> {
|
||||
const tenant = await prisma.tenant.findUnique({
|
||||
where: { id: tenantId },
|
||||
select: { databaseName: true, rfc: true },
|
||||
});
|
||||
if (!tenant) throw new Error(`Tenant ${tenantId} no encontrado`);
|
||||
|
||||
const pool = await tenantDb.getPool(tenantId, tenant.databaseName);
|
||||
|
||||
const { rows: contribs } = await pool.query<{ entidad_id: string }>(
|
||||
`SELECT entidad_id FROM contribuyentes`,
|
||||
);
|
||||
if (contribs.length === 0) {
|
||||
return {
|
||||
tenantId,
|
||||
contribuyentesProcesados: 0,
|
||||
mesesProcesados: 0,
|
||||
filasEscritas: 0,
|
||||
errores: [],
|
||||
};
|
||||
}
|
||||
|
||||
const currentYear = new Date().getFullYear();
|
||||
const hastaAnio = opts.hastaAnio ?? currentYear - 1;
|
||||
|
||||
// Para cada contribuyente, determinar rango de años desde el CFDI más antiguo
|
||||
const result: BackfillResult = {
|
||||
tenantId,
|
||||
contribuyentesProcesados: 0,
|
||||
mesesProcesados: 0,
|
||||
filasEscritas: 0,
|
||||
errores: [],
|
||||
};
|
||||
|
||||
for (const c of contribs) {
|
||||
const { rows: [rango] } = await pool.query<{ min_anio: number | null }>(
|
||||
`SELECT EXTRACT(YEAR FROM MIN(fecha_emision))::int AS min_anio
|
||||
FROM cfdis WHERE contribuyente_id = $1`,
|
||||
[c.entidad_id],
|
||||
);
|
||||
if (!rango?.min_anio) continue; // sin CFDIs, skip
|
||||
|
||||
const desdeAnio = opts.desdeAnio ?? rango.min_anio;
|
||||
result.contribuyentesProcesados++;
|
||||
|
||||
for (let anio = desdeAnio; anio <= hastaAnio; anio++) {
|
||||
for (let mes = 1; mes <= 12; mes++) {
|
||||
result.mesesProcesados++;
|
||||
if (opts.dryRun) continue;
|
||||
try {
|
||||
const { filasEscritas } = await computeMetricaMensual(pool, tenantId, c.entidad_id, anio, mes);
|
||||
result.filasEscritas += filasEscritas;
|
||||
} catch (err: any) {
|
||||
result.errores.push({
|
||||
contribuyenteId: c.entidad_id,
|
||||
anio,
|
||||
mes,
|
||||
error: err?.message || String(err),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// ───────────────────────────────────────────────────────────────
|
||||
// Procesamiento de invalidaciones (cron)
|
||||
// ───────────────────────────────────────────────────────────────
|
||||
|
||||
export interface ProcessResult {
|
||||
procesadas: number;
|
||||
filasEscritas: number;
|
||||
errores: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Lee `metricas_invalidaciones`, recomputa cada (contribuyente, anio, mes)
|
||||
* marcado, y limpia la invalidación al terminar. Fail-safe: si una entrada
|
||||
* falla, loguea y continúa con la siguiente.
|
||||
*/
|
||||
export async function processInvalidations(pool: Pool, tenantId: string): Promise<ProcessResult> {
|
||||
const pending = await getPendingInvalidations(pool);
|
||||
if (pending.length === 0) return { procesadas: 0, filasEscritas: 0, errores: 0 };
|
||||
|
||||
let procesadas = 0;
|
||||
let filasEscritas = 0;
|
||||
let errores = 0;
|
||||
|
||||
for (const inv of pending) {
|
||||
try {
|
||||
const { filasEscritas: fe } = await computeMetricaMensual(
|
||||
pool, tenantId, inv.contribuyenteId, inv.anio, inv.mes,
|
||||
);
|
||||
filasEscritas += fe;
|
||||
await clearInvalidation(pool, inv.contribuyenteId, inv.anio, inv.mes);
|
||||
procesadas++;
|
||||
} catch (err: any) {
|
||||
console.error(
|
||||
`[Metricas] Error computando (tenant=${tenantId}, contrib=${inv.contribuyenteId}, ${inv.anio}-${String(inv.mes).padStart(2, '0')}):`,
|
||||
err?.message || err,
|
||||
);
|
||||
errores++;
|
||||
}
|
||||
}
|
||||
|
||||
return { procesadas, filasEscritas, errores };
|
||||
}
|
||||
|
||||
/**
|
||||
* Itera todos los tenants activos y procesa sus invalidaciones pendientes.
|
||||
* Usado por el cron job.
|
||||
*/
|
||||
export async function processAllTenantsInvalidations(): Promise<{
|
||||
tenantsRevisados: number;
|
||||
totalProcesadas: number;
|
||||
totalFilasEscritas: number;
|
||||
totalErrores: number;
|
||||
}> {
|
||||
const tenants = await prisma.tenant.findMany({
|
||||
where: { active: true },
|
||||
select: { id: true, databaseName: true },
|
||||
});
|
||||
|
||||
let totalProcesadas = 0;
|
||||
let totalFilasEscritas = 0;
|
||||
let totalErrores = 0;
|
||||
|
||||
for (const t of tenants) {
|
||||
try {
|
||||
const pool = await tenantDb.getPool(t.id, t.databaseName);
|
||||
const r = await processInvalidations(pool, t.id);
|
||||
totalProcesadas += r.procesadas;
|
||||
totalFilasEscritas += r.filasEscritas;
|
||||
totalErrores += r.errores;
|
||||
} catch (err: any) {
|
||||
console.error(`[Metricas] Error procesando tenant ${t.id}:`, err?.message || err);
|
||||
totalErrores++;
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
tenantsRevisados: tenants.length,
|
||||
totalProcesadas,
|
||||
totalFilasEscritas,
|
||||
totalErrores,
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user