1. Spanish translations for TecDoc catalog (translations.py) applied to catalog_service.py and dashboard server.py endpoints 2. Printable quotation HTML endpoint (/pos/api/quotations/<id>/pdf) with @media print CSS for clean browser-to-PDF output 3. Web Push notifications to owner/admin on sale cancellation, stock zero, and cash register differences > $500. Includes service worker, VAPID key management, and subscription endpoints. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
253 lines
9.3 KiB
Python
253 lines
9.3 KiB
Python
# /home/Autopartes/pos/services/inventory_engine.py
|
|
"""Inventory operations engine. All stock mutations go through here.
|
|
|
|
Stock is NEVER stored as a field — it is always computed as:
|
|
SUM(inventory_operations.quantity) WHERE inventory_id = X AND branch_id = Y
|
|
|
|
Operations are append-only. No UPDATE, no DELETE on inventory_operations.
|
|
"""
|
|
|
|
from flask import g
|
|
from services.audit import log_action
|
|
|
|
|
|
def get_stock(conn, inventory_id, branch_id=None):
|
|
"""Get current stock for an inventory item. Optionally filter by branch."""
|
|
cur = conn.cursor()
|
|
if branch_id:
|
|
cur.execute(
|
|
"SELECT COALESCE(SUM(quantity), 0) FROM inventory_operations WHERE inventory_id = %s AND branch_id = %s",
|
|
(inventory_id, branch_id)
|
|
)
|
|
else:
|
|
cur.execute(
|
|
"SELECT COALESCE(SUM(quantity), 0) FROM inventory_operations WHERE inventory_id = %s",
|
|
(inventory_id,)
|
|
)
|
|
stock = cur.fetchone()[0]
|
|
cur.close()
|
|
return stock
|
|
|
|
|
|
def get_stock_bulk(conn, branch_id=None):
|
|
"""Get stock for all items. Returns dict {inventory_id: stock_quantity}."""
|
|
cur = conn.cursor()
|
|
if branch_id:
|
|
cur.execute("""
|
|
SELECT inventory_id, COALESCE(SUM(quantity), 0)
|
|
FROM inventory_operations WHERE branch_id = %s
|
|
GROUP BY inventory_id
|
|
""", (branch_id,))
|
|
else:
|
|
cur.execute("""
|
|
SELECT inventory_id, COALESCE(SUM(quantity), 0)
|
|
FROM inventory_operations
|
|
GROUP BY inventory_id
|
|
""")
|
|
stock_map = {r[0]: r[1] for r in cur.fetchall()}
|
|
cur.close()
|
|
return stock_map
|
|
|
|
|
|
def record_operation(conn, inventory_id, branch_id, operation_type, quantity,
|
|
reference_id=None, reference_type=None, cost_at_time=None, notes=None):
|
|
"""Record a single inventory operation. Does NOT commit — caller controls transaction.
|
|
|
|
Args:
|
|
quantity: positive for entries (PURCHASE, RETURN, INITIAL), negative for exits (SALE)
|
|
operation_type: SALE, PURCHASE, RETURN, ADJUST, TRANSFER, INITIAL
|
|
"""
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
INSERT INTO inventory_operations
|
|
(inventory_id, branch_id, operation_type, quantity, reference_id,
|
|
reference_type, cost_at_time, employee_id, device_id, notes)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
RETURNING id
|
|
""", (
|
|
inventory_id, branch_id, operation_type, quantity,
|
|
reference_id, reference_type, cost_at_time,
|
|
getattr(g, 'employee_id', None),
|
|
getattr(g, 'device_id', None),
|
|
notes
|
|
))
|
|
op_id = cur.fetchone()[0]
|
|
cur.close()
|
|
return op_id
|
|
|
|
|
|
def record_purchase(conn, inventory_id, branch_id, quantity, unit_cost,
|
|
supplier_invoice=None, notes=None):
|
|
"""Record a purchase entry. Updates weighted average cost on the inventory item.
|
|
|
|
IMPORTANT: Cost is stored globally on the inventory item (not per-branch), so we
|
|
must use TOTAL stock across ALL branches when computing the weighted average.
|
|
Using branch-scoped stock would produce incorrect averages when the same item
|
|
exists in multiple branches.
|
|
"""
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT cost FROM inventory WHERE id = %s", (inventory_id,))
|
|
current_cost = float(cur.fetchone()[0] or 0)
|
|
|
|
# Use GLOBAL stock (all branches) because cost is a global field on the inventory item
|
|
current_stock = get_stock(conn, inventory_id, branch_id=None)
|
|
|
|
# Weighted average cost
|
|
if current_stock + quantity > 0:
|
|
new_cost = ((current_cost * max(current_stock, 0)) + (unit_cost * quantity)) / (max(current_stock, 0) + quantity)
|
|
else:
|
|
new_cost = unit_cost
|
|
|
|
# Update cost on inventory item
|
|
cur.execute("UPDATE inventory SET cost = %s WHERE id = %s", (round(new_cost, 2), inventory_id))
|
|
cur.close()
|
|
|
|
ref_note = f"Compra: {quantity} uds @ ${unit_cost:.2f}"
|
|
if supplier_invoice:
|
|
ref_note += f" | Factura: {supplier_invoice}"
|
|
if notes:
|
|
ref_note += f" | {notes}"
|
|
|
|
return record_operation(
|
|
conn, inventory_id, branch_id, 'PURCHASE', quantity,
|
|
cost_at_time=unit_cost, notes=ref_note
|
|
)
|
|
|
|
|
|
def record_sale(conn, inventory_id, branch_id, quantity, sale_id=None, cost_at_time=None):
|
|
"""Record a sale (negative quantity).
|
|
|
|
NOT exposed via HTTP endpoint — called directly by the POS blueprint (Plan 3)
|
|
which imports inventory_engine as part of the full sale transaction.
|
|
"""
|
|
op_id = record_operation(
|
|
conn, inventory_id, branch_id, 'SALE', -abs(quantity),
|
|
reference_id=sale_id, reference_type='sale', cost_at_time=cost_at_time
|
|
)
|
|
|
|
# Check if stock hit zero — push to owner (best-effort)
|
|
try:
|
|
remaining = get_stock(conn, inventory_id, branch_id)
|
|
if remaining <= 0:
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT part_number, name FROM inventory WHERE id = %s", (inventory_id,))
|
|
inv_row = cur.fetchone()
|
|
cur.close()
|
|
if inv_row:
|
|
from services.push_service import notify_owner
|
|
notify_owner(
|
|
conn,
|
|
'Stock en Cero',
|
|
f'{inv_row[1] or inv_row[0]} se quedo sin existencias',
|
|
'/pos'
|
|
)
|
|
except Exception:
|
|
pass # Push failures never block sales
|
|
|
|
return op_id
|
|
|
|
|
|
def record_return(conn, inventory_id, branch_id, quantity, sale_id=None, notes=None):
|
|
"""Record a customer return (positive quantity)."""
|
|
return record_operation(
|
|
conn, inventory_id, branch_id, 'RETURN', abs(quantity),
|
|
reference_id=sale_id, reference_type='return', notes=notes
|
|
)
|
|
|
|
|
|
def record_adjustment(conn, inventory_id, branch_id, quantity, reason):
|
|
"""Record a manual stock adjustment. Reason is mandatory."""
|
|
if not reason or len(reason.strip()) < 3:
|
|
raise ValueError("Adjustment reason is mandatory (min 3 characters)")
|
|
|
|
log_action(conn, 'STOCK_ADJUST', 'inventory', inventory_id,
|
|
old_value={'stock': get_stock(conn, inventory_id, branch_id)},
|
|
new_value={'adjustment': quantity, 'reason': reason})
|
|
|
|
return record_operation(
|
|
conn, inventory_id, branch_id, 'ADJUST', quantity,
|
|
notes=f"Ajuste: {reason}"
|
|
)
|
|
|
|
|
|
def record_transfer(conn, inventory_id, from_branch_id, to_branch_id, quantity, notes=None):
|
|
"""Transfer stock between branches. Creates two operations (out + in)."""
|
|
out_id = record_operation(
|
|
conn, inventory_id, from_branch_id, 'TRANSFER', -abs(quantity),
|
|
notes=f"Transferencia a sucursal {to_branch_id}" + (f" | {notes}" if notes else "")
|
|
)
|
|
in_id = record_operation(
|
|
conn, inventory_id, to_branch_id, 'TRANSFER', abs(quantity),
|
|
notes=f"Transferencia desde sucursal {from_branch_id}" + (f" | {notes}" if notes else "")
|
|
)
|
|
return out_id, in_id
|
|
|
|
|
|
def record_initial(conn, inventory_id, branch_id, quantity, cost=None):
|
|
"""Record initial stock load."""
|
|
return record_operation(
|
|
conn, inventory_id, branch_id, 'INITIAL', quantity,
|
|
cost_at_time=cost, notes="Carga inicial de inventario"
|
|
)
|
|
|
|
|
|
def get_alerts(conn, branch_id=None):
|
|
"""Get stock alerts: zero stock, below minimum, above maximum."""
|
|
stock_map = get_stock_bulk(conn, branch_id)
|
|
cur = conn.cursor()
|
|
|
|
where = "WHERE i.is_active = true"
|
|
params = []
|
|
if branch_id:
|
|
where += " AND i.branch_id = %s"
|
|
params.append(branch_id)
|
|
|
|
cur.execute(f"""
|
|
SELECT i.id, i.part_number, i.name, i.min_stock, i.max_stock, i.branch_id
|
|
FROM inventory i {where}
|
|
""", params)
|
|
|
|
alerts = []
|
|
for row in cur.fetchall():
|
|
inv_id, part_num, name, min_s, max_s, br_id = row
|
|
stock = stock_map.get(inv_id, 0)
|
|
|
|
if stock <= 0:
|
|
alerts.append({'type': 'zero', 'severity': 'critical', 'inventory_id': inv_id,
|
|
'part_number': part_num, 'name': name, 'stock': stock, 'branch_id': br_id})
|
|
elif min_s and stock < min_s:
|
|
alerts.append({'type': 'low', 'severity': 'warning', 'inventory_id': inv_id,
|
|
'part_number': part_num, 'name': name, 'stock': stock,
|
|
'min_stock': min_s, 'branch_id': br_id})
|
|
elif max_s and stock > max_s:
|
|
alerts.append({'type': 'over', 'severity': 'info', 'inventory_id': inv_id,
|
|
'part_number': part_num, 'name': name, 'stock': stock,
|
|
'max_stock': max_s, 'branch_id': br_id})
|
|
|
|
cur.close()
|
|
return alerts
|
|
|
|
|
|
def get_movement_history(conn, inventory_id, limit=50):
|
|
"""Get operation history for a specific item."""
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
SELECT io.id, io.operation_type, io.quantity, io.cost_at_time,
|
|
io.notes, io.created_at, e.name as employee_name, io.branch_id
|
|
FROM inventory_operations io
|
|
LEFT JOIN employees e ON io.employee_id = e.id
|
|
WHERE io.inventory_id = %s
|
|
ORDER BY io.created_at DESC
|
|
LIMIT %s
|
|
""", (inventory_id, limit))
|
|
history = []
|
|
for r in cur.fetchall():
|
|
history.append({
|
|
'id': r[0], 'type': r[1], 'quantity': r[2],
|
|
'cost': float(r[3]) if r[3] else None,
|
|
'notes': r[4], 'date': str(r[5]),
|
|
'employee': r[6], 'branch_id': r[7]
|
|
})
|
|
cur.close()
|
|
return history
|