Files
Trivy/backend/app/api/admin.py
consultoria-as 720432702f feat(phase6): Add sounds, team chat, reactions, monitor, settings, and CSV import/export
Sound System:
- Add soundStore with volume/mute persistence
- Add useSound hook with Web Audio API fallback
- Add SoundControl component for in-game volume adjustment
- Play sounds for correct/incorrect, steal, timer, victory/defeat

Team Chat:
- Add TeamChat component with collapsible panel
- Add team_message WebSocket event (team-only visibility)
- Store up to 50 messages per session

Emoji Reactions:
- Add EmojiReactions bar with 8 emojis
- Add ReactionOverlay with floating animations (Framer Motion)
- Add rate limiting (1 reaction per 3 seconds)
- Broadcast reactions to all players in room

Admin Monitor:
- Add Monitor page showing active rooms from Redis
- Display player counts, team composition, status
- Add ability to close problematic rooms

Admin Settings:
- Add Settings page for game configuration
- Configure points/times by difficulty, steal penalty, max players
- Store config in JSON file with service helpers

CSV Import/Export:
- Add export endpoint with optional filters
- Add import endpoint with validation and error reporting
- Add UI buttons and import result modal in Questions page

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-26 08:58:33 +00:00

649 lines
20 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from typing import List
import csv
import json
from io import StringIO
from app.models.base import get_db
from app.models.admin import Admin
from app.models.question import Question
from app.models.category import Category
from app.schemas.admin import AdminCreate, Token, TokenData
from app.schemas.question import (
QuestionCreate, QuestionUpdate, QuestionResponse,
AIGenerateRequest
)
from app.services.ai_generator import ai_generator
from app.services.room_manager import room_manager
from app.services.game_config import get_game_settings, update_game_settings
from app.schemas.game_config import GameSettingsSchema
from app.config import get_settings
router = APIRouter()
settings = get_settings()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/admin/login")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=settings.jwt_expire_minutes)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, settings.jwt_secret, algorithm=settings.jwt_algorithm)
async def get_current_admin(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db)
) -> Admin:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(
token, settings.jwt_secret, algorithms=[settings.jwt_algorithm]
)
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
result = await db.execute(select(Admin).where(Admin.username == username))
admin = result.scalar_one_or_none()
if admin is None:
raise credentials_exception
return admin
@router.post("/login", response_model=Token)
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db)
):
result = await db.execute(
select(Admin).where(Admin.username == form_data.username)
)
admin = result.scalar_one_or_none()
if not admin or not verify_password(form_data.password, admin.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(data={"sub": admin.username})
return {"access_token": access_token, "token_type": "bearer"}
@router.post("/register", response_model=Token)
async def register_admin(
admin_data: AdminCreate,
db: AsyncSession = Depends(get_db)
):
# Check if admin exists
result = await db.execute(
select(Admin).where(Admin.username == admin_data.username)
)
if result.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered"
)
# Create admin
admin = Admin(
username=admin_data.username,
password_hash=get_password_hash(admin_data.password)
)
db.add(admin)
await db.commit()
access_token = create_access_token(data={"sub": admin.username})
return {"access_token": access_token, "token_type": "bearer"}
# Question Management
@router.get("/questions", response_model=List[QuestionResponse])
async def get_questions(
category_id: int = None,
status: str = None,
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
query = select(Question)
if category_id:
query = query.where(Question.category_id == category_id)
if status:
query = query.where(Question.status == status)
result = await db.execute(query.order_by(Question.created_at.desc()))
return result.scalars().all()
@router.post("/questions", response_model=QuestionResponse)
async def create_question(
question_data: QuestionCreate,
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
question = Question(
**question_data.model_dump(),
points=settings.default_points.get(question_data.difficulty, 300),
time_seconds=settings.default_times.get(question_data.difficulty, 25)
)
db.add(question)
await db.commit()
await db.refresh(question)
return question
@router.put("/questions/{question_id}", response_model=QuestionResponse)
async def update_question(
question_id: int,
question_data: QuestionUpdate,
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
result = await db.execute(select(Question).where(Question.id == question_id))
question = result.scalar_one_or_none()
if not question:
raise HTTPException(status_code=404, detail="Question not found")
for key, value in question_data.model_dump(exclude_unset=True).items():
setattr(question, key, value)
await db.commit()
await db.refresh(question)
return question
@router.delete("/questions/{question_id}")
async def delete_question(
question_id: int,
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
result = await db.execute(select(Question).where(Question.id == question_id))
question = result.scalar_one_or_none()
if not question:
raise HTTPException(status_code=404, detail="Question not found")
await db.delete(question)
await db.commit()
return {"status": "deleted"}
@router.post("/questions/generate")
async def generate_questions(
request: AIGenerateRequest,
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
# Get category name
result = await db.execute(
select(Category).where(Category.id == request.category_id)
)
category = result.scalar_one_or_none()
if not category:
raise HTTPException(status_code=404, detail="Category not found")
# Generate questions with AI
generated = await ai_generator.generate_questions(
category_name=category.name,
difficulty=request.difficulty,
count=request.count
)
# Save to database as pending
questions = []
for q_data in generated:
question = Question(
category_id=request.category_id,
question_text=q_data["question"],
correct_answer=q_data["correct_answer"],
alt_answers=q_data.get("alt_answers", []),
difficulty=q_data["difficulty"],
points=q_data["points"],
time_seconds=q_data["time_seconds"],
fun_fact=q_data.get("fun_fact"),
status="pending"
)
db.add(question)
questions.append(question)
await db.commit()
return {
"generated": len(questions),
"questions": [q.id for q in questions]
}
@router.post("/questions/{question_id}/approve")
async def approve_question(
question_id: int,
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
result = await db.execute(select(Question).where(Question.id == question_id))
question = result.scalar_one_or_none()
if not question:
raise HTTPException(status_code=404, detail="Question not found")
question.status = "approved"
await db.commit()
return {"status": "approved"}
@router.post("/questions/{question_id}/reject")
async def reject_question(
question_id: int,
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
result = await db.execute(select(Question).where(Question.id == question_id))
question = result.scalar_one_or_none()
if not question:
raise HTTPException(status_code=404, detail="Question not found")
await db.delete(question)
await db.commit()
return {"status": "rejected"}
# Categories
@router.get("/categories")
async def get_categories(
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
result = await db.execute(select(Category))
return result.scalars().all()
@router.post("/categories")
async def create_category(
name: str,
icon: str = None,
color: str = None,
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
category = Category(name=name, icon=icon, color=color)
db.add(category)
await db.commit()
await db.refresh(category)
return category
# CSV Import/Export
@router.get("/questions/export")
async def export_questions(
category_id: int = None,
status: str = None,
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
"""
Export questions to CSV format.
Query params: category_id (optional), status (optional)
Returns CSV file as download.
"""
# Build query with filters
query = select(Question, Category.name.label("category_name")).join(
Category, Question.category_id == Category.id
)
if category_id:
query = query.where(Question.category_id == category_id)
if status:
query = query.where(Question.status == status)
result = await db.execute(query.order_by(Question.created_at.desc()))
rows = result.all()
# Create CSV in memory
output = StringIO()
writer = csv.writer(output)
# Write header
writer.writerow([
"category", "question", "correct_answer", "alt_answers",
"difficulty", "fun_fact", "status", "date_active"
])
# Write data rows
for row in rows:
question = row[0]
category_name = row[1]
# Join alt_answers with pipe separator
alt_answers_str = "|".join(question.alt_answers) if question.alt_answers else ""
# Format date_active
date_active_str = question.date_active.isoformat() if question.date_active else ""
writer.writerow([
category_name,
question.question_text,
question.correct_answer,
alt_answers_str,
question.difficulty,
question.fun_fact or "",
question.status,
date_active_str
])
# Prepare response
output.seek(0)
# Generate filename with timestamp
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
filename = f"questions_export_{timestamp}.csv"
return StreamingResponse(
iter([output.getvalue()]),
media_type="text/csv; charset=utf-8",
headers={
"Content-Disposition": f'attachment; filename="{filename}"',
"Content-Type": "text/csv; charset=utf-8"
}
)
@router.post("/questions/import")
async def import_questions(
file: UploadFile = File(...),
db: AsyncSession = Depends(get_db),
admin: Admin = Depends(get_current_admin)
):
"""
Import questions from CSV file.
Expected columns: category, question, correct_answer, alt_answers, difficulty, fun_fact
alt_answers should be separated by pipe |
Returns: {imported: count, errors: [{row, error}]}
"""
# Validate file type
if not file.filename.endswith('.csv'):
raise HTTPException(
status_code=400,
detail="File must be a CSV"
)
# Read file content
try:
content = await file.read()
decoded_content = content.decode('utf-8')
except UnicodeDecodeError:
# Try with latin-1 encoding as fallback
try:
decoded_content = content.decode('latin-1')
except:
raise HTTPException(
status_code=400,
detail="Could not decode file. Please use UTF-8 encoding."
)
# Parse CSV
csv_reader = csv.DictReader(StringIO(decoded_content))
# Required columns
required_columns = {"category", "question", "correct_answer", "difficulty"}
# Validate headers
if not csv_reader.fieldnames:
raise HTTPException(
status_code=400,
detail="CSV file is empty or has no headers"
)
headers = set(csv_reader.fieldnames)
missing_columns = required_columns - headers
if missing_columns:
raise HTTPException(
status_code=400,
detail=f"Missing required columns: {', '.join(missing_columns)}"
)
# Get all categories for lookup
categories_result = await db.execute(select(Category))
categories = {cat.name.lower(): cat.id for cat in categories_result.scalars().all()}
imported_count = 0
errors = []
for row_num, row in enumerate(csv_reader, start=2): # Start at 2 (1 is header)
try:
# Get category
category_name = row.get("category", "").strip()
if not category_name:
errors.append({"row": row_num, "error": "Category is required"})
continue
category_id = categories.get(category_name.lower())
if not category_id:
errors.append({"row": row_num, "error": f"Category '{category_name}' not found"})
continue
# Get question text
question_text = row.get("question", "").strip()
if not question_text:
errors.append({"row": row_num, "error": "Question text is required"})
continue
# Get correct answer
correct_answer = row.get("correct_answer", "").strip()
if not correct_answer:
errors.append({"row": row_num, "error": "Correct answer is required"})
continue
# Get difficulty
try:
difficulty = int(row.get("difficulty", "3"))
if difficulty < 1 or difficulty > 5:
errors.append({"row": row_num, "error": "Difficulty must be between 1 and 5"})
continue
except ValueError:
errors.append({"row": row_num, "error": "Difficulty must be a number"})
continue
# Parse alt_answers (pipe separated)
alt_answers_str = row.get("alt_answers", "").strip()
alt_answers = [a.strip() for a in alt_answers_str.split("|") if a.strip()] if alt_answers_str else []
# Get fun_fact (optional)
fun_fact = row.get("fun_fact", "").strip() or None
# Calculate points and time based on difficulty
points = settings.default_points.get(difficulty, 300)
time_seconds = settings.default_times.get(difficulty, 25)
# Create question with pending status
question = Question(
category_id=category_id,
question_text=question_text,
correct_answer=correct_answer,
alt_answers=alt_answers,
difficulty=difficulty,
points=points,
time_seconds=time_seconds,
fun_fact=fun_fact,
status="pending"
)
db.add(question)
imported_count += 1
except Exception as e:
errors.append({"row": row_num, "error": str(e)})
# Commit all valid questions
if imported_count > 0:
await db.commit()
return {
"imported": imported_count,
"errors": errors
}
# Game Settings
@router.get("/settings")
async def get_settings_endpoint(
admin: Admin = Depends(get_current_admin)
):
"""
Get current game settings.
Returns configuration for points, times, steal mechanics, and team limits.
"""
return get_game_settings()
@router.put("/settings")
async def update_settings_endpoint(
settings_data: GameSettingsSchema,
admin: Admin = Depends(get_current_admin)
):
"""
Update game settings.
Expects a complete settings object with all fields.
"""
return update_game_settings(settings_data)
# Room Monitor
async def get_active_rooms_from_redis() -> List[dict]:
"""
Helper function to scan and retrieve all active rooms from Redis.
Returns list of room summaries with player counts and team info.
"""
await room_manager.connect()
rooms = []
cursor = 0
# Scan for all room:* keys
while True:
cursor, keys = await room_manager.redis.scan(cursor, match="room:*", count=100)
for key in keys:
try:
data = await room_manager.redis.get(key)
if data:
room_data = json.loads(data)
# Count players per team
team_a_count = len(room_data.get("teams", {}).get("A", []))
team_b_count = len(room_data.get("teams", {}).get("B", []))
total_players = team_a_count + team_b_count
# Get TTL for time remaining
ttl = await room_manager.redis.ttl(key)
rooms.append({
"room_code": room_data.get("code", ""),
"players_count": total_players,
"teams": {
"A": team_a_count,
"B": team_b_count
},
"status": room_data.get("status", "unknown"),
"host": room_data.get("host", ""),
"ttl_seconds": ttl if ttl > 0 else 0,
"scores": room_data.get("scores", {"A": 0, "B": 0})
})
except (json.JSONDecodeError, Exception):
# Skip malformed room data
continue
if cursor == 0:
break
return rooms
@router.get("/rooms/active")
async def get_active_rooms(
admin: Admin = Depends(get_current_admin)
):
"""
Get list of all active game rooms from Redis.
Returns: list of {room_code, players_count, teams: {A: count, B: count}, status, host, ttl_seconds}
"""
rooms = await get_active_rooms_from_redis()
return {"rooms": rooms, "total": len(rooms)}
@router.delete("/rooms/{room_code}")
async def close_room(
room_code: str,
admin: Admin = Depends(get_current_admin)
):
"""
Close a room by removing it from Redis.
Also removes all player mappings associated with the room.
Socket notifications should be handled by the caller through the socket server.
"""
await room_manager.connect()
# Check if room exists
room_data = await room_manager.get_room(room_code)
if not room_data:
raise HTTPException(status_code=404, detail="Room not found")
# Get all player socket IDs to clean up player mappings
player_sockets = []
for team in ["A", "B"]:
for player in room_data.get("teams", {}).get(team, []):
socket_id = player.get("socket_id")
if socket_id:
player_sockets.append(socket_id)
# Delete player mappings
for socket_id in player_sockets:
await room_manager.redis.delete(f"player:{socket_id}")
# Delete room stats for all players
for team in ["A", "B"]:
for player in room_data.get("teams", {}).get(team, []):
player_name = player.get("name")
if player_name:
await room_manager.redis.delete(f"stats:{room_code}:{player_name}")
# Delete the room itself
await room_manager.redis.delete(f"room:{room_code}")
return {
"status": "closed",
"room_code": room_code,
"players_affected": len(player_sockets)
}