feat: watchdog SAT sync con auto-resume, fix ISR buckets y schema columns
- sat-sync.job.ts: watchdog a 4h que limpia jobs huérfanos y retoma satRequestId existente antes de crear nuevo job (evita desperdiciar consultas SAT). Fallback a startSync si no se puede retomar. - sat.service.ts: nueva función resumeSatSync() para verificar y descargar paquetes de un job huérfano usando su satRequestId. - impuestos.service.ts: reescritura completa del cálculo ISR con modelo de caja y 3 buckets: * Ingresos: I PUE emitidas + P recibidos * Deducciones: I PUE recibidas + P emitidos - E PUE recibidas * Corregidos nombres de columnas type/status vs tipo/estado - tenants.service.ts: integración Metabase (register/delete db)
This commit is contained in:
@@ -1,15 +1,98 @@
|
||||
import cron from 'node-cron';
|
||||
import { prisma } from '../config/database.js';
|
||||
import { startSync, getSyncStatus } from '../services/sat/sat.service.js';
|
||||
import { startSync, getSyncStatus, resumeSatSync } from '../services/sat/sat.service.js';
|
||||
import { hasFielConfigured } from '../services/fiel.service.js';
|
||||
|
||||
const SYNC_CRON_SCHEDULE = '0 3 * * *'; // 3:00 AM todos los días
|
||||
const FREQUENT_SYNC_SCHEDULE = '0 */6 * * *'; // Cada 6 horas (00:00, 06:00, 12:00, 18:00)
|
||||
const FREQUENT_SYNC_RFCS = ['ROEM691011EZ4']; // Tenants con sync frecuente
|
||||
const CONCURRENT_SYNCS = 3; // Máximo de sincronizaciones simultáneas
|
||||
const ORPHANED_SYNC_THRESHOLD_MINUTES = 240; // 4 horas — da tiempo al SAT de procesar antes de retomar
|
||||
|
||||
let isRunning = false;
|
||||
|
||||
/**
|
||||
* Watchdog: marca como failed los jobs 'running' que exceden el umbral de tiempo.
|
||||
* Para syncs 'daily', después de marcarlos como failed intenta lanzar un nuevo job
|
||||
* automáticamente para ver si el SAT ya terminó de procesar la información.
|
||||
* Esto evita que un job huérfano (timeout del script o reinicio del servidor)
|
||||
* bloquee sincronizaciones futuras para ese tenant.
|
||||
*/
|
||||
async function cleanupOrphanedSyncs(): Promise<number> {
|
||||
const threshold = new Date(Date.now() - ORPHANED_SYNC_THRESHOLD_MINUTES * 60 * 1000);
|
||||
|
||||
const orphaned = await prisma.satSyncJob.findMany({
|
||||
where: {
|
||||
status: 'running',
|
||||
startedAt: { lt: threshold },
|
||||
},
|
||||
select: { id: true, tenantId: true, type: true, startedAt: true, satRequestId: true },
|
||||
});
|
||||
|
||||
if (orphaned.length === 0) return 0;
|
||||
|
||||
for (const job of orphaned) {
|
||||
await prisma.satSyncJob.update({
|
||||
where: { id: job.id },
|
||||
data: {
|
||||
status: 'failed',
|
||||
errorMessage: `Watchdog: abandoned job detected (started ${job.startedAt?.toISOString() ?? 'unknown'}, threshold ${ORPHANED_SYNC_THRESHOLD_MINUTES} min)`,
|
||||
completedAt: new Date(),
|
||||
},
|
||||
});
|
||||
const startedStr = job.startedAt ? job.startedAt.toISOString() : 'unknown';
|
||||
console.log(
|
||||
`[SAT Watchdog] Job ${job.id} (tenant ${job.tenantId}) marcado como failed por exceder ${ORPHANED_SYNC_THRESHOLD_MINUTES} min en running (started: ${startedStr})`
|
||||
);
|
||||
|
||||
// Intentar retomar el satRequestId existente antes de crear uno nuevo.
|
||||
// Esto evita desperdiciar consultas al SAT cuando la solicitud original
|
||||
// ya fue aceptada y solo estaba esperando procesamiento.
|
||||
if (job.satRequestId) {
|
||||
try {
|
||||
console.log(`[SAT Watchdog] Intentando retomar job ${job.id} con satRequestId ${job.satRequestId}`);
|
||||
await resumeSatSync(job.id);
|
||||
console.log(`[SAT Watchdog] Job ${job.id} retomado exitosamente`);
|
||||
continue; // Éxito, no necesitamos crear un nuevo job
|
||||
} catch (resumeError: any) {
|
||||
console.error(`[SAT Watchdog] No se pudo retomar job ${job.id}:`, resumeError.message);
|
||||
// Fall-through: marcar como failed y lanzar nuevo job
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: crear un nuevo job si no se pudo retomar
|
||||
if (job.type === 'daily') {
|
||||
try {
|
||||
const hasFiel = await hasFielConfigured(job.tenantId);
|
||||
if (!hasFiel) {
|
||||
console.log(`[SAT Watchdog] Tenant ${job.tenantId} sin FIEL, omitiendo auto-retry`);
|
||||
continue;
|
||||
}
|
||||
|
||||
const activeSync = await prisma.satSyncJob.findFirst({
|
||||
where: {
|
||||
tenantId: job.tenantId,
|
||||
status: { in: ['pending', 'running'] },
|
||||
},
|
||||
});
|
||||
|
||||
if (activeSync) {
|
||||
console.log(`[SAT Watchdog] Tenant ${job.tenantId} ya tiene sync activo (${activeSync.id}), omitiendo auto-retry`);
|
||||
continue;
|
||||
}
|
||||
|
||||
console.log(`[SAT Watchdog] Auto-retry: lanzando nuevo daily sync para tenant ${job.tenantId}`);
|
||||
const newJobId = await startSync(job.tenantId, 'daily');
|
||||
console.log(`[SAT Watchdog] Auto-retry exitoso: nuevo job ${newJobId} para tenant ${job.tenantId}`);
|
||||
} catch (retryError: any) {
|
||||
console.error(`[SAT Watchdog] Auto-retry falló para tenant ${job.tenantId}:`, retryError.message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return orphaned.length;
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtiene los tenants que tienen FIEL configurada y activa
|
||||
*/
|
||||
@@ -51,6 +134,9 @@ async function needsInitialSync(tenantId: string): Promise<boolean> {
|
||||
*/
|
||||
async function syncTenant(tenantId: string): Promise<void> {
|
||||
try {
|
||||
// Limpieza de huérfanos antes de verificar (protección por si el cleanup global falló)
|
||||
await cleanupOrphanedSyncs();
|
||||
|
||||
// Verificar si hay sync activo
|
||||
const status = await getSyncStatus(tenantId);
|
||||
if (status.hasActiveSync) {
|
||||
@@ -83,6 +169,12 @@ async function runSyncJob(): Promise<void> {
|
||||
console.log('[SAT Cron] Iniciando job de sincronización diaria');
|
||||
|
||||
try {
|
||||
// Watchdog: limpiar jobs huérfanos antes de iniciar nuevos syncs
|
||||
const cleaned = await cleanupOrphanedSyncs();
|
||||
if (cleaned > 0) {
|
||||
console.log(`[SAT Cron] Watchdog limpió ${cleaned} job(s) huérfano(s)`);
|
||||
}
|
||||
|
||||
const tenantIds = await getTenantsWithFiel();
|
||||
console.log(`[SAT Cron] ${tenantIds.length} tenants con FIEL configurada`);
|
||||
|
||||
@@ -117,6 +209,12 @@ async function runFrequentSyncJob(): Promise<void> {
|
||||
console.log('[SAT Cron] Iniciando sync frecuente');
|
||||
|
||||
try {
|
||||
// Watchdog: limpiar jobs huérfanos antes de iniciar nuevos syncs
|
||||
const cleaned = await cleanupOrphanedSyncs();
|
||||
if (cleaned > 0) {
|
||||
console.log(`[SAT Cron] Watchdog limpió ${cleaned} job(s) huérfano(s)`);
|
||||
}
|
||||
|
||||
const tenants = await prisma.tenant.findMany({
|
||||
where: {
|
||||
active: true,
|
||||
|
||||
@@ -1,6 +1,32 @@
|
||||
import type { Pool } from 'pg';
|
||||
import type { IvaMensual, IsrMensual, ResumenIva, ResumenIsr } from '@horux/shared';
|
||||
|
||||
/**
|
||||
* Obtiene el RFC del contribuyente principal del tenant
|
||||
*/
|
||||
async function getContribuyenteRfc(pool: Pool): Promise<string | null> {
|
||||
try {
|
||||
const { rows } = await pool.query(`
|
||||
SELECT c.rfc
|
||||
FROM contribuyentes c
|
||||
JOIN entidades_gestionadas eg ON c.entidad_id = eg.id
|
||||
LIMIT 1
|
||||
`);
|
||||
return rows[0]?.rfc || null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Construye el rango de fechas para un año/mes (acumulado año a la fecha)
|
||||
*/
|
||||
function getDateRange(año: number, mes: number): { inicio: Date; fin: Date } {
|
||||
const inicio = new Date(año, 0, 1, 0, 0, 0); // 1 de enero
|
||||
const fin = new Date(año, mes, 0, 23, 59, 59); // Último día del mes
|
||||
return { inicio, fin };
|
||||
}
|
||||
|
||||
export async function getIvaMensual(pool: Pool, año: number): Promise<IvaMensual[]> {
|
||||
const { rows: data } = await pool.query(`
|
||||
SELECT
|
||||
@@ -47,13 +73,15 @@ export async function getResumenIva(pool: Pool, año: number, mes: number): Prom
|
||||
};
|
||||
}
|
||||
|
||||
// Fallback: calcular sobre la marcha desde cfdis
|
||||
// Corregido: usar type/status y iva_traslado_mxn en lugar de tipo/estado/iva
|
||||
const { rows: [calcResult] } = await pool.query(`
|
||||
SELECT
|
||||
COALESCE(SUM(CASE WHEN tipo = 'ingreso' THEN iva ELSE 0 END), 0) as trasladado,
|
||||
COALESCE(SUM(CASE WHEN tipo = 'egreso' THEN iva ELSE 0 END), 0) as acreditable,
|
||||
COALESCE(SUM(iva_retenido), 0) as retenido
|
||||
COALESCE(SUM(CASE WHEN type = 'EMITIDO' AND tipo_comprobante = 'I' THEN iva_traslado_mxn ELSE 0 END), 0) as trasladado,
|
||||
COALESCE(SUM(CASE WHEN type = 'RECIBIDO' AND tipo_comprobante = 'I' THEN iva_traslado_mxn ELSE 0 END), 0) as acreditable,
|
||||
COALESCE(SUM(iva_retencion_mxn), 0) as retenido
|
||||
FROM cfdis
|
||||
WHERE estado = 'vigente'
|
||||
WHERE status = 'Vigente'
|
||||
AND EXTRACT(YEAR FROM fecha_emision) = $1
|
||||
AND EXTRACT(MONTH FROM fecha_emision) = $2
|
||||
`, [año, mes]);
|
||||
@@ -105,41 +133,180 @@ export async function getIsrMensual(pool: Pool, año: number): Promise<IsrMensua
|
||||
}
|
||||
|
||||
export async function getResumenIsr(pool: Pool, año: number, mes: number): Promise<ResumenIsr> {
|
||||
const { rows: [ingresos] } = await pool.query(`
|
||||
SELECT COALESCE(SUM(total), 0) as total
|
||||
const rfc = await getContribuyenteRfc(pool);
|
||||
if (!rfc) {
|
||||
console.error('[ISR] No se encontró RFC del contribuyente');
|
||||
return {
|
||||
ingresosAcumulados: 0,
|
||||
deducciones: 0,
|
||||
baseGravable: 0,
|
||||
isrCausado: 0,
|
||||
isrRetenido: 0,
|
||||
isrAPagar: 0,
|
||||
};
|
||||
}
|
||||
|
||||
const { inicio, fin } = getDateRange(año, mes);
|
||||
|
||||
// Ingresos Bucket A: Facturas I PUE emitidas (ingresos de contado)
|
||||
const { rows: ingresosContadoRows } = await pool.query(`
|
||||
SELECT
|
||||
COALESCE(regimen_fiscal_emisor, 'SIN') as regimen,
|
||||
SUM(
|
||||
total_mxn
|
||||
- COALESCE(iva_traslado_mxn, 0)
|
||||
- COALESCE(ieps_traslado_mxn, 0)
|
||||
- COALESCE(impuestos_locales_trasladado_mxn, 0)
|
||||
- COALESCE(descuento_mxn, 0)
|
||||
) as total
|
||||
FROM cfdis
|
||||
WHERE tipo = 'ingreso' AND estado = 'vigente'
|
||||
AND EXTRACT(YEAR FROM fecha_emision) = $1
|
||||
AND EXTRACT(MONTH FROM fecha_emision) <= $2
|
||||
`, [año, mes]);
|
||||
WHERE rfc_emisor = $1
|
||||
AND tipo_comprobante = 'I'
|
||||
AND metodo_pago = 'PUE'
|
||||
AND status NOT IN ('Cancelado', '0')
|
||||
AND fecha_emision >= $2 AND fecha_emision <= $3
|
||||
GROUP BY COALESCE(regimen_fiscal_emisor, 'SIN')
|
||||
`, [rfc, inicio, fin]);
|
||||
|
||||
const { rows: [egresos] } = await pool.query(`
|
||||
SELECT COALESCE(SUM(total), 0) as total
|
||||
// Ingresos Bucket B: Pagos P recibidos (ingresos a crédito cobrados)
|
||||
const { rows: ingresosCreditoRows } = await pool.query(`
|
||||
SELECT
|
||||
COALESCE(regimen_fiscal_receptor, 'SIN') as regimen,
|
||||
SUM(
|
||||
monto_pago_mxn
|
||||
- COALESCE(iva_traslado_pago_mxn, 0)
|
||||
- COALESCE(ieps_traslado_pago_mxn, 0)
|
||||
- COALESCE(impuestos_locales_trasladado_mxn, 0)
|
||||
) as total
|
||||
FROM cfdis
|
||||
WHERE tipo = 'egreso' AND estado = 'vigente'
|
||||
AND EXTRACT(YEAR FROM fecha_emision) = $1
|
||||
AND EXTRACT(MONTH FROM fecha_emision) <= $2
|
||||
`, [año, mes]);
|
||||
WHERE rfc_receptor = $1
|
||||
AND tipo_comprobante = 'P'
|
||||
AND status NOT IN ('Cancelado', '0')
|
||||
AND fecha_pago_p >= $2 AND fecha_pago_p <= $3
|
||||
GROUP BY COALESCE(regimen_fiscal_receptor, 'SIN')
|
||||
`, [rfc, inicio, fin]);
|
||||
|
||||
const { rows: [retenido] } = await pool.query(`
|
||||
SELECT COALESCE(SUM(isr_retenido), 0) as total
|
||||
// Bucket 1: Deducciones - Facturas I PUE recibidas
|
||||
const { rows: bucket1Rows } = await pool.query(`
|
||||
SELECT
|
||||
COALESCE(regimen_fiscal_receptor, 'SIN') as regimen,
|
||||
SUM(
|
||||
total_mxn
|
||||
- COALESCE(iva_traslado_mxn, 0)
|
||||
- COALESCE(ieps_traslado_mxn, 0)
|
||||
- COALESCE(impuestos_locales_trasladado_mxn, 0)
|
||||
- COALESCE(descuento_mxn, 0)
|
||||
) as total
|
||||
FROM cfdis
|
||||
WHERE estado = 'vigente'
|
||||
AND EXTRACT(YEAR FROM fecha_emision) = $1
|
||||
AND EXTRACT(MONTH FROM fecha_emision) <= $2
|
||||
`, [año, mes]);
|
||||
WHERE rfc_receptor = $1
|
||||
AND tipo_comprobante = 'I'
|
||||
AND metodo_pago = 'PUE'
|
||||
AND status NOT IN ('Cancelado', '0')
|
||||
AND fecha_emision >= $2 AND fecha_emision <= $3
|
||||
GROUP BY COALESCE(regimen_fiscal_receptor, 'SIN')
|
||||
`, [rfc, inicio, fin]);
|
||||
|
||||
const ingresosAcumulados = Number(ingresos?.total || 0);
|
||||
const deducciones = Number(egresos?.total || 0);
|
||||
const baseGravable = Math.max(0, ingresosAcumulados - deducciones);
|
||||
// Bucket 2: Deducciones - Pagos P emitidos (pagos a crédito)
|
||||
const { rows: bucket2Rows } = await pool.query(`
|
||||
SELECT
|
||||
COALESCE(regimen_fiscal_emisor, 'SIN') as regimen,
|
||||
SUM(
|
||||
monto_pago_mxn
|
||||
- COALESCE(iva_traslado_pago_mxn, 0)
|
||||
- COALESCE(ieps_traslado_pago_mxn, 0)
|
||||
- COALESCE(impuestos_locales_trasladado_mxn, 0)
|
||||
) as total
|
||||
FROM cfdis
|
||||
WHERE rfc_emisor = $1
|
||||
AND tipo_comprobante = 'P'
|
||||
AND status NOT IN ('Cancelado', '0')
|
||||
AND fecha_pago_p >= $2 AND fecha_pago_p <= $3
|
||||
GROUP BY COALESCE(regimen_fiscal_emisor, 'SIN')
|
||||
`, [rfc, inicio, fin]);
|
||||
|
||||
// Bucket 3: Deducciones - Notas de crédito E PUE recibidas (resta)
|
||||
const { rows: bucket3Rows } = await pool.query(`
|
||||
SELECT
|
||||
COALESCE(regimen_fiscal_receptor, 'SIN') as regimen,
|
||||
SUM(
|
||||
total_mxn
|
||||
- COALESCE(iva_traslado_mxn, 0)
|
||||
- COALESCE(ieps_traslado_mxn, 0)
|
||||
- COALESCE(impuestos_locales_trasladado_mxn, 0)
|
||||
- COALESCE(descuento_mxn, 0)
|
||||
) as total
|
||||
FROM cfdis
|
||||
WHERE rfc_receptor = $1
|
||||
AND tipo_comprobante = 'E'
|
||||
AND metodo_pago = 'PUE'
|
||||
AND status NOT IN ('Cancelado', '0')
|
||||
AND fecha_emision >= $2 AND fecha_emision <= $3
|
||||
GROUP BY COALESCE(regimen_fiscal_receptor, 'SIN')
|
||||
`, [rfc, inicio, fin]);
|
||||
|
||||
// ISR retenido (acumulado año a la fecha)
|
||||
const { rows: [retenidoRow] } = await pool.query(`
|
||||
SELECT COALESCE(SUM(isr_retencion_mxn), 0) as total
|
||||
FROM cfdis
|
||||
WHERE (rfc_emisor = $1 OR rfc_receptor = $1)
|
||||
AND status NOT IN ('Cancelado', '0')
|
||||
AND fecha_emision >= $2 AND fecha_emision <= $3
|
||||
`, [rfc, inicio, fin]);
|
||||
|
||||
// Acumular totales por régimen
|
||||
const porRegimen = new Map<string, { ingresos: number; deducciones: number }>();
|
||||
|
||||
for (const row of ingresosContadoRows) {
|
||||
const r = row.regimen as string;
|
||||
const actual = porRegimen.get(r) || { ingresos: 0, deducciones: 0 };
|
||||
actual.ingresos += Number(row.total || 0);
|
||||
porRegimen.set(r, actual);
|
||||
}
|
||||
|
||||
for (const row of ingresosCreditoRows) {
|
||||
const r = row.regimen as string;
|
||||
const actual = porRegimen.get(r) || { ingresos: 0, deducciones: 0 };
|
||||
actual.ingresos += Number(row.total || 0);
|
||||
porRegimen.set(r, actual);
|
||||
}
|
||||
|
||||
for (const row of bucket1Rows) {
|
||||
const r = row.regimen as string;
|
||||
const actual = porRegimen.get(r) || { ingresos: 0, deducciones: 0 };
|
||||
actual.deducciones += Number(row.total || 0);
|
||||
porRegimen.set(r, actual);
|
||||
}
|
||||
|
||||
for (const row of bucket2Rows) {
|
||||
const r = row.regimen as string;
|
||||
const actual = porRegimen.get(r) || { ingresos: 0, deducciones: 0 };
|
||||
actual.deducciones += Number(row.total || 0);
|
||||
porRegimen.set(r, actual);
|
||||
}
|
||||
|
||||
for (const row of bucket3Rows) {
|
||||
const r = row.regimen as string;
|
||||
const actual = porRegimen.get(r) || { ingresos: 0, deducciones: 0 };
|
||||
actual.deducciones -= Number(row.total || 0);
|
||||
porRegimen.set(r, actual);
|
||||
}
|
||||
|
||||
let ingresosAcumulados = 0;
|
||||
let deduccionesTotales = 0;
|
||||
|
||||
for (const [, valores] of porRegimen) {
|
||||
ingresosAcumulados += Math.max(0, valores.ingresos);
|
||||
deduccionesTotales += Math.max(0, valores.deducciones);
|
||||
}
|
||||
|
||||
const baseGravable = Math.max(0, ingresosAcumulados - deduccionesTotales);
|
||||
const isrCausado = baseGravable * 0.30;
|
||||
const isrRetenido = Number(retenido?.total || 0);
|
||||
const isrRetenido = Number(retenidoRow?.total || 0);
|
||||
const isrAPagar = Math.max(0, isrCausado - isrRetenido);
|
||||
|
||||
return {
|
||||
ingresosAcumulados,
|
||||
deducciones,
|
||||
deducciones: deduccionesTotales,
|
||||
baseGravable,
|
||||
isrCausado,
|
||||
isrRetenido,
|
||||
|
||||
167
apps/api/src/services/metabase.service.ts
Normal file
167
apps/api/src/services/metabase.service.ts
Normal file
@@ -0,0 +1,167 @@
|
||||
/**
|
||||
* Metabase integration service.
|
||||
* Automatically registers newly-provisioned tenant databases in Metabase.
|
||||
*/
|
||||
|
||||
const METABASE_URL = process.env.METABASE_URL || 'http://192.168.10.170:3000';
|
||||
const METABASE_USERNAME = process.env.METABASE_USERNAME || 'ialcarazsalazar@consultoria-as.com';
|
||||
const METABASE_PASSWORD = process.env.METABASE_PASSWORD || '';
|
||||
|
||||
// PostgreSQL connection details exposed to Metabase
|
||||
const PG_HOST = process.env.METABASE_PG_HOST || '192.168.10.90';
|
||||
const PG_PORT = parseInt(process.env.METABASE_PG_PORT || '5432', 10);
|
||||
const PG_USER = process.env.METABASE_PG_USER || 'postgres';
|
||||
const PG_PASSWORD = process.env.METABASE_PG_PASSWORD || '';
|
||||
|
||||
let cachedSessionToken: string | null = null;
|
||||
let tokenExpiresAt = 0;
|
||||
|
||||
async function getSessionToken(): Promise<string | null> {
|
||||
// Re-use cached token if still valid (Metabase sessions last 2 weeks by default)
|
||||
if (cachedSessionToken && Date.now() < tokenExpiresAt) {
|
||||
return cachedSessionToken;
|
||||
}
|
||||
|
||||
if (!METABASE_PASSWORD) {
|
||||
console.error('[METABASE] METABASE_PASSWORD not configured');
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
const res = await fetch(`${METABASE_URL}/api/session`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
username: METABASE_USERNAME,
|
||||
password: METABASE_PASSWORD,
|
||||
}),
|
||||
});
|
||||
|
||||
if (!res.ok) {
|
||||
const text = await res.text();
|
||||
console.error(`[METABASE] Auth failed: ${res.status} ${text}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
const data = await res.json() as { id?: string };
|
||||
if (!data.id) {
|
||||
console.error('[METABASE] Auth response missing session id');
|
||||
return null;
|
||||
}
|
||||
|
||||
cachedSessionToken = data.id;
|
||||
tokenExpiresAt = Date.now() + 13 * 24 * 60 * 60 * 1000; // 13 days
|
||||
return cachedSessionToken;
|
||||
} catch (err) {
|
||||
console.error('[METABASE] Error fetching session token:', err);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
interface RegisterDatabaseInput {
|
||||
nombre: string;
|
||||
dbName: string;
|
||||
}
|
||||
|
||||
export async function registerDatabase(input: RegisterDatabaseInput): Promise<void> {
|
||||
const sessionToken = await getSessionToken();
|
||||
if (!sessionToken) {
|
||||
console.error('[METABASE] Skipping database registration — no session token');
|
||||
return;
|
||||
}
|
||||
|
||||
if (!PG_PASSWORD) {
|
||||
console.error('[METABASE] METABASE_PG_PASSWORD not configured');
|
||||
return;
|
||||
}
|
||||
|
||||
const payload = {
|
||||
name: input.nombre,
|
||||
engine: 'postgres',
|
||||
details: {
|
||||
host: PG_HOST,
|
||||
port: PG_PORT,
|
||||
dbname: input.dbName,
|
||||
user: PG_USER,
|
||||
password: PG_PASSWORD,
|
||||
ssl: false,
|
||||
'tunnel-enabled': false,
|
||||
'advanced-options': false,
|
||||
'schema-filters-type': 'all',
|
||||
},
|
||||
auto_run_queries: true,
|
||||
is_full_sync: true,
|
||||
is_on_demand: false,
|
||||
};
|
||||
|
||||
try {
|
||||
const res = await fetch(`${METABASE_URL}/api/database`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Metabase-Session': sessionToken,
|
||||
},
|
||||
body: JSON.stringify(payload),
|
||||
});
|
||||
|
||||
if (!res.ok) {
|
||||
const text = await res.text();
|
||||
// 409 or duplicate name is not fatal — log and continue
|
||||
if (res.status === 400 && text.includes('already exists')) {
|
||||
console.log(`[METABASE] Database "${input.nombre}" already registered`);
|
||||
return;
|
||||
}
|
||||
console.error(`[METABASE] Register database failed: ${res.status} ${text}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const data = await res.json() as { id?: number };
|
||||
console.log(`[METABASE] Database "${input.nombre}" registered with id=${data.id}`);
|
||||
} catch (err) {
|
||||
console.error('[METABASE] Error registering database:', err);
|
||||
}
|
||||
}
|
||||
|
||||
export async function deleteDatabase(databaseName: string): Promise<void> {
|
||||
const sessionToken = await getSessionToken();
|
||||
if (!sessionToken) {
|
||||
console.error('[METABASE] Skipping database deletion — no session token');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// Find database by name
|
||||
const listRes = await fetch(`${METABASE_URL}/api/database`, {
|
||||
headers: { 'X-Metabase-Session': sessionToken },
|
||||
});
|
||||
|
||||
if (!listRes.ok) {
|
||||
console.error(`[METABASE] Failed to list databases: ${listRes.status}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const listData = await listRes.json() as { data?: Array<{ id: number; name: string; details?: { dbname?: string } }> };
|
||||
const db = listData.data?.find(
|
||||
(d) => d.details?.dbname === databaseName || d.name.includes(databaseName)
|
||||
);
|
||||
|
||||
if (!db) {
|
||||
console.log(`[METABASE] No database found for ${databaseName}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const deleteRes = await fetch(`${METABASE_URL}/api/database/${db.id}`, {
|
||||
method: 'DELETE',
|
||||
headers: { 'X-Metabase-Session': sessionToken },
|
||||
});
|
||||
|
||||
if (!deleteRes.ok) {
|
||||
console.error(`[METABASE] Delete database failed: ${deleteRes.status}`);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`[METABASE] Database ${db.id} (${databaseName}) deleted`);
|
||||
} catch (err) {
|
||||
console.error('[METABASE] Error deleting database:', err);
|
||||
}
|
||||
}
|
||||
@@ -417,6 +417,105 @@ async function processDailySync(ctx: SyncContext, jobId: string): Promise<void>
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Retoma una sincronización huérfana usando el satRequestId existente.
|
||||
* Esto evita desperdiciar una consulta al SAT cuando el job anterior
|
||||
* murió durante el polling (timeout del script o reinicio del servidor).
|
||||
*/
|
||||
export async function resumeSatSync(jobId: string): Promise<void> {
|
||||
const job = await prisma.satSyncJob.findUnique({ where: { id: jobId } });
|
||||
if (!job) throw new Error('Job no encontrado');
|
||||
if (!job.satRequestId) throw new Error('Job no tiene satRequestId, no se puede retomar');
|
||||
if (job.status !== 'running') throw new Error(`Job está en estado ${job.status}, no se puede retomar`);
|
||||
|
||||
const decryptedFiel = await getDecryptedFiel(job.tenantId);
|
||||
if (!decryptedFiel) {
|
||||
throw new Error('No hay FIEL configurada o está vencida');
|
||||
}
|
||||
|
||||
const fielData: FielData = {
|
||||
cerContent: decryptedFiel.cerContent,
|
||||
keyContent: decryptedFiel.keyContent,
|
||||
password: decryptedFiel.password,
|
||||
};
|
||||
|
||||
const service = createSatService(fielData);
|
||||
|
||||
const tenant = await prisma.tenant.findUnique({
|
||||
where: { id: job.tenantId },
|
||||
select: { databaseName: true },
|
||||
});
|
||||
|
||||
if (!tenant) throw new Error('Tenant no encontrado');
|
||||
|
||||
const ctx: SyncContext = {
|
||||
fielData,
|
||||
service,
|
||||
rfc: decryptedFiel.rfc,
|
||||
tenantId: job.tenantId,
|
||||
databaseName: tenant.databaseName,
|
||||
};
|
||||
|
||||
console.log(`[SAT Resume] Verificando solicitud ${job.satRequestId} para job ${jobId}`);
|
||||
|
||||
const verifyResult = await verifySatRequest(service, job.satRequestId);
|
||||
console.log(`[SAT Resume] Estado: ${verifyResult.status} | CFDIs: ${verifyResult.totalCfdis} | Paquetes: ${verifyResult.packageIds.length}`);
|
||||
|
||||
if (verifyResult.status === 'ready') {
|
||||
const packageIds = verifyResult.packageIds;
|
||||
await updateJobProgress(jobId, {
|
||||
satPackageIds: packageIds,
|
||||
cfdisFound: verifyResult.totalCfdis,
|
||||
});
|
||||
|
||||
let totalInserted = 0;
|
||||
let totalUpdated = 0;
|
||||
let totalDownloaded = 0;
|
||||
|
||||
for (let i = 0; i < packageIds.length; i++) {
|
||||
const packageId = packageIds[i];
|
||||
console.log(`[SAT Resume] Descargando paquete ${i + 1}/${packageIds.length}: ${packageId}`);
|
||||
|
||||
const downloadResult = await downloadSatPackage(service, packageId);
|
||||
if (!downloadResult.success) {
|
||||
console.error(`[SAT Resume] Error descargando paquete ${packageId}: ${downloadResult.message}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
const cfdis = processPackage(downloadResult.packageContent);
|
||||
totalDownloaded += cfdis.length;
|
||||
|
||||
const { inserted, updated } = await saveCfdis(ctx, cfdis, jobId);
|
||||
totalInserted += inserted;
|
||||
totalUpdated += updated;
|
||||
|
||||
const progress = Math.round(((i + 1) / packageIds.length) * 100);
|
||||
await updateJobProgress(jobId, {
|
||||
cfdisDownloaded: totalDownloaded,
|
||||
cfdisInserted: totalInserted,
|
||||
cfdisUpdated: totalUpdated,
|
||||
progressPercent: progress,
|
||||
});
|
||||
}
|
||||
|
||||
await updateJobProgress(jobId, {
|
||||
status: 'completed',
|
||||
completedAt: new Date(),
|
||||
progressPercent: 100,
|
||||
});
|
||||
|
||||
console.log(`[SAT Resume] Job ${jobId} completado. Descargados: ${totalDownloaded}, Insertados: ${totalInserted}, Actualizados: ${totalUpdated}`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (verifyResult.status === 'processing' || verifyResult.status === 'pending') {
|
||||
// Después de 4 horas, si sigue en progreso, probablemente la solicitud se perdió en el SAT
|
||||
throw new Error(`Solicitud SAT aún en progreso después de 4 horas (${verifyResult.status}), se requiere nuevo intento`);
|
||||
}
|
||||
|
||||
throw new Error(`Solicitud SAT fallida: ${verifyResult.status} - ${verifyResult.message}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Inicia la sincronización con el SAT
|
||||
*/
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { prisma, tenantDb } from '../config/database.js';
|
||||
import { PLANS } from '@horux/shared';
|
||||
import { emailService } from './email/email.service.js';
|
||||
import * as metabaseService from './metabase.service.js';
|
||||
import { randomBytes } from 'crypto';
|
||||
import bcrypt from 'bcryptjs';
|
||||
|
||||
@@ -54,6 +55,12 @@ export async function createTenant(data: {
|
||||
// 1. Provision a dedicated database for this tenant
|
||||
const databaseName = await tenantDb.provisionDatabase(data.rfc);
|
||||
|
||||
// 1b. Register tenant database in Metabase (non-blocking, logs errors only)
|
||||
metabaseService.registerDatabase({
|
||||
nombre: data.nombre,
|
||||
dbName: databaseName,
|
||||
}).catch(err => console.error('[METABASE] Register failed:', err));
|
||||
|
||||
// 2. Create tenant record
|
||||
const tenant = await prisma.tenant.create({
|
||||
data: {
|
||||
@@ -160,5 +167,9 @@ export async function deleteTenant(id: string) {
|
||||
if (tenant) {
|
||||
await tenantDb.deprovisionDatabase(tenant.databaseName);
|
||||
tenantDb.invalidatePool(id);
|
||||
// Remove from Metabase (non-blocking)
|
||||
metabaseService.deleteDatabase(tenant.databaseName).catch(err =>
|
||||
console.error('[METABASE] Delete failed:', err)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user