- Add admin interface (admin.html, admin.js) for managing catalog data - Add enhanced search module with advanced filtering capabilities - Expand server.py with new API endpoints and admin functionality - Add Gonher catalog import scripts (import_gonher_catalog.py, import_gonher_complete.py) - Add demo data population script and sample CSV data - Update customer landing page and dashboard with UI improvements - Update database with enriched vehicle and parts data Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
3259 lines
108 KiB
Python
3259 lines
108 KiB
Python
from flask import Flask, render_template, jsonify, request, send_from_directory
|
|
import sqlite3
|
|
import os
|
|
|
|
# Flask application object; the route decorators in this module register on it.
# static_folder='.' sets Flask's static file folder to '.'.
app = Flask(__name__, static_folder='.')

# Database path - adjust as needed.
# NOTE(review): this is a relative path, so it is resolved against the
# process's current working directory -- confirm how the server is launched.
DATABASE_PATH = '../vehicle_database/vehicle_database.db'
|
|
|
|
def get_db_connection():
    """Open the SQLite database at DATABASE_PATH and return the connection.

    The connection's row factory is set to ``sqlite3.Row`` so that callers can
    read result columns by name (``row['name']``).
    """
    connection = sqlite3.connect(DATABASE_PATH)
    # sqlite3.Row enables column access by name on fetched rows.
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
def get_all_brands():
    """Return the names of all brands that have vehicles with parts.

    Returns:
        A list of distinct brand names, sorted by name.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT DISTINCT b.name
            FROM brands b
            JOIN models m ON m.brand_id = b.id
            JOIN model_year_engine mye ON mye.model_id = m.id
            JOIN vehicle_parts vp ON vp.model_year_engine_id = mye.id
            ORDER BY b.name
        """)
        rows = cursor.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return [row['name'] for row in rows]
|
|
|
|
def get_all_years():
    """Return all distinct years stored in the ``years`` table.

    Returns:
        A list of year values, newest first.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT DISTINCT year FROM years ORDER BY year DESC")
        rows = cursor.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return [row['year'] for row in rows]
|
|
|
|
def get_all_engines():
    """Return all distinct engine names stored in the ``engines`` table.

    Returns:
        A list of engine names, sorted by name.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT DISTINCT name FROM engines ORDER BY name")
        rows = cursor.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return [row['name'] for row in rows]
|
|
|
|
def get_models_by_brand(brand_name=None):
    """Return the names of models that have vehicles with parts.

    Args:
        brand_name: when truthy, only models belonging to the brand with this
            exact name are returned; otherwise models of every brand are
            returned.

    Returns:
        A list of distinct model names, sorted by name.
    """
    if brand_name:
        query = """
            SELECT DISTINCT m.name
            FROM models m
            JOIN brands b ON m.brand_id = b.id
            JOIN model_year_engine mye ON mye.model_id = m.id
            JOIN vehicle_parts vp ON vp.model_year_engine_id = mye.id
            WHERE b.name = ?
            ORDER BY m.name
        """
        params = (brand_name,)
    else:
        query = """
            SELECT DISTINCT m.name
            FROM models m
            JOIN model_year_engine mye ON mye.model_id = m.id
            JOIN vehicle_parts vp ON vp.model_year_engine_id = mye.id
            ORDER BY m.name
        """
        params = ()

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(query, params)
        rows = cursor.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return [row['name'] for row in rows]
|
|
|
|
def search_vehicles(brand=None, model=None, year=None, engine=None, with_parts=True):
    """Search for vehicle configurations matching the given filters.

    Args:
        brand: exact brand name to match; falsy means no brand filter.
        model: exact model name to match; falsy means no model filter.
        year: model year (int or numeric string); falsy means no year filter.
        engine: exact engine name to match; falsy means no engine filter.
        with_parts: when true (the default), only vehicles that have at least
            one row in ``vehicle_parts`` are returned.

    Returns:
        A list of dicts, one per matching model/year/engine row, ordered by
        brand, model and year. NULL numeric columns are reported as 0 and NULL
        text columns as 'unknown'.

    Raises:
        ValueError: if ``year`` is truthy but cannot be converted with int().
    """
    query = """
        SELECT
            b.name AS brand,
            m.name AS model,
            y.year,
            e.name AS engine,
            e.power_hp,
            e.torque_nm,
            e.displacement_cc,
            e.cylinders,
            e.fuel_type,
            mye.trim_level,
            mye.drivetrain,
            mye.transmission
        FROM model_year_engine mye
        JOIN models m ON mye.model_id = m.id
        JOIN brands b ON m.brand_id = b.id
        JOIN years y ON mye.year_id = y.id
        JOIN engines e ON mye.engine_id = e.id
    """

    # Only show vehicles that have parts
    if with_parts:
        query += " WHERE EXISTS (SELECT 1 FROM vehicle_parts vp WHERE vp.model_year_engine_id = mye.id)"
    else:
        query += " WHERE 1=1"

    # The query is built (including the int() conversion that may raise)
    # before a connection is opened, so invalid input cannot leak a connection.
    params = []
    if brand:
        query += " AND b.name = ?"
        params.append(brand)
    if model:
        query += " AND m.name = ?"
        params.append(model)
    if year:
        query += " AND y.year = ?"
        params.append(int(year))
    if engine:
        query += " AND e.name = ?"
        params.append(engine)

    query += " ORDER BY b.name, m.name, y.year"

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(query, params)
        results = cursor.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()

    # Convert to list of dictionaries, substituting defaults for NULL columns.
    return [
        {
            'brand': row['brand'],
            'model': row['model'],
            'year': row['year'],
            'engine': row['engine'],
            'power_hp': row['power_hp'] or 0,
            'torque_nm': row['torque_nm'] or 0,
            'displacement_cc': row['displacement_cc'] or 0,
            'cylinders': row['cylinders'] or 0,
            'fuel_type': row['fuel_type'] or 'unknown',
            'trim_level': row['trim_level'] or 'unknown',
            'drivetrain': row['drivetrain'] or 'unknown',
            'transmission': row['transmission'] or 'unknown',
        }
        for row in results
    ]
|
|
|
|
@app.route('/')
def index():
    """Return ``index.html`` from the '.' directory as the main dashboard page."""
    dashboard_page = 'index.html'
    return send_from_directory('.', dashboard_page)
|
|
|
|
@app.route('/<path:path>')
def static_files(path):
    """Serve the file at ``path`` from the '.' directory.

    NOTE(review): this catch-all serves any file under '.', which presumably
    includes this server module itself -- confirm that is intended.
    """
    requested_file = path
    return send_from_directory('.', requested_file)
|
|
|
|
@app.route('/api/brands')
def api_brands():
    """Respond with the JSON list of brand names from get_all_brands()."""
    return jsonify(get_all_brands())
|
|
|
|
@app.route('/api/years')
def api_years():
    """API endpoint to get years, optionally filtered by brand and/or model.

    Query parameters:
        brand: exact brand name (optional).
        model: exact model name (optional).

    Returns:
        A JSON list of distinct years, newest first.
    """
    brand = request.args.get('brand')
    model = request.args.get('model')

    query = """
        SELECT DISTINCT y.year
        FROM years y
        JOIN model_year_engine mye ON y.id = mye.year_id
        JOIN models m ON mye.model_id = m.id
        JOIN brands b ON m.brand_id = b.id
        WHERE 1=1
    """

    params = []
    if brand:
        query += " AND b.name = ?"
        params.append(brand)
    if model:
        query += " AND m.name = ?"
        params.append(model)

    query += " ORDER BY y.year DESC"

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(query, params)
        results = cursor.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()

    return jsonify([row['year'] for row in results])
|
|
|
|
@app.route('/api/engines')
def api_engines():
    """API endpoint to get engines, optionally filtered by brand, model, and/or year.

    Query parameters:
        brand: exact brand name (optional).
        model: exact model name (optional).
        year: integer model year (optional). A value that is not a valid
            integer is ignored, matching how /api/model-year-engine parses it.

    Returns:
        A JSON list of distinct engine names, sorted by name.
    """
    brand = request.args.get('brand')
    model = request.args.get('model')
    # type=int yields None for non-numeric input instead of raising ValueError.
    year = request.args.get('year', type=int)

    query = """
        SELECT DISTINCT e.name
        FROM engines e
        JOIN model_year_engine mye ON e.id = mye.engine_id
        JOIN models m ON mye.model_id = m.id
        JOIN brands b ON m.brand_id = b.id
        JOIN years y ON mye.year_id = y.id
        WHERE 1=1
    """

    params = []
    if brand:
        query += " AND b.name = ?"
        params.append(brand)
    if model:
        query += " AND m.name = ?"
        params.append(model)
    if year:
        query += " AND y.year = ?"
        params.append(year)

    query += " ORDER BY e.name"

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(query, params)
        results = cursor.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()

    return jsonify([row['name'] for row in results])
|
|
|
|
@app.route('/api/models')
def api_models():
    """Respond with the JSON list of model names, narrowed by the optional
    ``brand`` query parameter."""
    brand_filter = request.args.get('brand')
    return jsonify(get_models_by_brand(brand_filter))
|
|
|
|
@app.route('/api/vehicles')
def api_vehicles():
    """API endpoint to search for vehicles.

    Query parameters:
        brand, model, engine: exact names to match (all optional).
        year: integer model year (optional). A value that is not a valid
            integer is ignored rather than causing a server error.

    Returns:
        A JSON list of vehicle dicts as produced by search_vehicles().
    """
    brand = request.args.get('brand')
    model = request.args.get('model')
    # type=int yields None for non-numeric input, so search_vehicles() never
    # sees a string that its int() conversion would reject.
    year = request.args.get('year', type=int)
    engine = request.args.get('engine')

    vehicles = search_vehicles(brand, model, year, engine)
    return jsonify(vehicles)
|
|
|
|
# ============================================================================
|
|
# Parts Catalog API Endpoints
|
|
# ============================================================================
|
|
|
|
@app.route('/api/categories')
def api_categories():
    """API endpoint to get all part categories (hierarchical).

    Returns:
        A JSON list of root categories (those with no parent). Each category
        dict carries a ``children`` list holding its direct sub-categories.
        On failure, a JSON ``{'error': ...}`` body with status 500.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            # Get all categories
            cursor.execute("""
                SELECT id, name, name_es, slug, icon_name, display_order, parent_id
                FROM part_categories
                ORDER BY display_order, name
            """)
            all_categories = cursor.fetchall()
        finally:
            # Close the connection even when the query raises.
            conn.close()

        # Build hierarchical structure
        categories_dict = {}
        root_categories = []

        # First pass: create all category objects
        for row in all_categories:
            category = {
                'id': row['id'],
                'name': row['name'],
                'name_es': row['name_es'],
                'slug': row['slug'],
                'icon_name': row['icon_name'],
                'display_order': row['display_order'],
                'children': [],
            }
            categories_dict[row['id']] = category

            if row['parent_id'] is None:
                root_categories.append(category)

        # Second pass: attach each child to its parent. This must run after
        # the first pass because a child row may be fetched before its parent.
        for row in all_categories:
            parent_id = row['parent_id']
            if parent_id is not None and parent_id in categories_dict:
                categories_dict[parent_id]['children'].append(categories_dict[row['id']])

        return jsonify(root_categories)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/api/categories/<int:category_id>/groups')
def api_category_groups(category_id):
    """API endpoint to get groups for a specific category.

    Args:
        category_id: id of the part category, taken from the URL.

    Returns:
        A JSON list of group dicts ordered by display_order then name.
        On failure, a JSON ``{'error': ...}`` body with status 500.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, name, name_es, slug, display_order
                FROM part_groups
                WHERE category_id = ?
                ORDER BY display_order, name
            """, (category_id,))
            rows = cursor.fetchall()
        finally:
            # Close the connection even when the query raises.
            conn.close()

        groups = [
            {
                'id': row['id'],
                'name': row['name'],
                'name_es': row['name_es'],
                'slug': row['slug'],
                'display_order': row['display_order'],
            }
            for row in rows
        ]
        return jsonify(groups)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/api/parts')
def api_parts():
    """API endpoint to list parts with optional filters and pagination.

    Query parameters:
        group_id: integer part group id (optional).
        category_id: integer part category id (optional).
        search: substring matched with LIKE against name, name_es and
            oem_part_number (optional).
        page: 1-based page number (default 1; values below 1 are treated as 1).
        per_page: page size (default 50; clamped to the range 1..100).

    Returns:
        JSON ``{'data': [...], 'pagination': {...}}``.
        On failure, a JSON ``{'error': ...}`` body with status 500.
    """
    try:
        group_id = request.args.get('group_id', type=int)
        category_id = request.args.get('category_id', type=int)
        search = request.args.get('search')
        page = request.args.get('page', 1, type=int)
        per_page = request.args.get('per_page', 50, type=int)
        # Clamp pagination: a page below 1 would give a negative OFFSET, a
        # negative per_page would disable LIMIT in SQLite, and per_page == 0
        # would divide by zero when computing total_pages.
        page = max(page, 1)
        per_page = max(1, min(per_page, 100))  # Max 100 per page
        offset = (page - 1) * per_page

        # Build base WHERE clause for both count and data queries
        where_clause = " WHERE 1=1"
        params = []

        if group_id:
            where_clause += " AND p.group_id = ?"
            params.append(group_id)

        if category_id:
            where_clause += " AND pg.category_id = ?"
            params.append(category_id)

        if search:
            where_clause += " AND (p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ?)"
            search_term = f"%{search}%"
            params.extend([search_term, search_term, search_term])

        count_query = """
            SELECT COUNT(*) as total
            FROM parts p
            JOIN part_groups pg ON p.group_id = pg.id
            JOIN part_categories pc ON pg.category_id = pc.id
        """ + where_clause

        data_query = """
            SELECT
                p.id,
                p.oem_part_number,
                p.name,
                p.name_es,
                p.group_id,
                p.image_url,
                pg.name AS group_name,
                pc.name AS category_name
            FROM parts p
            JOIN part_groups pg ON p.group_id = pg.id
            JOIN part_categories pc ON pg.category_id = pc.id
        """ + where_clause + " ORDER BY p.name LIMIT ? OFFSET ?"

        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            # Get total count
            cursor.execute(count_query, params)
            total_count = cursor.fetchone()['total']

            # Get paginated data
            cursor.execute(data_query, params + [per_page, offset])
            rows = cursor.fetchall()
        finally:
            # Close the connection even when a query raises.
            conn.close()

        parts = [
            {
                'id': row['id'],
                'oem_part_number': row['oem_part_number'],
                'name': row['name'],
                'name_es': row['name_es'],
                'group_id': row['group_id'],
                'group_name': row['group_name'],
                'category_name': row['category_name'],
                'image_url': row['image_url'],
            }
            for row in rows
        ]

        # Ceiling division: number of pages needed to hold total_count rows.
        total_pages = (total_count + per_page - 1) // per_page
        return jsonify({
            'data': parts,
            'pagination': {
                'page': page,
                'per_page': per_page,
                'total': total_count,
                'total_pages': total_pages,
            },
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/api/parts/<int:part_id>')
def api_part_detail(part_id):
    """API endpoint to get single part details.

    Args:
        part_id: id of the part, taken from the URL.

    Returns:
        A JSON dict describing the part with its group and category names,
        a JSON error with status 404 when no such part exists, or a JSON
        ``{'error': ...}`` body with status 500 on failure.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT
                    p.id,
                    p.oem_part_number,
                    p.name,
                    p.name_es,
                    p.description,
                    p.description_es,
                    p.group_id,
                    p.image_url,
                    pg.name AS group_name,
                    pg.name_es AS group_name_es,
                    pc.id AS category_id,
                    pc.name AS category_name,
                    pc.name_es AS category_name_es
                FROM parts p
                JOIN part_groups pg ON p.group_id = pg.id
                JOIN part_categories pc ON pg.category_id = pc.id
                WHERE p.id = ?
            """, (part_id,))
            row = cursor.fetchone()
        finally:
            # Close the connection even when the query raises.
            conn.close()

        if row is None:
            return jsonify({'error': 'Part not found'}), 404

        part = {
            'id': row['id'],
            'oem_part_number': row['oem_part_number'],
            'name': row['name'],
            'name_es': row['name_es'],
            'description': row['description'],
            'description_es': row['description_es'],
            'group_id': row['group_id'],
            'group_name': row['group_name'],
            'image_url': row['image_url'],
            'group_name_es': row['group_name_es'],
            'category_id': row['category_id'],
            'category_name': row['category_name'],
            'category_name_es': row['category_name_es'],
        }

        return jsonify(part)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/api/vehicles/<int:mye_id>/categories')
def api_vehicle_categories(mye_id):
    """API endpoint to get categories that have parts for a specific vehicle.

    Args:
        mye_id: model_year_engine id of the vehicle, taken from the URL.

    Returns:
        A JSON list of category dicts ordered by display_order then name.
        On failure, a JSON ``{'error': ...}`` body with status 500.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT DISTINCT
                    pc.id,
                    pc.name,
                    pc.name_es,
                    pc.slug,
                    pc.icon_name,
                    pc.display_order
                FROM part_categories pc
                JOIN part_groups pg ON pg.category_id = pc.id
                JOIN parts p ON p.group_id = pg.id
                JOIN vehicle_parts vp ON vp.part_id = p.id
                WHERE vp.model_year_engine_id = ?
                ORDER BY pc.display_order, pc.name
            """, (mye_id,))
            rows = cursor.fetchall()
        finally:
            # Close the connection even when the query raises.
            conn.close()

        categories = [
            {
                'id': row['id'],
                'name': row['name'],
                'name_es': row['name_es'],
                'slug': row['slug'],
                'icon_name': row['icon_name'],
                'display_order': row['display_order'],
            }
            for row in rows
        ]
        return jsonify(categories)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/api/vehicles/<int:mye_id>/groups')
def api_vehicle_groups(mye_id):
    """API endpoint to get groups that have parts for a specific vehicle within a category.

    Args:
        mye_id: model_year_engine id of the vehicle, taken from the URL.

    Query parameters:
        category_id: integer part category id to restrict groups to (optional).

    Returns:
        A JSON list of group dicts, each with a ``parts_count`` of distinct
        parts linked to the vehicle. On failure, a JSON ``{'error': ...}``
        body with status 500.
    """
    try:
        category_id = request.args.get('category_id', type=int)

        query = """
            SELECT DISTINCT
                pg.id,
                pg.name,
                pg.name_es,
                pg.slug,
                pg.display_order,
                COUNT(DISTINCT p.id) as parts_count
            FROM part_groups pg
            JOIN parts p ON p.group_id = pg.id
            JOIN vehicle_parts vp ON vp.part_id = p.id
            WHERE vp.model_year_engine_id = ?
        """
        params = [mye_id]

        if category_id:
            query += " AND pg.category_id = ?"
            params.append(category_id)

        query += " GROUP BY pg.id ORDER BY pg.display_order, pg.name"

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(query, params)
            rows = cursor.fetchall()
        finally:
            # Close the connection even when the query raises.
            conn.close()

        groups = [
            {
                'id': row['id'],
                'name': row['name'],
                'name_es': row['name_es'],
                'slug': row['slug'],
                'display_order': row['display_order'],
                'parts_count': row['parts_count'],
            }
            for row in rows
        ]
        return jsonify(groups)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/api/vehicles/<int:mye_id>/parts')
def api_vehicle_parts(mye_id):
    """API endpoint to get parts for a specific vehicle.

    Args:
        mye_id: model_year_engine id of the vehicle, taken from the URL.

    Query parameters:
        category_id: integer part category id (optional).
        group_id: integer part group id (optional).

    Returns:
        A JSON list of part dicts ordered by category, group and part name.
        On failure, a JSON ``{'error': ...}`` body with status 500.
    """
    try:
        category_id = request.args.get('category_id', type=int)
        group_id = request.args.get('group_id', type=int)

        query = """
            SELECT
                p.id,
                p.oem_part_number,
                p.name,
                p.name_es,
                vp.quantity_required,
                vp.position,
                pc.name AS category_name,
                pg.name AS group_name
            FROM vehicle_parts vp
            JOIN parts p ON vp.part_id = p.id
            JOIN part_groups pg ON p.group_id = pg.id
            JOIN part_categories pc ON pg.category_id = pc.id
            WHERE vp.model_year_engine_id = ?
        """
        params = [mye_id]

        if category_id:
            query += " AND pc.id = ?"
            params.append(category_id)

        if group_id:
            query += " AND pg.id = ?"
            params.append(group_id)

        query += " ORDER BY pc.display_order, pg.display_order, p.name"

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(query, params)
            rows = cursor.fetchall()
        finally:
            # Close the connection even when the query raises.
            conn.close()

        parts = [
            {
                'id': row['id'],
                'oem_part_number': row['oem_part_number'],
                'name': row['name'],
                'name_es': row['name_es'],
                'quantity_required': row['quantity_required'],
                'position': row['position'],
                'category_name': row['category_name'],
                'group_name': row['group_name'],
            }
            for row in rows
        ]
        return jsonify(parts)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/api/model-year-engine')
def api_model_year_engine():
    """API endpoint to get model_year_engine records with filters.

    Query parameters:
        brand: exact brand name (optional).
        model: exact model name (optional).
        year: integer model year (optional).
        with_parts: 'true' (default, case-insensitive) to return only vehicles
            that have parts; any other value returns all vehicles.

    Returns:
        A JSON list of record dicts ordered by brand, model, year and engine.
        On failure, a JSON ``{'error': ...}`` body with status 500.
    """
    try:
        brand = request.args.get('brand')
        model = request.args.get('model')
        year = request.args.get('year', type=int)
        with_parts = request.args.get('with_parts', 'true').lower() == 'true'

        query = """
            SELECT
                mye.id,
                b.name AS brand,
                m.name AS model,
                y.year,
                e.name AS engine,
                mye.trim_level,
                mye.drivetrain,
                mye.transmission
            FROM model_year_engine mye
            JOIN models m ON mye.model_id = m.id
            JOIN brands b ON m.brand_id = b.id
            JOIN years y ON mye.year_id = y.id
            JOIN engines e ON mye.engine_id = e.id
        """

        # Only show vehicles that have parts
        if with_parts:
            query += " WHERE EXISTS (SELECT 1 FROM vehicle_parts vp WHERE vp.model_year_engine_id = mye.id)"
        else:
            query += " WHERE 1=1"

        params = []

        if brand:
            query += " AND b.name = ?"
            params.append(brand)

        if model:
            query += " AND m.name = ?"
            params.append(model)

        if year:
            query += " AND y.year = ?"
            params.append(year)

        query += " ORDER BY b.name, m.name, y.year, e.name"

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(query, params)
            rows = cursor.fetchall()
        finally:
            # Close the connection even when the query raises.
            conn.close()

        records = [
            {
                'id': row['id'],
                'brand': row['brand'],
                'model': row['model'],
                'year': row['year'],
                'engine': row['engine'],
                'trim_level': row['trim_level'],
                'drivetrain': row['drivetrain'],
                'transmission': row['transmission'],
            }
            for row in rows
        ]
        return jsonify(records)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
# ============================================================================
|
|
# PHASE 2: Cross-References and Aftermarket API Endpoints
|
|
# ============================================================================
|
|
|
|
@app.route('/api/manufacturers')
def api_manufacturers():
    """Get all manufacturers, optionally filtered by type and quality tier.

    Query parameters:
        type: exact manufacturer type to match (optional).
        quality_tier: exact quality tier to match (optional).

    Returns:
        A JSON list of manufacturer dicts sorted by name.
        On failure, a JSON ``{'error': ...}`` body with status 500.
    """
    try:
        manufacturer_type = request.args.get('type')
        quality_tier = request.args.get('quality_tier')

        query = """
            SELECT id, name, type, quality_tier, country, logo_url, website
            FROM manufacturers
            WHERE 1=1
        """
        params = []

        if manufacturer_type:
            query += " AND type = ?"
            params.append(manufacturer_type)

        if quality_tier:
            query += " AND quality_tier = ?"
            params.append(quality_tier)

        query += " ORDER BY name"

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(query, params)
            rows = cursor.fetchall()
        finally:
            # Close the connection even when the query raises.
            conn.close()

        manufacturers = [
            {
                'id': row['id'],
                'name': row['name'],
                'type': row['type'],
                'quality_tier': row['quality_tier'],
                'country': row['country'],
                'logo_url': row['logo_url'],
                'website': row['website'],
            }
            for row in rows
        ]
        return jsonify(manufacturers)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/parts/<int:part_id>/alternatives')
def api_part_alternatives(part_id):
    """Get aftermarket alternatives for an OEM part.

    Args:
        part_id: id of the OEM part, taken from the URL.

    Query parameters:
        quality_tier: exact quality tier to match (optional).
        manufacturer_id: integer manufacturer id (optional).

    Returns:
        A JSON list of aftermarket part dicts ordered by quality tier
        (descending) then price (ascending). ``in_stock`` is a boolean, or
        null when the stored value is NULL. On failure, a JSON
        ``{'error': ...}`` body with status 500.
    """
    try:
        quality_tier = request.args.get('quality_tier')
        manufacturer_id = request.args.get('manufacturer_id', type=int)

        query = """
            SELECT
                ap.id,
                ap.part_number,
                ap.name,
                ap.name_es,
                m.name AS manufacturer_name,
                ap.manufacturer_id,
                ap.quality_tier,
                ap.price_usd,
                ap.warranty_months,
                ap.in_stock
            FROM aftermarket_parts ap
            JOIN manufacturers m ON ap.manufacturer_id = m.id
            WHERE ap.oem_part_id = ?
        """
        params = [part_id]

        if quality_tier:
            query += " AND ap.quality_tier = ?"
            params.append(quality_tier)

        if manufacturer_id:
            query += " AND ap.manufacturer_id = ?"
            params.append(manufacturer_id)

        query += " ORDER BY ap.quality_tier DESC, ap.price_usd ASC"

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(query, params)
            rows = cursor.fetchall()
        finally:
            # Close the connection even when the query raises.
            conn.close()

        alternatives = [
            {
                'id': row['id'],
                'part_number': row['part_number'],
                'name': row['name'],
                'name_es': row['name_es'],
                'manufacturer_name': row['manufacturer_name'],
                'manufacturer_id': row['manufacturer_id'],
                'quality_tier': row['quality_tier'],
                'price_usd': row['price_usd'],
                'warranty_months': row['warranty_months'],
                # Preserve NULL as null instead of coercing it to False.
                'in_stock': bool(row['in_stock']) if row['in_stock'] is not None else None,
            }
            for row in rows
        ]
        return jsonify(alternatives)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/parts/<int:part_id>/cross-references')
def api_part_cross_references(part_id):
    """Get cross-reference numbers for a part.

    Args:
        part_id: id of the part, taken from the URL.

    Returns:
        A JSON list of cross-reference dicts ordered by reference type then
        number. On failure, a JSON ``{'error': ...}`` body with status 500.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, cross_reference_number, reference_type, source, notes
                FROM part_cross_references
                WHERE part_id = ?
                ORDER BY reference_type, cross_reference_number
            """, (part_id,))
            rows = cursor.fetchall()
        finally:
            # Close the connection even when the query raises.
            conn.close()

        cross_references = [
            {
                'id': row['id'],
                'cross_reference_number': row['cross_reference_number'],
                'reference_type': row['reference_type'],
                'source': row['source'],
                'notes': row['notes'],
            }
            for row in rows
        ]
        return jsonify(cross_references)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/search/part-number/<part_number>')
def api_search_part_number(part_number):
    """Search for parts by any part number (OEM, aftermarket, or cross-ref).

    Args:
        part_number: text taken from the URL; matched as a substring (LIKE)
            against OEM numbers, aftermarket numbers and cross-reference
            numbers.

    Returns:
        A JSON list of match dicts: OEM matches first, then aftermarket, then
        cross-reference matches. Each dict names the OEM part, the
        ``match_type`` and the ``matched_number`` that hit. On failure, a JSON
        ``{'error': ...}`` body with status 500.
    """
    # One entry per lookup: (match_type, column holding the matched number, SQL).
    searches = (
        ('oem', 'oem_part_number', """
            SELECT id, oem_part_number, name, name_es
            FROM parts
            WHERE oem_part_number LIKE ?
        """),
        ('aftermarket', 'part_number', """
            SELECT p.id, p.oem_part_number, p.name, p.name_es, ap.part_number
            FROM aftermarket_parts ap
            JOIN parts p ON ap.oem_part_id = p.id
            WHERE ap.part_number LIKE ?
        """),
        ('cross_reference', 'cross_reference_number', """
            SELECT p.id, p.oem_part_number, p.name, p.name_es, pcr.cross_reference_number
            FROM part_cross_references pcr
            JOIN parts p ON pcr.part_id = p.id
            WHERE pcr.cross_reference_number LIKE ?
        """),
    )

    try:
        results = []
        search_term = f"%{part_number}%"

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            for match_type, matched_column, query in searches:
                cursor.execute(query, (search_term,))
                for row in cursor.fetchall():
                    results.append({
                        'id': row['id'],
                        'oem_part_number': row['oem_part_number'],
                        'name': row['name'],
                        'name_es': row['name_es'],
                        'match_type': match_type,
                        'matched_number': row[matched_column],
                    })
        finally:
            # Close the connection even when a query raises.
            conn.close()

        return jsonify(results)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/aftermarket')
def api_aftermarket_parts():
    """List aftermarket parts with filters and pagination.

    Query parameters:
        manufacturer_id: integer manufacturer id (optional).
        quality_tier: exact quality tier to match (optional).
        search: substring matched with LIKE against the aftermarket name,
            aftermarket part number and OEM part number (optional).
        page: 1-based page number (default 1; values below 1 are treated as 1).
        per_page: page size (default 50; clamped to the range 1..100).

    Returns:
        JSON ``{'data': [...], 'pagination': {...}}``.
        On failure, a JSON ``{'error': ...}`` body with status 500.
    """
    try:
        manufacturer_id = request.args.get('manufacturer_id', type=int)
        quality_tier = request.args.get('quality_tier')
        search = request.args.get('search')
        page = request.args.get('page', 1, type=int)
        per_page = request.args.get('per_page', 50, type=int)
        # Clamp pagination: a page below 1 would give a negative OFFSET, a
        # negative per_page would disable LIMIT in SQLite, and per_page == 0
        # would divide by zero when computing total_pages.
        page = max(page, 1)
        per_page = max(1, min(per_page, 100))  # Max 100 per page
        offset = (page - 1) * per_page

        # Build base WHERE clause for both count and data queries
        where_clause = " WHERE 1=1"
        params = []

        if manufacturer_id:
            where_clause += " AND ap.manufacturer_id = ?"
            params.append(manufacturer_id)

        if quality_tier:
            where_clause += " AND ap.quality_tier = ?"
            params.append(quality_tier)

        if search:
            where_clause += " AND (ap.name LIKE ? OR ap.part_number LIKE ? OR p.oem_part_number LIKE ?)"
            search_term = f"%{search}%"
            params.extend([search_term, search_term, search_term])

        count_query = """
            SELECT COUNT(*) as total
            FROM aftermarket_parts ap
            JOIN parts p ON ap.oem_part_id = p.id
            JOIN manufacturers m ON ap.manufacturer_id = m.id
        """ + where_clause

        data_query = """
            SELECT
                ap.id,
                ap.part_number,
                ap.name,
                p.oem_part_number,
                m.name AS manufacturer_name,
                ap.quality_tier,
                ap.price_usd
            FROM aftermarket_parts ap
            JOIN parts p ON ap.oem_part_id = p.id
            JOIN manufacturers m ON ap.manufacturer_id = m.id
        """ + where_clause + " ORDER BY ap.name LIMIT ? OFFSET ?"

        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            # Get total count
            cursor.execute(count_query, params)
            total_count = cursor.fetchone()['total']

            # Get paginated data
            cursor.execute(data_query, params + [per_page, offset])
            rows = cursor.fetchall()
        finally:
            # Close the connection even when a query raises.
            conn.close()

        parts = [
            {
                'id': row['id'],
                'part_number': row['part_number'],
                'name': row['name'],
                'oem_part_number': row['oem_part_number'],
                'manufacturer_name': row['manufacturer_name'],
                'quality_tier': row['quality_tier'],
                'price_usd': row['price_usd'],
            }
            for row in rows
        ]

        # Ceiling division: number of pages needed to hold total_count rows.
        total_pages = (total_count + per_page - 1) // per_page
        return jsonify({
            'data': parts,
            'pagination': {
                'page': page,
                'per_page': per_page,
                'total': total_count,
                'total_pages': total_pages,
            },
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
# ============================================================================
|
|
# PHASE 3: Exploded Diagrams API Endpoints
|
|
# ============================================================================
|
|
|
|
@app.route('/api/diagrams')
def api_diagrams():
    """Get all diagrams, optionally filtered by group_id.

    Query parameters:
        group_id: integer part group id (optional).

    Returns:
        A JSON list of diagram summary dicts ordered by display_order then
        name. On failure, a JSON ``{'error': ...}`` body with status 500.
    """
    try:
        group_id = request.args.get('group_id', type=int)

        query = """
            SELECT
                d.id,
                d.name,
                d.name_es,
                d.group_id,
                pg.name AS group_name,
                d.thumbnail_path,
                d.display_order
            FROM diagrams d
            JOIN part_groups pg ON d.group_id = pg.id
            WHERE 1=1
        """
        params = []

        if group_id:
            query += " AND d.group_id = ?"
            params.append(group_id)

        query += " ORDER BY d.display_order, d.name"

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(query, params)
            rows = cursor.fetchall()
        finally:
            # Close the connection even when the query raises.
            conn.close()

        diagrams = [
            {
                'id': row['id'],
                'name': row['name'],
                'name_es': row['name_es'],
                'group_id': row['group_id'],
                'group_name': row['group_name'],
                'thumbnail_path': row['thumbnail_path'],
                'display_order': row['display_order'],
            }
            for row in rows
        ]
        return jsonify(diagrams)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/diagrams/<int:diagram_id>')
def api_diagram_detail(diagram_id):
    """Get diagram details including SVG content and hotspots.

    Args:
        diagram_id: id of the diagram, taken from the URL.

    Returns:
        A JSON dict describing the diagram, with a ``hotspots`` list ordered by
        callout number; a JSON error with status 404 when no such diagram
        exists; or a JSON ``{'error': ...}`` body with status 500 on failure.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            # Get diagram details
            cursor.execute("""
                SELECT
                    d.id,
                    d.name,
                    d.name_es,
                    d.group_id,
                    pg.name AS group_name,
                    d.image_path,
                    d.svg_content,
                    d.width,
                    d.height
                FROM diagrams d
                JOIN part_groups pg ON d.group_id = pg.id
                WHERE d.id = ?
            """, (diagram_id,))
            row = cursor.fetchone()

            if row is None:
                return jsonify({'error': 'Diagram not found'}), 404

            # Get hotspots with part info. LEFT JOIN keeps hotspots whose
            # part_id has no matching row in parts.
            cursor.execute("""
                SELECT
                    h.id,
                    h.part_id,
                    h.callout_number,
                    h.label,
                    h.shape,
                    h.coords,
                    h.color,
                    p.name AS part_name,
                    p.oem_part_number AS part_number
                FROM diagram_hotspots h
                LEFT JOIN parts p ON h.part_id = p.id
                WHERE h.diagram_id = ?
                ORDER BY h.callout_number
            """, (diagram_id,))
            hotspot_rows = cursor.fetchall()
        finally:
            # Single close point covering the success, 404 and error paths.
            conn.close()

        diagram = {
            'id': row['id'],
            'name': row['name'],
            'name_es': row['name_es'],
            'group_id': row['group_id'],
            'group_name': row['group_name'],
            'image_path': row['image_path'],
            'svg_content': row['svg_content'],
            'width': row['width'],
            'height': row['height'],
            'hotspots': [
                {
                    'id': hotspot_row['id'],
                    'part_id': hotspot_row['part_id'],
                    'callout_number': hotspot_row['callout_number'],
                    'label': hotspot_row['label'],
                    'shape': hotspot_row['shape'],
                    'coords': hotspot_row['coords'],
                    'color': hotspot_row['color'],
                    'part_name': hotspot_row['part_name'],
                    'part_number': hotspot_row['part_number'],
                }
                for hotspot_row in hotspot_rows
            ],
        }

        return jsonify(diagram)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/groups/<int:group_id>/diagrams')
def api_group_diagrams(group_id):
    """Get all diagrams for a specific part group.

    Args:
        group_id: id of the part group, taken from the URL.

    Returns:
        A JSON list of diagram summary dicts ordered by display_order then
        name. On failure, a JSON ``{'error': ...}`` body with status 500.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT
                    id,
                    name,
                    name_es,
                    thumbnail_path,
                    display_order
                FROM diagrams
                WHERE group_id = ?
                ORDER BY display_order, name
            """, (group_id,))
            rows = cursor.fetchall()
        finally:
            # Close the connection even when the query raises.
            conn.close()

        diagrams = [
            {
                'id': row['id'],
                'name': row['name'],
                'name_es': row['name_es'],
                'thumbnail_path': row['thumbnail_path'],
                'display_order': row['display_order'],
            }
            for row in rows
        ]
        return jsonify(diagrams)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/vehicles/<int:mye_id>/diagrams')
def api_vehicle_diagrams(mye_id):
    """Return the diagrams linked to one vehicle configuration.

    Args:
        mye_id: ``model_year_engine`` id to look up in ``vehicle_diagrams``
            (taken from the URL).

    Returns:
        A JSON array of diagrams, each with its group and category names,
        ordered by category, group and diagram ``display_order``. On any
        failure, ``{'error': <message>}`` with HTTP 500.
    """
    fields = (
        'id', 'name', 'name_es', 'group_id', 'group_name',
        'category_name', 'thumbnail_path', 'notes',
    )
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        cursor.execute("""
            SELECT DISTINCT
                d.id,
                d.name,
                d.name_es,
                d.group_id,
                pg.name AS group_name,
                pc.name AS category_name,
                d.thumbnail_path,
                vd.notes
            FROM vehicle_diagrams vd
            JOIN diagrams d ON vd.diagram_id = d.id
            JOIN part_groups pg ON d.group_id = pg.id
            JOIN part_categories pc ON pg.category_id = pc.id
            WHERE vd.model_year_engine_id = ?
            ORDER BY pc.display_order, pg.display_order, d.display_order
        """, (mye_id,))

        diagrams = [
            {field: row[field] for field in fields}
            for row in cursor.fetchall()
        ]
        return jsonify(diagrams)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the connection on every exit path, including query errors.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
@app.route('/api/hotspots/<int:hotspot_id>')
def api_hotspot_detail(hotspot_id):
    """Return one diagram hotspot together with its linked part, if any.

    Args:
        hotspot_id: Primary key of the hotspot (taken from the URL).

    Returns:
        A JSON object with the hotspot columns plus a ``part`` key. ``part``
        is ``None`` when the hotspot has no ``part_id`` or the part lookup
        (which inner-joins group and category) finds no row.
        Responds 404 when the hotspot does not exist and 500 with
        ``{'error': <message>}`` on any other failure.
    """
    hotspot_fields = (
        'id', 'diagram_id', 'part_id', 'callout_number',
        'label', 'shape', 'coords', 'color',
    )
    part_fields = (
        'id', 'oem_part_number', 'name', 'name_es',
        'group_name', 'category_name',
    )
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        cursor.execute("""
            SELECT
                h.id,
                h.diagram_id,
                h.part_id,
                h.callout_number,
                h.label,
                h.shape,
                h.coords,
                h.color
            FROM diagram_hotspots h
            WHERE h.id = ?
        """, (hotspot_id,))
        row = cursor.fetchone()

        if row is None:
            return jsonify({'error': 'Hotspot not found'}), 404

        hotspot = {field: row[field] for field in hotspot_fields}
        hotspot['part'] = None

        # Only hotspots that reference a part get the second lookup.
        if row['part_id']:
            cursor.execute("""
                SELECT
                    p.id,
                    p.oem_part_number,
                    p.name,
                    p.name_es,
                    pg.name AS group_name,
                    pc.name AS category_name
                FROM parts p
                JOIN part_groups pg ON p.group_id = pg.id
                JOIN part_categories pc ON pg.category_id = pc.id
                WHERE p.id = ?
            """, (row['part_id'],))
            part_row = cursor.fetchone()
            if part_row:
                hotspot['part'] = {
                    field: part_row[field] for field in part_fields
                }

        return jsonify(hotspot)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the connection on every exit path, including query errors.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
# ============================================================================
|
|
# FASE 4: Full-Text Search and VIN Decoder API Endpoints
|
|
# ============================================================================
|
|
|
|
import urllib.request
|
|
import json as json_module
|
|
import re
|
|
from datetime import datetime, timedelta
|
|
|
|
def validate_vin(vin):
    """Check that *vin* has the shape of a VIN.

    A value is accepted when it is exactly 17 characters long and every
    character is a digit or a letter other than I, O or Q. Letter case is
    ignored.

    Returns:
        True if the value passes the format check, otherwise False.
    """
    # Empty / missing values are rejected before any length inspection.
    if not vin:
        return False
    if len(vin) != 17:
        return False

    # Letters I, O and Q are excluded from the allowed character class.
    allowed = re.compile(r'^[A-HJ-NPR-Z0-9]{17}$', re.IGNORECASE)
    return allowed.match(vin) is not None
|
|
|
|
|
|
def _find_vehicle_matching_all(cursor, candidate_terms):
    """Return the newest vehicle for which every candidate term matches brand, model or year."""
    where_clauses = []
    params = []
    for term in candidate_terms:
        term_pattern = f"%{term}%"
        where_clauses.append(
            "(b.name LIKE ? OR m.name LIKE ? OR CAST(y.year AS TEXT) LIKE ?)"
        )
        params.extend([term_pattern, term_pattern, term_pattern])

    where_sql = " AND ".join(where_clauses)

    cursor.execute(f"""
        SELECT
            mye.id,
            b.name AS brand,
            m.name AS model,
            y.year,
            e.name AS engine
        FROM model_year_engine mye
        JOIN models m ON mye.model_id = m.id
        JOIN brands b ON m.brand_id = b.id
        JOIN years y ON mye.year_id = y.id
        JOIN engines e ON mye.engine_id = e.id
        WHERE {where_sql}
        ORDER BY y.year DESC
        LIMIT 1
    """, params)

    row = cursor.fetchone()
    if not row:
        return None
    return {
        'id': row['id'],
        'brand': row['brand'],
        'model': row['model'],
        'year': row['year'],
        'engine': row['engine']
    }


def find_vehicle_in_terms(cursor, terms):
    """Try to find a vehicle match in the search terms.

    Strategy: terms that look like a model year are set aside, then
    contiguous windows of the remaining terms (longest first, at most three
    terms) are tried together with the first year term until a vehicle
    matches.

    Args:
        cursor: Database cursor whose rows support access by column name.
        terms: List of search words.

    Returns:
        ``(matched_vehicle, remaining_terms)`` where ``matched_vehicle`` is a
        dict with ``id``, ``brand``, ``model``, ``year`` and ``engine`` and
        ``remaining_terms`` are the words not consumed by the vehicle match,
        or ``(None, terms)`` if no vehicle matches.
    """
    if len(terms) < 2:
        return None, terms

    # Identify potential year terms (4-digit numbers between 1980-2030).
    year_terms = []
    other_terms = []
    for term in terms:
        # str.isdigit() is also true for characters such as superscript
        # digits, which int() rejects; the isascii() check keeps such input
        # from raising ValueError and routes it to the non-year terms.
        looks_numeric = len(term) == 4 and term.isascii() and term.isdigit()
        if looks_numeric and 1980 <= int(term) <= 2030:
            year_terms.append(term)
        else:
            other_terms.append(term)

    # A year alone cannot identify a vehicle.
    if not other_terms:
        return None, terms

    # Prefer the longest window so multi-word brand/model names win over a
    # single loosely matching word.
    for window in range(min(3, len(other_terms)), 0, -1):
        for start in range(len(other_terms) - window + 1):
            candidate_terms = other_terms[start:start + window] + year_terms[:1]
            match = _find_vehicle_matching_all(cursor, candidate_terms)
            if match is None:
                continue

            # Remaining terms = input terms minus the consumed ones, removing
            # one occurrence per consumed term so duplicates are preserved.
            unused = [t.lower() for t in candidate_terms]
            remaining = []
            for term in terms:
                key = term.lower()
                if key in unused:
                    unused.remove(key)
                else:
                    remaining.append(term)
            return match, remaining

    return None, terms
|
|
|
|
|
|
# (match_type, FROM clause, column joined to parts.id, searched number column)
_API_SEARCH_NUMBER_SOURCES = (
    ('aftermarket', 'aftermarket_parts ap', 'ap.oem_part_id', 'ap.part_number'),
    ('cross_reference', 'part_cross_references pcr', 'pcr.part_id',
     'pcr.cross_reference_number'),
)


def _api_search_part(row, match_type, with_matched_number=False):
    """Convert a part result row into the JSON shape used by /api/search."""
    part = {
        'id': row['id'],
        'oem_part_number': row['oem_part_number'],
        'name': row['name'],
        'name_es': row['name_es'],
        'image_url': row['image_url'],
        'group_name': row['group_name'],
        'category_name': row['category_name'],
    }
    if with_matched_number:
        part['matched_number'] = row['matched_number']
    part['match_type'] = match_type
    return part


def _api_search_vehicle_parts(cursor, vehicle, part_terms, limit):
    """Find parts fitted to any engine variant of *vehicle* that match every part term."""
    params = [vehicle['brand'], vehicle['model'], vehicle['year']]
    where_clauses = []
    for term in part_terms:
        term_pattern = f"%{term}%"
        where_clauses.append(
            "(p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ? OR pg.name LIKE ?)"
        )
        params.extend([term_pattern] * 4)
    where_sql = " AND ".join(where_clauses)
    params.append(limit)

    cursor.execute(f"""
        SELECT DISTINCT
            p.id,
            p.oem_part_number,
            p.name,
            p.name_es,
            p.image_url,
            pg.name AS group_name,
            pg.id AS group_id,
            pc.name AS category_name,
            pc.id AS category_id,
            vp.quantity_required,
            vp.position
        FROM vehicle_parts vp
        JOIN parts p ON vp.part_id = p.id
        JOIN part_groups pg ON p.group_id = pg.id
        JOIN part_categories pc ON pg.category_id = pc.id
        WHERE vp.model_year_engine_id IN (
            SELECT mye.id FROM model_year_engine mye
            JOIN models m ON mye.model_id = m.id
            JOIN brands b ON m.brand_id = b.id
            JOIN years y ON mye.year_id = y.id
            WHERE UPPER(b.name) = UPPER(?) AND UPPER(m.name) = UPPER(?) AND y.year = ?
        ) AND ({where_sql})
        ORDER BY p.name
        LIMIT ?
    """, params)

    return [
        {
            'id': row['id'],
            'oem_part_number': row['oem_part_number'],
            'name': row['name'],
            'name_es': row['name_es'],
            'image_url': row['image_url'],
            'group_name': row['group_name'],
            'group_id': row['group_id'],
            'category_name': row['category_name'],
            'category_id': row['category_id'],
            'quantity': row['quantity_required'],
            'position': row['position'],
            'match_type': 'vehicle_part'
        }
        for row in cursor.fetchall()
    ]


def _api_search_oem_parts(cursor, terms, category_id, limit, offset):
    """Search the OEM catalog; every term must match a part, group or category field."""
    where_clauses = []
    params = []
    for term in terms:
        term_pattern = f"%{term}%"
        where_clauses.append(
            "(p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ? OR pg.name LIKE ? OR pc.name LIKE ?)"
        )
        params.extend([term_pattern] * 5)
    where_sql = " AND ".join(where_clauses)

    category_filter = ""
    if category_id:
        category_filter = " AND pc.id = ?"
        params.append(category_id)

    # Ranking uses the first term only: part-number prefix hits first, then
    # name prefix hits, then everything else.
    first_term_pattern = f"{terms[0]}%"
    params.extend([first_term_pattern, first_term_pattern, limit, offset])

    cursor.execute(f"""
        SELECT
            p.id,
            p.oem_part_number,
            p.name,
            p.name_es,
            p.image_url,
            pg.name AS group_name,
            pc.name AS category_name
        FROM parts p
        JOIN part_groups pg ON p.group_id = pg.id
        JOIN part_categories pc ON pg.category_id = pc.id
        WHERE ({where_sql}){category_filter}
        ORDER BY
            CASE
                WHEN p.oem_part_number LIKE ? THEN 1
                WHEN p.name LIKE ? THEN 2
                ELSE 3
            END,
            p.name
        LIMIT ? OFFSET ?
    """, params)

    return [_api_search_part(row, 'oem') for row in cursor.fetchall()]


def _api_search_number_matches(cursor, terms, limit, offset, source):
    """Find OEM parts whose aftermarket or cross-reference number contains every term."""
    match_type, from_sql, join_column, number_column = source
    where_sql = " AND ".join(f"{number_column} LIKE ?" for _ in terms)
    params = [f"%{term}%" for term in terms]
    params.extend([limit, offset])

    # Only the constant identifiers above are interpolated; user input is bound.
    cursor.execute(f"""
        SELECT
            p.id,
            p.oem_part_number,
            p.name,
            p.name_es,
            p.image_url,
            pg.name AS group_name,
            pc.name AS category_name,
            {number_column} AS matched_number
        FROM {from_sql}
        JOIN parts p ON {join_column} = p.id
        JOIN part_groups pg ON p.group_id = pg.id
        JOIN part_categories pc ON pg.category_id = pc.id
        WHERE {where_sql}
        LIMIT ? OFFSET ?
    """, params)

    return [
        _api_search_part(row, match_type, with_matched_number=True)
        for row in cursor.fetchall()
    ]


def _api_search_vehicles(cursor, terms, limit, offset):
    """Find vehicle configurations where every term matches brand, model, year or engine."""
    where_clauses = []
    params = []
    for term in terms:
        term_pattern = f"%{term}%"
        where_clauses.append(
            "(b.name LIKE ? OR m.name LIKE ? OR CAST(y.year AS TEXT) LIKE ? OR e.name LIKE ?)"
        )
        params.extend([term_pattern] * 4)
    where_sql = " AND ".join(where_clauses)
    params.extend([limit, offset])

    cursor.execute(f"""
        SELECT
            mye.id,
            b.name AS brand,
            m.name AS model,
            y.year,
            e.name AS engine
        FROM model_year_engine mye
        JOIN models m ON mye.model_id = m.id
        JOIN brands b ON m.brand_id = b.id
        JOIN years y ON mye.year_id = y.id
        JOIN engines e ON mye.engine_id = e.id
        WHERE {where_sql}
        ORDER BY y.year DESC, b.name, m.name
        LIMIT ? OFFSET ?
    """, params)

    return [
        {
            'id': row['id'],
            'brand': row['brand'],
            'model': row['model'],
            'year': row['year'],
            'engine': row['engine']
        }
        for row in cursor.fetchall()
    ]


@app.route('/api/search')
def api_search():
    """Unified search across parts, cross-references, aftermarket numbers and vehicles.

    Query parameters:
        q: Search text (required); split on whitespace into terms.
        type: ``'all'`` (default), ``'parts'`` or ``'vehicles'``.
        category_id: Optional category filter for the OEM part search.
        limit / offset: Paging values passed to the SQL queries (50 / 0).

    Behaviour:
        With ``type=all`` and at least two terms, the query is first treated
        as "vehicle + part" (e.g. "aveo 2024 balata"). If a vehicle is
        recognised and parts for it match the leftover terms, only those
        parts are returned in ``vehicle_parts``. Otherwise the general part
        and vehicle searches run; ``matched_vehicle`` stays populated when a
        vehicle was recognised but had no matching parts.

        Aftermarket and cross-reference number lookups run only when no
        category filter is given, because those queries are not filtered by
        category.

    Returns:
        JSON with ``parts``, ``vehicles``, ``vehicle_parts``,
        ``matched_vehicle`` and ``total_count``. Responds 400 when ``q`` is
        empty and 500 with ``{'error': <message>}`` on any other failure.
    """
    conn = None
    try:
        q = request.args.get('q', '').strip()
        search_type = request.args.get('type', 'all')
        category_id = request.args.get('category_id', type=int)
        limit = request.args.get('limit', 50, type=int)
        offset = request.args.get('offset', 0, type=int)

        if not q:
            return jsonify({'error': 'Search query is required'}), 400

        conn = get_db_connection()
        cursor = conn.cursor()

        results = {
            'parts': [],
            'vehicles': [],
            'vehicle_parts': [],  # Parts for a specific vehicle
            'matched_vehicle': None,  # Vehicle matched in combined search
            'total_count': 0
        }

        # q is non-empty after strip(), so there is always at least one term.
        terms = q.split()

        # Try to detect combined vehicle + part search,
        # e.g. "aveo 2024 balata" or "camry brake pad".
        if len(terms) >= 2 and search_type == 'all':
            matched_vehicle, remaining_terms = find_vehicle_in_terms(cursor, terms)
            if matched_vehicle and remaining_terms:
                results['matched_vehicle'] = matched_vehicle
                results['vehicle_parts'] = _api_search_vehicle_parts(
                    cursor, matched_vehicle, remaining_terms, limit
                )
                # Vehicle-specific hits take precedence over the general search.
                if results['vehicle_parts']:
                    results['total_count'] = len(results['vehicle_parts'])
                    return jsonify(results)

        if search_type in ('parts', 'all'):
            results['parts'] = _api_search_oem_parts(
                cursor, terms, category_id, limit, offset
            )

            if not category_id:
                # A part already listed keeps its first (highest priority)
                # match type: OEM, then aftermarket, then cross-reference.
                seen_ids = {part['id'] for part in results['parts']}
                for source in _API_SEARCH_NUMBER_SOURCES:
                    matches = _api_search_number_matches(
                        cursor, terms, limit, offset, source
                    )
                    for part in matches:
                        if part['id'] not in seen_ids:
                            seen_ids.add(part['id'])
                            results['parts'].append(part)

        if search_type in ('vehicles', 'all'):
            results['vehicles'] = _api_search_vehicles(cursor, terms, limit, offset)

        results['total_count'] = len(results['parts']) + len(results['vehicles'])
        return jsonify(results)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the connection on every exit path, including query errors.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
@app.route('/api/search/parts')
def api_search_parts():
    """Search the parts catalog with pagination.

    Uses the ``parts_fts`` full-text table when it exists (results ranked by
    ``bm25``) and falls back to a ``LIKE`` search over name, Spanish name,
    OEM number and description otherwise (``rank`` is then ``0``).

    Query parameters:
        q: Search text (required).
        category_id: Optional category filter.
        group_id: Optional part-group filter.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size, clamped to the range 1-100 (default 50).

    Returns:
        JSON ``{'data': [...], 'pagination': {...}}``. Responds 400 when
        ``q`` is empty and 500 with ``{'error': <message>}`` on failure.
    """
    conn = None
    try:
        q = request.args.get('q', '').strip()
        category_id = request.args.get('category_id', type=int)
        group_id = request.args.get('group_id', type=int)
        page = request.args.get('page', 1, type=int)
        per_page = request.args.get('per_page', 50, type=int)

        # Clamp paging input: per_page=0 would divide by zero below, a
        # negative per_page becomes an unbounded LIMIT in SQLite, and
        # page < 1 would yield a negative offset.
        page = max(page, 1)
        per_page = max(1, min(per_page, 100))
        offset = (page - 1) * per_page

        if not q:
            return jsonify({'error': 'Search query is required'}), 400

        conn = get_db_connection()
        cursor = conn.cursor()

        # Check if the FTS5 table exists.
        cursor.execute("""
            SELECT name FROM sqlite_master
            WHERE type='table' AND name='parts_fts'
        """)
        fts_exists = cursor.fetchone() is not None

        # Optional filters are shared by both search strategies.
        filter_clause = ""
        filter_params = []
        if category_id:
            filter_clause += " AND pg.category_id = ?"
            filter_params.append(category_id)
        if group_id:
            filter_clause += " AND p.group_id = ?"
            filter_params.append(group_id)

        if fts_exists:
            # Quote each term (doubling embedded quotes) so characters such
            # as -, * and ^ are not interpreted as FTS5 operators.
            fts_query = ' '.join(
                '"{}"'.format(term.replace('"', '""')) for term in q.split()
            )
            from_sql = """
                FROM parts_fts
                JOIN parts p ON parts_fts.rowid = p.id
                JOIN part_groups pg ON p.group_id = pg.id
                JOIN part_categories pc ON pg.category_id = pc.id
                WHERE parts_fts MATCH ?
            """
            match_params = [fts_query]
            rank_select = ", bm25(parts_fts) AS rank"
            order_by = "rank"
        else:
            search_term = f"%{q}%"
            from_sql = """
                FROM parts p
                JOIN part_groups pg ON p.group_id = pg.id
                JOIN part_categories pc ON pg.category_id = pc.id
                WHERE (p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ?
                       OR p.description LIKE ?)
            """
            match_params = [search_term] * 4
            rank_select = ""
            order_by = "p.name"

        # Total number of matches, independent of the requested page.
        cursor.execute(
            "SELECT COUNT(*) as total" + from_sql + filter_clause,
            match_params + filter_params
        )
        total_count = cursor.fetchone()['total']

        # Requested page of results.
        cursor.execute(
            """
                SELECT
                    p.id,
                    p.oem_part_number,
                    p.name,
                    p.name_es,
                    p.description,
                    pg.name AS group_name,
                    pc.name AS category_name
            """ + rank_select + from_sql + filter_clause
            + f" ORDER BY {order_by} LIMIT ? OFFSET ?",
            match_params + filter_params + [per_page, offset]
        )

        parts = [
            {
                'id': row['id'],
                'oem_part_number': row['oem_part_number'],
                'name': row['name'],
                'name_es': row['name_es'],
                'description': row['description'],
                'group_name': row['group_name'],
                'category_name': row['category_name'],
                'rank': row['rank'] if fts_exists else 0
            }
            for row in cursor.fetchall()
        ]

        total_pages = (total_count + per_page - 1) // per_page
        return jsonify({
            'data': parts,
            'pagination': {
                'page': page,
                'per_page': per_page,
                'total': total_count,
                'total_pages': total_pages
            }
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the connection on every exit path, including query errors.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
def _vin_decode_find_vehicle(cursor, where_sql, params):
    """Return the first vehicle configuration matching *where_sql* as a dict, or None."""
    # where_sql is always one of the constant fragments passed by this
    # module; all values are bound through params.
    cursor.execute(f"""
        SELECT
            mye.id,
            b.name AS brand,
            m.name AS model,
            y.year,
            e.name AS engine
        FROM model_year_engine mye
        JOIN models m ON mye.model_id = m.id
        JOIN brands b ON m.brand_id = b.id
        JOIN years y ON mye.year_id = y.id
        JOIN engines e ON mye.engine_id = e.id
        WHERE {where_sql}
        LIMIT 1
    """, params)
    mye_row = cursor.fetchone()
    if not mye_row:
        return None
    return {
        'mye_id': mye_row['id'],
        'brand': mye_row['brand'],
        'model': mye_row['model'],
        'year': mye_row['year'],
        'engine': mye_row['engine']
    }


def _vin_decode_from_cache(cursor, vin):
    """Return the unexpired cached decode result for *vin*, or None if there is none."""
    cursor.execute("""
        SELECT
            vin, make, model, year, engine_info,
            body_class, drive_type,
            model_year_engine_id
        FROM vin_cache
        WHERE vin = ? AND expires_at > datetime('now')
    """, (vin,))
    cached_row = cursor.fetchone()
    if not cached_row:
        return None

    # engine_info is stored as JSON text; keep unparseable content under 'raw'.
    engine_info_data = {}
    if cached_row['engine_info']:
        try:
            engine_info_data = json_module.loads(cached_row['engine_info'])
        except (TypeError, ValueError):
            engine_info_data = {'raw': cached_row['engine_info']}

    matched_vehicle = None
    if cached_row['model_year_engine_id']:
        matched_vehicle = _vin_decode_find_vehicle(
            cursor, "mye.id = ?", (cached_row['model_year_engine_id'],)
        )

    return {
        'vin': cached_row['vin'],
        'make': cached_row['make'],
        'model': cached_row['model'],
        'year': cached_row['year'],
        'engine_info': engine_info_data,
        'body_class': cached_row['body_class'],
        'drive_type': cached_row['drive_type'],
        'matched_vehicle': matched_vehicle,
        'cached': True
    }


def _vin_decode_float(text):
    """Parse *text* as a float, returning None for empty or non-numeric values."""
    if not text:
        return None
    try:
        return float(text)
    except (TypeError, ValueError):
        return None


@app.route('/api/vin/decode/<vin>')
def api_vin_decode(vin):
    """Decode a VIN using the NHTSA vPIC API, with a database cache.

    Flow:
        1. Validate the VIN format (400 on failure).
        2. If the ``vin_cache`` table exists and holds an unexpired entry,
           return it with ``cached: True``.
        3. Otherwise call the NHTSA API, try to match make/model/year to a
           ``model_year_engine`` row, store the result in the cache (when the
           table exists) with a 30-day expiry, and return it with
           ``cached: False``.

    Args:
        vin: VIN from the URL; upper-cased and stripped before use.

    Returns:
        JSON with ``vin``, ``make``, ``model``, ``year``, ``engine_info``,
        ``body_class``, ``drive_type``, ``matched_vehicle`` and ``cached``.
        Responds 502 when NHTSA answers with an HTTP error, 503 when NHTSA
        cannot be reached, and 500 on any other failure.
    """
    conn = None
    try:
        vin = vin.upper().strip()

        # Validate VIN format
        if not validate_vin(vin):
            return jsonify({
                'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).'
            }), 400

        conn = get_db_connection()
        cursor = conn.cursor()

        # Check if vin_cache table exists
        cursor.execute("""
            SELECT name FROM sqlite_master
            WHERE type='table' AND name='vin_cache'
        """)
        cache_exists = cursor.fetchone() is not None

        if cache_exists:
            cached_data = _vin_decode_from_cache(cursor, vin)
            if cached_data is not None:
                return jsonify(cached_data)

        # Call NHTSA API
        nhtsa_url = f'https://vpic.nhtsa.dot.gov/api/vehicles/DecodeVin/{vin}?format=json'

        try:
            req = urllib.request.Request(nhtsa_url, headers={'User-Agent': 'AutopartesDB/1.0'})
            with urllib.request.urlopen(req, timeout=10) as response:
                nhtsa_data = json_module.loads(response.read().decode('utf-8'))
        except urllib.error.HTTPError as e:
            # HTTPError is a subclass of URLError, so it must be handled
            # first or this branch is never reached.
            return jsonify({'error': f'NHTSA API error: {e.code}'}), 502
        except urllib.error.URLError as e:
            return jsonify({'error': f'Failed to connect to NHTSA API: {str(e)}'}), 503
        except Exception as e:
            return jsonify({'error': f'Error calling NHTSA API: {str(e)}'}), 500

        # Parse NHTSA response into a flat {variable name: value} mapping.
        results = {item['Variable']: item['Value'] for item in nhtsa_data.get('Results', [])}

        # Extract relevant fields
        make = results.get('Make', '')
        model = results.get('Model', '')
        year_str = results.get('ModelYear', '')
        year = int(year_str) if year_str and year_str.isdigit() else None
        engine_config = results.get('EngineConfiguration', '')
        cylinders_str = results.get('EngineCylinders', '')
        cylinders = int(cylinders_str) if cylinders_str and cylinders_str.isdigit() else None
        # A non-numeric displacement must not turn a successful decode into a 500.
        displacement_l = _vin_decode_float(results.get('DisplacementL', ''))
        fuel_type = results.get('FuelTypePrimary', '')
        body_class = results.get('BodyClass', '')
        drive_type = results.get('DriveType', '')

        engine_info = {
            'configuration': engine_config,
            'cylinders': cylinders,
            'displacement_l': displacement_l,
            'fuel_type': fuel_type
        }

        # Try to match to a model_year_engine record (case-insensitive on
        # brand and model, exact on year).
        matched_vehicle = None
        if make and model and year:
            matched_vehicle = _vin_decode_find_vehicle(
                cursor,
                "UPPER(b.name) = UPPER(?) AND UPPER(m.name) = UPPER(?) AND y.year = ?",
                (make, model, year)
            )
        matched_mye_id = matched_vehicle['mye_id'] if matched_vehicle else None

        # Store in cache with 30-day expiry. The expiry is computed by SQLite
        # so it uses the same UTC clock as the `expires_at > datetime('now')`
        # check performed when reading the cache.
        if cache_exists:
            cursor.execute("""
                INSERT OR REPLACE INTO vin_cache
                (vin, decoded_data, make, model, year, engine_info,
                 body_class, drive_type, model_year_engine_id, expires_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now', '+30 days'))
            """, (vin, json_module.dumps(results), make, model, year,
                  json_module.dumps(engine_info),
                  body_class, drive_type, matched_mye_id))
            conn.commit()

        return jsonify({
            'vin': vin,
            'make': make,
            'model': model,
            'year': year,
            'engine_info': engine_info,
            'body_class': body_class,
            'drive_type': drive_type,
            'matched_vehicle': matched_vehicle,
            'cached': False
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the connection on every exit path, including errors.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
@app.route('/api/vin/<vin>/parts')
def api_vin_parts(vin):
    """Return the parts for a previously decoded VIN, grouped by category.

    The VIN must already be present in ``vin_cache``; this endpoint does not
    call the decoder itself.

    Args:
        vin: VIN from the URL; upper-cased and stripped before use.

    Query parameters:
        category_id: Optional category filter.

    Returns:
        JSON with ``vin``, ``vehicle_info`` and ``categories`` (each category
        carrying its ``parts`` list). When the cached VIN has no linked
        vehicle configuration, ``categories`` is empty and a ``message``
        explains how to link one. Responds 400 for an invalid VIN or a
        missing cache table, 404 when the VIN is not cached, and 500 with
        ``{'error': <message>}`` on any other failure.
    """
    conn = None
    try:
        vin = vin.upper().strip()

        if not validate_vin(vin):
            return jsonify({
                'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).'
            }), 400

        category_id = request.args.get('category_id', type=int)

        conn = get_db_connection()
        cursor = conn.cursor()

        # Check if vin_cache table exists
        cursor.execute("""
            SELECT name FROM sqlite_master
            WHERE type='table' AND name='vin_cache'
        """)
        if cursor.fetchone() is None:
            return jsonify({
                'error': 'VIN cache not available. Please decode the VIN first.'
            }), 400

        # Look up VIN in cache (expiry is not checked here).
        cursor.execute("""
            SELECT
                vin, make, model, year, model_year_engine_id
            FROM vin_cache
            WHERE vin = ?
        """, (vin,))
        cached_row = cursor.fetchone()

        if not cached_row:
            return jsonify({
                'error': 'VIN not found in cache. Please decode the VIN first using /api/vin/decode/<vin>'
            }), 404

        mye_id = cached_row['model_year_engine_id']
        vehicle_info = {
            'vin': cached_row['vin'],
            'make': cached_row['make'],
            'model': cached_row['model'],
            'year': cached_row['year'],
            'mye_id': mye_id
        }

        if not mye_id:
            return jsonify({
                'vin': vin,
                'vehicle_info': vehicle_info,
                'categories': [],
                'message': 'No matching vehicle configuration found in database. Use /api/vin/<vin>/match to manually link.'
            })

        # Get parts for this vehicle grouped by category
        query = """
            SELECT
                pc.id AS category_id,
                pc.name AS category_name,
                pc.name_es AS category_name_es,
                p.id AS part_id,
                p.oem_part_number,
                p.name AS part_name,
                p.name_es AS part_name_es,
                pg.name AS group_name,
                vp.quantity_required,
                vp.position
            FROM vehicle_parts vp
            JOIN parts p ON vp.part_id = p.id
            JOIN part_groups pg ON p.group_id = pg.id
            JOIN part_categories pc ON pg.category_id = pc.id
            WHERE vp.model_year_engine_id = ?
        """
        params = [mye_id]

        if category_id:
            query += " AND pc.id = ?"
            params.append(category_id)

        query += " ORDER BY pc.display_order, pg.display_order, p.name"
        cursor.execute(query, params)

        # Dict insertion order preserves the category ordering of the query.
        categories_dict = {}
        for row in cursor.fetchall():
            category = categories_dict.setdefault(row['category_id'], {
                'id': row['category_id'],
                'name': row['category_name'],
                'name_es': row['category_name_es'],
                'parts': []
            })
            category['parts'].append({
                'id': row['part_id'],
                'oem_part_number': row['oem_part_number'],
                'name': row['part_name'],
                'name_es': row['part_name_es'],
                'group_name': row['group_name'],
                'quantity_required': row['quantity_required'],
                'position': row['position']
            })

        return jsonify({
            'vin': vin,
            'vehicle_info': vehicle_info,
            'categories': list(categories_dict.values())
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the connection on every exit path, including query errors.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
@app.route('/api/vin/<vin>/match')
def api_vin_match(vin):
    """Manually link a VIN to a vehicle configuration in the VIN cache.

    If the VIN is already cached, only its ``model_year_engine_id`` is
    updated; otherwise a minimal cache row is created with a 30-day expiry.

    Args:
        vin: VIN from the URL; upper-cased and stripped before use.

    Query parameters:
        mye_id: ``model_year_engine`` id to link (required).

    Returns:
        JSON ``{'success': True, 'vin': ..., 'mye_id': ...}``. Responds 400
        for an invalid VIN, a missing ``mye_id`` or a missing cache table,
        404 when ``mye_id`` does not exist, and 500 with
        ``{'error': <message>}`` on any other failure.
    """
    conn = None
    try:
        vin = vin.upper().strip()

        if not validate_vin(vin):
            return jsonify({
                'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).'
            }), 400

        mye_id = request.args.get('mye_id', type=int)
        if not mye_id:
            return jsonify({'error': 'mye_id parameter is required'}), 400

        conn = get_db_connection()
        cursor = conn.cursor()

        # Check if vin_cache table exists
        cursor.execute("""
            SELECT name FROM sqlite_master
            WHERE type='table' AND name='vin_cache'
        """)
        if cursor.fetchone() is None:
            return jsonify({
                'error': 'VIN cache table not available.'
            }), 400

        # Verify the mye_id exists
        cursor.execute("""
            SELECT id FROM model_year_engine WHERE id = ?
        """, (mye_id,))
        if not cursor.fetchone():
            return jsonify({'error': f'model_year_engine_id {mye_id} not found'}), 404

        # Check if VIN exists in cache
        cursor.execute("""
            SELECT vin FROM vin_cache WHERE vin = ?
        """, (vin,))
        vin_exists = cursor.fetchone() is not None

        if vin_exists:
            # Update existing cache entry
            cursor.execute("""
                UPDATE vin_cache
                SET model_year_engine_id = ?
                WHERE vin = ?
            """, (mye_id, vin))
        else:
            # Create new cache entry with minimal info. Both timestamps are
            # computed by SQLite so they share one (UTC) clock.
            # NOTE(review): this INSERT writes a `cached_at` column — confirm
            # that column exists in the vin_cache schema.
            cursor.execute("""
                INSERT INTO vin_cache
                (vin, model_year_engine_id, cached_at, expires_at)
                VALUES (?, ?, datetime('now'), datetime('now', '+30 days'))
            """, (vin, mye_id))

        conn.commit()

        return jsonify({
            'success': True,
            'vin': vin,
            'mye_id': mye_id
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the connection on every exit path; uncommitted work is
        # discarded when the connection closes after an error.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
# ============================================================================
|
|
# ADMIN API ENDPOINTS - CRUD Operations
|
|
# ============================================================================
|
|
|
|
@app.route('/api/admin/stats')
def api_admin_stats():
    """Return row counts for the admin dashboard.

    Returns:
        JSON object with the keys ``categories``, ``groups``, ``parts``,
        ``aftermarket``, ``manufacturers`` and ``fitment``, each holding the
        row count of the corresponding table. On any failure,
        ``{'error': <message>}`` with HTTP 500.
    """
    # (response key, count query) — one entry per dashboard figure.
    count_queries = (
        ('categories', "SELECT COUNT(*) FROM part_categories"),
        ('groups', "SELECT COUNT(*) FROM part_groups"),
        ('parts', "SELECT COUNT(*) FROM parts"),
        ('aftermarket', "SELECT COUNT(*) FROM aftermarket_parts"),
        ('manufacturers', "SELECT COUNT(*) FROM manufacturers"),
        ('fitment', "SELECT COUNT(*) FROM vehicle_parts"),
    )
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        stats = {}
        for key, query in count_queries:
            cursor.execute(query)
            stats[key] = cursor.fetchone()[0]

        return jsonify(stats)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Release the connection on every exit path, including query errors.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
# ---- Categories CRUD ----
|
|
|
|
@app.route('/api/admin/categories', methods=['POST'])
def api_admin_create_category():
    """Create a part category from the JSON request body.

    Required field: ``name``. Optional fields: ``name_es``, ``slug``
    (derived from ``name`` by lower-casing and replacing spaces with ``-``
    when absent or empty), ``icon_name``, ``display_order`` (default 0) and
    ``parent_id``.

    Returns ``{'id', 'message'}`` on success, HTTP 400 when the body is not
    a JSON object or ``name`` is missing, and HTTP 500 on any other failure.
    """
    try:
        # silent=True yields None instead of raising on a missing or
        # malformed JSON body, so client mistakes are reported as 400.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        if 'name' not in data:
            return jsonify({'error': 'Missing required field(s): name'}), 400

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO part_categories (name, name_es, slug, icon_name, display_order, parent_id)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                data['name'],
                data.get('name_es'),
                data.get('slug') or data['name'].lower().replace(' ', '-'),
                data.get('icon_name'),
                data.get('display_order', 0),
                data.get('parent_id'),
            ))
            conn.commit()
            new_id = cursor.lastrowid
        finally:
            # Close the connection even when the INSERT or commit fails.
            conn.close()

        return jsonify({'id': new_id, 'message': 'Category created'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/admin/categories/<int:category_id>', methods=['PUT'])
def api_admin_update_category(category_id):
    """Overwrite the editable columns of one part category.

    The JSON body must contain ``name``. ``name_es``, ``slug`` and
    ``icon_name`` are written as NULL when omitted, and ``display_order``
    as 0; ``parent_id`` is not touched by this endpoint.

    Returns ``{'message'}`` on success (also when no row has this id, since
    the affected-row count is not inspected), HTTP 400 for a malformed body,
    HTTP 500 on any other failure.
    """
    try:
        # silent=True yields None instead of raising on a bad JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        if 'name' not in data:
            return jsonify({'error': 'Missing required field(s): name'}), 400

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE part_categories
                SET name = ?, name_es = ?, slug = ?, icon_name = ?, display_order = ?
                WHERE id = ?
            """, (
                data['name'],
                data.get('name_es'),
                data.get('slug'),
                data.get('icon_name'),
                data.get('display_order', 0),
                category_id,
            ))
            conn.commit()
        finally:
            # Close the connection even when the UPDATE or commit fails.
            conn.close()

        return jsonify({'message': 'Category updated'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/admin/categories/<int:category_id>', methods=['DELETE'])
def api_admin_delete_category(category_id):
    """Delete the part category with the given id.

    Returns ``{'message'}`` on success; the affected-row count is not
    inspected, so an unknown id is also reported as deleted. Any failure
    yields ``{'error': <message>}`` with HTTP 500.
    """
    try:
        conn = get_db_connection()
        try:
            conn.cursor().execute(
                "DELETE FROM part_categories WHERE id = ?", (category_id,)
            )
            conn.commit()
        finally:
            # Close the connection even when the DELETE or commit fails.
            conn.close()

        return jsonify({'message': 'Category deleted'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
# ---- Groups CRUD ----
|
|
|
|
@app.route('/api/admin/groups')
def api_admin_list_groups():
    """List part groups together with the name of their category.

    Query parameters:
        category_id (int, optional): restrict the list to one category. A
            missing, non-integer or zero value applies no filter.

    Returns a JSON array of objects with ``id``, ``name``, ``name_es``,
    ``category_id``, ``category_name``, ``display_order`` and ``slug``,
    ordered by ``display_order`` then ``name``. Any failure yields
    ``{'error': <message>}`` with HTTP 500.
    """
    try:
        category_id = request.args.get('category_id', type=int)

        query = """
            SELECT pg.id, pg.name, pg.name_es, pg.category_id, pg.display_order, pg.slug,
                   pc.name AS category_name
            FROM part_groups pg
            LEFT JOIN part_categories pc ON pg.category_id = pc.id
            WHERE 1=1
        """
        params = []
        if category_id:
            query += " AND pg.category_id = ?"
            params.append(category_id)
        query += " ORDER BY pg.display_order, pg.name"

        fields = ('id', 'name', 'name_es', 'category_id', 'category_name',
                  'display_order', 'slug')

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(query, params)
            groups = [
                {field: row[field] for field in fields}
                for row in cursor.fetchall()
            ]
        finally:
            # Close the connection even when the query fails.
            conn.close()

        return jsonify(groups)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/admin/groups', methods=['POST'])
def api_admin_create_group():
    """Create a part group from the JSON request body.

    Required fields: ``category_id`` and ``name``. Optional fields:
    ``name_es``, ``slug`` (derived from ``name`` by lower-casing and
    replacing spaces with ``-`` when absent or empty) and ``display_order``
    (default 0).

    Returns ``{'id', 'message'}`` on success, HTTP 400 when the body is not
    a JSON object or a required field is missing, HTTP 500 otherwise.
    """
    try:
        # silent=True yields None instead of raising on a bad JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        missing = [f for f in ('category_id', 'name') if f not in data]
        if missing:
            return jsonify({'error': 'Missing required field(s): ' + ', '.join(missing)}), 400

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO part_groups (category_id, name, name_es, slug, display_order)
                VALUES (?, ?, ?, ?, ?)
            """, (
                data['category_id'],
                data['name'],
                data.get('name_es'),
                data.get('slug') or data['name'].lower().replace(' ', '-'),
                data.get('display_order', 0),
            ))
            conn.commit()
            new_id = cursor.lastrowid
        finally:
            # Close the connection even when the INSERT or commit fails.
            conn.close()

        return jsonify({'id': new_id, 'message': 'Group created'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/admin/groups/<int:group_id>', methods=['PUT'])
def api_admin_update_group(group_id):
    """Overwrite category, names and display order of one part group.

    The JSON body must contain ``category_id`` and ``name``. ``name_es`` is
    written as NULL when omitted and ``display_order`` as 0. The ``slug``
    column is not modified by this endpoint.

    Returns ``{'message'}`` on success (also when no row has this id, since
    the affected-row count is not inspected), HTTP 400 for a malformed body,
    HTTP 500 on any other failure.
    """
    try:
        # silent=True yields None instead of raising on a bad JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        missing = [f for f in ('category_id', 'name') if f not in data]
        if missing:
            return jsonify({'error': 'Missing required field(s): ' + ', '.join(missing)}), 400

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE part_groups
                SET category_id = ?, name = ?, name_es = ?, display_order = ?
                WHERE id = ?
            """, (
                data['category_id'],
                data['name'],
                data.get('name_es'),
                data.get('display_order', 0),
                group_id,
            ))
            conn.commit()
        finally:
            # Close the connection even when the UPDATE or commit fails.
            conn.close()

        return jsonify({'message': 'Group updated'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/admin/groups/<int:group_id>', methods=['DELETE'])
def api_admin_delete_group(group_id):
    """Delete the part group with the given id.

    Returns ``{'message'}`` on success; the affected-row count is not
    inspected, so an unknown id is also reported as deleted. Any failure
    yields ``{'error': <message>}`` with HTTP 500.
    """
    try:
        conn = get_db_connection()
        try:
            conn.cursor().execute(
                "DELETE FROM part_groups WHERE id = ?", (group_id,)
            )
            conn.commit()
        finally:
            # Close the connection even when the DELETE or commit fails.
            conn.close()

        return jsonify({'message': 'Group deleted'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
# ---- Parts CRUD ----
|
|
|
|
@app.route('/api/admin/parts', methods=['POST'])
def api_admin_create_part():
    """Create an OEM part from the JSON request body.

    Required fields: ``oem_part_number``, ``name`` and ``group_id``.
    Optional fields (stored as NULL when omitted): ``name_es``,
    ``description``, ``description_es``, ``weight_kg``, ``material`` and
    ``image_url``.

    Returns ``{'id', 'message'}`` on success, HTTP 400 when the body is not
    a JSON object or a required field is missing, HTTP 500 otherwise.
    """
    try:
        # silent=True yields None instead of raising on a bad JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        missing = [f for f in ('oem_part_number', 'name', 'group_id') if f not in data]
        if missing:
            return jsonify({'error': 'Missing required field(s): ' + ', '.join(missing)}), 400

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO parts (oem_part_number, name, name_es, group_id, description, description_es, weight_kg, material, image_url)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                data['oem_part_number'],
                data['name'],
                data.get('name_es'),
                data['group_id'],
                data.get('description'),
                data.get('description_es'),
                data.get('weight_kg'),
                data.get('material'),
                data.get('image_url'),
            ))
            conn.commit()
            new_id = cursor.lastrowid
        finally:
            # Close the connection even when the INSERT or commit fails.
            conn.close()

        return jsonify({'id': new_id, 'message': 'Part created'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/admin/parts/<int:part_id>', methods=['PUT'])
def api_admin_update_part(part_id):
    """Overwrite every editable column of one OEM part.

    The JSON body must contain ``oem_part_number``, ``name`` and
    ``group_id``. ``name_es``, ``description``, ``description_es``,
    ``weight_kg``, ``material`` and ``image_url`` are written as NULL when
    omitted.

    Returns ``{'message'}`` on success (also when no row has this id, since
    the affected-row count is not inspected), HTTP 400 for a malformed body,
    HTTP 500 on any other failure.
    """
    try:
        # silent=True yields None instead of raising on a bad JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        missing = [f for f in ('oem_part_number', 'name', 'group_id') if f not in data]
        if missing:
            return jsonify({'error': 'Missing required field(s): ' + ', '.join(missing)}), 400

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE parts
                SET oem_part_number = ?, name = ?, name_es = ?, group_id = ?,
                    description = ?, description_es = ?, weight_kg = ?, material = ?, image_url = ?
                WHERE id = ?
            """, (
                data['oem_part_number'],
                data['name'],
                data.get('name_es'),
                data['group_id'],
                data.get('description'),
                data.get('description_es'),
                data.get('weight_kg'),
                data.get('material'),
                data.get('image_url'),
                part_id,
            ))
            conn.commit()
        finally:
            # Close the connection even when the UPDATE or commit fails.
            conn.close()

        return jsonify({'message': 'Part updated'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/admin/parts/<int:part_id>', methods=['DELETE'])
def api_admin_delete_part(part_id):
    """Delete the OEM part with the given id.

    Returns ``{'message'}`` on success; the affected-row count is not
    inspected, so an unknown id is also reported as deleted. Any failure
    yields ``{'error': <message>}`` with HTTP 500.
    """
    try:
        conn = get_db_connection()
        try:
            conn.cursor().execute("DELETE FROM parts WHERE id = ?", (part_id,))
            conn.commit()
        finally:
            # Close the connection even when the DELETE or commit fails.
            conn.close()

        return jsonify({'message': 'Part deleted'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
# ---- Manufacturers CRUD ----
|
|
|
|
@app.route('/api/admin/manufacturers', methods=['POST'])
def api_admin_create_manufacturer():
    """Create a manufacturer from the JSON request body.

    Required field: ``name``. Optional fields: ``type`` (default
    ``'aftermarket'``), ``quality_tier`` (default ``'standard'``),
    ``country`` and ``website``.

    Returns ``{'id', 'message'}`` on success, HTTP 400 when the body is not
    a JSON object or ``name`` is missing, HTTP 500 otherwise.
    """
    try:
        # silent=True yields None instead of raising on a bad JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        if 'name' not in data:
            return jsonify({'error': 'Missing required field(s): name'}), 400

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO manufacturers (name, type, quality_tier, country, website)
                VALUES (?, ?, ?, ?, ?)
            """, (
                data['name'],
                data.get('type', 'aftermarket'),
                data.get('quality_tier', 'standard'),
                data.get('country'),
                data.get('website'),
            ))
            conn.commit()
            new_id = cursor.lastrowid
        finally:
            # Close the connection even when the INSERT or commit fails.
            conn.close()

        return jsonify({'id': new_id, 'message': 'Manufacturer created'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/admin/manufacturers/<int:manufacturer_id>', methods=['PUT'])
def api_admin_update_manufacturer(manufacturer_id):
    """Overwrite every editable column of one manufacturer.

    The JSON body must contain ``name``. ``type``, ``quality_tier``,
    ``country`` and ``website`` are written as NULL when omitted (unlike
    creation, no defaults are applied here).

    Returns ``{'message'}`` on success (also when no row has this id, since
    the affected-row count is not inspected), HTTP 400 for a malformed body,
    HTTP 500 on any other failure.
    """
    try:
        # silent=True yields None instead of raising on a bad JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        if 'name' not in data:
            return jsonify({'error': 'Missing required field(s): name'}), 400

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE manufacturers
                SET name = ?, type = ?, quality_tier = ?, country = ?, website = ?
                WHERE id = ?
            """, (
                data['name'],
                data.get('type'),
                data.get('quality_tier'),
                data.get('country'),
                data.get('website'),
                manufacturer_id,
            ))
            conn.commit()
        finally:
            # Close the connection even when the UPDATE or commit fails.
            conn.close()

        return jsonify({'message': 'Manufacturer updated'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/admin/manufacturers/<int:manufacturer_id>', methods=['DELETE'])
def api_admin_delete_manufacturer(manufacturer_id):
    """Delete the manufacturer with the given id.

    Returns ``{'message'}`` on success; the affected-row count is not
    inspected, so an unknown id is also reported as deleted. Any failure
    yields ``{'error': <message>}`` with HTTP 500.
    """
    try:
        conn = get_db_connection()
        try:
            conn.cursor().execute(
                "DELETE FROM manufacturers WHERE id = ?", (manufacturer_id,)
            )
            conn.commit()
        finally:
            # Close the connection even when the DELETE or commit fails.
            conn.close()

        return jsonify({'message': 'Manufacturer deleted'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
# ---- Aftermarket Parts CRUD ----
|
|
|
|
@app.route('/api/admin/aftermarket', methods=['POST'])
def api_admin_create_aftermarket():
    """Create an aftermarket part from the JSON request body.

    Required fields: ``oem_part_id``, ``manufacturer_id`` and
    ``part_number``. Optional fields: ``name``, ``name_es``,
    ``quality_tier`` (default ``'standard'``), ``price_usd`` and
    ``warranty_months``.

    Returns ``{'id', 'message'}`` on success, HTTP 400 when the body is not
    a JSON object or a required field is missing, HTTP 500 otherwise.
    """
    try:
        # silent=True yields None instead of raising on a bad JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        missing = [f for f in ('oem_part_id', 'manufacturer_id', 'part_number') if f not in data]
        if missing:
            return jsonify({'error': 'Missing required field(s): ' + ', '.join(missing)}), 400

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO aftermarket_parts (oem_part_id, manufacturer_id, part_number, name, name_es, quality_tier, price_usd, warranty_months)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                data['oem_part_id'],
                data['manufacturer_id'],
                data['part_number'],
                data.get('name'),
                data.get('name_es'),
                data.get('quality_tier', 'standard'),
                data.get('price_usd'),
                data.get('warranty_months'),
            ))
            conn.commit()
            new_id = cursor.lastrowid
        finally:
            # Close the connection even when the INSERT or commit fails.
            conn.close()

        return jsonify({'id': new_id, 'message': 'Aftermarket part created'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/admin/aftermarket/<int:aftermarket_id>', methods=['PUT'])
def api_admin_update_aftermarket(aftermarket_id):
    """Overwrite every editable column of one aftermarket part.

    The JSON body must contain ``oem_part_id``, ``manufacturer_id`` and
    ``part_number``. ``name``, ``name_es``, ``quality_tier``, ``price_usd``
    and ``warranty_months`` are written as NULL when omitted (unlike
    creation, no default tier is applied here).

    Returns ``{'message'}`` on success (also when no row has this id, since
    the affected-row count is not inspected), HTTP 400 for a malformed body,
    HTTP 500 on any other failure.
    """
    try:
        # silent=True yields None instead of raising on a bad JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        missing = [f for f in ('oem_part_id', 'manufacturer_id', 'part_number') if f not in data]
        if missing:
            return jsonify({'error': 'Missing required field(s): ' + ', '.join(missing)}), 400

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE aftermarket_parts
                SET oem_part_id = ?, manufacturer_id = ?, part_number = ?, name = ?, name_es = ?,
                    quality_tier = ?, price_usd = ?, warranty_months = ?
                WHERE id = ?
            """, (
                data['oem_part_id'],
                data['manufacturer_id'],
                data['part_number'],
                data.get('name'),
                data.get('name_es'),
                data.get('quality_tier'),
                data.get('price_usd'),
                data.get('warranty_months'),
                aftermarket_id,
            ))
            conn.commit()
        finally:
            # Close the connection even when the UPDATE or commit fails.
            conn.close()

        return jsonify({'message': 'Aftermarket part updated'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/admin/aftermarket/<int:aftermarket_id>', methods=['DELETE'])
def api_admin_delete_aftermarket(aftermarket_id):
    """Delete the aftermarket part with the given id.

    Returns ``{'message'}`` on success; the affected-row count is not
    inspected, so an unknown id is also reported as deleted. Any failure
    yields ``{'error': <message>}`` with HTTP 500.
    """
    try:
        conn = get_db_connection()
        try:
            conn.cursor().execute(
                "DELETE FROM aftermarket_parts WHERE id = ?", (aftermarket_id,)
            )
            conn.commit()
        finally:
            # Close the connection even when the DELETE or commit fails.
            conn.close()

        return jsonify({'message': 'Aftermarket part deleted'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
# ---- Cross-References CRUD ----
|
|
|
|
@app.route('/api/admin/crossref')
def api_admin_list_crossref():
    """List part cross-references, newest id first, one page at a time.

    Query parameters:
        page (int, default 1): 1-based page number; values below 1 are
            treated as 1.
        per_page (int, default 50): page size, clamped to the range 1-100.

    Returns ``{'data': [...], 'pagination': {'page', 'per_page', 'total',
    'total_pages'}}`` where every item carries the cross-reference columns
    plus ``oem_part_number`` and ``part_name`` of the joined part. Any
    failure yields ``{'error': <message>}`` with HTTP 500.
    """
    try:
        # Clamp both values: per_page=0 would divide by zero below, a
        # negative LIMIT removes the cap in SQLite, and page<1 would give a
        # negative OFFSET.
        page = max(request.args.get('page', 1, type=int), 1)
        per_page = request.args.get('per_page', 50, type=int)
        per_page = max(1, min(per_page, 100))
        offset = (page - 1) * per_page

        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            # NOTE(review): this counts all cross-references, while the page
            # query inner-joins `parts`; the two agree only if every
            # cross-reference points at an existing part.
            cursor.execute("SELECT COUNT(*) FROM part_cross_references")
            total_count = cursor.fetchone()[0]

            cursor.execute("""
                SELECT pcr.id, pcr.part_id, pcr.cross_reference_number, pcr.reference_type, pcr.source, pcr.notes,
                       p.oem_part_number, p.name AS part_name
                FROM part_cross_references pcr
                JOIN parts p ON pcr.part_id = p.id
                ORDER BY pcr.id DESC
                LIMIT ? OFFSET ?
            """, (per_page, offset))

            fields = ('id', 'part_id', 'cross_reference_number', 'reference_type',
                      'source', 'notes', 'oem_part_number', 'part_name')
            refs = [
                {field: row[field] for field in fields}
                for row in cursor.fetchall()
            ]
        finally:
            # Close the connection even when a query fails.
            conn.close()

        # Ceiling division without floats.
        total_pages = (total_count + per_page - 1) // per_page
        return jsonify({
            'data': refs,
            'pagination': {
                'page': page,
                'per_page': per_page,
                'total': total_count,
                'total_pages': total_pages,
            },
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/admin/crossref', methods=['POST'])
def api_admin_create_crossref():
    """Create a part cross-reference from the JSON request body.

    Required fields: ``part_id``, ``cross_reference_number`` and
    ``reference_type``. Optional fields (stored as NULL when omitted):
    ``source`` and ``notes``.

    Returns ``{'id', 'message'}`` on success, HTTP 400 when the body is not
    a JSON object or a required field is missing, HTTP 500 otherwise.
    """
    try:
        # silent=True yields None instead of raising on a bad JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        missing = [f for f in ('part_id', 'cross_reference_number', 'reference_type') if f not in data]
        if missing:
            return jsonify({'error': 'Missing required field(s): ' + ', '.join(missing)}), 400

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO part_cross_references (part_id, cross_reference_number, reference_type, source, notes)
                VALUES (?, ?, ?, ?, ?)
            """, (
                data['part_id'],
                data['cross_reference_number'],
                data['reference_type'],
                data.get('source'),
                data.get('notes'),
            ))
            conn.commit()
            new_id = cursor.lastrowid
        finally:
            # Close the connection even when the INSERT or commit fails.
            conn.close()

        return jsonify({'id': new_id, 'message': 'Cross-reference created'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/admin/crossref/<int:crossref_id>', methods=['PUT'])
def api_admin_update_crossref(crossref_id):
    """Overwrite every editable column of one part cross-reference.

    The JSON body must contain ``part_id``, ``cross_reference_number`` and
    ``reference_type``. ``source`` and ``notes`` are written as NULL when
    omitted.

    Returns ``{'message'}`` on success (also when no row has this id, since
    the affected-row count is not inspected), HTTP 400 for a malformed body,
    HTTP 500 on any other failure.
    """
    try:
        # silent=True yields None instead of raising on a bad JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        missing = [f for f in ('part_id', 'cross_reference_number', 'reference_type') if f not in data]
        if missing:
            return jsonify({'error': 'Missing required field(s): ' + ', '.join(missing)}), 400

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE part_cross_references
                SET part_id = ?, cross_reference_number = ?, reference_type = ?, source = ?, notes = ?
                WHERE id = ?
            """, (
                data['part_id'],
                data['cross_reference_number'],
                data['reference_type'],
                data.get('source'),
                data.get('notes'),
                crossref_id,
            ))
            conn.commit()
        finally:
            # Close the connection even when the UPDATE or commit fails.
            conn.close()

        return jsonify({'message': 'Cross-reference updated'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/admin/crossref/<int:crossref_id>', methods=['DELETE'])
def api_admin_delete_crossref(crossref_id):
    """Delete the part cross-reference with the given id.

    Returns ``{'message'}`` on success; the affected-row count is not
    inspected, so an unknown id is also reported as deleted. Any failure
    yields ``{'error': <message>}`` with HTTP 500.
    """
    try:
        conn = get_db_connection()
        try:
            conn.cursor().execute(
                "DELETE FROM part_cross_references WHERE id = ?", (crossref_id,)
            )
            conn.commit()
        finally:
            # Close the connection even when the DELETE or commit fails.
            conn.close()

        return jsonify({'message': 'Cross-reference deleted'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
# ---- Fitment CRUD ----
|
|
|
|
@app.route('/api/admin/fitment')
def api_admin_list_fitment():
    """List vehicle/part fitment rows, newest id first, with optional filters.

    Query parameters:
        page (int, default 1): 1-based page number; values below 1 are
            treated as 1.
        per_page (int, default 50): page size, clamped to the range 1-500
            (the higher cap serves the bulk editor).
        brand (str, optional): exact brand name.
        model (str, optional): exact model name.
        mye_id (int, optional): ``model_year_engine`` id.

    Returns ``{'data': [...], 'pagination': {'page', 'per_page', 'total',
    'total_pages'}}``; every item combines the ``vehicle_parts`` columns
    with the brand, model, year, engine and part it refers to. Any failure
    yields ``{'error': <message>}`` with HTTP 500.
    """
    try:
        # Clamp both values: per_page=0 would divide by zero below, a
        # negative LIMIT removes the cap in SQLite, and page<1 would give a
        # negative OFFSET.
        page = max(request.args.get('page', 1, type=int), 1)
        per_page = request.args.get('per_page', 50, type=int)
        per_page = max(1, min(per_page, 500))  # Allow more for bulk editor
        offset = (page - 1) * per_page

        brand = request.args.get('brand')
        model = request.args.get('model')
        mye_id = request.args.get('mye_id', type=int)

        # Build the WHERE clause shared by the count and the data query.
        # Only fixed SQL fragments are concatenated; values are bound.
        where_clause = " WHERE 1=1"
        params = []
        for value, condition in (
            (mye_id, " AND vp.model_year_engine_id = ?"),
            (brand, " AND b.name = ?"),
            (model, " AND m.name = ?"),
        ):
            if value:
                where_clause += condition
                params.append(value)

        # NOTE(review): the count joins fewer tables than the data query
        # (no years/engines/parts); totals match only if every fitment row
        # resolves through those inner joins.
        count_query = """
            SELECT COUNT(*)
            FROM vehicle_parts vp
            JOIN model_year_engine mye ON vp.model_year_engine_id = mye.id
            JOIN models m ON mye.model_id = m.id
            JOIN brands b ON m.brand_id = b.id
        """ + where_clause

        data_query = """
            SELECT vp.id, vp.model_year_engine_id, vp.part_id, vp.quantity_required, vp.position, vp.fitment_notes,
                   b.name AS brand, m.name AS model, y.year, e.name AS engine,
                   p.oem_part_number, p.name AS part_name
            FROM vehicle_parts vp
            JOIN model_year_engine mye ON vp.model_year_engine_id = mye.id
            JOIN models m ON mye.model_id = m.id
            JOIN brands b ON m.brand_id = b.id
            JOIN years y ON mye.year_id = y.id
            JOIN engines e ON mye.engine_id = e.id
            JOIN parts p ON vp.part_id = p.id
        """ + where_clause + " ORDER BY vp.id DESC LIMIT ? OFFSET ?"

        fields = ('id', 'model_year_engine_id', 'part_id', 'quantity_required',
                  'position', 'fitment_notes', 'brand', 'model', 'year',
                  'engine', 'oem_part_number', 'part_name')

        conn = get_db_connection()
        try:
            cursor = conn.cursor()

            cursor.execute(count_query, params)
            total_count = cursor.fetchone()[0]

            cursor.execute(data_query, params + [per_page, offset])
            fitments = [
                {field: row[field] for field in fields}
                for row in cursor.fetchall()
            ]
        finally:
            # Close the connection even when a query fails.
            conn.close()

        # Ceiling division without floats.
        total_pages = (total_count + per_page - 1) // per_page
        return jsonify({
            'data': fitments,
            'pagination': {
                'page': page,
                'per_page': per_page,
                'total': total_count,
                'total_pages': total_pages,
            },
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/admin/fitment', methods=['POST'])
def api_admin_create_fitment():
    """Create a fitment row linking a part to a model/year/engine.

    Required fields: ``model_year_engine_id`` and ``part_id``. Optional
    fields: ``quantity_required`` (default 1), ``position`` and
    ``fitment_notes``.

    Returns ``{'id', 'message'}`` on success; HTTP 400 when the body is not
    a JSON object, a required field is missing, or SQLite raises an
    integrity error (reported to the client as a duplicate fitment); HTTP
    500 on any other failure.
    """
    try:
        # silent=True yields None instead of raising on a bad JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400
        missing = [f for f in ('model_year_engine_id', 'part_id') if f not in data]
        if missing:
            return jsonify({'error': 'Missing required field(s): ' + ', '.join(missing)}), 400

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO vehicle_parts (model_year_engine_id, part_id, quantity_required, position, fitment_notes)
                VALUES (?, ?, ?, ?, ?)
            """, (
                data['model_year_engine_id'],
                data['part_id'],
                data.get('quantity_required', 1),
                data.get('position'),
                data.get('fitment_notes'),
            ))
            conn.commit()
            new_id = cursor.lastrowid
        finally:
            # Runs before either handler below, so the connection is also
            # released on the duplicate-fitment path.
            conn.close()

        return jsonify({'id': new_id, 'message': 'Fitment created'})
    except sqlite3.IntegrityError:
        # NOTE(review): any constraint violation lands here, not only a
        # duplicate; the message always says the fitment already exists.
        return jsonify({'error': 'Este fitment ya existe'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/admin/fitment/<int:fitment_id>', methods=['DELETE'])
def api_admin_delete_fitment(fitment_id):
    """Delete the fitment row with the given id.

    Returns ``{'message'}`` on success; the affected-row count is not
    inspected, so an unknown id is also reported as deleted. Any failure
    yields ``{'error': <message>}`` with HTTP 500.
    """
    try:
        conn = get_db_connection()
        try:
            conn.cursor().execute(
                "DELETE FROM vehicle_parts WHERE id = ?", (fitment_id,)
            )
            conn.commit()
        finally:
            # Close the connection even when the DELETE or commit fails.
            conn.close()

        return jsonify({'message': 'Fitment deleted'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
# ---- CSV Import/Export ----
|
|
|
|
# Import type -> (INSERT statement, function building its parameters from one
# record). Records are dicts; a missing required key raises KeyError, which
# the import loop reports as a per-row error.
_ADMIN_IMPORT_SPECS = {
    'categories': (
        """
        INSERT INTO part_categories (name, name_es, slug, icon_name, display_order)
        VALUES (?, ?, ?, ?, ?)
        """,
        lambda r: (
            r['name'],
            r.get('name_es'),
            r.get('slug') or r['name'].lower().replace(' ', '-'),
            r.get('icon_name'),
            r.get('display_order', 0),
        ),
    ),
    'groups': (
        """
        INSERT INTO part_groups (category_id, name, name_es, display_order)
        VALUES (?, ?, ?, ?)
        """,
        lambda r: (
            r['category_id'],
            r['name'],
            r.get('name_es'),
            r.get('display_order', 0),
        ),
    ),
    'parts': (
        """
        INSERT INTO parts (oem_part_number, name, name_es, group_id, description, description_es, weight_kg, material)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        lambda r: (
            r['oem_part_number'],
            r['name'],
            r.get('name_es'),
            r['group_id'],
            r.get('description'),
            r.get('description_es'),
            r.get('weight_kg'),
            r.get('material'),
        ),
    ),
    'manufacturers': (
        """
        INSERT INTO manufacturers (name, type, quality_tier, country, website)
        VALUES (?, ?, ?, ?, ?)
        """,
        lambda r: (
            r['name'],
            r.get('type', 'aftermarket'),
            r.get('quality_tier', 'standard'),
            r.get('country'),
            r.get('website'),
        ),
    ),
    'aftermarket': (
        """
        INSERT INTO aftermarket_parts (oem_part_id, manufacturer_id, part_number, name, name_es, quality_tier, price_usd, warranty_months)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        lambda r: (
            r['oem_part_id'],
            r['manufacturer_id'],
            r['part_number'],
            r.get('name'),
            r.get('name_es'),
            r.get('quality_tier', 'standard'),
            r.get('price_usd'),
            r.get('warranty_months'),
        ),
    ),
    'crossref': (
        """
        INSERT INTO part_cross_references (part_id, cross_reference_number, reference_type, source, notes)
        VALUES (?, ?, ?, ?, ?)
        """,
        lambda r: (
            r['part_id'],
            r['cross_reference_number'],
            r['reference_type'],
            r.get('source'),
            r.get('notes'),
        ),
    ),
    # Duplicate fitments are skipped rather than reported as row errors.
    'fitment': (
        """
        INSERT OR IGNORE INTO vehicle_parts (model_year_engine_id, part_id, quantity_required, position, fitment_notes)
        VALUES (?, ?, ?, ?, ?)
        """,
        lambda r: (
            r['model_year_engine_id'],
            r['part_id'],
            r.get('quantity_required', 1),
            r.get('position'),
            r.get('fitment_notes'),
        ),
    ),
}


@app.route('/api/admin/import/<import_type>', methods=['POST'])
def api_admin_import_csv(import_type):
    """Bulk-insert records (parsed from CSV by the client) into one table.

    Path parameter:
        import_type: one of ``categories``, ``groups``, ``parts``,
            ``manufacturers``, ``aftermarket``, ``crossref``, ``fitment``.

    The JSON body must be an object with a non-empty ``records`` list of
    objects. The import is best-effort: a row that fails is recorded and
    the remaining rows are still processed and committed.

    Returns ``{'imported': n}`` plus, when applicable, ``skipped`` (rows
    ignored as duplicates by ``INSERT OR IGNORE``), ``errors`` (first 10
    messages, ``"Row <n>: <reason>"``) and ``error_count``. Responds with
    HTTP 400 for an unknown import type, a malformed body or no records,
    and HTTP 500 on any other failure.
    """
    try:
        spec = _ADMIN_IMPORT_SPECS.get(import_type)
        if spec is None:
            # Previously an unknown type inserted nothing yet reported
            # every row as imported.
            return jsonify({'error': f'Unsupported import type: {import_type}'}), 400
        sql, build_params = spec

        # silent=True yields None instead of raising on a bad JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        records = data.get('records', [])
        if not records:
            return jsonify({'error': 'No records to import'}), 400
        if not isinstance(records, list):
            return jsonify({'error': 'records must be a list'}), 400

        imported = 0
        skipped = 0
        errors = []

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            for i, record in enumerate(records):
                try:
                    cursor.execute(sql, build_params(record))
                except Exception as e:
                    # Deliberate best-effort: note the failure, keep going.
                    errors.append(f"Row {i + 1}: {str(e)}")
                    continue
                # rowcount is 0 when INSERT OR IGNORE dropped a duplicate.
                if cursor.rowcount == 0:
                    skipped += 1
                else:
                    imported += 1
            conn.commit()
        finally:
            # Close the connection even when the commit fails.
            conn.close()

        result = {'imported': imported}
        if skipped:
            result['skipped'] = skipped
        if errors:
            result['errors'] = errors[:10]  # Limit errors shown
            result['error_count'] = len(errors)

        return jsonify(result)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/api/admin/export/<export_type>')
def api_admin_export_csv(export_type):
    """Export one table as JSON rows (converted to CSV by the frontend).

    Path parameter:
        export_type: one of ``categories``, ``groups``, ``parts``,
            ``manufacturers``, ``aftermarket``, ``crossref``, ``fitment``.

    Returns ``{'data': [<row dict>, ...]}`` with one dict per table row,
    HTTP 400 for an unknown export type, and ``{'error': <message>}`` with
    HTTP 500 on any other failure.
    """
    # Export type -> fixed SELECT statement. The request only chooses a key
    # here, so no request text ever reaches the SQL.
    export_queries = {
        'categories': "SELECT id, name, name_es, slug, icon_name, display_order FROM part_categories ORDER BY display_order, name",
        'groups': "SELECT id, category_id, name, name_es, display_order FROM part_groups ORDER BY category_id, display_order, name",
        'parts': "SELECT id, oem_part_number, name, name_es, group_id, description, description_es, weight_kg, material FROM parts ORDER BY id",
        'manufacturers': "SELECT id, name, type, quality_tier, country, website FROM manufacturers ORDER BY name",
        'aftermarket': "SELECT id, oem_part_id, manufacturer_id, part_number, name, name_es, quality_tier, price_usd, warranty_months FROM aftermarket_parts ORDER BY id",
        'crossref': "SELECT id, part_id, cross_reference_number, reference_type, source, notes FROM part_cross_references ORDER BY id",
        'fitment': "SELECT id, model_year_engine_id, part_id, quantity_required, position, fitment_notes FROM vehicle_parts ORDER BY id",
    }
    try:
        query = export_queries.get(export_type)
        if query is None:
            # Previously an unknown type produced an empty export with
            # HTTP 200, hiding typos in the requested type.
            return jsonify({'error': f'Unsupported export type: {export_type}'}), 400

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(query)
            data = [dict(row) for row in cursor.fetchall()]
        finally:
            # Close the connection even when the query fails.
            conn.close()

        return jsonify({'data': data})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
# ============================================================================
|
|
# Image Upload Endpoint
|
|
# ============================================================================
|
|
|
|
import base64
|
|
import uuid
|
|
|
|
@app.route('/api/admin/upload-image', methods=['POST'])
def api_admin_upload_image():
    """Store a base64-encoded image under ``static/parts_images``.

    The JSON body must contain ``image``: either a data URI
    (``data:image/png;base64,<payload>``) or a bare base64 string. The file
    extension is chosen by looking for ``png``, ``jpeg``/``jpg``, ``gif`` or
    ``webp`` in the data-URI header and falls back to ``png``.

    Returns ``{'url', 'filename'}`` on success, HTTP 400 when the body is
    not a JSON object or the image is missing, not a string, not valid
    base64 or empty, and HTTP 500 on any other failure.

    NOTE(review): the decoded bytes are written as-is; nothing checks that
    they are an image or limits their size. The target directory is
    relative to the process working directory.
    """
    try:
        # silent=True yields None instead of raising on a bad JSON body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        image_data = data.get('image')
        if not image_data:
            return jsonify({'error': 'No image data provided'}), 400
        if not isinstance(image_data, str):
            return jsonify({'error': 'Image data must be a base64 string'}), 400

        # Split off the optional "data:image/...;base64," header and pick
        # the extension from it; the first matching marker wins.
        ext = 'png'
        if ',' in image_data:
            header, encoded = image_data.split(',', 1)
            for marker, candidate in (
                ('png', 'png'),
                ('jpeg', 'jpg'),
                ('jpg', 'jpg'),
                ('gif', 'gif'),
                ('webp', 'webp'),
            ):
                if marker in header:
                    ext = candidate
                    break
        else:
            encoded = image_data

        try:
            image_bytes = base64.b64decode(encoded)
        except ValueError:
            # binascii.Error (bad padding) is a ValueError subclass; a
            # non-ASCII payload raises ValueError directly.
            return jsonify({'error': 'Invalid base64 image data'}), 400
        if not image_bytes:
            return jsonify({'error': 'No image data provided'}), 400

        # Random name so uploads never overwrite each other.
        filename = f"{uuid.uuid4().hex}.{ext}"
        target_dir = os.path.join('static', 'parts_images')
        os.makedirs(target_dir, exist_ok=True)

        with open(os.path.join(target_dir, filename), 'wb') as f:
            f.write(image_bytes)

        url = f"/static/parts_images/{filename}"
        return jsonify({'url': url, 'filename': filename})

    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
if __name__ == '__main__':
    # Refuse to start without the SQLite file: every endpoint depends on it.
    if not os.path.exists(DATABASE_PATH):
        print(f"Database not found at {DATABASE_PATH}")
        print("Please make sure the vehicle database is created first.")
        # The bare exit() helper is injected by the `site` module for
        # interactive use and can be absent (e.g. `python -S`); raising
        # SystemExit ends the process with the same status code.
        raise SystemExit(1)

    print("Starting Vehicle Dashboard Server...")
    print("Visit http://localhost:5000 to access the dashboard locally")
    print("Visit http://192.168.10.198:5000 to access the dashboard from other computers on the network")
    # NOTE(review): debug=True enables the Werkzeug interactive debugger and
    # host='0.0.0.0' listens on every interface, so the debugger is
    # reachable from the network. Acceptable for local development only.
    app.run(debug=True, host='0.0.0.0', port=5000)