Files
WhatsAppCentralizado/services/api-gateway/app/routers/whatsapp.py
2026-01-29 10:56:35 +00:00

451 lines
15 KiB
Python

import logging
from datetime import datetime
from typing import List, Optional
from uuid import UUID

import httpx
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.core.config import get_settings
from app.core.database import get_db
from app.core.security import get_current_user
from app.models.user import User, UserRole
from app.models.whatsapp import (
    WhatsAppAccount, Contact, Conversation, Message,
    AccountStatus, MessageDirection, MessageType, MessageStatus, ConversationStatus
)
from app.schemas.whatsapp import (
    WhatsAppAccountCreate, WhatsAppAccountResponse,
    ConversationResponse, ConversationDetailResponse,
    SendMessageRequest, MessageResponse, InternalEventRequest,
    TransferToQueueRequest, TransferToAgentRequest,
    ResolveConversationRequest, InternalNoteRequest
)
from app.services.assignment import AssignmentService
# Every route declared in this module is mounted under /api/whatsapp.
router = APIRouter(prefix="/api/whatsapp", tags=["whatsapp"])
# Settings are resolved once at import time; the handlers below read
# WHATSAPP_CORE_URL and FLOW_ENGINE_URL from this object.
settings = get_settings()
def require_admin(current_user: User = Depends(get_current_user)):
    """Dependency that only lets administrators through.

    Returns the authenticated user when their role is ``UserRole.ADMIN``.

    Raises:
        HTTPException: 403 ("Admin required") for any other role.
    """
    is_admin = current_user.role == UserRole.ADMIN
    if is_admin:
        return current_user
    raise HTTPException(status_code=403, detail="Admin required")
@router.post("/accounts", response_model=WhatsAppAccountResponse)
async def create_account(
    request: WhatsAppAccountCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    """Create a WhatsApp account and try to start its session in WhatsApp Core.

    The account row is committed before WhatsApp Core is contacted, so the
    account exists even when the core service is unreachable. Starting the
    session is best-effort: on a 200 response the returned QR code and status
    are stored on the account; any other outcome is logged and the account is
    returned as it was first saved.

    Admin only (enforced by the ``require_admin`` dependency).
    """
    account = WhatsAppAccount(name=request.name)
    db.add(account)
    db.commit()
    db.refresh(account)
    # Captured up front so the error path can log without touching the ORM.
    account_id = account.id

    # Start session in WhatsApp Core
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                f"{settings.WHATSAPP_CORE_URL}/api/sessions",
                json={"accountId": str(account_id), "name": account.name},
                timeout=30,
            )
            if response.status_code == 200:
                data = response.json()
                account.qr_code = data.get("qrCode")
                account.status = AccountStatus(data.get("status", "connecting"))
                db.commit()
            else:
                logging.getLogger(__name__).warning(
                    "WhatsApp Core returned %s when starting session for account %s",
                    response.status_code,
                    account_id,
                )
        except Exception:
            # Still best-effort, but no longer silent. The rollback discards
            # any half-applied attribute changes and leaves the session
            # usable if the commit above was what failed.
            db.rollback()
            logging.getLogger(__name__).exception(
                "Failed to start WhatsApp Core session for account %s", account_id
            )
    return account
@router.get("/accounts", response_model=List[WhatsAppAccountResponse])
def list_accounts(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return every stored WhatsApp account to any authenticated user."""
    return db.query(WhatsAppAccount).all()
@router.get("/accounts/{account_id}", response_model=WhatsAppAccountResponse)
async def get_account(
    account_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one account, refreshed from WhatsApp Core when possible.

    After loading the account from the database, the current session state
    is requested from WhatsApp Core. On a 200 response the QR code, status
    and phone number are overwritten and committed. The refresh is
    best-effort: failures are logged and the stored account is returned.

    Raises:
        HTTPException: 404 when no account has this id.
    """
    account = db.query(WhatsAppAccount).filter(WhatsAppAccount.id == account_id).first()
    if not account:
        raise HTTPException(status_code=404, detail="Account not found")

    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(
                f"{settings.WHATSAPP_CORE_URL}/api/sessions/{account_id}",
                timeout=10,
            )
            if response.status_code == 200:
                data = response.json()
                account.qr_code = data.get("qrCode")
                account.status = AccountStatus(data.get("status", "disconnected"))
                account.phone_number = data.get("phoneNumber")
                db.commit()
            else:
                logging.getLogger(__name__).warning(
                    "WhatsApp Core returned %s for session %s",
                    response.status_code,
                    account_id,
                )
        except Exception:
            # Drop half-applied changes (e.g. qr_code set before an unknown
            # status value raised) so the response reflects what is stored.
            db.rollback()
            logging.getLogger(__name__).exception(
                "Failed to refresh account %s from WhatsApp Core", account_id
            )
    return account
@router.delete("/accounts/{account_id}")
async def delete_account(
    account_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    """Delete an account and ask WhatsApp Core to drop its session.

    The remote session removal is best-effort: a failure is logged and the
    local row is deleted regardless.

    Admin only (enforced by the ``require_admin`` dependency).

    Raises:
        HTTPException: 404 when no account has this id.
    """
    account = db.query(WhatsAppAccount).filter(WhatsAppAccount.id == account_id).first()
    if not account:
        raise HTTPException(status_code=404, detail="Account not found")

    async with httpx.AsyncClient() as client:
        try:
            await client.delete(
                f"{settings.WHATSAPP_CORE_URL}/api/sessions/{account_id}",
                timeout=10,
            )
        except Exception:
            # Logged because a session left behind in WhatsApp Core would
            # otherwise go unnoticed.
            logging.getLogger(__name__).exception(
                "Failed to delete WhatsApp Core session for account %s", account_id
            )

    db.delete(account)
    db.commit()
    return {"success": True}
@router.get("/conversations", response_model=List[ConversationResponse])
def list_conversations(
    status: Optional[ConversationStatus] = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List up to 50 conversations, most recent message first.

    Args:
        status: Optional query parameter; when given, only conversations in
            that status are returned.
    """
    query = db.query(Conversation)
    # Explicit None check: "no filter supplied" is the only case that should
    # skip the filter, independent of how the enum defines truthiness.
    if status is not None:
        query = query.filter(Conversation.status == status)
    return query.order_by(Conversation.last_message_at.desc()).limit(50).all()
@router.get("/conversations/{conversation_id}", response_model=ConversationDetailResponse)
def get_conversation(
    conversation_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Fetch a single conversation by id.

    Raises:
        HTTPException: 404 when no conversation has this id.
    """
    match = (
        db.query(Conversation)
        .filter(Conversation.id == conversation_id)
        .first()
    )
    if match:
        return match
    raise HTTPException(status_code=404, detail="Conversation not found")
@router.post("/conversations/{conversation_id}/messages", response_model=MessageResponse)
async def send_message(
    conversation_id: UUID,
    request: SendMessageRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Send an outbound message in a conversation on behalf of the current user.

    The message is stored as PENDING first, then forwarded to WhatsApp Core.
    A 200 response marks it SENT and records the WhatsApp message id; any
    other response or any exception marks it FAILED and is logged. The stored
    message is returned in either case.

    Raises:
        HTTPException: 404 when no conversation has this id.
    """
    conversation = db.query(Conversation).filter(Conversation.id == conversation_id).first()
    if not conversation:
        raise HTTPException(status_code=404, detail="Conversation not found")

    message = Message(
        conversation_id=conversation.id,
        direction=MessageDirection.OUTBOUND,
        type=request.type,
        content=request.content,
        media_url=request.media_url,
        sent_by=current_user.id,
        status=MessageStatus.PENDING,
    )
    db.add(message)
    db.commit()
    db.refresh(message)

    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                f"{settings.WHATSAPP_CORE_URL}/api/sessions/{conversation.whatsapp_account_id}/messages",
                json={
                    "to": conversation.contact.phone_number,
                    "type": request.type.value,
                    # Text messages carry only the text; every other type
                    # sends the media URL with the content as its caption.
                    "content": {"text": request.content} if request.type == MessageType.TEXT else {
                        "url": request.media_url,
                        "caption": request.content,
                    },
                },
                timeout=30,
            )
            if response.status_code == 200:
                data = response.json()
                message.whatsapp_message_id = data.get("messageId")
                message.status = MessageStatus.SENT
            else:
                logging.getLogger(__name__).warning(
                    "WhatsApp Core returned %s sending message in conversation %s",
                    response.status_code,
                    conversation_id,
                )
                message.status = MessageStatus.FAILED
        except Exception:
            # The failure is still recorded on the message; logging keeps the
            # reason, which was previously discarded.
            logging.getLogger(__name__).exception(
                "Failed to send message in conversation %s", conversation_id
            )
            message.status = MessageStatus.FAILED

    db.commit()
    db.refresh(message)
    return message
def _extract_message_text(msg_data):
    """Return the text of an inbound WhatsApp payload, or "[Media]" if it has none."""
    # `or {}` also covers keys that are present but null, which a plain
    # .get(key, {}) default does not.
    wa_message = msg_data.get("message") or {}
    extended = wa_message.get("extendedTextMessage") or {}
    return wa_message.get("conversation") or extended.get("text") or "[Media]"


# Internal endpoint for WhatsApp Core events
@router.post("/internal/whatsapp/event")
async def handle_whatsapp_event(
    event: InternalEventRequest,
    db: Session = Depends(get_db),
):
    """Apply an event pushed by WhatsApp Core to the local database.

    Handled event types:
      * ``qr``: store the QR code and mark the account CONNECTING.
      * ``connected``: mark CONNECTED, store the phone number, clear the QR.
      * ``disconnected``: mark DISCONNECTED and clear the QR.
      * ``message``: find or create the contact and the open (non-RESOLVED)
        conversation, store the inbound message, and forward it to the flow
        engine when the conversation is in BOT status.

    Any other event type only triggers a commit. Events for unknown accounts,
    and message events without a sender, are ignored.

    NOTE(review): this route declares no authentication dependency; confirm
    it is only reachable from the internal network.

    Returns:
        ``{"status": "ok"}`` when the event was processed, or
        ``{"status": "ignored", "reason": ...}`` when it was skipped.
    """
    account = db.query(WhatsAppAccount).filter(
        WhatsAppAccount.id == event.accountId
    ).first()
    if not account:
        return {"status": "ignored", "reason": "account not found"}

    if event.type == "qr":
        account.qr_code = event.data.get("qrCode")
        account.status = AccountStatus.CONNECTING
    elif event.type == "connected":
        account.status = AccountStatus.CONNECTED
        account.phone_number = event.data.get("phoneNumber")
        account.qr_code = None
    elif event.type == "disconnected":
        account.status = AccountStatus.DISCONNECTED
        account.qr_code = None
    elif event.type == "message":
        msg_data = event.data
        # The sender arrives as "<phone>@<suffix>"; keep only the phone part.
        phone = (msg_data.get("from") or "").split("@")[0]
        if not phone:
            # Without a sender there is no contact to attach the message to;
            # skipping avoids creating a contact with an empty phone number.
            return {"status": "ignored", "reason": "missing sender"}

        contact = db.query(Contact).filter(Contact.phone_number == phone).first()
        if not contact:
            contact = Contact(
                phone_number=phone,
                name=msg_data.get("pushName"),
            )
            db.add(contact)
            db.commit()
            db.refresh(contact)

        # Reuse the contact's open conversation on this account, if any.
        conversation = db.query(Conversation).filter(
            Conversation.whatsapp_account_id == account.id,
            Conversation.contact_id == contact.id,
            Conversation.status != ConversationStatus.RESOLVED,
        ).first()
        if not conversation:
            conversation = Conversation(
                whatsapp_account_id=account.id,
                contact_id=contact.id,
                status=ConversationStatus.BOT,
            )
            db.add(conversation)
            db.commit()
            db.refresh(conversation)

        content = _extract_message_text(msg_data)
        # Inbound messages are always stored as TEXT; media without text is
        # represented by the "[Media]" placeholder.
        message = Message(
            conversation_id=conversation.id,
            whatsapp_message_id=msg_data.get("id"),
            direction=MessageDirection.INBOUND,
            type=MessageType.TEXT,
            content=content,
            status=MessageStatus.DELIVERED,
        )
        db.add(message)
        # NOTE(review): utcnow() yields a naive timestamp and is deprecated
        # since Python 3.12; switch to an aware value once the column's
        # timezone handling is confirmed.
        conversation.last_message_at = datetime.utcnow()
        db.commit()
        db.refresh(message)

        # Process message through Flow Engine (if in BOT status)
        if conversation.status == ConversationStatus.BOT:
            async with httpx.AsyncClient() as client:
                try:
                    response = await client.post(
                        f"{settings.FLOW_ENGINE_URL}/process",
                        json={
                            "conversation_id": str(conversation.id),
                            "contact": {
                                "id": str(contact.id),
                                "phone_number": contact.phone_number,
                                "name": contact.name,
                            },
                            "conversation": {
                                "id": str(conversation.id),
                                "status": conversation.status.value,
                            },
                            "message": {
                                "id": str(message.id),
                                "content": content,
                                "type": message.type.value,
                            },
                        },
                        timeout=30,
                    )
                    if response.status_code >= 400:
                        logging.getLogger(__name__).warning(
                            "Flow engine returned %s for account %s",
                            response.status_code,
                            event.accountId,
                        )
                except Exception:
                    # The message is already stored; a flow-engine failure
                    # must not fail the event, so it is only logged.
                    logging.getLogger(__name__).exception(
                        "Flow engine error for account %s", event.accountId
                    )
        return {"status": "ok"}

    db.commit()
    return {"status": "ok"}
class FlowSendRequest(BaseModel):
    """Request body accepted by the ``/internal/flow/send`` endpoint."""

    # Id of the conversation to reply in, as a string.
    conversation_id: str
    # Text of the message to send.
    content: str
    # Message type; defaults to "text".
    type: str = "text"
@router.post("/internal/flow/send")
async def flow_send_message(
    request: FlowSendRequest,
    db: Session = Depends(get_db),
):
    """Internal endpoint for flow engine to send messages.

    Stores an outbound text message as PENDING, forwards it to WhatsApp Core
    and marks it SENT on a 200 response or FAILED otherwise (failures are
    logged). ``request.type`` is not consulted; the message is always sent
    as text.

    The response carries ``success: True`` whether or not WhatsApp Core
    accepted the message; the delivery outcome is only recorded on the
    stored message's status.

    Raises:
        HTTPException: 404 when the conversation id is malformed or unknown.
    """
    # The other routes query Conversation.id with UUID objects, so parse the
    # string here too. A malformed id cannot match any row, so it gets the
    # same 404 instead of reaching the database.
    try:
        conversation_id = UUID(request.conversation_id)
    except ValueError:
        raise HTTPException(status_code=404, detail="Conversation not found") from None

    conversation = db.query(Conversation).filter(
        Conversation.id == conversation_id
    ).first()
    if not conversation:
        raise HTTPException(status_code=404, detail="Conversation not found")

    # Create message in DB
    message = Message(
        conversation_id=conversation.id,
        direction=MessageDirection.OUTBOUND,
        type=MessageType.TEXT,
        content=request.content,
        status=MessageStatus.PENDING,
    )
    db.add(message)
    db.commit()
    db.refresh(message)

    # Send via WhatsApp Core
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                f"{settings.WHATSAPP_CORE_URL}/api/sessions/{conversation.whatsapp_account_id}/messages",
                json={
                    "to": conversation.contact.phone_number,
                    "type": "text",
                    "content": {"text": request.content},
                },
                timeout=30,
            )
            if response.status_code == 200:
                data = response.json()
                message.whatsapp_message_id = data.get("messageId")
                message.status = MessageStatus.SENT
            else:
                logging.getLogger(__name__).warning(
                    "WhatsApp Core returned %s sending flow message in conversation %s",
                    response.status_code,
                    conversation_id,
                )
                message.status = MessageStatus.FAILED
        except Exception:
            logging.getLogger(__name__).exception(
                "Failed to send flow message in conversation %s", conversation_id
            )
            message.status = MessageStatus.FAILED

    db.commit()
    return {"success": True, "message_id": str(message.id)}
@router.post("/conversations/{conversation_id}/transfer-to-queue")
def transfer_to_queue(
    conversation_id: UUID,
    request: TransferToQueueRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Hand a conversation over to the queue named in the request.

    Delegates to ``AssignmentService.transfer_to_queue``.

    Raises:
        HTTPException: 400 when the service reports a falsy result.
    """
    transferred = AssignmentService(db).transfer_to_queue(
        conversation_id, request.queue_id
    )
    if not transferred:
        raise HTTPException(status_code=400, detail="Transfer failed")
    return {"success": True}
@router.post("/conversations/{conversation_id}/transfer-to-agent")
def transfer_to_agent(
    conversation_id: UUID,
    request: TransferToAgentRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Hand a conversation over to the agent named in the request.

    Delegates to ``AssignmentService.transfer_to_agent``.

    Raises:
        HTTPException: 400 when the service reports a falsy result.
    """
    transferred = AssignmentService(db).transfer_to_agent(
        conversation_id, request.agent_id
    )
    if not transferred:
        raise HTTPException(status_code=400, detail="Transfer failed")
    return {"success": True}
@router.post("/conversations/{conversation_id}/transfer-to-bot")
def transfer_to_bot(
    conversation_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return a conversation to the bot.

    Delegates to ``AssignmentService.transfer_to_bot``.

    Raises:
        HTTPException: 400 when the service reports a falsy result.
    """
    transferred = AssignmentService(db).transfer_to_bot(conversation_id)
    if not transferred:
        raise HTTPException(status_code=400, detail="Transfer failed")
    return {"success": True}
@router.post("/conversations/{conversation_id}/resolve")
def resolve_conversation(
    conversation_id: UUID,
    request: ResolveConversationRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Resolve a conversation, passing along the CSAT score and feedback.

    Delegates to ``AssignmentService.resolve_conversation``.

    Raises:
        HTTPException: 400 when the service reports a falsy result.
    """
    resolved = AssignmentService(db).resolve_conversation(
        conversation_id,
        request.csat_score,
        request.csat_feedback,
    )
    if not resolved:
        raise HTTPException(status_code=400, detail="Failed to resolve")
    return {"success": True}
@router.post("/conversations/{conversation_id}/notes")
def add_internal_note(
    conversation_id: UUID,
    request: InternalNoteRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Attach an internal note to a conversation.

    The note is stored as a ``Message`` row flagged ``is_internal_note`` and
    authored by the current user; this handler makes no call to WhatsApp Core.

    Returns:
        ``{"success": True, "message_id": <id of the stored note>}``.

    Raises:
        HTTPException: 404 when no conversation has this id.
    """
    lookup = db.query(Conversation).filter(Conversation.id == conversation_id)
    conversation = lookup.first()
    if not conversation:
        raise HTTPException(status_code=404, detail="Conversation not found")

    note = Message(
        conversation_id=conversation.id,
        direction=MessageDirection.OUTBOUND,
        type=MessageType.TEXT,
        content=request.content,
        sent_by=current_user.id,
        is_internal_note=True,
        status=MessageStatus.DELIVERED,
    )
    db.add(note)
    db.commit()
    db.refresh(note)

    note_id = str(note.id)
    return {"success": True, "message_id": note_id}