feat: migrate to PostgreSQL + SQLAlchemy ORM, rebrand to Nexus Autoparts

- Migrate from SQLite to PostgreSQL with normalized schema
- Add 11 lookup tables (fuel_type, body_type, drivetrain, transmission,
  materials, position_part, manufacture_type, quality_tier, countries,
  reference_type, shapes)
- Rewrite dashboard/server.py (76 routes) using SQLAlchemy text() queries
- Rewrite console/db.py (27 methods) using SQLAlchemy ORM
- Add models.py with 27 SQLAlchemy model definitions
- Add config.py for centralized DB_URL configuration
- Add migrate_to_postgres.py migration script
- Add docs/METABASE_GUIDE.md with complete data entry guide
- Rebrand from "AUTOPARTS DB" to "NEXUS AUTOPARTS"
- Fill vehicle data gaps via NHTSA API + heuristics:
  engines (cylinders, power, torque), brands (country, founded_year),
  models (body_type, production years), MYE (drivetrain, transmission, trim)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-19 05:24:47 +00:00
parent 7ecf1295a5
commit 7b2a904498
41 changed files with 3519 additions and 3489 deletions

470
migrate_to_postgres.py Normal file
View File

@@ -0,0 +1,470 @@
#!/usr/bin/env python3
"""
Migrate data from SQLite (vehicle_database.db) to PostgreSQL (nexus_autoparts).
Usage:
python3 migrate_to_postgres.py
"""
import sqlite3
import sys
import time
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from config import DB_URL, SQLITE_PATH
from models import (
Base, SEARCH_VECTOR_TRIGGER_SQL,
FuelType, BodyType, Drivetrain, Transmission, Material,
PositionPart, ManufactureType, QualityTier, Country,
ReferenceType, Shape,
Brand, Year, Engine, Model, ModelYearEngine,
PartCategory, PartGroup, Part, VehiclePart,
Manufacturer, AftermarketPart, PartCrossReference,
Diagram, VehicleDiagram, DiagramHotspot, VinCache,
)
BATCH_SIZE = 5000
def log(msg):
    """Write *msg* to stdout, indented one space, with a trailing newline.

    Matches the nested-progress look of the surrounding step headers.
    """
    sys.stdout.write(f" {msg}\n")
def connect_sqlite():
    """Open the source SQLite database with name-addressable rows.

    ``sqlite3.Row`` lets the migration lambdas read columns as
    ``row["name"]`` instead of positional indexes.
    """
    connection = sqlite3.connect(SQLITE_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def populate_lookup(pg, sqlite_conn, LookupClass, pk_col, name_col,
                    sql_query):
    """Fill one PG lookup table from distinct SQLite text values.

    Runs *sql_query* against SQLite, deduplicates the first column
    (dropping NULL/empty values), inserts one ``LookupClass`` row per
    value with sequential ids starting at 1, and returns a dict mapping
    each text value to its new primary-key id.
    """
    distinct = {row[0] for row in sqlite_conn.execute(sql_query) if row[0]}
    mapping = {}
    next_id = 1
    # Sorted insertion keeps id assignment deterministic across runs.
    for value in sorted(distinct):
        pg.add(LookupClass(**{pk_col: next_id, name_col: value}))
        mapping[value] = next_id
        next_id += 1
    pg.flush()
    log(f" {LookupClass.__tablename__}: {len(mapping)} values")
    return mapping
def migrate_table(pg, sqlite_conn, query, build_obj_fn, label, batch=BATCH_SIZE):
    """Generic batch-migrate helper.

    Reads all rows produced by *query* from SQLite, maps each through
    *build_obj_fn* (which may return ``None`` to skip a row, e.g. an
    orphan with a dangling FK), adds the resulting ORM objects to the
    *pg* session, and flushes once per *batch* rows.

    Returns the number of rows actually inserted — not the number of
    source rows read — so callers' "``{label}: {n} rows``" logs stay
    accurate when orphans are skipped.  (Previously this returned the
    raw source-row count, overstating tables with filtered rows.)
    """
    rows = sqlite_conn.execute(query).fetchall()
    total = len(rows)
    processed = 0
    inserted = 0
    for i in range(0, total, batch):
        chunk = rows[i:i + batch]
        for row in chunk:
            obj = build_obj_fn(row)
            if obj is not None:
                pg.add(obj)
                inserted += 1
        pg.flush()
        processed += len(chunk)
        # Progress line every 50k source rows, and always at the end.
        if processed % 50000 == 0 or processed == total:
            log(f" {label}: {processed}/{total}")
    return inserted
def main():
    """Run the complete SQLite → PostgreSQL migration end to end.

    Pipeline:
      1. connect to both databases
      2. drop & recreate the PG schema from the SQLAlchemy models
      3. populate the 11 lookup tables, building text→id maps
      4. batch-copy every core table, remapping free-text columns
         (fuel_type, body_type, material, ...) onto the new lookup FKs
      5. reset PG serial sequences past the migrated max ids
      6. install the full-text-search trigger and backfill vectors
      7. verify row counts match between SQLite and PG
    """
    print("=" * 60)
    print(" NEXUS AUTOPARTS — SQLite → PostgreSQL Migration")
    print("=" * 60)
    t0 = time.time()
    # Connect
    print("\n[1] Connecting...")
    sqlite_conn = connect_sqlite()
    engine = create_engine(DB_URL, echo=False)
    Session = sessionmaker(bind=engine)
    # Drop & recreate all tables
    # NOTE(review): destructive — unconditionally wipes any existing PG data.
    print("\n[2] Creating schema...")
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)
    log("All tables created")
    pg = Session()
    # ── Lookup tables ──────────────────────────────────────
    # Each *_map is {source text value -> new lookup PK id}; the maps are
    # used by the core-table lambdas below to translate text columns
    # (e.g. r["fuel_type"]) into FK ids.  .get() yields None (→ NULL FK)
    # for values absent from the map.
    print("\n[3] Populating lookup tables...")
    fuel_map = populate_lookup(
        pg, sqlite_conn, FuelType, "id_fuel", "name_fuel",
        "SELECT DISTINCT fuel_type FROM engines WHERE fuel_type IS NOT NULL")
    body_map = populate_lookup(
        pg, sqlite_conn, BodyType, "id_body", "name_body",
        "SELECT DISTINCT body_type FROM models WHERE body_type IS NOT NULL")
    drive_map = populate_lookup(
        pg, sqlite_conn, Drivetrain, "id_drivetrain", "name_drivetrain",
        "SELECT DISTINCT drivetrain FROM model_year_engine WHERE drivetrain IS NOT NULL")
    trans_map = populate_lookup(
        pg, sqlite_conn, Transmission, "id_transmission", "name_transmission",
        "SELECT DISTINCT transmission FROM model_year_engine WHERE transmission IS NOT NULL")
    mat_map = populate_lookup(
        pg, sqlite_conn, Material, "id_material", "name_material",
        "SELECT DISTINCT material FROM parts WHERE material IS NOT NULL")
    pos_map = populate_lookup(
        pg, sqlite_conn, PositionPart, "id_position_part", "name_position_part",
        "SELECT DISTINCT position FROM vehicle_parts WHERE position IS NOT NULL")
    mtype_map = populate_lookup(
        pg, sqlite_conn, ManufactureType, "id_type_manu", "name_type_manu",
        "SELECT DISTINCT type FROM manufacturers WHERE type IS NOT NULL")
    # quality_tier appears in two source tables; UNION both before deduping.
    qtier_map = populate_lookup(
        pg, sqlite_conn, QualityTier, "id_quality_tier", "name_quality",
        """SELECT DISTINCT quality_tier FROM (
SELECT quality_tier FROM manufacturers WHERE quality_tier IS NOT NULL
UNION
SELECT quality_tier FROM aftermarket_parts WHERE quality_tier IS NOT NULL
)""")
    country_map = populate_lookup(
        pg, sqlite_conn, Country, "id_country", "name_country",
        "SELECT DISTINCT country FROM manufacturers WHERE country IS NOT NULL")
    reftype_map = populate_lookup(
        pg, sqlite_conn, ReferenceType, "id_ref_type", "name_ref_type",
        "SELECT DISTINCT reference_type FROM part_cross_references WHERE reference_type IS NOT NULL")
    shape_map = populate_lookup(
        pg, sqlite_conn, Shape, "id_shape", "name_shape",
        "SELECT DISTINCT shape FROM diagram_hotspots WHERE shape IS NOT NULL")
    pg.commit()
    # ── Core tables ────────────────────────────────────────
    # Tables are migrated in FK-dependency order (brands before models,
    # models before model_year_engine, ...) with a commit after each.
    print("\n[4] Migrating core tables...")
    # brands
    n = migrate_table(pg, sqlite_conn,
        "SELECT * FROM brands ORDER BY id",
        lambda r: Brand(
            id_brand=r["id"], name_brand=r["name"],
            country=r["country"], founded_year=r["founded_year"],
            created_at=r["created_at"]),
        "brands")
    pg.commit()
    log(f"brands: {n} rows")
    # years
    n = migrate_table(pg, sqlite_conn,
        "SELECT * FROM years ORDER BY id",
        lambda r: Year(
            id_year=r["id"], year_car=r["year"],
            created_at=r["created_at"]),
        "years")
    pg.commit()
    log(f"years: {n} rows")
    # engines — fuel_type text remapped to id_fuel FK
    n = migrate_table(pg, sqlite_conn,
        "SELECT * FROM engines ORDER BY id",
        lambda r: Engine(
            id_engine=r["id"], name_engine=r["name"],
            displacement_cc=r["displacement_cc"], cylinders=r["cylinders"],
            id_fuel=fuel_map.get(r["fuel_type"]),
            power_hp=r["power_hp"], torque_nm=r["torque_nm"],
            engine_code=r["engine_code"], created_at=r["created_at"]),
        "engines")
    pg.commit()
    log(f"engines: {n} rows")
    # models — body_type text remapped to id_body FK
    n = migrate_table(pg, sqlite_conn,
        "SELECT * FROM models ORDER BY id",
        lambda r: Model(
            id_model=r["id"], brand_id=r["brand_id"],
            name_model=r["name"],
            id_body=body_map.get(r["body_type"]),
            generation=r["generation"],
            production_start_year=r["production_start_year"],
            production_end_year=r["production_end_year"],
            created_at=r["created_at"]),
        "models")
    pg.commit()
    log(f"models: {n} rows")
    # model_year_engine — drivetrain/transmission remapped to lookup FKs
    n = migrate_table(pg, sqlite_conn,
        "SELECT * FROM model_year_engine ORDER BY id",
        lambda r: ModelYearEngine(
            id_mye=r["id"], model_id=r["model_id"],
            year_id=r["year_id"], engine_id=r["engine_id"],
            trim_level=r["trim_level"],
            id_drivetrain=drive_map.get(r["drivetrain"]),
            id_transmission=trans_map.get(r["transmission"]),
            created_at=r["created_at"]),
        "model_year_engine")
    pg.commit()
    log(f"model_year_engine: {n} rows")
    # part_categories (self-referencing parent_id hierarchy)
    n = migrate_table(pg, sqlite_conn,
        "SELECT * FROM part_categories ORDER BY id",
        lambda r: PartCategory(
            id_part_category=r["id"],
            name_part_category=r["name"], name_es=r["name_es"],
            parent_id=r["parent_id"], slug=r["slug"],
            icon_name=r["icon_name"], display_order=r["display_order"],
            created_at=r["created_at"]),
        "part_categories")
    pg.commit()
    log(f"part_categories: {n} rows")
    # part_groups
    n = migrate_table(pg, sqlite_conn,
        "SELECT * FROM part_groups ORDER BY id",
        lambda r: PartGroup(
            id_part_group=r["id"], category_id=r["category_id"],
            name_part_group=r["name"], name_es=r["name_es"],
            slug=r["slug"], display_order=r["display_order"],
            created_at=r["created_at"]),
        "part_groups")
    pg.commit()
    log(f"part_groups: {n} rows")
    # parts (without search_vector — trigger will fill it)
    n = migrate_table(pg, sqlite_conn,
        "SELECT * FROM parts ORDER BY id",
        lambda r: Part(
            id_part=r["id"], oem_part_number=r["oem_part_number"],
            name_part=r["name"], name_es=r["name_es"],
            group_id=r["group_id"],
            description=r["description"], description_es=r["description_es"],
            weight_kg=r["weight_kg"],
            id_material=mat_map.get(r["material"]),
            created_at=r["created_at"]),
        "parts")
    pg.commit()
    log(f"parts: {n} rows")
    # vehicle_parts — largest table; bigger batch to cut flush overhead
    n = migrate_table(pg, sqlite_conn,
        "SELECT * FROM vehicle_parts ORDER BY id",
        lambda r: VehiclePart(
            id_vehicle_part=r["id"],
            model_year_engine_id=r["model_year_engine_id"],
            part_id=r["part_id"],
            quantity_required=r["quantity_required"],
            id_position_part=pos_map.get(r["position"]),
            fitment_notes=r["fitment_notes"],
            created_at=r["created_at"]),
        "vehicle_parts", batch=10000)
    pg.commit()
    log(f"vehicle_parts: {n} rows")
    # manufacturers — type/quality_tier/country remapped to lookup FKs
    n = migrate_table(pg, sqlite_conn,
        "SELECT * FROM manufacturers ORDER BY id",
        lambda r: Manufacturer(
            id_manufacture=r["id"], name_manufacture=r["name"],
            id_type_manu=mtype_map.get(r["type"]),
            id_quality_tier=qtier_map.get(r["quality_tier"]),
            id_country=country_map.get(r["country"]),
            logo_url=r["logo_url"], website=r["website"],
            created_at=r["created_at"]),
        "manufacturers")
    pg.commit()
    log(f"manufacturers: {n} rows")
    # aftermarket_parts (skip orphans with missing oem_part_id)
    # Lambda yields None for orphans; migrate_table drops None objects.
    valid_part_ids = set(r[0] for r in sqlite_conn.execute("SELECT id FROM parts").fetchall())
    n = migrate_table(pg, sqlite_conn,
        "SELECT * FROM aftermarket_parts ORDER BY id",
        lambda r: AftermarketPart(
            id_aftermarket_parts=r["id"],
            oem_part_id=r["oem_part_id"],
            manufacturer_id=r["manufacturer_id"],
            part_number=r["part_number"],
            name_aftermarket_parts=r["name"], name_es=r["name_es"],
            id_quality_tier=qtier_map.get(r["quality_tier"]),
            price_usd=r["price_usd"],
            warranty_months=r["warranty_months"],
            created_at=r["created_at"]) if r["oem_part_id"] in valid_part_ids else None,
        "aftermarket_parts")
    pg.commit()
    log(f"aftermarket_parts: {n} rows")
    # part_cross_references
    n = migrate_table(pg, sqlite_conn,
        "SELECT * FROM part_cross_references ORDER BY id",
        lambda r: PartCrossReference(
            id_part_cross_ref=r["id"], part_id=r["part_id"],
            cross_reference_number=r["cross_reference_number"],
            id_ref_type=reftype_map.get(r["reference_type"]),
            source_ref=r["source"], notes=r["notes"],
            created_at=r["created_at"]),
        "part_cross_references")
    pg.commit()
    log(f"part_cross_references: {n} rows")
    # diagrams
    n = migrate_table(pg, sqlite_conn,
        "SELECT * FROM diagrams ORDER BY id",
        lambda r: Diagram(
            id_diagram=r["id"], name_diagram=r["name"],
            name_es=r["name_es"], group_id=r["group_id"],
            image_path=r["image_path"],
            thumbnail_path=r["thumbnail_path"],
            display_order=r["display_order"],
            source_diagram=r["source"],
            created_at=r["created_at"]),
        "diagrams")
    pg.commit()
    log(f"diagrams: {n} rows")
    # vehicle_diagrams (skip orphans with missing diagram_id)
    valid_diagram_ids = set(r[0] for r in sqlite_conn.execute("SELECT id FROM diagrams").fetchall())
    n = migrate_table(pg, sqlite_conn,
        "SELECT * FROM vehicle_diagrams ORDER BY id",
        lambda r: VehicleDiagram(
            id_vehicle_dgr=r["id"], diagram_id=r["diagram_id"],
            model_year_engine_id=r["model_year_engine_id"],
            notes=r["notes"], created_at=r["created_at"])
        if r["diagram_id"] in valid_diagram_ids else None,
        "vehicle_diagrams")
    pg.commit()
    log(f"vehicle_diagrams: {n} rows")
    # diagram_hotspots — shape text remapped to id_shape FK
    n = migrate_table(pg, sqlite_conn,
        "SELECT * FROM diagram_hotspots ORDER BY id",
        lambda r: DiagramHotspot(
            id_dgr_hotspot=r["id"], diagram_id=r["diagram_id"],
            part_id=r["part_id"], callout_number=r["callout_number"],
            id_shape=shape_map.get(r["shape"]),
            coords=r["coords"], created_at=r["created_at"]),
        "diagram_hotspots")
    pg.commit()
    log(f"diagram_hotspots: {n} rows")
    # vin_cache — SQLite stores decoded_data as a JSON string; parse it
    # here so the PG column (presumably JSON/JSONB — confirm in models.py)
    # receives a dict.  Deferred import: json is only needed for this table.
    import json
    n = migrate_table(pg, sqlite_conn,
        "SELECT * FROM vin_cache ORDER BY id",
        lambda r: VinCache(
            id=r["id"], vin=r["vin"],
            decoded_data=json.loads(r["decoded_data"]) if r["decoded_data"] else {},
            make=r["make"], model=r["model"], year=r["year"],
            engine_info=r["engine_info"], body_class=r["body_class"],
            drive_type=r["drive_type"],
            model_year_engine_id=r["model_year_engine_id"],
            created_at=r["created_at"], expires_at=r["expires_at"]),
        "vin_cache")
    pg.commit()
    log(f"vin_cache: {n} rows")
    # ── Reset sequences ───────────────────────────────────
    # Rows were inserted with explicit PKs, so each serial sequence must be
    # advanced to MAX(pk)+1 or the next app-side INSERT would collide.
    print("\n[5] Resetting sequences...")
    seq_tables = [
        ("brands", "id_brand"),
        ("years", "id_year"),
        ("engines", "id_engine"),
        ("models", "id_model"),
        ("model_year_engine", "id_mye"),
        ("part_categories", "id_part_category"),
        ("part_groups", "id_part_group"),
        ("parts", "id_part"),
        ("vehicle_parts", "id_vehicle_part"),
        ("manufacturers", "id_manufacture"),
        ("aftermarket_parts", "id_aftermarket_parts"),
        ("part_cross_references", "id_part_cross_ref"),
        ("diagrams", "id_diagram"),
        ("vehicle_diagrams", "id_vehicle_dgr"),
        ("diagram_hotspots", "id_dgr_hotspot"),
        ("vin_cache", "id"),
        ("fuel_type", "id_fuel"),
        ("body_type", "id_body"),
        ("drivetrain", "id_drivetrain"),
        ("transmission", "id_transmission"),
        ("materials", "id_material"),
        ("position_part", "id_position_part"),
        ("manufacture_type", "id_type_manu"),
        ("quality_tier", "id_quality_tier"),
        ("countries", "id_country"),
        ("reference_type", "id_ref_type"),
        ("shapes", "id_shape"),
    ]
    with engine.connect() as conn:
        for table, pk in seq_tables:
            # f-string interpolation is safe here: table/pk come from the
            # hard-coded list above, never from user input.
            conn.execute(text(
                f"SELECT setval(pg_get_serial_sequence('{table}', '{pk}'), "
                f"COALESCE((SELECT MAX({pk}) FROM {table}), 0) + 1, false)"
            ))
        conn.commit()
    log("All sequences reset")
    # ── Full-text search trigger ──────────────────────────
    # The trigger keeps parts.search_vector current on future writes; the
    # UPDATE below backfills the rows that were inserted before it existed.
    print("\n[6] Creating search trigger & updating vectors...")
    with engine.connect() as conn:
        conn.execute(text(SEARCH_VECTOR_TRIGGER_SQL))
        conn.commit()
        # Backfill search_vector for existing rows
        conn.execute(text("""
UPDATE parts SET search_vector = to_tsvector('spanish',
coalesce(oem_part_number, '') || ' ' ||
coalesce(name_part, '') || ' ' ||
coalesce(name_es, '') || ' ' ||
coalesce(description, ''))
        """))
        conn.commit()
    log("Search vectors populated")
    # ── Verify counts ─────────────────────────────────────
    # The two lists below are position-aligned for zip(); keep their order
    # in sync if tables are added.
    print("\n[7] Verifying row counts...")
    sqlite_tables = [
        "brands", "models", "years", "engines", "model_year_engine",
        "part_categories", "part_groups", "parts", "vehicle_parts",
        "manufacturers", "aftermarket_parts", "part_cross_references",
        "diagrams", "vehicle_diagrams", "diagram_hotspots", "vin_cache"
    ]
    pg_tables = [
        ("brands", "id_brand"), ("models", "id_model"),
        ("years", "id_year"), ("engines", "id_engine"),
        ("model_year_engine", "id_mye"),
        ("part_categories", "id_part_category"),
        ("part_groups", "id_part_group"), ("parts", "id_part"),
        ("vehicle_parts", "id_vehicle_part"),
        ("manufacturers", "id_manufacture"),
        ("aftermarket_parts", "id_aftermarket_parts"),
        ("part_cross_references", "id_part_cross_ref"),
        ("diagrams", "id_diagram"),
        ("vehicle_diagrams", "id_vehicle_dgr"),
        ("diagram_hotspots", "id_dgr_hotspot"),
        ("vin_cache", "id"),
    ]
    ok = True
    with engine.connect() as conn:
        for st, (pt, pk) in zip(sqlite_tables, pg_tables):
            # NOTE(review): a MISMATCH here is expected for tables where
            # orphan rows were deliberately skipped above.
            s_count = sqlite_conn.execute(f"SELECT COUNT(*) FROM {st}").fetchone()[0]
            p_count = conn.execute(text(f"SELECT COUNT(*) FROM {pt}")).scalar()
            status = "OK" if s_count == p_count else "MISMATCH"
            if status == "MISMATCH":
                ok = False
            log(f" {st}: SQLite={s_count} PG={p_count} [{status}]")
    # NOTE(review): the pg session and engine are never explicitly closed;
    # acceptable for a one-shot script, but worth tidying.
    sqlite_conn.close()
    elapsed = time.time() - t0
    print("\n" + "=" * 60)
    if ok:
        print(f" Migration completed successfully in {elapsed:.1f}s")
    else:
        print(f" Migration completed with MISMATCHES in {elapsed:.1f}s")
    print("=" * 60)
if __name__ == "__main__":
    main()