""" Configuración de conexión a la base de datos PostgreSQL/TimescaleDB. Proporciona: - Engine async para SQLAlchemy - Session factory async - Dependency para obtener sesiones en endpoints - Base declarativa para modelos """ from typing import AsyncGenerator from sqlalchemy.ext.asyncio import ( AsyncSession, async_sessionmaker, create_async_engine, ) from sqlalchemy.orm import DeclarativeBase from sqlalchemy.pool import NullPool from app.core.config import settings class Base(DeclarativeBase): """Clase base para todos los modelos SQLAlchemy.""" pass # Configuración del engine según el entorno if settings.ENVIRONMENT == "testing": # En testing usamos NullPool para evitar problemas con conexiones engine = create_async_engine( settings.DATABASE_URL, echo=settings.DEBUG, poolclass=NullPool, ) else: # En producción usamos pool de conexiones engine = create_async_engine( settings.DATABASE_URL, echo=settings.DEBUG, pool_size=settings.DATABASE_POOL_SIZE, max_overflow=settings.DATABASE_MAX_OVERFLOW, pool_pre_ping=True, # Verifica conexiones antes de usar pool_recycle=3600, # Recicla conexiones cada hora ) # Factory de sesiones async async_session_factory = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False, autocommit=False, autoflush=False, ) async def get_db() -> AsyncGenerator[AsyncSession, None]: """ Dependency que proporciona una sesión de base de datos. Yields: AsyncSession: Sesión de base de datos para usar en el endpoint. Example: @router.get("/items") async def get_items(db: AsyncSession = Depends(get_db)): result = await db.execute(select(Item)) return result.scalars().all() """ async with async_session_factory() as session: try: yield session await session.commit() except Exception: await session.rollback() raise finally: await session.close() async def init_db() -> None: """ Inicializa la base de datos creando todas las tablas. Nota: En producción se recomienda usar Alembic para migraciones. Esta función es útil para desarrollo y testing. """ async with engine.begin() as conn: # Importar todos los modelos para que SQLAlchemy los conozca from app.models import ( # noqa: F401 alerta, camara, carga_combustible, conductor, configuracion, dispositivo, evento_video, geocerca, grabacion, grupo_vehiculos, mantenimiento, mensaje, parada, poi, tipo_alerta, tipo_mantenimiento, ubicacion, usuario, vehiculo, viaje, ) await conn.run_sync(Base.metadata.create_all) async def close_db() -> None: """ Cierra el pool de conexiones a la base de datos. Debe llamarse al apagar la aplicación para liberar recursos. """ await engine.dispose() async def check_db_connection() -> bool: """ Verifica que la conexión a la base de datos funcione. Returns: bool: True si la conexión es exitosa. Raises: Exception: Si no se puede conectar a la base de datos. """ try: async with engine.connect() as conn: await conn.execute("SELECT 1") return True except Exception as e: raise Exception(f"Error conectando a la base de datos: {e}")