from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from app.core.database import get_db from app.core.security import ( verify_password, get_password_hash, create_access_token, create_refresh_token, decode_token, get_current_user, ) from app.models.user import User, UserRole from app.schemas.auth import ( LoginRequest, LoginResponse, RefreshRequest, TokenResponse, UserResponse, CreateUserRequest, ) router = APIRouter(prefix="/auth", tags=["auth"]) @router.post("/login", response_model=LoginResponse) def login(request: LoginRequest, db: Session = Depends(get_db)): user = db.query(User).filter(User.email == request.email).first() if not user or not verify_password(request.password, user.password_hash): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email or password", ) if not user.is_active: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User account is disabled", ) access_token = create_access_token(data={"sub": str(user.id)}) refresh_token = create_refresh_token(data={"sub": str(user.id)}) return LoginResponse( access_token=access_token, refresh_token=refresh_token, user=UserResponse.model_validate(user), ) @router.post("/refresh", response_model=TokenResponse) def refresh_token(request: RefreshRequest, db: Session = Depends(get_db)): payload = decode_token(request.refresh_token) if payload.get("type") != "refresh": raise HTTPException(status_code=401, detail="Invalid token type") user_id = payload.get("sub") user = db.query(User).filter(User.id == user_id).first() if not user or not user.is_active: raise HTTPException(status_code=401, detail="User not found") access_token = create_access_token(data={"sub": str(user.id)}) new_refresh_token = create_refresh_token(data={"sub": str(user.id)}) return TokenResponse(access_token=access_token, refresh_token=new_refresh_token) @router.get("/me", response_model=UserResponse) def get_me(current_user: User = Depends(get_current_user)): return UserResponse.model_validate(current_user) @router.post("/register", response_model=UserResponse) def register_first_admin(request: CreateUserRequest, db: Session = Depends(get_db)): # Only allow if no users exist (first admin) user_count = db.query(User).count() if user_count > 0: raise HTTPException(status_code=403, detail="Registration disabled") existing = db.query(User).filter(User.email == request.email).first() if existing: raise HTTPException(status_code=400, detail="Email already registered") user = User( email=request.email, password_hash=get_password_hash(request.password), name=request.name, role=UserRole.ADMIN, ) db.add(user) db.commit() db.refresh(user) return UserResponse.model_validate(user)