Files
ATLAS/backend/app/core/security.py
FlotillasGPS Developer 51d78bacf4 FlotillasGPS - Sistema completo de monitoreo de flotillas GPS
Sistema completo para monitoreo y gestion de flotas de vehiculos con:
- Backend FastAPI con PostgreSQL/TimescaleDB
- Frontend React con TypeScript y TailwindCSS
- App movil React Native con Expo
- Soporte para dispositivos GPS, Meshtastic y celulares
- Video streaming en vivo con MediaMTX
- Geocercas, alertas, viajes y reportes
- Autenticacion JWT y WebSockets en tiempo real

Documentacion completa y guias de usuario incluidas.
2026-01-21 08:18:00 +00:00

286 lines
7.9 KiB
Python

"""
Módulo de seguridad para autenticación y autorización.
Proporciona:
- Generación y verificación de tokens JWT
- Hashing de contraseñas con bcrypt
- Dependencies para obtener el usuario actual
- Encriptación de datos sensibles
"""
from datetime import datetime, timedelta, timezone
from typing import Annotated, Any, Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.database import get_db
# Contexto de hashing para contraseñas
pwd_context = CryptContext(
schemes=["bcrypt"],
deprecated="auto",
bcrypt__rounds=12, # Factor de costo para bcrypt
)
# Esquema de autenticación Bearer
bearer_scheme = HTTPBearer(auto_error=False)
def hash_password(password: str) -> str:
"""
Genera un hash seguro de una contraseña.
Args:
password: Contraseña en texto plano.
Returns:
str: Hash bcrypt de la contraseña.
"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
Verifica una contraseña contra su hash.
Args:
plain_password: Contraseña en texto plano a verificar.
hashed_password: Hash almacenado de la contraseña.
Returns:
bool: True si la contraseña coincide.
"""
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(
data: dict[str, Any],
expires_delta: Optional[timedelta] = None,
) -> str:
"""
Crea un token JWT de acceso.
Args:
data: Datos a incluir en el payload del token.
expires_delta: Tiempo de expiración personalizado.
Returns:
str: Token JWT codificado.
"""
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (
expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
)
to_encode.update({
"exp": expire,
"iat": datetime.now(timezone.utc),
"type": "access",
})
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def create_refresh_token(
data: dict[str, Any],
expires_delta: Optional[timedelta] = None,
) -> str:
"""
Crea un token JWT de refresco.
Args:
data: Datos a incluir en el payload del token.
expires_delta: Tiempo de expiración personalizado.
Returns:
str: Token JWT de refresco codificado.
"""
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (
expires_delta or timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
)
to_encode.update({
"exp": expire,
"iat": datetime.now(timezone.utc),
"type": "refresh",
})
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def decode_token(token: str) -> dict[str, Any]:
"""
Decodifica y valida un token JWT.
Args:
token: Token JWT a decodificar.
Returns:
dict: Payload del token decodificado.
Raises:
HTTPException: Si el token es inválido o ha expirado.
"""
try:
payload = jwt.decode(
token,
settings.SECRET_KEY,
algorithms=[settings.ALGORITHM],
)
return payload
except JWTError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Token inválido: {str(e)}",
headers={"WWW-Authenticate": "Bearer"},
)
def verify_token_type(payload: dict[str, Any], expected_type: str) -> None:
"""
Verifica que el token sea del tipo esperado.
Args:
payload: Payload del token decodificado.
expected_type: Tipo esperado ('access' o 'refresh').
Raises:
HTTPException: Si el tipo de token no coincide.
"""
token_type = payload.get("type")
if token_type != expected_type:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Tipo de token inválido. Se esperaba '{expected_type}'",
headers={"WWW-Authenticate": "Bearer"},
)
async def get_current_user(
credentials: Annotated[Optional[HTTPAuthorizationCredentials], Depends(bearer_scheme)],
db: Annotated[AsyncSession, Depends(get_db)],
):
"""
Dependency que obtiene el usuario actual desde el token JWT.
Args:
credentials: Credenciales Bearer del header Authorization.
db: Sesión de base de datos.
Returns:
Usuario: Modelo del usuario autenticado.
Raises:
HTTPException: Si no hay token, es inválido, o el usuario no existe.
"""
if credentials is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="No se proporcionaron credenciales",
headers={"WWW-Authenticate": "Bearer"},
)
payload = decode_token(credentials.credentials)
verify_token_type(payload, "access")
user_id = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token inválido: falta el identificador de usuario",
headers={"WWW-Authenticate": "Bearer"},
)
# Importar aquí para evitar importación circular
from app.models.usuario import Usuario
result = await db.execute(select(Usuario).where(Usuario.id == int(user_id)))
user = result.scalar_one_or_none()
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Usuario no encontrado",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.activo:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Usuario desactivado",
)
return user
async def get_current_active_admin(
current_user: Annotated[Any, Depends(get_current_user)],
):
"""
Dependency que verifica que el usuario actual sea administrador.
Args:
current_user: Usuario actual obtenido del token.
Returns:
Usuario: Usuario administrador verificado.
Raises:
HTTPException: Si el usuario no es administrador.
"""
if not current_user.es_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Se requieren permisos de administrador",
)
return current_user
def encrypt_sensitive_data(data: str) -> str:
"""
Encripta datos sensibles (ej: contraseñas de cámaras).
Por simplicidad, usa el mismo mecanismo de hash.
En producción, usar Fernet o similar para encriptación reversible.
Args:
data: Datos a encriptar.
Returns:
str: Datos encriptados.
"""
# Para datos que necesitan ser recuperados (como passwords de cámaras),
# se debería usar encriptación simétrica (Fernet)
# Por ahora, usamos base64 + XOR simple como placeholder
# TODO: Implementar encriptación Fernet apropiada
import base64
key = settings.SECRET_KEY[:32].encode()
data_bytes = data.encode()
encrypted = bytes(a ^ b for a, b in zip(data_bytes, key * (len(data_bytes) // len(key) + 1)))
return base64.b64encode(encrypted).decode()
def decrypt_sensitive_data(encrypted_data: str) -> str:
"""
Desencripta datos sensibles.
Args:
encrypted_data: Datos encriptados.
Returns:
str: Datos desencriptados.
"""
import base64
key = settings.SECRET_KEY[:32].encode()
encrypted_bytes = base64.b64decode(encrypted_data.encode())
decrypted = bytes(a ^ b for a, b in zip(encrypted_bytes, key * (len(encrypted_bytes) // len(key) + 1)))
return decrypted.decode()
# Type aliases para uso en endpoints
CurrentUser = Annotated[Any, Depends(get_current_user)]
CurrentAdmin = Annotated[Any, Depends(get_current_active_admin)]