""" Database abstraction layer for the NEXUS AUTOPARTS console application. Provides all data access methods the console app needs, reading from the PostgreSQL database used by the Flask web dashboard. """ import json as json_module from datetime import datetime, timedelta from typing import Optional from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker import sys, os sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')) from config import DB_URL class Database: """Thin abstraction over the nexus_autoparts PostgreSQL database.""" def __init__(self, db_url: Optional[str] = None): self.db_url = db_url or DB_URL self._engine = None self._Session = None self._cache: dict = {} # ------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------ def _get_engine(self): if self._engine is None: self._engine = create_engine(self.db_url, pool_pre_ping=True) self._Session = sessionmaker(bind=self._engine) return self._engine def _session(self): self._get_engine() return self._Session() def close(self): """Dispose the engine connection pool.""" if self._engine is not None: self._engine.dispose() self._engine = None self._Session = None def _query(self, sql: str, params: dict = None, one: bool = False): """Execute a SELECT and return list[dict] (or a single dict if *one*).""" session = self._session() try: rows = session.execute(text(sql), params or {}).mappings().all() if one: return dict(rows[0]) if rows else None return [dict(r) for r in rows] finally: session.close() def _query_cached(self, cache_key: str, sql: str, params: dict = None): """Execute a SELECT with in-memory caching for repeated queries.""" if cache_key in self._cache: return self._cache[cache_key] result = self._query(sql, params) self._cache[cache_key] = result return result def _execute(self, sql: str, params: dict = None) -> Optional[int]: """Execute an INSERT/UPDATE/DELETE. Returns scalar result if RETURNING used.""" session = self._session() try: result = session.execute(text(sql), params or {}) session.commit() self._cache.clear() # If the query has RETURNING, get the scalar try: return result.scalar() except Exception: return None finally: session.close() # ================================================================== # Vehicle navigation # ================================================================== def get_brands(self) -> list[dict]: """Return all brands ordered by name: [{id, name, country}].""" return self._query_cached( "brands", "SELECT id_brand AS id, name_brand AS name, country FROM brands ORDER BY name_brand", ) def get_models(self, brand: Optional[str] = None) -> list[dict]: """Return models, optionally filtered by brand name (case-insensitive).""" if brand: key = f"models:{brand.upper()}" return self._query_cached( key, """ SELECT MIN(m.id_model) AS id, m.name_model AS name FROM models m JOIN brands b ON m.brand_id = b.id_brand WHERE b.name_brand ILIKE :brand GROUP BY UPPER(m.name_model), m.name_model ORDER BY m.name_model """, {"brand": brand}, ) return self._query_cached( "models:all", "SELECT MIN(id_model) AS id, name_model AS name FROM models GROUP BY UPPER(name_model), name_model ORDER BY name_model", ) def get_years( self, brand: Optional[str] = None, model: Optional[str] = None ) -> list[dict]: """Return years, optionally filtered by brand and/or model.""" sql = """ SELECT DISTINCT y.id_year AS id, y.year_car AS year FROM years y JOIN model_year_engine mye ON y.id_year = mye.year_id JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand WHERE 1=1 """ params: dict = {} if brand: sql += " AND b.name_brand ILIKE :brand" params["brand"] = brand if model: sql += " AND m.name_model ILIKE :model" params["model"] = model sql += " ORDER BY y.year_car DESC" return self._query(sql, params) def get_engines( self, brand: Optional[str] = None, model: Optional[str] = None, year: Optional[int] = None, ) -> list[dict]: """Return engines, optionally filtered by brand/model/year.""" sql = """ SELECT MIN(e.id_engine) AS id, e.name_engine AS name, MAX(e.displacement_cc) AS displacement_cc, MAX(e.cylinders) AS cylinders, MAX(ft.name_fuel) AS fuel_type, MAX(e.power_hp) AS power_hp, MAX(e.torque_nm) AS torque_nm, MAX(e.engine_code) AS engine_code FROM engines e LEFT JOIN fuel_type ft ON e.id_fuel = ft.id_fuel JOIN model_year_engine mye ON e.id_engine = mye.engine_id JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand JOIN years y ON mye.year_id = y.id_year WHERE 1=1 """ params: dict = {} if brand: sql += " AND b.name_brand ILIKE :brand" params["brand"] = brand if model: sql += " AND m.name_model ILIKE :model" params["model"] = model if year: sql += " AND y.year_car = :year" params["year"] = int(year) sql += " GROUP BY UPPER(e.name_engine), e.name_engine ORDER BY e.name_engine" return self._query(sql, params) def get_model_year_engine( self, brand: str, model: str, year: int, engine_id: Optional[int] = None, ) -> list[dict]: """Return model_year_engine records for a specific vehicle config.""" sql = """ SELECT mye.id_mye AS id, b.name_brand AS brand, m.name_model AS model, y.year_car AS year, e.id_engine AS engine_id, e.name_engine AS engine, mye.trim_level, dt.name_drivetrain AS drivetrain, tr.name_transmission AS transmission FROM model_year_engine mye JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand JOIN years y ON mye.year_id = y.id_year JOIN engines e ON mye.engine_id = e.id_engine LEFT JOIN drivetrain dt ON mye.id_drivetrain = dt.id_drivetrain LEFT JOIN transmission tr ON mye.id_transmission = tr.id_transmission WHERE b.name_brand ILIKE :brand AND m.name_model ILIKE :model AND y.year_car = :year """ params: dict = {"brand": brand, "model": model, "year": int(year)} if engine_id: sql += " AND e.id_engine = :engine_id" params["engine_id"] = engine_id sql += " ORDER BY e.name_engine, mye.trim_level" return self._query(sql, params) # ================================================================== # Parts catalog # ================================================================== def get_categories(self) -> list[dict]: """Return all part categories ordered by display_order.""" return self._query_cached( "categories", """ SELECT id_part_category AS id, name_part_category AS name, name_es, slug, icon_name, display_order FROM part_categories ORDER BY display_order, name_part_category """, ) def get_groups(self, category_id: int) -> list[dict]: """Return part groups for a given category.""" return self._query( """ SELECT id_part_group AS id, name_part_group AS name, name_es, slug, display_order FROM part_groups WHERE category_id = :cat_id ORDER BY display_order, name_part_group """, {"cat_id": category_id}, ) def get_parts( self, group_id: Optional[int] = None, mye_id: Optional[int] = None, page: int = 1, per_page: int = 15, ) -> list[dict]: """Return parts with optional group/vehicle filter and pagination.""" per_page = min(per_page, 100) offset = (page - 1) * per_page sql = """ SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, p.group_id, pg.name_part_group AS group_name, pc.name_part_category AS category_name FROM parts p JOIN part_groups pg ON p.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category """ where_parts: list[str] = [] params: dict = {} if group_id: where_parts.append("p.group_id = :group_id") params["group_id"] = group_id if mye_id: where_parts.append( "p.id_part IN (SELECT part_id FROM vehicle_parts WHERE model_year_engine_id = :mye_id)" ) params["mye_id"] = mye_id if where_parts: sql += " WHERE " + " AND ".join(where_parts) sql += " ORDER BY p.name_part LIMIT :limit OFFSET :offset" params["limit"] = per_page params["offset"] = offset return self._query(sql, params) def get_part(self, part_id: int) -> Optional[dict]: """Return a single part with group/category info, or None.""" return self._query( """ SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, p.description, p.description_es, p.weight_kg, mat.name_material AS material, p.group_id, pg.name_part_group AS group_name, pg.name_es AS group_name_es, pc.id_part_category AS category_id, pc.name_part_category AS category_name, pc.name_es AS category_name_es FROM parts p JOIN part_groups pg ON p.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category LEFT JOIN materials mat ON p.id_material = mat.id_material WHERE p.id_part = :part_id """, {"part_id": part_id}, one=True, ) def get_alternatives(self, part_id: int) -> list[dict]: """Return aftermarket alternatives for an OEM part.""" return self._query( """ SELECT ap.id_aftermarket_parts AS id, ap.part_number, ap.name_aftermarket_parts AS name, ap.name_es, m.name_manufacture AS manufacturer_name, ap.manufacturer_id, qt.name_quality AS quality_tier, ap.price_usd, ap.warranty_months FROM aftermarket_parts ap JOIN manufacturers m ON ap.manufacturer_id = m.id_manufacture LEFT JOIN quality_tier qt ON ap.id_quality_tier = qt.id_quality_tier WHERE ap.oem_part_id = :part_id ORDER BY qt.name_quality DESC, ap.price_usd ASC """, {"part_id": part_id}, ) def get_cross_references(self, part_id: int) -> list[dict]: """Return cross-reference numbers for a part.""" return self._query( """ SELECT id_part_cross_ref AS id, cross_reference_number, rt.name_ref_type AS reference_type, source_ref AS source, notes FROM part_cross_references pcr LEFT JOIN reference_type rt ON pcr.id_ref_type = rt.id_ref_type WHERE pcr.part_id = :part_id ORDER BY rt.name_ref_type, pcr.cross_reference_number """, {"part_id": part_id}, ) def get_vehicles_for_part(self, part_id: int) -> list[dict]: """Return vehicles that use a specific part.""" return self._query( """ SELECT b.name_brand AS brand, m.name_model AS model, y.year_car AS year, e.name_engine AS engine, mye.trim_level, vp.quantity_required, pp.name_position_part AS position, vp.fitment_notes FROM vehicle_parts vp JOIN model_year_engine mye ON vp.model_year_engine_id = mye.id_mye JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand JOIN years y ON mye.year_id = y.id_year JOIN engines e ON mye.engine_id = e.id_engine LEFT JOIN position_part pp ON vp.id_position_part = pp.id_position_part WHERE vp.part_id = :part_id ORDER BY b.name_brand, m.name_model, y.year_car """, {"part_id": part_id}, ) # ================================================================== # Search # ================================================================== def search_parts( self, query: str, page: int = 1, per_page: int = 15 ) -> list[dict]: """Full-text search using PostgreSQL tsvector.""" per_page = min(per_page, 100) offset = (page - 1) * per_page return self._query( """ SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, p.description, pg.name_part_group AS group_name, pc.name_part_category AS category_name, ts_rank(p.search_vector, plainto_tsquery('spanish', :q)) AS rank FROM parts p JOIN part_groups pg ON p.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category WHERE p.search_vector @@ plainto_tsquery('spanish', :q) ORDER BY rank DESC LIMIT :limit OFFSET :offset """, {"q": query, "limit": per_page, "offset": offset}, ) def search_part_number(self, number: str) -> list[dict]: """Search OEM, aftermarket, and cross-reference part numbers.""" results: list[dict] = [] search_term = f"%{number}%" # OEM parts rows = self._query( """ SELECT id_part AS id, oem_part_number, name_part AS name, name_es FROM parts WHERE oem_part_number ILIKE :term """, {"term": search_term}, ) for row in rows: results.append({ **row, "match_type": "oem", "matched_number": row["oem_part_number"], }) # Aftermarket parts rows = self._query( """ SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, ap.part_number FROM aftermarket_parts ap JOIN parts p ON ap.oem_part_id = p.id_part WHERE ap.part_number ILIKE :term """, {"term": search_term}, ) for row in rows: results.append({ "id": row["id"], "oem_part_number": row["oem_part_number"], "name": row["name"], "name_es": row["name_es"], "match_type": "aftermarket", "matched_number": row["part_number"], }) # Cross-references rows = self._query( """ SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, pcr.cross_reference_number FROM part_cross_references pcr JOIN parts p ON pcr.part_id = p.id_part WHERE pcr.cross_reference_number ILIKE :term """, {"term": search_term}, ) for row in rows: results.append({ "id": row["id"], "oem_part_number": row["oem_part_number"], "name": row["name"], "name_es": row["name_es"], "match_type": "cross_reference", "matched_number": row["cross_reference_number"], }) return results # ================================================================== # VIN cache # ================================================================== def get_vin_cache(self, vin: str) -> Optional[dict]: """Return cached VIN decode data if still valid, else None.""" return self._query( """ SELECT vin, decoded_data, make, model, year, engine_info, body_class, drive_type, model_year_engine_id, created_at, expires_at FROM vin_cache WHERE vin = :vin AND expires_at > NOW() """, {"vin": vin.upper().strip()}, one=True, ) def save_vin_cache( self, vin: str, data: str, make: str, model: str, year: int, engine_info: str, body_class: str, drive_type: str, ) -> Optional[int]: """Insert or update a VIN cache entry (30-day expiry).""" expires = datetime.utcnow() + timedelta(days=30) decoded = json_module.loads(data) if isinstance(data, str) else data return self._execute( """ INSERT INTO vin_cache (vin, decoded_data, make, model, year, engine_info, body_class, drive_type, expires_at) VALUES (:vin, :decoded_data, :make, :model, :year, :engine_info, :body_class, :drive_type, :expires_at) ON CONFLICT (vin) DO UPDATE SET decoded_data = EXCLUDED.decoded_data, make = EXCLUDED.make, model = EXCLUDED.model, year = EXCLUDED.year, engine_info = EXCLUDED.engine_info, body_class = EXCLUDED.body_class, drive_type = EXCLUDED.drive_type, expires_at = EXCLUDED.expires_at RETURNING id """, { "vin": vin.upper().strip(), "decoded_data": json_module.dumps(decoded), "make": make, "model": model, "year": year, "engine_info": engine_info, "body_class": body_class, "drive_type": drive_type, "expires_at": expires.isoformat(), }, ) # ================================================================== # Stats # ================================================================== def get_stats(self) -> dict: """Return counts for all major tables plus top brands by fitment.""" session = self._session() try: stats: dict = {} table_map = { "brands": "brands", "models": "models", "years": "years", "engines": "engines", "part_categories": "part_categories", "part_groups": "part_groups", "parts": "parts", "aftermarket_parts": "aftermarket_parts", "manufacturers": "manufacturers", "vehicle_parts": "vehicle_parts", "part_cross_references": "part_cross_references", } for key, table in table_map.items(): row = session.execute(text(f"SELECT COUNT(*) AS cnt FROM {table}")).mappings().one() stats[key] = row["cnt"] # Top brands by number of fitments rows = session.execute(text(""" SELECT b.name_brand AS name, COUNT(DISTINCT vp.id_vehicle_part) AS cnt FROM brands b JOIN models m ON m.brand_id = b.id_brand JOIN model_year_engine mye ON mye.model_id = m.id_model JOIN vehicle_parts vp ON vp.model_year_engine_id = mye.id_mye GROUP BY b.name_brand ORDER BY cnt DESC LIMIT 10 """)).mappings().all() stats["top_brands"] = [ {"name": r["name"], "count": r["cnt"]} for r in rows ] return stats finally: session.close() # ================================================================== # Admin — Manufacturers # ================================================================== def get_manufacturers(self) -> list[dict]: """Return all manufacturers ordered by name.""" return self._query( """ SELECT m.id_manufacture AS id, m.name_manufacture AS name, mt.name_type_manu AS type, qt.name_quality AS quality_tier, c.name_country AS country, m.logo_url, m.website FROM manufacturers m LEFT JOIN manufacture_type mt ON m.id_type_manu = mt.id_type_manu LEFT JOIN quality_tier qt ON m.id_quality_tier = qt.id_quality_tier LEFT JOIN countries c ON m.id_country = c.id_country ORDER BY m.name_manufacture """ ) def create_manufacturer(self, data: dict) -> Optional[int]: """Insert a new manufacturer and return its id.""" return self._execute( """ INSERT INTO manufacturers (name_manufacture, id_type_manu, id_quality_tier, id_country, logo_url, website) VALUES (:name, (SELECT id_type_manu FROM manufacture_type WHERE name_type_manu = :type), (SELECT id_quality_tier FROM quality_tier WHERE name_quality = :quality_tier), (SELECT id_country FROM countries WHERE name_country = :country), :logo_url, :website) RETURNING id_manufacture """, { "name": data["name"], "type": data.get("type"), "quality_tier": data.get("quality_tier"), "country": data.get("country"), "logo_url": data.get("logo_url"), "website": data.get("website"), }, ) def update_manufacturer(self, mfr_id: int, data: dict) -> None: """Update an existing manufacturer.""" self._execute( """ UPDATE manufacturers SET name_manufacture = :name, id_type_manu = (SELECT id_type_manu FROM manufacture_type WHERE name_type_manu = :type), id_quality_tier = (SELECT id_quality_tier FROM quality_tier WHERE name_quality = :quality_tier), id_country = (SELECT id_country FROM countries WHERE name_country = :country), logo_url = :logo_url, website = :website WHERE id_manufacture = :mfr_id """, { "name": data["name"], "type": data.get("type"), "quality_tier": data.get("quality_tier"), "country": data.get("country"), "logo_url": data.get("logo_url"), "website": data.get("website"), "mfr_id": mfr_id, }, ) def delete_manufacturer(self, mfr_id: int) -> None: """Delete a manufacturer by id.""" self._execute("DELETE FROM manufacturers WHERE id_manufacture = :id", {"id": mfr_id}) # ================================================================== # Admin — Parts # ================================================================== def create_part(self, data: dict) -> Optional[int]: """Insert a new part and return its id.""" return self._execute( """ INSERT INTO parts (oem_part_number, name_part, name_es, group_id, description, description_es, weight_kg, id_material) VALUES (:oem_part_number, :name, :name_es, :group_id, :description, :description_es, :weight_kg, (SELECT id_material FROM materials WHERE name_material = :material)) RETURNING id_part """, { "oem_part_number": data["oem_part_number"], "name": data["name"], "name_es": data.get("name_es"), "group_id": data.get("group_id"), "description": data.get("description"), "description_es": data.get("description_es"), "weight_kg": data.get("weight_kg"), "material": data.get("material"), }, ) def update_part(self, part_id: int, data: dict) -> None: """Update an existing part.""" self._execute( """ UPDATE parts SET oem_part_number = :oem_part_number, name_part = :name, name_es = :name_es, group_id = :group_id, description = :description, description_es = :description_es, weight_kg = :weight_kg, id_material = (SELECT id_material FROM materials WHERE name_material = :material) WHERE id_part = :part_id """, { "oem_part_number": data["oem_part_number"], "name": data["name"], "name_es": data.get("name_es"), "group_id": data.get("group_id"), "description": data.get("description"), "description_es": data.get("description_es"), "weight_kg": data.get("weight_kg"), "material": data.get("material"), "part_id": part_id, }, ) def delete_part(self, part_id: int) -> None: """Delete a part by id.""" self._execute("DELETE FROM parts WHERE id_part = :id", {"id": part_id}) # ================================================================== # Admin — Cross-references # ================================================================== def create_crossref(self, data: dict) -> Optional[int]: """Insert a new cross-reference and return its id.""" return self._execute( """ INSERT INTO part_cross_references (part_id, cross_reference_number, id_ref_type, source_ref, notes) VALUES (:part_id, :cross_reference_number, (SELECT id_ref_type FROM reference_type WHERE name_ref_type = :reference_type), :source, :notes) RETURNING id_part_cross_ref """, { "part_id": data["part_id"], "cross_reference_number": data["cross_reference_number"], "reference_type": data.get("reference_type"), "source": data.get("source"), "notes": data.get("notes"), }, ) def update_crossref(self, xref_id: int, data: dict) -> None: """Update an existing cross-reference.""" self._execute( """ UPDATE part_cross_references SET part_id = :part_id, cross_reference_number = :cross_reference_number, id_ref_type = (SELECT id_ref_type FROM reference_type WHERE name_ref_type = :reference_type), source_ref = :source, notes = :notes WHERE id_part_cross_ref = :xref_id """, { "part_id": data["part_id"], "cross_reference_number": data["cross_reference_number"], "reference_type": data.get("reference_type"), "source": data.get("source"), "notes": data.get("notes"), "xref_id": xref_id, }, ) def delete_crossref(self, xref_id: int) -> None: """Delete a cross-reference by id.""" self._execute( "DELETE FROM part_cross_references WHERE id_part_cross_ref = :id", {"id": xref_id} ) def get_crossrefs_paginated( self, page: int = 1, per_page: int = 15 ) -> list[dict]: """Return paginated cross-references with part info.""" per_page = min(per_page, 100) offset = (page - 1) * per_page return self._query( """ SELECT pcr.id_part_cross_ref AS id, pcr.part_id, pcr.cross_reference_number, rt.name_ref_type AS reference_type, pcr.source_ref AS source, pcr.notes, p.oem_part_number, p.name_part AS part_name FROM part_cross_references pcr JOIN parts p ON pcr.part_id = p.id_part LEFT JOIN reference_type rt ON pcr.id_ref_type = rt.id_ref_type ORDER BY pcr.id_part_cross_ref LIMIT :limit OFFSET :offset """, {"limit": per_page, "offset": offset}, )