Files
Autoparts-DB/dashboard/auth.py
consultoria-as 09d3304b21 feat: add JWT auth module — login, tokens, role-based middleware
Implements hash_password, check_password, create_access_token,
create_refresh_token, decode_token, and require_auth() decorator
for role-based endpoint protection.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 22:24:38 +00:00

99 lines
3.1 KiB
Python

"""
JWT authentication module for Nexus Autoparts.
"""
import sys
import os
import secrets
from datetime import datetime, timedelta
from functools import wraps
import bcrypt
import jwt
import psycopg2
sys.path.insert(0, '/home/Autopartes')
from config import DB_URL, JWT_SECRET, JWT_ACCESS_EXPIRES, JWT_REFRESH_EXPIRES
from flask import request, g, jsonify
def hash_password(password):
    """Return the bcrypt hash of ``password`` as a text string.

    The password is UTF-8 encoded, hashed with a salt obtained from
    ``bcrypt.gensalt()`` on every call, and the resulting bytes are decoded
    back to ``str`` (UTF-8) before being returned.
    """
    raw_password = password.encode('utf-8')
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(raw_password, salt)
    return digest.decode('utf-8')
def check_password(password, hashed):
    """Tell whether ``password`` matches the bcrypt hash ``hashed``.

    Both arguments are text; each is UTF-8 encoded before being handed to
    ``bcrypt.checkpw``, whose result is returned unchanged.
    """
    candidate = password.encode('utf-8')
    stored_hash = hashed.encode('utf-8')
    return bcrypt.checkpw(candidate, stored_hash)
def create_access_token(user_id, role, business_name):
    """Create a signed JWT access token for a user.

    The token is signed with ``JWT_SECRET`` using HS256 and carries the
    claims ``user_id``, ``role``, ``business_name``, ``type`` (always
    ``'access'``), ``iat`` and ``exp``. Its lifetime is
    ``JWT_ACCESS_EXPIRES`` seconds, counted from the moment of issue.

    Args:
        user_id: Value stored in the ``user_id`` claim.
        role: Value stored in the ``role`` claim.
        business_name: Value stored in the ``business_name`` claim.

    Returns:
        The encoded token, as returned by ``jwt.encode``.
    """
    # Local import: the module header only brings in ``datetime`` and
    # ``timedelta`` from the ``datetime`` module.
    from datetime import timezone

    # Read the clock once so that ``exp - iat`` is exactly the configured
    # lifetime, and use an aware UTC timestamp rather than the deprecated
    # ``datetime.utcnow()``.
    issued_at = datetime.now(timezone.utc)
    payload = {
        'user_id': user_id,
        'role': role,
        'business_name': business_name,
        'type': 'access',
        'exp': issued_at + timedelta(seconds=JWT_ACCESS_EXPIRES),
        'iat': issued_at,
    }
    return jwt.encode(payload, JWT_SECRET, algorithm='HS256')
def create_refresh_token(user_id):
    """Generate a random refresh token and record it in ``sessions``.

    A URL-safe token is produced with ``secrets.token_urlsafe(48)`` and
    inserted into the ``sessions`` table together with ``user_id``, an expiry
    of ``JWT_REFRESH_EXPIRES`` seconds from now, and the creation time. Both
    timestamps are naive UTC values from ``datetime.utcnow()``.

    A new database connection is opened from ``DB_URL`` for the insert and is
    always closed afterwards, whether or not the insert succeeds.

    Returns:
        The generated refresh token string.
    """
    refresh_token = secrets.token_urlsafe(48)
    expiry = datetime.utcnow() + timedelta(seconds=JWT_REFRESH_EXPIRES)
    insert_sql = (
        "INSERT INTO sessions (user_id, refresh_token, expires_at, created_at)\n"
        "VALUES (%s, %s, %s, %s)"
    )

    connection = psycopg2.connect(DB_URL)
    try:
        with connection.cursor() as cursor:
            # created_at is sampled here, at insert time.
            row = (user_id, refresh_token, expiry, datetime.utcnow())
            cursor.execute(insert_sql, row)
        connection.commit()
    finally:
        connection.close()

    return refresh_token
def decode_token(token):
    """Verify and decode a JWT signed with ``JWT_SECRET`` (HS256 only).

    Returns:
        The decoded payload dict, or ``None`` when the token is expired or
        otherwise rejected by ``jwt.decode``.
    """
    try:
        claims = jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # Also covers jwt.ExpiredSignatureError, which PyJWT defines as a
        # subclass of InvalidTokenError.
        return None
    return claims
def require_auth(*allowed_roles):
    """Build a Flask view decorator that enforces Bearer-token authentication.

    The wrapped view runs only when the request carries an
    ``Authorization: Bearer <token>`` header whose token decodes via
    ``decode_token``, has ``type == 'access'`` and, if any ``allowed_roles``
    were given, a ``role`` claim among them. On success ``g.user`` is set to a
    dict with ``user_id``, ``role`` and ``business_name`` before the view is
    called.

    Failure responses are ``({'error': <message>} as JSON, status)``:
        401 - header missing or not starting with ``'Bearer '``
        401 - token invalid or expired
        401 - token ``type`` is not ``'access'``
        403 - role not in ``allowed_roles``

    Args:
        *allowed_roles: Role names permitted to call the view. When empty,
            any role is accepted.
    """
    bearer_prefix = 'Bearer '

    def decorator(f):
        @wraps(f)
        def guarded(*args, **kwargs):
            failure = None  # (message, HTTP status) once a check fails
            claims = None

            header_value = request.headers.get('Authorization', '')
            if not header_value.startswith(bearer_prefix):
                failure = ('Missing or invalid Authorization header', 401)
            else:
                # Everything after the "Bearer " prefix is the raw token.
                claims = decode_token(header_value[len(bearer_prefix):])
                if claims is None:
                    failure = ('Invalid or expired token', 401)
                elif claims.get('type') != 'access':
                    failure = ('Invalid token type', 401)
                elif allowed_roles and claims.get('role') not in allowed_roles:
                    failure = ('Insufficient permissions', 403)

            if failure is not None:
                message, status = failure
                return jsonify({'error': message}), status

            g.user = dict(
                user_id=claims['user_id'],
                role=claims['role'],
                business_name=claims.get('business_name'),
            )
            return f(*args, **kwargs)

        return guarded

    return decorator