Files
Autoparts-DB/pos/services/cfdi_queue.py
consultoria-as 8796cadb56 feat(pos): migrate CFDI timbrado from Horux to Facturapi
- Add Facturapi REST service (invoices, customers, orgs, cancel, downloads)
- Add JSON payload builder for ingreso/egreso/pago/global invoices
- Replace XML queue with Facturapi JSON queue (payload_unsigned, external_id)
- Update invoicing blueprint with Facturapi config and download endpoints
- Update global invoice service to use Facturapi payloads
- Add migration v4.3_facturapi.sql and tenant rollout script
- Update invoicing UI: payload preview, PDF/XML downloads, PAC status panel
- Add FACTURAPI_USER_KEY to .env.example
2026-06-14 09:26:42 +00:00

373 lines
12 KiB
Python

# /home/Autopartes/pos/services/cfdi_queue.py
"""CFDI queue service: manages the Facturapi timbrado pipeline.
Flow:
1. enqueue_cfdi() — inserts Facturapi JSON payload into cfdi_queue with status='pending'
2. process_queue() — sends pending items to Facturapi, updates status
3. retry_failed() — retries failed items with exponential backoff
4. cancel_cfdi() — cancels a stamped CFDI via Facturapi
Facturapi endpoints used:
POST /v2/invoices — create and stamp an invoice
GET /v2/invoices/:id — fetch invoice metadata
DELETE /v2/invoices/:id — cancel with SAT motive
Retry backoff: 5s, 30s, 2m, 10m, 1h (max 5 retries)
"""
import json
import logging
from datetime import datetime, timedelta
from services import facturapi_service
logger = logging.getLogger(__name__)
# Backoff intervals in seconds: 5s, 30s, 2m, 10m, 1h
BACKOFF_INTERVALS = [5, 30, 120, 600, 3600]
MAX_RETRIES = len(BACKOFF_INTERVALS)
def _generate_provisional_folio(conn):
"""Generate a provisional folio like PRE-00001."""
cur = conn.cursor()
cur.execute("SELECT COALESCE(MAX(id), 0) + 1 FROM cfdi_queue")
seq = cur.fetchone()[0]
cur.close()
return f'PRE-{seq:05d}'
def enqueue_cfdi(conn, sale_id, cfdi_type, payload):
"""Add a CFDI to the timbrado queue.
Args:
conn: psycopg2 connection
sale_id: int (FK to sales), may be None for global invoices
cfdi_type: 'ingreso' | 'egreso' | 'pago'
payload: dict (Facturapi JSON payload) or str (JSON string)
Returns:
dict: {id, sale_id, type, status, provisional_folio}
"""
provisional_folio = _generate_provisional_folio(conn)
cur = conn.cursor()
payload_json = payload if isinstance(payload, str) else json.dumps(payload)
cur.execute("""
INSERT INTO cfdi_queue
(sale_id, type, payload_unsigned, status, provisional_folio)
VALUES (%s, %s, %s, 'pending', %s)
RETURNING id, created_at
""", (sale_id, cfdi_type, payload_json, provisional_folio))
cfdi_id, created_at = cur.fetchone()
cur.close()
return {
'id': cfdi_id,
'sale_id': sale_id,
'type': cfdi_type,
'status': 'pending',
'provisional_folio': provisional_folio,
'created_at': str(created_at),
}
def process_queue(conn, tenant_config, dry_run=False):
"""Process all pending CFDI items in the queue.
Sends each pending payload to Facturapi for timbrado. On success, updates
the record with the signed XML and UUID fiscal. On failure, increments
retry_count and records the error.
Args:
conn: psycopg2 connection
tenant_config: dict with facturapi_key (and optional facturapi_org_id)
dry_run: if True, validates payload without stamping
Returns:
dict: {processed: int, stamped: int, failed: int, details: [...]}
"""
cur = conn.cursor()
cur.execute("""
SELECT id, sale_id, type, payload_unsigned, retry_count
FROM cfdi_queue
WHERE status IN ('pending', 'failed')
AND retry_count < %s
ORDER BY created_at ASC
LIMIT 50
""", (MAX_RETRIES,))
items = cur.fetchall()
results = {'processed': 0, 'stamped': 0, 'failed': 0, 'details': []}
api_key = tenant_config.get('facturapi_key')
if not api_key:
cur.close()
raise ValueError("Facturapi key not configured for tenant")
for cfdi_id, sale_id, cfdi_type, payload_unsigned, retry_count in items:
results['processed'] += 1
# Update status to 'sending'
cur.execute("""
UPDATE cfdi_queue SET status = 'sending' WHERE id = %s
""", (cfdi_id,))
conn.commit()
try:
payload = json.loads(payload_unsigned or '{}')
if not payload:
raise ValueError("Empty payload in queue item")
if dry_run:
# TODO: Facturapi dry-run validation (not officially supported)
# For now we just skip the API call and mark as stamped with a fake UUID
raise ValueError("dry_run is not supported with Facturapi")
invoice = facturapi_service.create_invoice(tenant_config, payload)
invoice_id = invoice.get('id')
uuid_fiscal = invoice.get('uuid')
# Download signed XML for storage
try:
xml_signed = facturapi_service.download_xml(tenant_config, invoice_id)
xml_signed_str = xml_signed.decode('utf-8') if isinstance(xml_signed, bytes) else str(xml_signed)
except Exception as xml_err:
logger.warning("Could not download signed XML for %s: %s", invoice_id, xml_err)
xml_signed_str = ''
cur.execute("""
UPDATE cfdi_queue
SET status = 'stamped',
xml_signed = %s,
uuid_fiscal = %s,
external_id = %s,
stamped_at = NOW(),
error_message = NULL
WHERE id = %s
""", (xml_signed_str, uuid_fiscal, invoice_id, cfdi_id))
conn.commit()
results['stamped'] += 1
results['details'].append({
'id': cfdi_id, 'status': 'stamped',
'uuid': uuid_fiscal, 'external_id': invoice_id,
})
except Exception as e:
error_msg = f'{type(e).__name__}: {str(e)[:500]}'
cur.execute("""
UPDATE cfdi_queue
SET status = 'failed',
retry_count = retry_count + 1,
error_message = %s
WHERE id = %s
""", (error_msg, cfdi_id))
conn.commit()
results['failed'] += 1
results['details'].append({
'id': cfdi_id, 'status': 'failed', 'error': error_msg
})
cur.close()
return results
def retry_failed(conn):
"""Find failed items eligible for retry and reset to pending.
Uses exponential backoff: item is eligible for retry only if enough
time has passed since the last attempt based on retry_count.
"""
cur = conn.cursor()
cur.execute("""
SELECT id, retry_count, created_at
FROM cfdi_queue
WHERE status = 'failed' AND retry_count < %s
ORDER BY created_at ASC
""", (MAX_RETRIES,))
items = cur.fetchall()
reset_count = 0
now = datetime.utcnow()
for cfdi_id, retry_count, created_at in items:
if retry_count < len(BACKOFF_INTERVALS):
wait_seconds = BACKOFF_INTERVALS[retry_count]
else:
wait_seconds = BACKOFF_INTERVALS[-1]
# Use created_at as approximation for last attempt.
# In production, track last_attempt_at separately.
elapsed = (now - created_at).total_seconds()
if elapsed >= wait_seconds:
cur.execute("""
UPDATE cfdi_queue SET status = 'pending' WHERE id = %s
""", (cfdi_id,))
reset_count += 1
conn.commit()
cur.close()
return reset_count
def cancel_cfdi(conn, cfdi_id, motive, replacement_uuid=None,
tenant_config=None):
"""Cancel a stamped CFDI via Facturapi.
SAT cancellation motives:
01: Comprobante emitido con errores con relacion (requires replacement UUID)
02: Comprobante emitido con errores sin relacion
03: No se llevo a cabo la operacion
04: Operacion nominativa relacionada en factura global
Args:
conn: psycopg2 connection
cfdi_id: int (cfdi_queue.id)
motive: str ('01', '02', '03', '04')
replacement_uuid: str (required if motive == '01')
tenant_config: dict with facturapi_key
Returns:
dict: {id, status, message}
Raises:
ValueError: on validation errors
"""
if motive not in ('01', '02', '03', '04'):
raise ValueError(f"Invalid SAT cancellation motive: {motive}")
if motive == '01' and not replacement_uuid:
raise ValueError("Motive 01 requires a replacement UUID")
cur = conn.cursor()
cur.execute("""
SELECT id, uuid_fiscal, external_id, status FROM cfdi_queue WHERE id = %s
""", (cfdi_id,))
row = cur.fetchone()
if not row:
raise ValueError(f"CFDI queue item {cfdi_id} not found")
_, uuid_fiscal, external_id, current_status = row
if current_status == 'cancelled':
raise ValueError("CFDI is already cancelled")
if current_status != 'stamped':
# If not stamped, we can just mark as cancelled locally
cur.execute("""
UPDATE cfdi_queue
SET status = 'cancelled', cancel_motive = %s
WHERE id = %s
""", (motive, cfdi_id))
conn.commit()
cur.close()
return {'id': cfdi_id, 'status': 'cancelled', 'message': 'Cancelled locally (was not stamped)'}
if not tenant_config or not tenant_config.get('facturapi_key'):
cur.close()
raise ValueError("Facturapi key not configured for tenant")
if not external_id:
cur.close()
raise ValueError("Cannot cancel: no Facturapi invoice id stored")
try:
facturapi_service.cancel_invoice(
tenant_config, external_id, motive,
replacement_uuid=replacement_uuid,
)
cur.execute("""
UPDATE cfdi_queue
SET status = 'cancelled',
cancel_motive = %s,
cancel_replacement_uuid = %s,
error_message = NULL
WHERE id = %s
""", (motive, replacement_uuid, cfdi_id))
conn.commit()
cur.close()
return {
'id': cfdi_id,
'status': 'cancelled',
'message': f'Cancelled with SAT (motive {motive})',
}
except Exception as e:
error_msg = f'Cancel failed: {str(e)[:500]}'
cur.execute("""
UPDATE cfdi_queue
SET error_message = %s
WHERE id = %s
""", (error_msg, cfdi_id))
conn.commit()
cur.close()
raise ValueError(error_msg)
def get_queue_status(conn, filters=None):
"""Get CFDI queue items with optional filters."""
filters = filters or {}
cur = conn.cursor()
page = int(filters.get('page', 1))
per_page = min(int(filters.get('per_page', 50)), 200)
where_clauses = ["1=1"]
params = []
if filters.get('status'):
where_clauses.append("q.status = %s")
params.append(filters['status'])
if filters.get('sale_id'):
where_clauses.append("q.sale_id = %s")
params.append(int(filters['sale_id']))
if filters.get('type'):
where_clauses.append("q.type = %s")
params.append(filters['type'])
where = " AND ".join(where_clauses)
cur.execute(f"SELECT count(*) FROM cfdi_queue q WHERE {where}", params)
total = cur.fetchone()[0]
cur.execute(f"""
SELECT q.id, q.sale_id, q.type, q.uuid_fiscal, q.status,
q.retry_count, q.provisional_folio, q.error_message,
q.cancel_motive, q.created_at, q.stamped_at, q.external_id
FROM cfdi_queue q
WHERE {where}
ORDER BY q.created_at DESC
LIMIT %s OFFSET %s
""", params + [per_page, (page - 1) * per_page])
items = []
for r in cur.fetchall():
items.append({
'id': r[0], 'sale_id': r[1], 'type': r[2],
'uuid_fiscal': r[3], 'status': r[4],
'retry_count': r[5], 'provisional_folio': r[6],
'error_message': r[7], 'cancel_motive': r[8],
'created_at': str(r[9]) if r[9] else None,
'stamped_at': str(r[10]) if r[10] else None,
'external_id': r[11],
})
cur.close()
total_pages = (total + per_page - 1) // per_page
return {
'data': items,
'pagination': {
'page': page, 'per_page': per_page,
'total': total, 'total_pages': total_pages,
}
}