Files
Autoparts-DB/pos/services/inventory_vehicle_compat.py
consultoria-as 371d72887e refactor: centralize QWEN fitment saving via save_qwen_fitment()
- Added save_qwen_fitment() in inventory_vehicle_compat.py to centralize
  inserting QWEN results into inventory_vehicle_compat
- Simplified inventory_bp.py create_item() and auto_match_item_vehicles()
  to use the centralized function, removing duplicated INSERT logic
2026-05-01 07:03:04 +00:00

406 lines
14 KiB
Python

"""Inventory Vehicle Compatibility Engine.
Links local inventory items to model_year_engine_ids so they appear
when browsing the Local catalog by vehicle.
Features:
- Auto-match by part_number against parts/aftermarket_parts/cross_refs
- Manual add/remove MYE compatibility
- Batch operations
"""
from typing import List, Dict, Optional
def get_compat_source(tenant_id):
"""Return the configured compatibility source: 'tecdoc', 'qwen', or 'both'.
Reads from tenant_config table. Defaults to 'both'.
"""
from tenant_db import get_tenant_conn
try:
conn = get_tenant_conn(tenant_id)
cur = conn.cursor()
cur.execute(
"SELECT value FROM tenant_config WHERE key = 'vehicle_compat_source'"
)
row = cur.fetchone()
cur.close()
conn.close()
source = row[0] if row else 'both'
if source in ('tecdoc', 'qwen', 'both'):
return source
except Exception:
pass
return 'both'
def auto_match_vehicle_compatibility(master_conn, tenant_conn, inventory_id, part_number,
brand=None, name=None):
"""Find vehicle compatibility for an inventory item by part_number.
Searches (in order of precision):
1. parts.oem_part_number exact normalized
2. aftermarket_parts.part_number exact normalized
3. part_cross_references.cross_reference_number exact normalized
4. parts.oem_part_number partial (ILIKE) — strips separators too
5. aftermarket_parts.part_number partial (ILIKE)
6. part_cross_references.cross_reference_number partial (ILIKE)
7. parts.name_part / parts.name_es by inventory item name (ILIKE)
8. parts.name_part / parts.name_es by inventory item name (tsvector)
If a brand hint is provided, results are preferentially filtered to
vehicles of that brand.
Returns:
dict: {'matched': bool, 'matches': [...], 'myes': [...]}
"""
cur = master_conn.cursor()
oem_ids = set()
# Normalization helpers
clean_pn = (part_number or '').replace(' ', '').upper()
no_seps = clean_pn.replace('-', '').replace('/', '').replace('.', '').replace('_', '')
name_q = (name or '').strip()
brand_hint = (brand or '').strip().upper()
# ── 1. Exact normalized ──────────────────────────────────────────────
if clean_pn:
cur.execute("""
SELECT id_part FROM parts
WHERE REPLACE(UPPER(oem_part_number), ' ', '') = %s
""", (clean_pn,))
for r in cur.fetchall():
oem_ids.add(r[0])
if clean_pn and not oem_ids:
cur.execute("""
SELECT DISTINCT oem_part_id FROM aftermarket_parts
WHERE REPLACE(UPPER(part_number), ' ', '') = %s
""", (clean_pn,))
for r in cur.fetchall():
if r[0]:
oem_ids.add(r[0])
if clean_pn and not oem_ids:
cur.execute("""
SELECT DISTINCT part_id FROM part_cross_references
WHERE REPLACE(UPPER(cross_reference_number), ' ', '') = %s
""", (clean_pn,))
for r in cur.fetchall():
if r[0]:
oem_ids.add(r[0])
# ── 2. Partial / separator-stripped ──────────────────────────────────
if clean_pn and len(clean_pn) >= 3 and len(oem_ids) < 10:
# OEM part number partial
cur.execute("""
SELECT id_part FROM parts
WHERE REPLACE(UPPER(oem_part_number), ' ', '') LIKE %s
AND id_part NOT IN (SELECT unnest(%s::int[]))
LIMIT 20
""", (f'%{clean_pn}%', list(oem_ids) or [0]))
for r in cur.fetchall():
oem_ids.add(r[0])
# Aftermarket partial
if len(oem_ids) < 10:
cur.execute("""
SELECT DISTINCT oem_part_id FROM aftermarket_parts
WHERE REPLACE(UPPER(part_number), ' ', '') LIKE %s
AND oem_part_id NOT IN (SELECT unnest(%s::int[]))
LIMIT 20
""", (f'%{clean_pn}%', list(oem_ids) or [0]))
for r in cur.fetchall():
if r[0]:
oem_ids.add(r[0])
# Cross-reference partial
if len(oem_ids) < 10:
cur.execute("""
SELECT DISTINCT part_id FROM part_cross_references
WHERE REPLACE(UPPER(cross_reference_number), ' ', '') LIKE %s
AND part_id NOT IN (SELECT unnest(%s::int[]))
LIMIT 20
""", (f'%{clean_pn}%', list(oem_ids) or [0]))
for r in cur.fetchall():
if r[0]:
oem_ids.add(r[0])
# Separator-stripped fallback (e.g. KYB-343412 → KYB343412)
if len(no_seps) >= 4 and len(oem_ids) < 10:
cur.execute("""
SELECT id_part FROM parts
WHERE REPLACE(REPLACE(REPLACE(REPLACE(UPPER(oem_part_number), ' ', ''), '-', ''), '/', ''), '.', '') = %s
AND id_part NOT IN (SELECT unnest(%s::int[]))
LIMIT 10
""", (no_seps, list(oem_ids) or [0]))
for r in cur.fetchall():
oem_ids.add(r[0])
# ── 3. Search by item name (when part_number yields nothing) ─────────
if not oem_ids and name_q and len(name_q) >= 3:
# Name-based ILIKE search on parts
cur.execute("""
SELECT id_part FROM parts
WHERE UPPER(name_part) LIKE %s OR UPPER(COALESCE(name_es, '')) LIKE %s
LIMIT 20
""", (f'%{name_q.upper()}%', f'%{name_q.upper()}%'))
for r in cur.fetchall():
oem_ids.add(r[0])
# Name-based ILIKE search on aftermarket_parts
if len(oem_ids) < 20:
cur.execute("""
SELECT DISTINCT oem_part_id FROM aftermarket_parts
WHERE UPPER(COALESCE(name_aftermarket_parts, '')) LIKE %s
OR UPPER(COALESCE(name_es, '')) LIKE %s
LIMIT 20
""", (f'%{name_q.upper()}%', f'%{name_q.upper()}%'))
for r in cur.fetchall():
if r[0]:
oem_ids.add(r[0])
if not oem_ids:
cur.close()
return {'matched': False, 'matches': [], 'myes': []}
# ── Resolve MYEs ─────────────────────────────────────────────────────
oem_ids = list(oem_ids)
if brand_hint and brand_hint != 'GENERAL':
# Brand-aware: only MYEs for vehicles of the hinted brand
cur.execute("""
SELECT DISTINCT vp.model_year_engine_id
FROM vehicle_parts vp
JOIN model_year_engine mye ON mye.id_mye = vp.model_year_engine_id
JOIN models m ON m.id_model = mye.model_id
JOIN brands b ON b.id_brand = m.brand_id
WHERE vp.part_id = ANY(%s)
AND b.name_brand = %s
LIMIT 200
""", (oem_ids, brand_hint))
else:
# No brand hint — return all MYEs for these parts
cur.execute("""
SELECT DISTINCT model_year_engine_id
FROM vehicle_parts
WHERE part_id = ANY(%s)
LIMIT 200
""", (oem_ids,))
mye_ids = [r[0] for r in cur.fetchall()]
cur.close()
# ── Insert into tenant table ─────────────────────────────────────────
inserted = 0
cur2 = tenant_conn.cursor()
for mye_id in mye_ids:
cur2.execute("""
INSERT INTO inventory_vehicle_compat
(inventory_id, model_year_engine_id, source, confidence)
VALUES (%s, %s, 'auto_match', 1.0)
ON CONFLICT (inventory_id, model_year_engine_id) DO NOTHING
""", (inventory_id, mye_id))
if cur2.rowcount > 0:
inserted += 1
tenant_conn.commit()
cur2.close()
return {
'matched': len(mye_ids) > 0,
'matches': oem_ids,
'myes': mye_ids,
'inserted': inserted,
}
def add_compatibility(tenant_conn, inventory_id, model_year_engine_id, source='manual'):
"""Manually add a vehicle compatibility."""
cur = tenant_conn.cursor()
cur.execute("""
INSERT INTO inventory_vehicle_compat
(inventory_id, model_year_engine_id, source, confidence)
VALUES (%s, %s, %s, 1.0)
ON CONFLICT (inventory_id, model_year_engine_id) DO NOTHING
RETURNING id
""", (inventory_id, model_year_engine_id, source))
row = cur.fetchone()
tenant_conn.commit()
cur.close()
return row[0] if row else None
def remove_compatibility(tenant_conn, inventory_id, model_year_engine_id):
cur = tenant_conn.cursor()
cur.execute("""
DELETE FROM inventory_vehicle_compat
WHERE inventory_id = %s AND model_year_engine_id = %s
""", (inventory_id, model_year_engine_id))
deleted = cur.rowcount
tenant_conn.commit()
cur.close()
return deleted
def remove_all_compatibility(tenant_conn, inventory_id):
cur = tenant_conn.cursor()
cur.execute("""
DELETE FROM inventory_vehicle_compat WHERE inventory_id = %s
""", (inventory_id,))
deleted = cur.rowcount
tenant_conn.commit()
cur.close()
return deleted
def get_compatibility(tenant_conn, master_conn, inventory_id):
"""Get all MYE compatibilities for an inventory item.
Queries inventory_vehicle_compat from the tenant DB, then resolves
vehicle details (brand/model/year/engine) from the master DB.
"""
# 1. Get MYE IDs + metadata from tenant
cur_t = tenant_conn.cursor()
cur_t.execute("""
SELECT model_year_engine_id, source, confidence, created_at
FROM inventory_vehicle_compat
WHERE inventory_id = %s
ORDER BY model_year_engine_id
""", (inventory_id,))
rows = cur_t.fetchall()
cur_t.close()
if not rows:
return []
mye_ids = [r[0] for r in rows]
# 2. Resolve vehicle details from master DB
cur_m = master_conn.cursor()
cur_m.execute("""
SELECT mye.id_mye, b.name_brand, m.name_model, y.year_car, e.name_engine
FROM model_year_engine mye
JOIN models m ON m.id_model = mye.model_id
JOIN brands b ON b.id_brand = m.brand_id
JOIN years y ON y.id_year = mye.year_id
JOIN engines e ON e.id_engine = mye.engine_id
WHERE mye.id_mye = ANY(%s)
ORDER BY b.name_brand, m.name_model, y.year_car
""", (mye_ids,))
details = {r[0]: r for r in cur_m.fetchall()}
cur_m.close()
# 3. Merge
result = []
for mye_id, source, confidence, created_at in rows:
d = details.get(mye_id)
if d:
result.append({
'model_year_engine_id': mye_id,
'brand': d[1],
'model': d[2],
'year': d[3],
'engine': d[4],
'source': source,
'confidence': float(confidence),
'created_at': str(created_at),
})
return result
def search_mye(master_conn, brand_id=None, model_id=None, year_id=None, engine_id=None):
"""Search MYE IDs by vehicle criteria."""
cur = master_conn.cursor()
clauses = []
params = []
if brand_id:
clauses.append("mye.model_id IN (SELECT id_model FROM models WHERE brand_id = %s)")
params.append(brand_id)
if model_id:
clauses.append("mye.model_id = %s")
params.append(model_id)
if year_id:
clauses.append("mye.year_id = %s")
params.append(year_id)
if engine_id:
clauses.append("mye.engine_id = %s")
params.append(engine_id)
where = " AND ".join(clauses) if clauses else "TRUE"
cur.execute(f"""
SELECT mye.id_mye, b.name_brand, m.name_model, y.year_car, e.name_engine
FROM model_year_engine mye
JOIN models m ON m.id_model = mye.model_id
JOIN brands b ON b.id_brand = m.brand_id
JOIN years y ON y.id_year = mye.year_id
JOIN engines e ON e.id_engine = mye.engine_id
WHERE {where}
ORDER BY b.name_brand, m.name_model, y.year_car
LIMIT 100
""", tuple(params))
rows = cur.fetchall()
cur.close()
return [
{
'id_mye': r[0],
'brand': r[1],
'model': r[2],
'year': r[3],
'engine': r[4],
}
for r in rows
]
def batch_add_compatibilities(tenant_conn, inventory_id, mye_ids, source='manual'):
"""Add multiple MYE compatibilities at once."""
cur = tenant_conn.cursor()
inserted = 0
for mye_id in mye_ids:
cur.execute("""
INSERT INTO inventory_vehicle_compat
(inventory_id, model_year_engine_id, source, confidence)
VALUES (%s, %s, %s, 1.0)
ON CONFLICT (inventory_id, model_year_engine_id) DO NOTHING
""", (inventory_id, mye_id, source))
if cur.rowcount > 0:
inserted += 1
tenant_conn.commit()
cur.close()
return inserted
def save_qwen_fitment(tenant_conn, inventory_id, fitment_result):
"""Save QWEN fitment results into inventory_vehicle_compat.
Args:
tenant_conn: Connection to tenant DB.
inventory_id: The inventory item ID.
fitment_result: Dict from get_vehicle_fitment() with 'vehicles' list.
Returns:
int: Number of compatibilities inserted.
"""
vehicles = fitment_result.get('vehicles', [])
if not vehicles:
return 0
inserted = 0
cur = tenant_conn.cursor()
for v in vehicles:
mye_id = v.get('mye_id')
if not mye_id:
continue
cur.execute("""
INSERT INTO inventory_vehicle_compat
(inventory_id, model_year_engine_id, source, confidence, created_at)
VALUES (%s, %s, 'qwen_ai', %s, NOW())
ON CONFLICT (inventory_id, model_year_engine_id) DO NOTHING
""", (inventory_id, mye_id, fitment_result.get('confidence', 0)))
if cur.rowcount > 0:
inserted += 1
tenant_conn.commit()
cur.close()
return inserted