""" Módulo de seguridad para autenticación y autorización. Proporciona: - Generación y verificación de tokens JWT - Hashing de contraseñas con bcrypt - Dependencies para obtener el usuario actual - Encriptación de datos sensibles """ from datetime import datetime, timedelta, timezone from typing import Annotated, Any, Optional from fastapi import Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from jose import JWTError, jwt from passlib.context import CryptContext from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import settings from app.core.database import get_db # Contexto de hashing para contraseñas pwd_context = CryptContext( schemes=["bcrypt"], deprecated="auto", bcrypt__rounds=12, # Factor de costo para bcrypt ) # Esquema de autenticación Bearer bearer_scheme = HTTPBearer(auto_error=False) def hash_password(password: str) -> str: """ Genera un hash seguro de una contraseña. Args: password: Contraseña en texto plano. Returns: str: Hash bcrypt de la contraseña. """ return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: """ Verifica una contraseña contra su hash. Args: plain_password: Contraseña en texto plano a verificar. hashed_password: Hash almacenado de la contraseña. Returns: bool: True si la contraseña coincide. """ return pwd_context.verify(plain_password, hashed_password) def create_access_token( data: dict[str, Any], expires_delta: Optional[timedelta] = None, ) -> str: """ Crea un token JWT de acceso. Args: data: Datos a incluir en el payload del token. expires_delta: Tiempo de expiración personalizado. Returns: str: Token JWT codificado. """ to_encode = data.copy() expire = datetime.now(timezone.utc) + ( expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) ) to_encode.update({ "exp": expire, "iat": datetime.now(timezone.utc), "type": "access", }) return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) def create_refresh_token( data: dict[str, Any], expires_delta: Optional[timedelta] = None, ) -> str: """ Crea un token JWT de refresco. Args: data: Datos a incluir en el payload del token. expires_delta: Tiempo de expiración personalizado. Returns: str: Token JWT de refresco codificado. """ to_encode = data.copy() expire = datetime.now(timezone.utc) + ( expires_delta or timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS) ) to_encode.update({ "exp": expire, "iat": datetime.now(timezone.utc), "type": "refresh", }) return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) def decode_token(token: str) -> dict[str, Any]: """ Decodifica y valida un token JWT. Args: token: Token JWT a decodificar. Returns: dict: Payload del token decodificado. Raises: HTTPException: Si el token es inválido o ha expirado. """ try: payload = jwt.decode( token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM], ) return payload except JWTError as e: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Token inválido: {str(e)}", headers={"WWW-Authenticate": "Bearer"}, ) def verify_token_type(payload: dict[str, Any], expected_type: str) -> None: """ Verifica que el token sea del tipo esperado. Args: payload: Payload del token decodificado. expected_type: Tipo esperado ('access' o 'refresh'). Raises: HTTPException: Si el tipo de token no coincide. """ token_type = payload.get("type") if token_type != expected_type: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Tipo de token inválido. Se esperaba '{expected_type}'", headers={"WWW-Authenticate": "Bearer"}, ) async def get_current_user( credentials: Annotated[Optional[HTTPAuthorizationCredentials], Depends(bearer_scheme)], db: Annotated[AsyncSession, Depends(get_db)], ): """ Dependency que obtiene el usuario actual desde el token JWT. Args: credentials: Credenciales Bearer del header Authorization. db: Sesión de base de datos. Returns: Usuario: Modelo del usuario autenticado. Raises: HTTPException: Si no hay token, es inválido, o el usuario no existe. """ if credentials is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="No se proporcionaron credenciales", headers={"WWW-Authenticate": "Bearer"}, ) payload = decode_token(credentials.credentials) verify_token_type(payload, "access") user_id = payload.get("sub") if user_id is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token inválido: falta el identificador de usuario", headers={"WWW-Authenticate": "Bearer"}, ) # Importar aquí para evitar importación circular from app.models.usuario import Usuario result = await db.execute(select(Usuario).where(Usuario.id == int(user_id))) user = result.scalar_one_or_none() if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Usuario no encontrado", headers={"WWW-Authenticate": "Bearer"}, ) if not user.activo: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Usuario desactivado", ) return user async def get_current_active_admin( current_user: Annotated[Any, Depends(get_current_user)], ): """ Dependency que verifica que el usuario actual sea administrador. Args: current_user: Usuario actual obtenido del token. Returns: Usuario: Usuario administrador verificado. Raises: HTTPException: Si el usuario no es administrador. """ if not current_user.es_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Se requieren permisos de administrador", ) return current_user def encrypt_sensitive_data(data: str) -> str: """ Encripta datos sensibles (ej: contraseñas de cámaras). Por simplicidad, usa el mismo mecanismo de hash. En producción, usar Fernet o similar para encriptación reversible. Args: data: Datos a encriptar. Returns: str: Datos encriptados. """ # Para datos que necesitan ser recuperados (como passwords de cámaras), # se debería usar encriptación simétrica (Fernet) # Por ahora, usamos base64 + XOR simple como placeholder # TODO: Implementar encriptación Fernet apropiada import base64 key = settings.SECRET_KEY[:32].encode() data_bytes = data.encode() encrypted = bytes(a ^ b for a, b in zip(data_bytes, key * (len(data_bytes) // len(key) + 1))) return base64.b64encode(encrypted).decode() def decrypt_sensitive_data(encrypted_data: str) -> str: """ Desencripta datos sensibles. Args: encrypted_data: Datos encriptados. Returns: str: Datos desencriptados. """ import base64 key = settings.SECRET_KEY[:32].encode() encrypted_bytes = base64.b64decode(encrypted_data.encode()) decrypted = bytes(a ^ b for a, b in zip(encrypted_bytes, key * (len(encrypted_bytes) // len(key) + 1))) return decrypted.decode() # Type aliases para uso en endpoints CurrentUser = Annotated[Any, Depends(get_current_user)] CurrentAdmin = Annotated[Any, Depends(get_current_active_admin)]