WhatsAppCentralizado/docs/plans/2026-01-29-fase-2-flow-engine.md
2026-01-29 10:13:58 +00:00

Phase 2: Basic Flow Engine - Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Build the chatbot engine with a visual Flow Builder to automate WhatsApp conversations.

Architecture: The Flow Engine (Python) processes incoming messages, evaluates triggers, executes nodes, and manages state. The frontend uses React Flow for a visual drag-and-drop editor. Flows are stored as JSON in PostgreSQL.

Tech Stack: Python 3.11, FastAPI, React Flow, Zustand, PostgreSQL JSONB, Redis for session state.
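
To make the storage format concrete, here is a minimal sketch of what a stored flow might look like, assuming the node and edge shapes used by the schemas and the Flow Builder later in this plan. The ids (`trigger-1`, `message-1`, `e1`) are hypothetical, and the round trip through `json` only illustrates that the structure is plain JSON, which is what a JSONB column holds.

```python
import json

# Hypothetical two-node flow: a trigger node followed by a message node.
flow = {
    "nodes": [
        {
            "id": "trigger-1",
            "type": "trigger",
            "position": {"x": 250, "y": 50},
            "data": {"label": "Inicio", "type": "trigger", "config": {}},
        },
        {
            "id": "message-1",
            "type": "message",
            "position": {"x": 250, "y": 150},
            "data": {
                "label": "message",
                "type": "message",
                "config": {"text": "Hola {{contact.name}}!"},
            },
        },
    ],
    "edges": [
        {
            "id": "e1",
            "source": "trigger-1",
            "target": "message-1",
            "sourceHandle": None,
            "targetHandle": None,
        },
    ],
}

# The structure survives a JSON round trip unchanged.
serialized = json.dumps(flow)
restored = json.loads(serialized)
```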


Task 1: Database Models para Flows

Files:

  • Create: services/api-gateway/app/models/flow.py
  • Modify: services/api-gateway/app/models/__init__.py

Step 1: Create flow.py

import uuid
from datetime import datetime
from sqlalchemy import Column, String, Boolean, DateTime, Text, Integer, Enum as SQLEnum
from sqlalchemy.dialects.postgresql import UUID, JSONB
import enum
from app.core.database import Base


class TriggerType(str, enum.Enum):
    WELCOME = "welcome"
    KEYWORD = "keyword"
    FALLBACK = "fallback"
    EVENT = "event"
    MANUAL = "manual"


class Flow(Base):
    __tablename__ = "flows"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(100), nullable=False)
    description = Column(Text, nullable=True)
    trigger_type = Column(SQLEnum(TriggerType), nullable=False)
    trigger_value = Column(String(255), nullable=True)  # keywords, event name, etc.
    nodes = Column(JSONB, default=list)  # Array of node definitions
    edges = Column(JSONB, default=list)  # React Flow edges
    variables = Column(JSONB, default=dict)  # Flow-level variables
    is_active = Column(Boolean, default=False, nullable=False)
    version = Column(Integer, default=1, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)


class FlowSession(Base):
    """Active flow execution state per conversation"""
    __tablename__ = "flow_sessions"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    conversation_id = Column(UUID(as_uuid=True), nullable=False, index=True)
    flow_id = Column(UUID(as_uuid=True), nullable=False)
    current_node_id = Column(String(100), nullable=True)
    variables = Column(JSONB, default=dict)  # Session variables
    waiting_for_input = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)

Step 2: Update models/__init__.py

from app.models.user import User
from app.models.whatsapp import WhatsAppAccount, Contact, Conversation, Message
from app.models.flow import Flow, FlowSession, TriggerType

__all__ = [
    "User",
    "WhatsAppAccount", "Contact", "Conversation", "Message",
    "Flow", "FlowSession", "TriggerType"
]

Step 3: Commit

git add services/api-gateway/app/models/
git commit -m "feat(api-gateway): add Flow and FlowSession database models"

Task 2: Flow API Schemas

Files:

  • Create: services/api-gateway/app/schemas/flow.py
  • Modify: services/api-gateway/app/schemas/__init__.py

Step 1: Create flow.py

from pydantic import BaseModel
from typing import Optional, List, Any
from uuid import UUID
from datetime import datetime
from app.models.flow import TriggerType


class NodeData(BaseModel):
    label: str
    type: str  # message, condition, wait_input, buttons, etc.
    config: dict = {}


class FlowNode(BaseModel):
    id: str
    type: str
    position: dict  # {x: number, y: number}
    data: NodeData


class FlowEdge(BaseModel):
    id: str
    source: str
    target: str
    sourceHandle: Optional[str] = None
    targetHandle: Optional[str] = None


class FlowCreate(BaseModel):
    name: str
    description: Optional[str] = None
    trigger_type: TriggerType
    trigger_value: Optional[str] = None


class FlowUpdate(BaseModel):
    name: Optional[str] = None
    description: Optional[str] = None
    trigger_type: Optional[TriggerType] = None
    trigger_value: Optional[str] = None
    nodes: Optional[List[dict]] = None
    edges: Optional[List[dict]] = None
    variables: Optional[dict] = None
    is_active: Optional[bool] = None


class FlowResponse(BaseModel):
    id: UUID
    name: str
    description: Optional[str]
    trigger_type: TriggerType
    trigger_value: Optional[str]
    nodes: List[dict]
    edges: List[dict]
    variables: dict
    is_active: bool
    version: int
    created_at: datetime
    updated_at: datetime

    class Config:
        from_attributes = True


class FlowListResponse(BaseModel):
    id: UUID
    name: str
    trigger_type: TriggerType
    is_active: bool
    version: int
    updated_at: datetime

    class Config:
        from_attributes = True

Step 2: Update schemas/__init__.py

from app.schemas import auth, whatsapp, flow

__all__ = ["auth", "whatsapp", "flow"]

Step 3: Commit

git add services/api-gateway/app/schemas/
git commit -m "feat(api-gateway): add Flow API schemas"

Task 3: Flow API Routes

Files:

  • Create: services/api-gateway/app/routers/flows.py
  • Modify: services/api-gateway/app/routers/__init__.py
  • Modify: services/api-gateway/app/main.py

Step 1: Create flows.py

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from uuid import UUID
from app.core.database import get_db
from app.core.security import get_current_user
from app.models.user import User, UserRole
from app.models.flow import Flow, TriggerType
from app.schemas.flow import FlowCreate, FlowUpdate, FlowResponse, FlowListResponse

router = APIRouter(prefix="/api/flows", tags=["flows"])


def require_admin(current_user: User = Depends(get_current_user)):
    if current_user.role != UserRole.ADMIN:
        raise HTTPException(status_code=403, detail="Admin required")
    return current_user


@router.get("", response_model=List[FlowListResponse])
def list_flows(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    flows = db.query(Flow).order_by(Flow.updated_at.desc()).all()
    return flows


@router.post("", response_model=FlowResponse)
def create_flow(
    request: FlowCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    flow = Flow(
        name=request.name,
        description=request.description,
        trigger_type=request.trigger_type,
        trigger_value=request.trigger_value,
        nodes=[],
        edges=[],
        variables={},
    )
    db.add(flow)
    db.commit()
    db.refresh(flow)
    return flow


@router.get("/{flow_id}", response_model=FlowResponse)
def get_flow(
    flow_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if not flow:
        raise HTTPException(status_code=404, detail="Flow not found")
    return flow


@router.put("/{flow_id}", response_model=FlowResponse)
def update_flow(
    flow_id: UUID,
    request: FlowUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if not flow:
        raise HTTPException(status_code=404, detail="Flow not found")

    update_data = request.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(flow, field, value)

    flow.version += 1
    db.commit()
    db.refresh(flow)
    return flow


@router.delete("/{flow_id}")
def delete_flow(
    flow_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if not flow:
        raise HTTPException(status_code=404, detail="Flow not found")

    db.delete(flow)
    db.commit()
    return {"success": True}


@router.post("/{flow_id}/activate")
def activate_flow(
    flow_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if not flow:
        raise HTTPException(status_code=404, detail="Flow not found")

    # Deactivate other flows with same trigger if welcome/fallback
    if flow.trigger_type in [TriggerType.WELCOME, TriggerType.FALLBACK]:
        db.query(Flow).filter(
            Flow.trigger_type == flow.trigger_type,
            Flow.id != flow_id
        ).update({"is_active": False})

    flow.is_active = True
    db.commit()
    return {"success": True}


@router.post("/{flow_id}/deactivate")
def deactivate_flow(
    flow_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if not flow:
        raise HTTPException(status_code=404, detail="Flow not found")

    flow.is_active = False
    db.commit()
    return {"success": True}
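
The activation rule in `activate_flow` (only one active flow per `welcome` or `fallback` trigger, any number of active `keyword` flows) can be sketched without a database. This is an illustration only: `FlowRow`, `EXCLUSIVE_TRIGGERS`, and `activate` are hypothetical names, and the in-memory list stands in for the `flows` table.

```python
from dataclasses import dataclass


@dataclass
class FlowRow:
    """Hypothetical in-memory stand-in for a row of the flows table."""
    id: str
    trigger_type: str
    is_active: bool = False


# Trigger types that allow a single active flow at a time.
EXCLUSIVE_TRIGGERS = {"welcome", "fallback"}


def activate(flows: list, flow_id: str) -> FlowRow:
    """Activate one flow, deactivating siblings when the trigger is exclusive."""
    target = next(f for f in flows if f.id == flow_id)
    if target.trigger_type in EXCLUSIVE_TRIGGERS:
        for other in flows:
            if other.trigger_type == target.trigger_type and other.id != flow_id:
                other.is_active = False
    target.is_active = True
    return target
```

Activating a second `welcome` flow switches the first one off, while activating another `keyword` flow leaves existing keyword flows untouched.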

Step 2: Update routers/__init__.py

from app.routers import auth, whatsapp, flows

__all__ = ["auth", "whatsapp", "flows"]

Step 3: Update main.py - add import and router

Add to imports:

from app.routers import auth, whatsapp, flows

Add router:

app.include_router(flows.router)

Step 4: Commit

git add services/api-gateway/app/routers/ services/api-gateway/app/main.py
git commit -m "feat(api-gateway): add Flow CRUD API routes"

Task 4: Flow Engine Service Setup

Files:

  • Create: services/flow-engine/requirements.txt
  • Create: services/flow-engine/Dockerfile
  • Create: services/flow-engine/app/__init__.py
  • Create: services/flow-engine/app/config.py

Step 1: Create requirements.txt

fastapi==0.115.6
uvicorn[standard]==0.34.0
sqlalchemy==2.0.36
psycopg2-binary==2.9.10
redis==5.2.1
httpx==0.28.1
pydantic==2.10.4
pydantic-settings==2.7.1

Step 2: Create Dockerfile

FROM python:3.11-slim

WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY app ./app

EXPOSE 8001

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8001"]

Step 3: Create app/__init__.py

# WhatsApp Centralizado - Flow Engine

Step 4: Create app/config.py

from pydantic_settings import BaseSettings
from functools import lru_cache


class Settings(BaseSettings):
    DATABASE_URL: str = "postgresql://whatsapp_admin:password@localhost:5432/whatsapp_central"
    REDIS_URL: str = "redis://localhost:6379"
    API_GATEWAY_URL: str = "http://localhost:8000"
    WHATSAPP_CORE_URL: str = "http://localhost:3001"

    class Config:
        env_file = ".env"


@lru_cache
def get_settings() -> Settings:
    return Settings()

Step 5: Commit

git add services/flow-engine/
git commit -m "feat(flow-engine): setup project structure"

Task 5: Flow Engine Core

Files:

  • Create: services/flow-engine/app/engine.py
  • Create: services/flow-engine/app/context.py

Step 1: Create context.py

from typing import Any, Optional
from datetime import datetime
import re


class FlowContext:
    """Manages variables and state during flow execution"""

    def __init__(
        self,
        contact: dict,
        conversation: dict,
        message: dict,
        session_vars: Optional[dict] = None
    ):
        self.contact = contact
        self.conversation = conversation
        self.message = message
        self.variables = session_vars or {}
        self._system = {
            "date": datetime.now().strftime("%Y-%m-%d"),
            "time": datetime.now().strftime("%H:%M"),
            "day_of_week": datetime.now().strftime("%A"),
        }

    def get(self, key: str) -> Any:
        """Get variable by dot notation: contact.name, variables.email"""
        parts = key.split(".")

        if parts[0] == "contact":
            return self._get_nested(self.contact, parts[1:])
        elif parts[0] == "conversation":
            return self._get_nested(self.conversation, parts[1:])
        elif parts[0] == "message":
            return self._get_nested(self.message, parts[1:])
        elif parts[0] == "system":
            return self._get_nested(self._system, parts[1:])
        elif parts[0] == "variables":
            return self._get_nested(self.variables, parts[1:])
        else:
            return self.variables.get(key)

    def set(self, key: str, value: Any):
        """Set a session variable"""
        self.variables[key] = value

    def _get_nested(self, obj: dict, keys: list) -> Any:
        for key in keys:
            if isinstance(obj, dict):
                obj = obj.get(key)
            else:
                return None
        return obj

    def interpolate(self, text: str) -> str:
        """Replace {{variable}} with actual values"""
        pattern = r'\{\{([^}]+)\}\}'

        def replace(match):
            key = match.group(1).strip()
            value = self.get(key)
            return str(value) if value is not None else ""

        return re.sub(pattern, replace, text)

    def to_dict(self) -> dict:
        return self.variables.copy()
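
The `{{variable}}` interpolation can be tried in isolation with the simplified sketch below. It assumes all scopes live in one nested dict and omits the fallback to bare session variables that `FlowContext.get` provides. `resolve`, `interpolate`, and the sample data are hypothetical names used only for illustration.

```python
import re
from typing import Any

# Same placeholder pattern as FlowContext.interpolate.
PLACEHOLDER = re.compile(r"\{\{([^}]+)\}\}")


def resolve(scopes: dict, key: str) -> Any:
    """Walk a dotted key such as 'contact.name' through nested dicts."""
    obj: Any = scopes
    for part in key.split("."):
        if not isinstance(obj, dict):
            return None
        obj = obj.get(part)
    return obj


def interpolate(text: str, scopes: dict) -> str:
    """Replace each {{key}} with its value, or with '' when it is missing."""
    def replace(match: re.Match) -> str:
        value = resolve(scopes, match.group(1).strip())
        return "" if value is None else str(value)

    return PLACEHOLDER.sub(replace, text)


scopes = {
    "contact": {"name": "Ana"},
    "variables": {"email": "ana@example.com"},
}
greeting = interpolate(
    "Hola {{ contact.name }}! Tu correo: {{variables.email}}{{contact.missing}}",
    scopes,
)
```

Whitespace inside the braces is ignored, and an unknown key such as `contact.missing` renders as an empty string rather than raising.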

Step 2: Create engine.py

from typing import Optional, Tuple
import httpx
from app.config import get_settings
from app.context import FlowContext

settings = get_settings()


class FlowEngine:
    """Executes flow nodes based on incoming messages"""

    def __init__(self, db_session):
        self.db = db_session

    async def process_message(
        self,
        conversation_id: str,
        contact: dict,
        conversation: dict,
        message: dict,
    ) -> bool:
        """
        Process incoming message through flow engine.
        Returns True if handled by a flow, False otherwise.
        """
        from app.models import Flow, FlowSession, TriggerType

        # Check for active flow session
        session = self.db.query(FlowSession).filter(
            FlowSession.conversation_id == conversation_id
        ).first()

        if session and session.waiting_for_input:
            # Continue existing flow
            flow = self.db.query(Flow).filter(Flow.id == session.flow_id).first()
            if flow:
                context = FlowContext(contact, conversation, message, session.variables)
                await self._execute_from_node(flow, session, context)
                return True

        # Find matching flow by trigger
        flow = self._find_matching_flow(message.get("content", ""))

        if flow:
            context = FlowContext(contact, conversation, message)
            session = FlowSession(
                conversation_id=conversation_id,
                flow_id=flow.id,
                variables={},
            )
            self.db.add(session)
            self.db.commit()

            await self._execute_flow(flow, session, context)
            return True

        return False

    def _find_matching_flow(self, message_text: str) -> Optional["Flow"]:
        from app.models import Flow, TriggerType

        message_lower = message_text.lower().strip()

        # Check keyword triggers
        keyword_flows = self.db.query(Flow).filter(
            Flow.trigger_type == TriggerType.KEYWORD,
            Flow.is_active == True
        ).all()

        for flow in keyword_flows:
            keywords = [k.strip().lower() for k in (flow.trigger_value or "").split(",")]
            if any(kw in message_lower for kw in keywords if kw):
                return flow

        # Check welcome trigger (first message - simplified)
        welcome_flow = self.db.query(Flow).filter(
            Flow.trigger_type == TriggerType.WELCOME,
            Flow.is_active == True
        ).first()

        if welcome_flow:
            return welcome_flow

        # Fallback
        return self.db.query(Flow).filter(
            Flow.trigger_type == TriggerType.FALLBACK,
            Flow.is_active == True
        ).first()

    async def _execute_flow(self, flow, session, context: FlowContext):
        """Start flow execution from first node"""
        nodes = flow.nodes or []
        if not nodes:
            return

        # Find start node (trigger node)
        start_node = next(
            (n for n in nodes if n.get("data", {}).get("type") == "trigger"),
            nodes[0] if nodes else None
        )

        if start_node:
            session.current_node_id = start_node["id"]
            await self._execute_from_node(flow, session, context)

    async def _execute_from_node(self, flow, session, context: FlowContext):
        """Execute flow starting from current node"""
        nodes = {n["id"]: n for n in (flow.nodes or [])}
        edges = flow.edges or []

        current_id = session.current_node_id

        while current_id:
            node = nodes.get(current_id)
            if not node:
                break

            node_type = node.get("data", {}).get("type", "")
            config = node.get("data", {}).get("config", {})

            # Execute node
            result = await self._execute_node(node_type, config, context, session)

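            if result == "wait":
                # Node requires input: point the session at the next node so the
                # reply resumes there instead of re-running this node, then exit
                next_edge = self._find_next_edge(edges, current_id)
                session.current_node_id = next_edge["target"] if next_edge else None
                session.waiting_for_input = next_edge is not None
                session.variables = context.to_dict()
                self.db.commit()
                return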

            # Find next node
            next_edge = self._find_next_edge(edges, current_id, result)
            current_id = next_edge["target"] if next_edge else None
            session.current_node_id = current_id
            session.waiting_for_input = False

        # Flow completed
        session.variables = context.to_dict()
        self.db.commit()

    async def _execute_node(
        self,
        node_type: str,
        config: dict,
        context: FlowContext,
        session
    ) -> Optional[str]:
        """Execute a single node, return result for routing"""

        if node_type == "trigger":
            return "default"

        elif node_type == "message":
            text = context.interpolate(config.get("text", ""))
            await self._send_message(session.conversation_id, text)
            return "default"

        elif node_type == "buttons":
            text = context.interpolate(config.get("text", ""))
            buttons = config.get("buttons", [])
            # For now, send as text with options
            button_text = "\n".join([f"• {b['label']}" for b in buttons])
            await self._send_message(session.conversation_id, f"{text}\n\n{button_text}")
            return "wait"

        elif node_type == "wait_input":
            variable = config.get("variable", "user_input")
            context.set(variable, context.message.get("content", ""))
            return "default"

        elif node_type == "condition":
            return self._evaluate_condition(config, context)

        elif node_type == "set_variable":
            var_name = config.get("variable", "")
            var_value = context.interpolate(config.get("value", ""))
            context.set(var_name, var_value)
            return "default"

        return "default"

    def _evaluate_condition(self, config: dict, context: FlowContext) -> str:
        """Evaluate condition and return branch name"""
        conditions = config.get("conditions", [])

        for cond in conditions:
            field = cond.get("field", "")
            operator = cond.get("operator", "equals")
            value = cond.get("value", "")
            branch = cond.get("branch", "default")

            actual = context.get(field) or context.message.get("content", "")

            if operator == "equals" and str(actual).lower() == str(value).lower():
                return branch
            elif operator == "contains" and str(value).lower() in str(actual).lower():
                return branch
            elif operator == "starts_with" and str(actual).lower().startswith(str(value).lower()):
                return branch

        return "default"

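    def _find_next_edge(self, edges: list, source_id: str, handle: Optional[str] = None) -> Optional[dict]:
        """Find the next edge from source node"""
        for edge in edges:
            if edge.get("source") != source_id:
                continue
            # Edges drawn from a node without named handles carry no sourceHandle;
            # treat those as the "default" branch
            edge_handle = edge.get("sourceHandle") or "default"
            if handle and edge_handle != handle:
                continue
            return edge
        return None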

    async def _send_message(self, conversation_id: str, text: str):
        """Send message via API Gateway"""
        async with httpx.AsyncClient() as client:
            try:
                await client.post(
                    f"{settings.API_GATEWAY_URL}/api/internal/flow/send",
                    json={
                        "conversation_id": conversation_id,
                        "content": text,
                        "type": "text"
                    },
                    timeout=30
                )
            except Exception as e:
                print(f"Failed to send message: {e}")
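
The traversal performed by `_execute_from_node` boils down to following edges from node to node until a node waits for input or no edge is left. The sketch below shows that walk on plain dicts. It makes two assumptions that are not part of the plan: edges without a `sourceHandle` are treated as the `default` branch, and a `max_steps` guard stops flows whose edges form a cycle. `next_target` and `walk` are hypothetical helper names.

```python
from typing import Optional


def next_target(edges: list, source_id: str, branch: str = "default") -> Optional[str]:
    """Return the target of the first edge leaving source_id on the given branch."""
    for edge in edges:
        if edge.get("source") != source_id:
            continue
        # Assumption: a missing or null sourceHandle means the default branch.
        if (edge.get("sourceHandle") or "default") == branch:
            return edge["target"]
    return None


def walk(nodes: list, edges: list, start_id: str, max_steps: int = 100) -> list:
    """Collect visited node ids, stopping at a node that waits for input."""
    by_id = {n["id"]: n for n in nodes}
    visited = []
    current = start_id
    while current and current in by_id and len(visited) < max_steps:
        visited.append(current)
        if by_id[current]["data"]["type"] == "buttons":
            break  # a buttons node pauses the flow until the user replies
        current = next_target(edges, current)
    return visited
```

Running `walk` on a trigger, message, buttons, message chain returns the first three ids, because execution pauses at the buttons node.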

Step 3: Commit

git add services/flow-engine/app/
git commit -m "feat(flow-engine): add FlowContext and FlowEngine core"
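
The trigger precedence in `_find_matching_flow` (keyword first, then welcome, then fallback) can be checked with a database-free sketch like the one below. Flows are represented as plain dicts, and `match_trigger` is a hypothetical name. Keyword matching is a case-insensitive substring test, as in the engine, so a short keyword can also match inside a longer word.

```python
from typing import Optional


def match_trigger(message_text: str, flows: list) -> Optional[dict]:
    """Pick the flow that should handle a message, or None if nothing is active."""
    text = message_text.lower().strip()
    active = [f for f in flows if f["is_active"]]

    # 1. Keyword flows: trigger_value is a comma-separated keyword list.
    for flow in active:
        if flow["trigger_type"] != "keyword":
            continue
        keywords = [k.strip().lower() for k in (flow.get("trigger_value") or "").split(",")]
        if any(kw in text for kw in keywords if kw):
            return flow

    # 2. Welcome flow, then 3. fallback flow.
    for trigger_type in ("welcome", "fallback"):
        for flow in active:
            if flow["trigger_type"] == trigger_type:
                return flow
    return None
```

With an active welcome flow present, the fallback flow is never reached, which mirrors the simplified welcome check in the engine.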

Task 6: Flow Engine Main & API

Files:

  • Create: services/flow-engine/app/main.py
  • Create: services/flow-engine/app/models.py

Step 1: Create models.py (shared models import)

from sqlalchemy import Column, String, Boolean, DateTime, Text, Integer, Enum as SQLEnum, create_engine
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import sessionmaker, declarative_base
import enum
import uuid
from datetime import datetime
from app.config import get_settings

settings = get_settings()
engine = create_engine(settings.DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


class TriggerType(str, enum.Enum):
    WELCOME = "welcome"
    KEYWORD = "keyword"
    FALLBACK = "fallback"
    EVENT = "event"
    MANUAL = "manual"


class Flow(Base):
    __tablename__ = "flows"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(100), nullable=False)
    description = Column(Text, nullable=True)
    trigger_type = Column(SQLEnum(TriggerType), nullable=False)
    trigger_value = Column(String(255), nullable=True)
    nodes = Column(JSONB, default=list)
    edges = Column(JSONB, default=list)
    variables = Column(JSONB, default=dict)
    is_active = Column(Boolean, default=False, nullable=False)
    version = Column(Integer, default=1, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)


class FlowSession(Base):
    __tablename__ = "flow_sessions"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    conversation_id = Column(UUID(as_uuid=True), nullable=False, index=True)
    flow_id = Column(UUID(as_uuid=True), nullable=False)
    current_node_id = Column(String(100), nullable=True)
    variables = Column(JSONB, default=dict)
    waiting_for_input = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Step 2: Create main.py

from fastapi import FastAPI, Depends
from pydantic import BaseModel
from typing import Optional
from sqlalchemy.orm import Session
from app.models import get_db
from app.engine import FlowEngine
from app.config import get_settings

settings = get_settings()

app = FastAPI(
    title="WhatsApp Centralizado - Flow Engine",
    version="1.0.0",
)


class ProcessMessageRequest(BaseModel):
    conversation_id: str
    contact: dict
    conversation: dict
    message: dict


class ProcessMessageResponse(BaseModel):
    handled: bool
    flow_id: Optional[str] = None


@app.get("/health")
def health_check():
    return {"status": "ok", "service": "flow-engine"}


@app.post("/process", response_model=ProcessMessageResponse)
async def process_message(
    request: ProcessMessageRequest,
    db: Session = Depends(get_db),
):
    """Process an incoming message through the flow engine"""
    engine = FlowEngine(db)

    handled = await engine.process_message(
        conversation_id=request.conversation_id,
        contact=request.contact,
        conversation=request.conversation,
        message=request.message,
    )

    return ProcessMessageResponse(handled=handled)

Step 3: Commit

git add services/flow-engine/app/
git commit -m "feat(flow-engine): add main API and database models"

Task 7: Update Docker Compose

Files:

  • Modify: docker-compose.yml

Step 1: Add flow-engine service

Add after api-gateway service:

  flow-engine:
    build:
      context: ./services/flow-engine
      dockerfile: Dockerfile
    container_name: wac_flow_engine
    restart: unless-stopped
    environment:
      DATABASE_URL: postgresql://${DB_USER:-whatsapp_admin}:${DB_PASSWORD}@postgres:5432/${DB_NAME:-whatsapp_central}
      REDIS_URL: redis://redis:6379
      API_GATEWAY_URL: http://api-gateway:8000
      WHATSAPP_CORE_URL: http://whatsapp-core:3001
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    networks:
      - wac_network

Step 2: Commit

git add docker-compose.yml
git commit -m "feat(docker): add flow-engine service"

Task 8: API Gateway - Internal Flow Routes

Files:

  • Modify: services/api-gateway/app/routers/whatsapp.py

Step 1: Add internal endpoint for flow messages

Add at the end of whatsapp.py:

class FlowSendRequest(BaseModel):
    conversation_id: str
    content: str
    type: str = "text"


@router.post("/internal/flow/send")
async def flow_send_message(
    request: FlowSendRequest,
    db: Session = Depends(get_db),
):
    """Internal endpoint for flow engine to send messages"""
    conversation = db.query(Conversation).filter(
        Conversation.id == request.conversation_id
    ).first()

    if not conversation:
        raise HTTPException(status_code=404, detail="Conversation not found")

    # Create message in DB
    message = Message(
        conversation_id=conversation.id,
        direction=MessageDirection.OUTBOUND,
        type=MessageType.TEXT,
        content=request.content,
        status=MessageStatus.PENDING,
    )
    db.add(message)
    db.commit()
    db.refresh(message)

    # Send via WhatsApp Core
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                f"{settings.WHATSAPP_CORE_URL}/api/sessions/{conversation.whatsapp_account_id}/messages",
                json={
                    "to": conversation.contact.phone_number,
                    "type": "text",
                    "content": {"text": request.content},
                },
                timeout=30,
            )
            if response.status_code == 200:
                data = response.json()
                message.whatsapp_message_id = data.get("messageId")
                message.status = MessageStatus.SENT
            else:
                message.status = MessageStatus.FAILED
        except Exception:
            message.status = MessageStatus.FAILED

    db.commit()
    return {"success": True, "message_id": str(message.id)}

Step 2: Add Pydantic import for FlowSendRequest

Add to imports at top of whatsapp.py:

from pydantic import BaseModel

Step 3: Commit

git add services/api-gateway/app/routers/whatsapp.py
git commit -m "feat(api-gateway): add internal flow send message endpoint"

Task 9: Integrate Flow Engine with Message Handler

Files:

  • Modify: services/api-gateway/app/routers/whatsapp.py

Step 1: Update handle_whatsapp_event to call flow engine

In the elif event.type == "message": block, after creating the message, add flow engine call:

        # After creating message and before db.commit()
        # Call flow engine
        try:
            async with httpx.AsyncClient() as client:
                await client.post(
                    "http://flow-engine:8001/process",
                    json={
                        "conversation_id": str(conversation.id),
                        "contact": {
                            "id": str(contact.id),
                            "phone_number": contact.phone_number,
                            "name": contact.name,
                        },
                        "conversation": {
                            "id": str(conversation.id),
                            "status": conversation.status.value,
                        },
                        "message": {
                            "id": str(message.id),
                            "content": content,
                            "type": "text",
                        },
                    },
                    timeout=30,
                )
        except Exception as e:
            print(f"Flow engine error: {e}")

Step 2: Commit

git add services/api-gateway/app/routers/whatsapp.py
git commit -m "feat(api-gateway): integrate flow engine on message events"

Task 10: Frontend - Flow Builder Page Setup

Files:

  • Modify: frontend/package.json
  • Create: frontend/src/pages/FlowBuilder.tsx
  • Modify: frontend/src/layouts/MainLayout.tsx

Step 1: Add reactflow dependency to package.json

Add to dependencies:

"reactflow": "^11.11.4"

Step 2: Create FlowBuilder.tsx

import { useState, useCallback } from 'react';
import { useParams, useNavigate } from 'react-router-dom';
import ReactFlow, {
  Node,
  Edge,
  Controls,
  Background,
  useNodesState,
  useEdgesState,
  addEdge,
  Connection,
  NodeTypes,
} from 'reactflow';
import 'reactflow/dist/style.css';
import { Button, message, Space } from 'antd';
import { SaveOutlined, ArrowLeftOutlined } from '@ant-design/icons';
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';
import { apiClient } from '../api/client';

// Custom node components (simplified for now)
const TriggerNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #52c41a', borderRadius: 8, background: '#f6ffed' }}>
    <strong>🚀 {data.label}</strong>
  </div>
);

const MessageNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #1890ff', borderRadius: 8, background: '#e6f7ff', minWidth: 150 }}>
    <strong>💬 Mensaje</strong>
    <div style={{ fontSize: 12, marginTop: 4 }}>{data.config?.text?.slice(0, 50) || 'Sin texto'}</div>
  </div>
);

const ConditionNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #faad14', borderRadius: 8, background: '#fffbe6' }}>
    <strong> Condición</strong>
  </div>
);

const WaitInputNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #722ed1', borderRadius: 8, background: '#f9f0ff' }}>
    <strong> Esperar Input</strong>
  </div>
);

const nodeTypes: NodeTypes = {
  trigger: TriggerNode,
  message: MessageNode,
  condition: ConditionNode,
  wait_input: WaitInputNode,
};

interface Flow {
  id: string;
  name: string;
  nodes: Node[];
  edges: Edge[];
}

export default function FlowBuilder() {
  const { flowId } = useParams<{ flowId: string }>();
  const navigate = useNavigate();
  const queryClient = useQueryClient();

  const [nodes, setNodes, onNodesChange] = useNodesState([]);
  const [edges, setEdges, onEdgesChange] = useEdgesState([]);

  const { data: flow, isLoading } = useQuery({
    queryKey: ['flow', flowId],
    queryFn: async () => {
      const data = await apiClient.get<Flow>(`/api/flows/${flowId}`);
      setNodes(data.nodes || []);
      setEdges(data.edges || []);
      return data;
    },
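    enabled: !!flowId,
    // queryFn resets the canvas state, so skip focus refetches that would discard unsaved edits
    refetchOnWindowFocus: false,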
  });

  const saveMutation = useMutation({
    mutationFn: async () => {
      await apiClient.put(`/api/flows/${flowId}`, {
        nodes,
        edges,
      });
    },
    onSuccess: () => {
      message.success('Flujo guardado');
      queryClient.invalidateQueries({ queryKey: ['flow', flowId] });
    },
    onError: () => {
      message.error('Error al guardar');
    },
  });

  const onConnect = useCallback(
    (params: Connection) => setEdges((eds) => addEdge(params, eds)),
    [setEdges]
  );

  const addNode = (type: string) => {
    const newNode: Node = {
      id: `${type}-${Date.now()}`,
      type,
      position: { x: 250, y: nodes.length * 100 + 50 },
      data: {
        label: type === 'trigger' ? 'Inicio' : type,
        type,
        config: type === 'message' ? { text: 'Hola {{contact.name}}!' } : {}
      },
    };
    setNodes((nds) => [...nds, newNode]);
  };

  if (isLoading) {
    return <div>Cargando...</div>;
  }

  return (
    <div style={{ height: 'calc(100vh - 140px)' }}>
      <div style={{ marginBottom: 16, display: 'flex', justifyContent: 'space-between' }}>
        <Space>
          <Button icon={<ArrowLeftOutlined />} onClick={() => navigate('/flows')}>
            Volver
          </Button>
          <span style={{ fontSize: 18, fontWeight: 'bold' }}>{flow?.name}</span>
        </Space>
        <Space>
          <Button onClick={() => addNode('trigger')}>+ Trigger</Button>
          <Button onClick={() => addNode('message')}>+ Mensaje</Button>
          <Button onClick={() => addNode('condition')}>+ Condición</Button>
          <Button onClick={() => addNode('wait_input')}>+ Esperar Input</Button>
          <Button
            type="primary"
            icon={<SaveOutlined />}
            onClick={() => saveMutation.mutate()}
            loading={saveMutation.isPending}
            style={{ background: '#25D366' }}
          >
            Guardar
          </Button>
        </Space>
      </div>

      <div style={{ height: 'calc(100% - 50px)', border: '1px solid #d9d9d9', borderRadius: 8 }}>
        <ReactFlow
          nodes={nodes}
          edges={edges}
          onNodesChange={onNodesChange}
          onEdgesChange={onEdgesChange}
          onConnect={onConnect}
          nodeTypes={nodeTypes}
          fitView
        >
          <Controls />
          <Background />
        </ReactFlow>
      </div>
    </div>
  );
}
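
The save mutation above sends the `nodes` and `edges` state arrays as they are. React Flow may attach runtime-only fields to those objects (for example `selected`, `dragging`, or measured `width` and `height`), which would then end up in the stored JSON. As a minimal sketch, assuming the backend only needs `id`, `type`, `position`, `data`, and the edge endpoints, the payload could be reduced before the PUT. The names `toPersistedNode`, `toPersistedEdge`, and `toFlowPayload` are hypothetical and not part of this plan, and the local interfaces stand in for the `Node` and `Edge` types from `reactflow` so the snippet runs on its own.

```typescript
// Local stand-ins for the reactflow Node/Edge types (assumption: only these
// fields need to be stored to rebuild the graph later).
interface PersistedNode {
  id: string;
  type?: string;
  position: { x: number; y: number };
  data: Record<string, unknown>;
}

interface PersistedEdge {
  id: string;
  source: string;
  target: string;
  sourceHandle: string | null;
  targetHandle: string | null;
}

// Runtime objects may carry extra fields on top of the persisted ones.
type RuntimeNode = Omit<PersistedNode, 'data'> & { data: Record<string, unknown> } & Record<string, unknown>;
type RuntimeEdge = {
  id: string;
  source: string;
  target: string;
  sourceHandle?: string | null;
  targetHandle?: string | null;
} & Record<string, unknown>;

// Hypothetical helper: copy only the fields worth storing for a node.
function toPersistedNode(node: RuntimeNode): PersistedNode {
  return {
    id: node.id,
    type: node.type,
    position: { x: node.position.x, y: node.position.y },
    data: node.data,
  };
}

// Hypothetical helper: copy only the fields worth storing for an edge.
function toPersistedEdge(edge: RuntimeEdge): PersistedEdge {
  return {
    id: edge.id,
    source: edge.source,
    target: edge.target,
    sourceHandle: edge.sourceHandle ?? null,
    targetHandle: edge.targetHandle ?? null,
  };
}

// Builds the body for PUT /api/flows/:flowId from the editor state.
function toFlowPayload(nodes: RuntimeNode[], edges: RuntimeEdge[]) {
  return {
    nodes: nodes.map(toPersistedNode),
    edges: edges.map(toPersistedEdge),
  };
}
```

With helpers like these, the `mutationFn` could call `apiClient.put(..., toFlowPayload(nodes, edges))` instead of passing the raw state. Whether the extra fields actually cause problems depends on how the backend validates the JSON.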

Step 3: Update MainLayout.tsx

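Add the page imports (FlowList is created in Task 11, so add its import once that file exists):

import FlowBuilder from '../pages/FlowBuilder';
import FlowList from '../pages/FlowList';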

Add menu item:

{
  key: '/flows',
  icon: <BranchesOutlined />,
  label: 'Flujos',
},

Add route in Routes:

<Route path="/flows" element={<FlowList />} />
<Route path="/flows/:flowId" element={<FlowBuilder />} />

Add import for BranchesOutlined:

import { BranchesOutlined } from '@ant-design/icons';

Step 4: Commit

git add frontend/
git commit -m "feat(frontend): add FlowBuilder page with React Flow"

Task 11: Frontend - Flow List Page

Files:

  • Create: frontend/src/pages/FlowList.tsx

Step 1: Create FlowList.tsx

import { useState } from 'react';
import { useNavigate } from 'react-router-dom';
import {
  Card,
  Button,
  Table,
  Tag,
  Modal,
  Form,
  Input,
  Select,
  message,
  Space,
  Typography,
} from 'antd';
import {
  PlusOutlined,
  EditOutlined,
  DeleteOutlined,
  PlayCircleOutlined,
  PauseCircleOutlined,
} from '@ant-design/icons';
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';
import { apiClient } from '../api/client';

const { Title } = Typography;

interface Flow {
  id: string;
  name: string;
  trigger_type: string;
  is_active: boolean;
  version: number;
  updated_at: string;
}

const triggerLabels: Record<string, string> = {
  welcome: 'Bienvenida',
  keyword: 'Palabra clave',
  fallback: 'Fallback',
  event: 'Evento',
  manual: 'Manual',
};

export default function FlowList() {
  const [isModalOpen, setIsModalOpen] = useState(false);
  const [form] = Form.useForm();
  const navigate = useNavigate();
  const queryClient = useQueryClient();

  const { data: flows, isLoading } = useQuery({
    queryKey: ['flows'],
    queryFn: () => apiClient.get<Flow[]>('/api/flows'),
  });

  const createMutation = useMutation({
    mutationFn: (data: any) => apiClient.post<Flow>('/api/flows', data),
    onSuccess: (data) => {
      message.success('Flujo creado');
      setIsModalOpen(false);
      form.resetFields();
      queryClient.invalidateQueries({ queryKey: ['flows'] });
      navigate(`/flows/${data.id}`);
    },
    onError: () => {
      message.error('Error al crear flujo');
    },
  });

  const deleteMutation = useMutation({
    mutationFn: (id: string) => apiClient.delete(`/api/flows/${id}`),
    onSuccess: () => {
      message.success('Flujo eliminado');
      queryClient.invalidateQueries({ queryKey: ['flows'] });
    },
    // Surface failures instead of silently ignoring them
    onError: () => {
      message.error('Error al eliminar flujo');
    },
  });

  const toggleActiveMutation = useMutation({
    mutationFn: async ({ id, active }: { id: string; active: boolean }) => {
      const endpoint = active ? 'activate' : 'deactivate';
      await apiClient.post(`/api/flows/${id}/${endpoint}`, {});
    },
    onSuccess: () => {
      queryClient.invalidateQueries({ queryKey: ['flows'] });
      message.success('Estado actualizado');
    },
    onError: () => {
      message.error('Error al actualizar estado');
    },
  });

  const columns = [
    {
      title: 'Nombre',
      dataIndex: 'name',
      key: 'name',
    },
    {
      title: 'Trigger',
      dataIndex: 'trigger_type',
      key: 'trigger_type',
      render: (type: string) => triggerLabels[type] || type,
    },
    {
      title: 'Estado',
      dataIndex: 'is_active',
      key: 'is_active',
      render: (active: boolean) => (
        <Tag color={active ? 'green' : 'default'}>
          {active ? 'Activo' : 'Inactivo'}
        </Tag>
      ),
    },
    {
      title: 'Versión',
      dataIndex: 'version',
      key: 'version',
    },
    {
      title: 'Acciones',
      key: 'actions',
      render: (_: any, record: Flow) => (
        <Space>
          <Button
            icon={<EditOutlined />}
            onClick={() => navigate(`/flows/${record.id}`)}
          >
            Editar
          </Button>
          <Button
            icon={record.is_active ? <PauseCircleOutlined /> : <PlayCircleOutlined />}
            onClick={() =>
              toggleActiveMutation.mutate({ id: record.id, active: !record.is_active })
            }
          >
            {record.is_active ? 'Desactivar' : 'Activar'}
          </Button>
          <Button
            danger
            icon={<DeleteOutlined />}
            onClick={() => {
              Modal.confirm({
                title: '¿Eliminar flujo?',
                onOk: () => deleteMutation.mutate(record.id),
              });
            }}
          />
        </Space>
      ),
    },
  ];

  return (
    <div>
      <div style={{ display: 'flex', justifyContent: 'space-between', marginBottom: 16 }}>
        <Title level={4} style={{ margin: 0 }}>Flujos de Chatbot</Title>
        <Button
          type="primary"
          icon={<PlusOutlined />}
          onClick={() => setIsModalOpen(true)}
          style={{ background: '#25D366' }}
        >
          Crear Flujo
        </Button>
      </div>

      <Card>
        <Table
          dataSource={flows}
          columns={columns}
          rowKey="id"
          loading={isLoading}
          pagination={false}
        />
      </Card>

      <Modal
        title="Crear nuevo flujo"
        open={isModalOpen}
        onCancel={() => {
          setIsModalOpen(false);
          form.resetFields();
        }}
        footer={null}
      >
        <Form
          form={form}
          layout="vertical"
          onFinish={(values) => createMutation.mutate(values)}
        >
          <Form.Item
            name="name"
            label="Nombre"
            rules={[{ required: true, message: 'Ingresa un nombre' }]}
          >
            <Input placeholder="Ej: Bienvenida Principal" />
          </Form.Item>

          <Form.Item
            name="trigger_type"
            label="Tipo de Trigger"
            rules={[{ required: true, message: 'Selecciona un trigger' }]}
          >
            <Select placeholder="Selecciona...">
              <Select.Option value="welcome">Bienvenida (primer mensaje)</Select.Option>
              <Select.Option value="keyword">Palabra clave</Select.Option>
              <Select.Option value="fallback">Fallback (sin coincidencia)</Select.Option>
            </Select>
          </Form.Item>

          <Form.Item
            noStyle
            shouldUpdate={(prev, curr) => prev.trigger_type !== curr.trigger_type}
          >
            {({ getFieldValue }) =>
              getFieldValue('trigger_type') === 'keyword' && (
                <Form.Item
                  name="trigger_value"
                  label="Palabras clave"
                  rules={[{ required: true, message: 'Ingresa al menos una palabra clave' }]}
                >
                  <Input placeholder="hola, menu, ayuda (separadas por coma)" />
                </Form.Item>
              )
            }
          </Form.Item>

          <Form.Item>
            <Button
              type="primary"
              htmlType="submit"
              loading={createMutation.isPending}
              block
            >
              Crear
            </Button>
          </Form.Item>
        </Form>
      </Modal>
    </div>
  );
}
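
The create form above collects keywords as one free-text, comma-separated string and submits it unchanged as `trigger_value`. As a rough sketch, the input could be normalized on the client before it is sent, so that stray spaces, empty entries, and duplicates do not reach the stored value. `parseKeywords` and `normalizeTriggerValue` are hypothetical helpers, and lowercasing assumes the backend matches keywords case-insensitively, which this section does not confirm.

```typescript
// Hypothetical helper: split a comma-separated keyword string into a clean list.
// Assumption: keyword matching is case-insensitive, so lowercasing is safe.
function parseKeywords(raw: string): string[] {
  const seen = new Set<string>();
  const result: string[] = [];
  for (const part of raw.split(',')) {
    const keyword = part.trim().toLowerCase();
    // Skip empty entries (e.g. "a,,b") and duplicates, preserving first-seen order.
    if (keyword.length > 0 && !seen.has(keyword)) {
      seen.add(keyword);
      result.push(keyword);
    }
  }
  return result;
}

// Hypothetical helper: produce the string to send as trigger_value.
function normalizeTriggerValue(raw: string): string {
  return parseKeywords(raw).join(',');
}
```

One way to use it would be in `onFinish`, mapping `values.trigger_value` through `normalizeTriggerValue` when the trigger type is `keyword` before calling `createMutation.mutate`.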

Step 2: Commit

git add frontend/src/pages/FlowList.tsx
git commit -m "feat(frontend): add FlowList page for managing flows"

Task 12: Final Integration & Testing

Files:

  • Modify: frontend/src/layouts/MainLayout.tsx (complete updates)

Step 1: Complete MainLayout updates

Ensure all imports and routes are properly added for FlowList and FlowBuilder.

Step 2: Create database migration for flows table

Generate the Alembic revision, review the generated script, then apply it:

cd services/api-gateway
alembic revision --autogenerate -m "add flows and flow_sessions tables"
alembic upgrade head

Step 3: Final commit

git add .
git commit -m "feat: complete Fase 2 - Flow Engine with visual builder"

Summary

Fase 2 creates:

  1. Database Models: Flow, FlowSession tables for storing chatbot flows
  2. Flow API: CRUD operations for managing flows
  3. Flow Engine Service: Python service that executes flows
  4. Flow Context: Variable interpolation system
  5. Node Types: trigger, message, condition, wait_input, set_variable
  6. Frontend Flow Builder: Visual drag & drop editor with React Flow
  7. Frontend Flow List: Management page for flows
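
Item 4 refers to variable interpolation, which the editor already relies on: new message nodes default to the text `Hola {{contact.name}}!`. The sketch below shows one way such placeholders could be resolved, for instance to preview a message in the builder. It is an illustration only: `interpolate` and `resolvePath` are hypothetical names, the real Flow Context lives in the Python service, and leaving unresolved placeholders untouched is an assumption rather than the engine's documented behavior.

```typescript
type TemplateContext = { [key: string]: unknown };

// Walk a dotted path such as "contact.name" through nested objects.
function resolvePath(context: TemplateContext, path: string): unknown {
  let current: unknown = context;
  for (const key of path.split('.')) {
    if (typeof current !== 'object' || current === null) {
      return undefined;
    }
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Replace every {{path}} placeholder with the matching context value.
// Assumption: placeholders with no value are left as-is so they stay visible.
function interpolate(template: string, context: TemplateContext): string {
  return template.replace(/\{\{\s*([\w.]+)\s*\}\}/g, (match: string, path: string) => {
    const value = resolvePath(context, path);
    return value === undefined || value === null ? match : String(value);
  });
}
```

For example, `interpolate('Hola {{contact.name}}!', { contact: { name: 'Ana' } })` yields `Hola Ana!`.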

To test:

docker-compose up -d --build

# Open http://localhost:3000
# Go to Flujos → Create flow
# Add trigger + message nodes
# Connect them
# Activate flow
# Send message from WhatsApp to test
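
The manual steps above depend on the trigger and message nodes actually being connected before the flow is activated. A small structural check could catch a disconnected graph earlier. The sketch below is one possible version: `validateFlow` is a hypothetical helper, and the rule of exactly one trigger node per flow is an assumption, not something this plan states.

```typescript
interface GraphNode {
  id: string;
  type: string;
}

interface GraphEdge {
  source: string;
  target: string;
}

// Hypothetical check: returns a list of problems, empty when the graph looks usable.
// Assumption: a flow should contain exactly one trigger node.
function validateFlow(nodes: GraphNode[], edges: GraphEdge[]): string[] {
  const problems: string[] = [];

  const triggerCount = nodes.filter((n) => n.type === 'trigger').length;
  const trigger = nodes.find((n) => n.type === 'trigger');
  if (!trigger || triggerCount !== 1) {
    problems.push(`expected exactly 1 trigger node, found ${triggerCount}`);
    return problems;
  }

  // Build an adjacency list, reporting edges that point at missing nodes.
  const ids = new Set(nodes.map((n) => n.id));
  const adjacency = new Map<string, string[]>();
  for (const edge of edges) {
    if (!ids.has(edge.source) || !ids.has(edge.target)) {
      problems.push(`edge ${edge.source} -> ${edge.target} references a missing node`);
      continue;
    }
    const targets = adjacency.get(edge.source) ?? [];
    targets.push(edge.target);
    adjacency.set(edge.source, targets);
  }

  // Breadth-first search from the trigger to find every reachable node.
  const reachable = new Set<string>([trigger.id]);
  const queue: string[] = [trigger.id];
  while (queue.length > 0) {
    const current = queue.shift() as string;
    for (const next of adjacency.get(current) ?? []) {
      if (!reachable.has(next)) {
        reachable.add(next);
        queue.push(next);
      }
    }
  }

  for (const node of nodes) {
    if (!reachable.has(node.id)) {
      problems.push(`node ${node.id} is not reachable from the trigger`);
    }
  }
  return problems;
}
```

A check like this could run before calling the activate endpoint, showing the returned problems to the user instead of activating a flow that can never reach its message node.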