Tenant Schema Migrations Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Implement a numbered SQL migration system for tenant databases so schema changes auto-apply to existing tenants via eager (deploy) and lazy (on-connect) strategies.
Architecture: SQL files in apps/api/src/migrations/tenant/ numbered NNN_description.sql. A schema_migrations table in each tenant DB tracks applied versions. TenantMigrationRunner reads files, diffs against the table, applies pending ones. Integrated into getPool() (lazy) and a CLI script (eager).
Tech Stack: Node.js, pg Pool, filesystem (fs/path), Prisma (central DB query for eager), tsx (CLI runner)
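As a rough illustration of the "diff against the table" step described in the architecture, the sketch below parses file names and filters out versions that are already applied. The helper names (parseMigrationNames, pendingMigrations) and the sample file names are hypothetical; only the NNN_description.sql convention comes from this plan.

```typescript
// Hypothetical helpers; only the NNN_description.sql naming rule is taken from the plan.
interface ParsedMigration {
  version: number;
  name: string;
}

// Keep only names matching NNN_description.sql and order them by numeric version.
function parseMigrationNames(fileNames: string[]): ParsedMigration[] {
  const parsed: ParsedMigration[] = [];
  for (const name of fileNames) {
    const match = /^(\d{3})_(.+)\.sql$/.exec(name);
    if (match) {
      parsed.push({ version: Number(match[1]), name });
    }
  }
  return parsed.sort((a, b) => a.version - b.version);
}

// Diff the files on disk against the versions recorded in schema_migrations.
function pendingMigrations(fileNames: string[], applied: Set<number>): ParsedMigration[] {
  return parseMigrationNames(fileNames).filter((m) => !applied.has(m.version));
}

// Example: version 1 is already applied, so only 002 is pending.
const pendingExample = pendingMigrations(
  ['002_add_notes.sql', '001_initial_schema.sql', 'README.md'],
  new Set([1]),
);
console.log(pendingExample.map((m) => m.name)); // prints [ '002_add_notes.sql' ]
```

Keeping the diff a pure function of file names and applied versions makes it testable without a database connection.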
Task 1: Create migration SQL file from existing schema
Files:
- Create:
apps/api/src/migrations/tenant/001_initial_schema.sql
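This file contains the exact SQL currently in createTables() and createIndexes() from apps/api/src/config/database.ts:212-439. The schema_migrations tracking table is not part of this file; the runner creates it before applying migrations (see ensureMigrationsTable() in Task 2).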
- Step 1: Create the migrations directory and 001 file
Create apps/api/src/migrations/tenant/001_initial_schema.sql with this content:
-- 001_initial_schema.sql
-- Initial tenant database schema (migrated from createTables + createIndexes)
CREATE EXTENSION IF NOT EXISTS pg_trgm;
-- =============================================
-- Tables
-- =============================================
CREATE TABLE IF NOT EXISTS rfcs (
id SERIAL PRIMARY KEY,
rfc VARCHAR(14) UNIQUE NOT NULL,
razon_social VARCHAR(255),
regimen_fiscal VARCHAR(3),
codigo_postal VARCHAR(5)
);
CREATE TABLE IF NOT EXISTS bancos (
id SERIAL PRIMARY KEY,
banco VARCHAR(100) NOT NULL,
terminacion_cuenta VARCHAR(4) NOT NULL,
creado_en TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS cfdis (
id SERIAL PRIMARY KEY,
year VARCHAR(4),
month VARCHAR(2),
type VARCHAR(10),
uuid VARCHAR(36) UNIQUE,
serie VARCHAR(50),
folio VARCHAR(50),
status VARCHAR(20),
fecha_emision TIMESTAMP,
rfc_emisor_id INTEGER REFERENCES rfcs(id),
rfc_emisor VARCHAR(13),
nombre_emisor VARCHAR(255),
rfc_receptor_id INTEGER REFERENCES rfcs(id),
rfc_receptor VARCHAR(13),
nombre_receptor VARCHAR(255),
subtotal NUMERIC(18,4),
subtotal_mxn NUMERIC(18,4),
descuento NUMERIC(18,4),
descuento_mxn NUMERIC(18,4),
total NUMERIC(18,4),
total_mxn NUMERIC(18,4),
saldo_insoluto TEXT,
moneda VARCHAR(3),
tipo_cambio NUMERIC(18,6),
tipo_comprobante VARCHAR(1),
metodo_pago VARCHAR(3),
forma_pago VARCHAR(2),
uso_cfdi VARCHAR(5),
pac VARCHAR(13),
fecha_cert_sat TIMESTAMP,
fecha_cancelacion TIMESTAMP,
uuid_relacionado TEXT,
isr_retencion NUMERIC(18,4),
isr_retencion_mxn NUMERIC(18,4),
iva_traslado NUMERIC(18,4),
iva_traslado_mxn NUMERIC(18,4),
iva_retencion NUMERIC(18,4),
iva_retencion_mxn NUMERIC(18,4),
ieps_traslado NUMERIC(18,4),
ieps_traslado_mxn NUMERIC(18,4),
ieps_retencion NUMERIC(18,4),
ieps_retencion_mxn NUMERIC(18,4),
impuestos_locales_trasladado NUMERIC(18,4),
impuestos_locales_trasladado_mxn NUMERIC(18,4),
impuestos_locales_retenidos NUMERIC(18,4),
impuestos_locales_retenidos_mxn NUMERIC(18,4),
monto_pago NUMERIC(18,4),
monto_pago_mxn NUMERIC(18,4),
fecha_pago_p TIMESTAMP,
num_parcialidad TEXT,
isr_retencion_pago NUMERIC(18,4),
isr_retencion_pago_mxn NUMERIC(18,4),
iva_traslado_pago NUMERIC(18,4),
iva_traslado_pago_mxn NUMERIC(18,4),
iva_retencion_pago NUMERIC(18,4),
iva_retencion_pago_mxn NUMERIC(18,4),
ieps_traslado_pago NUMERIC(18,4),
ieps_traslado_pago_mxn NUMERIC(18,4),
ieps_retencion_pago NUMERIC(18,4),
ieps_retencion_pago_mxn NUMERIC(18,4),
saldo_pendiente NUMERIC(18,4),
saldo_pendiente_mxn NUMERIC(18,4),
fecha_liquidacion TIMESTAMP,
fecha_pago DATE,
fecha_inicial_pago DATE,
fecha_final_pago DATE,
num_dias_pagados NUMERIC(10,2),
num_seguro_social VARCHAR(50),
puesto VARCHAR(255),
salario_base_cot_apor NUMERIC(18,4),
salario_base_cot_apor_mxn NUMERIC(18,4),
salario_diario_integrado NUMERIC(18,4),
salario_diario_integrado_mxn NUMERIC(18,4),
total_percepciones NUMERIC(18,4),
total_percepciones_mxn NUMERIC(18,4),
total_deducciones NUMERIC(18,4),
total_deducciones_mxn NUMERIC(18,4),
imp_retenidos_nomina NUMERIC(18,4),
imp_retenidos_nomina_mxn NUMERIC(18,4),
otras_deducciones_nomina NUMERIC(18,4),
otras_deducciones_nomina_mxn NUMERIC(18,4),
subsidio_causado NUMERIC(18,4),
subsidio_causado_mxn NUMERIC(18,4),
conciliado VARCHAR(50),
id_conciliacion INTEGER,
xml_url TEXT,
pdf_url TEXT,
xml_original TEXT,
last_sat_sync TIMESTAMP,
sat_sync_job_id UUID,
source VARCHAR(20) DEFAULT 'manual',
facturapi_id VARCHAR(50),
regimen_fiscal_emisor VARCHAR(3),
regimen_fiscal_receptor VARCHAR(3),
creado_en TIMESTAMP DEFAULT NOW(),
actualizado_en TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS cfdi_conceptos (
id SERIAL PRIMARY KEY,
cfdi_id INTEGER REFERENCES cfdis(id) ON DELETE CASCADE,
clave_prod_serv VARCHAR(10),
no_identificacion VARCHAR(100),
descripcion TEXT,
cantidad NUMERIC(18,4),
clave_unidad VARCHAR(10),
unidad VARCHAR(100),
valor_unitario NUMERIC(18,4),
valor_unitario_mxn NUMERIC(18,4),
importe NUMERIC(18,4),
importe_mxn NUMERIC(18,4),
descuento NUMERIC(18,4),
descuento_mxn NUMERIC(18,4),
isr_retencion NUMERIC(18,4),
isr_retencion_mxn NUMERIC(18,4),
iva_traslado NUMERIC(18,4),
iva_traslado_mxn NUMERIC(18,4),
iva_retencion NUMERIC(18,4),
iva_retencion_mxn NUMERIC(18,4),
ieps_traslado NUMERIC(18,4),
ieps_traslado_mxn NUMERIC(18,4),
ieps_retencion NUMERIC(18,4),
ieps_retencion_mxn NUMERIC(18,4),
impuestos_locales_trasladado NUMERIC(18,4),
impuestos_locales_trasladado_mxn NUMERIC(18,4),
impuestos_locales_retenidos NUMERIC(18,4),
impuestos_locales_retenidos_mxn NUMERIC(18,4),
total_percepciones NUMERIC(18,4),
total_percepciones_mxn NUMERIC(18,4),
total_deducciones NUMERIC(18,4),
total_deducciones_mxn NUMERIC(18,4),
imp_retenidos_nomina NUMERIC(18,4),
imp_retenidos_nomina_mxn NUMERIC(18,4),
otras_deducciones_nomina NUMERIC(18,4),
otras_deducciones_nomina_mxn NUMERIC(18,4),
subsidio_causado NUMERIC(18,4),
subsidio_causado_mxn NUMERIC(18,4),
creado_en TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS conciliaciones (
id SERIAL PRIMARY KEY,
anio VARCHAR(4) NOT NULL,
mes VARCHAR(2) NOT NULL,
id_cfdi INTEGER NOT NULL UNIQUE REFERENCES cfdis(id),
fecha_de_pago DATE NOT NULL,
id_banco INTEGER NOT NULL REFERENCES bancos(id),
creado_en TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS alertas (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tipo VARCHAR(50) NOT NULL,
titulo VARCHAR(200) NOT NULL,
mensaje TEXT,
prioridad VARCHAR(20) DEFAULT 'media',
fecha_vencimiento TIMESTAMP,
leida BOOLEAN DEFAULT FALSE,
resuelta BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS recordatorios (
id SERIAL PRIMARY KEY,
titulo VARCHAR(200) NOT NULL,
descripcion TEXT,
fecha_limite DATE NOT NULL,
notas TEXT,
completado BOOLEAN DEFAULT FALSE,
privado BOOLEAN DEFAULT FALSE,
creado_por UUID NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- =============================================
-- Indexes
-- =============================================
CREATE INDEX IF NOT EXISTS idx_cfdis_fecha_emision ON cfdis(fecha_emision DESC);
CREATE INDEX IF NOT EXISTS idx_cfdis_type ON cfdis(type);
CREATE INDEX IF NOT EXISTS idx_cfdis_rfc_emisor ON cfdis(rfc_emisor);
CREATE INDEX IF NOT EXISTS idx_cfdis_rfc_receptor ON cfdis(rfc_receptor);
CREATE INDEX IF NOT EXISTS idx_cfdis_status ON cfdis(status);
CREATE INDEX IF NOT EXISTS idx_cfdis_year_month ON cfdis(year, month);
CREATE INDEX IF NOT EXISTS idx_cfdis_nombre_emisor_trgm ON cfdis USING gin(nombre_emisor gin_trgm_ops);
CREATE INDEX IF NOT EXISTS idx_cfdis_nombre_receptor_trgm ON cfdis USING gin(nombre_receptor gin_trgm_ops);
CREATE INDEX IF NOT EXISTS idx_cfdis_rfc_emisor_id ON cfdis(rfc_emisor_id);
CREATE INDEX IF NOT EXISTS idx_cfdis_rfc_receptor_id ON cfdis(rfc_receptor_id);
CREATE INDEX IF NOT EXISTS idx_cfdi_conceptos_cfdi_id ON cfdi_conceptos(cfdi_id);
CREATE INDEX IF NOT EXISTS idx_cfdi_conceptos_clave ON cfdi_conceptos(clave_prod_serv);
CREATE INDEX IF NOT EXISTS idx_conciliaciones_anio_mes ON conciliaciones(anio, mes);
CREATE INDEX IF NOT EXISTS idx_conciliaciones_id_cfdi ON conciliaciones(id_cfdi);
CREATE INDEX IF NOT EXISTS idx_cfdis_id_conciliacion ON cfdis(id_conciliacion);
-- Deferred FK for id_conciliacion
DO $$ BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'cfdis_id_conciliacion_fkey') THEN
ALTER TABLE cfdis ADD CONSTRAINT cfdis_id_conciliacion_fkey FOREIGN KEY (id_conciliacion) REFERENCES conciliaciones(id);
END IF;
END $$;
- Step 2: Commit
git add apps/api/src/migrations/tenant/001_initial_schema.sql
git commit -m "feat: add 001_initial_schema.sql tenant migration file"
Task 2: Create TenantMigrationRunner
Files:
- Create:
apps/api/src/config/tenant-migrations.ts
- Step 1: Create tenant-migrations.ts
Create apps/api/src/config/tenant-migrations.ts:
import { Pool } from 'pg';
import { readdir, readFile } from 'fs/promises';
import { dirname, join } from 'path';
import { fileURLToPath } from 'url';
import { prisma } from './database.js';
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
const MIGRATIONS_DIR = join(__dirname, '..', 'migrations', 'tenant');
interface MigrationFile {
version: number;
name: string;
sql: string;
}
/**
* Ensure the schema_migrations table exists in the tenant DB.
*/
async function ensureMigrationsTable(pool: Pool): Promise<void> {
await pool.query(`
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
name VARCHAR(255) NOT NULL,
applied_at TIMESTAMP DEFAULT NOW()
);
`);
}
/**
* Read all .sql files from the migrations directory, sorted by version.
*/
export async function getMigrationFiles(): Promise<MigrationFile[]> {
let files: string[];
try {
files = await readdir(MIGRATIONS_DIR);
} catch {
console.warn('[Migrations] Migrations directory not found:', MIGRATIONS_DIR);
return [];
}
const sqlFiles = files
.filter(f => f.endsWith('.sql'))
.sort();
const migrations: MigrationFile[] = [];
for (const file of sqlFiles) {
const match = file.match(/^(\d{3})_(.+)\.sql$/);
if (!match) {
console.warn(`[Migrations] Skipping invalid filename: ${file}`);
continue;
}
const version = parseInt(match[1], 10);
const sql = await readFile(join(MIGRATIONS_DIR, file), 'utf-8');
migrations.push({ version, name: file, sql });
}
return migrations;
}
/**
* Get versions already applied in this tenant DB.
*/
async function getAppliedVersions(pool: Pool): Promise<Set<number>> {
const result = await pool.query('SELECT version FROM schema_migrations ORDER BY version');
return new Set(result.rows.map((r: { version: number }) => r.version));
}
/**
* Apply pending migrations to a single tenant database.
* Returns the number of migrations applied.
*/
export async function migrate(pool: Pool, label?: string): Promise<number> {
await ensureMigrationsTable(pool);
const allMigrations = await getMigrationFiles();
if (allMigrations.length === 0) return 0;
const applied = await getAppliedVersions(pool);
const pending = allMigrations.filter(m => !applied.has(m.version));
if (pending.length === 0) return 0;
const tag = label ? ` (${label})` : '';
console.log(`[Migrations]${tag} Applying ${pending.length} pending migration(s)...`);
let count = 0;
for (const migration of pending) {
const client = await pool.connect();
try {
await client.query('BEGIN');
await client.query(migration.sql);
await client.query(
'INSERT INTO schema_migrations (version, name) VALUES ($1, $2)',
[migration.version, migration.name]
);
await client.query('COMMIT');
console.log(`[Migrations]${tag} Applied: ${migration.name}`);
count++;
} catch (error) {
await client.query('ROLLBACK');
console.error(`[Migrations]${tag} FAILED: ${migration.name}`, error);
throw error;
} finally {
client.release();
}
}
return count;
}
/**
* Eager migration: apply pending migrations to ALL active tenant databases.
* Does not stop on individual tenant failure — logs and continues.
*/
export async function migrateAll(): Promise<{ success: number; failed: number; skipped: number }> {
const tenants = await prisma.tenant.findMany({
where: { active: true },
select: { id: true, rfc: true, databaseName: true },
});
console.log(`[Migrations] Starting eager migration for ${tenants.length} tenant(s)...`);
let success = 0;
let failed = 0;
let skipped = 0;
for (const tenant of tenants) {
const pool = new Pool({
connectionString: process.env.DATABASE_URL?.replace(/\/[^/]+$/, `/${tenant.databaseName}`),
max: 1,
});
try {
const applied = await migrate(pool, tenant.rfc);
if (applied > 0) {
success++;
} else {
skipped++;
}
} catch (error) {
console.error(`[Migrations] Failed for tenant ${tenant.rfc} (${tenant.databaseName}):`, error);
failed++;
} finally {
await pool.end();
}
}
console.log(`[Migrations] Eager migration complete: ${success} migrated, ${skipped} up-to-date, ${failed} failed`);
return { success, failed, skipped };
}
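migrateAll() derives each tenant connection string by swapping the last path segment of DATABASE_URL with a regex. If DATABASE_URL is unset, the optional chain yields undefined and the pool would fall back to the driver's defaults. A more defensive variant, sketched here with the standard URL class, fails loudly instead. The helper name tenantConnectionString is hypothetical, and the example URL is made up.

```typescript
// Hypothetical helper: build a tenant connection string from the central DATABASE_URL.
function tenantConnectionString(centralUrl: string | undefined, databaseName: string): string {
  if (!centralUrl) {
    throw new Error('DATABASE_URL is not set; refusing to fall back to driver defaults');
  }
  const url = new URL(centralUrl);
  // Replace only the database segment; host, port and credentials stay untouched.
  url.pathname = `/${encodeURIComponent(databaseName)}`;
  // Drop the query string (e.g. Prisma's ?schema=public), as the regex in migrateAll() also does.
  url.search = '';
  return url.toString();
}

// Example with a made-up central URL.
console.log(
  tenantConnectionString('postgresql://user:secret@localhost:5432/central?schema=public', 'tenant_abc'),
);
```

Whether to keep or drop query parameters such as sslmode is a deployment decision; this sketch drops them only to match the behavior of the original regex.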
- Step 2: Commit
git add apps/api/src/config/tenant-migrations.ts
git commit -m "feat: add TenantMigrationRunner with migrate() and migrateAll()"
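One gap worth noting in migrate(): the eager CLI run and a lazy on-connect run (or two API processes) can reach the same tenant database at the same time, and both would try to apply the same pending file. A possible guard, not part of the plan as written, is a PostgreSQL session-level advisory lock held around the apply loop. The sketch below assumes a hypothetical lock key and uses a minimal Queryable interface so it runs without a database; a pg client checked out from the pool is expected to fit that shape.

```typescript
// Minimal structural type so the sketch runs without a database (assumption, not pg's own type).
interface Queryable {
  query(text: string, values?: unknown[]): Promise<unknown>;
}

// Hypothetical lock key; any constant shared by every runner would work.
const MIGRATION_LOCK_KEY = 727001;

// Run `work` while holding a session-level advisory lock on the given client.
// Lock and unlock must go through the same connection, because the lock belongs to the session.
async function withMigrationLock<T>(client: Queryable, work: () => Promise<T>): Promise<T> {
  // pg_advisory_lock blocks until no other session holds the key.
  await client.query('SELECT pg_advisory_lock($1::bigint)', [MIGRATION_LOCK_KEY]);
  try {
    return await work();
  } finally {
    await client.query('SELECT pg_advisory_unlock($1::bigint)', [MIGRATION_LOCK_KEY]);
  }
}

// Demo with a fake client that only records the statements it receives.
const issuedStatements: string[] = [];
const fakeClient: Queryable = {
  async query(text: string): Promise<unknown> {
    issuedStatements.push(text);
    return undefined;
  },
};

withMigrationLock(fakeClient, async () => {
  issuedStatements.push('-- re-read schema_migrations and apply pending files here');
  return 1;
}).then((applied) => console.log(applied, issuedStatements));
```

If this pattern is adopted, the applied versions should be read after the lock is acquired, so that a runner that waited sees what the previous holder already applied.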
Task 3: Integrate lazy migration into TenantConnectionManager
Files:
- Modify:
apps/api/src/config/database.ts
Changes:
- Add migratedPools: Set<string> to the class
- Import migrate from tenant-migrations.ts
- Make getPool() async: run migrate(pool) on first access per tenant
- Replace createTables() + createIndexes() in provisionDatabase() with migrate(pool)
- Remove createTables() and createIndexes() methods
- Clear the migratedPools entry in invalidatePool()
- Step 1: Update database.ts imports
At the top of apps/api/src/config/database.ts, add the import:
import { migrate } from './tenant-migrations.js';
- Step 2: Add migratedPools Set to the class
In the TenantConnectionManager class, after private dbConfig:
private migratedPools: Set<string> = new Set();
- Step 3: Make getPool() async with lazy migration
Replace the current getPool() method (lines 53-79) with:
/**
* Get or create a connection pool for a tenant's database.
* Runs pending migrations on first access per tenant, once per process lifetime.
*/
async getPool(tenantId: string, databaseName: string): Promise<Pool> {
const entry = this.pools.get(tenantId);
let pool: Pool;
if (entry) {
entry.lastAccess = new Date();
pool = entry.pool;
} else {
const poolConfig: PoolConfig = {
host: this.dbConfig.host,
port: this.dbConfig.port,
user: this.dbConfig.user,
password: this.dbConfig.password,
database: databaseName,
max: 3,
idleTimeoutMillis: 300_000,
connectionTimeoutMillis: 10_000,
};
pool = new Pool(poolConfig);
pool.on('error', (err) => {
console.error(`[TenantDB] Pool error for tenant ${tenantId} (${databaseName}):`, err.message);
});
this.pools.set(tenantId, { pool, lastAccess: new Date() });
}
// Lazy migration: run once per tenant per process lifetime
if (!this.migratedPools.has(tenantId)) {
try {
await migrate(pool, databaseName);
this.migratedPools.add(tenantId);
} catch (error) {
console.error(`[TenantDB] Migration failed for ${tenantId} (${databaseName}):`, error);
// Don't block access: the tenant keeps working on its current schema.
// Marking it as migrated means no retry until invalidatePool() or a process restart.
this.migratedPools.add(tenantId);
}
}
return pool;
}
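In the getPool() above, a tenant is added to migratedPools only after migrate() resolves. Two requests for the same tenant that arrive while the first migration is still running would therefore both call migrate(). One way to avoid that, shown here as a sketch rather than as part of the plan, is to cache the in-flight promise per tenant. The class name OncePerKey and its methods are hypothetical.

```typescript
// Hypothetical helper: run an async task at most once per key and share the in-flight promise.
class OncePerKey {
  private inFlight = new Map<string, Promise<void>>();

  run(key: string, task: () => Promise<void>): Promise<void> {
    let promise = this.inFlight.get(key);
    if (!promise) {
      // Errors are logged and swallowed, mirroring the plan's choice not to block tenant access.
      promise = task().catch((error: unknown) => {
        console.error(`[TenantDB] Migration failed for ${key}:`, error);
      });
      this.inFlight.set(key, promise);
    }
    return promise;
  }

  // Counterpart of clearing migratedPools in invalidatePool().
  forget(key: string): void {
    this.inFlight.delete(key);
  }
}

// Demo: three concurrent calls, two of them for the same tenant.
async function demoOncePerKey(): Promise<number> {
  const once = new OncePerKey();
  let runs = 0;
  const fakeMigrate = async (): Promise<void> => {
    runs++;
    await new Promise<void>((resolve) => setTimeout(resolve, 10));
  };
  await Promise.all([
    once.run('tenant-1', fakeMigrate),
    once.run('tenant-1', fakeMigrate),
    once.run('tenant-2', fakeMigrate),
  ]);
  return runs;
}

demoOncePerKey().then((runs) => console.log(`fake migrate() ran ${runs} time(s)`)); // 2 time(s)
```

Inside getPool(), the equivalent call would be awaiting something like once.run(tenantId, () => migrate(pool, databaseName)) in place of the migratedPools check.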
- Step 4: Update provisionDatabase() to use migrate()
Replace the try block inside provisionDatabase() that calls createTables and createIndexes (the inner try/finally around lines 111-116) with:
try {
await migrate(tenantPool, databaseName);
} finally {
await tenantPool.end();
}
- Step 5: Update invalidatePool() to clear migration cache
Add this.migratedPools.delete(tenantId); to invalidatePool():
invalidatePool(tenantId: string): void {
const entry = this.pools.get(tenantId);
if (entry) {
entry.pool.end().catch(() => {});
this.pools.delete(tenantId);
}
this.migratedPools.delete(tenantId);
}
- Step 6: Remove createTables() and createIndexes() methods
Delete the private async createTables(pool: Pool) method (lines 212-406) and the private async createIndexes(pool: Pool) method (lines 408-439) entirely. Their content is now in 001_initial_schema.sql.
- Step 7: Update all callers of getPool() to use await
Since getPool() is now async, every call site must await it, and the enclosing function must itself be async. The call sites are in apps/api/src/middlewares/tenant.middleware.ts; change lines 75 and 85:
// Line 75 — impersonation path
req.tenantPool = await tenantDb.getPool(tenantId, viewedTenant.databaseName);
// Line 85 — normal path
req.tenantPool = await tenantDb.getPool(tenantId, databaseName);
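For context, the surrounding middleware has to become async as well. The sketch below shows one possible shape, assuming an Express-style (req, res, next) signature; the types TenantRequest, Next and PoolProvider are hypothetical stand-ins, not the project's real types. getPool() as planned catches migration errors itself, so the try/catch here is purely defensive: Express 4 does not forward rejected promises from middleware to its error handler on its own.

```typescript
// Hypothetical minimal shapes standing in for the Express and TenantConnectionManager types.
interface TenantRequest {
  tenantId: string;
  databaseName: string;
  tenantPool?: unknown;
}
type Next = (error?: unknown) => void;
interface PoolProvider {
  getPool(tenantId: string, databaseName: string): Promise<unknown>;
}

function makeTenantMiddleware(tenantDb: PoolProvider) {
  return async (req: TenantRequest, _res: unknown, next: Next): Promise<void> => {
    let pool: unknown;
    try {
      pool = await tenantDb.getPool(req.tenantId, req.databaseName);
    } catch (error) {
      // Forward the failure explicitly instead of leaving an unhandled rejection.
      next(error);
      return;
    }
    req.tenantPool = pool;
    next();
  };
}

// Demo with a fake provider that resolves immediately.
const demoRequest: TenantRequest = { tenantId: 't1', databaseName: 'tenant_db_1' };
const demoMiddleware = makeTenantMiddleware({ getPool: async () => 'fake-pool' });
demoMiddleware(demoRequest, {}, () => console.log('pool attached:', demoRequest.tenantPool));
```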
- Step 8: Commit
git add apps/api/src/config/database.ts apps/api/src/middlewares/tenant.middleware.ts
git commit -m "feat: integrate lazy tenant migrations into getPool()"
Task 4: Create eager migration CLI script
Files:
- Create:
apps/api/scripts/migrate-tenants.ts
- Modify:
apps/api/package.json
- Modify:
turbo.json
- Step 1: Create the CLI script
Create apps/api/scripts/migrate-tenants.ts:
/**
* Eager tenant migration script.
* Run: pnpm --filter @horux/api db:migrate-tenants
* Or: pnpm db:migrate-tenants (from monorepo root via Turborepo)
*
* Applies pending SQL migrations to all active tenant databases.
*/
import { migrateAll } from '../src/config/tenant-migrations.js';
async function main() {
console.log('=== Tenant Schema Migration (Eager) ===\n');
const start = Date.now();
const result = await migrateAll();
const elapsed = ((Date.now() - start) / 1000).toFixed(1);
console.log(`\n=== Done in ${elapsed}s ===`);
console.log(` Migrated: ${result.success}`);
console.log(` Up-to-date: ${result.skipped}`);
console.log(` Failed: ${result.failed}`);
if (result.failed > 0) {
console.error('\n⚠ Some tenants failed migration. Check logs above.');
process.exit(1);
}
process.exit(0);
}
main().catch((err) => {
console.error('Fatal error:', err);
process.exit(1);
});
- Step 2: Add script to apps/api/package.json
Add to the "scripts" section of apps/api/package.json:
"db:migrate-tenants": "tsx scripts/migrate-tenants.ts"
- Step 3: Add task to turbo.json
Add to the "tasks" section of turbo.json:
"db:migrate-tenants": {
"cache": false
}
- Step 4: Commit
git add apps/api/scripts/migrate-tenants.ts apps/api/package.json turbo.json
git commit -m "feat: add eager tenant migration CLI script (pnpm db:migrate-tenants)"
Task 5: Update CLAUDE.md and README.md
Files:
- Modify:
CLAUDE.md
- Modify:
README.md
- Step 1: Update CLAUDE.md
In the "Problemas conocidos / pendientes" section, replace item 1:
1. ~~**Schema drift multi-tenant:**~~ Resuelto. Migraciones SQL numeradas en `apps/api/src/migrations/tenant/`. Se aplican eager (`pnpm db:migrate-tenants`) en deploy y lazy (auto en `getPool()`) como safety net. Para agregar un cambio de schema: crear `NNN_description.sql` en el directorio de migraciones.
- Step 2: Update README.md deploy section
In README.md, update the deploy instructions to include the new migration step. The deploy flow should reference:
git pull
pnpm install
pnpm build
pnpm db:migrate-tenants # Apply schema changes to all tenant DBs
pm2 restart all
- Step 3: Commit
git add CLAUDE.md README.md
git commit -m "docs: update CLAUDE.md and README.md with tenant migration system"