OPCIÓN A: A2 Virtual Scroll + A3 Celery + A4 asyncpg PoC + A5 particionamiento
A2 — Virtual scroll en tablas grandes: - Nuevo helper VirtualScroll en pos/static/js/virtual-scroll.js - inventory.js: tabla de productos con virtual scroll - customers.js: tabla de clientes con virtual scroll - fleet.js: renderMaintenance() y renderHistory() con virtual scroll - Templates envueltos en .vs-container para scroll A3 — Celery worker queue: - pos/celery_app.py + pos/tasks.py (warm cache, bulk import, reports) - Blueprint tasks_bp.py con endpoints /pos/api/tasks/* - Script scripts/start_celery.sh A4 — asyncpg + Quart PoC: - pos/async_catalog.py: endpoint /pos/api/catalog/async-search - scripts/benchmark_async_catalog.py: benchmark Flask vs Quart A5 — Particionar vehicle_parts: - scripts/partition_vehicle_parts.py: migración segura por hash (16 particiones) - Soporta --dry-run, --skip-swap, --skip-drop Tests: 36/36 pasando
This commit is contained in:
137
scripts/benchmark_async_catalog.py
Normal file
137
scripts/benchmark_async_catalog.py
Normal file
@@ -0,0 +1,137 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Benchmark: compare Flask sync vs Quart async catalog search.
|
||||
|
||||
Prerequisites:
|
||||
- Flask POS server running on http://localhost:5001
|
||||
- Quart async server running on http://localhost:5002
|
||||
(start with: cd pos && hypercorn async_catalog:app --bind 0.0.0.0:5002)
|
||||
|
||||
Usage:
|
||||
python3 benchmark_async_catalog.py --workers 20 --requests 200
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import statistics
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
|
||||
def sync_request(url):
|
||||
req = urllib.request.Request(url)
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
_ = resp.read()
|
||||
return (time.perf_counter() - start) * 1000, resp.status, None
|
||||
except Exception as e:
|
||||
return (time.perf_counter() - start) * 1000, 0, str(e)
|
||||
|
||||
|
||||
async def async_request(session, url):
|
||||
import aiohttp
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
async with session.get(url) as resp:
|
||||
_ = await resp.read()
|
||||
return (time.perf_counter() - start) * 1000, resp.status, None
|
||||
except Exception as e:
|
||||
return (time.perf_counter() - start) * 1000, 0, str(e)
|
||||
|
||||
|
||||
def benchmark_sync(url, workers, requests_total):
|
||||
latencies = []
|
||||
errors = []
|
||||
with ThreadPoolExecutor(max_workers=workers) as ex:
|
||||
futures = [ex.submit(sync_request, url) for _ in range(requests_total)]
|
||||
for f in as_completed(futures):
|
||||
latency, status, err = f.result()
|
||||
if err:
|
||||
errors.append((status, err))
|
||||
else:
|
||||
latencies.append(latency)
|
||||
return latencies, errors
|
||||
|
||||
|
||||
async def benchmark_async(url, workers, requests_total):
|
||||
import aiohttp
|
||||
latencies = []
|
||||
errors = []
|
||||
connector = aiohttp.TCPConnector(limit=workers * 2)
|
||||
async with aiohttp.ClientSession(connector=connector) as session:
|
||||
sem = asyncio.Semaphore(workers)
|
||||
|
||||
async def task():
|
||||
async with sem:
|
||||
return await async_request(session, url)
|
||||
|
||||
results = await asyncio.gather(*[task() for _ in range(requests_total)])
|
||||
for latency, status, err in results:
|
||||
if err:
|
||||
errors.append((status, err))
|
||||
else:
|
||||
latencies.append(latency)
|
||||
return latencies, errors
|
||||
|
||||
|
||||
def report(label, latencies, errors, duration):
|
||||
if not latencies:
|
||||
print(f"{label}: NO successful requests")
|
||||
return
|
||||
lat = sorted(latencies)
|
||||
p50 = lat[int(len(lat) * 0.5)]
|
||||
p95 = lat[int(len(lat) * 0.95)]
|
||||
p99 = lat[int(len(lat) * 0.99)]
|
||||
mean = statistics.mean(lat)
|
||||
rps = len(lat) / duration if duration > 0 else 0
|
||||
print(f"{label:20s} | mean={mean:7.1f}ms | p50={p50:7.1f}ms | p95={p95:7.1f}ms | p99={p99:7.1f}ms | OK={len(lat)} | Err={len(errors)} | RPS={rps:.1f}")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--flask-url', default='http://localhost:5001/pos/api/catalog/search?q=filtro%20aire&limit=20')
|
||||
parser.add_argument('--quart-url', default='http://localhost:5002/pos/api/catalog/async-search?q=filtro%20aire&limit=20')
|
||||
parser.add_argument('--workers', '-w', type=int, default=20)
|
||||
parser.add_argument('--requests', '-n', type=int, default=200)
|
||||
args = parser.parse_args()
|
||||
|
||||
print("=" * 100)
|
||||
print(f"Benchmark: {args.requests} requests, {args.workers} concurrent workers")
|
||||
print("=" * 100)
|
||||
|
||||
# Sync (Flask)
|
||||
print("\n[1/2] Warming up Flask...")
|
||||
sync_request(args.flask_url)
|
||||
print("[1/2] Benchmarking Flask (sync)...")
|
||||
start = time.time()
|
||||
lat_sync, err_sync = benchmark_sync(args.flask_url, args.workers, args.requests)
|
||||
dur_sync = time.time() - start
|
||||
report("Flask sync", lat_sync, err_sync, dur_sync)
|
||||
|
||||
# Async (Quart)
|
||||
print("\n[2/2] Warming up Quart...")
|
||||
asyncio.run(benchmark_async(args.quart_url, 5, 1))
|
||||
print("[2/2] Benchmarking Quart (async)...")
|
||||
start = time.time()
|
||||
lat_async, err_async = asyncio.run(benchmark_async(args.quart_url, args.workers, args.requests))
|
||||
dur_async = time.time() - start
|
||||
report("Quart async", lat_async, err_async, dur_async)
|
||||
|
||||
print("\n" + "=" * 100)
|
||||
if lat_sync and lat_async:
|
||||
improvement = (statistics.mean(lat_sync) - statistics.mean(lat_async)) / statistics.mean(lat_sync) * 100
|
||||
print(f"Mean latency improvement: {improvement:+.1f}%")
|
||||
print("=" * 100)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
import aiohttp
|
||||
except ImportError:
|
||||
print("ERROR: aiohttp is required for async benchmark.")
|
||||
print("Install: pip install aiohttp --break-system-packages")
|
||||
sys.exit(1)
|
||||
main()
|
||||
231
scripts/partition_vehicle_parts.py
Normal file
231
scripts/partition_vehicle_parts.py
Normal file
@@ -0,0 +1,231 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Partition vehicle_parts by HASH(part_id) into 16 partitions.
|
||||
|
||||
This is a HIGH-RISK operation on a 254 GB table. Run ONLY during maintenance window.
|
||||
|
||||
Strategy:
|
||||
1. Create partitioned table vehicle_parts_new with 16 hash partitions.
|
||||
2. Migrate data in batches of 500K rows (checkpoint-friendly).
|
||||
3. Atomically swap: rename old -> _old, new -> vehicle_parts.
|
||||
4. Validate counts and indexes.
|
||||
5. Drop old table after validation (or keep for rollback).
|
||||
|
||||
Usage:
|
||||
export MASTER_DB_URL="postgresql://postgres@/nexus_autoparts"
|
||||
python3 partition_vehicle_parts.py --dry-run
|
||||
python3 partition_vehicle_parts.py
|
||||
|
||||
Requires:
|
||||
- PostgreSQL 11+ (partitioning support)
|
||||
- ~300 GB free disk space during migration
|
||||
- Maintenance window (table is locked briefly during swap)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
import psycopg2
|
||||
|
||||
DSN = os.environ.get('MASTER_DB_URL', 'postgresql://postgres@/nexus_autoparts')
|
||||
BATCH_SIZE = 500_000
|
||||
PARTITIONS = 16
|
||||
|
||||
|
||||
def log(msg):
|
||||
print(f"[{datetime.now().isoformat(timespec='seconds')}] {msg}", flush=True)
|
||||
|
||||
|
||||
def get_conn():
|
||||
return psycopg2.connect(DSN)
|
||||
|
||||
|
||||
def table_exists(cur, name):
|
||||
cur.execute("SELECT 1 FROM pg_tables WHERE tablename = %s", (name,))
|
||||
return cur.fetchone() is not None
|
||||
|
||||
|
||||
def get_row_count(cur, name):
|
||||
cur.execute(f"SELECT COUNT(*) FROM {name}")
|
||||
return cur.fetchone()[0]
|
||||
|
||||
|
||||
def get_max_id(cur, name):
|
||||
cur.execute(f"SELECT MAX(id_vehicle_part) FROM {name}")
|
||||
row = cur.fetchone()
|
||||
return row[0] or 0
|
||||
|
||||
|
||||
def create_partitioned_table(cur):
|
||||
log("Creating vehicle_parts_new (partitioned)...")
|
||||
cur.execute("""
|
||||
CREATE TABLE vehicle_parts_new (
|
||||
id_vehicle_part BIGSERIAL,
|
||||
part_id INTEGER NOT NULL,
|
||||
model_year_engine_id INTEGER NOT NULL,
|
||||
created_at TIMESTAMPTZ DEFAULT NOW()
|
||||
) PARTITION BY HASH (part_id);
|
||||
""")
|
||||
|
||||
for i in range(PARTITIONS):
|
||||
cur.execute(f"""
|
||||
CREATE TABLE vehicle_parts_p{i}
|
||||
PARTITION OF vehicle_parts_new
|
||||
FOR VALUES WITH (MODULUS {PARTITIONS}, REMAINDER {i});
|
||||
""")
|
||||
log(f"Created {PARTITIONS} partitions.")
|
||||
|
||||
|
||||
def migrate_data(cur, dry_run=False):
|
||||
max_id = get_max_id(cur, 'vehicle_parts')
|
||||
log(f"Max id_vehicle_part in source: {max_id}")
|
||||
if max_id == 0:
|
||||
log("Source table appears empty. Nothing to migrate.")
|
||||
return
|
||||
|
||||
start = 1
|
||||
total_inserted = 0
|
||||
batch_num = 0
|
||||
t0 = time.time()
|
||||
|
||||
while start <= max_id:
|
||||
end = start + BATCH_SIZE - 1
|
||||
if dry_run:
|
||||
log(f"[DRY-RUN] Would migrate id_vehicle_part {start}..{end}")
|
||||
else:
|
||||
cur.execute("""
|
||||
INSERT INTO vehicle_parts_new (id_vehicle_part, part_id, model_year_engine_id, created_at)
|
||||
SELECT id_vehicle_part, part_id, model_year_engine_id, created_at
|
||||
FROM vehicle_parts
|
||||
WHERE id_vehicle_part BETWEEN %s AND %s
|
||||
ON CONFLICT DO NOTHING;
|
||||
""", (start, end))
|
||||
inserted = cur.rowcount
|
||||
total_inserted += inserted
|
||||
batch_num += 1
|
||||
if batch_num % 10 == 0:
|
||||
elapsed = time.time() - t0
|
||||
rate = total_inserted / elapsed if elapsed > 0 else 0
|
||||
log(f" Batch {batch_num}: {start}..{end} | inserted={total_inserted} | {rate:.0f} rows/s")
|
||||
start = end + 1
|
||||
|
||||
if not dry_run:
|
||||
log(f"Migration complete. Total inserted: {total_inserted}")
|
||||
|
||||
|
||||
def create_indexes(cur):
|
||||
log("Creating indexes on vehicle_parts_new...")
|
||||
cur.execute("""
|
||||
CREATE INDEX idx_vp_new_part ON vehicle_parts_new(part_id);
|
||||
""")
|
||||
cur.execute("""
|
||||
CREATE INDEX idx_vp_new_mye ON vehicle_parts_new(model_year_engine_id);
|
||||
""")
|
||||
cur.execute("""
|
||||
ALTER TABLE vehicle_parts_new ADD CONSTRAINT uq_vp_new_mye_part
|
||||
UNIQUE (model_year_engine_id, part_id);
|
||||
""")
|
||||
log("Indexes created.")
|
||||
|
||||
|
||||
def swap_tables(cur):
|
||||
log("Swapping tables (exclusive lock)...")
|
||||
# Brief exclusive lock on the old table
|
||||
cur.execute("LOCK TABLE vehicle_parts IN ACCESS EXCLUSIVE MODE;")
|
||||
cur.execute("ALTER TABLE vehicle_parts RENAME TO vehicle_parts_old;")
|
||||
cur.execute("ALTER TABLE vehicle_parts_new RENAME TO vehicle_parts;")
|
||||
log("Swap complete.")
|
||||
|
||||
|
||||
def validate(cur):
|
||||
old_count = get_row_count(cur, 'vehicle_parts_old')
|
||||
new_count = get_row_count(cur, 'vehicle_parts')
|
||||
log(f"Validation: old={old_count} | new={new_count}")
|
||||
if old_count != new_count:
|
||||
log(f"WARNING: Row count mismatch! diff={old_count - new_count}")
|
||||
return False
|
||||
log("Validation PASSED.")
|
||||
return True
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--dry-run', action='store_true',
|
||||
help='Show what would be done without executing')
|
||||
parser.add_argument('--skip-swap', action='store_true',
|
||||
help='Create new table and migrate, but do not swap')
|
||||
parser.add_argument('--skip-drop', action='store_true',
|
||||
help='Keep old table after swap (for rollback)')
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.dry_run:
|
||||
log("=== DRY RUN ===")
|
||||
|
||||
log("Connecting to database...")
|
||||
conn = get_conn()
|
||||
conn.autocommit = False
|
||||
cur = conn.cursor()
|
||||
|
||||
try:
|
||||
# Check prerequisites
|
||||
if not table_exists(cur, 'vehicle_parts'):
|
||||
log("ERROR: vehicle_parts does not exist.")
|
||||
sys.exit(1)
|
||||
|
||||
if table_exists(cur, 'vehicle_parts_new'):
|
||||
log("WARNING: vehicle_parts_new already exists. Dropping...")
|
||||
if not args.dry_run:
|
||||
cur.execute("DROP TABLE IF EXISTS vehicle_parts_new CASCADE;")
|
||||
conn.commit()
|
||||
|
||||
# Step 1: Create partitioned table
|
||||
if not args.dry_run:
|
||||
create_partitioned_table(cur)
|
||||
conn.commit()
|
||||
|
||||
# Step 2: Migrate data
|
||||
migrate_data(cur, dry_run=args.dry_run)
|
||||
if not args.dry_run:
|
||||
conn.commit()
|
||||
|
||||
# Step 3: Create indexes
|
||||
if not args.dry_run:
|
||||
create_indexes(cur)
|
||||
conn.commit()
|
||||
|
||||
# Step 4: Swap
|
||||
if not args.dry_run and not args.skip_swap:
|
||||
swap_tables(cur)
|
||||
conn.commit()
|
||||
|
||||
# Step 5: Validate
|
||||
if validate(cur):
|
||||
if not args.skip_drop:
|
||||
log("Dropping old table...")
|
||||
cur.execute("DROP TABLE vehicle_parts_old CASCADE;")
|
||||
conn.commit()
|
||||
log("Old table dropped.")
|
||||
else:
|
||||
log("Old table kept as vehicle_parts_old.")
|
||||
else:
|
||||
log("VALIDATION FAILED. Rolling back swap...")
|
||||
cur.execute("ALTER TABLE vehicle_parts RENAME TO vehicle_parts_new;")
|
||||
cur.execute("ALTER TABLE vehicle_parts_old RENAME TO vehicle_parts;")
|
||||
conn.commit()
|
||||
log("Rollback complete.")
|
||||
sys.exit(1)
|
||||
|
||||
log("Done.")
|
||||
except Exception as exc:
|
||||
conn.rollback()
|
||||
log(f"ERROR: {exc}")
|
||||
raise
|
||||
finally:
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
8
scripts/start_celery.sh
Executable file
8
scripts/start_celery.sh
Executable file
@@ -0,0 +1,8 @@
|
||||
#!/bin/bash
|
||||
# Start Celery worker for Nexus POS background tasks
|
||||
|
||||
cd /home/Autopartes/pos
|
||||
export MASTER_DB_URL="${MASTER_DB_URL:-postgresql://postgres@/nexus_autoparts}"
|
||||
export REDIS_URL="${REDIS_URL:-redis://localhost:6379/0}"
|
||||
|
||||
exec celery -A celery_app worker --loglevel=info --concurrency=4 -n nexus-worker@%h
|
||||
Reference in New Issue
Block a user