# /home/Autopartes/pos/services/cfdi_queue.py
"""CFDI queue service: manages the timbrado pipeline.

Flow:
    1. enqueue_cfdi()  — inserts XML into cfdi_queue with status='pending'
    2. process_queue() — sends pending items to Horux API, updates status
    3. retry_failed()  — resets failed items to 'pending' (manual retry trigger)
    4. cancel_cfdi()   — sends cancel request to Horux API

Horux API endpoints:
    POST /api/nexus/cfdi/stamp        — send unsigned XML, receive signed+timbrado
    GET  /api/nexus/cfdi/status/:uuid — check timbrado status
    POST /api/nexus/cfdi/cancel       — cancel CFDI with SAT motive code

Retry backoff: 5s, 30s, 2m, 10m, 1h (max 5 retries)

NOTE: only the retry cap (MAX_RETRIES) is enforced by this module. The
per-level wait times are not enforced because the queue rows read here carry
no "last attempt" timestamp to measure the wait from.
"""

import logging
import time
from contextlib import closing
from datetime import datetime, timedelta

import requests

logger = logging.getLogger(__name__)

# Backoff intervals in seconds: 5s, 30s, 2m, 10m, 1h
BACKOFF_INTERVALS = [5, 30, 120, 600, 3600]
MAX_RETRIES = len(BACKOFF_INTERVALS)

# Timeout (seconds) applied to every HTTP call made to the Horux API.
_HTTP_TIMEOUT = 30


def _generate_provisional_folio(conn):
    """Generate a provisional folio like PRE-00001.

    The sequence number is the cfdi_queue table's current MAX(id) plus one.

    NOTE(review): two transactions that read MAX(id) before either inserts
    will receive the same folio; a database sequence (or deriving the folio
    from the inserted row id) would be needed to rule that out.

    Args:
        conn: psycopg2 connection

    Returns:
        str: folio of the form 'PRE-' followed by a zero-padded number
    """
    with closing(conn.cursor()) as cur:
        cur.execute("SELECT COALESCE(MAX(id), 0) + 1 FROM cfdi_queue")
        seq = cur.fetchone()[0]
    return f'PRE-{seq:05d}'


def enqueue_cfdi(conn, sale_id, cfdi_type, xml):
    """Add a CFDI to the timbrado queue.

    The row is inserted with status 'pending'. This function does not commit;
    the caller owns the transaction.

    Args:
        conn: psycopg2 connection
        sale_id: int (FK to sales)
        cfdi_type: 'ingreso' | 'egreso' | 'pago'
        xml: str (unsigned XML from cfdi_builder)

    Returns:
        dict: {id, sale_id, type, status, provisional_folio, created_at}
    """
    provisional_folio = _generate_provisional_folio(conn)

    with closing(conn.cursor()) as cur:
        cur.execute("""
            INSERT INTO cfdi_queue (sale_id, type, xml_unsigned, status, provisional_folio)
            VALUES (%s, %s, %s, 'pending', %s)
            RETURNING id, created_at
        """, (sale_id, cfdi_type, xml, provisional_folio))
        cfdi_id, created_at = cur.fetchone()

    return {
        'id': cfdi_id,
        'sale_id': sale_id,
        'type': cfdi_type,
        'status': 'pending',
        'provisional_folio': provisional_folio,
        'created_at': str(created_at),
    }


def _request_stamp(horux_api_url, api_key, xml_unsigned):
    """Send one unsigned XML to the Horux stamp endpoint.

    Never raises for transport, HTTP-status or malformed-response problems;
    they are reported through the returned error message instead so the
    caller can always record an outcome for the queue row.

    Returns:
        tuple: (uuid_fiscal, xml_signed, error_msg). On success error_msg is
        None; on failure uuid_fiscal and xml_signed are None.
    """
    try:
        response = requests.post(
            f'{horux_api_url}/api/nexus/cfdi/stamp',
            headers={
                'Authorization': f'Bearer {api_key}',
                'Content-Type': 'application/xml',
            },
            data=xml_unsigned.encode('utf-8'),
            timeout=_HTTP_TIMEOUT,
        )
    except requests.RequestException as e:
        return None, None, f'Connection error: {str(e)[:500]}'

    if response.status_code != 200:
        return None, None, f'HTTP {response.status_code}: {response.text[:500]}'

    # A 200 with a body that is not JSON must be recorded as a failure;
    # letting the decode error escape would strand the row in 'sending'.
    try:
        data = response.json()
    except ValueError as e:
        return None, None, f'Invalid stamp response: {str(e)[:500]}'

    uuid_fiscal = data.get('uuid') if isinstance(data, dict) else None
    if not uuid_fiscal:
        # Without a UUID the row cannot be treated as stamped.
        return None, None, 'Invalid stamp response: missing uuid'

    return uuid_fiscal, data.get('xml', ''), None


def process_queue(conn, horux_api_url, api_key):
    """Process pending CFDI items in the queue.

    Selects up to 50 rows (oldest first) whose status is 'pending' or
    'failed' and whose retry_count is below MAX_RETRIES, marks each one
    'sending', and posts its XML to Horux for timbrado. On success the row is
    updated with the signed XML and UUID fiscal. On failure (connection
    error, non-200 status, or a 200 whose body is not JSON or lacks a uuid)
    retry_count is incremented and the error is recorded. Every status change
    is committed immediately.

    NOTE(review): 'failed' rows are picked up here directly, without waiting
    for retry_failed() or any backoff interval.

    Args:
        conn: psycopg2 connection
        horux_api_url: str base URL for Horux API (e.g. 'https://horux.example.com')
        api_key: str Horux API key

    Returns:
        dict: {processed: int, stamped: int, failed: int, details: [...]}
    """
    results = {'processed': 0, 'stamped': 0, 'failed': 0, 'details': []}

    with closing(conn.cursor()) as cur:
        cur.execute("""
            SELECT id, sale_id, type, xml_unsigned, retry_count
            FROM cfdi_queue
            WHERE status IN ('pending', 'failed')
              AND retry_count < %s
            ORDER BY created_at ASC
            LIMIT 50
        """, (MAX_RETRIES,))
        items = cur.fetchall()

        for cfdi_id, _sale_id, _cfdi_type, xml_unsigned, _retry_count in items:
            results['processed'] += 1

            # Update status to 'sending'
            cur.execute("""
                UPDATE cfdi_queue SET status = 'sending' WHERE id = %s
            """, (cfdi_id,))
            conn.commit()

            uuid_fiscal, xml_signed, error_msg = _request_stamp(
                horux_api_url, api_key, xml_unsigned
            )

            if error_msg is None:
                cur.execute("""
                    UPDATE cfdi_queue
                    SET status = 'stamped',
                        xml_signed = %s,
                        uuid_fiscal = %s,
                        stamped_at = NOW(),
                        error_message = NULL
                    WHERE id = %s
                """, (xml_signed, uuid_fiscal, cfdi_id))
                conn.commit()
                results['stamped'] += 1
                results['details'].append({
                    'id': cfdi_id, 'status': 'stamped', 'uuid': uuid_fiscal
                })
                continue

            logger.warning("CFDI %s stamp failed: %s", cfdi_id, error_msg)
            cur.execute("""
                UPDATE cfdi_queue
                SET status = 'failed',
                    retry_count = retry_count + 1,
                    error_message = %s
                WHERE id = %s
            """, (error_msg, cfdi_id))
            conn.commit()
            results['failed'] += 1
            results['details'].append({
                'id': cfdi_id, 'status': 'failed', 'error': error_msg
            })

    return results


def retry_failed(conn):
    """Reset failed items that still have retries left back to 'pending'.

    Acts as a manual retry trigger: every row with status 'failed' and
    retry_count below MAX_RETRIES is reset, regardless of how long ago it
    failed. The change is committed before returning.

    TODO: enforcing BACKOFF_INTERVALS here needs a per-row timestamp of the
    last attempt; the columns read by this module do not include one.

    Args:
        conn: psycopg2 connection

    Returns:
        int: number of items reset to pending
    """
    with closing(conn.cursor()) as cur:
        # One statement resets all eligible rows atomically; RETURNING lets us
        # count them without a separate SELECT.
        cur.execute("""
            UPDATE cfdi_queue
            SET status = 'pending'
            WHERE status = 'failed'
              AND retry_count < %s
            RETURNING id
        """, (MAX_RETRIES,))
        reset_count = len(cur.fetchall())

    conn.commit()
    return reset_count


def cancel_cfdi(conn, cfdi_id, motive, replacement_uuid=None,
                horux_api_url=None, api_key=None):
    """Cancel a stamped CFDI via Horux API.

    SAT cancellation motives:
        01: Comprobante emitido con errores con relacion (requires replacement UUID)
        02: Comprobante emitido con errores sin relacion
        03: No se llevo a cabo la operacion
        04: Operacion nominativa relacionada en factura global

    Rows that are not 'stamped' are only marked cancelled locally. Stamped
    rows are cancelled through Horux when both horux_api_url and api_key are
    given; otherwise they are marked cancelled locally with a note that the
    SAT sync is pending.

    Args:
        conn: psycopg2 connection
        cfdi_id: int (cfdi_queue.id)
        motive: str ('01', '02', '03', '04')
        replacement_uuid: str (required if motive == '01')
        horux_api_url: str (optional, skips API call if None — for offline)
        api_key: str (optional)

    Returns:
        dict: {id, status, message}

    Raises:
        ValueError: on validation errors, when the row does not exist or is
            already cancelled, when Horux answers with a non-200 status, or
            when the cancel request cannot be sent.
    """
    if motive not in ('01', '02', '03', '04'):
        raise ValueError(f"Invalid SAT cancellation motive: {motive}")

    if motive == '01' and not replacement_uuid:
        raise ValueError("Motive 01 requires a replacement UUID")

    # closing() guarantees the cursor is released on every exit path,
    # including the ValueError raises below.
    with closing(conn.cursor()) as cur:
        cur.execute("""
            SELECT id, uuid_fiscal, status
            FROM cfdi_queue
            WHERE id = %s
        """, (cfdi_id,))
        row = cur.fetchone()

        if not row:
            raise ValueError(f"CFDI queue item {cfdi_id} not found")

        _, uuid_fiscal, current_status = row

        if current_status == 'cancelled':
            raise ValueError("CFDI is already cancelled")

        if current_status != 'stamped':
            # If not stamped, we can just mark as cancelled locally.
            # NOTE(review): this also covers rows currently in 'sending'.
            cur.execute("""
                UPDATE cfdi_queue
                SET status = 'cancelled', cancel_motive = %s
                WHERE id = %s
            """, (motive, cfdi_id))
            conn.commit()
            return {
                'id': cfdi_id,
                'status': 'cancelled',
                'message': 'Cancelled locally (was not stamped)',
            }

        if not (horux_api_url and api_key):
            # Offline mode: mark as cancelled locally, will sync later
            cur.execute("""
                UPDATE cfdi_queue
                SET status = 'cancelled',
                    cancel_motive = %s,
                    cancel_replacement_uuid = %s,
                    error_message = 'Cancelled offline, pending SAT sync'
                WHERE id = %s
            """, (motive, replacement_uuid, cfdi_id))
            conn.commit()
            return {
                'id': cfdi_id,
                'status': 'cancelled',
                'message': 'Cancelled offline, pending SAT sync',
            }

        # Send cancel request to Horux
        payload = {
            'uuid': uuid_fiscal,
            'motive': motive,
        }
        if replacement_uuid:
            payload['replacement_uuid'] = replacement_uuid

        try:
            response = requests.post(
                f'{horux_api_url}/api/nexus/cfdi/cancel',
                headers={
                    'Authorization': f'Bearer {api_key}',
                    'Content-Type': 'application/json',
                },
                json=payload,
                timeout=_HTTP_TIMEOUT,
            )
        except requests.RequestException as e:
            raise ValueError(f'Connection error during cancel: {str(e)}') from e

        if response.status_code != 200:
            error_msg = (
                f'Cancel failed: HTTP {response.status_code}: {response.text[:500]}'
            )
            # Keep the row's status untouched; only record why the cancel failed.
            cur.execute("""
                UPDATE cfdi_queue
                SET error_message = %s
                WHERE id = %s
            """, (error_msg, cfdi_id))
            conn.commit()
            raise ValueError(error_msg)

        cur.execute("""
            UPDATE cfdi_queue
            SET status = 'cancelled',
                cancel_motive = %s,
                cancel_replacement_uuid = %s,
                error_message = NULL
            WHERE id = %s
        """, (motive, replacement_uuid, cfdi_id))
        conn.commit()
        return {
            'id': cfdi_id,
            'status': 'cancelled',
            'message': f'Cancelled with SAT (motive {motive})',
        }


def get_queue_status(conn, filters=None):
    """Get CFDI queue items with optional filters.

    Results are ordered newest first. Out-of-range paging values are clamped
    instead of being passed to SQL: page is at least 1 and per_page is kept
    within 1..200 (a missing, None or zero value falls back to the default).

    Args:
        conn: psycopg2 connection
        filters: dict with optional keys:
            status: str filter by status
            sale_id: int filter by sale
            type: str filter by CFDI type
            page: int (default 1)
            per_page: int (default 50, max 200)

    Returns:
        dict: {data: [...], pagination: {page, per_page, total, total_pages}}
    """
    filters = filters or {}

    # A page below 1 would yield a negative OFFSET and a per_page of 0 would
    # divide by zero when computing total_pages, so clamp both.
    page = max(int(filters.get('page') or 1), 1)
    per_page = min(max(int(filters.get('per_page') or 50), 1), 200)

    # Only fixed SQL fragments are interpolated; values go through params.
    where_clauses = ["1=1"]
    params = []

    if filters.get('status'):
        where_clauses.append("q.status = %s")
        params.append(filters['status'])

    if filters.get('sale_id'):
        where_clauses.append("q.sale_id = %s")
        params.append(int(filters['sale_id']))

    if filters.get('type'):
        where_clauses.append("q.type = %s")
        params.append(filters['type'])

    where = " AND ".join(where_clauses)

    with closing(conn.cursor()) as cur:
        cur.execute(f"SELECT count(*) FROM cfdi_queue q WHERE {where}", params)
        total = cur.fetchone()[0]

        cur.execute(f"""
            SELECT q.id, q.sale_id, q.type, q.uuid_fiscal, q.status,
                   q.retry_count, q.provisional_folio, q.error_message,
                   q.cancel_motive, q.created_at, q.stamped_at
            FROM cfdi_queue q
            WHERE {where}
            ORDER BY q.created_at DESC
            LIMIT %s OFFSET %s
        """, params + [per_page, (page - 1) * per_page])

        items = [
            {
                'id': r[0],
                'sale_id': r[1],
                'type': r[2],
                'uuid_fiscal': r[3],
                'status': r[4],
                'retry_count': r[5],
                'provisional_folio': r[6],
                'error_message': r[7],
                'cancel_motive': r[8],
                'created_at': str(r[9]) if r[9] else None,
                'stamped_at': str(r[10]) if r[10] else None,
            }
            for r in cur.fetchall()
        ]

    total_pages = (total + per_page - 1) // per_page

    return {
        'data': items,
        'pagination': {
            'page': page,
            'per_page': per_page,
            'total': total,
            'total_pages': total_pages,
        }
    }