import os
import sqlite3

from flask import Flask, render_template, jsonify, request, send_from_directory

app = Flask(__name__, static_folder='.')

# Database path - adjust as needed
DATABASE_PATH = '../vehicle_database/vehicle_database.db'


def get_db_connection():
    """Open a connection to the vehicle database.

    Rows are produced as ``sqlite3.Row`` so columns can be read by name.
    The caller is responsible for closing the returned connection.
    """
    conn = sqlite3.connect(DATABASE_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def get_all_brands():
    """Return every distinct brand name, sorted alphabetically."""
    conn = get_db_connection()
    try:
        rows = conn.execute(
            "SELECT DISTINCT name FROM brands ORDER BY name"
        ).fetchall()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()
    return [row['name'] for row in rows]


def get_all_years():
    """Return every distinct year, newest first."""
    conn = get_db_connection()
    try:
        rows = conn.execute(
            "SELECT DISTINCT year FROM years ORDER BY year DESC"
        ).fetchall()
    finally:
        conn.close()
    return [row['year'] for row in rows]


def get_all_engines():
    """Return every distinct engine name, sorted alphabetically."""
    conn = get_db_connection()
    try:
        rows = conn.execute(
            "SELECT DISTINCT name FROM engines ORDER BY name"
        ).fetchall()
    finally:
        conn.close()
    return [row['name'] for row in rows]


def get_models_by_brand(brand_name=None):
    """Return distinct model names, optionally restricted to one brand.

    Args:
        brand_name: Exact brand name to filter on; a falsy value returns
            the models of all brands.
    """
    if brand_name:
        query = """
            SELECT DISTINCT m.name
            FROM models m
            JOIN brands b ON m.brand_id = b.id
            WHERE b.name = ?
            ORDER BY m.name
        """
        params = (brand_name,)
    else:
        query = "SELECT DISTINCT name FROM models ORDER BY name"
        params = ()

    conn = get_db_connection()
    try:
        rows = conn.execute(query, params).fetchall()
    finally:
        conn.close()
    return [row['name'] for row in rows]


def search_vehicles(brand=None, model=None, year=None, engine=None):
    """Search vehicle configurations matching the given filters.

    Args:
        brand: Exact brand name, or falsy for any brand.
        model: Exact model name, or falsy for any model.
        year: Model year (int or numeric string), or falsy for any year.
        engine: Exact engine name, or falsy for any engine.

    Returns:
        A list of dicts, one per matching configuration. NULL numeric
        columns are reported as ``0`` and NULL text columns as
        ``'unknown'``.

    Raises:
        ValueError: If ``year`` is truthy but not convertible to ``int``.
    """
    query = """
        SELECT
            b.name AS brand,
            m.name AS model,
            y.year,
            e.name AS engine,
            e.power_hp,
            e.displacement_cc,
            e.cylinders,
            e.fuel_type,
            mye.trim_level,
            mye.drivetrain,
            mye.transmission
        FROM model_year_engine mye
        JOIN models m ON mye.model_id = m.id
        JOIN brands b ON m.brand_id = b.id
        JOIN years y ON mye.year_id = y.id
        JOIN engines e ON mye.engine_id = e.id
        WHERE 1=1
    """
    params = []
    if brand:
        query += " AND b.name = ?"
        params.append(brand)
    if model:
        query += " AND m.name = ?"
        params.append(model)
    if year:
        query += " AND y.year = ?"
        # Converted before the connection is opened, so a bad value cannot
        # leave a connection dangling.
        params.append(int(year))
    if engine:
        query += " AND e.name = ?"
        params.append(engine)
    query += " ORDER BY b.name, m.name, y.year"

    conn = get_db_connection()
    try:
        rows = conn.execute(query, params).fetchall()
    finally:
        conn.close()

    return [
        {
            'brand': row['brand'],
            'model': row['model'],
            'year': row['year'],
            'engine': row['engine'],
            'power_hp': row['power_hp'] or 0,
            'displacement_cc': row['displacement_cc'] or 0,
            'cylinders': row['cylinders'] or 0,
            'fuel_type': row['fuel_type'] or 'unknown',
            'trim_level': row['trim_level'] or 'unknown',
            'drivetrain': row['drivetrain'] or 'unknown',
            'transmission': row['transmission'] or 'unknown',
        }
        for row in rows
    ]


@app.route('/')
def index():
    """Serve the main dashboard page."""
    return send_from_directory('.', 'index.html')


# The view takes ``path``, so the rule must declare it; registering this view
# on '/' again would shadow nothing and could never supply the argument.
@app.route('/<path:path>')
def static_files(path):
    """Serve a static file from the application directory."""
    return send_from_directory('.', path)


@app.route('/api/brands')
def api_brands():
    """API endpoint to get all brands."""
    return jsonify(get_all_brands())


@app.route('/api/years')
def api_years():
    """API endpoint to get years, optionally filtered by brand and/or model."""
    brand = request.args.get('brand')
    model = request.args.get('model')

    query = """
        SELECT DISTINCT y.year
        FROM years y
        JOIN model_year_engine mye ON y.id = mye.year_id
        JOIN models m ON mye.model_id = m.id
        JOIN brands b ON m.brand_id = b.id
        WHERE 1=1
    """
    params = []
    if brand:
        query += " AND b.name = ?"
        params.append(brand)
    if model:
        query += " AND m.name = ?"
        params.append(model)
    query += " ORDER BY y.year DESC"

    conn = get_db_connection()
    try:
        rows = conn.execute(query, params).fetchall()
    finally:
        conn.close()
    return jsonify([row['year'] for row in rows])


@app.route('/api/engines')
def api_engines():
    """API endpoint to get engines, optionally filtered by brand, model, and/or year."""
    brand = request.args.get('brand')
    model = request.args.get('model')
    # type=int makes a non-numeric ?year= behave like a missing filter
    # instead of raising ValueError and producing a 500.
    year = request.args.get('year', type=int)

    query = """
        SELECT DISTINCT e.name
        FROM engines e
        JOIN model_year_engine mye ON e.id = mye.engine_id
        JOIN models m ON mye.model_id = m.id
        JOIN brands b ON m.brand_id = b.id
        JOIN years y ON mye.year_id = y.id
        WHERE 1=1
    """
    params = []
    if brand:
        query += " AND b.name = ?"
        params.append(brand)
    if model:
        query += " AND m.name = ?"
        params.append(model)
    if year:
        query += " AND y.year = ?"
        params.append(year)
    query += " ORDER BY e.name"

    conn = get_db_connection()
    try:
        rows = conn.execute(query, params).fetchall()
    finally:
        conn.close()
    return jsonify([row['name'] for row in rows])


@app.route('/api/models')
def api_models():
    """API endpoint to get models, optionally filtered by brand."""
    brand = request.args.get('brand')
    return jsonify(get_models_by_brand(brand))


@app.route('/api/vehicles')
def api_vehicles():
    """API endpoint to search for vehicles."""
    brand = request.args.get('brand')
    model = request.args.get('model')
    # Parsed here so a malformed year is ignored rather than crashing
    # search_vehicles() with ValueError.
    year = request.args.get('year', type=int)
    engine = request.args.get('engine')
    return jsonify(search_vehicles(brand, model, year, engine))


# ============================================================================
# Parts Catalog API Endpoints
# ============================================================================

@app.route('/api/categories')
def api_categories():
    """API endpoint to get all part categories as a tree.

    Returns the root categories (``parent_id`` NULL), each carrying its
    descendants under ``children``. A category whose ``parent_id`` points
    at a row that does not exist is omitted from the result.
    """
    try:
        conn = get_db_connection()
        try:
            all_categories = conn.execute("""
                SELECT id, name, name_es, slug, icon_name, display_order, parent_id
                FROM part_categories
                ORDER BY display_order, name
            """).fetchall()
        finally:
            conn.close()

        categories_dict = {}
        root_categories = []

        # First pass: create every node so children can be attached
        # regardless of the order in which rows were returned.
        for row in all_categories:
            category = {
                'id': row['id'],
                'name': row['name'],
                'name_es': row['name_es'],
                'slug': row['slug'],
                'icon_name': row['icon_name'],
                'display_order': row['display_order'],
                'children': [],
            }
            categories_dict[row['id']] = category
            if row['parent_id'] is None:
                root_categories.append(category)

        # Second pass: link each child to its parent.
        for row in all_categories:
            parent = categories_dict.get(row['parent_id'])
            if row['parent_id'] is not None and parent is not None:
                parent['children'].append(categories_dict[row['id']])

        return jsonify(root_categories)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/categories//groups') def api_category_groups(category_id): """API endpoint to get groups for a specific category""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT id, name, name_es, slug, display_order FROM part_groups WHERE category_id = ? ORDER BY display_order, name """, (category_id,)) groups = [] for row in cursor.fetchall(): groups.append({ 'id': row['id'], 'name': row['name'], 'name_es': row['name_es'], 'slug': row['slug'], 'display_order': row['display_order'] }) conn.close() return jsonify(groups) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/parts') def api_parts(): """API endpoint to list parts with optional filters and pagination""" try: group_id = request.args.get('group_id', type=int) category_id = request.args.get('category_id', type=int) search = request.args.get('search') page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) per_page = min(per_page, 100) # Max 100 per page offset = (page - 1) * per_page conn = get_db_connection() cursor = conn.cursor() # Build base WHERE clause for both count and data queries where_clause = " WHERE 1=1" params = [] if group_id: where_clause += " AND p.group_id = ?" params.append(group_id) if category_id: where_clause += " AND pg.category_id = ?" params.append(category_id) if search: where_clause += " AND (p.name LIKE ? OR p.name_es LIKE ? 
OR p.oem_part_number LIKE ?)" search_term = f"%{search}%" params.extend([search_term, search_term, search_term]) # Get total count count_query = """ SELECT COUNT(*) as total FROM parts p JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id """ + where_clause cursor.execute(count_query, params) total_count = cursor.fetchone()['total'] # Get paginated data data_query = """ SELECT p.id, p.oem_part_number, p.name, p.name_es, p.group_id, pg.name AS group_name, pc.name AS category_name FROM parts p JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id """ + where_clause + " ORDER BY p.name LIMIT ? OFFSET ?" params.extend([per_page, offset]) cursor.execute(data_query, params) parts = [] for row in cursor.fetchall(): parts.append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'group_id': row['group_id'], 'group_name': row['group_name'], 'category_name': row['category_name'] }) conn.close() total_pages = (total_count + per_page - 1) // per_page return jsonify({ 'data': parts, 'pagination': { 'page': page, 'per_page': per_page, 'total': total_count, 'total_pages': total_pages } }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/parts/') def api_part_detail(part_id): """API endpoint to get single part details""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT p.id, p.oem_part_number, p.name, p.name_es, p.description, p.description_es, p.group_id, pg.name AS group_name, pg.name_es AS group_name_es, pc.id AS category_id, pc.name AS category_name, pc.name_es AS category_name_es FROM parts p JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE p.id = ? 
""", (part_id,)) row = cursor.fetchone() conn.close() if row is None: return jsonify({'error': 'Part not found'}), 404 part = { 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'description': row['description'], 'description_es': row['description_es'], 'group_id': row['group_id'], 'group_name': row['group_name'], 'group_name_es': row['group_name_es'], 'category_id': row['category_id'], 'category_name': row['category_name'], 'category_name_es': row['category_name_es'] } return jsonify(part) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/vehicles//categories') def api_vehicle_categories(mye_id): """API endpoint to get categories that have parts for a specific vehicle""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT DISTINCT pc.id, pc.name, pc.name_es, pc.slug, pc.icon_name, pc.display_order FROM part_categories pc JOIN part_groups pg ON pg.category_id = pc.id JOIN parts p ON p.group_id = pg.id JOIN vehicle_parts vp ON vp.part_id = p.id WHERE vp.model_year_engine_id = ? 
ORDER BY pc.display_order, pc.name """, (mye_id,)) categories = [] for row in cursor.fetchall(): categories.append({ 'id': row['id'], 'name': row['name'], 'name_es': row['name_es'], 'slug': row['slug'], 'icon_name': row['icon_name'], 'display_order': row['display_order'] }) conn.close() return jsonify(categories) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/vehicles//parts') def api_vehicle_parts(mye_id): """API endpoint to get parts for a specific vehicle""" try: category_id = request.args.get('category_id', type=int) group_id = request.args.get('group_id', type=int) conn = get_db_connection() cursor = conn.cursor() query = """ SELECT p.id, p.oem_part_number, p.name, p.name_es, vp.quantity_required, vp.position, pc.name AS category_name, pg.name AS group_name FROM vehicle_parts vp JOIN parts p ON vp.part_id = p.id JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE vp.model_year_engine_id = ? """ params = [mye_id] if category_id: query += " AND pc.id = ?" params.append(category_id) if group_id: query += " AND pg.id = ?" 
params.append(group_id) query += " ORDER BY pc.display_order, pg.display_order, p.name" cursor.execute(query, params) parts = [] for row in cursor.fetchall(): parts.append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'quantity_required': row['quantity_required'], 'position': row['position'], 'category_name': row['category_name'], 'group_name': row['group_name'] }) conn.close() return jsonify(parts) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/model-year-engine') def api_model_year_engine(): """API endpoint to get model_year_engine records with filters""" try: brand = request.args.get('brand') model = request.args.get('model') year = request.args.get('year', type=int) conn = get_db_connection() cursor = conn.cursor() query = """ SELECT mye.id, b.name AS brand, m.name AS model, y.year, e.name AS engine, mye.trim_level, mye.drivetrain, mye.transmission FROM model_year_engine mye JOIN models m ON mye.model_id = m.id JOIN brands b ON m.brand_id = b.id JOIN years y ON mye.year_id = y.id JOIN engines e ON mye.engine_id = e.id WHERE 1=1 """ params = [] if brand: query += " AND b.name = ?" params.append(brand) if model: query += " AND m.name = ?" params.append(model) if year: query += " AND y.year = ?" 
params.append(year) query += " ORDER BY b.name, m.name, y.year, e.name" cursor.execute(query, params) records = [] for row in cursor.fetchall(): records.append({ 'id': row['id'], 'brand': row['brand'], 'model': row['model'], 'year': row['year'], 'engine': row['engine'], 'trim_level': row['trim_level'], 'drivetrain': row['drivetrain'], 'transmission': row['transmission'] }) conn.close() return jsonify(records) except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================ # FASE 2: Cross-References and Aftermarket API Endpoints # ============================================================================ @app.route('/api/manufacturers') def api_manufacturers(): """Get all manufacturers, optionally filtered by type""" try: manufacturer_type = request.args.get('type') quality_tier = request.args.get('quality_tier') conn = get_db_connection() cursor = conn.cursor() query = """ SELECT id, name, type, quality_tier, country, logo_url, website FROM manufacturers WHERE 1=1 """ params = [] if manufacturer_type: query += " AND type = ?" params.append(manufacturer_type) if quality_tier: query += " AND quality_tier = ?" 
params.append(quality_tier) query += " ORDER BY name" cursor.execute(query, params) manufacturers = [] for row in cursor.fetchall(): manufacturers.append({ 'id': row['id'], 'name': row['name'], 'type': row['type'], 'quality_tier': row['quality_tier'], 'country': row['country'], 'logo_url': row['logo_url'], 'website': row['website'] }) conn.close() return jsonify(manufacturers) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/parts//alternatives') def api_part_alternatives(part_id): """Get aftermarket alternatives for an OEM part""" try: quality_tier = request.args.get('quality_tier') manufacturer_id = request.args.get('manufacturer_id', type=int) conn = get_db_connection() cursor = conn.cursor() query = """ SELECT ap.id, ap.part_number, ap.name, ap.name_es, m.name AS manufacturer_name, ap.manufacturer_id, ap.quality_tier, ap.price_usd, ap.warranty_months, ap.in_stock FROM aftermarket_parts ap JOIN manufacturers m ON ap.manufacturer_id = m.id WHERE ap.oem_part_id = ? """ params = [part_id] if quality_tier: query += " AND ap.quality_tier = ?" params.append(quality_tier) if manufacturer_id: query += " AND ap.manufacturer_id = ?" 
params.append(manufacturer_id) query += " ORDER BY ap.quality_tier DESC, ap.price_usd ASC" cursor.execute(query, params) alternatives = [] for row in cursor.fetchall(): alternatives.append({ 'id': row['id'], 'part_number': row['part_number'], 'name': row['name'], 'name_es': row['name_es'], 'manufacturer_name': row['manufacturer_name'], 'manufacturer_id': row['manufacturer_id'], 'quality_tier': row['quality_tier'], 'price_usd': row['price_usd'], 'warranty_months': row['warranty_months'], 'in_stock': bool(row['in_stock']) if row['in_stock'] is not None else None }) conn.close() return jsonify(alternatives) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/parts//cross-references') def api_part_cross_references(part_id): """Get cross-reference numbers for a part""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT id, cross_reference_number, reference_type, source, notes FROM part_cross_references WHERE part_id = ? ORDER BY reference_type, cross_reference_number """, (part_id,)) cross_references = [] for row in cursor.fetchall(): cross_references.append({ 'id': row['id'], 'cross_reference_number': row['cross_reference_number'], 'reference_type': row['reference_type'], 'source': row['source'], 'notes': row['notes'] }) conn.close() return jsonify(cross_references) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/search/part-number/') def api_search_part_number(part_number): """Search for parts by any part number (OEM, aftermarket, or cross-ref)""" try: conn = get_db_connection() cursor = conn.cursor() results = [] search_term = f"%{part_number}%" # Search in OEM parts cursor.execute(""" SELECT id, oem_part_number, name, name_es FROM parts WHERE oem_part_number LIKE ? 
""", (search_term,)) for row in cursor.fetchall(): results.append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'match_type': 'oem', 'matched_number': row['oem_part_number'] }) # Search in aftermarket parts cursor.execute(""" SELECT p.id, p.oem_part_number, p.name, p.name_es, ap.part_number FROM aftermarket_parts ap JOIN parts p ON ap.oem_part_id = p.id WHERE ap.part_number LIKE ? """, (search_term,)) for row in cursor.fetchall(): results.append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'match_type': 'aftermarket', 'matched_number': row['part_number'] }) # Search in cross-references cursor.execute(""" SELECT p.id, p.oem_part_number, p.name, p.name_es, pcr.cross_reference_number FROM part_cross_references pcr JOIN parts p ON pcr.part_id = p.id WHERE pcr.cross_reference_number LIKE ? """, (search_term,)) for row in cursor.fetchall(): results.append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'match_type': 'cross_reference', 'matched_number': row['cross_reference_number'] }) conn.close() return jsonify(results) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/aftermarket') def api_aftermarket_parts(): """List aftermarket parts with filters and pagination""" try: manufacturer_id = request.args.get('manufacturer_id', type=int) quality_tier = request.args.get('quality_tier') search = request.args.get('search') page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) per_page = min(per_page, 100) # Max 100 per page offset = (page - 1) * per_page conn = get_db_connection() cursor = conn.cursor() # Build base WHERE clause for both count and data queries where_clause = " WHERE 1=1" params = [] if manufacturer_id: where_clause += " AND ap.manufacturer_id = ?" 
params.append(manufacturer_id) if quality_tier: where_clause += " AND ap.quality_tier = ?" params.append(quality_tier) if search: where_clause += " AND (ap.name LIKE ? OR ap.part_number LIKE ? OR p.oem_part_number LIKE ?)" search_term = f"%{search}%" params.extend([search_term, search_term, search_term]) # Get total count count_query = """ SELECT COUNT(*) as total FROM aftermarket_parts ap JOIN parts p ON ap.oem_part_id = p.id JOIN manufacturers m ON ap.manufacturer_id = m.id """ + where_clause cursor.execute(count_query, params) total_count = cursor.fetchone()['total'] # Get paginated data data_query = """ SELECT ap.id, ap.part_number, ap.name, p.oem_part_number, m.name AS manufacturer_name, ap.quality_tier, ap.price_usd FROM aftermarket_parts ap JOIN parts p ON ap.oem_part_id = p.id JOIN manufacturers m ON ap.manufacturer_id = m.id """ + where_clause + " ORDER BY ap.name LIMIT ? OFFSET ?" params.extend([per_page, offset]) cursor.execute(data_query, params) parts = [] for row in cursor.fetchall(): parts.append({ 'id': row['id'], 'part_number': row['part_number'], 'name': row['name'], 'oem_part_number': row['oem_part_number'], 'manufacturer_name': row['manufacturer_name'], 'quality_tier': row['quality_tier'], 'price_usd': row['price_usd'] }) conn.close() total_pages = (total_count + per_page - 1) // per_page return jsonify({ 'data': parts, 'pagination': { 'page': page, 'per_page': per_page, 'total': total_count, 'total_pages': total_pages } }) except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================ # FASE 3: Exploded Diagrams API Endpoints # ============================================================================ @app.route('/api/diagrams') def api_diagrams(): """Get all diagrams, optionally filtered by group_id""" try: group_id = request.args.get('group_id', type=int) conn = get_db_connection() cursor = conn.cursor() query = """ SELECT d.id, d.name, d.name_es, d.group_id, 
pg.name AS group_name, d.thumbnail_path, d.display_order FROM diagrams d JOIN part_groups pg ON d.group_id = pg.id WHERE 1=1 """ params = [] if group_id: query += " AND d.group_id = ?" params.append(group_id) query += " ORDER BY d.display_order, d.name" cursor.execute(query, params) diagrams = [] for row in cursor.fetchall(): diagrams.append({ 'id': row['id'], 'name': row['name'], 'name_es': row['name_es'], 'group_id': row['group_id'], 'group_name': row['group_name'], 'thumbnail_path': row['thumbnail_path'], 'display_order': row['display_order'] }) conn.close() return jsonify(diagrams) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/diagrams/') def api_diagram_detail(diagram_id): """Get diagram details including SVG content and hotspots""" try: conn = get_db_connection() cursor = conn.cursor() # Get diagram details cursor.execute(""" SELECT d.id, d.name, d.name_es, d.group_id, pg.name AS group_name, d.image_path, d.svg_content, d.width, d.height FROM diagrams d JOIN part_groups pg ON d.group_id = pg.id WHERE d.id = ? """, (diagram_id,)) row = cursor.fetchone() if row is None: conn.close() return jsonify({'error': 'Diagram not found'}), 404 diagram = { 'id': row['id'], 'name': row['name'], 'name_es': row['name_es'], 'group_id': row['group_id'], 'group_name': row['group_name'], 'image_path': row['image_path'], 'svg_content': row['svg_content'], 'width': row['width'], 'height': row['height'], 'hotspots': [] } # Get hotspots with part info cursor.execute(""" SELECT h.id, h.part_id, h.callout_number, h.label, h.shape, h.coords, h.color, p.name AS part_name, p.oem_part_number AS part_number FROM diagram_hotspots h LEFT JOIN parts p ON h.part_id = p.id WHERE h.diagram_id = ? 
ORDER BY h.callout_number """, (diagram_id,)) for hotspot_row in cursor.fetchall(): diagram['hotspots'].append({ 'id': hotspot_row['id'], 'part_id': hotspot_row['part_id'], 'callout_number': hotspot_row['callout_number'], 'label': hotspot_row['label'], 'shape': hotspot_row['shape'], 'coords': hotspot_row['coords'], 'color': hotspot_row['color'], 'part_name': hotspot_row['part_name'], 'part_number': hotspot_row['part_number'] }) conn.close() return jsonify(diagram) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/groups//diagrams') def api_group_diagrams(group_id): """Get all diagrams for a specific part group""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT id, name, name_es, thumbnail_path, display_order FROM diagrams WHERE group_id = ? ORDER BY display_order, name """, (group_id,)) diagrams = [] for row in cursor.fetchall(): diagrams.append({ 'id': row['id'], 'name': row['name'], 'name_es': row['name_es'], 'thumbnail_path': row['thumbnail_path'], 'display_order': row['display_order'] }) conn.close() return jsonify(diagrams) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/vehicles//diagrams') def api_vehicle_diagrams(mye_id): """Get diagrams available for a specific vehicle configuration""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT DISTINCT d.id, d.name, d.name_es, d.group_id, pg.name AS group_name, pc.name AS category_name, d.thumbnail_path, vd.notes FROM vehicle_diagrams vd JOIN diagrams d ON vd.diagram_id = d.id JOIN part_groups pg ON d.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE vd.model_year_engine_id = ? 
ORDER BY pc.display_order, pg.display_order, d.display_order """, (mye_id,)) diagrams = [] for row in cursor.fetchall(): diagrams.append({ 'id': row['id'], 'name': row['name'], 'name_es': row['name_es'], 'group_id': row['group_id'], 'group_name': row['group_name'], 'category_name': row['category_name'], 'thumbnail_path': row['thumbnail_path'], 'notes': row['notes'] }) conn.close() return jsonify(diagrams) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/hotspots/') def api_hotspot_detail(hotspot_id): """Get hotspot details including linked part info""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT h.id, h.diagram_id, h.part_id, h.callout_number, h.label, h.shape, h.coords, h.color FROM diagram_hotspots h WHERE h.id = ? """, (hotspot_id,)) row = cursor.fetchone() if row is None: conn.close() return jsonify({'error': 'Hotspot not found'}), 404 hotspot = { 'id': row['id'], 'diagram_id': row['diagram_id'], 'part_id': row['part_id'], 'callout_number': row['callout_number'], 'label': row['label'], 'shape': row['shape'], 'coords': row['coords'], 'color': row['color'], 'part': None } # Get linked part info if part_id exists if row['part_id']: cursor.execute(""" SELECT p.id, p.oem_part_number, p.name, p.name_es, pg.name AS group_name, pc.name AS category_name FROM parts p JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE p.id = ? 
""", (row['part_id'],)) part_row = cursor.fetchone() if part_row: hotspot['part'] = { 'id': part_row['id'], 'oem_part_number': part_row['oem_part_number'], 'name': part_row['name'], 'name_es': part_row['name_es'], 'group_name': part_row['group_name'], 'category_name': part_row['category_name'] } conn.close() return jsonify(hotspot) except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================ # FASE 4: Full-Text Search and VIN Decoder API Endpoints # ============================================================================ import urllib.request import json as json_module import re from datetime import datetime, timedelta def validate_vin(vin): """Validate VIN format: 17 alphanumeric characters, no I, O, Q""" if not vin or len(vin) != 17: return False # VIN can only contain alphanumeric characters except I, O, Q valid_pattern = re.compile(r'^[A-HJ-NPR-Z0-9]{17}$', re.IGNORECASE) return bool(valid_pattern.match(vin)) @app.route('/api/search') def api_search(): """Unified search across parts, cross-references, and aftermarket""" try: q = request.args.get('q', '').strip() search_type = request.args.get('type', 'all') limit = request.args.get('limit', 50, type=int) offset = request.args.get('offset', 0, type=int) if not q: return jsonify({'error': 'Search query is required'}), 400 conn = get_db_connection() cursor = conn.cursor() results = { 'parts': [], 'vehicles': [], 'total_count': 0 } # Search parts if search_type in ('parts', 'all'): search_term = f"%{q}%" # Search in parts table cursor.execute(""" SELECT p.id, p.oem_part_number, p.name, p.name_es, pg.name AS group_name, pc.name AS category_name FROM parts p JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ? ORDER BY p.name LIMIT ? OFFSET ? 
""", (search_term, search_term, search_term, limit, offset)) for row in cursor.fetchall(): results['parts'].append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'group_name': row['group_name'], 'category_name': row['category_name'], 'match_type': 'oem' }) # Also search in aftermarket parts cursor.execute(""" SELECT p.id, p.oem_part_number, p.name, p.name_es, pg.name AS group_name, pc.name AS category_name, ap.part_number AS matched_number FROM aftermarket_parts ap JOIN parts p ON ap.oem_part_id = p.id JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE ap.part_number LIKE ? LIMIT ? OFFSET ? """, (search_term, limit, offset)) for row in cursor.fetchall(): results['parts'].append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'group_name': row['group_name'], 'category_name': row['category_name'], 'matched_number': row['matched_number'], 'match_type': 'aftermarket' }) # Search in cross-references cursor.execute(""" SELECT p.id, p.oem_part_number, p.name, p.name_es, pg.name AS group_name, pc.name AS category_name, pcr.cross_reference_number AS matched_number FROM part_cross_references pcr JOIN parts p ON pcr.part_id = p.id JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE pcr.cross_reference_number LIKE ? LIMIT ? OFFSET ? 
""", (search_term, limit, offset)) for row in cursor.fetchall(): results['parts'].append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'group_name': row['group_name'], 'category_name': row['category_name'], 'matched_number': row['matched_number'], 'match_type': 'cross_reference' }) # Search vehicles if search_type in ('vehicles', 'all'): search_term = f"%{q}%" cursor.execute(""" SELECT mye.id, b.name AS brand, m.name AS model, y.year, e.name AS engine FROM model_year_engine mye JOIN models m ON mye.model_id = m.id JOIN brands b ON m.brand_id = b.id JOIN years y ON mye.year_id = y.id JOIN engines e ON mye.engine_id = e.id WHERE b.name LIKE ? OR m.name LIKE ? OR e.name LIKE ? ORDER BY b.name, m.name, y.year LIMIT ? OFFSET ? """, (search_term, search_term, search_term, limit, offset)) for row in cursor.fetchall(): results['vehicles'].append({ 'id': row['id'], 'brand': row['brand'], 'model': row['model'], 'year': row['year'], 'engine': row['engine'] }) results['total_count'] = len(results['parts']) + len(results['vehicles']) conn.close() return jsonify(results) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/search/parts') def api_search_parts(): """Full-text search in parts catalog with pagination""" try: q = request.args.get('q', '').strip() category_id = request.args.get('category_id', type=int) group_id = request.args.get('group_id', type=int) page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) per_page = min(per_page, 100) # Max 100 per page offset = (page - 1) * per_page if not q: return jsonify({'error': 'Search query is required'}), 400 conn = get_db_connection() cursor = conn.cursor() # Check if FTS5 table exists cursor.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name='parts_fts' """) fts_exists = cursor.fetchone() is not None parts = [] total_count = 0 if fts_exists: # Use FTS5 for full-text search 
# Escape special FTS5 characters and prepare search term fts_query = q.replace('"', '""') # Build filter conditions filter_clause = "" filter_params = [] if category_id: filter_clause += " AND pg.category_id = ?" filter_params.append(category_id) if group_id: filter_clause += " AND p.group_id = ?" filter_params.append(group_id) # Get total count for FTS search count_query = """ SELECT COUNT(*) as total FROM parts_fts JOIN parts p ON parts_fts.rowid = p.id JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE parts_fts MATCH ? """ + filter_clause cursor.execute(count_query, [fts_query] + filter_params) total_count = cursor.fetchone()['total'] # Get paginated data data_query = """ SELECT p.id, p.oem_part_number, p.name, p.name_es, p.description, pg.name AS group_name, pc.name AS category_name, bm25(parts_fts) AS rank FROM parts_fts JOIN parts p ON parts_fts.rowid = p.id JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE parts_fts MATCH ? """ + filter_clause + " ORDER BY rank LIMIT ? OFFSET ?" cursor.execute(data_query, [fts_query] + filter_params + [per_page, offset]) for row in cursor.fetchall(): parts.append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'description': row['description'], 'group_name': row['group_name'], 'category_name': row['category_name'], 'rank': row['rank'] }) else: # Fallback to LIKE search if FTS5 table doesn't exist search_term = f"%{q}%" # Build filter conditions filter_clause = "" filter_params = [] if category_id: filter_clause += " AND pg.category_id = ?" filter_params.append(category_id) if group_id: filter_clause += " AND p.group_id = ?" filter_params.append(group_id) # Get total count for LIKE search count_query = """ SELECT COUNT(*) as total FROM parts p JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE (p.name LIKE ? 
OR p.name_es LIKE ? OR p.oem_part_number LIKE ? OR p.description LIKE ?) """ + filter_clause cursor.execute(count_query, [search_term, search_term, search_term, search_term] + filter_params) total_count = cursor.fetchone()['total'] # Get paginated data data_query = """ SELECT p.id, p.oem_part_number, p.name, p.name_es, p.description, pg.name AS group_name, pc.name AS category_name FROM parts p JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE (p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ? OR p.description LIKE ?) """ + filter_clause + " ORDER BY p.name LIMIT ? OFFSET ?" cursor.execute(data_query, [search_term, search_term, search_term, search_term] + filter_params + [per_page, offset]) for row in cursor.fetchall(): parts.append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'description': row['description'], 'group_name': row['group_name'], 'category_name': row['category_name'], 'rank': 0 }) conn.close() total_pages = (total_count + per_page - 1) // per_page return jsonify({ 'data': parts, 'pagination': { 'page': page, 'per_page': per_page, 'total': total_count, 'total_pages': total_pages } }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/vin/decode/') def api_vin_decode(vin): """Decode a VIN using NHTSA API with caching""" try: vin = vin.upper().strip() # Validate VIN format if not validate_vin(vin): return jsonify({ 'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).' 
}), 400 conn = get_db_connection() cursor = conn.cursor() # Check if vin_cache table exists cursor.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name='vin_cache' """) cache_exists = cursor.fetchone() is not None cached_data = None if cache_exists: # Check for cached VIN data that hasn't expired cursor.execute(""" SELECT vin, make, model, year, engine_info, body_class, drive_type, model_year_engine_id, created_at, expires_at FROM vin_cache WHERE vin = ? AND expires_at > datetime('now') """, (vin,)) cached_row = cursor.fetchone() if cached_row: # Parse engine_info JSON if it exists engine_info_data = {} if cached_row['engine_info']: try: engine_info_data = json_module.loads(cached_row['engine_info']) except: engine_info_data = {'raw': cached_row['engine_info']} cached_data = { 'vin': cached_row['vin'], 'make': cached_row['make'], 'model': cached_row['model'], 'year': cached_row['year'], 'engine_info': engine_info_data, 'body_class': cached_row['body_class'], 'drive_type': cached_row['drive_type'], 'matched_vehicle': None, 'cached': True } # Get matched vehicle info if available if cached_row['model_year_engine_id']: cursor.execute(""" SELECT mye.id, b.name AS brand, m.name AS model, y.year, e.name AS engine FROM model_year_engine mye JOIN models m ON mye.model_id = m.id JOIN brands b ON m.brand_id = b.id JOIN years y ON mye.year_id = y.id JOIN engines e ON mye.engine_id = e.id WHERE mye.id = ? 
""", (cached_row['model_year_engine_id'],)) mye_row = cursor.fetchone() if mye_row: cached_data['matched_vehicle'] = { 'mye_id': mye_row['id'], 'brand': mye_row['brand'], 'model': mye_row['model'], 'year': mye_row['year'], 'engine': mye_row['engine'] } conn.close() return jsonify(cached_data) # Call NHTSA API nhtsa_url = f'https://vpic.nhtsa.dot.gov/api/vehicles/DecodeVin/{vin}?format=json' try: req = urllib.request.Request(nhtsa_url, headers={'User-Agent': 'AutopartesDB/1.0'}) with urllib.request.urlopen(req, timeout=10) as response: nhtsa_data = json_module.loads(response.read().decode('utf-8')) except urllib.error.URLError as e: conn.close() return jsonify({'error': f'Failed to connect to NHTSA API: {str(e)}'}), 503 except urllib.error.HTTPError as e: conn.close() return jsonify({'error': f'NHTSA API error: {e.code}'}), 502 except Exception as e: conn.close() return jsonify({'error': f'Error calling NHTSA API: {str(e)}'}), 500 # Parse NHTSA response results = {item['Variable']: item['Value'] for item in nhtsa_data.get('Results', [])} # Extract relevant fields make = results.get('Make', '') model = results.get('Model', '') year_str = results.get('ModelYear', '') year = int(year_str) if year_str and year_str.isdigit() else None engine_config = results.get('EngineConfiguration', '') cylinders_str = results.get('EngineCylinders', '') cylinders = int(cylinders_str) if cylinders_str and cylinders_str.isdigit() else None displacement_str = results.get('DisplacementL', '') displacement_l = float(displacement_str) if displacement_str else None fuel_type = results.get('FuelTypePrimary', '') body_class = results.get('BodyClass', '') drive_type = results.get('DriveType', '') # Try to match to model_year_engine record matched_mye_id = None matched_vehicle = None if make and model and year: cursor.execute(""" SELECT mye.id, b.name AS brand, m.name AS model, y.year, e.name AS engine FROM model_year_engine mye JOIN models m ON mye.model_id = m.id JOIN brands b ON m.brand_id = 
b.id JOIN years y ON mye.year_id = y.id JOIN engines e ON mye.engine_id = e.id WHERE UPPER(b.name) = UPPER(?) AND UPPER(m.name) = UPPER(?) AND y.year = ? LIMIT 1 """, (make, model, year)) mye_row = cursor.fetchone() if mye_row: matched_mye_id = mye_row['id'] matched_vehicle = { 'mye_id': mye_row['id'], 'brand': mye_row['brand'], 'model': mye_row['model'], 'year': mye_row['year'], 'engine': mye_row['engine'] } # Store in cache with 30-day expiry if cache_exists: expires_at = (datetime.now() + timedelta(days=30)).strftime('%Y-%m-%d %H:%M:%S') # Combine engine info into JSON engine_info = json_module.dumps({ 'configuration': engine_config, 'cylinders': cylinders, 'displacement_l': displacement_l, 'fuel_type': fuel_type }) cursor.execute(""" INSERT OR REPLACE INTO vin_cache (vin, decoded_data, make, model, year, engine_info, body_class, drive_type, model_year_engine_id, expires_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (vin, json_module.dumps(results), make, model, year, engine_info, body_class, drive_type, matched_mye_id, expires_at)) conn.commit() result = { 'vin': vin, 'make': make, 'model': model, 'year': year, 'engine_info': { 'configuration': engine_config, 'cylinders': cylinders, 'displacement_l': displacement_l, 'fuel_type': fuel_type }, 'body_class': body_class, 'drive_type': drive_type, 'matched_vehicle': matched_vehicle, 'cached': False } conn.close() return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/vin//parts') def api_vin_parts(vin): """Get parts for a decoded VIN""" try: vin = vin.upper().strip() if not validate_vin(vin): return jsonify({ 'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).' 
}), 400 category_id = request.args.get('category_id', type=int) conn = get_db_connection() cursor = conn.cursor() # Check if vin_cache table exists cursor.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name='vin_cache' """) cache_exists = cursor.fetchone() is not None if not cache_exists: conn.close() return jsonify({ 'error': 'VIN cache not available. Please decode the VIN first.' }), 400 # Look up VIN in cache cursor.execute(""" SELECT vin, make, model, year, model_year_engine_id FROM vin_cache WHERE vin = ? """, (vin,)) cached_row = cursor.fetchone() if not cached_row: conn.close() return jsonify({ 'error': 'VIN not found in cache. Please decode the VIN first using /api/vin/decode/' }), 404 mye_id = cached_row['model_year_engine_id'] vehicle_info = { 'vin': cached_row['vin'], 'make': cached_row['make'], 'model': cached_row['model'], 'year': cached_row['year'], 'mye_id': mye_id } if not mye_id: conn.close() return jsonify({ 'vin': vin, 'vehicle_info': vehicle_info, 'categories': [], 'message': 'No matching vehicle configuration found in database. Use /api/vin//match to manually link.' }) # Get parts for this vehicle grouped by category query = """ SELECT pc.id AS category_id, pc.name AS category_name, pc.name_es AS category_name_es, p.id AS part_id, p.oem_part_number, p.name AS part_name, p.name_es AS part_name_es, pg.name AS group_name, vp.quantity_required, vp.position FROM vehicle_parts vp JOIN parts p ON vp.part_id = p.id JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE vp.model_year_engine_id = ? """ params = [mye_id] if category_id: query += " AND pc.id = ?" 
params.append(category_id) query += " ORDER BY pc.display_order, pg.display_order, p.name" cursor.execute(query, params) # Group parts by category categories_dict = {} for row in cursor.fetchall(): cat_id = row['category_id'] if cat_id not in categories_dict: categories_dict[cat_id] = { 'id': cat_id, 'name': row['category_name'], 'name_es': row['category_name_es'], 'parts': [] } categories_dict[cat_id]['parts'].append({ 'id': row['part_id'], 'oem_part_number': row['oem_part_number'], 'name': row['part_name'], 'name_es': row['part_name_es'], 'group_name': row['group_name'], 'quantity_required': row['quantity_required'], 'position': row['position'] }) conn.close() return jsonify({ 'vin': vin, 'vehicle_info': vehicle_info, 'categories': list(categories_dict.values()) }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/vin//match') def api_vin_match(vin): """Manually match a VIN to a vehicle configuration""" try: vin = vin.upper().strip() if not validate_vin(vin): return jsonify({ 'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).' }), 400 mye_id = request.args.get('mye_id', type=int) if not mye_id: return jsonify({'error': 'mye_id parameter is required'}), 400 conn = get_db_connection() cursor = conn.cursor() # Check if vin_cache table exists cursor.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name='vin_cache' """) cache_exists = cursor.fetchone() is not None if not cache_exists: conn.close() return jsonify({ 'error': 'VIN cache table not available.' }), 400 # Verify the mye_id exists cursor.execute(""" SELECT id FROM model_year_engine WHERE id = ? """, (mye_id,)) if not cursor.fetchone(): conn.close() return jsonify({'error': f'model_year_engine_id {mye_id} not found'}), 404 # Check if VIN exists in cache cursor.execute(""" SELECT vin FROM vin_cache WHERE vin = ? 
""", (vin,)) vin_exists = cursor.fetchone() is not None if vin_exists: # Update existing cache entry cursor.execute(""" UPDATE vin_cache SET model_year_engine_id = ? WHERE vin = ? """, (mye_id, vin)) else: # Create new cache entry with minimal info expires_at = (datetime.now() + timedelta(days=30)).strftime('%Y-%m-%d %H:%M:%S') cursor.execute(""" INSERT INTO vin_cache (vin, model_year_engine_id, cached_at, expires_at) VALUES (?, ?, datetime('now'), ?) """, (vin, mye_id, expires_at)) conn.commit() conn.close() return jsonify({ 'success': True, 'vin': vin, 'mye_id': mye_id }) except Exception as e: return jsonify({'error': str(e)}), 500 if __name__ == '__main__': # Check if database exists if not os.path.exists(DATABASE_PATH): print(f"Database not found at {DATABASE_PATH}") print("Please make sure the vehicle database is created first.") exit(1) print("Starting Vehicle Dashboard Server...") print("Visit http://localhost:5000 to access the dashboard locally") print("Visit http://192.168.10.198:5000 to access the dashboard from other computers on the network") app.run(debug=True, host='0.0.0.0', port=5000)