feat(sat): add main sync orchestrator service (Phase 5)

- Add sat.service.ts as the main orchestrator that coordinates:
  - FIEL credential retrieval and token management
  - SAT download request workflow
  - Package processing and CFDI storage
  - Progress tracking and job management
- Support for initial sync (10 years history) and daily sync
- Automatic token refresh during long-running syncs
- Month-by-month processing to avoid SAT limits
- Raw SQL queries for multi-tenant schema isolation

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Consultoria AS
2026-01-25 00:52:18 +00:00
parent 09684f77b9
commit 473912bfd7

View File

@@ -0,0 +1,618 @@
import { prisma } from '../../config/database.js';
import { getDecryptedFiel } from '../fiel.service.js';
import { authenticate, isTokenValid } from './sat-auth.service.js';
import {
requestDownload,
verifyRequest,
downloadPackage,
isRequestComplete,
isRequestFailed,
isRequestInProgress,
SAT_REQUEST_STATES,
} from './sat-download.service.js';
import { processPackage, type CfdiParsed } from './sat-parser.service.js';
import type { SatSyncJob, CfdiSyncType, SatSyncType } from '@horux/shared';
import type { Credential } from '@nodecfdi/credentials/node';
const POLL_INTERVAL_MS = 30000; // 30 segundos
const MAX_POLL_ATTEMPTS = 60; // 30 minutos máximo
const YEARS_TO_SYNC = 10;
interface SyncContext {
credential: Credential;
token: string;
tokenExpiresAt: Date;
rfc: string;
tenantId: string;
schemaName: string;
}
/**
* Obtiene o renueva el token de autenticación
*/
async function ensureValidToken(ctx: SyncContext): Promise<void> {
if (!isTokenValid({ token: ctx.token, expiresAt: ctx.tokenExpiresAt })) {
console.log('[SAT] Renovando token...');
const newToken = await authenticate(ctx.credential);
ctx.token = newToken.token;
ctx.tokenExpiresAt = newToken.expiresAt;
}
}
/**
* Actualiza el progreso de un job
*/
async function updateJobProgress(
jobId: string,
updates: Partial<{
status: 'pending' | 'running' | 'completed' | 'failed';
satRequestId: string;
satPackageIds: string[];
cfdisFound: number;
cfdisDownloaded: number;
cfdisInserted: number;
cfdisUpdated: number;
progressPercent: number;
errorMessage: string;
startedAt: Date;
completedAt: Date;
retryCount: number;
nextRetryAt: Date;
}>
): Promise<void> {
await prisma.satSyncJob.update({
where: { id: jobId },
data: updates,
});
}
/**
* Guarda los CFDIs en la base de datos del tenant
*/
async function saveCfdis(
schemaName: string,
cfdis: CfdiParsed[],
jobId: string
): Promise<{ inserted: number; updated: number }> {
let inserted = 0;
let updated = 0;
for (const cfdi of cfdis) {
try {
// Usar raw query para el esquema del tenant
const existing = await prisma.$queryRawUnsafe<{ id: string }[]>(
`SELECT id FROM "${schemaName}".cfdis WHERE uuid_fiscal = $1`,
cfdi.uuidFiscal
);
if (existing.length > 0) {
// Actualizar CFDI existente
await prisma.$executeRawUnsafe(
`UPDATE "${schemaName}".cfdis SET
tipo = $2,
serie = $3,
folio = $4,
fecha_emision = $5,
fecha_timbrado = $6,
rfc_emisor = $7,
nombre_emisor = $8,
rfc_receptor = $9,
nombre_receptor = $10,
subtotal = $11,
descuento = $12,
iva = $13,
isr_retenido = $14,
iva_retenido = $15,
total = $16,
moneda = $17,
tipo_cambio = $18,
metodo_pago = $19,
forma_pago = $20,
uso_cfdi = $21,
estado = $22,
xml_original = $23,
last_sat_sync = NOW(),
sat_sync_job_id = $24,
updated_at = NOW()
WHERE uuid_fiscal = $1`,
cfdi.uuidFiscal,
cfdi.tipo,
cfdi.serie,
cfdi.folio,
cfdi.fechaEmision,
cfdi.fechaTimbrado,
cfdi.rfcEmisor,
cfdi.nombreEmisor,
cfdi.rfcReceptor,
cfdi.nombreReceptor,
cfdi.subtotal,
cfdi.descuento,
cfdi.iva,
cfdi.isrRetenido,
cfdi.ivaRetenido,
cfdi.total,
cfdi.moneda,
cfdi.tipoCambio,
cfdi.metodoPago,
cfdi.formaPago,
cfdi.usoCfdi,
cfdi.estado,
cfdi.xmlOriginal,
jobId
);
updated++;
} else {
// Insertar nuevo CFDI
await prisma.$executeRawUnsafe(
`INSERT INTO "${schemaName}".cfdis (
id, uuid_fiscal, tipo, serie, folio, fecha_emision, fecha_timbrado,
rfc_emisor, nombre_emisor, rfc_receptor, nombre_receptor,
subtotal, descuento, iva, isr_retenido, iva_retenido, total,
moneda, tipo_cambio, metodo_pago, forma_pago, uso_cfdi, estado,
xml_original, source, sat_sync_job_id, last_sat_sync, created_at
) VALUES (
gen_random_uuid(), $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
$11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22,
$23, 'sat', $24, NOW(), NOW()
)`,
cfdi.uuidFiscal,
cfdi.tipo,
cfdi.serie,
cfdi.folio,
cfdi.fechaEmision,
cfdi.fechaTimbrado,
cfdi.rfcEmisor,
cfdi.nombreEmisor,
cfdi.rfcReceptor,
cfdi.nombreReceptor,
cfdi.subtotal,
cfdi.descuento,
cfdi.iva,
cfdi.isrRetenido,
cfdi.ivaRetenido,
cfdi.total,
cfdi.moneda,
cfdi.tipoCambio,
cfdi.metodoPago,
cfdi.formaPago,
cfdi.usoCfdi,
cfdi.estado,
cfdi.xmlOriginal,
jobId
);
inserted++;
}
} catch (error) {
console.error(`[SAT] Error guardando CFDI ${cfdi.uuidFiscal}:`, error);
}
}
return { inserted, updated };
}
/**
* Procesa una solicitud de descarga para un rango de fechas
*/
async function processDateRange(
ctx: SyncContext,
jobId: string,
fechaInicio: Date,
fechaFin: Date,
tipoCfdi: CfdiSyncType
): Promise<{ found: number; downloaded: number; inserted: number; updated: number }> {
console.log(`[SAT] Procesando ${tipoCfdi} desde ${fechaInicio.toISOString()} hasta ${fechaFin.toISOString()}`);
await ensureValidToken(ctx);
// 1. Solicitar descarga
const requestResponse = await requestDownload({
credential: ctx.credential,
token: ctx.token,
rfc: ctx.rfc,
fechaInicio,
fechaFin,
tipoSolicitud: 'CFDI',
tipoCfdi,
});
if (requestResponse.codEstatus !== '5000') {
if (requestResponse.codEstatus === '5004') {
console.log('[SAT] No se encontraron CFDIs en el rango');
return { found: 0, downloaded: 0, inserted: 0, updated: 0 };
}
throw new Error(`Error SAT: ${requestResponse.codEstatus} - ${requestResponse.mensaje}`);
}
const idSolicitud = requestResponse.idSolicitud;
console.log(`[SAT] Solicitud creada: ${idSolicitud}`);
await updateJobProgress(jobId, { satRequestId: idSolicitud });
// 2. Esperar y verificar solicitud
let verifyResponse;
let attempts = 0;
while (attempts < MAX_POLL_ATTEMPTS) {
await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS));
attempts++;
await ensureValidToken(ctx);
verifyResponse = await verifyRequest(ctx.credential, ctx.token, ctx.rfc, idSolicitud);
console.log(`[SAT] Estado solicitud: ${verifyResponse.estadoSolicitud} (intento ${attempts})`);
if (isRequestComplete(verifyResponse.estadoSolicitud)) {
break;
}
if (isRequestFailed(verifyResponse.estadoSolicitud)) {
throw new Error(`Solicitud fallida: ${verifyResponse.mensaje}`);
}
}
if (!verifyResponse || !isRequestComplete(verifyResponse.estadoSolicitud)) {
throw new Error('Timeout esperando respuesta del SAT');
}
// 3. Descargar paquetes
const packageIds = verifyResponse.paquetes;
await updateJobProgress(jobId, {
satPackageIds: packageIds,
cfdisFound: verifyResponse.numeroCfdis,
});
let totalInserted = 0;
let totalUpdated = 0;
let totalDownloaded = 0;
for (let i = 0; i < packageIds.length; i++) {
const packageId = packageIds[i];
console.log(`[SAT] Descargando paquete ${i + 1}/${packageIds.length}: ${packageId}`);
await ensureValidToken(ctx);
const packageResponse = await downloadPackage(ctx.credential, ctx.token, ctx.rfc, packageId);
// 4. Procesar paquete
const cfdis = processPackage(packageResponse.paquete);
totalDownloaded += cfdis.length;
console.log(`[SAT] Procesando ${cfdis.length} CFDIs del paquete`);
const { inserted, updated } = await saveCfdis(ctx.schemaName, cfdis, jobId);
totalInserted += inserted;
totalUpdated += updated;
// Actualizar progreso
const progress = Math.round(((i + 1) / packageIds.length) * 100);
await updateJobProgress(jobId, {
cfdisDownloaded: totalDownloaded,
cfdisInserted: totalInserted,
cfdisUpdated: totalUpdated,
progressPercent: progress,
});
}
return {
found: verifyResponse.numeroCfdis,
downloaded: totalDownloaded,
inserted: totalInserted,
updated: totalUpdated,
};
}
/**
* Ejecuta sincronización inicial (últimos 10 años)
*/
async function processInitialSync(ctx: SyncContext, jobId: string): Promise<void> {
const ahora = new Date();
const inicioHistorico = new Date(ahora.getFullYear() - YEARS_TO_SYNC, ahora.getMonth(), 1);
let totalFound = 0;
let totalDownloaded = 0;
let totalInserted = 0;
let totalUpdated = 0;
// Procesar por meses para evitar límites del SAT
let currentDate = new Date(inicioHistorico);
while (currentDate < ahora) {
const monthEnd = new Date(currentDate.getFullYear(), currentDate.getMonth() + 1, 0, 23, 59, 59);
const rangeEnd = monthEnd > ahora ? ahora : monthEnd;
// Procesar emitidos
try {
const emitidos = await processDateRange(ctx, jobId, currentDate, rangeEnd, 'emitidos');
totalFound += emitidos.found;
totalDownloaded += emitidos.downloaded;
totalInserted += emitidos.inserted;
totalUpdated += emitidos.updated;
} catch (error: any) {
console.error(`[SAT] Error procesando emitidos ${currentDate.toISOString()}:`, error.message);
}
// Procesar recibidos
try {
const recibidos = await processDateRange(ctx, jobId, currentDate, rangeEnd, 'recibidos');
totalFound += recibidos.found;
totalDownloaded += recibidos.downloaded;
totalInserted += recibidos.inserted;
totalUpdated += recibidos.updated;
} catch (error: any) {
console.error(`[SAT] Error procesando recibidos ${currentDate.toISOString()}:`, error.message);
}
// Siguiente mes
currentDate = new Date(currentDate.getFullYear(), currentDate.getMonth() + 1, 1);
// Pequeña pausa entre meses para no saturar el SAT
await new Promise(resolve => setTimeout(resolve, 5000));
}
await updateJobProgress(jobId, {
cfdisFound: totalFound,
cfdisDownloaded: totalDownloaded,
cfdisInserted: totalInserted,
cfdisUpdated: totalUpdated,
});
}
/**
* Ejecuta sincronización diaria (mes actual)
*/
async function processDailySync(ctx: SyncContext, jobId: string): Promise<void> {
const ahora = new Date();
const inicioMes = new Date(ahora.getFullYear(), ahora.getMonth(), 1);
let totalFound = 0;
let totalDownloaded = 0;
let totalInserted = 0;
let totalUpdated = 0;
// Procesar emitidos del mes
try {
const emitidos = await processDateRange(ctx, jobId, inicioMes, ahora, 'emitidos');
totalFound += emitidos.found;
totalDownloaded += emitidos.downloaded;
totalInserted += emitidos.inserted;
totalUpdated += emitidos.updated;
} catch (error: any) {
console.error('[SAT] Error procesando emitidos:', error.message);
}
// Procesar recibidos del mes
try {
const recibidos = await processDateRange(ctx, jobId, inicioMes, ahora, 'recibidos');
totalFound += recibidos.found;
totalDownloaded += recibidos.downloaded;
totalInserted += recibidos.inserted;
totalUpdated += recibidos.updated;
} catch (error: any) {
console.error('[SAT] Error procesando recibidos:', error.message);
}
await updateJobProgress(jobId, {
cfdisFound: totalFound,
cfdisDownloaded: totalDownloaded,
cfdisInserted: totalInserted,
cfdisUpdated: totalUpdated,
});
}
/**
* Inicia la sincronización con el SAT
*/
export async function startSync(
tenantId: string,
type: SatSyncType = 'daily',
dateFrom?: Date,
dateTo?: Date
): Promise<string> {
// Obtener credenciales FIEL
const fielData = await getDecryptedFiel(tenantId);
if (!fielData) {
throw new Error('No hay FIEL configurada o está vencida');
}
// Obtener datos del tenant
const tenant = await prisma.tenant.findUnique({
where: { id: tenantId },
select: { schemaName: true },
});
if (!tenant) {
throw new Error('Tenant no encontrado');
}
// Verificar que no haya sync activo
const activeSync = await prisma.satSyncJob.findFirst({
where: {
tenantId,
status: { in: ['pending', 'running'] },
},
});
if (activeSync) {
throw new Error('Ya hay una sincronización en curso');
}
// Crear job
const now = new Date();
const job = await prisma.satSyncJob.create({
data: {
tenantId,
type,
status: 'running',
dateFrom: dateFrom || new Date(now.getFullYear() - YEARS_TO_SYNC, 0, 1),
dateTo: dateTo || now,
startedAt: now,
},
});
// Autenticar con SAT
const tokenData = await authenticate(fielData.credential);
const ctx: SyncContext = {
credential: fielData.credential,
token: tokenData.token,
tokenExpiresAt: tokenData.expiresAt,
rfc: fielData.rfc,
tenantId,
schemaName: tenant.schemaName,
};
// Ejecutar sincronización en background
(async () => {
try {
if (type === 'initial') {
await processInitialSync(ctx, job.id);
} else {
await processDailySync(ctx, job.id);
}
await updateJobProgress(job.id, {
status: 'completed',
completedAt: new Date(),
progressPercent: 100,
});
console.log(`[SAT] Sincronización ${job.id} completada`);
} catch (error: any) {
console.error(`[SAT] Error en sincronización ${job.id}:`, error);
await updateJobProgress(job.id, {
status: 'failed',
errorMessage: error.message,
completedAt: new Date(),
});
}
})();
return job.id;
}
/**
* Obtiene el estado actual de sincronización de un tenant
*/
export async function getSyncStatus(tenantId: string): Promise<{
hasActiveSync: boolean;
currentJob?: SatSyncJob;
lastCompletedJob?: SatSyncJob;
totalCfdisSynced: number;
}> {
const activeJob = await prisma.satSyncJob.findFirst({
where: {
tenantId,
status: { in: ['pending', 'running'] },
},
orderBy: { createdAt: 'desc' },
});
const lastCompleted = await prisma.satSyncJob.findFirst({
where: {
tenantId,
status: 'completed',
},
orderBy: { completedAt: 'desc' },
});
const totals = await prisma.satSyncJob.aggregate({
where: {
tenantId,
status: 'completed',
},
_sum: {
cfdisInserted: true,
},
});
const mapJob = (job: any): SatSyncJob => ({
id: job.id,
tenantId: job.tenantId,
type: job.type,
status: job.status,
dateFrom: job.dateFrom.toISOString(),
dateTo: job.dateTo.toISOString(),
cfdiType: job.cfdiType ?? undefined,
satRequestId: job.satRequestId ?? undefined,
satPackageIds: job.satPackageIds,
cfdisFound: job.cfdisFound,
cfdisDownloaded: job.cfdisDownloaded,
cfdisInserted: job.cfdisInserted,
cfdisUpdated: job.cfdisUpdated,
progressPercent: job.progressPercent,
errorMessage: job.errorMessage ?? undefined,
startedAt: job.startedAt?.toISOString(),
completedAt: job.completedAt?.toISOString(),
createdAt: job.createdAt.toISOString(),
retryCount: job.retryCount,
});
return {
hasActiveSync: !!activeJob,
currentJob: activeJob ? mapJob(activeJob) : undefined,
lastCompletedJob: lastCompleted ? mapJob(lastCompleted) : undefined,
totalCfdisSynced: totals._sum.cfdisInserted || 0,
};
}
/**
* Obtiene el historial de sincronizaciones
*/
export async function getSyncHistory(
tenantId: string,
page: number = 1,
limit: number = 10
): Promise<{ jobs: SatSyncJob[]; total: number }> {
const [jobs, total] = await Promise.all([
prisma.satSyncJob.findMany({
where: { tenantId },
orderBy: { createdAt: 'desc' },
skip: (page - 1) * limit,
take: limit,
}),
prisma.satSyncJob.count({ where: { tenantId } }),
]);
return {
jobs: jobs.map(job => ({
id: job.id,
tenantId: job.tenantId,
type: job.type,
status: job.status,
dateFrom: job.dateFrom.toISOString(),
dateTo: job.dateTo.toISOString(),
cfdiType: job.cfdiType ?? undefined,
satRequestId: job.satRequestId ?? undefined,
satPackageIds: job.satPackageIds,
cfdisFound: job.cfdisFound,
cfdisDownloaded: job.cfdisDownloaded,
cfdisInserted: job.cfdisInserted,
cfdisUpdated: job.cfdisUpdated,
progressPercent: job.progressPercent,
errorMessage: job.errorMessage ?? undefined,
startedAt: job.startedAt?.toISOString(),
completedAt: job.completedAt?.toISOString(),
createdAt: job.createdAt.toISOString(),
retryCount: job.retryCount,
})),
total,
};
}
/**
* Reintenta un job fallido
*/
export async function retryJob(jobId: string): Promise<string> {
const job = await prisma.satSyncJob.findUnique({
where: { id: jobId },
});
if (!job) {
throw new Error('Job no encontrado');
}
if (job.status !== 'failed') {
throw new Error('Solo se pueden reintentar jobs fallidos');
}
return startSync(job.tenantId, job.type, job.dateFrom, job.dateTo);
}