FlotillasGPS - Sistema completo de monitoreo de flotillas GPS
Sistema completo para monitoreo y gestion de flotas de vehiculos con: - Backend FastAPI con PostgreSQL/TimescaleDB - Frontend React con TypeScript y TailwindCSS - App movil React Native con Expo - Soporte para dispositivos GPS, Meshtastic y celulares - Video streaming en vivo con MediaMTX - Geocercas, alertas, viajes y reportes - Autenticacion JWT y WebSockets en tiempo real Documentacion completa y guias de usuario incluidas.
This commit is contained in:
285
backend/app/core/security.py
Normal file
285
backend/app/core/security.py
Normal file
@@ -0,0 +1,285 @@
|
||||
"""
|
||||
Módulo de seguridad para autenticación y autorización.
|
||||
|
||||
Proporciona:
|
||||
- Generación y verificación de tokens JWT
|
||||
- Hashing de contraseñas con bcrypt
|
||||
- Dependencies para obtener el usuario actual
|
||||
- Encriptación de datos sensibles
|
||||
"""
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Annotated, Any, Optional
|
||||
|
||||
from fastapi import Depends, HTTPException, status
|
||||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||||
from jose import JWTError, jwt
|
||||
from passlib.context import CryptContext
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.database import get_db
|
||||
|
||||
# Contexto de hashing para contraseñas
|
||||
pwd_context = CryptContext(
|
||||
schemes=["bcrypt"],
|
||||
deprecated="auto",
|
||||
bcrypt__rounds=12, # Factor de costo para bcrypt
|
||||
)
|
||||
|
||||
# Esquema de autenticación Bearer
|
||||
bearer_scheme = HTTPBearer(auto_error=False)
|
||||
|
||||
|
||||
def hash_password(password: str) -> str:
|
||||
"""
|
||||
Genera un hash seguro de una contraseña.
|
||||
|
||||
Args:
|
||||
password: Contraseña en texto plano.
|
||||
|
||||
Returns:
|
||||
str: Hash bcrypt de la contraseña.
|
||||
"""
|
||||
return pwd_context.hash(password)
|
||||
|
||||
|
||||
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
||||
"""
|
||||
Verifica una contraseña contra su hash.
|
||||
|
||||
Args:
|
||||
plain_password: Contraseña en texto plano a verificar.
|
||||
hashed_password: Hash almacenado de la contraseña.
|
||||
|
||||
Returns:
|
||||
bool: True si la contraseña coincide.
|
||||
"""
|
||||
return pwd_context.verify(plain_password, hashed_password)
|
||||
|
||||
|
||||
def create_access_token(
|
||||
data: dict[str, Any],
|
||||
expires_delta: Optional[timedelta] = None,
|
||||
) -> str:
|
||||
"""
|
||||
Crea un token JWT de acceso.
|
||||
|
||||
Args:
|
||||
data: Datos a incluir en el payload del token.
|
||||
expires_delta: Tiempo de expiración personalizado.
|
||||
|
||||
Returns:
|
||||
str: Token JWT codificado.
|
||||
"""
|
||||
to_encode = data.copy()
|
||||
expire = datetime.now(timezone.utc) + (
|
||||
expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
|
||||
)
|
||||
to_encode.update({
|
||||
"exp": expire,
|
||||
"iat": datetime.now(timezone.utc),
|
||||
"type": "access",
|
||||
})
|
||||
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
|
||||
|
||||
|
||||
def create_refresh_token(
|
||||
data: dict[str, Any],
|
||||
expires_delta: Optional[timedelta] = None,
|
||||
) -> str:
|
||||
"""
|
||||
Crea un token JWT de refresco.
|
||||
|
||||
Args:
|
||||
data: Datos a incluir en el payload del token.
|
||||
expires_delta: Tiempo de expiración personalizado.
|
||||
|
||||
Returns:
|
||||
str: Token JWT de refresco codificado.
|
||||
"""
|
||||
to_encode = data.copy()
|
||||
expire = datetime.now(timezone.utc) + (
|
||||
expires_delta or timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
|
||||
)
|
||||
to_encode.update({
|
||||
"exp": expire,
|
||||
"iat": datetime.now(timezone.utc),
|
||||
"type": "refresh",
|
||||
})
|
||||
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
|
||||
|
||||
|
||||
def decode_token(token: str) -> dict[str, Any]:
|
||||
"""
|
||||
Decodifica y valida un token JWT.
|
||||
|
||||
Args:
|
||||
token: Token JWT a decodificar.
|
||||
|
||||
Returns:
|
||||
dict: Payload del token decodificado.
|
||||
|
||||
Raises:
|
||||
HTTPException: Si el token es inválido o ha expirado.
|
||||
"""
|
||||
try:
|
||||
payload = jwt.decode(
|
||||
token,
|
||||
settings.SECRET_KEY,
|
||||
algorithms=[settings.ALGORITHM],
|
||||
)
|
||||
return payload
|
||||
except JWTError as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail=f"Token inválido: {str(e)}",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
|
||||
def verify_token_type(payload: dict[str, Any], expected_type: str) -> None:
|
||||
"""
|
||||
Verifica que el token sea del tipo esperado.
|
||||
|
||||
Args:
|
||||
payload: Payload del token decodificado.
|
||||
expected_type: Tipo esperado ('access' o 'refresh').
|
||||
|
||||
Raises:
|
||||
HTTPException: Si el tipo de token no coincide.
|
||||
"""
|
||||
token_type = payload.get("type")
|
||||
if token_type != expected_type:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail=f"Tipo de token inválido. Se esperaba '{expected_type}'",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
|
||||
async def get_current_user(
|
||||
credentials: Annotated[Optional[HTTPAuthorizationCredentials], Depends(bearer_scheme)],
|
||||
db: Annotated[AsyncSession, Depends(get_db)],
|
||||
):
|
||||
"""
|
||||
Dependency que obtiene el usuario actual desde el token JWT.
|
||||
|
||||
Args:
|
||||
credentials: Credenciales Bearer del header Authorization.
|
||||
db: Sesión de base de datos.
|
||||
|
||||
Returns:
|
||||
Usuario: Modelo del usuario autenticado.
|
||||
|
||||
Raises:
|
||||
HTTPException: Si no hay token, es inválido, o el usuario no existe.
|
||||
"""
|
||||
if credentials is None:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="No se proporcionaron credenciales",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
payload = decode_token(credentials.credentials)
|
||||
verify_token_type(payload, "access")
|
||||
|
||||
user_id = payload.get("sub")
|
||||
if user_id is None:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Token inválido: falta el identificador de usuario",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
# Importar aquí para evitar importación circular
|
||||
from app.models.usuario import Usuario
|
||||
|
||||
result = await db.execute(select(Usuario).where(Usuario.id == int(user_id)))
|
||||
user = result.scalar_one_or_none()
|
||||
|
||||
if user is None:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Usuario no encontrado",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
if not user.activo:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Usuario desactivado",
|
||||
)
|
||||
|
||||
return user
|
||||
|
||||
|
||||
async def get_current_active_admin(
|
||||
current_user: Annotated[Any, Depends(get_current_user)],
|
||||
):
|
||||
"""
|
||||
Dependency que verifica que el usuario actual sea administrador.
|
||||
|
||||
Args:
|
||||
current_user: Usuario actual obtenido del token.
|
||||
|
||||
Returns:
|
||||
Usuario: Usuario administrador verificado.
|
||||
|
||||
Raises:
|
||||
HTTPException: Si el usuario no es administrador.
|
||||
"""
|
||||
if not current_user.es_admin:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Se requieren permisos de administrador",
|
||||
)
|
||||
return current_user
|
||||
|
||||
|
||||
def encrypt_sensitive_data(data: str) -> str:
|
||||
"""
|
||||
Encripta datos sensibles (ej: contraseñas de cámaras).
|
||||
|
||||
Por simplicidad, usa el mismo mecanismo de hash.
|
||||
En producción, usar Fernet o similar para encriptación reversible.
|
||||
|
||||
Args:
|
||||
data: Datos a encriptar.
|
||||
|
||||
Returns:
|
||||
str: Datos encriptados.
|
||||
"""
|
||||
# Para datos que necesitan ser recuperados (como passwords de cámaras),
|
||||
# se debería usar encriptación simétrica (Fernet)
|
||||
# Por ahora, usamos base64 + XOR simple como placeholder
|
||||
# TODO: Implementar encriptación Fernet apropiada
|
||||
import base64
|
||||
key = settings.SECRET_KEY[:32].encode()
|
||||
data_bytes = data.encode()
|
||||
encrypted = bytes(a ^ b for a, b in zip(data_bytes, key * (len(data_bytes) // len(key) + 1)))
|
||||
return base64.b64encode(encrypted).decode()
|
||||
|
||||
|
||||
def decrypt_sensitive_data(encrypted_data: str) -> str:
|
||||
"""
|
||||
Desencripta datos sensibles.
|
||||
|
||||
Args:
|
||||
encrypted_data: Datos encriptados.
|
||||
|
||||
Returns:
|
||||
str: Datos desencriptados.
|
||||
"""
|
||||
import base64
|
||||
key = settings.SECRET_KEY[:32].encode()
|
||||
encrypted_bytes = base64.b64decode(encrypted_data.encode())
|
||||
decrypted = bytes(a ^ b for a, b in zip(encrypted_bytes, key * (len(encrypted_bytes) // len(key) + 1)))
|
||||
return decrypted.decode()
|
||||
|
||||
|
||||
# Type aliases para uso en endpoints
|
||||
CurrentUser = Annotated[Any, Depends(get_current_user)]
|
||||
CurrentAdmin = Annotated[Any, Depends(get_current_active_admin)]
|
||||
Reference in New Issue
Block a user