import { prisma, tenantDb } from '../../config/database.js'; import { getDecryptedFiel } from '../fiel.service.js'; import { getDecryptedFielContribuyente } from '../contribuyente-fiel.service.js'; import { markForInvalidation } from '../metricas.service.js'; import { createSatService, querySat, verifySatRequest, downloadSatPackage, type FielData, } from './sat-client.service.js'; import { processPackage, processMetadataPackage, extractXmlsFromZip, type CfdiParsed, type CfdiMetadata } from './sat-parser.service.js'; import { recomputarSaldoPendiente, uuidsAfectadosPorCfdi } from '../../utils/saldo.js'; import type { SatSyncJob, CfdiSyncType, SatSyncType } from '@horux/shared'; import type { Service } from '@nodecfdi/sat-ws-descarga-masiva'; import type { Pool } from 'pg'; import * as fs from 'fs'; import * as path from 'path'; const POLL_INTERVAL_MS = 60000; // 60 segundos const MAX_POLL_ATTEMPTS = 45; // 45 minutos máximo (45 × 60s) const YEARS_TO_SYNC = 6; // SAT solo permite descargar últimos 6 años /** * Política de retry por tipo de sync. * - `retryAtHours[i]` = horas DESDE startedAt para el retry i+1. * Los tiempos son ABSOLUTOS desde el inicio, no acumulativos desde el * intento anterior — así el retry 1 cae exactamente a startedAt+6h sin * importar cuánto tardó el intento original en hacer timeout. * - `maxRetries` = nº máximo de reintentos (después → status='failed'). * * Justificación de las políticas: * - daily/custom: el contador puede esperar 12h. 2 retries cubre fallas * transitorias del SAT sin atascar pendientes infinitamente. * - initial bootstrap: la primera sync de un tenant puede ser de 6 años * de datos — vale la pena más paciencia (24h del start es el último * intento). Si después de 24h sigue fallando, hay un problema estructural. * - incremental: corre cada 4h por el cron. Si una falla, la siguiente * ejecución cubrirá el gap (la ventana de 8h se solapa). Reintentar * duplicaría carga sin beneficio. */ const RETRY_POLICIES: Record<'daily' | 'custom' | 'initial' | 'incremental', { maxRetries: number; retryAtHours: number[]; }> = { daily: { maxRetries: 2, retryAtHours: [6, 12] }, custom: { maxRetries: 2, retryAtHours: [6, 12] }, initial: { maxRetries: 3, retryAtHours: [6, 12, 24] }, incremental: { maxRetries: 0, retryAtHours: [] }, }; function getRetryPolicy(job: { type: SatSyncType; isCustomRange: boolean }) { if (job.type === 'initial' && job.isCustomRange) return RETRY_POLICIES.custom; return RETRY_POLICIES[job.type]; } /** * Calcula `nextRetryAt` para el retry número `nextRetryNumber` (1-based). * Devuelve null si ya no hay retries disponibles según la política. */ function computeNextRetryAt( startedAt: Date, nextRetryNumber: number, policy: { retryAtHours: number[] }, ): Date | null { const idx = nextRetryNumber - 1; if (idx < 0 || idx >= policy.retryAtHours.length) return null; return new Date(startedAt.getTime() + policy.retryAtHours[idx] * 60 * 60 * 1000); } interface SyncContext { fielData: FielData; service: Service; rfc: string; tenantId: string; databaseName: string; contribuyenteId: string | null; getPool: () => Promise; } /** * Actualiza el progreso de un job */ async function updateJobProgress( jobId: string, updates: Partial<{ status: 'pending' | 'running' | 'completed' | 'failed'; satRequestId: string; satPackageIds: string[]; cfdisFound: number; cfdisDownloaded: number; cfdisInserted: number; cfdisUpdated: number; progressPercent: number; errorMessage: string; startedAt: Date; completedAt: Date; retryCount: number; nextRetryAt: Date; }> ): Promise { await prisma.satSyncJob.update({ where: { id: jobId }, data: updates, }); } /** * Obtiene o crea un RFC en la tabla rfcs del tenant */ async function getOrCreateRfc(pool: Pool, rfc: string, razonSocial: string | null, regimenFiscal?: string | null): Promise { const { rows } = await pool.query( `INSERT INTO rfcs (rfc, razon_social, regimen_fiscal) VALUES ($1, $2, $3) ON CONFLICT (rfc) DO UPDATE SET razon_social = COALESCE(NULLIF($2, ''), rfcs.razon_social), regimen_fiscal = CASE WHEN $3 IS NOT NULL AND $3 != '' THEN $3 ELSE rfcs.regimen_fiscal END RETURNING id`, [rfc, razonSocial || null, regimenFiscal || null] ); return rows[0].id; } /** * Guarda los XMLs extraídos del ZIP en disco para respaldo */ function saveXmlsToDisk( zipBase64: string, tenantRfc: string, tipoCfdi: CfdiSyncType, packageId: string ): string { const baseDir = path.join(process.cwd(), 'data', 'xmls', tenantRfc.toLowerCase(), tipoCfdi); fs.mkdirSync(baseDir, { recursive: true }); const xmlFiles = extractXmlsFromZip(zipBase64); const packageDir = path.join(baseDir, packageId); fs.mkdirSync(packageDir, { recursive: true }); for (const { filename, content } of xmlFiles) { fs.writeFileSync(path.join(packageDir, filename), content, 'utf-8'); } console.log(`[SAT] ${xmlFiles.length} XMLs guardados en ${packageDir}`); return packageDir; } /** * Guarda los CFDIs en la base de datos del tenant */ async function saveCfdis( pool: Pool, cfdis: CfdiParsed[], jobId: string, contribuyenteId: string | null = null ): Promise<{ inserted: number; updated: number }> { let inserted = 0; let updated = 0; for (const cfdi of cfdis) { try { const tc = cfdi.tipoCambio || 1; const m = (v: number) => v * tc; // compute MXN const fechaEmision = cfdi.fechaEmision; const year = String(fechaEmision.getFullYear()); const month = String(fechaEmision.getMonth() + 1).padStart(2, '0'); // Upsert RFCs y obtener IDs const rfcEmisorId = await getOrCreateRfc(pool, cfdi.rfcEmisor, cfdi.nombreEmisor, cfdi.regimenFiscalEmisor); const rfcReceptorId = await getOrCreateRfc(pool, cfdi.rfcReceptor, cfdi.nombreReceptor, cfdi.regimenFiscalReceptor); // Normaliza UUID a lowercase (RFC 4122 canonical) para evitar duplicados // entre el XML parser y el CSV metadata parser del SAT. const uuidNorm = cfdi.uuid ? cfdi.uuid.toLowerCase() : cfdi.uuid; // All values for the full column set const vals = [ year, month, cfdi.type, uuidNorm, cfdi.serie, cfdi.folio, cfdi.status, fechaEmision, rfcEmisorId, cfdi.rfcEmisor, cfdi.nombreEmisor, rfcReceptorId, cfdi.rfcReceptor, cfdi.nombreReceptor, cfdi.subtotal, m(cfdi.subtotal), cfdi.descuento, m(cfdi.descuento), cfdi.total, m(cfdi.total), cfdi.saldoInsoluto, cfdi.moneda, tc, cfdi.tipoComprobante, cfdi.metodoPago, cfdi.formaPago, cfdi.usoCfdi, cfdi.pac, cfdi.fechaCertSat, cfdi.uuidRelacionado, cfdi.isrRetencion, m(cfdi.isrRetencion), cfdi.ivaTraslado, m(cfdi.ivaTraslado), cfdi.ivaRetencion, m(cfdi.ivaRetencion), cfdi.iepsTraslado, m(cfdi.iepsTraslado), cfdi.iepsRetencion, m(cfdi.iepsRetencion), cfdi.impuestosLocalesTrasladado, m(cfdi.impuestosLocalesTrasladado), cfdi.impuestosLocalesRetenidos, m(cfdi.impuestosLocalesRetenidos), cfdi.montoPago, m(cfdi.montoPago), cfdi.fechaPagoP, cfdi.numParcialidad, cfdi.isrRetencionPago, m(cfdi.isrRetencionPago), cfdi.ivaTrasladoPago, m(cfdi.ivaTrasladoPago), cfdi.ivaRetencionPago, m(cfdi.ivaRetencionPago), cfdi.iepsTrasladoPago, m(cfdi.iepsTrasladoPago), cfdi.iepsRetencionPago, m(cfdi.iepsRetencionPago), cfdi.fechaPago, cfdi.fechaInicialPago, cfdi.fechaFinalPago, cfdi.numDiasPagados, cfdi.numSeguroSocial, cfdi.puesto, cfdi.salarioBaseCotApor, m(cfdi.salarioBaseCotApor), cfdi.salarioDiarioIntegrado, m(cfdi.salarioDiarioIntegrado), cfdi.totalPercepciones, m(cfdi.totalPercepciones), cfdi.totalDeducciones, m(cfdi.totalDeducciones), cfdi.impRetenidosNomina, m(cfdi.impRetenidosNomina), cfdi.otrasDeduccionesNomina, m(cfdi.otrasDeduccionesNomina), cfdi.subsidioCausado, m(cfdi.subsidioCausado), cfdi.regimenFiscalEmisor, cfdi.regimenFiscalReceptor, cfdi.codigoPostalReceptor, cfdi.xmlOriginal, cfdi.cfdiTipoRelacion, cfdi.cfdisRelacionados, jobId, ]; const { rows: existing } = await pool.query( `SELECT id FROM cfdis WHERE LOWER(uuid) = $1`, [uuidNorm] ); if (existing.length > 0) { // $1=uuid(WHERE), $2-$85=all vals (includes rfc_emisor_id, rfc_receptor_id, // cfdi_tipo_relacion, cfdis_relacionados) await pool.query( `UPDATE cfdis SET year=$2, month=$3, type=$4, uuid=$5, serie=$6, folio=$7, status=$8, fecha_emision=$9, rfc_emisor_id=$10, rfc_emisor=$11, nombre_emisor=$12, rfc_receptor_id=$13, rfc_receptor=$14, nombre_receptor=$15, subtotal=$16, subtotal_mxn=$17, descuento=$18, descuento_mxn=$19, total=$20, total_mxn=$21, saldo_insoluto=$22, moneda=$23, tipo_cambio=$24, tipo_comprobante=$25, metodo_pago=$26, forma_pago=$27, uso_cfdi=$28, pac=$29, fecha_cert_sat=$30, uuid_relacionado=$31, isr_retencion=$32, isr_retencion_mxn=$33, iva_traslado=$34, iva_traslado_mxn=$35, iva_retencion=$36, iva_retencion_mxn=$37, ieps_traslado=$38, ieps_traslado_mxn=$39, ieps_retencion=$40, ieps_retencion_mxn=$41, impuestos_locales_trasladado=$42, impuestos_locales_trasladado_mxn=$43, impuestos_locales_retenidos=$44, impuestos_locales_retenidos_mxn=$45, monto_pago=$46, monto_pago_mxn=$47, fecha_pago_p=$48, num_parcialidad=$49, isr_retencion_pago=$50, isr_retencion_pago_mxn=$51, iva_traslado_pago=$52, iva_traslado_pago_mxn=$53, iva_retencion_pago=$54, iva_retencion_pago_mxn=$55, ieps_traslado_pago=$56, ieps_traslado_pago_mxn=$57, ieps_retencion_pago=$58, ieps_retencion_pago_mxn=$59, fecha_pago=$60, fecha_inicial_pago=$61, fecha_final_pago=$62, num_dias_pagados=$63, num_seguro_social=$64, puesto=$65, salario_base_cot_apor=$66, salario_base_cot_apor_mxn=$67, salario_diario_integrado=$68, salario_diario_integrado_mxn=$69, total_percepciones=$70, total_percepciones_mxn=$71, total_deducciones=$72, total_deducciones_mxn=$73, imp_retenidos_nomina=$74, imp_retenidos_nomina_mxn=$75, otras_deducciones_nomina=$76, otras_deducciones_nomina_mxn=$77, subsidio_causado=$78, subsidio_causado_mxn=$79, regimen_fiscal_emisor=$80, regimen_fiscal_receptor=$81, codigo_postal_receptor=$82, xml_original=$83, cfdi_tipo_relacion=$84, cfdis_relacionados=$85, last_sat_sync=NOW(), sat_sync_job_id=$86::uuid, actualizado_en=NOW() WHERE uuid = $1`, [cfdi.uuid, ...vals] ); // Re-insert conceptos for updated CFDI await pool.query(`DELETE FROM cfdi_conceptos WHERE cfdi_id = $1`, [existing[0].id]); await saveConceptos(pool, existing[0].id, cfdi); updated++; } else { // $1-$83 = data fields (year..cfdis_relacionados), $84 = jobId, $85 = contribuyente_id const dataPlaceholders = vals.slice(0, -1).map((_, i) => `$${i + 1}`).join(','); await pool.query( `INSERT INTO cfdis ( year, month, type, uuid, serie, folio, status, fecha_emision, rfc_emisor_id, rfc_emisor, nombre_emisor, rfc_receptor_id, rfc_receptor, nombre_receptor, subtotal, subtotal_mxn, descuento, descuento_mxn, total, total_mxn, saldo_insoluto, moneda, tipo_cambio, tipo_comprobante, metodo_pago, forma_pago, uso_cfdi, pac, fecha_cert_sat, uuid_relacionado, isr_retencion, isr_retencion_mxn, iva_traslado, iva_traslado_mxn, iva_retencion, iva_retencion_mxn, ieps_traslado, ieps_traslado_mxn, ieps_retencion, ieps_retencion_mxn, impuestos_locales_trasladado, impuestos_locales_trasladado_mxn, impuestos_locales_retenidos, impuestos_locales_retenidos_mxn, monto_pago, monto_pago_mxn, fecha_pago_p, num_parcialidad, isr_retencion_pago, isr_retencion_pago_mxn, iva_traslado_pago, iva_traslado_pago_mxn, iva_retencion_pago, iva_retencion_pago_mxn, ieps_traslado_pago, ieps_traslado_pago_mxn, ieps_retencion_pago, ieps_retencion_pago_mxn, fecha_pago, fecha_inicial_pago, fecha_final_pago, num_dias_pagados, num_seguro_social, puesto, salario_base_cot_apor, salario_base_cot_apor_mxn, salario_diario_integrado, salario_diario_integrado_mxn, total_percepciones, total_percepciones_mxn, total_deducciones, total_deducciones_mxn, imp_retenidos_nomina, imp_retenidos_nomina_mxn, otras_deducciones_nomina, otras_deducciones_nomina_mxn, subsidio_causado, subsidio_causado_mxn, regimen_fiscal_emisor, regimen_fiscal_receptor, codigo_postal_receptor, xml_original, cfdi_tipo_relacion, cfdis_relacionados, source, sat_sync_job_id, last_sat_sync, contribuyente_id ) VALUES ( ${dataPlaceholders}, 'sat', $${vals.length}::uuid, NOW(), $${vals.length + 1} )`, [...vals, contribuyenteId] ); // Get the inserted cfdi id and save conceptos const { rows: [newRow] } = await pool.query(`SELECT id FROM cfdis WHERE uuid = $1`, [cfdi.uuid]); if (newRow) await saveConceptos(pool, newRow.id, cfdi); inserted++; } // Marcar el mes para recompute de métricas pre-calculadas. Para tipo P // el mes contable es el de fecha_pago_p (coherente con los cálculos // fiscales). Solo tiene sentido en tenants con contribuyentes. if (contribuyenteId) { const fechaContableRaw = cfdi.tipoComprobante === 'P' && cfdi.fechaPagoP ? cfdi.fechaPagoP : cfdi.fechaEmision; const fechaContable = fechaContableRaw instanceof Date ? fechaContableRaw : new Date(fechaContableRaw); const anio = fechaContable.getFullYear(); const mes = fechaContable.getMonth() + 1; await markForInvalidation(pool, contribuyenteId, anio, mes, 'SAT_SYNC_CFDI').catch( err => console.warn('[SAT] markForInvalidation falló:', err?.message || err), ); } } catch (error) { console.error(`[SAT] Error guardando CFDI ${cfdi.uuid}:`, error); } } // Recompute saldo_pendiente_mxn para todos los CFDIs afectados por este // batch: los I PPD recién insertados, más los I PPD referenciados por los // P y E no-07 que entraron. Un solo UPDATE agregado al final del loop es // más eficiente que uno por CFDI (la subquery del SALDO es costosa). const afectados = new Set(); for (const cfdi of cfdis) { for (const u of uuidsAfectadosPorCfdi(cfdi)) afectados.add(u); } if (afectados.size > 0) { try { await recomputarSaldoPendiente(pool, Array.from(afectados)); } catch (err: any) { console.warn(`[SAT] recomputarSaldoPendiente falló: ${err?.message || err}`); } } return { inserted, updated }; } /** * Guarda los conceptos de un CFDI en cfdi_conceptos */ async function saveConceptos(pool: Pool, cfdiId: number, cfdi: CfdiParsed): Promise { if (!cfdi.conceptos || cfdi.conceptos.length === 0) return; const tc = cfdi.tipoCambio || 1; const m = (v: number) => v * tc; for (const c of cfdi.conceptos) { await pool.query(` INSERT INTO cfdi_conceptos ( cfdi_id, clave_prod_serv, no_identificacion, descripcion, cantidad, clave_unidad, unidad, valor_unitario, valor_unitario_mxn, importe, importe_mxn, descuento, descuento_mxn, isr_retencion, isr_retencion_mxn, iva_traslado, iva_traslado_mxn, iva_retencion, iva_retencion_mxn, ieps_traslado, ieps_traslado_mxn, ieps_retencion, ieps_retencion_mxn ) VALUES ( $1,$2,$3,$4,$5,$6,$7,$8,$9,$10, $11,$12,$13,$14,$15,$16,$17,$18,$19,$20, $21,$22,$23 ) `, [ cfdiId, c.claveProdServ, c.noIdentificacion, c.descripcion, c.cantidad, c.claveUnidad, c.unidad, c.valorUnitario, m(c.valorUnitario), c.importe, m(c.importe), c.descuento, m(c.descuento), c.isrRetencion, m(c.isrRetencion), c.ivaTraslado, m(c.ivaTraslado), c.ivaRetencion, m(c.ivaRetencion), c.iepsTraslado, m(c.iepsTraslado), c.iepsRetencion, m(c.iepsRetencion), ]); } } /** * Guarda/actualiza CFDIs desde metadata del SAT. * - Si el CFDI no existe: inserta con datos básicos de metadata (sin XML). * - Si el CFDI ya existe y la metadata dice Cancelado: actualiza status + fecha_cancelacion. * - Si el CFDI ya existe y sigue Vigente: no toca nada (los datos del XML son más completos). */ async function saveMetadata( pool: Pool, items: CfdiMetadata[], jobId: string, contribuyenteId: string | null = null ): Promise<{ inserted: number; updated: number }> { let inserted = 0; let updated = 0; for (const m of items) { try { // Normaliza UUID a lowercase para evitar duplicados case-sensitive con // el XML parser (que también normaliza). CSV del SAT lo devuelve UPPERCASE. const uuidNorm = m.uuid ? m.uuid.toLowerCase() : m.uuid; const { rows: existing } = await pool.query( `SELECT id, status FROM cfdis WHERE LOWER(uuid) = $1`, [uuidNorm] ); if (existing.length > 0) { // Solo actualizar si cambió a Cancelado if (m.status === 'Cancelado' && existing[0].status !== 'Cancelado') { await pool.query( `UPDATE cfdis SET status = 'Cancelado', fecha_cancelacion = $2, actualizado_en = NOW() WHERE id = $1`, [existing[0].id, m.fechaCancelacion] ); updated++; } } else { // Insertar CFDI con datos básicos de metadata (sin XML) const year = String(m.fechaEmision.getFullYear()); const month = String(m.fechaEmision.getMonth() + 1).padStart(2, '0'); // Upsert RFCs const rfcEmisorId = await getOrCreateRfc(pool, m.rfcEmisor, m.nombreEmisor); const rfcReceptorId = await getOrCreateRfc(pool, m.rfcReceptor, m.nombreReceptor); await pool.query( `INSERT INTO cfdis ( year, month, type, uuid, status, fecha_emision, fecha_cancelacion, rfc_emisor_id, rfc_emisor, nombre_emisor, rfc_receptor_id, rfc_receptor, nombre_receptor, total, total_mxn, moneda, tipo_comprobante, pac, fecha_cert_sat, source, sat_sync_job_id, last_sat_sync, contribuyente_id ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $14, 'MXN', $15, $16, $17, 'sat-metadata', $18::uuid, NOW(), $19 )`, [ year, month, m.type, uuidNorm, m.status, m.fechaEmision, m.fechaCancelacion, rfcEmisorId, m.rfcEmisor, m.nombreEmisor, rfcReceptorId, m.rfcReceptor, m.nombreReceptor, m.monto, m.tipoComprobante, m.rfcPac, m.fechaCertSat, jobId, contribuyenteId, ] ); inserted++; } // Invalidar metricas: tanto insert como status→Cancelado afectan el mes. if (contribuyenteId) { const anio = m.fechaEmision.getFullYear(); const mes = m.fechaEmision.getMonth() + 1; await markForInvalidation(pool, contribuyenteId, anio, mes, 'SAT_METADATA').catch( err => console.warn('[SAT] markForInvalidation falló:', err?.message || err), ); } } catch (error: any) { console.error(`[SAT] Error guardando metadata CFDI ${m.uuid}:`, error.message); } } return { inserted, updated }; } /** * Construye el identificador único de un request dentro de un job. * Hay 2-N requests por job (daily=4, initial=N×4) — necesitamos distinguirlos * para reusar el correcto en retries. */ function makeRequestKindKey( fechaInicio: Date, fechaFin: Date, tipoCfdi: CfdiSyncType, requestType: 'cfdi' | 'metadata', ): string { return `${requestType}-${tipoCfdi}-${fechaInicio.toISOString().slice(0, 10)}-${fechaFin.toISOString().slice(0, 10)}`; } /** * Persiste un (kindKey, requestId) en el mapa `sat_request_ids` con merge * atómico SQL. Evita race conditions vs read-modify-write desde JS. * Sigue actualizando `satRequestId` (singular) para backward compat. */ async function persistSatRequestId(jobId: string, kindKey: string, requestId: string): Promise { await prisma.$executeRawUnsafe( `UPDATE sat_sync_jobs SET sat_request_ids = COALESCE(sat_request_ids, '{}'::jsonb) || $1::jsonb, sat_request_id = $2 WHERE id = $3`, JSON.stringify({ [kindKey]: requestId }), requestId, jobId, ); } /** * Solicita, espera y descarga paquetes del SAT para un rango+tipo+requestType. * Retorna los contenidos base64 de los paquetes descargados. * * Reuso de requestIds en retries: * - Antes de crear una nueva solicitud al SAT, busca si el job ya tiene un * requestId guardado para esta misma kindKey (mismo rango+tipo+requestType). * - Si existe: hace `verifySatRequest` directo. Si está listo → descarga. * Si está pending/processing → entra al polling con ese mismo id. * Si está failed/rejected o el verify lanza excepción → fallback a crear nuevo. * - Esto evita quemar cuota del SAT en cada reintento (límite ~5 solicitudes * activas por RFC). */ async function requestAndDownload( ctx: SyncContext, jobId: string, fechaInicio: Date, fechaFin: Date, tipoCfdi: CfdiSyncType, requestType: 'cfdi' | 'metadata', ): Promise<{ packageContents: string[]; totalCfdis: number }> { const label = `${tipoCfdi}/${requestType}`; const kindKey = makeRequestKindKey(fechaInicio, fechaFin, tipoCfdi, requestType); // Intentar reusar requestId previo del mismo job/kindKey (caso retry) const jobRow = await prisma.satSyncJob.findUnique({ where: { id: jobId }, select: { satRequestIds: true, tenantId: true, contribuyenteId: true, dateFrom: true, dateTo: true }, }); let existingMap = (jobRow?.satRequestIds as Record | null) || {}; // Si no existe en el job actual, buscar en el job más reciente del mismo tenant/contribuyente // SOLO si el rango de fechas es idéntico (mismo dateFrom/dateTo). if (!existingMap[kindKey]) { const previousJob = await prisma.satSyncJob.findFirst({ where: { tenantId: jobRow?.tenantId, contribuyenteId: jobRow?.contribuyenteId ?? null, id: { not: jobId }, dateFrom: jobRow?.dateFrom, dateTo: jobRow?.dateTo, }, orderBy: { createdAt: 'desc' }, select: { satRequestIds: true }, }); if (previousJob?.satRequestIds) { const prevMap = previousJob.satRequestIds as Record; if (prevMap[kindKey]) { console.log(`[SAT] Reutilizando requestId de job previo (${label}): ${prevMap[kindKey]}`); // Copiar al job actual para futuros usos await persistSatRequestId(jobId, kindKey, prevMap[kindKey]); existingMap = { ...existingMap, [kindKey]: prevMap[kindKey] }; } } } let requestId: string | null = existingMap[kindKey] || null; let verifyResult: Awaited> | undefined; if (requestId) { console.log(`[SAT] Reusando requestId previo (${label}): ${requestId}`); try { verifyResult = await verifySatRequest(ctx.service, requestId); console.log(`[SAT] Estado del request reusado (${label}): ${verifyResult.status}`); // Estados terminales inválidos → descartar y crear nuevo if (verifyResult.status === 'failed' || verifyResult.status === 'rejected') { console.log(`[SAT] Request reusado en estado ${verifyResult.status}, creando nuevo`); requestId = null; verifyResult = undefined; } } catch (err: any) { // El SAT a veces devuelve errores raros para requestIds expirados (>72h). // Defensivo: descartar y crear nuevo. console.warn(`[SAT] Verify del request reusado falló (${err.message}), creando nuevo`); requestId = null; verifyResult = undefined; } } // Si no hay requestId válido reusable, crear nuevo if (!requestId) { console.log(`[SAT] Solicitando ${label} desde ${fechaInicio.toISOString()} hasta ${fechaFin.toISOString()}`); const queryResult = await querySat(ctx.service, fechaInicio, fechaFin, tipoCfdi, requestType); if (!queryResult.success) { if (queryResult.statusCode === '5004') { console.log(`[SAT] No se encontraron CFDIs (${label})`); return { packageContents: [], totalCfdis: 0 }; } throw new Error(`Error SAT (${label}): ${queryResult.message}`); } requestId = queryResult.requestId!; console.log(`[SAT] Nueva solicitud creada (${label}): ${requestId}`); await persistSatRequestId(jobId, kindKey, requestId); } // Polling — si el reuse ya devolvió `ready`, salta el loop directamente. if (!verifyResult || verifyResult.status !== 'ready') { let attempts = 0; while (attempts < MAX_POLL_ATTEMPTS) { await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS)); attempts++; verifyResult = await verifySatRequest(ctx.service, requestId); console.log(`[SAT] Estado ${label}: ${verifyResult.status} (intento ${attempts})`); if (verifyResult.status === 'ready') break; if (verifyResult.status === 'failed' || verifyResult.status === 'rejected') { throw new Error(`Solicitud fallida (${label}): ${verifyResult.message}`); } } } if (!verifyResult || verifyResult.status !== 'ready') { throw new Error(`Timeout esperando respuesta del SAT (${label})`); } const packageContents: string[] = []; for (let i = 0; i < verifyResult.packageIds.length; i++) { const packageId = verifyResult.packageIds[i]; console.log(`[SAT] Descargando paquete ${label} ${i + 1}/${verifyResult.packageIds.length}: ${packageId}`); const downloadResult = await downloadSatPackage(ctx.service, packageId); if (!downloadResult.success) { console.error(`[SAT] Error descargando paquete ${packageId}: ${downloadResult.message}`); continue; } // Guardar en disco como respaldo if (requestType === 'cfdi') { try { saveXmlsToDisk(downloadResult.packageContent, ctx.rfc, tipoCfdi, packageId); } catch (err: any) { console.error(`[SAT] Error guardando XMLs en disco: ${err.message}`); } } packageContents.push(downloadResult.packageContent); } return { packageContents, totalCfdis: verifyResult.totalCfdis }; } /** * Procesa una solicitud de descarga para un rango de fechas. * 1) Descarga XMLs de CFDIs vigentes → INSERT/UPDATE completo * 2) Descarga metadata de todos (vigentes+cancelados) → INSERT básico o UPDATE status */ async function processDateRange( ctx: SyncContext, jobId: string, fechaInicio: Date, fechaFin: Date, tipoCfdi: CfdiSyncType, skipJobUpdate = false ): Promise<{ found: number; downloaded: number; inserted: number; updated: number }> { let totalFound = 0; let totalDownloaded = 0; let totalInserted = 0; let totalUpdated = 0; // Solo XMLs de vigentes (datos completos) try { const { packageContents, totalCfdis } = await requestAndDownload( ctx, jobId, fechaInicio, fechaFin, tipoCfdi, 'cfdi' ); totalFound += totalCfdis; for (const content of packageContents) { const cfdis = processPackage(content, tipoCfdi); totalDownloaded += cfdis.length; console.log(`[SAT] Procesando ${cfdis.length} CFDIs XML del paquete`); const { inserted, updated } = await saveCfdis(await ctx.getPool(), cfdis, jobId, ctx.contribuyenteId); totalInserted += inserted; totalUpdated += updated; } } catch (error: any) { console.error(`[SAT] Error en XMLs ${tipoCfdi}: ${error.message}`); } if (!skipJobUpdate) { await updateJobProgress(jobId, { cfdisFound: totalFound, cfdisDownloaded: totalDownloaded, cfdisInserted: totalInserted, cfdisUpdated: totalUpdated, }); } return { found: totalFound, downloaded: totalDownloaded, inserted: totalInserted, updated: totalUpdated, }; } /** * Descarga y procesa metadata de un rango de fechas para un tipo de CFDI. * Metadata incluye vigentes + cancelados. */ async function processMetadataRange( ctx: SyncContext, jobId: string, fechaInicio: Date, fechaFin: Date, tipoCfdi: CfdiSyncType ): Promise<{ inserted: number; updated: number }> { let totalInserted = 0; let totalUpdated = 0; try { const { packageContents } = await requestAndDownload( ctx, jobId, fechaInicio, fechaFin, tipoCfdi, 'metadata' ); for (const content of packageContents) { const items = processMetadataPackage(content, tipoCfdi); console.log(`[SAT] Procesando ${items.length} registros de metadata ${tipoCfdi}`); const { inserted, updated } = await saveMetadata(await ctx.getPool(), items, jobId, ctx.contribuyenteId); totalInserted += inserted; totalUpdated += updated; } } catch (error: any) { console.error(`[SAT] Error en metadata ${tipoCfdi}: ${error.message}`); } return { inserted: totalInserted, updated: totalUpdated }; } /** * Determina el tamaño de bloque óptimo consultando metadata del rango completo. * <= 15,000 CFDIs → bloques de 6 meses * > 15,000 CFDIs → bloques de 2 meses */ async function determineChunkMonths( ctx: SyncContext, jobId: string, fechaInicio: Date, fechaFin: Date, ): Promise { const THRESHOLD = 15_000; let totalCfdis = 0; for (const tipo of ['emitidos', 'recibidos'] as const) { try { const { totalCfdis: count } = await requestAndDownload( ctx, jobId, fechaInicio, fechaFin, tipo, 'metadata' ); totalCfdis += count; console.log(`[SAT] Sondeo metadata ${tipo}: ${count} CFDIs en rango completo`); } catch (error: any) { console.log(`[SAT] No se pudo sondear metadata ${tipo}: ${error.message}`); } } const chunkMonths = totalCfdis > THRESHOLD ? 3 : 6; console.log(`[SAT] Total estimado: ${totalCfdis} CFDIs → bloques de ${chunkMonths} meses`); return chunkMonths; } /** * Genera bloques de fechas segmentados por N meses. */ function generateChunks(fechaInicio: Date, fechaFin: Date, chunkMonths: number): { start: Date; end: Date }[] { const chunks: { start: Date; end: Date }[] = []; let current = new Date(fechaInicio); while (current < fechaFin) { const chunkEnd = new Date(current.getFullYear(), current.getMonth() + chunkMonths, 0, 23, 59, 59); const end = chunkEnd > fechaFin ? fechaFin : chunkEnd; chunks.push({ start: new Date(current), end }); current = new Date(current.getFullYear(), current.getMonth() + chunkMonths, 1); } return chunks; } /** * Ejecuta sincronización inicial o por rango personalizado. * - XMLs: bloques de 3 o 6 meses (según volumen) * - Metadata: bloques anuales (siempre) */ async function processInitialSync( ctx: SyncContext, jobId: string, customDateFrom?: Date, customDateTo?: Date ): Promise { const ahora = new Date(); // Exactamente 6 años atrás desde hoy (mismo día del mes), no inicio de mes. // El SAT rechaza "mayor a 6 años" si usamos el día 1 del mes hace 6 años. const inicioHistorico = customDateFrom || new Date(ahora.getFullYear() - YEARS_TO_SYNC, ahora.getMonth(), ahora.getDate()); const fechaFin = customDateTo || ahora; // Paso 1: Sondeo — determinar tamaño de bloque para XMLs const chunkMonths = await determineChunkMonths(ctx, jobId, inicioHistorico, fechaFin); const xmlChunks = generateChunks(inicioHistorico, fechaFin, chunkMonths); const metaChunks = generateChunks(inicioHistorico, fechaFin, 36); // bloques de 3 años console.log(`[SAT] Sincronización: ${xmlChunks.length} bloques XML (${chunkMonths}m) + ${metaChunks.length} bloques metadata (36m)`); let totalFound = 0; let totalDownloaded = 0; let totalInserted = 0; let totalUpdated = 0; const totalSteps = xmlChunks.length * 2 + metaChunks.length * 2; // emitidos + recibidos por cada chunk let completedSteps = 0; // Helper para actualizar progreso acumulado async function reportProgress() { completedSteps++; const progressPercent = totalSteps > 0 ? Math.round((completedSteps / totalSteps) * 100) : 0; await updateJobProgress(jobId, { cfdisFound: totalFound, cfdisDownloaded: totalDownloaded, cfdisInserted: totalInserted, cfdisUpdated: totalUpdated, progressPercent, }); } // Paso 2: Descargar XMLs de vigentes (bloques de 3/6 meses) for (let i = 0; i < xmlChunks.length; i++) { const { start, end } = xmlChunks[i]; console.log(`[SAT] XML bloque ${i + 1}/${xmlChunks.length}: ${start.toISOString().slice(0, 10)} → ${end.toISOString().slice(0, 10)}`); try { const emitidos = await processDateRange(ctx, jobId, start, end, 'emitidos', true); totalFound += emitidos.found; totalDownloaded += emitidos.downloaded; totalInserted += emitidos.inserted; totalUpdated += emitidos.updated; } catch (error: any) { console.error(`[SAT] Error emitidos XML bloque ${i + 1}:`, error.message); } await reportProgress(); try { const recibidos = await processDateRange(ctx, jobId, start, end, 'recibidos', true); totalFound += recibidos.found; totalDownloaded += recibidos.downloaded; totalInserted += recibidos.inserted; totalUpdated += recibidos.updated; } catch (error: any) { console.error(`[SAT] Error recibidos XML bloque ${i + 1}:`, error.message); } await reportProgress(); await new Promise(resolve => setTimeout(resolve, 5000)); } // Paso 3: Descargar metadata (bloques anuales) for (let i = 0; i < metaChunks.length; i++) { const { start, end } = metaChunks[i]; console.log(`[SAT] Metadata bloque ${i + 1}/${metaChunks.length}: ${start.toISOString().slice(0, 10)} → ${end.toISOString().slice(0, 10)}`); try { const { inserted, updated } = await processMetadataRange(ctx, jobId, start, end, 'emitidos'); totalInserted += inserted; totalUpdated += updated; } catch (error: any) { console.error(`[SAT] Error metadata emitidos bloque ${i + 1}:`, error.message); } await reportProgress(); try { const { inserted, updated } = await processMetadataRange(ctx, jobId, start, end, 'recibidos'); totalInserted += inserted; totalUpdated += updated; } catch (error: any) { console.error(`[SAT] Error metadata recibidos bloque ${i + 1}:`, error.message); } await reportProgress(); await new Promise(resolve => setTimeout(resolve, 5000)); } await updateJobProgress(jobId, { cfdisFound: totalFound, cfdisDownloaded: totalDownloaded, cfdisInserted: totalInserted, cfdisUpdated: totalUpdated, progressPercent: 100, }); } /** * Ejecuta sincronización diaria (mes actual) */ /** * Procesa un rango personalizado de fechas. * Si el rango supera 6 meses, divide en bloques de 6 meses. * Si no, usa el rango directamente. */ async function processCustomRangeSync( ctx: SyncContext, jobId: string, dateFrom: Date, dateTo: Date ): Promise { const diffMs = dateTo.getTime() - dateFrom.getTime(); const diffMonths = diffMs / (1000 * 60 * 60 * 24 * 30); const MAX_MONTHS_DIRECT = 6; let totalFound = 0; let totalDownloaded = 0; let totalInserted = 0; let totalUpdated = 0; if (diffMonths <= MAX_MONTHS_DIRECT) { // Rango <= 6 meses: solicitud directa sin dividir console.log(`[SAT] Rango personalizado (${diffMonths.toFixed(1)} meses) — solicitud directa`); for (const tipo of ['emitidos', 'recibidos'] as const) { try { const result = await processDateRange(ctx, jobId, dateFrom, dateTo, tipo); totalFound += result.found; totalDownloaded += result.downloaded; totalInserted += result.inserted; totalUpdated += result.updated; } catch (error: any) { console.error(`[SAT] Error ${tipo} rango personalizado:`, error.message); } } } else { // Rango > 6 meses: dividir en bloques de 6 meses const chunks = generateChunks(dateFrom, dateTo, 6); console.log(`[SAT] Rango personalizado (${diffMonths.toFixed(1)} meses) — ${chunks.length} bloques de 6 meses`); for (let i = 0; i < chunks.length; i++) { const { start, end } = chunks[i]; console.log(`[SAT] Bloque ${i + 1}/${chunks.length}: ${start.toISOString().slice(0, 10)} → ${end.toISOString().slice(0, 10)}`); for (const tipo of ['emitidos', 'recibidos'] as const) { try { const result = await processDateRange(ctx, jobId, start, end, tipo); totalFound += result.found; totalDownloaded += result.downloaded; totalInserted += result.inserted; totalUpdated += result.updated; } catch (error: any) { console.error(`[SAT] Error ${tipo} bloque ${i + 1}:`, error.message); } } // Pausa entre bloques para no saturar el SAT if (i < chunks.length - 1) { await new Promise(resolve => setTimeout(resolve, 5000)); } } } // Metadata: siempre el rango completo (incluye cancelados) for (const tipo of ['emitidos', 'recibidos'] as const) { try { const { inserted, updated } = await processMetadataRange(ctx, jobId, dateFrom, dateTo, tipo); totalInserted += inserted; totalUpdated += updated; } catch (error: any) { console.error(`[SAT] Error metadata ${tipo} rango personalizado:`, error.message); } } await updateJobProgress(jobId, { cfdisFound: totalFound, cfdisDownloaded: totalDownloaded, cfdisInserted: totalInserted, cfdisUpdated: totalUpdated, }); } /** * Sincronización incremental: ventana fija de las últimas 8 horas. * Diseñada para correr 3 veces al día (11:00, 15:00, 19:00) en clientes Enterprise. * La ventana de 8h cubre el gap máximo (03:00 → 11:00) entre el daily y el primer * incremental; los disparos siguientes solapan, pero la unicidad del UUID deduplica. * Fuera de ese rango, el daily de 03:00 se encarga. */ const INCREMENTAL_WINDOW_HOURS = 8; async function processIncrementalSync(ctx: SyncContext, jobId: string): Promise { const ahora = new Date(); const desde = new Date(ahora.getTime() - INCREMENTAL_WINDOW_HOURS * 60 * 60 * 1000); let totalFound = 0; let totalDownloaded = 0; let totalInserted = 0; let totalUpdated = 0; console.log(`[SAT] Incremental: ${desde.toISOString()} → ${ahora.toISOString()} (${INCREMENTAL_WINDOW_HOURS}h)`); for (const tipo of ['emitidos', 'recibidos'] as const) { try { const result = await processDateRange(ctx, jobId, desde, ahora, tipo); totalFound += result.found; totalDownloaded += result.downloaded; totalInserted += result.inserted; totalUpdated += result.updated; } catch (error: any) { console.error(`[SAT] Error incremental XMLs ${tipo}:`, error.message); } } for (const tipo of ['emitidos', 'recibidos'] as const) { try { const { inserted, updated } = await processMetadataRange(ctx, jobId, desde, ahora, tipo); totalInserted += inserted; totalUpdated += updated; } catch (error: any) { console.error(`[SAT] Error incremental metadata ${tipo}:`, error.message); } } await updateJobProgress(jobId, { cfdisFound: totalFound, cfdisDownloaded: totalDownloaded, cfdisInserted: totalInserted, cfdisUpdated: totalUpdated, }); } async function processDailySync(ctx: SyncContext, jobId: string): Promise { const ahora = new Date(); const inicioAño = new Date(ahora.getFullYear(), 0, 1); const hace7Dias = new Date(ahora.getTime() - 7 * 24 * 60 * 60 * 1000); let totalFound = 0; let totalDownloaded = 0; let totalInserted = 0; let totalUpdated = 0; // Paso 1: XMLs de los últimos 7 días (CFDIs nuevos) console.log(`[SAT] Daily: XMLs desde ${hace7Dias.toISOString().slice(0, 10)} → ${ahora.toISOString().slice(0, 10)}`); for (const tipo of ['emitidos', 'recibidos'] as const) { try { const result = await processDateRange(ctx, jobId, hace7Dias, ahora, tipo); totalFound += result.found; totalDownloaded += result.downloaded; totalInserted += result.inserted; totalUpdated += result.updated; } catch (error: any) { console.error(`[SAT] Error XMLs ${tipo} (7 días):`, error.message); } } // Paso 2: Metadata del ciclo fiscal actual (enero → hoy) // Captura cancelaciones y cambios de status del año completo console.log(`[SAT] Daily: Metadata desde ${inicioAño.toISOString().slice(0, 10)} → ${ahora.toISOString().slice(0, 10)}`); for (const tipo of ['emitidos', 'recibidos'] as const) { try { const { inserted, updated } = await processMetadataRange(ctx, jobId, inicioAño, ahora, tipo); totalInserted += inserted; totalUpdated += updated; } catch (error: any) { console.error(`[SAT] Error metadata ${tipo} (ciclo fiscal):`, error.message); } } await updateJobProgress(jobId, { cfdisFound: totalFound, cfdisDownloaded: totalDownloaded, cfdisInserted: totalInserted, cfdisUpdated: totalUpdated, }); } /** * Inicia la sincronización con el SAT */ export async function startSync( tenantId: string, type: SatSyncType = 'daily', dateFrom?: Date, dateTo?: Date, contribuyenteId?: string ): Promise { // Try per-contribuyente FIEL first (despachos), then legacy (Horux360) let decryptedFiel = null; if (contribuyenteId) { const tenant = await prisma.tenant.findUnique({ where: { id: tenantId }, select: { databaseName: true } }); if (tenant) { const pool = await tenantDb.getPool(tenantId, tenant.databaseName); decryptedFiel = await getDecryptedFielContribuyente(pool, contribuyenteId); } } if (!decryptedFiel) { decryptedFiel = await getDecryptedFiel(tenantId); } if (!decryptedFiel) { throw new Error('No hay FIEL configurada o está vencida'); } const fielData: FielData = { cerContent: decryptedFiel.cerContent, keyContent: decryptedFiel.keyContent, password: decryptedFiel.password, }; const service = createSatService(fielData); const tenant = await prisma.tenant.findUnique({ where: { id: tenantId }, select: { databaseName: true }, }); if (!tenant) { throw new Error('Tenant no encontrado'); } // Lock a nivel (tenantId, contribuyenteId). Contribuyentes distintos dentro // del mismo tenant (despacho) pueden sincronizarse en paralelo — cada uno // usa su propio FIEL y su propio conjunto de CFDIs. El null de Horux 360 // (tenant-wide) solo se bloquea contra sí mismo. const activeSync = await prisma.satSyncJob.findFirst({ where: { tenantId, contribuyenteId: contribuyenteId ?? null, status: { in: ['pending', 'running'] }, }, }); if (activeSync) { throw new Error('Ya hay una sincronización en curso'); } const now = new Date(); // `isCustomRange` solo aplica a 'initial' con fechas explícitas del UI. // Bootstrap puro = initial sin fechas → usa default de 6 años atrás. const isCustomRange = type === 'initial' && (!!dateFrom || !!dateTo); const job = await prisma.satSyncJob.create({ data: { tenantId, contribuyenteId: contribuyenteId || null, type, status: 'running', dateFrom: dateFrom || new Date(now.getFullYear() - YEARS_TO_SYNC, 0, 1), dateTo: dateTo || now, startedAt: now, isCustomRange, }, }); const ctx: SyncContext = { fielData, service, rfc: decryptedFiel.rfc, tenantId, databaseName: tenant.databaseName, contribuyenteId: contribuyenteId || null, getPool: () => tenantDb.getPool(tenantId, tenant.databaseName), }; // Ejecutar sincronización en background (async () => { try { if (type === 'initial') { await processInitialSync(ctx, job.id, dateFrom, dateTo); } else if (type === 'incremental') { await processIncrementalSync(ctx, job.id); } else if (dateFrom && dateTo) { await processCustomRangeSync(ctx, job.id, dateFrom, dateTo); } else { await processDailySync(ctx, job.id); } await updateJobProgress(job.id, { status: 'completed', completedAt: new Date(), progressPercent: 100, }); console.log(`[SAT] Sincronización ${job.id} completada`); } catch (error: any) { console.error(`[SAT] Error en sincronización ${job.id}:`, error); const isTimeout = error.message?.includes('Timeout'); const currentRetries = job.retryCount || 0; const policy = getRetryPolicy(job); const nextRetryNumber = currentRetries + 1; const nextRetry = isTimeout && nextRetryNumber <= policy.maxRetries ? computeNextRetryAt(job.startedAt!, nextRetryNumber, policy) : null; if (nextRetry) { await updateJobProgress(job.id, { status: 'pending', errorMessage: `Timeout (intento ${nextRetryNumber}/${policy.maxRetries}). Reintento programado para ${nextRetry.toLocaleString('es-MX')}.`, retryCount: nextRetryNumber, nextRetryAt: nextRetry, }); console.log(`[SAT] Job ${job.id} programado para reintento ${nextRetryNumber}/${policy.maxRetries} a las ${nextRetry.toLocaleString('es-MX')}`); } else { // Sin reintentos restantes, error no-timeout, o policy con maxRetries=0 (incremental) const finalMsg = isTimeout ? policy.maxRetries === 0 ? 'Timeout en sync incremental — sin reintentos por política. Próximo cron incremental cubrirá el gap.' : 'Fallo conexión SAT, vuelve a intentar con un rango de fechas menor.' : error.message; await updateJobProgress(job.id, { status: 'failed', errorMessage: finalMsg, completedAt: new Date(), }); } } })(); return job.id; } /** * Reintenta jobs de SAT que tienen nextRetryAt pasado. * Llamado por el cron cada hora. */ export async function retryTimedOutJobs(): Promise { // No filtramos por retryCount aquí porque el max es per-policy (varía por // type + isCustomRange). El catch del retry ya valida y marca failed si // se excede. Los jobs con maxRetries=0 nunca llegan a status='pending', // van directo a 'failed', así que no aparecen acá. const pendingJobs = await prisma.satSyncJob.findMany({ where: { status: 'pending', nextRetryAt: { lte: new Date() }, }, include: { tenant: { select: { id: true, databaseName: true, rfc: true } } }, }); if (pendingJobs.length === 0) return; console.log(`[SAT Retry] ${pendingJobs.length} job(s) pendientes de reintento`); for (const job of pendingJobs) { try { // Verificar que no haya otro sync activo para el MISMO (tenant, contribuyente). // Contribuyentes distintos pueden correr en paralelo. const activeSync = await prisma.satSyncJob.findFirst({ where: { tenantId: job.tenantId, contribuyenteId: job.contribuyenteId ?? null, status: 'running', }, }); if (activeSync) { console.log(`[SAT Retry] (${job.tenant.rfc}, contrib=${job.contribuyenteId || 'tenant-wide'}) tiene sync activo, posponiendo`); await updateJobProgress(job.id, { nextRetryAt: new Date(Date.now() + 60 * 60 * 1000), // +1 hora }); continue; } console.log(`[SAT Retry] Reintentando job ${job.id} (${job.tenant.rfc}), intento ${job.retryCount}/${getRetryPolicy(job).maxRetries}`); // Try per-contribuyente FIEL first, then legacy let decryptedFiel = null; if (job.contribuyenteId) { const pool = await tenantDb.getPool(job.tenantId, job.tenant.databaseName); decryptedFiel = await getDecryptedFielContribuyente(pool, job.contribuyenteId); } if (!decryptedFiel) { decryptedFiel = await getDecryptedFiel(job.tenantId); } if (!decryptedFiel) { await updateJobProgress(job.id, { status: 'failed', errorMessage: 'FIEL no disponible para reintento', completedAt: new Date(), }); continue; } const service = createSatService({ cerContent: decryptedFiel.cerContent, keyContent: decryptedFiel.keyContent, password: decryptedFiel.password, }); const ctx: SyncContext = { fielData: { cerContent: decryptedFiel.cerContent, keyContent: decryptedFiel.keyContent, password: decryptedFiel.password, }, service, rfc: decryptedFiel.rfc, tenantId: job.tenantId, databaseName: job.tenant.databaseName, contribuyenteId: job.contribuyenteId ?? null, getPool: () => tenantDb.getPool(job.tenantId, job.tenant.databaseName), }; await updateJobProgress(job.id, { status: 'running', errorMessage: null as any }); // Re-ejecutar según tipo original try { if (job.type === 'initial') { await processInitialSync(ctx, job.id, job.dateFrom, job.dateTo); } else if (job.type === 'incremental') { await processIncrementalSync(ctx, job.id); } else { await processDailySync(ctx, job.id); } await updateJobProgress(job.id, { status: 'completed', completedAt: new Date(), progressPercent: 100, errorMessage: null as any, }); console.log(`[SAT Retry] Job ${job.id} completado en reintento ${job.retryCount}`); } catch (retryError: any) { console.error(`[SAT Retry] Job ${job.id} falló de nuevo:`, retryError.message); const isTimeout = retryError.message?.includes('Timeout'); const policy = getRetryPolicy(job); const nextRetryNumber = job.retryCount + 1; const nextRetry = isTimeout && nextRetryNumber <= policy.maxRetries ? computeNextRetryAt(job.startedAt!, nextRetryNumber, policy) : null; if (nextRetry) { await updateJobProgress(job.id, { status: 'pending', errorMessage: `Timeout (intento ${nextRetryNumber}/${policy.maxRetries}). Reintento programado para ${nextRetry.toLocaleString('es-MX')}.`, retryCount: nextRetryNumber, nextRetryAt: nextRetry, }); } else { await updateJobProgress(job.id, { status: 'failed', errorMessage: isTimeout ? 'Fallo conexión SAT, vuelve a intentar con un rango de fechas menor.' : retryError.message, completedAt: new Date(), }); } } } catch (error: any) { console.error(`[SAT Retry] Error procesando job ${job.id}:`, error.message); await updateJobProgress(job.id, { status: 'failed', errorMessage: error.message, completedAt: new Date(), }); } } } /** * Obtiene el estado actual de sincronización de un tenant */ export async function getSyncStatus(tenantId: string, contribuyenteId?: string): Promise<{ hasActiveSync: boolean; currentJob?: SatSyncJob; lastCompletedJob?: SatSyncJob; totalCfdisSynced: number; }> { const contribuyenteFilter = contribuyenteId ? { contribuyenteId } : {}; const activeJob = await prisma.satSyncJob.findFirst({ where: { tenantId, ...contribuyenteFilter, status: { in: ['pending', 'running'] }, }, orderBy: { createdAt: 'desc' }, }); const lastCompleted = await prisma.satSyncJob.findFirst({ where: { tenantId, ...contribuyenteFilter, status: 'completed', }, orderBy: { completedAt: 'desc' }, }); const totals = await prisma.satSyncJob.aggregate({ where: { tenantId, ...contribuyenteFilter, status: 'completed', }, _sum: { cfdisInserted: true, }, }); const mapJob = (job: any): SatSyncJob => ({ id: job.id, tenantId: job.tenantId, type: job.type, status: job.status, dateFrom: job.dateFrom.toISOString(), dateTo: job.dateTo.toISOString(), cfdiType: job.cfdiType ?? undefined, satRequestId: job.satRequestId ?? undefined, satPackageIds: job.satPackageIds, cfdisFound: job.cfdisFound, cfdisDownloaded: job.cfdisDownloaded, cfdisInserted: job.cfdisInserted, cfdisUpdated: job.cfdisUpdated, progressPercent: job.progressPercent, errorMessage: job.errorMessage ?? undefined, startedAt: job.startedAt?.toISOString(), completedAt: job.completedAt?.toISOString(), createdAt: job.createdAt.toISOString(), retryCount: job.retryCount, }); return { hasActiveSync: !!activeJob, currentJob: activeJob ? mapJob(activeJob) : undefined, lastCompletedJob: lastCompleted ? mapJob(lastCompleted) : undefined, totalCfdisSynced: totals._sum.cfdisInserted || 0, }; } /** * Obtiene el historial de sincronizaciones */ export async function getSyncHistory( tenantId: string, page: number = 1, limit: number = 10, contribuyenteId?: string ): Promise<{ jobs: SatSyncJob[]; total: number }> { const contribuyenteFilter = contribuyenteId ? { contribuyenteId } : {}; const [jobs, total] = await Promise.all([ prisma.satSyncJob.findMany({ where: { tenantId, ...contribuyenteFilter }, orderBy: { createdAt: 'desc' }, skip: (page - 1) * limit, take: limit, }), prisma.satSyncJob.count({ where: { tenantId, ...contribuyenteFilter } }), ]); return { jobs: jobs.map(job => ({ id: job.id, tenantId: job.tenantId, type: job.type, status: job.status, dateFrom: job.dateFrom.toISOString(), dateTo: job.dateTo.toISOString(), cfdiType: job.cfdiType ?? undefined, satRequestId: job.satRequestId ?? undefined, satPackageIds: job.satPackageIds, cfdisFound: job.cfdisFound, cfdisDownloaded: job.cfdisDownloaded, cfdisInserted: job.cfdisInserted, cfdisUpdated: job.cfdisUpdated, progressPercent: job.progressPercent, errorMessage: job.errorMessage ?? undefined, startedAt: job.startedAt?.toISOString(), completedAt: job.completedAt?.toISOString(), createdAt: job.createdAt.toISOString(), retryCount: job.retryCount, })), total, }; } /** * Reintenta un job fallido */ export async function retryJob(jobId: string): Promise { const job = await prisma.satSyncJob.findUnique({ where: { id: jobId }, }); if (!job) { throw new Error('Job no encontrado'); } if (job.status !== 'failed') { throw new Error('Solo se pueden reintentar jobs fallidos'); } return startSync(job.tenantId, job.type, job.dateFrom, job.dateTo, job.contribuyenteId ?? undefined); }