From 7a4a6768908092f6e53e829d23ed1b508eccd943 Mon Sep 17 00:00:00 2001 From: consultoria-as Date: Thu, 11 Jun 2026 09:13:27 +0000 Subject: [PATCH] feat: MercadoLibre mejoras - importar existentes, sync stock, sync ordenes - meli_service.py: agrega get_user_items() para obtener publicaciones del vendedor - marketplace_external_service.py: - import_existing_listings(): importa publicaciones existentes de ML a marketplace_listings - process_meli_sync_queue(): procesa cola de sincronizacion de stock a ML - Actualiza stock en ML via update_item(available_quantity) - marketplace_external_bp.py: - POST /listings/import-existing - importa publicaciones existentes - POST /sync-stock - procesa cola de stock manualmente - POST /orders/sync - sincroniza ordenes manualmente - inventory_engine.py: inserta en meli_sync_queue tras cada operacion de inventario - migration v4.2: crea tabla meli_sync_queue Prueba en tenant_refaccionaria_rached: 52 publicaciones importadas exitosamente --- pos/blueprints/marketplace_external_bp.py | 60 ++++++ pos/migrations/v4.2_meli_sync_queue.sql | 14 ++ pos/services/inventory_engine.py | 12 ++ pos/services/marketplace_external_service.py | 206 +++++++++++++++++++ pos/services/meli_service.py | 10 + 5 files changed, 302 insertions(+) create mode 100644 pos/migrations/v4.2_meli_sync_queue.sql diff --git a/pos/blueprints/marketplace_external_bp.py b/pos/blueprints/marketplace_external_bp.py index dd9fe05..08fb313 100644 --- a/pos/blueprints/marketplace_external_bp.py +++ b/pos/blueprints/marketplace_external_bp.py @@ -235,6 +235,26 @@ def create_listings(): conn.close() +@marketplace_ext_bp.route("/listings/import-existing", methods=["POST"]) +@require_auth() +def import_existing_listings(): + """Import all existing MercadoLibre listings for the connected seller.""" + err = _require_meli_manage() + if err: + return err + + conn = get_tenant_conn(g.tenant_id) + try: + result = meli_svc.import_existing_listings(conn) + return jsonify(result), 200 + except ValueError as e: + return jsonify({"error": str(e)}), 400 + except Exception as e: + return jsonify({"error": str(e)}), 500 + finally: + conn.close() + + @marketplace_ext_bp.route("/inventory-check", methods=["POST"]) @require_auth() def inventory_check(): @@ -254,6 +274,26 @@ def inventory_check(): conn.close() +@marketplace_ext_bp.route("/sync-stock", methods=["POST"]) +@require_auth() +def sync_stock_to_meli(): + """Process pending stock updates to MercadoLibre.""" + err = _require_meli_manage() + if err: + return err + + conn = get_tenant_conn(g.tenant_id) + try: + result = meli_svc.process_meli_sync_queue(conn) + return jsonify(result) + except ValueError as e: + return jsonify({"error": str(e)}), 400 + except Exception as e: + return jsonify({"error": str(e)}), 500 + finally: + conn.close() + + @marketplace_ext_bp.route("/categories//attributes", methods=["GET"]) @require_auth() def category_attributes(category_id): @@ -526,6 +566,26 @@ def answer_question(question_id): # ORDERS # ═══════════════════════════════════════════════════════════════════════════ +@marketplace_ext_bp.route("/orders/sync", methods=["POST"]) +@require_auth() +def sync_orders(): + """Manually trigger sync of MercadoLibre orders.""" + err = _require_meli_manage() + if err: + return err + + conn = get_tenant_conn(g.tenant_id) + try: + result = meli_svc.fetch_and_save_orders(conn) + return jsonify(result) + except ValueError as e: + return jsonify({"error": str(e)}), 400 + except Exception as e: + return jsonify({"error": str(e)}), 500 + finally: + conn.close() + + @marketplace_ext_bp.route("/orders", methods=["GET"]) @require_auth() def list_orders(): diff --git a/pos/migrations/v4.2_meli_sync_queue.sql b/pos/migrations/v4.2_meli_sync_queue.sql new file mode 100644 index 0000000..2a2558d --- /dev/null +++ b/pos/migrations/v4.2_meli_sync_queue.sql @@ -0,0 +1,14 @@ +-- v4.2 — MercadoLibre sync queue for stock synchronization + +CREATE TABLE IF NOT EXISTS meli_sync_queue ( + id SERIAL PRIMARY KEY, + inventory_id INTEGER NOT NULL REFERENCES inventory(id), + action VARCHAR(20) NOT NULL DEFAULT 'stock_update', + status VARCHAR(20) NOT NULL DEFAULT 'pending', + retry_count INTEGER DEFAULT 0, + error_message TEXT, + created_at TIMESTAMPTZ DEFAULT NOW(), + processed_at TIMESTAMPTZ +); + +CREATE INDEX IF NOT EXISTS idx_meli_sync_pending ON meli_sync_queue(status, created_at) WHERE status = 'pending'; diff --git a/pos/services/inventory_engine.py b/pos/services/inventory_engine.py index 92c2aed..26251b6 100644 --- a/pos/services/inventory_engine.py +++ b/pos/services/inventory_engine.py @@ -121,6 +121,18 @@ def record_operation(conn, inventory_id, branch_id, operation_type, quantity, notes )) op_id = cur.fetchone()[0] + + # Queue ML stock sync if this product has an active ML listing + cur.execute(""" + INSERT INTO meli_sync_queue (inventory_id, action, status) + SELECT %s, 'stock_update', 'pending' + WHERE EXISTS ( + SELECT 1 FROM marketplace_listings + WHERE inventory_id = %s AND channel = 'mercadolibre' AND is_active = true + ) + ON CONFLICT DO NOTHING + """, (inventory_id, inventory_id)) + cur.close() return op_id diff --git a/pos/services/marketplace_external_service.py b/pos/services/marketplace_external_service.py index a218c80..4eaa608 100644 --- a/pos/services/marketplace_external_service.py +++ b/pos/services/marketplace_external_service.py @@ -1459,3 +1459,209 @@ def _create_sale_manual(tenant_conn, sale_data: dict, employee_id: int = None) - tenant_conn.commit() cur.close() return {"id": sale_id, **totals} + + +# ═══════════════════════════════════════════════════════════════════════════ +# IMPORT EXISTING LISTINGS FROM ML +# ═══════════════════════════════════════════════════════════════════════════ + +def process_meli_sync_queue(tenant_conn, batch_size: int = 50) -> dict: + """Process pending meli_sync_queue items and update stock on MercadoLibre. + + Called by a cronjob or manual trigger. + """ + cfg = get_meli_config(tenant_conn) + svc = _get_meli_service(cfg) + if not svc: + raise ValueError("MercadoLibre not configured") + + cur = tenant_conn.cursor() + cur.execute(""" + SELECT id, inventory_id FROM meli_sync_queue + WHERE status = 'pending' + ORDER BY created_at ASC + LIMIT %s + """, (batch_size,)) + items = cur.fetchall() + + processed = 0 + failed = 0 + + for queue_id, inventory_id in items: + # Get current stock + from services.inventory_engine import get_stock + stock = get_stock(tenant_conn, inventory_id, branch_id=None) + + # Find the ML listing + cur.execute(""" + SELECT external_item_id FROM marketplace_listings + WHERE inventory_id = %s AND channel = 'mercadolibre' AND is_active = true + LIMIT 1 + """, (inventory_id,)) + row = cur.fetchone() + if not row: + # No active listing, mark as done + cur.execute(""" + UPDATE meli_sync_queue SET status = 'done', processed_at = NOW() + WHERE id = %s + """, (queue_id,)) + tenant_conn.commit() + processed += 1 + continue + + external_item_id = row[0] + + try: + svc.update_item(external_item_id, {"available_quantity": max(0, int(stock))}) + cur.execute(""" + UPDATE meli_sync_queue SET status = 'done', processed_at = NOW() + WHERE id = %s + """, (queue_id,)) + tenant_conn.commit() + processed += 1 + except MeliError as e: + tenant_conn.rollback() + err_msg = _extract_meli_error(e) + cur.execute(""" + UPDATE meli_sync_queue + SET status = 'failed', retry_count = retry_count + 1, + error_message = %s, processed_at = NOW() + WHERE id = %s + """, (err_msg[:500], queue_id)) + tenant_conn.commit() + failed += 1 + except Exception: + tenant_conn.rollback() + cur.execute(""" + UPDATE meli_sync_queue + SET status = 'failed', retry_count = retry_count + 1, + error_message = 'Unexpected error', processed_at = NOW() + WHERE id = %s + """, (queue_id,)) + tenant_conn.commit() + failed += 1 + + cur.close() + return {"processed": processed, "failed": failed, "total": len(items)} + + +def import_existing_listings(tenant_conn, page_limit: int = 50) -> dict: + """Import all existing MercadoLibre listings for the connected seller. + + For each ML item, tries to match it to a local inventory product by: + 1. seller_custom_field (if it contains a part_number) + 2. inventory_sku_aliases.sku matching the ML item id + 3. inventory.part_number matching the ML item id or title + + Returns summary of imported / unmatched items. + """ + cfg = get_meli_config(tenant_conn) + svc = _get_meli_service(cfg) + user_id = cfg.get("meli_user_id") + if not svc or not user_id: + raise ValueError("MercadoLibre not configured") + + cur = tenant_conn.cursor() + + # Build lookup maps for matching + cur.execute("SELECT id, part_number FROM inventory WHERE is_active = true") + inv_by_part = {r[1].strip().upper(): r[0] for r in cur.fetchall() if r[1]} + + cur.execute("SELECT inventory_id, sku FROM inventory_sku_aliases WHERE is_active = true") + inv_by_sku = {r[1].strip().upper(): r[0] for r in cur.fetchall() if r[1]} + + imported = 0 + unmatched = 0 + errors = 0 + offset = 0 + + while True: + try: + resp = svc.get_user_items(str(user_id), limit=page_limit, offset=offset) + except MeliError as e: + cur.close() + raise ValueError(f"Failed to fetch items: {e}") + + results = resp.get("results", []) + if not results: + break + + for item_id in results: + try: + item = svc.get_item(item_id) + except MeliError: + errors += 1 + continue + + title = item.get("title", "") + price = item.get("price", 0) + status = item.get("status", "active") + permalink = item.get("permalink", "") + category_id = item.get("category_id", "") + custom_field = item.get("seller_custom_field", "") or "" + + # Try to match to local inventory + inventory_id = None + cf_upper = custom_field.strip().upper() + if cf_upper and cf_upper in inv_by_part: + inventory_id = inv_by_part[cf_upper] + elif cf_upper and cf_upper in inv_by_sku: + inventory_id = inv_by_sku[cf_upper] + else: + item_id_upper = str(item_id).strip().upper() + if item_id_upper in inv_by_sku: + inventory_id = inv_by_sku[item_id_upper] + elif item_id_upper in inv_by_part: + inventory_id = inv_by_part[item_id_upper] + + # Upsert listing + try: + cur.execute( + """ + INSERT INTO marketplace_listings + (inventory_id, channel, external_item_id, external_status, + external_permalink, title, meli_category_id, publish_price, + last_sync_at, sync_errors, is_active) + VALUES (%s, 'mercadolibre', %s, %s, %s, %s, %s, %s, NOW(), NULL, true) + ON CONFLICT (inventory_id, channel) WHERE is_active = true + DO UPDATE SET + external_item_id = EXCLUDED.external_item_id, + external_status = EXCLUDED.external_status, + external_permalink = EXCLUDED.external_permalink, + title = EXCLUDED.title, + meli_category_id = EXCLUDED.meli_category_id, + publish_price = EXCLUDED.publish_price, + last_sync_at = NOW(), + sync_errors = NULL, + is_active = true + """, + ( + inventory_id, + str(item_id), + status, + permalink, + title, + category_id, + price, + ), + ) + tenant_conn.commit() + if inventory_id: + imported += 1 + else: + unmatched += 1 + except Exception: + tenant_conn.rollback() + errors += 1 + + if len(results) < page_limit: + break + offset += page_limit + + cur.close() + return { + "imported": imported, + "unmatched": unmatched, + "errors": errors, + "total_processed": imported + unmatched + errors, + } diff --git a/pos/services/meli_service.py b/pos/services/meli_service.py index 06afce3..3ac3c5e 100644 --- a/pos/services/meli_service.py +++ b/pos/services/meli_service.py @@ -158,6 +158,16 @@ class MeliService: def get_item(self, item_id: str) -> dict: return self._request("GET", f"/items/{item_id}") + def get_user_items(self, user_id: str, status: str = None, limit: int = 50, offset: int = 0) -> dict: + """Get all items published by a seller. + + ML endpoint: GET /users/{user_id}/items/search + """ + params = {"limit": limit, "offset": offset} + if status: + params["status"] = status + return self._request("GET", f"/users/{user_id}/items/search", params=params) + def pause_item(self, item_id: str) -> dict: return self.update_item(item_id, {"status": "paused"})