import { PrismaClient } from '@prisma/client';
import { Pool, type PoolConfig } from 'pg';
import { env } from './env.js';
import { migrate } from './tenant-migrations.js';

// ===========================================
// Prisma Client (central database: horux360)
// ===========================================

declare global {
  var prisma: PrismaClient | undefined;
}

/**
 * Shared Prisma client. Reuses the instance stored on `globalThis` when one
 * exists; otherwise creates a new client with verbose logging in development
 * and error-only logging elsewhere.
 */
export const prisma =
  globalThis.prisma ??
  new PrismaClient({
    log: process.env.NODE_ENV === 'development' ? ['query', 'error', 'warn'] : ['error'],
  });

// Outside production the client is cached on `globalThis` so a re-evaluation
// of this module picks up the existing instance instead of creating another.
if (process.env.NODE_ENV !== 'production') {
  globalThis.prisma = prisma;
}

// ===========================================
// TenantConnectionManager (per-tenant DBs)
// ===========================================

/** Connection parameters shared by every pool opened against one server. */
interface ConnectionParams {
  host: string;
  port: number;
  user: string;
  password: string;
}

/** A cached tenant pool plus the bookkeeping needed to manage it. */
interface PoolEntry {
  pool: Pool;
  /** Database the pool was opened against (used to match on deprovision). */
  databaseName: string;
  /** Last time `getPool` handed this pool out. */
  lastAccess: Date;
}

/**
 * Extract host, port, user and password from a database URL.
 * The port defaults to 5432 when the URL does not specify one; user and
 * password are percent-decoded.
 */
function parseDatabaseUrl(url: string): ConnectionParams {
  const parsed = new URL(url);
  return {
    host: parsed.hostname,
    port: Number.parseInt(parsed.port || '5432', 10),
    user: decodeURIComponent(parsed.username),
    password: decodeURIComponent(parsed.password),
  };
}

/**
 * Quote a value for use as an SQL identifier: wrap it in double quotes and
 * double any embedded double quote so the name cannot terminate the literal.
 */
function quoteIdentifier(identifier: string): string {
  return `"${identifier.replace(/"/g, '""')}"`;
}

class TenantConnectionManager {
  /** Active pools keyed by tenant id. */
  private readonly pools = new Map<string, PoolEntry>();
  /** Tenant ids for which the lazy migration has already been attempted. */
  private readonly migratedPools = new Set<string>();
  private readonly dbConfig: ConnectionParams;
  private cleanupInterval: NodeJS.Timeout | null = null;

  constructor() {
    this.dbConfig = parseDatabaseUrl(env.DATABASE_URL);

    // Sweep idle pools once a minute. The timer is unref'd so that this
    // module-level singleton does not, by itself, keep the process alive.
    const timer = setInterval(() => this.cleanupIdlePools(), 60_000);
    timer.unref();
    this.cleanupInterval = timer;
  }

  /**
   * Get or create a connection pool for a tenant's database.
   * Runs lazy migrations on first access (or after pool invalidation).
   *
   * A migration failure is logged, not thrown: the pool is still returned and
   * the tenant is still marked as migrated, so the migration is not retried
   * until `invalidatePool` clears the flag.
   *
   * @param tenantId Key under which the pool is cached.
   * @param databaseName Database the pool connects to.
   * @param connectionOverride Connection parameters to use instead of the
   *   ones parsed from `env.DATABASE_URL`. Only consulted when a new pool is
   *   created; an already cached pool is returned as-is.
   */
  async getPool(
    tenantId: string,
    databaseName: string,
    connectionOverride?: ConnectionParams,
  ): Promise<Pool> {
    let pool: Pool;
    const entry = this.pools.get(tenantId);

    if (entry) {
      entry.lastAccess = new Date();
      pool = entry.pool;
    } else {
      pool = this.createTenantPool(tenantId, databaseName, connectionOverride);
      this.pools.set(tenantId, { pool, databaseName, lastAccess: new Date() });
    }

    if (!this.migratedPools.has(tenantId)) {
      try {
        await migrate(pool, databaseName);
      } catch (err) {
        console.error(`[TenantDB] Migration error for tenant ${tenantId} (${databaseName}):`, err);
      }
      this.migratedPools.add(tenantId);
    }

    return pool;
  }

  /**
   * Create a new database for a tenant and run the tenant migrations on it.
   *
   * @param rfc Used to derive the database name (`horux_` + lower-cased RFC
   *   with everything except `a-z0-9` removed) when no override is given.
   * @param overrideDatabaseName Explicit database name; takes precedence over
   *   the derived one when non-empty.
   * @returns The name of the created database.
   * @throws If the database already exists, or if no override is given and
   *   the RFC contains no alphanumeric characters.
   */
  async provisionDatabase(rfc: string, overrideDatabaseName?: string): Promise<string> {
    const databaseName = overrideDatabaseName || this.defaultDatabaseName(rfc);
    const adminPool = this.createAdminPool();

    try {
      const exists = await adminPool.query(
        `SELECT 1 FROM pg_database WHERE datname = $1`,
        [databaseName],
      );
      if (exists.rows.length > 0) {
        throw new Error(`Database ${databaseName} already exists`);
      }

      // Identifiers cannot be bound as parameters, so the name is quoted.
      await adminPool.query(`CREATE DATABASE ${quoteIdentifier(databaseName)}`);

      const tenantPool = new Pool({
        ...this.dbConfig,
        database: databaseName,
        max: 1,
      });
      try {
        await migrate(tenantPool, databaseName);
      } finally {
        await tenantPool.end();
      }

      return databaseName;
    } finally {
      await adminPool.end();
    }
  }

  /**
   * Soft-delete: rename database so it can be recovered.
   *
   * Closes every cached pool opened against `databaseName`, terminates the
   * remaining backends connected to it, then renames it to
   * `<databaseName>_deleted_<timestamp>`.
   */
  async deprovisionDatabase(databaseName: string): Promise<void> {
    // Unregister matching pools before closing them so that a concurrent
    // getPool() cannot be handed a pool that is in the middle of ending.
    const closing: Promise<void>[] = [];
    for (const [tenantId, entry] of Array.from(this.pools.entries())) {
      if (entry.databaseName !== databaseName) continue;
      this.pools.delete(tenantId);
      this.migratedPools.delete(tenantId);
      closing.push(this.endPool(entry.pool, `pool for ${tenantId}`));
    }
    await Promise.all(closing);

    const timestamp = Date.now();
    const adminPool = this.createAdminPool();

    try {
      await adminPool.query(
        `
        SELECT pg_terminate_backend(pid)
        FROM pg_stat_activity
        WHERE datname = $1 AND pid <> pg_backend_pid()
      `,
        [databaseName],
      );

      const renamed = `${databaseName}_deleted_${timestamp}`;
      await adminPool.query(
        `ALTER DATABASE ${quoteIdentifier(databaseName)} RENAME TO ${quoteIdentifier(renamed)}`,
      );
    } finally {
      await adminPool.end();
    }
  }

  /**
   * Invalidate (close and remove) a specific tenant's pool.
   * Also clears the tenant's migrated flag, so the next `getPool` call runs
   * the migrations again. The pool is closed in the background; a failure to
   * close is logged.
   */
  invalidatePool(tenantId: string): void {
    const entry = this.pools.get(tenantId);
    if (entry) {
      this.pools.delete(tenantId);
      void this.endPool(entry.pool, `pool for ${tenantId}`);
    }
    this.migratedPools.delete(tenantId);
  }

  /**
   * Remove idle pools (not accessed in last 12 hours).
   * SAT syncs (initial/daily) can run for hours in background;
   * a 5-minute timeout caused 'pool already ended' errors mid-sync.
   */
  private cleanupIdlePools(): void {
    const now = Date.now();
    const maxIdle = 12 * 60 * 60 * 1000;

    for (const [tenantId, entry] of Array.from(this.pools.entries())) {
      if (now - entry.lastAccess.getTime() > maxIdle) {
        this.pools.delete(tenantId);
        void this.endPool(entry.pool, `idle pool for ${tenantId}`);
      }
    }
  }

  /**
   * Graceful shutdown: stop the cleanup timer and close all pools.
   *
   * Every pool is asked to close even if another one fails, and the registry
   * is always emptied. If any pool failed to close, the first failure is
   * rethrown after all of them have settled.
   */
  async shutdown(): Promise<void> {
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
    }

    const entries = Array.from(this.pools.values());
    this.pools.clear();

    const results = await Promise.allSettled(entries.map((entry) => entry.pool.end()));
    const failure = results.find(
      (result): result is PromiseRejectedResult => result.status === 'rejected',
    );
    if (failure) {
      throw failure.reason;
    }
  }

  /**
   * Get stats about active pools.
   */
  getStats(): { activePools: number; tenantIds: string[] } {
    return {
      activePools: this.pools.size,
      tenantIds: Array.from(this.pools.keys()),
    };
  }

  /** Derive the default tenant database name from an RFC. */
  private defaultDatabaseName(rfc: string): string {
    const sanitized = rfc.toLowerCase().replace(/[^a-z0-9]/g, '');
    if (!sanitized) {
      throw new Error(`Cannot derive a database name from RFC "${rfc}"`);
    }
    return `horux_${sanitized}`;
  }

  /** Open a single-connection pool against the `postgres` database. */
  private createAdminPool(): Pool {
    return new Pool({
      ...this.dbConfig,
      database: 'postgres',
      max: 1,
    });
  }

  /** Build a tenant pool and attach an error logger to it. */
  private createTenantPool(
    tenantId: string,
    databaseName: string,
    connectionOverride?: ConnectionParams,
  ): Pool {
    const poolConfig: PoolConfig = {
      host: connectionOverride?.host ?? this.dbConfig.host,
      port: connectionOverride?.port ?? this.dbConfig.port,
      user: connectionOverride?.user ?? this.dbConfig.user,
      password: connectionOverride?.password ?? this.dbConfig.password,
      database: databaseName,
      max: 10,
      idleTimeoutMillis: 300_000,
      connectionTimeoutMillis: 30_000,
    };

    const pool = new Pool(poolConfig);
    pool.on('error', (err) => {
      console.error(`[TenantDB] Pool error for tenant ${tenantId} (${databaseName}):`, err.message);
    });
    return pool;
  }

  /** Close a pool, logging (never throwing) if closing fails. */
  private async endPool(pool: Pool, description: string): Promise<void> {
    try {
      await pool.end();
    } catch (err) {
      console.error(
        `[TenantDB] Error closing ${description}:`,
        err instanceof Error ? err.message : err,
      );
    }
  }
}

// Singleton instance
export const tenantDb = new TenantConnectionManager();