From eff04a5e60c88570ca75ad7527747b2f6c3dece1 Mon Sep 17 00:00:00 2001 From: consultoria-as Date: Wed, 18 Mar 2026 22:25:21 +0000 Subject: [PATCH] fix: stop creating AFT- placeholder parts in import pipeline - import_phase1.py: skip AFT- part creation when no OEM data - link_vehicle_parts.py: remove AFT- fallback lookup in part cache - import_tecdoc_parts.py: add VW to TOP_BRANDS list Co-Authored-By: Claude Opus 4.6 (1M context) --- scripts/import_phase1.py | 148 +++++++++ scripts/import_tecdoc_parts.py | 531 +++++++++++++++++++++++++++++++++ scripts/link_vehicle_parts.py | 251 ++++++++++++++++ 3 files changed, 930 insertions(+) create mode 100644 scripts/import_phase1.py create mode 100644 scripts/import_tecdoc_parts.py create mode 100644 scripts/link_vehicle_parts.py diff --git a/scripts/import_phase1.py b/scripts/import_phase1.py new file mode 100644 index 0000000..e4a6e0a --- /dev/null +++ b/scripts/import_phase1.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +""" +Quick import of Phase 1 TecDoc article data into PostgreSQL. +Imports aftermarket parts and their vehicle mappings from article list files, +without waiting for OEM detail downloads. 
def _load_detail_oem(details_dir):
    """Return ``{articleId: [oem entry, ...]}`` from the detail JSON files.

    Files that cannot be read, are not valid JSON, or whose name is not a
    numeric article id are skipped.
    """
    detail_files = list(details_dir.glob("*.json"))
    print(f"Loading {len(detail_files)} detail files for OEM data...", flush=True)

    detail_oem = {}
    for path in detail_files:
        try:
            article_id = int(path.stem)
            data = json.loads(path.read_text())
        except (OSError, ValueError):
            # ValueError covers bad file names, invalid JSON and bad encodings.
            continue
        if not isinstance(data, dict):
            continue

        article = data.get('article')
        oem_numbers = article.get('oemNo') if isinstance(article, dict) else None
        if not oem_numbers:
            # NOTE(review): the downloader and link_vehicle_parts.py in this
            # same patch look for the OEM list under the top-level
            # 'articleOemNo' key, so accept that layout too — confirm which
            # shape the API really returns.
            oem_numbers = data.get('articleOemNo')
        if isinstance(oem_numbers, list) and oem_numbers:
            detail_oem[article_id] = oem_numbers
    return detail_oem


def _collect_unique_articles(article_files, cat_map):
    """Return ``{articleId: article dict}`` keeping the first occurrence of each id.

    Article files are named ``<vehicleId>_<categoryId>.json``; each kept
    article is tagged with ``_cat_td_id`` (TecDoc category id from the file
    name) and ``_cat_db_id`` (its mapping through *cat_map*, or None).
    """
    all_articles = {}
    for path in article_files:
        name_parts = path.stem.split("_")
        if len(name_parts) != 2:
            continue
        try:
            cat_id = int(name_parts[1])
            articles = json.loads(path.read_text())
        except (OSError, ValueError):
            continue
        if not isinstance(articles, list):
            continue

        cat_db_id = cat_map.get(cat_id)
        for article in articles:
            if not isinstance(article, dict):
                continue
            aid = article.get('articleId')
            if aid and aid not in all_articles:
                article['_cat_db_id'] = cat_db_id
                article['_cat_td_id'] = cat_id
                all_articles[aid] = article
    return all_articles


def run():
    """Import Phase 1 TecDoc articles into PostgreSQL.

    For every unique article that has OEM numbers in its detail file, make
    sure the OEM part exists in ``parts`` and add an aftermarket → OEM row to
    ``part_cross_references``. Articles without OEM data are left for a later
    run (no placeholder part is created). Work is committed every 5000
    articles; the connection is always closed, and uncommitted work is
    discarded if an error escapes.
    """
    conn = psycopg2.connect(DB_URL)
    try:
        cur = conn.cursor()

        # Category mapping: tecdoc_id → id_part_category
        cur.execute("SELECT id_part_category, tecdoc_id FROM part_categories WHERE tecdoc_id IS NOT NULL")
        cat_map = {r[1]: r[0] for r in cur.fetchall()}

        # Existing manufacturers by name
        cur.execute("SELECT id_manufacture, name_manufacture FROM manufacturers")
        mfr_cache = {r[1]: r[0] for r in cur.fetchall()}

        # Existing parts by OEM number
        cur.execute("SELECT oem_part_number, id_part FROM parts WHERE oem_part_number IS NOT NULL")
        part_cache = {r[0]: r[1] for r in cur.fetchall()}

        # Existing cross-refs, so re-runs do not insert or count duplicates
        cur.execute("SELECT part_id, cross_reference_number, source_ref FROM part_cross_references")
        xref_set = {(r[0], r[1], r[2]) for r in cur.fetchall()}

        detail_oem = _load_detail_oem(DETAILS_DIR)

        stats = {'parts': 0, 'xrefs': 0, 'mfrs': 0, 'skipped': 0}

        article_files = sorted(ARTICLES_DIR.glob("*.json"))
        print(f"Processing {len(article_files)} article files...", flush=True)

        all_articles = _collect_unique_articles(article_files, cat_map)
        print(f"Unique articles to process: {len(all_articles):,}", flush=True)

        batch = 0
        for aid, a in all_articles.items():
            article_no = a.get('articleNo', '')
            supplier = a.get('supplierName', '')
            product_name = a.get('articleProductName', '')

            if not article_no or not supplier:
                stats['skipped'] += 1
                continue

            # Ensure manufacturer exists
            if supplier not in mfr_cache:
                cur.execute(
                    "INSERT INTO manufacturers (name_manufacture) VALUES (%s) RETURNING id_manufacture",
                    (supplier,))
                mfr_cache[supplier] = cur.fetchone()[0]
                stats['mfrs'] += 1

            # Articles with no OEM data yet are skipped here; they get
            # imported once their detail file arrives.
            for oem in detail_oem.get(aid, []):
                if not isinstance(oem, dict):
                    continue
                oem_no = oem.get('oemDisplayNo', '')
                oem_brand = oem.get('oemBrand', '')
                if not oem_no:
                    continue

                if oem_no not in part_cache:
                    cur.execute("""
                        INSERT INTO parts (oem_part_number, name_part, description)
                        VALUES (%s, %s, %s)
                        ON CONFLICT (oem_part_number) DO UPDATE SET name_part = EXCLUDED.name_part
                        RETURNING id_part
                    """, (oem_no, product_name, f"OEM {oem_brand}"))
                    part_cache[oem_no] = cur.fetchone()[0]
                    stats['parts'] += 1

                part_id = part_cache[oem_no]

                # Add cross-reference (aftermarket → OEM)
                xref_key = (part_id, article_no, supplier)
                if xref_key not in xref_set:
                    cur.execute("""
                        INSERT INTO part_cross_references (part_id, cross_reference_number, source_ref)
                        VALUES (%s, %s, %s) ON CONFLICT DO NOTHING
                    """, (part_id, article_no, supplier))
                    xref_set.add(xref_key)
                    stats['xrefs'] += 1

            batch += 1
            if batch % 5000 == 0:
                conn.commit()
                print(f" {batch:,}/{len(all_articles):,} — {stats['parts']:,} parts, {stats['xrefs']:,} xrefs", flush=True)

        conn.commit()
        cur.close()
    finally:
        # Closing without commit rolls back whatever the last batch left open.
        conn.close()

    print(f"\n{'='*50}", flush=True)
    print("IMPORT COMPLETE", flush=True)
    print(f" Parts: {stats['parts']:,}", flush=True)
    print(f" Cross-refs: {stats['xrefs']:,}", flush=True)
    print(f" Manufacturers: {stats['mfrs']:,}", flush=True)
    print(f" Skipped: {stats['skipped']:,}", flush=True)
    print(f"{'='*50}", flush=True)
def apify_call(input_data, retries=3):
    """Call the Apify TecDoc actor synchronously and return its result.

    Args:
        input_data: JSON-serialisable actor input.
        retries: maximum number of attempts.

    Returns:
        The first dataset item when the response is a non-empty list, the
        decoded response otherwise, or None when every attempt failed.

    A 429 response backs off 30 s × attempt number; any other failure waits
    5 s. No wait happens after the final attempt, so a caller is not held up
    once the outcome is already decided.
    """
    for attempt in range(retries):
        wait = 5
        try:
            resp = requests.post(
                APIFY_URL, params={"token": APIFY_TOKEN},
                headers={"Content-Type": "application/json"},
                json=input_data, timeout=180
            )
            if resp.status_code in (200, 201):
                data = resp.json()
                return data[0] if isinstance(data, list) and data else data
            if resp.status_code == 429:
                wait = 30 * (attempt + 1)
                print(f" Rate limited, waiting {wait}s...", flush=True)
            else:
                print(f" HTTP {resp.status_code}: {resp.text[:200]}", flush=True)
        except (requests.RequestException, ValueError) as e:
            # Network/HTTP-layer failures and undecodable JSON bodies are
            # retryable; anything else is a programming error and propagates.
            print(f" Error: {e}", flush=True)

        if attempt < retries - 1:
            time.sleep(wait)
    return None


def load_top_categories():
    """Load top-level TecDoc categories from the database.

    Returns:
        A list of ``(tecdoc_id, name_part_category)`` tuples ordered by
        ``display_order``.
    """
    conn = psycopg2.connect(DB_URL)
    try:
        cur = conn.cursor()
        cur.execute("SELECT tecdoc_id, name_part_category FROM part_categories WHERE tecdoc_id IS NOT NULL ORDER BY display_order")
        cats = [(r[0], r[1]) for r in cur.fetchall()]
        cur.close()
    finally:
        conn.close()
    return cats
def _write_text_atomic(outfile, text):
    """Write *text* to *outfile* through a sibling temp file and a rename.

    An interrupted run therefore never leaves a truncated ``*.json`` behind
    that later passes the ``exists()`` "already downloaded" check.
    """
    tmp = outfile.with_name(outfile.name + ".tmp")
    tmp.write_text(text)
    tmp.replace(outfile)


def download_articles_for_vehicle(vid, category_id, category_name):
    """Download the article list for one vehicle+category.

    Args:
        vid: TecDoc vehicleId.
        category_id: TecDoc category id.
        category_name: category label (unused here; kept for callers).

    Returns:
        Number of articles written; 0 when the file already existed, the
        API returned nothing usable, or the call failed.
    """
    outfile = ARTICLES_DIR / f"{vid}_{category_id}.json"
    if outfile.exists():
        return 0  # Already downloaded

    time.sleep(APIFY_DELAY)
    result = apify_call({
        'endpoint_partsArticleListByVehicleIdCategoryId': True,
        'parts_vehicleId_18': vid,
        'parts_categoryId_18': category_id,
        'parts_typeId_18': TYPE_ID,
        'parts_langId_18': LANG_ID,
    })

    if result is None:
        # Every attempt failed (network error, rate limit, ...). Do not cache
        # an empty file: that would mark this pair as done forever.
        return 0

    if isinstance(result, dict) and 'articles' in result:
        articles = result.get('articles') or []
        _write_text_atomic(outfile, json.dumps(articles, indent=1))
        return len(articles)

    # The API answered but has no articles for this pair: save an empty list
    # to avoid re-querying.
    _write_text_atomic(outfile, "[]")
    return 0


def download_article_detail(article_id):
    """Download OEM details for a single article.

    Returns:
        True when the detail file already exists or was written now, False
        when the API returned nothing usable (nothing is written then, so
        the article is retried on the next run).
    """
    outfile = DETAILS_DIR / f"{article_id}.json"
    if outfile.exists():
        return True

    time.sleep(APIFY_DELAY)
    result = apify_call({
        'endpoint_partsArticleDetailsByArticleId': True,
        'parts_articleId_13': article_id,
        'parts_langId_13': LANG_ID,
    })

    # Only a dict can carry the expected payload; guard before calling .get().
    if not isinstance(result, dict):
        return False

    if result.get('articleOemNo') or isinstance(result.get('article'), dict):
        _write_text_atomic(outfile, json.dumps(result, indent=1))
        return True
    return False
+ print(f"Found {len(representatives)} representative vehicles for top brands", flush=True) + + # Phase 1: Download articles per vehicle+category + total_tasks = len(representatives) * len(categories) + completed = 0 + total_articles = 0 + + print(f"\n{'='*60}", flush=True) + print(f"PHASE 1: Download articles ({total_tasks:,} tasks)", flush=True) + print(f"{'='*60}", flush=True) + + for i, rep in enumerate(representatives): + vid = rep['vehicleId'] + brand = rep['brand'] + model = rep['model'] + + # Check if all categories already downloaded for this vehicle + existing = sum(1 for cat_id, _ in categories + if (ARTICLES_DIR / f"{vid}_{cat_id}.json").exists()) + if existing == len(categories): + completed += len(categories) + continue + + print(f"[{i+1}/{len(representatives)}] {brand} {model} (vid={vid})", flush=True) + + def download_task(args): + vid, cat_id, cat_name = args + return download_articles_for_vehicle(vid, cat_id, cat_name) + + tasks = [(vid, cat_id, cat_name) for cat_id, cat_name in categories + if not (ARTICLES_DIR / f"{vid}_{cat_id}.json").exists()] + + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + futures = {executor.submit(download_task, t): t for t in tasks} + for future in as_completed(futures): + try: + count = future.result() + total_articles += count + completed += 1 + except Exception as e: + print(f" Task error: {e}", flush=True) + completed += 1 + + completed += existing # Count pre-existing + + print(f"\nPhase 1 complete: {total_articles:,} articles found", flush=True) + + # Phase 2: Collect unique articleIds and download OEM details + print(f"\n{'='*60}", flush=True) + print(f"PHASE 2: Collect unique articles & download OEM details", flush=True) + print(f"{'='*60}", flush=True) + + unique_articles = set() + for f in ARTICLES_DIR.glob("*.json"): + try: + articles = json.loads(f.read_text()) + for a in articles: + if 'articleId' in a: + unique_articles.add(a['articleId']) + except: + continue + + # Filter out already 
def do_import():
    """Import downloaded TecDoc article details into PostgreSQL.

    For every detail file that lists OEM numbers:
      * make sure the supplier exists in ``manufacturers``;
      * insert each OEM number into ``parts`` (tagged with the category taken
        from the article list files) unless it is already known;
      * add an aftermarket cross-reference row for the article.

    Work is committed every 500 detail files. The connection is always
    closed; uncommitted work is discarded if an error escapes. Returns None.
    """
    if not ARTICLES_DIR.exists():
        print("No articles directory. Run 'download' first.")
        return

    conn = psycopg2.connect(DB_URL)
    try:
        cur = conn.cursor()

        # Category mapping: tecdoc_id → id_part_category
        cur.execute("SELECT id_part_category, tecdoc_id, name_part_category FROM part_categories WHERE tecdoc_id IS NOT NULL")
        cat_map = {r[1]: r[0] for r in cur.fetchall()}

        stats = {
            'parts_inserted': 0, 'parts_existing': 0,
            'cross_refs': 0, 'manufacturers': 0
        }

        detail_files = list(DETAILS_DIR.glob("*.json"))
        print(f"Processing {len(detail_files)} article details...", flush=True)

        # Cache for manufacturers: supplier_name → id_manufacture
        cur.execute("SELECT id_manufacture, name_manufacture FROM manufacturers")
        mfr_cache = {r[1]: r[0] for r in cur.fetchall()}

        # Cache for parts: oem_no → id_part
        cur.execute("SELECT oem_part_number, id_part FROM parts WHERE oem_part_number IS NOT NULL")
        oem_cache = {r[0]: r[1] for r in cur.fetchall()}

        # Build article → vehicles / category mappings from the article files
        # (named <vehicleId>_<categoryId>.json).
        article_vehicles = {}  # articleId → set of vehicleIds
        article_category = {}  # articleId → categoryId (TecDoc), last file wins

        for f in ARTICLES_DIR.glob("*.json"):
            name_parts = f.stem.split("_")
            if len(name_parts) != 2:
                continue
            try:
                vid, cat_id = int(name_parts[0]), int(name_parts[1])
                articles = json.loads(f.read_text())
            except (OSError, ValueError):
                # Unreadable file, invalid JSON or non-numeric file name.
                continue
            if not isinstance(articles, list):
                continue

            for a in articles:
                if not isinstance(a, dict):
                    continue
                aid = a.get('articleId')
                if aid:
                    article_vehicles.setdefault(aid, set()).add(vid)
                    article_category[aid] = cat_id

        print(f"Article→vehicle mappings: {len(article_vehicles)}", flush=True)

        batch_count = 0

        for detail_file in detail_files:
            try:
                article_id = int(detail_file.stem)
                data = json.loads(detail_file.read_text())
            except (OSError, ValueError):
                continue
            if not isinstance(data, dict):
                continue

            article = data.get('article', {})
            if not isinstance(article, dict) or not article:
                continue

            article_no = article.get('articleNo', '')
            supplier_name = article.get('supplierName', '')
            product_name = article.get('articleProductName', '')

            # NOTE(review): the downloader and link_vehicle_parts.py in this
            # same patch use the top-level 'articleOemNo' list, so fall back
            # to it when 'article.oemNo' is absent — confirm the real shape.
            oem_numbers = article.get('oemNo') or data.get('articleOemNo') or []
            if not isinstance(oem_numbers, list) or not oem_numbers:
                continue

            # Category for this article (None when unmapped)
            cat_db_id = cat_map.get(article_category.get(article_id))

            # Ensure manufacturer exists
            if supplier_name and supplier_name not in mfr_cache:
                cur.execute(
                    "INSERT INTO manufacturers (name_manufacture) VALUES (%s) RETURNING id_manufacture",
                    (supplier_name,))
                mfr_cache[supplier_name] = cur.fetchone()[0]
                stats['manufacturers'] += 1

            # Insert each OEM part
            for oem_entry in oem_numbers:
                if not isinstance(oem_entry, dict):
                    continue
                oem_no = oem_entry.get('oemDisplayNo', '')
                oem_brand = oem_entry.get('oemBrand', '')
                if not oem_no:
                    continue

                if oem_no not in oem_cache:
                    cur.execute("""
                        INSERT INTO parts (oem_part_number, name_part, name_es, category_id, description)
                        VALUES (%s, %s, %s, %s, %s)
                        ON CONFLICT (oem_part_number) DO UPDATE SET name_part = EXCLUDED.name_part
                        RETURNING id_part
                    """, (oem_no, product_name, None, cat_db_id, f"OEM {oem_brand}"))
                    oem_cache[oem_no] = cur.fetchone()[0]
                    stats['parts_inserted'] += 1
                else:
                    stats['parts_existing'] += 1

                part_id = oem_cache[oem_no]

                # Insert aftermarket cross-reference.
                # NOTE(review): import_phase1.py writes this table through a
                # column named cross_reference_number; verify against the schema.
                if article_no and supplier_name:
                    cur.execute("""
                        INSERT INTO part_cross_references (part_id, cross_ref_number, id_ref_type, source_ref)
                        VALUES (%s, %s, NULL, %s)
                        ON CONFLICT DO NOTHING
                    """, (part_id, article_no, supplier_name))
                    # rowcount is 0 when ON CONFLICT skipped the row, so
                    # re-runs do not inflate the total.
                    stats['cross_refs'] += max(cur.rowcount, 0)

            batch_count += 1
            if batch_count % 500 == 0:
                conn.commit()
                print(f" Processed {batch_count}/{len(detail_files)} articles, "
                      f"{stats['parts_inserted']} parts inserted", flush=True)

        conn.commit()
        cur.close()
    finally:
        conn.close()

    print(f"\n{'='*60}", flush=True)
    print("IMPORT COMPLETE", flush=True)
    print(f" Parts inserted: {stats['parts_inserted']:,}", flush=True)
    print(f" Parts existing: {stats['parts_existing']:,}", flush=True)
    print(f" Cross-references: {stats['cross_refs']:,}", flush=True)
    print(f" Manufacturers: {stats['manufacturers']:,}", flush=True)
    print(f"{'='*60}", flush=True)
def parse_capacity_liters(cap):
    """Convert a TecDoc engine capacity in cc to liters, rounded to 0.1 L.

    Args:
        cap: capacity in cubic centimetres as a number or numeric string,
            e.g. ``'1998.0000'``.

    Returns:
        Liters as a float rounded to one decimal (``'1998.0000'`` → ``2.0``),
        or None when *cap* is missing, not numeric, or not finite.
    """
    import math  # local import: the module's import block is outside this block

    try:
        cc = float(cap)
    except (TypeError, ValueError):
        return None
    # NaN/inf would slip through the caller's tolerance comparison and match
    # every engine, so treat them as "unknown".
    if not math.isfinite(cc):
        return None
    return round(cc / 1000, 1)


def extract_engine_liters(engine_name):
    """Extract the displacement from an engine name like '2.0L 4cyl 127hp'.

    Only a leading ``<digits>.<digits>L`` token is recognised.

    Returns:
        Liters as a float rounded to one decimal, or None when the name is
        not a string or does not start with such a token.
    """
    if not isinstance(engine_name, str):
        return None
    m = re.match(r'(\d+\.\d+)L', engine_name)
    if m is None:
        return None
    return round(float(m.group(1)), 1)
cur.execute(""" + SELECT b.name_brand, m.name_model, mye.id_mye, y.year_car, e.name_engine + FROM model_year_engine mye + JOIN models m ON mye.model_id = m.id_model + JOIN brands b ON m.brand_id = b.id_brand + JOIN years y ON mye.year_id = y.id_year + JOIN engines e ON mye.engine_id = e.id_engine + """) + brand_model_to_myes = {} + for brand, model, mye_id, year, engine_name in cur.fetchall(): + key = (brand, model) + liters = extract_engine_liters(engine_name) + if key not in brand_model_to_myes: + brand_model_to_myes[key] = [] + brand_model_to_myes[key].append((mye_id, year, liters)) + + print(f" {len(brand_model_to_myes):,} brand/model combos with {sum(len(v) for v in brand_model_to_myes.values()):,} MYEs", flush=True) + + # Step 3: Build OEM number → part_id from DB + print("Loading parts cache...", flush=True) + cur.execute("SELECT oem_part_number, id_part FROM parts WHERE oem_part_number IS NOT NULL") + part_cache = {r[0]: r[1] for r in cur.fetchall()} + print(f" {len(part_cache):,} parts cached", flush=True) + + # Step 4: Load detail files to get articleId → OEM numbers + print("Loading article detail OEM mappings...", flush=True) + article_to_oems = {} + for f in DETAILS_DIR.glob("*.json"): + try: + data = json.loads(f.read_text()) + oem_list = data.get('articleOemNo', []) + if oem_list: + oem_nos = [o.get('oemDisplayNo') for o in oem_list if o.get('oemDisplayNo')] + if oem_nos: + article_to_oems[int(f.stem)] = oem_nos + except: + continue + print(f" {len(article_to_oems):,} articles with OEM data", flush=True) + + # Step 5: Process article files and create vehicle_parts + print("\nCreating vehicle_parts links (filtered + batch mode)...", flush=True) + + stats = {'links': 0, 'skipped_no_mye': 0, 'skipped_no_part': 0, 'files': 0, 'filtered_out': 0} + pending = [] + + def flush_batch(): + if not pending: + return + execute_values(cur, """ + INSERT INTO vehicle_parts (model_year_engine_id, part_id, quantity_required) + VALUES %s ON CONFLICT DO NOTHING + """, 
pending, page_size=10000) + conn.commit() + pending.clear() + + article_files = sorted(ARTICLES_DIR.glob("*.json")) + for f in article_files: + parts_split = f.stem.split("_") + if len(parts_split) != 2: + continue + vid = int(parts_split[0]) + + info = vid_info.get(vid) + if not info: + stats['skipped_no_mye'] += 1 + continue + + bm = (info['brand'], info['model']) + all_myes = brand_model_to_myes.get(bm, []) + if not all_myes: + stats['skipped_no_mye'] += 1 + continue + + # Filter MYEs by year range and engine capacity + td_ys = info['year_start'] + td_ye = info['year_end'] + td_lit = info['liters'] + + filtered_myes = [] + for mye_id, mye_year, mye_liters in all_myes: + # Year filter: MYE year must fall within TecDoc construction interval + if td_ys and td_ye: + if mye_year < td_ys or mye_year > td_ye: + stats['filtered_out'] += 1 + continue + elif td_ys: + if mye_year < td_ys: + stats['filtered_out'] += 1 + continue + + # Engine capacity filter: must match within 0.2L tolerance + if td_lit and mye_liters: + if abs(td_lit - mye_liters) > 0.2: + stats['filtered_out'] += 1 + continue + + filtered_myes.append(mye_id) + + if not filtered_myes: + # Fallback: if filtering removed everything, skip + stats['skipped_no_mye'] += 1 + continue + + try: + articles = json.loads(f.read_text()) + except: + continue + + for a in articles: + aid = a.get('articleId') + article_no = a.get('articleNo', '') + supplier = a.get('supplierName', '') + if not aid: + continue + + part_ids = set() + oem_nos = article_to_oems.get(aid, []) + for oem_no in oem_nos: + pid = part_cache.get(oem_no) + if pid: + part_ids.add(pid) + + if not part_ids: + stats['skipped_no_part'] += 1 + continue + + for mye_id in filtered_myes: + for part_id in part_ids: + pending.append((mye_id, part_id, 1)) + stats['links'] += 1 + + if len(pending) >= BATCH_SIZE: + flush_batch() + + stats['files'] += 1 + if stats['files'] % 500 == 0: + flush_batch() + print(f" {stats['files']:,}/{len(article_files):,} files | " + 
f"{stats['links']:,} links | {stats['filtered_out']:,} filtered out", flush=True) + + flush_batch() + cur.close() + conn.close() + + print(f"\n{'='*50}", flush=True) + print(f"LINKING COMPLETE", flush=True) + print(f" Files processed: {stats['files']:,}", flush=True) + print(f" Links created: {stats['links']:,}", flush=True) + print(f" Filtered out: {stats['filtered_out']:,}", flush=True) + print(f" Skipped (no MYE): {stats['skipped_no_mye']:,}", flush=True) + print(f" Skipped (no part):{stats['skipped_no_part']:,}", flush=True) + print(f"{'='*50}", flush=True) + + +if __name__ == "__main__": + run()