Files
consultoria-as 7b2a904498 feat: migrate to PostgreSQL + SQLAlchemy ORM, rebrand to Nexus Autoparts
- Migrate from SQLite to PostgreSQL with normalized schema
- Add 11 lookup tables (fuel_type, body_type, drivetrain, transmission,
  materials, position_part, manufacture_type, quality_tier, countries,
  reference_type, shapes)
- Rewrite dashboard/server.py (76 routes) using SQLAlchemy text() queries
- Rewrite console/db.py (27 methods) using SQLAlchemy ORM
- Add models.py with 27 SQLAlchemy model definitions
- Add config.py for centralized DB_URL configuration
- Add migrate_to_postgres.py migration script
- Add docs/METABASE_GUIDE.md with complete data entry guide
- Rebrand from "AUTOPARTS DB" to "NEXUS AUTOPARTS"
- Fill vehicle data gaps via NHTSA API + heuristics:
  engines (cylinders, power, torque), brands (country, founded_year),
  models (body_type, production years), MYE (drivetrain, transmission, trim)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 05:24:47 +00:00

787 lines
30 KiB
Python

"""
Database abstraction layer for the NEXUS AUTOPARTS console application.
Provides all data access methods the console app needs, reading from the
PostgreSQL database used by the Flask web dashboard.
"""
import json as json_module
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
from config import DB_URL
class Database:
"""Thin abstraction over the nexus_autoparts PostgreSQL database."""
def __init__(self, db_url: Optional[str] = None):
self.db_url = db_url or DB_URL
self._engine = None
self._Session = None
self._cache: dict = {}
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------
def _get_engine(self):
if self._engine is None:
self._engine = create_engine(self.db_url, pool_pre_ping=True)
self._Session = sessionmaker(bind=self._engine)
return self._engine
def _session(self):
self._get_engine()
return self._Session()
def close(self):
"""Dispose the engine connection pool."""
if self._engine is not None:
self._engine.dispose()
self._engine = None
self._Session = None
def _query(self, sql: str, params: dict = None, one: bool = False):
"""Execute a SELECT and return list[dict] (or a single dict if *one*)."""
session = self._session()
try:
rows = session.execute(text(sql), params or {}).mappings().all()
if one:
return dict(rows[0]) if rows else None
return [dict(r) for r in rows]
finally:
session.close()
def _query_cached(self, cache_key: str, sql: str, params: dict = None):
"""Execute a SELECT with in-memory caching for repeated queries."""
if cache_key in self._cache:
return self._cache[cache_key]
result = self._query(sql, params)
self._cache[cache_key] = result
return result
def _execute(self, sql: str, params: dict = None) -> Optional[int]:
"""Execute an INSERT/UPDATE/DELETE. Returns scalar result if RETURNING used."""
session = self._session()
try:
result = session.execute(text(sql), params or {})
session.commit()
self._cache.clear()
# If the query has RETURNING, get the scalar
try:
return result.scalar()
except Exception:
return None
finally:
session.close()
# ==================================================================
# Vehicle navigation
# ==================================================================
def get_brands(self) -> list[dict]:
"""Return all brands ordered by name: [{id, name, country}]."""
return self._query_cached(
"brands",
"SELECT id_brand AS id, name_brand AS name, country FROM brands ORDER BY name_brand",
)
def get_models(self, brand: Optional[str] = None) -> list[dict]:
"""Return models, optionally filtered by brand name (case-insensitive)."""
if brand:
key = f"models:{brand.upper()}"
return self._query_cached(
key,
"""
SELECT MIN(m.id_model) AS id, m.name_model AS name
FROM models m
JOIN brands b ON m.brand_id = b.id_brand
WHERE b.name_brand ILIKE :brand
GROUP BY UPPER(m.name_model), m.name_model
ORDER BY m.name_model
""",
{"brand": brand},
)
return self._query_cached(
"models:all",
"SELECT MIN(id_model) AS id, name_model AS name FROM models GROUP BY UPPER(name_model), name_model ORDER BY name_model",
)
def get_years(
self, brand: Optional[str] = None, model: Optional[str] = None
) -> list[dict]:
"""Return years, optionally filtered by brand and/or model."""
sql = """
SELECT DISTINCT y.id_year AS id, y.year_car AS year
FROM years y
JOIN model_year_engine mye ON y.id_year = mye.year_id
JOIN models m ON mye.model_id = m.id_model
JOIN brands b ON m.brand_id = b.id_brand
WHERE 1=1
"""
params: dict = {}
if brand:
sql += " AND b.name_brand ILIKE :brand"
params["brand"] = brand
if model:
sql += " AND m.name_model ILIKE :model"
params["model"] = model
sql += " ORDER BY y.year_car DESC"
return self._query(sql, params)
def get_engines(
self,
brand: Optional[str] = None,
model: Optional[str] = None,
year: Optional[int] = None,
) -> list[dict]:
"""Return engines, optionally filtered by brand/model/year."""
sql = """
SELECT MIN(e.id_engine) AS id, e.name_engine AS name,
MAX(e.displacement_cc) AS displacement_cc,
MAX(e.cylinders) AS cylinders,
MAX(ft.name_fuel) AS fuel_type,
MAX(e.power_hp) AS power_hp,
MAX(e.torque_nm) AS torque_nm,
MAX(e.engine_code) AS engine_code
FROM engines e
LEFT JOIN fuel_type ft ON e.id_fuel = ft.id_fuel
JOIN model_year_engine mye ON e.id_engine = mye.engine_id
JOIN models m ON mye.model_id = m.id_model
JOIN brands b ON m.brand_id = b.id_brand
JOIN years y ON mye.year_id = y.id_year
WHERE 1=1
"""
params: dict = {}
if brand:
sql += " AND b.name_brand ILIKE :brand"
params["brand"] = brand
if model:
sql += " AND m.name_model ILIKE :model"
params["model"] = model
if year:
sql += " AND y.year_car = :year"
params["year"] = int(year)
sql += " GROUP BY UPPER(e.name_engine), e.name_engine ORDER BY e.name_engine"
return self._query(sql, params)
def get_model_year_engine(
self,
brand: str,
model: str,
year: int,
engine_id: Optional[int] = None,
) -> list[dict]:
"""Return model_year_engine records for a specific vehicle config."""
sql = """
SELECT
mye.id_mye AS id,
b.name_brand AS brand,
m.name_model AS model,
y.year_car AS year,
e.id_engine AS engine_id,
e.name_engine AS engine,
mye.trim_level,
dt.name_drivetrain AS drivetrain,
tr.name_transmission AS transmission
FROM model_year_engine mye
JOIN models m ON mye.model_id = m.id_model
JOIN brands b ON m.brand_id = b.id_brand
JOIN years y ON mye.year_id = y.id_year
JOIN engines e ON mye.engine_id = e.id_engine
LEFT JOIN drivetrain dt ON mye.id_drivetrain = dt.id_drivetrain
LEFT JOIN transmission tr ON mye.id_transmission = tr.id_transmission
WHERE b.name_brand ILIKE :brand
AND m.name_model ILIKE :model
AND y.year_car = :year
"""
params: dict = {"brand": brand, "model": model, "year": int(year)}
if engine_id:
sql += " AND e.id_engine = :engine_id"
params["engine_id"] = engine_id
sql += " ORDER BY e.name_engine, mye.trim_level"
return self._query(sql, params)
# ==================================================================
# Parts catalog
# ==================================================================
def get_categories(self) -> list[dict]:
"""Return all part categories ordered by display_order."""
return self._query_cached(
"categories",
"""
SELECT id_part_category AS id, name_part_category AS name,
name_es, slug, icon_name, display_order
FROM part_categories
ORDER BY display_order, name_part_category
""",
)
def get_groups(self, category_id: int) -> list[dict]:
"""Return part groups for a given category."""
return self._query(
"""
SELECT id_part_group AS id, name_part_group AS name,
name_es, slug, display_order
FROM part_groups
WHERE category_id = :cat_id
ORDER BY display_order, name_part_group
""",
{"cat_id": category_id},
)
def get_parts(
self,
group_id: Optional[int] = None,
mye_id: Optional[int] = None,
page: int = 1,
per_page: int = 15,
) -> list[dict]:
"""Return parts with optional group/vehicle filter and pagination."""
per_page = min(per_page, 100)
offset = (page - 1) * per_page
sql = """
SELECT
p.id_part AS id,
p.oem_part_number,
p.name_part AS name,
p.name_es,
p.group_id,
pg.name_part_group AS group_name,
pc.name_part_category AS category_name
FROM parts p
JOIN part_groups pg ON p.group_id = pg.id_part_group
JOIN part_categories pc ON pg.category_id = pc.id_part_category
"""
where_parts: list[str] = []
params: dict = {}
if group_id:
where_parts.append("p.group_id = :group_id")
params["group_id"] = group_id
if mye_id:
where_parts.append(
"p.id_part IN (SELECT part_id FROM vehicle_parts WHERE model_year_engine_id = :mye_id)"
)
params["mye_id"] = mye_id
if where_parts:
sql += " WHERE " + " AND ".join(where_parts)
sql += " ORDER BY p.name_part LIMIT :limit OFFSET :offset"
params["limit"] = per_page
params["offset"] = offset
return self._query(sql, params)
def get_part(self, part_id: int) -> Optional[dict]:
"""Return a single part with group/category info, or None."""
return self._query(
"""
SELECT
p.id_part AS id,
p.oem_part_number,
p.name_part AS name,
p.name_es,
p.description,
p.description_es,
p.weight_kg,
mat.name_material AS material,
p.group_id,
pg.name_part_group AS group_name,
pg.name_es AS group_name_es,
pc.id_part_category AS category_id,
pc.name_part_category AS category_name,
pc.name_es AS category_name_es
FROM parts p
JOIN part_groups pg ON p.group_id = pg.id_part_group
JOIN part_categories pc ON pg.category_id = pc.id_part_category
LEFT JOIN materials mat ON p.id_material = mat.id_material
WHERE p.id_part = :part_id
""",
{"part_id": part_id},
one=True,
)
def get_alternatives(self, part_id: int) -> list[dict]:
"""Return aftermarket alternatives for an OEM part."""
return self._query(
"""
SELECT
ap.id_aftermarket_parts AS id,
ap.part_number,
ap.name_aftermarket_parts AS name,
ap.name_es,
m.name_manufacture AS manufacturer_name,
ap.manufacturer_id,
qt.name_quality AS quality_tier,
ap.price_usd,
ap.warranty_months
FROM aftermarket_parts ap
JOIN manufacturers m ON ap.manufacturer_id = m.id_manufacture
LEFT JOIN quality_tier qt ON ap.id_quality_tier = qt.id_quality_tier
WHERE ap.oem_part_id = :part_id
ORDER BY qt.name_quality DESC, ap.price_usd ASC
""",
{"part_id": part_id},
)
def get_cross_references(self, part_id: int) -> list[dict]:
"""Return cross-reference numbers for a part."""
return self._query(
"""
SELECT id_part_cross_ref AS id, cross_reference_number,
rt.name_ref_type AS reference_type,
source_ref AS source, notes
FROM part_cross_references pcr
LEFT JOIN reference_type rt ON pcr.id_ref_type = rt.id_ref_type
WHERE pcr.part_id = :part_id
ORDER BY rt.name_ref_type, pcr.cross_reference_number
""",
{"part_id": part_id},
)
def get_vehicles_for_part(self, part_id: int) -> list[dict]:
"""Return vehicles that use a specific part."""
return self._query(
"""
SELECT
b.name_brand AS brand,
m.name_model AS model,
y.year_car AS year,
e.name_engine AS engine,
mye.trim_level,
vp.quantity_required,
pp.name_position_part AS position,
vp.fitment_notes
FROM vehicle_parts vp
JOIN model_year_engine mye ON vp.model_year_engine_id = mye.id_mye
JOIN models m ON mye.model_id = m.id_model
JOIN brands b ON m.brand_id = b.id_brand
JOIN years y ON mye.year_id = y.id_year
JOIN engines e ON mye.engine_id = e.id_engine
LEFT JOIN position_part pp ON vp.id_position_part = pp.id_position_part
WHERE vp.part_id = :part_id
ORDER BY b.name_brand, m.name_model, y.year_car
""",
{"part_id": part_id},
)
# ==================================================================
# Search
# ==================================================================
def search_parts(
self, query: str, page: int = 1, per_page: int = 15
) -> list[dict]:
"""Full-text search using PostgreSQL tsvector."""
per_page = min(per_page, 100)
offset = (page - 1) * per_page
return self._query(
"""
SELECT
p.id_part AS id,
p.oem_part_number,
p.name_part AS name,
p.name_es,
p.description,
pg.name_part_group AS group_name,
pc.name_part_category AS category_name,
ts_rank(p.search_vector, plainto_tsquery('spanish', :q)) AS rank
FROM parts p
JOIN part_groups pg ON p.group_id = pg.id_part_group
JOIN part_categories pc ON pg.category_id = pc.id_part_category
WHERE p.search_vector @@ plainto_tsquery('spanish', :q)
ORDER BY rank DESC
LIMIT :limit OFFSET :offset
""",
{"q": query, "limit": per_page, "offset": offset},
)
def search_part_number(self, number: str) -> list[dict]:
"""Search OEM, aftermarket, and cross-reference part numbers."""
results: list[dict] = []
search_term = f"%{number}%"
# OEM parts
rows = self._query(
"""
SELECT id_part AS id, oem_part_number, name_part AS name, name_es
FROM parts
WHERE oem_part_number ILIKE :term
""",
{"term": search_term},
)
for row in rows:
results.append({
**row,
"match_type": "oem",
"matched_number": row["oem_part_number"],
})
# Aftermarket parts
rows = self._query(
"""
SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name,
p.name_es, ap.part_number
FROM aftermarket_parts ap
JOIN parts p ON ap.oem_part_id = p.id_part
WHERE ap.part_number ILIKE :term
""",
{"term": search_term},
)
for row in rows:
results.append({
"id": row["id"],
"oem_part_number": row["oem_part_number"],
"name": row["name"],
"name_es": row["name_es"],
"match_type": "aftermarket",
"matched_number": row["part_number"],
})
# Cross-references
rows = self._query(
"""
SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name,
p.name_es, pcr.cross_reference_number
FROM part_cross_references pcr
JOIN parts p ON pcr.part_id = p.id_part
WHERE pcr.cross_reference_number ILIKE :term
""",
{"term": search_term},
)
for row in rows:
results.append({
"id": row["id"],
"oem_part_number": row["oem_part_number"],
"name": row["name"],
"name_es": row["name_es"],
"match_type": "cross_reference",
"matched_number": row["cross_reference_number"],
})
return results
# ==================================================================
# VIN cache
# ==================================================================
def get_vin_cache(self, vin: str) -> Optional[dict]:
"""Return cached VIN decode data if still valid, else None."""
return self._query(
"""
SELECT
vin, decoded_data, make, model, year,
engine_info, body_class, drive_type,
model_year_engine_id, created_at, expires_at
FROM vin_cache
WHERE vin = :vin AND expires_at > NOW()
""",
{"vin": vin.upper().strip()},
one=True,
)
def save_vin_cache(
self,
vin: str,
data: str,
make: str,
model: str,
year: int,
engine_info: str,
body_class: str,
drive_type: str,
) -> Optional[int]:
"""Insert or update a VIN cache entry (30-day expiry)."""
expires = datetime.utcnow() + timedelta(days=30)
decoded = json_module.loads(data) if isinstance(data, str) else data
return self._execute(
"""
INSERT INTO vin_cache
(vin, decoded_data, make, model, year,
engine_info, body_class, drive_type, expires_at)
VALUES (:vin, :decoded_data, :make, :model, :year,
:engine_info, :body_class, :drive_type, :expires_at)
ON CONFLICT (vin) DO UPDATE SET
decoded_data = EXCLUDED.decoded_data,
make = EXCLUDED.make,
model = EXCLUDED.model,
year = EXCLUDED.year,
engine_info = EXCLUDED.engine_info,
body_class = EXCLUDED.body_class,
drive_type = EXCLUDED.drive_type,
expires_at = EXCLUDED.expires_at
RETURNING id
""",
{
"vin": vin.upper().strip(),
"decoded_data": json_module.dumps(decoded),
"make": make,
"model": model,
"year": year,
"engine_info": engine_info,
"body_class": body_class,
"drive_type": drive_type,
"expires_at": expires.isoformat(),
},
)
# ==================================================================
# Stats
# ==================================================================
def get_stats(self) -> dict:
"""Return counts for all major tables plus top brands by fitment."""
session = self._session()
try:
stats: dict = {}
table_map = {
"brands": "brands",
"models": "models",
"years": "years",
"engines": "engines",
"part_categories": "part_categories",
"part_groups": "part_groups",
"parts": "parts",
"aftermarket_parts": "aftermarket_parts",
"manufacturers": "manufacturers",
"vehicle_parts": "vehicle_parts",
"part_cross_references": "part_cross_references",
}
for key, table in table_map.items():
row = session.execute(text(f"SELECT COUNT(*) AS cnt FROM {table}")).mappings().one()
stats[key] = row["cnt"]
# Top brands by number of fitments
rows = session.execute(text("""
SELECT b.name_brand AS name, COUNT(DISTINCT vp.id_vehicle_part) AS cnt
FROM brands b
JOIN models m ON m.brand_id = b.id_brand
JOIN model_year_engine mye ON mye.model_id = m.id_model
JOIN vehicle_parts vp ON vp.model_year_engine_id = mye.id_mye
GROUP BY b.name_brand
ORDER BY cnt DESC
LIMIT 10
""")).mappings().all()
stats["top_brands"] = [
{"name": r["name"], "count": r["cnt"]} for r in rows
]
return stats
finally:
session.close()
# ==================================================================
# Admin — Manufacturers
# ==================================================================
def get_manufacturers(self) -> list[dict]:
"""Return all manufacturers ordered by name."""
return self._query(
"""
SELECT m.id_manufacture AS id, m.name_manufacture AS name,
mt.name_type_manu AS type,
qt.name_quality AS quality_tier,
c.name_country AS country,
m.logo_url, m.website
FROM manufacturers m
LEFT JOIN manufacture_type mt ON m.id_type_manu = mt.id_type_manu
LEFT JOIN quality_tier qt ON m.id_quality_tier = qt.id_quality_tier
LEFT JOIN countries c ON m.id_country = c.id_country
ORDER BY m.name_manufacture
"""
)
def create_manufacturer(self, data: dict) -> Optional[int]:
"""Insert a new manufacturer and return its id."""
return self._execute(
"""
INSERT INTO manufacturers (name_manufacture, id_type_manu, id_quality_tier,
id_country, logo_url, website)
VALUES (:name,
(SELECT id_type_manu FROM manufacture_type WHERE name_type_manu = :type),
(SELECT id_quality_tier FROM quality_tier WHERE name_quality = :quality_tier),
(SELECT id_country FROM countries WHERE name_country = :country),
:logo_url, :website)
RETURNING id_manufacture
""",
{
"name": data["name"],
"type": data.get("type"),
"quality_tier": data.get("quality_tier"),
"country": data.get("country"),
"logo_url": data.get("logo_url"),
"website": data.get("website"),
},
)
def update_manufacturer(self, mfr_id: int, data: dict) -> None:
"""Update an existing manufacturer."""
self._execute(
"""
UPDATE manufacturers
SET name_manufacture = :name,
id_type_manu = (SELECT id_type_manu FROM manufacture_type WHERE name_type_manu = :type),
id_quality_tier = (SELECT id_quality_tier FROM quality_tier WHERE name_quality = :quality_tier),
id_country = (SELECT id_country FROM countries WHERE name_country = :country),
logo_url = :logo_url, website = :website
WHERE id_manufacture = :mfr_id
""",
{
"name": data["name"],
"type": data.get("type"),
"quality_tier": data.get("quality_tier"),
"country": data.get("country"),
"logo_url": data.get("logo_url"),
"website": data.get("website"),
"mfr_id": mfr_id,
},
)
def delete_manufacturer(self, mfr_id: int) -> None:
"""Delete a manufacturer by id."""
self._execute("DELETE FROM manufacturers WHERE id_manufacture = :id", {"id": mfr_id})
# ==================================================================
# Admin — Parts
# ==================================================================
def create_part(self, data: dict) -> Optional[int]:
"""Insert a new part and return its id."""
return self._execute(
"""
INSERT INTO parts
(oem_part_number, name_part, name_es, group_id,
description, description_es, weight_kg,
id_material)
VALUES (:oem_part_number, :name, :name_es, :group_id,
:description, :description_es, :weight_kg,
(SELECT id_material FROM materials WHERE name_material = :material))
RETURNING id_part
""",
{
"oem_part_number": data["oem_part_number"],
"name": data["name"],
"name_es": data.get("name_es"),
"group_id": data.get("group_id"),
"description": data.get("description"),
"description_es": data.get("description_es"),
"weight_kg": data.get("weight_kg"),
"material": data.get("material"),
},
)
def update_part(self, part_id: int, data: dict) -> None:
"""Update an existing part."""
self._execute(
"""
UPDATE parts
SET oem_part_number = :oem_part_number, name_part = :name,
name_es = :name_es, group_id = :group_id,
description = :description, description_es = :description_es,
weight_kg = :weight_kg,
id_material = (SELECT id_material FROM materials WHERE name_material = :material)
WHERE id_part = :part_id
""",
{
"oem_part_number": data["oem_part_number"],
"name": data["name"],
"name_es": data.get("name_es"),
"group_id": data.get("group_id"),
"description": data.get("description"),
"description_es": data.get("description_es"),
"weight_kg": data.get("weight_kg"),
"material": data.get("material"),
"part_id": part_id,
},
)
def delete_part(self, part_id: int) -> None:
"""Delete a part by id."""
self._execute("DELETE FROM parts WHERE id_part = :id", {"id": part_id})
# ==================================================================
# Admin — Cross-references
# ==================================================================
def create_crossref(self, data: dict) -> Optional[int]:
"""Insert a new cross-reference and return its id."""
return self._execute(
"""
INSERT INTO part_cross_references
(part_id, cross_reference_number, id_ref_type, source_ref, notes)
VALUES (:part_id, :cross_reference_number,
(SELECT id_ref_type FROM reference_type WHERE name_ref_type = :reference_type),
:source, :notes)
RETURNING id_part_cross_ref
""",
{
"part_id": data["part_id"],
"cross_reference_number": data["cross_reference_number"],
"reference_type": data.get("reference_type"),
"source": data.get("source"),
"notes": data.get("notes"),
},
)
def update_crossref(self, xref_id: int, data: dict) -> None:
"""Update an existing cross-reference."""
self._execute(
"""
UPDATE part_cross_references
SET part_id = :part_id, cross_reference_number = :cross_reference_number,
id_ref_type = (SELECT id_ref_type FROM reference_type WHERE name_ref_type = :reference_type),
source_ref = :source, notes = :notes
WHERE id_part_cross_ref = :xref_id
""",
{
"part_id": data["part_id"],
"cross_reference_number": data["cross_reference_number"],
"reference_type": data.get("reference_type"),
"source": data.get("source"),
"notes": data.get("notes"),
"xref_id": xref_id,
},
)
def delete_crossref(self, xref_id: int) -> None:
"""Delete a cross-reference by id."""
self._execute(
"DELETE FROM part_cross_references WHERE id_part_cross_ref = :id", {"id": xref_id}
)
def get_crossrefs_paginated(
self, page: int = 1, per_page: int = 15
) -> list[dict]:
"""Return paginated cross-references with part info."""
per_page = min(per_page, 100)
offset = (page - 1) * per_page
return self._query(
"""
SELECT
pcr.id_part_cross_ref AS id,
pcr.part_id,
pcr.cross_reference_number,
rt.name_ref_type AS reference_type,
pcr.source_ref AS source,
pcr.notes,
p.oem_part_number,
p.name_part AS part_name
FROM part_cross_references pcr
JOIN parts p ON pcr.part_id = p.id_part
LEFT JOIN reference_type rt ON pcr.id_ref_type = rt.id_ref_type
ORDER BY pcr.id_part_cross_ref
LIMIT :limit OFFSET :offset
""",
{"limit": per_page, "offset": offset},
)