#!/usr/bin/env python3
"""
Mass QWEN vehicle compatibility processor for Strada.

Fans active inventory items out to a thread pool, asks QWEN (via
``services.qwen_fitment.get_vehicle_fitment``) for vehicle fitment, and writes
the results to ``inventory_vehicle_compat``. Results are buffered and written
every ``--batch-size`` completed items; progress is checkpointed to a JSON file
so an interrupted run can be resumed.

Usage:
    python3 qwen_batch_compat.py --tenant=28 --workers=10 --batch-size=50 --checkpoint-file=qwen_progress.json
"""

import argparse
import json
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock

# Make the sibling ``pos`` package (tenant_db, services.*) importable when this
# file is run directly as a script.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'pos'))

import psycopg2  # noqa: E402,F401
from tenant_db import get_tenant_conn, get_master_conn  # noqa: E402,F401
from services.qwen_fitment import get_vehicle_fitment  # noqa: E402

# NOTE(review): this default path is not tenant-specific. Ids recorded while
# processing one tenant are skipped for any other tenant that reuses the same
# file, so pass a per-tenant --checkpoint-file when running several tenants.
DEFAULT_CHECKPOINT_FILE = '/tmp/qwen_progress.json'
# Completed items buffered before each DB write + checkpoint.
DEFAULT_DB_BATCH_SIZE = 50
# Cap on compat rows written per inventory item.
MAX_VEHICLES_PER_ITEM = 200
# Serialises checkpoint writes.
PROGRESS_LOCK = Lock()


def load_checkpoint(checkpoint_file):
    """Return the set of inventory ids recorded in *checkpoint_file*.

    A missing file means a fresh run and yields an empty set.
    """
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file, 'r') as f:
            return set(json.load(f))
    return set()


def save_checkpoint(checkpoint_file, processed_ids):
    """Persist *processed_ids* to *checkpoint_file* as a JSON list.

    The data goes to a sibling temp file first and is then moved into place
    with ``os.replace``. A crash mid-write therefore cannot leave a truncated
    checkpoint that ``load_checkpoint`` would fail to parse.
    """
    tmp_path = f"{checkpoint_file}.tmp"
    with PROGRESS_LOCK:
        with open(tmp_path, 'w') as f:
            json.dump(list(processed_ids), f)
        os.replace(tmp_path, checkpoint_file)


def get_pending_items(tenant_id, processed_ids, limit=None):
    """Return active inventory rows whose id is not in *processed_ids*.

    Args:
        tenant_id: tenant whose database is queried.
        processed_ids: set of inventory ids to skip (from the checkpoint).
        limit: maximum number of rows to return; ``None`` returns all.

    Returns:
        list of ``(id, part_number, name, brand)`` rows ordered by id.
    """
    conn = get_tenant_conn(tenant_id)
    try:
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT id, part_number, name, brand
                FROM inventory
                WHERE is_active = true
                ORDER BY id
            """)
            rows = cur.fetchall()
        finally:
            cur.close()
    finally:
        conn.close()

    # Filter client-side against the set. The previous approach built an
    # ``id NOT IN (%s, %s, ...)`` clause with one placeholder per processed id,
    # and that clause grows without bound as a resumed run progresses.
    pending = [row for row in rows if row[0] not in processed_ids]
    if limit is not None:
        pending = pending[:limit]
    return pending


def process_single_item(item_id, part_number, name, brand):
    """Ask QWEN for the vehicle fitment of one inventory item.

    Any ``Exception`` raised by the fitment call is captured and returned as
    a failure record, so one bad item does not abort the worker pool.

    Returns:
        dict with ``item_id``, ``part_number`` and ``success``. On success it
        also has ``vehicles`` (always a list), ``confidence`` and ``notes``;
        on failure it has ``error`` (the exception message).
    """
    try:
        result = get_vehicle_fitment(part_number, name, brand)
        return {
            'item_id': item_id,
            'part_number': part_number,
            'success': True,
            # ``or []`` also normalises an explicit None, so callers can
            # always len()/slice this value.
            'vehicles': result.get('vehicles') or [],
            'confidence': result.get('confidence', 0),
            'notes': result.get('notes', ''),
        }
    except Exception as exc:
        return {
            'item_id': item_id,
            'part_number': part_number,
            'success': False,
            'error': str(exc),
        }


def save_results(tenant_id, results):
    """Insert successful QWEN results into ``inventory_vehicle_compat``.

    Failed results are skipped. At most ``MAX_VEHICLES_PER_ITEM`` vehicles
    are written per item. Vehicles that carry an ``mye_id`` use
    ``ON CONFLICT ... DO NOTHING``; vehicles without one are inserted
    unconditionally. All inserts run in one transaction, which is rolled back
    if any insert fails. The connection is always closed.

    Returns:
        number of INSERT statements executed. Rows skipped by
        ``ON CONFLICT DO NOTHING`` are still counted.
    """
    if not results:
        return 0

    conn = get_tenant_conn(tenant_id)
    try:
        cur = conn.cursor()
        try:
            saved = 0
            for result in results:
                if not result.get('success'):
                    continue
                item_id = result['item_id']
                vehicles = (result.get('vehicles') or [])[:MAX_VEHICLES_PER_ITEM]
                confidence = result.get('confidence', 0)

                for v in vehicles:
                    mye_id = v.get('mye_id')
                    if mye_id:
                        cur.execute("""
                            INSERT INTO inventory_vehicle_compat
                                (inventory_id, model_year_engine_id, source, confidence,
                                 make, model, year, created_at)
                            VALUES (%s, %s, 'qwen_ai', %s, %s, %s, %s, NOW())
                            ON CONFLICT (inventory_id, model_year_engine_id, make, model, year)
                            DO NOTHING
                        """, (item_id, mye_id, confidence,
                              v.get('make'), v.get('model'), v.get('year')))
                    else:
                        # NOTE(review): no ON CONFLICT clause here, so re-running an
                        # item (e.g. after a lost checkpoint) can insert duplicates.
                        cur.execute("""
                            INSERT INTO inventory_vehicle_compat
                                (inventory_id, model_year_engine_id, source, confidence,
                                 make, model, year, engine, engine_code, created_at)
                            VALUES (%s, NULL, 'qwen_ai', %s, %s, %s, %s, %s, %s, NOW())
                        """, (item_id, confidence, v.get('make'), v.get('model'),
                              v.get('year'), v.get('engine'), v.get('engine_code')))
                    saved += 1
            conn.commit()
            return saved
        finally:
            cur.close()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def _flush_batch(tenant_id, batch_results, checkpoint_file, processed_ids):
    """Write *batch_results* to the DB, then record *processed_ids* in the checkpoint.

    The DB commit happens before the checkpoint write. If ``save_results``
    raises, those ids stay out of the checkpoint and are retried on the next
    run instead of being silently lost.
    """
    saved = save_results(tenant_id, batch_results)
    save_checkpoint(checkpoint_file, processed_ids)
    return saved


def _print_progress(completed, total, success_count, fail_count, total_vehicles, start_time):
    """Print one progress line; the rate is items per minute and the ETA is in minutes."""
    elapsed = time.time() - start_time
    rate = completed / (elapsed / 60) if elapsed > 0 else 0
    eta_min = (total - completed) / rate if rate > 0 else 0
    print(f"  Progress: {completed:,}/{total:,} ({100*completed/total:.1f}%) | "
          f"Success: {success_count} | Fail: {fail_count} | "
          f"Vehicles: {total_vehicles} | "
          f"Rate: {rate:.1f} items/min | ETA: {eta_min:.0f} min", flush=True)


def main():
    """Parse CLI args and process all pending items for one tenant.

    Items that complete with ``success=False``, or whose future raised, are
    still added to the checkpoint, so they are not retried on later runs.
    """
    parser = argparse.ArgumentParser(description='Batch QWEN vehicle compatibility processor')
    parser.add_argument('--tenant', type=int, required=True, help='Tenant ID')
    parser.add_argument('--workers', type=int, default=10, help='Number of parallel workers')
    parser.add_argument('--batch-size', type=int, default=DEFAULT_DB_BATCH_SIZE,
                        help='Completed items to buffer before each DB write + checkpoint')
    parser.add_argument('--limit', type=int, default=None,
                        help='Max items to process (None = all)')
    parser.add_argument('--checkpoint-file', default=DEFAULT_CHECKPOINT_FILE,
                        help='Progress checkpoint file')
    args = parser.parse_args()
    if args.batch_size < 1:
        parser.error('--batch-size must be >= 1')

    checkpoint_file = args.checkpoint_file

    print("=== QWEN Batch Compatibility Processor ===", flush=True)
    print(f"Tenant: {args.tenant}", flush=True)
    print(f"Workers: {args.workers}", flush=True)
    print(f"Checkpoint: {args.checkpoint_file}", flush=True)
    print(flush=True)

    processed_ids = load_checkpoint(checkpoint_file)
    print(f"Previously processed: {len(processed_ids):,} items", flush=True)

    pending = get_pending_items(args.tenant, processed_ids, args.limit)
    print(f"Pending items: {len(pending):,}", flush=True)

    if not pending:
        print("Nothing to process!", flush=True)
        return

    total = len(pending)
    completed = 0
    success_count = 0
    fail_count = 0
    total_vehicles = 0
    start_time = time.time()
    batch_results = []

    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        future_to_item = {
            executor.submit(process_single_item, item[0], item[1], item[2], item[3]): item
            for item in pending
        }

        try:
            for future in as_completed(future_to_item):
                item = future_to_item[future]
                try:
                    # Futures yielded by as_completed() are already finished, so a
                    # timeout here would never fire. Any per-item time limit has to
                    # be enforced inside get_vehicle_fitment().
                    result = future.result()
                except Exception as exc:
                    # process_single_item() catches its own errors; this guards
                    # anything unexpected (e.g. BaseException subclasses aside).
                    fail_count += 1
                    processed_ids.add(item[0])
                    print(f"Worker exception for {item[1]}: {exc}", flush=True)
                else:
                    batch_results.append(result)
                    if result['success']:
                        success_count += 1
                        total_vehicles += len(result.get('vehicles') or [])
                    else:
                        fail_count += 1
                    processed_ids.add(result['item_id'])

                completed += 1

                if len(batch_results) >= args.batch_size:
                    _flush_batch(args.tenant, batch_results, checkpoint_file, processed_ids)
                    batch_results = []
                    _print_progress(completed, total, success_count, fail_count,
                                    total_vehicles, start_time)
        except BaseException:
            # On a DB error or Ctrl-C, cancel queued work. Otherwise the
            # executor's shutdown would wait for every remaining QWEN call,
            # and those results would be discarded anyway.
            for queued in future_to_item:
                queued.cancel()
            raise

    # Flush whatever is left after the pool drains (no DB work if empty).
    _flush_batch(args.tenant, batch_results, checkpoint_file, processed_ids)

    elapsed = time.time() - start_time
    avg_rate = completed / (elapsed / 60) if elapsed > 0 else 0
    print("\n=== Complete ===", flush=True)
    print(f"Processed: {completed:,}", flush=True)
    print(f"Success: {success_count}", flush=True)
    print(f"Failed: {fail_count}", flush=True)
    print(f"Total vehicles found: {total_vehicles}", flush=True)
    print(f"Elapsed: {elapsed/3600:.1f} hours", flush=True)
    print(f"Avg rate: {avg_rate:.1f} items/min", flush=True)


if __name__ == '__main__':
    main()