from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from typing import List from uuid import UUID import httpx from pydantic import BaseModel from app.core.database import get_db from app.core.config import get_settings from app.core.security import get_current_user from app.models.user import User, UserRole from app.models.whatsapp import ( WhatsAppAccount, Contact, Conversation, Message, AccountStatus, MessageDirection, MessageType, MessageStatus, ConversationStatus ) from app.schemas.whatsapp import ( WhatsAppAccountCreate, WhatsAppAccountResponse, ConversationResponse, ConversationDetailResponse, SendMessageRequest, MessageResponse, InternalEventRequest ) router = APIRouter(prefix="/api/whatsapp", tags=["whatsapp"]) settings = get_settings() def require_admin(current_user: User = Depends(get_current_user)): if current_user.role != UserRole.ADMIN: raise HTTPException(status_code=403, detail="Admin required") return current_user @router.post("/accounts", response_model=WhatsAppAccountResponse) async def create_account( request: WhatsAppAccountCreate, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): account = WhatsAppAccount(name=request.name) db.add(account) db.commit() db.refresh(account) # Start session in WhatsApp Core async with httpx.AsyncClient() as client: try: response = await client.post( f"{settings.WHATSAPP_CORE_URL}/api/sessions", json={"accountId": str(account.id), "name": account.name}, timeout=30, ) if response.status_code == 200: data = response.json() account.qr_code = data.get("qrCode") account.status = AccountStatus(data.get("status", "connecting")) db.commit() except Exception: pass return account @router.get("/accounts", response_model=List[WhatsAppAccountResponse]) def list_accounts( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): accounts = db.query(WhatsAppAccount).all() return accounts @router.get("/accounts/{account_id}", response_model=WhatsAppAccountResponse) async def get_account( account_id: UUID, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): account = db.query(WhatsAppAccount).filter(WhatsAppAccount.id == account_id).first() if not account: raise HTTPException(status_code=404, detail="Account not found") async with httpx.AsyncClient() as client: try: response = await client.get( f"{settings.WHATSAPP_CORE_URL}/api/sessions/{account_id}", timeout=10, ) if response.status_code == 200: data = response.json() account.qr_code = data.get("qrCode") account.status = AccountStatus(data.get("status", "disconnected")) account.phone_number = data.get("phoneNumber") db.commit() except Exception: pass return account @router.delete("/accounts/{account_id}") async def delete_account( account_id: UUID, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): account = db.query(WhatsAppAccount).filter(WhatsAppAccount.id == account_id).first() if not account: raise HTTPException(status_code=404, detail="Account not found") async with httpx.AsyncClient() as client: try: await client.delete( f"{settings.WHATSAPP_CORE_URL}/api/sessions/{account_id}", timeout=10, ) except Exception: pass db.delete(account) db.commit() return {"success": True} @router.get("/conversations", response_model=List[ConversationResponse]) def list_conversations( status: ConversationStatus = None, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): query = db.query(Conversation) if status: query = query.filter(Conversation.status == status) conversations = query.order_by(Conversation.last_message_at.desc()).limit(50).all() return conversations @router.get("/conversations/{conversation_id}", response_model=ConversationDetailResponse) def get_conversation( conversation_id: UUID, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): conversation = db.query(Conversation).filter(Conversation.id == conversation_id).first() if not conversation: raise HTTPException(status_code=404, detail="Conversation not found") return conversation @router.post("/conversations/{conversation_id}/messages", response_model=MessageResponse) async def send_message( conversation_id: UUID, request: SendMessageRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): conversation = db.query(Conversation).filter(Conversation.id == conversation_id).first() if not conversation: raise HTTPException(status_code=404, detail="Conversation not found") message = Message( conversation_id=conversation.id, direction=MessageDirection.OUTBOUND, type=request.type, content=request.content, media_url=request.media_url, sent_by=current_user.id, status=MessageStatus.PENDING, ) db.add(message) db.commit() db.refresh(message) async with httpx.AsyncClient() as client: try: response = await client.post( f"{settings.WHATSAPP_CORE_URL}/api/sessions/{conversation.whatsapp_account_id}/messages", json={ "to": conversation.contact.phone_number, "type": request.type.value, "content": {"text": request.content} if request.type == MessageType.TEXT else { "url": request.media_url, "caption": request.content, }, }, timeout=30, ) if response.status_code == 200: data = response.json() message.whatsapp_message_id = data.get("messageId") message.status = MessageStatus.SENT else: message.status = MessageStatus.FAILED except Exception: message.status = MessageStatus.FAILED db.commit() db.refresh(message) return message # Internal endpoint for WhatsApp Core events @router.post("/internal/whatsapp/event") async def handle_whatsapp_event( event: InternalEventRequest, db: Session = Depends(get_db), ): account = db.query(WhatsAppAccount).filter( WhatsAppAccount.id == event.accountId ).first() if not account: return {"status": "ignored", "reason": "account not found"} if event.type == "qr": account.qr_code = event.data.get("qrCode") account.status = AccountStatus.CONNECTING elif event.type == "connected": account.status = AccountStatus.CONNECTED account.phone_number = event.data.get("phoneNumber") account.qr_code = None elif event.type == "disconnected": account.status = AccountStatus.DISCONNECTED account.qr_code = None elif event.type == "message": msg_data = event.data phone = msg_data.get("from", "").split("@")[0] contact = db.query(Contact).filter(Contact.phone_number == phone).first() if not contact: contact = Contact( phone_number=phone, name=msg_data.get("pushName"), ) db.add(contact) db.commit() db.refresh(contact) conversation = db.query(Conversation).filter( Conversation.whatsapp_account_id == account.id, Conversation.contact_id == contact.id, Conversation.status != ConversationStatus.RESOLVED, ).first() if not conversation: conversation = Conversation( whatsapp_account_id=account.id, contact_id=contact.id, status=ConversationStatus.BOT, ) db.add(conversation) db.commit() db.refresh(conversation) wa_message = msg_data.get("message", {}) content = ( wa_message.get("conversation") or wa_message.get("extendedTextMessage", {}).get("text") or "[Media]" ) message = Message( conversation_id=conversation.id, whatsapp_message_id=msg_data.get("id"), direction=MessageDirection.INBOUND, type=MessageType.TEXT, content=content, status=MessageStatus.DELIVERED, ) db.add(message) from datetime import datetime conversation.last_message_at = datetime.utcnow() db.commit() db.refresh(message) # Process message through Flow Engine (if in BOT status) if conversation.status == ConversationStatus.BOT: async with httpx.AsyncClient() as client: try: await client.post( f"{settings.FLOW_ENGINE_URL}/process", json={ "conversation_id": str(conversation.id), "contact": { "id": str(contact.id), "phone_number": contact.phone_number, "name": contact.name, }, "conversation": { "id": str(conversation.id), "status": conversation.status.value, }, "message": { "id": str(message.id), "content": content, "type": message.type.value, }, }, timeout=30, ) except Exception as e: print(f"Flow engine error: {e}") return {"status": "ok"} db.commit() return {"status": "ok"} class FlowSendRequest(BaseModel): conversation_id: str content: str type: str = "text" @router.post("/internal/flow/send") async def flow_send_message( request: FlowSendRequest, db: Session = Depends(get_db), ): """Internal endpoint for flow engine to send messages""" conversation = db.query(Conversation).filter( Conversation.id == request.conversation_id ).first() if not conversation: raise HTTPException(status_code=404, detail="Conversation not found") # Create message in DB message = Message( conversation_id=conversation.id, direction=MessageDirection.OUTBOUND, type=MessageType.TEXT, content=request.content, status=MessageStatus.PENDING, ) db.add(message) db.commit() db.refresh(message) # Send via WhatsApp Core async with httpx.AsyncClient() as client: try: response = await client.post( f"{settings.WHATSAPP_CORE_URL}/api/sessions/{conversation.whatsapp_account_id}/messages", json={ "to": conversation.contact.phone_number, "type": "text", "content": {"text": request.content}, }, timeout=30, ) if response.status_code == 200: data = response.json() message.whatsapp_message_id = data.get("messageId") message.status = MessageStatus.SENT else: message.status = MessageStatus.FAILED except Exception: message.status = MessageStatus.FAILED db.commit() return {"success": True, "message_id": str(message.id)}