from flask import Flask, jsonify, request, send_from_directory, redirect, g from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import IntegrityError import os import sys import json as json_module import re import base64 import uuid import urllib.request from datetime import datetime, timedelta sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')) from config import DB_URL from auth import hash_password, check_password, create_access_token, create_refresh_token, decode_token, require_auth app = Flask(__name__, static_folder='.') engine = create_engine(DB_URL, pool_pre_ping=True, pool_size=5, max_overflow=10) Session = sessionmaker(bind=engine) # ============================================================================ # Helper Functions # ============================================================================ def get_all_brands(detailed=False, region_mask=None): session = Session() try: region_filter = "" params = {} if region_mask is not None: region_filter = " AND (b.region & :rmask) > 0" params['rmask'] = region_mask if detailed: sql = text(""" SELECT b.name_brand AS name, COUNT(DISTINCT m.name_model) AS model_count, COUNT(DISTINCT mye.id_mye) AS vehicle_count FROM brands b JOIN models m ON m.brand_id = b.id_brand JOIN model_year_engine mye ON mye.model_id = m.id_model WHERE 1=1""" + region_filter + """ GROUP BY b.name_brand ORDER BY b.name_brand LIMIT 1000 """) rows = session.execute(sql, params).mappings().all() return [{'name': r['name'], 'model_count': r['model_count'], 'vehicle_count': r['vehicle_count']} for r in rows] else: sql = text(""" SELECT DISTINCT b.name_brand AS name FROM brands b JOIN models m ON m.brand_id = b.id_brand JOIN model_year_engine mye ON mye.model_id = m.id_model WHERE 1=1""" + region_filter + """ ORDER BY b.name_brand LIMIT 1000 """) rows = session.execute(sql, params).mappings().all() return [r['name'] for r in rows] finally: session.close() def get_all_years(): session = Session() try: rows = session.execute(text( "SELECT DISTINCT year_car AS year FROM years ORDER BY year_car DESC LIMIT 200" )).mappings().all() return [r['year'] for r in rows] finally: session.close() def get_all_engines(): session = Session() try: rows = session.execute(text( "SELECT DISTINCT name_engine AS name FROM engines ORDER BY name_engine LIMIT 5000" )).mappings().all() return [r['name'] for r in rows] finally: session.close() def get_models_by_brand(brand_name=None): session = Session() try: if brand_name: sql = text(""" SELECT DISTINCT m.name_model AS name FROM models m JOIN brands b ON m.brand_id = b.id_brand JOIN model_year_engine mye ON mye.model_id = m.id_model WHERE b.name_brand ILIKE :brand ORDER BY m.name_model LIMIT 1000 """) rows = session.execute(sql, {'brand': brand_name}).mappings().all() else: sql = text(""" SELECT DISTINCT m.name_model AS name FROM models m JOIN model_year_engine mye ON mye.model_id = m.id_model ORDER BY m.name_model LIMIT 1000 """) rows = session.execute(sql).mappings().all() return [r['name'] for r in rows] finally: session.close() def search_vehicles(brand=None, model=None, year=None, engine_name=None, with_parts=False, page=1, per_page=50): session = Session() try: per_page = min(per_page, 100) offset = (page - 1) * per_page base_from = """ FROM model_year_engine mye JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand JOIN years y ON mye.year_id = y.id_year JOIN engines e ON mye.engine_id = e.id_engine LEFT JOIN fuel_type ft ON e.id_fuel = ft.id_fuel LEFT JOIN drivetrain dt ON mye.id_drivetrain = dt.id_drivetrain LEFT JOIN transmission tr ON mye.id_transmission = tr.id_transmission """ if with_parts: base_from += " JOIN (SELECT DISTINCT model_year_engine_id FROM vehicle_parts) AS has_parts ON mye.id_mye = has_parts.model_year_engine_id" where = " WHERE 1=1" params = {} if brand: where += " AND b.name_brand ILIKE :brand" params['brand'] = brand if model: where += " AND m.name_model ILIKE :model" params['model'] = model if year: where += " AND y.year_car = :year" params['year'] = int(year) if engine_name: where += " AND e.name_engine = :engine" params['engine'] = engine_name count_sql = text("SELECT COUNT(*) AS total " + base_from + where) total_count = session.execute(count_sql, params).mappings().first()['total'] data_params = dict(params) data_params['limit'] = per_page data_params['offset'] = offset query = text(""" SELECT b.name_brand AS brand, m.name_model AS model, y.year_car AS year, e.name_engine AS engine, e.power_hp, e.torque_nm, e.displacement_cc, e.cylinders, ft.name_fuel AS fuel_type, mye.trim_level, dt.name_drivetrain AS drivetrain, tr.name_transmission AS transmission """ + base_from + where + " ORDER BY b.name_brand, m.name_model, y.year_car LIMIT :limit OFFSET :offset") rows = session.execute(query, data_params).mappings().all() vehicles = [] for r in rows: vehicles.append({ 'brand': r['brand'], 'model': r['model'], 'year': r['year'], 'engine': r['engine'], 'power_hp': r['power_hp'] or 0, 'torque_nm': r['torque_nm'] or 0, 'displacement_cc': r['displacement_cc'] or 0, 'cylinders': r['cylinders'] or 0, 'fuel_type': r['fuel_type'] or 'unknown', 'trim_level': r['trim_level'] or 'unknown', 'drivetrain': r['drivetrain'] or 'unknown', 'transmission': r['transmission'] or 'unknown' }) total_pages = (total_count + per_page - 1) // per_page return {'data': vehicles, 'pagination': {'page': page, 'per_page': per_page, 'total': total_count, 'total_pages': total_pages}} finally: session.close() # ============================================================================ # Static Routes # ============================================================================ @app.route('/') def index(): return redirect('/login.html') @app.route('/admin') def admin_page(): return send_from_directory('.', 'admin.html') @app.route('/landing') def landing_page(): return send_from_directory('.', 'customer-landing.html') @app.route('/diagramas') def diagrams_page(): return send_from_directory('.', 'diagrams.html') @app.route('/index.html') def index_html(): return send_from_directory('.', 'index.html') @app.route('/admin.html') def admin_html(): return send_from_directory('.', 'admin.html') @app.route('/customer-landing.html') def customer_landing_html(): return send_from_directory('.', 'customer-landing.html') @app.route('/diagrams.html') def diagrams_html(): return send_from_directory('.', 'diagrams.html') @app.route('/static/') def static_files(path): return send_from_directory('static', path) @app.route('/shared.css') def shared_css(): return send_from_directory('.', 'shared.css') @app.route('/nav.js') def nav_js(): return send_from_directory('.', 'nav.js') @app.route('/dashboard.js') def dashboard_js(): return send_from_directory('.', 'dashboard.js') @app.route('/admin.js') def admin_js(): return send_from_directory('.', 'admin.js') @app.route('/enhanced-search.js') def enhanced_search_js(): return send_from_directory('.', 'enhanced-search.js') # ============================================================================ # Core API Endpoints # ============================================================================ @app.route('/api/catalog/stats') def api_catalog_stats(): session = Session() try: row = session.execute(text(""" SELECT (SELECT COUNT(*) FROM brands) AS brands, (SELECT COUNT(*) FROM models) AS models, (SELECT COUNT(*) FROM model_year_engine) AS vehicles, (SELECT COUNT(*) FROM parts) AS parts """)).mappings().first() return jsonify({ 'brands': row['brands'], 'models': row['models'], 'vehicles': row['vehicles'], 'parts': row['parts'] }) finally: session.close() @app.route('/api/brands') def api_brands(): detailed = request.args.get('detailed', 'false').lower() == 'true' region = request.args.get('region') region_mask = int(region) if region else None return jsonify(get_all_brands(detailed=detailed, region_mask=region_mask)) @app.route('/api/years') def api_years(): brand = request.args.get('brand') model = request.args.get('model') session = Session() try: q = """SELECT DISTINCT y.year_car AS year FROM years y JOIN model_year_engine mye ON y.id_year = mye.year_id JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand WHERE 1=1""" params = {} if brand: q += " AND b.name_brand ILIKE :brand" params['brand'] = brand if model: q += " AND m.name_model ILIKE :model" params['model'] = model q += " ORDER BY y.year_car DESC" rows = session.execute(text(q), params).mappings().all() return jsonify([r['year'] for r in rows]) finally: session.close() @app.route('/api/engines') def api_engines(): brand = request.args.get('brand') model = request.args.get('model') year = request.args.get('year') session = Session() try: q = """SELECT DISTINCT e.name_engine AS name FROM engines e JOIN model_year_engine mye ON e.id_engine = mye.engine_id JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand JOIN years y ON mye.year_id = y.id_year WHERE 1=1""" params = {} if brand: q += " AND b.name_brand ILIKE :brand" params['brand'] = brand if model: q += " AND m.name_model ILIKE :model" params['model'] = model if year: q += " AND y.year_car = :year" params['year'] = int(year) q += " ORDER BY e.name_engine" rows = session.execute(text(q), params).mappings().all() return jsonify([r['name'] for r in rows]) finally: session.close() @app.route('/api/models') def api_models(): brand = request.args.get('brand') detailed = request.args.get('detailed', 'false').lower() == 'true' if detailed and brand: session = Session() try: sql = text(""" SELECT m.name_model AS name, MIN(y.year_car) AS year_min, MAX(y.year_car) AS year_max, COUNT(DISTINCT y.year_car) AS year_count, COUNT(DISTINCT mye.id_mye) AS vehicle_count, COUNT(DISTINCT e.name_engine) AS engine_count FROM models m JOIN brands b ON m.brand_id = b.id_brand JOIN model_year_engine mye ON mye.model_id = m.id_model JOIN years y ON mye.year_id = y.id_year JOIN engines e ON mye.engine_id = e.id_engine WHERE b.name_brand ILIKE :brand GROUP BY m.name_model ORDER BY m.name_model LIMIT 1000 """) rows = session.execute(sql, {'brand': brand}).mappings().all() return jsonify([{'name': r['name'], 'year_min': r['year_min'], 'year_max': r['year_max'], 'year_count': r['year_count'], 'vehicle_count': r['vehicle_count'], 'engine_count': r['engine_count']} for r in rows]) finally: session.close() return jsonify(get_models_by_brand(brand)) @app.route('/api/vehicles') def api_vehicles(): brand = request.args.get('brand') model = request.args.get('model') year = request.args.get('year') eng = request.args.get('engine') page = request.args.get('page', 1, type=int) per_page = request.args.get('per_page', 50, type=int) return jsonify(search_vehicles(brand, model, year, eng, page=page, per_page=per_page)) @app.route('/api/model-year-engine') def api_model_year_engine(): try: brand = request.args.get('brand') model = request.args.get('model') year = request.args.get('year', type=int) with_parts = request.args.get('with_parts', 'false').lower() == 'true' page = request.args.get('page', 1, type=int) per_page = min(request.args.get('per_page', 50, type=int), 100) offset = (page - 1) * per_page session = Session() try: base_from = """ FROM model_year_engine mye JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand JOIN years y ON mye.year_id = y.id_year JOIN engines e ON mye.engine_id = e.id_engine LEFT JOIN drivetrain dt ON mye.id_drivetrain = dt.id_drivetrain LEFT JOIN transmission tr ON mye.id_transmission = tr.id_transmission """ if with_parts: base_from += " JOIN (SELECT DISTINCT model_year_engine_id FROM vehicle_parts) AS has_parts ON mye.id_mye = has_parts.model_year_engine_id" where = " WHERE 1=1" params = {} if brand: where += " AND b.name_brand ILIKE :brand" params['brand'] = brand if model: where += " AND m.name_model ILIKE :model" params['model'] = model if year: where += " AND y.year_car = :year" params['year'] = year total_count = session.execute(text("SELECT COUNT(*) AS total " + base_from + where), params).mappings().first()['total'] data_params = dict(params) data_params['limit'] = per_page data_params['offset'] = offset query = text(""" SELECT mye.id_mye AS id, b.name_brand AS brand, m.name_model AS model, y.year_car AS year, e.name_engine AS engine, mye.trim_level, dt.name_drivetrain AS drivetrain, tr.name_transmission AS transmission """ + base_from + where + " ORDER BY b.name_brand, m.name_model, y.year_car, e.name_engine LIMIT :limit OFFSET :offset") rows = session.execute(query, data_params).mappings().all() records = [{'id': r['id'], 'brand': r['brand'], 'model': r['model'], 'year': r['year'], 'engine': r['engine'], 'trim_level': r['trim_level'], 'drivetrain': r['drivetrain'], 'transmission': r['transmission']} for r in rows] total_pages = (total_count + per_page - 1) // per_page return jsonify({'data': records, 'pagination': {'page': page, 'per_page': per_page, 'total': total_count, 'total_pages': total_pages}}) finally: session.close() except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================ # Parts Catalog Endpoints # ============================================================================ @app.route('/api/categories') def api_categories(): session = Session() try: rows = session.execute(text(""" SELECT id_part_category AS id, name_part_category AS name, name_es, slug, icon_name, display_order, parent_id FROM part_categories ORDER BY display_order, name_part_category LIMIT 50 """)).mappings().all() categories_dict = {} root_categories = [] for r in rows: cat = {'id': r['id'], 'name': r['name'], 'name_es': r['name_es'], 'slug': r['slug'], 'icon_name': r['icon_name'], 'display_order': r['display_order'], 'children': []} categories_dict[r['id']] = cat if r['parent_id'] is None: root_categories.append(cat) for r in rows: if r['parent_id'] is not None and r['parent_id'] in categories_dict: categories_dict[r['parent_id']]['children'].append(categories_dict[r['id']]) return jsonify(root_categories) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/categories//groups') def api_category_groups(category_id): session = Session() try: rows = session.execute(text(""" SELECT id_part_group AS id, name_part_group AS name, name_es, slug, display_order FROM part_groups WHERE category_id = :cid ORDER BY display_order, name_part_group LIMIT 200 """), {'cid': category_id}).mappings().all() return jsonify([{'id': r['id'], 'name': r['name'], 'name_es': r['name_es'], 'slug': r['slug'], 'display_order': r['display_order']} for r in rows]) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/parts') def api_parts(): try: group_id = request.args.get('group_id', type=int) category_id = request.args.get('category_id', type=int) search = request.args.get('search') page = request.args.get('page', 1, type=int) per_page = min(request.args.get('per_page', 50, type=int), 100) offset = (page - 1) * per_page session = Session() try: where = " WHERE 1=1" params = {} if group_id: where += " AND p.group_id = :group_id" params['group_id'] = group_id if category_id: where += " AND pg.category_id = :category_id" params['category_id'] = category_id if search: where += " AND (p.name_part ILIKE :search OR p.name_es ILIKE :search OR p.oem_part_number ILIKE :search)" params['search'] = '%' + search + '%' base = """ FROM parts p JOIN part_groups pg ON p.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category """ total_count = session.execute(text("SELECT COUNT(*) AS total " + base + where), params).mappings().first()['total'] data_params = dict(params) data_params['limit'] = per_page data_params['offset'] = offset rows = session.execute(text(""" SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, p.group_id, pg.name_part_group AS group_name, pc.name_part_category AS category_name, p.image_url """ + base + where + " ORDER BY p.name_part LIMIT :limit OFFSET :offset"), data_params).mappings().all() parts = [{'id': r['id'], 'oem_part_number': r['oem_part_number'], 'name': r['name'], 'name_es': r['name_es'], 'group_id': r['group_id'], 'group_name': r['group_name'], 'category_name': r['category_name'], 'image_url': r['image_url']} for r in rows] total_pages = (total_count + per_page - 1) // per_page return jsonify({'data': parts, 'pagination': {'page': page, 'per_page': per_page, 'total': total_count, 'total_pages': total_pages}}) finally: session.close() except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/parts/') def api_part_detail(part_id): session = Session() try: row = session.execute(text(""" SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, p.description, p.description_es, p.group_id, p.image_url, pg.name_part_group AS group_name, pg.name_es AS group_name_es, pc.id_part_category AS category_id, pc.name_part_category AS category_name, pc.name_es AS category_name_es FROM parts p JOIN part_groups pg ON p.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category WHERE p.id_part = :pid """), {'pid': part_id}).mappings().first() if row is None: return jsonify({'error': 'Part not found'}), 404 return jsonify({'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'description': row['description'], 'description_es': row['description_es'], 'group_id': row['group_id'], 'group_name': row['group_name'], 'image_url': row['image_url'], 'group_name_es': row['group_name_es'], 'category_id': row['category_id'], 'category_name': row['category_name'], 'category_name_es': row['category_name_es']}) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/vehicles//categories') def api_vehicle_categories(mye_id): session = Session() try: rows = session.execute(text(""" SELECT DISTINCT pc.id_part_category AS id, pc.name_part_category AS name, pc.name_es, pc.slug, pc.icon_name, pc.display_order FROM part_categories pc JOIN part_groups pg ON pg.category_id = pc.id_part_category JOIN parts p ON p.group_id = pg.id_part_group JOIN vehicle_parts vp ON vp.part_id = p.id_part WHERE vp.model_year_engine_id = :mye_id ORDER BY pc.display_order, pc.name_part_category LIMIT 50 """), {'mye_id': mye_id}).mappings().all() return jsonify([{'id': r['id'], 'name': r['name'], 'name_es': r['name_es'], 'slug': r['slug'], 'icon_name': r['icon_name'], 'display_order': r['display_order']} for r in rows]) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/vehicles//groups') def api_vehicle_groups(mye_id): session = Session() try: category_id = request.args.get('category_id', type=int) q = """ SELECT DISTINCT pg.id_part_group AS id, pg.name_part_group AS name, pg.name_es, pg.slug, pg.display_order, COUNT(DISTINCT p.id_part) AS parts_count FROM part_groups pg JOIN parts p ON p.group_id = pg.id_part_group JOIN vehicle_parts vp ON vp.part_id = p.id_part WHERE vp.model_year_engine_id = :mye_id """ params = {'mye_id': mye_id} if category_id: q += " AND pg.category_id = :cid" params['cid'] = category_id q += " GROUP BY pg.id_part_group, pg.name_part_group, pg.name_es, pg.slug, pg.display_order ORDER BY pg.display_order, pg.name_part_group LIMIT 200" rows = session.execute(text(q), params).mappings().all() return jsonify([{'id': r['id'], 'name': r['name'], 'name_es': r['name_es'], 'slug': r['slug'], 'display_order': r['display_order'], 'parts_count': r['parts_count']} for r in rows]) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/vehicles//parts') def api_vehicle_parts(mye_id): try: category_id = request.args.get('category_id', type=int) group_id = request.args.get('group_id', type=int) page = request.args.get('page', 1, type=int) per_page = min(request.args.get('per_page', 50, type=int), 100) offset = (page - 1) * per_page session = Session() try: base = """ FROM vehicle_parts vp JOIN parts p ON vp.part_id = p.id_part JOIN part_groups pg ON p.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category LEFT JOIN position_part pp ON vp.id_position_part = pp.id_position_part WHERE vp.model_year_engine_id = :mye_id """ params = {'mye_id': mye_id} if category_id: base += " AND pc.id_part_category = :cid" params['cid'] = category_id if group_id: base += " AND pg.id_part_group = :gid" params['gid'] = group_id total_count = session.execute(text("SELECT COUNT(*) AS total " + base), params).mappings().first()['total'] data_params = dict(params) data_params['limit'] = per_page data_params['offset'] = offset rows = session.execute(text(""" SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, vp.quantity_required, pp.name_position_part AS position, pc.name_part_category AS category_name, pg.name_part_group AS group_name, p.image_url """ + base + " ORDER BY pc.display_order, pg.display_order, p.name_part LIMIT :limit OFFSET :offset"), data_params).mappings().all() parts = [{'id': r['id'], 'oem_part_number': r['oem_part_number'], 'name': r['name'], 'name_es': r['name_es'], 'quantity_required': r['quantity_required'], 'position': r['position'], 'category_name': r['category_name'], 'group_name': r['group_name'], 'image_url': r['image_url']} for r in rows] total_pages = (total_count + per_page - 1) // per_page return jsonify({'data': parts, 'pagination': {'page': page, 'per_page': per_page, 'total': total_count, 'total_pages': total_pages}}) finally: session.close() except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================ # Cross-References and Aftermarket Endpoints # ============================================================================ @app.route('/api/manufacturers') def api_manufacturers(): session = Session() try: manufacturer_type = request.args.get('type') quality_tier = request.args.get('quality_tier') q = """ SELECT mfr.id_manufacture AS id, mfr.name_manufacture AS name, mt.name_type_manu AS type, qt.name_quality AS quality_tier, co.name_country AS country, mfr.logo_url, mfr.website FROM manufacturers mfr LEFT JOIN manufacture_type mt ON mfr.id_type_manu = mt.id_type_manu LEFT JOIN quality_tier qt ON mfr.id_quality_tier = qt.id_quality_tier LEFT JOIN countries co ON mfr.id_country = co.id_country WHERE 1=1 """ params = {} if manufacturer_type: q += " AND mt.name_type_manu ILIKE :type" params['type'] = manufacturer_type if quality_tier: q += " AND qt.name_quality ILIKE :qt" params['qt'] = quality_tier q += " ORDER BY mfr.name_manufacture LIMIT 200" rows = session.execute(text(q), params).mappings().all() return jsonify([{'id': r['id'], 'name': r['name'], 'type': r['type'], 'quality_tier': r['quality_tier'], 'country': r['country'], 'logo_url': r['logo_url'], 'website': r['website']} for r in rows]) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/parts//alternatives') def api_part_alternatives(part_id): session = Session() try: quality_tier = request.args.get('quality_tier') manufacturer_id = request.args.get('manufacturer_id', type=int) q = """ SELECT ap.id_aftermarket_parts AS id, ap.part_number, ap.name_aftermarket_parts AS name, ap.name_es, mfr.name_manufacture AS manufacturer_name, ap.manufacturer_id, qt.name_quality AS quality_tier, ap.price_usd, ap.warranty_months FROM aftermarket_parts ap JOIN manufacturers mfr ON ap.manufacturer_id = mfr.id_manufacture LEFT JOIN quality_tier qt ON ap.id_quality_tier = qt.id_quality_tier WHERE ap.oem_part_id = :pid """ params = {'pid': part_id} if quality_tier: q += " AND qt.name_quality ILIKE :qt" params['qt'] = quality_tier if manufacturer_id: q += " AND ap.manufacturer_id = :mid" params['mid'] = manufacturer_id q += " ORDER BY qt.name_quality DESC, ap.price_usd ASC LIMIT 50" rows = session.execute(text(q), params).mappings().all() return jsonify([{'id': r['id'], 'part_number': r['part_number'], 'name': r['name'], 'name_es': r['name_es'], 'manufacturer_name': r['manufacturer_name'], 'manufacturer_id': r['manufacturer_id'], 'quality_tier': r['quality_tier'], 'price_usd': r['price_usd'], 'warranty_months': r['warranty_months'], 'in_stock': None} for r in rows]) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/parts//cross-references') def api_part_cross_references(part_id): session = Session() try: rows = session.execute(text(""" SELECT pcr.id_part_cross_ref AS id, pcr.cross_reference_number, rt.name_ref_type AS reference_type, pcr.source_ref AS source, pcr.notes FROM part_cross_references pcr LEFT JOIN reference_type rt ON pcr.id_ref_type = rt.id_ref_type WHERE pcr.part_id = :pid ORDER BY rt.name_ref_type, pcr.cross_reference_number LIMIT 100 """), {'pid': part_id}).mappings().all() return jsonify([{'id': r['id'], 'cross_reference_number': r['cross_reference_number'], 'reference_type': r['reference_type'], 'source': r['source'], 'notes': r['notes']} for r in rows]) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/search/part-number/') def api_search_part_number(part_number): session = Session() try: results = [] st = '%' + part_number + '%' for row in session.execute(text("SELECT id_part AS id, oem_part_number, name_part AS name, name_es FROM parts WHERE oem_part_number ILIKE :s"), {'s': st}).mappings().all(): results.append({'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'match_type': 'oem', 'matched_number': row['oem_part_number']}) for row in session.execute(text("SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, ap.part_number FROM aftermarket_parts ap JOIN parts p ON ap.oem_part_id = p.id_part WHERE ap.part_number ILIKE :s"), {'s': st}).mappings().all(): results.append({'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'match_type': 'aftermarket', 'matched_number': row['part_number']}) for row in session.execute(text("SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, pcr.cross_reference_number FROM part_cross_references pcr JOIN parts p ON pcr.part_id = p.id_part WHERE pcr.cross_reference_number ILIKE :s"), {'s': st}).mappings().all(): results.append({'id': row['id'], 'oem_part_number': row['oem_part_number'], 'name': row['name'], 'name_es': row['name_es'], 'match_type': 'cross_reference', 'matched_number': row['cross_reference_number']}) return jsonify(results) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/aftermarket') def api_aftermarket_parts(): session = Session() try: manufacturer_id = request.args.get('manufacturer_id', type=int) quality_tier = request.args.get('quality_tier') search = request.args.get('search') page = request.args.get('page', 1, type=int) per_page = min(request.args.get('per_page', 50, type=int), 100) offset = (page - 1) * per_page where = " WHERE 1=1" params = {} if manufacturer_id: where += " AND ap.manufacturer_id = :mid" params['mid'] = manufacturer_id if quality_tier: where += " AND qt.name_quality ILIKE :qt" params['qt'] = quality_tier if search: where += " AND (ap.name_aftermarket_parts ILIKE :s OR ap.part_number ILIKE :s OR p.oem_part_number ILIKE :s)" params['s'] = '%' + search + '%' base = """ FROM aftermarket_parts ap JOIN parts p ON ap.oem_part_id = p.id_part JOIN manufacturers m ON ap.manufacturer_id = m.id_manufacture LEFT JOIN quality_tier qt ON ap.id_quality_tier = qt.id_quality_tier """ total_count = session.execute(text("SELECT COUNT(*) AS total " + base + where), params).mappings().first()['total'] data_params = dict(params) data_params['limit'] = per_page data_params['offset'] = offset rows = session.execute(text(""" SELECT ap.id_aftermarket_parts AS id, ap.part_number, ap.name_aftermarket_parts AS name, p.oem_part_number, m.name_manufacture AS manufacturer_name, qt.name_quality AS quality_tier, ap.price_usd """ + base + where + " ORDER BY ap.name_aftermarket_parts LIMIT :limit OFFSET :offset"), data_params).mappings().all() parts = [{'id': r['id'], 'part_number': r['part_number'], 'name': r['name'], 'oem_part_number': r['oem_part_number'], 'manufacturer_name': r['manufacturer_name'], 'quality_tier': r['quality_tier'], 'price_usd': r['price_usd']} for r in rows] total_pages = (total_count + per_page - 1) // per_page return jsonify({'data': parts, 'pagination': {'page': page, 'per_page': per_page, 'total': total_count, 'total_pages': total_pages}}) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() # ============================================================================ # Diagram Endpoints # ============================================================================ @app.route('/api/diagrams') def api_diagrams(): session = Session() try: group_id = request.args.get('group_id', type=int) q = """ SELECT d.id_diagram AS id, d.name_diagram AS name, d.name_es, d.group_id, pg.name_part_group AS group_name, d.thumbnail_path, d.display_order FROM diagrams d JOIN part_groups pg ON d.group_id = pg.id_part_group WHERE 1=1 """ params = {} if group_id: q += " AND d.group_id = :gid" params['gid'] = group_id q += " ORDER BY d.display_order, d.name_diagram LIMIT 200" rows = session.execute(text(q), params).mappings().all() return jsonify([{'id': r['id'], 'name': r['name'], 'name_es': r['name_es'], 'group_id': r['group_id'], 'group_name': r['group_name'], 'thumbnail_path': r['thumbnail_path'], 'display_order': r['display_order']} for r in rows]) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/diagrams/') def api_diagram_detail(diagram_id): session = Session() try: row = session.execute(text(""" SELECT d.id_diagram AS id, d.name_diagram AS name, d.name_es, d.group_id, pg.name_part_group AS group_name, d.image_path FROM diagrams d JOIN part_groups pg ON d.group_id = pg.id_part_group WHERE d.id_diagram = :did """), {'did': diagram_id}).mappings().first() if row is None: return jsonify({'error': 'Diagram not found'}), 404 image_path = row['image_path'] or '' image_url = '/' + image_path if image_path and not image_path.startswith('/') else image_path diagram = {'id': row['id'], 'name': row['name'], 'name_es': row['name_es'], 'group_id': row['group_id'], 'group_name': row['group_name'], 'image_path': image_path, 'image_url': image_url, 'svg_content': None, 'width': None, 'height': None, 'hotspots': []} hotspot_rows = session.execute(text(""" SELECT h.id_dgr_hotspot AS id, h.part_id, h.callout_number, sh.name_shape AS shape, h.coords, p.name_part AS part_name, p.oem_part_number AS part_number FROM diagram_hotspots h LEFT JOIN parts p ON h.part_id = p.id_part LEFT JOIN shapes sh ON h.id_shape = sh.id_shape WHERE h.diagram_id = :did ORDER BY h.callout_number """), {'did': diagram_id}).mappings().all() for hr in hotspot_rows: diagram['hotspots'].append({'id': hr['id'], 'part_id': hr['part_id'], 'callout_number': hr['callout_number'], 'label': None, 'shape': hr['shape'], 'coords': hr['coords'], 'color': None, 'part_name': hr['part_name'], 'part_number': hr['part_number']}) return jsonify(diagram) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/diagrams//hotspots') def api_diagram_hotspots(diagram_id): session = Session() try: rows = session.execute(text(""" SELECT h.id_dgr_hotspot AS id, h.part_id, h.callout_number, sh.name_shape AS shape, h.coords, p.name_part AS part_name, p.oem_part_number AS part_number FROM diagram_hotspots h LEFT JOIN parts p ON h.part_id = p.id_part LEFT JOIN shapes sh ON h.id_shape = sh.id_shape WHERE h.diagram_id = :did ORDER BY h.callout_number LIMIT 500 """), {'did': diagram_id}).mappings().all() return jsonify([{'id': r['id'], 'part_id': r['part_id'], 'callout_number': r['callout_number'], 'label': None, 'shape': r['shape'], 'coords': r['coords'], 'color': None, 'part_name': r['part_name'], 'part_number': r['part_number']} for r in rows]) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/groups//diagrams') def api_group_diagrams(group_id): session = Session() try: rows = session.execute(text(""" SELECT id_diagram AS id, name_diagram AS name, name_es, thumbnail_path, display_order FROM diagrams WHERE group_id = :gid ORDER BY display_order, name_diagram LIMIT 100 """), {'gid': group_id}).mappings().all() return jsonify([{'id': r['id'], 'name': r['name'], 'name_es': r['name_es'], 'thumbnail_path': r['thumbnail_path'], 'display_order': r['display_order']} for r in rows]) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/vehicles//diagrams') def api_vehicle_diagrams(mye_id): session = Session() try: rows = session.execute(text(""" SELECT DISTINCT d.id_diagram AS id, d.name_diagram AS name, d.name_es, d.group_id, d.image_path, pg.name_part_group AS group_name, pc.id_part_category AS category_id, pc.name_part_category AS category_name, d.thumbnail_path, vd.notes FROM vehicle_diagrams vd JOIN diagrams d ON vd.diagram_id = d.id_diagram JOIN part_groups pg ON d.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category WHERE vd.model_year_engine_id = :mye_id ORDER BY pc.display_order, pg.display_order, d.display_order LIMIT 200 """), {'mye_id': mye_id}).mappings().all() diagrams = [] for r in rows: ip = r['image_path'] or '' iu = '/' + ip if ip and not ip.startswith('/') else ip diagrams.append({'id': r['id'], 'name': r['name'], 'name_es': r['name_es'], 'group_id': r['group_id'], 'group_name': r['group_name'], 'category_id': r['category_id'], 'category_name': r['category_name'], 'image_url': iu, 'thumbnail_path': r['thumbnail_path'], 'notes': r['notes']}) return jsonify(diagrams) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/diagrams//parts') def api_diagram_parts(diagram_id): session = Session() try: mye_id = request.args.get('mye_id', type=int) q = """ SELECT DISTINCT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, p.description, pg.id_part_group AS group_id, pg.name_part_group AS group_name, pg.name_es AS group_name_es FROM vehicle_diagrams vd JOIN vehicle_parts vp ON vp.model_year_engine_id = vd.model_year_engine_id JOIN parts p ON vp.part_id = p.id_part JOIN part_groups pg ON p.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category WHERE vd.diagram_id = :did AND pc.id_part_category IN (10, 11) """ params = {'did': diagram_id} if mye_id: q += " AND vd.model_year_engine_id = :mye_id" params['mye_id'] = mye_id q += " ORDER BY pg.name_part_group, p.oem_part_number LIMIT 200" rows = session.execute(text(q), params).mappings().all() # Batch cross-refs (N+1 fix) xrefs_map = {} if rows: part_ids = list(set(r['id'] for r in rows)) in_params = {} in_pl = [] for i, pid in enumerate(part_ids): in_params[f'pid_{i}'] = pid in_pl.append(f':pid_{i}') xref_rows = session.execute(text(f""" SELECT part_id, cross_reference_number, source_ref AS source FROM part_cross_references WHERE part_id IN ({', '.join(in_pl)}) """), in_params).mappings().all() for xr in xref_rows: xrefs_map.setdefault(xr['part_id'], []).append({'number': xr['cross_reference_number'], 'source': xr['source']}) return jsonify([{'id': r['id'], 'part_number': r['oem_part_number'], 'name': r['name'], 'name_es': r['name_es'], 'description': r['description'], 'group_name': r['group_name'], 'group_name_es': r['group_name_es'], 'cross_references': xrefs_map.get(r['id'], [])} for r in rows]) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/diagrams/search') def api_diagrams_search(): session = Session() try: q = request.args.get('q', '').strip() brand = request.args.get('brand', '').strip() model = request.args.get('model', '').strip() if q: rows = session.execute(text(""" SELECT DISTINCT d.id_diagram AS id, d.name_diagram AS name, d.name_es, d.image_path, d.source_diagram AS source FROM diagrams d WHERE d.name_diagram ILIKE :q OR d.name_es ILIKE :q ORDER BY d.name_diagram LIMIT 50 """), {'q': '%' + q + '%'}).mappings().all() elif brand or model: sql = """ SELECT DISTINCT d.id_diagram AS id, d.name_diagram AS name, d.name_es, d.image_path, d.source_diagram AS source FROM diagrams d JOIN vehicle_diagrams vd ON vd.diagram_id = d.id_diagram JOIN model_year_engine mye ON vd.model_year_engine_id = mye.id_mye JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand WHERE 1=1 """ params = {} if brand: sql += " AND b.name_brand ILIKE :brand" params['brand'] = brand if model: sql += " AND m.name_model ILIKE :model" params['model'] = model sql += " ORDER BY d.name_diagram LIMIT 50" rows = session.execute(text(sql), params).mappings().all() else: rows = session.execute(text(""" SELECT d.id_diagram AS id, d.name_diagram AS name, d.name_es, d.image_path, d.source_diagram AS source FROM diagrams d WHERE d.source_diagram = 'MOOG Catalog' ORDER BY d.name_diagram LIMIT 50 """)).mappings().all() return jsonify([{'id': r['id'], 'name': r['name'], 'name_es': r['name_es'], 'image_path': r['image_path'], 'source': r['source']} for r in rows]) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/hotspots/') def api_hotspot_detail(hotspot_id): session = Session() try: row = session.execute(text(""" SELECT h.id_dgr_hotspot AS id, h.diagram_id, h.part_id, h.callout_number, sh.name_shape AS shape, h.coords FROM diagram_hotspots h LEFT JOIN shapes sh ON h.id_shape = sh.id_shape WHERE h.id_dgr_hotspot = :hid """), {'hid': hotspot_id}).mappings().first() if row is None: return jsonify({'error': 'Hotspot not found'}), 404 hotspot = {'id': row['id'], 'diagram_id': row['diagram_id'], 'part_id': row['part_id'], 'callout_number': row['callout_number'], 'label': None, 'shape': row['shape'], 'coords': row['coords'], 'color': None, 'part': None} if row['part_id']: pr = session.execute(text(""" SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, pg.name_part_group AS group_name, pc.name_part_category AS category_name FROM parts p JOIN part_groups pg ON p.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category WHERE p.id_part = :pid """), {'pid': row['part_id']}).mappings().first() if pr: hotspot['part'] = {'id': pr['id'], 'oem_part_number': pr['oem_part_number'], 'name': pr['name'], 'name_es': pr['name_es'], 'group_name': pr['group_name'], 'category_name': pr['category_name']} return jsonify(hotspot) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() # ============================================================================ # Search and VIN Endpoints # ============================================================================ def validate_vin(vin): if not vin or len(vin) != 17: return False return bool(re.compile(r'^[A-HJ-NPR-Z0-9]{17}$', re.IGNORECASE).match(vin)) def find_vehicle_in_terms(session, terms): if len(terms) < 2: return None, terms year_terms = [] other_terms = [] for t in terms: if t.isdigit() and 1980 <= int(t) <= 2030: year_terms.append(t) else: other_terms.append(t) if not other_terms: return None, terms best_match = None used_terms = [] for num_terms in range(min(3, len(other_terms)), 0, -1): if best_match: break for i in range(len(other_terms) - num_terms + 1): test_terms = other_terms[i:i + num_terms] if year_terms: test_terms = test_terms + year_terms[:1] where_clauses = [] params = {} for idx, t in enumerate(test_terms): tp = f"%{t}%" where_clauses.append(f"(b.name_brand ILIKE :t{idx}_b OR m.name_model ILIKE :t{idx}_m OR CAST(y.year_car AS TEXT) ILIKE :t{idx}_y)") params[f't{idx}_b'] = tp params[f't{idx}_m'] = tp params[f't{idx}_y'] = tp where_sql = " AND ".join(where_clauses) row = session.execute(text(f""" SELECT mye.id_mye AS id, b.name_brand AS brand, m.name_model AS model, y.year_car AS year, e.name_engine AS engine FROM model_year_engine mye JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand JOIN years y ON mye.year_id = y.id_year JOIN engines e ON mye.engine_id = e.id_engine WHERE {where_sql} ORDER BY y.year_car DESC LIMIT 1 """), params).mappings().first() if row: best_match = {'id': row['id'], 'brand': row['brand'], 'model': row['model'], 'year': row['year'], 'engine': row['engine']} used_terms = test_terms break if best_match: remaining = [] used_lower = [t.lower() for t in used_terms] for t in terms: if t.lower() not in used_lower: remaining.append(t) else: used_lower.remove(t.lower()) return best_match, remaining return None, terms @app.route('/api/search') def api_search(): session = Session() try: q = request.args.get('q', '').strip() search_type = request.args.get('type', 'all') category_id = request.args.get('category_id', type=int) limit = request.args.get('limit', 50, type=int) offset = request.args.get('offset', 0, type=int) if not q: return jsonify({'error': 'Search query is required'}), 400 results = {'parts': [], 'vehicles': [], 'vehicle_parts': [], 'matched_vehicle': None, 'total_count': 0} terms = q.split() # Combined vehicle + part search if len(terms) >= 2 and search_type == 'all': matched_vehicle, remaining_terms = find_vehicle_in_terms(session, terms) if matched_vehicle and remaining_terms: results['matched_vehicle'] = matched_vehicle mv = matched_vehicle vp_where_clauses = [] vp_params = {'mv_brand': mv['brand'], 'mv_model': mv['model'], 'mv_year': mv['year'], 'limit': limit} for i, t in enumerate(remaining_terms): tp = f"%{t}%" vp_where_clauses.append(f"(p.name_part ILIKE :vp{i}_n OR p.name_es ILIKE :vp{i}_ne OR p.oem_part_number ILIKE :vp{i}_o OR pg.name_part_group ILIKE :vp{i}_g)") vp_params[f'vp{i}_n'] = tp vp_params[f'vp{i}_ne'] = tp vp_params[f'vp{i}_o'] = tp vp_params[f'vp{i}_g'] = tp vp_where_sql = " AND ".join(vp_where_clauses) rows = session.execute(text(f""" SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, pg.name_part_group AS group_name, pg.id_part_group AS group_id, pc.name_part_category AS category_name, pc.id_part_category AS category_id, vp.quantity_required, pp.name_position_part AS position, p.image_url FROM vehicle_parts vp JOIN parts p ON vp.part_id = p.id_part JOIN part_groups pg ON p.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category LEFT JOIN position_part pp ON vp.id_position_part = pp.id_position_part WHERE vp.model_year_engine_id IN ( SELECT mye.id_mye FROM model_year_engine mye JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand JOIN years y ON mye.year_id = y.id_year WHERE b.name_brand ILIKE :mv_brand AND m.name_model ILIKE :mv_model AND y.year_car = :mv_year ) AND ({vp_where_sql}) ORDER BY p.name_part LIMIT :limit """), vp_params).mappings().all() for r in rows: results['vehicle_parts'].append({'id': r['id'], 'oem_part_number': r['oem_part_number'], 'name': r['name'], 'name_es': r['name_es'], 'image_url': r.get('image_url'), 'group_name': r['group_name'], 'group_id': r['group_id'], 'category_name': r['category_name'], 'category_id': r['category_id'], 'quantity': r['quantity_required'], 'position': r['position'], 'match_type': 'vehicle_part'}) if results['vehicle_parts']: results['total_count'] = len(results['vehicle_parts']) return jsonify(results) # Search parts if search_type in ('parts', 'all') and terms: where_clauses = [] params = {} for i, t in enumerate(terms): tp = f"%{t}%" where_clauses.append(f"(p.name_part ILIKE :p{i}_n OR p.name_es ILIKE :p{i}_ne OR p.oem_part_number ILIKE :p{i}_o OR pg.name_part_group ILIKE :p{i}_g OR pc.name_part_category ILIKE :p{i}_c)") params[f'p{i}_n'] = tp params[f'p{i}_ne'] = tp params[f'p{i}_o'] = tp params[f'p{i}_g'] = tp params[f'p{i}_c'] = tp where_sql = " AND ".join(where_clauses) cat_filter = "" if category_id: cat_filter = " AND pc.id_part_category = :cat_id" params['cat_id'] = category_id params['first_term'] = f"{terms[0]}%" params['limit'] = limit params['offset'] = offset rows = session.execute(text(f""" SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, pg.name_part_group AS group_name, pc.name_part_category AS category_name, p.image_url FROM parts p JOIN part_groups pg ON p.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category WHERE ({where_sql}){cat_filter} ORDER BY CASE WHEN p.oem_part_number ILIKE :first_term THEN 1 WHEN p.name_part ILIKE :first_term THEN 2 ELSE 3 END, p.name_part LIMIT :limit OFFSET :offset """), params).mappings().all() for r in rows: results['parts'].append({'id': r['id'], 'oem_part_number': r['oem_part_number'], 'name': r['name'], 'name_es': r['name_es'], 'image_url': r['image_url'], 'group_name': r['group_name'], 'category_name': r['category_name'], 'match_type': 'oem'}) # Aftermarket search if not category_id: af_params = {'af_limit': limit, 'af_offset': offset} af_clauses = [] for i, t in enumerate(terms): af_clauses.append(f"ap.part_number ILIKE :af{i}") af_params[f'af{i}'] = f"%{t}%" af_rows = session.execute(text(f""" SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, pg.name_part_group AS group_name, pc.name_part_category AS category_name, ap.part_number AS matched_number, p.image_url FROM aftermarket_parts ap JOIN parts p ON ap.oem_part_id = p.id_part JOIN part_groups pg ON p.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category WHERE {' AND '.join(af_clauses)} LIMIT :af_limit OFFSET :af_offset """), af_params).mappings().all() for r in af_rows: if not any(p['id'] == r['id'] for p in results['parts']): results['parts'].append({'id': r['id'], 'oem_part_number': r['oem_part_number'], 'name': r['name'], 'name_es': r['name_es'], 'image_url': r['image_url'], 'group_name': r['group_name'], 'category_name': r['category_name'], 'matched_number': r['matched_number'], 'match_type': 'aftermarket'}) # Cross-reference search cr_params = {'cr_limit': limit, 'cr_offset': offset} cr_clauses = [] for i, t in enumerate(terms): cr_clauses.append(f"pcr.cross_reference_number ILIKE :cr{i}") cr_params[f'cr{i}'] = f"%{t}%" cr_rows = session.execute(text(f""" SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, pg.name_part_group AS group_name, pc.name_part_category AS category_name, pcr.cross_reference_number AS matched_number, p.image_url FROM part_cross_references pcr JOIN parts p ON pcr.part_id = p.id_part JOIN part_groups pg ON p.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category WHERE {' AND '.join(cr_clauses)} LIMIT :cr_limit OFFSET :cr_offset """), cr_params).mappings().all() for r in cr_rows: if not any(p['id'] == r['id'] for p in results['parts']): results['parts'].append({'id': r['id'], 'oem_part_number': r['oem_part_number'], 'name': r['name'], 'name_es': r['name_es'], 'image_url': r['image_url'], 'group_name': r['group_name'], 'category_name': r['category_name'], 'matched_number': r['matched_number'], 'match_type': 'cross_reference'}) # Search vehicles if search_type in ('vehicles', 'all') and terms: v_params = {'v_limit': limit, 'v_offset': offset} v_clauses = [] for i, t in enumerate(terms): tp = f"%{t}%" v_clauses.append(f"(b.name_brand ILIKE :v{i}_b OR m.name_model ILIKE :v{i}_m OR CAST(y.year_car AS TEXT) ILIKE :v{i}_y OR e.name_engine ILIKE :v{i}_e)") v_params[f'v{i}_b'] = tp v_params[f'v{i}_m'] = tp v_params[f'v{i}_y'] = tp v_params[f'v{i}_e'] = tp v_rows = session.execute(text(f""" SELECT mye.id_mye AS id, b.name_brand AS brand, m.name_model AS model, y.year_car AS year, e.name_engine AS engine FROM model_year_engine mye JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand JOIN years y ON mye.year_id = y.id_year JOIN engines e ON mye.engine_id = e.id_engine WHERE {' AND '.join(v_clauses)} ORDER BY y.year_car DESC, b.name_brand, m.name_model LIMIT :v_limit OFFSET :v_offset """), v_params).mappings().all() for r in v_rows: results['vehicles'].append({'id': r['id'], 'brand': r['brand'], 'model': r['model'], 'year': r['year'], 'engine': r['engine']}) results['total_count'] = len(results['parts']) + len(results['vehicles']) return jsonify(results) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/search/parts') def api_search_parts(): session = Session() try: q = request.args.get('q', '').strip() category_id = request.args.get('category_id', type=int) group_id = request.args.get('group_id', type=int) page = request.args.get('page', 1, type=int) per_page = min(request.args.get('per_page', 50, type=int), 100) offset = (page - 1) * per_page if not q: return jsonify({'error': 'Search query is required'}), 400 filter_clause = "" params = {'q': q, 'per_page': per_page, 'offset': offset} if category_id: filter_clause += " AND pg.category_id = :cid" params['cid'] = category_id if group_id: filter_clause += " AND p.group_id = :gid" params['gid'] = group_id total_count = session.execute(text(f""" SELECT COUNT(*) AS total FROM parts p JOIN part_groups pg ON p.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category WHERE p.search_vector @@ plainto_tsquery('spanish', :q) {filter_clause} """), params).mappings().first()['total'] rows = session.execute(text(f""" SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, p.description, pg.name_part_group AS group_name, pc.name_part_category AS category_name, ts_rank(p.search_vector, plainto_tsquery('spanish', :q)) AS rank FROM parts p JOIN part_groups pg ON p.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category WHERE p.search_vector @@ plainto_tsquery('spanish', :q) {filter_clause} ORDER BY rank DESC LIMIT :per_page OFFSET :offset """), params).mappings().all() parts = [{'id': r['id'], 'oem_part_number': r['oem_part_number'], 'name': r['name'], 'name_es': r['name_es'], 'description': r['description'], 'group_name': r['group_name'], 'category_name': r['category_name'], 'rank': float(r['rank'])} for r in rows] total_pages = (total_count + per_page - 1) // per_page return jsonify({'data': parts, 'pagination': {'page': page, 'per_page': per_page, 'total': total_count, 'total_pages': total_pages}}) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/vin/decode/') def api_vin_decode(vin): session = Session() try: vin = vin.upper().strip() if not validate_vin(vin): return jsonify({'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).'}), 400 cached_row = session.execute(text(""" SELECT vin, make, model, year, engine_info, body_class, drive_type, model_year_engine_id, created_at, expires_at FROM vin_cache WHERE vin = :vin AND expires_at > NOW() """), {'vin': vin}).mappings().first() if cached_row: engine_info_data = {} if cached_row['engine_info']: try: engine_info_data = json_module.loads(cached_row['engine_info']) except Exception: engine_info_data = {'raw': cached_row['engine_info']} cached_data = {'vin': cached_row['vin'], 'make': cached_row['make'], 'model': cached_row['model'], 'year': cached_row['year'], 'engine_info': engine_info_data, 'body_class': cached_row['body_class'], 'drive_type': cached_row['drive_type'], 'matched_vehicle': None, 'cached': True} if cached_row['model_year_engine_id']: mye_row = session.execute(text(""" SELECT mye.id_mye AS id, b.name_brand AS brand, m.name_model AS model, y.year_car AS year, e.name_engine AS engine FROM model_year_engine mye JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand JOIN years y ON mye.year_id = y.id_year JOIN engines e ON mye.engine_id = e.id_engine WHERE mye.id_mye = :mye_id """), {'mye_id': cached_row['model_year_engine_id']}).mappings().first() if mye_row: cached_data['matched_vehicle'] = {'mye_id': mye_row['id'], 'brand': mye_row['brand'], 'model': mye_row['model'], 'year': mye_row['year'], 'engine': mye_row['engine']} return jsonify(cached_data) # Call NHTSA API nhtsa_url = f'https://vpic.nhtsa.dot.gov/api/vehicles/DecodeVin/{vin}?format=json' try: req = urllib.request.Request(nhtsa_url, headers={'User-Agent': 'NexusAutoparts/2.0'}) with urllib.request.urlopen(req, timeout=10) as response: nhtsa_data = json_module.loads(response.read().decode('utf-8')) except urllib.error.URLError as e: return jsonify({'error': f'Failed to connect to NHTSA API: {str(e)}'}), 503 except urllib.error.HTTPError as e: return jsonify({'error': f'NHTSA API error: {e.code}'}), 502 except Exception as e: return jsonify({'error': f'Error calling NHTSA API: {str(e)}'}), 500 nhtsa_results = {item['Variable']: item['Value'] for item in nhtsa_data.get('Results', [])} make = nhtsa_results.get('Make', '') model = nhtsa_results.get('Model', '') year_str = nhtsa_results.get('ModelYear', '') year = int(year_str) if year_str and year_str.isdigit() else None engine_config = nhtsa_results.get('EngineConfiguration', '') cylinders_str = nhtsa_results.get('EngineCylinders', '') cylinders = int(cylinders_str) if cylinders_str and cylinders_str.isdigit() else None displacement_str = nhtsa_results.get('DisplacementL', '') displacement_l = float(displacement_str) if displacement_str else None fuel_type = nhtsa_results.get('FuelTypePrimary', '') body_class = nhtsa_results.get('BodyClass', '') drive_type = nhtsa_results.get('DriveType', '') matched_mye_id = None matched_vehicle = None if make and model and year: mye_row = session.execute(text(""" SELECT mye.id_mye AS id, b.name_brand AS brand, m.name_model AS model, y.year_car AS year, e.name_engine AS engine FROM model_year_engine mye JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand JOIN years y ON mye.year_id = y.id_year JOIN engines e ON mye.engine_id = e.id_engine WHERE b.name_brand ILIKE :make AND m.name_model ILIKE :model AND y.year_car = :year LIMIT 1 """), {'make': make, 'model': model, 'year': year}).mappings().first() if mye_row: matched_mye_id = mye_row['id'] matched_vehicle = {'mye_id': mye_row['id'], 'brand': mye_row['brand'], 'model': mye_row['model'], 'year': mye_row['year'], 'engine': mye_row['engine']} expires_at = datetime.now() + timedelta(days=30) engine_info = json_module.dumps({'configuration': engine_config, 'cylinders': cylinders, 'displacement_l': displacement_l, 'fuel_type': fuel_type}) session.execute(text(""" INSERT INTO vin_cache (vin, decoded_data, make, model, year, engine_info, body_class, drive_type, model_year_engine_id, expires_at) VALUES (:vin, :decoded_data::jsonb, :make, :model, :year, :engine_info, :body_class, :drive_type, :mye_id, :expires_at) ON CONFLICT (vin) DO UPDATE SET decoded_data = EXCLUDED.decoded_data, make = EXCLUDED.make, model = EXCLUDED.model, year = EXCLUDED.year, engine_info = EXCLUDED.engine_info, body_class = EXCLUDED.body_class, drive_type = EXCLUDED.drive_type, model_year_engine_id = EXCLUDED.model_year_engine_id, expires_at = EXCLUDED.expires_at """), {'vin': vin, 'decoded_data': json_module.dumps(nhtsa_results), 'make': make, 'model': model, 'year': year, 'engine_info': engine_info, 'body_class': body_class, 'drive_type': drive_type, 'mye_id': matched_mye_id, 'expires_at': expires_at}) session.commit() return jsonify({'vin': vin, 'make': make, 'model': model, 'year': year, 'engine_info': {'configuration': engine_config, 'cylinders': cylinders, 'displacement_l': displacement_l, 'fuel_type': fuel_type}, 'body_class': body_class, 'drive_type': drive_type, 'matched_vehicle': matched_vehicle, 'cached': False}) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/vin//parts') def api_vin_parts(vin): session = Session() try: vin = vin.upper().strip() if not validate_vin(vin): return jsonify({'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).'}), 400 category_id = request.args.get('category_id', type=int) cached_row = session.execute(text("SELECT vin, make, model, year, model_year_engine_id FROM vin_cache WHERE vin = :vin"), {'vin': vin}).mappings().first() if not cached_row: return jsonify({'error': 'VIN not found in cache. Please decode the VIN first using /api/vin/decode/'}), 404 mye_id = cached_row['model_year_engine_id'] vehicle_info = {'vin': cached_row['vin'], 'make': cached_row['make'], 'model': cached_row['model'], 'year': cached_row['year'], 'mye_id': mye_id} if not mye_id: return jsonify({'vin': vin, 'vehicle_info': vehicle_info, 'categories': [], 'message': 'No matching vehicle configuration found in database. Use /api/vin//match to manually link.'}) params = {'mye_id': mye_id} cat_filter = "" if category_id: cat_filter = " AND pc.id_part_category = :cid" params['cid'] = category_id rows = session.execute(text(f""" SELECT pc.id_part_category AS category_id, pc.name_part_category AS category_name, pc.name_es AS category_name_es, p.id_part AS part_id, p.oem_part_number, p.name_part AS part_name, p.name_es AS part_name_es, pg.name_part_group AS group_name, vp.quantity_required, pp.name_position_part AS position FROM vehicle_parts vp JOIN parts p ON vp.part_id = p.id_part JOIN part_groups pg ON p.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category LEFT JOIN position_part pp ON vp.id_position_part = pp.id_position_part WHERE vp.model_year_engine_id = :mye_id {cat_filter} ORDER BY pc.display_order, pg.display_order, p.name_part """), params).mappings().all() categories_dict = {} for r in rows: cid = r['category_id'] if cid not in categories_dict: categories_dict[cid] = {'id': cid, 'name': r['category_name'], 'name_es': r['category_name_es'], 'parts': []} categories_dict[cid]['parts'].append({'id': r['part_id'], 'oem_part_number': r['oem_part_number'], 'name': r['part_name'], 'name_es': r['part_name_es'], 'group_name': r['group_name'], 'quantity_required': r['quantity_required'], 'position': r['position']}) return jsonify({'vin': vin, 'vehicle_info': vehicle_info, 'categories': list(categories_dict.values())}) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/vin//match') def api_vin_match(vin): session = Session() try: vin = vin.upper().strip() if not validate_vin(vin): return jsonify({'error': 'Invalid VIN format. VIN must be 17 alphanumeric characters (no I, O, Q).'}), 400 mye_id = request.args.get('mye_id', type=int) if not mye_id: return jsonify({'error': 'mye_id parameter is required'}), 400 mye_row = session.execute(text("SELECT id_mye FROM model_year_engine WHERE id_mye = :mid"), {'mid': mye_id}).mappings().first() if not mye_row: return jsonify({'error': f'model_year_engine_id {mye_id} not found'}), 404 vin_row = session.execute(text("SELECT vin FROM vin_cache WHERE vin = :vin"), {'vin': vin}).mappings().first() if vin_row: session.execute(text("UPDATE vin_cache SET model_year_engine_id = :mid WHERE vin = :vin"), {'mid': mye_id, 'vin': vin}) else: expires_at = datetime.now() + timedelta(days=30) session.execute(text(""" INSERT INTO vin_cache (vin, decoded_data, model_year_engine_id, created_at, expires_at) VALUES (:vin, '{}'::jsonb, :mid, NOW(), :expires_at) """), {'vin': vin, 'mid': mye_id, 'expires_at': expires_at}) session.commit() return jsonify({'success': True, 'vin': vin, 'mye_id': mye_id}) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() # ============================================================================ # Admin Endpoints # ============================================================================ @app.route('/api/admin/stats') def api_admin_stats(): session = Session() try: stats = {} # Small tables: exact count for table, key in [('part_categories', 'categories'), ('part_groups', 'groups'), ('manufacturers', 'manufacturers')]: stats[key] = session.execute(text(f"SELECT COUNT(*) FROM {table}")).scalar() # Large tables: use pg estimate for speed for table, key in [('parts', 'parts'), ('aftermarket_parts', 'aftermarket'), ('vehicle_parts', 'fitment')]: est = session.execute(text( "SELECT reltuples::bigint FROM pg_class WHERE relname = :t" ), {'t': table}).scalar() if est and est > 0: stats[key] = est else: stats[key] = session.execute(text(f"SELECT COUNT(*) FROM {table}")).scalar() return jsonify(stats) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() # ---- Categories CRUD ---- @app.route('/api/admin/categories', methods=['POST']) def api_admin_create_category(): session = Session() try: data = request.get_json() result = session.execute(text(""" INSERT INTO part_categories (name_part_category, name_es, slug, icon_name, display_order, parent_id) VALUES (:name, :name_es, :slug, :icon_name, :display_order, :parent_id) RETURNING id_part_category """), {'name': data['name'], 'name_es': data.get('name_es'), 'slug': data.get('slug') or data['name'].lower().replace(' ', '-'), 'icon_name': data.get('icon_name'), 'display_order': data.get('display_order', 0), 'parent_id': data.get('parent_id')}) new_id = result.scalar() session.commit() return jsonify({'id': new_id, 'message': 'Category created'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/categories/', methods=['PUT']) def api_admin_update_category(category_id): session = Session() try: data = request.get_json() session.execute(text(""" UPDATE part_categories SET name_part_category = :name, name_es = :name_es, slug = :slug, icon_name = :icon_name, display_order = :display_order WHERE id_part_category = :id """), {'name': data['name'], 'name_es': data.get('name_es'), 'slug': data.get('slug'), 'icon_name': data.get('icon_name'), 'display_order': data.get('display_order', 0), 'id': category_id}) session.commit() return jsonify({'message': 'Category updated'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/categories/', methods=['DELETE']) def api_admin_delete_category(category_id): session = Session() try: session.execute(text("DELETE FROM part_categories WHERE id_part_category = :id"), {'id': category_id}) session.commit() return jsonify({'message': 'Category deleted'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() # ---- Groups CRUD ---- @app.route('/api/admin/groups') def api_admin_list_groups(): session = Session() try: category_id = request.args.get('category_id', type=int) q = """ SELECT pg.id_part_group AS id, pg.name_part_group AS name, pg.name_es, pg.category_id, pg.display_order, pg.slug, pc.name_part_category AS category_name FROM part_groups pg LEFT JOIN part_categories pc ON pg.category_id = pc.id_part_category WHERE 1=1 """ params = {} if category_id: q += " AND pg.category_id = :cid" params['cid'] = category_id q += " ORDER BY pg.display_order, pg.name_part_group" rows = session.execute(text(q), params).mappings().all() return jsonify([{'id': r['id'], 'name': r['name'], 'name_es': r['name_es'], 'category_id': r['category_id'], 'category_name': r['category_name'], 'display_order': r['display_order'], 'slug': r['slug']} for r in rows]) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/groups', methods=['POST']) def api_admin_create_group(): session = Session() try: data = request.get_json() result = session.execute(text(""" INSERT INTO part_groups (category_id, name_part_group, name_es, slug, display_order) VALUES (:category_id, :name, :name_es, :slug, :display_order) RETURNING id_part_group """), {'category_id': data['category_id'], 'name': data['name'], 'name_es': data.get('name_es'), 'slug': data.get('slug') or data['name'].lower().replace(' ', '-'), 'display_order': data.get('display_order', 0)}) new_id = result.scalar() session.commit() return jsonify({'id': new_id, 'message': 'Group created'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/groups/', methods=['PUT']) def api_admin_update_group(group_id): session = Session() try: data = request.get_json() session.execute(text(""" UPDATE part_groups SET category_id = :category_id, name_part_group = :name, name_es = :name_es, display_order = :display_order WHERE id_part_group = :id """), {'category_id': data['category_id'], 'name': data['name'], 'name_es': data.get('name_es'), 'display_order': data.get('display_order', 0), 'id': group_id}) session.commit() return jsonify({'message': 'Group updated'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/groups/', methods=['DELETE']) def api_admin_delete_group(group_id): session = Session() try: session.execute(text("DELETE FROM part_groups WHERE id_part_group = :id"), {'id': group_id}) session.commit() return jsonify({'message': 'Group deleted'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() # ---- Parts CRUD ---- @app.route('/api/admin/parts', methods=['POST']) def api_admin_create_part(): session = Session() try: data = request.get_json() result = session.execute(text(""" INSERT INTO parts (oem_part_number, name_part, name_es, group_id, description, description_es, weight_kg, id_material) VALUES (:oem, :name, :name_es, :group_id, :desc, :desc_es, :weight, (SELECT id_material FROM materials WHERE name_material = :material)) RETURNING id_part """), {'oem': data['oem_part_number'], 'name': data['name'], 'name_es': data.get('name_es'), 'group_id': data['group_id'], 'desc': data.get('description'), 'desc_es': data.get('description_es'), 'weight': data.get('weight_kg'), 'material': data.get('material')}) new_id = result.scalar() session.commit() return jsonify({'id': new_id, 'message': 'Part created'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/parts/', methods=['PUT']) def api_admin_update_part(part_id): session = Session() try: data = request.get_json() session.execute(text(""" UPDATE parts SET oem_part_number = :oem, name_part = :name, name_es = :name_es, group_id = :group_id, description = :desc, description_es = :desc_es, weight_kg = :weight, id_material = (SELECT id_material FROM materials WHERE name_material = :material) WHERE id_part = :id """), {'oem': data['oem_part_number'], 'name': data['name'], 'name_es': data.get('name_es'), 'group_id': data['group_id'], 'desc': data.get('description'), 'desc_es': data.get('description_es'), 'weight': data.get('weight_kg'), 'material': data.get('material'), 'id': part_id}) session.commit() return jsonify({'message': 'Part updated'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/parts/', methods=['DELETE']) def api_admin_delete_part(part_id): session = Session() try: session.execute(text("DELETE FROM parts WHERE id_part = :id"), {'id': part_id}) session.commit() return jsonify({'message': 'Part deleted'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() # ---- Manufacturers CRUD ---- @app.route('/api/admin/manufacturers', methods=['POST']) def api_admin_create_manufacturer(): session = Session() try: data = request.get_json() result = session.execute(text(""" INSERT INTO manufacturers (name_manufacture, id_type_manu, id_quality_tier, id_country, website) VALUES (:name, (SELECT id_type_manu FROM manufacture_type WHERE name_type_manu = :type), (SELECT id_quality_tier FROM quality_tier WHERE name_quality = :qt), (SELECT id_country FROM countries WHERE name_country = :country), :website) RETURNING id_manufacture """), {'name': data['name'], 'type': data.get('type', 'aftermarket'), 'qt': data.get('quality_tier', 'standard'), 'country': data.get('country'), 'website': data.get('website')}) new_id = result.scalar() session.commit() return jsonify({'id': new_id, 'message': 'Manufacturer created'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/manufacturers/', methods=['PUT']) def api_admin_update_manufacturer(manufacturer_id): session = Session() try: data = request.get_json() session.execute(text(""" UPDATE manufacturers SET name_manufacture = :name, id_type_manu = (SELECT id_type_manu FROM manufacture_type WHERE name_type_manu = :type), id_quality_tier = (SELECT id_quality_tier FROM quality_tier WHERE name_quality = :qt), id_country = (SELECT id_country FROM countries WHERE name_country = :country), website = :website WHERE id_manufacture = :id """), {'name': data['name'], 'type': data.get('type'), 'qt': data.get('quality_tier'), 'country': data.get('country'), 'website': data.get('website'), 'id': manufacturer_id}) session.commit() return jsonify({'message': 'Manufacturer updated'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/manufacturers/', methods=['DELETE']) def api_admin_delete_manufacturer(manufacturer_id): session = Session() try: session.execute(text("DELETE FROM manufacturers WHERE id_manufacture = :id"), {'id': manufacturer_id}) session.commit() return jsonify({'message': 'Manufacturer deleted'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() # ---- Aftermarket Parts CRUD ---- @app.route('/api/admin/aftermarket', methods=['POST']) def api_admin_create_aftermarket(): session = Session() try: data = request.get_json() result = session.execute(text(""" INSERT INTO aftermarket_parts (oem_part_id, manufacturer_id, part_number, name_aftermarket_parts, name_es, id_quality_tier, price_usd, warranty_months) VALUES (:oem_part_id, :manufacturer_id, :part_number, :name, :name_es, (SELECT id_quality_tier FROM quality_tier WHERE name_quality = :qt), :price_usd, :warranty_months) RETURNING id_aftermarket_parts """), {'oem_part_id': data['oem_part_id'], 'manufacturer_id': data['manufacturer_id'], 'part_number': data['part_number'], 'name': data.get('name'), 'name_es': data.get('name_es'), 'qt': data.get('quality_tier', 'standard'), 'price_usd': data.get('price_usd'), 'warranty_months': data.get('warranty_months')}) new_id = result.scalar() session.commit() return jsonify({'id': new_id, 'message': 'Aftermarket part created'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/aftermarket/', methods=['PUT']) def api_admin_update_aftermarket(aftermarket_id): session = Session() try: data = request.get_json() session.execute(text(""" UPDATE aftermarket_parts SET oem_part_id = :oem_part_id, manufacturer_id = :manufacturer_id, part_number = :part_number, name_aftermarket_parts = :name, name_es = :name_es, id_quality_tier = (SELECT id_quality_tier FROM quality_tier WHERE name_quality = :qt), price_usd = :price_usd, warranty_months = :warranty_months WHERE id_aftermarket_parts = :id """), {'oem_part_id': data['oem_part_id'], 'manufacturer_id': data['manufacturer_id'], 'part_number': data['part_number'], 'name': data.get('name'), 'name_es': data.get('name_es'), 'qt': data.get('quality_tier'), 'price_usd': data.get('price_usd'), 'warranty_months': data.get('warranty_months'), 'id': aftermarket_id}) session.commit() return jsonify({'message': 'Aftermarket part updated'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/aftermarket/', methods=['DELETE']) def api_admin_delete_aftermarket(aftermarket_id): session = Session() try: session.execute(text("DELETE FROM aftermarket_parts WHERE id_aftermarket_parts = :id"), {'id': aftermarket_id}) session.commit() return jsonify({'message': 'Aftermarket part deleted'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() # ---- Cross-References CRUD ---- @app.route('/api/admin/crossref') def api_admin_list_crossref(): session = Session() try: page = request.args.get('page', 1, type=int) per_page = min(request.args.get('per_page', 50, type=int), 100) offset = (page - 1) * per_page total_count = session.execute(text("SELECT COUNT(*) FROM part_cross_references")).scalar() rows = session.execute(text(""" SELECT pcr.id_part_cross_ref AS id, pcr.part_id, pcr.cross_reference_number, rt.name_ref_type AS reference_type, pcr.source_ref AS source, pcr.notes, p.oem_part_number, p.name_part AS part_name FROM part_cross_references pcr JOIN parts p ON pcr.part_id = p.id_part LEFT JOIN reference_type rt ON pcr.id_ref_type = rt.id_ref_type ORDER BY pcr.id_part_cross_ref DESC LIMIT :limit OFFSET :offset """), {'limit': per_page, 'offset': offset}).mappings().all() refs = [{'id': r['id'], 'part_id': r['part_id'], 'cross_reference_number': r['cross_reference_number'], 'reference_type': r['reference_type'], 'source': r['source'], 'notes': r['notes'], 'oem_part_number': r['oem_part_number'], 'part_name': r['part_name']} for r in rows] total_pages = (total_count + per_page - 1) // per_page return jsonify({'data': refs, 'pagination': {'page': page, 'per_page': per_page, 'total': total_count, 'total_pages': total_pages}}) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/crossref', methods=['POST']) def api_admin_create_crossref(): session = Session() try: data = request.get_json() result = session.execute(text(""" INSERT INTO part_cross_references (part_id, cross_reference_number, id_ref_type, source_ref, notes) VALUES (:part_id, :cross_ref, (SELECT id_ref_type FROM reference_type WHERE name_ref_type = :ref_type), :source, :notes) RETURNING id_part_cross_ref """), {'part_id': data['part_id'], 'cross_ref': data['cross_reference_number'], 'ref_type': data['reference_type'], 'source': data.get('source'), 'notes': data.get('notes')}) new_id = result.scalar() session.commit() return jsonify({'id': new_id, 'message': 'Cross-reference created'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/crossref/', methods=['PUT']) def api_admin_update_crossref(crossref_id): session = Session() try: data = request.get_json() session.execute(text(""" UPDATE part_cross_references SET part_id = :part_id, cross_reference_number = :cross_ref, id_ref_type = (SELECT id_ref_type FROM reference_type WHERE name_ref_type = :ref_type), source_ref = :source, notes = :notes WHERE id_part_cross_ref = :id """), {'part_id': data['part_id'], 'cross_ref': data['cross_reference_number'], 'ref_type': data['reference_type'], 'source': data.get('source'), 'notes': data.get('notes'), 'id': crossref_id}) session.commit() return jsonify({'message': 'Cross-reference updated'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/crossref/', methods=['DELETE']) def api_admin_delete_crossref(crossref_id): session = Session() try: session.execute(text("DELETE FROM part_cross_references WHERE id_part_cross_ref = :id"), {'id': crossref_id}) session.commit() return jsonify({'message': 'Cross-reference deleted'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() # ---- Fitment CRUD ---- @app.route('/api/admin/fitment') def api_admin_list_fitment(): session = Session() try: page = request.args.get('page', 1, type=int) per_page = min(request.args.get('per_page', 50, type=int), 500) offset = (page - 1) * per_page brand = request.args.get('brand') model = request.args.get('model') mye_id = request.args.get('mye_id', type=int) where = " WHERE 1=1" params = {'limit': per_page, 'offset': offset} if mye_id: where += " AND vp.model_year_engine_id = :mye_id" params['mye_id'] = mye_id if brand: where += " AND b.name_brand ILIKE :brand" params['brand'] = brand if model: where += " AND m.name_model ILIKE :model" params['model'] = model base = """ FROM vehicle_parts vp JOIN model_year_engine mye ON vp.model_year_engine_id = mye.id_mye JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand """ total_count = session.execute(text("SELECT COUNT(*) " + base + where), params).scalar() rows = session.execute(text(""" SELECT vp.id_vehicle_part AS id, vp.model_year_engine_id, vp.part_id, vp.quantity_required, pp.name_position_part AS position, vp.fitment_notes, b.name_brand AS brand, m.name_model AS model, y.year_car AS year, e.name_engine AS engine, p.oem_part_number, p.name_part AS part_name FROM vehicle_parts vp JOIN model_year_engine mye ON vp.model_year_engine_id = mye.id_mye JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand JOIN years y ON mye.year_id = y.id_year JOIN engines e ON mye.engine_id = e.id_engine JOIN parts p ON vp.part_id = p.id_part LEFT JOIN position_part pp ON vp.id_position_part = pp.id_position_part """ + where + " ORDER BY vp.id_vehicle_part DESC LIMIT :limit OFFSET :offset"), params).mappings().all() fitments = [dict(r) for r in rows] total_pages = (total_count + per_page - 1) // per_page return jsonify({'data': fitments, 'pagination': {'page': page, 'per_page': per_page, 'total': total_count, 'total_pages': total_pages}}) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/fitment', methods=['POST']) def api_admin_create_fitment(): session = Session() try: data = request.get_json() result = session.execute(text(""" INSERT INTO vehicle_parts (model_year_engine_id, part_id, quantity_required, id_position_part, fitment_notes) VALUES (:mye_id, :part_id, :qty, (SELECT id_position_part FROM position_part WHERE name_position_part = :position), :notes) RETURNING id_vehicle_part """), {'mye_id': data['model_year_engine_id'], 'part_id': data['part_id'], 'qty': data.get('quantity_required', 1), 'position': data.get('position'), 'notes': data.get('fitment_notes')}) new_id = result.scalar() session.commit() return jsonify({'id': new_id, 'message': 'Fitment created'}) except IntegrityError: session.rollback() return jsonify({'error': 'Este fitment ya existe'}), 400 except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/fitment/', methods=['DELETE']) def api_admin_delete_fitment(fitment_id): session = Session() try: session.execute(text("DELETE FROM vehicle_parts WHERE id_vehicle_part = :id"), {'id': fitment_id}) session.commit() return jsonify({'message': 'Fitment deleted'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() # ============================================================================ # CSV Import # ============================================================================ @app.route('/api/admin/import/', methods=['POST']) def api_admin_import_csv(import_type): session = Session() try: data = request.get_json() records = data.get('records', []) if not records: return jsonify({'error': 'No records to import'}), 400 imported = 0 errors = [] for i, rec in enumerate(records): try: if import_type == 'categories': session.execute(text("INSERT INTO part_categories (name_part_category, name_es, slug, icon_name, display_order) VALUES (:name, :name_es, :slug, :icon_name, :do)"), {'name': rec['name'], 'name_es': rec.get('name_es'), 'slug': rec.get('slug') or rec['name'].lower().replace(' ', '-'), 'icon_name': rec.get('icon_name'), 'do': rec.get('display_order', 0)}) elif import_type == 'groups': session.execute(text("INSERT INTO part_groups (category_id, name_part_group, name_es, display_order) VALUES (:cid, :name, :name_es, :do)"), {'cid': rec['category_id'], 'name': rec['name'], 'name_es': rec.get('name_es'), 'do': rec.get('display_order', 0)}) elif import_type == 'parts': session.execute(text("""INSERT INTO parts (oem_part_number, name_part, name_es, group_id, description, description_es, weight_kg, id_material) VALUES (:oem, :name, :name_es, :gid, :desc, :desc_es, :weight, (SELECT id_material FROM materials WHERE name_material = :material))"""), {'oem': rec['oem_part_number'], 'name': rec['name'], 'name_es': rec.get('name_es'), 'gid': rec['group_id'], 'desc': rec.get('description'), 'desc_es': rec.get('description_es'), 'weight': rec.get('weight_kg'), 'material': rec.get('material')}) elif import_type == 'manufacturers': session.execute(text("""INSERT INTO manufacturers (name_manufacture, id_type_manu, id_quality_tier, id_country, website) VALUES (:name, (SELECT id_type_manu FROM manufacture_type WHERE name_type_manu = :type), (SELECT id_quality_tier FROM quality_tier WHERE name_quality = :qt), (SELECT id_country FROM countries WHERE name_country = :country), :website)"""), {'name': rec['name'], 'type': rec.get('type', 'aftermarket'), 'qt': rec.get('quality_tier', 'standard'), 'country': rec.get('country'), 'website': rec.get('website')}) elif import_type == 'aftermarket': session.execute(text("""INSERT INTO aftermarket_parts (oem_part_id, manufacturer_id, part_number, name_aftermarket_parts, name_es, id_quality_tier, price_usd, warranty_months) VALUES (:oem_part_id, :mid, :pn, :name, :name_es, (SELECT id_quality_tier FROM quality_tier WHERE name_quality = :qt), :price, :warranty)"""), {'oem_part_id': rec['oem_part_id'], 'mid': rec['manufacturer_id'], 'pn': rec['part_number'], 'name': rec.get('name'), 'name_es': rec.get('name_es'), 'qt': rec.get('quality_tier', 'standard'), 'price': rec.get('price_usd'), 'warranty': rec.get('warranty_months')}) elif import_type == 'crossref': session.execute(text("""INSERT INTO part_cross_references (part_id, cross_reference_number, id_ref_type, source_ref, notes) VALUES (:pid, :cross_ref, (SELECT id_ref_type FROM reference_type WHERE name_ref_type = :ref_type), :source, :notes)"""), {'pid': rec['part_id'], 'cross_ref': rec['cross_reference_number'], 'ref_type': rec.get('reference_type'), 'source': rec.get('source'), 'notes': rec.get('notes')}) elif import_type == 'fitment': session.execute(text("""INSERT INTO vehicle_parts (model_year_engine_id, part_id, quantity_required, id_position_part, fitment_notes) VALUES (:mye_id, :pid, :qty, (SELECT id_position_part FROM position_part WHERE name_position_part = :position), :notes) ON CONFLICT (model_year_engine_id, part_id, id_position_part) DO NOTHING"""), {'mye_id': rec['model_year_engine_id'], 'pid': rec['part_id'], 'qty': rec.get('quantity_required', 1), 'position': rec.get('position'), 'notes': rec.get('fitment_notes')}) else: return jsonify({'error': f'Unknown import type: {import_type}'}), 400 imported += 1 except Exception as e: errors.append(f"Row {i + 1}: {str(e)}") session.commit() result = {'imported': imported} if errors: result['errors'] = errors[:10] return jsonify(result) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() # ============================================================================ # CSV Export # ============================================================================ @app.route('/api/admin/export/') def api_admin_export_csv(export_type): session = Session() try: page = request.args.get('page', 1, type=int) per_page = min(request.args.get('per_page', 1000, type=int), 10000) offset = (page - 1) * per_page export_queries = { 'categories': ("SELECT id_part_category AS id, name_part_category AS name, name_es, slug, icon_name, display_order FROM part_categories ORDER BY display_order, name_part_category", "part_categories"), 'groups': ("SELECT id_part_group AS id, category_id, name_part_group AS name, name_es, display_order FROM part_groups ORDER BY category_id, display_order, name_part_group", "part_groups"), 'parts': ("SELECT p.id_part AS id, p.oem_part_number, p.name_part AS name, p.name_es, p.group_id, p.description, p.description_es, p.weight_kg, mat.name_material AS material FROM parts p LEFT JOIN materials mat ON p.id_material = mat.id_material ORDER BY p.id_part", "parts"), 'manufacturers': ("SELECT mfr.id_manufacture AS id, mfr.name_manufacture AS name, mt.name_type_manu AS type, qt.name_quality AS quality_tier, co.name_country AS country, mfr.website FROM manufacturers mfr LEFT JOIN manufacture_type mt ON mfr.id_type_manu = mt.id_type_manu LEFT JOIN quality_tier qt ON mfr.id_quality_tier = qt.id_quality_tier LEFT JOIN countries co ON mfr.id_country = co.id_country ORDER BY mfr.name_manufacture", "manufacturers"), 'aftermarket': ("SELECT ap.id_aftermarket_parts AS id, ap.oem_part_id, ap.manufacturer_id, ap.part_number, ap.name_aftermarket_parts AS name, ap.name_es, qt.name_quality AS quality_tier, ap.price_usd, ap.warranty_months FROM aftermarket_parts ap LEFT JOIN quality_tier qt ON ap.id_quality_tier = qt.id_quality_tier ORDER BY ap.id_aftermarket_parts", "aftermarket_parts"), 'crossref': ("SELECT pcr.id_part_cross_ref AS id, pcr.part_id, pcr.cross_reference_number, rt.name_ref_type AS reference_type, pcr.source_ref AS source, pcr.notes FROM part_cross_references pcr LEFT JOIN reference_type rt ON pcr.id_ref_type = rt.id_ref_type ORDER BY pcr.id_part_cross_ref", "part_cross_references"), 'fitment': ("SELECT vp.id_vehicle_part AS id, vp.model_year_engine_id, vp.part_id, vp.quantity_required, pp.name_position_part AS position, vp.fitment_notes FROM vehicle_parts vp LEFT JOIN position_part pp ON vp.id_position_part = pp.id_position_part ORDER BY vp.id_vehicle_part", "vehicle_parts"), } if export_type not in export_queries: return jsonify({'error': f'Unknown export type: {export_type}'}), 400 base_query, table_name = export_queries[export_type] total_count = session.execute(text(f"SELECT COUNT(*) FROM {table_name}")).scalar() rows = session.execute(text(base_query + " LIMIT :limit OFFSET :offset"), {'limit': per_page, 'offset': offset}).mappings().all() data_list = [dict(r) for r in rows] total_pages = (total_count + per_page - 1) // per_page return jsonify({'data': data_list, 'pagination': {'page': page, 'per_page': per_page, 'total': total_count, 'total_pages': total_pages}}) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() # ============================================================================ # Image Upload # ============================================================================ @app.route('/api/admin/upload-image', methods=['POST']) def api_admin_upload_image(): try: data = request.get_json() image_data = data.get('image') if not image_data: return jsonify({'error': 'No image data provided'}), 400 if ',' in image_data: header, encoded = image_data.split(',', 1) ext = 'png' if 'jpeg' in header or 'jpg' in header: ext = 'jpg' elif 'gif' in header: ext = 'gif' elif 'webp' in header: ext = 'webp' else: encoded = image_data ext = 'png' image_bytes = base64.b64decode(encoded) filename = f"{uuid.uuid4().hex}.{ext}" filepath = os.path.join('static', 'parts_images', filename) os.makedirs(os.path.join('.', 'static', 'parts_images'), exist_ok=True) with open(filepath, 'wb') as f: f.write(image_bytes) return jsonify({'url': f"/static/parts_images/{filename}", 'filename': filename}) except Exception as e: return jsonify({'error': str(e)}), 500 # ============================================================================ # Diagrams by Category # ============================================================================ @app.route('/api/vehicles//diagrams/by-category') def api_vehicle_diagrams_by_category(mye_id): session = Session() try: category_id = request.args.get('category_id', type=int) params = {'mye_id': mye_id} cat_filter = "" if category_id: cat_filter = " AND pc.id_part_category = :cid" params['cid'] = category_id rows = session.execute(text(f""" SELECT DISTINCT d.id_diagram AS id, d.name_diagram AS name, d.name_es, d.group_id, d.image_path, d.thumbnail_path, pg.name_part_group AS group_name, pg.name_es AS group_name_es, pc.id_part_category AS category_id, pc.name_part_category AS category_name, pc.name_es AS category_name_es, vd.notes FROM vehicle_diagrams vd JOIN diagrams d ON vd.diagram_id = d.id_diagram JOIN part_groups pg ON d.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category WHERE vd.model_year_engine_id = :mye_id {cat_filter} ORDER BY pc.display_order, pg.display_order, d.display_order, d.name_diagram """), params).mappings().all() categories = {} for r in rows: cid = r['category_id'] if cid not in categories: categories[cid] = {'category_id': cid, 'category_name': r['category_name'], 'category_name_es': r['category_name_es'], 'diagrams': []} ip = r['image_path'] or '' iu = '/' + ip if ip and not ip.startswith('/') else ip categories[cid]['diagrams'].append({'id': r['id'], 'name': r['name'], 'name_es': r['name_es'], 'group_id': r['group_id'], 'group_name': r['group_name'], 'group_name_es': r['group_name_es'], 'image_url': iu, 'thumbnail_path': r['thumbnail_path'], 'notes': r['notes']}) return jsonify(list(categories.values())) except Exception as e: return jsonify({'error': str(e)}), 500 finally: session.close() # ============================================================================ # Hotspot CRUD # ============================================================================ @app.route('/api/admin/hotspots', methods=['POST']) def api_admin_create_hotspot(): session = Session() try: data = request.get_json() if not data: return jsonify({'error': 'No data provided'}), 400 diagram_id = data.get('diagram_id') coords = data.get('coords', '') if not diagram_id or not coords: return jsonify({'error': 'diagram_id and coords are required'}), 400 result = session.execute(text(""" INSERT INTO diagram_hotspots (diagram_id, part_id, callout_number, id_shape, coords) VALUES (:did, :pid, :callout, (SELECT id_shape FROM shapes WHERE name_shape = :shape), :coords) RETURNING id_dgr_hotspot """), {'did': diagram_id, 'pid': data.get('part_id'), 'callout': data.get('callout_number'), 'shape': data.get('shape', 'circle'), 'coords': coords}) hotspot_id = result.scalar() session.commit() return jsonify({'id': hotspot_id, 'message': 'Hotspot created'}), 201 except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/hotspots/', methods=['PUT']) def api_admin_update_hotspot(hotspot_id): session = Session() try: data = request.get_json() if not data: return jsonify({'error': 'No data provided'}), 400 exists = session.execute(text("SELECT id_dgr_hotspot FROM diagram_hotspots WHERE id_dgr_hotspot = :id"), {'id': hotspot_id}).mappings().first() if not exists: return jsonify({'error': 'Hotspot not found'}), 404 fields = [] params = {'id': hotspot_id} if 'part_id' in data: fields.append("part_id = :part_id") params['part_id'] = data['part_id'] if 'callout_number' in data: fields.append("callout_number = :callout_number") params['callout_number'] = data['callout_number'] if 'shape' in data: fields.append("id_shape = (SELECT id_shape FROM shapes WHERE name_shape = :shape)") params['shape'] = data['shape'] if 'coords' in data: fields.append("coords = :coords") params['coords'] = data['coords'] if not fields: return jsonify({'error': 'No fields to update'}), 400 session.execute(text(f"UPDATE diagram_hotspots SET {', '.join(fields)} WHERE id_dgr_hotspot = :id"), params) session.commit() return jsonify({'message': 'Hotspot updated'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/admin/hotspots/', methods=['DELETE']) def api_admin_delete_hotspot(hotspot_id): session = Session() try: exists = session.execute(text("SELECT id_dgr_hotspot FROM diagram_hotspots WHERE id_dgr_hotspot = :id"), {'id': hotspot_id}).mappings().first() if not exists: return jsonify({'error': 'Hotspot not found'}), 404 session.execute(text("DELETE FROM diagram_hotspots WHERE id_dgr_hotspot = :id"), {'id': hotspot_id}) session.commit() return jsonify({'message': 'Hotspot deleted'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() # ============================================================================ # Captura (Data Entry) Endpoints # ============================================================================ @app.route('/captura') def captura_page(): return send_from_directory('.', 'captura.html') @app.route('/captura.js') def captura_js(): return send_from_directory('.', 'captura.js') @app.route('/captura.css') def captura_css(): return send_from_directory('.', 'captura.css') @app.route('/api/captura/vehicles/pending') def api_captura_vehicles_pending(): session = Session() try: brand = request.args.get('brand', '') model = request.args.get('model', '') page = int(request.args.get('page', 1)) per_page = min(int(request.args.get('per_page', 50)), 100) offset = (page - 1) * per_page filters = ["mye.captura_status = 'pending'"] params = {'limit': per_page, 'offset': offset} if brand: filters.append("b.name_brand = :brand") params['brand'] = brand if model: filters.append("m.name_model ILIKE :model") params['model'] = f'%{model}%' where = ' AND '.join(filters) total = session.execute(text(f""" SELECT COUNT(*) FROM model_year_engine mye JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand WHERE {where} """), params).scalar() rows = session.execute(text(f""" SELECT mye.id_mye, b.name_brand AS brand, m.name_model AS model, y.year_car AS year, e.name_engine AS engine, mye.trim_level, mye.captura_status FROM model_year_engine mye JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand JOIN years y ON mye.year_id = y.id_year JOIN engines e ON mye.engine_id = e.id_engine WHERE {where} ORDER BY b.name_brand, m.name_model, y.year_car, e.name_engine LIMIT :limit OFFSET :offset """), params).mappings().all() return jsonify({'data': [dict(r) for r in rows], 'pagination': { 'page': page, 'per_page': per_page, 'total': total, 'total_pages': (total + per_page - 1) // per_page }}) finally: session.close() @app.route('/api/captura/vehicles/in-progress') def api_captura_vehicles_in_progress(): session = Session() try: page = int(request.args.get('page', 1)) per_page = min(int(request.args.get('per_page', 50)), 100) offset = (page - 1) * per_page total = session.execute(text( "SELECT COUNT(*) FROM model_year_engine WHERE captura_status = 'in_progress'" )).scalar() rows = session.execute(text(""" SELECT mye.id_mye, b.name_brand AS brand, m.name_model AS model, y.year_car AS year, e.name_engine AS engine, mye.trim_level, mye.captura_status, COUNT(vp.id_vehicle_part) AS parts_count FROM model_year_engine mye JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand JOIN years y ON mye.year_id = y.id_year JOIN engines e ON mye.engine_id = e.id_engine LEFT JOIN vehicle_parts vp ON vp.model_year_engine_id = mye.id_mye WHERE mye.captura_status = 'in_progress' GROUP BY mye.id_mye, b.name_brand, m.name_model, y.year_car, e.name_engine, mye.trim_level, mye.captura_status ORDER BY b.name_brand, m.name_model, y.year_car LIMIT :limit OFFSET :offset """), {'limit': per_page, 'offset': offset}).mappings().all() return jsonify({'data': [dict(r) for r in rows], 'pagination': { 'page': page, 'per_page': per_page, 'total': total, 'total_pages': (total + per_page - 1) // per_page }}) finally: session.close() @app.route('/api/captura/vehicles//status', methods=['PUT']) def api_captura_vehicle_status(mye_id): session = Session() try: data = request.get_json() status = data.get('status') if status not in ('pending', 'in_progress', 'completed'): return jsonify({'error': 'Invalid status'}), 400 session.execute(text( "UPDATE model_year_engine SET captura_status = :status WHERE id_mye = :id" ), {'status': status, 'id': mye_id}) session.commit() return jsonify({'message': f'Status updated to {status}'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/captura/vehicles//parts') def api_captura_vehicle_parts(mye_id): session = Session() try: vehicle = session.execute(text(""" SELECT mye.id_mye, b.name_brand AS brand, m.name_model AS model, y.year_car AS year, e.name_engine AS engine, mye.trim_level FROM model_year_engine mye JOIN models m ON mye.model_id = m.id_model JOIN brands b ON m.brand_id = b.id_brand JOIN years y ON mye.year_id = y.id_year JOIN engines e ON mye.engine_id = e.id_engine WHERE mye.id_mye = :id """), {'id': mye_id}).mappings().first() if not vehicle: return jsonify({'error': 'Vehicle not found'}), 404 groups = session.execute(text(""" SELECT pc.id_part_category, pc.name_part_category AS category, pg.id_part_group, pg.name_part_group AS group_name, pc.display_order AS cat_order, pg.display_order AS grp_order FROM part_categories pc JOIN part_groups pg ON pg.category_id = pc.id_part_category ORDER BY pc.display_order, pg.display_order """)).mappings().all() existing = session.execute(text(""" SELECT vp.id_vehicle_part, vp.part_id, p.oem_part_number, p.name_part, p.name_es, p.group_id, vp.quantity_required, pp.name_position_part AS position FROM vehicle_parts vp JOIN parts p ON vp.part_id = p.id_part LEFT JOIN position_part pp ON vp.id_position_part = pp.id_position_part WHERE vp.model_year_engine_id = :id ORDER BY p.group_id, p.oem_part_number """), {'id': mye_id}).mappings().all() return jsonify({ 'vehicle': dict(vehicle), 'groups': [dict(g) for g in groups], 'parts': [dict(e) for e in existing] }) finally: session.close() @app.route('/api/captura/parts/without-aftermarket') def api_captura_parts_without_aftermarket(): session = Session() try: search = request.args.get('search', '') group_id = request.args.get('group_id', '') page = int(request.args.get('page', 1)) per_page = min(int(request.args.get('per_page', 50)), 100) offset = (page - 1) * per_page filters = ["NOT EXISTS (SELECT 1 FROM aftermarket_parts ap WHERE ap.oem_part_id = p.id_part)"] params = {'limit': per_page, 'offset': offset} if search: filters.append("(p.oem_part_number ILIKE :search OR p.name_part ILIKE :search)") params['search'] = f'%{search}%' if group_id: filters.append("p.group_id = :group_id") params['group_id'] = int(group_id) where = ' AND '.join(filters) total = session.execute(text(f"SELECT COUNT(*) FROM parts p WHERE {where}"), params).scalar() rows = session.execute(text(f""" SELECT p.id_part, p.oem_part_number, p.name_part, p.name_es, pg.name_part_group AS group_name, pc.name_part_category AS category FROM parts p JOIN part_groups pg ON p.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category WHERE {where} ORDER BY pc.display_order, pg.display_order, p.oem_part_number LIMIT :limit OFFSET :offset """), params).mappings().all() return jsonify({'data': [dict(r) for r in rows], 'pagination': { 'page': page, 'per_page': per_page, 'total': total, 'total_pages': (total + per_page - 1) // per_page }}) finally: session.close() @app.route('/api/captura/parts/without-image') def api_captura_parts_without_image(): session = Session() try: search = request.args.get('search', '') page = int(request.args.get('page', 1)) per_page = min(int(request.args.get('per_page', 50)), 100) offset = (page - 1) * per_page filters = ["(p.image_url IS NULL OR p.image_url = '')"] params = {'limit': per_page, 'offset': offset} if search: filters.append("(p.oem_part_number ILIKE :search OR p.name_part ILIKE :search)") params['search'] = f'%{search}%' where = ' AND '.join(filters) total = session.execute(text(f"SELECT COUNT(*) FROM parts p WHERE {where}"), params).scalar() rows = session.execute(text(f""" SELECT p.id_part, p.oem_part_number, p.name_part, p.name_es, pg.name_part_group AS group_name, pc.name_part_category AS category FROM parts p JOIN part_groups pg ON p.group_id = pg.id_part_group JOIN part_categories pc ON pg.category_id = pc.id_part_category WHERE {where} ORDER BY p.oem_part_number LIMIT :limit OFFSET :offset """), params).mappings().all() return jsonify({'data': [dict(r) for r in rows], 'pagination': { 'page': page, 'per_page': per_page, 'total': total, 'total_pages': (total + per_page - 1) // per_page }}) finally: session.close() @app.route('/api/captura/parts//image', methods=['POST']) def api_captura_upload_part_image(part_id): session = Session() try: if 'image' not in request.files: return jsonify({'error': 'No image file provided'}), 400 file = request.files['image'] if not file.filename: return jsonify({'error': 'No file selected'}), 400 allowed = {'jpg', 'jpeg', 'png', 'webp'} ext = file.filename.rsplit('.', 1)[-1].lower() if '.' in file.filename else '' if ext not in allowed: return jsonify({'error': f'Tipo no permitido. Usar: {", ".join(allowed)}'}), 400 file.seek(0, 2) size = file.tell() file.seek(0) if size > 2 * 1024 * 1024: return jsonify({'error': 'Archivo muy grande (max 2MB)'}), 400 part = session.execute(text( "SELECT oem_part_number FROM parts WHERE id_part = :id" ), {'id': part_id}).mappings().first() if not part: return jsonify({'error': 'Part not found'}), 404 safe_oem = re.sub(r'[^a-zA-Z0-9_-]', '_', part['oem_part_number']) filename = f"{safe_oem}.{ext}" filepath = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static', 'parts', filename) file.save(filepath) image_url = f"/static/parts/{filename}" session.execute(text( "UPDATE parts SET image_url = :url WHERE id_part = :id" ), {'url': image_url, 'id': part_id}) session.commit() return jsonify({'message': 'Image uploaded', 'image_url': image_url}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/captura/parts/check-oem') def api_captura_check_oem(): session = Session() try: oem = request.args.get('oem', '') if not oem: return jsonify({'exists': False}) row = session.execute(text( "SELECT id_part, oem_part_number, name_part, name_es, group_id FROM parts WHERE oem_part_number = :oem" ), {'oem': oem}).mappings().first() if row: return jsonify({'exists': True, 'part': dict(row)}) return jsonify({'exists': False}) finally: session.close() @app.route('/api/captura/manufacturers') def api_captura_manufacturers(): session = Session() try: rows = session.execute(text(""" SELECT m.id_manufacture AS id, m.name_manufacture AS name, qt.name_quality AS quality FROM manufacturers m LEFT JOIN quality_tier qt ON m.id_quality_tier = qt.id_quality_tier ORDER BY m.name_manufacture """)).mappings().all() return jsonify([dict(r) for r in rows]) finally: session.close() @app.route('/api/captura/parts//aftermarket') def api_captura_part_aftermarket(part_id): session = Session() try: rows = session.execute(text(""" SELECT ap.id_aftermarket_parts AS id, ap.part_number, ap.name_aftermarket_parts AS name, m.name_manufacture AS manufacturer, qt.name_quality AS quality, ap.price_usd, ap.warranty_months FROM aftermarket_parts ap JOIN manufacturers m ON ap.manufacturer_id = m.id_manufacture LEFT JOIN quality_tier qt ON ap.id_quality_tier = qt.id_quality_tier WHERE ap.oem_part_id = :id ORDER BY m.name_manufacture """), {'id': part_id}).mappings().all() return jsonify([dict(r) for r in rows]) finally: session.close() # ============================================================================ # POS (Point of Sale) Endpoints # ============================================================================ @app.route('/pos') def pos_page(): return send_from_directory('.', 'pos.html') @app.route('/pos.js') def pos_js(): return send_from_directory('.', 'pos.js') @app.route('/pos.css') def pos_css(): return send_from_directory('.', 'pos.css') @app.route('/cuentas') def cuentas_page(): return send_from_directory('.', 'cuentas.html') @app.route('/cuentas.js') def cuentas_js(): return send_from_directory('.', 'cuentas.js') @app.route('/cuentas.css') def cuentas_css(): return send_from_directory('.', 'cuentas.css') # ---- Customers ---- @app.route('/api/pos/customers') def api_pos_customers(): session = Session() try: search = request.args.get('search', '') page = int(request.args.get('page', 1)) per_page = min(int(request.args.get('per_page', 50)), 100) offset = (page - 1) * per_page filters = ["active = TRUE"] params = {'limit': per_page, 'offset': offset} if search: filters.append("(name ILIKE :search OR rfc ILIKE :search OR business_name ILIKE :search)") params['search'] = f'%{search}%' where = ' AND '.join(filters) total = session.execute(text(f"SELECT COUNT(*) FROM customers WHERE {where}"), params).scalar() rows = session.execute(text(f""" SELECT id_customer, name, rfc, business_name, phone, balance, credit_limit, payment_terms FROM customers WHERE {where} ORDER BY name LIMIT :limit OFFSET :offset """), params).mappings().all() return jsonify({'data': [dict(r) for r in rows], 'pagination': { 'page': page, 'per_page': per_page, 'total': total, 'total_pages': (total + per_page - 1) // per_page }}) finally: session.close() @app.route('/api/pos/customers/') def api_pos_customer_detail(customer_id): session = Session() try: row = session.execute(text( "SELECT * FROM customers WHERE id_customer = :id" ), {'id': customer_id}).mappings().first() if not row: return jsonify({'error': 'Cliente no encontrado'}), 404 return jsonify(dict(row)) finally: session.close() @app.route('/api/pos/customers', methods=['POST']) def api_pos_create_customer(): session = Session() try: data = request.get_json() result = session.execute(text(""" INSERT INTO customers (name, rfc, business_name, email, phone, address, credit_limit, payment_terms) VALUES (:name, :rfc, :business_name, :email, :phone, :address, :credit_limit, :payment_terms) RETURNING id_customer """), { 'name': data['name'], 'rfc': data.get('rfc'), 'business_name': data.get('business_name'), 'email': data.get('email'), 'phone': data.get('phone'), 'address': data.get('address'), 'credit_limit': data.get('credit_limit', 0), 'payment_terms': data.get('payment_terms', 30) }) new_id = result.scalar() session.commit() return jsonify({'id': new_id, 'message': 'Cliente creado'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/pos/customers/', methods=['PUT']) def api_pos_update_customer(customer_id): session = Session() try: data = request.get_json() session.execute(text(""" UPDATE customers SET name = :name, rfc = :rfc, business_name = :business_name, email = :email, phone = :phone, address = :address, credit_limit = :credit_limit, payment_terms = :payment_terms WHERE id_customer = :id """), { 'name': data['name'], 'rfc': data.get('rfc'), 'business_name': data.get('business_name'), 'email': data.get('email'), 'phone': data.get('phone'), 'address': data.get('address'), 'credit_limit': data.get('credit_limit', 0), 'payment_terms': data.get('payment_terms', 30), 'id': customer_id }) session.commit() return jsonify({'message': 'Cliente actualizado'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() # ---- Invoices ---- @app.route('/api/pos/invoices') def api_pos_invoices(): session = Session() try: customer_id = request.args.get('customer_id', '') status = request.args.get('status', '') page = int(request.args.get('page', 1)) per_page = min(int(request.args.get('per_page', 50)), 100) offset = (page - 1) * per_page filters = ["1=1"] params = {'limit': per_page, 'offset': offset} if customer_id: filters.append("i.customer_id = :customer_id") params['customer_id'] = int(customer_id) if status: filters.append("i.status = :status") params['status'] = status where = ' AND '.join(filters) total = session.execute(text(f""" SELECT COUNT(*) FROM invoices i WHERE {where} """), params).scalar() rows = session.execute(text(f""" SELECT i.id_invoice, i.folio, i.date_issued, i.subtotal, i.tax_amount, i.total, i.amount_paid, i.status, c.name AS customer_name, c.rfc FROM invoices i JOIN customers c ON i.customer_id = c.id_customer WHERE {where} ORDER BY i.date_issued DESC LIMIT :limit OFFSET :offset """), params).mappings().all() return jsonify({'data': [dict(r) for r in rows], 'pagination': { 'page': page, 'per_page': per_page, 'total': total, 'total_pages': (total + per_page - 1) // per_page }}) finally: session.close() @app.route('/api/pos/invoices/') def api_pos_invoice_detail(invoice_id): session = Session() try: inv = session.execute(text(""" SELECT i.*, c.name AS customer_name, c.rfc, c.business_name, c.address FROM invoices i JOIN customers c ON i.customer_id = c.id_customer WHERE i.id_invoice = :id """), {'id': invoice_id}).mappings().first() if not inv: return jsonify({'error': 'Factura no encontrada'}), 404 items = session.execute(text(""" SELECT ii.*, p.oem_part_number, ap.part_number AS aftermarket_number FROM invoice_items ii LEFT JOIN parts p ON ii.part_id = p.id_part LEFT JOIN aftermarket_parts ap ON ii.aftermarket_id = ap.id_aftermarket_parts WHERE ii.invoice_id = :id ORDER BY ii.id_invoice_item """), {'id': invoice_id}).mappings().all() return jsonify({'invoice': dict(inv), 'items': [dict(it) for it in items]}) finally: session.close() @app.route('/api/pos/invoices', methods=['POST']) def api_pos_create_invoice(): session = Session() try: data = request.get_json() customer_id = data['customer_id'] items = data['items'] tax_rate = data.get('tax_rate', 0.16) notes = data.get('notes', '') if not items: return jsonify({'error': 'La factura debe tener al menos una linea'}), 400 folio_num = session.execute(text("SELECT nextval('invoice_folio_seq')")).scalar() folio = f"NX-{folio_num:06d}" subtotal = sum(it['quantity'] * it['unit_price'] for it in items) tax_amount = round(subtotal * tax_rate, 2) total = round(subtotal + tax_amount, 2) result = session.execute(text(""" INSERT INTO invoices (customer_id, folio, subtotal, tax_rate, tax_amount, total, notes) VALUES (:customer_id, :folio, :subtotal, :tax_rate, :tax_amount, :total, :notes) RETURNING id_invoice """), { 'customer_id': customer_id, 'folio': folio, 'subtotal': subtotal, 'tax_rate': tax_rate, 'tax_amount': tax_amount, 'total': total, 'notes': notes }) invoice_id = result.scalar() for it in items: line_total = it['quantity'] * it['unit_price'] session.execute(text(""" INSERT INTO invoice_items (invoice_id, part_id, aftermarket_id, description, quantity, unit_cost, margin_pct, unit_price, line_total) VALUES (:inv_id, :part_id, :af_id, :desc, :qty, :cost, :margin, :price, :total) """), { 'inv_id': invoice_id, 'part_id': it.get('part_id'), 'af_id': it.get('aftermarket_id'), 'desc': it['description'], 'qty': it['quantity'], 'cost': it.get('unit_cost', 0), 'margin': it.get('margin_pct', 30), 'price': it['unit_price'], 'total': line_total }) session.execute(text( "UPDATE customers SET balance = balance + :total WHERE id_customer = :id" ), {'total': total, 'id': customer_id}) session.commit() return jsonify({'id': invoice_id, 'folio': folio, 'total': total, 'message': 'Factura creada'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/pos/invoices//cancel', methods=['PUT']) def api_pos_cancel_invoice(invoice_id): session = Session() try: inv = session.execute(text( "SELECT total, customer_id, status FROM invoices WHERE id_invoice = :id" ), {'id': invoice_id}).mappings().first() if not inv: return jsonify({'error': 'Factura no encontrada'}), 404 if inv['status'] == 'cancelled': return jsonify({'error': 'La factura ya esta cancelada'}), 400 session.execute(text( "UPDATE invoices SET status = 'cancelled' WHERE id_invoice = :id" ), {'id': invoice_id}) session.execute(text( "UPDATE customers SET balance = balance - :total WHERE id_customer = :cid" ), {'total': inv['total'], 'cid': inv['customer_id']}) session.commit() return jsonify({'message': 'Factura cancelada'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() # ---- Payments ---- @app.route('/api/pos/payments', methods=['POST']) def api_pos_create_payment(): session = Session() try: data = request.get_json() customer_id = data['customer_id'] amount = float(data['amount']) payment_method = data.get('payment_method', 'efectivo') reference = data.get('reference') invoice_id = data.get('invoice_id') notes = data.get('notes') if amount <= 0: return jsonify({'error': 'El monto debe ser mayor a 0'}), 400 result = session.execute(text(""" INSERT INTO payments (customer_id, invoice_id, amount, payment_method, reference, notes) VALUES (:cid, :inv_id, :amount, :method, :ref, :notes) RETURNING id_payment """), { 'cid': customer_id, 'inv_id': invoice_id, 'amount': amount, 'method': payment_method, 'ref': reference, 'notes': notes }) payment_id = result.scalar() session.execute(text( "UPDATE customers SET balance = balance - :amount WHERE id_customer = :id" ), {'amount': amount, 'id': customer_id}) if invoice_id: session.execute(text( "UPDATE invoices SET amount_paid = amount_paid + :amount WHERE id_invoice = :id" ), {'amount': amount, 'id': invoice_id}) session.execute(text(""" UPDATE invoices SET status = CASE WHEN amount_paid >= total THEN 'paid' WHEN amount_paid > 0 THEN 'partial' ELSE 'pending' END WHERE id_invoice = :id """), {'id': invoice_id}) session.commit() return jsonify({'id': payment_id, 'message': 'Pago registrado'}) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/pos/customers//statement') def api_pos_customer_statement(customer_id): session = Session() try: customer = session.execute(text( "SELECT * FROM customers WHERE id_customer = :id" ), {'id': customer_id}).mappings().first() if not customer: return jsonify({'error': 'Cliente no encontrado'}), 404 invoices = session.execute(text(""" SELECT id_invoice, folio, date_issued, total, amount_paid, status FROM invoices WHERE customer_id = :id AND status != 'cancelled' ORDER BY date_issued DESC LIMIT 100 """), {'id': customer_id}).mappings().all() payments = session.execute(text(""" SELECT p.id_payment, p.amount, p.payment_method, p.reference, p.date_payment, p.notes, i.folio AS invoice_folio FROM payments p LEFT JOIN invoices i ON p.invoice_id = i.id_invoice WHERE p.customer_id = :id ORDER BY p.date_payment DESC LIMIT 100 """), {'id': customer_id}).mappings().all() return jsonify({ 'customer': dict(customer), 'invoices': [dict(i) for i in invoices], 'payments': [dict(p) for p in payments] }) finally: session.close() @app.route('/api/pos/search-parts') def api_pos_search_parts(): session = Session() try: q = request.args.get('q', '') if len(q) < 2: return jsonify([]) results = [] oem = session.execute(text(""" SELECT p.id_part, p.oem_part_number, p.name_part, p.name_es, p.cost_usd, pg.name_part_group AS group_name, 'oem' AS part_type FROM parts p JOIN part_groups pg ON p.group_id = pg.id_part_group WHERE p.oem_part_number ILIKE :q OR p.name_part ILIKE :q ORDER BY p.oem_part_number LIMIT 20 """), {'q': f'%{q}%'}).mappings().all() results.extend([dict(r) for r in oem]) af = session.execute(text(""" SELECT ap.id_aftermarket_parts AS id_part, ap.part_number AS oem_part_number, ap.name_aftermarket_parts AS name_part, ap.name_es, COALESCE(ap.cost_usd, ap.price_usd) AS cost_usd, m.name_manufacture AS group_name, 'aftermarket' AS part_type FROM aftermarket_parts ap JOIN manufacturers m ON ap.manufacturer_id = m.id_manufacture WHERE ap.part_number ILIKE :q OR ap.name_aftermarket_parts ILIKE :q ORDER BY ap.part_number LIMIT 20 """), {'q': f'%{q}%'}).mappings().all() results.extend([dict(r) for r in af]) return jsonify(results) finally: session.close() # ============================================================================ # Store Dashboard Endpoints # ============================================================================ @app.route('/demo') def demo_page(): return send_from_directory('.', 'demo.html') @app.route('/bodega') def bodega_page(): return send_from_directory('.', 'bodega.html') @app.route('/bodega.js') def bodega_js(): return send_from_directory('.', 'bodega.js') @app.route('/bodega.css') def bodega_css(): return send_from_directory('.', 'bodega.css') @app.route('/login.html') def login_page(): return send_from_directory('.', 'login.html') @app.route('/login.js') def login_js(): return send_from_directory('.', 'login.js') @app.route('/login.css') def login_css(): return send_from_directory('.', 'login.css') @app.route('/tienda') def tienda_page(): return send_from_directory('.', 'tienda.html') @app.route('/tienda.js') def tienda_js(): return send_from_directory('.', 'tienda.js') @app.route('/tienda.css') def tienda_css(): return send_from_directory('.', 'tienda.css') @app.route('/api/tienda/stats') def api_tienda_stats(): session = Session() try: today = "date_issued::date = CURRENT_DATE" month = "date_issued >= date_trunc('month', CURRENT_DATE)" sales_today = session.execute(text(f""" SELECT COALESCE(SUM(total), 0), COUNT(*) FROM invoices WHERE {today} AND status != 'cancelled' """)).fetchone() sales_month = session.execute(text(f""" SELECT COALESCE(SUM(total), 0), COUNT(*) FROM invoices WHERE {month} AND status != 'cancelled' """)).fetchone() payments_today = session.execute(text(f""" SELECT COALESCE(SUM(amount), 0), COUNT(*) FROM payments WHERE date_payment::date = CURRENT_DATE """)).fetchone() pending_balance = session.execute(text( "SELECT COALESCE(SUM(balance), 0) FROM customers WHERE active = TRUE AND balance > 0" )).scalar() pending_invoices = session.execute(text( "SELECT COUNT(*) FROM invoices WHERE status IN ('pending', 'partial')" )).scalar() total_customers = session.execute(text( "SELECT COUNT(*) FROM customers WHERE active = TRUE" )).scalar() total_parts = session.execute(text("SELECT COUNT(*) FROM parts")).scalar() total_aftermarket = session.execute(text("SELECT COUNT(*) FROM aftermarket_parts")).scalar() recent_invoices = session.execute(text(""" SELECT i.folio, i.total, i.status, i.date_issued, c.name AS customer_name FROM invoices i JOIN customers c ON i.customer_id = c.id_customer ORDER BY i.date_issued DESC LIMIT 8 """)).mappings().all() top_debtors = session.execute(text(""" SELECT id_customer, name, balance, credit_limit FROM customers WHERE active = TRUE AND balance > 0 ORDER BY balance DESC LIMIT 6 """)).mappings().all() return jsonify({ 'sales_today': {'total': float(sales_today[0]), 'count': int(sales_today[1])}, 'sales_month': {'total': float(sales_month[0]), 'count': int(sales_month[1])}, 'payments_today': {'total': float(payments_today[0]), 'count': int(payments_today[1])}, 'pending_balance': float(pending_balance), 'pending_invoices': pending_invoices, 'total_customers': total_customers, 'total_parts': total_parts, 'total_aftermarket': total_aftermarket, 'recent_invoices': [dict(r) for r in recent_invoices], 'top_debtors': [dict(r) for r in top_debtors] }) finally: session.close() # ============================================================================ # Auth Endpoints # ============================================================================ @app.route('/api/auth/register', methods=['POST']) def auth_register(): """Register a new user (TALLER or BODEGA). Account starts inactive.""" data = request.get_json() if not data: return jsonify({'error': 'Invalid JSON'}), 400 required = ['name', 'email', 'password', 'role', 'business_name'] for field in required: if not data.get(field): return jsonify({'error': f'Missing required field: {field}'}), 400 role = data['role'].upper() if role not in ('TALLER', 'BODEGA'): return jsonify({'error': 'Role must be TALLER or BODEGA'}), 400 role_map = {'TALLER': 3, 'BODEGA': 4} id_rol = role_map[role] hashed = hash_password(data['password']) session = Session() try: session.execute(text( """INSERT INTO users (name_user, email, pass, id_rol, business_name, phone, address, is_active, created_at) VALUES (:name, :email, :pass, :id_rol, :biz, :phone, :addr, false, NOW())""" ), { 'name': data['name'], 'email': data['email'], 'pass': hashed, 'id_rol': id_rol, 'biz': data['business_name'], 'phone': data.get('phone', ''), 'addr': data.get('address', '') }) session.commit() return jsonify({'message': 'Registration successful. Account pending activation.'}), 201 except IntegrityError: session.rollback() return jsonify({'error': 'Email already registered'}), 409 finally: session.close() @app.route('/api/auth/login', methods=['POST']) def auth_login(): """Authenticate user and return access + refresh tokens.""" data = request.get_json() if not data or not data.get('email') or not data.get('password'): return jsonify({'error': 'Email and password are required'}), 400 session = Session() try: row = session.execute(text( """SELECT u.id_user, u.name_user, u.email, u.pass, u.is_active, u.business_name, r.name_rol FROM users u JOIN roles r ON r.id_rol = u.id_rol WHERE u.email = :email""" ), {'email': data['email']}).mappings().first() if not row: return jsonify({'error': 'Invalid email or password'}), 401 if not check_password(data['password'], row['pass']): return jsonify({'error': 'Invalid email or password'}), 401 if not row['is_active']: return jsonify({'error': 'Account is not active. Contact an administrator.'}), 403 # Update last_login session.execute(text( "UPDATE users SET last_login = NOW() WHERE id_user = :uid" ), {'uid': row['id_user']}) session.commit() access_token = create_access_token(row['id_user'], row['name_rol'], row['business_name']) refresh_token = create_refresh_token(row['id_user']) return jsonify({ 'access_token': access_token, 'refresh_token': refresh_token, 'user': { 'id': row['id_user'], 'name': row['name_user'], 'role': row['name_rol'], 'business_name': row['business_name'] } }) finally: session.close() @app.route('/api/auth/refresh', methods=['POST']) def auth_refresh(): """Exchange a valid refresh token for a new access token.""" data = request.get_json() if not data or not data.get('refresh_token'): return jsonify({'error': 'refresh_token is required'}), 400 session = Session() try: row = session.execute(text( """SELECT s.user_id, s.expires_at, u.business_name, r.name_rol FROM sessions s JOIN users u ON u.id_user = s.user_id JOIN roles r ON r.id_rol = u.id_rol WHERE s.refresh_token = :token""" ), {'token': data['refresh_token']}).mappings().first() if not row: return jsonify({'error': 'Invalid refresh token'}), 401 if row['expires_at'] < datetime.utcnow(): # Clean up expired token session.execute(text( "DELETE FROM sessions WHERE refresh_token = :token" ), {'token': data['refresh_token']}) session.commit() return jsonify({'error': 'Refresh token expired'}), 401 access_token = create_access_token(row['user_id'], row['name_rol'], row['business_name']) return jsonify({'access_token': access_token}) finally: session.close() @app.route('/api/auth/me', methods=['GET']) @require_auth() def auth_me(): """Return the current authenticated user's info from the JWT payload.""" return jsonify(g.user) # ============================================================================ # Task 5: Admin User Management # ============================================================================ import csv import io import math @app.route('/api/admin/users', methods=['GET']) @require_auth('ADMIN', 'OWNER') def admin_list_users(): """Return list of all users with role info.""" session = Session() try: rows = session.execute(text( """SELECT u.id_user, u.name_user, u.email, u.business_name, u.phone, u.is_active, u.created_at, u.last_login, r.name_rol FROM users u JOIN roles r ON r.id_rol = u.id_rol ORDER BY u.created_at DESC""" )).mappings().all() users = [] for r in rows: users.append({ 'id': r['id_user'], 'name': r['name_user'], 'email': r['email'], 'business_name': r['business_name'], 'phone': r['phone'], 'is_active': r['is_active'], 'created_at': r['created_at'].isoformat() if r['created_at'] else None, 'last_login': r['last_login'].isoformat() if r['last_login'] else None, 'role': r['name_rol'] }) return jsonify(users) finally: session.close() @app.route('/api/admin/users//activate', methods=['PUT']) @require_auth('ADMIN', 'OWNER') def admin_activate_user(user_id): """Activate or deactivate a user.""" data = request.get_json() if data is None or 'is_active' not in data: return jsonify({'error': 'is_active field is required'}), 400 session = Session() try: result = session.execute(text( "UPDATE users SET is_active = :active WHERE id_user = :uid" ), {'active': bool(data['is_active']), 'uid': user_id}) session.commit() if result.rowcount == 0: return jsonify({'error': 'User not found'}), 404 return jsonify({'message': 'User updated', 'is_active': bool(data['is_active'])}) finally: session.close() # ============================================================================ # Task 6: Inventory Endpoints (BODEGA) # ============================================================================ @app.route('/api/inventory/mapping', methods=['GET']) @require_auth('BODEGA', 'ADMIN') def inventory_get_mapping(): """Return column mapping for current user.""" session = Session() try: row = session.execute(text( "SELECT mapping FROM inventory_column_mappings WHERE user_id = :uid" ), {'uid': g.user['user_id']}).mappings().first() return jsonify({'mapping': row['mapping'] if row else {}}) finally: session.close() @app.route('/api/inventory/mapping', methods=['PUT']) @require_auth('BODEGA', 'ADMIN') def inventory_put_mapping(): """Upsert column mapping for current user.""" data = request.get_json() if not data or 'mapping' not in data: return jsonify({'error': 'mapping is required'}), 400 mapping = data['mapping'] required_keys = ['part_number', 'price', 'stock'] missing = [k for k in required_keys if k not in mapping or not mapping[k]] if missing: return jsonify({'error': f'Missing required mapping keys: {", ".join(missing)}'}), 400 session = Session() try: session.execute(text( """INSERT INTO inventory_column_mappings (user_id, mapping) VALUES (:uid, :mapping) ON CONFLICT (user_id) DO UPDATE SET mapping = :mapping""" ), {'uid': g.user['user_id'], 'mapping': json_module.dumps(mapping)}) session.commit() return jsonify({'message': 'Mapping saved', 'mapping': mapping}) finally: session.close() @app.route('/api/inventory/upload', methods=['POST']) @require_auth('BODEGA', 'ADMIN') def inventory_upload(): """Upload inventory file (CSV or Excel), apply column mapping, upsert into warehouse_inventory.""" if 'file' not in request.files: return jsonify({'error': 'No file provided'}), 400 file = request.files['file'] if not file.filename: return jsonify({'error': 'Empty filename'}), 400 session = Session() try: # 1. Get mapping row = session.execute(text( "SELECT mapping FROM inventory_column_mappings WHERE user_id = :uid" ), {'uid': g.user['user_id']}).mappings().first() if not row or not row['mapping']: return jsonify({'error': 'No column mapping configured. Set mapping first.'}), 400 mapping = row['mapping'] if isinstance(row['mapping'], dict) else json_module.loads(row['mapping']) # 2. Create upload record upload = session.execute(text( """INSERT INTO inventory_uploads (user_id, filename, status) VALUES (:uid, :fname, 'processing') RETURNING id_upload""" ), {'uid': g.user['user_id'], 'fname': file.filename}).mappings().first() session.commit() upload_id = upload['id_upload'] # 3. Parse file filename_lower = file.filename.lower() rows_data = [] if filename_lower.endswith(('.xlsx', '.xls')): import openpyxl wb = openpyxl.load_workbook(io.BytesIO(file.read()), read_only=True, data_only=True) ws = wb.active headers = None for row_cells in ws.iter_rows(values_only=True): if headers is None: headers = [str(c).strip() if c else '' for c in row_cells] continue row_dict = {} for i, val in enumerate(row_cells): if i < len(headers): row_dict[headers[i]] = val rows_data.append(row_dict) wb.close() else: # CSV content = file.read().decode('utf-8-sig', errors='replace') reader = csv.DictReader(io.StringIO(content)) for row_dict in reader: rows_data.append(row_dict) # 4. Process rows imported = 0 errors = 0 error_samples = [] def clean_price(val): if val is None: return None s = str(val).replace('$', '').replace(',', '').strip() try: return float(s) except (ValueError, TypeError): return None def clean_stock(val): if val is None: return 0 s = str(val).replace(',', '').strip() try: return int(float(s)) except (ValueError, TypeError): return 0 for i, row_dict in enumerate(rows_data): try: part_number_col = mapping.get('part_number', '') price_col = mapping.get('price', '') stock_col = mapping.get('stock', '') location_col = mapping.get('location', '') part_number = str(row_dict.get(part_number_col, '')).strip() if not part_number: errors += 1 if len(error_samples) < 10: error_samples.append({'row': i + 2, 'error': 'Empty part number'}) continue price = clean_price(row_dict.get(price_col)) stock = clean_stock(row_dict.get(stock_col)) location = str(row_dict.get(location_col, 'Principal')).strip() if location_col else 'Principal' if not location: location = 'Principal' # Find part by OEM part number part_row = session.execute(text( "SELECT id_part FROM parts WHERE oem_part_number = :pn LIMIT 1" ), {'pn': part_number}).mappings().first() # Also try aftermarket_parts.part_number if OEM not found if not part_row: am_row = session.execute(text( "SELECT oem_part_id FROM aftermarket_parts WHERE part_number = :pn LIMIT 1" ), {'pn': part_number}).mappings().first() if am_row: part_row = {'id_part': am_row['oem_part_id']} if not part_row: errors += 1 if len(error_samples) < 10: error_samples.append({'row': i + 2, 'error': f'Part not found: {part_number}'}) continue # UPSERT into warehouse_inventory session.execute(text( """INSERT INTO warehouse_inventory (user_id, part_id, price, stock_quantity, warehouse_location, updated_at) VALUES (:uid, :pid, :price, :stock, :loc, NOW()) ON CONFLICT (user_id, part_id, warehouse_location) DO UPDATE SET price = :price, stock_quantity = :stock, updated_at = NOW()""" ), { 'uid': g.user['user_id'], 'pid': part_row['id_part'], 'price': price, 'stock': stock, 'loc': location }) imported += 1 except Exception as e: errors += 1 if len(error_samples) < 10: error_samples.append({'row': i + 2, 'error': str(e)}) # 5. Update upload record session.execute(text( """UPDATE inventory_uploads SET status = 'completed', rows_total = :total, rows_imported = :imported, rows_errors = :errors, error_log = :elog, completed_at = NOW() WHERE id_upload = :uid""" ), { 'total': len(rows_data), 'imported': imported, 'errors': errors, 'elog': json_module.dumps(error_samples) if error_samples else None, 'uid': upload_id }) session.commit() return jsonify({ 'message': 'Upload processed', 'upload_id': upload_id, 'imported': imported, 'errors': errors, 'error_samples': error_samples }) except Exception as e: session.rollback() return jsonify({'error': str(e)}), 500 finally: session.close() @app.route('/api/inventory/uploads', methods=['GET']) @require_auth('BODEGA', 'ADMIN') def inventory_list_uploads(): """Return last 50 uploads for current user.""" session = Session() try: rows = session.execute(text( """SELECT id_upload, filename, status, rows_total, rows_imported, rows_errors, created_at, completed_at FROM inventory_uploads WHERE user_id = :uid ORDER BY created_at DESC LIMIT 50""" ), {'uid': g.user['user_id']}).mappings().all() uploads = [] for r in rows: uploads.append({ 'id': r['id_upload'], 'filename': r['filename'], 'status': r['status'], 'rows_total': r['rows_total'], 'rows_imported': r['rows_imported'], 'rows_errors': r['rows_errors'], 'created_at': r['created_at'].isoformat() if r['created_at'] else None, 'completed_at': r['completed_at'].isoformat() if r['completed_at'] else None }) return jsonify(uploads) finally: session.close() @app.route('/api/inventory/items', methods=['GET']) @require_auth('BODEGA', 'ADMIN') def inventory_list_items(): """Paginated list of warehouse_inventory for current user.""" page = max(1, request.args.get('page', 1, type=int)) per_page = min(200, max(1, request.args.get('per_page', 50, type=int))) q = request.args.get('q', '').strip() session = Session() try: params = {'uid': g.user['user_id'], 'offset': (page - 1) * per_page, 'limit': per_page} where_clause = "WHERE wi.user_id = :uid" if q: where_clause += " AND (p.oem_part_number ILIKE :q OR p.name_part ILIKE :q)" params['q'] = f'%{q}%' count_row = session.execute(text( f"""SELECT COUNT(*) AS cnt FROM warehouse_inventory wi JOIN parts p ON p.id_part = wi.part_id {where_clause}""" ), params).mappings().first() total = count_row['cnt'] rows = session.execute(text( f"""SELECT wi.id_inventory, wi.part_id, p.oem_part_number, p.name_part, wi.price, wi.stock_quantity, wi.warehouse_location, wi.updated_at FROM warehouse_inventory wi JOIN parts p ON p.id_part = wi.part_id {where_clause} ORDER BY wi.updated_at DESC LIMIT :limit OFFSET :offset""" ), params).mappings().all() data = [] for r in rows: data.append({ 'id': r['id_inventory'], 'part_id': r['part_id'], 'oem_part_number': r['oem_part_number'], 'name': r['name_part'], 'price': float(r['price']) if r['price'] else None, 'stock': r['stock_quantity'], 'location': r['warehouse_location'], 'updated_at': r['updated_at'].isoformat() if r['updated_at'] else None }) total_pages = math.ceil(total / per_page) if total > 0 else 1 return jsonify({ 'data': data, 'pagination': { 'page': page, 'per_page': per_page, 'total': total, 'total_pages': total_pages } }) finally: session.close() @app.route('/api/inventory/items', methods=['DELETE']) @require_auth('BODEGA', 'ADMIN') def inventory_delete_all(): """Delete all warehouse_inventory for current user.""" session = Session() try: result = session.execute(text( "DELETE FROM warehouse_inventory WHERE user_id = :uid" ), {'uid': g.user['user_id']}) session.commit() return jsonify({'message': 'Inventory cleared', 'deleted': result.rowcount}) finally: session.close() # ============================================================================ # Task 7: Part Availability & Aftermarket # ============================================================================ @app.route('/api/parts//availability', methods=['GET']) @require_auth('TALLER', 'ADMIN', 'OWNER') def part_availability(part_id): """Return all bodegas that have this part in stock.""" session = Session() try: rows = session.execute(text( """SELECT u.business_name, wi.price, wi.stock_quantity, wi.warehouse_location, wi.updated_at FROM warehouse_inventory wi JOIN users u ON u.id_user = wi.user_id WHERE wi.part_id = :pid AND wi.stock_quantity > 0 AND u.is_active = true ORDER BY wi.price ASC""" ), {'pid': part_id}).mappings().all() data = [] for r in rows: data.append({ 'bodega': r['business_name'], 'price': float(r['price']) if r['price'] else None, 'stock': r['stock_quantity'], 'location': r['warehouse_location'], 'updated_at': r['updated_at'].isoformat() if r['updated_at'] else None }) return jsonify(data) finally: session.close() @app.route('/api/parts//aftermarket', methods=['GET']) def part_aftermarket(part_id): """Return aftermarket alternatives and cross-references for a part (public).""" session = Session() try: # Aftermarket alternatives rows = session.execute(text( """SELECT ap.id_aftermarket_parts, ap.part_number, ap.name_aftermarket_parts, m.name_manufacture, qt.name_quality, ap.price_usd FROM aftermarket_parts ap JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id LEFT JOIN quality_tier qt ON qt.id_quality_tier = ap.id_quality_tier WHERE ap.oem_part_id = :pid ORDER BY ap.price_usd ASC NULLS LAST""" ), {'pid': part_id}).mappings().all() alternatives = [] for r in rows: alternatives.append({ 'id': r['id_aftermarket_parts'], 'part_number': r['part_number'], 'name': r['name_aftermarket_parts'], 'manufacturer': r['name_manufacture'], 'quality_tier': r['name_quality'], 'price': float(r['price_usd']) if r['price_usd'] else None, 'source': 'aftermarket' }) # Cross-references xrefs = session.execute(text( """SELECT pcr.cross_reference_number, pcr.source_ref, pcr.notes FROM part_cross_references pcr WHERE pcr.part_id = :pid ORDER BY pcr.cross_reference_number""" ), {'pid': part_id}).mappings().all() cross_refs = [] for x in xrefs: cross_refs.append({ 'cross_reference_number': x['cross_reference_number'], 'source': x['source_ref'], 'notes': x['notes'] }) return jsonify({ 'data': alternatives, 'cross_references': cross_refs }) finally: session.close() # ============================================================================ # Main Block # ============================================================================ if __name__ == '__main__': print("Starting Nexus Autoparts Dashboard Server...") print("Visit http://localhost:5000 to access the dashboard locally") app.run(debug=False, host='0.0.0.0', port=5000)