Files
Trivy/backend/app/services/replay_manager.py
consultoria-as 27ac4cb0cf feat(phase2): Add achievements and replay systems
Achievement System:
- Add Achievement model with condition types (streak, steal, specialist, etc.)
- Add AchievementManager service for tracking and awarding achievements
- Add Pydantic schemas for achievements (AchievementResponse, PlayerStats, etc.)
- Seed 18 achievements from design doc
- Add GET /api/game/achievements endpoint

Replay System:
- Add ReplayManager service for saving/loading game replays
- Add GET /api/replay/{code} and /api/replay/session/{id} endpoints
- Format replays for frontend consumption

Phase 2 tasks completed:
- F2.1: Achievement model and migration
- F2.2: Pydantic schemas
- F2.3: AchievementManager service
- F2.4: ReplayManager service
- F2.5: API endpoints
- F2.6: Seed 18 achievements data

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-26 08:24:02 +00:00

220 lines
6.8 KiB
Python

import hashlib
from datetime import datetime, timezone
from typing import Optional, List

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.game_session import GameSession
from app.models.game_event import GameEvent
class ReplayManager:
    """Service that records game events and assembles replays from them.

    The class keeps no instance state; every database-facing method
    receives the ``AsyncSession`` to use as an argument.
    """

    def generate_replay_code(self, session_id: int) -> str:
        """Generate a short replay code derived from a session id.

        The code is the first 8 hex characters (upper-cased) of a SHA-256
        digest over ``"<session_id>:<current UTC timestamp>"``.

        Because the current time is mixed in, two calls with the same
        ``session_id`` return different codes, and because the digest is
        truncated to 32 bits, uniqueness is likely but NOT guaranteed.
        This method does not persist the code anywhere.

        Args:
            session_id: ID of the game session.

        Returns:
            An 8-character upper-case hexadecimal string.
        """
        # datetime.utcnow() is deprecated since Python 3.12; use an aware
        # UTC timestamp instead. The value is only used as hash input.
        timestamp = datetime.now(timezone.utc).isoformat()
        data = f"{session_id}:{timestamp}"
        hash_digest = hashlib.sha256(data.encode()).hexdigest()
        return hash_digest[:8].upper()

    async def save_game_event(
        self,
        db: AsyncSession,
        session_id: int,
        event_type: str,
        player_name: str,
        team: str,
        question_id: Optional[int] = None,
        answer_given: Optional[str] = None,
        was_correct: Optional[bool] = None,
        was_steal: bool = False,
        points_earned: int = 0
    ) -> GameEvent:
        """Persist a single game event so it can be replayed later.

        The event is added to ``db``, the transaction is committed, and the
        instance is refreshed from the database before being returned.

        Args:
            db: Database session.
            session_id: ID of the game session the event belongs to.
            event_type: Kind of event (question_selected, answer_submitted,
                steal_attempted, steal_passed, game_finished).
            player_name: Name of the player.
            team: Team identifier ('A' or 'B').
            question_id: ID of the question, if any.
            answer_given: Answer submitted by the player, if any.
            was_correct: Whether the answer was correct, if applicable.
            was_steal: Whether this was a steal attempt.
            points_earned: Points awarded for the event.

        Returns:
            The newly created ``GameEvent``.
        """
        event = GameEvent(
            session_id=session_id,
            event_type=event_type,
            player_name=player_name,
            team=team,
            question_id=question_id,
            answer_given=answer_given,
            was_correct=was_correct,
            was_steal=was_steal,
            points_earned=points_earned
        )
        db.add(event)
        await db.commit()
        # Reload the row after the commit so the returned object carries
        # its current database state.
        await db.refresh(event)
        return event

    async def get_replay(
        self,
        db: AsyncSession,
        replay_code: str
    ) -> Optional[dict]:
        """Look up a full replay by its code.

        The code is matched against ``GameSession.room_code``: first as an
        exact value, then (only if there is no exact match) as a substring.
        ``%`` and ``_`` inside ``replay_code`` are matched literally.

        Args:
            db: Database session.
            replay_code: Replay code to search for.

        Returns:
            The replay dictionary (see ``format_replay_for_frontend``), or
            ``None`` when the code is empty, matches no session, or matches
            more than one session (ambiguous lookup).
        """
        # An empty code would substring-match every session.
        if not replay_code:
            return None

        # Prefer an exact match. LIMIT 2 is enough to detect ambiguity
        # without loading every matching row.
        result = await db.execute(
            select(GameSession)
            .where(GameSession.room_code == replay_code)
            .limit(2)
        )
        matches = result.scalars().all()

        if not matches:
            # Fall back to a substring match; autoescape keeps LIKE
            # wildcards in the user-supplied code from acting as patterns.
            result = await db.execute(
                select(GameSession)
                .where(
                    GameSession.room_code.contains(
                        replay_code, autoescape=True
                    )
                )
                .limit(2)
            )
            matches = result.scalars().all()

        # Zero hits -> not found; several hits -> ambiguous. Either way
        # there is no single replay to return.
        if len(matches) != 1:
            return None

        return await self._load_replay(db, matches[0])

    async def get_replay_by_session(
        self,
        db: AsyncSession,
        session_id: int
    ) -> Optional[dict]:
        """Look up a full replay by game session id.

        Args:
            db: Database session.
            session_id: ID of the game session.

        Returns:
            The replay dictionary (see ``format_replay_for_frontend``), or
            ``None`` if no session with that id exists.
        """
        result = await db.execute(
            select(GameSession).where(GameSession.id == session_id)
        )
        session = result.scalar_one_or_none()
        if not session:
            return None

        return await self._load_replay(db, session)

    async def _load_replay(
        self,
        db: AsyncSession,
        session: GameSession
    ) -> dict:
        """Fetch the events of an already-loaded session and format them."""
        events_result = await db.execute(
            select(GameEvent)
            .where(GameEvent.session_id == session.id)
            # id breaks ties between events sharing a timestamp so the
            # replay order is deterministic.
            .order_by(GameEvent.timestamp.asc(), GameEvent.id.asc())
        )
        events = events_result.scalars().all()
        return self.format_replay_for_frontend(session, list(events))

    def format_replay_for_frontend(
        self,
        session: GameSession,
        events: List[GameEvent]
    ) -> dict:
        """Build the replay payload consumed by the frontend.

        Args:
            session: Game session the replay belongs to.
            events: Events of that session, in the order they should be
                replayed (the order is preserved as given).

        Returns:
            A dictionary with the keys ``metadata``, ``final_scores``,
            ``winner``, ``events`` and ``event_count``. ``winner`` is
            ``"A"``, ``"B"`` or ``"tie"`` when the session status is
            ``"finished"`` and ``None`` otherwise. ``duration_seconds`` is
            ``None`` unless both ``created_at`` and ``finished_at`` are set.
        """
        formatted_events = [
            {
                "id": event.id,
                "event_type": event.event_type,
                "player_name": event.player_name,
                "team": event.team,
                "question_id": event.question_id,
                "answer_given": event.answer_given,
                "was_correct": event.was_correct,
                "was_steal": event.was_steal,
                "points_earned": event.points_earned,
                "timestamp": (
                    event.timestamp.isoformat() if event.timestamp else None
                ),
            }
            for event in events
        ]

        # A winner is only reported once the game has finished.
        winner = None
        if session.status == "finished":
            if session.team_a_score > session.team_b_score:
                winner = "A"
            elif session.team_b_score > session.team_a_score:
                winner = "B"
            else:
                winner = "tie"

        # Duration is only known when both endpoints were recorded.
        duration_seconds = None
        if session.finished_at and session.created_at:
            duration = session.finished_at - session.created_at
            duration_seconds = int(duration.total_seconds())

        return {
            "metadata": {
                "session_id": session.id,
                "room_code": session.room_code,
                "status": session.status,
                "created_at": (
                    session.created_at.isoformat()
                    if session.created_at else None
                ),
                "finished_at": (
                    session.finished_at.isoformat()
                    if session.finished_at else None
                ),
                "duration_seconds": duration_seconds
            },
            "final_scores": {
                "team_a": session.team_a_score,
                "team_b": session.team_b_score
            },
            "winner": winner,
            "events": formatted_events,
            "event_count": len(formatted_events)
        }
# Module-level ReplayManager instance created at import time.
replay_manager = ReplayManager()