"""Celery tasks for Nexus POS background jobs.""" import os import sys import time # Ensure pos/ is on path for imports when Celery worker runs standalone sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from celery_app import celery import psycopg2 import redis as redis_lib MASTER_DB_URL = os.environ.get('MASTER_DB_URL', 'postgresql://postgres@/nexus_autoparts') REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0') def _get_db(): return psycopg2.connect(MASTER_DB_URL) def _get_redis(): return redis_lib.from_url(REDIS_URL, decode_responses=True) @celery.task(bind=True, max_retries=3) def warm_vehicle_cache_task(self, batch_size=5000, ttl=3600): """Warm Redis cache for vehicle info from part_vehicle_preview.""" conn = _get_db() cur = conn.cursor() r = _get_redis() r.ping() cur.execute("SELECT id_part FROM parts WHERE oem_part_number IS NOT NULL ORDER BY id_part") all_ids = [row[0] for row in cur.fetchall()] total = len(all_ids) processed = 0 cached = 0 start = time.time() for i in range(0, total, batch_size): batch = all_ids[i:i + batch_size] cur.execute(""" SELECT part_id, name_brand, name_model, year_car FROM part_vehicle_preview WHERE part_id = ANY(%s) """, (batch,)) pipe = r.pipeline() batch_cached = 0 for row in cur.fetchall(): info = f"{row[1]} {row[2]} {row[3]}" pipe.setex(f'nexus:vehicle:{row[0]}', ttl, info) batch_cached += 1 pipe.execute() processed += len(batch) cached += batch_cached self.update_state( state='PROGRESS', meta={'current': processed, 'total': total, 'cached': cached} ) cur.close() conn.close() elapsed = time.time() - start return {'total': total, 'cached': cached, 'elapsed': int(elapsed)} @celery.task(bind=True, max_retries=2) def bulk_import_inventory_task(self, csv_path, tenant_id, branch_id=None): """Bulk import inventory from CSV in background.""" from services.inventory_engine import bulk_import_csv conn = _get_db() try: result = bulk_import_csv(conn, csv_path, tenant_id, branch_id) conn.commit() return result except Exception as exc: conn.rollback() raise self.retry(exc=exc) finally: conn.close() @celery.task(bind=True, max_retries=1) def generate_report_task(self, report_type, params, tenant_id): """Generate heavy reports asynchronously.""" # Placeholder: implement actual report generation per type self.update_state(state='PROGRESS', meta={'step': 'collecting_data'}) time.sleep(2) # Simulate work return { 'report_type': report_type, 'tenant_id': tenant_id, 'status': 'completed', 'url': f'/pos/static/reports/{report_type}_{tenant_id}.pdf', } # ─── MercadoLibre Tasks ─────────────────────────────────────────────────── @celery.task(bind=True, max_retries=3) def sync_meli_stock_price_task(self, tenant_id: int): """Sync stock and price for all active ML listings.""" from services import marketplace_external_service as meli_svc from tenant_db import get_tenant_conn conn = get_tenant_conn(tenant_id) try: cfg = meli_svc.get_meli_config(conn) svc = meli_svc._get_meli_service(cfg) if not svc: return {'error': 'MercadoLibre not configured'} cur = conn.cursor() cur.execute( """ SELECT id, inventory_id, external_item_id, publish_price FROM marketplace_listings WHERE is_active = true AND channel = 'mercadolibre' """ ) listings = cur.fetchall() cur.close() from services.inventory_engine import get_stock_bulk stock_map = get_stock_bulk(conn, branch_id=None) updated = 0 failed = 0 for row in listings: listing_id, inv_id, ext_id, last_price = row cur = conn.cursor() cur.execute("SELECT price_1 FROM inventory WHERE id = %s", (inv_id,)) price_row = cur.fetchone() cur.close() current_price = float(price_row[0]) if price_row and price_row[0] else 0 current_stock = stock_map.get(inv_id, 0) try: svc.update_item( ext_id, {"price": round(current_price, 2), "available_quantity": max(current_stock, 0)}, ) cur = conn.cursor() cur.execute( """ UPDATE marketplace_listings SET last_sync_at = NOW(), sync_errors = NULL, publish_price = %s WHERE id = %s """, (current_price, listing_id), ) conn.commit() cur.close() updated += 1 except Exception as e: conn.rollback() cur = conn.cursor() cur.execute( "UPDATE marketplace_listings SET sync_errors = %s WHERE id = %s", (str(e)[:500], listing_id), ) conn.commit() cur.close() failed += 1 return {'updated': updated, 'failed': failed, 'total': len(listings)} finally: conn.close() @celery.task(bind=True, max_retries=3) def sync_meli_orders_task(self, tenant_id: int): """Fetch new orders from MercadoLibre.""" from services import marketplace_external_service as meli_svc from tenant_db import get_tenant_conn conn = get_tenant_conn(tenant_id) try: # Determine last check date from most recent order cur = conn.cursor() cur.execute( "SELECT MAX(created_at) FROM marketplace_orders WHERE channel = 'mercadolibre'" ) row = cur.fetchone() last_check = row[0] cur.close() date_from = None if last_check: date_from = last_check.strftime('%Y-%m-%dT%H:%M:%S.000-00:00') result = meli_svc.fetch_and_save_orders(conn, date_from=date_from) return result except Exception as e: return {'error': str(e)} finally: conn.close() @celery.task(bind=True, max_retries=3) def process_meli_webhook_task(self, tenant_id: int, topic: str, resource: str): """Process incoming MercadoLibre webhook asynchronously.""" from services import marketplace_external_service as meli_svc from tenant_db import get_tenant_conn conn = get_tenant_conn(tenant_id) try: if topic.startswith("orders"): # Fetch full order and upsert cfg = meli_svc.get_meli_config(conn) svc = meli_svc._get_meli_service(cfg) if svc and resource: order_id = resource.split("/")[-1] full = svc.get_order(order_id) # Re-use fetch_and_save_orders by passing the order directly # For simplicity, trigger a full sync for recent orders return meli_svc.fetch_and_save_orders(conn) return {'ok': True, 'topic': topic} except Exception as e: return {'error': str(e)} finally: conn.close() @celery.task(bind=True, max_retries=2) def sync_vehicle_compatibility_task(self, tenant_id, item_id, part_number, name, brand, compat_source): """Fetch AI/TecDoc vehicle compatibility in background after item creation.""" from tenant_db import get_tenant_conn, get_master_conn from services.inventory_vehicle_compat import auto_match_vehicle_compatibility, save_qwen_fitment from services.qwen_fitment import get_vehicle_fitment tenant_conn = None master_conn = None try: tenant_conn = get_tenant_conn(tenant_id) if compat_source in ('tecdoc', 'both'): try: master_conn = get_master_conn() auto_match_vehicle_compatibility( master_conn, tenant_conn, item_id, part_number, brand=brand, name=name ) master_conn.close() master_conn = None except Exception as am_err: print(f"[sync_vehicle_compat] TecDoc error for item {item_id}: {am_err}") if compat_source in ('qwen', 'both'): try: fitment = get_vehicle_fitment(part_number, name, brand or '') save_qwen_fitment(tenant_conn, item_id, fitment) except Exception as qwen_err: print(f"[sync_vehicle_compat] QWEN error for item {item_id}: {qwen_err}") tenant_conn.commit() return {'status': 'ok', 'item_id': item_id, 'tenant_id': tenant_id} except Exception as exc: if tenant_conn: tenant_conn.rollback() raise self.retry(exc=exc, countdown=10) finally: if tenant_conn: tenant_conn.close() if master_conn: master_conn.close() @celery.task(bind=True) def publish_meli_items_task(self, tenant_id: int, inventory_ids: list, category_id: str, listing_type: str = "gold_special", shipping_mode: str = "me2", custom_data: dict = None): """Publish inventory items to MercadoLibre asynchronously.""" from services import marketplace_external_service as meli_svc from tenant_db import get_tenant_conn conn = get_tenant_conn(tenant_id) try: result = meli_svc.publish_items( conn, inventory_ids=inventory_ids, meli_category_id=category_id, listing_type_id=listing_type, shipping_mode=shipping_mode, custom_data=custom_data or {}, ) return result except Exception as e: return {"success": [], "failed": [{"inventory_id": "batch", "error": str(e)}]} finally: conn.close()