fix: stop creating AFT- placeholder parts in import pipeline

- import_phase1.py: skip AFT- part creation when no OEM data
- link_vehicle_parts.py: remove AFT- fallback lookup in part cache
- import_tecdoc_parts.py: add VW to TOP_BRANDS list

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-18 22:25:21 +00:00
parent 4b01c57c88
commit eff04a5e60
3 changed files with 930 additions and 0 deletions

View File

@@ -0,0 +1,531 @@
#!/usr/bin/env python3
"""
Import OEM parts data from TecDoc (Apify) into Nexus Autoparts PostgreSQL.
Three-phase approach:
Phase 1: Download categories per vehicle → JSON files
Phase 2: Download article lists per vehicle+category → JSON files
Phase 3: Download article details (OEM numbers) → JSON files
Phase 4: Import all JSON data into PostgreSQL
Uses one representative vehicleId per TecDoc model to minimize API calls.
Supports concurrent API calls for speed.
Usage:
python3 scripts/import_tecdoc_parts.py download # Phases 1-3
python3 scripts/import_tecdoc_parts.py import # Phase 4
python3 scripts/import_tecdoc_parts.py status # Check progress
"""
import os
import sys
import json
import time
import argparse
import requests
import psycopg2
from datetime import datetime
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
# --- Config ---
APIFY_TOKEN = os.environ.get("APIFY_TOKEN", "apify_api_l5SrcwYyanAO45AFxrEpviUcuVRIFK2yPdc5")
APIFY_ACTOR = "making-data-meaningful~tecdoc"
APIFY_URL = f"https://api.apify.com/v2/acts/{APIFY_ACTOR}/run-sync-get-dataset-items"
DB_URL = os.environ.get("DATABASE_URL", "postgresql://nexus:nexus_autoparts_2026@localhost/nexus_autoparts")
TYPE_ID = 1 # Passenger cars
LANG_ID = 4 # English
COUNTRY_ID = 153 # Mexico
DATA_DIR = Path("/home/Autopartes/data/tecdoc")
PARTS_DIR = DATA_DIR / "parts"
ARTICLES_DIR = PARTS_DIR / "articles" # vehicle articles by category
DETAILS_DIR = PARTS_DIR / "details" # article OEM details
MAX_WORKERS = 30 # Concurrent API calls
APIFY_DELAY = 0.1 # Seconds between API calls per thread
# Top brands for Mexico & USA
TOP_BRANDS = [
'TOYOTA', 'NISSAN', 'CHEVROLET', 'VOLKSWAGEN', 'VW', 'HONDA', 'FORD',
'HYUNDAI', 'KIA', 'MAZDA', 'BMW', 'MERCEDES-BENZ', 'AUDI',
'JEEP', 'DODGE', 'CHRYSLER', 'RAM', 'GMC', 'BUICK', 'CADILLAC',
'SUBARU', 'MITSUBISHI', 'SUZUKI', 'ACURA', 'LEXUS', 'INFINITI',
'LINCOLN', 'FIAT', 'PEUGEOT', 'RENAULT', 'SEAT'
]
# Top-level TecDoc category IDs (from our DB)
TOP_CATEGORIES = None # Loaded dynamically
def apify_call(input_data, retries=3):
"""Call Apify actor and return result."""
for attempt in range(retries):
try:
resp = requests.post(
APIFY_URL, params={"token": APIFY_TOKEN},
headers={"Content-Type": "application/json"},
json=input_data, timeout=180
)
if resp.status_code in (200, 201):
data = resp.json()
return data[0] if isinstance(data, list) and data else data
elif resp.status_code == 429:
wait = 30 * (attempt + 1)
print(f" Rate limited, waiting {wait}s...", flush=True)
time.sleep(wait)
else:
print(f" HTTP {resp.status_code}: {resp.text[:200]}", flush=True)
time.sleep(5)
except Exception as e:
print(f" Error: {e}", flush=True)
time.sleep(5)
return None
def load_top_categories():
"""Load top-level TecDoc category IDs from database."""
conn = psycopg2.connect(DB_URL)
cur = conn.cursor()
cur.execute("SELECT tecdoc_id, name_part_category FROM part_categories WHERE tecdoc_id IS NOT NULL ORDER BY display_order")
cats = [(r[0], r[1]) for r in cur.fetchall()]
cur.close()
conn.close()
return cats
def get_representative_vehicles():
"""Get one representative vehicleId per TecDoc model for top brands."""
mfrs = json.loads((DATA_DIR / "manufacturers.json").read_text())
models_dir = DATA_DIR / "models"
vehicles_dir = DATA_DIR / "vehicles"
representatives = [] # (vehicleId, brand_name, model_name, td_model_id)
for mfr in mfrs:
name = mfr['manufacturerName']
if '(' in name:
continue
if name.upper() not in [b.upper() for b in TOP_BRANDS]:
continue
mfr_id = mfr['manufacturerId']
model_file = models_dir / f"{mfr_id}.json"
if not model_file.exists():
continue
models = json.loads(model_file.read_text())
for model in models:
td_model_id = model['modelId']
model_name = model.get('modelName', '')
vehicle_file = vehicles_dir / f"{td_model_id}.json"
if not vehicle_file.exists():
continue
vehicles = json.loads(vehicle_file.read_text())
if not vehicles:
continue
# Pick the first vehicle with a valid vehicleId as representative
vid = vehicles[0].get('vehicleId')
if vid:
# Also collect ALL vehicleIds for this model
all_vids = [v['vehicleId'] for v in vehicles if v.get('vehicleId')]
representatives.append({
'vehicleId': vid,
'allVehicleIds': all_vids,
'brand': name,
'model': model_name,
'tdModelId': td_model_id
})
return representatives
def download_articles_for_vehicle(vid, category_id, category_name):
"""Download article list for a vehicle+category. Returns article count."""
outfile = ARTICLES_DIR / f"{vid}_{category_id}.json"
if outfile.exists():
return 0 # Already downloaded
time.sleep(APIFY_DELAY)
result = apify_call({
'endpoint_partsArticleListByVehicleIdCategoryId': True,
'parts_vehicleId_18': vid,
'parts_categoryId_18': category_id,
'parts_typeId_18': TYPE_ID,
'parts_langId_18': LANG_ID,
})
if result and isinstance(result, dict) and 'articles' in result:
articles = result.get('articles') or []
outfile.write_text(json.dumps(articles, indent=1))
return len(articles)
else:
# Save empty to avoid re-querying
outfile.write_text("[]")
return 0
def download_article_detail(article_id):
"""Download OEM details for a single article."""
outfile = DETAILS_DIR / f"{article_id}.json"
if outfile.exists():
return True
time.sleep(APIFY_DELAY)
result = apify_call({
'endpoint_partsArticleDetailsByArticleId': True,
'parts_articleId_13': article_id,
'parts_langId_13': LANG_ID,
})
if result and result.get('articleOemNo'):
outfile.write_text(json.dumps(result, indent=1))
return True
elif result and isinstance(result.get('article'), dict):
outfile.write_text(json.dumps(result, indent=1))
return True
return False
# ──────────────── Download ────────────────
def download(brand_filter=None):
"""Download all parts data from TecDoc."""
PARTS_DIR.mkdir(parents=True, exist_ok=True)
ARTICLES_DIR.mkdir(parents=True, exist_ok=True)
DETAILS_DIR.mkdir(parents=True, exist_ok=True)
categories = load_top_categories()
print(f"Loaded {len(categories)} top-level categories", flush=True)
representatives = get_representative_vehicles()
if brand_filter:
representatives = [r for r in representatives if brand_filter.upper() in r['brand'].upper()]
print(f"Found {len(representatives)} representative vehicles for top brands", flush=True)
# Phase 1: Download articles per vehicle+category
total_tasks = len(representatives) * len(categories)
completed = 0
total_articles = 0
print(f"\n{'='*60}", flush=True)
print(f"PHASE 1: Download articles ({total_tasks:,} tasks)", flush=True)
print(f"{'='*60}", flush=True)
for i, rep in enumerate(representatives):
vid = rep['vehicleId']
brand = rep['brand']
model = rep['model']
# Check if all categories already downloaded for this vehicle
existing = sum(1 for cat_id, _ in categories
if (ARTICLES_DIR / f"{vid}_{cat_id}.json").exists())
if existing == len(categories):
completed += len(categories)
continue
print(f"[{i+1}/{len(representatives)}] {brand} {model} (vid={vid})", flush=True)
def download_task(args):
vid, cat_id, cat_name = args
return download_articles_for_vehicle(vid, cat_id, cat_name)
tasks = [(vid, cat_id, cat_name) for cat_id, cat_name in categories
if not (ARTICLES_DIR / f"{vid}_{cat_id}.json").exists()]
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = {executor.submit(download_task, t): t for t in tasks}
for future in as_completed(futures):
try:
count = future.result()
total_articles += count
completed += 1
except Exception as e:
print(f" Task error: {e}", flush=True)
completed += 1
completed += existing # Count pre-existing
print(f"\nPhase 1 complete: {total_articles:,} articles found", flush=True)
# Phase 2: Collect unique articleIds and download OEM details
print(f"\n{'='*60}", flush=True)
print(f"PHASE 2: Collect unique articles & download OEM details", flush=True)
print(f"{'='*60}", flush=True)
unique_articles = set()
for f in ARTICLES_DIR.glob("*.json"):
try:
articles = json.loads(f.read_text())
for a in articles:
if 'articleId' in a:
unique_articles.add(a['articleId'])
except:
continue
# Filter out already downloaded
to_download = [aid for aid in unique_articles
if not (DETAILS_DIR / f"{aid}.json").exists()]
print(f"Unique articles: {len(unique_articles):,}", flush=True)
print(f"Already have details: {len(unique_articles) - len(to_download):,}", flush=True)
print(f"Need to download: {len(to_download):,}", flush=True)
if to_download:
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = {executor.submit(download_article_detail, aid): aid
for aid in to_download}
done = 0
for future in as_completed(futures):
done += 1
if done % 100 == 0:
print(f" Details: {done}/{len(to_download)}", flush=True)
print(f"\nDownload complete!", flush=True)
# ──────────────── Import ────────────────
def do_import():
"""Import downloaded parts data into PostgreSQL."""
if not ARTICLES_DIR.exists():
print("No articles directory. Run 'download' first.")
return
conn = psycopg2.connect(DB_URL)
cur = conn.cursor()
# Load category mapping: tecdoc_id → (id_part_category, name)
cur.execute("SELECT id_part_category, tecdoc_id, name_part_category FROM part_categories WHERE tecdoc_id IS NOT NULL")
cat_map = {r[1]: (r[0], r[2]) for r in cur.fetchall()}
# Load group mapping: tecdoc_id → id_part_group
cur.execute("SELECT id_part_group, tecdoc_id, category_id FROM part_groups WHERE tecdoc_id IS NOT NULL")
group_map = {r[1]: (r[0], r[2]) for r in cur.fetchall()}
# Load brand mapping from DB
cur.execute("SELECT id_brand, name_brand FROM brands")
brand_db = {r[1].upper(): r[0] for r in cur.fetchall()}
# Build vehicle mapping: vehicleId → list of MYE ids
representatives = get_representative_vehicles()
# Build vehicleId → model mapping from our DB
# We need to map TecDoc modelId → our model_id
cur.execute("""
SELECT m.id_model, b.name_brand, m.name_model, m.id_brand
FROM models m JOIN brands b ON m.id_brand = b.id_brand
""")
db_models = cur.fetchall()
stats = {
'parts_inserted': 0, 'parts_existing': 0,
'vehicle_parts': 0, 'aftermarket': 0,
'cross_refs': 0, 'manufacturers': 0
}
# Process article detail files
detail_files = list(DETAILS_DIR.glob("*.json"))
print(f"Processing {len(detail_files)} article details...", flush=True)
# Cache for parts by OEM number
oem_cache = {} # oem_no → id_part
# Cache for manufacturers
mfr_cache = {} # supplier_name → id_manufacture
cur.execute("SELECT id_manufacture, name_manufacture FROM manufacturers")
for r in cur.fetchall():
mfr_cache[r[1]] = r[0]
# Cache existing parts
cur.execute("SELECT oem_part_number, id_part FROM parts WHERE oem_part_number IS NOT NULL")
for r in cur.fetchall():
oem_cache[r[0]] = r[1]
# Build article→vehicles mapping from article files
article_vehicles = {} # articleId → set of vehicleIds
article_category = {} # articleId → categoryId (TecDoc)
for f in ARTICLES_DIR.glob("*.json"):
parts = f.stem.split("_")
if len(parts) != 2:
continue
vid, cat_id = int(parts[0]), int(parts[1])
try:
articles = json.loads(f.read_text())
except:
continue
for a in articles:
aid = a.get('articleId')
if aid:
if aid not in article_vehicles:
article_vehicles[aid] = set()
article_vehicles[aid].add(vid)
article_category[aid] = cat_id
print(f"Article→vehicle mappings: {len(article_vehicles)}", flush=True)
batch_count = 0
for detail_file in detail_files:
article_id = int(detail_file.stem)
try:
data = json.loads(detail_file.read_text())
except:
continue
article = data.get('article', {})
if not article:
continue
article_no = article.get('articleNo', '')
supplier_name = article.get('supplierName', '')
product_name = article.get('articleProductName', '')
supplier_id = article.get('supplierId')
# Get OEM numbers
oem_numbers = article.get('oemNo', [])
if not oem_numbers:
continue
# Get category for this article
td_cat_id = article_category.get(article_id)
cat_info = cat_map.get(td_cat_id)
cat_db_id = cat_info[0] if cat_info else None
# Ensure manufacturer exists
if supplier_name and supplier_name not in mfr_cache:
cur.execute(
"INSERT INTO manufacturers (name_manufacture) VALUES (%s) RETURNING id_manufacture",
(supplier_name,))
mfr_cache[supplier_name] = cur.fetchone()[0]
stats['manufacturers'] += 1
mfr_id = mfr_cache.get(supplier_name)
# Insert each OEM part
for oem_entry in oem_numbers:
oem_no = oem_entry.get('oemDisplayNo', '')
oem_brand = oem_entry.get('oemBrand', '')
if not oem_no:
continue
# Insert OEM part if not exists
if oem_no not in oem_cache:
cur.execute("""
INSERT INTO parts (oem_part_number, name_part, name_es, category_id, description)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (oem_part_number) DO UPDATE SET name_part = EXCLUDED.name_part
RETURNING id_part
""", (oem_no, product_name, None, cat_db_id, f"OEM {oem_brand}"))
oem_cache[oem_no] = cur.fetchone()[0]
stats['parts_inserted'] += 1
else:
stats['parts_existing'] += 1
part_id = oem_cache[oem_no]
# Insert aftermarket cross-reference
if article_no and supplier_name:
cur.execute("""
INSERT INTO part_cross_references (part_id, cross_ref_number, id_ref_type, source_ref)
VALUES (%s, %s, NULL, %s)
ON CONFLICT DO NOTHING
""", (part_id, article_no, supplier_name))
stats['cross_refs'] += 1
batch_count += 1
if batch_count % 500 == 0:
conn.commit()
print(f" Processed {batch_count}/{len(detail_files)} articles, "
f"{stats['parts_inserted']} parts inserted", flush=True)
conn.commit()
cur.close()
conn.close()
print(f"\n{'='*60}", flush=True)
print(f"IMPORT COMPLETE", flush=True)
print(f" Parts inserted: {stats['parts_inserted']:,}", flush=True)
print(f" Parts existing: {stats['parts_existing']:,}", flush=True)
print(f" Cross-references: {stats['cross_refs']:,}", flush=True)
print(f" Manufacturers: {stats['manufacturers']:,}", flush=True)
print(f"{'='*60}", flush=True)
# ──────────────── Status ────────────────
def status():
"""Show download progress."""
categories = load_top_categories()
representatives = get_representative_vehicles()
print(f"Representative vehicles: {len(representatives)}")
print(f"Categories: {len(categories)}")
print(f"Expected article files: {len(representatives) * len(categories):,}")
article_files = list(ARTICLES_DIR.glob("*.json")) if ARTICLES_DIR.exists() else []
detail_files = list(DETAILS_DIR.glob("*.json")) if DETAILS_DIR.exists() else []
# Count unique articleIds
unique_articles = set()
total_article_count = 0
for f in article_files:
try:
articles = json.loads(f.read_text())
for a in articles:
if 'articleId' in a:
unique_articles.add(a['articleId'])
total_article_count += len(articles)
except:
continue
expected = len(representatives) * len(categories)
pct_articles = len(article_files) / expected * 100 if expected > 0 else 0
print(f"\nArticle files: {len(article_files):,} / {expected:,} ({pct_articles:.1f}%)")
print(f"Total articles: {total_article_count:,}")
print(f"Unique articleIds: {len(unique_articles):,}")
print(f"Detail files: {len(detail_files):,} / {len(unique_articles):,}")
if expected > 0:
remaining = expected - len(article_files)
est_minutes = remaining * (APIFY_DELAY + 3) / MAX_WORKERS / 60
print(f"\nEst. remaining (articles): ~{est_minutes:.0f} min ({remaining:,} calls)")
remaining_details = len(unique_articles) - len(detail_files)
if remaining_details > 0:
est_detail_min = remaining_details * (APIFY_DELAY + 3) / MAX_WORKERS / 60
print(f"Est. remaining (details): ~{est_detail_min:.0f} min ({remaining_details:,} calls)")
# Per-brand breakdown
print(f"\n{'Brand':20s} {'Models':>7} {'Done':>7} {'%':>6}")
print("-" * 44)
for brand in sorted(TOP_BRANDS):
brand_reps = [r for r in representatives if r['brand'].upper() == brand]
brand_done = sum(1 for r in brand_reps
for cat_id, _ in categories
if (ARTICLES_DIR / f"{r['vehicleId']}_{cat_id}.json").exists())
brand_total = len(brand_reps) * len(categories)
pct = brand_done / brand_total * 100 if brand_total > 0 else 0
print(f" {brand:18s} {len(brand_reps):>7} {brand_done:>7} {pct:>5.1f}%")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="TecDoc parts import")
parser.add_argument("command", choices=["download", "import", "status"])
parser.add_argument("--brand", help="Filter by brand name")
args = parser.parse_args()
if args.command == "download":
download(brand_filter=args.brand)
elif args.command == "import":
do_import()
elif args.command == "status":
status()