#!/usr/bin/env python3 """Check Facturapi configuration status for all active tenants. Usage: export MASTER_DB_URL=postgresql://user:pass@host/nexus_autoparts export TENANT_DB_URL_TEMPLATE="postgresql://user:pass@host/{db_name}" export FACTURAPI_USER_KEY=sk_user_xxx # optional, for org auto-discovery python3 scripts/check_facturapi_tenants.py Output: table (default), --json, or --csv. """ import argparse import csv import json import os import sys # Allow importing pos/services POS_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "pos") sys.path.insert(0, POS_DIR) import psycopg2 # noqa: E402 from services import facturapi_service # noqa: E402 def get_tenants(master_dsn: str): conn = psycopg2.connect(master_dsn) try: cur = conn.cursor() cur.execute( """ SELECT t.id, t.db_name, t.name, t.subdomain, COALESCE(v.version, 'v0.0') AS version FROM tenants t LEFT JOIN tenant_schema_version v ON t.id = v.tenant_id WHERE t.is_active = true ORDER BY t.id """ ) rows = cur.fetchall() cur.close() return rows finally: conn.close() def get_tenant_config(db_name: str, template_dsn: str) -> dict[str, str]: dsn = template_dsn.format(db_name=db_name) conn = psycopg2.connect(dsn) try: cur = conn.cursor() # Business-level fiscal data cur.execute( """ SELECT key, value FROM tenant_config WHERE key IN ( 'tenant_rfc', 'tenant_razon_social', 'tenant_cp', 'cfdi_regimen_fiscal', 'cfdi_serie', 'cfdi_facturapi_key', 'cfdi_facturapi_org_id' ) """ ) config = {row[0]: row[1] or "" for row in cur.fetchall()} # Main branch fiscal data (used as fallback by _get_issuer_config) cur.execute( """ SELECT rfc, razon_social, regimen_fiscal, codigo_postal, serie_cfdi FROM branches WHERE is_main = true LIMIT 1 """ ) branch = cur.fetchone() if branch: config["rfc"] = (branch[0] or config.get("tenant_rfc", "")).strip() config["razon_social"] = (branch[1] or config.get("tenant_razon_social", "")).strip() config["regimen_fiscal"] = (branch[2] or config.get("cfdi_regimen_fiscal", "")).strip() config["cp"] = (branch[3] or config.get("tenant_cp", "")).strip() config["serie"] = (branch[4] or config.get("cfdi_serie", "")).strip() else: config["rfc"] = config.get("tenant_rfc", "").strip() config["razon_social"] = config.get("tenant_razon_social", "").strip() config["regimen_fiscal"] = config.get("cfdi_regimen_fiscal", "").strip() config["cp"] = config.get("tenant_cp", "").strip() config["serie"] = config.get("cfdi_serie", "").strip() cur.close() return config finally: conn.close() def check_tenant(tenant_id: int, db_name: str, name: str, version: str, template_dsn: str) -> dict: result = { "tenant_id": tenant_id, "db_name": db_name, "name": name, "schema_version": version, "rfc": "", "razon_social": "", "has_key": False, "has_org_id": False, "has_csd": False, "configured": False, "pending_steps": [], "error": None, } try: config = get_tenant_config(db_name, template_dsn) result["rfc"] = config.get("rfc", "") result["razon_social"] = config.get("razon_social", "") status = facturapi_service.get_org_status(config) result.update( { "has_key": status.get("has_key", False), "has_org_id": status.get("has_org_id", False), "has_csd": status.get("has_csd", False), "configured": status.get("configured", False), "pending_steps": status.get("pending_steps", []), "error": status.get("error"), } ) except Exception as e: result["error"] = f"{type(e).__name__}: {str(e)[:200]}" return result def print_table(results: list[dict]): headers = ["ID", "Tenant", "RFC", "Key", "Org", "CSD", "Status", "Error/Pending"] rows = [] for r in results: status = "OK" if r["configured"] and r["has_csd"] else "PENDING" pending = ( "; ".join((s.get("description") or s.get("type") or str(s)) for s in r["pending_steps"]) if r["pending_steps"] else "" ) detail = (r["error"] or pending or "")[:60] rows.append( [ str(r["tenant_id"]), r["name"][:28], r["rfc"] or "-", "Sí" if r["has_key"] else "No", "Sí" if r["has_org_id"] else "No", "Sí" if r["has_csd"] else "No", status, detail, ] ) widths = [max(len(str(row[i])) for row in [headers] + rows) for i in range(len(headers))] sep = "+-" + "-+-".join("-" * w for w in widths) + "-+" def fmt(row): return "| " + " | ".join(str(row[i]).ljust(widths[i]) for i in range(len(row))) + " |" print(sep) print(fmt(headers)) print(sep) for row in rows: print(fmt(row)) print(sep) print( f"\nTotal: {len(results)} tenants | Listos: {sum(1 for r in results if r['configured'] and r['has_csd'])} | Pendientes: {sum(1 for r in results if not (r['configured'] and r['has_csd']))}" ) def print_json(results: list[dict]): print(json.dumps(results, indent=2, default=str)) def print_csv(results: list[dict]): writer = csv.DictWriter( sys.stdout, fieldnames=[ "tenant_id", "name", "db_name", "schema_version", "rfc", "has_key", "has_org_id", "has_csd", "configured", "error", ], ) writer.writeheader() for r in results: writer.writerow( { "tenant_id": r["tenant_id"], "name": r["name"], "db_name": r["db_name"], "schema_version": r["schema_version"], "rfc": r["rfc"], "has_key": r["has_key"], "has_org_id": r["has_org_id"], "has_csd": r["has_csd"], "configured": r["configured"], "error": r["error"], } ) def main(): parser = argparse.ArgumentParser(description="Check Facturapi status for all tenants") parser.add_argument("--json", action="store_true", help="Output JSON") parser.add_argument("--csv", action="store_true", help="Output CSV") args = parser.parse_args() master_dsn = os.environ.get("MASTER_DB_URL") template_dsn = os.environ.get("TENANT_DB_URL_TEMPLATE") if not master_dsn or not template_dsn: print("Set MASTER_DB_URL and TENANT_DB_URL_TEMPLATE", file=sys.stderr) sys.exit(1) tenants = get_tenants(master_dsn) if not tenants: print("No active tenants found.") return results = [] for tenant_id, db_name, name, _subdomain, version in tenants: results.append(check_tenant(tenant_id, db_name, name, version, template_dsn)) if args.json: print_json(results) elif args.csv: print_csv(results) else: print_table(results) if __name__ == "__main__": main()