Files
Autoparts-DB/pos/services/meli_service.py
consultoria-as 7a4a676890 feat: MercadoLibre mejoras - importar existentes, sync stock, sync ordenes
- meli_service.py: agrega get_user_items() para obtener publicaciones del vendedor
- marketplace_external_service.py:
  - import_existing_listings(): importa publicaciones existentes de ML a marketplace_listings
  - process_meli_sync_queue(): procesa cola de sincronizacion de stock a ML
  - Actualiza stock en ML via update_item(available_quantity)
- marketplace_external_bp.py:
  - POST /listings/import-existing - importa publicaciones existentes
  - POST /sync-stock - procesa cola de stock manualmente
  - POST /orders/sync - sincroniza ordenes manualmente
- inventory_engine.py: inserta en meli_sync_queue tras cada operacion de inventario
- migration v4.2: crea tabla meli_sync_queue

Prueba en tenant_refaccionaria_rached: 52 publicaciones importadas exitosamente
2026-06-11 09:13:27 +00:00

268 lines
9.8 KiB
Python

"""MercadoLibre API client with OAuth2 auto-refresh.
Endpoints used:
- GET /users/me
- POST /items
- PUT /items/{id}
- GET /items/{id}
- GET /orders/search
- GET /orders/{id}
- POST /shipments/{id}/dispatch
- POST /oauth/token
References:
https://developers.mercadolibre.com.ar/es_ar/api-docs-es
"""
import time
import requests
from typing import Optional
BASE_URL = "https://api.mercadolibre.com"
AUTH_URL = "https://api.mercadolibre.com/oauth/token"
class MeliError(Exception):
def __init__(self, message, status_code=None, response_body=None):
super().__init__(message)
self.status_code = status_code
self.response_body = response_body
class MeliAuthError(MeliError):
pass
class MeliService:
def __init__(
self,
access_token: str,
refresh_token: Optional[str] = None,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
):
self.access_token = access_token
self.refresh_token = refresh_token
self.client_id = client_id
self.client_secret = client_secret
self._session = requests.Session()
self._session.headers.update({"Authorization": f"Bearer {access_token}"})
# ─── Low-level request ───────────────────────────────────────────────
def _request(
self,
method: str,
path: str,
params: Optional[dict] = None,
json_payload: Optional[dict] = None,
retry_on_401: bool = True,
) -> dict:
url = f"{BASE_URL}{path}"
resp = self._session.request(
method, url, params=params, json=json_payload, timeout=30
)
if resp.status_code == 401 and retry_on_401 and self.refresh_token:
self._refresh_token()
# Retry once with new token
self._session.headers.update(
{"Authorization": f"Bearer {self.access_token}"}
)
resp = self._session.request(
method, url, params=params, json=json_payload, timeout=30
)
if resp.status_code == 401:
raise MeliAuthError(
"Unauthorized. Token may be expired or invalid.",
status_code=401,
response_body=resp.text,
)
if not resp.ok:
raise MeliError(
f"Meli API error {resp.status_code}: {resp.text}",
status_code=resp.status_code,
response_body=resp.text,
)
# Some endpoints return 204 No Content
if resp.status_code == 204:
return {}
try:
return resp.json()
except Exception:
return {"raw": resp.text}
def _refresh_token(self) -> dict:
if not self.client_id or not self.client_secret or not self.refresh_token:
raise MeliAuthError("Missing credentials for token refresh")
payload = {
"grant_type": "refresh_token",
"client_id": self.client_id,
"client_secret": self.client_secret,
"refresh_token": self.refresh_token,
}
resp = requests.post(AUTH_URL, data=payload, timeout=30)
if not resp.ok:
raise MeliAuthError(
f"Token refresh failed: {resp.status_code} {resp.text}",
status_code=resp.status_code,
response_body=resp.text,
)
data = resp.json()
self.access_token = data["access_token"]
if "refresh_token" in data:
self.refresh_token = data["refresh_token"]
return data
# ─── Auth / User ─────────────────────────────────────────────────────
def get_user(self) -> dict:
return self._request("GET", "/users/me")
@staticmethod
def exchange_code(
code: str, client_id: str, client_secret: str, redirect_uri: str
) -> dict:
"""Exchange authorization code for tokens."""
payload = {
"grant_type": "authorization_code",
"client_id": client_id,
"client_secret": client_secret,
"code": code,
"redirect_uri": redirect_uri,
}
resp = requests.post(AUTH_URL, data=payload, timeout=30)
if not resp.ok:
raise MeliAuthError(
f"Code exchange failed: {resp.status_code} {resp.text}",
status_code=resp.status_code,
response_body=resp.text,
)
return resp.json()
# ─── Items (listings) ────────────────────────────────────────────────
def validate_item(self, payload: dict) -> dict:
"""Validate an item payload without creating it."""
return self._request("POST", "/items/validate", json_payload=payload)
def create_item(self, payload: dict) -> dict:
return self._request("POST", "/items", json_payload=payload)
def update_item(self, item_id: str, payload: dict) -> dict:
return self._request("PUT", f"/items/{item_id}", json_payload=payload)
def get_item(self, item_id: str) -> dict:
return self._request("GET", f"/items/{item_id}")
def get_user_items(self, user_id: str, status: str = None, limit: int = 50, offset: int = 0) -> dict:
"""Get all items published by a seller.
ML endpoint: GET /users/{user_id}/items/search
"""
params = {"limit": limit, "offset": offset}
if status:
params["status"] = status
return self._request("GET", f"/users/{user_id}/items/search", params=params)
def pause_item(self, item_id: str) -> dict:
return self.update_item(item_id, {"status": "paused"})
def activate_item(self, item_id: str) -> dict:
return self.update_item(item_id, {"status": "active"})
def close_item(self, item_id: str) -> dict:
return self.update_item(item_id, {"status": "closed"})
# ─── Questions & Answers ─────────────────────────────────────────────
def get_questions(self, item_id: str, status: str = None, offset: int = 0, limit: int = 50) -> dict:
params = {"item_id": item_id, "offset": offset, "limit": limit}
if status:
params["status"] = status
return self._request("GET", "/questions/search", params=params)
def get_question(self, question_id: str) -> dict:
return self._request("GET", f"/questions/{question_id}")
def answer_question(self, question_id: str, text: str) -> dict:
return self._request("POST", "/answers", json_payload={"question_id": question_id, "text": text})
def delete_question(self, question_id: str) -> dict:
return self._request("DELETE", f"/questions/{question_id}")
# ─── Categories ──────────────────────────────────────────────────────
def get_category(self, category_id: str) -> dict:
return self._request("GET", f"/categories/{category_id}")
def search_categories(self, site_id: str, query: str) -> dict:
# ML does not have a direct category search; we use the predictor
return self._request(
"GET",
f"/sites/{site_id}/domain_discovery/search",
params={"q": query},
)
def get_category_attributes(self, category_id: str) -> list:
return self._request("GET", f"/categories/{category_id}/attributes")
def get_shipping_preferences(self, user_id: str) -> dict:
return self._request("GET", f"/users/{user_id}/shipping_preferences")
# ─── Orders ──────────────────────────────────────────────────────────
def get_orders(
self,
seller_id: str,
status: Optional[str] = None,
date_from: Optional[str] = None,
limit: int = 50,
offset: int = 0,
) -> dict:
params = {"seller": seller_id, "limit": limit, "offset": offset}
if status:
params["order.status"] = status
if date_from:
params["order.date_created.from"] = date_from
return self._request("GET", "/orders/search", params=params)
def get_order(self, order_id: str) -> dict:
return self._request("GET", f"/orders/{order_id}")
# ─── Shipments ───────────────────────────────────────────────────────
def get_shipment(self, shipment_id: str) -> dict:
return self._request("GET", f"/shipments/{shipment_id}")
def mark_ready_to_ship(self, shipment_id: str) -> dict:
return self._request(
"POST",
f"/shipments/{shipment_id}/dispatch",
json_payload={},
)
# ─── Notifications / Webhooks validation ─────────────────────────────
@staticmethod
def validate_webhook_signature(
secret: str, data: bytes, signature_header: str
) -> bool:
"""Validate MercadoLibre webhook signature.
ML sends: X-Signature: sha256=<hex_hmac>
"""
import hmac
import hashlib
if not signature_header or "=" not in signature_header:
return False
_, expected_hex = signature_header.split("=", 1)
computed = hmac.new(
secret.encode(), data, hashlib.sha256
).hexdigest()
return hmac.compare_digest(computed, expected_hex)