- Add User model and authentication system with JWT cookies - Implement login/logout routes and protected dashboard - Add Alembic database migration configuration - Add create_admin.py script for initial user setup - Make ContentGenerator and ImageGenerator lazy-initialized - Add comprehensive API keys setup documentation - Fix startup errors when services unavailable Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
237 lines
6.1 KiB
Python
237 lines
6.1 KiB
Python
"""
|
|
API Routes para autenticación.
|
|
"""
|
|
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Request, Response
|
|
from fastapi.responses import HTMLResponse, RedirectResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from sqlalchemy.orm import Session
|
|
from pydantic import BaseModel
|
|
|
|
from app.core.database import get_db
|
|
from app.core.security import (
|
|
verify_password,
|
|
get_password_hash,
|
|
create_access_token,
|
|
decode_token,
|
|
ACCESS_TOKEN_EXPIRE_MINUTES
|
|
)
|
|
from app.models.user import User
|
|
|
|
router = APIRouter()
|
|
templates = Jinja2Templates(directory="dashboard/templates")
|
|
|
|
# Cookie name for JWT token
|
|
TOKEN_COOKIE_NAME = "access_token"
|
|
|
|
|
|
# ===========================================
|
|
# SCHEMAS
|
|
# ===========================================
|
|
|
|
class LoginRequest(BaseModel):
|
|
username: str
|
|
password: str
|
|
|
|
|
|
class UserCreate(BaseModel):
|
|
username: str
|
|
email: str
|
|
password: str
|
|
full_name: Optional[str] = None
|
|
|
|
|
|
class TokenResponse(BaseModel):
|
|
access_token: str
|
|
token_type: str = "bearer"
|
|
|
|
|
|
# ===========================================
|
|
# HELPER FUNCTIONS
|
|
# ===========================================
|
|
|
|
def get_current_user_from_cookie(request: Request, db: Session) -> Optional[User]:
|
|
"""Obtener usuario actual desde la cookie JWT."""
|
|
token = request.cookies.get(TOKEN_COOKIE_NAME)
|
|
if not token:
|
|
return None
|
|
|
|
try:
|
|
# Remover "Bearer " si existe
|
|
if token.startswith("Bearer "):
|
|
token = token[7:]
|
|
|
|
payload = decode_token(token)
|
|
username = payload.get("sub")
|
|
if not username:
|
|
return None
|
|
|
|
user = db.query(User).filter(User.username == username).first()
|
|
return user if user and user.is_active else None
|
|
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def authenticate_user(db: Session, username: str, password: str) -> Optional[User]:
|
|
"""Autenticar usuario con username y password."""
|
|
user = db.query(User).filter(User.username == username).first()
|
|
if not user:
|
|
# Intentar con email
|
|
user = db.query(User).filter(User.email == username).first()
|
|
|
|
if not user or not verify_password(password, user.hashed_password):
|
|
return None
|
|
|
|
return user
|
|
|
|
|
|
# ===========================================
|
|
# PAGES (HTML)
|
|
# ===========================================
|
|
|
|
@router.get("/login", response_class=HTMLResponse)
|
|
async def login_page(request: Request, db: Session = Depends(get_db)):
|
|
"""Página de login."""
|
|
# Si ya está autenticado, redirigir al dashboard
|
|
user = get_current_user_from_cookie(request, db)
|
|
if user:
|
|
return RedirectResponse(url="/", status_code=302)
|
|
|
|
return templates.TemplateResponse("login.html", {
|
|
"request": request,
|
|
"error": None
|
|
})
|
|
|
|
|
|
@router.post("/login", response_class=HTMLResponse)
|
|
async def login_submit(
|
|
request: Request,
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Procesar formulario de login."""
|
|
form = await request.form()
|
|
username = form.get("username")
|
|
password = form.get("password")
|
|
|
|
user = authenticate_user(db, username, password)
|
|
|
|
if not user:
|
|
return templates.TemplateResponse("login.html", {
|
|
"request": request,
|
|
"error": "Usuario o contraseña incorrectos"
|
|
})
|
|
|
|
# Crear token
|
|
access_token = create_access_token(
|
|
data={"sub": user.username},
|
|
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
)
|
|
|
|
# Actualizar last_login
|
|
user.last_login = datetime.utcnow()
|
|
db.commit()
|
|
|
|
# Crear respuesta con cookie
|
|
response = RedirectResponse(url="/", status_code=302)
|
|
response.set_cookie(
|
|
key=TOKEN_COOKIE_NAME,
|
|
value=f"Bearer {access_token}",
|
|
httponly=True,
|
|
max_age=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
|
|
samesite="lax"
|
|
)
|
|
|
|
return response
|
|
|
|
|
|
@router.get("/logout")
|
|
async def logout():
|
|
"""Cerrar sesión."""
|
|
response = RedirectResponse(url="/login", status_code=302)
|
|
response.delete_cookie(TOKEN_COOKIE_NAME)
|
|
return response
|
|
|
|
|
|
# ===========================================
|
|
# API ENDPOINTS (JSON)
|
|
# ===========================================
|
|
|
|
@router.post("/api/auth/login", response_model=TokenResponse)
|
|
async def api_login(
|
|
login_data: LoginRequest,
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Login via API (devuelve JWT)."""
|
|
user = authenticate_user(db, login_data.username, login_data.password)
|
|
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Usuario o contraseña incorrectos",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
access_token = create_access_token(
|
|
data={"sub": user.username},
|
|
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
)
|
|
|
|
# Actualizar last_login
|
|
user.last_login = datetime.utcnow()
|
|
db.commit()
|
|
|
|
return TokenResponse(access_token=access_token)
|
|
|
|
|
|
@router.post("/api/auth/register")
|
|
async def api_register(
|
|
user_data: UserCreate,
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Registrar nuevo usuario (solo para setup inicial)."""
|
|
# Verificar si ya existe
|
|
existing = db.query(User).filter(
|
|
(User.username == user_data.username) | (User.email == user_data.email)
|
|
).first()
|
|
|
|
if existing:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Usuario o email ya existe"
|
|
)
|
|
|
|
# Crear usuario
|
|
user = User(
|
|
username=user_data.username,
|
|
email=user_data.email,
|
|
hashed_password=get_password_hash(user_data.password),
|
|
full_name=user_data.full_name,
|
|
is_active=True
|
|
)
|
|
|
|
db.add(user)
|
|
db.commit()
|
|
db.refresh(user)
|
|
|
|
return {"message": "Usuario creado", "user": user.to_dict()}
|
|
|
|
|
|
@router.get("/api/auth/me")
|
|
async def api_get_current_user(
|
|
request: Request,
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Obtener información del usuario actual."""
|
|
user = get_current_user_from_cookie(request, db)
|
|
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="No autenticado"
|
|
)
|
|
|
|
return user.to_dict()
|