From e88988f47850de5c4d30e36f0045bc3b53b7ae43 Mon Sep 17 00:00:00 2001 From: consultoria-as Date: Wed, 1 Apr 2026 21:41:35 +0000 Subject: [PATCH] =?UTF-8?q?feat(pos):=20add=20catalog=20service=20?= =?UTF-8?q?=E2=80=94=20TecDoc=20hierarchy=20+=20stock=20enrichment?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 (1M context) --- pos/services/catalog_service.py | 624 ++++++++++++++++++++++++++++++++ 1 file changed, 624 insertions(+) create mode 100644 pos/services/catalog_service.py diff --git a/pos/services/catalog_service.py b/pos/services/catalog_service.py new file mode 100644 index 0000000..b5ed534 --- /dev/null +++ b/pos/services/catalog_service.py @@ -0,0 +1,624 @@ +# /home/Autopartes/pos/services/catalog_service.py +"""Catalog service: queries nexus_autoparts (TecDoc) catalog with local stock enrichment. + +All functions receive database connections as parameters. +This module NEVER imports tenant_db — the caller passes connections. + +PERFORMANCE: vehicle_parts has 14B+ rows. Every query MUST: + - Filter by model_year_engine_id (indexed) + - Use LIMIT + - Use EXISTS instead of COUNT(*) on vehicle_parts where possible +""" + +import re + + +# ───────────────────────────────────────────────────────────────────────────── +# VEHICLE HIERARCHY NAVIGATION +# ───────────────────────────────────────────────────────────────────────────── + +def get_brands(master_conn): + """Get all vehicle brands that have at least one part in the catalog. + + Uses EXISTS on model_year_engine + vehicle_parts to avoid scanning + vehicle_parts fully. The subquery stops at the first match per brand. + """ + cur = master_conn.cursor() + cur.execute(""" + SELECT DISTINCT b.id_brand, b.name_brand + FROM brands b + WHERE EXISTS ( + SELECT 1 + FROM models m + JOIN model_year_engine mye ON mye.model_id = m.id_model + JOIN vehicle_parts vp ON vp.model_year_engine_id = mye.id_mye + WHERE m.brand_id = b.id_brand + LIMIT 1 + ) + ORDER BY b.name_brand + """) + rows = cur.fetchall() + cur.close() + return [{'id_brand': r[0], 'name_brand': r[1]} for r in rows] + + +def get_models(master_conn, brand_id): + """Get models for a brand that have at least one MYE with parts.""" + cur = master_conn.cursor() + cur.execute(""" + SELECT DISTINCT m.id_model, m.name_model + FROM models m + WHERE m.brand_id = %s + AND EXISTS ( + SELECT 1 + FROM model_year_engine mye + JOIN vehicle_parts vp ON vp.model_year_engine_id = mye.id_mye + WHERE mye.model_id = m.id_model + LIMIT 1 + ) + ORDER BY m.name_model + """, (brand_id,)) + rows = cur.fetchall() + cur.close() + return [{'id_model': r[0], 'name_model': r[1]} for r in rows] + + +def get_years(master_conn, model_id): + """Get distinct years for a model (via MYE) that have parts. Ordered DESC.""" + cur = master_conn.cursor() + cur.execute(""" + SELECT DISTINCT y.id_year, y.year_car + FROM years y + JOIN model_year_engine mye ON mye.year_id = y.id_year + WHERE mye.model_id = %s + AND EXISTS ( + SELECT 1 + FROM vehicle_parts vp + WHERE vp.model_year_engine_id = mye.id_mye + LIMIT 1 + ) + ORDER BY y.year_car DESC + """, (model_id,)) + rows = cur.fetchall() + cur.close() + return [{'id_year': r[0], 'year_car': r[1]} for r in rows] + + +def get_engines(master_conn, model_id, year_id): + """Get MYE entries (engine + trim) for a model+year combo that have parts.""" + cur = master_conn.cursor() + cur.execute(""" + SELECT mye.id_mye, e.name_engine, mye.trim_level + FROM model_year_engine mye + JOIN engines e ON e.id_engine = mye.engine_id + WHERE mye.model_id = %s AND mye.year_id = %s + AND EXISTS ( + SELECT 1 + FROM vehicle_parts vp + WHERE vp.model_year_engine_id = mye.id_mye + LIMIT 1 + ) + ORDER BY e.name_engine, mye.trim_level + """, (model_id, year_id)) + rows = cur.fetchall() + cur.close() + return [{'id_mye': r[0], 'name_engine': r[1], 'trim_level': r[2] or ''} for r in rows] + + +def get_categories(master_conn, mye_id): + """Get part categories that have parts for this vehicle (mye_id). + + Uses a subquery on vehicle_parts filtered by mye_id (indexed), + then JOINs through parts -> part_groups -> part_categories. + Uses COUNT with a safety LIMIT on the subquery. + """ + cur = master_conn.cursor() + cur.execute(""" + SELECT pc.id_part_category, + COALESCE(pc.name_es, pc.name_part_category) AS name, + sub.cnt + FROM ( + SELECT pg.category_id, COUNT(*) AS cnt + FROM vehicle_parts vp + JOIN parts p ON p.id_part = vp.part_id + JOIN part_groups pg ON pg.id_part_group = p.group_id + WHERE vp.model_year_engine_id = %s + GROUP BY pg.category_id + ) sub + JOIN part_categories pc ON pc.id_part_category = sub.category_id + ORDER BY name + """, (mye_id,)) + rows = cur.fetchall() + cur.close() + return [{'id_part_category': r[0], 'name': r[1], 'part_count': r[2]} for r in rows] + + +def get_groups(master_conn, mye_id, category_id): + """Get part groups (subcategories) for this vehicle + category, with part counts.""" + cur = master_conn.cursor() + cur.execute(""" + SELECT pg.id_part_group, + COALESCE(pg.name_es, pg.name_part_group) AS name, + COUNT(*) AS cnt + FROM vehicle_parts vp + JOIN parts p ON p.id_part = vp.part_id + JOIN part_groups pg ON pg.id_part_group = p.group_id + WHERE vp.model_year_engine_id = %s + AND pg.category_id = %s + GROUP BY pg.id_part_group, name + ORDER BY name + """, (mye_id, category_id)) + rows = cur.fetchall() + cur.close() + return [{'id_part_group': r[0], 'name': r[1], 'part_count': r[2]} for r in rows] + + +# ───────────────────────────────────────────────────────────────────────────── +# PARTS LIST + DETAIL (with stock enrichment) +# ───────────────────────────────────────────────────────────────────────────── + +def get_parts(master_conn, mye_id, group_id, tenant_conn, branch_id, page=1, per_page=30): + """Get parts for a vehicle + part group, enriched with local stock + bodega indicator. + + 1. Fetch parts from nexus_autoparts (vehicle_parts + parts) — paginated + 2. For each OEM number, look up tenant inventory for local stock + 3. For each part_id, check warehouse_inventory for bodega availability + Returns: {data: [...], pagination: {...}} + """ + per_page = min(per_page, 100) + offset = (page - 1) * per_page + + cur = master_conn.cursor() + + # Count total (bounded — uses indexed mye_id + group_id join) + cur.execute(""" + SELECT COUNT(*) + FROM vehicle_parts vp + JOIN parts p ON p.id_part = vp.part_id + WHERE vp.model_year_engine_id = %s AND p.group_id = %s + """, (mye_id, group_id)) + total = cur.fetchone()[0] + + # Fetch page of parts + cur.execute(""" + SELECT p.id_part, p.oem_part_number, p.name_part, p.name_es, + p.description, p.description_es, p.image_url + FROM vehicle_parts vp + JOIN parts p ON p.id_part = vp.part_id + WHERE vp.model_year_engine_id = %s AND p.group_id = %s + ORDER BY p.name_part + LIMIT %s OFFSET %s + """, (mye_id, group_id, per_page, offset)) + rows = cur.fetchall() + + if not rows: + cur.close() + return {'data': [], 'pagination': _pagination(page, per_page, total)} + + part_ids = [r[0] for r in rows] + oem_numbers = [r[1] for r in rows] + + # Bodega availability: count distinct bodegas with stock > 0 per part + cur.execute(""" + SELECT part_id, COUNT(*) AS bodega_count + FROM warehouse_inventory + WHERE part_id = ANY(%s) AND stock_quantity > 0 + GROUP BY part_id + """, (part_ids,)) + bodega_map = {r[0]: r[1] for r in cur.fetchall()} + cur.close() + + # Local stock enrichment from tenant DB + local_map = _get_local_stock_bulk(tenant_conn, branch_id, oem_numbers, part_ids) + + items = [] + for r in rows: + part_id = r[0] + oem = r[1] + local = local_map.get(oem) or local_map.get(f'cat:{part_id}') + items.append({ + 'id_part': part_id, + 'oem_part_number': oem, + 'name': r[3] or r[2], # prefer Spanish name + 'description': r[5] or r[4], + 'image_url': r[6], + 'local_stock': local['stock'] if local else 0, + 'local_price': local['price_1'] if local else None, + 'bodega_count': bodega_map.get(part_id, 0), + }) + + return {'data': items, 'pagination': _pagination(page, per_page, total)} + + +def get_part_detail(master_conn, part_id, tenant_conn, branch_id): + """Get full detail for a single part: catalog info, local stock, bodegas, alternatives. + + Returns: + { + part: {id, oem, name, description, image_url, group_name, category_name}, + local: {stock, price_1, price_2, price_3, cost, inventory_id} | null, + bodegas: [{business_name, price, stock, location}], + alternatives: [{part_number, manufacturer, name, type, local_stock, bodega_count}] + } + """ + cur = master_conn.cursor() + + # Part info with group + category names + cur.execute(""" + SELECT p.id_part, p.oem_part_number, p.name_part, p.name_es, + p.description, p.description_es, p.image_url, + COALESCE(pg.name_es, pg.name_part_group) AS group_name, + COALESCE(pc.name_es, pc.name_part_category) AS category_name + FROM parts p + LEFT JOIN part_groups pg ON pg.id_part_group = p.group_id + LEFT JOIN part_categories pc ON pc.id_part_category = pg.category_id + WHERE p.id_part = %s + """, (part_id,)) + row = cur.fetchone() + if not row: + cur.close() + return None + + oem = row[1] + + part_info = { + 'id_part': row[0], + 'oem_part_number': oem, + 'name': row[3] or row[2], + 'description': row[5] or row[4], + 'image_url': row[6], + 'group_name': row[7], + 'category_name': row[8], + } + + # Bodegas with stock + cur.execute(""" + SELECT u.business_name, wi.price, wi.stock_quantity, wi.warehouse_location + FROM warehouse_inventory wi + JOIN users u ON u.id_user = wi.user_id + WHERE wi.part_id = %s AND wi.stock_quantity > 0 + ORDER BY wi.price ASC + LIMIT 20 + """, (part_id,)) + bodegas = [ + {'business_name': r[0], 'price': float(r[1]) if r[1] else None, + 'stock': r[2], 'location': r[3]} + for r in cur.fetchall() + ] + + # Alternatives: cross-references + aftermarket + alternatives = _get_alternatives(cur, part_id) + cur.close() + + # Local stock + local = _get_local_stock_single(tenant_conn, branch_id, oem, part_id) + + # Enrich alternatives with local stock + bodega count + if alternatives: + alt_oems = [a['part_number'] for a in alternatives] + alt_local = _get_local_stock_bulk(tenant_conn, branch_id, alt_oems, []) + + cur2 = master_conn.cursor() + # Find part_ids for cross-ref numbers to check bodega stock + cur2.execute(""" + SELECT oem_part_number, id_part FROM parts + WHERE oem_part_number = ANY(%s) + """, (alt_oems,)) + oem_to_part = {r[0]: r[1] for r in cur2.fetchall()} + + alt_part_ids = [pid for pid in oem_to_part.values() if pid] + bodega_map = {} + if alt_part_ids: + cur2.execute(""" + SELECT part_id, COUNT(*) + FROM warehouse_inventory + WHERE part_id = ANY(%s) AND stock_quantity > 0 + GROUP BY part_id + """, (alt_part_ids,)) + bodega_map = {r[0]: r[1] for r in cur2.fetchall()} + cur2.close() + + for a in alternatives: + l = alt_local.get(a['part_number']) + a['local_stock'] = l['stock'] if l else 0 + pid = oem_to_part.get(a['part_number']) + a['bodega_count'] = bodega_map.get(pid, 0) if pid else 0 + + return { + 'part': part_info, + 'local': local, + 'bodegas': bodegas, + 'alternatives': alternatives, + } + + +def _get_alternatives(cur, part_id): + """Get cross-references + aftermarket parts for a given OEM part.""" + results = [] + + # Cross-references (other OEM numbers that reference this part) + cur.execute(""" + SELECT pcr.cross_reference_number, pcr.source_ref + FROM part_cross_references pcr + WHERE pcr.part_id = %s + LIMIT 50 + """, (part_id,)) + for r in cur.fetchall(): + results.append({ + 'part_number': r[0], + 'manufacturer': r[1] or 'OEM Cross-Ref', + 'name': None, + 'type': 'cross_reference', + 'local_stock': 0, + 'bodega_count': 0, + }) + + # Aftermarket alternatives + cur.execute(""" + SELECT ap.part_number, m.name_manufacture, + COALESCE(ap.name_es, ap.name_aftermarket_parts) AS name + FROM aftermarket_parts ap + JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id + WHERE ap.oem_part_id = %s + LIMIT 50 + """, (part_id,)) + for r in cur.fetchall(): + results.append({ + 'part_number': r[0], + 'manufacturer': r[1], + 'name': r[2], + 'type': 'aftermarket', + 'local_stock': 0, + 'bodega_count': 0, + }) + + return results + + +# ───────────────────────────────────────────────────────────────────────────── +# SMART SEARCH +# ───────────────────────────────────────────────────────────────────────────── + +def smart_search(master_conn, q, tenant_conn, branch_id, limit=50): + """Search parts by part number or text. Enriches with local stock. + + Strategy: + - If q looks like a part number (contains digits + hyphens): search oem_part_number ILIKE + - If q is text: use PostgreSQL full-text search (search_vector) with ILIKE fallback + - Always enriches results with local stock from tenant DB + """ + q = q.strip() + if not q or len(q) < 2: + return [] + + limit = min(limit, 100) + cur = master_conn.cursor() + + is_part_number = bool(re.search(r'\d.*[-/]|[-/].*\d|\d{3,}', q)) + + if is_part_number: + # Search by OEM part number + clean_q = q.replace(' ', '').upper() + cur.execute(""" + SELECT p.id_part, p.oem_part_number, p.name_part, p.name_es, + p.image_url, p.group_id + FROM parts p + WHERE REPLACE(UPPER(p.oem_part_number), ' ', '') LIKE %s + ORDER BY p.oem_part_number + LIMIT %s + """, (f'%{clean_q}%', limit)) + else: + # Full-text search using tsvector, fall back to ILIKE + tsquery = ' & '.join(q.split()) + cur.execute(""" + SELECT p.id_part, p.oem_part_number, p.name_part, p.name_es, + p.image_url, p.group_id + FROM parts p + WHERE p.search_vector @@ to_tsquery('spanish', %s) + OR p.name_part ILIKE %s + OR p.name_es ILIKE %s + ORDER BY + CASE WHEN p.search_vector @@ to_tsquery('spanish', %s) + THEN 0 ELSE 1 END, + p.name_part + LIMIT %s + """, (tsquery, f'%{q}%', f'%{q}%', tsquery, limit)) + + rows = cur.fetchall() + if not rows: + cur.close() + return [] + + part_ids = [r[0] for r in rows] + oem_numbers = [r[1] for r in rows] + + # Get vehicle info for each part (first match only) + vehicle_info_map = {} + cur.execute(""" + SELECT DISTINCT ON (vp.part_id) + vp.part_id, b.name_brand, m.name_model, y.year_car + FROM vehicle_parts vp + JOIN model_year_engine mye ON mye.id_mye = vp.model_year_engine_id + JOIN models m ON m.id_model = mye.model_id + JOIN brands b ON b.id_brand = m.brand_id + JOIN years y ON y.id_year = mye.year_id + WHERE vp.part_id = ANY(%s) + ORDER BY vp.part_id, y.year_car DESC + """, (part_ids,)) + for r in cur.fetchall(): + vehicle_info_map[r[0]] = f"{r[1]} {r[2]} {r[3]}" + cur.close() + + # Local stock enrichment + local_map = _get_local_stock_bulk(tenant_conn, branch_id, oem_numbers, part_ids) + + results = [] + for r in rows: + part_id = r[0] + oem = r[1] + local = local_map.get(oem) or local_map.get(f'cat:{part_id}') + results.append({ + 'id_part': part_id, + 'oem_part_number': oem, + 'name': r[3] or r[2], + 'image_url': r[4], + 'local_stock': local['stock'] if local else 0, + 'local_price': local['price_1'] if local else None, + 'vehicle_info': vehicle_info_map.get(part_id, ''), + }) + + return results + + +# ───────────────────────────────────────────────────────────────────────────── +# LOCAL STOCK HELPERS +# ───────────────────────────────────────────────────────────────────────────── + +def _get_local_stock_bulk(tenant_conn, branch_id, oem_numbers, catalog_part_ids): + """Look up tenant inventory for a batch of OEM numbers / catalog part IDs. + + Returns: dict keyed by oem_number (or 'cat:{id}') -> {stock, price_1, ...} + Matches by: part_number = oem_number OR catalog_part_id = id + """ + if not oem_numbers and not catalog_part_ids: + return {} + + cur = tenant_conn.cursor() + conditions = [] + params = [] + + if oem_numbers: + conditions.append("i.part_number = ANY(%s)") + params.append(oem_numbers) + if catalog_part_ids: + conditions.append("i.catalog_part_id = ANY(%s)") + params.append(catalog_part_ids) + + where = " OR ".join(conditions) + branch_filter = "" + if branch_id: + branch_filter = " AND i.branch_id = %s" + params.append(branch_id) + + cur.execute(f""" + SELECT i.id, i.part_number, i.catalog_part_id, + i.price_1, i.price_2, i.price_3, i.cost, i.tax_rate, + COALESCE(SUM(io.quantity), 0) AS stock + FROM inventory i + LEFT JOIN inventory_operations io ON io.inventory_id = i.id + WHERE ({where}) AND i.is_active = true{branch_filter} + GROUP BY i.id + """, params) + + result = {} + for r in cur.fetchall(): + entry = { + 'inventory_id': r[0], + 'part_number': r[1], + 'catalog_part_id': r[2], + 'price_1': float(r[3]) if r[3] else 0, + 'price_2': float(r[4]) if r[4] else 0, + 'price_3': float(r[5]) if r[5] else 0, + 'cost': float(r[6]) if r[6] else 0, + 'tax_rate': float(r[7]) if r[7] else 0.16, + 'stock': r[8], + } + if r[1]: + result[r[1]] = entry + if r[2]: + result[f'cat:{r[2]}'] = entry + cur.close() + return result + + +def _get_local_stock_single(tenant_conn, branch_id, oem_part_number, catalog_part_id): + """Look up a single part in the tenant inventory. Returns dict or None.""" + cur = tenant_conn.cursor() + branch_filter = "" + params = [oem_part_number, catalog_part_id] + if branch_id: + branch_filter = " AND i.branch_id = %s" + params.append(branch_id) + + cur.execute(f""" + SELECT i.id, i.price_1, i.price_2, i.price_3, i.cost, i.tax_rate, + i.location, i.unit, i.barcode, + COALESCE(SUM(io.quantity), 0) AS stock + FROM inventory i + LEFT JOIN inventory_operations io ON io.inventory_id = i.id + WHERE (i.part_number = %s OR i.catalog_part_id = %s) + AND i.is_active = true{branch_filter} + GROUP BY i.id + LIMIT 1 + """, params) + + row = cur.fetchone() + cur.close() + + if not row: + return None + + return { + 'inventory_id': row[0], + 'price_1': float(row[1]) if row[1] else 0, + 'price_2': float(row[2]) if row[2] else 0, + 'price_3': float(row[3]) if row[3] else 0, + 'cost': float(row[4]) if row[4] else 0, + 'tax_rate': float(row[5]) if row[5] else 0.16, + 'location': row[6], + 'unit': row[7] or 'PZA', + 'barcode': row[8], + 'stock': row[9], + } + + +# ───────────────────────────────────────────────────────────────────────────── +# PUBLIC WRAPPERS (for direct use by callers) +# ───────────────────────────────────────────────────────────────────────────── + +def get_local_stock(tenant_conn, oem_part_number, catalog_part_id, branch_id=None): + """Public wrapper: look up a single part in the tenant inventory.""" + return _get_local_stock_single(tenant_conn, branch_id, oem_part_number, catalog_part_id) + + +def get_bodega_availability(master_conn, part_id): + """Check warehouse_inventory for a part. Returns list of bodegas with stock.""" + cur = master_conn.cursor() + cur.execute(""" + SELECT u.business_name, wi.price, wi.stock_quantity, wi.warehouse_location + FROM warehouse_inventory wi + JOIN users u ON u.id_user = wi.user_id + WHERE wi.part_id = %s AND wi.stock_quantity > 0 + ORDER BY wi.price ASC + LIMIT 20 + """, (part_id,)) + rows = cur.fetchall() + cur.close() + return [ + {'business_name': r[0], 'price': float(r[1]) if r[1] else None, + 'stock': r[2], 'location': r[3]} + for r in rows + ] + + +def get_alternatives(master_conn, part_id): + """Public wrapper: get cross-references + aftermarket parts for a given OEM part.""" + cur = master_conn.cursor() + results = _get_alternatives(cur, part_id) + cur.close() + return results + + +# ───────────────────────────────────────────────────────────────────────────── +# HELPERS +# ───────────────────────────────────────────────────────────────────────────── + +def _pagination(page, per_page, total): + """Build standard pagination dict.""" + total_pages = max(1, (total + per_page - 1) // per_page) + return { + 'page': page, + 'per_page': per_page, + 'total': total, + 'total_pages': total_pages, + }