- Complete Flask-based control panel for multi-tenant POS instances - Dashboard with global stats, system health, and recent demos - Demo provisioning in 1 click with auto-expiration tracking - Tenant management: activate/deactivate, reset data, delete - Health monitoring: PostgreSQL, Redis, disk, memory, systemd services - Migration orchestration UI for running schema updates across all tenants - JWT authentication with manager_users table - Dark theme SPA frontend with real-time search and actions - systemd service file included
100 lines
2.9 KiB
Python
100 lines
2.9 KiB
Python
"""Auth blueprint for Nexus Manager."""
|
|
import datetime
|
|
import jwt
|
|
import bcrypt
|
|
from flask import Blueprint, request, jsonify, current_app
|
|
from config import MANAGER_JWT_SECRET, MANAGER_JWT_EXPIRES
|
|
from services.tenant_service import get_master_conn
|
|
|
|
# Blueprint that groups the manager authentication routes; every route
# registered on it is served under the /api/auth URL prefix.
auth_bp = Blueprint("auth", __name__, url_prefix="/api/auth")
|
|
|
|
|
|
def hash_password(password):
    """Return a bcrypt hash of *password* as text.

    The password is encoded to bytes, hashed with a freshly generated
    salt, and the resulting hash bytes are decoded back to ``str``.
    """
    raw_password = password.encode()
    salt = bcrypt.gensalt()
    hashed_bytes = bcrypt.hashpw(raw_password, salt)
    return hashed_bytes.decode()
|
|
|
|
|
|
def check_password(password, hashed):
    """Return True if *password* matches the bcrypt hash *hashed*.

    Args:
        password: Plain-text password supplied by the caller.
        hashed: Stored bcrypt hash as text.

    Returns:
        True when the password matches; False when it does not, when no
        hash is stored, or when the stored hash is not a valid bcrypt hash.
    """
    # A missing/empty hash can never match; bail out before touching bcrypt
    # so a NULL column does not surface as an AttributeError.
    if not hashed:
        return False
    try:
        return bcrypt.checkpw(password.encode(), hashed.encode())
    except ValueError:
        # bcrypt raises ValueError for input it cannot process (for example
        # a malformed stored hash); treat that as a failed check rather
        # than letting it escape as a server error.
        return False
|
|
|
|
|
|
def create_manager_token(user_id, email, role="admin"):
    """Build a signed HS256 access token for a manager user.

    Args:
        user_id: Identifier stored in the ``user_id`` claim.
        email: Address stored in the ``email`` claim.
        role: Value stored in the ``role`` claim; defaults to ``"admin"``.

    Returns:
        The encoded token produced by ``jwt.encode``, signed with
        ``MANAGER_JWT_SECRET``. It carries ``type="access"``, an ``iat``
        claim set to the current time, and an ``exp`` claim
        ``MANAGER_JWT_EXPIRES`` seconds later.
    """
    # Read the clock once, timezone-aware: datetime.utcnow() is deprecated
    # since Python 3.12, and a single timestamp keeps "iat" and "exp"
    # exactly MANAGER_JWT_EXPIRES seconds apart.
    issued_at = datetime.datetime.now(datetime.timezone.utc)
    payload = {
        "user_id": user_id,
        "email": email,
        "role": role,
        "type": "access",
        "exp": issued_at + datetime.timedelta(seconds=MANAGER_JWT_EXPIRES),
        "iat": issued_at,
    }
    return jwt.encode(payload, MANAGER_JWT_SECRET, algorithm="HS256")
|
|
|
|
|
|
def decode_manager_token(token):
    """Decode *token* with the manager secret, or return None on failure.

    Only the HS256 algorithm is accepted. Any exception raised while
    decoding is swallowed and reported to the caller as ``None``.
    """
    try:
        claims = jwt.decode(token, MANAGER_JWT_SECRET, algorithms=["HS256"])
    except Exception:
        # Callers only need to know "valid or not"; every failure maps to None.
        return None
    return claims
|
|
|
|
|
|
def require_manager_auth(f):
    """Decorator that rejects requests lacking a valid manager access token.

    The token is taken from an ``Authorization: Bearer <token>`` header or,
    when the header does not use the Bearer scheme, from the
    ``manager_token`` cookie. On success the decoded payload is stored on
    ``request.manager_user`` and the wrapped view is called; otherwise a
    JSON error with status 401 is returned.
    """
    import functools

    bearer_prefix = "Bearer "

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        header_value = request.headers.get("Authorization", "")
        if header_value.startswith(bearer_prefix):
            token = header_value[len(bearer_prefix):]
        else:
            # Fall back to the cookie; an absent or empty cookie means no token.
            token = request.cookies.get("manager_token") or None

        if not token:
            return jsonify({"error": "Unauthorized"}), 401

        claims = decode_manager_token(token)
        is_access_token = bool(claims) and claims.get("type") == "access"
        if not is_access_token:
            return jsonify({"error": "Invalid or expired token"}), 401

        # Expose the decoded claims to the wrapped view.
        request.manager_user = claims
        return f(*args, **kwargs)

    return wrapper
|
|
|
|
|
|
@auth_bp.route("/login", methods=["POST"])
def login():
    """Authenticate a manager user and issue an access token.

    Expects a JSON object with ``email`` and ``password``. The email is
    trimmed and lower-cased before the lookup in ``manager_users``, which
    only considers rows with ``is_active = true``.

    Returns:
        * 400 with ``{"error": "Email and password required"}`` when either
          field is missing, empty, or not a string.
        * 401 with ``{"error": "Invalid credentials"}`` when no active user
          matches or the password check fails.
        * Otherwise a JSON body with ``access_token`` and a ``user`` object
          (``id``, ``email``, ``role``, ``name``).
    """
    data = request.get_json() or {}
    # A JSON array/scalar body has no fields; treat it like an empty object
    # instead of failing on ``.get``.
    if not isinstance(data, dict):
        data = {}

    raw_email = data.get("email")
    raw_password = data.get("password")
    # Non-string values (null, numbers, ...) count as missing so the client
    # gets the 400 below rather than an unhandled AttributeError.
    email = raw_email.strip().lower() if isinstance(raw_email, str) else ""
    password = raw_password if isinstance(raw_password, str) else ""

    if not email or not password:
        return jsonify({"error": "Email and password required"}), 400

    conn = get_master_conn()
    try:
        cur = conn.cursor()
        try:
            cur.execute("""
                SELECT id, email, password_hash, role, name
                FROM manager_users
                WHERE email = %s AND is_active = true
            """, (email,))
            row = cur.fetchone()
        finally:
            cur.close()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()

    # NOTE(review): an unknown email returns before any bcrypt work, so the
    # two 401 paths differ in response time — consider whether that matters.
    if not row:
        return jsonify({"error": "Invalid credentials"}), 401

    user_id, db_email, pwd_hash, role, name = row
    if not check_password(password, pwd_hash):
        return jsonify({"error": "Invalid credentials"}), 401

    token = create_manager_token(user_id, db_email, role)
    return jsonify({
        "access_token": token,
        "user": {"id": user_id, "email": db_email, "role": role, "name": name},
    })
|
|
|
|
|
|
@auth_bp.route("/me", methods=["GET"])
@require_manager_auth
def me():
    """Return the authenticated manager's token payload as JSON.

    The payload is whatever ``require_manager_auth`` stored on
    ``request.manager_user`` for this request.
    """
    current_user = request.manager_user
    return jsonify({"user": current_user})
|