# /home/Autopartes/pos/services/catalog_service.py """Catalog service: queries nexus_autoparts (TecDoc) catalog with local stock enrichment. All functions receive database connections as parameters. This module NEVER imports tenant_db — the caller passes connections. PERFORMANCE: vehicle_parts has 14B+ rows. Every query MUST: - Filter by model_year_engine_id (indexed) - Use LIMIT - Use EXISTS instead of COUNT(*) on vehicle_parts where possible """ import re # ───────────────────────────────────────────────────────────────────────────── # VEHICLE HIERARCHY NAVIGATION # ───────────────────────────────────────────────────────────────────────────── NORTH_AMERICA_BRANDS = ( 'ACURA', 'AUDI', 'BMW', 'BUICK', 'CADILLAC', 'CHEVROLET', 'CHRYSLER', 'DODGE', 'FIAT', 'FORD', 'GMC', 'HONDA', 'HYUNDAI', 'INFINITI', 'JAGUAR', 'JEEP', 'KIA', 'LAND ROVER', 'LEXUS', 'LINCOLN', 'MAZDA', 'MERCEDES-BENZ', 'MINI', 'MITSUBISHI', 'NISSAN', 'PEUGEOT', 'PORSCHE', 'RAM', 'RENAULT', 'SEAT', 'SUBARU', 'SUZUKI', 'TESLA', 'TOYOTA', 'VOLVO', 'VW', ) def get_brands(master_conn): """Get vehicle brands available in Mexico/USA/Canada that have MYE entries.""" cur = master_conn.cursor() cur.execute(""" SELECT DISTINCT b.id_brand, b.name_brand FROM brands b JOIN models m ON m.brand_id = b.id_brand JOIN model_year_engine mye ON mye.model_id = m.id_model WHERE b.name_brand = ANY(%s) ORDER BY b.name_brand """, (list(NORTH_AMERICA_BRANDS),)) rows = cur.fetchall() cur.close() return [{'id_brand': r[0], 'name_brand': r[1]} for r in rows] def get_models(master_conn, brand_id): """Get models for a brand that have MYE entries (fast, no vehicle_parts scan).""" cur = master_conn.cursor() cur.execute(""" SELECT DISTINCT m.id_model, m.name_model FROM models m JOIN model_year_engine mye ON mye.model_id = m.id_model WHERE m.brand_id = %s ORDER BY m.name_model """, (brand_id,)) rows = cur.fetchall() cur.close() return [{'id_model': r[0], 'name_model': r[1]} for r in rows] def get_years(master_conn, model_id): """Get distinct years for a model via MYE (fast, no vehicle_parts scan). Ordered DESC.""" cur = master_conn.cursor() cur.execute(""" SELECT DISTINCT y.id_year, y.year_car FROM years y JOIN model_year_engine mye ON mye.year_id = y.id_year WHERE mye.model_id = %s ORDER BY y.year_car DESC """, (model_id,)) rows = cur.fetchall() cur.close() return [{'id_year': r[0], 'year_car': r[1]} for r in rows] def get_engines(master_conn, model_id, year_id): """Get MYE entries (engine + trim) for a model+year combo.""" cur = master_conn.cursor() cur.execute(""" SELECT mye.id_mye, e.name_engine, mye.trim_level FROM model_year_engine mye JOIN engines e ON e.id_engine = mye.engine_id WHERE mye.model_id = %s AND mye.year_id = %s ORDER BY e.name_engine, mye.trim_level """, (model_id, year_id)) rows = cur.fetchall() cur.close() return [{'id_mye': r[0], 'name_engine': r[1], 'trim_level': r[2] or ''} for r in rows] def get_categories(master_conn, mye_id): """Get part categories that have parts for this vehicle (mye_id). Uses a subquery on vehicle_parts filtered by mye_id (indexed), then JOINs through parts -> part_groups -> part_categories. Uses COUNT with a safety LIMIT on the subquery. """ cur = master_conn.cursor() cur.execute(""" SELECT pc.id_part_category, COALESCE(pc.name_es, pc.name_part_category) AS name, sub.cnt FROM ( SELECT pg.category_id, COUNT(*) AS cnt FROM vehicle_parts vp JOIN parts p ON p.id_part = vp.part_id JOIN part_groups pg ON pg.id_part_group = p.group_id WHERE vp.model_year_engine_id = %s GROUP BY pg.category_id ) sub JOIN part_categories pc ON pc.id_part_category = sub.category_id ORDER BY name """, (mye_id,)) rows = cur.fetchall() cur.close() return [{'id_part_category': r[0], 'name': r[1], 'part_count': r[2]} for r in rows] def get_groups(master_conn, mye_id, category_id): """Get part groups (subcategories) for this vehicle + category, with part counts.""" cur = master_conn.cursor() cur.execute(""" SELECT pg.id_part_group, COALESCE(pg.name_es, pg.name_part_group) AS name, COUNT(*) AS cnt FROM vehicle_parts vp JOIN parts p ON p.id_part = vp.part_id JOIN part_groups pg ON pg.id_part_group = p.group_id WHERE vp.model_year_engine_id = %s AND pg.category_id = %s GROUP BY pg.id_part_group, name ORDER BY name """, (mye_id, category_id)) rows = cur.fetchall() cur.close() return [{'id_part_group': r[0], 'name': r[1], 'part_count': r[2]} for r in rows] # ───────────────────────────────────────────────────────────────────────────── # PARTS LIST + DETAIL (with stock enrichment) # ───────────────────────────────────────────────────────────────────────────── def get_parts(master_conn, mye_id, group_id, tenant_conn, branch_id, page=1, per_page=30): """Get parts for a vehicle + part group, enriched with local stock + bodega indicator. 1. Fetch parts from nexus_autoparts (vehicle_parts + parts) — paginated 2. For each OEM number, look up tenant inventory for local stock 3. For each part_id, check warehouse_inventory for bodega availability Returns: {data: [...], pagination: {...}} """ per_page = min(per_page, 100) offset = (page - 1) * per_page cur = master_conn.cursor() # Count total (bounded — uses indexed mye_id + group_id join) cur.execute(""" SELECT COUNT(*) FROM vehicle_parts vp JOIN parts p ON p.id_part = vp.part_id WHERE vp.model_year_engine_id = %s AND p.group_id = %s """, (mye_id, group_id)) total = cur.fetchone()[0] # Fetch page of parts cur.execute(""" SELECT p.id_part, p.oem_part_number, p.name_part, p.name_es, p.description, p.description_es, p.image_url FROM vehicle_parts vp JOIN parts p ON p.id_part = vp.part_id WHERE vp.model_year_engine_id = %s AND p.group_id = %s ORDER BY p.name_part LIMIT %s OFFSET %s """, (mye_id, group_id, per_page, offset)) rows = cur.fetchall() if not rows: cur.close() return {'data': [], 'pagination': _pagination(page, per_page, total)} part_ids = [r[0] for r in rows] oem_numbers = [r[1] for r in rows] # Bodega availability: count distinct bodegas with stock > 0 per part cur.execute(""" SELECT part_id, COUNT(*) AS bodega_count FROM warehouse_inventory WHERE part_id = ANY(%s) AND stock_quantity > 0 GROUP BY part_id """, (part_ids,)) bodega_map = {r[0]: r[1] for r in cur.fetchall()} cur.close() # Local stock enrichment from tenant DB local_map = _get_local_stock_bulk(tenant_conn, branch_id, oem_numbers, part_ids) items = [] for r in rows: part_id = r[0] oem = r[1] local = local_map.get(oem) or local_map.get(f'cat:{part_id}') items.append({ 'id_part': part_id, 'oem_part_number': oem, 'name': r[3] or r[2], # prefer Spanish name 'description': r[5] or r[4], 'image_url': r[6], 'local_stock': local['stock'] if local else 0, 'local_price': local['price_1'] if local else None, 'bodega_count': bodega_map.get(part_id, 0), }) return {'data': items, 'pagination': _pagination(page, per_page, total)} def get_part_detail(master_conn, part_id, tenant_conn, branch_id): """Get full detail for a single part: catalog info, local stock, bodegas, alternatives. Returns: { part: {id, oem, name, description, image_url, group_name, category_name}, local: {stock, price_1, price_2, price_3, cost, inventory_id} | null, bodegas: [{business_name, price, stock, location}], alternatives: [{part_number, manufacturer, name, type, local_stock, bodega_count}] } """ cur = master_conn.cursor() # Part info with group + category names cur.execute(""" SELECT p.id_part, p.oem_part_number, p.name_part, p.name_es, p.description, p.description_es, p.image_url, COALESCE(pg.name_es, pg.name_part_group) AS group_name, COALESCE(pc.name_es, pc.name_part_category) AS category_name FROM parts p LEFT JOIN part_groups pg ON pg.id_part_group = p.group_id LEFT JOIN part_categories pc ON pc.id_part_category = pg.category_id WHERE p.id_part = %s """, (part_id,)) row = cur.fetchone() if not row: cur.close() return None oem = row[1] part_info = { 'id_part': row[0], 'oem_part_number': oem, 'name': row[3] or row[2], 'description': row[5] or row[4], 'image_url': row[6], 'group_name': row[7], 'category_name': row[8], } # Bodegas with stock cur.execute(""" SELECT u.business_name, wi.price, wi.stock_quantity, wi.warehouse_location FROM warehouse_inventory wi JOIN users u ON u.id_user = wi.user_id WHERE wi.part_id = %s AND wi.stock_quantity > 0 ORDER BY wi.price ASC LIMIT 20 """, (part_id,)) bodegas = [ {'business_name': r[0], 'price': float(r[1]) if r[1] else None, 'stock': r[2], 'location': r[3]} for r in cur.fetchall() ] # Alternatives: cross-references + aftermarket alternatives = _get_alternatives(cur, part_id) cur.close() # Local stock local = _get_local_stock_single(tenant_conn, branch_id, oem, part_id) # Enrich alternatives with local stock + bodega count if alternatives: alt_oems = [a['part_number'] for a in alternatives] alt_local = _get_local_stock_bulk(tenant_conn, branch_id, alt_oems, []) cur2 = master_conn.cursor() # Find part_ids for cross-ref numbers to check bodega stock cur2.execute(""" SELECT oem_part_number, id_part FROM parts WHERE oem_part_number = ANY(%s) """, (alt_oems,)) oem_to_part = {r[0]: r[1] for r in cur2.fetchall()} alt_part_ids = [pid for pid in oem_to_part.values() if pid] bodega_map = {} if alt_part_ids: cur2.execute(""" SELECT part_id, COUNT(*) FROM warehouse_inventory WHERE part_id = ANY(%s) AND stock_quantity > 0 GROUP BY part_id """, (alt_part_ids,)) bodega_map = {r[0]: r[1] for r in cur2.fetchall()} cur2.close() for a in alternatives: l = alt_local.get(a['part_number']) a['local_stock'] = l['stock'] if l else 0 pid = oem_to_part.get(a['part_number']) a['bodega_count'] = bodega_map.get(pid, 0) if pid else 0 return { 'part': part_info, 'local': local, 'bodegas': bodegas, 'alternatives': alternatives, } def _get_alternatives(cur, part_id): """Get cross-references + aftermarket parts for a given OEM part.""" results = [] # Cross-references (other OEM numbers that reference this part) cur.execute(""" SELECT pcr.cross_reference_number, pcr.source_ref FROM part_cross_references pcr WHERE pcr.part_id = %s LIMIT 50 """, (part_id,)) for r in cur.fetchall(): results.append({ 'part_number': r[0], 'manufacturer': r[1] or 'OEM Cross-Ref', 'name': None, 'type': 'cross_reference', 'local_stock': 0, 'bodega_count': 0, }) # Aftermarket alternatives cur.execute(""" SELECT ap.part_number, m.name_manufacture, COALESCE(ap.name_es, ap.name_aftermarket_parts) AS name FROM aftermarket_parts ap JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id WHERE ap.oem_part_id = %s LIMIT 50 """, (part_id,)) for r in cur.fetchall(): results.append({ 'part_number': r[0], 'manufacturer': r[1], 'name': r[2], 'type': 'aftermarket', 'local_stock': 0, 'bodega_count': 0, }) return results # ───────────────────────────────────────────────────────────────────────────── # SMART SEARCH # ───────────────────────────────────────────────────────────────────────────── def smart_search(master_conn, q, tenant_conn, branch_id, limit=50): """Search parts by part number or text. Enriches with local stock. Strategy: - If q looks like a part number (contains digits + hyphens): search oem_part_number ILIKE - If q is text: use PostgreSQL full-text search (search_vector) with ILIKE fallback - Always enriches results with local stock from tenant DB """ q = q.strip() if not q or len(q) < 2: return [] limit = min(limit, 100) cur = master_conn.cursor() is_part_number = bool(re.search(r'\d.*[-/]|[-/].*\d|\d{3,}', q)) if is_part_number: # Search by OEM part number clean_q = q.replace(' ', '').upper() cur.execute(""" SELECT p.id_part, p.oem_part_number, p.name_part, p.name_es, p.image_url, p.group_id FROM parts p WHERE REPLACE(UPPER(p.oem_part_number), ' ', '') LIKE %s ORDER BY p.oem_part_number LIMIT %s """, (f'%{clean_q}%', limit)) else: # Full-text search using tsvector, fall back to ILIKE tsquery = ' & '.join(q.split()) cur.execute(""" SELECT p.id_part, p.oem_part_number, p.name_part, p.name_es, p.image_url, p.group_id FROM parts p WHERE p.search_vector @@ to_tsquery('spanish', %s) OR p.name_part ILIKE %s OR p.name_es ILIKE %s ORDER BY CASE WHEN p.search_vector @@ to_tsquery('spanish', %s) THEN 0 ELSE 1 END, p.name_part LIMIT %s """, (tsquery, f'%{q}%', f'%{q}%', tsquery, limit)) rows = cur.fetchall() if not rows: cur.close() return [] part_ids = [r[0] for r in rows] oem_numbers = [r[1] for r in rows] # Get vehicle info for each part (first match only) vehicle_info_map = {} cur.execute(""" SELECT DISTINCT ON (vp.part_id) vp.part_id, b.name_brand, m.name_model, y.year_car FROM vehicle_parts vp JOIN model_year_engine mye ON mye.id_mye = vp.model_year_engine_id JOIN models m ON m.id_model = mye.model_id JOIN brands b ON b.id_brand = m.brand_id JOIN years y ON y.id_year = mye.year_id WHERE vp.part_id = ANY(%s) ORDER BY vp.part_id, y.year_car DESC """, (part_ids,)) for r in cur.fetchall(): vehicle_info_map[r[0]] = f"{r[1]} {r[2]} {r[3]}" cur.close() # Local stock enrichment local_map = _get_local_stock_bulk(tenant_conn, branch_id, oem_numbers, part_ids) results = [] for r in rows: part_id = r[0] oem = r[1] local = local_map.get(oem) or local_map.get(f'cat:{part_id}') results.append({ 'id_part': part_id, 'oem_part_number': oem, 'name': r[3] or r[2], 'image_url': r[4], 'local_stock': local['stock'] if local else 0, 'local_price': local['price_1'] if local else None, 'vehicle_info': vehicle_info_map.get(part_id, ''), }) return results # ───────────────────────────────────────────────────────────────────────────── # LOCAL STOCK HELPERS # ───────────────────────────────────────────────────────────────────────────── def _get_local_stock_bulk(tenant_conn, branch_id, oem_numbers, catalog_part_ids): """Look up tenant inventory for a batch of OEM numbers / catalog part IDs. Returns: dict keyed by oem_number (or 'cat:{id}') -> {stock, price_1, ...} Matches by: part_number = oem_number OR catalog_part_id = id """ if not oem_numbers and not catalog_part_ids: return {} cur = tenant_conn.cursor() conditions = [] params = [] if oem_numbers: conditions.append("i.part_number = ANY(%s)") params.append(oem_numbers) if catalog_part_ids: conditions.append("i.catalog_part_id = ANY(%s)") params.append(catalog_part_ids) where = " OR ".join(conditions) branch_filter = "" if branch_id: branch_filter = " AND i.branch_id = %s" params.append(branch_id) cur.execute(f""" SELECT i.id, i.part_number, i.catalog_part_id, i.price_1, i.price_2, i.price_3, i.cost, i.tax_rate, COALESCE(SUM(io.quantity), 0) AS stock FROM inventory i LEFT JOIN inventory_operations io ON io.inventory_id = i.id WHERE ({where}) AND i.is_active = true{branch_filter} GROUP BY i.id """, params) result = {} for r in cur.fetchall(): entry = { 'inventory_id': r[0], 'part_number': r[1], 'catalog_part_id': r[2], 'price_1': float(r[3]) if r[3] else 0, 'price_2': float(r[4]) if r[4] else 0, 'price_3': float(r[5]) if r[5] else 0, 'cost': float(r[6]) if r[6] else 0, 'tax_rate': float(r[7]) if r[7] else 0.16, 'stock': r[8], } if r[1]: result[r[1]] = entry if r[2]: result[f'cat:{r[2]}'] = entry cur.close() return result def _get_local_stock_single(tenant_conn, branch_id, oem_part_number, catalog_part_id): """Look up a single part in the tenant inventory. Returns dict or None.""" cur = tenant_conn.cursor() branch_filter = "" params = [oem_part_number, catalog_part_id] if branch_id: branch_filter = " AND i.branch_id = %s" params.append(branch_id) cur.execute(f""" SELECT i.id, i.price_1, i.price_2, i.price_3, i.cost, i.tax_rate, i.location, i.unit, i.barcode, COALESCE(SUM(io.quantity), 0) AS stock FROM inventory i LEFT JOIN inventory_operations io ON io.inventory_id = i.id WHERE (i.part_number = %s OR i.catalog_part_id = %s) AND i.is_active = true{branch_filter} GROUP BY i.id LIMIT 1 """, params) row = cur.fetchone() cur.close() if not row: return None return { 'inventory_id': row[0], 'price_1': float(row[1]) if row[1] else 0, 'price_2': float(row[2]) if row[2] else 0, 'price_3': float(row[3]) if row[3] else 0, 'cost': float(row[4]) if row[4] else 0, 'tax_rate': float(row[5]) if row[5] else 0.16, 'location': row[6], 'unit': row[7] or 'PZA', 'barcode': row[8], 'stock': row[9], } # ───────────────────────────────────────────────────────────────────────────── # PUBLIC WRAPPERS (for direct use by callers) # ───────────────────────────────────────────────────────────────────────────── def get_local_stock(tenant_conn, oem_part_number, catalog_part_id, branch_id=None): """Public wrapper: look up a single part in the tenant inventory.""" return _get_local_stock_single(tenant_conn, branch_id, oem_part_number, catalog_part_id) def get_bodega_availability(master_conn, part_id): """Check warehouse_inventory for a part. Returns list of bodegas with stock.""" cur = master_conn.cursor() cur.execute(""" SELECT u.business_name, wi.price, wi.stock_quantity, wi.warehouse_location FROM warehouse_inventory wi JOIN users u ON u.id_user = wi.user_id WHERE wi.part_id = %s AND wi.stock_quantity > 0 ORDER BY wi.price ASC LIMIT 20 """, (part_id,)) rows = cur.fetchall() cur.close() return [ {'business_name': r[0], 'price': float(r[1]) if r[1] else None, 'stock': r[2], 'location': r[3]} for r in rows ] def get_alternatives(master_conn, part_id): """Public wrapper: get cross-references + aftermarket parts for a given OEM part.""" cur = master_conn.cursor() results = _get_alternatives(cur, part_id) cur.close() return results # ───────────────────────────────────────────────────────────────────────────── # HELPERS # ───────────────────────────────────────────────────────────────────────────── def _pagination(page, per_page, total): """Build standard pagination dict.""" total_pages = max(1, (total + per_page - 1) // per_page) return { 'page': page, 'per_page': per_page, 'total': total, 'total_pages': total_pages, }