feat(pos): add PIN auth with JWT, rate limiting, and permission middleware
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
57
pos/middleware.py
Normal file
57
pos/middleware.py
Normal file
@@ -0,0 +1,57 @@
|
||||
# /home/Autopartes/pos/middleware.py
|
||||
"""Auth middleware for POS: JWT validation + tenant resolution + permission checks."""
|
||||
|
||||
import jwt
|
||||
from functools import wraps
|
||||
from flask import request, jsonify, g
|
||||
from config import JWT_SECRET
|
||||
|
||||
|
||||
def require_auth(*required_permissions):
|
||||
"""Decorator: validate JWT, resolve tenant, optionally check permissions.
|
||||
|
||||
Usage:
|
||||
@require_auth() # any authenticated employee
|
||||
@require_auth('pos.sell') # needs specific permission
|
||||
@require_auth('pos.sell', 'pos.discount') # needs ALL listed permissions
|
||||
"""
|
||||
def decorator(f):
|
||||
@wraps(f)
|
||||
def decorated(*args, **kwargs):
|
||||
auth_header = request.headers.get('Authorization', '')
|
||||
if not auth_header.startswith('Bearer '):
|
||||
return jsonify({'error': 'Token required'}), 401
|
||||
|
||||
try:
|
||||
payload = jwt.decode(auth_header[7:], JWT_SECRET, algorithms=['HS256'])
|
||||
except jwt.ExpiredSignatureError:
|
||||
return jsonify({'error': 'Token expired'}), 401
|
||||
except jwt.InvalidTokenError:
|
||||
return jsonify({'error': 'Invalid token'}), 401
|
||||
|
||||
if payload.get('type') != 'pos_access':
|
||||
return jsonify({'error': 'Invalid token type'}), 401
|
||||
|
||||
g.tenant_id = payload['tenant_id']
|
||||
g.employee_id = payload['employee_id']
|
||||
g.employee_role = payload['role']
|
||||
g.employee_name = payload['name']
|
||||
g.branch_id = payload.get('branch_id')
|
||||
g.permissions = set(payload.get('permissions', []))
|
||||
g.device_id = request.headers.get('X-Device-Id', 'unknown')
|
||||
|
||||
# Check permissions
|
||||
if required_permissions:
|
||||
missing = set(required_permissions) - g.permissions
|
||||
# owner role bypasses all permission checks
|
||||
if g.employee_role != 'owner' and missing:
|
||||
return jsonify({'error': f'Missing permissions: {", ".join(missing)}'}), 403
|
||||
|
||||
return f(*args, **kwargs)
|
||||
return decorated
|
||||
return decorator
|
||||
|
||||
|
||||
def has_permission(permission):
|
||||
"""Check if current user has a specific permission. Use inside a route."""
|
||||
return g.employee_role == 'owner' or permission in g.permissions
|
||||
Reference in New Issue
Block a user