Factura Global & fecha_efectiva: - Migracion 045_factura_global.sql: periodicidad, meses_global, año_global, fecha_efectiva - sat-parser.service.ts: extrae InformacionGlobal del XML - sat.service.ts: calcFechaEfectiva con soporte bimestral (periodicidad 05) - metricas-compute, dashboard, impuestos, cfdi, export, conciliacion, alertas: reemplaza fecha_emision-1h por COALESCE(fecha_efectiva, fecha_emision-1h) - Script recalc-metricas.ts para recalculo manual Fallback datos fiscales tenant → contribuyente: - contribuyente.service.ts: fetchTenantFiscalData + mergeContribuyenteWithTenant rellena regimenFiscal, codigoPostal y domicilio cuando el contribuyente tiene el mismo RFC que el tenant y sus campos estan vacios - contribuyente.controller.ts y contribuyente-config.controller.ts: pasan req.user!.tenantId al servicio Fix critico SAT sync: - sat.service.ts: anio_global → año_global en INSERT/UPDATE de CFDIs (la migracion creo 'año_global' con tilde; el codigo usaba 'anio_global', causando fallo en 100% de inserciones de CFDI) - determineChunkMonths: salta sondeo si existe job previo con requestIds - MAX_POLL_ATTEMPTS: 45 → 500 (~8h) para syncs iniciales grandes Docs: - docs/sessions/2026-05-22-factura-global-contribuyente-fallback.md
343 lines
14 KiB
TypeScript
343 lines
14 KiB
TypeScript
import type { Pool } from 'pg';
|
||
import { prisma, tenantDb } from '../config/database.js';
|
||
import {
|
||
calcularIngresosPorRegimen,
|
||
calcularEgresosPorRegimen,
|
||
calcularNcsEmitidasPorRegimen,
|
||
calcularNcsRecibidasPorRegimen,
|
||
calcularGastosNoDeduciblesEfectivoPorRegimen,
|
||
} from './dashboard.service.js';
|
||
import { getResumenIva } from './impuestos.service.js';
|
||
import {
|
||
upsertMetricaMensual,
|
||
getPendingInvalidations,
|
||
clearInvalidation,
|
||
} from './metricas.service.js';
|
||
|
||
/**
|
||
* Tanda A — Cimientos del sistema hot/cold de métricas pre-calculadas.
|
||
*
|
||
* Este módulo calcula las métricas mensuales agregando desde `cfdis` raw, y las
|
||
* guarda en la tabla `metricas_mensuales` del tenant para que los consumers las
|
||
* lean sin recomputar. Los consumers aún NO leen de la tabla (Tanda B), esto
|
||
* solo llena y mantiene la tabla.
|
||
*
|
||
* Alcance Tanda A (campos poblados):
|
||
* - ingresos_cobrados, egresos_pagados (flujo efectivo, respeta grupo de régimen)
|
||
* - iva_trasladado_total, iva_acreditable, iva_retenido_cobrado, iva_resultado
|
||
* - utilidad_realizada, flujo_entradas/salidas/neto
|
||
* - cfdis_emitidos_count, cfdis_recibidos_count, cfdis_cancelados_count
|
||
*
|
||
* Fuera de alcance Tanda A (quedan en 0 — iteraciones futuras):
|
||
* - Desglose IVA por tasa (16/8/0/exento)
|
||
* - ISR causado/retenido/a_pagar (requiere tablas progresivas + coeficiente)
|
||
* - IEPS trasladado/acreditable
|
||
* - CxC/CxP saldo final + counts
|
||
* - ingresos_devengados, egresos_devengados, utilidad_devengada (split PF vs PM)
|
||
*/
|
||
|
||
// ───────────────────────────────────────────────────────────────
|
||
// Compute para UN (contribuyente, anio, mes)
|
||
// ───────────────────────────────────────────────────────────────
|
||
|
||
/**
|
||
* Computa y hace upsert de métricas mensuales para un contribuyente en un mes.
|
||
* Crea una fila por cada régimen detectado en los CFDIs del mes. Si no hay
|
||
* CFDIs en el mes, no inserta nada (filas ausentes = mes sin actividad).
|
||
*/
|
||
export async function computeMetricaMensual(
|
||
pool: Pool,
|
||
tenantId: string,
|
||
contribuyenteId: string,
|
||
anio: number,
|
||
mes: number,
|
||
): Promise<{ filasEscritas: number }> {
|
||
const safeContrib = contribuyenteId.replace(/[^a-f0-9-]/gi, '');
|
||
const fi = `${anio}-${String(mes).padStart(2, '0')}-01`;
|
||
const lastDay = new Date(anio, mes, 0).getDate();
|
||
const ff = `${anio}-${String(mes).padStart(2, '0')}-${String(lastDay).padStart(2, '0')}`;
|
||
|
||
// DELETE del cache del periodo ANTES de llamar a calcular{Ingresos,Egresos}.
|
||
// Crítico: esas funciones hacen read-through cache, así que si encuentran
|
||
// filas en metricas_mensuales leen valores viejos y el recompute propaga
|
||
// datos stale. Al borrar primero, el read-through no encuentra nada y cae
|
||
// al path on-the-fly (que es lo que queremos al recomputar).
|
||
await pool.query(
|
||
`DELETE FROM metricas_mensuales WHERE contribuyente_id = $1 AND anio = $2 AND mes = $3`,
|
||
[safeContrib, anio, mes],
|
||
);
|
||
|
||
// Reusa la lógica canónica de los servicios existentes. Paso `_ignorados=[]`
|
||
// para que NO filtre régimenes ignorados por el tenant — en la tabla
|
||
// almacenamos todos los datos; el consumer decide si filtrar ignorados.
|
||
const [ingresos, egresos, resumenIva, ncsEmitidas, ncsRecibidas, noDeducibles] = await Promise.all([
|
||
calcularIngresosPorRegimen(pool, tenantId, fi, ff, [], undefined, false, contribuyenteId),
|
||
calcularEgresosPorRegimen(pool, tenantId, fi, ff, [], undefined, false, contribuyenteId),
|
||
getResumenIva(pool, fi, ff, tenantId, false, contribuyenteId),
|
||
calcularNcsEmitidasPorRegimen(pool, tenantId, fi, ff, [], undefined, false, contribuyenteId),
|
||
calcularNcsRecibidasPorRegimen(pool, tenantId, fi, ff, [], undefined, false, contribuyenteId),
|
||
calcularGastosNoDeduciblesEfectivoPorRegimen(pool, tenantId, fi, ff, [], undefined, false, contribuyenteId),
|
||
]);
|
||
|
||
// Counts de CFDIs del mes (por régimen, usando FECHA_EFECTIVA del fix P)
|
||
const { rows: countsRows } = await pool.query<{
|
||
regimen: string | null;
|
||
direction: 'E' | 'R';
|
||
vigentes: string;
|
||
cancelados: string;
|
||
}>(`
|
||
SELECT
|
||
CASE WHEN type='EMITIDO' THEN regimen_fiscal_emisor ELSE regimen_fiscal_receptor END AS regimen,
|
||
CASE WHEN type='EMITIDO' THEN 'E' ELSE 'R' END AS direction,
|
||
COUNT(*) FILTER (WHERE status = 'Vigente') AS vigentes,
|
||
COUNT(*) FILTER (WHERE status IN ('Cancelado','0')) AS cancelados
|
||
FROM cfdis
|
||
WHERE EXTRACT(YEAR FROM (CASE WHEN tipo_comprobante='P' THEN (fecha_pago_p - interval '1 hour') ELSE COALESCE(fecha_efectiva, fecha_emision - interval '1 hour') END)) = $1
|
||
AND EXTRACT(MONTH FROM (CASE WHEN tipo_comprobante='P' THEN (fecha_pago_p - interval '1 hour') ELSE COALESCE(fecha_efectiva, fecha_emision - interval '1 hour') END)) = $2
|
||
AND contribuyente_id = $3
|
||
GROUP BY 1, 2
|
||
`, [anio, mes, safeContrib]);
|
||
|
||
// Indexa counts por régimen para lookup
|
||
const emitidosPorReg = new Map<string | null, { vigentes: number; cancelados: number }>();
|
||
const recibidosPorReg = new Map<string | null, { vigentes: number; cancelados: number }>();
|
||
for (const r of countsRows) {
|
||
const target = r.direction === 'E' ? emitidosPorReg : recibidosPorReg;
|
||
target.set(r.regimen, {
|
||
vigentes: Number(r.vigentes) || 0,
|
||
cancelados: Number(r.cancelados) || 0,
|
||
});
|
||
}
|
||
|
||
// Régimenes a procesar = unión de los que aparecen en ingresos, egresos,
|
||
// NCs emitidas/recibidas o counts.
|
||
const regimenes = new Set<string>();
|
||
ingresos.porRegimen.forEach(r => regimenes.add(r.regimenClave));
|
||
egresos.porRegimen.forEach(r => regimenes.add(r.regimenClave));
|
||
ncsEmitidas.porRegimen.forEach(r => regimenes.add(r.regimenClave));
|
||
ncsRecibidas.porRegimen.forEach(r => regimenes.add(r.regimenClave));
|
||
noDeducibles.porRegimen.forEach(r => regimenes.add(r.regimenClave));
|
||
emitidosPorReg.forEach((_, k) => { if (k) regimenes.add(k); });
|
||
recibidosPorReg.forEach((_, k) => { if (k) regimenes.add(k); });
|
||
|
||
// (DELETE ya se hizo al inicio, ver comentario arriba.)
|
||
if (regimenes.size === 0) {
|
||
return { filasEscritas: 0 };
|
||
}
|
||
|
||
let filasEscritas = 0;
|
||
for (const regimen of regimenes) {
|
||
const ing = ingresos.porRegimen.find(r => r.regimenClave === regimen)?.monto || 0;
|
||
const egr = egresos.porRegimen.find(r => r.regimenClave === regimen)?.monto || 0;
|
||
const ivaTras = resumenIva.trasladadoPorRegimen.find(r => r.regimenClave === regimen)?.monto || 0;
|
||
const ivaAcr = resumenIva.acreditablePorRegimen.find(r => r.regimenClave === regimen)?.monto || 0;
|
||
const ivaRet = resumenIva.retenidoPorRegimen.find(r => r.regimenClave === regimen)?.monto || 0;
|
||
const ivaResultado = ivaTras - ivaAcr - ivaRet;
|
||
const emitidos = emitidosPorReg.get(regimen) || { vigentes: 0, cancelados: 0 };
|
||
const recibidos = recibidosPorReg.get(regimen) || { vigentes: 0, cancelados: 0 };
|
||
const ncsEm = ncsEmitidas.porRegimen.find(r => r.regimenClave === regimen)?.monto || 0;
|
||
const ncsRec = ncsRecibidas.porRegimen.find(r => r.regimenClave === regimen)?.monto || 0;
|
||
const noDed = noDeducibles.porRegimen.find(r => r.regimenClave === regimen)?.monto || 0;
|
||
|
||
await upsertMetricaMensual(pool, contribuyenteId, anio, mes, regimen, {
|
||
ivaTrasladadoTotal: ivaTras,
|
||
ivaAcreditable: ivaAcr,
|
||
ivaRetenidoCobrado: ivaRet,
|
||
ivaResultado,
|
||
ingresosCobrados: ing,
|
||
egresosPagados: egr,
|
||
utilidadRealizada: ing - egr,
|
||
flujoEntradas: ing,
|
||
flujoSalidas: egr,
|
||
flujoNeto: ing - egr,
|
||
ncsEmitidas: ncsEm,
|
||
ncsRecibidas: ncsRec,
|
||
gastosNoDeduciblesEfectivo: noDed,
|
||
cfdisEmitidosCount: emitidos.vigentes,
|
||
cfdisRecibidosCount: recibidos.vigentes,
|
||
cfdisCanceladosCount: emitidos.cancelados + recibidos.cancelados,
|
||
});
|
||
filasEscritas++;
|
||
}
|
||
|
||
return { filasEscritas };
|
||
}
|
||
|
||
// ───────────────────────────────────────────────────────────────
|
||
// Backfill completo para un tenant
|
||
// ───────────────────────────────────────────────────────────────
|
||
|
||
export interface BackfillOptions {
|
||
/** Si es true, solo hace dry-run (log), no escribe. */
|
||
dryRun?: boolean;
|
||
/** Año desde el cual backfillear. Default: año del CFDI más antiguo. */
|
||
desdeAnio?: number;
|
||
/** Año hasta el cual (inclusive). Default: año actual - 1 (el año actual se calcula on-the-fly). */
|
||
hastaAnio?: number;
|
||
}
|
||
|
||
export interface BackfillResult {
|
||
tenantId: string;
|
||
contribuyentesProcesados: number;
|
||
mesesProcesados: number;
|
||
filasEscritas: number;
|
||
errores: Array<{ contribuyenteId: string; anio: number; mes: number; error: string }>;
|
||
}
|
||
|
||
/**
|
||
* Itera contribuyentes × años × meses y llena `metricas_mensuales`. Diseñado
|
||
* para correrse una vez (bootstrap) o ad-hoc cuando se detecten huecos.
|
||
*/
|
||
export async function backfillTenant(
|
||
tenantId: string,
|
||
opts: BackfillOptions = {},
|
||
): Promise<BackfillResult> {
|
||
const tenant = await prisma.tenant.findUnique({
|
||
where: { id: tenantId },
|
||
select: { databaseName: true, rfc: true },
|
||
});
|
||
if (!tenant) throw new Error(`Tenant ${tenantId} no encontrado`);
|
||
|
||
const pool = await tenantDb.getPool(tenantId, tenant.databaseName);
|
||
|
||
const { rows: contribs } = await pool.query<{ entidad_id: string }>(
|
||
`SELECT entidad_id FROM contribuyentes`,
|
||
);
|
||
if (contribs.length === 0) {
|
||
return {
|
||
tenantId,
|
||
contribuyentesProcesados: 0,
|
||
mesesProcesados: 0,
|
||
filasEscritas: 0,
|
||
errores: [],
|
||
};
|
||
}
|
||
|
||
const currentYear = new Date().getFullYear();
|
||
const hastaAnio = opts.hastaAnio ?? currentYear - 1;
|
||
|
||
// Para cada contribuyente, determinar rango de años desde el CFDI más antiguo
|
||
const result: BackfillResult = {
|
||
tenantId,
|
||
contribuyentesProcesados: 0,
|
||
mesesProcesados: 0,
|
||
filasEscritas: 0,
|
||
errores: [],
|
||
};
|
||
|
||
for (const c of contribs) {
|
||
const { rows: [rango] } = await pool.query<{ min_anio: number | null }>(
|
||
`SELECT EXTRACT(YEAR FROM MIN(fecha_emision - interval '1 hour'))::int AS min_anio
|
||
FROM cfdis WHERE contribuyente_id = $1`,
|
||
[c.entidad_id],
|
||
);
|
||
if (!rango?.min_anio) continue; // sin CFDIs, skip
|
||
|
||
const desdeAnio = opts.desdeAnio ?? rango.min_anio;
|
||
result.contribuyentesProcesados++;
|
||
|
||
for (let anio = desdeAnio; anio <= hastaAnio; anio++) {
|
||
for (let mes = 1; mes <= 12; mes++) {
|
||
result.mesesProcesados++;
|
||
if (opts.dryRun) continue;
|
||
try {
|
||
const { filasEscritas } = await computeMetricaMensual(pool, tenantId, c.entidad_id, anio, mes);
|
||
result.filasEscritas += filasEscritas;
|
||
} catch (err: any) {
|
||
result.errores.push({
|
||
contribuyenteId: c.entidad_id,
|
||
anio,
|
||
mes,
|
||
error: err?.message || String(err),
|
||
});
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
return result;
|
||
}
|
||
|
||
// ───────────────────────────────────────────────────────────────
|
||
// Procesamiento de invalidaciones (cron)
|
||
// ───────────────────────────────────────────────────────────────
|
||
|
||
export interface ProcessResult {
|
||
procesadas: number;
|
||
filasEscritas: number;
|
||
errores: number;
|
||
}
|
||
|
||
/**
|
||
* Lee `metricas_invalidaciones`, recomputa cada (contribuyente, anio, mes)
|
||
* marcado, y limpia la invalidación al terminar. Fail-safe: si una entrada
|
||
* falla, loguea y continúa con la siguiente.
|
||
*/
|
||
export async function processInvalidations(pool: Pool, tenantId: string): Promise<ProcessResult> {
|
||
const pending = await getPendingInvalidations(pool);
|
||
if (pending.length === 0) return { procesadas: 0, filasEscritas: 0, errores: 0 };
|
||
|
||
let procesadas = 0;
|
||
let filasEscritas = 0;
|
||
let errores = 0;
|
||
|
||
for (const inv of pending) {
|
||
try {
|
||
const { filasEscritas: fe } = await computeMetricaMensual(
|
||
pool, tenantId, inv.contribuyenteId, inv.anio, inv.mes,
|
||
);
|
||
filasEscritas += fe;
|
||
await clearInvalidation(pool, inv.contribuyenteId, inv.anio, inv.mes);
|
||
procesadas++;
|
||
} catch (err: any) {
|
||
console.error(
|
||
`[Metricas] Error computando (tenant=${tenantId}, contrib=${inv.contribuyenteId}, ${inv.anio}-${String(inv.mes).padStart(2, '0')}):`,
|
||
err?.message || err,
|
||
);
|
||
errores++;
|
||
}
|
||
}
|
||
|
||
return { procesadas, filasEscritas, errores };
|
||
}
|
||
|
||
/**
|
||
* Itera todos los tenants activos y procesa sus invalidaciones pendientes.
|
||
* Usado por el cron job.
|
||
*/
|
||
export async function processAllTenantsInvalidations(): Promise<{
|
||
tenantsRevisados: number;
|
||
totalProcesadas: number;
|
||
totalFilasEscritas: number;
|
||
totalErrores: number;
|
||
}> {
|
||
const tenants = await prisma.tenant.findMany({
|
||
where: { active: true },
|
||
select: { id: true, databaseName: true },
|
||
});
|
||
|
||
let totalProcesadas = 0;
|
||
let totalFilasEscritas = 0;
|
||
let totalErrores = 0;
|
||
|
||
for (const t of tenants) {
|
||
try {
|
||
const pool = await tenantDb.getPool(t.id, t.databaseName);
|
||
const r = await processInvalidations(pool, t.id);
|
||
totalProcesadas += r.procesadas;
|
||
totalFilasEscritas += r.filasEscritas;
|
||
totalErrores += r.errores;
|
||
} catch (err: any) {
|
||
console.error(`[Metricas] Error procesando tenant ${t.id}:`, err?.message || err);
|
||
totalErrores++;
|
||
}
|
||
}
|
||
|
||
return {
|
||
tenantsRevisados: tenants.length,
|
||
totalProcesadas,
|
||
totalFilasEscritas,
|
||
totalErrores,
|
||
};
|
||
}
|