"""Celery tasks for Nexus POS background jobs.""" import os import sys import time # Ensure pos/ is on path for imports when Celery worker runs standalone sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from celery_app import celery import psycopg2 import redis as redis_lib MASTER_DB_URL = os.environ.get('MASTER_DB_URL', 'postgresql://postgres@/nexus_autoparts') REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0') def _get_db(): return psycopg2.connect(MASTER_DB_URL) def _get_redis(): return redis_lib.from_url(REDIS_URL, decode_responses=True) @celery.task(bind=True, max_retries=3) def warm_vehicle_cache_task(self, batch_size=5000, ttl=3600): """Warm Redis cache for vehicle info from part_vehicle_preview.""" conn = _get_db() cur = conn.cursor() r = _get_redis() r.ping() cur.execute("SELECT id_part FROM parts WHERE oem_part_number IS NOT NULL ORDER BY id_part") all_ids = [row[0] for row in cur.fetchall()] total = len(all_ids) processed = 0 cached = 0 start = time.time() for i in range(0, total, batch_size): batch = all_ids[i:i + batch_size] cur.execute(""" SELECT part_id, name_brand, name_model, year_car FROM part_vehicle_preview WHERE part_id = ANY(%s) """, (batch,)) pipe = r.pipeline() batch_cached = 0 for row in cur.fetchall(): info = f"{row[1]} {row[2]} {row[3]}" pipe.setex(f'nexus:vehicle:{row[0]}', ttl, info) batch_cached += 1 pipe.execute() processed += len(batch) cached += batch_cached self.update_state( state='PROGRESS', meta={'current': processed, 'total': total, 'cached': cached} ) cur.close() conn.close() elapsed = time.time() - start return {'total': total, 'cached': cached, 'elapsed': int(elapsed)} @celery.task(bind=True, max_retries=2) def bulk_import_inventory_task(self, csv_path, tenant_id, branch_id=None): """Bulk import inventory from CSV in background.""" from services.inventory_engine import bulk_import_csv conn = _get_db() try: result = bulk_import_csv(conn, csv_path, tenant_id, branch_id) conn.commit() return result except Exception as exc: conn.rollback() raise self.retry(exc=exc) finally: conn.close() @celery.task(bind=True, max_retries=1) def generate_report_task(self, report_type, params, tenant_id): """Generate heavy reports asynchronously.""" # Placeholder: implement actual report generation per type self.update_state(state='PROGRESS', meta={'step': 'collecting_data'}) time.sleep(2) # Simulate work return { 'report_type': report_type, 'tenant_id': tenant_id, 'status': 'completed', 'url': f'/pos/static/reports/{report_type}_{tenant_id}.pdf', } @celery.task(bind=True, max_retries=2) def sync_vehicle_compatibility_task(self, tenant_id, item_id, part_number, name, brand, compat_source): """Fetch AI/TecDoc vehicle compatibility in background after item creation.""" from tenant_db import get_tenant_conn, get_master_conn from services.inventory_vehicle_compat import auto_match_vehicle_compatibility, save_qwen_fitment from services.qwen_fitment import get_vehicle_fitment tenant_conn = None master_conn = None try: tenant_conn = get_tenant_conn(tenant_id) if compat_source in ('tecdoc', 'both'): try: master_conn = get_master_conn() auto_match_vehicle_compatibility( master_conn, tenant_conn, item_id, part_number, brand=brand, name=name ) master_conn.close() master_conn = None except Exception as am_err: print(f"[sync_vehicle_compat] TecDoc error for item {item_id}: {am_err}") if compat_source in ('qwen', 'both'): try: fitment = get_vehicle_fitment(part_number, name, brand or '') save_qwen_fitment(tenant_conn, item_id, fitment) except Exception as qwen_err: print(f"[sync_vehicle_compat] QWEN error for item {item_id}: {qwen_err}") tenant_conn.commit() return {'status': 'ok', 'item_id': item_id, 'tenant_id': tenant_id} except Exception as exc: if tenant_conn: tenant_conn.rollback() raise self.retry(exc=exc, countdown=10) finally: if tenant_conn: tenant_conn.close() if master_conn: master_conn.close()