Files
WhatsAppCentralizado/services/api-gateway/app/routers/queues.py
2026-01-29 10:56:39 +00:00

244 lines
7.1 KiB
Python

from typing import List
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session, selectinload

from app.core.database import get_db
from app.core.security import get_current_user
from app.models.queue import Queue, QueueAgent
from app.models.quick_reply import QuickReply
from app.models.user import User, UserRole
from app.schemas.queue import (
    QueueCreate, QueueUpdate, QueueResponse, QueueDetailResponse,
    QueueAgentAdd, QueueAgentResponse,
    QuickReplyCreate, QuickReplyUpdate, QuickReplyResponse
)
# Router shared by every endpoint in this module: all routes declared on it
# are served under the "/api/queues" prefix and tagged "queues".
router = APIRouter(prefix="/api/queues", tags=["queues"])
def require_admin(current_user: User = Depends(get_current_user)) -> User:
    """Dependency that only lets administrators through.

    Args:
        current_user: The authenticated user, resolved by ``get_current_user``.

    Returns:
        The same user object, unchanged, when its role is ``UserRole.ADMIN``.

    Raises:
        HTTPException: 403 when the user's role is anything other than admin.
    """
    is_admin = current_user.role == UserRole.ADMIN
    if is_admin:
        return current_user
    raise HTTPException(status_code=403, detail="Admin required")
def require_supervisor(current_user: User = Depends(get_current_user)) -> User:
    """Dependency that lets supervisors and administrators through.

    Args:
        current_user: The authenticated user, resolved by ``get_current_user``.

    Returns:
        The same user object, unchanged, when its role is ``UserRole.ADMIN``
        or ``UserRole.SUPERVISOR``.

    Raises:
        HTTPException: 403 for every other role.
    """
    privileged_roles = (UserRole.ADMIN, UserRole.SUPERVISOR)
    if current_user.role in privileged_roles:
        return current_user
    raise HTTPException(status_code=403, detail="Supervisor or Admin required")
@router.post("", response_model=QueueResponse)
def create_queue(
    request: QueueCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
) -> Queue:
    """Create a new queue from the request payload (admin only).

    Args:
        request: Validated creation payload; its dumped fields are passed
            straight to the ``Queue`` constructor.
        db: Database session for this request.
        current_user: Present only to enforce the ``require_admin`` guard.

    Returns:
        The persisted ``Queue``, serialized through ``QueueResponse``.
    """
    fields = request.model_dump()
    new_queue = Queue(**fields)

    db.add(new_queue)
    db.commit()
    # Re-read the row after the commit before handing the object back.
    db.refresh(new_queue)
    return new_queue
@router.get("", response_model=List[QueueResponse])
def list_queues(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> List[QueueResponse]:
    """List all active queues together with their agent counts.

    Only queues whose ``is_active`` flag is true are returned. Any
    authenticated user may call this endpoint.

    Args:
        db: Database session for this request.
        current_user: Present only to require authentication.

    Returns:
        One ``QueueResponse`` per active queue, with ``agent_count`` set to
        the number of entries in the queue's ``agents`` relationship.
    """
    active_queues = (
        db.query(Queue)
        # Fetch the agents of all queues in one additional SELECT instead of
        # issuing a separate lazy load for every queue when counting below.
        .options(selectinload(Queue.agents))
        # "== True" is intentional: SQLAlchemy overloads "==" on columns to
        # build the SQL comparison.
        .filter(Queue.is_active == True)  # noqa: E712
        .all()
    )

    return [
        QueueResponse(
            **{
                **QueueResponse.model_validate(queue).model_dump(),
                "agent_count": len(queue.agents),
            }
        )
        for queue in active_queues
    ]
# The ":uuid" convertor limits this route to UUID-shaped path segments, so the
# literal GET "/quick-replies" route declared later on this router is not
# captured here and rejected as an invalid queue id.
@router.get("/{queue_id:uuid}", response_model=QueueDetailResponse)
def get_queue(
    queue_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> QueueDetailResponse:
    """Return one queue with the full list of its agents.

    The lookup does not filter on ``is_active``, so a deactivated queue can
    still be fetched by id. Any authenticated user may call this endpoint.

    Args:
        queue_id: Primary key of the queue to fetch.
        db: Database session for this request.
        current_user: Present only to require authentication.

    Returns:
        The queue's fields plus ``agents`` (one entry per queue membership)
        and ``agent_count`` (the length of that list).

    Raises:
        HTTPException: 404 when no queue has the given id.
    """
    queue = db.query(Queue).filter(Queue.id == queue_id).first()
    if queue is None:
        raise HTTPException(status_code=404, detail="Queue not found")

    agents_data = [
        QueueAgentResponse(
            id=qa.id,
            user_id=qa.user_id,
            # The related user may be missing; report empty name/email then.
            user_name=qa.user.name if qa.user else None,
            user_email=qa.user.email if qa.user else None,
            is_supervisor=qa.is_supervisor,
            skills=qa.skills or [],
            created_at=qa.created_at,
        )
        for qa in queue.agents
    ]

    # Merge into a single dict before constructing the response. Passing the
    # dump via ** together with an explicit agent_count= keyword raises
    # "got multiple values for keyword argument" whenever the dumped
    # QueueResponse already carries an agent_count key.
    payload = QueueResponse.model_validate(queue).model_dump()
    payload["agents"] = agents_data
    payload["agent_count"] = len(agents_data)
    return QueueDetailResponse(**payload)
@router.put("/{queue_id}", response_model=QueueResponse)
def update_queue(
    queue_id: UUID,
    request: QueueUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
) -> Queue:
    """Apply a partial update to an existing queue (admin only).

    Only the fields the client explicitly sent are written; fields left
    unset in the request keep their current values.

    Args:
        queue_id: Primary key of the queue to modify.
        request: Update payload.
        db: Database session for this request.
        current_user: Present only to enforce the ``require_admin`` guard.

    Returns:
        The updated ``Queue``, serialized through ``QueueResponse``.

    Raises:
        HTTPException: 404 when no queue has the given id.
    """
    target = db.query(Queue).filter(Queue.id == queue_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Queue not found")

    changes = request.model_dump(exclude_unset=True)
    for field_name, new_value in changes.items():
        setattr(target, field_name, new_value)

    db.commit()
    db.refresh(target)
    return target
@router.delete("/{queue_id}")
def delete_queue(
    queue_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
) -> dict:
    """Deactivate a queue (admin only).

    This is a soft delete: the row is kept and only its ``is_active`` flag
    is set to ``False``.

    Args:
        queue_id: Primary key of the queue to deactivate.
        db: Database session for this request.
        current_user: Present only to enforce the ``require_admin`` guard.

    Returns:
        ``{"success": True}`` once the change is committed.

    Raises:
        HTTPException: 404 when no queue has the given id.
    """
    target = db.query(Queue).filter(Queue.id == queue_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Queue not found")

    target.is_active = False
    db.commit()

    outcome = {"success": True}
    return outcome
@router.post("/{queue_id}/agents", response_model=QueueAgentResponse)
def add_agent_to_queue(
    queue_id: UUID,
    request: QueueAgentAdd,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
) -> QueueAgentResponse:
    """Add a user to a queue as an agent (admin only).

    Args:
        queue_id: Primary key of the queue to add the agent to.
        request: Payload carrying ``user_id``, ``is_supervisor`` and
            ``skills`` for the new membership.
        db: Database session for this request.
        current_user: Present only to enforce the ``require_admin`` guard.

    Returns:
        The newly created membership, including the user's name and email
        when the related user can be loaded.

    Raises:
        HTTPException: 404 when the queue or the user does not exist;
            400 when the user is already a member of the queue.
    """
    queue = db.query(Queue).filter(Queue.id == queue_id).first()
    if not queue:
        raise HTTPException(status_code=404, detail="Queue not found")

    # Reject unknown users up front instead of letting an invalid user_id
    # reach the commit, where it would surface as a server error or leave a
    # membership that points at no user.
    user = db.query(User).filter(User.id == request.user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    existing = db.query(QueueAgent).filter(
        QueueAgent.queue_id == queue_id,
        QueueAgent.user_id == request.user_id,
    ).first()
    if existing:
        raise HTTPException(status_code=400, detail="Agent already in queue")

    qa = QueueAgent(
        queue_id=queue_id,
        user_id=request.user_id,
        is_supervisor=request.is_supervisor,
        skills=request.skills,
    )
    db.add(qa)
    db.commit()
    db.refresh(qa)

    return QueueAgentResponse(
        id=qa.id,
        user_id=qa.user_id,
        user_name=qa.user.name if qa.user else None,
        user_email=qa.user.email if qa.user else None,
        is_supervisor=qa.is_supervisor,
        skills=qa.skills or [],
        created_at=qa.created_at,
    )
@router.delete("/{queue_id}/agents/{user_id}")
def remove_agent_from_queue(
    queue_id: UUID,
    user_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
) -> dict:
    """Remove a user's membership from a queue (admin only).

    Unlike queue deletion, this removes the ``QueueAgent`` row itself.

    Args:
        queue_id: Primary key of the queue.
        user_id: Primary key of the user to remove from that queue.
        db: Database session for this request.
        current_user: Present only to enforce the ``require_admin`` guard.

    Returns:
        ``{"success": True}`` once the deletion is committed.

    Raises:
        HTTPException: 404 when the user is not a member of the queue.
    """
    membership_query = db.query(QueueAgent).filter(
        QueueAgent.queue_id == queue_id,
        QueueAgent.user_id == user_id
    )
    membership = membership_query.first()
    if not membership:
        raise HTTPException(status_code=404, detail="Agent not in queue")

    db.delete(membership)
    db.commit()
    return {"success": True}
@router.get("/quick-replies", response_model=List[QuickReplyResponse])
def list_quick_replies(
    queue_id: UUID = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> List[QuickReply]:
    """List quick replies, optionally scoped to one queue.

    Without ``queue_id`` only quick replies that have no queue are returned.
    With ``queue_id`` the result holds that queue's quick replies plus the
    ones that have no queue. Any authenticated user may call this endpoint.

    NOTE(review): GET "/{queue_id}" is registered earlier on this router;
    confirm that a request for "/quick-replies" is actually dispatched to
    this handler and not captured by that route.

    Args:
        queue_id: Optional queue whose quick replies should be included.
        db: Database session for this request.
        current_user: Present only to require authentication.

    Returns:
        The matching ``QuickReply`` rows.
    """
    # "== None" is intentional: SQLAlchemy overloads "==" on columns and
    # renders this comparison as IS NULL.
    if not queue_id:
        scope = QuickReply.queue_id == None  # noqa: E711
    else:
        scope = (
            (QuickReply.queue_id == queue_id) | (QuickReply.queue_id == None)  # noqa: E711
        )
    return db.query(QuickReply).filter(scope).all()
@router.post("/quick-replies", response_model=QuickReplyResponse)
def create_quick_reply(
    request: QuickReplyCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_supervisor),
) -> QuickReply:
    """Create a quick reply (supervisor or admin only).

    The calling user's id is stored in ``created_by``.

    Args:
        request: Validated creation payload; its dumped fields are passed
            straight to the ``QuickReply`` constructor.
        db: Database session for this request.
        current_user: The supervisor or admin creating the quick reply.

    Returns:
        The persisted ``QuickReply``, serialized through
        ``QuickReplyResponse``.
    """
    author_id = current_user.id
    new_reply = QuickReply(**request.model_dump(), created_by=author_id)

    db.add(new_reply)
    db.commit()
    # Re-read the row after the commit before handing the object back.
    db.refresh(new_reply)
    return new_reply
@router.put("/quick-replies/{reply_id}", response_model=QuickReplyResponse)
def update_quick_reply(
    reply_id: UUID,
    request: QuickReplyUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_supervisor),
) -> QuickReply:
    """Apply a partial update to a quick reply (supervisor or admin only).

    Only the fields the client explicitly sent are written; fields left
    unset in the request keep their current values.

    Args:
        reply_id: Primary key of the quick reply to modify.
        request: Update payload.
        db: Database session for this request.
        current_user: Present only to enforce the ``require_supervisor``
            guard.

    Returns:
        The updated ``QuickReply``, serialized through
        ``QuickReplyResponse``.

    Raises:
        HTTPException: 404 when no quick reply has the given id.
    """
    target = db.query(QuickReply).filter(QuickReply.id == reply_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Quick reply not found")

    changes = request.model_dump(exclude_unset=True)
    for field_name, new_value in changes.items():
        setattr(target, field_name, new_value)

    db.commit()
    db.refresh(target)
    return target
@router.delete("/quick-replies/{reply_id}")
def delete_quick_reply(
    reply_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_supervisor),
) -> dict:
    """Permanently delete a quick reply (supervisor or admin only).

    Args:
        reply_id: Primary key of the quick reply to delete.
        db: Database session for this request.
        current_user: Present only to enforce the ``require_supervisor``
            guard.

    Returns:
        ``{"success": True}`` once the deletion is committed.

    Raises:
        HTTPException: 404 when no quick reply has the given id.
    """
    target = db.query(QuickReply).filter(QuickReply.id == reply_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Quick reply not found")

    db.delete(target)
    db.commit()

    outcome = {"success": True}
    return outcome