feat(pos): add tenant manager — provision DBs from template with sql.Identifier

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-31 01:27:12 +00:00
parent 3d1a925c5c
commit c82a29279e
2 changed files with 220 additions and 0 deletions

View File

@@ -0,0 +1,189 @@
# /home/Autopartes/pos/services/tenant_manager.py
"""Create and manage tenant databases."""
import os
import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from config import MASTER_DB_URL, TENANT_DB_URL_TEMPLATE, TENANT_TEMPLATE_DB
from tenant_db import get_master_conn, get_tenant_conn_by_dbname
MIGRATIONS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'migrations')
SEED_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'seed')
def ensure_master_tables():
"""Create tenants/subscriptions/schema_version tables in nexus_master if missing."""
conn = get_master_conn()
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS tenants (
id SERIAL PRIMARY KEY,
name VARCHAR(200) NOT NULL,
db_name VARCHAR(100) UNIQUE NOT NULL,
rfc VARCHAR(13),
plan VARCHAR(50) DEFAULT 'basic',
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMPTZ DEFAULT NOW()
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS subscriptions (
id SERIAL PRIMARY KEY,
tenant_id INTEGER REFERENCES tenants(id),
plan VARCHAR(50) NOT NULL,
status VARCHAR(20) DEFAULT 'active',
started_at TIMESTAMPTZ DEFAULT NOW(),
expires_at TIMESTAMPTZ,
stripe_id VARCHAR(100)
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS tenant_schema_version (
tenant_id INTEGER PRIMARY KEY REFERENCES tenants(id),
version VARCHAR(20) NOT NULL DEFAULT 'v1.0',
updated_at TIMESTAMPTZ DEFAULT NOW()
)
""")
conn.commit()
cur.close()
conn.close()
def create_template_db():
"""Create tenant_template DB with full schema if it doesn't exist."""
conn = psycopg2.connect(MASTER_DB_URL)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cur = conn.cursor()
# Check if template already exists (idempotent — safe to call from multiple tasks)
cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (TENANT_TEMPLATE_DB,))
if cur.fetchone():
cur.close()
conn.close()
return False # Already exists
# Use psycopg2.sql.Identifier for safe dynamic DB name
cur.execute(
sql.SQL('CREATE DATABASE {}').format(sql.Identifier(TENANT_TEMPLATE_DB))
)
cur.close()
conn.close()
# Apply schema to template
tpl_conn = get_tenant_conn_by_dbname(TENANT_TEMPLATE_DB)
tpl_cur = tpl_conn.cursor()
schema_path = os.path.join(MIGRATIONS_DIR, 'v1.0_initial.sql')
with open(schema_path) as f:
tpl_cur.execute(f.read())
seed_path = os.path.join(SEED_DIR, 'sat_accounts.sql')
with open(seed_path) as f:
tpl_cur.execute(f.read())
tpl_conn.commit()
tpl_cur.close()
tpl_conn.close()
return True # Created
def provision_tenant(name, rfc=None, owner_name="Admin", owner_email=None, owner_pin="0000"):
"""Create a new tenant: register in master, create DB from template, create owner employee."""
import bcrypt
ensure_master_tables()
create_template_db()
# Generate db_name
conn = get_master_conn()
cur = conn.cursor()
# Insert tenant
cur.execute("""
INSERT INTO tenants (name, db_name, rfc)
VALUES (%s, %s, %s)
RETURNING id, db_name
""", (name, f"tenant_{name.lower().replace(' ', '_')[:30]}", rfc))
tenant_id, db_name = cur.fetchone()
# Track schema version
cur.execute("""
INSERT INTO tenant_schema_version (tenant_id, version)
VALUES (%s, 'v1.0')
""", (tenant_id,))
conn.commit()
cur.close()
conn.close()
# Create DB from template — use psycopg2.sql.Identifier for safe dynamic names
master_conn = psycopg2.connect(MASTER_DB_URL)
master_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
master_cur = master_conn.cursor()
master_cur.execute(
sql.SQL('CREATE DATABASE {} TEMPLATE {}').format(
sql.Identifier(db_name),
sql.Identifier(TENANT_TEMPLATE_DB)
)
)
master_cur.close()
master_conn.close()
# Create default branch and owner employee
tenant_conn = get_tenant_conn_by_dbname(db_name)
tenant_cur = tenant_conn.cursor()
tenant_cur.execute("""
INSERT INTO branches (name) VALUES ('Principal') RETURNING id
""")
branch_id = tenant_cur.fetchone()[0]
pin_hash = bcrypt.hashpw(owner_pin.encode(), bcrypt.gensalt()).decode()
pwd_hash = bcrypt.hashpw(owner_pin.encode(), bcrypt.gensalt()).decode()
tenant_cur.execute("""
INSERT INTO employees (name, email, pin, password_hash, role, branch_id, max_discount_pct, is_active)
VALUES (%s, %s, %s, %s, 'owner', %s, 100, true)
RETURNING id
""", (owner_name, owner_email, pin_hash, pwd_hash, branch_id))
owner_id = tenant_cur.fetchone()[0]
# Grant all permissions to owner
permissions = [
'pos.sell', 'pos.discount', 'pos.cancel', 'pos.view_cost',
'inventory.view', 'inventory.create', 'inventory.edit', 'inventory.adjust', 'inventory.transfer',
'catalog.view', 'catalog.edit',
'customers.view', 'customers.create', 'customers.edit', 'customers.edit_credit', 'customers.delete',
'accounting.view', 'accounting.create', 'accounting.close',
'invoicing.view', 'invoicing.create', 'invoicing.cancel',
'reports.view', 'reports.financial',
'config.view', 'config.edit', 'config.edit_prices'
]
for perm in permissions:
tenant_cur.execute(
"INSERT INTO employee_permissions (employee_id, permission) VALUES (%s, %s)",
(owner_id, perm)
)
tenant_conn.commit()
tenant_cur.close()
tenant_conn.close()
return {'tenant_id': tenant_id, 'db_name': db_name, 'owner_id': owner_id}
def list_tenants():
"""List all tenants."""
conn = get_master_conn()
cur = conn.cursor()
cur.execute("SELECT id, name, db_name, rfc, plan, is_active, created_at FROM tenants ORDER BY id")
tenants = []
for row in cur.fetchall():
tenants.append({
'id': row[0], 'name': row[1], 'db_name': row[2],
'rfc': row[3], 'plan': row[4], 'is_active': row[5],
'created_at': str(row[6])
})
cur.close()
conn.close()
return tenants