WhatsAppCentralizado/docs/plans/2026-01-29-fase-2-flow-engine.md
2026-01-29 10:13:58 +00:00


# Phase 2: Basic Flow Engine - Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Build the chatbot engine with a visual Flow Builder to automate WhatsApp conversations.

**Architecture:** The Flow Engine (Python) processes incoming messages, evaluates triggers, executes nodes, and manages state. The frontend uses React Flow for a visual drag-and-drop editor. Flows are stored as JSON in PostgreSQL.

**Tech Stack:** Python 3.11, FastAPI, React Flow, Zustand, PostgreSQL JSONB, Redis for session state.
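
To make the storage model concrete, here is a minimal sketch of what one flow might look like once serialized for the JSONB columns. The node ids, labels, and keyword values are invented for illustration; only the `nodes`/`edges` split and the `data.type`/`data.config` layout follow the plan.

```python
import json

# Hypothetical flow document: ids and texts are illustrative only.
example_flow = {
    "name": "Pricing FAQ",
    "trigger_type": "keyword",
    "trigger_value": "precio, price",
    "nodes": [
        {"id": "trigger-1", "type": "trigger", "position": {"x": 250, "y": 50},
         "data": {"label": "Start", "type": "trigger", "config": {}}},
        {"id": "message-1", "type": "message", "position": {"x": 250, "y": 150},
         "data": {"label": "message", "type": "message",
                  "config": {"text": "Hi {{contact.name}}!"}}},
    ],
    "edges": [
        {"id": "e1", "source": "trigger-1", "target": "message-1",
         "sourceHandle": None, "targetHandle": None},
    ],
}


def round_trip(flow: dict) -> dict:
    """Serialize and parse the flow, as storing it in a JSON column would."""
    return json.loads(json.dumps(flow))


restored = round_trip(example_flow)
```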
---
## Task 1: Database Models para Flows
**Files:**
- Create: `services/api-gateway/app/models/flow.py`
- Modify: `services/api-gateway/app/models/__init__.py`
**Step 1: Create flow.py**
```python
import uuid
from datetime import datetime
from sqlalchemy import Column, String, Boolean, DateTime, Text, Integer, Enum as SQLEnum
from sqlalchemy.dialects.postgresql import UUID, JSONB
import enum
from app.core.database import Base


class TriggerType(str, enum.Enum):
    WELCOME = "welcome"
    KEYWORD = "keyword"
    FALLBACK = "fallback"
    EVENT = "event"
    MANUAL = "manual"


class Flow(Base):
    __tablename__ = "flows"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(100), nullable=False)
    description = Column(Text, nullable=True)
    trigger_type = Column(SQLEnum(TriggerType), nullable=False)
    trigger_value = Column(String(255), nullable=True)  # keywords, event name, etc.
    nodes = Column(JSONB, default=list)  # Array of node definitions
    edges = Column(JSONB, default=list)  # React Flow edges
    variables = Column(JSONB, default=dict)  # Flow-level variables
    is_active = Column(Boolean, default=False, nullable=False)
    version = Column(Integer, default=1, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)


class FlowSession(Base):
    """Active flow execution state per conversation"""
    __tablename__ = "flow_sessions"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    conversation_id = Column(UUID(as_uuid=True), nullable=False, index=True)
    flow_id = Column(UUID(as_uuid=True), nullable=False)
    current_node_id = Column(String(100), nullable=True)
    variables = Column(JSONB, default=dict)  # Session variables
    waiting_for_input = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
```
**Step 2: Update models/__init__.py**
```python
from app.models.user import User
from app.models.whatsapp import WhatsAppAccount, Contact, Conversation, Message
from app.models.flow import Flow, FlowSession, TriggerType
__all__ = [
    "User",
    "WhatsAppAccount", "Contact", "Conversation", "Message",
    "Flow", "FlowSession", "TriggerType",
]
```
**Step 3: Commit**
```bash
git add services/api-gateway/app/models/
git commit -m "feat(api-gateway): add Flow and FlowSession database models"
```
---
## Task 2: Flow API Schemas
**Files:**
- Create: `services/api-gateway/app/schemas/flow.py`
- Modify: `services/api-gateway/app/schemas/__init__.py`
**Step 1: Create flow.py**
```python
from pydantic import BaseModel
from typing import Optional, List, Any
from uuid import UUID
from datetime import datetime
from app.models.flow import TriggerType


class NodeData(BaseModel):
    label: str
    type: str  # message, condition, wait_input, buttons, etc.
    config: dict = {}


class FlowNode(BaseModel):
    id: str
    type: str
    position: dict  # {x: number, y: number}
    data: NodeData


class FlowEdge(BaseModel):
    id: str
    source: str
    target: str
    sourceHandle: Optional[str] = None
    targetHandle: Optional[str] = None


class FlowCreate(BaseModel):
    name: str
    description: Optional[str] = None
    trigger_type: TriggerType
    trigger_value: Optional[str] = None


class FlowUpdate(BaseModel):
    # Every field is optional so the editor can send partial updates
    name: Optional[str] = None
    description: Optional[str] = None
    trigger_type: Optional[TriggerType] = None
    trigger_value: Optional[str] = None
    nodes: Optional[List[dict]] = None
    edges: Optional[List[dict]] = None
    variables: Optional[dict] = None
    is_active: Optional[bool] = None


class FlowResponse(BaseModel):
    id: UUID
    name: str
    description: Optional[str]
    trigger_type: TriggerType
    trigger_value: Optional[str]
    nodes: List[dict]
    edges: List[dict]
    variables: dict
    is_active: bool
    version: int
    created_at: datetime
    updated_at: datetime

    class Config:
        from_attributes = True


class FlowListResponse(BaseModel):
    id: UUID
    name: str
    trigger_type: TriggerType
    is_active: bool
    version: int
    updated_at: datetime

    class Config:
        from_attributes = True
```
**Step 2: Update schemas/__init__.py**
```python
from app.schemas import auth, whatsapp, flow
__all__ = ["auth", "whatsapp", "flow"]
```
**Step 3: Commit**
```bash
git add services/api-gateway/app/schemas/
git commit -m "feat(api-gateway): add Flow API schemas"
```
---
## Task 3: Flow API Routes
**Files:**
- Create: `services/api-gateway/app/routers/flows.py`
- Modify: `services/api-gateway/app/routers/__init__.py`
- Modify: `services/api-gateway/app/main.py`
**Step 1: Create flows.py**
```python
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from uuid import UUID
from app.core.database import get_db
from app.core.security import get_current_user
from app.models.user import User, UserRole
from app.models.flow import Flow, TriggerType
from app.schemas.flow import FlowCreate, FlowUpdate, FlowResponse, FlowListResponse

router = APIRouter(prefix="/api/flows", tags=["flows"])


def require_admin(current_user: User = Depends(get_current_user)):
    if current_user.role != UserRole.ADMIN:
        raise HTTPException(status_code=403, detail="Admin required")
    return current_user


@router.get("", response_model=List[FlowListResponse])
def list_flows(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    flows = db.query(Flow).order_by(Flow.updated_at.desc()).all()
    return flows


@router.post("", response_model=FlowResponse)
def create_flow(
    request: FlowCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    flow = Flow(
        name=request.name,
        description=request.description,
        trigger_type=request.trigger_type,
        trigger_value=request.trigger_value,
        nodes=[],
        edges=[],
        variables={},
    )
    db.add(flow)
    db.commit()
    db.refresh(flow)
    return flow


@router.get("/{flow_id}", response_model=FlowResponse)
def get_flow(
    flow_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if not flow:
        raise HTTPException(status_code=404, detail="Flow not found")
    return flow


@router.put("/{flow_id}", response_model=FlowResponse)
def update_flow(
    flow_id: UUID,
    request: FlowUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if not flow:
        raise HTTPException(status_code=404, detail="Flow not found")
    # Only apply the fields the client actually sent
    update_data = request.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(flow, field, value)
    flow.version += 1
    db.commit()
    db.refresh(flow)
    return flow


@router.delete("/{flow_id}")
def delete_flow(
    flow_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if not flow:
        raise HTTPException(status_code=404, detail="Flow not found")
    db.delete(flow)
    db.commit()
    return {"success": True}


@router.post("/{flow_id}/activate")
def activate_flow(
    flow_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if not flow:
        raise HTTPException(status_code=404, detail="Flow not found")
    # Deactivate other flows with same trigger if welcome/fallback
    if flow.trigger_type in [TriggerType.WELCOME, TriggerType.FALLBACK]:
        db.query(Flow).filter(
            Flow.trigger_type == flow.trigger_type,
            Flow.id != flow_id
        ).update({"is_active": False})
    flow.is_active = True
    db.commit()
    return {"success": True}


@router.post("/{flow_id}/deactivate")
def deactivate_flow(
    flow_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if not flow:
        raise HTTPException(status_code=404, detail="Flow not found")
    flow.is_active = False
    db.commit()
    return {"success": True}
```
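The activation rule in `activate_flow` can be checked in isolation. Below is a minimal in-memory sketch that uses plain dicts instead of SQLAlchemy rows (the `activate` helper and the sample data are hypothetical): activating a `welcome` or `fallback` flow deactivates its siblings, while `keyword` flows may be active side by side.

```python
EXCLUSIVE_TRIGGERS = {"welcome", "fallback"}


def activate(flows: list, flow_id: str) -> None:
    """Mark one flow active; for exclusive triggers, deactivate the others."""
    target = next(f for f in flows if f["id"] == flow_id)
    if target["trigger_type"] in EXCLUSIVE_TRIGGERS:
        for other in flows:
            same_trigger = other["trigger_type"] == target["trigger_type"]
            if other["id"] != flow_id and same_trigger:
                other["is_active"] = False
    target["is_active"] = True


# Illustrative data only
flows = [
    {"id": "a", "trigger_type": "welcome", "is_active": True},
    {"id": "b", "trigger_type": "welcome", "is_active": False},
    {"id": "c", "trigger_type": "keyword", "is_active": True},
    {"id": "d", "trigger_type": "keyword", "is_active": False},
]
activate(flows, "b")  # replaces "a" as the single welcome flow
activate(flows, "d")  # keyword flows do not displace each other
active_ids = sorted(f["id"] for f in flows if f["is_active"])
```

With this data, `a` ends up inactive and `b`, `c`, and `d` remain active.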
**Step 2: Update routers/__init__.py**
```python
from app.routers import auth, whatsapp, flows
__all__ = ["auth", "whatsapp", "flows"]
```
**Step 3: Update main.py - add import and router**
Add to imports:
```python
from app.routers import auth, whatsapp, flows
```
Add router:
```python
app.include_router(flows.router)
```
**Step 4: Commit**
```bash
git add services/api-gateway/app/routers/ services/api-gateway/app/main.py
git commit -m "feat(api-gateway): add Flow CRUD API routes"
```
---
## Task 4: Flow Engine Service Setup
**Files:**
- Create: `services/flow-engine/requirements.txt`
- Create: `services/flow-engine/Dockerfile`
- Create: `services/flow-engine/app/__init__.py`
- Create: `services/flow-engine/app/config.py`
**Step 1: Create requirements.txt**
```
fastapi==0.115.6
uvicorn[standard]==0.34.0
sqlalchemy==2.0.36
psycopg2-binary==2.9.10
redis==5.2.1
httpx==0.28.1
pydantic==2.10.4
pydantic-settings==2.7.1
```
**Step 2: Create Dockerfile**
```dockerfile
FROM python:3.11-slim
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app ./app
EXPOSE 8001
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8001"]
```
**Step 3: Create app/__init__.py**
```python
# WhatsApp Centralizado - Flow Engine
```
**Step 4: Create app/config.py**
```python
from pydantic_settings import BaseSettings
from functools import lru_cache


class Settings(BaseSettings):
    DATABASE_URL: str = "postgresql://whatsapp_admin:password@localhost:5432/whatsapp_central"
    REDIS_URL: str = "redis://localhost:6379"
    API_GATEWAY_URL: str = "http://localhost:8000"
    WHATSAPP_CORE_URL: str = "http://localhost:3001"

    class Config:
        env_file = ".env"


@lru_cache
def get_settings() -> Settings:
    return Settings()
```
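One consequence of wrapping `get_settings` in `lru_cache` is that environment changes made after the first call are not seen until the cache is cleared. The sketch below reproduces that behaviour with the standard library only; `DemoSettings` and `get_demo_settings` are hypothetical stand-ins for the pydantic `Settings` class and its accessor, not part of the plan.

```python
import os
from dataclasses import dataclass, field
from functools import lru_cache


@dataclass(frozen=True)
class DemoSettings:
    # Read the environment at construction time, falling back to a local default
    REDIS_URL: str = field(
        default_factory=lambda: os.environ.get("REDIS_URL", "redis://localhost:6379")
    )


@lru_cache
def get_demo_settings() -> DemoSettings:
    return DemoSettings()


os.environ.pop("REDIS_URL", None)
get_demo_settings.cache_clear()
first = get_demo_settings().REDIS_URL   # default value

os.environ["REDIS_URL"] = "redis://redis:6379"
stale = get_demo_settings().REDIS_URL   # still the cached instance
get_demo_settings.cache_clear()
fresh = get_demo_settings().REDIS_URL   # re-read after clearing the cache
```

In tests that override environment variables, calling `cache_clear()` on the cached accessor is one way to avoid stale values.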
**Step 5: Commit**
```bash
git add services/flow-engine/
git commit -m "feat(flow-engine): setup project structure"
```
---
## Task 5: Flow Engine Core
**Files:**
- Create: `services/flow-engine/app/engine.py`
- Create: `services/flow-engine/app/context.py`
**Step 1: Create context.py**
```python
from typing import Any, Optional
from datetime import datetime
import re


class FlowContext:
    """Manages variables and state during flow execution"""

    def __init__(
        self,
        contact: dict,
        conversation: dict,
        message: dict,
        session_vars: Optional[dict] = None
    ):
        self.contact = contact
        self.conversation = conversation
        self.message = message
        self.variables = session_vars or {}
        self._system = {
            "date": datetime.now().strftime("%Y-%m-%d"),
            "time": datetime.now().strftime("%H:%M"),
            "day_of_week": datetime.now().strftime("%A"),
        }

    def get(self, key: str) -> Any:
        """Get variable by dot notation: contact.name, variables.email"""
        parts = key.split(".")
        if parts[0] == "contact":
            return self._get_nested(self.contact, parts[1:])
        elif parts[0] == "conversation":
            return self._get_nested(self.conversation, parts[1:])
        elif parts[0] == "message":
            return self._get_nested(self.message, parts[1:])
        elif parts[0] == "system":
            return self._get_nested(self._system, parts[1:])
        elif parts[0] == "variables":
            return self._get_nested(self.variables, parts[1:])
        else:
            # Unprefixed keys are looked up directly in the session variables
            return self.variables.get(key)

    def set(self, key: str, value: Any):
        """Set a session variable"""
        self.variables[key] = value

    def _get_nested(self, obj: dict, keys: list) -> Any:
        for key in keys:
            if isinstance(obj, dict):
                obj = obj.get(key)
            else:
                return None
        return obj

    def interpolate(self, text: str) -> str:
        """Replace {{variable}} with actual values"""
        pattern = r'\{\{([^}]+)\}\}'

        def replace(match):
            key = match.group(1).strip()
            value = self.get(key)
            return str(value) if value is not None else ""

        return re.sub(pattern, replace, text)

    def to_dict(self) -> dict:
        return self.variables.copy()
```
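As a quick illustration of the `{{variable}}` syntax, the following standalone sketch mirrors the regular expression and dot-notation lookup used by `FlowContext.interpolate`. The `lookup` and `render` helpers and the sample data are hypothetical; the sketch leaves out the fallback for unprefixed keys and exists only to show the expected behaviour, including the empty string for unknown keys.

```python
import re
from typing import Any

PLACEHOLDER = re.compile(r"\{\{([^}]+)\}\}")


def lookup(scope: dict, dotted_key: str) -> Any:
    """Resolve 'contact.name' style keys against nested dicts."""
    value: Any = scope
    for part in dotted_key.split("."):
        if not isinstance(value, dict):
            return None
        value = value.get(part)
    return value


def render(text: str, scope: dict) -> str:
    """Replace each {{key}} with its value, or '' when the key is missing."""
    def replace(match: re.Match) -> str:
        value = lookup(scope, match.group(1).strip())
        return str(value) if value is not None else ""
    return PLACEHOLDER.sub(replace, text)


# Illustrative data only
scope = {"contact": {"name": "Ana"}, "variables": {"email": "ana@example.com"}}
greeting = render("Hello {{ contact.name }}, we will write to {{variables.email}}.", scope)
missing = render("Order: {{variables.order_id}}", scope)
```

Whitespace inside the braces is ignored, and a missing key renders as an empty string rather than raising, so a typo in a variable name shows up as a gap in the outgoing message.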
**Step 2: Create engine.py**
```python
from typing import Optional
import httpx
from app.config import get_settings
from app.context import FlowContext

settings = get_settings()


class FlowEngine:
    """Executes flow nodes based on incoming messages"""

    def __init__(self, db_session):
        self.db = db_session

    async def process_message(
        self,
        conversation_id: str,
        contact: dict,
        conversation: dict,
        message: dict,
    ) -> bool:
        """
        Process incoming message through flow engine.
        Returns True if handled by a flow, False otherwise.
        """
        from app.models import Flow, FlowSession, TriggerType

        # Check for active flow session
        session = self.db.query(FlowSession).filter(
            FlowSession.conversation_id == conversation_id
        ).first()
        if session and session.waiting_for_input:
            # Continue existing flow
            flow = self.db.query(Flow).filter(Flow.id == session.flow_id).first()
            if flow:
                context = FlowContext(contact, conversation, message, session.variables)
                await self._execute_from_node(flow, session, context)
                return True

        # Find matching flow by trigger
        flow = self._find_matching_flow(message.get("content", ""))
        if flow:
            context = FlowContext(contact, conversation, message)
            session = FlowSession(
                conversation_id=conversation_id,
                flow_id=flow.id,
                variables={},
            )
            self.db.add(session)
            self.db.commit()
            await self._execute_flow(flow, session, context)
            return True
        return False

    def _find_matching_flow(self, message_text: str) -> Optional["Flow"]:
        from app.models import Flow, TriggerType

        message_lower = message_text.lower().strip()

        # Check keyword triggers
        keyword_flows = self.db.query(Flow).filter(
            Flow.trigger_type == TriggerType.KEYWORD,
            Flow.is_active == True
        ).all()
        for flow in keyword_flows:
            keywords = [k.strip().lower() for k in (flow.trigger_value or "").split(",")]
            if any(kw in message_lower for kw in keywords if kw):
                return flow

        # Check welcome trigger (first message - simplified)
        welcome_flow = self.db.query(Flow).filter(
            Flow.trigger_type == TriggerType.WELCOME,
            Flow.is_active == True
        ).first()
        if welcome_flow:
            return welcome_flow

        # Fallback
        return self.db.query(Flow).filter(
            Flow.trigger_type == TriggerType.FALLBACK,
            Flow.is_active == True
        ).first()

    async def _execute_flow(self, flow, session, context: FlowContext):
        """Start flow execution from first node"""
        nodes = flow.nodes or []
        if not nodes:
            return
        # Find start node (trigger node)
        start_node = next(
            (n for n in nodes if n.get("data", {}).get("type") == "trigger"),
            nodes[0] if nodes else None
        )
        if start_node:
            session.current_node_id = start_node["id"]
            await self._execute_from_node(flow, session, context)

    async def _execute_from_node(self, flow, session, context: FlowContext):
        """Execute flow starting from current node"""
        nodes = {n["id"]: n for n in (flow.nodes or [])}
        edges = flow.edges or []
        current_id = session.current_node_id

        if session.waiting_for_input:
            # Resuming after a "wait": that node already ran, so move past it
            # instead of executing it (and re-sending its prompt) again.
            next_edge = self._find_next_edge(edges, current_id)
            current_id = next_edge["target"] if next_edge else None
            session.current_node_id = current_id
            session.waiting_for_input = False

        while current_id:
            node = nodes.get(current_id)
            if not node:
                break
            node_type = node.get("data", {}).get("type", "")
            config = node.get("data", {}).get("config", {})

            # Execute node
            result = await self._execute_node(node_type, config, context, session)
            if result == "wait":
                # Node requires input, save state and exit
                session.waiting_for_input = True
                session.variables = context.to_dict()
                self.db.commit()
                return

            # Find next node
            next_edge = self._find_next_edge(edges, current_id, result)
            current_id = next_edge["target"] if next_edge else None
            session.current_node_id = current_id
            session.waiting_for_input = False

        # Flow completed
        session.variables = context.to_dict()
        self.db.commit()

    async def _execute_node(
        self,
        node_type: str,
        config: dict,
        context: FlowContext,
        session
    ) -> Optional[str]:
        """Execute a single node, return result for routing"""
        if node_type == "trigger":
            return "default"
        elif node_type == "message":
            text = context.interpolate(config.get("text", ""))
            await self._send_message(session.conversation_id, text)
            return "default"
        elif node_type == "buttons":
            text = context.interpolate(config.get("text", ""))
            buttons = config.get("buttons", [])
            # For now, send as text with options
            button_text = "\n".join([f"{b['label']}" for b in buttons])
            await self._send_message(session.conversation_id, f"{text}\n\n{button_text}")
            return "wait"
        elif node_type == "wait_input":
            variable = config.get("variable", "user_input")
            context.set(variable, context.message.get("content", ""))
            return "default"
        elif node_type == "condition":
            return self._evaluate_condition(config, context)
        elif node_type == "set_variable":
            var_name = config.get("variable", "")
            var_value = context.interpolate(config.get("value", ""))
            context.set(var_name, var_value)
            return "default"
        return "default"

    def _evaluate_condition(self, config: dict, context: FlowContext) -> str:
        """Evaluate condition and return branch name"""
        conditions = config.get("conditions", [])
        for cond in conditions:
            field = cond.get("field", "")
            operator = cond.get("operator", "equals")
            value = cond.get("value", "")
            branch = cond.get("branch", "default")
            actual = context.get(field) or context.message.get("content", "")
            if operator == "equals" and str(actual).lower() == str(value).lower():
                return branch
            elif operator == "contains" and str(value).lower() in str(actual).lower():
                return branch
            elif operator == "starts_with" and str(actual).lower().startswith(str(value).lower()):
                return branch
        return "default"

    def _find_next_edge(self, edges: list, source_id: str, handle: Optional[str] = None) -> Optional[dict]:
        """Find the next edge from source node"""
        for edge in edges:
            if edge.get("source") != source_id:
                continue
            # React Flow stores null when an edge leaves a node's unnamed
            # handle, so a missing sourceHandle is treated as "default".
            edge_handle = edge.get("sourceHandle") or "default"
            if handle and edge_handle != handle:
                continue
            return edge
        return None

    async def _send_message(self, conversation_id: str, text: str):
        """Send message via API Gateway"""
        async with httpx.AsyncClient() as client:
            try:
                await client.post(
                    f"{settings.API_GATEWAY_URL}/api/internal/flow/send",
                    json={
                        # conversation_id may be a UUID object; JSON needs a string
                        "conversation_id": str(conversation_id),
                        "content": text,
                        "type": "text"
                    },
                    timeout=30
                )
            except Exception as e:
                print(f"Failed to send message: {e}")
```
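The keyword branch of `_find_matching_flow` does substring matching on a comma-separated list, which is worth seeing on concrete inputs. This is a standalone sketch (the `matches_keywords` helper is hypothetical and only copies the matching expression), and it shows one side effect of substring matching: a short keyword also fires inside longer words.

```python
from typing import Optional


def matches_keywords(trigger_value: Optional[str], message_text: str) -> bool:
    """True when any comma-separated keyword occurs in the message."""
    message_lower = message_text.lower().strip()
    keywords = [k.strip().lower() for k in (trigger_value or "").split(",")]
    return any(kw in message_lower for kw in keywords if kw)


hit = matches_keywords("precio, horario", "Hola, ¿cuál es el PRECIO?")
miss = matches_keywords("precio, horario", "Gracias")
empty = matches_keywords(None, "anything")
substring = matches_keywords("hola", "scholar")  # "hola" occurs inside "scholar"
```

If whole-word matching is wanted instead, a word-boundary regular expression would be one option, at the cost of slightly more complex trigger handling.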
**Step 3: Commit**
```bash
git add services/flow-engine/app/
git commit -m "feat(flow-engine): add FlowContext and FlowEngine core"
```
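To show how node results drive routing, here is a small synchronous sketch of the walk performed by `_execute_from_node`: each node yields a result string, and the next edge is the one whose `sourceHandle` matches it. The `walk` function, the handler table, and the sample graph are hypothetical simplifications (no database, no messaging, no waiting), and a missing `sourceHandle` is treated as `default`, which is an assumption about how the editor saves edges.

```python
from typing import Callable, Dict, List, Optional


def find_next_edge(edges: List[dict], source_id: str, handle: str) -> Optional[dict]:
    """Return the first edge leaving source_id through the given handle."""
    for edge in edges:
        edge_handle = edge.get("sourceHandle") or "default"
        if edge["source"] == source_id and edge_handle == handle:
            return edge
    return None


def walk(nodes: List[dict], edges: List[dict], start_id: str,
         handlers: Dict[str, Callable[[dict], str]], max_steps: int = 50) -> List[str]:
    """Visit nodes from start_id, following the edge selected by each result."""
    by_id = {n["id"]: n for n in nodes}
    visited: List[str] = []
    current: Optional[str] = start_id
    while current and len(visited) < max_steps:  # guard against cycles
        node = by_id.get(current)
        if node is None:
            break
        visited.append(current)
        handler = handlers.get(node["type"], lambda _node: "default")
        edge = find_next_edge(edges, current, handler(node))
        current = edge["target"] if edge else None
    return visited


# Illustrative graph: the condition node routes on a precomputed answer
nodes = [
    {"id": "start", "type": "trigger"},
    {"id": "check", "type": "condition", "answer": "yes"},
    {"id": "thanks", "type": "message"},
    {"id": "sorry", "type": "message"},
]
edges = [
    {"source": "start", "target": "check", "sourceHandle": None},
    {"source": "check", "target": "thanks", "sourceHandle": "yes"},
    {"source": "check", "target": "sorry", "sourceHandle": "default"},
]
handlers = {"condition": lambda node: node["answer"]}
path = walk(nodes, edges, "start", handlers)
```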
---
## Task 6: Flow Engine Main & API
**Files:**
- Create: `services/flow-engine/app/main.py`
- Create: `services/flow-engine/app/models.py`
**Step 1: Create models.py (shared models import)**
```python
from sqlalchemy import Column, String, Boolean, DateTime, Text, Integer, Enum as SQLEnum, create_engine
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import sessionmaker, declarative_base
import enum
import uuid
from datetime import datetime
from app.config import get_settings

settings = get_settings()
engine = create_engine(settings.DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


class TriggerType(str, enum.Enum):
    WELCOME = "welcome"
    KEYWORD = "keyword"
    FALLBACK = "fallback"
    EVENT = "event"
    MANUAL = "manual"


# These classes map the same tables as the API Gateway models in Task 1,
# so the column definitions are kept identical.
class Flow(Base):
    __tablename__ = "flows"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(100), nullable=False)
    description = Column(Text, nullable=True)
    trigger_type = Column(SQLEnum(TriggerType), nullable=False)
    trigger_value = Column(String(255), nullable=True)
    nodes = Column(JSONB, default=list)
    edges = Column(JSONB, default=list)
    variables = Column(JSONB, default=dict)
    is_active = Column(Boolean, default=False, nullable=False)
    version = Column(Integer, default=1, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)


class FlowSession(Base):
    __tablename__ = "flow_sessions"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    conversation_id = Column(UUID(as_uuid=True), nullable=False, index=True)
    flow_id = Column(UUID(as_uuid=True), nullable=False)
    current_node_id = Column(String(100), nullable=True)
    variables = Column(JSONB, default=dict)
    waiting_for_input = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
```
**Step 2: Create main.py**
```python
from fastapi import FastAPI, Depends
from pydantic import BaseModel
from typing import Optional
from sqlalchemy.orm import Session
from app.models import get_db
from app.engine import FlowEngine
from app.config import get_settings

settings = get_settings()

app = FastAPI(
    title="WhatsApp Centralizado - Flow Engine",
    version="1.0.0",
)


class ProcessMessageRequest(BaseModel):
    conversation_id: str
    contact: dict
    conversation: dict
    message: dict


class ProcessMessageResponse(BaseModel):
    handled: bool
    flow_id: Optional[str] = None


@app.get("/health")
def health_check():
    return {"status": "ok", "service": "flow-engine"}


@app.post("/process", response_model=ProcessMessageResponse)
async def process_message(
    request: ProcessMessageRequest,
    db: Session = Depends(get_db),
):
    """Process an incoming message through the flow engine"""
    engine = FlowEngine(db)
    handled = await engine.process_message(
        conversation_id=request.conversation_id,
        contact=request.contact,
        conversation=request.conversation,
        message=request.message,
    )
    return ProcessMessageResponse(handled=handled)
```
**Step 3: Commit**
```bash
git add services/flow-engine/app/
git commit -m "feat(flow-engine): add main API and database models"
```
---
## Task 7: Update Docker Compose
**Files:**
- Modify: `docker-compose.yml`
**Step 1: Add flow-engine service**
Add after api-gateway service:
```yaml
  flow-engine:
    build:
      context: ./services/flow-engine
      dockerfile: Dockerfile
    container_name: wac_flow_engine
    restart: unless-stopped
    environment:
      DATABASE_URL: postgresql://${DB_USER:-whatsapp_admin}:${DB_PASSWORD}@postgres:5432/${DB_NAME:-whatsapp_central}
      REDIS_URL: redis://redis:6379
      API_GATEWAY_URL: http://api-gateway:8000
      WHATSAPP_CORE_URL: http://whatsapp-core:3001
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    networks:
      - wac_network
```
**Step 2: Commit**
```bash
git add docker-compose.yml
git commit -m "feat(docker): add flow-engine service"
```
---
## Task 8: API Gateway - Internal Flow Routes
**Files:**
- Modify: `services/api-gateway/app/routers/whatsapp.py`
**Step 1: Add internal endpoint for flow messages**
Add at the end of whatsapp.py:
```python
class FlowSendRequest(BaseModel):
    conversation_id: str
    content: str
    type: str = "text"


@router.post("/internal/flow/send")
async def flow_send_message(
    request: FlowSendRequest,
    db: Session = Depends(get_db),
):
    """Internal endpoint for flow engine to send messages"""
    conversation = db.query(Conversation).filter(
        Conversation.id == request.conversation_id
    ).first()
    if not conversation:
        raise HTTPException(status_code=404, detail="Conversation not found")

    # Create message in DB
    message = Message(
        conversation_id=conversation.id,
        direction=MessageDirection.OUTBOUND,
        type=MessageType.TEXT,
        content=request.content,
        status=MessageStatus.PENDING,
    )
    db.add(message)
    db.commit()
    db.refresh(message)

    # Send via WhatsApp Core
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                f"{settings.WHATSAPP_CORE_URL}/api/sessions/{conversation.whatsapp_account_id}/messages",
                json={
                    "to": conversation.contact.phone_number,
                    "type": "text",
                    "content": {"text": request.content},
                },
                timeout=30,
            )
            if response.status_code == 200:
                data = response.json()
                message.whatsapp_message_id = data.get("messageId")
                message.status = MessageStatus.SENT
            else:
                message.status = MessageStatus.FAILED
        except Exception:
            message.status = MessageStatus.FAILED
    db.commit()
    return {"success": True, "message_id": str(message.id)}
```
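The status handling at the end of `flow_send_message` reduces to a small decision: HTTP 200 marks the message as sent, and any other status code or an exception marks it as failed. Below is a minimal sketch of that mapping, with a hypothetical `resolve_status` helper and a local stand-in for the `MessageStatus` enum (the real one lives in the API Gateway models, and its string values are assumed here).

```python
import enum
from typing import Optional


class MessageStatus(str, enum.Enum):
    # Stand-in enum; the values are assumptions for this sketch
    PENDING = "pending"
    SENT = "sent"
    FAILED = "failed"


def resolve_status(status_code: Optional[int]) -> MessageStatus:
    """Map the WhatsApp Core response to a message status.

    status_code is None when the HTTP call raised before returning.
    """
    if status_code == 200:
        return MessageStatus.SENT
    return MessageStatus.FAILED


outcomes = {code: resolve_status(code).value for code in (200, 500, None)}
```

Note that the endpoint as written still returns `"success": True` when the status ends up as failed, so a caller that needs the delivery outcome would have to read the stored message.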
**Step 2: Add Pydantic import for FlowSendRequest**
Add to imports at top of whatsapp.py:
```python
from pydantic import BaseModel
```
**Step 3: Commit**
```bash
git add services/api-gateway/app/routers/whatsapp.py
git commit -m "feat(api-gateway): add internal flow send message endpoint"
```
---
## Task 9: Integrate Flow Engine with Message Handler
**Files:**
- Modify: `services/api-gateway/app/routers/whatsapp.py`
**Step 1: Update handle_whatsapp_event to call flow engine**
In the `elif event.type == "message":` block, after creating the message, add the call to the flow engine:
```python
# After creating message and before db.commit()
# Call flow engine
try:
    async with httpx.AsyncClient() as client:
        await client.post(
            "http://flow-engine:8001/process",
            json={
                "conversation_id": str(conversation.id),
                "contact": {
                    "id": str(contact.id),
                    "phone_number": contact.phone_number,
                    "name": contact.name,
                },
                "conversation": {
                    "id": str(conversation.id),
                    "status": conversation.status.value,
                },
                "message": {
                    "id": str(message.id),
                    "content": content,
                    "type": "text",
                },
            },
            timeout=30,
        )
except Exception as e:
    print(f"Flow engine error: {e}")
```
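The request body above has to line up with `ProcessMessageRequest` in the flow engine, so it can help to build it in one place. The following sketch uses a hypothetical `build_process_payload` helper with plain values in place of ORM objects; the field names come from the plan, while the sample values (including the `"open"` status) are illustrative.

```python
import json
import uuid


def build_process_payload(conversation_id, contact: dict, status: str,
                          message_id, content: str) -> dict:
    """Assemble the JSON body for POST /process; UUIDs are sent as strings."""
    return {
        "conversation_id": str(conversation_id),
        "contact": {
            "id": str(contact["id"]),
            "phone_number": contact["phone_number"],
            "name": contact.get("name"),
        },
        "conversation": {"id": str(conversation_id), "status": status},
        "message": {"id": str(message_id), "content": content, "type": "text"},
    }


conversation_id = uuid.uuid4()
payload = build_process_payload(
    conversation_id,
    {"id": uuid.uuid4(), "phone_number": "+5215550000000", "name": "Ana"},
    "open",  # hypothetical status value
    uuid.uuid4(),
    "hola",
)
body = json.dumps(payload)  # would raise if any UUID were left unconverted
```

Keeping the conversion to strings in one helper makes it harder to send a raw `UUID` object, which the JSON encoder would reject.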
**Step 2: Commit**
```bash
git add services/api-gateway/app/routers/whatsapp.py
git commit -m "feat(api-gateway): integrate flow engine on message events"
```
---
## Task 10: Frontend - Flow Builder Page Setup
**Files:**
- Modify: `frontend/package.json`
- Create: `frontend/src/pages/FlowBuilder.tsx`
- Modify: `frontend/src/layouts/MainLayout.tsx`
**Step 1: Add reactflow dependency to package.json**
Add to dependencies:
```json
"reactflow": "^11.11.4"
```
**Step 2: Create FlowBuilder.tsx**
```tsx
import { useState, useCallback } from 'react';
import { useParams, useNavigate } from 'react-router-dom';
import ReactFlow, {
  Node,
  Edge,
  Controls,
  Background,
  Handle,
  Position,
  useNodesState,
  useEdgesState,
  addEdge,
  Connection,
  NodeTypes,
} from 'reactflow';
import 'reactflow/dist/style.css';
import { Button, message, Space } from 'antd';
import { SaveOutlined, ArrowLeftOutlined } from '@ant-design/icons';
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';
import { apiClient } from '../api/client';

// Custom node components (simplified for now).
// Each node renders Handle elements; without them React Flow cannot attach edges.
const TriggerNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #52c41a', borderRadius: 8, background: '#f6ffed' }}>
    <strong>🚀 {data.label}</strong>
    <Handle type="source" position={Position.Bottom} />
  </div>
);

const MessageNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #1890ff', borderRadius: 8, background: '#e6f7ff', minWidth: 150 }}>
    <Handle type="target" position={Position.Top} />
    <strong>💬 Mensaje</strong>
    <div style={{ fontSize: 12, marginTop: 4 }}>{data.config?.text?.slice(0, 50) || 'Sin texto'}</div>
    <Handle type="source" position={Position.Bottom} />
  </div>
);

const ConditionNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #faad14', borderRadius: 8, background: '#fffbe6' }}>
    <Handle type="target" position={Position.Top} />
    <strong>Condición</strong>
    <Handle type="source" position={Position.Bottom} />
  </div>
);

const WaitInputNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #722ed1', borderRadius: 8, background: '#f9f0ff' }}>
    <Handle type="target" position={Position.Top} />
    <strong>Esperar Input</strong>
    <Handle type="source" position={Position.Bottom} />
  </div>
);

const nodeTypes: NodeTypes = {
  trigger: TriggerNode,
  message: MessageNode,
  condition: ConditionNode,
  wait_input: WaitInputNode,
};

interface Flow {
  id: string;
  name: string;
  nodes: Node[];
  edges: Edge[];
}

export default function FlowBuilder() {
  const { flowId } = useParams<{ flowId: string }>();
  const navigate = useNavigate();
  const queryClient = useQueryClient();
  const [nodes, setNodes, onNodesChange] = useNodesState([]);
  const [edges, setEdges, onEdgesChange] = useEdgesState([]);

  const { data: flow, isLoading } = useQuery({
    queryKey: ['flow', flowId],
    queryFn: async () => {
      const data = await apiClient.get<Flow>(`/api/flows/${flowId}`);
      setNodes(data.nodes || []);
      setEdges(data.edges || []);
      return data;
    },
    enabled: !!flowId,
  });

  const saveMutation = useMutation({
    mutationFn: async () => {
      await apiClient.put(`/api/flows/${flowId}`, {
        nodes,
        edges,
      });
    },
    onSuccess: () => {
      message.success('Flujo guardado');
      queryClient.invalidateQueries({ queryKey: ['flow', flowId] });
    },
    onError: () => {
      message.error('Error al guardar');
    },
  });

  const onConnect = useCallback(
    (params: Connection) => setEdges((eds) => addEdge(params, eds)),
    [setEdges]
  );

  const addNode = (type: string) => {
    const newNode: Node = {
      id: `${type}-${Date.now()}`,
      type,
      position: { x: 250, y: nodes.length * 100 + 50 },
      data: {
        label: type === 'trigger' ? 'Inicio' : type,
        type,
        config: type === 'message' ? { text: 'Hola {{contact.name}}!' } : {}
      },
    };
    setNodes((nds) => [...nds, newNode]);
  };

  if (isLoading) {
    return <div>Cargando...</div>;
  }

  return (
    <div style={{ height: 'calc(100vh - 140px)' }}>
      <div style={{ marginBottom: 16, display: 'flex', justifyContent: 'space-between' }}>
        <Space>
          <Button icon={<ArrowLeftOutlined />} onClick={() => navigate('/flows')}>
            Volver
          </Button>
          <span style={{ fontSize: 18, fontWeight: 'bold' }}>{flow?.name}</span>
        </Space>
        <Space>
          <Button onClick={() => addNode('trigger')}>+ Trigger</Button>
          <Button onClick={() => addNode('message')}>+ Mensaje</Button>
          <Button onClick={() => addNode('condition')}>+ Condición</Button>
          <Button onClick={() => addNode('wait_input')}>+ Esperar Input</Button>
          <Button
            type="primary"
            icon={<SaveOutlined />}
            onClick={() => saveMutation.mutate()}
            loading={saveMutation.isPending}
            style={{ background: '#25D366' }}
          >
            Guardar
          </Button>
        </Space>
      </div>
      <div style={{ height: 'calc(100% - 50px)', border: '1px solid #d9d9d9', borderRadius: 8 }}>
        <ReactFlow
          nodes={nodes}
          edges={edges}
          onNodesChange={onNodesChange}
          onEdgesChange={onEdgesChange}
          onConnect={onConnect}
          nodeTypes={nodeTypes}
          fitView
        >
          <Controls />
          <Background />
        </ReactFlow>
      </div>
    </div>
  );
}
```
**Step 3: Update MainLayout.tsx**
Add import and route:
```tsx
import FlowBuilder from '../pages/FlowBuilder';
import FlowList from '../pages/FlowList'; // created in Task 11
```
Add menu item:
```tsx
{
key: '/flows',
icon: <BranchesOutlined />,
label: 'Flujos',
},
```
Add route in Routes:
```tsx
<Route path="/flows" element={<FlowList />} />
<Route path="/flows/:flowId" element={<FlowBuilder />} />
```
Add import for BranchesOutlined:
```tsx
import { BranchesOutlined } from '@ant-design/icons';
```
**Step 4: Commit**
```bash
git add frontend/
git commit -m "feat(frontend): add FlowBuilder page with React Flow"
```
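Because the editor saves whatever is on the canvas, it may be worth checking the graph on the backend before a flow is activated. The sketch below is one possible check and is not part of the plan: a hypothetical `validate_graph` function that reports edges pointing at missing nodes and flows without exactly one trigger node.

```python
from typing import List


def validate_graph(nodes: List[dict], edges: List[dict]) -> List[str]:
    """Return human-readable problems; an empty list means none were found."""
    problems: List[str] = []
    node_ids = {n["id"] for n in nodes}
    triggers = [n for n in nodes if n.get("data", {}).get("type") == "trigger"]
    if len(triggers) != 1:
        problems.append(f"expected exactly 1 trigger node, found {len(triggers)}")
    for edge in edges:
        for end in ("source", "target"):
            if edge.get(end) not in node_ids:
                problems.append(
                    f"edge {edge.get('id')} has unknown {end} {edge.get(end)!r}"
                )
    return problems


# Illustrative data only
good_nodes = [
    {"id": "trigger-1", "data": {"type": "trigger"}},
    {"id": "message-1", "data": {"type": "message"}},
]
good_edges = [{"id": "e1", "source": "trigger-1", "target": "message-1"}]
bad_edges = [{"id": "e2", "source": "trigger-1", "target": "ghost"}]

ok = validate_graph(good_nodes, good_edges)
broken = validate_graph(good_nodes[1:], bad_edges)  # no trigger, dangling edge
```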
---
## Task 11: Frontend - Flow List Page
**Files:**
- Create: `frontend/src/pages/FlowList.tsx`
**Step 1: Create FlowList.tsx**
```tsx
import { useState } from 'react';
import { useNavigate } from 'react-router-dom';
import {
  Card,
  Button,
  Table,
  Tag,
  Modal,
  Form,
  Input,
  Select,
  message,
  Space,
  Typography,
} from 'antd';
import {
  PlusOutlined,
  EditOutlined,
  DeleteOutlined,
  PlayCircleOutlined,
  PauseCircleOutlined,
} from '@ant-design/icons';
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';
import { apiClient } from '../api/client';

const { Title } = Typography;

interface Flow {
  id: string;
  name: string;
  trigger_type: string;
  is_active: boolean;
  version: number;
  updated_at: string;
}

// Values submitted by the "create flow" form.
interface CreateFlowInput {
  name: string;
  trigger_type: string;
  trigger_value?: string;
}

// Display labels for every TriggerType value defined in the backend enum.
const triggerLabels: Record<string, string> = {
  welcome: 'Bienvenida',
  keyword: 'Palabra clave',
  fallback: 'Fallback',
  event: 'Evento',
  manual: 'Manual',
};

export default function FlowList() {
  const [isModalOpen, setIsModalOpen] = useState(false);
  const [form] = Form.useForm();
  const navigate = useNavigate();
  const queryClient = useQueryClient();

  const { data: flows, isLoading } = useQuery({
    queryKey: ['flows'],
    queryFn: () => apiClient.get<Flow[]>('/api/flows'),
  });

  // Create a flow, then open it in the builder.
  const createMutation = useMutation({
    mutationFn: (data: CreateFlowInput) => apiClient.post<Flow>('/api/flows', data),
    onSuccess: (data) => {
      message.success('Flujo creado');
      setIsModalOpen(false);
      form.resetFields();
      queryClient.invalidateQueries({ queryKey: ['flows'] });
      navigate(`/flows/${data.id}`);
    },
    onError: () => {
      message.error('Error al crear flujo');
    },
  });

  const deleteMutation = useMutation({
    mutationFn: (id: string) => apiClient.delete(`/api/flows/${id}`),
    onSuccess: () => {
      message.success('Flujo eliminado');
      queryClient.invalidateQueries({ queryKey: ['flows'] });
    },
    onError: () => {
      message.error('Error al eliminar flujo');
    },
  });

  // `active` is the desired state, so it selects the endpoint to call.
  const toggleActiveMutation = useMutation({
    mutationFn: async ({ id, active }: { id: string; active: boolean }) => {
      const endpoint = active ? 'activate' : 'deactivate';
      await apiClient.post(`/api/flows/${id}/${endpoint}`, {});
    },
    onSuccess: () => {
      queryClient.invalidateQueries({ queryKey: ['flows'] });
      message.success('Estado actualizado');
    },
    onError: () => {
      message.error('Error al actualizar estado');
    },
  });

  const columns = [
    {
      title: 'Nombre',
      dataIndex: 'name',
      key: 'name',
    },
    {
      title: 'Trigger',
      dataIndex: 'trigger_type',
      key: 'trigger_type',
      render: (type: string) => triggerLabels[type] || type,
    },
    {
      title: 'Estado',
      dataIndex: 'is_active',
      key: 'is_active',
      render: (active: boolean) => (
        <Tag color={active ? 'green' : 'default'}>
          {active ? 'Activo' : 'Inactivo'}
        </Tag>
      ),
    },
    {
      title: 'Versión',
      dataIndex: 'version',
      key: 'version',
    },
    {
      title: 'Acciones',
      key: 'actions',
      render: (_: unknown, record: Flow) => (
        <Space>
          <Button
            icon={<EditOutlined />}
            onClick={() => navigate(`/flows/${record.id}`)}
          >
            Editar
          </Button>
          <Button
            icon={record.is_active ? <PauseCircleOutlined /> : <PlayCircleOutlined />}
            onClick={() =>
              toggleActiveMutation.mutate({ id: record.id, active: !record.is_active })
            }
          >
            {record.is_active ? 'Desactivar' : 'Activar'}
          </Button>
          <Button
            danger
            icon={<DeleteOutlined />}
            onClick={() => {
              Modal.confirm({
                title: '¿Eliminar flujo?',
                onOk: () => deleteMutation.mutate(record.id),
              });
            }}
          />
        </Space>
      ),
    },
  ];

  return (
    <div>
      <div style={{ display: 'flex', justifyContent: 'space-between', marginBottom: 16 }}>
        <Title level={4} style={{ margin: 0 }}>Flujos de Chatbot</Title>
        <Button
          type="primary"
          icon={<PlusOutlined />}
          onClick={() => setIsModalOpen(true)}
          style={{ background: '#25D366' }}
        >
          Crear Flujo
        </Button>
      </div>

      <Card>
        <Table
          dataSource={flows}
          columns={columns}
          rowKey="id"
          loading={isLoading}
          pagination={false}
        />
      </Card>

      <Modal
        title="Crear nuevo flujo"
        open={isModalOpen}
        onCancel={() => {
          setIsModalOpen(false);
          form.resetFields();
        }}
        footer={null}
      >
        <Form
          form={form}
          layout="vertical"
          onFinish={(values) => createMutation.mutate(values)}
        >
          <Form.Item
            name="name"
            label="Nombre"
            rules={[{ required: true, message: 'Ingresa un nombre' }]}
          >
            <Input placeholder="Ej: Bienvenida Principal" />
          </Form.Item>
          <Form.Item
            name="trigger_type"
            label="Tipo de Trigger"
            rules={[{ required: true, message: 'Selecciona un trigger' }]}
          >
            <Select placeholder="Selecciona...">
              <Select.Option value="welcome">Bienvenida (primer mensaje)</Select.Option>
              <Select.Option value="keyword">Palabra clave</Select.Option>
              <Select.Option value="fallback">Fallback (sin coincidencia)</Select.Option>
            </Select>
          </Form.Item>
          {/* Show the keywords field only when the keyword trigger is selected. */}
          <Form.Item
            noStyle
            shouldUpdate={(prev, curr) => prev.trigger_type !== curr.trigger_type}
          >
            {({ getFieldValue }) =>
              getFieldValue('trigger_type') === 'keyword' ? (
                <Form.Item
                  name="trigger_value"
                  label="Palabras clave"
                  rules={[{ required: true, message: 'Ingresa al menos una palabra clave' }]}
                >
                  <Input placeholder="hola, menu, ayuda (separadas por coma)" />
                </Form.Item>
              ) : null
            }
          </Form.Item>
          <Form.Item>
            <Button
              type="primary"
              htmlType="submit"
              loading={createMutation.isPending}
              block
            >
              Crear
            </Button>
          </Form.Item>
        </Form>
      </Modal>
    </div>
  );
}
```
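The `trigger_value` field in the form above accepts free-form, comma-separated keywords, and the `flows.trigger_value` column is a plain string. A minimal sketch of optional client-side normalization before the request is sent is shown below. The helper names `parseKeywords` and `buildCreateFlowPayload` and the `CreateFlowPayload` type are hypothetical, and lowercasing is an assumption: the plan does not say whether the Flow Engine matches keywords case-insensitively.

```typescript
// Hypothetical payload type; field names mirror the form items in FlowList.tsx.
interface CreateFlowPayload {
  name: string;
  trigger_type: 'welcome' | 'keyword' | 'fallback';
  trigger_value?: string;
}

// Split a comma-separated string into trimmed, lowercased, de-duplicated keywords.
// Lowercasing is an assumption about how the engine compares keywords.
function parseKeywords(raw: string): string[] {
  const seen = new Set<string>();
  for (const part of raw.split(',')) {
    const keyword = part.trim().toLowerCase();
    if (keyword.length > 0) {
      seen.add(keyword); // a Set keeps first-insertion order and drops repeats
    }
  }
  return Array.from(seen);
}

// Build the request body, re-joining keywords so trigger_value stays a string.
function buildCreateFlowPayload(values: CreateFlowPayload): CreateFlowPayload {
  const payload: CreateFlowPayload = {
    name: values.name.trim(),
    trigger_type: values.trigger_type,
  };
  if (values.trigger_type === 'keyword') {
    payload.trigger_value = parseKeywords(values.trigger_value ?? '').join(',');
  }
  return payload;
}
```

If this normalization is wanted, the form's `onFinish` handler could call `createMutation.mutate(buildCreateFlowPayload(values))` instead of passing the raw values. Skip it if the backend already cleans up `trigger_value`.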
**Step 2: Commit**
```bash
git add frontend/src/pages/FlowList.tsx
git commit -m "feat(frontend): add FlowList page for managing flows"
```
---
## Task 12: Final Integration & Testing
**Files:**
- Modify: `frontend/src/layouts/MainLayout.tsx` (finish the FlowList and FlowBuilder wiring)
**Step 1: Complete MainLayout updates**
Confirm that `MainLayout.tsx` imports `FlowList`, `FlowBuilder`, and `BranchesOutlined`, and that both the `/flows` and `/flows/:flowId` routes are registered.
**Step 2: Create database migration for the flows and flow_sessions tables**
Generate the Alembic revision and review the autogenerated script before applying it:
```bash
cd services/api-gateway
alembic revision --autogenerate -m "add flows and flow_sessions tables"
```
**Step 3: Final commit**
```bash
git add .
git commit -m "feat: complete Fase 2 - Flow Engine with visual builder"
```
---
## Summary
Fase 2 creates:
1. **Database Models**: Flow, FlowSession tables for storing chatbot flows
2. **Flow API**: CRUD operations for managing flows
3. **Flow Engine Service**: Python service that executes flows
4. **Flow Context**: Variable interpolation system
5. **Node Types**: trigger, message, condition, wait_input, set_variable
6. **Frontend Flow Builder**: Visual drag & drop editor with React Flow
7. **Frontend Flow List**: Management page for flows
To test:
```bash
docker-compose up -d --build
# Open http://localhost:3000
# Go to Flujos → "Crear Flujo"
# Add trigger + message nodes
# Connect them
# Activate flow
# Send message from WhatsApp to test
```
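The manual test above assumes a flow whose trigger node is connected to a message node. A minimal sketch of a check that could run in the builder before activation is shown below. The function `validateFlowGraph` is hypothetical, and the rules it enforces (exactly one trigger node, no dangling edges, every other node has an incoming edge) are assumptions rather than requirements stated in this plan. The node types come from the summary, and the `id`/`source`/`target` edge fields follow React Flow's edge shape.

```typescript
// Node types listed in the summary; the validation rules below are illustrative assumptions.
type NodeType = 'trigger' | 'message' | 'condition' | 'wait_input' | 'set_variable';

interface FlowNode {
  id: string;
  type: NodeType;
}

interface FlowEdge {
  id: string;
  source: string;
  target: string;
}

// Return human-readable problems; an empty array means the graph passed these checks.
function validateFlowGraph(nodes: FlowNode[], edges: FlowEdge[]): string[] {
  const problems: string[] = [];
  const ids = new Set(nodes.map((n) => n.id));

  // Assumed rule: a flow starts from exactly one trigger node.
  const triggers = nodes.filter((n) => n.type === 'trigger');
  if (triggers.length !== 1) {
    problems.push(`expected exactly 1 trigger node, found ${triggers.length}`);
  }

  // Every edge must connect two nodes that exist in the flow.
  for (const edge of edges) {
    if (!ids.has(edge.source) || !ids.has(edge.target)) {
      problems.push(`edge ${edge.id} references a missing node`);
    }
  }

  // Every non-trigger node should be the target of at least one edge.
  const targets = new Set(edges.map((e) => e.target));
  for (const node of nodes) {
    if (node.type !== 'trigger' && !targets.has(node.id)) {
      problems.push(`node ${node.id} has no incoming edge`);
    }
  }
  return problems;
}
```

A check like this only catches structural mistakes on the client. Whether the API should reject such flows on activation is a separate decision that this plan leaves open.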