""" API Routes para autenticación. """ from datetime import datetime, timedelta from typing import Optional from fastapi import APIRouter, Depends, HTTPException, status, Request, Response from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.templating import Jinja2Templates from sqlalchemy.orm import Session from pydantic import BaseModel from app.core.database import get_db from app.core.security import ( verify_password, get_password_hash, create_access_token, decode_token, ACCESS_TOKEN_EXPIRE_MINUTES ) from app.models.user import User router = APIRouter() templates = Jinja2Templates(directory="dashboard/templates") # Cookie name for JWT token TOKEN_COOKIE_NAME = "access_token" # =========================================== # SCHEMAS # =========================================== class LoginRequest(BaseModel): username: str password: str class UserCreate(BaseModel): username: str email: str password: str full_name: Optional[str] = None class TokenResponse(BaseModel): access_token: str token_type: str = "bearer" # =========================================== # HELPER FUNCTIONS # =========================================== def get_current_user_from_cookie(request: Request, db: Session) -> Optional[User]: """Obtener usuario actual desde la cookie JWT.""" token = request.cookies.get(TOKEN_COOKIE_NAME) if not token: return None try: # Remover "Bearer " si existe if token.startswith("Bearer "): token = token[7:] payload = decode_token(token) username = payload.get("sub") if not username: return None user = db.query(User).filter(User.username == username).first() return user if user and user.is_active else None except Exception: return None def authenticate_user(db: Session, username: str, password: str) -> Optional[User]: """Autenticar usuario con username y password.""" user = db.query(User).filter(User.username == username).first() if not user: # Intentar con email user = db.query(User).filter(User.email == username).first() if not user or not verify_password(password, user.hashed_password): return None return user # =========================================== # PAGES (HTML) # =========================================== @router.get("/login", response_class=HTMLResponse) async def login_page(request: Request, db: Session = Depends(get_db)): """Página de login.""" # Si ya está autenticado, redirigir al dashboard user = get_current_user_from_cookie(request, db) if user: return RedirectResponse(url="/", status_code=302) return templates.TemplateResponse("login.html", { "request": request, "error": None }) @router.post("/login", response_class=HTMLResponse) async def login_submit( request: Request, db: Session = Depends(get_db) ): """Procesar formulario de login.""" form = await request.form() username = form.get("username") password = form.get("password") user = authenticate_user(db, username, password) if not user: return templates.TemplateResponse("login.html", { "request": request, "error": "Usuario o contraseña incorrectos" }) # Crear token access_token = create_access_token( data={"sub": user.username}, expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) ) # Actualizar last_login user.last_login = datetime.utcnow() db.commit() # Crear respuesta con cookie response = RedirectResponse(url="/", status_code=302) response.set_cookie( key=TOKEN_COOKIE_NAME, value=f"Bearer {access_token}", httponly=True, max_age=ACCESS_TOKEN_EXPIRE_MINUTES * 60, samesite="lax" ) return response @router.get("/logout") async def logout(): """Cerrar sesión.""" response = RedirectResponse(url="/login", status_code=302) response.delete_cookie(TOKEN_COOKIE_NAME) return response # =========================================== # API ENDPOINTS (JSON) # =========================================== @router.post("/api/auth/login", response_model=TokenResponse) async def api_login( login_data: LoginRequest, db: Session = Depends(get_db) ): """Login via API (devuelve JWT).""" user = authenticate_user(db, login_data.username, login_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Usuario o contraseña incorrectos", headers={"WWW-Authenticate": "Bearer"}, ) access_token = create_access_token( data={"sub": user.username}, expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) ) # Actualizar last_login user.last_login = datetime.utcnow() db.commit() return TokenResponse(access_token=access_token) @router.post("/api/auth/register") async def api_register( user_data: UserCreate, db: Session = Depends(get_db) ): """Registrar nuevo usuario (solo para setup inicial).""" # Verificar si ya existe existing = db.query(User).filter( (User.username == user_data.username) | (User.email == user_data.email) ).first() if existing: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Usuario o email ya existe" ) # Crear usuario user = User( username=user_data.username, email=user_data.email, hashed_password=get_password_hash(user_data.password), full_name=user_data.full_name, is_active=True ) db.add(user) db.commit() db.refresh(user) return {"message": "Usuario creado", "user": user.to_dict()} @router.get("/api/auth/me") async def api_get_current_user( request: Request, db: Session = Depends(get_db) ): """Obtener información del usuario actual.""" user = get_current_user_from_cookie(request, db) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="No autenticado" ) return user.to_dict()