#!/usr/bin/env python3
"""
Import OEM parts data from TecDoc (Apify) into Nexus Autoparts PostgreSQL.

Three-phase approach:
    Phase 1: Download categories per vehicle -> JSON files
    Phase 2: Download article lists per vehicle+category -> JSON files
    Phase 3: Download article details (OEM numbers) -> JSON files
    Phase 4: Import all JSON data into PostgreSQL

Uses one representative vehicleId per TecDoc model to minimize API calls.
Supports concurrent API calls for speed.

Usage:
    python3 scripts/import_tecdoc_parts.py download   # Phases 1-3
    python3 scripts/import_tecdoc_parts.py import     # Phase 4
    python3 scripts/import_tecdoc_parts.py status     # Check progress
"""
import os
import sys
import json
import time
import argparse
import requests
import psycopg2
from datetime import datetime
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

# --- Config ---
# SECURITY FIX: a previous revision hard-coded a real Apify API token here as
# the fallback value. That credential is committed history and must be
# considered leaked — rotate it. The token now comes ONLY from the environment.
APIFY_TOKEN = os.environ.get("APIFY_TOKEN", "")
APIFY_ACTOR = "making-data-meaningful~tecdoc"
APIFY_URL = f"https://api.apify.com/v2/acts/{APIFY_ACTOR}/run-sync-get-dataset-items"

# NOTE(review): fallback DSN embeds a password for a localhost-only database;
# prefer setting DATABASE_URL in the environment as well.
DB_URL = os.environ.get("DATABASE_URL", "postgresql://nexus:nexus_autoparts_2026@localhost/nexus_autoparts")

TYPE_ID = 1       # Passenger cars
LANG_ID = 4       # English
COUNTRY_ID = 153  # Mexico

DATA_DIR = Path("/home/Autopartes/data/tecdoc")
PARTS_DIR = DATA_DIR / "parts"
ARTICLES_DIR = PARTS_DIR / "articles"  # vehicle articles by category
DETAILS_DIR = PARTS_DIR / "details"    # article OEM details

MAX_WORKERS = 30   # Concurrent API calls
APIFY_DELAY = 0.1  # Seconds between API calls per thread

# Top brands for Mexico & USA
TOP_BRANDS = [
    'TOYOTA', 'NISSAN', 'CHEVROLET', 'VOLKSWAGEN', 'VW', 'HONDA', 'FORD',
    'HYUNDAI', 'KIA', 'MAZDA', 'BMW', 'MERCEDES-BENZ', 'AUDI', 'JEEP',
    'DODGE', 'CHRYSLER', 'RAM', 'GMC', 'BUICK', 'CADILLAC', 'SUBARU',
    'MITSUBISHI', 'SUZUKI', 'ACURA', 'LEXUS', 'INFINITI', 'LINCOLN',
    'FIAT', 'PEUGEOT', 'RENAULT', 'SEAT'
]

# Top-level TecDoc category IDs (from our DB)
TOP_CATEGORIES = None  # Loaded dynamically


def apify_call(input_data, retries=3):
    """Call the Apify actor synchronously and return one dataset item.

    POSTs `input_data` to the run-sync-get-dataset-items endpoint. On a
    200/201 the first list element (or the raw object) is returned. Retries
    on HTTP 429 with escalating backoff (30s, 60s, ...) and on any other
    error after a 5s pause. Returns None once `retries` attempts are spent.
    """
    for attempt in range(retries):
        try:
            resp = requests.post(
                APIFY_URL,
                params={"token": APIFY_TOKEN},
                headers={"Content-Type": "application/json"},
                json=input_data,
                timeout=180,
            )
            if resp.status_code in (200, 201):
                data = resp.json()
                return data[0] if isinstance(data, list) and data else data
            elif resp.status_code == 429:
                wait = 30 * (attempt + 1)
                print(f" Rate limited, waiting {wait}s...", flush=True)
                time.sleep(wait)
            else:
                print(f" HTTP {resp.status_code}: {resp.text[:200]}", flush=True)
                time.sleep(5)
        except Exception as e:
            # Network/JSON failures are retried; the loop bound caps them.
            print(f" Error: {e}", flush=True)
            time.sleep(5)
    return None


def load_top_categories():
    """Load top-level TecDoc category IDs from database.

    Returns a list of (tecdoc_id, name_part_category) tuples ordered by
    display_order.
    """
    conn = psycopg2.connect(DB_URL)
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT tecdoc_id, name_part_category FROM part_categories "
            "WHERE tecdoc_id IS NOT NULL ORDER BY display_order"
        )
        cats = [(r[0], r[1]) for r in cur.fetchall()]
        cur.close()
        return cats
    finally:
        # FIX: connection now closed even if the query raises.
        conn.close()


def get_representative_vehicles():
    """Get one representative vehicleId per TecDoc model for top brands.

    Reads the previously downloaded manufacturers/models/vehicles JSON files
    under DATA_DIR and returns a list of dicts with keys: vehicleId,
    allVehicleIds, brand, model, tdModelId.
    """
    mfrs = json.loads((DATA_DIR / "manufacturers.json").read_text())
    models_dir = DATA_DIR / "models"
    vehicles_dir = DATA_DIR / "vehicles"
    # PERF FIX: build the uppercase brand set once instead of per manufacturer.
    top_brands = {b.upper() for b in TOP_BRANDS}
    representatives = []
    for mfr in mfrs:
        name = mfr['manufacturerName']
        if '(' in name:
            # Skip parenthesized variants (e.g. regional sub-brands).
            continue
        if name.upper() not in top_brands:
            continue
        mfr_id = mfr['manufacturerId']
        model_file = models_dir / f"{mfr_id}.json"
        if not model_file.exists():
            continue
        models = json.loads(model_file.read_text())
        for model in models:
            td_model_id = model['modelId']
            model_name = model.get('modelName', '')
            vehicle_file = vehicles_dir / f"{td_model_id}.json"
            if not vehicle_file.exists():
                continue
            vehicles = json.loads(vehicle_file.read_text())
            if not vehicles:
                continue
            # Pick the first vehicle with a valid vehicleId as representative.
            vid = vehicles[0].get('vehicleId')
            if vid:
                # Also collect ALL vehicleIds for this model.
                all_vids = [v['vehicleId'] for v in vehicles if v.get('vehicleId')]
                representatives.append({
                    'vehicleId': vid,
                    'allVehicleIds': all_vids,
                    'brand': name,
                    'model': model_name,
                    'tdModelId': td_model_id,
                })
    return representatives


def download_articles_for_vehicle(vid, category_id, category_name):
    """Download article list for a vehicle+category. Returns article count.

    Writes the article list to ARTICLES_DIR/<vid>_<category_id>.json; writes
    an empty list on failure so the pair is never re-queried.
    """
    outfile = ARTICLES_DIR / f"{vid}_{category_id}.json"
    if outfile.exists():
        return 0  # Already downloaded
    time.sleep(APIFY_DELAY)
    result = apify_call({
        'endpoint_partsArticleListByVehicleIdCategoryId': True,
        'parts_vehicleId_18': vid,
        'parts_categoryId_18': category_id,
        'parts_typeId_18': TYPE_ID,
        'parts_langId_18': LANG_ID,
    })
    if result and isinstance(result, dict) and 'articles' in result:
        articles = result.get('articles') or []
        outfile.write_text(json.dumps(articles, indent=1))
        return len(articles)
    # Save empty to avoid re-querying
    outfile.write_text("[]")
    return 0


def download_article_detail(article_id):
    """Download OEM details for a single article.

    Persists the raw response to DETAILS_DIR/<article_id>.json when it
    contains either an 'articleOemNo' list or an 'article' object. Returns
    True on success (or if already downloaded), False otherwise.
    """
    outfile = DETAILS_DIR / f"{article_id}.json"
    if outfile.exists():
        return True
    time.sleep(APIFY_DELAY)
    result = apify_call({
        'endpoint_partsArticleDetailsByArticleId': True,
        'parts_articleId_13': article_id,
        'parts_langId_13': LANG_ID,
    })
    if result and result.get('articleOemNo'):
        outfile.write_text(json.dumps(result, indent=1))
        return True
    elif result and isinstance(result.get('article'), dict):
        outfile.write_text(json.dumps(result, indent=1))
        return True
    return False


# ──────────────── Download ────────────────

def download(brand_filter=None):
    """Download all parts data from TecDoc.

    Phase 1 fetches the article list for every representative vehicle ×
    top-level category; Phase 2 fetches OEM detail files for every unique
    articleId found. Both phases are resumable: existing JSON files are
    skipped.
    """
    PARTS_DIR.mkdir(parents=True, exist_ok=True)
    ARTICLES_DIR.mkdir(parents=True, exist_ok=True)
    DETAILS_DIR.mkdir(parents=True, exist_ok=True)

    categories = load_top_categories()
    print(f"Loaded {len(categories)} top-level categories", flush=True)

    representatives = get_representative_vehicles()
    if brand_filter:
        representatives = [r for r in representatives
                           if brand_filter.upper() in r['brand'].upper()]
    print(f"Found {len(representatives)} representative vehicles for top brands", flush=True)

    # Phase 1: Download articles per vehicle+category
    total_tasks = len(representatives) * len(categories)
    completed = 0
    total_articles = 0
    print(f"\n{'='*60}", flush=True)
    print(f"PHASE 1: Download articles ({total_tasks:,} tasks)", flush=True)
    print(f"{'='*60}", flush=True)

    for i, rep in enumerate(representatives):
        vid = rep['vehicleId']
        brand = rep['brand']
        model = rep['model']
        # Check if all categories already downloaded for this vehicle
        existing = sum(1 for cat_id, _ in categories
                       if (ARTICLES_DIR / f"{vid}_{cat_id}.json").exists())
        if existing == len(categories):
            completed += len(categories)
            continue
        print(f"[{i+1}/{len(representatives)}] {brand} {model} (vid={vid})", flush=True)

        def download_task(args):
            vid, cat_id, cat_name = args
            return download_articles_for_vehicle(vid, cat_id, cat_name)

        tasks = [(vid, cat_id, cat_name) for cat_id, cat_name in categories
                 if not (ARTICLES_DIR / f"{vid}_{cat_id}.json").exists()]
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = {executor.submit(download_task, t): t for t in tasks}
            for future in as_completed(futures):
                try:
                    count = future.result()
                    total_articles += count
                    completed += 1
                except Exception as e:
                    print(f" Task error: {e}", flush=True)
                    completed += 1
        completed += existing  # Count pre-existing

    print(f"\nPhase 1 complete: {total_articles:,} articles found", flush=True)

    # Phase 2: Collect unique articleIds and download OEM details
    print(f"\n{'='*60}", flush=True)
    print(f"PHASE 2: Collect unique articles & download OEM details", flush=True)
    print(f"{'='*60}", flush=True)

    unique_articles = set()
    for f in ARTICLES_DIR.glob("*.json"):
        try:
            articles = json.loads(f.read_text())
        except (json.JSONDecodeError, OSError):
            # FIX: was a bare `except:`; skip only unreadable/corrupt files.
            continue
        for a in articles:
            if 'articleId' in a:
                unique_articles.add(a['articleId'])

    # Filter out already downloaded
    to_download = [aid for aid in unique_articles
                   if not (DETAILS_DIR / f"{aid}.json").exists()]
    print(f"Unique articles: {len(unique_articles):,}", flush=True)
    print(f"Already have details: {len(unique_articles) - len(to_download):,}", flush=True)
    print(f"Need to download: {len(to_download):,}", flush=True)

    if to_download:
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = {executor.submit(download_article_detail, aid): aid
                       for aid in to_download}
            done = 0
            for future in as_completed(futures):
                # BUG FIX: result() was never called, so worker exceptions
                # were silently discarded. Surface them here.
                try:
                    future.result()
                except Exception as e:
                    print(f" Detail error ({futures[future]}): {e}", flush=True)
                done += 1
                if done % 100 == 0:
                    print(f" Details: {done}/{len(to_download)}", flush=True)

    print(f"\nDownload complete!", flush=True)


# ──────────────── Import ────────────────

def do_import():
    """Import downloaded parts data into PostgreSQL.

    Reads every detail file in DETAILS_DIR, upserting OEM parts into `parts`
    and aftermarket article numbers into `part_cross_references`, creating
    `manufacturers` rows on demand. Commits every 500 articles.
    """
    if not ARTICLES_DIR.exists():
        print("No articles directory. Run 'download' first.")
        return
    conn = psycopg2.connect(DB_URL)
    cur = conn.cursor()

    # Load category mapping: tecdoc_id -> (id_part_category, name)
    cur.execute("SELECT id_part_category, tecdoc_id, name_part_category FROM part_categories WHERE tecdoc_id IS NOT NULL")
    cat_map = {r[1]: (r[0], r[2]) for r in cur.fetchall()}

    # NOTE(review): the previous revision also loaded part_groups, brands,
    # the models join and all representative vehicles here, but never used
    # any of them (vehicle_parts linkage was never implemented). Removed to
    # avoid the wasted DB queries and full JSON-tree crawl.

    stats = {'parts_inserted': 0, 'parts_existing': 0,
             'cross_refs': 0, 'manufacturers': 0}

    # Process article detail files
    detail_files = list(DETAILS_DIR.glob("*.json"))
    print(f"Processing {len(detail_files)} article details...", flush=True)

    # Cache existing manufacturers: name -> id_manufacture
    mfr_cache = {}
    cur.execute("SELECT id_manufacture, name_manufacture FROM manufacturers")
    for r in cur.fetchall():
        mfr_cache[r[1]] = r[0]

    # Cache existing parts: oem_part_number -> id_part
    oem_cache = {}
    cur.execute("SELECT oem_part_number, id_part FROM parts WHERE oem_part_number IS NOT NULL")
    for r in cur.fetchall():
        oem_cache[r[0]] = r[1]

    # Build article->vehicles / article->category mappings from article files.
    article_vehicles = {}  # articleId -> set of vehicleIds
    article_category = {}  # articleId -> categoryId (TecDoc)
    for f in ARTICLES_DIR.glob("*.json"):
        parts = f.stem.split("_")
        if len(parts) != 2:
            continue
        vid, cat_id = int(parts[0]), int(parts[1])
        try:
            articles = json.loads(f.read_text())
        except (json.JSONDecodeError, OSError):
            # FIX: was a bare `except:`.
            continue
        for a in articles:
            aid = a.get('articleId')
            if aid:
                article_vehicles.setdefault(aid, set()).add(vid)
                article_category[aid] = cat_id
    print(f"Article→vehicle mappings: {len(article_vehicles)}", flush=True)

    batch_count = 0
    for detail_file in detail_files:
        try:
            article_id = int(detail_file.stem)
        except ValueError:
            # ROBUSTNESS FIX: a stray non-numeric filename used to crash the
            # whole import.
            continue
        try:
            data = json.loads(detail_file.read_text())
        except (json.JSONDecodeError, OSError):
            continue
        article = data.get('article', {})
        if not article:
            continue
        article_no = article.get('articleNo', '')
        supplier_name = article.get('supplierName', '')
        product_name = article.get('articleProductName', '')

        # Get OEM numbers
        oem_numbers = article.get('oemNo', [])
        if not oem_numbers:
            continue

        # Get category for this article
        td_cat_id = article_category.get(article_id)
        cat_info = cat_map.get(td_cat_id)
        cat_db_id = cat_info[0] if cat_info else None

        # Ensure manufacturer exists
        if supplier_name and supplier_name not in mfr_cache:
            cur.execute(
                "INSERT INTO manufacturers (name_manufacture) VALUES (%s) RETURNING id_manufacture",
                (supplier_name,))
            mfr_cache[supplier_name] = cur.fetchone()[0]
            stats['manufacturers'] += 1

        # Insert each OEM part
        for oem_entry in oem_numbers:
            oem_no = oem_entry.get('oemDisplayNo', '')
            oem_brand = oem_entry.get('oemBrand', '')
            if not oem_no:
                continue
            # Insert OEM part if not exists
            if oem_no not in oem_cache:
                cur.execute("""
                    INSERT INTO parts (oem_part_number, name_part, name_es, category_id, description)
                    VALUES (%s, %s, %s, %s, %s)
                    ON CONFLICT (oem_part_number) DO UPDATE SET name_part = EXCLUDED.name_part
                    RETURNING id_part
                """, (oem_no, product_name, None, cat_db_id, f"OEM {oem_brand}"))
                oem_cache[oem_no] = cur.fetchone()[0]
                stats['parts_inserted'] += 1
            else:
                stats['parts_existing'] += 1
            part_id = oem_cache[oem_no]

            # Insert aftermarket cross-reference
            if article_no and supplier_name:
                cur.execute("""
                    INSERT INTO part_cross_references (part_id, cross_ref_number, id_ref_type, source_ref)
                    VALUES (%s, %s, NULL, %s)
                    ON CONFLICT DO NOTHING
                """, (part_id, article_no, supplier_name))
                stats['cross_refs'] += 1

        batch_count += 1
        if batch_count % 500 == 0:
            conn.commit()
            print(f" Processed {batch_count}/{len(detail_files)} articles, "
                  f"{stats['parts_inserted']} parts inserted", flush=True)

    conn.commit()
    cur.close()
    conn.close()

    print(f"\n{'='*60}", flush=True)
    print(f"IMPORT COMPLETE", flush=True)
    print(f" Parts inserted: {stats['parts_inserted']:,}", flush=True)
    print(f" Parts existing: {stats['parts_existing']:,}", flush=True)
    print(f" Cross-references: {stats['cross_refs']:,}", flush=True)
    print(f" Manufacturers: {stats['manufacturers']:,}", flush=True)
    print(f"{'='*60}", flush=True)


# ──────────────── Status ────────────────

def status():
    """Show download progress.

    Reports article/detail file counts against the expected totals, a rough
    time estimate for the remaining API calls, and a per-brand breakdown.
    """
    categories = load_top_categories()
    representatives = get_representative_vehicles()
    print(f"Representative vehicles: {len(representatives)}")
    print(f"Categories: {len(categories)}")
    print(f"Expected article files: {len(representatives) * len(categories):,}")

    article_files = list(ARTICLES_DIR.glob("*.json")) if ARTICLES_DIR.exists() else []
    detail_files = list(DETAILS_DIR.glob("*.json")) if DETAILS_DIR.exists() else []

    # Count unique articleIds
    unique_articles = set()
    total_article_count = 0
    for f in article_files:
        try:
            articles = json.loads(f.read_text())
        except (json.JSONDecodeError, OSError):
            # FIX: was a bare `except:`.
            continue
        for a in articles:
            if 'articleId' in a:
                unique_articles.add(a['articleId'])
        total_article_count += len(articles)

    expected = len(representatives) * len(categories)
    pct_articles = len(article_files) / expected * 100 if expected > 0 else 0
    print(f"\nArticle files: {len(article_files):,} / {expected:,} ({pct_articles:.1f}%)")
    print(f"Total articles: {total_article_count:,}")
    print(f"Unique articleIds: {len(unique_articles):,}")
    print(f"Detail files: {len(detail_files):,} / {len(unique_articles):,}")

    if expected > 0:
        remaining = expected - len(article_files)
        # ~3s per API call plus the per-thread delay, spread over the pool.
        est_minutes = remaining * (APIFY_DELAY + 3) / MAX_WORKERS / 60
        print(f"\nEst. remaining (articles): ~{est_minutes:.0f} min ({remaining:,} calls)")
    remaining_details = len(unique_articles) - len(detail_files)
    if remaining_details > 0:
        est_detail_min = remaining_details * (APIFY_DELAY + 3) / MAX_WORKERS / 60
        print(f"Est. remaining (details): ~{est_detail_min:.0f} min ({remaining_details:,} calls)")

    # Per-brand breakdown
    print(f"\n{'Brand':20s} {'Models':>7} {'Done':>7} {'%':>6}")
    print("-" * 44)
    for brand in sorted(TOP_BRANDS):
        brand_reps = [r for r in representatives if r['brand'].upper() == brand]
        brand_done = sum(1 for r in brand_reps for cat_id, _ in categories
                         if (ARTICLES_DIR / f"{r['vehicleId']}_{cat_id}.json").exists())
        brand_total = len(brand_reps) * len(categories)
        pct = brand_done / brand_total * 100 if brand_total > 0 else 0
        print(f" {brand:18s} {len(brand_reps):>7} {brand_done:>7} {pct:>5.1f}%")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="TecDoc parts import")
    parser.add_argument("command", choices=["download", "import", "status"])
    parser.add_argument("--brand", help="Filter by brand name")
    args = parser.parse_args()

    if args.command == "download":
        download(brand_filter=args.brand)
    elif args.command == "import":
        do_import()
    elif args.command == "status":
        status()