Files
Autoparts-DB/pos/services/meli_service.py
consultoria-as 3d70c3fcc9 feat(ml): upload images to ML hosting + show earnings estimate on validate
- Add upload_image() to MeliService using ML /pictures endpoint
- Add get_listing_price() to fetch exact ML fees
- Auto-upload inventory images to ML before validate/publish
- Show fee breakdown and net earnings in validate modal
- Fallback to approximate fees (13-18%) if ML API fails
2026-06-11 18:35:25 +00:00

328 lines
13 KiB
Python

"""MercadoLibre API client with OAuth2 auto-refresh.
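Endpoints used:
- POST /oauth/token
- GET /users/me
- GET /users/{id}/items/search
- GET /users/{id}/shipping_preferences
- POST /pictures
- GET /sites/{site_id}/listing_prices
- GET /sites/{site_id}/domain_discovery/search
- POST /items/validate
- POST /items
- PUT /items/{id}
- GET /items/{id}
- GET /questions/search
- GET /questions/{id}
- DELETE /questions/{id}
- POST /answers
- GET /categories/{id}
- GET /categories/{id}/attributes
- GET /orders/search
- GET /orders/{id}
- GET /shipments/{id}
- POST /shipments/{id}/dispatch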
References:
https://developers.mercadolibre.com.ar/es_ar/api-docs-es
"""
import time
import requests
from typing import Optional
# Root URL that every REST path used by the client is appended to.
BASE_URL = "https://api.mercadolibre.com"
# OAuth2 token endpoint, used for both the authorization-code exchange and token refresh.
AUTH_URL = "https://api.mercadolibre.com/oauth/token"
class MeliError(Exception):
    """Base error raised when a MercadoLibre call fails.

    Besides the message, the exception keeps the HTTP status code and the raw
    response body when the raiser supplies them; both default to ``None``.
    """

    def __init__(self, message, status_code=None, response_body=None):
        # Only the message is forwarded to Exception, so str(err) is the message alone.
        super().__init__(message)
        self.response_body = response_body
        self.status_code = status_code
class MeliAuthError(MeliError):
    """Authentication failure: a 401 response or a failed token refresh / code exchange."""
class MeliService:
    """Client for the MercadoLibre REST API with OAuth2 token auto-refresh.

    JSON calls go through a shared ``requests.Session`` that carries the bearer
    token. When a call is answered with 401 and a refresh token is available,
    the access token is refreshed once and the call is retried.
    """

    def __init__(
        self,
        access_token: str,
        refresh_token: Optional[str] = None,
        client_id: Optional[str] = None,
        client_secret: Optional[str] = None,
    ):
        """Store the credentials and open a session authorised with ``access_token``.

        ``refresh_token``, ``client_id`` and ``client_secret`` are only needed
        for automatic token refresh; without them a 401 is raised to the caller.
        """
        self.access_token = access_token
        self.refresh_token = refresh_token
        self.client_id = client_id
        self.client_secret = client_secret
        self._session = requests.Session()
        self._session.headers.update({"Authorization": f"Bearer {access_token}"})

    # ─── Low-level request ───────────────────────────────────────────────

    def _auth_headers(self) -> dict:
        """Return the Authorization header for the current access token."""
        return {"Authorization": f"Bearer {self.access_token}"}

    def _request(
        self,
        method: str,
        path: str,
        params: Optional[dict] = None,
        json_payload: Optional[dict] = None,
        retry_on_401: bool = True,
    ) -> dict:
        """Send one API request and return the decoded JSON body.

        Args:
            method: HTTP verb.
            path: Path appended to ``BASE_URL`` (must start with ``/``).
            params: Optional query-string parameters.
            json_payload: Optional JSON request body.
            retry_on_401: When true and a refresh token is set, refresh the
                access token and retry once after a 401.

        Returns:
            The parsed JSON body, ``{}`` for 204 responses, or
            ``{"raw": <text>}`` when the body is not valid JSON.

        Raises:
            MeliAuthError: The (possibly retried) response is still 401, or
                the token refresh itself failed.
            MeliError: Any other non-2xx response.
        """
        url = f"{BASE_URL}{path}"
        resp = self._session.request(
            method, url, params=params, json=json_payload, timeout=30
        )
        if resp.status_code == 401 and retry_on_401 and self.refresh_token:
            # _refresh_token() also re-authorises the session, so the retry
            # below goes out with the new token.
            self._refresh_token()
            resp = self._session.request(
                method, url, params=params, json=json_payload, timeout=30
            )
        if resp.status_code == 401:
            raise MeliAuthError(
                "Unauthorized. Token may be expired or invalid.",
                status_code=401,
                response_body=resp.text,
            )
        if not resp.ok:
            raise MeliError(
                f"Meli API error {resp.status_code}: {resp.text}",
                status_code=resp.status_code,
                response_body=resp.text,
            )
        # Some endpoints return 204 No Content
        if resp.status_code == 204:
            return {}
        try:
            return resp.json()
        except ValueError:
            # JSON decoding errors are ValueError subclasses; hand the raw
            # text back instead of failing a call that otherwise succeeded.
            return {"raw": resp.text}

    def _refresh_token(self) -> dict:
        """Exchange the refresh token for a new access token.

        Updates ``self.access_token`` (and ``self.refresh_token`` when the
        response carries a new one) and re-authorises the shared session so
        later ``_request`` calls do not keep sending the stale token.

        Returns:
            The decoded token response.

        Raises:
            MeliAuthError: Credentials are missing, the token endpoint
                rejected the request, or its response has no usable
                ``access_token``.
        """
        if not self.client_id or not self.client_secret or not self.refresh_token:
            raise MeliAuthError("Missing credentials for token refresh")
        payload = {
            "grant_type": "refresh_token",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "refresh_token": self.refresh_token,
        }
        resp = requests.post(AUTH_URL, data=payload, timeout=30)
        if not resp.ok:
            raise MeliAuthError(
                f"Token refresh failed: {resp.status_code} {resp.text}",
                status_code=resp.status_code,
                response_body=resp.text,
            )
        try:
            data = resp.json()
            new_access_token = data["access_token"]
        except (ValueError, KeyError, TypeError) as err:
            # Surface a malformed token response as an auth failure rather
            # than leaking a bare KeyError / decode error to callers.
            raise MeliAuthError(
                "Token refresh failed: response did not contain an access_token",
                status_code=resp.status_code,
                response_body=resp.text,
            ) from err
        self.access_token = new_access_token
        if "refresh_token" in data:
            self.refresh_token = data["refresh_token"]
        # Keep the session in step with the new token no matter which code
        # path triggered the refresh.
        self._session.headers.update(self._auth_headers())
        return data

    # ─── Auth / User ─────────────────────────────────────────────────────

    def get_user(self) -> dict:
        """Return the profile of the authenticated user (GET /users/me)."""
        return self._request("GET", "/users/me")

    @staticmethod
    def exchange_code(
        code: str, client_id: str, client_secret: str, redirect_uri: str
    ) -> dict:
        """Exchange authorization code for tokens.

        Returns:
            The decoded token response from the OAuth endpoint.

        Raises:
            MeliAuthError: The token endpoint answered with a non-2xx status.
        """
        payload = {
            "grant_type": "authorization_code",
            "client_id": client_id,
            "client_secret": client_secret,
            "code": code,
            "redirect_uri": redirect_uri,
        }
        resp = requests.post(AUTH_URL, data=payload, timeout=30)
        if not resp.ok:
            raise MeliAuthError(
                f"Code exchange failed: {resp.status_code} {resp.text}",
                status_code=resp.status_code,
                response_body=resp.text,
            )
        return resp.json()

    # ─── Images ──────────────────────────────────────────────────────────

    def upload_image(self, image_path_or_url: str) -> dict:
        """Upload an image to MercadoLibre's image hosting (POST /pictures).

        Accepts either a local file path or an ``http(s)://`` URL; a URL is
        downloaded first and its bytes are re-uploaded.

        Returns:
            The decoded JSON response of the upload.
            NOTE(review): the previous docstring says this is the ML picture
            dict with 'id' and 'secure_url' / 'url' keys — confirm against the
            ML pictures API.

        Raises:
            MeliError: The download failed, the local file does not exist, or
                the upload was rejected.
            MeliAuthError: The upload is still unauthorized after the optional
                token refresh.
        """
        import mimetypes
        import os

        if image_path_or_url.startswith(("http://", "https://")):
            # NOTE(review): the URL is fetched from this host; callers should
            # only pass URLs they trust.
            img_resp = requests.get(image_path_or_url, timeout=30)
            if not img_resp.ok:
                raise MeliError(
                    f"Failed to download image from {image_path_or_url}: {img_resp.status_code}"
                )
            file_bytes = img_resp.content
            content_type = img_resp.headers.get("Content-Type", "image/jpeg")
            filename = "image.jpg"
        else:
            if not os.path.exists(image_path_or_url):
                raise MeliError(f"Image file not found: {image_path_or_url}")
            with open(image_path_or_url, "rb") as f:
                file_bytes = f.read()
            filename = os.path.basename(image_path_or_url)
            # Derive the MIME type from the extension so PNG/WebP/... files
            # are not mislabelled; fall back to the previous JPEG default.
            content_type = mimetypes.guess_type(filename)[0] or "image/jpeg"

        upload_url = f"{BASE_URL}/pictures"
        # The payload is plain bytes, so the same dict can be re-sent on retry.
        files = {"file": (filename, file_bytes, content_type)}
        resp = requests.post(
            upload_url, files=files, headers=self._auth_headers(), timeout=60
        )
        if resp.status_code == 401 and self.refresh_token:
            self._refresh_token()
            resp = requests.post(
                upload_url, files=files, headers=self._auth_headers(), timeout=60
            )
        if resp.status_code == 401:
            raise MeliAuthError(
                "Unauthorized. Token may be expired.",
                status_code=401,
                response_body=resp.text,
            )
        if not resp.ok:
            raise MeliError(
                f"Image upload failed: {resp.status_code} {resp.text}",
                status_code=resp.status_code,
                response_body=resp.text,
            )
        return resp.json()

    def get_listing_price(
        self, site_id: str, price: float, listing_type_id: str, category_id: str
    ) -> dict:
        """Get the exact fee / net amount for a given price, listing type and category.

        ML endpoint: GET /sites/{site_id}/listing_prices
        NOTE(review): the previous docstring says the result carries
        sale_fee_amount, net_amount, etc. — confirm against the ML API.
        """
        return self._request(
            "GET",
            f"/sites/{site_id}/listing_prices",
            params={
                "price": str(price),
                "listing_type_id": listing_type_id,
                "category_id": category_id,
            },
        )

    # ─── Items (listings) ────────────────────────────────────────────────

    def validate_item(self, payload: dict) -> dict:
        """Validate an item payload without creating it."""
        return self._request("POST", "/items/validate", json_payload=payload)

    def create_item(self, payload: dict) -> dict:
        """Create a listing from ``payload`` (POST /items)."""
        return self._request("POST", "/items", json_payload=payload)

    def update_item(self, item_id: str, payload: dict) -> dict:
        """Send ``payload`` as an update to an existing listing (PUT /items/{id})."""
        return self._request("PUT", f"/items/{item_id}", json_payload=payload)

    def get_item(self, item_id: str) -> dict:
        """Fetch a single listing (GET /items/{id})."""
        return self._request("GET", f"/items/{item_id}")

    def get_user_items(
        self,
        user_id: str,
        status: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> dict:
        """Get one page of items published by a seller.

        ML endpoint: GET /users/{user_id}/items/search
        ``status`` is only sent as a filter when it is truthy.
        """
        params = {"limit": limit, "offset": offset}
        if status:
            params["status"] = status
        return self._request("GET", f"/users/{user_id}/items/search", params=params)

    def pause_item(self, item_id: str) -> dict:
        """Set the listing's status to ``paused``."""
        return self.update_item(item_id, {"status": "paused"})

    def activate_item(self, item_id: str) -> dict:
        """Set the listing's status to ``active``."""
        return self.update_item(item_id, {"status": "active"})

    def close_item(self, item_id: str) -> dict:
        """Set the listing's status to ``closed``."""
        return self.update_item(item_id, {"status": "closed"})

    # ─── Questions & Answers ─────────────────────────────────────────────

    def get_questions(
        self,
        item_id: str,
        status: Optional[str] = None,
        offset: int = 0,
        limit: int = 50,
    ) -> dict:
        """Search questions for an item (GET /questions/search).

        ``status`` is only sent as a filter when it is truthy.
        """
        params = {"item_id": item_id, "offset": offset, "limit": limit}
        if status:
            params["status"] = status
        return self._request("GET", "/questions/search", params=params)

    def get_question(self, question_id: str) -> dict:
        """Fetch a single question (GET /questions/{id})."""
        return self._request("GET", f"/questions/{question_id}")

    def answer_question(self, question_id: str, text: str) -> dict:
        """Post ``text`` as the answer to a question (POST /answers)."""
        return self._request(
            "POST", "/answers", json_payload={"question_id": question_id, "text": text}
        )

    def delete_question(self, question_id: str) -> dict:
        """Delete a question (DELETE /questions/{id})."""
        return self._request("DELETE", f"/questions/{question_id}")

    # ─── Categories ──────────────────────────────────────────────────────

    def get_category(self, category_id: str) -> dict:
        """Fetch a category (GET /categories/{id})."""
        return self._request("GET", f"/categories/{category_id}")

    def search_categories(self, site_id: str, query: str) -> dict:
        """Look up categories matching ``query`` on the given site."""
        # ML does not have a direct category search; we use the predictor
        return self._request(
            "GET",
            f"/sites/{site_id}/domain_discovery/search",
            params={"q": query},
        )

    def get_category_attributes(self, category_id: str) -> list:
        """Fetch the attributes of a category (GET /categories/{id}/attributes)."""
        return self._request("GET", f"/categories/{category_id}/attributes")

    def get_shipping_preferences(self, user_id: str) -> dict:
        """Fetch a user's shipping preferences (GET /users/{id}/shipping_preferences)."""
        return self._request("GET", f"/users/{user_id}/shipping_preferences")

    # ─── Orders ──────────────────────────────────────────────────────────

    def get_orders(
        self,
        seller_id: str,
        status: Optional[str] = None,
        date_from: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> dict:
        """Search a seller's orders (GET /orders/search).

        ``status`` is sent as ``order.status`` and ``date_from`` as
        ``order.date_created.from``; each is only sent when truthy.
        """
        params = {"seller": seller_id, "limit": limit, "offset": offset}
        if status:
            params["order.status"] = status
        if date_from:
            params["order.date_created.from"] = date_from
        return self._request("GET", "/orders/search", params=params)

    def get_order(self, order_id: str) -> dict:
        """Fetch a single order (GET /orders/{id})."""
        return self._request("GET", f"/orders/{order_id}")

    # ─── Shipments ───────────────────────────────────────────────────────

    def get_shipment(self, shipment_id: str) -> dict:
        """Fetch a single shipment (GET /shipments/{id})."""
        return self._request("GET", f"/shipments/{shipment_id}")

    def mark_ready_to_ship(self, shipment_id: str) -> dict:
        """POST an empty JSON body to /shipments/{id}/dispatch."""
        return self._request(
            "POST",
            f"/shipments/{shipment_id}/dispatch",
            json_payload={},
        )

    # ─── Notifications / Webhooks validation ─────────────────────────────

    @staticmethod
    def validate_webhook_signature(
        secret: str, data: bytes, signature_header: str
    ) -> bool:
        """Validate MercadoLibre webhook signature.

        Expected header shape: ``X-Signature: sha256=<hex_hmac>``. Everything
        after the first ``=`` is compared, in constant time, with the
        HMAC-SHA256 of ``data`` keyed by ``secret``.
        NOTE(review): confirm this header format against the current ML
        notification docs.

        Returns:
            True when the digests match. False for a missing or malformed
            header, a mismatch, or a header containing non-ASCII characters.
        """
        import hashlib
        import hmac

        if not signature_header or "=" not in signature_header:
            return False
        _, received_hex = signature_header.split("=", 1)
        # Hex digests are case-insensitive; tolerate upper-case senders and
        # stray whitespace around the value.
        received_hex = received_hex.strip().lower()
        computed = hmac.new(secret.encode(), data, hashlib.sha256).hexdigest()
        try:
            return hmac.compare_digest(computed, received_hex)
        except TypeError:
            # compare_digest rejects non-ASCII str input; such a header can
            # never be a valid hex digest.
            return False