Fase 1: Lista de precios de proveedor - Tabla supplier_catalog_prices en master DB - Endpoints GET/POST/PUT/DELETE /supplier-catalog/prices - Upload CSV/Excel de precios de proveedor - Visualizacion de supplier_price en catalogo y POS Fase 2: Multi-sucursal completo - Migracion v4.0: inventory.branch_id=NULL, tabla inventory_stock - Campos fiscales en branches (RFC, regimen, CP, serie CFDI, certificados) - Trigger trg_update_inventory_stock para sincronizar stock por sucursal - Backend config_bp.py con CRUD de sucursales fiscales - Backend inventory_bp.py y pos_bp.py refactorizados para inventario compartido - Backend invoicing_bp.py usa datos fiscales de la sucursal de la venta - Frontend config.html/js con modal de sucursales expandido Fase 3: Factura global mensual - Migracion v4.1: tablas global_invoice_sales, sales.global_invoiced_at - build_global_invoice_xml() con InformacionGlobal SAT-compliant - Servicio global_invoice.py para agrupar ventas PUE <=000 - Endpoints POST/GET /global-invoice y /global-invoice/eligible-sales - Frontend invoicing.html/js con boton y modal de factura global
1133 lines
44 KiB
Python
1133 lines
44 KiB
Python
# /home/Autopartes/pos/blueprints/catalog_bp.py
|
|
"""Catalog blueprint: TecDoc vehicle navigation with local stock enrichment.
|
|
|
|
Endpoints (all under /pos/api/catalog):
|
|
GET /brands — vehicle brands with parts
|
|
GET /models?brand_id= — models for a brand
|
|
GET /years?model_id= — years for a model
|
|
GET /engines?model_id=&year_id= — engines for model+year
|
|
GET /categories?mye_id= — part categories for vehicle
|
|
GET /groups?mye_id=&category_id= — part subcategories for vehicle+category
|
|
GET /parts?mye_id=&group_id= — parts with local stock enrichment
|
|
GET /part/<part_id> — full part detail (stock + bodegas + alternatives)
|
|
GET /search?q= — smart search (part number or text)
|
|
"""
|
|
|
|
from flask import Blueprint, request, jsonify, g
|
|
from middleware import require_auth
|
|
from tenant_db import get_master_conn, get_tenant_conn
|
|
from services import catalog_service
|
|
from services.vin_decoder import decode_vin
|
|
from services.plate_lookup import search_plate, register_plate, is_valid_mexican_plate, normalize_plate
|
|
from config import CATALOG_OEM_ENABLED
|
|
|
|
catalog_bp = Blueprint('catalog', __name__, url_prefix='/pos/api/catalog')
|
|
|
|
|
|
def _oem_blocked():
|
|
"""Return a 403 response if OEM catalog is disabled."""
|
|
if not CATALOG_OEM_ENABLED:
|
|
return jsonify({
|
|
'error': 'Catálogo OEM no disponible',
|
|
'message': 'El catálogo OEM está en construcción. Por favor usa el modo Local o Shop Supplies.',
|
|
'oem_disabled': True,
|
|
}), 403
|
|
return None
|
|
|
|
|
|
def _get_allowed_brands(tenant_conn):
|
|
"""Read allowed part brands from tenant_config. Returns list or None."""
|
|
import json
|
|
cur = tenant_conn.cursor()
|
|
try:
|
|
cur.execute("SELECT value FROM tenant_config WHERE key = 'allowed_part_brands'")
|
|
row = cur.fetchone()
|
|
if row and row[0]:
|
|
try:
|
|
brands = json.loads(row[0])
|
|
if isinstance(brands, list) and brands:
|
|
return brands
|
|
except (json.JSONDecodeError, ValueError):
|
|
pass
|
|
finally:
|
|
cur.close()
|
|
return None
|
|
|
|
|
|
def _with_conns(fn):
|
|
"""Helper: open master + tenant connections, call fn, close both.
|
|
fn receives (master_conn, tenant_conn, branch_id).
|
|
"""
|
|
master = None
|
|
tenant = None
|
|
try:
|
|
master = get_master_conn()
|
|
tenant = get_tenant_conn(g.tenant_id)
|
|
branch_id = request.args.get('branch_id', g.branch_id)
|
|
return fn(master, tenant, branch_id)
|
|
except Exception as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
finally:
|
|
if master:
|
|
try: master.close()
|
|
except: pass
|
|
if tenant:
|
|
try: tenant.close()
|
|
except: pass
|
|
|
|
|
|
def _master_only(fn):
|
|
"""Helper: open only master connection for hierarchy endpoints."""
|
|
master = None
|
|
try:
|
|
master = get_master_conn()
|
|
return fn(master)
|
|
except Exception as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
finally:
|
|
if master:
|
|
try: master.close()
|
|
except: pass
|
|
|
|
|
|
def _filter_parts_by_allowed_brands(master_conn, parts_data, allowed_brands):
|
|
"""Filter a list of part dicts to only include those with aftermarket equivalents
|
|
from allowed brands. parts_data items must have 'id_part' or 'id' key."""
|
|
if not allowed_brands or not parts_data:
|
|
return parts_data
|
|
part_ids = []
|
|
for p in parts_data:
|
|
pid = p.get('id_part') or p.get('id')
|
|
# Skip local inventory IDs (strings like 'inv:3') — aftermarket filter
|
|
# only applies to catalog parts with integer OEM part IDs.
|
|
if pid is not None and isinstance(pid, int):
|
|
part_ids.append(pid)
|
|
if not part_ids:
|
|
return parts_data
|
|
cur = master_conn.cursor()
|
|
try:
|
|
cur.execute("""
|
|
SELECT DISTINCT ap.oem_part_id
|
|
FROM aftermarket_parts ap
|
|
JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id
|
|
WHERE ap.oem_part_id = ANY(%s) AND UPPER(m.name_manufacture) = ANY(%s)
|
|
""", (part_ids, allowed_brands))
|
|
allowed_ids = {r[0] for r in cur.fetchall()}
|
|
finally:
|
|
cur.close()
|
|
return [p for p in parts_data if (p.get('id_part') or p.get('id')) in allowed_ids]
|
|
|
|
|
|
# ─── Hierarchy navigation (master DB only) ───
|
|
|
|
@catalog_bp.route('/brands', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def brands():
|
|
from services.catalog_modes import normalize_mode
|
|
year_id = request.args.get('year_id', type=int)
|
|
mode = normalize_mode(request.args.get('mode'))
|
|
def _do(master, tenant, branch_id):
|
|
mye_ids = catalog_service._get_mye_ids_with_parts(tenant, tenant_id=g.tenant_id, master_conn=master) if tenant else None
|
|
data = catalog_service.get_brands(master, year_id=year_id, mode=mode, mye_ids=mye_ids)
|
|
return jsonify({'data': data, 'mode': mode})
|
|
return _with_conns(_do)
|
|
|
|
|
|
@catalog_bp.route('/models', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def models():
|
|
brand_id = request.args.get('brand_id', type=int)
|
|
year_id = request.args.get('year_id', type=int)
|
|
if not brand_id:
|
|
return jsonify({'error': 'brand_id required'}), 400
|
|
def _do(master, tenant, branch_id):
|
|
mye_ids = catalog_service._get_mye_ids_with_parts(tenant, tenant_id=g.tenant_id, master_conn=master) if tenant else None
|
|
data = catalog_service.get_models(master, brand_id, year_id=year_id, mye_ids=mye_ids)
|
|
return jsonify({'data': data})
|
|
return _with_conns(_do)
|
|
|
|
|
|
@catalog_bp.route('/years', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def years():
|
|
model_id = request.args.get('model_id', type=int)
|
|
if not model_id:
|
|
return jsonify({'error': 'model_id required'}), 400
|
|
def _do(master, tenant, branch_id):
|
|
mye_ids = catalog_service._get_mye_ids_with_parts(tenant, tenant_id=g.tenant_id, master_conn=master) if tenant else None
|
|
data = catalog_service.get_years(master, model_id, mye_ids=mye_ids)
|
|
return jsonify({'data': data})
|
|
return _with_conns(_do)
|
|
|
|
|
|
@catalog_bp.route('/years-all', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def years_all():
|
|
"""Get all available years (for vehicle selector dropdown)."""
|
|
def _do(master):
|
|
cur = master.cursor()
|
|
cur.execute("SELECT DISTINCT id_year, year_car FROM years ORDER BY year_car DESC")
|
|
rows = cur.fetchall()
|
|
cur.close()
|
|
return jsonify({'data': [{'id_year': r[0], 'year_car': r[1]} for r in rows]})
|
|
return _master_only(_do)
|
|
|
|
|
|
@catalog_bp.route('/engines', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def engines():
|
|
model_id = request.args.get('model_id', type=int)
|
|
year_id = request.args.get('year_id', type=int)
|
|
if not model_id or not year_id:
|
|
return jsonify({'error': 'model_id and year_id required'}), 400
|
|
def _do(master, tenant, branch_id):
|
|
mye_ids = catalog_service._get_mye_ids_with_parts(tenant, tenant_id=g.tenant_id, master_conn=master) if tenant else None
|
|
data = catalog_service.get_engines(master, model_id, year_id, mye_ids=mye_ids)
|
|
return jsonify({'data': data})
|
|
return _with_conns(_do)
|
|
|
|
|
|
@catalog_bp.route('/categories', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def categories():
|
|
"""Categories for a vehicle.
|
|
|
|
OEM mode: TecDoc part_categories (id_part_category, name).
|
|
Local mode: 14 Nexpart top-level groups, filtered by what's available
|
|
for this vehicle. Returns 'slug' (string) instead of integer id.
|
|
"""
|
|
from services.catalog_modes import normalize_mode
|
|
mye_id = request.args.get('mye_id', type=int)
|
|
mode = normalize_mode(request.args.get('mode'))
|
|
if not mye_id:
|
|
return jsonify({'error': 'mye_id required'}), 400
|
|
def _do(master, tenant, branch_id):
|
|
allowed_brands = _get_allowed_brands(tenant) if tenant else None
|
|
if mode == 'local':
|
|
data = catalog_service.get_nexpart_groups_for_vehicle(master, mye_id, tenant)
|
|
else:
|
|
data = catalog_service.get_categories(master, mye_id, allowed_brands)
|
|
return jsonify({'data': data, 'mode': mode, 'allowed_brands': allowed_brands or []})
|
|
return _with_conns(_do)
|
|
|
|
|
|
@catalog_bp.route('/groups', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def groups():
|
|
"""Subgroups for a vehicle + parent category.
|
|
|
|
OEM mode: TecDoc part_groups within a TecDoc part_category (integer ids).
|
|
Local mode: Nexpart subgroups within a Nexpart group (string slugs).
|
|
"""
|
|
from services.catalog_modes import normalize_mode
|
|
mye_id = request.args.get('mye_id', type=int)
|
|
category_id = request.args.get('category_id', type=int)
|
|
category_slug = request.args.get('category_slug')
|
|
mode = normalize_mode(request.args.get('mode'))
|
|
if not mye_id:
|
|
return jsonify({'error': 'mye_id required'}), 400
|
|
def _do(master, tenant, branch_id):
|
|
if mode == 'local':
|
|
if not category_slug:
|
|
return jsonify({'error': 'category_slug required for local mode'}), 400
|
|
data = catalog_service.get_nexpart_subgroups_for_vehicle(master, mye_id, category_slug, tenant)
|
|
else:
|
|
if not category_id:
|
|
return jsonify({'error': 'category_id required for oem mode'}), 400
|
|
data = catalog_service.get_groups(master, mye_id, category_id)
|
|
return jsonify({'data': data, 'mode': mode})
|
|
return _with_conns(_do)
|
|
|
|
|
|
# ─── Parts with stock enrichment (master + tenant) ───
|
|
|
|
@catalog_bp.route('/part-types', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def part_types():
|
|
"""Distinct part types (3rd subcategory level) for a vehicle + group/subgroup.
|
|
|
|
OEM mode: distinct name_part values within a TecDoc part_group_id.
|
|
Local mode: Nexpart Part Types within a Nexpart group + subgroup.
|
|
"""
|
|
from services.catalog_modes import normalize_mode
|
|
mye_id = request.args.get('mye_id', type=int)
|
|
group_id = request.args.get('group_id', type=int)
|
|
group_slug = request.args.get('group_slug') # parent Nexpart group
|
|
subgroup_slug = request.args.get('subgroup_slug') # current Nexpart subgroup
|
|
mode = normalize_mode(request.args.get('mode'))
|
|
if not mye_id:
|
|
return jsonify({'error': 'mye_id required'}), 400
|
|
def _do(master, tenant, branch_id):
|
|
if mode == 'local':
|
|
if not group_slug or not subgroup_slug:
|
|
return jsonify({'error': 'group_slug and subgroup_slug required for local mode'}), 400
|
|
data = catalog_service.get_nexpart_part_types_for_vehicle(
|
|
master, mye_id, group_slug, subgroup_slug, tenant
|
|
)
|
|
else:
|
|
if not group_id:
|
|
return jsonify({'error': 'group_id required for oem mode'}), 400
|
|
data = catalog_service.get_part_types(master, mye_id, group_id)
|
|
return jsonify({'data': data, 'mode': mode})
|
|
return _with_conns(_do)
|
|
|
|
|
|
@catalog_bp.route('/shop-supplies/groups', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def shop_supplies_groups():
|
|
"""Vehicle-independent groups (Chemicals + Tires/Tools)."""
|
|
def _do(master):
|
|
data = catalog_service.get_shop_supplies_groups()
|
|
return jsonify({'data': data})
|
|
return _master_only(_do)
|
|
|
|
|
|
@catalog_bp.route('/shop-supplies/subgroups', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def shop_supplies_subgroups():
|
|
group_slug = request.args.get('group_slug')
|
|
if not group_slug:
|
|
return jsonify({'error': 'group_slug required'}), 400
|
|
def _do(master):
|
|
data = catalog_service.get_shop_supplies_subgroups(master, group_slug)
|
|
return jsonify({'data': data})
|
|
return _master_only(_do)
|
|
|
|
|
|
@catalog_bp.route('/shop-supplies/part-types', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def shop_supplies_part_types():
|
|
group_slug = request.args.get('group_slug')
|
|
subgroup_slug = request.args.get('subgroup_slug')
|
|
if not group_slug or not subgroup_slug:
|
|
return jsonify({'error': 'group_slug and subgroup_slug required'}), 400
|
|
def _do(master):
|
|
data = catalog_service.get_shop_supplies_part_types(master, group_slug, subgroup_slug)
|
|
return jsonify({'data': data})
|
|
return _master_only(_do)
|
|
|
|
|
|
@catalog_bp.route('/shop-supplies/parts', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def shop_supplies_parts():
|
|
group_slug = request.args.get('group_slug')
|
|
subgroup_slug = request.args.get('subgroup_slug')
|
|
part_type_slug = request.args.get('part_type_slug')
|
|
page = max(1, request.args.get('page', 1, type=int) or 1)
|
|
per_page = max(1, min(request.args.get('per_page', 30, type=int) or 30, 100))
|
|
if not group_slug or not subgroup_slug or not part_type_slug:
|
|
return jsonify({'error': 'group_slug, subgroup_slug, part_type_slug required'}), 400
|
|
def _do(master, tenant, branch_id):
|
|
result = catalog_service.get_shop_supplies_parts(
|
|
master, group_slug, subgroup_slug, part_type_slug,
|
|
tenant, branch_id, page, per_page,
|
|
)
|
|
return jsonify(result)
|
|
return _with_conns(_do)
|
|
|
|
|
|
@catalog_bp.route('/parts', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def parts():
|
|
"""Parts list for the deepest navigation level.
|
|
|
|
Three call shapes (the endpoint chooses based on which params are present):
|
|
|
|
A) OEM mode legacy:
|
|
?mode=oem&mye_id=&group_id=&part_type=...
|
|
B) Local mode legacy (TecDoc-style):
|
|
?mode=local&mye_id=&group_id=&part_type=...
|
|
C) Local mode Nexpart navigation (NEW):
|
|
?mode=local&mye_id=&nexpart_group=&nexpart_subgroup=&nexpart_part_type=
|
|
"""
|
|
from services.catalog_modes import normalize_mode
|
|
mye_id = request.args.get('mye_id', type=int)
|
|
group_id = request.args.get('group_id', type=int)
|
|
part_type = request.args.get('part_type') # optional 3rd-level (legacy)
|
|
|
|
# Nexpart navigation slugs (Local mode only)
|
|
nexpart_group = request.args.get('nexpart_group')
|
|
nexpart_subgroup = request.args.get('nexpart_subgroup')
|
|
nexpart_part_type = request.args.get('nexpart_part_type')
|
|
|
|
page = max(1, request.args.get('page', 1, type=int) or 1)
|
|
per_page = max(1, min(request.args.get('per_page', 30, type=int) or 30, 100))
|
|
mode = normalize_mode(request.args.get('mode'))
|
|
|
|
if not mye_id:
|
|
return jsonify({'error': 'mye_id required'}), 400
|
|
|
|
use_nexpart_nav = mode == 'local' and nexpart_group and nexpart_subgroup and nexpart_part_type
|
|
|
|
if not use_nexpart_nav and not group_id:
|
|
return jsonify({'error': 'group_id (or nexpart_group + subgroup + part_type) required'}), 400
|
|
|
|
# Block OEM catalog if not enabled
|
|
if mode != 'local' and not use_nexpart_nav:
|
|
blocked = _oem_blocked()
|
|
if blocked:
|
|
return blocked
|
|
|
|
def _do(master, tenant, branch_id):
|
|
allowed_brands = _get_allowed_brands(tenant) if tenant else None
|
|
# For local mode with allowed_brands, fetch everything first so filtering
|
|
# happens before pagination. OEM mode keeps post-filter for now.
|
|
fetch_all_for_filter = bool(allowed_brands) and (mode == 'local' or use_nexpart_nav)
|
|
_page = 1 if fetch_all_for_filter else page
|
|
_per_page = 9999 if fetch_all_for_filter else per_page
|
|
|
|
if use_nexpart_nav:
|
|
result = catalog_service.get_parts_for_nexpart_triple(
|
|
master, mye_id, nexpart_group, nexpart_subgroup, nexpart_part_type,
|
|
tenant, branch_id, _page, _per_page, tenant_id=g.tenant_id,
|
|
)
|
|
elif mode == 'local':
|
|
result = catalog_service.get_parts_local(
|
|
master, mye_id, group_id, tenant, branch_id, _page, _per_page, part_type=part_type,
|
|
)
|
|
else:
|
|
result = catalog_service.get_parts(
|
|
master, mye_id, group_id, tenant, branch_id, page, per_page, part_type=part_type,
|
|
)
|
|
if allowed_brands:
|
|
result['data'] = _filter_parts_by_allowed_brands(master, result.get('data', []), allowed_brands)
|
|
if fetch_all_for_filter:
|
|
total = len(result['data'])
|
|
offset = (page - 1) * per_page
|
|
result['data'] = result['data'][offset:offset + per_page]
|
|
result['pagination'] = catalog_service._pagination(page, per_page, total)
|
|
result['allowed_brands'] = allowed_brands or []
|
|
return jsonify(result)
|
|
return _with_conns(_do)
|
|
|
|
|
|
@catalog_bp.route('/part/<int:part_id>', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def part_detail(part_id):
|
|
# Part detail is available in both local and OEM modes
|
|
# — it reads from the master parts DB and enriches with local stock.
|
|
def _do(master, tenant, branch_id):
|
|
result = catalog_service.get_part_detail(master, part_id, tenant, branch_id)
|
|
if not result:
|
|
return jsonify({'error': 'Part not found'}), 404
|
|
return jsonify(result)
|
|
return _with_conns(_do)
|
|
|
|
|
|
@catalog_bp.route('/search', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def search():
|
|
# Search is available in both local and OEM modes
|
|
# — it reads from the master parts DB and enriches with local stock.
|
|
q = request.args.get('q', '').strip()
|
|
if not q or len(q) < 2:
|
|
return jsonify({'data': []})
|
|
limit = request.args.get('limit', 50, type=int)
|
|
mye_id = request.args.get('mye_id', type=int)
|
|
def _do(master, tenant, branch_id):
|
|
allowed_brands = _get_allowed_brands(tenant) if tenant else None
|
|
data = catalog_service.smart_search(master, q, tenant, branch_id, limit, mye_id, tenant_id=g.tenant_id)
|
|
if allowed_brands:
|
|
data = _filter_parts_by_allowed_brands(master, data, allowed_brands)
|
|
return jsonify({'data': data, 'allowed_brands': allowed_brands or []})
|
|
return _with_conns(_do)
|
|
|
|
|
|
# ─── VIN Decoder ───
|
|
|
|
@catalog_bp.route('/vin/<vin>', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def decode_vin_route(vin):
|
|
"""Decode a VIN and try to match to a brand/model/year in our catalog DB."""
|
|
vin = (vin or "").strip().upper()
|
|
if len(vin) != 17:
|
|
return jsonify({'error': 'VIN debe tener exactamente 17 caracteres.'}), 400
|
|
|
|
try:
|
|
info = decode_vin(vin)
|
|
except Exception as e:
|
|
return jsonify({'error': f'Error al decodificar VIN: {str(e)}'}), 502
|
|
|
|
if info.get('error'):
|
|
return jsonify(info), 200 # Return info even with partial errors
|
|
|
|
# Try to match the decoded vehicle to our catalog DB
|
|
db_match = None
|
|
master = None
|
|
try:
|
|
master = get_master_conn()
|
|
db_match = _match_vin_to_catalog(master, info)
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
if master:
|
|
try:
|
|
master.close()
|
|
except Exception:
|
|
pass
|
|
|
|
result = {**info}
|
|
if db_match:
|
|
result['catalog_match'] = db_match
|
|
|
|
return jsonify(result)
|
|
|
|
|
|
# ─── Plate Lookup ───
|
|
|
|
@catalog_bp.route('/plate/<plate>', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def plate_lookup(plate):
|
|
"""Look up a vehicle by Mexican license plate in the local plate_vehicles table.
|
|
If found, also tries to match the vehicle to the catalog DB.
|
|
"""
|
|
plate = (plate or '').strip()
|
|
if not plate:
|
|
return jsonify({'error': 'Placa requerida.'}), 400
|
|
|
|
if not is_valid_mexican_plate(plate):
|
|
return jsonify({'error': 'Formato de placa no valido. Ej: ABC-1234 o AB-123-C'}), 400
|
|
|
|
tenant = None
|
|
master = None
|
|
try:
|
|
tenant = get_tenant_conn(g.tenant_id)
|
|
result = search_plate(tenant, plate)
|
|
|
|
if not result:
|
|
return jsonify({
|
|
'found': False,
|
|
'plate': normalize_plate(plate),
|
|
'message': 'Placa no registrada.'
|
|
})
|
|
|
|
# Try to match to catalog
|
|
catalog_match = None
|
|
try:
|
|
master = get_master_conn()
|
|
catalog_match = _match_plate_to_catalog(master, result)
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
if master:
|
|
try: master.close()
|
|
except: pass
|
|
master = None
|
|
|
|
response = {
|
|
'found': True,
|
|
'plate': result['plate'],
|
|
'make': result['make'],
|
|
'model': result['model'],
|
|
'year': result['year'],
|
|
'vin': result['vin'],
|
|
'customer_id': result['customer_id'],
|
|
}
|
|
if catalog_match:
|
|
response['catalog_match'] = catalog_match
|
|
|
|
return jsonify(response)
|
|
|
|
except Exception as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
finally:
|
|
if tenant:
|
|
try: tenant.close()
|
|
except: pass
|
|
if master:
|
|
try: master.close()
|
|
except: pass
|
|
|
|
|
|
@catalog_bp.route('/plate', methods=['POST'])
|
|
@require_auth('catalog.view')
|
|
def plate_register():
|
|
"""Register or update a plate-to-vehicle mapping."""
|
|
data = request.get_json() or {}
|
|
plate = (data.get('plate') or '').strip()
|
|
if not plate:
|
|
return jsonify({'error': 'plate required'}), 400
|
|
|
|
if not is_valid_mexican_plate(plate):
|
|
return jsonify({'error': 'Formato de placa no valido.'}), 400
|
|
|
|
tenant = None
|
|
try:
|
|
tenant = get_tenant_conn(g.tenant_id)
|
|
rec_id = register_plate(
|
|
tenant, plate,
|
|
make=data.get('make'),
|
|
model=data.get('model'),
|
|
year=data.get('year'),
|
|
vin=data.get('vin'),
|
|
customer_id=data.get('customer_id'),
|
|
)
|
|
return jsonify({'id': rec_id, 'message': 'Placa registrada.'})
|
|
except Exception as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
finally:
|
|
if tenant:
|
|
try: tenant.close()
|
|
except: pass
|
|
|
|
|
|
def _match_plate_to_catalog(master_conn, plate_info):
|
|
"""Try to match plate vehicle info to the catalog DB (same logic as VIN)."""
|
|
return _match_vin_to_catalog(master_conn, {
|
|
'make': plate_info.get('make'),
|
|
'model': plate_info.get('model'),
|
|
'year': plate_info.get('year'),
|
|
})
|
|
|
|
|
|
def _match_vin_to_catalog(master_conn, vin_info):
|
|
"""Try to find brand_id, model_id, year_id, mye_id from decoded VIN info."""
|
|
make = (vin_info.get('make') or '').upper().strip()
|
|
model = (vin_info.get('model') or '').strip()
|
|
year = vin_info.get('year')
|
|
|
|
if not make:
|
|
return None
|
|
|
|
cur = master_conn.cursor()
|
|
result = {}
|
|
try:
|
|
# Find brand (try exact, then LIKE)
|
|
cur.execute(
|
|
"SELECT id_brand, name_brand FROM brands WHERE UPPER(name_brand) = %s",
|
|
(make,)
|
|
)
|
|
brand_row = cur.fetchone()
|
|
if not brand_row:
|
|
cur.execute(
|
|
"SELECT id_brand, name_brand FROM brands WHERE UPPER(name_brand) LIKE %s ORDER BY name_brand LIMIT 1",
|
|
(f"%{make}%",)
|
|
)
|
|
brand_row = cur.fetchone()
|
|
|
|
if not brand_row:
|
|
return None
|
|
|
|
result['brand_id'] = brand_row[0]
|
|
result['brand_name'] = brand_row[1]
|
|
|
|
# Find model
|
|
if model:
|
|
cur.execute(
|
|
"""SELECT m.id_model, m.name_model
|
|
FROM models m
|
|
WHERE m.brand_id = %s AND UPPER(m.name_model) LIKE %s
|
|
ORDER BY m.name_model LIMIT 5""",
|
|
(brand_row[0], f"%{model.upper()}%")
|
|
)
|
|
model_row = cur.fetchone()
|
|
if model_row:
|
|
result['model_id'] = model_row[0]
|
|
result['model_name'] = model_row[1]
|
|
|
|
# Find year
|
|
if year:
|
|
cur.execute(
|
|
"SELECT id_year, year_car FROM years WHERE year_car = %s",
|
|
(int(year),)
|
|
)
|
|
year_row = cur.fetchone()
|
|
if year_row:
|
|
result['year_id'] = year_row[0]
|
|
result['year_car'] = year_row[1]
|
|
|
|
# Find MYE options
|
|
cur.execute(
|
|
"""SELECT mye.id_mye, e.name_engine, mye.trim_level
|
|
FROM model_year_engine mye
|
|
JOIN engines e ON e.id_engine = mye.engine_id
|
|
WHERE mye.model_id = %s AND mye.year_id = %s
|
|
ORDER BY e.name_engine
|
|
LIMIT 10""",
|
|
(model_row[0], year_row[0])
|
|
)
|
|
mye_rows = cur.fetchall()
|
|
if mye_rows:
|
|
result['engines'] = [
|
|
{'id_mye': r[0], 'name_engine': r[1], 'trim_level': r[2]}
|
|
for r in mye_rows
|
|
]
|
|
# Auto-select if only one engine
|
|
if len(mye_rows) == 1:
|
|
result['id_mye'] = mye_rows[0][0]
|
|
|
|
return result
|
|
except Exception:
|
|
return None
|
|
finally:
|
|
cur.close()
|
|
|
|
|
|
# ─── Brand Catalog (vehicle-brand-first navigation) ───
|
|
|
|
@catalog_bp.route('/vehicle-brands', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def vehicle_brands():
|
|
"""Return North American vehicle brands for brand-first catalog browsing.
|
|
|
|
Uses the same OEM_BRANDS_NA filter as the regular catalog so that
|
|
the brand list is consistent across both navigation modes.
|
|
"""
|
|
from services.catalog_modes import get_brands_for_mode
|
|
allowed = list(get_brands_for_mode('oem'))
|
|
|
|
def _query(master):
|
|
cur = master.cursor()
|
|
try:
|
|
cur.execute("""
|
|
SELECT id_brand, name_brand
|
|
FROM brands
|
|
WHERE name_brand = ANY(%s)
|
|
ORDER BY name_brand ASC
|
|
""", (allowed,))
|
|
rows = cur.fetchall()
|
|
return jsonify({
|
|
'brands': [
|
|
{'id': r[0], 'name': r[1], 'part_count': 0}
|
|
for r in rows
|
|
]
|
|
})
|
|
finally:
|
|
cur.close()
|
|
return _master_only(_query)
|
|
|
|
|
|
@catalog_bp.route('/brand-categories', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def brand_categories():
|
|
"""Return part categories available for a given vehicle brand."""
|
|
brand = request.args.get('brand', '')
|
|
if not brand:
|
|
return jsonify({'error': 'brand parameter required'}), 400
|
|
|
|
def _query(master, tenant, branch_id):
|
|
cur = master.cursor()
|
|
try:
|
|
allowed_brands = _get_allowed_brands(tenant) if tenant else None
|
|
brand_filter = ""
|
|
params = [brand]
|
|
if allowed_brands:
|
|
brand_filter = """AND EXISTS (
|
|
SELECT 1 FROM aftermarket_parts ap2
|
|
JOIN manufacturers m2 ON m2.id_manufacture = ap2.manufacturer_id
|
|
WHERE ap2.oem_part_id = p.id_part AND UPPER(m2.name_manufacture) = ANY(%s)
|
|
)"""
|
|
params.append(allowed_brands)
|
|
cur.execute(f"""
|
|
SELECT pc.id_part_category,
|
|
COALESCE(NULLIF(pc.name_es, ''), pc.name_part_category) as name,
|
|
pc.slug,
|
|
COUNT(DISTINCT p.id_part) as part_count
|
|
FROM part_vehicle_preview pvp
|
|
JOIN parts p ON p.id_part = pvp.part_id
|
|
JOIN part_groups pg ON pg.id_part_group = p.group_id
|
|
JOIN part_categories pc ON pc.id_part_category = pg.category_id
|
|
WHERE pvp.name_brand = %s
|
|
{brand_filter}
|
|
GROUP BY pc.id_part_category, pc.name_part_category, pc.name_es, pc.slug
|
|
ORDER BY part_count DESC
|
|
""", params)
|
|
rows = cur.fetchall()
|
|
return jsonify({
|
|
'brand': brand,
|
|
'categories': [
|
|
{'id': r[0], 'name': r[1], 'slug': r[2], 'part_count': r[3]}
|
|
for r in rows
|
|
],
|
|
'allowed_brands': allowed_brands or []
|
|
})
|
|
finally:
|
|
cur.close()
|
|
return _with_conns(_query)
|
|
|
|
|
|
@catalog_bp.route('/brand-parts', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def brand_parts():
|
|
"""Return parts for a given vehicle brand + category, optionally filtered by search term."""
|
|
brand = request.args.get('brand', '')
|
|
category_id = request.args.get('category_id', type=int)
|
|
search = request.args.get('search', '').strip()
|
|
limit = request.args.get('limit', 50, type=int)
|
|
offset = request.args.get('offset', 0, type=int)
|
|
|
|
if not brand:
|
|
return jsonify({'error': 'brand parameter required'}), 400
|
|
|
|
def _query(master, tenant, branch_id):
|
|
cur = master.cursor()
|
|
try:
|
|
allowed_brands = _get_allowed_brands(tenant) if tenant else None
|
|
|
|
cat_filter = ""
|
|
search_filter = ""
|
|
params = [brand]
|
|
|
|
if category_id:
|
|
cat_filter = "AND pc.id_part_category = %s"
|
|
params.append(category_id)
|
|
|
|
# --- Brand-filtered mode: return aftermarket parts directly ---
|
|
if allowed_brands:
|
|
am_search = ""
|
|
am_params = list(params)
|
|
if search:
|
|
am_search = "AND (ap.part_number ILIKE %s OR COALESCE(NULLIF(ap.name_aftermarket_parts, ''), p.name_part) ILIKE %s)"
|
|
like_term = f"%{search}%"
|
|
am_params.extend([like_term, like_term])
|
|
|
|
query_params = list(am_params)
|
|
cur.execute(f"""
|
|
SELECT DISTINCT ap.id_aftermarket_parts,
|
|
ap.part_number,
|
|
COALESCE(NULLIF(ap.name_aftermarket_parts, ''), p.name_part) as name,
|
|
m.name_manufacture,
|
|
ap.price_usd,
|
|
p.id_part,
|
|
pg.id_part_group, pg.name_part_group,
|
|
pc.id_part_category, pc.name_part_category
|
|
FROM part_vehicle_preview pvp
|
|
JOIN parts p ON p.id_part = pvp.part_id
|
|
JOIN aftermarket_parts ap ON ap.oem_part_id = p.id_part
|
|
JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id
|
|
JOIN part_groups pg ON pg.id_part_group = p.group_id
|
|
JOIN part_categories pc ON pc.id_part_category = pg.category_id
|
|
WHERE pvp.name_brand = %s
|
|
{cat_filter}
|
|
{am_search}
|
|
AND UPPER(m.name_manufacture) = ANY(%s)
|
|
ORDER BY m.name_manufacture, ap.part_number
|
|
LIMIT %s OFFSET %s
|
|
""", query_params + [allowed_brands, limit, offset])
|
|
|
|
part_rows = cur.fetchall()
|
|
oem_ids = [r[5] for r in part_rows]
|
|
|
|
count_params = list(am_params)
|
|
cur.execute(f"""
|
|
SELECT COUNT(DISTINCT ap.id_aftermarket_parts)
|
|
FROM part_vehicle_preview pvp
|
|
JOIN parts p ON p.id_part = pvp.part_id
|
|
JOIN aftermarket_parts ap ON ap.oem_part_id = p.id_part
|
|
JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id
|
|
JOIN part_groups pg ON pg.id_part_group = p.group_id
|
|
JOIN part_categories pc ON pc.id_part_category = pg.category_id
|
|
WHERE pvp.name_brand = %s
|
|
{cat_filter}
|
|
{am_search}
|
|
AND UPPER(m.name_manufacture) = ANY(%s)
|
|
""", count_params + [allowed_brands])
|
|
total = cur.fetchone()[0]
|
|
|
|
local_stock = {}
|
|
if tenant and oem_ids:
|
|
try:
|
|
from services.catalog_service import _get_local_stock_bulk
|
|
local_stock = _get_local_stock_bulk(tenant, branch_id, oem_ids, [])
|
|
except Exception:
|
|
pass
|
|
|
|
items = []
|
|
for r in part_rows:
|
|
oem_id = r[5]
|
|
stock_info = local_stock.get(oem_id, {})
|
|
items.append({
|
|
'id': r[0],
|
|
'oem_part_number': r[1],
|
|
'name': r[2],
|
|
'manufacturer': r[3],
|
|
'price_usd': float(r[4]) if r[4] is not None else None,
|
|
'oem_id': oem_id,
|
|
'group': {'id': r[6], 'name': r[7]},
|
|
'category': {'id': r[8], 'name': r[9]},
|
|
'local_stock': stock_info.get('stock', 0),
|
|
'local_price': stock_info.get('price', None),
|
|
})
|
|
|
|
return jsonify({
|
|
'brand': brand,
|
|
'category_id': category_id,
|
|
'search': search,
|
|
'items': items,
|
|
'total': total,
|
|
'limit': limit,
|
|
'offset': offset,
|
|
'allowed_brands': allowed_brands
|
|
})
|
|
|
|
# --- Normal mode: return OEM parts ---
|
|
if search:
|
|
search_filter = "AND (p.oem_part_number ILIKE %s OR COALESCE(NULLIF(p.name_es, ''), p.name_part) ILIKE %s)"
|
|
like_term = f"%{search}%"
|
|
params.extend([like_term, like_term])
|
|
|
|
query_params = list(params)
|
|
cur.execute(f"""
|
|
SELECT DISTINCT p.id_part, p.oem_part_number,
|
|
COALESCE(NULLIF(p.name_es, ''), p.name_part) as name,
|
|
pg.id_part_group, pg.name_part_group,
|
|
pc.id_part_category, pc.name_part_category
|
|
FROM part_vehicle_preview pvp
|
|
JOIN parts p ON p.id_part = pvp.part_id
|
|
JOIN part_groups pg ON pg.id_part_group = p.group_id
|
|
JOIN part_categories pc ON pc.id_part_category = pg.category_id
|
|
WHERE pvp.name_brand = %s
|
|
{cat_filter}
|
|
{search_filter}
|
|
ORDER BY p.id_part
|
|
LIMIT %s OFFSET %s
|
|
""", query_params + [limit, offset])
|
|
|
|
part_rows = cur.fetchall()
|
|
part_ids = [r[0] for r in part_rows]
|
|
|
|
count_params = list(params)
|
|
cur.execute(f"""
|
|
SELECT COUNT(DISTINCT p.id_part)
|
|
FROM part_vehicle_preview pvp
|
|
JOIN parts p ON p.id_part = pvp.part_id
|
|
JOIN part_groups pg ON pg.id_part_group = p.group_id
|
|
JOIN part_categories pc ON pc.id_part_category = pg.category_id
|
|
WHERE pvp.name_brand = %s
|
|
{cat_filter}
|
|
{search_filter}
|
|
""", count_params)
|
|
total = cur.fetchone()[0]
|
|
|
|
local_stock = {}
|
|
if tenant and part_ids:
|
|
try:
|
|
from services.catalog_service import _get_local_stock_bulk
|
|
local_stock = _get_local_stock_bulk(tenant, branch_id, [], part_ids)
|
|
except Exception:
|
|
pass
|
|
|
|
items = []
|
|
for r in part_rows:
|
|
part_id = r[0]
|
|
stock_info = local_stock.get(part_id, {})
|
|
items.append({
|
|
'id': part_id,
|
|
'oem_part_number': r[1],
|
|
'name': r[2],
|
|
'group': {'id': r[3], 'name': r[4]},
|
|
'category': {'id': r[5], 'name': r[6]},
|
|
'local_stock': stock_info.get('stock', 0),
|
|
'local_price': stock_info.get('price', None),
|
|
})
|
|
|
|
return jsonify({
|
|
'brand': brand,
|
|
'category_id': category_id,
|
|
'search': search,
|
|
'items': items,
|
|
'total': total,
|
|
'limit': limit,
|
|
'offset': offset,
|
|
'allowed_brands': []
|
|
})
|
|
finally:
|
|
cur.close()
|
|
return _with_conns(_query)
|
|
|
|
|
|
@catalog_bp.route('/mye-parts', methods=['GET'])
|
|
@require_auth('catalog.view')
|
|
def mye_parts():
|
|
"""Return parts for a specific MYE + category (brand-catalog flow).
|
|
|
|
Skips the group/subgroup level and goes directly from category to parts.
|
|
"""
|
|
mye_id = request.args.get('mye_id', type=int)
|
|
category_id = request.args.get('category_id', type=int)
|
|
search = request.args.get('search', '').strip()
|
|
limit = request.args.get('limit', 50, type=int)
|
|
offset = request.args.get('offset', 0, type=int)
|
|
|
|
if not mye_id:
|
|
return jsonify({'error': 'mye_id required'}), 400
|
|
|
|
def _query(master, tenant, branch_id):
|
|
cur = master.cursor()
|
|
try:
|
|
allowed_brands = _get_allowed_brands(tenant) if tenant else None
|
|
|
|
cat_filter = ""
|
|
search_filter = ""
|
|
params = [mye_id]
|
|
|
|
if category_id:
|
|
cat_filter = "AND pc.id_part_category = %s"
|
|
params.append(category_id)
|
|
|
|
# --- Brand-filtered mode: return aftermarket parts directly ---
|
|
if allowed_brands:
|
|
am_search = ""
|
|
am_params = list(params)
|
|
if search:
|
|
am_search = "AND (ap.part_number ILIKE %s OR COALESCE(NULLIF(ap.name_aftermarket_parts, ''), p.name_part) ILIKE %s)"
|
|
like_term = f"%{search}%"
|
|
am_params.extend([like_term, like_term])
|
|
|
|
# Get aftermarket parts
|
|
query_params = list(am_params)
|
|
cur.execute(f"""
|
|
SELECT DISTINCT ap.id_aftermarket_parts,
|
|
ap.part_number,
|
|
COALESCE(NULLIF(ap.name_aftermarket_parts, ''), p.name_part) as name,
|
|
m.name_manufacture,
|
|
ap.price_usd,
|
|
p.id_part,
|
|
pg.id_part_group, pg.name_part_group,
|
|
pc.id_part_category, pc.name_part_category
|
|
FROM vehicle_parts vp
|
|
JOIN parts p ON p.id_part = vp.part_id
|
|
JOIN aftermarket_parts ap ON ap.oem_part_id = p.id_part
|
|
JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id
|
|
JOIN part_groups pg ON pg.id_part_group = p.group_id
|
|
JOIN part_categories pc ON pc.id_part_category = pg.category_id
|
|
WHERE vp.model_year_engine_id = %s
|
|
{cat_filter}
|
|
{am_search}
|
|
AND UPPER(m.name_manufacture) = ANY(%s)
|
|
ORDER BY m.name_manufacture, ap.part_number
|
|
LIMIT %s OFFSET %s
|
|
""", query_params + [allowed_brands, limit, offset])
|
|
|
|
part_rows = cur.fetchall()
|
|
oem_ids = [r[5] for r in part_rows]
|
|
|
|
# Count total
|
|
count_params = list(am_params)
|
|
cur.execute(f"""
|
|
SELECT COUNT(DISTINCT ap.id_aftermarket_parts)
|
|
FROM vehicle_parts vp
|
|
JOIN parts p ON p.id_part = vp.part_id
|
|
JOIN aftermarket_parts ap ON ap.oem_part_id = p.id_part
|
|
JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id
|
|
JOIN part_groups pg ON pg.id_part_group = p.group_id
|
|
JOIN part_categories pc ON pc.id_part_category = pg.category_id
|
|
WHERE vp.model_year_engine_id = %s
|
|
{cat_filter}
|
|
{am_search}
|
|
AND UPPER(m.name_manufacture) = ANY(%s)
|
|
""", count_params + [allowed_brands])
|
|
total = cur.fetchone()[0]
|
|
|
|
# Local stock keyed by OEM part id
|
|
local_stock = {}
|
|
if tenant and oem_ids:
|
|
try:
|
|
from services.catalog_service import _get_local_stock_bulk
|
|
local_stock = _get_local_stock_bulk(tenant, branch_id, oem_ids, [])
|
|
except Exception:
|
|
pass
|
|
|
|
items = []
|
|
for r in part_rows:
|
|
oem_id = r[5]
|
|
stock_info = local_stock.get(oem_id, {})
|
|
items.append({
|
|
'id': r[0],
|
|
'oem_part_number': r[1],
|
|
'name': r[2],
|
|
'manufacturer': r[3],
|
|
'price_usd': float(r[4]) if r[4] is not None else None,
|
|
'oem_id': oem_id,
|
|
'group': {'id': r[6], 'name': r[7]},
|
|
'category': {'id': r[8], 'name': r[9]},
|
|
'local_stock': stock_info.get('stock', 0),
|
|
'local_price': stock_info.get('price', None),
|
|
})
|
|
|
|
return jsonify({
|
|
'mye_id': mye_id,
|
|
'category_id': category_id,
|
|
'search': search,
|
|
'items': items,
|
|
'total': total,
|
|
'limit': limit,
|
|
'offset': offset,
|
|
'allowed_brands': allowed_brands
|
|
})
|
|
|
|
# --- Normal mode: return OEM parts ---
|
|
if search:
|
|
search_filter = "AND (p.oem_part_number ILIKE %s OR COALESCE(NULLIF(p.name_es, ''), p.name_part) ILIKE %s)"
|
|
like_term = f"%{search}%"
|
|
params.extend([like_term, like_term])
|
|
|
|
query_params = list(params)
|
|
cur.execute(f"""
|
|
SELECT DISTINCT p.id_part, p.oem_part_number,
|
|
COALESCE(NULLIF(p.name_es, ''), p.name_part) as name,
|
|
pg.id_part_group, pg.name_part_group,
|
|
pc.id_part_category, pc.name_part_category
|
|
FROM vehicle_parts vp
|
|
JOIN parts p ON p.id_part = vp.part_id
|
|
JOIN part_groups pg ON pg.id_part_group = p.group_id
|
|
JOIN part_categories pc ON pc.id_part_category = pg.category_id
|
|
WHERE vp.model_year_engine_id = %s
|
|
{cat_filter}
|
|
{search_filter}
|
|
ORDER BY p.id_part
|
|
LIMIT %s OFFSET %s
|
|
""", query_params + [limit, offset])
|
|
|
|
part_rows = cur.fetchall()
|
|
part_ids = [r[0] for r in part_rows]
|
|
|
|
count_params = list(params)
|
|
cur.execute(f"""
|
|
SELECT COUNT(DISTINCT p.id_part)
|
|
FROM vehicle_parts vp
|
|
JOIN parts p ON p.id_part = vp.part_id
|
|
JOIN part_groups pg ON pg.id_part_group = p.group_id
|
|
JOIN part_categories pc ON pc.id_part_category = pg.category_id
|
|
WHERE vp.model_year_engine_id = %s
|
|
{cat_filter}
|
|
{search_filter}
|
|
""", count_params)
|
|
total = cur.fetchone()[0]
|
|
|
|
local_stock = {}
|
|
if tenant and part_ids:
|
|
try:
|
|
from services.catalog_service import _get_local_stock_bulk
|
|
local_stock = _get_local_stock_bulk(tenant, branch_id, [], part_ids)
|
|
except Exception:
|
|
pass
|
|
|
|
items = []
|
|
for r in part_rows:
|
|
part_id = r[0]
|
|
stock_info = local_stock.get(part_id, {})
|
|
items.append({
|
|
'id': part_id,
|
|
'oem_part_number': r[1],
|
|
'name': r[2],
|
|
'group': {'id': r[3], 'name': r[4]},
|
|
'category': {'id': r[5], 'name': r[6]},
|
|
'local_stock': stock_info.get('stock', 0),
|
|
'local_price': stock_info.get('price', None),
|
|
})
|
|
|
|
return jsonify({
|
|
'mye_id': mye_id,
|
|
'category_id': category_id,
|
|
'search': search,
|
|
'items': items,
|
|
'total': total,
|
|
'limit': limit,
|
|
'offset': offset,
|
|
'allowed_brands': []
|
|
})
|
|
finally:
|
|
cur.close()
|
|
return _with_conns(_query)
|