feat: add aftermarket migration script — move AFT- parts to proper table
Migrates 357K AFT-prefixed parts from parts table to aftermarket_parts.
Parses part_number and manufacturer from AFT-{partNo}-{manufacturer} format.
Links to OEM parts via cross-references. Batch processing with progress.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
220
scripts/migrate_aftermarket.py
Executable file
220
scripts/migrate_aftermarket.py
Executable file
@@ -0,0 +1,220 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Migrate AFT- prefixed parts from `parts` table to `aftermarket_parts` table.
|
||||
|
||||
Parts with oem_part_number like 'AFT-{partNumber}-{manufacturerName}' are
|
||||
aftermarket parts stored incorrectly in the parts table. This script parses
|
||||
them, links to OEM parts via cross-references, inserts into aftermarket_parts,
|
||||
and deletes the originals from parts (dependent vehicle_parts and
part_cross_references rows are deleted explicitly first, not via CASCADE).
|
||||
|
||||
Usage:
|
||||
python3 scripts/migrate_aftermarket.py
|
||||
"""
|
||||
|
||||
import os
import sys
import time

import psycopg2
from psycopg2.extras import execute_values
|
||||
|
||||
# Connection string for the database to migrate. Set the NEXUS_AUTOPARTS_DSN
# environment variable to override it; an unset or empty variable falls back
# to the historical default so existing invocations keep working.
# NOTE(review): the fallback embeds a password in source control. Rotate it
# and drop the default once the variable is set wherever this script runs.
DB_DSN = (
    os.environ.get("NEXUS_AUTOPARTS_DSN")
    or "postgresql://nexus:nexus_autoparts_2026@localhost/nexus_autoparts"
)

# Number of AFT- rows handed to process_batch() per call.
BATCH_SIZE = 5000
|
||||
|
||||
|
||||
def load_manufacturers(cur):
    """Build a ``{lowercased name: id}`` lookup of every manufacturer row.

    Runs a single SELECT on the ``manufacturers`` table through *cur*,
    prints how many distinct lowercase names were collected, and returns
    the mapping. If two names differ only by case, the row fetched last
    wins.
    """
    cur.execute("SELECT id_manufacture, name_manufacture FROM manufacturers")
    lookup = {name.lower(): mfr_id for mfr_id, name in cur.fetchall()}
    print(f"Loaded {len(lookup)} known manufacturers", flush=True)
    return lookup
|
||||
|
||||
|
||||
def parse_aft_part(oem_part_number, known_manufacturers):
    """
    Parse 'AFT-{partNumber}-{manufacturerName}' into (part_number, manufacturer_name).

    The manufacturer is the longest right-side suffix (after a '-') whose
    lowercase form is a key of `known_manufacturers`, always leaving at least
    one segment for the part number. Fallback when no suffix is known: the
    last segment is the manufacturer.

    The first four characters are removed unconditionally, so callers must
    pass a value that starts with 'AFT-'.

    Returns:
        (part_number, manufacturer_name). manufacturer_name is None when there
        is no '-' after the prefix, and may be an empty string when the value
        ends with '-'.

    Examples:
        AFT-AC191-PARTQUIP   -> ('AC191', 'PARTQUIP')
        AFT-10-0058-Airstal  -> ('10-0058', 'Airstal')
        AFT-A-123-Some-Brand -> ('A-123', 'Some-Brand') if 'some-brand' is known
    """
    without_prefix = oem_part_number[4:]  # remove 'AFT-'
    segments = without_prefix.split('-')

    if len(segments) < 2:
        # Can't split: treat the entire thing as part number, no manufacturer
        return without_prefix, None

    # Move the split point left to right so the LONGEST suffix is tried first:
    # for segments [A, B, C, D] try "B-C-D", then "C-D", then "D".
    # Starting at 1 keeps at least one segment for the part number.
    for split_at in range(1, len(segments)):
        candidate = '-'.join(segments[split_at:])
        if candidate.lower() in known_manufacturers:
            return '-'.join(segments[:split_at]), candidate

    # Fallback: last segment is the manufacturer
    return '-'.join(segments[:-1]), segments[-1]
|
||||
|
||||
|
||||
def main():
    """Move every AFT- row out of `parts`, one batch at a time.

    Connects with DB_DSN, loads the known manufacturers, counts and then
    loads all AFT- rows into memory, and hands them to process_batch() in
    slices of BATCH_SIZE (process_batch commits each slice). Progress is
    printed after every batch and a summary at the end. Returns early,
    without a summary, when there is nothing to migrate.

    The cursor and connection are closed even when a step raises.
    """
    totals = {'inserted': 0, 'skipped_no_mfr': 0, 'skipped_no_oem': 0,
              'new_manufacturers': 0}
    migrated = 0

    conn = psycopg2.connect(DB_DSN)
    try:
        conn.autocommit = False
        cur = conn.cursor()
        try:
            # Step 1: Load known manufacturers
            known_manufacturers = load_manufacturers(cur)

            # Step 2: Count AFT- parts.
            # No parameters are passed, so the doubled '%%' reaches the server
            # as written; LIKE treats it the same as a single '%'.
            cur.execute("SELECT COUNT(*) FROM parts WHERE oem_part_number LIKE 'AFT-%%'")
            total = cur.fetchone()[0]
            print(f"Found {total:,} AFT- parts to migrate", flush=True)

            if total == 0:
                print("Nothing to do.")
                return

            # Load all AFT- parts into memory (357K rows is manageable)
            print("Loading all AFT- parts into memory...", flush=True)
            cur.execute(
                "SELECT id_part, oem_part_number, name_part, description, group_id "
                "FROM parts WHERE oem_part_number LIKE 'AFT-%%' "
                "ORDER BY id_part"
            )
            all_aft_parts = cur.fetchall()
            print(f"  Loaded {len(all_aft_parts):,} rows", flush=True)

            # Process in batches
            starts = range(0, len(all_aft_parts), BATCH_SIZE)
            for batch_num, start in enumerate(starts, start=1):
                batch = all_aft_parts[start:start + BATCH_SIZE]
                stats = process_batch(conn, cur, batch, known_manufacturers)
                for key in totals:
                    totals[key] += stats[key]
                migrated += len(batch)
                print(f"  Batch {batch_num}: processed {migrated:,}/{total:,} "
                      f"(inserted={totals['inserted']:,}, "
                      f"no_oem={totals['skipped_no_oem']:,}, "
                      f"no_mfr={totals['skipped_no_mfr']:,})", flush=True)
        finally:
            cur.close()
    finally:
        conn.close()

    print("\n=== Migration Complete ===")
    print(f"  Total AFT- parts processed:  {migrated:,}")
    print(f"  Inserted into aftermarket_parts: {totals['inserted']:,}")
    print(f"  Skipped (no manufacturer parsed): {totals['skipped_no_mfr']:,}")
    print(f"  Skipped (no OEM part found): {totals['skipped_no_oem']:,}")
    print(f"  New manufacturers created: {totals['new_manufacturers']:,}")
|
||||
|
||||
|
||||
def _ensure_manufacturer(cur, manufacturer_name, known_manufacturers):
    """Return (manufacturer_id, created) for a name, inserting the row if unknown.

    `known_manufacturers` is the {lowercase_name: id} cache; it is updated in
    place whenever an id has to be looked up or created.
    """
    mfr_key = manufacturer_name.lower()
    if mfr_key in known_manufacturers:
        return known_manufacturers[mfr_key], False

    cur.execute(
        "INSERT INTO manufacturers (name_manufacture) VALUES (%s) "
        "ON CONFLICT (name_manufacture) DO NOTHING "
        "RETURNING id_manufacture",
        (manufacturer_name,)
    )
    row = cur.fetchone()
    created = row is not None

    if not created:
        # The INSERT hit a conflict (e.g. a concurrent process added the
        # name), so nothing was returned; fetch the existing id instead.
        cur.execute(
            "SELECT id_manufacture FROM manufacturers WHERE name_manufacture = %s",
            (manufacturer_name,)
        )
        row = cur.fetchone()
        if row is None:
            raise RuntimeError(
                f"Manufacturer {manufacturer_name!r} could not be inserted or found"
            )

    known_manufacturers[mfr_key] = row[0]
    return row[0], created


def process_batch(conn, cur, batch, known_manufacturers):
    """Process a batch of AFT- parts. Returns stats dict.

    For each (id_part, oem_part_number, name_part, description, group_id) row:
    parse the part number and manufacturer, make sure the manufacturer exists,
    and look up the OEM part through part_cross_references. Resolved rows are
    inserted into aftermarket_parts.

    Every row of the batch is then deleted from `parts`, together with its
    vehicle_parts and part_cross_references rows. This includes rows that
    could not be parsed or linked to an OEM part; those are only counted in
    the stats, not preserved anywhere.

    The batch is committed as one transaction. If any step raises, the
    transaction is rolled back and the exception propagates.
    `known_manufacturers` may then still hold ids from the rolled-back work,
    so reload it before retrying.

    Returns:
        dict with 'inserted' (rows actually added to aftermarket_parts),
        'skipped_no_mfr', 'skipped_no_oem' and 'new_manufacturers'.
    """
    stats = {'inserted': 0, 'skipped_no_mfr': 0, 'skipped_no_oem': 0, 'new_manufacturers': 0}

    parts_to_delete = []
    aftermarket_rows = []  # (oem_part_id, manufacturer_id, part_number, name_aftermarket_parts)

    try:
        for id_part, oem_part_number, name_part, _description, _group_id in batch:
            # Whatever happens below, an AFT- row doesn't belong in parts.
            parts_to_delete.append(id_part)

            part_number, manufacturer_name = parse_aft_part(oem_part_number, known_manufacturers)
            if not manufacturer_name:
                stats['skipped_no_mfr'] += 1
                continue

            manufacturer_id, created = _ensure_manufacturer(
                cur, manufacturer_name, known_manufacturers
            )
            if created:
                stats['new_manufacturers'] += 1

            # Find the OEM part via cross-references (one query per part)
            cur.execute(
                "SELECT part_id FROM part_cross_references "
                "WHERE cross_reference_number = %s AND source_ref = %s "
                "LIMIT 1",
                (part_number, manufacturer_name)
            )
            xref_row = cur.fetchone()
            if not xref_row:
                stats['skipped_no_oem'] += 1
                continue

            aftermarket_rows.append((xref_row[0], manufacturer_id, part_number, name_part))

        # Batch insert into aftermarket_parts
        if aftermarket_rows:
            # ON CONFLICT DO NOTHING can silently drop rows, so count what the
            # server reports back instead of what was sent. fetch=True gathers
            # the RETURNING rows of every page (needs psycopg2 >= 2.8).
            returned = execute_values(
                cur,
                "INSERT INTO aftermarket_parts "
                "(oem_part_id, manufacturer_id, part_number, name_aftermarket_parts) "
                "VALUES %s ON CONFLICT DO NOTHING RETURNING 1",
                aftermarket_rows,
                page_size=1000,
                fetch=True
            )
            stats['inserted'] = len(returned)

        # Delete dependent rows first (FK without CASCADE)
        if parts_to_delete:
            cur.execute(
                "DELETE FROM vehicle_parts WHERE part_id = ANY(%s)",
                (parts_to_delete,)
            )
            cur.execute(
                "DELETE FROM part_cross_references WHERE part_id = ANY(%s)",
                (parts_to_delete,)
            )
            cur.execute(
                "DELETE FROM parts WHERE id_part = ANY(%s)",
                (parts_to_delete,)
            )

        conn.commit()
    except Exception:
        # Leave the connection usable and never keep half a batch.
        conn.rollback()
        raise

    return stats
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Script entry point: run the migration and report how long it took.
    # perf_counter() is monotonic, so the elapsed time cannot be skewed by
    # system clock adjustments during a long run.
    t0 = time.perf_counter()
    main()
    elapsed = time.perf_counter() - t0
    print(f"Elapsed: {elapsed:.1f}s")
|
||||
Reference in New Issue
Block a user