From 63908f9e9d890c4b5c68e40f87a53b55aa4d0773 Mon Sep 17 00:00:00 2001 From: Horux Dev Date: Sun, 14 Jun 2026 04:07:11 +0000 Subject: [PATCH] =?UTF-8?q?feat(sat):=20agregar=20cron=20de=20recuperaci?= =?UTF-8?q?=C3=B3n=20diaria=20a=20las=2010:00=20AM?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Revisa si el sync diario falló o si hay CFDIs vigentes sin xml_original. - Si detecta facturas incompletas, lanza un sync initial con rango extendido (desde un mes antes de la factura incompleta más antigua hasta ayer). - Corre secuencialmente por contribuyente para no saturar al SAT. - Incluye soporte para tenants legacy sin contribuyentes. --- apps/api/src/jobs/sat-sync.job.ts | 160 ++++++++++++++++++++++++++++++ 1 file changed, 160 insertions(+) diff --git a/apps/api/src/jobs/sat-sync.job.ts b/apps/api/src/jobs/sat-sync.job.ts index 6cea14b..b36a97c 100644 --- a/apps/api/src/jobs/sat-sync.job.ts +++ b/apps/api/src/jobs/sat-sync.job.ts @@ -9,8 +9,10 @@ import { resetExpiredMonthlyTimbres } from '../services/facturapi.service.js'; import { purgeDeclaracionesAntiguas } from '../services/declaraciones.service.js'; import { consultarConstancia, purgeConstanciasAntiguas } from '../services/constancia.service.js'; import { tenantDb } from '../config/database.js'; +import type { Pool } from 'pg'; const SYNC_CRON_SCHEDULE = '0 3 * * *'; // 3:00 AM todos los días +const RECOVERY_CRON_SCHEDULE = '0 10 * * *'; // 10:00 AM todos los días const CONCURRENT_SYNCS = 3; // Máximo de sincronizaciones simultáneas const OPINION_CRON_SCHEDULE = '0 4 * * 0'; // Sundays 4:00 AM const CSF_CRON_SCHEDULE = '0 4 1 * *'; // Día 1 de cada mes 04:00 AM (CSF mensual) @@ -20,6 +22,7 @@ const EXPIRY_REMINDERS_CRON = '0 9 * * *'; // 9:00 AM diario — avisos p let isRunning = false; let isIncrementalRunning = false; +let isRecoveryRunning = false; /** * Verifica si un tenant tiene FIEL a nivel tenant (legacy Horux 360) @@ -388,8 +391,147 @@ async function runCsfJob(): Promise { console.log(`[CSF Cron] Completado — éxito: ${success}, fallidos: ${failed}, sin FIEL: ${skipped}`); } +function getYesterdayEnd(): Date { + const now = new Date(); + return new Date(now.getFullYear(), now.getMonth(), now.getDate() - 1, 23, 59, 59); +} + +async function hasIncompleteCfdis(pool: Pool, contribuyenteId: string): Promise { + const { rows } = await pool.query<{ count: string }>(` + SELECT COUNT(*)::text as count + FROM cfdis + WHERE contribuyente_id = $1 + AND status = 'Vigente' + AND tipo_comprobante IN ('I', 'E') + AND xml_original IS NULL + `, [contribuyenteId]); + return Number(rows[0]?.count || 0) > 0; +} + +async function getOldestIncompleteCfdiDate(pool: Pool, contribuyenteId: string): Promise { + const { rows } = await pool.query<{ fecha_emision: Date | null }>(` + SELECT MIN(fecha_emision) as fecha_emision + FROM cfdis + WHERE contribuyente_id = $1 + AND status = 'Vigente' + AND tipo_comprobante IN ('I', 'E') + AND xml_original IS NULL + `, [contribuyenteId]); + return rows[0]?.fecha_emision || null; +} + +async function waitForRecoveryJob(jobId: string): Promise { + while (true) { + const job = await prisma.satSyncJob.findUnique({ where: { id: jobId } }); + if (!job || job.status === 'completed' || job.status === 'failed') { + return; + } + await new Promise(resolve => setTimeout(resolve, 60000)); + } +} + +async function recoverContribuyente(tenantId: string, databaseName: string, contribuyenteId: string): Promise { + try { + const status = await getSyncStatus(tenantId, contribuyenteId); + if (status.hasActiveSync) { + console.log(`[SAT Recovery] ${contribuyenteId} tiene sync activo, omitiendo`); + return; + } + + const pool = await tenantDb.getPool(tenantId, databaseName); + const hasIncomplete = await hasIncompleteCfdis(pool, contribuyenteId); + + const lastDaily = await prisma.satSyncJob.findFirst({ + where: { tenantId, contribuyenteId, type: 'daily' }, + orderBy: { startedAt: 'desc' }, + }); + + if (!hasIncomplete && lastDaily?.status !== 'failed') { + return; + } + + const dateTo = getYesterdayEnd(); + let dateFrom = new Date(dateTo.getFullYear() - 1, dateTo.getMonth(), dateTo.getDate()); + + if (hasIncomplete) { + const oldest = await getOldestIncompleteCfdiDate(pool, contribuyenteId); + if (oldest) { + dateFrom = new Date(oldest.getFullYear(), oldest.getMonth(), 1); + dateFrom.setMonth(dateFrom.getMonth() - 1); + } + } + + console.log(`[SAT Recovery] Recuperando ${contribuyenteId}: ${dateFrom.toISOString()} → ${dateTo.toISOString()}`); + const jobId = await startSync(tenantId, 'initial', dateFrom, dateTo, contribuyenteId); + console.log(`[SAT Recovery] Job ${jobId} iniciado`); + + await waitForRecoveryJob(jobId); + } catch (error: any) { + console.error(`[SAT Recovery] Error recuperando ${contribuyenteId}:`, error.message); + } +} + +async function recoverTenant(tenantId: string): Promise { + const tenant = await prisma.tenant.findUnique({ + where: { id: tenantId }, + select: { databaseName: true }, + }); + if (!tenant?.databaseName) return; + + const pool = await tenantDb.getPool(tenantId, tenant.databaseName); + const { rows } = await pool.query<{ entidad_id: string }>('SELECT entidad_id FROM contribuyentes'); + const contribuyenteIds = rows.map(r => r.entidad_id); + + if (contribuyenteIds.length === 0) { + const status = await getSyncStatus(tenantId); + if (status.hasActiveSync) return; + const lastDaily = await prisma.satSyncJob.findFirst({ + where: { tenantId, contribuyenteId: null, type: 'daily' }, + orderBy: { startedAt: 'desc' }, + }); + if (lastDaily?.status === 'failed') { + const dateTo = getYesterdayEnd(); + const dateFrom = new Date(dateTo.getFullYear() - 1, dateTo.getMonth(), dateTo.getDate()); + console.log(`[SAT Recovery] Recuperando tenant legacy ${tenantId}`); + const jobId = await startSync(tenantId, 'initial', dateFrom, dateTo); + await waitForRecoveryJob(jobId); + } + return; + } + + for (const contribuyenteId of contribuyenteIds) { + await recoverContribuyente(tenantId, tenant.databaseName, contribuyenteId); + } +} + +async function runRecoverySyncJob(): Promise { + if (isRecoveryRunning) { + console.log('[SAT Recovery] Ya en ejecución, omitiendo'); + return; + } + + isRecoveryRunning = true; + console.log('[SAT Recovery] Iniciando job de recuperación'); + + try { + const tenantIds = await getTenantsWithFiel(); + console.log(`[SAT Recovery] ${tenantIds.length} tenants con FIEL`); + + for (const tenantId of tenantIds) { + await recoverTenant(tenantId); + } + + console.log('[SAT Recovery] Job de recuperación completado'); + } catch (error: any) { + console.error('[SAT Recovery] Error:', error.message); + } finally { + isRecoveryRunning = false; + } +} + let scheduledTask: ReturnType | null = null; let retryTask: ReturnType | null = null; +let recoveryTask: ReturnType | null = null; let opinionTask: ReturnType | null = null; let csfTask: ReturnType | null = null; let incrementalTask: ReturnType | null = null; @@ -430,6 +572,19 @@ export function startSatSyncJob(): void { timezone: 'America/Mexico_City', }); + // Cron de recuperación: 10:00 AM diario. Revisa si el sync diario falló o si + // hay CFDIs vigentes sin XML, y relanza un sync `initial` con rango extendido + // para completar los XML faltantes. + recoveryTask = cron.schedule(RECOVERY_CRON_SCHEDULE, async () => { + try { + await runRecoverySyncJob(); + } catch (error: any) { + console.error('[SAT Recovery Cron] Error:', error.message); + } + }, { + timezone: 'America/Mexico_City', + }); + // Cron watchdog: cada 2h marca como `failed` los jobs que quedaron stale // (pending con nextRetryAt > 12h atrás, running con startedAt > 4h atrás). // Thresholds sobreescribibles vía env (STALE_PENDING_HOURS / STALE_RUNNING_HOURS) @@ -535,6 +690,7 @@ export function startSatSyncJob(): void { console.log(`[SAT Cron] Job programado para: ${SYNC_CRON_SCHEDULE} (America/Mexico_City)`); console.log(`[SAT Cron] Retry programado cada hora`); + console.log(`[SAT Recovery Cron] Programado para: ${RECOVERY_CRON_SCHEDULE} (America/Mexico_City)`); console.log(`[Opinion Cron] Programado para: ${OPINION_CRON_SCHEDULE} (America/Mexico_City)`); console.log(`[CSF Cron] Programado para: ${CSF_CRON_SCHEDULE} (America/Mexico_City)`); console.log(`[SAT Cron Inc] Incremental Enterprise programado para: ${INCREMENTAL_CRON_SCHEDULE} (America/Mexico_City)`); @@ -554,6 +710,10 @@ export function stopSatSyncJob(): void { retryTask.stop(); retryTask = null; } + if (recoveryTask) { + recoveryTask.stop(); + recoveryTask = null; + } if (opinionTask) { opinionTask.stop(); opinionTask = null;