Files
ATLAS/backend/app/api/v1/conductores.py
FlotillasGPS Developer 51d78bacf4 FlotillasGPS - Sistema completo de monitoreo de flotillas GPS
Sistema completo para monitoreo y gestion de flotas de vehiculos con:
- Backend FastAPI con PostgreSQL/TimescaleDB
- Frontend React con TypeScript y TailwindCSS
- App movil React Native con Expo
- Soporte para dispositivos GPS, Meshtastic y celulares
- Video streaming en vivo con MediaMTX
- Geocercas, alertas, viajes y reportes
- Autenticacion JWT y WebSockets en tiempo real

Documentacion completa y guias de usuario incluidas.
2026-01-21 08:18:00 +00:00

412 lines
13 KiB
Python

"""
Endpoints para gestión de conductores.
"""
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.security import get_current_user
from app.models.usuario import Usuario
from app.models.conductor import Conductor
from app.models.viaje import Viaje
from app.models.alerta import Alerta
from app.schemas.conductor import (
ConductorCreate,
ConductorUpdate,
ConductorResponse,
ConductorResumen,
ConductorEstadisticas,
)
router = APIRouter(prefix="/conductores", tags=["Conductores"])
@router.get("", response_model=List[ConductorResumen])
async def listar_conductores(
activo: Optional[bool] = None,
buscar: Optional[str] = None,
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=100),
db: AsyncSession = Depends(get_db),
current_user: Usuario = Depends(get_current_user),
):
"""
Lista todos los conductores con filtros opcionales.
Args:
activo: Filtrar por estado activo.
buscar: Búsqueda por nombre, apellido o licencia.
skip: Registros a saltar.
limit: Límite de registros.
Returns:
Lista de conductores.
"""
query = select(Conductor)
if activo is not None:
query = query.where(Conductor.activo == activo)
if buscar:
query = query.where(
(Conductor.nombre.ilike(f"%{buscar}%")) |
(Conductor.apellido.ilike(f"%{buscar}%")) |
(Conductor.licencia_numero.ilike(f"%{buscar}%"))
)
query = query.offset(skip).limit(limit).order_by(Conductor.nombre)
result = await db.execute(query)
conductores = result.scalars().all()
return [
ConductorResumen(
id=c.id,
nombre_completo=c.nombre_completo,
telefono=c.telefono,
licencia_vigente=c.licencia_vigente,
activo=c.activo,
)
for c in conductores
]
@router.get("/{conductor_id}", response_model=ConductorResponse)
async def obtener_conductor(
conductor_id: int,
db: AsyncSession = Depends(get_db),
current_user: Usuario = Depends(get_current_user),
):
"""
Obtiene un conductor por su ID.
Args:
conductor_id: ID del conductor.
Returns:
Conductor encontrado.
"""
result = await db.execute(
select(Conductor).where(Conductor.id == conductor_id)
)
conductor = result.scalar_one_or_none()
if not conductor:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Conductor con id {conductor_id} no encontrado",
)
return ConductorResponse(
id=conductor.id,
nombre=conductor.nombre,
apellido=conductor.apellido,
telefono=conductor.telefono,
email=conductor.email,
documento_tipo=conductor.documento_tipo,
documento_numero=conductor.documento_numero,
licencia_numero=conductor.licencia_numero,
licencia_tipo=conductor.licencia_tipo,
licencia_vencimiento=conductor.licencia_vencimiento,
fecha_nacimiento=conductor.fecha_nacimiento,
direccion=conductor.direccion,
contacto_emergencia=conductor.contacto_emergencia,
telefono_emergencia=conductor.telefono_emergencia,
fecha_contratacion=conductor.fecha_contratacion,
numero_empleado=conductor.numero_empleado,
foto_url=conductor.foto_url,
activo=conductor.activo,
notas=conductor.notas,
nombre_completo=conductor.nombre_completo,
licencia_vigente=conductor.licencia_vigente,
creado_en=conductor.creado_en,
actualizado_en=conductor.actualizado_en,
)
@router.post("", response_model=ConductorResponse, status_code=status.HTTP_201_CREATED)
async def crear_conductor(
conductor_data: ConductorCreate,
db: AsyncSession = Depends(get_db),
current_user: Usuario = Depends(get_current_user),
):
"""
Crea un nuevo conductor.
Args:
conductor_data: Datos del conductor.
Returns:
Conductor creado.
"""
# Verificar licencia única si se proporciona
if conductor_data.licencia_numero:
result = await db.execute(
select(Conductor).where(Conductor.licencia_numero == conductor_data.licencia_numero)
)
if result.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Ya existe un conductor con la licencia {conductor_data.licencia_numero}",
)
conductor = Conductor(**conductor_data.model_dump())
db.add(conductor)
await db.commit()
await db.refresh(conductor)
return ConductorResponse(
id=conductor.id,
nombre=conductor.nombre,
apellido=conductor.apellido,
telefono=conductor.telefono,
email=conductor.email,
documento_tipo=conductor.documento_tipo,
documento_numero=conductor.documento_numero,
licencia_numero=conductor.licencia_numero,
licencia_tipo=conductor.licencia_tipo,
licencia_vencimiento=conductor.licencia_vencimiento,
fecha_nacimiento=conductor.fecha_nacimiento,
direccion=conductor.direccion,
contacto_emergencia=conductor.contacto_emergencia,
telefono_emergencia=conductor.telefono_emergencia,
fecha_contratacion=conductor.fecha_contratacion,
numero_empleado=conductor.numero_empleado,
foto_url=conductor.foto_url,
activo=conductor.activo,
notas=conductor.notas,
nombre_completo=conductor.nombre_completo,
licencia_vigente=conductor.licencia_vigente,
creado_en=conductor.creado_en,
actualizado_en=conductor.actualizado_en,
)
@router.put("/{conductor_id}", response_model=ConductorResponse)
async def actualizar_conductor(
conductor_id: int,
conductor_data: ConductorUpdate,
db: AsyncSession = Depends(get_db),
current_user: Usuario = Depends(get_current_user),
):
"""
Actualiza un conductor existente.
Args:
conductor_id: ID del conductor.
conductor_data: Datos a actualizar.
Returns:
Conductor actualizado.
"""
result = await db.execute(
select(Conductor).where(Conductor.id == conductor_id)
)
conductor = result.scalar_one_or_none()
if not conductor:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Conductor con id {conductor_id} no encontrado",
)
# Verificar licencia única si se cambia
if conductor_data.licencia_numero and conductor_data.licencia_numero != conductor.licencia_numero:
result = await db.execute(
select(Conductor).where(Conductor.licencia_numero == conductor_data.licencia_numero)
)
if result.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Ya existe un conductor con la licencia {conductor_data.licencia_numero}",
)
update_data = conductor_data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(conductor, field, value)
await db.commit()
await db.refresh(conductor)
return ConductorResponse(
id=conductor.id,
nombre=conductor.nombre,
apellido=conductor.apellido,
telefono=conductor.telefono,
email=conductor.email,
documento_tipo=conductor.documento_tipo,
documento_numero=conductor.documento_numero,
licencia_numero=conductor.licencia_numero,
licencia_tipo=conductor.licencia_tipo,
licencia_vencimiento=conductor.licencia_vencimiento,
fecha_nacimiento=conductor.fecha_nacimiento,
direccion=conductor.direccion,
contacto_emergencia=conductor.contacto_emergencia,
telefono_emergencia=conductor.telefono_emergencia,
fecha_contratacion=conductor.fecha_contratacion,
numero_empleado=conductor.numero_empleado,
foto_url=conductor.foto_url,
activo=conductor.activo,
notas=conductor.notas,
nombre_completo=conductor.nombre_completo,
licencia_vigente=conductor.licencia_vigente,
creado_en=conductor.creado_en,
actualizado_en=conductor.actualizado_en,
)
@router.delete("/{conductor_id}", status_code=status.HTTP_204_NO_CONTENT)
async def eliminar_conductor(
conductor_id: int,
db: AsyncSession = Depends(get_db),
current_user: Usuario = Depends(get_current_user),
):
"""
Elimina un conductor (soft delete - desactiva).
Args:
conductor_id: ID del conductor.
"""
result = await db.execute(
select(Conductor).where(Conductor.id == conductor_id)
)
conductor = result.scalar_one_or_none()
if not conductor:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Conductor con id {conductor_id} no encontrado",
)
conductor.activo = False
await db.commit()
@router.get("/{conductor_id}/estadisticas", response_model=ConductorEstadisticas)
async def obtener_estadisticas_conductor(
conductor_id: int,
db: AsyncSession = Depends(get_db),
current_user: Usuario = Depends(get_current_user),
):
"""
Obtiene estadísticas de un conductor.
Args:
conductor_id: ID del conductor.
Returns:
Estadísticas del conductor.
"""
result = await db.execute(
select(Conductor).where(Conductor.id == conductor_id)
)
conductor = result.scalar_one_or_none()
if not conductor:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Conductor con id {conductor_id} no encontrado",
)
# Total de viajes
result = await db.execute(
select(func.count(Viaje.id))
.where(Viaje.conductor_id == conductor_id)
)
total_viajes = result.scalar() or 0
# Distancia total
result = await db.execute(
select(func.coalesce(func.sum(Viaje.distancia_km), 0))
.where(Viaje.conductor_id == conductor_id)
)
distancia_total = result.scalar() or 0
# Tiempo de conducción
result = await db.execute(
select(func.coalesce(func.sum(Viaje.tiempo_movimiento_segundos), 0))
.where(Viaje.conductor_id == conductor_id)
)
tiempo_conduccion = result.scalar() or 0
# Velocidad promedio
result = await db.execute(
select(func.avg(Viaje.velocidad_promedio))
.where(Viaje.conductor_id == conductor_id)
.where(Viaje.velocidad_promedio.isnot(None))
)
velocidad_promedio = result.scalar() or 0
# Total de alertas
result = await db.execute(
select(func.count(Alerta.id))
.where(Alerta.conductor_id == conductor_id)
)
alertas_total = result.scalar() or 0
# Alertas de velocidad
from app.models.tipo_alerta import TipoAlerta
result = await db.execute(
select(func.count(Alerta.id))
.join(TipoAlerta, Alerta.tipo_alerta_id == TipoAlerta.id)
.where(Alerta.conductor_id == conductor_id)
.where(TipoAlerta.codigo == "EXCESO_VELOCIDAD")
)
alertas_velocidad = result.scalar() or 0
return ConductorEstadisticas(
conductor_id=conductor.id,
nombre_completo=conductor.nombre_completo,
total_viajes=total_viajes,
distancia_total_km=float(distancia_total),
tiempo_conduccion_horas=tiempo_conduccion / 3600,
velocidad_promedio=float(velocidad_promedio) if velocidad_promedio else 0,
alertas_total=alertas_total,
alertas_velocidad=alertas_velocidad,
)
@router.get("/licencias/por-vencer", response_model=List[ConductorResumen])
async def obtener_licencias_por_vencer(
dias: int = Query(30, ge=1, le=365),
db: AsyncSession = Depends(get_db),
current_user: Usuario = Depends(get_current_user),
):
"""
Obtiene conductores con licencias próximas a vencer.
Args:
dias: Días para considerar como "próximo a vencer".
Returns:
Lista de conductores con licencias por vencer.
"""
from datetime import date
fecha_limite = date.today() + timedelta(days=dias)
result = await db.execute(
select(Conductor)
.where(Conductor.activo == True)
.where(Conductor.licencia_vencimiento.isnot(None))
.where(Conductor.licencia_vencimiento <= fecha_limite)
.order_by(Conductor.licencia_vencimiento)
)
conductores = result.scalars().all()
return [
ConductorResumen(
id=c.id,
nombre_completo=c.nombre_completo,
telefono=c.telefono,
licencia_vigente=c.licencia_vigente,
activo=c.activo,
)
for c in conductores
]