#!/usr/bin/env python3
"""
Import aftermarket parts catalog from PDF into Nexus Autoparts DB.

Usage:
    # Extract and preview (generates CSV for review)
    python3 scripts/import_pdf_catalog.py extract catalogo_bosch.pdf "BOSCH" --output bosch_preview.csv

    # Import after reviewing CSV
    python3 scripts/import_pdf_catalog.py import bosch_preview.csv "BOSCH"

The CSV should have columns: part_number, name, price_usd, applications

Applications column (optional): comma-separated vehicle descriptions like:
    "TOYOTA COROLLA 2015-2020, NISSAN SENTRA 2016-2019"

If applications is empty, the part will be created but not linked to vehicles.
"""

import os
import sys
import re
import csv
import json
import argparse
import subprocess
from pathlib import Path

try:
    import psycopg2
except ImportError:
    # The 'extract' command is pure text processing; only the DB helpers need
    # the driver, so a missing driver is reported lazily by get_db_conn().
    psycopg2 = None

# Add parent to path for config imports
sys.path.insert(0, str(Path(__file__).parent.parent / "pos"))

MASTER_DB_URL = os.environ.get("MASTER_DB_URL", "postgresql://postgres@localhost/nexus_autoparts")

# Part number patterns: BOSCH 0 986 AF1 041, MOOG K80001, NGK BKR6E, etc.
# Tried in order; the first pattern that matches a line wins.
_PART_NUMBER_PATTERNS = [
    re.compile(r'\b[0-9A-Z]{3,}(?:[-\s/][0-9A-Z]+){1,}\b'),  # codes with separators
    re.compile(r'\b[A-Z]{1,3}\d{3,}[A-Z0-9]*\b'),  # MOOG K80001, NGK BKR6E
    re.compile(r'\b\d{3,}[A-Z]{1,3}\d+\b'),  # 123ABC45
]
# A whole column that is just a number, optionally prefixed by '$'.
_PRICE_RE = re.compile(r'^\$?([0-9]{1,6}(?:\.[0-9]{1,2})?)$')
# Layout-preserved text separates table columns by runs of 2+ whitespace chars.
_COLUMN_SPLIT_RE = re.compile(r'\s{2,}')
# Year ranges typed in spreadsheets/PDFs often carry an en/em dash.
_DASH_TRANSLATION = str.maketrans({'\u2013': '-', '\u2014': '-'})


def get_db_conn():
    """Open a new psycopg2 connection to MASTER_DB_URL.

    Raises:
        RuntimeError: if the psycopg2 driver could not be imported.
    """
    if psycopg2 is None:
        raise RuntimeError("psycopg2 is required for database access but is not installed")
    return psycopg2.connect(MASTER_DB_URL)


def pdf_to_text(pdf_path):
    """Extract text from PDF using pdftotext (preserves layout).

    Args:
        pdf_path: path of the PDF file to convert.

    Returns:
        The extracted text as a single string.

    Raises:
        RuntimeError: if pdftotext is missing or exits with a non-zero status.
    """
    try:
        result = subprocess.run(
            # Pin the output encoding on both sides so accented catalog text
            # does not depend on the tool's default or the current locale.
            ["pdftotext", "-layout", "-enc", "UTF-8", str(pdf_path), "-"],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )
    except FileNotFoundError as err:
        raise RuntimeError("pdftotext failed: executable not found on PATH") from err
    if result.returncode != 0:
        raise RuntimeError(f"pdftotext failed: {result.stderr}")
    return result.stdout


def extract_lines_fuzzy(text, min_cols=2):
    """
    Heuristic table extractor.

    Looks for lines that have:
    - A part number pattern (alphanumeric with dashes/slashes, 3+ chars)
    - Some description text

    Args:
        text: layout-preserved text (columns separated by 2+ spaces).
        min_cols: minimum number of columns a line must have to be kept.

    Returns:
        List of dicts with keys part_number, name, price_usd (float or None),
        applications (always '') and raw (the stripped source line).
    """
    rows = []
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if len(line) < 10:
            continue

        # Try to find a part number
        part_number = None
        for pat in _PART_NUMBER_PATTERNS:
            m = pat.search(line)
            if m:
                part_number = m.group(0).strip()
                break
        if not part_number:
            continue

        # Split line by 2+ spaces to get columns
        cols = [c.strip() for c in _COLUMN_SPLIT_RE.split(line) if c.strip()]
        if len(cols) < min_cols:
            continue

        # Heuristic: part number is usually first or second column.
        # The rest is description, possibly with price at the end.
        name_parts = []
        price = None
        for col in cols:
            if col == part_number:
                continue
            # Price detection: the first numeric-only column wins. Compare
            # against None so a legitimate 0.00 is not overwritten later.
            price_m = _PRICE_RE.match(col.replace(',', ''))
            if price_m and price is None:
                price = float(price_m.group(1))
                continue
            name_parts.append(col)

        name = ' '.join(name_parts) if name_parts else part_number
        # Clean up name
        name = re.sub(r'\s+', ' ', name).strip()
        if len(name) < 3:
            name = part_number

        rows.append({
            'part_number': part_number,
            'name': name,
            'price_usd': price,
            'applications': '',
            'raw': line,
        })
    return rows


def preview_rows(rows, limit=20):
    """Print the first `limit` extracted rows as a fixed-width table."""
    print(f"\nExtracted {len(rows)} candidate rows. First {limit}:")
    print("-" * 100)
    for i, r in enumerate(rows[:limit]):
        print(f"{i+1}. PN: {r['part_number'][:30]:30s} | Name: {r['name'][:50]:50s} | Price: {r['price_usd']}")
    print("-" * 100)


def save_csv(rows, path):
    """Write rows to `path` as a review CSV (part_number, name, price_usd, applications).

    A missing price (None) is written as an empty cell; 0.0 is written as-is so
    that it survives a save/load round trip.
    """
    with open(path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=['part_number', 'name', 'price_usd', 'applications'])
        writer.writeheader()
        for r in rows:
            price = r['price_usd']
            writer.writerow({
                'part_number': r['part_number'],
                'name': r['name'],
                'price_usd': '' if price is None else price,
                'applications': r['applications'],
            })
    print(f"Saved preview to {path}")


def _cell(row, key):
    """Return the stripped text of a CSV cell, '' when the cell is absent or None."""
    return (row.get(key) or '').strip()


def load_csv(path):
    """Read a reviewed CSV back into a list of row dicts.

    Returns:
        List of dicts with keys part_number, name, price_usd (float or None when
        empty/unparseable) and applications.
    """
    rows = []
    # 'utf-8-sig' transparently drops the BOM some spreadsheet tools prepend;
    # with plain 'utf-8' the first header would become '\ufeffpart_number'
    # and every row would be skipped as having no part number.
    with open(path, 'r', newline='', encoding='utf-8-sig') as f:
        for row in csv.DictReader(f):
            raw_price = _cell(row, 'price_usd')
            try:
                price = float(raw_price) if raw_price else None
            except ValueError:
                price = None
            rows.append({
                'part_number': _cell(row, 'part_number'),
                'name': _cell(row, 'name'),
                'price_usd': price,
                'applications': _cell(row, 'applications'),
            })
    return rows


def resolve_manufacturer(cur, name):
    """Get or create manufacturer. Returns id_manufacture.

    The lookup is case-insensitive. A newly inserted name is upper-cased when
    it has 6 characters or fewer and stored unchanged otherwise.
    """
    cur.execute(
        "SELECT id_manufacture FROM manufacturers WHERE UPPER(name_manufacture) = UPPER(%s)",
        (name,)
    )
    row = cur.fetchone()
    if row:
        return row[0]

    # Insert new manufacturer
    cur.execute(
        "INSERT INTO manufacturers (name_manufacture) VALUES (%s) RETURNING id_manufacture",
        (name.upper() if len(name) <= 6 else name,)
    )
    return cur.fetchone()[0]


def resolve_or_create_part(cur, oem_part_number, name):
    """
    Return id_part for `oem_part_number`, inserting the part if it is missing.

    Per the original author's note, parts.oem_part_number has a UNIQUE index.
    New parts are placed in the 'General' part group; group_id is NULL when no
    such group exists.
    """
    cur.execute(
        "SELECT id_part, name_part FROM parts WHERE oem_part_number = %s",
        (oem_part_number,)
    )
    row = cur.fetchone()
    if row:
        return row[0]

    # Need a group_id. Use 'General' group as default.
    cur.execute("SELECT id_part_group FROM part_groups WHERE name_part_group = 'General' LIMIT 1")
    grow = cur.fetchone()
    group_id = grow[0] if grow else None

    cur.execute(
        """
        INSERT INTO parts (oem_part_number, name_part, group_id)
        VALUES (%s, %s, %s)
        RETURNING id_part
        """,
        (oem_part_number, name, group_id)
    )
    return cur.fetchone()[0]


def parse_applications(app_text):
    """
    Parse text like 'TOYOTA COROLLA 2015-2020, NISSAN SENTRA 2016-2019'
    into list of (brand, model, year_from, year_to).

    Entries are separated by commas, semicolons or slashes and matched
    case-insensitively (results are upper-cased). A single year yields
    year_from == year_to; an entry without a year yields (brand, model, None,
    None). Entries matching neither shape are dropped.
    """
    if not app_text:
        return []

    results = []
    # Normalise typographic dashes so '2015–2020' parses like '2015-2020'.
    normalized = app_text.translate(_DASH_TRANSLATION)
    # Split by commas, semicolons or slashes
    for entry in re.split(r'[,;/]', normalized):
        entry = entry.upper().strip()
        if not entry:
            continue

        # Pattern: BRAND MODEL YEAR-YEAR or BRAND MODEL YEAR
        m = re.match(
            r'^([A-Z][A-Z\s]{1,20}?)\s+([A-Z0-9][A-Z0-9\s\-_]{1,30}?)\s+(\d{4})(?:\s*-\s*(\d{4}))?$',
            entry
        )
        if m:
            year_from = int(m.group(3))
            year_to = int(m.group(4)) if m.group(4) else year_from
            # A reversed range ('2020-2015') would never match a BETWEEN query.
            if year_to < year_from:
                year_from, year_to = year_to, year_from
            results.append((m.group(1).strip(), m.group(2).strip(), year_from, year_to))
            continue

        # Try looser pattern: just BRAND MODEL
        m2 = re.match(r'^([A-Z][A-Z\s]{1,20}?)\s+([A-Z0-9][A-Z0-9\s\-_]{1,30})$', entry)
        if m2:
            results.append((m2.group(1).strip(), m2.group(2).strip(), None, None))

    return results


def resolve_mye_ids(cur, brand_name, model_name, year_from, year_to):
    """Find MYE ids matching brand/model/year range.

    The brand is matched case-insensitively by exact name; the model by a
    case-insensitive substring match, taking the first candidate ordered by
    name. Returns an empty list when the brand or model is not found.
    """
    # Find brand
    cur.execute("SELECT id_brand FROM brands WHERE UPPER(name_brand) = UPPER(%s)", (brand_name,))
    brow = cur.fetchone()
    if not brow:
        return []
    brand_id = brow[0]

    # Find model (fuzzy)
    cur.execute(
        """
        SELECT id_model, name_model FROM models
        WHERE brand_id = %s AND UPPER(name_model) LIKE UPPER(%s)
        ORDER BY name_model
        LIMIT 5
        """,
        (brand_id, f"%{model_name}%")
    )
    models = cur.fetchall()
    if not models:
        return []

    # Use first match
    model_id = models[0][0]

    # Find MYEs for year range
    if year_from and year_to:
        cur.execute(
            """
            SELECT mye.id_mye
            FROM model_year_engine mye
            JOIN years y ON y.id_year = mye.year_id
            WHERE mye.model_id = %s AND y.year_car BETWEEN %s AND %s
            """,
            (model_id, year_from, year_to)
        )
    elif year_from:
        cur.execute(
            """
            SELECT mye.id_mye
            FROM model_year_engine mye
            JOIN years y ON y.id_year = mye.year_id
            WHERE mye.model_id = %s AND y.year_car = %s
            """,
            (model_id, year_from)
        )
    else:
        cur.execute(
            "SELECT id_mye FROM model_year_engine WHERE model_id = %s",
            (model_id,)
        )

    return [r[0] for r in cur.fetchall()]


def _upsert_aftermarket_part(cur, manufacturer_id, part_id, part_number, name, price):
    """Insert or update the aftermarket_parts row for (part_number, manufacturer_id)."""
    cur.execute(
        """
        SELECT id_aftermarket_parts FROM aftermarket_parts
        WHERE part_number = %s AND manufacturer_id = %s
        """,
        (part_number, manufacturer_id)
    )
    existing = cur.fetchone()
    if existing:
        # Update; COALESCE keeps the stored price when the CSV has none.
        cur.execute(
            """
            UPDATE aftermarket_parts
            SET name_aftermarket_parts = %s, price_usd = COALESCE(%s, price_usd), oem_part_id = %s
            WHERE id_aftermarket_parts = %s
            """,
            (name, price, part_id, existing[0])
        )
    else:
        cur.execute(
            """
            INSERT INTO aftermarket_parts
            (oem_part_id, manufacturer_id, part_number, name_aftermarket_parts, price_usd)
            VALUES (%s, %s, %s, %s, %s)
            """,
            (part_id, manufacturer_id, part_number, name, price)
        )


def _link_vehicles(cur, part_id, app_text):
    """Link a part to every MYE matched by `app_text`; return how many links were new."""
    created = 0
    for brand, model, year_from, year_to in parse_applications(app_text):
        for mye_id in resolve_mye_ids(cur, brand, model, year_from, year_to):
            cur.execute(
                """
                INSERT INTO vehicle_parts (part_id, model_year_engine_id)
                VALUES (%s, %s)
                ON CONFLICT DO NOTHING
                """,
                (part_id, mye_id)
            )
            # rowcount is 0 when ON CONFLICT skipped an already-existing link.
            if cur.rowcount > 0:
                created += 1
    return created


def import_rows(rows, manufacturer_name, dry_run=False):
    """Import parsed CSV rows for one manufacturer into the database.

    For each row with a part number: ensure the part exists, upsert its
    aftermarket_parts row, and link it to vehicles parsed from 'applications'.
    Everything runs in one transaction, committed at the end.

    Args:
        rows: row dicts as returned by load_csv().
        manufacturer_name: manufacturer to attach the parts to (created if missing).
        dry_run: when True, only print the rows; the transaction is rolled back
            so nothing (including a newly created manufacturer) is persisted.

    Raises:
        Any database error is re-raised after the transaction is rolled back.
    """
    conn = get_db_conn()
    try:
        with conn.cursor() as cur:
            manufacturer_id = resolve_manufacturer(cur, manufacturer_name)
            print(f"Manufacturer '{manufacturer_name}' → id={manufacturer_id}")

            inserted_parts = 0
            inserted_am = 0
            linked_vehicles = 0
            skipped = 0

            for i, row in enumerate(rows):
                pn = row['part_number']
                name = row['name'] or pn
                price = row['price_usd']

                if not pn:
                    skipped += 1
                    continue

                if dry_run:
                    print(f" [DRY] {pn} | {name[:40]} | ${price}")
                    continue

                # 1. Ensure part exists in parts table
                part_id = resolve_or_create_part(cur, pn, name)

                # 2. Insert/upsert aftermarket_parts
                _upsert_aftermarket_part(cur, manufacturer_id, part_id, pn, name, price)
                inserted_am += 1
                inserted_parts += 1

                # 3. Link vehicles if applications provided
                apps = row.get('applications', '')
                if apps:
                    linked_vehicles += _link_vehicles(cur, part_id, apps)

                if (i + 1) % 100 == 0:
                    print(f" ... processed {i+1}/{len(rows)}")

            if dry_run:
                # A dry run must not persist the manufacturer row that
                # resolve_manufacturer() may have just inserted.
                conn.rollback()
            else:
                conn.commit()

            print("\nDone!")
            print(f" Parts processed: {inserted_parts}")
            print(f" Aftermarket parts inserted/updated: {inserted_am}")
            print(f" Vehicle links created: {linked_vehicles}")
            print(f" Skipped (no PN): {skipped}")
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def main():
    """Command-line entry point: dispatch the 'extract' and 'import' subcommands."""
    parser = argparse.ArgumentParser(description='Import aftermarket catalog from PDF')
    subparsers = parser.add_subparsers(dest='command')

    # Extract command
    ext = subparsers.add_parser('extract', help='Extract PDF to preview CSV')
    ext.add_argument('pdf', help='Path to PDF file')
    ext.add_argument('manufacturer', help='Manufacturer name')
    ext.add_argument('--output', '-o', default='catalog_preview.csv', help='Output CSV path')

    # Import command
    imp = subparsers.add_parser('import', help='Import reviewed CSV to DB')
    imp.add_argument('csv', help='Path to reviewed CSV')
    imp.add_argument('manufacturer', help='Manufacturer name')
    imp.add_argument('--dry-run', action='store_true', help='Preview without writing to DB')

    args = parser.parse_args()

    if args.command == 'extract':
        print(f"Extracting {args.pdf}...")
        text = pdf_to_text(args.pdf)
        rows = extract_lines_fuzzy(text)
        preview_rows(rows)
        save_csv(rows, args.output)
        print(f"\nNext step: Review {args.output}, add 'applications' column if needed,")
        print(f"then run: python3 scripts/import_pdf_catalog.py import {args.output} '{args.manufacturer}'")
    elif args.command == 'import':
        rows = load_csv(args.csv)
        print(f"Loaded {len(rows)} rows from {args.csv}")
        import_rows(rows, args.manufacturer, dry_run=args.dry_run)
    else:
        parser.print_help()


if __name__ == '__main__':
    main()