# /home/Autopartes/pos/services/accounting_engine.py
"""Accounting engine: automatic journal entry generation for business operations.

Every sale, purchase, cash cut, credit payment, and cancellation produces
balanced journal entries (polizas). The engine looks up SAT account IDs by
code and creates journal_entries + journal_entry_lines records.

Account codes (from SAT seed):
    110 = Caja
    111 = Bancos
    120 = Clientes
    130 = Inventarios
    140 = IVA Acreditable
    210 = Proveedores
    220 = IVA Trasladado
    410 = Ventas
    420 = Devoluciones sobre Ventas
    510 = Costo de Mercancia Vendida

All functions receive a psycopg2 connection (caller controls commit).
"""

from contextlib import closing
from datetime import date
from decimal import Decimal, ROUND_HALF_UP

try:
    from flask import g
except ImportError:
    # Flask only supplies the optional ``created_by`` attribution; keep the
    # engine importable from scripts and jobs that run without Flask.
    g = None


TWO = Decimal('0.01')


def _to_dec(val):
    """Convert a value to Decimal for precise arithmetic (None becomes 0)."""
    if val is None:
        return Decimal('0')
    return Decimal(str(val))


def _round2(val):
    """Return ``val`` as a Decimal rounded half-up to two decimal places."""
    return _to_dec(val).quantize(TWO, ROUND_HALF_UP)


def _debit_line(account_id, amount, description):
    """Build a journal line dict that debits ``account_id`` by ``amount``."""
    return {
        'account_id': account_id,
        'debit': amount,
        'credit': Decimal('0'),
        'description': description,
    }


def _credit_line(account_id, amount, description):
    """Build a journal line dict that credits ``account_id`` by ``amount``."""
    return {
        'account_id': account_id,
        'debit': Decimal('0'),
        'credit': amount,
        'description': description,
    }


def _get_account_id(cur, code):
    """Look up an active account's ID by code.

    Raises:
        ValueError: if no active account has that code
    """
    cur.execute(
        "SELECT id FROM accounts WHERE code = %s AND is_active = true",
        (code,),
    )
    row = cur.fetchone()
    if not row:
        raise ValueError(f"Account with code '{code}' not found")
    return row[0]


def _get_account_ids(cur, codes):
    """Look up multiple account IDs by code. Returns dict {code: id}."""
    return {code: _get_account_id(cur, code) for code in codes}


def _payment_account_code(sale_type, payment_method):
    """Return the account code that receives (or refunds) a sale's payment.

    Credit sales hit 120 Clientes, transfer/card payments hit 111 Bancos and
    everything else is treated as cash in 110 Caja.
    """
    if sale_type == 'credit':
        return '120'
    if payment_method in ('transferencia', 'tarjeta'):
        return '111'
    return '110'


def _cost_of_goods(items):
    """Sum unit_cost * quantity over ``items``, rounding each item to cents.

    Quantities are handled as Decimal so fractional quantities are not
    truncated; a missing quantity counts as 1 and a missing cost as 0.
    """
    total = Decimal('0')
    for item in items:
        item_cost = _to_dec(item.get('unit_cost', 0)) * _to_dec(item.get('quantity', 1))
        total += item_cost.quantize(TWO, ROUND_HALF_UP)
    return total


def _current_employee_id():
    """Return ``g.employee_id`` when a Flask context provides it, else None.

    Accessing ``g`` outside an application context raises RuntimeError (the
    ``getattr`` default only covers AttributeError), so that case is mapped
    to None instead of aborting the journal entry.
    """
    if g is None:
        return None
    try:
        return getattr(g, 'employee_id', None)
    except RuntimeError:
        return None


def get_next_entry_number(conn):
    """Get the next sequential journal entry number.

    Uses a simple MAX+1 approach on journal_entries.entry_number.

    NOTE(review): MAX+1 is not atomic; two concurrent transactions can read
    the same value. Whether a duplicate is rejected depends on a unique
    constraint that is not visible from this module - confirm against the
    schema. A database sequence would remove the race.

    Args:
        conn: psycopg2 connection to tenant DB

    Returns:
        int: next entry number (starts at 1)
    """
    with closing(conn.cursor()) as cur:
        cur.execute("SELECT COALESCE(MAX(entry_number), 0) + 1 FROM journal_entries")
        return cur.fetchone()[0]


def _check_period_open(cur, entry_date):
    """Verify the fiscal period for the given date is open.

    If no fiscal_periods row exists for the month, it is considered open
    (periods are only created when explicitly closed).

    Args:
        cur: psycopg2 cursor
        entry_date: date object

    Raises:
        ValueError: if the period is closed
    """
    cur.execute("""
        SELECT status FROM fiscal_periods
        WHERE year = %s AND month = %s
    """, (entry_date.year, entry_date.month))
    row = cur.fetchone()
    if row and row[0] == 'closed':
        raise ValueError(
            f"Fiscal period {entry_date.year}-{entry_date.month:02d} is closed. "
            f"Cannot create journal entries in a closed period."
        )


def _create_entry(cur, entry_number, entry_date, entry_type, description,
                  reference_type, reference_id, lines, is_auto=True):
    """Create a journal entry with its lines.

    Each line amount is rounded to two decimals first, and the balance check
    runs on those rounded amounts, so what gets validated is exactly what
    gets stored. Lines whose debit and credit are both zero are not inserted.

    Args:
        cur: psycopg2 cursor
        entry_number: int sequential number
        entry_date: date
        entry_type: 'ingreso' | 'egreso' | 'diario' | 'poliza'
        description: str
        reference_type: 'sale' | 'purchase' | 'cash_register' | 'payment' | None
        reference_id: int or None
        lines: list of dicts with keys: account_id, debit, credit, description
        is_auto: bool (True for system-generated entries)

    Returns:
        int: journal entry ID

    Raises:
        ValueError: if ``lines`` is empty or debits != credits
    """
    if not lines:
        raise ValueError("Journal entry has no lines")

    rounded = [
        (line, _round2(line['debit']), _round2(line['credit']))
        for line in lines
    ]
    total_debit = sum((debit for _, debit, _ in rounded), Decimal('0'))
    total_credit = sum((credit for _, _, credit in rounded), Decimal('0'))
    if total_debit != total_credit:
        raise ValueError(
            f"Unbalanced entry: debits={total_debit} credits={total_credit}"
        )

    created_by = _current_employee_id()

    cur.execute("""
        INSERT INTO journal_entries
            (entry_number, date, type, description, reference_type,
             reference_id, status, created_by, is_auto)
        VALUES (%s, %s, %s, %s, %s, %s, 'posted', %s, %s)
        RETURNING id
    """, (
        entry_number, entry_date, entry_type, description,
        reference_type, reference_id, created_by, is_auto
    ))
    entry_id = cur.fetchone()[0]

    for line, debit, credit in rounded:
        if debit == 0 and credit == 0:
            continue  # skip zero lines
        cur.execute("""
            INSERT INTO journal_entry_lines
                (journal_entry_id, account_id, debit, credit, description)
            VALUES (%s, %s, %s, %s, %s)
        """, (entry_id, line['account_id'], debit, credit,
              line.get('description', '')))

    return entry_id


def record_sale_entry(conn, sale):
    """Generate journal entries for a completed sale.

    For a cash sale:
        Debit  110 Caja (or 111 Bancos for transferencia/tarjeta) = total
        Credit 410 Ventas                                          = subtotal
        Credit 220 IVA Trasladado                                  = tax_total
        Debit  510 Costo de Mercancia Vendida = sum(unit_cost * qty)
        Credit 130 Inventarios                = sum(unit_cost * qty)

    For a credit sale the first line debits 120 Clientes instead.

    Args:
        conn: psycopg2 connection to tenant DB
        sale: dict from process_sale() with keys:
            id, sale_type, payment_method, subtotal, discount_total,
            tax_total, total, items (with unit_cost, quantity)

    Returns:
        int: journal entry ID

    Raises:
        ValueError: if the period is closed, an account is missing, or
            total != subtotal + tax_total after rounding
    """
    with closing(conn.cursor()) as cur:
        entry_date = date.today()
        _check_period_open(cur, entry_date)
        entry_number = get_next_entry_number(conn)

        sale_id = sale['id']
        sale_type = sale.get('sale_type', 'cash')
        payment_method = sale.get('payment_method', 'efectivo')
        subtotal = _round2(sale['subtotal'])
        tax_total = _round2(sale['tax_total'])
        total = _round2(sale['total'])
        cost_total = _cost_of_goods(sale.get('items') or [])

        accounts = _get_account_ids(
            cur, ['110', '111', '120', '130', '220', '410', '510'])

        # Account debited for the payment received, with its line description.
        payment_code = _payment_account_code(sale_type, payment_method)
        payment_desc = {
            '120': f'Clientes - Venta a credito #{sale_id}',
            '111': f'Bancos - Venta #{sale_id} ({payment_method})',
            '110': f'Caja - Venta #{sale_id} (efectivo)',
        }[payment_code]

        lines = [
            _debit_line(accounts[payment_code], total, payment_desc),
            _credit_line(accounts['410'], subtotal,
                         f'Ventas - Venta #{sale_id}'),
            _credit_line(accounts['220'], tax_total,
                         f'IVA Trasladado - Venta #{sale_id}'),
        ]

        # Cost of goods sold entries (only if cost > 0)
        if cost_total > 0:
            lines.append(_debit_line(
                accounts['510'], cost_total,
                f'Costo mercancia - Venta #{sale_id}'))
            lines.append(_credit_line(
                accounts['130'], cost_total,
                f'Inventarios - Venta #{sale_id}'))

        return _create_entry(
            cur, entry_number, entry_date, 'ingreso',
            f'Venta #{sale_id} - {payment_method}',
            'sale', sale_id, lines
        )


def record_purchase_entry(conn, purchase_data):
    """Generate journal entries for a purchase (inventory receipt).

        Debit  130 Inventarios     = subtotal (cost of goods)
        Debit  140 IVA Acreditable = tax amount
        Credit 210 Proveedores     = total (subtotal + tax)

    Args:
        conn: psycopg2 connection
        purchase_data: dict with keys:
            reference_id: int (purchase order or operation ID)
            subtotal: float (cost of goods before tax)
            tax_amount: float (optional, treated as 0 when missing)
            total: float (subtotal + tax)
            supplier_name: str (optional, for description)

    Returns:
        int: journal entry ID

    Raises:
        ValueError: if the period is closed, an account is missing, or the
            amounts do not balance
    """
    with closing(conn.cursor()) as cur:
        entry_date = date.today()
        _check_period_open(cur, entry_date)
        entry_number = get_next_entry_number(conn)

        ref_id = purchase_data.get('reference_id')
        subtotal = _round2(purchase_data['subtotal'])
        tax_amount = _round2(purchase_data.get('tax_amount', 0))
        total = _round2(purchase_data['total'])
        supplier = purchase_data.get('supplier_name', 'Proveedor')

        accounts = _get_account_ids(cur, ['130', '140', '210'])

        lines = [
            _debit_line(accounts['130'], subtotal,
                        f'Inventarios - Compra {supplier}'),
            _credit_line(accounts['210'], total,
                         f'Proveedores - Compra {supplier}'),
        ]

        # IVA Acreditable (only if tax > 0)
        if tax_amount > 0:
            lines.append(_debit_line(
                accounts['140'], tax_amount,
                f'IVA Acreditable - Compra {supplier}'))

        return _create_entry(
            cur, entry_number, entry_date, 'diario',
            f'Compra - {supplier}',
            'purchase', ref_id, lines
        )


def record_cash_cut_entry(conn, register):
    """Generate journal entry for a cash register close (corte Z).

    Moves cash from register (Caja) to bank (Bancos):
        Debit  111 Bancos = closing_amount
        Credit 110 Caja   = closing_amount

    Args:
        conn: psycopg2 connection
        register: dict with keys:
            id: int (cash register ID)
            closing_amount: float
            register_number: int (optional, '?' is used when missing)

    Returns:
        int: journal entry ID, or None when closing_amount is zero or
            negative (no entry is written)

    Raises:
        ValueError: if the period is closed or an account is missing
    """
    with closing(conn.cursor()) as cur:
        entry_date = date.today()
        _check_period_open(cur, entry_date)

        reg_id = register['id']
        amount = _round2(register['closing_amount'])
        reg_num = register.get('register_number', '?')

        if amount <= 0:
            return None  # No entry for zero/negative close

        # Only consume an entry number once we know an entry will be written.
        entry_number = get_next_entry_number(conn)
        accounts = _get_account_ids(cur, ['110', '111'])

        lines = [
            _debit_line(accounts['111'], amount,
                        f'Bancos - Corte caja #{reg_num}'),
            _credit_line(accounts['110'], amount,
                         f'Caja - Corte caja #{reg_num}'),
        ]

        return _create_entry(
            cur, entry_number, entry_date, 'diario',
            f'Corte de caja #{reg_num} (registro #{reg_id})',
            'cash_register', reg_id, lines
        )


def record_credit_payment_entry(conn, payment):
    """Generate journal entry for a customer credit payment.

        Debit  111 Bancos (or 110 Caja) = amount
        Credit 120 Clientes             = amount

    Args:
        conn: psycopg2 connection
        payment: dict with keys:
            customer_id: int (only required when customer_name is missing)
            customer_name: str (optional)
            amount: float
            payment_method: 'efectivo' | 'transferencia' | 'tarjeta'
            reference_id: int (optional, e.g. sale_id being paid)

    Returns:
        int: journal entry ID

    Raises:
        ValueError: if the period is closed or an account is missing
    """
    with closing(conn.cursor()) as cur:
        entry_date = date.today()
        _check_period_open(cur, entry_date)
        entry_number = get_next_entry_number(conn)

        amount = _round2(payment['amount'])
        # Build the fallback label lazily so customer_id is only needed when
        # no customer name was supplied.
        customer_name = (payment.get('customer_name')
                         or f"Cliente #{payment['customer_id']}")
        method = payment.get('payment_method', 'efectivo')
        ref_id = payment.get('reference_id')

        accounts = _get_account_ids(cur, ['110', '111', '120'])

        if method in ('transferencia', 'tarjeta'):
            debit_account = accounts['111']  # Bancos
            debit_desc = f'Bancos - Cobro credito {customer_name}'
        else:
            debit_account = accounts['110']  # Caja
            debit_desc = f'Caja - Cobro credito {customer_name}'

        lines = [
            _debit_line(debit_account, amount, debit_desc),
            _credit_line(accounts['120'], amount,
                         f'Clientes - Cobro credito {customer_name}'),
        ]

        return _create_entry(
            cur, entry_number, entry_date, 'ingreso',
            f'Cobro credito - {customer_name} ({method})',
            'payment', ref_id, lines
        )


def record_cancellation_entry(conn, sale):
    """Generate reverse journal entry for a cancelled sale.

    This is the exact reverse of record_sale_entry():
        Credit payment account (Caja/Bancos/Clientes) = total
        Debit  410 Ventas                              = subtotal
        Debit  220 IVA Trasladado                      = tax_total
        Credit 510 Costo de Mercancia Vendida          = cost_total
        Debit  130 Inventarios                         = cost_total

    Args:
        conn: psycopg2 connection
        sale: dict with the same structure as record_sale_entry() expects.
              If items are missing/empty they are read from sale_items; if
              total is missing or zero, the totals, sale_type and
              payment_method are read from the sales table.

    Returns:
        int: journal entry ID

    Raises:
        ValueError: if the period is closed, an account is missing, or the
            amounts do not balance
    """
    with closing(conn.cursor()) as cur:
        entry_date = date.today()
        _check_period_open(cur, entry_date)
        entry_number = get_next_entry_number(conn)

        sale_id = sale['id']
        sale_type = sale.get('sale_type', 'cash')
        payment_method = sale.get('payment_method', 'efectivo')
        subtotal = _to_dec(sale.get('subtotal', 0))
        tax_total = _to_dec(sale.get('tax_total', 0))
        total = _to_dec(sale.get('total', 0))

        # If sale dict lacks items, look up from DB
        items = sale.get('items')
        if not items:
            cur.execute("""
                SELECT unit_cost, quantity FROM sale_items WHERE sale_id = %s
            """, (sale_id,))
            items = [
                {'unit_cost': unit_cost, 'quantity': quantity}
                for unit_cost, quantity in cur.fetchall()
            ]

        # If sale dict lacks totals, look up from DB
        if total == 0:
            cur.execute("""
                SELECT subtotal, tax_total, total, sale_type, payment_method
                FROM sales WHERE id = %s
            """, (sale_id,))
            row = cur.fetchone()
            if row:
                subtotal = _to_dec(row[0])
                tax_total = _to_dec(row[1])
                total = _to_dec(row[2])
                sale_type = row[3]
                payment_method = row[4]

        subtotal = _round2(subtotal)
        tax_total = _round2(tax_total)
        total = _round2(total)
        cost_total = _cost_of_goods(items)

        accounts = _get_account_ids(
            cur, ['110', '111', '120', '130', '220', '410', '510'])

        # Credit the same account the original sale debited.
        payment_code = _payment_account_code(sale_type, payment_method)
        payment_name = {
            '120': 'Clientes',
            '111': 'Bancos',
            '110': 'Caja',
        }[payment_code]
        payment_desc = f'{payment_name} - Cancelacion venta #{sale_id}'

        lines = [
            _credit_line(accounts[payment_code], total, payment_desc),
            _debit_line(accounts['410'], subtotal,
                        f'Ventas - Cancelacion venta #{sale_id}'),
            _debit_line(accounts['220'], tax_total,
                        f'IVA Trasladado - Cancelacion venta #{sale_id}'),
        ]

        # Reverse COGS entries (only if cost > 0)
        if cost_total > 0:
            lines.append(_credit_line(
                accounts['510'], cost_total,
                f'Costo mercancia - Cancelacion venta #{sale_id}'))
            lines.append(_debit_line(
                accounts['130'], cost_total,
                f'Inventarios - Cancelacion venta #{sale_id}'))

        return _create_entry(
            cur, entry_number, entry_date, 'egreso',
            f'Cancelacion venta #{sale_id}',
            'sale', sale_id, lines
        )


def create_manual_entry(conn, entry_data):
    """Create a manual journal entry (type='diario').

    Used by accountants for adjustments not tied to a specific operation.
    The entry is stored with is_auto=False and no reference.

    Args:
        conn: psycopg2 connection
        entry_data: dict with keys:
            date: str 'YYYY-MM-DD'
            description: str (optional, defaults to 'Poliza manual')
            lines: [{account_id, debit, credit, description}]

    Returns:
        int: journal entry ID

    Raises:
        ValueError: if the entry has no lines, is unbalanced, the period is
            closed, or the date string is not valid ISO format
    """
    with closing(conn.cursor()) as cur:
        entry_date = date.fromisoformat(entry_data['date'])
        _check_period_open(cur, entry_date)
        entry_number = get_next_entry_number(conn)

        return _create_entry(
            cur, entry_number, entry_date, 'diario',
            entry_data.get('description', 'Poliza manual'),
            None, None, entry_data['lines'],
            is_auto=False
        )