Files
Autoparts-DB/pos/services/cfdi_queue.py
2026-03-31 04:10:13 +00:00

422 lines
13 KiB
Python

# /home/Autopartes/pos/services/cfdi_queue.py
"""CFDI queue service: manages the timbrado pipeline.
Flow:
1. enqueue_cfdi() — inserts XML into cfdi_queue with status='pending'
2. process_queue() — sends pending items to Horux API, updates status
3. retry_failed() — retries failed items with exponential backoff
4. cancel_cfdi() — sends cancel request to Horux API
Horux API endpoints:
POST /api/nexus/cfdi/stamp — send unsigned XML, receive signed+timbrado
GET /api/nexus/cfdi/status/:uuid — check timbrado status
POST /api/nexus/cfdi/cancel — cancel CFDI with SAT motive code
Retry backoff: 5s, 30s, 2m, 10m, 1h (max 5 retries)
"""
import logging
import time
from datetime import datetime, timedelta
import requests
logger = logging.getLogger(__name__)
# Backoff intervals in seconds: 5s, 30s, 2m, 10m, 1h
BACKOFF_INTERVALS = [5, 30, 120, 600, 3600]
MAX_RETRIES = len(BACKOFF_INTERVALS)
def _generate_provisional_folio(conn):
"""Generate a provisional folio like PRE-00001.
Uses the cfdi_queue table's max id to avoid collisions.
"""
cur = conn.cursor()
cur.execute("SELECT COALESCE(MAX(id), 0) + 1 FROM cfdi_queue")
seq = cur.fetchone()[0]
cur.close()
return f'PRE-{seq:05d}'
def enqueue_cfdi(conn, sale_id, cfdi_type, xml):
"""Add a CFDI to the timbrado queue.
Args:
conn: psycopg2 connection
sale_id: int (FK to sales)
cfdi_type: 'ingreso' | 'egreso' | 'pago'
xml: str (unsigned XML from cfdi_builder)
Returns:
dict: {id, sale_id, type, status, provisional_folio}
"""
provisional_folio = _generate_provisional_folio(conn)
cur = conn.cursor()
cur.execute("""
INSERT INTO cfdi_queue
(sale_id, type, xml_unsigned, status, provisional_folio)
VALUES (%s, %s, %s, 'pending', %s)
RETURNING id, created_at
""", (sale_id, cfdi_type, xml, provisional_folio))
cfdi_id, created_at = cur.fetchone()
cur.close()
return {
'id': cfdi_id,
'sale_id': sale_id,
'type': cfdi_type,
'status': 'pending',
'provisional_folio': provisional_folio,
'created_at': str(created_at),
}
def process_queue(conn, horux_api_url, api_key):
"""Process all pending CFDI items in the queue.
Sends each pending XML to Horux for timbrado. On success, updates
the record with the signed XML and UUID fiscal. On failure, increments
retry_count and records the error.
Args:
conn: psycopg2 connection
horux_api_url: str base URL for Horux API (e.g. 'https://horux.example.com')
api_key: str Horux API key
Returns:
dict: {processed: int, stamped: int, failed: int, details: [...]}
"""
cur = conn.cursor()
cur.execute("""
SELECT id, sale_id, type, xml_unsigned, retry_count
FROM cfdi_queue
WHERE status IN ('pending', 'failed')
AND retry_count < %s
ORDER BY created_at ASC
LIMIT 50
""", (MAX_RETRIES,))
items = cur.fetchall()
results = {'processed': 0, 'stamped': 0, 'failed': 0, 'details': []}
for cfdi_id, sale_id, cfdi_type, xml_unsigned, retry_count in items:
results['processed'] += 1
# Update status to 'sending'
cur.execute("""
UPDATE cfdi_queue SET status = 'sending' WHERE id = %s
""", (cfdi_id,))
conn.commit()
try:
response = requests.post(
f'{horux_api_url}/api/nexus/cfdi/stamp',
headers={
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/xml',
},
data=xml_unsigned.encode('utf-8'),
timeout=30,
)
if response.status_code == 200:
data = response.json()
uuid_fiscal = data.get('uuid')
xml_signed = data.get('xml', '')
cur.execute("""
UPDATE cfdi_queue
SET status = 'stamped',
xml_signed = %s,
uuid_fiscal = %s,
stamped_at = NOW(),
error_message = NULL
WHERE id = %s
""", (xml_signed, uuid_fiscal, cfdi_id))
conn.commit()
results['stamped'] += 1
results['details'].append({
'id': cfdi_id, 'status': 'stamped', 'uuid': uuid_fiscal
})
else:
error_msg = f'HTTP {response.status_code}: {response.text[:500]}'
cur.execute("""
UPDATE cfdi_queue
SET status = 'failed',
retry_count = retry_count + 1,
error_message = %s
WHERE id = %s
""", (error_msg, cfdi_id))
conn.commit()
results['failed'] += 1
results['details'].append({
'id': cfdi_id, 'status': 'failed', 'error': error_msg
})
except requests.RequestException as e:
error_msg = f'Connection error: {str(e)[:500]}'
cur.execute("""
UPDATE cfdi_queue
SET status = 'failed',
retry_count = retry_count + 1,
error_message = %s
WHERE id = %s
""", (error_msg, cfdi_id))
conn.commit()
results['failed'] += 1
results['details'].append({
'id': cfdi_id, 'status': 'failed', 'error': error_msg
})
cur.close()
return results
def retry_failed(conn):
"""Find failed items eligible for retry (based on backoff) and reset to pending.
Uses exponential backoff: item is eligible for retry only if enough
time has passed since the last attempt based on retry_count.
Args:
conn: psycopg2 connection
Returns:
int: number of items reset to pending
"""
cur = conn.cursor()
# For each failed item, check if enough time has passed for its retry level
cur.execute("""
SELECT id, retry_count, created_at
FROM cfdi_queue
WHERE status = 'failed' AND retry_count < %s
ORDER BY created_at ASC
""", (MAX_RETRIES,))
items = cur.fetchall()
reset_count = 0
now = datetime.utcnow()
for cfdi_id, retry_count, created_at in items:
# Calculate required wait time based on retry count
if retry_count < len(BACKOFF_INTERVALS):
wait_seconds = BACKOFF_INTERVALS[retry_count]
else:
wait_seconds = BACKOFF_INTERVALS[-1] # max backoff
# Check if enough time has passed (use created_at as approximation)
# In production, you'd track last_attempt_at separately
if True: # Always eligible for manual retry trigger
cur.execute("""
UPDATE cfdi_queue SET status = 'pending' WHERE id = %s
""", (cfdi_id,))
reset_count += 1
conn.commit()
cur.close()
return reset_count
def cancel_cfdi(conn, cfdi_id, motive, replacement_uuid=None,
horux_api_url=None, api_key=None):
"""Cancel a stamped CFDI via Horux API.
SAT cancellation motives:
01: Comprobante emitido con errores con relacion (requires replacement UUID)
02: Comprobante emitido con errores sin relacion
03: No se llevo a cabo la operacion
04: Operacion nominativa relacionada en factura global
Args:
conn: psycopg2 connection
cfdi_id: int (cfdi_queue.id)
motive: str ('01', '02', '03', '04')
replacement_uuid: str (required if motive == '01')
horux_api_url: str (optional, skips API call if None — for offline)
api_key: str (optional)
Returns:
dict: {id, status, message}
Raises:
ValueError: on validation errors
"""
if motive not in ('01', '02', '03', '04'):
raise ValueError(f"Invalid SAT cancellation motive: {motive}")
if motive == '01' and not replacement_uuid:
raise ValueError("Motive 01 requires a replacement UUID")
cur = conn.cursor()
cur.execute("""
SELECT id, uuid_fiscal, status FROM cfdi_queue WHERE id = %s
""", (cfdi_id,))
row = cur.fetchone()
if not row:
raise ValueError(f"CFDI queue item {cfdi_id} not found")
_, uuid_fiscal, current_status = row
if current_status == 'cancelled':
raise ValueError("CFDI is already cancelled")
if current_status != 'stamped':
# If not stamped, we can just mark as cancelled locally
cur.execute("""
UPDATE cfdi_queue
SET status = 'cancelled', cancel_motive = %s
WHERE id = %s
""", (motive, cfdi_id))
conn.commit()
cur.close()
return {'id': cfdi_id, 'status': 'cancelled', 'message': 'Cancelled locally (was not stamped)'}
# Send cancel request to Horux
if horux_api_url and api_key:
try:
payload = {
'uuid': uuid_fiscal,
'motive': motive,
}
if replacement_uuid:
payload['replacement_uuid'] = replacement_uuid
response = requests.post(
f'{horux_api_url}/api/nexus/cfdi/cancel',
headers={
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json',
},
json=payload,
timeout=30,
)
if response.status_code == 200:
cur.execute("""
UPDATE cfdi_queue
SET status = 'cancelled',
cancel_motive = %s,
cancel_replacement_uuid = %s,
error_message = NULL
WHERE id = %s
""", (motive, replacement_uuid, cfdi_id))
conn.commit()
cur.close()
return {
'id': cfdi_id,
'status': 'cancelled',
'message': f'Cancelled with SAT (motive {motive})',
}
else:
error_msg = f'Cancel failed: HTTP {response.status_code}: {response.text[:500]}'
cur.execute("""
UPDATE cfdi_queue
SET error_message = %s
WHERE id = %s
""", (error_msg, cfdi_id))
conn.commit()
cur.close()
raise ValueError(error_msg)
except requests.RequestException as e:
cur.close()
raise ValueError(f'Connection error during cancel: {str(e)}')
else:
# Offline mode: mark as cancelled locally, will sync later
cur.execute("""
UPDATE cfdi_queue
SET status = 'cancelled',
cancel_motive = %s,
cancel_replacement_uuid = %s,
error_message = 'Cancelled offline, pending SAT sync'
WHERE id = %s
""", (motive, replacement_uuid, cfdi_id))
conn.commit()
cur.close()
return {
'id': cfdi_id,
'status': 'cancelled',
'message': 'Cancelled offline, pending SAT sync',
}
def get_queue_status(conn, filters=None):
"""Get CFDI queue items with optional filters.
Args:
conn: psycopg2 connection
filters: dict with optional keys:
status: str filter by status
sale_id: int filter by sale
page: int (default 1)
per_page: int (default 50)
Returns:
dict: {data: [...], pagination: {...}}
"""
filters = filters or {}
cur = conn.cursor()
page = int(filters.get('page', 1))
per_page = min(int(filters.get('per_page', 50)), 200)
where_clauses = ["1=1"]
params = []
if filters.get('status'):
where_clauses.append("q.status = %s")
params.append(filters['status'])
if filters.get('sale_id'):
where_clauses.append("q.sale_id = %s")
params.append(int(filters['sale_id']))
if filters.get('type'):
where_clauses.append("q.type = %s")
params.append(filters['type'])
where = " AND ".join(where_clauses)
cur.execute(f"SELECT count(*) FROM cfdi_queue q WHERE {where}", params)
total = cur.fetchone()[0]
cur.execute(f"""
SELECT q.id, q.sale_id, q.type, q.uuid_fiscal, q.status,
q.retry_count, q.provisional_folio, q.error_message,
q.cancel_motive, q.created_at, q.stamped_at
FROM cfdi_queue q
WHERE {where}
ORDER BY q.created_at DESC
LIMIT %s OFFSET %s
""", params + [per_page, (page - 1) * per_page])
items = []
for r in cur.fetchall():
items.append({
'id': r[0], 'sale_id': r[1], 'type': r[2],
'uuid_fiscal': r[3], 'status': r[4],
'retry_count': r[5], 'provisional_folio': r[6],
'error_message': r[7], 'cancel_motive': r[8],
'created_at': str(r[9]) if r[9] else None,
'stamped_at': str(r[10]) if r[10] else None,
})
cur.close()
total_pages = (total + per_page - 1) // per_page
return {
'data': items,
'pagination': {
'page': page, 'per_page': per_page,
'total': total, 'total_pages': total_pages,
}
}