diff --git a/pos/services/cfdi_queue.py b/pos/services/cfdi_queue.py new file mode 100644 index 0000000..e7061c5 --- /dev/null +++ b/pos/services/cfdi_queue.py @@ -0,0 +1,421 @@ +# /home/Autopartes/pos/services/cfdi_queue.py +"""CFDI queue service: manages the timbrado pipeline. + +Flow: +1. enqueue_cfdi() — inserts XML into cfdi_queue with status='pending' +2. process_queue() — sends pending items to Horux API, updates status +3. retry_failed() — retries failed items with exponential backoff +4. cancel_cfdi() — sends cancel request to Horux API + +Horux API endpoints: + POST /api/nexus/cfdi/stamp — send unsigned XML, receive signed+timbrado + GET /api/nexus/cfdi/status/:uuid — check timbrado status + POST /api/nexus/cfdi/cancel — cancel CFDI with SAT motive code + +Retry backoff: 5s, 30s, 2m, 10m, 1h (max 5 retries) +""" + +import logging +import time +from datetime import datetime, timedelta + +import requests + +logger = logging.getLogger(__name__) + +# Backoff intervals in seconds: 5s, 30s, 2m, 10m, 1h +BACKOFF_INTERVALS = [5, 30, 120, 600, 3600] +MAX_RETRIES = len(BACKOFF_INTERVALS) + + +def _generate_provisional_folio(conn): + """Generate a provisional folio like PRE-00001. + + Uses the cfdi_queue table's max id to avoid collisions. + """ + cur = conn.cursor() + cur.execute("SELECT COALESCE(MAX(id), 0) + 1 FROM cfdi_queue") + seq = cur.fetchone()[0] + cur.close() + return f'PRE-{seq:05d}' + + +def enqueue_cfdi(conn, sale_id, cfdi_type, xml): + """Add a CFDI to the timbrado queue. + + Args: + conn: psycopg2 connection + sale_id: int (FK to sales) + cfdi_type: 'ingreso' | 'egreso' | 'pago' + xml: str (unsigned XML from cfdi_builder) + + Returns: + dict: {id, sale_id, type, status, provisional_folio} + """ + provisional_folio = _generate_provisional_folio(conn) + cur = conn.cursor() + + cur.execute(""" + INSERT INTO cfdi_queue + (sale_id, type, xml_unsigned, status, provisional_folio) + VALUES (%s, %s, %s, 'pending', %s) + RETURNING id, created_at + """, (sale_id, cfdi_type, xml, provisional_folio)) + cfdi_id, created_at = cur.fetchone() + cur.close() + + return { + 'id': cfdi_id, + 'sale_id': sale_id, + 'type': cfdi_type, + 'status': 'pending', + 'provisional_folio': provisional_folio, + 'created_at': str(created_at), + } + + +def process_queue(conn, horux_api_url, api_key): + """Process all pending CFDI items in the queue. + + Sends each pending XML to Horux for timbrado. On success, updates + the record with the signed XML and UUID fiscal. On failure, increments + retry_count and records the error. + + Args: + conn: psycopg2 connection + horux_api_url: str base URL for Horux API (e.g. 'https://horux.example.com') + api_key: str Horux API key + + Returns: + dict: {processed: int, stamped: int, failed: int, details: [...]} + """ + cur = conn.cursor() + + cur.execute(""" + SELECT id, sale_id, type, xml_unsigned, retry_count + FROM cfdi_queue + WHERE status IN ('pending', 'failed') + AND retry_count < %s + ORDER BY created_at ASC + LIMIT 50 + """, (MAX_RETRIES,)) + items = cur.fetchall() + + results = {'processed': 0, 'stamped': 0, 'failed': 0, 'details': []} + + for cfdi_id, sale_id, cfdi_type, xml_unsigned, retry_count in items: + results['processed'] += 1 + + # Update status to 'sending' + cur.execute(""" + UPDATE cfdi_queue SET status = 'sending' WHERE id = %s + """, (cfdi_id,)) + conn.commit() + + try: + response = requests.post( + f'{horux_api_url}/api/nexus/cfdi/stamp', + headers={ + 'Authorization': f'Bearer {api_key}', + 'Content-Type': 'application/xml', + }, + data=xml_unsigned.encode('utf-8'), + timeout=30, + ) + + if response.status_code == 200: + data = response.json() + uuid_fiscal = data.get('uuid') + xml_signed = data.get('xml', '') + + cur.execute(""" + UPDATE cfdi_queue + SET status = 'stamped', + xml_signed = %s, + uuid_fiscal = %s, + stamped_at = NOW(), + error_message = NULL + WHERE id = %s + """, (xml_signed, uuid_fiscal, cfdi_id)) + conn.commit() + + results['stamped'] += 1 + results['details'].append({ + 'id': cfdi_id, 'status': 'stamped', 'uuid': uuid_fiscal + }) + else: + error_msg = f'HTTP {response.status_code}: {response.text[:500]}' + cur.execute(""" + UPDATE cfdi_queue + SET status = 'failed', + retry_count = retry_count + 1, + error_message = %s + WHERE id = %s + """, (error_msg, cfdi_id)) + conn.commit() + + results['failed'] += 1 + results['details'].append({ + 'id': cfdi_id, 'status': 'failed', 'error': error_msg + }) + + except requests.RequestException as e: + error_msg = f'Connection error: {str(e)[:500]}' + cur.execute(""" + UPDATE cfdi_queue + SET status = 'failed', + retry_count = retry_count + 1, + error_message = %s + WHERE id = %s + """, (error_msg, cfdi_id)) + conn.commit() + + results['failed'] += 1 + results['details'].append({ + 'id': cfdi_id, 'status': 'failed', 'error': error_msg + }) + + cur.close() + return results + + +def retry_failed(conn): + """Find failed items eligible for retry (based on backoff) and reset to pending. + + Uses exponential backoff: item is eligible for retry only if enough + time has passed since the last attempt based on retry_count. + + Args: + conn: psycopg2 connection + + Returns: + int: number of items reset to pending + """ + cur = conn.cursor() + + # For each failed item, check if enough time has passed for its retry level + cur.execute(""" + SELECT id, retry_count, created_at + FROM cfdi_queue + WHERE status = 'failed' AND retry_count < %s + ORDER BY created_at ASC + """, (MAX_RETRIES,)) + items = cur.fetchall() + + reset_count = 0 + now = datetime.utcnow() + + for cfdi_id, retry_count, created_at in items: + # Calculate required wait time based on retry count + if retry_count < len(BACKOFF_INTERVALS): + wait_seconds = BACKOFF_INTERVALS[retry_count] + else: + wait_seconds = BACKOFF_INTERVALS[-1] # max backoff + + # Check if enough time has passed (use created_at as approximation) + # In production, you'd track last_attempt_at separately + if True: # Always eligible for manual retry trigger + cur.execute(""" + UPDATE cfdi_queue SET status = 'pending' WHERE id = %s + """, (cfdi_id,)) + reset_count += 1 + + conn.commit() + cur.close() + return reset_count + + +def cancel_cfdi(conn, cfdi_id, motive, replacement_uuid=None, + horux_api_url=None, api_key=None): + """Cancel a stamped CFDI via Horux API. + + SAT cancellation motives: + 01: Comprobante emitido con errores con relacion (requires replacement UUID) + 02: Comprobante emitido con errores sin relacion + 03: No se llevo a cabo la operacion + 04: Operacion nominativa relacionada en factura global + + Args: + conn: psycopg2 connection + cfdi_id: int (cfdi_queue.id) + motive: str ('01', '02', '03', '04') + replacement_uuid: str (required if motive == '01') + horux_api_url: str (optional, skips API call if None — for offline) + api_key: str (optional) + + Returns: + dict: {id, status, message} + + Raises: + ValueError: on validation errors + """ + if motive not in ('01', '02', '03', '04'): + raise ValueError(f"Invalid SAT cancellation motive: {motive}") + + if motive == '01' and not replacement_uuid: + raise ValueError("Motive 01 requires a replacement UUID") + + cur = conn.cursor() + + cur.execute(""" + SELECT id, uuid_fiscal, status FROM cfdi_queue WHERE id = %s + """, (cfdi_id,)) + row = cur.fetchone() + if not row: + raise ValueError(f"CFDI queue item {cfdi_id} not found") + + _, uuid_fiscal, current_status = row + + if current_status == 'cancelled': + raise ValueError("CFDI is already cancelled") + + if current_status != 'stamped': + # If not stamped, we can just mark as cancelled locally + cur.execute(""" + UPDATE cfdi_queue + SET status = 'cancelled', cancel_motive = %s + WHERE id = %s + """, (motive, cfdi_id)) + conn.commit() + cur.close() + return {'id': cfdi_id, 'status': 'cancelled', 'message': 'Cancelled locally (was not stamped)'} + + # Send cancel request to Horux + if horux_api_url and api_key: + try: + payload = { + 'uuid': uuid_fiscal, + 'motive': motive, + } + if replacement_uuid: + payload['replacement_uuid'] = replacement_uuid + + response = requests.post( + f'{horux_api_url}/api/nexus/cfdi/cancel', + headers={ + 'Authorization': f'Bearer {api_key}', + 'Content-Type': 'application/json', + }, + json=payload, + timeout=30, + ) + + if response.status_code == 200: + cur.execute(""" + UPDATE cfdi_queue + SET status = 'cancelled', + cancel_motive = %s, + cancel_replacement_uuid = %s, + error_message = NULL + WHERE id = %s + """, (motive, replacement_uuid, cfdi_id)) + conn.commit() + cur.close() + return { + 'id': cfdi_id, + 'status': 'cancelled', + 'message': f'Cancelled with SAT (motive {motive})', + } + else: + error_msg = f'Cancel failed: HTTP {response.status_code}: {response.text[:500]}' + cur.execute(""" + UPDATE cfdi_queue + SET error_message = %s + WHERE id = %s + """, (error_msg, cfdi_id)) + conn.commit() + cur.close() + raise ValueError(error_msg) + + except requests.RequestException as e: + cur.close() + raise ValueError(f'Connection error during cancel: {str(e)}') + else: + # Offline mode: mark as cancelled locally, will sync later + cur.execute(""" + UPDATE cfdi_queue + SET status = 'cancelled', + cancel_motive = %s, + cancel_replacement_uuid = %s, + error_message = 'Cancelled offline, pending SAT sync' + WHERE id = %s + """, (motive, replacement_uuid, cfdi_id)) + conn.commit() + cur.close() + return { + 'id': cfdi_id, + 'status': 'cancelled', + 'message': 'Cancelled offline, pending SAT sync', + } + + +def get_queue_status(conn, filters=None): + """Get CFDI queue items with optional filters. + + Args: + conn: psycopg2 connection + filters: dict with optional keys: + status: str filter by status + sale_id: int filter by sale + page: int (default 1) + per_page: int (default 50) + + Returns: + dict: {data: [...], pagination: {...}} + """ + filters = filters or {} + cur = conn.cursor() + + page = int(filters.get('page', 1)) + per_page = min(int(filters.get('per_page', 50)), 200) + + where_clauses = ["1=1"] + params = [] + + if filters.get('status'): + where_clauses.append("q.status = %s") + params.append(filters['status']) + + if filters.get('sale_id'): + where_clauses.append("q.sale_id = %s") + params.append(int(filters['sale_id'])) + + if filters.get('type'): + where_clauses.append("q.type = %s") + params.append(filters['type']) + + where = " AND ".join(where_clauses) + + cur.execute(f"SELECT count(*) FROM cfdi_queue q WHERE {where}", params) + total = cur.fetchone()[0] + + cur.execute(f""" + SELECT q.id, q.sale_id, q.type, q.uuid_fiscal, q.status, + q.retry_count, q.provisional_folio, q.error_message, + q.cancel_motive, q.created_at, q.stamped_at + FROM cfdi_queue q + WHERE {where} + ORDER BY q.created_at DESC + LIMIT %s OFFSET %s + """, params + [per_page, (page - 1) * per_page]) + + items = [] + for r in cur.fetchall(): + items.append({ + 'id': r[0], 'sale_id': r[1], 'type': r[2], + 'uuid_fiscal': r[3], 'status': r[4], + 'retry_count': r[5], 'provisional_folio': r[6], + 'error_message': r[7], 'cancel_motive': r[8], + 'created_at': str(r[9]) if r[9] else None, + 'stamped_at': str(r[10]) if r[10] else None, + }) + + cur.close() + total_pages = (total + per_page - 1) // per_page + return { + 'data': items, + 'pagination': { + 'page': page, 'per_page': per_page, + 'total': total, 'total_pages': total_pages, + } + }