Files
Autoparts-DB/pos/blueprints/supplier_catalog_bp.py
consultoria-as 2b73c2c6db feat: Fase 1-3 completas - precios proveedor, multi-sucursal, factura global
Fase 1: Lista de precios de proveedor
- Tabla supplier_catalog_prices en master DB
- Endpoints GET/POST/PUT/DELETE /supplier-catalog/prices
- Upload CSV/Excel de precios de proveedor
- Visualizacion de supplier_price en catalogo y POS

Fase 2: Multi-sucursal completo
- Migracion v4.0: inventory.branch_id=NULL, tabla inventory_stock
- Campos fiscales en branches (RFC, regimen, CP, serie CFDI, certificados)
- Trigger trg_update_inventory_stock para sincronizar stock por sucursal
- Backend config_bp.py con CRUD de sucursales fiscales
- Backend inventory_bp.py y pos_bp.py refactorizados para inventario compartido
- Backend invoicing_bp.py usa datos fiscales de la sucursal de la venta
- Frontend config.html/js con modal de sucursales expandido

Fase 3: Factura global mensual
- Migracion v4.1: tablas global_invoice_sales, sales.global_invoiced_at
- build_global_invoice_xml() con InformacionGlobal SAT-compliant
- Servicio global_invoice.py para agrupar ventas PUE <=000
- Endpoints POST/GET /global-invoice y /global-invoice/eligible-sales
- Frontend invoicing.html/js con boton y modal de factura global
2026-06-11 08:59:56 +00:00

539 lines
19 KiB
Python

"""Supplier Catalog Blueprint — parts from suppliers with vehicle compatibility.
Independent from inventory. Supports:
- Browse by supplier/category
- Search by text or vehicle (MYE or make/model/year)
- Part detail with compatibilities and interchanges
- Bulk import via Excel
"""
import csv
import io
from datetime import date
from flask import Blueprint, request, jsonify, g, render_template
from psycopg2.extras import RealDictCursor
from tenant_db import get_master_conn
from middleware import require_auth
supplier_catalog_bp = Blueprint('supplier_catalog', __name__, url_prefix='/pos/api/supplier-catalog')
# ─── Helpers ───────────────────────────────────────────────────────────────
def _get_master_conn():
return get_master_conn()
def _json_response(data, status=200):
return jsonify(data), status
# ─── Brands ────────────────────────────────────────────────────────────────
@supplier_catalog_bp.route('/brands', methods=['GET'])
@require_auth('catalog.view')
def list_brands():
"""Return distinct makes (vehicle brands) present in the supplier catalog."""
conn = _get_master_conn()
cur = conn.cursor()
cur.execute("""
SELECT DISTINCT make, COUNT(*) as cnt
FROM supplier_catalog_compat
WHERE make IS NOT NULL AND make != ''
GROUP BY make
ORDER BY make ASC
""")
rows = cur.fetchall()
cur.close(); conn.close()
return jsonify({'brands': [{'name': r[0], 'count': r[1]} for r in rows]})
# ─── Search ────────────────────────────────────────────────────────────────
@supplier_catalog_bp.route('/search', methods=['GET'])
@require_auth('catalog.view')
def search_items():
"""Search supplier catalog by text and/or vehicle."""
q = (request.args.get('q') or '').strip()
mye_id = request.args.get('mye_id', type=int)
make = (request.args.get('make') or '').strip()
model = (request.args.get('model') or '').strip()
year = request.args.get('year', type=int)
supplier = (request.args.get('supplier') or '').strip()
category = (request.args.get('category') or '').strip()
page = max(1, request.args.get('page', 1, type=int))
per_page = min(100, request.args.get('per_page', 30, type=int))
offset = (page - 1) * per_page
conn = _get_master_conn()
cur = conn.cursor()
# Build query dynamically
where_parts = ["sc.is_active = true"]
params = []
if supplier:
where_parts.append("sc.supplier_name = %s")
params.append(supplier)
if category:
where_parts.append("sc.category = %s")
params.append(category)
# Text search on SKU, name, or interchange part_number
if q:
where_parts.append("""
(sc.sku ILIKE %s OR sc.name ILIKE %s
OR EXISTS (
SELECT 1 FROM supplier_catalog_interchange sci2
WHERE sci2.catalog_id = sc.id AND sci2.part_number ILIKE %s
))
""")
like_q = f'%{q}%'
params.extend([like_q, like_q, like_q])
# Vehicle filter
vehicle_join = ""
if mye_id:
vehicle_join = "JOIN supplier_catalog_compat scc ON scc.catalog_id = sc.id"
where_parts.append("scc.model_year_engine_id = %s")
params.append(mye_id)
elif make or model or year:
vehicle_join = "JOIN supplier_catalog_compat scc ON scc.catalog_id = sc.id"
if make:
where_parts.append("scc.make ILIKE %s")
params.append(f'%{make}%')
if model:
where_parts.append("scc.model ILIKE %s")
params.append(f'%{model}%')
if year:
where_parts.append("scc.year = %s")
params.append(year)
where_sql = " AND ".join(where_parts)
# Count total
count_sql = f"""
SELECT COUNT(DISTINCT sc.id)
FROM supplier_catalog sc
{vehicle_join}
WHERE {where_sql}
"""
cur.execute(count_sql, params)
total = cur.fetchone()[0]
# Fetch page
fetch_sql = f"""
SELECT DISTINCT
sc.id, sc.supplier_name, sc.sku, sc.name,
sc.category, sc.description, sc.image_url
FROM supplier_catalog sc
{vehicle_join}
WHERE {where_sql}
ORDER BY sc.name ASC
LIMIT %s OFFSET %s
"""
cur.execute(fetch_sql, params + [per_page, offset])
rows = cur.fetchall()
items = []
for r in rows:
items.append({
'id': r[0],
'supplier_name': r[1],
'sku': r[2],
'name': r[3],
'category': r[4],
'description': r[5],
'image_url': r[6],
})
cur.close(); conn.close()
return jsonify({
'data': items,
'pagination': {
'page': page,
'per_page': per_page,
'total': total,
'total_pages': (total + per_page - 1) // per_page,
}
})
# ─── Item Detail ───────────────────────────────────────────────────────────
@supplier_catalog_bp.route('/items/<int:item_id>', methods=['GET'])
@require_auth('catalog.view')
def get_item_detail(item_id):
"""Return full detail for a supplier catalog item including compat + interchanges."""
conn = _get_master_conn()
cur = conn.cursor()
cur.execute("""
SELECT id, supplier_name, sku, name, category, description, image_url, created_at
FROM supplier_catalog WHERE id = %s AND is_active = true
""", (item_id,))
row = cur.fetchone()
if not row:
cur.close(); conn.close()
return jsonify({'error': 'Item not found'}), 404
item = {
'id': row[0],
'supplier_name': row[1],
'sku': row[2],
'name': row[3],
'category': row[4],
'description': row[5],
'image_url': row[6],
'created_at': str(row[7]) if row[7] else None,
}
# Compatibilities — deduplicate by (make, model, year, engine) because
# the same vehicle may map to multiple MYE ids (especially when engine
# text is empty from the supplier catalog).
cur.execute("""
SELECT make, model, year, engine, model_year_engine_id, source
FROM supplier_catalog_compat
WHERE catalog_id = %s
ORDER BY make, model, year, engine
""", (item_id,))
seen_compat = set()
compatibilities = []
for r in cur.fetchall():
key = (r[0], r[1], r[2], r[3])
if key in seen_compat:
continue
seen_compat.add(key)
compatibilities.append({
'make': r[0], 'model': r[1], 'year': r[2], 'engine': r[3],
'model_year_engine_id': r[4], 'source': r[5]
})
item['compatibilities'] = compatibilities
# Interchanges
cur.execute("""
SELECT brand, part_number
FROM supplier_catalog_interchange
WHERE catalog_id = %s
ORDER BY brand, part_number
""", (item_id,))
item['interchanges'] = [
{'brand': r[0], 'part_number': r[1]}
for r in cur.fetchall()
]
cur.close(); conn.close()
return jsonify(item)
# ─── Categories ────────────────────────────────────────────────────────────
@supplier_catalog_bp.route('/categories', methods=['GET'])
@require_auth('catalog.view')
def list_categories():
"""Return distinct categories with counts."""
conn = _get_master_conn()
cur = conn.cursor()
cur.execute("""
SELECT category, COUNT(*) as cnt
FROM supplier_catalog
WHERE is_active = true
GROUP BY category
ORDER BY cnt DESC
""")
rows = cur.fetchall()
cur.close(); conn.close()
return jsonify({'categories': [{'name': r[0], 'count': r[1]} for r in rows]})
# ─── Suppliers ─────────────────────────────────────────────────────────────
@supplier_catalog_bp.route('/suppliers', methods=['GET'])
@require_auth('catalog.view')
def list_suppliers():
"""Return distinct suppliers with counts."""
conn = _get_master_conn()
cur = conn.cursor()
cur.execute("""
SELECT supplier_name, COUNT(*) as cnt
FROM supplier_catalog
WHERE is_active = true
GROUP BY supplier_name
ORDER BY supplier_name ASC
""")
rows = cur.fetchall()
cur.close(); conn.close()
return jsonify({'suppliers': [{'name': r[0], 'count': r[1]} for r in rows]})
# ─── Delete ────────────────────────────────────────────────────────────────
@supplier_catalog_bp.route('/items/<int:item_id>', methods=['DELETE'])
@require_auth('inventory.edit')
def delete_item(item_id):
"""Soft-delete a supplier catalog item."""
conn = _get_master_conn()
cur = conn.cursor()
cur.execute("UPDATE supplier_catalog SET is_active = false WHERE id = %s", (item_id,))
conn.commit()
cur.close(); conn.close()
return jsonify({'success': True})
# ─── Prices ────────────────────────────────────────────────────────────────
def _get_latest_prices(master_conn, tenant_id, catalog_ids):
"""Return a dict catalog_id -> price row for the latest active price per item."""
if not catalog_ids:
return {}
cur = master_conn.cursor()
cur.execute("""
SELECT DISTINCT ON (catalog_id)
catalog_id, price, currency, effective_from, effective_to
FROM supplier_catalog_prices
WHERE tenant_id = %s AND catalog_id = ANY(%s) AND is_active = true
AND (effective_to IS NULL OR effective_to >= CURRENT_DATE)
ORDER BY catalog_id, effective_from DESC
""", (tenant_id, list(catalog_ids)))
prices = {}
for r in cur.fetchall():
prices[r[0]] = {
'price': float(r[1]) if r[1] is not None else None,
'currency': r[2] or 'MXN',
'effective_from': str(r[3]) if r[3] else None,
'effective_to': str(r[4]) if r[4] else None,
}
cur.close()
return prices
@supplier_catalog_bp.route('/prices', methods=['GET'])
@require_auth('catalog.view')
def list_prices():
"""List active supplier prices for the current tenant."""
supplier = (request.args.get('supplier') or '').strip()
q = (request.args.get('q') or '').strip()
page = max(1, request.args.get('page', 1, type=int))
per_page = min(200, request.args.get('per_page', 50, type=int))
offset = (page - 1) * per_page
conn = _get_master_conn()
cur = conn.cursor()
where_parts = ["sc.is_active = true", "scp.tenant_id = %s"]
params = [g.tenant_id]
if supplier:
where_parts.append("sc.supplier_name = %s")
params.append(supplier)
if q:
where_parts.append("(sc.sku ILIKE %s OR sc.name ILIKE %s)")
like_q = f'%{q}%'
params.extend([like_q, like_q])
where_sql = " AND ".join(where_parts)
cur.execute(f"""
SELECT COUNT(DISTINCT sc.id)
FROM supplier_catalog sc
JOIN supplier_catalog_prices scp ON scp.catalog_id = sc.id
WHERE {where_sql}
AND scp.is_active = true
AND (scp.effective_to IS NULL OR scp.effective_to >= CURRENT_DATE)
""", params)
total = cur.fetchone()[0]
cur.execute(f"""
SELECT DISTINCT ON (sc.id)
sc.id, sc.supplier_name, sc.sku, sc.name, sc.category,
scp.price, scp.currency, scp.effective_from, scp.effective_to
FROM supplier_catalog sc
JOIN supplier_catalog_prices scp ON scp.catalog_id = sc.id
WHERE {where_sql}
AND scp.is_active = true
AND (scp.effective_to IS NULL OR scp.effective_to >= CURRENT_DATE)
ORDER BY sc.id, scp.effective_from DESC
LIMIT %s OFFSET %s
""", params + [per_page, offset])
items = []
for r in cur.fetchall():
items.append({
'catalog_id': r[0],
'supplier_name': r[1],
'sku': r[2],
'name': r[3],
'category': r[4],
'price': float(r[5]) if r[5] is not None else None,
'currency': r[6] or 'MXN',
'effective_from': str(r[7]) if r[7] else None,
'effective_to': str(r[8]) if r[8] else None,
})
cur.close(); conn.close()
return jsonify({
'data': items,
'pagination': {'page': page, 'per_page': per_page, 'total': total,
'total_pages': (total + per_page - 1) // per_page}
})
@supplier_catalog_bp.route('/prices/template', methods=['GET'])
@require_auth('catalog.view')
def download_price_template():
"""Return a CSV template for uploading supplier prices."""
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(['supplier_name', 'sku', 'price', 'currency', 'effective_from'])
writer.writerow(['YOKOMITSU', 'DENK070A', '1250.00', 'MXN', '2026-01-01'])
output.seek(0)
return (output.getvalue(), 200, {
'Content-Type': 'text/csv; charset=utf-8',
'Content-Disposition': 'attachment; filename="supplier_prices_template.csv"'
})
def _read_upload_file(file_storage):
"""Read CSV or Excel upload and return list of dict rows."""
filename = (file_storage.filename or '').lower()
content = file_storage.read()
if filename.endswith('.csv'):
text = content.decode('utf-8-sig')
reader = csv.DictReader(io.StringIO(text))
return [row for row in reader]
if filename.endswith(('.xlsx', '.xls')):
try:
import openpyxl
except ImportError as e:
raise RuntimeError('openpyxl no instalado; sube CSV o instala openpyxl') from e
wb = openpyxl.load_workbook(io.BytesIO(content), data_only=True)
ws = wb.active
rows = list(ws.iter_rows(values_only=True))
if not rows:
return []
headers = [str(c).strip().lower() if c else '' for c in rows[0]]
return [
dict(zip(headers, row))
for row in rows[1:] if any(cell is not None and str(cell).strip() for cell in row)
]
raise ValueError('Formato no soportado. Usa CSV o Excel (.xlsx)')
@supplier_catalog_bp.route('/prices/upload', methods=['POST'])
@require_auth('inventory.edit')
def upload_prices():
"""Bulk upload/upsert supplier prices for the current tenant.
Expected columns: supplier_name, sku, price, [currency], [effective_from]
"""
if 'file' not in request.files:
return jsonify({'error': 'Archivo requerido'}), 400
file_storage = request.files['file']
if not file_storage or not file_storage.filename:
return jsonify({'error': 'Archivo requerido'}), 400
try:
rows = _read_upload_file(file_storage)
except Exception as e:
return jsonify({'error': str(e)}), 400
if not rows:
return jsonify({'error': 'El archivo esta vacio o no tiene filas validas'}), 400
conn = _get_master_conn()
cur = conn.cursor()
# Build a lookup of supplier+sku -> catalog_id
# We expect all rows to refer to existing catalog items.
normalized_rows = []
errors = []
for idx, row in enumerate(rows, start=2):
supplier = str(row.get('supplier_name') or '').strip()
sku = str(row.get('sku') or '').strip()
price_raw = row.get('price')
currency = str(row.get('currency') or 'MXN').strip().upper() or 'MXN'
eff_from_raw = row.get('effective_from')
if not supplier or not sku:
errors.append(f'Fila {idx}: supplier_name y sku son requeridos')
continue
try:
price = float(str(price_raw).replace(',', '').strip())
except Exception:
errors.append(f'Fila {idx}: precio invalido para {supplier}/{sku}')
continue
eff_from = date.today()
if eff_from_raw:
try:
eff_from = date.fromisoformat(str(eff_from_raw).strip())
except Exception:
errors.append(f'Fila {idx}: effective_from invalido (use YYYY-MM-DD)')
continue
normalized_rows.append((supplier, sku, price, currency, eff_from))
if errors:
cur.close(); conn.close()
return jsonify({'error': 'Errores de validacion', 'details': errors}), 400
# Bulk lookup catalog IDs
catalog_lookup = {}
for supplier, sku, *_ in normalized_rows:
catalog_lookup[(supplier, sku)] = None
if catalog_lookup:
keys = list(catalog_lookup.keys())
# Batch query using unnest
cur.execute("""
SELECT supplier_name, sku, id
FROM supplier_catalog
WHERE is_active = true
AND (supplier_name, sku) = ANY(%s)
""", (keys,))
for r in cur.fetchall():
catalog_lookup[(r[0], r[1])] = r[2]
upserts = []
for idx, (supplier, sku, price, currency, eff_from) in enumerate(normalized_rows, start=2):
catalog_id = catalog_lookup.get((supplier, sku))
if not catalog_id:
errors.append(f'Fila {idx}: SKU {supplier}/{sku} no existe en el catalogo')
continue
upserts.append((g.tenant_id, catalog_id, price, currency, eff_from))
if errors:
cur.close(); conn.close()
return jsonify({'error': 'Errores de validacion', 'details': errors}), 400
inserted = 0
updated = 0
for tenant_id, catalog_id, price, currency, eff_from in upserts:
# Try update existing row with same (tenant_id, catalog_id, effective_from)
cur.execute("""
UPDATE supplier_catalog_prices
SET price = %s, currency = %s, is_active = true, updated_at = NOW()
WHERE tenant_id = %s AND catalog_id = %s AND effective_from = %s
RETURNING id
""", (price, currency, tenant_id, catalog_id, eff_from))
if cur.fetchone():
updated += 1
else:
cur.execute("""
INSERT INTO supplier_catalog_prices
(tenant_id, catalog_id, price, currency, effective_from, is_active)
VALUES (%s, %s, %s, %s, %s, true)
""", (tenant_id, catalog_id, price, currency, eff_from))
inserted += 1
conn.commit()
cur.close(); conn.close()
return jsonify({
'success': True,
'processed': len(upserts),
'inserted': inserted,
'updated': updated,
})