"""MercadoLibre API client with OAuth2 auto-refresh. Endpoints used: - GET /users/me - POST /items - PUT /items/{id} - GET /items/{id} - GET /orders/search - GET /orders/{id} - POST /shipments/{id}/dispatch - POST /oauth/token References: https://developers.mercadolibre.com.ar/es_ar/api-docs-es """ import time import requests from typing import Optional BASE_URL = "https://api.mercadolibre.com" AUTH_URL = "https://api.mercadolibre.com/oauth/token" class MeliError(Exception): def __init__(self, message, status_code=None, response_body=None): super().__init__(message) self.status_code = status_code self.response_body = response_body class MeliAuthError(MeliError): pass class MeliService: def __init__( self, access_token: str, refresh_token: Optional[str] = None, client_id: Optional[str] = None, client_secret: Optional[str] = None, ): self.access_token = access_token self.refresh_token = refresh_token self.client_id = client_id self.client_secret = client_secret self._session = requests.Session() self._session.headers.update({"Authorization": f"Bearer {access_token}"}) # ─── Low-level request ─────────────────────────────────────────────── def _request( self, method: str, path: str, params: Optional[dict] = None, json_payload: Optional[dict] = None, retry_on_401: bool = True, ) -> dict: url = f"{BASE_URL}{path}" resp = self._session.request( method, url, params=params, json=json_payload, timeout=30 ) if resp.status_code == 401 and retry_on_401 and self.refresh_token: self._refresh_token() # Retry once with new token self._session.headers.update( {"Authorization": f"Bearer {self.access_token}"} ) resp = self._session.request( method, url, params=params, json=json_payload, timeout=30 ) if resp.status_code == 401: raise MeliAuthError( "Unauthorized. Token may be expired or invalid.", status_code=401, response_body=resp.text, ) if not resp.ok: raise MeliError( f"Meli API error {resp.status_code}: {resp.text}", status_code=resp.status_code, response_body=resp.text, ) # Some endpoints return 204 No Content if resp.status_code == 204: return {} try: return resp.json() except Exception: return {"raw": resp.text} def _refresh_token(self) -> dict: if not self.client_id or not self.client_secret or not self.refresh_token: raise MeliAuthError("Missing credentials for token refresh") payload = { "grant_type": "refresh_token", "client_id": self.client_id, "client_secret": self.client_secret, "refresh_token": self.refresh_token, } resp = requests.post(AUTH_URL, data=payload, timeout=30) if not resp.ok: raise MeliAuthError( f"Token refresh failed: {resp.status_code} {resp.text}", status_code=resp.status_code, response_body=resp.text, ) data = resp.json() self.access_token = data["access_token"] if "refresh_token" in data: self.refresh_token = data["refresh_token"] return data # ─── Auth / User ───────────────────────────────────────────────────── def get_user(self) -> dict: return self._request("GET", "/users/me") @staticmethod def exchange_code( code: str, client_id: str, client_secret: str, redirect_uri: str ) -> dict: """Exchange authorization code for tokens.""" payload = { "grant_type": "authorization_code", "client_id": client_id, "client_secret": client_secret, "code": code, "redirect_uri": redirect_uri, } resp = requests.post(AUTH_URL, data=payload, timeout=30) if not resp.ok: raise MeliAuthError( f"Code exchange failed: {resp.status_code} {resp.text}", status_code=resp.status_code, response_body=resp.text, ) return resp.json() # ─── Items (listings) ──────────────────────────────────────────────── def create_item(self, payload: dict) -> dict: return self._request("POST", "/items", json_payload=payload) def update_item(self, item_id: str, payload: dict) -> dict: return self._request("PUT", f"/items/{item_id}", json_payload=payload) def get_item(self, item_id: str) -> dict: return self._request("GET", f"/items/{item_id}") def pause_item(self, item_id: str) -> dict: return self.update_item(item_id, {"status": "paused"}) def activate_item(self, item_id: str) -> dict: return self.update_item(item_id, {"status": "active"}) def close_item(self, item_id: str) -> dict: return self.update_item(item_id, {"status": "closed"}) # ─── Categories ────────────────────────────────────────────────────── def get_category(self, category_id: str) -> dict: return self._request("GET", f"/categories/{category_id}") def search_categories(self, site_id: str, query: str) -> dict: # ML does not have a direct category search; we use the predictor return self._request( "GET", f"/sites/{site_id}/domain_discovery/search", params={"q": query}, ) def get_category_attributes(self, category_id: str) -> list: return self._request("GET", f"/categories/{category_id}/attributes") # ─── Orders ────────────────────────────────────────────────────────── def get_orders( self, seller_id: str, status: Optional[str] = None, date_from: Optional[str] = None, limit: int = 50, offset: int = 0, ) -> dict: params = {"seller": seller_id, "limit": limit, "offset": offset} if status: params["order.status"] = status if date_from: params["order.date_created.from"] = date_from return self._request("GET", "/orders/search", params=params) def get_order(self, order_id: str) -> dict: return self._request("GET", f"/orders/{order_id}") # ─── Shipments ─────────────────────────────────────────────────────── def get_shipment(self, shipment_id: str) -> dict: return self._request("GET", f"/shipments/{shipment_id}") def mark_ready_to_ship(self, shipment_id: str) -> dict: return self._request( "POST", f"/shipments/{shipment_id}/dispatch", json_payload={}, ) # ─── Notifications / Webhooks validation ───────────────────────────── @staticmethod def validate_webhook_signature( secret: str, data: bytes, signature_header: str ) -> bool: """Validate MercadoLibre webhook signature. ML sends: X-Signature: sha256= """ import hmac import hashlib if not signature_header or "=" not in signature_header: return False _, expected_hex = signature_header.split("=", 1) computed = hmac.new( secret.encode(), data, hashlib.sha256 ).hexdigest() return hmac.compare_digest(computed, expected_hex)