Autoparts-DB/pos/blueprints/tasks_bp.py
consultoria-as a1be8dd0ea OPTION A: A2 Virtual Scroll + A3 Celery + A4 asyncpg PoC + A5 partitioning
A2 - Virtual scroll in large tables:
- New VirtualScroll helper in pos/static/js/virtual-scroll.js
- inventory.js: products table with virtual scroll
- customers.js: customers table with virtual scroll
- fleet.js: renderMaintenance() and renderHistory() with virtual scroll
- Templates wrapped in .vs-container for scrolling

A3 - Celery worker queue:
- pos/celery_app.py + pos/tasks.py (warm cache, bulk import, reports)
- Blueprint tasks_bp.py with endpoints /pos/api/tasks/*
- Script scripts/start_celery.sh

A4 - asyncpg + Quart PoC:
- pos/async_catalog.py: endpoint /pos/api/catalog/async-search
- scripts/benchmark_async_catalog.py: benchmark Flask vs Quart

A5 - Partition vehicle_parts:
- scripts/partition_vehicle_parts.py: safe hash-based migration (16 partitions)
- Supports --dry-run, --skip-swap, --skip-drop

Tests: 36/36 passing
2026-04-27 09:53:36 +00:00

47 lines
1.5 KiB
Python

"""Blueprint for background task management (Celery)."""
from flask import Blueprint, jsonify, request

from auth import require_auth
from tasks import warm_vehicle_cache_task, generate_report_task

tasks_bp = Blueprint('tasks', __name__, url_prefix='/pos/api/tasks')


@tasks_bp.route('/warm-cache', methods=['POST'])
@require_auth
def enqueue_warm_cache():
    """Enqueue vehicle cache warming task."""
    task = warm_vehicle_cache_task.apply_async()
    return jsonify({'task_id': task.id, 'status': 'queued'})


@tasks_bp.route('/report', methods=['POST'])
@require_auth
def enqueue_report():
    """Enqueue report generation task."""
    # silent=True returns None for a missing or non-JSON body instead of
    # raising, so the defaults below actually apply.
    data = request.get_json(silent=True) or {}
    report_type = data.get('report_type', 'sales')
    params = data.get('params', {})
    tenant_id = getattr(request, 'tenant_id', None)
    task = generate_report_task.apply_async(args=[report_type, params, tenant_id])
    return jsonify({'task_id': task.id, 'status': 'queued'})


@tasks_bp.route('/<task_id>/status', methods=['GET'])
@require_auth
def task_status(task_id):
    """Get status of a background task."""
    from celery_app import celery

    result = celery.AsyncResult(task_id)
    response = {
        'task_id': task_id,
        'status': result.status,
    }
    if result.status == 'PROGRESS':
        response['meta'] = result.info
    elif result.successful():
        response['result'] = result.result
    elif result.failed():
        response['error'] = str(result.result)
    return jsonify(response)
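
A client of the status endpoint above would typically poll it until the task reaches a terminal state. The following is a minimal sketch of such a loop, not part of this commit. The names `wait_for_task`, `fetch_status`, and `TERMINAL_STATES` are hypothetical; `fetch_status` stands in for whatever HTTP call returns the JSON body of `/pos/api/tasks/<task_id>/status`, and the payload shape is assumed to match what `task_status` builds.

```python
import time

# Celery's terminal ("ready") states. PENDING, STARTED, RETRY and the custom
# PROGRESS state reported by task_status mean the task is still in flight.
TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def wait_for_task(fetch_status, task_id, interval=1.0, timeout=60.0,
                  sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status(task_id) until a terminal state or the timeout.

    fetch_status is a hypothetical callable returning the decoded JSON
    payload of the status endpoint as a dict.
    """
    deadline = clock() + timeout
    while True:
        payload = fetch_status(task_id)
        if payload.get("status") in TERMINAL_STATES:
            return payload
        if clock() >= deadline:
            raise TimeoutError(f"task {task_id} did not finish in {timeout}s")
        sleep(interval)


def make_fake_fetcher():
    """Return a stand-in fetcher that replays three canned status payloads."""
    responses = iter([
        {"task_id": "abc", "status": "PENDING"},
        {"task_id": "abc", "status": "PROGRESS",
         "meta": {"current": 5, "total": 10}},
        {"task_id": "abc", "status": "SUCCESS", "result": {"rows": 42}},
    ])
    return lambda task_id: next(responses)


final = wait_for_task(make_fake_fetcher(), "abc", interval=0,
                      sleep=lambda seconds: None)
print(final["status"], final["result"])
```

Passing `sleep` and `clock` as parameters is a design choice that keeps the loop testable without real delays; in production code the defaults would be used and `fetch_status` would wrap an authenticated HTTP request.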