- import_tecdoc.py: 2-phase TecDoc download + import (brands, models, vehicles)
- import_live.py: real-time streaming importer for part details
- run_all_brands.sh: automated sequential brand processing pipeline

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

145 lines · 5.0 KiB · Python
#!/usr/bin/env python3
|
|
"""
|
|
Live importer: watches detail files and imports OEM data as it arrives.
|
|
Runs in a loop, importing new detail files every 30 seconds.
|
|
"""
|
|
|
|
import json
import os
import time
from pathlib import Path

import psycopg2
|
|
|
|
# NOTE(review): database credentials are hardcoded in the DSN — consider
# reading them from an environment variable or a secrets store instead.
DB_URL = "postgresql://nexus:nexus_autoparts_2026@localhost/nexus_autoparts"

# Directory of per-article detail JSON files; each file's stem is the articleId.
DETAILS_DIR = Path("/home/Autopartes/data/tecdoc/parts/details")

# Directory of category article-list JSON files named "<prefix>_<catId>.json".
ARTICLES_DIR = Path("/home/Autopartes/data/tecdoc/parts/articles")

# Whitespace-separated list of articleIds that have already been imported.
TRACK_FILE = Path("/home/Autopartes/data/tecdoc/parts/.imported_ids")

INTERVAL = 30  # seconds between import runs
|
|
|
|
|
|
def load_imported():
    """Return the set of articleIds recorded in TRACK_FILE (empty if absent)."""
    if not TRACK_FILE.exists():
        return set()
    return set(TRACK_FILE.read_text().split())
|
|
|
|
|
|
def save_imported(ids):
    """Atomically persist the set of imported articleIds to TRACK_FILE.

    Writes to a sibling temp file and renames it into place with os.replace,
    so a crash mid-write cannot leave a truncated tracking file (losing the
    import history). IDs are sorted for deterministic, diff-friendly output;
    load_imported() splits on any whitespace, so the format is unchanged.
    """
    tmp = TRACK_FILE.with_name(TRACK_FILE.name + ".tmp")
    tmp.write_text("\n".join(sorted(ids)))
    os.replace(tmp, TRACK_FILE)  # atomic on POSIX within the same filesystem
|
|
|
|
|
|
def _build_article_category_map():
    """Map articleId -> category id parsed from ARTICLES_DIR file names.

    Files are expected to be named "<prefix>_<catId>.json"; the first category
    seen for a given article wins. Malformed or unreadable files are skipped
    (best-effort, matching the downloader's partial-output reality).
    NOTE(review): this mapping is only reported, never consumed below — confirm
    whether it is still needed.
    """
    article_cats = {}
    for f in ARTICLES_DIR.glob("*.json"):
        parts = f.stem.split("_")
        if len(parts) != 2:
            continue
        try:
            # int() inside the try: a non-numeric suffix must not crash the run.
            cat_id = int(parts[1])
            for a in json.loads(f.read_text()):
                aid = a.get('articleId')
                if aid and aid not in article_cats:
                    article_cats[aid] = cat_id
        except (ValueError, OSError, AttributeError):
            # ValueError covers both int() and json.JSONDecodeError;
            # AttributeError covers non-dict list entries.
            continue
    return article_cats


def _import_detail_file(f, cur, part_cache, mfr_cache, stats, imported):
    """Import one detail file's OEM numbers and cross-references.

    Mutates part_cache / mfr_cache (DB row caches), stats (counters), and
    imported (set of processed articleIds). Unreadable files and files with
    no OEM list are marked imported so they are never retried.
    """
    article_id = f.stem
    try:
        data = json.loads(f.read_text())
    except (ValueError, OSError):
        imported.add(article_id)  # corrupt/unreadable: skip permanently
        return

    oem_list = data.get('articleOemNo', [])
    article = data.get('article', {}) or {}
    article_no = article.get('articleNo', '')
    supplier = article.get('supplierName', '')
    product_name = article.get('articleProductName', '')

    if not oem_list:
        imported.add(article_id)
        return

    # Ensure the supplier exists as a manufacturer row.
    if supplier and supplier not in mfr_cache:
        cur.execute(
            "INSERT INTO manufacturers (name_manufacture) VALUES (%s) RETURNING id_manufacture",
            (supplier,))
        mfr_cache[supplier] = cur.fetchone()[0]
        stats['mfrs'] += 1

    for oem in oem_list:
        oem_no = oem.get('oemDisplayNo', '')
        oem_brand = oem.get('oemBrand', '')
        if not oem_no:
            continue

        if oem_no not in part_cache:
            cur.execute("""
                INSERT INTO parts (oem_part_number, name_part, description)
                VALUES (%s, %s, %s)
                ON CONFLICT (oem_part_number) DO UPDATE SET name_part = EXCLUDED.name_part
                RETURNING id_part
            """, (oem_no, product_name, f"OEM {oem_brand}"))
            part_cache[oem_no] = cur.fetchone()[0]
            stats['parts'] += 1
        else:
            # Part row already known; count it as an update of an existing entry.
            stats['updated'] += 1

        part_id = part_cache[oem_no]

        # Cross-reference the aftermarket article number to the OEM part.
        if article_no and supplier:
            cur.execute("""
                INSERT INTO part_cross_references (part_id, cross_reference_number, source_ref)
                VALUES (%s, %s, %s) ON CONFLICT DO NOTHING
            """, (part_id, article_no, supplier))
            stats['xrefs'] += 1

    imported.add(article_id)


def _import_batch(new_files, imported):
    """Import a batch of detail files in a single transaction; return stats.

    Opens its own connection so a failure mid-batch cannot leak it; commit
    happens only after every file in the batch has been processed.
    """
    stats = {'parts': 0, 'xrefs': 0, 'mfrs': 0, 'updated': 0}
    conn = psycopg2.connect(DB_URL)
    try:
        with conn.cursor() as cur:
            # Load row caches once per batch to avoid per-OEM SELECTs.
            cur.execute("SELECT oem_part_number, id_part FROM parts WHERE oem_part_number IS NOT NULL")
            part_cache = {r[0]: r[1] for r in cur.fetchall()}

            cur.execute("SELECT id_manufacture, name_manufacture FROM manufacturers")
            mfr_cache = {r[1]: r[0] for r in cur.fetchall()}

            for f in new_files:
                _import_detail_file(f, cur, part_cache, mfr_cache, stats, imported)
        conn.commit()
    finally:
        conn.close()  # always release, even if the batch raised
    return stats


def run():
    """Poll DETAILS_DIR forever, importing newly downloaded detail files.

    Every INTERVAL seconds: find detail files whose articleId is not yet in
    the tracking set, import their OEM data in one transaction, then persist
    the tracking set. Runs until interrupted.
    """
    imported = load_imported()
    print(f"Already imported: {len(imported)} articles", flush=True)

    # Build article→category mapping once
    article_cats = _build_article_category_map()
    print(f"Article→category mappings: {len(article_cats):,}", flush=True)

    while True:
        detail_files = list(DETAILS_DIR.glob("*.json"))
        new_files = [f for f in detail_files if f.stem not in imported]

        if not new_files:
            print(f" [{time.strftime('%H:%M:%S')}] No new files. Total imported: {len(imported):,}. Waiting...", flush=True)
            time.sleep(INTERVAL)
            continue

        print(f" [{time.strftime('%H:%M:%S')}] Found {len(new_files)} new detail files to import", flush=True)

        stats = _import_batch(new_files, imported)

        save_imported(imported)
        total_details = len(list(DETAILS_DIR.glob("*.json")))
        # Also report 'updated' (previously collected but silently dropped).
        print(f" Imported batch: +{stats['parts']} parts, +{stats['xrefs']} xrefs, "
              f"+{stats['mfrs']} mfrs, +{stats['updated']} updated | "
              f"Total imported: {len(imported):,} | Details on disk: {total_details:,}", flush=True)

        time.sleep(INTERVAL)
|
|
|
|
|
|
if __name__ == "__main__":
    # Entry point: runs the polling import loop until interrupted.
    run()
|