feat: MercadoLibre mejoras - importar existentes, sync stock, sync ordenes

- meli_service.py: agrega get_user_items() para obtener publicaciones del vendedor
- marketplace_external_service.py:
  - import_existing_listings(): importa publicaciones existentes de ML a marketplace_listings
  - process_meli_sync_queue(): procesa cola de sincronizacion de stock a ML
  - Actualiza stock en ML via update_item(available_quantity)
- marketplace_external_bp.py:
  - POST /listings/import-existing - importa publicaciones existentes
  - POST /sync-stock - procesa cola de stock manualmente
  - POST /orders/sync - sincroniza ordenes manualmente
- inventory_engine.py: inserta en meli_sync_queue tras cada operacion de inventario
- migration v4.2: crea tabla meli_sync_queue

Prueba en tenant_refaccionaria_rached: 52 publicaciones importadas exitosamente
This commit is contained in:
2026-06-11 09:13:27 +00:00
parent 08362c5677
commit 7a4a676890
5 changed files with 302 additions and 0 deletions

View File

@@ -235,6 +235,26 @@ def create_listings():
conn.close() conn.close()
@marketplace_ext_bp.route("/listings/import-existing", methods=["POST"])
@require_auth()
def import_existing_listings():
"""Import all existing MercadoLibre listings for the connected seller."""
err = _require_meli_manage()
if err:
return err
conn = get_tenant_conn(g.tenant_id)
try:
result = meli_svc.import_existing_listings(conn)
return jsonify(result), 200
except ValueError as e:
return jsonify({"error": str(e)}), 400
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
conn.close()
@marketplace_ext_bp.route("/inventory-check", methods=["POST"]) @marketplace_ext_bp.route("/inventory-check", methods=["POST"])
@require_auth() @require_auth()
def inventory_check(): def inventory_check():
@@ -254,6 +274,26 @@ def inventory_check():
conn.close() conn.close()
@marketplace_ext_bp.route("/sync-stock", methods=["POST"])
@require_auth()
def sync_stock_to_meli():
"""Process pending stock updates to MercadoLibre."""
err = _require_meli_manage()
if err:
return err
conn = get_tenant_conn(g.tenant_id)
try:
result = meli_svc.process_meli_sync_queue(conn)
return jsonify(result)
except ValueError as e:
return jsonify({"error": str(e)}), 400
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
conn.close()
@marketplace_ext_bp.route("/categories/<category_id>/attributes", methods=["GET"]) @marketplace_ext_bp.route("/categories/<category_id>/attributes", methods=["GET"])
@require_auth() @require_auth()
def category_attributes(category_id): def category_attributes(category_id):
@@ -526,6 +566,26 @@ def answer_question(question_id):
# ORDERS # ORDERS
# ═══════════════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════════════
@marketplace_ext_bp.route("/orders/sync", methods=["POST"])
@require_auth()
def sync_orders():
"""Manually trigger sync of MercadoLibre orders."""
err = _require_meli_manage()
if err:
return err
conn = get_tenant_conn(g.tenant_id)
try:
result = meli_svc.fetch_and_save_orders(conn)
return jsonify(result)
except ValueError as e:
return jsonify({"error": str(e)}), 400
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
conn.close()
@marketplace_ext_bp.route("/orders", methods=["GET"]) @marketplace_ext_bp.route("/orders", methods=["GET"])
@require_auth() @require_auth()
def list_orders(): def list_orders():

View File

@@ -0,0 +1,14 @@
-- v4.2 — MercadoLibre sync queue for stock synchronization
CREATE TABLE IF NOT EXISTS meli_sync_queue (
id SERIAL PRIMARY KEY,
inventory_id INTEGER NOT NULL REFERENCES inventory(id),
action VARCHAR(20) NOT NULL DEFAULT 'stock_update',
status VARCHAR(20) NOT NULL DEFAULT 'pending',
retry_count INTEGER DEFAULT 0,
error_message TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
processed_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_meli_sync_pending ON meli_sync_queue(status, created_at) WHERE status = 'pending';

View File

@@ -121,6 +121,18 @@ def record_operation(conn, inventory_id, branch_id, operation_type, quantity,
notes notes
)) ))
op_id = cur.fetchone()[0] op_id = cur.fetchone()[0]
# Queue ML stock sync if this product has an active ML listing
cur.execute("""
INSERT INTO meli_sync_queue (inventory_id, action, status)
SELECT %s, 'stock_update', 'pending'
WHERE EXISTS (
SELECT 1 FROM marketplace_listings
WHERE inventory_id = %s AND channel = 'mercadolibre' AND is_active = true
)
ON CONFLICT DO NOTHING
""", (inventory_id, inventory_id))
cur.close() cur.close()
return op_id return op_id

View File

@@ -1459,3 +1459,209 @@ def _create_sale_manual(tenant_conn, sale_data: dict, employee_id: int = None) -
tenant_conn.commit() tenant_conn.commit()
cur.close() cur.close()
return {"id": sale_id, **totals} return {"id": sale_id, **totals}
# ═══════════════════════════════════════════════════════════════════════════
# IMPORT EXISTING LISTINGS FROM ML
# ═══════════════════════════════════════════════════════════════════════════
def process_meli_sync_queue(tenant_conn, batch_size: int = 50) -> dict:
"""Process pending meli_sync_queue items and update stock on MercadoLibre.
Called by a cronjob or manual trigger.
"""
cfg = get_meli_config(tenant_conn)
svc = _get_meli_service(cfg)
if not svc:
raise ValueError("MercadoLibre not configured")
cur = tenant_conn.cursor()
cur.execute("""
SELECT id, inventory_id FROM meli_sync_queue
WHERE status = 'pending'
ORDER BY created_at ASC
LIMIT %s
""", (batch_size,))
items = cur.fetchall()
processed = 0
failed = 0
for queue_id, inventory_id in items:
# Get current stock
from services.inventory_engine import get_stock
stock = get_stock(tenant_conn, inventory_id, branch_id=None)
# Find the ML listing
cur.execute("""
SELECT external_item_id FROM marketplace_listings
WHERE inventory_id = %s AND channel = 'mercadolibre' AND is_active = true
LIMIT 1
""", (inventory_id,))
row = cur.fetchone()
if not row:
# No active listing, mark as done
cur.execute("""
UPDATE meli_sync_queue SET status = 'done', processed_at = NOW()
WHERE id = %s
""", (queue_id,))
tenant_conn.commit()
processed += 1
continue
external_item_id = row[0]
try:
svc.update_item(external_item_id, {"available_quantity": max(0, int(stock))})
cur.execute("""
UPDATE meli_sync_queue SET status = 'done', processed_at = NOW()
WHERE id = %s
""", (queue_id,))
tenant_conn.commit()
processed += 1
except MeliError as e:
tenant_conn.rollback()
err_msg = _extract_meli_error(e)
cur.execute("""
UPDATE meli_sync_queue
SET status = 'failed', retry_count = retry_count + 1,
error_message = %s, processed_at = NOW()
WHERE id = %s
""", (err_msg[:500], queue_id))
tenant_conn.commit()
failed += 1
except Exception:
tenant_conn.rollback()
cur.execute("""
UPDATE meli_sync_queue
SET status = 'failed', retry_count = retry_count + 1,
error_message = 'Unexpected error', processed_at = NOW()
WHERE id = %s
""", (queue_id,))
tenant_conn.commit()
failed += 1
cur.close()
return {"processed": processed, "failed": failed, "total": len(items)}
def import_existing_listings(tenant_conn, page_limit: int = 50) -> dict:
"""Import all existing MercadoLibre listings for the connected seller.
For each ML item, tries to match it to a local inventory product by:
1. seller_custom_field (if it contains a part_number)
2. inventory_sku_aliases.sku matching the ML item id
3. inventory.part_number matching the ML item id or title
Returns summary of imported / unmatched items.
"""
cfg = get_meli_config(tenant_conn)
svc = _get_meli_service(cfg)
user_id = cfg.get("meli_user_id")
if not svc or not user_id:
raise ValueError("MercadoLibre not configured")
cur = tenant_conn.cursor()
# Build lookup maps for matching
cur.execute("SELECT id, part_number FROM inventory WHERE is_active = true")
inv_by_part = {r[1].strip().upper(): r[0] for r in cur.fetchall() if r[1]}
cur.execute("SELECT inventory_id, sku FROM inventory_sku_aliases WHERE is_active = true")
inv_by_sku = {r[1].strip().upper(): r[0] for r in cur.fetchall() if r[1]}
imported = 0
unmatched = 0
errors = 0
offset = 0
while True:
try:
resp = svc.get_user_items(str(user_id), limit=page_limit, offset=offset)
except MeliError as e:
cur.close()
raise ValueError(f"Failed to fetch items: {e}")
results = resp.get("results", [])
if not results:
break
for item_id in results:
try:
item = svc.get_item(item_id)
except MeliError:
errors += 1
continue
title = item.get("title", "")
price = item.get("price", 0)
status = item.get("status", "active")
permalink = item.get("permalink", "")
category_id = item.get("category_id", "")
custom_field = item.get("seller_custom_field", "") or ""
# Try to match to local inventory
inventory_id = None
cf_upper = custom_field.strip().upper()
if cf_upper and cf_upper in inv_by_part:
inventory_id = inv_by_part[cf_upper]
elif cf_upper and cf_upper in inv_by_sku:
inventory_id = inv_by_sku[cf_upper]
else:
item_id_upper = str(item_id).strip().upper()
if item_id_upper in inv_by_sku:
inventory_id = inv_by_sku[item_id_upper]
elif item_id_upper in inv_by_part:
inventory_id = inv_by_part[item_id_upper]
# Upsert listing
try:
cur.execute(
"""
INSERT INTO marketplace_listings
(inventory_id, channel, external_item_id, external_status,
external_permalink, title, meli_category_id, publish_price,
last_sync_at, sync_errors, is_active)
VALUES (%s, 'mercadolibre', %s, %s, %s, %s, %s, %s, NOW(), NULL, true)
ON CONFLICT (inventory_id, channel) WHERE is_active = true
DO UPDATE SET
external_item_id = EXCLUDED.external_item_id,
external_status = EXCLUDED.external_status,
external_permalink = EXCLUDED.external_permalink,
title = EXCLUDED.title,
meli_category_id = EXCLUDED.meli_category_id,
publish_price = EXCLUDED.publish_price,
last_sync_at = NOW(),
sync_errors = NULL,
is_active = true
""",
(
inventory_id,
str(item_id),
status,
permalink,
title,
category_id,
price,
),
)
tenant_conn.commit()
if inventory_id:
imported += 1
else:
unmatched += 1
except Exception:
tenant_conn.rollback()
errors += 1
if len(results) < page_limit:
break
offset += page_limit
cur.close()
return {
"imported": imported,
"unmatched": unmatched,
"errors": errors,
"total_processed": imported + unmatched + errors,
}

View File

@@ -158,6 +158,16 @@ class MeliService:
def get_item(self, item_id: str) -> dict: def get_item(self, item_id: str) -> dict:
return self._request("GET", f"/items/{item_id}") return self._request("GET", f"/items/{item_id}")
def get_user_items(self, user_id: str, status: str = None, limit: int = 50, offset: int = 0) -> dict:
"""Get all items published by a seller.
ML endpoint: GET /users/{user_id}/items/search
"""
params = {"limit": limit, "offset": offset}
if status:
params["status"] = status
return self._request("GET", f"/users/{user_id}/items/search", params=params)
def pause_item(self, item_id: str) -> dict: def pause_item(self, item_id: str) -> dict:
return self.update_item(item_id, {"status": "paused"}) return self.update_item(item_id, {"status": "paused"})