Files
Autoparts-DB/scripts/import_live.py
consultoria-as d269bc1ffb feat: add TecDoc import pipeline scripts
- import_tecdoc.py: 2-phase TecDoc download + import (brands, models, vehicles)
- import_live.py: real-time streaming importer for part details
- run_all_brands.sh: automated sequential brand processing pipeline

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 22:25:48 +00:00

145 lines
5.0 KiB
Python

#!/usr/bin/env python3
"""
Live importer: watches detail files and imports OEM data as it arrives.
Runs in a loop, importing new detail files every 30 seconds.
"""
import json
import time
import psycopg2
from pathlib import Path
# PostgreSQL connection string handed to psycopg2.connect() by run().
# NOTE(review): the credentials are hard-coded in source — consider loading
# them from the environment or a secrets store instead.
DB_URL = "postgresql://nexus:nexus_autoparts_2026@localhost/nexus_autoparts"
# Directory run() scans for detail JSON files; each file's stem is used as the
# articleId that gets recorded in TRACK_FILE.
DETAILS_DIR = Path("/home/Autopartes/data/tecdoc/parts/details")
# Directory of article listing JSON files; run() reads the category id from the
# part of the file stem after the underscore ("<something>_<category id>").
ARTICLES_DIR = Path("/home/Autopartes/data/tecdoc/parts/articles")
# Tracking file: whitespace-separated ids of detail files already processed.
TRACK_FILE = Path("/home/Autopartes/data/tecdoc/parts/.imported_ids")
INTERVAL = 30 # seconds run() sleeps between import passes
def load_imported(path=None):
    """Load the set of already-imported articleIds.

    Args:
        path: Optional tracking file to read. Defaults to ``TRACK_FILE``.

    Returns:
        A set of id strings (the whitespace-separated tokens of the file),
        or an empty set when the tracking file does not exist yet.
    """
    track_file = TRACK_FILE if path is None else Path(path)
    try:
        # EAFP: read directly rather than exists()-then-read, so a file that
        # disappears between the two calls cannot raise.
        return set(track_file.read_text().split())
    except FileNotFoundError:
        return set()
def save_imported(ids, path=None):
    """Persist the set of imported articleIds, one id per line.

    The data is written to a sibling temporary file and then renamed over the
    target, so an interruption mid-write cannot leave a truncated tracking
    file behind (which would cause everything to be re-imported).

    Args:
        ids: Iterable of id strings to store.
        path: Optional tracking file to write. Defaults to ``TRACK_FILE``.
    """
    target = TRACK_FILE if path is None else Path(path)
    tmp = target.with_name(target.name + ".tmp")
    # Sorted output keeps the file deterministic between runs.
    tmp.write_text("\n".join(sorted(ids)))
    # Path.replace() is an atomic rename when both paths are on the same
    # filesystem, which holds here because tmp lives next to target.
    tmp.replace(target)
def _load_article_categories():
    """Build the articleId -> category id mapping from the article listing files."""
    article_cats = {}
    for f in ARTICLES_DIR.glob("*.json"):
        parts = f.stem.split("_")
        if len(parts) != 2:
            continue
        try:
            # int() is inside the try so a non-numeric suffix skips the file
            # instead of aborting startup.
            cat_id = int(parts[1])
            for a in json.loads(f.read_text()):
                aid = a.get('articleId')
                if aid and aid not in article_cats:
                    article_cats[aid] = cat_id
        except (OSError, ValueError, TypeError, AttributeError):
            # Best effort: unreadable, non-JSON or unexpectedly shaped files
            # are skipped (JSONDecodeError/UnicodeDecodeError are ValueErrors).
            continue
    return article_cats


def _read_detail(path):
    """Return the parsed detail dict for *path*, or None if unreadable or malformed."""
    try:
        data = json.loads(path.read_text())
    except (OSError, ValueError):
        return None
    return data if isinstance(data, dict) else None


def _still_being_written(path):
    """Heuristically report whether *path* was modified within the last INTERVAL seconds."""
    try:
        return time.time() - path.stat().st_mtime < INTERVAL
    except OSError:
        return False


def _import_detail(cur, data, part_cache, mfr_cache, stats):
    """Insert the manufacturer, parts and cross-references of one parsed detail file."""
    oem_list = data.get('articleOemNo') or []
    if not oem_list:
        return
    article = data.get('article')
    if not isinstance(article, dict):
        article = {}
    article_no = article.get('articleNo', '')
    supplier = article.get('supplierName', '')
    product_name = article.get('articleProductName', '')

    # Ensure manufacturer
    if supplier and supplier not in mfr_cache:
        cur.execute(
            "INSERT INTO manufacturers (name_manufacture) VALUES (%s) RETURNING id_manufacture",
            (supplier,))
        mfr_cache[supplier] = cur.fetchone()[0]
        stats['mfrs'] += 1

    for oem in oem_list:
        if not isinstance(oem, dict):
            continue
        oem_no = oem.get('oemDisplayNo', '')
        oem_brand = oem.get('oemBrand', '')
        if not oem_no:
            continue
        if oem_no not in part_cache:
            cur.execute("""
                INSERT INTO parts (oem_part_number, name_part, description)
                VALUES (%s, %s, %s)
                ON CONFLICT (oem_part_number) DO UPDATE SET name_part = EXCLUDED.name_part
                RETURNING id_part
            """, (oem_no, product_name, f"OEM {oem_brand}"))
            part_cache[oem_no] = cur.fetchone()[0]
            stats['parts'] += 1
        else:
            # NOTE(review): the original comment here spoke of updating an
            # "AFT-" placeholder with the real OEM number, but no UPDATE is
            # issued — only this counter changes. Confirm the intended behavior.
            stats['updated'] += 1
        part_id = part_cache[oem_no]

        # Cross-reference
        if article_no and supplier:
            cur.execute("""
                INSERT INTO part_cross_references (part_id, cross_reference_number, source_ref)
                VALUES (%s, %s, %s) ON CONFLICT DO NOTHING
            """, (part_id, article_no, supplier))
            # Count rows actually inserted; ON CONFLICT DO NOTHING affects 0 rows.
            stats['xrefs'] += max(cur.rowcount, 0)


def _import_batch(new_files):
    """Import *new_files* in a single transaction; return (processed ids, stats)."""
    stats = {'parts': 0, 'xrefs': 0, 'mfrs': 0, 'updated': 0}
    processed = set()
    conn = psycopg2.connect(DB_URL)
    try:
        cur = conn.cursor()
        try:
            # Load caches
            cur.execute("SELECT oem_part_number, id_part FROM parts WHERE oem_part_number IS NOT NULL")
            part_cache = {r[0]: r[1] for r in cur.fetchall()}
            cur.execute("SELECT id_manufacture, name_manufacture FROM manufacturers")
            mfr_cache = {r[1]: r[0] for r in cur.fetchall()}

            for f in new_files:
                data = _read_detail(f)
                if data is None:
                    if _still_being_written(f):
                        # Possibly a partially written file (assumption about
                        # the downloader) — leave it untracked and retry on
                        # the next pass instead of dropping it for good.
                        continue
                    print(f" Skipping unreadable detail file: {f.name}", flush=True)
                    processed.add(f.stem)
                    continue
                _import_detail(cur, data, part_cache, mfr_cache, stats)
                processed.add(f.stem)
            conn.commit()
        finally:
            cur.close()
    finally:
        # Closing without a commit discards the open transaction, so a failed
        # batch leaves no partial writes and no leaked connection.
        conn.close()
    return processed, stats


def run():
    """Watch DETAILS_DIR forever and import new detail files every INTERVAL seconds.

    Each pass opens one database connection, imports every detail file whose
    stem is not yet tracked, commits, and only then records the processed ids
    in the tracking file. Database errors propagate to the caller after the
    connection has been closed. This function never returns normally.
    """
    imported = load_imported()
    print(f"Already imported: {len(imported)} articles", flush=True)

    # Build article→category mapping once.
    # NOTE(review): the mapping is currently only used for the log line below.
    article_cats = _load_article_categories()
    print(f"Article→category mappings: {len(article_cats):,}", flush=True)

    while True:
        new_files = [f for f in DETAILS_DIR.glob("*.json") if f.stem not in imported]
        if not new_files:
            print(f" [{time.strftime('%H:%M:%S')}] No new files. Total imported: {len(imported):,}. Waiting...", flush=True)
            time.sleep(INTERVAL)
            continue

        print(f" [{time.strftime('%H:%M:%S')}] Found {len(new_files)} new detail files to import", flush=True)

        processed, stats = _import_batch(new_files)
        # Merge only after the commit succeeded, so ids of a failed batch are
        # never recorded as imported.
        imported.update(processed)
        save_imported(imported)

        total_details = sum(1 for _ in DETAILS_DIR.glob("*.json"))
        print(f" Imported batch: +{stats['parts']} parts, +{stats['xrefs']} xrefs, +{stats['mfrs']} mfrs | "
              f"Total imported: {len(imported):,} | Details on disk: {total_details:,}", flush=True)
        time.sleep(INTERVAL)
# Script entry point: start the watch/import loop only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    run()