/**
 * PostgreSQL Connection Pool with Multi-Tenant Support
 *
 * Implements a connection pool manager that supports schema-based multi-tenancy.
 * Each tenant has their own schema, and connections can be scoped to a specific tenant.
 */

import { Pool, PoolClient, PoolConfig, QueryResult, QueryResultRow } from 'pg';
import { EventEmitter } from 'events';

/** Connection settings used to build the underlying `pg` pool. */
export interface DatabaseConfig {
  host: string;
  port: number;
  database: string;
  user: string;
  password: string;
  ssl?: boolean | { rejectUnauthorized: boolean };
  /** Maximum pool size; defaults to 20 when omitted or 0. */
  max?: number;
  /** Idle timeout in milliseconds; defaults to 30000 when omitted. */
  idleTimeoutMillis?: number;
  /** Connection acquisition timeout in milliseconds; defaults to 10000 when omitted. */
  connectionTimeoutMillis?: number;
}

/** Tenant context for queries. */
export interface TenantContext {
  tenantId: string;
  /** Schema placed first on `search_path` for the duration of the query/transaction. */
  schemaName: string;
  userId?: string;
}

/** Per-call query options. */
export interface QueryOptions {
  /** When set, the tenant schema is put ahead of `public` on `search_path`. */
  tenant?: TenantContext;
  /** When set, applied as `statement_timeout`; must be a non-negative integer. */
  timeout?: number;
}

/** Signatures of the events re-emitted from the underlying pool. */
export interface PoolEvents {
  connect: (client: PoolClient) => void;
  acquire: (client: PoolClient) => void;
  release: (client: PoolClient) => void;
  error: (error: Error, client: PoolClient) => void;
  remove: (client: PoolClient) => void;
}

/** Snapshot of pool counters returned by {@link DatabaseConnection.getStats}. */
export interface PoolStats {
  totalConnections: number;
  idleConnections: number;
  waitingRequests: number;
}

// Process-wide connection handed out by getDatabase(); cleared again when that
// connection is disconnected so a later getDatabase() call builds a fresh pool.
let globalConnection: DatabaseConnection | null = null;

/**
 * Parse a base-10 integer from an environment value.
 * Returns `fallback` when the value is missing, empty, or not numeric, so a
 * malformed variable never puts NaN into the pool configuration.
 */
function parseIntegerEnv(raw: string | undefined, fallback: number): number {
  const parsed = parseInt(raw || '', 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}

/** Normalize an arbitrary thrown value into an `Error` instance. */
function toError(value: unknown): Error {
  return value instanceof Error ? value : new Error(String(value));
}

/**
 * Database connection manager with multi-tenant support.
 *
 * Wraps a `pg` pool, re-emits the pool's events, and scopes individual queries
 * or transactions to a tenant schema by setting `search_path` on the checked-out
 * client and resetting it before the client goes back to the pool.
 */
export class DatabaseConnection extends EventEmitter {
  private readonly pool: Pool;
  private readonly config: DatabaseConfig;
  private isConnected = false;

  /**
   * @param config - Connection settings; when omitted they are read from the
   *   `DB_*` environment variables.
   */
  constructor(config?: DatabaseConfig) {
    super();
    this.config = config ?? this.getConfigFromEnv();

    const poolConfig: PoolConfig = {
      host: this.config.host,
      port: this.config.port,
      database: this.config.database,
      user: this.config.user,
      password: this.config.password,
      ssl: this.config.ssl,
      // `||` on purpose: a pool size of 0 is not usable, so it falls back too.
      max: this.config.max || 20,
      // `??` so an explicit 0 is passed through to pg instead of being replaced.
      idleTimeoutMillis: this.config.idleTimeoutMillis ?? 30000,
      connectionTimeoutMillis: this.config.connectionTimeoutMillis ?? 10000,
    };

    this.pool = new Pool(poolConfig);
    this.setupPoolEvents();
  }

  /**
   * Get configuration from environment variables.
   * Numeric variables that are missing or malformed fall back to their defaults.
   */
  private getConfigFromEnv(): DatabaseConfig {
    const env = process.env;
    return {
      host: env.DB_HOST || 'localhost',
      port: parseIntegerEnv(env.DB_PORT, 5432),
      database: env.DB_NAME || 'horux_strategy',
      user: env.DB_USER || 'postgres',
      password: env.DB_PASSWORD || '',
      ssl:
        env.DB_SSL === 'true'
          ? { rejectUnauthorized: env.DB_SSL_REJECT_UNAUTHORIZED !== 'false' }
          : false,
      max: parseIntegerEnv(env.DB_POOL_MAX, 20),
      idleTimeoutMillis: parseIntegerEnv(env.DB_IDLE_TIMEOUT, 30000),
      connectionTimeoutMillis: parseIntegerEnv(env.DB_CONNECTION_TIMEOUT, 10000),
    };
  }

  /**
   * Setup pool event handlers: forward every pool event to this emitter.
   */
  private setupPoolEvents(): void {
    this.pool.on('connect', (client) => {
      this.isConnected = true;
      this.emit('connect', client);
    });

    this.pool.on('acquire', (client) => {
      this.emit('acquire', client);
    });

    this.pool.on('release', (client) => {
      this.emit('release', client);
    });

    this.pool.on('error', (err, client) => {
      console.error('Unexpected pool error:', err);
      // EventEmitter throws when 'error' is emitted without a listener; only
      // forward when someone subscribed so a pool error cannot crash the process.
      if (this.listenerCount('error') > 0) {
        this.emit('error', err, client);
      }
    });

    this.pool.on('remove', (client) => {
      this.emit('remove', client);
    });
  }

  /**
   * Test the database connection by running `SELECT 1` on a pooled client.
   *
   * @returns `true` when the probe query succeeds.
   * @throws The underlying error when a client cannot be acquired or the probe fails.
   */
  async connect(): Promise<boolean> {
    try {
      const client = await this.pool.connect();
      try {
        await client.query('SELECT 1');
      } finally {
        // Always hand the client back, even when the probe query fails.
        client.release();
      }
      this.isConnected = true;
      console.log('Database connection established successfully');
      return true;
    } catch (error) {
      console.error('Failed to connect to database:', error);
      this.isConnected = false;
      throw error;
    }
  }

  /**
   * Close all connections in the pool.
   * If this instance is the one returned by `getDatabase()`, the cached global
   * is cleared so the next `getDatabase()` call creates a new connection.
   */
  async disconnect(): Promise<void> {
    await this.pool.end();
    this.isConnected = false;
    if (globalConnection === this) {
      globalConnection = null;
    }
    console.log('Database connection pool closed');
  }

  /**
   * Get pool statistics.
   */
  getStats(): PoolStats {
    return {
      totalConnections: this.pool.totalCount,
      idleConnections: this.pool.idleCount,
      waitingRequests: this.pool.waitingCount,
    };
  }

  /**
   * Check if connected.
   */
  isPoolConnected(): boolean {
    return this.isConnected;
  }

  /**
   * Build the session-level `SET` statements required by `options`.
   * Runs before a client is acquired so invalid input never checks one out.
   *
   * @throws Error when the schema name is not a plain identifier or the timeout
   *   is not a non-negative integer.
   */
  private buildSessionSetup(options?: QueryOptions): string[] {
    const statements: string[] = [];

    if (options?.tenant) {
      const schema = this.escapeIdentifier(options.tenant.schemaName);
      statements.push(`SET search_path TO ${schema}, public`);
    }

    const timeout = options?.timeout;
    if (timeout != null) {
      // The value is interpolated into SQL, so accept only a plain non-negative integer.
      if (!Number.isInteger(timeout) || timeout < 0) {
        throw new Error(`Invalid statement timeout: ${timeout}`);
      }
      statements.push(`SET statement_timeout = ${timeout}`);
    }

    return statements;
  }

  /**
   * Return a client to the pool, resetting session settings first when needed.
   * If the reset fails the client is released with an error so the pool discards
   * it rather than reusing a session that may still point at a tenant schema.
   */
  private async releaseClient(client: PoolClient, resetSession: boolean): Promise<void> {
    if (!resetSession) {
      client.release();
      return;
    }

    try {
      await client.query('RESET search_path; RESET statement_timeout;');
      client.release();
    } catch (error) {
      client.release(toError(error));
    }
  }

  /**
   * Execute a query with optional tenant context.
   *
   * @param text - SQL text.
   * @param params - Positional parameter values.
   * @param options - Optional tenant scope and statement timeout.
   * @returns The query result.
   * @throws Error for an invalid schema name or timeout, or any error raised by the query.
   */
  async query<T extends QueryResultRow = QueryResultRow>(
    text: string,
    params?: unknown[],
    options?: QueryOptions
  ): Promise<QueryResult<T>> {
    const setup = this.buildSessionSetup(options);
    const client = await this.pool.connect();

    try {
      for (const statement of setup) {
        await client.query(statement);
      }
      return await client.query<T>(text, params);
    } finally {
      await this.releaseClient(client, setup.length > 0);
    }
  }

  /**
   * Execute a query in the public schema (no tenant scoping applied).
   */
  async queryPublic<T extends QueryResultRow = QueryResultRow>(
    text: string,
    params?: unknown[]
  ): Promise<QueryResult<T>> {
    return this.query<T>(text, params);
  }

  /**
   * Execute a query in a tenant schema.
   */
  async queryTenant<T extends QueryResultRow = QueryResultRow>(
    tenant: TenantContext,
    text: string,
    params?: unknown[]
  ): Promise<QueryResult<T>> {
    return this.query<T>(text, params, { tenant });
  }

  /**
   * Get a client for manual transaction handling.
   * The caller is responsible for calling `release()` on the returned client.
   */
  async getClient(): Promise<PoolClient> {
    return this.pool.connect();
  }

  /**
   * Execute a function within a transaction.
   *
   * Issues `BEGIN`, runs `fn`, then `COMMIT`; on any error issues `ROLLBACK` and
   * rethrows the original error. If the rollback itself fails, that failure is
   * logged and the client is discarded instead of being reused.
   *
   * @param fn - Work to run on the transaction's client.
   * @param options - Optional tenant scope and statement timeout.
   * @returns Whatever `fn` resolves to.
   */
  async transaction<T>(
    fn: (client: PoolClient) => Promise<T>,
    options?: QueryOptions
  ): Promise<T> {
    const setup = this.buildSessionSetup(options);
    const client = await this.pool.connect();
    let rollbackFailure: Error | undefined;

    try {
      for (const statement of setup) {
        await client.query(statement);
      }

      await client.query('BEGIN');
      try {
        const result = await fn(client);
        await client.query('COMMIT');
        return result;
      } catch (error) {
        try {
          await client.query('ROLLBACK');
        } catch (rollbackError) {
          // Keep the original error for the caller; remember that the client is unusable.
          console.error('Failed to roll back transaction:', rollbackError);
          rollbackFailure = toError(rollbackError);
        }
        throw error;
      }
    } finally {
      if (rollbackFailure) {
        // Session state is unknown (possibly still inside a transaction): discard the client.
        client.release(rollbackFailure);
      } else {
        await this.releaseClient(client, setup.length > 0);
      }
    }
  }

  /**
   * Execute a function within a tenant transaction.
   */
  async tenantTransaction<T>(
    tenant: TenantContext,
    fn: (client: PoolClient) => Promise<T>
  ): Promise<T> {
    return this.transaction(fn, { tenant });
  }

  /**
   * Escape an identifier (schema name, table name, etc.).
   *
   * @throws Error when the identifier contains anything other than letters,
   *   digits and underscores, or starts with a digit.
   */
  private escapeIdentifier(identifier: string): string {
    // Validate identifier format (alphanumeric and underscores only)
    if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(identifier)) {
      throw new Error(`Invalid identifier: ${identifier}`);
    }
    return `"${identifier}"`;
  }

  /**
   * Check if a schema exists.
   */
  async schemaExists(schemaName: string): Promise<boolean> {
    const result = await this.query<{ exists: boolean }>(
      `SELECT EXISTS(SELECT 1 FROM information_schema.schemata WHERE schema_name = $1) as exists`,
      [schemaName]
    );
    return result.rows[0]?.exists ?? false;
  }

  /**
   * Get list of all tenant schemas (names starting with the literal prefix `tenant_`).
   */
  async getTenantSchemas(): Promise<string[]> {
    // `_` is a single-character wildcard in LIKE; escape it so only the literal
    // "tenant_" prefix matches.
    const result = await this.query<{ schema_name: string }>(
      `SELECT schema_name FROM information_schema.schemata WHERE schema_name LIKE 'tenant!_%' ESCAPE '!' ORDER BY schema_name`
    );
    return result.rows.map((row) => row.schema_name);
  }

  /**
   * Get the underlying pool (use with caution).
   */
  getPool(): Pool {
    return this.pool;
  }
}

/**
 * Get or create the global database connection.
 *
 * The first call creates the connection (from `config`, or from the environment
 * when omitted); later calls return that same instance and ignore `config`.
 */
export function getDatabase(config?: DatabaseConfig): DatabaseConnection {
  if (!globalConnection) {
    globalConnection = new DatabaseConnection(config);
  }
  return globalConnection;
}

/**
 * Create a new database connection (non-singleton).
 */
export function createDatabase(config?: DatabaseConfig): DatabaseConnection {
  return new DatabaseConnection(config);
}

/**
 * Tenant-scoped database client.
 * Provides a simplified interface for tenant-specific operations.
 */
export class TenantDatabase {
  private readonly db: DatabaseConnection;
  private readonly tenant: TenantContext;

  constructor(db: DatabaseConnection, tenant: TenantContext) {
    this.db = db;
    this.tenant = tenant;
  }

  /**
   * Execute a query in the tenant schema.
   */
  async query<T extends QueryResultRow = QueryResultRow>(
    text: string,
    params?: unknown[]
  ): Promise<QueryResult<T>> {
    return this.db.queryTenant<T>(this.tenant, text, params);
  }

  /**
   * Execute a transaction in the tenant schema.
   */
  async transaction<T>(fn: (client: PoolClient) => Promise<T>): Promise<T> {
    return this.db.tenantTransaction(this.tenant, fn);
  }

  /**
   * Get tenant context.
   */
  getContext(): TenantContext {
    return this.tenant;
  }

  /**
   * Query the public schema (for cross-tenant operations).
   */
  async queryPublic<T extends QueryResultRow = QueryResultRow>(
    text: string,
    params?: unknown[]
  ): Promise<QueryResult<T>> {
    return this.db.queryPublic<T>(text, params);
  }
}

/**
 * Create a tenant-scoped database client.
 *
 * The schema name is derived as `tenant_<tenantId>`. It is validated when a
 * query runs, so a `tenantId` containing characters other than letters, digits
 * and underscores makes those queries throw.
 */
export function createTenantDatabase(
  db: DatabaseConnection,
  tenantId: string,
  userId?: string
): TenantDatabase {
  const tenant: TenantContext = {
    tenantId,
    schemaName: `tenant_${tenantId}`,
    userId,
  };
  return new TenantDatabase(db, tenant);
}

// Export types
export type { Pool, PoolClient, QueryResult, QueryResultRow };