Files
ATLAS/backend/app/api/v1/auth.py
FlotillasGPS Developer 51d78bacf4 FlotillasGPS - Sistema completo de monitoreo de flotillas GPS
Sistema completo para monitoreo y gestion de flotas de vehiculos con:
- Backend FastAPI con PostgreSQL/TimescaleDB
- Frontend React con TypeScript y TailwindCSS
- App movil React Native con Expo
- Soporte para dispositivos GPS, Meshtastic y celulares
- Video streaming en vivo con MediaMTX
- Geocercas, alertas, viajes y reportes
- Autenticacion JWT y WebSockets en tiempo real

Documentacion completa y guias de usuario incluidas.
2026-01-21 08:18:00 +00:00

275 lines
7.1 KiB
Python

"""
Endpoints de autenticación.
"""
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.security import (
create_access_token,
create_refresh_token,
decode_token,
hash_password,
verify_password,
verify_token_type,
get_current_user,
)
from app.core.config import settings
from app.models.usuario import Usuario
from app.schemas.usuario import (
LoginRequest,
LoginResponse,
RefreshTokenRequest,
TokenResponse,
UsuarioCreate,
UsuarioResponse,
UsuarioUpdate,
UsuarioUpdatePassword,
)
router = APIRouter(prefix="/auth", tags=["Autenticacion"])
@router.post("/login", response_model=LoginResponse)
async def login(
request: LoginRequest,
db: AsyncSession = Depends(get_db),
):
"""
Autentica un usuario y devuelve tokens JWT.
Args:
request: Credenciales de login.
db: Sesión de base de datos.
Returns:
Tokens de acceso y refresco.
"""
# Buscar usuario por email
result = await db.execute(
select(Usuario).where(Usuario.email == request.email)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Email o contraseña incorrectos",
)
if not verify_password(request.password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Email o contraseña incorrectos",
)
if not user.activo:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Usuario desactivado",
)
# Actualizar último acceso
user.ultimo_acceso = datetime.now(timezone.utc)
await db.commit()
# Generar tokens
token_data = {"sub": str(user.id), "email": user.email}
access_token = create_access_token(token_data)
refresh_token = create_refresh_token(token_data)
return LoginResponse(
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer",
expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
user=UsuarioResponse.model_validate(user),
)
@router.post("/refresh", response_model=TokenResponse)
async def refresh_token(
request: RefreshTokenRequest,
db: AsyncSession = Depends(get_db),
):
"""
Renueva los tokens usando el refresh token.
Args:
request: Token de refresco.
db: Sesión de base de datos.
Returns:
Nuevos tokens de acceso y refresco.
"""
# Decodificar y verificar el refresh token
payload = decode_token(request.refresh_token)
verify_token_type(payload, "refresh")
user_id = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token inválido",
)
# Verificar que el usuario existe y está activo
result = await db.execute(
select(Usuario).where(Usuario.id == int(user_id))
)
user = result.scalar_one_or_none()
if not user or not user.activo:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Usuario no válido",
)
# Generar nuevos tokens
token_data = {"sub": str(user.id), "email": user.email}
access_token = create_access_token(token_data)
refresh_token = create_refresh_token(token_data)
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer",
expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
)
@router.post("/logout")
async def logout():
"""
Cierra la sesión del usuario.
En una implementación con blacklist de tokens, aquí se
agregaría el token a la lista negra.
Returns:
Mensaje de confirmación.
"""
# TODO: Implementar blacklist de tokens en Redis
return {"message": "Sesión cerrada correctamente"}
@router.post("/register", response_model=UsuarioResponse, status_code=status.HTTP_201_CREATED)
async def register(
user_data: UsuarioCreate,
db: AsyncSession = Depends(get_db),
):
"""
Registra un nuevo usuario.
Args:
user_data: Datos del usuario a crear.
db: Sesión de base de datos.
Returns:
Usuario creado.
"""
# Verificar si el email ya existe
result = await db.execute(
select(Usuario).where(Usuario.email == user_data.email)
)
if result.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="El email ya está registrado",
)
# Crear usuario
user = Usuario(
email=user_data.email,
password_hash=hash_password(user_data.password),
nombre=user_data.nombre,
apellido=user_data.apellido,
telefono=user_data.telefono,
es_admin=user_data.es_admin,
)
db.add(user)
await db.commit()
await db.refresh(user)
return UsuarioResponse.model_validate(user)
@router.get("/me", response_model=UsuarioResponse)
async def get_current_user_info(
current_user: Usuario = Depends(get_current_user),
):
"""
Obtiene información del usuario actual.
Args:
current_user: Usuario autenticado.
Returns:
Información del usuario.
"""
return UsuarioResponse.model_validate(current_user)
@router.put("/me", response_model=UsuarioResponse)
async def update_current_user(
user_data: UsuarioUpdate,
current_user: Usuario = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""
Actualiza información del usuario actual.
Args:
user_data: Datos a actualizar.
current_user: Usuario autenticado.
db: Sesión de base de datos.
Returns:
Usuario actualizado.
"""
# Actualizar solo campos proporcionados
update_data = user_data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(current_user, field, value)
await db.commit()
await db.refresh(current_user)
return UsuarioResponse.model_validate(current_user)
@router.put("/me/password")
async def change_password(
password_data: UsuarioUpdatePassword,
current_user: Usuario = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""
Cambia la contraseña del usuario actual.
Args:
password_data: Contraseñas actual y nueva.
current_user: Usuario autenticado.
db: Sesión de base de datos.
Returns:
Mensaje de confirmación.
"""
# Verificar contraseña actual
if not verify_password(password_data.password_actual, current_user.password_hash):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Contraseña actual incorrecta",
)
# Actualizar contraseña
current_user.password_hash = hash_password(password_data.password_nuevo)
await db.commit()
return {"message": "Contraseña actualizada correctamente"}