# /home/Autopartes/pos/services/cfdi_queue.py
"""CFDI queue service: manages the Facturapi timbrado (stamping) pipeline.

Flow:
  1. enqueue_cfdi()  — inserts a Facturapi JSON payload into cfdi_queue
                       with status='pending'
  2. process_queue() — sends queued items to Facturapi, updates status
  3. retry_failed()  — resets failed items to 'pending' after a backoff
  4. cancel_cfdi()   — cancels a stamped CFDI via Facturapi

Facturapi endpoints used:
  POST   /v2/invoices      — create and stamp an invoice
  GET    /v2/invoices/:id  — fetch invoice metadata
  DELETE /v2/invoices/:id  — cancel with SAT motive

Retry backoff: 5s, 30s, 2m, 10m, 1h (max 5 retries)
"""

import json
import logging
from datetime import datetime, timedelta, timezone

from services import facturapi_service

logger = logging.getLogger(__name__)

# Backoff intervals in seconds: 5s, 30s, 2m, 10m, 1h
BACKOFF_INTERVALS = [5, 30, 120, 600, 3600]
MAX_RETRIES = len(BACKOFF_INTERVALS)


def _format_error(exc):
    """Return '<ExceptionType>: <message>' with the message capped at 500 chars."""
    return f'{type(exc).__name__}: {str(exc)[:500]}'


def _generate_provisional_folio(conn):
    """Generate a provisional folio like PRE-00001.

    The number is MAX(cfdi_queue.id) + 1, zero-padded to five digits.

    NOTE(review): the value is only read, not reserved, so two concurrent
    callers can compute the same folio, and it is not guaranteed to equal
    the id the following INSERT receives. Confirm whether that matters.
    """
    cur = conn.cursor()
    try:
        cur.execute("SELECT COALESCE(MAX(id), 0) + 1 FROM cfdi_queue")
        seq = cur.fetchone()[0]
    finally:
        cur.close()
    return f'PRE-{seq:05d}'


def enqueue_cfdi(conn, sale_id, cfdi_type, payload):
    """Add a CFDI to the timbrado queue.

    The row is inserted with status='pending'. This function does not
    commit; the caller owns the transaction.

    Args:
        conn: psycopg2 connection
        sale_id: int (FK to sales), may be None for global invoices
        cfdi_type: 'ingreso' | 'egreso' | 'pago'
        payload: dict (Facturapi JSON payload) or str (JSON string)

    Returns:
        dict: {id, sale_id, type, status, provisional_folio, created_at}
        where created_at is the database timestamp rendered with str().
    """
    provisional_folio = _generate_provisional_folio(conn)
    # Strings are stored verbatim; anything else is serialised to JSON.
    payload_json = payload if isinstance(payload, str) else json.dumps(payload)

    cur = conn.cursor()
    try:
        cur.execute("""
            INSERT INTO cfdi_queue
                (sale_id, type, payload_unsigned, status, provisional_folio)
            VALUES (%s, %s, %s, 'pending', %s)
            RETURNING id, created_at
        """, (sale_id, cfdi_type, payload_json, provisional_folio))
        cfdi_id, created_at = cur.fetchone()
    finally:
        cur.close()

    return {
        'id': cfdi_id,
        'sale_id': sale_id,
        'type': cfdi_type,
        'status': 'pending',
        'provisional_folio': provisional_folio,
        'created_at': str(created_at),
    }


def process_queue(conn, tenant_config, dry_run=False):
    """Process queued CFDI items (at most 50 per call, oldest first).

    Each selected row is marked 'sending', its payload is sent to Facturapi
    for timbrado, and the row is then updated:

    * success -> status 'stamped' with the signed XML, fiscal UUID and
      Facturapi invoice id;
    * failure before or during stamping -> status 'failed', retry_count
      incremented and error_message recorded.

    If the invoice was stamped but saving the result fails, the row is
    deliberately left in 'sending' (never 'failed'): marking it failed would
    make a later run stamp the same sale twice. The failure is logged and
    reported in the returned details together with the invoice identifiers.

    NOTE(review): rows with status 'failed' are selected here directly, so
    they are retried on the next call regardless of the backoff applied by
    retry_failed(). Rows left in 'sending' (crash, or the case above) are
    never selected again and need manual attention.

    Args:
        conn: psycopg2 connection
        tenant_config: dict with facturapi_key (and optional facturapi_org_id)
        dry_run: Facturapi has no dry-run mode, so with dry_run=True every
            item is reported as failed (payload error, or "dry_run is not
            supported with Facturapi") without calling the API and without
            modifying any queue row.

    Returns:
        dict: {processed: int, stamped: int, failed: int, details: [...]}

    Raises:
        ValueError: if tenant_config has no facturapi_key.
    """
    # Fail fast on misconfiguration, before touching the queue.
    if not tenant_config.get('facturapi_key'):
        raise ValueError("Facturapi key not configured for tenant")

    results = {'processed': 0, 'stamped': 0, 'failed': 0, 'details': []}

    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT id, sale_id, type, payload_unsigned, retry_count
            FROM cfdi_queue
            WHERE status IN ('pending', 'failed')
              AND retry_count < %s
            ORDER BY created_at ASC
            LIMIT 50
        """, (MAX_RETRIES,))
        items = cur.fetchall()

        for cfdi_id, _sale_id, _cfdi_type, payload_unsigned, _retries in items:
            results['processed'] += 1

            if not dry_run:
                # Committed immediately so the in-flight state is visible
                # to other sessions while the API call runs.
                cur.execute("""
                    UPDATE cfdi_queue SET status = 'sending' WHERE id = %s
                """, (cfdi_id,))
                conn.commit()

            # --- Phase 1: everything up to and including stamping. ---
            try:
                if isinstance(payload_unsigned, dict):
                    # json/jsonb columns arrive already decoded.
                    payload = payload_unsigned
                else:
                    payload = json.loads(payload_unsigned or '{}')
                if not payload:
                    raise ValueError("Empty payload in queue item")
                if dry_run:
                    # TODO: Facturapi dry-run validation (not officially
                    # supported). Until then a dry run is reported as a
                    # failure and nothing is sent or stored.
                    raise ValueError("dry_run is not supported with Facturapi")

                invoice = facturapi_service.create_invoice(tenant_config, payload)
                invoice_id = invoice.get('id')
                uuid_fiscal = invoice.get('uuid')
            except Exception as e:
                error_msg = _format_error(e)
                if not dry_run:
                    cur.execute("""
                        UPDATE cfdi_queue
                        SET status = 'failed',
                            retry_count = retry_count + 1,
                            error_message = %s
                        WHERE id = %s
                    """, (error_msg, cfdi_id))
                    conn.commit()
                results['failed'] += 1
                results['details'].append({
                    'id': cfdi_id,
                    'status': 'failed',
                    'error': error_msg,
                })
                continue

            # --- Phase 2: invoice exists at Facturapi; store the result. ---
            # Downloading the signed XML is best-effort: the stamp is valid
            # even if the XML cannot be fetched right now.
            try:
                xml_signed = facturapi_service.download_xml(tenant_config, invoice_id)
                if isinstance(xml_signed, bytes):
                    xml_signed_str = xml_signed.decode('utf-8')
                else:
                    xml_signed_str = str(xml_signed)
            except Exception as xml_err:
                logger.warning("Could not download signed XML for %s: %s",
                               invoice_id, xml_err)
                xml_signed_str = ''

            try:
                cur.execute("""
                    UPDATE cfdi_queue
                    SET status = 'stamped',
                        xml_signed = %s,
                        uuid_fiscal = %s,
                        external_id = %s,
                        stamped_at = NOW(),
                        error_message = NULL
                    WHERE id = %s
                """, (xml_signed_str, uuid_fiscal, invoice_id, cfdi_id))
                conn.commit()
            except Exception as db_err:
                # Clear the aborted transaction so the loop can continue, but
                # do NOT mark the row 'failed': it is already stamped.
                conn.rollback()
                logger.exception(
                    "CFDI queue item %s was stamped (invoice %s, uuid %s) "
                    "but the result could not be saved",
                    cfdi_id, invoice_id, uuid_fiscal,
                )
                results['failed'] += 1
                results['details'].append({
                    'id': cfdi_id,
                    'status': 'failed',
                    'error': _format_error(db_err),
                    'uuid': uuid_fiscal,
                    'external_id': invoice_id,
                })
                continue

            results['stamped'] += 1
            results['details'].append({
                'id': cfdi_id,
                'status': 'stamped',
                'uuid': uuid_fiscal,
                'external_id': invoice_id,
            })
    finally:
        cur.close()

    return results


def retry_failed(conn):
    """Find failed items eligible for retry and reset them to 'pending'.

    An item is eligible when the time elapsed since its reference timestamp
    is at least BACKOFF_INTERVALS[retry_count] seconds (the last interval is
    used for any retry_count beyond the table).

    The reference timestamp is created_at, used as an approximation of the
    last attempt; in production, track last_attempt_at separately.

    Args:
        conn: psycopg2 connection

    Returns:
        int: number of rows reset to 'pending'. All resets are committed
        together at the end.
    """
    # Timezone-aware "now": works with both timestamptz and naive columns.
    now = datetime.now(timezone.utc)
    last_index = len(BACKOFF_INTERVALS) - 1
    reset_count = 0

    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT id, retry_count, created_at
            FROM cfdi_queue
            WHERE status = 'failed'
              AND retry_count < %s
            ORDER BY created_at ASC
        """, (MAX_RETRIES,))
        items = cur.fetchall()

        for cfdi_id, retry_count, created_at in items:
            wait_seconds = BACKOFF_INTERVALS[min(retry_count, last_index)]

            # Naive timestamps are interpreted as UTC, as before.
            if created_at.tzinfo is None:
                created_at = created_at.replace(tzinfo=timezone.utc)

            elapsed = (now - created_at).total_seconds()
            if elapsed >= wait_seconds:
                cur.execute("""
                    UPDATE cfdi_queue SET status = 'pending' WHERE id = %s
                """, (cfdi_id,))
                reset_count += 1

        conn.commit()
    finally:
        cur.close()

    return reset_count


def cancel_cfdi(conn, cfdi_id, motive, replacement_uuid=None, tenant_config=None):
    """Cancel a CFDI queue item, via Facturapi if it was already stamped.

    SAT cancellation motives:
        01: Comprobante emitido con errores con relacion
            (requires replacement UUID)
        02: Comprobante emitido con errores sin relacion
        03: No se llevo a cabo la operacion
        04: Operacion nominativa relacionada en factura global

    Items that are not 'stamped' are only marked 'cancelled' locally; no API
    call is made and tenant_config is not required for them.

    Args:
        conn: psycopg2 connection
        cfdi_id: int (cfdi_queue.id)
        motive: str ('01', '02', '03', '04')
        replacement_uuid: str (required if motive == '01')
        tenant_config: dict with facturapi_key

    Returns:
        dict: {id, status, message}

    Raises:
        ValueError: on validation errors (bad motive, missing replacement
            UUID, unknown or already-cancelled item, missing key or invoice
            id) and when the cancellation itself fails; in the latter case
            the message is also stored in error_message and the original
            exception is chained as __cause__.
    """
    if motive not in ('01', '02', '03', '04'):
        raise ValueError(f"Invalid SAT cancellation motive: {motive}")
    if motive == '01' and not replacement_uuid:
        raise ValueError("Motive 01 requires a replacement UUID")

    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT id, uuid_fiscal, external_id, status
            FROM cfdi_queue
            WHERE id = %s
        """, (cfdi_id,))
        row = cur.fetchone()
        if not row:
            raise ValueError(f"CFDI queue item {cfdi_id} not found")

        _, _, external_id, current_status = row

        if current_status == 'cancelled':
            raise ValueError("CFDI is already cancelled")

        if current_status != 'stamped':
            # If not stamped, we can just mark as cancelled locally.
            cur.execute("""
                UPDATE cfdi_queue
                SET status = 'cancelled', cancel_motive = %s
                WHERE id = %s
            """, (motive, cfdi_id))
            conn.commit()
            return {
                'id': cfdi_id,
                'status': 'cancelled',
                'message': 'Cancelled locally (was not stamped)',
            }

        if not tenant_config or not tenant_config.get('facturapi_key'):
            raise ValueError("Facturapi key not configured for tenant")
        if not external_id:
            raise ValueError("Cannot cancel: no Facturapi invoice id stored")

        try:
            facturapi_service.cancel_invoice(
                tenant_config,
                external_id,
                motive,
                replacement_uuid=replacement_uuid,
            )
            cur.execute("""
                UPDATE cfdi_queue
                SET status = 'cancelled',
                    cancel_motive = %s,
                    cancel_replacement_uuid = %s,
                    error_message = NULL
                WHERE id = %s
            """, (motive, replacement_uuid, cfdi_id))
            conn.commit()
        except Exception as e:
            error_msg = f'Cancel failed: {str(e)[:500]}'
            cur.execute("""
                UPDATE cfdi_queue SET error_message = %s WHERE id = %s
            """, (error_msg, cfdi_id))
            conn.commit()
            raise ValueError(error_msg) from e

        return {
            'id': cfdi_id,
            'status': 'cancelled',
            'message': f'Cancelled with SAT (motive {motive})',
        }
    finally:
        # Single exit point for cursor cleanup, including validation errors.
        cur.close()


def get_queue_status(conn, filters=None):
    """Get CFDI queue items, newest first, with optional filters and paging.

    Args:
        conn: psycopg2 connection
        filters: optional dict with any of:
            status, type — exact match on the column;
            sale_id — converted with int(), exact match;
            page — 1-based page number (default 1; values below 1 are
                treated as 1);
            per_page — page size (default 50; clamped to 1..200).
            Missing, None or empty values are ignored / use the default.

    Returns:
        dict: {'data': [item, ...],
               'pagination': {page, per_page, total, total_pages}}
        created_at / stamped_at are rendered with str(), or None when unset.
    """
    filters = filters or {}

    # Clamp paging so OFFSET/LIMIT can never be negative and the
    # total_pages division can never be by zero.
    page = max(int(filters.get('page') or 1), 1)
    per_page = min(max(int(filters.get('per_page') or 50), 1), 200)

    # Only fixed SQL fragments are interpolated; values are always bound.
    where_clauses = ["1=1"]
    params = []
    if filters.get('status'):
        where_clauses.append("q.status = %s")
        params.append(filters['status'])
    if filters.get('sale_id'):
        where_clauses.append("q.sale_id = %s")
        params.append(int(filters['sale_id']))
    if filters.get('type'):
        where_clauses.append("q.type = %s")
        params.append(filters['type'])
    where = " AND ".join(where_clauses)

    cur = conn.cursor()
    try:
        cur.execute(f"SELECT count(*) FROM cfdi_queue q WHERE {where}", params)
        total = cur.fetchone()[0]

        cur.execute(f"""
            SELECT q.id, q.sale_id, q.type, q.uuid_fiscal, q.status,
                   q.retry_count, q.provisional_folio, q.error_message,
                   q.cancel_motive, q.created_at, q.stamped_at, q.external_id
            FROM cfdi_queue q
            WHERE {where}
            ORDER BY q.created_at DESC
            LIMIT %s OFFSET %s
        """, params + [per_page, (page - 1) * per_page])
        rows = cur.fetchall()
    finally:
        cur.close()

    items = [
        {
            'id': r[0],
            'sale_id': r[1],
            'type': r[2],
            'uuid_fiscal': r[3],
            'status': r[4],
            'retry_count': r[5],
            'provisional_folio': r[6],
            'error_message': r[7],
            'cancel_motive': r[8],
            'created_at': str(r[9]) if r[9] else None,
            'stamped_at': str(r[10]) if r[10] else None,
            'external_id': r[11],
        }
        for r in rows
    ]

    total_pages = (total + per_page - 1) // per_page
    return {
        'data': items,
        'pagination': {
            'page': page,
            'per_page': per_page,
            'total': total,
            'total_pages': total_pages,
        },
    }