Files
ATLAS/backend/app/core/database.py
FlotillasGPS Developer 51d78bacf4 FlotillasGPS - Sistema completo de monitoreo de flotillas GPS
Sistema completo para monitoreo y gestion de flotas de vehiculos con:
- Backend FastAPI con PostgreSQL/TimescaleDB
- Frontend React con TypeScript y TailwindCSS
- App movil React Native con Expo
- Soporte para dispositivos GPS, Meshtastic y celulares
- Video streaming en vivo con MediaMTX
- Geocercas, alertas, viajes y reportes
- Autenticacion JWT y WebSockets en tiempo real

Documentacion completa y guias de usuario incluidas.
2026-01-21 08:18:00 +00:00

141 lines
3.6 KiB
Python

"""
Configuración de conexión a la base de datos PostgreSQL/TimescaleDB.
Proporciona:
- Engine async para SQLAlchemy
- Session factory async
- Dependency para obtener sesiones en endpoints
- Base declarativa para modelos
"""
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.pool import NullPool
from app.core.config import settings
class Base(DeclarativeBase):
"""Clase base para todos los modelos SQLAlchemy."""
pass
# Configuración del engine según el entorno
if settings.ENVIRONMENT == "testing":
# En testing usamos NullPool para evitar problemas con conexiones
engine = create_async_engine(
settings.DATABASE_URL,
echo=settings.DEBUG,
poolclass=NullPool,
)
else:
# En producción usamos pool de conexiones
engine = create_async_engine(
settings.DATABASE_URL,
echo=settings.DEBUG,
pool_size=settings.DATABASE_POOL_SIZE,
max_overflow=settings.DATABASE_MAX_OVERFLOW,
pool_pre_ping=True, # Verifica conexiones antes de usar
pool_recycle=3600, # Recicla conexiones cada hora
)
# Factory de sesiones async
async_session_factory = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autocommit=False,
autoflush=False,
)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""
Dependency que proporciona una sesión de base de datos.
Yields:
AsyncSession: Sesión de base de datos para usar en el endpoint.
Example:
@router.get("/items")
async def get_items(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Item))
return result.scalars().all()
"""
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def init_db() -> None:
"""
Inicializa la base de datos creando todas las tablas.
Nota: En producción se recomienda usar Alembic para migraciones.
Esta función es útil para desarrollo y testing.
"""
async with engine.begin() as conn:
# Importar todos los modelos para que SQLAlchemy los conozca
from app.models import ( # noqa: F401
alerta,
camara,
carga_combustible,
conductor,
configuracion,
dispositivo,
evento_video,
geocerca,
grabacion,
grupo_vehiculos,
mantenimiento,
mensaje,
parada,
poi,
tipo_alerta,
tipo_mantenimiento,
ubicacion,
usuario,
vehiculo,
viaje,
)
await conn.run_sync(Base.metadata.create_all)
async def close_db() -> None:
"""
Cierra el pool de conexiones a la base de datos.
Debe llamarse al apagar la aplicación para liberar recursos.
"""
await engine.dispose()
async def check_db_connection() -> bool:
"""
Verifica que la conexión a la base de datos funcione.
Returns:
bool: True si la conexión es exitosa.
Raises:
Exception: Si no se puede conectar a la base de datos.
"""
try:
async with engine.connect() as conn:
await conn.execute("SELECT 1")
return True
except Exception as e:
raise Exception(f"Error conectando a la base de datos: {e}")