- Normalize Facturapi key/org_id resolution (supports both cfdi_ prefixed tenant_config keys and short names used by invoicing_bp). - Add CSD upload end-to-end (backend + frontend). - Add helper scripts: setup_facturapi_orgs.py and check_facturapi_tenants.py. - Add 20 unit tests with mocks (pos/tests/test_facturapi_service.py). - Add CI workflow for lint + console tests on Python 3.11/3.13. - Add pyproject.toml and requirements-dev.txt with ruff/pytest config. - Update FASES_IMPLEMENTADAS.md with FASE 8 documentation. Tests: 81 passing (61 console + 20 Facturapi).
242 lines
7.5 KiB
Python
242 lines
7.5 KiB
Python
#!/usr/bin/env python3
|
|
"""Check Facturapi configuration status for all active tenants.
|
|
|
|
Usage:
|
|
export MASTER_DB_URL=postgresql://user:pass@host/nexus_autoparts
|
|
export TENANT_DB_URL_TEMPLATE="postgresql://user:pass@host/{db_name}"
|
|
export FACTURAPI_USER_KEY=sk_user_xxx # optional, for org auto-discovery
|
|
python3 scripts/check_facturapi_tenants.py
|
|
|
|
Output: table (default), --json, or --csv.
|
|
"""
|
|
|
|
import argparse
|
|
import csv
|
|
import json
|
|
import os
|
|
import sys
|
|
|
|
# Make the project's ``pos`` directory importable so that the
# ``from services import ...`` line below resolves. The directory is located
# relative to this file: parent of this script's directory, then ``pos``.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
POS_DIR = os.path.join(os.path.dirname(_SCRIPT_DIR), "pos")
sys.path.insert(0, POS_DIR)
|
|
|
|
import psycopg2 # noqa: E402
|
|
from services import facturapi_service # noqa: E402
|
|
|
|
|
|
def get_tenants(master_dsn: str):
    """Fetch every active tenant from the master database.

    A psycopg2 connection to ``master_dsn`` is opened for the duration of the
    call and is always closed again, whether the query succeeds or raises.

    Args:
        master_dsn: Connection string of the master database.

    Returns:
        The rows returned by the query, each shaped as
        ``(id, db_name, name, subdomain, version)`` and ordered by tenant id.
        ``version`` is ``'v0.0'`` when the joined ``tenant_schema_version``
        value is NULL (including when no matching row exists).
    """
    query = """
        SELECT t.id, t.db_name, t.name, t.subdomain, COALESCE(v.version, 'v0.0') AS version
        FROM tenants t
        LEFT JOIN tenant_schema_version v ON t.id = v.tenant_id
        WHERE t.is_active = true
        ORDER BY t.id
        """
    connection = psycopg2.connect(master_dsn)
    try:
        cursor = connection.cursor()
        cursor.execute(query)
        tenants = cursor.fetchall()
        cursor.close()
        return tenants
    finally:
        connection.close()
|
|
|
|
|
|
def get_tenant_config(db_name: str, template_dsn: str) -> dict[str, str]:
    """Load the fiscal / Facturapi configuration of one tenant database.

    The tenant DSN is built by formatting ``template_dsn`` with ``db_name``.
    Two queries are run against that database:

    1. selected ``tenant_config`` key/value pairs (NULL values become ``""``);
    2. the fiscal columns of the branch flagged ``is_main``.

    Five normalised keys are then added: ``rfc``, ``razon_social``,
    ``regimen_fiscal``, ``cp`` and ``serie``. Each takes the main-branch value
    when it is truthy and otherwise the corresponding ``tenant_config`` entry;
    the chosen value is stripped of surrounding whitespace.
    NOTE(review): the original comment says this matches the fallback used by
    ``_get_issuer_config``, which is not visible here - confirm.

    Args:
        db_name: Name of the tenant database.
        template_dsn: DSN template containing a ``{db_name}`` placeholder.

    Returns:
        The fetched ``tenant_config`` entries plus the five normalised keys.
    """
    # (normalised key, column index in the main-branch row, tenant_config fallback key)
    fiscal_fields = (
        ("rfc", 0, "tenant_rfc"),
        ("razon_social", 1, "tenant_razon_social"),
        ("regimen_fiscal", 2, "cfdi_regimen_fiscal"),
        ("cp", 3, "tenant_cp"),
        ("serie", 4, "cfdi_serie"),
    )

    tenant_dsn = template_dsn.format(db_name=db_name)
    connection = psycopg2.connect(tenant_dsn)
    try:
        cursor = connection.cursor()

        # Business-level fiscal data
        cursor.execute(
            """
            SELECT key, value FROM tenant_config
            WHERE key IN (
                'tenant_rfc', 'tenant_razon_social', 'tenant_cp',
                'cfdi_regimen_fiscal', 'cfdi_serie',
                'cfdi_facturapi_key', 'cfdi_facturapi_org_id'
            )
            """
        )
        settings = {key: value or "" for key, value in cursor.fetchall()}

        # Main branch fiscal data; wins over tenant_config when present
        cursor.execute(
            """
            SELECT rfc, razon_social, regimen_fiscal, codigo_postal, serie_cfdi
            FROM branches WHERE is_main = true LIMIT 1
            """
        )
        main_branch = cursor.fetchone()

        for target_key, column, fallback_key in fiscal_fields:
            chosen = settings.get(fallback_key, "")
            if main_branch and main_branch[column]:
                chosen = main_branch[column]
            settings[target_key] = chosen.strip()

        cursor.close()
        return settings
    finally:
        connection.close()
|
|
|
|
|
|
def check_tenant(tenant_id: int, db_name: str, name: str, version: str, template_dsn: str) -> dict:
    """Build the Facturapi status report for a single tenant.

    The tenant configuration is loaded with ``get_tenant_config`` and handed to
    ``facturapi_service.get_org_status``; the flags it returns are copied into
    the report. This is deliberately best-effort: any exception raised along
    the way is captured in the ``error`` field (exception type plus the first
    200 characters of its message) so that one failing tenant does not abort
    the whole run.

    Args:
        tenant_id: Tenant identifier from the master database.
        db_name: Name of the tenant database.
        name: Tenant display name.
        version: Schema version string of the tenant.
        template_dsn: DSN template containing a ``{db_name}`` placeholder.

    Returns:
        A dict with the keys ``tenant_id``, ``db_name``, ``name``,
        ``schema_version``, ``rfc``, ``razon_social``, ``has_key``,
        ``has_org_id``, ``has_csd``, ``configured``, ``pending_steps`` and
        ``error``.
    """
    report = {
        "tenant_id": tenant_id,
        "db_name": db_name,
        "name": name,
        "schema_version": version,
        "rfc": "",
        "razon_social": "",
        "has_key": False,
        "has_org_id": False,
        "has_csd": False,
        "configured": False,
        "pending_steps": [],
        "error": None,
    }

    try:
        tenant_config = get_tenant_config(db_name, template_dsn)
        for field in ("rfc", "razon_social"):
            report[field] = tenant_config.get(field, "")

        org_status = facturapi_service.get_org_status(tenant_config)
        # Collect everything first so the report is only touched once the
        # whole status has been read successfully.
        from_status = {
            flag: org_status.get(flag, False)
            for flag in ("has_key", "has_org_id", "has_csd", "configured")
        }
        from_status["pending_steps"] = org_status.get("pending_steps", [])
        from_status["error"] = org_status.get("error")
        report.update(from_status)
    except Exception as exc:
        report["error"] = f"{type(exc).__name__}: {str(exc)[:200]}"

    return report
|
|
|
|
|
|
def print_table(results: list[dict]):
    """Print the tenant reports as an ASCII table followed by a summary line.

    Each report is rendered as one row: id, name (first 28 characters), RFC
    (``-`` when empty), three yes/no flags, a status and a detail column. A
    tenant counts as ready (status ``OK``) only when it is both ``configured``
    and ``has_csd``; everything else is ``PENDING``. The detail column shows
    the error if there is one, otherwise the pending steps joined with
    ``"; "``, truncated to 60 characters.

    Pending steps may be dicts (their ``description``, else ``type``, is
    shown) or any other object such as a plain string, which is shown via
    ``str()``. A ``None`` ``pending_steps`` or ``name`` is treated as empty.

    Args:
        results: Reports as produced by ``check_tenant``.
    """
    headers = ["ID", "Tenant", "RFC", "Key", "Org", "CSD", "Status", "Error/Pending"]

    def is_ready(report):
        return bool(report["configured"] and report["has_csd"])

    def describe(step):
        # Only dicts have .get(); calling it on a plain-string step used to
        # raise AttributeError before the str() fallback could be reached.
        if isinstance(step, dict):
            return str(step.get("description") or step.get("type") or step)
        return str(step)

    def yes_no(flag):
        return "Sí" if flag else "No"

    rows = []
    for r in results:
        pending = "; ".join(describe(step) for step in (r["pending_steps"] or []))
        detail = (r["error"] or pending or "")[:60]
        rows.append(
            [
                str(r["tenant_id"]),
                str(r["name"] or "")[:28],
                r["rfc"] or "-",
                yes_no(r["has_key"]),
                yes_no(r["has_org_id"]),
                yes_no(r["has_csd"]),
                "OK" if is_ready(r) else "PENDING",
                detail,
            ]
        )

    # Column width = widest cell in that column, header included.
    widths = [max(len(str(cell)) for cell in column) for column in zip(headers, *rows)]
    sep = "+-" + "-+-".join("-" * w for w in widths) + "-+"

    def fmt(row):
        return "| " + " | ".join(str(cell).ljust(w) for cell, w in zip(row, widths)) + " |"

    print(sep)
    print(fmt(headers))
    print(sep)
    for row in rows:
        print(fmt(row))
    print(sep)

    ready = sum(1 for r in results if is_ready(r))
    print(f"\nTotal: {len(results)} tenants | Listos: {ready} | Pendientes: {len(results) - ready}")
|
|
|
|
|
|
def print_json(results: list[dict]):
    """Write the tenant reports to stdout as JSON indented by two spaces.

    Values the ``json`` module cannot serialise natively are converted with
    ``str``.

    Args:
        results: Reports as produced by ``check_tenant``.
    """
    rendered = json.dumps(results, default=str, indent=2)
    print(rendered)
|
|
|
|
|
|
def print_csv(results: list[dict]):
    """Write the tenant reports to stdout as CSV with a header row.

    Only the columns listed below are exported, in that order;
    ``razon_social`` and ``pending_steps`` are not part of the CSV. Every
    report must contain all exported keys, otherwise ``KeyError`` is raised.

    Args:
        results: Reports as produced by ``check_tenant``.
    """
    columns = [
        "tenant_id",
        "name",
        "db_name",
        "schema_version",
        "rfc",
        "has_key",
        "has_org_id",
        "has_csd",
        "configured",
        "error",
    ]
    out = csv.DictWriter(sys.stdout, fieldnames=columns)
    out.writeheader()
    for report in results:
        # Project the report down to the exported columns.
        out.writerow({column: report[column] for column in columns})
|
|
|
|
|
|
def main():
    """Command-line entry point: report Facturapi status for all active tenants.

    Reads the master DSN from ``MASTER_DB_URL`` and the tenant DSN template
    from ``TENANT_DB_URL_TEMPLATE``; exits with status 1 (message on stderr)
    when either is missing. Every active tenant is checked and the reports are
    printed as a table by default, as JSON with ``--json`` or as CSV with
    ``--csv``. When both flags are given, ``--json`` takes precedence.

    When there are no active tenants, table mode prints a notice to stdout and
    stops. In ``--json`` / ``--csv`` mode the notice goes to stderr and an
    empty document (``[]`` or just the CSV header) is still written, so that
    stdout always stays machine-parseable.
    """
    parser = argparse.ArgumentParser(description="Check Facturapi status for all tenants")
    parser.add_argument("--json", action="store_true", help="Output JSON")
    parser.add_argument("--csv", action="store_true", help="Output CSV")
    args = parser.parse_args()

    master_dsn = os.environ.get("MASTER_DB_URL")
    template_dsn = os.environ.get("TENANT_DB_URL_TEMPLATE")

    if not master_dsn or not template_dsn:
        print("Set MASTER_DB_URL and TENANT_DB_URL_TEMPLATE", file=sys.stderr)
        sys.exit(1)

    machine_readable = args.json or args.csv

    tenants = get_tenants(master_dsn)
    if not tenants:
        if not machine_readable:
            print("No active tenants found.")
            return
        # Keep stdout valid JSON/CSV: send the notice to stderr and fall
        # through so an empty document is emitted below.
        print("No active tenants found.", file=sys.stderr)

    results = [
        check_tenant(tenant_id, db_name, name, version, template_dsn)
        for tenant_id, db_name, name, _subdomain, version in tenants
    ]

    if args.json:
        print_json(results)
    elif args.csv:
        print_csv(results)
    else:
        print_table(results)


if __name__ == "__main__":
    main()
|