"""Redis-backed storage for game rooms, players and per-player stats."""

import asyncio
import json
import logging
import random
import secrets
import string
from typing import Any, Awaitable, Callable, Optional

import redis.asyncio as redis

from app.config import get_settings

logger = logging.getLogger(__name__)

settings = get_settings()

# Lock timeout in seconds
LOCK_TIMEOUT = 5
LOCK_RETRY_DELAY = 0.05  # 50ms

# TTL applied to every room, player and stats key (3 hours).
ROOM_TTL_SECONDS = 3600 * 3
TEAM_NAMES = ("A", "B")
MAX_TEAM_SIZE = 4

# Deletes the lock only when it still holds the caller's token, so a holder
# whose lock already expired cannot remove a lock taken by someone else.
_RELEASE_LOCK_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
end
return 0
"""


class RoomLockError(Exception):
    """Raised when a room lock cannot be acquired within the retry budget."""


class RoomManager:
    """Store and mutate game-room state in Redis.

    Keys written by this class:
      * ``room:{code}``            - JSON room state
      * ``player:{socket_id}``     - JSON ``{"name", "room", "team"}`` mapping
      * ``stats:{code}:{name}``    - JSON per-player stats
      * ``lock:room:{code}``       - short-lived lock guarding room mutations
    """

    def __init__(self):
        self.redis: Optional[redis.Redis] = None

    async def connect(self):
        """Create the Redis client on first use; later calls are no-ops."""
        if not self.redis:
            self.redis = await redis.from_url(settings.redis_url)

    async def disconnect(self):
        """Close the Redis client, if any, and forget it.

        The reference is cleared so that a later ``connect()`` builds a fresh
        client instead of reusing the closed one.
        """
        if self.redis:
            client, self.redis = self.redis, None
            await client.close()

    # ------------------------------------------------------------------
    # Locking
    # ------------------------------------------------------------------

    async def _acquire_lock(
        self,
        room_code: str,
        timeout: float = LOCK_TIMEOUT,
        token: str = "1",
    ) -> bool:
        """Try once to acquire the lock for ``room_code``.

        Args:
            room_code: Room whose lock is requested.
            timeout: Lock lifetime in seconds; fractional values are honoured.
            token: Value stored in the lock key. Pass a unique value and hand
                the same value to ``_release_lock`` for a safe release.

        Returns:
            True if the lock was acquired, False if it is already held.
        """
        await self.connect()
        # PX (milliseconds) instead of EX: int(timeout) would turn a
        # sub-second timeout into 0, which Redis rejects as an expire time.
        ttl_ms = max(1, int(timeout * 1000))
        acquired = await self.redis.set(
            f"lock:room:{room_code}", token, nx=True, px=ttl_ms
        )
        return bool(acquired)

    async def _release_lock(self, room_code: str, token: Optional[str] = None):
        """Release the lock for ``room_code``.

        Args:
            room_code: Room whose lock is released.
            token: Value passed to ``_acquire_lock``. When given, the key is
                deleted only if it still holds this value. When omitted, the
                key is deleted unconditionally.
        """
        await self.connect()
        lock_key = f"lock:room:{room_code}"
        if token is None:
            await self.redis.delete(lock_key)
        else:
            await self.redis.eval(_RELEASE_LOCK_SCRIPT, 1, lock_key, token)

    async def _with_lock(
        self,
        room_code: str,
        operation: Callable[[], Awaitable[Any]],
        max_retries: int = 20,
    ):
        """Run ``operation`` while holding the lock for ``room_code``.

        Args:
            room_code: Room to lock.
            operation: Zero-argument coroutine function to await under the lock.
            max_retries: Number of acquisition attempts, spaced by
                ``LOCK_RETRY_DELAY`` seconds.

        Returns:
            Whatever ``operation`` returns.

        Raises:
            RoomLockError: If the lock is still held after ``max_retries``
                attempts. It subclasses ``Exception``.
        """
        token = secrets.token_hex(16)
        for attempt in range(max_retries):
            if await self._acquire_lock(room_code, token=token):
                try:
                    return await operation()
                finally:
                    await self._release_lock(room_code, token)
            # No point sleeping after the final failed attempt.
            if attempt < max_retries - 1:
                await asyncio.sleep(LOCK_RETRY_DELAY)
        raise RoomLockError(f"Could not acquire lock for room {room_code}")

    async def _locked_or_none(
        self,
        room_code: str,
        operation: Callable[[], Awaitable[Any]],
        action: str,
    ):
        """Run ``operation`` under the room lock; log and return None on error."""
        try:
            return await self._with_lock(room_code, operation)
        except Exception as exc:  # boundary: callers expect None, not a raise
            logger.exception("Error %s: %s", action, exc)
            return None

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------

    def _generate_room_code(self) -> str:
        """Generate a 6-character room code of uppercase letters and digits."""
        return ''.join(random.choices(string.ascii_uppercase + string.digits, k=6))

    @staticmethod
    def _new_room_state(room_code: str, host: str) -> dict:
        """Build the initial state for a room with no players yet."""
        return {
            "code": room_code,
            "status": "waiting",
            "host": host,
            "teams": {"A": [], "B": []},
            "current_team": None,
            "current_player_index": {"A": 0, "B": 0},
            "current_question": None,
            "can_steal": False,
            "scores": {"A": 0, "B": 0},
            "questions_used": [],
            "board": {},
        }

    @staticmethod
    def _reindex_positions(room: dict) -> None:
        """Renumber every player's ``position`` to its index within the team."""
        for team_name in TEAM_NAMES:
            for index, member in enumerate(room["teams"][team_name]):
                member["position"] = index

    async def _save_player_mapping(self, socket_id: str, payload: dict) -> None:
        """Write the ``player:{socket_id}`` record with the standard TTL."""
        await self.redis.setex(
            f"player:{socket_id}", ROOM_TTL_SECONDS, json.dumps(payload)
        )

    # ------------------------------------------------------------------
    # Rooms
    # ------------------------------------------------------------------

    async def create_room(self, player_name: str, socket_id: str) -> Optional[dict]:
        """Create a new game room and add its host to team A.

        Args:
            player_name: Name of the host; stored as the room's ``host``.
            socket_id: Socket ID of the host.

        Returns:
            The room state including the host, or None if the host could not
            be added (in which case the new room is removed again).
        """
        await self.connect()

        # Reserve the code atomically with SET NX so two concurrent creators
        # can never end up sharing (and overwriting) the same room key.
        while True:
            room_code = self._generate_room_code()
            reserved = await self.redis.set(
                f"room:{room_code}",
                json.dumps(self._new_room_state(room_code, player_name)),
                nx=True,
                ex=ROOM_TTL_SECONDS,
            )
            if reserved:
                break

        room = await self.add_player(room_code, player_name, "A", socket_id)
        if room is None:
            # Do not leave an empty room behind when the host could not join.
            await self.redis.delete(f"room:{room_code}")
        return room

    async def get_room(self, room_code: str) -> Optional[dict]:
        """Return the room state for ``room_code``, or None if it is not stored."""
        await self.connect()
        data = await self.redis.get(f"room:{room_code}")
        if data:
            return json.loads(data)
        return None

    async def update_room(self, room_code: str, room_state: dict) -> bool:
        """Overwrite the stored room state and reset its TTL. Returns True."""
        await self.connect()
        await self.redis.setex(
            f"room:{room_code}", ROOM_TTL_SECONDS, json.dumps(room_state)
        )
        return True

    # ------------------------------------------------------------------
    # Players
    # ------------------------------------------------------------------

    async def add_player(
        self,
        room_code: str,
        player_name: str,
        team: str,
        socket_id: str
    ) -> Optional[dict]:
        """Add a player to a room (with lock to prevent race conditions).

        Returns:
            The updated room state, or None when the room does not exist, the
            team is full, the name is already taken (case-insensitive), or an
            error occurred (including failure to obtain the room lock).
        """
        await self.connect()

        async def _do_add_player():
            room = await self.get_room(room_code)
            if not room:
                return None

            roster = room["teams"][team]
            if len(roster) >= MAX_TEAM_SIZE:
                return None

            wanted = player_name.lower()
            name_taken = any(
                member["name"].lower() == wanted
                for team_name in TEAM_NAMES
                for member in room["teams"][team_name]
            )
            if name_taken:
                return None

            roster.append({
                "name": player_name,
                "team": team,
                "position": len(roster),
                "socket_id": socket_id,
            })

            await self._save_player_mapping(
                socket_id,
                {"name": player_name, "room": room_code, "team": team},
            )
            await self.update_room(room_code, room)
            return room

        return await self._locked_or_none(room_code, _do_add_player, "adding player")

    async def remove_player(self, socket_id: str) -> Optional[dict]:
        """Remove a player from their room (with lock).

        Returns:
            The updated room state, or None when the socket has no player
            record, the room no longer exists, the room became empty and was
            deleted, or an error occurred.
        """
        await self.connect()

        # The mapping is read outside the lock: it tells us which room to lock.
        player_data = await self.redis.get(f"player:{socket_id}")
        if not player_data:
            return None

        room_code = json.loads(player_data)["room"]

        async def _do_remove_player():
            room = await self.get_room(room_code)

            # The mapping is useless once the player leaves, whether or not
            # the room itself is still around.
            await self.redis.delete(f"player:{socket_id}")
            if not room:
                return None

            # Filter both teams in case the stored state is inconsistent.
            for team_name in TEAM_NAMES:
                room["teams"][team_name] = [
                    member
                    for member in room["teams"][team_name]
                    if member["socket_id"] != socket_id
                ]
            self._reindex_positions(room)

            if not any(room["teams"][team_name] for team_name in TEAM_NAMES):
                await self.redis.delete(f"room:{room_code}")
                return None

            await self.update_room(room_code, room)
            return room

        return await self._locked_or_none(
            room_code, _do_remove_player, "removing player"
        )

    async def change_player_team(
        self,
        room_code: str,
        player_name: str,
        socket_id: str,
        new_team: str
    ) -> Optional[dict]:
        """Change a player's team (with lock).

        Returns:
            The room state (unchanged if the player is already on
            ``new_team``), or None when the room does not exist, the target
            team is full, or an error occurred.
        """
        await self.connect()

        async def _do_change_team():
            room = await self.get_room(room_code)
            if not room:
                return None

            teams = room["teams"]
            current_team = next(
                (
                    team_name
                    for team_name in TEAM_NAMES
                    if any(member["name"] == player_name for member in teams[team_name])
                ),
                None,
            )

            if current_team == new_team:
                return room

            if len(teams[new_team]) >= MAX_TEAM_SIZE:
                return None

            # Strip the player from both teams so no duplicate entry survives.
            for team_name in TEAM_NAMES:
                teams[team_name] = [
                    member
                    for member in teams[team_name]
                    if member["name"] != player_name
                ]

            teams[new_team].append({
                "name": player_name,
                "team": new_team,
                "position": len(teams[new_team]),
                "socket_id": socket_id,
            })
            self._reindex_positions(room)

            await self._save_player_mapping(
                socket_id,
                {"name": player_name, "room": room_code, "team": new_team},
            )
            await self.update_room(room_code, room)
            return room

        return await self._locked_or_none(room_code, _do_change_team, "changing team")

    async def get_player(self, socket_id: str) -> Optional[dict]:
        """Get player info by socket ID, or None if there is no record."""
        await self.connect()
        data = await self.redis.get(f"player:{socket_id}")
        if data:
            return json.loads(data)
        return None

    async def update_player(self, socket_id: str, updates: dict) -> Optional[dict]:
        """Merge ``updates`` into the stored player record.

        Returns:
            The merged record, or None if the socket has no player record.
        """
        await self.connect()
        data = await self.redis.get(f"player:{socket_id}")
        if not data:
            return None

        player = json.loads(data)
        player.update(updates)
        await self._save_player_mapping(socket_id, player)
        return player

    # ------------------------------------------------------------------
    # Stats
    # ------------------------------------------------------------------

    async def get_player_stats(self, room_code: str, player_name: str) -> Optional[dict]:
        """Get a player's stats, or None if none are stored."""
        await self.connect()
        data = await self.redis.get(f"stats:{room_code}:{player_name}")
        if data:
            return json.loads(data)
        return None

    async def set_player_stats(self, room_code: str, player_name: str, stats: dict) -> None:
        """Save a player's stats with the standard TTL."""
        await self.connect()
        await self.redis.setex(
            f"stats:{room_code}:{player_name}",
            ROOM_TTL_SECONDS,
            json.dumps(stats),
        )

    async def init_player_stats(self, room_code: str, player_name: str) -> dict:
        """Initialize, store and return zeroed stats for a new player."""
        stats = {
            "player_name": player_name,
            "current_streak": 0,
            "total_correct": 0,
            "total_steals": 0,
            "successful_steals": 0,
            "category_correct": {},
            "fastest_answer_seconds": None,
            "questions_500_correct": 0,
        }
        await self.set_player_stats(room_code, player_name, stats)
        return stats


# Singleton instance
room_manager = RoomManager()