Files
Autoparts-DB/manager/services/migration_service.py
consultoria-as be4bb8d9ad feat(manager): add Nexus Instance Manager for demo orchestration
- Complete Flask-based control panel for multi-tenant POS instances
- Dashboard with global stats, system health, and recent demos
- Demo provisioning in 1 click with auto-expiration tracking
- Tenant management: activate/deactivate, reset data, delete
- Health monitoring: PostgreSQL, Redis, disk, memory, systemd services
- Migration orchestration UI for running schema updates across all tenants
- JWT authentication with manager_users table
- Dark theme SPA frontend with real-time search and actions
- systemd service file included
2026-05-17 21:01:01 +00:00

101 lines
3.2 KiB
Python

"""Migration orchestration service."""
import os
import sys
POS_DIR = os.environ.get("POS_DIR", "/home/Autopartes/pos")
if POS_DIR not in sys.path:
sys.path.insert(0, POS_DIR)
from tenant_db import get_master_conn
from config import MIGRATIONS_DIR
def list_available_migrations():
"""List migrations found in POS migrations directory."""
migrations = []
if os.path.isdir(MIGRATIONS_DIR):
for fname in sorted(os.listdir(MIGRATIONS_DIR)):
if fname.endswith(".sql") and fname.startswith("v"):
version = fname.replace(".sql", "")
migrations.append({"version": version, "file": fname})
return migrations
def get_tenant_versions():
"""Get schema version for every tenant."""
conn = get_master_conn()
cur = conn.cursor()
cur.execute("""
SELECT t.id, t.name, t.db_name, COALESCE(v.version, 'v0.0') as version
FROM tenants t
LEFT JOIN tenant_schema_version v ON v.tenant_id = t.id
WHERE t.is_active = true
ORDER BY t.id
""")
results = []
for row in cur.fetchall():
results.append({
"tenant_id": row[0], "name": row[1], "db_name": row[2], "version": row[3]
})
cur.close()
conn.close()
return results
def run_migration_on_tenant(db_name, version):
"""Apply a single migration file to a tenant DB."""
from migrations.runner import apply_migration
return apply_migration(db_name, version)
def run_all_pending_migrations():
"""Run all pending migrations on all active tenants (wrapper around POS runner)."""
from migrations.runner import run_migrations
import io
import contextlib
# Capture stdout to return as log
f = io.StringIO()
with contextlib.redirect_stdout(f):
run_migrations()
return {"log": f.getvalue()}
def run_migration_on_all_tenants(version):
"""Apply one specific migration version to all tenants that don't have it."""
from migrations.runner import MIGRATIONS, apply_migration
conn = get_master_conn()
cur = conn.cursor()
cur.execute("""
SELECT t.id, t.db_name, COALESCE(v.version, 'v0.0') as version
FROM tenants t
LEFT JOIN tenant_schema_version v ON v.tenant_id = t.id
WHERE t.is_active = true
""")
tenants = cur.fetchall()
cur.close()
conn.close()
results = []
for tenant_id, db_name, current_version in tenants:
if current_version >= version:
results.append({"tenant_id": tenant_id, "db_name": db_name, "skipped": True, "reason": "already at or past version"})
continue
success = apply_migration(db_name, version)
if success:
# Update version tracker
conn2 = get_master_conn()
cur2 = conn2.cursor()
cur2.execute("""
INSERT INTO tenant_schema_version (tenant_id, version)
VALUES (%s, %s)
ON CONFLICT (tenant_id) DO UPDATE SET version = %s, updated_at = NOW()
""", (tenant_id, version, version))
conn2.commit()
cur2.close()
conn2.close()
results.append({"tenant_id": tenant_id, "db_name": db_name, "success": success})
return results