"""JWT authentication module for Nexus Autoparts.

Provides bcrypt password hashing, HS256 access-token creation and decoding,
random refresh tokens persisted in the ``sessions`` table, and a Flask
decorator that protects routes with a Bearer access token.
"""

import sys
import os
import secrets
from datetime import datetime, timedelta, timezone
from functools import wraps

import bcrypt
import jwt
import psycopg2

# The path is registered before ``config`` is imported, so this ordering is
# intentional (presumably config.py lives in that directory -- confirm).
sys.path.insert(0, '/home/Autopartes')
from config import DB_URL, JWT_SECRET, JWT_ACCESS_EXPIRES, JWT_REFRESH_EXPIRES
from flask import request, g, jsonify


def _utc_now():
    """Return the current time as a timezone-aware UTC datetime.

    Replaces ``datetime.utcnow()``, which is deprecated since Python 3.12.
    """
    return datetime.now(timezone.utc)


def hash_password(password):
    """Hash a password using bcrypt.

    Args:
        password: Plain-text password (str).

    Returns:
        The bcrypt hash, generated with a fresh salt, decoded to a UTF-8 str.
    """
    return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')


def check_password(password, hashed):
    """Verify a password against a bcrypt hash.

    Args:
        password: Plain-text password to check (str).
        hashed: Stored bcrypt hash (str) as produced by ``hash_password``.

    Returns:
        True if the password matches the hash; False if it does not, or if
        the stored hash is missing or not a well-formed bcrypt hash.
    """
    # A missing hash can never match; fail verification instead of raising
    # AttributeError on ``None.encode``.
    if not hashed:
        return False
    try:
        return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
    except ValueError:
        # bcrypt signals a malformed hash ("Invalid salt") with ValueError;
        # for a verifier that is simply "does not match".
        return False


def create_access_token(user_id, role, business_name):
    """Create a signed HS256 JWT access token.

    The token expires ``JWT_ACCESS_EXPIRES`` seconds after it is issued.

    Args:
        user_id: Identifier stored in the ``user_id`` claim.
        role: Role stored in the ``role`` claim (checked by ``require_auth``).
        business_name: Value stored in the ``business_name`` claim.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    # Read the clock once so ``iat`` and ``exp`` are derived from the same
    # instant.
    issued_at = _utc_now()
    payload = {
        'user_id': user_id,
        'role': role,
        'business_name': business_name,
        'type': 'access',
        'exp': issued_at + timedelta(seconds=JWT_ACCESS_EXPIRES),
        'iat': issued_at,
    }
    return jwt.encode(payload, JWT_SECRET, algorithm='HS256')


def create_refresh_token(user_id):
    """Create a random refresh token and store it in the sessions table.

    Args:
        user_id: Owner of the new session row.

    Returns:
        The URL-safe random token string that was inserted.

    Raises:
        psycopg2.Error: If connecting to the database or the insert fails.
    """
    token = secrets.token_urlsafe(48)
    # Keep writing naive UTC timestamps, exactly what ``datetime.utcnow()``
    # produced before, so the stored values do not change meaning.
    # NOTE(review): the column types of ``sessions`` are not visible here.
    created_at = _utc_now().replace(tzinfo=None)
    expires_at = created_at + timedelta(seconds=JWT_REFRESH_EXPIRES)

    conn = psycopg2.connect(DB_URL)
    try:
        with conn.cursor() as cur:
            cur.execute(
                """INSERT INTO sessions (user_id, refresh_token, expires_at, created_at)
                   VALUES (%s, %s, %s, %s)""",
                (user_id, token, expires_at, created_at)
            )
        conn.commit()
    finally:
        # Always release the connection, including when the insert fails.
        conn.close()
    return token


def decode_token(token):
    """Decode a JWT token.

    Args:
        token: Encoded JWT string.

    Returns:
        The payload dict, or None if the token is invalid or expired.
    """
    try:
        return jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # ExpiredSignatureError is a subclass of InvalidTokenError, so this
        # single clause also covers expired tokens.
        return None


def require_auth(*allowed_roles):
    """Flask decorator that validates Bearer token, checks role, and sets g.user.

    Args:
        *allowed_roles: Roles permitted to call the wrapped view. When empty,
            any token that passes validation is accepted.

    Returns:
        A decorator. The wrapped view responds with a 401 JSON error when the
        Authorization header is missing or malformed, the token is invalid or
        expired, the token is not an access token, or the token lacks the
        ``user_id``/``role`` claims; it responds with 403 when the token's
        role is not in ``allowed_roles``. Otherwise ``g.user`` is populated
        with ``user_id``, ``role`` and ``business_name`` and the view runs.
    """
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            auth_header = request.headers.get('Authorization', '')
            if not auth_header.startswith('Bearer '):
                return jsonify({'error': 'Missing or invalid Authorization header'}), 401

            token = auth_header[7:]  # strip "Bearer "
            payload = decode_token(token)
            if payload is None:
                return jsonify({'error': 'Invalid or expired token'}), 401

            if payload.get('type') != 'access':
                return jsonify({'error': 'Invalid token type'}), 401

            if allowed_roles and payload.get('role') not in allowed_roles:
                return jsonify({'error': 'Insufficient permissions'}), 403

            # A correctly signed token may still lack the claims read below;
            # reject it with 401 instead of letting a KeyError become a 500.
            if 'user_id' not in payload or 'role' not in payload:
                return jsonify({'error': 'Invalid or expired token'}), 401

            g.user = {
                'user_id': payload['user_id'],
                'role': payload['role'],
                'business_name': payload.get('business_name')
            }
            return f(*args, **kwargs)
        return decorated
    return decorator