- Add MercadoLibre OAuth, listings, orders, webhooks and category search - New marketplace_external_bp.py, meli_service.py, marketplace_external_service.py - New marketplace_external.html/js with ML management UI - Inventory: bulk publish to ML with category autocomplete, listing type and shipping selectors - Inventory: new .btn--meli styles, select/label CSS fixes - WhatsApp bridge: rate limiting, 440/515/408 error handling, stale watchdog - DB migration v3.4_meli_integration.sql for marketplace_listings, orders, sync_queue - Add Celery tasks for ML sync and webhook processing - Sidebar: MercadoLibre navigation link
322 lines
11 KiB
Python
322 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Nexus Autoparts — Sync ALL Strada inventory to marketplace (backfill 133k items).
|
|
|
|
Extends warehouse_inventory with seller listings for parts that don't match
|
|
the OEM catalog. Items that DO match keep their part_id; unmatched items
|
|
get part_id=NULL with seller_part_number / seller_part_name populated.
|
|
|
|
Usage:
|
|
export MASTER_DB_URL="postgresql://user:pass@localhost/nexus_autoparts"
|
|
export TENANT_DB_URL_TEMPLATE="postgresql://user:pass@localhost/{db_name}"
|
|
python3 sync_estrada_marketplace_full.py --tenant=28 --bodega=7 --branch=1
|
|
|
|
Safe to re-run: uses UPSERT semantics.
|
|
"""
|
|
|
|
import argparse
|
|
import csv
|
|
import io
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'pos'))
|
|
import psycopg2
|
|
from tenant_db import get_tenant_conn
|
|
|
|
|
|
# Number of CSV rows buffered in memory before each COPY into the temp table.
BATCH_SIZE = 5000
|
|
|
|
|
|
def get_master_conn():
    """Open and return a new psycopg2 connection to the master database.

    The DSN comes from ``config.MASTER_DB_URL``; the import is performed at
    call time rather than at module import time.
    """
    from config import MASTER_DB_URL as master_dsn

    connection = psycopg2.connect(master_dsn)
    return connection
|
|
|
|
|
|
def load_catalog_maps(master_conn):
    """Pre-load OEM part_number → part_id and cross-reference maps.

    Args:
        master_conn: Open DB-API connection to the master database.

    Returns:
        A ``(oem_map, cross_map)`` tuple of dicts. ``oem_map`` maps each
        stripped, non-empty ``parts.oem_part_number`` to its ``id_part``;
        ``cross_map`` maps each stripped, non-empty
        ``part_cross_references.cross_reference_number`` to its ``part_id``.
        When the same number appears more than once, the last row read wins.
    """
    cur = master_conn.cursor()
    # try/finally so the cursor is released even if a query or fetch fails.
    try:
        print("[1/5] Loading OEM catalog...")
        cur.execute("SELECT id_part, oem_part_number FROM parts")
        oem_map = {}
        for row in cur:
            pn = row[1].strip() if row[1] else ''
            if pn:  # skip NULL / blank part numbers
                oem_map[pn] = row[0]

        print("[2/5] Loading cross-references...")
        cur.execute("SELECT cross_reference_number, part_id FROM part_cross_references")
        cross_map = {}
        for row in cur:
            pn = row[0].strip() if row[0] else ''
            if pn:  # skip NULL / blank cross-reference numbers
                cross_map[pn] = row[1]
    finally:
        cur.close()

    print(f" OEM parts: {len(oem_map):,} | Cross-references: {len(cross_map):,}")
    return oem_map, cross_map
|
|
|
|
|
|
def read_tenant_inventory(tenant_conn, branch_id):
    """Read all active inventory items from tenant DB.

    Args:
        tenant_conn: Open DB-API connection to the tenant database.
        branch_id: Branch to filter on. ``None`` disables the filter (the
            query's ``OR %s IS NULL`` branch), returning every active item.

    Returns:
        A list of dicts ordered by inventory id, each with the keys ``id``,
        ``part_number``, ``name``, ``price`` (float), ``stock`` (int) and
        ``category``. NULL text columns become ``''``; NULL price/stock
        become ``0``.
    """
    cur = tenant_conn.cursor()
    # try/finally so the cursor is released even if the query or fetch fails.
    try:
        print("[3/5] Reading tenant inventory...")

        # Join with categories to get category name.
        # NOTE(review): if inventory_stock_summary can hold more than one row
        # per inventory_id, this LEFT JOIN yields duplicate items — confirm.
        cur.execute("""
            SELECT
                i.id,
                i.part_number,
                i.name,
                i.price_1,
                COALESCE(iss.stock, 0) AS stock,
                c.name AS category_name
            FROM inventory i
            LEFT JOIN inventory_stock_summary iss ON iss.inventory_id = i.id
            LEFT JOIN part_categories c ON c.id = i.category_id
            WHERE i.is_active = true
              AND (i.branch_id = %s OR %s IS NULL)
            ORDER BY i.id
        """, (branch_id, branch_id))

        items = [
            {
                'id': row[0],
                'part_number': (row[1] or '').strip(),
                'name': (row[2] or '').strip(),
                'price': float(row[3] or 0),
                'stock': int(row[4] or 0),
                'category': (row[5] or '').strip(),
            }
            for row in cur
        ]
    finally:
        cur.close()

    print(f" Tenant items: {len(items):,}")
    return items
|
|
|
|
|
|
def classify_items(items, oem_map, cross_map):
    """Partition tenant items into catalog matches and seller-only listings.

    Items with an empty ``part_number`` are skipped. For the rest, the first
    truthy hit in this order decides the ``part_id``:

      1. OEM map, full part number
      2. OEM map, number with everything up to the first '-' removed
         (e.g. "4S-86050" → "86050")
      3. cross-reference map, stripped number
      4. cross-reference map, full part number

    Returns ``(matched, seller)``: ``matched`` holds copies of the items with
    a ``part_id`` key added, ``seller`` holds the unmatched item dicts as-is.
    """
    catalog_hits = []
    seller_only = []

    for entry in items:
        number = entry['part_number']
        if not number:
            continue

        _, dash, tail = number.partition('-')
        stripped = tail if dash else number

        lookups = (
            oem_map.get(number),
            oem_map.get(stripped),
            cross_map.get(stripped),
            cross_map.get(number),
        )
        resolved = next((hit for hit in lookups if hit), None)

        if resolved:
            enriched = dict(entry)
            enriched['part_id'] = resolved
            catalog_hits.append(enriched)
        else:
            seller_only.append(entry)

    print(f" Matched (OEM): {len(catalog_hits):,} | Seller listings: {len(seller_only):,}")
    return catalog_hits, seller_only
|
|
|
|
|
|
# COPY statement used for every batch loaded into the tmp_wi temp table.
# NULL '' means an unquoted empty CSV field is stored as NULL; csv.writer emits
# both None and '' that way, so empty strings (e.g. a blank category) also
# arrive as NULL.
_COPY_TMP_WI_SQL = """
    COPY tmp_wi (user_id, part_id, seller_part_number, seller_part_name,
                 seller_category, tenant_inventory_id, price, stock_quantity,
                 min_order_quantity, warehouse_location, bodega_id, currency)
    FROM STDIN WITH (FORMAT CSV, NULL '')
"""


def _new_csv_buffer():
    """Return a fresh in-memory text buffer and a CSV writer bound to it."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator='\n', quoting=csv.QUOTE_MINIMAL)
    return buffer, writer


def _copy_buffer_to_tmp(cur, buffer):
    """COPY the CSV rows accumulated in *buffer* into tmp_wi."""
    buffer.seek(0)
    cur.copy_expert(_COPY_TMP_WI_SQL, buffer)


def _warehouse_row(it, bodega_id, user_id):
    """Build the tmp_wi CSV row (in COPY column order) for one inventory item."""
    part_id = it.get('part_id')
    # Seller-specific columns are only populated for unmatched items.
    seller_pn = None if part_id else it['part_number']
    seller_name = None if part_id else (it['name'] or it['part_number'])
    seller_cat = None if part_id else it['category']
    return [
        user_id,                # user_id
        part_id,                # part_id (NULL for seller listings)
        seller_pn,              # seller_part_number
        seller_name,            # seller_part_name
        seller_cat,             # seller_category
        it['id'],               # tenant_inventory_id
        it['price'],            # price
        max(0, it['stock']),    # stock_quantity (negative stock clamped to 0)
        1,                      # min_order_quantity
        'Principal',            # warehouse_location
        bodega_id,              # bodega_id
        'MXN',                  # currency
    ]


def sync_to_warehouse(master_conn, bodega_id, matched, seller, user_id=1):
    """Bulk upsert into warehouse_inventory using COPY.

    Rows are streamed as CSV into a transaction-scoped temp table
    (``tmp_wi``, dropped on commit) in batches of ``BATCH_SIZE``, then applied
    to ``warehouse_inventory`` with four statements: update + insert for
    OEM-matched rows (keyed on bodega_id, part_id, warehouse_location) and
    update + insert for seller listings (keyed on bodega_id,
    seller_part_number, warehouse_location). The transaction is committed on
    success; on any error it is rolled back and the exception re-raised.

    Args:
        master_conn: Open psycopg2 connection to the master database.
        bodega_id: Value written to ``bodega_id`` for every row.
        matched: Item dicts carrying a ``part_id`` (catalog matches).
        seller: Item dicts without a ``part_id`` (seller-only listings).
        user_id: Value written to ``warehouse_inventory.user_id``. Defaults
            to 1, the value the script previously hard-coded (noted there as
            a legacy FK that must match the existing rows for bodega 7).

    NOTE(review): rows in one run that share the same key are not
    de-duplicated. PostgreSQL's UPDATE ... FROM then uses an unpredictable one
    of them, and the NOT EXISTS inserts add every duplicate — confirm whether
    the input can contain such duplicates.
    """
    cur = master_conn.cursor()
    try:
        # Create temp table matching warehouse_inventory structure
        cur.execute("""
            CREATE TEMP TABLE tmp_wi (
                user_id INT,
                part_id INT,
                seller_part_number VARCHAR(100),
                seller_part_name VARCHAR(300),
                seller_category VARCHAR(100),
                tenant_inventory_id INT,
                price NUMERIC(12,2),
                stock_quantity INT,
                min_order_quantity INT DEFAULT 1,
                warehouse_location VARCHAR(100) DEFAULT 'Principal',
                bodega_id INT,
                currency VARCHAR(3) DEFAULT 'MXN'
            ) ON COMMIT DROP
        """)

        print("[4/5] Preparing batches...")
        buffer, writer = _new_csv_buffer()

        total = 0
        for group in (matched, seller):
            for it in group:
                writer.writerow(_warehouse_row(it, bodega_id, user_id))
                total += 1

                # Flush a full batch and start a new buffer to bound memory.
                if total % BATCH_SIZE == 0:
                    _copy_buffer_to_tmp(cur, buffer)
                    buffer, writer = _new_csv_buffer()
                    print(f" Buffered {total:,} rows...")

        # Final (partial) batch; tell() is 0 when nothing is pending.
        if buffer.tell() > 0:
            _copy_buffer_to_tmp(cur, buffer)

        print(f"[5/5] Upserting {total:,} rows into warehouse_inventory...")

        # --- Update existing OEM-matched rows ---
        cur.execute("""
            UPDATE warehouse_inventory wi
            SET
                price = tmp.price,
                stock_quantity = tmp.stock_quantity,
                user_id = tmp.user_id,
                currency = tmp.currency,
                updated_at = NOW()
            FROM tmp_wi tmp
            WHERE wi.bodega_id = tmp.bodega_id
              AND wi.part_id = tmp.part_id
              AND wi.warehouse_location = tmp.warehouse_location
              AND tmp.part_id IS NOT NULL
        """)
        matched_updated = cur.rowcount

        # --- Insert new OEM-matched rows ---
        cur.execute("""
            INSERT INTO warehouse_inventory (
                user_id, part_id, seller_part_number, seller_part_name,
                seller_category, tenant_inventory_id, price, stock_quantity,
                min_order_quantity, warehouse_location, bodega_id, currency, updated_at
            )
            SELECT
                user_id, part_id, seller_part_number, seller_part_name,
                seller_category, tenant_inventory_id, price, stock_quantity,
                min_order_quantity, warehouse_location, bodega_id, currency, NOW()
            FROM tmp_wi tmp
            WHERE part_id IS NOT NULL
              AND NOT EXISTS (
                SELECT 1 FROM warehouse_inventory wi
                WHERE wi.bodega_id = tmp.bodega_id
                  AND wi.part_id = tmp.part_id
                  AND wi.warehouse_location = tmp.warehouse_location
              )
        """)
        matched_inserted = cur.rowcount

        # --- Update existing seller listings ---
        cur.execute("""
            UPDATE warehouse_inventory wi
            SET
                price = tmp.price,
                stock_quantity = tmp.stock_quantity,
                seller_part_name = tmp.seller_part_name,
                seller_category = tmp.seller_category,
                user_id = tmp.user_id,
                currency = tmp.currency,
                updated_at = NOW()
            FROM tmp_wi tmp
            WHERE wi.bodega_id = tmp.bodega_id
              AND wi.seller_part_number = tmp.seller_part_number
              AND wi.warehouse_location = tmp.warehouse_location
              AND tmp.part_id IS NULL
        """)
        seller_updated = cur.rowcount

        # --- Insert new seller listings ---
        cur.execute("""
            INSERT INTO warehouse_inventory (
                user_id, part_id, seller_part_number, seller_part_name,
                seller_category, tenant_inventory_id, price, stock_quantity,
                min_order_quantity, warehouse_location, bodega_id, currency, updated_at
            )
            SELECT
                user_id, part_id, seller_part_number, seller_part_name,
                seller_category, tenant_inventory_id, price, stock_quantity,
                min_order_quantity, warehouse_location, bodega_id, currency, NOW()
            FROM tmp_wi tmp
            WHERE part_id IS NULL
              AND NOT EXISTS (
                SELECT 1 FROM warehouse_inventory wi
                WHERE wi.bodega_id = tmp.bodega_id
                  AND wi.seller_part_number = tmp.seller_part_number
                  AND wi.warehouse_location = tmp.warehouse_location
              )
        """)
        seller_inserted = cur.rowcount

        matched_upserted = matched_updated + matched_inserted
        seller_upserted = seller_updated + seller_inserted

        master_conn.commit()
    except Exception:
        # Leave the connection usable (not stuck in an aborted transaction)
        # and make sure no partial sync is left pending.
        master_conn.rollback()
        raise
    finally:
        cur.close()

    print("\n✓ Done!")
    print(f" OEM matched upserted: {matched_upserted:,}")
    print(f" Seller listings upserted: {seller_upserted:,}")
    print(f" Total: {matched_upserted + seller_upserted:,}")
|
|
|
|
|
|
def main():
    """CLI entry point: sync one tenant's inventory into a master-DB bodega.

    Parses ``--tenant``, ``--bodega`` and ``--branch``, opens the master and
    tenant connections, runs the load → read → classify → sync pipeline, and
    prints the elapsed time. Both connections are always closed, including
    when opening the tenant connection itself fails.
    """
    parser = argparse.ArgumentParser(description='Sync tenant inventory to marketplace')
    parser.add_argument('--tenant', type=int, required=True, help='Tenant ID')
    parser.add_argument('--bodega', type=int, required=True, help='Bodega ID in master DB')
    # The help text previously claimed "default all", but the default is 1.
    parser.add_argument('--branch', type=int, default=1, help='Branch ID filter (default: 1)')
    args = parser.parse_args()

    start = time.time()
    print(f"=== Sync tenant {args.tenant} → bodega {args.bodega} ===\n")

    master_conn = get_master_conn()
    try:
        # Opened inside the outer try so a failure here still closes master_conn.
        tenant_conn = get_tenant_conn(args.tenant)
        try:
            oem_map, cross_map = load_catalog_maps(master_conn)
            items = read_tenant_inventory(tenant_conn, args.branch)
            matched, seller = classify_items(items, oem_map, cross_map)
            sync_to_warehouse(master_conn, args.bodega, matched, seller)
        finally:
            tenant_conn.close()
    finally:
        master_conn.close()

    elapsed = time.time() - start
    print(f"\nElapsed: {elapsed:.1f}s")
|
|
|
|
|
|
# Run the sync only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|