- Add metabase.service.ts for automatic DB registration on tenant creation - Hook createTenant, addTenantToOwner and deleteTenant to sync with Metabase - Add environment variables for Metabase integration - Fix dashboard routing for global admin users - Fix CFDI status casing (Vigente vs vigente) - Fix sidebar empty nav crash - Fix KPI null regimen_fiscal values - Fix CFDI type mapping (EMITIDO/RECIBIDO) - Update branding from Horux360 to Horux Despachos - Add legacy migration scripts for central and tenant DBs
324 lines
12 KiB
TypeScript
324 lines
12 KiB
TypeScript
/**
 * ETL: tenant database migration, Horux360 → HoruxDespachos
 */
|
|
import 'dotenv/config';
|
|
import { Pool } from 'pg';
|
|
import { prisma } from '../src/config/database.js';
|
|
import { migrate } from '../src/config/tenant-migrations.js';
|
|
|
|
/** Suffix appended to a tenant's legacy database name when it is renamed and kept as the backup/source. */
const BACKUP_SUFFIX = '_backup_20260427';

/** Number of CFDI rows read from the backup and inserted into the target per round trip. */
const BATCH_SIZE = 1000;

/**
 * Administrative pool connected to the `postgres` maintenance database.
 * It is used for statements that act on whole databases (rename, create, drop, list).
 *
 * Connection settings are taken from the standard PGHOST / PGPORT / PGUSER /
 * PGPASSWORD environment variables (loaded through `dotenv/config`) and fall
 * back to the values this script historically used when a variable is unset.
 */
const adminPool = new Pool({
  host: process.env.PGHOST || 'localhost',
  // `Number('')` and `Number(undefined)` are falsy/NaN, so an unset or empty PGPORT falls back to 5432.
  port: Number(process.env.PGPORT) || 5432,
  user: process.env.PGUSER || 'postgres',
  // NOTE(review): the literal fallback keeps existing runs working, but a credential
  // committed to source control should be rotated and this fallback removed.
  password: process.env.PGPASSWORD || 'ZxHMrmnwanvLfLDdNJdRthFjWF2Lj1Rb',
  database: 'postgres',
  max: 2,
});
|
|
|
|
/**
 * Renames a database after terminating every other session connected to it.
 *
 * @param oldName Current name of the database.
 * @param newName Name the database should have afterwards.
 * @returns A promise that settles once the rename statement has completed.
 */
async function renameDatabase(oldName: string, newName: string): Promise<void> {
  // Identifiers cannot be bound as query parameters, so they are quoted by hand:
  // wrap in double quotes and double any embedded quote so the name cannot
  // terminate the identifier and inject additional SQL.
  const quoteIdent = (ident: string): string => `"${ident.replace(/"/g, '""')}"`;

  // Disconnect all other sessions from the database before renaming it.
  await adminPool.query(
    `
    SELECT pg_terminate_backend(pid)
    FROM pg_stat_activity
    WHERE datname = $1 AND pid <> pg_backend_pid();
    `,
    [oldName],
  );

  await adminPool.query(
    `ALTER DATABASE ${quoteIdent(oldName)} RENAME TO ${quoteIdent(newName)};`,
  );
}
|
|
|
|
/**
 * Creates an empty database with the given name.
 *
 * @param name Name of the database to create.
 * @returns A promise that settles once the CREATE DATABASE statement has completed.
 */
async function createDatabase(name: string): Promise<void> {
  // Identifiers cannot be bound as parameters; double any embedded quote so the
  // name cannot break out of the quoted identifier.
  const quotedName = `"${name.replace(/"/g, '""')}"`;
  await adminPool.query(`CREATE DATABASE ${quotedName};`);
}
|
|
|
|
/**
 * Drops a database (if it exists) after terminating every other session connected to it.
 *
 * @param name Name of the database to drop.
 * @returns A promise that settles once the DROP DATABASE statement has completed.
 */
async function dropDatabase(name: string): Promise<void> {
  // Disconnect all other sessions from the database before dropping it.
  await adminPool.query(
    `
    SELECT pg_terminate_backend(pid)
    FROM pg_stat_activity
    WHERE datname = $1 AND pid <> pg_backend_pid();
    `,
    [name],
  );

  // Identifiers cannot be bound as parameters; double any embedded quote so the
  // name cannot break out of the quoted identifier.
  const quotedName = `"${name.replace(/"/g, '""')}"`;
  await adminPool.query(`DROP DATABASE IF EXISTS ${quotedName};`);
}
|
|
|
|
/** Legacy lowercase type names mapped to the one-letter voucher-type codes stored in the target schema. */
const TIPO_COMPROBANTE_CODES: ReadonlyMap<string, string> = new Map([
  ['ingreso', 'I'],
  ['egreso', 'E'],
  ['traslado', 'T'],
  ['nomina', 'N'],
  ['pago', 'P'],
]);

/**
 * Maps a legacy CFDI type name to its one-letter code.
 *
 * Matching ignores letter case and surrounding whitespace, so values such as
 * `'Egreso'` or `' PAGO '` resolve to their own code instead of silently
 * falling through to the default.
 *
 * @param tipo Legacy type name (`ingreso`, `egreso`, `traslado`, `nomina`, `pago`).
 * @returns The matching code, or `'I'` when the value is unknown, empty or not a string.
 */
function mapTipoComprobante(tipo: string): string {
  // Rows come from an untyped query result, so guard against null/undefined at runtime.
  const key = typeof tipo === 'string' ? tipo.trim().toLowerCase() : '';
  return TIPO_COMPROBANTE_CODES.get(key) ?? 'I';
}
|
|
|
|
/** Target `cfdis` columns, in the exact order produced by `toTargetCfdiValues`. */
const CFDI_TARGET_COLUMNS = [
  'year', 'month', 'type', 'uuid', 'serie', 'folio', 'status',
  'fecha_emision', 'rfc_emisor', 'nombre_emisor', 'rfc_receptor', 'nombre_receptor',
  'subtotal', 'subtotal_mxn', 'descuento', 'descuento_mxn', 'total', 'total_mxn',
  'moneda', 'tipo_cambio', 'tipo_comprobante', 'metodo_pago', 'forma_pago', 'uso_cfdi',
  'xml_url', 'pdf_url', 'xml_original', 'source', 'last_sat_sync', 'sat_sync_job_id',
  'creado_en', 'actualizado_en',
  'iva_traslado', 'iva_traslado_mxn', 'isr_retencion', 'isr_retencion_mxn',
  'iva_retencion', 'iva_retencion_mxn', 'contribuyente_id',
] as const;

/** Converts one legacy `cfdis` row into the value list for one target row (same order as CFDI_TARGET_COLUMNS). */
function toTargetCfdiValues(row: any): any[] {
  // MXN amounts are taken as-is; other currencies are multiplied by tipo_cambio,
  // falling back to 1 when the rate is missing, zero or unparseable.
  const mxnMultiplier = row.moneda === 'MXN' ? 1 : (parseFloat(row.tipo_cambio) || 1);
  // Missing amounts count as 0.
  const amount = (raw: any): number => parseFloat(raw || 0);

  const emitted = row.fecha_emision ? new Date(row.fecha_emision) : null;
  const iva = amount(row.iva);
  const isrRetenido = amount(row.isr_retenido);
  const ivaRetenido = amount(row.iva_retenido);

  const values = [
    emitted ? emitted.getFullYear().toString() : null,
    emitted ? String(emitted.getMonth() + 1).padStart(2, '0') : null,
    row.tipo,
    row.uuid_fiscal ? String(row.uuid_fiscal).toLowerCase() : null,
    row.serie, row.folio, row.estado, row.fecha_emision,
    row.rfc_emisor, row.nombre_emisor, row.rfc_receptor, row.nombre_receptor,
    row.subtotal, amount(row.subtotal) * mxnMultiplier,
    row.descuento, amount(row.descuento) * mxnMultiplier,
    row.total, amount(row.total) * mxnMultiplier,
    row.moneda, row.tipo_cambio, mapTipoComprobante(row.tipo),
    row.metodo_pago, row.forma_pago, row.uso_cfdi,
    row.xml_url, row.pdf_url, row.xml_original,
    row.source || 'manual', row.last_sat_sync, row.sat_sync_job_id,
    row.created_at, row.updated_at,
    iva, iva * mxnMultiplier,
    isrRetenido, isrRetenido * mxnMultiplier,
    ivaRetenido, ivaRetenido * mxnMultiplier,
    null, // contribuyente_id is filled in by the caller after the copy
  ];

  // Guards against the column list and the value list drifting apart.
  if (values.length !== CFDI_TARGET_COLUMNS.length) {
    throw new Error(
      `CFDI value/column mismatch: ${values.length} values for ${CFDI_TARGET_COLUMNS.length} columns`,
    );
  }
  return values;
}

/**
 * Copies every row of the legacy `cfdis` table into the new-schema `cfdis` table.
 *
 * Rows are read in pages of BATCH_SIZE and written with one multi-row INSERT per page.
 *
 * @param backupPool Pool connected to the renamed legacy database (source).
 * @param targetPool Pool connected to the freshly migrated database (destination).
 * @param tenant Tenant being migrated; currently unused, kept for call compatibility.
 * @returns `inserted` (rows written) and `total` (rows counted in the source).
 */
async function runEtl(backupPool: Pool, targetPool: Pool, tenant: any) {
  // Count source CFDIs
  const countRes = await backupPool.query('SELECT COUNT(*)::int as cnt FROM cfdis');
  const total = countRes.rows[0].cnt;
  if (total === 0) {
    console.log(` No CFDIs to migrate`);
    return { inserted: 0, total: 0 };
  }

  console.log(` Migrating ${total} CFDIs...`);
  const insertPrefix = `INSERT INTO cfdis (${CFDI_TARGET_COLUMNS.join(', ')}) VALUES `;
  let offset = 0;
  let inserted = 0;

  while (offset < total) {
    // LIMIT/OFFSET paging is only repeatable with a deterministic order.
    // created_at alone is not unique, so uuid_fiscal breaks ties; otherwise rows
    // sharing a timestamp could be skipped or copied twice across pages.
    const { rows } = await backupPool.query(
      `
      SELECT
        uuid_fiscal, tipo, serie, folio, fecha_emision, fecha_timbrado,
        rfc_emisor, nombre_emisor, rfc_receptor, nombre_receptor,
        subtotal, descuento, iva, isr_retenido, iva_retenido, total,
        moneda, tipo_cambio, metodo_pago, forma_pago, uso_cfdi, estado,
        xml_url, pdf_url, xml_original, created_at, updated_at,
        last_sat_sync, sat_sync_job_id, source
      FROM cfdis
      ORDER BY created_at, uuid_fiscal
      LIMIT $1 OFFSET $2
      `,
      [BATCH_SIZE, offset],
    );

    if (rows.length === 0) break;

    const values: any[] = [];
    const placeholders: string[] = [];

    for (const row of rows) {
      const rowValues = toTargetCfdiValues(row);
      // Placeholders are numbered continuously across the whole statement.
      const firstIdx = values.length + 1;
      placeholders.push(`(${rowValues.map((_, i) => `$${firstIdx + i}`).join(', ')})`);
      values.push(...rowValues);
    }

    await targetPool.query(insertPrefix + placeholders.join(','), values);
    inserted += rows.length;
    offset += rows.length;

    if (offset % 10000 === 0 || offset >= total) {
      console.log(` ... ${inserted}/${total} CFDIs inserted`);
    }
  }

  return { inserted, total };
}
|
|
|
|
/**
 * Registers the tenant itself as the default taxpayer of the target database.
 *
 * Inserts one `entidades_gestionadas` row of type `contribuyente` and then the
 * matching `contribuyentes` row that points at it.
 *
 * @param target Pool connected to the tenant's new database.
 * @param tenant Tenant record; `nombre` and `rfc` are read from it.
 * @returns The generated id of the `entidades_gestionadas` row.
 */
async function createDefaultContribuyente(target: Pool, tenant: any) {
  const insertEntidadSql = `
    INSERT INTO entidades_gestionadas (id, tipo, nombre, identificador, active, created_at, updated_at)
    VALUES (gen_random_uuid(), 'contribuyente', $1, $2, true, NOW(), NOW())
    RETURNING id
  `;
  const insertContribuyenteSql = `
    INSERT INTO contribuyentes (entidad_id, rfc, codigo_postal, domicilio, email_preferences, activos_fijos_usos_excluidos)
    VALUES ($1, $2, NULL, '{}'::jsonb, '{}'::jsonb, '[]'::jsonb)
  `;

  const created = await target.query(insertEntidadSql, [tenant.nombre, tenant.rfc]);
  // The id comes back through RETURNING, so it is read before the dependent insert.
  const newEntidadId = created.rows[0].id;

  await target.query(insertContribuyenteSql, [newEntidadId, tenant.rfc]);

  return newEntidadId;
}
|
|
|
|
/**
 * Populates the `rfcs` catalog from the migrated CFDIs and links each CFDI to it.
 *
 * Runs four statements in order: insert issuer RFCs, insert receiver RFCs
 * (both skipping RFCs already present), then fill `rfc_emisor_id` and
 * `rfc_receptor_id` on CFDIs where they are still NULL.
 *
 * @param target Pool connected to the tenant's new database.
 */
async function buildRfcCatalog(target: Pool) {
  const statements = [
    // 1. Catalog entries for issuers.
    `
    INSERT INTO rfcs (rfc, razon_social)
    SELECT DISTINCT rfc_emisor, nombre_emisor FROM cfdis WHERE rfc_emisor IS NOT NULL
    ON CONFLICT (rfc) DO NOTHING;
    `,
    // 2. Catalog entries for receivers.
    `
    INSERT INTO rfcs (rfc, razon_social)
    SELECT DISTINCT rfc_receptor, nombre_receptor FROM cfdis WHERE rfc_receptor IS NOT NULL
    ON CONFLICT (rfc) DO NOTHING;
    `,
    // 3. Link CFDIs to their issuer entry.
    `
    UPDATE cfdis c
    SET rfc_emisor_id = r.id
    FROM rfcs r
    WHERE c.rfc_emisor = r.rfc AND c.rfc_emisor_id IS NULL;
    `,
    // 4. Link CFDIs to their receiver entry.
    `
    UPDATE cfdis c
    SET rfc_receptor_id = r.id
    FROM rfcs r
    WHERE c.rfc_receptor = r.rfc AND c.rfc_receptor_id IS NULL;
    `,
  ];

  // Executed strictly one after another: the updates depend on the inserts.
  for (const statement of statements) {
    await target.query(statement);
  }
}
|
|
|
|
/**
 * Opens a pool to one tenant database.
 * Connection settings come from PGHOST / PGPORT / PGUSER / PGPASSWORD and fall
 * back to the values this script historically used when a variable is unset.
 */
function openTenantPool(database: string, max: number): Pool {
  return new Pool({
    host: process.env.PGHOST || 'localhost',
    port: Number(process.env.PGPORT) || 5432,
    user: process.env.PGUSER || 'postgres',
    // NOTE(review): the literal fallback keeps existing runs working, but a credential
    // committed to source control should be rotated and this fallback removed.
    password: process.env.PGPASSWORD || 'ZxHMrmnwanvLfLDdNJdRthFjWF2Lj1Rb',
    database,
    max,
  });
}

/**
 * Migrates one tenant database to the new schema.
 *
 * Depending on which databases exist, the function:
 * - both backup and current exist: truncates the current database's data tables
 *   and re-runs the copy ("retry mode");
 * - only the backup exists: skips the tenant;
 * - otherwise: renames the current database to the backup name, creates a new
 *   empty database under the original name and applies the SQL migrations.
 *
 * It then creates the default contribuyente, copies the CFDIs, assigns them to
 * that contribuyente, builds the RFC catalog and prints validation counts.
 * Both tenant pools are closed on every exit path, including failures.
 *
 * @param tenant Tenant record; `rfc`, `nombre` and `databaseName` are read from it.
 * @returns `{ status: 'skipped', tenant }` or
 *          `{ status: 'success', tenant, cfdis, expected }`.
 */
async function migrateTenant(tenant: any) {
  console.log(`\n🔄 Processing tenant: ${tenant.rfc} (${tenant.nombre})`);

  const oldDb = tenant.databaseName;
  const backupDb = oldDb + BACKUP_SUFFIX;

  const dbList = await adminPool.query(`
    SELECT datname FROM pg_database WHERE datname = ANY($1)
  `, [[oldDb, backupDb]]);

  const names = dbList.rows.map((r: any) => r.datname);
  const hasBackup = names.includes(backupDb);
  const hasNew = names.includes(oldDb);

  if (hasBackup && !hasNew) {
    // NOTE(review): this state can also mean a previous run stopped between the
    // rename and the create, leaving the tenant without a database — confirm
    // that skipping is the intended handling.
    console.log(` ⏭️ Already migrated (backup exists, new missing). Skipping.`);
    return { status: 'skipped', tenant: tenant.rfc };
  }

  let backupPool: Pool | undefined;
  let targetPool: Pool | undefined;

  try {
    if (hasBackup && hasNew) {
      // Retry mode: backup and new both exist. Just re-run ETL after truncating data tables.
      console.log(` ⏭️ Retry mode: both backup and new DB exist. Truncating and re-inserting...`);
      targetPool = openTenantPool(oldDb, 5);
      await targetPool.query(`TRUNCATE cfdis, rfcs, entidades_gestionadas, contribuyentes, activos_fijos_baja, cfdi_descartados, documentos_extras, papeleria_trabajo CASCADE`);
      backupPool = openTenantPool(backupDb, 3);
    } else {
      // Fresh migration
      console.log(` 1/6 Renaming ${oldDb} → ${backupDb}`);
      await renameDatabase(oldDb, backupDb);

      console.log(` 2/6 Creating new database ${oldDb}`);
      await createDatabase(oldDb);

      targetPool = openTenantPool(oldDb, 5);
      console.log(` 3/6 Applying SQL migrations...`);
      const applied = await migrate(targetPool, tenant.rfc);
      console.log(` Migrations applied: ${applied}`);

      backupPool = openTenantPool(backupDb, 3);
    }

    // 4. Migrate CFDIs
    console.log(` 4/6 Migrating CFDIs...`);
    const contribuyenteId = await createDefaultContribuyente(targetPool, tenant);
    console.log(` → Default contribuyente created: ${contribuyenteId}`);

    const cfdiResult = await runEtl(backupPool, targetPool, tenant);

    // Rows are inserted with a NULL contribuyente_id; assign the default one now.
    if (cfdiResult.total > 0) {
      await targetPool.query(`UPDATE cfdis SET contribuyente_id = $1 WHERE contribuyente_id IS NULL`, [contribuyenteId]);
    }

    // 5. Build RFC catalog
    console.log(` 5/6 Building RFC catalog...`);
    if (cfdiResult.total > 0) {
      await buildRfcCatalog(targetPool);
    }

    // 6. Validation
    console.log(` 6/6 Validating...`);
    const valRes = await targetPool.query(`SELECT COUNT(*)::int as cnt FROM cfdis`);
    const finalCount = valRes.rows[0].cnt;
    const nullContrib = await targetPool.query(`SELECT COUNT(*)::int as cnt FROM cfdis WHERE contribuyente_id IS NULL`);
    const dupUuids = await targetPool.query(`
      SELECT LOWER(uuid::text), COUNT(*) FROM cfdis WHERE uuid IS NOT NULL
      GROUP BY LOWER(uuid::text) HAVING COUNT(*) > 1
    `);

    // Show a warning marker instead of a check mark when a figure is off, so a
    // mismatch is visible in the log. The returned status is unchanged.
    const mark = (ok: boolean): string => (ok ? '✅' : '⚠️');
    const nullCount = nullContrib.rows[0].cnt;
    const dupCount = dupUuids.rows.length;
    console.log(` ${mark(finalCount === cfdiResult.total)} CFDIs: ${finalCount} (expected: ${cfdiResult.total})`);
    console.log(` ${mark(nullCount === 0)} Null contribuyente_id: ${nullCount}`);
    console.log(` ${mark(dupCount === 0)} Duplicate UUIDs: ${dupCount}`);

    return {
      status: 'success',
      tenant: tenant.rfc,
      cfdis: finalCount,
      expected: cfdiResult.total,
    };
  } finally {
    // Best-effort cleanup: close whichever pools were opened, even when a step
    // above threw, without masking the original error.
    await Promise.allSettled([backupPool?.end(), targetPool?.end()]);
  }
}
|
|
|
|
/**
 * Entry point: migrates every active tenant sequentially and prints a summary.
 *
 * A failure in one tenant is logged and recorded, and the loop continues with
 * the next tenant. When at least one tenant failed, the process exit code is
 * set to 1.
 */
async function main(): Promise<void> {
  console.log('=== Tenant DB Migration: Horux360 → HoruxDespachos ===\n');

  const tenants = await prisma.tenant.findMany({
    where: { active: true },
    orderBy: { createdAt: 'asc' },
  });

  console.log(`Found ${tenants.length} active tenants\n`);

  const results: any[] = [];

  for (const tenant of tenants) {
    try {
      const result = await migrateTenant(tenant);
      results.push(result);
    } catch (err: unknown) {
      // Anything can be thrown; only Error instances carry a message.
      const message = err instanceof Error ? err.message : String(err);
      console.error(` ❌ FAILED: ${message}`);
      results.push({ status: 'failed', tenant: tenant.rfc, error: message });
    }
  }

  console.log('\n=== Summary ===');
  for (const r of results) {
    const icon = r.status === 'success' ? '✅' : r.status === 'skipped' ? '⏭️' : '❌';
    console.log(`${icon} ${r.tenant}: ${r.status}${r.cfdis !== undefined ? ` (${r.cfdis} CFDIs)` : ''}`);
  }

  const successCount = results.filter(r => r.status === 'success').length;
  const failCount = results.filter(r => r.status === 'failed').length;
  console.log(`\nSuccess: ${successCount}, Failed: ${failCount}, Skipped: ${results.length - successCount - failCount}`);

  if (failCount > 0) {
    // Set the exit code instead of calling process.exit(): exiting here would
    // terminate the process before the caller can close its database connections.
    process.exitCode = 1;
  }
}
|
|
|
|
// Run the migration, then always release the admin pool and the Prisma client.
main()
  .catch((err) => {
    console.error('Fatal error:', err);
    // Set the exit code rather than calling process.exit(): exiting here would
    // terminate the process before the cleanup in `.finally` below can run.
    process.exitCode = 1;
  })
  .finally(async () => {
    await adminPool.end();
    await prisma.$disconnect();
  });
|