diff --git a/apps/api/src/services/sat/sat.service.ts b/apps/api/src/services/sat/sat.service.ts new file mode 100644 index 0000000..7c41e3e --- /dev/null +++ b/apps/api/src/services/sat/sat.service.ts @@ -0,0 +1,618 @@ +import { prisma } from '../../config/database.js'; +import { getDecryptedFiel } from '../fiel.service.js'; +import { authenticate, isTokenValid } from './sat-auth.service.js'; +import { + requestDownload, + verifyRequest, + downloadPackage, + isRequestComplete, + isRequestFailed, + isRequestInProgress, + SAT_REQUEST_STATES, +} from './sat-download.service.js'; +import { processPackage, type CfdiParsed } from './sat-parser.service.js'; +import type { SatSyncJob, CfdiSyncType, SatSyncType } from '@horux/shared'; +import type { Credential } from '@nodecfdi/credentials/node'; + +const POLL_INTERVAL_MS = 30000; // 30 segundos +const MAX_POLL_ATTEMPTS = 60; // 30 minutos máximo +const YEARS_TO_SYNC = 10; + +interface SyncContext { + credential: Credential; + token: string; + tokenExpiresAt: Date; + rfc: string; + tenantId: string; + schemaName: string; +} + +/** + * Obtiene o renueva el token de autenticación + */ +async function ensureValidToken(ctx: SyncContext): Promise { + if (!isTokenValid({ token: ctx.token, expiresAt: ctx.tokenExpiresAt })) { + console.log('[SAT] Renovando token...'); + const newToken = await authenticate(ctx.credential); + ctx.token = newToken.token; + ctx.tokenExpiresAt = newToken.expiresAt; + } +} + +/** + * Actualiza el progreso de un job + */ +async function updateJobProgress( + jobId: string, + updates: Partial<{ + status: 'pending' | 'running' | 'completed' | 'failed'; + satRequestId: string; + satPackageIds: string[]; + cfdisFound: number; + cfdisDownloaded: number; + cfdisInserted: number; + cfdisUpdated: number; + progressPercent: number; + errorMessage: string; + startedAt: Date; + completedAt: Date; + retryCount: number; + nextRetryAt: Date; + }> +): Promise { + await prisma.satSyncJob.update({ + where: { id: jobId }, + data: updates, + }); +} + +/** + * Guarda los CFDIs en la base de datos del tenant + */ +async function saveCfdis( + schemaName: string, + cfdis: CfdiParsed[], + jobId: string +): Promise<{ inserted: number; updated: number }> { + let inserted = 0; + let updated = 0; + + for (const cfdi of cfdis) { + try { + // Usar raw query para el esquema del tenant + const existing = await prisma.$queryRawUnsafe<{ id: string }[]>( + `SELECT id FROM "${schemaName}".cfdis WHERE uuid_fiscal = $1`, + cfdi.uuidFiscal + ); + + if (existing.length > 0) { + // Actualizar CFDI existente + await prisma.$executeRawUnsafe( + `UPDATE "${schemaName}".cfdis SET + tipo = $2, + serie = $3, + folio = $4, + fecha_emision = $5, + fecha_timbrado = $6, + rfc_emisor = $7, + nombre_emisor = $8, + rfc_receptor = $9, + nombre_receptor = $10, + subtotal = $11, + descuento = $12, + iva = $13, + isr_retenido = $14, + iva_retenido = $15, + total = $16, + moneda = $17, + tipo_cambio = $18, + metodo_pago = $19, + forma_pago = $20, + uso_cfdi = $21, + estado = $22, + xml_original = $23, + last_sat_sync = NOW(), + sat_sync_job_id = $24, + updated_at = NOW() + WHERE uuid_fiscal = $1`, + cfdi.uuidFiscal, + cfdi.tipo, + cfdi.serie, + cfdi.folio, + cfdi.fechaEmision, + cfdi.fechaTimbrado, + cfdi.rfcEmisor, + cfdi.nombreEmisor, + cfdi.rfcReceptor, + cfdi.nombreReceptor, + cfdi.subtotal, + cfdi.descuento, + cfdi.iva, + cfdi.isrRetenido, + cfdi.ivaRetenido, + cfdi.total, + cfdi.moneda, + cfdi.tipoCambio, + cfdi.metodoPago, + cfdi.formaPago, + cfdi.usoCfdi, + cfdi.estado, + cfdi.xmlOriginal, + jobId + ); + updated++; + } else { + // Insertar nuevo CFDI + await prisma.$executeRawUnsafe( + `INSERT INTO "${schemaName}".cfdis ( + id, uuid_fiscal, tipo, serie, folio, fecha_emision, fecha_timbrado, + rfc_emisor, nombre_emisor, rfc_receptor, nombre_receptor, + subtotal, descuento, iva, isr_retenido, iva_retenido, total, + moneda, tipo_cambio, metodo_pago, forma_pago, uso_cfdi, estado, + xml_original, source, sat_sync_job_id, last_sat_sync, created_at + ) VALUES ( + gen_random_uuid(), $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, + $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, + $23, 'sat', $24, NOW(), NOW() + )`, + cfdi.uuidFiscal, + cfdi.tipo, + cfdi.serie, + cfdi.folio, + cfdi.fechaEmision, + cfdi.fechaTimbrado, + cfdi.rfcEmisor, + cfdi.nombreEmisor, + cfdi.rfcReceptor, + cfdi.nombreReceptor, + cfdi.subtotal, + cfdi.descuento, + cfdi.iva, + cfdi.isrRetenido, + cfdi.ivaRetenido, + cfdi.total, + cfdi.moneda, + cfdi.tipoCambio, + cfdi.metodoPago, + cfdi.formaPago, + cfdi.usoCfdi, + cfdi.estado, + cfdi.xmlOriginal, + jobId + ); + inserted++; + } + } catch (error) { + console.error(`[SAT] Error guardando CFDI ${cfdi.uuidFiscal}:`, error); + } + } + + return { inserted, updated }; +} + +/** + * Procesa una solicitud de descarga para un rango de fechas + */ +async function processDateRange( + ctx: SyncContext, + jobId: string, + fechaInicio: Date, + fechaFin: Date, + tipoCfdi: CfdiSyncType +): Promise<{ found: number; downloaded: number; inserted: number; updated: number }> { + console.log(`[SAT] Procesando ${tipoCfdi} desde ${fechaInicio.toISOString()} hasta ${fechaFin.toISOString()}`); + + await ensureValidToken(ctx); + + // 1. Solicitar descarga + const requestResponse = await requestDownload({ + credential: ctx.credential, + token: ctx.token, + rfc: ctx.rfc, + fechaInicio, + fechaFin, + tipoSolicitud: 'CFDI', + tipoCfdi, + }); + + if (requestResponse.codEstatus !== '5000') { + if (requestResponse.codEstatus === '5004') { + console.log('[SAT] No se encontraron CFDIs en el rango'); + return { found: 0, downloaded: 0, inserted: 0, updated: 0 }; + } + throw new Error(`Error SAT: ${requestResponse.codEstatus} - ${requestResponse.mensaje}`); + } + + const idSolicitud = requestResponse.idSolicitud; + console.log(`[SAT] Solicitud creada: ${idSolicitud}`); + + await updateJobProgress(jobId, { satRequestId: idSolicitud }); + + // 2. Esperar y verificar solicitud + let verifyResponse; + let attempts = 0; + + while (attempts < MAX_POLL_ATTEMPTS) { + await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS)); + attempts++; + + await ensureValidToken(ctx); + verifyResponse = await verifyRequest(ctx.credential, ctx.token, ctx.rfc, idSolicitud); + + console.log(`[SAT] Estado solicitud: ${verifyResponse.estadoSolicitud} (intento ${attempts})`); + + if (isRequestComplete(verifyResponse.estadoSolicitud)) { + break; + } + + if (isRequestFailed(verifyResponse.estadoSolicitud)) { + throw new Error(`Solicitud fallida: ${verifyResponse.mensaje}`); + } + } + + if (!verifyResponse || !isRequestComplete(verifyResponse.estadoSolicitud)) { + throw new Error('Timeout esperando respuesta del SAT'); + } + + // 3. Descargar paquetes + const packageIds = verifyResponse.paquetes; + await updateJobProgress(jobId, { + satPackageIds: packageIds, + cfdisFound: verifyResponse.numeroCfdis, + }); + + let totalInserted = 0; + let totalUpdated = 0; + let totalDownloaded = 0; + + for (let i = 0; i < packageIds.length; i++) { + const packageId = packageIds[i]; + console.log(`[SAT] Descargando paquete ${i + 1}/${packageIds.length}: ${packageId}`); + + await ensureValidToken(ctx); + const packageResponse = await downloadPackage(ctx.credential, ctx.token, ctx.rfc, packageId); + + // 4. Procesar paquete + const cfdis = processPackage(packageResponse.paquete); + totalDownloaded += cfdis.length; + + console.log(`[SAT] Procesando ${cfdis.length} CFDIs del paquete`); + + const { inserted, updated } = await saveCfdis(ctx.schemaName, cfdis, jobId); + totalInserted += inserted; + totalUpdated += updated; + + // Actualizar progreso + const progress = Math.round(((i + 1) / packageIds.length) * 100); + await updateJobProgress(jobId, { + cfdisDownloaded: totalDownloaded, + cfdisInserted: totalInserted, + cfdisUpdated: totalUpdated, + progressPercent: progress, + }); + } + + return { + found: verifyResponse.numeroCfdis, + downloaded: totalDownloaded, + inserted: totalInserted, + updated: totalUpdated, + }; +} + +/** + * Ejecuta sincronización inicial (últimos 10 años) + */ +async function processInitialSync(ctx: SyncContext, jobId: string): Promise { + const ahora = new Date(); + const inicioHistorico = new Date(ahora.getFullYear() - YEARS_TO_SYNC, ahora.getMonth(), 1); + + let totalFound = 0; + let totalDownloaded = 0; + let totalInserted = 0; + let totalUpdated = 0; + + // Procesar por meses para evitar límites del SAT + let currentDate = new Date(inicioHistorico); + + while (currentDate < ahora) { + const monthEnd = new Date(currentDate.getFullYear(), currentDate.getMonth() + 1, 0, 23, 59, 59); + const rangeEnd = monthEnd > ahora ? ahora : monthEnd; + + // Procesar emitidos + try { + const emitidos = await processDateRange(ctx, jobId, currentDate, rangeEnd, 'emitidos'); + totalFound += emitidos.found; + totalDownloaded += emitidos.downloaded; + totalInserted += emitidos.inserted; + totalUpdated += emitidos.updated; + } catch (error: any) { + console.error(`[SAT] Error procesando emitidos ${currentDate.toISOString()}:`, error.message); + } + + // Procesar recibidos + try { + const recibidos = await processDateRange(ctx, jobId, currentDate, rangeEnd, 'recibidos'); + totalFound += recibidos.found; + totalDownloaded += recibidos.downloaded; + totalInserted += recibidos.inserted; + totalUpdated += recibidos.updated; + } catch (error: any) { + console.error(`[SAT] Error procesando recibidos ${currentDate.toISOString()}:`, error.message); + } + + // Siguiente mes + currentDate = new Date(currentDate.getFullYear(), currentDate.getMonth() + 1, 1); + + // Pequeña pausa entre meses para no saturar el SAT + await new Promise(resolve => setTimeout(resolve, 5000)); + } + + await updateJobProgress(jobId, { + cfdisFound: totalFound, + cfdisDownloaded: totalDownloaded, + cfdisInserted: totalInserted, + cfdisUpdated: totalUpdated, + }); +} + +/** + * Ejecuta sincronización diaria (mes actual) + */ +async function processDailySync(ctx: SyncContext, jobId: string): Promise { + const ahora = new Date(); + const inicioMes = new Date(ahora.getFullYear(), ahora.getMonth(), 1); + + let totalFound = 0; + let totalDownloaded = 0; + let totalInserted = 0; + let totalUpdated = 0; + + // Procesar emitidos del mes + try { + const emitidos = await processDateRange(ctx, jobId, inicioMes, ahora, 'emitidos'); + totalFound += emitidos.found; + totalDownloaded += emitidos.downloaded; + totalInserted += emitidos.inserted; + totalUpdated += emitidos.updated; + } catch (error: any) { + console.error('[SAT] Error procesando emitidos:', error.message); + } + + // Procesar recibidos del mes + try { + const recibidos = await processDateRange(ctx, jobId, inicioMes, ahora, 'recibidos'); + totalFound += recibidos.found; + totalDownloaded += recibidos.downloaded; + totalInserted += recibidos.inserted; + totalUpdated += recibidos.updated; + } catch (error: any) { + console.error('[SAT] Error procesando recibidos:', error.message); + } + + await updateJobProgress(jobId, { + cfdisFound: totalFound, + cfdisDownloaded: totalDownloaded, + cfdisInserted: totalInserted, + cfdisUpdated: totalUpdated, + }); +} + +/** + * Inicia la sincronización con el SAT + */ +export async function startSync( + tenantId: string, + type: SatSyncType = 'daily', + dateFrom?: Date, + dateTo?: Date +): Promise { + // Obtener credenciales FIEL + const fielData = await getDecryptedFiel(tenantId); + if (!fielData) { + throw new Error('No hay FIEL configurada o está vencida'); + } + + // Obtener datos del tenant + const tenant = await prisma.tenant.findUnique({ + where: { id: tenantId }, + select: { schemaName: true }, + }); + + if (!tenant) { + throw new Error('Tenant no encontrado'); + } + + // Verificar que no haya sync activo + const activeSync = await prisma.satSyncJob.findFirst({ + where: { + tenantId, + status: { in: ['pending', 'running'] }, + }, + }); + + if (activeSync) { + throw new Error('Ya hay una sincronización en curso'); + } + + // Crear job + const now = new Date(); + const job = await prisma.satSyncJob.create({ + data: { + tenantId, + type, + status: 'running', + dateFrom: dateFrom || new Date(now.getFullYear() - YEARS_TO_SYNC, 0, 1), + dateTo: dateTo || now, + startedAt: now, + }, + }); + + // Autenticar con SAT + const tokenData = await authenticate(fielData.credential); + + const ctx: SyncContext = { + credential: fielData.credential, + token: tokenData.token, + tokenExpiresAt: tokenData.expiresAt, + rfc: fielData.rfc, + tenantId, + schemaName: tenant.schemaName, + }; + + // Ejecutar sincronización en background + (async () => { + try { + if (type === 'initial') { + await processInitialSync(ctx, job.id); + } else { + await processDailySync(ctx, job.id); + } + + await updateJobProgress(job.id, { + status: 'completed', + completedAt: new Date(), + progressPercent: 100, + }); + + console.log(`[SAT] Sincronización ${job.id} completada`); + } catch (error: any) { + console.error(`[SAT] Error en sincronización ${job.id}:`, error); + await updateJobProgress(job.id, { + status: 'failed', + errorMessage: error.message, + completedAt: new Date(), + }); + } + })(); + + return job.id; +} + +/** + * Obtiene el estado actual de sincronización de un tenant + */ +export async function getSyncStatus(tenantId: string): Promise<{ + hasActiveSync: boolean; + currentJob?: SatSyncJob; + lastCompletedJob?: SatSyncJob; + totalCfdisSynced: number; +}> { + const activeJob = await prisma.satSyncJob.findFirst({ + where: { + tenantId, + status: { in: ['pending', 'running'] }, + }, + orderBy: { createdAt: 'desc' }, + }); + + const lastCompleted = await prisma.satSyncJob.findFirst({ + where: { + tenantId, + status: 'completed', + }, + orderBy: { completedAt: 'desc' }, + }); + + const totals = await prisma.satSyncJob.aggregate({ + where: { + tenantId, + status: 'completed', + }, + _sum: { + cfdisInserted: true, + }, + }); + + const mapJob = (job: any): SatSyncJob => ({ + id: job.id, + tenantId: job.tenantId, + type: job.type, + status: job.status, + dateFrom: job.dateFrom.toISOString(), + dateTo: job.dateTo.toISOString(), + cfdiType: job.cfdiType ?? undefined, + satRequestId: job.satRequestId ?? undefined, + satPackageIds: job.satPackageIds, + cfdisFound: job.cfdisFound, + cfdisDownloaded: job.cfdisDownloaded, + cfdisInserted: job.cfdisInserted, + cfdisUpdated: job.cfdisUpdated, + progressPercent: job.progressPercent, + errorMessage: job.errorMessage ?? undefined, + startedAt: job.startedAt?.toISOString(), + completedAt: job.completedAt?.toISOString(), + createdAt: job.createdAt.toISOString(), + retryCount: job.retryCount, + }); + + return { + hasActiveSync: !!activeJob, + currentJob: activeJob ? mapJob(activeJob) : undefined, + lastCompletedJob: lastCompleted ? mapJob(lastCompleted) : undefined, + totalCfdisSynced: totals._sum.cfdisInserted || 0, + }; +} + +/** + * Obtiene el historial de sincronizaciones + */ +export async function getSyncHistory( + tenantId: string, + page: number = 1, + limit: number = 10 +): Promise<{ jobs: SatSyncJob[]; total: number }> { + const [jobs, total] = await Promise.all([ + prisma.satSyncJob.findMany({ + where: { tenantId }, + orderBy: { createdAt: 'desc' }, + skip: (page - 1) * limit, + take: limit, + }), + prisma.satSyncJob.count({ where: { tenantId } }), + ]); + + return { + jobs: jobs.map(job => ({ + id: job.id, + tenantId: job.tenantId, + type: job.type, + status: job.status, + dateFrom: job.dateFrom.toISOString(), + dateTo: job.dateTo.toISOString(), + cfdiType: job.cfdiType ?? undefined, + satRequestId: job.satRequestId ?? undefined, + satPackageIds: job.satPackageIds, + cfdisFound: job.cfdisFound, + cfdisDownloaded: job.cfdisDownloaded, + cfdisInserted: job.cfdisInserted, + cfdisUpdated: job.cfdisUpdated, + progressPercent: job.progressPercent, + errorMessage: job.errorMessage ?? undefined, + startedAt: job.startedAt?.toISOString(), + completedAt: job.completedAt?.toISOString(), + createdAt: job.createdAt.toISOString(), + retryCount: job.retryCount, + })), + total, + }; +} + +/** + * Reintenta un job fallido + */ +export async function retryJob(jobId: string): Promise { + const job = await prisma.satSyncJob.findUnique({ + where: { id: jobId }, + }); + + if (!job) { + throw new Error('Job no encontrado'); + } + + if (job.status !== 'failed') { + throw new Error('Solo se pueden reintentar jobs fallidos'); + } + + return startSync(job.tenantId, job.type, job.dateFrom, job.dateTo); +}