Files
Autoparts-DB/pos/services/marketplace_external_service.py
consultoria-as 584cc385b9 feat: add custom/not_specified shipping modes with cost input for ML publish
- build_item_payload supports shipping_cost for custom mode with costs array
- Add shipping mode selector: me2, custom, not_specified
- Show shipping cost input when custom is selected
- Backend passes shipping_cost through custom_data to payload builder
2026-05-26 06:32:21 +00:00

1205 lines
42 KiB
Python

"""Business logic for MercadoLibre external marketplace integration.
Depends on:
- meli_service.py (HTTP client)
- pos_engine.py (sale creation)
- inventory_engine.py (stock queries)
- image_service.py (image URLs)
"""
import json
import logging
from typing import Optional
from decimal import Decimal
from services.meli_service import MeliService, MeliError, MeliAuthError
from services.pos_engine import process_sale, calculate_totals
from services.inventory_engine import get_stock, get_stock_bulk
logger = logging.getLogger(__name__)
# ═══════════════════════════════════════════════════════════════════════════
# CONFIG HELPERS
# ═══════════════════════════════════════════════════════════════════════════
# tenant_config keys that get_meli_config() reads for the MercadoLibre
# integration. Order is preserved from the original definition.
MELI_CONFIG_KEYS = [
    # Tokens and account identity
    "meli_access_token",
    "meli_refresh_token",
    "meli_user_id",
    "meli_site_id",
    # Feature flags (stored as text, normalized to bool by get_meli_config)
    "meli_enabled",
    "meli_auto_publish",
    # Sync intervals in minutes (normalized to int by get_meli_config)
    "meli_sync_interval_min",
    "meli_order_sync_interval_min",
    # Publishing defaults
    "meli_default_category_id",
    "meli_shipping_mode",
    # Application credentials handed to MeliService
    "meli_client_id",
    "meli_client_secret",
]
def _get_config_value(cur, key: str, default=None):
cur.execute("SELECT value FROM tenant_config WHERE key = %s", (key,))
row = cur.fetchone()
return row[0] if row else default
def get_meli_config(tenant_conn) -> dict:
    """Load the MercadoLibre settings stored in tenant_config.

    Every key in MELI_CONFIG_KEYS is looked up (absent keys become None).
    The two flags are then converted to booleans and the two sync intervals
    to integers, defaulting to 15 and 5 minutes when unset or empty.
    """
    cursor = tenant_conn.cursor()
    config = {name: _get_config_value(cursor, name) for name in MELI_CONFIG_KEYS}
    cursor.close()

    # Flags are stored as text; only a case-insensitive "true" enables them.
    for flag in ("meli_enabled", "meli_auto_publish"):
        config[flag] = (config.get(flag) or "").lower() == "true"

    # Intervals: fall back to the default when the stored value is falsy.
    for name, fallback in (
        ("meli_sync_interval_min", 15),
        ("meli_order_sync_interval_min", 5),
    ):
        config[name] = int(config.get(name) or fallback)

    return config
def save_meli_config(tenant_conn, updates: dict) -> None:
    """Insert or update MercadoLibre settings in tenant_config.

    Entries whose value is None are skipped; every other value is stored as
    its ``str()`` form. All writes are committed together at the end.
    """
    upsert_sql = """
        INSERT INTO tenant_config (key, value, updated_at)
        VALUES (%s, %s, NOW())
        ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()
        """
    cursor = tenant_conn.cursor()
    to_write = ((name, val) for name, val in updates.items() if val is not None)
    for name, val in to_write:
        cursor.execute(upsert_sql, (name, str(val)))
    tenant_conn.commit()
    cursor.close()
def delete_meli_config(tenant_conn) -> None:
    """Remove every MercadoLibre config key (``meli_`` prefix) from tenant_config.

    The underscore in the LIKE pattern is escaped. Unescaped, ``_`` matches
    any single character, so ``'meli_%'`` would also delete unrelated keys
    whose fifth character is not an underscore.
    """
    cur = tenant_conn.cursor()
    try:
        cur.execute(
            "DELETE FROM tenant_config WHERE key LIKE 'meli!_%' ESCAPE '!'"
        )
        tenant_conn.commit()
    finally:
        # Release the cursor even when the DELETE or the commit fails.
        cur.close()
def _get_meli_service(cfg: dict) -> Optional[MeliService]:
    """Create a MeliService client from a config dict.

    Returns None when the config carries no access token; otherwise the
    token, refresh token and application credentials are handed to
    MeliService.
    """
    access_token = cfg.get("meli_access_token")
    if access_token:
        return MeliService(
            access_token=access_token,
            refresh_token=cfg.get("meli_refresh_token"),
            client_id=cfg.get("meli_client_id"),
            client_secret=cfg.get("meli_client_secret"),
        )
    return None
# ═══════════════════════════════════════════════════════════════════════════
# ITEM PAYLOAD BUILDER
# ═══════════════════════════════════════════════════════════════════════════
def _extract_meli_error(err: MeliError) -> str:
    """Return the most informative message available for a MeliError.

    The response body is parsed as JSON; its "message" (or "error") field is
    combined with any messages found under "cause". A few known
    account-configuration errors get an actionable hint appended. When the
    body is missing, cannot be processed, or yields no message, ``str(err)``
    is returned instead.
    """
    fallback = str(err)
    raw_body = err.response_body
    if not raw_body:
        return fallback

    # (lower-case fragment to look for, hint appended to the message)
    hints = (
        (
            "me2 adoption is mandatory",
            " | Debes activar MercadoEnvíos (ME2) en tu cuenta de MercadoLibre. Ve a Configuración > Envíos en el panel de vendedor de ML.",
        ),
        (
            "user has not mode",
            " | Tu cuenta no tiene configurado este modo de envío. Ve a Configuración > Envíos en MercadoLibre y completa la configuración de MercadoEnvíos.",
        ),
        (
            "mandatory free shipping",
            " | MercadoLibre está forzando envío gratis en tu cuenta. Esto es normal si estás en el programa de envíos gratuito. El artículo se publicará con envío gratis.",
        ),
    )

    try:
        parsed = json.loads(raw_body)
        message = parsed.get("message") or parsed.get("error")
        causes = parsed.get("cause", [])
        if causes and isinstance(causes, list):
            details = [c.get("message") for c in causes if c.get("message")]
            if details:
                prefix = message + " | " if message else ""
                message = prefix + "; ".join(details)
        if message:
            lowered = message.lower()
            for fragment, hint in hints:
                if fragment in lowered:
                    return message + hint
            return message
    except Exception:
        # Best effort: any problem with the body falls back to str(err).
        pass
    return fallback
def build_item_payload(
    inventory_row: dict,
    images: list[str],
    meli_category_id: str,
    price: float,
    stock: int,
    shipping_mode: str = "me2",
    listing_type_id: str = "gold_special",
    custom_title: str = None,
    extra_attributes: list = None,
    shipping_cost: float = None,
) -> dict:
    """Convert a Nexus inventory row into a MercadoLibre item payload.

    Args:
        inventory_row: Inventory data. "name" is required unless
            *custom_title* is given; "brand", "part_number" and
            "vehicle_compatibility" (list or JSON string) are optional.
        images: Image URLs; falsy entries are dropped.
        meli_category_id: Target category id.
        price: Listing price; converted to float and rounded to 2 decimals.
        stock: Available quantity; negative values are clamped to 0.
        shipping_mode: Value for ``shipping.mode``.
        listing_type_id: Listing type id.
        custom_title: Overrides the generated title when truthy.
        extra_attributes: Extra ``{"id", "value_name"}`` dicts; entries that
            are not dicts or lack either field are ignored.
        shipping_cost: Only used when *shipping_mode* is "custom".

    Returns:
        The payload dict.

    Raises:
        KeyError: if "name" is missing and no *custom_title* is supplied.
    """
    # brand/part_number are optional everywhere else in this function, so
    # read them with .get() here too instead of raising KeyError.
    title = custom_title or (
        f"{inventory_row['name']} "
        f"{inventory_row.get('brand') or ''} "
        f"{inventory_row.get('part_number') or ''}"
    ).strip()
    # ML title limit is 60 chars; truncate and mark the cut with an ellipsis.
    if len(title) > 60:
        title = title[:57] + "..."

    shipping_payload = {"mode": shipping_mode}
    if shipping_mode == "custom" and shipping_cost is not None:
        shipping_payload["local_pick_up"] = False
        shipping_payload["free_shipping"] = False
        shipping_payload["methods"] = []
        shipping_payload["costs"] = [{"description": "Envio", "cost": str(shipping_cost)}]

    attributes = []
    if inventory_row.get("brand"):
        attributes.append({"id": "BRAND", "value_name": inventory_row["brand"]})
    if inventory_row.get("part_number"):
        attributes.append({"id": "PART_NUMBER", "value_name": inventory_row["part_number"]})

    # Vehicle compatibility as attributes (only the first entry is used).
    vehicle_compat = inventory_row.get("vehicle_compatibility")
    if vehicle_compat and isinstance(vehicle_compat, str):
        try:
            vehicle_compat = json.loads(vehicle_compat)
        except ValueError:
            # Malformed JSON: publish without compatibility attributes.
            vehicle_compat = None
    if isinstance(vehicle_compat, list) and vehicle_compat:
        first = vehicle_compat[0]
        if isinstance(first, dict):
            if first.get("brand"):
                attributes.append({"id": "VEHICLE_MODEL", "value_name": first["brand"]})
            if first.get("model"):
                attributes.append({"id": "VEHICLE_MODEL_NAME", "value_name": first["model"]})

    # Extra attributes from user input (category requirements).
    for attr in extra_attributes or []:
        if isinstance(attr, dict) and attr.get("id") and attr.get("value_name"):
            attributes.append(attr)

    return {
        "title": title,
        "category_id": meli_category_id,
        "price": round(float(price), 2),
        "currency_id": "MXN",
        "available_quantity": max(int(stock), 0),
        "buying_mode": "buy_it_now",
        "listing_type_id": listing_type_id,
        "condition": "new",
        "pictures": [{"source": url} for url in images if url],
        "shipping": shipping_payload,
        "attributes": attributes,
    }
def check_meli_shipping_config(svc: MeliService, cfg: dict) -> dict:
    """Check the seller's shipping preferences against their mandatory modes.

    Returns ``{"ok": True, ...}`` when the check passes or when the
    preferences cannot be fetched (a fetch failure never blocks the caller).
    Returns ``{"ok": False, "error": ...}`` when no ML user id is configured
    or when none of the mandatory modes is among the available modes.
    """
    seller_id = cfg.get("meli_user_id")
    if not seller_id:
        return {"ok": False, "error": "Usuario de ML no configurado"}

    try:
        preferences = svc.get_shipping_preferences(str(seller_id))
        available = preferences.get("modes", [])
        required = preferences.get("mandatory_mode_for_user", [])
        # Short-circuits on an empty/None "required" before iterating it.
        none_available = bool(required) and all(
            mode not in available for mode in required
        )
        if none_available:
            return {
                "ok": False,
                "error": f"Tu cuenta requiere obligatoriamente los modos de envío: {', '.join(required)}. Actualmente solo tienes: {', '.join(available)}. Configúralos en el panel de vendedor de MercadoLibre.",
                "available_modes": available,
                "mandatory_modes": required,
            }
        return {"ok": True, "available_modes": available, "mandatory_modes": required}
    except MeliError as api_error:
        # Don't block on preference fetch failure.
        logger.warning("Failed to fetch shipping preferences: %s", api_error)
        return {"ok": True}
    except Exception as unexpected:
        logger.warning("Failed to fetch shipping preferences: %s", unexpected)
        return {"ok": True}
# ═══════════════════════════════════════════════════════════════════════════
# LISTINGS CRUD
# ═══════════════════════════════════════════════════════════════════════════
def check_inventory_ml_status(tenant_conn, inventory_ids: list[int]) -> dict:
    """Run the local pre-flight checks for publishing items to MercadoLibre.

    No call to MercadoLibre is made. For each requested id the result states
    whether the inventory row exists and, if so, whether it has an image,
    stock and a price, the generated title, and any active 'mercadolibre'
    listing already recorded for it.

    Returns:
        ``{"items": [...]}`` with one entry per id, in request order.
    """
    db = tenant_conn.cursor()
    db.execute(
        """
        SELECT id, part_number, name, brand, price_1, image_url
        FROM inventory
        WHERE id = ANY(%s)
        """,
        (inventory_ids,),
    )
    inventory_by_id = {rec[0]: rec for rec in db.fetchall()}

    stock_by_id = get_stock_bulk(tenant_conn, branch_id=None)

    # Existing active listings for the requested items.
    db.execute(
        """
        SELECT inventory_id, external_item_id, external_status, external_permalink
        FROM marketplace_listings
        WHERE inventory_id = ANY(%s) AND channel = 'mercadolibre' AND is_active = true
        """,
        (inventory_ids,),
    )
    active_listings = {}
    for rec in db.fetchall():
        active_listings[rec[0]] = {
            "external_item_id": rec[1],
            "status": rec[2],
            "permalink": rec[3],
        }
    db.close()

    def describe(inv_id):
        rec = inventory_by_id.get(inv_id)
        if not rec:
            return {"inventory_id": inv_id, "exists": False}

        unit_price = float(rec[4]) if rec[4] else 0
        on_hand = stock_by_id.get(inv_id, 0)
        picture = rec[5]

        # Same title rule as the payload builder: name, brand, part number,
        # cut to 60 characters.
        generated = f"{rec[2]} {rec[3] or ''} {rec[1] or ''}".strip()
        if len(generated) > 60:
            generated = generated[:57] + "..."

        current = active_listings.get(inv_id)
        return {
            "inventory_id": inv_id,
            "exists": True,
            "title": generated,
            "has_image": bool(picture),
            "has_stock": on_hand > 0,
            "has_price": unit_price > 0,
            "price": unit_price,
            "stock": on_hand,
            "image_url": picture,
            "already_published": current is not None,
            "existing_listing": current,
        }

    return {"items": [describe(inv_id) for inv_id in inventory_ids]}
def validate_items(
    tenant_conn,
    inventory_ids: list[int],
    meli_category_id: str,
    listing_type_id: str = "gold_special",
    shipping_mode: str = "me2",
    custom_data: dict = None,
) -> dict:
    """Validate items against ML /items/validate without creating them.

    Each item first passes local checks (active, in stock, priced, has an
    image) and is then sent to ``svc.validate_item``.

    Args:
        tenant_conn: Tenant database connection.
        inventory_ids: Inventory ids to validate.
        meli_category_id: Target ML category id.
        listing_type_id: ML listing type id.
        shipping_mode: Shipping mode placed in the payload.
        custom_data: Optional overrides. "titles", "attributes", "prices"
            and "stocks" are dicts keyed by ``str(inventory_id)``;
            "shipping_cost" applies to the whole batch.

    Returns:
        ``{"valid": [...], "invalid": [...]}`` with one entry per item. A
        shipping-configuration failure yields a single invalid entry whose
        inventory_id is "config".

    Raises:
        ValueError: if MercadoLibre is not configured.
    """
    cfg = get_meli_config(tenant_conn)
    svc = _get_meli_service(cfg)
    if not svc:
        raise ValueError("MercadoLibre not configured")
    shipping_check = check_meli_shipping_config(svc, cfg)
    if not shipping_check["ok"]:
        return {"valid": [], "invalid": [{"inventory_id": "config", "error": shipping_check["error"]}]}

    # The cursor is only needed for the batch fetch; close it right away so
    # it is neither held across the network calls nor leaked on error.
    cur = tenant_conn.cursor()
    try:
        cur.execute(
            """
            SELECT id, part_number, name, brand, price_1, vehicle_compatibility,
                   image_url, unit, is_active
            FROM inventory
            WHERE id = ANY(%s) AND is_active = true
            """,
            (inventory_ids,),
        )
        rows = {r[0]: r for r in cur.fetchall()}
    finally:
        cur.close()

    stock_map = get_stock_bulk(tenant_conn, branch_id=None)
    custom_data = custom_data or {}
    results = {"valid": [], "invalid": []}

    for inv_id in inventory_ids:
        row = rows.get(inv_id)
        if not row:
            results["invalid"].append({"inventory_id": inv_id, "error": "No encontrado o inactivo"})
            continue
        inv = {
            "id": row[0],
            "part_number": row[1],
            "name": row[2],
            "brand": row[3],
            "price_1": float(row[4]) if row[4] else 0,
            "vehicle_compatibility": row[5],
            "image_url": row[6],
            "unit": row[7],
        }
        stock = stock_map.get(inv_id, 0)
        if stock <= 0:
            results["invalid"].append({"inventory_id": inv_id, "error": "Sin stock disponible"})
            continue
        if inv["price_1"] <= 0:
            results["invalid"].append({"inventory_id": inv_id, "error": "El precio debe ser mayor a 0"})
            continue
        images = [inv["image_url"]] if inv.get("image_url") else []
        if not images:
            results["invalid"].append({"inventory_id": inv_id, "error": "El producto no tiene imagen"})
            continue

        item_key = str(inv_id)
        title = (custom_data.get("titles") or {}).get(item_key)
        extra_attrs = (custom_data.get("attributes") or {}).get(item_key)
        price = (custom_data.get("prices") or {}).get(item_key, inv["price_1"])
        item_stock = (custom_data.get("stocks") or {}).get(item_key, stock)
        shipping_cost = custom_data.get("shipping_cost")

        try:
            # Built inside the try so that a malformed override (e.g. a
            # non-numeric price) is reported for this item only instead of
            # aborting the whole batch.
            payload = build_item_payload(
                inv, images, meli_category_id, price, item_stock,
                shipping_mode=shipping_mode, listing_type_id=listing_type_id,
                custom_title=title, extra_attributes=extra_attrs,
                shipping_cost=shipping_cost,
            )
            validation = svc.validate_item(payload)
            if validation.get("status") == "valid":
                results["valid"].append({"inventory_id": inv_id, "validation": validation})
            else:
                errors = validation.get("validation_errors", [])
                error_msgs = [f"{e.get('field', '')}: {e.get('message', '')}" for e in errors]
                results["invalid"].append({
                    "inventory_id": inv_id,
                    "error": " | ".join(error_msgs) or "Validación fallida",
                    "validation": validation,
                })
        except MeliError as e:
            results["invalid"].append({"inventory_id": inv_id, "error": _extract_meli_error(e)})
        except Exception as e:
            results["invalid"].append({"inventory_id": inv_id, "error": str(e)})

    return results
def publish_items(
    tenant_conn,
    inventory_ids: list[int],
    meli_category_id: str,
    listing_type_id: str = "gold_special",
    shipping_mode: str = "me2",
    custom_data: dict = None,
) -> dict:
    """Publish one or more inventory items to MercadoLibre.

    Each item passes local checks (active, in stock, priced, has an image),
    is created through ``svc.create_item`` and then recorded in
    marketplace_listings. Every successful item is committed on its own; a
    failing item is rolled back and reported without stopping the batch.

    Args:
        tenant_conn: Tenant database connection.
        inventory_ids: Inventory ids to publish.
        meli_category_id: Target ML category id.
        listing_type_id: ML listing type id.
        shipping_mode: Shipping mode placed in the payload.
        custom_data: Optional overrides. "titles", "attributes", "prices"
            and "stocks" are dicts keyed by ``str(inventory_id)``;
            "shipping_cost" applies to the whole batch.

    Returns:
        ``{"success": [...], "failed": [...]}`` with one entry per item. A
        shipping-configuration failure yields a single failed entry whose
        inventory_id is "config".

    Raises:
        ValueError: if MercadoLibre is not configured.
    """
    cfg = get_meli_config(tenant_conn)
    svc = _get_meli_service(cfg)
    if not svc:
        raise ValueError("MercadoLibre not configured")
    shipping_check = check_meli_shipping_config(svc, cfg)
    if not shipping_check["ok"]:
        return {"success": [], "failed": [{"inventory_id": "config", "error": shipping_check["error"]}]}

    custom_data = custom_data or {}
    results = {"success": [], "failed": []}
    cur = tenant_conn.cursor()
    try:
        # Batch fetch inventory rows
        cur.execute(
            """
            SELECT id, part_number, name, brand, price_1, vehicle_compatibility,
                   image_url, unit, is_active
            FROM inventory
            WHERE id = ANY(%s) AND is_active = true
            """,
            (inventory_ids,),
        )
        rows = {r[0]: r for r in cur.fetchall()}
        # Batch fetch stock
        stock_map = get_stock_bulk(tenant_conn, branch_id=None)

        for inv_id in inventory_ids:
            row = rows.get(inv_id)
            if not row:
                results["failed"].append({"inventory_id": inv_id, "error": "Not found or inactive"})
                continue
            inv = {
                "id": row[0],
                "part_number": row[1],
                "name": row[2],
                "brand": row[3],
                "price_1": float(row[4]) if row[4] else 0,
                "vehicle_compatibility": row[5],
                "image_url": row[6],
                "unit": row[7],
            }
            stock = stock_map.get(inv_id, 0)
            if stock <= 0:
                results["failed"].append({"inventory_id": inv_id, "error": "Sin stock disponible"})
                continue
            if inv["price_1"] <= 0:
                results["failed"].append({"inventory_id": inv_id, "error": "El precio debe ser mayor a 0"})
                continue
            # Build image list
            images = [inv["image_url"]] if inv.get("image_url") else []
            if not images:
                results["failed"].append({"inventory_id": inv_id, "error": "El producto no tiene imagen. ML requiere imagen para publicar."})
                continue

            item_key = str(inv_id)
            title = (custom_data.get("titles") or {}).get(item_key)
            extra_attrs = (custom_data.get("attributes") or {}).get(item_key)
            price = (custom_data.get("prices") or {}).get(item_key, inv["price_1"])
            item_stock = (custom_data.get("stocks") or {}).get(item_key, stock)
            shipping_cost = custom_data.get("shipping_cost")

            payload = None
            try:
                # Built inside the try: a malformed override must fail this
                # item only, not abort a batch whose earlier items are
                # already live on MercadoLibre.
                payload = build_item_payload(
                    inv, images, meli_category_id, price, item_stock,
                    shipping_mode=shipping_mode, listing_type_id=listing_type_id,
                    custom_title=title, extra_attributes=extra_attrs,
                    shipping_cost=shipping_cost,
                )
                ml_item = svc.create_item(payload)
                external_item_id = ml_item.get("id")
                permalink = ml_item.get("permalink")
                # Persist listing
                cur.execute(
                    """
                    INSERT INTO marketplace_listings
                        (inventory_id, channel, external_item_id, external_status,
                         external_permalink, title, meli_category_id, publish_price,
                         last_sync_at, sync_errors, is_active)
                    VALUES (%s, 'mercadolibre', %s, 'active', %s, %s, %s, %s, NOW(), NULL, true)
                    ON CONFLICT (inventory_id, channel) WHERE is_active = true
                    DO UPDATE SET
                        external_item_id = EXCLUDED.external_item_id,
                        external_status = EXCLUDED.external_status,
                        external_permalink = EXCLUDED.external_permalink,
                        title = EXCLUDED.title,
                        meli_category_id = EXCLUDED.meli_category_id,
                        publish_price = EXCLUDED.publish_price,
                        last_sync_at = NOW(),
                        sync_errors = NULL,
                        is_active = true
                    """,
                    (
                        inv_id,
                        external_item_id,
                        permalink,
                        payload["title"],
                        meli_category_id,
                        # Record the price that was actually sent to ML,
                        # which differs from price_1 when a custom price
                        # override was supplied.
                        payload["price"],
                    ),
                )
                tenant_conn.commit()
                results["success"].append(
                    {"inventory_id": inv_id, "external_item_id": external_item_id, "permalink": permalink}
                )
            except MeliError as e:
                tenant_conn.rollback()
                err_msg = _extract_meli_error(e)
                logger.warning(
                    "ML publish failed for inventory_id=%s: %s | payload=%s | response=%s",
                    inv_id, err_msg, json.dumps(payload), e.response_body,
                )
                results["failed"].append({"inventory_id": inv_id, "error": err_msg})
            except Exception as e:
                tenant_conn.rollback()
                logger.exception("Unexpected error publishing inventory_id=%s", inv_id)
                results["failed"].append({"inventory_id": inv_id, "error": str(e)})
    finally:
        cur.close()
    return results
def get_listings(tenant_conn, page: int = 1, per_page: int = 50, status: str = None):
    """Return one page of marketplace listings joined with their inventory row.

    Args:
        tenant_conn: Tenant database connection.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; values below 1 are treated as 1.
        status: When given, only listings with this external_status.

    Returns:
        ``{"items": [...], "total": int, "page": int, "per_page": int}``,
        newest listings first. "page" and "per_page" are the effective
        (clamped) values.
    """
    # A page or size below 1 would produce a negative OFFSET/LIMIT, which
    # the database rejects; clamp instead of failing.
    page = max(int(page), 1)
    per_page = max(int(per_page), 1)

    where = ["1=1"]
    params = []
    if status:
        where.append("external_status = %s")
        params.append(status)
    # Only fixed fragments are interpolated; values stay bound parameters.
    where_sql = " AND ".join(where)

    cur = tenant_conn.cursor()
    try:
        cur.execute(f"SELECT COUNT(*) FROM marketplace_listings WHERE {where_sql}", params)
        total = cur.fetchone()[0]
        cur.execute(
            f"""
            SELECT l.id, l.inventory_id, l.external_item_id, l.external_status,
                   l.external_permalink, l.title, l.meli_category_id, l.publish_price,
                   l.last_sync_at, l.sync_errors, l.is_active, l.created_at,
                   i.part_number, i.name, i.price_1, i.brand
            FROM marketplace_listings l
            JOIN inventory i ON i.id = l.inventory_id
            WHERE {where_sql}
            ORDER BY l.created_at DESC
            LIMIT %s OFFSET %s
            """,
            params + [per_page, (page - 1) * per_page],
        )
        rows = cur.fetchall()
    finally:
        cur.close()

    items = [
        {
            "id": r[0],
            "inventory_id": r[1],
            "external_item_id": r[2],
            "external_status": r[3],
            "external_permalink": r[4],
            "title": r[5],
            "meli_category_id": r[6],
            "publish_price": float(r[7]) if r[7] else None,
            "last_sync_at": str(r[8]) if r[8] else None,
            "sync_errors": r[9],
            "is_active": r[10],
            "created_at": str(r[11]),
            "part_number": r[12],
            "inventory_name": r[13],
            "current_price": float(r[14]) if r[14] else None,
            "brand": r[15],
        }
        for r in rows
    ]
    return {"items": items, "total": total, "page": page, "per_page": per_page}
def sync_listing(tenant_conn, listing_id: int) -> dict:
    """Force a manual sync of stock/price for a single listing.

    Reads the current inventory price and stock, pushes both to MercadoLibre
    through ``svc.update_item`` and records the outcome on the listing row.

    Returns:
        ``{"ok": True, "price": ..., "stock": ...}`` on success.

    Raises:
        ValueError: if MercadoLibre is not configured or no active listing
            has this id.
        MeliError: re-raised after the error text (first 500 characters) has
            been stored in ``sync_errors``.
    """
    cfg = get_meli_config(tenant_conn)
    svc = _get_meli_service(cfg)
    if not svc:
        raise ValueError("MercadoLibre not configured")

    cur = tenant_conn.cursor()
    try:
        cur.execute(
            """
            SELECT id, inventory_id, external_item_id, meli_category_id
            FROM marketplace_listings WHERE id = %s AND is_active = true
            """,
            (listing_id,),
        )
        row = cur.fetchone()
        if not row:
            raise ValueError("Listing not found")
        inventory_id, external_item_id = row[1], row[2]

        # Get current stock/price
        cur.execute("SELECT price_1 FROM inventory WHERE id = %s", (inventory_id,))
        price_row = cur.fetchone()
        current_price = float(price_row[0]) if price_row and price_row[0] else 0
        stock_map = get_stock_bulk(tenant_conn, branch_id=None)
        current_stock = stock_map.get(inventory_id, 0)

        try:
            svc.update_item(
                external_item_id,
                {"price": round(current_price, 2), "available_quantity": max(current_stock, 0)},
            )
        except MeliError as e:
            # Discard anything pending, then persist the failure so it is
            # visible on the listing before propagating the error.
            tenant_conn.rollback()
            cur.execute(
                "UPDATE marketplace_listings SET sync_errors = %s WHERE id = %s",
                (str(e)[:500], listing_id),
            )
            tenant_conn.commit()
            raise

        cur.execute(
            """
            UPDATE marketplace_listings
            SET last_sync_at = NOW(), sync_errors = NULL, publish_price = %s
            WHERE id = %s
            """,
            (current_price, listing_id),
        )
        tenant_conn.commit()
        return {"ok": True, "price": current_price, "stock": current_stock}
    finally:
        # Previously the cursor stayed open on any non-MeliError failure.
        cur.close()
def pause_listing(tenant_conn, listing_id: int) -> dict:
    """Pause a listing on MercadoLibre and mark it 'paused' locally.

    Returns:
        ``{"ok": True, "status": "paused"}``.

    Raises:
        ValueError: if MercadoLibre is not configured or the listing id is
            unknown. Errors raised by ``svc.pause_item`` propagate and leave
            the local status untouched.
    """
    cfg = get_meli_config(tenant_conn)
    svc = _get_meli_service(cfg)
    if not svc:
        raise ValueError("MercadoLibre not configured")

    cur = tenant_conn.cursor()
    try:
        cur.execute(
            "SELECT external_item_id FROM marketplace_listings WHERE id = %s",
            (listing_id,),
        )
        row = cur.fetchone()
        if not row:
            raise ValueError("Listing not found")
        svc.pause_item(row[0])
        cur.execute(
            "UPDATE marketplace_listings SET external_status = 'paused' WHERE id = %s",
            (listing_id,),
        )
        tenant_conn.commit()
    finally:
        # Close the cursor even when the ML call or the UPDATE raises.
        cur.close()
    return {"ok": True, "status": "paused"}
def activate_listing(tenant_conn, listing_id: int) -> dict:
    """Activate a listing on MercadoLibre and mark it 'active' locally.

    Returns:
        ``{"ok": True, "status": "active"}``.

    Raises:
        ValueError: if MercadoLibre is not configured or the listing id is
            unknown. Errors raised by ``svc.activate_item`` propagate and
            leave the local status untouched.
    """
    cfg = get_meli_config(tenant_conn)
    svc = _get_meli_service(cfg)
    if not svc:
        raise ValueError("MercadoLibre not configured")

    cur = tenant_conn.cursor()
    try:
        cur.execute(
            "SELECT external_item_id FROM marketplace_listings WHERE id = %s",
            (listing_id,),
        )
        row = cur.fetchone()
        if not row:
            raise ValueError("Listing not found")
        svc.activate_item(row[0])
        cur.execute(
            "UPDATE marketplace_listings SET external_status = 'active' WHERE id = %s",
            (listing_id,),
        )
        tenant_conn.commit()
    finally:
        # Close the cursor even when the ML call or the UPDATE raises.
        cur.close()
    return {"ok": True, "status": "active"}
def close_listing(tenant_conn, listing_id: int) -> dict:
    """Close a listing on MercadoLibre and deactivate it locally.

    The local row is set to external_status 'closed' and is_active false.

    Returns:
        ``{"ok": True, "status": "closed"}``.

    Raises:
        ValueError: if MercadoLibre is not configured or the listing id is
            unknown. Errors raised by ``svc.close_item`` propagate and leave
            the local row untouched.
    """
    cfg = get_meli_config(tenant_conn)
    svc = _get_meli_service(cfg)
    if not svc:
        raise ValueError("MercadoLibre not configured")

    cur = tenant_conn.cursor()
    try:
        cur.execute(
            "SELECT external_item_id FROM marketplace_listings WHERE id = %s",
            (listing_id,),
        )
        row = cur.fetchone()
        if not row:
            raise ValueError("Listing not found")
        svc.close_item(row[0])
        cur.execute(
            "UPDATE marketplace_listings SET external_status = 'closed', is_active = false WHERE id = %s",
            (listing_id,),
        )
        tenant_conn.commit()
    finally:
        # Close the cursor even when the ML call or the UPDATE raises.
        cur.close()
    return {"ok": True, "status": "closed"}
# ═══════════════════════════════════════════════════════════════════════════
# ORDERS
# ═══════════════════════════════════════════════════════════════════════════
def fetch_and_save_orders(tenant_conn, date_from: Optional[str] = None) -> dict:
    """Pull orders from ML and upsert into marketplace_orders.

    Orders with status "paid" are listed through ``svc.get_orders``; the
    full detail of each one is fetched, upserted by external_order_id, and
    its items are replaced. Everything is committed once at the end.

    Args:
        tenant_conn: Tenant database connection.
        date_from: Passed through to ``svc.get_orders``.

    Returns:
        ``{"processed": <orders listed by ML>, "updated": <orders upserted>}``.

    Raises:
        ValueError: if MercadoLibre (token or user id) is not configured.
    """
    cfg = get_meli_config(tenant_conn)
    svc = _get_meli_service(cfg)
    user_id = cfg.get("meli_user_id")
    if not svc or not user_id:
        raise ValueError("MercadoLibre not configured")

    ml_resp = svc.get_orders(user_id, status="paid", date_from=date_from)
    orders = ml_resp.get("results", [])
    updated = 0

    cur = tenant_conn.cursor()
    try:
        for order_summary in orders:
            order_id = order_summary.get("id")
            if not order_id:
                continue
            # Fetch full order detail
            try:
                full = svc.get_order(str(order_id))
            except MeliError as e:
                # Skip this order but leave a trace instead of dropping it
                # silently.
                logger.warning("Failed to fetch ML order %s: %s", order_id, e)
                continue

            external_status = full.get("status")
            # Use `or` fallbacks: these fields may be present but null, in
            # which case .get(key, default) would return None and the
            # chained lookups below would crash.
            buyer = full.get("buyer") or {}
            shipping = full.get("shipping") or {}
            order_items = full.get("order_items") or []

            # Build shipping address JSON
            shipping_address = None
            if shipping:
                shipping_address = {
                    "id": shipping.get("id"),
                    "status": shipping.get("status"),
                    "tracking_number": shipping.get("tracking_number"),
                    "shipping_method": (shipping.get("shipping_option") or {}).get("name"),
                }
            total_amount = full.get("total_amount")
            shipping_cost = shipping.get("cost") if shipping else None
            # Store NULL rather than the literal string "None" when the
            # shipping block carries no id.
            shipping_id = shipping.get("id")
            meli_shipping_id = str(shipping_id) if shipping_id is not None else None
            buyer_name = (buyer.get("first_name") or "") + " " + (buyer.get("last_name") or "")

            # Upsert order
            cur.execute(
                """
                INSERT INTO marketplace_orders
                    (channel, external_order_id, external_status, buyer_name,
                     buyer_email, buyer_phone, buyer_nickname, shipping_address,
                     total_amount, shipping_cost, meli_shipping_id, raw_json, updated_at)
                VALUES ('mercadolibre', %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
                ON CONFLICT (external_order_id) DO UPDATE SET
                    external_status = EXCLUDED.external_status,
                    buyer_name = EXCLUDED.buyer_name,
                    buyer_email = EXCLUDED.buyer_email,
                    buyer_phone = EXCLUDED.buyer_phone,
                    shipping_address = EXCLUDED.shipping_address,
                    total_amount = EXCLUDED.total_amount,
                    shipping_cost = EXCLUDED.shipping_cost,
                    meli_shipping_id = EXCLUDED.meli_shipping_id,
                    raw_json = EXCLUDED.raw_json,
                    updated_at = NOW()
                RETURNING id
                """,
                (
                    str(order_id),
                    external_status,
                    buyer_name,
                    buyer.get("email"),
                    (buyer.get("phone") or {}).get("number"),
                    buyer.get("nickname"),
                    json.dumps(shipping_address) if shipping_address else None,
                    total_amount,
                    shipping_cost,
                    meli_shipping_id,
                    json.dumps(full),
                ),
            )
            row = cur.fetchone()
            mpo_id = row[0] if row else None
            if mpo_id:
                # Inserts and updates are not distinguished; both count as
                # "updated".
                updated += 1

            # Replace order items: clear the old ones and re-insert.
            if mpo_id and order_items:
                cur.execute(
                    "DELETE FROM marketplace_order_items WHERE marketplace_order_id = %s",
                    (mpo_id,),
                )
                for it in order_items:
                    item_data = it.get("item") or {}
                    cur.execute(
                        """
                        INSERT INTO marketplace_order_items
                            (marketplace_order_id, external_item_id, title,
                             quantity, unit_price, total_price)
                        VALUES (%s, %s, %s, %s, %s, %s)
                        """,
                        (
                            mpo_id,
                            item_data.get("id"),
                            item_data.get("title"),
                            it.get("quantity"),
                            it.get("unit_price"),
                            # NOTE(review): total_price is filled from
                            # "full_unit_price", not quantity * unit_price;
                            # kept as-is, confirm this is intended.
                            it.get("full_unit_price"),
                        ),
                    )
        tenant_conn.commit()
    finally:
        cur.close()
    return {"processed": len(orders), "updated": updated}
def get_orders(tenant_conn, page=1, per_page=50, status=None):
    """Return one page of marketplace orders, newest first.

    Args:
        tenant_conn: Tenant database connection.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; values below 1 are treated as 1.
        status: When given, only orders whose ``status`` column matches.

    Returns:
        ``{"items": [...], "total": int, "page": int, "per_page": int}``.
        "page" and "per_page" are the effective (clamped) values.
    """
    # A page or size below 1 would produce a negative OFFSET/LIMIT, which
    # the database rejects; clamp instead of failing.
    page = max(int(page), 1)
    per_page = max(int(per_page), 1)

    where = ["1=1"]
    params = []
    if status:
        where.append("status = %s")
        params.append(status)
    # Only fixed fragments are interpolated; values stay bound parameters.
    where_sql = " AND ".join(where)

    cur = tenant_conn.cursor()
    try:
        cur.execute(
            f"SELECT COUNT(*) FROM marketplace_orders WHERE {where_sql}",
            params,
        )
        total = cur.fetchone()[0]
        cur.execute(
            f"""
            SELECT o.id, o.external_order_id, o.external_status, o.buyer_name,
                   o.buyer_nickname, o.total_amount, o.status, o.created_at,
                   o.nexus_sale_id
            FROM marketplace_orders o
            WHERE {where_sql}
            ORDER BY o.created_at DESC
            LIMIT %s OFFSET %s
            """,
            params + [per_page, (page - 1) * per_page],
        )
        rows = cur.fetchall()
    finally:
        cur.close()

    items = [
        {
            "id": r[0],
            "external_order_id": r[1],
            "external_status": r[2],
            "buyer_name": r[3],
            "buyer_nickname": r[4],
            "total_amount": float(r[5]) if r[5] else None,
            "status": r[6],
            "created_at": str(r[7]),
            "nexus_sale_id": r[8],
        }
        for r in rows
    ]
    return {"items": items, "total": total, "page": page, "per_page": per_page}
def get_order_detail(tenant_conn, order_id: int) -> dict:
    """Load one marketplace order together with its items.

    Returns:
        The order as a dict (amounts as float or None, timestamps as str)
        with its line items under "items".

    Raises:
        ValueError: if no order has this id.
    """
    order_columns = (
        "id", "external_order_id", "external_status", "buyer_name",
        "buyer_email", "buyer_phone", "buyer_nickname", "shipping_address",
        "total_amount", "shipping_cost", "meli_shipping_id", "nexus_sale_id",
        "status", "notes", "raw_json", "created_at", "updated_at",
    )
    item_columns = (
        "id", "inventory_id", "external_item_id", "title", "quantity",
        "unit_price", "total_price",
    )

    def as_amount(value):
        # Falsy amounts (NULL or zero) are reported as None.
        return float(value) if value else None

    db = tenant_conn.cursor()
    db.execute(
        """
        SELECT id, external_order_id, external_status, buyer_name, buyer_email,
               buyer_phone, buyer_nickname, shipping_address, total_amount,
               shipping_cost, meli_shipping_id, nexus_sale_id, status, notes,
               raw_json, created_at, updated_at
        FROM marketplace_orders WHERE id = %s
        """,
        (order_id,),
    )
    header = db.fetchone()
    if not header:
        db.close()
        raise ValueError("Order not found")

    order = dict(zip(order_columns, header))
    for money_field in ("total_amount", "shipping_cost"):
        order[money_field] = as_amount(order[money_field])
    for stamp_field in ("created_at", "updated_at"):
        order[stamp_field] = str(order[stamp_field])

    db.execute(
        """
        SELECT id, inventory_id, external_item_id, title, quantity,
               unit_price, total_price
        FROM marketplace_order_items WHERE marketplace_order_id = %s
        """,
        (order_id,),
    )
    lines = []
    for record in db.fetchall():
        line = dict(zip(item_columns, record))
        line["unit_price"] = as_amount(line["unit_price"])
        line["total_price"] = as_amount(line["total_price"])
        lines.append(line)
    order["items"] = lines
    db.close()
    return order
def update_order_status(tenant_conn, order_id: int, new_status: str) -> dict:
    """Set the internal status of a marketplace order.

    Returns:
        ``{"ok": True, "status": new_status, "external_order_id": ...}``.

    Raises:
        ValueError: if *new_status* is not an allowed status (checked before
            touching the database) or if no order has this id.
    """
    allowed = {"pending", "confirmed", "packed", "shipped", "delivered", "cancelled", "rejected"}
    if new_status not in allowed:
        raise ValueError(f"Invalid status. Allowed: {allowed}")

    db = tenant_conn.cursor()
    db.execute(
        "UPDATE marketplace_orders SET status = %s, updated_at = NOW() WHERE id = %s RETURNING external_order_id",
        (new_status, order_id),
    )
    updated_row = db.fetchone()
    tenant_conn.commit()
    db.close()

    if updated_row:
        return {"ok": True, "status": new_status, "external_order_id": updated_row[0]}
    raise ValueError("Order not found")
# ═══════════════════════════════════════════════════════════════════════════
# CONVERT ORDER → SALE
# ═══════════════════════════════════════════════════════════════════════════
def convert_order_to_sale(
    tenant_conn, marketplace_order_id: int, employee_id: int = None, register_id: int = None
) -> dict:
    """Convert a marketplace order into a Nexus sale.

    1. Look up marketplace_order + items
    2. Map items to inventory_id (via marketplace_listings external_item_id)
    3. Find or create the generic "MercadoLibre" customer
    4. Create the sale through _create_sale_manual() (which commits)
    5. Link the sale back to the marketplace order and mark it 'confirmed'

    Returns:
        ``{"sale_id": ..., "marketplace_order_id": ...}``.

    Raises:
        ValueError: if the order does not exist, was already converted, or
            one of its items cannot be mapped to an inventory row.
    """
    order = get_order_detail(tenant_conn, marketplace_order_id)
    if order.get("nexus_sale_id"):
        raise ValueError("Order already converted to sale")

    cur = tenant_conn.cursor()
    try:
        # Build sale items
        sale_items = []
        for it in order["items"]:
            # Map external_item_id -> inventory_id via marketplace_listings
            cur.execute(
                "SELECT inventory_id FROM marketplace_listings WHERE external_item_id = %s LIMIT 1",
                (it["external_item_id"],),
            )
            inv_row = cur.fetchone()
            inventory_id = inv_row[0] if inv_row else None
            if not inventory_id:
                raise ValueError(f"Could not map item {it['external_item_id']} to inventory")
            sale_items.append({
                "inventory_id": inventory_id,
                "quantity": it["quantity"],
                "unit_price": float(it["unit_price"] or 0),
                "discount_pct": 0,
                "tax_rate": 0.16,
            })

        # Find or create generic "MercadoLibre" customer
        cur.execute("SELECT id FROM customers WHERE name = 'MercadoLibre' LIMIT 1")
        cust = cur.fetchone()
        if cust:
            customer_id = cust[0]
        else:
            cur.execute(
                """
                INSERT INTO customers (name, email, phone, is_active, price_tier)
                VALUES ('MercadoLibre', 'marketplace@mercadolibre.com', '', true, 1)
                RETURNING id
                """
            )
            customer_id = cur.fetchone()[0]

        sale_data = {
            "items": sale_items,
            "customer_id": customer_id,
            "payment_method": "transferencia",
            "sale_type": "cash",
            "register_id": register_id,
            "amount_paid": float(order["total_amount"] or 0),
            "notes": f"MercadoLibre order #{order['external_order_id']}",
            # _create_sale_manual() stores this in sales.external_order_id;
            # without it that column was always NULL.
            "external_order_id": order["external_order_id"],
        }

        # process_sale() relies on flask.g for employee_id and branch_id, so
        # the sale is created manually to avoid that dependency.
        sale = _create_sale_manual(tenant_conn, sale_data, employee_id=employee_id)

        # Link order to sale
        cur.execute(
            "UPDATE marketplace_orders SET nexus_sale_id = %s, status = 'confirmed', updated_at = NOW() WHERE id = %s",
            (sale["id"], marketplace_order_id),
        )
        tenant_conn.commit()
    finally:
        # Release the cursor on every exit path, including failures.
        cur.close()
    return {"sale_id": sale["id"], "marketplace_order_id": marketplace_order_id}
def _create_sale_manual(tenant_conn, sale_data: dict, employee_id: int = None) -> dict:
    """Create a sale record without relying on flask.g.

    Simplified version of process_sale for background / external use. The
    inventory rows are locked (``FOR UPDATE``), totals are computed with
    calculate_totals(), the sale and its items are inserted, stock is
    deducted through inventory_engine.record_sale, and the transaction is
    committed.

    Args:
        tenant_conn: Tenant database connection.
        sale_data: Dict with "items" (each with "inventory_id" and optional
            "quantity", "unit_price", "discount_pct", "tax_rate") plus
            optional "customer_id", "payment_method", "sale_type",
            "register_id", "amount_paid", "notes" and "external_order_id".
        employee_id: Stored on the sale row.

    Returns:
        ``{"id": sale_id, **totals}``.

    Raises:
        ValueError: if there are no items or an item's inventory row is
            missing or inactive.
    """
    from services.inventory_engine import record_sale as inventory_record_sale

    items = sale_data.get("items", [])
    if not items:
        raise ValueError("No items in sale")

    customer_id = sale_data.get("customer_id")
    payment_method = sale_data.get("payment_method", "efectivo")
    sale_type = sale_data.get("sale_type", "cash")
    register_id = sale_data.get("register_id")
    amount_paid = float(sale_data.get("amount_paid") or 0)
    notes = sale_data.get("notes")

    cur = tenant_conn.cursor()
    try:
        # Lock and load the inventory rows referenced by the sale.
        inv_ids = [it["inventory_id"] for it in items]
        cur.execute(
            """
            SELECT id, part_number, name, cost, price_1, tax_rate
            FROM inventory WHERE id = ANY(%s) AND is_active = true
            ORDER BY id FOR UPDATE
            """,
            (inv_ids,),
        )
        inv_map = {r[0]: r for r in cur.fetchall()}

        # Enrich items
        enriched = []
        for it in items:
            inv_id = it["inventory_id"]
            inv = inv_map.get(inv_id)
            if not inv:
                raise ValueError(f"Inventory item {inv_id} not found")

            unit_price = it.get("unit_price")
            if unit_price is None:
                unit_price = inv[4] or 0

            tax_rate = it.get("tax_rate")
            if tax_rate is None:
                # Compare with None explicitly: `inv[5] or 0.16` turned a
                # legitimate 0 tax rate into 16%.
                tax_rate = inv[5] if inv[5] is not None else 0.16

            enriched.append({
                "inventory_id": inv_id,
                "part_number": inv[1],
                "name": inv[2],
                "quantity": int(it.get("quantity", 1)),
                "unit_price": float(unit_price),
                "unit_cost": float(inv[3]) if inv[3] else 0,
                "discount_pct": float(it.get("discount_pct") or 0),
                "tax_rate": float(tax_rate),
            })

        totals = calculate_totals(enriched)

        # Insert sale
        cur.execute(
            """
            INSERT INTO sales
                (customer_id, employee_id, register_id, sale_type, payment_method,
                 subtotal, discount_total, tax_total, total, amount_paid, change_given,
                 status, notes, source, external_order_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'completed', %s, 'mercadolibre', %s)
            RETURNING id
            """,
            (
                customer_id,
                employee_id,
                register_id,
                sale_type,
                payment_method,
                totals["subtotal"],
                totals["discount_total"],
                totals["tax_total"],
                totals["total"],
                amount_paid,
                0,
                notes,
                sale_data.get("external_order_id"),
            ),
        )
        sale_id = cur.fetchone()[0]

        # Insert sale_items
        for it in totals["items"]:
            cur.execute(
                """
                INSERT INTO sale_items
                    (sale_id, inventory_id, part_number, name, quantity, unit_price,
                     unit_cost, discount_pct, discount_amount, tax_rate, tax_amount, subtotal)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """,
                (
                    sale_id,
                    it["inventory_id"],
                    it["part_number"],
                    it["name"],
                    it["quantity"],
                    it["unit_price"],
                    it.get("unit_cost", 0),
                    it.get("discount_pct", 0),
                    it.get("discount_amount", 0),
                    it.get("tax_rate", 0.16),
                    it.get("tax_amount", 0),
                    it["subtotal"],
                ),
            )

        # Deduct inventory
        for it in enriched:
            inventory_record_sale(
                tenant_conn, it["inventory_id"], it["quantity"], reference=f"ML sale {sale_id}"
            )
        tenant_conn.commit()
    finally:
        # Release the cursor on every exit path, including failures.
        cur.close()
    return {"id": sale_id, **totals}