import cron from 'node-cron'; import { prisma } from '../config/database.js'; import { startSync, getSyncStatus, retryTimedOutJobs } from '../services/sat/sat.service.js'; import { sweepStaleSatJobs } from '../services/sat/sweep-stale-jobs.service.js'; import { hasFielConfigured } from '../services/fiel.service.js'; import { consultarOpinion, consultarOpinionContribuyente, limpiarOpinionesAntiguas } from '../services/opinion-cumplimiento.service.js'; import { applyPendingChanges, expireTrials } from '../services/payment/subscription.service.js'; import { resetExpiredMonthlyTimbres } from '../services/facturapi.service.js'; import { purgeDeclaracionesAntiguas } from '../services/declaraciones.service.js'; import { consultarConstancia, consultarConstanciaContribuyente, purgeConstanciasAntiguas } from '../services/constancia.service.js'; import { tenantDb } from '../config/database.js'; import { isDespachoTenant } from '@horux/shared'; const SYNC_CRON_SCHEDULE = '0 3 * * *'; // 3:00 AM todos los días const CONCURRENT_SYNCS = 3; // Máximo de sincronizaciones simultáneas const OPINION_CRON_SCHEDULE = '0 4 * * 0'; // Sundays 4:00 AM const CSF_CRON_SCHEDULE = '0 4 1 * *'; // Día 1 de cada mes 04:00 AM (CSF mensual) const INCREMENTAL_CRON_SCHEDULE = '0 11,15,19 * * *'; // 11:00, 15:00 y 19:00; fuera de ese rango el daily (03:00) cubre const SUBSCRIPTION_LIFECYCLE_CRON = '30 2 * * *'; // 2:30 AM diario — aplica pending changes + expira trials let isRunning = false; let isIncrementalRunning = false; /** * Obtiene los tenants que tienen FIEL configurada y activa */ async function getTenantsWithFiel(): Promise { const tenants = await prisma.tenant.findMany({ where: { active: true }, select: { id: true }, }); const tenantsWithFiel: string[] = []; for (const tenant of tenants) { const hasFiel = await hasFielConfigured(tenant.id); if (hasFiel) { tenantsWithFiel.push(tenant.id); } } return tenantsWithFiel; } /** * Verifica si un tenant necesita sincronización inicial */ async function needsInitialSync(tenantId: string): Promise { const completedSync = await prisma.satSyncJob.findFirst({ where: { tenantId, type: 'initial', status: 'completed', }, }); return !completedSync; } /** * Ejecuta sincronización para un tenant */ async function syncTenant(tenantId: string): Promise { try { // Verificar si hay sync activo const status = await getSyncStatus(tenantId); if (status.hasActiveSync) { console.log(`[SAT Cron] Tenant ${tenantId} ya tiene sync activo, omitiendo`); return; } // Determinar tipo de sync const needsInitial = await needsInitialSync(tenantId); const syncType = needsInitial ? 'initial' : 'daily'; console.log(`[SAT Cron] Iniciando sync ${syncType} para tenant ${tenantId}`); const jobId = await startSync(tenantId, syncType); console.log(`[SAT Cron] Job ${jobId} iniciado para tenant ${tenantId}`); } catch (error: any) { console.error(`[SAT Cron] Error sincronizando tenant ${tenantId}:`, error.message); } } /** * Ejecuta el job de sincronización para todos los tenants */ async function runSyncJob(): Promise { if (isRunning) { console.log('[SAT Cron] Job ya en ejecución, omitiendo'); return; } isRunning = true; console.log('[SAT Cron] Iniciando job de sincronización diaria'); try { const tenantIds = await getTenantsWithFiel(); console.log(`[SAT Cron] ${tenantIds.length} tenants con FIEL configurada`); if (tenantIds.length === 0) { console.log('[SAT Cron] No hay tenants para sincronizar'); return; } // Procesar en lotes para no saturar for (let i = 0; i < tenantIds.length; i += CONCURRENT_SYNCS) { const batch = tenantIds.slice(i, i + CONCURRENT_SYNCS); await Promise.all(batch.map(syncTenant)); // Pequeña pausa entre lotes if (i + CONCURRENT_SYNCS < tenantIds.length) { await new Promise(resolve => setTimeout(resolve, 5000)); } } console.log('[SAT Cron] Job de sincronización completado'); } catch (error: any) { console.error('[SAT Cron] Error en job:', error.message); } finally { isRunning = false; } } /** * Obtiene los tenants Enterprise activos con FIEL configurada. */ async function getEnterpriseTenantsWithFiel(): Promise { const tenants = await prisma.tenant.findMany({ where: { active: true, plan: 'enterprise' }, select: { id: true }, }); const result: string[] = []; for (const tenant of tenants) { if (await hasFielConfigured(tenant.id)) { result.push(tenant.id); } } return result; } /** * Dispara una sincronización incremental (ventana de 6 horas) para un tenant. * Si el tenant ya tiene un sync activo, omite para no solapar solicitudes al SAT. * Si el tenant nunca ha hecho `initial`, omite: el incremental no debe actuar * como primera descarga — la inicial requiere correrse aparte. */ async function incrementalSyncTenant(tenantId: string): Promise { try { const status = await getSyncStatus(tenantId); if (status.hasActiveSync) { console.log(`[SAT Cron Inc] Tenant ${tenantId} con sync activo, omitiendo`); return; } const completedInitial = await prisma.satSyncJob.findFirst({ where: { tenantId, type: 'initial', status: 'completed' }, }); if (!completedInitial) { console.log(`[SAT Cron Inc] Tenant ${tenantId} sin sync inicial completado, omitiendo incremental`); return; } console.log(`[SAT Cron Inc] Iniciando incremental para tenant ${tenantId}`); const jobId = await startSync(tenantId, 'incremental'); console.log(`[SAT Cron Inc] Job ${jobId} iniciado`); } catch (error: any) { console.error(`[SAT Cron Inc] Error para tenant ${tenantId}:`, error.message); } } /** * Ejecuta el job incremental de 6 horas para todos los tenants Enterprise. */ async function runIncrementalSyncJob(): Promise { if (isIncrementalRunning) { console.log('[SAT Cron Inc] Job ya en ejecución, omitiendo'); return; } isIncrementalRunning = true; console.log('[SAT Cron Inc] Iniciando ciclo incremental Enterprise'); try { const tenantIds = await getEnterpriseTenantsWithFiel(); console.log(`[SAT Cron Inc] ${tenantIds.length} tenants Enterprise con FIEL`); if (tenantIds.length === 0) return; for (let i = 0; i < tenantIds.length; i += CONCURRENT_SYNCS) { const batch = tenantIds.slice(i, i + CONCURRENT_SYNCS); await Promise.all(batch.map(incrementalSyncTenant)); if (i + CONCURRENT_SYNCS < tenantIds.length) { await new Promise(resolve => setTimeout(resolve, 5000)); } } console.log('[SAT Cron Inc] Ciclo incremental completado'); } catch (error: any) { console.error('[SAT Cron Inc] Error en ciclo:', error.message); } finally { isIncrementalRunning = false; } } async function runOpinionJob(): Promise { console.log('[Opinion Cron] Iniciando descarga semanal de Opinión de Cumplimiento'); const tenants = await prisma.tenant.findMany({ where: { active: true }, select: { id: true, rfc: true, databaseName: true }, }); let success = 0; let failed = 0; let skipped = 0; for (const tenant of tenants) { const isDespacho = isDespachoTenant(tenant.rfc); if (isDespacho) { // Modo despacho: iterar contribuyentes con FIEL try { const pool = await tenantDb.getPool(tenant.id, tenant.databaseName); const { rows: contribuyentes } = await pool.query(` SELECT c.entidad_id as id, c.rfc FROM contribuyentes c JOIN fiel_contribuyente f ON f.contribuyente_id = c.entidad_id WHERE f.is_active = true `); if (contribuyentes.length === 0) { skipped++; continue; } for (const contrib of contribuyentes) { try { console.log(`[Opinion Cron] Consultando opinión para contribuyente ${contrib.rfc} (tenant ${tenant.rfc})...`); await consultarOpinionContribuyente(pool, contrib.id); success++; } catch (err: any) { console.error(`[Opinion Cron] Error para contribuyente ${contrib.rfc}:`, err.message); failed++; } } const deleted = await limpiarOpinionesAntiguas(pool); if (deleted > 0) { console.log(`[Opinion Cron] ${tenant.rfc}: ${deleted} opiniones antiguas eliminadas`); } } catch (error: any) { console.error(`[Opinion Cron] Error despacho ${tenant.rfc}:`, error.message); failed++; } continue; } // Modo legacy (Horux 360) const hasFiel = await hasFielConfigured(tenant.id); if (!hasFiel) { skipped++; continue; } try { console.log(`[Opinion Cron] Consultando opinión para ${tenant.rfc}...`); await consultarOpinion(tenant.id); success++; // Cleanup old records const pool = await tenantDb.getPool(tenant.id, tenant.databaseName); const deleted = await limpiarOpinionesAntiguas(pool); if (deleted > 0) { console.log(`[Opinion Cron] ${tenant.rfc}: ${deleted} opiniones antiguas eliminadas`); } } catch (error: any) { console.error(`[Opinion Cron] Error para ${tenant.rfc}:`, error.message); failed++; } } console.log(`[Opinion Cron] Completado — éxito: ${success}, fallidos: ${failed}, sin FIEL: ${skipped}`); } async function runCsfJob(): Promise { console.log('[CSF Cron] Iniciando descarga mensual de Constancia de Situación Fiscal'); const tenants = await prisma.tenant.findMany({ where: { active: true }, select: { id: true, rfc: true, databaseName: true }, }); let success = 0; let failed = 0; let skipped = 0; for (const tenant of tenants) { const isDespacho = isDespachoTenant(tenant.rfc); if (isDespacho) { // Modo despacho: iterar contribuyentes con FIEL try { const pool = await tenantDb.getPool(tenant.id, tenant.databaseName); const { rows: contribuyentes } = await pool.query(` SELECT c.entidad_id as id, c.rfc FROM contribuyentes c JOIN fiel_contribuyente f ON f.contribuyente_id = c.entidad_id WHERE f.is_active = true `); if (contribuyentes.length === 0) { skipped++; continue; } for (const contrib of contribuyentes) { try { console.log(`[CSF Cron] Consultando CSF para contribuyente ${contrib.rfc} (tenant ${tenant.rfc})...`); await consultarConstanciaContribuyente(pool, contrib.id); success++; } catch (err: any) { console.error(`[CSF Cron] Error para contribuyente ${contrib.rfc}:`, err.message); failed++; } } } catch (error: any) { console.error(`[CSF Cron] Error despacho ${tenant.rfc}:`, error.message); failed++; } continue; } // Modo legacy (Horux 360) const hasFiel = await hasFielConfigured(tenant.id); if (!hasFiel) { skipped++; continue; } try { console.log(`[CSF Cron] Consultando CSF para ${tenant.rfc}...`); await consultarConstancia(tenant.id); success++; } catch (error: any) { console.error(`[CSF Cron] Error para ${tenant.rfc}:`, error.message); failed++; } } console.log(`[CSF Cron] Completado — éxito: ${success}, fallidos: ${failed}, sin FIEL: ${skipped}`); } let scheduledTask: ReturnType | null = null; let retryTask: ReturnType | null = null; let opinionTask: ReturnType | null = null; let csfTask: ReturnType | null = null; let incrementalTask: ReturnType | null = null; let subscriptionTask: ReturnType | null = null; let watchdogTask: ReturnType | null = null; const RETRY_CRON_SCHEDULE = '0 * * * *'; // Cada hora const WATCHDOG_CRON_SCHEDULE = '0 */2 * * *'; // Cada 2 horas — marca stale jobs como failed /** * Inicia el job programado */ export function startSatSyncJob(): void { if (scheduledTask) { console.log('[SAT Cron] Job ya está programado'); return; } // Validar expresión cron if (!cron.validate(SYNC_CRON_SCHEDULE)) { console.error('[SAT Cron] Expresión cron inválida:', SYNC_CRON_SCHEDULE); return; } scheduledTask = cron.schedule(SYNC_CRON_SCHEDULE, runSyncJob, { timezone: 'America/Mexico_City', }); // Cron de reintentos: cada hora revisa si hay jobs pendientes de retry retryTask = cron.schedule(RETRY_CRON_SCHEDULE, async () => { try { await retryTimedOutJobs(); } catch (error: any) { console.error('[SAT Retry Cron] Error:', error.message); } }, { timezone: 'America/Mexico_City', }); // Cron watchdog: cada 2h marca como `failed` los jobs que quedaron stale // (pending con nextRetryAt > 12h atrás, running con startedAt > 4h atrás). // Thresholds sobreescribibles vía env (STALE_PENDING_HOURS / STALE_RUNNING_HOURS) // — defaults razonables pensando en que un sync inicial típico termina // en <2h y el retryCron corre cada hora. watchdogTask = cron.schedule(WATCHDOG_CRON_SCHEDULE, async () => { try { const pendingHours = Number(process.env.STALE_PENDING_HOURS || 12); const runningHours = Number(process.env.STALE_RUNNING_HOURS || 4); const result = await sweepStaleSatJobs({ apply: true, pendingHours, runningHours }); if (result.pendingMarked + result.runningMarked > 0) { console.log(`[SAT Watchdog] Marcados failed: pending=${result.pendingMarked} running=${result.runningMarked}`); } } catch (error: any) { console.error('[SAT Watchdog] Error:', error.message); } }, { timezone: 'America/Mexico_City', }); opinionTask = cron.schedule(OPINION_CRON_SCHEDULE, async () => { try { await runOpinionJob(); } catch (error: any) { console.error('[Opinion Cron] Error:', error.message); } }, { timezone: 'America/Mexico_City', }); csfTask = cron.schedule(CSF_CRON_SCHEDULE, async () => { try { await runCsfJob(); } catch (error: any) { console.error('[CSF Cron] Error:', error.message); } }, { timezone: 'America/Mexico_City', }); incrementalTask = cron.schedule(INCREMENTAL_CRON_SCHEDULE, async () => { try { await runIncrementalSyncJob(); } catch (error: any) { console.error('[SAT Cron Inc] Error:', error.message); } }, { timezone: 'America/Mexico_City', }); subscriptionTask = cron.schedule(SUBSCRIPTION_LIFECYCLE_CRON, async () => { try { const pending = await applyPendingChanges(); const trials = await expireTrials(); // Reset mensual de TimbreSuscripcion: para cada tenant cuyo periodoFin // ya pasó, resetea usados=0 y avanza la ventana +1 mes/año. Los paquetes // adicionales NO se tocan; su expiraEn = adquiridoEn + 1 año fijo. const timbres = await resetExpiredMonthlyTimbres(); // Cleanup retención 5 años (CFF Art. 30): borra declaraciones provisionales // viejas. Iteramos cada tenant activo. Por-tenant try/catch para que un // tenant que falla no bloquee al resto. let declsBorradas = 0; let csfsBorradas = 0; const tenants = await prisma.tenant.findMany({ where: { active: true }, select: { id: true, databaseName: true, rfc: true }, }); for (const t of tenants) { try { const pool = await tenantDb.getPool(t.id, t.databaseName); const declResult = await purgeDeclaracionesAntiguas(pool); declsBorradas += declResult.deleted; const csfResult = await purgeConstanciasAntiguas(pool); csfsBorradas += csfResult.deleted; } catch (err: any) { console.error(`[Cleanup] Tenant ${t.rfc} fallo en purge:`, err.message || err); } } console.log(`[Subscription Cron] pending aplicados: ${pending.applied} (${pending.errors} errores), trials expirados: ${trials.expired}, timbres reseteados: ${timbres.reset}, timbres custom creados: ${timbres.customCreated}, declaraciones >5 años borradas: ${declsBorradas}, CSFs >5 años borradas: ${csfsBorradas}`); } catch (error: any) { console.error('[Subscription Cron] Error:', error.message); } }, { timezone: 'America/Mexico_City', }); console.log(`[SAT Cron] Job programado para: ${SYNC_CRON_SCHEDULE} (America/Mexico_City)`); console.log(`[SAT Cron] Retry programado cada hora`); console.log(`[Opinion Cron] Programado para: ${OPINION_CRON_SCHEDULE} (America/Mexico_City)`); console.log(`[CSF Cron] Programado para: ${CSF_CRON_SCHEDULE} (America/Mexico_City)`); console.log(`[SAT Cron Inc] Incremental Enterprise programado para: ${INCREMENTAL_CRON_SCHEDULE} (America/Mexico_City)`); console.log(`[Subscription Cron] Lifecycle programado para: ${SUBSCRIPTION_LIFECYCLE_CRON} (America/Mexico_City)`); console.log(`[SAT Watchdog] Programado para: ${WATCHDOG_CRON_SCHEDULE} (America/Mexico_City)`); } /** * Detiene el job programado */ export function stopSatSyncJob(): void { if (scheduledTask) { scheduledTask.stop(); scheduledTask = null; } if (retryTask) { retryTask.stop(); retryTask = null; } if (opinionTask) { opinionTask.stop(); opinionTask = null; } if (csfTask) { csfTask.stop(); csfTask = null; } if (incrementalTask) { incrementalTask.stop(); incrementalTask = null; } if (subscriptionTask) { subscriptionTask.stop(); subscriptionTask = null; } if (watchdogTask) { watchdogTask.stop(); watchdogTask = null; } console.log('[SAT Cron] Jobs detenidos'); } /** * Ejecuta manualmente el ciclo incremental Enterprise (para testing). */ export async function runIncrementalSyncJobManually(): Promise { await runIncrementalSyncJob(); } /** * Ejecuta el job manualmente (para testing o ejecución forzada) */ export async function runSatSyncJobManually(): Promise { await runSyncJob(); } /** * Obtiene información del próximo job programado */ export function getJobInfo(): { scheduled: boolean; expression: string; timezone: string } { return { scheduled: scheduledTask !== null, expression: SYNC_CRON_SCHEDULE, timezone: 'America/Mexico_City', }; }