OPCIÓN C + A1: Consolidación técnica + orjson

C1: Materialized view part_vehicle_preview (creación en progreso)
- Migración v3.3_materialized_view.sql
- catalog_service.py y dashboard/server.py ahora usan la MV
- Script refresh_part_vehicle_preview.py + warm_vehicle_cache.py actualizado

C2: Fix cache warming script (autónomo)
- Auto-re-ejecuta con sudo -u postgres si peer auth falla
- Args CLI: --dsn, --batch-size, --ttl, --dry-run

C3: CSS dinámico residual extraído
- sidebar.js → sidebar.css (nuevo)
- pos-utils.js → common.css (nuevo)
- Links agregados a 14 templates POS

C4: Script de load testing básico
- scripts/load_test.py: métricas p50/p95/p99, throughput, errores

C5: Documentación actualizada
- FASES_IMPLEMENTADAS.md: test count real, FASE 7 completa
- performance_audit_2026.md: anexo post-FASE 7, métricas actualizadas

A1: Serialización orjson
- pos/json_provider.py: DefaultJSONProvider con orjson.dumps/loads
- Aplicado a POS app y Dashboard server
- Fix indentation error en pos_bp.py

Tests: 73/73 pasando
This commit is contained in:
2026-04-27 09:36:03 +00:00
parent f893391916
commit 042acd6207
33 changed files with 998 additions and 281 deletions

189
scripts/load_test.py Normal file
View File

@@ -0,0 +1,189 @@
#!/usr/bin/env python3
"""Load testing script for Nexus POS critical endpoints.
Usage:
python3 load_test.py --url-base http://localhost:5001 --workers 10 --requests 100
python3 load_test.py --url-base http://localhost:5001 --workers 20 --duration 30
"""
import argparse
import json
import statistics
import sys
import time
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.error import HTTPError, URLError
def make_request(url, method='GET', data=None, headers=None):
"""Execute a single HTTP request and return (status, latency_ms, error)."""
req = urllib.request.Request(url, method=method, data=data, headers=headers or {})
start = time.perf_counter()
try:
with urllib.request.urlopen(req, timeout=30) as resp:
_ = resp.read()
latency = (time.perf_counter() - start) * 1000
return resp.status, latency, None
except HTTPError as e:
latency = (time.perf_counter() - start) * 1000
return e.code, latency, str(e)
except URLError as e:
latency = (time.perf_counter() - start) * 1000
return 0, latency, str(e.reason)
except Exception as e:
latency = (time.perf_counter() - start) * 1000
return 0, latency, str(e)
def run_benchmark(url_base, endpoints, workers, requests_total, duration):
"""Run load test and return results dict."""
results = {}
for name, cfg in endpoints.items():
url = url_base + cfg['path']
method = cfg.get('method', 'GET')
data = cfg.get('data')
headers = cfg.get('headers')
if data and isinstance(data, dict):
data = json.dumps(data).encode('utf-8')
headers = headers or {}
headers.setdefault('Content-Type', 'application/json')
latencies = []
errors = []
start_time = time.time()
completed = 0
def task():
return make_request(url, method, data, headers)
if duration:
# Run for a fixed duration, counting requests
with ThreadPoolExecutor(max_workers=workers) as ex:
futures = []
while time.time() - start_time < duration:
if len(futures) < workers * 2:
futures.append(ex.submit(task))
# Collect completed
done = [f for f in futures if f.done()]
for f in done:
futures.remove(f)
status, latency, err = f.result()
if err:
errors.append((status, err))
else:
latencies.append(latency)
completed += 1
if not done:
time.sleep(0.01)
# Drain remaining
for f in as_completed(futures):
status, latency, err = f.result()
if err:
errors.append((status, err))
else:
latencies.append(latency)
completed += 1
else:
# Fixed request count
with ThreadPoolExecutor(max_workers=workers) as ex:
futures = [ex.submit(task) for _ in range(requests_total)]
for f in as_completed(futures):
status, latency, err = f.result()
if err:
errors.append((status, err))
else:
latencies.append(latency)
completed += 1
elapsed = time.time() - start_time
results[name] = {
'url': url,
'completed': completed,
'success': len(latencies),
'errors': len(errors),
'throughput': completed / elapsed if elapsed > 0 else 0,
'latencies': latencies,
'error_samples': errors[:3],
}
return results
def print_results(results):
print("\n" + "=" * 90)
print(f"{'Endpoint':<20} {'OK':>6} {'Err':>6} {'RPS':>8} {'p50':>8} {'p95':>8} {'p99':>8}")
print("=" * 90)
for name, r in results.items():
lat = sorted(r['latencies'])
p50 = lat[int(len(lat) * 0.5)] if lat else 0
p95 = lat[int(len(lat) * 0.95)] if lat else 0
p99 = lat[int(len(lat) * 0.99)] if lat else 0
print(f"{name:<20} {r['success']:>6} {r['errors']:>6} {r['throughput']:>8.1f} {p50:>7.1f}ms {p95:>7.1f}ms {p99:>7.1f}ms")
if r['error_samples']:
for status, err in r['error_samples']:
print(f" -> error sample: HTTP {status} {err[:60]}")
print("=" * 90)
def main():
parser = argparse.ArgumentParser(description='Nexus POS load test')
parser.add_argument('--url-base', default='http://localhost:5001',
help='Base URL of the POS server')
parser.add_argument('--workers', '-w', type=int, default=10,
help='Concurrent threads (default: 10)')
parser.add_argument('--requests', '-n', type=int, default=100,
help='Total requests per endpoint (default: 100)')
parser.add_argument('--duration', '-d', type=int, default=0,
help='Run for N seconds instead of fixed request count')
parser.add_argument('--json', '-j', action='store_true',
help='Output raw results as JSON')
parser.add_argument('--auth-token',
help='JWT bearer token for authenticated endpoints')
args = parser.parse_args()
endpoints = {
'catalog_search': {
'path': '/pos/api/catalog/search?q=filtro%20aire&limit=20',
'method': 'GET',
},
'inventory_items': {
'path': '/pos/api/inventory/items?page=1&per_page=50',
'method': 'GET',
},
'health': {
'path': '/pos/api/health',
'method': 'GET',
},
}
if args.auth_token:
for cfg in endpoints.values():
cfg.setdefault('headers', {})
cfg['headers']['Authorization'] = f'Bearer {args.auth_token}'
else:
print("WARNING: No --auth-token provided. Authenticated endpoints may return 401.")
print(" Run with a valid JWT if testing protected routes.\n")
print(f"Load testing {args.url_base}")
print(f"Workers: {args.workers} | Mode: {'duration ' + str(args.duration) + 's' if args.duration else 'requests ' + str(args.requests)}\n")
results = run_benchmark(args.url_base, endpoints, args.workers, args.requests, args.duration)
if args.json:
# Strip raw latencies array from JSON to keep it small
out = {k: {a: b for a, b in v.items() if a != 'latencies'} for k, v in results.items()}
out['_summary'] = {
'url_base': args.url_base,
'workers': args.workers,
'mode': 'duration' if args.duration else 'requests',
'value': args.duration or args.requests,
}
print(json.dumps(out, indent=2))
else:
print_results(results)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,82 @@
#!/usr/bin/env python3
"""Refresh the part_vehicle_preview materialized view.
Uses REFRESH MATERIALIZED VIEW CONCURRENTLY so reads are not blocked.
Requires the unique index idx_pvp_part to exist.
Usage:
python3 refresh_part_vehicle_preview.py
python3 refresh_part_vehicle_preview.py --dsn "postgresql://..."
Recommended cron (as postgres user or via systemd timer):
0 3 * * * /usr/bin/python3 /home/Autopartes/scripts/refresh_part_vehicle_preview.py >> /var/log/nexus-pos/mv_refresh.log 2>&1
"""
import argparse
import os
import subprocess
import sys
import time
from datetime import datetime
import psycopg2
DEFAULT_DSN = os.environ.get('MASTER_DB_URL', 'postgresql://postgres@/nexus_autoparts')
def log(msg):
print(f"[{datetime.now().isoformat(timespec='seconds')}] {msg}", flush=True)
def _connect(dsn):
return psycopg2.connect(dsn)
def _ensure_connection(dsn):
try:
return _connect(dsn)
except psycopg2.OperationalError as exc:
err = str(exc).lower()
if 'peer' in err or 'authentication' in err:
if os.geteuid() == 0:
log("ERROR: PostgreSQL peer authentication failed.")
log(" Run as postgres OS user: sudo -u postgres python3 " + __file__)
sys.exit(1)
log("Peer auth failed. Re-running with sudo -u postgres ...")
cmd = ['sudo', '-u', 'postgres', sys.executable, __file__]
env = os.environ.copy()
env['MASTER_DB_URL'] = dsn
for i, arg in enumerate(sys.argv[1:], start=1):
if arg in ('--dsn', '-d') and i < len(sys.argv) - 1:
env['MASTER_DB_URL'] = sys.argv[i + 1]
ret = subprocess.call(cmd, env=env)
sys.exit(ret)
raise
def main():
parser = argparse.ArgumentParser(description='Refresh part_vehicle_preview MV')
parser.add_argument('--dsn', '-d', default=DEFAULT_DSN, help='PostgreSQL DSN')
args = parser.parse_args()
log("Starting REFRESH MATERIALIZED VIEW CONCURRENTLY part_vehicle_preview ...")
conn = _ensure_connection(args.dsn)
conn.autocommit = True
cur = conn.cursor()
start = time.time()
try:
cur.execute("SET statement_timeout = 0;")
cur.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY part_vehicle_preview;")
elapsed = time.time() - start
log(f"Refresh completed in {elapsed:.1f}s")
except psycopg2.Error as exc:
log(f"ERROR: {exc}")
sys.exit(1)
finally:
cur.close()
conn.close()
if __name__ == '__main__':
main()

View File

@@ -1,5 +1,5 @@
#!/usr/bin/env python3
"""Warm Redis cache for vehicle info (part_vehicle_preview alternative).
"""Warm Redis cache for vehicle info.
Runs in batches over all parts in the catalog, populating
nexus:vehicle:{part_id} keys in Redis. This eliminates the
@@ -7,71 +7,126 @@ DISTINCT ON + 4 JOINs query on vehicle_parts (2B rows) for
cached parts.
Usage:
export MASTER_DB_URL="postgresql://..."
export REDIS_URL="redis://localhost:6379/0"
python3 warm_vehicle_cache.py
python3 warm_vehicle_cache.py --dsn "postgresql://user:pass@localhost/db"
python3 warm_vehicle_cache.py --batch-size 10000 --ttl 7200
"""
import os, sys, json, time
import argparse
import os
import subprocess
import sys
import time
from datetime import datetime
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'pos'))
import psycopg2
import redis
MASTER_DB_URL = os.environ.get('MASTER_DB_URL', 'postgresql://postgres@/nexus_autoparts')
DEFAULT_DSN = os.environ.get('MASTER_DB_URL', 'postgresql://postgres@/nexus_autoparts')
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
BATCH_SIZE = 5000
TTL_SECONDS = 3600
DEFAULT_BATCH_SIZE = 5000
DEFAULT_TTL = 3600
def log(msg):
print(f"[{datetime.now().isoformat(timespec='seconds')}] {msg}", flush=True)
def _connect(dsn):
"""Connect to PostgreSQL; raise on failure."""
return psycopg2.connect(dsn)
def _ensure_connection(dsn):
"""Try to connect. On peer-auth failure, re-run with sudo -u postgres."""
try:
return _connect(dsn)
except psycopg2.OperationalError as exc:
err = str(exc).lower()
if 'peer' in err or 'authentication' in err:
if os.geteuid() == 0:
# Already root — can't sudo to postgres usefully; give clear message
log("ERROR: PostgreSQL peer authentication failed.")
log(" Run as postgres OS user:")
log(" sudo -u postgres python3 " + __file__)
log(" Or set MASTER_DB_URL with TCP host+password:")
log(" export MASTER_DB_URL=postgresql://user:pass@localhost/nexus_autoparts")
sys.exit(1)
log("Peer auth failed. Re-running with sudo -u postgres ...")
cmd = ['sudo', '-u', 'postgres', sys.executable, __file__]
# Forward original env + CLI args
env = os.environ.copy()
env['MASTER_DB_URL'] = dsn
for i, arg in enumerate(sys.argv[1:], start=1):
if arg in ('--dsn', '-d') and i < len(sys.argv) - 1:
env['MASTER_DB_URL'] = sys.argv[i + 1]
ret = subprocess.call(cmd, env=env)
sys.exit(ret)
raise
def main():
print("Connecting to master DB and Redis...")
conn = psycopg2.connect(MASTER_DB_URL)
parser = argparse.ArgumentParser(description='Warm Redis cache for vehicle info')
parser.add_argument('--dsn', '-d', default=DEFAULT_DSN,
help='PostgreSQL DSN (default: MASTER_DB_URL env or peer auth)')
parser.add_argument('--batch-size', '-b', type=int, default=DEFAULT_BATCH_SIZE,
help=f'Batch size (default: {DEFAULT_BATCH_SIZE})')
parser.add_argument('--ttl', '-t', type=int, default=DEFAULT_TTL,
help=f'Redis TTL in seconds (default: {DEFAULT_TTL})')
parser.add_argument('--dry-run', action='store_true',
help='Do not write to Redis, just log what would be done')
args = parser.parse_args()
log("Connecting to master DB and Redis...")
conn = _ensure_connection(args.dsn)
cur = conn.cursor()
r = redis.from_url(REDIS_URL, decode_responses=True)
r.ping()
log("Connected.")
# Get all part_ids
cur.execute("SELECT id_part FROM parts WHERE oem_part_number IS NOT NULL ORDER BY id_part")
all_ids = [r[0] for r in cur.fetchall()]
all_ids = [row[0] for row in cur.fetchall()]
total = len(all_ids)
print(f"Total parts to warm: {total}")
log(f"Total parts to warm: {total}")
if total == 0:
log("No parts found. Exiting.")
return
processed = 0
cached = 0
start = time.time()
for i in range(0, total, BATCH_SIZE):
batch = all_ids[i:i + BATCH_SIZE]
for i in range(0, total, args.batch_size):
batch = all_ids[i:i + args.batch_size]
cur.execute("""
SELECT DISTINCT ON (vp.part_id)
vp.part_id, b.name_brand, m.name_model, y.year_car
FROM vehicle_parts vp
JOIN model_year_engine mye ON mye.id_mye = vp.model_year_engine_id
JOIN models m ON m.id_model = mye.model_id
JOIN brands b ON b.id_brand = m.brand_id
JOIN years y ON y.id_year = mye.year_id
WHERE vp.part_id = ANY(%s)
ORDER BY vp.part_id, y.year_car DESC
SELECT part_id, name_brand, name_model, year_car
FROM part_vehicle_preview
WHERE part_id = ANY(%s)
""", (batch,))
pipe = r.pipeline()
batch_cached = 0
for row in cur.fetchall():
info = f"{row[1]} {row[2]} {row[3]}"
pipe.setex(f'nexus:vehicle:{row[0]}', TTL_SECONDS, info)
batch_cached += 1
pipe.execute()
rows = cur.fetchall()
if not args.dry_run:
pipe = r.pipeline()
for row in rows:
info = f"{row[1]} {row[2]} {row[3]}"
pipe.setex(f'nexus:vehicle:{row[0]}', args.ttl, info)
pipe.execute()
batch_cached = len(rows)
processed += len(batch)
cached += batch_cached
elapsed = time.time() - start
rate = processed / elapsed if elapsed > 0 else 0
print(f" [{processed}/{total}] cached={batch_cached} ({rate:.0f}/s)")
log(f"[{processed}/{total}] cached={batch_cached} ({rate:.0f}/s)")
cur.close()
conn.close()
print(f"\nDone. Cached {cached} vehicle entries in {elapsed:.0f}s")
elapsed = time.time() - start
log(f"Done. Cached {cached} vehicle entries in {elapsed:.0f}s")
if __name__ == '__main__':