feat: add TenantConnectionManager with dynamic pool management
- Adds pg dependency for direct PostgreSQL connections to tenant DBs - TenantConnectionManager: singleton managing Map<tenantId, Pool> - provisionDatabase: creates new DB with tables and indexes - deprovisionDatabase: soft-deletes by renaming DB - Automatic idle pool cleanup every 60s (5min threshold) - Max 3 connections per pool (6/tenant with 2 PM2 workers) - Graceful shutdown support for all pools Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -30,6 +30,7 @@
|
||||
"jsonwebtoken": "^9.0.2",
|
||||
"node-cron": "^4.2.1",
|
||||
"node-forge": "^1.3.3",
|
||||
"pg": "^8.18.0",
|
||||
"zod": "^3.23.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
@@ -41,6 +42,7 @@
|
||||
"@types/node": "^22.0.0",
|
||||
"@types/node-cron": "^3.0.11",
|
||||
"@types/node-forge": "^1.3.14",
|
||||
"@types/pg": "^8.18.0",
|
||||
"prisma": "^5.22.0",
|
||||
"tsx": "^4.19.0",
|
||||
"typescript": "^5.3.0"
|
||||
|
||||
@@ -1,4 +1,10 @@
|
||||
import { PrismaClient } from '@prisma/client';
|
||||
import { Pool, type PoolConfig } from 'pg';
|
||||
import { env } from './env.js';
|
||||
|
||||
// ===========================================
|
||||
// Prisma Client (central database: horux360)
|
||||
// ===========================================
|
||||
|
||||
declare global {
|
||||
var prisma: PrismaClient | undefined;
|
||||
@@ -11,3 +17,303 @@ export const prisma = globalThis.prisma || new PrismaClient({
|
||||
if (process.env.NODE_ENV !== 'production') {
|
||||
globalThis.prisma = prisma;
|
||||
}
|
||||
|
||||
// ===========================================
|
||||
// TenantConnectionManager (per-tenant DBs)
|
||||
// ===========================================
|
||||
|
||||
interface PoolEntry {
|
||||
pool: Pool;
|
||||
lastAccess: Date;
|
||||
}
|
||||
|
||||
function parseDatabaseUrl(url: string) {
|
||||
const parsed = new URL(url);
|
||||
return {
|
||||
host: parsed.hostname,
|
||||
port: parseInt(parsed.port || '5432'),
|
||||
user: decodeURIComponent(parsed.username),
|
||||
password: decodeURIComponent(parsed.password),
|
||||
};
|
||||
}
|
||||
|
||||
class TenantConnectionManager {
|
||||
private pools: Map<string, PoolEntry> = new Map();
|
||||
private cleanupInterval: NodeJS.Timeout | null = null;
|
||||
private dbConfig: { host: string; port: number; user: string; password: string };
|
||||
|
||||
constructor() {
|
||||
this.dbConfig = parseDatabaseUrl(env.DATABASE_URL);
|
||||
this.cleanupInterval = setInterval(() => this.cleanupIdlePools(), 60_000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or create a connection pool for a tenant's database.
|
||||
*/
|
||||
getPool(tenantId: string, databaseName: string): Pool {
|
||||
const entry = this.pools.get(tenantId);
|
||||
if (entry) {
|
||||
entry.lastAccess = new Date();
|
||||
return entry.pool;
|
||||
}
|
||||
|
||||
const poolConfig: PoolConfig = {
|
||||
host: this.dbConfig.host,
|
||||
port: this.dbConfig.port,
|
||||
user: this.dbConfig.user,
|
||||
password: this.dbConfig.password,
|
||||
database: databaseName,
|
||||
max: 3,
|
||||
idleTimeoutMillis: 300_000,
|
||||
connectionTimeoutMillis: 10_000,
|
||||
};
|
||||
|
||||
const pool = new Pool(poolConfig);
|
||||
|
||||
pool.on('error', (err) => {
|
||||
console.error(`[TenantDB] Pool error for tenant ${tenantId} (${databaseName}):`, err.message);
|
||||
});
|
||||
|
||||
this.pools.set(tenantId, { pool, lastAccess: new Date() });
|
||||
return pool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new database for a tenant with all required tables and indexes.
|
||||
*/
|
||||
async provisionDatabase(rfc: string): Promise<string> {
|
||||
const databaseName = `horux_${rfc.toLowerCase().replace(/[^a-z0-9]/g, '')}`;
|
||||
|
||||
const adminPool = new Pool({
|
||||
...this.dbConfig,
|
||||
database: 'postgres',
|
||||
max: 1,
|
||||
});
|
||||
|
||||
try {
|
||||
const exists = await adminPool.query(
|
||||
`SELECT 1 FROM pg_database WHERE datname = $1`,
|
||||
[databaseName]
|
||||
);
|
||||
|
||||
if (exists.rows.length > 0) {
|
||||
throw new Error(`Database ${databaseName} already exists`);
|
||||
}
|
||||
|
||||
await adminPool.query(`CREATE DATABASE "${databaseName}"`);
|
||||
|
||||
const tenantPool = new Pool({
|
||||
...this.dbConfig,
|
||||
database: databaseName,
|
||||
max: 1,
|
||||
});
|
||||
|
||||
try {
|
||||
await this.createTables(tenantPool);
|
||||
await this.createIndexes(tenantPool);
|
||||
} finally {
|
||||
await tenantPool.end();
|
||||
}
|
||||
|
||||
return databaseName;
|
||||
} finally {
|
||||
await adminPool.end();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Soft-delete: rename database so it can be recovered.
|
||||
*/
|
||||
async deprovisionDatabase(databaseName: string): Promise<void> {
|
||||
// Close any active pool for this tenant
|
||||
for (const [tenantId, entry] of this.pools.entries()) {
|
||||
// We check pool config to match the database
|
||||
if ((entry.pool as any).options?.database === databaseName) {
|
||||
await entry.pool.end().catch(() => {});
|
||||
this.pools.delete(tenantId);
|
||||
}
|
||||
}
|
||||
|
||||
const timestamp = Date.now();
|
||||
const adminPool = new Pool({
|
||||
...this.dbConfig,
|
||||
database: 'postgres',
|
||||
max: 1,
|
||||
});
|
||||
|
||||
try {
|
||||
await adminPool.query(`
|
||||
SELECT pg_terminate_backend(pid)
|
||||
FROM pg_stat_activity
|
||||
WHERE datname = $1 AND pid <> pg_backend_pid()
|
||||
`, [databaseName]);
|
||||
|
||||
await adminPool.query(
|
||||
`ALTER DATABASE "${databaseName}" RENAME TO "${databaseName}_deleted_${timestamp}"`
|
||||
);
|
||||
} finally {
|
||||
await adminPool.end();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invalidate (close and remove) a specific tenant's pool.
|
||||
*/
|
||||
invalidatePool(tenantId: string): void {
|
||||
const entry = this.pools.get(tenantId);
|
||||
if (entry) {
|
||||
entry.pool.end().catch(() => {});
|
||||
this.pools.delete(tenantId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove idle pools (not accessed in last 5 minutes).
|
||||
*/
|
||||
private cleanupIdlePools(): void {
|
||||
const now = Date.now();
|
||||
const maxIdle = 5 * 60 * 1000;
|
||||
|
||||
for (const [tenantId, entry] of this.pools.entries()) {
|
||||
if (now - entry.lastAccess.getTime() > maxIdle) {
|
||||
entry.pool.end().catch((err) =>
|
||||
console.error(`[TenantDB] Error closing idle pool for ${tenantId}:`, err.message)
|
||||
);
|
||||
this.pools.delete(tenantId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Graceful shutdown: close all pools.
|
||||
*/
|
||||
async shutdown(): Promise<void> {
|
||||
if (this.cleanupInterval) {
|
||||
clearInterval(this.cleanupInterval);
|
||||
}
|
||||
|
||||
const closePromises = Array.from(this.pools.values()).map((entry) =>
|
||||
entry.pool.end()
|
||||
);
|
||||
await Promise.all(closePromises);
|
||||
this.pools.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get stats about active pools.
|
||||
*/
|
||||
getStats(): { activePools: number; tenantIds: string[] } {
|
||||
return {
|
||||
activePools: this.pools.size,
|
||||
tenantIds: Array.from(this.pools.keys()),
|
||||
};
|
||||
}
|
||||
|
||||
private async createTables(pool: Pool): Promise<void> {
|
||||
await pool.query(`
|
||||
CREATE TABLE IF NOT EXISTS cfdis (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
uuid_fiscal VARCHAR(36) UNIQUE NOT NULL,
|
||||
tipo VARCHAR(20) NOT NULL DEFAULT 'ingreso',
|
||||
serie VARCHAR(25),
|
||||
folio VARCHAR(40),
|
||||
fecha_emision TIMESTAMP NOT NULL,
|
||||
fecha_timbrado TIMESTAMP,
|
||||
rfc_emisor VARCHAR(13) NOT NULL,
|
||||
nombre_emisor VARCHAR(300) NOT NULL,
|
||||
rfc_receptor VARCHAR(13) NOT NULL,
|
||||
nombre_receptor VARCHAR(300) NOT NULL,
|
||||
subtotal DECIMAL(18,2) DEFAULT 0,
|
||||
descuento DECIMAL(18,2) DEFAULT 0,
|
||||
iva DECIMAL(18,2) DEFAULT 0,
|
||||
isr_retenido DECIMAL(18,2) DEFAULT 0,
|
||||
iva_retenido DECIMAL(18,2) DEFAULT 0,
|
||||
total DECIMAL(18,2) DEFAULT 0,
|
||||
moneda VARCHAR(10) DEFAULT 'MXN',
|
||||
tipo_cambio DECIMAL(10,4) DEFAULT 1,
|
||||
metodo_pago VARCHAR(10),
|
||||
forma_pago VARCHAR(10),
|
||||
uso_cfdi VARCHAR(10),
|
||||
estado VARCHAR(20) DEFAULT 'vigente',
|
||||
xml_url TEXT,
|
||||
pdf_url TEXT,
|
||||
xml_original TEXT,
|
||||
created_at TIMESTAMP DEFAULT NOW(),
|
||||
updated_at TIMESTAMP DEFAULT NOW(),
|
||||
last_sat_sync TIMESTAMP,
|
||||
sat_sync_job_id UUID,
|
||||
source VARCHAR(20) DEFAULT 'manual'
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS iva_mensual (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
año INTEGER NOT NULL,
|
||||
mes INTEGER NOT NULL,
|
||||
iva_trasladado DECIMAL(18,2) DEFAULT 0,
|
||||
iva_acreditable DECIMAL(18,2) DEFAULT 0,
|
||||
iva_retenido DECIMAL(18,2) DEFAULT 0,
|
||||
resultado DECIMAL(18,2) DEFAULT 0,
|
||||
acumulado DECIMAL(18,2) DEFAULT 0,
|
||||
estado VARCHAR(20) DEFAULT 'pendiente',
|
||||
fecha_declaracion TIMESTAMP,
|
||||
UNIQUE(año, mes)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS isr_mensual (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
año INTEGER NOT NULL,
|
||||
mes INTEGER NOT NULL,
|
||||
ingresos_acumulados DECIMAL(18,2) DEFAULT 0,
|
||||
deducciones DECIMAL(18,2) DEFAULT 0,
|
||||
base_gravable DECIMAL(18,2) DEFAULT 0,
|
||||
isr_causado DECIMAL(18,2) DEFAULT 0,
|
||||
isr_retenido DECIMAL(18,2) DEFAULT 0,
|
||||
isr_a_pagar DECIMAL(18,2) DEFAULT 0,
|
||||
estado VARCHAR(20) DEFAULT 'pendiente',
|
||||
fecha_declaracion TIMESTAMP,
|
||||
UNIQUE(año, mes)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS alertas (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
tipo VARCHAR(50) NOT NULL,
|
||||
titulo VARCHAR(200) NOT NULL,
|
||||
mensaje TEXT,
|
||||
prioridad VARCHAR(20) DEFAULT 'media',
|
||||
fecha_vencimiento TIMESTAMP,
|
||||
leida BOOLEAN DEFAULT FALSE,
|
||||
resuelta BOOLEAN DEFAULT FALSE,
|
||||
created_at TIMESTAMP DEFAULT NOW()
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS calendario_fiscal (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
titulo VARCHAR(200) NOT NULL,
|
||||
descripcion TEXT,
|
||||
tipo VARCHAR(50) NOT NULL,
|
||||
fecha_limite TIMESTAMP NOT NULL,
|
||||
recurrencia VARCHAR(20) DEFAULT 'unica',
|
||||
completado BOOLEAN DEFAULT FALSE,
|
||||
notas TEXT,
|
||||
created_at TIMESTAMP DEFAULT NOW()
|
||||
);
|
||||
`);
|
||||
}
|
||||
|
||||
private async createIndexes(pool: Pool): Promise<void> {
|
||||
await pool.query(`CREATE EXTENSION IF NOT EXISTS pg_trgm`);
|
||||
|
||||
await pool.query(`
|
||||
CREATE INDEX IF NOT EXISTS idx_cfdis_fecha_emision ON cfdis(fecha_emision DESC);
|
||||
CREATE INDEX IF NOT EXISTS idx_cfdis_tipo ON cfdis(tipo);
|
||||
CREATE INDEX IF NOT EXISTS idx_cfdis_rfc_emisor ON cfdis(rfc_emisor);
|
||||
CREATE INDEX IF NOT EXISTS idx_cfdis_rfc_receptor ON cfdis(rfc_receptor);
|
||||
CREATE INDEX IF NOT EXISTS idx_cfdis_estado ON cfdis(estado);
|
||||
CREATE INDEX IF NOT EXISTS idx_cfdis_nombre_emisor_trgm ON cfdis USING gin(nombre_emisor gin_trgm_ops);
|
||||
CREATE INDEX IF NOT EXISTS idx_cfdis_nombre_receptor_trgm ON cfdis USING gin(nombre_receptor gin_trgm_ops);
|
||||
`);
|
||||
}
|
||||
}
|
||||
|
||||
// Singleton instance
|
||||
export const tenantDb = new TenantConnectionManager();
|
||||
|
||||
Reference in New Issue
Block a user