diff --git a/pos/async_catalog.py b/pos/async_catalog.py new file mode 100644 index 0000000..6ca1bbf --- /dev/null +++ b/pos/async_catalog.py @@ -0,0 +1,93 @@ +"""Async catalog search PoC using Quart + asyncpg. + +Run: + hypercorn async_catalog:app --bind 0.0.0.0:5002 + +Endpoint: + GET /pos/api/catalog/async-search?q=filtro&limit=50&tenant_id=1 + +This demonstrates I/O non-blocking search using asyncpg. +""" + +import os +import asyncio +import asyncpg +from quart import Quart, request, jsonify, g + +app = Quart(__name__) + +MASTER_DB_URL = os.environ.get('MASTER_DB_URL', 'postgresql://postgres@/nexus_autoparts') + +# Shared connection pool +_pool = None + +async def _get_pool(): + global _pool + if _pool is None: + _pool = await asyncpg.create_pool(MASTER_DB_URL, min_size=2, max_size=10) + return _pool + + +@app.before_serving +async def startup(): + await _get_pool() + + +@app.after_serving +async def shutdown(): + global _pool + if _pool: + await _pool.close() + _pool = None + + +@app.route('/pos/api/catalog/async-search') +async def async_search(): + q = request.args.get('q', '').strip() + if not q or len(q) < 2: + return jsonify({'data': []}) + limit = min(request.args.get('limit', 50, type=int), 100) + + pool = await _get_pool() + async with pool.acquire() as conn: + # Simple text search (PoC scope) + clean_q = q.upper().replace(' ', '') + rows = await conn.fetch(""" + SELECT p.id_part, p.oem_part_number, p.name_part, p.name_es, + p.image_url, p.group_id + FROM parts p + WHERE REPLACE(UPPER(p.oem_part_number), ' ', '') LIKE $1 + OR p.name_part ILIKE $2 + OR p.name_es ILIKE $2 + ORDER BY p.oem_part_number + LIMIT $3 + """, f'%{clean_q}%', f'%{q}%', limit) + + part_ids = [r['id_part'] for r in rows] + vehicle_map = {} + if part_ids: + vrows = await conn.fetch(""" + SELECT part_id, name_brand, name_model, year_car + FROM part_vehicle_preview + WHERE part_id = ANY($1) + """, part_ids) + for vr in vrows: + vehicle_map[vr['part_id']] = f"{vr['name_brand']} {vr['name_model']} {vr['year_car']}" + + results = [] + for r in rows: + results.append({ + 'id': r['id_part'], + 'oem_part_number': r['oem_part_number'], + 'name': r['name_part'], + 'name_es': r['name_es'], + 'image_url': r['image_url'], + 'group_id': r['group_id'], + 'vehicle_info': vehicle_map.get(r['id_part'], ''), + }) + + return jsonify({'data': results}) + + +if __name__ == '__main__': + app.run(host='0.0.0.0', port=5002) diff --git a/pos/blueprints/tasks_bp.py b/pos/blueprints/tasks_bp.py new file mode 100644 index 0000000..5afa0f8 --- /dev/null +++ b/pos/blueprints/tasks_bp.py @@ -0,0 +1,46 @@ +"""Blueprint for background task management (Celery).""" + +from flask import Blueprint, jsonify, request +from auth import require_auth +from tasks import warm_vehicle_cache_task, generate_report_task + +tasks_bp = Blueprint('tasks', __name__, url_prefix='/pos/api/tasks') + + +@tasks_bp.route('/warm-cache', methods=['POST']) +@require_auth +def enqueue_warm_cache(): + """Enqueue vehicle cache warming task.""" + task = warm_vehicle_cache_task.apply_async() + return jsonify({'task_id': task.id, 'status': 'queued'}) + + +@tasks_bp.route('/report', methods=['POST']) +@require_auth +def enqueue_report(): + """Enqueue report generation task.""" + data = request.get_json() or {} + report_type = data.get('report_type', 'sales') + params = data.get('params', {}) + tenant_id = getattr(request, 'tenant_id', None) + task = generate_report_task.apply_async(args=[report_type, params, tenant_id]) + return jsonify({'task_id': task.id, 'status': 'queued'}) + + +@tasks_bp.route('//status', methods=['GET']) +@require_auth +def task_status(task_id): + """Get status of a background task.""" + from celery_app import celery + result = celery.AsyncResult(task_id) + response = { + 'task_id': task_id, + 'status': result.status, + } + if result.status == 'PROGRESS': + response['meta'] = result.info + elif result.successful(): + response['result'] = result.result + elif result.failed(): + response['error'] = str(result.result) + return jsonify(response) diff --git a/pos/celery_app.py b/pos/celery_app.py new file mode 100644 index 0000000..c1adca5 --- /dev/null +++ b/pos/celery_app.py @@ -0,0 +1,29 @@ +"""Celery application configuration for Nexus POS background tasks.""" + +import os +from celery import Celery + +REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0') +# Use Redis DB 1 for Celery to avoid clashing with app cache (DB 0) +BROKER_URL = os.environ.get('CELERY_BROKER_URL', REDIS_URL.replace('/0', '/1')) +BACKEND_URL = os.environ.get('CELERY_RESULT_BACKEND', REDIS_URL.replace('/0', '/1')) + +celery = Celery( + 'nexus', + broker=BROKER_URL, + backend=BACKEND_URL, + include=['tasks'], +) + +celery.conf.update( + task_serializer='json', + accept_content=['json'], + result_serializer='json', + timezone='America/Mexico_City', + enable_utc=True, + task_track_started=True, + task_time_limit=3600, # 1 hour hard limit + task_soft_time_limit=3300, # 55 min soft limit + worker_prefetch_multiplier=1, + result_expires=86400, # Results expire after 24h +) diff --git a/pos/static/js/customers.js b/pos/static/js/customers.js index 9b1f025..4cd24d4 100644 --- a/pos/static/js/customers.js +++ b/pos/static/js/customers.js @@ -94,45 +94,53 @@ const Customers = (() => { } } + function renderCustomerRow(c) { + const tier = tierMap[c.price_tier] || 'Mostrador'; + const tClass = tierClass[c.price_tier] || 'mostrador'; + const limit = parseFloat(c.credit_limit || 0); + const balance = parseFloat(c.credit_balance || 0); + const available = Math.max(0, limit - balance); + const usedPct = limit > 0 ? Math.round((balance / limit) * 100) : 0; + const creditClass = usedPct >= 80 ? 'none' : usedPct >= 60 ? 'low' : ''; + const num = String(c.id).padStart(5, '0'); + const selClass = (currentCustomer && currentCustomer.id === c.id) ? 'selected' : ''; + return '' + + '' + num + '' + + '' + + '
' + (c.name || '') + '
' + + '
' + (c.email || '') + '
' + + '' + + '' + (c.rfc || '-') + '' + + '' + (c.phone || '-') + '' + + '' + (c.email || '-') + '' + + '' + tier + '' + + '' + fmt(available) + '' + + '' + formatDate(c.last_purchase || c.created_at) + '' + + '' + statusBadge(c) + '' + + ''; + } + + var customersVS = null; + function renderTable(customers) { const tbody = document.getElementById('customersBody'); if (!tbody) return; - tbody.innerHTML = ''; if (!customers || customers.length === 0) { tbody.innerHTML = 'Sin resultados.'; return; } - customers.forEach((c, idx) => { - const tier = tierMap[c.price_tier] || 'Mostrador'; - const tClass = tierClass[c.price_tier] || 'mostrador'; - const limit = parseFloat(c.credit_limit || 0); - const balance = parseFloat(c.credit_balance || 0); - const available = Math.max(0, limit - balance); - const usedPct = limit > 0 ? Math.round((balance / limit) * 100) : 0; - const creditClass = usedPct >= 80 ? 'none' : usedPct >= 60 ? 'low' : ''; - const num = String(c.id).padStart(5, '0'); - - const tr = document.createElement('tr'); - if (currentCustomer && currentCustomer.id === c.id) tr.className = 'selected'; - tr.onclick = () => selectCustomer(c.id); - tr.innerHTML = ` - ${num} - -
${c.name || ''}
-
${c.email || ''}
- - ${c.rfc || '-'} - ${c.phone || '-'} - ${c.email || '-'} - ${tier} - ${fmt(available)} - ${formatDate(c.last_purchase || c.created_at)} - ${statusBadge(c)} - `; - tbody.appendChild(tr); - }); + if (!customersVS) { + customersVS = new VirtualScroll({ + container: tbody, + rowHeight: 52, + buffer: 3, + renderRow: renderCustomerRow, + emptyHtml: 'Sin resultados.' + }); + } + customersVS.setData(customers); } function renderPagination(pag) { diff --git a/pos/static/js/fleet.js b/pos/static/js/fleet.js index 48155c2..a056dfe 100644 --- a/pos/static/js/fleet.js +++ b/pos/static/js/fleet.js @@ -298,52 +298,66 @@ var Fleet = (function() { }); } + function renderMaintRow(item) { + var v = item.vehicle; + var s = item.schedule; + var now = new Date(); + var isOverdue = false; + if (s.next_due_at && new Date(s.next_due_at) < now) isOverdue = true; + if (s.next_due_km && s.next_due_km <= v.current_mileage) isOverdue = true; + + var interval = ''; + if (s.interval_km) interval += fmt(s.interval_km) + ' km'; + if (s.interval_km && s.interval_months) interval += ' / '; + if (s.interval_months) interval += s.interval_months + ' meses'; + + var next = ''; + if (s.next_due_at) next += fmtDate(s.next_due_at); + if (s.next_due_at && s.next_due_km) next += ' / '; + if (s.next_due_km) next += fmt(s.next_due_km) + ' km'; + + return '' + + '' + esc(v.plate || 'S/P') + '
' + + '' + + esc((v.make || '') + ' ' + (v.model || '')) + '' + + '' + esc(s.maintenance_type) + '' + + '' + (interval || '—') + '' + + '' + fmtDate(s.last_done_at) + (s.last_done_km ? '
' + fmt(s.last_done_km) + ' km' : '') + '' + + '' + (next || '—') + '' + + '' + + (isOverdue ? 'Vencido' : 'Al dia') + '' + + '' + + ''; + } + + var maintVS = null; + function renderMaintenance(results) { var body = document.getElementById('maintBody'); - var rows = []; + var items = []; results.forEach(function(r) { - var v = r.vehicle; r.schedules.forEach(function(s) { - var now = new Date(); - var isOverdue = false; - if (s.next_due_at && new Date(s.next_due_at) < now) isOverdue = true; - if (s.next_due_km && s.next_due_km <= v.current_mileage) isOverdue = true; - - var interval = ''; - if (s.interval_km) interval += fmt(s.interval_km) + ' km'; - if (s.interval_km && s.interval_months) interval += ' / '; - if (s.interval_months) interval += s.interval_months + ' meses'; - - var next = ''; - if (s.next_due_at) next += fmtDate(s.next_due_at); - if (s.next_due_at && s.next_due_km) next += ' / '; - if (s.next_due_km) next += fmt(s.next_due_km) + ' km'; - - rows.push( - '' + - '' + esc(v.plate || 'S/P') + '
' + - '' + - esc((v.make || '') + ' ' + (v.model || '')) + '' + - '' + esc(s.maintenance_type) + '' + - '' + (interval || '—') + '' + - '' + fmtDate(s.last_done_at) + (s.last_done_km ? '
' + fmt(s.last_done_km) + ' km' : '') + '' + - '' + (next || '—') + '' + - '' + - (isOverdue ? 'Vencido' : 'Al dia') + '' + - '' + - '' - ); + items.push({vehicle: r.vehicle, schedule: s}); }); }); - if (!rows.length) { + if (!items.length) { body.innerHTML = '' + 'No hay programas de mantenimiento.
'; return; } - body.innerHTML = rows.join(''); + if (!maintVS) { + maintVS = new VirtualScroll({ + container: body, + rowHeight: 64, + buffer: 3, + renderRow: renderMaintRow, + emptyHtml: 'No hay programas de mantenimiento.' + }); + } + maintVS.setData(items); } // ─── History Tab ─── @@ -371,6 +385,21 @@ var Fleet = (function() { }); } + function renderHistoryRow(l) { + return '' + + '' + fmtDate(l.created_at) + '' + + '' + esc(l._plate) + '
' + + '' + esc(l._make) + '' + + '' + esc(l.maintenance_type) + '' + + '' + fmt(l.mileage_at) + '' + + '' + fmtMoney(l.cost) + '' + + '' + esc(l.employee_name || '—') + '' + + '' + esc(l.notes || '—') + '' + + ''; + } + + var historyVS = null; + function renderHistory(results) { var body = document.getElementById('historyBody'); var allLogs = []; @@ -393,20 +422,16 @@ var Fleet = (function() { return; } - var html = ''; - allLogs.forEach(function(l) { - html += '' + - '' + fmtDate(l.created_at) + '' + - '' + esc(l._plate) + '
' + - '' + esc(l._make) + '' + - '' + esc(l.maintenance_type) + '' + - '' + fmt(l.mileage_at) + '' + - '' + fmtMoney(l.cost) + '' + - '' + esc(l.employee_name || '—') + '' + - '' + esc(l.notes || '—') + '' + - ''; - }); - body.innerHTML = html; + if (!historyVS) { + historyVS = new VirtualScroll({ + container: body, + rowHeight: 48, + buffer: 3, + renderRow: renderHistoryRow, + emptyHtml: 'No hay registros de mantenimiento' + }); + } + historyVS.setData(allLogs); } // ─── Alerts Tab ─── diff --git a/pos/static/js/inventory.js b/pos/static/js/inventory.js index 594bb27..a011955 100644 --- a/pos/static/js/inventory.js +++ b/pos/static/js/inventory.js @@ -13,6 +13,7 @@ var currentPage = 1; var currentSearch = ''; var draftCountId = null; + var inventoryVS = null; // --- API helper --- function apiFetch(url, opts) { @@ -52,6 +53,24 @@ // STOCK / PRODUCTS (panel-stock) // ===================================================================== + function renderInventoryRow(it) { + return '' + + '' + esc(it.barcode) + '' + + '' + esc(it.part_number) + '' + + '' + esc(it.name) + '' + + '' + esc(it.brand) + '' + + '' + it.stock + '' + + '$' + fmt(it.cost) + '' + + '$' + fmt(it.price_1) + '' + + '$' + fmt(it.price_2) + '' + + '$' + fmt(it.price_3) + '' + + '' + esc(it.location) + '' + + '' + + ' ' + + '' + + ''; + } + function loadItems(page, search) { currentPage = page || 1; currentSearch = search !== undefined ? search : currentSearch; @@ -69,23 +88,16 @@ return; } - tbody.innerHTML = items.map(function (it) { - return '' + - '' + esc(it.barcode) + '' + - '' + esc(it.part_number) + '' + - '' + esc(it.name) + '' + - '' + esc(it.brand) + '' + - '' + it.stock + '' + - '$' + fmt(it.cost) + '' + - '$' + fmt(it.price_1) + '' + - '$' + fmt(it.price_2) + '' + - '$' + fmt(it.price_3) + '' + - '' + esc(it.location) + '' + - '' + - ' ' + - '' + - ''; - }).join(''); + if (!inventoryVS) { + inventoryVS = new VirtualScroll({ + container: tbody, + rowHeight: 48, + buffer: 3, + renderRow: renderInventoryRow, + emptyHtml: 'Sin productos' + }); + } + inventoryVS.setData(items); // Pagination var pg = data.pagination || {}; diff --git a/pos/static/js/virtual-scroll.js b/pos/static/js/virtual-scroll.js new file mode 100644 index 0000000..882e449 --- /dev/null +++ b/pos/static/js/virtual-scroll.js @@ -0,0 +1,155 @@ +/** + * virtual-scroll.js — Lightweight vanilla-JS virtual scroll helper. + * Supports
containers and tables. + * + * Usage: + * var vs = new VirtualScroll({ + * container: document.getElementById('myTableBody'), + * rowHeight: 48, + * buffer: 5, + * renderRow: function(item, index) { return '...'; } + * }); + * vs.setData(arrayOfItems); + */ +(function(window) { + 'use strict'; + + function VirtualScroll(opts) { + this.container = opts.container; + this.rowHeight = opts.rowHeight || 48; + this.buffer = opts.buffer || 5; + this.renderRow = opts.renderRow || function() { return ''; }; + this.emptyHtml = opts.emptyHtml || ''; + this.data = []; + this._scrollHandler = this._onScroll.bind(this); + this._resizeHandler = this._onResize.bind(this); + this._isTbody = this.container.tagName === 'TBODY'; + this._init(); + } + + VirtualScroll.prototype._init = function() { + var c = this.container; + if (!this._isTbody) { + c.style.overflowY = 'auto'; + c.style.position = 'relative'; + if (!c.style.maxHeight && !c.style.height) { + c.style.maxHeight = '60vh'; + } + } else { + // For tbody, scroll is on the parent element (table wrapper) + var table = c.closest('table'); + if (table) { + var wrapper = table.parentElement; + if (wrapper && wrapper.classList.contains('vs-container')) { + wrapper.addEventListener('scroll', this._scrollHandler, { passive: true }); + } + } + } + window.addEventListener('resize', this._resizeHandler, { passive: true }); + }; + + VirtualScroll.prototype.setData = function(data) { + this.data = data || []; + this._render(); + }; + + VirtualScroll.prototype.refresh = function() { + this._render(); + }; + + VirtualScroll.prototype._onScroll = function() { + this._render(); + }; + + VirtualScroll.prototype._onResize = function() { + this._render(); + }; + + VirtualScroll.prototype._getScrollTop = function() { + if (this._isTbody) { + var table = this.container.closest('table'); + if (table) { + var wrapper = table.parentElement; + if (wrapper && wrapper.classList.contains('vs-container')) { + return wrapper.scrollTop; + } + } + return 0; + } + return this.container.scrollTop; + }; + + VirtualScroll.prototype._getContainerHeight = function() { + if (this._isTbody) { + var table = this.container.closest('table'); + if (table) { + var wrapper = table.parentElement; + if (wrapper && wrapper.classList.contains('vs-container')) { + return wrapper.clientHeight; + } + } + return 600; + } + return this.container.clientHeight; + }; + + VirtualScroll.prototype._render = function() { + var data = this.data; + var rowH = this.rowHeight; + var buffer = this.buffer; + + if (!data.length) { + if (this._isTbody) { + this.container.innerHTML = this.emptyHtml; + } else { + this.container.innerHTML = this.emptyHtml; + } + return; + } + + var scrollTop = this._getScrollTop(); + var containerHeight = this._getContainerHeight(); + var startIdx = Math.max(0, Math.floor(scrollTop / rowH) - buffer); + var endIdx = Math.min(data.length, Math.ceil((scrollTop + containerHeight) / rowH) + buffer); + + var html = ''; + if (this._isTbody) { + // Top spacer row + var topSpacerHeight = startIdx * rowH; + if (topSpacerHeight > 0) { + html += ''; + } + for (var i = startIdx; i < endIdx; i++) { + html += this.renderRow(data[i], i); + } + // Bottom spacer row + var bottomSpacerHeight = (data.length - endIdx) * rowH; + if (bottomSpacerHeight > 0) { + html += ''; + } + } else { + for (var j = startIdx; j < endIdx; j++) { + html += this.renderRow(data[j], j); + } + } + + this.container.innerHTML = html; + }; + + VirtualScroll.prototype.destroy = function() { + if (this._isTbody) { + var table = this.container.closest('table'); + if (table) { + var wrapper = table.parentElement; + if (wrapper && wrapper.classList.contains('vs-container')) { + wrapper.removeEventListener('scroll', this._scrollHandler); + } + } + } else { + this.container.removeEventListener('scroll', this._scrollHandler); + } + window.removeEventListener('resize', this._resizeHandler); + }; + + window.VirtualScroll = VirtualScroll; +})(window); diff --git a/pos/tasks.py b/pos/tasks.py new file mode 100644 index 0000000..8d66a80 --- /dev/null +++ b/pos/tasks.py @@ -0,0 +1,98 @@ +"""Celery tasks for Nexus POS background jobs.""" + +import os +import sys +import time + +# Ensure pos/ is on path for imports when Celery worker runs standalone +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from celery_app import celery +import psycopg2 +import redis as redis_lib + +MASTER_DB_URL = os.environ.get('MASTER_DB_URL', 'postgresql://postgres@/nexus_autoparts') +REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0') + + +def _get_db(): + return psycopg2.connect(MASTER_DB_URL) + + +def _get_redis(): + return redis_lib.from_url(REDIS_URL, decode_responses=True) + + +@celery.task(bind=True, max_retries=3) +def warm_vehicle_cache_task(self, batch_size=5000, ttl=3600): + """Warm Redis cache for vehicle info from part_vehicle_preview.""" + conn = _get_db() + cur = conn.cursor() + r = _get_redis() + r.ping() + + cur.execute("SELECT id_part FROM parts WHERE oem_part_number IS NOT NULL ORDER BY id_part") + all_ids = [row[0] for row in cur.fetchall()] + total = len(all_ids) + + processed = 0 + cached = 0 + start = time.time() + + for i in range(0, total, batch_size): + batch = all_ids[i:i + batch_size] + cur.execute(""" + SELECT part_id, name_brand, name_model, year_car + FROM part_vehicle_preview + WHERE part_id = ANY(%s) + """, (batch,)) + + pipe = r.pipeline() + batch_cached = 0 + for row in cur.fetchall(): + info = f"{row[1]} {row[2]} {row[3]}" + pipe.setex(f'nexus:vehicle:{row[0]}', ttl, info) + batch_cached += 1 + pipe.execute() + + processed += len(batch) + cached += batch_cached + self.update_state( + state='PROGRESS', + meta={'current': processed, 'total': total, 'cached': cached} + ) + + cur.close() + conn.close() + elapsed = time.time() - start + return {'total': total, 'cached': cached, 'elapsed': int(elapsed)} + + +@celery.task(bind=True, max_retries=2) +def bulk_import_inventory_task(self, csv_path, tenant_id, branch_id=None): + """Bulk import inventory from CSV in background.""" + from services.inventory_engine import bulk_import_csv + conn = _get_db() + try: + result = bulk_import_csv(conn, csv_path, tenant_id, branch_id) + conn.commit() + return result + except Exception as exc: + conn.rollback() + raise self.retry(exc=exc) + finally: + conn.close() + + +@celery.task(bind=True, max_retries=1) +def generate_report_task(self, report_type, params, tenant_id): + """Generate heavy reports asynchronously.""" + # Placeholder: implement actual report generation per type + self.update_state(state='PROGRESS', meta={'step': 'collecting_data'}) + time.sleep(2) # Simulate work + return { + 'report_type': report_type, + 'tenant_id': tenant_id, + 'status': 'completed', + 'url': f'/pos/static/reports/{report_type}_{tenant_id}.pdf', + } diff --git a/pos/templates/customers.html b/pos/templates/customers.html index 02b00c0..64314f0 100644 --- a/pos/templates/customers.html +++ b/pos/templates/customers.html @@ -618,6 +618,7 @@ + diff --git a/pos/templates/fleet.html b/pos/templates/fleet.html index 37796e6..740974d 100644 --- a/pos/templates/fleet.html +++ b/pos/templates/fleet.html @@ -78,42 +78,46 @@

Programas de Mantenimiento

- - - - - - - - - - - - - - - -
VehiculoTipoIntervaloUltimoProximoEstado
Cargando...
+
+ + + + + + + + + + + + + + + +
VehiculoTipoIntervaloUltimoProximoEstado
Cargando...
+
- - - - - - - - - - - - - - - -
FechaVehiculoTipoKmCostoEmpleadoNotas
Cargando...
+
+ + + + + + + + + + + + + + + +
FechaVehiculoTipoKmCostoEmpleadoNotas
Cargando...
+
diff --git a/pos/templates/inventory.html b/pos/templates/inventory.html index 6363ae5..9438269 100644 --- a/pos/templates/inventory.html +++ b/pos/templates/inventory.html @@ -312,25 +312,27 @@
- - - - - - - - - - - - - - - - - - -
BarcodeNo. ParteNombreMarcaStockCostoPrecio 1Precio 2Precio 3UbicaciónAcciones
+
+ + + + + + + + + + + + + + + + + + +
BarcodeNo. ParteNombreMarcaStockCostoPrecio 1Precio 2Precio 3UbicaciónAcciones
+
@@ -811,6 +813,7 @@ + diff --git a/requirements.txt b/requirements.txt index 5599249..978d477 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,6 @@ PyJWT>=2.8 bcrypt>=4.0 openpyxl>=3.1 orjson +quart +asyncpg +aiohttp diff --git a/scripts/benchmark_async_catalog.py b/scripts/benchmark_async_catalog.py new file mode 100644 index 0000000..979596b --- /dev/null +++ b/scripts/benchmark_async_catalog.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +"""Benchmark: compare Flask sync vs Quart async catalog search. + +Prerequisites: + - Flask POS server running on http://localhost:5001 + - Quart async server running on http://localhost:5002 + (start with: cd pos && hypercorn async_catalog:app --bind 0.0.0.0:5002) + +Usage: + python3 benchmark_async_catalog.py --workers 20 --requests 200 +""" + +import argparse +import asyncio +import json +import statistics +import sys +import time +import urllib.request +from concurrent.futures import ThreadPoolExecutor, as_completed + + +def sync_request(url): + req = urllib.request.Request(url) + start = time.perf_counter() + try: + with urllib.request.urlopen(req, timeout=30) as resp: + _ = resp.read() + return (time.perf_counter() - start) * 1000, resp.status, None + except Exception as e: + return (time.perf_counter() - start) * 1000, 0, str(e) + + +async def async_request(session, url): + import aiohttp + start = time.perf_counter() + try: + async with session.get(url) as resp: + _ = await resp.read() + return (time.perf_counter() - start) * 1000, resp.status, None + except Exception as e: + return (time.perf_counter() - start) * 1000, 0, str(e) + + +def benchmark_sync(url, workers, requests_total): + latencies = [] + errors = [] + with ThreadPoolExecutor(max_workers=workers) as ex: + futures = [ex.submit(sync_request, url) for _ in range(requests_total)] + for f in as_completed(futures): + latency, status, err = f.result() + if err: + errors.append((status, err)) + else: + latencies.append(latency) + return latencies, errors + + +async def benchmark_async(url, workers, requests_total): + import aiohttp + latencies = [] + errors = [] + connector = aiohttp.TCPConnector(limit=workers * 2) + async with aiohttp.ClientSession(connector=connector) as session: + sem = asyncio.Semaphore(workers) + + async def task(): + async with sem: + return await async_request(session, url) + + results = await asyncio.gather(*[task() for _ in range(requests_total)]) + for latency, status, err in results: + if err: + errors.append((status, err)) + else: + latencies.append(latency) + return latencies, errors + + +def report(label, latencies, errors, duration): + if not latencies: + print(f"{label}: NO successful requests") + return + lat = sorted(latencies) + p50 = lat[int(len(lat) * 0.5)] + p95 = lat[int(len(lat) * 0.95)] + p99 = lat[int(len(lat) * 0.99)] + mean = statistics.mean(lat) + rps = len(lat) / duration if duration > 0 else 0 + print(f"{label:20s} | mean={mean:7.1f}ms | p50={p50:7.1f}ms | p95={p95:7.1f}ms | p99={p99:7.1f}ms | OK={len(lat)} | Err={len(errors)} | RPS={rps:.1f}") + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--flask-url', default='http://localhost:5001/pos/api/catalog/search?q=filtro%20aire&limit=20') + parser.add_argument('--quart-url', default='http://localhost:5002/pos/api/catalog/async-search?q=filtro%20aire&limit=20') + parser.add_argument('--workers', '-w', type=int, default=20) + parser.add_argument('--requests', '-n', type=int, default=200) + args = parser.parse_args() + + print("=" * 100) + print(f"Benchmark: {args.requests} requests, {args.workers} concurrent workers") + print("=" * 100) + + # Sync (Flask) + print("\n[1/2] Warming up Flask...") + sync_request(args.flask_url) + print("[1/2] Benchmarking Flask (sync)...") + start = time.time() + lat_sync, err_sync = benchmark_sync(args.flask_url, args.workers, args.requests) + dur_sync = time.time() - start + report("Flask sync", lat_sync, err_sync, dur_sync) + + # Async (Quart) + print("\n[2/2] Warming up Quart...") + asyncio.run(benchmark_async(args.quart_url, 5, 1)) + print("[2/2] Benchmarking Quart (async)...") + start = time.time() + lat_async, err_async = asyncio.run(benchmark_async(args.quart_url, args.workers, args.requests)) + dur_async = time.time() - start + report("Quart async", lat_async, err_async, dur_async) + + print("\n" + "=" * 100) + if lat_sync and lat_async: + improvement = (statistics.mean(lat_sync) - statistics.mean(lat_async)) / statistics.mean(lat_sync) * 100 + print(f"Mean latency improvement: {improvement:+.1f}%") + print("=" * 100) + + +if __name__ == '__main__': + try: + import aiohttp + except ImportError: + print("ERROR: aiohttp is required for async benchmark.") + print("Install: pip install aiohttp --break-system-packages") + sys.exit(1) + main() diff --git a/scripts/partition_vehicle_parts.py b/scripts/partition_vehicle_parts.py new file mode 100644 index 0000000..539dbb1 --- /dev/null +++ b/scripts/partition_vehicle_parts.py @@ -0,0 +1,231 @@ +#!/usr/bin/env python3 +"""Partition vehicle_parts by HASH(part_id) into 16 partitions. + +This is a HIGH-RISK operation on a 254 GB table. Run ONLY during maintenance window. + +Strategy: +1. Create partitioned table vehicle_parts_new with 16 hash partitions. +2. Migrate data in batches of 500K rows (checkpoint-friendly). +3. Atomically swap: rename old -> _old, new -> vehicle_parts. +4. Validate counts and indexes. +5. Drop old table after validation (or keep for rollback). + +Usage: + export MASTER_DB_URL="postgresql://postgres@/nexus_autoparts" + python3 partition_vehicle_parts.py --dry-run + python3 partition_vehicle_parts.py + +Requires: + - PostgreSQL 11+ (partitioning support) + - ~300 GB free disk space during migration + - Maintenance window (table is locked briefly during swap) +""" + +import argparse +import os +import sys +import time +from datetime import datetime + +import psycopg2 + +DSN = os.environ.get('MASTER_DB_URL', 'postgresql://postgres@/nexus_autoparts') +BATCH_SIZE = 500_000 +PARTITIONS = 16 + + +def log(msg): + print(f"[{datetime.now().isoformat(timespec='seconds')}] {msg}", flush=True) + + +def get_conn(): + return psycopg2.connect(DSN) + + +def table_exists(cur, name): + cur.execute("SELECT 1 FROM pg_tables WHERE tablename = %s", (name,)) + return cur.fetchone() is not None + + +def get_row_count(cur, name): + cur.execute(f"SELECT COUNT(*) FROM {name}") + return cur.fetchone()[0] + + +def get_max_id(cur, name): + cur.execute(f"SELECT MAX(id_vehicle_part) FROM {name}") + row = cur.fetchone() + return row[0] or 0 + + +def create_partitioned_table(cur): + log("Creating vehicle_parts_new (partitioned)...") + cur.execute(""" + CREATE TABLE vehicle_parts_new ( + id_vehicle_part BIGSERIAL, + part_id INTEGER NOT NULL, + model_year_engine_id INTEGER NOT NULL, + created_at TIMESTAMPTZ DEFAULT NOW() + ) PARTITION BY HASH (part_id); + """) + + for i in range(PARTITIONS): + cur.execute(f""" + CREATE TABLE vehicle_parts_p{i} + PARTITION OF vehicle_parts_new + FOR VALUES WITH (MODULUS {PARTITIONS}, REMAINDER {i}); + """) + log(f"Created {PARTITIONS} partitions.") + + +def migrate_data(cur, dry_run=False): + max_id = get_max_id(cur, 'vehicle_parts') + log(f"Max id_vehicle_part in source: {max_id}") + if max_id == 0: + log("Source table appears empty. Nothing to migrate.") + return + + start = 1 + total_inserted = 0 + batch_num = 0 + t0 = time.time() + + while start <= max_id: + end = start + BATCH_SIZE - 1 + if dry_run: + log(f"[DRY-RUN] Would migrate id_vehicle_part {start}..{end}") + else: + cur.execute(""" + INSERT INTO vehicle_parts_new (id_vehicle_part, part_id, model_year_engine_id, created_at) + SELECT id_vehicle_part, part_id, model_year_engine_id, created_at + FROM vehicle_parts + WHERE id_vehicle_part BETWEEN %s AND %s + ON CONFLICT DO NOTHING; + """, (start, end)) + inserted = cur.rowcount + total_inserted += inserted + batch_num += 1 + if batch_num % 10 == 0: + elapsed = time.time() - t0 + rate = total_inserted / elapsed if elapsed > 0 else 0 + log(f" Batch {batch_num}: {start}..{end} | inserted={total_inserted} | {rate:.0f} rows/s") + start = end + 1 + + if not dry_run: + log(f"Migration complete. Total inserted: {total_inserted}") + + +def create_indexes(cur): + log("Creating indexes on vehicle_parts_new...") + cur.execute(""" + CREATE INDEX idx_vp_new_part ON vehicle_parts_new(part_id); + """) + cur.execute(""" + CREATE INDEX idx_vp_new_mye ON vehicle_parts_new(model_year_engine_id); + """) + cur.execute(""" + ALTER TABLE vehicle_parts_new ADD CONSTRAINT uq_vp_new_mye_part + UNIQUE (model_year_engine_id, part_id); + """) + log("Indexes created.") + + +def swap_tables(cur): + log("Swapping tables (exclusive lock)...") + # Brief exclusive lock on the old table + cur.execute("LOCK TABLE vehicle_parts IN ACCESS EXCLUSIVE MODE;") + cur.execute("ALTER TABLE vehicle_parts RENAME TO vehicle_parts_old;") + cur.execute("ALTER TABLE vehicle_parts_new RENAME TO vehicle_parts;") + log("Swap complete.") + + +def validate(cur): + old_count = get_row_count(cur, 'vehicle_parts_old') + new_count = get_row_count(cur, 'vehicle_parts') + log(f"Validation: old={old_count} | new={new_count}") + if old_count != new_count: + log(f"WARNING: Row count mismatch! diff={old_count - new_count}") + return False + log("Validation PASSED.") + return True + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--dry-run', action='store_true', + help='Show what would be done without executing') + parser.add_argument('--skip-swap', action='store_true', + help='Create new table and migrate, but do not swap') + parser.add_argument('--skip-drop', action='store_true', + help='Keep old table after swap (for rollback)') + args = parser.parse_args() + + if args.dry_run: + log("=== DRY RUN ===") + + log("Connecting to database...") + conn = get_conn() + conn.autocommit = False + cur = conn.cursor() + + try: + # Check prerequisites + if not table_exists(cur, 'vehicle_parts'): + log("ERROR: vehicle_parts does not exist.") + sys.exit(1) + + if table_exists(cur, 'vehicle_parts_new'): + log("WARNING: vehicle_parts_new already exists. Dropping...") + if not args.dry_run: + cur.execute("DROP TABLE IF EXISTS vehicle_parts_new CASCADE;") + conn.commit() + + # Step 1: Create partitioned table + if not args.dry_run: + create_partitioned_table(cur) + conn.commit() + + # Step 2: Migrate data + migrate_data(cur, dry_run=args.dry_run) + if not args.dry_run: + conn.commit() + + # Step 3: Create indexes + if not args.dry_run: + create_indexes(cur) + conn.commit() + + # Step 4: Swap + if not args.dry_run and not args.skip_swap: + swap_tables(cur) + conn.commit() + + # Step 5: Validate + if validate(cur): + if not args.skip_drop: + log("Dropping old table...") + cur.execute("DROP TABLE vehicle_parts_old CASCADE;") + conn.commit() + log("Old table dropped.") + else: + log("Old table kept as vehicle_parts_old.") + else: + log("VALIDATION FAILED. Rolling back swap...") + cur.execute("ALTER TABLE vehicle_parts RENAME TO vehicle_parts_new;") + cur.execute("ALTER TABLE vehicle_parts_old RENAME TO vehicle_parts;") + conn.commit() + log("Rollback complete.") + sys.exit(1) + + log("Done.") + except Exception as exc: + conn.rollback() + log(f"ERROR: {exc}") + raise + finally: + cur.close() + conn.close() + + +if __name__ == '__main__': + main() diff --git a/scripts/start_celery.sh b/scripts/start_celery.sh new file mode 100755 index 0000000..18064c8 --- /dev/null +++ b/scripts/start_celery.sh @@ -0,0 +1,8 @@ +#!/bin/bash +# Start Celery worker for Nexus POS background tasks + +cd /home/Autopartes/pos +export MASTER_DB_URL="${MASTER_DB_URL:-postgresql://postgres@/nexus_autoparts}" +export REDIS_URL="${REDIS_URL:-redis://localhost:6379/0}" + +exec celery -A celery_app worker --loglevel=info --concurrency=4 -n nexus-worker@%h