Files
HoruxDespachosNuevo/apps/api/src/jobs/sat-sync.job.ts
Horux Dev 7df27ce66d chore: catálogo obligaciones, cierre automático, fixes SAT y facturación
- Catálogo de obligaciones fiscales expandido a 30 entradas con campo requierePago.
- Soporte de frecuencia cuatrimestral en obligaciones y declaraciones.
- Automatización de cierre de obligaciones fiscales desde Documentos › Declaraciones.
- Nuevas tablas obligacion_evidencias, obligacion_periodos estados y declaracion_obligaciones.
- Nuevo servicio obligacion-evidencias.service.ts y endpoints REST.
- Refactor de declaraciones.service.ts para vincular obligaciones y crear evidencias.
- Notificaciones por email para evidencias de obligaciones.
- Adjuntar PDFs en correo de declaración subida.
- Fix drill-down de CFDIs: carga completa al visualizar.
- Fix sincronización SAT: tipos P/N, UUID case-insensitive, no reutilizar requestId.
- Fix suscripciones pending en /configuracion/planes-despacho.
- Fix sugerencias de Clave Producto SAT: importar catálogo y robustecer autocomplete.
- Quitar toggle manual de completado en Configuración › Obligaciones fiscales › Tareas.
- Scripts de soporte para Demo Ventas y utilerías (change-user-email, resend-welcome, import-clave-prod-serv).
- Documentación de cambios en docs/CAMBIOS-2026-05-04.md.
2026-06-22 04:53:59 +00:00

768 lines
27 KiB
TypeScript

import cron from 'node-cron';
import { prisma } from '../config/database.js';
import { startSync, getSyncStatus, retryTimedOutJobs } from '../services/sat/sat.service.js';
import { sweepStaleSatJobs } from '../services/sat/sweep-stale-jobs.service.js';
import { hasFielConfigured } from '../services/fiel.service.js';
import { consultarOpinion, limpiarOpinionesAntiguas } from '../services/opinion-cumplimiento.service.js';
import { applyPendingChanges, expireTrials, sendExpiryReminders } from '../services/payment/subscription.service.js';
import { resetExpiredMonthlyTimbres } from '../services/facturapi.service.js';
import { purgeDeclaracionesAntiguas } from '../services/declaraciones.service.js';
import { consultarConstancia, purgeConstanciasAntiguas } from '../services/constancia.service.js';
import { tenantDb } from '../config/database.js';
import type { Pool } from 'pg';
const SYNC_CRON_SCHEDULE = '0 3 * * *'; // 3:00 AM todos los días
const RECOVERY_CRON_SCHEDULE = '0 10 * * *'; // 10:00 AM todos los días
const CONCURRENT_SYNCS = 3; // Máximo de sincronizaciones simultáneas
const OPINION_CRON_SCHEDULE = '0 4 * * 0'; // Sundays 4:00 AM
const CSF_CRON_SCHEDULE = '0 4 1 * *'; // Día 1 de cada mes 04:00 AM (CSF mensual)
const INCREMENTAL_CRON_SCHEDULE = '0 11,15,19 * * *'; // 11:00, 15:00 y 19:00; fuera de ese rango el daily (03:00) cubre
const SUBSCRIPTION_LIFECYCLE_CRON = '30 2 * * *'; // 2:30 AM diario — aplica pending changes + expira trials
const EXPIRY_REMINDERS_CRON = '0 9 * * *'; // 9:00 AM diario — avisos pre-vencimiento (7d/3d/1d/0d)
let isRunning = false;
let isIncrementalRunning = false;
let isRecoveryRunning = false;
/**
* Verifica si un tenant tiene FIEL a nivel tenant (legacy Horux 360)
* o a nivel contribuyente (modelo despacho).
*/
async function hasAnyFielConfigured(tenantId: string, databaseName?: string | null): Promise<boolean> {
// 1) FIEL legacy a nivel tenant
const hasLegacy = await hasFielConfigured(tenantId);
if (hasLegacy) return true;
// 2) FIEL por contribuyente (modelo despacho)
if (!databaseName) {
const tenant = await prisma.tenant.findUnique({
where: { id: tenantId },
select: { databaseName: true },
});
databaseName = tenant?.databaseName;
}
if (!databaseName) return false;
try {
const pool = await tenantDb.getPool(tenantId, databaseName);
const { rows } = await pool.query(
`SELECT 1 FROM fiel_contribuyente WHERE is_active = true LIMIT 1`
);
return rows.length > 0;
} catch (err: any) {
console.error(`[SAT Cron] Error verificando FIEL contribuyente para tenant ${tenantId}:`, err.message);
return false;
}
}
/**
* Obtiene los tenants que tienen FIEL configurada y activa
*/
async function getTenantsWithFiel(): Promise<string[]> {
const tenants = await prisma.tenant.findMany({
where: { active: true },
select: { id: true, databaseName: true },
});
const tenantsWithFiel: string[] = [];
for (const tenant of tenants) {
const hasFiel = await hasAnyFielConfigured(tenant.id, tenant.databaseName);
if (hasFiel) {
tenantsWithFiel.push(tenant.id);
}
}
return tenantsWithFiel;
}
/**
* Verifica si un tenant (o un contribuyente específico dentro del tenant)
* necesita sincronización inicial.
*/
async function needsInitialSync(tenantId: string, contribuyenteId?: string): Promise<boolean> {
const where: any = {
tenantId,
type: 'initial',
status: 'completed',
};
if (contribuyenteId) {
where.contribuyenteId = contribuyenteId;
}
const completedSync = await prisma.satSyncJob.findFirst({ where });
return !completedSync;
}
/**
* Ejecuta sincronización para un tenant y sus contribuyentes
*/
async function syncTenant(tenantId: string): Promise<void> {
try {
// Obtener contribuyentes del tenant
const tenant = await prisma.tenant.findUnique({
where: { id: tenantId },
select: { databaseName: true },
});
let contribuyenteIds: string[] = [];
if (tenant?.databaseName) {
const pool = await tenantDb.getPool(tenantId, tenant.databaseName);
const { rows } = await pool.query('SELECT entidad_id FROM contribuyentes');
contribuyenteIds = rows.map((r: any) => r.entidad_id);
}
// Si no hay contribuyentes, sincronizar a nivel tenant (legacy Horux 360)
if (contribuyenteIds.length === 0) {
const needsInitial = await needsInitialSync(tenantId);
const syncType = needsInitial ? 'initial' : 'daily';
const status = await getSyncStatus(tenantId);
if (status.hasActiveSync) {
console.log(`[SAT Cron] Tenant ${tenantId} ya tiene sync activo, omitiendo`);
return;
}
console.log(`[SAT Cron] Iniciando sync ${syncType} para tenant ${tenantId} (sin contribuyentes)`);
const jobId = await startSync(tenantId, syncType);
console.log(`[SAT Cron] Job ${jobId} iniciado para tenant ${tenantId}`);
return;
}
// Sincronizar cada contribuyente (cada uno puede necesitar su propio initial)
for (const contribuyenteId of contribuyenteIds) {
try {
const status = await getSyncStatus(tenantId, contribuyenteId);
if (status.hasActiveSync) {
console.log(`[SAT Cron] Tenant ${tenantId} contribuyente ${contribuyenteId} ya tiene sync activo, omitiendo`);
continue;
}
const needsInitial = await needsInitialSync(tenantId, contribuyenteId);
const syncType = needsInitial ? 'initial' : 'daily';
console.log(`[SAT Cron] Iniciando sync ${syncType} para tenant ${tenantId} contribuyente ${contribuyenteId}`);
const jobId = await startSync(tenantId, syncType, undefined, undefined, contribuyenteId);
console.log(`[SAT Cron] Job ${jobId} iniciado para tenant ${tenantId} contribuyente ${contribuyenteId}`);
} catch (error: any) {
console.error(`[SAT Cron] Error sincronizando tenant ${tenantId} contribuyente ${contribuyenteId}:`, error.message);
}
}
} catch (error: any) {
console.error(`[SAT Cron] Error sincronizando tenant ${tenantId}:`, error.message);
}
}
/**
* Ejecuta el job de sincronización para todos los tenants
*/
async function runSyncJob(): Promise<void> {
if (isRunning) {
console.log('[SAT Cron] Job ya en ejecución, omitiendo');
return;
}
isRunning = true;
console.log('[SAT Cron] Iniciando job de sincronización diaria');
try {
const tenantIds = await getTenantsWithFiel();
console.log(`[SAT Cron] ${tenantIds.length} tenants con FIEL configurada`);
if (tenantIds.length === 0) {
console.log('[SAT Cron] No hay tenants para sincronizar');
return;
}
// Procesar en lotes para no saturar
for (let i = 0; i < tenantIds.length; i += CONCURRENT_SYNCS) {
const batch = tenantIds.slice(i, i + CONCURRENT_SYNCS);
await Promise.all(batch.map(syncTenant));
// Pequeña pausa entre lotes
if (i + CONCURRENT_SYNCS < tenantIds.length) {
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
console.log('[SAT Cron] Job de sincronización completado');
} catch (error: any) {
console.error('[SAT Cron] Error en job:', error.message);
} finally {
isRunning = false;
}
}
/**
* Obtiene los tenants activos cuyo plan habilita SAT incremental (3 syncs/día
* adicionales al daily). El flag vive en `despacho_plan_prices.permite_sat_incremental`,
* editable por admin global desde `/configuracion/precios-suscripcion`.
* Default backfill: mi_empresa_plus, business_control, business_cloud.
*/
async function getTenantsConSatIncremental(): Promise<string[]> {
const planesIncrementales = await prisma.despachoPlanPrice.findMany({
where: { permiteSatIncremental: true },
select: { plan: true },
});
const planNames = planesIncrementales.map(p => p.plan);
if (planNames.length === 0) return [];
const tenants = await prisma.tenant.findMany({
where: { active: true, plan: { in: planNames as any } },
select: { id: true, databaseName: true },
});
const result: string[] = [];
for (const tenant of tenants) {
if (await hasAnyFielConfigured(tenant.id, tenant.databaseName)) {
result.push(tenant.id);
}
}
return result;
}
/**
* Dispara una sincronización incremental (ventana de 6 horas) para un tenant
* y sus contribuyentes.
*/
async function incrementalSyncTenant(tenantId: string): Promise<void> {
try {
// Obtener contribuyentes del tenant
const tenant = await prisma.tenant.findUnique({
where: { id: tenantId },
select: { databaseName: true },
});
let contribuyenteIds: string[] = [];
if (tenant?.databaseName) {
const pool = await tenantDb.getPool(tenantId, tenant.databaseName);
const { rows } = await pool.query('SELECT entidad_id FROM contribuyentes');
contribuyenteIds = rows.map((r: any) => r.entidad_id);
}
// Si no hay contribuyentes, sincronizar a nivel tenant (legacy)
if (contribuyenteIds.length === 0) {
const completedInitial = await prisma.satSyncJob.findFirst({
where: { tenantId, type: 'initial', status: 'completed' },
});
if (!completedInitial) {
console.log(`[SAT Cron Inc] Tenant ${tenantId} sin sync inicial completado, omitiendo incremental`);
return;
}
const status = await getSyncStatus(tenantId);
if (status.hasActiveSync) {
console.log(`[SAT Cron Inc] Tenant ${tenantId} con sync activo, omitiendo`);
return;
}
console.log(`[SAT Cron Inc] Iniciando incremental para tenant ${tenantId} (sin contribuyentes)`);
const jobId = await startSync(tenantId, 'incremental');
console.log(`[SAT Cron Inc] Job ${jobId} iniciado`);
return;
}
// Sincronizar cada contribuyente solo si ya tiene su initial completado
for (const contribuyenteId of contribuyenteIds) {
try {
const hasInitial = await prisma.satSyncJob.findFirst({
where: { tenantId, contribuyenteId, type: 'initial', status: 'completed' },
});
if (!hasInitial) {
console.log(`[SAT Cron Inc] Tenant ${tenantId} contribuyente ${contribuyenteId} sin sync inicial, omitiendo incremental`);
continue;
}
const status = await getSyncStatus(tenantId, contribuyenteId);
if (status.hasActiveSync) {
console.log(`[SAT Cron Inc] Tenant ${tenantId} contribuyente ${contribuyenteId} con sync activo, omitiendo`);
continue;
}
console.log(`[SAT Cron Inc] Iniciando incremental para tenant ${tenantId} contribuyente ${contribuyenteId}`);
const jobId = await startSync(tenantId, 'incremental', undefined, undefined, contribuyenteId);
console.log(`[SAT Cron Inc] Job ${jobId} iniciado`);
} catch (error: any) {
console.error(`[SAT Cron Inc] Error para tenant ${tenantId} contribuyente ${contribuyenteId}:`, error.message);
}
}
} catch (error: any) {
console.error(`[SAT Cron Inc] Error para tenant ${tenantId}:`, error.message);
}
}
/**
* Ejecuta el job incremental para todos los tenants cuyo plan habilita SAT
* incremental (Mi Empresa +, Business Control, Enterprise por default;
* configurable desde admin via despacho_plan_prices.permite_sat_incremental).
*/
async function runIncrementalSyncJob(): Promise<void> {
if (isIncrementalRunning) {
console.log('[SAT Cron Inc] Job ya en ejecución, omitiendo');
return;
}
isIncrementalRunning = true;
console.log('[SAT Cron Inc] Iniciando ciclo incremental');
try {
const tenantIds = await getTenantsConSatIncremental();
console.log(`[SAT Cron Inc] ${tenantIds.length} tenants con incremental habilitado y FIEL`);
if (tenantIds.length === 0) return;
for (let i = 0; i < tenantIds.length; i += CONCURRENT_SYNCS) {
const batch = tenantIds.slice(i, i + CONCURRENT_SYNCS);
await Promise.all(batch.map(incrementalSyncTenant));
if (i + CONCURRENT_SYNCS < tenantIds.length) {
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
console.log('[SAT Cron Inc] Ciclo incremental completado');
} catch (error: any) {
console.error('[SAT Cron Inc] Error en ciclo:', error.message);
} finally {
isIncrementalRunning = false;
}
}
async function runOpinionJob(): Promise<void> {
console.log('[Opinion Cron] Iniciando descarga semanal de Opinión de Cumplimiento');
const tenants = await prisma.tenant.findMany({
where: { active: true },
select: { id: true, rfc: true, databaseName: true },
});
let success = 0;
let failed = 0;
let skipped = 0;
for (const tenant of tenants) {
const hasFiel = await hasFielConfigured(tenant.id);
if (!hasFiel) {
skipped++;
continue;
}
try {
console.log(`[Opinion Cron] Consultando opinión para ${tenant.rfc}...`);
await consultarOpinion(tenant.id);
success++;
// Cleanup old records
const pool = await tenantDb.getPool(tenant.id, tenant.databaseName);
const deleted = await limpiarOpinionesAntiguas(pool);
if (deleted > 0) {
console.log(`[Opinion Cron] ${tenant.rfc}: ${deleted} opiniones antiguas eliminadas`);
}
} catch (error: any) {
console.error(`[Opinion Cron] Error para ${tenant.rfc}:`, error.message);
failed++;
}
}
console.log(`[Opinion Cron] Completado — éxito: ${success}, fallidos: ${failed}, sin FIEL: ${skipped}`);
}
async function runCsfJob(): Promise<void> {
console.log('[CSF Cron] Iniciando descarga mensual de Constancia de Situación Fiscal');
const tenants = await prisma.tenant.findMany({
where: { active: true },
select: { id: true, rfc: true },
});
let success = 0;
let failed = 0;
let skipped = 0;
for (const tenant of tenants) {
const hasFiel = await hasFielConfigured(tenant.id);
if (!hasFiel) { skipped++; continue; }
try {
console.log(`[CSF Cron] Consultando CSF para ${tenant.rfc}...`);
await consultarConstancia(tenant.id);
success++;
} catch (error: any) {
console.error(`[CSF Cron] Error para ${tenant.rfc}:`, error.message);
failed++;
}
// Delay entre tenants para no saturar al SAT y reducir bloqueos por IP
await new Promise(r => setTimeout(r, 30_000));
}
console.log(`[CSF Cron] Completado — éxito: ${success}, fallidos: ${failed}, sin FIEL: ${skipped}`);
}
function getYesterdayEnd(): Date {
const now = new Date();
return new Date(now.getFullYear(), now.getMonth(), now.getDate() - 1, 23, 59, 59);
}
async function hasIncompleteCfdis(pool: Pool, contribuyenteId: string): Promise<boolean> {
const { rows } = await pool.query<{ count: string }>(`
SELECT COUNT(*)::text as count
FROM cfdis
WHERE contribuyente_id = $1
AND status = 'Vigente'
AND tipo_comprobante IN ('I', 'E', 'P', 'N')
AND xml_original IS NULL
`, [contribuyenteId]);
return Number(rows[0]?.count || 0) > 0;
}
async function getOldestIncompleteCfdiDate(pool: Pool, contribuyenteId: string): Promise<Date | null> {
const { rows } = await pool.query<{ fecha_emision: Date | null }>(`
SELECT MIN(fecha_emision) as fecha_emision
FROM cfdis
WHERE contribuyente_id = $1
AND status = 'Vigente'
AND tipo_comprobante IN ('I', 'E', 'P', 'N')
AND xml_original IS NULL
`, [contribuyenteId]);
return rows[0]?.fecha_emision || null;
}
async function waitForRecoveryJob(jobId: string): Promise<void> {
while (true) {
const job = await prisma.satSyncJob.findUnique({ where: { id: jobId } });
if (!job || job.status === 'completed' || job.status === 'failed') {
return;
}
await new Promise(resolve => setTimeout(resolve, 60000));
}
}
async function recoverContribuyente(tenantId: string, databaseName: string, contribuyenteId: string): Promise<void> {
try {
const status = await getSyncStatus(tenantId, contribuyenteId);
if (status.hasActiveSync) {
console.log(`[SAT Recovery] ${contribuyenteId} tiene sync activo, omitiendo`);
return;
}
const pool = await tenantDb.getPool(tenantId, databaseName);
const hasIncomplete = await hasIncompleteCfdis(pool, contribuyenteId);
const lastDaily = await prisma.satSyncJob.findFirst({
where: { tenantId, contribuyenteId, type: 'daily' },
orderBy: { startedAt: 'desc' },
});
if (!hasIncomplete && lastDaily?.status !== 'failed') {
return;
}
const dateTo = getYesterdayEnd();
let dateFrom = new Date(dateTo.getFullYear() - 1, dateTo.getMonth(), dateTo.getDate());
if (hasIncomplete) {
const oldest = await getOldestIncompleteCfdiDate(pool, contribuyenteId);
if (oldest) {
dateFrom = new Date(oldest.getFullYear(), oldest.getMonth(), 1);
dateFrom.setMonth(dateFrom.getMonth() - 1);
}
}
console.log(`[SAT Recovery] Recuperando ${contribuyenteId}: ${dateFrom.toISOString()}${dateTo.toISOString()}`);
const jobId = await startSync(tenantId, 'initial', dateFrom, dateTo, contribuyenteId);
console.log(`[SAT Recovery] Job ${jobId} iniciado`);
await waitForRecoveryJob(jobId);
} catch (error: any) {
console.error(`[SAT Recovery] Error recuperando ${contribuyenteId}:`, error.message);
}
}
async function recoverTenant(tenantId: string): Promise<void> {
const tenant = await prisma.tenant.findUnique({
where: { id: tenantId },
select: { databaseName: true },
});
if (!tenant?.databaseName) return;
const pool = await tenantDb.getPool(tenantId, tenant.databaseName);
const { rows } = await pool.query<{ entidad_id: string }>('SELECT entidad_id FROM contribuyentes');
const contribuyenteIds = rows.map(r => r.entidad_id);
if (contribuyenteIds.length === 0) {
const status = await getSyncStatus(tenantId);
if (status.hasActiveSync) return;
const lastDaily = await prisma.satSyncJob.findFirst({
where: { tenantId, contribuyenteId: null, type: 'daily' },
orderBy: { startedAt: 'desc' },
});
if (lastDaily?.status === 'failed') {
const dateTo = getYesterdayEnd();
const dateFrom = new Date(dateTo.getFullYear() - 1, dateTo.getMonth(), dateTo.getDate());
console.log(`[SAT Recovery] Recuperando tenant legacy ${tenantId}`);
const jobId = await startSync(tenantId, 'initial', dateFrom, dateTo);
await waitForRecoveryJob(jobId);
}
return;
}
for (const contribuyenteId of contribuyenteIds) {
await recoverContribuyente(tenantId, tenant.databaseName, contribuyenteId);
}
}
export async function runRecoverySyncJob(): Promise<void> {
if (isRecoveryRunning) {
console.log('[SAT Recovery] Ya en ejecución, omitiendo');
return;
}
isRecoveryRunning = true;
console.log('[SAT Recovery] Iniciando job de recuperación');
try {
const tenantIds = await getTenantsWithFiel();
console.log(`[SAT Recovery] ${tenantIds.length} tenants con FIEL`);
for (const tenantId of tenantIds) {
await recoverTenant(tenantId);
}
console.log('[SAT Recovery] Job de recuperación completado');
} catch (error: any) {
console.error('[SAT Recovery] Error:', error.message);
} finally {
isRecoveryRunning = false;
}
}
let scheduledTask: ReturnType<typeof cron.schedule> | null = null;
let retryTask: ReturnType<typeof cron.schedule> | null = null;
let recoveryTask: ReturnType<typeof cron.schedule> | null = null;
let opinionTask: ReturnType<typeof cron.schedule> | null = null;
let csfTask: ReturnType<typeof cron.schedule> | null = null;
let incrementalTask: ReturnType<typeof cron.schedule> | null = null;
let subscriptionTask: ReturnType<typeof cron.schedule> | null = null;
let expiryRemindersTask: ReturnType<typeof cron.schedule> | null = null;
let watchdogTask: ReturnType<typeof cron.schedule> | null = null;
const RETRY_CRON_SCHEDULE = '0 * * * *'; // Cada hora
const WATCHDOG_CRON_SCHEDULE = '0 */2 * * *'; // Cada 2 horas — marca stale jobs como failed
/**
* Inicia el job programado
*/
export function startSatSyncJob(): void {
if (scheduledTask) {
console.log('[SAT Cron] Job ya está programado');
return;
}
// Validar expresión cron
if (!cron.validate(SYNC_CRON_SCHEDULE)) {
console.error('[SAT Cron] Expresión cron inválida:', SYNC_CRON_SCHEDULE);
return;
}
scheduledTask = cron.schedule(SYNC_CRON_SCHEDULE, runSyncJob, {
timezone: 'America/Mexico_City',
});
// Cron de reintentos: cada hora revisa si hay jobs pendientes de retry
retryTask = cron.schedule(RETRY_CRON_SCHEDULE, async () => {
try {
await retryTimedOutJobs();
} catch (error: any) {
console.error('[SAT Retry Cron] Error:', error.message);
}
}, {
timezone: 'America/Mexico_City',
});
// Cron de recuperación: 10:00 AM diario. Revisa si el sync diario falló o si
// hay CFDIs vigentes sin XML, y relanza un sync `initial` con rango extendido
// para completar los XML faltantes.
recoveryTask = cron.schedule(RECOVERY_CRON_SCHEDULE, async () => {
try {
await runRecoverySyncJob();
} catch (error: any) {
console.error('[SAT Recovery Cron] Error:', error.message);
}
}, {
timezone: 'America/Mexico_City',
});
// Cron watchdog: cada 2h marca como `failed` los jobs que quedaron stale
// (pending con nextRetryAt > 12h atrás, running con startedAt > 4h atrás).
// Thresholds sobreescribibles vía env (STALE_PENDING_HOURS / STALE_RUNNING_HOURS)
// — defaults razonables pensando en que un sync inicial típico termina
// en <2h y el retryCron corre cada hora.
watchdogTask = cron.schedule(WATCHDOG_CRON_SCHEDULE, async () => {
try {
const pendingHours = Number(process.env.STALE_PENDING_HOURS || 12);
const runningHours = Number(process.env.STALE_RUNNING_HOURS || 4);
const result = await sweepStaleSatJobs({ apply: true, pendingHours, runningHours });
if (result.pendingMarked + result.runningMarked > 0) {
console.log(`[SAT Watchdog] Marcados failed: pending=${result.pendingMarked} running=${result.runningMarked}`);
}
} catch (error: any) {
console.error('[SAT Watchdog] Error:', error.message);
}
}, {
timezone: 'America/Mexico_City',
});
opinionTask = cron.schedule(OPINION_CRON_SCHEDULE, async () => {
try {
await runOpinionJob();
} catch (error: any) {
console.error('[Opinion Cron] Error:', error.message);
}
}, {
timezone: 'America/Mexico_City',
});
csfTask = cron.schedule(CSF_CRON_SCHEDULE, async () => {
try {
await runCsfJob();
} catch (error: any) {
console.error('[CSF Cron] Error:', error.message);
}
}, {
timezone: 'America/Mexico_City',
});
incrementalTask = cron.schedule(INCREMENTAL_CRON_SCHEDULE, async () => {
try {
await runIncrementalSyncJob();
} catch (error: any) {
console.error('[SAT Cron Inc] Error:', error.message);
}
}, {
timezone: 'America/Mexico_City',
});
subscriptionTask = cron.schedule(SUBSCRIPTION_LIFECYCLE_CRON, async () => {
try {
const pending = await applyPendingChanges();
const trials = await expireTrials();
// Reset mensual de TimbreSuscripcion: para cada tenant cuyo periodoFin
// ya pasó, resetea usados=0 y avanza la ventana +1 mes/año. Los paquetes
// adicionales NO se tocan; su expiraEn = adquiridoEn + 1 año fijo.
const timbres = await resetExpiredMonthlyTimbres();
// Cleanup retención 5 años (CFF Art. 30): borra declaraciones provisionales
// viejas. Iteramos cada tenant activo. Por-tenant try/catch para que un
// tenant que falla no bloquee al resto.
let declsBorradas = 0;
let csfsBorradas = 0;
const tenants = await prisma.tenant.findMany({
where: { active: true },
select: { id: true, databaseName: true, rfc: true },
});
for (const t of tenants) {
try {
const pool = await tenantDb.getPool(t.id, t.databaseName);
const declResult = await purgeDeclaracionesAntiguas(pool);
declsBorradas += declResult.deleted;
const csfResult = await purgeConstanciasAntiguas(pool);
csfsBorradas += csfResult.deleted;
} catch (err: any) {
console.error(`[Cleanup] Tenant ${t.rfc} fallo en purge:`, err.message || err);
}
}
console.log(`[Subscription Cron] pending aplicados: ${pending.applied} (${pending.errors} errores), trials expirados: ${trials.expired}, timbres reseteados: ${timbres.reset}, declaraciones >5 años borradas: ${declsBorradas}, CSFs >5 años borradas: ${csfsBorradas}`);
} catch (error: any) {
console.error('[Subscription Cron] Error:', error.message);
}
}, {
timezone: 'America/Mexico_City',
});
// Cron 9:00 AM diario — emails pre-vencimiento (7d/3d/1d) y aviso 0d post-vencimiento.
// Idempotente vía `Subscription.lastReminderDay`.
expiryRemindersTask = cron.schedule(EXPIRY_REMINDERS_CRON, async () => {
try {
const result = await sendExpiryReminders();
if (result.sent > 0 || result.errors > 0) {
console.log(`[Expiry Reminders] enviados: ${result.sent}, reset por renovación: ${result.resetOnly}, skipped: ${result.skipped}, errores: ${result.errors}`);
}
} catch (error: any) {
console.error('[Expiry Reminders Cron] Error:', error.message);
}
}, {
timezone: 'America/Mexico_City',
});
console.log(`[SAT Cron] Job programado para: ${SYNC_CRON_SCHEDULE} (America/Mexico_City)`);
console.log(`[SAT Cron] Retry programado cada hora`);
console.log(`[SAT Recovery Cron] Programado para: ${RECOVERY_CRON_SCHEDULE} (America/Mexico_City)`);
console.log(`[Opinion Cron] Programado para: ${OPINION_CRON_SCHEDULE} (America/Mexico_City)`);
console.log(`[CSF Cron] Programado para: ${CSF_CRON_SCHEDULE} (America/Mexico_City)`);
console.log(`[SAT Cron Inc] Incremental Enterprise programado para: ${INCREMENTAL_CRON_SCHEDULE} (America/Mexico_City)`);
console.log(`[Subscription Cron] Lifecycle programado para: ${SUBSCRIPTION_LIFECYCLE_CRON} (America/Mexico_City)`);
console.log(`[SAT Watchdog] Programado para: ${WATCHDOG_CRON_SCHEDULE} (America/Mexico_City)`);
}
/**
* Detiene el job programado
*/
export function stopSatSyncJob(): void {
if (scheduledTask) {
scheduledTask.stop();
scheduledTask = null;
}
if (retryTask) {
retryTask.stop();
retryTask = null;
}
if (recoveryTask) {
recoveryTask.stop();
recoveryTask = null;
}
if (opinionTask) {
opinionTask.stop();
opinionTask = null;
}
if (csfTask) {
csfTask.stop();
csfTask = null;
}
if (incrementalTask) {
incrementalTask.stop();
incrementalTask = null;
}
if (subscriptionTask) {
subscriptionTask.stop();
subscriptionTask = null;
}
if (expiryRemindersTask) {
expiryRemindersTask.stop();
expiryRemindersTask = null;
}
if (watchdogTask) {
watchdogTask.stop();
watchdogTask = null;
}
console.log('[SAT Cron] Jobs detenidos');
}
/**
* Ejecuta manualmente el ciclo incremental Enterprise (para testing).
*/
export async function runIncrementalSyncJobManually(): Promise<void> {
await runIncrementalSyncJob();
}
/**
* Ejecuta el job manualmente (para testing o ejecución forzada)
*/
export async function runSatSyncJobManually(): Promise<void> {
await runSyncJob();
}
/**
* Obtiene información del próximo job programado
*/
export function getJobInfo(): { scheduled: boolean; expression: string; timezone: string } {
return {
scheduled: scheduledTask !== null,
expression: SYNC_CRON_SCHEDULE,
timezone: 'America/Mexico_City',
};
}