Files
ATLAS/backend/app/services/ubicacion_service.py
FlotillasGPS Developer 51d78bacf4 FlotillasGPS - Sistema completo de monitoreo de flotillas GPS
Sistema completo para monitoreo y gestion de flotas de vehiculos con:
- Backend FastAPI con PostgreSQL/TimescaleDB
- Frontend React con TypeScript y TailwindCSS
- App movil React Native con Expo
- Soporte para dispositivos GPS, Meshtastic y celulares
- Video streaming en vivo con MediaMTX
- Geocercas, alertas, viajes y reportes
- Autenticacion JWT y WebSockets en tiempo real

Documentacion completa y guias de usuario incluidas.
2026-01-21 08:18:00 +00:00

490 lines
15 KiB
Python

"""
Servicio para procesamiento de ubicaciones GPS.
Maneja la recepción, procesamiento y análisis de datos de ubicación.
"""
import json
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple
from sqlalchemy import and_, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.models.dispositivo import Dispositivo
from app.models.ubicacion import Ubicacion
from app.models.vehiculo import Vehiculo
from app.schemas.ubicacion import (
HistorialUbicacionesResponse,
UbicacionCreate,
UbicacionResponse,
)
class UbicacionService:
"""Servicio para gestión de ubicaciones GPS."""
def __init__(self, db: AsyncSession):
"""
Inicializa el servicio.
Args:
db: Sesión de base de datos async.
"""
self.db = db
async def procesar_ubicacion(
self,
ubicacion_data: UbicacionCreate,
) -> Optional[UbicacionResponse]:
"""
Procesa una nueva ubicación recibida.
Args:
ubicacion_data: Datos de la ubicación a procesar.
Returns:
UbicacionResponse si se procesó correctamente, None si se descartó.
"""
# Determinar el vehículo
vehiculo_id = ubicacion_data.vehiculo_id
if not vehiculo_id and ubicacion_data.dispositivo_id:
# Buscar vehículo por identificador de dispositivo
result = await self.db.execute(
select(Dispositivo)
.where(Dispositivo.identificador == ubicacion_data.dispositivo_id)
.where(Dispositivo.activo == True)
)
dispositivo = result.scalar_one_or_none()
if dispositivo:
vehiculo_id = dispositivo.vehiculo_id
# Actualizar último contacto del dispositivo
dispositivo.ultimo_contacto = datetime.now(timezone.utc)
dispositivo.conectado = True
if ubicacion_data.bateria_dispositivo:
dispositivo.bateria = ubicacion_data.bateria_dispositivo
if ubicacion_data.satelites:
dispositivo.satelites = ubicacion_data.satelites
if not vehiculo_id:
return None
# Usar timestamp del servidor si no viene
tiempo = ubicacion_data.tiempo or datetime.now(timezone.utc)
# Crear registro de ubicación
ubicacion = Ubicacion(
tiempo=tiempo,
vehiculo_id=vehiculo_id,
lat=ubicacion_data.lat,
lng=ubicacion_data.lng,
velocidad=ubicacion_data.velocidad,
rumbo=ubicacion_data.rumbo,
altitud=ubicacion_data.altitud,
precision=ubicacion_data.precision,
hdop=ubicacion_data.hdop,
satelites=ubicacion_data.satelites,
fuente=ubicacion_data.fuente,
bateria_dispositivo=ubicacion_data.bateria_dispositivo,
bateria_vehiculo=ubicacion_data.bateria_vehiculo,
motor_encendido=ubicacion_data.motor_encendido,
odometro=ubicacion_data.odometro,
rpm=ubicacion_data.rpm,
temperatura_motor=ubicacion_data.temperatura_motor,
nivel_combustible=ubicacion_data.nivel_combustible,
)
self.db.add(ubicacion)
# Actualizar última ubicación conocida del vehículo
result = await self.db.execute(
select(Vehiculo).where(Vehiculo.id == vehiculo_id)
)
vehiculo = result.scalar_one_or_none()
if vehiculo:
vehiculo.ultima_lat = ubicacion_data.lat
vehiculo.ultima_lng = ubicacion_data.lng
vehiculo.ultima_velocidad = ubicacion_data.velocidad
vehiculo.ultimo_rumbo = ubicacion_data.rumbo
vehiculo.ultima_ubicacion_tiempo = tiempo
vehiculo.motor_encendido = ubicacion_data.motor_encendido
if ubicacion_data.odometro:
vehiculo.odometro_actual = ubicacion_data.odometro
await self.db.commit()
return UbicacionResponse(
tiempo=ubicacion.tiempo,
vehiculo_id=ubicacion.vehiculo_id,
lat=ubicacion.lat,
lng=ubicacion.lng,
velocidad=ubicacion.velocidad,
rumbo=ubicacion.rumbo,
altitud=ubicacion.altitud,
precision=ubicacion.precision,
satelites=ubicacion.satelites,
fuente=ubicacion.fuente,
bateria_dispositivo=ubicacion.bateria_dispositivo,
motor_encendido=ubicacion.motor_encendido,
odometro=ubicacion.odometro,
)
async def obtener_historial(
self,
vehiculo_id: int,
desde: datetime,
hasta: datetime,
simplificar: bool = True,
intervalo_segundos: Optional[int] = None,
) -> HistorialUbicacionesResponse:
"""
Obtiene el historial de ubicaciones de un vehículo.
Args:
vehiculo_id: ID del vehículo.
desde: Fecha/hora de inicio.
hasta: Fecha/hora de fin.
simplificar: Si simplificar la ruta (Douglas-Peucker).
intervalo_segundos: Intervalo de muestreo opcional.
Returns:
Historial de ubicaciones con estadísticas.
"""
query = (
select(Ubicacion)
.where(
and_(
Ubicacion.vehiculo_id == vehiculo_id,
Ubicacion.tiempo >= desde,
Ubicacion.tiempo <= hasta,
)
)
.order_by(Ubicacion.tiempo)
)
result = await self.db.execute(query)
ubicaciones = result.scalars().all()
# Aplicar muestreo por intervalo si se especifica
if intervalo_segundos and ubicaciones:
ubicaciones = self._muestrear_por_intervalo(
ubicaciones, intervalo_segundos
)
# Calcular estadísticas
distancia_km = self._calcular_distancia_total(ubicaciones)
tiempo_movimiento = self._calcular_tiempo_movimiento(ubicaciones)
velocidad_promedio = None
velocidad_maxima = None
if ubicaciones:
velocidades = [u.velocidad for u in ubicaciones if u.velocidad]
if velocidades:
velocidad_promedio = sum(velocidades) / len(velocidades)
velocidad_maxima = max(velocidades)
# Simplificar ruta si se solicita
if simplificar and len(ubicaciones) > 100:
ubicaciones = self._simplificar_ruta(ubicaciones, epsilon=0.0001)
ubicaciones_response = [
UbicacionResponse(
tiempo=u.tiempo,
vehiculo_id=u.vehiculo_id,
lat=u.lat,
lng=u.lng,
velocidad=u.velocidad,
rumbo=u.rumbo,
altitud=u.altitud,
precision=u.precision,
satelites=u.satelites,
fuente=u.fuente,
bateria_dispositivo=u.bateria_dispositivo,
motor_encendido=u.motor_encendido,
odometro=u.odometro,
)
for u in ubicaciones
]
return HistorialUbicacionesResponse(
vehiculo_id=vehiculo_id,
desde=desde,
hasta=hasta,
total_puntos=len(ubicaciones_response),
distancia_km=distancia_km,
tiempo_movimiento_segundos=tiempo_movimiento,
velocidad_promedio=velocidad_promedio,
velocidad_maxima=velocidad_maxima,
ubicaciones=ubicaciones_response,
)
async def obtener_ultima_ubicacion(
self,
vehiculo_id: int,
) -> Optional[UbicacionResponse]:
"""
Obtiene la última ubicación conocida de un vehículo.
Args:
vehiculo_id: ID del vehículo.
Returns:
Última ubicación o None.
"""
result = await self.db.execute(
select(Ubicacion)
.where(Ubicacion.vehiculo_id == vehiculo_id)
.order_by(Ubicacion.tiempo.desc())
.limit(1)
)
ubicacion = result.scalar_one_or_none()
if not ubicacion:
return None
return UbicacionResponse(
tiempo=ubicacion.tiempo,
vehiculo_id=ubicacion.vehiculo_id,
lat=ubicacion.lat,
lng=ubicacion.lng,
velocidad=ubicacion.velocidad,
rumbo=ubicacion.rumbo,
altitud=ubicacion.altitud,
precision=ubicacion.precision,
satelites=ubicacion.satelites,
fuente=ubicacion.fuente,
bateria_dispositivo=ubicacion.bateria_dispositivo,
motor_encendido=ubicacion.motor_encendido,
odometro=ubicacion.odometro,
)
async def obtener_ubicaciones_flota(
self,
) -> List[dict]:
"""
Obtiene las últimas ubicaciones de todos los vehículos activos.
Returns:
Lista de ubicaciones actuales de la flota.
"""
result = await self.db.execute(
select(Vehiculo)
.where(Vehiculo.activo == True)
.where(Vehiculo.ultima_lat.isnot(None))
)
vehiculos = result.scalars().all()
ubicaciones = []
for v in vehiculos:
# Determinar si está en movimiento
en_movimiento = False
if v.ultima_velocidad and v.ultima_velocidad > 5:
en_movimiento = True
ubicaciones.append({
"id": v.id,
"nombre": v.nombre,
"placa": v.placa,
"color_marcador": v.color_marcador,
"icono": v.icono,
"lat": v.ultima_lat,
"lng": v.ultima_lng,
"velocidad": v.ultima_velocidad,
"rumbo": v.ultimo_rumbo,
"tiempo": v.ultima_ubicacion_tiempo,
"motor_encendido": v.motor_encendido,
"en_movimiento": en_movimiento,
"conductor_nombre": v.conductor.nombre_completo if v.conductor else None,
})
return ubicaciones
def _calcular_distancia_total(
self,
ubicaciones: List[Ubicacion],
) -> float:
"""
Calcula la distancia total recorrida entre ubicaciones.
Usa la fórmula de Haversine para calcular distancias.
Args:
ubicaciones: Lista de ubicaciones ordenadas por tiempo.
Returns:
Distancia total en kilómetros.
"""
if len(ubicaciones) < 2:
return 0.0
import math
total_km = 0.0
for i in range(1, len(ubicaciones)):
lat1 = math.radians(ubicaciones[i - 1].lat)
lat2 = math.radians(ubicaciones[i].lat)
dlat = lat2 - lat1
dlon = math.radians(ubicaciones[i].lng - ubicaciones[i - 1].lng)
a = (
math.sin(dlat / 2) ** 2
+ math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
)
c = 2 * math.asin(math.sqrt(a))
r = 6371 # Radio de la Tierra en km
total_km += r * c
return round(total_km, 2)
def _calcular_tiempo_movimiento(
self,
ubicaciones: List[Ubicacion],
) -> int:
"""
Calcula el tiempo en movimiento (velocidad > 5 km/h).
Args:
ubicaciones: Lista de ubicaciones ordenadas.
Returns:
Tiempo en movimiento en segundos.
"""
if len(ubicaciones) < 2:
return 0
tiempo_total = 0
for i in range(1, len(ubicaciones)):
if (
ubicaciones[i - 1].velocidad
and ubicaciones[i - 1].velocidad > 5
):
delta = (
ubicaciones[i].tiempo - ubicaciones[i - 1].tiempo
).total_seconds()
tiempo_total += delta
return int(tiempo_total)
def _muestrear_por_intervalo(
self,
ubicaciones: List[Ubicacion],
intervalo_segundos: int,
) -> List[Ubicacion]:
"""
Muestrea ubicaciones por intervalo de tiempo.
Args:
ubicaciones: Lista de ubicaciones.
intervalo_segundos: Intervalo de muestreo.
Returns:
Lista filtrada de ubicaciones.
"""
if not ubicaciones:
return []
resultado = [ubicaciones[0]]
ultimo_tiempo = ubicaciones[0].tiempo
for u in ubicaciones[1:]:
delta = (u.tiempo - ultimo_tiempo).total_seconds()
if delta >= intervalo_segundos:
resultado.append(u)
ultimo_tiempo = u.tiempo
# Siempre incluir el último punto
if resultado[-1] != ubicaciones[-1]:
resultado.append(ubicaciones[-1])
return resultado
def _simplificar_ruta(
self,
ubicaciones: List[Ubicacion],
epsilon: float = 0.0001,
) -> List[Ubicacion]:
"""
Simplifica la ruta usando el algoritmo Douglas-Peucker.
Args:
ubicaciones: Lista de ubicaciones.
epsilon: Tolerancia de simplificación.
Returns:
Lista simplificada de ubicaciones.
"""
if len(ubicaciones) <= 2:
return ubicaciones
# Convertir a lista de puntos
points = [(u.lat, u.lng, u) for u in ubicaciones]
# Douglas-Peucker
simplified = self._douglas_peucker(points, epsilon)
return [p[2] for p in simplified]
def _douglas_peucker(
self,
points: List[Tuple],
epsilon: float,
) -> List[Tuple]:
"""Implementación del algoritmo Douglas-Peucker."""
if len(points) <= 2:
return points
# Encontrar el punto más lejano de la línea
dmax = 0
index = 0
end = len(points) - 1
for i in range(1, end):
d = self._perpendicular_distance(
points[i], points[0], points[end]
)
if d > dmax:
index = i
dmax = d
# Si la distancia máxima es mayor que epsilon, simplificar recursivamente
if dmax > epsilon:
# Dividir en dos segmentos
rec1 = self._douglas_peucker(points[: index + 1], epsilon)
rec2 = self._douglas_peucker(points[index:], epsilon)
# Combinar (evitar duplicar el punto medio)
return rec1[:-1] + rec2
else:
return [points[0], points[end]]
def _perpendicular_distance(
self,
point: Tuple,
line_start: Tuple,
line_end: Tuple,
) -> float:
"""Calcula la distancia perpendicular de un punto a una línea."""
import math
x, y = point[0], point[1]
x1, y1 = line_start[0], line_start[1]
x2, y2 = line_end[0], line_end[1]
# Caso especial: línea de longitud cero
dx = x2 - x1
dy = y2 - y1
if dx == 0 and dy == 0:
return math.sqrt((x - x1) ** 2 + (y - y1) ** 2)
# Distancia perpendicular
numerator = abs(dy * x - dx * y + x2 * y1 - y2 * x1)
denominator = math.sqrt(dx ** 2 + dy ** 2)
return numerator / denominator