Files
Autoparts-DB/scripts/sync_estrada_marketplace_full.py
consultoria-as a236187f3a feat: MercadoLibre integration + inventory bulk publish + WhatsApp bridge fixes
- Add MercadoLibre OAuth, listings, orders, webhooks and category search
- New marketplace_external_bp.py, meli_service.py, marketplace_external_service.py
- New marketplace_external.html/js with ML management UI
- Inventory: bulk publish to ML with category autocomplete, listing type and shipping selectors
- Inventory: new .btn--meli styles, select/label CSS fixes
- WhatsApp bridge: rate limiting, 440/515/408 error handling, stale watchdog
- DB migration v3.4_meli_integration.sql for marketplace_listings, orders, sync_queue
- Add Celery tasks for ML sync and webhook processing
- Sidebar: MercadoLibre navigation link
2026-05-26 04:24:07 +00:00

322 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Nexus Autoparts — Sync ALL Strada inventory to marketplace (backfill 133k items).
Extends warehouse_inventory with seller listings for parts that don't match
the OEM catalog. Items that DO match keep their part_id; unmatched items
get part_id=NULL with seller_part_number / seller_part_name populated.
Usage:
export MASTER_DB_URL="postgresql://user:pass@localhost/nexus_autoparts"
export TENANT_DB_URL_TEMPLATE="postgresql://user:pass@localhost/{db_name}"
python3 sync_estrada_marketplace_full.py --tenant=28 --bodega=7 --branch=1
Safe to re-run: uses UPSERT semantics.
"""
import argparse
import csv
import io
import os
import sys
import tempfile
import time
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'pos'))
import psycopg2
from tenant_db import get_tenant_conn
BATCH_SIZE = 5000
def get_master_conn():
from config import MASTER_DB_URL
return psycopg2.connect(MASTER_DB_URL)
def load_catalog_maps(master_conn):
"""Pre-load OEM part_number → part_id and cross-reference maps."""
cur = master_conn.cursor()
print("[1/5] Loading OEM catalog...")
cur.execute("SELECT id_part, oem_part_number FROM parts")
oem_map = {}
for row in cur:
pn = row[1].strip() if row[1] else ''
if pn:
oem_map[pn] = row[0]
print("[2/5] Loading cross-references...")
cur.execute("SELECT cross_reference_number, part_id FROM part_cross_references")
cross_map = {}
for row in cur:
pn = row[0].strip() if row[0] else ''
if pn:
cross_map[pn] = row[1]
cur.close()
print(f" OEM parts: {len(oem_map):,} | Cross-references: {len(cross_map):,}")
return oem_map, cross_map
def read_tenant_inventory(tenant_conn, branch_id):
"""Read all active inventory items from tenant DB."""
cur = tenant_conn.cursor()
print("[3/5] Reading tenant inventory...")
# Join with categories to get category name
cur.execute("""
SELECT
i.id,
i.part_number,
i.name,
i.price_1,
COALESCE(iss.stock, 0) AS stock,
c.name AS category_name
FROM inventory i
LEFT JOIN inventory_stock_summary iss ON iss.inventory_id = i.id
LEFT JOIN part_categories c ON c.id = i.category_id
WHERE i.is_active = true
AND (i.branch_id = %s OR %s IS NULL)
ORDER BY i.id
""", (branch_id, branch_id))
items = []
for row in cur:
items.append({
'id': row[0],
'part_number': (row[1] or '').strip(),
'name': (row[2] or '').strip(),
'price': float(row[3] or 0),
'stock': int(row[4] or 0),
'category': (row[5] or '').strip(),
})
cur.close()
print(f" Tenant items: {len(items):,}")
return items
def classify_items(items, oem_map, cross_map):
"""Split items into OEM-matched and seller listings."""
matched = []
seller = []
for it in items:
pn = it['part_number']
if not pn:
continue
# Try exact match on OEM or cross-reference
part_id = oem_map.get(pn)
if not part_id:
# Try without brand prefix (e.g. "4S-86050" → "86050")
raw = pn.split('-', 1)[1] if '-' in pn else pn
part_id = oem_map.get(raw) or oem_map.get(pn) or cross_map.get(raw) or cross_map.get(pn)
if part_id:
matched.append({**it, 'part_id': part_id})
else:
seller.append(it)
print(f" Matched (OEM): {len(matched):,} | Seller listings: {len(seller):,}")
return matched, seller
def sync_to_warehouse(master_conn, bodega_id, matched, seller):
"""Bulk upsert into warehouse_inventory using COPY."""
cur = master_conn.cursor()
# Create temp table matching warehouse_inventory structure
cur.execute("""
CREATE TEMP TABLE tmp_wi (
user_id INT,
part_id INT,
seller_part_number VARCHAR(100),
seller_part_name VARCHAR(300),
seller_category VARCHAR(100),
tenant_inventory_id INT,
price NUMERIC(12,2),
stock_quantity INT,
min_order_quantity INT DEFAULT 1,
warehouse_location VARCHAR(100) DEFAULT 'Principal',
bodega_id INT,
currency VARCHAR(3) DEFAULT 'MXN'
) ON COMMIT DROP
""")
print("[4/5] Preparing batches...")
buffer = io.StringIO()
writer = csv.writer(buffer, lineterminator='\n',
quoting=csv.QUOTE_MINIMAL)
total = 0
for it in matched + seller:
part_id = it.get('part_id')
seller_pn = None if part_id else it['part_number']
seller_name = None if part_id else (it['name'] or it['part_number'])
seller_cat = None if part_id else it['category']
writer.writerow([
1, # user_id (legacy FK, must match existing rows for bodega 7)
part_id, # part_id (NULL for seller listings)
seller_pn, # seller_part_number
seller_name, # seller_part_name
seller_cat, # seller_category
it['id'], # tenant_inventory_id
it['price'], # price
max(0, it['stock']), # stock_quantity
1, # min_order_quantity
'Principal', # warehouse_location
bodega_id, # bodega_id
'MXN', # currency
])
total += 1
if total % BATCH_SIZE == 0:
buffer.seek(0)
cur.copy_expert("""
COPY tmp_wi (user_id, part_id, seller_part_number, seller_part_name,
seller_category, tenant_inventory_id, price, stock_quantity,
min_order_quantity, warehouse_location, bodega_id, currency)
FROM STDIN WITH (FORMAT CSV, NULL '')
""", buffer)
buffer = io.StringIO()
writer = csv.writer(buffer, lineterminator='\n',
quoting=csv.QUOTE_MINIMAL)
print(f" Buffered {total:,} rows...")
# Final batch
if buffer.tell() > 0:
buffer.seek(0)
cur.copy_expert("""
COPY tmp_wi (user_id, part_id, seller_part_number, seller_part_name,
seller_category, tenant_inventory_id, price, stock_quantity,
min_order_quantity, warehouse_location, bodega_id, currency)
FROM STDIN WITH (FORMAT CSV, NULL '')
""", buffer)
print(f"[5/5] Upserting {total:,} rows into warehouse_inventory...")
# --- Update existing OEM-matched rows ---
cur.execute("""
UPDATE warehouse_inventory wi
SET
price = tmp.price,
stock_quantity = tmp.stock_quantity,
user_id = tmp.user_id,
currency = tmp.currency,
updated_at = NOW()
FROM tmp_wi tmp
WHERE wi.bodega_id = tmp.bodega_id
AND wi.part_id = tmp.part_id
AND wi.warehouse_location = tmp.warehouse_location
AND tmp.part_id IS NOT NULL
""")
matched_updated = cur.rowcount
# --- Insert new OEM-matched rows ---
cur.execute("""
INSERT INTO warehouse_inventory (
user_id, part_id, seller_part_number, seller_part_name,
seller_category, tenant_inventory_id, price, stock_quantity,
min_order_quantity, warehouse_location, bodega_id, currency, updated_at
)
SELECT
user_id, part_id, seller_part_number, seller_part_name,
seller_category, tenant_inventory_id, price, stock_quantity,
min_order_quantity, warehouse_location, bodega_id, currency, NOW()
FROM tmp_wi tmp
WHERE part_id IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM warehouse_inventory wi
WHERE wi.bodega_id = tmp.bodega_id
AND wi.part_id = tmp.part_id
AND wi.warehouse_location = tmp.warehouse_location
)
""")
matched_inserted = cur.rowcount
# --- Update existing seller listings ---
cur.execute("""
UPDATE warehouse_inventory wi
SET
price = tmp.price,
stock_quantity = tmp.stock_quantity,
seller_part_name = tmp.seller_part_name,
seller_category = tmp.seller_category,
user_id = tmp.user_id,
currency = tmp.currency,
updated_at = NOW()
FROM tmp_wi tmp
WHERE wi.bodega_id = tmp.bodega_id
AND wi.seller_part_number = tmp.seller_part_number
AND wi.warehouse_location = tmp.warehouse_location
AND tmp.part_id IS NULL
""")
seller_updated = cur.rowcount
# --- Insert new seller listings ---
cur.execute("""
INSERT INTO warehouse_inventory (
user_id, part_id, seller_part_number, seller_part_name,
seller_category, tenant_inventory_id, price, stock_quantity,
min_order_quantity, warehouse_location, bodega_id, currency, updated_at
)
SELECT
user_id, part_id, seller_part_number, seller_part_name,
seller_category, tenant_inventory_id, price, stock_quantity,
min_order_quantity, warehouse_location, bodega_id, currency, NOW()
FROM tmp_wi tmp
WHERE part_id IS NULL
AND NOT EXISTS (
SELECT 1 FROM warehouse_inventory wi
WHERE wi.bodega_id = tmp.bodega_id
AND wi.seller_part_number = tmp.seller_part_number
AND wi.warehouse_location = tmp.warehouse_location
)
""")
seller_inserted = cur.rowcount
matched_upserted = matched_updated + matched_inserted
seller_upserted = seller_updated + seller_inserted
master_conn.commit()
cur.close()
print(f"\n✓ Done!")
print(f" OEM matched upserted: {matched_upserted:,}")
print(f" Seller listings upserted: {seller_upserted:,}")
print(f" Total: {matched_upserted + seller_upserted:,}")
def main():
parser = argparse.ArgumentParser(description='Sync tenant inventory to marketplace')
parser.add_argument('--tenant', type=int, required=True, help='Tenant ID')
parser.add_argument('--bodega', type=int, required=True, help='Bodega ID in master DB')
parser.add_argument('--branch', type=int, default=1, help='Branch ID filter (default all)')
args = parser.parse_args()
start = time.time()
print(f"=== Sync tenant {args.tenant} → bodega {args.bodega} ===\n")
master_conn = get_master_conn()
tenant_conn = get_tenant_conn(args.tenant)
try:
oem_map, cross_map = load_catalog_maps(master_conn)
items = read_tenant_inventory(tenant_conn, args.branch)
matched, seller = classify_items(items, oem_map, cross_map)
sync_to_warehouse(master_conn, args.bodega, matched, seller)
finally:
tenant_conn.close()
master_conn.close()
elapsed = time.time() - start
print(f"\nElapsed: {elapsed:.1f}s")
if __name__ == '__main__':
main()