from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from typing import List from uuid import UUID import httpx from pydantic import BaseModel from app.core.database import get_db from app.core.config import get_settings from app.core.security import get_current_user from app.models.user import User, UserRole from app.models.whatsapp import ( WhatsAppAccount, Contact, Conversation, Message, AccountStatus, MessageDirection, MessageType, MessageStatus, ConversationStatus ) from app.schemas.whatsapp import ( WhatsAppAccountCreate, WhatsAppAccountResponse, ConversationResponse, ConversationDetailResponse, SendMessageRequest, MessageResponse, InternalEventRequest, TransferToQueueRequest, TransferToAgentRequest, ResolveConversationRequest, InternalNoteRequest ) from app.services.assignment import AssignmentService router = APIRouter(prefix="/api/whatsapp", tags=["whatsapp"]) settings = get_settings() def require_admin(current_user: User = Depends(get_current_user)): if current_user.role != UserRole.ADMIN: raise HTTPException(status_code=403, detail="Admin required") return current_user @router.post("/accounts", response_model=WhatsAppAccountResponse) async def create_account( request: WhatsAppAccountCreate, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): account = WhatsAppAccount(name=request.name) db.add(account) db.commit() db.refresh(account) # Start session in WhatsApp Core async with httpx.AsyncClient() as client: try: response = await client.post( f"{settings.WHATSAPP_CORE_URL}/api/sessions", json={"accountId": str(account.id), "name": account.name}, timeout=30, ) if response.status_code == 200: data = response.json() account.qr_code = data.get("qrCode") account.status = AccountStatus(data.get("status", "connecting")) db.commit() except Exception: pass return account @router.get("/accounts", response_model=List[WhatsAppAccountResponse]) async def list_accounts( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): accounts = db.query(WhatsAppAccount).all() # Sync status with WhatsApp Core for each account async with httpx.AsyncClient() as client: for account in accounts: try: response = await client.get( f"{settings.WHATSAPP_CORE_URL}/api/sessions/{account.id}", timeout=5, ) if response.status_code == 200: data = response.json() account.qr_code = data.get("qrCode") account.status = AccountStatus(data.get("status", "disconnected")) account.phone_number = data.get("phoneNumber") except Exception: pass db.commit() return accounts @router.get("/accounts/{account_id}", response_model=WhatsAppAccountResponse) async def get_account( account_id: UUID, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): account = db.query(WhatsAppAccount).filter(WhatsAppAccount.id == account_id).first() if not account: raise HTTPException(status_code=404, detail="Account not found") async with httpx.AsyncClient() as client: try: response = await client.get( f"{settings.WHATSAPP_CORE_URL}/api/sessions/{account_id}", timeout=10, ) if response.status_code == 200: data = response.json() account.qr_code = data.get("qrCode") account.status = AccountStatus(data.get("status", "disconnected")) account.phone_number = data.get("phoneNumber") db.commit() except Exception: pass return account @router.delete("/accounts/{account_id}") async def delete_account( account_id: UUID, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): account = db.query(WhatsAppAccount).filter(WhatsAppAccount.id == account_id).first() if not account: raise HTTPException(status_code=404, detail="Account not found") async with httpx.AsyncClient() as client: try: await client.delete( f"{settings.WHATSAPP_CORE_URL}/api/sessions/{account_id}", timeout=10, ) except Exception: pass db.delete(account) db.commit() return {"success": True} @router.post("/accounts/{account_id}/pause") async def pause_account( account_id: UUID, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): """Pause WhatsApp connection without logging out (preserves session)""" account = db.query(WhatsAppAccount).filter(WhatsAppAccount.id == account_id).first() if not account: raise HTTPException(status_code=404, detail="Account not found") async with httpx.AsyncClient() as client: try: response = await client.post( f"{settings.WHATSAPP_CORE_URL}/api/sessions/{account_id}/pause", timeout=10, ) if response.status_code == 200: account.status = AccountStatus.DISCONNECTED db.commit() return {"success": True, "status": "paused"} else: raise HTTPException(status_code=500, detail="Failed to pause session") except httpx.RequestError as e: raise HTTPException(status_code=500, detail=f"Connection error: {str(e)}") @router.post("/accounts/{account_id}/resume") async def resume_account( account_id: UUID, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): """Resume paused WhatsApp connection""" account = db.query(WhatsAppAccount).filter(WhatsAppAccount.id == account_id).first() if not account: raise HTTPException(status_code=404, detail="Account not found") async with httpx.AsyncClient() as client: try: response = await client.post( f"{settings.WHATSAPP_CORE_URL}/api/sessions/{account_id}/resume", timeout=30, ) if response.status_code == 200: data = response.json() session = data.get("session", {}) account.status = AccountStatus(session.get("status", "connecting")) account.qr_code = session.get("qrCode") db.commit() return {"success": True, "status": account.status.value} else: raise HTTPException(status_code=500, detail="Failed to resume session") except httpx.RequestError as e: raise HTTPException(status_code=500, detail=f"Connection error: {str(e)}") @router.get("/conversations", response_model=List[ConversationResponse]) def list_conversations( status: ConversationStatus = None, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): query = db.query(Conversation) if status: query = query.filter(Conversation.status == status) conversations = query.order_by(Conversation.last_message_at.desc()).limit(50).all() return conversations @router.get("/conversations/{conversation_id}", response_model=ConversationDetailResponse) def get_conversation( conversation_id: UUID, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): conversation = db.query(Conversation).filter(Conversation.id == conversation_id).first() if not conversation: raise HTTPException(status_code=404, detail="Conversation not found") return conversation @router.post("/conversations/{conversation_id}/messages", response_model=MessageResponse) async def send_message( conversation_id: UUID, request: SendMessageRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): conversation = db.query(Conversation).filter(Conversation.id == conversation_id).first() if not conversation: raise HTTPException(status_code=404, detail="Conversation not found") message = Message( conversation_id=conversation.id, direction=MessageDirection.OUTBOUND, type=request.type, content=request.content, media_url=request.media_url, sent_by=current_user.id, status=MessageStatus.PENDING, ) db.add(message) db.commit() db.refresh(message) async with httpx.AsyncClient() as client: try: response = await client.post( f"{settings.WHATSAPP_CORE_URL}/api/sessions/{conversation.whatsapp_account_id}/messages", json={ "to": conversation.contact.phone_number, "type": request.type.value, "content": {"text": request.content} if request.type == MessageType.TEXT else { "url": request.media_url, "caption": request.content, }, }, timeout=30, ) if response.status_code == 200: data = response.json() message.whatsapp_message_id = data.get("messageId") message.status = MessageStatus.SENT else: message.status = MessageStatus.FAILED except Exception: message.status = MessageStatus.FAILED db.commit() db.refresh(message) return message # Internal endpoint for WhatsApp Core events @router.post("/internal/whatsapp/event") async def handle_whatsapp_event( event: InternalEventRequest, db: Session = Depends(get_db), ): account = db.query(WhatsAppAccount).filter( WhatsAppAccount.id == event.accountId ).first() if not account: return {"status": "ignored", "reason": "account not found"} if event.type == "qr": account.qr_code = event.data.get("qrCode") account.status = AccountStatus.CONNECTING elif event.type == "connected": account.status = AccountStatus.CONNECTED account.phone_number = event.data.get("phoneNumber") account.qr_code = None elif event.type == "disconnected": account.status = AccountStatus.DISCONNECTED account.qr_code = None elif event.type == "message": msg_data = event.data phone = msg_data.get("from", "").split("@")[0] is_from_me = msg_data.get("fromMe", False) contact = db.query(Contact).filter(Contact.phone_number == phone).first() if not contact: contact = Contact( phone_number=phone, name=msg_data.get("pushName"), ) db.add(contact) db.commit() db.refresh(contact) conversation = db.query(Conversation).filter( Conversation.whatsapp_account_id == account.id, Conversation.contact_id == contact.id, Conversation.status != ConversationStatus.RESOLVED, ).first() if not conversation: conversation = Conversation( whatsapp_account_id=account.id, contact_id=contact.id, status=ConversationStatus.BOT, ) db.add(conversation) db.commit() db.refresh(conversation) wa_message = msg_data.get("message", {}) media_url = msg_data.get("mediaUrl") media_type = msg_data.get("mediaType", "text") # Extract text content content = ( wa_message.get("conversation") or wa_message.get("extendedTextMessage", {}).get("text") or wa_message.get("imageMessage", {}).get("caption") or wa_message.get("videoMessage", {}).get("caption") or wa_message.get("documentMessage", {}).get("fileName") or "" ) # Map media type to MessageType type_mapping = { "text": MessageType.TEXT, "image": MessageType.IMAGE, "audio": MessageType.AUDIO, "video": MessageType.VIDEO, "document": MessageType.DOCUMENT, "sticker": MessageType.IMAGE, } msg_type = type_mapping.get(media_type, MessageType.TEXT) # Build full media URL if present (use relative URL for browser access via nginx proxy) full_media_url = None if media_url: # Use relative URL that nginx will proxy to whatsapp-core full_media_url = media_url # e.g., "/media/uuid.jpg" # Set direction based on fromMe flag direction = MessageDirection.OUTBOUND if is_from_me else MessageDirection.INBOUND message = Message( conversation_id=conversation.id, whatsapp_message_id=msg_data.get("id"), direction=direction, type=msg_type, content=content if content else f"[{media_type.capitalize()}]", media_url=full_media_url, status=MessageStatus.DELIVERED if not is_from_me else MessageStatus.SENT, ) db.add(message) from datetime import datetime conversation.last_message_at = datetime.utcnow() db.commit() db.refresh(message) # Process message through Flow Engine (only for inbound messages in BOT status) if not is_from_me and conversation.status == ConversationStatus.BOT: async with httpx.AsyncClient() as client: try: await client.post( f"{settings.FLOW_ENGINE_URL}/process", json={ "conversation_id": str(conversation.id), "contact": { "id": str(contact.id), "phone_number": contact.phone_number, "name": contact.name, }, "conversation": { "id": str(conversation.id), "status": conversation.status.value, }, "message": { "id": str(message.id), "content": content, "type": message.type.value, }, }, timeout=30, ) except Exception as e: print(f"Flow engine error: {e}") # Send webhook to Odoo if configured if settings.ODOO_WEBHOOK_URL: async with httpx.AsyncClient() as client: try: await client.post( settings.ODOO_WEBHOOK_URL, json={ "type": "message", "account_id": str(account.id), "data": { "id": str(message.id), "conversation_id": str(conversation.id), "from": phone, "contact_name": contact.name, "content": content, "type": media_type, "direction": "outbound" if is_from_me else "inbound", "media_url": full_media_url, }, }, timeout=10, ) except Exception as e: print(f"Odoo webhook error: {e}") return {"status": "ok"} # Send account status to Odoo webhook if settings.ODOO_WEBHOOK_URL and event.type in ["connected", "disconnected", "qr"]: async with httpx.AsyncClient() as client: try: await client.post( settings.ODOO_WEBHOOK_URL, json={ "type": "account_status", "account_id": str(account.id), "data": { "status": account.status.value if account.status else "disconnected", "phone_number": account.phone_number, "qr_code": account.qr_code, }, }, timeout=10, ) except Exception as e: print(f"Odoo webhook error: {e}") db.commit() return {"status": "ok"} class FlowSendRequest(BaseModel): conversation_id: str content: str type: str = "text" @router.post("/internal/flow/send") async def flow_send_message( request: FlowSendRequest, db: Session = Depends(get_db), ): """Internal endpoint for flow engine to send messages""" conversation = db.query(Conversation).filter( Conversation.id == request.conversation_id ).first() if not conversation: raise HTTPException(status_code=404, detail="Conversation not found") # Create message in DB message = Message( conversation_id=conversation.id, direction=MessageDirection.OUTBOUND, type=MessageType.TEXT, content=request.content, status=MessageStatus.PENDING, ) db.add(message) db.commit() db.refresh(message) # Send via WhatsApp Core async with httpx.AsyncClient() as client: try: response = await client.post( f"{settings.WHATSAPP_CORE_URL}/api/sessions/{conversation.whatsapp_account_id}/messages", json={ "to": conversation.contact.phone_number, "type": "text", "content": {"text": request.content}, }, timeout=30, ) if response.status_code == 200: data = response.json() message.whatsapp_message_id = data.get("messageId") message.status = MessageStatus.SENT else: message.status = MessageStatus.FAILED except Exception: message.status = MessageStatus.FAILED db.commit() return {"success": True, "message_id": str(message.id)} @router.post("/conversations/{conversation_id}/transfer-to-queue") def transfer_to_queue( conversation_id: UUID, request: TransferToQueueRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): service = AssignmentService(db) if service.transfer_to_queue(conversation_id, request.queue_id): return {"success": True} raise HTTPException(status_code=400, detail="Transfer failed") @router.post("/conversations/{conversation_id}/transfer-to-agent") def transfer_to_agent( conversation_id: UUID, request: TransferToAgentRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): service = AssignmentService(db) if service.transfer_to_agent(conversation_id, request.agent_id): return {"success": True} raise HTTPException(status_code=400, detail="Transfer failed") @router.post("/conversations/{conversation_id}/transfer-to-bot") def transfer_to_bot( conversation_id: UUID, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): service = AssignmentService(db) if service.transfer_to_bot(conversation_id): return {"success": True} raise HTTPException(status_code=400, detail="Transfer failed") @router.post("/conversations/{conversation_id}/resolve") def resolve_conversation( conversation_id: UUID, request: ResolveConversationRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): service = AssignmentService(db) if service.resolve_conversation( conversation_id, request.csat_score, request.csat_feedback ): return {"success": True} raise HTTPException(status_code=400, detail="Failed to resolve") @router.post("/conversations/{conversation_id}/notes") def add_internal_note( conversation_id: UUID, request: InternalNoteRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): conversation = db.query(Conversation).filter( Conversation.id == conversation_id ).first() if not conversation: raise HTTPException(status_code=404, detail="Conversation not found") message = Message( conversation_id=conversation.id, direction=MessageDirection.OUTBOUND, type=MessageType.TEXT, content=request.content, sent_by=current_user.id, is_internal_note=True, status=MessageStatus.DELIVERED, ) db.add(message) db.commit() db.refresh(message) return {"success": True, "message_id": str(message.id)} # ============================================ # Odoo Internal Endpoints (no authentication) # ============================================ class OdooSendMessageRequest(BaseModel): phone_number: str message: str account_id: str @router.get("/internal/odoo/accounts/{account_id}") async def odoo_get_account( account_id: UUID, db: Session = Depends(get_db), ): """Get account status for Odoo (no auth required)""" account = db.query(WhatsAppAccount).filter(WhatsAppAccount.id == account_id).first() if not account: raise HTTPException(status_code=404, detail="Account not found") # Sync with WhatsApp Core async with httpx.AsyncClient() as client: try: response = await client.get( f"{settings.WHATSAPP_CORE_URL}/api/sessions/{account_id}", timeout=10, ) if response.status_code == 200: data = response.json() account.qr_code = data.get("qrCode") account.status = AccountStatus(data.get("status", "disconnected")) account.phone_number = data.get("phoneNumber") db.commit() except Exception: pass return { "id": str(account.id), "phone_number": account.phone_number, "name": account.name, "status": account.status.value if account.status else "disconnected", "qr_code": account.qr_code, } @router.get("/internal/odoo/accounts") async def odoo_list_accounts( db: Session = Depends(get_db), ): """List all accounts for Odoo (no auth required)""" accounts = db.query(WhatsAppAccount).all() async with httpx.AsyncClient() as client: for account in accounts: try: response = await client.get( f"{settings.WHATSAPP_CORE_URL}/api/sessions/{account.id}", timeout=5, ) if response.status_code == 200: data = response.json() account.qr_code = data.get("qrCode") account.status = AccountStatus(data.get("status", "disconnected")) account.phone_number = data.get("phoneNumber") except Exception: pass db.commit() return [ { "id": str(a.id), "phone_number": a.phone_number, "name": a.name, "status": a.status.value if a.status else "disconnected", } for a in accounts ] @router.post("/internal/odoo/send") async def odoo_send_message( request: OdooSendMessageRequest, db: Session = Depends(get_db), ): """Send WhatsApp message from Odoo (no auth required)""" account = db.query(WhatsAppAccount).filter( WhatsAppAccount.id == request.account_id ).first() if not account: raise HTTPException(status_code=404, detail="Account not found") # Find or create contact phone = request.phone_number.replace("+", "").replace(" ", "").replace("-", "") contact = db.query(Contact).filter(Contact.phone_number == phone).first() if not contact: contact = Contact(phone_number=phone) db.add(contact) db.commit() db.refresh(contact) # Find or create conversation conversation = db.query(Conversation).filter( Conversation.whatsapp_account_id == account.id, Conversation.contact_id == contact.id, Conversation.status != ConversationStatus.RESOLVED, ).first() if not conversation: conversation = Conversation( whatsapp_account_id=account.id, contact_id=contact.id, status=ConversationStatus.BOT, ) db.add(conversation) db.commit() db.refresh(conversation) # Create message message = Message( conversation_id=conversation.id, direction=MessageDirection.OUTBOUND, type=MessageType.TEXT, content=request.message, status=MessageStatus.PENDING, ) db.add(message) db.commit() db.refresh(message) # Send via WhatsApp Core async with httpx.AsyncClient() as client: try: response = await client.post( f"{settings.WHATSAPP_CORE_URL}/api/sessions/{account.id}/messages", json={ "to": phone, "type": "text", "content": {"text": request.message}, }, timeout=30, ) if response.status_code == 200: data = response.json() message.whatsapp_message_id = data.get("messageId") message.status = MessageStatus.SENT else: message.status = MessageStatus.FAILED except Exception as e: message.status = MessageStatus.FAILED raise HTTPException(status_code=500, detail=f"Failed to send: {e}") db.commit() return {"success": True, "message_id": str(message.id)} @router.get("/internal/odoo/conversations") def odoo_list_conversations( account_id: str = None, db: Session = Depends(get_db), ): """List conversations for Odoo (no auth required)""" query = db.query(Conversation) if account_id: query = query.filter(Conversation.whatsapp_account_id == account_id) conversations = query.order_by(Conversation.last_message_at.desc()).limit(100).all() return [ { "id": str(c.id), "contact_phone": c.contact.phone_number if c.contact else None, "contact_name": c.contact.name if c.contact else None, "status": c.status.value if c.status else None, "last_message_at": c.last_message_at.isoformat() if c.last_message_at else None, } for c in conversations ]