- Add MercadoLibre OAuth, listings, orders, webhooks and category search - New marketplace_external_bp.py, meli_service.py, marketplace_external_service.py - New marketplace_external.html/js with ML management UI - Inventory: bulk publish to ML with category autocomplete, listing type and shipping selectors - Inventory: new .btn--meli styles, select/label CSS fixes - WhatsApp bridge: rate limiting, 440/515/408 error handling, stale watchdog - DB migration v3.4_meli_integration.sql for marketplace_listings, orders, sync_queue - Add Celery tasks for ML sync and webhook processing - Sidebar: MercadoLibre navigation link
58 lines
2.3 KiB
Python
58 lines
2.3 KiB
Python
# /home/Autopartes/pos/middleware.py
|
|
"""Auth middleware for POS: JWT validation + tenant resolution + permission checks."""
|
|
|
|
import jwt
|
|
from functools import wraps
|
|
from flask import request, jsonify, g
|
|
from config import JWT_SECRET
|
|
|
|
|
|
def require_auth(*required_permissions):
|
|
"""Decorator: validate JWT, resolve tenant, optionally check permissions.
|
|
|
|
Usage:
|
|
@require_auth() # any authenticated employee
|
|
@require_auth('pos.sell') # needs specific permission
|
|
@require_auth('pos.sell', 'pos.discount') # needs ALL listed permissions
|
|
"""
|
|
def decorator(f):
|
|
@wraps(f)
|
|
def decorated(*args, **kwargs):
|
|
auth_header = request.headers.get('Authorization', '')
|
|
if not auth_header.startswith('Bearer '):
|
|
return jsonify({'error': 'Token required'}), 401
|
|
|
|
try:
|
|
payload = jwt.decode(auth_header[7:], JWT_SECRET, algorithms=['HS256'])
|
|
except jwt.ExpiredSignatureError:
|
|
return jsonify({'error': 'Token expired'}), 401
|
|
except jwt.InvalidTokenError:
|
|
return jsonify({'error': 'Invalid token'}), 401
|
|
|
|
if payload.get('type') not in ('pos_access', 'access'):
|
|
return jsonify({'error': 'Invalid token type'}), 401
|
|
|
|
g.tenant_id = payload['tenant_id']
|
|
g.employee_id = payload['employee_id']
|
|
g.employee_role = payload['role']
|
|
g.employee_name = payload['name']
|
|
g.branch_id = payload.get('branch_id')
|
|
g.permissions = set(payload.get('permissions', []))
|
|
g.device_id = request.headers.get('X-Device-Id', 'unknown')
|
|
|
|
# Check permissions
|
|
if required_permissions:
|
|
missing = set(required_permissions) - g.permissions
|
|
# owner role bypasses all permission checks
|
|
if g.employee_role != 'owner' and missing:
|
|
return jsonify({'error': f'Missing permissions: {", ".join(missing)}'}), 403
|
|
|
|
return f(*args, **kwargs)
|
|
return decorated
|
|
return decorator
|
|
|
|
|
|
def has_permission(permission):
|
|
"""Check if current user has a specific permission. Use inside a route."""
|
|
return g.employee_role == 'owner' or permission in g.permissions
|