# /home/Autopartes/pos/blueprints/auth_bp.py """Auth blueprint: PIN login, JWT tokens, session management.""" import jwt import bcrypt import time from datetime import datetime, timezone, timedelta from flask import Blueprint, request, jsonify from config import JWT_SECRET, JWT_ACCESS_EXPIRES, PIN_MAX_ATTEMPTS_PER_MINUTE, PIN_LOCKOUT_THRESHOLD, PIN_LOCKOUT_MINUTES from tenant_db import get_tenant_conn, get_master_conn auth_bp = Blueprint('auth', __name__, url_prefix='/pos/api/auth') # In-memory rate limiting (per device) _pin_attempts = {} # device_id -> [(timestamp, success)] def _check_rate_limit(device_id): """Check PIN rate limit. Returns (allowed, message).""" now = time.time() attempts = _pin_attempts.get(device_id, []) # Clean old attempts (older than lockout period) cutoff = now - (PIN_LOCKOUT_MINUTES * 60) attempts = [a for a in attempts if a[0] > cutoff] _pin_attempts[device_id] = attempts # Check lockout failed_count = sum(1 for a in attempts if not a[1]) if failed_count >= PIN_LOCKOUT_THRESHOLD: return False, f'Dispositivo bloqueado. Intente en {PIN_LOCKOUT_MINUTES} minutos.' # Check per-minute rate one_min_ago = now - 60 recent = sum(1 for a in attempts if a[0] > one_min_ago and not a[1]) if recent >= PIN_MAX_ATTEMPTS_PER_MINUTE: return False, 'Demasiados intentos. Espere un momento.' return True, '' def _record_attempt(device_id, success): """Record a PIN attempt.""" if device_id not in _pin_attempts: _pin_attempts[device_id] = [] _pin_attempts[device_id].append((time.time(), success)) @auth_bp.route('/login', methods=['POST']) def login_pin(): """Login with tenant_id + PIN + device_id.""" data = request.get_json() or {} tenant_id = data.get('tenant_id') pin = data.get('pin', '') device_id = data.get('device_id', request.headers.get('X-Device-Id', 'unknown')) # Optional: branch_id from the device for PIN search optimization device_branch_id = data.get('branch_id') if not tenant_id or not pin: return jsonify({'error': 'tenant_id and pin required'}), 400 # Rate limit check allowed, msg = _check_rate_limit(device_id) if not allowed: return jsonify({'error': msg}), 429 try: conn = get_tenant_conn(tenant_id) except ValueError: return jsonify({'error': 'Tenant not found'}), 404 cur = conn.cursor() # PERFORMANCE NOTE: This PIN check is O(n) over active employees because PINs are # hashed and cannot be looked up directly. For most tenants (<100 employees) this is # fine. If a tenant has hundreds of employees, consider: # 1. Adding a PIN prefix index (first 2 digits stored as a non-secret hint column) # 2. Caching active employee count and alerting if >200 # # Short-circuit optimization: if the device is bound to a branch (branch_id known), # query that branch's employees first to reduce the bcrypt comparison space. matched_employee = None if device_branch_id: # Try branch employees first (fast path for known devices) cur.execute(""" SELECT e.id, e.name, e.pin, e.role, e.branch_id, e.max_discount_pct FROM employees e WHERE e.is_active = true AND e.pin IS NOT NULL AND e.branch_id = %s """, (device_branch_id,)) for emp in cur.fetchall(): emp_id, emp_name, emp_pin_hash, emp_role, emp_branch, emp_discount = emp if emp_pin_hash and bcrypt.checkpw(pin.encode(), emp_pin_hash.encode()): matched_employee = { 'id': emp_id, 'name': emp_name, 'role': emp_role, 'branch_id': emp_branch, 'max_discount_pct': float(emp_discount) if emp_discount else 0 } break if not matched_employee: # Fallback: check ALL active employees (covers owners, admins, roaming staff) cur.execute(""" SELECT e.id, e.name, e.pin, e.role, e.branch_id, e.max_discount_pct FROM employees e WHERE e.is_active = true AND e.pin IS NOT NULL """) employees = cur.fetchall() for emp in employees: emp_id, emp_name, emp_pin_hash, emp_role, emp_branch, emp_discount = emp if emp_pin_hash and bcrypt.checkpw(pin.encode(), emp_pin_hash.encode()): matched_employee = { 'id': emp_id, 'name': emp_name, 'role': emp_role, 'branch_id': emp_branch, 'max_discount_pct': float(emp_discount) if emp_discount else 0 } break if not matched_employee: _record_attempt(device_id, False) cur.close() conn.close() return jsonify({'error': 'PIN incorrecto'}), 401 _record_attempt(device_id, True) # Get permissions cur.execute( "SELECT permission FROM employee_permissions WHERE employee_id = %s", (matched_employee['id'],) ) permissions = [r[0] for r in cur.fetchall()] cur.close() conn.close() # Generate JWT payload = { 'tenant_id': tenant_id, 'employee_id': matched_employee['id'], 'name': matched_employee['name'], 'role': matched_employee['role'], 'branch_id': matched_employee['branch_id'], 'max_discount_pct': matched_employee['max_discount_pct'], 'permissions': permissions, 'device_id': device_id, 'type': 'pos_access', 'exp': datetime.now(timezone.utc) + timedelta(seconds=JWT_ACCESS_EXPIRES), 'iat': datetime.now(timezone.utc), } token = jwt.encode(payload, JWT_SECRET, algorithm='HS256') return jsonify({ 'token': token, 'employee': matched_employee, 'permissions': permissions }) @auth_bp.route('/me', methods=['GET']) def auth_me(): """Get current employee info from token.""" auth_header = request.headers.get('Authorization', '') if not auth_header.startswith('Bearer '): return jsonify({'error': 'Token required'}), 401 try: payload = jwt.decode(auth_header[7:], JWT_SECRET, algorithms=['HS256']) return jsonify({ 'employee_id': payload['employee_id'], 'name': payload['name'], 'role': payload['role'], 'tenant_id': payload['tenant_id'], 'branch_id': payload.get('branch_id'), 'permissions': payload.get('permissions', []) }) except jwt.InvalidTokenError: return jsonify({'error': 'Invalid token'}), 401