From d269bc1ffb9b17b375df3bfc2f76282038d2a3e3 Mon Sep 17 00:00:00 2001 From: consultoria-as Date: Wed, 18 Mar 2026 22:25:48 +0000 Subject: [PATCH] feat: add TecDoc import pipeline scripts - import_tecdoc.py: 2-phase TecDoc download + import (brands, models, vehicles) - import_live.py: real-time streaming importer for part details - run_all_brands.sh: automated sequential brand processing pipeline Co-Authored-By: Claude Opus 4.6 (1M context) --- scripts/import_live.py | 144 +++++++++++++ scripts/import_tecdoc.py | 414 ++++++++++++++++++++++++++++++++++++++ scripts/run_all_brands.sh | 43 ++++ 3 files changed, 601 insertions(+) create mode 100644 scripts/import_live.py create mode 100644 scripts/import_tecdoc.py create mode 100755 scripts/run_all_brands.sh diff --git a/scripts/import_live.py b/scripts/import_live.py new file mode 100644 index 0000000..600ccd9 --- /dev/null +++ b/scripts/import_live.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +""" +Live importer: watches detail files and imports OEM data as it arrives. +Runs in a loop, importing new detail files every 30 seconds. 
+""" + +import json +import time +import psycopg2 +from pathlib import Path + +DB_URL = "postgresql://nexus:nexus_autoparts_2026@localhost/nexus_autoparts" +DETAILS_DIR = Path("/home/Autopartes/data/tecdoc/parts/details") +ARTICLES_DIR = Path("/home/Autopartes/data/tecdoc/parts/articles") +TRACK_FILE = Path("/home/Autopartes/data/tecdoc/parts/.imported_ids") + +INTERVAL = 30 # seconds between import runs + + +def load_imported(): + """Load set of already-imported articleIds.""" + if TRACK_FILE.exists(): + return set(TRACK_FILE.read_text().split()) + return set() + + +def save_imported(ids): + TRACK_FILE.write_text("\n".join(ids)) + + +def run(): + imported = load_imported() + print(f"Already imported: {len(imported)} articles", flush=True) + + # Build article→category mapping once + article_cats = {} + for f in ARTICLES_DIR.glob("*.json"): + parts = f.stem.split("_") + if len(parts) != 2: + continue + cat_id = int(parts[1]) + try: + for a in json.loads(f.read_text()): + aid = a.get('articleId') + if aid and aid not in article_cats: + article_cats[aid] = cat_id + except: + continue + print(f"Article→category mappings: {len(article_cats):,}", flush=True) + + while True: + detail_files = list(DETAILS_DIR.glob("*.json")) + new_files = [f for f in detail_files if f.stem not in imported] + + if not new_files: + print(f" [{time.strftime('%H:%M:%S')}] No new files. Total imported: {len(imported):,}. 
Waiting...", flush=True) + time.sleep(INTERVAL) + continue + + print(f" [{time.strftime('%H:%M:%S')}] Found {len(new_files)} new detail files to import", flush=True) + + conn = psycopg2.connect(DB_URL) + cur = conn.cursor() + + # Load caches + cur.execute("SELECT oem_part_number, id_part FROM parts WHERE oem_part_number IS NOT NULL") + part_cache = {r[0]: r[1] for r in cur.fetchall()} + + cur.execute("SELECT id_manufacture, name_manufacture FROM manufacturers") + mfr_cache = {r[1]: r[0] for r in cur.fetchall()} + + stats = {'parts': 0, 'xrefs': 0, 'mfrs': 0, 'updated': 0} + + for f in new_files: + article_id = f.stem + try: + data = json.loads(f.read_text()) + except: + imported.add(article_id) + continue + + oem_list = data.get('articleOemNo', []) + article = data.get('article', {}) or {} + article_no = article.get('articleNo', '') + supplier = article.get('supplierName', '') + product_name = article.get('articleProductName', '') + + if not oem_list: + imported.add(article_id) + continue + + # Ensure manufacturer + if supplier and supplier not in mfr_cache: + cur.execute( + "INSERT INTO manufacturers (name_manufacture) VALUES (%s) RETURNING id_manufacture", + (supplier,)) + mfr_cache[supplier] = cur.fetchone()[0] + stats['mfrs'] += 1 + + for oem in oem_list: + oem_no = oem.get('oemDisplayNo', '') + oem_brand = oem.get('oemBrand', '') + if not oem_no: + continue + + if oem_no not in part_cache: + cur.execute(""" + INSERT INTO parts (oem_part_number, name_part, description) + VALUES (%s, %s, %s) + ON CONFLICT (oem_part_number) DO UPDATE SET name_part = EXCLUDED.name_part + RETURNING id_part + """, (oem_no, product_name, f"OEM {oem_brand}")) + part_cache[oem_no] = cur.fetchone()[0] + stats['parts'] += 1 + else: + # Update the existing AFT- placeholder with real OEM number + stats['updated'] += 1 + + part_id = part_cache[oem_no] + + # Cross-reference + if article_no and supplier: + cur.execute(""" + INSERT INTO part_cross_references (part_id, cross_reference_number, 
#!/usr/bin/env python3
"""
Import vehicle data from TecDoc (Apify) into Nexus Autoparts PostgreSQL.

Two-phase approach:
  Phase 1: Download all data from TecDoc API to local JSON files
  Phase 2: Import JSON files into PostgreSQL

Usage:
  python3 scripts/import_tecdoc.py download                 # Phase 1: fetch from API
  python3 scripts/import_tecdoc.py download --brand TOYOTA  # Single brand
  python3 scripts/import_tecdoc.py import                   # Phase 2: load into DB
  python3 scripts/import_tecdoc.py status                   # Check progress
"""

import os
import sys
import json
import time
import argparse
import requests
import psycopg2
from datetime import datetime
from pathlib import Path

# --- Config ---
# SECURITY(review): a live API token is committed as the fallback default.
# Rotate this token and require APIFY_TOKEN from the environment instead.
APIFY_TOKEN = os.environ.get("APIFY_TOKEN", "apify_api_l5SrcwYyanAO45AFxrEpviUcuVRIFK2yPdc5")
APIFY_ACTOR = "making-data-meaningful~tecdoc"
APIFY_URL = f"https://api.apify.com/v2/acts/{APIFY_ACTOR}/run-sync-get-dataset-items"
DB_URL = os.environ.get("DATABASE_URL", "postgresql://nexus:nexus_autoparts_2026@localhost/nexus_autoparts")

TYPE_ID = 1       # Passenger cars
LANG_ID = 4       # English
COUNTRY_ID = 153  # Mexico

DATA_DIR = Path("/home/Autopartes/data/tecdoc")
APIFY_DELAY = 1.0  # seconds between API calls


def apify_call(input_data, retries=3):
    """Call the Apify actor synchronously and return the first dataset item.

    Retries up to `retries` times: linear backoff on HTTP 429, short sleep
    on other errors. Returns None when every attempt failed — callers must
    handle that.
    """
    for attempt in range(retries):
        try:
            resp = requests.post(
                APIFY_URL, params={"token": APIFY_TOKEN},
                headers={"Content-Type": "application/json"},
                json=input_data, timeout=120
            )
            if resp.status_code in (200, 201):
                data = resp.json()
                # run-sync-get-dataset-items returns a JSON array of items.
                return data[0] if isinstance(data, list) and data else data
            elif resp.status_code == 429:
                wait = 15 * (attempt + 1)
                print(f"    Rate limited, waiting {wait}s...", flush=True)
                time.sleep(wait)
            else:
                print(f"    HTTP {resp.status_code}: {resp.text[:100]}", flush=True)
                time.sleep(5)
        except Exception as e:
            # Broad by design: this is the retry boundary for network and
            # JSON-decode failures alike.
            print(f"    Error: {e}", flush=True)
            time.sleep(5)
    return None


# ──────────────── Phase 1: Download ────────────────

def download(brand_filter=None):
    """Download manufacturers, models and vehicle types to DATA_DIR as JSON.

    Idempotent: already-downloaded model/vehicle files are skipped, so the
    function can be re-run to resume an interrupted download.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)

    # Step 1: Manufacturers
    mfr_file = DATA_DIR / "manufacturers.json"
    if mfr_file.exists():
        manufacturers = json.loads(mfr_file.read_text())
        print(f"Loaded {len(manufacturers)} cached manufacturers", flush=True)
    else:
        print("Fetching manufacturers...", flush=True)
        result = apify_call({"endpoint_manufacturerIdsByTypeId": True, "manufacturer_typeId_2": TYPE_ID})
        # FIX: apify_call returns None after exhausted retries; fail loudly
        # instead of dying on an opaque TypeError/KeyError.
        if not result or "manufacturers" not in result:
            raise RuntimeError("Failed to fetch manufacturers from Apify")
        manufacturers = result["manufacturers"]
        mfr_file.write_text(json.dumps(manufacturers, indent=1))
        print(f"  Saved {len(manufacturers)} manufacturers", flush=True)

    if brand_filter:
        manufacturers = [m for m in manufacturers if brand_filter.upper() in m["manufacturerName"].upper()]
        print(f"Filtered to {len(manufacturers)} matching '{brand_filter}'", flush=True)

    # Step 2: Models for each manufacturer
    models_dir = DATA_DIR / "models"
    models_dir.mkdir(exist_ok=True)

    for i, mfr in enumerate(manufacturers):
        mfr_id = mfr["manufacturerId"]
        mfr_name = mfr["manufacturerName"]
        model_file = models_dir / f"{mfr_id}.json"

        if model_file.exists():
            continue  # Skip already downloaded

        print(f"[{i+1}/{len(manufacturers)}] {mfr_name} (id={mfr_id})", flush=True)
        time.sleep(APIFY_DELAY)

        result = apify_call({
            "endpoint_modelsByTypeManufacturer": True,
            "models_typeId_1": TYPE_ID,
            "models_manufacturerId_1": mfr_id,
            "models_langId_1": LANG_ID,
            "models_countryFilterId_1": COUNTRY_ID
        })

        # An empty file is written on failure so the brand is not retried
        # forever; delete the file to force a re-fetch.
        models = result.get("models", []) if result else []
        model_file.write_text(json.dumps(models, indent=1))
        print(f"  {len(models)} models", flush=True)

    # Step 3: Vehicle types for each model
    vehicles_dir = DATA_DIR / "vehicles"
    vehicles_dir.mkdir(exist_ok=True)

    total_models = 0
    processed = 0

    for model_file in sorted(models_dir.glob("*.json")):
        models = json.loads(model_file.read_text())
        total_models += len(models)

        for model in models:
            td_model_id = model["modelId"]
            vehicle_file = vehicles_dir / f"{td_model_id}.json"

            if vehicle_file.exists():
                processed += 1
                continue

            print(f"  [{processed+1}/{total_models}] Model {model['modelName']} (id={td_model_id})", flush=True)
            time.sleep(APIFY_DELAY)

            result = apify_call({
                "endpoint_vehicleEngineTypesByModel": True,
                "vehicle_typeId_3": TYPE_ID,
                "vehicle_modelId_3": td_model_id,
                "vehicle_langId_3": LANG_ID,
                "vehicle_countryFilterId_3": COUNTRY_ID
            })

            vehicles = result.get("modelTypes", []) if result else []
            vehicle_file.write_text(json.dumps(vehicles, indent=1))
            processed += 1

    print(f"\nDownload complete! {processed} model vehicle files.", flush=True)
def parse_fuel_id(fuel_str):
    """Map a TecDoc fuel-type string to the internal fuel id.

    1 = diesel, 2 = pure electric, 3 = everything else (petrol, hybrids, …).
    Returns None for an empty/missing fuel string.
    """
    if not fuel_str:
        return None
    lowered = fuel_str.lower()
    if "diesel" in lowered:
        return 1
    # A combustion hint alongside "electric" means hybrid, not pure EV.
    combustion_hint = "petrol" in lowered or "gas" in lowered
    if "electric" in lowered and not combustion_hint:
        return 2
    return 3


def parse_body_id(model_name):
    """Infer the internal body-style id from a model name, or None.

    Keywords are tested in declaration order; the first substring match
    wins, so more specific/earlier synonyms take precedence.
    """
    if not model_name:
        return None
    keyword_to_body = (
        ("Saloon", 1), ("Sedan", 1), ("Coupe", 2), ("Coupé", 2),
        ("Hatchback", 3), ("SUV", 4), ("Off-Road", 4), ("Crossover", 5),
        ("Truck", 6), ("Van", 7), ("Box Body", 7), ("MPV", 8),
        ("Estate", 9), ("Wagon", 9), ("Kombi", 9),
        ("Convertible", 10), ("Cabrio", 10), ("Cabriolet", 10),
        ("Pick-up", 11), ("Pickup", 11),
        ("Platform", 12), ("Chassis", 12), ("Bus", 13), ("Roadster", 15),
    )
    return next(
        (body for keyword, body in keyword_to_body if keyword in model_name),
        None,
    )
Run 'download' first.") + return + + manufacturers = json.loads(mfr_file.read_text()) + models_dir = DATA_DIR / "models" + vehicles_dir = DATA_DIR / "vehicles" + + conn = psycopg2.connect(DB_URL) + cur = conn.cursor() + + # Ensure years exist (1950–2027) + cur.execute("SELECT id_year, year_car FROM years") + year_cache = {r[1]: r[0] for r in cur.fetchall()} + for y in range(1950, 2028): + if y not in year_cache: + cur.execute("INSERT INTO years (year_car) VALUES (%s) RETURNING id_year", (y,)) + year_cache[y] = cur.fetchone()[0] + conn.commit() + + # Caches + brand_cache = {} + model_cache = {} + engine_cache = {} + mye_set = set() + + stats = {"brands": 0, "models": 0, "engines": 0, "mye": 0, "skipped": 0} + current_year = datetime.now().year + + for mfr in manufacturers: + mfr_id = mfr["manufacturerId"] + brand_name = mfr["manufacturerName"] + + # Skip regional duplicates + if "(" in brand_name: + stats["skipped"] += 1 + continue + + model_file = models_dir / f"{mfr_id}.json" + if not model_file.exists(): + continue + + models = json.loads(model_file.read_text()) + if not models: + continue + + # Insert brand + if brand_name not in brand_cache: + cur.execute( + "INSERT INTO brands (name_brand) VALUES (%s) ON CONFLICT (name_brand) DO UPDATE SET name_brand=EXCLUDED.name_brand RETURNING id_brand", + (brand_name,)) + brand_cache[brand_name] = cur.fetchone()[0] + stats["brands"] += 1 + + brand_id = brand_cache[brand_name] + + for model in models: + model_name = model.get("modelName") + if not model_name: + continue + td_model_id = model["modelId"] + year_from = model.get("modelYearFrom", "")[:4] if model.get("modelYearFrom") else None + year_to = model.get("modelYearTo", "")[:4] if model.get("modelYearTo") else None + body_id = parse_body_id(model_name) + + # Insert model + model_key = (brand_id, model_name) + if model_key not in model_cache: + cur.execute( + """INSERT INTO models (brand_id, name_model, id_body, production_start_year, production_end_year) + VALUES (%s, 
%s, %s, %s, %s) RETURNING id_model""", + (brand_id, model_name, body_id, + int(year_from) if year_from else None, + int(year_to) if year_to else None)) + model_cache[model_key] = cur.fetchone()[0] + stats["models"] += 1 + + model_db_id = model_cache[model_key] + + # Load vehicles + vehicle_file = vehicles_dir / f"{td_model_id}.json" + if not vehicle_file.exists(): + continue + + vehicles = json.loads(vehicle_file.read_text()) + if not vehicles: + continue + + # Dedup by vehicleId + seen_v = {} + for v in vehicles: + vid = v["vehicleId"] + if vid not in seen_v: + seen_v[vid] = v + seen_v[vid]["_codes"] = [v.get("engineCodes", "")] + else: + c = v.get("engineCodes", "") + if c and c not in seen_v[vid]["_codes"]: + seen_v[vid]["_codes"].append(c) + + for v in seen_v.values(): + cap_lt = float(v["capacityLt"]) if v.get("capacityLt") else 0 + cylinders = v.get("numberOfCylinders") + fuel = v.get("fuelType", "") + power_ps = float(v["powerPs"]) if v.get("powerPs") else 0 + power_hp = int(power_ps * 0.9863) if power_ps else None + displacement = float(v["capacityTech"]) if v.get("capacityTech") else None + codes = ", ".join(v["_codes"]) + fuel_id = parse_fuel_id(fuel) + + # Build engine name + fl = fuel.lower() if fuel else "" + if "electric" in fl and "petrol" not in fl and cap_lt == 0: + eng_name = f"Electric {power_hp}hp" if power_hp else "Electric" + else: + eng_name = f"{cap_lt:.1f}L" + if cylinders: + eng_name += f" {cylinders}cyl" + if "diesel" in fl: + eng_name += " Diesel" + elif "electric" in fl: + eng_name += " Hybrid" + if power_hp: + eng_name += f" {power_hp}hp" + + engine_key = (eng_name, displacement, cylinders, fuel_id, power_hp, codes) + if engine_key not in engine_cache: + cur.execute( + """INSERT INTO engines (name_engine, displacement_cc, cylinders, id_fuel, power_hp, engine_code) + VALUES (%s, %s, %s, %s, %s, %s) RETURNING id_engine""", + (eng_name, displacement, cylinders, fuel_id, power_hp, codes)) + engine_cache[engine_key] = cur.fetchone()[0] + 
stats["engines"] += 1 + + engine_db_id = engine_cache[engine_key] + + start_str = v.get("constructionIntervalStart") + end_str = v.get("constructionIntervalEnd") + if not start_str: + continue + + start_year = max(int(start_str[:4]), 1950) + end_year = min(int(end_str[:4]) if end_str else current_year, current_year + 1) + trim = v.get("typeEngineName", "") + + for year in range(start_year, end_year + 1): + yid = year_cache.get(year) + if not yid: + continue + mye_key = (model_db_id, yid, engine_db_id, trim) + if mye_key in mye_set: + continue + mye_set.add(mye_key) + cur.execute( + """INSERT INTO model_year_engine (model_id, year_id, engine_id, trim_level) + VALUES (%s, %s, %s, %s) ON CONFLICT DO NOTHING""", + (model_db_id, yid, engine_db_id, trim)) + stats["mye"] += 1 + + # Commit per brand + conn.commit() + + conn.commit() + cur.close() + conn.close() + + print(f"\n{'='*60}", flush=True) + print(f"IMPORT COMPLETE", flush=True) + print(f" Brands: {stats['brands']} ({stats['skipped']} regional skipped)", flush=True) + print(f" Models: {stats['models']}", flush=True) + print(f" Engines: {stats['engines']}", flush=True) + print(f" MYE: {stats['mye']}", flush=True) + print(f"{'='*60}", flush=True) + + +# ──────────────── Status ──────────────── + +def status(): + """Show download progress.""" + if not DATA_DIR.exists(): + print("No data directory yet.") + return + + mfr_file = DATA_DIR / "manufacturers.json" + if not mfr_file.exists(): + print("Manufacturers not downloaded yet.") + return + + manufacturers = json.loads(mfr_file.read_text()) + models_dir = DATA_DIR / "models" + vehicles_dir = DATA_DIR / "vehicles" + + model_files = list(models_dir.glob("*.json")) if models_dir.exists() else [] + vehicle_files = list(vehicles_dir.glob("*.json")) if vehicles_dir.exists() else [] + + total_models = 0 + for f in model_files: + total_models += len(json.loads(f.read_text())) + + print(f"Manufacturers: {len(manufacturers)} total") + print(f"Model files: {len(model_files)} / 
{len(manufacturers)} brands downloaded") + print(f"Total models: {total_models}") + print(f"Vehicle files: {len(vehicle_files)} / {total_models} models downloaded") + + if total_models > 0: + pct = len(vehicle_files) / total_models * 100 + print(f"Progress: {pct:.1f}%") + remaining = total_models - len(vehicle_files) + est_minutes = remaining * APIFY_DELAY / 60 + remaining * 3 / 60 # delay + avg API time + print(f"Est. remaining: ~{est_minutes:.0f} minutes ({remaining} API calls)") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="TecDoc vehicle data import") + parser.add_argument("command", choices=["download", "import", "status"]) + parser.add_argument("--brand", help="Filter by brand name") + args = parser.parse_args() + + if args.command == "download": + download(brand_filter=args.brand) + elif args.command == "import": + do_import() + elif args.command == "status": + status() diff --git a/scripts/run_all_brands.sh b/scripts/run_all_brands.sh new file mode 100755 index 0000000..e58c6f3 --- /dev/null +++ b/scripts/run_all_brands.sh @@ -0,0 +1,43 @@ +#!/bin/bash +# Sequential download + import for all target brands + +LOG="/tmp/tecdoc_all_brands.log" +SCRIPTS="/home/Autopartes/scripts" + +BRANDS=("RENAULT" "NISSAN") + +for BRAND in "${BRANDS[@]}"; do + echo "" | tee -a "$LOG" + echo "$(date): ========== Starting $BRAND ==========" | tee -a "$LOG" + + # Start download + BRAND_LOG="/tmp/tecdoc_parts_$(echo $BRAND | tr ' ' '_').log" + python3 "$SCRIPTS/import_tecdoc_parts.py" download --brand "$BRAND" >> "$BRAND_LOG" 2>&1 & + DL_PID=$! + echo "$(date): Download started (PID $DL_PID)" | tee -a "$LOG" + + # Start live importer + python3 "$SCRIPTS/import_live.py" >> /tmp/tecdoc_import_live.log 2>&1 & + LI_PID=$! + echo "$(date): Live importer started (PID $LI_PID)" | tee -a "$LOG" + + # Wait for download to finish + wait $DL_PID + echo "$(date): Download for $BRAND complete!" 
| tee -a "$LOG" + + # Give live importer time to catch up, then stop it + sleep 60 + kill $LI_PID 2>/dev/null + wait $LI_PID 2>/dev/null + echo "$(date): Live importer stopped" | tee -a "$LOG" + + # Run vehicle linker + echo "$(date): Starting vehicle linker for $BRAND..." | tee -a "$LOG" + python3 "$SCRIPTS/link_vehicle_parts.py" >> /tmp/tecdoc_linker.log 2>&1 + echo "$(date): Linker for $BRAND complete!" | tee -a "$LOG" + + echo "$(date): ========== $BRAND DONE ==========" | tee -a "$LOG" +done + +echo "" | tee -a "$LOG" +echo "$(date): ALL BRANDS COMPLETE!" | tee -a "$LOG"