Files
Autoparts-DB/pos/services/inventory_engine.py
Nexus Dev 9ff3dc4c8b FASE 4-5-6: Infraestructura, CRM, Service Orders, Notificaciones, Ahorro, Logistica, API Publica
FASE 4:
- Redis cache de stock con fallback graceful
- Multi-moneda (MXN/USD) con contabilidad en MXN
- Proveedores y ordenes de compra completo
- Meilisearch 1.5M+ partes indexadas
- Metabase KPIs con dashboard auto-generado

FASE 5:
- CRM mejorado: activities, tags, loyalty program, analytics
- Imagenes de partes: upload, resize, thumbnails WebP
- Ordenes de servicio Kanban: received->diagnosis->repair->ready->delivered
- Garantias/RMA, alertas de reorden, multi-sucursal
- Stubs BNPL (APLAZO) y ERP Sync (Aspel/Contpaqi)

FASE 6:
- Notificaciones automaticas: push/WhatsApp/email/in-app
- Reportes de ahorro vs retail_price
- Logistica + tracking: DHL, FedEx, Estafeta, 99min, Uber
- API Publica: API keys, rate limiting, catalog search

Migraciones: v1.9-v3.0
Tests: 93/93 pasando
Backup: nexus_backup_20260427_045859.tar.gz
2026-04-27 05:23:30 +00:00

319 lines
11 KiB
Python

# /home/Autopartes/pos/services/inventory_engine.py
"""Inventory operations engine. All stock mutations go through here.
Stock is NEVER stored as a field — it is always computed as:
SUM(inventory_operations.quantity) WHERE inventory_id = X AND branch_id = Y
Operations are append-only. No UPDATE, no DELETE on inventory_operations.
"""
from flask import g
from services.audit import log_action
from services.redis_stock_cache import (
get_cached_stock, set_cached_stock, invalidate_stock
)
def _safe_g(attr, default=None):
    """Return attribute ``attr`` of ``flask.g``, or ``default`` if unavailable.

    Intended for code paths that may run outside of a Flask app context:
    a ``RuntimeError`` raised while reading ``g`` is converted into
    ``default`` instead of propagating. A missing attribute also yields
    ``default``.
    """
    try:
        value = getattr(g, attr, default)
    except RuntimeError:
        # No usable context for flask.g; behave as if the attribute is unset.
        value = default
    return value
def get_stock(conn, inventory_id, branch_id=None):
    """Return the current stock of one inventory item.

    The cached value (``get_cached_stock``) is returned when present.
    Otherwise the stock is computed in PostgreSQL as the sum of
    ``inventory_operations.quantity`` and the result is written back to
    the cache before being returned.

    Args:
        conn: DB-API connection; only ``cursor()`` is used.
        inventory_id: item whose operations are summed.
        branch_id: when truthy, only operations of that branch are summed;
            otherwise all branches are included.
    """
    hit = get_cached_stock(inventory_id, branch_id)
    if hit is not None:
        return hit

    # Cache miss: build the SUM query, narrowing by branch only when asked.
    sql = "SELECT COALESCE(SUM(quantity), 0) FROM inventory_operations WHERE inventory_id = %s"
    args = (inventory_id,)
    if branch_id:
        sql += " AND branch_id = %s"
        args = (inventory_id, branch_id)

    cursor = conn.cursor()
    cursor.execute(sql, args)
    total = cursor.fetchone()[0]
    cursor.close()

    # Remember the computed value under the same (item, branch) key.
    set_cached_stock(inventory_id, total, branch_id)
    return total
def get_stock_bulk(conn, branch_id=None):
    """Return ``{inventory_id: stock_quantity}`` for every item with operations.

    Always queries PostgreSQL (one grouped SUM over ``inventory_operations``)
    rather than reading the cache. After the query, each computed total is
    written to the stock cache under the given ``branch_id``.

    Args:
        conn: DB-API connection; only ``cursor()`` is used.
        branch_id: when truthy, only that branch's operations are summed;
            otherwise operations of all branches are included.
    """
    cursor = conn.cursor()
    if not branch_id:
        cursor.execute("""
            SELECT inventory_id, COALESCE(SUM(quantity), 0)
            FROM inventory_operations
            GROUP BY inventory_id
        """)
    else:
        cursor.execute("""
            SELECT inventory_id, COALESCE(SUM(quantity), 0)
            FROM inventory_operations WHERE branch_id = %s
            GROUP BY inventory_id
        """, (branch_id,))

    totals = {}
    for record in cursor.fetchall():
        totals[record[0]] = record[1]
    cursor.close()

    # Warm the cache with what was just computed.
    for item_id in totals:
        set_cached_stock(item_id, totals[item_id], branch_id)
    return totals
def record_operation(conn, inventory_id, branch_id, operation_type, quantity,
                     reference_id=None, reference_type=None, cost_at_time=None, notes=None):
    """Insert one row into ``inventory_operations`` and return its id.

    The transaction is left open: nothing is committed here, so the caller
    decides when to commit or roll back.

    Args:
        conn: DB-API connection; only ``cursor()`` is used.
        inventory_id: item the operation belongs to.
        branch_id: branch the operation belongs to.
        operation_type: SALE, PURCHASE, RETURN, ADJUST, TRANSFER or INITIAL.
        quantity: signed amount; positive for entries (PURCHASE, RETURN,
            INITIAL), negative for exits (SALE).
        reference_id: optional id of a related record.
        reference_type: optional kind of the related record.
        cost_at_time: optional unit cost to store with the operation.
        notes: optional free-text note.

    Returns:
        The ``id`` produced by the INSERT's ``RETURNING`` clause.
    """
    cursor = conn.cursor()
    insert_sql = """
        INSERT INTO inventory_operations
        (inventory_id, branch_id, operation_type, quantity, reference_id,
        reference_type, cost_at_time, employee_id, device_id, notes)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        RETURNING id
    """
    # employee_id / device_id come from flask.g when available, else None.
    row_values = (
        inventory_id, branch_id, operation_type, quantity,
        reference_id, reference_type, cost_at_time,
        _safe_g('employee_id'),
        _safe_g('device_id'),
        notes,
    )
    cursor.execute(insert_sql, row_values)
    new_id = cursor.fetchone()[0]
    cursor.close()
    return new_id
def record_purchase(conn, inventory_id, branch_id, quantity, unit_cost,
                    supplier_invoice=None, notes=None):
    """Record a purchase entry and update the item's weighted average cost.

    Cost is stored once on the inventory item (not per branch), so the
    average is computed against the item's TOTAL stock across all branches.
    The inventory row is locked with ``SELECT ... FOR UPDATE`` so concurrent
    purchases of the same item serialize on the cost update.

    Does NOT commit; the caller controls the transaction.

    Args:
        conn: DB-API connection; only ``cursor()`` is used.
        inventory_id: item being purchased.
        branch_id: branch receiving the stock.
        quantity: units purchased.
        unit_cost: cost per unit of this purchase.
        supplier_invoice: optional invoice reference appended to the note.
        notes: optional free text appended to the note.

    Returns:
        The id of the PURCHASE row created by ``record_operation``.
    """
    from decimal import Decimal, ROUND_HALF_UP

    cent = Decimal('0.01')
    zero = Decimal('0')
    # Convert up front so malformed input fails before any row is locked.
    qty_dec = Decimal(str(quantity))
    unit_cost_dec = Decimal(str(unit_cost))

    cur = conn.cursor()
    try:
        cur.execute("SELECT cost FROM inventory WHERE id = %s FOR UPDATE", (inventory_id,))
        row = cur.fetchone()
        current_cost = Decimal(str(row[0] or 0)) if row else zero

        # Read the global stock straight from the database, inside the locked
        # transaction. Going through the stock cache here could feed a stale
        # quantity into the average and persist a wrong cost.
        cur.execute(
            "SELECT COALESCE(SUM(quantity), 0) FROM inventory_operations WHERE inventory_id = %s",
            (inventory_id,)
        )
        current_stock = Decimal(str(cur.fetchone()[0] or 0))
        # Oversold (negative) stock carries no inventory value; letting it into
        # the formula extrapolates a cost outside the range of both inputs.
        if current_stock < 0:
            current_stock = zero

        # Weighted average cost (Decimal arithmetic), always stored at 2 decimals.
        stock_plus_qty = current_stock + qty_dec
        if stock_plus_qty > 0:
            numerator = (current_cost * current_stock) + (unit_cost_dec * qty_dec)
            new_cost = (numerator / stock_plus_qty).quantize(cent, rounding=ROUND_HALF_UP)
        else:
            new_cost = unit_cost_dec.quantize(cent, rounding=ROUND_HALF_UP)

        cur.execute("UPDATE inventory SET cost = %s WHERE id = %s", (float(new_cost), inventory_id))
    finally:
        # Release the cursor even if a query fails.
        cur.close()

    ref_note = f"Compra: {quantity} uds @ ${float(unit_cost_dec):.2f}"
    if supplier_invoice:
        ref_note += f" | Factura: {supplier_invoice}"
    if notes:
        ref_note += f" | {notes}"

    result = record_operation(
        conn, inventory_id, branch_id, 'PURCHASE', quantity,
        cost_at_time=float(unit_cost_dec), notes=ref_note
    )
    # Drop both the per-branch and the all-branches cached totals.
    invalidate_stock(inventory_id, branch_id)
    invalidate_stock(inventory_id, None)
    return result
def record_sale(conn, inventory_id, branch_id, quantity, sale_id=None, cost_at_time=None, remaining_stock=None):
    """Record a sale as a negative-quantity SALE operation.

    Per the original author's note, this is not exposed through an HTTP
    endpoint; it is called by the POS blueprint as part of the full sale
    transaction. Does NOT commit.

    After recording, the cached totals for the item are invalidated and, if
    the remaining stock is zero or below, the owner is notified. The
    notification is best-effort: any failure is logged and never propagates.

    Args:
        conn: DB-API connection.
        inventory_id: item sold.
        branch_id: branch where the sale happened.
        quantity: units sold; the sign is ignored (``-abs(quantity)`` is stored).
        sale_id: optional id of the sale, stored as ``reference_id``.
        cost_at_time: optional unit cost stored with the operation.
        remaining_stock: optional pre-calculated stock after the sale, used
            to avoid a redundant SUM query. When None, stock is looked up
            via ``get_stock``.

    Returns:
        The id of the SALE row created by ``record_operation``.
    """
    import logging

    op_id = record_operation(
        conn, inventory_id, branch_id, 'SALE', -abs(quantity),
        reference_id=sale_id, reference_type='sale', cost_at_time=cost_at_time
    )
    # Invalidate cache immediately (per-branch and all-branches totals).
    invalidate_stock(inventory_id, branch_id)
    invalidate_stock(inventory_id, None)

    # Check if stock hit zero and push to owner (best-effort).
    try:
        remaining = remaining_stock if remaining_stock is not None else get_stock(conn, inventory_id, branch_id)
        if remaining <= 0:
            cur = conn.cursor()
            try:
                cur.execute("SELECT part_number, name FROM inventory WHERE id = %s", (inventory_id,))
                inv_row = cur.fetchone()
            finally:
                cur.close()
            if inv_row:
                from services.push_service import notify_owner
                notify_owner(
                    conn,
                    'Stock en Cero',
                    f'{inv_row[1] or inv_row[0]} se quedo sin existencias',
                    '/pos'
                )
    except Exception:
        # Push failures never block sales, but leave a trace for diagnosis
        # instead of swallowing the error silently.
        logging.getLogger(__name__).warning(
            "Zero-stock notification failed for inventory_id=%s branch_id=%s",
            inventory_id, branch_id, exc_info=True
        )
    return op_id
def record_return(conn, inventory_id, branch_id, quantity, sale_id=None, notes=None):
    """Record a customer return as a positive-quantity RETURN operation.

    The sign of ``quantity`` is ignored (``abs(quantity)`` is stored).
    ``sale_id`` is stored as the operation's ``reference_id``. The cached
    totals for the branch and for all branches are invalidated afterwards.
    Returns the id of the new operation row.
    """
    returned_units = abs(quantity)
    op_id = record_operation(
        conn, inventory_id, branch_id, 'RETURN', returned_units,
        reference_id=sale_id, reference_type='return', notes=notes,
    )
    for scope in (branch_id, None):
        invalidate_stock(inventory_id, scope)
    return op_id
def record_adjustment(conn, inventory_id, branch_id, quantity, reason):
    """Record a manual stock adjustment with a mandatory reason.

    The adjustment is written to the audit log (with the stock before the
    adjustment) and then stored as an ADJUST operation. The cached totals
    for the branch and for all branches are invalidated afterwards.

    Args:
        conn: DB-API connection.
        inventory_id: item being adjusted.
        branch_id: branch whose stock is adjusted.
        quantity: signed adjustment amount, stored as given.
        reason: explanation; must have at least 3 non-blank characters.

    Returns:
        The id of the ADJUST row created by ``record_operation``.

    Raises:
        ValueError: if ``reason`` is empty or shorter than 3 characters
            once surrounding whitespace is removed.
    """
    reason_ok = bool(reason) and len(reason.strip()) >= 3
    if not reason_ok:
        raise ValueError("Adjustment reason is mandatory (min 3 characters)")

    # Capture the stock as it stands before the adjustment for the audit trail.
    stock_before = get_stock(conn, inventory_id, branch_id)
    log_action(
        conn, 'STOCK_ADJUST', 'inventory', inventory_id,
        old_value={'stock': stock_before},
        new_value={'adjustment': quantity, 'reason': reason},
    )

    op_id = record_operation(
        conn, inventory_id, branch_id, 'ADJUST', quantity,
        notes=f"Ajuste: {reason}",
    )
    for scope in (branch_id, None):
        invalidate_stock(inventory_id, scope)
    return op_id
def record_transfer(conn, inventory_id, from_branch_id, to_branch_id, quantity, notes=None):
    """Move stock between branches by writing two TRANSFER operations.

    One operation removes ``abs(quantity)`` from ``from_branch_id`` and the
    other adds the same amount to ``to_branch_id``. Optional ``notes`` are
    appended to both generated notes. The cached totals of both branches
    and of the all-branches total are invalidated afterwards.

    Returns:
        Tuple ``(out_id, in_id)`` with the ids of the two operation rows.
    """
    units = abs(quantity)
    suffix = f" | {notes}" if notes else ""

    # Outgoing leg: negative quantity on the source branch.
    out_id = record_operation(
        conn, inventory_id, from_branch_id, 'TRANSFER', -units,
        notes=f"Transferencia a sucursal {to_branch_id}" + suffix,
    )
    # Incoming leg: positive quantity on the destination branch.
    in_id = record_operation(
        conn, inventory_id, to_branch_id, 'TRANSFER', units,
        notes=f"Transferencia desde sucursal {from_branch_id}" + suffix,
    )

    for scope in (from_branch_id, to_branch_id, None):
        invalidate_stock(inventory_id, scope)
    return (out_id, in_id)
def record_initial(conn, inventory_id, branch_id, quantity, cost=None):
    """Record the initial stock load of an item as an INITIAL operation.

    ``quantity`` is stored as given and ``cost`` is stored as the
    operation's ``cost_at_time``. The cached totals for the branch and for
    all branches are invalidated afterwards. Returns the new operation id.
    """
    op_id = record_operation(
        conn, inventory_id, branch_id, 'INITIAL', quantity,
        cost_at_time=cost, notes="Carga inicial de inventario",
    )
    for scope in (branch_id, None):
        invalidate_stock(inventory_id, scope)
    return op_id
def get_alerts(conn, branch_id=None):
    """Return stock alerts for active inventory items.

    Each active item is compared against its current stock (taken from
    ``get_stock_bulk``; items without operations count as 0) and produces
    at most one alert:

    * ``zero`` / ``critical``: stock is 0 or negative.
    * ``low``  / ``warning``:  ``min_stock`` is set and stock is below it.
    * ``over`` / ``info``:     ``max_stock`` is set and stock is above it.

    Args:
        conn: DB-API connection.
        branch_id: when truthy, restricts both the stock totals and the
            inventory rows to that branch.

    Returns:
        list of alert dicts with ``type``, ``severity``, ``inventory_id``,
        ``part_number``, ``name``, ``stock``, ``branch_id`` and, for
        low/over alerts, the violated ``min_stock`` / ``max_stock``.
    """
    stock_by_item = get_stock_bulk(conn, branch_id)

    cursor = conn.cursor()
    conditions = "WHERE i.is_active = true"
    args = []
    if branch_id:
        conditions += " AND i.branch_id = %s"
        args.append(branch_id)
    cursor.execute(f"""
        SELECT i.id, i.part_number, i.name, i.min_stock, i.max_stock, i.branch_id
        FROM inventory i {conditions}
    """, args)

    found = []
    for item_id, part_number, item_name, floor, ceiling, item_branch in cursor.fetchall():
        on_hand = stock_by_item.get(item_id, 0)

        # Classify; the first matching rule wins, items in range are skipped.
        if on_hand <= 0:
            kind, severity, limit_key, limit_value = 'zero', 'critical', None, None
        elif floor and on_hand < floor:
            kind, severity, limit_key, limit_value = 'low', 'warning', 'min_stock', floor
        elif ceiling and on_hand > ceiling:
            kind, severity, limit_key, limit_value = 'over', 'info', 'max_stock', ceiling
        else:
            continue

        alert = {
            'type': kind, 'severity': severity, 'inventory_id': item_id,
            'part_number': part_number, 'name': item_name, 'stock': on_hand,
        }
        if limit_key is not None:
            alert[limit_key] = limit_value
        alert['branch_id'] = item_branch
        found.append(alert)

    cursor.close()
    return found
def get_movement_history(conn, inventory_id, limit=50):
    """Return the most recent operations recorded for one inventory item.

    Args:
        conn: DB-API connection; only ``cursor()`` is used.
        inventory_id: item whose ``inventory_operations`` rows are listed.
        limit: maximum number of rows returned (passed to SQL ``LIMIT``).

    Returns:
        list of dicts, newest first, with keys ``id``, ``type``,
        ``quantity``, ``cost`` (float, or None when no cost was stored),
        ``notes``, ``date`` (``str()`` of ``created_at``), ``employee``
        (employee name, or None when the LEFT JOIN finds no employee) and
        ``branch_id``.
    """
    cur = conn.cursor()
    try:
        # io.id breaks ties so rows sharing the same created_at come back in
        # a deterministic order.
        cur.execute("""
            SELECT io.id, io.operation_type, io.quantity, io.cost_at_time,
            io.notes, io.created_at, e.name as employee_name, io.branch_id
            FROM inventory_operations io
            LEFT JOIN employees e ON io.employee_id = e.id
            WHERE io.inventory_id = %s
            ORDER BY io.created_at DESC, io.id DESC
            LIMIT %s
        """, (inventory_id, limit))
        rows = cur.fetchall()
    finally:
        # Release the cursor even if the query fails.
        cur.close()

    return [
        {
            'id': r[0], 'type': r[1], 'quantity': r[2],
            # Only a missing cost maps to None; a stored cost of 0 is a real
            # value and must be reported as 0.0.
            'cost': float(r[3]) if r[3] is not None else None,
            'notes': r[4], 'date': str(r[5]),
            'employee': r[6], 'branch_id': r[7]
        }
        for r in rows
    ]