# Tenant Schema Migrations Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Implement a numbered SQL migration system for tenant databases so schema changes auto-apply to existing tenants via eager (deploy) and lazy (on-connect) strategies.

**Architecture:** SQL files in `apps/api/src/migrations/tenant/` numbered `NNN_description.sql`. A `schema_migrations` table in each tenant DB tracks applied versions. `TenantMigrationRunner` reads the files, diffs them against the table, and applies the pending ones. It is integrated into `getPool()` (lazy) and a CLI script (eager).

**Tech Stack:** Node.js, pg Pool, filesystem (fs/path), Prisma (central DB query for eager), tsx (CLI runner)

---

### Task 1: Create migration SQL file from existing schema

**Files:**
- Create: `apps/api/src/migrations/tenant/001_initial_schema.sql`

This file contains the exact SQL currently in `createTables()` and `createIndexes()` from `apps/api/src/config/database.ts:212-439`. The `schema_migrations` table is not part of this file; the runner creates it before applying any migration (see `ensureMigrationsTable()` in Task 2).
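As a rough illustration of the architecture summarized at the top of this plan, the "diff files against the table" step reduces to a set difference over version numbers. This is only a sketch, not the runner that Task 2 defines: the helper names `parseVersion` and `pendingMigrations` and the filename `002_add_notes.sql` are hypothetical and exist only for this example.

```typescript
// Hypothetical helpers illustrating the diff between migration files on disk
// and the versions already recorded in schema_migrations.

// Extract the numeric version from an NNN_description.sql filename,
// or return null when the name does not follow the convention.
function parseVersion(filename: string): number | null {
  const match = /^(\d{3})_(.+)\.sql$/.exec(filename);
  return match ? parseInt(match[1], 10) : null;
}

// Return the filenames whose version is not yet applied, in version order.
// Zero-padded three-digit prefixes make a plain string sort sufficient.
function pendingMigrations(filenames: string[], applied: Set<number>): string[] {
  return filenames
    .filter((f) => {
      const version = parseVersion(f);
      return version !== null && !applied.has(version);
    })
    .sort();
}

const files = ['002_add_notes.sql', '001_initial_schema.sql', 'README.md'];
const applied = new Set<number>([1]);

// Logs the single pending file, 002_add_notes.sql; README.md is ignored.
console.log(pendingMigrations(files, applied));
```

Keeping the comparison on integer versions rather than full filenames means a migration still counts as applied if its description part is later renamed.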
- [ ] **Step 1: Create the migrations directory and 001 file**

Create `apps/api/src/migrations/tenant/001_initial_schema.sql` with this content:

```sql
-- 001_initial_schema.sql
-- Initial tenant database schema (migrated from createTables + createIndexes)

CREATE EXTENSION IF NOT EXISTS pg_trgm;

-- =============================================
-- Tables
-- =============================================

CREATE TABLE IF NOT EXISTS rfcs (
  id SERIAL PRIMARY KEY,
  rfc VARCHAR(14) UNIQUE NOT NULL,
  razon_social VARCHAR(255),
  regimen_fiscal VARCHAR(3),
  codigo_postal VARCHAR(5)
);

CREATE TABLE IF NOT EXISTS bancos (
  id SERIAL PRIMARY KEY,
  banco VARCHAR(100) NOT NULL,
  terminacion_cuenta VARCHAR(4) NOT NULL,
  creado_en TIMESTAMP DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS cfdis (
  id SERIAL PRIMARY KEY,
  year VARCHAR(4),
  month VARCHAR(2),
  type VARCHAR(10),
  uuid VARCHAR(36) UNIQUE,
  serie VARCHAR(50),
  folio VARCHAR(50),
  status VARCHAR(20),
  fecha_emision TIMESTAMP,
  rfc_emisor_id INTEGER REFERENCES rfcs(id),
  rfc_emisor VARCHAR(13),
  nombre_emisor VARCHAR(255),
  rfc_receptor_id INTEGER REFERENCES rfcs(id),
  rfc_receptor VARCHAR(13),
  nombre_receptor VARCHAR(255),
  subtotal NUMERIC(18,4),
  subtotal_mxn NUMERIC(18,4),
  descuento NUMERIC(18,4),
  descuento_mxn NUMERIC(18,4),
  total NUMERIC(18,4),
  total_mxn NUMERIC(18,4),
  saldo_insoluto TEXT,
  moneda VARCHAR(3),
  tipo_cambio NUMERIC(18,6),
  tipo_comprobante VARCHAR(1),
  metodo_pago VARCHAR(3),
  forma_pago VARCHAR(2),
  uso_cfdi VARCHAR(5),
  pac VARCHAR(13),
  fecha_cert_sat TIMESTAMP,
  fecha_cancelacion TIMESTAMP,
  uuid_relacionado TEXT,
  isr_retencion NUMERIC(18,4),
  isr_retencion_mxn NUMERIC(18,4),
  iva_traslado NUMERIC(18,4),
  iva_traslado_mxn NUMERIC(18,4),
  iva_retencion NUMERIC(18,4),
  iva_retencion_mxn NUMERIC(18,4),
  ieps_traslado NUMERIC(18,4),
  ieps_traslado_mxn NUMERIC(18,4),
  ieps_retencion NUMERIC(18,4),
  ieps_retencion_mxn NUMERIC(18,4),
  impuestos_locales_trasladado NUMERIC(18,4),
  impuestos_locales_trasladado_mxn NUMERIC(18,4),
  impuestos_locales_retenidos NUMERIC(18,4),
  impuestos_locales_retenidos_mxn NUMERIC(18,4),
  monto_pago NUMERIC(18,4),
  monto_pago_mxn NUMERIC(18,4),
  fecha_pago_p TIMESTAMP,
  num_parcialidad TEXT,
  isr_retencion_pago NUMERIC(18,4),
  isr_retencion_pago_mxn NUMERIC(18,4),
  iva_traslado_pago NUMERIC(18,4),
  iva_traslado_pago_mxn NUMERIC(18,4),
  iva_retencion_pago NUMERIC(18,4),
  iva_retencion_pago_mxn NUMERIC(18,4),
  ieps_traslado_pago NUMERIC(18,4),
  ieps_traslado_pago_mxn NUMERIC(18,4),
  ieps_retencion_pago NUMERIC(18,4),
  ieps_retencion_pago_mxn NUMERIC(18,4),
  saldo_pendiente NUMERIC(18,4),
  saldo_pendiente_mxn NUMERIC(18,4),
  fecha_liquidacion TIMESTAMP,
  fecha_pago DATE,
  fecha_inicial_pago DATE,
  fecha_final_pago DATE,
  num_dias_pagados NUMERIC(10,2),
  num_seguro_social VARCHAR(50),
  puesto VARCHAR(255),
  salario_base_cot_apor NUMERIC(18,4),
  salario_base_cot_apor_mxn NUMERIC(18,4),
  salario_diario_integrado NUMERIC(18,4),
  salario_diario_integrado_mxn NUMERIC(18,4),
  total_percepciones NUMERIC(18,4),
  total_percepciones_mxn NUMERIC(18,4),
  total_deducciones NUMERIC(18,4),
  total_deducciones_mxn NUMERIC(18,4),
  imp_retenidos_nomina NUMERIC(18,4),
  imp_retenidos_nomina_mxn NUMERIC(18,4),
  otras_deducciones_nomina NUMERIC(18,4),
  otras_deducciones_nomina_mxn NUMERIC(18,4),
  subsidio_causado NUMERIC(18,4),
  subsidio_causado_mxn NUMERIC(18,4),
  conciliado VARCHAR(50),
  id_conciliacion INTEGER,
  xml_url TEXT,
  pdf_url TEXT,
  xml_original TEXT,
  last_sat_sync TIMESTAMP,
  sat_sync_job_id UUID,
  source VARCHAR(20) DEFAULT 'manual',
  facturapi_id VARCHAR(50),
  regimen_fiscal_emisor VARCHAR(3),
  regimen_fiscal_receptor VARCHAR(3),
  creado_en TIMESTAMP DEFAULT NOW(),
  actualizado_en TIMESTAMP DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS cfdi_conceptos (
  id SERIAL PRIMARY KEY,
  cfdi_id INTEGER REFERENCES cfdis(id) ON DELETE CASCADE,
  clave_prod_serv VARCHAR(10),
  no_identificacion VARCHAR(100),
  descripcion TEXT,
  cantidad NUMERIC(18,4),
  clave_unidad VARCHAR(10),
  unidad VARCHAR(100),
  valor_unitario NUMERIC(18,4),
  valor_unitario_mxn NUMERIC(18,4),
  importe NUMERIC(18,4),
  importe_mxn NUMERIC(18,4),
  descuento NUMERIC(18,4),
  descuento_mxn NUMERIC(18,4),
  isr_retencion NUMERIC(18,4),
  isr_retencion_mxn NUMERIC(18,4),
  iva_traslado NUMERIC(18,4),
  iva_traslado_mxn NUMERIC(18,4),
  iva_retencion NUMERIC(18,4),
  iva_retencion_mxn NUMERIC(18,4),
  ieps_traslado NUMERIC(18,4),
  ieps_traslado_mxn NUMERIC(18,4),
  ieps_retencion NUMERIC(18,4),
  ieps_retencion_mxn NUMERIC(18,4),
  impuestos_locales_trasladado NUMERIC(18,4),
  impuestos_locales_trasladado_mxn NUMERIC(18,4),
  impuestos_locales_retenidos NUMERIC(18,4),
  impuestos_locales_retenidos_mxn NUMERIC(18,4),
  total_percepciones NUMERIC(18,4),
  total_percepciones_mxn NUMERIC(18,4),
  total_deducciones NUMERIC(18,4),
  total_deducciones_mxn NUMERIC(18,4),
  imp_retenidos_nomina NUMERIC(18,4),
  imp_retenidos_nomina_mxn NUMERIC(18,4),
  otras_deducciones_nomina NUMERIC(18,4),
  otras_deducciones_nomina_mxn NUMERIC(18,4),
  subsidio_causado NUMERIC(18,4),
  subsidio_causado_mxn NUMERIC(18,4),
  creado_en TIMESTAMP DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS conciliaciones (
  id SERIAL PRIMARY KEY,
  anio VARCHAR(4) NOT NULL,
  mes VARCHAR(2) NOT NULL,
  id_cfdi INTEGER NOT NULL UNIQUE REFERENCES cfdis(id),
  fecha_de_pago DATE NOT NULL,
  id_banco INTEGER NOT NULL REFERENCES bancos(id),
  creado_en TIMESTAMP DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS alertas (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  tipo VARCHAR(50) NOT NULL,
  titulo VARCHAR(200) NOT NULL,
  mensaje TEXT,
  prioridad VARCHAR(20) DEFAULT 'media',
  fecha_vencimiento TIMESTAMP,
  leida BOOLEAN DEFAULT FALSE,
  resuelta BOOLEAN DEFAULT FALSE,
  created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS recordatorios (
  id SERIAL PRIMARY KEY,
  titulo VARCHAR(200) NOT NULL,
  descripcion TEXT,
  fecha_limite DATE NOT NULL,
  notas TEXT,
  completado BOOLEAN DEFAULT FALSE,
  privado BOOLEAN DEFAULT FALSE,
  creado_por UUID NOT NULL,
  created_at TIMESTAMP DEFAULT NOW(),
  updated_at TIMESTAMP DEFAULT NOW()
);

-- =============================================
-- Indexes
-- =============================================

CREATE INDEX IF NOT EXISTS idx_cfdis_fecha_emision ON cfdis(fecha_emision DESC);
CREATE INDEX IF NOT EXISTS idx_cfdis_type ON cfdis(type);
CREATE INDEX IF NOT EXISTS idx_cfdis_rfc_emisor ON cfdis(rfc_emisor);
CREATE INDEX IF NOT EXISTS idx_cfdis_rfc_receptor ON cfdis(rfc_receptor);
CREATE INDEX IF NOT EXISTS idx_cfdis_status ON cfdis(status);
CREATE INDEX IF NOT EXISTS idx_cfdis_year_month ON cfdis(year, month);
CREATE INDEX IF NOT EXISTS idx_cfdis_nombre_emisor_trgm ON cfdis USING gin(nombre_emisor gin_trgm_ops);
CREATE INDEX IF NOT EXISTS idx_cfdis_nombre_receptor_trgm ON cfdis USING gin(nombre_receptor gin_trgm_ops);
CREATE INDEX IF NOT EXISTS idx_cfdis_rfc_emisor_id ON cfdis(rfc_emisor_id);
CREATE INDEX IF NOT EXISTS idx_cfdis_rfc_receptor_id ON cfdis(rfc_receptor_id);
CREATE INDEX IF NOT EXISTS idx_cfdi_conceptos_cfdi_id ON cfdi_conceptos(cfdi_id);
CREATE INDEX IF NOT EXISTS idx_cfdi_conceptos_clave ON cfdi_conceptos(clave_prod_serv);
CREATE INDEX IF NOT EXISTS idx_conciliaciones_anio_mes ON conciliaciones(anio, mes);
CREATE INDEX IF NOT EXISTS idx_conciliaciones_id_cfdi ON conciliaciones(id_cfdi);
CREATE INDEX IF NOT EXISTS idx_cfdis_id_conciliacion ON cfdis(id_conciliacion);

-- Deferred FK for id_conciliacion
DO $$
BEGIN
  IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'cfdis_id_conciliacion_fkey') THEN
    ALTER TABLE cfdis
      ADD CONSTRAINT cfdis_id_conciliacion_fkey
      FOREIGN KEY (id_conciliacion) REFERENCES conciliaciones(id);
  END IF;
END $$;
```

- [ ] **Step 2: Commit**

```bash
git add apps/api/src/migrations/tenant/001_initial_schema.sql
git commit -m "feat: add 001_initial_schema.sql tenant migration file"
```

---

### Task 2: Create TenantMigrationRunner

**Files:**
- Create: `apps/api/src/config/tenant-migrations.ts`

- [ ] **Step 1: Create tenant-migrations.ts**

Create `apps/api/src/config/tenant-migrations.ts`:

```typescript
import { Pool } from 'pg';
import { readdir, readFile } from 'fs/promises';
import { dirname, join } from 'path';
import { fileURLToPath } from 'url';
import { prisma } from './database.js';

const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
const MIGRATIONS_DIR = join(__dirname, '..', 'migrations', 'tenant');

interface MigrationFile {
  version: number;
  name: string;
  sql: string;
}

/**
 * Ensure the schema_migrations table exists in the tenant DB.
 */
async function ensureMigrationsTable(pool: Pool): Promise<void> {
  await pool.query(`
    CREATE TABLE IF NOT EXISTS schema_migrations (
      version INTEGER PRIMARY KEY,
      name VARCHAR(255) NOT NULL,
      applied_at TIMESTAMP DEFAULT NOW()
    );
  `);
}

/**
 * Read all .sql files from the migrations directory, sorted by version.
 */
export async function getMigrationFiles(): Promise<MigrationFile[]> {
  let files: string[];
  try {
    files = await readdir(MIGRATIONS_DIR);
  } catch {
    console.warn('[Migrations] Migrations directory not found:', MIGRATIONS_DIR);
    return [];
  }

  // Zero-padded NNN prefixes make a plain string sort equal to version order.
  const sqlFiles = files
    .filter(f => f.endsWith('.sql'))
    .sort();

  const migrations: MigrationFile[] = [];
  for (const file of sqlFiles) {
    const match = file.match(/^(\d{3})_(.+)\.sql$/);
    if (!match) {
      console.warn(`[Migrations] Skipping invalid filename: ${file}`);
      continue;
    }
    const version = parseInt(match[1], 10);
    const sql = await readFile(join(MIGRATIONS_DIR, file), 'utf-8');
    migrations.push({ version, name: file, sql });
  }
  return migrations;
}

/**
 * Get versions already applied in this tenant DB.
 */
async function getAppliedVersions(pool: Pool): Promise<Set<number>> {
  const result = await pool.query('SELECT version FROM schema_migrations ORDER BY version');
  return new Set(result.rows.map((r: { version: number }) => r.version));
}

/**
 * Apply pending migrations to a single tenant database.
 * Returns the number of migrations applied.
 */
export async function migrate(pool: Pool, label?: string): Promise<number> {
  await ensureMigrationsTable(pool);

  const allMigrations = await getMigrationFiles();
  if (allMigrations.length === 0) return 0;

  const applied = await getAppliedVersions(pool);
  const pending = allMigrations.filter(m => !applied.has(m.version));
  if (pending.length === 0) return 0;

  const tag = label ? ` (${label})` : '';
  console.log(`[Migrations]${tag} Applying ${pending.length} pending migration(s)...`);

  let count = 0;
  for (const migration of pending) {
    // Each migration runs in its own transaction on a dedicated client,
    // so a failure rolls back only the migration that failed.
    const client = await pool.connect();
    try {
      await client.query('BEGIN');
      await client.query(migration.sql);
      await client.query(
        'INSERT INTO schema_migrations (version, name) VALUES ($1, $2)',
        [migration.version, migration.name]
      );
      await client.query('COMMIT');
      console.log(`[Migrations]${tag} Applied: ${migration.name}`);
      count++;
    } catch (error) {
      await client.query('ROLLBACK');
      console.error(`[Migrations]${tag} FAILED: ${migration.name}`, error);
      throw error;
    } finally {
      client.release();
    }
  }
  return count;
}

/**
 * Eager migration: apply pending migrations to ALL active tenant databases.
 * Does not stop on individual tenant failure; logs and continues.
 */
export async function migrateAll(): Promise<{ success: number; failed: number; skipped: number }> {
  const tenants = await prisma.tenant.findMany({
    where: { active: true },
    select: { id: true, rfc: true, databaseName: true },
  });

  console.log(`[Migrations] Starting eager migration for ${tenants.length} tenant(s)...`);

  let success = 0;
  let failed = 0;
  let skipped = 0;

  for (const tenant of tenants) {
    // Swap the database name at the end of DATABASE_URL for the tenant's database.
    const pool = new Pool({
      connectionString: process.env.DATABASE_URL?.replace(/\/[^/]+$/, `/${tenant.databaseName}`),
      max: 1,
    });
    try {
      const applied = await migrate(pool, tenant.rfc);
      if (applied > 0) {
        success++;
      } else {
        skipped++;
      }
    } catch (error) {
      console.error(`[Migrations] Failed for tenant ${tenant.rfc} (${tenant.databaseName}):`, error);
      failed++;
    } finally {
      await pool.end();
    }
  }

  console.log(`[Migrations] Eager migration complete: ${success} migrated, ${skipped} up-to-date, ${failed} failed`);
  return { success, failed, skipped };
}
```

- [ ] **Step 2: Commit**

```bash
git add apps/api/src/config/tenant-migrations.ts
git commit -m "feat: add TenantMigrationRunner with migrate() and migrateAll()"
```

---

### Task 3: Integrate lazy migration into TenantConnectionManager

**Files:**
- Modify: `apps/api/src/config/database.ts`

Changes:
1. Add `migratedPools: Set<string>` to the class
2. Import `migrate` from `tenant-migrations.ts`
3. Make `getPool()` async and run `migrate(pool)` on first access per tenant
4. Replace `createTables()` + `createIndexes()` in `provisionDatabase()` with `migrate(pool)`
5. Remove `createTables()` and `createIndexes()` methods
6. Clear `migratedPools` entry in `invalidatePool()`

- [ ] **Step 1: Update database.ts imports**

At the top of `apps/api/src/config/database.ts`, add the import:

```typescript
import { migrate } from './tenant-migrations.js';
```

- [ ] **Step 2: Add migratedPools Set to the class**

In the `TenantConnectionManager` class, after `private dbConfig`:

```typescript
private migratedPools: Set<string> = new Set();
```

- [ ] **Step 3: Make getPool() async with lazy migration**

Replace the current `getPool()` method (lines 53-79) with:

```typescript
/**
 * Get or create a connection pool for a tenant's database.
 * Runs pending migrations on first access per tenant (once per process lifetime).
 */
async getPool(tenantId: string, databaseName: string): Promise<Pool> {
  const entry = this.pools.get(tenantId);
  let pool: Pool;

  if (entry) {
    entry.lastAccess = new Date();
    pool = entry.pool;
  } else {
    const poolConfig: PoolConfig = {
      host: this.dbConfig.host,
      port: this.dbConfig.port,
      user: this.dbConfig.user,
      password: this.dbConfig.password,
      database: databaseName,
      max: 3,
      idleTimeoutMillis: 300_000,
      connectionTimeoutMillis: 10_000,
    };
    pool = new Pool(poolConfig);
    pool.on('error', (err) => {
      console.error(`[TenantDB] Pool error for tenant ${tenantId} (${databaseName}):`, err.message);
    });
    this.pools.set(tenantId, { pool, lastAccess: new Date() });
  }

  // Lazy migration: run once per tenant per process lifetime
  if (!this.migratedPools.has(tenantId)) {
    try {
      await migrate(pool, databaseName);
      this.migratedPools.add(tenantId);
    } catch (error) {
      console.error(`[TenantDB] Migration failed for ${tenantId} (${databaseName}):`, error);
      // Don't block access: the tenant can still work with the current schema.
      // Marking it as migrated avoids retrying the failed migration on every request.
      this.migratedPools.add(tenantId);
    }
  }

  return pool;
}
```

- [ ] **Step 4: Update provisionDatabase() to use migrate()**

Replace the `try` block inside `provisionDatabase()` that calls `createTables` and `createIndexes` (the inner try/finally around line 111-116) with:

```typescript
try {
  await migrate(tenantPool, databaseName);
} finally {
  await tenantPool.end();
}
```

- [ ] **Step 5: Update invalidatePool() to clear migration cache**

Add `this.migratedPools.delete(tenantId);` to `invalidatePool()`:

```typescript
invalidatePool(tenantId: string): void {
  const entry = this.pools.get(tenantId);
  if (entry) {
    entry.pool.end().catch(() => {});
    this.pools.delete(tenantId);
  }
  this.migratedPools.delete(tenantId);
}
```

- [ ] **Step 6: Remove createTables() and createIndexes() methods**

Delete the `private async createTables(pool: Pool)` method (lines 212-406) and the `private async createIndexes(pool: Pool)` method (lines 408-439) entirely. Their content is now in `001_initial_schema.sql`.

- [ ] **Step 7: Update all callers of getPool() to use await**

Since `getPool()` is now async, every call site must `await` it. The callers are:

In `apps/api/src/middlewares/tenant.middleware.ts`, change lines 75 and 85:

```typescript
// Line 75: impersonation path
req.tenantPool = await tenantDb.getPool(tenantId, viewedTenant.databaseName);

// Line 85: normal path
req.tenantPool = await tenantDb.getPool(tenantId, databaseName);
```

- [ ] **Step 8: Commit**

```bash
git add apps/api/src/config/database.ts apps/api/src/middlewares/tenant.middleware.ts
git commit -m "feat: integrate lazy tenant migrations into getPool()"
```

---

### Task 4: Create eager migration CLI script

**Files:**
- Create: `apps/api/scripts/migrate-tenants.ts`
- Modify: `apps/api/package.json`
- Modify: `turbo.json`

- [ ] **Step 1: Create the CLI script**

Create `apps/api/scripts/migrate-tenants.ts`:

```typescript
/**
 * Eager tenant migration script.
 * Run: pnpm --filter @horux/api db:migrate-tenants
 * Or: pnpm db:migrate-tenants (from monorepo root via Turborepo)
 *
 * Applies pending SQL migrations to all active tenant databases.
 */
import { migrateAll } from '../src/config/tenant-migrations.js';

async function main() {
  console.log('=== Tenant Schema Migration (Eager) ===\n');
  const start = Date.now();

  const result = await migrateAll();

  const elapsed = ((Date.now() - start) / 1000).toFixed(1);
  console.log(`\n=== Done in ${elapsed}s ===`);
  console.log(`  Migrated: ${result.success}`);
  console.log(`  Up-to-date: ${result.skipped}`);
  console.log(`  Failed: ${result.failed}`);

  // A non-zero exit code lets the deploy pipeline detect partial failure.
  if (result.failed > 0) {
    console.error('\n⚠ Some tenants failed migration. Check logs above.');
    process.exit(1);
  }
  process.exit(0);
}

main().catch((err) => {
  console.error('Fatal error:', err);
  process.exit(1);
});
```

- [ ] **Step 2: Add script to apps/api/package.json**

Add to the `"scripts"` section of `apps/api/package.json`:

```json
"db:migrate-tenants": "tsx scripts/migrate-tenants.ts"
```

- [ ] **Step 3: Add task to turbo.json**

Add to the `"tasks"` section of `turbo.json`:

```json
"db:migrate-tenants": {
  "cache": false
}
```

- [ ] **Step 4: Commit**

```bash
git add apps/api/scripts/migrate-tenants.ts apps/api/package.json turbo.json
git commit -m "feat: add eager tenant migration CLI script (pnpm db:migrate-tenants)"
```

---

### Task 5: Update CLAUDE.md and README.md

**Files:**
- Modify: `CLAUDE.md`
- Modify: `README.md`

- [ ] **Step 1: Update CLAUDE.md**

In the "Problemas conocidos / pendientes" section, replace item 1:

```markdown
1. ~~**Schema drift multi-tenant:**~~ Resuelto. Migraciones SQL numeradas en `apps/api/src/migrations/tenant/`. Se aplican eager (`pnpm db:migrate-tenants`) en deploy y lazy (auto en `getPool()`) como safety net. Para agregar un cambio de schema: crear `NNN_description.sql` en el directorio de migraciones.
```

- [ ] **Step 2: Update README.md deploy section**

In README.md, update the deploy instructions to include the new migration step. The deploy flow should reference:

```bash
git pull
pnpm install
pnpm build
pnpm db:migrate-tenants   # Apply schema changes to all tenant DBs
pm2 restart all
```

- [ ] **Step 3: Commit**

```bash
git add CLAUDE.md README.md
git commit -m "docs: update CLAUDE.md and README.md with tenant migration system"
```
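
For later schema changes, the documented convention is to add a new `NNN_description.sql` file to the migrations directory. The plan does not include any tooling for picking the next number, so the following is only a sketch under that assumption: `nextMigrationName` is a hypothetical helper, not part of the tasks above, that derives the next filename from the existing ones.

```typescript
// Hypothetical helper: suggest the next NNN_description.sql filename
// given the files already present in the migrations directory.
function nextMigrationName(existing: string[], description: string): string {
  // Collect the numeric prefixes of files that follow the naming convention.
  const versions = existing
    .map((f) => /^(\d{3})_.+\.sql$/.exec(f))
    .filter((m): m is RegExpExecArray => m !== null)
    .map((m) => parseInt(m[1], 10));

  const next = (versions.length > 0 ? Math.max(...versions) : 0) + 1;
  if (next > 999) {
    throw new Error('Three-digit version space exhausted');
  }

  // Normalize the description into a lowercase snake_case slug.
  const slug = description
    .trim()
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, '_')
    .replace(/^_+|_+$/g, '');

  return `${String(next).padStart(3, '0')}_${slug}.sql`;
}

// Logs 002_add_notes_column.sql
console.log(nextMigrationName(['001_initial_schema.sql'], 'Add notes column'));
```

Because the runner records each version as an integer primary key in `schema_migrations`, two files sharing the same number would conflict, which is the main reason to derive the next number from the existing files rather than choosing it by hand.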