Files
Autoparts-DB/scripts/import_knadian_catalog.py
consultoria-as ea29cc31c0 feat(catalog): supplier catalog cleanup, fuzzy matching, and navigation fixes
- Cleaned 137+ fake engine-displacement models from supplier imports
  (v3/v4 scripts: Chevrolet, Ford, Chrysler, Dodge, Jeep, Nissan, etc.)
- Removed 1,251+ corrupted models (INT. prefixes, year-suffix, torque specs,
  empty names, trailing-year variants)
- Migrated supplier tables to master DB (supplier_catalog,
  supplier_catalog_compat, supplier_catalog_interchange)
- Fixed _get_mye_ids_with_parts() to query supplier_catalog_compat from
  master DB so supplier-only vehicles appear for all tenants
- Added fuzzy model matcher with parenthesis stripping, noise suffix removal,
  compact matching, prefix/substring fallback, model aliases, and ±3 year
  proximity
- Matched compat rows: KEEP GREEN +14,152, KNADIAN +3,021, VAZLO +127,500,
  LUK +477, RAYBESTOS +1,743
- Added KNADIAN catalog importer with year-range expansion and future-year
  filtering
- Added VAZLO catalog importer with position parsing and SKU-in-model cleanup
- Added Keep Green, LUK, Yokomitsu, Raybestos catalog importers
- Cache clearing after cleanups (_classify_cache_*, nexus:mye_ids:*,
  nexus:brand_mye_counts:*)

Final match rates:
- KEEP GREEN: 90.3%
- VAZLO: 93.6%
- YOKOMITSU: 100.0%
- KNADIAN: 57.4%
- LUK: 51.0%
- RAYBESTOS: 55.9%
2026-06-09 07:47:42 +00:00

313 lines
9.5 KiB
Python

#!/usr/bin/env python3
"""
Import KNADIAN catalog from Excel into supplier_catalog tables.
Usage:
python scripts/import_knadian_catalog.py
"""
import os
import re
import sys
from collections import defaultdict
from datetime import datetime
import psycopg2
from openpyxl import load_workbook
# DSN of the master database; the MASTER_DB_URL environment variable wins
# over the local default.
MASTER_DB_URL = os.environ.get(
    'MASTER_DB_URL',
    'postgresql://postgres@localhost/nexus_autoparts',
)

# The workbook is resolved relative to this script: <script dir>/../data/KNADIAN.xlsx
EXCEL_PATH = os.path.join(os.path.dirname(__file__), '..', 'data', 'KNADIAN.xlsx')

# Value written to supplier_catalog.supplier_name for every imported item.
SUPPLIER_NAME = 'KNADIAN'

# reject future years from bad supplier data
MAX_IMPORT_YEAR = datetime.now().year + 1

# Makes written as two words. Keys are the upper-cased (first, second) word
# pair, values the canonical make name; built from the names themselves so a
# key can never drift out of sync with its value.
MULTI_WORD_MAKES = {
    tuple(canonical.split()): canonical
    for canonical in (
        'MERCEDES BENZ',
        'LAND ROVER',
        'ALFA ROMEO',
        'AMERICAN MOTORS',
        'ROLLS ROYCE',
        'ASTON MARTIN',
        'GREAT WALL',
    )
}
def connect_master():
    """Open and return a new psycopg2 connection to the master database.

    The connection string is the module-level MASTER_DB_URL. Nothing is
    cached: every call opens a fresh connection.
    """
    connection = psycopg2.connect(MASTER_DB_URL)
    return connection
def normalize_name(name):
    """Collapse every run of whitespace in *name* into a single space.

    Falsy input (None, '', 0) yields ''. Any other value is converted with
    str() first, so non-string cell values are accepted. Leading and
    trailing whitespace is removed.
    """
    # str.split() without a separator already treats newlines as whitespace
    # and drops leading/trailing blanks, so no separate newline handling is
    # needed.
    return ' '.join(str(name).split()) if name else ''
def parse_year_token(token):
    """Turn a digits-only token into a calendar year.

    Examples: '2015' -> 2015, '98' -> 1998, '05' -> 2005.

    Rules:
      * 1000..2100 is taken as a literal year.
      * 70..99 maps to 1970..1999.
      * 0..69 maps to 2000..2069.

    Returns None for falsy input, for anything that is not purely digits,
    and for numbers outside the ranges above (100..999, above 2100).
    """
    text = str(token) if token else ''
    if re.match(r'^\d+$', text) is None:
        return None

    number = int(token)
    if 1000 <= number <= 2100:
        return number
    if number > 99:
        # 100..999 and anything above 2100 is not a usable year.
        return None
    # Two-digit year: pivot at 70.
    century = 1900 if number >= 70 else 2000
    return century + number
# Trailing 'A/B' or 'A-B' range; each side is a 2-4 digit year token.
_YEAR_RANGE_RE = re.compile(r'\s+(\d{2,4})\s*[-/]\s*(\d{2,4})$')
# Trailing four-digit year in the 1900s or 2000s.
_FULL_YEAR_RE = re.compile(r'\s+((?:19|20)\d{2})$')
# Trailing four digits that may be two merged two-digit years ('1315').
_MERGED_RANGE_RE = re.compile(r'\s+(\d{4})$')


def extract_years(text):
    """Split a trailing year or year range off *text*.

    Recognised endings, tried in this order. Each must be preceded by
    whitespace, so a string that consists of nothing but the digits (for
    example a model that is literally '2002') is left untouched:

    1. A range joined by '/' or '-': 'TL 05/10', 'CIVIC 1998-2002'.
       Each side goes through parse_year_token(). Spans longer than 100
       years are rejected.
    2. A single four-digit year starting with 19 or 20: 'JETTA 2011'.
    3. Four digits read as two merged two-digit years: 'VERSA 1315' means
       2013..2015. Accepted only when ascending and at most 30 years long.

    Args:
        text: Any value; it is converted with str() and stripped.

    Returns:
        A (years, rest) tuple. ``years`` is a list of ints, or [None] when
        no year could be extracted. ``rest`` is the stripped text with the
        year suffix removed ('' for falsy input, the whole stripped text
        when nothing matched).
    """
    if not text:
        return [None], ''
    s = str(text).strip()

    m = _YEAR_RANGE_RE.search(s)
    if m:
        start = parse_year_token(m.group(1))
        end = parse_year_token(m.group(2))
        if start and end:
            if end < start:
                # A "backwards" range has two possible readings:
                #   * the supplier typed it reversed ('10/05'), or
                #   * a two-digit start below the pivot was pushed into the
                #     2000s although it belongs to the 1900s ('68/72' is
                #     1968-1972, not 1972-2068).
                # Keep whichever reading yields the narrower span; on a tie
                # fall back to the plain swap.
                depivoted = start - 100
                if (len(m.group(1)) == 2 and start >= 2000
                        and depivoted <= end
                        and end - depivoted < start - end):
                    start = depivoted
                else:
                    start, end = end, start
            if end - start <= 100:
                return list(range(start, end + 1)), s[:m.start()].strip()

    m = _FULL_YEAR_RE.search(s)
    if m:
        return [int(m.group(1))], s[:m.start()].strip()

    m = _MERGED_RANGE_RE.search(s)
    if m:
        digits = m.group(1)
        # If first two and last two digits are valid years, treat as range.
        y1 = parse_year_token(digits[:2])
        y2 = parse_year_token(digits[2:])
        if y1 and y2 and y1 <= y2 and y2 - y1 <= 30:
            return list(range(y1, y2 + 1)), s[:m.start()].strip()

    return [None], s
def parse_carro(carro):
    """Break a CARRO_PERTENECIENTE cell such as 'ACURA TL 05/10' into parts.

    Returns a dict with these keys:
      make  - upper-cased first word, or the canonical two-word make when
              the first two words form a MULTI_WORD_MAKES key; None when no
              words remain after the years are removed.
      model - the remaining words joined by single spaces, or None.
      years - the list produced by extract_years() ([None] if no year).
      raw   - the stripped input text (the untouched value for falsy input).
    """
    if not carro:
        return {'make': None, 'model': None, 'years': [None], 'raw': carro}

    raw = str(carro).strip()
    years, remainder = extract_years(raw)
    words = remainder.split()

    make = model = None
    if words:
        # A one-word remainder gives a 1-tuple here, which can never be a
        # MULTI_WORD_MAKES key, so it falls through to the single-word make.
        leading_pair = tuple(word.upper() for word in words[:2])
        if leading_pair in MULTI_WORD_MAKES:
            make, model_words = MULTI_WORD_MAKES[leading_pair], words[2:]
        else:
            make, model_words = words[0].upper(), words[1:]
        model = ' '.join(model_words) or None

    return {'make': make, 'model': model, 'years': years, 'raw': raw}
def extract_engine(name):
    """Return the engine text that follows the first word of a part name.

    Example: 'BOMBA_REFRIGERANTE L4 2.0' -> 'L4 2.0'.

    Returns None when *name* is falsy, when it has fewer than two words, or
    when everything after the first word is just one of the position
    abbreviations listed below (compared case-insensitively).
    """
    if not name:
        return None

    # Splitting on whitespace is all the normalisation this needs: runs of
    # blanks and newlines disappear when the tail is re-joined.
    words = str(name).split()
    if len(words) < 2:
        return None

    # Meaningless tokens that should not be stored as an engine.
    position_tokens = (
        'DEL.', 'TRAS.', 'FRONT.', 'EXT.', 'IZQ.',
        'DER.', 'INF.', 'SUP.', 'TRANS.',
    )
    engine = ' '.join(words[1:])
    return None if engine.upper() in position_tokens else engine
def extract_interchanges(row):
    """Collect (brand, part_number) interchange pairs from a sheet row.

    Six brand / part-number column pairs are read, at 0-based indexes
    (3, 4), (5, 6), ... (13, 14). A pair is kept only when both cells exist,
    are truthy, and are still non-empty after str().strip(). Rows shorter
    than the full width are fine: a missing cell counts as empty.

    Returns the pairs as a list of (brand, part_number) string tuples, in
    column order.
    """
    width = len(row)
    pairs = []
    for brand_idx in range(3, 15, 2):
        pn_idx = brand_idx + 1
        brand_cell = row[brand_idx] if brand_idx < width else None
        pn_cell = row[pn_idx] if pn_idx < width else None
        if not brand_cell or not pn_cell:
            continue
        brand = str(brand_cell).strip()
        part_number = str(pn_cell).strip()
        if brand and part_number:
            pairs.append((brand, part_number))
    return pairs
# Upsert one catalog item; RETURNING id gives the row id for both the insert
# and the update path.
_UPSERT_CATALOG_SQL = """
    INSERT INTO supplier_catalog (supplier_name, sku, name, category, is_active)
    VALUES (%s, %s, %s, %s, true)
    ON CONFLICT (supplier_name, sku, category) DO UPDATE SET
        name = EXCLUDED.name,
        category = EXCLUDED.category,
        is_active = true
    RETURNING id
"""

# NOTE(review): with a plain PostgreSQL unique index NULLs are distinct, so
# rows whose year or engine is NULL may not hit this conflict target on a
# re-import -- confirm how the index on supplier_catalog_compat is defined.
_INSERT_COMPAT_SQL = """
    INSERT INTO supplier_catalog_compat
        (catalog_id, make, model, year, engine, model_year_engine_id, source)
    VALUES (%s, %s, %s, %s, %s, NULL, %s)
    ON CONFLICT (catalog_id, make, model, year, engine) DO NOTHING
"""

_INSERT_INTERCHANGE_SQL = """
    INSERT INTO supplier_catalog_interchange (catalog_id, brand, part_number)
    VALUES (%s, %s, %s)
    ON CONFLICT DO NOTHING
"""


def _importable_years(years):
    """Drop years after MAX_IMPORT_YEAR and duplicates, keeping first-seen order.

    None ("year unknown") is kept as a value; an empty result becomes [None].
    """
    kept = dict.fromkeys(
        y for y in years if y is None or y <= MAX_IMPORT_YEAR
    )
    return list(kept) or [None]


def _import_sheet(cur, sheet_name, data_rows, stats):
    """Write one sheet's data rows through cursor *cur*; update *stats* in place.

    Columns are read by 0-based position: 0 make, 1 model, 2 SKU,
    3-14 interchange pairs, 15 part name, 16 free-text vehicle description.
    The sheet name is stored as the catalog category.
    """
    # SKU -> catalog id, so each SKU is upserted once per sheet.
    catalog_ids = {}

    for idx, row in enumerate(data_rows):
        if idx % 2000 == 0 and idx > 0:
            print(f" ...{idx} rows processed")

        sku = str(row[2]).strip() if row and len(row) >= 3 and row[2] else ''
        if not sku:
            stats['skipped_no_sku'] += 1
            continue

        make_col = str(row[0]).strip().upper() if row[0] else ''
        model_col = str(row[1]).strip() if row[1] else ''
        part_name = normalize_name(row[15]) if len(row) > 15 and row[15] else ''
        carro = str(row[16]).strip() if len(row) > 16 and row[16] else ''

        # Always try to parse the years from the vehicle description, but
        # prefer the explicit make/model columns over its parsed make/model.
        parsed = parse_carro(carro)
        make = make_col or parsed['make']
        model = model_col or parsed['model']
        years = parsed['years']
        # If the year is still missing, the model column itself may end in one.
        # NOTE(review): the model keeps its year suffix in that case; only
        # the years are taken from it.
        if years == [None] and model_col:
            years, _ = extract_years(model_col)

        if not make or not model:
            stats['skipped_no_vehicle'] += 1
            continue

        years = _importable_years(years)
        stats['rows'] += 1

        catalog_id = catalog_ids.get(sku)
        if catalog_id is None:
            # A blank part name falls back to the sheet name for display.
            cur.execute(
                _UPSERT_CATALOG_SQL,
                (SUPPLIER_NAME, sku, part_name or sheet_name, sheet_name),
            )
            found = cur.fetchone()
            catalog_id = found[0] if found else None
            catalog_ids[sku] = catalog_id
            stats['catalog_items'] += 1
        if catalog_id is None:
            stats['skipped_no_catalog'] += 1
            continue

        # The engine comes from the real part name only: deriving it from the
        # sheet-name fallback would store words of the category as an engine.
        engine = extract_engine(part_name)
        for year in years:
            cur.execute(
                _INSERT_COMPAT_SQL,
                (catalog_id, make, model, year, engine, 'import_text'),
            )
            stats['compat_rows'] += 1

        for brand, pn in extract_interchanges(row):
            cur.execute(_INSERT_INTERCHANGE_SQL, (catalog_id, brand, pn))
            stats['interchange_rows'] += 1


def main():
    """Import every sheet of the KNADIAN workbook into the master database.

    The first row of each sheet is treated as a header and skipped. Each
    sheet is committed on its own, so a failure in a later sheet leaves the
    earlier sheets in place. Counters are printed once all sheets are done.

    Exits with status 1 when the workbook file does not exist. Any other
    error propagates after the workbook and the database connection have
    been closed; closing the connection discards the uncommitted sheet.
    """
    print(f"[{datetime.now().isoformat()}] Starting KNADIAN import...")
    if not os.path.exists(EXCEL_PATH):
        print(f"ERROR: Excel not found at {EXCEL_PATH}")
        sys.exit(1)

    print(f"Loading {EXCEL_PATH}...")
    wb = load_workbook(EXCEL_PATH, read_only=True, data_only=True)
    stats = defaultdict(int)
    master_conn = None
    try:
        master_conn = connect_master()
        with master_conn.cursor() as master_cur:
            for sheet_name in wb.sheetnames:
                rows = list(wb[sheet_name].iter_rows(values_only=True))
                if not rows:
                    continue
                data_rows = rows[1:]
                stats['sheets'] += 1
                print(f"\nProcessing sheet '{sheet_name}' with {len(data_rows)} rows...")
                _import_sheet(master_cur, sheet_name, data_rows, stats)
                master_conn.commit()
                print(f" Sheet '{sheet_name}' committed.")
    finally:
        if master_conn is not None:
            master_conn.close()
        # A read-only workbook keeps the file open until close() is called.
        wb.close()

    print(f"\n{'='*60}")
    print("IMPORT COMPLETE")
    print(f"{'='*60}")
    for k, v in sorted(stats.items()):
        print(f"{k:25s}: {v}")


if __name__ == '__main__':
    main()