""" Servicio para procesamiento de ubicaciones GPS. Maneja la recepción, procesamiento y análisis de datos de ubicación. """ import json from datetime import datetime, timedelta, timezone from typing import List, Optional, Tuple from sqlalchemy import and_, func, select from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import settings from app.models.dispositivo import Dispositivo from app.models.ubicacion import Ubicacion from app.models.vehiculo import Vehiculo from app.schemas.ubicacion import ( HistorialUbicacionesResponse, UbicacionCreate, UbicacionResponse, ) class UbicacionService: """Servicio para gestión de ubicaciones GPS.""" def __init__(self, db: AsyncSession): """ Inicializa el servicio. Args: db: Sesión de base de datos async. """ self.db = db async def procesar_ubicacion( self, ubicacion_data: UbicacionCreate, ) -> Optional[UbicacionResponse]: """ Procesa una nueva ubicación recibida. Args: ubicacion_data: Datos de la ubicación a procesar. Returns: UbicacionResponse si se procesó correctamente, None si se descartó. """ # Determinar el vehículo vehiculo_id = ubicacion_data.vehiculo_id if not vehiculo_id and ubicacion_data.dispositivo_id: # Buscar vehículo por identificador de dispositivo result = await self.db.execute( select(Dispositivo) .where(Dispositivo.identificador == ubicacion_data.dispositivo_id) .where(Dispositivo.activo == True) ) dispositivo = result.scalar_one_or_none() if dispositivo: vehiculo_id = dispositivo.vehiculo_id # Actualizar último contacto del dispositivo dispositivo.ultimo_contacto = datetime.now(timezone.utc) dispositivo.conectado = True if ubicacion_data.bateria_dispositivo: dispositivo.bateria = ubicacion_data.bateria_dispositivo if ubicacion_data.satelites: dispositivo.satelites = ubicacion_data.satelites if not vehiculo_id: return None # Usar timestamp del servidor si no viene tiempo = ubicacion_data.tiempo or datetime.now(timezone.utc) # Crear registro de ubicación ubicacion = Ubicacion( tiempo=tiempo, vehiculo_id=vehiculo_id, lat=ubicacion_data.lat, lng=ubicacion_data.lng, velocidad=ubicacion_data.velocidad, rumbo=ubicacion_data.rumbo, altitud=ubicacion_data.altitud, precision=ubicacion_data.precision, hdop=ubicacion_data.hdop, satelites=ubicacion_data.satelites, fuente=ubicacion_data.fuente, bateria_dispositivo=ubicacion_data.bateria_dispositivo, bateria_vehiculo=ubicacion_data.bateria_vehiculo, motor_encendido=ubicacion_data.motor_encendido, odometro=ubicacion_data.odometro, rpm=ubicacion_data.rpm, temperatura_motor=ubicacion_data.temperatura_motor, nivel_combustible=ubicacion_data.nivel_combustible, ) self.db.add(ubicacion) # Actualizar última ubicación conocida del vehículo result = await self.db.execute( select(Vehiculo).where(Vehiculo.id == vehiculo_id) ) vehiculo = result.scalar_one_or_none() if vehiculo: vehiculo.ultima_lat = ubicacion_data.lat vehiculo.ultima_lng = ubicacion_data.lng vehiculo.ultima_velocidad = ubicacion_data.velocidad vehiculo.ultimo_rumbo = ubicacion_data.rumbo vehiculo.ultima_ubicacion_tiempo = tiempo vehiculo.motor_encendido = ubicacion_data.motor_encendido if ubicacion_data.odometro: vehiculo.odometro_actual = ubicacion_data.odometro await self.db.commit() return UbicacionResponse( tiempo=ubicacion.tiempo, vehiculo_id=ubicacion.vehiculo_id, lat=ubicacion.lat, lng=ubicacion.lng, velocidad=ubicacion.velocidad, rumbo=ubicacion.rumbo, altitud=ubicacion.altitud, precision=ubicacion.precision, satelites=ubicacion.satelites, fuente=ubicacion.fuente, bateria_dispositivo=ubicacion.bateria_dispositivo, motor_encendido=ubicacion.motor_encendido, odometro=ubicacion.odometro, ) async def obtener_historial( self, vehiculo_id: int, desde: datetime, hasta: datetime, simplificar: bool = True, intervalo_segundos: Optional[int] = None, ) -> HistorialUbicacionesResponse: """ Obtiene el historial de ubicaciones de un vehículo. Args: vehiculo_id: ID del vehículo. desde: Fecha/hora de inicio. hasta: Fecha/hora de fin. simplificar: Si simplificar la ruta (Douglas-Peucker). intervalo_segundos: Intervalo de muestreo opcional. Returns: Historial de ubicaciones con estadísticas. """ query = ( select(Ubicacion) .where( and_( Ubicacion.vehiculo_id == vehiculo_id, Ubicacion.tiempo >= desde, Ubicacion.tiempo <= hasta, ) ) .order_by(Ubicacion.tiempo) ) result = await self.db.execute(query) ubicaciones = result.scalars().all() # Aplicar muestreo por intervalo si se especifica if intervalo_segundos and ubicaciones: ubicaciones = self._muestrear_por_intervalo( ubicaciones, intervalo_segundos ) # Calcular estadísticas distancia_km = self._calcular_distancia_total(ubicaciones) tiempo_movimiento = self._calcular_tiempo_movimiento(ubicaciones) velocidad_promedio = None velocidad_maxima = None if ubicaciones: velocidades = [u.velocidad for u in ubicaciones if u.velocidad] if velocidades: velocidad_promedio = sum(velocidades) / len(velocidades) velocidad_maxima = max(velocidades) # Simplificar ruta si se solicita if simplificar and len(ubicaciones) > 100: ubicaciones = self._simplificar_ruta(ubicaciones, epsilon=0.0001) ubicaciones_response = [ UbicacionResponse( tiempo=u.tiempo, vehiculo_id=u.vehiculo_id, lat=u.lat, lng=u.lng, velocidad=u.velocidad, rumbo=u.rumbo, altitud=u.altitud, precision=u.precision, satelites=u.satelites, fuente=u.fuente, bateria_dispositivo=u.bateria_dispositivo, motor_encendido=u.motor_encendido, odometro=u.odometro, ) for u in ubicaciones ] return HistorialUbicacionesResponse( vehiculo_id=vehiculo_id, desde=desde, hasta=hasta, total_puntos=len(ubicaciones_response), distancia_km=distancia_km, tiempo_movimiento_segundos=tiempo_movimiento, velocidad_promedio=velocidad_promedio, velocidad_maxima=velocidad_maxima, ubicaciones=ubicaciones_response, ) async def obtener_ultima_ubicacion( self, vehiculo_id: int, ) -> Optional[UbicacionResponse]: """ Obtiene la última ubicación conocida de un vehículo. Args: vehiculo_id: ID del vehículo. Returns: Última ubicación o None. """ result = await self.db.execute( select(Ubicacion) .where(Ubicacion.vehiculo_id == vehiculo_id) .order_by(Ubicacion.tiempo.desc()) .limit(1) ) ubicacion = result.scalar_one_or_none() if not ubicacion: return None return UbicacionResponse( tiempo=ubicacion.tiempo, vehiculo_id=ubicacion.vehiculo_id, lat=ubicacion.lat, lng=ubicacion.lng, velocidad=ubicacion.velocidad, rumbo=ubicacion.rumbo, altitud=ubicacion.altitud, precision=ubicacion.precision, satelites=ubicacion.satelites, fuente=ubicacion.fuente, bateria_dispositivo=ubicacion.bateria_dispositivo, motor_encendido=ubicacion.motor_encendido, odometro=ubicacion.odometro, ) async def obtener_ubicaciones_flota( self, ) -> List[dict]: """ Obtiene las últimas ubicaciones de todos los vehículos activos. Returns: Lista de ubicaciones actuales de la flota. """ result = await self.db.execute( select(Vehiculo) .where(Vehiculo.activo == True) .where(Vehiculo.ultima_lat.isnot(None)) ) vehiculos = result.scalars().all() ubicaciones = [] for v in vehiculos: # Determinar si está en movimiento en_movimiento = False if v.ultima_velocidad and v.ultima_velocidad > 5: en_movimiento = True ubicaciones.append({ "id": v.id, "nombre": v.nombre, "placa": v.placa, "color_marcador": v.color_marcador, "icono": v.icono, "lat": v.ultima_lat, "lng": v.ultima_lng, "velocidad": v.ultima_velocidad, "rumbo": v.ultimo_rumbo, "tiempo": v.ultima_ubicacion_tiempo, "motor_encendido": v.motor_encendido, "en_movimiento": en_movimiento, "conductor_nombre": v.conductor.nombre_completo if v.conductor else None, }) return ubicaciones def _calcular_distancia_total( self, ubicaciones: List[Ubicacion], ) -> float: """ Calcula la distancia total recorrida entre ubicaciones. Usa la fórmula de Haversine para calcular distancias. Args: ubicaciones: Lista de ubicaciones ordenadas por tiempo. Returns: Distancia total en kilómetros. """ if len(ubicaciones) < 2: return 0.0 import math total_km = 0.0 for i in range(1, len(ubicaciones)): lat1 = math.radians(ubicaciones[i - 1].lat) lat2 = math.radians(ubicaciones[i].lat) dlat = lat2 - lat1 dlon = math.radians(ubicaciones[i].lng - ubicaciones[i - 1].lng) a = ( math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2 ) c = 2 * math.asin(math.sqrt(a)) r = 6371 # Radio de la Tierra en km total_km += r * c return round(total_km, 2) def _calcular_tiempo_movimiento( self, ubicaciones: List[Ubicacion], ) -> int: """ Calcula el tiempo en movimiento (velocidad > 5 km/h). Args: ubicaciones: Lista de ubicaciones ordenadas. Returns: Tiempo en movimiento en segundos. """ if len(ubicaciones) < 2: return 0 tiempo_total = 0 for i in range(1, len(ubicaciones)): if ( ubicaciones[i - 1].velocidad and ubicaciones[i - 1].velocidad > 5 ): delta = ( ubicaciones[i].tiempo - ubicaciones[i - 1].tiempo ).total_seconds() tiempo_total += delta return int(tiempo_total) def _muestrear_por_intervalo( self, ubicaciones: List[Ubicacion], intervalo_segundos: int, ) -> List[Ubicacion]: """ Muestrea ubicaciones por intervalo de tiempo. Args: ubicaciones: Lista de ubicaciones. intervalo_segundos: Intervalo de muestreo. Returns: Lista filtrada de ubicaciones. """ if not ubicaciones: return [] resultado = [ubicaciones[0]] ultimo_tiempo = ubicaciones[0].tiempo for u in ubicaciones[1:]: delta = (u.tiempo - ultimo_tiempo).total_seconds() if delta >= intervalo_segundos: resultado.append(u) ultimo_tiempo = u.tiempo # Siempre incluir el último punto if resultado[-1] != ubicaciones[-1]: resultado.append(ubicaciones[-1]) return resultado def _simplificar_ruta( self, ubicaciones: List[Ubicacion], epsilon: float = 0.0001, ) -> List[Ubicacion]: """ Simplifica la ruta usando el algoritmo Douglas-Peucker. Args: ubicaciones: Lista de ubicaciones. epsilon: Tolerancia de simplificación. Returns: Lista simplificada de ubicaciones. """ if len(ubicaciones) <= 2: return ubicaciones # Convertir a lista de puntos points = [(u.lat, u.lng, u) for u in ubicaciones] # Douglas-Peucker simplified = self._douglas_peucker(points, epsilon) return [p[2] for p in simplified] def _douglas_peucker( self, points: List[Tuple], epsilon: float, ) -> List[Tuple]: """Implementación del algoritmo Douglas-Peucker.""" if len(points) <= 2: return points # Encontrar el punto más lejano de la línea dmax = 0 index = 0 end = len(points) - 1 for i in range(1, end): d = self._perpendicular_distance( points[i], points[0], points[end] ) if d > dmax: index = i dmax = d # Si la distancia máxima es mayor que epsilon, simplificar recursivamente if dmax > epsilon: # Dividir en dos segmentos rec1 = self._douglas_peucker(points[: index + 1], epsilon) rec2 = self._douglas_peucker(points[index:], epsilon) # Combinar (evitar duplicar el punto medio) return rec1[:-1] + rec2 else: return [points[0], points[end]] def _perpendicular_distance( self, point: Tuple, line_start: Tuple, line_end: Tuple, ) -> float: """Calcula la distancia perpendicular de un punto a una línea.""" import math x, y = point[0], point[1] x1, y1 = line_start[0], line_start[1] x2, y2 = line_end[0], line_end[1] # Caso especial: línea de longitud cero dx = x2 - x1 dy = y2 - y1 if dx == 0 and dy == 0: return math.sqrt((x - x1) ** 2 + (y - y1) ** 2) # Distancia perpendicular numerator = abs(dy * x - dx * y + x2 * y1 - y2 * x1) denominator = math.sqrt(dx ** 2 + dy ** 2) return numerator / denominator