C1: Materialized view part_vehicle_preview (creación en progreso) - Migración v3.3_materialized_view.sql - catalog_service.py y dashboard/server.py ahora usan la MV - Script refresh_part_vehicle_preview.py + warm_vehicle_cache.py actualizado C2: Fix cache warming script (autónomo) - Auto-re-ejecuta con sudo -u postgres si peer auth falla - Args CLI: --dsn, --batch-size, --ttl, --dry-run C3: CSS dinámico residual extraído - sidebar.js → sidebar.css (nuevo) - pos-utils.js → common.css (nuevo) - Links agregados a 14 templates POS C4: Script de load testing básico - scripts/load_test.py: métricas p50/p95/p99, throughput, errores C5: Documentación actualizada - FASES_IMPLEMENTADAS.md: test count real, FASE 7 completa - performance_audit_2026.md: anexo post-FASE 7, métricas actualizadas A1: Serialización orjson - pos/json_provider.py: DefaultJSONProvider con orjson.dumps/loads - Aplicado a POS app y Dashboard server - Fix indentation error en pos_bp.py Tests: 73/73 pasando
190 lines
7.1 KiB
Python
190 lines
7.1 KiB
Python
#!/usr/bin/env python3
|
|
"""Load testing script for Nexus POS critical endpoints.
|
|
|
|
Usage:
|
|
python3 load_test.py --url-base http://localhost:5001 --workers 10 --requests 100
|
|
python3 load_test.py --url-base http://localhost:5001 --workers 20 --duration 30
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import statistics
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from urllib.error import HTTPError, URLError
|
|
|
|
|
|
def make_request(url, method='GET', data=None, headers=None):
|
|
"""Execute a single HTTP request and return (status, latency_ms, error)."""
|
|
req = urllib.request.Request(url, method=method, data=data, headers=headers or {})
|
|
start = time.perf_counter()
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30) as resp:
|
|
_ = resp.read()
|
|
latency = (time.perf_counter() - start) * 1000
|
|
return resp.status, latency, None
|
|
except HTTPError as e:
|
|
latency = (time.perf_counter() - start) * 1000
|
|
return e.code, latency, str(e)
|
|
except URLError as e:
|
|
latency = (time.perf_counter() - start) * 1000
|
|
return 0, latency, str(e.reason)
|
|
except Exception as e:
|
|
latency = (time.perf_counter() - start) * 1000
|
|
return 0, latency, str(e)
|
|
|
|
|
|
def run_benchmark(url_base, endpoints, workers, requests_total, duration):
|
|
"""Run load test and return results dict."""
|
|
results = {}
|
|
|
|
for name, cfg in endpoints.items():
|
|
url = url_base + cfg['path']
|
|
method = cfg.get('method', 'GET')
|
|
data = cfg.get('data')
|
|
headers = cfg.get('headers')
|
|
if data and isinstance(data, dict):
|
|
data = json.dumps(data).encode('utf-8')
|
|
headers = headers or {}
|
|
headers.setdefault('Content-Type', 'application/json')
|
|
|
|
latencies = []
|
|
errors = []
|
|
start_time = time.time()
|
|
completed = 0
|
|
|
|
def task():
|
|
return make_request(url, method, data, headers)
|
|
|
|
if duration:
|
|
# Run for a fixed duration, counting requests
|
|
with ThreadPoolExecutor(max_workers=workers) as ex:
|
|
futures = []
|
|
while time.time() - start_time < duration:
|
|
if len(futures) < workers * 2:
|
|
futures.append(ex.submit(task))
|
|
# Collect completed
|
|
done = [f for f in futures if f.done()]
|
|
for f in done:
|
|
futures.remove(f)
|
|
status, latency, err = f.result()
|
|
if err:
|
|
errors.append((status, err))
|
|
else:
|
|
latencies.append(latency)
|
|
completed += 1
|
|
if not done:
|
|
time.sleep(0.01)
|
|
# Drain remaining
|
|
for f in as_completed(futures):
|
|
status, latency, err = f.result()
|
|
if err:
|
|
errors.append((status, err))
|
|
else:
|
|
latencies.append(latency)
|
|
completed += 1
|
|
else:
|
|
# Fixed request count
|
|
with ThreadPoolExecutor(max_workers=workers) as ex:
|
|
futures = [ex.submit(task) for _ in range(requests_total)]
|
|
for f in as_completed(futures):
|
|
status, latency, err = f.result()
|
|
if err:
|
|
errors.append((status, err))
|
|
else:
|
|
latencies.append(latency)
|
|
completed += 1
|
|
|
|
elapsed = time.time() - start_time
|
|
results[name] = {
|
|
'url': url,
|
|
'completed': completed,
|
|
'success': len(latencies),
|
|
'errors': len(errors),
|
|
'throughput': completed / elapsed if elapsed > 0 else 0,
|
|
'latencies': latencies,
|
|
'error_samples': errors[:3],
|
|
}
|
|
|
|
return results
|
|
|
|
|
|
def print_results(results):
|
|
print("\n" + "=" * 90)
|
|
print(f"{'Endpoint':<20} {'OK':>6} {'Err':>6} {'RPS':>8} {'p50':>8} {'p95':>8} {'p99':>8}")
|
|
print("=" * 90)
|
|
for name, r in results.items():
|
|
lat = sorted(r['latencies'])
|
|
p50 = lat[int(len(lat) * 0.5)] if lat else 0
|
|
p95 = lat[int(len(lat) * 0.95)] if lat else 0
|
|
p99 = lat[int(len(lat) * 0.99)] if lat else 0
|
|
print(f"{name:<20} {r['success']:>6} {r['errors']:>6} {r['throughput']:>8.1f} {p50:>7.1f}ms {p95:>7.1f}ms {p99:>7.1f}ms")
|
|
if r['error_samples']:
|
|
for status, err in r['error_samples']:
|
|
print(f" -> error sample: HTTP {status} {err[:60]}")
|
|
print("=" * 90)
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='Nexus POS load test')
|
|
parser.add_argument('--url-base', default='http://localhost:5001',
|
|
help='Base URL of the POS server')
|
|
parser.add_argument('--workers', '-w', type=int, default=10,
|
|
help='Concurrent threads (default: 10)')
|
|
parser.add_argument('--requests', '-n', type=int, default=100,
|
|
help='Total requests per endpoint (default: 100)')
|
|
parser.add_argument('--duration', '-d', type=int, default=0,
|
|
help='Run for N seconds instead of fixed request count')
|
|
parser.add_argument('--json', '-j', action='store_true',
|
|
help='Output raw results as JSON')
|
|
parser.add_argument('--auth-token',
|
|
help='JWT bearer token for authenticated endpoints')
|
|
args = parser.parse_args()
|
|
|
|
endpoints = {
|
|
'catalog_search': {
|
|
'path': '/pos/api/catalog/search?q=filtro%20aire&limit=20',
|
|
'method': 'GET',
|
|
},
|
|
'inventory_items': {
|
|
'path': '/pos/api/inventory/items?page=1&per_page=50',
|
|
'method': 'GET',
|
|
},
|
|
'health': {
|
|
'path': '/pos/api/health',
|
|
'method': 'GET',
|
|
},
|
|
}
|
|
|
|
if args.auth_token:
|
|
for cfg in endpoints.values():
|
|
cfg.setdefault('headers', {})
|
|
cfg['headers']['Authorization'] = f'Bearer {args.auth_token}'
|
|
else:
|
|
print("WARNING: No --auth-token provided. Authenticated endpoints may return 401.")
|
|
print(" Run with a valid JWT if testing protected routes.\n")
|
|
|
|
print(f"Load testing {args.url_base}")
|
|
print(f"Workers: {args.workers} | Mode: {'duration ' + str(args.duration) + 's' if args.duration else 'requests ' + str(args.requests)}\n")
|
|
|
|
results = run_benchmark(args.url_base, endpoints, args.workers, args.requests, args.duration)
|
|
|
|
if args.json:
|
|
# Strip raw latencies array from JSON to keep it small
|
|
out = {k: {a: b for a, b in v.items() if a != 'latencies'} for k, v in results.items()}
|
|
out['_summary'] = {
|
|
'url_base': args.url_base,
|
|
'workers': args.workers,
|
|
'mode': 'duration' if args.duration else 'requests',
|
|
'value': args.duration or args.requests,
|
|
}
|
|
print(json.dumps(out, indent=2))
|
|
else:
|
|
print_results(results)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|