- Cleaned 137+ fake engine-displacement models from supplier imports (v3/v4 scripts: Chevrolet, Ford, Chrysler, Dodge, Jeep, Nissan, etc.) - Removed 1,251+ corrupted models (INT. prefixes, year-suffix, torque specs, empty names, trailing-year variants) - Migrated supplier tables to master DB (supplier_catalog, supplier_catalog_compat, supplier_catalog_interchange) - Fixed _get_mye_ids_with_parts() to query supplier_catalog_compat from master DB so supplier-only vehicles appear for all tenants - Added fuzzy model matcher with parenthesis stripping, noise suffix removal, compact matching, prefix/substring fallback, model aliases, and ±3 year proximity - Matched compat rows: KEEP GREEN +14,152, KNADIAN +3,021, VAZLO +127,500, LUK +477, RAYBESTOS +1,743 - Added KNADIAN catalog importer with year-range expansion and future-year filtering - Added VAZLO catalog importer with position parsing and SKU-in-model cleanup - Added Keep Green, LUK, Yokomitsu, Raybestos catalog importers - Cache clearing after cleanups (_classify_cache_*, nexus:mye_ids:*, nexus:brand_mye_counts:*) Final match rates: - KEEP GREEN: 90.3% - VAZLO: 93.6% - YOKOMITSU: 100.0% - KNADIAN: 57.4% - LUK: 51.0% - RAYBESTOS: 55.9%
305 lines
10 KiB
Python
305 lines
10 KiB
Python
"""Celery tasks for Nexus POS background jobs."""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
|
|
# Ensure pos/ is on path for imports when Celery worker runs standalone
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from celery_app import celery
|
|
import psycopg2
|
|
import redis as redis_lib
|
|
|
|
MASTER_DB_URL = os.environ.get('MASTER_DB_URL', 'postgresql://postgres@/nexus_autoparts')
|
|
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
|
|
|
|
|
|
def _get_db():
|
|
return psycopg2.connect(MASTER_DB_URL)
|
|
|
|
|
|
def _get_redis():
|
|
return redis_lib.from_url(REDIS_URL, decode_responses=True)
|
|
|
|
|
|
@celery.task(bind=True, max_retries=3)
|
|
def warm_vehicle_cache_task(self, batch_size=5000, ttl=3600):
|
|
"""Warm Redis cache for vehicle info from part_vehicle_preview."""
|
|
conn = _get_db()
|
|
cur = conn.cursor()
|
|
r = _get_redis()
|
|
r.ping()
|
|
|
|
cur.execute("SELECT id_part FROM parts WHERE oem_part_number IS NOT NULL ORDER BY id_part")
|
|
all_ids = [row[0] for row in cur.fetchall()]
|
|
total = len(all_ids)
|
|
|
|
processed = 0
|
|
cached = 0
|
|
start = time.time()
|
|
|
|
for i in range(0, total, batch_size):
|
|
batch = all_ids[i:i + batch_size]
|
|
cur.execute("""
|
|
SELECT part_id, name_brand, name_model, year_car
|
|
FROM part_vehicle_preview
|
|
WHERE part_id = ANY(%s)
|
|
""", (batch,))
|
|
|
|
pipe = r.pipeline()
|
|
batch_cached = 0
|
|
for row in cur.fetchall():
|
|
info = f"{row[1]} {row[2]} {row[3]}"
|
|
pipe.setex(f'nexus:vehicle:{row[0]}', ttl, info)
|
|
batch_cached += 1
|
|
pipe.execute()
|
|
|
|
processed += len(batch)
|
|
cached += batch_cached
|
|
self.update_state(
|
|
state='PROGRESS',
|
|
meta={'current': processed, 'total': total, 'cached': cached}
|
|
)
|
|
|
|
cur.close()
|
|
conn.close()
|
|
elapsed = time.time() - start
|
|
return {'total': total, 'cached': cached, 'elapsed': int(elapsed)}
|
|
|
|
|
|
@celery.task(bind=True, max_retries=2)
|
|
def bulk_import_inventory_task(self, csv_path, tenant_id, branch_id=None):
|
|
"""Bulk import inventory from CSV in background."""
|
|
from services.inventory_engine import bulk_import_csv
|
|
conn = _get_db()
|
|
try:
|
|
result = bulk_import_csv(conn, csv_path, tenant_id, branch_id)
|
|
conn.commit()
|
|
return result
|
|
except Exception as exc:
|
|
conn.rollback()
|
|
raise self.retry(exc=exc)
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
@celery.task(bind=True, max_retries=1)
|
|
def generate_report_task(self, report_type, params, tenant_id):
|
|
"""Generate heavy reports asynchronously."""
|
|
# Placeholder: implement actual report generation per type
|
|
self.update_state(state='PROGRESS', meta={'step': 'collecting_data'})
|
|
time.sleep(2) # Simulate work
|
|
return {
|
|
'report_type': report_type,
|
|
'tenant_id': tenant_id,
|
|
'status': 'completed',
|
|
'url': f'/pos/static/reports/{report_type}_{tenant_id}.pdf',
|
|
}
|
|
|
|
|
|
# ─── MercadoLibre Tasks ───────────────────────────────────────────────────
|
|
|
|
@celery.task(bind=True, max_retries=3)
|
|
def sync_meli_stock_price_task(self, tenant_id: int):
|
|
"""Sync stock and price for all active ML listings."""
|
|
from services import marketplace_external_service as meli_svc
|
|
from tenant_db import get_tenant_conn
|
|
|
|
conn = get_tenant_conn(tenant_id)
|
|
try:
|
|
cfg = meli_svc.get_meli_config(conn)
|
|
svc = meli_svc._get_meli_service(cfg)
|
|
if not svc:
|
|
return {'error': 'MercadoLibre not configured'}
|
|
|
|
cur = conn.cursor()
|
|
cur.execute(
|
|
"""
|
|
SELECT id, inventory_id, external_item_id, publish_price
|
|
FROM marketplace_listings
|
|
WHERE is_active = true AND channel = 'mercadolibre'
|
|
"""
|
|
)
|
|
listings = cur.fetchall()
|
|
cur.close()
|
|
|
|
from services.inventory_engine import get_stock_bulk
|
|
stock_map = get_stock_bulk(conn, branch_id=None)
|
|
|
|
updated = 0
|
|
failed = 0
|
|
for row in listings:
|
|
listing_id, inv_id, ext_id, last_price = row
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT price_1 FROM inventory WHERE id = %s", (inv_id,))
|
|
price_row = cur.fetchone()
|
|
cur.close()
|
|
current_price = float(price_row[0]) if price_row and price_row[0] else 0
|
|
current_stock = stock_map.get(inv_id, 0)
|
|
|
|
try:
|
|
svc.update_item(
|
|
ext_id,
|
|
{"price": round(current_price, 2), "available_quantity": max(current_stock, 0)},
|
|
)
|
|
cur = conn.cursor()
|
|
cur.execute(
|
|
"""
|
|
UPDATE marketplace_listings
|
|
SET last_sync_at = NOW(), sync_errors = NULL, publish_price = %s
|
|
WHERE id = %s
|
|
""",
|
|
(current_price, listing_id),
|
|
)
|
|
conn.commit()
|
|
cur.close()
|
|
updated += 1
|
|
except Exception as e:
|
|
conn.rollback()
|
|
cur = conn.cursor()
|
|
cur.execute(
|
|
"UPDATE marketplace_listings SET sync_errors = %s WHERE id = %s",
|
|
(str(e)[:500], listing_id),
|
|
)
|
|
conn.commit()
|
|
cur.close()
|
|
failed += 1
|
|
|
|
return {'updated': updated, 'failed': failed, 'total': len(listings)}
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
@celery.task(bind=True, max_retries=3)
|
|
def sync_meli_orders_task(self, tenant_id: int):
|
|
"""Fetch new orders from MercadoLibre."""
|
|
from services import marketplace_external_service as meli_svc
|
|
from tenant_db import get_tenant_conn
|
|
|
|
conn = get_tenant_conn(tenant_id)
|
|
try:
|
|
# Determine last check date from most recent order
|
|
cur = conn.cursor()
|
|
cur.execute(
|
|
"SELECT MAX(created_at) FROM marketplace_orders WHERE channel = 'mercadolibre'"
|
|
)
|
|
row = cur.fetchone()
|
|
last_check = row[0]
|
|
cur.close()
|
|
|
|
date_from = None
|
|
if last_check:
|
|
date_from = last_check.strftime('%Y-%m-%dT%H:%M:%S.000-00:00')
|
|
|
|
result = meli_svc.fetch_and_save_orders(conn, date_from=date_from)
|
|
return result
|
|
except Exception as e:
|
|
return {'error': str(e)}
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
@celery.task(bind=True, max_retries=3)
|
|
def process_meli_webhook_task(self, tenant_id: int, topic: str, resource: str):
|
|
"""Process incoming MercadoLibre webhook asynchronously."""
|
|
from services import marketplace_external_service as meli_svc
|
|
from tenant_db import get_tenant_conn
|
|
|
|
conn = get_tenant_conn(tenant_id)
|
|
try:
|
|
if topic.startswith("orders"):
|
|
# Fetch full order and upsert
|
|
cfg = meli_svc.get_meli_config(conn)
|
|
svc = meli_svc._get_meli_service(cfg)
|
|
if svc and resource:
|
|
order_id = resource.split("/")[-1]
|
|
full = svc.get_order(order_id)
|
|
# Re-use fetch_and_save_orders by passing the order directly
|
|
# For simplicity, trigger a full sync for recent orders
|
|
return meli_svc.fetch_and_save_orders(conn)
|
|
|
|
if topic.startswith("questions"):
|
|
# Fetch single question and upsert locally
|
|
if resource:
|
|
question_id = resource.split("/")[-1]
|
|
try:
|
|
meli_svc.fetch_question_from_ml(conn, question_id)
|
|
return {'ok': True, 'topic': topic, 'question_id': question_id}
|
|
except Exception as qe:
|
|
return {'ok': False, 'topic': topic, 'error': str(qe)}
|
|
|
|
return {'ok': True, 'topic': topic}
|
|
except Exception as e:
|
|
return {'error': str(e)}
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
@celery.task(bind=True, max_retries=2)
|
|
def sync_vehicle_compatibility_task(self, tenant_id, item_id, part_number, name, brand, compat_source):
|
|
"""Fetch AI/TecDoc vehicle compatibility in background after item creation."""
|
|
from tenant_db import get_tenant_conn, get_master_conn
|
|
from services.inventory_vehicle_compat import auto_match_vehicle_compatibility, save_qwen_fitment
|
|
from services.qwen_fitment import get_vehicle_fitment
|
|
|
|
tenant_conn = None
|
|
master_conn = None
|
|
try:
|
|
tenant_conn = get_tenant_conn(tenant_id)
|
|
|
|
if compat_source in ('tecdoc', 'both'):
|
|
try:
|
|
master_conn = get_master_conn()
|
|
auto_match_vehicle_compatibility(
|
|
master_conn, tenant_conn, item_id, part_number,
|
|
brand=brand, name=name
|
|
)
|
|
master_conn.close()
|
|
master_conn = None
|
|
except Exception as am_err:
|
|
print(f"[sync_vehicle_compat] TecDoc error for item {item_id}: {am_err}")
|
|
|
|
if compat_source in ('qwen', 'both'):
|
|
try:
|
|
fitment = get_vehicle_fitment(part_number, name, brand or '')
|
|
save_qwen_fitment(tenant_conn, item_id, fitment)
|
|
except Exception as qwen_err:
|
|
print(f"[sync_vehicle_compat] QWEN error for item {item_id}: {qwen_err}")
|
|
|
|
tenant_conn.commit()
|
|
return {'status': 'ok', 'item_id': item_id, 'tenant_id': tenant_id}
|
|
except Exception as exc:
|
|
if tenant_conn:
|
|
tenant_conn.rollback()
|
|
raise self.retry(exc=exc, countdown=10)
|
|
finally:
|
|
if tenant_conn:
|
|
tenant_conn.close()
|
|
if master_conn:
|
|
master_conn.close()
|
|
|
|
|
|
@celery.task(bind=True)
|
|
def publish_meli_items_task(self, tenant_id: int, inventory_ids: list, category_id: str, listing_type: str = "gold_special", shipping_mode: str = "me2", custom_data: dict = None, base_url: str = None):
|
|
"""Publish inventory items to MercadoLibre asynchronously."""
|
|
from services import marketplace_external_service as meli_svc
|
|
from tenant_db import get_tenant_conn
|
|
|
|
conn = get_tenant_conn(tenant_id)
|
|
try:
|
|
result = meli_svc.publish_items(
|
|
conn,
|
|
inventory_ids=inventory_ids,
|
|
meli_category_id=category_id,
|
|
listing_type_id=listing_type,
|
|
shipping_mode=shipping_mode,
|
|
custom_data=custom_data or {},
|
|
base_url=base_url,
|
|
)
|
|
return result
|
|
except Exception as e:
|
|
return {"success": [], "failed": [{"inventory_id": "batch", "error": str(e)}]}
|
|
finally:
|
|
conn.close()
|