Files
Autoparts-DB/scripts/import_pdf_catalog.py
consultoria-as a236187f3a feat: MercadoLibre integration + inventory bulk publish + WhatsApp bridge fixes
- Add MercadoLibre OAuth, listings, orders, webhooks and category search
- New marketplace_external_bp.py, meli_service.py, marketplace_external_service.py
- New marketplace_external.html/js with ML management UI
- Inventory: bulk publish to ML with category autocomplete, listing type and shipping selectors
- Inventory: new .btn--meli styles, select/label CSS fixes
- WhatsApp bridge: rate limiting, 440/515/408 error handling, stale watchdog
- DB migration v3.4_meli_integration.sql for marketplace_listings, orders, sync_queue
- Add Celery tasks for ML sync and webhook processing
- Sidebar: MercadoLibre navigation link
2026-05-26 04:24:07 +00:00

440 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Import aftermarket parts catalog from PDF into Nexus Autoparts DB.
Usage:
# Extract and preview (generates CSV for review)
python3 scripts/import_pdf_catalog.py extract catalogo_bosch.pdf "BOSCH" --output bosch_preview.csv
# Import after reviewing CSV
python3 scripts/import_pdf_catalog.py import bosch_preview.csv "BOSCH"
The CSV should have columns:
part_number, name, price_usd, applications
Applications column (optional): comma-separated vehicle descriptions like:
"TOYOTA COROLLA 2015-2020, NISSAN SENTRA 2016-2019"
If applications is empty, the part will be created but not linked to vehicles.
"""
import os
import sys
import re
import csv
import json
import argparse
import subprocess
import psycopg2
from pathlib import Path
# Put "<directory two levels above this script>/pos" at the front of sys.path
# so modules living there can be imported.
# NOTE(review): no import from that directory is visible in this file —
# confirm the path tweak is still needed.
sys.path.insert(0, str(Path(__file__).parent.parent / "pos"))
# Connection URL for the catalog database: read from the MASTER_DB_URL
# environment variable, falling back to a local PostgreSQL URL without a
# password.
MASTER_DB_URL = os.environ.get("MASTER_DB_URL", "postgresql://postgres@localhost/nexus_autoparts")
def get_db_conn():
    """Open and return a new psycopg2 connection to the URL in MASTER_DB_URL."""
    connection = psycopg2.connect(MASTER_DB_URL)
    return connection
def pdf_to_text(pdf_path):
    """Extract text from a PDF with the ``pdftotext`` command-line tool.

    The tool is run with ``-layout`` so that table columns stay separated by
    runs of spaces, which the heuristic extractor relies on.

    Args:
        pdf_path: Path to the PDF file (``str`` or ``pathlib.Path``).

    Returns:
        The extracted text as a single string.

    Raises:
        RuntimeError: If ``pdftotext`` is not installed or exits with a
            non-zero status.
    """
    # Ask for UTF-8 explicitly and decode as UTF-8, instead of depending on
    # the tool's default output encoding and on the current locale.
    command = ["pdftotext", "-layout", "-enc", "UTF-8", str(pdf_path), "-"]
    try:
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )
    except FileNotFoundError as err:
        # Raised when the executable itself is missing (not the PDF file).
        raise RuntimeError(
            "pdftotext failed: executable not found (is it installed and on PATH?)"
        ) from err
    if result.returncode != 0:
        raise RuntimeError(f"pdftotext failed: {result.stderr}")
    return result.stdout
def extract_lines_fuzzy(text, min_cols=2):
    """Heuristically extract catalog rows from layout-preserving PDF text.

    A line becomes a row when it is at least 10 characters long, contains
    something that looks like a part number, and splits into at least
    ``min_cols`` columns on runs of two or more whitespace characters.

    Args:
        text: Full text of the catalog (for example the output of
            ``pdftotext -layout``).
        min_cols: Minimum number of columns a line needs to be kept.

    Returns:
        A list of dicts with the keys ``part_number``, ``name``,
        ``price_usd`` (``float`` or ``None``), ``applications`` (always
        ``''``) and ``raw`` (the stripped source line).
    """
    # Patterns are tried in order; the first one producing a usable candidate
    # wins.
    part_number_patterns = [
        # Groups joined by a dash, slash or single whitespace, e.g.
        # "0986-AF1-041". The first group needs 3+ characters, so the leading
        # "0" of "0 986 AF1 041" is not captured.
        re.compile(r'\b[0-9A-Z]{3,}(?:[-\s/][0-9A-Z]+){1,}\b'),
        # 1-3 letters followed by 3+ digits, e.g. K80001.
        re.compile(r'\b[A-Z]{1,3}\d{3,}[A-Z0-9]*\b'),
        # 3+ digits, 1-3 letters, then digits, e.g. 123ABC45.
        re.compile(r'\b\d{3,}[A-Z]{1,3}\d+\b'),
    ]
    column_splitter = re.compile(r'\s{2,}')
    price_pattern = re.compile(r'^\$?([0-9]{1,6}(?:\.[0-9]{1,2})?)$')

    def find_part_number(line):
        """Return the first candidate containing a digit, or None."""
        for pattern in part_number_patterns:
            for match in pattern.finditer(line):
                candidate = match.group(0).strip()
                # The separator pattern also matches all-caps descriptions
                # such as "BALL JOINT FRONT"; requiring a digit keeps those
                # from being taken for the part number.
                if any(ch.isdigit() for ch in candidate):
                    return candidate
        return None

    rows = []
    for line in text.splitlines():
        line = line.strip()
        if len(line) < 10:
            continue

        part_number = find_part_number(line)
        if not part_number:
            continue

        # Split the line on 2+ whitespace characters to get columns.
        cols = [c.strip() for c in column_splitter.split(line) if c.strip()]
        if len(cols) < min_cols:
            continue

        # Every column that is neither the part number nor the first price is
        # treated as part of the description.
        name_parts = []
        price = None
        for col in cols:
            if col == part_number:
                continue
            price_m = price_pattern.match(col.replace(',', ''))
            # "is None" rather than truthiness: a 0.00 price is still a price
            # and must not be overwritten by a later numeric column.
            if price_m and price is None:
                price = float(price_m.group(1))
                continue
            name_parts.append(col)

        name = ' '.join(name_parts) if name_parts else part_number
        name = re.sub(r'\s+', ' ', name).strip()
        if len(name) < 3:
            # Too short to be a useful description; fall back to the number.
            name = part_number

        rows.append({
            'part_number': part_number,
            'name': name,
            'price_usd': price,
            'applications': '',
            'raw': line,
        })
    return rows
def preview_rows(rows, limit=20):
    """Print a fixed-width summary of the first ``limit`` extracted rows.

    Part numbers are cut and padded to 30 characters and names to 50, framed
    by two 100-character dashed rules.
    """
    rule = "-" * 100
    print(f"\nExtracted {len(rows)} candidate rows. First {limit}:")
    print(rule)
    for position, entry in enumerate(rows[:limit], start=1):
        pn_cell = entry['part_number'][:30]
        name_cell = entry['name'][:50]
        print(f"{position}. PN: {pn_cell:30s} | Name: {name_cell:50s} | Price: {entry['price_usd']}")
    print(rule)
def save_csv(rows, path):
    """Write extracted rows to a UTF-8 CSV file for manual review.

    Only the ``part_number``, ``name``, ``price_usd`` and ``applications``
    columns are written; any other key in a row (such as ``raw``) is left out.

    Args:
        rows: Iterable of row dicts as produced by the extractor.
        path: Destination file path; an existing file is overwritten.
    """
    fieldnames = ['part_number', 'name', 'price_usd', 'applications']
    with open(path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for r in rows:
            price = r.get('price_usd')
            writer.writerow({
                'part_number': r['part_number'],
                'name': r['name'],
                # Only a missing price becomes an empty cell; 0.0 is kept.
                'price_usd': '' if price is None else price,
                'applications': r.get('applications', ''),
            })
    print(f"Saved preview to {path}")
def load_csv(path):
    """Load a reviewed catalog CSV into a list of row dicts.

    Args:
        path: Path to a CSV file with a header row. The columns read are
            ``part_number``, ``name``, ``price_usd`` and ``applications``;
            any of them may be absent.

    Returns:
        A list of dicts with those four keys. Text values are stripped;
        ``price_usd`` is a ``float``, or ``None`` when the cell is empty or
        not a valid number.
    """
    def cell(record, key):
        # DictReader fills cells missing from a short row with None, so a
        # plain .get(key, '') default is not enough to make .strip() safe.
        return (record.get(key) or '').strip()

    rows = []
    # utf-8-sig drops the byte-order mark some spreadsheet tools prepend;
    # otherwise the first header would read '\ufeffpart_number' and every
    # part number would come back empty. Files without a BOM read the same.
    with open(path, 'r', newline='', encoding='utf-8-sig') as f:
        for record in csv.DictReader(f):
            price_text = cell(record, 'price_usd')
            try:
                price = float(price_text) if price_text else None
            except ValueError:
                price = None
            rows.append({
                'part_number': cell(record, 'part_number'),
                'name': cell(record, 'name'),
                'price_usd': price,
                'applications': cell(record, 'applications'),
            })
    return rows
def resolve_manufacturer(cur, name):
    """Return the id_manufacture for ``name``, inserting the manufacturer if absent.

    The lookup compares names case-insensitively. When a row has to be
    created, names of six characters or fewer are stored upper-cased and
    longer names are stored exactly as given.
    """
    lookup_sql = "SELECT id_manufacture FROM manufacturers WHERE UPPER(name_manufacture) = UPPER(%s)"
    insert_sql = "INSERT INTO manufacturers (name_manufacture) VALUES (%s) RETURNING id_manufacture"

    cur.execute(lookup_sql, (name,))
    found = cur.fetchone()
    if not found:
        # No such manufacturer yet: create it and read back the new id.
        stored_name = name if len(name) > 6 else name.upper()
        cur.execute(insert_sql, (stored_name,))
        found = cur.fetchone()
    return found[0]
def resolve_or_create_part(cur, oem_part_number, name):
    """Return the id_part for ``oem_part_number``, creating the part if needed.

    An existing row with that number is reused untouched (its name is not
    updated). A new row is assigned to the part group named 'General', or to
    no group (NULL) when that group does not exist.
    """
    cur.execute(
        "SELECT id_part, name_part FROM parts WHERE oem_part_number = %s",
        (oem_part_number,)
    )
    existing = cur.fetchone()
    if existing:
        return existing[0]

    # New part: look up the default group first.
    cur.execute("SELECT id_part_group FROM part_groups WHERE name_part_group = 'General' LIMIT 1")
    group_row = cur.fetchone()
    if group_row:
        default_group = group_row[0]
    else:
        default_group = None

    new_part = (oem_part_number, name, default_group)
    cur.execute(
        """
        INSERT INTO parts (oem_part_number, name_part, group_id)
        VALUES (%s, %s, %s)
        RETURNING id_part
        """,
        new_part
    )
    created = cur.fetchone()
    return created[0]
def parse_applications(app_text):
    """Parse a vehicle applications string into structured tuples.

    Example: ``'TOYOTA COROLLA 2015-2020, NISSAN SENTRA 2016-2019'`` becomes
    ``[('TOYOTA', 'COROLLA', 2015, 2020), ('NISSAN', 'SENTRA', 2016, 2019)]``.

    Entries are separated by commas, semicolons or slashes (so a model name
    containing ``/`` is split in two). Each entry is upper-cased and read as
    ``BRAND MODEL YEAR[-YEAR]``; the brand is the shortest leading run of
    letters that lets the rest match, so the first word of a multi-word brand
    is taken as the brand. Entries matching neither form are dropped.

    Args:
        app_text: The raw applications text; may be empty or ``None``.

    Returns:
        A list of ``(brand, model, year_from, year_to)`` tuples. A single
        year gives ``year_from == year_to``; an entry without a year gives
        ``None`` for both.
    """
    if not app_text:
        return []

    # The model allows {0,30} trailing characters so that one-character
    # models ("MAZDA 3") match. The range separator also accepts an en dash
    # or em dash, which pasted text often contains instead of a hyphen.
    dated = re.compile(
        r'^([A-Z][A-Z\s]{1,20}?)\s+([A-Z0-9][A-Z0-9\s\-_]{0,30}?)\s+'
        r'(\d{4})(?:\s*[-\u2013\u2014]\s*(\d{4}))?$'
    )
    # Looser fallback: just BRAND MODEL, no year.
    undated = re.compile(r'^([A-Z][A-Z\s]{1,20}?)\s+([A-Z0-9][A-Z0-9\s\-_]{0,30})$')

    results = []
    for entry in re.split(r'[,;/]', app_text):
        entry = entry.strip().upper()
        if not entry:
            continue

        m = dated.match(entry)
        if m:
            year_from = int(m.group(3))
            year_to = int(m.group(4)) if m.group(4) else year_from
            if year_from > year_to:
                # A reversed range would match nothing downstream.
                year_from, year_to = year_to, year_from
            results.append((m.group(1).strip(), m.group(2).strip(), year_from, year_to))
            continue

        m2 = undated.match(entry)
        if m2:
            results.append((m2.group(1).strip(), m2.group(2).strip(), None, None))
    return results
def resolve_mye_ids(cur, brand_name, model_name, year_from, year_to):
    """Find model_year_engine ids matching a brand, model and year range.

    The brand is matched by case-insensitive equality. The model is matched
    by case-insensitive substring within that brand; a model whose name
    equals ``model_name`` is preferred, otherwise the alphabetically first
    substring match is used.

    Args:
        cur: Open database cursor.
        brand_name: Vehicle brand, e.g. ``'TOYOTA'``.
        model_name: Vehicle model, e.g. ``'COROLLA'``.
        year_from: First model year, or ``None`` for no year filter.
        year_to: Last model year, or ``None``.

    Returns:
        A list of ``id_mye`` values; empty when the brand or model is unknown.
    """
    cur.execute("SELECT id_brand FROM brands WHERE UPPER(name_brand) = UPPER(%s)", (brand_name,))
    brand_row = cur.fetchone()
    if not brand_row:
        return []
    brand_id = brand_row[0]

    # Escape LIKE wildcards so that "_" or "%" in a model name is matched
    # literally (backslash is PostgreSQL's default LIKE escape character).
    escaped_model = (
        model_name.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
    )
    # Rank an exact name first: plain alphabetical order would resolve
    # "VITARA" to "GRAND VITARA" whenever both models exist.
    cur.execute(
        """
        SELECT id_model, name_model FROM models
        WHERE brand_id = %s AND UPPER(name_model) LIKE UPPER(%s)
        ORDER BY (UPPER(name_model) = UPPER(%s)) DESC, name_model
        LIMIT 1
        """,
        (brand_id, f"%{escaped_model}%", model_name)
    )
    model_row = cur.fetchone()
    if not model_row:
        return []
    model_id = model_row[0]

    if year_from and year_to:
        cur.execute(
            """
            SELECT mye.id_mye FROM model_year_engine mye
            JOIN years y ON y.id_year = mye.year_id
            WHERE mye.model_id = %s AND y.year_car BETWEEN %s AND %s
            """,
            (model_id, year_from, year_to)
        )
    elif year_from:
        cur.execute(
            """
            SELECT mye.id_mye FROM model_year_engine mye
            JOIN years y ON y.id_year = mye.year_id
            WHERE mye.model_id = %s AND y.year_car = %s
            """,
            (model_id, year_from)
        )
    else:
        # No year given: every year/engine combination of the model.
        cur.execute(
            "SELECT id_mye FROM model_year_engine WHERE model_id = %s",
            (model_id,)
        )
    return [r[0] for r in cur.fetchall()]
def import_rows(rows, manufacturer_name, dry_run=False):
    """Import catalog rows for one manufacturer into the database.

    For every row with a part number: make sure a ``parts`` row exists,
    insert or update the matching ``aftermarket_parts`` row, and link the
    part to the vehicles named in the row's ``applications`` text. All work
    happens in one transaction, committed at the end.

    Args:
        rows: Row dicts with ``part_number``, ``name``, ``price_usd`` and,
            optionally, ``applications``.
        manufacturer_name: Aftermarket manufacturer the rows belong to.
        dry_run: When true, rows are only printed and the transaction is
            rolled back, so nothing is written (not even a manufacturer
            created during the lookup).

    Raises:
        Exception: Any error is re-raised after the transaction is rolled
            back.
    """
    conn = get_db_conn()
    cur = conn.cursor()
    try:
        manufacturer_id = resolve_manufacturer(cur, manufacturer_name)
        print(f"Manufacturer '{manufacturer_name}' → id={manufacturer_id}")

        inserted_parts = 0
        inserted_am = 0
        linked_vehicles = 0
        skipped = 0

        for i, row in enumerate(rows):
            pn = row['part_number']
            name = row['name'] or pn
            price = row['price_usd']

            if not pn:
                skipped += 1
                continue

            if dry_run:
                print(f"  [DRY] {pn} | {name[:40]} | ${price}")
                continue

            # 1. Ensure the part exists in the parts table.
            part_id = resolve_or_create_part(cur, pn, name)

            # 2. Insert or update the aftermarket part.
            _upsert_aftermarket_part(cur, part_id, manufacturer_id, pn, name, price)
            inserted_am += 1
            inserted_parts += 1

            # 3. Link vehicles if applications were provided.
            apps = row.get('applications', '')
            if apps:
                linked_vehicles += _link_part_to_vehicles(cur, part_id, apps)

            if (i + 1) % 100 == 0:
                print(f"  ... processed {i+1}/{len(rows)}")

        if dry_run:
            # resolve_manufacturer may have inserted a manufacturer row;
            # a dry run must leave the database untouched.
            conn.rollback()
            print("\nDry run complete - nothing was written to the database.")
            print(f"  Skipped (no PN): {skipped}")
            return

        conn.commit()
        print("\nDone!")
        print(f"  Parts processed: {inserted_parts}")
        print(f"  Aftermarket parts inserted/updated: {inserted_am}")
        print(f"  Vehicle links created: {linked_vehicles}")
        print(f"  Skipped (no PN): {skipped}")
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
        conn.close()


def _upsert_aftermarket_part(cur, part_id, manufacturer_id, part_number, name, price):
    """Insert or update the aftermarket_parts row for (part_number, manufacturer_id)."""
    cur.execute(
        """
        SELECT id_aftermarket_parts FROM aftermarket_parts
        WHERE part_number = %s AND manufacturer_id = %s
        """,
        (part_number, manufacturer_id)
    )
    existing = cur.fetchone()
    if existing:
        # COALESCE keeps the stored price when the new row has none.
        cur.execute(
            """
            UPDATE aftermarket_parts
            SET name_aftermarket_parts = %s,
                price_usd = COALESCE(%s, price_usd),
                oem_part_id = %s
            WHERE id_aftermarket_parts = %s
            """,
            (name, price, part_id, existing[0])
        )
    else:
        cur.execute(
            """
            INSERT INTO aftermarket_parts
                (oem_part_id, manufacturer_id, part_number, name_aftermarket_parts, price_usd)
            VALUES (%s, %s, %s, %s, %s)
            """,
            (part_id, manufacturer_id, part_number, name, price)
        )


def _link_part_to_vehicles(cur, part_id, applications):
    """Link part_id to every vehicle in the applications text; return the number of new links."""
    created = 0
    for brand, model, year_from, year_to in parse_applications(applications):
        for mye_id in resolve_mye_ids(cur, brand, model, year_from, year_to):
            cur.execute(
                """
                INSERT INTO vehicle_parts (part_id, model_year_engine_id)
                VALUES (%s, %s)
                ON CONFLICT DO NOTHING
                """,
                (part_id, mye_id)
            )
            # rowcount is 0 when the link already existed, so only rows
            # that were really inserted are counted.
            if cur.rowcount > 0:
                created += 1
    return created
def main():
    """Command-line entry point: dispatch to the ``extract`` or ``import`` subcommand.

    ``extract`` converts a PDF into a preview CSV and prints the follow-up
    command; ``import`` loads a reviewed CSV into the database. Without a
    subcommand the help text is printed.
    """
    parser = argparse.ArgumentParser(description='Import aftermarket catalog from PDF')
    commands = parser.add_subparsers(dest='command')

    # "extract": PDF -> preview CSV
    extract_cmd = commands.add_parser('extract', help='Extract PDF to preview CSV')
    extract_cmd.add_argument('pdf', help='Path to PDF file')
    extract_cmd.add_argument('manufacturer', help='Manufacturer name')
    extract_cmd.add_argument('--output', '-o', default='catalog_preview.csv', help='Output CSV path')

    # "import": reviewed CSV -> database
    import_cmd = commands.add_parser('import', help='Import reviewed CSV to DB')
    import_cmd.add_argument('csv', help='Path to reviewed CSV')
    import_cmd.add_argument('manufacturer', help='Manufacturer name')
    import_cmd.add_argument('--dry-run', action='store_true', help='Preview without writing to DB')

    options = parser.parse_args()

    if options.command == 'import':
        reviewed = load_csv(options.csv)
        print(f"Loaded {len(reviewed)} rows from {options.csv}")
        import_rows(reviewed, options.manufacturer, dry_run=options.dry_run)
        return

    if options.command != 'extract':
        # No subcommand given.
        parser.print_help()
        return

    print(f"Extracting {options.pdf}...")
    candidates = extract_lines_fuzzy(pdf_to_text(options.pdf))
    preview_rows(candidates)
    save_csv(candidates, options.output)
    print(f"\nNext step: Review {options.output}, add 'applications' column if needed,")
    print(f"then run: python3 scripts/import_pdf_catalog.py import {options.output} '{options.manufacturer}'")


if __name__ == '__main__':
    main()