- Migrate from SQLite to PostgreSQL with normalized schema
- Add 11 lookup tables (fuel_type, body_type, drivetrain, transmission, materials, position_part, manufacture_type, quality_tier, countries, reference_type, shapes)
- Rewrite dashboard/server.py (76 routes) using SQLAlchemy text() queries
- Rewrite console/db.py (27 methods) using SQLAlchemy ORM
- Add models.py with 27 SQLAlchemy model definitions
- Add config.py for centralized DB_URL configuration
- Add migrate_to_postgres.py migration script
- Add docs/METABASE_GUIDE.md with complete data entry guide
- Rebrand from "AUTOPARTS DB" to "NEXUS AUTOPARTS"
- Fill vehicle data gaps via NHTSA API + heuristics: engines (cylinders, power, torque), brands (country, founded_year), models (body_type, production years), MYE (drivetrain, transmission, trim)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
471 lines
18 KiB
Python
471 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Migrate data from SQLite (vehicle_database.db) to PostgreSQL (nexus_autoparts).
|
|
|
|
Usage:
|
|
python3 migrate_to_postgres.py
|
|
"""
|
|
import sqlite3
|
|
import sys
|
|
import time
|
|
from sqlalchemy import create_engine, text
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
from config import DB_URL, SQLITE_PATH
|
|
from models import (
|
|
Base, SEARCH_VECTOR_TRIGGER_SQL,
|
|
FuelType, BodyType, Drivetrain, Transmission, Material,
|
|
PositionPart, ManufactureType, QualityTier, Country,
|
|
ReferenceType, Shape,
|
|
Brand, Year, Engine, Model, ModelYearEngine,
|
|
PartCategory, PartGroup, Part, VehiclePart,
|
|
Manufacturer, AftermarketPart, PartCrossReference,
|
|
Diagram, VehicleDiagram, DiagramHotspot, VinCache,
|
|
)
|
|
|
|
# Default number of source rows migrate_table() adds to the session between flushes.
BATCH_SIZE = 5000
|
|
|
|
|
|
def log(msg):
    """Print a progress message to stdout, prefixed with a single space.

    Args:
        msg: Text to print; it is interpolated into an f-string, so any
            object with a string representation is accepted.
    """
    print(f" {msg}")
|
|
|
|
|
|
def connect_sqlite():
    """Open the source SQLite database and return the connection.

    The connection uses ``sqlite3.Row`` as its row factory so callers can
    read columns both by index and by name.

    Returns:
        An open ``sqlite3.Connection`` to ``SQLITE_PATH``.

    Raises:
        FileNotFoundError: If ``SQLITE_PATH`` is not an existing file.
    """
    import os

    # sqlite3.connect() silently creates an empty database when the path does
    # not exist, which would only surface later as "no such table" errors.
    # Fail early with a clear message instead.
    if not os.path.isfile(SQLITE_PATH):
        raise FileNotFoundError(f"SQLite database not found: {SQLITE_PATH}")

    conn = sqlite3.connect(SQLITE_PATH)
    conn.row_factory = sqlite3.Row
    return conn
|
|
|
|
|
|
def populate_lookup(pg, sqlite_conn, LookupClass, pk_col, name_col,
                    sql_query):
    """Extract distinct values from SQLite and insert into a lookup table.

    Args:
        pg: Target session; one object is added per distinct value and the
            session is flushed once at the end (it is not committed here).
        sqlite_conn: Source SQLite connection used to run ``sql_query``.
        LookupClass: Model class for the lookup table; instantiated with the
            keyword arguments ``pk_col`` and ``name_col``.
        pk_col: Name of the primary-key attribute on ``LookupClass``.
        name_col: Name of the attribute holding the lookup text.
        sql_query: Query whose first column yields the candidate values.

    Returns:
        A dict mapping text value → new PK id.
    """
    rows = sqlite_conn.execute(sql_query).fetchall()

    # Falsy values (NULL, empty string) are dropped. Sorting the distinct
    # values makes the id assignment deterministic: ids run from 1 upwards in
    # sorted order.
    values = sorted({row[0] for row in rows if row[0]})

    mapping = {}
    for new_id, value in enumerate(values, start=1):
        pg.add(LookupClass(**{pk_col: new_id, name_col: value}))
        mapping[value] = new_id
    pg.flush()

    log(f" {LookupClass.__tablename__}: {len(mapping)} values")
    return mapping
|
|
|
|
|
|
def migrate_table(pg, sqlite_conn, query, build_obj_fn, label, batch=BATCH_SIZE):
    """Generic batch-migrate helper.

    Reads every row returned by ``query`` from SQLite, converts each one with
    ``build_obj_fn`` and adds the result to the target session, flushing after
    each batch. The session is not committed here; that is left to the caller.

    Args:
        pg: Target session receiving the built objects.
        sqlite_conn: Source SQLite connection.
        query: SELECT statement producing the rows to migrate.
        build_obj_fn: Callable taking one source row and returning the object
            to insert, or ``None`` to skip that row.
        label: Name used in progress messages.
        batch: Number of source rows processed between flushes.

    Returns:
        The number of source rows read, including rows that were skipped
        because ``build_obj_fn`` returned ``None``.

    Raises:
        ValueError: If ``batch`` is smaller than 1.
    """
    # A negative step would make range() yield nothing and the table would be
    # "migrated" without inserting a single row, so reject it up front.
    if batch < 1:
        raise ValueError(f"batch must be a positive integer, got {batch!r}")

    rows = sqlite_conn.execute(query).fetchall()
    total = len(rows)
    count = 0
    skipped = 0

    for start in range(0, total, batch):
        chunk = rows[start:start + batch]
        for row in chunk:
            obj = build_obj_fn(row)
            if obj is None:
                skipped += 1
                continue
            pg.add(obj)
        pg.flush()

        count += len(chunk)
        # Report every 50000 rows and once more when the table is finished.
        if count % 50000 == 0 or count == total:
            log(f" {label}: {count}/{total}")

    # Make dropped rows visible instead of losing them silently.
    if skipped:
        log(f" {label}: skipped {skipped} of {total} rows")

    return total
|
|
|
|
|
|
def main():
    """Run the complete SQLite → PostgreSQL migration.

    Steps, in order:
      1. Connect to the SQLite source and create the PostgreSQL engine.
      2. Drop and recreate every table known to ``Base.metadata``
         (destructive: existing data in those tables is lost).
      3. Populate the lookup tables from the distinct text values in SQLite.
      4. Copy the core tables, replacing text columns by lookup ids and
         skipping aftermarket parts / vehicle diagrams whose parent row is
         missing.
      5. Reset the primary-key sequences to ``MAX(pk) + 1``.
      6. Install the search-vector trigger and backfill ``parts.search_vector``.
      7. Compare row counts between both databases and print a summary.

    The PostgreSQL session and the SQLite connection are closed even when a
    step raises.
    """
    import json  # only needed to decode vin_cache.decoded_data

    print("=" * 60)
    print(" NEXUS AUTOPARTS — SQLite → PostgreSQL Migration")
    print("=" * 60)
    t0 = time.time()

    # Connect
    print("\n[1] Connecting...")
    sqlite_conn = connect_sqlite()
    pg = None
    try:
        engine = create_engine(DB_URL, echo=False)
        Session = sessionmaker(bind=engine)

        # Drop & recreate all tables
        print("\n[2] Creating schema...")
        Base.metadata.drop_all(engine)
        Base.metadata.create_all(engine)
        log("All tables created")

        pg = Session()

        # ── Lookup tables ──────────────────────────────────────
        print("\n[3] Populating lookup tables...")

        fuel_map = populate_lookup(
            pg, sqlite_conn, FuelType, "id_fuel", "name_fuel",
            "SELECT DISTINCT fuel_type FROM engines WHERE fuel_type IS NOT NULL")

        body_map = populate_lookup(
            pg, sqlite_conn, BodyType, "id_body", "name_body",
            "SELECT DISTINCT body_type FROM models WHERE body_type IS NOT NULL")

        drive_map = populate_lookup(
            pg, sqlite_conn, Drivetrain, "id_drivetrain", "name_drivetrain",
            "SELECT DISTINCT drivetrain FROM model_year_engine WHERE drivetrain IS NOT NULL")

        trans_map = populate_lookup(
            pg, sqlite_conn, Transmission, "id_transmission", "name_transmission",
            "SELECT DISTINCT transmission FROM model_year_engine WHERE transmission IS NOT NULL")

        mat_map = populate_lookup(
            pg, sqlite_conn, Material, "id_material", "name_material",
            "SELECT DISTINCT material FROM parts WHERE material IS NOT NULL")

        pos_map = populate_lookup(
            pg, sqlite_conn, PositionPart, "id_position_part", "name_position_part",
            "SELECT DISTINCT position FROM vehicle_parts WHERE position IS NOT NULL")

        mtype_map = populate_lookup(
            pg, sqlite_conn, ManufactureType, "id_type_manu", "name_type_manu",
            "SELECT DISTINCT type FROM manufacturers WHERE type IS NOT NULL")

        # Quality tiers are used by two source tables, so collect from both.
        qtier_map = populate_lookup(
            pg, sqlite_conn, QualityTier, "id_quality_tier", "name_quality",
            """SELECT DISTINCT quality_tier FROM (
                SELECT quality_tier FROM manufacturers WHERE quality_tier IS NOT NULL
                UNION
                SELECT quality_tier FROM aftermarket_parts WHERE quality_tier IS NOT NULL
            )""")

        country_map = populate_lookup(
            pg, sqlite_conn, Country, "id_country", "name_country",
            "SELECT DISTINCT country FROM manufacturers WHERE country IS NOT NULL")

        reftype_map = populate_lookup(
            pg, sqlite_conn, ReferenceType, "id_ref_type", "name_ref_type",
            "SELECT DISTINCT reference_type FROM part_cross_references WHERE reference_type IS NOT NULL")

        shape_map = populate_lookup(
            pg, sqlite_conn, Shape, "id_shape", "name_shape",
            "SELECT DISTINCT shape FROM diagram_hotspots WHERE shape IS NOT NULL")

        pg.commit()

        # ── Core tables ────────────────────────────────────────
        # Each table is committed on its own, in the order below. Text values
        # with no lookup entry (NULL / empty) map to None via dict.get().
        print("\n[4] Migrating core tables...")

        # brands
        n = migrate_table(pg, sqlite_conn,
            "SELECT * FROM brands ORDER BY id",
            lambda r: Brand(
                id_brand=r["id"], name_brand=r["name"],
                country=r["country"], founded_year=r["founded_year"],
                created_at=r["created_at"]),
            "brands")
        pg.commit()
        log(f"brands: {n} rows")

        # years
        n = migrate_table(pg, sqlite_conn,
            "SELECT * FROM years ORDER BY id",
            lambda r: Year(
                id_year=r["id"], year_car=r["year"],
                created_at=r["created_at"]),
            "years")
        pg.commit()
        log(f"years: {n} rows")

        # engines
        n = migrate_table(pg, sqlite_conn,
            "SELECT * FROM engines ORDER BY id",
            lambda r: Engine(
                id_engine=r["id"], name_engine=r["name"],
                displacement_cc=r["displacement_cc"], cylinders=r["cylinders"],
                id_fuel=fuel_map.get(r["fuel_type"]),
                power_hp=r["power_hp"], torque_nm=r["torque_nm"],
                engine_code=r["engine_code"], created_at=r["created_at"]),
            "engines")
        pg.commit()
        log(f"engines: {n} rows")

        # models
        n = migrate_table(pg, sqlite_conn,
            "SELECT * FROM models ORDER BY id",
            lambda r: Model(
                id_model=r["id"], brand_id=r["brand_id"],
                name_model=r["name"],
                id_body=body_map.get(r["body_type"]),
                generation=r["generation"],
                production_start_year=r["production_start_year"],
                production_end_year=r["production_end_year"],
                created_at=r["created_at"]),
            "models")
        pg.commit()
        log(f"models: {n} rows")

        # model_year_engine
        n = migrate_table(pg, sqlite_conn,
            "SELECT * FROM model_year_engine ORDER BY id",
            lambda r: ModelYearEngine(
                id_mye=r["id"], model_id=r["model_id"],
                year_id=r["year_id"], engine_id=r["engine_id"],
                trim_level=r["trim_level"],
                id_drivetrain=drive_map.get(r["drivetrain"]),
                id_transmission=trans_map.get(r["transmission"]),
                created_at=r["created_at"]),
            "model_year_engine")
        pg.commit()
        log(f"model_year_engine: {n} rows")

        # part_categories
        n = migrate_table(pg, sqlite_conn,
            "SELECT * FROM part_categories ORDER BY id",
            lambda r: PartCategory(
                id_part_category=r["id"],
                name_part_category=r["name"], name_es=r["name_es"],
                parent_id=r["parent_id"], slug=r["slug"],
                icon_name=r["icon_name"], display_order=r["display_order"],
                created_at=r["created_at"]),
            "part_categories")
        pg.commit()
        log(f"part_categories: {n} rows")

        # part_groups
        n = migrate_table(pg, sqlite_conn,
            "SELECT * FROM part_groups ORDER BY id",
            lambda r: PartGroup(
                id_part_group=r["id"], category_id=r["category_id"],
                name_part_group=r["name"], name_es=r["name_es"],
                slug=r["slug"], display_order=r["display_order"],
                created_at=r["created_at"]),
            "part_groups")
        pg.commit()
        log(f"part_groups: {n} rows")

        # parts (search_vector is left unset here; step [6] backfills it
        # after the trigger has been created)
        n = migrate_table(pg, sqlite_conn,
            "SELECT * FROM parts ORDER BY id",
            lambda r: Part(
                id_part=r["id"], oem_part_number=r["oem_part_number"],
                name_part=r["name"], name_es=r["name_es"],
                group_id=r["group_id"],
                description=r["description"], description_es=r["description_es"],
                weight_kg=r["weight_kg"],
                id_material=mat_map.get(r["material"]),
                created_at=r["created_at"]),
            "parts")
        pg.commit()
        log(f"parts: {n} rows")

        # vehicle_parts
        n = migrate_table(pg, sqlite_conn,
            "SELECT * FROM vehicle_parts ORDER BY id",
            lambda r: VehiclePart(
                id_vehicle_part=r["id"],
                model_year_engine_id=r["model_year_engine_id"],
                part_id=r["part_id"],
                quantity_required=r["quantity_required"],
                id_position_part=pos_map.get(r["position"]),
                fitment_notes=r["fitment_notes"],
                created_at=r["created_at"]),
            "vehicle_parts", batch=10000)
        pg.commit()
        log(f"vehicle_parts: {n} rows")

        # manufacturers
        n = migrate_table(pg, sqlite_conn,
            "SELECT * FROM manufacturers ORDER BY id",
            lambda r: Manufacturer(
                id_manufacture=r["id"], name_manufacture=r["name"],
                id_type_manu=mtype_map.get(r["type"]),
                id_quality_tier=qtier_map.get(r["quality_tier"]),
                id_country=country_map.get(r["country"]),
                logo_url=r["logo_url"], website=r["website"],
                created_at=r["created_at"]),
            "manufacturers")
        pg.commit()
        log(f"manufacturers: {n} rows")

        # aftermarket_parts (skip orphans with missing oem_part_id)
        valid_part_ids = {r[0] for r in sqlite_conn.execute("SELECT id FROM parts")}
        n = migrate_table(pg, sqlite_conn,
            "SELECT * FROM aftermarket_parts ORDER BY id",
            lambda r: AftermarketPart(
                id_aftermarket_parts=r["id"],
                oem_part_id=r["oem_part_id"],
                manufacturer_id=r["manufacturer_id"],
                part_number=r["part_number"],
                name_aftermarket_parts=r["name"], name_es=r["name_es"],
                id_quality_tier=qtier_map.get(r["quality_tier"]),
                price_usd=r["price_usd"],
                warranty_months=r["warranty_months"],
                created_at=r["created_at"]) if r["oem_part_id"] in valid_part_ids else None,
            "aftermarket_parts")
        pg.commit()
        log(f"aftermarket_parts: {n} rows")

        # part_cross_references
        n = migrate_table(pg, sqlite_conn,
            "SELECT * FROM part_cross_references ORDER BY id",
            lambda r: PartCrossReference(
                id_part_cross_ref=r["id"], part_id=r["part_id"],
                cross_reference_number=r["cross_reference_number"],
                id_ref_type=reftype_map.get(r["reference_type"]),
                source_ref=r["source"], notes=r["notes"],
                created_at=r["created_at"]),
            "part_cross_references")
        pg.commit()
        log(f"part_cross_references: {n} rows")

        # diagrams
        n = migrate_table(pg, sqlite_conn,
            "SELECT * FROM diagrams ORDER BY id",
            lambda r: Diagram(
                id_diagram=r["id"], name_diagram=r["name"],
                name_es=r["name_es"], group_id=r["group_id"],
                image_path=r["image_path"],
                thumbnail_path=r["thumbnail_path"],
                display_order=r["display_order"],
                source_diagram=r["source"],
                created_at=r["created_at"]),
            "diagrams")
        pg.commit()
        log(f"diagrams: {n} rows")

        # vehicle_diagrams (skip orphans with missing diagram_id)
        valid_diagram_ids = {r[0] for r in sqlite_conn.execute("SELECT id FROM diagrams")}
        n = migrate_table(pg, sqlite_conn,
            "SELECT * FROM vehicle_diagrams ORDER BY id",
            lambda r: VehicleDiagram(
                id_vehicle_dgr=r["id"], diagram_id=r["diagram_id"],
                model_year_engine_id=r["model_year_engine_id"],
                notes=r["notes"], created_at=r["created_at"])
            if r["diagram_id"] in valid_diagram_ids else None,
            "vehicle_diagrams")
        pg.commit()
        log(f"vehicle_diagrams: {n} rows")

        # diagram_hotspots
        n = migrate_table(pg, sqlite_conn,
            "SELECT * FROM diagram_hotspots ORDER BY id",
            lambda r: DiagramHotspot(
                id_dgr_hotspot=r["id"], diagram_id=r["diagram_id"],
                part_id=r["part_id"], callout_number=r["callout_number"],
                id_shape=shape_map.get(r["shape"]),
                coords=r["coords"], created_at=r["created_at"]),
            "diagram_hotspots")
        pg.commit()
        log(f"diagram_hotspots: {n} rows")

        # vin_cache (decoded_data is stored as JSON text in SQLite; an empty
        # or NULL value becomes an empty dict)
        n = migrate_table(pg, sqlite_conn,
            "SELECT * FROM vin_cache ORDER BY id",
            lambda r: VinCache(
                id=r["id"], vin=r["vin"],
                decoded_data=json.loads(r["decoded_data"]) if r["decoded_data"] else {},
                make=r["make"], model=r["model"], year=r["year"],
                engine_info=r["engine_info"], body_class=r["body_class"],
                drive_type=r["drive_type"],
                model_year_engine_id=r["model_year_engine_id"],
                created_at=r["created_at"], expires_at=r["expires_at"]),
            "vin_cache")
        pg.commit()
        log(f"vin_cache: {n} rows")

        # ── Reset sequences ───────────────────────────────────
        # Rows were inserted with explicit ids, so each sequence is moved to
        # MAX(pk) + 1; is_called=false makes that the next value handed out.
        print("\n[5] Resetting sequences...")
        seq_tables = [
            ("brands", "id_brand"),
            ("years", "id_year"),
            ("engines", "id_engine"),
            ("models", "id_model"),
            ("model_year_engine", "id_mye"),
            ("part_categories", "id_part_category"),
            ("part_groups", "id_part_group"),
            ("parts", "id_part"),
            ("vehicle_parts", "id_vehicle_part"),
            ("manufacturers", "id_manufacture"),
            ("aftermarket_parts", "id_aftermarket_parts"),
            ("part_cross_references", "id_part_cross_ref"),
            ("diagrams", "id_diagram"),
            ("vehicle_diagrams", "id_vehicle_dgr"),
            ("diagram_hotspots", "id_dgr_hotspot"),
            ("vin_cache", "id"),
            ("fuel_type", "id_fuel"),
            ("body_type", "id_body"),
            ("drivetrain", "id_drivetrain"),
            ("transmission", "id_transmission"),
            ("materials", "id_material"),
            ("position_part", "id_position_part"),
            ("manufacture_type", "id_type_manu"),
            ("quality_tier", "id_quality_tier"),
            ("countries", "id_country"),
            ("reference_type", "id_ref_type"),
            ("shapes", "id_shape"),
        ]
        with engine.connect() as conn:
            for table, pk in seq_tables:
                conn.execute(text(
                    f"SELECT setval(pg_get_serial_sequence('{table}', '{pk}'), "
                    f"COALESCE((SELECT MAX({pk}) FROM {table}), 0) + 1, false)"
                ))
            conn.commit()
        log("All sequences reset")

        # ── Full-text search trigger ──────────────────────────
        print("\n[6] Creating search trigger & updating vectors...")
        with engine.connect() as conn:
            conn.execute(text(SEARCH_VECTOR_TRIGGER_SQL))
            conn.commit()
            # Backfill search_vector for existing rows
            conn.execute(text("""
                UPDATE parts SET search_vector = to_tsvector('spanish',
                    coalesce(oem_part_number, '') || ' ' ||
                    coalesce(name_part, '') || ' ' ||
                    coalesce(name_es, '') || ' ' ||
                    coalesce(description, ''))
            """))
            conn.commit()
        log("Search vectors populated")

        # ── Verify counts ─────────────────────────────────────
        print("\n[7] Verifying row counts...")
        # Table names are identical in both databases.
        verify_tables = [
            "brands", "models", "years", "engines", "model_year_engine",
            "part_categories", "part_groups", "parts", "vehicle_parts",
            "manufacturers", "aftermarket_parts", "part_cross_references",
            "diagrams", "vehicle_diagrams", "diagram_hotspots", "vin_cache",
        ]
        # Orphan rows are skipped on purpose in step [4]; comparing against the
        # raw SQLite count would flag those tables as MISMATCH even though the
        # migration did exactly what it should. Expect the filtered count.
        expected_count_sql = {
            "aftermarket_parts": (
                "SELECT COUNT(*) FROM aftermarket_parts "
                "WHERE oem_part_id IN (SELECT id FROM parts)"),
            "vehicle_diagrams": (
                "SELECT COUNT(*) FROM vehicle_diagrams "
                "WHERE diagram_id IN (SELECT id FROM diagrams)"),
        }
        ok = True
        with engine.connect() as conn:
            for table in verify_tables:
                raw_sql = f"SELECT COUNT(*) FROM {table}"
                s_count = sqlite_conn.execute(
                    expected_count_sql.get(table, raw_sql)).fetchone()[0]
                p_count = conn.execute(text(raw_sql)).scalar()
                status = "OK" if s_count == p_count else "MISMATCH"
                if status == "MISMATCH":
                    ok = False
                log(f" {table}: SQLite={s_count} PG={p_count} [{status}]")
                if table in expected_count_sql:
                    orphans = sqlite_conn.execute(raw_sql).fetchone()[0] - s_count
                    if orphans:
                        log(f" {table}: {orphans} orphan rows intentionally skipped")
    finally:
        # Release both connections on success and on failure; closing the
        # session also discards any work that was not committed.
        if pg is not None:
            pg.close()
        sqlite_conn.close()

    elapsed = time.time() - t0

    print("\n" + "=" * 60)
    if ok:
        print(f" Migration completed successfully in {elapsed:.1f}s")
    else:
        print(f" Migration completed with MISMATCHES in {elapsed:.1f}s")
    print("=" * 60)
|
|
|
|
|
|
# Run the migration only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|