# /home/Autopartes/pos/services/tenant_manager.py
"""Create and manage tenant databases."""
import logging
import os
import re
import unicodedata
from contextlib import closing

import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

from config import MASTER_DB_URL, TENANT_DB_URL_TEMPLATE, TENANT_TEMPLATE_DB
from tenant_db import get_master_conn, get_tenant_conn_by_dbname

_logger = logging.getLogger(__name__)

MIGRATIONS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'migrations')
SEED_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'seed')

# Schema version that the template database already contains.
_INITIAL_SCHEMA_VERSION = 'v1.0'

# Permissions granted to the owner employee of every new tenant.
_OWNER_PERMISSIONS = (
    'pos.sell', 'pos.discount', 'pos.cancel', 'pos.view_cost',
    'inventory.view', 'inventory.create', 'inventory.edit',
    'inventory.adjust', 'inventory.transfer',
    'catalog.view', 'catalog.edit',
    'customers.view', 'customers.create', 'customers.edit',
    'customers.edit_credit', 'customers.delete',
    'accounting.view', 'accounting.create', 'accounting.close',
    'invoicing.view', 'invoicing.create', 'invoicing.cancel',
    'reports.view', 'reports.financial',
    'config.view', 'config.edit', 'config.edit_prices',
)


def _slugify(name, separator):
    """Return *name* lowercased, accent-stripped, with non-alphanumerics replaced.

    Every run of characters outside ``[a-z0-9]`` becomes a single *separator*
    (the ``+`` in the pattern already collapses runs), and leading/trailing
    separators are removed. The result may be empty.
    """
    # NFKD splits accented letters into base letter + combining mark; the
    # ASCII round-trip then drops the marks (and any other non-ASCII char).
    nfkd = unicodedata.normalize('NFKD', name)
    ascii_name = nfkd.encode('ascii', 'ignore').decode('ascii')
    return re.sub(r'[^a-z0-9]+', separator, ascii_name.lower()).strip(separator)


def _version_key(version):
    """Return a numeric sort key for a version label such as ``'v1.10'``.

    Plain string comparison orders ``'v1.10'`` before ``'v1.2'``; comparing
    the embedded integers does not.
    """
    # Assumes migration keys look like 'v<major>.<minor>' -- TODO confirm
    # against migrations.runner.MIGRATIONS.
    return tuple(int(part) for part in re.findall(r'\d+', version))


def _connect_master_autocommit():
    """Open a connection to MASTER_DB_URL in autocommit mode.

    Autocommit is needed because CREATE/DROP DATABASE cannot run inside a
    transaction block.
    """
    conn = psycopg2.connect(MASTER_DB_URL)
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    return conn


def _drop_database(db_name):
    """Drop database *db_name* if it exists (identifier is safely quoted)."""
    with closing(_connect_master_autocommit()) as conn, closing(conn.cursor()) as cur:
        cur.execute(
            sql.SQL('DROP DATABASE IF EXISTS {}').format(sql.Identifier(db_name))
        )


def generate_subdomain(name):
    """Generate a URL-safe subdomain from a business name.

    Examples:
        'Refaccionaria López' -> 'refaccionaria-lopez'
        'Auto Parts MX #3'    -> 'auto-parts-mx-3'

    Args:
        name: Business name.

    Returns:
        Lowercase ASCII slug of at most 100 characters (the column limit)
        that never starts or ends with a hyphen. Empty when *name* contains
        no ASCII letters or digits after accent stripping.
    """
    # Truncation can cut right after a hyphen; strip it so the result stays
    # a valid host label.
    return _slugify(name, '-')[:100].rstrip('-')


def ensure_master_tables():
    """Create tenants/subscriptions/schema_version tables in nexus_master if missing.

    All statements run in one transaction; the connection is closed even if
    a statement fails.
    """
    with closing(get_master_conn()) as conn:
        with closing(conn.cursor()) as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS tenants (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(200) NOT NULL,
                    db_name VARCHAR(100) UNIQUE NOT NULL,
                    subdomain VARCHAR(100) UNIQUE,
                    rfc VARCHAR(13),
                    plan VARCHAR(50) DEFAULT 'basic',
                    is_active BOOLEAN DEFAULT TRUE,
                    created_at TIMESTAMPTZ DEFAULT NOW()
                )
            """)
            # Add subdomain column if table already existed without it
            cur.execute("""
                ALTER TABLE tenants ADD COLUMN IF NOT EXISTS subdomain VARCHAR(100) UNIQUE
            """)
            cur.execute("""
                CREATE INDEX IF NOT EXISTS idx_tenants_subdomain ON tenants(subdomain)
            """)
            cur.execute("""
                CREATE TABLE IF NOT EXISTS subscriptions (
                    id SERIAL PRIMARY KEY,
                    tenant_id INTEGER REFERENCES tenants(id),
                    plan VARCHAR(50) NOT NULL,
                    status VARCHAR(20) DEFAULT 'active',
                    started_at TIMESTAMPTZ DEFAULT NOW(),
                    expires_at TIMESTAMPTZ,
                    stripe_id VARCHAR(100)
                )
            """)
            cur.execute("""
                CREATE TABLE IF NOT EXISTS tenant_schema_version (
                    tenant_id INTEGER PRIMARY KEY REFERENCES tenants(id),
                    version VARCHAR(20) NOT NULL DEFAULT 'v1.0',
                    updated_at TIMESTAMPTZ DEFAULT NOW()
                )
            """)
        conn.commit()


def _load_template_schema():
    """Apply the initial schema and the SAT accounts seed to the template DB."""
    sql_paths = (
        os.path.join(MIGRATIONS_DIR, 'v1.0_initial.sql'),
        os.path.join(SEED_DIR, 'sat_accounts.sql'),
    )
    with closing(get_tenant_conn_by_dbname(TENANT_TEMPLATE_DB)) as tpl_conn:
        with closing(tpl_conn.cursor()) as tpl_cur:
            for path in sql_paths:
                # Explicit encoding so the result does not depend on the
                # server's locale (assumes the SQL files are UTF-8).
                with open(path, encoding='utf-8') as f:
                    tpl_cur.execute(f.read())
        tpl_conn.commit()


def create_template_db():
    """Create tenant_template DB with full schema if it doesn't exist.

    Returns:
        True if the template was created by this call, False if it already
        existed.

    Raises:
        Exception: Whatever the schema/seed load raised. In that case the
            freshly created template is dropped again (best effort) so a
            later call retries instead of treating an empty template as
            valid.
    """
    with closing(_connect_master_autocommit()) as conn, closing(conn.cursor()) as cur:
        # Check if template already exists so repeated calls are a no-op
        cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (TENANT_TEMPLATE_DB,))
        if cur.fetchone():
            return False  # Already exists

        # Use psycopg2.sql.Identifier for safe dynamic DB name
        cur.execute(
            sql.SQL('CREATE DATABASE {}').format(sql.Identifier(TENANT_TEMPLATE_DB))
        )

    try:
        _load_template_schema()
    except Exception:
        # The template connection is already closed here, so the drop is not
        # blocked by our own session.
        try:
            _drop_database(TENANT_TEMPLATE_DB)
        except Exception:
            _logger.exception(
                "Could not drop half-initialized template database %s",
                TENANT_TEMPLATE_DB,
            )
        raise
    return True  # Created


def _generate_db_name(name):
    """Generate a safe database name from business name.

    Only lowercase ASCII letters, digits, and underscores: the result is
    ``'tenant_'`` followed by at most 30 slug characters.
    """
    return f"tenant_{_slugify(name, '_')[:30]}"


def _register_tenant(name, db_name, rfc, subdomain):
    """Insert the tenant and its initial schema-version row into master.

    Returns:
        Tuple ``(tenant_id, db_name)`` as returned by the INSERT.

    Raises:
        ValueError: If db_name or subdomain is already registered.
    """
    with closing(get_master_conn()) as conn:
        with closing(conn.cursor()) as cur:
            # Validate uniqueness before inserting
            cur.execute("SELECT 1 FROM tenants WHERE db_name = %s LIMIT 1", (db_name,))
            if cur.fetchone():
                raise ValueError(f"A tenant with db_name '{db_name}' already exists.")

            cur.execute("SELECT 1 FROM tenants WHERE subdomain = %s LIMIT 1", (subdomain,))
            if cur.fetchone():
                raise ValueError(f"A tenant with subdomain '{subdomain}' already exists.")

            cur.execute("""
                INSERT INTO tenants (name, db_name, rfc, subdomain)
                VALUES (%s, %s, %s, %s) RETURNING id, db_name
            """, (name, db_name, rfc, subdomain))
            tenant_id, db_name = cur.fetchone()

            # Track schema version (updated as migrations are applied)
            cur.execute("""
                INSERT INTO tenant_schema_version (tenant_id, version)
                VALUES (%s, 'v1.0')
            """, (tenant_id,))
        conn.commit()
    return tenant_id, db_name


def _create_db_from_template(db_name):
    """Create database *db_name* as a copy of the tenant template."""
    with closing(_connect_master_autocommit()) as conn, closing(conn.cursor()) as cur:
        cur.execute(
            sql.SQL('CREATE DATABASE {} TEMPLATE {}').format(
                sql.Identifier(db_name),
                sql.Identifier(TENANT_TEMPLATE_DB)
            )
        )


def _record_schema_version(tenant_id, version):
    """Upsert the schema version of a tenant in the master database."""
    with closing(get_master_conn()) as conn:
        with closing(conn.cursor()) as cur:
            cur.execute("""
                INSERT INTO tenant_schema_version (tenant_id, version)
                VALUES (%s, %s)
                ON CONFLICT (tenant_id) DO UPDATE SET version = %s, updated_at = NOW()
            """, (tenant_id, version, version))
        conn.commit()


def _apply_post_initial_migrations(tenant_id, db_name):
    """Apply every migration newer than v1.0 and record each one in master.

    Raises:
        RuntimeError: If ``apply_migration`` reports failure for a version.
    """
    from migrations.runner import MIGRATIONS, apply_migration

    baseline = _version_key(_INITIAL_SCHEMA_VERSION)
    # Numeric ordering; the raw string is only a deterministic tie-breaker.
    for version in sorted(MIGRATIONS, key=lambda v: (_version_key(v), v)):
        if _version_key(version) <= baseline:
            continue  # Already part of the template
        if not apply_migration(db_name, version):
            raise RuntimeError(f"Migration {version} failed for tenant {db_name}")
        # Recorded per migration, so nothing is written when none applies.
        _record_schema_version(tenant_id, version)


def _seed_tenant(db_name, name, rfc, owner_name, owner_email, pin_hash, pwd_hash):
    """Create the default branch, the owner employee and base config.

    Everything is committed in one transaction, and the tenant connection is
    closed on both success and failure.

    Returns:
        The id of the created owner employee.
    """
    with closing(get_tenant_conn_by_dbname(db_name)) as tenant_conn:
        with closing(tenant_conn.cursor()) as tenant_cur:
            tenant_cur.execute("""
                INSERT INTO branches (name) VALUES ('Principal') RETURNING id
            """)
            branch_id = tenant_cur.fetchone()[0]

            tenant_cur.execute("""
                INSERT INTO employees (name, email, pin, password_hash, role, branch_id, max_discount_pct, is_active)
                VALUES (%s, %s, %s, %s, 'owner', %s, 100, true) RETURNING id
            """, (owner_name, owner_email, pin_hash, pwd_hash, branch_id))
            owner_id = tenant_cur.fetchone()[0]

            # Grant all permissions to owner (batch insert)
            tenant_cur.executemany(
                "INSERT INTO employee_permissions (employee_id, permission) VALUES (%s, %s)",
                [(owner_id, perm) for perm in _OWNER_PERMISSIONS]
            )

            # Seed tenant_config with RFC and defaults
            if rfc:
                tenant_cur.execute("""
                    INSERT INTO tenant_config (key, value) VALUES
                        ('tenant_rfc', %s),
                        ('tenant_razon_social', %s),
                        ('tenant_cp', '00000'),
                        ('cfdi_regimen_fiscal', '601'),
                        ('cfdi_serie', 'A')
                    ON CONFLICT (key) DO NOTHING
                """, (rfc, name))
        tenant_conn.commit()
    return owner_id


def _rollback_provisioning(tenant_id, db_name, drop_database):
    """Best-effort undo of a failed provisioning.

    Each step is attempted independently; failures are logged rather than
    raised so the caller can still report the original error.

    Args:
        tenant_id: Id of the master ``tenants`` row to remove.
        db_name: Tenant database name.
        drop_database: Drop the database only when this provisioning run
            created it; a database that already existed is left untouched.
    """
    if drop_database:
        try:
            _drop_database(db_name)
        except Exception:
            _logger.exception("Rollback: could not drop tenant database %s", db_name)

    try:
        with closing(get_master_conn()) as conn:
            with closing(conn.cursor()) as cur:
                # Child row first: tenant_schema_version references tenants(id).
                cur.execute("DELETE FROM tenant_schema_version WHERE tenant_id = %s", (tenant_id,))
                cur.execute("DELETE FROM tenants WHERE id = %s", (tenant_id,))
            conn.commit()
    except Exception:
        _logger.exception("Rollback: could not remove tenant %s from master", tenant_id)


def provision_tenant(name, rfc=None, owner_name="Admin", owner_email=None, owner_pin="0000", subdomain=None):
    """Create a new tenant: register in master, create DB from template, create owner employee.

    If subdomain is not provided, one is auto-generated from the business name.
    Includes automatic rollback on failure to avoid orphaned databases.

    Args:
        name: Business name; also used to derive the database name.
        rfc: Optional RFC. When given, tenant_config is seeded with it.
        owner_name: Name of the owner employee.
        owner_email: Email of the owner employee.
        owner_pin: Value hashed (bcrypt) into both the owner's pin and
            password_hash columns.
        subdomain: Explicit subdomain; generated from *name* when falsy.

    Returns:
        Dict with keys 'tenant_id', 'db_name', 'owner_id' and 'subdomain'.

    Raises:
        ValueError: If no subdomain can be derived from *name*, or the
            db_name/subdomain is already registered.
        RuntimeError: If anything fails after the tenant was registered; the
            original exception is chained as ``__cause__``.
    """
    import bcrypt

    # Generate subdomain if not provided
    if not subdomain:
        subdomain = generate_subdomain(name)
    if not subdomain:
        # Fail before touching any database rather than registering ''.
        raise ValueError(
            f"Cannot derive a subdomain from name {name!r}; pass subdomain explicitly."
        )

    # Generate safe db_name
    db_name = _generate_db_name(name)

    ensure_master_tables()
    create_template_db()

    # Run master migrations before creating tenant (ensures marketplace tables exist)
    from migrations.runner_master import run_master_migrations
    run_master_migrations()

    tenant_id, db_name = _register_tenant(name, db_name, rfc, subdomain)

    db_created = False
    try:
        _create_db_from_template(db_name)
        db_created = True

        # Apply pending migrations post-v1.0
        _apply_post_initial_migrations(tenant_id, db_name)

        pin_hash = bcrypt.hashpw(owner_pin.encode(), bcrypt.gensalt()).decode()
        pwd_hash = bcrypt.hashpw(owner_pin.encode(), bcrypt.gensalt()).decode()

        # Create default branch and owner employee
        owner_id = _seed_tenant(
            db_name, name, rfc, owner_name, owner_email, pin_hash, pwd_hash
        )

        return {'tenant_id': tenant_id, 'db_name': db_name,
                'owner_id': owner_id, 'subdomain': subdomain}

    except Exception as e:
        # Every helper above closes its own connections, so no session of
        # ours keeps the tenant database busy when it is dropped here.
        _rollback_provisioning(tenant_id, db_name, drop_database=db_created)
        raise RuntimeError(f"Failed to provision tenant: {e}") from e


def list_tenants():
    """List all tenants.

    Returns:
        List of dicts ordered by tenant id, with keys 'id', 'name',
        'db_name', 'rfc', 'plan', 'is_active', 'created_at' (stringified)
        and 'subdomain'.
    """
    with closing(get_master_conn()) as conn, closing(conn.cursor()) as cur:
        cur.execute("SELECT id, name, db_name, rfc, plan, is_active, created_at, subdomain FROM tenants ORDER BY id")
        rows = cur.fetchall()

    return [
        {
            'id': row[0], 'name': row[1], 'db_name': row[2],
            'rfc': row[3], 'plan': row[4], 'is_active': row[5],
            'created_at': str(row[6]), 'subdomain': row[7]
        }
        for row in rows
    ]