WhatsAppCentralizado/docs/plans/2026-01-29-fase-4-flow-engine-avanzado.md
Claude AI e3b14be00f docs: add Fase 4 Flow Engine Avanzado implementation plan
15 tasks covering:
- NodeExecutor modular architecture with registry
- Advanced nodes: switch, delay, random, loop, goto
- Validation nodes: email, phone, number, date, regex, options
- Script nodes: JavaScript eval, HTTP request
- AI nodes: OpenAI response, sentiment analysis
- Global variables system (model, API, frontend)
- Flow templates for reusable flows
- Frontend: new node components, variables page, templates page

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:08:15 +00:00

# Phase 4: Advanced Flow Engine - Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** Extend the Flow Engine with advanced nodes (switch, loop, random, sub-flow, delay), validations, reusable templates, global variables, JavaScript execution, and an AI node.
**Architecture:** Extend FlowEngine._execute_node() with new node types. Create a modular NodeExecutor system to separate the logic of each node. Add a FlowTemplate model for reusable templates and a GlobalVariable model for global variables. Frontend: new node components in FlowBuilder.
**Tech Stack:** Python/FastAPI (backend), PostgreSQL (DB), React/TypeScript/React Flow (frontend), OpenAI API (AI node)
---
## Task 1: Node Executor Architecture
**Files:**
- Create: `services/flow-engine/app/nodes/__init__.py`
- Create: `services/flow-engine/app/nodes/base.py`
- Create: `services/flow-engine/app/nodes/basic.py`
**Code for base.py:**
```python
from abc import ABC, abstractmethod
from typing import Optional, Any

from app.context import FlowContext


class NodeExecutor(ABC):
    """Base class for all node executors"""

    @abstractmethod
    async def execute(
        self,
        config: dict,
        context: FlowContext,
        session: Any
    ) -> Optional[str]:
        """
        Execute the node logic.

        Returns: branch name for routing (e.g., "default", "true", "false")
        or "wait" to pause execution
        """
        pass


class NodeRegistry:
    """Registry of all available node executors"""

    _executors: dict[str, NodeExecutor] = {}

    @classmethod
    def register(cls, node_type: str, executor: NodeExecutor):
        cls._executors[node_type] = executor

    @classmethod
    def get(cls, node_type: str) -> Optional[NodeExecutor]:
        return cls._executors.get(node_type)

    @classmethod
    async def execute(cls, node_type: str, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        executor = cls.get(node_type)
        if executor:
            # NodeExecutor.execute is a coroutine, so it must be awaited
            return await executor.execute(config, context, session)
        return "default"
```
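A minimal sketch of how a registered executor might be dispatched, assuming a stand-in context object. `StubContext`, `EchoExecutor`, and `dispatch` are hypothetical names used only for illustration, and the plain `registry` dict is a simplified substitute for `NodeRegistry`, not the module above.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Optional


class StubContext:
    """Hypothetical stand-in for app.context.FlowContext."""

    def __init__(self) -> None:
        self.variables: dict = {}

    def set(self, key: str, value: Any) -> None:
        self.variables[key] = value


class NodeExecutor(ABC):
    @abstractmethod
    async def execute(self, config: dict, context: StubContext, session: Any) -> Optional[str]:
        ...


class EchoExecutor(NodeExecutor):
    """Hypothetical node: copies config["text"] into a variable."""

    async def execute(self, config, context, session):
        context.set("echo", config.get("text", ""))
        return "default"


# Simplified registry: node type name -> executor instance
registry = {"echo": EchoExecutor()}


async def dispatch(node_type: str, config: dict, context: StubContext) -> Optional[str]:
    executor = registry.get(node_type)
    if executor is None:
        return "default"  # unknown node types fall through to the default branch
    return await executor.execute(config, context, None)


ctx = StubContext()
branch = asyncio.run(dispatch("echo", {"text": "hola"}, ctx))
```

The returned branch name is what the engine would use to pick the outgoing edge; the executor itself only mutates the context.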
**Code for basic.py:**
```python
from typing import Optional, Any

from app.nodes.base import NodeExecutor, NodeRegistry
from app.context import FlowContext


class TriggerExecutor(NodeExecutor):
    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        return "default"


class MessageExecutor(NodeExecutor):
    def __init__(self, send_message_fn):
        self.send_message = send_message_fn

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        text = context.interpolate(config.get("text", ""))
        await self.send_message(session.conversation_id, text)
        return "default"


class ButtonsExecutor(NodeExecutor):
    def __init__(self, send_message_fn):
        self.send_message = send_message_fn

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        text = context.interpolate(config.get("text", ""))
        buttons = config.get("buttons", [])
        button_text = "\n".join([f"{b['label']}" for b in buttons])
        await self.send_message(session.conversation_id, f"{text}\n\n{button_text}")
        return "wait"


class WaitInputExecutor(NodeExecutor):
    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        variable = config.get("variable", "user_input")
        context.set(variable, context.message.get("content", ""))
        return "default"


class SetVariableExecutor(NodeExecutor):
    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        var_name = config.get("variable", "")
        var_value = context.interpolate(config.get("value", ""))
        context.set(var_name, var_value)
        return "default"


class ConditionExecutor(NodeExecutor):
    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        conditions = config.get("conditions", [])
        for cond in conditions:
            field = cond.get("field", "")
            operator = cond.get("operator", "equals")
            value = cond.get("value", "")
            branch = cond.get("branch", "default")
            actual = context.get(field) or context.message.get("content", "")
            if operator == "equals" and str(actual).lower() == str(value).lower():
                return branch
            elif operator == "contains" and str(value).lower() in str(actual).lower():
                return branch
            elif operator == "starts_with" and str(actual).lower().startswith(str(value).lower()):
                return branch
            elif operator == "not_equals" and str(actual).lower() != str(value).lower():
                return branch
            elif operator == "greater_than":
                try:
                    if float(actual) > float(value):
                        return branch
                except ValueError:
                    pass
            elif operator == "less_than":
                try:
                    if float(actual) < float(value):
                        return branch
                except ValueError:
                    pass
        return "default"
```
**Code for __init__.py:**
```python
from app.nodes.base import NodeExecutor, NodeRegistry
from app.nodes.basic import (
TriggerExecutor, MessageExecutor, ButtonsExecutor,
WaitInputExecutor, SetVariableExecutor, ConditionExecutor
)
```
**Commit:** `feat(fase4): add NodeExecutor architecture with basic nodes`
---
## Task 2: Advanced Nodes - Switch, Delay, Random
**Files:**
- Create: `services/flow-engine/app/nodes/advanced.py`
- Modify: `services/flow-engine/app/nodes/__init__.py`
**Code for advanced.py:**
```python
import asyncio
import random as random_module
from typing import Optional, Any

from app.nodes.base import NodeExecutor
from app.context import FlowContext


class SwitchExecutor(NodeExecutor):
    """Multi-branch switch based on variable value"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        variable = config.get("variable", "")
        cases = config.get("cases", [])  # [{value: "x", branch: "case_x"}, ...]
        actual = context.get(variable) or context.message.get("content", "")
        actual_str = str(actual).lower().strip()
        for case in cases:
            case_value = str(case.get("value", "")).lower().strip()
            if actual_str == case_value:
                return case.get("branch", "default")
        return config.get("default_branch", "default")


class DelayExecutor(NodeExecutor):
    """Wait for a specified duration before continuing"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        delay_seconds = config.get("seconds", 0)
        delay_type = config.get("type", "fixed")  # fixed or random
        if delay_type == "random":
            min_delay = config.get("min_seconds", 1)
            max_delay = config.get("max_seconds", 5)
            delay_seconds = random_module.uniform(min_delay, max_delay)
        if delay_seconds > 0:
            await asyncio.sleep(min(delay_seconds, 30))  # Max 30 seconds
        return "default"


class RandomExecutor(NodeExecutor):
    """Random branch selection for A/B testing"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        branches = config.get("branches", [])  # [{branch: "a", weight: 50}, {branch: "b", weight: 50}]
        if not branches:
            return "default"
        total_weight = sum(b.get("weight", 1) for b in branches)
        rand_value = random_module.uniform(0, total_weight)
        cumulative = 0
        for branch in branches:
            cumulative += branch.get("weight", 1)
            if rand_value <= cumulative:
                # Track A/B test result
                test_name = config.get("test_name", "ab_test")
                context.set(f"_ab_{test_name}", branch.get("branch", "default"))
                return branch.get("branch", "default")
        return branches[-1].get("branch", "default") if branches else "default"


class LoopExecutor(NodeExecutor):
    """Loop a certain number of times or until condition"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        loop_var = config.get("counter_variable", "_loop_counter")
        max_iterations = config.get("max_iterations", 10)
        current = int(context.get(loop_var) or 0)
        if current < max_iterations:
            context.set(loop_var, current + 1)
            return "continue"  # Continue loop
        else:
            context.set(loop_var, 0)  # Reset counter
            return "done"  # Exit loop


class GoToExecutor(NodeExecutor):
    """Jump to another node or flow"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        target_node_id = config.get("target_node_id")
        target_flow_id = config.get("target_flow_id")
        if target_flow_id:
            # Store for sub-flow execution
            context.set("_goto_flow_id", target_flow_id)
            return "sub_flow"
        if target_node_id:
            context.set("_goto_node_id", target_node_id)
            return "goto"
        return "default"
```
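The cumulative-weight selection used by `RandomExecutor` can be checked in isolation. The sketch below assumes a hypothetical helper `pick_branch` that mirrors the loop above, and uses a seeded `random.Random` so the run is repeatable; it omits the A/B tracking variable.

```python
import random
from collections import Counter


def pick_branch(branches: list, rng: random.Random) -> str:
    """Cumulative-weight selection, mirroring the loop in RandomExecutor."""
    if not branches:
        return "default"
    total = sum(b.get("weight", 1) for b in branches)
    roll = rng.uniform(0, total)
    cumulative = 0
    for b in branches:
        cumulative += b.get("weight", 1)
        if roll <= cumulative:
            return b.get("branch", "default")
    return branches[-1].get("branch", "default")


rng = random.Random(42)  # fixed seed, chosen arbitrarily for this sketch
branches = [{"branch": "a", "weight": 80}, {"branch": "b", "weight": 20}]
counts = Counter(pick_branch(branches, rng) for _ in range(10_000))
share_a = counts["a"] / 10_000  # should land near 0.8
```

Weights do not need to sum to 100; only their ratio matters, since the roll is drawn from `[0, total]`.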
**Update __init__.py:**
```python
from app.nodes.base import NodeExecutor, NodeRegistry
from app.nodes.basic import (
TriggerExecutor, MessageExecutor, ButtonsExecutor,
WaitInputExecutor, SetVariableExecutor, ConditionExecutor
)
from app.nodes.advanced import (
SwitchExecutor, DelayExecutor, RandomExecutor,
LoopExecutor, GoToExecutor
)
```
**Commit:** `feat(fase4): add advanced nodes - switch, delay, random, loop, goto`
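As a quick check of the loop counter semantics, the sketch below replays the `LoopExecutor` logic against a stand-in context. `StubContext` and `loop_step` are hypothetical names for illustration only; the real executor is async and receives a `FlowContext`.

```python
class StubContext:
    """Hypothetical stand-in for FlowContext with get/set only."""

    def __init__(self):
        self.variables = {}

    def get(self, key):
        return self.variables.get(key)

    def set(self, key, value):
        self.variables[key] = value


def loop_step(config: dict, context: StubContext) -> str:
    """Same counter logic as LoopExecutor.execute, without async."""
    loop_var = config.get("counter_variable", "_loop_counter")
    max_iterations = config.get("max_iterations", 10)
    current = int(context.get(loop_var) or 0)
    if current < max_iterations:
        context.set(loop_var, current + 1)
        return "continue"
    context.set(loop_var, 0)  # reset so the loop can be entered again
    return "done"


ctx = StubContext()
trace = [loop_step({"max_iterations": 3}, ctx) for _ in range(5)]
```

With `max_iterations` set to 3, the node yields `continue` three times, then `done`, and the reset counter lets a later visit start a fresh loop.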
---
## Task 3: Validation Nodes
**Files:**
- Create: `services/flow-engine/app/nodes/validation.py`
- Modify: `services/flow-engine/app/nodes/__init__.py`
**Code for validation.py:**
```python
import re
from datetime import datetime
from typing import Optional, Any

from app.nodes.base import NodeExecutor
from app.context import FlowContext


class ValidateEmailExecutor(NodeExecutor):
    """Validate email format"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        variable = config.get("variable", "")
        value = context.get(variable) or context.message.get("content", "")
        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        if re.match(email_pattern, str(value).strip()):
            return "valid"
        return "invalid"


class ValidatePhoneExecutor(NodeExecutor):
    """Validate phone number format"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        variable = config.get("variable", "")
        value = context.get(variable) or context.message.get("content", "")
        # Remove common separators
        cleaned = re.sub(r'[\s\-\(\)\+]', '', str(value))
        # Check that it is all digits and a reasonable length
        if cleaned.isdigit() and 8 <= len(cleaned) <= 15:
            return "valid"
        return "invalid"


class ValidateNumberExecutor(NodeExecutor):
    """Validate numeric value within range"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        variable = config.get("variable", "")
        value = context.get(variable) or context.message.get("content", "")
        min_val = config.get("min")
        max_val = config.get("max")
        try:
            num = float(str(value).strip())
            if min_val is not None and num < float(min_val):
                return "invalid"
            if max_val is not None and num > float(max_val):
                return "invalid"
            return "valid"
        except ValueError:
            return "invalid"


class ValidateDateExecutor(NodeExecutor):
    """Validate date format"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        variable = config.get("variable", "")
        value = context.get(variable) or context.message.get("content", "")
        date_format = config.get("format", "%Y-%m-%d")
        try:
            datetime.strptime(str(value).strip(), date_format)
            return "valid"
        except ValueError:
            return "invalid"


class ValidateRegexExecutor(NodeExecutor):
    """Validate against custom regex pattern"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        variable = config.get("variable", "")
        value = context.get(variable) or context.message.get("content", "")
        pattern = config.get("pattern", ".*")
        try:
            if re.match(pattern, str(value).strip()):
                return "valid"
            return "invalid"
        except re.error:
            return "invalid"


class ValidateOptionsExecutor(NodeExecutor):
    """Validate against list of allowed options"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        variable = config.get("variable", "")
        value = context.get(variable) or context.message.get("content", "")
        options = config.get("options", [])
        case_sensitive = config.get("case_sensitive", False)
        value_str = str(value).strip()
        if not case_sensitive:
            value_str = value_str.lower()
            options = [str(o).lower() for o in options]
        if value_str in options:
            return "valid"
        return "invalid"
```
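One detail worth keeping in mind when writing patterns for `ValidateRegexExecutor`: `re.match` anchors only at the start of the string, so a pattern without a trailing `$` also accepts inputs with extra characters after the match. A small sketch of the difference, using an example pattern that is not taken from the plan:

```python
import re

pattern = r"\d{4}"   # example pattern: intended to mean "exactly four digits"
value = "1234abc"

# re.match succeeds if the pattern matches a prefix of the string
prefix_match = re.match(pattern, value) is not None

# re.fullmatch requires the pattern to consume the whole string
full_match = re.fullmatch(pattern, value) is not None
```

Flow authors who need whole-input validation with the executor as written would have to end their patterns with `$`, as the email pattern does.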
**Update __init__.py:**
```python
from app.nodes.validation import (
ValidateEmailExecutor, ValidatePhoneExecutor, ValidateNumberExecutor,
ValidateDateExecutor, ValidateRegexExecutor, ValidateOptionsExecutor
)
```
**Commit:** `feat(fase4): add validation nodes for email, phone, number, date, regex, options`
---
## Task 4: JavaScript Node
**Files:**
- Create: `services/flow-engine/app/nodes/script.py`
- Modify: `services/flow-engine/app/nodes/__init__.py`
**Code for script.py:**
```python
import json
from typing import Optional, Any

from app.nodes.base import NodeExecutor
from app.context import FlowContext


class JavaScriptExecutor(NodeExecutor):
    """Evaluate a Python expression with restricted builtins.

    Despite the node name, this does not run JavaScript.
    """

    ALLOWED_BUILTINS = {
        'abs': abs, 'all': all, 'any': any, 'bool': bool,
        'dict': dict, 'float': float, 'int': int, 'len': len,
        'list': list, 'max': max, 'min': min, 'round': round,
        'str': str, 'sum': sum, 'sorted': sorted,
    }

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        code = config.get("code", "")
        output_variable = config.get("output_variable", "_result")
        if not code:
            return "default"
        # Build the restricted execution context
        exec_globals = {
            '__builtins__': self.ALLOWED_BUILTINS,
            'context': {
                'contact': context.contact,
                'conversation': context.conversation,
                'message': context.message,
                'variables': context.variables.copy(),
            },
            'variables': context.variables.copy(),
        }
        try:
            # Simple expression evaluation (Python expressions, not JS).
            # Restricting __builtins__ limits the available names but is
            # not a full sandbox.
            result = eval(code, exec_globals, {})
            if result is not None:
                context.set(output_variable, result)
            return "success"
        except Exception as e:
            context.set("_script_error", str(e))
            return "error"


class HttpRequestExecutor(NodeExecutor):
    """Make HTTP requests to external APIs"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        import httpx

        url = context.interpolate(config.get("url", ""))
        method = config.get("method", "GET").upper()
        headers = config.get("headers", {})
        body = config.get("body")
        output_variable = config.get("output_variable", "_http_response")
        timeout = config.get("timeout", 10)
        if not url:
            return "error"
        # Interpolate headers and body
        headers = {k: context.interpolate(str(v)) for k, v in headers.items()}
        if body and isinstance(body, str):
            body = context.interpolate(body)
        try:
            async with httpx.AsyncClient() as client:
                response = await client.request(
                    method=method,
                    url=url,
                    headers=headers,
                    json=json.loads(body) if body and isinstance(body, str) else body,
                    timeout=timeout
                )
                context.set(output_variable, {
                    "status": response.status_code,
                    "body": response.text,
                    "json": response.json() if response.headers.get("content-type", "").startswith("application/json") else None
                })
                if 200 <= response.status_code < 300:
                    return "success"
                return "error"
        except Exception as e:
            context.set("_http_error", str(e))
            return "error"
```
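To make the effect of the restricted `__builtins__` table concrete, here is a minimal sketch using a hypothetical helper `evaluate` and a shorter whitelist than the one above. It shows that names outside the whitelist are unavailable, and also that attribute access on ordinary objects is not restricted, so this approach should not be read as a security boundary.

```python
ALLOWED_BUILTINS = {"len": len, "sum": sum, "max": max}  # shortened for the sketch


def evaluate(code: str, variables: dict):
    """Evaluate a Python expression with a reduced builtins table."""
    exec_globals = {"__builtins__": ALLOWED_BUILTINS, "variables": dict(variables)}
    return eval(code, exec_globals, {})


# Whitelisted builtins work as usual
total = evaluate("sum(variables['prices'])", {"prices": [10, 20, 12]})

# Names outside the whitelist raise NameError
try:
    evaluate("__import__('os')", {})
    import_blocked = False
except NameError:
    import_blocked = True

# Attribute access is still available, so object internals stay reachable
leaked_type = evaluate("().__class__.__name__", {})
```

If untrusted users can author flows, a separate process or a dedicated expression parser would be a safer design than relying on `eval`; that choice is outside the scope of this task.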
**Update __init__.py** to add:
```python
from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor
```
**Commit:** `feat(fase4): add JavaScript and HTTP request nodes`
---
## Task 5: AI Response Node
**Files:**
- Create: `services/flow-engine/app/nodes/ai.py`
- Modify: `services/flow-engine/app/config.py`
- Modify: `services/flow-engine/app/nodes/__init__.py`
**Add to config.py:**
```python
# OpenAI
OPENAI_API_KEY: str = ""
OPENAI_MODEL: str = "gpt-3.5-turbo"
```
**Code for ai.py:**
```python
from typing import Optional, Any

from app.nodes.base import NodeExecutor
from app.context import FlowContext
from app.config import get_settings

settings = get_settings()


class AIResponseExecutor(NodeExecutor):
    """Generate AI response using OpenAI"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        import httpx

        prompt = context.interpolate(config.get("prompt", ""))
        system_prompt = config.get("system_prompt", "Eres un asistente útil.")
        output_variable = config.get("output_variable", "_ai_response")
        max_tokens = config.get("max_tokens", 500)
        temperature = config.get("temperature", 0.7)
        if not settings.OPENAI_API_KEY:
            context.set("_ai_error", "OpenAI API key not configured")
            return "error"
        if not prompt:
            return "error"
        # Include conversation history if configured
        history = []
        if config.get("include_history", False):
            history = context.get("_conversation_history") or []
        messages = (
            [{"role": "system", "content": system_prompt}]
            + history
            + [{"role": "user", "content": prompt}]
        )
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    "https://api.openai.com/v1/chat/completions",
                    headers={
                        "Authorization": f"Bearer {settings.OPENAI_API_KEY}",
                        "Content-Type": "application/json"
                    },
                    json={
                        "model": settings.OPENAI_MODEL,
                        "messages": messages,
                        "max_tokens": max_tokens,
                        "temperature": temperature
                    },
                    timeout=30
                )
                if response.status_code == 200:
                    data = response.json()
                    ai_response = data["choices"][0]["message"]["content"]
                    context.set(output_variable, ai_response)
                    return "success"
                else:
                    context.set("_ai_error", response.text)
                    return "error"
        except Exception as e:
            context.set("_ai_error", str(e))
            return "error"


class AISentimentExecutor(NodeExecutor):
    """Analyze sentiment of user message"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        import httpx

        text = context.get(config.get("variable", "")) or context.message.get("content", "")
        output_variable = config.get("output_variable", "_sentiment")
        if not settings.OPENAI_API_KEY or not text:
            return "neutral"
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    "https://api.openai.com/v1/chat/completions",
                    headers={
                        "Authorization": f"Bearer {settings.OPENAI_API_KEY}",
                        "Content-Type": "application/json"
                    },
                    json={
                        "model": "gpt-3.5-turbo",
                        "messages": [
                            {"role": "system", "content": "Analyze the sentiment. Reply with only one word: positive, negative, or neutral"},
                            {"role": "user", "content": text}
                        ],
                        "max_tokens": 10,
                        "temperature": 0
                    },
                    timeout=15
                )
                if response.status_code == 200:
                    data = response.json()
                    sentiment = data["choices"][0]["message"]["content"].lower().strip()
                    context.set(output_variable, sentiment)
                    if "positive" in sentiment:
                        return "positive"
                    elif "negative" in sentiment:
                        return "negative"
                    return "neutral"
        except Exception:
            pass
        return "neutral"
```
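The message ordering sent to the chat completions endpoint can be sketched as a pure function. `build_messages` is a hypothetical helper, and the sketch assumes that `_conversation_history` holds a list of `{"role", "content"}` dicts, which the plan does not spell out.

```python
from typing import Optional


def build_messages(system_prompt: str, prompt: str, history: Optional[list] = None) -> list:
    """Order: system prompt, prior turns (if any), then the new user prompt."""
    return (
        [{"role": "system", "content": system_prompt}]
        + list(history or [])
        + [{"role": "user", "content": prompt}]
    )


# Assumed history shape: alternating user/assistant turns
history = [
    {"role": "user", "content": "Hi"},
    {"role": "assistant", "content": "Hello, how can I help?"},
]
messages = build_messages("You are a helpful assistant.", "Where is my order?", history)
roles = [m["role"] for m in messages]
```

Keeping this step separate from the HTTP call would make the ordering testable without network access.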
**Update __init__.py:**
```python
from app.nodes.ai import AIResponseExecutor, AISentimentExecutor
```
**Commit:** `feat(fase4): add AI response and sentiment analysis nodes`
---
## Task 6: Global Variables Model
**Files:**
- Create: `services/api-gateway/app/models/global_variable.py`
- Modify: `services/api-gateway/app/models/__init__.py`
**Code for global_variable.py:**
```python
import uuid
from datetime import datetime

from sqlalchemy import Column, String, Text, DateTime, Boolean
from sqlalchemy.dialects.postgresql import UUID, JSONB

from app.core.database import Base


class GlobalVariable(Base):
    __tablename__ = "global_variables"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    key = Column(String(100), nullable=False, unique=True, index=True)
    value = Column(Text, nullable=True)
    value_type = Column(String(20), default="string", nullable=False)  # string, number, boolean, json
    description = Column(String(500), nullable=True)
    is_secret = Column(Boolean, default=False, nullable=False)  # Hide value in UI
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
```
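Because `value` is stored as text while `value_type` names the intended type, a consumer needs some conversion step. The sketch below shows one possible shape for it; `coerce_value` is a hypothetical helper, and the accepted boolean spellings are an assumption rather than something the plan defines.

```python
import json
from typing import Any, Optional


def coerce_value(value: Optional[str], value_type: str) -> Any:
    """Convert the stored text into the type named by value_type."""
    if value is None:
        return None
    if value_type == "number":
        return float(value)
    if value_type == "boolean":
        # Assumed spellings for true; anything else is treated as false
        return value.strip().lower() in ("true", "1", "yes")
    if value_type == "json":
        return json.loads(value)
    return value  # "string" and unknown types pass through unchanged


port = coerce_value("8080", "number")
enabled = coerce_value("True", "boolean")
limits = coerce_value('{"max": 5}', "json")
```

Invalid input (for example `"abc"` with `value_type="number"`) raises `ValueError` here; whether to reject such values at write time in the API is a separate design decision.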
**Update models/__init__.py:**
```python
from app.models.global_variable import GlobalVariable
```
**Commit:** `feat(fase4): add GlobalVariable database model`
---
## Task 7: Global Variables API
**Files:**
- Create: `services/api-gateway/app/schemas/global_variable.py`
- Create: `services/api-gateway/app/routers/global_variables.py`
- Modify: `services/api-gateway/app/main.py`
**Code for schemas/global_variable.py:**
```python
from pydantic import BaseModel
from typing import Optional
from uuid import UUID
from datetime import datetime


class GlobalVariableCreate(BaseModel):
    key: str
    value: Optional[str] = None
    value_type: str = "string"
    description: Optional[str] = None
    is_secret: bool = False


class GlobalVariableUpdate(BaseModel):
    value: Optional[str] = None
    value_type: Optional[str] = None
    description: Optional[str] = None
    is_secret: Optional[bool] = None


class GlobalVariableResponse(BaseModel):
    id: UUID
    key: str
    value: Optional[str]
    value_type: str
    description: Optional[str]
    is_secret: bool
    created_at: datetime
    updated_at: datetime

    class Config:
        from_attributes = True
```
**Code for routers/global_variables.py:**
```python
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from uuid import UUID

from app.core.database import get_db
from app.core.security import get_current_user
from app.models.user import User, UserRole
from app.models.global_variable import GlobalVariable
from app.schemas.global_variable import (
    GlobalVariableCreate, GlobalVariableUpdate, GlobalVariableResponse
)

router = APIRouter(prefix="/api/global-variables", tags=["global-variables"])


def require_admin(current_user: User = Depends(get_current_user)):
    if current_user.role != UserRole.ADMIN:
        raise HTTPException(status_code=403, detail="Admin required")
    return current_user


@router.get("", response_model=List[GlobalVariableResponse])
def list_global_variables(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    variables = db.query(GlobalVariable).all()
    # Mask secret values on response copies. Assigning to the ORM objects
    # would mark them dirty and risk persisting the mask on a later commit.
    results = []
    for var in variables:
        item = GlobalVariableResponse.model_validate(var)
        if var.is_secret:
            item.value = "********"
        results.append(item)
    return results


@router.post("", response_model=GlobalVariableResponse)
def create_global_variable(
    request: GlobalVariableCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    existing = db.query(GlobalVariable).filter(GlobalVariable.key == request.key).first()
    if existing:
        raise HTTPException(status_code=400, detail="Variable key already exists")
    variable = GlobalVariable(**request.model_dump())
    db.add(variable)
    db.commit()
    db.refresh(variable)
    return variable


@router.put("/{variable_id}", response_model=GlobalVariableResponse)
def update_global_variable(
    variable_id: UUID,
    request: GlobalVariableUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    variable = db.query(GlobalVariable).filter(GlobalVariable.id == variable_id).first()
    if not variable:
        raise HTTPException(status_code=404, detail="Variable not found")
    for key, value in request.model_dump(exclude_unset=True).items():
        setattr(variable, key, value)
    db.commit()
    db.refresh(variable)
    return variable


@router.delete("/{variable_id}")
def delete_global_variable(
    variable_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    variable = db.query(GlobalVariable).filter(GlobalVariable.id == variable_id).first()
    if not variable:
        raise HTTPException(status_code=404, detail="Variable not found")
    db.delete(variable)
    db.commit()
    return {"success": True}


@router.get("/by-key/{key}")
def get_variable_by_key(
    key: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    variable = db.query(GlobalVariable).filter(GlobalVariable.key == key).first()
    if not variable:
        raise HTTPException(status_code=404, detail="Variable not found")
    return {"key": variable.key, "value": variable.value if not variable.is_secret else None}
```
**Add to main.py:**
```python
from app.routers import global_variables
app.include_router(global_variables.router)
```
**Commit:** `feat(fase4): add GlobalVariable API routes`
---
## Task 8: Flow Templates Model and API
**Files:**
- Create: `services/api-gateway/app/models/flow_template.py`
- Create: `services/api-gateway/app/schemas/flow_template.py`
- Create: `services/api-gateway/app/routers/flow_templates.py`
- Modify: `services/api-gateway/app/models/__init__.py`
- Modify: `services/api-gateway/app/main.py`
**Code for flow_template.py:**
```python
import uuid
from datetime import datetime

from sqlalchemy import Column, String, Text, DateTime, Boolean, ForeignKey
from sqlalchemy.dialects.postgresql import UUID, JSONB

from app.core.database import Base


class FlowTemplate(Base):
    __tablename__ = "flow_templates"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(100), nullable=False)
    description = Column(Text, nullable=True)
    category = Column(String(50), default="general", nullable=False)
    nodes = Column(JSONB, default=list)
    edges = Column(JSONB, default=list)
    variables = Column(JSONB, default=dict)
    preview_image = Column(String(500), nullable=True)
    is_public = Column(Boolean, default=True, nullable=False)
    created_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
```
**Code for schemas/flow_template.py:**
```python
from pydantic import BaseModel
from typing import Optional, List
from uuid import UUID
from datetime import datetime


class FlowTemplateCreate(BaseModel):
    name: str
    description: Optional[str] = None
    category: str = "general"
    nodes: List[dict] = []
    edges: List[dict] = []
    variables: dict = {}
    is_public: bool = True


class FlowTemplateResponse(BaseModel):
    id: UUID
    name: str
    description: Optional[str]
    category: str
    nodes: List[dict]
    edges: List[dict]
    variables: dict
    preview_image: Optional[str]
    is_public: bool
    created_by: UUID
    created_at: datetime

    class Config:
        from_attributes = True
```
**Code for routers/flow_templates.py:**
```python
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List, Optional
from uuid import UUID

from app.core.database import get_db
from app.core.security import get_current_user
from app.models.user import User
from app.models.flow_template import FlowTemplate
from app.models.flow import Flow, TriggerType
from app.schemas.flow_template import FlowTemplateCreate, FlowTemplateResponse

router = APIRouter(prefix="/api/flow-templates", tags=["flow-templates"])


@router.get("", response_model=List[FlowTemplateResponse])
def list_templates(
    category: Optional[str] = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    query = db.query(FlowTemplate).filter(FlowTemplate.is_public.is_(True))
    if category:
        query = query.filter(FlowTemplate.category == category)
    return query.all()


@router.post("", response_model=FlowTemplateResponse)
def create_template(
    request: FlowTemplateCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    template = FlowTemplate(
        **request.model_dump(),
        created_by=current_user.id,
    )
    db.add(template)
    db.commit()
    db.refresh(template)
    return template


@router.post("/{template_id}/use")
def use_template(
    template_id: UUID,
    flow_name: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create a new flow from template"""
    template = db.query(FlowTemplate).filter(FlowTemplate.id == template_id).first()
    if not template:
        raise HTTPException(status_code=404, detail="Template not found")
    flow = Flow(
        name=flow_name,
        description=f"Created from template: {template.name}",
        trigger_type=TriggerType.KEYWORD,
        nodes=template.nodes,
        edges=template.edges,
        variables=template.variables,
        is_active=False,
    )
    db.add(flow)
    db.commit()
    db.refresh(flow)
    return {"success": True, "flow_id": str(flow.id)}


@router.delete("/{template_id}")
def delete_template(
    template_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    template = db.query(FlowTemplate).filter(FlowTemplate.id == template_id).first()
    if not template:
        raise HTTPException(status_code=404, detail="Template not found")
    if template.created_by != current_user.id:
        raise HTTPException(status_code=403, detail="Not authorized")
    db.delete(template)
    db.commit()
    return {"success": True}
```
**Update models/__init__.py:**
```python
from app.models.flow_template import FlowTemplate
```
**Add to main.py:**
```python
from app.routers import flow_templates
app.include_router(flow_templates.router)
```
**Commit:** `feat(fase4): add FlowTemplate model and API`
---
## Task 9: Refactor Engine to Use NodeRegistry
**Files:**
- Modify: `services/flow-engine/app/engine.py`
**Replace _execute_node method and add initialization:**
```python
from typing import Optional

import httpx

from app.config import get_settings
from app.context import FlowContext
from app.nodes import NodeRegistry
from app.nodes.basic import (
    TriggerExecutor, MessageExecutor, ButtonsExecutor,
    WaitInputExecutor, SetVariableExecutor, ConditionExecutor
)
from app.nodes.advanced import (
    SwitchExecutor, DelayExecutor, RandomExecutor, LoopExecutor, GoToExecutor
)
from app.nodes.validation import (
    ValidateEmailExecutor, ValidatePhoneExecutor, ValidateNumberExecutor,
    ValidateDateExecutor, ValidateRegexExecutor, ValidateOptionsExecutor
)
from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor
from app.nodes.ai import AIResponseExecutor, AISentimentExecutor

settings = get_settings()


class FlowEngine:
    """Executes flow nodes based on incoming messages"""

    def __init__(self, db_session):
        self.db = db_session
        self._register_nodes()

    def _register_nodes(self):
        """Register all available node executors"""
        # Basic nodes
        NodeRegistry.register("trigger", TriggerExecutor())
        NodeRegistry.register("message", MessageExecutor(self._send_message))
        NodeRegistry.register("buttons", ButtonsExecutor(self._send_message))
        NodeRegistry.register("wait_input", WaitInputExecutor())
        NodeRegistry.register("set_variable", SetVariableExecutor())
        NodeRegistry.register("condition", ConditionExecutor())
        # Advanced nodes
        NodeRegistry.register("switch", SwitchExecutor())
        NodeRegistry.register("delay", DelayExecutor())
        NodeRegistry.register("random", RandomExecutor())
        NodeRegistry.register("loop", LoopExecutor())
        NodeRegistry.register("goto", GoToExecutor())
        # Validation nodes
        NodeRegistry.register("validate_email", ValidateEmailExecutor())
        NodeRegistry.register("validate_phone", ValidatePhoneExecutor())
        NodeRegistry.register("validate_number", ValidateNumberExecutor())
        NodeRegistry.register("validate_date", ValidateDateExecutor())
        NodeRegistry.register("validate_regex", ValidateRegexExecutor())
        NodeRegistry.register("validate_options", ValidateOptionsExecutor())
        # Script nodes
        NodeRegistry.register("javascript", JavaScriptExecutor())
        NodeRegistry.register("http_request", HttpRequestExecutor())
        # AI nodes
        NodeRegistry.register("ai_response", AIResponseExecutor())
        NodeRegistry.register("ai_sentiment", AISentimentExecutor())

    async def _execute_node(
        self,
        node_type: str,
        config: dict,
        context: FlowContext,
        session
    ) -> Optional[str]:
        """Execute a single node using NodeRegistry"""
        executor = NodeRegistry.get(node_type)
        if executor:
            result = await executor.execute(config, context, session)
            # Handle special results
            if result == "goto" and context.get("_goto_node_id"):
                session.current_node_id = context.get("_goto_node_id")
                context.set("_goto_node_id", None)
            return result
        return "default"

    # ... rest of the class remains the same (process_message, _find_matching_flow, etc.)
```
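The dispatch pattern above can be tried in isolation. The following is a minimal sketch, not the plan's actual `NodeRegistry`: `SketchRegistry`, `EchoExecutor` and `execute_node` are hypothetical stand-ins that only mirror the `register` / `get` / `execute(config, context, session)` calls used in `FlowEngine`, with a plain dict in place of `FlowContext`.

```python
import asyncio
from typing import Any, Dict, Optional


class EchoExecutor:
    """Hypothetical executor used only for this illustration."""

    async def execute(
        self, config: Dict[str, Any], context: Dict[str, Any], session: Any
    ) -> Optional[str]:
        # Store the configured text in the context and follow the default edge.
        context["last_message"] = config.get("text", "")
        return "default"


class SketchRegistry:
    """Stand-in for NodeRegistry: maps a node type string to an executor."""

    _executors: Dict[str, Any] = {}

    @classmethod
    def register(cls, node_type: str, executor: Any) -> None:
        cls._executors[node_type] = executor

    @classmethod
    def get(cls, node_type: str) -> Optional[Any]:
        return cls._executors.get(node_type)


async def execute_node(
    node_type: str, config: Dict[str, Any], context: Dict[str, Any], session: Any = None
) -> Optional[str]:
    executor = SketchRegistry.get(node_type)
    if executor is None:
        # Same fallback as _execute_node: unknown types return "default".
        return "default"
    return await executor.execute(config, context, session)


SketchRegistry.register("message", EchoExecutor())
ctx: Dict[str, Any] = {}
known = asyncio.run(execute_node("message", {"text": "hola"}, ctx))
unknown = asyncio.run(execute_node("not_registered", {}, ctx))
```

Because an unregistered type returns the same `"default"` value as a successful node, a typo in a node type would pass silently. Logging a warning in that branch may be worth considering.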
**Commit:** `feat(fase4): refactor FlowEngine to use NodeRegistry`

---

## Task 10: Frontend - Advanced Node Components

**Files:**
- Modify: `frontend/src/pages/FlowBuilder.tsx`

**Add new node components and update nodeTypes:**

```tsx
// Add these new node components after existing ones
const SwitchNode = ({ data }: { data: any }) => (
<div style={{ padding: 10, border: '2px solid #13c2c2', borderRadius: 8, background: '#e6fffb' }}>
<strong>🔀 Switch</strong>
<div style={{ fontSize: 11, marginTop: 4 }}>{data.config?.variable || 'variable'}</div>
</div>
);
const DelayNode = ({ data }: { data: any }) => (
<div style={{ padding: 10, border: '2px solid #595959', borderRadius: 8, background: '#fafafa' }}>
<strong>Delay</strong>
<div style={{ fontSize: 11, marginTop: 4 }}>{data.config?.seconds || 0}s</div>
</div>
);
const RandomNode = ({ data }: { data: any }) => (
<div style={{ padding: 10, border: '2px solid #eb2f96', borderRadius: 8, background: '#fff0f6' }}>
<strong>🎲 Random</strong>
<div style={{ fontSize: 11, marginTop: 4 }}>A/B Test</div>
</div>
);
const LoopNode = ({ data }: { data: any }) => (
<div style={{ padding: 10, border: '2px solid #fa8c16', borderRadius: 8, background: '#fff7e6' }}>
<strong>🔄 Loop</strong>
<div style={{ fontSize: 11, marginTop: 4 }}>max: {data.config?.max_iterations || 10}</div>
</div>
);
const ValidateNode = ({ data }: { data: any }) => (
<div style={{ padding: 10, border: '2px solid #52c41a', borderRadius: 8, background: '#f6ffed' }}>
<strong>Validate</strong>
<div style={{ fontSize: 11, marginTop: 4 }}>{data.config?.type || 'email'}</div>
</div>
);
const HttpNode = ({ data }: { data: any }) => (
<div style={{ padding: 10, border: '2px solid #2f54eb', borderRadius: 8, background: '#f0f5ff' }}>
<strong>🌐 HTTP</strong>
<div style={{ fontSize: 11, marginTop: 4 }}>{data.config?.method || 'GET'}</div>
</div>
);
const AINode = ({ data }: { data: any }) => (
<div style={{ padding: 10, border: '2px solid #722ed1', borderRadius: 8, background: '#f9f0ff' }}>
<strong>🤖 AI Response</strong>
</div>
);
const SetVariableNode = ({ data }: { data: any }) => (
<div style={{ padding: 10, border: '2px solid #8c8c8c', borderRadius: 8, background: '#f5f5f5' }}>
<strong>📝 Variable</strong>
<div style={{ fontSize: 11, marginTop: 4 }}>{data.config?.variable || 'var'}</div>
</div>
);
// Update nodeTypes
const nodeTypes: NodeTypes = {
trigger: TriggerNode,
message: MessageNode,
condition: ConditionNode,
wait_input: WaitInputNode,
switch: SwitchNode,
delay: DelayNode,
random: RandomNode,
loop: LoopNode,
validate: ValidateNode,
http_request: HttpNode,
ai_response: AINode,
set_variable: SetVariableNode,
};
```
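Note that the builder registers a single `validate` node whose `config.type` selects the check, while the engine in Task 9 registers separate keys (`validate_email`, `validate_phone`, and so on). This section does not show how the two are reconciled. One possible bridge, sketched under the assumption that the builder stores the kind in `config["type"]`, is to resolve the registry key before the lookup. `resolve_node_type` is a hypothetical helper, not part of the plan.

```python
from typing import Any, Dict

# Validation kinds the engine registers as "validate_<kind>".
VALIDATION_KINDS = {"email", "phone", "number", "date", "regex", "options"}


def resolve_node_type(node_type: str, config: Dict[str, Any]) -> str:
    """Map the builder's generic "validate" node to an engine registry key.

    Assumption: the kind lives in config["type"] and defaults to "email",
    which is what the ValidateNode component displays when it is unset.
    """
    if node_type != "validate":
        return node_type
    kind = config.get("type") or "email"
    if kind not in VALIDATION_KINDS:
        raise ValueError(f"Unknown validation type: {kind!r}")
    return f"validate_{kind}"
```

With such a helper, `_execute_node` could call `NodeRegistry.get(resolve_node_type(node_type, config))` so that a `validate` node does not fall through to the `"default"` branch.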
**Update the toolbar to add new node buttons:**
```tsx
<Space wrap>
<Button onClick={() => addNode('trigger')}>+ Trigger</Button>
<Button onClick={() => addNode('message')}>+ Mensaje</Button>
<Button onClick={() => addNode('condition')}>+ Condición</Button>
<Button onClick={() => addNode('wait_input')}>+ Esperar</Button>
<Button onClick={() => addNode('switch')}>+ Switch</Button>
<Button onClick={() => addNode('delay')}>+ Delay</Button>
<Button onClick={() => addNode('random')}>+ Random</Button>
<Button onClick={() => addNode('loop')}>+ Loop</Button>
<Button onClick={() => addNode('validate')}>+ Validar</Button>
<Button onClick={() => addNode('http_request')}>+ HTTP</Button>
<Button onClick={() => addNode('ai_response')}>+ AI</Button>
<Button onClick={() => addNode('set_variable')}>+ Variable</Button>
<Button type="primary" icon={<SaveOutlined />} onClick={() => saveMutation.mutate()} loading={saveMutation.isPending} style={{ background: '#25D366' }}>
Guardar
</Button>
</Space>
```
**Commit:** `feat(fase4): add advanced node components to FlowBuilder`

---

## Task 11: Frontend - Global Variables Page

**Files:**
- Create: `frontend/src/pages/GlobalVariables.tsx`

**Code:**

```tsx
import { useState } from 'react';
import {
Card, Table, Button, Modal, Form, Input, Select, Switch, Space, Tag, Popconfirm, message, Typography,
} from 'antd';
import { PlusOutlined, EditOutlined, DeleteOutlined, LockOutlined } from '@ant-design/icons';
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';
import { apiClient } from '../api/client';
const { Title } = Typography;
interface GlobalVariable {
id: string;
key: string;
value: string | null;
value_type: string;
description: string | null;
is_secret: boolean;
}
export default function GlobalVariables() {
const [isModalOpen, setIsModalOpen] = useState(false);
const [selectedVar, setSelectedVar] = useState<GlobalVariable | null>(null);
const [form] = Form.useForm();
const queryClient = useQueryClient();
const { data: variables, isLoading } = useQuery({
queryKey: ['global-variables'],
queryFn: () => apiClient.get<GlobalVariable[]>('/api/global-variables'),
});
const createMutation = useMutation({
mutationFn: (data: Partial<GlobalVariable>) => apiClient.post('/api/global-variables', data),
onSuccess: () => {
message.success('Variable creada');
queryClient.invalidateQueries({ queryKey: ['global-variables'] });
closeModal();
},
});
const updateMutation = useMutation({
mutationFn: ({ id, data }: { id: string; data: Partial<GlobalVariable> }) =>
apiClient.put(`/api/global-variables/${id}`, data),
onSuccess: () => {
message.success('Variable actualizada');
queryClient.invalidateQueries({ queryKey: ['global-variables'] });
closeModal();
},
});
const deleteMutation = useMutation({
mutationFn: (id: string) => apiClient.delete(`/api/global-variables/${id}`),
onSuccess: () => {
message.success('Variable eliminada');
queryClient.invalidateQueries({ queryKey: ['global-variables'] });
},
});
const closeModal = () => {
setIsModalOpen(false);
setSelectedVar(null);
form.resetFields();
};
const handleSubmit = (values: Partial<GlobalVariable>) => {
if (selectedVar) {
updateMutation.mutate({ id: selectedVar.id, data: values });
} else {
createMutation.mutate(values);
}
};
const handleEdit = (variable: GlobalVariable) => {
setSelectedVar(variable);
form.setFieldsValue(variable);
setIsModalOpen(true);
};
const columns = [
{
title: 'Clave',
dataIndex: 'key',
key: 'key',
render: (key: string, record: GlobalVariable) => (
<Space>
<code>{'{{global.' + key + '}}'}</code>
{record.is_secret && <LockOutlined style={{ color: '#faad14' }} />}
</Space>
),
},
{
title: 'Valor',
dataIndex: 'value',
key: 'value',
render: (value: string, record: GlobalVariable) =>
record.is_secret ? <Tag>Secreto</Tag> : value,
},
{
title: 'Tipo',
dataIndex: 'value_type',
key: 'value_type',
render: (type: string) => <Tag>{type}</Tag>,
},
{
title: 'Descripción',
dataIndex: 'description',
key: 'description',
},
{
title: 'Acciones',
key: 'actions',
render: (_: unknown, record: GlobalVariable) => (
<Space>
<Button size="small" icon={<EditOutlined />} onClick={() => handleEdit(record)} />
<Popconfirm title="¿Eliminar variable?" onConfirm={() => deleteMutation.mutate(record.id)}>
<Button size="small" danger icon={<DeleteOutlined />} />
</Popconfirm>
</Space>
),
},
];
return (
<div>
<div style={{ display: 'flex', justifyContent: 'space-between', marginBottom: 16 }}>
<Title level={4} style={{ margin: 0 }}>Variables Globales</Title>
<Button type="primary" icon={<PlusOutlined />} onClick={() => setIsModalOpen(true)}>
Nueva Variable
</Button>
</div>
<Card>
<Table dataSource={variables} columns={columns} rowKey="id" loading={isLoading} pagination={false} />
</Card>
<Modal
title={selectedVar ? 'Editar Variable' : 'Nueva Variable'}
open={isModalOpen}
onCancel={closeModal}
footer={null}
>
<Form form={form} layout="vertical" onFinish={handleSubmit}>
<Form.Item name="key" label="Clave" rules={[{ required: true }]}>
<Input placeholder="api_key" disabled={!!selectedVar} />
</Form.Item>
<Form.Item name="value" label="Valor">
<Input.TextArea rows={2} />
</Form.Item>
<Form.Item name="value_type" label="Tipo" initialValue="string">
<Select>
<Select.Option value="string">String</Select.Option>
<Select.Option value="number">Number</Select.Option>
<Select.Option value="boolean">Boolean</Select.Option>
<Select.Option value="json">JSON</Select.Option>
</Select>
</Form.Item>
<Form.Item name="description" label="Descripción">
<Input />
</Form.Item>
<Form.Item name="is_secret" label="¿Es secreto?" valuePropName="checked">
<Switch />
</Form.Item>
<Form.Item>
<Space>
<Button type="primary" htmlType="submit" loading={createMutation.isPending || updateMutation.isPending}>
{selectedVar ? 'Actualizar' : 'Crear'}
</Button>
<Button onClick={closeModal}>Cancelar</Button>
</Space>
</Form.Item>
</Form>
</Modal>
</div>
);
}
```
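The page stores every value as a string alongside a `value_type`, and shows each key in the form `{{global.key}}`. How the engine consumes these is defined elsewhere in the plan. As a rough sketch only, the backend side might cast the stored string and substitute references like this. `cast_value` and `render_globals` are hypothetical names, and the set of strings accepted as `true` is an assumption.

```python
import json
import re
from typing import Any, Dict, Optional

# Matches {{global.some_key}}, allowing optional whitespace inside the braces.
_GLOBAL_REF = re.compile(r"\{\{\s*global\.([A-Za-z0-9_]+)\s*\}\}")


def cast_value(raw: Optional[str], value_type: str) -> Any:
    """Convert a stored string into the Python type named by value_type."""
    if raw is None:
        return None
    if value_type == "number":
        try:
            return int(raw)
        except ValueError:
            return float(raw)
    if value_type == "boolean":
        # Assumption: these spellings count as true; everything else is false.
        return raw.strip().lower() in {"true", "1", "yes"}
    if value_type == "json":
        return json.loads(raw)
    return raw  # "string" and any unrecognized type stay as text


def render_globals(text: str, variables: Dict[str, Any]) -> str:
    """Replace {{global.key}} references; unknown keys are left untouched."""

    def _replace(match: re.Match) -> str:
        key = match.group(1)
        return str(variables[key]) if key in variables else match.group(0)

    return _GLOBAL_REF.sub(_replace, text)


# Usage with rows shaped like the GlobalVariable interface above.
rows = [
    {"key": "company", "value": "Acme", "value_type": "string"},
    {"key": "max_retries", "value": "3", "value_type": "number"},
]
variables = {row["key"]: cast_value(row["value"], row["value_type"]) for row in rows}
greeting = render_globals("Hola {{global.company}}, max {{ global.max_retries }}", variables)
```

Leaving unknown references in place, rather than replacing them with an empty string, makes a misspelled key visible in the outgoing message during testing.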
**Commit:** `feat(fase4): add GlobalVariables frontend page`

---

## Task 12: Frontend - Flow Templates Page

**Files:**
- Create: `frontend/src/pages/FlowTemplates.tsx`

**Code:**

```tsx
import { useState } from 'react';
import {
Card, Row, Col, Button, Modal, Input, Typography, Tag, Empty, message,
} from 'antd';
import { PlusOutlined, CopyOutlined } from '@ant-design/icons';
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';
import { useNavigate } from 'react-router-dom';
import { apiClient } from '../api/client';
const { Title, Text, Paragraph } = Typography;
interface FlowTemplate {
id: string;
name: string;
description: string | null;
category: string;
nodes: any[];
}
export default function FlowTemplates() {
const [isCreateModalOpen, setIsCreateModalOpen] = useState(false);
const [selectedTemplate, setSelectedTemplate] = useState<FlowTemplate | null>(null);
const [flowName, setFlowName] = useState('');
const navigate = useNavigate();
const queryClient = useQueryClient();
const { data: templates, isLoading } = useQuery({
queryKey: ['flow-templates'],
queryFn: () => apiClient.get<FlowTemplate[]>('/api/flow-templates'),
});
const useTemplateMutation = useMutation({
mutationFn: async ({ templateId, name }: { templateId: string; name: string }) => {
const response = await apiClient.post<{ flow_id: string }>(`/api/flow-templates/${templateId}/use?flow_name=${encodeURIComponent(name)}`, {});
return response;
},
onSuccess: (data) => {
message.success('Flujo creado desde plantilla');
setIsCreateModalOpen(false);
setSelectedTemplate(null);
setFlowName('');
navigate(`/flows/${data.flow_id}`);
},
});
const handleUseTemplate = (template: FlowTemplate) => {
setSelectedTemplate(template);
setFlowName(`${template.name} - Copia`);
setIsCreateModalOpen(true);
};
const categoryColors: Record<string, string> = {
general: 'blue',
sales: 'green',
support: 'orange',
marketing: 'purple',
};
return (
<div>
<div style={{ display: 'flex', justifyContent: 'space-between', marginBottom: 24 }}>
<Title level={4} style={{ margin: 0 }}>Plantillas de Flujos</Title>
</div>
{isLoading ? (
<Card loading />
) : templates?.length === 0 ? (
<Empty description="No hay plantillas disponibles" />
) : (
<Row gutter={[16, 16]}>
{templates?.map((template) => (
<Col key={template.id} xs={24} sm={12} md={8} lg={6}>
<Card
hoverable
actions={[
<Button type="link" icon={<CopyOutlined />} onClick={() => handleUseTemplate(template)}>
Usar
</Button>,
]}
>
<Card.Meta
title={template.name}
description={
<>
<Tag color={categoryColors[template.category] || 'default'}>{template.category}</Tag>
<Paragraph ellipsis={{ rows: 2 }} style={{ marginTop: 8, marginBottom: 0 }}>
{template.description || 'Sin descripción'}
</Paragraph>
<Text type="secondary" style={{ fontSize: 12 }}>
{template.nodes.length} nodos
</Text>
</>
}
/>
</Card>
</Col>
))}
</Row>
)}
<Modal
title="Crear flujo desde plantilla"
open={isCreateModalOpen}
onCancel={() => { setIsCreateModalOpen(false); setSelectedTemplate(null); }}
onOk={() => {
if (selectedTemplate && flowName) {
useTemplateMutation.mutate({ templateId: selectedTemplate.id, name: flowName });
}
}}
okText="Crear"
confirmLoading={useTemplateMutation.isPending}
>
<div style={{ marginBottom: 16 }}>
<Text strong>Plantilla: </Text>
<Text>{selectedTemplate?.name}</Text>
</div>
<Input
placeholder="Nombre del nuevo flujo"
value={flowName}
onChange={(e) => setFlowName(e.target.value)}
/>
</Modal>
</div>
);
}
```
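The page sends the new flow's name as a `flow_name` query parameter, not in the request body, so the backend handler has to read it from the query string. The sketch below reproduces the same URL shape with the standard library, which can be handy for an API test. `build_use_template_url` is a hypothetical helper. `quote(..., safe="")` is similar to `encodeURIComponent` but not byte-for-byte identical for characters such as `!` or `*`; both decode to the same value.

```python
from urllib.parse import parse_qs, quote, urlsplit


def build_use_template_url(template_id: str, flow_name: str) -> str:
    """Build the path the FlowTemplates page posts to."""
    # safe="" percent-encodes reserved characters such as "&", "=" and "/".
    return f"/api/flow-templates/{template_id}/use?flow_name={quote(flow_name, safe='')}"


url = build_use_template_url("abc", "Ventas & Soporte - Copia")
# Round-trip: the server-side parser recovers the original name.
decoded = parse_qs(urlsplit(url).query)["flow_name"][0]
```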
**Commit:** `feat(fase4): add FlowTemplates frontend page`

---

## Task 13: Update MainLayout with New Routes

**Files:**
- Modify: `frontend/src/layouts/MainLayout.tsx`

**Add imports:**

```tsx
import { GlobalOutlined, AppstoreOutlined } from '@ant-design/icons';
import GlobalVariables from '../pages/GlobalVariables';
import FlowTemplates from '../pages/FlowTemplates';
```
**Add menu items (after Flujos):**
```tsx
{
key: '/templates',
icon: <AppstoreOutlined />,
label: 'Plantillas',
},
{
key: '/variables',
icon: <GlobalOutlined />,
label: 'Variables',
},
```
**Add routes:**
```tsx
<Route path="/templates" element={<FlowTemplates />} />
<Route path="/variables" element={<GlobalVariables />} />
```
**Commit:** `feat(fase4): add Templates and Variables routes to MainLayout`

---

## Task 14: Update Docker Compose for OpenAI

**Files:**
- Modify: `docker-compose.yml`

**Add environment variable to flow-engine service:**

```yaml
  flow-engine:
    build: ./services/flow-engine
    environment:
      DATABASE_URL: postgresql://${DB_USER}:${DB_PASSWORD}@postgres:5432/whatsapp_central
      REDIS_URL: redis://redis:6379
      API_GATEWAY_URL: http://api-gateway:8000
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      OPENAI_MODEL: ${OPENAI_MODEL:-gpt-3.5-turbo}
```
**Add to .env.example:**
```
OPENAI_API_KEY=sk-your-api-key
OPENAI_MODEL=gpt-3.5-turbo
```
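On the service side, these two variables need to be read with the same default as the Compose file. The plan's own `get_settings()` is defined elsewhere, so the following is only a standard-library sketch of the idea. `OpenAIConfig` and `load_openai_config` are hypothetical names.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class OpenAIConfig:
    api_key: Optional[str]
    model: str


def load_openai_config(env: Mapping[str, str] = os.environ) -> OpenAIConfig:
    """Read the OpenAI settings, treating empty strings as unset."""
    # Compose substitutes an empty string when ${OPENAI_API_KEY} is undefined,
    # so "" is normalized to None here.
    api_key = env.get("OPENAI_API_KEY") or None
    # Same fallback as ${OPENAI_MODEL:-gpt-3.5-turbo} in docker-compose.yml.
    model = env.get("OPENAI_MODEL") or "gpt-3.5-turbo"
    return OpenAIConfig(api_key=api_key, model=model)
```

Returning `None` for a missing key lets the AI executors skip the call or fail with a clear message instead of sending an empty credential.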
**Commit:** `feat(fase4): add OpenAI configuration to Docker Compose`

---

## Task 15: Final Integration Commit

**Final verification and commit marking Fase 4 complete.**

**Commit:** `feat(fase4): complete Flow Engine Avanzado phase`

---

## Summary

This plan implements:

1. **Node Architecture**: Modular NodeExecutor system with registry
2. **Basic Nodes**: Refactored trigger, message, buttons, wait_input, set_variable, condition
3. **Advanced Nodes**: switch, delay, random, loop, goto
4. **Validation Nodes**: email, phone, number, date, regex, options
5. **Script Nodes**: JavaScript (Python eval), HTTP request
6. **AI Nodes**: AI response (OpenAI), sentiment analysis
7. **Global Variables**: Database model, API, frontend management
8. **Flow Templates**: Reusable templates system
9. **Frontend**: New node components, GlobalVariables page, FlowTemplates page

**Key Features:**
- A/B testing with random node (weighted distribution)
- AI-powered responses using OpenAI
- HTTP requests for external API integration
- Input validation with multiple types
- Reusable flow templates
- Global variables for shared configuration
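
To illustrate the weighted distribution behind the A/B testing feature, here is a minimal sketch using `random.choices` from the standard library. It is not the `RandomExecutor` from the plan. `pick_branch` and the shape of the `weights` mapping are assumptions made for the example.

```python
import random
from typing import Dict, Optional


def pick_branch(weights: Dict[str, float], rng: Optional[random.Random] = None) -> str:
    """Pick one branch name with probability proportional to its weight."""
    if not weights:
        raise ValueError("At least one branch is required")
    if any(w < 0 for w in weights.values()) or sum(weights.values()) <= 0:
        raise ValueError("Weights must be non-negative and sum to a positive value")
    rng = rng or random.Random()
    names = list(weights)
    # random.choices returns a list of k picks; take the single result.
    return rng.choices(names, weights=[weights[n] for n in names], k=1)[0]


# A 70/30 split; a seeded generator keeps the run repeatable.
rng = random.Random(0)
picks = [pick_branch({"A": 70, "B": 30}, rng) for _ in range(2000)]
share_a = picks.count("A") / len(picks)
```

Passing the generator in as a parameter keeps the function deterministic under test while production code can rely on the default.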