feat(sat): agregar cron de recuperación diaria a las 10:00 AM
- Revisa si el sync diario falló o si hay CFDIs vigentes sin xml_original. - Si detecta facturas incompletas, lanza un sync initial con rango extendido (desde un mes antes de la factura incompleta más antigua hasta ayer). - Corre secuencialmente por contribuyente para no saturar al SAT. - Incluye soporte para tenants legacy sin contribuyentes.
This commit is contained in:
@@ -9,8 +9,10 @@ import { resetExpiredMonthlyTimbres } from '../services/facturapi.service.js';
|
|||||||
import { purgeDeclaracionesAntiguas } from '../services/declaraciones.service.js';
|
import { purgeDeclaracionesAntiguas } from '../services/declaraciones.service.js';
|
||||||
import { consultarConstancia, purgeConstanciasAntiguas } from '../services/constancia.service.js';
|
import { consultarConstancia, purgeConstanciasAntiguas } from '../services/constancia.service.js';
|
||||||
import { tenantDb } from '../config/database.js';
|
import { tenantDb } from '../config/database.js';
|
||||||
|
import type { Pool } from 'pg';
|
||||||
|
|
||||||
const SYNC_CRON_SCHEDULE = '0 3 * * *'; // 3:00 AM todos los días
|
const SYNC_CRON_SCHEDULE = '0 3 * * *'; // 3:00 AM todos los días
|
||||||
|
const RECOVERY_CRON_SCHEDULE = '0 10 * * *'; // 10:00 AM todos los días
|
||||||
const CONCURRENT_SYNCS = 3; // Máximo de sincronizaciones simultáneas
|
const CONCURRENT_SYNCS = 3; // Máximo de sincronizaciones simultáneas
|
||||||
const OPINION_CRON_SCHEDULE = '0 4 * * 0'; // Sundays 4:00 AM
|
const OPINION_CRON_SCHEDULE = '0 4 * * 0'; // Sundays 4:00 AM
|
||||||
const CSF_CRON_SCHEDULE = '0 4 1 * *'; // Día 1 de cada mes 04:00 AM (CSF mensual)
|
const CSF_CRON_SCHEDULE = '0 4 1 * *'; // Día 1 de cada mes 04:00 AM (CSF mensual)
|
||||||
@@ -20,6 +22,7 @@ const EXPIRY_REMINDERS_CRON = '0 9 * * *'; // 9:00 AM diario — avisos p
|
|||||||
|
|
||||||
let isRunning = false;
|
let isRunning = false;
|
||||||
let isIncrementalRunning = false;
|
let isIncrementalRunning = false;
|
||||||
|
let isRecoveryRunning = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verifica si un tenant tiene FIEL a nivel tenant (legacy Horux 360)
|
* Verifica si un tenant tiene FIEL a nivel tenant (legacy Horux 360)
|
||||||
@@ -388,8 +391,147 @@ async function runCsfJob(): Promise<void> {
|
|||||||
console.log(`[CSF Cron] Completado — éxito: ${success}, fallidos: ${failed}, sin FIEL: ${skipped}`);
|
console.log(`[CSF Cron] Completado — éxito: ${success}, fallidos: ${failed}, sin FIEL: ${skipped}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function getYesterdayEnd(): Date {
|
||||||
|
const now = new Date();
|
||||||
|
return new Date(now.getFullYear(), now.getMonth(), now.getDate() - 1, 23, 59, 59);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function hasIncompleteCfdis(pool: Pool, contribuyenteId: string): Promise<boolean> {
|
||||||
|
const { rows } = await pool.query<{ count: string }>(`
|
||||||
|
SELECT COUNT(*)::text as count
|
||||||
|
FROM cfdis
|
||||||
|
WHERE contribuyente_id = $1
|
||||||
|
AND status = 'Vigente'
|
||||||
|
AND tipo_comprobante IN ('I', 'E')
|
||||||
|
AND xml_original IS NULL
|
||||||
|
`, [contribuyenteId]);
|
||||||
|
return Number(rows[0]?.count || 0) > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function getOldestIncompleteCfdiDate(pool: Pool, contribuyenteId: string): Promise<Date | null> {
|
||||||
|
const { rows } = await pool.query<{ fecha_emision: Date | null }>(`
|
||||||
|
SELECT MIN(fecha_emision) as fecha_emision
|
||||||
|
FROM cfdis
|
||||||
|
WHERE contribuyente_id = $1
|
||||||
|
AND status = 'Vigente'
|
||||||
|
AND tipo_comprobante IN ('I', 'E')
|
||||||
|
AND xml_original IS NULL
|
||||||
|
`, [contribuyenteId]);
|
||||||
|
return rows[0]?.fecha_emision || null;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function waitForRecoveryJob(jobId: string): Promise<void> {
|
||||||
|
while (true) {
|
||||||
|
const job = await prisma.satSyncJob.findUnique({ where: { id: jobId } });
|
||||||
|
if (!job || job.status === 'completed' || job.status === 'failed') {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 60000));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function recoverContribuyente(tenantId: string, databaseName: string, contribuyenteId: string): Promise<void> {
|
||||||
|
try {
|
||||||
|
const status = await getSyncStatus(tenantId, contribuyenteId);
|
||||||
|
if (status.hasActiveSync) {
|
||||||
|
console.log(`[SAT Recovery] ${contribuyenteId} tiene sync activo, omitiendo`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const pool = await tenantDb.getPool(tenantId, databaseName);
|
||||||
|
const hasIncomplete = await hasIncompleteCfdis(pool, contribuyenteId);
|
||||||
|
|
||||||
|
const lastDaily = await prisma.satSyncJob.findFirst({
|
||||||
|
where: { tenantId, contribuyenteId, type: 'daily' },
|
||||||
|
orderBy: { startedAt: 'desc' },
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!hasIncomplete && lastDaily?.status !== 'failed') {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const dateTo = getYesterdayEnd();
|
||||||
|
let dateFrom = new Date(dateTo.getFullYear() - 1, dateTo.getMonth(), dateTo.getDate());
|
||||||
|
|
||||||
|
if (hasIncomplete) {
|
||||||
|
const oldest = await getOldestIncompleteCfdiDate(pool, contribuyenteId);
|
||||||
|
if (oldest) {
|
||||||
|
dateFrom = new Date(oldest.getFullYear(), oldest.getMonth(), 1);
|
||||||
|
dateFrom.setMonth(dateFrom.getMonth() - 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`[SAT Recovery] Recuperando ${contribuyenteId}: ${dateFrom.toISOString()} → ${dateTo.toISOString()}`);
|
||||||
|
const jobId = await startSync(tenantId, 'initial', dateFrom, dateTo, contribuyenteId);
|
||||||
|
console.log(`[SAT Recovery] Job ${jobId} iniciado`);
|
||||||
|
|
||||||
|
await waitForRecoveryJob(jobId);
|
||||||
|
} catch (error: any) {
|
||||||
|
console.error(`[SAT Recovery] Error recuperando ${contribuyenteId}:`, error.message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function recoverTenant(tenantId: string): Promise<void> {
|
||||||
|
const tenant = await prisma.tenant.findUnique({
|
||||||
|
where: { id: tenantId },
|
||||||
|
select: { databaseName: true },
|
||||||
|
});
|
||||||
|
if (!tenant?.databaseName) return;
|
||||||
|
|
||||||
|
const pool = await tenantDb.getPool(tenantId, tenant.databaseName);
|
||||||
|
const { rows } = await pool.query<{ entidad_id: string }>('SELECT entidad_id FROM contribuyentes');
|
||||||
|
const contribuyenteIds = rows.map(r => r.entidad_id);
|
||||||
|
|
||||||
|
if (contribuyenteIds.length === 0) {
|
||||||
|
const status = await getSyncStatus(tenantId);
|
||||||
|
if (status.hasActiveSync) return;
|
||||||
|
const lastDaily = await prisma.satSyncJob.findFirst({
|
||||||
|
where: { tenantId, contribuyenteId: null, type: 'daily' },
|
||||||
|
orderBy: { startedAt: 'desc' },
|
||||||
|
});
|
||||||
|
if (lastDaily?.status === 'failed') {
|
||||||
|
const dateTo = getYesterdayEnd();
|
||||||
|
const dateFrom = new Date(dateTo.getFullYear() - 1, dateTo.getMonth(), dateTo.getDate());
|
||||||
|
console.log(`[SAT Recovery] Recuperando tenant legacy ${tenantId}`);
|
||||||
|
const jobId = await startSync(tenantId, 'initial', dateFrom, dateTo);
|
||||||
|
await waitForRecoveryJob(jobId);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const contribuyenteId of contribuyenteIds) {
|
||||||
|
await recoverContribuyente(tenantId, tenant.databaseName, contribuyenteId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function runRecoverySyncJob(): Promise<void> {
|
||||||
|
if (isRecoveryRunning) {
|
||||||
|
console.log('[SAT Recovery] Ya en ejecución, omitiendo');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
isRecoveryRunning = true;
|
||||||
|
console.log('[SAT Recovery] Iniciando job de recuperación');
|
||||||
|
|
||||||
|
try {
|
||||||
|
const tenantIds = await getTenantsWithFiel();
|
||||||
|
console.log(`[SAT Recovery] ${tenantIds.length} tenants con FIEL`);
|
||||||
|
|
||||||
|
for (const tenantId of tenantIds) {
|
||||||
|
await recoverTenant(tenantId);
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log('[SAT Recovery] Job de recuperación completado');
|
||||||
|
} catch (error: any) {
|
||||||
|
console.error('[SAT Recovery] Error:', error.message);
|
||||||
|
} finally {
|
||||||
|
isRecoveryRunning = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let scheduledTask: ReturnType<typeof cron.schedule> | null = null;
|
let scheduledTask: ReturnType<typeof cron.schedule> | null = null;
|
||||||
let retryTask: ReturnType<typeof cron.schedule> | null = null;
|
let retryTask: ReturnType<typeof cron.schedule> | null = null;
|
||||||
|
let recoveryTask: ReturnType<typeof cron.schedule> | null = null;
|
||||||
let opinionTask: ReturnType<typeof cron.schedule> | null = null;
|
let opinionTask: ReturnType<typeof cron.schedule> | null = null;
|
||||||
let csfTask: ReturnType<typeof cron.schedule> | null = null;
|
let csfTask: ReturnType<typeof cron.schedule> | null = null;
|
||||||
let incrementalTask: ReturnType<typeof cron.schedule> | null = null;
|
let incrementalTask: ReturnType<typeof cron.schedule> | null = null;
|
||||||
@@ -430,6 +572,19 @@ export function startSatSyncJob(): void {
|
|||||||
timezone: 'America/Mexico_City',
|
timezone: 'America/Mexico_City',
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Cron de recuperación: 10:00 AM diario. Revisa si el sync diario falló o si
|
||||||
|
// hay CFDIs vigentes sin XML, y relanza un sync `initial` con rango extendido
|
||||||
|
// para completar los XML faltantes.
|
||||||
|
recoveryTask = cron.schedule(RECOVERY_CRON_SCHEDULE, async () => {
|
||||||
|
try {
|
||||||
|
await runRecoverySyncJob();
|
||||||
|
} catch (error: any) {
|
||||||
|
console.error('[SAT Recovery Cron] Error:', error.message);
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
timezone: 'America/Mexico_City',
|
||||||
|
});
|
||||||
|
|
||||||
// Cron watchdog: cada 2h marca como `failed` los jobs que quedaron stale
|
// Cron watchdog: cada 2h marca como `failed` los jobs que quedaron stale
|
||||||
// (pending con nextRetryAt > 12h atrás, running con startedAt > 4h atrás).
|
// (pending con nextRetryAt > 12h atrás, running con startedAt > 4h atrás).
|
||||||
// Thresholds sobreescribibles vía env (STALE_PENDING_HOURS / STALE_RUNNING_HOURS)
|
// Thresholds sobreescribibles vía env (STALE_PENDING_HOURS / STALE_RUNNING_HOURS)
|
||||||
@@ -535,6 +690,7 @@ export function startSatSyncJob(): void {
|
|||||||
|
|
||||||
console.log(`[SAT Cron] Job programado para: ${SYNC_CRON_SCHEDULE} (America/Mexico_City)`);
|
console.log(`[SAT Cron] Job programado para: ${SYNC_CRON_SCHEDULE} (America/Mexico_City)`);
|
||||||
console.log(`[SAT Cron] Retry programado cada hora`);
|
console.log(`[SAT Cron] Retry programado cada hora`);
|
||||||
|
console.log(`[SAT Recovery Cron] Programado para: ${RECOVERY_CRON_SCHEDULE} (America/Mexico_City)`);
|
||||||
console.log(`[Opinion Cron] Programado para: ${OPINION_CRON_SCHEDULE} (America/Mexico_City)`);
|
console.log(`[Opinion Cron] Programado para: ${OPINION_CRON_SCHEDULE} (America/Mexico_City)`);
|
||||||
console.log(`[CSF Cron] Programado para: ${CSF_CRON_SCHEDULE} (America/Mexico_City)`);
|
console.log(`[CSF Cron] Programado para: ${CSF_CRON_SCHEDULE} (America/Mexico_City)`);
|
||||||
console.log(`[SAT Cron Inc] Incremental Enterprise programado para: ${INCREMENTAL_CRON_SCHEDULE} (America/Mexico_City)`);
|
console.log(`[SAT Cron Inc] Incremental Enterprise programado para: ${INCREMENTAL_CRON_SCHEDULE} (America/Mexico_City)`);
|
||||||
@@ -554,6 +710,10 @@ export function stopSatSyncJob(): void {
|
|||||||
retryTask.stop();
|
retryTask.stop();
|
||||||
retryTask = null;
|
retryTask = null;
|
||||||
}
|
}
|
||||||
|
if (recoveryTask) {
|
||||||
|
recoveryTask.stop();
|
||||||
|
recoveryTask = null;
|
||||||
|
}
|
||||||
if (opinionTask) {
|
if (opinionTask) {
|
||||||
opinionTask.stop();
|
opinionTask.stop();
|
||||||
opinionTask = null;
|
opinionTask = null;
|
||||||
|
|||||||
Reference in New Issue
Block a user