# /home/Autopartes/pos/blueprints/catalog_bp.py
"""Catalog blueprint: TecDoc vehicle navigation with local stock enrichment.

Endpoints (all under /pos/api/catalog):
  GET  /brands                      — vehicle brands with parts
  GET  /models?brand_id=            — models for a brand
  GET  /years?model_id=             — years for a model
  GET  /years-all                   — every year known to the catalog
  GET  /engines?model_id=&year_id=  — engines for model+year
  GET  /categories?mye_id=          — part categories for vehicle
  GET  /groups?mye_id=&category_id= — part subcategories for vehicle+category
  GET  /part-types?mye_id=&group_id= — third navigation level
  GET  /shop-supplies/...           — vehicle-independent navigation
  GET  /parts?mye_id=&group_id=     — parts with local stock enrichment
  GET  /part/<part_id>              — full part detail (stock + bodegas + alternatives)
  GET  /search?q=                   — smart search (part number or text)
  GET  /vin/<vin>                   — decode a VIN and match it to the catalog
  GET  /plate/<plate>               — look up a registered plate
  POST /plate                       — register or update a plate
  GET  /vehicle-brands, /brand-categories, /brand-parts, /mye-parts
                                    — brand-first catalog browsing
"""
import json
import logging

from flask import Blueprint, request, jsonify, g

from middleware import require_auth
from tenant_db import get_master_conn, get_tenant_conn
from services import catalog_service
from services.vin_decoder import decode_vin
from services.plate_lookup import search_plate, register_plate, is_valid_mexican_plate, normalize_plate
from config import CATALOG_OEM_ENABLED

logger = logging.getLogger(__name__)

catalog_bp = Blueprint('catalog', __name__, url_prefix='/pos/api/catalog')

# Page size used when the whole result set must be fetched so that the
# allowed-brands filter can run before pagination.
_FETCH_ALL_PER_PAGE = 9999

# (FROM fragment, WHERE fragment) pairs for the brand-first part listings.
# These are fixed SQL text, never user input; the scope value is always bound
# as a query parameter.
_BRAND_SCOPE = (
    "part_vehicle_preview pvp JOIN parts p ON p.id_part = pvp.part_id",
    "pvp.name_brand = %s",
)
_MYE_SCOPE = (
    "vehicle_parts vp JOIN parts p ON p.id_part = vp.part_id",
    "vp.model_year_engine_id = %s",
)


# ─── Shared helpers ───

def _oem_blocked():
    """Return a 403 response if OEM catalog is disabled, otherwise None."""
    if CATALOG_OEM_ENABLED:
        return None
    return jsonify({
        'error': 'Catálogo OEM no disponible',
        'message': 'El catálogo OEM está en construcción. Por favor usa el modo Local o Shop Supplies.',
        'oem_disabled': True,
    }), 403


def _get_allowed_brands(tenant_conn):
    """Read allowed part brands from tenant_config.

    Returns the configured non-empty list, or None when the setting is
    missing, empty, not a list, or not valid JSON.
    """
    cur = tenant_conn.cursor()
    try:
        cur.execute("SELECT value FROM tenant_config WHERE key = 'allowed_part_brands'")
        row = cur.fetchone()
    finally:
        cur.close()

    if not row or not row[0]:
        return None
    raw = row[0]
    try:
        # The driver may hand back an already-decoded value (json/jsonb
        # column); only decode when we actually received text.
        brands = json.loads(raw) if isinstance(raw, (str, bytes, bytearray)) else raw
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return None
    if isinstance(brands, list) and brands:
        return brands
    return None


def _sql_brand_names(allowed_brands):
    """Upper-case brand names so they can match UPPER(m.name_manufacture)."""
    return [str(name).upper() for name in allowed_brands]


def _close_quietly(conn):
    """Close a DB connection, never letting a close failure mask the response."""
    if conn is None:
        return
    try:
        conn.close()
    except Exception:
        logger.debug("catalog: error while closing connection", exc_info=True)


def _with_conns(fn):
    """Helper: open master + tenant connections, call fn, close both.

    fn receives (master_conn, tenant_conn, branch_id). Any exception is
    logged and converted into a JSON 500 response.
    """
    master = None
    tenant = None
    try:
        master = get_master_conn()
        tenant = get_tenant_conn(g.tenant_id)
        branch_id = request.args.get('branch_id', g.branch_id)
        return fn(master, tenant, branch_id)
    except Exception as e:
        logger.exception("catalog: request failed")
        return jsonify({'error': str(e)}), 500
    finally:
        _close_quietly(master)
        _close_quietly(tenant)


def _master_only(fn):
    """Helper: open only master connection for hierarchy endpoints."""
    master = None
    try:
        master = get_master_conn()
        return fn(master)
    except Exception as e:
        logger.exception("catalog: request failed")
        return jsonify({'error': str(e)}), 500
    finally:
        _close_quietly(master)


def _mye_ids_with_parts(master, tenant):
    """Return the tenant's model-year-engine ids that have parts, or None without a tenant."""
    if not tenant:
        return None
    return catalog_service._get_mye_ids_with_parts(
        tenant, tenant_id=g.tenant_id, master_conn=master)


def _page_args():
    """Parse ?page / ?per_page, clamped to page >= 1 and 1 <= per_page <= 100."""
    page = max(1, request.args.get('page', 1, type=int) or 1)
    per_page = max(1, min(request.args.get('per_page', 30, type=int) or 30, 100))
    return page, per_page


def _limit_offset_args():
    """Parse ?limit / ?offset; negatives are clamped to 0 (they are invalid in SQL)."""
    limit = max(0, request.args.get('limit', 50, type=int))
    offset = max(0, request.args.get('offset', 0, type=int))
    return limit, offset


def _part_key(part):
    """Return the identifier of a part dict ('id_part' preferred, then 'id')."""
    return part.get('id_part') or part.get('id')


def _filter_parts_by_allowed_brands(master_conn, parts_data, allowed_brands):
    """Keep only catalog parts that have an aftermarket equivalent from an allowed brand.

    parts_data items must have 'id_part' or 'id' key. Items whose id is not an
    integer (local inventory ids such as 'inv:3') are not catalog parts, so
    the aftermarket filter does not apply to them and they pass through.
    """
    if not allowed_brands or not parts_data:
        return parts_data

    catalog_ids = [pid for pid in map(_part_key, parts_data) if isinstance(pid, int)]
    if not catalog_ids:
        return parts_data

    cur = master_conn.cursor()
    try:
        cur.execute("""
            SELECT DISTINCT ap.oem_part_id
            FROM aftermarket_parts ap
            JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id
            WHERE ap.oem_part_id = ANY(%s)
              AND UPPER(m.name_manufacture) = ANY(%s)
        """, (catalog_ids, _sql_brand_names(allowed_brands)))
        allowed_ids = {row[0] for row in cur.fetchall()}
    finally:
        cur.close()

    kept = []
    for part in parts_data:
        pid = _part_key(part)
        if not isinstance(pid, int) or pid in allowed_ids:
            kept.append(part)
    return kept


# ─── Hierarchy navigation ───

@catalog_bp.route('/brands', methods=['GET'])
@require_auth('catalog.view')
def brands():
    """Vehicle brands, optionally narrowed by year and catalog mode."""
    from services.catalog_modes import normalize_mode
    year_id = request.args.get('year_id', type=int)
    mode = normalize_mode(request.args.get('mode'))

    def _do(master, tenant, branch_id):
        data = catalog_service.get_brands(
            master, year_id=year_id, mode=mode,
            mye_ids=_mye_ids_with_parts(master, tenant))
        return jsonify({'data': data, 'mode': mode})
    return _with_conns(_do)


@catalog_bp.route('/models', methods=['GET'])
@require_auth('catalog.view')
def models():
    """Models for a brand (brand_id is required)."""
    brand_id = request.args.get('brand_id', type=int)
    year_id = request.args.get('year_id', type=int)
    if not brand_id:
        return jsonify({'error': 'brand_id required'}), 400

    def _do(master, tenant, branch_id):
        data = catalog_service.get_models(
            master, brand_id, year_id=year_id,
            mye_ids=_mye_ids_with_parts(master, tenant))
        return jsonify({'data': data})
    return _with_conns(_do)


@catalog_bp.route('/years', methods=['GET'])
@require_auth('catalog.view')
def years():
    """Years for a model (model_id is required)."""
    model_id = request.args.get('model_id', type=int)
    if not model_id:
        return jsonify({'error': 'model_id required'}), 400

    def _do(master, tenant, branch_id):
        data = catalog_service.get_years(
            master, model_id, mye_ids=_mye_ids_with_parts(master, tenant))
        return jsonify({'data': data})
    return _with_conns(_do)


@catalog_bp.route('/years-all', methods=['GET'])
@require_auth('catalog.view')
def years_all():
    """Get all available years (for vehicle selector dropdown)."""
    def _do(master):
        cur = master.cursor()
        try:
            cur.execute("SELECT DISTINCT id_year, year_car FROM years ORDER BY year_car DESC")
            rows = cur.fetchall()
        finally:
            cur.close()
        return jsonify({'data': [{'id_year': r[0], 'year_car': r[1]} for r in rows]})
    return _master_only(_do)


@catalog_bp.route('/engines', methods=['GET'])
@require_auth('catalog.view')
def engines():
    """Engines for a model + year (both ids are required)."""
    model_id = request.args.get('model_id', type=int)
    year_id = request.args.get('year_id', type=int)
    if not model_id or not year_id:
        return jsonify({'error': 'model_id and year_id required'}), 400

    def _do(master, tenant, branch_id):
        data = catalog_service.get_engines(
            master, model_id, year_id,
            mye_ids=_mye_ids_with_parts(master, tenant))
        return jsonify({'data': data})
    return _with_conns(_do)


@catalog_bp.route('/categories', methods=['GET'])
@require_auth('catalog.view')
def categories():
    """Categories for a vehicle.

    OEM mode: TecDoc part_categories (id_part_category, name).
    Local mode: 14 Nexpart top-level groups, filtered by what's available
    for this vehicle. Returns 'slug' (string) instead of integer id.
    """
    from services.catalog_modes import normalize_mode
    mye_id = request.args.get('mye_id', type=int)
    mode = normalize_mode(request.args.get('mode'))
    if not mye_id:
        return jsonify({'error': 'mye_id required'}), 400

    def _do(master, tenant, branch_id):
        allowed_brands = _get_allowed_brands(tenant) if tenant else None
        if mode == 'local':
            data = catalog_service.get_nexpart_groups_for_vehicle(master, mye_id, tenant)
        else:
            data = catalog_service.get_categories(master, mye_id, allowed_brands)
        return jsonify({'data': data, 'mode': mode, 'allowed_brands': allowed_brands or []})
    return _with_conns(_do)


@catalog_bp.route('/groups', methods=['GET'])
@require_auth('catalog.view')
def groups():
    """Subgroups for a vehicle + parent category.

    OEM mode: TecDoc part_groups within a TecDoc part_category (integer ids).
    Local mode: Nexpart subgroups within a Nexpart group (string slugs).
    """
    from services.catalog_modes import normalize_mode
    mye_id = request.args.get('mye_id', type=int)
    category_id = request.args.get('category_id', type=int)
    category_slug = request.args.get('category_slug')
    mode = normalize_mode(request.args.get('mode'))
    if not mye_id:
        return jsonify({'error': 'mye_id required'}), 400
    # Validate before opening any DB connection.
    if mode == 'local':
        if not category_slug:
            return jsonify({'error': 'category_slug required for local mode'}), 400
    elif not category_id:
        return jsonify({'error': 'category_id required for oem mode'}), 400

    def _do(master, tenant, branch_id):
        if mode == 'local':
            data = catalog_service.get_nexpart_subgroups_for_vehicle(
                master, mye_id, category_slug, tenant)
        else:
            data = catalog_service.get_groups(master, mye_id, category_id)
        return jsonify({'data': data, 'mode': mode})
    return _with_conns(_do)


# ─── Parts with stock enrichment (master + tenant) ───

@catalog_bp.route('/part-types', methods=['GET'])
@require_auth('catalog.view')
def part_types():
    """Distinct part types (3rd subcategory level) for a vehicle + group/subgroup.

    OEM mode: distinct name_part values within a TecDoc part_group_id.
    Local mode: Nexpart Part Types within a Nexpart group + subgroup.
    """
    from services.catalog_modes import normalize_mode
    mye_id = request.args.get('mye_id', type=int)
    group_id = request.args.get('group_id', type=int)
    group_slug = request.args.get('group_slug')        # parent Nexpart group
    subgroup_slug = request.args.get('subgroup_slug')  # current Nexpart subgroup
    mode = normalize_mode(request.args.get('mode'))
    if not mye_id:
        return jsonify({'error': 'mye_id required'}), 400
    # Validate before opening any DB connection.
    if mode == 'local':
        if not group_slug or not subgroup_slug:
            return jsonify({'error': 'group_slug and subgroup_slug required for local mode'}), 400
    elif not group_id:
        return jsonify({'error': 'group_id required for oem mode'}), 400

    def _do(master, tenant, branch_id):
        if mode == 'local':
            data = catalog_service.get_nexpart_part_types_for_vehicle(
                master, mye_id, group_slug, subgroup_slug, tenant
            )
        else:
            data = catalog_service.get_part_types(master, mye_id, group_id)
        return jsonify({'data': data, 'mode': mode})
    return _with_conns(_do)


@catalog_bp.route('/shop-supplies/groups', methods=['GET'])
@require_auth('catalog.view')
def shop_supplies_groups():
    """Vehicle-independent groups (Chemicals + Tires/Tools)."""
    def _do(master):
        data = catalog_service.get_shop_supplies_groups()
        return jsonify({'data': data})
    return _master_only(_do)


@catalog_bp.route('/shop-supplies/subgroups', methods=['GET'])
@require_auth('catalog.view')
def shop_supplies_subgroups():
    """Subgroups of a shop-supplies group (group_slug is required)."""
    group_slug = request.args.get('group_slug')
    if not group_slug:
        return jsonify({'error': 'group_slug required'}), 400

    def _do(master):
        data = catalog_service.get_shop_supplies_subgroups(master, group_slug)
        return jsonify({'data': data})
    return _master_only(_do)


@catalog_bp.route('/shop-supplies/part-types', methods=['GET'])
@require_auth('catalog.view')
def shop_supplies_part_types():
    """Part types of a shop-supplies group + subgroup (both slugs are required)."""
    group_slug = request.args.get('group_slug')
    subgroup_slug = request.args.get('subgroup_slug')
    if not group_slug or not subgroup_slug:
        return jsonify({'error': 'group_slug and subgroup_slug required'}), 400

    def _do(master):
        data = catalog_service.get_shop_supplies_part_types(master, group_slug, subgroup_slug)
        return jsonify({'data': data})
    return _master_only(_do)


@catalog_bp.route('/shop-supplies/parts', methods=['GET'])
@require_auth('catalog.view')
def shop_supplies_parts():
    """Paginated shop-supplies parts for a group + subgroup + part type."""
    group_slug = request.args.get('group_slug')
    subgroup_slug = request.args.get('subgroup_slug')
    part_type_slug = request.args.get('part_type_slug')
    page, per_page = _page_args()
    if not group_slug or not subgroup_slug or not part_type_slug:
        return jsonify({'error': 'group_slug, subgroup_slug, part_type_slug required'}), 400

    def _do(master, tenant, branch_id):
        result = catalog_service.get_shop_supplies_parts(
            master, group_slug, subgroup_slug, part_type_slug,
            tenant, branch_id, page, per_page,
        )
        return jsonify(result)
    return _with_conns(_do)


@catalog_bp.route('/parts', methods=['GET'])
@require_auth('catalog.view')
def parts():
    """Parts list for the deepest navigation level.

    Three call shapes (the endpoint chooses based on which params are present):

    A) OEM mode legacy:
       ?mode=oem&mye_id=&group_id=&part_type=...
    B) Local mode legacy (TecDoc-style):
       ?mode=local&mye_id=&group_id=&part_type=...
    C) Local mode Nexpart navigation (NEW):
       ?mode=local&mye_id=&nexpart_group=&nexpart_subgroup=&nexpart_part_type=
    """
    from services.catalog_modes import normalize_mode
    mye_id = request.args.get('mye_id', type=int)
    group_id = request.args.get('group_id', type=int)
    part_type = request.args.get('part_type')  # optional 3rd-level (legacy)
    # Nexpart navigation slugs (Local mode only)
    nexpart_group = request.args.get('nexpart_group')
    nexpart_subgroup = request.args.get('nexpart_subgroup')
    nexpart_part_type = request.args.get('nexpart_part_type')
    page, per_page = _page_args()
    mode = normalize_mode(request.args.get('mode'))

    if not mye_id:
        return jsonify({'error': 'mye_id required'}), 400

    use_nexpart_nav = bool(
        mode == 'local' and nexpart_group and nexpart_subgroup and nexpart_part_type)
    if not use_nexpart_nav and not group_id:
        return jsonify({'error': 'group_id (or nexpart_group + subgroup + part_type) required'}), 400

    # Block OEM catalog if not enabled
    if mode != 'local' and not use_nexpart_nav:
        blocked = _oem_blocked()
        if blocked:
            return blocked

    def _do(master, tenant, branch_id):
        allowed_brands = _get_allowed_brands(tenant) if tenant else None

        # For local mode with allowed_brands, fetch everything first so filtering
        # happens before pagination. OEM mode keeps post-filter for now.
        fetch_all_for_filter = bool(allowed_brands) and (mode == 'local' or use_nexpart_nav)
        fetch_page = 1 if fetch_all_for_filter else page
        fetch_size = _FETCH_ALL_PER_PAGE if fetch_all_for_filter else per_page

        if use_nexpart_nav:
            result = catalog_service.get_parts_for_nexpart_triple(
                master, mye_id, nexpart_group, nexpart_subgroup, nexpart_part_type,
                tenant, branch_id, fetch_page, fetch_size,
                tenant_id=g.tenant_id,
            )
        elif mode == 'local':
            result = catalog_service.get_parts_local(
                master, mye_id, group_id, tenant, branch_id, fetch_page, fetch_size,
                part_type=part_type,
            )
        else:
            result = catalog_service.get_parts(
                master, mye_id, group_id, tenant, branch_id, page, per_page,
                part_type=part_type,
            )

        if allowed_brands:
            result['data'] = _filter_parts_by_allowed_brands(
                master, result.get('data', []), allowed_brands)

        if fetch_all_for_filter:
            # Re-paginate the filtered set so totals reflect what is visible.
            matching = result['data']
            start = (page - 1) * per_page
            result['data'] = matching[start:start + per_page]
            result['pagination'] = catalog_service._pagination(page, per_page, len(matching))

        result['allowed_brands'] = allowed_brands or []
        return jsonify(result)
    return _with_conns(_do)


# NOTE(review): the URL variable is declared with Flask's default (string)
# converter so that ids such as 'inv:3' stay routable — confirm that
# catalog_service.get_part_detail accepts string ids.
@catalog_bp.route('/part/<part_id>', methods=['GET'])
@require_auth('catalog.view')
def part_detail(part_id):
    """Full detail for one part, or 404 when the service finds nothing."""
    # Part detail is available in both local and OEM modes
    # — it reads from the master parts DB and enriches with local stock.
    def _do(master, tenant, branch_id):
        result = catalog_service.get_part_detail(master, part_id, tenant, branch_id)
        if not result:
            return jsonify({'error': 'Part not found'}), 404
        return jsonify(result)
    return _with_conns(_do)


@catalog_bp.route('/search', methods=['GET'])
@require_auth('catalog.view')
def search():
    """Smart search; queries shorter than two characters return no results."""
    # Search is available in both local and OEM modes
    # — it reads from the master parts DB and enriches with local stock.
    q = request.args.get('q', '').strip()
    if not q or len(q) < 2:
        return jsonify({'data': []})
    # A non-positive limit is meaningless for a search; clamp it to 1.
    limit = max(1, request.args.get('limit', 50, type=int))
    mye_id = request.args.get('mye_id', type=int)

    def _do(master, tenant, branch_id):
        allowed_brands = _get_allowed_brands(tenant) if tenant else None
        data = catalog_service.smart_search(
            master, q, tenant, branch_id, limit, mye_id, tenant_id=g.tenant_id)
        if allowed_brands:
            data = _filter_parts_by_allowed_brands(master, data, allowed_brands)
        return jsonify({'data': data, 'allowed_brands': allowed_brands or []})
    return _with_conns(_do)


# ─── VIN Decoder ───

def _catalog_match_or_none(matcher, vehicle_info):
    """Best-effort catalog match: run matcher(master_conn, vehicle_info), None on failure."""
    master = None
    try:
        master = get_master_conn()
        return matcher(master, vehicle_info)
    except Exception:
        # The match is optional enrichment; the caller still answers without it.
        logger.exception("catalog: vehicle-to-catalog match failed")
        return None
    finally:
        _close_quietly(master)


@catalog_bp.route('/vin/<vin>', methods=['GET'])
@require_auth('catalog.view')
def decode_vin_route(vin):
    """Decode a VIN and try to match to a brand/model/year in our catalog DB."""
    vin = (vin or "").strip().upper()
    if len(vin) != 17:
        return jsonify({'error': 'VIN debe tener exactamente 17 caracteres.'}), 400

    try:
        info = decode_vin(vin)
    except Exception as e:
        return jsonify({'error': f'Error al decodificar VIN: {e}'}), 502

    if info.get('error'):
        return jsonify(info), 200  # Return info even with partial errors

    # Try to match the decoded vehicle to our catalog DB
    result = dict(info)
    db_match = _catalog_match_or_none(_match_vin_to_catalog, info)
    if db_match:
        result['catalog_match'] = db_match
    return jsonify(result)


# ─── Plate Lookup ───

@catalog_bp.route('/plate/<plate>', methods=['GET'])
@require_auth('catalog.view')
def plate_lookup(plate):
    """Look up a vehicle by Mexican license plate in the local plate_vehicles table.

    If found, also tries to match the vehicle to the catalog DB.
    """
    plate = (plate or '').strip()
    if not plate:
        return jsonify({'error': 'Placa requerida.'}), 400
    if not is_valid_mexican_plate(plate):
        return jsonify({'error': 'Formato de placa no valido. Ej: ABC-1234 o AB-123-C'}), 400

    tenant = None
    try:
        tenant = get_tenant_conn(g.tenant_id)
        result = search_plate(tenant, plate)
        if not result:
            return jsonify({
                'found': False,
                'plate': normalize_plate(plate),
                'message': 'Placa no registrada.'
            })

        response = {
            'found': True,
            'plate': result['plate'],
            'make': result['make'],
            'model': result['model'],
            'year': result['year'],
            'vin': result['vin'],
            'customer_id': result['customer_id'],
        }
        # Try to match to catalog
        catalog_match = _catalog_match_or_none(_match_plate_to_catalog, result)
        if catalog_match:
            response['catalog_match'] = catalog_match
        return jsonify(response)
    except Exception as e:
        logger.exception("catalog: plate lookup failed")
        return jsonify({'error': str(e)}), 500
    finally:
        _close_quietly(tenant)


@catalog_bp.route('/plate', methods=['POST'])
@require_auth('catalog.view')
def plate_register():
    """Register or update a plate-to-vehicle mapping."""
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A missing, malformed or non-object body is handled as empty, which
        # yields the 'plate required' error below.
        data = {}
    plate = (data.get('plate') or '').strip()
    if not plate:
        return jsonify({'error': 'plate required'}), 400
    if not is_valid_mexican_plate(plate):
        return jsonify({'error': 'Formato de placa no valido.'}), 400

    tenant = None
    try:
        tenant = get_tenant_conn(g.tenant_id)
        rec_id = register_plate(
            tenant, plate,
            make=data.get('make'),
            model=data.get('model'),
            year=data.get('year'),
            vin=data.get('vin'),
            customer_id=data.get('customer_id'),
        )
        return jsonify({'id': rec_id, 'message': 'Placa registrada.'})
    except Exception as e:
        logger.exception("catalog: plate registration failed")
        return jsonify({'error': str(e)}), 500
    finally:
        _close_quietly(tenant)


def _match_plate_to_catalog(master_conn, plate_info):
    """Try to match plate vehicle info to the catalog DB (same logic as VIN)."""
    return _match_vin_to_catalog(master_conn, {
        'make': plate_info.get('make'),
        'model': plate_info.get('model'),
        'year': plate_info.get('year'),
    })


def _match_vin_to_catalog(master_conn, vin_info):
    """Try to find brand_id, model_id, year_id, mye_id from decoded VIN info.

    Matching is progressive: brand, then model, then year, then engines. The
    result contains whatever levels could be matched, so a vehicle with an
    unknown model or an unparseable year still returns its brand. Returns
    None when no brand matches or a query fails.
    """
    make = (vin_info.get('make') or '').upper().strip()
    model = (vin_info.get('model') or '').strip()
    year = vin_info.get('year')
    if not make:
        return None

    # A non-numeric year must not discard the brand/model match.
    try:
        year_num = int(year) if year else None
    except (TypeError, ValueError):
        year_num = None

    cur = master_conn.cursor()
    result = {}
    try:
        # Find brand (try exact, then LIKE)
        cur.execute(
            "SELECT id_brand, name_brand FROM brands WHERE UPPER(name_brand) = %s",
            (make,)
        )
        brand_row = cur.fetchone()
        if not brand_row:
            cur.execute(
                "SELECT id_brand, name_brand FROM brands WHERE UPPER(name_brand) LIKE %s ORDER BY name_brand LIMIT 1",
                (f"%{make}%",)
            )
            brand_row = cur.fetchone()
        if not brand_row:
            return None
        result['brand_id'] = brand_row[0]
        result['brand_name'] = brand_row[1]

        # Find model
        model_row = None
        if model:
            cur.execute(
                """SELECT m.id_model, m.name_model FROM models m
                   WHERE m.brand_id = %s AND UPPER(m.name_model) LIKE %s
                   ORDER BY m.name_model LIMIT 5""",
                (brand_row[0], f"%{model.upper()}%")
            )
            model_row = cur.fetchone()
        if not model_row:
            return result
        result['model_id'] = model_row[0]
        result['model_name'] = model_row[1]

        # Find year
        if year_num is None:
            return result
        cur.execute(
            "SELECT id_year, year_car FROM years WHERE year_car = %s",
            (year_num,)
        )
        year_row = cur.fetchone()
        if not year_row:
            return result
        result['year_id'] = year_row[0]
        result['year_car'] = year_row[1]

        # Find MYE options (needs both the model id and the year id)
        cur.execute(
            """SELECT mye.id_mye, e.name_engine, mye.trim_level
               FROM model_year_engine mye
               JOIN engines e ON e.id_engine = mye.engine_id
               WHERE mye.model_id = %s AND mye.year_id = %s
               ORDER BY e.name_engine LIMIT 10""",
            (model_row[0], year_row[0])
        )
        mye_rows = cur.fetchall()
        if mye_rows:
            result['engines'] = [
                {'id_mye': r[0], 'name_engine': r[1], 'trim_level': r[2]}
                for r in mye_rows
            ]
            # Auto-select if only one engine
            if len(mye_rows) == 1:
                result['id_mye'] = mye_rows[0][0]
        return result
    except Exception:
        logger.exception("catalog: vehicle match query failed")
        return None
    finally:
        cur.close()


# ─── Brand Catalog (vehicle-brand-first navigation) ───

def _fetch_scoped_parts(master, tenant, branch_id, scope, scope_value,
                        category_id, term, limit, offset):
    """List parts for a brand or a model-year-engine, enriched with local stock.

    scope is one of _BRAND_SCOPE / _MYE_SCOPE and scope_value is bound to its
    WHERE placeholder. When the tenant restricts part brands, the listing
    returns aftermarket parts from those brands; otherwise it returns OEM
    parts. Returns (items, total, allowed_brands) with allowed_brands as a
    list (empty when unrestricted).
    """
    scope_from, scope_where = scope
    allowed_brands = _get_allowed_brands(tenant) if tenant else None
    aftermarket = bool(allowed_brands)

    if aftermarket:
        number_col = "ap.part_number"
        name_expr = "COALESCE(NULLIF(ap.name_aftermarket_parts, ''), p.name_part)"
        columns = (
            f"ap.id_aftermarket_parts, ap.part_number, {name_expr} as name, "
            "m.name_manufacture, ap.price_usd, p.id_part, "
            "pg.id_part_group, pg.name_part_group, "
            "pc.id_part_category, pc.name_part_category"
        )
        extra_joins = (
            "JOIN aftermarket_parts ap ON ap.oem_part_id = p.id_part "
            "JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id"
        )
        count_col = "ap.id_aftermarket_parts"
        order_by = "m.name_manufacture, ap.part_number"
    else:
        number_col = "p.oem_part_number"
        name_expr = "COALESCE(NULLIF(p.name_es, ''), p.name_part)"
        columns = (
            f"p.id_part, p.oem_part_number, {name_expr} as name, "
            "pg.id_part_group, pg.name_part_group, "
            "pc.id_part_category, pc.name_part_category"
        )
        extra_joins = ""
        count_col = "p.id_part"
        order_by = "p.id_part"

    # Conditions and parameters are appended in lock-step so placeholders
    # always line up with their values.
    conditions = [scope_where]
    params = [scope_value]
    if category_id:
        conditions.append("pc.id_part_category = %s")
        params.append(category_id)
    if term:
        conditions.append(f"({number_col} ILIKE %s OR {name_expr} ILIKE %s)")
        like_term = f"%{term}%"
        params.extend([like_term, like_term])
    if aftermarket:
        conditions.append("UPPER(m.name_manufacture) = ANY(%s)")
        params.append(_sql_brand_names(allowed_brands))

    where_sql = " AND ".join(conditions)
    from_sql = f"""
        FROM {scope_from}
        {extra_joins}
        JOIN part_groups pg ON pg.id_part_group = p.group_id
        JOIN part_categories pc ON pc.id_part_category = pg.category_id
        WHERE {where_sql}
    """

    cur = master.cursor()
    try:
        cur.execute(
            f"SELECT DISTINCT {columns} {from_sql} ORDER BY {order_by} LIMIT %s OFFSET %s",
            params + [limit, offset])
        rows = cur.fetchall()
        cur.execute(f"SELECT COUNT(DISTINCT {count_col}) {from_sql}", params)
        total = cur.fetchone()[0]
    finally:
        cur.close()

    # Local stock is keyed by OEM part id in both modes.
    oem_col = 5 if aftermarket else 0
    oem_ids = [r[oem_col] for r in rows]
    local_stock = {}
    if tenant and oem_ids:
        try:
            if aftermarket:
                local_stock = catalog_service._get_local_stock_bulk(tenant, branch_id, oem_ids, [])
            else:
                local_stock = catalog_service._get_local_stock_bulk(tenant, branch_id, [], oem_ids)
        except Exception:
            # Stock enrichment is best-effort; the listing is still useful without it.
            logger.exception("catalog: local stock enrichment failed")

    items = []
    for r in rows:
        stock_info = local_stock.get(r[oem_col], {})
        if aftermarket:
            item = {
                'id': r[0],
                'oem_part_number': r[1],
                'name': r[2],
                'manufacturer': r[3],
                'price_usd': float(r[4]) if r[4] is not None else None,
                'oem_id': r[5],
                'group': {'id': r[6], 'name': r[7]},
                'category': {'id': r[8], 'name': r[9]},
            }
        else:
            item = {
                'id': r[0],
                'oem_part_number': r[1],
                'name': r[2],
                'group': {'id': r[3], 'name': r[4]},
                'category': {'id': r[5], 'name': r[6]},
            }
        item['local_stock'] = stock_info.get('stock', 0)
        item['local_price'] = stock_info.get('price', None)
        items.append(item)

    return items, total, allowed_brands or []


@catalog_bp.route('/vehicle-brands', methods=['GET'])
@require_auth('catalog.view')
def vehicle_brands():
    """Return North American vehicle brands for brand-first catalog browsing.

    Uses the same OEM_BRANDS_NA filter as the regular catalog so that
    the brand list is consistent across both navigation modes.
    """
    from services.catalog_modes import get_brands_for_mode
    allowed = list(get_brands_for_mode('oem'))

    def _query(master):
        cur = master.cursor()
        try:
            cur.execute("""
                SELECT id_brand, name_brand
                FROM brands
                WHERE name_brand = ANY(%s)
                ORDER BY name_brand ASC
            """, (allowed,))
            rows = cur.fetchall()
        finally:
            cur.close()
        return jsonify({
            'brands': [
                {'id': r[0], 'name': r[1], 'part_count': 0}
                for r in rows
            ]
        })
    return _master_only(_query)


@catalog_bp.route('/brand-categories', methods=['GET'])
@require_auth('catalog.view')
def brand_categories():
    """Return part categories available for a given vehicle brand."""
    brand = request.args.get('brand', '')
    if not brand:
        return jsonify({'error': 'brand parameter required'}), 400

    def _query(master, tenant, branch_id):
        allowed_brands = _get_allowed_brands(tenant) if tenant else None
        brand_filter = ""
        params = [brand]
        if allowed_brands:
            brand_filter = """AND EXISTS (
                SELECT 1 FROM aftermarket_parts ap2
                JOIN manufacturers m2 ON m2.id_manufacture = ap2.manufacturer_id
                WHERE ap2.oem_part_id = p.id_part
                  AND UPPER(m2.name_manufacture) = ANY(%s)
            )"""
            params.append(_sql_brand_names(allowed_brands))

        cur = master.cursor()
        try:
            cur.execute(f"""
                SELECT pc.id_part_category,
                       COALESCE(NULLIF(pc.name_es, ''), pc.name_part_category) as name,
                       pc.slug,
                       COUNT(DISTINCT p.id_part) as part_count
                FROM part_vehicle_preview pvp
                JOIN parts p ON p.id_part = pvp.part_id
                JOIN part_groups pg ON pg.id_part_group = p.group_id
                JOIN part_categories pc ON pc.id_part_category = pg.category_id
                WHERE pvp.name_brand = %s
                {brand_filter}
                GROUP BY pc.id_part_category, pc.name_part_category, pc.name_es, pc.slug
                ORDER BY part_count DESC
            """, params)
            rows = cur.fetchall()
        finally:
            cur.close()
        return jsonify({
            'brand': brand,
            'categories': [
                {'id': r[0], 'name': r[1], 'slug': r[2], 'part_count': r[3]}
                for r in rows
            ],
            'allowed_brands': allowed_brands or []
        })
    return _with_conns(_query)


@catalog_bp.route('/brand-parts', methods=['GET'])
@require_auth('catalog.view')
def brand_parts():
    """Return parts for a given vehicle brand + category, optionally filtered by search term."""
    brand = request.args.get('brand', '')
    category_id = request.args.get('category_id', type=int)
    term = request.args.get('search', '').strip()
    limit, offset = _limit_offset_args()
    if not brand:
        return jsonify({'error': 'brand parameter required'}), 400

    def _query(master, tenant, branch_id):
        items, total, allowed_brands = _fetch_scoped_parts(
            master, tenant, branch_id, _BRAND_SCOPE, brand,
            category_id, term, limit, offset)
        return jsonify({
            'brand': brand,
            'category_id': category_id,
            'search': term,
            'items': items,
            'total': total,
            'limit': limit,
            'offset': offset,
            'allowed_brands': allowed_brands
        })
    return _with_conns(_query)


@catalog_bp.route('/mye-parts', methods=['GET'])
@require_auth('catalog.view')
def mye_parts():
    """Return parts for a specific MYE + category (brand-catalog flow).

    Skips the group/subgroup level and goes directly from category to parts.
    """
    mye_id = request.args.get('mye_id', type=int)
    category_id = request.args.get('category_id', type=int)
    term = request.args.get('search', '').strip()
    limit, offset = _limit_offset_args()
    if not mye_id:
        return jsonify({'error': 'mye_id required'}), 400

    def _query(master, tenant, branch_id):
        items, total, allowed_brands = _fetch_scoped_parts(
            master, tenant, branch_id, _MYE_SCOPE, mye_id,
            category_id, term, limit, offset)
        return jsonify({
            'mye_id': mye_id,
            'category_id': category_id,
            'search': term,
            'items': items,
            'total': total,
            'limit': limit,
            'offset': offset,
            'allowed_brands': allowed_brands
        })
    return _with_conns(_query)