179 lines
6.4 KiB
Python
179 lines
6.4 KiB
Python
# /home/Autopartes/pos/blueprints/auth_bp.py
|
|
"""Auth blueprint: PIN login, JWT tokens, session management."""
|
|
|
|
import logging
import time
from datetime import datetime, timezone, timedelta

import bcrypt
import jwt
from flask import Blueprint, request, jsonify

from config import JWT_SECRET, JWT_ACCESS_EXPIRES, PIN_MAX_ATTEMPTS_PER_MINUTE, PIN_LOCKOUT_THRESHOLD, PIN_LOCKOUT_MINUTES
from tenant_db import get_tenant_conn, get_master_conn
|
|
|
|
# Blueprint grouping the POS auth endpoints; its routes are registered
# under the /pos/api/auth URL prefix.
auth_bp = Blueprint(
    'auth',
    __name__,
    url_prefix='/pos/api/auth',
)

# In-memory PIN attempt log used for rate limiting (module-level, so it is
# local to this process): device_id -> list of (unix_timestamp, success).
_pin_attempts = dict()
|
|
|
|
|
|
def _check_rate_limit(device_id):
    """Decide whether a device may attempt another PIN login.

    The device's recorded attempts are first pruned to the lockout window
    (the last ``PIN_LOCKOUT_MINUTES`` minutes). Two limits are then applied
    to the *failed* attempts that remain:

    * lockout: ``PIN_LOCKOUT_THRESHOLD`` or more failures inside the window;
    * burst: ``PIN_MAX_ATTEMPTS_PER_MINUTE`` or more failures in the last
      60 seconds.

    Successful attempts are stored alongside failures but never count
    against either limit.

    Args:
        device_id: Key identifying the device in ``_pin_attempts``.

    Returns:
        A ``(allowed, message)`` tuple. ``message`` is an empty string when
        the attempt is allowed, otherwise the user-facing reason.
    """
    now = time.time()
    window_start = now - (PIN_LOCKOUT_MINUTES * 60)

    # Keep only attempts that are still inside the lockout window.
    attempts = [a for a in _pin_attempts.get(device_id, ()) if a[0] > window_start]
    if attempts:
        _pin_attempts[device_id] = attempts
    else:
        # Never store an empty list: device ids are supplied by the client,
        # so keeping one entry per id ever seen would grow without bound.
        _pin_attempts.pop(device_id, None)

    failed_times = [ts for ts, success in attempts if not success]

    # Lockout: too many failures within the whole window.
    if len(failed_times) >= PIN_LOCKOUT_THRESHOLD:
        return False, f'Dispositivo bloqueado. Intente en {PIN_LOCKOUT_MINUTES} minutos.'

    # Burst limit: too many failures within the last minute.
    one_min_ago = now - 60
    recent_failures = sum(1 for ts in failed_times if ts > one_min_ago)
    if recent_failures >= PIN_MAX_ATTEMPTS_PER_MINUTE:
        return False, 'Demasiados intentos. Espere un momento.'

    return True, ''
|
|
|
|
|
|
def _record_attempt(device_id, success):
    """Append a ``(timestamp, success)`` entry to the device's attempt log.

    Creates the device's list in ``_pin_attempts`` on first use.
    """
    entry = (time.time(), success)
    _pin_attempts.setdefault(device_id, []).append(entry)
|
|
|
|
|
|
_logger = logging.getLogger(__name__)

# Base query for PIN candidates. _match_employee_by_pin unpacks rows in
# exactly this column order.
_EMPLOYEE_PIN_QUERY = """
    SELECT e.id, e.name, e.pin, e.role, e.branch_id, e.max_discount_pct
    FROM employees e
    WHERE e.is_active = true AND e.pin IS NOT NULL
"""


def _match_employee_by_pin(rows, pin_bytes, skip_ids=frozenset()):
    """Return the first employee in *rows* whose stored bcrypt hash matches.

    Args:
        rows: Iterable of ``(id, name, pin_hash, role, branch_id,
            max_discount_pct)`` tuples, as selected by ``_EMPLOYEE_PIN_QUERY``.
        pin_bytes: The submitted PIN, already encoded to bytes.
        skip_ids: Employee ids that were already checked and must not be
            hashed again.

    Returns:
        A dict with ``id``, ``name``, ``role``, ``branch_id`` and
        ``max_discount_pct`` for the first match, or ``None``. If several
        employees share a PIN, the first row returned by the database wins.
    """
    for emp_id, emp_name, pin_hash, emp_role, emp_branch, emp_discount in rows:
        if not pin_hash or emp_id in skip_ids:
            continue
        try:
            is_match = bcrypt.checkpw(pin_bytes, pin_hash.encode())
        except ValueError:
            # bcrypt refuses inputs it cannot process (for example a stored
            # value that is not a valid bcrypt hash). Skip that row instead
            # of failing the login for every other employee.
            _logger.warning(
                'bcrypt rejected the PIN check for employee id=%s; skipping', emp_id
            )
            continue
        if is_match:
            return {
                'id': emp_id,
                'name': emp_name,
                'role': emp_role,
                'branch_id': emp_branch,
                'max_discount_pct': float(emp_discount) if emp_discount else 0,
            }
    return None


@auth_bp.route('/login', methods=['POST'])
def login_pin():
    """Login with tenant_id + PIN + device_id.

    Expects a JSON object with ``tenant_id`` and ``pin``; ``device_id``
    (falling back to the ``X-Device-Id`` header, then ``'unknown'``) and
    ``branch_id`` are optional.

    Responses:
        200: ``{'token', 'employee', 'permissions'}`` on a matching PIN.
        400: missing/invalid ``tenant_id``, ``pin`` or ``device_id``.
        401: no active employee has that PIN.
        404: ``get_tenant_conn`` raised ``ValueError`` for the tenant.
        429: the device is rate limited or locked out.
    """
    # silent=True turns a missing or malformed JSON body into None, so the
    # caller gets the JSON 400 below rather than a framework error page.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    tenant_id = data.get('tenant_id')
    pin = data.get('pin', '')
    device_id = data.get('device_id', request.headers.get('X-Device-Id', 'unknown'))
    # Optional: branch_id from the device for PIN search optimization
    device_branch_id = data.get('branch_id')

    if not tenant_id or not pin:
        return jsonify({'error': 'tenant_id and pin required'}), 400

    # Clients may send the PIN as a JSON number; anything else that is not a
    # string cannot be encoded for bcrypt.
    if isinstance(pin, int) and not isinstance(pin, bool):
        pin = str(pin)
    if not isinstance(pin, str):
        return jsonify({'error': 'pin must be a string'}), 400

    # device_id is used as a dict key by the rate limiter, so it must be hashable.
    if isinstance(device_id, (dict, list)):
        return jsonify({'error': 'device_id must be a string'}), 400

    # Rate limit check
    allowed, msg = _check_rate_limit(device_id)
    if not allowed:
        return jsonify({'error': msg}), 429

    try:
        conn = get_tenant_conn(tenant_id)
    except ValueError:
        return jsonify({'error': 'Tenant not found'}), 404

    pin_bytes = pin.encode()

    # try/finally guarantees the cursor and connection are released even when
    # a query or a bcrypt check raises.
    try:
        cur = conn.cursor()
        try:
            # PERFORMANCE NOTE: the PIN check is O(n) over active employees
            # because PINs are hashed and cannot be looked up directly. For
            # most tenants (<100 employees) this is fine. If a tenant has
            # hundreds of employees, consider:
            #   1. Adding a PIN prefix index (first 2 digits stored as a
            #      non-secret hint column)
            #   2. Caching active employee count and alerting if >200
            matched_employee = None
            checked_ids = set()

            if device_branch_id:
                # Fast path: try the employees of the device's branch first
                # to reduce the bcrypt comparison space.
                cur.execute(
                    _EMPLOYEE_PIN_QUERY + " AND e.branch_id = %s",
                    (device_branch_id,),
                )
                branch_rows = cur.fetchall()
                matched_employee = _match_employee_by_pin(branch_rows, pin_bytes)
                checked_ids = {row[0] for row in branch_rows}

            if not matched_employee:
                # Fallback: check ALL active employees (covers owners,
                # admins, roaming staff), skipping those already hashed in
                # the branch pass.
                cur.execute(_EMPLOYEE_PIN_QUERY)
                matched_employee = _match_employee_by_pin(
                    cur.fetchall(), pin_bytes, skip_ids=checked_ids
                )

            if not matched_employee:
                _record_attempt(device_id, False)
                return jsonify({'error': 'PIN incorrecto'}), 401

            _record_attempt(device_id, True)

            # Get permissions
            cur.execute(
                "SELECT permission FROM employee_permissions WHERE employee_id = %s",
                (matched_employee['id'],)
            )
            permissions = [r[0] for r in cur.fetchall()]
        finally:
            cur.close()
    finally:
        conn.close()

    # Generate JWT; read the clock once so 'iat' and 'exp' are consistent.
    issued_at = datetime.now(timezone.utc)
    payload = {
        'tenant_id': tenant_id,
        'employee_id': matched_employee['id'],
        'name': matched_employee['name'],
        'role': matched_employee['role'],
        'branch_id': matched_employee['branch_id'],
        'max_discount_pct': matched_employee['max_discount_pct'],
        'permissions': permissions,
        'device_id': device_id,
        'type': 'pos_access',
        'exp': issued_at + timedelta(seconds=JWT_ACCESS_EXPIRES),
        'iat': issued_at,
    }
    token = jwt.encode(payload, JWT_SECRET, algorithm='HS256')

    return jsonify({
        'token': token,
        'employee': matched_employee,
        'permissions': permissions
    })
|
|
|
|
|
|
@auth_bp.route('/me', methods=['GET'])
def auth_me():
    """Get current employee info from token.

    Reads a ``Bearer`` token from the ``Authorization`` header, verifies it
    with ``JWT_SECRET`` (HS256) and echoes the identity claims it carries.

    Responses:
        200: ``employee_id``, ``name``, ``role``, ``tenant_id``,
            ``branch_id`` and ``permissions`` taken from the token.
        401: header missing / not a Bearer token, token fails verification,
            or a required claim is absent.
    """
    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        return jsonify({'error': 'Token required'}), 401

    try:
        # Strip the 7-character 'Bearer ' prefix before decoding.
        payload = jwt.decode(auth_header[7:], JWT_SECRET, algorithms=['HS256'])
        identity = {
            'employee_id': payload['employee_id'],
            'name': payload['name'],
            'role': payload['role'],
            'tenant_id': payload['tenant_id'],
            'branch_id': payload.get('branch_id'),
            'permissions': payload.get('permissions', []),
        }
    except (jwt.InvalidTokenError, KeyError):
        # KeyError: the signature is valid but a required claim is missing,
        # so the token is not usable here; answer 401 instead of a 500.
        return jsonify({'error': 'Invalid token'}), 401

    return jsonify(identity)
|