feat: add JWT auth module — login, tokens, role-based middleware

Implements hash_password, check_password, create_access_token,
create_refresh_token, decode_token, and require_auth() decorator
for role-based endpoint protection.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-18 22:24:38 +00:00
parent c5e5f6ef7e
commit 09d3304b21

98
dashboard/auth.py Normal file
View File

@@ -0,0 +1,98 @@
"""
JWT authentication module for Nexus Autoparts.
"""
import sys
import os
import secrets
from datetime import datetime, timedelta
from functools import wraps
import bcrypt
import jwt
import psycopg2
sys.path.insert(0, '/home/Autopartes')
from config import DB_URL, JWT_SECRET, JWT_ACCESS_EXPIRES, JWT_REFRESH_EXPIRES
from flask import request, g, jsonify
def hash_password(password):
"""Hash a password using bcrypt."""
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
def check_password(password, hashed):
"""Verify a password against a bcrypt hash."""
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
def create_access_token(user_id, role, business_name):
"""Create a JWT access token with 15-minute expiry."""
payload = {
'user_id': user_id,
'role': role,
'business_name': business_name,
'type': 'access',
'exp': datetime.utcnow() + timedelta(seconds=JWT_ACCESS_EXPIRES),
'iat': datetime.utcnow()
}
return jwt.encode(payload, JWT_SECRET, algorithm='HS256')
def create_refresh_token(user_id):
"""Create a random refresh token and store it in the sessions table."""
token = secrets.token_urlsafe(48)
expires_at = datetime.utcnow() + timedelta(seconds=JWT_REFRESH_EXPIRES)
conn = psycopg2.connect(DB_URL)
try:
with conn.cursor() as cur:
cur.execute(
"""INSERT INTO sessions (user_id, refresh_token, expires_at, created_at)
VALUES (%s, %s, %s, %s)""",
(user_id, token, expires_at, datetime.utcnow())
)
conn.commit()
finally:
conn.close()
return token
def decode_token(token):
"""Decode a JWT token. Returns the payload dict or None if invalid/expired."""
try:
return jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
return None
def require_auth(*allowed_roles):
"""Flask decorator that validates Bearer token, checks role, and sets g.user."""
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return jsonify({'error': 'Missing or invalid Authorization header'}), 401
token = auth_header[7:] # strip "Bearer "
payload = decode_token(token)
if payload is None:
return jsonify({'error': 'Invalid or expired token'}), 401
if payload.get('type') != 'access':
return jsonify({'error': 'Invalid token type'}), 401
if allowed_roles and payload.get('role') not in allowed_roles:
return jsonify({'error': 'Insufficient permissions'}), 403
g.user = {
'user_id': payload['user_id'],
'role': payload['role'],
'business_name': payload.get('business_name')
}
return f(*args, **kwargs)
return decorated
return decorator