from flask import Flask, render_template, jsonify, request, send_from_directory import sqlite3 import os app = Flask(__name__, static_folder='.') # Database path - adjust as needed DATABASE_PATH = '../vehicle_database/vehicle_database.db' def get_db_connection(): """Get a connection to the vehicle database""" conn = sqlite3.connect(DATABASE_PATH) conn.row_factory = sqlite3.Row # This enables column access by name return conn def get_all_brands(): """Get all unique brands that have vehicles with parts""" conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT DISTINCT b.name FROM brands b JOIN models m ON m.brand_id = b.id JOIN model_year_engine mye ON mye.model_id = m.id JOIN vehicle_parts vp ON vp.model_year_engine_id = mye.id ORDER BY b.name """) brands = [row['name'] for row in cursor.fetchall()] conn.close() return brands def get_all_years(): """Get all unique years from the database""" conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT DISTINCT year FROM years ORDER BY year DESC") years = [row['year'] for row in cursor.fetchall()] conn.close() return years def get_all_engines(): """Get all unique engines from the database""" conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT DISTINCT name FROM engines ORDER BY name") engines = [row['name'] for row in cursor.fetchall()] conn.close() return engines def get_models_by_brand(brand_name=None): """Get all models that have vehicles with parts, optionally filtered by brand""" conn = get_db_connection() cursor = conn.cursor() if brand_name: cursor.execute(""" SELECT DISTINCT m.name FROM models m JOIN brands b ON m.brand_id = b.id JOIN model_year_engine mye ON mye.model_id = m.id JOIN vehicle_parts vp ON vp.model_year_engine_id = mye.id WHERE UPPER(b.name) = UPPER(?) ORDER BY m.name """, (brand_name,)) else: cursor.execute(""" SELECT DISTINCT m.name FROM models m JOIN model_year_engine mye ON mye.model_id = m.id JOIN vehicle_parts vp ON vp.model_year_engine_id = mye.id ORDER BY m.name """) models = [row['name'] for row in cursor.fetchall()] conn.close() return models def search_vehicles(brand=None, model=None, year=None, engine=None, with_parts=True): """Search for vehicles based on filters. By default only returns vehicles with parts.""" conn = get_db_connection() cursor = conn.cursor() query = """ SELECT b.name AS brand, m.name AS model, y.year, e.name AS engine, e.power_hp, e.torque_nm, e.displacement_cc, e.cylinders, e.fuel_type, mye.trim_level, mye.drivetrain, mye.transmission FROM model_year_engine mye JOIN models m ON mye.model_id = m.id JOIN brands b ON m.brand_id = b.id JOIN years y ON mye.year_id = y.id JOIN engines e ON mye.engine_id = e.id """ # Only show vehicles that have parts if with_parts: query += " WHERE EXISTS (SELECT 1 FROM vehicle_parts vp WHERE vp.model_year_engine_id = mye.id)" else: query += " WHERE 1=1" params = [] if brand: query += " AND UPPER(b.name) = UPPER(?)" params.append(brand) if model: query += " AND UPPER(m.name) = UPPER(?)" params.append(model) if year: query += " AND y.year = ?" params.append(int(year)) if engine: query += " AND e.name = ?" params.append(engine) query += " ORDER BY b.name, m.name, y.year" cursor.execute(query, params) results = cursor.fetchall() conn.close() # Convert to list of dictionaries vehicles = [] for row in results: vehicle = { 'brand': row['brand'], 'model': row['model'], 'year': row['year'], 'engine': row['engine'], 'power_hp': row['power_hp'] or 0, 'torque_nm': row['torque_nm'] or 0, 'displacement_cc': row['displacement_cc'] or 0, 'cylinders': row['cylinders'] or 0, 'fuel_type': row['fuel_type'] or 'unknown', 'trim_level': row['trim_level'] or 'unknown', 'drivetrain': row['drivetrain'] or 'unknown', 'transmission': row['transmission'] or 'unknown' } vehicles.append(vehicle) return vehicles @app.route('/') def index(): """Serve the main dashboard page""" return send_from_directory('.', 'index.html') @app.route('/admin') def admin_page(): """Serve the admin panel""" return send_from_directory('.', 'admin.html') @app.route('/landing') def landing_page(): """Serve the customer landing page""" return send_from_directory('.', 'customer-landing.html') @app.route('/') def static_files(path): """Serve static files""" return send_from_directory('.', path) @app.route('/api/brands') def api_brands(): """API endpoint to get all brands""" brands = get_all_brands() return jsonify(brands) @app.route('/api/years') def api_years(): """API endpoint to get years, optionally filtered by brand and/or model""" brand = request.args.get('brand') model = request.args.get('model') conn = get_db_connection() cursor = conn.cursor() query = """ SELECT DISTINCT y.year FROM years y JOIN model_year_engine mye ON y.id = mye.year_id JOIN models m ON mye.model_id = m.id JOIN brands b ON m.brand_id = b.id WHERE 1=1 """ params = [] if brand: query += " AND UPPER(b.name) = UPPER(?)" params.append(brand) if model: query += " AND UPPER(m.name) = UPPER(?)" params.append(model) query += " ORDER BY y.year DESC" cursor.execute(query, params) results = cursor.fetchall() conn.close() years = [row['year'] for row in results] return jsonify(years) @app.route('/api/engines') def api_engines(): """API endpoint to get engines, optionally filtered by brand, model, and/or year""" brand = request.args.get('brand') model = request.args.get('model') year = request.args.get('year') conn = get_db_connection() cursor = conn.cursor() query = """ SELECT DISTINCT e.name FROM engines e JOIN model_year_engine mye ON e.id = mye.engine_id JOIN models m ON mye.model_id = m.id JOIN brands b ON m.brand_id = b.id JOIN years y ON mye.year_id = y.id WHERE 1=1 """ params = [] if brand: query += " AND UPPER(b.name) = UPPER(?)" params.append(brand) if model: query += " AND UPPER(m.name) = UPPER(?)" params.append(model) if year: query += " AND y.year = ?" params.append(int(year)) query += " ORDER BY e.name" cursor.execute(query, params) results = cursor.fetchall() conn.close() engines = [row['name'] for row in results] return jsonify(engines) @app.route('/api/models') def api_models(): """API endpoint to get models, optionally filtered by brand""" brand = request.args.get('brand') models = get_models_by_brand(brand) return jsonify(models) @app.route('/api/vehicles') def api_vehicles(): """API endpoint to search for vehicles""" brand = request.args.get('brand') model = request.args.get('model') year = request.args.get('year') engine = request.args.get('engine') vehicles = search_vehicles(brand, model, year, engine) return jsonify(vehicles) # ============================================================================ # Parts Catalog API Endpoints # ============================================================================ @app.route('/api/categories') def api_categories(): """API endpoint to get all part categories (hierarchical)""" try: conn = get_db_connection() cursor = conn.cursor() # Get all categories cursor.execute(""" SELECT id, name, name_es, slug, icon_name, display_order, parent_id FROM part_categories ORDER BY display_order, name """) all_categories = cursor.fetchall() conn.close() # Build hierarchical structure categories_dict = {} root_categories = [] # First pass: create all category objects for row in all_categories: category = { 'id': row['id'], 'name': row['name'], 'name_es': row['name_es'], 'slug': row['slug'], 'icon_name': row['icon_name'], 'display_order': row['display_order'], 'children': [] } categories_dict[row['id']] = category if row['parent_id'] is None: root_categories.append(category) # Second pass: build hierarchy for row in all_categories: if row['parent_id'] is not None and row['parent_id'] in categories_dict: categories_dict[row['parent_id']]['children'].append(categories_dict[row['id']]) return jsonify(root_categories) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/categories//groups') def api_category_groups(category_id): """API endpoint to get groups for a specific category""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT id, name, name_es, slug, display_order FROM part_groups WHERE category_id = ? ORDER BY display_order, name """, (category_id,)) groups = [] for row in cursor.fetchall(): groups.append({ 'id': row['id'], 'name': row['name'], 'name_es': row['name_es'], 'slug': row['slug'], 'display_order': row['display_order'] }) conn.close() return jsonify(groups) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/parts') def api_parts(): """API endpoint to list parts with optional filters and pagination""" try: group_id = request.args.get('group_id', type=int) category_id = request.args.get('category_id', type=int) search = request.args.get('search') page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) per_page = min(per_page, 100) # Max 100 per page offset = (page - 1) * per_page conn = get_db_connection() cursor = conn.cursor() # Build base WHERE clause for both count and data queries where_clause = " WHERE 1=1" params = [] if group_id: where_clause += " AND p.group_id = ?" params.append(group_id) if category_id: where_clause += " AND pg.category_id = ?" params.append(category_id) if search: where_clause += " AND (p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ?)" search_term = f"%{search}%" params.extend([search_term, search_term, search_term]) # Get total count count_query = """ SELECT COUNT(*) as total FROM parts p JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id """ + where_clause cursor.execute(count_query, params) total_count = cursor.fetchone()['total'] # Get paginated data data_query = """ SELECT p.id, p.oem_part_number, p.name, p.name_es, p.group_id, p.image_url, pg.name AS group_name, pc.name AS category_name FROM parts p JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id """ + where_clause + " ORDER BY p.name LIMIT ? OFFSET ?" params.extend([per_page, offset]) cursor.execute(data_query, params) parts = [] for row in cursor.fetchall(): parts.append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'group_id': row['group_id'], 'group_name': row['group_name'], 'category_name': row['category_name'], 'image_url': row['image_url'] }) conn.close() total_pages = (total_count + per_page - 1) // per_page return jsonify({ 'data': parts, 'pagination': { 'page': page, 'per_page': per_page, 'total': total_count, 'total_pages': total_pages } }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/parts/') def api_part_detail(part_id): """API endpoint to get single part details""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT p.id, p.oem_part_number, p.name, p.name_es, p.description, p.description_es, p.group_id, p.image_url, pg.name AS group_name, pg.name_es AS group_name_es, pc.id AS category_id, pc.name AS category_name, pc.name_es AS category_name_es FROM parts p JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE p.id = ? """, (part_id,)) row = cursor.fetchone() conn.close() if row is None: return jsonify({'error': 'Part not found'}), 404 part = { 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'description': row['description'], 'description_es': row['description_es'], 'group_id': row['group_id'], 'group_name': row['group_name'], 'image_url': row['image_url'], 'group_name_es': row['group_name_es'], 'category_id': row['category_id'], 'category_name': row['category_name'], 'category_name_es': row['category_name_es'] } return jsonify(part) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/vehicles//categories') def api_vehicle_categories(mye_id): """API endpoint to get categories that have parts for a specific vehicle""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT DISTINCT pc.id, pc.name, pc.name_es, pc.slug, pc.icon_name, pc.display_order FROM part_categories pc JOIN part_groups pg ON pg.category_id = pc.id JOIN parts p ON p.group_id = pg.id JOIN vehicle_parts vp ON vp.part_id = p.id WHERE vp.model_year_engine_id = ? ORDER BY pc.display_order, pc.name """, (mye_id,)) categories = [] for row in cursor.fetchall(): categories.append({ 'id': row['id'], 'name': row['name'], 'name_es': row['name_es'], 'slug': row['slug'], 'icon_name': row['icon_name'], 'display_order': row['display_order'] }) conn.close() return jsonify(categories) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/vehicles//groups') def api_vehicle_groups(mye_id): """API endpoint to get groups that have parts for a specific vehicle within a category""" try: category_id = request.args.get('category_id', type=int) conn = get_db_connection() cursor = conn.cursor() query = """ SELECT DISTINCT pg.id, pg.name, pg.name_es, pg.slug, pg.display_order, COUNT(DISTINCT p.id) as parts_count FROM part_groups pg JOIN parts p ON p.group_id = pg.id JOIN vehicle_parts vp ON vp.part_id = p.id WHERE vp.model_year_engine_id = ? """ params = [mye_id] if category_id: query += " AND pg.category_id = ?" params.append(category_id) query += " GROUP BY pg.id ORDER BY pg.display_order, pg.name" cursor.execute(query, params) groups = [] for row in cursor.fetchall(): groups.append({ 'id': row['id'], 'name': row['name'], 'name_es': row['name_es'], 'slug': row['slug'], 'display_order': row['display_order'], 'parts_count': row['parts_count'] }) conn.close() return jsonify(groups) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/vehicles//parts') def api_vehicle_parts(mye_id): """API endpoint to get parts for a specific vehicle""" try: category_id = request.args.get('category_id', type=int) group_id = request.args.get('group_id', type=int) conn = get_db_connection() cursor = conn.cursor() query = """ SELECT p.id, p.oem_part_number, p.name, p.name_es, vp.quantity_required, vp.position, pc.name AS category_name, pg.name AS group_name FROM vehicle_parts vp JOIN parts p ON vp.part_id = p.id JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE vp.model_year_engine_id = ? """ params = [mye_id] if category_id: query += " AND pc.id = ?" params.append(category_id) if group_id: query += " AND pg.id = ?" params.append(group_id) query += " ORDER BY pc.display_order, pg.display_order, p.name" cursor.execute(query, params) parts = [] for row in cursor.fetchall(): parts.append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'quantity_required': row['quantity_required'], 'position': row['position'], 'category_name': row['category_name'], 'group_name': row['group_name'] }) conn.close() return jsonify(parts) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/model-year-engine') def api_model_year_engine(): """API endpoint to get model_year_engine records with filters""" try: brand = request.args.get('brand') model = request.args.get('model') year = request.args.get('year', type=int) with_parts = request.args.get('with_parts', 'true').lower() == 'true' conn = get_db_connection() cursor = conn.cursor() query = """ SELECT mye.id, b.name AS brand, m.name AS model, y.year, e.name AS engine, mye.trim_level, mye.drivetrain, mye.transmission FROM model_year_engine mye JOIN models m ON mye.model_id = m.id JOIN brands b ON m.brand_id = b.id JOIN years y ON mye.year_id = y.id JOIN engines e ON mye.engine_id = e.id """ # Only show vehicles that have parts if with_parts: query += " WHERE EXISTS (SELECT 1 FROM vehicle_parts vp WHERE vp.model_year_engine_id = mye.id)" else: query += " WHERE 1=1" params = [] if brand: query += " AND UPPER(b.name) = UPPER(?)" params.append(brand) if model: query += " AND UPPER(m.name) = UPPER(?)" params.append(model) if year: query += " AND y.year = ?" params.append(year) query += " ORDER BY b.name, m.name, y.year, e.name" cursor.execute(query, params) records = [] for row in cursor.fetchall(): records.append({ 'id': row['id'], 'brand': row['brand'], 'model': row['model'], 'year': row['year'], 'engine': row['engine'], 'trim_level': row['trim_level'], 'drivetrain': row['drivetrain'], 'transmission': row['transmission'] }) conn.close() return jsonify(records) except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================ # FASE 2: Cross-References and Aftermarket API Endpoints # ============================================================================ @app.route('/api/manufacturers') def api_manufacturers(): """Get all manufacturers, optionally filtered by type""" try: manufacturer_type = request.args.get('type') quality_tier = request.args.get('quality_tier') conn = get_db_connection() cursor = conn.cursor() query = """ SELECT id, name, type, quality_tier, country, logo_url, website FROM manufacturers WHERE 1=1 """ params = [] if manufacturer_type: query += " AND type = ?" params.append(manufacturer_type) if quality_tier: query += " AND quality_tier = ?" params.append(quality_tier) query += " ORDER BY name" cursor.execute(query, params) manufacturers = [] for row in cursor.fetchall(): manufacturers.append({ 'id': row['id'], 'name': row['name'], 'type': row['type'], 'quality_tier': row['quality_tier'], 'country': row['country'], 'logo_url': row['logo_url'], 'website': row['website'] }) conn.close() return jsonify(manufacturers) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/parts//alternatives') def api_part_alternatives(part_id): """Get aftermarket alternatives for an OEM part""" try: quality_tier = request.args.get('quality_tier') manufacturer_id = request.args.get('manufacturer_id', type=int) conn = get_db_connection() cursor = conn.cursor() query = """ SELECT ap.id, ap.part_number, ap.name, ap.name_es, m.name AS manufacturer_name, ap.manufacturer_id, ap.quality_tier, ap.price_usd, ap.warranty_months, ap.in_stock FROM aftermarket_parts ap JOIN manufacturers m ON ap.manufacturer_id = m.id WHERE ap.oem_part_id = ? """ params = [part_id] if quality_tier: query += " AND ap.quality_tier = ?" params.append(quality_tier) if manufacturer_id: query += " AND ap.manufacturer_id = ?" params.append(manufacturer_id) query += " ORDER BY ap.quality_tier DESC, ap.price_usd ASC" cursor.execute(query, params) alternatives = [] for row in cursor.fetchall(): alternatives.append({ 'id': row['id'], 'part_number': row['part_number'], 'name': row['name'], 'name_es': row['name_es'], 'manufacturer_name': row['manufacturer_name'], 'manufacturer_id': row['manufacturer_id'], 'quality_tier': row['quality_tier'], 'price_usd': row['price_usd'], 'warranty_months': row['warranty_months'], 'in_stock': bool(row['in_stock']) if row['in_stock'] is not None else None }) conn.close() return jsonify(alternatives) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/parts//cross-references') def api_part_cross_references(part_id): """Get cross-reference numbers for a part""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT id, cross_reference_number, reference_type, source, notes FROM part_cross_references WHERE part_id = ? ORDER BY reference_type, cross_reference_number """, (part_id,)) cross_references = [] for row in cursor.fetchall(): cross_references.append({ 'id': row['id'], 'cross_reference_number': row['cross_reference_number'], 'reference_type': row['reference_type'], 'source': row['source'], 'notes': row['notes'] }) conn.close() return jsonify(cross_references) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/search/part-number/') def api_search_part_number(part_number): """Search for parts by any part number (OEM, aftermarket, or cross-ref)""" try: conn = get_db_connection() cursor = conn.cursor() results = [] search_term = f"%{part_number}%" # Search in OEM parts cursor.execute(""" SELECT id, oem_part_number, name, name_es FROM parts WHERE oem_part_number LIKE ? """, (search_term,)) for row in cursor.fetchall(): results.append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'match_type': 'oem', 'matched_number': row['oem_part_number'] }) # Search in aftermarket parts cursor.execute(""" SELECT p.id, p.oem_part_number, p.name, p.name_es, ap.part_number FROM aftermarket_parts ap JOIN parts p ON ap.oem_part_id = p.id WHERE ap.part_number LIKE ? """, (search_term,)) for row in cursor.fetchall(): results.append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'match_type': 'aftermarket', 'matched_number': row['part_number'] }) # Search in cross-references cursor.execute(""" SELECT p.id, p.oem_part_number, p.name, p.name_es, pcr.cross_reference_number FROM part_cross_references pcr JOIN parts p ON pcr.part_id = p.id WHERE pcr.cross_reference_number LIKE ? """, (search_term,)) for row in cursor.fetchall(): results.append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'match_type': 'cross_reference', 'matched_number': row['cross_reference_number'] }) conn.close() return jsonify(results) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/aftermarket') def api_aftermarket_parts(): """List aftermarket parts with filters and pagination""" try: manufacturer_id = request.args.get('manufacturer_id', type=int) quality_tier = request.args.get('quality_tier') search = request.args.get('search') page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) per_page = min(per_page, 100) # Max 100 per page offset = (page - 1) * per_page conn = get_db_connection() cursor = conn.cursor() # Build base WHERE clause for both count and data queries where_clause = " WHERE 1=1" params = [] if manufacturer_id: where_clause += " AND ap.manufacturer_id = ?" params.append(manufacturer_id) if quality_tier: where_clause += " AND ap.quality_tier = ?" params.append(quality_tier) if search: where_clause += " AND (ap.name LIKE ? OR ap.part_number LIKE ? OR p.oem_part_number LIKE ?)" search_term = f"%{search}%" params.extend([search_term, search_term, search_term]) # Get total count count_query = """ SELECT COUNT(*) as total FROM aftermarket_parts ap JOIN parts p ON ap.oem_part_id = p.id JOIN manufacturers m ON ap.manufacturer_id = m.id """ + where_clause cursor.execute(count_query, params) total_count = cursor.fetchone()['total'] # Get paginated data data_query = """ SELECT ap.id, ap.part_number, ap.name, p.oem_part_number, m.name AS manufacturer_name, ap.quality_tier, ap.price_usd FROM aftermarket_parts ap JOIN parts p ON ap.oem_part_id = p.id JOIN manufacturers m ON ap.manufacturer_id = m.id """ + where_clause + " ORDER BY ap.name LIMIT ? OFFSET ?" params.extend([per_page, offset]) cursor.execute(data_query, params) parts = [] for row in cursor.fetchall(): parts.append({ 'id': row['id'], 'part_number': row['part_number'], 'name': row['name'], 'oem_part_number': row['oem_part_number'], 'manufacturer_name': row['manufacturer_name'], 'quality_tier': row['quality_tier'], 'price_usd': row['price_usd'] }) conn.close() total_pages = (total_count + per_page - 1) // per_page return jsonify({ 'data': parts, 'pagination': { 'page': page, 'per_page': per_page, 'total': total_count, 'total_pages': total_pages } }) except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================ # FASE 3: Exploded Diagrams API Endpoints # ============================================================================ @app.route('/api/diagrams') def api_diagrams(): """Get all diagrams, optionally filtered by group_id""" try: group_id = request.args.get('group_id', type=int) conn = get_db_connection() cursor = conn.cursor() query = """ SELECT d.id, d.name, d.name_es, d.group_id, pg.name AS group_name, d.thumbnail_path, d.display_order FROM diagrams d JOIN part_groups pg ON d.group_id = pg.id WHERE 1=1 """ params = [] if group_id: query += " AND d.group_id = ?" params.append(group_id) query += " ORDER BY d.display_order, d.name" cursor.execute(query, params) diagrams = [] for row in cursor.fetchall(): diagrams.append({ 'id': row['id'], 'name': row['name'], 'name_es': row['name_es'], 'group_id': row['group_id'], 'group_name': row['group_name'], 'thumbnail_path': row['thumbnail_path'], 'display_order': row['display_order'] }) conn.close() return jsonify(diagrams) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/diagrams/') def api_diagram_detail(diagram_id): """Get diagram details including SVG content and hotspots""" try: conn = get_db_connection() cursor = conn.cursor() # Get diagram details cursor.execute(""" SELECT d.id, d.name, d.name_es, d.group_id, pg.name AS group_name, d.image_path, d.svg_content, d.width, d.height FROM diagrams d JOIN part_groups pg ON d.group_id = pg.id WHERE d.id = ? """, (diagram_id,)) row = cursor.fetchone() if row is None: conn.close() return jsonify({'error': 'Diagram not found'}), 404 diagram = { 'id': row['id'], 'name': row['name'], 'name_es': row['name_es'], 'group_id': row['group_id'], 'group_name': row['group_name'], 'image_path': row['image_path'], 'svg_content': row['svg_content'], 'width': row['width'], 'height': row['height'], 'hotspots': [] } # Get hotspots with part info cursor.execute(""" SELECT h.id, h.part_id, h.callout_number, h.label, h.shape, h.coords, h.color, p.name AS part_name, p.oem_part_number AS part_number FROM diagram_hotspots h LEFT JOIN parts p ON h.part_id = p.id WHERE h.diagram_id = ? ORDER BY h.callout_number """, (diagram_id,)) for hotspot_row in cursor.fetchall(): diagram['hotspots'].append({ 'id': hotspot_row['id'], 'part_id': hotspot_row['part_id'], 'callout_number': hotspot_row['callout_number'], 'label': hotspot_row['label'], 'shape': hotspot_row['shape'], 'coords': hotspot_row['coords'], 'color': hotspot_row['color'], 'part_name': hotspot_row['part_name'], 'part_number': hotspot_row['part_number'] }) conn.close() return jsonify(diagram) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/diagrams//hotspots') def api_diagram_hotspots(diagram_id): """Get all hotspots for a specific diagram""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT h.id, h.part_id, h.callout_number, h.label, h.shape, h.coords, h.color, p.name AS part_name, p.oem_part_number AS part_number FROM diagram_hotspots h LEFT JOIN parts p ON h.part_id = p.id WHERE h.diagram_id = ? ORDER BY h.callout_number """, (diagram_id,)) hotspots = [] for row in cursor.fetchall(): hotspots.append({ 'id': row['id'], 'part_id': row['part_id'], 'callout_number': row['callout_number'], 'label': row['label'], 'shape': row['shape'], 'coords': row['coords'], 'color': row['color'], 'part_name': row['part_name'], 'part_number': row['part_number'] }) conn.close() return jsonify(hotspots) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/groups//diagrams') def api_group_diagrams(group_id): """Get all diagrams for a specific part group""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT id, name, name_es, thumbnail_path, display_order FROM diagrams WHERE group_id = ? ORDER BY display_order, name """, (group_id,)) diagrams = [] for row in cursor.fetchall(): diagrams.append({ 'id': row['id'], 'name': row['name'], 'name_es': row['name_es'], 'thumbnail_path': row['thumbnail_path'], 'display_order': row['display_order'] }) conn.close() return jsonify(diagrams) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/vehicles//diagrams') def api_vehicle_diagrams(mye_id): """Get diagrams available for a specific vehicle configuration""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT DISTINCT d.id, d.name, d.name_es, d.group_id, pg.name AS group_name, pc.name AS category_name, d.thumbnail_path, vd.notes FROM vehicle_diagrams vd JOIN diagrams d ON vd.diagram_id = d.id JOIN part_groups pg ON d.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE vd.model_year_engine_id = ? ORDER BY pc.display_order, pg.display_order, d.display_order """, (mye_id,)) diagrams = [] for row in cursor.fetchall(): diagrams.append({ 'id': row['id'], 'name': row['name'], 'name_es': row['name_es'], 'group_id': row['group_id'], 'group_name': row['group_name'], 'category_name': row['category_name'], 'thumbnail_path': row['thumbnail_path'], 'notes': row['notes'] }) conn.close() return jsonify(diagrams) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/hotspots/') def api_hotspot_detail(hotspot_id): """Get hotspot details including linked part info""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT h.id, h.diagram_id, h.part_id, h.callout_number, h.label, h.shape, h.coords, h.color FROM diagram_hotspots h WHERE h.id = ? """, (hotspot_id,)) row = cursor.fetchone() if row is None: conn.close() return jsonify({'error': 'Hotspot not found'}), 404 hotspot = { 'id': row['id'], 'diagram_id': row['diagram_id'], 'part_id': row['part_id'], 'callout_number': row['callout_number'], 'label': row['label'], 'shape': row['shape'], 'coords': row['coords'], 'color': row['color'], 'part': None } # Get linked part info if part_id exists if row['part_id']: cursor.execute(""" SELECT p.id, p.oem_part_number, p.name, p.name_es, pg.name AS group_name, pc.name AS category_name FROM parts p JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE p.id = ? """, (row['part_id'],)) part_row = cursor.fetchone() if part_row: hotspot['part'] = { 'id': part_row['id'], 'oem_part_number': part_row['oem_part_number'], 'name': part_row['name'], 'name_es': part_row['name_es'], 'group_name': part_row['group_name'], 'category_name': part_row['category_name'] } conn.close() return jsonify(hotspot) except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================ # FASE 4: Full-Text Search and VIN Decoder API Endpoints # ============================================================================ import urllib.request import json as json_module import re from datetime import datetime, timedelta def validate_vin(vin): """Validate VIN format: 17 alphanumeric characters, no I, O, Q""" if not vin or len(vin) != 17: return False # VIN can only contain alphanumeric characters except I, O, Q valid_pattern = re.compile(r'^[A-HJ-NPR-Z0-9]{17}$', re.IGNORECASE) return bool(valid_pattern.match(vin)) def find_vehicle_in_terms(cursor, terms): """ Try to find a vehicle match in the search terms. Returns (matched_vehicle, remaining_terms) or (None, terms) if no match. Strategy: Try different combinations of terms to find a vehicle match. - First try all terms that could be vehicle-related (brand, model, year) - Return the best match and remaining terms for part search """ if len(terms) < 2: return None, terms # Identify potential year terms (4-digit numbers between 1980-2030) year_terms = [] other_terms = [] for term in terms: if term.isdigit() and 1980 <= int(term) <= 2030: year_terms.append(term) else: other_terms.append(term) # Try to find a vehicle with the non-year terms + year # We need at least one non-year term and preferably a year if not other_terms: return None, terms # Build query to find matching vehicle # Try with different combinations best_match = None used_terms = [] for num_terms in range(min(3, len(other_terms)), 0, -1): if best_match: break # Try combinations of other_terms for i in range(len(other_terms) - num_terms + 1): test_terms = other_terms[i:i + num_terms] if year_terms: test_terms = test_terms + year_terms[:1] # Add first year where_clauses = [] params = [] for term in test_terms: term_pattern = f"%{term}%" where_clauses.append( "(b.name LIKE ? OR m.name LIKE ? OR CAST(y.year AS TEXT) LIKE ?)" ) params.extend([term_pattern, term_pattern, term_pattern]) where_sql = " AND ".join(where_clauses) cursor.execute(f""" SELECT mye.id, b.name AS brand, m.name AS model, y.year, e.name AS engine FROM model_year_engine mye JOIN models m ON mye.model_id = m.id JOIN brands b ON m.brand_id = b.id JOIN years y ON mye.year_id = y.id JOIN engines e ON mye.engine_id = e.id WHERE {where_sql} ORDER BY y.year DESC LIMIT 1 """, params) row = cursor.fetchone() if row: best_match = { 'id': row['id'], 'brand': row['brand'], 'model': row['model'], 'year': row['year'], 'engine': row['engine'] } used_terms = test_terms break if best_match: # Calculate remaining terms (terms not used for vehicle match) remaining = [] used_lower = [t.lower() for t in used_terms] for term in terms: if term.lower() not in used_lower: remaining.append(term) else: # Remove from used_lower to handle duplicates used_lower.remove(term.lower()) return best_match, remaining return None, terms @app.route('/api/search') def api_search(): """Unified search across parts, cross-references, and aftermarket""" try: q = request.args.get('q', '').strip() search_type = request.args.get('type', 'all') category_id = request.args.get('category_id', type=int) limit = request.args.get('limit', 50, type=int) offset = request.args.get('offset', 0, type=int) if not q: return jsonify({'error': 'Search query is required'}), 400 conn = get_db_connection() cursor = conn.cursor() results = { 'parts': [], 'vehicles': [], 'vehicle_parts': [], # Parts for a specific vehicle 'matched_vehicle': None, # Vehicle matched in combined search 'total_count': 0 } terms = q.split() # Try to detect combined vehicle + part search # Look for patterns like "aveo 2024 balata" or "camry brake pad" if len(terms) >= 2 and search_type == 'all': matched_vehicle, remaining_terms = find_vehicle_in_terms(cursor, terms) if matched_vehicle and remaining_terms: results['matched_vehicle'] = matched_vehicle # Search for parts compatible with this vehicle (all engine variants) part_terms = remaining_terms if part_terms: vp_where_clauses = [] # Search across ALL MYE IDs for same brand/model/year (all engine variants) vp_params = [matched_vehicle['brand'], matched_vehicle['model'], matched_vehicle['year']] for term in part_terms: term_pattern = f"%{term}%" vp_where_clauses.append( "(p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ? OR pg.name LIKE ?)" ) vp_params.extend([term_pattern, term_pattern, term_pattern, term_pattern]) vp_where_sql = " AND ".join(vp_where_clauses) vp_params.extend([limit]) cursor.execute(f""" SELECT DISTINCT p.id, p.oem_part_number, p.name, p.name_es, p.image_url, pg.name AS group_name, pg.id AS group_id, pc.name AS category_name, pc.id AS category_id, vp.quantity_required, vp.position FROM vehicle_parts vp JOIN parts p ON vp.part_id = p.id JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE vp.model_year_engine_id IN ( SELECT mye.id FROM model_year_engine mye JOIN models m ON mye.model_id = m.id JOIN brands b ON m.brand_id = b.id JOIN years y ON mye.year_id = y.id WHERE UPPER(b.name) = UPPER(?) AND UPPER(m.name) = UPPER(?) AND y.year = ? ) AND ({vp_where_sql}) ORDER BY p.name LIMIT ? """, vp_params) for row in cursor.fetchall(): results['vehicle_parts'].append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'image_url': row['image_url'], 'group_name': row['group_name'], 'group_id': row['group_id'], 'category_name': row['category_name'], 'category_id': row['category_id'], 'quantity': row['quantity_required'], 'position': row['position'], 'match_type': 'vehicle_part' }) # If we found vehicle parts, return early with combined results if results['vehicle_parts']: results['total_count'] = len(results['vehicle_parts']) conn.close() return jsonify(results) # Search parts if search_type in ('parts', 'all'): # Split query into terms for multi-word search terms = q.split() # Build category filter category_filter = "" category_params = [] if category_id: category_filter = " AND pc.id = ?" category_params = [category_id] if terms: # Build WHERE clause: each term must match at least one field where_clauses = [] params = [] for term in terms: term_pattern = f"%{term}%" where_clauses.append( "(p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ? OR pg.name LIKE ? OR pc.name LIKE ?)" ) params.extend([term_pattern, term_pattern, term_pattern, term_pattern, term_pattern]) where_sql = " AND ".join(where_clauses) # For ordering, use the first term first_term = terms[0] first_term_pattern = f"{first_term}%" cursor.execute(f""" SELECT p.id, p.oem_part_number, p.name, p.name_es, p.image_url, pg.name AS group_name, pc.name AS category_name FROM parts p JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE ({where_sql}){category_filter} ORDER BY CASE WHEN p.oem_part_number LIKE ? THEN 1 WHEN p.name LIKE ? THEN 2 ELSE 3 END, p.name LIMIT ? OFFSET ? """, params + category_params + [first_term_pattern, first_term_pattern, limit, offset]) for row in cursor.fetchall(): results['parts'].append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'image_url': row['image_url'], 'group_name': row['group_name'], 'category_name': row['category_name'], 'match_type': 'oem' }) # Also search in aftermarket parts (only if no category filter) if not category_id and terms: # Build WHERE for aftermarket: match part numbers af_where_clauses = [] af_params = [] for term in terms: af_where_clauses.append("ap.part_number LIKE ?") af_params.append(f"%{term}%") af_where_sql = " AND ".join(af_where_clauses) af_params.extend([limit, offset]) cursor.execute(f""" SELECT p.id, p.oem_part_number, p.name, p.name_es, p.image_url, pg.name AS group_name, pc.name AS category_name, ap.part_number AS matched_number FROM aftermarket_parts ap JOIN parts p ON ap.oem_part_id = p.id JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE {af_where_sql} LIMIT ? OFFSET ? """, af_params) for row in cursor.fetchall(): # Avoid duplicates if not any(p['id'] == row['id'] for p in results['parts']): results['parts'].append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'image_url': row['image_url'], 'group_name': row['group_name'], 'category_name': row['category_name'], 'matched_number': row['matched_number'], 'match_type': 'aftermarket' }) # Search in cross-references if terms: cr_where_clauses = [] cr_params = [] for term in terms: cr_where_clauses.append("pcr.cross_reference_number LIKE ?") cr_params.append(f"%{term}%") cr_where_sql = " AND ".join(cr_where_clauses) cr_params.extend([limit, offset]) cursor.execute(f""" SELECT p.id, p.oem_part_number, p.name, p.name_es, p.image_url, pg.name AS group_name, pc.name AS category_name, pcr.cross_reference_number AS matched_number FROM part_cross_references pcr JOIN parts p ON pcr.part_id = p.id JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE {cr_where_sql} LIMIT ? OFFSET ? """, cr_params) for row in cursor.fetchall(): # Avoid duplicates if not any(p['id'] == row['id'] for p in results['parts']): results['parts'].append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'image_url': row['image_url'], 'group_name': row['group_name'], 'category_name': row['category_name'], 'matched_number': row['matched_number'], 'match_type': 'cross_reference' }) # Search vehicles if search_type in ('vehicles', 'all'): # Split query into terms for multi-word search terms = q.split() if terms: # Build WHERE clause: each term must match at least one field where_clauses = [] params = [] for term in terms: term_pattern = f"%{term}%" # Each term can match brand, model, year, or engine where_clauses.append( "(b.name LIKE ? OR m.name LIKE ? OR CAST(y.year AS TEXT) LIKE ? OR e.name LIKE ?)" ) params.extend([term_pattern, term_pattern, term_pattern, term_pattern]) # All terms must match (AND between terms) where_sql = " AND ".join(where_clauses) params.extend([limit, offset]) cursor.execute(f""" SELECT mye.id, b.name AS brand, m.name AS model, y.year, e.name AS engine FROM model_year_engine mye JOIN models m ON mye.model_id = m.id JOIN brands b ON m.brand_id = b.id JOIN years y ON mye.year_id = y.id JOIN engines e ON mye.engine_id = e.id WHERE {where_sql} ORDER BY y.year DESC, b.name, m.name LIMIT ? OFFSET ? """, params) for row in cursor.fetchall(): results['vehicles'].append({ 'id': row['id'], 'brand': row['brand'], 'model': row['model'], 'year': row['year'], 'engine': row['engine'] }) results['total_count'] = len(results['parts']) + len(results['vehicles']) conn.close() return jsonify(results) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/search/parts') def api_search_parts(): """Full-text search in parts catalog with pagination""" try: q = request.args.get('q', '').strip() category_id = request.args.get('category_id', type=int) group_id = request.args.get('group_id', type=int) page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) per_page = min(per_page, 100) # Max 100 per page offset = (page - 1) * per_page if not q: return jsonify({'error': 'Search query is required'}), 400 conn = get_db_connection() cursor = conn.cursor() # Check if FTS5 table exists cursor.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name='parts_fts' """) fts_exists = cursor.fetchone() is not None parts = [] total_count = 0 if fts_exists: # Use FTS5 for full-text search # Escape special FTS5 characters (-, *, ^, etc.) by quoting each term terms = q.split() quoted_terms = [] for term in terms: # Escape double quotes and wrap in quotes to prevent FTS operators escaped_term = term.replace('"', '""') quoted_terms.append(f'"{escaped_term}"') fts_query = ' '.join(quoted_terms) # Build filter conditions filter_clause = "" filter_params = [] if category_id: filter_clause += " AND pg.category_id = ?" filter_params.append(category_id) if group_id: filter_clause += " AND p.group_id = ?" filter_params.append(group_id) # Get total count for FTS search count_query = """ SELECT COUNT(*) as total FROM parts_fts JOIN parts p ON parts_fts.rowid = p.id JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE parts_fts MATCH ? """ + filter_clause cursor.execute(count_query, [fts_query] + filter_params) total_count = cursor.fetchone()['total'] # Get paginated data data_query = """ SELECT p.id, p.oem_part_number, p.name, p.name_es, p.description, pg.name AS group_name, pc.name AS category_name, bm25(parts_fts) AS rank FROM parts_fts JOIN parts p ON parts_fts.rowid = p.id JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE parts_fts MATCH ? """ + filter_clause + " ORDER BY rank LIMIT ? OFFSET ?" cursor.execute(data_query, [fts_query] + filter_params + [per_page, offset]) for row in cursor.fetchall(): parts.append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'description': row['description'], 'group_name': row['group_name'], 'category_name': row['category_name'], 'rank': row['rank'] }) else: # Fallback to LIKE search if FTS5 table doesn't exist search_term = f"%{q}%" # Build filter conditions filter_clause = "" filter_params = [] if category_id: filter_clause += " AND pg.category_id = ?" filter_params.append(category_id) if group_id: filter_clause += " AND p.group_id = ?" filter_params.append(group_id) # Get total count for LIKE search count_query = """ SELECT COUNT(*) as total FROM parts p JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE (p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ? OR p.description LIKE ?) """ + filter_clause cursor.execute(count_query, [search_term, search_term, search_term, search_term] + filter_params) total_count = cursor.fetchone()['total'] # Get paginated data data_query = """ SELECT p.id, p.oem_part_number, p.name, p.name_es, p.description, pg.name AS group_name, pc.name AS category_name FROM parts p JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE (p.name LIKE ? OR p.name_es LIKE ? OR p.oem_part_number LIKE ? OR p.description LIKE ?) """ + filter_clause + " ORDER BY p.name LIMIT ? OFFSET ?" cursor.execute(data_query, [search_term, search_term, search_term, search_term] + filter_params + [per_page, offset]) for row in cursor.fetchall(): parts.append({ 'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'description': row['description'], 'group_name': row['group_name'], 'category_name': row['category_name'], 'rank': 0 }) conn.close() total_pages = (total_count + per_page - 1) // per_page return jsonify({ 'data': parts, 'pagination': { 'page': page, 'per_page': per_page, 'total': total_count, 'total_pages': total_pages } }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/vin/decode/') def api_vin_decode(vin): """Decode a VIN using NHTSA API with caching""" try: vin = vin.upper().strip() # Validate VIN format if not validate_vin(vin): return jsonify({ 'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).' }), 400 conn = get_db_connection() cursor = conn.cursor() # Check if vin_cache table exists cursor.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name='vin_cache' """) cache_exists = cursor.fetchone() is not None cached_data = None if cache_exists: # Check for cached VIN data that hasn't expired cursor.execute(""" SELECT vin, make, model, year, engine_info, body_class, drive_type, model_year_engine_id, created_at, expires_at FROM vin_cache WHERE vin = ? AND expires_at > datetime('now') """, (vin,)) cached_row = cursor.fetchone() if cached_row: # Parse engine_info JSON if it exists engine_info_data = {} if cached_row['engine_info']: try: engine_info_data = json_module.loads(cached_row['engine_info']) except: engine_info_data = {'raw': cached_row['engine_info']} cached_data = { 'vin': cached_row['vin'], 'make': cached_row['make'], 'model': cached_row['model'], 'year': cached_row['year'], 'engine_info': engine_info_data, 'body_class': cached_row['body_class'], 'drive_type': cached_row['drive_type'], 'matched_vehicle': None, 'cached': True } # Get matched vehicle info if available if cached_row['model_year_engine_id']: cursor.execute(""" SELECT mye.id, b.name AS brand, m.name AS model, y.year, e.name AS engine FROM model_year_engine mye JOIN models m ON mye.model_id = m.id JOIN brands b ON m.brand_id = b.id JOIN years y ON mye.year_id = y.id JOIN engines e ON mye.engine_id = e.id WHERE mye.id = ? """, (cached_row['model_year_engine_id'],)) mye_row = cursor.fetchone() if mye_row: cached_data['matched_vehicle'] = { 'mye_id': mye_row['id'], 'brand': mye_row['brand'], 'model': mye_row['model'], 'year': mye_row['year'], 'engine': mye_row['engine'] } conn.close() return jsonify(cached_data) # Call NHTSA API nhtsa_url = f'https://vpic.nhtsa.dot.gov/api/vehicles/DecodeVin/{vin}?format=json' try: req = urllib.request.Request(nhtsa_url, headers={'User-Agent': 'AutopartesDB/1.0'}) with urllib.request.urlopen(req, timeout=10) as response: nhtsa_data = json_module.loads(response.read().decode('utf-8')) except urllib.error.URLError as e: conn.close() return jsonify({'error': f'Failed to connect to NHTSA API: {str(e)}'}), 503 except urllib.error.HTTPError as e: conn.close() return jsonify({'error': f'NHTSA API error: {e.code}'}), 502 except Exception as e: conn.close() return jsonify({'error': f'Error calling NHTSA API: {str(e)}'}), 500 # Parse NHTSA response results = {item['Variable']: item['Value'] for item in nhtsa_data.get('Results', [])} # Extract relevant fields make = results.get('Make', '') model = results.get('Model', '') year_str = results.get('ModelYear', '') year = int(year_str) if year_str and year_str.isdigit() else None engine_config = results.get('EngineConfiguration', '') cylinders_str = results.get('EngineCylinders', '') cylinders = int(cylinders_str) if cylinders_str and cylinders_str.isdigit() else None displacement_str = results.get('DisplacementL', '') displacement_l = float(displacement_str) if displacement_str else None fuel_type = results.get('FuelTypePrimary', '') body_class = results.get('BodyClass', '') drive_type = results.get('DriveType', '') # Try to match to model_year_engine record matched_mye_id = None matched_vehicle = None if make and model and year: cursor.execute(""" SELECT mye.id, b.name AS brand, m.name AS model, y.year, e.name AS engine FROM model_year_engine mye JOIN models m ON mye.model_id = m.id JOIN brands b ON m.brand_id = b.id JOIN years y ON mye.year_id = y.id JOIN engines e ON mye.engine_id = e.id WHERE UPPER(b.name) = UPPER(?) AND UPPER(m.name) = UPPER(?) AND y.year = ? LIMIT 1 """, (make, model, year)) mye_row = cursor.fetchone() if mye_row: matched_mye_id = mye_row['id'] matched_vehicle = { 'mye_id': mye_row['id'], 'brand': mye_row['brand'], 'model': mye_row['model'], 'year': mye_row['year'], 'engine': mye_row['engine'] } # Store in cache with 30-day expiry if cache_exists: expires_at = (datetime.now() + timedelta(days=30)).strftime('%Y-%m-%d %H:%M:%S') # Combine engine info into JSON engine_info = json_module.dumps({ 'configuration': engine_config, 'cylinders': cylinders, 'displacement_l': displacement_l, 'fuel_type': fuel_type }) cursor.execute(""" INSERT OR REPLACE INTO vin_cache (vin, decoded_data, make, model, year, engine_info, body_class, drive_type, model_year_engine_id, expires_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (vin, json_module.dumps(results), make, model, year, engine_info, body_class, drive_type, matched_mye_id, expires_at)) conn.commit() result = { 'vin': vin, 'make': make, 'model': model, 'year': year, 'engine_info': { 'configuration': engine_config, 'cylinders': cylinders, 'displacement_l': displacement_l, 'fuel_type': fuel_type }, 'body_class': body_class, 'drive_type': drive_type, 'matched_vehicle': matched_vehicle, 'cached': False } conn.close() return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/vin//parts') def api_vin_parts(vin): """Get parts for a decoded VIN""" try: vin = vin.upper().strip() if not validate_vin(vin): return jsonify({ 'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).' }), 400 category_id = request.args.get('category_id', type=int) conn = get_db_connection() cursor = conn.cursor() # Check if vin_cache table exists cursor.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name='vin_cache' """) cache_exists = cursor.fetchone() is not None if not cache_exists: conn.close() return jsonify({ 'error': 'VIN cache not available. Please decode the VIN first.' }), 400 # Look up VIN in cache cursor.execute(""" SELECT vin, make, model, year, model_year_engine_id FROM vin_cache WHERE vin = ? """, (vin,)) cached_row = cursor.fetchone() if not cached_row: conn.close() return jsonify({ 'error': 'VIN not found in cache. Please decode the VIN first using /api/vin/decode/' }), 404 mye_id = cached_row['model_year_engine_id'] vehicle_info = { 'vin': cached_row['vin'], 'make': cached_row['make'], 'model': cached_row['model'], 'year': cached_row['year'], 'mye_id': mye_id } if not mye_id: conn.close() return jsonify({ 'vin': vin, 'vehicle_info': vehicle_info, 'categories': [], 'message': 'No matching vehicle configuration found in database. Use /api/vin//match to manually link.' }) # Get parts for this vehicle grouped by category query = """ SELECT pc.id AS category_id, pc.name AS category_name, pc.name_es AS category_name_es, p.id AS part_id, p.oem_part_number, p.name AS part_name, p.name_es AS part_name_es, pg.name AS group_name, vp.quantity_required, vp.position FROM vehicle_parts vp JOIN parts p ON vp.part_id = p.id JOIN part_groups pg ON p.group_id = pg.id JOIN part_categories pc ON pg.category_id = pc.id WHERE vp.model_year_engine_id = ? """ params = [mye_id] if category_id: query += " AND pc.id = ?" params.append(category_id) query += " ORDER BY pc.display_order, pg.display_order, p.name" cursor.execute(query, params) # Group parts by category categories_dict = {} for row in cursor.fetchall(): cat_id = row['category_id'] if cat_id not in categories_dict: categories_dict[cat_id] = { 'id': cat_id, 'name': row['category_name'], 'name_es': row['category_name_es'], 'parts': [] } categories_dict[cat_id]['parts'].append({ 'id': row['part_id'], 'oem_part_number': row['oem_part_number'], 'name': row['part_name'], 'name_es': row['part_name_es'], 'group_name': row['group_name'], 'quantity_required': row['quantity_required'], 'position': row['position'] }) conn.close() return jsonify({ 'vin': vin, 'vehicle_info': vehicle_info, 'categories': list(categories_dict.values()) }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/vin//match') def api_vin_match(vin): """Manually match a VIN to a vehicle configuration""" try: vin = vin.upper().strip() if not validate_vin(vin): return jsonify({ 'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).' }), 400 mye_id = request.args.get('mye_id', type=int) if not mye_id: return jsonify({'error': 'mye_id parameter is required'}), 400 conn = get_db_connection() cursor = conn.cursor() # Check if vin_cache table exists cursor.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name='vin_cache' """) cache_exists = cursor.fetchone() is not None if not cache_exists: conn.close() return jsonify({ 'error': 'VIN cache table not available.' }), 400 # Verify the mye_id exists cursor.execute(""" SELECT id FROM model_year_engine WHERE id = ? """, (mye_id,)) if not cursor.fetchone(): conn.close() return jsonify({'error': f'model_year_engine_id {mye_id} not found'}), 404 # Check if VIN exists in cache cursor.execute(""" SELECT vin FROM vin_cache WHERE vin = ? """, (vin,)) vin_exists = cursor.fetchone() is not None if vin_exists: # Update existing cache entry cursor.execute(""" UPDATE vin_cache SET model_year_engine_id = ? WHERE vin = ? """, (mye_id, vin)) else: # Create new cache entry with minimal info expires_at = (datetime.now() + timedelta(days=30)).strftime('%Y-%m-%d %H:%M:%S') cursor.execute(""" INSERT INTO vin_cache (vin, model_year_engine_id, cached_at, expires_at) VALUES (?, ?, datetime('now'), ?) """, (vin, mye_id, expires_at)) conn.commit() conn.close() return jsonify({ 'success': True, 'vin': vin, 'mye_id': mye_id }) except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================ # ADMIN API ENDPOINTS - CRUD Operations # ============================================================================ @app.route('/api/admin/stats') def api_admin_stats(): """Get statistics for admin dashboard""" try: conn = get_db_connection() cursor = conn.cursor() stats = {} # Count categories cursor.execute("SELECT COUNT(*) FROM part_categories") stats['categories'] = cursor.fetchone()[0] # Count groups cursor.execute("SELECT COUNT(*) FROM part_groups") stats['groups'] = cursor.fetchone()[0] # Count parts cursor.execute("SELECT COUNT(*) FROM parts") stats['parts'] = cursor.fetchone()[0] # Count aftermarket parts cursor.execute("SELECT COUNT(*) FROM aftermarket_parts") stats['aftermarket'] = cursor.fetchone()[0] # Count manufacturers cursor.execute("SELECT COUNT(*) FROM manufacturers") stats['manufacturers'] = cursor.fetchone()[0] # Count fitments cursor.execute("SELECT COUNT(*) FROM vehicle_parts") stats['fitment'] = cursor.fetchone()[0] conn.close() return jsonify(stats) except Exception as e: return jsonify({'error': str(e)}), 500 # ---- Categories CRUD ---- @app.route('/api/admin/categories', methods=['POST']) def api_admin_create_category(): """Create a new category""" try: data = request.get_json() conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO part_categories (name, name_es, slug, icon_name, display_order, parent_id) VALUES (?, ?, ?, ?, ?, ?) """, ( data['name'], data.get('name_es'), data.get('slug') or data['name'].lower().replace(' ', '-'), data.get('icon_name'), data.get('display_order', 0), data.get('parent_id') )) conn.commit() new_id = cursor.lastrowid conn.close() return jsonify({'id': new_id, 'message': 'Category created'}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/admin/categories/', methods=['PUT']) def api_admin_update_category(category_id): """Update a category""" try: data = request.get_json() conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" UPDATE part_categories SET name = ?, name_es = ?, slug = ?, icon_name = ?, display_order = ? WHERE id = ? """, ( data['name'], data.get('name_es'), data.get('slug'), data.get('icon_name'), data.get('display_order', 0), category_id )) conn.commit() conn.close() return jsonify({'message': 'Category updated'}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/admin/categories/', methods=['DELETE']) def api_admin_delete_category(category_id): """Delete a category""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("DELETE FROM part_categories WHERE id = ?", (category_id,)) conn.commit() conn.close() return jsonify({'message': 'Category deleted'}) except Exception as e: return jsonify({'error': str(e)}), 500 # ---- Groups CRUD ---- @app.route('/api/admin/groups') def api_admin_list_groups(): """List all groups with category info""" try: category_id = request.args.get('category_id', type=int) conn = get_db_connection() cursor = conn.cursor() query = """ SELECT pg.id, pg.name, pg.name_es, pg.category_id, pg.display_order, pg.slug, pc.name AS category_name FROM part_groups pg LEFT JOIN part_categories pc ON pg.category_id = pc.id WHERE 1=1 """ params = [] if category_id: query += " AND pg.category_id = ?" params.append(category_id) query += " ORDER BY pg.display_order, pg.name" cursor.execute(query, params) groups = [] for row in cursor.fetchall(): groups.append({ 'id': row['id'], 'name': row['name'], 'name_es': row['name_es'], 'category_id': row['category_id'], 'category_name': row['category_name'], 'display_order': row['display_order'], 'slug': row['slug'] }) conn.close() return jsonify(groups) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/admin/groups', methods=['POST']) def api_admin_create_group(): """Create a new group""" try: data = request.get_json() conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO part_groups (category_id, name, name_es, slug, display_order) VALUES (?, ?, ?, ?, ?) """, ( data['category_id'], data['name'], data.get('name_es'), data.get('slug') or data['name'].lower().replace(' ', '-'), data.get('display_order', 0) )) conn.commit() new_id = cursor.lastrowid conn.close() return jsonify({'id': new_id, 'message': 'Group created'}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/admin/groups/', methods=['PUT']) def api_admin_update_group(group_id): """Update a group""" try: data = request.get_json() conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" UPDATE part_groups SET category_id = ?, name = ?, name_es = ?, display_order = ? WHERE id = ? """, ( data['category_id'], data['name'], data.get('name_es'), data.get('display_order', 0), group_id )) conn.commit() conn.close() return jsonify({'message': 'Group updated'}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/admin/groups/', methods=['DELETE']) def api_admin_delete_group(group_id): """Delete a group""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("DELETE FROM part_groups WHERE id = ?", (group_id,)) conn.commit() conn.close() return jsonify({'message': 'Group deleted'}) except Exception as e: return jsonify({'error': str(e)}), 500 # ---- Parts CRUD ---- @app.route('/api/admin/parts', methods=['POST']) def api_admin_create_part(): """Create a new OEM part""" try: data = request.get_json() conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO parts (oem_part_number, name, name_es, group_id, description, description_es, weight_kg, material, image_url) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( data['oem_part_number'], data['name'], data.get('name_es'), data['group_id'], data.get('description'), data.get('description_es'), data.get('weight_kg'), data.get('material'), data.get('image_url') )) conn.commit() new_id = cursor.lastrowid conn.close() return jsonify({'id': new_id, 'message': 'Part created'}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/admin/parts/', methods=['PUT']) def api_admin_update_part(part_id): """Update an OEM part""" try: data = request.get_json() conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" UPDATE parts SET oem_part_number = ?, name = ?, name_es = ?, group_id = ?, description = ?, description_es = ?, weight_kg = ?, material = ?, image_url = ? WHERE id = ? """, ( data['oem_part_number'], data['name'], data.get('name_es'), data['group_id'], data.get('description'), data.get('description_es'), data.get('weight_kg'), data.get('material'), data.get('image_url'), part_id )) conn.commit() conn.close() return jsonify({'message': 'Part updated'}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/admin/parts/', methods=['DELETE']) def api_admin_delete_part(part_id): """Delete an OEM part""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("DELETE FROM parts WHERE id = ?", (part_id,)) conn.commit() conn.close() return jsonify({'message': 'Part deleted'}) except Exception as e: return jsonify({'error': str(e)}), 500 # ---- Manufacturers CRUD ---- @app.route('/api/admin/manufacturers', methods=['POST']) def api_admin_create_manufacturer(): """Create a new manufacturer""" try: data = request.get_json() conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO manufacturers (name, type, quality_tier, country, website) VALUES (?, ?, ?, ?, ?) """, ( data['name'], data.get('type', 'aftermarket'), data.get('quality_tier', 'standard'), data.get('country'), data.get('website') )) conn.commit() new_id = cursor.lastrowid conn.close() return jsonify({'id': new_id, 'message': 'Manufacturer created'}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/admin/manufacturers/', methods=['PUT']) def api_admin_update_manufacturer(manufacturer_id): """Update a manufacturer""" try: data = request.get_json() conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" UPDATE manufacturers SET name = ?, type = ?, quality_tier = ?, country = ?, website = ? WHERE id = ? """, ( data['name'], data.get('type'), data.get('quality_tier'), data.get('country'), data.get('website'), manufacturer_id )) conn.commit() conn.close() return jsonify({'message': 'Manufacturer updated'}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/admin/manufacturers/', methods=['DELETE']) def api_admin_delete_manufacturer(manufacturer_id): """Delete a manufacturer""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("DELETE FROM manufacturers WHERE id = ?", (manufacturer_id,)) conn.commit() conn.close() return jsonify({'message': 'Manufacturer deleted'}) except Exception as e: return jsonify({'error': str(e)}), 500 # ---- Aftermarket Parts CRUD ---- @app.route('/api/admin/aftermarket', methods=['POST']) def api_admin_create_aftermarket(): """Create a new aftermarket part""" try: data = request.get_json() conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO aftermarket_parts (oem_part_id, manufacturer_id, part_number, name, name_es, quality_tier, price_usd, warranty_months) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( data['oem_part_id'], data['manufacturer_id'], data['part_number'], data.get('name'), data.get('name_es'), data.get('quality_tier', 'standard'), data.get('price_usd'), data.get('warranty_months') )) conn.commit() new_id = cursor.lastrowid conn.close() return jsonify({'id': new_id, 'message': 'Aftermarket part created'}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/admin/aftermarket/', methods=['PUT']) def api_admin_update_aftermarket(aftermarket_id): """Update an aftermarket part""" try: data = request.get_json() conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" UPDATE aftermarket_parts SET oem_part_id = ?, manufacturer_id = ?, part_number = ?, name = ?, name_es = ?, quality_tier = ?, price_usd = ?, warranty_months = ? WHERE id = ? """, ( data['oem_part_id'], data['manufacturer_id'], data['part_number'], data.get('name'), data.get('name_es'), data.get('quality_tier'), data.get('price_usd'), data.get('warranty_months'), aftermarket_id )) conn.commit() conn.close() return jsonify({'message': 'Aftermarket part updated'}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/admin/aftermarket/', methods=['DELETE']) def api_admin_delete_aftermarket(aftermarket_id): """Delete an aftermarket part""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("DELETE FROM aftermarket_parts WHERE id = ?", (aftermarket_id,)) conn.commit() conn.close() return jsonify({'message': 'Aftermarket part deleted'}) except Exception as e: return jsonify({'error': str(e)}), 500 # ---- Cross-References CRUD ---- @app.route('/api/admin/crossref') def api_admin_list_crossref(): """List cross-references with pagination""" try: page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) per_page = min(per_page, 100) offset = (page - 1) * per_page conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM part_cross_references") total_count = cursor.fetchone()[0] cursor.execute(""" SELECT pcr.id, pcr.part_id, pcr.cross_reference_number, pcr.reference_type, pcr.source, pcr.notes, p.oem_part_number, p.name AS part_name FROM part_cross_references pcr JOIN parts p ON pcr.part_id = p.id ORDER BY pcr.id DESC LIMIT ? OFFSET ? """, (per_page, offset)) refs = [] for row in cursor.fetchall(): refs.append({ 'id': row['id'], 'part_id': row['part_id'], 'cross_reference_number': row['cross_reference_number'], 'reference_type': row['reference_type'], 'source': row['source'], 'notes': row['notes'], 'oem_part_number': row['oem_part_number'], 'part_name': row['part_name'] }) conn.close() total_pages = (total_count + per_page - 1) // per_page return jsonify({ 'data': refs, 'pagination': { 'page': page, 'per_page': per_page, 'total': total_count, 'total_pages': total_pages } }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/admin/crossref', methods=['POST']) def api_admin_create_crossref(): """Create a new cross-reference""" try: data = request.get_json() conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO part_cross_references (part_id, cross_reference_number, reference_type, source, notes) VALUES (?, ?, ?, ?, ?) """, ( data['part_id'], data['cross_reference_number'], data['reference_type'], data.get('source'), data.get('notes') )) conn.commit() new_id = cursor.lastrowid conn.close() return jsonify({'id': new_id, 'message': 'Cross-reference created'}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/admin/crossref/', methods=['PUT']) def api_admin_update_crossref(crossref_id): """Update a cross-reference""" try: data = request.get_json() conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" UPDATE part_cross_references SET part_id = ?, cross_reference_number = ?, reference_type = ?, source = ?, notes = ? WHERE id = ? """, ( data['part_id'], data['cross_reference_number'], data['reference_type'], data.get('source'), data.get('notes'), crossref_id )) conn.commit() conn.close() return jsonify({'message': 'Cross-reference updated'}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/admin/crossref/', methods=['DELETE']) def api_admin_delete_crossref(crossref_id): """Delete a cross-reference""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("DELETE FROM part_cross_references WHERE id = ?", (crossref_id,)) conn.commit() conn.close() return jsonify({'message': 'Cross-reference deleted'}) except Exception as e: return jsonify({'error': str(e)}), 500 # ---- Fitment CRUD ---- @app.route('/api/admin/fitment') def api_admin_list_fitment(): """List fitments with pagination and filters""" try: page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) per_page = min(per_page, 500) # Allow more for bulk editor offset = (page - 1) * per_page brand = request.args.get('brand') model = request.args.get('model') mye_id = request.args.get('mye_id', type=int) conn = get_db_connection() cursor = conn.cursor() # Build WHERE clause where_clause = " WHERE 1=1" params = [] if mye_id: where_clause += " AND vp.model_year_engine_id = ?" params.append(mye_id) if brand: where_clause += " AND UPPER(b.name) = UPPER(?)" params.append(brand) if model: where_clause += " AND UPPER(m.name) = UPPER(?)" params.append(model) # Count query count_query = """ SELECT COUNT(*) FROM vehicle_parts vp JOIN model_year_engine mye ON vp.model_year_engine_id = mye.id JOIN models m ON mye.model_id = m.id JOIN brands b ON m.brand_id = b.id """ + where_clause cursor.execute(count_query, params) total_count = cursor.fetchone()[0] # Data query data_query = """ SELECT vp.id, vp.model_year_engine_id, vp.part_id, vp.quantity_required, vp.position, vp.fitment_notes, b.name AS brand, m.name AS model, y.year, e.name AS engine, p.oem_part_number, p.name AS part_name FROM vehicle_parts vp JOIN model_year_engine mye ON vp.model_year_engine_id = mye.id JOIN models m ON mye.model_id = m.id JOIN brands b ON m.brand_id = b.id JOIN years y ON mye.year_id = y.id JOIN engines e ON mye.engine_id = e.id JOIN parts p ON vp.part_id = p.id """ + where_clause + " ORDER BY vp.id DESC LIMIT ? OFFSET ?" cursor.execute(data_query, params + [per_page, offset]) fitments = [] for row in cursor.fetchall(): fitments.append({ 'id': row['id'], 'model_year_engine_id': row['model_year_engine_id'], 'part_id': row['part_id'], 'quantity_required': row['quantity_required'], 'position': row['position'], 'fitment_notes': row['fitment_notes'], 'brand': row['brand'], 'model': row['model'], 'year': row['year'], 'engine': row['engine'], 'oem_part_number': row['oem_part_number'], 'part_name': row['part_name'] }) conn.close() total_pages = (total_count + per_page - 1) // per_page return jsonify({ 'data': fitments, 'pagination': { 'page': page, 'per_page': per_page, 'total': total_count, 'total_pages': total_pages } }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/admin/fitment', methods=['POST']) def api_admin_create_fitment(): """Create a new fitment record""" try: data = request.get_json() conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO vehicle_parts (model_year_engine_id, part_id, quantity_required, position, fitment_notes) VALUES (?, ?, ?, ?, ?) """, ( data['model_year_engine_id'], data['part_id'], data.get('quantity_required', 1), data.get('position'), data.get('fitment_notes') )) conn.commit() new_id = cursor.lastrowid conn.close() return jsonify({'id': new_id, 'message': 'Fitment created'}) except sqlite3.IntegrityError: return jsonify({'error': 'Este fitment ya existe'}), 400 except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/admin/fitment/', methods=['DELETE']) def api_admin_delete_fitment(fitment_id): """Delete a fitment record""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("DELETE FROM vehicle_parts WHERE id = ?", (fitment_id,)) conn.commit() conn.close() return jsonify({'message': 'Fitment deleted'}) except Exception as e: return jsonify({'error': str(e)}), 500 # ---- CSV Import/Export ---- @app.route('/api/admin/import/', methods=['POST']) def api_admin_import_csv(import_type): """Import records from CSV data""" try: data = request.get_json() records = data.get('records', []) if not records: return jsonify({'error': 'No records to import'}), 400 conn = get_db_connection() cursor = conn.cursor() imported = 0 errors = [] for i, record in enumerate(records): try: if import_type == 'categories': cursor.execute(""" INSERT INTO part_categories (name, name_es, slug, icon_name, display_order) VALUES (?, ?, ?, ?, ?) """, ( record['name'], record.get('name_es'), record.get('slug') or record['name'].lower().replace(' ', '-'), record.get('icon_name'), record.get('display_order', 0) )) elif import_type == 'groups': cursor.execute(""" INSERT INTO part_groups (category_id, name, name_es, display_order) VALUES (?, ?, ?, ?) """, ( record['category_id'], record['name'], record.get('name_es'), record.get('display_order', 0) )) elif import_type == 'parts': cursor.execute(""" INSERT INTO parts (oem_part_number, name, name_es, group_id, description, description_es, weight_kg, material) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( record['oem_part_number'], record['name'], record.get('name_es'), record['group_id'], record.get('description'), record.get('description_es'), record.get('weight_kg'), record.get('material') )) elif import_type == 'manufacturers': cursor.execute(""" INSERT INTO manufacturers (name, type, quality_tier, country, website) VALUES (?, ?, ?, ?, ?) """, ( record['name'], record.get('type', 'aftermarket'), record.get('quality_tier', 'standard'), record.get('country'), record.get('website') )) elif import_type == 'aftermarket': cursor.execute(""" INSERT INTO aftermarket_parts (oem_part_id, manufacturer_id, part_number, name, name_es, quality_tier, price_usd, warranty_months) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( record['oem_part_id'], record['manufacturer_id'], record['part_number'], record.get('name'), record.get('name_es'), record.get('quality_tier', 'standard'), record.get('price_usd'), record.get('warranty_months') )) elif import_type == 'crossref': cursor.execute(""" INSERT INTO part_cross_references (part_id, cross_reference_number, reference_type, source, notes) VALUES (?, ?, ?, ?, ?) """, ( record['part_id'], record['cross_reference_number'], record['reference_type'], record.get('source'), record.get('notes') )) elif import_type == 'fitment': cursor.execute(""" INSERT OR IGNORE INTO vehicle_parts (model_year_engine_id, part_id, quantity_required, position, fitment_notes) VALUES (?, ?, ?, ?, ?) """, ( record['model_year_engine_id'], record['part_id'], record.get('quantity_required', 1), record.get('position'), record.get('fitment_notes') )) imported += 1 except Exception as e: errors.append(f"Row {i + 1}: {str(e)}") conn.commit() conn.close() result = {'imported': imported} if errors: result['errors'] = errors[:10] # Limit errors shown return jsonify(result) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/admin/export/') def api_admin_export_csv(export_type): """Export data as JSON (to be converted to CSV on frontend)""" try: conn = get_db_connection() cursor = conn.cursor() data = [] if export_type == 'categories': cursor.execute("SELECT id, name, name_es, slug, icon_name, display_order FROM part_categories ORDER BY display_order, name") for row in cursor.fetchall(): data.append(dict(row)) elif export_type == 'groups': cursor.execute("SELECT id, category_id, name, name_es, display_order FROM part_groups ORDER BY category_id, display_order, name") for row in cursor.fetchall(): data.append(dict(row)) elif export_type == 'parts': cursor.execute("SELECT id, oem_part_number, name, name_es, group_id, description, description_es, weight_kg, material FROM parts ORDER BY id") for row in cursor.fetchall(): data.append(dict(row)) elif export_type == 'manufacturers': cursor.execute("SELECT id, name, type, quality_tier, country, website FROM manufacturers ORDER BY name") for row in cursor.fetchall(): data.append(dict(row)) elif export_type == 'aftermarket': cursor.execute("SELECT id, oem_part_id, manufacturer_id, part_number, name, name_es, quality_tier, price_usd, warranty_months FROM aftermarket_parts ORDER BY id") for row in cursor.fetchall(): data.append(dict(row)) elif export_type == 'crossref': cursor.execute("SELECT id, part_id, cross_reference_number, reference_type, source, notes FROM part_cross_references ORDER BY id") for row in cursor.fetchall(): data.append(dict(row)) elif export_type == 'fitment': cursor.execute("SELECT id, model_year_engine_id, part_id, quantity_required, position, fitment_notes FROM vehicle_parts ORDER BY id") for row in cursor.fetchall(): data.append(dict(row)) conn.close() return jsonify({'data': data}) except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================ # Image Upload Endpoint # ============================================================================ import base64 import uuid @app.route('/api/admin/upload-image', methods=['POST']) def api_admin_upload_image(): """Upload a base64 encoded image and save it to the server""" try: data = request.get_json() image_data = data.get('image') if not image_data: return jsonify({'error': 'No image data provided'}), 400 # Parse base64 data if ',' in image_data: # Format: data:image/png;base64,xxxxx header, encoded = image_data.split(',', 1) # Extract extension from header if 'png' in header: ext = 'png' elif 'jpeg' in header or 'jpg' in header: ext = 'jpg' elif 'gif' in header: ext = 'gif' elif 'webp' in header: ext = 'webp' else: ext = 'png' else: encoded = image_data ext = 'png' # Decode the image image_bytes = base64.b64decode(encoded) # Generate unique filename filename = f"{uuid.uuid4().hex}.{ext}" filepath = os.path.join('static', 'parts_images', filename) # Ensure directory exists os.makedirs(os.path.join('.', 'static', 'parts_images'), exist_ok=True) # Save the file with open(filepath, 'wb') as f: f.write(image_bytes) # Return the URL url = f"/static/parts_images/{filename}" return jsonify({'url': url, 'filename': filename}) except Exception as e: return jsonify({'error': str(e)}), 500 if __name__ == '__main__': # Check if database exists if not os.path.exists(DATABASE_PATH): print(f"Database not found at {DATABASE_PATH}") print("Please make sure the vehicle database is created first.") exit(1) print("Starting Vehicle Dashboard Server...") print("Visit http://localhost:5000 to access the dashboard locally") print("Visit http://192.168.10.198:5000 to access the dashboard from other computers on the network") app.run(debug=True, host='0.0.0.0', port=5000)