Files
Autoparts-DB/dashboard/server.py
consultoria-as d4d1c9b7ba Implement complete autoparts catalog system (5 phases)
FASE 1: Parts Database
- Added part_categories, part_groups, parts, vehicle_parts tables
- 12 categories, 190 groups with Spanish translations
- API endpoints for categories, groups, parts CRUD

FASE 2: Cross-References & Aftermarket
- Added manufacturers, aftermarket_parts, part_cross_references tables
- 24 manufacturers, quality tier system (economy/standard/premium/oem)
- Part number search across OEM and aftermarket

FASE 3: Exploded Diagrams
- Added diagrams, vehicle_diagrams, diagram_hotspots tables
- SVG viewer with zoom controls and interactive hotspots
- 3 sample diagrams (brake, oil filter, suspension)

FASE 4: Search & VIN Decoder
- SQLite FTS5 full-text search with auto-sync triggers
- NHTSA VIN decoder API integration with 30-day cache
- Unified search endpoint

FASE 5: Optimization & UX
- API pagination (page/per_page, max 100 items)
- Dark mode with localStorage persistence
- Keyboard shortcuts (/, Ctrl+K, Escape, Backspace, Ctrl+D)
- Breadcrumb navigation
- ARIA accessibility (labels, roles, focus management)
- Skip link for keyboard users

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 07:13:46 +00:00

1934 lines
63 KiB
Python

from flask import Flask, render_template, jsonify, request, send_from_directory
import sqlite3
import os
app = Flask(__name__, static_folder='.')
# Database path - adjust as needed
DATABASE_PATH = '../vehicle_database/vehicle_database.db'
def get_db_connection():
"""Get a connection to the vehicle database"""
conn = sqlite3.connect(DATABASE_PATH)
conn.row_factory = sqlite3.Row # This enables column access by name
return conn
def get_all_brands():
"""Get all unique brands from the database"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT DISTINCT name FROM brands ORDER BY name")
brands = [row['name'] for row in cursor.fetchall()]
conn.close()
return brands
def get_all_years():
"""Get all unique years from the database"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT DISTINCT year FROM years ORDER BY year DESC")
years = [row['year'] for row in cursor.fetchall()]
conn.close()
return years
def get_all_engines():
"""Get all unique engines from the database"""
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT DISTINCT name FROM engines ORDER BY name")
engines = [row['name'] for row in cursor.fetchall()]
conn.close()
return engines
def get_models_by_brand(brand_name=None):
"""Get all models, optionally filtered by brand"""
conn = get_db_connection()
cursor = conn.cursor()
if brand_name:
cursor.execute("""
SELECT DISTINCT m.name
FROM models m
JOIN brands b ON m.brand_id = b.id
WHERE b.name = ?
ORDER BY m.name
""", (brand_name,))
else:
cursor.execute("SELECT DISTINCT name FROM models ORDER BY name")
models = [row['name'] for row in cursor.fetchall()]
conn.close()
return models
def search_vehicles(brand=None, model=None, year=None, engine=None):
"""Search for vehicles based on filters"""
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT
b.name AS brand,
m.name AS model,
y.year,
e.name AS engine,
e.power_hp,
e.displacement_cc,
e.cylinders,
e.fuel_type,
mye.trim_level,
mye.drivetrain,
mye.transmission
FROM model_year_engine mye
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
JOIN years y ON mye.year_id = y.id
JOIN engines e ON mye.engine_id = e.id
WHERE 1=1
"""
params = []
if brand:
query += " AND b.name = ?"
params.append(brand)
if model:
query += " AND m.name = ?"
params.append(model)
if year:
query += " AND y.year = ?"
params.append(int(year))
if engine:
query += " AND e.name = ?"
params.append(engine)
query += " ORDER BY b.name, m.name, y.year"
cursor.execute(query, params)
results = cursor.fetchall()
conn.close()
# Convert to list of dictionaries
vehicles = []
for row in results:
vehicle = {
'brand': row['brand'],
'model': row['model'],
'year': row['year'],
'engine': row['engine'],
'power_hp': row['power_hp'] or 0,
'displacement_cc': row['displacement_cc'] or 0,
'cylinders': row['cylinders'] or 0,
'fuel_type': row['fuel_type'] or 'unknown',
'trim_level': row['trim_level'] or 'unknown',
'drivetrain': row['drivetrain'] or 'unknown',
'transmission': row['transmission'] or 'unknown'
}
vehicles.append(vehicle)
return vehicles
@app.route('/')
def index():
"""Serve the main dashboard page"""
return send_from_directory('.', 'index.html')
@app.route('/<path:path>')
def static_files(path):
"""Serve static files"""
return send_from_directory('.', path)
@app.route('/api/brands')
def api_brands():
"""API endpoint to get all brands"""
brands = get_all_brands()
return jsonify(brands)
@app.route('/api/years')
def api_years():
"""API endpoint to get years, optionally filtered by brand and/or model"""
brand = request.args.get('brand')
model = request.args.get('model')
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT DISTINCT y.year
FROM years y
JOIN model_year_engine mye ON y.id = mye.year_id
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
WHERE 1=1
"""
params = []
if brand:
query += " AND b.name = ?"
params.append(brand)
if model:
query += " AND m.name = ?"
params.append(model)
query += " ORDER BY y.year DESC"
cursor.execute(query, params)
results = cursor.fetchall()
conn.close()
years = [row['year'] for row in results]
return jsonify(years)
@app.route('/api/engines')
def api_engines():
"""API endpoint to get engines, optionally filtered by brand, model, and/or year"""
brand = request.args.get('brand')
model = request.args.get('model')
year = request.args.get('year')
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT DISTINCT e.name
FROM engines e
JOIN model_year_engine mye ON e.id = mye.engine_id
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
JOIN years y ON mye.year_id = y.id
WHERE 1=1
"""
params = []
if brand:
query += " AND b.name = ?"
params.append(brand)
if model:
query += " AND m.name = ?"
params.append(model)
if year:
query += " AND y.year = ?"
params.append(int(year))
query += " ORDER BY e.name"
cursor.execute(query, params)
results = cursor.fetchall()
conn.close()
engines = [row['name'] for row in results]
return jsonify(engines)
@app.route('/api/models')
def api_models():
"""API endpoint to get models, optionally filtered by brand"""
brand = request.args.get('brand')
models = get_models_by_brand(brand)
return jsonify(models)
@app.route('/api/vehicles')
def api_vehicles():
"""API endpoint to search for vehicles"""
brand = request.args.get('brand')
model = request.args.get('model')
year = request.args.get('year')
engine = request.args.get('engine')
vehicles = search_vehicles(brand, model, year, engine)
return jsonify(vehicles)
# ============================================================================
# Parts Catalog API Endpoints
# ============================================================================
@app.route('/api/categories')
def api_categories():
"""API endpoint to get all part categories (hierarchical)"""
try:
conn = get_db_connection()
cursor = conn.cursor()
# Get all categories
cursor.execute("""
SELECT id, name, name_es, slug, icon_name, display_order, parent_id
FROM part_categories
ORDER BY display_order, name
""")
all_categories = cursor.fetchall()
conn.close()
# Build hierarchical structure
categories_dict = {}
root_categories = []
# First pass: create all category objects
for row in all_categories:
category = {
'id': row['id'],
'name': row['name'],
'name_es': row['name_es'],
'slug': row['slug'],
'icon_name': row['icon_name'],
'display_order': row['display_order'],
'children': []
}
categories_dict[row['id']] = category
if row['parent_id'] is None:
root_categories.append(category)
# Second pass: build hierarchy
for row in all_categories:
if row['parent_id'] is not None and row['parent_id'] in categories_dict:
categories_dict[row['parent_id']]['children'].append(categories_dict[row['id']])
return jsonify(root_categories)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/categories/<int:category_id>/groups')
def api_category_groups(category_id):
"""API endpoint to get groups for a specific category"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT id, name, name_es, slug, display_order
FROM part_groups
WHERE category_id = ?
ORDER BY display_order, name
""", (category_id,))
groups = []
for row in cursor.fetchall():
groups.append({
'id': row['id'],
'name': row['name'],
'name_es': row['name_es'],
'slug': row['slug'],
'display_order': row['display_order']
})
conn.close()
return jsonify(groups)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/parts')
def api_parts():
"""API endpoint to list parts with optional filters and pagination"""
try:
group_id = request.args.get('group_id', type=int)
category_id = request.args.get('category_id', type=int)
search = request.args.get('search')
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 50, type=int)
per_page = min(per_page, 100) # Max 100 per page
offset = (page - 1) * per_page
conn = get_db_connection()
cursor = conn.cursor()
# Build base WHERE clause for both count and data queries
where_clause = " WHERE 1=1"
params = []
if group_id:
where_clause += " AND p.group_id = ?"
params.append(group_id)
if category_id:
where_clause += " AND pg.category_id = ?"
params.append(category_id)
if search:
where_clause += " AND (p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ?)"
search_term = f"%{search}%"
params.extend([search_term, search_term, search_term])
# Get total count
count_query = """
SELECT COUNT(*) as total
FROM parts p
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
""" + where_clause
cursor.execute(count_query, params)
total_count = cursor.fetchone()['total']
# Get paginated data
data_query = """
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
p.group_id,
pg.name AS group_name,
pc.name AS category_name
FROM parts p
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
""" + where_clause + " ORDER BY p.name LIMIT ? OFFSET ?"
params.extend([per_page, offset])
cursor.execute(data_query, params)
parts = []
for row in cursor.fetchall():
parts.append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'group_id': row['group_id'],
'group_name': row['group_name'],
'category_name': row['category_name']
})
conn.close()
total_pages = (total_count + per_page - 1) // per_page
return jsonify({
'data': parts,
'pagination': {
'page': page,
'per_page': per_page,
'total': total_count,
'total_pages': total_pages
}
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/parts/<int:part_id>')
def api_part_detail(part_id):
"""API endpoint to get single part details"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
p.description,
p.description_es,
p.group_id,
pg.name AS group_name,
pg.name_es AS group_name_es,
pc.id AS category_id,
pc.name AS category_name,
pc.name_es AS category_name_es
FROM parts p
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE p.id = ?
""", (part_id,))
row = cursor.fetchone()
conn.close()
if row is None:
return jsonify({'error': 'Part not found'}), 404
part = {
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'description': row['description'],
'description_es': row['description_es'],
'group_id': row['group_id'],
'group_name': row['group_name'],
'group_name_es': row['group_name_es'],
'category_id': row['category_id'],
'category_name': row['category_name'],
'category_name_es': row['category_name_es']
}
return jsonify(part)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/vehicles/<int:mye_id>/categories')
def api_vehicle_categories(mye_id):
"""API endpoint to get categories that have parts for a specific vehicle"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT DISTINCT
pc.id,
pc.name,
pc.name_es,
pc.slug,
pc.icon_name,
pc.display_order
FROM part_categories pc
JOIN part_groups pg ON pg.category_id = pc.id
JOIN parts p ON p.group_id = pg.id
JOIN vehicle_parts vp ON vp.part_id = p.id
WHERE vp.model_year_engine_id = ?
ORDER BY pc.display_order, pc.name
""", (mye_id,))
categories = []
for row in cursor.fetchall():
categories.append({
'id': row['id'],
'name': row['name'],
'name_es': row['name_es'],
'slug': row['slug'],
'icon_name': row['icon_name'],
'display_order': row['display_order']
})
conn.close()
return jsonify(categories)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/vehicles/<int:mye_id>/parts')
def api_vehicle_parts(mye_id):
"""API endpoint to get parts for a specific vehicle"""
try:
category_id = request.args.get('category_id', type=int)
group_id = request.args.get('group_id', type=int)
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
vp.quantity_required,
vp.position,
pc.name AS category_name,
pg.name AS group_name
FROM vehicle_parts vp
JOIN parts p ON vp.part_id = p.id
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE vp.model_year_engine_id = ?
"""
params = [mye_id]
if category_id:
query += " AND pc.id = ?"
params.append(category_id)
if group_id:
query += " AND pg.id = ?"
params.append(group_id)
query += " ORDER BY pc.display_order, pg.display_order, p.name"
cursor.execute(query, params)
parts = []
for row in cursor.fetchall():
parts.append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'quantity_required': row['quantity_required'],
'position': row['position'],
'category_name': row['category_name'],
'group_name': row['group_name']
})
conn.close()
return jsonify(parts)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/model-year-engine')
def api_model_year_engine():
"""API endpoint to get model_year_engine records with filters"""
try:
brand = request.args.get('brand')
model = request.args.get('model')
year = request.args.get('year', type=int)
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT
mye.id,
b.name AS brand,
m.name AS model,
y.year,
e.name AS engine,
mye.trim_level,
mye.drivetrain,
mye.transmission
FROM model_year_engine mye
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
JOIN years y ON mye.year_id = y.id
JOIN engines e ON mye.engine_id = e.id
WHERE 1=1
"""
params = []
if brand:
query += " AND b.name = ?"
params.append(brand)
if model:
query += " AND m.name = ?"
params.append(model)
if year:
query += " AND y.year = ?"
params.append(year)
query += " ORDER BY b.name, m.name, y.year, e.name"
cursor.execute(query, params)
records = []
for row in cursor.fetchall():
records.append({
'id': row['id'],
'brand': row['brand'],
'model': row['model'],
'year': row['year'],
'engine': row['engine'],
'trim_level': row['trim_level'],
'drivetrain': row['drivetrain'],
'transmission': row['transmission']
})
conn.close()
return jsonify(records)
except Exception as e:
return jsonify({'error': str(e)}), 500
# ============================================================================
# FASE 2: Cross-References and Aftermarket API Endpoints
# ============================================================================
@app.route('/api/manufacturers')
def api_manufacturers():
"""Get all manufacturers, optionally filtered by type"""
try:
manufacturer_type = request.args.get('type')
quality_tier = request.args.get('quality_tier')
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT id, name, type, quality_tier, country, logo_url, website
FROM manufacturers
WHERE 1=1
"""
params = []
if manufacturer_type:
query += " AND type = ?"
params.append(manufacturer_type)
if quality_tier:
query += " AND quality_tier = ?"
params.append(quality_tier)
query += " ORDER BY name"
cursor.execute(query, params)
manufacturers = []
for row in cursor.fetchall():
manufacturers.append({
'id': row['id'],
'name': row['name'],
'type': row['type'],
'quality_tier': row['quality_tier'],
'country': row['country'],
'logo_url': row['logo_url'],
'website': row['website']
})
conn.close()
return jsonify(manufacturers)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/parts/<int:part_id>/alternatives')
def api_part_alternatives(part_id):
"""Get aftermarket alternatives for an OEM part"""
try:
quality_tier = request.args.get('quality_tier')
manufacturer_id = request.args.get('manufacturer_id', type=int)
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT
ap.id,
ap.part_number,
ap.name,
ap.name_es,
m.name AS manufacturer_name,
ap.manufacturer_id,
ap.quality_tier,
ap.price_usd,
ap.warranty_months,
ap.in_stock
FROM aftermarket_parts ap
JOIN manufacturers m ON ap.manufacturer_id = m.id
WHERE ap.oem_part_id = ?
"""
params = [part_id]
if quality_tier:
query += " AND ap.quality_tier = ?"
params.append(quality_tier)
if manufacturer_id:
query += " AND ap.manufacturer_id = ?"
params.append(manufacturer_id)
query += " ORDER BY ap.quality_tier DESC, ap.price_usd ASC"
cursor.execute(query, params)
alternatives = []
for row in cursor.fetchall():
alternatives.append({
'id': row['id'],
'part_number': row['part_number'],
'name': row['name'],
'name_es': row['name_es'],
'manufacturer_name': row['manufacturer_name'],
'manufacturer_id': row['manufacturer_id'],
'quality_tier': row['quality_tier'],
'price_usd': row['price_usd'],
'warranty_months': row['warranty_months'],
'in_stock': bool(row['in_stock']) if row['in_stock'] is not None else None
})
conn.close()
return jsonify(alternatives)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/parts/<int:part_id>/cross-references')
def api_part_cross_references(part_id):
"""Get cross-reference numbers for a part"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT id, cross_reference_number, reference_type, source, notes
FROM part_cross_references
WHERE part_id = ?
ORDER BY reference_type, cross_reference_number
""", (part_id,))
cross_references = []
for row in cursor.fetchall():
cross_references.append({
'id': row['id'],
'cross_reference_number': row['cross_reference_number'],
'reference_type': row['reference_type'],
'source': row['source'],
'notes': row['notes']
})
conn.close()
return jsonify(cross_references)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/search/part-number/<part_number>')
def api_search_part_number(part_number):
"""Search for parts by any part number (OEM, aftermarket, or cross-ref)"""
try:
conn = get_db_connection()
cursor = conn.cursor()
results = []
search_term = f"%{part_number}%"
# Search in OEM parts
cursor.execute("""
SELECT id, oem_part_number, name, name_es
FROM parts
WHERE oem_part_number LIKE ?
""", (search_term,))
for row in cursor.fetchall():
results.append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'match_type': 'oem',
'matched_number': row['oem_part_number']
})
# Search in aftermarket parts
cursor.execute("""
SELECT p.id, p.oem_part_number, p.name, p.name_es, ap.part_number
FROM aftermarket_parts ap
JOIN parts p ON ap.oem_part_id = p.id
WHERE ap.part_number LIKE ?
""", (search_term,))
for row in cursor.fetchall():
results.append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'match_type': 'aftermarket',
'matched_number': row['part_number']
})
# Search in cross-references
cursor.execute("""
SELECT p.id, p.oem_part_number, p.name, p.name_es, pcr.cross_reference_number
FROM part_cross_references pcr
JOIN parts p ON pcr.part_id = p.id
WHERE pcr.cross_reference_number LIKE ?
""", (search_term,))
for row in cursor.fetchall():
results.append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'match_type': 'cross_reference',
'matched_number': row['cross_reference_number']
})
conn.close()
return jsonify(results)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/aftermarket')
def api_aftermarket_parts():
"""List aftermarket parts with filters and pagination"""
try:
manufacturer_id = request.args.get('manufacturer_id', type=int)
quality_tier = request.args.get('quality_tier')
search = request.args.get('search')
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 50, type=int)
per_page = min(per_page, 100) # Max 100 per page
offset = (page - 1) * per_page
conn = get_db_connection()
cursor = conn.cursor()
# Build base WHERE clause for both count and data queries
where_clause = " WHERE 1=1"
params = []
if manufacturer_id:
where_clause += " AND ap.manufacturer_id = ?"
params.append(manufacturer_id)
if quality_tier:
where_clause += " AND ap.quality_tier = ?"
params.append(quality_tier)
if search:
where_clause += " AND (ap.name LIKE ? OR ap.part_number LIKE ? OR p.oem_part_number LIKE ?)"
search_term = f"%{search}%"
params.extend([search_term, search_term, search_term])
# Get total count
count_query = """
SELECT COUNT(*) as total
FROM aftermarket_parts ap
JOIN parts p ON ap.oem_part_id = p.id
JOIN manufacturers m ON ap.manufacturer_id = m.id
""" + where_clause
cursor.execute(count_query, params)
total_count = cursor.fetchone()['total']
# Get paginated data
data_query = """
SELECT
ap.id,
ap.part_number,
ap.name,
p.oem_part_number,
m.name AS manufacturer_name,
ap.quality_tier,
ap.price_usd
FROM aftermarket_parts ap
JOIN parts p ON ap.oem_part_id = p.id
JOIN manufacturers m ON ap.manufacturer_id = m.id
""" + where_clause + " ORDER BY ap.name LIMIT ? OFFSET ?"
params.extend([per_page, offset])
cursor.execute(data_query, params)
parts = []
for row in cursor.fetchall():
parts.append({
'id': row['id'],
'part_number': row['part_number'],
'name': row['name'],
'oem_part_number': row['oem_part_number'],
'manufacturer_name': row['manufacturer_name'],
'quality_tier': row['quality_tier'],
'price_usd': row['price_usd']
})
conn.close()
total_pages = (total_count + per_page - 1) // per_page
return jsonify({
'data': parts,
'pagination': {
'page': page,
'per_page': per_page,
'total': total_count,
'total_pages': total_pages
}
})
except Exception as e:
return jsonify({'error': str(e)}), 500
# ============================================================================
# FASE 3: Exploded Diagrams API Endpoints
# ============================================================================
@app.route('/api/diagrams')
def api_diagrams():
"""Get all diagrams, optionally filtered by group_id"""
try:
group_id = request.args.get('group_id', type=int)
conn = get_db_connection()
cursor = conn.cursor()
query = """
SELECT
d.id,
d.name,
d.name_es,
d.group_id,
pg.name AS group_name,
d.thumbnail_path,
d.display_order
FROM diagrams d
JOIN part_groups pg ON d.group_id = pg.id
WHERE 1=1
"""
params = []
if group_id:
query += " AND d.group_id = ?"
params.append(group_id)
query += " ORDER BY d.display_order, d.name"
cursor.execute(query, params)
diagrams = []
for row in cursor.fetchall():
diagrams.append({
'id': row['id'],
'name': row['name'],
'name_es': row['name_es'],
'group_id': row['group_id'],
'group_name': row['group_name'],
'thumbnail_path': row['thumbnail_path'],
'display_order': row['display_order']
})
conn.close()
return jsonify(diagrams)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/diagrams/<int:diagram_id>')
def api_diagram_detail(diagram_id):
"""Get diagram details including SVG content and hotspots"""
try:
conn = get_db_connection()
cursor = conn.cursor()
# Get diagram details
cursor.execute("""
SELECT
d.id,
d.name,
d.name_es,
d.group_id,
pg.name AS group_name,
d.image_path,
d.svg_content,
d.width,
d.height
FROM diagrams d
JOIN part_groups pg ON d.group_id = pg.id
WHERE d.id = ?
""", (diagram_id,))
row = cursor.fetchone()
if row is None:
conn.close()
return jsonify({'error': 'Diagram not found'}), 404
diagram = {
'id': row['id'],
'name': row['name'],
'name_es': row['name_es'],
'group_id': row['group_id'],
'group_name': row['group_name'],
'image_path': row['image_path'],
'svg_content': row['svg_content'],
'width': row['width'],
'height': row['height'],
'hotspots': []
}
# Get hotspots with part info
cursor.execute("""
SELECT
h.id,
h.part_id,
h.callout_number,
h.label,
h.shape,
h.coords,
h.color,
p.name AS part_name,
p.oem_part_number AS part_number
FROM diagram_hotspots h
LEFT JOIN parts p ON h.part_id = p.id
WHERE h.diagram_id = ?
ORDER BY h.callout_number
""", (diagram_id,))
for hotspot_row in cursor.fetchall():
diagram['hotspots'].append({
'id': hotspot_row['id'],
'part_id': hotspot_row['part_id'],
'callout_number': hotspot_row['callout_number'],
'label': hotspot_row['label'],
'shape': hotspot_row['shape'],
'coords': hotspot_row['coords'],
'color': hotspot_row['color'],
'part_name': hotspot_row['part_name'],
'part_number': hotspot_row['part_number']
})
conn.close()
return jsonify(diagram)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/groups/<int:group_id>/diagrams')
def api_group_diagrams(group_id):
"""Get all diagrams for a specific part group"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT
id,
name,
name_es,
thumbnail_path,
display_order
FROM diagrams
WHERE group_id = ?
ORDER BY display_order, name
""", (group_id,))
diagrams = []
for row in cursor.fetchall():
diagrams.append({
'id': row['id'],
'name': row['name'],
'name_es': row['name_es'],
'thumbnail_path': row['thumbnail_path'],
'display_order': row['display_order']
})
conn.close()
return jsonify(diagrams)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/vehicles/<int:mye_id>/diagrams')
def api_vehicle_diagrams(mye_id):
"""Get diagrams available for a specific vehicle configuration"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT DISTINCT
d.id,
d.name,
d.name_es,
d.group_id,
pg.name AS group_name,
pc.name AS category_name,
d.thumbnail_path,
vd.notes
FROM vehicle_diagrams vd
JOIN diagrams d ON vd.diagram_id = d.id
JOIN part_groups pg ON d.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE vd.model_year_engine_id = ?
ORDER BY pc.display_order, pg.display_order, d.display_order
""", (mye_id,))
diagrams = []
for row in cursor.fetchall():
diagrams.append({
'id': row['id'],
'name': row['name'],
'name_es': row['name_es'],
'group_id': row['group_id'],
'group_name': row['group_name'],
'category_name': row['category_name'],
'thumbnail_path': row['thumbnail_path'],
'notes': row['notes']
})
conn.close()
return jsonify(diagrams)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/hotspots/<int:hotspot_id>')
def api_hotspot_detail(hotspot_id):
"""Get hotspot details including linked part info"""
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT
h.id,
h.diagram_id,
h.part_id,
h.callout_number,
h.label,
h.shape,
h.coords,
h.color
FROM diagram_hotspots h
WHERE h.id = ?
""", (hotspot_id,))
row = cursor.fetchone()
if row is None:
conn.close()
return jsonify({'error': 'Hotspot not found'}), 404
hotspot = {
'id': row['id'],
'diagram_id': row['diagram_id'],
'part_id': row['part_id'],
'callout_number': row['callout_number'],
'label': row['label'],
'shape': row['shape'],
'coords': row['coords'],
'color': row['color'],
'part': None
}
# Get linked part info if part_id exists
if row['part_id']:
cursor.execute("""
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
pg.name AS group_name,
pc.name AS category_name
FROM parts p
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE p.id = ?
""", (row['part_id'],))
part_row = cursor.fetchone()
if part_row:
hotspot['part'] = {
'id': part_row['id'],
'oem_part_number': part_row['oem_part_number'],
'name': part_row['name'],
'name_es': part_row['name_es'],
'group_name': part_row['group_name'],
'category_name': part_row['category_name']
}
conn.close()
return jsonify(hotspot)
except Exception as e:
return jsonify({'error': str(e)}), 500
# ============================================================================
# FASE 4: Full-Text Search and VIN Decoder API Endpoints
# ============================================================================
import urllib.request
import json as json_module
import re
from datetime import datetime, timedelta
def validate_vin(vin):
"""Validate VIN format: 17 alphanumeric characters, no I, O, Q"""
if not vin or len(vin) != 17:
return False
# VIN can only contain alphanumeric characters except I, O, Q
valid_pattern = re.compile(r'^[A-HJ-NPR-Z0-9]{17}$', re.IGNORECASE)
return bool(valid_pattern.match(vin))
@app.route('/api/search')
def api_search():
"""Unified search across parts, cross-references, and aftermarket"""
try:
q = request.args.get('q', '').strip()
search_type = request.args.get('type', 'all')
limit = request.args.get('limit', 50, type=int)
offset = request.args.get('offset', 0, type=int)
if not q:
return jsonify({'error': 'Search query is required'}), 400
conn = get_db_connection()
cursor = conn.cursor()
results = {
'parts': [],
'vehicles': [],
'total_count': 0
}
# Search parts
if search_type in ('parts', 'all'):
search_term = f"%{q}%"
# Search in parts table
cursor.execute("""
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
pg.name AS group_name,
pc.name AS category_name
FROM parts p
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ?
ORDER BY p.name
LIMIT ? OFFSET ?
""", (search_term, search_term, search_term, limit, offset))
for row in cursor.fetchall():
results['parts'].append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'group_name': row['group_name'],
'category_name': row['category_name'],
'match_type': 'oem'
})
# Also search in aftermarket parts
cursor.execute("""
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
pg.name AS group_name,
pc.name AS category_name,
ap.part_number AS matched_number
FROM aftermarket_parts ap
JOIN parts p ON ap.oem_part_id = p.id
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE ap.part_number LIKE ?
LIMIT ? OFFSET ?
""", (search_term, limit, offset))
for row in cursor.fetchall():
results['parts'].append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'group_name': row['group_name'],
'category_name': row['category_name'],
'matched_number': row['matched_number'],
'match_type': 'aftermarket'
})
# Search in cross-references
cursor.execute("""
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
pg.name AS group_name,
pc.name AS category_name,
pcr.cross_reference_number AS matched_number
FROM part_cross_references pcr
JOIN parts p ON pcr.part_id = p.id
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE pcr.cross_reference_number LIKE ?
LIMIT ? OFFSET ?
""", (search_term, limit, offset))
for row in cursor.fetchall():
results['parts'].append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'group_name': row['group_name'],
'category_name': row['category_name'],
'matched_number': row['matched_number'],
'match_type': 'cross_reference'
})
# Search vehicles
if search_type in ('vehicles', 'all'):
search_term = f"%{q}%"
cursor.execute("""
SELECT
mye.id,
b.name AS brand,
m.name AS model,
y.year,
e.name AS engine
FROM model_year_engine mye
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
JOIN years y ON mye.year_id = y.id
JOIN engines e ON mye.engine_id = e.id
WHERE b.name LIKE ? OR m.name LIKE ? OR e.name LIKE ?
ORDER BY b.name, m.name, y.year
LIMIT ? OFFSET ?
""", (search_term, search_term, search_term, limit, offset))
for row in cursor.fetchall():
results['vehicles'].append({
'id': row['id'],
'brand': row['brand'],
'model': row['model'],
'year': row['year'],
'engine': row['engine']
})
results['total_count'] = len(results['parts']) + len(results['vehicles'])
conn.close()
return jsonify(results)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/search/parts')
def api_search_parts():
"""Full-text search in parts catalog with pagination"""
try:
q = request.args.get('q', '').strip()
category_id = request.args.get('category_id', type=int)
group_id = request.args.get('group_id', type=int)
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 50, type=int)
per_page = min(per_page, 100) # Max 100 per page
offset = (page - 1) * per_page
if not q:
return jsonify({'error': 'Search query is required'}), 400
conn = get_db_connection()
cursor = conn.cursor()
# Check if FTS5 table exists
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='parts_fts'
""")
fts_exists = cursor.fetchone() is not None
parts = []
total_count = 0
if fts_exists:
# Use FTS5 for full-text search
# Escape special FTS5 characters and prepare search term
fts_query = q.replace('"', '""')
# Build filter conditions
filter_clause = ""
filter_params = []
if category_id:
filter_clause += " AND pg.category_id = ?"
filter_params.append(category_id)
if group_id:
filter_clause += " AND p.group_id = ?"
filter_params.append(group_id)
# Get total count for FTS search
count_query = """
SELECT COUNT(*) as total
FROM parts_fts
JOIN parts p ON parts_fts.rowid = p.id
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE parts_fts MATCH ?
""" + filter_clause
cursor.execute(count_query, [fts_query] + filter_params)
total_count = cursor.fetchone()['total']
# Get paginated data
data_query = """
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
p.description,
pg.name AS group_name,
pc.name AS category_name,
bm25(parts_fts) AS rank
FROM parts_fts
JOIN parts p ON parts_fts.rowid = p.id
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE parts_fts MATCH ?
""" + filter_clause + " ORDER BY rank LIMIT ? OFFSET ?"
cursor.execute(data_query, [fts_query] + filter_params + [per_page, offset])
for row in cursor.fetchall():
parts.append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'description': row['description'],
'group_name': row['group_name'],
'category_name': row['category_name'],
'rank': row['rank']
})
else:
# Fallback to LIKE search if FTS5 table doesn't exist
search_term = f"%{q}%"
# Build filter conditions
filter_clause = ""
filter_params = []
if category_id:
filter_clause += " AND pg.category_id = ?"
filter_params.append(category_id)
if group_id:
filter_clause += " AND p.group_id = ?"
filter_params.append(group_id)
# Get total count for LIKE search
count_query = """
SELECT COUNT(*) as total
FROM parts p
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE (p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ?
OR p.description LIKE ?)
""" + filter_clause
cursor.execute(count_query, [search_term, search_term, search_term, search_term] + filter_params)
total_count = cursor.fetchone()['total']
# Get paginated data
data_query = """
SELECT
p.id,
p.oem_part_number,
p.name,
p.name_es,
p.description,
pg.name AS group_name,
pc.name AS category_name
FROM parts p
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE (p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ?
OR p.description LIKE ?)
""" + filter_clause + " ORDER BY p.name LIMIT ? OFFSET ?"
cursor.execute(data_query, [search_term, search_term, search_term, search_term] + filter_params + [per_page, offset])
for row in cursor.fetchall():
parts.append({
'id': row['id'],
'oem_part_number': row['oem_part_number'],
'name': row['name'],
'name_es': row['name_es'],
'description': row['description'],
'group_name': row['group_name'],
'category_name': row['category_name'],
'rank': 0
})
conn.close()
total_pages = (total_count + per_page - 1) // per_page
return jsonify({
'data': parts,
'pagination': {
'page': page,
'per_page': per_page,
'total': total_count,
'total_pages': total_pages
}
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/vin/decode/<vin>')
def api_vin_decode(vin):
"""Decode a VIN using NHTSA API with caching"""
try:
vin = vin.upper().strip()
# Validate VIN format
if not validate_vin(vin):
return jsonify({
'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).'
}), 400
conn = get_db_connection()
cursor = conn.cursor()
# Check if vin_cache table exists
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='vin_cache'
""")
cache_exists = cursor.fetchone() is not None
cached_data = None
if cache_exists:
# Check for cached VIN data that hasn't expired
cursor.execute("""
SELECT
vin, make, model, year, engine_info,
body_class, drive_type,
model_year_engine_id, created_at, expires_at
FROM vin_cache
WHERE vin = ? AND expires_at > datetime('now')
""", (vin,))
cached_row = cursor.fetchone()
if cached_row:
# Parse engine_info JSON if it exists
engine_info_data = {}
if cached_row['engine_info']:
try:
engine_info_data = json_module.loads(cached_row['engine_info'])
except:
engine_info_data = {'raw': cached_row['engine_info']}
cached_data = {
'vin': cached_row['vin'],
'make': cached_row['make'],
'model': cached_row['model'],
'year': cached_row['year'],
'engine_info': engine_info_data,
'body_class': cached_row['body_class'],
'drive_type': cached_row['drive_type'],
'matched_vehicle': None,
'cached': True
}
# Get matched vehicle info if available
if cached_row['model_year_engine_id']:
cursor.execute("""
SELECT
mye.id,
b.name AS brand,
m.name AS model,
y.year,
e.name AS engine
FROM model_year_engine mye
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
JOIN years y ON mye.year_id = y.id
JOIN engines e ON mye.engine_id = e.id
WHERE mye.id = ?
""", (cached_row['model_year_engine_id'],))
mye_row = cursor.fetchone()
if mye_row:
cached_data['matched_vehicle'] = {
'mye_id': mye_row['id'],
'brand': mye_row['brand'],
'model': mye_row['model'],
'year': mye_row['year'],
'engine': mye_row['engine']
}
conn.close()
return jsonify(cached_data)
# Call NHTSA API
nhtsa_url = f'https://vpic.nhtsa.dot.gov/api/vehicles/DecodeVin/{vin}?format=json'
try:
req = urllib.request.Request(nhtsa_url, headers={'User-Agent': 'AutopartesDB/1.0'})
with urllib.request.urlopen(req, timeout=10) as response:
nhtsa_data = json_module.loads(response.read().decode('utf-8'))
except urllib.error.URLError as e:
conn.close()
return jsonify({'error': f'Failed to connect to NHTSA API: {str(e)}'}), 503
except urllib.error.HTTPError as e:
conn.close()
return jsonify({'error': f'NHTSA API error: {e.code}'}), 502
except Exception as e:
conn.close()
return jsonify({'error': f'Error calling NHTSA API: {str(e)}'}), 500
# Parse NHTSA response
results = {item['Variable']: item['Value'] for item in nhtsa_data.get('Results', [])}
# Extract relevant fields
make = results.get('Make', '')
model = results.get('Model', '')
year_str = results.get('ModelYear', '')
year = int(year_str) if year_str and year_str.isdigit() else None
engine_config = results.get('EngineConfiguration', '')
cylinders_str = results.get('EngineCylinders', '')
cylinders = int(cylinders_str) if cylinders_str and cylinders_str.isdigit() else None
displacement_str = results.get('DisplacementL', '')
displacement_l = float(displacement_str) if displacement_str else None
fuel_type = results.get('FuelTypePrimary', '')
body_class = results.get('BodyClass', '')
drive_type = results.get('DriveType', '')
# Try to match to model_year_engine record
matched_mye_id = None
matched_vehicle = None
if make and model and year:
cursor.execute("""
SELECT
mye.id,
b.name AS brand,
m.name AS model,
y.year,
e.name AS engine
FROM model_year_engine mye
JOIN models m ON mye.model_id = m.id
JOIN brands b ON m.brand_id = b.id
JOIN years y ON mye.year_id = y.id
JOIN engines e ON mye.engine_id = e.id
WHERE UPPER(b.name) = UPPER(?)
AND UPPER(m.name) = UPPER(?)
AND y.year = ?
LIMIT 1
""", (make, model, year))
mye_row = cursor.fetchone()
if mye_row:
matched_mye_id = mye_row['id']
matched_vehicle = {
'mye_id': mye_row['id'],
'brand': mye_row['brand'],
'model': mye_row['model'],
'year': mye_row['year'],
'engine': mye_row['engine']
}
# Store in cache with 30-day expiry
if cache_exists:
expires_at = (datetime.now() + timedelta(days=30)).strftime('%Y-%m-%d %H:%M:%S')
# Combine engine info into JSON
engine_info = json_module.dumps({
'configuration': engine_config,
'cylinders': cylinders,
'displacement_l': displacement_l,
'fuel_type': fuel_type
})
cursor.execute("""
INSERT OR REPLACE INTO vin_cache
(vin, decoded_data, make, model, year, engine_info,
body_class, drive_type, model_year_engine_id, expires_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (vin, json_module.dumps(results), make, model, year, engine_info,
body_class, drive_type, matched_mye_id, expires_at))
conn.commit()
result = {
'vin': vin,
'make': make,
'model': model,
'year': year,
'engine_info': {
'configuration': engine_config,
'cylinders': cylinders,
'displacement_l': displacement_l,
'fuel_type': fuel_type
},
'body_class': body_class,
'drive_type': drive_type,
'matched_vehicle': matched_vehicle,
'cached': False
}
conn.close()
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/vin/<vin>/parts')
def api_vin_parts(vin):
"""Get parts for a decoded VIN"""
try:
vin = vin.upper().strip()
if not validate_vin(vin):
return jsonify({
'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).'
}), 400
category_id = request.args.get('category_id', type=int)
conn = get_db_connection()
cursor = conn.cursor()
# Check if vin_cache table exists
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='vin_cache'
""")
cache_exists = cursor.fetchone() is not None
if not cache_exists:
conn.close()
return jsonify({
'error': 'VIN cache not available. Please decode the VIN first.'
}), 400
# Look up VIN in cache
cursor.execute("""
SELECT
vin, make, model, year, model_year_engine_id
FROM vin_cache
WHERE vin = ?
""", (vin,))
cached_row = cursor.fetchone()
if not cached_row:
conn.close()
return jsonify({
'error': 'VIN not found in cache. Please decode the VIN first using /api/vin/decode/<vin>'
}), 404
mye_id = cached_row['model_year_engine_id']
vehicle_info = {
'vin': cached_row['vin'],
'make': cached_row['make'],
'model': cached_row['model'],
'year': cached_row['year'],
'mye_id': mye_id
}
if not mye_id:
conn.close()
return jsonify({
'vin': vin,
'vehicle_info': vehicle_info,
'categories': [],
'message': 'No matching vehicle configuration found in database. Use /api/vin/<vin>/match to manually link.'
})
# Get parts for this vehicle grouped by category
query = """
SELECT
pc.id AS category_id,
pc.name AS category_name,
pc.name_es AS category_name_es,
p.id AS part_id,
p.oem_part_number,
p.name AS part_name,
p.name_es AS part_name_es,
pg.name AS group_name,
vp.quantity_required,
vp.position
FROM vehicle_parts vp
JOIN parts p ON vp.part_id = p.id
JOIN part_groups pg ON p.group_id = pg.id
JOIN part_categories pc ON pg.category_id = pc.id
WHERE vp.model_year_engine_id = ?
"""
params = [mye_id]
if category_id:
query += " AND pc.id = ?"
params.append(category_id)
query += " ORDER BY pc.display_order, pg.display_order, p.name"
cursor.execute(query, params)
# Group parts by category
categories_dict = {}
for row in cursor.fetchall():
cat_id = row['category_id']
if cat_id not in categories_dict:
categories_dict[cat_id] = {
'id': cat_id,
'name': row['category_name'],
'name_es': row['category_name_es'],
'parts': []
}
categories_dict[cat_id]['parts'].append({
'id': row['part_id'],
'oem_part_number': row['oem_part_number'],
'name': row['part_name'],
'name_es': row['part_name_es'],
'group_name': row['group_name'],
'quantity_required': row['quantity_required'],
'position': row['position']
})
conn.close()
return jsonify({
'vin': vin,
'vehicle_info': vehicle_info,
'categories': list(categories_dict.values())
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/vin/<vin>/match')
def api_vin_match(vin):
"""Manually match a VIN to a vehicle configuration"""
try:
vin = vin.upper().strip()
if not validate_vin(vin):
return jsonify({
'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).'
}), 400
mye_id = request.args.get('mye_id', type=int)
if not mye_id:
return jsonify({'error': 'mye_id parameter is required'}), 400
conn = get_db_connection()
cursor = conn.cursor()
# Check if vin_cache table exists
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='vin_cache'
""")
cache_exists = cursor.fetchone() is not None
if not cache_exists:
conn.close()
return jsonify({
'error': 'VIN cache table not available.'
}), 400
# Verify the mye_id exists
cursor.execute("""
SELECT id FROM model_year_engine WHERE id = ?
""", (mye_id,))
if not cursor.fetchone():
conn.close()
return jsonify({'error': f'model_year_engine_id {mye_id} not found'}), 404
# Check if VIN exists in cache
cursor.execute("""
SELECT vin FROM vin_cache WHERE vin = ?
""", (vin,))
vin_exists = cursor.fetchone() is not None
if vin_exists:
# Update existing cache entry
cursor.execute("""
UPDATE vin_cache
SET model_year_engine_id = ?
WHERE vin = ?
""", (mye_id, vin))
else:
# Create new cache entry with minimal info
expires_at = (datetime.now() + timedelta(days=30)).strftime('%Y-%m-%d %H:%M:%S')
cursor.execute("""
INSERT INTO vin_cache
(vin, model_year_engine_id, cached_at, expires_at)
VALUES (?, ?, datetime('now'), ?)
""", (vin, mye_id, expires_at))
conn.commit()
conn.close()
return jsonify({
'success': True,
'vin': vin,
'mye_id': mye_id
})
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
# Check if database exists
if not os.path.exists(DATABASE_PATH):
print(f"Database not found at {DATABASE_PATH}")
print("Please make sure the vehicle database is created first.")
exit(1)
print("Starting Vehicle Dashboard Server...")
print("Visit http://localhost:5000 to access the dashboard locally")
print("Visit http://192.168.10.198:5000 to access the dashboard from other computers on the network")
app.run(debug=True, host='0.0.0.0', port=5000)