Files
Autoparts-DB/pos/services/catalog_service.py
Nexus Dev 9ff3dc4c8b FASE 4-5-6: Infraestructura, CRM, Service Orders, Notificaciones, Ahorro, Logistica, API Publica
FASE 4:
- Redis cache de stock con fallback graceful
- Multi-moneda (MXN/USD) con contabilidad en MXN
- Proveedores y ordenes de compra completo
- Meilisearch 1.5M+ partes indexadas
- Metabase KPIs con dashboard auto-generado

FASE 5:
- CRM mejorado: activities, tags, loyalty program, analytics
- Imagenes de partes: upload, resize, thumbnails WebP
- Ordenes de servicio Kanban: received->diagnosis->repair->ready->delivered
- Garantias/RMA, alertas de reorden, multi-sucursal
- Stubs BNPL (APLAZO) y ERP Sync (Aspel/Contpaqi)

FASE 6:
- Notificaciones automaticas: push/WhatsApp/email/in-app
- Reportes de ahorro vs retail_price
- Logistica + tracking: DHL, FedEx, Estafeta, 99min, Uber
- API Publica: API keys, rate limiting, catalog search

Migraciones: v1.9-v3.0
Tests: 93/93 pasando
Backup: nexus_backup_20260427_045859.tar.gz
2026-04-27 05:23:30 +00:00

1476 lines
56 KiB
Python

# /home/Autopartes/pos/services/catalog_service.py
"""Catalog service: queries nexus_autoparts (TecDoc) catalog with local stock enrichment.
All functions receive database connections as parameters.
This module NEVER imports tenant_db — the caller passes connections.
PERFORMANCE: vehicle_parts has 14B+ rows. Every query MUST:
- Filter by model_year_engine_id (indexed)
- Use LIMIT
- Use EXISTS instead of COUNT(*) on vehicle_parts where possible
"""
import re
from services.na_models import is_na_model
from services.translations import translate_part_name, translate_category
def _clean_model_name(name):
"""Parse TecDoc model name to show only the primary name.
'4 RUNNER V (_N28_)''4 RUNNER'
'Corolla Hatchback (_E21_)''COROLLA'
'CAMRY Saloon (_V4_)''CAMRY'
'RAV 4 III (_A3_)''RAV 4'
"""
s = name.strip()
# Remove generation codes in parentheses: (_N28_), (B1_), (_E21_), etc.
s = re.sub(r'\s*\([^)]*\)\s*', '', s)
# Remove Roman numeral generation suffixes: I, II, III, IV, V, VI, VII, VIII, IX, X
s = re.sub(r'\s+(?:VIII|VII|VI|IV|IX|III|II|V|X|I)(?:\s|$)', ' ', s)
# Remove body type suffixes
s = re.sub(r'\s+(?:Estate|Saloon|Hatchback|Van|Coupe|Coupé|Convertible|Wagon|Pickup|Cab|Sedan|SUV|MPV|Kombi|Kasten|Bus|Box|Platform|Chassis)\b', '', s, flags=re.IGNORECASE)
# Remove "Hatchback Van", "Box Body" compound types
s = re.sub(r'\s+(?:Hatchback|Box)\s+(?:Van|Body)\b', '', s, flags=re.IGNORECASE)
# Clean up extra spaces
s = re.sub(r'\s+', ' ', s).strip()
return s.upper() if s else name.upper()
# ─────────────────────────────────────────────────────────────────────────────
# VEHICLE HIERARCHY NAVIGATION
# ─────────────────────────────────────────────────────────────────────────────
from services.catalog_modes import get_brands_for_mode
# Legacy alias — kept for backwards compatibility with any existing imports.
# Prefer `catalog_modes.OEM_BRANDS_NA` in new code.
# NOTE: evaluated once, at import time, so it holds whatever
# get_brands_for_mode('oem') returned when this module was first loaded.
NORTH_AMERICA_BRANDS = get_brands_for_mode('oem')
def get_brands(master_conn, year_id=None, mode='oem'):
    """Get vehicle brands that have MYE entries, filtered by catalog mode.

    Args:
        master_conn: Connection to the nexus_autoparts master DB.
        year_id: Optional — only return brands with models for that year.
            A falsy value (None, 0) disables the year filter.
        mode: 'oem' (full TecDoc coverage) or 'local' (curated bodega list).

    Returns:
        A list of ``{'id_brand', 'name_brand'}`` dicts ordered by brand name.
    """
    allowed = list(get_brands_for_mode(mode))

    # One query template; the year filter is appended only when requested.
    sql = """
        SELECT DISTINCT b.id_brand, b.name_brand
        FROM brands b
        JOIN models m ON m.brand_id = b.id_brand
        JOIN model_year_engine mye ON mye.model_id = m.id_model
        WHERE b.name_brand = ANY(%s)
    """
    params = [allowed]
    if year_id:
        sql += " AND mye.year_id = %s"
        params.append(year_id)
    sql += " ORDER BY b.name_brand"

    cur = master_conn.cursor()
    try:
        cur.execute(sql, tuple(params))
        rows = cur.fetchall()
    finally:
        # Release the cursor even when the query raises.
        cur.close()
    return [{'id_brand': r[0], 'name_brand': r[1]} for r in rows]
def get_models(master_conn, brand_id, year_id=None, brand_name=None):
    """Get models for a brand that have MYE entries, filtered to North America.

    Args:
        master_conn: Connection to the master catalog DB.
        brand_id: Brand whose models are listed.
        year_id: Optional — restrict to models available for that year.
        brand_name: Used for the NA filter; looked up from the DB by
            ``brand_id`` when not provided ('' if the brand is not found).

    Returns:
        A list of ``{'id_model', 'name_model', 'display_name'}`` dicts sorted
        by ``display_name``. Models whose cleaned display name collides are
        collapsed: only the first row (in ``name_model`` order) is kept, so
        the other ``id_model`` values for that display name are not returned.
    """
    cur = master_conn.cursor()
    try:
        # Resolve brand_name for the NA filter if the caller did not pass it.
        if not brand_name:
            cur.execute("SELECT name_brand FROM brands WHERE id_brand = %s", (brand_id,))
            row = cur.fetchone()
            brand_name = row[0] if row else ''

        if year_id:
            cur.execute("""
                SELECT DISTINCT m.id_model, m.name_model
                FROM models m
                JOIN model_year_engine mye ON mye.model_id = m.id_model
                WHERE m.brand_id = %s AND mye.year_id = %s
                ORDER BY m.name_model
            """, (brand_id, year_id))
        else:
            cur.execute("""
                SELECT DISTINCT m.id_model, m.name_model
                FROM models m
                JOIN model_year_engine mye ON mye.model_id = m.id_model
                WHERE m.brand_id = %s
                ORDER BY m.name_model
            """, (brand_id,))
        rows = cur.fetchall()
    finally:
        # Release the cursor even when a query raises.
        cur.close()

    seen = set()  # display names already emitted
    results = []
    for r in rows:
        # Keep North America models only.
        if not is_na_model(brand_name, r[1]):
            continue
        display = _clean_model_name(r[1])
        if display in seen:
            continue
        seen.add(display)
        results.append({
            'id_model': r[0],
            'name_model': r[1],
            'display_name': display,
        })

    results.sort(key=lambda x: x['display_name'])
    return results
def get_years(master_conn, model_id):
    """Get distinct years for a model via MYE (no vehicle_parts scan).

    Args:
        master_conn: Connection to the master catalog DB.
        model_id: Model whose years are listed.

    Returns:
        A list of ``{'id_year', 'year_car'}`` dicts, newest year first.
    """
    cur = master_conn.cursor()
    try:
        cur.execute("""
            SELECT DISTINCT y.id_year, y.year_car
            FROM years y
            JOIN model_year_engine mye ON mye.year_id = y.id_year
            WHERE mye.model_id = %s
            ORDER BY y.year_car DESC
        """, (model_id,))
        rows = cur.fetchall()
    finally:
        # Release the cursor even when the query raises.
        cur.close()
    return [{'id_year': r[0], 'year_car': r[1]} for r in rows]
def get_engines(master_conn, model_id, year_id):
    """Get MYE entries (engine + trim) for a model+year combo.

    Args:
        master_conn: Connection to the master catalog DB.
        model_id: Selected model.
        year_id: Selected year.

    Returns:
        A list of ``{'id_mye', 'name_engine', 'trim_level'}`` dicts ordered by
        engine name then trim; a NULL/empty trim is returned as ''.
    """
    cur = master_conn.cursor()
    try:
        cur.execute("""
            SELECT mye.id_mye, e.name_engine, mye.trim_level
            FROM model_year_engine mye
            JOIN engines e ON e.id_engine = mye.engine_id
            WHERE mye.model_id = %s AND mye.year_id = %s
            ORDER BY e.name_engine, mye.trim_level
        """, (model_id, year_id))
        rows = cur.fetchall()
    finally:
        # Release the cursor even when the query raises.
        cur.close()
    return [{'id_mye': r[0], 'name_engine': r[1], 'trim_level': r[2] or ''} for r in rows]
def get_categories(master_conn, mye_id):
    """Get part categories that have parts for this vehicle (mye_id).

    Uses a subquery on vehicle_parts filtered by model_year_engine_id, then
    JOINs through parts -> part_groups -> part_categories. The subquery
    aggregates with COUNT(*) per category; it is bounded only by the mye_id
    filter (there is no LIMIT on it).

    Args:
        master_conn: Connection to the master catalog DB.
        mye_id: model_year_engine id of the selected vehicle.

    Returns:
        A list of ``{'id_part_category', 'name', 'part_count'}`` dicts. Rows
        are ordered in SQL by the untranslated name; ``translate_category`` is
        applied afterwards, so the final names may not be in strict
        alphabetical order.
    """
    cur = master_conn.cursor()
    try:
        cur.execute("""
            SELECT pc.id_part_category,
                   COALESCE(pc.name_es, pc.name_part_category) AS name,
                   sub.cnt
            FROM (
                SELECT pg.category_id, COUNT(*) AS cnt
                FROM vehicle_parts vp
                JOIN parts p ON p.id_part = vp.part_id
                JOIN part_groups pg ON pg.id_part_group = p.group_id
                WHERE vp.model_year_engine_id = %s
                GROUP BY pg.category_id
            ) sub
            JOIN part_categories pc ON pc.id_part_category = sub.category_id
            ORDER BY name
        """, (mye_id,))
        rows = cur.fetchall()
    finally:
        # Release the cursor even when the query raises.
        cur.close()
    return [
        {'id_part_category': r[0], 'name': translate_category(r[1]), 'part_count': r[2]}
        for r in rows
    ]
# ─────────────────────────────────────────────────────────────────────────────
# NEXPART HIERARCHY (Local mode) — filtered by what TecDoc has for this vehicle
# ─────────────────────────────────────────────────────────────────────────────
# ─── In-memory cache for vehicle → Nexpart classification ─────────────────
# Key: mye_id (int). Value: (expires_at_timestamp, classified_dict).
# TTL is short (5 min) because catalog data rarely changes but we don't
# want stale data lingering across sessions. Single-process cache —
# Gunicorn workers each have their own, which is fine for this workload.
import time as _time
# mye_id -> (expires_at, classified_dict). Written by _classify_cache_set(),
# read (and lazily expired) by _classify_cache_get().
_CLASSIFY_CACHE = {}
# Lifetime of one cache entry, in seconds (5 minutes).
_CLASSIFY_TTL_SECONDS = 300
def _classify_cache_get(mye_id):
    """Return the cached classification for ``mye_id``, or None.

    None is returned both when there is no entry and when the entry has
    expired; an expired entry is removed from the cache on the way out.
    """
    try:
        expires_at, data = _CLASSIFY_CACHE[mye_id]
    except KeyError:
        return None
    if _time.time() <= expires_at:
        return data
    # Entry is past its deadline: drop it and report a miss.
    _CLASSIFY_CACHE.pop(mye_id, None)
    return None
def _classify_cache_set(mye_id, data):
    """Store ``data`` for ``mye_id`` with a fresh TTL, trimming the cache if needed.

    Growth protection is deliberately simple: once the cache holds more than
    500 entries, the 250 entries with the earliest expiry are evicted.
    """
    deadline = _time.time() + _CLASSIFY_TTL_SECONDS
    _CLASSIFY_CACHE[mye_id] = (deadline, data)
    if len(_CLASSIFY_CACHE) <= 500:
        return
    # Keys ordered by expiry timestamp, soonest-to-expire first.
    keys_by_expiry = sorted(_CLASSIFY_CACHE, key=lambda key: _CLASSIFY_CACHE[key][0])
    for stale_key in keys_by_expiry[:250]:
        _CLASSIFY_CACHE.pop(stale_key, None)
def classify_cache_clear():
    """Manual cache invalidation — call after catalog import.

    Empties the module-level classification cache in place, so every entry is
    dropped regardless of its remaining TTL.
    """
    _CLASSIFY_CACHE.clear()
def classify_cache_stats():
    """Report the current state of the classification cache.

    Returns a dict with the total number of entries, how many are still
    within their TTL ('alive'), how many have expired but not yet been
    evicted ('expired'), and the configured TTL in seconds.
    """
    now = _time.time()
    total = len(_CLASSIFY_CACHE)
    alive = 0
    for expires, _data in _CLASSIFY_CACHE.values():
        if expires > now:
            alive += 1
    stats = {}
    stats['total_entries'] = total
    stats['alive'] = alive
    stats['expired'] = total - alive
    stats['ttl_seconds'] = _CLASSIFY_TTL_SECONDS
    return stats
def _classify_vehicle_parts(master_conn, mye_id):
    """Classify all TecDoc parts for a vehicle into Nexpart triples.

    Runs the matcher on every (part id, part name) row returned for the
    vehicle and builds a nested dict:

        {
            "Brake System...": {
                "Front Friction, Drums & Rotors": {
                    "Front Disc Brake Rotor": [oem_part_id, ...],
                    ...
                },
                ...
            },
            ...
        }

    Parts whose name has no Nexpart equivalent are dropped.

    The result is cached by mye_id, so one navigation sequence
    (categories -> subgroups -> part types -> parts) normally performs the
    query and classification once. The cached dict is returned by reference;
    callers must treat it as read-only.

    Args:
        master_conn: Connection to the master catalog DB.
        mye_id: model_year_engine id of the selected vehicle.

    Returns:
        The nested ``{group: {subgroup: {part_type: [part_id, ...]}}}`` dict.
    """
    # Cache hit — skip the query and matcher entirely.
    cached = _classify_cache_get(mye_id)
    if cached is not None:
        return cached

    from services.nexpart_taxonomy import tecdoc_to_nexpart

    cur = master_conn.cursor()
    try:
        cur.execute("""
            SELECT p.id_part, p.name_part
            FROM vehicle_parts vp
            JOIN parts p ON p.id_part = vp.part_id
            WHERE vp.model_year_engine_id = %s
        """, (mye_id,))
        rows = cur.fetchall()
    finally:
        # Release the cursor even when the query raises.
        cur.close()

    classified = {}
    for part_id, name_part in rows:
        triple = tecdoc_to_nexpart(name_part)
        if not triple:
            continue  # drop unmapped (Decision 2)
        group, subgroup, part_type = triple
        (classified
            .setdefault(group, {})
            .setdefault(subgroup, {})
            .setdefault(part_type, [])
            .append(part_id))

    _classify_cache_set(mye_id, classified)
    return classified
def get_nexpart_groups_for_vehicle(master_conn, mye_id):
    """Local mode: list the Nexpart top-level groups that have parts for a vehicle.

    The output mirrors get_categories() but identifies each group by a string
    `slug` rather than an integer category id. Groups with no classified
    parts for this vehicle are omitted, and groups are emitted in the key
    order of NEXPART_TAXONOMY so the UI ordering is stable.
    """
    from services.nexpart_taxonomy import (
        NEXPART_TAXONOMY,
        translate_taxonomy_node,
    )

    classified = _classify_vehicle_parts(master_conn, mye_id)

    groups = []
    for group in NEXPART_TAXONOMY:
        if group in classified:
            # Total number of matched part ids across every subgroup and
            # part type of this group.
            matched = 0
            for part_types in classified[group].values():
                for part_ids in part_types.values():
                    matched += len(part_ids)
            groups.append({
                'slug': group,
                'name': translate_taxonomy_node(group),
                'name_en': group,
                'part_count': matched,
            })
    return groups
def get_nexpart_subgroups_for_vehicle(master_conn, mye_id, group_slug):
    """Local mode: list the Nexpart subgroups of a group that have vehicle parts.

    Subgroups are emitted in the order they appear under ``group_slug`` in
    NEXPART_TAXONOMY; subgroups with no classified parts are omitted. An
    empty list is returned when the group has no parts for this vehicle.
    """
    from services.nexpart_taxonomy import (
        NEXPART_TAXONOMY,
        translate_taxonomy_node,
    )

    classified = _classify_vehicle_parts(master_conn, mye_id)
    by_subgroup = classified.get(group_slug, {})
    if not by_subgroup:
        return []

    ordered_subgroups = [sg for sg in NEXPART_TAXONOMY.get(group_slug, {}).keys()]
    subgroups = []
    for subgroup in ordered_subgroups:
        if subgroup in by_subgroup:
            matched = 0
            for part_ids in by_subgroup[subgroup].values():
                matched += len(part_ids)
            subgroups.append({
                'slug': subgroup,
                'name': translate_taxonomy_node(subgroup),
                'name_en': subgroup,
                'part_count': matched,
            })
    return subgroups
# ═══════════════════════════════════════════════════════════════════════════
# SHOP SUPPLIES — vehicle-independent parts (Network Shop Supplies tab)
# ═══════════════════════════════════════════════════════════════════════════
# These live under 2 Nexpart groups that don't require a vehicle selection:
# - Chemicals, Waxes & Lubricants (oils, fluids, additives)
# - Tires, Wheels, Tools & Accessory Parts (TPMS, lug nuts, universal clips)
#
# The navigation skips the Year→Make→Model→Engine chain and goes directly
# to group selection. The query scans `parts` globally without joining
# `vehicle_parts` (which is HUGE), so it's fast.
# The 2 Nexpart groups that are safely vehicle-independent.
# Every get_shop_supplies_* function rejects a group_slug that is not listed
# here, so this tuple is the allow-list for the Shop Supplies navigation.
_SHOP_SUPPLIES_GROUPS = (
    "Chemicals, Waxes & Lubricants",
    "Tires, Wheels, Tools & Accessory Parts",
)
# Maps each Nexpart Part Type in the universal groups to a list of SQL ILIKE
# patterns that match the actual TecDoc name_part values. This inverts the
# forward matcher (which goes TecDoc → Nexpart) — here we're asking "which
# TecDoc part names should be classified into this Nexpart Part Type?"
#
# Built by inspecting real name_part values in the parts table. Grow this
# map when you see shop supplies that are missing from the results.
#
# Pattern semantics: each entry is passed as a bound parameter to
# `name_part ILIKE %s`, and the patterns of one Part Type are OR'd together.
# `%` is the wildcard; a pattern with no `%` must match the whole name
# (case-insensitively). Part Types absent from this map are skipped by the
# count/lookup helpers.
SHOP_SUPPLIES_PATTERNS = {
    # Chemicals, Waxes & Lubricants
    "Engine Oil": ["Engine Oil"],
    "Automatic Transmission Fluid": ["Automatic Transmission Fluid", "Transmission Oil"],
    # Tires & Wheels (TPMS + lug hardware)
    "TPMS Sensor": ["TPMS%", "Tire Pressure Sensor%"],
    "TPMS Programmable Sensor": ["%TPMS%Programmable%"],
    "TPMS Sensor Service Kit": ["%TPMS%Service Kit%", "%TPMS%Repair Kit%"],
    "TPMS Sensor Valve Assembly": ["%TPMS%Valve%"],
    "TPMS Valve Kit": ["%TPMS%Valve Kit%", "Valve, tyre pressure control system"],
    "TPMS Programmable Sensor Valve Kit": ["%TPMS%Programmable%Valve%"],
    "Wheel Lug Nut": ["Wheel Nut"],
    "Wheel Lug Stud": ["Wheel Bolt", "Wheel Stud"],
    # Bumper & License Plate (universal clips)
    "Bumper Clip": ["%Clip%bumper%", "%bumper%Clip%"],
    "Bumper Cover Retainer": ["%bumper cover%", "Retainer%bumper%"],
    "Bumper Cover Trim Panel Retainer": ["%Retainer%trim%", "%bumper trim%"],
    "License Plate Bracket Rivet": ["%license plate%rivet%", "%plate%bracket%"],
    # Hood, Fender & Body Parts (universal clips)
    "Cowl Panel Retainer": ["%cowl%retainer%", "Clip, cowl%"],
    "Door Sill Plate Clip": ["%door sill%clip%", "Clip, sill%"],
    "Exterior Molding Clip": ["%Clip%trim%", "%trim%strip%Clip%"],
    "Interior Panel Clip": ["Clip, trim%"],
    "Rocker Panel Molding Retainer": ["%rocker%retainer%"],
    "Spoiler Clip": ["Clip, spoiler%", "%spoiler%Clip%"],
    "Undercar Shield Clip": ["%undercar shield%", "%underbody%Clip%"],
    # Tools, Jacks, Hardware & Manuals — mostly not present in TecDoc
    "Cooling System Flush Gun Kit": ["%cooling system flush%"],
    "Molding Clip": ["Clip, moulding%", "Clip, molding%"],
    "Multi-Purpose Retainer": ["Retainer%", "Clip, mounting%"],
    "Interior Panel Retainer": ["Retainer%panel%", "Clip, interior panel%"],
    # Interior & Steering Wheel — mostly connectors (sparse in TecDoc)
    "Accelerator Pedal Sensor Connector": ["%accelerator pedal%connector%"],
    "Clutch Pedal Position Switch Connector": ["%clutch pedal%connector%"],
    "Console Trim Panel Clip": ["%console%clip%"],
    # Electronics Audio/Visual & Mirrors
    "Antenna Mast": ["%antenna mast%", "%antenna%"],
    "Interior Rear View Mirror Connector": ["%rear view mirror%connector%"],
    "Interior Rear View Mirror Mounting Base": ["%mirror%mounting base%"],
    "Keyless Entry Transmitter Cover": ["%keyless%cover%"],
    "Lane Departure System Camera": ["%lane departure%"],
}
def _shop_supplies_count_by_part_type(master_conn, part_type_names):
    """Count matching parts per Shop Supplies Part Type.

    For each name in ``part_type_names`` that has an entry in
    SHOP_SUPPLIES_PATTERNS, runs one ``COUNT(*)`` over ``parts`` with the
    entry's patterns OR'd together via ILIKE. One query per Part Type is used
    because a separate count is needed for each.

    NOTE(review): no LIMIT or index hint is applied here; the cost depends on
    whether ``parts.name_part`` has a trigram index — confirm in the schema.

    Args:
        master_conn: Connection to the master catalog DB.
        part_type_names: Iterable of Nexpart Part Type names.

    Returns:
        ``{part_type: count}`` containing only Part Types with count > 0.
    """
    result = {}
    cur = master_conn.cursor()
    try:
        for pt in part_type_names:
            patterns = SHOP_SUPPLIES_PATTERNS.get(pt)
            if not patterns:
                continue
            # Patterns are bound as parameters; only the number of
            # placeholders is interpolated into the SQL text.
            like_parts = " OR ".join(["name_part ILIKE %s"] * len(patterns))
            cur.execute(
                f"SELECT COUNT(*) FROM parts WHERE {like_parts}",
                patterns,
            )
            count = cur.fetchone()[0] or 0
            if count > 0:
                result[pt] = count
    finally:
        # Release the cursor even when a query raises.
        cur.close()
    return result
def _shop_supplies_oem_ids(master_conn, part_type_name, limit=5000):
    """Return the OEM id_part values that match a Shop Supplies Part Type.

    Args:
        master_conn: Connection to the master catalog DB.
        part_type_name: Nexpart Part Type; looked up in SHOP_SUPPLIES_PATTERNS.
        limit: Maximum number of ids returned.

    Returns:
        A list of ``id_part`` values in ascending order, or [] when the Part
        Type has no patterns. The explicit ORDER BY makes the truncated set
        deterministic: without it, LIMIT may return a different arbitrary
        subset on each call, so successive page requests (each of which
        re-runs this lookup) could be paginating over different id sets.
    """
    patterns = SHOP_SUPPLIES_PATTERNS.get(part_type_name)
    if not patterns:
        return []
    like_parts = " OR ".join(["name_part ILIKE %s"] * len(patterns))
    cur = master_conn.cursor()
    try:
        cur.execute(
            f"SELECT id_part FROM parts WHERE ({like_parts}) ORDER BY id_part LIMIT %s",
            patterns + [limit],
        )
        ids = [row[0] for row in cur.fetchall()]
    finally:
        # Release the cursor even when the query raises.
        cur.close()
    return ids
def get_shop_supplies_groups():
    """List the vehicle-independent Nexpart groups (Shop Supplies tab).

    Unlike get_nexpart_groups_for_vehicle(), no database is consulted: every
    group from _SHOP_SUPPLIES_GROUPS that exists in NEXPART_TAXONOMY is
    returned, whether or not matching parts exist. In each item `part_count`
    is the number of Part Types defined in the taxonomy for the group (not a
    count of parts), and `subgroup_count` is its number of subgroups.
    """
    from services.nexpart_taxonomy import (
        NEXPART_TAXONOMY,
        translate_taxonomy_node,
    )

    known_groups = [g for g in _SHOP_SUPPLIES_GROUPS if g in NEXPART_TAXONOMY]
    groups = []
    for group in known_groups:
        subgroups = NEXPART_TAXONOMY[group]
        part_type_total = 0
        for part_types in subgroups.values():
            part_type_total += len(part_types)
        groups.append({
            'slug': group,
            'name': translate_taxonomy_node(group),
            'name_en': group,
            'part_count': part_type_total,  # Part Types in the taxonomy, not parts
            'subgroup_count': len(subgroups),
        })
    return groups
def get_shop_supplies_subgroups(master_conn, group_slug):
    """List the subgroups of a Shop Supplies group that have matching parts.

    Returns [] when ``group_slug`` is not a Shop Supplies group or is missing
    from NEXPART_TAXONOMY. A subgroup's `part_count` is the sum of the
    pattern-based counts of its Part Types; subgroups summing to zero are
    omitted.
    """
    from services.nexpart_taxonomy import (
        NEXPART_TAXONOMY,
        translate_taxonomy_node,
    )

    if group_slug not in _SHOP_SUPPLIES_GROUPS or group_slug not in NEXPART_TAXONOMY:
        return []

    subgroups = NEXPART_TAXONOMY[group_slug]

    # Flatten every Part Type of the group and count them in one helper call
    # (the helper does the ILIKE-based inverse search).
    every_part_type = []
    for pt_list in subgroups.values():
        every_part_type.extend(pt_list)
    counts = _shop_supplies_count_by_part_type(master_conn, every_part_type)

    visible = []
    for sg_name, pt_list in subgroups.items():
        total = 0
        for pt in pt_list:
            total += counts.get(pt, 0)
        if total != 0:
            visible.append({
                'slug': sg_name,
                'name': translate_taxonomy_node(sg_name),
                'name_en': sg_name,
                'part_count': total,
            })
    return visible
def get_shop_supplies_part_types(master_conn, group_slug, subgroup_slug):
    """Return Part Types within a Shop Supplies subgroup that have matching parts.

    Args:
        master_conn: Connection to the master catalog DB.
        group_slug: Must be one of _SHOP_SUPPLIES_GROUPS, else [] is returned.
        subgroup_slug: Subgroup name under that group in NEXPART_TAXONOMY.

    Returns:
        A list of ``{'slug', 'name', 'name_en', 'variant_count',
        'sample_image'}`` dicts in taxonomy order, one per Part Type with at
        least one matching part. ``sample_image`` is the image_url of any one
        matching part that has an image, or None.
    """
    from services.nexpart_taxonomy import (
        NEXPART_TAXONOMY,
        translate_taxonomy_node,
    )

    if group_slug not in _SHOP_SUPPLIES_GROUPS:
        return []
    subgroups = NEXPART_TAXONOMY.get(group_slug, {})
    part_types = subgroups.get(subgroup_slug, [])
    if not part_types:
        return []

    counts_by_pt = _shop_supplies_count_by_part_type(master_conn, part_types)

    result = []
    cur = master_conn.cursor()
    try:
        for pt in part_types:
            cnt = counts_by_pt.get(pt, 0)
            if cnt == 0:
                continue
            # Fetch one sample image for the Part Type, if any part has one.
            patterns = SHOP_SUPPLIES_PATTERNS.get(pt, [])
            sample_image = None
            if patterns:
                like_parts = " OR ".join(["name_part ILIKE %s"] * len(patterns))
                cur.execute(
                    f"SELECT image_url FROM parts WHERE ({like_parts}) AND image_url IS NOT NULL LIMIT 1",
                    patterns,
                )
                row = cur.fetchone()
                sample_image = row[0] if row else None
            result.append({
                'slug': pt,
                'name': translate_taxonomy_node(pt),
                'name_en': pt,
                'variant_count': cnt,
                'sample_image': sample_image,
            })
    finally:
        # Release the cursor even when a query raises.
        cur.close()
    return result
def get_shop_supplies_parts(master_conn, group_slug, subgroup_slug, part_type_slug,
                            tenant_conn, branch_id, page=1, per_page=30):
    """Return a page of parts (aftermarket-enriched) for a Shop Supplies triple.

    The result has the same shape as get_parts_for_nexpart_triple(): the OEM
    ids matched by the Part Type's patterns are handed to get_parts_local().
    An empty page is returned when the group is not a Shop Supplies group,
    when the Part Type is not listed under (group, subgroup) in the taxonomy,
    or when no part matches.
    """
    from services.nexpart_taxonomy import NEXPART_TAXONOMY

    def _empty_page():
        return {'data': [], 'pagination': _pagination(page, per_page, 0), 'mode': 'local'}

    if group_slug not in _SHOP_SUPPLIES_GROUPS:
        return _empty_page()

    # The requested Part Type must exist under this group/subgroup.
    known_part_types = NEXPART_TAXONOMY.get(group_slug, {}).get(subgroup_slug, [])
    if part_type_slug not in known_part_types:
        return _empty_page()

    # Inverse-pattern lookup: Part Type -> matching OEM part ids.
    matched_ids = _shop_supplies_oem_ids(master_conn, part_type_slug)
    if not matched_ids:
        return _empty_page()

    # No vehicle or group filter: the explicit id list drives the query.
    return get_parts_local(
        master_conn,
        mye_id=None,
        group_id=None,
        tenant_conn=tenant_conn,
        branch_id=branch_id,
        page=page,
        per_page=per_page,
        oem_part_ids=matched_ids,
    )
def get_parts_for_nexpart_triple(master_conn, mye_id, group_slug, subgroup_slug,
                                 part_type_slug, tenant_conn, branch_id,
                                 page=1, per_page=30):
    """Local mode: fetch a page of parts for a Nexpart (group, subgroup, part type).

    The vehicle's parts are classified (cached per mye_id) to find the OEM
    id_part values filed under the requested triple; those ids are then passed
    to get_parts_local(), whose result shape is returned unchanged. When the
    triple has no parts, an empty 'local' page is returned instead.
    """
    classified = _classify_vehicle_parts(master_conn, mye_id)
    subgroups = classified.get(group_slug, {})
    part_types = subgroups.get(subgroup_slug, {})
    matched_ids = part_types.get(part_type_slug, [])

    if matched_ids:
        return get_parts_local(
            master_conn,
            mye_id=None,
            group_id=None,
            tenant_conn=tenant_conn,
            branch_id=branch_id,
            page=page,
            per_page=per_page,
            oem_part_ids=matched_ids,
        )

    empty_page = {
        'data': [],
        'pagination': _pagination(page, per_page, 0),
        'mode': 'local',
    }
    return empty_page
def get_nexpart_part_types_for_vehicle(master_conn, mye_id, group_slug, subgroup_slug):
    """Local mode: return Nexpart part types within a subgroup that have vehicle parts.

    Output shape matches get_part_types() so the frontend can render with
    minimal branching: each item has slug + name + variant_count +
    sample_image (plus name_en).

    Args:
        master_conn: Connection to the master catalog DB.
        mye_id: model_year_engine id of the selected vehicle.
        group_slug: Nexpart group name.
        subgroup_slug: Nexpart subgroup name within that group.

    Returns:
        A list of dicts in NEXPART_TAXONOMY order, or [] when the subgroup
        has no classified parts. ``sample_image`` is the image of the first
        part id (in classification order) that has one, else None.
    """
    from services.nexpart_taxonomy import (
        NEXPART_TAXONOMY,
        translate_taxonomy_node,
    )

    classified = _classify_vehicle_parts(master_conn, mye_id)
    subgroup_data = classified.get(group_slug, {}).get(subgroup_slug, {})
    if not subgroup_data:
        return []

    # Pull images for every part id of the subgroup in a single query.
    all_part_ids = [
        pid
        for pids in subgroup_data.values()
        for pid in pids
    ]
    image_map = {}
    if all_part_ids:
        cur = master_conn.cursor()
        try:
            cur.execute("""
                SELECT id_part, image_url
                FROM parts
                WHERE id_part = ANY(%s) AND image_url IS NOT NULL
            """, (all_part_ids,))
            for pid, url in cur.fetchall():
                image_map[pid] = url
        finally:
            # Release the cursor even when the query raises.
            cur.close()

    canonical_order = NEXPART_TAXONOMY.get(group_slug, {}).get(subgroup_slug, [])
    result = []
    for pt in canonical_order:
        if pt not in subgroup_data:
            continue
        part_ids = subgroup_data[pt]
        sample_image = next((image_map[pid] for pid in part_ids if pid in image_map), None)
        result.append({
            'slug': pt,
            'name': translate_taxonomy_node(pt),
            'name_en': pt,
            'variant_count': len(part_ids),
            'sample_image': sample_image,
        })
    return result
def get_groups(master_conn, mye_id, category_id):
    """Get part groups (subcategories) for this vehicle + category, with part counts.

    Args:
        master_conn: Connection to the master catalog DB.
        mye_id: model_year_engine id of the selected vehicle.
        category_id: Part category whose groups are listed.

    Returns:
        A list of ``{'id_part_group', 'name', 'part_count'}`` dicts ordered
        by name. ``name`` is the Spanish name when present, else the original.

    NOTE(review): unlike get_categories(), the name is returned as stored and
    is not passed through translate_category() — confirm that is intended.
    """
    cur = master_conn.cursor()
    try:
        cur.execute("""
            SELECT pg.id_part_group,
                   COALESCE(pg.name_es, pg.name_part_group) AS name,
                   COUNT(*) AS cnt
            FROM vehicle_parts vp
            JOIN parts p ON p.id_part = vp.part_id
            JOIN part_groups pg ON pg.id_part_group = p.group_id
            WHERE vp.model_year_engine_id = %s
              AND pg.category_id = %s
            GROUP BY pg.id_part_group, name
            ORDER BY name
        """, (mye_id, category_id))
        rows = cur.fetchall()
    finally:
        # Release the cursor even when the query raises.
        cur.close()
    return [{'id_part_group': r[0], 'name': r[1], 'part_count': r[2]} for r in rows]
def get_part_types(master_conn, mye_id, group_id):
    """Get distinct part types within a vehicle+group (Nexpart-style 3rd subcategory level).

    A "part type" is a unique part name within a group — e.g. within a
    "Brake System" group, the types might be "Brake Disc", "Brake Pad",
    "Brake Caliper", each with multiple variants.

    Args:
        master_conn: Connection to the master catalog DB.
        mye_id: model_year_engine id of the selected vehicle.
        group_id: Part group to list types for.

    Returns:
        A list of dicts, most variants first (ties by display name):
        - slug: the original ``name_part`` (matches the DB column, so it can
          be passed back as the ``part_type`` filter)
        - name: Spanish name if available, else original, passed through
          ``translate_part_name``
        - variant_count: number of vehicle_parts rows for this type
        - sample_image: one non-NULL image URL among the variants, or None
    """
    cur = master_conn.cursor()
    try:
        # Use ORIGINAL name_part as the slug (matches DB column for
        # filtering), but display the Spanish translation if available.
        cur.execute("""
            SELECT
                p.name_part AS slug,
                COALESCE(p.name_es, p.name_part) AS display_name,
                COUNT(*) AS variants,
                (ARRAY_AGG(p.image_url) FILTER (WHERE p.image_url IS NOT NULL))[1] AS sample_image
            FROM vehicle_parts vp
            JOIN parts p ON p.id_part = vp.part_id
            WHERE vp.model_year_engine_id = %s
              AND p.group_id = %s
            GROUP BY p.name_part, COALESCE(p.name_es, p.name_part)
            ORDER BY variants DESC, display_name ASC
        """, (mye_id, group_id))
        rows = cur.fetchall()
    finally:
        # Release the cursor even when the query raises.
        cur.close()
    return [
        {
            'slug': r[0],
            'name': translate_part_name(r[1]),
            'variant_count': r[2],
            'sample_image': r[3],
        }
        for r in rows
    ]
# ─────────────────────────────────────────────────────────────────────────────
# PARTS LIST + DETAIL (with stock enrichment)
# ─────────────────────────────────────────────────────────────────────────────
def get_parts(master_conn, mye_id, group_id, tenant_conn, branch_id, page=1, per_page=30, part_type=None):
    """Get parts for a vehicle + part group, enriched with local stock + bodega indicator.

    1. Fetch one page of parts from the catalog (vehicle_parts + parts).
    2. Count, per part, the warehouse_inventory rows with stock > 0.
    3. Look up the tenant's own inventory via _get_local_stock_bulk().

    Args:
        master_conn: Connection to the master catalog DB.
        mye_id: model_year_engine id of the selected vehicle.
        group_id: Part group to list.
        tenant_conn: Tenant DB connection, passed to the stock lookup.
        branch_id: Branch whose local stock is reported.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size, clamped to the range 1..100.
        part_type: Optional exact ``name_part`` filter (3rd subcategory level).

    Returns:
        ``{'data': [...], 'pagination': {...}}``
    """
    # Clamp paging inputs: a page below 1 would yield a negative OFFSET, which
    # the database rejects, and a non-positive per_page is never meaningful.
    page = max(1, page)
    per_page = max(1, min(per_page, 100))
    offset = (page - 1) * per_page

    where_sql = "vp.model_year_engine_id = %s AND p.group_id = %s"
    where_params = [mye_id, group_id]
    if part_type:
        where_sql += " AND p.name_part = %s"
        where_params.append(part_type)

    cur = master_conn.cursor()
    try:
        # Total row count for the pagination block (same filter as the fetch).
        cur.execute("""
            SELECT COUNT(*)
            FROM vehicle_parts vp
            JOIN parts p ON p.id_part = vp.part_id
            WHERE """ + where_sql, where_params)
        total = cur.fetchone()[0]

        # Fetch the page. p.id_part is a tie-breaker: many parts share the
        # same name_part (all of them when part_type is set), and without a
        # total order LIMIT/OFFSET pages can repeat or skip rows.
        cur.execute("""
            SELECT p.id_part, p.oem_part_number, p.name_part, p.name_es,
                   p.description, p.description_es, p.image_url
            FROM vehicle_parts vp
            JOIN parts p ON p.id_part = vp.part_id
            WHERE """ + where_sql + """
            ORDER BY p.name_part, p.id_part
            LIMIT %s OFFSET %s
        """, where_params + [per_page, offset])
        rows = cur.fetchall()
        if not rows:
            return {'data': [], 'pagination': _pagination(page, per_page, total)}

        part_ids = [r[0] for r in rows]
        oem_numbers = [r[1] for r in rows]

        # Bodega availability: warehouse_inventory rows with stock > 0 per part.
        cur.execute("""
            SELECT part_id, COUNT(*) AS bodega_count
            FROM warehouse_inventory
            WHERE part_id = ANY(%s) AND stock_quantity > 0
            GROUP BY part_id
        """, (part_ids,))
        bodega_map = {r[0]: r[1] for r in cur.fetchall()}
    finally:
        # Release the cursor on every path, including errors.
        cur.close()

    # Local stock enrichment from the tenant DB.
    local_map = _get_local_stock_bulk(tenant_conn, branch_id, oem_numbers, part_ids)

    items = []
    for r in rows:
        part_id = r[0]
        oem = r[1]
        # Local stock is keyed by OEM number, with 'cat:<id>' as a fallback key.
        local = local_map.get(oem) or local_map.get(f'cat:{part_id}')
        # Prefer the local inventory image over the catalog image.
        image_url = (local.get('image_url') if local else None) or r[6]
        raw_name = r[3] or r[2]  # prefer Spanish name
        items.append({
            'id_part': part_id,
            'oem_part_number': oem,
            'name': translate_part_name(raw_name),
            'description': r[5] or r[4],
            'image_url': image_url,
            'local_stock': local['stock'] if local else 0,
            'local_price': local['price_1'] if local else None,
            'bodega_count': bodega_map.get(part_id, 0),
        })
    return {'data': items, 'pagination': _pagination(page, per_page, total)}
def get_parts_local(master_conn, mye_id, group_id, tenant_conn, branch_id,
                    page=1, per_page=30, part_type=None, oem_part_ids=None):
    """Local catalog mode: show aftermarket parts instead of OEM.

    Two filtering modes:
      A) `oem_part_ids` provided -> fetch aftermarket equivalents for that
         specific list of OEM IDs (Nexpart / Shop Supplies navigation).
         `mye_id`, `group_id` and `part_type` are ignored in this mode.
      B) `oem_part_ids` is None -> use (mye_id, group_id, optional part_type)
         to find OEM parts via the vehicle_parts join (TecDoc-style Local
         navigation).

    Flow:
      1. Select aftermarket rows joined to their OEM part and manufacturer.
      2. LEFT JOIN per-part warehouse stock aggregates (rows with stock > 0).
      3. Sort by priority tier, then in-stock first, then manufacturer name,
         aftermarket name and finally aftermarket id (stable tie-breaker).
      4. Paginate, then enrich with the tenant's local stock.

    Args:
        master_conn: Connection to the master catalog DB.
        mye_id: model_year_engine id (mode B only).
        group_id: Part group id (mode B only).
        tenant_conn: Tenant DB connection, passed to the stock lookup.
        branch_id: Branch whose local stock is reported.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size, clamped to the range 1..100.
        part_type: Optional exact ``name_part`` filter (mode B only).
        oem_part_ids: Optional explicit list of OEM part ids (mode A).

    Returns:
        ``{'data': [...], 'pagination': {...}, 'mode': 'local'}``. Each item
        carries manufacturer, priority_tier, in_stock_network and
        warehouse_price in addition to the fields get_parts() returns.

    NOTE(review): the total uses COUNT(*) while the page query uses SELECT
    DISTINCT; they agree only if the join produces no duplicate rows (e.g.
    vehicle_parts has no repeated (vehicle, part) pairs) — confirm.
    """
    from services.catalog_modes import (
        LOCAL_PRIORITY_MANUFACTURERS_TIER1,
        LOCAL_PRIORITY_MANUFACTURERS_TIER2,
    )

    # Clamp paging inputs: a page below 1 would yield a negative OFFSET, which
    # the database rejects, and a non-positive per_page is never meaningful.
    page = max(1, page)
    per_page = max(1, min(per_page, 100))
    offset = (page - 1) * per_page

    tier1 = list(LOCAL_PRIORITY_MANUFACTURERS_TIER1)
    tier2 = list(LOCAL_PRIORITY_MANUFACTURERS_TIER2)

    # ─── Build the FROM/WHERE for the OEM-side filter ───
    if oem_part_ids is not None:
        # Mode A: explicit OEM ID list.
        from_join = """
            FROM parts p
            JOIN aftermarket_parts ap ON ap.oem_part_id = p.id_part
            JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id
        """
        where_clause = "p.id_part = ANY(%s)"
        where_params = [oem_part_ids]
    else:
        # Mode B: vehicle + group filter.
        from_join = """
            FROM vehicle_parts vp
            JOIN parts p ON p.id_part = vp.part_id
            JOIN aftermarket_parts ap ON ap.oem_part_id = p.id_part
            JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id
        """
        where_clause = "vp.model_year_engine_id = %s AND p.group_id = %s"
        where_params = [mye_id, group_id]
        if part_type:
            where_clause += " AND p.name_part = %s"
            where_params.append(part_type)

    cur = master_conn.cursor()
    try:
        # Count total aftermarket rows for the pagination block.
        cur.execute(
            "SELECT COUNT(*) " + from_join + " WHERE " + where_clause,
            where_params,
        )
        total = cur.fetchone()[0]

        # Priority-sorted fetch — same WHERE as the COUNT, plus tiers + paging.
        # Placeholder order: WHERE params, tier1, tier2, LIMIT, OFFSET.
        fetch_params = where_params + [tier1, tier2, per_page, offset]
        cur.execute("""
            WITH aftermarket_for_vehicle AS (
                SELECT DISTINCT
                    ap.id_aftermarket_parts,
                    ap.oem_part_id,
                    ap.part_number,
                    COALESCE(ap.name_es, ap.name_aftermarket_parts) AS am_name,
                    ap.price_usd,
                    m.name_manufacture,
                    p.oem_part_number,
                    COALESCE(p.name_es, p.name_part) AS oem_name,
                    COALESCE(p.description_es, p.description) AS oem_desc,
                    p.image_url AS oem_image
                """ + from_join + """
                WHERE """ + where_clause + """
            ),
            stock_per_oem AS (
                SELECT part_id, COUNT(*) AS bodega_count, MIN(price) AS min_price, SUM(stock_quantity) AS total_stock
                FROM warehouse_inventory
                WHERE stock_quantity > 0
                GROUP BY part_id
            )
            SELECT afv.id_aftermarket_parts,
                   afv.oem_part_id,
                   afv.part_number,
                   afv.am_name,
                   afv.price_usd,
                   afv.name_manufacture,
                   afv.oem_part_number,
                   afv.oem_name,
                   afv.oem_desc,
                   afv.oem_image,
                   COALESCE(s.bodega_count, 0) AS bodega_count,
                   s.min_price AS warehouse_price,
                   COALESCE(s.total_stock, 0) AS warehouse_stock,
                   CASE
                       WHEN UPPER(afv.name_manufacture) = ANY(%s) THEN 1
                       WHEN UPPER(afv.name_manufacture) = ANY(%s) THEN 2
                       ELSE 3
                   END AS tier
            FROM aftermarket_for_vehicle afv
            LEFT JOIN stock_per_oem s ON s.part_id = afv.oem_part_id
            ORDER BY tier ASC,
                     (COALESCE(s.bodega_count, 0) > 0) DESC,
                     afv.name_manufacture ASC,
                     afv.am_name ASC,
                     afv.id_aftermarket_parts ASC
            LIMIT %s OFFSET %s
        """, fetch_params)
        rows = cur.fetchall()
    finally:
        # Release the cursor on every path, including errors.
        cur.close()

    if not rows:
        return {'data': [], 'pagination': _pagination(page, per_page, total), 'mode': 'local'}

    # Enrich with tenant local stock (looked up by OEM part number / OEM id).
    # Distinct names avoid shadowing the `oem_part_ids` parameter.
    oem_numbers = list({r[6] for r in rows if r[6]})
    result_oem_ids = list({r[1] for r in rows if r[1]})
    local_map = _get_local_stock_bulk(tenant_conn, branch_id, oem_numbers, result_oem_ids)

    items = []
    for r in rows:
        (aft_id, oem_part_id, aft_number, aft_name, price_usd, manufacturer,
         oem_number, oem_name, oem_desc, oem_image,
         bodega_count, warehouse_price, warehouse_stock, tier) = r[:14]

        # Tenant local stock (the refaccionaria's own inventory).
        local = local_map.get(oem_number) or local_map.get(f'cat:{oem_part_id}')
        image_url = (local.get('image_url') if local else None) or oem_image
        items.append({
            # Keep fields compatible with OEM mode output so the frontend
            # can render with minimal branching.
            'id_part': oem_part_id,        # OEM id used for detail drill-down
            'id_aftermarket': aft_id,      # aftermarket row id
            'oem_part_number': oem_number,
            'part_number': aft_number,     # aftermarket SKU
            'name': translate_part_name(aft_name or oem_name),
            'description': oem_desc,
            'image_url': image_url,
            'manufacturer': manufacturer,
            'priority_tier': tier,         # 1, 2, or 3
            'local_stock': local['stock'] if local else 0,
            'local_price': local['price_1'] if local else None,
            'bodega_count': bodega_count,
            'warehouse_stock': warehouse_stock,
            'warehouse_price': float(warehouse_price) if warehouse_price is not None else None,
            'in_stock_network': bodega_count > 0,
            'price_usd': float(price_usd) if price_usd is not None else None,
        })
    return {'data': items, 'pagination': _pagination(page, per_page, total), 'mode': 'local'}
def get_part_detail(master_conn, part_id, tenant_conn, branch_id):
    """Get full detail for a single part: catalog info, local stock, bodegas, alternatives.

    Args:
        master_conn: DB-API connection to the catalog database (``parts``,
            ``warehouse_inventory``, cross references).
        part_id: Catalog part id (``parts.id_part``).
        tenant_conn: DB-API connection to the tenant database, or None when
            there is no tenant context; local stock is then reported as
            None / 0 instead of raising.
        branch_id: Optional branch filter for the tenant inventory lookups;
            a falsy value means no branch filter.

    Returns:
        None when no row exists in ``parts`` for ``part_id``, otherwise:
        {
            part: {id_part, oem_part_number, name, description, image_url,
                   group_name, category_name},
            local: {inventory_id, stock, price_1, price_2, price_3, cost,
                    ...} | None,
            bodegas: [{business_name, price, stock, location}],
            alternatives: [{part_number, manufacturer, name, type,
                            local_stock, bodega_count}]
        }
    """
    # Part info with group + category names
    cur = master_conn.cursor()
    try:
        cur.execute("""
            SELECT p.id_part, p.oem_part_number, p.name_part, p.name_es,
                   p.description, p.description_es, p.image_url,
                   COALESCE(pg.name_es, pg.name_part_group) AS group_name,
                   COALESCE(pc.name_es, pc.name_part_category) AS category_name
            FROM parts p
            LEFT JOIN part_groups pg ON pg.id_part_group = p.group_id
            LEFT JOIN part_categories pc ON pc.id_part_category = pg.category_id
            WHERE p.id_part = %s
        """, (part_id,))
        row = cur.fetchone()
    finally:
        # Close even when the query raises so the cursor never leaks.
        cur.close()

    if not row:
        return None

    oem = row[1]
    part_info = {
        'id_part': row[0],
        'oem_part_number': oem,
        # Spanish columns win over the original-language ones when present.
        'name': translate_part_name(row[3] or row[2]),
        'description': row[5] or row[4],
        'image_url': row[6],
        'group_name': translate_category(row[7]) if row[7] else row[7],
        'category_name': translate_category(row[8]) if row[8] else row[8],
    }

    # Reuse the public helpers instead of duplicating their queries here.
    bodegas = get_bodega_availability(master_conn, part_id)
    alternatives = get_alternatives(master_conn, part_id)

    # Local stock: skipped without a tenant connection, mirroring the
    # None-safe behavior of _get_local_stock_bulk.
    local = None
    if tenant_conn is not None:
        local = _get_local_stock_single(tenant_conn, branch_id, oem, part_id)

    # Enrich alternatives with local stock + bodega count
    if alternatives:
        alt_numbers = [alt['part_number'] for alt in alternatives]
        alt_local = _get_local_stock_bulk(tenant_conn, branch_id, alt_numbers, [])

        oem_to_part = {}
        bodega_map = {}
        cur = master_conn.cursor()
        try:
            # Find part_ids for cross-ref numbers to check bodega stock
            cur.execute("""
                SELECT oem_part_number, id_part FROM parts
                WHERE oem_part_number = ANY(%s)
            """, (alt_numbers,))
            oem_to_part = {r[0]: r[1] for r in cur.fetchall()}

            alt_part_ids = [pid for pid in oem_to_part.values() if pid]
            if alt_part_ids:
                cur.execute("""
                    SELECT part_id, COUNT(*)
                    FROM warehouse_inventory
                    WHERE part_id = ANY(%s) AND stock_quantity > 0
                    GROUP BY part_id
                """, (alt_part_ids,))
                bodega_map = {r[0]: r[1] for r in cur.fetchall()}
        finally:
            cur.close()

        for alt in alternatives:
            stock_entry = alt_local.get(alt['part_number'])
            alt['local_stock'] = stock_entry['stock'] if stock_entry else 0
            pid = oem_to_part.get(alt['part_number'])
            alt['bodega_count'] = bodega_map.get(pid, 0) if pid else 0

    return {
        'part': part_info,
        'local': local,
        'bodegas': bodegas,
        'alternatives': alternatives,
    }
def _get_alternatives(cur, part_id):
    """Collect cross-references and aftermarket parts for an OEM part.

    Runs two queries on the supplied cursor (the caller owns and closes it).
    Cross-reference entries come first, aftermarket entries second; each
    query is capped at 50 rows. ``local_stock`` and ``bodega_count`` are
    initialised to 0 here and left for the caller to enrich.
    """
    # Cross-references (other OEM numbers that reference this part)
    cur.execute("""
        SELECT pcr.cross_reference_number, pcr.source_ref
        FROM part_cross_references pcr
        WHERE pcr.part_id = %s
        LIMIT 50
    """, (part_id,))
    cross_refs = [
        {
            'part_number': number,
            # Rows without a source are labelled generically.
            'manufacturer': source or 'OEM Cross-Ref',
            'name': None,
            'type': 'cross_reference',
            'local_stock': 0,
            'bodega_count': 0,
        }
        for number, source in cur.fetchall()
    ]

    # Aftermarket alternatives
    cur.execute("""
        SELECT ap.part_number, m.name_manufacture,
               COALESCE(ap.name_es, ap.name_aftermarket_parts) AS name
        FROM aftermarket_parts ap
        JOIN manufacturers m ON m.id_manufacture = ap.manufacturer_id
        WHERE ap.oem_part_id = %s
        LIMIT 50
    """, (part_id,))
    aftermarket = [
        {
            'part_number': number,
            'manufacturer': maker,
            'name': label,
            'type': 'aftermarket',
            'local_stock': 0,
            'bodega_count': 0,
        }
        for number, maker, label in cur.fetchall()
    ]

    return cross_refs + aftermarket
# ─────────────────────────────────────────────────────────────────────────────
# SMART SEARCH
# ─────────────────────────────────────────────────────────────────────────────
def _search_meili_fallback(master_conn, q, limit):
    """Search Meilisearch and hydrate the hits from PostgreSQL.

    Args:
        master_conn: DB-API connection to the catalog database, used to load
            the full ``parts`` rows for the ids Meilisearch returned.
        q: Search text.
        limit: Maximum number of hits requested from Meilisearch.

    Returns:
        List of tuples (id_part, oem_part_number, name_part, name_es,
        image_url, group_id) in Meilisearch ranking order, ``[]`` when there
        are no hits, or None when Meilisearch (or the hydration query) failed
        and the caller should fall back to its own search.
    """
    try:
        from services.meili_search import search_parts

        result = search_parts(q, limit=limit)
        if result is None:
            # Meilisearch error — signal fallback
            return None

        hits = result.get('hits')
        if not hits:
            return []

        part_ids = [h['id_part'] for h in hits]
        cur = master_conn.cursor()
        try:
            cur.execute("""
                SELECT p.id_part, p.oem_part_number, p.name_part, p.name_es,
                       p.image_url, p.group_id
                FROM parts p
                WHERE p.id_part = ANY(%s)
            """, (part_ids,))
            pg_rows = {r[0]: r for r in cur.fetchall()}
        finally:
            # Close the cursor even when the hydration query raises.
            cur.close()

        # Preserve Meilisearch ranking order; hits missing from PostgreSQL
        # are dropped.
        return [pg_rows[pid] for pid in part_ids if pid in pg_rows]
    except Exception:
        # Best-effort by design: any failure signals fallback, but it is
        # logged so an outage is visible instead of silently swallowed.
        # NOTE(review): if the failure came from master_conn itself, the
        # caller's fallback query runs on the same connection — confirm the
        # connection is still usable in that case.
        import logging

        logging.getLogger(__name__).warning(
            "Meilisearch search failed; falling back to PostgreSQL",
            exc_info=True,
        )
        return None
def smart_search(master_conn, q, tenant_conn, branch_id, limit=50):
    """Search parts by part number or text. Enriches with local stock.

    Strategy:
        1. Try Meilisearch first (sub-100ms full-text + typo tolerance)
        2. Fallback to PostgreSQL tsvector / ILIKE if Meilisearch is down
        3. Always enriches results with local stock from tenant DB

    Args:
        master_conn: DB-API connection to the catalog database.
        q: Raw search text; surrounding whitespace is stripped.
        tenant_conn: Tenant DB connection passed to the local-stock lookup.
        branch_id: Optional branch filter for the local-stock lookup.
        limit: Maximum number of results, capped at 100.

    Returns:
        List of dicts with keys id_part, oem_part_number, name, image_url,
        local_stock, local_price and vehicle_info. Empty list when the query
        is shorter than 2 characters or nothing matches.
    """
    q = q.strip()
    if len(q) < 2:
        return []
    limit = min(limit, 100)

    # ── Attempt Meilisearch first ───────────────────────────────────────────
    # None means Meilisearch is unavailable and PostgreSQL must be used.
    rows = _search_meili_fallback(master_conn, q, limit)

    cur = master_conn.cursor()
    try:
        if rows is None:
            # PostgreSQL fallback
            is_part_number = bool(re.search(r'\d.*[-/]|[-/].*\d|\d{3,}', q))
            if is_part_number:
                clean_q = q.replace(' ', '').upper()
                cur.execute("""
                    SELECT p.id_part, p.oem_part_number, p.name_part, p.name_es,
                           p.image_url, p.group_id
                    FROM parts p
                    WHERE REPLACE(UPPER(p.oem_part_number), ' ', '') LIKE %s
                    ORDER BY p.oem_part_number
                    LIMIT %s
                """, (f'%{clean_q}%', limit))
            else:
                # plainto_tsquery ANDs the words of free text and ignores
                # punctuation; to_tsquery raises a syntax error on raw user
                # input such as "filtro (aceite)" or "o'ring".
                like = f'%{q}%'
                cur.execute("""
                    SELECT p.id_part, p.oem_part_number, p.name_part, p.name_es,
                           p.image_url, p.group_id
                    FROM parts p
                    WHERE p.search_vector @@ plainto_tsquery('spanish', %s)
                       OR p.name_part ILIKE %s
                       OR p.name_es ILIKE %s
                    ORDER BY
                        CASE WHEN p.search_vector @@ plainto_tsquery('spanish', %s)
                             THEN 0 ELSE 1 END,
                        p.name_part
                    LIMIT %s
                """, (q, like, like, q, limit))
            rows = cur.fetchall()

        if not rows:
            return []

        part_ids = [r[0] for r in rows]
        oem_numbers = [r[1] for r in rows]

        # Get vehicle info for each part (first match only: newest year).
        # NOTE(review): the module docstring requires vehicle_parts queries
        # to filter by model_year_engine_id and use LIMIT; this one filters
        # only by part_id — confirm an index on vehicle_parts(part_id).
        cur.execute("""
            SELECT DISTINCT ON (vp.part_id)
                   vp.part_id, b.name_brand, m.name_model, y.year_car
            FROM vehicle_parts vp
            JOIN model_year_engine mye ON mye.id_mye = vp.model_year_engine_id
            JOIN models m ON m.id_model = mye.model_id
            JOIN brands b ON b.id_brand = m.brand_id
            JOIN years y ON y.id_year = mye.year_id
            WHERE vp.part_id = ANY(%s)
            ORDER BY vp.part_id, y.year_car DESC
        """, (part_ids,))
        vehicle_info_map = {
            r[0]: f"{r[1]} {r[2]} {r[3]}" for r in cur.fetchall()
        }
    finally:
        # Close the cursor on every exit path, including query errors.
        cur.close()

    # Local stock enrichment
    local_map = _get_local_stock_bulk(tenant_conn, branch_id, oem_numbers, part_ids)

    results = []
    for r in rows:
        part_id = r[0]
        oem = r[1]
        # Prefer a match by OEM number, then by catalog part id.
        local = local_map.get(oem) or local_map.get(f'cat:{part_id}')
        results.append({
            'id_part': part_id,
            'oem_part_number': oem,
            'name': translate_part_name(r[3] or r[2]),
            'image_url': r[4],
            'local_stock': local['stock'] if local else 0,
            'local_price': local['price_1'] if local else None,
            'vehicle_info': vehicle_info_map.get(part_id, ''),
        })
    return results
# ─────────────────────────────────────────────────────────────────────────────
# LOCAL STOCK HELPERS
# ─────────────────────────────────────────────────────────────────────────────
def _get_local_stock_bulk(tenant_conn, branch_id, oem_numbers, catalog_part_ids):
    """Look up tenant inventory for a batch of OEM numbers / catalog part IDs.

    Matches active inventory rows by ``part_number = ANY(oem_numbers)`` OR
    ``catalog_part_id = ANY(catalog_part_ids)``; stock is the sum of the
    row's ``inventory_operations.quantity`` values.

    Public-catalog-safe: when tenant_conn is None (public browsing, no tenant
    context) returns an empty dict so the parts list still renders without
    local stock/price enrichment.

    Args:
        tenant_conn: Tenant DB connection, or None.
        branch_id: Optional branch filter; a falsy value means no filter.
        oem_numbers: List of part numbers to match (may be empty).
        catalog_part_ids: List of catalog part ids to match (may be empty).

    Returns:
        dict keyed by part number and by ``'cat:{catalog_part_id}'``; both
        keys of a row point at the same entry dict
        {inventory_id, part_number, catalog_part_id, price_1, price_2,
        price_3, cost, tax_rate, stock, image_url}. When several rows share
        a key, the last row returned wins.
    """
    if tenant_conn is None:
        return {}
    if not oem_numbers and not catalog_part_ids:
        return {}

    conditions = []
    params = []
    if oem_numbers:
        conditions.append("i.part_number = ANY(%s)")
        params.append(oem_numbers)
    if catalog_part_ids:
        conditions.append("i.catalog_part_id = ANY(%s)")
        params.append(catalog_part_ids)
    where = " OR ".join(conditions)

    branch_filter = ""
    if branch_id:
        branch_filter = " AND i.branch_id = %s"
        params.append(branch_id)

    # Only fixed SQL fragments are interpolated; all values are bound params.
    cur = tenant_conn.cursor()
    try:
        cur.execute(f"""
            SELECT i.id, i.part_number, i.catalog_part_id,
                   i.price_1, i.price_2, i.price_3, i.cost, i.tax_rate,
                   COALESCE(SUM(io.quantity), 0) AS stock,
                   i.image_url
            FROM inventory i
            LEFT JOIN inventory_operations io ON io.inventory_id = i.id
            WHERE ({where}) AND i.is_active = true{branch_filter}
            GROUP BY i.id
        """, params)
        rows = cur.fetchall()
    finally:
        # Close the cursor even when the query raises.
        cur.close()

    result = {}
    for r in rows:
        entry = {
            'inventory_id': r[0],
            'part_number': r[1],
            'catalog_part_id': r[2],
            'price_1': float(r[3]) if r[3] else 0,
            'price_2': float(r[4]) if r[4] else 0,
            'price_3': float(r[5]) if r[5] else 0,
            'cost': float(r[6]) if r[6] else 0,
            # Only a missing rate defaults to 16%; a stored rate of 0 is
            # preserved instead of being overwritten.
            'tax_rate': float(r[7]) if r[7] is not None else 0.16,
            'stock': r[8],
            'image_url': r[9],
        }
        if r[1]:
            result[r[1]] = entry
        if r[2]:
            result[f'cat:{r[2]}'] = entry
    return result
def _get_local_stock_single(tenant_conn, branch_id, oem_part_number, catalog_part_id):
    """Look up a single part in the tenant inventory.

    Matches one active inventory row by ``part_number = oem_part_number`` OR
    ``catalog_part_id = catalog_part_id``; stock is the sum of the row's
    ``inventory_operations.quantity`` values.

    Args:
        tenant_conn: Tenant DB connection, or None when there is no tenant
            context (returns None, matching the bulk lookup's None-safety).
        branch_id: Optional branch filter; a falsy value means no filter.
        oem_part_number: Part number to match.
        catalog_part_id: Catalog part id to match.

    Returns:
        dict {inventory_id, price_1, price_2, price_3, cost, tax_rate,
        location, unit, barcode, stock, image_url}, or None when nothing
        matches. With several matching rows the query keeps one of them
        (LIMIT 1 without ORDER BY).
    """
    if tenant_conn is None:
        return None

    branch_filter = ""
    params = [oem_part_number, catalog_part_id]
    if branch_id:
        branch_filter = " AND i.branch_id = %s"
        params.append(branch_id)

    # Only a fixed SQL fragment is interpolated; all values are bound params.
    cur = tenant_conn.cursor()
    try:
        cur.execute(f"""
            SELECT i.id, i.price_1, i.price_2, i.price_3, i.cost, i.tax_rate,
                   i.location, i.unit, i.barcode,
                   COALESCE(SUM(io.quantity), 0) AS stock,
                   i.image_url
            FROM inventory i
            LEFT JOIN inventory_operations io ON io.inventory_id = i.id
            WHERE (i.part_number = %s OR i.catalog_part_id = %s)
              AND i.is_active = true{branch_filter}
            GROUP BY i.id
            LIMIT 1
        """, params)
        row = cur.fetchone()
    finally:
        # Close the cursor even when the query raises.
        cur.close()

    if not row:
        return None
    return {
        'inventory_id': row[0],
        'price_1': float(row[1]) if row[1] else 0,
        'price_2': float(row[2]) if row[2] else 0,
        'price_3': float(row[3]) if row[3] else 0,
        'cost': float(row[4]) if row[4] else 0,
        # Only a missing rate defaults to 16%; a stored rate of 0 is kept.
        'tax_rate': float(row[5]) if row[5] is not None else 0.16,
        'location': row[6],
        'unit': row[7] or 'PZA',
        'barcode': row[8],
        'stock': row[9],
        'image_url': row[10],
    }
# ─────────────────────────────────────────────────────────────────────────────
# PUBLIC WRAPPERS (for direct use by callers)
# ─────────────────────────────────────────────────────────────────────────────
def get_local_stock(tenant_conn, oem_part_number, catalog_part_id, branch_id=None):
    """Public entry point for a single-part tenant inventory lookup.

    Delegates to ``_get_local_stock_single``, which takes ``branch_id`` as
    its second positional argument; this wrapper only reorders the arguments
    and returns that helper's result unchanged.
    """
    lookup_args = (tenant_conn, branch_id, oem_part_number, catalog_part_id)
    return _get_local_stock_single(*lookup_args)
def get_bodega_availability(master_conn, part_id):
    """Check warehouse_inventory for a part and list bodegas that have stock.

    Args:
        master_conn: DB-API connection to the catalog database.
        part_id: Catalog part id matched against ``warehouse_inventory.part_id``.

    Returns:
        Up to 20 dicts {business_name, price, stock, location}, cheapest
        first, covering only rows with ``stock_quantity > 0``. ``price`` is a
        float, or None when the stored price is NULL.
    """
    cur = master_conn.cursor()
    try:
        cur.execute("""
            SELECT u.business_name, wi.price, wi.stock_quantity, wi.warehouse_location
            FROM warehouse_inventory wi
            JOIN users u ON u.id_user = wi.user_id
            WHERE wi.part_id = %s AND wi.stock_quantity > 0
            ORDER BY wi.price ASC
            LIMIT 20
        """, (part_id,))
        rows = cur.fetchall()
    finally:
        # Close the cursor even when the query raises.
        cur.close()

    return [
        {
            'business_name': name,
            # Test against None explicitly so a stored price of 0 is
            # reported as 0.0 rather than as "no price".
            'price': float(price) if price is not None else None,
            'stock': stock,
            'location': location,
        }
        for name, price, stock, location in rows
    ]
def get_alternatives(master_conn, part_id):
    """Public wrapper: get cross-references + aftermarket parts for a given OEM part.

    Opens a cursor on ``master_conn``, delegates to ``_get_alternatives`` and
    returns its list unchanged. The cursor is closed even when the lookup
    raises.
    """
    cur = master_conn.cursor()
    try:
        return _get_alternatives(cur, part_id)
    finally:
        cur.close()
# ─────────────────────────────────────────────────────────────────────────────
# HELPERS
# ─────────────────────────────────────────────────────────────────────────────
def _pagination(page, per_page, total):
    """Assemble the pagination envelope returned alongside list results.

    ``total_pages`` is ``total`` divided by ``per_page`` rounded up (for a
    positive ``per_page``) and never less than 1, so an empty result set
    still reports a single page.
    """
    page_count = (total + per_page - 1) // per_page
    return dict(
        page=page,
        per_page=per_page,
        total=total,
        total_pages=max(1, page_count),
    )