Files
Autoparts-DB/scripts/check_facturapi_tenants.py
consultoria-as d67887284d feat(pos/facturapi): finalize Horux-to-Facturapi migration
- Normalize Facturapi key/org_id resolution (supports both cfdi_ prefixed
  tenant_config keys and short names used by invoicing_bp).
- Add CSD upload end-to-end (backend + frontend).
- Add helper scripts: setup_facturapi_orgs.py and check_facturapi_tenants.py.
- Add 20 unit tests with mocks (pos/tests/test_facturapi_service.py).
- Add CI workflow for lint + console tests on Python 3.11/3.13.
- Add pyproject.toml and requirements-dev.txt with ruff/pytest config.
- Update FASES_IMPLEMENTADAS.md with FASE 8 documentation.

Tests: 81 passing (61 console + 20 Facturapi).
2026-06-15 04:58:42 +00:00

242 lines
7.5 KiB
Python

#!/usr/bin/env python3
"""Check Facturapi configuration status for all active tenants.
Usage:
export MASTER_DB_URL=postgresql://user:pass@host/nexus_autoparts
export TENANT_DB_URL_TEMPLATE="postgresql://user:pass@host/{db_name}"
export FACTURAPI_USER_KEY=sk_user_xxx # optional, for org auto-discovery
python3 scripts/check_facturapi_tenants.py
Output: table (default), --json, or --csv.
"""
import argparse
import csv
import json
import os
import sys
# Allow importing pos/services
POS_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "pos")
sys.path.insert(0, POS_DIR)
import psycopg2 # noqa: E402
from services import facturapi_service # noqa: E402
def get_tenants(master_dsn: str):
conn = psycopg2.connect(master_dsn)
try:
cur = conn.cursor()
cur.execute(
"""
SELECT t.id, t.db_name, t.name, t.subdomain, COALESCE(v.version, 'v0.0') AS version
FROM tenants t
LEFT JOIN tenant_schema_version v ON t.id = v.tenant_id
WHERE t.is_active = true
ORDER BY t.id
"""
)
rows = cur.fetchall()
cur.close()
return rows
finally:
conn.close()
def get_tenant_config(db_name: str, template_dsn: str) -> dict[str, str]:
dsn = template_dsn.format(db_name=db_name)
conn = psycopg2.connect(dsn)
try:
cur = conn.cursor()
# Business-level fiscal data
cur.execute(
"""
SELECT key, value FROM tenant_config
WHERE key IN (
'tenant_rfc', 'tenant_razon_social', 'tenant_cp',
'cfdi_regimen_fiscal', 'cfdi_serie',
'cfdi_facturapi_key', 'cfdi_facturapi_org_id'
)
"""
)
config = {row[0]: row[1] or "" for row in cur.fetchall()}
# Main branch fiscal data (used as fallback by _get_issuer_config)
cur.execute(
"""
SELECT rfc, razon_social, regimen_fiscal, codigo_postal, serie_cfdi
FROM branches WHERE is_main = true LIMIT 1
"""
)
branch = cur.fetchone()
if branch:
config["rfc"] = (branch[0] or config.get("tenant_rfc", "")).strip()
config["razon_social"] = (branch[1] or config.get("tenant_razon_social", "")).strip()
config["regimen_fiscal"] = (branch[2] or config.get("cfdi_regimen_fiscal", "")).strip()
config["cp"] = (branch[3] or config.get("tenant_cp", "")).strip()
config["serie"] = (branch[4] or config.get("cfdi_serie", "")).strip()
else:
config["rfc"] = config.get("tenant_rfc", "").strip()
config["razon_social"] = config.get("tenant_razon_social", "").strip()
config["regimen_fiscal"] = config.get("cfdi_regimen_fiscal", "").strip()
config["cp"] = config.get("tenant_cp", "").strip()
config["serie"] = config.get("cfdi_serie", "").strip()
cur.close()
return config
finally:
conn.close()
def check_tenant(tenant_id: int, db_name: str, name: str, version: str, template_dsn: str) -> dict:
result = {
"tenant_id": tenant_id,
"db_name": db_name,
"name": name,
"schema_version": version,
"rfc": "",
"razon_social": "",
"has_key": False,
"has_org_id": False,
"has_csd": False,
"configured": False,
"pending_steps": [],
"error": None,
}
try:
config = get_tenant_config(db_name, template_dsn)
result["rfc"] = config.get("rfc", "")
result["razon_social"] = config.get("razon_social", "")
status = facturapi_service.get_org_status(config)
result.update(
{
"has_key": status.get("has_key", False),
"has_org_id": status.get("has_org_id", False),
"has_csd": status.get("has_csd", False),
"configured": status.get("configured", False),
"pending_steps": status.get("pending_steps", []),
"error": status.get("error"),
}
)
except Exception as e:
result["error"] = f"{type(e).__name__}: {str(e)[:200]}"
return result
def print_table(results: list[dict]):
headers = ["ID", "Tenant", "RFC", "Key", "Org", "CSD", "Status", "Error/Pending"]
rows = []
for r in results:
status = "OK" if r["configured"] and r["has_csd"] else "PENDING"
pending = (
"; ".join((s.get("description") or s.get("type") or str(s)) for s in r["pending_steps"])
if r["pending_steps"]
else ""
)
detail = (r["error"] or pending or "")[:60]
rows.append(
[
str(r["tenant_id"]),
r["name"][:28],
r["rfc"] or "-",
"" if r["has_key"] else "No",
"" if r["has_org_id"] else "No",
"" if r["has_csd"] else "No",
status,
detail,
]
)
widths = [max(len(str(row[i])) for row in [headers] + rows) for i in range(len(headers))]
sep = "+-" + "-+-".join("-" * w for w in widths) + "-+"
def fmt(row):
return "| " + " | ".join(str(row[i]).ljust(widths[i]) for i in range(len(row))) + " |"
print(sep)
print(fmt(headers))
print(sep)
for row in rows:
print(fmt(row))
print(sep)
print(
f"\nTotal: {len(results)} tenants | Listos: {sum(1 for r in results if r['configured'] and r['has_csd'])} | Pendientes: {sum(1 for r in results if not (r['configured'] and r['has_csd']))}"
)
def print_json(results: list[dict]):
print(json.dumps(results, indent=2, default=str))
def print_csv(results: list[dict]):
writer = csv.DictWriter(
sys.stdout,
fieldnames=[
"tenant_id",
"name",
"db_name",
"schema_version",
"rfc",
"has_key",
"has_org_id",
"has_csd",
"configured",
"error",
],
)
writer.writeheader()
for r in results:
writer.writerow(
{
"tenant_id": r["tenant_id"],
"name": r["name"],
"db_name": r["db_name"],
"schema_version": r["schema_version"],
"rfc": r["rfc"],
"has_key": r["has_key"],
"has_org_id": r["has_org_id"],
"has_csd": r["has_csd"],
"configured": r["configured"],
"error": r["error"],
}
)
def main():
parser = argparse.ArgumentParser(description="Check Facturapi status for all tenants")
parser.add_argument("--json", action="store_true", help="Output JSON")
parser.add_argument("--csv", action="store_true", help="Output CSV")
args = parser.parse_args()
master_dsn = os.environ.get("MASTER_DB_URL")
template_dsn = os.environ.get("TENANT_DB_URL_TEMPLATE")
if not master_dsn or not template_dsn:
print("Set MASTER_DB_URL and TENANT_DB_URL_TEMPLATE", file=sys.stderr)
sys.exit(1)
tenants = get_tenants(master_dsn)
if not tenants:
print("No active tenants found.")
return
results = []
for tenant_id, db_name, name, _subdomain, version in tenants:
results.append(check_tenant(tenant_id, db_name, name, version, template_dsn))
if args.json:
print_json(results)
elif args.csv:
print_csv(results)
else:
print_table(results)
if __name__ == "__main__":
main()