Cambios implementados: 1. Connection pooling (tenant_db.py): - psycopg2.pool.ThreadedConnectionPool para master y tenants - Wrapper _PooledConnection que devuelve al pool en .close() - Cero cambios en blueprints (backward compatible) 2. Tabla inventory_stock_summary + triggers (v3.2): - O(1) stock lookup en vez de SUM() sobre historial completo - Trigger AFTER INSERT en inventory_operations recalcula stock - Poblada inicialmente en ambos tenants - Refactor en 6 archivos de servicios para usar la nueva tabla 3. Fix N+1 en process_sale (pos_engine.py): - Precarga retail_price en bulk query FOR UPDATE - Elimina SELECT individual por item en loop 4. Índices críticos: - idx_parts_name_part + pattern_ops (master) - idx_inv_ops_inventory_branch_created (tenants) - idx_wi_part_stock_positive (master, ya existía desde Fase 1) Tests: 73/73 pasando (compat + fase3 + fase5 + fase6) Migración: v3.2_db_performance.sql
246 lines
7.7 KiB
Python
246 lines
7.7 KiB
Python
"""Inventory Vehicle Compatibility Engine.

Links local inventory items to model_year_engine_ids so they appear
when browsing the Local catalog by vehicle.

Features:
- Auto-match by part_number against parts/aftermarket_parts/cross_refs
- Manual add/remove MYE compatibility
- Batch operations
"""
|
|
|
|
from typing import List, Dict, Optional
|
|
|
|
|
|
def _lookup_oem_part_ids(cur, clean_pn):
    """Resolve a normalized part number to master part IDs.

    Tries the OEM catalog, then aftermarket parts, then cross references,
    and stops at the first source that yields at least one ID.

    Args:
        cur: Open cursor on the master database.
        clean_pn: Part number already upper-cased with spaces removed.

    Returns:
        list: Part IDs from the first source that matched (possibly empty).
    """
    # 1. Direct OEM match
    cur.execute("""
        SELECT id_part FROM parts
        WHERE REPLACE(UPPER(oem_part_number), ' ', '') = %s
    """, (clean_pn,))
    oem_ids = [r[0] for r in cur.fetchall()]

    # 2. Aftermarket match -> get oem_part_id
    if not oem_ids:
        cur.execute("""
            SELECT DISTINCT oem_part_id FROM aftermarket_parts
            WHERE REPLACE(UPPER(part_number), ' ', '') = %s
        """, (clean_pn,))
        # Falsy IDs (e.g. NULL oem_part_id) cannot be joined to vehicles.
        oem_ids = [r[0] for r in cur.fetchall() if r[0]]

    # 3. Cross-reference match
    if not oem_ids:
        cur.execute("""
            SELECT DISTINCT part_id FROM part_cross_references
            WHERE REPLACE(UPPER(cross_ref_number), ' ', '') = %s
        """, (clean_pn,))
        oem_ids = [r[0] for r in cur.fetchall() if r[0]]

    return oem_ids


def auto_match_vehicle_compatibility(master_conn, tenant_conn, inventory_id, part_number,
                                     brand=None, name=None) -> Dict:
    """Find vehicle compatibility for an inventory item by part_number.

    Searches, in order, and stops at the first source with a hit:
        1. parts.oem_part_number (exact, case-insensitive, spaces stripped)
        2. aftermarket_parts.part_number
        3. part_cross_references.cross_ref_number

    Every model_year_engine_id linked to the matched parts in vehicle_parts
    is inserted into inventory_vehicle_compat with source 'auto_match'.
    Existing (inventory_id, model_year_engine_id) pairs are left alone, and
    the tenant connection is committed.

    Args:
        master_conn: Connection to the master catalog database.
        tenant_conn: Connection to the tenant database (written to).
        inventory_id: ID of the local inventory item.
        part_number: Part number to look up. A blank or None value never
            matches and no query is issued.
        brand: Unused; kept for interface compatibility.
        name: Unused; kept for interface compatibility.

    Returns:
        dict: {'matched': bool, 'matches': [...], 'myes': [...],
        'inserted': int}, where 'inserted' counts the rows actually added.
    """
    clean_pn = part_number.replace(' ', '').upper() if part_number else ''

    # A blank part number would otherwise compare equal to catalog rows that
    # have an empty part number and link unrelated vehicles to the item.
    if not clean_pn:
        return {'matched': False, 'matches': [], 'myes': [], 'inserted': 0}

    cur = master_conn.cursor()
    try:
        oem_ids = _lookup_oem_part_ids(cur, clean_pn)
        if not oem_ids:
            return {'matched': False, 'matches': [], 'myes': [], 'inserted': 0}

        # Get MYEs for these part IDs
        cur.execute("""
            SELECT DISTINCT model_year_engine_id
            FROM vehicle_parts
            WHERE part_id = ANY(%s)
        """, (oem_ids,))
        mye_ids = [r[0] for r in cur.fetchall()]
    finally:
        # Close the cursor even when a query raises.
        cur.close()

    # Insert into tenant table
    inserted = 0
    cur2 = tenant_conn.cursor()
    try:
        for mye_id in mye_ids:
            cur2.execute("""
                INSERT INTO inventory_vehicle_compat
                    (inventory_id, model_year_engine_id, source, confidence)
                VALUES (%s, %s, 'auto_match', 1.0)
                ON CONFLICT (inventory_id, model_year_engine_id) DO NOTHING
            """, (inventory_id, mye_id))
            # rowcount is 0 when the conflict clause skipped the row.
            if cur2.rowcount > 0:
                inserted += 1
        tenant_conn.commit()
    finally:
        cur2.close()

    return {
        'matched': True,
        'matches': oem_ids,
        'myes': mye_ids,
        'inserted': inserted,
    }
|
|
|
|
|
|
def add_compatibility(tenant_conn, inventory_id, model_year_engine_id, source='manual'):
    """Manually add a vehicle compatibility.

    Inserts one row into inventory_vehicle_compat with confidence 1.0 and
    commits the tenant connection.

    Args:
        tenant_conn: Connection to the tenant database.
        inventory_id: ID of the local inventory item.
        model_year_engine_id: Vehicle (MYE) ID to link to the item.
        source: Value stored in the ``source`` column.

    Returns:
        The ``id`` of the new row, or None when the pair already existed
        (ON CONFLICT DO NOTHING returns no row).
    """
    cur = tenant_conn.cursor()
    try:
        cur.execute("""
            INSERT INTO inventory_vehicle_compat
                (inventory_id, model_year_engine_id, source, confidence)
            VALUES (%s, %s, %s, 1.0)
            ON CONFLICT (inventory_id, model_year_engine_id) DO NOTHING
            RETURNING id
        """, (inventory_id, model_year_engine_id, source))
        row = cur.fetchone()
        tenant_conn.commit()
    finally:
        # Close the cursor even when the insert or commit raises.
        cur.close()
    return row[0] if row else None
|
|
|
|
|
|
def remove_compatibility(tenant_conn, inventory_id, model_year_engine_id) -> int:
    """Remove one vehicle compatibility link from an inventory item.

    Deletes the matching inventory_vehicle_compat row and commits the
    tenant connection.

    Args:
        tenant_conn: Connection to the tenant database.
        inventory_id: ID of the local inventory item.
        model_year_engine_id: Vehicle (MYE) ID to unlink.

    Returns:
        The cursor's rowcount after the DELETE (number of rows removed).
    """
    cur = tenant_conn.cursor()
    try:
        cur.execute("""
            DELETE FROM inventory_vehicle_compat
            WHERE inventory_id = %s AND model_year_engine_id = %s
        """, (inventory_id, model_year_engine_id))
        deleted = cur.rowcount
        tenant_conn.commit()
    finally:
        # Close the cursor even when the delete or commit raises.
        cur.close()
    return deleted
|
|
|
|
|
|
def remove_all_compatibility(tenant_conn, inventory_id) -> int:
    """Remove every vehicle compatibility link of an inventory item.

    Deletes all inventory_vehicle_compat rows for the item and commits the
    tenant connection.

    Args:
        tenant_conn: Connection to the tenant database.
        inventory_id: ID of the local inventory item.

    Returns:
        The cursor's rowcount after the DELETE (number of rows removed).
    """
    cur = tenant_conn.cursor()
    try:
        cur.execute("""
            DELETE FROM inventory_vehicle_compat WHERE inventory_id = %s
        """, (inventory_id,))
        deleted = cur.rowcount
        tenant_conn.commit()
    finally:
        # Close the cursor even when the delete or commit raises.
        cur.close()
    return deleted
|
|
|
|
|
|
def get_compatibility(tenant_conn, master_conn, inventory_id) -> List[Dict]:
    """Get all vehicle compatibilities for an inventory item with vehicle details.

    Reads the item's rows from inventory_vehicle_compat (newest first) on the
    tenant database, then looks up brand/model/year/engine for those MYE IDs
    on the master database.

    Args:
        tenant_conn: Connection to the tenant database.
        master_conn: Connection to the master catalog database.
        inventory_id: ID of the local inventory item.

    Returns:
        list[dict]: One dict per compatibility row with keys
        'model_year_engine_id', 'brand', 'model', 'year', 'engine',
        'source', 'confidence' and 'created_at' (stringified). Vehicle
        fields are '' when the MYE is not found in master. Returns [] when
        the item has no compatibilities; master is not queried in that case.
    """
    cur = tenant_conn.cursor()
    try:
        cur.execute("""
            SELECT model_year_engine_id, source, confidence, created_at
            FROM inventory_vehicle_compat
            WHERE inventory_id = %s
            ORDER BY created_at DESC
        """, (inventory_id,))
        rows = cur.fetchall()
    finally:
        cur.close()

    if not rows:
        return []

    mye_ids = [r[0] for r in rows]

    # Fetch vehicle details from master
    cur2 = master_conn.cursor()
    try:
        cur2.execute("""
            SELECT mye.id_mye, b.name_brand, m.name_model, y.year_car, e.name_engine
            FROM model_year_engine mye
            JOIN models m ON m.id_model = mye.model_id
            JOIN brands b ON b.id_brand = m.brand_id
            JOIN years y ON y.id_year = mye.year_id
            JOIN engines e ON e.id_engine = mye.engine_id
            WHERE mye.id_mye = ANY(%s)
        """, (mye_ids,))
        vehicle_map = {
            r[0]: {'brand': r[1], 'model': r[2], 'year': r[3], 'engine': r[4]}
            for r in cur2.fetchall()
        }
    finally:
        cur2.close()

    results = []
    for mye_id, source, confidence, created_at in rows:
        v = vehicle_map.get(mye_id, {})
        results.append({
            'model_year_engine_id': mye_id,
            'brand': v.get('brand', ''),
            'model': v.get('model', ''),
            'year': v.get('year', ''),
            'engine': v.get('engine', ''),
            'source': source,
            # Only a missing (NULL) confidence defaults to 1.0; a stored 0
            # must stay 0.0 rather than being promoted by a truthiness test.
            'confidence': float(confidence) if confidence is not None else 1.0,
            'created_at': str(created_at),
        })
    return results
|
|
|
|
|
|
def get_inventory_by_vehicle(tenant_conn, master_conn, mye_id, branch_id=None):
    """Get local inventory items compatible with a specific vehicle.

    Used by catalog_service to inject local items into Local mode browsing.

    Args:
        tenant_conn: Connection to the tenant database.
        master_conn: Not used by this function; kept for interface
            compatibility.
        mye_id: model_year_engine_id of the vehicle.
        branch_id: Optional branch filter. The filter is applied only when
            the value is truthy, so None and 0 both mean "all branches".

    Returns:
        The rows from ``fetchall()``, ordered by item name, with columns
        (id, part_number, name, brand, price_1, price_2, price_3,
        image_url, description, stock). ``stock`` is 0 when the item has no
        inventory_stock_summary row. Only active items are returned.
    """
    # The interpolated fragment is a fixed literal; the values themselves
    # are always bound as query parameters.
    branch_filter = ""
    params = [mye_id]
    if branch_id:
        branch_filter = "AND i.branch_id = %s"
        params.append(branch_id)

    cur = tenant_conn.cursor()
    try:
        cur.execute(f"""
            SELECT i.id, i.part_number, i.name, i.brand, i.price_1, i.price_2, i.price_3,
                   i.image_url, i.description, COALESCE(s.stock, 0) as stock
            FROM inventory i
            JOIN inventory_vehicle_compat ivc ON ivc.inventory_id = i.id
            LEFT JOIN inventory_stock_summary s ON s.inventory_id = i.id
            WHERE ivc.model_year_engine_id = %s {branch_filter}
              AND i.is_active = true
            ORDER BY i.name
        """, params)
        rows = cur.fetchall()
    finally:
        # Close the cursor even when the query raises.
        cur.close()
    return rows
|
|
|
|
|
|
def search_mye(master_conn, brand_id=None, model_id=None, year_id=None,
               engine_id=None) -> List[Dict]:
    """Search model_year_engine records by any combination of filters.

    Each filter is applied only when its value is truthy; with no filters
    the query returns the first 500 records in sort order.

    Args:
        master_conn: Connection to the master catalog database.
        brand_id: Optional brand filter (models.brand_id).
        model_id: Optional model filter (models.id_model).
        year_id: Optional year filter (model_year_engine.year_id).
        engine_id: Optional engine filter (model_year_engine.engine_id).

    Returns:
        list[dict]: Up to 500 dicts with keys 'id_mye', 'brand', 'model',
        'year' and 'engine', ordered by brand, model, year and engine.
    """
    # Only fixed literal fragments are interpolated into the SQL; the filter
    # values are always bound as query parameters.
    where = ["true"]
    params = []
    if brand_id:
        where.append("m.brand_id = %s")
        params.append(brand_id)
    if model_id:
        where.append("m.id_model = %s")
        params.append(model_id)
    if year_id:
        where.append("mye.year_id = %s")
        params.append(year_id)
    if engine_id:
        where.append("mye.engine_id = %s")
        params.append(engine_id)

    cur = master_conn.cursor()
    try:
        cur.execute(f"""
            SELECT mye.id_mye, b.name_brand, m.name_model, y.year_car, e.name_engine
            FROM model_year_engine mye
            JOIN models m ON m.id_model = mye.model_id
            JOIN brands b ON b.id_brand = m.brand_id
            JOIN years y ON y.id_year = mye.year_id
            JOIN engines e ON e.id_engine = mye.engine_id
            WHERE {' AND '.join(where)}
            ORDER BY b.name_brand, m.name_model, y.year_car, e.name_engine
            LIMIT 500
        """, params)
        return [
            {
                'id_mye': r[0], 'brand': r[1], 'model': r[2],
                'year': r[3], 'engine': r[4],
            }
            for r in cur.fetchall()
        ]
    finally:
        # Close the cursor even when the query raises.
        cur.close()
|