diff --git a/pos/app.py b/pos/app.py index 1728a7a..0fa19e5 100644 --- a/pos/app.py +++ b/pos/app.py @@ -3,6 +3,11 @@ from flask import Flask def create_app(): app = Flask(__name__) + # Register blueprints + from blueprints.auth_bp import auth_bp + app.register_blueprint(auth_bp) + + # Health check @app.route('/pos/health') def health(): return {'status': 'ok'} diff --git a/pos/blueprints/auth_bp.py b/pos/blueprints/auth_bp.py new file mode 100644 index 0000000..3dd1870 --- /dev/null +++ b/pos/blueprints/auth_bp.py @@ -0,0 +1,178 @@ +# /home/Autopartes/pos/blueprints/auth_bp.py +"""Auth blueprint: PIN login, JWT tokens, session management.""" + +import jwt +import bcrypt +import time +from datetime import datetime, timezone, timedelta +from flask import Blueprint, request, jsonify +from config import JWT_SECRET, JWT_ACCESS_EXPIRES, PIN_MAX_ATTEMPTS_PER_MINUTE, PIN_LOCKOUT_THRESHOLD, PIN_LOCKOUT_MINUTES +from tenant_db import get_tenant_conn, get_master_conn + +auth_bp = Blueprint('auth', __name__, url_prefix='/pos/api/auth') + +# In-memory rate limiting (per device) +_pin_attempts = {} # device_id -> [(timestamp, success)] + + +def _check_rate_limit(device_id): + """Check PIN rate limit. Returns (allowed, message).""" + now = time.time() + attempts = _pin_attempts.get(device_id, []) + + # Clean old attempts (older than lockout period) + cutoff = now - (PIN_LOCKOUT_MINUTES * 60) + attempts = [a for a in attempts if a[0] > cutoff] + _pin_attempts[device_id] = attempts + + # Check lockout + failed_count = sum(1 for a in attempts if not a[1]) + if failed_count >= PIN_LOCKOUT_THRESHOLD: + return False, f'Dispositivo bloqueado. Intente en {PIN_LOCKOUT_MINUTES} minutos.' + + # Check per-minute rate + one_min_ago = now - 60 + recent = sum(1 for a in attempts if a[0] > one_min_ago and not a[1]) + if recent >= PIN_MAX_ATTEMPTS_PER_MINUTE: + return False, 'Demasiados intentos. Espere un momento.' + + return True, '' + + +def _record_attempt(device_id, success): + """Record a PIN attempt.""" + if device_id not in _pin_attempts: + _pin_attempts[device_id] = [] + _pin_attempts[device_id].append((time.time(), success)) + + +@auth_bp.route('/login', methods=['POST']) +def login_pin(): + """Login with tenant_id + PIN + device_id.""" + data = request.get_json() or {} + tenant_id = data.get('tenant_id') + pin = data.get('pin', '') + device_id = data.get('device_id', request.headers.get('X-Device-Id', 'unknown')) + # Optional: branch_id from the device for PIN search optimization + device_branch_id = data.get('branch_id') + + if not tenant_id or not pin: + return jsonify({'error': 'tenant_id and pin required'}), 400 + + # Rate limit check + allowed, msg = _check_rate_limit(device_id) + if not allowed: + return jsonify({'error': msg}), 429 + + try: + conn = get_tenant_conn(tenant_id) + except ValueError: + return jsonify({'error': 'Tenant not found'}), 404 + + cur = conn.cursor() + + # PERFORMANCE NOTE: This PIN check is O(n) over active employees because PINs are + # hashed and cannot be looked up directly. For most tenants (<100 employees) this is + # fine. If a tenant has hundreds of employees, consider: + # 1. Adding a PIN prefix index (first 2 digits stored as a non-secret hint column) + # 2. Caching active employee count and alerting if >200 + # + # Short-circuit optimization: if the device is bound to a branch (branch_id known), + # query that branch's employees first to reduce the bcrypt comparison space. + + matched_employee = None + + if device_branch_id: + # Try branch employees first (fast path for known devices) + cur.execute(""" + SELECT e.id, e.name, e.pin, e.role, e.branch_id, e.max_discount_pct + FROM employees e + WHERE e.is_active = true AND e.pin IS NOT NULL AND e.branch_id = %s + """, (device_branch_id,)) + for emp in cur.fetchall(): + emp_id, emp_name, emp_pin_hash, emp_role, emp_branch, emp_discount = emp + if emp_pin_hash and bcrypt.checkpw(pin.encode(), emp_pin_hash.encode()): + matched_employee = { + 'id': emp_id, 'name': emp_name, 'role': emp_role, + 'branch_id': emp_branch, 'max_discount_pct': float(emp_discount) if emp_discount else 0 + } + break + + if not matched_employee: + # Fallback: check ALL active employees (covers owners, admins, roaming staff) + cur.execute(""" + SELECT e.id, e.name, e.pin, e.role, e.branch_id, e.max_discount_pct + FROM employees e + WHERE e.is_active = true AND e.pin IS NOT NULL + """) + employees = cur.fetchall() + + for emp in employees: + emp_id, emp_name, emp_pin_hash, emp_role, emp_branch, emp_discount = emp + if emp_pin_hash and bcrypt.checkpw(pin.encode(), emp_pin_hash.encode()): + matched_employee = { + 'id': emp_id, 'name': emp_name, 'role': emp_role, + 'branch_id': emp_branch, 'max_discount_pct': float(emp_discount) if emp_discount else 0 + } + break + + if not matched_employee: + _record_attempt(device_id, False) + cur.close() + conn.close() + return jsonify({'error': 'PIN incorrecto'}), 401 + + _record_attempt(device_id, True) + + # Get permissions + cur.execute( + "SELECT permission FROM employee_permissions WHERE employee_id = %s", + (matched_employee['id'],) + ) + permissions = [r[0] for r in cur.fetchall()] + + cur.close() + conn.close() + + # Generate JWT + payload = { + 'tenant_id': tenant_id, + 'employee_id': matched_employee['id'], + 'name': matched_employee['name'], + 'role': matched_employee['role'], + 'branch_id': matched_employee['branch_id'], + 'max_discount_pct': matched_employee['max_discount_pct'], + 'permissions': permissions, + 'device_id': device_id, + 'type': 'pos_access', + 'exp': datetime.now(timezone.utc) + timedelta(seconds=JWT_ACCESS_EXPIRES), + 'iat': datetime.now(timezone.utc), + } + token = jwt.encode(payload, JWT_SECRET, algorithm='HS256') + + return jsonify({ + 'token': token, + 'employee': matched_employee, + 'permissions': permissions + }) + + +@auth_bp.route('/me', methods=['GET']) +def auth_me(): + """Get current employee info from token.""" + auth_header = request.headers.get('Authorization', '') + if not auth_header.startswith('Bearer '): + return jsonify({'error': 'Token required'}), 401 + + try: + payload = jwt.decode(auth_header[7:], JWT_SECRET, algorithms=['HS256']) + return jsonify({ + 'employee_id': payload['employee_id'], + 'name': payload['name'], + 'role': payload['role'], + 'tenant_id': payload['tenant_id'], + 'branch_id': payload.get('branch_id'), + 'permissions': payload.get('permissions', []) + }) + except jwt.InvalidTokenError: + return jsonify({'error': 'Invalid token'}), 401 diff --git a/pos/middleware.py b/pos/middleware.py new file mode 100644 index 0000000..15df096 --- /dev/null +++ b/pos/middleware.py @@ -0,0 +1,57 @@ +# /home/Autopartes/pos/middleware.py +"""Auth middleware for POS: JWT validation + tenant resolution + permission checks.""" + +import jwt +from functools import wraps +from flask import request, jsonify, g +from config import JWT_SECRET + + +def require_auth(*required_permissions): + """Decorator: validate JWT, resolve tenant, optionally check permissions. + + Usage: + @require_auth() # any authenticated employee + @require_auth('pos.sell') # needs specific permission + @require_auth('pos.sell', 'pos.discount') # needs ALL listed permissions + """ + def decorator(f): + @wraps(f) + def decorated(*args, **kwargs): + auth_header = request.headers.get('Authorization', '') + if not auth_header.startswith('Bearer '): + return jsonify({'error': 'Token required'}), 401 + + try: + payload = jwt.decode(auth_header[7:], JWT_SECRET, algorithms=['HS256']) + except jwt.ExpiredSignatureError: + return jsonify({'error': 'Token expired'}), 401 + except jwt.InvalidTokenError: + return jsonify({'error': 'Invalid token'}), 401 + + if payload.get('type') != 'pos_access': + return jsonify({'error': 'Invalid token type'}), 401 + + g.tenant_id = payload['tenant_id'] + g.employee_id = payload['employee_id'] + g.employee_role = payload['role'] + g.employee_name = payload['name'] + g.branch_id = payload.get('branch_id') + g.permissions = set(payload.get('permissions', [])) + g.device_id = request.headers.get('X-Device-Id', 'unknown') + + # Check permissions + if required_permissions: + missing = set(required_permissions) - g.permissions + # owner role bypasses all permission checks + if g.employee_role != 'owner' and missing: + return jsonify({'error': f'Missing permissions: {", ".join(missing)}'}), 403 + + return f(*args, **kwargs) + return decorated + return decorator + + +def has_permission(permission): + """Check if current user has a specific permission. Use inside a route.""" + return g.employee_role == 'owner' or permission in g.permissions