# /home/Autopartes/pos/services/pos_engine.py """POS engine: sale processing, totals calculation, pricing, cancellation. All sale operations go through this service. Stock deductions are delegated to inventory_engine.record_sale() — this service NEVER creates inventory operations directly. Monetary amounts: NUMERIC(12,2) in DB, float in Python. Tax: 16% IVA per item (from item.tax_rate field). """ from datetime import datetime, timedelta from decimal import Decimal, ROUND_HALF_UP from flask import g from services.audit import log_action from services.inventory_engine import ( record_sale as inventory_record_sale, record_operation, get_stock, ) def _to_dec(val): """Convert a value to Decimal for precise arithmetic.""" return Decimal(str(val)) def calculate_totals(items): """Compute subtotal, discount amounts, tax, and total for a list of items. Uses Python Decimal for all intermediate calculations to avoid floating-point accumulation errors. Each line item is rounded individually before summing. Converts back to float only at the end for JSON serialization. Each item dict must have: unit_price, quantity, discount_pct, tax_rate. Returns dict with computed values and enriched items list. Args: items: list of dicts with keys: - unit_price (float): price per unit - quantity (int): number of units - discount_pct (float): discount percentage (0-100) - tax_rate (float): tax rate as decimal (e.g., 0.16 for 16%) Returns: dict: {subtotal, discount_total, tax_total, total, items: [...enriched...]} """ subtotal = Decimal('0') discount_total = Decimal('0') tax_total = Decimal('0') enriched_items = [] TWO = Decimal('0.01') for item in items: qty = int(item['quantity']) price = _to_dec(item['unit_price']) discount_pct = _to_dec(item.get('discount_pct', 0)) tax_rate = _to_dec(item.get('tax_rate', '0.16')) line_gross = (price * qty).quantize(TWO, rounding=ROUND_HALF_UP) line_discount = (line_gross * discount_pct / Decimal('100')).quantize(TWO, rounding=ROUND_HALF_UP) line_after_discount = (line_gross - line_discount).quantize(TWO, rounding=ROUND_HALF_UP) line_tax = (line_after_discount * tax_rate).quantize(TWO, rounding=ROUND_HALF_UP) line_subtotal = (line_after_discount + line_tax).quantize(TWO, rounding=ROUND_HALF_UP) subtotal += line_after_discount discount_total += line_discount tax_total += line_tax enriched_items.append({ **item, 'line_gross': float(line_gross), 'discount_amount': float(line_discount), 'tax_amount': float(line_tax), 'subtotal': float(line_subtotal), }) return { 'subtotal': float(subtotal.quantize(TWO, rounding=ROUND_HALF_UP)), 'discount_total': float(discount_total.quantize(TWO, rounding=ROUND_HALF_UP)), 'tax_total': float(tax_total.quantize(TWO, rounding=ROUND_HALF_UP)), 'total': float((subtotal + tax_total).quantize(TWO, rounding=ROUND_HALF_UP)), 'items': enriched_items, } def get_price_for_customer(inventory_item, customer): """Return the correct price based on the customer's price tier. Args: inventory_item: dict with price_1, price_2, price_3 customer: dict with price_tier (1, 2, or 3), or None for publico general Returns: float: the applicable price """ if customer is None: return float(inventory_item.get('price_1', 0)) tier = customer.get('price_tier', 1) if tier == 3: price = inventory_item.get('price_3', 0) elif tier == 2: price = inventory_item.get('price_2', 0) else: price = inventory_item.get('price_1', 0) return float(price) if price else float(inventory_item.get('price_1', 0)) def get_margin_info(inventory_item, selling_price=None, discount_pct=0): """Return cost, price, margin %, and max discount without losing margin. Only meaningful for employees with pos.view_cost permission (checked by caller). Args: inventory_item: dict with cost, price_1 (or selling_price override) selling_price: override price (if None, uses price_1) discount_pct: current discount percentage Returns: dict: {cost, price, margin_pct, max_discount_pct} """ cost = float(inventory_item.get('cost', 0)) price = float(selling_price) if selling_price else float(inventory_item.get('price_1', 0)) if price <= 0: return {'cost': cost, 'price': price, 'margin_pct': 0.0, 'max_discount_pct': 0.0} effective_price = price * (1 - discount_pct / 100) margin_pct = ((effective_price - cost) / effective_price * 100) if effective_price > 0 else 0.0 # Max discount before margin hits zero: price * (1 - d/100) = cost => d = (1 - cost/price) * 100 max_discount_pct = ((1 - cost / price) * 100) if price > cost else 0.0 return { 'cost': round(cost, 2), 'price': round(price, 2), 'margin_pct': round(margin_pct, 2), 'max_discount_pct': round(max_discount_pct, 2), } def process_sale(conn, sale_data): """Process a complete sale: validate, create records, deduct inventory, record payment. This is the main entry point for creating a sale. It handles the full transaction: 1. Validate all items exist and have sufficient stock 2. Calculate totals 3. Create sale + sale_items records 4. Call inventory_engine.record_sale() for each item (stock deduction) 5. Record payment on cash register 6. Update customer credit balance (if credit sale) 7. Create audit log entry Args: conn: psycopg2 connection to tenant DB (caller controls commit) sale_data: dict with keys: - items: [{inventory_id, quantity, unit_price, discount_pct, tax_rate}] - customer_id: int or None (publico general) - payment_method: 'efectivo' | 'transferencia' | 'tarjeta' | 'mixto' - sale_type: 'cash' | 'credit' | 'mixed' - register_id: int (cash register session ID) - amount_paid: float - payment_details: [{method, amount, reference}] (for mixed payments) - notes: str (optional) Returns: dict: complete sale object with id, items, totals, change Raises: ValueError: on validation errors (insufficient stock, invalid items, etc.) """ cur = conn.cursor() items = sale_data.get('items', []) customer_id = sale_data.get('customer_id') payment_method = sale_data.get('payment_method', 'efectivo') sale_type = sale_data.get('sale_type', 'cash') register_id = sale_data.get('register_id') amount_paid = float(sale_data.get('amount_paid', 0)) payment_details = sale_data.get('payment_details', []) notes = sale_data.get('notes') branch_id = getattr(g, 'branch_id', None) employee_id = getattr(g, 'employee_id', None) if not items: raise ValueError("No items in sale") # Validate register is open if register_id: cur.execute("SELECT status FROM cash_registers WHERE id = %s", (register_id,)) reg = cur.fetchone() if not reg or reg[0] != 'open': raise ValueError("Cash register is not open") # Validate and enrich items from inventory enriched_items = [] for item in items: inv_id = item.get('inventory_id') qty = int(item.get('quantity', 1)) if qty <= 0: raise ValueError(f"Invalid quantity for inventory_id {inv_id}") cur.execute(""" SELECT id, part_number, name, cost, price_1, price_2, price_3, tax_rate, branch_id FROM inventory WHERE id = %s AND is_active = true """, (inv_id,)) inv = cur.fetchone() if not inv: raise ValueError(f"Inventory item {inv_id} not found or inactive") # Check stock (allow negative stock for offline tolerance, but warn) current_stock = get_stock(conn, inv_id, inv[8]) # inv[8] = branch_id # Use provided price or fetch from inventory unit_price = float(item.get('unit_price', inv[4])) # default to price_1 discount_pct = float(item.get('discount_pct', 0)) tax_rate = float(item.get('tax_rate', inv[7] or 0.16)) unit_cost = float(inv[3]) if inv[3] else 0 # Validate discount against employee max max_discount = float(getattr(g, 'max_discount_pct', 100) or 100) if g.employee_role not in ('owner', 'admin') and discount_pct > max_discount: raise ValueError( f"Discount {discount_pct}% exceeds your maximum allowed {max_discount}% " f"for item {inv[2]}" ) enriched_items.append({ 'inventory_id': inv_id, 'part_number': inv[1], 'name': inv[2], 'quantity': qty, 'unit_price': unit_price, 'unit_cost': unit_cost, 'discount_pct': discount_pct, 'tax_rate': tax_rate, 'branch_id': inv[8], 'stock_before': current_stock, }) # Calculate totals totals = calculate_totals(enriched_items) # Validate credit sale if sale_type == 'credit' and customer_id: cur.execute("SELECT credit_limit, credit_balance FROM customers WHERE id = %s", (customer_id,)) cust = cur.fetchone() if cust: credit_limit = float(cust[0] or 0) credit_balance = float(cust[1] or 0) credit_available = credit_limit - credit_balance if totals['total'] > credit_available and credit_limit > 0: raise ValueError( f"Insufficient credit. Available: ${credit_available:.2f}, " f"Required: ${totals['total']:.2f}" ) # Calculate change change_given = 0.0 if sale_type == 'cash' and payment_method == 'efectivo': change_given = round(max(amount_paid - totals['total'], 0), 2) # SAT payment method codes metodo_pago_sat = 'PPD' if sale_type == 'credit' else 'PUE' forma_pago_map = { 'efectivo': '01', 'transferencia': '03', 'tarjeta': '04', 'mixto': '99' } forma_pago_sat = forma_pago_map.get(payment_method, '99') # Create sale record cur.execute(""" INSERT INTO sales (branch_id, customer_id, employee_id, register_id, sale_type, payment_method, subtotal, discount_total, tax_total, total, amount_paid, change_given, metodo_pago_sat, forma_pago_sat, status, device_id, notes) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'completed',%s,%s) RETURNING id, created_at """, ( branch_id, customer_id, employee_id, register_id, sale_type, payment_method, totals['subtotal'], totals['discount_total'], totals['tax_total'], totals['total'], amount_paid, change_given, metodo_pago_sat, forma_pago_sat, getattr(g, 'device_id', None), notes )) sale_id, created_at = cur.fetchone() # Create sale items and deduct inventory sale_items = [] for idx, item in enumerate(totals['items']): cur.execute(""" INSERT INTO sale_items (sale_id, inventory_id, part_number, name, quantity, unit_price, unit_cost, discount_pct, discount_amount, tax_rate, tax_amount, subtotal) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) RETURNING id """, ( sale_id, item['inventory_id'], item['part_number'], item['name'], item['quantity'], item['unit_price'], item.get('unit_cost', 0), item['discount_pct'], item['discount_amount'], item['tax_rate'], item['tax_amount'], item['subtotal'] )) sale_item_id = cur.fetchone()[0] # Deduct inventory via inventory_engine (NEVER create operations directly) inventory_record_sale( conn, item['inventory_id'], item.get('branch_id', branch_id), item['quantity'], sale_id=sale_id, cost_at_time=item.get('unit_cost') ) sale_items.append({ 'id': sale_item_id, 'inventory_id': item['inventory_id'], 'part_number': item['part_number'], 'name': item['name'], 'quantity': item['quantity'], 'unit_price': item['unit_price'], 'unit_cost': item.get('unit_cost', 0), 'discount_pct': item['discount_pct'], 'discount_amount': item['discount_amount'], 'tax_rate': item['tax_rate'], 'tax_amount': item['tax_amount'], 'subtotal': item['subtotal'], }) # Record payment on cash register (cash movements for efectivo) if register_id and payment_details: for pd in payment_details: method = pd.get('method', payment_method) amt = float(pd.get('amount', 0)) ref = pd.get('reference', '') cur.execute(""" INSERT INTO sale_payments (sale_id, register_id, method, amount, reference) VALUES (%s,%s,%s,%s,%s) """, (sale_id, register_id, method, amt, ref)) elif register_id: cur.execute(""" INSERT INTO sale_payments (sale_id, register_id, method, amount, reference) VALUES (%s,%s,%s,%s,%s) """, (sale_id, register_id, payment_method, amount_paid, sale_data.get('reference', ''))) # Update customer credit balance if credit sale if sale_type == 'credit' and customer_id: cur.execute(""" UPDATE customers SET credit_balance = credit_balance + %s WHERE id = %s """, (totals['total'], customer_id)) # Audit log log_action(conn, 'SALE', 'sale', sale_id, new_value={ 'total': totals['total'], 'items_count': len(sale_items), 'payment_method': payment_method, 'sale_type': sale_type, 'customer_id': customer_id, }) cur.close() return { 'id': sale_id, 'branch_id': branch_id, 'customer_id': customer_id, 'employee_id': employee_id, 'register_id': register_id, 'sale_type': sale_type, 'payment_method': payment_method, 'subtotal': totals['subtotal'], 'discount_total': totals['discount_total'], 'tax_total': totals['tax_total'], 'total': totals['total'], 'amount_paid': amount_paid, 'change_given': change_given, 'metodo_pago_sat': metodo_pago_sat, 'forma_pago_sat': forma_pago_sat, 'status': 'completed', 'items': sale_items, 'created_at': str(created_at), } def cancel_sale(conn, sale_id, reason): """Cancel a sale: validate permissions, reverse inventory, update credit. Business rules: - Cashiers can only cancel their own sales within 30 minutes - Admins and owners can cancel any sale - Cancelled sales are not deleted, just marked as 'cancelled' - Inventory is restored via RETURN operations - Customer credit balance is adjusted back Args: conn: psycopg2 connection sale_id: int reason: str (mandatory, min 3 chars) Returns: dict: cancellation result Raises: ValueError: on validation errors """ if not reason or len(reason.strip()) < 3: raise ValueError("Cancellation reason is mandatory (min 3 characters)") cur = conn.cursor() # Get sale details cur.execute(""" SELECT id, employee_id, customer_id, sale_type, total, status, created_at, branch_id, register_id FROM sales WHERE id = %s """, (sale_id,)) sale = cur.fetchone() if not sale: raise ValueError("Sale not found") s_id, s_emp_id, s_cust_id, s_type, s_total, s_status, s_created, s_branch, s_register = sale if s_status == 'cancelled': raise ValueError("Sale is already cancelled") # Permission check: cashiers can only cancel own sales within 30 min role = getattr(g, 'employee_role', 'cashier') emp_id = getattr(g, 'employee_id', None) if role == 'cashier': if s_emp_id != emp_id: raise ValueError("Cashiers can only cancel their own sales") if datetime.utcnow() - s_created > timedelta(minutes=30): raise ValueError("Cashiers can only cancel sales within 30 minutes of creation") # Get sale items for inventory reversal cur.execute(""" SELECT inventory_id, quantity, unit_cost FROM sale_items WHERE sale_id = %s """, (sale_id,)) sale_items = cur.fetchall() # Reverse inventory: create RETURN operations (positive quantity) from services.inventory_engine import record_return for inv_id, qty, cost in sale_items: record_return( conn, inv_id, s_branch, qty, sale_id=sale_id, notes=f"Cancelacion venta #{sale_id}: {reason}" ) # Update sale status cur.execute(""" UPDATE sales SET status = 'cancelled', notes = COALESCE(notes || ' | ', '') || %s WHERE id = %s """, (f"CANCELADA: {reason}", sale_id)) # Reverse customer credit balance if credit sale if s_type == 'credit' and s_cust_id: cur.execute(""" UPDATE customers SET credit_balance = credit_balance - %s WHERE id = %s """, (float(s_total), s_cust_id)) # Audit log log_action(conn, 'CANCEL', 'sale', sale_id, old_value={'status': 'completed', 'total': float(s_total)}, new_value={'status': 'cancelled', 'reason': reason}) cur.close() return { 'sale_id': sale_id, 'status': 'cancelled', 'reason': reason, 'items_reversed': len(sale_items), 'total_reversed': float(s_total), }