Files
Autoparts-DB/pos/services/inventory_engine.py
consultoria-as 2b73c2c6db feat: Fase 1-3 completas - precios proveedor, multi-sucursal, factura global
Fase 1: Lista de precios de proveedor
- Tabla supplier_catalog_prices en master DB
- Endpoints GET/POST/PUT/DELETE /supplier-catalog/prices
- Upload CSV/Excel de precios de proveedor
- Visualizacion de supplier_price en catalogo y POS

Fase 2: Multi-sucursal completo
- Migracion v4.0: inventory.branch_id=NULL, tabla inventory_stock
- Campos fiscales en branches (RFC, regimen, CP, serie CFDI, certificados)
- Trigger trg_update_inventory_stock para sincronizar stock por sucursal
- Backend config_bp.py con CRUD de sucursales fiscales
- Backend inventory_bp.py y pos_bp.py refactorizados para inventario compartido
- Backend invoicing_bp.py usa datos fiscales de la sucursal de la venta
- Frontend config.html/js con modal de sucursales expandido

Fase 3: Factura global mensual
- Migracion v4.1: tablas global_invoice_sales, sales.global_invoiced_at
- build_global_invoice_xml() con InformacionGlobal SAT-compliant
- Servicio global_invoice.py para agrupar ventas PUE <=000
- Endpoints POST/GET /global-invoice y /global-invoice/eligible-sales
- Frontend invoicing.html/js con boton y modal de factura global
2026-06-11 08:59:56 +00:00

370 lines
13 KiB
Python

# /home/Autopartes/pos/services/inventory_engine.py
"""Inventory operations engine. All stock mutations go through here.
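The source of truth for stock is the append-only inventory_operations log:
SUM(inventory_operations.quantity) WHERE inventory_id = X AND branch_id = Y
Reads are served from values derived from that log (the Redis cache, then the
inventory_stock / inventory_stock_summary tables, then the SUM itself as a
fallback); stock only changes by inserting new operations.
Operations are append-only. No UPDATE, no DELETE on inventory_operations.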
"""
from flask import g
from services.audit import log_action
from services.redis_stock_cache import (
get_cached_stock, set_cached_stock, invalidate_stock
)
def _safe_g(attr, default=None):
    """Read ``attr`` from ``flask.g``, tolerating a missing app context.

    Returns ``default`` when the attribute is absent or when accessing
    ``g`` raises RuntimeError.
    """
    try:
        value = getattr(g, attr, default)
    except RuntimeError:
        value = default
    return value
def get_stock(conn, inventory_id, branch_id=None):
    """Return the current stock of one inventory item.

    Lookup order:
      1. Redis cache (``get_cached_stock``).
      2. ``inventory_stock`` when ``branch_id`` is truthy, otherwise
         ``inventory_stock_summary``.
      3. A SUM over ``inventory_operations`` when step 2 finds no row.

    Whatever value comes from step 2 or 3 is written back to the cache
    before being returned.
    """
    hit = get_cached_stock(inventory_id, branch_id)
    if hit is not None:
        return hit

    # Pick the table query, its SUM fallback and the shared parameters once.
    if branch_id:
        table_sql = "SELECT stock FROM inventory_stock WHERE inventory_id = %s AND branch_id = %s"
        fallback_sql = "SELECT COALESCE(SUM(quantity), 0) FROM inventory_operations WHERE inventory_id = %s AND branch_id = %s"
        args = (inventory_id, branch_id)
    else:
        table_sql = "SELECT stock FROM inventory_stock_summary WHERE inventory_id = %s"
        fallback_sql = "SELECT COALESCE(SUM(quantity), 0) FROM inventory_operations WHERE inventory_id = %s"
        args = (inventory_id,)

    cursor = conn.cursor()
    cursor.execute(table_sql, args)
    found = cursor.fetchone()
    if found is None:
        # Legacy path: no row in the stock table, so sum the operations.
        cursor.execute(fallback_sql, args)
        amount = cursor.fetchone()[0]
    else:
        amount = found[0]
    cursor.close()

    set_cached_stock(inventory_id, amount, branch_id)
    return amount
def get_stock_bulk(conn, branch_id=None):
    """Return ``{inventory_id: stock}`` for every item in one query.

    Reads ``inventory_stock`` filtered by branch when ``branch_id`` is
    truthy, otherwise ``inventory_stock_summary``. Every returned value is
    also written to the cache via ``set_cached_stock``.
    """
    cursor = conn.cursor()
    if not branch_id:
        cursor.execute("""
            SELECT inventory_id, stock FROM inventory_stock_summary
        """)
    else:
        cursor.execute("""
            SELECT inventory_id, stock
            FROM inventory_stock WHERE branch_id = %s
        """, (branch_id,))
    records = cursor.fetchall()
    cursor.close()

    stock_map = {}
    for record in records:
        stock_map[record[0]] = record[1]

    # Warm the cache with what was just read.
    for item_id in stock_map:
        set_cached_stock(item_id, stock_map[item_id], branch_id)
    return stock_map
def record_operation(conn, inventory_id, branch_id, operation_type, quantity,
                     reference_id=None, reference_type=None, cost_at_time=None,
                     notes=None, employee_id=None):
    """Insert one row into inventory_operations and return its id.

    Does NOT commit; the caller owns the transaction.

    Args:
        quantity: positive for entries (PURCHASE, RETURN, INITIAL),
            negative for exits (SALE).
        operation_type: SALE, PURCHASE, RETURN, ADJUST, TRANSFER, INITIAL,
            QUOTE_RESERVE, QUOTE_RELEASE.
        employee_id: when None, the value is read from ``flask.g`` through
            ``_safe_g``; ``device_id`` always comes from ``flask.g``.
    """
    cursor = conn.cursor()
    if employee_id is not None:
        actor_id = employee_id
    else:
        actor_id = _safe_g('employee_id')
    device = _safe_g('device_id')
    values = (
        inventory_id, branch_id, operation_type, quantity,
        reference_id, reference_type, cost_at_time,
        actor_id, device, notes,
    )
    cursor.execute("""
        INSERT INTO inventory_operations
        (inventory_id, branch_id, operation_type, quantity, reference_id,
        reference_type, cost_at_time, employee_id, device_id, notes)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        RETURNING id
    """, values)
    new_id = cursor.fetchone()[0]
    cursor.close()
    return new_id
def record_purchase(conn, inventory_id, branch_id, quantity, unit_cost,
                    supplier_invoice=None, notes=None):
    """Record a purchase entry and update the item's weighted average cost.

    Cost is stored globally on the inventory item (not per branch), so the
    average is computed against TOTAL stock across all branches; using
    branch-scoped stock would skew it when the item exists in several
    branches.

    The inventory row is read with SELECT ... FOR UPDATE so concurrent
    purchases of the same item serialize on the cost update.

    Negative on-hand stock is treated as zero when averaging: units that
    are not physically there carry no cost, and mixing a negative quantity
    into the formula yields an average outside the range of both costs.

    Does NOT commit; the caller owns the transaction.

    Returns:
        The id of the PURCHASE row inserted by ``record_operation``.
    """
    from decimal import Decimal, ROUND_HALF_UP

    cent = Decimal('0.01')
    zero = Decimal('0')
    # Convert inputs first so malformed values fail before any row is locked.
    qty_dec = Decimal(str(quantity))
    unit_cost_dec = Decimal(str(unit_cost))

    cur = conn.cursor()
    try:
        cur.execute("SELECT cost FROM inventory WHERE id = %s FOR UPDATE", (inventory_id,))
        row = cur.fetchone()
        current_cost = Decimal(str(row[0] or 0)) if row else zero
        # GLOBAL stock (all branches) because cost is a global field.
        current_stock = Decimal(str(get_stock(conn, inventory_id, branch_id=None) or 0))
        if current_stock < 0:
            current_stock = zero

        total_units = current_stock + qty_dec
        if total_units > 0:
            numerator = (current_cost * current_stock) + (unit_cost_dec * qty_dec)
            new_cost = (numerator / total_units).quantize(cent, rounding=ROUND_HALF_UP)
        else:
            new_cost = unit_cost_dec

        cur.execute("UPDATE inventory SET cost = %s WHERE id = %s", (float(new_cost), inventory_id))
    finally:
        # Release the cursor even when a query or conversion fails.
        cur.close()

    ref_note = f"Compra: {quantity} uds @ ${float(unit_cost_dec):.2f}"
    if supplier_invoice:
        ref_note += f" | Factura: {supplier_invoice}"
    if notes:
        ref_note += f" | {notes}"

    result = record_operation(
        conn, inventory_id, branch_id, 'PURCHASE', quantity,
        cost_at_time=float(unit_cost_dec), notes=ref_note
    )
    # Drop both the branch figure and the all-branches total.
    invalidate_stock(inventory_id, branch_id)
    invalidate_stock(inventory_id, None)
    return result
def record_sale(conn, inventory_id, branch_id, quantity, sale_id=None, cost_at_time=None, remaining_stock=None):
    """Record a sale as a negative-quantity SALE operation.

    Per the original module note, this is not exposed through an HTTP
    endpoint; the POS blueprint calls it as part of the sale transaction.
    Does NOT commit; the caller owns the transaction.

    Args:
        quantity: units sold; the sign is ignored and the operation is
            always stored as ``-abs(quantity)``.
        sale_id: stored as ``reference_id`` with ``reference_type='sale'``.
        cost_at_time: cost recorded on the operation row.
        remaining_stock: optional pre-calculated stock after the sale; when
            None it is looked up with ``get_stock`` for this branch.

    Returns:
        The id of the inserted operation.

    After the insert, a best-effort "stock hit zero" push notification is
    attempted. Any failure there is logged and swallowed so it can never
    block the sale.
    """
    import logging

    op_id = record_operation(
        conn, inventory_id, branch_id, 'SALE', -abs(quantity),
        reference_id=sale_id, reference_type='sale', cost_at_time=cost_at_time
    )
    # Invalidate the cache immediately (branch figure and global total).
    invalidate_stock(inventory_id, branch_id)
    invalidate_stock(inventory_id, None)

    try:
        remaining = remaining_stock if remaining_stock is not None else get_stock(conn, inventory_id, branch_id)
        if remaining <= 0:
            cur = conn.cursor()
            try:
                cur.execute("SELECT part_number, name FROM inventory WHERE id = %s", (inventory_id,))
                inv_row = cur.fetchone()
            finally:
                cur.close()
            if inv_row:
                from services.push_service import notify_owner
                notify_owner(
                    conn,
                    'Stock en Cero',
                    f'{inv_row[1] or inv_row[0]} se quedo sin existencias',
                    '/pos'
                )
    except Exception:
        # Still best-effort, but leave a trace instead of hiding the failure.
        logging.getLogger(__name__).warning(
            "Zero-stock notification failed for inventory_id=%s",
            inventory_id, exc_info=True
        )
    return op_id
def record_return(conn, inventory_id, branch_id, quantity, sale_id=None, notes=None):
    """Record a customer return as a positive-quantity RETURN operation.

    The sign of ``quantity`` is ignored. Returns the id of the inserted
    operation and invalidates the cached branch and total stock.
    """
    returned_units = abs(quantity)
    op_id = record_operation(
        conn, inventory_id, branch_id, 'RETURN', returned_units,
        reference_id=sale_id, reference_type='return', notes=notes,
    )
    for scope in (branch_id, None):
        invalidate_stock(inventory_id, scope)
    return op_id
def record_adjustment(conn, inventory_id, branch_id, quantity, reason):
    """Record a manual stock adjustment with a mandatory reason.

    The current stock and the requested adjustment are written to the
    audit log before the ADJUST operation is inserted.

    Raises:
        ValueError: when ``reason`` is empty or has fewer than three
            characters after stripping whitespace.

    Returns:
        The id of the inserted operation.
    """
    has_reason = bool(reason) and len(reason.strip()) >= 3
    if not has_reason:
        raise ValueError("Adjustment reason is mandatory (min 3 characters)")

    stock_before = get_stock(conn, inventory_id, branch_id)
    log_action(
        conn, 'STOCK_ADJUST', 'inventory', inventory_id,
        old_value={'stock': stock_before},
        new_value={'adjustment': quantity, 'reason': reason},
    )

    note = f"Ajuste: {reason}"
    op_id = record_operation(conn, inventory_id, branch_id, 'ADJUST', quantity, notes=note)
    for scope in (branch_id, None):
        invalidate_stock(inventory_id, scope)
    return op_id
def record_transfer(conn, inventory_id, from_branch_id, to_branch_id, quantity, notes=None):
    """Move stock between two branches as a pair of TRANSFER operations.

    Inserts a negative operation on the source branch and a positive one on
    the destination branch, using ``abs(quantity)`` for both.

    Returns:
        ``(out_id, in_id)``: ids of the outgoing and incoming operations.
    """
    units = abs(quantity)
    extra = f" | {notes}" if notes else ""

    out_note = f"Transferencia a sucursal {to_branch_id}" + extra
    out_id = record_operation(conn, inventory_id, from_branch_id, 'TRANSFER', -units, notes=out_note)

    in_note = f"Transferencia desde sucursal {from_branch_id}" + extra
    in_id = record_operation(conn, inventory_id, to_branch_id, 'TRANSFER', units, notes=in_note)

    # Both branch figures and the global total are now stale.
    for scope in (from_branch_id, to_branch_id, None):
        invalidate_stock(inventory_id, scope)
    return out_id, in_id
def record_initial(conn, inventory_id, branch_id, quantity, cost=None):
    """Record the initial stock load of an item as an INITIAL operation.

    Returns the id of the inserted operation and invalidates the cached
    branch and total stock.
    """
    op_id = record_operation(
        conn, inventory_id, branch_id, 'INITIAL', quantity,
        cost_at_time=cost,
        notes="Carga inicial de inventario",
    )
    for scope in (branch_id, None):
        invalidate_stock(inventory_id, scope)
    return op_id
def get_alerts(conn, branch_id=None, limit_per_type=500):
    """Return stock alerts: zero stock, below minimum, above maximum.

    Args:
        branch_id: when truthy, only inventory rows whose ``branch_id``
            equals it are considered.
        limit_per_type: maximum number of alerts returned per alert type,
            to keep the payload manageable for the browser.

    Returns:
        A list of dicts with keys ``inventory_id``, ``part_number``,
        ``name``, ``stock``, ``min_stock``, ``max_stock``, ``branch_id``,
        ``type`` ('zero' | 'low' | 'over') and ``severity``
        ('critical' | 'warning' | 'info'), ordered most severe first and
        then by inventory id.
    """
    branch_filter = ""
    params = []
    if branch_id:
        # Constant SQL fragment; the value itself is still bound as a parameter.
        branch_filter = " AND i.branch_id = %s"
        params.append(branch_id)

    # NOTE(review): the ``stock`` CTE sums operations across ALL branches even
    # when branch_id is given, and the filter relies on inventory.branch_id.
    # Confirm this is still intended now that per-branch stock exists.
    cur = conn.cursor()
    try:
        # One query; a window function ranks rows so each type can be capped.
        # Severity is ordered by explicit rank: sorting the text labels
        # descending would yield warning, info, critical.
        cur.execute(f"""
            WITH stock AS (
                SELECT inventory_id, COALESCE(SUM(quantity), 0) AS qty
                FROM inventory_operations
                GROUP BY inventory_id
            ),
            alerts_raw AS (
                SELECT
                    i.id AS inventory_id,
                    i.part_number,
                    i.name,
                    COALESCE(s.qty, 0) AS stock,
                    i.min_stock,
                    i.max_stock,
                    i.branch_id,
                    CASE
                        WHEN COALESCE(s.qty, 0) <= 0 THEN 'zero'
                        WHEN i.min_stock IS NOT NULL AND COALESCE(s.qty, 0) < i.min_stock THEN 'low'
                        WHEN i.max_stock IS NOT NULL AND COALESCE(s.qty, 0) > i.max_stock THEN 'over'
                    END AS alert_type,
                    CASE
                        WHEN COALESCE(s.qty, 0) <= 0 THEN 'critical'
                        WHEN i.min_stock IS NOT NULL AND COALESCE(s.qty, 0) < i.min_stock THEN 'warning'
                        WHEN i.max_stock IS NOT NULL AND COALESCE(s.qty, 0) > i.max_stock THEN 'info'
                    END AS severity
                FROM inventory i
                LEFT JOIN stock s ON s.inventory_id = i.id
                WHERE i.is_active = true {branch_filter}
            ),
            ranked AS (
                SELECT *,
                    ROW_NUMBER() OVER (PARTITION BY alert_type ORDER BY inventory_id) AS rn
                FROM alerts_raw
                WHERE alert_type IS NOT NULL
            )
            SELECT inventory_id, part_number, name, stock, min_stock, max_stock, branch_id, alert_type, severity
            FROM ranked
            WHERE rn <= %s
            ORDER BY CASE severity WHEN 'critical' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END,
                     inventory_id
        """, params + [limit_per_type])
        rows = cur.fetchall()
    finally:
        # Release the cursor even when the query fails.
        cur.close()

    keys = (
        'inventory_id', 'part_number', 'name', 'stock',
        'min_stock', 'max_stock', 'branch_id', 'type', 'severity',
    )
    return [dict(zip(keys, row[:9])) for row in rows]
def get_movement_history(conn, inventory_id, limit=50):
    """Return the most recent operations for one inventory item.

    Args:
        inventory_id: item whose operations are listed.
        limit: maximum number of rows, newest first.

    Returns:
        A list of dicts with keys ``id``, ``type``, ``quantity``, ``cost``
        (float, or None only when no cost was recorded), ``notes``,
        ``date`` (string form of ``created_at``), ``employee`` and
        ``branch_id``.
    """
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT io.id, io.operation_type, io.quantity, io.cost_at_time,
                io.notes, io.created_at, e.name as employee_name, io.branch_id
            FROM inventory_operations io
            LEFT JOIN employees e ON io.employee_id = e.id
            WHERE io.inventory_id = %s
            ORDER BY io.created_at DESC
            LIMIT %s
        """, (inventory_id, limit))
        rows = cur.fetchall()
    finally:
        # Release the cursor even when the query fails.
        cur.close()

    return [
        {
            'id': r[0], 'type': r[1], 'quantity': r[2],
            # A recorded cost of 0 is a real value; only NULL maps to None.
            'cost': float(r[3]) if r[3] is not None else None,
            'notes': r[4], 'date': str(r[5]),
            'employee': r[6], 'branch_id': r[7],
        }
        for r in rows
    ]