15 tasks covering: - NodeExecutor modular architecture with registry - Advanced nodes: switch, delay, random, loop, goto - Validation nodes: email, phone, number, date, regex, options - Script nodes: JavaScript eval, HTTP request - AI nodes: OpenAI response, sentiment analysis - Global variables system (model, API, frontend) - Flow templates for reusable flows - Frontend: new node components, variables page, templates page Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
52 KiB
Fase 4: Flow Engine Avanzado - Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Extender el Flow Engine con nodos avanzados (switch, loop, random, sub-flow, delay), validaciones, templates reutilizables, variables globales, ejecución de JavaScript y nodo de IA.
Architecture: Extender FlowEngine._execute_node() con nuevos tipos de nodos. Crear sistema de NodeExecutor modular para separar lógica de cada nodo. Agregar modelo FlowTemplate para templates reutilizables y GlobalVariable para variables globales. Frontend: nuevos componentes de nodo en FlowBuilder.
Tech Stack: Python/FastAPI (backend), PostgreSQL (DB), React/TypeScript/React Flow (frontend), OpenAI API (AI node)
Task 1: Node Executor Architecture
Files:
- Create:
services/flow-engine/app/nodes/__init__.py - Create:
services/flow-engine/app/nodes/base.py - Create:
services/flow-engine/app/nodes/basic.py
Code for base.py:
from abc import ABC, abstractmethod
from typing import Optional, Any
from app.context import FlowContext
class NodeExecutor(ABC):
"""Base class for all node executors"""
@abstractmethod
async def execute(
self,
config: dict,
context: FlowContext,
session: Any
) -> Optional[str]:
"""
Execute the node logic.
Returns: branch name for routing (e.g., "default", "true", "false")
or "wait" to pause execution
"""
pass
class NodeRegistry:
"""Registry of all available node executors"""
_executors: dict[str, NodeExecutor] = {}
@classmethod
def register(cls, node_type: str, executor: NodeExecutor):
cls._executors[node_type] = executor
@classmethod
def get(cls, node_type: str) -> Optional[NodeExecutor]:
return cls._executors.get(node_type)
@classmethod
def execute(cls, node_type: str, config: dict, context: FlowContext, session: Any) -> Optional[str]:
executor = cls.get(node_type)
if executor:
return executor.execute(config, context, session)
return "default"
Code for basic.py:
from typing import Optional, Any
from app.nodes.base import NodeExecutor, NodeRegistry
from app.context import FlowContext
class TriggerExecutor(NodeExecutor):
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
return "default"
class MessageExecutor(NodeExecutor):
def __init__(self, send_message_fn):
self.send_message = send_message_fn
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
text = context.interpolate(config.get("text", ""))
await self.send_message(session.conversation_id, text)
return "default"
class ButtonsExecutor(NodeExecutor):
def __init__(self, send_message_fn):
self.send_message = send_message_fn
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
text = context.interpolate(config.get("text", ""))
buttons = config.get("buttons", [])
button_text = "\n".join([f"• {b['label']}" for b in buttons])
await self.send_message(session.conversation_id, f"{text}\n\n{button_text}")
return "wait"
class WaitInputExecutor(NodeExecutor):
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
variable = config.get("variable", "user_input")
context.set(variable, context.message.get("content", ""))
return "default"
class SetVariableExecutor(NodeExecutor):
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
var_name = config.get("variable", "")
var_value = context.interpolate(config.get("value", ""))
context.set(var_name, var_value)
return "default"
class ConditionExecutor(NodeExecutor):
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
conditions = config.get("conditions", [])
for cond in conditions:
field = cond.get("field", "")
operator = cond.get("operator", "equals")
value = cond.get("value", "")
branch = cond.get("branch", "default")
actual = context.get(field) or context.message.get("content", "")
if operator == "equals" and str(actual).lower() == str(value).lower():
return branch
elif operator == "contains" and str(value).lower() in str(actual).lower():
return branch
elif operator == "starts_with" and str(actual).lower().startswith(str(value).lower()):
return branch
elif operator == "not_equals" and str(actual).lower() != str(value).lower():
return branch
elif operator == "greater_than":
try:
if float(actual) > float(value):
return branch
except ValueError:
pass
elif operator == "less_than":
try:
if float(actual) < float(value):
return branch
except ValueError:
pass
return "default"
Code for init.py:
from app.nodes.base import NodeExecutor, NodeRegistry
from app.nodes.basic import (
TriggerExecutor, MessageExecutor, ButtonsExecutor,
WaitInputExecutor, SetVariableExecutor, ConditionExecutor
)
Commit: feat(fase4): add NodeExecutor architecture with basic nodes
Task 2: Advanced Nodes - Switch, Delay, Random
Files:
- Create:
services/flow-engine/app/nodes/advanced.py - Modify:
services/flow-engine/app/nodes/__init__.py
Code for advanced.py:
import asyncio
import random as random_module
from typing import Optional, Any
from app.nodes.base import NodeExecutor
from app.context import FlowContext
class SwitchExecutor(NodeExecutor):
"""Multi-branch switch based on variable value"""
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
variable = config.get("variable", "")
cases = config.get("cases", []) # [{value: "x", branch: "case_x"}, ...]
actual = context.get(variable) or context.message.get("content", "")
actual_str = str(actual).lower().strip()
for case in cases:
case_value = str(case.get("value", "")).lower().strip()
if actual_str == case_value:
return case.get("branch", "default")
return config.get("default_branch", "default")
class DelayExecutor(NodeExecutor):
"""Wait for a specified duration before continuing"""
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
delay_seconds = config.get("seconds", 0)
delay_type = config.get("type", "fixed") # fixed or random
if delay_type == "random":
min_delay = config.get("min_seconds", 1)
max_delay = config.get("max_seconds", 5)
delay_seconds = random_module.uniform(min_delay, max_delay)
if delay_seconds > 0:
await asyncio.sleep(min(delay_seconds, 30)) # Max 30 seconds
return "default"
class RandomExecutor(NodeExecutor):
"""Random branch selection for A/B testing"""
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
branches = config.get("branches", []) # [{branch: "a", weight: 50}, {branch: "b", weight: 50}]
if not branches:
return "default"
total_weight = sum(b.get("weight", 1) for b in branches)
rand_value = random_module.uniform(0, total_weight)
cumulative = 0
for branch in branches:
cumulative += branch.get("weight", 1)
if rand_value <= cumulative:
# Track A/B test result
test_name = config.get("test_name", "ab_test")
context.set(f"_ab_{test_name}", branch.get("branch", "default"))
return branch.get("branch", "default")
return branches[-1].get("branch", "default") if branches else "default"
class LoopExecutor(NodeExecutor):
"""Loop a certain number of times or until condition"""
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
loop_var = config.get("counter_variable", "_loop_counter")
max_iterations = config.get("max_iterations", 10)
current = int(context.get(loop_var) or 0)
if current < max_iterations:
context.set(loop_var, current + 1)
return "continue" # Continue loop
else:
context.set(loop_var, 0) # Reset counter
return "done" # Exit loop
class GoToExecutor(NodeExecutor):
"""Jump to another node or flow"""
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
target_node_id = config.get("target_node_id")
target_flow_id = config.get("target_flow_id")
if target_flow_id:
# Store for sub-flow execution
context.set("_goto_flow_id", target_flow_id)
return "sub_flow"
if target_node_id:
context.set("_goto_node_id", target_node_id)
return "goto"
return "default"
Update init.py:
from app.nodes.base import NodeExecutor, NodeRegistry
from app.nodes.basic import (
TriggerExecutor, MessageExecutor, ButtonsExecutor,
WaitInputExecutor, SetVariableExecutor, ConditionExecutor
)
from app.nodes.advanced import (
SwitchExecutor, DelayExecutor, RandomExecutor,
LoopExecutor, GoToExecutor
)
Commit: feat(fase4): add advanced nodes - switch, delay, random, loop, goto
Task 3: Validation Nodes
Files:
- Create:
services/flow-engine/app/nodes/validation.py - Modify:
services/flow-engine/app/nodes/__init__.py
Code for validation.py:
import re
from typing import Optional, Any
from app.nodes.base import NodeExecutor
from app.context import FlowContext
class ValidateEmailExecutor(NodeExecutor):
"""Validate email format"""
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
variable = config.get("variable", "")
value = context.get(variable) or context.message.get("content", "")
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if re.match(email_pattern, str(value).strip()):
return "valid"
return "invalid"
class ValidatePhoneExecutor(NodeExecutor):
"""Validate phone number format"""
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
variable = config.get("variable", "")
value = context.get(variable) or context.message.get("content", "")
# Remove common separators
cleaned = re.sub(r'[\s\-\(\)\+]', '', str(value))
# Check if it's mostly digits and reasonable length
if cleaned.isdigit() and 8 <= len(cleaned) <= 15:
return "valid"
return "invalid"
class ValidateNumberExecutor(NodeExecutor):
"""Validate numeric value within range"""
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
variable = config.get("variable", "")
value = context.get(variable) or context.message.get("content", "")
min_val = config.get("min")
max_val = config.get("max")
try:
num = float(str(value).strip())
if min_val is not None and num < float(min_val):
return "invalid"
if max_val is not None and num > float(max_val):
return "invalid"
return "valid"
except ValueError:
return "invalid"
class ValidateDateExecutor(NodeExecutor):
"""Validate date format"""
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
variable = config.get("variable", "")
value = context.get(variable) or context.message.get("content", "")
date_format = config.get("format", "%Y-%m-%d")
from datetime import datetime
try:
datetime.strptime(str(value).strip(), date_format)
return "valid"
except ValueError:
return "invalid"
class ValidateRegexExecutor(NodeExecutor):
"""Validate against custom regex pattern"""
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
variable = config.get("variable", "")
value = context.get(variable) or context.message.get("content", "")
pattern = config.get("pattern", ".*")
try:
if re.match(pattern, str(value).strip()):
return "valid"
return "invalid"
except re.error:
return "invalid"
class ValidateOptionsExecutor(NodeExecutor):
"""Validate against list of allowed options"""
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
variable = config.get("variable", "")
value = context.get(variable) or context.message.get("content", "")
options = config.get("options", [])
case_sensitive = config.get("case_sensitive", False)
value_str = str(value).strip()
if not case_sensitive:
value_str = value_str.lower()
options = [str(o).lower() for o in options]
if value_str in options:
return "valid"
return "invalid"
Update init.py:
from app.nodes.validation import (
ValidateEmailExecutor, ValidatePhoneExecutor, ValidateNumberExecutor,
ValidateDateExecutor, ValidateRegexExecutor, ValidateOptionsExecutor
)
Commit: feat(fase4): add validation nodes for email, phone, number, date, regex, options
Task 4: JavaScript Node
Files:
- Create:
services/flow-engine/app/nodes/script.py - Modify:
services/flow-engine/app/nodes/__init__.py
Code for script.py:
import json
from typing import Optional, Any
from app.nodes.base import NodeExecutor
from app.context import FlowContext
class JavaScriptExecutor(NodeExecutor):
"""Execute JavaScript code (using Python eval with restricted globals)"""
ALLOWED_BUILTINS = {
'abs': abs, 'all': all, 'any': any, 'bool': bool,
'dict': dict, 'float': float, 'int': int, 'len': len,
'list': list, 'max': max, 'min': min, 'round': round,
'str': str, 'sum': sum, 'sorted': sorted,
}
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
code = config.get("code", "")
output_variable = config.get("output_variable", "_result")
if not code:
return "default"
# Build safe execution context
exec_globals = {
'__builtins__': self.ALLOWED_BUILTINS,
'context': {
'contact': context.contact,
'conversation': context.conversation,
'message': context.message,
'variables': context.variables.copy(),
},
'variables': context.variables.copy(),
}
try:
# Simple expression evaluation (not full JS, but Python expressions)
# For safety, we use a restricted eval
result = eval(code, exec_globals, {})
if result is not None:
context.set(output_variable, result)
return "success"
except Exception as e:
context.set("_script_error", str(e))
return "error"
class HttpRequestExecutor(NodeExecutor):
"""Make HTTP requests to external APIs"""
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
import httpx
url = context.interpolate(config.get("url", ""))
method = config.get("method", "GET").upper()
headers = config.get("headers", {})
body = config.get("body")
output_variable = config.get("output_variable", "_http_response")
timeout = config.get("timeout", 10)
if not url:
return "error"
# Interpolate headers and body
headers = {k: context.interpolate(str(v)) for k, v in headers.items()}
if body and isinstance(body, str):
body = context.interpolate(body)
try:
async with httpx.AsyncClient() as client:
response = await client.request(
method=method,
url=url,
headers=headers,
json=json.loads(body) if body and isinstance(body, str) else body,
timeout=timeout
)
context.set(output_variable, {
"status": response.status_code,
"body": response.text,
"json": response.json() if response.headers.get("content-type", "").startswith("application/json") else None
})
if 200 <= response.status_code < 300:
return "success"
return "error"
except Exception as e:
context.set("_http_error", str(e))
return "error"
Update init.py to add:
from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor
Commit: feat(fase4): add JavaScript and HTTP request nodes
Task 5: AI Response Node
Files:
- Create:
services/flow-engine/app/nodes/ai.py - Modify:
services/flow-engine/app/config.py - Modify:
services/flow-engine/app/nodes/__init__.py
Add to config.py:
# OpenAI
OPENAI_API_KEY: str = ""
OPENAI_MODEL: str = "gpt-3.5-turbo"
Code for ai.py:
from typing import Optional, Any
from app.nodes.base import NodeExecutor
from app.context import FlowContext
from app.config import get_settings
settings = get_settings()
class AIResponseExecutor(NodeExecutor):
"""Generate AI response using OpenAI"""
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
import httpx
prompt = context.interpolate(config.get("prompt", ""))
system_prompt = config.get("system_prompt", "Eres un asistente útil.")
output_variable = config.get("output_variable", "_ai_response")
max_tokens = config.get("max_tokens", 500)
temperature = config.get("temperature", 0.7)
if not settings.OPENAI_API_KEY:
context.set("_ai_error", "OpenAI API key not configured")
return "error"
if not prompt:
return "error"
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt}
]
# Include conversation history if configured
if config.get("include_history", False):
history = context.get("_conversation_history") or []
messages = [{"role": "system", "content": system_prompt}] + history + [{"role": "user", "content": prompt}]
try:
async with httpx.AsyncClient() as client:
response = await client.post(
"https://api.openai.com/v1/chat/completions",
headers={
"Authorization": f"Bearer {settings.OPENAI_API_KEY}",
"Content-Type": "application/json"
},
json={
"model": settings.OPENAI_MODEL,
"messages": messages,
"max_tokens": max_tokens,
"temperature": temperature
},
timeout=30
)
if response.status_code == 200:
data = response.json()
ai_response = data["choices"][0]["message"]["content"]
context.set(output_variable, ai_response)
return "success"
else:
context.set("_ai_error", response.text)
return "error"
except Exception as e:
context.set("_ai_error", str(e))
return "error"
class AISentimentExecutor(NodeExecutor):
"""Analyze sentiment of user message"""
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
import httpx
text = context.get(config.get("variable", "")) or context.message.get("content", "")
output_variable = config.get("output_variable", "_sentiment")
if not settings.OPENAI_API_KEY or not text:
return "neutral"
try:
async with httpx.AsyncClient() as client:
response = await client.post(
"https://api.openai.com/v1/chat/completions",
headers={
"Authorization": f"Bearer {settings.OPENAI_API_KEY}",
"Content-Type": "application/json"
},
json={
"model": "gpt-3.5-turbo",
"messages": [
{"role": "system", "content": "Analyze the sentiment. Reply with only one word: positive, negative, or neutral"},
{"role": "user", "content": text}
],
"max_tokens": 10,
"temperature": 0
},
timeout=15
)
if response.status_code == 200:
data = response.json()
sentiment = data["choices"][0]["message"]["content"].lower().strip()
context.set(output_variable, sentiment)
if "positive" in sentiment:
return "positive"
elif "negative" in sentiment:
return "negative"
return "neutral"
except Exception:
pass
return "neutral"
Update init.py:
from app.nodes.ai import AIResponseExecutor, AISentimentExecutor
Commit: feat(fase4): add AI response and sentiment analysis nodes
Task 6: Global Variables Model
Files:
- Create:
services/api-gateway/app/models/global_variable.py - Modify:
services/api-gateway/app/models/__init__.py
Code for global_variable.py:
import uuid
from datetime import datetime
from sqlalchemy import Column, String, Text, DateTime, Boolean
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.core.database import Base
class GlobalVariable(Base):
__tablename__ = "global_variables"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
key = Column(String(100), nullable=False, unique=True, index=True)
value = Column(Text, nullable=True)
value_type = Column(String(20), default="string", nullable=False) # string, number, boolean, json
description = Column(String(500), nullable=True)
is_secret = Column(Boolean, default=False, nullable=False) # Hide value in UI
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
Update models/init.py:
from app.models.global_variable import GlobalVariable
Commit: feat(fase4): add GlobalVariable database model
Task 7: Global Variables API
Files:
- Create:
services/api-gateway/app/schemas/global_variable.py - Create:
services/api-gateway/app/routers/global_variables.py - Modify:
services/api-gateway/app/main.py
Code for schemas/global_variable.py:
from pydantic import BaseModel
from typing import Optional
from uuid import UUID
from datetime import datetime
class GlobalVariableCreate(BaseModel):
key: str
value: Optional[str] = None
value_type: str = "string"
description: Optional[str] = None
is_secret: bool = False
class GlobalVariableUpdate(BaseModel):
value: Optional[str] = None
value_type: Optional[str] = None
description: Optional[str] = None
is_secret: Optional[bool] = None
class GlobalVariableResponse(BaseModel):
id: UUID
key: str
value: Optional[str]
value_type: str
description: Optional[str]
is_secret: bool
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
Code for routers/global_variables.py:
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from uuid import UUID
from app.core.database import get_db
from app.core.security import get_current_user
from app.models.user import User, UserRole
from app.models.global_variable import GlobalVariable
from app.schemas.global_variable import (
GlobalVariableCreate, GlobalVariableUpdate, GlobalVariableResponse
)
router = APIRouter(prefix="/api/global-variables", tags=["global-variables"])
def require_admin(current_user: User = Depends(get_current_user)):
if current_user.role != UserRole.ADMIN:
raise HTTPException(status_code=403, detail="Admin required")
return current_user
@router.get("", response_model=List[GlobalVariableResponse])
def list_global_variables(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
variables = db.query(GlobalVariable).all()
# Mask secret values
for var in variables:
if var.is_secret:
var.value = "********"
return variables
@router.post("", response_model=GlobalVariableResponse)
def create_global_variable(
request: GlobalVariableCreate,
db: Session = Depends(get_db),
current_user: User = Depends(require_admin),
):
existing = db.query(GlobalVariable).filter(GlobalVariable.key == request.key).first()
if existing:
raise HTTPException(status_code=400, detail="Variable key already exists")
variable = GlobalVariable(**request.model_dump())
db.add(variable)
db.commit()
db.refresh(variable)
return variable
@router.put("/{variable_id}", response_model=GlobalVariableResponse)
def update_global_variable(
variable_id: UUID,
request: GlobalVariableUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(require_admin),
):
variable = db.query(GlobalVariable).filter(GlobalVariable.id == variable_id).first()
if not variable:
raise HTTPException(status_code=404, detail="Variable not found")
for key, value in request.model_dump(exclude_unset=True).items():
setattr(variable, key, value)
db.commit()
db.refresh(variable)
return variable
@router.delete("/{variable_id}")
def delete_global_variable(
variable_id: UUID,
db: Session = Depends(get_db),
current_user: User = Depends(require_admin),
):
variable = db.query(GlobalVariable).filter(GlobalVariable.id == variable_id).first()
if not variable:
raise HTTPException(status_code=404, detail="Variable not found")
db.delete(variable)
db.commit()
return {"success": True}
@router.get("/by-key/{key}")
def get_variable_by_key(
key: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
variable = db.query(GlobalVariable).filter(GlobalVariable.key == key).first()
if not variable:
raise HTTPException(status_code=404, detail="Variable not found")
return {"key": variable.key, "value": variable.value if not variable.is_secret else None}
Add to main.py:
from app.routers import global_variables
app.include_router(global_variables.router)
Commit: feat(fase4): add GlobalVariable API routes
Task 8: Flow Templates Model and API
Files:
- Create:
services/api-gateway/app/models/flow_template.py - Create:
services/api-gateway/app/schemas/flow_template.py - Create:
services/api-gateway/app/routers/flow_templates.py - Modify:
services/api-gateway/app/models/__init__.py - Modify:
services/api-gateway/app/main.py
Code for flow_template.py:
import uuid
from datetime import datetime
from sqlalchemy import Column, String, Text, DateTime
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.core.database import Base
class FlowTemplate(Base):
__tablename__ = "flow_templates"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String(100), nullable=False)
description = Column(Text, nullable=True)
category = Column(String(50), default="general", nullable=False)
nodes = Column(JSONB, default=list)
edges = Column(JSONB, default=list)
variables = Column(JSONB, default=dict)
preview_image = Column(String(500), nullable=True)
is_public = Column(Boolean, default=True, nullable=False)
created_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
Add missing imports to flow_template.py:
from sqlalchemy import Column, String, Text, DateTime, Boolean, ForeignKey
Code for schemas/flow_template.py:
from pydantic import BaseModel
from typing import Optional, List
from uuid import UUID
from datetime import datetime
class FlowTemplateCreate(BaseModel):
name: str
description: Optional[str] = None
category: str = "general"
nodes: List[dict] = []
edges: List[dict] = []
variables: dict = {}
is_public: bool = True
class FlowTemplateResponse(BaseModel):
id: UUID
name: str
description: Optional[str]
category: str
nodes: List[dict]
edges: List[dict]
variables: dict
preview_image: Optional[str]
is_public: bool
created_by: UUID
created_at: datetime
class Config:
from_attributes = True
Code for routers/flow_templates.py:
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from uuid import UUID
from app.core.database import get_db
from app.core.security import get_current_user
from app.models.user import User
from app.models.flow_template import FlowTemplate
from app.models.flow import Flow
from app.schemas.flow_template import FlowTemplateCreate, FlowTemplateResponse
router = APIRouter(prefix="/api/flow-templates", tags=["flow-templates"])
@router.get("", response_model=List[FlowTemplateResponse])
def list_templates(
category: str = None,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
query = db.query(FlowTemplate).filter(FlowTemplate.is_public == True)
if category:
query = query.filter(FlowTemplate.category == category)
return query.all()
@router.post("", response_model=FlowTemplateResponse)
def create_template(
request: FlowTemplateCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
template = FlowTemplate(
**request.model_dump(),
created_by=current_user.id,
)
db.add(template)
db.commit()
db.refresh(template)
return template
@router.post("/{template_id}/use")
def use_template(
template_id: UUID,
flow_name: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Create a new flow from template"""
template = db.query(FlowTemplate).filter(FlowTemplate.id == template_id).first()
if not template:
raise HTTPException(status_code=404, detail="Template not found")
from app.models.flow import Flow, TriggerType
flow = Flow(
name=flow_name,
description=f"Created from template: {template.name}",
trigger_type=TriggerType.KEYWORD,
nodes=template.nodes,
edges=template.edges,
variables=template.variables,
is_active=False,
)
db.add(flow)
db.commit()
db.refresh(flow)
return {"success": True, "flow_id": str(flow.id)}
@router.delete("/{template_id}")
def delete_template(
template_id: UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
template = db.query(FlowTemplate).filter(FlowTemplate.id == template_id).first()
if not template:
raise HTTPException(status_code=404, detail="Template not found")
if template.created_by != current_user.id:
raise HTTPException(status_code=403, detail="Not authorized")
db.delete(template)
db.commit()
return {"success": True}
Update models/init.py:
from app.models.flow_template import FlowTemplate
Add to main.py:
from app.routers import flow_templates
app.include_router(flow_templates.router)
Commit: feat(fase4): add FlowTemplate model and API
Task 9: Refactor Engine to Use NodeRegistry
Files:
- Modify:
services/flow-engine/app/engine.py
Replace _execute_node method and add initialization:
from typing import Optional
import httpx
from app.config import get_settings
from app.context import FlowContext
from app.nodes import NodeRegistry
from app.nodes.basic import (
TriggerExecutor, MessageExecutor, ButtonsExecutor,
WaitInputExecutor, SetVariableExecutor, ConditionExecutor
)
from app.nodes.advanced import (
SwitchExecutor, DelayExecutor, RandomExecutor, LoopExecutor, GoToExecutor
)
from app.nodes.validation import (
ValidateEmailExecutor, ValidatePhoneExecutor, ValidateNumberExecutor,
ValidateDateExecutor, ValidateRegexExecutor, ValidateOptionsExecutor
)
from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor
from app.nodes.ai import AIResponseExecutor, AISentimentExecutor
settings = get_settings()
class FlowEngine:
"""Executes flow nodes based on incoming messages"""
def __init__(self, db_session):
self.db = db_session
self._register_nodes()
def _register_nodes(self):
"""Register all available node executors"""
# Basic nodes
NodeRegistry.register("trigger", TriggerExecutor())
NodeRegistry.register("message", MessageExecutor(self._send_message))
NodeRegistry.register("buttons", ButtonsExecutor(self._send_message))
NodeRegistry.register("wait_input", WaitInputExecutor())
NodeRegistry.register("set_variable", SetVariableExecutor())
NodeRegistry.register("condition", ConditionExecutor())
# Advanced nodes
NodeRegistry.register("switch", SwitchExecutor())
NodeRegistry.register("delay", DelayExecutor())
NodeRegistry.register("random", RandomExecutor())
NodeRegistry.register("loop", LoopExecutor())
NodeRegistry.register("goto", GoToExecutor())
# Validation nodes
NodeRegistry.register("validate_email", ValidateEmailExecutor())
NodeRegistry.register("validate_phone", ValidatePhoneExecutor())
NodeRegistry.register("validate_number", ValidateNumberExecutor())
NodeRegistry.register("validate_date", ValidateDateExecutor())
NodeRegistry.register("validate_regex", ValidateRegexExecutor())
NodeRegistry.register("validate_options", ValidateOptionsExecutor())
# Script nodes
NodeRegistry.register("javascript", JavaScriptExecutor())
NodeRegistry.register("http_request", HttpRequestExecutor())
# AI nodes
NodeRegistry.register("ai_response", AIResponseExecutor())
NodeRegistry.register("ai_sentiment", AISentimentExecutor())
async def _execute_node(
self,
node_type: str,
config: dict,
context: FlowContext,
session
) -> Optional[str]:
"""Execute a single node using NodeRegistry"""
executor = NodeRegistry.get(node_type)
if executor:
result = await executor.execute(config, context, session)
# Handle special results
if result == "goto" and context.get("_goto_node_id"):
session.current_node_id = context.get("_goto_node_id")
context.set("_goto_node_id", None)
return result
return "default"
# ... rest of the class remains the same (process_message, _find_matching_flow, etc.)
Commit: feat(fase4): refactor FlowEngine to use NodeRegistry
Task 10: Frontend - Advanced Node Components
Files:
- Modify:
frontend/src/pages/FlowBuilder.tsx
Add new node components and update nodeTypes:
// Add these new node components after existing ones
const SwitchNode = ({ data }: { data: any }) => (
<div style={{ padding: 10, border: '2px solid #13c2c2', borderRadius: 8, background: '#e6fffb' }}>
<strong>🔀 Switch</strong>
<div style={{ fontSize: 11, marginTop: 4 }}>{data.config?.variable || 'variable'}</div>
</div>
);
const DelayNode = ({ data }: { data: any }) => (
<div style={{ padding: 10, border: '2px solid #595959', borderRadius: 8, background: '#fafafa' }}>
<strong>⏱️ Delay</strong>
<div style={{ fontSize: 11, marginTop: 4 }}>{data.config?.seconds || 0}s</div>
</div>
);
const RandomNode = ({ data }: { data: any }) => (
<div style={{ padding: 10, border: '2px solid #eb2f96', borderRadius: 8, background: '#fff0f6' }}>
<strong>🎲 Random</strong>
<div style={{ fontSize: 11, marginTop: 4 }}>A/B Test</div>
</div>
);
const LoopNode = ({ data }: { data: any }) => (
<div style={{ padding: 10, border: '2px solid #fa8c16', borderRadius: 8, background: '#fff7e6' }}>
<strong>🔄 Loop</strong>
<div style={{ fontSize: 11, marginTop: 4 }}>max: {data.config?.max_iterations || 10}</div>
</div>
);
const ValidateNode = ({ data }: { data: any }) => (
<div style={{ padding: 10, border: '2px solid #52c41a', borderRadius: 8, background: '#f6ffed' }}>
<strong>✅ Validate</strong>
<div style={{ fontSize: 11, marginTop: 4 }}>{data.config?.type || 'email'}</div>
</div>
);
const HttpNode = ({ data }: { data: any }) => (
<div style={{ padding: 10, border: '2px solid #2f54eb', borderRadius: 8, background: '#f0f5ff' }}>
<strong>🌐 HTTP</strong>
<div style={{ fontSize: 11, marginTop: 4 }}>{data.config?.method || 'GET'}</div>
</div>
);
const AINode = ({ data }: { data: any }) => (
<div style={{ padding: 10, border: '2px solid #722ed1', borderRadius: 8, background: '#f9f0ff' }}>
<strong>🤖 AI Response</strong>
</div>
);
const SetVariableNode = ({ data }: { data: any }) => (
<div style={{ padding: 10, border: '2px solid #8c8c8c', borderRadius: 8, background: '#f5f5f5' }}>
<strong>📝 Variable</strong>
<div style={{ fontSize: 11, marginTop: 4 }}>{data.config?.variable || 'var'}</div>
</div>
);
// Update nodeTypes
const nodeTypes: NodeTypes = {
trigger: TriggerNode,
message: MessageNode,
condition: ConditionNode,
wait_input: WaitInputNode,
switch: SwitchNode,
delay: DelayNode,
random: RandomNode,
loop: LoopNode,
validate: ValidateNode,
http_request: HttpNode,
ai_response: AINode,
set_variable: SetVariableNode,
};
Update the toolbar to add new node buttons:
<Space wrap>
<Button onClick={() => addNode('trigger')}>+ Trigger</Button>
<Button onClick={() => addNode('message')}>+ Mensaje</Button>
<Button onClick={() => addNode('condition')}>+ Condición</Button>
<Button onClick={() => addNode('wait_input')}>+ Esperar</Button>
<Button onClick={() => addNode('switch')}>+ Switch</Button>
<Button onClick={() => addNode('delay')}>+ Delay</Button>
<Button onClick={() => addNode('random')}>+ Random</Button>
<Button onClick={() => addNode('loop')}>+ Loop</Button>
<Button onClick={() => addNode('validate')}>+ Validar</Button>
<Button onClick={() => addNode('http_request')}>+ HTTP</Button>
<Button onClick={() => addNode('ai_response')}>+ AI</Button>
<Button onClick={() => addNode('set_variable')}>+ Variable</Button>
<Button type="primary" icon={<SaveOutlined />} onClick={() => saveMutation.mutate()} loading={saveMutation.isPending} style={{ background: '#25D366' }}>
Guardar
</Button>
</Space>
Commit: feat(fase4): add advanced node components to FlowBuilder
Task 11: Frontend - Global Variables Page
Files:
- Create:
frontend/src/pages/GlobalVariables.tsx
Code:
import { useState } from 'react';
import {
Card, Table, Button, Modal, Form, Input, Select, Switch, Space, Tag, Popconfirm, message, Typography,
} from 'antd';
import { PlusOutlined, EditOutlined, DeleteOutlined, LockOutlined } from '@ant-design/icons';
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';
import { apiClient } from '../api/client';
const { Title } = Typography;
interface GlobalVariable {
id: string;
key: string;
value: string | null;
value_type: string;
description: string | null;
is_secret: boolean;
}
export default function GlobalVariables() {
const [isModalOpen, setIsModalOpen] = useState(false);
const [selectedVar, setSelectedVar] = useState<GlobalVariable | null>(null);
const [form] = Form.useForm();
const queryClient = useQueryClient();
const { data: variables, isLoading } = useQuery({
queryKey: ['global-variables'],
queryFn: () => apiClient.get<GlobalVariable[]>('/api/global-variables'),
});
const createMutation = useMutation({
mutationFn: (data: Partial<GlobalVariable>) => apiClient.post('/api/global-variables', data),
onSuccess: () => {
message.success('Variable creada');
queryClient.invalidateQueries({ queryKey: ['global-variables'] });
closeModal();
},
});
const updateMutation = useMutation({
mutationFn: ({ id, data }: { id: string; data: Partial<GlobalVariable> }) =>
apiClient.put(`/api/global-variables/${id}`, data),
onSuccess: () => {
message.success('Variable actualizada');
queryClient.invalidateQueries({ queryKey: ['global-variables'] });
closeModal();
},
});
const deleteMutation = useMutation({
mutationFn: (id: string) => apiClient.delete(`/api/global-variables/${id}`),
onSuccess: () => {
message.success('Variable eliminada');
queryClient.invalidateQueries({ queryKey: ['global-variables'] });
},
});
const closeModal = () => {
setIsModalOpen(false);
setSelectedVar(null);
form.resetFields();
};
const handleSubmit = (values: Partial<GlobalVariable>) => {
if (selectedVar) {
updateMutation.mutate({ id: selectedVar.id, data: values });
} else {
createMutation.mutate(values);
}
};
const handleEdit = (variable: GlobalVariable) => {
setSelectedVar(variable);
form.setFieldsValue(variable);
setIsModalOpen(true);
};
const columns = [
{
title: 'Clave',
dataIndex: 'key',
key: 'key',
render: (key: string, record: GlobalVariable) => (
<Space>
<code>{'{{global.' + key + '}}'}</code>
{record.is_secret && <LockOutlined style={{ color: '#faad14' }} />}
</Space>
),
},
{
title: 'Valor',
dataIndex: 'value',
key: 'value',
render: (value: string, record: GlobalVariable) =>
record.is_secret ? <Tag>Secreto</Tag> : value,
},
{
title: 'Tipo',
dataIndex: 'value_type',
key: 'value_type',
render: (type: string) => <Tag>{type}</Tag>,
},
{
title: 'Descripción',
dataIndex: 'description',
key: 'description',
},
{
title: 'Acciones',
key: 'actions',
render: (_: unknown, record: GlobalVariable) => (
<Space>
<Button size="small" icon={<EditOutlined />} onClick={() => handleEdit(record)} />
<Popconfirm title="¿Eliminar variable?" onConfirm={() => deleteMutation.mutate(record.id)}>
<Button size="small" danger icon={<DeleteOutlined />} />
</Popconfirm>
</Space>
),
},
];
return (
<div>
<div style={{ display: 'flex', justifyContent: 'space-between', marginBottom: 16 }}>
<Title level={4} style={{ margin: 0 }}>Variables Globales</Title>
<Button type="primary" icon={<PlusOutlined />} onClick={() => setIsModalOpen(true)}>
Nueva Variable
</Button>
</div>
<Card>
<Table dataSource={variables} columns={columns} rowKey="id" loading={isLoading} pagination={false} />
</Card>
<Modal
title={selectedVar ? 'Editar Variable' : 'Nueva Variable'}
open={isModalOpen}
onCancel={closeModal}
footer={null}
>
<Form form={form} layout="vertical" onFinish={handleSubmit}>
<Form.Item name="key" label="Clave" rules={[{ required: true }]}>
<Input placeholder="api_key" disabled={!!selectedVar} />
</Form.Item>
<Form.Item name="value" label="Valor">
<Input.TextArea rows={2} />
</Form.Item>
<Form.Item name="value_type" label="Tipo" initialValue="string">
<Select>
<Select.Option value="string">String</Select.Option>
<Select.Option value="number">Number</Select.Option>
<Select.Option value="boolean">Boolean</Select.Option>
<Select.Option value="json">JSON</Select.Option>
</Select>
</Form.Item>
<Form.Item name="description" label="Descripción">
<Input />
</Form.Item>
<Form.Item name="is_secret" label="¿Es secreto?" valuePropName="checked">
<Switch />
</Form.Item>
<Form.Item>
<Space>
<Button type="primary" htmlType="submit" loading={createMutation.isPending || updateMutation.isPending}>
{selectedVar ? 'Actualizar' : 'Crear'}
</Button>
<Button onClick={closeModal}>Cancelar</Button>
</Space>
</Form.Item>
</Form>
</Modal>
</div>
);
}
Commit: feat(fase4): add GlobalVariables frontend page
Task 12: Frontend - Flow Templates Page
Files:
- Create:
frontend/src/pages/FlowTemplates.tsx
Code:
import { useState } from 'react';
import {
Card, Row, Col, Button, Modal, Input, Typography, Tag, Empty, message,
} from 'antd';
import { PlusOutlined, CopyOutlined } from '@ant-design/icons';
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';
import { useNavigate } from 'react-router-dom';
import { apiClient } from '../api/client';
const { Title, Text, Paragraph } = Typography;
interface FlowTemplate {
id: string;
name: string;
description: string | null;
category: string;
nodes: any[];
}
export default function FlowTemplates() {
const [isCreateModalOpen, setIsCreateModalOpen] = useState(false);
const [selectedTemplate, setSelectedTemplate] = useState<FlowTemplate | null>(null);
const [flowName, setFlowName] = useState('');
const navigate = useNavigate();
const queryClient = useQueryClient();
const { data: templates, isLoading } = useQuery({
queryKey: ['flow-templates'],
queryFn: () => apiClient.get<FlowTemplate[]>('/api/flow-templates'),
});
const useTemplateMutation = useMutation({
mutationFn: async ({ templateId, name }: { templateId: string; name: string }) => {
const response = await apiClient.post<{ flow_id: string }>(`/api/flow-templates/${templateId}/use?flow_name=${encodeURIComponent(name)}`, {});
return response;
},
onSuccess: (data) => {
message.success('Flujo creado desde plantilla');
setIsCreateModalOpen(false);
setSelectedTemplate(null);
setFlowName('');
navigate(`/flows/${data.flow_id}`);
},
});
const handleUseTemplate = (template: FlowTemplate) => {
setSelectedTemplate(template);
setFlowName(`${template.name} - Copia`);
setIsCreateModalOpen(true);
};
const categoryColors: Record<string, string> = {
general: 'blue',
sales: 'green',
support: 'orange',
marketing: 'purple',
};
return (
<div>
<div style={{ display: 'flex', justifyContent: 'space-between', marginBottom: 24 }}>
<Title level={4} style={{ margin: 0 }}>Plantillas de Flujos</Title>
</div>
{isLoading ? (
<Card loading />
) : templates?.length === 0 ? (
<Empty description="No hay plantillas disponibles" />
) : (
<Row gutter={[16, 16]}>
{templates?.map((template) => (
<Col key={template.id} xs={24} sm={12} md={8} lg={6}>
<Card
hoverable
actions={[
<Button type="link" icon={<CopyOutlined />} onClick={() => handleUseTemplate(template)}>
Usar
</Button>,
]}
>
<Card.Meta
title={template.name}
description={
<>
<Tag color={categoryColors[template.category] || 'default'}>{template.category}</Tag>
<Paragraph ellipsis={{ rows: 2 }} style={{ marginTop: 8, marginBottom: 0 }}>
{template.description || 'Sin descripción'}
</Paragraph>
<Text type="secondary" style={{ fontSize: 12 }}>
{template.nodes.length} nodos
</Text>
</>
}
/>
</Card>
</Col>
))}
</Row>
)}
<Modal
title="Crear flujo desde plantilla"
open={isCreateModalOpen}
onCancel={() => { setIsCreateModalOpen(false); setSelectedTemplate(null); }}
onOk={() => {
if (selectedTemplate && flowName) {
useTemplateMutation.mutate({ templateId: selectedTemplate.id, name: flowName });
}
}}
okText="Crear"
confirmLoading={useTemplateMutation.isPending}
>
<div style={{ marginBottom: 16 }}>
<Text strong>Plantilla: </Text>
<Text>{selectedTemplate?.name}</Text>
</div>
<Input
placeholder="Nombre del nuevo flujo"
value={flowName}
onChange={(e) => setFlowName(e.target.value)}
/>
</Modal>
</div>
);
}
Commit: feat(fase4): add FlowTemplates frontend page
Task 13: Update MainLayout with New Routes
Files:
- Modify:
frontend/src/layouts/MainLayout.tsx
Add imports:
import { GlobalOutlined, AppstoreOutlined } from '@ant-design/icons';
import GlobalVariables from '../pages/GlobalVariables';
import FlowTemplates from '../pages/FlowTemplates';
Add menu items (after Flujos):
{
key: '/templates',
icon: <AppstoreOutlined />,
label: 'Plantillas',
},
{
key: '/variables',
icon: <GlobalOutlined />,
label: 'Variables',
},
Add routes:
<Route path="/templates" element={<FlowTemplates />} />
<Route path="/variables" element={<GlobalVariables />} />
Commit: feat(fase4): add Templates and Variables routes to MainLayout
Task 14: Update Docker Compose for OpenAI
Files:
- Modify:
docker-compose.yml
Add environment variable to flow-engine service:
flow-engine:
build: ./services/flow-engine
environment:
DATABASE_URL: postgresql://${DB_USER}:${DB_PASSWORD}@postgres:5432/whatsapp_central
REDIS_URL: redis://redis:6379
API_GATEWAY_URL: http://api-gateway:8000
OPENAI_API_KEY: ${OPENAI_API_KEY}
OPENAI_MODEL: ${OPENAI_MODEL:-gpt-3.5-turbo}
Add to .env.example:
OPENAI_API_KEY=sk-your-api-key
OPENAI_MODEL=gpt-3.5-turbo
Commit: feat(fase4): add OpenAI configuration to Docker Compose
Task 15: Final Integration Commit
Final verification and commit marking Fase 4 complete.
Commit: feat(fase4): complete Flow Engine Avanzado phase
Summary
This plan implements:
- Node Architecture: Modular NodeExecutor system with registry
- Basic Nodes: Refactored trigger, message, buttons, wait_input, set_variable, condition
- Advanced Nodes: switch, delay, random, loop, goto
- Validation Nodes: email, phone, number, date, regex, options
- Script Nodes: JavaScript (Python eval), HTTP request
- AI Nodes: AI response (OpenAI), sentiment analysis
- Global Variables: Database model, API, frontend management
- Flow Templates: Reusable templates system
- Frontend: New node components, GlobalVariables page, FlowTemplates page
Key Features:
- A/B testing with random node (weighted distribution)
- AI-powered responses using OpenAI
- HTTP requests for external API integration
- Input validation with multiple types
- Reusable flow templates
- Global variables for shared configuration