diff --git a/pos/services/accounting_engine.py b/pos/services/accounting_engine.py new file mode 100644 index 0000000..c556006 --- /dev/null +++ b/pos/services/accounting_engine.py @@ -0,0 +1,604 @@ +# /home/Autopartes/pos/services/accounting_engine.py +"""Accounting engine: automatic journal entry generation for business operations. + +Every sale, purchase, cash cut, credit payment, and cancellation produces +balanced journal entries (polizas). The engine looks up SAT account IDs +by code and creates journal_entries + journal_entry_lines records. + +Account codes (from SAT seed): + 110 = Caja + 111 = Bancos + 120 = Clientes + 130 = Inventarios + 140 = IVA Acreditable + 210 = Proveedores + 220 = IVA Trasladado + 410 = Ventas + 420 = Devoluciones sobre Ventas + 510 = Costo de Mercancia Vendida + +All functions receive a psycopg2 connection (caller controls commit). +""" + +from datetime import date +from decimal import Decimal, ROUND_HALF_UP +from flask import g + + +def _to_dec(val): + """Convert a value to Decimal for precise arithmetic.""" + if val is None: + return Decimal('0') + return Decimal(str(val)) + + +TWO = Decimal('0.01') + + +def _get_account_id(cur, code): + """Look up account ID by code. Raises ValueError if not found.""" + cur.execute("SELECT id FROM accounts WHERE code = %s AND is_active = true", (code,)) + row = cur.fetchone() + if not row: + raise ValueError(f"Account with code '{code}' not found") + return row[0] + + +def _get_account_ids(cur, codes): + """Look up multiple account IDs by code. Returns dict {code: id}.""" + result = {} + for code in codes: + result[code] = _get_account_id(cur, code) + return result + + +def get_next_entry_number(conn): + """Get the next sequential journal entry number. + + Uses a simple MAX+1 approach. For high-concurrency environments this + could be replaced with a sequence, but for single-tenant refaccionarias + the transaction-level lock from the INSERT is sufficient. + + Args: + conn: psycopg2 connection to tenant DB + + Returns: + int: next entry number (starts at 1) + """ + cur = conn.cursor() + cur.execute("SELECT COALESCE(MAX(entry_number), 0) + 1 FROM journal_entries") + number = cur.fetchone()[0] + cur.close() + return number + + +def _check_period_open(cur, entry_date): + """Verify the fiscal period for the given date is open. + + If no fiscal_periods row exists for the month, it is considered open + (periods are only created when explicitly closed). + + Args: + cur: psycopg2 cursor + entry_date: date object + + Raises: + ValueError: if the period is closed + """ + cur.execute(""" + SELECT status FROM fiscal_periods + WHERE year = %s AND month = %s + """, (entry_date.year, entry_date.month)) + row = cur.fetchone() + if row and row[0] == 'closed': + raise ValueError( + f"Fiscal period {entry_date.year}-{entry_date.month:02d} is closed. " + f"Cannot create journal entries in a closed period." + ) + + +def _create_entry(cur, entry_number, entry_date, entry_type, description, + reference_type, reference_id, lines, is_auto=True): + """Create a journal entry with its lines. + + Validates that total debits == total credits before inserting. + + Args: + cur: psycopg2 cursor + entry_number: int sequential number + entry_date: date + entry_type: 'ingreso' | 'egreso' | 'diario' | 'poliza' + description: str + reference_type: 'sale' | 'purchase' | 'cash_register' | 'payment' | None + reference_id: int or None + lines: list of dicts with keys: account_id, debit, credit, description + is_auto: bool (True for system-generated entries) + + Returns: + int: journal entry ID + + Raises: + ValueError: if debits != credits + """ + # Validate balance + total_debit = sum(_to_dec(line['debit']) for line in lines) + total_credit = sum(_to_dec(line['credit']) for line in lines) + + if total_debit.quantize(TWO, ROUND_HALF_UP) != total_credit.quantize(TWO, ROUND_HALF_UP): + raise ValueError( + f"Unbalanced entry: debits={total_debit} credits={total_credit}" + ) + + created_by = getattr(g, 'employee_id', None) + + cur.execute(""" + INSERT INTO journal_entries + (entry_number, date, type, description, reference_type, reference_id, + status, created_by, is_auto) + VALUES (%s, %s, %s, %s, %s, %s, 'posted', %s, %s) + RETURNING id + """, ( + entry_number, entry_date, entry_type, description, + reference_type, reference_id, created_by, is_auto + )) + entry_id = cur.fetchone()[0] + + for line in lines: + debit = float(_to_dec(line['debit']).quantize(TWO, ROUND_HALF_UP)) + credit = float(_to_dec(line['credit']).quantize(TWO, ROUND_HALF_UP)) + if debit == 0 and credit == 0: + continue # skip zero lines + cur.execute(""" + INSERT INTO journal_entry_lines + (journal_entry_id, account_id, debit, credit, description) + VALUES (%s, %s, %s, %s, %s) + """, (entry_id, line['account_id'], debit, credit, line.get('description', ''))) + + return entry_id + + +def record_sale_entry(conn, sale): + """Generate journal entries for a completed sale. + + For a cash sale: + Debit 110 Caja (or 111 Bancos for transferencia/tarjeta) = total + Credit 410 Ventas = subtotal + Credit 220 IVA Trasladado = tax_total + Debit 510 Costo de Mercancia Vendida = sum(unit_cost * qty) + Credit 130 Inventarios = sum(unit_cost * qty) + + For a credit sale: + Debit 120 Clientes = total + Credit 410 Ventas = subtotal + Credit 220 IVA Trasladado = tax_total + Debit 510 Costo de Mercancia Vendida = sum(unit_cost * qty) + Credit 130 Inventarios = sum(unit_cost * qty) + + Args: + conn: psycopg2 connection to tenant DB + sale: dict from process_sale() with keys: + id, sale_type, payment_method, subtotal, discount_total, + tax_total, total, items (with unit_cost, quantity) + + Returns: + int: journal entry ID + """ + cur = conn.cursor() + entry_date = date.today() + _check_period_open(cur, entry_date) + + entry_number = get_next_entry_number(conn) + sale_id = sale['id'] + sale_type = sale.get('sale_type', 'cash') + payment_method = sale.get('payment_method', 'efectivo') + subtotal = _to_dec(sale['subtotal']) + tax_total = _to_dec(sale['tax_total']) + total = _to_dec(sale['total']) + + # Calculate total cost of goods sold + cost_total = Decimal('0') + for item in sale.get('items', []): + item_cost = _to_dec(item.get('unit_cost', 0)) * int(item.get('quantity', 1)) + cost_total += item_cost.quantize(TWO, ROUND_HALF_UP) + + # Determine debit account for payment received + accounts = _get_account_ids(cur, ['110', '111', '120', '130', '220', '410', '510']) + + if sale_type == 'credit': + payment_account_id = accounts['120'] # Clientes + payment_desc = f'Clientes - Venta a credito #{sale_id}' + elif payment_method in ('transferencia', 'tarjeta'): + payment_account_id = accounts['111'] # Bancos + payment_desc = f'Bancos - Venta #{sale_id} ({payment_method})' + else: + payment_account_id = accounts['110'] # Caja + payment_desc = f'Caja - Venta #{sale_id} (efectivo)' + + lines = [ + # Payment received (debit) + { + 'account_id': payment_account_id, + 'debit': float(total.quantize(TWO, ROUND_HALF_UP)), + 'credit': 0, + 'description': payment_desc, + }, + # Revenue (credit) + { + 'account_id': accounts['410'], + 'debit': 0, + 'credit': float(subtotal.quantize(TWO, ROUND_HALF_UP)), + 'description': f'Ventas - Venta #{sale_id}', + }, + # IVA Trasladado (credit) + { + 'account_id': accounts['220'], + 'debit': 0, + 'credit': float(tax_total.quantize(TWO, ROUND_HALF_UP)), + 'description': f'IVA Trasladado - Venta #{sale_id}', + }, + ] + + # Cost of goods sold entries (only if cost > 0) + if cost_total > 0: + lines.append({ + 'account_id': accounts['510'], + 'debit': float(cost_total.quantize(TWO, ROUND_HALF_UP)), + 'credit': 0, + 'description': f'Costo mercancia - Venta #{sale_id}', + }) + lines.append({ + 'account_id': accounts['130'], + 'debit': 0, + 'credit': float(cost_total.quantize(TWO, ROUND_HALF_UP)), + 'description': f'Inventarios - Venta #{sale_id}', + }) + + entry_id = _create_entry( + cur, entry_number, entry_date, 'ingreso', + f'Venta #{sale_id} - {payment_method}', + 'sale', sale_id, lines + ) + + cur.close() + return entry_id + + +def record_purchase_entry(conn, purchase_data): + """Generate journal entries for a purchase (inventory receipt). + + Debit 130 Inventarios = subtotal (cost of goods) + Debit 140 IVA Acreditable = tax amount + Credit 210 Proveedores = total (subtotal + tax) + + Args: + conn: psycopg2 connection + purchase_data: dict with keys: + reference_id: int (purchase order or operation ID) + subtotal: float (cost of goods before tax) + tax_amount: float (IVA 16%) + total: float (subtotal + tax) + supplier_name: str (optional, for description) + + Returns: + int: journal entry ID + """ + cur = conn.cursor() + entry_date = date.today() + _check_period_open(cur, entry_date) + + entry_number = get_next_entry_number(conn) + ref_id = purchase_data.get('reference_id') + subtotal = _to_dec(purchase_data['subtotal']) + tax_amount = _to_dec(purchase_data.get('tax_amount', 0)) + total = _to_dec(purchase_data['total']) + supplier = purchase_data.get('supplier_name', 'Proveedor') + + accounts = _get_account_ids(cur, ['130', '140', '210']) + + lines = [ + { + 'account_id': accounts['130'], + 'debit': float(subtotal.quantize(TWO, ROUND_HALF_UP)), + 'credit': 0, + 'description': f'Inventarios - Compra {supplier}', + }, + { + 'account_id': accounts['210'], + 'debit': 0, + 'credit': float(total.quantize(TWO, ROUND_HALF_UP)), + 'description': f'Proveedores - Compra {supplier}', + }, + ] + + # IVA Acreditable (only if tax > 0) + if tax_amount > 0: + lines.append({ + 'account_id': accounts['140'], + 'debit': float(tax_amount.quantize(TWO, ROUND_HALF_UP)), + 'credit': 0, + 'description': f'IVA Acreditable - Compra {supplier}', + }) + + entry_id = _create_entry( + cur, entry_number, entry_date, 'diario', + f'Compra - {supplier}', + 'purchase', ref_id, lines + ) + + cur.close() + return entry_id + + +def record_cash_cut_entry(conn, register): + """Generate journal entry for a cash register close (corte Z). + + Moves cash from register (Caja) to bank (Bancos): + Debit 111 Bancos = closing_amount + Credit 110 Caja = closing_amount + + Args: + conn: psycopg2 connection + register: dict with keys: + id: int (cash register ID) + closing_amount: float + register_number: int + + Returns: + int: journal entry ID + """ + cur = conn.cursor() + entry_date = date.today() + _check_period_open(cur, entry_date) + + entry_number = get_next_entry_number(conn) + reg_id = register['id'] + amount = _to_dec(register['closing_amount']) + reg_num = register.get('register_number', '?') + + if amount <= 0: + cur.close() + return None # No entry for zero/negative close + + accounts = _get_account_ids(cur, ['110', '111']) + + lines = [ + { + 'account_id': accounts['111'], + 'debit': float(amount.quantize(TWO, ROUND_HALF_UP)), + 'credit': 0, + 'description': f'Bancos - Corte caja #{reg_num}', + }, + { + 'account_id': accounts['110'], + 'debit': 0, + 'credit': float(amount.quantize(TWO, ROUND_HALF_UP)), + 'description': f'Caja - Corte caja #{reg_num}', + }, + ] + + entry_id = _create_entry( + cur, entry_number, entry_date, 'diario', + f'Corte de caja #{reg_num} (registro #{reg_id})', + 'cash_register', reg_id, lines + ) + + cur.close() + return entry_id + + +def record_credit_payment_entry(conn, payment): + """Generate journal entry for a customer credit payment. + + Debit 111 Bancos (or 110 Caja) = amount + Credit 120 Clientes = amount + + Args: + conn: psycopg2 connection + payment: dict with keys: + customer_id: int + customer_name: str (optional) + amount: float + payment_method: 'efectivo' | 'transferencia' | 'tarjeta' + reference_id: int (optional, e.g. sale_id being paid) + + Returns: + int: journal entry ID + """ + cur = conn.cursor() + entry_date = date.today() + _check_period_open(cur, entry_date) + + entry_number = get_next_entry_number(conn) + amount = _to_dec(payment['amount']) + customer_name = payment.get('customer_name', f"Cliente #{payment['customer_id']}") + method = payment.get('payment_method', 'efectivo') + ref_id = payment.get('reference_id') + + accounts = _get_account_ids(cur, ['110', '111', '120']) + + if method in ('transferencia', 'tarjeta'): + debit_account = accounts['111'] # Bancos + debit_desc = f'Bancos - Cobro credito {customer_name}' + else: + debit_account = accounts['110'] # Caja + debit_desc = f'Caja - Cobro credito {customer_name}' + + lines = [ + { + 'account_id': debit_account, + 'debit': float(amount.quantize(TWO, ROUND_HALF_UP)), + 'credit': 0, + 'description': debit_desc, + }, + { + 'account_id': accounts['120'], + 'debit': 0, + 'credit': float(amount.quantize(TWO, ROUND_HALF_UP)), + 'description': f'Clientes - Cobro credito {customer_name}', + }, + ] + + entry_id = _create_entry( + cur, entry_number, entry_date, 'ingreso', + f'Cobro credito - {customer_name} ({method})', + 'payment', ref_id, lines + ) + + cur.close() + return entry_id + + +def record_cancellation_entry(conn, sale): + """Generate reverse journal entry for a cancelled sale. + + This is the exact reverse of record_sale_entry(): + Credit payment account (Caja/Bancos/Clientes) = total + Debit 410 Ventas = subtotal + Debit 220 IVA Trasladado = tax_total + Credit 510 Costo de Mercancia Vendida = cost_total + Debit 130 Inventarios = cost_total + + Args: + conn: psycopg2 connection + sale: dict with the same structure as record_sale_entry() expects. + If items are not present, looks up from DB. + + Returns: + int: journal entry ID + """ + cur = conn.cursor() + entry_date = date.today() + _check_period_open(cur, entry_date) + + entry_number = get_next_entry_number(conn) + sale_id = sale['id'] + sale_type = sale.get('sale_type', 'cash') + payment_method = sale.get('payment_method', 'efectivo') + subtotal = _to_dec(sale.get('subtotal', 0)) + tax_total = _to_dec(sale.get('tax_total', 0)) + total = _to_dec(sale.get('total', 0)) + + # If sale dict lacks items, look up from DB + items = sale.get('items') + if not items: + cur.execute(""" + SELECT unit_cost, quantity FROM sale_items WHERE sale_id = %s + """, (sale_id,)) + items = [{'unit_cost': float(r[0]) if r[0] else 0, 'quantity': r[1]} + for r in cur.fetchall()] + + # If sale dict lacks totals, look up from DB + if total == 0: + cur.execute(""" + SELECT subtotal, tax_total, total, sale_type, payment_method + FROM sales WHERE id = %s + """, (sale_id,)) + row = cur.fetchone() + if row: + subtotal = _to_dec(row[0]) + tax_total = _to_dec(row[1]) + total = _to_dec(row[2]) + sale_type = row[3] + payment_method = row[4] + + cost_total = Decimal('0') + for item in items: + item_cost = _to_dec(item.get('unit_cost', 0)) * int(item.get('quantity', 1)) + cost_total += item_cost.quantize(TWO, ROUND_HALF_UP) + + accounts = _get_account_ids(cur, ['110', '111', '120', '130', '220', '410', '510']) + + # Determine which payment account to credit (reverse of debit in sale) + if sale_type == 'credit': + payment_account_id = accounts['120'] + payment_desc = f'Clientes - Cancelacion venta #{sale_id}' + elif payment_method in ('transferencia', 'tarjeta'): + payment_account_id = accounts['111'] + payment_desc = f'Bancos - Cancelacion venta #{sale_id}' + else: + payment_account_id = accounts['110'] + payment_desc = f'Caja - Cancelacion venta #{sale_id}' + + lines = [ + # Reverse payment (credit the account that was debited) + { + 'account_id': payment_account_id, + 'debit': 0, + 'credit': float(total.quantize(TWO, ROUND_HALF_UP)), + 'description': payment_desc, + }, + # Reverse revenue (debit Ventas) + { + 'account_id': accounts['410'], + 'debit': float(subtotal.quantize(TWO, ROUND_HALF_UP)), + 'credit': 0, + 'description': f'Ventas - Cancelacion venta #{sale_id}', + }, + # Reverse IVA (debit IVA Trasladado) + { + 'account_id': accounts['220'], + 'debit': float(tax_total.quantize(TWO, ROUND_HALF_UP)), + 'credit': 0, + 'description': f'IVA Trasladado - Cancelacion venta #{sale_id}', + }, + ] + + # Reverse COGS entries (only if cost > 0) + if cost_total > 0: + lines.append({ + 'account_id': accounts['510'], + 'debit': 0, + 'credit': float(cost_total.quantize(TWO, ROUND_HALF_UP)), + 'description': f'Costo mercancia - Cancelacion venta #{sale_id}', + }) + lines.append({ + 'account_id': accounts['130'], + 'debit': float(cost_total.quantize(TWO, ROUND_HALF_UP)), + 'credit': 0, + 'description': f'Inventarios - Cancelacion venta #{sale_id}', + }) + + entry_id = _create_entry( + cur, entry_number, entry_date, 'egreso', + f'Cancelacion venta #{sale_id}', + 'sale', sale_id, lines + ) + + cur.close() + return entry_id + + +def create_manual_entry(conn, entry_data): + """Create a manual journal entry (type='diario'). + + Used by accountants for adjustments not tied to a specific operation. + + Args: + conn: psycopg2 connection + entry_data: dict with keys: + date: str 'YYYY-MM-DD' + description: str + lines: [{account_id, debit, credit, description}] + + Returns: + int: journal entry ID + + Raises: + ValueError: if entry is unbalanced or period is closed + """ + cur = conn.cursor() + entry_date = date.fromisoformat(entry_data['date']) + _check_period_open(cur, entry_date) + + entry_number = get_next_entry_number(conn) + + entry_id = _create_entry( + cur, entry_number, entry_date, 'diario', + entry_data.get('description', 'Poliza manual'), + None, None, + entry_data['lines'], + is_auto=False + ) + + cur.close() + return entry_id