Files
Autoparts-DB/pos/blueprints/auth_bp.py
consultoria-as 6628f2deef feat: subdomain routing por tenant — refac-xxx.nexusautoparts.com
- Nginx wildcard config: *.nexusautoparts.com routes to POS app with X-Tenant-Subdomain header
- middleware_tenant.py: resolves subdomain -> tenant_id via nexus_master.tenants.subdomain
- auth_bp: login and employee list endpoints accept tenant from subdomain, URL param, or body
- login.html: auto-detects tenant from subdomain, shows business name, falls back to ?tenant=ID
- tenant_manager: generates subdomain slug from business name on provision_tenant()
- Migration v1.2: adds subdomain column + unique index to tenants table
- setup-nginx.sh: one-command install script for the nginx config

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 07:16:49 +00:00

232 lines
8.2 KiB
Python

# /home/Autopartes/pos/blueprints/auth_bp.py
"""Auth blueprint: PIN login, JWT tokens, session management."""
import jwt
import bcrypt
import time
from datetime import datetime, timezone, timedelta
from flask import Blueprint, request, jsonify, g
from config import JWT_SECRET, JWT_ACCESS_EXPIRES, PIN_MAX_ATTEMPTS_PER_MINUTE, PIN_LOCKOUT_THRESHOLD, PIN_LOCKOUT_MINUTES
from tenant_db import get_tenant_conn, get_master_conn
# Blueprint for the POS auth endpoints; every route in this module is served under /pos/api/auth.
auth_bp = Blueprint('auth', __name__, url_prefix='/pos/api/auth')
# In-memory rate limiting (per device).
# Module-level dict, so the attempt history is local to this Python process.
_pin_attempts = {} # device_id -> [(timestamp, success)]
def _check_rate_limit(device_id):
    """Decide whether a device may attempt another PIN login.

    Attempts older than the lockout window are pruned first. Two limits are
    then applied to the *failed* attempts that remain:

    * lockout: ``PIN_LOCKOUT_THRESHOLD`` or more failures within the last
      ``PIN_LOCKOUT_MINUTES`` minutes;
    * throttle: ``PIN_MAX_ATTEMPTS_PER_MINUTE`` or more failures within the
      last 60 seconds.

    Successful attempts are stored next to failures but never count against
    either limit, and they do not reset the failure count.

    Args:
        device_id: Key identifying the device in ``_pin_attempts``.

    Returns:
        A ``(allowed, message)`` tuple. ``message`` is the user-facing reason
        when ``allowed`` is False, and ``''`` otherwise.
    """
    now = time.time()
    # Clean old attempts (older than the lockout period)
    cutoff = now - (PIN_LOCKOUT_MINUTES * 60)
    attempts = [a for a in _pin_attempts.get(device_id, []) if a[0] > cutoff]
    if attempts:
        _pin_attempts[device_id] = attempts
    else:
        # Never keep an empty history: device_id is supplied by the client,
        # so storing a list per checked id would let the table grow forever.
        _pin_attempts.pop(device_id, None)

    failure_times = [ts for ts, ok in attempts if not ok]
    # Check lockout
    if len(failure_times) >= PIN_LOCKOUT_THRESHOLD:
        return False, f'Dispositivo bloqueado. Intente en {PIN_LOCKOUT_MINUTES} minutos.'
    # Check per-minute rate
    one_min_ago = now - 60
    recent = sum(1 for ts in failure_times if ts > one_min_ago)
    if recent >= PIN_MAX_ATTEMPTS_PER_MINUTE:
        return False, 'Demasiados intentos. Espere un momento.'
    return True, ''
def _record_attempt(device_id, success):
    """Append a timestamped PIN attempt outcome to the device's history.

    Args:
        device_id: Key identifying the device in ``_pin_attempts``.
        success: Whether the PIN attempt matched an employee.
    """
    entry = (time.time(), success)
    # setdefault creates the per-device list on first use.
    _pin_attempts.setdefault(device_id, []).append(entry)
def _match_employee_pin(rows, pin_bytes, skip_ids=frozenset()):
    """Return the first employee in ``rows`` whose bcrypt hash matches the PIN.

    Args:
        rows: Iterable of ``(id, name, pin_hash, role, branch_id,
            max_discount_pct)`` tuples, in the column order selected by
            ``login_pin``.
        pin_bytes: The submitted PIN, already encoded to bytes.
        skip_ids: Employee ids that were already compared; they are skipped
            so the bcrypt check is not repeated for them.

    Returns:
        A dict with ``id``, ``name``, ``role``, ``branch_id`` and
        ``max_discount_pct`` for the matched employee, or ``None``.
    """
    for emp_id, emp_name, emp_pin_hash, emp_role, emp_branch, emp_discount in rows:
        if emp_id in skip_ids or not emp_pin_hash:
            continue
        if bcrypt.checkpw(pin_bytes, emp_pin_hash.encode()):
            return {
                'id': emp_id, 'name': emp_name, 'role': emp_role,
                'branch_id': emp_branch,
                'max_discount_pct': float(emp_discount) if emp_discount else 0,
            }
    return None


@auth_bp.route('/login', methods=['POST'])
def login_pin():
    """Login with tenant_id + PIN + device_id.

    tenant_id can come from:
    1. Subdomain (resolved by middleware_tenant into g.tenant_id)
    2. POST body tenant_id field
    3. Both (subdomain takes precedence)

    Optional body fields: ``device_id`` (falls back to the ``X-Device-Id``
    header, then ``'unknown'``) and ``branch_id`` (narrows the first PIN
    search to that branch).

    Responses:
        200: ``{'token', 'employee', 'permissions'}`` on success.
        400: tenant_id or pin missing / not usable.
        401: no active employee has that PIN.
        404: ``get_tenant_conn`` raised ``ValueError`` for the tenant.
        429: the device is throttled or locked out.

    NOTE: the rate limit is keyed by a client-supplied device_id.
    """
    # silent=True: a missing or malformed JSON body falls through to the 400
    # below; a non-object body (e.g. a JSON list) is treated as empty.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    # Subdomain-resolved tenant takes priority over body param
    tenant_id = getattr(g, 'tenant_id', None) or data.get('tenant_id')
    pin = data.get('pin', '')
    if isinstance(pin, int) and not isinstance(pin, bool):
        # Clients may send the PIN as a JSON number; bcrypt needs text.
        pin = str(pin)
    device_id = data.get('device_id', request.headers.get('X-Device-Id', 'unknown'))
    if isinstance(device_id, (list, dict)):
        # JSON arrays/objects are unhashable and would crash the rate-limit lookup.
        device_id = 'unknown'
    # Optional: branch_id from the device for PIN search optimization
    device_branch_id = data.get('branch_id')
    if not tenant_id or not pin or not isinstance(pin, str):
        return jsonify({'error': 'tenant_id and pin required'}), 400
    # Rate limit check
    allowed, msg = _check_rate_limit(device_id)
    if not allowed:
        return jsonify({'error': msg}), 429
    try:
        conn = get_tenant_conn(tenant_id)
    except ValueError:
        return jsonify({'error': 'Tenant not found'}), 404

    pin_bytes = pin.encode()
    cur = None
    try:
        cur = conn.cursor()
        # PERFORMANCE NOTE: This PIN check is O(n) over active employees because PINs are
        # hashed and cannot be looked up directly. For most tenants (<100 employees) this is
        # fine. If a tenant has hundreds of employees, consider:
        # 1. Adding a PIN prefix index (first 2 digits stored as a non-secret hint column)
        # 2. Caching active employee count and alerting if >200
        #
        # Short-circuit optimization: if the device is bound to a branch (branch_id known),
        # query that branch's employees first to reduce the bcrypt comparison space.
        matched_employee = None
        already_checked = set()
        if device_branch_id:
            # Try branch employees first (fast path for known devices)
            cur.execute("""
                SELECT e.id, e.name, e.pin, e.role, e.branch_id, e.max_discount_pct
                FROM employees e
                WHERE e.is_active = true AND e.pin IS NOT NULL AND e.branch_id = %s
            """, (device_branch_id,))
            branch_rows = cur.fetchall()
            matched_employee = _match_employee_pin(branch_rows, pin_bytes)
            already_checked = {row[0] for row in branch_rows}
        if not matched_employee:
            # Fallback: check ALL active employees (covers owners, admins, roaming staff).
            # Branch employees that already failed above are not hashed a second time.
            cur.execute("""
                SELECT e.id, e.name, e.pin, e.role, e.branch_id, e.max_discount_pct
                FROM employees e
                WHERE e.is_active = true AND e.pin IS NOT NULL
            """)
            matched_employee = _match_employee_pin(cur.fetchall(), pin_bytes, already_checked)
        if not matched_employee:
            _record_attempt(device_id, False)
            return jsonify({'error': 'PIN incorrecto'}), 401
        _record_attempt(device_id, True)
        # Get permissions
        cur.execute(
            "SELECT permission FROM employee_permissions WHERE employee_id = %s",
            (matched_employee['id'],)
        )
        permissions = [r[0] for r in cur.fetchall()]
    finally:
        # Always release DB resources, including when bcrypt or a query raises.
        if cur is not None:
            cur.close()
        conn.close()

    # Generate JWT (single clock read so iat and exp are consistent)
    issued_at = datetime.now(timezone.utc)
    payload = {
        'tenant_id': tenant_id,
        'employee_id': matched_employee['id'],
        'name': matched_employee['name'],
        'role': matched_employee['role'],
        'branch_id': matched_employee['branch_id'],
        'max_discount_pct': matched_employee['max_discount_pct'],
        'permissions': permissions,
        'device_id': device_id,
        'type': 'pos_access',
        'exp': issued_at + timedelta(seconds=JWT_ACCESS_EXPIRES),
        'iat': issued_at,
    }
    token = jwt.encode(payload, JWT_SECRET, algorithm='HS256')
    return jsonify({
        'token': token,
        'employee': matched_employee,
        'permissions': permissions
    })
@auth_bp.route('/employees/<int:tenant_id>', methods=['GET'])
@auth_bp.route('/employees', methods=['GET'])
def list_login_employees(tenant_id=None):
    """Public endpoint: list employees for the login screen (names + roles only, no sensitive data).

    tenant_id comes from URL path, subdomain, or ?tenant= param.

    Args:
        tenant_id: Tenant id from the URL path, or ``None`` when the
            path-less route was used.

    Responses:
        200: ``{'data': [{'id', 'name', 'initials', 'role', 'role_label'}]}``
            for active employees that have a PIN, ordered by name.
        400: no tenant could be resolved.
        404: ``get_tenant_conn`` raised ``ValueError`` for the tenant.
    """
    # Resolve tenant_id: URL path > subdomain > query param
    tid = tenant_id or getattr(g, 'tenant_id', None)
    if not tid:
        try:
            tid = int(request.args.get('tenant', 0))
        except (ValueError, TypeError):
            # Non-numeric ?tenant= value: treated as "not specified" below.
            tid = None
    if not tid:
        return jsonify({'error': 'Tenant not specified'}), 400
    try:
        conn = get_tenant_conn(tid)
    except ValueError:
        return jsonify({'error': 'Tenant not found'}), 404

    cur = None
    try:
        cur = conn.cursor()
        cur.execute("""
            SELECT id, name, role FROM employees
            WHERE is_active = true AND pin IS NOT NULL
            ORDER BY name
        """)
        rows = cur.fetchall()
    finally:
        # Always release DB resources, including when the query raises.
        if cur is not None:
            cur.close()
        conn.close()

    # Display labels for known roles; unknown roles fall back to the raw value.
    role_labels = {'owner': 'Dueño', 'admin': 'Administrador', 'cashier': 'Cajero', 'warehouse': 'Almacén', 'accountant': 'Contador'}
    employees = []
    for emp_id, name, role in rows:
        # Guard against a NULL name; initials then fall back to '?'.
        parts = (name or '').split()
        initials = ''.join(p[0].upper() for p in parts[:2]) if parts else '?'
        employees.append({
            'id': emp_id,
            'name': name,
            'initials': initials,
            'role': role,
            'role_label': role_labels.get(role, role)
        })
    return jsonify({'data': employees})
@auth_bp.route('/me', methods=['GET'])
def auth_me():
    """Get current employee info from token.

    Reads a ``Bearer`` token from the ``Authorization`` header, verifies it
    with ``JWT_SECRET`` (HS256) and echoes selected claims.

    Responses:
        200: ``employee_id``, ``name``, ``role``, ``tenant_id``, ``branch_id``
            and ``permissions`` taken from the token payload.
        401: header missing / not a Bearer token, token rejected by
            ``jwt.decode``, or a required claim is absent from the payload.
    """
    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        return jsonify({'error': 'Token required'}), 401
    try:
        # Strip the 7-character 'Bearer ' prefix before decoding.
        payload = jwt.decode(auth_header[7:], JWT_SECRET, algorithms=['HS256'])
    except jwt.InvalidTokenError:
        return jsonify({'error': 'Invalid token'}), 401
    try:
        info = {
            'employee_id': payload['employee_id'],
            'name': payload['name'],
            'role': payload['role'],
            'tenant_id': payload['tenant_id'],
            'branch_id': payload.get('branch_id'),
            'permissions': payload.get('permissions', [])
        }
    except KeyError:
        # Correctly signed token that lacks the expected claims:
        # answer 401 instead of failing with an unhandled KeyError.
        return jsonify({'error': 'Invalid token'}), 401
    return jsonify(info)