# Phase 2: Basic Flow Engine - Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Build the chatbot engine, with a visual Flow Builder, to automate WhatsApp conversations.

**Architecture:** The Flow Engine (Python) processes incoming messages, evaluates triggers, executes nodes, and manages state. The frontend uses React Flow for a visual drag-and-drop editor. Flows are stored as JSON in PostgreSQL.

**Tech Stack:** Python 3.11, FastAPI, React Flow, Zustand, PostgreSQL JSONB, Redis for session state.

---

## Task 1: Database Models for Flows

**Files:**
- Create: `services/api-gateway/app/models/flow.py`
- Modify: `services/api-gateway/app/models/__init__.py`

**Step 1: Create flow.py**

```python
import uuid
from datetime import datetime
from sqlalchemy import Column, String, Boolean, DateTime, Text, Integer, Enum as SQLEnum
from sqlalchemy.dialects.postgresql import UUID, JSONB
import enum
from app.core.database import Base


class TriggerType(str, enum.Enum):
    WELCOME = "welcome"
    KEYWORD = "keyword"
    FALLBACK = "fallback"
    EVENT = "event"
    MANUAL = "manual"


class Flow(Base):
    __tablename__ = "flows"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(100), nullable=False)
    description = Column(Text, nullable=True)
    trigger_type = Column(SQLEnum(TriggerType), nullable=False)
    trigger_value = Column(String(255), nullable=True)  # keywords, event name, etc.
    nodes = Column(JSONB, default=list)  # Array of node definitions
    edges = Column(JSONB, default=list)  # React Flow edges
    variables = Column(JSONB, default=dict)  # Flow-level variables
    is_active = Column(Boolean, default=False, nullable=False)
    version = Column(Integer, default=1, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)


class FlowSession(Base):
    """Active flow execution state per conversation"""
    __tablename__ = "flow_sessions"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    conversation_id = Column(UUID(as_uuid=True), nullable=False, index=True)
    flow_id = Column(UUID(as_uuid=True), nullable=False)
    current_node_id = Column(String(100), nullable=True)
    variables = Column(JSONB, default=dict)  # Session variables
    waiting_for_input = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
```

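To make the `nodes` and `edges` JSONB columns more concrete, the following is a minimal sketch of what one stored flow might look like, plus a check for edges that point at nodes that do not exist. The node and edge fields follow the React Flow shape used elsewhere in this plan; the helper name `find_dangling_edges` and the sample ids are illustrative assumptions, not part of the plan itself.

```python
from typing import Dict, List

# Sample payload in the React Flow shape (ids and text are made up for illustration)
sample_nodes: List[Dict] = [
    {"id": "trigger-1", "type": "trigger", "position": {"x": 250, "y": 50},
     "data": {"label": "Inicio", "type": "trigger", "config": {}}},
    {"id": "message-1", "type": "message", "position": {"x": 250, "y": 150},
     "data": {"label": "message", "type": "message",
              "config": {"text": "Hola {{contact.name}}!"}}},
]
sample_edges: List[Dict] = [
    {"id": "e1", "source": "trigger-1", "target": "message-1"},
    {"id": "e2", "source": "message-1", "target": "missing-node"},
]


def find_dangling_edges(nodes: List[Dict], edges: List[Dict]) -> List[str]:
    """Return ids of edges whose source or target is not an existing node.

    Hypothetical helper: the plan does not define any validation step.
    """
    node_ids = {node["id"] for node in nodes}
    return [
        edge["id"]
        for edge in edges
        if edge["source"] not in node_ids or edge["target"] not in node_ids
    ]


dangling = find_dangling_edges(sample_nodes, sample_edges)
```

A check like this could run before a flow is saved or activated, since the engine simply stops when an edge targets an unknown node.
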
**Step 2: Update models/__init__.py**

```python
from app.models.user import User
from app.models.whatsapp import WhatsAppAccount, Contact, Conversation, Message
from app.models.flow import Flow, FlowSession, TriggerType

__all__ = [
    "User",
    "WhatsAppAccount", "Contact", "Conversation", "Message",
    "Flow", "FlowSession", "TriggerType"
]
```

**Step 3: Commit**

```bash
git add services/api-gateway/app/models/
git commit -m "feat(api-gateway): add Flow and FlowSession database models"
```

---

## Task 2: Flow API Schemas

**Files:**
- Create: `services/api-gateway/app/schemas/flow.py`
- Modify: `services/api-gateway/app/schemas/__init__.py`

**Step 1: Create flow.py**

```python
from pydantic import BaseModel, ConfigDict
from typing import Optional, List
from uuid import UUID
from datetime import datetime
from app.models.flow import TriggerType


class NodeData(BaseModel):
    label: str
    type: str  # message, condition, wait_input, buttons, etc.
    config: dict = {}


class FlowNode(BaseModel):
    id: str
    type: str
    position: dict  # {x: number, y: number}
    data: NodeData


class FlowEdge(BaseModel):
    id: str
    source: str
    target: str
    sourceHandle: Optional[str] = None
    targetHandle: Optional[str] = None


class FlowCreate(BaseModel):
    name: str
    description: Optional[str] = None
    trigger_type: TriggerType
    trigger_value: Optional[str] = None


class FlowUpdate(BaseModel):
    name: Optional[str] = None
    description: Optional[str] = None
    trigger_type: Optional[TriggerType] = None
    trigger_value: Optional[str] = None
    nodes: Optional[List[dict]] = None
    edges: Optional[List[dict]] = None
    variables: Optional[dict] = None
    is_active: Optional[bool] = None


class FlowResponse(BaseModel):
    # Pydantic v2 style config; replaces the deprecated inner `class Config`
    model_config = ConfigDict(from_attributes=True)

    id: UUID
    name: str
    description: Optional[str]
    trigger_type: TriggerType
    trigger_value: Optional[str]
    nodes: List[dict]
    edges: List[dict]
    variables: dict
    is_active: bool
    version: int
    created_at: datetime
    updated_at: datetime


class FlowListResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: UUID
    name: str
    trigger_type: TriggerType
    is_active: bool
    version: int
    updated_at: datetime
```

**Step 2: Update schemas/__init__.py**

```python
from app.schemas import auth, whatsapp, flow

__all__ = ["auth", "whatsapp", "flow"]
```

**Step 3: Commit**

```bash
git add services/api-gateway/app/schemas/
git commit -m "feat(api-gateway): add Flow API schemas"
```

---

## Task 3: Flow API Routes

**Files:**
- Create: `services/api-gateway/app/routers/flows.py`
- Modify: `services/api-gateway/app/routers/__init__.py`
- Modify: `services/api-gateway/app/main.py`

**Step 1: Create flows.py**

```python
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from uuid import UUID
from app.core.database import get_db
from app.core.security import get_current_user
from app.models.user import User, UserRole
from app.models.flow import Flow, TriggerType
from app.schemas.flow import FlowCreate, FlowUpdate, FlowResponse, FlowListResponse

router = APIRouter(prefix="/api/flows", tags=["flows"])


def require_admin(current_user: User = Depends(get_current_user)):
    if current_user.role != UserRole.ADMIN:
        raise HTTPException(status_code=403, detail="Admin required")
    return current_user


@router.get("", response_model=List[FlowListResponse])
def list_flows(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    flows = db.query(Flow).order_by(Flow.updated_at.desc()).all()
    return flows


@router.post("", response_model=FlowResponse)
def create_flow(
    request: FlowCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    flow = Flow(
        name=request.name,
        description=request.description,
        trigger_type=request.trigger_type,
        trigger_value=request.trigger_value,
        nodes=[],
        edges=[],
        variables={},
    )
    db.add(flow)
    db.commit()
    db.refresh(flow)
    return flow


@router.get("/{flow_id}", response_model=FlowResponse)
def get_flow(
    flow_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if not flow:
        raise HTTPException(status_code=404, detail="Flow not found")
    return flow


@router.put("/{flow_id}", response_model=FlowResponse)
def update_flow(
    flow_id: UUID,
    request: FlowUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if not flow:
        raise HTTPException(status_code=404, detail="Flow not found")

    update_data = request.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(flow, field, value)

    flow.version += 1
    db.commit()
    db.refresh(flow)
    return flow


@router.delete("/{flow_id}")
def delete_flow(
    flow_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if not flow:
        raise HTTPException(status_code=404, detail="Flow not found")

    db.delete(flow)
    db.commit()
    return {"success": True}


@router.post("/{flow_id}/activate")
def activate_flow(
    flow_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if not flow:
        raise HTTPException(status_code=404, detail="Flow not found")

    # Deactivate other flows with same trigger if welcome/fallback
    if flow.trigger_type in [TriggerType.WELCOME, TriggerType.FALLBACK]:
        db.query(Flow).filter(
            Flow.trigger_type == flow.trigger_type,
            Flow.id != flow_id
        ).update({"is_active": False})

    flow.is_active = True
    db.commit()
    return {"success": True}


@router.post("/{flow_id}/deactivate")
def deactivate_flow(
    flow_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    flow = db.query(Flow).filter(Flow.id == flow_id).first()
    if not flow:
        raise HTTPException(status_code=404, detail="Flow not found")

    flow.is_active = False
    db.commit()
    return {"success": True}
```

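The activation rule in `activate_flow` (at most one active `welcome` flow and one active `fallback` flow, while `keyword` flows can coexist) can be illustrated without a database. This is a minimal in-memory sketch using plain dicts; the function name `activate` and the sample data are assumptions made for the example.

```python
from typing import Dict, List

# Trigger types for which only one flow may be active at a time
EXCLUSIVE_TRIGGERS = {"welcome", "fallback"}


def activate(flows: List[Dict], flow_id: str) -> None:
    """Activate one flow in place, deactivating rivals with the same exclusive trigger."""
    target = next(f for f in flows if f["id"] == flow_id)
    if target["trigger_type"] in EXCLUSIVE_TRIGGERS:
        for other in flows:
            if other["trigger_type"] == target["trigger_type"] and other["id"] != flow_id:
                other["is_active"] = False
    target["is_active"] = True


flows = [
    {"id": "w1", "trigger_type": "welcome", "is_active": True},
    {"id": "w2", "trigger_type": "welcome", "is_active": False},
    {"id": "k1", "trigger_type": "keyword", "is_active": True},
]
activate(flows, "w2")  # w1 is switched off, k1 is left untouched
```
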
**Step 2: Update routers/__init__.py**

```python
from app.routers import auth, whatsapp, flows

__all__ = ["auth", "whatsapp", "flows"]
```

**Step 3: Update main.py - add import and router**

Add to imports:
```python
from app.routers import auth, whatsapp, flows
```

Add router:
```python
app.include_router(flows.router)
```

**Step 4: Commit**

```bash
git add services/api-gateway/app/routers/ services/api-gateway/app/main.py
git commit -m "feat(api-gateway): add Flow CRUD API routes"
```

---

## Task 4: Flow Engine Service Setup

**Files:**
- Create: `services/flow-engine/requirements.txt`
- Create: `services/flow-engine/Dockerfile`
- Create: `services/flow-engine/app/__init__.py`
- Create: `services/flow-engine/app/config.py`

**Step 1: Create requirements.txt**

```
fastapi==0.115.6
uvicorn[standard]==0.34.0
sqlalchemy==2.0.36
psycopg2-binary==2.9.10
redis==5.2.1
httpx==0.28.1
pydantic==2.10.4
pydantic-settings==2.7.1
```

**Step 2: Create Dockerfile**

```dockerfile
FROM python:3.11-slim

WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY app ./app

EXPOSE 8001

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8001"]
```

**Step 3: Create app/__init__.py**

```python
# WhatsApp Centralizado - Flow Engine
```

**Step 4: Create app/config.py**

```python
from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache


class Settings(BaseSettings):
    # pydantic-settings v2 style config; replaces the deprecated inner `class Config`
    model_config = SettingsConfigDict(env_file=".env")

    DATABASE_URL: str = "postgresql://whatsapp_admin:password@localhost:5432/whatsapp_central"
    REDIS_URL: str = "redis://localhost:6379"
    API_GATEWAY_URL: str = "http://localhost:8000"
    WHATSAPP_CORE_URL: str = "http://localhost:3001"


@lru_cache
def get_settings() -> Settings:
    return Settings()
```

**Step 5: Commit**

```bash
git add services/flow-engine/
git commit -m "feat(flow-engine): setup project structure"
```

---

## Task 5: Flow Engine Core

**Files:**
- Create: `services/flow-engine/app/engine.py`
- Create: `services/flow-engine/app/context.py`

**Step 1: Create context.py**

```python
from typing import Any, Optional
from datetime import datetime
import re


class FlowContext:
    """Manages variables and state during flow execution"""

    def __init__(
        self,
        contact: dict,
        conversation: dict,
        message: dict,
        session_vars: Optional[dict] = None
    ):
        self.contact = contact
        self.conversation = conversation
        self.message = message
        self.variables = session_vars or {}
        # Read the clock once so date, time and day_of_week agree with each other
        now = datetime.now()
        self._system = {
            "date": now.strftime("%Y-%m-%d"),
            "time": now.strftime("%H:%M"),
            "day_of_week": now.strftime("%A"),
        }

    def get(self, key: str) -> Any:
        """Get variable by dot notation: contact.name, variables.email"""
        parts = key.split(".")

        if parts[0] == "contact":
            return self._get_nested(self.contact, parts[1:])
        elif parts[0] == "conversation":
            return self._get_nested(self.conversation, parts[1:])
        elif parts[0] == "message":
            return self._get_nested(self.message, parts[1:])
        elif parts[0] == "system":
            return self._get_nested(self._system, parts[1:])
        elif parts[0] == "variables":
            return self._get_nested(self.variables, parts[1:])
        else:
            # No known prefix: treat the whole key as a session variable name
            return self.variables.get(key)

    def set(self, key: str, value: Any):
        """Set a session variable"""
        self.variables[key] = value

    def _get_nested(self, obj: dict, keys: list) -> Any:
        for key in keys:
            if isinstance(obj, dict):
                obj = obj.get(key)
            else:
                return None
        return obj

    def interpolate(self, text: str) -> str:
        """Replace {{variable}} with actual values"""
        pattern = r'\{\{([^}]+)\}\}'

        def replace(match):
            key = match.group(1).strip()
            value = self.get(key)
            return str(value) if value is not None else ""

        return re.sub(pattern, replace, text)

    def to_dict(self) -> dict:
        return self.variables.copy()
```

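The `{{variable}}` substitution can be tried in isolation. The sketch below is a simplified, standalone version of the same idea: it only resolves dotted paths against named scopes and omits the bare-variable fallback that `FlowContext.get` has. The names `lookup`, `interpolate`, and the sample data are assumptions made for the example.

```python
import re
from typing import Any, Dict

# Matches {{ anything without a closing brace }}
PLACEHOLDER = re.compile(r"\{\{([^}]+)\}\}")


def lookup(scopes: Dict[str, Dict], key: str) -> Any:
    """Resolve a dotted path such as 'contact.name'; return None if any step is missing."""
    parts = key.split(".")
    value: Any = scopes.get(parts[0])
    for part in parts[1:]:
        if not isinstance(value, dict):
            return None
        value = value.get(part)
    return value


def interpolate(text: str, scopes: Dict[str, Dict]) -> str:
    """Replace each placeholder with its value, or with an empty string if unresolved."""
    def replace(match: "re.Match[str]") -> str:
        value = lookup(scopes, match.group(1).strip())
        return "" if value is None else str(value)

    return PLACEHOLDER.sub(replace, text)


scopes = {"contact": {"name": "Ana"}, "variables": {"order": {"id": 42}}}
greeting = interpolate(
    "Hola {{ contact.name }}, pedido {{variables.order.id}}{{contact.email}}", scopes
)
```

Unresolved placeholders such as `{{contact.email}}` collapse to an empty string, which matches the behaviour of `FlowContext.interpolate`.
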
**Step 2: Create engine.py**

```python
from typing import Optional

import httpx

from app.config import get_settings
from app.context import FlowContext
from app.models import Flow, FlowSession, TriggerType

settings = get_settings()


class FlowEngine:
    """Executes flow nodes based on incoming messages"""

    def __init__(self, db_session):
        self.db = db_session

    async def process_message(
        self,
        conversation_id: str,
        contact: dict,
        conversation: dict,
        message: dict,
    ) -> bool:
        """
        Process incoming message through flow engine.
        Returns True if handled by a flow, False otherwise.
        """
        # Check for a session that is paused waiting for this user's reply.
        # Finished sessions stay in the table, so filter on waiting_for_input.
        session = self.db.query(FlowSession).filter(
            FlowSession.conversation_id == conversation_id,
            FlowSession.waiting_for_input.is_(True),
        ).order_by(FlowSession.created_at.desc()).first()

        if session:
            # Continue existing flow
            flow = self.db.query(Flow).filter(Flow.id == session.flow_id).first()
            if flow:
                context = FlowContext(contact, conversation, message, session.variables)
                # resume=True: the paused node already sent its prompt, so it is
                # not executed again; execution continues with the node after it
                await self._execute_from_node(flow, session, context, resume=True)
                return True

        # Find matching flow by trigger
        flow = self._find_matching_flow(message.get("content", ""))

        if flow:
            context = FlowContext(contact, conversation, message)
            session = FlowSession(
                conversation_id=conversation_id,
                flow_id=flow.id,
                variables={},
            )
            self.db.add(session)
            self.db.commit()

            await self._execute_flow(flow, session, context)
            return True

        return False

    def _find_matching_flow(self, message_text: str) -> Optional[Flow]:
        message_lower = message_text.lower().strip()

        # Check keyword triggers
        keyword_flows = self.db.query(Flow).filter(
            Flow.trigger_type == TriggerType.KEYWORD,
            Flow.is_active.is_(True)
        ).all()

        for flow in keyword_flows:
            keywords = [k.strip().lower() for k in (flow.trigger_value or "").split(",")]
            if any(kw in message_lower for kw in keywords if kw):
                return flow

        # Check welcome trigger (first message - simplified).
        # Note: as written, an active welcome flow matches every non-keyword
        # message, so the fallback below is only reached when there is none.
        welcome_flow = self.db.query(Flow).filter(
            Flow.trigger_type == TriggerType.WELCOME,
            Flow.is_active.is_(True)
        ).first()

        if welcome_flow:
            return welcome_flow

        # Fallback
        return self.db.query(Flow).filter(
            Flow.trigger_type == TriggerType.FALLBACK,
            Flow.is_active.is_(True)
        ).first()

    async def _execute_flow(self, flow, session, context: FlowContext):
        """Start flow execution from first node"""
        nodes = flow.nodes or []
        if not nodes:
            return

        # Find start node (trigger node), defaulting to the first stored node
        start_node = next(
            (n for n in nodes if n.get("data", {}).get("type") == "trigger"),
            nodes[0]
        )

        session.current_node_id = start_node["id"]
        await self._execute_from_node(flow, session, context)

    async def _execute_from_node(
        self, flow, session, context: FlowContext, resume: bool = False
    ):
        """Execute flow starting from current node.

        With resume=True the current node is the one that paused the flow. It is
        skipped rather than re-executed, otherwise it would re-send its prompt
        and pause again on every reply.
        """
        nodes = {n["id"]: n for n in (flow.nodes or [])}
        edges = flow.edges or []

        current_id = session.current_node_id

        while current_id:
            node = nodes.get(current_id)
            if not node:
                break

            node_type = node.get("data", {}).get("type", "")
            config = node.get("data", {}).get("config", {})

            if resume:
                # Reply received for the paused node: follow its default edge
                result = "default"
                resume = False
            else:
                # Execute node
                result = await self._execute_node(node_type, config, context, session)

            if result == "wait":
                # Node requires input, save state and exit
                session.waiting_for_input = True
                session.variables = context.to_dict()
                self.db.commit()
                return

            # Find next node
            next_edge = self._find_next_edge(edges, current_id, result)
            current_id = next_edge["target"] if next_edge else None
            session.current_node_id = current_id
            session.waiting_for_input = False

        # Flow completed (or the current node no longer exists)
        session.waiting_for_input = False
        session.variables = context.to_dict()
        self.db.commit()

    async def _execute_node(
        self,
        node_type: str,
        config: dict,
        context: FlowContext,
        session
    ) -> Optional[str]:
        """Execute a single node, return result for routing"""

        if node_type == "trigger":
            return "default"

        elif node_type == "message":
            text = context.interpolate(config.get("text", ""))
            await self._send_message(session.conversation_id, text)
            return "default"

        elif node_type == "buttons":
            text = context.interpolate(config.get("text", ""))
            buttons = config.get("buttons", [])
            # For now, send as text with options
            button_text = "\n".join([f"• {b['label']}" for b in buttons])
            await self._send_message(session.conversation_id, f"{text}\n\n{button_text}")
            return "wait"

        elif node_type == "wait_input":
            # Stores the content of the message currently being processed
            variable = config.get("variable", "user_input")
            context.set(variable, context.message.get("content", ""))
            return "default"

        elif node_type == "condition":
            return self._evaluate_condition(config, context)

        elif node_type == "set_variable":
            var_name = config.get("variable", "")
            var_value = context.interpolate(config.get("value", ""))
            context.set(var_name, var_value)
            return "default"

        return "default"

    def _evaluate_condition(self, config: dict, context: FlowContext) -> str:
        """Evaluate condition and return branch name"""
        conditions = config.get("conditions", [])

        for cond in conditions:
            field = cond.get("field", "")
            operator = cond.get("operator", "equals")
            value = cond.get("value", "")
            branch = cond.get("branch", "default")

            actual = context.get(field) or context.message.get("content", "")

            if operator == "equals" and str(actual).lower() == str(value).lower():
                return branch
            elif operator == "contains" and str(value).lower() in str(actual).lower():
                return branch
            elif operator == "starts_with" and str(actual).lower().startswith(str(value).lower()):
                return branch

        return "default"

    def _find_next_edge(
        self, edges: list, source_id: str, handle: Optional[str] = None
    ) -> Optional[dict]:
        """Find the next edge from source node"""
        for edge in edges:
            if edge.get("source") != source_id:
                continue
            # Edges drawn from a handle without an id carry no sourceHandle,
            # so a missing handle is treated as the "default" output
            edge_handle = edge.get("sourceHandle") or "default"
            if handle and edge_handle != handle:
                continue
            return edge
        return None

    async def _send_message(self, conversation_id, text: str):
        """Send message via API Gateway"""
        async with httpx.AsyncClient() as client:
            try:
                await client.post(
                    f"{settings.API_GATEWAY_URL}/api/internal/flow/send",
                    json={
                        # The ORM returns a UUID object, which is not JSON serializable
                        "conversation_id": str(conversation_id),
                        "content": text,
                        "type": "text"
                    },
                    timeout=30
                )
            except Exception as e:
                print(f"Failed to send message: {e}")
```

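The way a flow pauses on a node that needs input and later resumes from it can be modelled in a few lines, separate from the database and HTTP code. The following is a simplified synchronous sketch, not the engine itself: `run`, `next_node`, and the `kind`/`text` node keys are assumptions made for the example, and only `buttons` nodes pause.

```python
from typing import Dict, List, Optional, Tuple


def next_node(edges: List[Dict], source_id: str, handle: str) -> Optional[str]:
    """Follow the first edge leaving source_id whose handle matches.

    An edge without a sourceHandle counts as the "default" output.
    """
    for edge in edges:
        if edge["source"] == source_id and (edge.get("sourceHandle") or "default") == handle:
            return edge["target"]
    return None


def run(nodes: Dict[str, Dict], edges: List[Dict], start: Optional[str],
        resume: bool = False) -> Tuple[List[str], Optional[str]]:
    """Walk the graph; return (messages sent, id of the node paused on, or None)."""
    sent: List[str] = []
    current = start
    while current:
        node = nodes[current]
        if resume:
            # The paused node already sent its prompt; just move past it
            resume = False
        else:
            sent.append(node["text"])
            if node["kind"] == "buttons":
                return sent, current  # pause until the user replies
        current = next_node(edges, current, "default")
    return sent, None


nodes = {
    "a": {"kind": "message", "text": "Welcome"},
    "b": {"kind": "buttons", "text": "Pick one"},
    "c": {"kind": "message", "text": "Thanks"},
}
edges = [
    {"source": "a", "target": "b"},
    {"source": "b", "target": "c", "sourceHandle": None},
]

first_pass = run(nodes, edges, "a")                          # pauses on "b"
second_pass = run(nodes, edges, first_pass[1], resume=True)  # continues to "c"
```

The first call stops at the `buttons` node after sending two messages; the second call, made when the reply arrives, skips that node and finishes the flow.
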
**Step 3: Commit**

```bash
git add services/flow-engine/app/
git commit -m "feat(flow-engine): add FlowContext and FlowEngine core"
```

---

## Task 6: Flow Engine Main & API

**Files:**
- Create: `services/flow-engine/app/main.py`
- Create: `services/flow-engine/app/models.py`

**Step 1: Create models.py (shared models import)**

```python
from sqlalchemy import Column, String, Boolean, DateTime, Text, Integer, Enum as SQLEnum, create_engine
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import sessionmaker, declarative_base
import enum
import uuid
from datetime import datetime
from app.config import get_settings

settings = get_settings()
engine = create_engine(settings.DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


# These definitions mirror services/api-gateway/app/models/flow.py and map to
# the same tables, so keep both copies in sync when columns change.
class TriggerType(str, enum.Enum):
    WELCOME = "welcome"
    KEYWORD = "keyword"
    FALLBACK = "fallback"
    EVENT = "event"
    MANUAL = "manual"


class Flow(Base):
    __tablename__ = "flows"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(100), nullable=False)
    description = Column(Text, nullable=True)
    trigger_type = Column(SQLEnum(TriggerType), nullable=False)
    trigger_value = Column(String(255), nullable=True)
    nodes = Column(JSONB, default=list)
    edges = Column(JSONB, default=list)
    variables = Column(JSONB, default=dict)
    is_active = Column(Boolean, default=False, nullable=False)
    version = Column(Integer, default=1, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)


class FlowSession(Base):
    __tablename__ = "flow_sessions"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    conversation_id = Column(UUID(as_uuid=True), nullable=False, index=True)
    flow_id = Column(UUID(as_uuid=True), nullable=False)
    current_node_id = Column(String(100), nullable=True)
    variables = Column(JSONB, default=dict)
    waiting_for_input = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
```

**Step 2: Create main.py**

```python
from fastapi import FastAPI, Depends
from pydantic import BaseModel
from typing import Optional
from sqlalchemy.orm import Session
from app.models import get_db
from app.engine import FlowEngine
from app.config import get_settings

settings = get_settings()

app = FastAPI(
    title="WhatsApp Centralizado - Flow Engine",
    version="1.0.0",
)


class ProcessMessageRequest(BaseModel):
    conversation_id: str
    contact: dict
    conversation: dict
    message: dict


class ProcessMessageResponse(BaseModel):
    handled: bool
    flow_id: Optional[str] = None


@app.get("/health")
def health_check():
    return {"status": "ok", "service": "flow-engine"}


@app.post("/process", response_model=ProcessMessageResponse)
async def process_message(
    request: ProcessMessageRequest,
    db: Session = Depends(get_db),
):
    """Process an incoming message through the flow engine"""
    engine = FlowEngine(db)

    handled = await engine.process_message(
        conversation_id=request.conversation_id,
        contact=request.contact,
        conversation=request.conversation,
        message=request.message,
    )

    return ProcessMessageResponse(handled=handled)
```

**Step 3: Commit**

```bash
git add services/flow-engine/app/
git commit -m "feat(flow-engine): add main API and database models"
```

---

## Task 7: Update Docker Compose

**Files:**
- Modify: `docker-compose.yml`

**Step 1: Add flow-engine service**

Add after api-gateway service:

```yaml
  flow-engine:
    build:
      context: ./services/flow-engine
      dockerfile: Dockerfile
    container_name: wac_flow_engine
    restart: unless-stopped
    environment:
      DATABASE_URL: postgresql://${DB_USER:-whatsapp_admin}:${DB_PASSWORD}@postgres:5432/${DB_NAME:-whatsapp_central}
      REDIS_URL: redis://redis:6379
      API_GATEWAY_URL: http://api-gateway:8000
      WHATSAPP_CORE_URL: http://whatsapp-core:3001
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    networks:
      - wac_network
```

**Step 2: Commit**

```bash
git add docker-compose.yml
git commit -m "feat(docker): add flow-engine service"
```

---

## Task 8: API Gateway - Internal Flow Routes

**Files:**
- Modify: `services/api-gateway/app/routers/whatsapp.py`

**Step 1: Add internal endpoint for flow messages**

Add at the end of whatsapp.py. The flow engine posts to `/api/internal/flow/send` (see `_send_message` in Task 5), so this route must resolve to that path; adjust the decorator path if the router in whatsapp.py uses a different prefix.

```python
class FlowSendRequest(BaseModel):
    conversation_id: str
    content: str
    type: str = "text"


@router.post("/internal/flow/send")
async def flow_send_message(
    request: FlowSendRequest,
    db: Session = Depends(get_db),
):
    """Internal endpoint for flow engine to send messages"""
    conversation = db.query(Conversation).filter(
        Conversation.id == request.conversation_id
    ).first()

    if not conversation:
        raise HTTPException(status_code=404, detail="Conversation not found")

    # Create message in DB
    message = Message(
        conversation_id=conversation.id,
        direction=MessageDirection.OUTBOUND,
        type=MessageType.TEXT,
        content=request.content,
        status=MessageStatus.PENDING,
    )
    db.add(message)
    db.commit()
    db.refresh(message)

    # Send via WhatsApp Core
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                f"{settings.WHATSAPP_CORE_URL}/api/sessions/{conversation.whatsapp_account_id}/messages",
                json={
                    "to": conversation.contact.phone_number,
                    "type": "text",
                    "content": {"text": request.content},
                },
                timeout=30,
            )
            if response.status_code == 200:
                data = response.json()
                message.whatsapp_message_id = data.get("messageId")
                message.status = MessageStatus.SENT
            else:
                message.status = MessageStatus.FAILED
        except Exception:
            message.status = MessageStatus.FAILED

    db.commit()
    return {"success": True, "message_id": str(message.id)}
```

**Step 2: Add Pydantic import for FlowSendRequest**

Add to imports at top of whatsapp.py:

```python
from pydantic import BaseModel
```

**Step 3: Commit**

```bash
git add services/api-gateway/app/routers/whatsapp.py
git commit -m "feat(api-gateway): add internal flow send message endpoint"
```

---

## Task 9: Integrate Flow Engine with Message Handler

**Files:**
- Modify: `services/api-gateway/app/routers/whatsapp.py`

**Step 1: Update handle_whatsapp_event to call flow engine**

In the `elif event.type == "message":` block, after creating and committing the message, add the flow engine call:

```python
# Call the flow engine after db.commit(): message.id is only populated once
# the row is flushed, and the flow engine's reply goes through a separate
# database session that cannot see uncommitted rows
try:
    async with httpx.AsyncClient() as client:
        await client.post(
            "http://flow-engine:8001/process",
            json={
                "conversation_id": str(conversation.id),
                "contact": {
                    "id": str(contact.id),
                    "phone_number": contact.phone_number,
                    "name": contact.name,
                },
                "conversation": {
                    "id": str(conversation.id),
                    "status": conversation.status.value,
                },
                "message": {
                    "id": str(message.id),
                    "content": content,
                    "type": "text",
                },
            },
            timeout=30,
        )
except Exception as e:
    print(f"Flow engine error: {e}")
```

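The `/process` payload has to be plain JSON, which is why every id is wrapped in `str(...)` before it is sent. A minimal sketch of a payload builder that makes this explicit follows; the function name `build_process_payload` and its parameters are assumptions for illustration, and it uses only the standard library so it can be run on its own.

```python
import json
import uuid
from typing import Any, Dict


def build_process_payload(
    conversation_id: uuid.UUID,
    contact_id: uuid.UUID,
    message_id: uuid.UUID,
    phone_number: str,
    name: str,
    status: str,
    content: str,
) -> Dict[str, Any]:
    """Build the JSON body for POST /process, converting UUIDs to strings."""
    return {
        "conversation_id": str(conversation_id),
        "contact": {"id": str(contact_id), "phone_number": phone_number, "name": name},
        "conversation": {"id": str(conversation_id), "status": status},
        "message": {"id": str(message_id), "content": content, "type": "text"},
    }


conv_id = uuid.uuid4()
payload = build_process_payload(
    conv_id, uuid.uuid4(), uuid.uuid4(), "+5491100000000", "Ana", "open", "hola"
)
encoded = json.dumps(payload)  # succeeds because every id is already a string
```

Passing a raw `uuid.UUID` inside the dict instead would make `json.dumps` raise a `TypeError`, and the same applies to the `json=` argument of an HTTP client that relies on the standard encoder.
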
**Step 2: Commit**

```bash
git add services/api-gateway/app/routers/whatsapp.py
git commit -m "feat(api-gateway): integrate flow engine on message events"
```

---

## Task 10: Frontend - Flow Builder Page Setup

**Files:**
- Modify: `frontend/package.json`
- Create: `frontend/src/pages/FlowBuilder.tsx`
- Modify: `frontend/src/layouts/MainLayout.tsx`

**Step 1: Add reactflow dependency to package.json**

Add to dependencies:
```json
"reactflow": "^11.11.4"
```

**Step 2: Create FlowBuilder.tsx**

```tsx
import { useState, useCallback } from 'react';
import { useParams, useNavigate } from 'react-router-dom';
import ReactFlow, {
  Node,
  Edge,
  Controls,
  Background,
  useNodesState,
  useEdgesState,
  addEdge,
  Connection,
  NodeTypes,
} from 'reactflow';
import 'reactflow/dist/style.css';
import { Button, message, Space } from 'antd';
import { SaveOutlined, ArrowLeftOutlined } from '@ant-design/icons';
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';
import { apiClient } from '../api/client';

// Custom node components (simplified for now)
const TriggerNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #52c41a', borderRadius: 8, background: '#f6ffed' }}>
    <strong>🚀 {data.label}</strong>
  </div>
);

const MessageNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #1890ff', borderRadius: 8, background: '#e6f7ff', minWidth: 150 }}>
    <strong>💬 Mensaje</strong>
    <div style={{ fontSize: 12, marginTop: 4 }}>{data.config?.text?.slice(0, 50) || 'Sin texto'}</div>
  </div>
);

const ConditionNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #faad14', borderRadius: 8, background: '#fffbe6' }}>
    <strong>❓ Condición</strong>
  </div>
);

const WaitInputNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #722ed1', borderRadius: 8, background: '#f9f0ff' }}>
    <strong>⏳ Esperar Input</strong>
  </div>
);

const nodeTypes: NodeTypes = {
  trigger: TriggerNode,
  message: MessageNode,
  condition: ConditionNode,
  wait_input: WaitInputNode,
};

interface Flow {
  id: string;
  name: string;
  nodes: Node[];
  edges: Edge[];
}

export default function FlowBuilder() {
  const { flowId } = useParams<{ flowId: string }>();
  const navigate = useNavigate();
  const queryClient = useQueryClient();

  const [nodes, setNodes, onNodesChange] = useNodesState([]);
  const [edges, setEdges, onEdgesChange] = useEdgesState([]);

  const { data: flow, isLoading } = useQuery({
    queryKey: ['flow', flowId],
    queryFn: async () => {
      const data = await apiClient.get<Flow>(`/api/flows/${flowId}`);
      setNodes(data.nodes || []);
      setEdges(data.edges || []);
      return data;
    },
    enabled: !!flowId,
  });

  const saveMutation = useMutation({
    mutationFn: async () => {
      await apiClient.put(`/api/flows/${flowId}`, {
        nodes,
        edges,
      });
    },
    onSuccess: () => {
      message.success('Flujo guardado');
      queryClient.invalidateQueries({ queryKey: ['flow', flowId] });
    },
    onError: () => {
      message.error('Error al guardar');
    },
  });

  const onConnect = useCallback(
    (params: Connection) => setEdges((eds) => addEdge(params, eds)),
    [setEdges]
  );

  const addNode = (type: string) => {
    const newNode: Node = {
      id: `${type}-${Date.now()}`,
      type,
      position: { x: 250, y: nodes.length * 100 + 50 },
      data: {
        label: type === 'trigger' ? 'Inicio' : type,
        type,
        config: type === 'message' ? { text: 'Hola {{contact.name}}!' } : {}
      },
    };
    setNodes((nds) => [...nds, newNode]);
  };

  if (isLoading) {
    return <div>Cargando...</div>;
  }

  return (
    <div style={{ height: 'calc(100vh - 140px)' }}>
      <div style={{ marginBottom: 16, display: 'flex', justifyContent: 'space-between' }}>
        <Space>
          <Button icon={<ArrowLeftOutlined />} onClick={() => navigate('/flows')}>
            Volver
          </Button>
          <span style={{ fontSize: 18, fontWeight: 'bold' }}>{flow?.name}</span>
        </Space>
        <Space>
          <Button onClick={() => addNode('trigger')}>+ Trigger</Button>
          <Button onClick={() => addNode('message')}>+ Mensaje</Button>
          <Button onClick={() => addNode('condition')}>+ Condición</Button>
|
|
<Button onClick={() => addNode('wait_input')}>+ Esperar Input</Button>
|
|
<Button
|
|
type="primary"
|
|
icon={<SaveOutlined />}
|
|
onClick={() => saveMutation.mutate()}
|
|
loading={saveMutation.isPending}
|
|
style={{ background: '#25D366' }}
|
|
>
|
|
Guardar
|
|
</Button>
|
|
</Space>
|
|
</div>
|
|
|
|
<div style={{ height: 'calc(100% - 50px)', border: '1px solid #d9d9d9', borderRadius: 8 }}>
|
|
<ReactFlow
|
|
nodes={nodes}
|
|
edges={edges}
|
|
onNodesChange={onNodesChange}
|
|
onEdgesChange={onEdgesChange}
|
|
onConnect={onConnect}
|
|
nodeTypes={nodeTypes}
|
|
fitView
|
|
>
|
|
<Controls />
|
|
<Background />
|
|
</ReactFlow>
|
|
</div>
|
|
</div>
|
|
);
|
|
}
|
|
```
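
The default message node uses the placeholder `{{contact.name}}`, which the Flow Context resolves on the backend. For an editor-side preview, the same substitution could be sketched as below. This is a minimal sketch, not the engine's implementation: `renderTemplate`, `lookupPath`, and the shape of the context object are hypothetical names introduced only for illustration, and the real interpolation rules are whatever the Python Flow Context defines.

```typescript
// Hypothetical preview helper (not part of the plan's codebase):
// resolves {{path.to.value}} placeholders against a plain context object.
type TemplateContext = Record<string, unknown>;

// Walk a dotted path such as "contact.name" through nested objects.
function lookupPath(context: TemplateContext, path: string): unknown {
  let current: unknown = context;
  for (const key of path.split('.')) {
    if (typeof current !== 'object' || current === null) {
      return undefined;
    }
    // Only follow own properties so keys like "constructor" do not resolve.
    if (!Object.prototype.hasOwnProperty.call(current, key)) {
      return undefined;
    }
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

function renderTemplate(template: string, context: TemplateContext): string {
  return template.replace(
    /\{\{\s*([\w.]+)\s*\}\}/g,
    (placeholder: string, path: string) => {
      const value = lookupPath(context, path);
      // Assumption: unknown variables are left as-is so they stay visible.
      return value === undefined || value === null ? placeholder : String(value);
    }
  );
}

const preview = renderTemplate('Hola {{contact.name}}!', { contact: { name: 'Ana' } });
console.log(preview); // Hola Ana!
```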

**Step 3: Update MainLayout.tsx**

Add the page imports (`FlowList` is created in Task 11, so its import will not resolve until that task is done):
```tsx
import FlowBuilder from '../pages/FlowBuilder';
import FlowList from '../pages/FlowList';
```

Add menu item:
```tsx
{
  key: '/flows',
  icon: <BranchesOutlined />,
  label: 'Flujos',
},
```

Add route in Routes:
```tsx
<Route path="/flows" element={<FlowList />} />
<Route path="/flows/:flowId" element={<FlowBuilder />} />
```

Add import for BranchesOutlined:
```tsx
import { BranchesOutlined } from '@ant-design/icons';
```

**Step 4: Commit**

```bash
git add frontend/
git commit -m "feat(frontend): add FlowBuilder page with React Flow"
```

---

## Task 11: Frontend - Flow List Page

**Files:**
- Create: `frontend/src/pages/FlowList.tsx`

**Step 1: Create FlowList.tsx**

```tsx
import { useState } from 'react';
import { useNavigate } from 'react-router-dom';
import {
  Card,
  Button,
  Table,
  Tag,
  Modal,
  Form,
  Input,
  Select,
  message,
  Space,
  Typography,
} from 'antd';
import {
  PlusOutlined,
  EditOutlined,
  DeleteOutlined,
  PlayCircleOutlined,
  PauseCircleOutlined,
} from '@ant-design/icons';
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';
import { apiClient } from '../api/client';

const { Title } = Typography;

interface Flow {
  id: string;
  name: string;
  trigger_type: string;
  is_active: boolean;
  version: number;
  updated_at: string;
}

const triggerLabels: Record<string, string> = {
  welcome: 'Bienvenida',
  keyword: 'Palabra clave',
  fallback: 'Fallback',
  event: 'Evento',
  manual: 'Manual',
};

export default function FlowList() {
  const [isModalOpen, setIsModalOpen] = useState(false);
  const [form] = Form.useForm();
  const navigate = useNavigate();
  const queryClient = useQueryClient();

  const { data: flows, isLoading } = useQuery({
    queryKey: ['flows'],
    queryFn: () => apiClient.get<Flow[]>('/api/flows'),
  });

  const createMutation = useMutation({
    mutationFn: (data: any) => apiClient.post<Flow>('/api/flows', data),
    onSuccess: (data) => {
      message.success('Flujo creado');
      setIsModalOpen(false);
      form.resetFields();
      queryClient.invalidateQueries({ queryKey: ['flows'] });
      navigate(`/flows/${data.id}`);
    },
    onError: () => {
      message.error('Error al crear flujo');
    },
  });

  const deleteMutation = useMutation({
    mutationFn: (id: string) => apiClient.delete(`/api/flows/${id}`),
    onSuccess: () => {
      message.success('Flujo eliminado');
      queryClient.invalidateQueries({ queryKey: ['flows'] });
    },
  });

  const toggleActiveMutation = useMutation({
    mutationFn: async ({ id, active }: { id: string; active: boolean }) => {
      const endpoint = active ? 'activate' : 'deactivate';
      await apiClient.post(`/api/flows/${id}/${endpoint}`, {});
    },
    onSuccess: () => {
      queryClient.invalidateQueries({ queryKey: ['flows'] });
      message.success('Estado actualizado');
    },
  });

  const columns = [
    {
      title: 'Nombre',
      dataIndex: 'name',
      key: 'name',
    },
    {
      title: 'Trigger',
      dataIndex: 'trigger_type',
      key: 'trigger_type',
      render: (type: string) => triggerLabels[type] || type,
    },
    {
      title: 'Estado',
      dataIndex: 'is_active',
      key: 'is_active',
      render: (active: boolean) => (
        <Tag color={active ? 'green' : 'default'}>
          {active ? 'Activo' : 'Inactivo'}
        </Tag>
      ),
    },
    {
      title: 'Versión',
      dataIndex: 'version',
      key: 'version',
    },
    {
      title: 'Acciones',
      key: 'actions',
      render: (_: any, record: Flow) => (
        <Space>
          <Button
            icon={<EditOutlined />}
            onClick={() => navigate(`/flows/${record.id}`)}
          >
            Editar
          </Button>
          <Button
            icon={record.is_active ? <PauseCircleOutlined /> : <PlayCircleOutlined />}
            onClick={() =>
              toggleActiveMutation.mutate({ id: record.id, active: !record.is_active })
            }
          >
            {record.is_active ? 'Desactivar' : 'Activar'}
          </Button>
          <Button
            danger
            icon={<DeleteOutlined />}
            onClick={() => {
              Modal.confirm({
                title: '¿Eliminar flujo?',
                onOk: () => deleteMutation.mutate(record.id),
              });
            }}
          />
        </Space>
      ),
    },
  ];

  return (
    <div>
      <div style={{ display: 'flex', justifyContent: 'space-between', marginBottom: 16 }}>
        <Title level={4} style={{ margin: 0 }}>Flujos de Chatbot</Title>
        <Button
          type="primary"
          icon={<PlusOutlined />}
          onClick={() => setIsModalOpen(true)}
          style={{ background: '#25D366' }}
        >
          Crear Flujo
        </Button>
      </div>

      <Card>
        <Table
          dataSource={flows}
          columns={columns}
          rowKey="id"
          loading={isLoading}
          pagination={false}
        />
      </Card>

      <Modal
        title="Crear nuevo flujo"
        open={isModalOpen}
        onCancel={() => {
          setIsModalOpen(false);
          form.resetFields();
        }}
        footer={null}
      >
        <Form
          form={form}
          layout="vertical"
          onFinish={(values) => createMutation.mutate(values)}
        >
          <Form.Item
            name="name"
            label="Nombre"
            rules={[{ required: true, message: 'Ingresa un nombre' }]}
          >
            <Input placeholder="Ej: Bienvenida Principal" />
          </Form.Item>

          <Form.Item
            name="trigger_type"
            label="Tipo de Trigger"
            rules={[{ required: true, message: 'Selecciona un trigger' }]}
          >
            <Select placeholder="Selecciona...">
              <Select.Option value="welcome">Bienvenida (primer mensaje)</Select.Option>
              <Select.Option value="keyword">Palabra clave</Select.Option>
              <Select.Option value="fallback">Fallback (sin coincidencia)</Select.Option>
            </Select>
          </Form.Item>

          <Form.Item
            noStyle
            shouldUpdate={(prev, curr) => prev.trigger_type !== curr.trigger_type}
          >
            {({ getFieldValue }) =>
              getFieldValue('trigger_type') === 'keyword' && (
                <Form.Item
                  name="trigger_value"
                  label="Palabras clave"
                  rules={[{ required: true }]}
                >
                  <Input placeholder="hola, menu, ayuda (separadas por coma)" />
                </Form.Item>
              )
            }
          </Form.Item>

          <Form.Item>
            <Button
              type="primary"
              htmlType="submit"
              loading={createMutation.isPending}
              block
            >
              Crear
            </Button>
          </Form.Item>
        </Form>
      </Modal>
    </div>
  );
}
```
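
The keyword field above accepts a comma-separated string such as `hola, menu, ayuda` and sends it unchanged as `trigger_value`. If the form were to normalize that input before submitting, it might look like the sketch below. `parseKeywords` is a hypothetical helper, and trimming, lowercasing, and de-duplicating are assumptions; the matching rules that actually apply are those of the Flow Engine's trigger evaluation.

```typescript
// Hypothetical helper (not part of the plan's codebase): turns the raw
// comma-separated form input into a clean list of keywords.
function parseKeywords(raw: string): string[] {
  const keywords: string[] = [];
  for (const part of raw.split(',')) {
    // Assumption: keywords are compared case-insensitively, so lowercase them.
    const keyword = part.trim().toLowerCase();
    // Skip empty entries (e.g. from "a,,b") and duplicates.
    if (keyword.length > 0 && keywords.indexOf(keyword) === -1) {
      keywords.push(keyword);
    }
  }
  return keywords;
}

// Re-join into the comma-separated form stored in trigger_value.
function normalizeTriggerValue(raw: string): string {
  return parseKeywords(raw).join(',');
}

console.log(parseKeywords('hola, Menu, ayuda, ,hola')); // [ 'hola', 'menu', 'ayuda' ]
console.log(normalizeTriggerValue(' Hola ,MENU')); // hola,menu
```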

**Step 2: Commit**

```bash
git add frontend/src/pages/FlowList.tsx
git commit -m "feat(frontend): add FlowList page for managing flows"
```

---

## Task 12: Final Integration & Testing

**Files:**
- Modify: `frontend/src/layouts/MainLayout.tsx` (complete updates)

**Step 1: Complete MainLayout updates**

Ensure all imports and routes for FlowList and FlowBuilder are in place.

**Step 2: Create database migration for flows table**

Generate the Alembic revision, review the generated file, then apply it:
```bash
cd services/api-gateway
alembic revision --autogenerate -m "add flows and flow_sessions tables"
# Apply the migration (assumes migrations are run manually in this project)
alembic upgrade head
```

**Step 3: Final commit**

```bash
git add .
git commit -m "feat: complete Fase 2 - Flow Engine with visual builder"
```

---

## Summary

Fase 2 creates:

1. **Database Models**: Flow, FlowSession tables for storing chatbot flows
2. **Flow API**: CRUD operations for managing flows
3. **Flow Engine Service**: Python service that executes flows
4. **Flow Context**: Variable interpolation system
5. **Node Types**: trigger, message, condition, wait_input, set_variable
6. **Frontend Flow Builder**: Visual drag & drop editor with React Flow
7. **Frontend Flow List**: Management page for flows

To test:
```bash
docker-compose up -d --build

# Open http://localhost:3000
# Go to Flujos → Create flow
# Add trigger + message nodes
# Connect them
# Activate flow
# Send message from WhatsApp to test
```
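
The manual test depends on the trigger and message nodes actually being connected before the flow is activated. A check along the following lines could catch a disconnected flow in the editor. This is a sketch only: `validateFlow` and the two interfaces are hypothetical, and the rule of exactly one trigger node per flow is an assumption rather than something the plan specifies.

```typescript
// Hypothetical pre-activation check (not part of the plan's codebase).
// The interfaces mirror only the fields of React Flow nodes/edges used here.
interface FlowNodeLike {
  id: string;
  type?: string;
}

interface FlowEdgeLike {
  source: string;
  target: string;
}

function validateFlow(nodes: FlowNodeLike[], edges: FlowEdgeLike[]): string[] {
  const problems: string[] = [];

  // Assumption: a flow starts from exactly one trigger node.
  const triggers = nodes.filter((node) => node.type === 'trigger');
  if (triggers.length !== 1) {
    problems.push(`expected exactly 1 trigger node, found ${triggers.length}`);
    return problems;
  }

  // Breadth-first search along edges, starting at the trigger.
  const visited: string[] = [triggers[0].id];
  const queue: string[] = [triggers[0].id];
  while (queue.length > 0) {
    const current = queue.shift() as string;
    for (const edge of edges) {
      if (edge.source === current && visited.indexOf(edge.target) === -1) {
        visited.push(edge.target);
        queue.push(edge.target);
      }
    }
  }

  // Any node the search never reached would never execute.
  for (const node of nodes) {
    if (visited.indexOf(node.id) === -1) {
      problems.push(`node ${node.id} is not reachable from the trigger`);
    }
  }
  return problems;
}

const exampleNodes: FlowNodeLike[] = [
  { id: 'trigger-1', type: 'trigger' },
  { id: 'message-1', type: 'message' },
];
console.log(validateFlow(exampleNodes, [{ source: 'trigger-1', target: 'message-1' }]));
console.log(validateFlow(exampleNodes, []));
```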