- Migrate from SQLite to PostgreSQL with normalized schema - Add 11 lookup tables (fuel_type, body_type, drivetrain, transmission, materials, position_part, manufacture_type, quality_tier, countries, reference_type, shapes) - Rewrite dashboard/server.py (76 routes) using SQLAlchemy text() queries - Rewrite console/db.py (27 methods) using SQLAlchemy ORM - Add models.py with 27 SQLAlchemy model definitions - Add config.py for centralized DB_URL configuration - Add migrate_to_postgres.py migration script - Add docs/METABASE_GUIDE.md with complete data entry guide - Rebrand from "AUTOPARTS DB" to "NEXUS AUTOPARTS" - Fill vehicle data gaps via NHTSA API + heuristics: engines (cylinders, power, torque), brands (country, founded_year), models (body_type, production years), MYE (drivetrain, transmission, trim) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
787 lines
30 KiB
Python
787 lines
30 KiB
Python
"""
|
|
Database abstraction layer for the NEXUS AUTOPARTS console application.
|
|
|
|
Provides all data access methods the console app needs, reading from the
|
|
PostgreSQL database used by the Flask web dashboard.
|
|
"""
|
|
|
|
import json as json_module
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
|
|
from sqlalchemy import create_engine, text
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
import sys, os
|
|
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
|
|
from config import DB_URL
|
|
|
|
|
|
class Database:
|
|
"""Thin abstraction over the nexus_autoparts PostgreSQL database."""
|
|
|
|
def __init__(self, db_url: Optional[str] = None):
|
|
self.db_url = db_url or DB_URL
|
|
self._engine = None
|
|
self._Session = None
|
|
self._cache: dict = {}
|
|
|
|
# ------------------------------------------------------------------
|
|
# Private helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _get_engine(self):
|
|
if self._engine is None:
|
|
self._engine = create_engine(self.db_url, pool_pre_ping=True)
|
|
self._Session = sessionmaker(bind=self._engine)
|
|
return self._engine
|
|
|
|
def _session(self):
|
|
self._get_engine()
|
|
return self._Session()
|
|
|
|
def close(self):
    """Dispose the engine's connection pool and drop the engine and session factory.

    Does nothing when no engine has been created yet.
    """
    engine = self._engine
    if engine is None:
        return
    engine.dispose()
    self._engine = None
    self._Session = None
|
|
|
|
def _query(self, sql: str, params: dict = None, one: bool = False):
    """Run a SELECT in a short-lived session and return plain dicts.

    Args:
        sql: Statement text; it is wrapped in ``text()`` so ``:name`` bind
            parameters apply.
        params: Bind parameter values; a falsy value means no parameters.
        one: When true, return only the first row as a dict, or ``None``
            when the result is empty.

    Returns:
        A list of row dicts, or a single dict / ``None`` when *one* is set.
    """
    bindings = params if params else {}
    session = self._session()
    try:
        records = session.execute(text(sql), bindings).mappings().all()
        if not one:
            return [dict(record) for record in records]
        if records:
            return dict(records[0])
        return None
    finally:
        # The session is always closed, whether the query succeeded or raised.
        session.close()
|
|
|
|
def _query_cached(self, cache_key: str, sql: str, params: dict = None):
|
|
"""Execute a SELECT with in-memory caching for repeated queries."""
|
|
if cache_key in self._cache:
|
|
return self._cache[cache_key]
|
|
result = self._query(sql, params)
|
|
self._cache[cache_key] = result
|
|
return result
|
|
|
|
def _execute(self, sql: str, params: dict = None) -> Optional[int]:
    """Run one INSERT/UPDATE/DELETE in its own session and commit it.

    Args:
        sql: Statement text; it is wrapped in ``text()`` so ``:name`` bind
            parameters apply.
        params: Bind parameter values; a falsy value means no parameters.

    Returns:
        The first column of the first returned row when the statement
        produces rows (e.g. it has a RETURNING clause), otherwise ``None``.

    Any exception from executing or committing propagates to the caller;
    the session is closed either way.
    """
    session = self._session()
    try:
        result = session.execute(text(sql), params or {})
        # Read the RETURNING value while the transaction is still open.
        # Statements that produce no rows are detected explicitly instead
        # of swallowing whatever exception scalar() would raise.
        value = result.scalar() if result.returns_rows else None
        session.commit()
        # Any write may invalidate memoised SELECT results.
        self._cache.clear()
        return value
    finally:
        session.close()
|
|
|
|
# ==================================================================
|
|
# Vehicle navigation
|
|
# ==================================================================
|
|
|
|
def get_brands(self) -> list[dict]:
    """List every brand alphabetically as ``{id, name, country}`` dicts.

    The result is memoised under the ``"brands"`` cache key.
    """
    sql = "SELECT id_brand AS id, name_brand AS name, country FROM brands ORDER BY name_brand"
    return self._query_cached("brands", sql)
|
|
|
|
def get_models(self, brand: Optional[str] = None) -> list[dict]:
    """List models as ``{id, name}`` dicts, optionally restricted to one brand.

    Args:
        brand: Brand name matched with ILIKE (case-insensitive; ``%`` and
            ``_`` in the value act as wildcards). A falsy value lists the
            models of all brands.

    Results are memoised per upper-cased brand, or under ``"models:all"``.
    """
    if not brand:
        return self._query_cached(
            "models:all",
            "SELECT MIN(id_model) AS id, name_model AS name FROM models GROUP BY UPPER(name_model), name_model ORDER BY name_model",
        )

    cache_key = f"models:{brand.upper()}"
    sql = """
        SELECT MIN(m.id_model) AS id, m.name_model AS name
        FROM models m
        JOIN brands b ON m.brand_id = b.id_brand
        WHERE b.name_brand ILIKE :brand
        GROUP BY UPPER(m.name_model), m.name_model
        ORDER BY m.name_model
    """
    return self._query_cached(cache_key, sql, {"brand": brand})
|
|
|
|
def get_years(
    self, brand: Optional[str] = None, model: Optional[str] = None
) -> list[dict]:
    """List distinct ``{id, year}`` rows that have a model_year_engine entry, newest first.

    Args:
        brand: Optional brand name, matched with ILIKE.
        model: Optional model name, matched with ILIKE.
    """
    base_sql = """
        SELECT DISTINCT y.id_year AS id, y.year_car AS year
        FROM years y
        JOIN model_year_engine mye ON y.id_year = mye.year_id
        JOIN models m ON mye.model_id = m.id_model
        JOIN brands b ON m.brand_id = b.id_brand
        WHERE 1=1
    """
    # "WHERE 1=1" lets every optional filter be appended as a plain " AND ...".
    filters: list[str] = []
    bindings: dict = {}
    if brand:
        filters.append(" AND b.name_brand ILIKE :brand")
        bindings["brand"] = brand
    if model:
        filters.append(" AND m.name_model ILIKE :model")
        bindings["model"] = model
    statement = base_sql + "".join(filters) + " ORDER BY y.year_car DESC"
    return self._query(statement, bindings)
|
|
|
|
def get_engines(
    self,
    brand: Optional[str] = None,
    model: Optional[str] = None,
    year: Optional[int] = None,
) -> list[dict]:
    """List engines linked through model_year_engine, one row per engine name.

    Args:
        brand: Optional brand name, matched with ILIKE.
        model: Optional model name, matched with ILIKE.
        year: Optional model year; converted with ``int()`` before binding.

    Rows are grouped by engine name; the id is the smallest matching
    ``id_engine`` and the other columns are the MAX over the group.
    """
    base_sql = """
        SELECT MIN(e.id_engine) AS id, e.name_engine AS name,
        MAX(e.displacement_cc) AS displacement_cc,
        MAX(e.cylinders) AS cylinders,
        MAX(ft.name_fuel) AS fuel_type,
        MAX(e.power_hp) AS power_hp,
        MAX(e.torque_nm) AS torque_nm,
        MAX(e.engine_code) AS engine_code
        FROM engines e
        LEFT JOIN fuel_type ft ON e.id_fuel = ft.id_fuel
        JOIN model_year_engine mye ON e.id_engine = mye.engine_id
        JOIN models m ON mye.model_id = m.id_model
        JOIN brands b ON m.brand_id = b.id_brand
        JOIN years y ON mye.year_id = y.id_year
        WHERE 1=1
    """
    filters: list[str] = []
    bindings: dict = {}
    if brand:
        filters.append(" AND b.name_brand ILIKE :brand")
        bindings["brand"] = brand
    if model:
        filters.append(" AND m.name_model ILIKE :model")
        bindings["model"] = model
    if year:
        filters.append(" AND y.year_car = :year")
        bindings["year"] = int(year)
    statement = (
        base_sql
        + "".join(filters)
        + " GROUP BY UPPER(e.name_engine), e.name_engine ORDER BY e.name_engine"
    )
    return self._query(statement, bindings)
|
|
|
|
def get_model_year_engine(
    self,
    brand: str,
    model: str,
    year: int,
    engine_id: Optional[int] = None,
) -> list[dict]:
    """List model_year_engine rows for one brand/model/year combination.

    Args:
        brand: Brand name, matched with ILIKE.
        model: Model name, matched with ILIKE.
        year: Model year; converted with ``int()`` before binding.
        engine_id: Optional engine id; a falsy value applies no engine filter.

    Returns:
        Rows with id, brand, model, year, engine_id, engine, trim_level,
        drivetrain and transmission, ordered by engine name then trim level.
    """
    bindings: dict = {"brand": brand, "model": model, "year": int(year)}
    engine_filter = ""
    if engine_id:
        engine_filter = " AND e.id_engine = :engine_id"
        bindings["engine_id"] = engine_id

    base_sql = """
        SELECT
        mye.id_mye AS id,
        b.name_brand AS brand,
        m.name_model AS model,
        y.year_car AS year,
        e.id_engine AS engine_id,
        e.name_engine AS engine,
        mye.trim_level,
        dt.name_drivetrain AS drivetrain,
        tr.name_transmission AS transmission
        FROM model_year_engine mye
        JOIN models m ON mye.model_id = m.id_model
        JOIN brands b ON m.brand_id = b.id_brand
        JOIN years y ON mye.year_id = y.id_year
        JOIN engines e ON mye.engine_id = e.id_engine
        LEFT JOIN drivetrain dt ON mye.id_drivetrain = dt.id_drivetrain
        LEFT JOIN transmission tr ON mye.id_transmission = tr.id_transmission
        WHERE b.name_brand ILIKE :brand
        AND m.name_model ILIKE :model
        AND y.year_car = :year
    """
    statement = base_sql + engine_filter + " ORDER BY e.name_engine, mye.trim_level"
    return self._query(statement, bindings)
|
|
|
|
# ==================================================================
|
|
# Parts catalog
|
|
# ==================================================================
|
|
|
|
def get_categories(self) -> list[dict]:
    """List all part categories, sorted by display_order and then by name.

    The result is memoised under the ``"categories"`` cache key.
    """
    sql = """
        SELECT id_part_category AS id, name_part_category AS name,
        name_es, slug, icon_name, display_order
        FROM part_categories
        ORDER BY display_order, name_part_category
    """
    return self._query_cached("categories", sql)
|
|
|
|
def get_groups(self, category_id: int) -> list[dict]:
    """List the part groups of one category, sorted by display_order and then by name.

    Args:
        category_id: Value matched against ``part_groups.category_id``.
    """
    sql = """
        SELECT id_part_group AS id, name_part_group AS name,
        name_es, slug, display_order
        FROM part_groups
        WHERE category_id = :cat_id
        ORDER BY display_order, name_part_group
    """
    bindings = {"cat_id": category_id}
    return self._query(sql, bindings)
|
|
|
|
def get_parts(
    self,
    group_id: Optional[int] = None,
    mye_id: Optional[int] = None,
    page: int = 1,
    per_page: int = 15,
) -> list[dict]:
    """Return one page of parts, optionally filtered by group and/or vehicle.

    Args:
        group_id: Optional part group id; a falsy value applies no filter.
        mye_id: Optional model_year_engine id; when given, only parts linked
            to it in ``vehicle_parts`` are returned.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size, clamped to the range 0..100.

    Returns:
        Parts ordered by name, each with its group and category name.
    """
    # Clamp so LIMIT/OFFSET can never be negative, which PostgreSQL rejects.
    per_page = max(0, min(per_page, 100))
    offset = (max(page, 1) - 1) * per_page

    sql = """
        SELECT
        p.id_part AS id,
        p.oem_part_number,
        p.name_part AS name,
        p.name_es,
        p.group_id,
        pg.name_part_group AS group_name,
        pc.name_part_category AS category_name
        FROM parts p
        JOIN part_groups pg ON p.group_id = pg.id_part_group
        JOIN part_categories pc ON pg.category_id = pc.id_part_category
    """
    where_parts: list[str] = []
    params: dict = {}

    if group_id:
        where_parts.append("p.group_id = :group_id")
        params["group_id"] = group_id
    if mye_id:
        where_parts.append(
            "p.id_part IN (SELECT part_id FROM vehicle_parts WHERE model_year_engine_id = :mye_id)"
        )
        params["mye_id"] = mye_id

    if where_parts:
        sql += " WHERE " + " AND ".join(where_parts)

    sql += " ORDER BY p.name_part LIMIT :limit OFFSET :offset"
    params["limit"] = per_page
    params["offset"] = offset

    return self._query(sql, params)
|
|
|
|
def get_part(self, part_id: int) -> Optional[dict]:
    """Fetch one part by id together with its material, group and category.

    Returns:
        A single dict, or ``None`` when no part has this id.
    """
    sql = """
        SELECT
        p.id_part AS id,
        p.oem_part_number,
        p.name_part AS name,
        p.name_es,
        p.description,
        p.description_es,
        p.weight_kg,
        mat.name_material AS material,
        p.group_id,
        pg.name_part_group AS group_name,
        pg.name_es AS group_name_es,
        pc.id_part_category AS category_id,
        pc.name_part_category AS category_name,
        pc.name_es AS category_name_es
        FROM parts p
        JOIN part_groups pg ON p.group_id = pg.id_part_group
        JOIN part_categories pc ON pg.category_id = pc.id_part_category
        LEFT JOIN materials mat ON p.id_material = mat.id_material
        WHERE p.id_part = :part_id
    """
    bindings = {"part_id": part_id}
    return self._query(sql, bindings, one=True)
|
|
|
|
def get_alternatives(self, part_id: int) -> list[dict]:
    """List aftermarket parts whose ``oem_part_id`` is *part_id*.

    Rows carry the manufacturer name and quality tier, and are ordered by
    quality tier name (descending) and then by price (ascending).
    """
    sql = """
        SELECT
        ap.id_aftermarket_parts AS id,
        ap.part_number,
        ap.name_aftermarket_parts AS name,
        ap.name_es,
        m.name_manufacture AS manufacturer_name,
        ap.manufacturer_id,
        qt.name_quality AS quality_tier,
        ap.price_usd,
        ap.warranty_months
        FROM aftermarket_parts ap
        JOIN manufacturers m ON ap.manufacturer_id = m.id_manufacture
        LEFT JOIN quality_tier qt ON ap.id_quality_tier = qt.id_quality_tier
        WHERE ap.oem_part_id = :part_id
        ORDER BY qt.name_quality DESC, ap.price_usd ASC
    """
    bindings = {"part_id": part_id}
    return self._query(sql, bindings)
|
|
|
|
def get_cross_references(self, part_id: int) -> list[dict]:
    """List the cross-reference numbers recorded for one part.

    Rows are ordered by reference type name and then by cross-reference number.
    """
    sql = """
        SELECT id_part_cross_ref AS id, cross_reference_number,
        rt.name_ref_type AS reference_type,
        source_ref AS source, notes
        FROM part_cross_references pcr
        LEFT JOIN reference_type rt ON pcr.id_ref_type = rt.id_ref_type
        WHERE pcr.part_id = :part_id
        ORDER BY rt.name_ref_type, pcr.cross_reference_number
    """
    bindings = {"part_id": part_id}
    return self._query(sql, bindings)
|
|
|
|
def get_vehicles_for_part(self, part_id: int) -> list[dict]:
    """List the vehicle configurations linked to a part in ``vehicle_parts``.

    Each row has brand, model, year, engine, trim_level, quantity_required,
    position and fitment_notes; rows are ordered by brand, model and year.
    """
    sql = """
        SELECT
        b.name_brand AS brand,
        m.name_model AS model,
        y.year_car AS year,
        e.name_engine AS engine,
        mye.trim_level,
        vp.quantity_required,
        pp.name_position_part AS position,
        vp.fitment_notes
        FROM vehicle_parts vp
        JOIN model_year_engine mye ON vp.model_year_engine_id = mye.id_mye
        JOIN models m ON mye.model_id = m.id_model
        JOIN brands b ON m.brand_id = b.id_brand
        JOIN years y ON mye.year_id = y.id_year
        JOIN engines e ON mye.engine_id = e.id_engine
        LEFT JOIN position_part pp ON vp.id_position_part = pp.id_position_part
        WHERE vp.part_id = :part_id
        ORDER BY b.name_brand, m.name_model, y.year_car
    """
    bindings = {"part_id": part_id}
    return self._query(sql, bindings)
|
|
|
|
# ==================================================================
|
|
# Search
|
|
# ==================================================================
|
|
|
|
def search_parts(
    self, query: str, page: int = 1, per_page: int = 15
) -> list[dict]:
    """Full-text search over ``parts.search_vector``, best matches first.

    Args:
        query: Free text; parsed by ``plainto_tsquery`` with the
            ``'spanish'`` text search configuration.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size, clamped to the range 0..100.

    Returns:
        Matching parts with group/category names and a ``rank`` column,
        ordered by rank descending.
    """
    # Clamp so LIMIT/OFFSET can never be negative, which PostgreSQL rejects.
    per_page = max(0, min(per_page, 100))
    offset = (max(page, 1) - 1) * per_page

    return self._query(
        """
        SELECT
        p.id_part AS id,
        p.oem_part_number,
        p.name_part AS name,
        p.name_es,
        p.description,
        pg.name_part_group AS group_name,
        pc.name_part_category AS category_name,
        ts_rank(p.search_vector, plainto_tsquery('spanish', :q)) AS rank
        FROM parts p
        JOIN part_groups pg ON p.group_id = pg.id_part_group
        JOIN part_categories pc ON pg.category_id = pc.id_part_category
        WHERE p.search_vector @@ plainto_tsquery('spanish', :q)
        ORDER BY rank DESC
        LIMIT :limit OFFSET :offset
        """,
        {"q": query, "limit": per_page, "offset": offset},
    )
|
|
|
|
def search_part_number(self, number: str) -> list[dict]:
    """Find parts whose OEM, aftermarket or cross-reference number contains *number*.

    Matching is a case-insensitive substring search. ``%``, ``_`` and ``\\``
    in *number* are matched literally, not as pattern characters. Blank
    input returns an empty list instead of matching every row.

    Returns:
        OEM matches first, then aftermarket, then cross-reference matches.
        Every entry has ``id``, ``oem_part_number``, ``name``, ``name_es``,
        ``match_type`` ("oem", "aftermarket" or "cross_reference") and
        ``matched_number``.
    """
    if not number or not number.strip():
        return []

    # Backslash is PostgreSQL's default LIKE escape character; escape it
    # first so the escapes added for % and _ are not themselves doubled.
    escaped = number.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    search_term = f"%{escaped}%"

    # OEM parts: the whole row is kept and annotated.
    oem_rows = self._query(
        """
        SELECT id_part AS id, oem_part_number, name_part AS name, name_es
        FROM parts
        WHERE oem_part_number ILIKE :term
        """,
        {"term": search_term},
    )
    results: list[dict] = [
        {**row, "match_type": "oem", "matched_number": row["oem_part_number"]}
        for row in oem_rows
    ]

    # Aftermarket parts, reported against the OEM part they replace.
    aftermarket_rows = self._query(
        """
        SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name,
        p.name_es, ap.part_number
        FROM aftermarket_parts ap
        JOIN parts p ON ap.oem_part_id = p.id_part
        WHERE ap.part_number ILIKE :term
        """,
        {"term": search_term},
    )

    # Cross-references, reported against the part they point to.
    crossref_rows = self._query(
        """
        SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name,
        p.name_es, pcr.cross_reference_number
        FROM part_cross_references pcr
        JOIN parts p ON pcr.part_id = p.id_part
        WHERE pcr.cross_reference_number ILIKE :term
        """,
        {"term": search_term},
    )

    # Both secondary sources share one result shape; only the column that
    # holds the matched number differs.
    secondary_sources = (
        ("aftermarket", "part_number", aftermarket_rows),
        ("cross_reference", "cross_reference_number", crossref_rows),
    )
    for match_type, number_column, rows in secondary_sources:
        results.extend(
            {
                "id": row["id"],
                "oem_part_number": row["oem_part_number"],
                "name": row["name"],
                "name_es": row["name_es"],
                "match_type": match_type,
                "matched_number": row[number_column],
            }
            for row in rows
        )

    return results
|
|
|
|
# ==================================================================
|
|
# VIN cache
|
|
# ==================================================================
|
|
|
|
def get_vin_cache(self, vin: str) -> Optional[dict]:
    """Look up an unexpired VIN cache row.

    The VIN is upper-cased and stripped before the lookup. Only rows whose
    ``expires_at`` is later than the database's ``NOW()`` qualify.

    Returns:
        The cached row as a dict, or ``None`` when there is no valid entry.
    """
    normalized_vin = vin.upper().strip()
    sql = """
        SELECT
        vin, decoded_data, make, model, year,
        engine_info, body_class, drive_type,
        model_year_engine_id, created_at, expires_at
        FROM vin_cache
        WHERE vin = :vin AND expires_at > NOW()
    """
    return self._query(sql, {"vin": normalized_vin}, one=True)
|
|
|
|
def save_vin_cache(
    self,
    vin: str,
    data: str,
    make: str,
    model: str,
    year: int,
    engine_info: str,
    body_class: str,
    drive_type: str,
) -> Optional[int]:
    """Insert or refresh a VIN cache row that expires 30 days from now.

    Args:
        vin: VIN; upper-cased and stripped before storing.
        data: Decoded VIN payload. A string is parsed as JSON (so invalid
            JSON raises); any other value is serialised as-is. The stored
            value is always the re-serialised JSON text.
        make, model, year, engine_info, body_class, drive_type: Column
            values stored alongside the payload.

    Returns:
        Whatever ``_execute`` returns for the statement's ``RETURNING id``.

    An existing row with the same VIN is overwritten (``ON CONFLICT (vin)``).
    """
    # Local import: the module-level import only brings in datetime/timedelta.
    from datetime import timezone

    # Timezone-aware UTC replaces the deprecated, naive datetime.utcnow();
    # the ISO string then carries an explicit +00:00 offset.
    expires = datetime.now(timezone.utc) + timedelta(days=30)
    decoded = json_module.loads(data) if isinstance(data, str) else data
    return self._execute(
        """
        INSERT INTO vin_cache
        (vin, decoded_data, make, model, year,
        engine_info, body_class, drive_type, expires_at)
        VALUES (:vin, :decoded_data, :make, :model, :year,
        :engine_info, :body_class, :drive_type, :expires_at)
        ON CONFLICT (vin) DO UPDATE SET
        decoded_data = EXCLUDED.decoded_data,
        make = EXCLUDED.make,
        model = EXCLUDED.model,
        year = EXCLUDED.year,
        engine_info = EXCLUDED.engine_info,
        body_class = EXCLUDED.body_class,
        drive_type = EXCLUDED.drive_type,
        expires_at = EXCLUDED.expires_at
        RETURNING id
        """,
        {
            "vin": vin.upper().strip(),
            "decoded_data": json_module.dumps(decoded),
            "make": make,
            "model": model,
            "year": year,
            "engine_info": engine_info,
            "body_class": body_class,
            "drive_type": drive_type,
            "expires_at": expires.isoformat(),
        },
    )
|
|
|
|
# ==================================================================
|
|
# Stats
|
|
# ==================================================================
|
|
|
|
def get_stats(self) -> dict:
    """Collect row counts for the main tables plus the ten brands with most fitments.

    Returns:
        A dict with one integer per counted table (keyed by table name) and
        ``"top_brands"``: a list of ``{"name", "count"}`` dicts ordered by
        distinct ``vehicle_parts`` rows, descending.
    """
    # Table names are fixed literals, so formatting them into the SQL is safe.
    counted_tables = (
        "brands",
        "models",
        "years",
        "engines",
        "part_categories",
        "part_groups",
        "parts",
        "aftermarket_parts",
        "manufacturers",
        "vehicle_parts",
        "part_cross_references",
    )
    session = self._session()
    try:
        stats: dict = {}
        for table in counted_tables:
            count_sql = text(f"SELECT COUNT(*) AS cnt FROM {table}")
            stats[table] = session.execute(count_sql).mappings().one()["cnt"]

        # Top brands by number of fitments
        top_rows = session.execute(text("""
            SELECT b.name_brand AS name, COUNT(DISTINCT vp.id_vehicle_part) AS cnt
            FROM brands b
            JOIN models m ON m.brand_id = b.id_brand
            JOIN model_year_engine mye ON mye.model_id = m.id_model
            JOIN vehicle_parts vp ON vp.model_year_engine_id = mye.id_mye
            GROUP BY b.name_brand
            ORDER BY cnt DESC
            LIMIT 10
        """)).mappings().all()
        stats["top_brands"] = [
            {"name": entry["name"], "count": entry["cnt"]} for entry in top_rows
        ]
        return stats
    finally:
        session.close()
|
|
|
|
# ==================================================================
|
|
# Admin — Manufacturers
|
|
# ==================================================================
|
|
|
|
def get_manufacturers(self) -> list[dict]:
    """List all manufacturers by name, with type, quality tier and country names.

    The three lookup tables are LEFT JOINed, so a manufacturer with a
    missing lookup value is still returned (with ``NULL`` in that column).
    """
    sql = """
        SELECT m.id_manufacture AS id, m.name_manufacture AS name,
        mt.name_type_manu AS type,
        qt.name_quality AS quality_tier,
        c.name_country AS country,
        m.logo_url, m.website
        FROM manufacturers m
        LEFT JOIN manufacture_type mt ON m.id_type_manu = mt.id_type_manu
        LEFT JOIN quality_tier qt ON m.id_quality_tier = qt.id_quality_tier
        LEFT JOIN countries c ON m.id_country = c.id_country
        ORDER BY m.name_manufacture
    """
    return self._query(sql)
|
|
|
|
def create_manufacturer(self, data: dict) -> Optional[int]:
    """Insert a manufacturer and return the value of ``RETURNING id_manufacture``.

    Args:
        data: Must contain ``"name"`` (``KeyError`` otherwise). Optional keys
            ``"type"``, ``"quality_tier"``, ``"country"``, ``"logo_url"`` and
            ``"website"`` default to ``None``. Type, quality tier and country
            are given by name and resolved to ids by subqueries.
    """
    bindings = {"name": data["name"]}
    for field in ("type", "quality_tier", "country", "logo_url", "website"):
        bindings[field] = data.get(field)

    sql = """
        INSERT INTO manufacturers (name_manufacture, id_type_manu, id_quality_tier,
        id_country, logo_url, website)
        VALUES (:name,
        (SELECT id_type_manu FROM manufacture_type WHERE name_type_manu = :type),
        (SELECT id_quality_tier FROM quality_tier WHERE name_quality = :quality_tier),
        (SELECT id_country FROM countries WHERE name_country = :country),
        :logo_url, :website)
        RETURNING id_manufacture
    """
    return self._execute(sql, bindings)
|
|
|
|
def update_manufacturer(self, mfr_id: int, data: dict) -> None:
    """Overwrite every editable column of the manufacturer with id *mfr_id*.

    Args:
        mfr_id: ``id_manufacture`` of the row to update.
        data: Must contain ``"name"`` (``KeyError`` otherwise). Optional keys
            ``"type"``, ``"quality_tier"``, ``"country"``, ``"logo_url"`` and
            ``"website"`` are written as ``None`` when absent.
    """
    bindings = {"name": data["name"]}
    for field in ("type", "quality_tier", "country", "logo_url", "website"):
        bindings[field] = data.get(field)
    bindings["mfr_id"] = mfr_id

    sql = """
        UPDATE manufacturers
        SET name_manufacture = :name,
        id_type_manu = (SELECT id_type_manu FROM manufacture_type WHERE name_type_manu = :type),
        id_quality_tier = (SELECT id_quality_tier FROM quality_tier WHERE name_quality = :quality_tier),
        id_country = (SELECT id_country FROM countries WHERE name_country = :country),
        logo_url = :logo_url, website = :website
        WHERE id_manufacture = :mfr_id
    """
    self._execute(sql, bindings)
|
|
|
|
def delete_manufacturer(self, mfr_id: int) -> None:
    """Delete the manufacturer whose ``id_manufacture`` equals *mfr_id*."""
    sql = "DELETE FROM manufacturers WHERE id_manufacture = :id"
    bindings = {"id": mfr_id}
    self._execute(sql, bindings)
|
|
|
|
# ==================================================================
|
|
# Admin — Parts
|
|
# ==================================================================
|
|
|
|
def create_part(self, data: dict) -> Optional[int]:
    """Insert a part and return the value of ``RETURNING id_part``.

    Args:
        data: Must contain ``"oem_part_number"`` and ``"name"`` (``KeyError``
            otherwise). Optional keys ``"name_es"``, ``"group_id"``,
            ``"description"``, ``"description_es"``, ``"weight_kg"`` and
            ``"material"`` default to ``None``. The material is given by name
            and resolved to ``id_material`` by a subquery.
    """
    bindings = {
        "oem_part_number": data["oem_part_number"],
        "name": data["name"],
    }
    optional_fields = (
        "name_es", "group_id", "description",
        "description_es", "weight_kg", "material",
    )
    for field in optional_fields:
        bindings[field] = data.get(field)

    sql = """
        INSERT INTO parts
        (oem_part_number, name_part, name_es, group_id,
        description, description_es, weight_kg,
        id_material)
        VALUES (:oem_part_number, :name, :name_es, :group_id,
        :description, :description_es, :weight_kg,
        (SELECT id_material FROM materials WHERE name_material = :material))
        RETURNING id_part
    """
    return self._execute(sql, bindings)
|
|
|
|
def update_part(self, part_id: int, data: dict) -> None:
    """Overwrite every editable column of the part with id *part_id*.

    Args:
        part_id: ``id_part`` of the row to update.
        data: Must contain ``"oem_part_number"`` and ``"name"`` (``KeyError``
            otherwise). Optional keys ``"name_es"``, ``"group_id"``,
            ``"description"``, ``"description_es"``, ``"weight_kg"`` and
            ``"material"`` are written as ``None`` when absent.
    """
    bindings = {
        "oem_part_number": data["oem_part_number"],
        "name": data["name"],
    }
    optional_fields = (
        "name_es", "group_id", "description",
        "description_es", "weight_kg", "material",
    )
    for field in optional_fields:
        bindings[field] = data.get(field)
    bindings["part_id"] = part_id

    sql = """
        UPDATE parts
        SET oem_part_number = :oem_part_number, name_part = :name,
        name_es = :name_es, group_id = :group_id,
        description = :description, description_es = :description_es,
        weight_kg = :weight_kg,
        id_material = (SELECT id_material FROM materials WHERE name_material = :material)
        WHERE id_part = :part_id
    """
    self._execute(sql, bindings)
|
|
|
|
def delete_part(self, part_id: int) -> None:
    """Delete the part whose ``id_part`` equals *part_id*."""
    sql = "DELETE FROM parts WHERE id_part = :id"
    bindings = {"id": part_id}
    self._execute(sql, bindings)
|
|
|
|
# ==================================================================
|
|
# Admin — Cross-references
|
|
# ==================================================================
|
|
|
|
def create_crossref(self, data: dict) -> Optional[int]:
    """Insert a cross-reference and return the value of ``RETURNING id_part_cross_ref``.

    Args:
        data: Must contain ``"part_id"`` and ``"cross_reference_number"``
            (``KeyError`` otherwise). Optional keys ``"reference_type"``,
            ``"source"`` and ``"notes"`` default to ``None``. The reference
            type is given by name and resolved to its id by a subquery.
    """
    bindings = {
        "part_id": data["part_id"],
        "cross_reference_number": data["cross_reference_number"],
    }
    for field in ("reference_type", "source", "notes"):
        bindings[field] = data.get(field)

    sql = """
        INSERT INTO part_cross_references
        (part_id, cross_reference_number, id_ref_type, source_ref, notes)
        VALUES (:part_id, :cross_reference_number,
        (SELECT id_ref_type FROM reference_type WHERE name_ref_type = :reference_type),
        :source, :notes)
        RETURNING id_part_cross_ref
    """
    return self._execute(sql, bindings)
|
|
|
|
def update_crossref(self, xref_id: int, data: dict) -> None:
    """Overwrite every editable column of the cross-reference with id *xref_id*.

    Args:
        xref_id: ``id_part_cross_ref`` of the row to update.
        data: Must contain ``"part_id"`` and ``"cross_reference_number"``
            (``KeyError`` otherwise). Optional keys ``"reference_type"``,
            ``"source"`` and ``"notes"`` are written as ``None`` when absent.
    """
    bindings = {
        "part_id": data["part_id"],
        "cross_reference_number": data["cross_reference_number"],
    }
    for field in ("reference_type", "source", "notes"):
        bindings[field] = data.get(field)
    bindings["xref_id"] = xref_id

    sql = """
        UPDATE part_cross_references
        SET part_id = :part_id, cross_reference_number = :cross_reference_number,
        id_ref_type = (SELECT id_ref_type FROM reference_type WHERE name_ref_type = :reference_type),
        source_ref = :source, notes = :notes
        WHERE id_part_cross_ref = :xref_id
    """
    self._execute(sql, bindings)
|
|
|
|
def delete_crossref(self, xref_id: int) -> None:
    """Delete the cross-reference whose ``id_part_cross_ref`` equals *xref_id*."""
    sql = "DELETE FROM part_cross_references WHERE id_part_cross_ref = :id"
    bindings = {"id": xref_id}
    self._execute(sql, bindings)
|
|
|
|
def get_crossrefs_paginated(
    self, page: int = 1, per_page: int = 15
) -> list[dict]:
    """Return one page of cross-references with the owning part's number and name.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size, clamped to the range 0..100.

    Returns:
        Rows ordered by ``id_part_cross_ref``.
    """
    # Clamp so LIMIT/OFFSET can never be negative, which PostgreSQL rejects.
    per_page = max(0, min(per_page, 100))
    offset = (max(page, 1) - 1) * per_page
    return self._query(
        """
        SELECT
        pcr.id_part_cross_ref AS id,
        pcr.part_id,
        pcr.cross_reference_number,
        rt.name_ref_type AS reference_type,
        pcr.source_ref AS source,
        pcr.notes,
        p.oem_part_number,
        p.name_part AS part_name
        FROM part_cross_references pcr
        JOIN parts p ON pcr.part_id = p.id_part
        LEFT JOIN reference_type rt ON pcr.id_ref_type = rt.id_ref_type
        ORDER BY pcr.id_part_cross_ref
        LIMIT :limit OFFSET :offset
        """,
        {"limit": per_page, "offset": offset},
    )
|