From 09d3304b2192b53010578d91128908784d3d8461 Mon Sep 17 00:00:00 2001 From: consultoria-as Date: Wed, 18 Mar 2026 22:24:38 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20add=20JWT=20auth=20module=20=E2=80=94?= =?UTF-8?q?=20login,=20tokens,=20role-based=20middleware?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements hash_password, check_password, create_access_token, create_refresh_token, decode_token, and require_auth() decorator for role-based endpoint protection. Co-Authored-By: Claude Opus 4.6 (1M context) --- dashboard/auth.py | 98 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 dashboard/auth.py diff --git a/dashboard/auth.py b/dashboard/auth.py new file mode 100644 index 0000000..5d7688e --- /dev/null +++ b/dashboard/auth.py @@ -0,0 +1,98 @@ +""" +JWT authentication module for Nexus Autoparts. +""" +import sys +import os +import secrets +from datetime import datetime, timedelta +from functools import wraps + +import bcrypt +import jwt +import psycopg2 + +sys.path.insert(0, '/home/Autopartes') +from config import DB_URL, JWT_SECRET, JWT_ACCESS_EXPIRES, JWT_REFRESH_EXPIRES + +from flask import request, g, jsonify + + +def hash_password(password): + """Hash a password using bcrypt.""" + return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') + + +def check_password(password, hashed): + """Verify a password against a bcrypt hash.""" + return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8')) + + +def create_access_token(user_id, role, business_name): + """Create a JWT access token with 15-minute expiry.""" + payload = { + 'user_id': user_id, + 'role': role, + 'business_name': business_name, + 'type': 'access', + 'exp': datetime.utcnow() + timedelta(seconds=JWT_ACCESS_EXPIRES), + 'iat': datetime.utcnow() + } + return jwt.encode(payload, JWT_SECRET, algorithm='HS256') + + +def create_refresh_token(user_id): + """Create a random refresh token and store it in the sessions table.""" + token = secrets.token_urlsafe(48) + expires_at = datetime.utcnow() + timedelta(seconds=JWT_REFRESH_EXPIRES) + + conn = psycopg2.connect(DB_URL) + try: + with conn.cursor() as cur: + cur.execute( + """INSERT INTO sessions (user_id, refresh_token, expires_at, created_at) + VALUES (%s, %s, %s, %s)""", + (user_id, token, expires_at, datetime.utcnow()) + ) + conn.commit() + finally: + conn.close() + + return token + + +def decode_token(token): + """Decode a JWT token. Returns the payload dict or None if invalid/expired.""" + try: + return jwt.decode(token, JWT_SECRET, algorithms=['HS256']) + except (jwt.ExpiredSignatureError, jwt.InvalidTokenError): + return None + + +def require_auth(*allowed_roles): + """Flask decorator that validates Bearer token, checks role, and sets g.user.""" + def decorator(f): + @wraps(f) + def decorated(*args, **kwargs): + auth_header = request.headers.get('Authorization', '') + if not auth_header.startswith('Bearer '): + return jsonify({'error': 'Missing or invalid Authorization header'}), 401 + + token = auth_header[7:] # strip "Bearer " + payload = decode_token(token) + if payload is None: + return jsonify({'error': 'Invalid or expired token'}), 401 + + if payload.get('type') != 'access': + return jsonify({'error': 'Invalid token type'}), 401 + + if allowed_roles and payload.get('role') not in allowed_roles: + return jsonify({'error': 'Insufficient permissions'}), 403 + + g.user = { + 'user_id': payload['user_id'], + 'role': payload['role'], + 'business_name': payload.get('business_name') + } + return f(*args, **kwargs) + return decorated + return decorator