"""Migration orchestration service.""" import os import sys POS_DIR = os.environ.get("POS_DIR", "/home/Autopartes/pos") if POS_DIR not in sys.path: sys.path.insert(0, POS_DIR) from tenant_db import get_master_conn from config import MIGRATIONS_DIR def list_available_migrations(): """List migrations found in POS migrations directory.""" migrations = [] if os.path.isdir(MIGRATIONS_DIR): for fname in sorted(os.listdir(MIGRATIONS_DIR)): if fname.endswith(".sql") and fname.startswith("v"): version = fname.replace(".sql", "") migrations.append({"version": version, "file": fname}) return migrations def get_tenant_versions(): """Get schema version for every tenant.""" conn = get_master_conn() cur = conn.cursor() cur.execute(""" SELECT t.id, t.name, t.db_name, COALESCE(v.version, 'v0.0') as version FROM tenants t LEFT JOIN tenant_schema_version v ON v.tenant_id = t.id WHERE t.is_active = true ORDER BY t.id """) results = [] for row in cur.fetchall(): results.append({ "tenant_id": row[0], "name": row[1], "db_name": row[2], "version": row[3] }) cur.close() conn.close() return results def run_migration_on_tenant(db_name, version): """Apply a single migration file to a tenant DB.""" from migrations.runner import apply_migration return apply_migration(db_name, version) def run_all_pending_migrations(): """Run all pending migrations on all active tenants (wrapper around POS runner).""" from migrations.runner import run_migrations import io import contextlib # Capture stdout to return as log f = io.StringIO() with contextlib.redirect_stdout(f): run_migrations() return {"log": f.getvalue()} def run_migration_on_all_tenants(version): """Apply one specific migration version to all tenants that don't have it.""" from migrations.runner import MIGRATIONS, apply_migration conn = get_master_conn() cur = conn.cursor() cur.execute(""" SELECT t.id, t.db_name, COALESCE(v.version, 'v0.0') as version FROM tenants t LEFT JOIN tenant_schema_version v ON v.tenant_id = t.id WHERE t.is_active = true """) tenants = cur.fetchall() cur.close() conn.close() results = [] for tenant_id, db_name, current_version in tenants: if current_version >= version: results.append({"tenant_id": tenant_id, "db_name": db_name, "skipped": True, "reason": "already at or past version"}) continue success = apply_migration(db_name, version) if success: # Update version tracker conn2 = get_master_conn() cur2 = conn2.cursor() cur2.execute(""" INSERT INTO tenant_schema_version (tenant_id, version) VALUES (%s, %s) ON CONFLICT (tenant_id) DO UPDATE SET version = %s, updated_at = NOW() """, (tenant_id, version, version)) conn2.commit() cur2.close() conn2.close() results.append({"tenant_id": tenant_id, "db_name": db_name, "success": success}) return results