Files
WhatsAppCentralizado/services/api-gateway/app/routers/whatsapp.py
Claude AI c97d380635 feat(phase2): add Flow Builder UI and internal flow routes
- Add FlowBuilder.tsx with React Flow visual editor
- Add FlowList.tsx for flow management
- Add /internal/flow/send endpoint for flow-engine messaging
- Add reactflow dependency to frontend

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 10:23:02 +00:00

333 lines
11 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from uuid import UUID
import httpx
from pydantic import BaseModel
from app.core.database import get_db
from app.core.config import get_settings
from app.core.security import get_current_user
from app.models.user import User, UserRole
from app.models.whatsapp import (
WhatsAppAccount, Contact, Conversation, Message,
AccountStatus, MessageDirection, MessageType, MessageStatus, ConversationStatus
)
from app.schemas.whatsapp import (
WhatsAppAccountCreate, WhatsAppAccountResponse,
ConversationResponse, ConversationDetailResponse,
SendMessageRequest, MessageResponse, InternalEventRequest
)
router = APIRouter(prefix="/api/whatsapp", tags=["whatsapp"])
settings = get_settings()
def require_admin(current_user: User = Depends(get_current_user)):
if current_user.role != UserRole.ADMIN:
raise HTTPException(status_code=403, detail="Admin required")
return current_user
@router.post("/accounts", response_model=WhatsAppAccountResponse)
async def create_account(
request: WhatsAppAccountCreate,
db: Session = Depends(get_db),
current_user: User = Depends(require_admin),
):
account = WhatsAppAccount(name=request.name)
db.add(account)
db.commit()
db.refresh(account)
# Start session in WhatsApp Core
async with httpx.AsyncClient() as client:
try:
response = await client.post(
f"{settings.WHATSAPP_CORE_URL}/api/sessions",
json={"accountId": str(account.id), "name": account.name},
timeout=30,
)
if response.status_code == 200:
data = response.json()
account.qr_code = data.get("qrCode")
account.status = AccountStatus(data.get("status", "connecting"))
db.commit()
except Exception:
pass
return account
@router.get("/accounts", response_model=List[WhatsAppAccountResponse])
def list_accounts(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
accounts = db.query(WhatsAppAccount).all()
return accounts
@router.get("/accounts/{account_id}", response_model=WhatsAppAccountResponse)
async def get_account(
account_id: UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
account = db.query(WhatsAppAccount).filter(WhatsAppAccount.id == account_id).first()
if not account:
raise HTTPException(status_code=404, detail="Account not found")
async with httpx.AsyncClient() as client:
try:
response = await client.get(
f"{settings.WHATSAPP_CORE_URL}/api/sessions/{account_id}",
timeout=10,
)
if response.status_code == 200:
data = response.json()
account.qr_code = data.get("qrCode")
account.status = AccountStatus(data.get("status", "disconnected"))
account.phone_number = data.get("phoneNumber")
db.commit()
except Exception:
pass
return account
@router.delete("/accounts/{account_id}")
async def delete_account(
account_id: UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_admin),
):
account = db.query(WhatsAppAccount).filter(WhatsAppAccount.id == account_id).first()
if not account:
raise HTTPException(status_code=404, detail="Account not found")
async with httpx.AsyncClient() as client:
try:
await client.delete(
f"{settings.WHATSAPP_CORE_URL}/api/sessions/{account_id}",
timeout=10,
)
except Exception:
pass
db.delete(account)
db.commit()
return {"success": True}
@router.get("/conversations", response_model=List[ConversationResponse])
def list_conversations(
status: ConversationStatus = None,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
query = db.query(Conversation)
if status:
query = query.filter(Conversation.status == status)
conversations = query.order_by(Conversation.last_message_at.desc()).limit(50).all()
return conversations
@router.get("/conversations/{conversation_id}", response_model=ConversationDetailResponse)
def get_conversation(
conversation_id: UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
conversation = db.query(Conversation).filter(Conversation.id == conversation_id).first()
if not conversation:
raise HTTPException(status_code=404, detail="Conversation not found")
return conversation
@router.post("/conversations/{conversation_id}/messages", response_model=MessageResponse)
async def send_message(
conversation_id: UUID,
request: SendMessageRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
conversation = db.query(Conversation).filter(Conversation.id == conversation_id).first()
if not conversation:
raise HTTPException(status_code=404, detail="Conversation not found")
message = Message(
conversation_id=conversation.id,
direction=MessageDirection.OUTBOUND,
type=request.type,
content=request.content,
media_url=request.media_url,
sent_by=current_user.id,
status=MessageStatus.PENDING,
)
db.add(message)
db.commit()
db.refresh(message)
async with httpx.AsyncClient() as client:
try:
response = await client.post(
f"{settings.WHATSAPP_CORE_URL}/api/sessions/{conversation.whatsapp_account_id}/messages",
json={
"to": conversation.contact.phone_number,
"type": request.type.value,
"content": {"text": request.content} if request.type == MessageType.TEXT else {
"url": request.media_url,
"caption": request.content,
},
},
timeout=30,
)
if response.status_code == 200:
data = response.json()
message.whatsapp_message_id = data.get("messageId")
message.status = MessageStatus.SENT
else:
message.status = MessageStatus.FAILED
except Exception:
message.status = MessageStatus.FAILED
db.commit()
db.refresh(message)
return message
# Internal endpoint for WhatsApp Core events
@router.post("/internal/whatsapp/event")
async def handle_whatsapp_event(
event: InternalEventRequest,
db: Session = Depends(get_db),
):
account = db.query(WhatsAppAccount).filter(
WhatsAppAccount.id == event.accountId
).first()
if not account:
return {"status": "ignored", "reason": "account not found"}
if event.type == "qr":
account.qr_code = event.data.get("qrCode")
account.status = AccountStatus.CONNECTING
elif event.type == "connected":
account.status = AccountStatus.CONNECTED
account.phone_number = event.data.get("phoneNumber")
account.qr_code = None
elif event.type == "disconnected":
account.status = AccountStatus.DISCONNECTED
account.qr_code = None
elif event.type == "message":
msg_data = event.data
phone = msg_data.get("from", "").split("@")[0]
contact = db.query(Contact).filter(Contact.phone_number == phone).first()
if not contact:
contact = Contact(
phone_number=phone,
name=msg_data.get("pushName"),
)
db.add(contact)
db.commit()
db.refresh(contact)
conversation = db.query(Conversation).filter(
Conversation.whatsapp_account_id == account.id,
Conversation.contact_id == contact.id,
Conversation.status != ConversationStatus.RESOLVED,
).first()
if not conversation:
conversation = Conversation(
whatsapp_account_id=account.id,
contact_id=contact.id,
status=ConversationStatus.BOT,
)
db.add(conversation)
db.commit()
db.refresh(conversation)
wa_message = msg_data.get("message", {})
content = (
wa_message.get("conversation") or
wa_message.get("extendedTextMessage", {}).get("text") or
"[Media]"
)
message = Message(
conversation_id=conversation.id,
whatsapp_message_id=msg_data.get("id"),
direction=MessageDirection.INBOUND,
type=MessageType.TEXT,
content=content,
status=MessageStatus.DELIVERED,
)
db.add(message)
from datetime import datetime
conversation.last_message_at = datetime.utcnow()
db.commit()
return {"status": "ok"}
class FlowSendRequest(BaseModel):
conversation_id: str
content: str
type: str = "text"
@router.post("/internal/flow/send")
async def flow_send_message(
request: FlowSendRequest,
db: Session = Depends(get_db),
):
"""Internal endpoint for flow engine to send messages"""
conversation = db.query(Conversation).filter(
Conversation.id == request.conversation_id
).first()
if not conversation:
raise HTTPException(status_code=404, detail="Conversation not found")
# Create message in DB
message = Message(
conversation_id=conversation.id,
direction=MessageDirection.OUTBOUND,
type=MessageType.TEXT,
content=request.content,
status=MessageStatus.PENDING,
)
db.add(message)
db.commit()
db.refresh(message)
# Send via WhatsApp Core
async with httpx.AsyncClient() as client:
try:
response = await client.post(
f"{settings.WHATSAPP_CORE_URL}/api/sessions/{conversation.whatsapp_account_id}/messages",
json={
"to": conversation.contact.phone_number,
"type": "text",
"content": {"text": request.content},
},
timeout=30,
)
if response.status_code == 200:
data = response.json()
message.whatsapp_message_id = data.get("messageId")
message.status = MessageStatus.SENT
else:
message.status = MessageStatus.FAILED
except Exception:
message.status = MessageStatus.FAILED
db.commit()
return {"success": True, "message_id": str(message.id)}