- Add upload_image() to MeliService using ML /pictures endpoint - Add get_listing_price() to fetch exact ML fees - Auto-upload inventory images to ML before validate/publish - Show fee breakdown and net earnings in validate modal - Fallback to approximate fees (13-18%) if ML API fails
328 lines
13 KiB
Python
328 lines
13 KiB
Python
"""MercadoLibre API client with OAuth2 auto-refresh.
|
|
|
|
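Endpoints used:
- GET /users/me
- POST /oauth/token
- POST /pictures
- GET /sites/{site_id}/listing_prices
- POST /items/validate
- POST /items
- PUT /items/{id}
- GET /items/{id}
- GET /users/{id}/items/search
- GET /questions/search
- GET /questions/{id}
- DELETE /questions/{id}
- POST /answers
- GET /categories/{id}
- GET /categories/{id}/attributes
- GET /sites/{site_id}/domain_discovery/search
- GET /users/{id}/shipping_preferences
- GET /orders/search
- GET /orders/{id}
- GET /shipments/{id}
- POST /shipments/{id}/dispatch

References:
https://developers.mercadolibre.com.ar/es_ar/api-docs-es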
|
|
"""
|
|
|
|
import hashlib
import hmac
import mimetypes
import os
import time
from typing import Optional

import requests
|
|
|
|
# Root of the MercadoLibre REST API; request paths are appended to it.
BASE_URL = "https://api.mercadolibre.com"
# OAuth2 token endpoint (authorization-code exchange and token refresh).
AUTH_URL = f"{BASE_URL}/oauth/token"
|
|
|
|
|
|
class MeliError(Exception):
    """Base error for failures reported by the MercadoLibre client.

    Attributes:
        status_code: HTTP status of the failing response, or ``None`` when
            the error did not originate from an HTTP response.
        response_body: Raw response text, or ``None`` when unavailable.
    """

    def __init__(self, message, status_code=None, response_body=None):
        # Keep the HTTP context on the exception so callers can inspect it.
        self.response_body = response_body
        self.status_code = status_code
        super().__init__(message)
|
|
|
|
|
|
class MeliAuthError(MeliError):
    """Authentication failure: rejected token or failed OAuth2 exchange/refresh."""
|
|
|
|
|
|
class MeliService:
    """Client for the MercadoLibre REST API with OAuth2 token refresh.

    All authenticated calls go through one ``requests.Session`` that carries
    the bearer token. When a call is answered with HTTP 401 and a refresh
    token is available, the access token is refreshed once and the call is
    retried.

    Errors reported by the API are raised as ``MeliError`` (``MeliAuthError``
    for 401 / OAuth failures). Transport-level ``requests`` exceptions
    (timeouts, connection errors) are not wrapped and propagate unchanged.

    Attributes:
        access_token: Current OAuth2 access token (replaced on refresh).
        refresh_token: Refresh token, or ``None`` to disable auto-refresh.
        client_id: Application id, required for token refresh.
        client_secret: Application secret, required for token refresh.
    """

    def __init__(
        self,
        access_token: str,
        refresh_token: Optional[str] = None,
        client_id: Optional[str] = None,
        client_secret: Optional[str] = None,
    ):
        self.access_token = access_token
        self.refresh_token = refresh_token
        self.client_id = client_id
        self.client_secret = client_secret
        self._session = requests.Session()
        self._apply_access_token()

    # ─── Low-level request ───────────────────────────────────────────────

    def _apply_access_token(self) -> None:
        """Install the current access token as the session's bearer header."""
        self._session.headers.update(
            {"Authorization": f"Bearer {self.access_token}"}
        )

    def _request(
        self,
        method: str,
        path: str,
        params: Optional[dict] = None,
        json_payload: Optional[dict] = None,
        retry_on_401: bool = True,
    ) -> dict:
        """Send an authenticated request and return the decoded body.

        Args:
            method: HTTP verb, e.g. ``"GET"``.
            path: API path starting with ``/``; appended to ``BASE_URL``.
            params: Optional query-string parameters.
            json_payload: Optional JSON body.
            retry_on_401: Refresh the token and retry once on HTTP 401
                (only when a refresh token is configured).

        Returns:
            The decoded JSON body; ``{}`` for 204 responses;
            ``{"raw": <text>}`` when the body is not valid JSON.

        Raises:
            MeliAuthError: The (possibly retried) response is 401, or the
                token refresh itself failed.
            MeliError: Any other non-2xx response.
        """
        url = f"{BASE_URL}{path}"

        def send():
            return self._session.request(
                method, url, params=params, json=json_payload, timeout=30
            )

        resp = send()

        if resp.status_code == 401 and retry_on_401 and self.refresh_token:
            # _refresh_token() also updates the session header, so the retry
            # below already carries the new token.
            self._refresh_token()
            resp = send()

        if resp.status_code == 401:
            raise MeliAuthError(
                "Unauthorized. Token may be expired or invalid.",
                status_code=401,
                response_body=resp.text,
            )

        if not resp.ok:
            raise MeliError(
                f"Meli API error {resp.status_code}: {resp.text}",
                status_code=resp.status_code,
                response_body=resp.text,
            )

        # Some endpoints return 204 No Content
        if resp.status_code == 204:
            return {}
        try:
            return resp.json()
        except ValueError:
            # JSON decode errors are ValueError subclasses; anything else is
            # a real bug and should surface instead of being swallowed.
            return {"raw": resp.text}

    def _refresh_token(self) -> dict:
        """Exchange the refresh token for a new access token.

        Updates ``access_token`` (and ``refresh_token`` when the response
        carries a new one) and re-installs the session's bearer header so
        every later call uses the fresh token.

        Returns:
            The decoded token response.

        Raises:
            MeliAuthError: Credentials are missing, the token endpoint
                answered with an error, or its body lacks ``access_token``.
        """
        if not self.client_id or not self.client_secret or not self.refresh_token:
            raise MeliAuthError("Missing credentials for token refresh")
        payload = {
            "grant_type": "refresh_token",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "refresh_token": self.refresh_token,
        }
        # Deliberately not sent through the session: the stale bearer header
        # must not accompany the refresh call.
        resp = requests.post(AUTH_URL, data=payload, timeout=30)
        if not resp.ok:
            raise MeliAuthError(
                f"Token refresh failed: {resp.status_code} {resp.text}",
                status_code=resp.status_code,
                response_body=resp.text,
            )
        try:
            data = resp.json()
            new_access_token = data["access_token"]
        except (ValueError, KeyError, TypeError) as exc:
            raise MeliAuthError(
                "Token refresh returned an unexpected response body",
                status_code=resp.status_code,
                response_body=resp.text,
            ) from exc

        self.access_token = new_access_token
        if "refresh_token" in data:
            self.refresh_token = data["refresh_token"]
        self._apply_access_token()
        return data

    # ─── Auth / User ─────────────────────────────────────────────────────

    def get_user(self) -> dict:
        """Return the profile of the authenticated user (GET /users/me)."""
        return self._request("GET", "/users/me")

    @staticmethod
    def exchange_code(
        code: str, client_id: str, client_secret: str, redirect_uri: str
    ) -> dict:
        """Exchange an OAuth2 authorization code for tokens.

        Returns:
            The decoded token response from the OAuth endpoint.

        Raises:
            MeliAuthError: The token endpoint answered with a non-2xx status.
        """
        payload = {
            "grant_type": "authorization_code",
            "client_id": client_id,
            "client_secret": client_secret,
            "code": code,
            "redirect_uri": redirect_uri,
        }
        resp = requests.post(AUTH_URL, data=payload, timeout=30)
        if not resp.ok:
            raise MeliAuthError(
                f"Code exchange failed: {resp.status_code} {resp.text}",
                status_code=resp.status_code,
                response_body=resp.text,
            )
        return resp.json()

    # ─── Images ──────────────────────────────────────────────────────────

    @staticmethod
    def _read_image(source: str) -> tuple:
        """Load image bytes from a URL or a local path.

        Returns:
            ``(filename, file_bytes, content_type)`` ready for a multipart
            upload.

        Raises:
            MeliError: The download answered with a non-2xx status, or the
                local file is missing or unreadable.
        """
        if source.startswith(("http://", "https://")):
            # Plain requests.get, not the session: the bearer token must
            # never be sent to an arbitrary image host.
            img_resp = requests.get(source, timeout=30)
            if not img_resp.ok:
                raise MeliError(
                    f"Failed to download image from {source}: {img_resp.status_code}"
                )
            # Drop parameters such as "; charset=binary" from the header.
            raw_type = img_resp.headers.get("Content-Type", "")
            content_type = raw_type.split(";", 1)[0].strip().lower() or "image/jpeg"
            extension = None
            if content_type.startswith("image/"):
                extension = mimetypes.guess_extension(content_type)
            return f"image{extension or '.jpg'}", img_resp.content, content_type

        try:
            with open(source, "rb") as fh:
                file_bytes = fh.read()
        except FileNotFoundError as exc:
            raise MeliError(f"Image file not found: {source}") from exc
        except OSError as exc:
            raise MeliError(f"Could not read image file {source}: {exc}") from exc
        # Declare the real type (PNG, WebP, ...) instead of always JPEG.
        content_type = mimetypes.guess_type(source)[0] or "image/jpeg"
        return os.path.basename(source), file_bytes, content_type

    def upload_image(self, image_path_or_url: str) -> dict:
        """Upload an image to MercadoLibre's image hosting (POST /pictures).

        Accepts either a local file path or an ``http(s)://`` URL; a URL is
        downloaded first and its bytes are re-uploaded.

        Returns:
            The decoded JSON response. The original author notes it is the
            ML picture dict with 'id' and 'secure_url' / 'url' keys.

        Raises:
            MeliAuthError: The upload is still rejected with 401 after an
                optional token refresh.
            MeliError: The image could not be read/downloaded, or the upload
                answered with another non-2xx status.
        """
        filename, file_bytes, content_type = self._read_image(image_path_or_url)
        upload_url = f"{BASE_URL}/pictures"
        files = {"file": (filename, file_bytes, content_type)}

        def send():
            # Header built per call so a retry picks up a refreshed token.
            return self._session.post(
                upload_url,
                files=files,
                headers={"Authorization": f"Bearer {self.access_token}"},
                timeout=60,
            )

        resp = send()

        if resp.status_code == 401 and self.refresh_token:
            self._refresh_token()
            resp = send()

        if resp.status_code == 401:
            raise MeliAuthError(
                "Unauthorized. Token may be expired.",
                status_code=401,
                response_body=resp.text,
            )
        if not resp.ok:
            raise MeliError(
                f"Image upload failed: {resp.status_code} {resp.text}",
                status_code=resp.status_code,
                response_body=resp.text,
            )

        return resp.json()

    def get_listing_price(
        self, site_id: str, price: float, listing_type_id: str, category_id: str
    ) -> dict:
        """Get the fee / net amount for a price, listing type and category.

        ML endpoint: GET /sites/{site_id}/listing_prices

        Returns:
            The decoded response. The original author notes it contains
            sale_fee_amount, net_amount, etc.
        """
        return self._request(
            "GET",
            f"/sites/{site_id}/listing_prices",
            params={
                "price": str(price),
                "listing_type_id": listing_type_id,
                "category_id": category_id,
            },
        )

    # ─── Items (listings) ────────────────────────────────────────────────

    def validate_item(self, payload: dict) -> dict:
        """Validate an item payload without creating it."""
        return self._request("POST", "/items/validate", json_payload=payload)

    def create_item(self, payload: dict) -> dict:
        """Create a listing (POST /items)."""
        return self._request("POST", "/items", json_payload=payload)

    def update_item(self, item_id: str, payload: dict) -> dict:
        """Update fields of an existing listing (PUT /items/{id})."""
        return self._request("PUT", f"/items/{item_id}", json_payload=payload)

    def get_item(self, item_id: str) -> dict:
        """Fetch a listing (GET /items/{id})."""
        return self._request("GET", f"/items/{item_id}")

    def get_user_items(
        self,
        user_id: str,
        status: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> dict:
        """Search the items published by a seller.

        ML endpoint: GET /users/{user_id}/items/search

        ``status`` is only sent when truthy; ``limit`` / ``offset`` page
        through the results.
        """
        params = {"limit": limit, "offset": offset}
        if status:
            params["status"] = status
        return self._request("GET", f"/users/{user_id}/items/search", params=params)

    def pause_item(self, item_id: str) -> dict:
        """Set the listing status to ``paused``."""
        return self.update_item(item_id, {"status": "paused"})

    def activate_item(self, item_id: str) -> dict:
        """Set the listing status to ``active``."""
        return self.update_item(item_id, {"status": "active"})

    def close_item(self, item_id: str) -> dict:
        """Set the listing status to ``closed``."""
        return self.update_item(item_id, {"status": "closed"})

    # ─── Questions & Answers ─────────────────────────────────────────────

    def get_questions(
        self,
        item_id: str,
        status: Optional[str] = None,
        offset: int = 0,
        limit: int = 50,
    ) -> dict:
        """Search questions for an item (GET /questions/search).

        ``status`` is only sent when truthy.
        """
        params = {"item_id": item_id, "offset": offset, "limit": limit}
        if status:
            params["status"] = status
        return self._request("GET", "/questions/search", params=params)

    def get_question(self, question_id: str) -> dict:
        """Fetch a single question (GET /questions/{id})."""
        return self._request("GET", f"/questions/{question_id}")

    def answer_question(self, question_id: str, text: str) -> dict:
        """Post an answer to a question (POST /answers)."""
        return self._request(
            "POST",
            "/answers",
            json_payload={"question_id": question_id, "text": text},
        )

    def delete_question(self, question_id: str) -> dict:
        """Delete a question (DELETE /questions/{id})."""
        return self._request("DELETE", f"/questions/{question_id}")

    # ─── Categories ──────────────────────────────────────────────────────

    def get_category(self, category_id: str) -> dict:
        """Fetch a category (GET /categories/{id})."""
        return self._request("GET", f"/categories/{category_id}")

    def search_categories(self, site_id: str, query: str) -> dict:
        """Suggest categories for a free-text query.

        ML does not have a direct category search; this uses the predictor
        endpoint GET /sites/{site_id}/domain_discovery/search.
        """
        return self._request(
            "GET",
            f"/sites/{site_id}/domain_discovery/search",
            params={"q": query},
        )

    def get_category_attributes(self, category_id: str) -> list:
        """Fetch a category's attributes (GET /categories/{id}/attributes)."""
        return self._request("GET", f"/categories/{category_id}/attributes")

    def get_shipping_preferences(self, user_id: str) -> dict:
        """Fetch a user's shipping preferences."""
        return self._request("GET", f"/users/{user_id}/shipping_preferences")

    # ─── Orders ──────────────────────────────────────────────────────────

    def get_orders(
        self,
        seller_id: str,
        status: Optional[str] = None,
        date_from: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> dict:
        """Search a seller's orders (GET /orders/search).

        ``status`` and ``date_from`` are sent as ``order.status`` and
        ``order.date_created.from`` and only when truthy.
        """
        params = {"seller": seller_id, "limit": limit, "offset": offset}
        if status:
            params["order.status"] = status
        if date_from:
            params["order.date_created.from"] = date_from
        return self._request("GET", "/orders/search", params=params)

    def get_order(self, order_id: str) -> dict:
        """Fetch a single order (GET /orders/{id})."""
        return self._request("GET", f"/orders/{order_id}")

    # ─── Shipments ───────────────────────────────────────────────────────

    def get_shipment(self, shipment_id: str) -> dict:
        """Fetch a shipment (GET /shipments/{id})."""
        return self._request("GET", f"/shipments/{shipment_id}")

    def mark_ready_to_ship(self, shipment_id: str) -> dict:
        """POST an empty body to /shipments/{id}/dispatch."""
        return self._request(
            "POST",
            f"/shipments/{shipment_id}/dispatch",
            json_payload={},
        )

    # ─── Notifications / Webhooks validation ─────────────────────────────

    @staticmethod
    def validate_webhook_signature(
        secret: str, data: bytes, signature_header: str
    ) -> bool:
        """Validate an HMAC-SHA256 webhook signature.

        Expected header shape (per the original implementation):
        ``X-Signature: sha256=<hex_hmac>``. Everything after the first ``=``
        is treated as the hex digest; the scheme name is not checked.

        NOTE(review): confirm this header format against the current
        MercadoLibre notification docs — a ``ts=...,v1=...`` style header
        would never validate here.

        Args:
            secret: Shared secret used as the HMAC key.
            data: Raw request body bytes.
            signature_header: Value of the signature header.

        Returns:
            ``True`` when the digest matches; ``False`` for a missing,
            malformed or mismatching header. Never raises on header content.
        """
        if not signature_header or "=" not in signature_header:
            return False
        _, _, received_hex = signature_header.partition("=")
        # Hex digests are case-insensitive; hexdigest() is lower-case.
        received_hex = received_hex.strip().lower()
        computed_hex = hmac.new(secret.encode(), data, hashlib.sha256).hexdigest()
        # Compare as bytes: compare_digest() raises TypeError for str input
        # with non-ASCII characters, and this header is attacker-controlled.
        return hmac.compare_digest(
            computed_hex.encode("ascii"),
            received_hex.encode("utf-8", "replace"),
        )
|