Files
Trivy/backend/app/sockets/game_events.py
consultoria-as 0141153653 feat(phase3): Implement complete game logic with WebSocket events
Timer System:
- Add TimerManager service with asyncio for server-side timers
- Support steal time reduction (50% time)
- Automatic timer cancellation on answer

Question Loading:
- Add QuestionService to load daily questions from PostgreSQL
- Generate 8×5 board (categories × difficulties)
- Filter by date_active and approved status

Database Integration:
- Create GameSession in PostgreSQL when game starts
- Update scores during game and on finish
- Store db_session_id in Redis for cross-reference

Replay Integration:
- Save all game events: question_selected, answer_submitted, steal_attempted, steal_passed, game_finished
- Generate unique replay code on game finish

Achievement Integration:
- Initialize PlayerStats in Redis when joining room
- Update stats on every answer (streak, category, speed, etc.)
- Check achievements on game finish for all players

Game Finish:
- Automatic finish when all questions answered
- Manual finish by host
- Emit game_finished with winner, scores, replay_code, achievements

Phase 3 tasks completed:
- F3.1: Timer manager with asyncio
- F3.2: Question service for board loading
- F3.3: GameSession PostgreSQL integration
- F3.4: Replay event saving
- F3.5: Achievement stats tracking
- F3.6: Complete game finish flow

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-26 08:32:22 +00:00

527 lines
18 KiB
Python

import socketio
from datetime import datetime
from app.services.room_manager import room_manager
from app.services.game_manager import game_manager
from app.services.replay_manager import replay_manager
from app.services.achievement_manager import achievement_manager
from app.schemas.achievement import PlayerStats
from app.models.base import get_async_session
async def get_db_session():
    """Return a new database session for use inside a socket handler.

    Obtains the session factory from ``get_async_session()`` and instantiates
    it. Callers in this file use the result as an async context manager
    (``async with await get_db_session() as db``).
    """
    session_factory = get_async_session()
    db_session = session_factory()
    return db_session
def register_socket_events(sio: socketio.AsyncServer):
"""Register all Socket.IO event handlers."""
@sio.event
async def connect(sid, environ):
    """Log a new client connection; ``environ`` is accepted but not used."""
    message = f"Client connected: {sid}"
    print(message)
@sio.event
async def disconnect(sid):
    """Log the disconnection and tell the room that the player left.

    ``room_manager.remove_player`` is asked to drop the socket; when it
    returns a room, that room is broadcast in a "player_left" event.
    """
    print(f"Client disconnected: {sid}")

    left_room = await room_manager.remove_player(sid)
    if not left_room:
        # Nothing was returned for this socket, so there is no one to notify.
        return

    payload = {"room": left_room}
    await sio.emit("player_left", payload, room=left_room["code"])
@sio.event
async def create_room(sid, data):
    """Create a new game room for the calling client.

    Reads ``player_name`` from ``data`` (default "Player"), creates the room
    through ``room_manager.create_room``, initialises the player's achievement
    stats, joins the socket to the Socket.IO room named after the room code
    and replies to the caller with "room_created".
    """
    import inspect

    player_name = data.get("player_name", "Player")
    room = await room_manager.create_room(player_name, sid)

    # Initialise the (host) player's stats used for achievements.
    await room_manager.init_player_stats(room["code"], player_name)

    # Join the socket room. On current python-socketio releases
    # AsyncServer.enter_room is a coroutine; if it is not awaited the socket
    # never joins and later room broadcasts are lost. Older releases return
    # None synchronously, so only await when there is something to await.
    joined = sio.enter_room(sid, room["code"])
    if inspect.isawaitable(joined):
        await joined

    await sio.emit("room_created", {"room": room}, to=sid)
@sio.event
async def join_room(sid, data):
    """Join an existing room.

    Reads ``room_code`` (upper-cased), ``player_name`` (default "Player") and
    ``team`` (default "A") from ``data``. When ``room_manager.add_player``
    returns nothing, an "error" event is sent to the caller; otherwise the
    player's achievement stats are initialised, the socket joins the
    Socket.IO room and "player_joined" is broadcast to that room.
    """
    import inspect

    # The payload comes from the client: a null or non-string room code must
    # not crash the handler on .upper().
    room_code = str(data.get("room_code") or "").upper()
    player_name = data.get("player_name", "Player")
    team = data.get("team", "A")

    room = await room_manager.add_player(room_code, player_name, team, sid)
    if not room:
        await sio.emit(
            "error",
            {"message": "Could not join room. It may be full or the name is taken."},
            to=sid,
        )
        return

    # Initialise the player's stats used for achievements.
    await room_manager.init_player_stats(room_code, player_name)

    # Join the socket room. AsyncServer.enter_room is a coroutine on current
    # python-socketio releases and must be awaited there; older releases
    # return None synchronously.
    joined = sio.enter_room(sid, room_code)
    if inspect.isawaitable(joined):
        await joined

    # Notify all players in the room.
    await sio.emit("player_joined", {"room": room}, room=room_code)
@sio.event
async def change_team(sid, data):
    """Switch the calling player to another team.

    ``data["team"]`` names the target team. The request is rejected with an
    "error" event when the room is missing, the team name is not one of the
    room's teams, or the target team already has 4 players. Otherwise the
    player is removed from the current team, appended to the target team,
    the room is saved and "team_changed" is broadcast.
    """
    player = await room_manager.get_player(sid)
    if not player:
        return

    room_code = player["room"]
    new_team = data.get("team")
    room = await room_manager.get_room(room_code)

    # The team name is client input: an absent, unknown or non-string value
    # used to raise KeyError/TypeError when indexing room["teams"].
    target_ok = (
        bool(room)
        and isinstance(new_team, str)
        and new_team in room["teams"]
        and len(room["teams"][new_team]) < 4
    )
    if not target_ok:
        await sio.emit(
            "error",
            {"message": "Cannot change team. It may be full."},
            to=sid,
        )
        return

    # Remove from current team.
    current_team = player["team"]
    room["teams"][current_team] = [
        p for p in room["teams"][current_team] if p["socket_id"] != sid
    ]

    # Add to new team; position is the index the player occupies after the append.
    room["teams"][new_team].append({
        "name": player["name"],
        "team": new_team,
        "position": len(room["teams"][new_team]),
        "socket_id": sid,
    })

    await room_manager.update_room(room_code, room)
    await sio.emit("team_changed", {"room": room}, room=room_code)
@sio.event
async def start_game(sid, data):
    """Start the game on behalf of the host.

    The caller must be the room's host and both teams must have at least one
    player; otherwise an "error" event is sent back. The board is read from
    ``data["board"]`` (default ``{}``) and handed to
    ``game_manager.start_game``; a truthy result is broadcast as
    "game_started".
    """
    player = await room_manager.get_player(sid)
    if not player:
        return

    room_code = player["room"]
    room = await room_manager.get_room(room_code)
    if not room:
        return

    # Work out the first failing precondition, if any (host check first).
    if room["host"] != player["name"]:
        problem = "Only the host can start the game."
    elif not room["teams"]["A"] or not room["teams"]["B"]:
        problem = "Both teams need at least one player."
    else:
        problem = None

    if problem is not None:
        await sio.emit("error", {"message": problem}, to=sid)
        return

    board = data.get("board", {})
    started_room = await game_manager.start_game(room_code, board)
    if not started_room:
        return

    await sio.emit("game_started", {"room": started_room}, room=room_code)
@sio.event
async def select_question(sid, data):
    """Select a question from the board.

    Passes ``question_id`` and ``category_id`` from ``data`` to
    ``game_manager.select_question``. When a room comes back, the room, the
    question id and the current player are broadcast as "question_selected",
    and a "question_selected" replay event is saved if the room carries a
    ``db_session_id``.
    """
    player = await room_manager.get_player(sid)
    if not player:
        return

    room_code = player["room"]
    question_id = data.get("question_id")
    category_id = data.get("category_id")

    room = await game_manager.select_question(room_code, question_id, category_id)
    if not room:
        return

    current_player = await game_manager.get_current_player(room)
    payload = {
        "room": room,
        "question_id": question_id,
        "current_player": current_player,
    }
    await sio.emit("question_selected", payload, room=room_code)

    # Save the event for the replay, only when a DB session id is present.
    session_id = room.get("db_session_id")
    if not session_id:
        return

    async with await get_db_session() as db:
        await replay_manager.save_game_event(
            db=db,
            session_id=session_id,
            event_type="question_selected",
            player_name=player["name"],
            team=player["team"],
            question_id=question_id,
        )
@sio.event
async def submit_answer(sid, data):
    """Submit an answer to the current question.

    Reads ``answer``, ``question`` and ``is_steal`` from ``data`` and forwards
    them to ``game_manager.submit_answer``. An ``"error"`` key in the result
    is relayed to the caller. Otherwise the player's achievement stats are
    updated, "answer_result" is broadcast, an "answer_submitted" replay event
    is saved when the room has a ``db_session_id``, and the game is finalised
    when the returned room status is "finished".
    """
    player = await room_manager.get_player(sid)
    if not player:
        return

    room_code = player["room"]
    player_name = player["name"]
    player_team = player["team"]

    answer = data.get("answer", "")
    question = data.get("question", {})
    is_steal = data.get("is_steal", False)

    result = await game_manager.submit_answer(room_code, question, answer, is_steal)
    if "error" in result:
        await sio.emit("error", {"message": result["error"]}, to=sid)
        return

    # Update the player's achievement stats, if any are stored for them.
    stored_stats = await room_manager.get_player_stats(room_code, player_name)
    if stored_stats:
        new_stats = achievement_manager.update_stats_on_answer(
            stats=PlayerStats(**stored_stats),
            was_correct=result["valid"],
            was_steal=is_steal,
            category_id=question.get("category_id", 0),
            points=question.get("points", 0),
            # The frontend is expected to send answer_time; 30 is the fallback.
            answer_time_seconds=data.get("answer_time", 30),
        )
        await room_manager.set_player_stats(
            room_code, player_name, new_stats.model_dump()
        )

    answer_payload = {
        "player_name": player_name,
        "team": player_team,
        "answer": answer,
        "valid": result["valid"],
        "reason": result["reason"],
        "points_earned": result["points_earned"],
        "was_steal": is_steal,
        "room": result["room"],
    }
    await sio.emit("answer_result", answer_payload, room=room_code)

    # Save the event for the replay.
    room_data = result.get("room", {})
    session_id = room_data.get("db_session_id")
    if session_id:
        async with await get_db_session() as db:
            await replay_manager.save_game_event(
                db=db,
                session_id=session_id,
                event_type="answer_submitted",
                player_name=player_name,
                team=player_team,
                question_id=question.get("id"),
                answer_given=answer,
                was_correct=result["valid"],
                was_steal=is_steal,
                points_earned=result["points_earned"],
            )

    # The game is over once the room reports "finished": finalise automatically.
    game_over = room_data.get("status") == "finished"
    if game_over:
        await finish_game_internal(room_code)
@sio.event
async def steal_decision(sid, data):
    """Decide whether to attempt stealing.

    With a falsy ``data["attempt"]`` the steal is passed through
    ``game_manager.pass_steal`` and, if a room comes back, "steal_passed" is
    broadcast. With a truthy value the current room is fetched and
    "steal_attempted" is broadcast (the answer itself arrives separately).
    In both cases a replay event of the same name is saved when the room
    exists and has a ``db_session_id``.
    """
    player = await room_manager.get_player(sid)
    if not player:
        return

    room_code = player["room"]
    attempt = data.get("attempt", False)
    question_id = data.get("question_id")

    if attempt:
        # Will attempt the steal: just notify, the answer comes separately.
        event_type = "steal_attempted"
        room = await room_manager.get_room(room_code)
        await sio.emit(
            event_type,
            {
                "team": player["team"],
                "player_name": player["name"],
                "room": room,
            },
            room=room_code,
        )
    else:
        # Pass on the steal.
        event_type = "steal_passed"
        room = await game_manager.pass_steal(room_code, question_id)
        if room:
            await sio.emit(
                event_type,
                {"room": room, "team": player["team"]},
                room=room_code,
            )

    # Save the event for the replay, only when the room has a DB session.
    if not room or not room.get("db_session_id"):
        return

    async with await get_db_session() as db:
        await replay_manager.save_game_event(
            db=db,
            session_id=room["db_session_id"],
            event_type=event_type,
            player_name=player["name"],
            team=player["team"],
            question_id=question_id,
        )
@sio.event
async def chat_message(sid, data):
    """Send a chat message to the sender's own team.

    The message text is taken from ``data["message"]``, coerced to ``str``
    and truncated to 500 characters. It is delivered individually to every
    socket listed in the sender's team so the other team does not see it.
    """
    player = await room_manager.get_player(sid)
    if not player:
        return

    room_code = player["room"]
    # Client input: a null or non-string message must not crash the slice.
    message = str(data.get("message") or "")[:500]  # Limit message length

    room = await room_manager.get_room(room_code)
    if not room:
        return

    # Build the payload once so every teammate receives the same timestamp
    # (it used to be recomputed for each recipient).
    payload = {
        "player_name": player["name"],
        "team": player["team"],
        "message": message,
        "timestamp": datetime.utcnow().isoformat(),
    }

    # Send only to team members.
    for member in room["teams"][player["team"]]:
        await sio.emit("chat_message", payload, to=member["socket_id"])
@sio.event
async def emoji_reaction(sid, data):
    """Broadcast an emoji reaction to the whole room.

    Only emojis from a fixed allow-list are relayed; anything else is
    silently ignored.
    """
    player = await room_manager.get_player(sid)
    if not player:
        return

    emoji = data.get("emoji", "")

    # Allow-list of reactions the server will relay.
    allowed_emojis = ("👏", "😮", "😂", "🔥", "💀", "🎉", "😭", "🤔")
    if emoji not in allowed_emojis:
        return

    reaction = {
        "player_name": player["name"],
        "team": player["team"],
        "emoji": emoji,
    }
    await sio.emit("emoji_reaction", reaction, room=player["room"])
@sio.event
async def timer_expired(sid, data):
    """Handle a timer expiration reported by a client.

    If the room is already in steal mode (``room["can_steal"]`` truthy) the
    steal is passed for ``room["current_question"]``. Otherwise steal mode is
    switched on, the turn moves to the other team and the room is saved.
    Both paths broadcast "time_up" with a ``was_steal`` flag.
    """
    player = await room_manager.get_player(sid)
    if not player:
        return

    room_code = player["room"]
    room = await room_manager.get_room(room_code)
    if not room:
        return

    # Treat as a wrong answer.
    if room["can_steal"]:
        # Steal timer expired - pass.
        question_id = room["current_question"]
        room = await game_manager.pass_steal(room_code, question_id)
        if not room:
            # pass_steal returned nothing; do not broadcast a null room
            # (steal_decision guards the same call the same way).
            return
        await sio.emit("time_up", {"room": room, "was_steal": True}, room=room_code)
    else:
        # Answer timer expired - enable the steal for the other team.
        room["can_steal"] = True
        room["current_team"] = "B" if room["current_team"] == "A" else "A"
        await room_manager.update_room(room_code, room)
        await sio.emit("time_up", {"room": room, "was_steal": False}, room=room_code)
async def finish_game_internal(room_code: str):
    """
    Finalize a game and broadcast the results to the room.

    Called from ``submit_answer`` when the returned room status is
    "finished", and from the ``finish_game`` event. Returns immediately
    unless the room exists and its status is already "finished".

    Steps: save a "game_finished" replay event (when the room has a
    ``db_session_id``), finish the session through
    ``game_manager.finish_game``, generate a replay code, determine the
    winner, check achievements for every player of both teams, write the
    room back and emit "game_finished".
    """
    room = await room_manager.get_room(room_code)
    if not room or room["status"] != "finished":
        return

    async with await get_db_session() as db:
        # 1. Save the end-of-game replay event.
        session_id = room.get("db_session_id")
        if session_id:
            await replay_manager.save_game_event(
                db=db,
                session_id=session_id,
                event_type="game_finished",
                player_name="system",
                team="",
                points_earned=0,
            )

        # 2. Finish the session in the database.
        final_scores = {"A": room["scores"]["A"], "B": room["scores"]["B"]}
        questions_used = []
        for questions in room["board"].values():
            for question in questions:
                if question.get("answered"):
                    questions_used.append(question["id"])

        db_session = await game_manager.finish_game(
            db, room_code, final_scores["A"], final_scores["B"], questions_used
        )

        # 3. Generate the replay code (only when a DB session came back).
        replay_code = (
            replay_manager.generate_replay_code(db_session.id) if db_session else None
        )

        # 4. Determine the winner; None means a tie.
        if final_scores["A"] > final_scores["B"]:
            winner = "A"
        elif final_scores["B"] > final_scores["A"]:
            winner = "B"
        else:
            winner = None

        # 5. Check achievements for every player.
        all_achievements = []
        for team, rival in (("A", "B"), ("B", "A")):
            own_score = final_scores[team]
            opp_score = final_scores[rival]
            player_won = winner == team

            for player_info in room["teams"][team]:
                stats_dict = await room_manager.get_player_stats(
                    room_code, player_info["name"]
                )
                if not stats_dict:
                    continue

                stats = PlayerStats(**stats_dict)
                game_context = {
                    "won": player_won,
                    "team_score": own_score,
                    "opponent_score": opp_score,
                    # NOTE(review): a winner always has own_score > opp_score,
                    # so this expression evaluates to 0 in every case.
                    "max_deficit_overcome": max(0, opp_score - own_score) if player_won else 0,
                    "categories_swept": [],  # TODO: compute whether a category was completed
                    # NOTE(review): compares a value with itself, so this is
                    # always True. TODO: track mistakes.
                    "no_mistakes": stats.total_correct == stats.total_correct + 0,
                }

                unlocked = await achievement_manager.check_achievements(
                    db, stats, game_context
                )
                all_achievements.extend(
                    {
                        "player_name": player_info["name"],
                        "team": team,
                        "achievement": (
                            ach.achievement.model_dump()
                            if hasattr(ach.achievement, 'model_dump')
                            else ach.achievement
                        ),
                    }
                    for ach in unlocked
                )

        # 6. Write the room back (its status is already "finished" here).
        await room_manager.update_room(room_code, room)

        # 7. Emit the result to everyone in the room.
        summary = {
            "room": room,
            "winner": winner,
            "final_scores": final_scores,
            "replay_code": replay_code,
            "achievements_unlocked": all_achievements,
        }
        await sio.emit("game_finished", summary, room=room_code)
@sio.event
async def finish_game(sid, data):
    """Finish the game and process the results.

    Only acts while the room status is "playing". The host may finish at any
    time. A non-host request is honoured only when it sets ``auto_finish``
    AND the server's own board shows every question as answered; the flag is
    client-supplied, so on its own it would let any player end the game.
    """
    player = await room_manager.get_player(sid)
    if not player:
        return

    room_code = player["room"]
    room = await room_manager.get_room(room_code)
    if not room or room["status"] != "playing":
        return

    if room["host"] != player["name"]:
        # Do not trust auto_finish blindly: check it against server state.
        wants_auto_finish = bool((data or {}).get("auto_finish"))
        questions = [
            q
            for category_questions in (room.get("board") or {}).values()
            for q in category_questions
        ]
        # An empty board never counts as "all answered".
        board_done = bool(questions) and all(q.get("answered") for q in questions)
        if not (wants_auto_finish and board_done):
            return

    # Mark as finished so finish_game_internal will process the room.
    room["status"] = "finished"
    await room_manager.update_room(room_code, room)

    # Process the finalisation.
    await finish_game_internal(room_code)