#!/usr/bin/env python3
"""
Live importer: watches detail files and imports OEM data as it arrives.

Runs in a loop, importing new detail files every 30 seconds.
"""
import json
import time
from pathlib import Path

import psycopg2

DB_URL = "postgresql://nexus:nexus_autoparts_2026@localhost/nexus_autoparts"
DETAILS_DIR = Path("/home/Autopartes/data/tecdoc/parts/details")
ARTICLES_DIR = Path("/home/Autopartes/data/tecdoc/parts/articles")
TRACK_FILE = Path("/home/Autopartes/data/tecdoc/parts/.imported_ids")
INTERVAL = 30  # seconds between import runs


def load_imported():
    """Return the set of already-imported articleIds (empty if no track file)."""
    if TRACK_FILE.exists():
        return set(TRACK_FILE.read_text().split())
    return set()


def save_imported(ids):
    """Persist the set of imported articleIds, one id per line."""
    TRACK_FILE.write_text("\n".join(ids))


def _build_category_map():
    """Scan article listing files and map articleId -> category id.

    Listing files are named ``<something>_<catId>.json``; the first category
    seen for an articleId wins. Malformed filenames and unreadable or corrupt
    JSON files are skipped rather than aborting the scan.
    """
    article_cats = {}
    for listing in ARTICLES_DIR.glob("*.json"):
        name_parts = listing.stem.split("_")
        if len(name_parts) != 2:
            continue
        try:
            # int() inside the try: a non-numeric suffix must not crash the scan.
            cat_id = int(name_parts[1])
            for entry in json.loads(listing.read_text()):
                aid = entry.get('articleId')
                if aid and aid not in article_cats:
                    article_cats[aid] = cat_id
        except (ValueError, json.JSONDecodeError, OSError):
            continue
    return article_cats


def _import_batch(cur, new_files, imported):
    """Import each detail file in *new_files* through cursor *cur*.

    Mutates *imported* (adds each processed articleId, including files that
    are skipped as corrupt or OEM-less, so they are never retried).
    Returns a stats dict: parts / xrefs / mfrs inserted, plus 'updated' for
    parts that were already present in the cache.
    """
    # Per-batch caches so repeated OEM numbers / suppliers hit the dict, not the DB.
    cur.execute("SELECT oem_part_number, id_part FROM parts WHERE oem_part_number IS NOT NULL")
    part_cache = {r[0]: r[1] for r in cur.fetchall()}
    cur.execute("SELECT id_manufacture, name_manufacture FROM manufacturers")
    mfr_cache = {r[1]: r[0] for r in cur.fetchall()}

    stats = {'parts': 0, 'xrefs': 0, 'mfrs': 0, 'updated': 0}
    for detail_file in new_files:
        article_id = detail_file.stem
        try:
            data = json.loads(detail_file.read_text())
        except (json.JSONDecodeError, OSError):
            # Corrupt or half-written file: mark it done so we never retry it.
            imported.add(article_id)
            continue

        oem_list = data.get('articleOemNo', [])
        article = data.get('article', {}) or {}
        article_no = article.get('articleNo', '')
        supplier = article.get('supplierName', '')
        product_name = article.get('articleProductName', '')

        if not oem_list:
            imported.add(article_id)
            continue

        # Ensure the supplier exists as a manufacturer row.
        if supplier and supplier not in mfr_cache:
            cur.execute(
                "INSERT INTO manufacturers (name_manufacture) VALUES (%s) RETURNING id_manufacture",
                (supplier,))
            mfr_cache[supplier] = cur.fetchone()[0]
            stats['mfrs'] += 1

        for oem in oem_list:
            oem_no = oem.get('oemDisplayNo', '')
            oem_brand = oem.get('oemBrand', '')
            if not oem_no:
                continue
            if oem_no not in part_cache:
                cur.execute("""
                    INSERT INTO parts (oem_part_number, name_part, description)
                    VALUES (%s, %s, %s)
                    ON CONFLICT (oem_part_number) DO UPDATE SET name_part = EXCLUDED.name_part
                    RETURNING id_part
                """, (oem_no, product_name, f"OEM {oem_brand}"))
                part_cache[oem_no] = cur.fetchone()[0]
                stats['parts'] += 1
            else:
                # Part already known (from cache or an earlier file); nothing is
                # written here — we only count the touch.
                stats['updated'] += 1
            part_id = part_cache[oem_no]

            # Cross-reference the aftermarket article number to the OEM part.
            if article_no and supplier:
                cur.execute("""
                    INSERT INTO part_cross_references (part_id, cross_reference_number, source_ref)
                    VALUES (%s, %s, %s)
                    ON CONFLICT DO NOTHING
                """, (part_id, article_no, supplier))
                stats['xrefs'] += 1

        imported.add(article_id)
    return stats


def run():
    """Main loop: every INTERVAL seconds, import any new detail files."""
    imported = load_imported()
    print(f"Already imported: {len(imported)} articles", flush=True)

    # Build article→category mapping once.
    # NOTE(review): this map is not consulted anywhere below in the visible
    # code — presumably reserved for category assignment; confirm before removing.
    article_cats = _build_category_map()
    print(f"Article→category mappings: {len(article_cats):,}", flush=True)

    while True:
        detail_files = list(DETAILS_DIR.glob("*.json"))
        new_files = [f for f in detail_files if f.stem not in imported]
        if not new_files:
            print(f"  [{time.strftime('%H:%M:%S')}] No new files. "
                  f"Total imported: {len(imported):,}. Waiting...", flush=True)
            time.sleep(INTERVAL)
            continue

        print(f"  [{time.strftime('%H:%M:%S')}] Found {len(new_files)} "
              f"new detail files to import", flush=True)

        conn = psycopg2.connect(DB_URL)
        try:
            cur = conn.cursor()
            try:
                stats = _import_batch(cur, new_files, imported)
                conn.commit()
            finally:
                cur.close()
        finally:
            # Always release the connection, even if a file in the batch blows up.
            conn.close()

        save_imported(imported)
        total_details = len(list(DETAILS_DIR.glob("*.json")))
        print(f"  Imported batch: +{stats['parts']} parts, +{stats['xrefs']} xrefs, "
              f"+{stats['mfrs']} mfrs | "
              f"Total imported: {len(imported):,} | Details on disk: {total_details:,}",
              flush=True)
        time.sleep(INTERVAL)


if __name__ == "__main__":
    run()