Files
Autoparts-DB/scripts/qwen_batch_compat.py
consultoria-as a236187f3a feat: MercadoLibre integration + inventory bulk publish + WhatsApp bridge fixes
- Add MercadoLibre OAuth, listings, orders, webhooks and category search
- New marketplace_external_bp.py, meli_service.py, marketplace_external_service.py
- New marketplace_external.html/js with ML management UI
- Inventory: bulk publish to ML with category autocomplete, listing type and shipping selectors
- Inventory: new .btn--meli styles, select/label CSS fixes
- WhatsApp bridge: rate limiting, 440/515/408 error handling, stale watchdog
- DB migration v3.4_meli_integration.sql for marketplace_listings, orders, sync_queue
- Add Celery tasks for ML sync and webhook processing
- Sidebar: MercadoLibre navigation link
2026-05-26 04:24:07 +00:00

232 lines
7.9 KiB
Python

#!/usr/bin/env python3
"""
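Mass QWEN vehicle compatibility processor for Strada.
Sends each active inventory item to QWEN using a pool of parallel worker
threads, stores the returned vehicles in inventory_vehicle_compat in periodic
batches, and records processed item IDs in a JSON checkpoint so an
interrupted run can resume where it left off.
Usage:
python3 qwen_batch_compat.py --tenant=28 --workers=10 [--limit=1000] --checkpoint-file=qwen_progress.json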
"""
import argparse
import json
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'pos'))
import psycopg2
from tenant_db import get_tenant_conn, get_master_conn
from services.qwen_fitment import get_vehicle_fitment
DEFAULT_CHECKPOINT_FILE = '/tmp/qwen_progress.json'
# Guards checkpoint writes. save_checkpoint() is called from main()'s result
# loop; the lock keeps concurrent callers from interleaving writes.
PROGRESS_LOCK = Lock()


def load_checkpoint(checkpoint_file):
    """Return the set of inventory IDs recorded in *checkpoint_file*.

    A missing file means nothing has been processed yet, so an empty set is
    returned. A file that exists but is not valid JSON raises
    ``json.JSONDecodeError`` instead of silently restarting from scratch
    (which would re-query QWEN for every item).
    """
    if not os.path.exists(checkpoint_file):
        return set()
    with open(checkpoint_file, 'r', encoding='utf-8') as f:
        return set(json.load(f))


def save_checkpoint(checkpoint_file, processed_ids):
    """Persist *processed_ids* to *checkpoint_file* as a JSON list.

    The data is written to a sibling ``.tmp`` file, flushed to disk, and then
    renamed over the checkpoint with ``os.replace``. A crash or Ctrl-C in the
    middle of a write therefore leaves the previous checkpoint intact rather
    than a truncated file that ``load_checkpoint`` could not parse.

    Args:
        checkpoint_file: Destination path of the checkpoint.
        processed_ids: Iterable of JSON-serializable inventory IDs.
    """
    tmp_path = f'{checkpoint_file}.tmp'
    with PROGRESS_LOCK:
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(list(processed_ids), f)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, checkpoint_file)
        except BaseException:
            # Don't leave a half-written temp file behind; keep the original error.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
def get_pending_items(tenant_id, processed_ids, limit=None):
    """Fetch active inventory items that are not yet in the checkpoint.

    Only *processed_ids* (the checkpoint) decides what is pending; existing
    rows in inventory_vehicle_compat are not consulted.

    Args:
        tenant_id: Tenant whose database is queried via ``get_tenant_conn``.
        processed_ids: Collection of inventory IDs to exclude; may be empty.
        limit: Maximum number of rows to return. ``None`` or ``0`` returns
            every pending row.

    Returns:
        List of ``(id, part_number, name, brand)`` rows ordered by ``id``.

    Raises:
        ValueError: If *limit* is negative.
    """
    if limit is not None and limit < 0:
        raise ValueError(f'limit must be >= 0, got {limit}')

    sql = [
        "SELECT id, part_number, name, brand",
        "FROM inventory",
        "WHERE is_active = true",
    ]
    params = []
    if processed_ids:
        excluded = list(processed_ids)
        placeholders = ','.join(['%s'] * len(excluded))
        sql.append(f"AND id NOT IN ({placeholders})")
        params.extend(excluded)
    sql.append("ORDER BY id")
    if limit:
        # Let the database stop early instead of fetching every row and
        # slicing the result in Python.
        sql.append("LIMIT %s")
        params.append(limit)
    query = '\n'.join(sql)

    conn = get_tenant_conn(tenant_id)
    try:
        cur = conn.cursor()
        try:
            # No params -> pass None so the driver does no %-interpolation,
            # matching the original parameterless execute().
            cur.execute(query, tuple(params) if params else None)
            return cur.fetchall()
        finally:
            cur.close()
    finally:
        conn.close()
def process_single_item(item_id, part_number, name, brand):
    """Ask QWEN for the vehicles compatible with one inventory item.

    Never raises for ordinary errors: any exception from
    ``get_vehicle_fitment`` (or a malformed response) is reported as
    ``success=False`` so one bad item cannot stop the worker pool.

    Returns:
        dict with ``item_id``, ``part_number`` and ``success``. On success it
        also holds ``vehicles`` (always a list), ``confidence`` and ``notes``;
        on failure it holds ``error`` (the exception text).
    """
    try:
        result = get_vehicle_fitment(part_number, name, brand)
        # An explicit ``"vehicles": null`` used to flow through as None and
        # crash the vehicle count in main() and the slicing in save_results().
        vehicles = result.get('vehicles') or []
        if not isinstance(vehicles, (list, tuple)):
            raise TypeError(
                f"unexpected 'vehicles' type: {type(vehicles).__name__}"
            )
        return {
            'item_id': item_id,
            'part_number': part_number,
            'success': True,
            'vehicles': list(vehicles),
            'confidence': result.get('confidence', 0),
            'notes': result.get('notes', ''),
        }
    except Exception as exc:
        return {
            'item_id': item_id,
            'part_number': part_number,
            'success': False,
            'error': str(exc),
        }
def save_results(tenant_id, results, *, max_vehicles_per_item=200):
    """Insert successful QWEN results into inventory_vehicle_compat.

    Failed results are skipped. All inserts run on one connection and are
    committed once at the end. If anything raises, the connection is closed
    without committing, so none of the batch is written, and the exception
    propagates to the caller.

    Args:
        tenant_id: Tenant whose database receives the rows.
        results: Dicts as produced by ``process_single_item``.
        max_vehicles_per_item: Cap on vehicles stored per item (default 200).

    Returns:
        Number of rows actually inserted. Rows skipped by
        ``ON CONFLICT DO NOTHING`` are not counted.
    """
    successes = [r for r in (results or []) if r.get('success')]
    if not successes:
        # Empty batch or nothing but failures: no need to open a connection.
        return 0

    saved = 0
    conn = get_tenant_conn(tenant_id)
    try:
        cur = conn.cursor()
        try:
            for result in successes:
                item_id = result['item_id']
                confidence = result.get('confidence', 0)
                # ``or []`` guards against an explicit ``'vehicles': None``.
                vehicles = (result.get('vehicles') or [])[:max_vehicles_per_item]
                for v in vehicles:
                    if not isinstance(v, dict):
                        # Malformed entry from the model; nothing to map to columns.
                        continue
                    mye_id = v.get('mye_id')
                    if mye_id:
                        cur.execute("""
                            INSERT INTO inventory_vehicle_compat
                            (inventory_id, model_year_engine_id, source, confidence, make, model, year, created_at)
                            VALUES (%s, %s, 'qwen_ai', %s, %s, %s, %s, NOW())
                            ON CONFLICT (inventory_id, model_year_engine_id, make, model, year) DO NOTHING
                        """, (item_id, mye_id, confidence, v.get('make'), v.get('model'), v.get('year')))
                    else:
                        # NOTE(review): this insert has no ON CONFLICT clause, so an
                        # item processed twice (e.g. a crash after the DB commit but
                        # before the checkpoint write) gets duplicate NULL-mye rows.
                        # Confirm whether a dedup key is wanted here.
                        cur.execute("""
                            INSERT INTO inventory_vehicle_compat
                            (inventory_id, model_year_engine_id, source, confidence, make, model, year, engine, engine_code, created_at)
                            VALUES (%s, NULL, 'qwen_ai', %s, %s, %s, %s, %s, %s, NOW())
                        """, (item_id, confidence, v.get('make'), v.get('model'), v.get('year'), v.get('engine'), v.get('engine_code')))
                    # rowcount is 0 when ON CONFLICT DO NOTHING skipped the row
                    # (and -1 if the driver cannot tell), so clamp at 0.
                    saved += max(cur.rowcount, 0)
        finally:
            cur.close()
        conn.commit()
    finally:
        # Closing without a commit discards the open transaction, so a failure
        # part-way through leaves none of this batch in the table.
        conn.close()
    return saved
def main():
    """Command-line entry point: run the QWEN compatibility job for a tenant.

    Loads the checkpoint, fetches the pending inventory items, and submits
    one ``process_single_item`` call per item to a thread pool. Every
    ``--batch-size`` completed items, it writes the results to the database
    and then updates the checkpoint. Ctrl-C cancels the queued items, saves
    what has already completed, and exits with status 130.
    """
    parser = argparse.ArgumentParser(description='Batch QWEN vehicle compatibility processor')
    parser.add_argument('--tenant', type=int, required=True, help='Tenant ID')
    parser.add_argument('--workers', type=int, default=10, help='Number of parallel workers')
    parser.add_argument('--limit', type=int, default=None, help='Max items to process (None = all)')
    parser.add_argument('--checkpoint-file', default=DEFAULT_CHECKPOINT_FILE, help='Progress checkpoint file')
    parser.add_argument('--batch-size', type=int, default=50,
                        help='Completed items between DB saves / checkpoint writes (default 50)')
    args = parser.parse_args()
    if args.workers < 1:
        parser.error('--workers must be at least 1')
    if args.batch_size < 1:
        parser.error('--batch-size must be at least 1')
    if args.limit is not None and args.limit < 0:
        parser.error('--limit must not be negative')
    checkpoint_file = args.checkpoint_file

    print("=== QWEN Batch Compatibility Processor ===", flush=True)
    print(f"Tenant: {args.tenant}", flush=True)
    print(f"Workers: {args.workers}", flush=True)
    print(f"Checkpoint: {args.checkpoint_file}", flush=True)
    print(flush=True)

    processed_ids = load_checkpoint(checkpoint_file)
    print(f"Previously processed: {len(processed_ids):,} items", flush=True)

    pending = get_pending_items(args.tenant, processed_ids, args.limit)
    print(f"Pending items: {len(pending):,}", flush=True)
    if not pending:
        print("Nothing to process!", flush=True)
        return

    total = len(pending)
    completed = 0
    success_count = 0
    fail_count = 0
    total_vehicles = 0
    rows_saved = 0
    batch_results = []
    interrupted = False
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        future_to_item = {
            executor.submit(process_single_item, item[0], item[1], item[2], item[3]): item
            for item in pending
        }
        try:
            for future in as_completed(future_to_item):
                item = future_to_item[future]
                try:
                    # as_completed only yields finished futures, so no timeout
                    # is needed; process_single_item reports its own errors.
                    result = future.result()
                except Exception as exc:
                    fail_count += 1
                    processed_ids.add(item[0])
                    print(f"Worker exception for {item[1]}: {exc}", flush=True)
                else:
                    batch_results.append(result)
                    if result['success']:
                        success_count += 1
                        total_vehicles += len(result.get('vehicles') or [])
                    else:
                        fail_count += 1
                    # NOTE: failed items are checkpointed too, so they are not
                    # retried on the next run.
                    processed_ids.add(result['item_id'])
                completed += 1

                if len(batch_results) >= args.batch_size:
                    # Write to the DB *before* the checkpoint: if the write
                    # fails, these items stay out of the checkpoint and are
                    # retried next run instead of being silently lost.
                    rows_saved += save_results(args.tenant, batch_results)
                    batch_results = []
                    save_checkpoint(checkpoint_file, processed_ids)

                    elapsed = time.time() - start_time
                    rate = completed / elapsed if elapsed > 0 else 0.0  # items per second
                    eta = (total - completed) / rate if rate > 0 else 0.0  # seconds
                    print(f" Progress: {completed:,}/{total:,} ({100*completed/total:.1f}%) | "
                          f"Success: {success_count} | Fail: {fail_count} | "
                          f"Vehicles: {total_vehicles} | "
                          f"Rate: {rate * 60:.1f} items/min | ETA: {eta/60:.0f} min", flush=True)
        except KeyboardInterrupt:
            interrupted = True
            print("\nInterrupted: cancelling queued items and saving completed results...", flush=True)
        finally:
            # Leaving the ``with`` block waits for every submitted future.
            # Cancelling those that have not started means an interrupt or
            # error only waits for the calls already in flight.
            for queued in future_to_item:
                queued.cancel()

    # Final save, in the same order as above: results first, then checkpoint.
    if batch_results:
        rows_saved += save_results(args.tenant, batch_results)
    save_checkpoint(checkpoint_file, processed_ids)

    elapsed = time.time() - start_time
    avg_rate = completed / (elapsed / 60) if elapsed > 0 else 0.0
    print("\n=== Interrupted ===" if interrupted else "\n=== Complete ===", flush=True)
    print(f"Processed: {completed:,}", flush=True)
    print(f"Success: {success_count}", flush=True)
    print(f"Failed: {fail_count}", flush=True)
    print(f"Total vehicles found: {total_vehicles}", flush=True)
    print(f"Rows inserted: {rows_saved}", flush=True)
    print(f"Elapsed: {elapsed/3600:.1f} hours", flush=True)
    print(f"Avg rate: {avg_rate:.1f} items/min", flush=True)
    if interrupted:
        # Conventional exit status for termination by SIGINT.
        sys.exit(130)


if __name__ == '__main__':
    main()