"""Supplier and purchase order engine. Provides CRUD for suppliers and the full purchase order lifecycle: create_po → send_po → receive_po → (optional: cancel_po) On receive, automatically: 1. Updates inventory stock via inventory_engine.record_purchase() 2. Creates accounting entry via accounting_engine.record_purchase_entry() """ from decimal import Decimal, ROUND_HALF_UP from datetime import date from services.inventory_engine import record_purchase from services.accounting_engine import record_purchase_entry from services.audit import log_action TWO = Decimal('0.01') def _to_dec(val): if val is None: return Decimal('0') return Decimal(str(val)) # ── SUPPLIER CRUD ────────────────────────────────────────────────────────── def create_supplier(conn, data): """Create a new supplier.""" cur = conn.cursor() cur.execute(""" INSERT INTO suppliers (name, contact_name, phone, email, rfc, address, payment_terms, notes) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING id """, ( data['name'], data.get('contact_name'), data.get('phone'), data.get('email'), data.get('rfc'), data.get('address'), data.get('payment_terms'), data.get('notes') )) supplier_id = cur.fetchone()[0] cur.close() log_action(conn, 'SUPPLIER_CREATE', 'supplier', supplier_id, new_value={'name': data['name']}) return supplier_id def update_supplier(conn, supplier_id, data): """Update supplier fields.""" allowed = ['name', 'contact_name', 'phone', 'email', 'rfc', 'address', 'payment_terms', 'notes', 'is_active'] sets = [] vals = [] for k in allowed: if k in data: sets.append(f"{k} = %s") vals.append(data[k]) if not sets: return False vals.append(supplier_id) cur = conn.cursor() cur.execute(f""" UPDATE suppliers SET {', '.join(sets)}, updated_at = NOW() WHERE id = %s """, vals) updated = cur.rowcount > 0 cur.close() if updated: log_action(conn, 'SUPPLIER_UPDATE', 'supplier', supplier_id, new_value=data) return updated def get_supplier(conn, supplier_id): """Get single supplier by ID.""" cur = conn.cursor() cur.execute(""" SELECT id, name, contact_name, phone, email, rfc, address, payment_terms, notes, is_active, created_at FROM suppliers WHERE id = %s """, (supplier_id,)) row = cur.fetchone() cur.close() if not row: return None return { 'id': row[0], 'name': row[1], 'contact_name': row[2], 'phone': row[3], 'email': row[4], 'rfc': row[5], 'address': row[6], 'payment_terms': row[7], 'notes': row[8], 'is_active': row[9], 'created_at': str(row[10]), } def list_suppliers(conn, active_only=True, limit=100, offset=0): """List suppliers.""" cur = conn.cursor() where = "WHERE is_active = true" if active_only else "" cur.execute(f""" SELECT id, name, contact_name, phone, email, rfc, is_active FROM suppliers {where} ORDER BY name LIMIT %s OFFSET %s """, (limit, offset)) rows = cur.fetchall() cur.close() return [{ 'id': r[0], 'name': r[1], 'contact_name': r[2], 'phone': r[3], 'email': r[4], 'rfc': r[5], 'is_active': r[6], } for r in rows] # ── PURCHASE ORDERS ──────────────────────────────────────────────────────── def create_po(conn, data, branch_id=None, employee_id=None): """Create a purchase order with items. Args: data: dict with keys: supplier_id: int items: [{inventory_id|part_number|name, quantity, unit_price, notes}] notes: str (optional) expected_date: str 'YYYY-MM-DD' (optional) currency: 'MXN'|'USD' (default 'MXN') exchange_rate: float (optional) Returns: dict: {po_id, status, total, item_count} """ supplier_id = data.get('supplier_id') items = data.get('items', []) if not items: raise ValueError("No items in purchase order") currency = data.get('currency', 'MXN') exchange_rate = float(data.get('exchange_rate', 1.0)) # Calculate totals subtotal = Decimal('0') po_items = [] for item in items: qty = int(item.get('quantity', 1)) price = _to_dec(item.get('unit_price', 0)) line_sub = (price * qty).quantize(TWO, ROUND_HALF_UP) subtotal += line_sub po_items.append({ 'inventory_id': item.get('inventory_id'), 'part_number': item.get('part_number', ''), 'name': item.get('name', ''), 'quantity': qty, 'unit_price': float(price), 'subtotal': float(line_sub), 'notes': item.get('notes'), }) tax_rate = Decimal('0.16') tax_total = (subtotal * tax_rate).quantize(TWO, ROUND_HALF_UP) total = (subtotal + tax_total).quantize(TWO, ROUND_HALF_UP) cur = conn.cursor() cur.execute(""" INSERT INTO purchase_orders (supplier_id, branch_id, employee_id, status, subtotal, tax_total, total, currency, exchange_rate, notes, expected_date) VALUES (%s, %s, %s, 'draft', %s, %s, %s, %s, %s, %s, %s) RETURNING id """, ( supplier_id, branch_id, employee_id, float(subtotal), float(tax_total), float(total), currency, exchange_rate, data.get('notes'), data.get('expected_date') )) po_id = cur.fetchone()[0] for item in po_items: cur.execute(""" INSERT INTO purchase_order_items (po_id, inventory_id, part_number, name, quantity, unit_price, subtotal, notes) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) """, ( po_id, item['inventory_id'], item['part_number'], item['name'], item['quantity'], item['unit_price'], item['subtotal'], item['notes'] )) cur.close() log_action(conn, 'PO_CREATE', 'purchase_order', po_id, new_value={'supplier_id': supplier_id, 'total': float(total)}) return { 'po_id': po_id, 'status': 'draft', 'total': float(total), 'item_count': len(po_items), } def send_po(conn, po_id): """Mark PO as sent to supplier.""" cur = conn.cursor() cur.execute(""" UPDATE purchase_orders SET status = 'sent', sent_at = NOW() WHERE id = %s AND status = 'draft' """, (po_id,)) updated = cur.rowcount > 0 cur.close() if updated: log_action(conn, 'PO_SEND', 'purchase_order', po_id) return updated def receive_po(conn, po_id, received_items, supplier_invoice=None, notes=None): """Receive items from a PO. Updates stock and accounting. Args: received_items: list of {po_item_id, quantity} (quantity = qty received now) Returns: dict: {po_id, status, received_total} """ cur = conn.cursor() # Lock PO row cur.execute(""" SELECT id, supplier_id, branch_id, status, subtotal, tax_total, total, currency, exchange_rate FROM purchase_orders WHERE id = %s FOR UPDATE """, (po_id,)) po = cur.fetchone() if not po: raise ValueError("Purchase order not found") if po[3] == 'cancelled': raise ValueError("Cannot receive a cancelled PO") if po[3] == 'received': raise ValueError("PO already fully received") po_supplier_id = po[1] po_branch_id = po[2] po_currency = po[7] po_rate = float(po[8]) # Process each received item total_received_qty = 0 purchase_total_mxn = Decimal('0') for recv in received_items: poi_id = recv['po_item_id'] qty = int(recv['quantity']) if qty <= 0: continue cur.execute(""" SELECT inventory_id, part_number, name, quantity, received_qty, unit_price FROM purchase_order_items WHERE id = %s AND po_id = %s """, (poi_id, po_id)) row = cur.fetchone() if not row: raise ValueError(f"PO item {poi_id} not found") inv_id, part_num, name, ordered_qty, already_received, unit_price = row already_received = already_received or 0 new_received = already_received + qty if new_received > ordered_qty: raise ValueError( f"Cannot receive {qty} of {name or part_num}: " f"ordered={ordered_qty}, already_received={already_received}" ) # Update received quantity cur.execute(""" UPDATE purchase_order_items SET received_qty = %s WHERE id = %s """, (new_received, poi_id)) # Record inventory purchase (if linked to inventory) if inv_id and po_branch_id: record_purchase( conn, inv_id, po_branch_id, qty, unit_price, supplier_invoice=supplier_invoice, notes=f"Recepcion OC #{po_id}: {notes or ''}" ) # Accumulate for accounting (convert to MXN if needed) line_mxn = _to_dec(unit_price) * qty * _to_dec(po_rate) purchase_total_mxn += line_mxn.quantize(TWO, ROUND_HALF_UP) total_received_qty += qty # Determine new PO status cur.execute(""" SELECT SUM(quantity), SUM(received_qty) FROM purchase_order_items WHERE po_id = %s """, (po_id,)) totals = cur.fetchone() ordered_total = totals[0] or 0 received_total = totals[1] or 0 new_status = 'partial' if received_total < ordered_total else 'received' cur.execute(""" UPDATE purchase_orders SET status = %s, received_at = NOW(), supplier_invoice = COALESCE(%s, supplier_invoice), notes = COALESCE(notes || ' | ', '') || %s WHERE id = %s """, (new_status, supplier_invoice, f"Recepcion: {received_total} de {ordered_total} uds" + (f" | Factura: {supplier_invoice}" if supplier_invoice else ""), po_id)) cur.close() # Accounting entry (non-blocking) if purchase_total_mxn > 0: try: tax_mxn = (purchase_total_mxn * Decimal('0.16')).quantize(TWO, ROUND_HALF_UP) total_mxn = (purchase_total_mxn + tax_mxn).quantize(TWO, ROUND_HALF_UP) record_purchase_entry(conn, { 'reference_id': po_id, 'subtotal': float(purchase_total_mxn), 'tax_amount': float(tax_mxn), 'total': float(total_mxn), 'supplier_name': _get_supplier_name(conn, po_supplier_id), }) except Exception: pass # Accounting errors never block receiving log_action(conn, 'PO_RECEIVE', 'purchase_order', po_id, new_value={'received_qty': received_total, 'status': new_status}) return { 'po_id': po_id, 'status': new_status, 'received_total': received_total, 'ordered_total': ordered_total, } def cancel_po(conn, po_id, reason): """Cancel a PO. Only allowed if not fully received.""" if not reason or len(reason.strip()) < 3: raise ValueError("Cancellation reason is mandatory (min 3 characters)") cur = conn.cursor() cur.execute("SELECT status FROM purchase_orders WHERE id = %s", (po_id,)) row = cur.fetchone() if not row: raise ValueError("PO not found") if row[0] == 'received': raise ValueError("Cannot cancel a fully received PO") if row[0] == 'cancelled': raise ValueError("PO is already cancelled") cur.execute(""" UPDATE purchase_orders SET status = 'cancelled', cancelled_at = NOW(), notes = COALESCE(notes || ' | ', '') || %s WHERE id = %s """, (f"CANCELADA: {reason}", po_id)) cur.close() log_action(conn, 'PO_CANCEL', 'purchase_order', po_id, new_value={'reason': reason}) return True def get_po(conn, po_id): """Get full PO with items.""" cur = conn.cursor() cur.execute(""" SELECT po.id, po.status, po.subtotal, po.tax_total, po.total, po.currency, po.exchange_rate, po.notes, po.supplier_invoice, po.expected_date, po.sent_at, po.received_at, po.cancelled_at, po.created_at, s.name as supplier_name FROM purchase_orders po LEFT JOIN suppliers s ON po.supplier_id = s.id WHERE po.id = %s """, (po_id,)) po_row = cur.fetchone() if not po_row: cur.close() return None cur.execute(""" SELECT id, inventory_id, part_number, name, quantity, received_qty, unit_price, subtotal, notes FROM purchase_order_items WHERE po_id = %s """, (po_id,)) items = [] for r in cur.fetchall(): items.append({ 'id': r[0], 'inventory_id': r[1], 'part_number': r[2], 'name': r[3], 'quantity': r[4], 'received_qty': r[5], 'unit_price': float(r[6]), 'subtotal': float(r[7]), 'notes': r[8], }) cur.close() return { 'id': po_row[0], 'status': po_row[1], 'subtotal': float(po_row[2]), 'tax_total': float(po_row[3]), 'total': float(po_row[4]), 'currency': po_row[5], 'exchange_rate': float(po_row[6]), 'notes': po_row[7], 'supplier_invoice': po_row[8], 'expected_date': str(po_row[9]) if po_row[9] else None, 'sent_at': str(po_row[10]) if po_row[10] else None, 'received_at': str(po_row[11]) if po_row[11] else None, 'cancelled_at': str(po_row[12]) if po_row[12] else None, 'created_at': str(po_row[13]), 'supplier_name': po_row[14], 'items': items, } def list_pos(conn, status=None, supplier_id=None, limit=50, offset=0): """List purchase orders.""" cur = conn.cursor() filters = [] vals = [] if status: filters.append("po.status = %s") vals.append(status) if supplier_id: filters.append("po.supplier_id = %s") vals.append(supplier_id) where = "WHERE " + " AND ".join(filters) if filters else "" cur.execute(f""" SELECT po.id, po.status, po.total, po.currency, s.name as supplier_name, po.created_at FROM purchase_orders po LEFT JOIN suppliers s ON po.supplier_id = s.id {where} ORDER BY po.created_at DESC LIMIT %s OFFSET %s """, vals + [limit, offset]) rows = cur.fetchall() cur.close() return [{ 'id': r[0], 'status': r[1], 'total': float(r[2]), 'currency': r[3], 'supplier_name': r[4], 'created_at': str(r[5]), } for r in rows] # ── HELPERS ──────────────────────────────────────────────────────────────── def _get_supplier_name(conn, supplier_id): if not supplier_id: return 'Proveedor' cur = conn.cursor() cur.execute("SELECT name FROM suppliers WHERE id = %s", (supplier_id,)) row = cur.fetchone() cur.close() return row[0] if row else 'Proveedor'