feat: Initial project structure for WebTriviasMulti

- Backend: FastAPI + Python-SocketIO + SQLAlchemy
  - Models for categories, questions, game sessions, events
  - AI services for answer validation and question generation (Claude)
  - Room management with Redis
  - Game logic with stealing mechanics
  - Admin API for question management

- Frontend: React + Vite + TypeScript + Tailwind
  - 5 visual themes (DRRR, Retro, Minimal, RGB, Anime 90s)
  - Real-time game with Socket.IO
  - Achievement system
  - Replay functionality
  - Sound effects per theme

- Docker Compose for deployment
- Design documentation

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-26 07:50:48 +00:00
commit 43021b9c3c
57 changed files with 5446 additions and 0 deletions

1
backend/app/__init__.py Normal file
View File

@@ -0,0 +1 @@
# WebTriviasMulti Backend

View File

@@ -0,0 +1 @@
# API routers

296
backend/app/api/admin.py Normal file
View File

@@ -0,0 +1,296 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from typing import List
from app.models.base import get_db
from app.models.admin import Admin
from app.models.question import Question
from app.models.category import Category
from app.schemas.admin import AdminCreate, Token, TokenData
from app.schemas.question import (
QuestionCreate, QuestionUpdate, QuestionResponse,
AIGenerateRequest
)
from app.services.ai_generator import ai_generator
from app.config import get_settings
router = APIRouter()
settings = get_settings()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/admin/login")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=settings.jwt_expire_minutes)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, settings.jwt_secret, algorithm=settings.jwt_algorithm)
async def get_current_admin(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db)
) -> Admin:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(
token, settings.jwt_secret, algorithms=[settings.jwt_algorithm]
)
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
result = await db.execute(select(Admin).where(Admin.username == username))
admin = result.scalar_one_or_none()
if admin is None:
raise credentials_exception
return admin
@router.post("/login", response_model=Token)
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db)
):
result = await db.execute(
select(Admin).where(Admin.username == form_data.username)
)
admin = result.scalar_one_or_none()
if not admin or not verify_password(form_data.password, admin.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(data={"sub": admin.username})
return {"access_token": access_token, "token_type": "bearer"}
@router.post("/register", response_model=Token)
async def register_admin(
admin_data: AdminCreate,
db: AsyncSession = Depends(get_db)
):
# Check if admin exists
result = await db.execute(
select(Admin).where(Admin.username == admin_data.username)
)
if result.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered"
)
# Create admin
admin = Admin(
username=admin_data.username,
password_hash=get_password_hash(admin_data.password)
)
db.add(admin)
await db.commit()
access_token = create_access_token(data={"sub": admin.username})
return {"access_token": access_token, "token_type": "bearer"}
# Question Management
@router.get("/questions", response_model=List[QuestionResponse])
async def get_questions(
category_id: int = None,
status: str = None,
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
query = select(Question)
if category_id:
query = query.where(Question.category_id == category_id)
if status:
query = query.where(Question.status == status)
result = await db.execute(query.order_by(Question.created_at.desc()))
return result.scalars().all()
@router.post("/questions", response_model=QuestionResponse)
async def create_question(
question_data: QuestionCreate,
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
question = Question(
**question_data.model_dump(),
points=settings.default_points.get(question_data.difficulty, 300),
time_seconds=settings.default_times.get(question_data.difficulty, 25)
)
db.add(question)
await db.commit()
await db.refresh(question)
return question
@router.put("/questions/{question_id}", response_model=QuestionResponse)
async def update_question(
question_id: int,
question_data: QuestionUpdate,
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
result = await db.execute(select(Question).where(Question.id == question_id))
question = result.scalar_one_or_none()
if not question:
raise HTTPException(status_code=404, detail="Question not found")
for key, value in question_data.model_dump(exclude_unset=True).items():
setattr(question, key, value)
await db.commit()
await db.refresh(question)
return question
@router.delete("/questions/{question_id}")
async def delete_question(
question_id: int,
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
result = await db.execute(select(Question).where(Question.id == question_id))
question = result.scalar_one_or_none()
if not question:
raise HTTPException(status_code=404, detail="Question not found")
await db.delete(question)
await db.commit()
return {"status": "deleted"}
@router.post("/questions/generate")
async def generate_questions(
request: AIGenerateRequest,
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
# Get category name
result = await db.execute(
select(Category).where(Category.id == request.category_id)
)
category = result.scalar_one_or_none()
if not category:
raise HTTPException(status_code=404, detail="Category not found")
# Generate questions with AI
generated = await ai_generator.generate_questions(
category_name=category.name,
difficulty=request.difficulty,
count=request.count
)
# Save to database as pending
questions = []
for q_data in generated:
question = Question(
category_id=request.category_id,
question_text=q_data["question"],
correct_answer=q_data["correct_answer"],
alt_answers=q_data.get("alt_answers", []),
difficulty=q_data["difficulty"],
points=q_data["points"],
time_seconds=q_data["time_seconds"],
fun_fact=q_data.get("fun_fact"),
status="pending"
)
db.add(question)
questions.append(question)
await db.commit()
return {
"generated": len(questions),
"questions": [q.id for q in questions]
}
@router.post("/questions/{question_id}/approve")
async def approve_question(
question_id: int,
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
result = await db.execute(select(Question).where(Question.id == question_id))
question = result.scalar_one_or_none()
if not question:
raise HTTPException(status_code=404, detail="Question not found")
question.status = "approved"
await db.commit()
return {"status": "approved"}
@router.post("/questions/{question_id}/reject")
async def reject_question(
question_id: int,
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
result = await db.execute(select(Question).where(Question.id == question_id))
question = result.scalar_one_or_none()
if not question:
raise HTTPException(status_code=404, detail="Question not found")
await db.delete(question)
await db.commit()
return {"status": "rejected"}
# Categories
@router.get("/categories")
async def get_categories(
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
result = await db.execute(select(Category))
return result.scalars().all()
@router.post("/categories")
async def create_category(
name: str,
icon: str = None,
color: str = None,
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
category = Category(name=name, icon=icon, color=color)
db.add(category)
await db.commit()
await db.refresh(category)
return category

119
backend/app/api/game.py Normal file
View File

@@ -0,0 +1,119 @@
from fastapi import APIRouter, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from datetime import date
from typing import Dict, List
from app.models.base import get_db
from app.models.question import Question
from app.models.category import Category
from app.schemas.game import RoomCreate, RoomJoin, GameState
router = APIRouter()
@router.get("/categories")
async def get_game_categories():
"""Get all categories for the game board."""
# Return hardcoded categories for now
# In production, these would come from the database
return [
{"id": 1, "name": "Nintendo", "icon": "🍄", "color": "#E60012"},
{"id": 2, "name": "Xbox", "icon": "🎮", "color": "#107C10"},
{"id": 3, "name": "PlayStation", "icon": "🎯", "color": "#003791"},
{"id": 4, "name": "Anime", "icon": "⛩️", "color": "#FF6B9D"},
{"id": 5, "name": "Música", "icon": "🎵", "color": "#1DB954"},
{"id": 6, "name": "Películas", "icon": "🎬", "color": "#F5C518"},
{"id": 7, "name": "Libros", "icon": "📚", "color": "#8B4513"},
{"id": 8, "name": "Historia-Cultura", "icon": "🏛️", "color": "#6B5B95"},
]
@router.get("/board/{room_code}")
async def get_game_board(room_code: str):
"""
Get the game board with questions for today.
Returns questions grouped by category.
"""
from app.services.room_manager import room_manager
room = await room_manager.get_room(room_code)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
# If board already exists in room, return it
if room.get("board"):
return room["board"]
# Otherwise, this would load from database
# For now, return empty board structure
return {}
@router.get("/today-questions")
async def get_today_questions():
"""
Get all approved questions for today, grouped by category and difficulty.
This is used to build the game board.
"""
# This would query the database for questions with date_active = today
# For now, return sample structure
return {
"date": str(date.today()),
"categories": {
"1": { # Nintendo
"name": "Nintendo",
"questions": [
{"difficulty": 1, "id": 1, "points": 100},
{"difficulty": 2, "id": 2, "points": 200},
{"difficulty": 3, "id": 3, "points": 300},
{"difficulty": 4, "id": 4, "points": 400},
{"difficulty": 5, "id": 5, "points": 500},
]
}
# ... other categories
}
}
@router.get("/question/{question_id}")
async def get_question(question_id: int):
"""
Get a specific question (without the answer).
Used when a player selects a question.
"""
# This would query the database
# For now, return sample
return {
"id": question_id,
"question_text": "¿En qué año se lanzó la primera consola Nintendo Entertainment System (NES) en Japón?",
"difficulty": 3,
"points": 300,
"time_seconds": 25,
"category_id": 1
}
@router.get("/achievements")
async def get_achievements():
"""Get list of all available achievements."""
return [
{"id": 1, "name": "Primera Victoria", "description": "Ganar tu primera partida", "icon": "🏆"},
{"id": 2, "name": "Racha de 3", "description": "Responder 3 correctas seguidas", "icon": "🔥"},
{"id": 3, "name": "Racha de 5", "description": "Responder 5 correctas seguidas", "icon": "🔥🔥"},
{"id": 4, "name": "Ladrón Novato", "description": "Primer robo exitoso", "icon": "🦝"},
{"id": 5, "name": "Ladrón Maestro", "description": "5 robos exitosos en una partida", "icon": "🦝👑"},
{"id": 6, "name": "Especialista Nintendo", "description": "10 correctas en Nintendo", "icon": "🍄"},
{"id": 7, "name": "Especialista Xbox", "description": "10 correctas en Xbox", "icon": "🎮"},
{"id": 8, "name": "Especialista PlayStation", "description": "10 correctas en PlayStation", "icon": "🎯"},
{"id": 9, "name": "Especialista Anime", "description": "10 correctas en Anime", "icon": "⛩️"},
{"id": 10, "name": "Especialista Música", "description": "10 correctas en Música", "icon": "🎵"},
{"id": 11, "name": "Especialista Películas", "description": "10 correctas en Películas", "icon": "🎬"},
{"id": 12, "name": "Especialista Libros", "description": "10 correctas en Libros", "icon": "📚"},
{"id": 13, "name": "Especialista Historia", "description": "10 correctas en Historia-Cultura", "icon": "🏛️"},
{"id": 14, "name": "Invicto", "description": "Ganar sin fallar ninguna pregunta", "icon": ""},
{"id": 15, "name": "Velocista", "description": "Responder correctamente en menos de 3 segundos", "icon": ""},
{"id": 16, "name": "Comeback", "description": "Ganar estando 500+ puntos abajo", "icon": "🔄"},
{"id": 17, "name": "Dominio Total", "description": "Responder las 5 preguntas de una categoría", "icon": "👑"},
{"id": 18, "name": "Arriesgado", "description": "Responder correctamente 3 preguntas de 500 pts", "icon": "🎰"},
]

113
backend/app/api/replay.py Normal file
View File

@@ -0,0 +1,113 @@
from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import List
from app.models.base import get_db
from app.models.game_session import GameSession
from app.models.game_event import GameEvent
router = APIRouter()
@router.get("/{session_id}")
async def get_replay(
session_id: int,
db: AsyncSession = Depends(get_db)
):
"""
Get replay data for a game session.
Returns all events in chronological order.
"""
# Get session
result = await db.execute(
select(GameSession).where(GameSession.id == session_id)
)
session = result.scalar_one_or_none()
if not session:
raise HTTPException(status_code=404, detail="Session not found")
# Get all events
events_result = await db.execute(
select(GameEvent)
.where(GameEvent.session_id == session_id)
.order_by(GameEvent.timestamp)
)
events = events_result.scalars().all()
return {
"session": {
"id": session.id,
"room_code": session.room_code,
"team_a_score": session.team_a_score,
"team_b_score": session.team_b_score,
"status": session.status,
"created_at": session.created_at,
"finished_at": session.finished_at
},
"events": [
{
"id": e.id,
"event_type": e.event_type,
"player_name": e.player_name,
"team": e.team,
"question_id": e.question_id,
"answer_given": e.answer_given,
"was_correct": e.was_correct,
"was_steal": e.was_steal,
"points_earned": e.points_earned,
"timestamp": e.timestamp
}
for e in events
]
}
@router.get("/code/{room_code}")
async def get_replay_by_code(
room_code: str,
db: AsyncSession = Depends(get_db)
):
"""
Get replay data by room code.
"""
result = await db.execute(
select(GameSession).where(GameSession.room_code == room_code)
)
session = result.scalar_one_or_none()
if not session:
raise HTTPException(status_code=404, detail="Session not found")
return await get_replay(session.id, db)
@router.get("/")
async def list_replays(
limit: int = 20,
offset: int = 0,
db: AsyncSession = Depends(get_db)
):
"""
List recent finished game sessions.
"""
result = await db.execute(
select(GameSession)
.where(GameSession.status == "finished")
.order_by(GameSession.finished_at.desc())
.offset(offset)
.limit(limit)
)
sessions = result.scalars().all()
return [
{
"id": s.id,
"room_code": s.room_code,
"team_a_score": s.team_a_score,
"team_b_score": s.team_b_score,
"finished_at": s.finished_at
}
for s in sessions
]

47
backend/app/config.py Normal file
View File

@@ -0,0 +1,47 @@
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
# Database
database_url: str = "postgresql://trivia:trivia@localhost:5432/trivia"
# Redis
redis_url: str = "redis://localhost:6379"
# JWT
jwt_secret: str = "dev-secret-key-change-in-production"
jwt_algorithm: str = "HS256"
jwt_expire_minutes: int = 1440 # 24 hours
# Anthropic
anthropic_api_key: str = ""
# Game settings
default_times: dict = {
1: 15, # 100 pts
2: 20, # 200 pts
3: 25, # 300 pts
4: 35, # 400 pts
5: 45, # 500 pts
}
default_points: dict = {
1: 100,
2: 200,
3: 300,
4: 400,
5: 500,
}
steal_penalty_multiplier: float = 0.5
steal_time_multiplier: float = 0.5
class Config:
env_file = ".env"
extra = "ignore"
@lru_cache()
def get_settings() -> Settings:
return Settings()

76
backend/app/main.py Normal file
View File

@@ -0,0 +1,76 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import socketio
from contextlib import asynccontextmanager
from app.config import get_settings
from app.api import admin, game, replay
from app.sockets.game_events import register_socket_events
settings = get_settings()
# Socket.IO server
sio = socketio.AsyncServer(
async_mode="asgi",
cors_allowed_origins="*",
logger=True,
engineio_logger=True
)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
print("Starting WebTriviasMulti server...")
yield
# Shutdown
print("Shutting down WebTriviasMulti server...")
# FastAPI app
app = FastAPI(
title="WebTriviasMulti API",
description="API para el juego de trivia multiplayer",
version="1.0.0",
lifespan=lifespan
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(admin.router, prefix="/api/admin", tags=["admin"])
app.include_router(game.router, prefix="/api/game", tags=["game"])
app.include_router(replay.router, prefix="/api/replay", tags=["replay"])
# Register Socket.IO events
register_socket_events(sio)
# Mount Socket.IO
socket_app = socketio.ASGIApp(sio, app)
@app.get("/")
async def root():
return {
"message": "WebTriviasMulti API",
"version": "1.0.0",
"status": "running"
}
@app.get("/health")
async def health_check():
return {"status": "healthy"}
# For running with uvicorn directly
if __name__ == "__main__":
import uvicorn
uvicorn.run("app.main:socket_app", host="0.0.0.0", port=8000, reload=True)

View File

@@ -0,0 +1,7 @@
from app.models.category import Category
from app.models.question import Question
from app.models.game_session import GameSession
from app.models.game_event import GameEvent
from app.models.admin import Admin
__all__ = ["Category", "Question", "GameSession", "GameEvent", "Admin"]

View File

@@ -0,0 +1,15 @@
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.sql import func
from app.models.base import Base
class Admin(Base):
__tablename__ = "admins"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(100), unique=True, nullable=False, index=True)
password_hash = Column(String(255), nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
def __repr__(self):
return f"<Admin(id={self.id}, username='{self.username}')>"

View File

@@ -0,0 +1,27 @@
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from app.config import get_settings
settings = get_settings()
# Convert postgresql:// to postgresql+asyncpg://
database_url = settings.database_url.replace(
"postgresql://", "postgresql+asyncpg://"
)
engine = create_async_engine(database_url, echo=True)
AsyncSessionLocal = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
Base = declarative_base()
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()

View File

@@ -0,0 +1,18 @@
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship
from app.models.base import Base
class Category(Base):
__tablename__ = "categories"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(100), nullable=False, unique=True)
icon = Column(String(50))
color = Column(String(7)) # Hex color
# Relationships
questions = relationship("Question", back_populates="category")
def __repr__(self):
return f"<Category(id={self.id}, name='{self.name}')>"

View File

@@ -0,0 +1,27 @@
from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.models.base import Base
class GameEvent(Base):
__tablename__ = "game_events"
id = Column(Integer, primary_key=True, index=True)
session_id = Column(Integer, ForeignKey("game_sessions.id"), nullable=False)
event_type = Column(String(50), nullable=False) # question_selected, answer_submitted, steal_attempted, etc.
player_name = Column(String(100))
team = Column(String(1)) # 'A' or 'B'
question_id = Column(Integer, ForeignKey("questions.id"))
answer_given = Column(Text)
was_correct = Column(Boolean)
was_steal = Column(Boolean, default=False)
points_earned = Column(Integer)
timestamp = Column(DateTime(timezone=True), server_default=func.now())
# Relationships
session = relationship("GameSession", back_populates="events")
question = relationship("Question", back_populates="game_events")
def __repr__(self):
return f"<GameEvent(id={self.id}, type='{self.event_type}', player='{self.player_name}')>"

View File

@@ -0,0 +1,24 @@
from sqlalchemy import Column, Integer, String, DateTime, ARRAY
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.models.base import Base
class GameSession(Base):
__tablename__ = "game_sessions"
id = Column(Integer, primary_key=True, index=True)
room_code = Column(String(6), unique=True, nullable=False, index=True)
status = Column(String(20), default="waiting") # waiting, playing, finished
team_a_score = Column(Integer, default=0)
team_b_score = Column(Integer, default=0)
current_team = Column(String(1)) # 'A' or 'B'
questions_used = Column(ARRAY(Integer), default=[])
created_at = Column(DateTime(timezone=True), server_default=func.now())
finished_at = Column(DateTime(timezone=True))
# Relationships
events = relationship("GameEvent", back_populates="session")
def __repr__(self):
return f"<GameSession(id={self.id}, room_code='{self.room_code}', status='{self.status}')>"

View File

@@ -0,0 +1,28 @@
from sqlalchemy import Column, Integer, String, Text, Date, DateTime, ForeignKey, ARRAY
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.models.base import Base
class Question(Base):
__tablename__ = "questions"
id = Column(Integer, primary_key=True, index=True)
category_id = Column(Integer, ForeignKey("categories.id"), nullable=False)
question_text = Column(Text, nullable=False)
correct_answer = Column(String(500), nullable=False)
alt_answers = Column(ARRAY(String), default=[])
difficulty = Column(Integer, nullable=False) # 1-5
points = Column(Integer, nullable=False)
time_seconds = Column(Integer, nullable=False)
date_active = Column(Date, index=True)
status = Column(String(20), default="pending") # pending, approved, used
fun_fact = Column(Text)
created_at = Column(DateTime(timezone=True), server_default=func.now())
# Relationships
category = relationship("Category", back_populates="questions")
game_events = relationship("GameEvent", back_populates="question")
def __repr__(self):
return f"<Question(id={self.id}, difficulty={self.difficulty}, status='{self.status}')>"

View File

@@ -0,0 +1,17 @@
from app.schemas.question import QuestionCreate, QuestionUpdate, QuestionResponse
from app.schemas.game import (
RoomCreate,
RoomJoin,
PlayerInfo,
GameState,
AnswerSubmit,
StealAttempt
)
from app.schemas.admin import AdminCreate, AdminLogin, Token
__all__ = [
"QuestionCreate", "QuestionUpdate", "QuestionResponse",
"RoomCreate", "RoomJoin", "PlayerInfo", "GameState",
"AnswerSubmit", "StealAttempt",
"AdminCreate", "AdminLogin", "Token"
]

View File

@@ -0,0 +1,31 @@
from pydantic import BaseModel
from datetime import datetime
class AdminBase(BaseModel):
username: str
class AdminCreate(AdminBase):
password: str
class AdminLogin(AdminBase):
password: str
class AdminResponse(AdminBase):
id: int
created_at: datetime
class Config:
from_attributes = True
class Token(BaseModel):
access_token: str
token_type: str = "bearer"
class TokenData(BaseModel):
username: str | None = None

View File

@@ -0,0 +1,70 @@
from pydantic import BaseModel
from typing import Optional, List, Dict
from datetime import datetime
class PlayerInfo(BaseModel):
name: str
team: str # 'A' or 'B'
position: int
socket_id: Optional[str] = None
class RoomCreate(BaseModel):
player_name: str
class RoomJoin(BaseModel):
room_code: str
player_name: str
team: str # 'A' or 'B'
class TeamState(BaseModel):
players: List[PlayerInfo]
score: int
current_player_index: int
class QuestionState(BaseModel):
id: int
category_id: int
difficulty: int
points: int
answered: bool = False
class GameState(BaseModel):
room_code: str
status: str # waiting, playing, finished
team_a: TeamState
team_b: TeamState
current_team: Optional[str] = None
current_question: Optional[int] = None
can_steal: bool = False
board: Dict[int, List[QuestionState]] # category_id -> questions
timer_end: Optional[datetime] = None
class AnswerSubmit(BaseModel):
question_id: int
answer: str
class StealAttempt(BaseModel):
question_id: int
attempt: bool # True = try to steal, False = pass
answer: Optional[str] = None
class ChatMessage(BaseModel):
player_name: str
team: str
message: str
timestamp: datetime
class EmojiReaction(BaseModel):
player_name: str
team: str
emoji: str # One of: 👏 😮 😂 🔥 💀 🎉 😭 🤔

View File

@@ -0,0 +1,61 @@
from pydantic import BaseModel
from typing import Optional, List
from datetime import date, datetime
class QuestionBase(BaseModel):
question_text: str
correct_answer: str
alt_answers: List[str] = []
difficulty: int
fun_fact: Optional[str] = None
class QuestionCreate(QuestionBase):
category_id: int
class QuestionUpdate(BaseModel):
question_text: Optional[str] = None
correct_answer: Optional[str] = None
alt_answers: Optional[List[str]] = None
difficulty: Optional[int] = None
fun_fact: Optional[str] = None
status: Optional[str] = None
date_active: Optional[date] = None
class QuestionResponse(QuestionBase):
id: int
category_id: int
points: int
time_seconds: int
date_active: Optional[date]
status: str
created_at: datetime
class Config:
from_attributes = True
class QuestionForGame(BaseModel):
id: int
category_id: int
question_text: str
difficulty: int
points: int
time_seconds: int
class Config:
from_attributes = True
class AIGenerateRequest(BaseModel):
category_id: int
difficulty: int
count: int = 5
class AIValidateRequest(BaseModel):
question_id: int
player_answer: str

View File

@@ -0,0 +1,6 @@
from app.services.ai_validator import AIValidator
from app.services.ai_generator import AIGenerator
from app.services.game_manager import GameManager
from app.services.room_manager import RoomManager
__all__ = ["AIValidator", "AIGenerator", "GameManager", "RoomManager"]

View File

@@ -0,0 +1,97 @@
import json
from anthropic import Anthropic
from app.config import get_settings
settings = get_settings()
class AIGenerator:
def __init__(self):
self.client = Anthropic(api_key=settings.anthropic_api_key)
async def generate_questions(
self,
category_name: str,
difficulty: int,
count: int = 5
) -> list[dict]:
"""
Generate trivia questions using Claude AI.
Args:
category_name: Name of the category (e.g., "Nintendo", "Anime")
difficulty: 1-5 (1=very easy, 5=very hard)
count: Number of questions to generate
Returns:
list[dict]: List of question objects
"""
difficulty_descriptions = {
1: "muy fácil - conocimiento básico que la mayoría conoce",
2: "fácil - conocimiento común entre fans casuales",
3: "medio - requiere ser fan de la categoría",
4: "difícil - conocimiento profundo del tema",
5: "muy difícil - solo expertos conocerían esto"
}
prompt = f"""Genera {count} preguntas de trivia para la categoría "{category_name}".
Dificultad: {difficulty} ({difficulty_descriptions.get(difficulty, 'medio')})
Requisitos:
- Las preguntas deben ser verificables y precisas
- Evitar ambigüedades
- Las respuestas deben ser específicas y concisas
- Incluir variaciones comunes de la respuesta
- Para gaming: referencias a juegos, personajes, mecánicas, fechas de lanzamiento
- Para anime: personajes, series, estudios, seiyuus
- Para música: artistas, canciones, álbumes, letras famosas
- Para películas: actores, directores, frases icónicas, premios
- Para libros: autores, obras, personajes literarios
- Para historia-cultura: eventos, fechas, personajes históricos, arte
Formato JSON (array de objetos):
[
{{
"question": "texto de la pregunta",
"correct_answer": "respuesta principal",
"alt_answers": ["variación1", "variación2"],
"fun_fact": "dato curioso opcional sobre la respuesta"
}}
]
Responde SOLO con el JSON, sin texto adicional."""
try:
message = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=2000,
messages=[
{"role": "user", "content": prompt}
]
)
response_text = message.content[0].text.strip()
# Parse JSON response
questions = json.loads(response_text)
# Add metadata to each question
for q in questions:
q["difficulty"] = difficulty
q["points"] = settings.default_points.get(difficulty, 300)
q["time_seconds"] = settings.default_times.get(difficulty, 25)
return questions
except json.JSONDecodeError as e:
print(f"Error parsing AI response: {e}")
print(f"Response was: {response_text}")
return []
except Exception as e:
print(f"Error generating questions: {e}")
return []
# Singleton instance
ai_generator = AIGenerator()

View File

@@ -0,0 +1,80 @@
import json
from anthropic import Anthropic
from app.config import get_settings
settings = get_settings()
class AIValidator:
def __init__(self):
self.client = Anthropic(api_key=settings.anthropic_api_key)
async def validate_answer(
self,
question: str,
correct_answer: str,
alt_answers: list[str],
player_answer: str
) -> dict:
"""
Validate if the player's answer is correct using Claude AI.
Returns:
dict: {"valid": bool, "reason": str}
"""
prompt = f"""Eres un validador de trivia. Determina si la respuesta del jugador
es correcta comparándola con la respuesta oficial.
Pregunta: {question}
Respuesta correcta: {correct_answer}
Respuestas alternativas válidas: {', '.join(alt_answers) if alt_answers else 'Ninguna'}
Respuesta del jugador: {player_answer}
Considera válido si:
- Es sinónimo o variación de la respuesta correcta
- Tiene errores menores de ortografía
- Usa abreviaciones comunes (ej: "BOTW" = "Breath of the Wild")
- Es conceptualmente equivalente
Responde SOLO con JSON: {{"valid": true/false, "reason": "breve explicación"}}"""
try:
message = self.client.messages.create(
model="claude-3-haiku-20240307",
max_tokens=150,
messages=[
{"role": "user", "content": prompt}
]
)
response_text = message.content[0].text.strip()
# Parse JSON response
result = json.loads(response_text)
return result
except json.JSONDecodeError:
# If JSON parsing fails, try to extract the result
response_lower = response_text.lower()
if "true" in response_lower:
return {"valid": True, "reason": "Respuesta validada por IA"}
return {"valid": False, "reason": "No se pudo validar la respuesta"}
except Exception as e:
print(f"Error validating answer: {e}")
# Fallback to exact match
player_lower = player_answer.lower().strip()
correct_lower = correct_answer.lower().strip()
if player_lower == correct_lower:
return {"valid": True, "reason": "Coincidencia exacta"}
for alt in alt_answers:
if player_lower == alt.lower().strip():
return {"valid": True, "reason": "Coincide con respuesta alternativa"}
return {"valid": False, "reason": "Respuesta incorrecta"}
# Singleton instance
ai_validator = AIValidator()

View File

@@ -0,0 +1,204 @@
from typing import Optional
from datetime import datetime, timedelta
from app.services.room_manager import room_manager
from app.services.ai_validator import ai_validator
from app.config import get_settings
settings = get_settings()
class GameManager:
async def start_game(self, room_code: str, board: dict) -> Optional[dict]:
"""
Start a game in a room.
Args:
room_code: The room code
board: Dict of category_id -> list of questions
Returns:
Updated room state
"""
room = await room_manager.get_room(room_code)
if not room:
return None
# Check minimum players
if not room["teams"]["A"] or not room["teams"]["B"]:
return None
# Set up game state
room["status"] = "playing"
room["current_team"] = "A"
room["current_player_index"] = {"A": 0, "B": 0}
room["board"] = board
room["scores"] = {"A": 0, "B": 0}
await room_manager.update_room(room_code, room)
return room
async def select_question(
self,
room_code: str,
question_id: int,
category_id: int
) -> Optional[dict]:
"""Select a question from the board."""
room = await room_manager.get_room(room_code)
if not room or room["status"] != "playing":
return None
# Mark question as current
room["current_question"] = question_id
room["can_steal"] = False
# Find and mark question on board
if str(category_id) in room["board"]:
for q in room["board"][str(category_id)]:
if q["id"] == question_id:
q["selected"] = True
break
await room_manager.update_room(room_code, room)
return room
async def submit_answer(
self,
room_code: str,
question: dict,
player_answer: str,
is_steal: bool = False
) -> dict:
"""
Submit an answer for validation.
Returns:
dict with validation result and updated game state
"""
room = await room_manager.get_room(room_code)
if not room:
return {"error": "Room not found"}
# Validate answer with AI
result = await ai_validator.validate_answer(
question=question["question_text"],
correct_answer=question["correct_answer"],
alt_answers=question.get("alt_answers", []),
player_answer=player_answer
)
is_correct = result.get("valid", False)
points = question["points"]
if is_correct:
# Award points
current_team = room["current_team"]
room["scores"][current_team] += points
# Mark question as answered
category_id = str(question["category_id"])
if category_id in room["board"]:
for q in room["board"][category_id]:
if q["id"] == question["id"]:
q["answered"] = True
break
# Winner chooses next
room["current_question"] = None
room["can_steal"] = False
# Advance player rotation
team_players = room["teams"][current_team]
room["current_player_index"][current_team] = (
room["current_player_index"][current_team] + 1
) % len(team_players)
else:
if is_steal:
# Failed steal - penalize
stealing_team = room["current_team"]
penalty = int(points * settings.steal_penalty_multiplier)
room["scores"][stealing_team] = max(
0, room["scores"][stealing_team] - penalty
)
# Mark question as answered (nobody gets it)
category_id = str(question["category_id"])
if category_id in room["board"]:
for q in room["board"][category_id]:
if q["id"] == question["id"]:
q["answered"] = True
break
# Original team chooses next
room["current_team"] = "B" if stealing_team == "A" else "A"
room["current_question"] = None
room["can_steal"] = False
else:
# Original team failed - enable steal
room["can_steal"] = True
# Switch to other team for potential steal
room["current_team"] = "B" if room["current_team"] == "A" else "A"
# Check if game is over (all questions answered)
all_answered = all(
q["answered"]
for questions in room["board"].values()
for q in questions
)
if all_answered:
room["status"] = "finished"
await room_manager.update_room(room_code, room)
return {
"valid": is_correct,
"reason": result.get("reason", ""),
"points_earned": points if is_correct else 0,
"room": room
}
async def pass_steal(self, room_code: str, question_id: int) -> Optional[dict]:
"""Pass on stealing opportunity."""
room = await room_manager.get_room(room_code)
if not room:
return None
# Mark question as answered
for category_id, questions in room["board"].items():
for q in questions:
if q["id"] == question_id:
q["answered"] = True
break
# Switch back to original team for next selection
room["current_team"] = "B" if room["current_team"] == "A" else "A"
room["current_question"] = None
room["can_steal"] = False
await room_manager.update_room(room_code, room)
return room
async def get_current_player(self, room: dict) -> Optional[dict]:
"""Get the current player who should answer."""
team = room["current_team"]
if not team:
return None
players = room["teams"][team]
if not players:
return None
index = room["current_player_index"][team]
return players[index % len(players)]
def calculate_timer_end(self, time_seconds: int, is_steal: bool = False) -> datetime:
"""Calculate when the timer should end."""
if is_steal:
time_seconds = int(time_seconds * settings.steal_time_multiplier)
return datetime.utcnow() + timedelta(seconds=time_seconds)
# Singleton instance
game_manager = GameManager()

View File

@@ -0,0 +1,173 @@
import json
import random
import string
from typing import Optional
import redis.asyncio as redis
from app.config import get_settings
settings = get_settings()
class RoomManager:
def __init__(self):
self.redis: Optional[redis.Redis] = None
async def connect(self):
if not self.redis:
self.redis = await redis.from_url(settings.redis_url)
async def disconnect(self):
if self.redis:
await self.redis.close()
def _generate_room_code(self) -> str:
"""Generate a 6-character room code."""
return ''.join(random.choices(string.ascii_uppercase + string.digits, k=6))
async def create_room(self, player_name: str, socket_id: str) -> dict:
"""Create a new game room."""
await self.connect()
# Generate unique room code
room_code = self._generate_room_code()
while await self.redis.exists(f"room:{room_code}"):
room_code = self._generate_room_code()
# Create room state
room_state = {
"code": room_code,
"status": "waiting",
"host": player_name,
"teams": {
"A": [],
"B": []
},
"current_team": None,
"current_player_index": {"A": 0, "B": 0},
"current_question": None,
"can_steal": False,
"scores": {"A": 0, "B": 0},
"questions_used": [],
"board": {}
}
# Save room state
await self.redis.setex(
f"room:{room_code}",
3600 * 3, # 3 hours TTL
json.dumps(room_state)
)
# Add player to room
await self.add_player(room_code, player_name, "A", socket_id)
return room_state
async def get_room(self, room_code: str) -> Optional[dict]:
"""Get room state by code."""
await self.connect()
data = await self.redis.get(f"room:{room_code}")
if data:
return json.loads(data)
return None
async def update_room(self, room_code: str, room_state: dict) -> bool:
"""Update room state."""
await self.connect()
await self.redis.setex(
f"room:{room_code}",
3600 * 3,
json.dumps(room_state)
)
return True
async def add_player(
self,
room_code: str,
player_name: str,
team: str,
socket_id: str
) -> Optional[dict]:
"""Add a player to a room."""
room = await self.get_room(room_code)
if not room:
return None
# Check if team is full
if len(room["teams"][team]) >= 4:
return None
# Check if name is taken
for t in ["A", "B"]:
for p in room["teams"][t]:
if p["name"].lower() == player_name.lower():
return None
# Add player
player = {
"name": player_name,
"team": team,
"position": len(room["teams"][team]),
"socket_id": socket_id
}
room["teams"][team].append(player)
# Save player mapping
await self.redis.setex(
f"player:{socket_id}",
3600 * 3,
json.dumps({"name": player_name, "room": room_code, "team": team})
)
await self.update_room(room_code, room)
return room
async def remove_player(self, socket_id: str) -> Optional[dict]:
"""Remove a player from their room."""
await self.connect()
# Get player info
player_data = await self.redis.get(f"player:{socket_id}")
if not player_data:
return None
player_info = json.loads(player_data)
room_code = player_info["room"]
team = player_info["team"]
# Get room
room = await self.get_room(room_code)
if not room:
return None
# Remove player from team
room["teams"][team] = [
p for p in room["teams"][team] if p["socket_id"] != socket_id
]
# Update positions
for i, p in enumerate(room["teams"][team]):
p["position"] = i
# Delete player mapping
await self.redis.delete(f"player:{socket_id}")
# If room is empty, delete it
if not room["teams"]["A"] and not room["teams"]["B"]:
await self.redis.delete(f"room:{room_code}")
return None
await self.update_room(room_code, room)
return room
async def get_player(self, socket_id: str) -> Optional[dict]:
"""Get player info by socket ID."""
await self.connect()
data = await self.redis.get(f"player:{socket_id}")
if data:
return json.loads(data)
return None
# Singleton instance
room_manager = RoomManager()

View File

@@ -0,0 +1 @@
# Socket.IO events

View File

@@ -0,0 +1,312 @@
import socketio
from datetime import datetime
from app.services.room_manager import room_manager
from app.services.game_manager import game_manager
def register_socket_events(sio: socketio.AsyncServer):
"""Register all Socket.IO event handlers."""
@sio.event
async def connect(sid, environ):
print(f"Client connected: {sid}")
@sio.event
async def disconnect(sid):
print(f"Client disconnected: {sid}")
# Remove player from room
room = await room_manager.remove_player(sid)
if room:
await sio.emit(
"player_left",
{"room": room},
room=room["code"]
)
@sio.event
async def create_room(sid, data):
"""Create a new game room."""
player_name = data.get("player_name", "Player")
room = await room_manager.create_room(player_name, sid)
# Join socket room
sio.enter_room(sid, room["code"])
await sio.emit("room_created", {"room": room}, to=sid)
@sio.event
async def join_room(sid, data):
"""Join an existing room."""
room_code = data.get("room_code", "").upper()
player_name = data.get("player_name", "Player")
team = data.get("team", "A")
room = await room_manager.add_player(room_code, player_name, team, sid)
if not room:
await sio.emit(
"error",
{"message": "Could not join room. It may be full or the name is taken."},
to=sid
)
return
# Join socket room
sio.enter_room(sid, room_code)
# Notify all players
await sio.emit("player_joined", {"room": room}, room=room_code)
@sio.event
async def change_team(sid, data):
"""Switch player to another team."""
player = await room_manager.get_player(sid)
if not player:
return
room_code = player["room"]
new_team = data.get("team")
room = await room_manager.get_room(room_code)
if not room or len(room["teams"][new_team]) >= 4:
await sio.emit(
"error",
{"message": "Cannot change team. It may be full."},
to=sid
)
return
# Remove from current team
current_team = player["team"]
room["teams"][current_team] = [
p for p in room["teams"][current_team] if p["socket_id"] != sid
]
# Add to new team
room["teams"][new_team].append({
"name": player["name"],
"team": new_team,
"position": len(room["teams"][new_team]),
"socket_id": sid
})
await room_manager.update_room(room_code, room)
await sio.emit("team_changed", {"room": room}, room=room_code)
@sio.event
async def start_game(sid, data):
"""Start the game (host only)."""
player = await room_manager.get_player(sid)
if not player:
return
room_code = player["room"]
room = await room_manager.get_room(room_code)
if not room:
return
# Check if player is host
if room["host"] != player["name"]:
await sio.emit(
"error",
{"message": "Only the host can start the game."},
to=sid
)
return
# Check minimum players
if not room["teams"]["A"] or not room["teams"]["B"]:
await sio.emit(
"error",
{"message": "Both teams need at least one player."},
to=sid
)
return
# Get board from data or generate
board = data.get("board", {})
updated_room = await game_manager.start_game(room_code, board)
if updated_room:
await sio.emit("game_started", {"room": updated_room}, room=room_code)
@sio.event
async def select_question(sid, data):
"""Select a question from the board."""
player = await room_manager.get_player(sid)
if not player:
return
room_code = player["room"]
question_id = data.get("question_id")
category_id = data.get("category_id")
room = await game_manager.select_question(room_code, question_id, category_id)
if room:
# Get current player info
current_player = await game_manager.get_current_player(room)
await sio.emit(
"question_selected",
{
"room": room,
"question_id": question_id,
"current_player": current_player
},
room=room_code
)
@sio.event
async def submit_answer(sid, data):
"""Submit an answer to the current question."""
player = await room_manager.get_player(sid)
if not player:
return
room_code = player["room"]
answer = data.get("answer", "")
question = data.get("question", {})
is_steal = data.get("is_steal", False)
result = await game_manager.submit_answer(
room_code, question, answer, is_steal
)
if "error" in result:
await sio.emit("error", {"message": result["error"]}, to=sid)
return
await sio.emit(
"answer_result",
{
"player_name": player["name"],
"team": player["team"],
"answer": answer,
"valid": result["valid"],
"reason": result["reason"],
"points_earned": result["points_earned"],
"was_steal": is_steal,
"room": result["room"]
},
room=room_code
)
@sio.event
async def steal_decision(sid, data):
"""Decide whether to attempt stealing."""
player = await room_manager.get_player(sid)
if not player:
return
room_code = player["room"]
attempt = data.get("attempt", False)
question_id = data.get("question_id")
if not attempt:
# Pass on steal
room = await game_manager.pass_steal(room_code, question_id)
if room:
await sio.emit(
"steal_passed",
{"room": room, "team": player["team"]},
room=room_code
)
else:
# Will attempt steal - just notify, answer comes separately
room = await room_manager.get_room(room_code)
await sio.emit(
"steal_attempted",
{
"team": player["team"],
"player_name": player["name"],
"room": room
},
room=room_code
)
@sio.event
async def chat_message(sid, data):
"""Send a chat message to team."""
player = await room_manager.get_player(sid)
if not player:
return
room_code = player["room"]
message = data.get("message", "")[:500] # Limit message length
# Get all team members' socket IDs
room = await room_manager.get_room(room_code)
if not room:
return
team_sockets = [
p["socket_id"] for p in room["teams"][player["team"]]
]
# Send only to team members
for socket_id in team_sockets:
await sio.emit(
"chat_message",
{
"player_name": player["name"],
"team": player["team"],
"message": message,
"timestamp": datetime.utcnow().isoformat()
},
to=socket_id
)
@sio.event
async def emoji_reaction(sid, data):
"""Send an emoji reaction visible to all."""
player = await room_manager.get_player(sid)
if not player:
return
room_code = player["room"]
emoji = data.get("emoji", "")
# Validate emoji
allowed_emojis = ["👏", "😮", "😂", "🔥", "💀", "🎉", "😭", "🤔"]
if emoji not in allowed_emojis:
return
await sio.emit(
"emoji_reaction",
{
"player_name": player["name"],
"team": player["team"],
"emoji": emoji
},
room=room_code
)
@sio.event
async def timer_expired(sid, data):
"""Handle timer expiration."""
player = await room_manager.get_player(sid)
if not player:
return
room_code = player["room"]
room = await room_manager.get_room(room_code)
if not room:
return
# Treat as wrong answer
if room["can_steal"]:
# Steal timer expired - pass
question_id = room["current_question"]
room = await game_manager.pass_steal(room_code, question_id)
await sio.emit("time_up", {"room": room, "was_steal": True}, room=room_code)
else:
# Answer timer expired - enable steal
room["can_steal"] = True
room["current_team"] = "B" if room["current_team"] == "A" else "A"
await room_manager.update_room(room_code, room)
await sio.emit("time_up", {"room": room, "was_steal": False}, room=room_code)