#!/usr/bin/env python3
"""
Migrate data from SQLite (vehicle_database.db) to PostgreSQL (nexus_autoparts).

Usage:
    python3 migrate_to_postgres.py
"""

import json
import sqlite3
import sys
import time

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

from config import DB_URL, SQLITE_PATH
from models import (
    Base, SEARCH_VECTOR_TRIGGER_SQL,
    FuelType, BodyType, Drivetrain, Transmission, Material, PositionPart,
    ManufactureType, QualityTier, Country, ReferenceType, Shape,
    Brand, Year, Engine, Model, ModelYearEngine,
    PartCategory, PartGroup, Part, VehiclePart,
    Manufacturer, AftermarketPart, PartCrossReference,
    Diagram, VehicleDiagram, DiagramHotspot, VinCache,
)

# Rows flushed to PostgreSQL per batch during migration.
BATCH_SIZE = 5000


def log(msg):
    """Print an indented progress message."""
    print(f" {msg}")


def connect_sqlite():
    """Open the source SQLite database with dict-like (name-indexed) rows."""
    conn = sqlite3.connect(SQLITE_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def populate_lookup(pg, sqlite_conn, LookupClass, pk_col, name_col, sql_query):
    """Extract distinct values from SQLite and insert into a lookup table.

    Args:
        pg: open SQLAlchemy session (rows are flushed, not committed).
        sqlite_conn: source sqlite3 connection.
        LookupClass: ORM class of the target lookup table.
        pk_col: name of the integer PK attribute on LookupClass.
        name_col: name of the text value attribute on LookupClass.
        sql_query: SELECT DISTINCT query run against SQLite.

    Returns a dict mapping text value → new PK id. IDs are assigned 1..N
    in sorted order of the distinct values; falsy values are skipped.
    """
    rows = sqlite_conn.execute(sql_query).fetchall()
    values = sorted({r[0] for r in rows if r[0]})
    mapping = {}
    for i, val in enumerate(values, start=1):
        pg.add(LookupClass(**{pk_col: i, name_col: val}))
        mapping[val] = i
    pg.flush()
    log(f" {LookupClass.__tablename__}: {len(mapping)} values")
    return mapping


def migrate_table(pg, sqlite_conn, query, build_obj_fn, label, batch=BATCH_SIZE):
    """Generic batch-migrate helper.

    Fetches all rows of `query` from SQLite, maps each through
    `build_obj_fn` (which may return None to skip a row, e.g. orphans),
    and adds the resulting ORM objects to the session in batches.

    Returns the number of rows actually inserted. (Previously this
    returned the source-row total, which over-reported for tables where
    orphan rows are skipped.)
    """
    rows = sqlite_conn.execute(query).fetchall()
    total = len(rows)
    processed = 0
    inserted = 0
    for start in range(0, total, batch):
        chunk = rows[start:start + batch]
        for row in chunk:
            obj = build_obj_fn(row)
            if obj is not None:
                pg.add(obj)
                inserted += 1
        pg.flush()
        processed += len(chunk)
        # Periodic progress output; always log the final chunk.
        if processed % 50000 == 0 or processed == total:
            log(f" {label}: {processed}/{total}")
    return inserted


def main():
    print("=" * 60)
    print(" NEXUS AUTOPARTS — SQLite → PostgreSQL Migration")
    print("=" * 60)
    t0 = time.time()

    # Connect
    print("\n[1] Connecting...")
    sqlite_conn = connect_sqlite()
    engine = create_engine(DB_URL, echo=False)
    Session = sessionmaker(bind=engine)

    # Drop & recreate all tables
    print("\n[2] Creating schema...")
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)
    log("All tables created")

    pg = Session()

    # ── Lookup tables ──────────────────────────────────────
    print("\n[3] Populating lookup tables...")
    fuel_map = populate_lookup(
        pg, sqlite_conn, FuelType, "id_fuel", "name_fuel",
        "SELECT DISTINCT fuel_type FROM engines WHERE fuel_type IS NOT NULL")
    body_map = populate_lookup(
        pg, sqlite_conn, BodyType, "id_body", "name_body",
        "SELECT DISTINCT body_type FROM models WHERE body_type IS NOT NULL")
    drive_map = populate_lookup(
        pg, sqlite_conn, Drivetrain, "id_drivetrain", "name_drivetrain",
        "SELECT DISTINCT drivetrain FROM model_year_engine WHERE drivetrain IS NOT NULL")
    trans_map = populate_lookup(
        pg, sqlite_conn, Transmission, "id_transmission", "name_transmission",
        "SELECT DISTINCT transmission FROM model_year_engine WHERE transmission IS NOT NULL")
    mat_map = populate_lookup(
        pg, sqlite_conn, Material, "id_material", "name_material",
        "SELECT DISTINCT material FROM parts WHERE material IS NOT NULL")
    pos_map = populate_lookup(
        pg, sqlite_conn, PositionPart, "id_position_part", "name_position_part",
        "SELECT DISTINCT position FROM vehicle_parts WHERE position IS NOT NULL")
    mtype_map = populate_lookup(
        pg, sqlite_conn, ManufactureType, "id_type_manu", "name_type_manu",
        "SELECT DISTINCT type FROM manufacturers WHERE type IS NOT NULL")
    # quality_tier appears in two source tables; union them.
    qtier_map = populate_lookup(
        pg, sqlite_conn, QualityTier, "id_quality_tier", "name_quality",
        """SELECT DISTINCT quality_tier FROM (
            SELECT quality_tier FROM manufacturers WHERE quality_tier IS NOT NULL
            UNION
            SELECT quality_tier FROM aftermarket_parts WHERE quality_tier IS NOT NULL
        )""")
    country_map = populate_lookup(
        pg, sqlite_conn, Country, "id_country", "name_country",
        "SELECT DISTINCT country FROM manufacturers WHERE country IS NOT NULL")
    reftype_map = populate_lookup(
        pg, sqlite_conn, ReferenceType, "id_ref_type", "name_ref_type",
        "SELECT DISTINCT reference_type FROM part_cross_references WHERE reference_type IS NOT NULL")
    shape_map = populate_lookup(
        pg, sqlite_conn, Shape, "id_shape", "name_shape",
        "SELECT DISTINCT shape FROM diagram_hotspots WHERE shape IS NOT NULL")
    pg.commit()

    # ── Core tables ────────────────────────────────────────
    print("\n[4] Migrating core tables...")

    # brands
    n = migrate_table(
        pg, sqlite_conn, "SELECT * FROM brands ORDER BY id",
        lambda r: Brand(
            id_brand=r["id"], name_brand=r["name"], country=r["country"],
            founded_year=r["founded_year"], created_at=r["created_at"]),
        "brands")
    pg.commit()
    log(f"brands: {n} rows")

    # years
    n = migrate_table(
        pg, sqlite_conn, "SELECT * FROM years ORDER BY id",
        lambda r: Year(
            id_year=r["id"], year_car=r["year"], created_at=r["created_at"]),
        "years")
    pg.commit()
    log(f"years: {n} rows")

    # engines
    n = migrate_table(
        pg, sqlite_conn, "SELECT * FROM engines ORDER BY id",
        lambda r: Engine(
            id_engine=r["id"], name_engine=r["name"],
            displacement_cc=r["displacement_cc"], cylinders=r["cylinders"],
            id_fuel=fuel_map.get(r["fuel_type"]), power_hp=r["power_hp"],
            torque_nm=r["torque_nm"], engine_code=r["engine_code"],
            created_at=r["created_at"]),
        "engines")
    pg.commit()
    log(f"engines: {n} rows")

    # models
    n = migrate_table(
        pg, sqlite_conn, "SELECT * FROM models ORDER BY id",
        lambda r: Model(
            id_model=r["id"], brand_id=r["brand_id"], name_model=r["name"],
            id_body=body_map.get(r["body_type"]), generation=r["generation"],
            production_start_year=r["production_start_year"],
            production_end_year=r["production_end_year"],
            created_at=r["created_at"]),
        "models")
    pg.commit()
    log(f"models: {n} rows")

    # model_year_engine
    n = migrate_table(
        pg, sqlite_conn, "SELECT * FROM model_year_engine ORDER BY id",
        lambda r: ModelYearEngine(
            id_mye=r["id"], model_id=r["model_id"], year_id=r["year_id"],
            engine_id=r["engine_id"], trim_level=r["trim_level"],
            id_drivetrain=drive_map.get(r["drivetrain"]),
            id_transmission=trans_map.get(r["transmission"]),
            created_at=r["created_at"]),
        "model_year_engine")
    pg.commit()
    log(f"model_year_engine: {n} rows")

    # part_categories
    n = migrate_table(
        pg, sqlite_conn, "SELECT * FROM part_categories ORDER BY id",
        lambda r: PartCategory(
            id_part_category=r["id"], name_part_category=r["name"],
            name_es=r["name_es"], parent_id=r["parent_id"], slug=r["slug"],
            icon_name=r["icon_name"], display_order=r["display_order"],
            created_at=r["created_at"]),
        "part_categories")
    pg.commit()
    log(f"part_categories: {n} rows")

    # part_groups
    n = migrate_table(
        pg, sqlite_conn, "SELECT * FROM part_groups ORDER BY id",
        lambda r: PartGroup(
            id_part_group=r["id"], category_id=r["category_id"],
            name_part_group=r["name"], name_es=r["name_es"], slug=r["slug"],
            display_order=r["display_order"], created_at=r["created_at"]),
        "part_groups")
    pg.commit()
    log(f"part_groups: {n} rows")

    # parts (without search_vector — trigger will fill it)
    n = migrate_table(
        pg, sqlite_conn, "SELECT * FROM parts ORDER BY id",
        lambda r: Part(
            id_part=r["id"], oem_part_number=r["oem_part_number"],
            name_part=r["name"], name_es=r["name_es"], group_id=r["group_id"],
            description=r["description"], description_es=r["description_es"],
            weight_kg=r["weight_kg"], id_material=mat_map.get(r["material"]),
            created_at=r["created_at"]),
        "parts")
    pg.commit()
    log(f"parts: {n} rows")

    # vehicle_parts
    n = migrate_table(
        pg, sqlite_conn, "SELECT * FROM vehicle_parts ORDER BY id",
        lambda r: VehiclePart(
            id_vehicle_part=r["id"],
            model_year_engine_id=r["model_year_engine_id"],
            part_id=r["part_id"], quantity_required=r["quantity_required"],
            id_position_part=pos_map.get(r["position"]),
            fitment_notes=r["fitment_notes"], created_at=r["created_at"]),
        "vehicle_parts", batch=10000)
    pg.commit()
    log(f"vehicle_parts: {n} rows")

    # manufacturers
    n = migrate_table(
        pg, sqlite_conn, "SELECT * FROM manufacturers ORDER BY id",
        lambda r: Manufacturer(
            id_manufacture=r["id"], name_manufacture=r["name"],
            id_type_manu=mtype_map.get(r["type"]),
            id_quality_tier=qtier_map.get(r["quality_tier"]),
            id_country=country_map.get(r["country"]),
            logo_url=r["logo_url"], website=r["website"],
            created_at=r["created_at"]),
        "manufacturers")
    pg.commit()
    log(f"manufacturers: {n} rows")

    # aftermarket_parts (skip orphans with missing oem_part_id)
    valid_part_ids = {r[0] for r in sqlite_conn.execute("SELECT id FROM parts").fetchall()}
    n = migrate_table(
        pg, sqlite_conn, "SELECT * FROM aftermarket_parts ORDER BY id",
        lambda r: AftermarketPart(
            id_aftermarket_parts=r["id"], oem_part_id=r["oem_part_id"],
            manufacturer_id=r["manufacturer_id"], part_number=r["part_number"],
            name_aftermarket_parts=r["name"], name_es=r["name_es"],
            id_quality_tier=qtier_map.get(r["quality_tier"]),
            price_usd=r["price_usd"], warranty_months=r["warranty_months"],
            created_at=r["created_at"]) if r["oem_part_id"] in valid_part_ids else None,
        "aftermarket_parts")
    pg.commit()
    log(f"aftermarket_parts: {n} rows")

    # part_cross_references
    n = migrate_table(
        pg, sqlite_conn, "SELECT * FROM part_cross_references ORDER BY id",
        lambda r: PartCrossReference(
            id_part_cross_ref=r["id"], part_id=r["part_id"],
            cross_reference_number=r["cross_reference_number"],
            id_ref_type=reftype_map.get(r["reference_type"]),
            source_ref=r["source"], notes=r["notes"],
            created_at=r["created_at"]),
        "part_cross_references")
    pg.commit()
    log(f"part_cross_references: {n} rows")

    # diagrams
    n = migrate_table(
        pg, sqlite_conn, "SELECT * FROM diagrams ORDER BY id",
        lambda r: Diagram(
            id_diagram=r["id"], name_diagram=r["name"], name_es=r["name_es"],
            group_id=r["group_id"], image_path=r["image_path"],
            thumbnail_path=r["thumbnail_path"],
            display_order=r["display_order"], source_diagram=r["source"],
            created_at=r["created_at"]),
        "diagrams")
    pg.commit()
    log(f"diagrams: {n} rows")

    # vehicle_diagrams (skip orphans with missing diagram_id)
    valid_diagram_ids = {r[0] for r in sqlite_conn.execute("SELECT id FROM diagrams").fetchall()}
    n = migrate_table(
        pg, sqlite_conn, "SELECT * FROM vehicle_diagrams ORDER BY id",
        lambda r: VehicleDiagram(
            id_vehicle_dgr=r["id"], diagram_id=r["diagram_id"],
            model_year_engine_id=r["model_year_engine_id"], notes=r["notes"],
            created_at=r["created_at"]) if r["diagram_id"] in valid_diagram_ids else None,
        "vehicle_diagrams")
    pg.commit()
    log(f"vehicle_diagrams: {n} rows")

    # diagram_hotspots
    n = migrate_table(
        pg, sqlite_conn, "SELECT * FROM diagram_hotspots ORDER BY id",
        lambda r: DiagramHotspot(
            id_dgr_hotspot=r["id"], diagram_id=r["diagram_id"],
            part_id=r["part_id"], callout_number=r["callout_number"],
            id_shape=shape_map.get(r["shape"]), coords=r["coords"],
            created_at=r["created_at"]),
        "diagram_hotspots")
    pg.commit()
    log(f"diagram_hotspots: {n} rows")

    # vin_cache (decoded_data is stored as a JSON string in SQLite)
    n = migrate_table(
        pg, sqlite_conn, "SELECT * FROM vin_cache ORDER BY id",
        lambda r: VinCache(
            id=r["id"], vin=r["vin"],
            decoded_data=json.loads(r["decoded_data"]) if r["decoded_data"] else {},
            make=r["make"], model=r["model"], year=r["year"],
            engine_info=r["engine_info"], body_class=r["body_class"],
            drive_type=r["drive_type"],
            model_year_engine_id=r["model_year_engine_id"],
            created_at=r["created_at"], expires_at=r["expires_at"]),
        "vin_cache")
    pg.commit()
    log(f"vin_cache: {n} rows")
    pg.close()

    # ── Reset sequences ───────────────────────────────────
    # Rows were inserted with explicit PKs, so each serial sequence must be
    # advanced past MAX(pk) or the next INSERT would collide.
    print("\n[5] Resetting sequences...")
    seq_tables = [
        ("brands", "id_brand"), ("years", "id_year"), ("engines", "id_engine"),
        ("models", "id_model"), ("model_year_engine", "id_mye"),
        ("part_categories", "id_part_category"), ("part_groups", "id_part_group"),
        ("parts", "id_part"), ("vehicle_parts", "id_vehicle_part"),
        ("manufacturers", "id_manufacture"),
        ("aftermarket_parts", "id_aftermarket_parts"),
        ("part_cross_references", "id_part_cross_ref"),
        ("diagrams", "id_diagram"), ("vehicle_diagrams", "id_vehicle_dgr"),
        ("diagram_hotspots", "id_dgr_hotspot"), ("vin_cache", "id"),
        ("fuel_type", "id_fuel"), ("body_type", "id_body"),
        ("drivetrain", "id_drivetrain"), ("transmission", "id_transmission"),
        ("materials", "id_material"), ("position_part", "id_position_part"),
        ("manufacture_type", "id_type_manu"), ("quality_tier", "id_quality_tier"),
        ("countries", "id_country"), ("reference_type", "id_ref_type"),
        ("shapes", "id_shape"),
    ]
    with engine.connect() as conn:
        for table, pk in seq_tables:
            # Table/column names are hard-coded above, not user input.
            conn.execute(text(
                f"SELECT setval(pg_get_serial_sequence('{table}', '{pk}'), "
                f"COALESCE((SELECT MAX({pk}) FROM {table}), 0) + 1, false)"
            ))
        conn.commit()
    log("All sequences reset")

    # ── Full-text search trigger ──────────────────────────
    print("\n[6] Creating search trigger & updating vectors...")
    with engine.connect() as conn:
        conn.execute(text(SEARCH_VECTOR_TRIGGER_SQL))
        conn.commit()
        # Backfill search_vector for rows inserted before the trigger existed.
        conn.execute(text("""
            UPDATE parts SET search_vector = to_tsvector('spanish',
                coalesce(oem_part_number, '') || ' ' ||
                coalesce(name_part, '') || ' ' ||
                coalesce(name_es, '') || ' ' ||
                coalesce(description, ''))
        """))
        conn.commit()
    log("Search vectors populated")

    # ── Verify counts ─────────────────────────────────────
    print("\n[7] Verifying row counts...")
    sqlite_tables = [
        "brands", "models", "years", "engines", "model_year_engine",
        "part_categories", "part_groups", "parts", "vehicle_parts",
        "manufacturers", "aftermarket_parts", "part_cross_references",
        "diagrams", "vehicle_diagrams", "diagram_hotspots", "vin_cache",
    ]
    pg_tables = [
        ("brands", "id_brand"), ("models", "id_model"), ("years", "id_year"),
        ("engines", "id_engine"), ("model_year_engine", "id_mye"),
        ("part_categories", "id_part_category"), ("part_groups", "id_part_group"),
        ("parts", "id_part"), ("vehicle_parts", "id_vehicle_part"),
        ("manufacturers", "id_manufacture"),
        ("aftermarket_parts", "id_aftermarket_parts"),
        ("part_cross_references", "id_part_cross_ref"),
        ("diagrams", "id_diagram"), ("vehicle_diagrams", "id_vehicle_dgr"),
        ("diagram_hotspots", "id_dgr_hotspot"), ("vin_cache", "id"),
    ]
    ok = True
    with engine.connect() as conn:
        for st, (pt, pk) in zip(sqlite_tables, pg_tables):
            s_count = sqlite_conn.execute(f"SELECT COUNT(*) FROM {st}").fetchone()[0]
            p_count = conn.execute(text(f"SELECT COUNT(*) FROM {pt}")).scalar()
            # NOTE: a MISMATCH is expected for aftermarket_parts /
            # vehicle_diagrams when orphan rows were skipped.
            status = "OK" if s_count == p_count else "MISMATCH"
            if status == "MISMATCH":
                ok = False
            log(f" {st}: SQLite={s_count} PG={p_count} [{status}]")

    sqlite_conn.close()

    elapsed = time.time() - t0
    print("\n" + "=" * 60)
    if ok:
        print(f" Migration completed successfully in {elapsed:.1f}s")
    else:
        print(f" Migration completed with MISMATCHES in {elapsed:.1f}s")
    print("=" * 60)


if __name__ == "__main__":
    main()