Files
Trivy/backend/app/sockets/game_events.py
consultoria-as 720432702f feat(phase6): Add sounds, team chat, reactions, monitor, settings, and CSV import/export
Sound System:
- Add soundStore with volume/mute persistence
- Add useSound hook with Web Audio API fallback
- Add SoundControl component for in-game volume adjustment
- Play sounds for correct/incorrect, steal, timer, victory/defeat

Team Chat:
- Add TeamChat component with collapsible panel
- Add team_message WebSocket event (team-only visibility)
- Store up to 50 messages per session

Emoji Reactions:
- Add EmojiReactions bar with 8 emojis
- Add ReactionOverlay with floating animations (Framer Motion)
- Add rate limiting (1 reaction per 3 seconds)
- Broadcast reactions to all players in room

Admin Monitor:
- Add Monitor page showing active rooms from Redis
- Display player counts, team composition, status
- Add ability to close problematic rooms

Admin Settings:
- Add Settings page for game configuration
- Configure points/times by difficulty, steal penalty, max players
- Store config in JSON file with service helpers

CSV Import/Export:
- Add export endpoint with optional filters
- Add import endpoint with validation and error reporting
- Add UI buttons and import result modal in Questions page

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-26 08:58:33 +00:00

597 lines
20 KiB
Python

import socketio
import time
from datetime import datetime
from app.services.room_manager import room_manager
from app.services.game_manager import game_manager
from app.services.replay_manager import replay_manager
from app.services.achievement_manager import achievement_manager
from app.schemas.achievement import PlayerStats
from app.models.base import get_async_session
# Rate limiting para reacciones: {room_code: {player_name: last_reaction_timestamp}}
reaction_rate_limits: dict[str, dict[str, float]] = {}
REACTION_COOLDOWN_SECONDS = 3
async def get_db_session():
"""Helper para obtener sesion de BD en contexto de socket."""
AsyncSessionLocal = get_async_session()
return AsyncSessionLocal()
def register_socket_events(sio: socketio.AsyncServer):
"""Register all Socket.IO event handlers."""
@sio.event
async def connect(sid, environ):
print(f"Client connected: {sid}")
@sio.event
async def disconnect(sid):
print(f"Client disconnected: {sid}")
# Remove player from room
room = await room_manager.remove_player(sid)
if room:
await sio.emit(
"player_left",
{"room": room},
room=room["code"]
)
@sio.event
async def create_room(sid, data):
"""Create a new game room."""
player_name = data.get("player_name", "Player")
room = await room_manager.create_room(player_name, sid)
# Inicializar stats del jugador (host) para logros
await room_manager.init_player_stats(room["code"], player_name)
# Join socket room
sio.enter_room(sid, room["code"])
await sio.emit("room_created", {"room": room}, to=sid)
@sio.event
async def join_room(sid, data):
"""Join an existing room."""
room_code = data.get("room_code", "").upper()
player_name = data.get("player_name", "Player")
team = data.get("team", "A")
room = await room_manager.add_player(room_code, player_name, team, sid)
if not room:
await sio.emit(
"error",
{"message": "Could not join room. It may be full or the name is taken."},
to=sid
)
return
# Inicializar stats del jugador para logros
await room_manager.init_player_stats(room_code, player_name)
# Join socket room
sio.enter_room(sid, room_code)
# Notify all players
await sio.emit("player_joined", {"room": room}, room=room_code)
@sio.event
async def change_team(sid, data):
"""Switch player to another team."""
player = await room_manager.get_player(sid)
if not player:
return
room_code = player["room"]
new_team = data.get("team")
room = await room_manager.get_room(room_code)
if not room or len(room["teams"][new_team]) >= 4:
await sio.emit(
"error",
{"message": "Cannot change team. It may be full."},
to=sid
)
return
# Remove from current team
current_team = player["team"]
room["teams"][current_team] = [
p for p in room["teams"][current_team] if p["socket_id"] != sid
]
# Add to new team
room["teams"][new_team].append({
"name": player["name"],
"team": new_team,
"position": len(room["teams"][new_team]),
"socket_id": sid
})
await room_manager.update_room(room_code, room)
await sio.emit("team_changed", {"room": room}, room=room_code)
@sio.event
async def start_game(sid, data):
"""Start the game (host only)."""
player = await room_manager.get_player(sid)
if not player:
return
room_code = player["room"]
room = await room_manager.get_room(room_code)
if not room:
return
# Check if player is host
if room["host"] != player["name"]:
await sio.emit(
"error",
{"message": "Only the host can start the game."},
to=sid
)
return
# Check minimum players
if not room["teams"]["A"] or not room["teams"]["B"]:
await sio.emit(
"error",
{"message": "Both teams need at least one player."},
to=sid
)
return
# Get board from data or generate
board = data.get("board", {})
updated_room = await game_manager.start_game(room_code, board)
if updated_room:
await sio.emit("game_started", {"room": updated_room}, room=room_code)
@sio.event
async def select_question(sid, data):
"""Select a question from the board."""
player = await room_manager.get_player(sid)
if not player:
return
room_code = player["room"]
question_id = data.get("question_id")
category_id = data.get("category_id")
room = await game_manager.select_question(room_code, question_id, category_id)
if room:
# Get current player info
current_player = await game_manager.get_current_player(room)
await sio.emit(
"question_selected",
{
"room": room,
"question_id": question_id,
"current_player": current_player
},
room=room_code
)
# Guardar evento para replay
if room.get("db_session_id"):
async with await get_db_session() as db:
await replay_manager.save_game_event(
db=db,
session_id=room["db_session_id"],
event_type="question_selected",
player_name=player["name"],
team=player["team"],
question_id=question_id
)
@sio.event
async def submit_answer(sid, data):
"""Submit an answer to the current question."""
player = await room_manager.get_player(sid)
if not player:
return
room_code = player["room"]
answer = data.get("answer", "")
question = data.get("question", {})
is_steal = data.get("is_steal", False)
result = await game_manager.submit_answer(
room_code, question, answer, is_steal
)
if "error" in result:
await sio.emit("error", {"message": result["error"]}, to=sid)
return
# Actualizar stats del jugador para logros
stats_dict = await room_manager.get_player_stats(room_code, player["name"])
if stats_dict:
stats = PlayerStats(**stats_dict)
# Actualizar con achievement_manager
updated_stats = achievement_manager.update_stats_on_answer(
stats=stats,
was_correct=result["valid"],
was_steal=is_steal,
category_id=question.get("category_id", 0),
points=question.get("points", 0),
answer_time_seconds=data.get("answer_time", 30) # Frontend debe enviar esto
)
# Guardar stats actualizadas
await room_manager.set_player_stats(room_code, player["name"], updated_stats.model_dump())
await sio.emit(
"answer_result",
{
"player_name": player["name"],
"team": player["team"],
"answer": answer,
"valid": result["valid"],
"reason": result["reason"],
"points_earned": result["points_earned"],
"was_steal": is_steal,
"room": result["room"]
},
room=room_code
)
# Guardar evento para replay
room_data = result.get("room", {})
if room_data.get("db_session_id"):
async with await get_db_session() as db:
await replay_manager.save_game_event(
db=db,
session_id=room_data["db_session_id"],
event_type="answer_submitted",
player_name=player["name"],
team=player["team"],
question_id=question.get("id"),
answer_given=answer,
was_correct=result["valid"],
was_steal=is_steal,
points_earned=result["points_earned"]
)
# Verificar si el juego termino (todas las preguntas respondidas)
if room_data.get("status") == "finished":
# Disparar finalizacion automatica
await finish_game_internal(room_code)
@sio.event
async def steal_decision(sid, data):
"""Decide whether to attempt stealing."""
player = await room_manager.get_player(sid)
if not player:
return
room_code = player["room"]
attempt = data.get("attempt", False)
question_id = data.get("question_id")
if not attempt:
# Pass on steal
room = await game_manager.pass_steal(room_code, question_id)
if room:
await sio.emit(
"steal_passed",
{"room": room, "team": player["team"]},
room=room_code
)
# Guardar evento para replay
if room.get("db_session_id"):
async with await get_db_session() as db:
await replay_manager.save_game_event(
db=db,
session_id=room["db_session_id"],
event_type="steal_passed",
player_name=player["name"],
team=player["team"],
question_id=question_id
)
else:
# Will attempt steal - just notify, answer comes separately
room = await room_manager.get_room(room_code)
await sio.emit(
"steal_attempted",
{
"team": player["team"],
"player_name": player["name"],
"room": room
},
room=room_code
)
# Guardar evento para replay
if room and room.get("db_session_id"):
async with await get_db_session() as db:
await replay_manager.save_game_event(
db=db,
session_id=room["db_session_id"],
event_type="steal_attempted",
player_name=player["name"],
team=player["team"],
question_id=question_id
)
@sio.event
async def chat_message(sid, data):
"""Send a chat message to team."""
player = await room_manager.get_player(sid)
if not player:
return
room_code = player["room"]
message = data.get("message", "")[:500] # Limit message length
# Get all team members' socket IDs
room = await room_manager.get_room(room_code)
if not room:
return
team_sockets = [
p["socket_id"] for p in room["teams"][player["team"]]
]
# Send only to team members
for socket_id in team_sockets:
await sio.emit(
"chat_message",
{
"player_name": player["name"],
"team": player["team"],
"message": message,
"timestamp": datetime.utcnow().isoformat()
},
to=socket_id
)
@sio.event
async def team_message(sid, data):
"""Send a team chat message - only visible to teammates."""
room_code = data.get("room_code", "")
team = data.get("team", "")
player_name = data.get("player_name", "")
message = data.get("message", "")[:500] # Limit message length
if not all([room_code, team, player_name, message]):
return
# Validate player exists in room
player = await room_manager.get_player(sid)
if not player or player["room"] != room_code or player["team"] != team:
return
# Get room data to find team members
room = await room_manager.get_room(room_code)
if not room:
return
# Get socket IDs of team members
team_sockets = [
p["socket_id"] for p in room["teams"][team]
if p.get("socket_id")
]
# Emit only to team members
message_data = {
"player_name": player_name,
"team": team,
"message": message,
"timestamp": datetime.utcnow().isoformat()
}
for socket_id in team_sockets:
await sio.emit(
"receive_team_message",
message_data,
to=socket_id
)
@sio.event
async def send_reaction(sid, data):
"""Send an emoji reaction visible to all players in the room."""
player = await room_manager.get_player(sid)
if not player:
return
room_code = data.get("room_code", player["room"])
player_name = data.get("player_name", player["name"])
emoji = data.get("emoji", "")
# Validate emoji
allowed_emojis = ["👏", "😮", "😂", "🔥", "💀", "🎉", "😭", "🤔"]
if emoji not in allowed_emojis:
return
# Rate limiting: max 1 reaction every 3 seconds per player
current_time = time.time()
if room_code not in reaction_rate_limits:
reaction_rate_limits[room_code] = {}
last_reaction = reaction_rate_limits[room_code].get(player_name, 0)
if current_time - last_reaction < REACTION_COOLDOWN_SECONDS:
# Player is rate limited, ignore the reaction
return
# Update last reaction time
reaction_rate_limits[room_code][player_name] = current_time
# Emit to ALL players in the room (both teams)
await sio.emit(
"receive_reaction",
{
"player_name": player_name,
"team": player["team"],
"emoji": emoji,
"timestamp": datetime.utcnow().isoformat()
},
room=room_code
)
# Keep old event name for backwards compatibility
@sio.event
async def emoji_reaction(sid, data):
"""Alias for send_reaction (backwards compatibility)."""
await send_reaction(sid, data)
@sio.event
async def timer_expired(sid, data):
"""Handle timer expiration."""
player = await room_manager.get_player(sid)
if not player:
return
room_code = player["room"]
room = await room_manager.get_room(room_code)
if not room:
return
# Treat as wrong answer
if room["can_steal"]:
# Steal timer expired - pass
question_id = room["current_question"]
room = await game_manager.pass_steal(room_code, question_id)
await sio.emit("time_up", {"room": room, "was_steal": True}, room=room_code)
else:
# Answer timer expired - enable steal
room["can_steal"] = True
room["current_team"] = "B" if room["current_team"] == "A" else "A"
await room_manager.update_room(room_code, room)
await sio.emit("time_up", {"room": room, "was_steal": False}, room=room_code)
async def finish_game_internal(room_code: str):
"""
Funcion interna para finalizar la partida.
Se llama automaticamente cuando todas las preguntas fueron respondidas,
o manualmente desde el evento finish_game.
"""
room = await room_manager.get_room(room_code)
if not room or room["status"] != "finished":
return
async with await get_db_session() as db:
# 1. Guardar evento de fin de partida
if room.get("db_session_id"):
await replay_manager.save_game_event(
db=db,
session_id=room["db_session_id"],
event_type="game_finished",
player_name="system",
team="",
points_earned=0
)
# 2. Finalizar sesion en BD
team_a_score = room["scores"]["A"]
team_b_score = room["scores"]["B"]
questions_used = [
q["id"]
for questions in room["board"].values()
for q in questions
if q.get("answered")
]
db_session = await game_manager.finish_game(
db, room_code, team_a_score, team_b_score, questions_used
)
# 3. Generar codigo de replay
replay_code = None
if db_session:
replay_code = replay_manager.generate_replay_code(db_session.id)
# 4. Determinar ganador
winner = None
if team_a_score > team_b_score:
winner = "A"
elif team_b_score > team_a_score:
winner = "B"
# else: empate
# 5. Verificar logros para cada jugador
all_achievements = []
for team in ["A", "B"]:
for player_info in room["teams"][team]:
stats_dict = await room_manager.get_player_stats(room_code, player_info["name"])
if stats_dict:
stats = PlayerStats(**stats_dict)
# Contexto del juego para este jugador
player_won = (winner == team)
own_score = team_a_score if team == "A" else team_b_score
opp_score = team_b_score if team == "A" else team_a_score
game_context = {
"won": player_won,
"team_score": own_score,
"opponent_score": opp_score,
"max_deficit_overcome": max(0, opp_score - own_score) if player_won else 0,
"categories_swept": [], # TODO: calcular si completo categoria
"no_mistakes": stats.total_correct == stats.total_correct + 0 # TODO: rastrear errores
}
unlocked = await achievement_manager.check_achievements(db, stats, game_context)
for ach in unlocked:
all_achievements.append({
"player_name": player_info["name"],
"team": team,
"achievement": ach.achievement.model_dump() if hasattr(ach.achievement, 'model_dump') else ach.achievement
})
# 6. Actualizar estado de la sala (ya esta en finished desde game_manager)
await room_manager.update_room(room_code, room)
# 7. Emitir evento a todos
await sio.emit(
"game_finished",
{
"room": room,
"winner": winner,
"final_scores": {
"A": team_a_score,
"B": team_b_score
},
"replay_code": replay_code,
"achievements_unlocked": all_achievements
},
room=room_code
)
@sio.event
async def finish_game(sid, data):
"""Finaliza la partida y procesa resultados."""
player = await room_manager.get_player(sid)
if not player:
return
room_code = player["room"]
room = await room_manager.get_room(room_code)
if not room or room["status"] != "playing":
return
# Solo el host puede finalizar (o se detecta automaticamente)
if room["host"] != player["name"] and not data.get("auto_finish"):
return
# Marcar como terminado
room["status"] = "finished"
await room_manager.update_room(room_code, room)
# Procesar finalizacion
await finish_game_internal(room_code)