feat(phase2): Add achievements and replay systems

Achievement System:
- Add Achievement model with condition types (streak, steal, specialist, etc.)
- Add AchievementManager service for tracking and awarding achievements
- Add Pydantic schemas for achievements (AchievementResponse, PlayerStats, etc.)
- Seed 18 achievements from design doc
- Add GET /api/game/achievements endpoint

Replay System:
- Add ReplayManager service for saving/loading game replays
- Add GET /api/replay/{code} and /api/replay/session/{id} endpoints
- Format replays for frontend consumption

Phase 2 tasks completed:
- F2.1: Achievement model and migration
- F2.2: Pydantic schemas
- F2.3: AchievementManager service
- F2.4: ReplayManager service
- F2.5: API endpoints
- F2.6: Seed 18 achievements data

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-26 08:24:02 +00:00
parent b3fab9f8df
commit 27ac4cb0cf
12 changed files with 804 additions and 131 deletions

View File

@@ -2,5 +2,14 @@ from app.services.ai_validator import AIValidator
from app.services.ai_generator import AIGenerator
from app.services.game_manager import GameManager
from app.services.room_manager import RoomManager
from app.services.replay_manager import ReplayManager
from app.services.achievement_manager import AchievementManager
__all__ = ["AIValidator", "AIGenerator", "GameManager", "RoomManager"]
__all__ = [
"AIValidator",
"AIGenerator",
"GameManager",
"RoomManager",
"ReplayManager",
"AchievementManager",
]

View File

@@ -0,0 +1,245 @@
from typing import Optional, List
from datetime import datetime
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.achievement import Achievement
from app.schemas.achievement import PlayerStats, AchievementUnlock, AchievementResponse
class AchievementManager:
"""Servicio para detectar y otorgar logros a jugadores durante las partidas."""
async def get_all_achievements(self, db: AsyncSession) -> List[Achievement]:
"""Obtiene todos los logros disponibles de la base de datos."""
result = await db.execute(select(Achievement))
return list(result.scalars().all())
async def check_achievements(
self,
db: AsyncSession,
player_stats: PlayerStats,
game_context: dict
) -> List[AchievementUnlock]:
"""
Verifica que logros ha desbloqueado el jugador.
Args:
db: Sesion de base de datos
player_stats: Estadisticas actuales del jugador
game_context: {
'won': bool,
'team_score': int,
'opponent_score': int,
'max_deficit_overcome': int, # para comeback
'categories_swept': List[int], # IDs de categorias con 5/5
'no_mistakes': bool, # para perfect_game
}
Returns:
Lista de logros desbloqueados
"""
achievements = await self.get_all_achievements(db)
unlocked: List[AchievementUnlock] = []
now = datetime.utcnow()
for achievement in achievements:
is_unlocked = self._evaluate_condition(
achievement=achievement,
stats=player_stats,
game_context=game_context
)
if is_unlocked:
achievement_response = AchievementResponse(
id=achievement.id,
name=achievement.name,
description=achievement.description,
icon=achievement.icon,
condition_type=achievement.condition_type,
condition_value=achievement.condition_value,
category_id=achievement.category_id,
created_at=achievement.created_at
)
unlocked.append(AchievementUnlock(
player_name=player_stats.player_name,
achievement=achievement_response,
unlocked_at=now
))
return unlocked
def _evaluate_condition(
self,
achievement: Achievement,
stats: PlayerStats,
game_context: dict
) -> bool:
"""Evalua si se cumple la condicion de un logro."""
condition_type = achievement.condition_type
condition_value = achievement.condition_value
if condition_type == "first_win":
return game_context.get("won", False)
elif condition_type == "streak":
return self._check_streak(stats, condition_value)
elif condition_type == "steal_success":
return self._check_steal_success(stats, condition_value)
elif condition_type == "category_specialist":
if achievement.category_id is not None:
return self._check_category_specialist(
stats, achievement.category_id, condition_value
)
return False
elif condition_type == "perfect_game":
return (
game_context.get("won", False) and
game_context.get("no_mistakes", False)
)
elif condition_type == "fast_answer":
return self._check_fast_answer(stats, condition_value)
elif condition_type == "comeback":
return (
game_context.get("won", False) and
game_context.get("max_deficit_overcome", 0) >= condition_value
)
elif condition_type == "category_sweep":
categories_swept = game_context.get("categories_swept", [])
return len(categories_swept) >= condition_value
elif condition_type == "high_stakes":
return self._check_high_stakes(stats, condition_value)
return False
def _check_streak(self, stats: PlayerStats, required: int) -> bool:
"""Verifica racha de respuestas correctas."""
return stats.current_streak >= required
def _check_steal_success(self, stats: PlayerStats, required: int) -> bool:
"""Verifica robos exitosos."""
return stats.successful_steals >= required
def _check_category_specialist(
self,
stats: PlayerStats,
category_id: int,
required: int
) -> bool:
"""Verifica especialista de categoria."""
return stats.category_correct.get(category_id, 0) >= required
def _check_fast_answer(self, stats: PlayerStats, max_seconds: int) -> bool:
"""Verifica respuesta rapida."""
if stats.fastest_answer_seconds is None:
return False
return stats.fastest_answer_seconds <= max_seconds
def _check_high_stakes(self, stats: PlayerStats, required: int) -> bool:
"""Verifica preguntas de 500 pts correctas."""
return stats.questions_500_correct >= required
def update_stats_on_answer(
self,
stats: PlayerStats,
was_correct: bool,
was_steal: bool,
category_id: int,
points: int,
answer_time_seconds: float
) -> PlayerStats:
"""
Actualiza estadisticas despues de una respuesta.
Args:
stats: Estadisticas actuales del jugador
was_correct: Si la respuesta fue correcta
was_steal: Si fue un intento de robo
category_id: ID de la categoria de la pregunta
points: Puntos de la pregunta
answer_time_seconds: Tiempo de respuesta en segundos
Returns:
PlayerStats actualizado
"""
# Crear copia mutable de las estadisticas
updated_category_correct = dict(stats.category_correct)
if was_correct:
# Actualizar racha
new_streak = stats.current_streak + 1
new_total_correct = stats.total_correct + 1
# Actualizar correctas por categoria
updated_category_correct[category_id] = (
updated_category_correct.get(category_id, 0) + 1
)
# Actualizar tiempo mas rapido
new_fastest = stats.fastest_answer_seconds
if new_fastest is None or answer_time_seconds < new_fastest:
new_fastest = answer_time_seconds
# Actualizar preguntas de 500 pts
new_500_correct = stats.questions_500_correct
if points == 500:
new_500_correct += 1
# Actualizar robos exitosos
new_successful_steals = stats.successful_steals
new_total_steals = stats.total_steals
if was_steal:
new_successful_steals += 1
new_total_steals += 1
return PlayerStats(
player_name=stats.player_name,
current_streak=new_streak,
total_correct=new_total_correct,
total_steals=new_total_steals,
successful_steals=new_successful_steals,
category_correct=updated_category_correct,
fastest_answer_seconds=new_fastest,
questions_500_correct=new_500_correct
)
else:
# Respuesta incorrecta - resetear racha
new_total_steals = stats.total_steals
if was_steal:
new_total_steals += 1
return PlayerStats(
player_name=stats.player_name,
current_streak=0,
total_correct=stats.total_correct,
total_steals=new_total_steals,
successful_steals=stats.successful_steals,
category_correct=updated_category_correct,
fastest_answer_seconds=stats.fastest_answer_seconds,
questions_500_correct=stats.questions_500_correct
)
def create_initial_stats(self, player_name: str) -> PlayerStats:
"""Crea estadisticas iniciales para un jugador."""
return PlayerStats(
player_name=player_name,
current_streak=0,
total_correct=0,
total_steals=0,
successful_steals=0,
category_correct={},
fastest_answer_seconds=None,
questions_500_correct=0
)
# Singleton instance
achievement_manager = AchievementManager()

View File

@@ -0,0 +1,219 @@
import hashlib
from typing import Optional, List
from datetime import datetime
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.game_session import GameSession
from app.models.game_event import GameEvent
class ReplayManager:
def generate_replay_code(self, session_id: int) -> str:
"""
Genera codigo unico de replay basado en session_id.
Args:
session_id: ID de la sesion de juego
Returns:
Codigo de replay de 8 caracteres
"""
# Usar hash corto del session_id + timestamp actual
timestamp = datetime.utcnow().isoformat()
data = f"{session_id}:{timestamp}"
hash_digest = hashlib.sha256(data.encode()).hexdigest()
# Retornar los primeros 8 caracteres en mayusculas
return hash_digest[:8].upper()
async def save_game_event(
self,
db: AsyncSession,
session_id: int,
event_type: str,
player_name: str,
team: str,
question_id: Optional[int] = None,
answer_given: Optional[str] = None,
was_correct: Optional[bool] = None,
was_steal: bool = False,
points_earned: int = 0
) -> GameEvent:
"""
Guarda un evento de juego para replay.
Args:
db: Sesion de base de datos
session_id: ID de la sesion de juego
event_type: Tipo de evento (question_selected, answer_submitted,
steal_attempted, steal_passed, game_finished)
player_name: Nombre del jugador
team: Equipo ('A' o 'B')
question_id: ID de la pregunta (opcional)
answer_given: Respuesta dada por el jugador (opcional)
was_correct: Si la respuesta fue correcta (opcional)
was_steal: Si fue un intento de robo
points_earned: Puntos ganados
Returns:
GameEvent creado
"""
event = GameEvent(
session_id=session_id,
event_type=event_type,
player_name=player_name,
team=team,
question_id=question_id,
answer_given=answer_given,
was_correct=was_correct,
was_steal=was_steal,
points_earned=points_earned
)
db.add(event)
await db.commit()
await db.refresh(event)
return event
async def get_replay(
self,
db: AsyncSession,
replay_code: str
) -> Optional[dict]:
"""
Obtiene replay completo por codigo.
Args:
db: Sesion de base de datos
replay_code: Codigo del replay
Returns:
Diccionario con datos del replay o None si no existe
"""
# Buscar session por room_code que contiene el replay_code
# El replay_code puede estar almacenado en room_code o como sufijo
result = await db.execute(
select(GameSession).where(
GameSession.room_code.contains(replay_code)
)
)
session = result.scalar_one_or_none()
if not session:
# Intentar busqueda exacta
result = await db.execute(
select(GameSession).where(
GameSession.room_code == replay_code
)
)
session = result.scalar_one_or_none()
if not session:
return None
return await self.get_replay_by_session(db, session.id)
async def get_replay_by_session(
self,
db: AsyncSession,
session_id: int
) -> Optional[dict]:
"""
Obtiene replay por session_id.
Args:
db: Sesion de base de datos
session_id: ID de la sesion
Returns:
Diccionario con datos del replay o None si no existe
"""
# Obtener la sesion
result = await db.execute(
select(GameSession).where(GameSession.id == session_id)
)
session = result.scalar_one_or_none()
if not session:
return None
# Obtener todos los eventos ordenados por timestamp
events_result = await db.execute(
select(GameEvent)
.where(GameEvent.session_id == session_id)
.order_by(GameEvent.timestamp.asc())
)
events = events_result.scalars().all()
return self.format_replay_for_frontend(session, list(events))
def format_replay_for_frontend(
self,
session: GameSession,
events: List[GameEvent]
) -> dict:
"""
Formatea replay para consumo del frontend.
Args:
session: Sesion de juego
events: Lista de eventos ordenados cronologicamente
Returns:
Diccionario formateado para el frontend
"""
# Formatear eventos para el frontend
formatted_events = []
for event in events:
formatted_event = {
"id": event.id,
"event_type": event.event_type,
"player_name": event.player_name,
"team": event.team,
"question_id": event.question_id,
"answer_given": event.answer_given,
"was_correct": event.was_correct,
"was_steal": event.was_steal,
"points_earned": event.points_earned,
"timestamp": event.timestamp.isoformat() if event.timestamp else None
}
formatted_events.append(formatted_event)
# Determinar ganador
winner = None
if session.status == "finished":
if session.team_a_score > session.team_b_score:
winner = "A"
elif session.team_b_score > session.team_a_score:
winner = "B"
else:
winner = "tie"
# Calcular duracion si la partida termino
duration_seconds = None
if session.finished_at and session.created_at:
duration = session.finished_at - session.created_at
duration_seconds = int(duration.total_seconds())
return {
"metadata": {
"session_id": session.id,
"room_code": session.room_code,
"status": session.status,
"created_at": session.created_at.isoformat() if session.created_at else None,
"finished_at": session.finished_at.isoformat() if session.finished_at else None,
"duration_seconds": duration_seconds
},
"final_scores": {
"team_a": session.team_a_score,
"team_b": session.team_b_score
},
"winner": winner,
"events": formatted_events,
"event_count": len(formatted_events)
}
# Singleton instance
replay_manager = ReplayManager()