Files
Autoparts-DB/pos/blueprints/inventory_bp.py
consultoria-as e00dce7d5a feat(pos): add gunicorn, marketplace B2B, and subscription billing (#7, #8, #12)
- Gunicorn production server with auto-scaled workers, run.sh, updated systemd service
- Marketplace B2B: cross-tenant inventory search, ordering, seller management with full UI
- Subscription billing: plan limits enforced on products/employees/branches, billing API + upgrade flow

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 08:17:33 +00:00

986 lines
35 KiB
Python

# /home/Autopartes/pos/blueprints/inventory_bp.py
"""Inventory blueprint: CRUD for inventory items + stock operations + reports."""
import io
import json
import os
from datetime import datetime, timedelta
from flask import Blueprint, request, jsonify, g
from middleware import require_auth, has_permission
from tenant_db import get_tenant_conn
from services.inventory_engine import (
get_stock, get_stock_bulk, record_purchase, record_return,
record_adjustment, record_transfer, record_initial,
get_alerts, get_movement_history
)
from services.barcode_generator import generate_barcode
from services.audit import log_action
inventory_bp = Blueprint('inventory', __name__, url_prefix='/pos/api/inventory')
# ─── Item CRUD ──────────────────────────────────
@inventory_bp.route('/items', methods=['GET'])
@require_auth('inventory.view')
def list_items():
"""List inventory items with current stock. Supports search, pagination, filtering.
The low_stock filter is applied at the SQL level via a LEFT JOIN + HAVING clause,
so pagination counts remain accurate.
"""
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
page = int(request.args.get('page', 1))
per_page = min(int(request.args.get('per_page', 50)), 200)
search = request.args.get('q', '')
category = request.args.get('category', '')
brand = request.args.get('brand', '')
branch_id = request.args.get('branch_id', g.branch_id)
low_stock = request.args.get('low_stock', '') == 'true'
where_clauses = ["i.is_active = true"]
params = []
if branch_id:
where_clauses.append("i.branch_id = %s")
params.append(branch_id)
if search:
where_clauses.append("(i.part_number ILIKE %s OR i.name ILIKE %s OR i.barcode ILIKE %s)")
params.extend([f'%{search}%', f'%{search}%', f'%{search}%'])
if category:
where_clauses.append("i.category_id = %s")
params.append(int(category))
if brand:
where_clauses.append("i.brand ILIKE %s")
params.append(f'%{brand}%')
where = " AND ".join(where_clauses)
if low_stock:
# low_stock filter: JOIN with stock subquery, filter items where stock < min_stock
# This keeps pagination accurate because the filter is in the SQL WHERE clause.
count_sql = f"""
SELECT count(*) FROM inventory i
LEFT JOIN (
SELECT inventory_id, COALESCE(SUM(quantity), 0) AS stock
FROM inventory_operations GROUP BY inventory_id
) s ON s.inventory_id = i.id
WHERE {where} AND i.min_stock IS NOT NULL AND i.min_stock > 0
AND COALESCE(s.stock, 0) < i.min_stock
"""
cur.execute(count_sql, params)
total = cur.fetchone()[0]
cur.execute(f"""
SELECT i.id, i.branch_id, i.part_number, i.barcode, i.name, i.description,
i.category_id, i.brand, i.unit, i.cost, i.price_1, i.price_2, i.price_3,
i.tax_rate, i.min_stock, i.max_stock, i.location, i.image_url, i.catalog_part_id,
COALESCE(s.stock, 0) AS stock
FROM inventory i
LEFT JOIN (
SELECT inventory_id, COALESCE(SUM(quantity), 0) AS stock
FROM inventory_operations GROUP BY inventory_id
) s ON s.inventory_id = i.id
WHERE {where} AND i.min_stock IS NOT NULL AND i.min_stock > 0
AND COALESCE(s.stock, 0) < i.min_stock
ORDER BY i.name
LIMIT %s OFFSET %s
""", params + [per_page, (page - 1) * per_page])
items = []
for r in cur.fetchall():
items.append({
'id': r[0], 'branch_id': r[1], 'part_number': r[2], 'barcode': r[3],
'name': r[4], 'description': r[5], 'category_id': r[6], 'brand': r[7],
'unit': r[8], 'cost': float(r[9]) if r[9] else 0,
'price_1': float(r[10]) if r[10] else 0,
'price_2': float(r[11]) if r[11] else 0,
'price_3': float(r[12]) if r[12] else 0,
'tax_rate': float(r[13]) if r[13] else 0.16,
'min_stock': r[14], 'max_stock': r[15], 'location': r[16],
'image_url': r[17], 'catalog_part_id': r[18],
'stock': r[19]
})
else:
# Normal path: count, fetch items, then bulk-lookup stock
cur.execute(f"SELECT count(*) FROM inventory i WHERE {where}", params)
total = cur.fetchone()[0]
cur.execute(f"""
SELECT i.id, i.branch_id, i.part_number, i.barcode, i.name, i.description,
i.category_id, i.brand, i.unit, i.cost, i.price_1, i.price_2, i.price_3,
i.tax_rate, i.min_stock, i.max_stock, i.location, i.image_url, i.catalog_part_id
FROM inventory i
WHERE {where}
ORDER BY i.name
LIMIT %s OFFSET %s
""", params + [per_page, (page - 1) * per_page])
items_raw = cur.fetchall()
# Get stock for all returned items
inv_ids = [r[0] for r in items_raw]
stock_map = {}
if inv_ids:
cur.execute("""
SELECT inventory_id, COALESCE(SUM(quantity), 0)
FROM inventory_operations
WHERE inventory_id = ANY(%s)
GROUP BY inventory_id
""", (inv_ids,))
stock_map = {r[0]: r[1] for r in cur.fetchall()}
items = []
for r in items_raw:
stock = stock_map.get(r[0], 0)
items.append({
'id': r[0], 'branch_id': r[1], 'part_number': r[2], 'barcode': r[3],
'name': r[4], 'description': r[5], 'category_id': r[6], 'brand': r[7],
'unit': r[8], 'cost': float(r[9]) if r[9] else 0,
'price_1': float(r[10]) if r[10] else 0,
'price_2': float(r[11]) if r[11] else 0,
'price_3': float(r[12]) if r[12] else 0,
'tax_rate': float(r[13]) if r[13] else 0.16,
'min_stock': r[14], 'max_stock': r[15], 'location': r[16],
'image_url': r[17], 'catalog_part_id': r[18],
'stock': stock
})
cur.close()
conn.close()
total_pages = (total + per_page - 1) // per_page
return jsonify({
'data': items,
'pagination': {'page': page, 'per_page': per_page, 'total': total, 'total_pages': total_pages}
})
@inventory_bp.route('/items/<int:item_id>', methods=['GET'])
@require_auth('inventory.view')
def get_item(item_id):
"""Get a single inventory item with stock and movement history."""
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
cur.execute("""
SELECT i.*, b.name as branch_name
FROM inventory i
LEFT JOIN branches b ON i.branch_id = b.id
WHERE i.id = %s
""", (item_id,))
row = cur.fetchone()
if not row:
cur.close(); conn.close()
return jsonify({'error': 'Item not found'}), 404
cols = [desc[0] for desc in cur.description]
item = dict(zip(cols, row))
# Convert Decimal to float
for k in ('cost', 'price_1', 'price_2', 'price_3', 'tax_rate'):
if item.get(k) is not None:
item[k] = float(item[k])
item['stock'] = get_stock(conn, item_id, item.get('branch_id'))
item['history'] = get_movement_history(conn, item_id, limit=20)
cur.close()
conn.close()
return jsonify(item)
@inventory_bp.route('/items', methods=['POST'])
@require_auth('inventory.create')
def create_item():
"""Create a new inventory item. Optionally set initial stock."""
data = request.get_json() or {}
required = ['part_number', 'name']
for f in required:
if not data.get(f):
return jsonify({'error': f'{f} required'}), 400
branch_id = data.get('branch_id', g.branch_id)
if not branch_id:
return jsonify({'error': 'branch_id required'}), 400
# Plan limit check
from services.billing import check_limit, next_plan, PLANS, get_plan
conn = get_tenant_conn(g.tenant_id)
cur_count = conn.cursor()
cur_count.execute("SELECT count(*) FROM inventory WHERE is_active = true")
current_products = cur_count.fetchone()[0]
cur_count.close()
allowed, limit, current = check_limit(g.tenant_id, 'max_products', current_products)
if not allowed:
conn.close()
plan_key = get_plan(g.tenant_id)
nxt = next_plan(plan_key)
nxt_name = PLANS[nxt]['name'] if nxt else 'Enterprise'
return jsonify({'error': f'Plan limit reached ({limit} products). Upgrade to {nxt_name}.'}), 403
conn.close()
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
# Generate barcode if not provided
barcode = data.get('barcode')
if not barcode:
# Look up tenant db_name
from tenant_db import get_master_conn
mconn = get_master_conn()
mcur = mconn.cursor()
mcur.execute("SELECT db_name FROM tenants WHERE id = %s", (g.tenant_id,))
db_name = mcur.fetchone()[0]
mcur.close(); mconn.close()
barcode = generate_barcode(conn, db_name)
try:
cur.execute("""
INSERT INTO inventory
(branch_id, part_number, barcode, name, description, category_id, brand,
vehicle_compatibility, unit, cost, price_1, price_2, price_3, tax_rate,
min_stock, max_stock, location, image_url, catalog_part_id)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
RETURNING id
""", (
branch_id, data['part_number'], barcode, data['name'],
data.get('description'), data.get('category_id'), data.get('brand'),
json.dumps(data.get('vehicle_compatibility')) if data.get('vehicle_compatibility') else None,
data.get('unit', 'PZA'), data.get('cost', 0),
data.get('price_1', 0), data.get('price_2', 0), data.get('price_3', 0),
data.get('tax_rate', 0.16),
data.get('min_stock', 0), data.get('max_stock', 0),
data.get('location'), data.get('image_url'), data.get('catalog_part_id')
))
item_id = cur.fetchone()[0]
# Record initial stock if provided
initial_stock = data.get('initial_stock', 0)
if initial_stock > 0:
record_initial(conn, item_id, branch_id, initial_stock, data.get('cost'))
log_action(conn, 'INVENTORY_CREATE', 'inventory', item_id,
new_value={'part_number': data['part_number'], 'name': data['name'], 'initial_stock': initial_stock})
conn.commit()
cur.close(); conn.close()
return jsonify({'id': item_id, 'barcode': barcode, 'message': 'Item created'}), 201
except Exception as e:
conn.rollback()
cur.close(); conn.close()
if 'idx_inventory_branch_part' in str(e):
return jsonify({'error': 'Part number already exists in this branch'}), 409
return jsonify({'error': str(e)}), 500
@inventory_bp.route('/items/<int:item_id>', methods=['PUT'])
@require_auth('inventory.edit')
def update_item(item_id):
"""Update inventory item fields (not stock — use operations for that)."""
data = request.get_json() or {}
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
# Get current values for audit
cur.execute("SELECT * FROM inventory WHERE id = %s", (item_id,))
old = cur.fetchone()
if not old:
cur.close(); conn.close()
return jsonify({'error': 'Item not found'}), 404
old_cols = [desc[0] for desc in cur.description]
old_dict = dict(zip(old_cols, old))
# Price change requires special permission
price_fields = {'price_1', 'price_2', 'price_3', 'cost'}
changing_prices = price_fields & set(data.keys())
if changing_prices and not has_permission('config.edit_prices'):
return jsonify({'error': 'Permission config.edit_prices required to change prices'}), 403
# Build dynamic update
allowed = ['part_number', 'barcode', 'name', 'description', 'category_id', 'brand',
'vehicle_compatibility', 'unit', 'cost', 'price_1', 'price_2', 'price_3',
'tax_rate', 'min_stock', 'max_stock', 'location', 'image_url', 'catalog_part_id', 'is_active']
sets = []
vals = []
for field in allowed:
if field in data:
val = data[field]
if field == 'vehicle_compatibility' and isinstance(val, (dict, list)):
val = json.dumps(val)
sets.append(f"{field} = %s")
vals.append(val)
if not sets:
cur.close(); conn.close()
return jsonify({'error': 'No fields to update'}), 400
vals.append(item_id)
cur.execute(f"UPDATE inventory SET {', '.join(sets)} WHERE id = %s", vals)
if changing_prices:
log_action(conn, 'PRICE_CHANGE', 'inventory', item_id,
old_value={k: float(old_dict[k]) if old_dict[k] else 0 for k in changing_prices},
new_value={k: data[k] for k in changing_prices})
conn.commit()
cur.close(); conn.close()
return jsonify({'message': 'Item updated'})
# ─── Image Upload / Delete ─────────────────────
IMAGES_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'static', 'images', 'parts')
ALLOWED_EXTENSIONS = {'jpg', 'jpeg', 'png', 'webp'}
MAX_IMAGE_BYTES = 5 * 1024 * 1024 # 5 MB
def _process_image(file_data, max_size=800):
"""Resize image to max_size and convert to JPEG."""
from PIL import Image
img = Image.open(io.BytesIO(file_data))
img.thumbnail((max_size, max_size), Image.LANCZOS)
if img.mode not in ('RGB', 'L'):
img = img.convert('RGB')
output = io.BytesIO()
img.save(output, format='JPEG', quality=85)
return output.getvalue()
def _process_thumbnail(file_data, size=300):
"""Generate a smaller thumbnail."""
return _process_image(file_data, max_size=size)
def _delete_image_files(tenant_id, item_id):
"""Remove image and thumbnail for the given item from disk."""
for suffix in ('', '_thumb'):
path = os.path.join(IMAGES_DIR, f'{tenant_id}_{item_id}{suffix}.jpg')
if os.path.exists(path):
os.remove(path)
@inventory_bp.route('/items/<int:item_id>/image', methods=['POST'])
@require_auth('inventory.edit')
def upload_image(item_id):
"""Upload an image for an inventory item. Accepts multipart file upload.
Validates file type (jpg, png, webp) and size (max 5 MB).
Saves resized image + thumbnail, updates inventory.image_url.
"""
if 'file' not in request.files:
return jsonify({'error': 'No file provided'}), 400
f = request.files['file']
if not f.filename:
return jsonify({'error': 'Empty filename'}), 400
ext = f.filename.rsplit('.', 1)[-1].lower() if '.' in f.filename else ''
if ext not in ALLOWED_EXTENSIONS:
return jsonify({'error': f'File type not allowed. Use: {", ".join(ALLOWED_EXTENSIONS)}'}), 400
raw = f.read()
if len(raw) > MAX_IMAGE_BYTES:
return jsonify({'error': 'File too large (max 5 MB)'}), 400
# Verify item exists
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
cur.execute("SELECT id FROM inventory WHERE id = %s", (item_id,))
if not cur.fetchone():
cur.close(); conn.close()
return jsonify({'error': 'Item not found'}), 404
try:
# Process and save main image
os.makedirs(IMAGES_DIR, exist_ok=True)
main_data = _process_image(raw)
main_filename = f'{g.tenant_id}_{item_id}.jpg'
main_path = os.path.join(IMAGES_DIR, main_filename)
with open(main_path, 'wb') as out:
out.write(main_data)
# Process and save thumbnail
thumb_data = _process_thumbnail(raw)
thumb_filename = f'{g.tenant_id}_{item_id}_thumb.jpg'
thumb_path = os.path.join(IMAGES_DIR, thumb_filename)
with open(thumb_path, 'wb') as out:
out.write(thumb_data)
# Update DB
image_url = f'/pos/static/images/parts/{main_filename}'
cur.execute("UPDATE inventory SET image_url = %s WHERE id = %s", (image_url, item_id))
conn.commit()
log_action(conn, 'IMAGE_UPLOAD', 'inventory', item_id,
new_value={'image_url': image_url})
cur.close(); conn.close()
return jsonify({
'image_url': image_url,
'thumbnail_url': f'/pos/static/images/parts/{thumb_filename}',
'message': 'Image uploaded'
})
except Exception as e:
conn.rollback()
cur.close(); conn.close()
return jsonify({'error': str(e)}), 500
@inventory_bp.route('/items/<int:item_id>/image', methods=['DELETE'])
@require_auth('inventory.edit')
def delete_image(item_id):
"""Delete the image for an inventory item. Removes files from disk and sets image_url = NULL."""
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
cur.execute("SELECT image_url FROM inventory WHERE id = %s", (item_id,))
row = cur.fetchone()
if not row:
cur.close(); conn.close()
return jsonify({'error': 'Item not found'}), 404
# Remove files from disk
_delete_image_files(g.tenant_id, item_id)
# Clear DB
cur.execute("UPDATE inventory SET image_url = NULL WHERE id = %s", (item_id,))
conn.commit()
log_action(conn, 'IMAGE_DELETE', 'inventory', item_id,
old_value={'image_url': row[0]})
cur.close(); conn.close()
return jsonify({'message': 'Image deleted'})
# ─── Stock Operations ──────────────────────────
@inventory_bp.route('/purchase', methods=['POST'])
@require_auth('inventory.create')
def api_purchase():
"""Record a purchase entry (stock in)."""
data = request.get_json() or {}
required = ['inventory_id', 'quantity', 'unit_cost']
for f in required:
if not data.get(f) and data.get(f) != 0:
return jsonify({'error': f'{f} required'}), 400
conn = get_tenant_conn(g.tenant_id)
branch_id = data.get('branch_id', g.branch_id)
op_id = record_purchase(
conn, data['inventory_id'], branch_id,
data['quantity'], data['unit_cost'],
supplier_invoice=data.get('supplier_invoice'),
notes=data.get('notes')
)
conn.commit()
conn.close()
return jsonify({'operation_id': op_id, 'message': 'Purchase recorded'})
@inventory_bp.route('/adjustment', methods=['POST'])
@require_auth('inventory.adjust')
def api_adjustment():
"""Record a manual stock adjustment."""
data = request.get_json() or {}
if not data.get('inventory_id') or data.get('quantity') is None or not data.get('reason'):
return jsonify({'error': 'inventory_id, quantity, and reason required'}), 400
conn = get_tenant_conn(g.tenant_id)
try:
branch_id = data.get('branch_id', g.branch_id)
op_id = record_adjustment(conn, data['inventory_id'], branch_id, data['quantity'], data['reason'])
conn.commit()
conn.close()
return jsonify({'operation_id': op_id, 'message': 'Adjustment recorded'})
except ValueError as e:
conn.close()
return jsonify({'error': str(e)}), 400
@inventory_bp.route('/transfer', methods=['POST'])
@require_auth('inventory.transfer')
def api_transfer():
"""Transfer stock between branches."""
data = request.get_json() or {}
required = ['inventory_id', 'from_branch_id', 'to_branch_id', 'quantity']
for f in required:
if not data.get(f):
return jsonify({'error': f'{f} required'}), 400
conn = get_tenant_conn(g.tenant_id)
out_id, in_id = record_transfer(
conn, data['inventory_id'], data['from_branch_id'],
data['to_branch_id'], data['quantity'], data.get('notes')
)
conn.commit()
conn.close()
return jsonify({'out_operation_id': out_id, 'in_operation_id': in_id, 'message': 'Transfer recorded'})
@inventory_bp.route('/return', methods=['POST'])
@require_auth('inventory.create')
def api_return():
"""Record a customer return."""
data = request.get_json() or {}
if not data.get('inventory_id') or not data.get('quantity'):
return jsonify({'error': 'inventory_id and quantity required'}), 400
conn = get_tenant_conn(g.tenant_id)
branch_id = data.get('branch_id', g.branch_id)
op_id = record_return(conn, data['inventory_id'], branch_id,
data['quantity'], data.get('sale_id'), data.get('notes'))
conn.commit()
conn.close()
return jsonify({'operation_id': op_id, 'message': 'Return recorded'})
# ─── Physical Count (two-phase: start → approve) ──────────
@inventory_bp.route('/physical-count/start', methods=['POST'])
@require_auth('inventory.view')
def physical_count_start():
"""Start a physical count. Creates a draft that compares expected vs counted
WITHOUT making any adjustments. Returns a draft ID and comparison results.
Body: { items: [{inventory_id, counted_quantity}, ...], branch_id, notes }
"""
data = request.get_json() or {}
items = data.get('items', [])
branch_id = data.get('branch_id', g.branch_id)
notes = data.get('notes', 'Toma fisica')
if not items:
return jsonify({'error': 'items array required'}), 400
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
# Create a draft physical count record
cur.execute("""
INSERT INTO physical_counts (branch_id, status, notes, employee_id, created_at)
VALUES (%s, 'draft', %s, %s, NOW())
RETURNING id
""", (branch_id, notes, getattr(g, 'employee_id', None)))
count_id = cur.fetchone()[0]
results = []
for item in items:
inv_id = item.get('inventory_id')
counted = item.get('counted_quantity', 0)
expected = get_stock(conn, inv_id, branch_id)
diff = counted - expected
# Store each line in the draft
cur.execute("""
INSERT INTO physical_count_lines
(physical_count_id, inventory_id, expected_quantity, counted_quantity, difference)
VALUES (%s, %s, %s, %s, %s)
""", (count_id, inv_id, expected, counted, diff))
results.append({
'inventory_id': inv_id,
'expected': expected,
'counted': counted,
'difference': diff,
'needs_adjustment': diff != 0
})
conn.commit()
cur.close()
conn.close()
adjustments_needed = sum(1 for r in results if r['needs_adjustment'])
return jsonify({
'count_id': count_id,
'status': 'draft',
'message': f'Draft created. {adjustments_needed} items need adjustment.',
'results': results
})
@inventory_bp.route('/physical-count/approve', methods=['POST'])
@require_auth('inventory.adjust')
def physical_count_approve():
"""Approve a draft physical count and create ADJUST operations for all differences.
Body: { count_id: int }
Requires inventory.adjust permission.
"""
data = request.get_json() or {}
count_id = data.get('count_id')
if not count_id:
return jsonify({'error': 'count_id required'}), 400
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
# Verify draft exists and is still draft
cur.execute("SELECT branch_id, status, notes FROM physical_counts WHERE id = %s", (count_id,))
row = cur.fetchone()
if not row:
cur.close(); conn.close()
return jsonify({'error': 'Physical count not found'}), 404
branch_id, status, notes = row
if status != 'draft':
cur.close(); conn.close()
return jsonify({'error': f'Count already {status} — cannot approve again'}), 409
# Get all lines with differences
cur.execute("""
SELECT inventory_id, expected_quantity, counted_quantity, difference
FROM physical_count_lines
WHERE physical_count_id = %s AND difference != 0
""", (count_id,))
lines = cur.fetchall()
results = []
for inv_id, expected, counted, diff in lines:
record_adjustment(
conn, inv_id, branch_id, diff,
f"{notes}: contado={counted}, esperado={expected}, diferencia={diff}"
)
results.append({
'inventory_id': inv_id,
'expected': expected,
'counted': counted,
'difference': diff,
'adjusted': True
})
# Mark count as approved
cur.execute("""
UPDATE physical_counts SET status = 'approved', approved_at = NOW(),
approved_by = %s WHERE id = %s
""", (getattr(g, 'employee_id', None), count_id))
conn.commit()
cur.close()
conn.close()
return jsonify({
'count_id': count_id,
'status': 'approved',
'message': f'Physical count approved. {len(results)} adjustments created.',
'results': results
})
# ─── Alerts and History ────────────────────────
@inventory_bp.route('/alerts', methods=['GET'])
@require_auth('inventory.view')
def api_alerts():
"""Get stock alerts (zero, low, over)."""
conn = get_tenant_conn(g.tenant_id)
branch_id = request.args.get('branch_id', g.branch_id)
alerts = get_alerts(conn, branch_id)
conn.close()
return jsonify({'data': alerts, 'count': len(alerts)})
@inventory_bp.route('/items/<int:item_id>/history', methods=['GET'])
@require_auth('inventory.view')
def api_history(item_id):
"""Get movement history for an item."""
conn = get_tenant_conn(g.tenant_id)
limit = min(int(request.args.get('limit', 50)), 200)
history = get_movement_history(conn, item_id, limit)
conn.close()
return jsonify({'data': history})
# ─── Inventory Reports ────────────────────────
@inventory_bp.route('/reports/valuation', methods=['GET'])
@require_auth('inventory.view')
def report_valuation():
"""Inventory valuation report: stock x cost per item, with totals.
Returns each active item with its current stock and cost, plus the
line-level value (stock * cost) and a grand total across all items.
"""
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
branch_id = request.args.get('branch_id', g.branch_id)
where = "i.is_active = true"
params = []
if branch_id:
where += " AND i.branch_id = %s"
params.append(branch_id)
cur.execute(f"""
SELECT i.id, i.part_number, i.name, i.brand, i.cost, i.branch_id,
COALESCE(s.stock, 0) AS stock,
COALESCE(s.stock, 0) * COALESCE(i.cost, 0) AS value
FROM inventory i
LEFT JOIN (
SELECT inventory_id, SUM(quantity) AS stock
FROM inventory_operations GROUP BY inventory_id
) s ON s.inventory_id = i.id
WHERE {where}
ORDER BY value DESC
""", params)
items = []
grand_total = 0
for r in cur.fetchall():
val = float(r[7])
grand_total += val
items.append({
'id': r[0], 'part_number': r[1], 'name': r[2], 'brand': r[3],
'cost': float(r[4]) if r[4] else 0, 'branch_id': r[5],
'stock': r[6], 'value': round(val, 2)
})
cur.close(); conn.close()
return jsonify({'data': items, 'grand_total': round(grand_total, 2), 'item_count': len(items)})
@inventory_bp.route('/reports/abc', methods=['GET'])
@require_auth('inventory.view')
def report_abc():
"""ABC classification by sales volume (last 90 days).
A = top 80% of sales volume, B = next 15%, C = remaining 5%.
"""
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
branch_id = request.args.get('branch_id', g.branch_id)
days = int(request.args.get('days', 90))
where_branch = ""
params = [datetime.utcnow() - timedelta(days=days)]
if branch_id:
where_branch = "AND io.branch_id = %s"
params.append(branch_id)
cur.execute(f"""
SELECT i.id, i.part_number, i.name, i.brand,
COALESCE(ABS(SUM(io.quantity)), 0) AS sales_volume
FROM inventory i
LEFT JOIN inventory_operations io
ON io.inventory_id = i.id
AND io.operation_type = 'SALE'
AND io.created_at >= %s
{where_branch}
WHERE i.is_active = true
GROUP BY i.id, i.part_number, i.name, i.brand
ORDER BY sales_volume DESC
""", params)
rows = cur.fetchall()
total_volume = sum(r[4] for r in rows)
items = []
cumulative = 0
for r in rows:
vol = r[4]
cumulative += vol
pct = (cumulative / total_volume * 100) if total_volume > 0 else 0
if pct <= 80:
cls = 'A'
elif pct <= 95:
cls = 'B'
else:
cls = 'C'
items.append({
'id': r[0], 'part_number': r[1], 'name': r[2], 'brand': r[3],
'sales_volume': vol, 'cumulative_pct': round(pct, 1), 'classification': cls
})
cur.close(); conn.close()
a_count = sum(1 for i in items if i['classification'] == 'A')
b_count = sum(1 for i in items if i['classification'] == 'B')
c_count = sum(1 for i in items if i['classification'] == 'C')
return jsonify({
'data': items,
'summary': {'A': a_count, 'B': b_count, 'C': c_count, 'total_volume': total_volume, 'days': days}
})
@inventory_bp.route('/reports/no-movement', methods=['GET'])
@require_auth('inventory.view')
def report_no_movement():
"""Products with no inventory operations in the last N days (default 60)."""
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
days = int(request.args.get('days', 60))
branch_id = request.args.get('branch_id', g.branch_id)
cutoff = datetime.utcnow() - timedelta(days=days)
where_branch = ""
params_main = []
if branch_id:
where_branch = "AND i.branch_id = %s"
params_main.append(branch_id)
cur.execute(f"""
SELECT i.id, i.part_number, i.name, i.brand, i.cost,
COALESCE(s.stock, 0) AS stock,
last_op.last_date
FROM inventory i
LEFT JOIN (
SELECT inventory_id, SUM(quantity) AS stock
FROM inventory_operations GROUP BY inventory_id
) s ON s.inventory_id = i.id
LEFT JOIN (
SELECT inventory_id, MAX(created_at) AS last_date
FROM inventory_operations GROUP BY inventory_id
) last_op ON last_op.inventory_id = i.id
WHERE i.is_active = true {where_branch}
AND (last_op.last_date IS NULL OR last_op.last_date < %s)
ORDER BY last_op.last_date ASC NULLS FIRST
""", params_main + [cutoff])
items = []
for r in cur.fetchall():
items.append({
'id': r[0], 'part_number': r[1], 'name': r[2], 'brand': r[3],
'cost': float(r[4]) if r[4] else 0, 'stock': r[5],
'last_movement': str(r[6]) if r[6] else None
})
cur.close(); conn.close()
return jsonify({'data': items, 'days_threshold': days, 'count': len(items)})
@inventory_bp.route('/reports/low-stock', methods=['GET'])
@require_auth('inventory.view')
def report_low_stock():
"""Items below their min_stock threshold."""
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
branch_id = request.args.get('branch_id', g.branch_id)
where_branch = ""
params = []
if branch_id:
where_branch = "AND i.branch_id = %s"
params.append(branch_id)
cur.execute(f"""
SELECT i.id, i.part_number, i.name, i.brand, i.min_stock,
COALESCE(s.stock, 0) AS stock,
i.min_stock - COALESCE(s.stock, 0) AS deficit
FROM inventory i
LEFT JOIN (
SELECT inventory_id, SUM(quantity) AS stock
FROM inventory_operations GROUP BY inventory_id
) s ON s.inventory_id = i.id
WHERE i.is_active = true {where_branch}
AND i.min_stock IS NOT NULL AND i.min_stock > 0
AND COALESCE(s.stock, 0) < i.min_stock
ORDER BY deficit DESC
""", params)
items = []
for r in cur.fetchall():
items.append({
'id': r[0], 'part_number': r[1], 'name': r[2], 'brand': r[3],
'min_stock': r[4], 'stock': r[5], 'deficit': r[6]
})
cur.close(); conn.close()
return jsonify({'data': items, 'count': len(items)})
@inventory_bp.route('/reports/branch-comparison', methods=['GET'])
@require_auth('inventory.view')
def report_branch_comparison():
"""Stock comparison across all branches for each item."""
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
cur.execute("""
SELECT i.id, i.part_number, i.name, i.brand, i.branch_id,
b.name AS branch_name,
COALESCE(s.stock, 0) AS stock
FROM inventory i
LEFT JOIN branches b ON i.branch_id = b.id
LEFT JOIN (
SELECT inventory_id, SUM(quantity) AS stock
FROM inventory_operations GROUP BY inventory_id
) s ON s.inventory_id = i.id
WHERE i.is_active = true
ORDER BY i.part_number, b.name
""")
# Group by part_number for comparison
by_part = {}
for r in cur.fetchall():
pn = r[1]
if pn not in by_part:
by_part[pn] = {'part_number': pn, 'name': r[2], 'brand': r[3], 'branches': []}
by_part[pn]['branches'].append({
'inventory_id': r[0], 'branch_id': r[4],
'branch_name': r[5], 'stock': r[6]
})
cur.close(); conn.close()
items = list(by_part.values())
return jsonify({'data': items, 'count': len(items)})
# ─── Categories and Brands ─────────────────────
@inventory_bp.route('/categories', methods=['GET'])
@require_auth('inventory.view')
def list_categories():
"""Get distinct categories from inventory."""
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
cur.execute("""
SELECT DISTINCT category_id FROM inventory
WHERE is_active = true AND category_id IS NOT NULL
ORDER BY category_id
""")
categories = [r[0] for r in cur.fetchall()]
cur.close(); conn.close()
return jsonify({'data': categories})
@inventory_bp.route('/brands', methods=['GET'])
@require_auth('inventory.view')
def list_brands():
"""Get distinct part manufacturer brands from inventory.
NOTE: These are PART manufacturers (Bosch, NGK, Monroe), not vehicle brands.
Vehicle compatibility is stored in the vehicle_compatibility JSON field and
searched via the vehicle_brand parameter on the catalog search endpoint.
"""
conn = get_tenant_conn(g.tenant_id)
cur = conn.cursor()
cur.execute("""
SELECT DISTINCT brand FROM inventory
WHERE is_active = true AND brand IS NOT NULL AND brand != ''
ORDER BY brand
""")
brands = [r[0] for r in cur.fetchall()]
cur.close(); conn.close()
return jsonify({'data': brands})
# ─── Barcode ───────────────────────────────────
@inventory_bp.route('/generate-barcode', methods=['POST'])
@require_auth('inventory.create')
def api_generate_barcode():
"""Generate a new internal barcode."""
conn = get_tenant_conn(g.tenant_id)
from tenant_db import get_master_conn
mconn = get_master_conn()
mcur = mconn.cursor()
mcur.execute("SELECT db_name FROM tenants WHERE id = %s", (g.tenant_id,))
db_name = mcur.fetchone()[0]
mcur.close(); mconn.close()
barcode = generate_barcode(conn, db_name)
conn.close()
return jsonify({'barcode': barcode})