diff --git a/apps/api/src/jobs/sat-sync.job.ts b/apps/api/src/jobs/sat-sync.job.ts index 8e26cca..d0afbc5 100644 --- a/apps/api/src/jobs/sat-sync.job.ts +++ b/apps/api/src/jobs/sat-sync.job.ts @@ -1,15 +1,98 @@ import cron from 'node-cron'; import { prisma } from '../config/database.js'; -import { startSync, getSyncStatus } from '../services/sat/sat.service.js'; +import { startSync, getSyncStatus, resumeSatSync } from '../services/sat/sat.service.js'; import { hasFielConfigured } from '../services/fiel.service.js'; const SYNC_CRON_SCHEDULE = '0 3 * * *'; // 3:00 AM todos los días const FREQUENT_SYNC_SCHEDULE = '0 */6 * * *'; // Cada 6 horas (00:00, 06:00, 12:00, 18:00) const FREQUENT_SYNC_RFCS = ['ROEM691011EZ4']; // Tenants con sync frecuente const CONCURRENT_SYNCS = 3; // Máximo de sincronizaciones simultáneas +const ORPHANED_SYNC_THRESHOLD_MINUTES = 240; // 4 horas — da tiempo al SAT de procesar antes de retomar let isRunning = false; +/** + * Watchdog: marca como failed los jobs 'running' que exceden el umbral de tiempo. + * Para syncs 'daily', después de marcarlos como failed intenta lanzar un nuevo job + * automáticamente para ver si el SAT ya terminó de procesar la información. + * Esto evita que un job huérfano (timeout del script o reinicio del servidor) + * bloquee sincronizaciones futuras para ese tenant. + */ +async function cleanupOrphanedSyncs(): Promise { + const threshold = new Date(Date.now() - ORPHANED_SYNC_THRESHOLD_MINUTES * 60 * 1000); + + const orphaned = await prisma.satSyncJob.findMany({ + where: { + status: 'running', + startedAt: { lt: threshold }, + }, + select: { id: true, tenantId: true, type: true, startedAt: true, satRequestId: true }, + }); + + if (orphaned.length === 0) return 0; + + for (const job of orphaned) { + await prisma.satSyncJob.update({ + where: { id: job.id }, + data: { + status: 'failed', + errorMessage: `Watchdog: abandoned job detected (started ${job.startedAt?.toISOString() ?? 'unknown'}, threshold ${ORPHANED_SYNC_THRESHOLD_MINUTES} min)`, + completedAt: new Date(), + }, + }); + const startedStr = job.startedAt ? job.startedAt.toISOString() : 'unknown'; + console.log( + `[SAT Watchdog] Job ${job.id} (tenant ${job.tenantId}) marcado como failed por exceder ${ORPHANED_SYNC_THRESHOLD_MINUTES} min en running (started: ${startedStr})` + ); + + // Intentar retomar el satRequestId existente antes de crear uno nuevo. + // Esto evita desperdiciar consultas al SAT cuando la solicitud original + // ya fue aceptada y solo estaba esperando procesamiento. + if (job.satRequestId) { + try { + console.log(`[SAT Watchdog] Intentando retomar job ${job.id} con satRequestId ${job.satRequestId}`); + await resumeSatSync(job.id); + console.log(`[SAT Watchdog] Job ${job.id} retomado exitosamente`); + continue; // Éxito, no necesitamos crear un nuevo job + } catch (resumeError: any) { + console.error(`[SAT Watchdog] No se pudo retomar job ${job.id}:`, resumeError.message); + // Fall-through: marcar como failed y lanzar nuevo job + } + } + + // Fallback: crear un nuevo job si no se pudo retomar + if (job.type === 'daily') { + try { + const hasFiel = await hasFielConfigured(job.tenantId); + if (!hasFiel) { + console.log(`[SAT Watchdog] Tenant ${job.tenantId} sin FIEL, omitiendo auto-retry`); + continue; + } + + const activeSync = await prisma.satSyncJob.findFirst({ + where: { + tenantId: job.tenantId, + status: { in: ['pending', 'running'] }, + }, + }); + + if (activeSync) { + console.log(`[SAT Watchdog] Tenant ${job.tenantId} ya tiene sync activo (${activeSync.id}), omitiendo auto-retry`); + continue; + } + + console.log(`[SAT Watchdog] Auto-retry: lanzando nuevo daily sync para tenant ${job.tenantId}`); + const newJobId = await startSync(job.tenantId, 'daily'); + console.log(`[SAT Watchdog] Auto-retry exitoso: nuevo job ${newJobId} para tenant ${job.tenantId}`); + } catch (retryError: any) { + console.error(`[SAT Watchdog] Auto-retry falló para tenant ${job.tenantId}:`, retryError.message); + } + } + } + + return orphaned.length; +} + /** * Obtiene los tenants que tienen FIEL configurada y activa */ @@ -51,6 +134,9 @@ async function needsInitialSync(tenantId: string): Promise { */ async function syncTenant(tenantId: string): Promise { try { + // Limpieza de huérfanos antes de verificar (protección por si el cleanup global falló) + await cleanupOrphanedSyncs(); + // Verificar si hay sync activo const status = await getSyncStatus(tenantId); if (status.hasActiveSync) { @@ -83,6 +169,12 @@ async function runSyncJob(): Promise { console.log('[SAT Cron] Iniciando job de sincronización diaria'); try { + // Watchdog: limpiar jobs huérfanos antes de iniciar nuevos syncs + const cleaned = await cleanupOrphanedSyncs(); + if (cleaned > 0) { + console.log(`[SAT Cron] Watchdog limpió ${cleaned} job(s) huérfano(s)`); + } + const tenantIds = await getTenantsWithFiel(); console.log(`[SAT Cron] ${tenantIds.length} tenants con FIEL configurada`); @@ -117,6 +209,12 @@ async function runFrequentSyncJob(): Promise { console.log('[SAT Cron] Iniciando sync frecuente'); try { + // Watchdog: limpiar jobs huérfanos antes de iniciar nuevos syncs + const cleaned = await cleanupOrphanedSyncs(); + if (cleaned > 0) { + console.log(`[SAT Cron] Watchdog limpió ${cleaned} job(s) huérfano(s)`); + } + const tenants = await prisma.tenant.findMany({ where: { active: true, diff --git a/apps/api/src/services/impuestos.service.ts b/apps/api/src/services/impuestos.service.ts index 32ae2e0..e67e591 100644 --- a/apps/api/src/services/impuestos.service.ts +++ b/apps/api/src/services/impuestos.service.ts @@ -1,6 +1,32 @@ import type { Pool } from 'pg'; import type { IvaMensual, IsrMensual, ResumenIva, ResumenIsr } from '@horux/shared'; +/** + * Obtiene el RFC del contribuyente principal del tenant + */ +async function getContribuyenteRfc(pool: Pool): Promise { + try { + const { rows } = await pool.query(` + SELECT c.rfc + FROM contribuyentes c + JOIN entidades_gestionadas eg ON c.entidad_id = eg.id + LIMIT 1 + `); + return rows[0]?.rfc || null; + } catch { + return null; + } +} + +/** + * Construye el rango de fechas para un año/mes (acumulado año a la fecha) + */ +function getDateRange(año: number, mes: number): { inicio: Date; fin: Date } { + const inicio = new Date(año, 0, 1, 0, 0, 0); // 1 de enero + const fin = new Date(año, mes, 0, 23, 59, 59); // Último día del mes + return { inicio, fin }; +} + export async function getIvaMensual(pool: Pool, año: number): Promise { const { rows: data } = await pool.query(` SELECT @@ -47,13 +73,15 @@ export async function getResumenIva(pool: Pool, año: number, mes: number): Prom }; } + // Fallback: calcular sobre la marcha desde cfdis + // Corregido: usar type/status y iva_traslado_mxn en lugar de tipo/estado/iva const { rows: [calcResult] } = await pool.query(` SELECT - COALESCE(SUM(CASE WHEN tipo = 'ingreso' THEN iva ELSE 0 END), 0) as trasladado, - COALESCE(SUM(CASE WHEN tipo = 'egreso' THEN iva ELSE 0 END), 0) as acreditable, - COALESCE(SUM(iva_retenido), 0) as retenido + COALESCE(SUM(CASE WHEN type = 'EMITIDO' AND tipo_comprobante = 'I' THEN iva_traslado_mxn ELSE 0 END), 0) as trasladado, + COALESCE(SUM(CASE WHEN type = 'RECIBIDO' AND tipo_comprobante = 'I' THEN iva_traslado_mxn ELSE 0 END), 0) as acreditable, + COALESCE(SUM(iva_retencion_mxn), 0) as retenido FROM cfdis - WHERE estado = 'vigente' + WHERE status = 'Vigente' AND EXTRACT(YEAR FROM fecha_emision) = $1 AND EXTRACT(MONTH FROM fecha_emision) = $2 `, [año, mes]); @@ -105,41 +133,180 @@ export async function getIsrMensual(pool: Pool, año: number): Promise { - const { rows: [ingresos] } = await pool.query(` - SELECT COALESCE(SUM(total), 0) as total + const rfc = await getContribuyenteRfc(pool); + if (!rfc) { + console.error('[ISR] No se encontró RFC del contribuyente'); + return { + ingresosAcumulados: 0, + deducciones: 0, + baseGravable: 0, + isrCausado: 0, + isrRetenido: 0, + isrAPagar: 0, + }; + } + + const { inicio, fin } = getDateRange(año, mes); + + // Ingresos Bucket A: Facturas I PUE emitidas (ingresos de contado) + const { rows: ingresosContadoRows } = await pool.query(` + SELECT + COALESCE(regimen_fiscal_emisor, 'SIN') as regimen, + SUM( + total_mxn + - COALESCE(iva_traslado_mxn, 0) + - COALESCE(ieps_traslado_mxn, 0) + - COALESCE(impuestos_locales_trasladado_mxn, 0) + - COALESCE(descuento_mxn, 0) + ) as total FROM cfdis - WHERE tipo = 'ingreso' AND estado = 'vigente' - AND EXTRACT(YEAR FROM fecha_emision) = $1 - AND EXTRACT(MONTH FROM fecha_emision) <= $2 - `, [año, mes]); + WHERE rfc_emisor = $1 + AND tipo_comprobante = 'I' + AND metodo_pago = 'PUE' + AND status NOT IN ('Cancelado', '0') + AND fecha_emision >= $2 AND fecha_emision <= $3 + GROUP BY COALESCE(regimen_fiscal_emisor, 'SIN') + `, [rfc, inicio, fin]); - const { rows: [egresos] } = await pool.query(` - SELECT COALESCE(SUM(total), 0) as total + // Ingresos Bucket B: Pagos P recibidos (ingresos a crédito cobrados) + const { rows: ingresosCreditoRows } = await pool.query(` + SELECT + COALESCE(regimen_fiscal_receptor, 'SIN') as regimen, + SUM( + monto_pago_mxn + - COALESCE(iva_traslado_pago_mxn, 0) + - COALESCE(ieps_traslado_pago_mxn, 0) + - COALESCE(impuestos_locales_trasladado_mxn, 0) + ) as total FROM cfdis - WHERE tipo = 'egreso' AND estado = 'vigente' - AND EXTRACT(YEAR FROM fecha_emision) = $1 - AND EXTRACT(MONTH FROM fecha_emision) <= $2 - `, [año, mes]); + WHERE rfc_receptor = $1 + AND tipo_comprobante = 'P' + AND status NOT IN ('Cancelado', '0') + AND fecha_pago_p >= $2 AND fecha_pago_p <= $3 + GROUP BY COALESCE(regimen_fiscal_receptor, 'SIN') + `, [rfc, inicio, fin]); - const { rows: [retenido] } = await pool.query(` - SELECT COALESCE(SUM(isr_retenido), 0) as total + // Bucket 1: Deducciones - Facturas I PUE recibidas + const { rows: bucket1Rows } = await pool.query(` + SELECT + COALESCE(regimen_fiscal_receptor, 'SIN') as regimen, + SUM( + total_mxn + - COALESCE(iva_traslado_mxn, 0) + - COALESCE(ieps_traslado_mxn, 0) + - COALESCE(impuestos_locales_trasladado_mxn, 0) + - COALESCE(descuento_mxn, 0) + ) as total FROM cfdis - WHERE estado = 'vigente' - AND EXTRACT(YEAR FROM fecha_emision) = $1 - AND EXTRACT(MONTH FROM fecha_emision) <= $2 - `, [año, mes]); + WHERE rfc_receptor = $1 + AND tipo_comprobante = 'I' + AND metodo_pago = 'PUE' + AND status NOT IN ('Cancelado', '0') + AND fecha_emision >= $2 AND fecha_emision <= $3 + GROUP BY COALESCE(regimen_fiscal_receptor, 'SIN') + `, [rfc, inicio, fin]); - const ingresosAcumulados = Number(ingresos?.total || 0); - const deducciones = Number(egresos?.total || 0); - const baseGravable = Math.max(0, ingresosAcumulados - deducciones); + // Bucket 2: Deducciones - Pagos P emitidos (pagos a crédito) + const { rows: bucket2Rows } = await pool.query(` + SELECT + COALESCE(regimen_fiscal_emisor, 'SIN') as regimen, + SUM( + monto_pago_mxn + - COALESCE(iva_traslado_pago_mxn, 0) + - COALESCE(ieps_traslado_pago_mxn, 0) + - COALESCE(impuestos_locales_trasladado_mxn, 0) + ) as total + FROM cfdis + WHERE rfc_emisor = $1 + AND tipo_comprobante = 'P' + AND status NOT IN ('Cancelado', '0') + AND fecha_pago_p >= $2 AND fecha_pago_p <= $3 + GROUP BY COALESCE(regimen_fiscal_emisor, 'SIN') + `, [rfc, inicio, fin]); + // Bucket 3: Deducciones - Notas de crédito E PUE recibidas (resta) + const { rows: bucket3Rows } = await pool.query(` + SELECT + COALESCE(regimen_fiscal_receptor, 'SIN') as regimen, + SUM( + total_mxn + - COALESCE(iva_traslado_mxn, 0) + - COALESCE(ieps_traslado_mxn, 0) + - COALESCE(impuestos_locales_trasladado_mxn, 0) + - COALESCE(descuento_mxn, 0) + ) as total + FROM cfdis + WHERE rfc_receptor = $1 + AND tipo_comprobante = 'E' + AND metodo_pago = 'PUE' + AND status NOT IN ('Cancelado', '0') + AND fecha_emision >= $2 AND fecha_emision <= $3 + GROUP BY COALESCE(regimen_fiscal_receptor, 'SIN') + `, [rfc, inicio, fin]); + + // ISR retenido (acumulado año a la fecha) + const { rows: [retenidoRow] } = await pool.query(` + SELECT COALESCE(SUM(isr_retencion_mxn), 0) as total + FROM cfdis + WHERE (rfc_emisor = $1 OR rfc_receptor = $1) + AND status NOT IN ('Cancelado', '0') + AND fecha_emision >= $2 AND fecha_emision <= $3 + `, [rfc, inicio, fin]); + + // Acumular totales por régimen + const porRegimen = new Map(); + + for (const row of ingresosContadoRows) { + const r = row.regimen as string; + const actual = porRegimen.get(r) || { ingresos: 0, deducciones: 0 }; + actual.ingresos += Number(row.total || 0); + porRegimen.set(r, actual); + } + + for (const row of ingresosCreditoRows) { + const r = row.regimen as string; + const actual = porRegimen.get(r) || { ingresos: 0, deducciones: 0 }; + actual.ingresos += Number(row.total || 0); + porRegimen.set(r, actual); + } + + for (const row of bucket1Rows) { + const r = row.regimen as string; + const actual = porRegimen.get(r) || { ingresos: 0, deducciones: 0 }; + actual.deducciones += Number(row.total || 0); + porRegimen.set(r, actual); + } + + for (const row of bucket2Rows) { + const r = row.regimen as string; + const actual = porRegimen.get(r) || { ingresos: 0, deducciones: 0 }; + actual.deducciones += Number(row.total || 0); + porRegimen.set(r, actual); + } + + for (const row of bucket3Rows) { + const r = row.regimen as string; + const actual = porRegimen.get(r) || { ingresos: 0, deducciones: 0 }; + actual.deducciones -= Number(row.total || 0); + porRegimen.set(r, actual); + } + + let ingresosAcumulados = 0; + let deduccionesTotales = 0; + + for (const [, valores] of porRegimen) { + ingresosAcumulados += Math.max(0, valores.ingresos); + deduccionesTotales += Math.max(0, valores.deducciones); + } + + const baseGravable = Math.max(0, ingresosAcumulados - deduccionesTotales); const isrCausado = baseGravable * 0.30; - const isrRetenido = Number(retenido?.total || 0); + const isrRetenido = Number(retenidoRow?.total || 0); const isrAPagar = Math.max(0, isrCausado - isrRetenido); return { ingresosAcumulados, - deducciones, + deducciones: deduccionesTotales, baseGravable, isrCausado, isrRetenido, diff --git a/apps/api/src/services/metabase.service.ts b/apps/api/src/services/metabase.service.ts new file mode 100644 index 0000000..abbf6f4 --- /dev/null +++ b/apps/api/src/services/metabase.service.ts @@ -0,0 +1,167 @@ +/** + * Metabase integration service. + * Automatically registers newly-provisioned tenant databases in Metabase. + */ + +const METABASE_URL = process.env.METABASE_URL || 'http://192.168.10.170:3000'; +const METABASE_USERNAME = process.env.METABASE_USERNAME || 'ialcarazsalazar@consultoria-as.com'; +const METABASE_PASSWORD = process.env.METABASE_PASSWORD || ''; + +// PostgreSQL connection details exposed to Metabase +const PG_HOST = process.env.METABASE_PG_HOST || '192.168.10.90'; +const PG_PORT = parseInt(process.env.METABASE_PG_PORT || '5432', 10); +const PG_USER = process.env.METABASE_PG_USER || 'postgres'; +const PG_PASSWORD = process.env.METABASE_PG_PASSWORD || ''; + +let cachedSessionToken: string | null = null; +let tokenExpiresAt = 0; + +async function getSessionToken(): Promise { + // Re-use cached token if still valid (Metabase sessions last 2 weeks by default) + if (cachedSessionToken && Date.now() < tokenExpiresAt) { + return cachedSessionToken; + } + + if (!METABASE_PASSWORD) { + console.error('[METABASE] METABASE_PASSWORD not configured'); + return null; + } + + try { + const res = await fetch(`${METABASE_URL}/api/session`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + username: METABASE_USERNAME, + password: METABASE_PASSWORD, + }), + }); + + if (!res.ok) { + const text = await res.text(); + console.error(`[METABASE] Auth failed: ${res.status} ${text}`); + return null; + } + + const data = await res.json() as { id?: string }; + if (!data.id) { + console.error('[METABASE] Auth response missing session id'); + return null; + } + + cachedSessionToken = data.id; + tokenExpiresAt = Date.now() + 13 * 24 * 60 * 60 * 1000; // 13 days + return cachedSessionToken; + } catch (err) { + console.error('[METABASE] Error fetching session token:', err); + return null; + } +} + +interface RegisterDatabaseInput { + nombre: string; + dbName: string; +} + +export async function registerDatabase(input: RegisterDatabaseInput): Promise { + const sessionToken = await getSessionToken(); + if (!sessionToken) { + console.error('[METABASE] Skipping database registration — no session token'); + return; + } + + if (!PG_PASSWORD) { + console.error('[METABASE] METABASE_PG_PASSWORD not configured'); + return; + } + + const payload = { + name: input.nombre, + engine: 'postgres', + details: { + host: PG_HOST, + port: PG_PORT, + dbname: input.dbName, + user: PG_USER, + password: PG_PASSWORD, + ssl: false, + 'tunnel-enabled': false, + 'advanced-options': false, + 'schema-filters-type': 'all', + }, + auto_run_queries: true, + is_full_sync: true, + is_on_demand: false, + }; + + try { + const res = await fetch(`${METABASE_URL}/api/database`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Metabase-Session': sessionToken, + }, + body: JSON.stringify(payload), + }); + + if (!res.ok) { + const text = await res.text(); + // 409 or duplicate name is not fatal — log and continue + if (res.status === 400 && text.includes('already exists')) { + console.log(`[METABASE] Database "${input.nombre}" already registered`); + return; + } + console.error(`[METABASE] Register database failed: ${res.status} ${text}`); + return; + } + + const data = await res.json() as { id?: number }; + console.log(`[METABASE] Database "${input.nombre}" registered with id=${data.id}`); + } catch (err) { + console.error('[METABASE] Error registering database:', err); + } +} + +export async function deleteDatabase(databaseName: string): Promise { + const sessionToken = await getSessionToken(); + if (!sessionToken) { + console.error('[METABASE] Skipping database deletion — no session token'); + return; + } + + try { + // Find database by name + const listRes = await fetch(`${METABASE_URL}/api/database`, { + headers: { 'X-Metabase-Session': sessionToken }, + }); + + if (!listRes.ok) { + console.error(`[METABASE] Failed to list databases: ${listRes.status}`); + return; + } + + const listData = await listRes.json() as { data?: Array<{ id: number; name: string; details?: { dbname?: string } }> }; + const db = listData.data?.find( + (d) => d.details?.dbname === databaseName || d.name.includes(databaseName) + ); + + if (!db) { + console.log(`[METABASE] No database found for ${databaseName}`); + return; + } + + const deleteRes = await fetch(`${METABASE_URL}/api/database/${db.id}`, { + method: 'DELETE', + headers: { 'X-Metabase-Session': sessionToken }, + }); + + if (!deleteRes.ok) { + console.error(`[METABASE] Delete database failed: ${deleteRes.status}`); + return; + } + + console.log(`[METABASE] Database ${db.id} (${databaseName}) deleted`); + } catch (err) { + console.error('[METABASE] Error deleting database:', err); + } +} diff --git a/apps/api/src/services/sat/sat.service.ts b/apps/api/src/services/sat/sat.service.ts index df5198b..f30009d 100644 --- a/apps/api/src/services/sat/sat.service.ts +++ b/apps/api/src/services/sat/sat.service.ts @@ -417,6 +417,105 @@ async function processDailySync(ctx: SyncContext, jobId: string): Promise }); } +/** + * Retoma una sincronización huérfana usando el satRequestId existente. + * Esto evita desperdiciar una consulta al SAT cuando el job anterior + * murió durante el polling (timeout del script o reinicio del servidor). + */ +export async function resumeSatSync(jobId: string): Promise { + const job = await prisma.satSyncJob.findUnique({ where: { id: jobId } }); + if (!job) throw new Error('Job no encontrado'); + if (!job.satRequestId) throw new Error('Job no tiene satRequestId, no se puede retomar'); + if (job.status !== 'running') throw new Error(`Job está en estado ${job.status}, no se puede retomar`); + + const decryptedFiel = await getDecryptedFiel(job.tenantId); + if (!decryptedFiel) { + throw new Error('No hay FIEL configurada o está vencida'); + } + + const fielData: FielData = { + cerContent: decryptedFiel.cerContent, + keyContent: decryptedFiel.keyContent, + password: decryptedFiel.password, + }; + + const service = createSatService(fielData); + + const tenant = await prisma.tenant.findUnique({ + where: { id: job.tenantId }, + select: { databaseName: true }, + }); + + if (!tenant) throw new Error('Tenant no encontrado'); + + const ctx: SyncContext = { + fielData, + service, + rfc: decryptedFiel.rfc, + tenantId: job.tenantId, + databaseName: tenant.databaseName, + }; + + console.log(`[SAT Resume] Verificando solicitud ${job.satRequestId} para job ${jobId}`); + + const verifyResult = await verifySatRequest(service, job.satRequestId); + console.log(`[SAT Resume] Estado: ${verifyResult.status} | CFDIs: ${verifyResult.totalCfdis} | Paquetes: ${verifyResult.packageIds.length}`); + + if (verifyResult.status === 'ready') { + const packageIds = verifyResult.packageIds; + await updateJobProgress(jobId, { + satPackageIds: packageIds, + cfdisFound: verifyResult.totalCfdis, + }); + + let totalInserted = 0; + let totalUpdated = 0; + let totalDownloaded = 0; + + for (let i = 0; i < packageIds.length; i++) { + const packageId = packageIds[i]; + console.log(`[SAT Resume] Descargando paquete ${i + 1}/${packageIds.length}: ${packageId}`); + + const downloadResult = await downloadSatPackage(service, packageId); + if (!downloadResult.success) { + console.error(`[SAT Resume] Error descargando paquete ${packageId}: ${downloadResult.message}`); + continue; + } + + const cfdis = processPackage(downloadResult.packageContent); + totalDownloaded += cfdis.length; + + const { inserted, updated } = await saveCfdis(ctx, cfdis, jobId); + totalInserted += inserted; + totalUpdated += updated; + + const progress = Math.round(((i + 1) / packageIds.length) * 100); + await updateJobProgress(jobId, { + cfdisDownloaded: totalDownloaded, + cfdisInserted: totalInserted, + cfdisUpdated: totalUpdated, + progressPercent: progress, + }); + } + + await updateJobProgress(jobId, { + status: 'completed', + completedAt: new Date(), + progressPercent: 100, + }); + + console.log(`[SAT Resume] Job ${jobId} completado. Descargados: ${totalDownloaded}, Insertados: ${totalInserted}, Actualizados: ${totalUpdated}`); + return; + } + + if (verifyResult.status === 'processing' || verifyResult.status === 'pending') { + // Después de 4 horas, si sigue en progreso, probablemente la solicitud se perdió en el SAT + throw new Error(`Solicitud SAT aún en progreso después de 4 horas (${verifyResult.status}), se requiere nuevo intento`); + } + + throw new Error(`Solicitud SAT fallida: ${verifyResult.status} - ${verifyResult.message}`); +} + /** * Inicia la sincronización con el SAT */ diff --git a/apps/api/src/services/tenants.service.ts b/apps/api/src/services/tenants.service.ts index 64c81dc..12adfbb 100644 --- a/apps/api/src/services/tenants.service.ts +++ b/apps/api/src/services/tenants.service.ts @@ -1,6 +1,7 @@ import { prisma, tenantDb } from '../config/database.js'; import { PLANS } from '@horux/shared'; import { emailService } from './email/email.service.js'; +import * as metabaseService from './metabase.service.js'; import { randomBytes } from 'crypto'; import bcrypt from 'bcryptjs'; @@ -54,6 +55,12 @@ export async function createTenant(data: { // 1. Provision a dedicated database for this tenant const databaseName = await tenantDb.provisionDatabase(data.rfc); + // 1b. Register tenant database in Metabase (non-blocking, logs errors only) + metabaseService.registerDatabase({ + nombre: data.nombre, + dbName: databaseName, + }).catch(err => console.error('[METABASE] Register failed:', err)); + // 2. Create tenant record const tenant = await prisma.tenant.create({ data: { @@ -160,5 +167,9 @@ export async function deleteTenant(id: string) { if (tenant) { await tenantDb.deprovisionDatabase(tenant.databaseName); tenantDb.invalidatePool(id); + // Remove from Metabase (non-blocking) + metabaseService.deleteDatabase(tenant.databaseName).catch(err => + console.error('[METABASE] Delete failed:', err) + ); } }