diff --git a/pos/services/pos_engine.py b/pos/services/pos_engine.py new file mode 100644 index 0000000..619e579 --- /dev/null +++ b/pos/services/pos_engine.py @@ -0,0 +1,488 @@ +# /home/Autopartes/pos/services/pos_engine.py +"""POS engine: sale processing, totals calculation, pricing, cancellation. + +All sale operations go through this service. Stock deductions are delegated +to inventory_engine.record_sale() — this service NEVER creates inventory +operations directly. + +Monetary amounts: NUMERIC(12,2) in DB, float in Python. +Tax: 16% IVA per item (from item.tax_rate field). +""" + +from datetime import datetime, timedelta +from decimal import Decimal, ROUND_HALF_UP +from flask import g +from services.audit import log_action +from services.inventory_engine import ( + record_sale as inventory_record_sale, + record_operation, + get_stock, +) + + +def _to_dec(val): + """Convert a value to Decimal for precise arithmetic.""" + return Decimal(str(val)) + + +def calculate_totals(items): + """Compute subtotal, discount amounts, tax, and total for a list of items. + + Uses Python Decimal for all intermediate calculations to avoid + floating-point accumulation errors. Each line item is rounded + individually before summing. Converts back to float only at + the end for JSON serialization. + + Each item dict must have: unit_price, quantity, discount_pct, tax_rate. + Returns dict with computed values and enriched items list. + + Args: + items: list of dicts with keys: + - unit_price (float): price per unit + - quantity (int): number of units + - discount_pct (float): discount percentage (0-100) + - tax_rate (float): tax rate as decimal (e.g., 0.16 for 16%) + + Returns: + dict: {subtotal, discount_total, tax_total, total, items: [...enriched...]} + """ + subtotal = Decimal('0') + discount_total = Decimal('0') + tax_total = Decimal('0') + enriched_items = [] + TWO = Decimal('0.01') + + for item in items: + qty = int(item['quantity']) + price = _to_dec(item['unit_price']) + discount_pct = _to_dec(item.get('discount_pct', 0)) + tax_rate = _to_dec(item.get('tax_rate', '0.16')) + + line_gross = (price * qty).quantize(TWO, rounding=ROUND_HALF_UP) + line_discount = (line_gross * discount_pct / Decimal('100')).quantize(TWO, rounding=ROUND_HALF_UP) + line_after_discount = (line_gross - line_discount).quantize(TWO, rounding=ROUND_HALF_UP) + line_tax = (line_after_discount * tax_rate).quantize(TWO, rounding=ROUND_HALF_UP) + line_subtotal = (line_after_discount + line_tax).quantize(TWO, rounding=ROUND_HALF_UP) + + subtotal += line_after_discount + discount_total += line_discount + tax_total += line_tax + + enriched_items.append({ + **item, + 'line_gross': float(line_gross), + 'discount_amount': float(line_discount), + 'tax_amount': float(line_tax), + 'subtotal': float(line_subtotal), + }) + + return { + 'subtotal': float(subtotal.quantize(TWO, rounding=ROUND_HALF_UP)), + 'discount_total': float(discount_total.quantize(TWO, rounding=ROUND_HALF_UP)), + 'tax_total': float(tax_total.quantize(TWO, rounding=ROUND_HALF_UP)), + 'total': float((subtotal + tax_total).quantize(TWO, rounding=ROUND_HALF_UP)), + 'items': enriched_items, + } + + +def get_price_for_customer(inventory_item, customer): + """Return the correct price based on the customer's price tier. + + Args: + inventory_item: dict with price_1, price_2, price_3 + customer: dict with price_tier (1, 2, or 3), or None for publico general + + Returns: + float: the applicable price + """ + if customer is None: + return float(inventory_item.get('price_1', 0)) + + tier = customer.get('price_tier', 1) + if tier == 3: + price = inventory_item.get('price_3', 0) + elif tier == 2: + price = inventory_item.get('price_2', 0) + else: + price = inventory_item.get('price_1', 0) + + return float(price) if price else float(inventory_item.get('price_1', 0)) + + +def get_margin_info(inventory_item, selling_price=None, discount_pct=0): + """Return cost, price, margin %, and max discount without losing margin. + + Only meaningful for employees with pos.view_cost permission (checked by caller). + + Args: + inventory_item: dict with cost, price_1 (or selling_price override) + selling_price: override price (if None, uses price_1) + discount_pct: current discount percentage + + Returns: + dict: {cost, price, margin_pct, max_discount_pct} + """ + cost = float(inventory_item.get('cost', 0)) + price = float(selling_price) if selling_price else float(inventory_item.get('price_1', 0)) + + if price <= 0: + return {'cost': cost, 'price': price, 'margin_pct': 0.0, 'max_discount_pct': 0.0} + + effective_price = price * (1 - discount_pct / 100) + margin_pct = ((effective_price - cost) / effective_price * 100) if effective_price > 0 else 0.0 + + # Max discount before margin hits zero: price * (1 - d/100) = cost => d = (1 - cost/price) * 100 + max_discount_pct = ((1 - cost / price) * 100) if price > cost else 0.0 + + return { + 'cost': round(cost, 2), + 'price': round(price, 2), + 'margin_pct': round(margin_pct, 2), + 'max_discount_pct': round(max_discount_pct, 2), + } + + +def process_sale(conn, sale_data): + """Process a complete sale: validate, create records, deduct inventory, record payment. + + This is the main entry point for creating a sale. It handles the full transaction: + 1. Validate all items exist and have sufficient stock + 2. Calculate totals + 3. Create sale + sale_items records + 4. Call inventory_engine.record_sale() for each item (stock deduction) + 5. Record payment on cash register + 6. Update customer credit balance (if credit sale) + 7. Create audit log entry + + Args: + conn: psycopg2 connection to tenant DB (caller controls commit) + sale_data: dict with keys: + - items: [{inventory_id, quantity, unit_price, discount_pct, tax_rate}] + - customer_id: int or None (publico general) + - payment_method: 'efectivo' | 'transferencia' | 'tarjeta' | 'mixto' + - sale_type: 'cash' | 'credit' | 'mixed' + - register_id: int (cash register session ID) + - amount_paid: float + - payment_details: [{method, amount, reference}] (for mixed payments) + - notes: str (optional) + + Returns: + dict: complete sale object with id, items, totals, change + + Raises: + ValueError: on validation errors (insufficient stock, invalid items, etc.) + """ + cur = conn.cursor() + items = sale_data.get('items', []) + customer_id = sale_data.get('customer_id') + payment_method = sale_data.get('payment_method', 'efectivo') + sale_type = sale_data.get('sale_type', 'cash') + register_id = sale_data.get('register_id') + amount_paid = float(sale_data.get('amount_paid', 0)) + payment_details = sale_data.get('payment_details', []) + notes = sale_data.get('notes') + branch_id = getattr(g, 'branch_id', None) + employee_id = getattr(g, 'employee_id', None) + + if not items: + raise ValueError("No items in sale") + + # Validate register is open + if register_id: + cur.execute("SELECT status FROM cash_registers WHERE id = %s", (register_id,)) + reg = cur.fetchone() + if not reg or reg[0] != 'open': + raise ValueError("Cash register is not open") + + # Validate and enrich items from inventory + enriched_items = [] + for item in items: + inv_id = item.get('inventory_id') + qty = int(item.get('quantity', 1)) + if qty <= 0: + raise ValueError(f"Invalid quantity for inventory_id {inv_id}") + + cur.execute(""" + SELECT id, part_number, name, cost, price_1, price_2, price_3, + tax_rate, branch_id + FROM inventory WHERE id = %s AND is_active = true + """, (inv_id,)) + inv = cur.fetchone() + if not inv: + raise ValueError(f"Inventory item {inv_id} not found or inactive") + + # Check stock (allow negative stock for offline tolerance, but warn) + current_stock = get_stock(conn, inv_id, inv[8]) # inv[8] = branch_id + + # Use provided price or fetch from inventory + unit_price = float(item.get('unit_price', inv[4])) # default to price_1 + discount_pct = float(item.get('discount_pct', 0)) + tax_rate = float(item.get('tax_rate', inv[7] or 0.16)) + unit_cost = float(inv[3]) if inv[3] else 0 + + # Validate discount against employee max + max_discount = float(getattr(g, 'max_discount_pct', 100) or 100) + if g.employee_role not in ('owner', 'admin') and discount_pct > max_discount: + raise ValueError( + f"Discount {discount_pct}% exceeds your maximum allowed {max_discount}% " + f"for item {inv[2]}" + ) + + enriched_items.append({ + 'inventory_id': inv_id, + 'part_number': inv[1], + 'name': inv[2], + 'quantity': qty, + 'unit_price': unit_price, + 'unit_cost': unit_cost, + 'discount_pct': discount_pct, + 'tax_rate': tax_rate, + 'branch_id': inv[8], + 'stock_before': current_stock, + }) + + # Calculate totals + totals = calculate_totals(enriched_items) + + # Validate credit sale + if sale_type == 'credit' and customer_id: + cur.execute("SELECT credit_limit, credit_balance FROM customers WHERE id = %s", (customer_id,)) + cust = cur.fetchone() + if cust: + credit_limit = float(cust[0] or 0) + credit_balance = float(cust[1] or 0) + credit_available = credit_limit - credit_balance + if totals['total'] > credit_available and credit_limit > 0: + raise ValueError( + f"Insufficient credit. Available: ${credit_available:.2f}, " + f"Required: ${totals['total']:.2f}" + ) + + # Calculate change + change_given = 0.0 + if sale_type == 'cash' and payment_method == 'efectivo': + change_given = round(max(amount_paid - totals['total'], 0), 2) + + # SAT payment method codes + metodo_pago_sat = 'PPD' if sale_type == 'credit' else 'PUE' + forma_pago_map = { + 'efectivo': '01', 'transferencia': '03', 'tarjeta': '04', 'mixto': '99' + } + forma_pago_sat = forma_pago_map.get(payment_method, '99') + + # Create sale record + cur.execute(""" + INSERT INTO sales + (branch_id, customer_id, employee_id, register_id, sale_type, + payment_method, subtotal, discount_total, tax_total, total, + amount_paid, change_given, metodo_pago_sat, forma_pago_sat, + status, device_id, notes) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'completed',%s,%s) + RETURNING id, created_at + """, ( + branch_id, customer_id, employee_id, register_id, sale_type, + payment_method, totals['subtotal'], totals['discount_total'], + totals['tax_total'], totals['total'], amount_paid, change_given, + metodo_pago_sat, forma_pago_sat, + getattr(g, 'device_id', None), notes + )) + sale_id, created_at = cur.fetchone() + + # Create sale items and deduct inventory + sale_items = [] + for idx, item in enumerate(totals['items']): + cur.execute(""" + INSERT INTO sale_items + (sale_id, inventory_id, part_number, name, quantity, + unit_price, unit_cost, discount_pct, discount_amount, + tax_rate, tax_amount, subtotal) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + RETURNING id + """, ( + sale_id, item['inventory_id'], item['part_number'], item['name'], + item['quantity'], item['unit_price'], item.get('unit_cost', 0), + item['discount_pct'], item['discount_amount'], + item['tax_rate'], item['tax_amount'], item['subtotal'] + )) + sale_item_id = cur.fetchone()[0] + + # Deduct inventory via inventory_engine (NEVER create operations directly) + inventory_record_sale( + conn, + item['inventory_id'], + item.get('branch_id', branch_id), + item['quantity'], + sale_id=sale_id, + cost_at_time=item.get('unit_cost') + ) + + sale_items.append({ + 'id': sale_item_id, + 'inventory_id': item['inventory_id'], + 'part_number': item['part_number'], + 'name': item['name'], + 'quantity': item['quantity'], + 'unit_price': item['unit_price'], + 'unit_cost': item.get('unit_cost', 0), + 'discount_pct': item['discount_pct'], + 'discount_amount': item['discount_amount'], + 'tax_rate': item['tax_rate'], + 'tax_amount': item['tax_amount'], + 'subtotal': item['subtotal'], + }) + + # Record payment on cash register (cash movements for efectivo) + if register_id and payment_details: + for pd in payment_details: + method = pd.get('method', payment_method) + amt = float(pd.get('amount', 0)) + ref = pd.get('reference', '') + cur.execute(""" + INSERT INTO sale_payments + (sale_id, register_id, method, amount, reference) + VALUES (%s,%s,%s,%s,%s) + """, (sale_id, register_id, method, amt, ref)) + elif register_id: + cur.execute(""" + INSERT INTO sale_payments + (sale_id, register_id, method, amount, reference) + VALUES (%s,%s,%s,%s,%s) + """, (sale_id, register_id, payment_method, amount_paid, sale_data.get('reference', ''))) + + # Update customer credit balance if credit sale + if sale_type == 'credit' and customer_id: + cur.execute(""" + UPDATE customers + SET credit_balance = credit_balance + %s + WHERE id = %s + """, (totals['total'], customer_id)) + + # Audit log + log_action(conn, 'SALE', 'sale', sale_id, + new_value={ + 'total': totals['total'], + 'items_count': len(sale_items), + 'payment_method': payment_method, + 'sale_type': sale_type, + 'customer_id': customer_id, + }) + + cur.close() + + return { + 'id': sale_id, + 'branch_id': branch_id, + 'customer_id': customer_id, + 'employee_id': employee_id, + 'register_id': register_id, + 'sale_type': sale_type, + 'payment_method': payment_method, + 'subtotal': totals['subtotal'], + 'discount_total': totals['discount_total'], + 'tax_total': totals['tax_total'], + 'total': totals['total'], + 'amount_paid': amount_paid, + 'change_given': change_given, + 'metodo_pago_sat': metodo_pago_sat, + 'forma_pago_sat': forma_pago_sat, + 'status': 'completed', + 'items': sale_items, + 'created_at': str(created_at), + } + + +def cancel_sale(conn, sale_id, reason): + """Cancel a sale: validate permissions, reverse inventory, update credit. + + Business rules: + - Cashiers can only cancel their own sales within 30 minutes + - Admins and owners can cancel any sale + - Cancelled sales are not deleted, just marked as 'cancelled' + - Inventory is restored via RETURN operations + - Customer credit balance is adjusted back + + Args: + conn: psycopg2 connection + sale_id: int + reason: str (mandatory, min 3 chars) + + Returns: + dict: cancellation result + + Raises: + ValueError: on validation errors + """ + if not reason or len(reason.strip()) < 3: + raise ValueError("Cancellation reason is mandatory (min 3 characters)") + + cur = conn.cursor() + + # Get sale details + cur.execute(""" + SELECT id, employee_id, customer_id, sale_type, total, status, created_at, + branch_id, register_id + FROM sales WHERE id = %s + """, (sale_id,)) + sale = cur.fetchone() + if not sale: + raise ValueError("Sale not found") + + s_id, s_emp_id, s_cust_id, s_type, s_total, s_status, s_created, s_branch, s_register = sale + + if s_status == 'cancelled': + raise ValueError("Sale is already cancelled") + + # Permission check: cashiers can only cancel own sales within 30 min + role = getattr(g, 'employee_role', 'cashier') + emp_id = getattr(g, 'employee_id', None) + + if role == 'cashier': + if s_emp_id != emp_id: + raise ValueError("Cashiers can only cancel their own sales") + if datetime.utcnow() - s_created > timedelta(minutes=30): + raise ValueError("Cashiers can only cancel sales within 30 minutes of creation") + + # Get sale items for inventory reversal + cur.execute(""" + SELECT inventory_id, quantity, unit_cost + FROM sale_items WHERE sale_id = %s + """, (sale_id,)) + sale_items = cur.fetchall() + + # Reverse inventory: create RETURN operations (positive quantity) + from services.inventory_engine import record_return + for inv_id, qty, cost in sale_items: + record_return( + conn, inv_id, s_branch, qty, + sale_id=sale_id, + notes=f"Cancelacion venta #{sale_id}: {reason}" + ) + + # Update sale status + cur.execute(""" + UPDATE sales SET status = 'cancelled', notes = COALESCE(notes || ' | ', '') || %s + WHERE id = %s + """, (f"CANCELADA: {reason}", sale_id)) + + # Reverse customer credit balance if credit sale + if s_type == 'credit' and s_cust_id: + cur.execute(""" + UPDATE customers + SET credit_balance = credit_balance - %s + WHERE id = %s + """, (float(s_total), s_cust_id)) + + # Audit log + log_action(conn, 'CANCEL', 'sale', sale_id, + old_value={'status': 'completed', 'total': float(s_total)}, + new_value={'status': 'cancelled', 'reason': reason}) + + cur.close() + + return { + 'sale_id': sale_id, + 'status': 'cancelled', + 'reason': reason, + 'items_reversed': len(sale_items), + 'total_reversed': float(s_total), + }