"""Business logic for MercadoLibre external marketplace integration. Depends on: - meli_service.py (HTTP client) - pos_engine.py (sale creation) - inventory_engine.py (stock queries) - image_service.py (image URLs) """ import json import logging import urllib.parse from typing import Optional from decimal import Decimal from services.meli_service import MeliService, MeliError, MeliAuthError from services.pos_engine import process_sale, calculate_totals from services.inventory_engine import get_stock, get_stock_bulk logger = logging.getLogger(__name__) def _resolve_image_urls(images: list[str], base_url: str | None = None) -> list[str]: """Convert relative image paths to absolute URLs.""" resolved = [] for url in images: if not url: continue if base_url and url.startswith("/"): resolved.append(urllib.parse.urljoin(base_url.rstrip("/") + "/", url.lstrip("/"))) else: resolved.append(url) return resolved # ═══════════════════════════════════════════════════════════════════════════ # CONFIG HELPERS # ═══════════════════════════════════════════════════════════════════════════ MELI_CONFIG_KEYS = [ "meli_access_token", "meli_refresh_token", "meli_user_id", "meli_site_id", "meli_enabled", "meli_auto_publish", "meli_sync_interval_min", "meli_order_sync_interval_min", "meli_default_category_id", "meli_shipping_mode", "meli_client_id", "meli_client_secret", ] def _get_config_value(cur, key: str, default=None): cur.execute("SELECT value FROM tenant_config WHERE key = %s", (key,)) row = cur.fetchone() return row[0] if row else default def get_meli_config(tenant_conn) -> dict: """Read ML config from tenant_config.""" cur = tenant_conn.cursor() cfg = {} for key in MELI_CONFIG_KEYS: cfg[key] = _get_config_value(cur, key) cur.close() # Normalize booleans cfg["meli_enabled"] = (cfg.get("meli_enabled") or "").lower() == "true" cfg["meli_auto_publish"] = (cfg.get("meli_auto_publish") or "").lower() == "true" cfg["meli_sync_interval_min"] = int(cfg.get("meli_sync_interval_min") or 15) cfg["meli_order_sync_interval_min"] = int(cfg.get("meli_order_sync_interval_min") or 5) return cfg def save_meli_config(tenant_conn, updates: dict) -> None: """Upsert ML config keys into tenant_config.""" cur = tenant_conn.cursor() for key, value in updates.items(): if value is None: continue cur.execute( """ INSERT INTO tenant_config (key, value, updated_at) VALUES (%s, %s, NOW()) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW() """, (key, str(value)), ) tenant_conn.commit() cur.close() def delete_meli_config(tenant_conn) -> None: """Remove all ML config keys.""" cur = tenant_conn.cursor() cur.execute( "DELETE FROM tenant_config WHERE key LIKE 'meli_%'" ) tenant_conn.commit() cur.close() def _get_meli_service(cfg: dict) -> Optional[MeliService]: """Build MeliService from config dict.""" token = cfg.get("meli_access_token") if not token: return None return MeliService( access_token=token, refresh_token=cfg.get("meli_refresh_token"), client_id=cfg.get("meli_client_id"), client_secret=cfg.get("meli_client_secret"), ) # ═══════════════════════════════════════════════════════════════════════════ # ITEM PAYLOAD BUILDER # ═══════════════════════════════════════════════════════════════════════════ def _extract_meli_error(err: MeliError) -> str: """Extract the most useful error message from a MeliError response.""" base = str(err) body = err.response_body if not body: return base try: data = json.loads(body) msg = data.get("message") or data.get("error") causes = data.get("cause", []) if causes and isinstance(causes, list): cause_msgs = [c.get("message") for c in causes if c.get("message")] if cause_msgs: msg = (msg + " | " if msg else "") + "; ".join(cause_msgs) if msg: # Translate common account-configuration errors to actionable messages lowered = msg.lower() if "me2 adoption is mandatory" in lowered: return msg + " | Debes activar MercadoEnvíos (ME2) en tu cuenta de MercadoLibre. Ve a Configuración > Envíos en el panel de vendedor de ML." if "user has not mode" in lowered: return msg + " | Tu cuenta no tiene configurado este modo de envío. Ve a Configuración > Envíos en MercadoLibre y completa la configuración de MercadoEnvíos." if "mandatory free shipping" in lowered: return msg + " | MercadoLibre está forzando envío gratis en tu cuenta. Esto es normal si estás en el programa de envíos gratuito. El artículo se publicará con envío gratis." return msg except Exception: pass return base def build_item_payload( inventory_row: dict, images: list[str], meli_category_id: str, price: float, stock: int, shipping_mode: str = "me2", listing_type_id: str = "gold_special", custom_title: str = None, extra_attributes: list = None, shipping_cost: float = None, base_url: str = None, ) -> dict: """Convert a Nexus inventory row into a MercadoLibre item payload.""" title = custom_title or f"{inventory_row['name']} {inventory_row['brand'] or ''} {inventory_row['part_number'] or ''}".strip() # ML title limit is 60 chars; truncate smartly if len(title) > 60: title = title[:57] + "..." shipping_payload = {"mode": shipping_mode} if shipping_mode == "custom" and shipping_cost is not None: shipping_payload["local_pick_up"] = False shipping_payload["free_shipping"] = False shipping_payload["methods"] = [] shipping_payload["costs"] = [{"description": "Envio", "cost": str(shipping_cost)}] payload = { "title": title, "category_id": meli_category_id, "price": round(float(price), 2), "currency_id": "MXN", "available_quantity": max(int(stock), 0), "buying_mode": "buy_it_now", "listing_type_id": listing_type_id, "condition": "new", "pictures": [{"source": url} for url in _resolve_image_urls(images, base_url) if url], "shipping": shipping_payload, "attributes": [], } # Collect extra attribute IDs to avoid duplicates extra_attr_ids = {a.get("id") for a in (extra_attributes or []) if a.get("id")} if inventory_row.get("brand") and "BRAND" not in extra_attr_ids: payload["attributes"].append( {"id": "BRAND", "value_name": inventory_row["brand"]} ) if inventory_row.get("part_number") and "PART_NUMBER" not in extra_attr_ids: payload["attributes"].append( {"id": "PART_NUMBER", "value_name": inventory_row["part_number"]} ) # Vehicle compatibility as attributes (if available) vehicle_compat = inventory_row.get("vehicle_compatibility") if vehicle_compat: if isinstance(vehicle_compat, str): try: vehicle_compat = json.loads(vehicle_compat) except Exception: vehicle_compat = None if isinstance(vehicle_compat, list) and vehicle_compat: first = vehicle_compat[0] if isinstance(first, dict): if first.get("brand") and "VEHICLE_MODEL" not in extra_attr_ids: payload["attributes"].append( {"id": "VEHICLE_MODEL", "value_name": first["brand"]} ) if first.get("model") and "VEHICLE_MODEL_NAME" not in extra_attr_ids: payload["attributes"].append( {"id": "VEHICLE_MODEL_NAME", "value_name": first["model"]} ) # Extra attributes from user input (category requirements) if extra_attributes: for attr in extra_attributes: if attr.get("id") and attr.get("value_name"): payload["attributes"].append(attr) return payload def _upload_images_to_meli(svc: MeliService, image_urls: list[str]) -> list[str]: """Upload images to MercadoLibre and return their secure URLs. If an image is already a ML URL, it's passed through unchanged. """ results = [] for url in image_urls: if not url: continue # Already a ML hosted image? if "mercadolibre" in url or "mercadoli.com" in url: results.append(url) continue try: pic = svc.upload_image(url) secure_url = pic.get("secure_url") or pic.get("url") or url results.append(secure_url) except Exception as e: logger.warning("Failed to upload image %s to ML: %s", url, e) # Fallback: try using the original URL anyway results.append(url) return results def _calculate_meli_net_price(svc: MeliService, site_id: str, price: float, listing_type_id: str, category_id: str) -> dict: """Calculate net amount after ML commissions. Returns dict with fee_amount, net_amount, fee_pct. Falls back to approximate percentages if the API call fails. """ try: fee_info = svc.get_listing_price(site_id, price, listing_type_id, category_id) # Response structure varies; try common keys sale_fee = fee_info.get("sale_fee_amount") or fee_info.get("fee_amount") or 0 net_amount = fee_info.get("net_amount") or fee_info.get("net_receive_amount") if net_amount is None: net_amount = price - sale_fee return { "fee_amount": float(sale_fee), "net_amount": float(net_amount), "fee_pct": round((sale_fee / price) * 100, 2) if price > 0 else 0, "source": "ml_api", } except Exception as e: logger.warning("Failed to fetch ML listing price for fee calc: %s", e) # Fallback approximations for MLM approx_fees = { "gold_special": 0.13, "gold_pro": 0.16, "gold_premium": 0.18, "free": 0.0, } fee_rate = approx_fees.get(listing_type_id, 0.13) fee_amount = round(price * fee_rate, 2) return { "fee_amount": fee_amount, "net_amount": round(price - fee_amount, 2), "fee_pct": round(fee_rate * 100, 2), "source": "approximation", } def check_meli_shipping_config(svc: MeliService, cfg: dict) -> dict: """Check if the user's ML account has the required shipping modes configured. Returns {"ok": True} or {"ok": False, "error": "...", "available_modes": [...]}. """ user_id = cfg.get("meli_user_id") if not user_id: return {"ok": False, "error": "Usuario de ML no configurado"} try: prefs = svc.get_shipping_preferences(str(user_id)) modes = prefs.get("modes", []) mandatory = prefs.get("mandatory_mode_for_user", []) if mandatory and not any(m in modes for m in mandatory): return { "ok": False, "error": f"Tu cuenta requiere obligatoriamente los modos de envío: {', '.join(mandatory)}. Actualmente solo tienes: {', '.join(modes)}. Configúralos en el panel de vendedor de MercadoLibre.", "available_modes": modes, "mandatory_modes": mandatory, } return {"ok": True, "available_modes": modes, "mandatory_modes": mandatory} except MeliError as e: logger.warning("Failed to fetch shipping preferences: %s", e) return {"ok": True} # Don't block on preference fetch failure except Exception as e: logger.warning("Failed to fetch shipping preferences: %s", e) return {"ok": True} # ═══════════════════════════════════════════════════════════════════════════ # LISTINGS CRUD # ═══════════════════════════════════════════════════════════════════════════ def check_inventory_ml_status(tenant_conn, inventory_ids: list[int], base_url: str = None) -> dict: """Check local pre-flight status for ML publishing. Returns per-item dict with checks: has_image, has_stock, has_price, already_published, and the generated title. """ cur = tenant_conn.cursor() cur.execute( """ SELECT id, part_number, name, brand, price_1, image_url FROM inventory WHERE id = ANY(%s) """, (inventory_ids,), ) rows = {r[0]: r for r in cur.fetchall()} stock_map = get_stock_bulk(tenant_conn, branch_id=None) # Check existing active listings cur.execute( """ SELECT inventory_id, external_item_id, external_status, external_permalink FROM marketplace_listings WHERE inventory_id = ANY(%s) AND channel = 'mercadolibre' AND is_active = true """, (inventory_ids,), ) listings = {r[0]: {"external_item_id": r[1], "status": r[2], "permalink": r[3]} for r in cur.fetchall()} cur.close() results = [] for inv_id in inventory_ids: row = rows.get(inv_id) if not row: results.append({"inventory_id": inv_id, "exists": False}) continue price = float(row[4]) if row[4] else 0 stock = stock_map.get(inv_id, 0) image_url = row[5] title = f"{row[2]} {row[3] or ''} {row[1] or ''}".strip() if len(title) > 60: title = title[:57] + "..." existing = listings.get(inv_id) results.append({ "inventory_id": inv_id, "exists": True, "title": title, "has_image": bool(image_url), "has_stock": stock > 0, "has_price": price > 0, "price": price, "stock": stock, "image_url": _resolve_image_urls([image_url], base_url)[0] if image_url else None, "already_published": existing is not None, "existing_listing": existing, }) return {"items": results} def validate_items( tenant_conn, inventory_ids: list[int], meli_category_id: str, listing_type_id: str = "gold_special", shipping_mode: str = "me2", custom_data: dict = None, base_url: str = None, ) -> dict: """Validate items against ML /items/validate without creating them. Returns per-item validation results with ML error details if any. """ cfg = get_meli_config(tenant_conn) svc = _get_meli_service(cfg) if not svc: raise ValueError("MercadoLibre not configured") shipping_check = check_meli_shipping_config(svc, cfg) if not shipping_check["ok"]: return {"valid": [], "invalid": [{"inventory_id": "config", "error": shipping_check["error"]}]} cur = tenant_conn.cursor() cur.execute( """ SELECT id, part_number, name, brand, price_1, vehicle_compatibility, image_url, unit, is_active FROM inventory WHERE id = ANY(%s) AND is_active = true """, (inventory_ids,), ) rows = {r[0]: r for r in cur.fetchall()} stock_map = get_stock_bulk(tenant_conn, branch_id=None) custom_data = custom_data or {} results = {"valid": [], "invalid": []} for inv_id in inventory_ids: row = rows.get(inv_id) if not row: results["invalid"].append({"inventory_id": inv_id, "error": "No encontrado o inactivo"}) continue inv = { "id": row[0], "part_number": row[1], "name": row[2], "brand": row[3], "price_1": float(row[4]) if row[4] else 0, "vehicle_compatibility": row[5], "image_url": row[6], "unit": row[7], } stock = stock_map.get(inv_id, 0) if stock <= 0: results["invalid"].append({"inventory_id": inv_id, "error": "Sin stock disponible"}) continue if inv["price_1"] <= 0: results["invalid"].append({"inventory_id": inv_id, "error": "El precio debe ser mayor a 0"}) continue images = [] if inv.get("image_url"): images.append(inv["image_url"]) images = _resolve_image_urls(images, base_url) if not images: results["invalid"].append({"inventory_id": inv_id, "error": "El producto no tiene imagen"}) continue # Upload images to ML hosting so they are publicly accessible try: ml_images = _upload_images_to_meli(svc, images) except Exception as e: results["invalid"].append({"inventory_id": inv_id, "error": f"Error subiendo imagen a ML: {e}"}) continue if not ml_images: results["invalid"].append({"inventory_id": inv_id, "error": "No se pudo subir la imagen a MercadoLibre"}) continue title = (custom_data.get("titles") or {}).get(str(inv_id)) extra_attrs = (custom_data.get("attributes") or {}).get(str(inv_id)) price = (custom_data.get("prices") or {}).get(str(inv_id), inv["price_1"]) item_stock = (custom_data.get("stocks") or {}).get(str(inv_id), stock) shipping_cost = custom_data.get("shipping_cost") payload = build_item_payload( inv, ml_images, meli_category_id, price, item_stock, shipping_mode=shipping_mode, listing_type_id=listing_type_id, custom_title=title, extra_attributes=extra_attrs, shipping_cost=shipping_cost, base_url=base_url, ) try: validation = svc.validate_item(payload) if validation.get("status") == "valid": # Calculate estimated net earnings after ML fees site_id = cfg.get("meli_site_id", "MLM") fee_calc = _calculate_meli_net_price(svc, site_id, float(price), listing_type_id, meli_category_id) results["valid"].append({ "inventory_id": inv_id, "validation": validation, "price": float(price), "fee_amount": fee_calc["fee_amount"], "net_amount": fee_calc["net_amount"], "fee_pct": fee_calc["fee_pct"], "fee_source": fee_calc["source"], }) else: errors = validation.get("validation_errors", []) error_msgs = [f"{e.get('field', '')}: {e.get('message', '')}" for e in errors] results["invalid"].append({"inventory_id": inv_id, "error": " | ".join(error_msgs) or "Validación fallida", "validation": validation}) except MeliError as e: err_msg = _extract_meli_error(e) results["invalid"].append({"inventory_id": inv_id, "error": err_msg}) except Exception as e: results["invalid"].append({"inventory_id": inv_id, "error": str(e)}) cur.close() return results def publish_items( tenant_conn, inventory_ids: list[int], meli_category_id: str, listing_type_id: str = "gold_special", shipping_mode: str = "me2", custom_data: dict = None, base_url: str = None, ) -> dict: """Publish one or more inventory items to MercadoLibre. Returns summary dict with success/failure per item. """ cfg = get_meli_config(tenant_conn) svc = _get_meli_service(cfg) if not svc: raise ValueError("MercadoLibre not configured") shipping_check = check_meli_shipping_config(svc, cfg) if not shipping_check["ok"]: return {"success": [], "failed": [{"inventory_id": "config", "error": shipping_check["error"]}]} cur = tenant_conn.cursor() # Batch fetch inventory rows cur.execute( """ SELECT id, part_number, name, brand, price_1, vehicle_compatibility, image_url, unit, is_active FROM inventory WHERE id = ANY(%s) AND is_active = true """, (inventory_ids,), ) rows = {r[0]: r for r in cur.fetchall()} # Batch fetch stock stock_map = get_stock_bulk(tenant_conn, branch_id=None) custom_data = custom_data or {} results = {"success": [], "failed": []} for inv_id in inventory_ids: row = rows.get(inv_id) if not row: results["failed"].append({"inventory_id": inv_id, "error": "Not found or inactive"}) continue inv = { "id": row[0], "part_number": row[1], "name": row[2], "brand": row[3], "price_1": float(row[4]) if row[4] else 0, "vehicle_compatibility": row[5], "image_url": row[6], "unit": row[7], } stock = stock_map.get(inv_id, 0) if stock <= 0: results["failed"].append({"inventory_id": inv_id, "error": "Sin stock disponible"}) continue if inv["price_1"] <= 0: results["failed"].append({"inventory_id": inv_id, "error": "El precio debe ser mayor a 0"}) continue # Build image list images = [] if inv.get("image_url"): images.append(inv["image_url"]) images = _resolve_image_urls(images, base_url) if not images: results["failed"].append({"inventory_id": inv_id, "error": "El producto no tiene imagen. ML requiere imagen para publicar."}) continue # Upload images to ML hosting so they are publicly accessible try: ml_images = _upload_images_to_meli(svc, images) except Exception as e: results["failed"].append({"inventory_id": inv_id, "error": f"Error subiendo imagen a ML: {e}"}) continue if not ml_images: results["failed"].append({"inventory_id": inv_id, "error": "No se pudo subir la imagen a MercadoLibre"}) continue title = (custom_data.get("titles") or {}).get(str(inv_id)) extra_attrs = (custom_data.get("attributes") or {}).get(str(inv_id)) price = (custom_data.get("prices") or {}).get(str(inv_id), inv["price_1"]) item_stock = (custom_data.get("stocks") or {}).get(str(inv_id), stock) shipping_cost = custom_data.get("shipping_cost") payload = build_item_payload( inv, ml_images, meli_category_id, price, item_stock, shipping_mode=shipping_mode, listing_type_id=listing_type_id, custom_title=title, extra_attributes=extra_attrs, shipping_cost=shipping_cost, base_url=base_url, ) try: ml_item = svc.create_item(payload) external_item_id = ml_item.get("id") permalink = ml_item.get("permalink") # Persist listing cur.execute( """ INSERT INTO marketplace_listings (inventory_id, channel, external_item_id, external_status, external_permalink, title, meli_category_id, publish_price, last_sync_at, sync_errors, is_active) VALUES (%s, 'mercadolibre', %s, 'active', %s, %s, %s, %s, NOW(), NULL, true) ON CONFLICT (inventory_id, channel) WHERE is_active = true DO UPDATE SET external_item_id = EXCLUDED.external_item_id, external_status = EXCLUDED.external_status, external_permalink = EXCLUDED.external_permalink, title = EXCLUDED.title, meli_category_id = EXCLUDED.meli_category_id, publish_price = EXCLUDED.publish_price, last_sync_at = NOW(), sync_errors = NULL, is_active = true """, ( inv_id, external_item_id, permalink, payload["title"], meli_category_id, inv["price_1"], ), ) tenant_conn.commit() results["success"].append( {"inventory_id": inv_id, "external_item_id": external_item_id, "permalink": permalink} ) except MeliError as e: tenant_conn.rollback() err_msg = _extract_meli_error(e) logger.warning("ML publish failed for inventory_id=%s: %s | payload=%s | response=%s", inv_id, err_msg, json.dumps(payload), e.response_body) results["failed"].append({"inventory_id": inv_id, "error": err_msg}) except Exception as e: tenant_conn.rollback() logger.exception("Unexpected error publishing inventory_id=%s", inv_id) results["failed"].append({"inventory_id": inv_id, "error": str(e)}) cur.close() return results def get_listings(tenant_conn, page: int = 1, per_page: int = 50, status: str = None): cur = tenant_conn.cursor() where = ["1=1"] params = [] if status: where.append("external_status = %s") params.append(status) count_sql = f"SELECT COUNT(*) FROM marketplace_listings WHERE {' AND '.join(where)}" cur.execute(count_sql, params) total = cur.fetchone()[0] sql = f""" SELECT l.id, l.inventory_id, l.external_item_id, l.external_status, l.external_permalink, l.title, l.meli_category_id, l.publish_price, l.last_sync_at, l.sync_errors, l.is_active, l.created_at, i.part_number, i.name, i.price_1, i.brand FROM marketplace_listings l LEFT JOIN inventory i ON i.id = l.inventory_id WHERE {' AND '.join(where)} ORDER BY l.created_at DESC LIMIT %s OFFSET %s """ cur.execute(sql, params + [per_page, (page - 1) * per_page]) rows = cur.fetchall() cur.close() items = [] for r in rows: items.append({ "id": r[0], "inventory_id": r[1], "external_item_id": r[2], "external_status": r[3], "external_permalink": r[4], "title": r[5], "meli_category_id": r[6], "publish_price": float(r[7]) if r[7] else None, "last_sync_at": str(r[8]) if r[8] else None, "sync_errors": r[9], "is_active": r[10], "created_at": str(r[11]), "part_number": r[12], "inventory_name": r[13], "current_price": float(r[14]) if r[14] else None, "brand": r[15], }) return {"items": items, "total": total, "page": page, "per_page": per_page} def sync_listing(tenant_conn, listing_id: int) -> dict: """Force a manual sync of stock/price for a single listing.""" cfg = get_meli_config(tenant_conn) svc = _get_meli_service(cfg) if not svc: raise ValueError("MercadoLibre not configured") cur = tenant_conn.cursor() cur.execute( """ SELECT id, inventory_id, external_item_id, meli_category_id FROM marketplace_listings WHERE id = %s AND is_active = true """, (listing_id,), ) row = cur.fetchone() if not row: cur.close() raise ValueError("Listing not found") listing = { "id": row[0], "inventory_id": row[1], "external_item_id": row[2], "meli_category_id": row[3], } # Get current stock/price cur.execute( "SELECT price_1 FROM inventory WHERE id = %s", (listing["inventory_id"],), ) price_row = cur.fetchone() current_price = float(price_row[0]) if price_row and price_row[0] else 0 stock_map = get_stock_bulk(tenant_conn, branch_id=None) current_stock = stock_map.get(listing["inventory_id"], 0) try: svc.update_item( listing["external_item_id"], {"price": round(current_price, 2), "available_quantity": max(current_stock, 0)}, ) cur.execute( """ UPDATE marketplace_listings SET last_sync_at = NOW(), sync_errors = NULL, publish_price = %s WHERE id = %s """, (current_price, listing_id), ) tenant_conn.commit() cur.close() return {"ok": True, "price": current_price, "stock": current_stock} except MeliError as e: tenant_conn.rollback() cur.execute( "UPDATE marketplace_listings SET sync_errors = %s WHERE id = %s", (str(e)[:500], listing_id), ) tenant_conn.commit() cur.close() raise def pause_listing(tenant_conn, listing_id: int) -> dict: cfg = get_meli_config(tenant_conn) svc = _get_meli_service(cfg) if not svc: raise ValueError("MercadoLibre not configured") cur = tenant_conn.cursor() cur.execute( "SELECT external_item_id FROM marketplace_listings WHERE id = %s", (listing_id,), ) row = cur.fetchone() if not row: cur.close() raise ValueError("Listing not found") svc.pause_item(row[0]) cur.execute( "UPDATE marketplace_listings SET external_status = 'paused' WHERE id = %s", (listing_id,), ) tenant_conn.commit() cur.close() return {"ok": True, "status": "paused"} def activate_listing(tenant_conn, listing_id: int) -> dict: cfg = get_meli_config(tenant_conn) svc = _get_meli_service(cfg) if not svc: raise ValueError("MercadoLibre not configured") cur = tenant_conn.cursor() cur.execute( "SELECT external_item_id FROM marketplace_listings WHERE id = %s", (listing_id,), ) row = cur.fetchone() if not row: cur.close() raise ValueError("Listing not found") svc.activate_item(row[0]) cur.execute( "UPDATE marketplace_listings SET external_status = 'active' WHERE id = %s", (listing_id,), ) tenant_conn.commit() cur.close() return {"ok": True, "status": "active"} def close_listing(tenant_conn, listing_id: int) -> dict: cfg = get_meli_config(tenant_conn) svc = _get_meli_service(cfg) if not svc: raise ValueError("MercadoLibre not configured") cur = tenant_conn.cursor() cur.execute( "SELECT external_item_id FROM marketplace_listings WHERE id = %s", (listing_id,), ) row = cur.fetchone() if not row: cur.close() raise ValueError("Listing not found") svc.close_item(row[0]) cur.execute( "UPDATE marketplace_listings SET external_status = 'closed', is_active = false WHERE id = %s", (listing_id,), ) tenant_conn.commit() cur.close() return {"ok": True, "status": "closed"} def delete_listing_permanently(tenant_conn, listing_id: int) -> dict: """Hard-delete a closed listing from the local DB. Sets listing_id = NULL on marketplace_order_items to avoid FK errors, then deletes the marketplace_listings row. """ cur = tenant_conn.cursor() cur.execute( "SELECT id, external_status FROM marketplace_listings WHERE id = %s", (listing_id,), ) row = cur.fetchone() if not row: cur.close() raise ValueError("Listing not found") # Clear FK references so we can delete safely cur.execute( "UPDATE marketplace_order_items SET listing_id = NULL WHERE listing_id = %s", (listing_id,), ) cur.execute( "DELETE FROM marketplace_listings WHERE id = %s", (listing_id,), ) tenant_conn.commit() cur.close() return {"ok": True, "deleted": True} # ═══════════════════════════════════════════════════════════════════════════ # QUESTIONS & ANSWERS # ═══════════════════════════════════════════════════════════════════════════ def _upsert_question(cur, q_data: dict, listing_id_map: dict): """Upsert a single question from ML API into marketplace_questions.""" external_qid = str(q_data.get("id")) external_item_id = str(q_data.get("item_id")) text = q_data.get("text", "") status = q_data.get("status", "unanswered") answer = q_data.get("answer", {}) answer_text = answer.get("text") if answer else None answer_date = None if answer and answer.get("date_created"): answer_date = answer["date_created"] from_user = q_data.get("from", {}) buyer_id = str(from_user.get("id")) if from_user else None buyer_nickname = from_user.get("nickname") if from_user else None question_date = q_data.get("date_created") listing_id = listing_id_map.get(external_item_id) cur.execute( """ INSERT INTO marketplace_questions (listing_id, external_question_id, external_item_id, question_text, answer_text, status, buyer_id, buyer_nickname, question_date, answer_date, raw_json, updated_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()) ON CONFLICT (external_question_id) DO UPDATE SET question_text = EXCLUDED.question_text, answer_text = EXCLUDED.answer_text, status = EXCLUDED.status, buyer_id = EXCLUDED.buyer_id, buyer_nickname = EXCLUDED.buyer_nickname, question_date = EXCLUDED.question_date, answer_date = EXCLUDED.answer_date, raw_json = EXCLUDED.raw_json, updated_at = NOW() """, ( listing_id, external_qid, external_item_id, text, answer_text, status, buyer_id, buyer_nickname, question_date, answer_date, json.dumps(q_data), ), ) def sync_questions(tenant_conn) -> dict: """Fetch questions from ML for all active listings and upsert locally. Returns {"synced": N, "items": [...]}. """ cfg = get_meli_config(tenant_conn) svc = _get_meli_service(cfg) if not svc: raise ValueError("MercadoLibre not configured") cur = tenant_conn.cursor() cur.execute( """ SELECT id, external_item_id FROM marketplace_listings WHERE channel = 'mercadolibre' AND is_active = true """ ) listings = {r[1]: r[0] for r in cur.fetchall()} cur.close() if not listings: return {"synced": 0, "items": []} total_synced = 0 for item_id in listings: try: resp = svc.get_questions(item_id, limit=50) questions = resp.get("questions", []) if not questions: continue cur = tenant_conn.cursor() for q in questions: _upsert_question(cur, q, listings) tenant_conn.commit() cur.close() total_synced += len(questions) except Exception as e: logger.warning("Failed to sync questions for item %s: %s", item_id, e) return {"synced": total_synced} def fetch_question_from_ml(tenant_conn, external_question_id: str) -> dict: """Fetch a single question from ML API and upsert locally.""" cfg = get_meli_config(tenant_conn) svc = _get_meli_service(cfg) if not svc: raise ValueError("MercadoLibre not configured") q_data = svc.get_question(external_question_id) cur = tenant_conn.cursor() cur.execute( "SELECT id, external_item_id FROM marketplace_listings WHERE external_item_id = %s", (str(q_data.get("item_id")),), ) row = cur.fetchone() listing_id_map = {row[1]: row[0]} if row else {} _upsert_question(cur, q_data, listing_id_map) tenant_conn.commit() cur.close() return q_data def answer_question(tenant_conn, local_question_id: int, text: str) -> dict: """Answer a question via ML API and update local status.""" cfg = get_meli_config(tenant_conn) svc = _get_meli_service(cfg) if not svc: raise ValueError("MercadoLibre not configured") cur = tenant_conn.cursor() cur.execute( "SELECT external_question_id FROM marketplace_questions WHERE id = %s", (local_question_id,), ) row = cur.fetchone() if not row: cur.close() raise ValueError("Question not found") external_qid = row[0] cur.close() resp = svc.answer_question(external_qid, text) cur = tenant_conn.cursor() cur.execute( """ UPDATE marketplace_questions SET answer_text = %s, status = 'answered', answer_date = NOW(), updated_at = NOW() WHERE id = %s """, (text, local_question_id), ) tenant_conn.commit() cur.close() return {"ok": True, "ml_response": resp} def list_local_questions(tenant_conn, status: str = None) -> list: """Return questions from local DB, optionally filtered by status.""" cur = tenant_conn.cursor() if status: cur.execute( """ SELECT q.id, q.external_question_id, q.external_item_id, q.question_text, q.answer_text, q.status, q.buyer_nickname, q.question_date, q.answer_date, q.created_at, l.title, l.external_permalink FROM marketplace_questions q LEFT JOIN marketplace_listings l ON l.id = q.listing_id WHERE q.status = %s ORDER BY q.question_date DESC """, (status,), ) else: cur.execute( """ SELECT q.id, q.external_question_id, q.external_item_id, q.question_text, q.answer_text, q.status, q.buyer_nickname, q.question_date, q.answer_date, q.created_at, l.title, l.external_permalink FROM marketplace_questions q LEFT JOIN marketplace_listings l ON l.id = q.listing_id ORDER BY q.question_date DESC """ ) rows = cur.fetchall() cur.close() results = [] for r in rows: results.append({ "id": r[0], "external_question_id": r[1], "external_item_id": r[2], "question_text": r[3], "answer_text": r[4], "status": r[5], "buyer_nickname": r[6], "question_date": r[7], "answer_date": r[8], "created_at": r[9], "listing_title": r[10], "listing_permalink": r[11], }) return results # ═══════════════════════════════════════════════════════════════════════════ # ORDERS # ═══════════════════════════════════════════════════════════════════════════ def fetch_and_save_orders(tenant_conn, date_from: Optional[str] = None) -> dict: """Pull orders from ML and upsert into marketplace_orders.""" cfg = get_meli_config(tenant_conn) svc = _get_meli_service(cfg) user_id = cfg.get("meli_user_id") if not svc or not user_id: raise ValueError("MercadoLibre not configured") ml_resp = svc.get_orders(user_id, status="paid", date_from=date_from) orders = ml_resp.get("results", []) cur = tenant_conn.cursor() created = 0 updated = 0 for order_summary in orders: order_id = order_summary.get("id") if not order_id: continue # Fetch full order detail try: full = svc.get_order(str(order_id)) except MeliError: continue external_status = full.get("status") buyer = full.get("buyer", {}) shipping = full.get("shipping", {}) order_items = full.get("order_items", []) # Build shipping address JSON shipping_address = None if shipping: shipping_address = { "id": shipping.get("id"), "status": shipping.get("status"), "tracking_number": shipping.get("tracking_number"), "shipping_method": shipping.get("shipping_option", {}).get("name"), } total_amount = full.get("total_amount") shipping_cost = shipping.get("cost") if shipping else None # Upsert order cur.execute( """ INSERT INTO marketplace_orders (channel, external_order_id, external_status, buyer_name, buyer_email, buyer_phone, buyer_nickname, shipping_address, total_amount, shipping_cost, meli_shipping_id, raw_json, updated_at) VALUES ('mercadolibre', %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()) ON CONFLICT (external_order_id) DO UPDATE SET external_status = EXCLUDED.external_status, buyer_name = EXCLUDED.buyer_name, buyer_email = EXCLUDED.buyer_email, buyer_phone = EXCLUDED.buyer_phone, shipping_address = EXCLUDED.shipping_address, total_amount = EXCLUDED.total_amount, shipping_cost = EXCLUDED.shipping_cost, meli_shipping_id = EXCLUDED.meli_shipping_id, raw_json = EXCLUDED.raw_json, updated_at = NOW() RETURNING id """, ( str(order_id), external_status, buyer.get("first_name", "") + " " + buyer.get("last_name", ""), buyer.get("email"), buyer.get("phone", {}).get("number"), buyer.get("nickname"), json.dumps(shipping_address) if shipping_address else None, total_amount, shipping_cost, str(shipping.get("id")) if shipping else None, json.dumps(full), ), ) row = cur.fetchone() mpo_id = row[0] if row else None if mpo_id: # Check if this was insert or update cur.execute( "SELECT created_at FROM marketplace_orders WHERE id = %s", (mpo_id,), ) created_at = cur.fetchone()[0] # Simple heuristic: if updated_at == created_at (within 1s), it's new is_new = True # ON CONFLICT always returns id; we count as updated for simplicity updated += 1 # Upsert order items if mpo_id and order_items: # Clear old items and re-insert cur.execute( "DELETE FROM marketplace_order_items WHERE marketplace_order_id = %s", (mpo_id,), ) for it in order_items: item_data = it.get("item", {}) cur.execute( """ INSERT INTO marketplace_order_items (marketplace_order_id, external_item_id, title, quantity, unit_price, total_price) VALUES (%s, %s, %s, %s, %s, %s) """, ( mpo_id, item_data.get("id"), item_data.get("title"), it.get("quantity"), it.get("unit_price"), it.get("full_unit_price"), ), ) tenant_conn.commit() cur.close() return {"processed": len(orders), "updated": updated} def get_orders(tenant_conn, page=1, per_page=50, status=None): cur = tenant_conn.cursor() where = ["1=1"] params = [] if status: where.append("status = %s") params.append(status) cur.execute( f"SELECT COUNT(*) FROM marketplace_orders WHERE {' AND '.join(where)}", params, ) total = cur.fetchone()[0] cur.execute( f""" SELECT o.id, o.external_order_id, o.external_status, o.buyer_name, o.buyer_nickname, o.total_amount, o.status, o.created_at, o.nexus_sale_id FROM marketplace_orders o WHERE {' AND '.join(where)} ORDER BY o.created_at DESC LIMIT %s OFFSET %s """, params + [per_page, (page - 1) * per_page], ) rows = cur.fetchall() cur.close() items = [] for r in rows: items.append({ "id": r[0], "external_order_id": r[1], "external_status": r[2], "buyer_name": r[3], "buyer_nickname": r[4], "total_amount": float(r[5]) if r[5] else None, "status": r[6], "created_at": str(r[7]), "nexus_sale_id": r[8], }) return {"items": items, "total": total, "page": page, "per_page": per_page} def get_order_detail(tenant_conn, order_id: int) -> dict: cur = tenant_conn.cursor() cur.execute( """ SELECT id, external_order_id, external_status, buyer_name, buyer_email, buyer_phone, buyer_nickname, shipping_address, total_amount, shipping_cost, meli_shipping_id, nexus_sale_id, status, notes, raw_json, created_at, updated_at FROM marketplace_orders WHERE id = %s """, (order_id,), ) row = cur.fetchone() if not row: cur.close() raise ValueError("Order not found") order = { "id": row[0], "external_order_id": row[1], "external_status": row[2], "buyer_name": row[3], "buyer_email": row[4], "buyer_phone": row[5], "buyer_nickname": row[6], "shipping_address": row[7], "total_amount": float(row[8]) if row[8] else None, "shipping_cost": float(row[9]) if row[9] else None, "meli_shipping_id": row[10], "nexus_sale_id": row[11], "status": row[12], "notes": row[13], "raw_json": row[14], "created_at": str(row[15]), "updated_at": str(row[16]), } cur.execute( """ SELECT id, inventory_id, external_item_id, title, quantity, unit_price, total_price FROM marketplace_order_items WHERE marketplace_order_id = %s """, (order_id,), ) items = [] for r in cur.fetchall(): items.append({ "id": r[0], "inventory_id": r[1], "external_item_id": r[2], "title": r[3], "quantity": r[4], "unit_price": float(r[5]) if r[5] else None, "total_price": float(r[6]) if r[6] else None, }) order["items"] = items cur.close() return order def update_order_status(tenant_conn, order_id: int, new_status: str) -> dict: valid = {"pending", "confirmed", "packed", "shipped", "delivered", "cancelled", "rejected"} if new_status not in valid: raise ValueError(f"Invalid status. Allowed: {valid}") cur = tenant_conn.cursor() cur.execute( "UPDATE marketplace_orders SET status = %s, updated_at = NOW() WHERE id = %s RETURNING external_order_id", (new_status, order_id), ) row = cur.fetchone() tenant_conn.commit() cur.close() if not row: raise ValueError("Order not found") return {"ok": True, "status": new_status, "external_order_id": row[0]} # ═══════════════════════════════════════════════════════════════════════════ # CONVERT ORDER → SALE # ═══════════════════════════════════════════════════════════════════════════ def convert_order_to_sale( tenant_conn, marketplace_order_id: int, employee_id: int = None, register_id: int = None ) -> dict: """Convert a marketplace order into a Nexus sale. 1. Look up marketplace_order + items 2. Map items to inventory_id (via marketplace_listings external_item_id) 3. Build sale_data compatible with process_sale() 4. Call process_sale() 5. Link sale back to marketplace_order """ order = get_order_detail(tenant_conn, marketplace_order_id) if order.get("nexus_sale_id"): raise ValueError("Order already converted to sale") cur = tenant_conn.cursor() # Build sale items sale_items = [] for it in order["items"]: # Map external_item_id -> inventory_id via marketplace_listings cur.execute( "SELECT inventory_id FROM marketplace_listings WHERE external_item_id = %s LIMIT 1", (it["external_item_id"],), ) inv_row = cur.fetchone() inventory_id = inv_row[0] if inv_row else None if not inventory_id: # Try to match by title fuzzy? Skip for now. cur.close() raise ValueError(f"Could not map item {it['external_item_id']} to inventory") sale_items.append({ "inventory_id": inventory_id, "quantity": it["quantity"], "unit_price": float(it["unit_price"] or 0), "discount_pct": 0, "tax_rate": 0.16, }) # Find or create generic "MercadoLibre" customer cur.execute( "SELECT id FROM customers WHERE name = 'MercadoLibre' LIMIT 1" ) cust = cur.fetchone() if cust: customer_id = cust[0] else: cur.execute( """ INSERT INTO customers (name, email, phone, is_active, price_tier) VALUES ('MercadoLibre', 'marketplace@mercadolibre.com', '', true, 1) RETURNING id """ ) customer_id = cur.fetchone()[0] # Build sale_data sale_data = { "items": sale_items, "customer_id": customer_id, "payment_method": "transferencia", "sale_type": "cash", "register_id": register_id, "amount_paid": float(order["total_amount"] or 0), "notes": f"MercadoLibre order #{order['external_order_id']}", } # We need to run process_sale inside the same connection. # process_sale expects the caller to commit. We'll do that. # However, process_sale uses flask.g for employee_id and branch_id. # We need to set those or pass them explicitly. # For now, we'll create the sale manually to avoid flask.g dependency. sale = _create_sale_manual(tenant_conn, sale_data, employee_id=employee_id) # Link order to sale cur.execute( "UPDATE marketplace_orders SET nexus_sale_id = %s, status = 'confirmed', updated_at = NOW() WHERE id = %s", (sale["id"], marketplace_order_id), ) tenant_conn.commit() cur.close() return {"sale_id": sale["id"], "marketplace_order_id": marketplace_order_id} def _create_sale_manual(tenant_conn, sale_data: dict, employee_id: int = None) -> dict: """Create a sale record without relying on flask.g. Simplified version of process_sale for background / external use. """ from services.inventory_engine import record_sale as inventory_record_sale from decimal import Decimal, ROUND_HALF_UP cur = tenant_conn.cursor() items = sale_data.get("items", []) customer_id = sale_data.get("customer_id") payment_method = sale_data.get("payment_method", "efectivo") sale_type = sale_data.get("sale_type", "cash") register_id = sale_data.get("register_id") amount_paid = float(sale_data.get("amount_paid", 0)) notes = sale_data.get("notes") if not items: raise ValueError("No items in sale") # Enrich items inv_ids = [it["inventory_id"] for it in items] cur.execute( """ SELECT id, part_number, name, cost, price_1, tax_rate FROM inventory WHERE id = ANY(%s) AND is_active = true ORDER BY id FOR UPDATE """, (inv_ids,), ) inv_map = {r[0]: r for r in cur.fetchall()} enriched = [] for it in items: inv_id = it["inventory_id"] inv = inv_map.get(inv_id) if not inv: raise ValueError(f"Inventory item {inv_id} not found") qty = int(it.get("quantity", 1)) unit_price = float(it.get("unit_price", inv[4] or 0)) discount_pct = float(it.get("discount_pct", 0)) tax_rate = float(it.get("tax_rate", inv[5] or 0.16)) unit_cost = float(inv[3]) if inv[3] else 0 enriched.append({ "inventory_id": inv_id, "part_number": inv[1], "name": inv[2], "quantity": qty, "unit_price": unit_price, "unit_cost": unit_cost, "discount_pct": discount_pct, "tax_rate": tax_rate, }) totals = calculate_totals(enriched) # Insert sale cur.execute( """ INSERT INTO sales (customer_id, employee_id, register_id, sale_type, payment_method, subtotal, discount_total, tax_total, total, amount_paid, change_given, status, notes, source, external_order_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'completed', %s, 'mercadolibre', %s) RETURNING id """, ( customer_id, employee_id, register_id, sale_type, payment_method, totals["subtotal"], totals["discount_total"], totals["tax_total"], totals["total"], amount_paid, 0, notes, sale_data.get("external_order_id"), ), ) sale_id = cur.fetchone()[0] # Insert sale_items for it in totals["items"]: cur.execute( """ INSERT INTO sale_items (sale_id, inventory_id, part_number, name, quantity, unit_price, unit_cost, discount_pct, discount_amount, tax_rate, tax_amount, subtotal) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( sale_id, it["inventory_id"], it["part_number"], it["name"], it["quantity"], it["unit_price"], it.get("unit_cost", 0), it.get("discount_pct", 0), it.get("discount_amount", 0), it.get("tax_rate", 0.16), it.get("tax_amount", 0), it["subtotal"], ), ) # Deduct inventory for it in enriched: inventory_record_sale( tenant_conn, it["inventory_id"], it["quantity"], reference=f"ML sale {sale_id}" ) tenant_conn.commit() cur.close() return {"id": sale_id, **totals} # ═══════════════════════════════════════════════════════════════════════════ # IMPORT EXISTING LISTINGS FROM ML # ═══════════════════════════════════════════════════════════════════════════ def process_meli_sync_queue(tenant_conn, batch_size: int = 50) -> dict: """Process pending meli_sync_queue items and update stock on MercadoLibre. Called by a cronjob or manual trigger. """ cfg = get_meli_config(tenant_conn) svc = _get_meli_service(cfg) if not svc: raise ValueError("MercadoLibre not configured") cur = tenant_conn.cursor() cur.execute(""" SELECT id, inventory_id FROM meli_sync_queue WHERE status = 'pending' ORDER BY created_at ASC LIMIT %s """, (batch_size,)) items = cur.fetchall() processed = 0 failed = 0 for queue_id, inventory_id in items: # Get current stock from services.inventory_engine import get_stock stock = get_stock(tenant_conn, inventory_id, branch_id=None) # Find the ML listing cur.execute(""" SELECT external_item_id FROM marketplace_listings WHERE inventory_id = %s AND channel = 'mercadolibre' AND is_active = true LIMIT 1 """, (inventory_id,)) row = cur.fetchone() if not row: # No active listing, mark as done cur.execute(""" UPDATE meli_sync_queue SET status = 'done', processed_at = NOW() WHERE id = %s """, (queue_id,)) tenant_conn.commit() processed += 1 continue external_item_id = row[0] try: svc.update_item(external_item_id, {"available_quantity": max(0, int(stock))}) cur.execute(""" UPDATE meli_sync_queue SET status = 'done', processed_at = NOW() WHERE id = %s """, (queue_id,)) tenant_conn.commit() processed += 1 except MeliError as e: tenant_conn.rollback() err_msg = _extract_meli_error(e) cur.execute(""" UPDATE meli_sync_queue SET status = 'failed', retry_count = retry_count + 1, error_message = %s, processed_at = NOW() WHERE id = %s """, (err_msg[:500], queue_id)) tenant_conn.commit() failed += 1 except Exception: tenant_conn.rollback() cur.execute(""" UPDATE meli_sync_queue SET status = 'failed', retry_count = retry_count + 1, error_message = 'Unexpected error', processed_at = NOW() WHERE id = %s """, (queue_id,)) tenant_conn.commit() failed += 1 cur.close() return {"processed": processed, "failed": failed, "total": len(items)} def import_existing_listings(tenant_conn, page_limit: int = 50) -> dict: """Import all existing MercadoLibre listings for the connected seller. For each ML item, tries to match it to a local inventory product by: 1. seller_custom_field (if it contains a part_number) 2. inventory_sku_aliases.sku matching the ML item id 3. inventory.part_number matching the ML item id or title Returns summary of imported / unmatched items. """ cfg = get_meli_config(tenant_conn) svc = _get_meli_service(cfg) user_id = cfg.get("meli_user_id") if not svc or not user_id: raise ValueError("MercadoLibre not configured") cur = tenant_conn.cursor() # Build lookup maps for matching cur.execute("SELECT id, part_number FROM inventory WHERE is_active = true") inv_by_part = {r[1].strip().upper(): r[0] for r in cur.fetchall() if r[1]} cur.execute("SELECT inventory_id, sku FROM inventory_sku_aliases WHERE is_active = true") inv_by_sku = {r[1].strip().upper(): r[0] for r in cur.fetchall() if r[1]} imported = 0 unmatched = 0 errors = 0 offset = 0 while True: try: resp = svc.get_user_items(str(user_id), limit=page_limit, offset=offset) except MeliError as e: cur.close() raise ValueError(f"Failed to fetch items: {e}") results = resp.get("results", []) if not results: break for item_id in results: try: item = svc.get_item(item_id) except MeliError: errors += 1 continue title = item.get("title", "") price = item.get("price", 0) status = item.get("status", "active") permalink = item.get("permalink", "") category_id = item.get("category_id", "") custom_field = item.get("seller_custom_field", "") or "" # Try to match to local inventory inventory_id = None cf_upper = custom_field.strip().upper() if cf_upper and cf_upper in inv_by_part: inventory_id = inv_by_part[cf_upper] elif cf_upper and cf_upper in inv_by_sku: inventory_id = inv_by_sku[cf_upper] else: item_id_upper = str(item_id).strip().upper() if item_id_upper in inv_by_sku: inventory_id = inv_by_sku[item_id_upper] elif item_id_upper in inv_by_part: inventory_id = inv_by_part[item_id_upper] # Upsert listing try: cur.execute( """ INSERT INTO marketplace_listings (inventory_id, channel, external_item_id, external_status, external_permalink, title, meli_category_id, publish_price, last_sync_at, sync_errors, is_active) VALUES (%s, 'mercadolibre', %s, %s, %s, %s, %s, %s, NOW(), NULL, true) ON CONFLICT (inventory_id, channel) WHERE is_active = true DO UPDATE SET external_item_id = EXCLUDED.external_item_id, external_status = EXCLUDED.external_status, external_permalink = EXCLUDED.external_permalink, title = EXCLUDED.title, meli_category_id = EXCLUDED.meli_category_id, publish_price = EXCLUDED.publish_price, last_sync_at = NOW(), sync_errors = NULL, is_active = true """, ( inventory_id, str(item_id), status, permalink, title, category_id, price, ), ) tenant_conn.commit() if inventory_id: imported += 1 else: unmatched += 1 except Exception: tenant_conn.rollback() errors += 1 if len(results) < page_limit: break offset += page_limit cur.close() return { "imported": imported, "unmatched": unmatched, "errors": errors, "total_processed": imported + unmatched + errors, }