# /home/Autopartes/pos/services/tenant_manager.py
"""Create and manage tenant databases."""
import logging
import os
from contextlib import closing

import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

from config import MASTER_DB_URL, TENANT_DB_URL_TEMPLATE, TENANT_TEMPLATE_DB
from tenant_db import get_master_conn, get_tenant_conn_by_dbname

logger = logging.getLogger(__name__)

MIGRATIONS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'migrations')
SEED_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'seed')

# Every permission granted to the owner employee of a freshly provisioned tenant.
_OWNER_PERMISSIONS = (
    'pos.sell', 'pos.discount', 'pos.cancel', 'pos.view_cost',
    'inventory.view', 'inventory.create', 'inventory.edit',
    'inventory.adjust', 'inventory.transfer',
    'catalog.view', 'catalog.edit',
    'customers.view', 'customers.create', 'customers.edit',
    'customers.edit_credit', 'customers.delete',
    'accounting.view', 'accounting.create', 'accounting.close',
    'invoicing.view', 'invoicing.create', 'invoicing.cancel',
    'reports.view', 'reports.financial',
    'config.view', 'config.edit', 'config.edit_prices',
)


def _connect_autocommit():
    """Open a connection to MASTER_DB_URL in autocommit mode.

    CREATE DATABASE / DROP DATABASE cannot run inside a transaction block,
    so these statements need an autocommit connection.
    """
    conn = psycopg2.connect(MASTER_DB_URL)
    try:
        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    except Exception:
        conn.close()
        raise
    return conn


def _drop_database_quietly(db_name):
    """Best-effort drop of a database this module has just created.

    Only used while unwinding a failed operation: a failure here is logged
    instead of raised so it never masks the error that triggered the cleanup.
    """
    try:
        with closing(_connect_autocommit()) as conn, closing(conn.cursor()) as cur:
            cur.execute(
                sql.SQL('DROP DATABASE IF EXISTS {}').format(sql.Identifier(db_name))
            )
    except Exception:
        logger.exception("Could not drop partially created database %s", db_name)


def _unregister_tenant_quietly(tenant_id):
    """Best-effort removal of the master rows written for a tenant.

    Only used while unwinding a failed provisioning; failures are logged
    instead of raised so the original error is the one the caller sees.
    """
    try:
        with closing(get_master_conn()) as conn:
            with closing(conn.cursor()) as cur:
                # Child row first: tenant_schema_version references tenants(id).
                cur.execute(
                    "DELETE FROM tenant_schema_version WHERE tenant_id = %s",
                    (tenant_id,),
                )
                cur.execute("DELETE FROM tenants WHERE id = %s", (tenant_id,))
            conn.commit()
    except Exception:
        logger.exception("Could not remove registration of tenant %s", tenant_id)


def ensure_master_tables():
    """Create tenants/subscriptions/schema_version tables in nexus_master if missing.

    All three statements use CREATE TABLE IF NOT EXISTS and are committed
    together. The connection is closed even when a statement fails.
    """
    with closing(get_master_conn()) as conn:
        with closing(conn.cursor()) as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS tenants (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(200) NOT NULL,
                    db_name VARCHAR(100) UNIQUE NOT NULL,
                    rfc VARCHAR(13),
                    plan VARCHAR(50) DEFAULT 'basic',
                    is_active BOOLEAN DEFAULT TRUE,
                    created_at TIMESTAMPTZ DEFAULT NOW()
                )
            """)
            cur.execute("""
                CREATE TABLE IF NOT EXISTS subscriptions (
                    id SERIAL PRIMARY KEY,
                    tenant_id INTEGER REFERENCES tenants(id),
                    plan VARCHAR(50) NOT NULL,
                    status VARCHAR(20) DEFAULT 'active',
                    started_at TIMESTAMPTZ DEFAULT NOW(),
                    expires_at TIMESTAMPTZ,
                    stripe_id VARCHAR(100)
                )
            """)
            cur.execute("""
                CREATE TABLE IF NOT EXISTS tenant_schema_version (
                    tenant_id INTEGER PRIMARY KEY REFERENCES tenants(id),
                    version VARCHAR(20) NOT NULL DEFAULT 'v1.0',
                    updated_at TIMESTAMPTZ DEFAULT NOW()
                )
            """)
        conn.commit()


def create_template_db():
    """Create tenant_template DB with full schema if it doesn't exist.

    Returns:
        False when a database named TENANT_TEMPLATE_DB already exists,
        True when it was created and the schema and seed were applied.

    Raises:
        Whatever psycopg2 or the file reads raise. If applying the schema or
        seed fails, the freshly created template is dropped before the error
        propagates; otherwise an empty template would be left behind and every
        later call would report "already exists" while tenants get cloned from
        a database with no tables.
    """
    with closing(_connect_autocommit()) as conn, closing(conn.cursor()) as cur:
        # Existence check makes repeated calls cheap. NOTE(review): two callers
        # racing past this check can still collide on CREATE DATABASE.
        cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (TENANT_TEMPLATE_DB,))
        if cur.fetchone():
            return False  # Already exists

        # Use psycopg2.sql.Identifier for safe dynamic DB name
        cur.execute(
            sql.SQL('CREATE DATABASE {}').format(sql.Identifier(TENANT_TEMPLATE_DB))
        )

    script_paths = (
        os.path.join(MIGRATIONS_DIR, 'v1.0_initial.sql'),
        os.path.join(SEED_DIR, 'sat_accounts.sql'),
    )
    try:
        # Apply schema, then seed data, to the template in one transaction.
        with closing(get_tenant_conn_by_dbname(TENANT_TEMPLATE_DB)) as tpl_conn:
            with closing(tpl_conn.cursor()) as tpl_cur:
                for path in script_paths:
                    # Explicit encoding: do not depend on the process locale.
                    with open(path, encoding='utf-8') as f:
                        tpl_cur.execute(f.read())
            tpl_conn.commit()
    except Exception:
        # The template connection is closed by now, so the drop is not blocked by it.
        _drop_database_quietly(TENANT_TEMPLATE_DB)
        raise
    return True  # Created


def _register_tenant(name, rfc):
    """Insert the tenant and its schema-version row in master; return (tenant_id, db_name)."""
    # db_name derivation: lowercase, spaces to underscores, truncated to 30 chars.
    db_name = f"tenant_{name.lower().replace(' ', '_')[:30]}"
    with closing(get_master_conn()) as conn:
        with closing(conn.cursor()) as cur:
            cur.execute("""
                INSERT INTO tenants (name, db_name, rfc)
                VALUES (%s, %s, %s)
                RETURNING id, db_name
            """, (name, db_name, rfc))
            tenant_id, db_name = cur.fetchone()

            # Track schema version
            cur.execute("""
                INSERT INTO tenant_schema_version (tenant_id, version)
                VALUES (%s, 'v1.0')
            """, (tenant_id,))
        conn.commit()
    return tenant_id, db_name


def _create_database_from_template(db_name):
    """Create database db_name as a copy of TENANT_TEMPLATE_DB."""
    with closing(_connect_autocommit()) as conn, closing(conn.cursor()) as cur:
        # psycopg2.sql.Identifier quotes both names safely.
        cur.execute(
            sql.SQL('CREATE DATABASE {} TEMPLATE {}').format(
                sql.Identifier(db_name),
                sql.Identifier(TENANT_TEMPLATE_DB),
            )
        )


def _seed_owner(db_name, owner_name, owner_email, pin_hash, pwd_hash):
    """Create the default branch and the fully-privileged owner; return the owner id."""
    with closing(get_tenant_conn_by_dbname(db_name)) as conn:
        with closing(conn.cursor()) as cur:
            cur.execute("""
                INSERT INTO branches (name) VALUES ('Principal') RETURNING id
            """)
            branch_id = cur.fetchone()[0]

            cur.execute("""
                INSERT INTO employees (name, email, pin, password_hash, role, branch_id,
                                       max_discount_pct, is_active)
                VALUES (%s, %s, %s, %s, 'owner', %s, 100, true)
                RETURNING id
            """, (owner_name, owner_email, pin_hash, pwd_hash, branch_id))
            owner_id = cur.fetchone()[0]

            # Grant all permissions to owner
            cur.executemany(
                "INSERT INTO employee_permissions (employee_id, permission) VALUES (%s, %s)",
                [(owner_id, perm) for perm in _OWNER_PERMISSIONS],
            )
        conn.commit()
    return owner_id


def provision_tenant(name, rfc=None, owner_name="Admin", owner_email=None, owner_pin="0000"):
    """Create a new tenant: register in master, create DB from template, create owner employee.

    Args:
        name: Tenant display name; also the source of the database name.
        rfc: Optional RFC stored on the tenant row.
        owner_name: Name of the owner employee created in the tenant database.
        owner_email: Optional email of the owner employee.
        owner_pin: PIN of the owner. Its bcrypt hash is stored in both the
            ``pin`` and ``password_hash`` columns (separately salted).
            NOTE(review): the default "0000" is trivially guessable; callers
            should pass a real PIN.

    Returns:
        dict with keys ``tenant_id``, ``db_name`` and ``owner_id``.

    Raises:
        ValueError: if ``name`` is empty or only whitespace.
        Any error from psycopg2. When database creation or owner seeding
        fails, the steps already performed are undone on a best-effort basis
        (tenant rows deleted, created database dropped) so that a retry with
        the same name is not blocked by the UNIQUE ``db_name`` constraint.
    """
    # Function-scope import keeps bcrypt from being required at module import time.
    import bcrypt

    if not name or not name.strip():
        raise ValueError("tenant name must not be empty")

    # Hash before touching any database so a bad PIN value fails without side effects.
    pin_hash = bcrypt.hashpw(owner_pin.encode(), bcrypt.gensalt()).decode()
    pwd_hash = bcrypt.hashpw(owner_pin.encode(), bcrypt.gensalt()).decode()

    ensure_master_tables()
    create_template_db()

    tenant_id, db_name = _register_tenant(name, rfc)

    try:
        _create_database_from_template(db_name)
    except Exception:
        # The database was not created by us, so only the registration is undone.
        _unregister_tenant_quietly(tenant_id)
        raise

    try:
        owner_id = _seed_owner(db_name, owner_name, owner_email, pin_hash, pwd_hash)
    except Exception:
        _drop_database_quietly(db_name)
        _unregister_tenant_quietly(tenant_id)
        raise

    return {'tenant_id': tenant_id, 'db_name': db_name, 'owner_id': owner_id}


def list_tenants():
    """List all tenants.

    Returns:
        A list of dicts ordered by tenant id, with keys ``id``, ``name``,
        ``db_name``, ``rfc``, ``plan``, ``is_active`` and ``created_at``
        (``created_at`` is converted with ``str``).
    """
    with closing(get_master_conn()) as conn, closing(conn.cursor()) as cur:
        cur.execute(
            "SELECT id, name, db_name, rfc, plan, is_active, created_at FROM tenants ORDER BY id"
        )
        return [
            {
                'id': row[0],
                'name': row[1],
                'db_name': row[2],
                'rfc': row[3],
                'plan': row[4],
                'is_active': row[5],
                'created_at': str(row[6]),
            }
            for row in cur.fetchall()
        ]