Files
Trivy/backend/app/services/game_manager.py
consultoria-as be5b1775a0 feat: sistema de 2 rondas con puntos dobles
Ronda 1: 5 categorías con puntos normales (100-500)
Ronda 2: 5 categorías diferentes con puntos x2 (200-1000)

Backend:
- question_service: soporta excluir categorías y multiplicador de puntos
- game_manager: trackea current_round, start_round_2() carga nuevo tablero
- game_events: emite round_started al completar ronda 1

Frontend:
- useSocket: escucha evento round_started
- Game.tsx: muestra indicador de ronda actual
- types: GameRoom incluye current_round

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-27 02:28:28 +00:00

377 lines
12 KiB
Python

from datetime import datetime, timedelta, timezone
from typing import Optional

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import get_settings
from app.models.game_session import GameSession
from app.services.ai_validator import ai_validator
from app.services.question_service import question_service
from app.services.room_manager import room_manager
# Application settings, resolved once when this module is imported. GameManager
# reads `steal_penalty_multiplier` and `steal_time_multiplier` from it.
settings = get_settings()
class GameManager:
    """Drive the flow of a two-round trivia match.

    Live room state is a plain dict that is loaded and saved through
    ``room_manager``; the persistent record of a match is a ``GameSession``
    row handled by the database helpers at the bottom of the class.
    """

    # ============================================================
    # Private helpers (pure functions over the room / board dicts)
    # ============================================================

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Same value ``datetime.utcnow()`` produced, without calling the API
        deprecated in Python 3.12. The result stays naive so existing
        comparisons and stored values keep working.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _other_team(team: str) -> str:
        """Return the opposing team label ("A" <-> "B")."""
        return "B" if team == "A" else "A"

    @staticmethod
    def _mark_answered(
        board: dict,
        question_id: int,
        category_id: Optional[str] = None
    ) -> bool:
        """
        Flag a question on the board as answered.

        Args:
            board: Dict of category key -> list of question dicts
            question_id: Id of the question to flag
            category_id: Board key to look in; every category is searched
                when omitted

        Returns:
            True if the question was found (and flagged), False otherwise
        """
        if category_id is None:
            columns = board.values()
        else:
            columns = [board.get(category_id, [])]

        for questions in columns:
            for q in questions:
                if q["id"] == question_id:
                    q["answered"] = True
                    # Stop at the first match instead of scanning the rest.
                    return True
        return False

    @staticmethod
    def _advance_player(room: dict, team: str) -> None:
        """Move ``team``'s rotation to its next player.

        Does nothing when the team has no players, which would otherwise
        raise ZeroDivisionError on the modulo.
        """
        players = room["teams"][team]
        if not players:
            return
        indexes = room["current_player_index"]
        indexes[team] = (indexes[team] + 1) % len(players)

    @staticmethod
    def _is_board_complete(board: dict) -> bool:
        """Return True when every question on the board is answered.

        A question without an "answered" key counts as unanswered.
        """
        return all(
            q.get("answered", False)
            for questions in board.values()
            for q in questions
        )

    @classmethod
    def _close_round_if_complete(cls, room: dict) -> None:
        """Update the room once its board has no open questions left.

        Round 1 only raises the "round_finished" flag (round 2 still has to
        be started); any later round marks the whole game as finished.
        """
        if not cls._is_board_complete(room["board"]):
            return
        if room.get("current_round", 1) == 1:
            room["round_finished"] = True
        else:
            room["status"] = "finished"

    # ============================================================
    # Game flow
    # ============================================================

    async def start_game(self, room_code: str, board: dict) -> Optional[dict]:
        """
        Start a game in a room.

        Args:
            room_code: The room code
            board: Dict of category_id -> list of questions

        Returns:
            Updated room state, or None when the room does not exist or
            either team has no players
        """
        room = await room_manager.get_room(room_code)
        if not room:
            return None

        # Both teams need at least one player.
        if not room["teams"]["A"] or not room["teams"]["B"]:
            return None

        room["status"] = "playing"
        room["current_team"] = "A"
        room["current_player_index"] = {"A": 0, "B": 0}
        room["board"] = board
        room["scores"] = {"A": 0, "B": 0}
        room["current_round"] = 1
        # Remembered so that round 2 can ask for a board without these.
        room["round1_categories"] = [int(cat_id) for cat_id in board.keys()]

        await room_manager.update_room(room_code, room)
        return room

    async def select_question(
        self,
        room_code: str,
        question_id: int,
        category_id: int
    ) -> Optional[dict]:
        """
        Select a question from the board.

        Returns:
            Updated room state, or None when the room does not exist or is
            not in the "playing" status
        """
        room = await room_manager.get_room(room_code)
        if not room or room["status"] != "playing":
            return None

        room["current_question"] = question_id
        room["can_steal"] = False

        # Board keys are strings, the incoming category id is an int.
        for q in room["board"].get(str(category_id), []):
            if q["id"] == question_id:
                q["selected"] = True
                break

        await room_manager.update_room(room_code, room)
        return room

    async def submit_answer(
        self,
        room_code: str,
        question: dict,
        player_answer: str,
        is_steal: bool = False
    ) -> dict:
        """
        Submit an answer for validation and apply its outcome to the room.

        Args:
            room_code: The room code
            question: Question dict; reads "id", "question_text",
                "correct_answer", "points", "category_id" and the optional
                "alt_answers"
            player_answer: Text typed by the player
            is_steal: True when the answer is a steal attempt

        Returns:
            {"error": ...} when the room is missing, otherwise a dict with
            "valid", "reason", "points_earned" and the updated "room"
        """
        room = await room_manager.get_room(room_code)
        if not room:
            return {"error": "Room not found"}

        result = await ai_validator.validate_answer(
            question=question["question_text"],
            correct_answer=question["correct_answer"],
            alt_answers=question.get("alt_answers", []),
            player_answer=player_answer
        )
        is_correct = result.get("valid", False)
        points = question["points"]
        # The team whose turn it is: the original team, or the stealing one.
        acting_team = room["current_team"]

        if is_correct:
            room["scores"][acting_team] += points
            self._mark_answered(
                room["board"], question["id"], str(question["category_id"])
            )
            # Winner keeps the turn and chooses the next question.
            room["current_question"] = None
            room["can_steal"] = False
            self._advance_player(room, acting_team)
        elif is_steal:
            # Failed steal: penalize, never dropping the score below zero.
            penalty = int(points * settings.steal_penalty_multiplier)
            room["scores"][acting_team] = max(
                0, room["scores"][acting_team] - penalty
            )
            # Nobody gets the question.
            self._mark_answered(
                room["board"], question["id"], str(question["category_id"])
            )
            # The original team chooses next.
            room["current_team"] = self._other_team(acting_team)
            room["current_question"] = None
            room["can_steal"] = False
        else:
            # Original team failed: its player had their turn, and the other
            # team gets a chance to steal the still-open question.
            room["can_steal"] = True
            self._advance_player(room, acting_team)
            room["current_team"] = self._other_team(acting_team)

        self._close_round_if_complete(room)

        await room_manager.update_room(room_code, room)
        return {
            "valid": is_correct,
            "reason": result.get("reason", ""),
            "points_earned": points if is_correct else 0,
            "room": room
        }

    async def pass_steal(self, room_code: str, question_id: int) -> Optional[dict]:
        """
        Pass on a stealing opportunity.

        The question is closed without awarding points and the turn goes
        back to the other team.

        Returns:
            Updated room state, or None when the room does not exist
        """
        room = await room_manager.get_room(room_code)
        if not room:
            return None

        self._mark_answered(room["board"], question_id)

        # Switch back to the original team for the next selection.
        room["current_team"] = self._other_team(room["current_team"])
        room["current_question"] = None
        room["can_steal"] = False

        self._close_round_if_complete(room)

        await room_manager.update_room(room_code, room)
        return room

    async def get_current_player(self, room: dict) -> Optional[dict]:
        """
        Get the current player who should answer.

        Returns:
            The player entry for the current team's rotation slot, or None
            when no team is active or that team has no players
        """
        # A room that has not started may not carry "current_team" at all.
        team = room.get("current_team")
        if not team:
            return None

        players = room["teams"][team]
        if not players:
            return None

        index = room["current_player_index"][team]
        # Modulo keeps the lookup valid if the team shrank since the index was set.
        return players[index % len(players)]

    async def start_round_2(
        self,
        db: AsyncSession,
        room_code: str
    ) -> Optional[dict]:
        """
        Start round 2 with different categories and double points.

        Returns:
            Updated room state, or None when the room does not exist. When
            no round-2 board can be built the room is returned with status
            "finished".
        """
        room = await room_manager.get_room(room_code)
        if not room:
            return None

        round1_categories = room.get("round1_categories", [])

        # New board excluding the round 1 categories, with 2x points.
        new_board = await question_service.get_board_for_game(
            db,
            exclude_categories=round1_categories,
            point_multiplier=2
        )

        if not new_board:
            # No board available for round 2 - end the game.
            room["status"] = "finished"
            await room_manager.update_room(room_code, room)
            return room

        room["board"] = new_board
        room["current_round"] = 2
        room["round_finished"] = False
        room["current_question"] = None
        room["can_steal"] = False
        # current_team is kept: whoever held the turn picks first.

        await room_manager.update_room(room_code, room)
        return room

    def calculate_timer_end(self, time_seconds: int, is_steal: bool = False) -> datetime:
        """
        Calculate when the timer should end.

        Args:
            time_seconds: Base duration in seconds
            is_steal: When True the duration is scaled by
                ``settings.steal_time_multiplier`` and truncated to an int

        Returns:
            Naive UTC datetime at which the timer expires
        """
        if is_steal:
            time_seconds = int(time_seconds * settings.steal_time_multiplier)
        return self._utcnow() + timedelta(seconds=time_seconds)

    # ============================================================
    # Database Integration Methods
    # ============================================================

    async def create_db_session(
        self,
        db: AsyncSession,
        room_code: str
    ) -> GameSession:
        """Create and commit a GameSession row with status "waiting"."""
        session = GameSession(
            room_code=room_code,
            status="waiting"
        )
        db.add(session)
        await db.commit()
        # Reload so database-generated fields (e.g. the id) are populated.
        await db.refresh(session)
        return session

    async def get_db_session(
        self,
        db: AsyncSession,
        room_code: str
    ) -> Optional[GameSession]:
        """Fetch the GameSession for ``room_code``, or None if there is none."""
        result = await db.execute(
            select(GameSession).where(GameSession.room_code == room_code)
        )
        return result.scalar_one_or_none()

    async def update_db_session(
        self,
        db: AsyncSession,
        room_code: str,
        **kwargs
    ) -> Optional[GameSession]:
        """
        Update the GameSession for ``room_code``.

        Keyword arguments that do not match an attribute of the session are
        ignored.

        Returns:
            The refreshed session, or None when no session exists
        """
        session = await self.get_db_session(db, room_code)
        if session:
            for key, value in kwargs.items():
                if hasattr(session, key):
                    setattr(session, key, value)
            await db.commit()
            await db.refresh(session)
        return session

    async def start_game_with_db(
        self,
        db: AsyncSession,
        room_code: str
    ) -> Optional[dict]:
        """
        Start a game: load the board, create the DB session, update the room.

        Returns:
            Updated room state, or None when no board is available or the
            room could not be started
        """
        # Load the board before touching the database, so a missing board
        # does not leave an orphan "waiting" session behind.
        board = await question_service.get_board_for_game(db)
        if not board:
            return None

        db_session = await self.create_db_session(db, room_code)

        room = await self.start_game(room_code, board)
        if room:
            # Keep the session id in the room state for later reference.
            room["db_session_id"] = db_session.id
            await room_manager.update_room(room_code, room)

            await self.update_db_session(
                db, room_code,
                status="playing"
            )
        return room

    async def finish_game(
        self,
        db: AsyncSession,
        room_code: str,
        team_a_score: int,
        team_b_score: int,
        questions_used: list
    ) -> Optional[GameSession]:
        """
        Mark the game as finished and store its final result.

        Returns:
            The updated session, or None when no session exists
        """
        return await self.update_db_session(
            db, room_code,
            status="finished",
            team_a_score=team_a_score,
            team_b_score=team_b_score,
            questions_used=questions_used,
            finished_at=self._utcnow()
        )
# Singleton instance: created once at import time and shared by importers of
# this module.
game_manager = GameManager()