#!/usr/bin/env python3
"""
Import vehicle data from TecDoc (Apify) into Nexus Autoparts PostgreSQL.

Two-phase approach:
  Phase 1: Download all data from TecDoc API to local JSON files
  Phase 2: Import JSON files into PostgreSQL

Usage:
  python3 scripts/import_tecdoc.py download                 # Phase 1: fetch from API
  python3 scripts/import_tecdoc.py download --brand TOYOTA  # Single brand
  python3 scripts/import_tecdoc.py import                   # Phase 2: load into DB
  python3 scripts/import_tecdoc.py status                   # Check progress
"""
import os
import sys
import json
import time
import argparse
from datetime import datetime
from pathlib import Path

import requests
import psycopg2

# --- Config ---
# SECURITY: a live-looking API token is committed here as the fallback value.
# Rotate this token and make APIFY_TOKEN a required environment variable.
APIFY_TOKEN = os.environ.get("APIFY_TOKEN", "apify_api_l5SrcwYyanAO45AFxrEpviUcuVRIFK2yPdc5")
APIFY_ACTOR = "making-data-meaningful~tecdoc"
APIFY_URL = f"https://api.apify.com/v2/acts/{APIFY_ACTOR}/run-sync-get-dataset-items"
DB_URL = os.environ.get("DATABASE_URL",
                        "postgresql://nexus:nexus_autoparts_2026@localhost/nexus_autoparts")

TYPE_ID = 1       # Passenger cars
LANG_ID = 4       # English
COUNTRY_ID = 153  # Mexico
DATA_DIR = Path("/home/Autopartes/data/tecdoc")
APIFY_DELAY = 1.0  # seconds between API calls


def apify_call(input_data, retries=3):
    """Call Apify actor and return result.

    POSTs *input_data* to the run-sync-get-dataset-items endpoint.  Retries up
    to *retries* times: 429 responses back off progressively (15s, 30s, ...),
    other HTTP errors and network exceptions wait a flat 5s.  Returns the
    first dataset item (the endpoint yields a list) or None if every attempt
    failed.
    """
    for attempt in range(retries):
        try:
            resp = requests.post(
                APIFY_URL,
                params={"token": APIFY_TOKEN},
                headers={"Content-Type": "application/json"},
                json=input_data,
                timeout=120,
            )
            if resp.status_code in (200, 201):
                data = resp.json()
                # The endpoint returns a list of dataset items; unwrap the first.
                return data[0] if isinstance(data, list) and data else data
            elif resp.status_code == 429:
                wait = 15 * (attempt + 1)
                print(f" Rate limited, waiting {wait}s...", flush=True)
                time.sleep(wait)
            else:
                print(f" HTTP {resp.status_code}: {resp.text[:100]}", flush=True)
                time.sleep(5)
        except Exception as e:
            # Network/JSON errors: log and retry after a short pause.
            print(f" Error: {e}", flush=True)
            time.sleep(5)
    return None


# ──────────────── Phase 1: Download ────────────────

def download(brand_filter=None):
    """Download all TecDoc data to local JSON files.

    Idempotent: each manufacturer's model list and each model's vehicle list
    is cached as a JSON file and skipped on re-run, so an interrupted
    download resumes where it left off.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)

    # Step 1: Manufacturers (cached in a single file).
    mfr_file = DATA_DIR / "manufacturers.json"
    if mfr_file.exists():
        manufacturers = json.loads(mfr_file.read_text())
        print(f"Loaded {len(manufacturers)} cached manufacturers", flush=True)
    else:
        print("Fetching manufacturers...", flush=True)
        result = apify_call({"endpoint_manufacturerIdsByTypeId": True,
                             "manufacturer_typeId_2": TYPE_ID})
        # FIX: apify_call returns None after exhausted retries; exit cleanly
        # instead of crashing with a TypeError on result["manufacturers"].
        if not result or "manufacturers" not in result:
            print("Failed to fetch manufacturers; aborting.", flush=True)
            sys.exit(1)
        manufacturers = result["manufacturers"]
        mfr_file.write_text(json.dumps(manufacturers, indent=1))
        print(f" Saved {len(manufacturers)} manufacturers", flush=True)

    if brand_filter:
        manufacturers = [m for m in manufacturers
                         if brand_filter.upper() in m["manufacturerName"].upper()]
        print(f"Filtered to {len(manufacturers)} matching '{brand_filter}'", flush=True)

    # Step 2: Models for each manufacturer (one JSON file per manufacturer id).
    models_dir = DATA_DIR / "models"
    models_dir.mkdir(exist_ok=True)
    for i, mfr in enumerate(manufacturers):
        mfr_id = mfr["manufacturerId"]
        mfr_name = mfr["manufacturerName"]
        model_file = models_dir / f"{mfr_id}.json"
        if model_file.exists():
            continue  # Skip already downloaded
        print(f"[{i+1}/{len(manufacturers)}] {mfr_name} (id={mfr_id})", flush=True)
        time.sleep(APIFY_DELAY)
        result = apify_call({
            "endpoint_modelsByTypeManufacturer": True,
            "models_typeId_1": TYPE_ID,
            "models_manufacturerId_1": mfr_id,
            "models_langId_1": LANG_ID,
            "models_countryFilterId_1": COUNTRY_ID,
        })
        models = result.get("models", []) if result else []
        model_file.write_text(json.dumps(models, indent=1))
        print(f" {len(models)} models", flush=True)

    # Step 3: Vehicle engine types for each model (one JSON file per model id).
    vehicles_dir = DATA_DIR / "vehicles"
    vehicles_dir.mkdir(exist_ok=True)

    # Iterate all model files to know the total for progress reporting.
    total_models = 0
    processed = 0
    for model_file in sorted(models_dir.glob("*.json")):
        models = json.loads(model_file.read_text())
        total_models += len(models)
        for model in models:
            td_model_id = model["modelId"]
            vehicle_file = vehicles_dir / f"{td_model_id}.json"
            if vehicle_file.exists():
                processed += 1
                continue
            print(f" [{processed+1}/{total_models}] Model {model['modelName']} (id={td_model_id})",
                  flush=True)
            time.sleep(APIFY_DELAY)
            result = apify_call({
                "endpoint_vehicleEngineTypesByModel": True,
                "vehicle_typeId_3": TYPE_ID,
                "vehicle_modelId_3": td_model_id,
                "vehicle_langId_3": LANG_ID,
                "vehicle_countryFilterId_3": COUNTRY_ID,
            })
            vehicles = result.get("modelTypes", []) if result else []
            vehicle_file.write_text(json.dumps(vehicles, indent=1))
            processed += 1

    print(f"\nDownload complete! {processed} model vehicle files.", flush=True)


# ──────────────── Phase 2: Import ────────────────

def parse_fuel_id(fuel_str):
    """Map a TecDoc fuel-type string to an internal fuel id.

    1 = diesel, 2 = pure electric, 3 = everything else (petrol, hybrid, ...).
    Returns None for empty/missing input.
    """
    if not fuel_str:
        return None
    f = fuel_str.lower()
    if "diesel" in f:
        return 1
    # "Electric" combined with petrol/gas is a hybrid, which falls through to 3.
    if "electric" in f and "petrol" not in f and "gas" not in f:
        return 2
    return 3


def parse_body_id(model_name):
    """Infer an internal body-style id from substrings of the model name.

    Returns None when no known body keyword appears.  NOTE: matching is
    ordered and first-hit wins, so e.g. "Saloon" beats "Estate" if both occur.
    """
    if not model_name:
        return None
    mapping = {
        "Saloon": 1, "Sedan": 1,
        "Coupe": 2, "Coupé": 2,
        "Hatchback": 3,
        "SUV": 4, "Off-Road": 4,
        "Crossover": 5,
        "Truck": 6,
        "Van": 7, "Box Body": 7,
        "MPV": 8,
        "Estate": 9, "Wagon": 9, "Kombi": 9,
        "Convertible": 10, "Cabrio": 10, "Cabriolet": 10,
        "Pick-up": 11, "Pickup": 11,
        "Platform": 12, "Chassis": 12,
        "Bus": 13,
        "Roadster": 15,
    }
    for key, val in mapping.items():
        if key in model_name:
            return val
    return None


def do_import():
    """Import downloaded JSON data into PostgreSQL.

    Loads manufacturers/models/vehicles JSON produced by download() and fills
    the brands, models, engines, years and model_year_engine tables.
    Commits once per brand so a failure loses at most one brand's work.
    """
    if not DATA_DIR.exists():
        print("No data directory found. Run 'download' first.")
        return
    mfr_file = DATA_DIR / "manufacturers.json"
    if not mfr_file.exists():
        # FIX: this message string was broken in two in the source; restored
        # as a single valid literal.
        print("No manufacturers.json found. Run 'download' first.")
        return

    manufacturers = json.loads(mfr_file.read_text())
    models_dir = DATA_DIR / "models"
    vehicles_dir = DATA_DIR / "vehicles"

    conn = psycopg2.connect(DB_URL)
    cur = conn.cursor()

    # Ensure years exist (1950–2027) and cache year -> id_year.
    cur.execute("SELECT id_year, year_car FROM years")
    year_cache = {r[1]: r[0] for r in cur.fetchall()}
    for y in range(1950, 2028):
        if y not in year_cache:
            cur.execute("INSERT INTO years (year_car) VALUES (%s) RETURNING id_year", (y,))
            year_cache[y] = cur.fetchone()[0]
    conn.commit()

    # Caches to avoid duplicate inserts across the run.
    brand_cache = {}
    model_cache = {}
    engine_cache = {}
    mye_set = set()
    stats = {"brands": 0, "models": 0, "engines": 0, "mye": 0, "skipped": 0}
    current_year = datetime.now().year

    for mfr in manufacturers:
        mfr_id = mfr["manufacturerId"]
        brand_name = mfr["manufacturerName"]
        # Skip regional duplicates like "TOYOTA (USA)".
        if "(" in brand_name:
            stats["skipped"] += 1
            continue
        model_file = models_dir / f"{mfr_id}.json"
        if not model_file.exists():
            continue
        models = json.loads(model_file.read_text())
        if not models:
            continue

        # Insert brand (upsert so re-runs still return the id).
        if brand_name not in brand_cache:
            cur.execute(
                "INSERT INTO brands (name_brand) VALUES (%s) ON CONFLICT (name_brand) DO UPDATE SET name_brand=EXCLUDED.name_brand RETURNING id_brand",
                (brand_name,))
            brand_cache[brand_name] = cur.fetchone()[0]
            stats["brands"] += 1
        brand_id = brand_cache[brand_name]

        for model in models:
            model_name = model.get("modelName")
            if not model_name:
                continue
            td_model_id = model["modelId"]
            # TecDoc dates are "YYYY-MM..."; keep just the year.
            year_from = model.get("modelYearFrom", "")[:4] if model.get("modelYearFrom") else None
            year_to = model.get("modelYearTo", "")[:4] if model.get("modelYearTo") else None
            body_id = parse_body_id(model_name)

            # Insert model
            model_key = (brand_id, model_name)
            if model_key not in model_cache:
                cur.execute(
                    """INSERT INTO models (brand_id, name_model, id_body, production_start_year, production_end_year)
                       VALUES (%s, %s, %s, %s, %s) RETURNING id_model""",
                    (brand_id, model_name, body_id,
                     int(year_from) if year_from else None,
                     int(year_to) if year_to else None))
                model_cache[model_key] = cur.fetchone()[0]
                stats["models"] += 1
            model_db_id = model_cache[model_key]

            # Load vehicles for this model.
            vehicle_file = vehicles_dir / f"{td_model_id}.json"
            if not vehicle_file.exists():
                continue
            vehicles = json.loads(vehicle_file.read_text())
            if not vehicles:
                continue

            # Dedup by vehicleId, accumulating distinct engine codes.
            seen_v = {}
            for v in vehicles:
                vid = v["vehicleId"]
                if vid not in seen_v:
                    seen_v[vid] = v
                    seen_v[vid]["_codes"] = [v.get("engineCodes", "")]
                else:
                    c = v.get("engineCodes", "")
                    if c and c not in seen_v[vid]["_codes"]:
                        seen_v[vid]["_codes"].append(c)

            for v in seen_v.values():
                cap_lt = float(v["capacityLt"]) if v.get("capacityLt") else 0
                cylinders = v.get("numberOfCylinders")
                fuel = v.get("fuelType", "")
                power_ps = float(v["powerPs"]) if v.get("powerPs") else 0
                # Convert metric horsepower (PS) to HP: 1 PS ≈ 0.9863 HP.
                power_hp = int(power_ps * 0.9863) if power_ps else None
                displacement = float(v["capacityTech"]) if v.get("capacityTech") else None
                codes = ", ".join(v["_codes"])
                fuel_id = parse_fuel_id(fuel)

                # Build a human-readable engine name from the specs.
                fl = fuel.lower() if fuel else ""
                if "electric" in fl and "petrol" not in fl and cap_lt == 0:
                    eng_name = f"Electric {power_hp}hp" if power_hp else "Electric"
                else:
                    eng_name = f"{cap_lt:.1f}L"
                    if cylinders:
                        eng_name += f" {cylinders}cyl"
                    if "diesel" in fl:
                        eng_name += " Diesel"
                    elif "electric" in fl:
                        eng_name += " Hybrid"
                    if power_hp:
                        eng_name += f" {power_hp}hp"

                engine_key = (eng_name, displacement, cylinders, fuel_id, power_hp, codes)
                if engine_key not in engine_cache:
                    cur.execute(
                        """INSERT INTO engines (name_engine, displacement_cc, cylinders, id_fuel, power_hp, engine_code)
                           VALUES (%s, %s, %s, %s, %s, %s) RETURNING id_engine""",
                        (eng_name, displacement, cylinders, fuel_id, power_hp, codes))
                    engine_cache[engine_key] = cur.fetchone()[0]
                    stats["engines"] += 1
                engine_db_id = engine_cache[engine_key]

                # Expand the construction interval into per-year rows.
                start_str = v.get("constructionIntervalStart")
                end_str = v.get("constructionIntervalEnd")
                if not start_str:
                    continue
                start_year = max(int(start_str[:4]), 1950)
                # Open-ended intervals run to the current year; clamp to +1.
                end_year = min(int(end_str[:4]) if end_str else current_year,
                               current_year + 1)
                trim = v.get("typeEngineName", "")
                for year in range(start_year, end_year + 1):
                    yid = year_cache.get(year)
                    if not yid:
                        continue
                    mye_key = (model_db_id, yid, engine_db_id, trim)
                    if mye_key in mye_set:
                        continue
                    mye_set.add(mye_key)
                    cur.execute(
                        """INSERT INTO model_year_engine (model_id, year_id, engine_id, trim_level)
                           VALUES (%s, %s, %s, %s) ON CONFLICT DO NOTHING""",
                        (model_db_id, yid, engine_db_id, trim))
                    stats["mye"] += 1

        # Commit per brand
        conn.commit()

    conn.commit()
    cur.close()
    conn.close()

    print(f"\n{'='*60}", flush=True)
    print(f"IMPORT COMPLETE", flush=True)
    print(f" Brands: {stats['brands']} ({stats['skipped']} regional skipped)", flush=True)
    print(f" Models: {stats['models']}", flush=True)
    print(f" Engines: {stats['engines']}", flush=True)
    print(f" MYE: {stats['mye']}", flush=True)
    print(f"{'='*60}", flush=True)


# ──────────────── Status ────────────────

def status():
    """Show download progress."""
    if not DATA_DIR.exists():
        print("No data directory yet.")
        return
    mfr_file = DATA_DIR / "manufacturers.json"
    if not mfr_file.exists():
        print("Manufacturers not downloaded yet.")
        return
    manufacturers = json.loads(mfr_file.read_text())

    models_dir = DATA_DIR / "models"
    vehicles_dir = DATA_DIR / "vehicles"
    model_files = list(models_dir.glob("*.json")) if models_dir.exists() else []
    vehicle_files = list(vehicles_dir.glob("*.json")) if vehicles_dir.exists() else []

    total_models = 0
    for f in model_files:
        total_models += len(json.loads(f.read_text()))

    print(f"Manufacturers: {len(manufacturers)} total")
    print(f"Model files: {len(model_files)} / {len(manufacturers)} brands downloaded")
    print(f"Total models: {total_models}")
    print(f"Vehicle files: {len(vehicle_files)} / {total_models} models downloaded")
    if total_models > 0:
        pct = len(vehicle_files) / total_models * 100
        print(f"Progress: {pct:.1f}%")
        remaining = total_models - len(vehicle_files)
        est_minutes = remaining * APIFY_DELAY / 60 + remaining * 3 / 60  # delay + avg API time
        print(f"Est. remaining: ~{est_minutes:.0f} minutes ({remaining} API calls)")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="TecDoc vehicle data import")
    parser.add_argument("command", choices=["download", "import", "status"])
    parser.add_argument("--brand", help="Filter by brand name")
    args = parser.parse_args()

    if args.command == "download":
        download(brand_filter=args.brand)
    elif args.command == "import":
        do_import()
    elif args.command == "status":
        status()