import csv
import json
from datetime import datetime, timedelta, timezone
from io import StringIO
from typing import List, Optional

from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status
from fastapi.responses import StreamingResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.base import get_db
from app.models.admin import Admin
from app.models.question import Question
from app.models.category import Category
from app.schemas.admin import AdminCreate, Token, TokenData
from app.schemas.question import (
    QuestionCreate,
    QuestionUpdate,
    QuestionResponse,
    AIGenerateRequest,
)
from app.services.ai_generator import ai_generator
from app.services.room_manager import room_manager
from app.services.game_config import get_game_settings, update_game_settings
from app.schemas.game_config import GameSettingsSchema
from app.config import get_settings

router = APIRouter()
settings = get_settings()

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/admin/login")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return True when *plain_password* matches the stored bcrypt hash."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Hash *password* with the module's password context (bcrypt)."""
    return pwd_context.hash(password)


def create_access_token(data: dict) -> str:
    """Encode *data* as a signed JWT with an ``exp`` claim.

    The expiry is ``settings.jwt_expire_minutes`` from now. The input dict is
    copied, so the caller's mapping is not mutated.
    """
    to_encode = data.copy()
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=settings.jwt_expire_minutes
    )
    to_encode.update({"exp": expire})
    return jwt.encode(
        to_encode, settings.jwt_secret, algorithm=settings.jwt_algorithm
    )


async def get_current_admin(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db)
) -> Admin:
    """Resolve the bearer token to an ``Admin`` row.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names a username with no matching admin.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(
            token, settings.jwt_secret, algorithms=[settings.jwt_algorithm]
        )
    except JWTError:
        raise credentials_exception

    username: Optional[str] = payload.get("sub")
    if username is None:
        raise credentials_exception

    result = await db.execute(select(Admin).where(Admin.username == username))
    admin = result.scalar_one_or_none()
    if admin is None:
        raise credentials_exception
    return admin


async def _get_question_or_404(db: AsyncSession, question_id: int) -> Question:
    """Load the question with *question_id* or raise a 404 HTTPException."""
    result = await db.execute(select(Question).where(Question.id == question_id))
    question = result.scalar_one_or_none()
    if not question:
        raise HTTPException(status_code=404, detail="Question not found")
    return question


@router.post("/login", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db)
):
    """Exchange a username/password form for a bearer access token.

    Raises:
        HTTPException: 401 when the username is unknown or the password
            does not verify against the stored hash.
    """
    result = await db.execute(
        select(Admin).where(Admin.username == form_data.username)
    )
    admin = result.scalar_one_or_none()

    if not admin or not verify_password(form_data.password, admin.password_hash):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token = create_access_token(data={"sub": admin.username})
    return {"access_token": access_token, "token_type": "bearer"}


@router.post("/register", response_model=Token)
async def register_admin(
    admin_data: AdminCreate,
    db: AsyncSession = Depends(get_db)
):
    """Create a new admin account and return an access token for it.

    NOTE(review): this route declares no authentication dependency, so any
    caller can create an admin account - confirm this is intentional.

    Raises:
        HTTPException: 400 when the username is already taken.
    """
    # Check if admin exists
    result = await db.execute(
        select(Admin).where(Admin.username == admin_data.username)
    )
    if result.scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already registered"
        )

    # Create admin
    admin = Admin(
        username=admin_data.username,
        password_hash=get_password_hash(admin_data.password)
    )
    db.add(admin)
    await db.commit()

    # Use the request value rather than admin.username: after commit the ORM
    # instance may be expired, and an implicit reload is not possible from
    # synchronous attribute access on an AsyncSession.
    access_token = create_access_token(data={"sub": admin_data.username})
    return {"access_token": access_token, "token_type": "bearer"}


# Question Management
@router.get("/questions", response_model=List[QuestionResponse])
async def get_questions(
    category_id: Optional[int] = None,
    status: Optional[str] = None,
    db: AsyncSession = Depends(get_db),
    admin: Admin = Depends(get_current_admin)
):
    """List questions, newest first, optionally filtered.

    Falsy filter values (``None``, ``0``, ``""``) are ignored.
    """
    query = select(Question)
    if category_id:
        query = query.where(Question.category_id == category_id)
    if status:
        query = query.where(Question.status == status)

    result = await db.execute(query.order_by(Question.created_at.desc()))
    return result.scalars().all()


@router.post("/questions", response_model=QuestionResponse)
async def create_question(
    question_data: QuestionCreate,
    db: AsyncSession = Depends(get_db),
    admin: Admin = Depends(get_current_admin)
):
    """Create a question; points and time come from the difficulty defaults.

    Unknown difficulties fall back to 300 points and 25 seconds.
    """
    difficulty = question_data.difficulty
    question = Question(
        **question_data.model_dump(),
        points=settings.default_points.get(difficulty, 300),
        time_seconds=settings.default_times.get(difficulty, 25)
    )
    db.add(question)
    await db.commit()
    await db.refresh(question)
    return question


@router.put("/questions/{question_id}", response_model=QuestionResponse)
async def update_question(
    question_id: int,
    question_data: QuestionUpdate,
    db: AsyncSession = Depends(get_db),
    admin: Admin = Depends(get_current_admin)
):
    """Apply the fields explicitly set in the payload to an existing question.

    Raises:
        HTTPException: 404 when the question does not exist.
    """
    question = await _get_question_or_404(db, question_id)

    # exclude_unset: only touch attributes the client actually sent.
    for key, value in question_data.model_dump(exclude_unset=True).items():
        setattr(question, key, value)

    await db.commit()
    await db.refresh(question)
    return question


@router.delete("/questions/{question_id}")
async def delete_question(
    question_id: int,
    db: AsyncSession = Depends(get_db),
    admin: Admin = Depends(get_current_admin)
):
    """Delete a question.

    Raises:
        HTTPException: 404 when the question does not exist.
    """
    question = await _get_question_or_404(db, question_id)
    await db.delete(question)
    await db.commit()
    return {"status": "deleted"}


@router.post("/questions/generate")
async def generate_questions(
    request: AIGenerateRequest,
    db: AsyncSession = Depends(get_db),
    admin: Admin = Depends(get_current_admin)
):
    """Generate questions with the AI service and store them as ``pending``.

    Returns:
        ``{"generated": <count>, "questions": [<new question ids>]}``

    Raises:
        HTTPException: 404 when the requested category does not exist.
    """
    # Get category name
    result = await db.execute(
        select(Category).where(Category.id == request.category_id)
    )
    category = result.scalar_one_or_none()
    if not category:
        raise HTTPException(status_code=404, detail="Category not found")

    # Generate questions with AI
    generated = await ai_generator.generate_questions(
        category_name=category.name,
        difficulty=request.difficulty,
        count=request.count
    )

    # Save to database as pending
    questions = [
        Question(
            category_id=request.category_id,
            question_text=q_data["question"],
            correct_answer=q_data["correct_answer"],
            alt_answers=q_data.get("alt_answers", []),
            difficulty=q_data["difficulty"],
            points=q_data["points"],
            time_seconds=q_data["time_seconds"],
            fun_fact=q_data.get("fun_fact"),
            status="pending"
        )
        for q_data in generated
    ]
    db.add_all(questions)

    # Flush first so primary keys are assigned, and read them BEFORE commit:
    # after commit the instances may be expired, and touching q.id would then
    # need an implicit reload that an AsyncSession cannot perform.
    await db.flush()
    question_ids = [q.id for q in questions]
    await db.commit()

    return {
        "generated": len(questions),
        "questions": question_ids
    }


@router.post("/questions/{question_id}/approve")
async def approve_question(
    question_id: int,
    db: AsyncSession = Depends(get_db),
    admin: Admin = Depends(get_current_admin)
):
    """Set a question's status to ``approved``.

    Raises:
        HTTPException: 404 when the question does not exist.
    """
    question = await _get_question_or_404(db, question_id)
    question.status = "approved"
    await db.commit()
    return {"status": "approved"}


@router.post("/questions/{question_id}/reject")
async def reject_question(
    question_id: int,
    db: AsyncSession = Depends(get_db),
    admin: Admin = Depends(get_current_admin)
):
    """Reject a question by deleting it.

    Raises:
        HTTPException: 404 when the question does not exist.
    """
    question = await _get_question_or_404(db, question_id)
    await db.delete(question)
    await db.commit()
    return {"status": "rejected"}


# Categories
@router.get("/categories")
async def get_categories(
    db: AsyncSession = Depends(get_db),
    admin: Admin = Depends(get_current_admin)
):
    """Return every category."""
    result = await db.execute(select(Category))
    return result.scalars().all()


@router.post("/categories")
async def create_category(
    name: str,
    icon: Optional[str] = None,
    color: Optional[str] = None,
    db: AsyncSession = Depends(get_db),
    admin: Admin = Depends(get_current_admin)
):
    """Create a category from the given name, icon and color."""
    category = Category(name=name, icon=icon, color=color)
    db.add(category)
    await db.commit()
    await db.refresh(category)
    return category


# CSV Import/Export
@router.get("/questions/export")
async def export_questions(
    category_id: Optional[int] = None,
    status: Optional[str] = None,
    db: AsyncSession = Depends(get_db),
    admin: Admin = Depends(get_current_admin)
):
    """
    Export questions to CSV format.

    Query params: category_id (optional), status (optional)
    Returns CSV file as download.

    Questions are inner-joined to their category, so a question whose
    category row is missing is not exported.
    """
    # Build query with filters
    query = select(Question, Category.name.label("category_name")).join(
        Category, Question.category_id == Category.id
    )
    if category_id:
        query = query.where(Question.category_id == category_id)
    if status:
        query = query.where(Question.status == status)

    result = await db.execute(query.order_by(Question.created_at.desc()))

    # Create CSV in memory
    output = StringIO()
    writer = csv.writer(output)
    writer.writerow([
        "category",
        "question",
        "correct_answer",
        "alt_answers",
        "difficulty",
        "fun_fact",
        "status",
        "date_active"
    ])

    # NOTE(review): cell values are written verbatim; text starting with
    # '=', '+', '-' or '@' may be evaluated as a formula by spreadsheet
    # applications. Left unchanged so exported files re-import unmodified.
    for question, category_name in result.all():
        alt_answers = question.alt_answers
        date_active = question.date_active
        writer.writerow([
            category_name,
            question.question_text,
            question.correct_answer,
            # alt_answers are stored as a list; flatten with a pipe separator
            "|".join(alt_answers) if alt_answers else "",
            question.difficulty,
            question.fun_fact or "",
            question.status,
            date_active.isoformat() if date_active else ""
        ])

    # Generate filename with a UTC timestamp
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    filename = f"questions_export_{timestamp}.csv"

    # media_type already produces the Content-Type header, so it is not
    # repeated in `headers`.
    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv; charset=utf-8",
        headers={
            "Content-Disposition": f'attachment; filename="{filename}"'
        }
    )
def _cell(row: dict, column: str) -> str:
    """Return the stripped text of *column* in a CSV row.

    ``csv.DictReader`` fills cells missing from a short row with ``None``;
    those (and absent columns) are treated as empty strings.
    """
    return (row.get(column) or "").strip()


def _parse_import_row(row: dict, categories: dict) -> dict:
    """Validate one CSV row and return keyword arguments for ``Question``.

    Args:
        row: Mapping of column name to cell text, as produced by DictReader.
        categories: Lower-cased category name -> category id.

    Raises:
        ValueError: with a user-facing message when the row is invalid.
    """
    category_name = _cell(row, "category")
    if not category_name:
        raise ValueError("Category is required")

    category_id = categories.get(category_name.lower())
    if category_id is None:
        raise ValueError(f"Category '{category_name}' not found")

    question_text = _cell(row, "question")
    if not question_text:
        raise ValueError("Question text is required")

    correct_answer = _cell(row, "correct_answer")
    if not correct_answer:
        raise ValueError("Correct answer is required")

    try:
        difficulty = int(_cell(row, "difficulty"))
    except ValueError:
        raise ValueError("Difficulty must be a number") from None
    if not 1 <= difficulty <= 5:
        raise ValueError("Difficulty must be between 1 and 5")

    # alt_answers are pipe separated; blank fragments are dropped
    alt_answers = [
        answer.strip()
        for answer in _cell(row, "alt_answers").split("|")
        if answer.strip()
    ]

    return {
        "category_id": category_id,
        "question_text": question_text,
        "correct_answer": correct_answer,
        "alt_answers": alt_answers,
        "difficulty": difficulty,
        # Points and time are derived from difficulty
        "points": settings.default_points.get(difficulty, 300),
        "time_seconds": settings.default_times.get(difficulty, 25),
        "fun_fact": _cell(row, "fun_fact") or None,
    }


@router.post("/questions/import")
async def import_questions(
    file: UploadFile = File(...),
    db: AsyncSession = Depends(get_db),
    admin: Admin = Depends(get_current_admin)
):
    """
    Import questions from CSV file.

    Expected columns: category, question, correct_answer, alt_answers, difficulty, fun_fact
    alt_answers should be separated by pipe |

    Valid rows are stored with status "pending"; invalid rows are reported
    and skipped without aborting the import.

    Returns: {imported: count, errors: [{row, error}]}

    Raises:
        HTTPException: 400 when the file is not named *.csv, has no header
            row, or lacks a required column.
    """
    # Validate file type (filename may be absent; extension case is ignored)
    if not (file.filename or "").lower().endswith(".csv"):
        raise HTTPException(
            status_code=400,
            detail="File must be a CSV"
        )

    # Read file content
    content = await file.read()
    try:
        # utf-8-sig strips a leading BOM (written by Excel), which would
        # otherwise become part of the first header name.
        decoded_content = content.decode("utf-8-sig")
    except UnicodeDecodeError:
        # latin-1 maps every byte to a character, so this fallback cannot fail.
        decoded_content = content.decode("latin-1")

    # Parse CSV; newline="" lets the csv module handle line endings itself
    csv_reader = csv.DictReader(StringIO(decoded_content, newline=""))

    # Validate headers
    if not csv_reader.fieldnames:
        raise HTTPException(
            status_code=400,
            detail="CSV file is empty or has no headers"
        )

    required_columns = {"category", "question", "correct_answer", "difficulty"}
    missing_columns = required_columns - set(csv_reader.fieldnames)
    if missing_columns:
        # sorted() keeps the message deterministic
        raise HTTPException(
            status_code=400,
            detail=f"Missing required columns: {', '.join(sorted(missing_columns))}"
        )

    # Get all categories for case-insensitive lookup
    categories_result = await db.execute(select(Category))
    categories = {
        cat.name.lower(): cat.id for cat in categories_result.scalars().all()
    }

    imported_count = 0
    errors = []

    for row_num, row in enumerate(csv_reader, start=2):  # Start at 2 (1 is header)
        try:
            fields = _parse_import_row(row, categories)
            db.add(Question(**fields, status="pending"))
        except Exception as e:
            # Per-row best effort: record the problem and keep importing.
            errors.append({"row": row_num, "error": str(e)})
            continue
        imported_count += 1

    # Commit all valid questions
    if imported_count > 0:
        await db.commit()

    return {
        "imported": imported_count,
        "errors": errors
    }


# Game Settings
@router.get("/settings")
async def get_settings_endpoint(
    admin: Admin = Depends(get_current_admin)
):
    """
    Get current game settings.

    Returns the result of get_game_settings() unchanged.
    """
    return get_game_settings()


@router.put("/settings")
async def update_settings_endpoint(
    settings_data: GameSettingsSchema,
    admin: Admin = Depends(get_current_admin)
):
    """
    Update game settings.

    Expects a complete settings object with all fields.
    Returns the result of update_game_settings() unchanged.
    """
    return update_game_settings(settings_data)


# Room Monitor
async def get_active_rooms_from_redis() -> List[dict]:
    """
    Helper function to scan and retrieve all active rooms from Redis.

    Iterates every key matching ``room:*`` and summarises its JSON payload.
    Keys whose value is empty, unreadable or malformed are skipped.

    Returns list of room summaries with player counts and team info.
    """
    await room_manager.connect()
    redis = room_manager.redis

    rooms = []
    cursor = 0

    # SCAN until the server hands back cursor 0 (iteration complete)
    while True:
        cursor, keys = await redis.scan(cursor, match="room:*", count=100)

        for key in keys:
            try:
                data = await redis.get(key)
                if not data:
                    continue

                room_data = json.loads(data)

                # Count players per team
                teams = room_data.get("teams", {})
                team_a_count = len(teams.get("A", []))
                team_b_count = len(teams.get("B", []))

                # Get TTL for time remaining
                ttl = await redis.ttl(key)

                rooms.append({
                    "room_code": room_data.get("code", ""),
                    "players_count": team_a_count + team_b_count,
                    "teams": {
                        "A": team_a_count,
                        "B": team_b_count
                    },
                    "status": room_data.get("status", "unknown"),
                    "host": room_data.get("host", ""),
                    # non-positive TTLs are reported as 0
                    "ttl_seconds": ttl if ttl > 0 else 0,
                    "scores": room_data.get("scores", {"A": 0, "B": 0})
                })
            except Exception:
                # Deliberately broad: skip any key that cannot be read or
                # summarised instead of failing the whole listing.
                continue

        if cursor == 0:
            break

    return rooms


@router.get("/rooms/active")
async def get_active_rooms(
    admin: Admin = Depends(get_current_admin)
):
    """
    Get list of all active game rooms from Redis.

    Returns: {rooms: [{room_code, players_count, teams: {A: count, B: count},
    status, host, ttl_seconds, scores}], total: count}
    """
    rooms = await get_active_rooms_from_redis()
    return {"rooms": rooms, "total": len(rooms)}


@router.delete("/rooms/{room_code}")
async def close_room(
    room_code: str,
    admin: Admin = Depends(get_current_admin)
):
    """
    Close a room by removing it from Redis.

    Also removes the ``player:<socket_id>`` and ``stats:<room>:<name>`` keys
    of every player listed in the room's teams.
    Socket notifications should be handled by the caller through the socket server.

    Raises:
        HTTPException: 404 when the room does not exist.
    """
    await room_manager.connect()

    # Check if room exists
    room_data = await room_manager.get_room(room_code)
    if not room_data:
        raise HTTPException(status_code=404, detail="Room not found")

    # Collect the per-player keys in a single pass over both teams
    teams = room_data.get("teams", {})
    player_sockets = []
    stats_keys = []
    for team in ["A", "B"]:
        for player in teams.get(team, []):
            socket_id = player.get("socket_id")
            if socket_id:
                player_sockets.append(socket_id)
            player_name = player.get("name")
            if player_name:
                stats_keys.append(f"stats:{room_code}:{player_name}")

    # Delete player mappings
    for socket_id in player_sockets:
        await room_manager.redis.delete(f"player:{socket_id}")

    # Delete room stats for all players
    for stats_key in stats_keys:
        await room_manager.redis.delete(stats_key)

    # Delete the room itself
    await room_manager.redis.delete(f"room:{room_code}")

    return {
        "status": "closed",
        "room_code": room_code,
        "players_affected": len(player_sockets)
    }