#!/usr/bin/env python3 """Load testing script for Nexus POS critical endpoints. Usage: python3 load_test.py --url-base http://localhost:5001 --workers 10 --requests 100 python3 load_test.py --url-base http://localhost:5001 --workers 20 --duration 30 """ import argparse import json import statistics import sys import time import urllib.request from concurrent.futures import ThreadPoolExecutor, as_completed from urllib.error import HTTPError, URLError def make_request(url, method='GET', data=None, headers=None): """Execute a single HTTP request and return (status, latency_ms, error).""" req = urllib.request.Request(url, method=method, data=data, headers=headers or {}) start = time.perf_counter() try: with urllib.request.urlopen(req, timeout=30) as resp: _ = resp.read() latency = (time.perf_counter() - start) * 1000 return resp.status, latency, None except HTTPError as e: latency = (time.perf_counter() - start) * 1000 return e.code, latency, str(e) except URLError as e: latency = (time.perf_counter() - start) * 1000 return 0, latency, str(e.reason) except Exception as e: latency = (time.perf_counter() - start) * 1000 return 0, latency, str(e) def run_benchmark(url_base, endpoints, workers, requests_total, duration): """Run load test and return results dict.""" results = {} for name, cfg in endpoints.items(): url = url_base + cfg['path'] method = cfg.get('method', 'GET') data = cfg.get('data') headers = cfg.get('headers') if data and isinstance(data, dict): data = json.dumps(data).encode('utf-8') headers = headers or {} headers.setdefault('Content-Type', 'application/json') latencies = [] errors = [] start_time = time.time() completed = 0 def task(): return make_request(url, method, data, headers) if duration: # Run for a fixed duration, counting requests with ThreadPoolExecutor(max_workers=workers) as ex: futures = [] while time.time() - start_time < duration: if len(futures) < workers * 2: futures.append(ex.submit(task)) # Collect completed done = [f for f in futures if f.done()] for f in done: futures.remove(f) status, latency, err = f.result() if err: errors.append((status, err)) else: latencies.append(latency) completed += 1 if not done: time.sleep(0.01) # Drain remaining for f in as_completed(futures): status, latency, err = f.result() if err: errors.append((status, err)) else: latencies.append(latency) completed += 1 else: # Fixed request count with ThreadPoolExecutor(max_workers=workers) as ex: futures = [ex.submit(task) for _ in range(requests_total)] for f in as_completed(futures): status, latency, err = f.result() if err: errors.append((status, err)) else: latencies.append(latency) completed += 1 elapsed = time.time() - start_time results[name] = { 'url': url, 'completed': completed, 'success': len(latencies), 'errors': len(errors), 'throughput': completed / elapsed if elapsed > 0 else 0, 'latencies': latencies, 'error_samples': errors[:3], } return results def print_results(results): print("\n" + "=" * 90) print(f"{'Endpoint':<20} {'OK':>6} {'Err':>6} {'RPS':>8} {'p50':>8} {'p95':>8} {'p99':>8}") print("=" * 90) for name, r in results.items(): lat = sorted(r['latencies']) p50 = lat[int(len(lat) * 0.5)] if lat else 0 p95 = lat[int(len(lat) * 0.95)] if lat else 0 p99 = lat[int(len(lat) * 0.99)] if lat else 0 print(f"{name:<20} {r['success']:>6} {r['errors']:>6} {r['throughput']:>8.1f} {p50:>7.1f}ms {p95:>7.1f}ms {p99:>7.1f}ms") if r['error_samples']: for status, err in r['error_samples']: print(f" -> error sample: HTTP {status} {err[:60]}") print("=" * 90) def main(): parser = argparse.ArgumentParser(description='Nexus POS load test') parser.add_argument('--url-base', default='http://localhost:5001', help='Base URL of the POS server') parser.add_argument('--workers', '-w', type=int, default=10, help='Concurrent threads (default: 10)') parser.add_argument('--requests', '-n', type=int, default=100, help='Total requests per endpoint (default: 100)') parser.add_argument('--duration', '-d', type=int, default=0, help='Run for N seconds instead of fixed request count') parser.add_argument('--json', '-j', action='store_true', help='Output raw results as JSON') parser.add_argument('--auth-token', help='JWT bearer token for authenticated endpoints') args = parser.parse_args() endpoints = { 'catalog_search': { 'path': '/pos/api/catalog/search?q=filtro%20aire&limit=20', 'method': 'GET', }, 'inventory_items': { 'path': '/pos/api/inventory/items?page=1&per_page=50', 'method': 'GET', }, 'health': { 'path': '/pos/api/health', 'method': 'GET', }, } if args.auth_token: for cfg in endpoints.values(): cfg.setdefault('headers', {}) cfg['headers']['Authorization'] = f'Bearer {args.auth_token}' else: print("WARNING: No --auth-token provided. Authenticated endpoints may return 401.") print(" Run with a valid JWT if testing protected routes.\n") print(f"Load testing {args.url_base}") print(f"Workers: {args.workers} | Mode: {'duration ' + str(args.duration) + 's' if args.duration else 'requests ' + str(args.requests)}\n") results = run_benchmark(args.url_base, endpoints, args.workers, args.requests, args.duration) if args.json: # Strip raw latencies array from JSON to keep it small out = {k: {a: b for a, b in v.items() if a != 'latencies'} for k, v in results.items()} out['_summary'] = { 'url_base': args.url_base, 'workers': args.workers, 'mode': 'duration' if args.duration else 'requests', 'value': args.duration or args.requests, } print(json.dumps(out, indent=2)) else: print_results(results) if __name__ == '__main__': main()