Files
Autoparts-DB/pos/blueprints/catalog_bp.py
consultoria-as 2b73c2c6db feat: Fase 1-3 completas - precios proveedor, multi-sucursal, factura global
Fase 1: Lista de precios de proveedor
- Tabla supplier_catalog_prices en master DB
- Endpoints GET/POST/PUT/DELETE /supplier-catalog/prices
- Upload CSV/Excel de precios de proveedor
- Visualizacion de supplier_price en catalogo y POS

Fase 2: Multi-sucursal completo
- Migracion v4.0: inventory.branch_id=NULL, tabla inventory_stock
- Campos fiscales en branches (RFC, regimen, CP, serie CFDI, certificados)
- Trigger trg_update_inventory_stock para sincronizar stock por sucursal
- Backend config_bp.py con CRUD de sucursales fiscales
- Backend inventory_bp.py y pos_bp.py refactorizados para inventario compartido
- Backend invoicing_bp.py usa datos fiscales de la sucursal de la venta
- Frontend config.html/js con modal de sucursales expandido

Fase 3: Factura global mensual
- Migracion v4.1: tablas global_invoice_sales, sales.global_invoiced_at
- build_global_invoice_xml() con InformacionGlobal SAT-compliant
- Servicio global_invoice.py para agrupar ventas PUE <=000
- Endpoints POST/GET /global-invoice y /global-invoice/eligible-sales
- Frontend invoicing.html/js con boton y modal de factura global
2026-06-11 08:59:56 +00:00

1133 lines
44 KiB
Python

# /home/Autopartes/pos/blueprints/catalog_bp.py
"""Catalog blueprint: TecDoc vehicle navigation with local stock enrichment.
Endpoints (all under /pos/api/catalog):
GET /brands — vehicle brands with parts
GET /models?brand_id= — models for a brand
GET /years?model_id= — years for a model
GET /engines?model_id=&year_id= — engines for model+year
GET /categories?mye_id= — part categories for vehicle
GET /groups?mye_id=&category_id= — part subcategories for vehicle+category
GET /parts?mye_id=&group_id= — parts with local stock enrichment
GET /part/<part_id> — full part detail (stock + bodegas + alternatives)
GET /search?q= — smart search (part number or text)
"""
from flask import Blueprint, request, jsonify, g
from middleware import require_auth
from tenant_db import get_master_conn, get_tenant_conn
from services import catalog_service
from services.vin_decoder import decode_vin
from services.plate_lookup import search_plate, register_plate, is_valid_mexican_plate, normalize_plate
from config import CATALOG_OEM_ENABLED
catalog_bp = Blueprint('catalog', __name__, url_prefix='/pos/api/catalog')
def _oem_blocked():
"""Return a 403 response if OEM catalog is disabled."""
if not CATALOG_OEM_ENABLED:
return jsonify({
'error': 'Catálogo OEM no disponible',
'message': 'El catálogo OEM está en construcción. Por favor usa el modo Local o Shop Supplies.',
'oem_disabled': True,
}), 403
return None
def _get_allowed_brands(tenant_conn):
"""Read allowed part brands from tenant_config. Returns list or None."""
import json
cur = tenant_conn.cursor()
try:
cur.execute("SELECT value FROM tenant_config WHERE key = 'allowed_part_brands'")
row = cur.fetchone()
if row and row[0]:
try:
brands = json.loads(row[0])
if isinstance(brands, list) and brands:
return brands
except (json.JSONDecodeError, ValueError):
pass
finally:
cur.close()
return None
def _with_conns(fn):
"""Helper: open master + tenant connections, call fn, close both.
fn receives (master_conn, tenant_conn, branch_id).
"""
master = None
tenant = None
try:
master = get_master_conn()
tenant = get_tenant_conn(g.tenant_id)
branch_id = request.args.get('branch_id', g.branch_id)
return fn(master, tenant, branch_id)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
if master:
try: master.close()
except: pass
if tenant:
try: tenant.close()
except: pass
def _master_only(fn):
"""Helper: open only master connection for hierarchy endpoints."""
master = None
try:
master = get_master_conn()
return fn(master)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
if master:
try: master.close()
except: pass
def _filter_parts_by_allowed_brands(master_conn, parts_data, allowed_brands):
"""Filter a list of part dicts to only include those with aftermarket equivalents
from allowed brands. parts_data items must have 'id_part' or 'id' key."""
if not allowed_brands or not parts_data:
return parts_data
part_ids = []
for p in parts_data:
pid = p.get('id_part') or p.get('id')
# Skip local inventory IDs (strings like 'inv:3') — aftermarket filter
# only applies to catalog parts with integer OEM part IDs.
if pid is not None and isinstance(pid, int):
part_ids.append(pid)
if not part_ids:
return parts_data
cur = master_conn.cursor()
try:
cur.execute("""
SELECT DISTINCT ap.oem_part_id
FROM aftermarket_parts ap
JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id
WHERE ap.oem_part_id = ANY(%s) AND UPPER(m.name_manufacture) = ANY(%s)
""", (part_ids, allowed_brands))
allowed_ids = {r[0] for r in cur.fetchall()}
finally:
cur.close()
return [p for p in parts_data if (p.get('id_part') or p.get('id')) in allowed_ids]
# ─── Hierarchy navigation (master DB only) ───
@catalog_bp.route('/brands', methods=['GET'])
@require_auth('catalog.view')
def brands():
from services.catalog_modes import normalize_mode
year_id = request.args.get('year_id', type=int)
mode = normalize_mode(request.args.get('mode'))
def _do(master, tenant, branch_id):
mye_ids = catalog_service._get_mye_ids_with_parts(tenant, tenant_id=g.tenant_id, master_conn=master) if tenant else None
data = catalog_service.get_brands(master, year_id=year_id, mode=mode, mye_ids=mye_ids)
return jsonify({'data': data, 'mode': mode})
return _with_conns(_do)
@catalog_bp.route('/models', methods=['GET'])
@require_auth('catalog.view')
def models():
brand_id = request.args.get('brand_id', type=int)
year_id = request.args.get('year_id', type=int)
if not brand_id:
return jsonify({'error': 'brand_id required'}), 400
def _do(master, tenant, branch_id):
mye_ids = catalog_service._get_mye_ids_with_parts(tenant, tenant_id=g.tenant_id, master_conn=master) if tenant else None
data = catalog_service.get_models(master, brand_id, year_id=year_id, mye_ids=mye_ids)
return jsonify({'data': data})
return _with_conns(_do)
@catalog_bp.route('/years', methods=['GET'])
@require_auth('catalog.view')
def years():
model_id = request.args.get('model_id', type=int)
if not model_id:
return jsonify({'error': 'model_id required'}), 400
def _do(master, tenant, branch_id):
mye_ids = catalog_service._get_mye_ids_with_parts(tenant, tenant_id=g.tenant_id, master_conn=master) if tenant else None
data = catalog_service.get_years(master, model_id, mye_ids=mye_ids)
return jsonify({'data': data})
return _with_conns(_do)
@catalog_bp.route('/years-all', methods=['GET'])
@require_auth('catalog.view')
def years_all():
"""Get all available years (for vehicle selector dropdown)."""
def _do(master):
cur = master.cursor()
cur.execute("SELECT DISTINCT id_year, year_car FROM years ORDER BY year_car DESC")
rows = cur.fetchall()
cur.close()
return jsonify({'data': [{'id_year': r[0], 'year_car': r[1]} for r in rows]})
return _master_only(_do)
@catalog_bp.route('/engines', methods=['GET'])
@require_auth('catalog.view')
def engines():
model_id = request.args.get('model_id', type=int)
year_id = request.args.get('year_id', type=int)
if not model_id or not year_id:
return jsonify({'error': 'model_id and year_id required'}), 400
def _do(master, tenant, branch_id):
mye_ids = catalog_service._get_mye_ids_with_parts(tenant, tenant_id=g.tenant_id, master_conn=master) if tenant else None
data = catalog_service.get_engines(master, model_id, year_id, mye_ids=mye_ids)
return jsonify({'data': data})
return _with_conns(_do)
@catalog_bp.route('/categories', methods=['GET'])
@require_auth('catalog.view')
def categories():
"""Categories for a vehicle.
OEM mode: TecDoc part_categories (id_part_category, name).
Local mode: 14 Nexpart top-level groups, filtered by what's available
for this vehicle. Returns 'slug' (string) instead of integer id.
"""
from services.catalog_modes import normalize_mode
mye_id = request.args.get('mye_id', type=int)
mode = normalize_mode(request.args.get('mode'))
if not mye_id:
return jsonify({'error': 'mye_id required'}), 400
def _do(master, tenant, branch_id):
allowed_brands = _get_allowed_brands(tenant) if tenant else None
if mode == 'local':
data = catalog_service.get_nexpart_groups_for_vehicle(master, mye_id, tenant)
else:
data = catalog_service.get_categories(master, mye_id, allowed_brands)
return jsonify({'data': data, 'mode': mode, 'allowed_brands': allowed_brands or []})
return _with_conns(_do)
@catalog_bp.route('/groups', methods=['GET'])
@require_auth('catalog.view')
def groups():
"""Subgroups for a vehicle + parent category.
OEM mode: TecDoc part_groups within a TecDoc part_category (integer ids).
Local mode: Nexpart subgroups within a Nexpart group (string slugs).
"""
from services.catalog_modes import normalize_mode
mye_id = request.args.get('mye_id', type=int)
category_id = request.args.get('category_id', type=int)
category_slug = request.args.get('category_slug')
mode = normalize_mode(request.args.get('mode'))
if not mye_id:
return jsonify({'error': 'mye_id required'}), 400
def _do(master, tenant, branch_id):
if mode == 'local':
if not category_slug:
return jsonify({'error': 'category_slug required for local mode'}), 400
data = catalog_service.get_nexpart_subgroups_for_vehicle(master, mye_id, category_slug, tenant)
else:
if not category_id:
return jsonify({'error': 'category_id required for oem mode'}), 400
data = catalog_service.get_groups(master, mye_id, category_id)
return jsonify({'data': data, 'mode': mode})
return _with_conns(_do)
# ─── Parts with stock enrichment (master + tenant) ───
@catalog_bp.route('/part-types', methods=['GET'])
@require_auth('catalog.view')
def part_types():
"""Distinct part types (3rd subcategory level) for a vehicle + group/subgroup.
OEM mode: distinct name_part values within a TecDoc part_group_id.
Local mode: Nexpart Part Types within a Nexpart group + subgroup.
"""
from services.catalog_modes import normalize_mode
mye_id = request.args.get('mye_id', type=int)
group_id = request.args.get('group_id', type=int)
group_slug = request.args.get('group_slug') # parent Nexpart group
subgroup_slug = request.args.get('subgroup_slug') # current Nexpart subgroup
mode = normalize_mode(request.args.get('mode'))
if not mye_id:
return jsonify({'error': 'mye_id required'}), 400
def _do(master, tenant, branch_id):
if mode == 'local':
if not group_slug or not subgroup_slug:
return jsonify({'error': 'group_slug and subgroup_slug required for local mode'}), 400
data = catalog_service.get_nexpart_part_types_for_vehicle(
master, mye_id, group_slug, subgroup_slug, tenant
)
else:
if not group_id:
return jsonify({'error': 'group_id required for oem mode'}), 400
data = catalog_service.get_part_types(master, mye_id, group_id)
return jsonify({'data': data, 'mode': mode})
return _with_conns(_do)
@catalog_bp.route('/shop-supplies/groups', methods=['GET'])
@require_auth('catalog.view')
def shop_supplies_groups():
"""Vehicle-independent groups (Chemicals + Tires/Tools)."""
def _do(master):
data = catalog_service.get_shop_supplies_groups()
return jsonify({'data': data})
return _master_only(_do)
@catalog_bp.route('/shop-supplies/subgroups', methods=['GET'])
@require_auth('catalog.view')
def shop_supplies_subgroups():
group_slug = request.args.get('group_slug')
if not group_slug:
return jsonify({'error': 'group_slug required'}), 400
def _do(master):
data = catalog_service.get_shop_supplies_subgroups(master, group_slug)
return jsonify({'data': data})
return _master_only(_do)
@catalog_bp.route('/shop-supplies/part-types', methods=['GET'])
@require_auth('catalog.view')
def shop_supplies_part_types():
group_slug = request.args.get('group_slug')
subgroup_slug = request.args.get('subgroup_slug')
if not group_slug or not subgroup_slug:
return jsonify({'error': 'group_slug and subgroup_slug required'}), 400
def _do(master):
data = catalog_service.get_shop_supplies_part_types(master, group_slug, subgroup_slug)
return jsonify({'data': data})
return _master_only(_do)
@catalog_bp.route('/shop-supplies/parts', methods=['GET'])
@require_auth('catalog.view')
def shop_supplies_parts():
group_slug = request.args.get('group_slug')
subgroup_slug = request.args.get('subgroup_slug')
part_type_slug = request.args.get('part_type_slug')
page = max(1, request.args.get('page', 1, type=int) or 1)
per_page = max(1, min(request.args.get('per_page', 30, type=int) or 30, 100))
if not group_slug or not subgroup_slug or not part_type_slug:
return jsonify({'error': 'group_slug, subgroup_slug, part_type_slug required'}), 400
def _do(master, tenant, branch_id):
result = catalog_service.get_shop_supplies_parts(
master, group_slug, subgroup_slug, part_type_slug,
tenant, branch_id, page, per_page,
)
return jsonify(result)
return _with_conns(_do)
@catalog_bp.route('/parts', methods=['GET'])
@require_auth('catalog.view')
def parts():
"""Parts list for the deepest navigation level.
Three call shapes (the endpoint chooses based on which params are present):
A) OEM mode legacy:
?mode=oem&mye_id=&group_id=&part_type=...
B) Local mode legacy (TecDoc-style):
?mode=local&mye_id=&group_id=&part_type=...
C) Local mode Nexpart navigation (NEW):
?mode=local&mye_id=&nexpart_group=&nexpart_subgroup=&nexpart_part_type=
"""
from services.catalog_modes import normalize_mode
mye_id = request.args.get('mye_id', type=int)
group_id = request.args.get('group_id', type=int)
part_type = request.args.get('part_type') # optional 3rd-level (legacy)
# Nexpart navigation slugs (Local mode only)
nexpart_group = request.args.get('nexpart_group')
nexpart_subgroup = request.args.get('nexpart_subgroup')
nexpart_part_type = request.args.get('nexpart_part_type')
page = max(1, request.args.get('page', 1, type=int) or 1)
per_page = max(1, min(request.args.get('per_page', 30, type=int) or 30, 100))
mode = normalize_mode(request.args.get('mode'))
if not mye_id:
return jsonify({'error': 'mye_id required'}), 400
use_nexpart_nav = mode == 'local' and nexpart_group and nexpart_subgroup and nexpart_part_type
if not use_nexpart_nav and not group_id:
return jsonify({'error': 'group_id (or nexpart_group + subgroup + part_type) required'}), 400
# Block OEM catalog if not enabled
if mode != 'local' and not use_nexpart_nav:
blocked = _oem_blocked()
if blocked:
return blocked
def _do(master, tenant, branch_id):
allowed_brands = _get_allowed_brands(tenant) if tenant else None
# For local mode with allowed_brands, fetch everything first so filtering
# happens before pagination. OEM mode keeps post-filter for now.
fetch_all_for_filter = bool(allowed_brands) and (mode == 'local' or use_nexpart_nav)
_page = 1 if fetch_all_for_filter else page
_per_page = 9999 if fetch_all_for_filter else per_page
if use_nexpart_nav:
result = catalog_service.get_parts_for_nexpart_triple(
master, mye_id, nexpart_group, nexpart_subgroup, nexpart_part_type,
tenant, branch_id, _page, _per_page, tenant_id=g.tenant_id,
)
elif mode == 'local':
result = catalog_service.get_parts_local(
master, mye_id, group_id, tenant, branch_id, _page, _per_page, part_type=part_type,
)
else:
result = catalog_service.get_parts(
master, mye_id, group_id, tenant, branch_id, page, per_page, part_type=part_type,
)
if allowed_brands:
result['data'] = _filter_parts_by_allowed_brands(master, result.get('data', []), allowed_brands)
if fetch_all_for_filter:
total = len(result['data'])
offset = (page - 1) * per_page
result['data'] = result['data'][offset:offset + per_page]
result['pagination'] = catalog_service._pagination(page, per_page, total)
result['allowed_brands'] = allowed_brands or []
return jsonify(result)
return _with_conns(_do)
@catalog_bp.route('/part/<int:part_id>', methods=['GET'])
@require_auth('catalog.view')
def part_detail(part_id):
# Part detail is available in both local and OEM modes
# — it reads from the master parts DB and enriches with local stock.
def _do(master, tenant, branch_id):
result = catalog_service.get_part_detail(master, part_id, tenant, branch_id)
if not result:
return jsonify({'error': 'Part not found'}), 404
return jsonify(result)
return _with_conns(_do)
@catalog_bp.route('/search', methods=['GET'])
@require_auth('catalog.view')
def search():
# Search is available in both local and OEM modes
# — it reads from the master parts DB and enriches with local stock.
q = request.args.get('q', '').strip()
if not q or len(q) < 2:
return jsonify({'data': []})
limit = request.args.get('limit', 50, type=int)
mye_id = request.args.get('mye_id', type=int)
def _do(master, tenant, branch_id):
allowed_brands = _get_allowed_brands(tenant) if tenant else None
data = catalog_service.smart_search(master, q, tenant, branch_id, limit, mye_id, tenant_id=g.tenant_id)
if allowed_brands:
data = _filter_parts_by_allowed_brands(master, data, allowed_brands)
return jsonify({'data': data, 'allowed_brands': allowed_brands or []})
return _with_conns(_do)
# ─── VIN Decoder ───
@catalog_bp.route('/vin/<vin>', methods=['GET'])
@require_auth('catalog.view')
def decode_vin_route(vin):
"""Decode a VIN and try to match to a brand/model/year in our catalog DB."""
vin = (vin or "").strip().upper()
if len(vin) != 17:
return jsonify({'error': 'VIN debe tener exactamente 17 caracteres.'}), 400
try:
info = decode_vin(vin)
except Exception as e:
return jsonify({'error': f'Error al decodificar VIN: {str(e)}'}), 502
if info.get('error'):
return jsonify(info), 200 # Return info even with partial errors
# Try to match the decoded vehicle to our catalog DB
db_match = None
master = None
try:
master = get_master_conn()
db_match = _match_vin_to_catalog(master, info)
except Exception:
pass
finally:
if master:
try:
master.close()
except Exception:
pass
result = {**info}
if db_match:
result['catalog_match'] = db_match
return jsonify(result)
# ─── Plate Lookup ───
@catalog_bp.route('/plate/<plate>', methods=['GET'])
@require_auth('catalog.view')
def plate_lookup(plate):
"""Look up a vehicle by Mexican license plate in the local plate_vehicles table.
If found, also tries to match the vehicle to the catalog DB.
"""
plate = (plate or '').strip()
if not plate:
return jsonify({'error': 'Placa requerida.'}), 400
if not is_valid_mexican_plate(plate):
return jsonify({'error': 'Formato de placa no valido. Ej: ABC-1234 o AB-123-C'}), 400
tenant = None
master = None
try:
tenant = get_tenant_conn(g.tenant_id)
result = search_plate(tenant, plate)
if not result:
return jsonify({
'found': False,
'plate': normalize_plate(plate),
'message': 'Placa no registrada.'
})
# Try to match to catalog
catalog_match = None
try:
master = get_master_conn()
catalog_match = _match_plate_to_catalog(master, result)
except Exception:
pass
finally:
if master:
try: master.close()
except: pass
master = None
response = {
'found': True,
'plate': result['plate'],
'make': result['make'],
'model': result['model'],
'year': result['year'],
'vin': result['vin'],
'customer_id': result['customer_id'],
}
if catalog_match:
response['catalog_match'] = catalog_match
return jsonify(response)
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
if tenant:
try: tenant.close()
except: pass
if master:
try: master.close()
except: pass
@catalog_bp.route('/plate', methods=['POST'])
@require_auth('catalog.view')
def plate_register():
"""Register or update a plate-to-vehicle mapping."""
data = request.get_json() or {}
plate = (data.get('plate') or '').strip()
if not plate:
return jsonify({'error': 'plate required'}), 400
if not is_valid_mexican_plate(plate):
return jsonify({'error': 'Formato de placa no valido.'}), 400
tenant = None
try:
tenant = get_tenant_conn(g.tenant_id)
rec_id = register_plate(
tenant, plate,
make=data.get('make'),
model=data.get('model'),
year=data.get('year'),
vin=data.get('vin'),
customer_id=data.get('customer_id'),
)
return jsonify({'id': rec_id, 'message': 'Placa registrada.'})
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
if tenant:
try: tenant.close()
except: pass
def _match_plate_to_catalog(master_conn, plate_info):
"""Try to match plate vehicle info to the catalog DB (same logic as VIN)."""
return _match_vin_to_catalog(master_conn, {
'make': plate_info.get('make'),
'model': plate_info.get('model'),
'year': plate_info.get('year'),
})
def _match_vin_to_catalog(master_conn, vin_info):
"""Try to find brand_id, model_id, year_id, mye_id from decoded VIN info."""
make = (vin_info.get('make') or '').upper().strip()
model = (vin_info.get('model') or '').strip()
year = vin_info.get('year')
if not make:
return None
cur = master_conn.cursor()
result = {}
try:
# Find brand (try exact, then LIKE)
cur.execute(
"SELECT id_brand, name_brand FROM brands WHERE UPPER(name_brand) = %s",
(make,)
)
brand_row = cur.fetchone()
if not brand_row:
cur.execute(
"SELECT id_brand, name_brand FROM brands WHERE UPPER(name_brand) LIKE %s ORDER BY name_brand LIMIT 1",
(f"%{make}%",)
)
brand_row = cur.fetchone()
if not brand_row:
return None
result['brand_id'] = brand_row[0]
result['brand_name'] = brand_row[1]
# Find model
if model:
cur.execute(
"""SELECT m.id_model, m.name_model
FROM models m
WHERE m.brand_id = %s AND UPPER(m.name_model) LIKE %s
ORDER BY m.name_model LIMIT 5""",
(brand_row[0], f"%{model.upper()}%")
)
model_row = cur.fetchone()
if model_row:
result['model_id'] = model_row[0]
result['model_name'] = model_row[1]
# Find year
if year:
cur.execute(
"SELECT id_year, year_car FROM years WHERE year_car = %s",
(int(year),)
)
year_row = cur.fetchone()
if year_row:
result['year_id'] = year_row[0]
result['year_car'] = year_row[1]
# Find MYE options
cur.execute(
"""SELECT mye.id_mye, e.name_engine, mye.trim_level
FROM model_year_engine mye
JOIN engines e ON e.id_engine = mye.engine_id
WHERE mye.model_id = %s AND mye.year_id = %s
ORDER BY e.name_engine
LIMIT 10""",
(model_row[0], year_row[0])
)
mye_rows = cur.fetchall()
if mye_rows:
result['engines'] = [
{'id_mye': r[0], 'name_engine': r[1], 'trim_level': r[2]}
for r in mye_rows
]
# Auto-select if only one engine
if len(mye_rows) == 1:
result['id_mye'] = mye_rows[0][0]
return result
except Exception:
return None
finally:
cur.close()
# ─── Brand Catalog (vehicle-brand-first navigation) ───
@catalog_bp.route('/vehicle-brands', methods=['GET'])
@require_auth('catalog.view')
def vehicle_brands():
"""Return North American vehicle brands for brand-first catalog browsing.
Uses the same OEM_BRANDS_NA filter as the regular catalog so that
the brand list is consistent across both navigation modes.
"""
from services.catalog_modes import get_brands_for_mode
allowed = list(get_brands_for_mode('oem'))
def _query(master):
cur = master.cursor()
try:
cur.execute("""
SELECT id_brand, name_brand
FROM brands
WHERE name_brand = ANY(%s)
ORDER BY name_brand ASC
""", (allowed,))
rows = cur.fetchall()
return jsonify({
'brands': [
{'id': r[0], 'name': r[1], 'part_count': 0}
for r in rows
]
})
finally:
cur.close()
return _master_only(_query)
@catalog_bp.route('/brand-categories', methods=['GET'])
@require_auth('catalog.view')
def brand_categories():
"""Return part categories available for a given vehicle brand."""
brand = request.args.get('brand', '')
if not brand:
return jsonify({'error': 'brand parameter required'}), 400
def _query(master, tenant, branch_id):
cur = master.cursor()
try:
allowed_brands = _get_allowed_brands(tenant) if tenant else None
brand_filter = ""
params = [brand]
if allowed_brands:
brand_filter = """AND EXISTS (
SELECT 1 FROM aftermarket_parts ap2
JOIN manufacturers m2 ON m2.id_manufacture = ap2.manufacturer_id
WHERE ap2.oem_part_id = p.id_part AND UPPER(m2.name_manufacture) = ANY(%s)
)"""
params.append(allowed_brands)
cur.execute(f"""
SELECT pc.id_part_category,
COALESCE(NULLIF(pc.name_es, ''), pc.name_part_category) as name,
pc.slug,
COUNT(DISTINCT p.id_part) as part_count
FROM part_vehicle_preview pvp
JOIN parts p ON p.id_part = pvp.part_id
JOIN part_groups pg ON pg.id_part_group = p.group_id
JOIN part_categories pc ON pc.id_part_category = pg.category_id
WHERE pvp.name_brand = %s
{brand_filter}
GROUP BY pc.id_part_category, pc.name_part_category, pc.name_es, pc.slug
ORDER BY part_count DESC
""", params)
rows = cur.fetchall()
return jsonify({
'brand': brand,
'categories': [
{'id': r[0], 'name': r[1], 'slug': r[2], 'part_count': r[3]}
for r in rows
],
'allowed_brands': allowed_brands or []
})
finally:
cur.close()
return _with_conns(_query)
@catalog_bp.route('/brand-parts', methods=['GET'])
@require_auth('catalog.view')
def brand_parts():
"""Return parts for a given vehicle brand + category, optionally filtered by search term."""
brand = request.args.get('brand', '')
category_id = request.args.get('category_id', type=int)
search = request.args.get('search', '').strip()
limit = request.args.get('limit', 50, type=int)
offset = request.args.get('offset', 0, type=int)
if not brand:
return jsonify({'error': 'brand parameter required'}), 400
def _query(master, tenant, branch_id):
cur = master.cursor()
try:
allowed_brands = _get_allowed_brands(tenant) if tenant else None
cat_filter = ""
search_filter = ""
params = [brand]
if category_id:
cat_filter = "AND pc.id_part_category = %s"
params.append(category_id)
# --- Brand-filtered mode: return aftermarket parts directly ---
if allowed_brands:
am_search = ""
am_params = list(params)
if search:
am_search = "AND (ap.part_number ILIKE %s OR COALESCE(NULLIF(ap.name_aftermarket_parts, ''), p.name_part) ILIKE %s)"
like_term = f"%{search}%"
am_params.extend([like_term, like_term])
query_params = list(am_params)
cur.execute(f"""
SELECT DISTINCT ap.id_aftermarket_parts,
ap.part_number,
COALESCE(NULLIF(ap.name_aftermarket_parts, ''), p.name_part) as name,
m.name_manufacture,
ap.price_usd,
p.id_part,
pg.id_part_group, pg.name_part_group,
pc.id_part_category, pc.name_part_category
FROM part_vehicle_preview pvp
JOIN parts p ON p.id_part = pvp.part_id
JOIN aftermarket_parts ap ON ap.oem_part_id = p.id_part
JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id
JOIN part_groups pg ON pg.id_part_group = p.group_id
JOIN part_categories pc ON pc.id_part_category = pg.category_id
WHERE pvp.name_brand = %s
{cat_filter}
{am_search}
AND UPPER(m.name_manufacture) = ANY(%s)
ORDER BY m.name_manufacture, ap.part_number
LIMIT %s OFFSET %s
""", query_params + [allowed_brands, limit, offset])
part_rows = cur.fetchall()
oem_ids = [r[5] for r in part_rows]
count_params = list(am_params)
cur.execute(f"""
SELECT COUNT(DISTINCT ap.id_aftermarket_parts)
FROM part_vehicle_preview pvp
JOIN parts p ON p.id_part = pvp.part_id
JOIN aftermarket_parts ap ON ap.oem_part_id = p.id_part
JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id
JOIN part_groups pg ON pg.id_part_group = p.group_id
JOIN part_categories pc ON pc.id_part_category = pg.category_id
WHERE pvp.name_brand = %s
{cat_filter}
{am_search}
AND UPPER(m.name_manufacture) = ANY(%s)
""", count_params + [allowed_brands])
total = cur.fetchone()[0]
local_stock = {}
if tenant and oem_ids:
try:
from services.catalog_service import _get_local_stock_bulk
local_stock = _get_local_stock_bulk(tenant, branch_id, oem_ids, [])
except Exception:
pass
items = []
for r in part_rows:
oem_id = r[5]
stock_info = local_stock.get(oem_id, {})
items.append({
'id': r[0],
'oem_part_number': r[1],
'name': r[2],
'manufacturer': r[3],
'price_usd': float(r[4]) if r[4] is not None else None,
'oem_id': oem_id,
'group': {'id': r[6], 'name': r[7]},
'category': {'id': r[8], 'name': r[9]},
'local_stock': stock_info.get('stock', 0),
'local_price': stock_info.get('price', None),
})
return jsonify({
'brand': brand,
'category_id': category_id,
'search': search,
'items': items,
'total': total,
'limit': limit,
'offset': offset,
'allowed_brands': allowed_brands
})
# --- Normal mode: return OEM parts ---
if search:
search_filter = "AND (p.oem_part_number ILIKE %s OR COALESCE(NULLIF(p.name_es, ''), p.name_part) ILIKE %s)"
like_term = f"%{search}%"
params.extend([like_term, like_term])
query_params = list(params)
cur.execute(f"""
SELECT DISTINCT p.id_part, p.oem_part_number,
COALESCE(NULLIF(p.name_es, ''), p.name_part) as name,
pg.id_part_group, pg.name_part_group,
pc.id_part_category, pc.name_part_category
FROM part_vehicle_preview pvp
JOIN parts p ON p.id_part = pvp.part_id
JOIN part_groups pg ON pg.id_part_group = p.group_id
JOIN part_categories pc ON pc.id_part_category = pg.category_id
WHERE pvp.name_brand = %s
{cat_filter}
{search_filter}
ORDER BY p.id_part
LIMIT %s OFFSET %s
""", query_params + [limit, offset])
part_rows = cur.fetchall()
part_ids = [r[0] for r in part_rows]
count_params = list(params)
cur.execute(f"""
SELECT COUNT(DISTINCT p.id_part)
FROM part_vehicle_preview pvp
JOIN parts p ON p.id_part = pvp.part_id
JOIN part_groups pg ON pg.id_part_group = p.group_id
JOIN part_categories pc ON pc.id_part_category = pg.category_id
WHERE pvp.name_brand = %s
{cat_filter}
{search_filter}
""", count_params)
total = cur.fetchone()[0]
local_stock = {}
if tenant and part_ids:
try:
from services.catalog_service import _get_local_stock_bulk
local_stock = _get_local_stock_bulk(tenant, branch_id, [], part_ids)
except Exception:
pass
items = []
for r in part_rows:
part_id = r[0]
stock_info = local_stock.get(part_id, {})
items.append({
'id': part_id,
'oem_part_number': r[1],
'name': r[2],
'group': {'id': r[3], 'name': r[4]},
'category': {'id': r[5], 'name': r[6]},
'local_stock': stock_info.get('stock', 0),
'local_price': stock_info.get('price', None),
})
return jsonify({
'brand': brand,
'category_id': category_id,
'search': search,
'items': items,
'total': total,
'limit': limit,
'offset': offset,
'allowed_brands': []
})
finally:
cur.close()
return _with_conns(_query)
@catalog_bp.route('/mye-parts', methods=['GET'])
@require_auth('catalog.view')
def mye_parts():
"""Return parts for a specific MYE + category (brand-catalog flow).
Skips the group/subgroup level and goes directly from category to parts.
"""
mye_id = request.args.get('mye_id', type=int)
category_id = request.args.get('category_id', type=int)
search = request.args.get('search', '').strip()
limit = request.args.get('limit', 50, type=int)
offset = request.args.get('offset', 0, type=int)
if not mye_id:
return jsonify({'error': 'mye_id required'}), 400
def _query(master, tenant, branch_id):
cur = master.cursor()
try:
allowed_brands = _get_allowed_brands(tenant) if tenant else None
cat_filter = ""
search_filter = ""
params = [mye_id]
if category_id:
cat_filter = "AND pc.id_part_category = %s"
params.append(category_id)
# --- Brand-filtered mode: return aftermarket parts directly ---
if allowed_brands:
am_search = ""
am_params = list(params)
if search:
am_search = "AND (ap.part_number ILIKE %s OR COALESCE(NULLIF(ap.name_aftermarket_parts, ''), p.name_part) ILIKE %s)"
like_term = f"%{search}%"
am_params.extend([like_term, like_term])
# Get aftermarket parts
query_params = list(am_params)
cur.execute(f"""
SELECT DISTINCT ap.id_aftermarket_parts,
ap.part_number,
COALESCE(NULLIF(ap.name_aftermarket_parts, ''), p.name_part) as name,
m.name_manufacture,
ap.price_usd,
p.id_part,
pg.id_part_group, pg.name_part_group,
pc.id_part_category, pc.name_part_category
FROM vehicle_parts vp
JOIN parts p ON p.id_part = vp.part_id
JOIN aftermarket_parts ap ON ap.oem_part_id = p.id_part
JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id
JOIN part_groups pg ON pg.id_part_group = p.group_id
JOIN part_categories pc ON pc.id_part_category = pg.category_id
WHERE vp.model_year_engine_id = %s
{cat_filter}
{am_search}
AND UPPER(m.name_manufacture) = ANY(%s)
ORDER BY m.name_manufacture, ap.part_number
LIMIT %s OFFSET %s
""", query_params + [allowed_brands, limit, offset])
part_rows = cur.fetchall()
oem_ids = [r[5] for r in part_rows]
# Count total
count_params = list(am_params)
cur.execute(f"""
SELECT COUNT(DISTINCT ap.id_aftermarket_parts)
FROM vehicle_parts vp
JOIN parts p ON p.id_part = vp.part_id
JOIN aftermarket_parts ap ON ap.oem_part_id = p.id_part
JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id
JOIN part_groups pg ON pg.id_part_group = p.group_id
JOIN part_categories pc ON pc.id_part_category = pg.category_id
WHERE vp.model_year_engine_id = %s
{cat_filter}
{am_search}
AND UPPER(m.name_manufacture) = ANY(%s)
""", count_params + [allowed_brands])
total = cur.fetchone()[0]
# Local stock keyed by OEM part id
local_stock = {}
if tenant and oem_ids:
try:
from services.catalog_service import _get_local_stock_bulk
local_stock = _get_local_stock_bulk(tenant, branch_id, oem_ids, [])
except Exception:
pass
items = []
for r in part_rows:
oem_id = r[5]
stock_info = local_stock.get(oem_id, {})
items.append({
'id': r[0],
'oem_part_number': r[1],
'name': r[2],
'manufacturer': r[3],
'price_usd': float(r[4]) if r[4] is not None else None,
'oem_id': oem_id,
'group': {'id': r[6], 'name': r[7]},
'category': {'id': r[8], 'name': r[9]},
'local_stock': stock_info.get('stock', 0),
'local_price': stock_info.get('price', None),
})
return jsonify({
'mye_id': mye_id,
'category_id': category_id,
'search': search,
'items': items,
'total': total,
'limit': limit,
'offset': offset,
'allowed_brands': allowed_brands
})
# --- Normal mode: return OEM parts ---
if search:
search_filter = "AND (p.oem_part_number ILIKE %s OR COALESCE(NULLIF(p.name_es, ''), p.name_part) ILIKE %s)"
like_term = f"%{search}%"
params.extend([like_term, like_term])
query_params = list(params)
cur.execute(f"""
SELECT DISTINCT p.id_part, p.oem_part_number,
COALESCE(NULLIF(p.name_es, ''), p.name_part) as name,
pg.id_part_group, pg.name_part_group,
pc.id_part_category, pc.name_part_category
FROM vehicle_parts vp
JOIN parts p ON p.id_part = vp.part_id
JOIN part_groups pg ON pg.id_part_group = p.group_id
JOIN part_categories pc ON pc.id_part_category = pg.category_id
WHERE vp.model_year_engine_id = %s
{cat_filter}
{search_filter}
ORDER BY p.id_part
LIMIT %s OFFSET %s
""", query_params + [limit, offset])
part_rows = cur.fetchall()
part_ids = [r[0] for r in part_rows]
count_params = list(params)
cur.execute(f"""
SELECT COUNT(DISTINCT p.id_part)
FROM vehicle_parts vp
JOIN parts p ON p.id_part = vp.part_id
JOIN part_groups pg ON pg.id_part_group = p.group_id
JOIN part_categories pc ON pc.id_part_category = pg.category_id
WHERE vp.model_year_engine_id = %s
{cat_filter}
{search_filter}
""", count_params)
total = cur.fetchone()[0]
local_stock = {}
if tenant and part_ids:
try:
from services.catalog_service import _get_local_stock_bulk
local_stock = _get_local_stock_bulk(tenant, branch_id, [], part_ids)
except Exception:
pass
items = []
for r in part_rows:
part_id = r[0]
stock_info = local_stock.get(part_id, {})
items.append({
'id': part_id,
'oem_part_number': r[1],
'name': r[2],
'group': {'id': r[3], 'name': r[4]},
'category': {'id': r[5], 'name': r[6]},
'local_stock': stock_info.get('stock', 0),
'local_price': stock_info.get('price', None),
})
return jsonify({
'mye_id': mye_id,
'category_id': category_id,
'search': search,
'items': items,
'total': total,
'limit': limit,
'offset': offset,
'allowed_brands': []
})
finally:
cur.close()
return _with_conns(_query)