Files
Autoparts-DB/pos/services/tenant_manager.py
Nexus Dev 9ff3dc4c8b FASE 4-5-6: Infraestructura, CRM, Service Orders, Notificaciones, Ahorro, Logistica, API Publica
FASE 4:
- Redis cache de stock con fallback graceful
- Multi-moneda (MXN/USD) con contabilidad en MXN
- Proveedores y ordenes de compra completo
- Meilisearch 1.5M+ partes indexadas
- Metabase KPIs con dashboard auto-generado

FASE 5:
- CRM mejorado: activities, tags, loyalty program, analytics
- Imagenes de partes: upload, resize, thumbnails WebP
- Ordenes de servicio Kanban: received->diagnosis->repair->ready->delivered
- Garantias/RMA, alertas de reorden, multi-sucursal
- Stubs BNPL (APLAZO) y ERP Sync (Aspel/Contpaqi)

FASE 6:
- Notificaciones automaticas: push/WhatsApp/email/in-app
- Reportes de ahorro vs retail_price
- Logistica + tracking: DHL, FedEx, Estafeta, 99min, Uber
- API Publica: API keys, rate limiting, catalog search

Migraciones: v1.9-v3.0
Tests: 93/93 pasando
Backup: nexus_backup_20260427_045859.tar.gz
2026-04-27 05:23:30 +00:00

318 lines
11 KiB
Python

# /home/Autopartes/pos/services/tenant_manager.py
"""Create and manage tenant databases."""
import os
import re
import unicodedata
import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from config import MASTER_DB_URL, TENANT_DB_URL_TEMPLATE, TENANT_TEMPLATE_DB
from tenant_db import get_master_conn, get_tenant_conn_by_dbname
MIGRATIONS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'migrations')
SEED_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'seed')
def generate_subdomain(name):
"""Generate a URL-safe subdomain from a business name.
Examples:
'Refaccionaria López' -> 'refaccionaria-lopez'
'Auto Parts MX #3' -> 'auto-parts-mx-3'
"""
# Normalize unicode (strip accents)
nfkd = unicodedata.normalize('NFKD', name)
ascii_name = nfkd.encode('ascii', 'ignore').decode('ascii')
# Lowercase, replace non-alphanumeric with hyphens
slug = re.sub(r'[^a-z0-9]+', '-', ascii_name.lower()).strip('-')
# Collapse multiple hyphens
slug = re.sub(r'-{2,}', '-', slug)
# Truncate to 100 chars (column limit)
return slug[:100]
def ensure_master_tables():
"""Create tenants/subscriptions/schema_version tables in nexus_master if missing."""
conn = get_master_conn()
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS tenants (
id SERIAL PRIMARY KEY,
name VARCHAR(200) NOT NULL,
db_name VARCHAR(100) UNIQUE NOT NULL,
subdomain VARCHAR(100) UNIQUE,
rfc VARCHAR(13),
plan VARCHAR(50) DEFAULT 'basic',
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMPTZ DEFAULT NOW()
)
""")
# Add subdomain column if table already existed without it
cur.execute("""
ALTER TABLE tenants ADD COLUMN IF NOT EXISTS subdomain VARCHAR(100) UNIQUE
""")
cur.execute("""
CREATE INDEX IF NOT EXISTS idx_tenants_subdomain ON tenants(subdomain)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS subscriptions (
id SERIAL PRIMARY KEY,
tenant_id INTEGER REFERENCES tenants(id),
plan VARCHAR(50) NOT NULL,
status VARCHAR(20) DEFAULT 'active',
started_at TIMESTAMPTZ DEFAULT NOW(),
expires_at TIMESTAMPTZ,
stripe_id VARCHAR(100)
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS tenant_schema_version (
tenant_id INTEGER PRIMARY KEY REFERENCES tenants(id),
version VARCHAR(20) NOT NULL DEFAULT 'v1.0',
updated_at TIMESTAMPTZ DEFAULT NOW()
)
""")
conn.commit()
cur.close()
conn.close()
def create_template_db():
"""Create tenant_template DB with full schema if it doesn't exist."""
conn = psycopg2.connect(MASTER_DB_URL)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cur = conn.cursor()
# Check if template already exists (idempotent — safe to call from multiple tasks)
cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (TENANT_TEMPLATE_DB,))
if cur.fetchone():
cur.close()
conn.close()
return False # Already exists
# Use psycopg2.sql.Identifier for safe dynamic DB name
cur.execute(
sql.SQL('CREATE DATABASE {}').format(sql.Identifier(TENANT_TEMPLATE_DB))
)
cur.close()
conn.close()
# Apply schema to template
tpl_conn = get_tenant_conn_by_dbname(TENANT_TEMPLATE_DB)
tpl_cur = tpl_conn.cursor()
schema_path = os.path.join(MIGRATIONS_DIR, 'v1.0_initial.sql')
with open(schema_path) as f:
tpl_cur.execute(f.read())
seed_path = os.path.join(SEED_DIR, 'sat_accounts.sql')
with open(seed_path) as f:
tpl_cur.execute(f.read())
tpl_conn.commit()
tpl_cur.close()
tpl_conn.close()
return True # Created
def _generate_db_name(name):
"""Generate a safe database name from business name.
Only lowercase ASCII letters, digits, and underscores.
"""
nfkd = unicodedata.normalize('NFKD', name)
ascii_name = nfkd.encode('ascii', 'ignore').decode('ascii')
slug = re.sub(r'[^a-z0-9]+', '_', ascii_name.lower()).strip('_')
slug = re.sub(r'_{2,}', '_', slug)
return f"tenant_{slug[:30]}"
def provision_tenant(name, rfc=None, owner_name="Admin", owner_email=None, owner_pin="0000", subdomain=None):
"""Create a new tenant: register in master, create DB from template, create owner employee.
If subdomain is not provided, one is auto-generated from the business name.
Includes automatic rollback on failure to avoid orphaned databases.
"""
import bcrypt
ensure_master_tables()
create_template_db()
# Run master migrations before creating tenant (ensures marketplace tables exist)
from migrations.runner_master import run_master_migrations
run_master_migrations()
# Generate subdomain if not provided
if not subdomain:
subdomain = generate_subdomain(name)
# Generate safe db_name
db_name = _generate_db_name(name)
conn = get_master_conn()
cur = conn.cursor()
# Validate uniqueness before inserting
cur.execute("SELECT 1 FROM tenants WHERE db_name = %s LIMIT 1", (db_name,))
if cur.fetchone():
cur.close()
conn.close()
raise ValueError(f"A tenant with db_name '{db_name}' already exists.")
cur.execute("SELECT 1 FROM tenants WHERE subdomain = %s LIMIT 1", (subdomain,))
if cur.fetchone():
cur.close()
conn.close()
raise ValueError(f"A tenant with subdomain '{subdomain}' already exists.")
# Insert tenant
cur.execute("""
INSERT INTO tenants (name, db_name, rfc, subdomain)
VALUES (%s, %s, %s, %s)
RETURNING id, db_name
""", (name, db_name, rfc, subdomain))
tenant_id, db_name = cur.fetchone()
# Track schema version (will be updated after migrations)
cur.execute("""
INSERT INTO tenant_schema_version (tenant_id, version)
VALUES (%s, 'v1.0')
""", (tenant_id,))
conn.commit()
cur.close()
conn.close()
tenant_conn = None
try:
# Create DB from template
master_conn = psycopg2.connect(MASTER_DB_URL)
master_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
master_cur = master_conn.cursor()
master_cur.execute(
sql.SQL('CREATE DATABASE {} TEMPLATE {}').format(
sql.Identifier(db_name),
sql.Identifier(TENANT_TEMPLATE_DB)
)
)
master_cur.close()
master_conn.close()
# Apply pending migrations post-v1.0
from migrations.runner import MIGRATIONS, apply_migration
sorted_versions = sorted(MIGRATIONS.keys())
for version in sorted_versions:
if version <= 'v1.0':
continue
success = apply_migration(db_name, version)
if not success:
raise RuntimeError(f"Migration {version} failed for tenant {db_name}")
# Update version in master
mconn = get_master_conn()
mcur = mconn.cursor()
mcur.execute("""
INSERT INTO tenant_schema_version (tenant_id, version)
VALUES (%s, %s)
ON CONFLICT (tenant_id) DO UPDATE SET version = %s, updated_at = NOW()
""", (tenant_id, version, version))
mconn.commit()
mcur.close()
mconn.close()
# Create default branch and owner employee
tenant_conn = get_tenant_conn_by_dbname(db_name)
tenant_cur = tenant_conn.cursor()
tenant_cur.execute("""
INSERT INTO branches (name) VALUES ('Principal') RETURNING id
""")
branch_id = tenant_cur.fetchone()[0]
pin_hash = bcrypt.hashpw(owner_pin.encode(), bcrypt.gensalt()).decode()
pwd_hash = bcrypt.hashpw(owner_pin.encode(), bcrypt.gensalt()).decode()
tenant_cur.execute("""
INSERT INTO employees (name, email, pin, password_hash, role, branch_id, max_discount_pct, is_active)
VALUES (%s, %s, %s, %s, 'owner', %s, 100, true)
RETURNING id
""", (owner_name, owner_email, pin_hash, pwd_hash, branch_id))
owner_id = tenant_cur.fetchone()[0]
# Grant all permissions to owner (batch insert)
permissions = [
'pos.sell', 'pos.discount', 'pos.cancel', 'pos.view_cost',
'inventory.view', 'inventory.create', 'inventory.edit', 'inventory.adjust', 'inventory.transfer',
'catalog.view', 'catalog.edit',
'customers.view', 'customers.create', 'customers.edit', 'customers.edit_credit', 'customers.delete',
'accounting.view', 'accounting.create', 'accounting.close',
'invoicing.view', 'invoicing.create', 'invoicing.cancel',
'reports.view', 'reports.financial',
'config.view', 'config.edit', 'config.edit_prices'
]
tenant_cur.executemany(
"INSERT INTO employee_permissions (employee_id, permission) VALUES (%s, %s)",
[(owner_id, perm) for perm in permissions]
)
# Seed tenant_config with RFC and defaults
if rfc:
tenant_cur.execute("""
INSERT INTO tenant_config (key, value) VALUES
('tenant_rfc', %s),
('tenant_razon_social', %s),
('tenant_cp', '00000'),
('cfdi_regimen_fiscal', '601'),
('cfdi_serie', 'A')
ON CONFLICT (key) DO NOTHING
""", (rfc, name))
tenant_conn.commit()
tenant_cur.close()
tenant_conn.close()
return {'tenant_id': tenant_id, 'db_name': db_name, 'owner_id': owner_id, 'subdomain': subdomain}
except Exception as e:
# Rollback: drop tenant DB and remove from master
try:
drop_conn = psycopg2.connect(MASTER_DB_URL)
drop_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
drop_cur = drop_conn.cursor()
drop_cur.execute(
sql.SQL('DROP DATABASE IF EXISTS {}').format(sql.Identifier(db_name))
)
drop_cur.close()
drop_conn.close()
except Exception:
pass
try:
cleanup_conn = get_master_conn()
cleanup_cur = cleanup_conn.cursor()
cleanup_cur.execute("DELETE FROM tenant_schema_version WHERE tenant_id = %s", (tenant_id,))
cleanup_cur.execute("DELETE FROM tenants WHERE id = %s", (tenant_id,))
cleanup_conn.commit()
cleanup_cur.close()
cleanup_conn.close()
except Exception:
pass
raise RuntimeError(f"Failed to provision tenant: {e}")
def list_tenants():
"""List all tenants."""
conn = get_master_conn()
cur = conn.cursor()
cur.execute("SELECT id, name, db_name, rfc, plan, is_active, created_at, subdomain FROM tenants ORDER BY id")
tenants = []
for row in cur.fetchall():
tenants.append({
'id': row[0], 'name': row[1], 'db_name': row[2],
'rfc': row[3], 'plan': row[4], 'is_active': row[5],
'created_at': str(row[6]), 'subdomain': row[7]
})
cur.close()
conn.close()
return tenants