From 4a004b0b0083f864d2b9e2febcd4aedd5a1a229b Mon Sep 17 00:00:00 2001 From: Claude AI Date: Thu, 29 Jan 2026 10:13:58 +0000 Subject: [PATCH] docs: add Fase 2 Flow Engine implementation plan --- docs/plans/2026-01-29-fase-2-flow-engine.md | 1588 +++++++++++++++++++ 1 file changed, 1588 insertions(+) create mode 100644 docs/plans/2026-01-29-fase-2-flow-engine.md diff --git a/docs/plans/2026-01-29-fase-2-flow-engine.md b/docs/plans/2026-01-29-fase-2-flow-engine.md new file mode 100644 index 0000000..3a290b0 --- /dev/null +++ b/docs/plans/2026-01-29-fase-2-flow-engine.md @@ -0,0 +1,1588 @@ +# Fase 2: Flow Engine Básico - Plan de Implementación + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Crear el motor de chatbot con Flow Builder visual para automatizar conversaciones de WhatsApp. + +**Architecture:** Flow Engine (Python) procesa mensajes entrantes, evalúa triggers, ejecuta nodos y gestiona estado. Frontend usa React Flow para editor visual drag & drop. Los flujos se almacenan como JSON en PostgreSQL. + +**Tech Stack:** Python 3.11, FastAPI, React Flow, Zustand, PostgreSQL JSONB, Redis para estado de sesiones. + +--- + +## Task 1: Database Models para Flows + +**Files:** +- Create: `services/api-gateway/app/models/flow.py` +- Modify: `services/api-gateway/app/models/__init__.py` + +**Step 1: Create flow.py** + +```python +import uuid +from datetime import datetime +from sqlalchemy import Column, String, Boolean, DateTime, Text, Integer, Enum as SQLEnum +from sqlalchemy.dialects.postgresql import UUID, JSONB +import enum +from app.core.database import Base + + +class TriggerType(str, enum.Enum): + WELCOME = "welcome" + KEYWORD = "keyword" + FALLBACK = "fallback" + EVENT = "event" + MANUAL = "manual" + + +class Flow(Base): + __tablename__ = "flows" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + name = Column(String(100), nullable=False) + description = Column(Text, nullable=True) + trigger_type = Column(SQLEnum(TriggerType), nullable=False) + trigger_value = Column(String(255), nullable=True) # keywords, event name, etc. + nodes = Column(JSONB, default=list) # Array of node definitions + edges = Column(JSONB, default=list) # React Flow edges + variables = Column(JSONB, default=dict) # Flow-level variables + is_active = Column(Boolean, default=False, nullable=False) + version = Column(Integer, default=1, nullable=False) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) + + +class FlowSession(Base): + """Active flow execution state per conversation""" + __tablename__ = "flow_sessions" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + conversation_id = Column(UUID(as_uuid=True), nullable=False, index=True) + flow_id = Column(UUID(as_uuid=True), nullable=False) + current_node_id = Column(String(100), nullable=True) + variables = Column(JSONB, default=dict) # Session variables + waiting_for_input = Column(Boolean, default=False) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) +``` + +**Step 2: Update models/__init__.py** + +```python +from app.models.user import User +from app.models.whatsapp import WhatsAppAccount, Contact, Conversation, Message +from app.models.flow import Flow, FlowSession, TriggerType + +__all__ = [ + "User", + "WhatsAppAccount", "Contact", "Conversation", "Message", + "Flow", "FlowSession", "TriggerType" +] +``` + +**Step 3: Commit** + +```bash +git add services/api-gateway/app/models/ +git commit -m "feat(api-gateway): add Flow and FlowSession database models" +``` + +--- + +## Task 2: Flow API Schemas + +**Files:** +- Create: `services/api-gateway/app/schemas/flow.py` +- Modify: `services/api-gateway/app/schemas/__init__.py` + +**Step 1: Create flow.py** + +```python +from pydantic import BaseModel +from typing import Optional, List, Any +from uuid import UUID +from datetime import datetime +from app.models.flow import TriggerType + + +class NodeData(BaseModel): + label: str + type: str # message, condition, wait_input, buttons, etc. + config: dict = {} + + +class FlowNode(BaseModel): + id: str + type: str + position: dict # {x: number, y: number} + data: NodeData + + +class FlowEdge(BaseModel): + id: str + source: str + target: str + sourceHandle: Optional[str] = None + targetHandle: Optional[str] = None + + +class FlowCreate(BaseModel): + name: str + description: Optional[str] = None + trigger_type: TriggerType + trigger_value: Optional[str] = None + + +class FlowUpdate(BaseModel): + name: Optional[str] = None + description: Optional[str] = None + trigger_type: Optional[TriggerType] = None + trigger_value: Optional[str] = None + nodes: Optional[List[dict]] = None + edges: Optional[List[dict]] = None + variables: Optional[dict] = None + is_active: Optional[bool] = None + + +class FlowResponse(BaseModel): + id: UUID + name: str + description: Optional[str] + trigger_type: TriggerType + trigger_value: Optional[str] + nodes: List[dict] + edges: List[dict] + variables: dict + is_active: bool + version: int + created_at: datetime + updated_at: datetime + + class Config: + from_attributes = True + + +class FlowListResponse(BaseModel): + id: UUID + name: str + trigger_type: TriggerType + is_active: bool + version: int + updated_at: datetime + + class Config: + from_attributes = True +``` + +**Step 2: Update schemas/__init__.py** + +```python +from app.schemas import auth, whatsapp, flow + +__all__ = ["auth", "whatsapp", "flow"] +``` + +**Step 3: Commit** + +```bash +git add services/api-gateway/app/schemas/ +git commit -m "feat(api-gateway): add Flow API schemas" +``` + +--- + +## Task 3: Flow API Routes + +**Files:** +- Create: `services/api-gateway/app/routers/flows.py` +- Modify: `services/api-gateway/app/routers/__init__.py` +- Modify: `services/api-gateway/app/main.py` + +**Step 1: Create flows.py** + +```python +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.orm import Session +from typing import List +from uuid import UUID +from app.core.database import get_db +from app.core.security import get_current_user +from app.models.user import User, UserRole +from app.models.flow import Flow, TriggerType +from app.schemas.flow import FlowCreate, FlowUpdate, FlowResponse, FlowListResponse + +router = APIRouter(prefix="/api/flows", tags=["flows"]) + + +def require_admin(current_user: User = Depends(get_current_user)): + if current_user.role != UserRole.ADMIN: + raise HTTPException(status_code=403, detail="Admin required") + return current_user + + +@router.get("", response_model=List[FlowListResponse]) +def list_flows( + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + flows = db.query(Flow).order_by(Flow.updated_at.desc()).all() + return flows + + +@router.post("", response_model=FlowResponse) +def create_flow( + request: FlowCreate, + db: Session = Depends(get_db), + current_user: User = Depends(require_admin), +): + flow = Flow( + name=request.name, + description=request.description, + trigger_type=request.trigger_type, + trigger_value=request.trigger_value, + nodes=[], + edges=[], + variables={}, + ) + db.add(flow) + db.commit() + db.refresh(flow) + return flow + + +@router.get("/{flow_id}", response_model=FlowResponse) +def get_flow( + flow_id: UUID, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + flow = db.query(Flow).filter(Flow.id == flow_id).first() + if not flow: + raise HTTPException(status_code=404, detail="Flow not found") + return flow + + +@router.put("/{flow_id}", response_model=FlowResponse) +def update_flow( + flow_id: UUID, + request: FlowUpdate, + db: Session = Depends(get_db), + current_user: User = Depends(require_admin), +): + flow = db.query(Flow).filter(Flow.id == flow_id).first() + if not flow: + raise HTTPException(status_code=404, detail="Flow not found") + + update_data = request.model_dump(exclude_unset=True) + for field, value in update_data.items(): + setattr(flow, field, value) + + flow.version += 1 + db.commit() + db.refresh(flow) + return flow + + +@router.delete("/{flow_id}") +def delete_flow( + flow_id: UUID, + db: Session = Depends(get_db), + current_user: User = Depends(require_admin), +): + flow = db.query(Flow).filter(Flow.id == flow_id).first() + if not flow: + raise HTTPException(status_code=404, detail="Flow not found") + + db.delete(flow) + db.commit() + return {"success": True} + + +@router.post("/{flow_id}/activate") +def activate_flow( + flow_id: UUID, + db: Session = Depends(get_db), + current_user: User = Depends(require_admin), +): + flow = db.query(Flow).filter(Flow.id == flow_id).first() + if not flow: + raise HTTPException(status_code=404, detail="Flow not found") + + # Deactivate other flows with same trigger if welcome/fallback + if flow.trigger_type in [TriggerType.WELCOME, TriggerType.FALLBACK]: + db.query(Flow).filter( + Flow.trigger_type == flow.trigger_type, + Flow.id != flow_id + ).update({"is_active": False}) + + flow.is_active = True + db.commit() + return {"success": True} + + +@router.post("/{flow_id}/deactivate") +def deactivate_flow( + flow_id: UUID, + db: Session = Depends(get_db), + current_user: User = Depends(require_admin), +): + flow = db.query(Flow).filter(Flow.id == flow_id).first() + if not flow: + raise HTTPException(status_code=404, detail="Flow not found") + + flow.is_active = False + db.commit() + return {"success": True} +``` + +**Step 2: Update routers/__init__.py** + +```python +from app.routers import auth, whatsapp, flows + +__all__ = ["auth", "whatsapp", "flows"] +``` + +**Step 3: Update main.py - add import and router** + +Add to imports: +```python +from app.routers import auth, whatsapp, flows +``` + +Add router: +```python +app.include_router(flows.router) +``` + +**Step 4: Commit** + +```bash +git add services/api-gateway/app/routers/ services/api-gateway/app/main.py +git commit -m "feat(api-gateway): add Flow CRUD API routes" +``` + +--- + +## Task 4: Flow Engine Service Setup + +**Files:** +- Create: `services/flow-engine/requirements.txt` +- Create: `services/flow-engine/Dockerfile` +- Create: `services/flow-engine/app/__init__.py` +- Create: `services/flow-engine/app/config.py` + +**Step 1: Create requirements.txt** + +``` +fastapi==0.115.6 +uvicorn[standard]==0.34.0 +sqlalchemy==2.0.36 +psycopg2-binary==2.9.10 +redis==5.2.1 +httpx==0.28.1 +pydantic==2.10.4 +pydantic-settings==2.7.1 +``` + +**Step 2: Create Dockerfile** + +```dockerfile +FROM python:3.11-slim + +WORKDIR /app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + gcc \ + libpq-dev \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY app ./app + +EXPOSE 8001 + +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8001"] +``` + +**Step 3: Create app/__init__.py** + +```python +# WhatsApp Centralizado - Flow Engine +``` + +**Step 4: Create app/config.py** + +```python +from pydantic_settings import BaseSettings +from functools import lru_cache + + +class Settings(BaseSettings): + DATABASE_URL: str = "postgresql://whatsapp_admin:password@localhost:5432/whatsapp_central" + REDIS_URL: str = "redis://localhost:6379" + API_GATEWAY_URL: str = "http://localhost:8000" + WHATSAPP_CORE_URL: str = "http://localhost:3001" + + class Config: + env_file = ".env" + + +@lru_cache +def get_settings() -> Settings: + return Settings() +``` + +**Step 5: Commit** + +```bash +git add services/flow-engine/ +git commit -m "feat(flow-engine): setup project structure" +``` + +--- + +## Task 5: Flow Engine Core + +**Files:** +- Create: `services/flow-engine/app/engine.py` +- Create: `services/flow-engine/app/context.py` + +**Step 1: Create context.py** + +```python +from typing import Any, Optional +from datetime import datetime +import re + + +class FlowContext: + """Manages variables and state during flow execution""" + + def __init__( + self, + contact: dict, + conversation: dict, + message: dict, + session_vars: dict = None + ): + self.contact = contact + self.conversation = conversation + self.message = message + self.variables = session_vars or {} + self._system = { + "date": datetime.now().strftime("%Y-%m-%d"), + "time": datetime.now().strftime("%H:%M"), + "day_of_week": datetime.now().strftime("%A"), + } + + def get(self, key: str) -> Any: + """Get variable by dot notation: contact.name, variables.email""" + parts = key.split(".") + + if parts[0] == "contact": + return self._get_nested(self.contact, parts[1:]) + elif parts[0] == "conversation": + return self._get_nested(self.conversation, parts[1:]) + elif parts[0] == "message": + return self._get_nested(self.message, parts[1:]) + elif parts[0] == "system": + return self._get_nested(self._system, parts[1:]) + elif parts[0] == "variables": + return self._get_nested(self.variables, parts[1:]) + else: + return self.variables.get(key) + + def set(self, key: str, value: Any): + """Set a session variable""" + self.variables[key] = value + + def _get_nested(self, obj: dict, keys: list) -> Any: + for key in keys: + if isinstance(obj, dict): + obj = obj.get(key) + else: + return None + return obj + + def interpolate(self, text: str) -> str: + """Replace {{variable}} with actual values""" + pattern = r'\{\{([^}]+)\}\}' + + def replace(match): + key = match.group(1).strip() + value = self.get(key) + return str(value) if value is not None else "" + + return re.sub(pattern, replace, text) + + def to_dict(self) -> dict: + return self.variables.copy() +``` + +**Step 2: Create engine.py** + +```python +from typing import Optional, Tuple +import httpx +from app.config import get_settings +from app.context import FlowContext + +settings = get_settings() + + +class FlowEngine: + """Executes flow nodes based on incoming messages""" + + def __init__(self, db_session): + self.db = db_session + + async def process_message( + self, + conversation_id: str, + contact: dict, + conversation: dict, + message: dict, + ) -> bool: + """ + Process incoming message through flow engine. + Returns True if handled by a flow, False otherwise. + """ + from app.models import Flow, FlowSession, TriggerType + + # Check for active flow session + session = self.db.query(FlowSession).filter( + FlowSession.conversation_id == conversation_id + ).first() + + if session and session.waiting_for_input: + # Continue existing flow + flow = self.db.query(Flow).filter(Flow.id == session.flow_id).first() + if flow: + context = FlowContext(contact, conversation, message, session.variables) + await self._execute_from_node(flow, session, context) + return True + + # Find matching flow by trigger + flow = self._find_matching_flow(message.get("content", "")) + + if flow: + context = FlowContext(contact, conversation, message) + session = FlowSession( + conversation_id=conversation_id, + flow_id=flow.id, + variables={}, + ) + self.db.add(session) + self.db.commit() + + await self._execute_flow(flow, session, context) + return True + + return False + + def _find_matching_flow(self, message_text: str) -> Optional["Flow"]: + from app.models import Flow, TriggerType + + message_lower = message_text.lower().strip() + + # Check keyword triggers + keyword_flows = self.db.query(Flow).filter( + Flow.trigger_type == TriggerType.KEYWORD, + Flow.is_active == True + ).all() + + for flow in keyword_flows: + keywords = [k.strip().lower() for k in (flow.trigger_value or "").split(",")] + if any(kw in message_lower for kw in keywords if kw): + return flow + + # Check welcome trigger (first message - simplified) + welcome_flow = self.db.query(Flow).filter( + Flow.trigger_type == TriggerType.WELCOME, + Flow.is_active == True + ).first() + + if welcome_flow: + return welcome_flow + + # Fallback + return self.db.query(Flow).filter( + Flow.trigger_type == TriggerType.FALLBACK, + Flow.is_active == True + ).first() + + async def _execute_flow(self, flow, session, context: FlowContext): + """Start flow execution from first node""" + nodes = flow.nodes or [] + if not nodes: + return + + # Find start node (trigger node) + start_node = next( + (n for n in nodes if n.get("data", {}).get("type") == "trigger"), + nodes[0] if nodes else None + ) + + if start_node: + session.current_node_id = start_node["id"] + await self._execute_from_node(flow, session, context) + + async def _execute_from_node(self, flow, session, context: FlowContext): + """Execute flow starting from current node""" + nodes = {n["id"]: n for n in (flow.nodes or [])} + edges = flow.edges or [] + + current_id = session.current_node_id + + while current_id: + node = nodes.get(current_id) + if not node: + break + + node_type = node.get("data", {}).get("type", "") + config = node.get("data", {}).get("config", {}) + + # Execute node + result = await self._execute_node(node_type, config, context, session) + + if result == "wait": + # Node requires input, save state and exit + session.waiting_for_input = True + session.variables = context.to_dict() + self.db.commit() + return + + # Find next node + next_edge = self._find_next_edge(edges, current_id, result) + current_id = next_edge["target"] if next_edge else None + session.current_node_id = current_id + session.waiting_for_input = False + + # Flow completed + session.variables = context.to_dict() + self.db.commit() + + async def _execute_node( + self, + node_type: str, + config: dict, + context: FlowContext, + session + ) -> Optional[str]: + """Execute a single node, return result for routing""" + + if node_type == "trigger": + return "default" + + elif node_type == "message": + text = context.interpolate(config.get("text", "")) + await self._send_message(session.conversation_id, text) + return "default" + + elif node_type == "buttons": + text = context.interpolate(config.get("text", "")) + buttons = config.get("buttons", []) + # For now, send as text with options + button_text = "\n".join([f"• {b['label']}" for b in buttons]) + await self._send_message(session.conversation_id, f"{text}\n\n{button_text}") + return "wait" + + elif node_type == "wait_input": + variable = config.get("variable", "user_input") + context.set(variable, context.message.get("content", "")) + return "default" + + elif node_type == "condition": + return self._evaluate_condition(config, context) + + elif node_type == "set_variable": + var_name = config.get("variable", "") + var_value = context.interpolate(config.get("value", "")) + context.set(var_name, var_value) + return "default" + + return "default" + + def _evaluate_condition(self, config: dict, context: FlowContext) -> str: + """Evaluate condition and return branch name""" + conditions = config.get("conditions", []) + + for cond in conditions: + field = cond.get("field", "") + operator = cond.get("operator", "equals") + value = cond.get("value", "") + branch = cond.get("branch", "default") + + actual = context.get(field) or context.message.get("content", "") + + if operator == "equals" and str(actual).lower() == str(value).lower(): + return branch + elif operator == "contains" and str(value).lower() in str(actual).lower(): + return branch + elif operator == "starts_with" and str(actual).lower().startswith(str(value).lower()): + return branch + + return "default" + + def _find_next_edge(self, edges: list, source_id: str, handle: str = None) -> Optional[dict]: + """Find the next edge from source node""" + for edge in edges: + if edge.get("source") == source_id: + if handle and edge.get("sourceHandle") != handle: + continue + return edge + return None + + async def _send_message(self, conversation_id: str, text: str): + """Send message via API Gateway""" + async with httpx.AsyncClient() as client: + try: + await client.post( + f"{settings.API_GATEWAY_URL}/api/internal/flow/send", + json={ + "conversation_id": conversation_id, + "content": text, + "type": "text" + }, + timeout=30 + ) + except Exception as e: + print(f"Failed to send message: {e}") +``` + +**Step 3: Commit** + +```bash +git add services/flow-engine/app/ +git commit -m "feat(flow-engine): add FlowContext and FlowEngine core" +``` + +--- + +## Task 6: Flow Engine Main & API + +**Files:** +- Create: `services/flow-engine/app/main.py` +- Create: `services/flow-engine/app/models.py` + +**Step 1: Create models.py (shared models import)** + +```python +from sqlalchemy import Column, String, Boolean, DateTime, Text, Integer, Enum as SQLEnum, create_engine +from sqlalchemy.dialects.postgresql import UUID, JSONB +from sqlalchemy.orm import sessionmaker, declarative_base +import enum +import uuid +from datetime import datetime +from app.config import get_settings + +settings = get_settings() +engine = create_engine(settings.DATABASE_URL, pool_pre_ping=True) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) +Base = declarative_base() + + +class TriggerType(str, enum.Enum): + WELCOME = "welcome" + KEYWORD = "keyword" + FALLBACK = "fallback" + EVENT = "event" + MANUAL = "manual" + + +class Flow(Base): + __tablename__ = "flows" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + name = Column(String(100), nullable=False) + description = Column(Text, nullable=True) + trigger_type = Column(SQLEnum(TriggerType), nullable=False) + trigger_value = Column(String(255), nullable=True) + nodes = Column(JSONB, default=list) + edges = Column(JSONB, default=list) + variables = Column(JSONB, default=dict) + is_active = Column(Boolean, default=False, nullable=False) + version = Column(Integer, default=1, nullable=False) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, nullable=False) + + +class FlowSession(Base): + __tablename__ = "flow_sessions" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + conversation_id = Column(UUID(as_uuid=True), nullable=False, index=True) + flow_id = Column(UUID(as_uuid=True), nullable=False) + current_node_id = Column(String(100), nullable=True) + variables = Column(JSONB, default=dict) + waiting_for_input = Column(Boolean, default=False) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, nullable=False) + + +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close() +``` + +**Step 2: Create main.py** + +```python +from fastapi import FastAPI, Depends +from pydantic import BaseModel +from typing import Optional +from sqlalchemy.orm import Session +from app.models import get_db +from app.engine import FlowEngine +from app.config import get_settings + +settings = get_settings() + +app = FastAPI( + title="WhatsApp Centralizado - Flow Engine", + version="1.0.0", +) + + +class ProcessMessageRequest(BaseModel): + conversation_id: str + contact: dict + conversation: dict + message: dict + + +class ProcessMessageResponse(BaseModel): + handled: bool + flow_id: Optional[str] = None + + +@app.get("/health") +def health_check(): + return {"status": "ok", "service": "flow-engine"} + + +@app.post("/process", response_model=ProcessMessageResponse) +async def process_message( + request: ProcessMessageRequest, + db: Session = Depends(get_db), +): + """Process an incoming message through the flow engine""" + engine = FlowEngine(db) + + handled = await engine.process_message( + conversation_id=request.conversation_id, + contact=request.contact, + conversation=request.conversation, + message=request.message, + ) + + return ProcessMessageResponse(handled=handled) +``` + +**Step 3: Commit** + +```bash +git add services/flow-engine/app/ +git commit -m "feat(flow-engine): add main API and database models" +``` + +--- + +## Task 7: Update Docker Compose + +**Files:** +- Modify: `docker-compose.yml` + +**Step 1: Add flow-engine service** + +Add after api-gateway service: + +```yaml + flow-engine: + build: + context: ./services/flow-engine + dockerfile: Dockerfile + container_name: wac_flow_engine + restart: unless-stopped + environment: + DATABASE_URL: postgresql://${DB_USER:-whatsapp_admin}:${DB_PASSWORD}@postgres:5432/${DB_NAME:-whatsapp_central} + REDIS_URL: redis://redis:6379 + API_GATEWAY_URL: http://api-gateway:8000 + WHATSAPP_CORE_URL: http://whatsapp-core:3001 + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + networks: + - wac_network +``` + +**Step 2: Commit** + +```bash +git add docker-compose.yml +git commit -m "feat(docker): add flow-engine service" +``` + +--- + +## Task 8: API Gateway - Internal Flow Routes + +**Files:** +- Modify: `services/api-gateway/app/routers/whatsapp.py` + +**Step 1: Add internal endpoint for flow messages** + +Add at the end of whatsapp.py: + +```python +class FlowSendRequest(BaseModel): + conversation_id: str + content: str + type: str = "text" + + +@router.post("/internal/flow/send") +async def flow_send_message( + request: FlowSendRequest, + db: Session = Depends(get_db), +): + """Internal endpoint for flow engine to send messages""" + conversation = db.query(Conversation).filter( + Conversation.id == request.conversation_id + ).first() + + if not conversation: + raise HTTPException(status_code=404, detail="Conversation not found") + + # Create message in DB + message = Message( + conversation_id=conversation.id, + direction=MessageDirection.OUTBOUND, + type=MessageType.TEXT, + content=request.content, + status=MessageStatus.PENDING, + ) + db.add(message) + db.commit() + db.refresh(message) + + # Send via WhatsApp Core + async with httpx.AsyncClient() as client: + try: + response = await client.post( + f"{settings.WHATSAPP_CORE_URL}/api/sessions/{conversation.whatsapp_account_id}/messages", + json={ + "to": conversation.contact.phone_number, + "type": "text", + "content": {"text": request.content}, + }, + timeout=30, + ) + if response.status_code == 200: + data = response.json() + message.whatsapp_message_id = data.get("messageId") + message.status = MessageStatus.SENT + else: + message.status = MessageStatus.FAILED + except Exception: + message.status = MessageStatus.FAILED + + db.commit() + return {"success": True, "message_id": str(message.id)} +``` + +**Step 2: Add Pydantic import for FlowSendRequest** + +Add to imports at top of whatsapp.py: + +```python +from pydantic import BaseModel +``` + +**Step 3: Commit** + +```bash +git add services/api-gateway/app/routers/whatsapp.py +git commit -m "feat(api-gateway): add internal flow send message endpoint" +``` + +--- + +## Task 9: Integrate Flow Engine with Message Handler + +**Files:** +- Modify: `services/api-gateway/app/routers/whatsapp.py` + +**Step 1: Update handle_whatsapp_event to call flow engine** + +In the `elif event.type == "message":` block, after creating the message, add flow engine call: + +```python + # After creating message and before db.commit() + # Call flow engine + try: + async with httpx.AsyncClient() as client: + await client.post( + "http://flow-engine:8001/process", + json={ + "conversation_id": str(conversation.id), + "contact": { + "id": str(contact.id), + "phone_number": contact.phone_number, + "name": contact.name, + }, + "conversation": { + "id": str(conversation.id), + "status": conversation.status.value, + }, + "message": { + "id": str(message.id), + "content": content, + "type": "text", + }, + }, + timeout=30, + ) + except Exception as e: + print(f"Flow engine error: {e}") +``` + +**Step 2: Commit** + +```bash +git add services/api-gateway/app/routers/whatsapp.py +git commit -m "feat(api-gateway): integrate flow engine on message events" +``` + +--- + +## Task 10: Frontend - Flow Builder Page Setup + +**Files:** +- Modify: `frontend/package.json` +- Create: `frontend/src/pages/FlowBuilder.tsx` +- Modify: `frontend/src/layouts/MainLayout.tsx` + +**Step 1: Add reactflow dependency to package.json** + +Add to dependencies: +```json +"reactflow": "^11.11.4" +``` + +**Step 2: Create FlowBuilder.tsx** + +```tsx +import { useState, useCallback } from 'react'; +import { useParams, useNavigate } from 'react-router-dom'; +import ReactFlow, { + Node, + Edge, + Controls, + Background, + useNodesState, + useEdgesState, + addEdge, + Connection, + NodeTypes, +} from 'reactflow'; +import 'reactflow/dist/style.css'; +import { Button, message, Space } from 'antd'; +import { SaveOutlined, ArrowLeftOutlined } from '@ant-design/icons'; +import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query'; +import { apiClient } from '../api/client'; + +// Custom node components (simplified for now) +const TriggerNode = ({ data }: { data: any }) => ( +
+ 🚀 {data.label} +
+); + +const MessageNode = ({ data }: { data: any }) => ( +
+ 💬 Mensaje +
{data.config?.text?.slice(0, 50) || 'Sin texto'}
+
+); + +const ConditionNode = ({ data }: { data: any }) => ( +
+ ❓ Condición +
+); + +const WaitInputNode = ({ data }: { data: any }) => ( +
+ ⏳ Esperar Input +
+); + +const nodeTypes: NodeTypes = { + trigger: TriggerNode, + message: MessageNode, + condition: ConditionNode, + wait_input: WaitInputNode, +}; + +interface Flow { + id: string; + name: string; + nodes: Node[]; + edges: Edge[]; +} + +export default function FlowBuilder() { + const { flowId } = useParams<{ flowId: string }>(); + const navigate = useNavigate(); + const queryClient = useQueryClient(); + + const [nodes, setNodes, onNodesChange] = useNodesState([]); + const [edges, setEdges, onEdgesChange] = useEdgesState([]); + + const { data: flow, isLoading } = useQuery({ + queryKey: ['flow', flowId], + queryFn: async () => { + const data = await apiClient.get(`/api/flows/${flowId}`); + setNodes(data.nodes || []); + setEdges(data.edges || []); + return data; + }, + enabled: !!flowId, + }); + + const saveMutation = useMutation({ + mutationFn: async () => { + await apiClient.put(`/api/flows/${flowId}`, { + nodes, + edges, + }); + }, + onSuccess: () => { + message.success('Flujo guardado'); + queryClient.invalidateQueries({ queryKey: ['flow', flowId] }); + }, + onError: () => { + message.error('Error al guardar'); + }, + }); + + const onConnect = useCallback( + (params: Connection) => setEdges((eds) => addEdge(params, eds)), + [setEdges] + ); + + const addNode = (type: string) => { + const newNode: Node = { + id: `${type}-${Date.now()}`, + type, + position: { x: 250, y: nodes.length * 100 + 50 }, + data: { + label: type === 'trigger' ? 'Inicio' : type, + type, + config: type === 'message' ? { text: 'Hola {{contact.name}}!' } : {} + }, + }; + setNodes((nds) => [...nds, newNode]); + }; + + if (isLoading) { + return
Cargando...
; + } + + return ( +
+
+ + + {flow?.name} + + + + + + + + +
+ +
+ + + + +
+
+ ); +} +``` + +**Step 3: Update MainLayout.tsx** + +Add import and route: +```tsx +import FlowBuilder from '../pages/FlowBuilder'; +``` + +Add menu item: +```tsx +{ + key: '/flows', + icon: , + label: 'Flujos', +}, +``` + +Add route in Routes: +```tsx +} /> +} /> +``` + +Add import for BranchesOutlined: +```tsx +import { BranchesOutlined } from '@ant-design/icons'; +``` + +**Step 4: Commit** + +```bash +git add frontend/ +git commit -m "feat(frontend): add FlowBuilder page with React Flow" +``` + +--- + +## Task 11: Frontend - Flow List Page + +**Files:** +- Create: `frontend/src/pages/FlowList.tsx` + +**Step 1: Create FlowList.tsx** + +```tsx +import { useState } from 'react'; +import { useNavigate } from 'react-router-dom'; +import { + Card, + Button, + Table, + Tag, + Modal, + Form, + Input, + Select, + message, + Space, + Typography, +} from 'antd'; +import { + PlusOutlined, + EditOutlined, + DeleteOutlined, + PlayCircleOutlined, + PauseCircleOutlined, +} from '@ant-design/icons'; +import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query'; +import { apiClient } from '../api/client'; + +const { Title } = Typography; + +interface Flow { + id: string; + name: string; + trigger_type: string; + is_active: boolean; + version: number; + updated_at: string; +} + +const triggerLabels: Record = { + welcome: 'Bienvenida', + keyword: 'Palabra clave', + fallback: 'Fallback', + event: 'Evento', + manual: 'Manual', +}; + +export default function FlowList() { + const [isModalOpen, setIsModalOpen] = useState(false); + const [form] = Form.useForm(); + const navigate = useNavigate(); + const queryClient = useQueryClient(); + + const { data: flows, isLoading } = useQuery({ + queryKey: ['flows'], + queryFn: () => apiClient.get('/api/flows'), + }); + + const createMutation = useMutation({ + mutationFn: (data: any) => apiClient.post('/api/flows', data), + onSuccess: (data) => { + message.success('Flujo creado'); + setIsModalOpen(false); + form.resetFields(); + queryClient.invalidateQueries({ queryKey: ['flows'] }); + navigate(`/flows/${data.id}`); + }, + onError: () => { + message.error('Error al crear flujo'); + }, + }); + + const deleteMutation = useMutation({ + mutationFn: (id: string) => apiClient.delete(`/api/flows/${id}`), + onSuccess: () => { + message.success('Flujo eliminado'); + queryClient.invalidateQueries({ queryKey: ['flows'] }); + }, + }); + + const toggleActiveMutation = useMutation({ + mutationFn: async ({ id, active }: { id: string; active: boolean }) => { + const endpoint = active ? 'activate' : 'deactivate'; + await apiClient.post(`/api/flows/${id}/${endpoint}`, {}); + }, + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ['flows'] }); + message.success('Estado actualizado'); + }, + }); + + const columns = [ + { + title: 'Nombre', + dataIndex: 'name', + key: 'name', + }, + { + title: 'Trigger', + dataIndex: 'trigger_type', + key: 'trigger_type', + render: (type: string) => triggerLabels[type] || type, + }, + { + title: 'Estado', + dataIndex: 'is_active', + key: 'is_active', + render: (active: boolean) => ( + + {active ? 'Activo' : 'Inactivo'} + + ), + }, + { + title: 'Versión', + dataIndex: 'version', + key: 'version', + }, + { + title: 'Acciones', + key: 'actions', + render: (_: any, record: Flow) => ( + + + + + + + + + + + { + setIsModalOpen(false); + form.resetFields(); + }} + footer={null} + > +
createMutation.mutate(values)} + > + + + + + + + + + prev.trigger_type !== curr.trigger_type} + > + {({ getFieldValue }) => + getFieldValue('trigger_type') === 'keyword' && ( + + + + ) + } + + + + + + +
+ + ); +} +``` + +**Step 2: Commit** + +```bash +git add frontend/src/pages/FlowList.tsx +git commit -m "feat(frontend): add FlowList page for managing flows" +``` + +--- + +## Task 12: Final Integration & Testing + +**Files:** +- Update: `frontend/src/layouts/MainLayout.tsx` (complete updates) + +**Step 1: Complete MainLayout updates** + +Ensure all imports and routes are properly added for FlowList and FlowBuilder. + +**Step 2: Create database migration for flows table** + +Run alembic revision: +```bash +cd services/api-gateway +alembic revision --autogenerate -m "add flows and flow_sessions tables" +``` + +**Step 3: Final commit** + +```bash +git add . +git commit -m "feat: complete Fase 2 - Flow Engine with visual builder" +``` + +--- + +## Summary + +Fase 2 creates: + +1. **Database Models**: Flow, FlowSession tables for storing chatbot flows +2. **Flow API**: CRUD operations for managing flows +3. **Flow Engine Service**: Python service that executes flows +4. **Flow Context**: Variable interpolation system +5. **Node Types**: trigger, message, condition, wait_input, set_variable +6. **Frontend Flow Builder**: Visual drag & drop editor with React Flow +7. **Frontend Flow List**: Management page for flows + +To test: +```bash +docker-compose up -d --build + +# Open http://localhost:3000 +# Go to Flujos → Create flow +# Add trigger + message nodes +# Connect them +# Activate flow +# Send message from WhatsApp to test +```