- Backend: FastAPI + Python-SocketIO + SQLAlchemy - Models for categories, questions, game sessions, events - AI services for answer validation and question generation (Claude) - Room management with Redis - Game logic with stealing mechanics - Admin API for question management - Frontend: React + Vite + TypeScript + Tailwind - 5 visual themes (DRRR, Retro, Minimal, RGB, Anime 90s) - Real-time game with Socket.IO - Achievement system - Replay functionality - Sound effects per theme - Docker Compose for deployment - Design documentation Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
205 lines
6.5 KiB
Python
205 lines
6.5 KiB
Python
from typing import Optional
|
|
from datetime import datetime, timedelta
|
|
from app.services.room_manager import room_manager
|
|
from app.services.ai_validator import ai_validator
|
|
from app.config import get_settings
|
|
|
|
# Module-level settings object, loaded once at import time. GameManager reads
# `steal_penalty_multiplier` and `steal_time_multiplier` from it.
settings = get_settings()
|
|
|
|
|
|
class GameManager:
    """Turn, scoring and steal logic for a two-team ("A" / "B") quiz game.

    Room state is a plain dict fetched from and written back through
    ``room_manager``. Every mutating method follows the same shape: load the
    room, edit the dict in place, persist it, and return it.
    """

    # ------------------------------------------------------------------
    # Internal helpers (pure functions over the room / board dicts)
    # ------------------------------------------------------------------

    @staticmethod
    def _other_team(team):
        """Return the opposing team key ("A" maps to "B", anything else to "A")."""
        return "B" if team == "A" else "A"

    @staticmethod
    def _mark_answered(board: dict, question_id, category_id=None) -> bool:
        """Flag a question on the board as answered.

        Args:
            board: Dict of category key -> list of question dicts.
            question_id: Value compared against each question's ``"id"``.
            category_id: When given, only ``board[str(category_id)]`` is
                searched; when ``None``, every category is searched.

        Returns:
            True if at least one question was flagged, False otherwise.
        """
        if category_id is None:
            candidate_lists = list(board.values())
        else:
            # Board keys are looked up as strings.
            candidate_lists = [board.get(str(category_id), [])]

        marked = False
        for questions in candidate_lists:
            for q in questions:
                if q["id"] == question_id:
                    q["answered"] = True
                    marked = True
                    break  # first match within this category only
        return marked

    @staticmethod
    def _board_complete(board: dict) -> bool:
        """Return True when every question on the board is flagged answered.

        A question without an ``"answered"`` key counts as not answered
        instead of raising ``KeyError``.
        """
        return all(
            q.get("answered", False)
            for questions in board.values()
            for q in questions
        )

    @staticmethod
    def _advance_player(room: dict, team) -> None:
        """Move ``team``'s player rotation forward by one, wrapping around.

        If the team has no players the index is reset to 0 rather than
        dividing by zero.
        """
        players = room["teams"][team]
        if not players:
            room["current_player_index"][team] = 0
            return
        room["current_player_index"][team] = (
            room["current_player_index"][team] + 1
        ) % len(players)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def start_game(self, room_code: str, board: dict) -> Optional[dict]:
        """
        Start a game in a room.

        Args:
            room_code: The room code
            board: Dict of category_id -> list of questions

        Returns:
            Updated room state, or None if the room does not exist or
            either team has no players.
        """
        room = await room_manager.get_room(room_code)
        if not room:
            return None

        # Both teams need at least one player.
        if not room["teams"]["A"] or not room["teams"]["B"]:
            return None

        # Fresh game state: team A starts, rotations and scores reset.
        room["status"] = "playing"
        room["current_team"] = "A"
        room["current_player_index"] = {"A": 0, "B": 0}
        room["board"] = board
        room["scores"] = {"A": 0, "B": 0}

        await room_manager.update_room(room_code, room)
        return room

    async def select_question(
        self,
        room_code: str,
        question_id: int,
        category_id: int
    ) -> Optional[dict]:
        """Select a question from the board.

        Sets it as the room's current question, clears the steal flag and
        flags the matching board entry as ``"selected"``.

        Returns:
            Updated room state, or None if the room does not exist or its
            status is not "playing".
        """
        room = await room_manager.get_room(room_code)
        if not room or room["status"] != "playing":
            return None

        room["current_question"] = question_id
        room["can_steal"] = False

        # Board keys are looked up as strings.
        for q in room["board"].get(str(category_id), []):
            if q["id"] == question_id:
                q["selected"] = True
                break

        await room_manager.update_room(room_code, room)
        return room

    async def submit_answer(
        self,
        room_code: str,
        question: dict,
        player_answer: str,
        is_steal: bool = False
    ) -> dict:
        """
        Submit an answer for validation.

        Outcomes:
            * Correct: the current team scores ``question["points"]``, the
              question is closed, and that team's player rotation advances.
            * Wrong steal: the stealing team is penalised (score floored at
              0), the question is closed, and the turn goes back to the
              other team.
            * Wrong first attempt: the steal flag is raised and the turn
              passes to the other team; the question stays open.

        Args:
            room_code: The room code
            question: Question dict; reads "question_text",
                "correct_answer", "alt_answers" (optional), "points",
                "id" and "category_id".
            player_answer: The answer text to validate.
            is_steal: True when this is the opposing team's steal attempt.

        Returns:
            dict with validation result and updated game state, or
            ``{"error": "Room not found"}`` when the room does not exist.
        """
        room = await room_manager.get_room(room_code)
        if not room:
            return {"error": "Room not found"}

        # Validate answer with AI
        result = await ai_validator.validate_answer(
            question=question["question_text"],
            correct_answer=question["correct_answer"],
            # `or []` also covers an explicit None stored under the key.
            alt_answers=question.get("alt_answers") or [],
            player_answer=player_answer
        )

        is_correct = result.get("valid", False)
        points = question["points"]
        # The team whose answer is being judged (the stealing team on a steal).
        answering_team = room["current_team"]

        if is_correct:
            room["scores"][answering_team] += points
            self._mark_answered(
                room["board"], question["id"], question["category_id"]
            )
            # Winner chooses next
            room["current_question"] = None
            room["can_steal"] = False
            self._advance_player(room, answering_team)
        elif is_steal:
            # Failed steal - penalize, never dropping below zero.
            penalty = int(points * settings.steal_penalty_multiplier)
            room["scores"][answering_team] = max(
                0, room["scores"][answering_team] - penalty
            )
            # Nobody gets the question.
            self._mark_answered(
                room["board"], question["id"], question["category_id"]
            )
            # Original team chooses next
            room["current_team"] = self._other_team(answering_team)
            room["current_question"] = None
            room["can_steal"] = False
        else:
            # Original team failed - hand the other team a steal chance.
            room["can_steal"] = True
            room["current_team"] = self._other_team(answering_team)

        # Game ends once every question on the board is answered.
        if self._board_complete(room["board"]):
            room["status"] = "finished"

        await room_manager.update_room(room_code, room)

        return {
            "valid": is_correct,
            "reason": result.get("reason", ""),
            "points_earned": points if is_correct else 0,
            "room": room
        }

    async def pass_steal(self, room_code: str, question_id: int) -> Optional[dict]:
        """Pass on stealing opportunity.

        Closes the question without awarding points, hands the turn back to
        the other team, and ends the game if that was the last open question.

        Returns:
            Updated room state, or None if the room does not exist.
        """
        room = await room_manager.get_room(room_code)
        if not room:
            return None

        # No category is supplied here, so search the whole board.
        self._mark_answered(room["board"], question_id)

        # Switch back to original team for next selection
        room["current_team"] = self._other_team(room["current_team"])
        room["current_question"] = None
        room["can_steal"] = False

        # A passed steal can close the final question, so check for game
        # over here as well as in submit_answer.
        if self._board_complete(room["board"]):
            room["status"] = "finished"

        await room_manager.update_room(room_code, room)
        return room

    async def get_current_player(self, room: dict) -> Optional[dict]:
        """Get the current player who should answer.

        Returns:
            The entry of the current team's player list at that team's
            rotation index (wrapped to the list length), or None when no
            team is current or the team has no players.
        """
        team = room["current_team"]
        if not team:
            return None

        players = room["teams"][team]
        if not players:
            return None

        index = room["current_player_index"][team]
        return players[index % len(players)]

    def calculate_timer_end(self, time_seconds: int, is_steal: bool = False) -> datetime:
        """Calculate when the timer should end.

        Args:
            time_seconds: Base answer time in seconds.
            is_steal: When True the time is scaled by
                ``settings.steal_time_multiplier`` and truncated to an int.

        Returns:
            A naive datetime in UTC, ``time_seconds`` from now.
        """
        # Local import so this block does not depend on the file's import header.
        from datetime import timezone

        if is_steal:
            time_seconds = int(time_seconds * settings.steal_time_multiplier)

        # Same value as the deprecated datetime.utcnow(): UTC wall-clock time
        # with tzinfo stripped, so callers keep receiving a naive datetime.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        return now_utc + timedelta(seconds=time_seconds)
|
|
|
|
|
|
# Singleton instance
|
|
game_manager = GameManager()  # instantiated once at import; the class itself does not enforce a single instance
|