Files
Autoparts-DB/dashboard/server.py
consultoria-as 7b2a904498 feat: migrate to PostgreSQL + SQLAlchemy ORM, rebrand to Nexus Autoparts
- Migrate from SQLite to PostgreSQL with normalized schema
- Add 11 lookup tables (fuel_type, body_type, drivetrain, transmission,
  materials, position_part, manufacture_type, quality_tier, countries,
  reference_type, shapes)
- Rewrite dashboard/server.py (76 routes) using SQLAlchemy text() queries
- Rewrite console/db.py (27 methods) using SQLAlchemy ORM
- Add models.py with 27 SQLAlchemy model definitions
- Add config.py for centralized DB_URL configuration
- Add migrate_to_postgres.py migration script
- Add docs/METABASE_GUIDE.md with complete data entry guide
- Rebrand from "AUTOPARTS DB" to "NEXUS AUTOPARTS"
- Fill vehicle data gaps via NHTSA API + heuristics:
  engines (cylinders, power, torque), brands (country, founded_year),
  models (body_type, production years), MYE (drivetrain, transmission, trim)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 05:24:47 +00:00

2328 lines
111 KiB
Python

from flask import Flask, jsonify, request, send_from_directory
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError
import os
import sys
import json as json_module
import re
import base64
import uuid
import urllib.request
from datetime import datetime, timedelta
# Put the parent directory of this file first on sys.path so that `config`
# (imported on the next line) can be resolved from there.
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
from config import DB_URL
# Flask application; static_folder='.' points Flask's static handling at this directory.
app = Flask(__name__, static_folder='.')
# Single shared engine for the process. pool_pre_ping asks SQLAlchemy to test a
# pooled connection before handing it out; pool_size/max_overflow bound the pool.
engine = create_engine(DB_URL, pool_pre_ping=True, pool_size=5, max_overflow=10)
# Session factory bound to the engine; helpers and routes below each create
# and close their own session.
Session = sessionmaker(bind=engine)
# ============================================================================
# Helper Functions
# ============================================================================
def get_all_brands(detailed=False):
    """List brands that are reachable through models and model_year_engine.

    When ``detailed`` is true each entry is a dict with ``name``,
    ``model_count`` and ``vehicle_count``; otherwise each entry is just the
    brand name. Results are ordered by brand name and capped at 500 rows.
    """
    with Session() as db:
        if not detailed:
            names_sql = text("""
                SELECT DISTINCT b.name_brand AS name
                FROM brands b
                JOIN models m ON m.brand_id = b.id_brand
                JOIN model_year_engine mye ON mye.model_id = m.id_model
                ORDER BY b.name_brand LIMIT 500
            """)
            return [rec['name'] for rec in db.execute(names_sql).mappings().all()]

        stats_sql = text("""
            SELECT b.name_brand AS name,
            COUNT(DISTINCT m.name_model) AS model_count,
            COUNT(DISTINCT mye.id_mye) AS vehicle_count
            FROM brands b
            JOIN models m ON m.brand_id = b.id_brand
            JOIN model_year_engine mye ON mye.model_id = m.id_model
            GROUP BY b.name_brand ORDER BY b.name_brand LIMIT 500
        """)
        fields = ('name', 'model_count', 'vehicle_count')
        return [
            {field: rec[field] for field in fields}
            for rec in db.execute(stats_sql).mappings().all()
        ]
def get_all_years():
    """Return up to 200 distinct ``year_car`` values from ``years``, newest first."""
    statement = text(
        "SELECT DISTINCT year_car AS year FROM years ORDER BY year_car DESC LIMIT 200"
    )
    with Session() as db:
        records = db.execute(statement).mappings().all()
        return [rec['year'] for rec in records]
def get_all_engines():
    """Return up to 5000 distinct engine names from ``engines``, sorted by name."""
    statement = text(
        "SELECT DISTINCT name_engine AS name FROM engines ORDER BY name_engine LIMIT 5000"
    )
    with Session() as db:
        records = db.execute(statement).mappings().all()
        return [rec['name'] for rec in records]
def get_models_by_brand(brand_name=None):
    """Return distinct model names that have a model_year_engine row.

    If ``brand_name`` is truthy the list is restricted to models whose brand
    name matches it via ILIKE; otherwise models of every brand are returned.
    At most 1000 names, ordered by model name.
    """
    with Session() as db:
        if not brand_name:
            statement = text("""
                SELECT DISTINCT m.name_model AS name
                FROM models m
                JOIN model_year_engine mye ON mye.model_id = m.id_model
                ORDER BY m.name_model LIMIT 1000
            """)
            records = db.execute(statement).mappings().all()
        else:
            statement = text("""
                SELECT DISTINCT m.name_model AS name
                FROM models m
                JOIN brands b ON m.brand_id = b.id_brand
                JOIN model_year_engine mye ON mye.model_id = m.id_model
                WHERE b.name_brand ILIKE :brand
                ORDER BY m.name_model LIMIT 1000
            """)
            records = db.execute(statement, {'brand': brand_name}).mappings().all()
        return [rec['name'] for rec in records]
def search_vehicles(brand=None, model=None, year=None, engine_name=None, with_parts=False, page=1, per_page=50):
    """Search vehicle configurations and return one page of results.

    Args:
        brand: Brand name, matched with ILIKE (no wildcards are added here).
        model: Model name, matched with ILIKE.
        year: Model year; converted with ``int()`` before filtering.
        engine_name: Engine name, matched with ``=``.
        with_parts: When true, only rows whose ``id_mye`` appears in
            ``vehicle_parts`` are returned.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; clamped to the range 1..100.

    Returns:
        ``{'data': [vehicle dicts], 'pagination': {'page', 'per_page',
        'total', 'total_pages'}}``. NULL numeric columns are reported as 0
        and NULL text columns as ``'unknown'``.

    Raises:
        ValueError: If ``year`` is a truthy value that ``int()`` cannot parse.
    """
    # Clamp pagination: a page below 1 would produce a negative OFFSET (which
    # PostgreSQL rejects) and a per_page of 0 would divide by zero when
    # computing total_pages.
    per_page = max(1, min(per_page, 100))
    page = max(1, page)
    offset = (page - 1) * per_page

    with Session() as session:
        base_from = """
            FROM model_year_engine mye
            JOIN models m ON mye.model_id = m.id_model
            JOIN brands b ON m.brand_id = b.id_brand
            JOIN years y ON mye.year_id = y.id_year
            JOIN engines e ON mye.engine_id = e.id_engine
            LEFT JOIN fuel_type ft ON e.id_fuel = ft.id_fuel
            LEFT JOIN drivetrain dt ON mye.id_drivetrain = dt.id_drivetrain
            LEFT JOIN transmission tr ON mye.id_transmission = tr.id_transmission
        """
        if with_parts:
            base_from += " JOIN (SELECT DISTINCT model_year_engine_id FROM vehicle_parts) AS has_parts ON mye.id_mye = has_parts.model_year_engine_id"

        where = " WHERE 1=1"
        params = {}
        if brand:
            where += " AND b.name_brand ILIKE :brand"
            params['brand'] = brand
        if model:
            where += " AND m.name_model ILIKE :model"
            params['model'] = model
        if year:
            where += " AND y.year_car = :year"
            params['year'] = int(year)
        if engine_name:
            where += " AND e.name_engine = :engine"
            params['engine'] = engine_name

        count_sql = text("SELECT COUNT(*) AS total " + base_from + where)
        total_count = session.execute(count_sql, params).mappings().first()['total']

        data_params = dict(params, limit=per_page, offset=offset)
        # e.name_engine and mye.id_mye are appended as tiebreakers so that
        # LIMIT/OFFSET paging is stable: without a unique ordering, rows that
        # share brand/model/year may repeat or go missing across pages.
        query = text("""
            SELECT b.name_brand AS brand, m.name_model AS model, y.year_car AS year,
            e.name_engine AS engine, e.power_hp, e.torque_nm, e.displacement_cc,
            e.cylinders, ft.name_fuel AS fuel_type, mye.trim_level,
            dt.name_drivetrain AS drivetrain, tr.name_transmission AS transmission
        """ + base_from + where + " ORDER BY b.name_brand, m.name_model, y.year_car, e.name_engine, mye.id_mye LIMIT :limit OFFSET :offset")
        rows = session.execute(query, data_params).mappings().all()

        vehicles = [
            {
                'brand': r['brand'], 'model': r['model'], 'year': r['year'],
                'engine': r['engine'], 'power_hp': r['power_hp'] or 0,
                'torque_nm': r['torque_nm'] or 0, 'displacement_cc': r['displacement_cc'] or 0,
                'cylinders': r['cylinders'] or 0, 'fuel_type': r['fuel_type'] or 'unknown',
                'trim_level': r['trim_level'] or 'unknown',
                'drivetrain': r['drivetrain'] or 'unknown',
                'transmission': r['transmission'] or 'unknown',
            }
            for r in rows
        ]
        # Ceiling division; per_page is guaranteed >= 1 by the clamp above.
        total_pages = (total_count + per_page - 1) // per_page
        return {'data': vehicles, 'pagination': {'page': page, 'per_page': per_page,
                                                 'total': total_count, 'total_pages': total_pages}}
# ============================================================================
# Static Routes
# ============================================================================
@app.route('/')
def index():
    """Serve ``index.html`` for the root URL."""
    filename = 'index.html'
    return send_from_directory('.', filename)
@app.route('/admin')
def admin_page():
    """Serve ``admin.html`` for ``/admin``."""
    filename = 'admin.html'
    return send_from_directory('.', filename)
@app.route('/landing')
def landing_page():
    """Serve ``customer-landing.html`` for ``/landing``."""
    filename = 'customer-landing.html'
    return send_from_directory('.', filename)
@app.route('/diagramas')
def diagrams_page():
    """Serve ``diagrams.html`` for ``/diagramas``."""
    filename = 'diagrams.html'
    return send_from_directory('.', filename)
@app.route('/index.html')
def index_html():
    """Serve ``index.html`` when requested by its file name."""
    filename = 'index.html'
    return send_from_directory('.', filename)
@app.route('/admin.html')
def admin_html():
    """Serve ``admin.html`` when requested by its file name."""
    filename = 'admin.html'
    return send_from_directory('.', filename)
@app.route('/customer-landing.html')
def customer_landing_html():
    """Serve ``customer-landing.html`` when requested by its file name."""
    filename = 'customer-landing.html'
    return send_from_directory('.', filename)
@app.route('/diagrams.html')
def diagrams_html():
    """Serve ``diagrams.html`` when requested by its file name."""
    filename = 'diagrams.html'
    return send_from_directory('.', filename)
@app.route('/static/<path:path>')
def static_files(path):
    """Serve the file at ``path`` from the ``static`` directory."""
    directory = 'static'
    return send_from_directory(directory, path)
@app.route('/shared.css')
def shared_css():
    """Serve the ``shared.css`` stylesheet."""
    filename = 'shared.css'
    return send_from_directory('.', filename)
@app.route('/nav.js')
def nav_js():
    """Serve the ``nav.js`` script."""
    filename = 'nav.js'
    return send_from_directory('.', filename)
@app.route('/dashboard.js')
def dashboard_js():
    """Serve the ``dashboard.js`` script."""
    filename = 'dashboard.js'
    return send_from_directory('.', filename)
@app.route('/admin.js')
def admin_js():
    """Serve the ``admin.js`` script."""
    filename = 'admin.js'
    return send_from_directory('.', filename)
@app.route('/enhanced-search.js')
def enhanced_search_js():
    """Serve the ``enhanced-search.js`` script."""
    filename = 'enhanced-search.js'
    return send_from_directory('.', filename)
# ============================================================================
# Core API Endpoints
# ============================================================================
@app.route('/api/brands')
def api_brands():
    """Return the brand list as JSON.

    The ``detailed`` query parameter (case-insensitive ``true``) switches the
    response from plain names to the per-brand dicts of ``get_all_brands``.
    """
    flag = request.args.get('detailed', 'false')
    want_details = flag.lower() == 'true'
    return jsonify(get_all_brands(detailed=want_details))
@app.route('/api/years')
def api_years():
    """Return distinct model years (newest first) as a JSON list.

    Optional ``brand`` and ``model`` query parameters narrow the result via
    ILIKE matches on the brand and model names.
    """
    brand = request.args.get('brand')
    model = request.args.get('model')
    with Session() as db:
        select_sql = """SELECT DISTINCT y.year_car AS year FROM years y
            JOIN model_year_engine mye ON y.id_year = mye.year_id
            JOIN models m ON mye.model_id = m.id_model
            JOIN brands b ON m.brand_id = b.id_brand WHERE 1=1"""
        conditions = []
        bind = {}
        if brand:
            conditions.append(" AND b.name_brand ILIKE :brand")
            bind['brand'] = brand
        if model:
            conditions.append(" AND m.name_model ILIKE :model")
            bind['model'] = model
        statement = select_sql + "".join(conditions) + " ORDER BY y.year_car DESC"
        records = db.execute(text(statement), bind).mappings().all()
        return jsonify([rec['year'] for rec in records])
@app.route('/api/engines')
def api_engines():
    """Return distinct engine names (sorted) as a JSON list.

    Query parameters, all optional:
        brand: Brand name, matched with ILIKE.
        model: Model name, matched with ILIKE.
        year: Model year. Parsed with ``type=int``, so a non-numeric value is
            read as ``None`` and the year filter is skipped instead of
            ``int()`` raising an unhandled ValueError (HTTP 500). This matches
            how ``/api/model-year-engine`` parses ``year``.
    """
    brand = request.args.get('brand')
    model = request.args.get('model')
    year = request.args.get('year', type=int)
    with Session() as session:
        q = """SELECT DISTINCT e.name_engine AS name FROM engines e
            JOIN model_year_engine mye ON e.id_engine = mye.engine_id
            JOIN models m ON mye.model_id = m.id_model
            JOIN brands b ON m.brand_id = b.id_brand
            JOIN years y ON mye.year_id = y.id_year WHERE 1=1"""
        params = {}
        if brand:
            q += " AND b.name_brand ILIKE :brand"
            params['brand'] = brand
        if model:
            q += " AND m.name_model ILIKE :model"
            params['model'] = model
        if year:
            q += " AND y.year_car = :year"
            params['year'] = year
        q += " ORDER BY e.name_engine"
        rows = session.execute(text(q), params).mappings().all()
        return jsonify([r['name'] for r in rows])
@app.route('/api/models')
def api_models():
    """Return models as JSON, optionally with per-model statistics.

    Without ``detailed=true`` (or without ``brand``) this delegates to
    ``get_models_by_brand``. With both, each entry carries the model name,
    min/max year, and counts of distinct years, vehicles and engines for the
    brand (ILIKE match), capped at 1000 rows.
    """
    brand = request.args.get('brand')
    detailed = request.args.get('detailed', 'false').lower() == 'true'
    if not (detailed and brand):
        return jsonify(get_models_by_brand(brand))

    fields = ('name', 'year_min', 'year_max', 'year_count', 'vehicle_count', 'engine_count')
    with Session() as db:
        statement = text("""
            SELECT m.name_model AS name, MIN(y.year_car) AS year_min,
            MAX(y.year_car) AS year_max, COUNT(DISTINCT y.year_car) AS year_count,
            COUNT(DISTINCT mye.id_mye) AS vehicle_count,
            COUNT(DISTINCT e.name_engine) AS engine_count
            FROM models m
            JOIN brands b ON m.brand_id = b.id_brand
            JOIN model_year_engine mye ON mye.model_id = m.id_model
            JOIN years y ON mye.year_id = y.id_year
            JOIN engines e ON mye.engine_id = e.id_engine
            WHERE b.name_brand ILIKE :brand
            GROUP BY m.name_model ORDER BY m.name_model LIMIT 1000
        """)
        records = db.execute(statement, {'brand': brand}).mappings().all()
        return jsonify([{field: rec[field] for field in fields} for rec in records])
@app.route('/api/vehicles')
def api_vehicles():
    """Return one page of vehicle search results as JSON.

    Query parameters, all optional: ``brand``, ``model``, ``year``, ``engine``,
    ``page`` (default 1) and ``per_page`` (default 50). The response is the
    dict produced by ``search_vehicles``.

    ``year`` is parsed with ``type=int``: previously the raw string was handed
    to ``search_vehicles``, whose ``int(year)`` raised an unhandled ValueError
    (HTTP 500) for input such as ``year=abc``. A non-numeric year is now read
    as ``None`` and the year filter is skipped, matching how
    ``/api/model-year-engine`` parses the same parameter.
    """
    brand = request.args.get('brand')
    model = request.args.get('model')
    year = request.args.get('year', type=int)
    eng = request.args.get('engine')
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 50, type=int)
    return jsonify(search_vehicles(brand, model, year, eng, page=page, per_page=per_page))
@app.route('/api/model-year-engine')
def api_model_year_engine():
    """Return one page of model/year/engine records (with ids) as JSON.

    Query parameters, all optional:
        brand, model: Matched with ILIKE.
        year: Integer model year; non-numeric values are ignored.
        with_parts: ``true`` restricts to rows referenced by ``vehicle_parts``.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size, clamped to 1..100 (default 50).

    Returns:
        ``{'data': [...], 'pagination': {...}}``, or ``{'error': ...}`` with
        status 500 if anything raises.
    """
    try:
        brand = request.args.get('brand')
        model = request.args.get('model')
        year = request.args.get('year', type=int)
        with_parts = request.args.get('with_parts', 'false').lower() == 'true'
        # Clamp pagination: page < 1 would yield a negative OFFSET (rejected by
        # PostgreSQL) and per_page == 0 would divide by zero in total_pages.
        page = max(1, request.args.get('page', 1, type=int))
        per_page = max(1, min(request.args.get('per_page', 50, type=int), 100))
        offset = (page - 1) * per_page
        with Session() as session:
            base_from = """
                FROM model_year_engine mye
                JOIN models m ON mye.model_id = m.id_model
                JOIN brands b ON m.brand_id = b.id_brand
                JOIN years y ON mye.year_id = y.id_year
                JOIN engines e ON mye.engine_id = e.id_engine
                LEFT JOIN drivetrain dt ON mye.id_drivetrain = dt.id_drivetrain
                LEFT JOIN transmission tr ON mye.id_transmission = tr.id_transmission
            """
            if with_parts:
                base_from += " JOIN (SELECT DISTINCT model_year_engine_id FROM vehicle_parts) AS has_parts ON mye.id_mye = has_parts.model_year_engine_id"
            where = " WHERE 1=1"
            params = {}
            if brand:
                where += " AND b.name_brand ILIKE :brand"
                params['brand'] = brand
            if model:
                where += " AND m.name_model ILIKE :model"
                params['model'] = model
            if year:
                where += " AND y.year_car = :year"
                params['year'] = year
            total_count = session.execute(text("SELECT COUNT(*) AS total " + base_from + where), params).mappings().first()['total']
            data_params = dict(params, limit=per_page, offset=offset)
            # mye.id_mye is the final tiebreaker so LIMIT/OFFSET paging is
            # stable for rows sharing brand/model/year/engine.
            query = text("""
                SELECT mye.id_mye AS id, b.name_brand AS brand, m.name_model AS model,
                y.year_car AS year, e.name_engine AS engine, mye.trim_level,
                dt.name_drivetrain AS drivetrain, tr.name_transmission AS transmission
            """ + base_from + where + " ORDER BY b.name_brand, m.name_model, y.year_car, e.name_engine, mye.id_mye LIMIT :limit OFFSET :offset")
            rows = session.execute(query, data_params).mappings().all()
            records = [{'id': r['id'], 'brand': r['brand'], 'model': r['model'], 'year': r['year'],
                        'engine': r['engine'], 'trim_level': r['trim_level'],
                        'drivetrain': r['drivetrain'], 'transmission': r['transmission']} for r in rows]
            # Ceiling division; per_page is guaranteed >= 1 by the clamp above.
            total_pages = (total_count + per_page - 1) // per_page
            return jsonify({'data': records, 'pagination': {'page': page, 'per_page': per_page,
                                                            'total': total_count, 'total_pages': total_pages}})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# ============================================================================
# Parts Catalog Endpoints
# ============================================================================
@app.route('/api/categories')
def api_categories():
    """Return part categories as a JSON tree.

    Reads up to 50 rows from ``part_categories``; rows with no ``parent_id``
    become roots and every other row is appended to its parent's ``children``
    when that parent is among the fetched rows. Errors yield a 500 JSON body.
    """
    node_fields = ('id', 'name', 'name_es', 'slug', 'icon_name', 'display_order')
    with Session() as db:
        try:
            records = db.execute(text("""
                SELECT id_part_category AS id, name_part_category AS name, name_es, slug, icon_name, display_order, parent_id
                FROM part_categories ORDER BY display_order, name_part_category LIMIT 50
            """)).mappings().all()

            # First pass: create every node and collect the roots.
            nodes_by_id = {}
            roots = []
            for rec in records:
                node = {field: rec[field] for field in node_fields}
                node['children'] = []
                nodes_by_id[rec['id']] = node
                if rec['parent_id'] is None:
                    roots.append(node)

            # Second pass: hang each child under its parent, if the parent was fetched.
            for rec in records:
                parent_id = rec['parent_id']
                if parent_id is None:
                    continue
                parent = nodes_by_id.get(parent_id)
                if parent is not None:
                    parent['children'].append(nodes_by_id[rec['id']])

            return jsonify(roots)
        except Exception as exc:
            return jsonify({'error': str(exc)}), 500
@app.route('/api/categories/<int:category_id>/groups')
def api_category_groups(category_id):
    """Return up to 200 part groups of ``category_id`` as JSON, in display order."""
    fields = ('id', 'name', 'name_es', 'slug', 'display_order')
    with Session() as db:
        try:
            statement = text("""
                SELECT id_part_group AS id, name_part_group AS name, name_es, slug, display_order
                FROM part_groups WHERE category_id = :cid ORDER BY display_order, name_part_group LIMIT 200
            """)
            records = db.execute(statement, {'cid': category_id}).mappings().all()
            payload = [{field: rec[field] for field in fields} for rec in records]
            return jsonify(payload)
        except Exception as exc:
            return jsonify({'error': str(exc)}), 500
@app.route('/api/parts')
def api_parts():
    """Return one page of OEM parts as JSON.

    Query parameters, all optional:
        group_id, category_id: Integer filters on part group / category.
        search: Substring matched (ILIKE) against the English name, Spanish
            name and OEM part number.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size, clamped to 1..100 (default 50).

    Returns:
        ``{'data': [...], 'pagination': {...}}``; each part has ``image_url``
        set to ``None``. Any exception yields ``{'error': ...}`` with 500.
    """
    try:
        group_id = request.args.get('group_id', type=int)
        category_id = request.args.get('category_id', type=int)
        search = request.args.get('search')
        # Clamp pagination: page < 1 would yield a negative OFFSET (rejected by
        # PostgreSQL) and per_page == 0 would divide by zero in total_pages.
        page = max(1, request.args.get('page', 1, type=int))
        per_page = max(1, min(request.args.get('per_page', 50, type=int), 100))
        offset = (page - 1) * per_page
        with Session() as session:
            where = " WHERE 1=1"
            params = {}
            if group_id:
                where += " AND p.group_id = :group_id"
                params['group_id'] = group_id
            if category_id:
                where += " AND pg.category_id = :category_id"
                params['category_id'] = category_id
            if search:
                where += " AND (p.name_part ILIKE :search OR p.name_es ILIKE :search OR p.oem_part_number ILIKE :search)"
                params['search'] = '%' + search + '%'
            base = """
                FROM parts p
                JOIN part_groups pg ON p.group_id = pg.id_part_group
                JOIN part_categories pc ON pg.category_id = pc.id_part_category
            """
            total_count = session.execute(text("SELECT COUNT(*) AS total " + base + where), params).mappings().first()['total']
            data_params = dict(params, limit=per_page, offset=offset)
            # p.id_part breaks ties between parts with the same name so that
            # LIMIT/OFFSET paging returns each row exactly once.
            rows = session.execute(text("""
                SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es,
                p.group_id, pg.name_part_group AS group_name, pc.name_part_category AS category_name
            """ + base + where + " ORDER BY p.name_part, p.id_part LIMIT :limit OFFSET :offset"), data_params).mappings().all()
            parts = [{'id': r['id'], 'oem_part_number': r['oem_part_number'], 'name': r['name'],
                      'name_es': r['name_es'], 'group_id': r['group_id'], 'group_name': r['group_name'],
                      'category_name': r['category_name'], 'image_url': None} for r in rows]
            # Ceiling division; per_page is guaranteed >= 1 by the clamp above.
            total_pages = (total_count + per_page - 1) // per_page
            return jsonify({'data': parts, 'pagination': {'page': page, 'per_page': per_page,
                                                          'total': total_count, 'total_pages': total_pages}})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/parts/<int:part_id>')
def api_part_detail(part_id):
    """Return a single part with its group and category names as JSON.

    Responds 404 with ``{'error': 'Part not found'}`` when no row matches and
    500 with the exception text on any other failure. ``image_url`` is always
    ``None``.
    """
    leading_fields = ('id', 'oem_part_number', 'name', 'name_es', 'description',
                      'description_es', 'group_id', 'group_name')
    trailing_fields = ('group_name_es', 'category_id', 'category_name', 'category_name_es')
    with Session() as db:
        try:
            statement = text("""
                SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es,
                p.description, p.description_es, p.group_id,
                pg.name_part_group AS group_name, pg.name_es AS group_name_es,
                pc.id_part_category AS category_id, pc.name_part_category AS category_name,
                pc.name_es AS category_name_es
                FROM parts p
                JOIN part_groups pg ON p.group_id = pg.id_part_group
                JOIN part_categories pc ON pg.category_id = pc.id_part_category
                WHERE p.id_part = :pid
            """)
            record = db.execute(statement, {'pid': part_id}).mappings().first()
            if record is None:
                return jsonify({'error': 'Part not found'}), 404

            # Keys are inserted in the same order as the response has always used.
            payload = {field: record[field] for field in leading_fields}
            payload['image_url'] = None
            for field in trailing_fields:
                payload[field] = record[field]
            return jsonify(payload)
        except Exception as exc:
            return jsonify({'error': str(exc)}), 500
@app.route('/api/vehicles/<int:mye_id>/categories')
def api_vehicle_categories(mye_id):
    """Return the part categories that contain parts linked to vehicle ``mye_id``.

    Up to 50 distinct categories, in display order, as a JSON list; a 500 JSON
    error body on failure.
    """
    fields = ('id', 'name', 'name_es', 'slug', 'icon_name', 'display_order')
    with Session() as db:
        try:
            statement = text("""
                SELECT DISTINCT pc.id_part_category AS id, pc.name_part_category AS name,
                pc.name_es, pc.slug, pc.icon_name, pc.display_order
                FROM part_categories pc
                JOIN part_groups pg ON pg.category_id = pc.id_part_category
                JOIN parts p ON p.group_id = pg.id_part_group
                JOIN vehicle_parts vp ON vp.part_id = p.id_part
                WHERE vp.model_year_engine_id = :mye_id
                ORDER BY pc.display_order, pc.name_part_category LIMIT 50
            """)
            records = db.execute(statement, {'mye_id': mye_id}).mappings().all()
            payload = [{field: rec[field] for field in fields} for rec in records]
            return jsonify(payload)
        except Exception as exc:
            return jsonify({'error': str(exc)}), 500
@app.route('/api/vehicles/<int:mye_id>/groups')
def api_vehicle_groups(mye_id):
    """Return part groups (with part counts) that apply to vehicle ``mye_id``.

    The optional integer ``category_id`` query parameter restricts the groups
    to one category. Up to 200 groups in display order; 500 JSON on failure.
    """
    fields = ('id', 'name', 'name_es', 'slug', 'display_order', 'parts_count')
    with Session() as db:
        try:
            category_id = request.args.get('category_id', type=int)
            pieces = ["""
                SELECT DISTINCT pg.id_part_group AS id, pg.name_part_group AS name, pg.name_es,
                pg.slug, pg.display_order, COUNT(DISTINCT p.id_part) AS parts_count
                FROM part_groups pg
                JOIN parts p ON p.group_id = pg.id_part_group
                JOIN vehicle_parts vp ON vp.part_id = p.id_part
                WHERE vp.model_year_engine_id = :mye_id
            """]
            bind = {'mye_id': mye_id}
            if category_id:
                pieces.append(" AND pg.category_id = :cid")
                bind['cid'] = category_id
            pieces.append(" GROUP BY pg.id_part_group, pg.name_part_group, pg.name_es, pg.slug, pg.display_order ORDER BY pg.display_order, pg.name_part_group LIMIT 200")
            records = db.execute(text("".join(pieces)), bind).mappings().all()
            payload = [{field: rec[field] for field in fields} for rec in records]
            return jsonify(payload)
        except Exception as exc:
            return jsonify({'error': str(exc)}), 500
@app.route('/api/vehicles/<int:mye_id>/parts')
def api_vehicle_parts(mye_id):
    """Return one page of the parts linked to vehicle ``mye_id`` as JSON.

    Query parameters, all optional:
        category_id, group_id: Integer filters on category / group.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size, clamped to 1..100 (default 50).

    Returns:
        ``{'data': [...], 'pagination': {...}}``; any exception yields
        ``{'error': ...}`` with status 500.
    """
    try:
        category_id = request.args.get('category_id', type=int)
        group_id = request.args.get('group_id', type=int)
        # Clamp pagination: page < 1 would yield a negative OFFSET (rejected by
        # PostgreSQL) and per_page == 0 would divide by zero in total_pages.
        page = max(1, request.args.get('page', 1, type=int))
        per_page = max(1, min(request.args.get('per_page', 50, type=int), 100))
        offset = (page - 1) * per_page
        with Session() as session:
            base = """
                FROM vehicle_parts vp
                JOIN parts p ON vp.part_id = p.id_part
                JOIN part_groups pg ON p.group_id = pg.id_part_group
                JOIN part_categories pc ON pg.category_id = pc.id_part_category
                LEFT JOIN position_part pp ON vp.id_position_part = pp.id_position_part
                WHERE vp.model_year_engine_id = :mye_id
            """
            params = {'mye_id': mye_id}
            if category_id:
                base += " AND pc.id_part_category = :cid"
                params['cid'] = category_id
            if group_id:
                base += " AND pg.id_part_group = :gid"
                params['gid'] = group_id
            total_count = session.execute(text("SELECT COUNT(*) AS total " + base), params).mappings().first()['total']
            data_params = dict(params, limit=per_page, offset=offset)
            # p.id_part and vp.id_position_part are tiebreakers so rows with
            # equal display orders and names keep a stable position across pages.
            rows = session.execute(text("""
                SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es,
                vp.quantity_required, pp.name_position_part AS position,
                pc.name_part_category AS category_name, pg.name_part_group AS group_name
            """ + base + " ORDER BY pc.display_order, pg.display_order, p.name_part, p.id_part, vp.id_position_part LIMIT :limit OFFSET :offset"), data_params).mappings().all()
            parts = [{'id': r['id'], 'oem_part_number': r['oem_part_number'], 'name': r['name'],
                      'name_es': r['name_es'], 'quantity_required': r['quantity_required'],
                      'position': r['position'], 'category_name': r['category_name'],
                      'group_name': r['group_name']} for r in rows]
            # Ceiling division; per_page is guaranteed >= 1 by the clamp above.
            total_pages = (total_count + per_page - 1) // per_page
            return jsonify({'data': parts, 'pagination': {'page': page, 'per_page': per_page,
                                                          'total': total_count, 'total_pages': total_pages}})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# ============================================================================
# Cross-References and Aftermarket Endpoints
# ============================================================================
@app.route('/api/manufacturers')
def api_manufacturers():
    """Return up to 200 manufacturers (sorted by name) as JSON.

    Optional ``type`` and ``quality_tier`` query parameters filter by the
    manufacturer type name and quality tier name using ILIKE.
    """
    fields = ('id', 'name', 'type', 'quality_tier', 'country', 'logo_url', 'website')
    with Session() as db:
        try:
            manufacturer_type = request.args.get('type')
            quality_tier = request.args.get('quality_tier')
            pieces = ["""
                SELECT mfr.id_manufacture AS id, mfr.name_manufacture AS name,
                mt.name_type_manu AS type, qt.name_quality AS quality_tier,
                co.name_country AS country, mfr.logo_url, mfr.website
                FROM manufacturers mfr
                LEFT JOIN manufacture_type mt ON mfr.id_type_manu = mt.id_type_manu
                LEFT JOIN quality_tier qt ON mfr.id_quality_tier = qt.id_quality_tier
                LEFT JOIN countries co ON mfr.id_country = co.id_country
                WHERE 1=1
            """]
            bind = {}
            if manufacturer_type:
                pieces.append(" AND mt.name_type_manu ILIKE :type")
                bind['type'] = manufacturer_type
            if quality_tier:
                pieces.append(" AND qt.name_quality ILIKE :qt")
                bind['qt'] = quality_tier
            pieces.append(" ORDER BY mfr.name_manufacture LIMIT 200")
            records = db.execute(text("".join(pieces)), bind).mappings().all()
            payload = [{field: rec[field] for field in fields} for rec in records]
            return jsonify(payload)
        except Exception as exc:
            return jsonify({'error': str(exc)}), 500
@app.route('/api/parts/<int:part_id>/alternatives')
def api_part_alternatives(part_id):
    """Return up to 50 aftermarket alternatives for OEM part ``part_id`` as JSON.

    Optional query parameters: ``quality_tier`` (ILIKE on the tier name) and
    integer ``manufacturer_id``. Each entry has ``in_stock`` set to ``None``.
    """
    fields = ('id', 'part_number', 'name', 'name_es', 'manufacturer_name',
              'manufacturer_id', 'quality_tier', 'price_usd', 'warranty_months')
    with Session() as db:
        try:
            quality_tier = request.args.get('quality_tier')
            manufacturer_id = request.args.get('manufacturer_id', type=int)
            pieces = ["""
                SELECT ap.id_aftermarket_parts AS id, ap.part_number, ap.name_aftermarket_parts AS name,
                ap.name_es, mfr.name_manufacture AS manufacturer_name, ap.manufacturer_id,
                qt.name_quality AS quality_tier, ap.price_usd, ap.warranty_months
                FROM aftermarket_parts ap
                JOIN manufacturers mfr ON ap.manufacturer_id = mfr.id_manufacture
                LEFT JOIN quality_tier qt ON ap.id_quality_tier = qt.id_quality_tier
                WHERE ap.oem_part_id = :pid
            """]
            bind = {'pid': part_id}
            if quality_tier:
                pieces.append(" AND qt.name_quality ILIKE :qt")
                bind['qt'] = quality_tier
            if manufacturer_id:
                pieces.append(" AND ap.manufacturer_id = :mid")
                bind['mid'] = manufacturer_id
            pieces.append(" ORDER BY qt.name_quality DESC, ap.price_usd ASC LIMIT 50")
            records = db.execute(text("".join(pieces)), bind).mappings().all()
            payload = []
            for rec in records:
                item = {field: rec[field] for field in fields}
                item['in_stock'] = None
                payload.append(item)
            return jsonify(payload)
        except Exception as exc:
            return jsonify({'error': str(exc)}), 500
@app.route('/api/parts/<int:part_id>/cross-references')
def api_part_cross_references(part_id):
    """Return up to 100 cross-reference numbers recorded for part ``part_id`` as JSON."""
    fields = ('id', 'cross_reference_number', 'reference_type', 'source', 'notes')
    with Session() as db:
        try:
            statement = text("""
                SELECT pcr.id_part_cross_ref AS id, pcr.cross_reference_number,
                rt.name_ref_type AS reference_type, pcr.source_ref AS source, pcr.notes
                FROM part_cross_references pcr
                LEFT JOIN reference_type rt ON pcr.id_ref_type = rt.id_ref_type
                WHERE pcr.part_id = :pid
                ORDER BY rt.name_ref_type, pcr.cross_reference_number LIMIT 100
            """)
            records = db.execute(statement, {'pid': part_id}).mappings().all()
            payload = [{field: rec[field] for field in fields} for rec in records]
            return jsonify(payload)
        except Exception as exc:
            return jsonify({'error': str(exc)}), 500
@app.route('/api/search/part-number/<part_number>')
def api_search_part_number(part_number):
    """Find parts whose OEM, aftermarket or cross-reference number contains ``part_number``.

    Runs three ILIKE substring searches in that order and concatenates the
    hits. Each hit reports the OEM part plus ``match_type`` and the
    ``matched_number`` that satisfied the search.
    """
    # (SQL, match_type label, column holding the number that matched)
    lookups = (
        ("SELECT id_part AS id, oem_part_number, name_part AS name, name_es FROM parts WHERE oem_part_number ILIKE :s",
         'oem', 'oem_part_number'),
        ("SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, ap.part_number FROM aftermarket_parts ap JOIN parts p ON ap.oem_part_id = p.id_part WHERE ap.part_number ILIKE :s",
         'aftermarket', 'part_number'),
        ("SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, pcr.cross_reference_number FROM part_cross_references pcr JOIN parts p ON pcr.part_id = p.id_part WHERE pcr.cross_reference_number ILIKE :s",
         'cross_reference', 'cross_reference_number'),
    )
    with Session() as db:
        try:
            hits = []
            pattern = '%' + part_number + '%'
            for sql, match_type, matched_column in lookups:
                for rec in db.execute(text(sql), {'s': pattern}).mappings().all():
                    hits.append({
                        'id': rec['id'],
                        'oem_part_number': rec['oem_part_number'],
                        'name': rec['name'],
                        'name_es': rec['name_es'],
                        'match_type': match_type,
                        'matched_number': rec[matched_column],
                    })
            return jsonify(hits)
        except Exception as exc:
            return jsonify({'error': str(exc)}), 500
@app.route('/api/aftermarket')
def api_aftermarket_parts():
    """Return one page of aftermarket parts as JSON.

    Query parameters, all optional:
        manufacturer_id: Integer manufacturer filter.
        quality_tier: Quality tier name, matched with ILIKE.
        search: Substring matched (ILIKE) against the aftermarket name,
            aftermarket part number and OEM part number.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size, clamped to 1..100 (default 50).

    Returns:
        ``{'data': [...], 'pagination': {...}}``; any exception raised while
        handling the request yields ``{'error': ...}`` with status 500.
    """
    with Session() as session:
        try:
            manufacturer_id = request.args.get('manufacturer_id', type=int)
            quality_tier = request.args.get('quality_tier')
            search = request.args.get('search')
            # Clamp pagination: page < 1 would yield a negative OFFSET (rejected
            # by PostgreSQL) and per_page == 0 would divide by zero below.
            page = max(1, request.args.get('page', 1, type=int))
            per_page = max(1, min(request.args.get('per_page', 50, type=int), 100))
            offset = (page - 1) * per_page
            where = " WHERE 1=1"
            params = {}
            if manufacturer_id:
                where += " AND ap.manufacturer_id = :mid"
                params['mid'] = manufacturer_id
            if quality_tier:
                where += " AND qt.name_quality ILIKE :qt"
                params['qt'] = quality_tier
            if search:
                where += " AND (ap.name_aftermarket_parts ILIKE :s OR ap.part_number ILIKE :s OR p.oem_part_number ILIKE :s)"
                params['s'] = '%' + search + '%'
            base = """
                FROM aftermarket_parts ap
                JOIN parts p ON ap.oem_part_id = p.id_part
                JOIN manufacturers m ON ap.manufacturer_id = m.id_manufacture
                LEFT JOIN quality_tier qt ON ap.id_quality_tier = qt.id_quality_tier
            """
            total_count = session.execute(text("SELECT COUNT(*) AS total " + base + where), params).mappings().first()['total']
            data_params = dict(params, limit=per_page, offset=offset)
            # ap.id_aftermarket_parts breaks ties between equally named parts
            # so LIMIT/OFFSET paging returns each row exactly once.
            rows = session.execute(text("""
                SELECT ap.id_aftermarket_parts AS id, ap.part_number, ap.name_aftermarket_parts AS name,
                p.oem_part_number, m.name_manufacture AS manufacturer_name,
                qt.name_quality AS quality_tier, ap.price_usd
            """ + base + where + " ORDER BY ap.name_aftermarket_parts, ap.id_aftermarket_parts LIMIT :limit OFFSET :offset"), data_params).mappings().all()
            parts = [{'id': r['id'], 'part_number': r['part_number'], 'name': r['name'],
                      'oem_part_number': r['oem_part_number'], 'manufacturer_name': r['manufacturer_name'],
                      'quality_tier': r['quality_tier'], 'price_usd': r['price_usd']} for r in rows]
            # Ceiling division; per_page is guaranteed >= 1 by the clamp above.
            total_pages = (total_count + per_page - 1) // per_page
            return jsonify({'data': parts, 'pagination': {'page': page, 'per_page': per_page,
                                                          'total': total_count, 'total_pages': total_pages}})
        except Exception as e:
            return jsonify({'error': str(e)}), 500
# ============================================================================
# Diagram Endpoints
# ============================================================================
@app.route('/api/diagrams')
def api_diagrams():
    """Return up to 200 diagrams with their group names as JSON, in display order.

    The optional integer ``group_id`` query parameter restricts the list to
    one part group.
    """
    fields = ('id', 'name', 'name_es', 'group_id', 'group_name', 'thumbnail_path', 'display_order')
    with Session() as db:
        try:
            group_id = request.args.get('group_id', type=int)
            pieces = ["""
                SELECT d.id_diagram AS id, d.name_diagram AS name, d.name_es, d.group_id,
                pg.name_part_group AS group_name, d.thumbnail_path, d.display_order
                FROM diagrams d JOIN part_groups pg ON d.group_id = pg.id_part_group WHERE 1=1
            """]
            bind = {}
            if group_id:
                pieces.append(" AND d.group_id = :gid")
                bind['gid'] = group_id
            pieces.append(" ORDER BY d.display_order, d.name_diagram LIMIT 200")
            records = db.execute(text("".join(pieces)), bind).mappings().all()
            payload = [{field: rec[field] for field in fields} for rec in records]
            return jsonify(payload)
        except Exception as exc:
            return jsonify({'error': str(exc)}), 500
@app.route('/api/diagrams/<int:diagram_id>')
def api_diagram_detail(diagram_id):
    """Return one diagram together with all of its hotspots as JSON.

    Responds 404 with ``{'error': 'Diagram not found'}`` when the diagram does
    not exist. ``image_url`` is ``image_path`` with a leading ``/`` added when
    the path is non-empty and does not already start with one.
    ``svg_content``, ``width``, ``height`` and each hotspot's ``label`` and
    ``color`` are always ``None``.
    """
    with Session() as db:
        try:
            bind = {'did': diagram_id}
            header = db.execute(text("""
                SELECT d.id_diagram AS id, d.name_diagram AS name, d.name_es, d.group_id,
                pg.name_part_group AS group_name, d.image_path
                FROM diagrams d JOIN part_groups pg ON d.group_id = pg.id_part_group
                WHERE d.id_diagram = :did
            """), bind).mappings().first()
            if header is None:
                return jsonify({'error': 'Diagram not found'}), 404

            image_path = header['image_path'] or ''
            if image_path and not image_path.startswith('/'):
                image_url = '/' + image_path
            else:
                image_url = image_path

            spot_rows = db.execute(text("""
                SELECT h.id_dgr_hotspot AS id, h.part_id, h.callout_number,
                sh.name_shape AS shape, h.coords,
                p.name_part AS part_name, p.oem_part_number AS part_number
                FROM diagram_hotspots h
                LEFT JOIN parts p ON h.part_id = p.id_part
                LEFT JOIN shapes sh ON h.id_shape = sh.id_shape
                WHERE h.diagram_id = :did ORDER BY h.callout_number
            """), bind).mappings().all()
            hotspots = [
                {
                    'id': spot['id'],
                    'part_id': spot['part_id'],
                    'callout_number': spot['callout_number'],
                    'label': None,
                    'shape': spot['shape'],
                    'coords': spot['coords'],
                    'color': None,
                    'part_name': spot['part_name'],
                    'part_number': spot['part_number'],
                }
                for spot in spot_rows
            ]

            return jsonify({
                'id': header['id'],
                'name': header['name'],
                'name_es': header['name_es'],
                'group_id': header['group_id'],
                'group_name': header['group_name'],
                'image_path': image_path,
                'image_url': image_url,
                'svg_content': None,
                'width': None,
                'height': None,
                'hotspots': hotspots,
            })
        except Exception as exc:
            return jsonify({'error': str(exc)}), 500
@app.route('/api/diagrams/<int:diagram_id>/hotspots')
def api_diagram_hotspots(diagram_id):
    """Return up to 500 hotspots of diagram ``diagram_id`` as JSON, by callout number.

    Each hotspot's ``label`` and ``color`` are always ``None``.
    """
    with Session() as db:
        try:
            statement = text("""
                SELECT h.id_dgr_hotspot AS id, h.part_id, h.callout_number,
                sh.name_shape AS shape, h.coords,
                p.name_part AS part_name, p.oem_part_number AS part_number
                FROM diagram_hotspots h
                LEFT JOIN parts p ON h.part_id = p.id_part
                LEFT JOIN shapes sh ON h.id_shape = sh.id_shape
                WHERE h.diagram_id = :did ORDER BY h.callout_number LIMIT 500
            """)
            payload = []
            for spot in db.execute(statement, {'did': diagram_id}).mappings().all():
                payload.append({
                    'id': spot['id'],
                    'part_id': spot['part_id'],
                    'callout_number': spot['callout_number'],
                    'label': None,
                    'shape': spot['shape'],
                    'coords': spot['coords'],
                    'color': None,
                    'part_name': spot['part_name'],
                    'part_number': spot['part_number'],
                })
            return jsonify(payload)
        except Exception as exc:
            return jsonify({'error': str(exc)}), 500
@app.route('/api/groups/<int:group_id>/diagrams')
def api_group_diagrams(group_id):
    """List up to 100 diagrams that belong to a part group.

    Rows are ordered by ``display_order`` and then by diagram name. Any
    exception is reported as a 500 carrying its text.
    """
    # Output keys, in the order they are emitted for every diagram.
    fields = ('id', 'name', 'name_es', 'thumbnail_path', 'display_order')
    session = Session()
    try:
        diagram_rows = session.execute(text("""
            SELECT id_diagram AS id, name_diagram AS name, name_es, thumbnail_path, display_order
            FROM diagrams WHERE group_id = :gid ORDER BY display_order, name_diagram LIMIT 100
        """), {'gid': group_id}).mappings().all()
        return jsonify([{field: row[field] for field in fields} for row in diagram_rows])
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
    finally:
        session.close()
@app.route('/api/vehicles/<int:mye_id>/diagrams')
def api_vehicle_diagrams(mye_id):
    """List up to 200 diagrams linked to a model/year/engine configuration.

    Rows are ordered by category, group and diagram ``display_order``.
    Each item carries an ``image_url`` derived from ``image_path`` (a
    leading slash is added to non-empty relative paths). Any exception is
    reported as a 500 carrying its text.
    """
    session = Session()
    try:
        # PostgreSQL requires every ORDER BY expression of a SELECT DISTINCT
        # query to appear in the select list, so the three display_order
        # columns are selected as well. They are not part of the response.
        rows = session.execute(text("""
            SELECT DISTINCT d.id_diagram AS id, d.name_diagram AS name, d.name_es,
                   d.group_id, d.image_path, pg.name_part_group AS group_name,
                   pc.id_part_category AS category_id, pc.name_part_category AS category_name,
                   d.thumbnail_path, vd.notes,
                   pc.display_order AS category_order, pg.display_order AS group_order,
                   d.display_order AS diagram_order
            FROM vehicle_diagrams vd
            JOIN diagrams d ON vd.diagram_id = d.id_diagram
            JOIN part_groups pg ON d.group_id = pg.id_part_group
            JOIN part_categories pc ON pg.category_id = pc.id_part_category
            WHERE vd.model_year_engine_id = :mye_id
            ORDER BY pc.display_order, pg.display_order, d.display_order LIMIT 200
        """), {'mye_id': mye_id}).mappings().all()

        diagrams = []
        for r in rows:
            image_path = r['image_path'] or ''
            if image_path and not image_path.startswith('/'):
                image_url = '/' + image_path
            else:
                image_url = image_path
            diagrams.append({
                'id': r['id'], 'name': r['name'], 'name_es': r['name_es'],
                'group_id': r['group_id'], 'group_name': r['group_name'],
                'category_id': r['category_id'], 'category_name': r['category_name'],
                'image_url': image_url, 'thumbnail_path': r['thumbnail_path'],
                'notes': r['notes'],
            })
        return jsonify(diagrams)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/diagrams/<int:diagram_id>/parts')
def api_diagram_parts(diagram_id):
    """List up to 200 parts for the vehicles linked to a diagram.

    Only parts whose category id is 10 or 11 are returned. The optional
    ``mye_id`` query argument narrows the result to one vehicle
    configuration. Cross-references for all returned parts are loaded in
    one extra query. Any exception is reported as a 500 carrying its text.
    """
    session = Session()
    try:
        mye_id = request.args.get('mye_id', type=int)
        bind = {'did': diagram_id}
        sql_pieces = ["""
            SELECT DISTINCT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es,
            p.description, pg.id_part_group AS group_id, pg.name_part_group AS group_name,
            pg.name_es AS group_name_es
            FROM vehicle_diagrams vd
            JOIN vehicle_parts vp ON vp.model_year_engine_id = vd.model_year_engine_id
            JOIN parts p ON vp.part_id = p.id_part
            JOIN part_groups pg ON p.group_id = pg.id_part_group
            JOIN part_categories pc ON pg.category_id = pc.id_part_category
            WHERE vd.diagram_id = :did AND pc.id_part_category IN (10, 11)
        """]
        if mye_id:
            sql_pieces.append(" AND vd.model_year_engine_id = :mye_id")
            bind['mye_id'] = mye_id
        sql_pieces.append(" ORDER BY pg.name_part_group, p.oem_part_number LIMIT 200")
        part_rows = session.execute(text(''.join(sql_pieces)), bind).mappings().all()

        # Batch cross-refs (N+1 fix): one IN (...) query for every returned part.
        cross_refs = {}
        if part_rows:
            unique_ids = list({row['id'] for row in part_rows})
            in_params = {f'pid_{pos}': pid for pos, pid in enumerate(unique_ids)}
            placeholders = ', '.join(f':{key}' for key in in_params)
            xref_sql = text(f"""
                SELECT part_id, cross_reference_number, source_ref AS source
                FROM part_cross_references WHERE part_id IN ({placeholders})
            """)
            for xref in session.execute(xref_sql, in_params).mappings().all():
                cross_refs.setdefault(xref['part_id'], []).append(
                    {'number': xref['cross_reference_number'], 'source': xref['source']})

        payload = []
        for row in part_rows:
            payload.append({
                'id': row['id'], 'part_number': row['oem_part_number'], 'name': row['name'],
                'name_es': row['name_es'], 'description': row['description'],
                'group_name': row['group_name'], 'group_name_es': row['group_name_es'],
                'cross_references': cross_refs.get(row['id'], []),
            })
        return jsonify(payload)
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
    finally:
        session.close()
@app.route('/api/diagrams/search')
def api_diagrams_search():
    """Search diagrams by text, by vehicle brand/model, or list a default set.

    Precedence of the query arguments: ``q`` (substring match on either
    diagram name), then ``brand``/``model`` (ILIKE match without added
    wildcards), otherwise diagrams whose source is 'MOOG Catalog'. At most
    50 rows are returned. Any exception is reported as a 500 with its text.
    """
    session = Session()
    try:
        needle = request.args.get('q', '').strip()
        brand = request.args.get('brand', '').strip()
        model = request.args.get('model', '').strip()

        bind = {}
        if needle:
            statement = """
                SELECT DISTINCT d.id_diagram AS id, d.name_diagram AS name, d.name_es,
                d.image_path, d.source_diagram AS source
                FROM diagrams d WHERE d.name_diagram ILIKE :q OR d.name_es ILIKE :q
                ORDER BY d.name_diagram LIMIT 50
            """
            bind['q'] = '%' + needle + '%'
        elif brand or model:
            statement = """
                SELECT DISTINCT d.id_diagram AS id, d.name_diagram AS name, d.name_es,
                d.image_path, d.source_diagram AS source
                FROM diagrams d
                JOIN vehicle_diagrams vd ON vd.diagram_id = d.id_diagram
                JOIN model_year_engine mye ON vd.model_year_engine_id = mye.id_mye
                JOIN models m ON mye.model_id = m.id_model
                JOIN brands b ON m.brand_id = b.id_brand WHERE 1=1
            """
            if brand:
                statement += " AND b.name_brand ILIKE :brand"
                bind['brand'] = brand
            if model:
                statement += " AND m.name_model ILIKE :model"
                bind['model'] = model
            statement += " ORDER BY d.name_diagram LIMIT 50"
        else:
            statement = """
                SELECT d.id_diagram AS id, d.name_diagram AS name, d.name_es,
                d.image_path, d.source_diagram AS source
                FROM diagrams d WHERE d.source_diagram = 'MOOG Catalog'
                ORDER BY d.name_diagram LIMIT 50
            """

        found = session.execute(text(statement), bind).mappings().all()
        payload = []
        for row in found:
            payload.append({'id': row['id'], 'name': row['name'], 'name_es': row['name_es'],
                            'image_path': row['image_path'], 'source': row['source']})
        return jsonify(payload)
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
    finally:
        session.close()
@app.route('/api/hotspots/<int:hotspot_id>')
def api_hotspot_detail(hotspot_id):
    """Return one hotspot and, when it is linked to a part, that part's summary.

    Responds 404 when the hotspot does not exist and 500 (with the
    exception text) when anything raises. ``label`` and ``color`` are
    always ``None``; ``part`` stays ``None`` when no part is linked or the
    linked part cannot be loaded.
    """
    session = Session()
    try:
        hotspot_row = session.execute(text("""
            SELECT h.id_dgr_hotspot AS id, h.diagram_id, h.part_id, h.callout_number,
            sh.name_shape AS shape, h.coords
            FROM diagram_hotspots h
            LEFT JOIN shapes sh ON h.id_shape = sh.id_shape
            WHERE h.id_dgr_hotspot = :hid
        """), {'hid': hotspot_id}).mappings().first()
        if hotspot_row is None:
            return jsonify({'error': 'Hotspot not found'}), 404

        linked_part_id = hotspot_row['part_id']
        part_row = None
        if linked_part_id:
            part_row = session.execute(text("""
                SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es,
                pg.name_part_group AS group_name, pc.name_part_category AS category_name
                FROM parts p JOIN part_groups pg ON p.group_id = pg.id_part_group
                JOIN part_categories pc ON pg.category_id = pc.id_part_category
                WHERE p.id_part = :pid
            """), {'pid': linked_part_id}).mappings().first()

        part_summary = None
        if part_row:
            part_summary = {
                'id': part_row['id'], 'oem_part_number': part_row['oem_part_number'],
                'name': part_row['name'], 'name_es': part_row['name_es'],
                'group_name': part_row['group_name'],
                'category_name': part_row['category_name'],
            }

        return jsonify({
            'id': hotspot_row['id'], 'diagram_id': hotspot_row['diagram_id'],
            'part_id': linked_part_id,
            'callout_number': hotspot_row['callout_number'], 'label': None,
            'shape': hotspot_row['shape'],
            'coords': hotspot_row['coords'], 'color': None, 'part': part_summary,
        })
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
    finally:
        session.close()
# ============================================================================
# Search and VIN Endpoints
# ============================================================================
def validate_vin(vin):
    """Return True when *vin* is a syntactically valid 17-character VIN.

    A valid VIN consists of exactly 17 ASCII letters or digits, in either
    case, excluding the letters I, O and Q. Anything that is not a string
    (including ``None``) is rejected rather than raising.
    """
    if not isinstance(vin, str) or len(vin) != 17:
        return False
    # re.ASCII keeps IGNORECASE from matching non-ASCII case-fold
    # equivalents such as U+212A (Kelvin sign) for K or U+017F (long s) for S.
    return re.fullmatch(r'[A-HJ-NPR-Z0-9]{17}', vin, re.IGNORECASE | re.ASCII) is not None
def find_vehicle_in_terms(session, terms):
    """Try to recognise a vehicle among free-text search terms.

    Terms that look like a model year (1980-2030) are set aside; windows
    of up to three consecutive remaining terms (longest first), plus the
    first year term if any, are matched against brand, model and year.
    The newest matching configuration wins.

    Args:
        session: SQLAlchemy session used to run the lookup queries.
        terms: list of search terms as typed by the user.

    Returns:
        ``(vehicle, remaining_terms)`` where ``vehicle`` is a dict with
        ``id``, ``brand``, ``model``, ``year`` and ``engine``, and
        ``remaining_terms`` are the terms not consumed by the match.
        ``(None, terms)`` when fewer than two terms are given, when every
        term is a year, or when nothing matches.
    """
    if len(terms) < 2:
        return None, terms

    year_terms = []
    other_terms = []
    for term in terms:
        # isdecimal() instead of isdigit(): isdigit() is also true for
        # characters such as superscript digits, which int() cannot parse.
        if term.isdecimal() and 1980 <= int(term) <= 2030:
            year_terms.append(term)
        else:
            other_terms.append(term)
    if not other_terms:
        return None, terms

    best_match = None
    used_terms = []
    for num_terms in range(min(3, len(other_terms)), 0, -1):
        if best_match:
            break
        for i in range(len(other_terms) - num_terms + 1):
            test_terms = other_terms[i:i + num_terms]
            if year_terms:
                test_terms = test_terms + year_terms[:1]

            # Every candidate term must match brand, model or year.
            where_clauses = []
            params = {}
            for idx, t in enumerate(test_terms):
                tp = f"%{t}%"
                where_clauses.append(f"(b.name_brand ILIKE :t{idx}_b OR m.name_model ILIKE :t{idx}_m OR CAST(y.year_car AS TEXT) ILIKE :t{idx}_y)")
                params[f't{idx}_b'] = tp
                params[f't{idx}_m'] = tp
                params[f't{idx}_y'] = tp
            where_sql = " AND ".join(where_clauses)

            row = session.execute(text(f"""
                SELECT mye.id_mye AS id, b.name_brand AS brand, m.name_model AS model,
                y.year_car AS year, e.name_engine AS engine
                FROM model_year_engine mye
                JOIN models m ON mye.model_id = m.id_model
                JOIN brands b ON m.brand_id = b.id_brand
                JOIN years y ON mye.year_id = y.id_year
                JOIN engines e ON mye.engine_id = e.id_engine
                WHERE {where_sql} ORDER BY y.year_car DESC LIMIT 1
            """), params).mappings().first()
            if row:
                best_match = {'id': row['id'], 'brand': row['brand'], 'model': row['model'],
                              'year': row['year'], 'engine': row['engine']}
                used_terms = test_terms
                break

    if best_match:
        # Remove each consumed term once (case-insensitively), so a term
        # typed twice is only dropped as often as it was used.
        remaining = []
        used_lower = [t.lower() for t in used_terms]
        for t in terms:
            if t.lower() not in used_lower:
                remaining.append(t)
            else:
                used_lower.remove(t.lower())
        return best_match, remaining
    return None, terms
@app.route('/api/search')
def api_search():
    """Search parts and vehicles from a free-text query.

    Query arguments: ``q`` (required), ``type`` ('all', 'parts' or
    'vehicles'; default 'all'), ``category_id``, ``limit`` (default 50,
    clamped to 0..500) and ``offset`` (default 0, negative values become 0).

    With ``type=all`` and at least two terms, a vehicle is first looked
    for among the terms; if one is found and terms remain, parts fitted to
    that vehicle are searched and, when any match, returned on their own
    under ``vehicle_parts``. Otherwise OEM parts, aftermarket numbers,
    cross-reference numbers and vehicles are searched. The aftermarket and
    cross-reference lookups carry no category filter and only run when no
    ``category_id`` is given.

    Responds 400 without ``q`` and 500 (exception text) on any error.
    """
    max_limit = 500  # upper bound on the caller-supplied page size
    session = Session()
    try:
        q = request.args.get('q', '').strip()
        search_type = request.args.get('type', 'all')
        category_id = request.args.get('category_id', type=int)
        # PostgreSQL rejects negative LIMIT/OFFSET values, and an unbounded
        # limit would be applied to every query below.
        limit = max(0, min(request.args.get('limit', 50, type=int), max_limit))
        offset = max(0, request.args.get('offset', 0, type=int))
        if not q:
            return jsonify({'error': 'Search query is required'}), 400

        results = {'parts': [], 'vehicles': [], 'vehicle_parts': [], 'matched_vehicle': None, 'total_count': 0}
        terms = q.split()

        # Combined vehicle + part search
        if len(terms) >= 2 and search_type == 'all':
            matched_vehicle, remaining_terms = find_vehicle_in_terms(session, terms)
            if matched_vehicle and remaining_terms:
                results['matched_vehicle'] = matched_vehicle
                mv = matched_vehicle
                vp_where_clauses = []
                vp_params = {'mv_brand': mv['brand'], 'mv_model': mv['model'], 'mv_year': mv['year'], 'limit': limit}
                for i, t in enumerate(remaining_terms):
                    tp = f"%{t}%"
                    vp_where_clauses.append(f"(p.name_part ILIKE :vp{i}_n OR p.name_es ILIKE :vp{i}_ne OR p.oem_part_number ILIKE :vp{i}_o OR pg.name_part_group ILIKE :vp{i}_g)")
                    vp_params[f'vp{i}_n'] = tp
                    vp_params[f'vp{i}_ne'] = tp
                    vp_params[f'vp{i}_o'] = tp
                    vp_params[f'vp{i}_g'] = tp
                vp_where_sql = " AND ".join(vp_where_clauses)
                rows = session.execute(text(f"""
                    SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es,
                    pg.name_part_group AS group_name, pg.id_part_group AS group_id,
                    pc.name_part_category AS category_name, pc.id_part_category AS category_id,
                    vp.quantity_required, pp.name_position_part AS position
                    FROM vehicle_parts vp
                    JOIN parts p ON vp.part_id = p.id_part
                    JOIN part_groups pg ON p.group_id = pg.id_part_group
                    JOIN part_categories pc ON pg.category_id = pc.id_part_category
                    LEFT JOIN position_part pp ON vp.id_position_part = pp.id_position_part
                    WHERE vp.model_year_engine_id IN (
                    SELECT mye.id_mye FROM model_year_engine mye
                    JOIN models m ON mye.model_id = m.id_model
                    JOIN brands b ON m.brand_id = b.id_brand
                    JOIN years y ON mye.year_id = y.id_year
                    WHERE b.name_brand ILIKE :mv_brand AND m.name_model ILIKE :mv_model AND y.year_car = :mv_year
                    ) AND ({vp_where_sql}) ORDER BY p.name_part LIMIT :limit
                """), vp_params).mappings().all()
                for r in rows:
                    results['vehicle_parts'].append({'id': r['id'], 'oem_part_number': r['oem_part_number'],
                        'name': r['name'], 'name_es': r['name_es'], 'image_url': None,
                        'group_name': r['group_name'], 'group_id': r['group_id'],
                        'category_name': r['category_name'], 'category_id': r['category_id'],
                        'quantity': r['quantity_required'], 'position': r['position'], 'match_type': 'vehicle_part'})
                # Vehicle-specific hits short-circuit the generic search.
                if results['vehicle_parts']:
                    results['total_count'] = len(results['vehicle_parts'])
                    return jsonify(results)

        # Search parts
        if search_type in ('parts', 'all') and terms:
            where_clauses = []
            params = {}
            for i, t in enumerate(terms):
                tp = f"%{t}%"
                where_clauses.append(f"(p.name_part ILIKE :p{i}_n OR p.name_es ILIKE :p{i}_ne OR p.oem_part_number ILIKE :p{i}_o OR pg.name_part_group ILIKE :p{i}_g OR pc.name_part_category ILIKE :p{i}_c)")
                params[f'p{i}_n'] = tp
                params[f'p{i}_ne'] = tp
                params[f'p{i}_o'] = tp
                params[f'p{i}_g'] = tp
                params[f'p{i}_c'] = tp
            where_sql = " AND ".join(where_clauses)
            cat_filter = ""
            if category_id:
                cat_filter = " AND pc.id_part_category = :cat_id"
                params['cat_id'] = category_id
            # Prefix matches on the first term rank ahead of other matches.
            params['first_term'] = f"{terms[0]}%"
            params['limit'] = limit
            params['offset'] = offset
            rows = session.execute(text(f"""
                SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es,
                pg.name_part_group AS group_name, pc.name_part_category AS category_name
                FROM parts p JOIN part_groups pg ON p.group_id = pg.id_part_group
                JOIN part_categories pc ON pg.category_id = pc.id_part_category
                WHERE ({where_sql}){cat_filter}
                ORDER BY CASE WHEN p.oem_part_number ILIKE :first_term THEN 1
                WHEN p.name_part ILIKE :first_term THEN 2 ELSE 3 END, p.name_part
                LIMIT :limit OFFSET :offset
            """), params).mappings().all()
            for r in rows:
                results['parts'].append({'id': r['id'], 'oem_part_number': r['oem_part_number'],
                    'name': r['name'], 'name_es': r['name_es'], 'image_url': None,
                    'group_name': r['group_name'], 'category_name': r['category_name'], 'match_type': 'oem'})

            if not category_id:
                # Ids already present in results['parts'], so a part found
                # through several lookups is listed only once.
                seen_part_ids = {p['id'] for p in results['parts']}

                # Aftermarket search
                af_params = {'af_limit': limit, 'af_offset': offset}
                af_clauses = []
                for i, t in enumerate(terms):
                    af_clauses.append(f"ap.part_number ILIKE :af{i}")
                    af_params[f'af{i}'] = f"%{t}%"
                af_rows = session.execute(text(f"""
                    SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es,
                    pg.name_part_group AS group_name, pc.name_part_category AS category_name,
                    ap.part_number AS matched_number
                    FROM aftermarket_parts ap JOIN parts p ON ap.oem_part_id = p.id_part
                    JOIN part_groups pg ON p.group_id = pg.id_part_group
                    JOIN part_categories pc ON pg.category_id = pc.id_part_category
                    WHERE {' AND '.join(af_clauses)} LIMIT :af_limit OFFSET :af_offset
                """), af_params).mappings().all()
                for r in af_rows:
                    if r['id'] not in seen_part_ids:
                        seen_part_ids.add(r['id'])
                        results['parts'].append({'id': r['id'], 'oem_part_number': r['oem_part_number'],
                            'name': r['name'], 'name_es': r['name_es'], 'image_url': None,
                            'group_name': r['group_name'], 'category_name': r['category_name'],
                            'matched_number': r['matched_number'], 'match_type': 'aftermarket'})

                # Cross-reference search
                # NOTE(review): kept under the same no-category guard as the
                # aftermarket lookup, since this query has no category
                # filter either - confirm this nesting is the intended one.
                cr_params = {'cr_limit': limit, 'cr_offset': offset}
                cr_clauses = []
                for i, t in enumerate(terms):
                    cr_clauses.append(f"pcr.cross_reference_number ILIKE :cr{i}")
                    cr_params[f'cr{i}'] = f"%{t}%"
                cr_rows = session.execute(text(f"""
                    SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es,
                    pg.name_part_group AS group_name, pc.name_part_category AS category_name,
                    pcr.cross_reference_number AS matched_number
                    FROM part_cross_references pcr JOIN parts p ON pcr.part_id = p.id_part
                    JOIN part_groups pg ON p.group_id = pg.id_part_group
                    JOIN part_categories pc ON pg.category_id = pc.id_part_category
                    WHERE {' AND '.join(cr_clauses)} LIMIT :cr_limit OFFSET :cr_offset
                """), cr_params).mappings().all()
                for r in cr_rows:
                    if r['id'] not in seen_part_ids:
                        seen_part_ids.add(r['id'])
                        results['parts'].append({'id': r['id'], 'oem_part_number': r['oem_part_number'],
                            'name': r['name'], 'name_es': r['name_es'], 'image_url': None,
                            'group_name': r['group_name'], 'category_name': r['category_name'],
                            'matched_number': r['matched_number'], 'match_type': 'cross_reference'})

        # Search vehicles
        if search_type in ('vehicles', 'all') and terms:
            v_params = {'v_limit': limit, 'v_offset': offset}
            v_clauses = []
            for i, t in enumerate(terms):
                tp = f"%{t}%"
                v_clauses.append(f"(b.name_brand ILIKE :v{i}_b OR m.name_model ILIKE :v{i}_m OR CAST(y.year_car AS TEXT) ILIKE :v{i}_y OR e.name_engine ILIKE :v{i}_e)")
                v_params[f'v{i}_b'] = tp
                v_params[f'v{i}_m'] = tp
                v_params[f'v{i}_y'] = tp
                v_params[f'v{i}_e'] = tp
            v_rows = session.execute(text(f"""
                SELECT mye.id_mye AS id, b.name_brand AS brand, m.name_model AS model,
                y.year_car AS year, e.name_engine AS engine
                FROM model_year_engine mye
                JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand
                JOIN years y ON mye.year_id = y.id_year JOIN engines e ON mye.engine_id = e.id_engine
                WHERE {' AND '.join(v_clauses)}
                ORDER BY y.year_car DESC, b.name_brand, m.name_model
                LIMIT :v_limit OFFSET :v_offset
            """), v_params).mappings().all()
            for r in v_rows:
                results['vehicles'].append({'id': r['id'], 'brand': r['brand'], 'model': r['model'],
                    'year': r['year'], 'engine': r['engine']})

        results['total_count'] = len(results['parts']) + len(results['vehicles'])
        return jsonify(results)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/search/parts')
def api_search_parts():
    """Full-text search over parts with pagination.

    Query arguments: ``q`` (required), optional ``category_id`` and
    ``group_id`` filters, ``page`` (default 1, minimum 1) and ``per_page``
    (default 50, clamped to 1..100). Matching uses ``search_vector`` with
    a Spanish ``plainto_tsquery``; results are ordered by rank.

    Returns ``{'data': [...], 'pagination': {...}}``; responds 400 without
    ``q`` and 500 (exception text) on any error.
    """
    session = Session()
    try:
        q = request.args.get('q', '').strip()
        category_id = request.args.get('category_id', type=int)
        group_id = request.args.get('group_id', type=int)
        # Clamp paging input: page < 1 would give a negative OFFSET,
        # per_page < 1 a negative LIMIT or a division by zero below.
        page = max(1, request.args.get('page', 1, type=int))
        per_page = max(1, min(request.args.get('per_page', 50, type=int), 100))
        offset = (page - 1) * per_page
        if not q:
            return jsonify({'error': 'Search query is required'}), 400

        filter_clause = ""
        params = {'q': q, 'per_page': per_page, 'offset': offset}
        if category_id:
            filter_clause += " AND pg.category_id = :cid"
            params['cid'] = category_id
        if group_id:
            filter_clause += " AND p.group_id = :gid"
            params['gid'] = group_id

        total_count = session.execute(text(f"""
            SELECT COUNT(*) AS total FROM parts p
            JOIN part_groups pg ON p.group_id = pg.id_part_group
            JOIN part_categories pc ON pg.category_id = pc.id_part_category
            WHERE p.search_vector @@ plainto_tsquery('spanish', :q) {filter_clause}
        """), params).mappings().first()['total']
        rows = session.execute(text(f"""
            SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es,
            p.description, pg.name_part_group AS group_name, pc.name_part_category AS category_name,
            ts_rank(p.search_vector, plainto_tsquery('spanish', :q)) AS rank
            FROM parts p JOIN part_groups pg ON p.group_id = pg.id_part_group
            JOIN part_categories pc ON pg.category_id = pc.id_part_category
            WHERE p.search_vector @@ plainto_tsquery('spanish', :q) {filter_clause}
            ORDER BY rank DESC LIMIT :per_page OFFSET :offset
        """), params).mappings().all()

        parts = [{'id': r['id'], 'oem_part_number': r['oem_part_number'], 'name': r['name'],
                  'name_es': r['name_es'], 'description': r['description'],
                  'group_name': r['group_name'], 'category_name': r['category_name'],
                  'rank': float(r['rank'])} for r in rows]
        # Ceiling division without floats.
        total_pages = (total_count + per_page - 1) // per_page
        return jsonify({'data': parts, 'pagination': {'page': page, 'per_page': per_page,
                                                       'total': total_count, 'total_pages': total_pages}})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/vin/decode/<vin>')
def api_vin_decode(vin):
    """Decode a VIN through the NHTSA vPIC API, caching the result for 30 days.

    A non-expired ``vin_cache`` row is served directly (``cached: True``).
    Otherwise NHTSA is queried, the decoded make/model/year is matched
    against ``model_year_engine`` (first hit), and the result is upserted
    into ``vin_cache``.

    Status codes: 400 for a malformed VIN, 502 when NHTSA answers with an
    HTTP error, 503 when NHTSA cannot be reached, 500 for anything else.
    """
    session = Session()
    try:
        vin = vin.upper().strip()
        if not validate_vin(vin):
            return jsonify({'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).'}), 400

        cached_row = session.execute(text("""
            SELECT vin, make, model, year, engine_info, body_class, drive_type,
            model_year_engine_id, created_at, expires_at
            FROM vin_cache WHERE vin = :vin AND expires_at > NOW()
        """), {'vin': vin}).mappings().first()
        if cached_row:
            engine_info_data = {}
            if cached_row['engine_info']:
                try:
                    engine_info_data = json_module.loads(cached_row['engine_info'])
                except Exception:
                    # Not valid JSON text: hand the stored value back as-is.
                    engine_info_data = {'raw': cached_row['engine_info']}
            cached_data = {'vin': cached_row['vin'], 'make': cached_row['make'],
                           'model': cached_row['model'], 'year': cached_row['year'],
                           'engine_info': engine_info_data, 'body_class': cached_row['body_class'],
                           'drive_type': cached_row['drive_type'], 'matched_vehicle': None, 'cached': True}
            if cached_row['model_year_engine_id']:
                mye_row = session.execute(text("""
                    SELECT mye.id_mye AS id, b.name_brand AS brand, m.name_model AS model,
                    y.year_car AS year, e.name_engine AS engine
                    FROM model_year_engine mye
                    JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand
                    JOIN years y ON mye.year_id = y.id_year JOIN engines e ON mye.engine_id = e.id_engine
                    WHERE mye.id_mye = :mye_id
                """), {'mye_id': cached_row['model_year_engine_id']}).mappings().first()
                if mye_row:
                    cached_data['matched_vehicle'] = {'mye_id': mye_row['id'], 'brand': mye_row['brand'],
                        'model': mye_row['model'], 'year': mye_row['year'], 'engine': mye_row['engine']}
            return jsonify(cached_data)

        # Call NHTSA API
        nhtsa_url = f'https://vpic.nhtsa.dot.gov/api/vehicles/DecodeVin/{vin}?format=json'
        try:
            req = urllib.request.Request(nhtsa_url, headers={'User-Agent': 'NexusAutoparts/2.0'})
            with urllib.request.urlopen(req, timeout=10) as response:
                nhtsa_data = json_module.loads(response.read().decode('utf-8'))
        except urllib.error.HTTPError as e:
            # HTTPError is a subclass of URLError, so it must be caught first.
            return jsonify({'error': f'NHTSA API error: {e.code}'}), 502
        except urllib.error.URLError as e:
            return jsonify({'error': f'Failed to connect to NHTSA API: {str(e)}'}), 503
        except Exception as e:
            return jsonify({'error': f'Error calling NHTSA API: {str(e)}'}), 500

        nhtsa_results = {item['Variable']: item['Value'] for item in nhtsa_data.get('Results', [])}
        make = nhtsa_results.get('Make', '')
        model = nhtsa_results.get('Model', '')
        year_str = nhtsa_results.get('ModelYear', '')
        year = int(year_str) if year_str and year_str.isdigit() else None
        engine_config = nhtsa_results.get('EngineConfiguration', '')
        cylinders_str = nhtsa_results.get('EngineCylinders', '')
        cylinders = int(cylinders_str) if cylinders_str and cylinders_str.isdigit() else None
        displacement_str = nhtsa_results.get('DisplacementL', '')
        try:
            displacement_l = float(displacement_str) if displacement_str else None
        except (TypeError, ValueError):
            # A non-numeric displacement should not fail the whole decode.
            displacement_l = None
        fuel_type = nhtsa_results.get('FuelTypePrimary', '')
        body_class = nhtsa_results.get('BodyClass', '')
        drive_type = nhtsa_results.get('DriveType', '')

        matched_mye_id = None
        matched_vehicle = None
        if make and model and year:
            mye_row = session.execute(text("""
                SELECT mye.id_mye AS id, b.name_brand AS brand, m.name_model AS model,
                y.year_car AS year, e.name_engine AS engine
                FROM model_year_engine mye
                JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand
                JOIN years y ON mye.year_id = y.id_year JOIN engines e ON mye.engine_id = e.id_engine
                WHERE b.name_brand ILIKE :make AND m.name_model ILIKE :model AND y.year_car = :year LIMIT 1
            """), {'make': make, 'model': model, 'year': year}).mappings().first()
            if mye_row:
                matched_mye_id = mye_row['id']
                matched_vehicle = {'mye_id': mye_row['id'], 'brand': mye_row['brand'],
                                   'model': mye_row['model'], 'year': mye_row['year'], 'engine': mye_row['engine']}

        expires_at = datetime.now() + timedelta(days=30)
        engine_details = {'configuration': engine_config, 'cylinders': cylinders,
                          'displacement_l': displacement_l, 'fuel_type': fuel_type}
        engine_info = json_module.dumps(engine_details)
        # CAST(... AS jsonb) instead of ":decoded_data::jsonb": a "::" cast
        # directly after a bind name is not parsed as intended by text().
        session.execute(text("""
            INSERT INTO vin_cache (vin, decoded_data, make, model, year, engine_info,
            body_class, drive_type, model_year_engine_id, expires_at)
            VALUES (:vin, CAST(:decoded_data AS jsonb), :make, :model, :year, :engine_info,
            :body_class, :drive_type, :mye_id, :expires_at)
            ON CONFLICT (vin) DO UPDATE SET
            decoded_data = EXCLUDED.decoded_data, make = EXCLUDED.make, model = EXCLUDED.model,
            year = EXCLUDED.year, engine_info = EXCLUDED.engine_info, body_class = EXCLUDED.body_class,
            drive_type = EXCLUDED.drive_type, model_year_engine_id = EXCLUDED.model_year_engine_id,
            expires_at = EXCLUDED.expires_at
        """), {'vin': vin, 'decoded_data': json_module.dumps(nhtsa_results), 'make': make,
               'model': model, 'year': year, 'engine_info': engine_info, 'body_class': body_class,
               'drive_type': drive_type, 'mye_id': matched_mye_id, 'expires_at': expires_at})
        session.commit()

        return jsonify({'vin': vin, 'make': make, 'model': model, 'year': year,
                        'engine_info': engine_details,
                        'body_class': body_class, 'drive_type': drive_type,
                        'matched_vehicle': matched_vehicle, 'cached': False})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/vin/<vin>/parts')
def api_vin_parts(vin):
    """Return the parts fitted to a previously decoded VIN, grouped by category.

    The VIN must already have a ``vin_cache`` row (404 otherwise). When
    that row has no linked vehicle configuration, an empty category list
    and an explanatory message are returned. ``category_id`` optionally
    restricts the result to one category. Responds 400 for a malformed
    VIN and 500 (exception text) on any error.
    """
    session = Session()
    try:
        vin = vin.upper().strip()
        if not validate_vin(vin):
            return jsonify({'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).'}), 400

        category_id = request.args.get('category_id', type=int)
        cache_sql = text("SELECT vin, make, model, year, model_year_engine_id FROM vin_cache WHERE vin = :vin")
        cache_hit = session.execute(cache_sql, {'vin': vin}).mappings().first()
        if not cache_hit:
            return jsonify({'error': 'VIN not found in cache. Please decode the VIN first using /api/vin/decode/<vin>'}), 404

        mye_id = cache_hit['model_year_engine_id']
        vehicle_info = {'vin': cache_hit['vin'], 'make': cache_hit['make'], 'model': cache_hit['model'],
                        'year': cache_hit['year'], 'mye_id': mye_id}
        if not mye_id:
            return jsonify({'vin': vin, 'vehicle_info': vehicle_info, 'categories': [],
                            'message': 'No matching vehicle configuration found in database. Use /api/vin/<vin>/match to manually link.'})

        bind = {'mye_id': mye_id}
        if category_id:
            cat_filter = " AND pc.id_part_category = :cid"
            bind['cid'] = category_id
        else:
            cat_filter = ""
        fitment_rows = session.execute(text(f"""
            SELECT pc.id_part_category AS category_id, pc.name_part_category AS category_name,
            pc.name_es AS category_name_es, p.id_part AS part_id, p.oem_part_number,
            p.name_part AS part_name, p.name_es AS part_name_es,
            pg.name_part_group AS group_name, vp.quantity_required,
            pp.name_position_part AS position
            FROM vehicle_parts vp JOIN parts p ON vp.part_id = p.id_part
            JOIN part_groups pg ON p.group_id = pg.id_part_group
            JOIN part_categories pc ON pg.category_id = pc.id_part_category
            LEFT JOIN position_part pp ON vp.id_position_part = pp.id_position_part
            WHERE vp.model_year_engine_id = :mye_id {cat_filter}
            ORDER BY pc.display_order, pg.display_order, p.name_part
        """), bind).mappings().all()

        # Group rows by category, keeping the order of first appearance.
        by_category = {}
        for row in fitment_rows:
            bucket = by_category.setdefault(row['category_id'], {
                'id': row['category_id'], 'name': row['category_name'],
                'name_es': row['category_name_es'], 'parts': []})
            bucket['parts'].append({
                'id': row['part_id'], 'oem_part_number': row['oem_part_number'],
                'name': row['part_name'], 'name_es': row['part_name_es'],
                'group_name': row['group_name'],
                'quantity_required': row['quantity_required'], 'position': row['position']})

        return jsonify({'vin': vin, 'vehicle_info': vehicle_info,
                        'categories': list(by_category.values())})
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
    finally:
        session.close()
@app.route('/api/vin/<vin>/match')
def api_vin_match(vin):
    """Manually link a VIN to a vehicle configuration (``mye_id``).

    Query argument ``mye_id`` is required and must reference an existing
    ``model_year_engine`` row. An existing ``vin_cache`` row only has its
    ``model_year_engine_id`` replaced; otherwise a new row with empty
    decoded data and a 30-day expiry is created.

    Status codes: 400 for a malformed VIN or missing ``mye_id``, 404 for
    an unknown ``mye_id``, 500 (exception text) on any error.
    """
    session = Session()
    try:
        vin = vin.upper().strip()
        if not validate_vin(vin):
            return jsonify({'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).'}), 400
        mye_id = request.args.get('mye_id', type=int)
        if not mye_id:
            return jsonify({'error': 'mye_id parameter is required'}), 400

        mye_row = session.execute(text("SELECT id_mye FROM model_year_engine WHERE id_mye = :mid"), {'mid': mye_id}).mappings().first()
        if not mye_row:
            return jsonify({'error': f'model_year_engine_id {mye_id} not found'}), 404

        # Single upsert instead of SELECT followed by UPDATE/INSERT, so two
        # concurrent requests for the same VIN cannot both try to insert.
        expires_at = datetime.now() + timedelta(days=30)
        session.execute(text("""
            INSERT INTO vin_cache (vin, decoded_data, model_year_engine_id, created_at, expires_at)
            VALUES (:vin, '{}'::jsonb, :mid, NOW(), :expires_at)
            ON CONFLICT (vin) DO UPDATE SET model_year_engine_id = EXCLUDED.model_year_engine_id
        """), {'vin': vin, 'mid': mye_id, 'expires_at': expires_at})
        session.commit()
        return jsonify({'success': True, 'vin': vin, 'mye_id': mye_id})
    except Exception as e:
        session.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
# ============================================================================
# Admin Endpoints
# ============================================================================
@app.route('/api/admin/stats')
def api_admin_stats():
    """Return row counts for the main catalogue tables.

    The response maps a short key (``categories``, ``groups``, ``parts``,
    ``aftermarket``, ``manufacturers``, ``fitment``) to ``COUNT(*)`` of the
    corresponding table. Any exception is reported as a 500 with its text.
    """
    # (table name, response key) pairs; table names are fixed literals.
    counted_tables = (
        ('part_categories', 'categories'),
        ('part_groups', 'groups'),
        ('parts', 'parts'),
        ('aftermarket_parts', 'aftermarket'),
        ('manufacturers', 'manufacturers'),
        ('vehicle_parts', 'fitment'),
    )
    session = Session()
    try:
        counts = {
            key: session.execute(text(f"SELECT COUNT(*) FROM {table}")).scalar()
            for table, key in counted_tables
        }
        return jsonify(counts)
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
    finally:
        session.close()
# ---- Categories CRUD ----
@app.route('/api/admin/categories', methods=['POST'])
def api_admin_create_category():
    """Create a part category from a JSON body.

    Body fields: ``name`` (required, non-empty string), ``name_es``,
    ``slug`` (derived from ``name`` - lower-cased, spaces replaced by
    hyphens - when missing or empty), ``icon_name``, ``display_order``
    (default 0) and ``parent_id``.

    Returns the new id; responds 400 for a missing/invalid body or name
    and 500 (exception text, after rollback) on any other error.
    """
    session = Session()
    try:
        # silent=True yields None for a missing or malformed JSON body, so
        # bad client input becomes a 400 instead of a 500.
        data = request.get_json(silent=True)
        name = data.get('name') if isinstance(data, dict) else None
        if not isinstance(name, str) or not name.strip():
            return jsonify({'error': 'JSON body with a non-empty "name" is required'}), 400

        result = session.execute(text("""
            INSERT INTO part_categories (name_part_category, name_es, slug, icon_name, display_order, parent_id)
            VALUES (:name, :name_es, :slug, :icon_name, :display_order, :parent_id)
            RETURNING id_part_category
        """), {'name': name, 'name_es': data.get('name_es'),
               'slug': data.get('slug') or name.lower().replace(' ', '-'),
               'icon_name': data.get('icon_name'), 'display_order': data.get('display_order', 0),
               'parent_id': data.get('parent_id')})
        new_id = result.scalar()
        session.commit()
        return jsonify({'id': new_id, 'message': 'Category created'})
    except Exception as e:
        session.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/admin/categories/<int:category_id>', methods=['PUT'])
def api_admin_update_category(category_id):
    """Update a part category from a JSON body.

    Body fields: ``name`` (required, non-empty string), ``name_es``,
    ``slug``, ``icon_name`` and ``display_order`` (default 0). When
    ``slug`` is omitted or null the stored slug is kept.

    Responds 400 for a missing/invalid body or name, 404 when no category
    has the given id, and 500 (exception text, after rollback) otherwise.
    """
    session = Session()
    try:
        data = request.get_json(silent=True)
        name = data.get('name') if isinstance(data, dict) else None
        if not isinstance(name, str) or not name.strip():
            return jsonify({'error': 'JSON body with a non-empty "name" is required'}), 400

        # COALESCE keeps the current slug when the client does not send one,
        # instead of overwriting it with NULL.
        result = session.execute(text("""
            UPDATE part_categories SET name_part_category = :name, name_es = :name_es,
            slug = COALESCE(:slug, slug), icon_name = :icon_name, display_order = :display_order
            WHERE id_part_category = :id
        """), {'name': name, 'name_es': data.get('name_es'), 'slug': data.get('slug'),
               'icon_name': data.get('icon_name'), 'display_order': data.get('display_order', 0),
               'id': category_id})
        if result.rowcount == 0:
            return jsonify({'error': 'Category not found'}), 404
        session.commit()
        return jsonify({'message': 'Category updated'})
    except Exception as e:
        session.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/admin/categories/<int:category_id>', methods=['DELETE'])
def api_admin_delete_category(category_id):
    """Delete the part category with the given id.

    The success message is returned whether or not a row matched. Any
    exception rolls the session back and is reported as a 500 with its text.
    """
    session = Session()
    try:
        delete_stmt = text("DELETE FROM part_categories WHERE id_part_category = :id")
        session.execute(delete_stmt, {'id': category_id})
        session.commit()
        return jsonify({'message': 'Category deleted'})
    except Exception as exc:
        session.rollback()
        return jsonify({'error': str(exc)}), 500
    finally:
        session.close()
# ---- Groups CRUD ----
@app.route('/api/admin/groups')
def api_admin_list_groups():
    """List part groups with their category name, optionally by category.

    The ``category_id`` query argument restricts the list to one category.
    Rows are ordered by ``display_order`` then group name. Any exception
    is reported as a 500 with its text.
    """
    session = Session()
    try:
        category_id = request.args.get('category_id', type=int)
        bind = {}
        statement = """
            SELECT pg.id_part_group AS id, pg.name_part_group AS name, pg.name_es,
            pg.category_id, pg.display_order, pg.slug,
            pc.name_part_category AS category_name
            FROM part_groups pg
            LEFT JOIN part_categories pc ON pg.category_id = pc.id_part_category WHERE 1=1
        """
        if category_id:
            statement += " AND pg.category_id = :cid"
            bind['cid'] = category_id
        statement += " ORDER BY pg.display_order, pg.name_part_group"

        payload = []
        for grp in session.execute(text(statement), bind).mappings().all():
            payload.append({
                'id': grp['id'], 'name': grp['name'], 'name_es': grp['name_es'],
                'category_id': grp['category_id'], 'category_name': grp['category_name'],
                'display_order': grp['display_order'], 'slug': grp['slug'],
            })
        return jsonify(payload)
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
    finally:
        session.close()
@app.route('/api/admin/groups', methods=['POST'])
def api_admin_create_group():
    """Create a part group from a JSON body.

    Body fields: ``category_id`` and ``name`` (both required; ``name``
    must be a non-empty string), ``name_es``, ``slug`` (derived from
    ``name`` - lower-cased, spaces replaced by hyphens - when missing or
    empty) and ``display_order`` (default 0).

    Returns the new id; responds 400 for a missing/invalid body or
    required field and 500 (exception text, after rollback) otherwise.
    """
    session = Session()
    try:
        # silent=True yields None for a missing or malformed JSON body, so
        # bad client input becomes a 400 instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'JSON body is required'}), 400
        name = data.get('name')
        if not isinstance(name, str) or not name.strip():
            return jsonify({'error': 'A non-empty "name" is required'}), 400
        if data.get('category_id') is None:
            return jsonify({'error': '"category_id" is required'}), 400

        result = session.execute(text("""
            INSERT INTO part_groups (category_id, name_part_group, name_es, slug, display_order)
            VALUES (:category_id, :name, :name_es, :slug, :display_order)
            RETURNING id_part_group
        """), {'category_id': data['category_id'], 'name': name,
               'name_es': data.get('name_es'),
               'slug': data.get('slug') or name.lower().replace(' ', '-'),
               'display_order': data.get('display_order', 0)})
        new_id = result.scalar()
        session.commit()
        return jsonify({'id': new_id, 'message': 'Group created'})
    except Exception as e:
        session.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/admin/groups/<int:group_id>', methods=['PUT'])
def api_admin_update_group(group_id):
    """Overwrite the editable columns of part group ``group_id``.

    Required JSON keys: ``category_id``, ``name``.
    Optional keys: ``name_es`` (stored as NULL when absent) and
    ``display_order`` (defaults to 0). The slug is not modified here.

    Returns a JSON confirmation. A missing or non-object JSON body, or a
    missing required key, yields a 400; any other failure rolls back and
    yields a 500 with the exception text.
    """
    session = Session()
    try:
        # silent=True: a malformed body becomes None instead of raising an
        # HTTP error that the catch-all below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400
        missing = [field for field in ('category_id', 'name') if field not in data]
        if missing:
            return jsonify({'error': f"Missing required fields: {', '.join(missing)}"}), 400
        session.execute(text("""
            UPDATE part_groups SET category_id = :category_id, name_part_group = :name,
            name_es = :name_es, display_order = :display_order
            WHERE id_part_group = :id
        """), {'category_id': data['category_id'], 'name': data['name'],
               'name_es': data.get('name_es'), 'display_order': data.get('display_order', 0),
               'id': group_id})
        session.commit()
        return jsonify({'message': 'Group updated'})
    except Exception as e:
        session.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/admin/groups/<int:group_id>', methods=['DELETE'])
def api_admin_delete_group(group_id):
    """Delete the part group whose primary key is ``group_id``.

    Returns a JSON confirmation after commit; on any exception the
    transaction is rolled back and a 500 with the exception text is returned.
    """
    db = Session()
    try:
        removal = text("DELETE FROM part_groups WHERE id_part_group = :id")
        db.execute(removal, {'id': group_id})
        db.commit()
        return jsonify({'message': 'Group deleted'})
    except Exception as exc:
        db.rollback()
        return jsonify({'error': str(exc)}), 500
    finally:
        db.close()
# ---- Parts CRUD ----
@app.route('/api/admin/parts', methods=['POST'])
def api_admin_create_part():
    """Create an OEM part from a JSON body.

    Required keys: ``oem_part_number``, ``name``, ``group_id``.
    Optional keys: ``name_es``, ``description``, ``description_es``,
    ``weight_kg``, ``material``. The material is looked up by name in
    ``materials``; an unknown or absent name stores NULL.

    Returns ``{'id': <new id>, 'message': 'Part created'}``. A missing or
    non-object JSON body, or a missing required key, yields a 400; any other
    failure rolls back and yields a 500 with the exception text.
    """
    session = Session()
    try:
        # silent=True: a malformed body becomes None instead of raising an
        # HTTP error that the catch-all below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400
        required = ('oem_part_number', 'name', 'group_id')
        missing = [field for field in required if field not in data]
        if missing:
            return jsonify({'error': f"Missing required fields: {', '.join(missing)}"}), 400
        result = session.execute(text("""
            INSERT INTO parts (oem_part_number, name_part, name_es, group_id, description,
            description_es, weight_kg, id_material)
            VALUES (:oem, :name, :name_es, :group_id, :desc, :desc_es, :weight,
            (SELECT id_material FROM materials WHERE name_material = :material))
            RETURNING id_part
        """), {'oem': data['oem_part_number'], 'name': data['name'], 'name_es': data.get('name_es'),
               'group_id': data['group_id'], 'desc': data.get('description'),
               'desc_es': data.get('description_es'), 'weight': data.get('weight_kg'),
               'material': data.get('material')})
        new_id = result.scalar()
        session.commit()
        return jsonify({'id': new_id, 'message': 'Part created'})
    except Exception as e:
        session.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/admin/parts/<int:part_id>', methods=['PUT'])
def api_admin_update_part(part_id):
    """Overwrite the editable columns of part ``part_id``.

    Required JSON keys: ``oem_part_number``, ``name``, ``group_id``.
    Optional keys: ``name_es``, ``description``, ``description_es``,
    ``weight_kg``, ``material``; absent optional keys are stored as NULL.
    The material is looked up by name in ``materials``.

    Returns a JSON confirmation. A missing or non-object JSON body, or a
    missing required key, yields a 400; any other failure rolls back and
    yields a 500 with the exception text.
    """
    session = Session()
    try:
        # silent=True: a malformed body becomes None instead of raising an
        # HTTP error that the catch-all below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400
        required = ('oem_part_number', 'name', 'group_id')
        missing = [field for field in required if field not in data]
        if missing:
            return jsonify({'error': f"Missing required fields: {', '.join(missing)}"}), 400
        session.execute(text("""
            UPDATE parts SET oem_part_number = :oem, name_part = :name, name_es = :name_es,
            group_id = :group_id, description = :desc, description_es = :desc_es,
            weight_kg = :weight,
            id_material = (SELECT id_material FROM materials WHERE name_material = :material)
            WHERE id_part = :id
        """), {'oem': data['oem_part_number'], 'name': data['name'], 'name_es': data.get('name_es'),
               'group_id': data['group_id'], 'desc': data.get('description'),
               'desc_es': data.get('description_es'), 'weight': data.get('weight_kg'),
               'material': data.get('material'), 'id': part_id})
        session.commit()
        return jsonify({'message': 'Part updated'})
    except Exception as e:
        session.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/admin/parts/<int:part_id>', methods=['DELETE'])
def api_admin_delete_part(part_id):
    """Delete the part whose primary key is ``part_id``.

    Returns a JSON confirmation after commit; on any exception the
    transaction is rolled back and a 500 with the exception text is returned.
    """
    db = Session()
    try:
        removal = text("DELETE FROM parts WHERE id_part = :id")
        db.execute(removal, {'id': part_id})
        db.commit()
        return jsonify({'message': 'Part deleted'})
    except Exception as exc:
        db.rollback()
        return jsonify({'error': str(exc)}), 500
    finally:
        db.close()
# ---- Manufacturers CRUD ----
@app.route('/api/admin/manufacturers', methods=['POST'])
def api_admin_create_manufacturer():
    """Create a manufacturer from a JSON body.

    Required key: ``name``.
    Optional keys: ``type`` (defaults to ``'aftermarket'``), ``quality_tier``
    (defaults to ``'standard'``), ``country``, ``website``. Type, quality
    tier and country are resolved by name against their lookup tables; a name
    with no match stores NULL.

    Returns ``{'id': <new id>, 'message': 'Manufacturer created'}``. A missing
    or non-object JSON body, or a missing ``name``, yields a 400; any other
    failure rolls back and yields a 500 with the exception text.
    """
    session = Session()
    try:
        # silent=True: a malformed body becomes None instead of raising an
        # HTTP error that the catch-all below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400
        if 'name' not in data:
            return jsonify({'error': 'Missing required fields: name'}), 400
        result = session.execute(text("""
            INSERT INTO manufacturers (name_manufacture, id_type_manu, id_quality_tier, id_country, website)
            VALUES (:name,
            (SELECT id_type_manu FROM manufacture_type WHERE name_type_manu = :type),
            (SELECT id_quality_tier FROM quality_tier WHERE name_quality = :qt),
            (SELECT id_country FROM countries WHERE name_country = :country),
            :website)
            RETURNING id_manufacture
        """), {'name': data['name'], 'type': data.get('type', 'aftermarket'),
               'qt': data.get('quality_tier', 'standard'), 'country': data.get('country'),
               'website': data.get('website')})
        new_id = result.scalar()
        session.commit()
        return jsonify({'id': new_id, 'message': 'Manufacturer created'})
    except Exception as e:
        session.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/admin/manufacturers/<int:manufacturer_id>', methods=['PUT'])
def api_admin_update_manufacturer(manufacturer_id):
    """Overwrite the editable columns of manufacturer ``manufacturer_id``.

    Required JSON key: ``name``.
    Optional keys: ``type``, ``quality_tier``, ``country``, ``website``.
    Unlike creation, no defaults are applied here: an absent or unmatched
    lookup name stores NULL in the corresponding column.

    Returns a JSON confirmation. A missing or non-object JSON body, or a
    missing ``name``, yields a 400; any other failure rolls back and yields a
    500 with the exception text.
    """
    session = Session()
    try:
        # silent=True: a malformed body becomes None instead of raising an
        # HTTP error that the catch-all below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400
        if 'name' not in data:
            return jsonify({'error': 'Missing required fields: name'}), 400
        session.execute(text("""
            UPDATE manufacturers SET name_manufacture = :name,
            id_type_manu = (SELECT id_type_manu FROM manufacture_type WHERE name_type_manu = :type),
            id_quality_tier = (SELECT id_quality_tier FROM quality_tier WHERE name_quality = :qt),
            id_country = (SELECT id_country FROM countries WHERE name_country = :country),
            website = :website
            WHERE id_manufacture = :id
        """), {'name': data['name'], 'type': data.get('type'), 'qt': data.get('quality_tier'),
               'country': data.get('country'), 'website': data.get('website'), 'id': manufacturer_id})
        session.commit()
        return jsonify({'message': 'Manufacturer updated'})
    except Exception as e:
        session.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/admin/manufacturers/<int:manufacturer_id>', methods=['DELETE'])
def api_admin_delete_manufacturer(manufacturer_id):
    """Delete the manufacturer whose primary key is ``manufacturer_id``.

    Returns a JSON confirmation after commit; on any exception the
    transaction is rolled back and a 500 with the exception text is returned.
    """
    db = Session()
    try:
        removal = text("DELETE FROM manufacturers WHERE id_manufacture = :id")
        db.execute(removal, {'id': manufacturer_id})
        db.commit()
        return jsonify({'message': 'Manufacturer deleted'})
    except Exception as exc:
        db.rollback()
        return jsonify({'error': str(exc)}), 500
    finally:
        db.close()
# ---- Aftermarket Parts CRUD ----
@app.route('/api/admin/aftermarket', methods=['POST'])
def api_admin_create_aftermarket():
    """Create an aftermarket part from a JSON body.

    Required keys: ``oem_part_id``, ``manufacturer_id``, ``part_number``.
    Optional keys: ``name``, ``name_es``, ``quality_tier`` (defaults to
    ``'standard'``; resolved by name, NULL when unmatched), ``price_usd``,
    ``warranty_months``.

    Returns ``{'id': <new id>, 'message': 'Aftermarket part created'}``. A
    missing or non-object JSON body, or a missing required key, yields a 400;
    any other failure rolls back and yields a 500 with the exception text.
    """
    session = Session()
    try:
        # silent=True: a malformed body becomes None instead of raising an
        # HTTP error that the catch-all below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400
        required = ('oem_part_id', 'manufacturer_id', 'part_number')
        missing = [field for field in required if field not in data]
        if missing:
            return jsonify({'error': f"Missing required fields: {', '.join(missing)}"}), 400
        result = session.execute(text("""
            INSERT INTO aftermarket_parts (oem_part_id, manufacturer_id, part_number,
            name_aftermarket_parts, name_es, id_quality_tier, price_usd, warranty_months)
            VALUES (:oem_part_id, :manufacturer_id, :part_number, :name, :name_es,
            (SELECT id_quality_tier FROM quality_tier WHERE name_quality = :qt),
            :price_usd, :warranty_months)
            RETURNING id_aftermarket_parts
        """), {'oem_part_id': data['oem_part_id'], 'manufacturer_id': data['manufacturer_id'],
               'part_number': data['part_number'], 'name': data.get('name'), 'name_es': data.get('name_es'),
               'qt': data.get('quality_tier', 'standard'), 'price_usd': data.get('price_usd'),
               'warranty_months': data.get('warranty_months')})
        new_id = result.scalar()
        session.commit()
        return jsonify({'id': new_id, 'message': 'Aftermarket part created'})
    except Exception as e:
        session.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/admin/aftermarket/<int:aftermarket_id>', methods=['PUT'])
def api_admin_update_aftermarket(aftermarket_id):
    """Overwrite the editable columns of aftermarket part ``aftermarket_id``.

    Required JSON keys: ``oem_part_id``, ``manufacturer_id``, ``part_number``.
    Optional keys: ``name``, ``name_es``, ``quality_tier``, ``price_usd``,
    ``warranty_months``; absent optional keys are stored as NULL (no default
    quality tier is applied on update).

    Returns a JSON confirmation. A missing or non-object JSON body, or a
    missing required key, yields a 400; any other failure rolls back and
    yields a 500 with the exception text.
    """
    session = Session()
    try:
        # silent=True: a malformed body becomes None instead of raising an
        # HTTP error that the catch-all below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400
        required = ('oem_part_id', 'manufacturer_id', 'part_number')
        missing = [field for field in required if field not in data]
        if missing:
            return jsonify({'error': f"Missing required fields: {', '.join(missing)}"}), 400
        session.execute(text("""
            UPDATE aftermarket_parts SET oem_part_id = :oem_part_id, manufacturer_id = :manufacturer_id,
            part_number = :part_number, name_aftermarket_parts = :name, name_es = :name_es,
            id_quality_tier = (SELECT id_quality_tier FROM quality_tier WHERE name_quality = :qt),
            price_usd = :price_usd, warranty_months = :warranty_months
            WHERE id_aftermarket_parts = :id
        """), {'oem_part_id': data['oem_part_id'], 'manufacturer_id': data['manufacturer_id'],
               'part_number': data['part_number'], 'name': data.get('name'), 'name_es': data.get('name_es'),
               'qt': data.get('quality_tier'), 'price_usd': data.get('price_usd'),
               'warranty_months': data.get('warranty_months'), 'id': aftermarket_id})
        session.commit()
        return jsonify({'message': 'Aftermarket part updated'})
    except Exception as e:
        session.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/admin/aftermarket/<int:aftermarket_id>', methods=['DELETE'])
def api_admin_delete_aftermarket(aftermarket_id):
    """Delete the aftermarket part whose primary key is ``aftermarket_id``.

    Returns a JSON confirmation after commit; on any exception the
    transaction is rolled back and a 500 with the exception text is returned.
    """
    db = Session()
    try:
        removal = text("DELETE FROM aftermarket_parts WHERE id_aftermarket_parts = :id")
        db.execute(removal, {'id': aftermarket_id})
        db.commit()
        return jsonify({'message': 'Aftermarket part deleted'})
    except Exception as exc:
        db.rollback()
        return jsonify({'error': str(exc)}), 500
    finally:
        db.close()
# ---- Cross-References CRUD ----
@app.route('/api/admin/crossref')
def api_admin_list_crossref():
    """Return one page of part cross-references, newest first.

    Query string:
        page (int, default 1): 1-based page number; values below 1 are
            treated as 1.
        per_page (int, default 50): page size, clamped to the range 1..100.

    Returns ``{'data': [...], 'pagination': {...}}`` where each item carries
    the cross-reference columns plus the OEM number and name of the joined
    part. Any failure yields a 500 with the exception text.
    """
    session = Session()
    try:
        # Clamp both values: a non-positive page would produce a negative
        # OFFSET and a zero page size would divide by zero below.
        page = max(request.args.get('page', 1, type=int), 1)
        per_page = max(min(request.args.get('per_page', 50, type=int), 100), 1)
        offset = (page - 1) * per_page
        total_count = session.execute(text("SELECT COUNT(*) FROM part_cross_references")).scalar()
        rows = session.execute(text("""
            SELECT pcr.id_part_cross_ref AS id, pcr.part_id, pcr.cross_reference_number,
            rt.name_ref_type AS reference_type, pcr.source_ref AS source, pcr.notes,
            p.oem_part_number, p.name_part AS part_name
            FROM part_cross_references pcr
            JOIN parts p ON pcr.part_id = p.id_part
            LEFT JOIN reference_type rt ON pcr.id_ref_type = rt.id_ref_type
            ORDER BY pcr.id_part_cross_ref DESC LIMIT :limit OFFSET :offset
        """), {'limit': per_page, 'offset': offset}).mappings().all()
        columns = ('id', 'part_id', 'cross_reference_number', 'reference_type',
                   'source', 'notes', 'oem_part_number', 'part_name')
        refs = [{col: r[col] for col in columns} for r in rows]
        total_pages = (total_count + per_page - 1) // per_page  # ceiling division
        return jsonify({'data': refs, 'pagination': {'page': page, 'per_page': per_page,
                                                     'total': total_count, 'total_pages': total_pages}})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/admin/crossref', methods=['POST'])
def api_admin_create_crossref():
    """Create a part cross-reference from a JSON body.

    Required keys: ``part_id``, ``cross_reference_number``,
    ``reference_type`` (resolved by name in ``reference_type``; NULL when
    unmatched).
    Optional keys: ``source``, ``notes``.

    Returns ``{'id': <new id>, 'message': 'Cross-reference created'}``. A
    missing or non-object JSON body, or a missing required key, yields a 400;
    any other failure rolls back and yields a 500 with the exception text.
    """
    session = Session()
    try:
        # silent=True: a malformed body becomes None instead of raising an
        # HTTP error that the catch-all below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400
        required = ('part_id', 'cross_reference_number', 'reference_type')
        missing = [field for field in required if field not in data]
        if missing:
            return jsonify({'error': f"Missing required fields: {', '.join(missing)}"}), 400
        result = session.execute(text("""
            INSERT INTO part_cross_references (part_id, cross_reference_number, id_ref_type, source_ref, notes)
            VALUES (:part_id, :cross_ref,
            (SELECT id_ref_type FROM reference_type WHERE name_ref_type = :ref_type),
            :source, :notes)
            RETURNING id_part_cross_ref
        """), {'part_id': data['part_id'], 'cross_ref': data['cross_reference_number'],
               'ref_type': data['reference_type'], 'source': data.get('source'), 'notes': data.get('notes')})
        new_id = result.scalar()
        session.commit()
        return jsonify({'id': new_id, 'message': 'Cross-reference created'})
    except Exception as e:
        session.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/admin/crossref/<int:crossref_id>', methods=['PUT'])
def api_admin_update_crossref(crossref_id):
    """Overwrite the editable columns of cross-reference ``crossref_id``.

    Required JSON keys: ``part_id``, ``cross_reference_number``,
    ``reference_type`` (resolved by name; NULL when unmatched).
    Optional keys: ``source``, ``notes``; stored as NULL when absent.

    Returns a JSON confirmation. A missing or non-object JSON body, or a
    missing required key, yields a 400; any other failure rolls back and
    yields a 500 with the exception text.
    """
    session = Session()
    try:
        # silent=True: a malformed body becomes None instead of raising an
        # HTTP error that the catch-all below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400
        required = ('part_id', 'cross_reference_number', 'reference_type')
        missing = [field for field in required if field not in data]
        if missing:
            return jsonify({'error': f"Missing required fields: {', '.join(missing)}"}), 400
        session.execute(text("""
            UPDATE part_cross_references SET part_id = :part_id, cross_reference_number = :cross_ref,
            id_ref_type = (SELECT id_ref_type FROM reference_type WHERE name_ref_type = :ref_type),
            source_ref = :source, notes = :notes
            WHERE id_part_cross_ref = :id
        """), {'part_id': data['part_id'], 'cross_ref': data['cross_reference_number'],
               'ref_type': data['reference_type'], 'source': data.get('source'),
               'notes': data.get('notes'), 'id': crossref_id})
        session.commit()
        return jsonify({'message': 'Cross-reference updated'})
    except Exception as e:
        session.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/admin/crossref/<int:crossref_id>', methods=['DELETE'])
def api_admin_delete_crossref(crossref_id):
    """Delete the cross-reference whose primary key is ``crossref_id``.

    Returns a JSON confirmation after commit; on any exception the
    transaction is rolled back and a 500 with the exception text is returned.
    """
    db = Session()
    try:
        removal = text("DELETE FROM part_cross_references WHERE id_part_cross_ref = :id")
        db.execute(removal, {'id': crossref_id})
        db.commit()
        return jsonify({'message': 'Cross-reference deleted'})
    except Exception as exc:
        db.rollback()
        return jsonify({'error': str(exc)}), 500
    finally:
        db.close()
# ---- Fitment CRUD ----
@app.route('/api/admin/fitment')
def api_admin_list_fitment():
    """Return one page of vehicle/part fitment rows, newest first.

    Query string:
        page (int, default 1): 1-based page number; values below 1 are
            treated as 1.
        per_page (int, default 50): page size, clamped to the range 1..500.
        mye_id (int, optional): restrict to one model-year-engine id.
        brand (str, optional): matched against the brand name with ILIKE.
        model (str, optional): matched against the model name with ILIKE.

    Returns ``{'data': [...], 'pagination': {...}}``; ``total`` counts the
    rows matching the same filters. Any failure yields a 500 with the
    exception text.
    """
    session = Session()
    try:
        # Clamp both values: a non-positive page would produce a negative
        # OFFSET and a zero page size would divide by zero below.
        page = max(request.args.get('page', 1, type=int), 1)
        per_page = max(min(request.args.get('per_page', 50, type=int), 500), 1)
        offset = (page - 1) * per_page
        brand = request.args.get('brand')
        model = request.args.get('model')
        mye_id = request.args.get('mye_id', type=int)

        # Filter binds are kept apart from the paging binds so the COUNT
        # query only receives parameters it actually references.
        where = " WHERE 1=1"
        filters = {}
        if mye_id:
            where += " AND vp.model_year_engine_id = :mye_id"
            filters['mye_id'] = mye_id
        if brand:
            where += " AND b.name_brand ILIKE :brand"
            filters['brand'] = brand
        if model:
            where += " AND m.name_model ILIKE :model"
            filters['model'] = model

        base = """
            FROM vehicle_parts vp
            JOIN model_year_engine mye ON vp.model_year_engine_id = mye.id_mye
            JOIN models m ON mye.model_id = m.id_model
            JOIN brands b ON m.brand_id = b.id_brand
        """
        total_count = session.execute(text("SELECT COUNT(*) " + base + where), filters).scalar()
        rows = session.execute(text("""
            SELECT vp.id_vehicle_part AS id, vp.model_year_engine_id, vp.part_id,
            vp.quantity_required, pp.name_position_part AS position, vp.fitment_notes,
            b.name_brand AS brand, m.name_model AS model, y.year_car AS year,
            e.name_engine AS engine, p.oem_part_number, p.name_part AS part_name
            FROM vehicle_parts vp
            JOIN model_year_engine mye ON vp.model_year_engine_id = mye.id_mye
            JOIN models m ON mye.model_id = m.id_model
            JOIN brands b ON m.brand_id = b.id_brand
            JOIN years y ON mye.year_id = y.id_year
            JOIN engines e ON mye.engine_id = e.id_engine
            JOIN parts p ON vp.part_id = p.id_part
            LEFT JOIN position_part pp ON vp.id_position_part = pp.id_position_part
        """ + where + " ORDER BY vp.id_vehicle_part DESC LIMIT :limit OFFSET :offset"),
            {**filters, 'limit': per_page, 'offset': offset}).mappings().all()
        fitments = [dict(r) for r in rows]
        total_pages = (total_count + per_page - 1) // per_page  # ceiling division
        return jsonify({'data': fitments, 'pagination': {'page': page, 'per_page': per_page,
                                                         'total': total_count, 'total_pages': total_pages}})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/admin/fitment', methods=['POST'])
def api_admin_create_fitment():
    """Link a part to a model-year-engine combination from a JSON body.

    Required keys: ``model_year_engine_id``, ``part_id``.
    Optional keys: ``quantity_required`` (defaults to 1), ``position``
    (resolved by name in ``position_part``; NULL when absent or unmatched),
    ``fitment_notes``.

    Returns ``{'id': <new id>, 'message': 'Fitment created'}``. A missing or
    non-object JSON body, a missing required key, or an integrity violation
    yields a 400; any other failure rolls back and yields a 500 with the
    exception text.
    """
    session = Session()
    try:
        # silent=True: a malformed body becomes None instead of raising an
        # HTTP error that the catch-all below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400
        required = ('model_year_engine_id', 'part_id')
        missing = [field for field in required if field not in data]
        if missing:
            return jsonify({'error': f"Missing required fields: {', '.join(missing)}"}), 400
        result = session.execute(text("""
            INSERT INTO vehicle_parts (model_year_engine_id, part_id, quantity_required,
            id_position_part, fitment_notes)
            VALUES (:mye_id, :part_id, :qty,
            (SELECT id_position_part FROM position_part WHERE name_position_part = :position),
            :notes)
            RETURNING id_vehicle_part
        """), {'mye_id': data['model_year_engine_id'], 'part_id': data['part_id'],
               'qty': data.get('quantity_required', 1), 'position': data.get('position'),
               'notes': data.get('fitment_notes')})
        new_id = result.scalar()
        session.commit()
        return jsonify({'id': new_id, 'message': 'Fitment created'})
    except IntegrityError:
        # Any integrity violation is reported to the client as a duplicate.
        session.rollback()
        return jsonify({'error': 'Este fitment ya existe'}), 400
    except Exception as e:
        session.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
@app.route('/api/admin/fitment/<int:fitment_id>', methods=['DELETE'])
def api_admin_delete_fitment(fitment_id):
    """Delete the fitment row whose primary key is ``fitment_id``.

    Returns a JSON confirmation after commit; on any exception the
    transaction is rolled back and a 500 with the exception text is returned.
    """
    db = Session()
    try:
        removal = text("DELETE FROM vehicle_parts WHERE id_vehicle_part = :id")
        db.execute(removal, {'id': fitment_id})
        db.commit()
        return jsonify({'message': 'Fitment deleted'})
    except Exception as exc:
        db.rollback()
        return jsonify({'error': str(exc)}), 500
    finally:
        db.close()
# ============================================================================
# CSV Import
# ============================================================================
@app.route('/api/admin/import/<import_type>', methods=['POST'])
def api_admin_import_csv(import_type):
    """Bulk-insert records of one entity type from a JSON body.

    URL parameter:
        import_type: one of ``categories``, ``groups``, ``parts``,
            ``manufacturers``, ``aftermarket``, ``crossref``, ``fitment``.

    JSON body: ``{'records': [<dict>, ...]}`` where each record uses the same
    keys as the matching single-item create endpoint.

    Each row is inserted inside its own SAVEPOINT. On PostgreSQL a failed
    statement aborts the surrounding transaction, so without the savepoint
    one bad row would make every later row fail and the final COMMIT would
    persist nothing. With it, a bad row is rolled back on its own and the
    remaining rows are still imported.

    Returns ``{'imported': <count>}`` plus, when rows failed, ``errors`` with
    at most the first 10 messages. An empty or missing record list or an
    unknown import type yields a 400; an unexpected failure yields a 500.
    """
    # import_type -> (INSERT statement, function building its bind params).
    importers = {
        'categories': (
            "INSERT INTO part_categories (name_part_category, name_es, slug, icon_name, display_order) VALUES (:name, :name_es, :slug, :icon_name, :do)",
            lambda rec: {
                'name': rec['name'], 'name_es': rec.get('name_es'),
                'slug': rec.get('slug') or rec['name'].lower().replace(' ', '-'),
                'icon_name': rec.get('icon_name'), 'do': rec.get('display_order', 0),
            },
        ),
        'groups': (
            "INSERT INTO part_groups (category_id, name_part_group, name_es, display_order) VALUES (:cid, :name, :name_es, :do)",
            lambda rec: {
                'cid': rec['category_id'], 'name': rec['name'],
                'name_es': rec.get('name_es'), 'do': rec.get('display_order', 0),
            },
        ),
        'parts': (
            """INSERT INTO parts (oem_part_number, name_part, name_es, group_id, description, description_es, weight_kg, id_material)
            VALUES (:oem, :name, :name_es, :gid, :desc, :desc_es, :weight, (SELECT id_material FROM materials WHERE name_material = :material))""",
            lambda rec: {
                'oem': rec['oem_part_number'], 'name': rec['name'],
                'name_es': rec.get('name_es'), 'gid': rec['group_id'],
                'desc': rec.get('description'), 'desc_es': rec.get('description_es'),
                'weight': rec.get('weight_kg'), 'material': rec.get('material'),
            },
        ),
        'manufacturers': (
            """INSERT INTO manufacturers (name_manufacture, id_type_manu, id_quality_tier, id_country, website)
            VALUES (:name, (SELECT id_type_manu FROM manufacture_type WHERE name_type_manu = :type),
            (SELECT id_quality_tier FROM quality_tier WHERE name_quality = :qt),
            (SELECT id_country FROM countries WHERE name_country = :country), :website)""",
            lambda rec: {
                'name': rec['name'], 'type': rec.get('type', 'aftermarket'),
                'qt': rec.get('quality_tier', 'standard'),
                'country': rec.get('country'), 'website': rec.get('website'),
            },
        ),
        'aftermarket': (
            """INSERT INTO aftermarket_parts (oem_part_id, manufacturer_id, part_number, name_aftermarket_parts, name_es, id_quality_tier, price_usd, warranty_months)
            VALUES (:oem_part_id, :mid, :pn, :name, :name_es,
            (SELECT id_quality_tier FROM quality_tier WHERE name_quality = :qt), :price, :warranty)""",
            lambda rec: {
                'oem_part_id': rec['oem_part_id'], 'mid': rec['manufacturer_id'],
                'pn': rec['part_number'], 'name': rec.get('name'),
                'name_es': rec.get('name_es'), 'qt': rec.get('quality_tier', 'standard'),
                'price': rec.get('price_usd'), 'warranty': rec.get('warranty_months'),
            },
        ),
        'crossref': (
            """INSERT INTO part_cross_references (part_id, cross_reference_number, id_ref_type, source_ref, notes)
            VALUES (:pid, :cross_ref, (SELECT id_ref_type FROM reference_type WHERE name_ref_type = :ref_type), :source, :notes)""",
            lambda rec: {
                'pid': rec['part_id'], 'cross_ref': rec['cross_reference_number'],
                'ref_type': rec.get('reference_type'),
                'source': rec.get('source'), 'notes': rec.get('notes'),
            },
        ),
        'fitment': (
            """INSERT INTO vehicle_parts (model_year_engine_id, part_id, quantity_required, id_position_part, fitment_notes)
            VALUES (:mye_id, :pid, :qty, (SELECT id_position_part FROM position_part WHERE name_position_part = :position), :notes)
            ON CONFLICT (model_year_engine_id, part_id, id_position_part) DO NOTHING""",
            lambda rec: {
                'mye_id': rec['model_year_engine_id'], 'pid': rec['part_id'],
                'qty': rec.get('quantity_required', 1),
                'position': rec.get('position'), 'notes': rec.get('fitment_notes'),
            },
        ),
    }

    session = Session()
    try:
        # silent=True: a malformed body becomes None instead of raising an
        # HTTP error that the catch-all below would report as a 500.
        data = request.get_json(silent=True)
        records = data.get('records', []) if isinstance(data, dict) else []
        if not isinstance(records, list) or not records:
            return jsonify({'error': 'No records to import'}), 400
        if import_type not in importers:
            return jsonify({'error': f'Unknown import type: {import_type}'}), 400

        sql, build_params = importers[import_type]
        statement = text(sql)
        imported = 0
        errors = []
        for row_number, rec in enumerate(records, start=1):
            try:
                params = build_params(rec)
                # SAVEPOINT per row: a failure rolls back this row only and
                # leaves the outer transaction usable for the next one.
                with session.begin_nested():
                    session.execute(statement, params)
                imported += 1
            except Exception as e:
                errors.append(f"Row {row_number}: {str(e)}")
        session.commit()
        result = {'imported': imported}
        if errors:
            result['errors'] = errors[:10]
        return jsonify(result)
    except Exception as e:
        session.rollback()
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
# ============================================================================
# CSV Export
# ============================================================================
@app.route('/api/admin/export/<export_type>')
def api_admin_export_csv(export_type):
    """Return one page of rows of the requested entity type as JSON.

    URL parameter:
        export_type: one of ``categories``, ``groups``, ``parts``,
            ``manufacturers``, ``aftermarket``, ``crossref``, ``fitment``.

    Query string:
        page (int, default 1): 1-based page number; values below 1 are
            treated as 1.
        per_page (int, default 1000): page size, clamped to 1..10000.

    Returns ``{'data': [...], 'pagination': {...}}``. An unknown export type
    yields a 400; any other failure yields a 500 with the exception text.
    """
    session = Session()
    try:
        # Clamp both values: a non-positive page would produce a negative
        # OFFSET and a zero page size would divide by zero below.
        page = max(request.args.get('page', 1, type=int), 1)
        per_page = max(min(request.args.get('per_page', 1000, type=int), 10000), 1)
        offset = (page - 1) * per_page
        # export_type -> (row query, table used for the total row count).
        export_queries = {
            'categories': ("SELECT id_part_category AS id, name_part_category AS name, name_es, slug, icon_name, display_order FROM part_categories ORDER BY display_order, name_part_category", "part_categories"),
            'groups': ("SELECT id_part_group AS id, category_id, name_part_group AS name, name_es, display_order FROM part_groups ORDER BY category_id, display_order, name_part_group", "part_groups"),
            'parts': ("SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, p.group_id, p.description, p.description_es, p.weight_kg, mat.name_material AS material FROM parts p LEFT JOIN materials mat ON p.id_material = mat.id_material ORDER BY p.id_part", "parts"),
            'manufacturers': ("SELECT mfr.id_manufacture AS id, mfr.name_manufacture AS name, mt.name_type_manu AS type, qt.name_quality AS quality_tier, co.name_country AS country, mfr.website FROM manufacturers mfr LEFT JOIN manufacture_type mt ON mfr.id_type_manu = mt.id_type_manu LEFT JOIN quality_tier qt ON mfr.id_quality_tier = qt.id_quality_tier LEFT JOIN countries co ON mfr.id_country = co.id_country ORDER BY mfr.name_manufacture", "manufacturers"),
            'aftermarket': ("SELECT ap.id_aftermarket_parts AS id, ap.oem_part_id, ap.manufacturer_id, ap.part_number, ap.name_aftermarket_parts AS name, ap.name_es, qt.name_quality AS quality_tier, ap.price_usd, ap.warranty_months FROM aftermarket_parts ap LEFT JOIN quality_tier qt ON ap.id_quality_tier = qt.id_quality_tier ORDER BY ap.id_aftermarket_parts", "aftermarket_parts"),
            'crossref': ("SELECT pcr.id_part_cross_ref AS id, pcr.part_id, pcr.cross_reference_number, rt.name_ref_type AS reference_type, pcr.source_ref AS source, pcr.notes FROM part_cross_references pcr LEFT JOIN reference_type rt ON pcr.id_ref_type = rt.id_ref_type ORDER BY pcr.id_part_cross_ref", "part_cross_references"),
            'fitment': ("SELECT vp.id_vehicle_part AS id, vp.model_year_engine_id, vp.part_id, vp.quantity_required, pp.name_position_part AS position, vp.fitment_notes FROM vehicle_parts vp LEFT JOIN position_part pp ON vp.id_position_part = pp.id_position_part ORDER BY vp.id_vehicle_part", "vehicle_parts"),
        }
        if export_type not in export_queries:
            return jsonify({'error': f'Unknown export type: {export_type}'}), 400
        base_query, table_name = export_queries[export_type]
        # table_name comes from the fixed mapping above, never from the
        # request, so interpolating it into the SQL is safe.
        total_count = session.execute(text(f"SELECT COUNT(*) FROM {table_name}")).scalar()
        rows = session.execute(text(base_query + " LIMIT :limit OFFSET :offset"),
                               {'limit': per_page, 'offset': offset}).mappings().all()
        data_list = [dict(r) for r in rows]
        total_pages = (total_count + per_page - 1) // per_page  # ceiling division
        return jsonify({'data': data_list, 'pagination': {'page': page, 'per_page': per_page,
                                                          'total': total_count, 'total_pages': total_pages}})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
# ============================================================================
# Image Upload
# ============================================================================
@app.route('/api/admin/upload-image', methods=['POST'])
def api_admin_upload_image():
    """Store a base64-encoded image sent as JSON and return its URL.

    JSON body: ``{'image': <str>}`` where the string is either a data URL
    (``<header>,<base64>``) or bare base64. The file extension is chosen from
    the header text (jpeg/jpg -> ``jpg``, ``gif``, ``webp``) and defaults to
    ``png``, including for bare base64.

    The file is written under ``static/parts_images`` relative to the current
    working directory, using a random UUID-based name.

    Returns ``{'url': ..., 'filename': ...}``. A missing or non-object body,
    a missing or non-string ``image`` value, or undecodable base64 yields a
    400; any other failure yields a 500 with the exception text.
    """
    try:
        # silent=True: a malformed body becomes None instead of raising an
        # HTTP error that the catch-all below would report as a 500.
        data = request.get_json(silent=True)
        image_data = data.get('image') if isinstance(data, dict) else None
        if not image_data or not isinstance(image_data, str):
            return jsonify({'error': 'No image data provided'}), 400

        header, separator, encoded = image_data.partition(',')
        ext = 'png'
        if separator:
            if 'jpeg' in header or 'jpg' in header:
                ext = 'jpg'
            elif 'gif' in header:
                ext = 'gif'
            elif 'webp' in header:
                ext = 'webp'
        else:
            # No comma: the whole payload is the base64 data.
            encoded = image_data

        try:
            image_bytes = base64.b64decode(encoded)
        except ValueError:
            # binascii.Error (bad padding) is a ValueError subclass; non-ASCII
            # input raises a plain ValueError. Both are client errors.
            return jsonify({'error': 'Invalid base64 image data'}), 400

        filename = f"{uuid.uuid4().hex}.{ext}"
        target_dir = os.path.join('static', 'parts_images')
        os.makedirs(target_dir, exist_ok=True)
        with open(os.path.join(target_dir, filename), 'wb') as f:
            f.write(image_bytes)
        return jsonify({'url': f"/static/parts_images/{filename}", 'filename': filename})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
# ============================================================================
# Diagrams by Category
# ============================================================================
@app.route('/api/vehicles/<int:mye_id>/diagrams/by-category')
def api_vehicle_diagrams_by_category(mye_id):
    """Return the diagrams linked to a vehicle, grouped by part category.

    URL parameter:
        mye_id: model-year-engine id whose diagrams are listed.

    Query string:
        category_id (int, optional): restrict the result to one category.

    Returns a JSON array with one object per category (``category_id``,
    ``category_name``, ``category_name_es``, ``diagrams``), in the order the
    query yields them. Each diagram's ``image_url`` is its stored image path
    with a leading ``/`` added when missing. Any failure yields a 500 with
    the exception text.
    """
    session = Session()
    try:
        category_id = request.args.get('category_id', type=int)
        params = {'mye_id': mye_id}
        cat_filter = ""
        if category_id:
            cat_filter = " AND pc.id_part_category = :cid"
            params['cid'] = category_id
        # PostgreSQL rejects SELECT DISTINCT when an ORDER BY expression is
        # not in the select list, so the three display_order columns are
        # selected too. They are only used for sorting and are not returned.
        rows = session.execute(text(f"""
            SELECT DISTINCT d.id_diagram AS id, d.name_diagram AS name, d.name_es,
            d.group_id, d.image_path, d.thumbnail_path,
            pg.name_part_group AS group_name, pg.name_es AS group_name_es,
            pc.id_part_category AS category_id, pc.name_part_category AS category_name,
            pc.name_es AS category_name_es, vd.notes,
            pc.display_order AS category_order, pg.display_order AS group_order,
            d.display_order AS diagram_order
            FROM vehicle_diagrams vd
            JOIN diagrams d ON vd.diagram_id = d.id_diagram
            JOIN part_groups pg ON d.group_id = pg.id_part_group
            JOIN part_categories pc ON pg.category_id = pc.id_part_category
            WHERE vd.model_year_engine_id = :mye_id {cat_filter}
            ORDER BY pc.display_order, pg.display_order, d.display_order, d.name_diagram
        """), params).mappings().all()
        categories = {}
        for r in rows:
            cid = r['category_id']
            bucket = categories.setdefault(cid, {
                'category_id': cid, 'category_name': r['category_name'],
                'category_name_es': r['category_name_es'], 'diagrams': []})
            ip = r['image_path'] or ''
            iu = '/' + ip if ip and not ip.startswith('/') else ip
            bucket['diagrams'].append({
                'id': r['id'], 'name': r['name'], 'name_es': r['name_es'],
                'group_id': r['group_id'], 'group_name': r['group_name'],
                'group_name_es': r['group_name_es'], 'image_url': iu,
                'thumbnail_path': r['thumbnail_path'], 'notes': r['notes']})
        return jsonify(list(categories.values()))
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        session.close()
# ============================================================================
# Hotspot CRUD
# ============================================================================
@app.route('/api/admin/hotspots', methods=['POST'])
def api_admin_create_hotspot():
    """Create a clickable hotspot on a diagram from a JSON body.

    Required keys: ``diagram_id`` and ``coords`` (both must be truthy).
    Optional keys: ``part_id``, ``callout_number``, ``shape`` (defaults to
    ``'circle'``; resolved by name in ``shapes``).

    Returns ``{'id': <new id>, 'message': 'Hotspot created'}`` with status
    201. An empty body or a missing required key yields a 400; any exception
    rolls back and yields a 500 with the exception text.
    """
    db = Session()
    try:
        payload = request.get_json()
        if not payload:
            return jsonify({'error': 'No data provided'}), 400

        diagram_id = payload.get('diagram_id')
        coords = payload.get('coords', '')
        if not (diagram_id and coords):
            return jsonify({'error': 'diagram_id and coords are required'}), 400

        bind = {
            'did': diagram_id,
            'pid': payload.get('part_id'),
            'callout': payload.get('callout_number'),
            'shape': payload.get('shape', 'circle'),
            'coords': coords,
        }
        inserted = db.execute(text("""
            INSERT INTO diagram_hotspots (diagram_id, part_id, callout_number, id_shape, coords)
            VALUES (:did, :pid, :callout,
            (SELECT id_shape FROM shapes WHERE name_shape = :shape), :coords)
            RETURNING id_dgr_hotspot
        """), bind)
        hotspot_id = inserted.scalar()
        db.commit()
        return jsonify({'id': hotspot_id, 'message': 'Hotspot created'}), 201
    except Exception as exc:
        db.rollback()
        return jsonify({'error': str(exc)}), 500
    finally:
        db.close()
@app.route('/api/admin/hotspots/<int:hotspot_id>', methods=['PUT'])
def api_admin_update_hotspot(hotspot_id):
    """Partially update hotspot ``hotspot_id`` from a JSON body.

    Only the keys present in the body are changed; recognised keys are
    ``part_id``, ``callout_number``, ``shape`` (resolved by name in
    ``shapes``) and ``coords``.

    Returns a JSON confirmation. An empty body or a body without any
    recognised key yields a 400, an unknown hotspot id yields a 404, and any
    exception rolls back and yields a 500 with the exception text.
    """
    db = Session()
    try:
        payload = request.get_json()
        if not payload:
            return jsonify({'error': 'No data provided'}), 400

        current = db.execute(
            text("SELECT id_dgr_hotspot FROM diagram_hotspots WHERE id_dgr_hotspot = :id"),
            {'id': hotspot_id},
        ).mappings().first()
        if not current:
            return jsonify({'error': 'Hotspot not found'}), 404

        # Body key -> SET clause; each bind parameter is named after its key.
        updatable = (
            ('part_id', "part_id = :part_id"),
            ('callout_number', "callout_number = :callout_number"),
            ('shape', "id_shape = (SELECT id_shape FROM shapes WHERE name_shape = :shape)"),
            ('coords', "coords = :coords"),
        )
        assignments = []
        bind = {'id': hotspot_id}
        for key, clause in updatable:
            if key in payload:
                assignments.append(clause)
                bind[key] = payload[key]
        if not assignments:
            return jsonify({'error': 'No fields to update'}), 400

        # Only the fixed clauses above are interpolated; values stay bound.
        db.execute(text(f"UPDATE diagram_hotspots SET {', '.join(assignments)} WHERE id_dgr_hotspot = :id"), bind)
        db.commit()
        return jsonify({'message': 'Hotspot updated'})
    except Exception as exc:
        db.rollback()
        return jsonify({'error': str(exc)}), 500
    finally:
        db.close()
@app.route('/api/admin/hotspots/<int:hotspot_id>', methods=['DELETE'])
def api_admin_delete_hotspot(hotspot_id):
    """Delete hotspot ``hotspot_id`` after checking that it exists.

    Returns a JSON confirmation after commit. An unknown id yields a 404;
    any exception rolls back and yields a 500 with the exception text.
    """
    db = Session()
    try:
        key = {'id': hotspot_id}
        current = db.execute(
            text("SELECT id_dgr_hotspot FROM diagram_hotspots WHERE id_dgr_hotspot = :id"),
            key,
        ).mappings().first()
        if not current:
            return jsonify({'error': 'Hotspot not found'}), 404
        db.execute(text("DELETE FROM diagram_hotspots WHERE id_dgr_hotspot = :id"), key)
        db.commit()
        return jsonify({'message': 'Hotspot deleted'})
    except Exception as exc:
        db.rollback()
        return jsonify({'error': str(exc)}), 500
    finally:
        db.close()
# ============================================================================
# Main Block
# ============================================================================
if __name__ == '__main__':
    # Script entry point: print the startup banner, then serve on all
    # interfaces at port 5000 with Flask debug mode off.
    for banner_line in (
        "Starting Nexus Autoparts Dashboard Server...",
        "Visit http://localhost:5000 to access the dashboard locally",
    ):
        print(banner_line)
    app.run(debug=False, host='0.0.0.0', port=5000)