Files
Autoparts-DB/manager/blueprints/auth_bp.py
consultoria-as be4bb8d9ad feat(manager): add Nexus Instance Manager for demo orchestration
- Complete Flask-based control panel for multi-tenant POS instances
- Dashboard with global stats, system health, and recent demos
- Demo provisioning in 1 click with auto-expiration tracking
- Tenant management: activate/deactivate, reset data, delete
- Health monitoring: PostgreSQL, Redis, disk, memory, systemd services
- Migration orchestration UI for running schema updates across all tenants
- JWT authentication with manager_users table
- Dark theme SPA frontend with real-time search and actions
- systemd service file included
2026-05-17 21:01:01 +00:00

100 lines
2.9 KiB
Python

"""Auth blueprint for Nexus Manager."""
import datetime
import jwt
import bcrypt
from flask import Blueprint, request, jsonify, current_app
from config import MANAGER_JWT_SECRET, MANAGER_JWT_EXPIRES
from services.tenant_service import get_master_conn
# Blueprint named "auth"; every route declared on it is served under /api/auth.
auth_bp = Blueprint("auth", __name__, url_prefix="/api/auth")
def hash_password(password):
    """Hash *password* with bcrypt using a freshly generated salt.

    Args:
        password: Plain-text password as a str.

    Returns:
        The bcrypt hash decoded to a str.
    """
    password_bytes = password.encode()
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(password_bytes, salt)
    return digest.decode()
def check_password(password, hashed):
    """Check a plain-text password against a stored bcrypt hash.

    Args:
        password: Plain-text candidate password as a str.
        hashed: Stored bcrypt hash as a str.

    Returns:
        True when the password matches the hash. False when it does not
        match, or when bcrypt rejects the inputs with ``ValueError``
        (for example a stored value that is not a valid bcrypt hash).
    """
    try:
        return bcrypt.checkpw(password.encode(), hashed.encode())
    except ValueError:
        # A malformed stored hash makes bcrypt raise ValueError
        # ("Invalid salt"). Treat that as a failed check so a bad row
        # yields "invalid credentials" rather than an unhandled error.
        return False
def create_manager_token(user_id, email, role="admin"):
    """Build a signed HS256 access token for a manager user.

    Args:
        user_id: Value stored in the ``user_id`` claim.
        email: Value stored in the ``email`` claim.
        role: Value stored in the ``role`` claim; defaults to ``"admin"``.

    Returns:
        The result of ``jwt.encode`` for the payload, signed with
        ``MANAGER_JWT_SECRET``. The token expires ``MANAGER_JWT_EXPIRES``
        seconds after it is issued.
    """
    # Take a single timezone-aware timestamp so "iat" and "exp" are derived
    # from the same instant. datetime.utcnow() is deprecated since Python
    # 3.12 and returns a naive value that is easy to misinterpret.
    issued_at = datetime.datetime.now(datetime.timezone.utc)
    payload = {
        "user_id": user_id,
        "email": email,
        "role": role,
        "type": "access",
        "exp": issued_at + datetime.timedelta(seconds=MANAGER_JWT_EXPIRES),
        "iat": issued_at,
    }
    return jwt.encode(payload, MANAGER_JWT_SECRET, algorithm="HS256")
def decode_manager_token(token):
    """Decode and verify a manager token.

    Args:
        token: Encoded JWT to verify with ``MANAGER_JWT_SECRET`` (HS256 only).

    Returns:
        The decoded claims on success, or ``None`` if decoding raises any
        exception.
    """
    try:
        claims = jwt.decode(token, MANAGER_JWT_SECRET, algorithms=["HS256"])
    except Exception:
        # Any failure while decoding is reported to the caller as "no payload".
        return None
    return claims
def require_manager_auth(f):
    """Decorator that rejects requests lacking a valid manager access token.

    The token is read from an ``Authorization: Bearer ...`` header when that
    header is present with the ``Bearer `` prefix; otherwise from the
    ``manager_token`` cookie. On success the decoded payload is stored on
    ``request.manager_user`` and the wrapped view is called.

    Args:
        f: The view function to protect.

    Returns:
        A wrapper that returns a JSON error with status 401 when the token is
        missing, cannot be decoded, or its ``type`` claim is not ``"access"``.
    """
    from functools import wraps

    @wraps(f)
    def wrapper(*args, **kwargs):
        header_value = request.headers.get("Authorization", "")
        if header_value.startswith("Bearer "):
            # A Bearer header takes precedence: the cookie is not consulted,
            # even when the header carries an empty token.
            token = header_value[len("Bearer "):]
        else:
            token = request.cookies.get("manager_token") or None

        if not token:
            return jsonify({"error": "Unauthorized"}), 401

        payload = decode_manager_token(token)
        is_access_token = bool(payload) and payload.get("type") == "access"
        if not is_access_token:
            return jsonify({"error": "Invalid or expired token"}), 401

        request.manager_user = payload
        return f(*args, **kwargs)

    return wrapper
@auth_bp.route("/login", methods=["POST"])
def login():
    """Authenticate a manager user and issue an access token.

    Expects a JSON object body with string fields ``email`` and ``password``.
    The email is stripped and lower-cased before lookup.

    Returns:
        * 400 with ``{"error": "Email and password required"}`` when either
          field is missing, empty, or not a string.
        * 401 with ``{"error": "Invalid credentials"}`` when no active user
          matches the email or the password check fails.
        * Otherwise a JSON object with ``access_token`` and a ``user`` summary
          (``id``, ``email``, ``role``, ``name``).
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        # A JSON array or scalar body has no fields to read.
        data = {}

    email = data.get("email")
    password = data.get("password")
    # Reject non-string values (e.g. null or numbers) up front instead of
    # failing on .strip() / .encode() further down.
    if not isinstance(email, str) or not isinstance(password, str):
        return jsonify({"error": "Email and password required"}), 400

    email = email.strip().lower()
    if not email or not password:
        return jsonify({"error": "Email and password required"}), 400

    conn = get_master_conn()
    try:
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT id, email, password_hash, role, name
                FROM manager_users
                WHERE email = %s AND is_active = true
            """, (email,))
            row = cur.fetchone()
        finally:
            # Release the cursor even if the query raises.
            cur.close()
    finally:
        # Always close the connection so a failed query does not leak it.
        conn.close()

    if not row:
        return jsonify({"error": "Invalid credentials"}), 401

    user_id, db_email, pwd_hash, role, name = row
    if not check_password(password, pwd_hash):
        return jsonify({"error": "Invalid credentials"}), 401

    token = create_manager_token(user_id, db_email, role)
    return jsonify({
        "access_token": token,
        "user": {"id": user_id, "email": db_email, "role": role, "name": name},
    })
@auth_bp.route("/me", methods=["GET"])
@require_manager_auth
def me():
    """Return the token payload of the authenticated manager user.

    Returns:
        A JSON object whose ``user`` field is ``request.manager_user``, the
        decoded payload stored on the request by ``require_manager_auth``.
    """
    current_user = request.manager_user
    response_body = {"user": current_user}
    return jsonify(response_body)