Files
Autoparts-DB/pos/blueprints/invoicing_bp.py
consultoria-as bc950efc26 fix(pos): resolve integration test failures for CFDI + accounting
- Fix sat_accounts.sql: split multi-row INSERT into individual statements
  so parent_id subqueries resolve correctly (was producing all NULLs)
- Add tenant_config table to v1.0 schema (required by CFDI invoicing)
- Seed tenant_config with RFC/regimen during tenant provisioning
- Fix cancel_sale to pass complete sale data for accounting reversal
- Fix CFDI XML builder: use `or` instead of dict.get() defaults to
  handle explicit None values from DB (clave_prod_serv, clave_unidad)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 04:54:50 +00:00

400 lines
13 KiB
Python

# /home/Autopartes/pos/blueprints/invoicing_bp.py
"""Invoicing blueprint: CFDI generation, queue management, cancellation.
All CFDI business logic lives in services (cfdi_builder, cfdi_queue).
This blueprint is the HTTP layer that validates input and returns JSON.
"""
import json
from flask import Blueprint, request, jsonify, g
from middleware import require_auth
from tenant_db import get_tenant_conn
from services.cfdi_builder import build_ingreso_xml, build_egreso_xml, build_pago_xml
from services.cfdi_queue import (
enqueue_cfdi, process_queue, retry_failed,
cancel_cfdi, get_queue_status,
)
from services.audit import log_action
# Flask blueprint holding the invoicing endpoints; every route registered on it
# is served under the /pos/api/invoicing prefix.
invoicing_bp = Blueprint('invoicing', __name__, url_prefix='/pos/api/invoicing')
def _get_tenant_config(cur):
    """Load the tenant's CFDI configuration from the ``tenant_config`` table.

    Reads every row whose key matches ``cfdi_%`` or ``tenant_%`` and maps the
    settings used by this blueprint onto a flat dict.

    Args:
        cur: Open DB cursor on the tenant database.

    Returns:
        dict with keys ``rfc``, ``razon_social``, ``regimen_fiscal``, ``cp``,
        ``serie``, ``horux_api_url`` and ``horux_api_key``. A setting that is
        missing, NULL or empty in the table is replaced by its default
        ('' unless noted below).
    """
    cur.execute("SELECT key, value FROM tenant_config WHERE key LIKE 'cfdi_%' OR key LIKE 'tenant_%'")
    config = {row[0]: row[1] for row in cur.fetchall()}

    def setting(key, default=''):
        # `or` instead of a dict.get() default: a row that exists with a NULL
        # value would otherwise leak None past the default.
        return config.get(key) or default

    return {
        'rfc': setting('tenant_rfc'),
        'razon_social': setting('tenant_razon_social'),
        'regimen_fiscal': setting('cfdi_regimen_fiscal', '601'),
        'cp': setting('tenant_cp', '00000'),
        'serie': setting('cfdi_serie', 'A'),
        'horux_api_url': setting('cfdi_horux_api_url'),
        'horux_api_key': setting('cfdi_horux_api_key'),
    }
def _get_sale_with_items(cur, sale_id):
    """Load a sale and its line items for CFDI generation.

    Args:
        cur: Open DB cursor on the tenant database.
        sale_id: Primary key of the row in ``sales``.

    Returns:
        None when no sale has that id. Otherwise a dict of the sale columns
        plus an ``items`` list (ordered by item id). Monetary columns are
        converted to float, with NULL/zero reported as 0. NULL SAT fields get
        defaults: ``metodo_pago_sat`` 'PUE', ``forma_pago_sat`` '01',
        ``clave_prod_serv`` '25174800', ``clave_unidad`` 'H87'. An item's
        ``tax_rate`` defaults to 0.16 only when the column is NULL.
    """
    def amount(value):
        # NULL and zero both collapse to 0.
        return float(value) if value else 0

    cur.execute("""
        SELECT id, branch_id, customer_id, employee_id, sale_type,
               payment_method, subtotal, discount_total, tax_total, total,
               metodo_pago_sat, forma_pago_sat, status, created_at
        FROM sales WHERE id = %s
    """, (sale_id,))
    row = cur.fetchone()
    if not row:
        return None

    sale = {
        'id': row[0], 'branch_id': row[1], 'customer_id': row[2],
        'employee_id': row[3], 'sale_type': row[4],
        'payment_method': row[5],
        'subtotal': amount(row[6]),
        'discount_total': amount(row[7]),
        'tax_total': amount(row[8]),
        'total': amount(row[9]),
        'metodo_pago_sat': row[10] or 'PUE',
        'forma_pago_sat': row[11] or '01',
        'status': row[12],
        'created_at': str(row[13]),
    }

    cur.execute("""
        SELECT id, inventory_id, part_number, name, quantity, unit_price,
               unit_cost, discount_pct, discount_amount, tax_rate, tax_amount,
               subtotal, clave_prod_serv, clave_unidad
        FROM sale_items WHERE sale_id = %s ORDER BY id
    """, (sale_id,))
    sale['items'] = [
        {
            'id': r[0], 'inventory_id': r[1], 'part_number': r[2],
            'name': r[3], 'quantity': r[4],
            'unit_price': amount(r[5]),
            'unit_cost': amount(r[6]),
            'discount_pct': amount(r[7]),
            'discount_amount': amount(r[8]),
            # Only a NULL rate falls back to 16%. A plain truthiness test
            # would turn an explicit 0 rate (zero-rated item) into 16%.
            'tax_rate': float(r[9]) if r[9] is not None else 0.16,
            'tax_amount': amount(r[10]),
            'subtotal': amount(r[11]),
            'clave_prod_serv': r[12] or '25174800',
            'clave_unidad': r[13] or 'H87',
        }
        for r in cur.fetchall()
    ]
    return sale
def _get_customer(cur, customer_id):
    """Fetch the customer fields used for the CFDI receptor.

    Returns None when ``customer_id`` is falsy (no query is issued) or when no
    customer row matches. Otherwise returns a dict with ``id``, ``name``,
    ``rfc``, ``razon_social``, ``regimen_fiscal``, ``uso_cfdi`` and ``cp``;
    an empty/NULL ``uso_cfdi`` is reported as 'G03'.
    """
    if customer_id:
        cur.execute("""
            SELECT id, name, rfc, razon_social, regimen_fiscal, uso_cfdi, cp
            FROM customers WHERE id = %s
        """, (customer_id,))
        record = cur.fetchone()
        if record:
            return {
                'id': record[0],
                'name': record[1],
                'rfc': record[2],
                'razon_social': record[3],
                'regimen_fiscal': record[4],
                'uso_cfdi': record[5] or 'G03',
                'cp': record[6],
            }
    return None
# ─── Generate CFDI ─────────────────────────────────
@invoicing_bp.route('/invoice', methods=['POST'])
@require_auth('invoicing.create')
def generate_invoice():
    """Generate a CFDI for a sale and enqueue it for timbrado.

    Body: {
        sale_id: int,
        type: 'ingreso' (default) | 'egreso',
        original_uuid: str (required for egreso)
    }

    Responses:
        201 with the ``enqueue_cfdi`` result on success.
        400 when ``sale_id`` is missing, the tenant RFC is not configured, the
            sale is cancelled, ``original_uuid`` is missing for an egreso, the
            type is unknown, or a ValueError is raised while building/enqueueing.
        404 when the sale does not exist.
        409 when the sale already has a queue entry of this type that is
            neither cancelled nor failed.
        500 on any other exception.
    """
    data = request.get_json() or {}
    sale_id = data.get('sale_id')
    cfdi_type = data.get('type', 'ingreso')
    if not sale_id:
        return jsonify({'error': 'sale_id is required'}), 400

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        tenant_config = _get_tenant_config(cur)
        if not tenant_config['rfc']:
            return jsonify({'error': 'Tenant RFC not configured. Set tenant_rfc in config.'}), 400

        sale = _get_sale_with_items(cur, sale_id)
        if not sale:
            return jsonify({'error': 'Sale not found'}), 404
        if sale['status'] == 'cancelled':
            return jsonify({'error': 'Cannot invoice a cancelled sale'}), 400

        customer = _get_customer(cur, sale.get('customer_id'))

        # Refuse duplicates: any queue entry of this type that is still live
        # (not cancelled, not failed) blocks a new one.
        cur.execute("""
            SELECT id, status FROM cfdi_queue
            WHERE sale_id = %s AND type = %s AND status NOT IN ('cancelled', 'failed')
        """, (sale_id, cfdi_type))
        existing = cur.fetchone()
        if existing:
            return jsonify({
                'error': f'Sale #{sale_id} already has a {cfdi_type} CFDI (queue #{existing[0]}, status: {existing[1]})'
            }), 409

        # Build XML
        if cfdi_type == 'ingreso':
            xml = build_ingreso_xml(sale, tenant_config, customer)
        elif cfdi_type == 'egreso':
            original_uuid = data.get('original_uuid')
            if not original_uuid:
                return jsonify({'error': 'original_uuid required for egreso'}), 400
            xml = build_egreso_xml(sale, tenant_config, customer, original_uuid)
        else:
            return jsonify({'error': f'Invalid CFDI type: {cfdi_type}'}), 400

        # Enqueue and audit in the same transaction.
        result = enqueue_cfdi(conn, sale_id, cfdi_type, xml)
        log_action(conn, 'CFDI_GENERATED', 'cfdi_queue', result['id'],
                   new_value={'sale_id': sale_id, 'type': cfdi_type,
                              'folio': result['provisional_folio']})
        conn.commit()
        return jsonify(result), 201
    except ValueError as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        # Single cleanup point: the early returns above used to leave the
        # cursor and connection open, and closing before jsonify() meant a
        # serialization error triggered rollback() on a closed connection.
        cur.close()
        conn.close()
# ─── Queue Management ──────────────────────────────
@invoicing_bp.route('/queue', methods=['GET'])
@require_auth('invoicing.view')
def list_queue():
    """List CFDI queue items.

    Query params: status, sale_id, type, page, per_page

    The params are forwarded to ``get_queue_status`` as received: values taken
    from the query string are strings, while the ``page``/``per_page``
    defaults (1 and 50) are ints. Returns the service result as JSON.
    """
    conn = get_tenant_conn(g.tenant_id)
    try:
        filters = {
            'status': request.args.get('status'),
            'sale_id': request.args.get('sale_id'),
            'type': request.args.get('type'),
            'page': request.args.get('page', 1),
            'per_page': request.args.get('per_page', 50),
        }
        return jsonify(get_queue_status(conn, filters))
    finally:
        # Close even when the service raises, so the connection is not leaked.
        conn.close()
@invoicing_bp.route('/queue/<int:cfdi_id>', methods=['GET'])
@require_auth('invoicing.view')
def get_queue_item(cfdi_id):
    """Get CFDI queue item detail (includes XML).

    Returns the ``cfdi_queue`` row as JSON, including both the unsigned and
    signed XML; ``created_at`` and ``stamped_at`` are stringified, or null
    when unset. Responds 404 when no row has the given id.
    """
    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT q.id, q.sale_id, q.type, q.xml_unsigned, q.xml_signed,
                   q.uuid_fiscal, q.status, q.retry_count, q.provisional_folio,
                   q.error_message, q.cancel_motive, q.cancel_replacement_uuid,
                   q.created_at, q.stamped_at
            FROM cfdi_queue q WHERE q.id = %s
        """, (cfdi_id,))
        row = cur.fetchone()
        if not row:
            return jsonify({'error': 'CFDI queue item not found'}), 404

        item = {
            'id': row[0], 'sale_id': row[1], 'type': row[2],
            'xml_unsigned': row[3], 'xml_signed': row[4],
            'uuid_fiscal': row[5], 'status': row[6],
            'retry_count': row[7], 'provisional_folio': row[8],
            'error_message': row[9], 'cancel_motive': row[10],
            'cancel_replacement_uuid': row[11],
            'created_at': str(row[12]) if row[12] else None,
            'stamped_at': str(row[13]) if row[13] else None,
        }
        return jsonify(item)
    finally:
        # One cleanup path for the 404, the success case and a failing query.
        cur.close()
        conn.close()
@invoicing_bp.route('/queue/process', methods=['POST'])
@require_auth('invoicing.create')
def trigger_process_queue():
    """Manually trigger processing of pending CFDI queue items.

    Calls ``retry_failed`` first and then ``process_queue``; the JSON response
    is the ``process_queue`` result with the ``retry_failed`` return value
    added under ``retries_reset``.

    Responds 400 when the Horux API URL or key is not configured and 500 (after
    a rollback) on any exception.
    """
    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        tenant_config = _get_tenant_config(cur)
        horux_url = tenant_config.get('horux_api_url')
        horux_key = tenant_config.get('horux_api_key')
        if not horux_url or not horux_key:
            return jsonify({'error': 'Horux API not configured'}), 400

        # Reset eligible failed items first
        reset_count = retry_failed(conn)

        # Process the queue
        # NOTE(review): this route never commits; presumably retry_failed and
        # process_queue commit on `conn` themselves - confirm.
        result = process_queue(conn, horux_url, horux_key)
        result['retries_reset'] = reset_count
        return jsonify(result)
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        # Closing here (not before jsonify) keeps the connection open for the
        # rollback above if building the response fails.
        cur.close()
        conn.close()
# ─── Cancel CFDI ────────────────────────────────────
@invoicing_bp.route('/cancel/<int:cfdi_id>', methods=['POST'])
@require_auth('invoicing.delete')
def cancel_invoice(cfdi_id):
    """Cancel a CFDI with SAT motive code.

    Body: {
        motive: '01' | '02' | '03' | '04',
        replacement_uuid: str (required if motive == '01')
    }

    Only owner and admin can cancel CFDIs.

    Responses:
        200 with the ``cancel_cfdi`` result on success.
        400 when ``motive`` is missing or ``cancel_cfdi`` raises ValueError.
        403 when the employee role is neither 'owner' nor 'admin'.
        500 on any other exception.

    This route only checks that ``motive`` is present; the code itself and the
    replacement UUID are passed to ``cancel_cfdi`` unvalidated.
    """
    if g.employee_role not in ('owner', 'admin'):
        return jsonify({'error': 'Only owner or admin can cancel CFDIs'}), 403

    data = request.get_json() or {}
    motive = data.get('motive')
    replacement_uuid = data.get('replacement_uuid')
    if not motive:
        return jsonify({'error': 'motive is required'}), 400

    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        tenant_config = _get_tenant_config(cur)
        result = cancel_cfdi(
            conn, cfdi_id, motive, replacement_uuid,
            tenant_config.get('horux_api_url'),
            tenant_config.get('horux_api_key'),
        )
        # Audit entry is committed together with the cancellation.
        log_action(conn, 'CFDI_CANCELLED', 'cfdi_queue', cfdi_id,
                   new_value={'motive': motive, 'replacement_uuid': replacement_uuid})
        conn.commit()
        return jsonify(result)
    except ValueError as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        conn.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        # Closing here (not before jsonify) keeps the connection usable for
        # the rollback above if building the response fails.
        cur.close()
        conn.close()
# ─── PDF Generation ─────────────────────────────────
@invoicing_bp.route('/<int:sale_id>/pdf', methods=['GET'])
@require_auth('invoicing.view')
def get_sale_pdf(sale_id):
    """Return the data needed to render a printable sale/CFDI document.

    Despite the route name, this endpoint produces neither a PDF nor HTML: it
    returns JSON with four keys:
        sale: the sale with its items,
        tenant: rfc, razon_social, regimen_fiscal and cp of the issuer,
        customer: the customer dict, or null when the sale has none,
        cfdi: the most recently stamped 'ingreso' CFDI for the sale
              (uuid_fiscal, provisional_folio, status, stamped_at), or null.

    Responds 404 when the sale does not exist.
    """
    conn = get_tenant_conn(g.tenant_id)
    cur = conn.cursor()
    try:
        sale = _get_sale_with_items(cur, sale_id)
        if not sale:
            return jsonify({'error': 'Sale not found'}), 404

        tenant_config = _get_tenant_config(cur)
        customer = _get_customer(cur, sale.get('customer_id'))

        # Check if there's a stamped CFDI; only the latest one is reported.
        cur.execute("""
            SELECT uuid_fiscal, provisional_folio, status, stamped_at
            FROM cfdi_queue
            WHERE sale_id = %s AND type = 'ingreso' AND status = 'stamped'
            ORDER BY stamped_at DESC LIMIT 1
        """, (sale_id,))
        cfdi_row = cur.fetchone()
        cfdi_info = None
        if cfdi_row:
            cfdi_info = {
                'uuid_fiscal': cfdi_row[0],
                'provisional_folio': cfdi_row[1],
                'status': cfdi_row[2],
                'stamped_at': str(cfdi_row[3]) if cfdi_row[3] else None,
            }

        return jsonify({
            'sale': sale,
            'tenant': {
                'rfc': tenant_config.get('rfc', ''),
                'razon_social': tenant_config.get('razon_social', ''),
                'regimen_fiscal': tenant_config.get('regimen_fiscal', ''),
                'cp': tenant_config.get('cp', ''),
            },
            'customer': customer,
            'cfdi': cfdi_info,
        })
    finally:
        # Release the cursor and connection on every path, including a
        # failing query, which previously leaked both.
        cur.close()
        conn.close()