Files
Autoparts-DB/pos/middleware.py
consultoria-as a236187f3a feat: MercadoLibre integration + inventory bulk publish + WhatsApp bridge fixes
- Add MercadoLibre OAuth, listings, orders, webhooks and category search
- New marketplace_external_bp.py, meli_service.py, marketplace_external_service.py
- New marketplace_external.html/js with ML management UI
- Inventory: bulk publish to ML with category autocomplete, listing type and shipping selectors
- Inventory: new .btn--meli styles, select/label CSS fixes
- WhatsApp bridge: rate limiting, 440/515/408 error handling, stale watchdog
- DB migration v3.4_meli_integration.sql for marketplace_listings, orders, sync_queue
- Add Celery tasks for ML sync and webhook processing
- Sidebar: MercadoLibre navigation link
2026-05-26 04:24:07 +00:00

58 lines
2.3 KiB
Python

# /home/Autopartes/pos/middleware.py
"""Auth middleware for POS: JWT validation + tenant resolution + permission checks."""
import jwt
from functools import wraps
from flask import request, jsonify, g
from config import JWT_SECRET
def require_auth(*required_permissions):
"""Decorator: validate JWT, resolve tenant, optionally check permissions.
Usage:
@require_auth() # any authenticated employee
@require_auth('pos.sell') # needs specific permission
@require_auth('pos.sell', 'pos.discount') # needs ALL listed permissions
"""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return jsonify({'error': 'Token required'}), 401
try:
payload = jwt.decode(auth_header[7:], JWT_SECRET, algorithms=['HS256'])
except jwt.ExpiredSignatureError:
return jsonify({'error': 'Token expired'}), 401
except jwt.InvalidTokenError:
return jsonify({'error': 'Invalid token'}), 401
if payload.get('type') not in ('pos_access', 'access'):
return jsonify({'error': 'Invalid token type'}), 401
g.tenant_id = payload['tenant_id']
g.employee_id = payload['employee_id']
g.employee_role = payload['role']
g.employee_name = payload['name']
g.branch_id = payload.get('branch_id')
g.permissions = set(payload.get('permissions', []))
g.device_id = request.headers.get('X-Device-Id', 'unknown')
# Check permissions
if required_permissions:
missing = set(required_permissions) - g.permissions
# owner role bypasses all permission checks
if g.employee_role != 'owner' and missing:
return jsonify({'error': f'Missing permissions: {", ".join(missing)}'}), 403
return f(*args, **kwargs)
return decorated
return decorator
def has_permission(permission):
"""Check if current user has a specific permission. Use inside a route."""
return g.employee_role == 'owner' or permission in g.permissions