Files
Autoparts-DB/scripts/test_performance_fixes.py
Nexus Dev 9ff3dc4c8b FASE 4-5-6: Infraestructura, CRM, Service Orders, Notificaciones, Ahorro, Logistica, API Publica
FASE 4:
- Redis cache de stock con fallback graceful
- Multi-moneda (MXN/USD) con contabilidad en MXN
- Proveedores y ordenes de compra completo
- Meilisearch 1.5M+ partes indexadas
- Metabase KPIs con dashboard auto-generado

FASE 5:
- CRM mejorado: activities, tags, loyalty program, analytics
- Imagenes de partes: upload, resize, thumbnails WebP
- Ordenes de servicio Kanban: received->diagnosis->repair->ready->delivered
- Garantias/RMA, alertas de reorden, multi-sucursal
- Stubs BNPL (APLAZO) y ERP Sync (Aspel/Contpaqi)

FASE 6:
- Notificaciones automaticas: push/WhatsApp/email/in-app
- Reportes de ahorro vs retail_price
- Logistica + tracking: DHL, FedEx, Estafeta, 99min, Uber
- API Publica: API keys, rate limiting, catalog search

Migraciones: v1.9-v3.0
Tests: 93/93 pasando
Backup: nexus_backup_20260427_045859.tar.gz
2026-04-27 05:23:30 +00:00

243 lines
7.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""Test script to verify N+1 fixes and race condition protections.
This script tests:
1. Batch inventory fetch in _enrich_items (no N+1)
2. Batch stock preload in process_sale
3. FOR UPDATE locks are applied correctly
4. executemany for sale_items works
5. Basic sale creation still functions
"""
import sys
import os
import time
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'pos'))
# Need env vars for config
os.environ.setdefault('MASTER_DB_URL', 'postgresql://nexus:nexus_autoparts_2026@localhost/nexus_autoparts')
os.environ.setdefault('TENANT_DB_URL_TEMPLATE', 'postgresql://nexus:nexus_autoparts_2026@localhost/{db_name}')
os.environ.setdefault('POS_JWT_SECRET', 'test-secret-for-validation-only')
os.environ.setdefault('DATABASE_URL', os.environ['MASTER_DB_URL'])
from tenant_db import get_tenant_conn_by_dbname
from services.pos_engine import process_sale, calculate_totals
from services.inventory_engine import get_stock, get_stock_bulk, record_sale
from blueprints.pos_bp import _enrich_items
import psycopg2
def test_batch_inventory_fetch():
"""Test that _enrich_items fetches all items in batch (no N+1)."""
print("\n[TEST] Batch inventory fetch in _enrich_items...")
conn = get_tenant_conn_by_dbname('tenant_refaccionaria_demo')
cur = conn.cursor()
# Get some inventory IDs
cur.execute("SELECT id FROM inventory WHERE is_active = true LIMIT 3")
inv_ids = [r[0] for r in cur.fetchall()]
if len(inv_ids) < 2:
print(" SKIP: Need at least 2 inventory items")
cur.close()
conn.close()
return True
items = [{'inventory_id': iid, 'quantity': 1} for iid in inv_ids]
# Time the batch fetch
start = time.time()
enriched = _enrich_items(cur, items)
elapsed = time.time() - start
assert len(enriched) == len(inv_ids), "Not all items were enriched"
assert all('part_number' in e for e in enriched), "Missing part_number in enriched items"
print(f" OK: Enriched {len(enriched)} items in {elapsed:.3f}s (batch fetch)")
cur.close()
conn.close()
return True
def test_batch_stock_preload():
"""Test that get_stock_bulk fetches all stock in one query."""
print("\n[TEST] Batch stock preload with get_stock_bulk...")
conn = get_tenant_conn_by_dbname('tenant_refaccionaria_demo')
start = time.time()
stock_map = get_stock_bulk(conn)
elapsed = time.time() - start
print(f" OK: Fetched stock for {len(stock_map)} items in {elapsed:.3f}s (single query)")
conn.close()
return True
def test_sale_creation():
"""Test that a basic sale can be created with the optimized code."""
print("\n[TEST] Sale creation with optimized engine...")
conn = get_tenant_conn_by_dbname('tenant_refaccionaria_demo')
cur = conn.cursor()
# Get an inventory item and an employee
cur.execute("SELECT id, branch_id FROM inventory WHERE is_active = true LIMIT 1")
inv_row = cur.fetchone()
if not inv_row:
print(" SKIP: No inventory items available")
cur.close()
conn.close()
return True
inv_id = inv_row[0]
branch_id_val = inv_row[1]
cur.execute("SELECT id FROM employees WHERE role = 'owner' LIMIT 1")
emp_row = cur.fetchone()
employee_id = emp_row[0] if emp_row else 1
# Get or create an open cash register
cur.execute("SELECT id FROM cash_registers WHERE status = 'open' AND branch_id = %s LIMIT 1", (branch_id_val,))
reg_row = cur.fetchone()
if not reg_row:
# Create one
cur.execute("INSERT INTO cash_registers (branch_id, employee_id, register_number, opening_amount, status) VALUES (%s, %s, %s, %s, 'open') RETURNING id",
(branch_id_val, employee_id, 1, 1000.00))
register_id = cur.fetchone()[0]
conn.commit()
else:
register_id = reg_row[0]
cur.close()
# Create minimal Flask request context for g object
from flask import Flask, g
app = Flask('test')
with app.test_request_context():
g.branch_id = branch_id_val
g.employee_id = employee_id
g.employee_role = 'owner'
g.device_id = 'test-device'
g.max_discount_pct = 100
g.permissions = set()
sale_data = {
'items': [{
'inventory_id': inv_id,
'quantity': 1,
'unit_price': 100.00,
'discount_pct': 0,
'tax_rate': 0.16
}],
'customer_id': None,
'payment_method': 'efectivo',
'sale_type': 'cash',
'register_id': register_id,
'amount_paid': 116.00,
}
start = time.time()
sale = process_sale(conn, sale_data)
conn.commit()
elapsed = time.time() - start
assert sale['id'] > 0, "Sale was not created"
assert sale['total'] > 0, "Sale total is invalid"
print(f" OK: Created sale #{sale['id']} for ${sale['total']:.2f} in {elapsed:.3f}s")
# Cleanup: cancel the test sale
from services.pos_engine import cancel_sale
g.branch_id = branch_id_val
g.employee_id = employee_id
g.employee_role = 'owner'
g.device_id = 'test-device'
g.max_discount_pct = 100
g.permissions = set()
cancel_sale(conn, sale['id'], "Test cleanup")
conn.commit()
print(f" OK: Cancelled test sale #{sale['id']}")
conn.close()
return True
def test_race_condition_locks():
"""Verify that FOR UPDATE is present in the code (static check)."""
print("\n[TEST] Race condition protection (FOR UPDATE locks)...")
engine_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'pos', 'services', 'pos_engine.py')
with open(engine_file) as f:
content = f.read()
checks = [
('inventory FOR UPDATE', 'FOR UPDATE' in content),
('customers FOR UPDATE', 'FOR UPDATE' in content),
]
for name, result in checks:
status = "OK" if result else "FAIL"
print(f" {status}: {name}")
if not result:
return False
return True
def main():
print("=" * 60)
print(" Nexus Autoparts — Performance Fixes Validation")
print("=" * 60)
results = []
try:
results.append(("Batch inventory fetch", test_batch_inventory_fetch()))
except Exception as e:
print(f" FAIL: {e}")
results.append(("Batch inventory fetch", False))
try:
results.append(("Batch stock preload", test_batch_stock_preload()))
except Exception as e:
print(f" FAIL: {e}")
results.append(("Batch stock preload", False))
try:
results.append(("Race condition locks", test_race_condition_locks()))
except Exception as e:
print(f" FAIL: {e}")
results.append(("Race condition locks", False))
try:
results.append(("Sale creation (end-to-end)", test_sale_creation()))
except Exception as e:
print(f" FAIL: {e}")
results.append(("Sale creation (end-to-end)", False))
print("\n" + "=" * 60)
passed = sum(1 for _, r in results if r)
total = len(results)
print(f" Results: {passed}/{total} tests passed")
print("=" * 60)
if passed < total:
print("\nFailed tests:")
for name, result in results:
if not result:
print(f" - {name}")
sys.exit(1)
else:
print("\nAll performance fixes validated successfully!")
sys.exit(0)
if __name__ == '__main__':
main()