"""Celery tasks for Nexus POS background jobs.""" import os import sys import time # Ensure pos/ is on path for imports when Celery worker runs standalone sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from celery_app import celery import psycopg2 import redis as redis_lib MASTER_DB_URL = os.environ.get('MASTER_DB_URL', 'postgresql://postgres@/nexus_autoparts') REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0') def _get_db(): return psycopg2.connect(MASTER_DB_URL) def _get_redis(): return redis_lib.from_url(REDIS_URL, decode_responses=True) @celery.task(bind=True, max_retries=3) def warm_vehicle_cache_task(self, batch_size=5000, ttl=3600): """Warm Redis cache for vehicle info from part_vehicle_preview.""" conn = _get_db() cur = conn.cursor() r = _get_redis() r.ping() cur.execute("SELECT id_part FROM parts WHERE oem_part_number IS NOT NULL ORDER BY id_part") all_ids = [row[0] for row in cur.fetchall()] total = len(all_ids) processed = 0 cached = 0 start = time.time() for i in range(0, total, batch_size): batch = all_ids[i:i + batch_size] cur.execute(""" SELECT part_id, name_brand, name_model, year_car FROM part_vehicle_preview WHERE part_id = ANY(%s) """, (batch,)) pipe = r.pipeline() batch_cached = 0 for row in cur.fetchall(): info = f"{row[1]} {row[2]} {row[3]}" pipe.setex(f'nexus:vehicle:{row[0]}', ttl, info) batch_cached += 1 pipe.execute() processed += len(batch) cached += batch_cached self.update_state( state='PROGRESS', meta={'current': processed, 'total': total, 'cached': cached} ) cur.close() conn.close() elapsed = time.time() - start return {'total': total, 'cached': cached, 'elapsed': int(elapsed)} @celery.task(bind=True, max_retries=2) def bulk_import_inventory_task(self, csv_path, tenant_id, branch_id=None): """Bulk import inventory from CSV in background.""" from services.inventory_engine import bulk_import_csv conn = _get_db() try: result = bulk_import_csv(conn, csv_path, tenant_id, branch_id) conn.commit() return result except Exception as exc: conn.rollback() raise self.retry(exc=exc) finally: conn.close() @celery.task(bind=True, max_retries=1) def generate_report_task(self, report_type, params, tenant_id): """Generate heavy reports asynchronously.""" # Placeholder: implement actual report generation per type self.update_state(state='PROGRESS', meta={'step': 'collecting_data'}) time.sleep(2) # Simulate work return { 'report_type': report_type, 'tenant_id': tenant_id, 'status': 'completed', 'url': f'/pos/static/reports/{report_type}_{tenant_id}.pdf', }