Files
Autoparts-DB/dashboard/server.py
consultoria-as ad79724e8a Fix case-insensitive brand search, add hotspots endpoint and clean URLs
- Make brand and model name queries case-insensitive using UPPER()
- Add /api/diagrams/<id>/hotspots endpoint for fetching diagram hotspots
- Add /admin and /landing clean URL routes for HTML pages

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 00:42:19 +00:00

3313 lines
110 KiB
Python

from flask import Flask, render_template, jsonify, request, send_from_directory
import sqlite3
import os
app = Flask(__name__, static_folder='.')
# Database path - adjust as needed
DATABASE_PATH = '../vehicle_database/vehicle_database.db'
def get_db_connection():
"""Get a connection to the vehicle database"""
conn = sqlite3.connect(DATABASE_PATH)
conn.row_factory = sqlite3.Row # This enables column access by name
return conn
def get_all_brands():
"""Get all unique brands that have vehicles with parts"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT DISTINCT b.name
FROM brands b
JOIN models m ON m.brand_id = b.id
JOIN model_year_engine mye ON mye.model_id = m.id
JOIN vehicle_parts vp ON vp.model_year_engine_id = mye.id
ORDER BY b.name
""")
brands = [row['name'] for row in cursor.fetchall()]
conn.close()
return brands
def get_all_years():
"""Get all unique years from the database"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT DISTINCT year FROM years ORDER BY year DESC")
years = [row['year'] for row in cursor.fetchall()]
conn.close()
return years
def get_all_engines():
"""Get all unique engines from the database"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT DISTINCT name FROM engines ORDER BY name")
engines = [row['name'] for row in cursor.fetchall()]
conn.close()
return engines
def get_models_by_brand(brand_name=None):
"""Get all models that have vehicles with parts, optionally filtered by brand"""
conn = get_db_connection()
cursor = conn.cursor()
if brand_name:
cursor.execute("""
SELECT DISTINCT m.name
FROM models m
JOIN brands b ON m.brand_id = b.id
JOIN model_year_engine mye ON mye.model_id = m.id
JOIN vehicle_parts vp ON vp.model_year_engine_id = mye.id
WHERE UPPER(b.name) = UPPER(?)
ORDER BY m.name
""", (brand_name,))
else:
cursor.execute("""
SELECT DISTINCT m.name
FROM models m
JOIN model_year_engine mye ON mye.model_id = m.id
JOIN vehicle_parts vp ON vp.model_year_engine_id = mye.id
ORDER BY m.name
""")
models = [row['name'] for row in cursor.fetchall()]
conn.close()
return models
def search_vehicles(brand=None, model=None, year=None, engine=None, with_parts=True):
"""Search for vehicles based on filters. By default only returns vehicles with parts."""
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT
b.name AS brand,
m.name AS model,
y.year,
e.name AS engine,
e.power_hp,
e.torque_nm,
e.displacement_cc,
e.cylinders,
e.fuel_type,
mye.trim_level,
mye.drivetrain,
mye.transmission
FROM model_year_engine mye
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
JOIN years y ON mye.year_id = y.id
JOIN engines e ON mye.engine_id = e.id
"""
# Only show vehicles that have parts
if with_parts:
query += " WHERE EXISTS (SELECT 1 FROM vehicle_parts vp WHERE vp.model_year_engine_id = mye.id)"
else:
query += " WHERE 1=1"
params = []
if brand:
query += " AND UPPER(b.name) = UPPER(?)"
params.append(brand)
if model:
query += " AND UPPER(m.name) = UPPER(?)"
params.append(model)
if year:
query += " AND y.year = ?"
params.append(int(year))
if engine:
query += " AND e.name = ?"
params.append(engine)
query += " ORDER BY b.name, m.name, y.year"
cursor.execute(query, params)
results = cursor.fetchall()
conn.close()
# Convert to list of dictionaries
vehicles = []
for row in results:
vehicle = {
'brand': row['brand'],
'model': row['model'],
'year': row['year'],
'engine': row['engine'],
'power_hp': row['power_hp'] or 0,
'torque_nm': row['torque_nm'] or 0,
'displacement_cc': row['displacement_cc'] or 0,
'cylinders': row['cylinders'] or 0,
'fuel_type': row['fuel_type'] or 'unknown',
'trim_level': row['trim_level'] or 'unknown',
'drivetrain': row['drivetrain'] or 'unknown',
'transmission': row['transmission'] or 'unknown'
}
vehicles.append(vehicle)
return vehicles
@app.route('/')
def index():
"""Serve the main dashboard page"""
return send_from_directory('.', 'index.html')
@app.route('/admin')
def admin_page():
"""Serve the admin panel"""
return send_from_directory('.', 'admin.html')
@app.route('/landing')
def landing_page():
"""Serve the customer landing page"""
return send_from_directory('.', 'customer-landing.html')
@app.route('/<path:path>')
def static_files(path):
"""Serve static files"""
return send_from_directory('.', path)
@app.route('/api/brands')
def api_brands():
"""API endpoint to get all brands"""
brands = get_all_brands()
return jsonify(brands)
@app.route('/api/years')
def api_years():
"""API endpoint to get years, optionally filtered by brand and/or model"""
brand = request.args.get('brand')
model = request.args.get('model')
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT DISTINCT y.year
FROM years y
JOIN model_year_engine mye ON y.id = mye.year_id
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
WHERE 1=1
"""
params = []
if brand:
query += " AND UPPER(b.name) = UPPER(?)"
params.append(brand)
if model:
query += " AND UPPER(m.name) = UPPER(?)"
params.append(model)
query += " ORDER BY y.year DESC"
cursor.execute(query, params)
results = cursor.fetchall()
conn.close()
years = [row['year'] for row in results]
return jsonify(years)
@app.route('/api/engines')
def api_engines():
"""API endpoint to get engines, optionally filtered by brand, model, and/or year"""
brand = request.args.get('brand')
model = request.args.get('model')
year = request.args.get('year')
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT DISTINCT e.name
FROM engines e
JOIN model_year_engine mye ON e.id = mye.engine_id
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
JOIN years y ON mye.year_id = y.id
WHERE 1=1
"""
params = []
if brand:
query += " AND UPPER(b.name) = UPPER(?)"
params.append(brand)
if model:
query += " AND UPPER(m.name) = UPPER(?)"
params.append(model)
if year:
query += " AND y.year = ?"
params.append(int(year))
query += " ORDER BY e.name"
cursor.execute(query, params)
results = cursor.fetchall()
conn.close()
engines = [row['name'] for row in results]
return jsonify(engines)
@app.route('/api/models')
def api_models():
"""API endpoint to get models, optionally filtered by brand"""
brand = request.args.get('brand')
models = get_models_by_brand(brand)
return jsonify(models)
@app.route('/api/vehicles')
def api_vehicles():
"""API endpoint to search for vehicles"""
brand = request.args.get('brand')
model = request.args.get('model')
year = request.args.get('year')
engine = request.args.get('engine')
vehicles = search_vehicles(brand, model, year, engine)
return jsonify(vehicles)
# ============================================================================
# Parts Catalog API Endpoints
# ============================================================================
@app.route('/api/categories')
def api_categories():
"""API endpoint to get all part categories (hierarchical)"""
try:
conn = get_db_connection()
cursor = conn.cursor()
# Get all categories
cursor.execute("""
SELECT id, name, name_es, slug, icon_name, display_order, parent_id
FROM part_categories
ORDER BY display_order, name
""")
all_categories = cursor.fetchall()
conn.close()
# Build hierarchical structure
categories_dict = {}
root_categories = []
# First pass: create all category objects
for row in all_categories:
category = {
'id': row['id'],
'name': row['name'],
'name_es': row['name_es'],
'slug': row['slug'],
'icon_name': row['icon_name'],
'display_order': row['display_order'],
'children': []
}
categories_dict[row['id']] = category
if row['parent_id'] is None:
root_categories.append(category)
# Second pass: build hierarchy
for row in all_categories:
if row['parent_id'] is not None and row['parent_id'] in categories_dict:
categories_dict[row['parent_id']]['children'].append(categories_dict[row['id']])
return jsonify(root_categories)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/categories/<int:category_id>/groups')
def api_category_groups(category_id):
"""API endpoint to get groups for a specific category"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT id, name, name_es, slug, display_order
FROM part_groups
WHERE category_id = ?
ORDER BY display_order, name
""", (category_id,))
groups = []
for row in cursor.fetchall():
groups.append({
'id': row['id'],
'name': row['name'],
'name_es': row['name_es'],
'slug': row['slug'],
'display_order': row['display_order']
})
conn.close()
return jsonify(groups)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/parts')
def api_parts():
"""API endpoint to list parts with optional filters and pagination"""
try:
group_id = request.args.get('group_id', type=int)
category_id = request.args.get('category_id', type=int)
search = request.args.get('search')
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 50, type=int)
per_page = min(per_page, 100) # Max 100 per page
offset = (page - 1) * per_page
conn = get_db_connection()
cursor = conn.cursor()
# Build base WHERE clause for both count and data queries
where_clause = " WHERE 1=1"
params = []
if group_id:
where_clause += " AND p.group_id = ?"
params.append(group_id)
if category_id:
where_clause += " AND pg.category_id = ?"
params.append(category_id)
if search:
where_clause += " AND (p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ?)"
search_term = f"%{search}%"
params.extend([search_term, search_term, search_term])
# Get total count
count_query = """
SELECT COUNT(*) as total
FROM parts p
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
""" + where_clause
cursor.execute(count_query, params)
total_count = cursor.fetchone()['total']
# Get paginated data
data_query = """
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
p.group_id,
p.image_url,
pg.name AS group_name,
pc.name AS category_name
FROM parts p
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
""" + where_clause + " ORDER BY p.name LIMIT ? OFFSET ?"
params.extend([per_page, offset])
cursor.execute(data_query, params)
parts = []
for row in cursor.fetchall():
parts.append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'group_id': row['group_id'],
'group_name': row['group_name'],
'category_name': row['category_name'],
'image_url': row['image_url']
})
conn.close()
total_pages = (total_count + per_page - 1) // per_page
return jsonify({
'data': parts,
'pagination': {
'page': page,
'per_page': per_page,
'total': total_count,
'total_pages': total_pages
}
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/parts/<int:part_id>')
def api_part_detail(part_id):
"""API endpoint to get single part details"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
p.description,
p.description_es,
p.group_id,
p.image_url,
pg.name AS group_name,
pg.name_es AS group_name_es,
pc.id AS category_id,
pc.name AS category_name,
pc.name_es AS category_name_es
FROM parts p
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE p.id = ?
""", (part_id,))
row = cursor.fetchone()
conn.close()
if row is None:
return jsonify({'error': 'Part not found'}), 404
part = {
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'description': row['description'],
'description_es': row['description_es'],
'group_id': row['group_id'],
'group_name': row['group_name'],
'image_url': row['image_url'],
'group_name_es': row['group_name_es'],
'category_id': row['category_id'],
'category_name': row['category_name'],
'category_name_es': row['category_name_es']
}
return jsonify(part)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/vehicles/<int:mye_id>/categories')
def api_vehicle_categories(mye_id):
"""API endpoint to get categories that have parts for a specific vehicle"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT DISTINCT
pc.id,
pc.name,
pc.name_es,
pc.slug,
pc.icon_name,
pc.display_order
FROM part_categories pc
JOIN part_groups pg ON pg.category_id = pc.id
JOIN parts p ON p.group_id = pg.id
JOIN vehicle_parts vp ON vp.part_id = p.id
WHERE vp.model_year_engine_id = ?
ORDER BY pc.display_order, pc.name
""", (mye_id,))
categories = []
for row in cursor.fetchall():
categories.append({
'id': row['id'],
'name': row['name'],
'name_es': row['name_es'],
'slug': row['slug'],
'icon_name': row['icon_name'],
'display_order': row['display_order']
})
conn.close()
return jsonify(categories)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/vehicles/<int:mye_id>/groups')
def api_vehicle_groups(mye_id):
"""API endpoint to get groups that have parts for a specific vehicle within a category"""
try:
category_id = request.args.get('category_id', type=int)
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT DISTINCT
pg.id,
pg.name,
pg.name_es,
pg.slug,
pg.display_order,
COUNT(DISTINCT p.id) as parts_count
FROM part_groups pg
JOIN parts p ON p.group_id = pg.id
JOIN vehicle_parts vp ON vp.part_id = p.id
WHERE vp.model_year_engine_id = ?
"""
params = [mye_id]
if category_id:
query += " AND pg.category_id = ?"
params.append(category_id)
query += " GROUP BY pg.id ORDER BY pg.display_order, pg.name"
cursor.execute(query, params)
groups = []
for row in cursor.fetchall():
groups.append({
'id': row['id'],
'name': row['name'],
'name_es': row['name_es'],
'slug': row['slug'],
'display_order': row['display_order'],
'parts_count': row['parts_count']
})
conn.close()
return jsonify(groups)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/vehicles/<int:mye_id>/parts')
def api_vehicle_parts(mye_id):
"""API endpoint to get parts for a specific vehicle"""
try:
category_id = request.args.get('category_id', type=int)
group_id = request.args.get('group_id', type=int)
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
vp.quantity_required,
vp.position,
pc.name AS category_name,
pg.name AS group_name
FROM vehicle_parts vp
JOIN parts p ON vp.part_id = p.id
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE vp.model_year_engine_id = ?
"""
params = [mye_id]
if category_id:
query += " AND pc.id = ?"
params.append(category_id)
if group_id:
query += " AND pg.id = ?"
params.append(group_id)
query += " ORDER BY pc.display_order, pg.display_order, p.name"
cursor.execute(query, params)
parts = []
for row in cursor.fetchall():
parts.append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'quantity_required': row['quantity_required'],
'position': row['position'],
'category_name': row['category_name'],
'group_name': row['group_name']
})
conn.close()
return jsonify(parts)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/model-year-engine')
def api_model_year_engine():
"""API endpoint to get model_year_engine records with filters"""
try:
brand = request.args.get('brand')
model = request.args.get('model')
year = request.args.get('year', type=int)
with_parts = request.args.get('with_parts', 'true').lower() == 'true'
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT
mye.id,
b.name AS brand,
m.name AS model,
y.year,
e.name AS engine,
mye.trim_level,
mye.drivetrain,
mye.transmission
FROM model_year_engine mye
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
JOIN years y ON mye.year_id = y.id
JOIN engines e ON mye.engine_id = e.id
"""
# Only show vehicles that have parts
if with_parts:
query += " WHERE EXISTS (SELECT 1 FROM vehicle_parts vp WHERE vp.model_year_engine_id = mye.id)"
else:
query += " WHERE 1=1"
params = []
if brand:
query += " AND UPPER(b.name) = UPPER(?)"
params.append(brand)
if model:
query += " AND UPPER(m.name) = UPPER(?)"
params.append(model)
if year:
query += " AND y.year = ?"
params.append(year)
query += " ORDER BY b.name, m.name, y.year, e.name"
cursor.execute(query, params)
records = []
for row in cursor.fetchall():
records.append({
'id': row['id'],
'brand': row['brand'],
'model': row['model'],
'year': row['year'],
'engine': row['engine'],
'trim_level': row['trim_level'],
'drivetrain': row['drivetrain'],
'transmission': row['transmission']
})
conn.close()
return jsonify(records)
except Exception as e:
return jsonify({'error': str(e)}), 500
# ============================================================================
# FASE 2: Cross-References and Aftermarket API Endpoints
# ============================================================================
@app.route('/api/manufacturers')
def api_manufacturers():
"""Get all manufacturers, optionally filtered by type"""
try:
manufacturer_type = request.args.get('type')
quality_tier = request.args.get('quality_tier')
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT id, name, type, quality_tier, country, logo_url, website
FROM manufacturers
WHERE 1=1
"""
params = []
if manufacturer_type:
query += " AND type = ?"
params.append(manufacturer_type)
if quality_tier:
query += " AND quality_tier = ?"
params.append(quality_tier)
query += " ORDER BY name"
cursor.execute(query, params)
manufacturers = []
for row in cursor.fetchall():
manufacturers.append({
'id': row['id'],
'name': row['name'],
'type': row['type'],
'quality_tier': row['quality_tier'],
'country': row['country'],
'logo_url': row['logo_url'],
'website': row['website']
})
conn.close()
return jsonify(manufacturers)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/parts/<int:part_id>/alternatives')
def api_part_alternatives(part_id):
"""Get aftermarket alternatives for an OEM part"""
try:
quality_tier = request.args.get('quality_tier')
manufacturer_id = request.args.get('manufacturer_id', type=int)
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT
ap.id,
ap.part_number,
ap.name,
ap.name_es,
m.name AS manufacturer_name,
ap.manufacturer_id,
ap.quality_tier,
ap.price_usd,
ap.warranty_months,
ap.in_stock
FROM aftermarket_parts ap
JOIN manufacturers m ON ap.manufacturer_id = m.id
WHERE ap.oem_part_id = ?
"""
params = [part_id]
if quality_tier:
query += " AND ap.quality_tier = ?"
params.append(quality_tier)
if manufacturer_id:
query += " AND ap.manufacturer_id = ?"
params.append(manufacturer_id)
query += " ORDER BY ap.quality_tier DESC, ap.price_usd ASC"
cursor.execute(query, params)
alternatives = []
for row in cursor.fetchall():
alternatives.append({
'id': row['id'],
'part_number': row['part_number'],
'name': row['name'],
'name_es': row['name_es'],
'manufacturer_name': row['manufacturer_name'],
'manufacturer_id': row['manufacturer_id'],
'quality_tier': row['quality_tier'],
'price_usd': row['price_usd'],
'warranty_months': row['warranty_months'],
'in_stock': bool(row['in_stock']) if row['in_stock'] is not None else None
})
conn.close()
return jsonify(alternatives)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/parts/<int:part_id>/cross-references')
def api_part_cross_references(part_id):
"""Get cross-reference numbers for a part"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT id, cross_reference_number, reference_type, source, notes
FROM part_cross_references
WHERE part_id = ?
ORDER BY reference_type, cross_reference_number
""", (part_id,))
cross_references = []
for row in cursor.fetchall():
cross_references.append({
'id': row['id'],
'cross_reference_number': row['cross_reference_number'],
'reference_type': row['reference_type'],
'source': row['source'],
'notes': row['notes']
})
conn.close()
return jsonify(cross_references)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/search/part-number/<part_number>')
def api_search_part_number(part_number):
"""Search for parts by any part number (OEM, aftermarket, or cross-ref)"""
try:
conn = get_db_connection()
cursor = conn.cursor()
results = []
search_term = f"%{part_number}%"
# Search in OEM parts
cursor.execute("""
SELECT id, oem_part_number, name, name_es
FROM parts
WHERE oem_part_number LIKE ?
""", (search_term,))
for row in cursor.fetchall():
results.append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'match_type': 'oem',
'matched_number': row['oem_part_number']
})
# Search in aftermarket parts
cursor.execute("""
SELECT p.id, p.oem_part_number, p.name, p.name_es, ap.part_number
FROM aftermarket_parts ap
JOIN parts p ON ap.oem_part_id = p.id
WHERE ap.part_number LIKE ?
""", (search_term,))
for row in cursor.fetchall():
results.append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'match_type': 'aftermarket',
'matched_number': row['part_number']
})
# Search in cross-references
cursor.execute("""
SELECT p.id, p.oem_part_number, p.name, p.name_es, pcr.cross_reference_number
FROM part_cross_references pcr
JOIN parts p ON pcr.part_id = p.id
WHERE pcr.cross_reference_number LIKE ?
""", (search_term,))
for row in cursor.fetchall():
results.append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'match_type': 'cross_reference',
'matched_number': row['cross_reference_number']
})
conn.close()
return jsonify(results)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/aftermarket')
def api_aftermarket_parts():
"""List aftermarket parts with filters and pagination"""
try:
manufacturer_id = request.args.get('manufacturer_id', type=int)
quality_tier = request.args.get('quality_tier')
search = request.args.get('search')
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 50, type=int)
per_page = min(per_page, 100) # Max 100 per page
offset = (page - 1) * per_page
conn = get_db_connection()
cursor = conn.cursor()
# Build base WHERE clause for both count and data queries
where_clause = " WHERE 1=1"
params = []
if manufacturer_id:
where_clause += " AND ap.manufacturer_id = ?"
params.append(manufacturer_id)
if quality_tier:
where_clause += " AND ap.quality_tier = ?"
params.append(quality_tier)
if search:
where_clause += " AND (ap.name LIKE ? OR ap.part_number LIKE ? OR p.oem_part_number LIKE ?)"
search_term = f"%{search}%"
params.extend([search_term, search_term, search_term])
# Get total count
count_query = """
SELECT COUNT(*) as total
FROM aftermarket_parts ap
JOIN parts p ON ap.oem_part_id = p.id
JOIN manufacturers m ON ap.manufacturer_id = m.id
""" + where_clause
cursor.execute(count_query, params)
total_count = cursor.fetchone()['total']
# Get paginated data
data_query = """
SELECT
ap.id,
ap.part_number,
ap.name,
p.oem_part_number,
m.name AS manufacturer_name,
ap.quality_tier,
ap.price_usd
FROM aftermarket_parts ap
JOIN parts p ON ap.oem_part_id = p.id
JOIN manufacturers m ON ap.manufacturer_id = m.id
""" + where_clause + " ORDER BY ap.name LIMIT ? OFFSET ?"
params.extend([per_page, offset])
cursor.execute(data_query, params)
parts = []
for row in cursor.fetchall():
parts.append({
'id': row['id'],
'part_number': row['part_number'],
'name': row['name'],
'oem_part_number': row['oem_part_number'],
'manufacturer_name': row['manufacturer_name'],
'quality_tier': row['quality_tier'],
'price_usd': row['price_usd']
})
conn.close()
total_pages = (total_count + per_page - 1) // per_page
return jsonify({
'data': parts,
'pagination': {
'page': page,
'per_page': per_page,
'total': total_count,
'total_pages': total_pages
}
})
except Exception as e:
return jsonify({'error': str(e)}), 500
# ============================================================================
# FASE 3: Exploded Diagrams API Endpoints
# ============================================================================
@app.route('/api/diagrams')
def api_diagrams():
"""Get all diagrams, optionally filtered by group_id"""
try:
group_id = request.args.get('group_id', type=int)
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT
d.id,
d.name,
d.name_es,
d.group_id,
pg.name AS group_name,
d.thumbnail_path,
d.display_order
FROM diagrams d
JOIN part_groups pg ON d.group_id = pg.id
WHERE 1=1
"""
params = []
if group_id:
query += " AND d.group_id = ?"
params.append(group_id)
query += " ORDER BY d.display_order, d.name"
cursor.execute(query, params)
diagrams = []
for row in cursor.fetchall():
diagrams.append({
'id': row['id'],
'name': row['name'],
'name_es': row['name_es'],
'group_id': row['group_id'],
'group_name': row['group_name'],
'thumbnail_path': row['thumbnail_path'],
'display_order': row['display_order']
})
conn.close()
return jsonify(diagrams)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/diagrams/<int:diagram_id>')
def api_diagram_detail(diagram_id):
"""Get diagram details including SVG content and hotspots"""
try:
conn = get_db_connection()
cursor = conn.cursor()
# Get diagram details
cursor.execute("""
SELECT
d.id,
d.name,
d.name_es,
d.group_id,
pg.name AS group_name,
d.image_path,
d.svg_content,
d.width,
d.height
FROM diagrams d
JOIN part_groups pg ON d.group_id = pg.id
WHERE d.id = ?
""", (diagram_id,))
row = cursor.fetchone()
if row is None:
conn.close()
return jsonify({'error': 'Diagram not found'}), 404
diagram = {
'id': row['id'],
'name': row['name'],
'name_es': row['name_es'],
'group_id': row['group_id'],
'group_name': row['group_name'],
'image_path': row['image_path'],
'svg_content': row['svg_content'],
'width': row['width'],
'height': row['height'],
'hotspots': []
}
# Get hotspots with part info
cursor.execute("""
SELECT
h.id,
h.part_id,
h.callout_number,
h.label,
h.shape,
h.coords,
h.color,
p.name AS part_name,
p.oem_part_number AS part_number
FROM diagram_hotspots h
LEFT JOIN parts p ON h.part_id = p.id
WHERE h.diagram_id = ?
ORDER BY h.callout_number
""", (diagram_id,))
for hotspot_row in cursor.fetchall():
diagram['hotspots'].append({
'id': hotspot_row['id'],
'part_id': hotspot_row['part_id'],
'callout_number': hotspot_row['callout_number'],
'label': hotspot_row['label'],
'shape': hotspot_row['shape'],
'coords': hotspot_row['coords'],
'color': hotspot_row['color'],
'part_name': hotspot_row['part_name'],
'part_number': hotspot_row['part_number']
})
conn.close()
return jsonify(diagram)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/diagrams/<int:diagram_id>/hotspots')
def api_diagram_hotspots(diagram_id):
"""Get all hotspots for a specific diagram"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT
h.id,
h.part_id,
h.callout_number,
h.label,
h.shape,
h.coords,
h.color,
p.name AS part_name,
p.oem_part_number AS part_number
FROM diagram_hotspots h
LEFT JOIN parts p ON h.part_id = p.id
WHERE h.diagram_id = ?
ORDER BY h.callout_number
""", (diagram_id,))
hotspots = []
for row in cursor.fetchall():
hotspots.append({
'id': row['id'],
'part_id': row['part_id'],
'callout_number': row['callout_number'],
'label': row['label'],
'shape': row['shape'],
'coords': row['coords'],
'color': row['color'],
'part_name': row['part_name'],
'part_number': row['part_number']
})
conn.close()
return jsonify(hotspots)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/groups/<int:group_id>/diagrams')
def api_group_diagrams(group_id):
"""Get all diagrams for a specific part group"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT
id,
name,
name_es,
thumbnail_path,
display_order
FROM diagrams
WHERE group_id = ?
ORDER BY display_order, name
""", (group_id,))
diagrams = []
for row in cursor.fetchall():
diagrams.append({
'id': row['id'],
'name': row['name'],
'name_es': row['name_es'],
'thumbnail_path': row['thumbnail_path'],
'display_order': row['display_order']
})
conn.close()
return jsonify(diagrams)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/vehicles/<int:mye_id>/diagrams')
def api_vehicle_diagrams(mye_id):
"""Get diagrams available for a specific vehicle configuration"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT DISTINCT
d.id,
d.name,
d.name_es,
d.group_id,
pg.name AS group_name,
pc.name AS category_name,
d.thumbnail_path,
vd.notes
FROM vehicle_diagrams vd
JOIN diagrams d ON vd.diagram_id = d.id
JOIN part_groups pg ON d.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE vd.model_year_engine_id = ?
ORDER BY pc.display_order, pg.display_order, d.display_order
""", (mye_id,))
diagrams = []
for row in cursor.fetchall():
diagrams.append({
'id': row['id'],
'name': row['name'],
'name_es': row['name_es'],
'group_id': row['group_id'],
'group_name': row['group_name'],
'category_name': row['category_name'],
'thumbnail_path': row['thumbnail_path'],
'notes': row['notes']
})
conn.close()
return jsonify(diagrams)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/hotspots/<int:hotspot_id>')
def api_hotspot_detail(hotspot_id):
"""Get hotspot details including linked part info"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT
h.id,
h.diagram_id,
h.part_id,
h.callout_number,
h.label,
h.shape,
h.coords,
h.color
FROM diagram_hotspots h
WHERE h.id = ?
""", (hotspot_id,))
row = cursor.fetchone()
if row is None:
conn.close()
return jsonify({'error': 'Hotspot not found'}), 404
hotspot = {
'id': row['id'],
'diagram_id': row['diagram_id'],
'part_id': row['part_id'],
'callout_number': row['callout_number'],
'label': row['label'],
'shape': row['shape'],
'coords': row['coords'],
'color': row['color'],
'part': None
}
# Get linked part info if part_id exists
if row['part_id']:
cursor.execute("""
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
pg.name AS group_name,
pc.name AS category_name
FROM parts p
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE p.id = ?
""", (row['part_id'],))
part_row = cursor.fetchone()
if part_row:
hotspot['part'] = {
'id': part_row['id'],
'oem_part_number': part_row['oem_part_number'],
'name': part_row['name'],
'name_es': part_row['name_es'],
'group_name': part_row['group_name'],
'category_name': part_row['category_name']
}
conn.close()
return jsonify(hotspot)
except Exception as e:
return jsonify({'error': str(e)}), 500
# ============================================================================
# FASE 4: Full-Text Search and VIN Decoder API Endpoints
# ============================================================================
import urllib.request
import json as json_module
import re
from datetime import datetime, timedelta
def validate_vin(vin):
"""Validate VIN format: 17 alphanumeric characters, no I, O, Q"""
if not vin or len(vin) != 17:
return False
# VIN can only contain alphanumeric characters except I, O, Q
valid_pattern = re.compile(r'^[A-HJ-NPR-Z0-9]{17}$', re.IGNORECASE)
return bool(valid_pattern.match(vin))
def find_vehicle_in_terms(cursor, terms):
"""
Try to find a vehicle match in the search terms.
Returns (matched_vehicle, remaining_terms) or (None, terms) if no match.
Strategy: Try different combinations of terms to find a vehicle match.
- First try all terms that could be vehicle-related (brand, model, year)
- Return the best match and remaining terms for part search
"""
if len(terms) < 2:
return None, terms
# Identify potential year terms (4-digit numbers between 1980-2030)
year_terms = []
other_terms = []
for term in terms:
if term.isdigit() and 1980 <= int(term) <= 2030:
year_terms.append(term)
else:
other_terms.append(term)
# Try to find a vehicle with the non-year terms + year
# We need at least one non-year term and preferably a year
if not other_terms:
return None, terms
# Build query to find matching vehicle
# Try with different combinations
best_match = None
used_terms = []
for num_terms in range(min(3, len(other_terms)), 0, -1):
if best_match:
break
# Try combinations of other_terms
for i in range(len(other_terms) - num_terms + 1):
test_terms = other_terms[i:i + num_terms]
if year_terms:
test_terms = test_terms + year_terms[:1] # Add first year
where_clauses = []
params = []
for term in test_terms:
term_pattern = f"%{term}%"
where_clauses.append(
"(b.name LIKE ? OR m.name LIKE ? OR CAST(y.year AS TEXT) LIKE ?)"
)
params.extend([term_pattern, term_pattern, term_pattern])
where_sql = " AND ".join(where_clauses)
cursor.execute(f"""
SELECT
mye.id,
b.name AS brand,
m.name AS model,
y.year,
e.name AS engine
FROM model_year_engine mye
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
JOIN years y ON mye.year_id = y.id
JOIN engines e ON mye.engine_id = e.id
WHERE {where_sql}
ORDER BY y.year DESC
LIMIT 1
""", params)
row = cursor.fetchone()
if row:
best_match = {
'id': row['id'],
'brand': row['brand'],
'model': row['model'],
'year': row['year'],
'engine': row['engine']
}
used_terms = test_terms
break
if best_match:
# Calculate remaining terms (terms not used for vehicle match)
remaining = []
used_lower = [t.lower() for t in used_terms]
for term in terms:
if term.lower() not in used_lower:
remaining.append(term)
else:
# Remove from used_lower to handle duplicates
used_lower.remove(term.lower())
return best_match, remaining
return None, terms
@app.route('/api/search')
def api_search():
"""Unified search across parts, cross-references, and aftermarket"""
try:
q = request.args.get('q', '').strip()
search_type = request.args.get('type', 'all')
category_id = request.args.get('category_id', type=int)
limit = request.args.get('limit', 50, type=int)
offset = request.args.get('offset', 0, type=int)
if not q:
return jsonify({'error': 'Search query is required'}), 400
conn = get_db_connection()
cursor = conn.cursor()
results = {
'parts': [],
'vehicles': [],
'vehicle_parts': [], # Parts for a specific vehicle
'matched_vehicle': None, # Vehicle matched in combined search
'total_count': 0
}
terms = q.split()
# Try to detect combined vehicle + part search
# Look for patterns like "aveo 2024 balata" or "camry brake pad"
if len(terms) >= 2 and search_type == 'all':
matched_vehicle, remaining_terms = find_vehicle_in_terms(cursor, terms)
if matched_vehicle and remaining_terms:
results['matched_vehicle'] = matched_vehicle
# Search for parts compatible with this vehicle (all engine variants)
part_terms = remaining_terms
if part_terms:
vp_where_clauses = []
# Search across ALL MYE IDs for same brand/model/year (all engine variants)
vp_params = [matched_vehicle['brand'], matched_vehicle['model'], matched_vehicle['year']]
for term in part_terms:
term_pattern = f"%{term}%"
vp_where_clauses.append(
"(p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ? OR pg.name LIKE ?)"
)
vp_params.extend([term_pattern, term_pattern, term_pattern, term_pattern])
vp_where_sql = " AND ".join(vp_where_clauses)
vp_params.extend([limit])
cursor.execute(f"""
SELECT DISTINCT
p.id,
p.oem_part_number,
p.name,
p.name_es,
p.image_url,
pg.name AS group_name,
pg.id AS group_id,
pc.name AS category_name,
pc.id AS category_id,
vp.quantity_required,
vp.position
FROM vehicle_parts vp
JOIN parts p ON vp.part_id = p.id
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE vp.model_year_engine_id IN (
SELECT mye.id FROM model_year_engine mye
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
JOIN years y ON mye.year_id = y.id
WHERE UPPER(b.name) = UPPER(?) AND UPPER(m.name) = UPPER(?) AND y.year = ?
) AND ({vp_where_sql})
ORDER BY p.name
LIMIT ?
""", vp_params)
for row in cursor.fetchall():
results['vehicle_parts'].append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'image_url': row['image_url'],
'group_name': row['group_name'],
'group_id': row['group_id'],
'category_name': row['category_name'],
'category_id': row['category_id'],
'quantity': row['quantity_required'],
'position': row['position'],
'match_type': 'vehicle_part'
})
# If we found vehicle parts, return early with combined results
if results['vehicle_parts']:
results['total_count'] = len(results['vehicle_parts'])
conn.close()
return jsonify(results)
# Search parts
if search_type in ('parts', 'all'):
# Split query into terms for multi-word search
terms = q.split()
# Build category filter
category_filter = ""
category_params = []
if category_id:
category_filter = " AND pc.id = ?"
category_params = [category_id]
if terms:
# Build WHERE clause: each term must match at least one field
where_clauses = []
params = []
for term in terms:
term_pattern = f"%{term}%"
where_clauses.append(
"(p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ? OR pg.name LIKE ? OR pc.name LIKE ?)"
)
params.extend([term_pattern, term_pattern, term_pattern, term_pattern, term_pattern])
where_sql = " AND ".join(where_clauses)
# For ordering, use the first term
first_term = terms[0]
first_term_pattern = f"{first_term}%"
cursor.execute(f"""
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
p.image_url,
pg.name AS group_name,
pc.name AS category_name
FROM parts p
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE ({where_sql}){category_filter}
ORDER BY
CASE
WHEN p.oem_part_number LIKE ? THEN 1
WHEN p.name LIKE ? THEN 2
ELSE 3
END,
p.name
LIMIT ? OFFSET ?
""", params + category_params + [first_term_pattern, first_term_pattern, limit, offset])
for row in cursor.fetchall():
results['parts'].append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'image_url': row['image_url'],
'group_name': row['group_name'],
'category_name': row['category_name'],
'match_type': 'oem'
})
# Also search in aftermarket parts (only if no category filter)
if not category_id and terms:
# Build WHERE for aftermarket: match part numbers
af_where_clauses = []
af_params = []
for term in terms:
af_where_clauses.append("ap.part_number LIKE ?")
af_params.append(f"%{term}%")
af_where_sql = " AND ".join(af_where_clauses)
af_params.extend([limit, offset])
cursor.execute(f"""
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
p.image_url,
pg.name AS group_name,
pc.name AS category_name,
ap.part_number AS matched_number
FROM aftermarket_parts ap
JOIN parts p ON ap.oem_part_id = p.id
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE {af_where_sql}
LIMIT ? OFFSET ?
""", af_params)
for row in cursor.fetchall():
# Avoid duplicates
if not any(p['id'] == row['id'] for p in results['parts']):
results['parts'].append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'image_url': row['image_url'],
'group_name': row['group_name'],
'category_name': row['category_name'],
'matched_number': row['matched_number'],
'match_type': 'aftermarket'
})
# Search in cross-references
if terms:
cr_where_clauses = []
cr_params = []
for term in terms:
cr_where_clauses.append("pcr.cross_reference_number LIKE ?")
cr_params.append(f"%{term}%")
cr_where_sql = " AND ".join(cr_where_clauses)
cr_params.extend([limit, offset])
cursor.execute(f"""
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
p.image_url,
pg.name AS group_name,
pc.name AS category_name,
pcr.cross_reference_number AS matched_number
FROM part_cross_references pcr
JOIN parts p ON pcr.part_id = p.id
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE {cr_where_sql}
LIMIT ? OFFSET ?
""", cr_params)
for row in cursor.fetchall():
# Avoid duplicates
if not any(p['id'] == row['id'] for p in results['parts']):
results['parts'].append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'image_url': row['image_url'],
'group_name': row['group_name'],
'category_name': row['category_name'],
'matched_number': row['matched_number'],
'match_type': 'cross_reference'
})
# Search vehicles
if search_type in ('vehicles', 'all'):
# Split query into terms for multi-word search
terms = q.split()
if terms:
# Build WHERE clause: each term must match at least one field
where_clauses = []
params = []
for term in terms:
term_pattern = f"%{term}%"
# Each term can match brand, model, year, or engine
where_clauses.append(
"(b.name LIKE ? OR m.name LIKE ? OR CAST(y.year AS TEXT) LIKE ? OR e.name LIKE ?)"
)
params.extend([term_pattern, term_pattern, term_pattern, term_pattern])
# All terms must match (AND between terms)
where_sql = " AND ".join(where_clauses)
params.extend([limit, offset])
cursor.execute(f"""
SELECT
mye.id,
b.name AS brand,
m.name AS model,
y.year,
e.name AS engine
FROM model_year_engine mye
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
JOIN years y ON mye.year_id = y.id
JOIN engines e ON mye.engine_id = e.id
WHERE {where_sql}
ORDER BY y.year DESC, b.name, m.name
LIMIT ? OFFSET ?
""", params)
for row in cursor.fetchall():
results['vehicles'].append({
'id': row['id'],
'brand': row['brand'],
'model': row['model'],
'year': row['year'],
'engine': row['engine']
})
results['total_count'] = len(results['parts']) + len(results['vehicles'])
conn.close()
return jsonify(results)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/search/parts')
def api_search_parts():
"""Full-text search in parts catalog with pagination"""
try:
q = request.args.get('q', '').strip()
category_id = request.args.get('category_id', type=int)
group_id = request.args.get('group_id', type=int)
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 50, type=int)
per_page = min(per_page, 100) # Max 100 per page
offset = (page - 1) * per_page
if not q:
return jsonify({'error': 'Search query is required'}), 400
conn = get_db_connection()
cursor = conn.cursor()
# Check if FTS5 table exists
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='parts_fts'
""")
fts_exists = cursor.fetchone() is not None
parts = []
total_count = 0
if fts_exists:
# Use FTS5 for full-text search
# Escape special FTS5 characters (-, *, ^, etc.) by quoting each term
terms = q.split()
quoted_terms = []
for term in terms:
# Escape double quotes and wrap in quotes to prevent FTS operators
escaped_term = term.replace('"', '""')
quoted_terms.append(f'"{escaped_term}"')
fts_query = ' '.join(quoted_terms)
# Build filter conditions
filter_clause = ""
filter_params = []
if category_id:
filter_clause += " AND pg.category_id = ?"
filter_params.append(category_id)
if group_id:
filter_clause += " AND p.group_id = ?"
filter_params.append(group_id)
# Get total count for FTS search
count_query = """
SELECT COUNT(*) as total
FROM parts_fts
JOIN parts p ON parts_fts.rowid = p.id
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE parts_fts MATCH ?
""" + filter_clause
cursor.execute(count_query, [fts_query] + filter_params)
total_count = cursor.fetchone()['total']
# Get paginated data
data_query = """
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
p.description,
pg.name AS group_name,
pc.name AS category_name,
bm25(parts_fts) AS rank
FROM parts_fts
JOIN parts p ON parts_fts.rowid = p.id
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE parts_fts MATCH ?
""" + filter_clause + " ORDER BY rank LIMIT ? OFFSET ?"
cursor.execute(data_query, [fts_query] + filter_params + [per_page, offset])
for row in cursor.fetchall():
parts.append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'description': row['description'],
'group_name': row['group_name'],
'category_name': row['category_name'],
'rank': row['rank']
})
else:
# Fallback to LIKE search if FTS5 table doesn't exist
search_term = f"%{q}%"
# Build filter conditions
filter_clause = ""
filter_params = []
if category_id:
filter_clause += " AND pg.category_id = ?"
filter_params.append(category_id)
if group_id:
filter_clause += " AND p.group_id = ?"
filter_params.append(group_id)
# Get total count for LIKE search
count_query = """
SELECT COUNT(*) as total
FROM parts p
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE (p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ?
OR p.description LIKE ?)
""" + filter_clause
cursor.execute(count_query, [search_term, search_term, search_term, search_term] + filter_params)
total_count = cursor.fetchone()['total']
# Get paginated data
data_query = """
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
p.description,
pg.name AS group_name,
pc.name AS category_name
FROM parts p
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE (p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ?
OR p.description LIKE ?)
""" + filter_clause + " ORDER BY p.name LIMIT ? OFFSET ?"
cursor.execute(data_query, [search_term, search_term, search_term, search_term] + filter_params + [per_page, offset])
for row in cursor.fetchall():
parts.append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'description': row['description'],
'group_name': row['group_name'],
'category_name': row['category_name'],
'rank': 0
})
conn.close()
total_pages = (total_count + per_page - 1) // per_page
return jsonify({
'data': parts,
'pagination': {
'page': page,
'per_page': per_page,
'total': total_count,
'total_pages': total_pages
}
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/vin/decode/<vin>')
def api_vin_decode(vin):
"""Decode a VIN using NHTSA API with caching"""
try:
vin = vin.upper().strip()
# Validate VIN format
if not validate_vin(vin):
return jsonify({
'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).'
}), 400
conn = get_db_connection()
cursor = conn.cursor()
# Check if vin_cache table exists
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='vin_cache'
""")
cache_exists = cursor.fetchone() is not None
cached_data = None
if cache_exists:
# Check for cached VIN data that hasn't expired
cursor.execute("""
SELECT
vin, make, model, year, engine_info,
body_class, drive_type,
model_year_engine_id, created_at, expires_at
FROM vin_cache
WHERE vin = ? AND expires_at > datetime('now')
""", (vin,))
cached_row = cursor.fetchone()
if cached_row:
# Parse engine_info JSON if it exists
engine_info_data = {}
if cached_row['engine_info']:
try:
engine_info_data = json_module.loads(cached_row['engine_info'])
except:
engine_info_data = {'raw': cached_row['engine_info']}
cached_data = {
'vin': cached_row['vin'],
'make': cached_row['make'],
'model': cached_row['model'],
'year': cached_row['year'],
'engine_info': engine_info_data,
'body_class': cached_row['body_class'],
'drive_type': cached_row['drive_type'],
'matched_vehicle': None,
'cached': True
}
# Get matched vehicle info if available
if cached_row['model_year_engine_id']:
cursor.execute("""
SELECT
mye.id,
b.name AS brand,
m.name AS model,
y.year,
e.name AS engine
FROM model_year_engine mye
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
JOIN years y ON mye.year_id = y.id
JOIN engines e ON mye.engine_id = e.id
WHERE mye.id = ?
""", (cached_row['model_year_engine_id'],))
mye_row = cursor.fetchone()
if mye_row:
cached_data['matched_vehicle'] = {
'mye_id': mye_row['id'],
'brand': mye_row['brand'],
'model': mye_row['model'],
'year': mye_row['year'],
'engine': mye_row['engine']
}
conn.close()
return jsonify(cached_data)
# Call NHTSA API
nhtsa_url = f'https://vpic.nhtsa.dot.gov/api/vehicles/DecodeVin/{vin}?format=json'
try:
req = urllib.request.Request(nhtsa_url, headers={'User-Agent': 'AutopartesDB/1.0'})
with urllib.request.urlopen(req, timeout=10) as response:
nhtsa_data = json_module.loads(response.read().decode('utf-8'))
except urllib.error.URLError as e:
conn.close()
return jsonify({'error': f'Failed to connect to NHTSA API: {str(e)}'}), 503
except urllib.error.HTTPError as e:
conn.close()
return jsonify({'error': f'NHTSA API error: {e.code}'}), 502
except Exception as e:
conn.close()
return jsonify({'error': f'Error calling NHTSA API: {str(e)}'}), 500
# Parse NHTSA response
results = {item['Variable']: item['Value'] for item in nhtsa_data.get('Results', [])}
# Extract relevant fields
make = results.get('Make', '')
model = results.get('Model', '')
year_str = results.get('ModelYear', '')
year = int(year_str) if year_str and year_str.isdigit() else None
engine_config = results.get('EngineConfiguration', '')
cylinders_str = results.get('EngineCylinders', '')
cylinders = int(cylinders_str) if cylinders_str and cylinders_str.isdigit() else None
displacement_str = results.get('DisplacementL', '')
displacement_l = float(displacement_str) if displacement_str else None
fuel_type = results.get('FuelTypePrimary', '')
body_class = results.get('BodyClass', '')
drive_type = results.get('DriveType', '')
# Try to match to model_year_engine record
matched_mye_id = None
matched_vehicle = None
if make and model and year:
cursor.execute("""
SELECT
mye.id,
b.name AS brand,
m.name AS model,
y.year,
e.name AS engine
FROM model_year_engine mye
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
JOIN years y ON mye.year_id = y.id
JOIN engines e ON mye.engine_id = e.id
WHERE UPPER(b.name) = UPPER(?)
AND UPPER(m.name) = UPPER(?)
AND y.year = ?
LIMIT 1
""", (make, model, year))
mye_row = cursor.fetchone()
if mye_row:
matched_mye_id = mye_row['id']
matched_vehicle = {
'mye_id': mye_row['id'],
'brand': mye_row['brand'],
'model': mye_row['model'],
'year': mye_row['year'],
'engine': mye_row['engine']
}
# Store in cache with 30-day expiry
if cache_exists:
expires_at = (datetime.now() + timedelta(days=30)).strftime('%Y-%m-%d %H:%M:%S')
# Combine engine info into JSON
engine_info = json_module.dumps({
'configuration': engine_config,
'cylinders': cylinders,
'displacement_l': displacement_l,
'fuel_type': fuel_type
})
cursor.execute("""
INSERT OR REPLACE INTO vin_cache
(vin, decoded_data, make, model, year, engine_info,
body_class, drive_type, model_year_engine_id, expires_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (vin, json_module.dumps(results), make, model, year, engine_info,
body_class, drive_type, matched_mye_id, expires_at))
conn.commit()
result = {
'vin': vin,
'make': make,
'model': model,
'year': year,
'engine_info': {
'configuration': engine_config,
'cylinders': cylinders,
'displacement_l': displacement_l,
'fuel_type': fuel_type
},
'body_class': body_class,
'drive_type': drive_type,
'matched_vehicle': matched_vehicle,
'cached': False
}
conn.close()
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/vin/<vin>/parts')
def api_vin_parts(vin):
"""Get parts for a decoded VIN"""
try:
vin = vin.upper().strip()
if not validate_vin(vin):
return jsonify({
'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).'
}), 400
category_id = request.args.get('category_id', type=int)
conn = get_db_connection()
cursor = conn.cursor()
# Check if vin_cache table exists
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='vin_cache'
""")
cache_exists = cursor.fetchone() is not None
if not cache_exists:
conn.close()
return jsonify({
'error': 'VIN cache not available. Please decode the VIN first.'
}), 400
# Look up VIN in cache
cursor.execute("""
SELECT
vin, make, model, year, model_year_engine_id
FROM vin_cache
WHERE vin = ?
""", (vin,))
cached_row = cursor.fetchone()
if not cached_row:
conn.close()
return jsonify({
'error': 'VIN not found in cache. Please decode the VIN first using /api/vin/decode/<vin>'
}), 404
mye_id = cached_row['model_year_engine_id']
vehicle_info = {
'vin': cached_row['vin'],
'make': cached_row['make'],
'model': cached_row['model'],
'year': cached_row['year'],
'mye_id': mye_id
}
if not mye_id:
conn.close()
return jsonify({
'vin': vin,
'vehicle_info': vehicle_info,
'categories': [],
'message': 'No matching vehicle configuration found in database. Use /api/vin/<vin>/match to manually link.'
})
# Get parts for this vehicle grouped by category
query = """
SELECT
pc.id AS category_id,
pc.name AS category_name,
pc.name_es AS category_name_es,
p.id AS part_id,
p.oem_part_number,
p.name AS part_name,
p.name_es AS part_name_es,
pg.name AS group_name,
vp.quantity_required,
vp.position
FROM vehicle_parts vp
JOIN parts p ON vp.part_id = p.id
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE vp.model_year_engine_id = ?
"""
params = [mye_id]
if category_id:
query += " AND pc.id = ?"
params.append(category_id)
query += " ORDER BY pc.display_order, pg.display_order, p.name"
cursor.execute(query, params)
# Group parts by category
categories_dict = {}
for row in cursor.fetchall():
cat_id = row['category_id']
if cat_id not in categories_dict:
categories_dict[cat_id] = {
'id': cat_id,
'name': row['category_name'],
'name_es': row['category_name_es'],
'parts': []
}
categories_dict[cat_id]['parts'].append({
'id': row['part_id'],
'oem_part_number': row['oem_part_number'],
'name': row['part_name'],
'name_es': row['part_name_es'],
'group_name': row['group_name'],
'quantity_required': row['quantity_required'],
'position': row['position']
})
conn.close()
return jsonify({
'vin': vin,
'vehicle_info': vehicle_info,
'categories': list(categories_dict.values())
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/vin/<vin>/match')
def api_vin_match(vin):
"""Manually match a VIN to a vehicle configuration"""
try:
vin = vin.upper().strip()
if not validate_vin(vin):
return jsonify({
'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).'
}), 400
mye_id = request.args.get('mye_id', type=int)
if not mye_id:
return jsonify({'error': 'mye_id parameter is required'}), 400
conn = get_db_connection()
cursor = conn.cursor()
# Check if vin_cache table exists
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='vin_cache'
""")
cache_exists = cursor.fetchone() is not None
if not cache_exists:
conn.close()
return jsonify({
'error': 'VIN cache table not available.'
}), 400
# Verify the mye_id exists
cursor.execute("""
SELECT id FROM model_year_engine WHERE id = ?
""", (mye_id,))
if not cursor.fetchone():
conn.close()
return jsonify({'error': f'model_year_engine_id {mye_id} not found'}), 404
# Check if VIN exists in cache
cursor.execute("""
SELECT vin FROM vin_cache WHERE vin = ?
""", (vin,))
vin_exists = cursor.fetchone() is not None
if vin_exists:
# Update existing cache entry
cursor.execute("""
UPDATE vin_cache
SET model_year_engine_id = ?
WHERE vin = ?
""", (mye_id, vin))
else:
# Create new cache entry with minimal info
expires_at = (datetime.now() + timedelta(days=30)).strftime('%Y-%m-%d %H:%M:%S')
cursor.execute("""
INSERT INTO vin_cache
(vin, model_year_engine_id, cached_at, expires_at)
VALUES (?, ?, datetime('now'), ?)
""", (vin, mye_id, expires_at))
conn.commit()
conn.close()
return jsonify({
'success': True,
'vin': vin,
'mye_id': mye_id
})
except Exception as e:
return jsonify({'error': str(e)}), 500
# ============================================================================
# ADMIN API ENDPOINTS - CRUD Operations
# ============================================================================
@app.route('/api/admin/stats')
def api_admin_stats():
"""Get statistics for admin dashboard"""
try:
conn = get_db_connection()
cursor = conn.cursor()
stats = {}
# Count categories
cursor.execute("SELECT COUNT(*) FROM part_categories")
stats['categories'] = cursor.fetchone()[0]
# Count groups
cursor.execute("SELECT COUNT(*) FROM part_groups")
stats['groups'] = cursor.fetchone()[0]
# Count parts
cursor.execute("SELECT COUNT(*) FROM parts")
stats['parts'] = cursor.fetchone()[0]
# Count aftermarket parts
cursor.execute("SELECT COUNT(*) FROM aftermarket_parts")
stats['aftermarket'] = cursor.fetchone()[0]
# Count manufacturers
cursor.execute("SELECT COUNT(*) FROM manufacturers")
stats['manufacturers'] = cursor.fetchone()[0]
# Count fitments
cursor.execute("SELECT COUNT(*) FROM vehicle_parts")
stats['fitment'] = cursor.fetchone()[0]
conn.close()
return jsonify(stats)
except Exception as e:
return jsonify({'error': str(e)}), 500
# ---- Categories CRUD ----
@app.route('/api/admin/categories', methods=['POST'])
def api_admin_create_category():
"""Create a new category"""
try:
data = request.get_json()
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO part_categories (name, name_es, slug, icon_name, display_order, parent_id)
VALUES (?, ?, ?, ?, ?, ?)
""", (
data['name'],
data.get('name_es'),
data.get('slug') or data['name'].lower().replace(' ', '-'),
data.get('icon_name'),
data.get('display_order', 0),
data.get('parent_id')
))
conn.commit()
new_id = cursor.lastrowid
conn.close()
return jsonify({'id': new_id, 'message': 'Category created'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/categories/<int:category_id>', methods=['PUT'])
def api_admin_update_category(category_id):
"""Update a category"""
try:
data = request.get_json()
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
UPDATE part_categories
SET name = ?, name_es = ?, slug = ?, icon_name = ?, display_order = ?
WHERE id = ?
""", (
data['name'],
data.get('name_es'),
data.get('slug'),
data.get('icon_name'),
data.get('display_order', 0),
category_id
))
conn.commit()
conn.close()
return jsonify({'message': 'Category updated'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/categories/<int:category_id>', methods=['DELETE'])
def api_admin_delete_category(category_id):
"""Delete a category"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM part_categories WHERE id = ?", (category_id,))
conn.commit()
conn.close()
return jsonify({'message': 'Category deleted'})
except Exception as e:
return jsonify({'error': str(e)}), 500
# ---- Groups CRUD ----
@app.route('/api/admin/groups')
def api_admin_list_groups():
"""List all groups with category info"""
try:
category_id = request.args.get('category_id', type=int)
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT pg.id, pg.name, pg.name_es, pg.category_id, pg.display_order, pg.slug,
pc.name AS category_name
FROM part_groups pg
LEFT JOIN part_categories pc ON pg.category_id = pc.id
WHERE 1=1
"""
params = []
if category_id:
query += " AND pg.category_id = ?"
params.append(category_id)
query += " ORDER BY pg.display_order, pg.name"
cursor.execute(query, params)
groups = []
for row in cursor.fetchall():
groups.append({
'id': row['id'],
'name': row['name'],
'name_es': row['name_es'],
'category_id': row['category_id'],
'category_name': row['category_name'],
'display_order': row['display_order'],
'slug': row['slug']
})
conn.close()
return jsonify(groups)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/groups', methods=['POST'])
def api_admin_create_group():
"""Create a new group"""
try:
data = request.get_json()
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO part_groups (category_id, name, name_es, slug, display_order)
VALUES (?, ?, ?, ?, ?)
""", (
data['category_id'],
data['name'],
data.get('name_es'),
data.get('slug') or data['name'].lower().replace(' ', '-'),
data.get('display_order', 0)
))
conn.commit()
new_id = cursor.lastrowid
conn.close()
return jsonify({'id': new_id, 'message': 'Group created'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/groups/<int:group_id>', methods=['PUT'])
def api_admin_update_group(group_id):
"""Update a group"""
try:
data = request.get_json()
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
UPDATE part_groups
SET category_id = ?, name = ?, name_es = ?, display_order = ?
WHERE id = ?
""", (
data['category_id'],
data['name'],
data.get('name_es'),
data.get('display_order', 0),
group_id
))
conn.commit()
conn.close()
return jsonify({'message': 'Group updated'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/groups/<int:group_id>', methods=['DELETE'])
def api_admin_delete_group(group_id):
"""Delete a group"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM part_groups WHERE id = ?", (group_id,))
conn.commit()
conn.close()
return jsonify({'message': 'Group deleted'})
except Exception as e:
return jsonify({'error': str(e)}), 500
# ---- Parts CRUD ----
@app.route('/api/admin/parts', methods=['POST'])
def api_admin_create_part():
"""Create a new OEM part"""
try:
data = request.get_json()
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO parts (oem_part_number, name, name_es, group_id, description, description_es, weight_kg, material, image_url)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
data['oem_part_number'],
data['name'],
data.get('name_es'),
data['group_id'],
data.get('description'),
data.get('description_es'),
data.get('weight_kg'),
data.get('material'),
data.get('image_url')
))
conn.commit()
new_id = cursor.lastrowid
conn.close()
return jsonify({'id': new_id, 'message': 'Part created'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/parts/<int:part_id>', methods=['PUT'])
def api_admin_update_part(part_id):
"""Update an OEM part"""
try:
data = request.get_json()
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
UPDATE parts
SET oem_part_number = ?, name = ?, name_es = ?, group_id = ?,
description = ?, description_es = ?, weight_kg = ?, material = ?, image_url = ?
WHERE id = ?
""", (
data['oem_part_number'],
data['name'],
data.get('name_es'),
data['group_id'],
data.get('description'),
data.get('description_es'),
data.get('weight_kg'),
data.get('material'),
data.get('image_url'),
part_id
))
conn.commit()
conn.close()
return jsonify({'message': 'Part updated'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/parts/<int:part_id>', methods=['DELETE'])
def api_admin_delete_part(part_id):
"""Delete an OEM part"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM parts WHERE id = ?", (part_id,))
conn.commit()
conn.close()
return jsonify({'message': 'Part deleted'})
except Exception as e:
return jsonify({'error': str(e)}), 500
# ---- Manufacturers CRUD ----
@app.route('/api/admin/manufacturers', methods=['POST'])
def api_admin_create_manufacturer():
"""Create a new manufacturer"""
try:
data = request.get_json()
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO manufacturers (name, type, quality_tier, country, website)
VALUES (?, ?, ?, ?, ?)
""", (
data['name'],
data.get('type', 'aftermarket'),
data.get('quality_tier', 'standard'),
data.get('country'),
data.get('website')
))
conn.commit()
new_id = cursor.lastrowid
conn.close()
return jsonify({'id': new_id, 'message': 'Manufacturer created'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/manufacturers/<int:manufacturer_id>', methods=['PUT'])
def api_admin_update_manufacturer(manufacturer_id):
"""Update a manufacturer"""
try:
data = request.get_json()
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
UPDATE manufacturers
SET name = ?, type = ?, quality_tier = ?, country = ?, website = ?
WHERE id = ?
""", (
data['name'],
data.get('type'),
data.get('quality_tier'),
data.get('country'),
data.get('website'),
manufacturer_id
))
conn.commit()
conn.close()
return jsonify({'message': 'Manufacturer updated'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/manufacturers/<int:manufacturer_id>', methods=['DELETE'])
def api_admin_delete_manufacturer(manufacturer_id):
"""Delete a manufacturer"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM manufacturers WHERE id = ?", (manufacturer_id,))
conn.commit()
conn.close()
return jsonify({'message': 'Manufacturer deleted'})
except Exception as e:
return jsonify({'error': str(e)}), 500
# ---- Aftermarket Parts CRUD ----
@app.route('/api/admin/aftermarket', methods=['POST'])
def api_admin_create_aftermarket():
"""Create a new aftermarket part"""
try:
data = request.get_json()
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO aftermarket_parts (oem_part_id, manufacturer_id, part_number, name, name_es, quality_tier, price_usd, warranty_months)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
data['oem_part_id'],
data['manufacturer_id'],
data['part_number'],
data.get('name'),
data.get('name_es'),
data.get('quality_tier', 'standard'),
data.get('price_usd'),
data.get('warranty_months')
))
conn.commit()
new_id = cursor.lastrowid
conn.close()
return jsonify({'id': new_id, 'message': 'Aftermarket part created'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/aftermarket/<int:aftermarket_id>', methods=['PUT'])
def api_admin_update_aftermarket(aftermarket_id):
"""Update an aftermarket part"""
try:
data = request.get_json()
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
UPDATE aftermarket_parts
SET oem_part_id = ?, manufacturer_id = ?, part_number = ?, name = ?, name_es = ?,
quality_tier = ?, price_usd = ?, warranty_months = ?
WHERE id = ?
""", (
data['oem_part_id'],
data['manufacturer_id'],
data['part_number'],
data.get('name'),
data.get('name_es'),
data.get('quality_tier'),
data.get('price_usd'),
data.get('warranty_months'),
aftermarket_id
))
conn.commit()
conn.close()
return jsonify({'message': 'Aftermarket part updated'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/aftermarket/<int:aftermarket_id>', methods=['DELETE'])
def api_admin_delete_aftermarket(aftermarket_id):
"""Delete an aftermarket part"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM aftermarket_parts WHERE id = ?", (aftermarket_id,))
conn.commit()
conn.close()
return jsonify({'message': 'Aftermarket part deleted'})
except Exception as e:
return jsonify({'error': str(e)}), 500
# ---- Cross-References CRUD ----
@app.route('/api/admin/crossref')
def api_admin_list_crossref():
"""List cross-references with pagination"""
try:
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 50, type=int)
per_page = min(per_page, 100)
offset = (page - 1) * per_page
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM part_cross_references")
total_count = cursor.fetchone()[0]
cursor.execute("""
SELECT pcr.id, pcr.part_id, pcr.cross_reference_number, pcr.reference_type, pcr.source, pcr.notes,
p.oem_part_number, p.name AS part_name
FROM part_cross_references pcr
JOIN parts p ON pcr.part_id = p.id
ORDER BY pcr.id DESC
LIMIT ? OFFSET ?
""", (per_page, offset))
refs = []
for row in cursor.fetchall():
refs.append({
'id': row['id'],
'part_id': row['part_id'],
'cross_reference_number': row['cross_reference_number'],
'reference_type': row['reference_type'],
'source': row['source'],
'notes': row['notes'],
'oem_part_number': row['oem_part_number'],
'part_name': row['part_name']
})
conn.close()
total_pages = (total_count + per_page - 1) // per_page
return jsonify({
'data': refs,
'pagination': {
'page': page,
'per_page': per_page,
'total': total_count,
'total_pages': total_pages
}
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/crossref', methods=['POST'])
def api_admin_create_crossref():
"""Create a new cross-reference"""
try:
data = request.get_json()
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO part_cross_references (part_id, cross_reference_number, reference_type, source, notes)
VALUES (?, ?, ?, ?, ?)
""", (
data['part_id'],
data['cross_reference_number'],
data['reference_type'],
data.get('source'),
data.get('notes')
))
conn.commit()
new_id = cursor.lastrowid
conn.close()
return jsonify({'id': new_id, 'message': 'Cross-reference created'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/crossref/<int:crossref_id>', methods=['PUT'])
def api_admin_update_crossref(crossref_id):
"""Update a cross-reference"""
try:
data = request.get_json()
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
UPDATE part_cross_references
SET part_id = ?, cross_reference_number = ?, reference_type = ?, source = ?, notes = ?
WHERE id = ?
""", (
data['part_id'],
data['cross_reference_number'],
data['reference_type'],
data.get('source'),
data.get('notes'),
crossref_id
))
conn.commit()
conn.close()
return jsonify({'message': 'Cross-reference updated'})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/crossref/<int:crossref_id>', methods=['DELETE'])
def api_admin_delete_crossref(crossref_id):
"""Delete a cross-reference"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM part_cross_references WHERE id = ?", (crossref_id,))
conn.commit()
conn.close()
return jsonify({'message': 'Cross-reference deleted'})
except Exception as e:
return jsonify({'error': str(e)}), 500
# ---- Fitment CRUD ----
@app.route('/api/admin/fitment')
def api_admin_list_fitment():
"""List fitments with pagination and filters"""
try:
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 50, type=int)
per_page = min(per_page, 500) # Allow more for bulk editor
offset = (page - 1) * per_page
brand = request.args.get('brand')
model = request.args.get('model')
mye_id = request.args.get('mye_id', type=int)
conn = get_db_connection()
cursor = conn.cursor()
# Build WHERE clause
where_clause = " WHERE 1=1"
params = []
if mye_id:
where_clause += " AND vp.model_year_engine_id = ?"
params.append(mye_id)
if brand:
where_clause += " AND UPPER(b.name) = UPPER(?)"
params.append(brand)
if model:
where_clause += " AND UPPER(m.name) = UPPER(?)"
params.append(model)
# Count query
count_query = """
SELECT COUNT(*)
FROM vehicle_parts vp
JOIN model_year_engine mye ON vp.model_year_engine_id = mye.id
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
""" + where_clause
cursor.execute(count_query, params)
total_count = cursor.fetchone()[0]
# Data query
data_query = """
SELECT vp.id, vp.model_year_engine_id, vp.part_id, vp.quantity_required, vp.position, vp.fitment_notes,
b.name AS brand, m.name AS model, y.year, e.name AS engine,
p.oem_part_number, p.name AS part_name
FROM vehicle_parts vp
JOIN model_year_engine mye ON vp.model_year_engine_id = mye.id
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
JOIN years y ON mye.year_id = y.id
JOIN engines e ON mye.engine_id = e.id
JOIN parts p ON vp.part_id = p.id
""" + where_clause + " ORDER BY vp.id DESC LIMIT ? OFFSET ?"
cursor.execute(data_query, params + [per_page, offset])
fitments = []
for row in cursor.fetchall():
fitments.append({
'id': row['id'],
'model_year_engine_id': row['model_year_engine_id'],
'part_id': row['part_id'],
'quantity_required': row['quantity_required'],
'position': row['position'],
'fitment_notes': row['fitment_notes'],
'brand': row['brand'],
'model': row['model'],
'year': row['year'],
'engine': row['engine'],
'oem_part_number': row['oem_part_number'],
'part_name': row['part_name']
})
conn.close()
total_pages = (total_count + per_page - 1) // per_page
return jsonify({
'data': fitments,
'pagination': {
'page': page,
'per_page': per_page,
'total': total_count,
'total_pages': total_pages
}
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/fitment', methods=['POST'])
def api_admin_create_fitment():
"""Create a new fitment record"""
try:
data = request.get_json()
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO vehicle_parts (model_year_engine_id, part_id, quantity_required, position, fitment_notes)
VALUES (?, ?, ?, ?, ?)
""", (
data['model_year_engine_id'],
data['part_id'],
data.get('quantity_required', 1),
data.get('position'),
data.get('fitment_notes')
))
conn.commit()
new_id = cursor.lastrowid
conn.close()
return jsonify({'id': new_id, 'message': 'Fitment created'})
except sqlite3.IntegrityError:
return jsonify({'error': 'Este fitment ya existe'}), 400
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/fitment/<int:fitment_id>', methods=['DELETE'])
def api_admin_delete_fitment(fitment_id):
"""Delete a fitment record"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM vehicle_parts WHERE id = ?", (fitment_id,))
conn.commit()
conn.close()
return jsonify({'message': 'Fitment deleted'})
except Exception as e:
return jsonify({'error': str(e)}), 500
# ---- CSV Import/Export ----
@app.route('/api/admin/import/<import_type>', methods=['POST'])
def api_admin_import_csv(import_type):
"""Import records from CSV data"""
try:
data = request.get_json()
records = data.get('records', [])
if not records:
return jsonify({'error': 'No records to import'}), 400
conn = get_db_connection()
cursor = conn.cursor()
imported = 0
errors = []
for i, record in enumerate(records):
try:
if import_type == 'categories':
cursor.execute("""
INSERT INTO part_categories (name, name_es, slug, icon_name, display_order)
VALUES (?, ?, ?, ?, ?)
""", (
record['name'],
record.get('name_es'),
record.get('slug') or record['name'].lower().replace(' ', '-'),
record.get('icon_name'),
record.get('display_order', 0)
))
elif import_type == 'groups':
cursor.execute("""
INSERT INTO part_groups (category_id, name, name_es, display_order)
VALUES (?, ?, ?, ?)
""", (
record['category_id'],
record['name'],
record.get('name_es'),
record.get('display_order', 0)
))
elif import_type == 'parts':
cursor.execute("""
INSERT INTO parts (oem_part_number, name, name_es, group_id, description, description_es, weight_kg, material)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
record['oem_part_number'],
record['name'],
record.get('name_es'),
record['group_id'],
record.get('description'),
record.get('description_es'),
record.get('weight_kg'),
record.get('material')
))
elif import_type == 'manufacturers':
cursor.execute("""
INSERT INTO manufacturers (name, type, quality_tier, country, website)
VALUES (?, ?, ?, ?, ?)
""", (
record['name'],
record.get('type', 'aftermarket'),
record.get('quality_tier', 'standard'),
record.get('country'),
record.get('website')
))
elif import_type == 'aftermarket':
cursor.execute("""
INSERT INTO aftermarket_parts (oem_part_id, manufacturer_id, part_number, name, name_es, quality_tier, price_usd, warranty_months)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
record['oem_part_id'],
record['manufacturer_id'],
record['part_number'],
record.get('name'),
record.get('name_es'),
record.get('quality_tier', 'standard'),
record.get('price_usd'),
record.get('warranty_months')
))
elif import_type == 'crossref':
cursor.execute("""
INSERT INTO part_cross_references (part_id, cross_reference_number, reference_type, source, notes)
VALUES (?, ?, ?, ?, ?)
""", (
record['part_id'],
record['cross_reference_number'],
record['reference_type'],
record.get('source'),
record.get('notes')
))
elif import_type == 'fitment':
cursor.execute("""
INSERT OR IGNORE INTO vehicle_parts (model_year_engine_id, part_id, quantity_required, position, fitment_notes)
VALUES (?, ?, ?, ?, ?)
""", (
record['model_year_engine_id'],
record['part_id'],
record.get('quantity_required', 1),
record.get('position'),
record.get('fitment_notes')
))
imported += 1
except Exception as e:
errors.append(f"Row {i + 1}: {str(e)}")
conn.commit()
conn.close()
result = {'imported': imported}
if errors:
result['errors'] = errors[:10] # Limit errors shown
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/export/<export_type>')
def api_admin_export_csv(export_type):
"""Export data as JSON (to be converted to CSV on frontend)"""
try:
conn = get_db_connection()
cursor = conn.cursor()
data = []
if export_type == 'categories':
cursor.execute("SELECT id, name, name_es, slug, icon_name, display_order FROM part_categories ORDER BY display_order, name")
for row in cursor.fetchall():
data.append(dict(row))
elif export_type == 'groups':
cursor.execute("SELECT id, category_id, name, name_es, display_order FROM part_groups ORDER BY category_id, display_order, name")
for row in cursor.fetchall():
data.append(dict(row))
elif export_type == 'parts':
cursor.execute("SELECT id, oem_part_number, name, name_es, group_id, description, description_es, weight_kg, material FROM parts ORDER BY id")
for row in cursor.fetchall():
data.append(dict(row))
elif export_type == 'manufacturers':
cursor.execute("SELECT id, name, type, quality_tier, country, website FROM manufacturers ORDER BY name")
for row in cursor.fetchall():
data.append(dict(row))
elif export_type == 'aftermarket':
cursor.execute("SELECT id, oem_part_id, manufacturer_id, part_number, name, name_es, quality_tier, price_usd, warranty_months FROM aftermarket_parts ORDER BY id")
for row in cursor.fetchall():
data.append(dict(row))
elif export_type == 'crossref':
cursor.execute("SELECT id, part_id, cross_reference_number, reference_type, source, notes FROM part_cross_references ORDER BY id")
for row in cursor.fetchall():
data.append(dict(row))
elif export_type == 'fitment':
cursor.execute("SELECT id, model_year_engine_id, part_id, quantity_required, position, fitment_notes FROM vehicle_parts ORDER BY id")
for row in cursor.fetchall():
data.append(dict(row))
conn.close()
return jsonify({'data': data})
except Exception as e:
return jsonify({'error': str(e)}), 500
# ============================================================================
# Image Upload Endpoint
# ============================================================================
import base64
import uuid
@app.route('/api/admin/upload-image', methods=['POST'])
def api_admin_upload_image():
"""Upload a base64 encoded image and save it to the server"""
try:
data = request.get_json()
image_data = data.get('image')
if not image_data:
return jsonify({'error': 'No image data provided'}), 400
# Parse base64 data
if ',' in image_data:
# Format: data:image/png;base64,xxxxx
header, encoded = image_data.split(',', 1)
# Extract extension from header
if 'png' in header:
ext = 'png'
elif 'jpeg' in header or 'jpg' in header:
ext = 'jpg'
elif 'gif' in header:
ext = 'gif'
elif 'webp' in header:
ext = 'webp'
else:
ext = 'png'
else:
encoded = image_data
ext = 'png'
# Decode the image
image_bytes = base64.b64decode(encoded)
# Generate unique filename
filename = f"{uuid.uuid4().hex}.{ext}"
filepath = os.path.join('static', 'parts_images', filename)
# Ensure directory exists
os.makedirs(os.path.join('.', 'static', 'parts_images'), exist_ok=True)
# Save the file
with open(filepath, 'wb') as f:
f.write(image_bytes)
# Return the URL
url = f"/static/parts_images/{filename}"
return jsonify({'url': url, 'filename': filename})
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
# Check if database exists
if not os.path.exists(DATABASE_PATH):
print(f"Database not found at {DATABASE_PATH}")
print("Please make sure the vehicle database is created first.")
exit(1)
print("Starting Vehicle Dashboard Server...")
print("Visit http://localhost:5000 to access the dashboard locally")
print("Visit http://192.168.10.198:5000 to access the dashboard from other computers on the network")
app.run(debug=True, host='0.0.0.0', port=5000)