Files
WhatsAppCentralizado/docs/plans/2026-01-29-fase-4-flow-engine-avanzado.md
Claude AI e3b14be00f docs: add Fase 4 Flow Engine Avanzado implementation plan
15 tasks covering:
- NodeExecutor modular architecture with registry
- Advanced nodes: switch, delay, random, loop, goto
- Validation nodes: email, phone, number, date, regex, options
- Script nodes: JavaScript eval, HTTP request
- AI nodes: OpenAI response, sentiment analysis
- Global variables system (model, API, frontend)
- Flow templates for reusable flows
- Frontend: new node components, variables page, templates page

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:08:15 +00:00

52 KiB

Fase 4: Flow Engine Avanzado - Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Extender el Flow Engine con nodos avanzados (switch, loop, random, sub-flow, delay), validaciones, templates reutilizables, variables globales, ejecución de JavaScript y nodo de IA.

Architecture: Extender FlowEngine._execute_node() con nuevos tipos de nodos. Crear sistema de NodeExecutor modular para separar lógica de cada nodo. Agregar modelo FlowTemplate para templates reutilizables y GlobalVariable para variables globales. Frontend: nuevos componentes de nodo en FlowBuilder.

Tech Stack: Python/FastAPI (backend), PostgreSQL (DB), React/TypeScript/React Flow (frontend), OpenAI API (AI node)


Task 1: Node Executor Architecture

Files:

  • Create: services/flow-engine/app/nodes/__init__.py
  • Create: services/flow-engine/app/nodes/base.py
  • Create: services/flow-engine/app/nodes/basic.py

Code for base.py:

from abc import ABC, abstractmethod
from typing import Optional, Any
from app.context import FlowContext


class NodeExecutor(ABC):
    """Base class for all node executors"""

    @abstractmethod
    async def execute(
        self,
        config: dict,
        context: FlowContext,
        session: Any
    ) -> Optional[str]:
        """
        Execute the node logic.
        Returns: branch name for routing (e.g., "default", "true", "false")
                 or "wait" to pause execution
        """
        pass


class NodeRegistry:
    """Registry of all available node executors"""
    _executors: dict[str, NodeExecutor] = {}

    @classmethod
    def register(cls, node_type: str, executor: NodeExecutor):
        cls._executors[node_type] = executor

    @classmethod
    def get(cls, node_type: str) -> Optional[NodeExecutor]:
        return cls._executors.get(node_type)

    @classmethod
    def execute(cls, node_type: str, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        executor = cls.get(node_type)
        if executor:
            return executor.execute(config, context, session)
        return "default"

Code for basic.py:

from typing import Optional, Any
from app.nodes.base import NodeExecutor, NodeRegistry
from app.context import FlowContext


class TriggerExecutor(NodeExecutor):
    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        return "default"


class MessageExecutor(NodeExecutor):
    def __init__(self, send_message_fn):
        self.send_message = send_message_fn

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        text = context.interpolate(config.get("text", ""))
        await self.send_message(session.conversation_id, text)
        return "default"


class ButtonsExecutor(NodeExecutor):
    def __init__(self, send_message_fn):
        self.send_message = send_message_fn

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        text = context.interpolate(config.get("text", ""))
        buttons = config.get("buttons", [])
        button_text = "\n".join([f"• {b['label']}" for b in buttons])
        await self.send_message(session.conversation_id, f"{text}\n\n{button_text}")
        return "wait"


class WaitInputExecutor(NodeExecutor):
    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        variable = config.get("variable", "user_input")
        context.set(variable, context.message.get("content", ""))
        return "default"


class SetVariableExecutor(NodeExecutor):
    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        var_name = config.get("variable", "")
        var_value = context.interpolate(config.get("value", ""))
        context.set(var_name, var_value)
        return "default"


class ConditionExecutor(NodeExecutor):
    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        conditions = config.get("conditions", [])

        for cond in conditions:
            field = cond.get("field", "")
            operator = cond.get("operator", "equals")
            value = cond.get("value", "")
            branch = cond.get("branch", "default")

            actual = context.get(field) or context.message.get("content", "")

            if operator == "equals" and str(actual).lower() == str(value).lower():
                return branch
            elif operator == "contains" and str(value).lower() in str(actual).lower():
                return branch
            elif operator == "starts_with" and str(actual).lower().startswith(str(value).lower()):
                return branch
            elif operator == "not_equals" and str(actual).lower() != str(value).lower():
                return branch
            elif operator == "greater_than":
                try:
                    if float(actual) > float(value):
                        return branch
                except ValueError:
                    pass
            elif operator == "less_than":
                try:
                    if float(actual) < float(value):
                        return branch
                except ValueError:
                    pass

        return "default"

Code for init.py:

from app.nodes.base import NodeExecutor, NodeRegistry
from app.nodes.basic import (
    TriggerExecutor, MessageExecutor, ButtonsExecutor,
    WaitInputExecutor, SetVariableExecutor, ConditionExecutor
)

Commit: feat(fase4): add NodeExecutor architecture with basic nodes


Task 2: Advanced Nodes - Switch, Delay, Random

Files:

  • Create: services/flow-engine/app/nodes/advanced.py
  • Modify: services/flow-engine/app/nodes/__init__.py

Code for advanced.py:

import asyncio
import random as random_module
from typing import Optional, Any
from app.nodes.base import NodeExecutor
from app.context import FlowContext


class SwitchExecutor(NodeExecutor):
    """Multi-branch switch based on variable value"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        variable = config.get("variable", "")
        cases = config.get("cases", [])  # [{value: "x", branch: "case_x"}, ...]

        actual = context.get(variable) or context.message.get("content", "")
        actual_str = str(actual).lower().strip()

        for case in cases:
            case_value = str(case.get("value", "")).lower().strip()
            if actual_str == case_value:
                return case.get("branch", "default")

        return config.get("default_branch", "default")


class DelayExecutor(NodeExecutor):
    """Wait for a specified duration before continuing"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        delay_seconds = config.get("seconds", 0)
        delay_type = config.get("type", "fixed")  # fixed or random

        if delay_type == "random":
            min_delay = config.get("min_seconds", 1)
            max_delay = config.get("max_seconds", 5)
            delay_seconds = random_module.uniform(min_delay, max_delay)

        if delay_seconds > 0:
            await asyncio.sleep(min(delay_seconds, 30))  # Max 30 seconds

        return "default"


class RandomExecutor(NodeExecutor):
    """Random branch selection for A/B testing"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        branches = config.get("branches", [])  # [{branch: "a", weight: 50}, {branch: "b", weight: 50}]

        if not branches:
            return "default"

        total_weight = sum(b.get("weight", 1) for b in branches)
        rand_value = random_module.uniform(0, total_weight)

        cumulative = 0
        for branch in branches:
            cumulative += branch.get("weight", 1)
            if rand_value <= cumulative:
                # Track A/B test result
                test_name = config.get("test_name", "ab_test")
                context.set(f"_ab_{test_name}", branch.get("branch", "default"))
                return branch.get("branch", "default")

        return branches[-1].get("branch", "default") if branches else "default"


class LoopExecutor(NodeExecutor):
    """Loop a certain number of times or until condition"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        loop_var = config.get("counter_variable", "_loop_counter")
        max_iterations = config.get("max_iterations", 10)

        current = int(context.get(loop_var) or 0)

        if current < max_iterations:
            context.set(loop_var, current + 1)
            return "continue"  # Continue loop
        else:
            context.set(loop_var, 0)  # Reset counter
            return "done"  # Exit loop


class GoToExecutor(NodeExecutor):
    """Jump to another node or flow"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        target_node_id = config.get("target_node_id")
        target_flow_id = config.get("target_flow_id")

        if target_flow_id:
            # Store for sub-flow execution
            context.set("_goto_flow_id", target_flow_id)
            return "sub_flow"

        if target_node_id:
            context.set("_goto_node_id", target_node_id)
            return "goto"

        return "default"

Update init.py:

from app.nodes.base import NodeExecutor, NodeRegistry
from app.nodes.basic import (
    TriggerExecutor, MessageExecutor, ButtonsExecutor,
    WaitInputExecutor, SetVariableExecutor, ConditionExecutor
)
from app.nodes.advanced import (
    SwitchExecutor, DelayExecutor, RandomExecutor,
    LoopExecutor, GoToExecutor
)

Commit: feat(fase4): add advanced nodes - switch, delay, random, loop, goto


Task 3: Validation Nodes

Files:

  • Create: services/flow-engine/app/nodes/validation.py
  • Modify: services/flow-engine/app/nodes/__init__.py

Code for validation.py:

import re
from typing import Optional, Any
from app.nodes.base import NodeExecutor
from app.context import FlowContext


class ValidateEmailExecutor(NodeExecutor):
    """Validate email format"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        variable = config.get("variable", "")
        value = context.get(variable) or context.message.get("content", "")

        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

        if re.match(email_pattern, str(value).strip()):
            return "valid"
        return "invalid"


class ValidatePhoneExecutor(NodeExecutor):
    """Validate phone number format"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        variable = config.get("variable", "")
        value = context.get(variable) or context.message.get("content", "")

        # Remove common separators
        cleaned = re.sub(r'[\s\-\(\)\+]', '', str(value))

        # Check if it's mostly digits and reasonable length
        if cleaned.isdigit() and 8 <= len(cleaned) <= 15:
            return "valid"
        return "invalid"


class ValidateNumberExecutor(NodeExecutor):
    """Validate numeric value within range"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        variable = config.get("variable", "")
        value = context.get(variable) or context.message.get("content", "")

        min_val = config.get("min")
        max_val = config.get("max")

        try:
            num = float(str(value).strip())

            if min_val is not None and num < float(min_val):
                return "invalid"
            if max_val is not None and num > float(max_val):
                return "invalid"

            return "valid"
        except ValueError:
            return "invalid"


class ValidateDateExecutor(NodeExecutor):
    """Validate date format"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        variable = config.get("variable", "")
        value = context.get(variable) or context.message.get("content", "")
        date_format = config.get("format", "%Y-%m-%d")

        from datetime import datetime

        try:
            datetime.strptime(str(value).strip(), date_format)
            return "valid"
        except ValueError:
            return "invalid"


class ValidateRegexExecutor(NodeExecutor):
    """Validate against custom regex pattern"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        variable = config.get("variable", "")
        value = context.get(variable) or context.message.get("content", "")
        pattern = config.get("pattern", ".*")

        try:
            if re.match(pattern, str(value).strip()):
                return "valid"
            return "invalid"
        except re.error:
            return "invalid"


class ValidateOptionsExecutor(NodeExecutor):
    """Validate against list of allowed options"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        variable = config.get("variable", "")
        value = context.get(variable) or context.message.get("content", "")
        options = config.get("options", [])
        case_sensitive = config.get("case_sensitive", False)

        value_str = str(value).strip()
        if not case_sensitive:
            value_str = value_str.lower()
            options = [str(o).lower() for o in options]

        if value_str in options:
            return "valid"
        return "invalid"

Update init.py:

from app.nodes.validation import (
    ValidateEmailExecutor, ValidatePhoneExecutor, ValidateNumberExecutor,
    ValidateDateExecutor, ValidateRegexExecutor, ValidateOptionsExecutor
)

Commit: feat(fase4): add validation nodes for email, phone, number, date, regex, options


Task 4: JavaScript Node

Files:

  • Create: services/flow-engine/app/nodes/script.py
  • Modify: services/flow-engine/app/nodes/__init__.py

Code for script.py:

import json
from typing import Optional, Any
from app.nodes.base import NodeExecutor
from app.context import FlowContext


class JavaScriptExecutor(NodeExecutor):
    """Execute JavaScript code (using Python eval with restricted globals)"""

    ALLOWED_BUILTINS = {
        'abs': abs, 'all': all, 'any': any, 'bool': bool,
        'dict': dict, 'float': float, 'int': int, 'len': len,
        'list': list, 'max': max, 'min': min, 'round': round,
        'str': str, 'sum': sum, 'sorted': sorted,
    }

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        code = config.get("code", "")
        output_variable = config.get("output_variable", "_result")

        if not code:
            return "default"

        # Build safe execution context
        exec_globals = {
            '__builtins__': self.ALLOWED_BUILTINS,
            'context': {
                'contact': context.contact,
                'conversation': context.conversation,
                'message': context.message,
                'variables': context.variables.copy(),
            },
            'variables': context.variables.copy(),
        }

        try:
            # Simple expression evaluation (not full JS, but Python expressions)
            # For safety, we use a restricted eval
            result = eval(code, exec_globals, {})

            if result is not None:
                context.set(output_variable, result)

            return "success"
        except Exception as e:
            context.set("_script_error", str(e))
            return "error"


class HttpRequestExecutor(NodeExecutor):
    """Make HTTP requests to external APIs"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        import httpx

        url = context.interpolate(config.get("url", ""))
        method = config.get("method", "GET").upper()
        headers = config.get("headers", {})
        body = config.get("body")
        output_variable = config.get("output_variable", "_http_response")
        timeout = config.get("timeout", 10)

        if not url:
            return "error"

        # Interpolate headers and body
        headers = {k: context.interpolate(str(v)) for k, v in headers.items()}
        if body and isinstance(body, str):
            body = context.interpolate(body)

        try:
            async with httpx.AsyncClient() as client:
                response = await client.request(
                    method=method,
                    url=url,
                    headers=headers,
                    json=json.loads(body) if body and isinstance(body, str) else body,
                    timeout=timeout
                )

                context.set(output_variable, {
                    "status": response.status_code,
                    "body": response.text,
                    "json": response.json() if response.headers.get("content-type", "").startswith("application/json") else None
                })

                if 200 <= response.status_code < 300:
                    return "success"
                return "error"

        except Exception as e:
            context.set("_http_error", str(e))
            return "error"

Update init.py to add:

from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor

Commit: feat(fase4): add JavaScript and HTTP request nodes


Task 5: AI Response Node

Files:

  • Create: services/flow-engine/app/nodes/ai.py
  • Modify: services/flow-engine/app/config.py
  • Modify: services/flow-engine/app/nodes/__init__.py

Add to config.py:

    # OpenAI
    OPENAI_API_KEY: str = ""
    OPENAI_MODEL: str = "gpt-3.5-turbo"

Code for ai.py:

from typing import Optional, Any
from app.nodes.base import NodeExecutor
from app.context import FlowContext
from app.config import get_settings

settings = get_settings()


class AIResponseExecutor(NodeExecutor):
    """Generate AI response using OpenAI"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        import httpx

        prompt = context.interpolate(config.get("prompt", ""))
        system_prompt = config.get("system_prompt", "Eres un asistente útil.")
        output_variable = config.get("output_variable", "_ai_response")
        max_tokens = config.get("max_tokens", 500)
        temperature = config.get("temperature", 0.7)

        if not settings.OPENAI_API_KEY:
            context.set("_ai_error", "OpenAI API key not configured")
            return "error"

        if not prompt:
            return "error"

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt}
        ]

        # Include conversation history if configured
        if config.get("include_history", False):
            history = context.get("_conversation_history") or []
            messages = [{"role": "system", "content": system_prompt}] + history + [{"role": "user", "content": prompt}]

        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    "https://api.openai.com/v1/chat/completions",
                    headers={
                        "Authorization": f"Bearer {settings.OPENAI_API_KEY}",
                        "Content-Type": "application/json"
                    },
                    json={
                        "model": settings.OPENAI_MODEL,
                        "messages": messages,
                        "max_tokens": max_tokens,
                        "temperature": temperature
                    },
                    timeout=30
                )

                if response.status_code == 200:
                    data = response.json()
                    ai_response = data["choices"][0]["message"]["content"]
                    context.set(output_variable, ai_response)
                    return "success"
                else:
                    context.set("_ai_error", response.text)
                    return "error"

        except Exception as e:
            context.set("_ai_error", str(e))
            return "error"


class AISentimentExecutor(NodeExecutor):
    """Analyze sentiment of user message"""

    async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
        import httpx

        text = context.get(config.get("variable", "")) or context.message.get("content", "")
        output_variable = config.get("output_variable", "_sentiment")

        if not settings.OPENAI_API_KEY or not text:
            return "neutral"

        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    "https://api.openai.com/v1/chat/completions",
                    headers={
                        "Authorization": f"Bearer {settings.OPENAI_API_KEY}",
                        "Content-Type": "application/json"
                    },
                    json={
                        "model": "gpt-3.5-turbo",
                        "messages": [
                            {"role": "system", "content": "Analyze the sentiment. Reply with only one word: positive, negative, or neutral"},
                            {"role": "user", "content": text}
                        ],
                        "max_tokens": 10,
                        "temperature": 0
                    },
                    timeout=15
                )

                if response.status_code == 200:
                    data = response.json()
                    sentiment = data["choices"][0]["message"]["content"].lower().strip()
                    context.set(output_variable, sentiment)

                    if "positive" in sentiment:
                        return "positive"
                    elif "negative" in sentiment:
                        return "negative"
                    return "neutral"

        except Exception:
            pass

        return "neutral"

Update init.py:

from app.nodes.ai import AIResponseExecutor, AISentimentExecutor

Commit: feat(fase4): add AI response and sentiment analysis nodes


Task 6: Global Variables Model

Files:

  • Create: services/api-gateway/app/models/global_variable.py
  • Modify: services/api-gateway/app/models/__init__.py

Code for global_variable.py:

import uuid
from datetime import datetime
from sqlalchemy import Column, String, Text, DateTime, Boolean
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.core.database import Base


class GlobalVariable(Base):
    __tablename__ = "global_variables"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    key = Column(String(100), nullable=False, unique=True, index=True)
    value = Column(Text, nullable=True)
    value_type = Column(String(20), default="string", nullable=False)  # string, number, boolean, json
    description = Column(String(500), nullable=True)
    is_secret = Column(Boolean, default=False, nullable=False)  # Hide value in UI
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)

Update models/init.py:

from app.models.global_variable import GlobalVariable

Commit: feat(fase4): add GlobalVariable database model


Task 7: Global Variables API

Files:

  • Create: services/api-gateway/app/schemas/global_variable.py
  • Create: services/api-gateway/app/routers/global_variables.py
  • Modify: services/api-gateway/app/main.py

Code for schemas/global_variable.py:

from pydantic import BaseModel
from typing import Optional
from uuid import UUID
from datetime import datetime


class GlobalVariableCreate(BaseModel):
    key: str
    value: Optional[str] = None
    value_type: str = "string"
    description: Optional[str] = None
    is_secret: bool = False


class GlobalVariableUpdate(BaseModel):
    value: Optional[str] = None
    value_type: Optional[str] = None
    description: Optional[str] = None
    is_secret: Optional[bool] = None


class GlobalVariableResponse(BaseModel):
    id: UUID
    key: str
    value: Optional[str]
    value_type: str
    description: Optional[str]
    is_secret: bool
    created_at: datetime
    updated_at: datetime

    class Config:
        from_attributes = True

Code for routers/global_variables.py:

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from uuid import UUID
from app.core.database import get_db
from app.core.security import get_current_user
from app.models.user import User, UserRole
from app.models.global_variable import GlobalVariable
from app.schemas.global_variable import (
    GlobalVariableCreate, GlobalVariableUpdate, GlobalVariableResponse
)

router = APIRouter(prefix="/api/global-variables", tags=["global-variables"])


def require_admin(current_user: User = Depends(get_current_user)):
    if current_user.role != UserRole.ADMIN:
        raise HTTPException(status_code=403, detail="Admin required")
    return current_user


@router.get("", response_model=List[GlobalVariableResponse])
def list_global_variables(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    variables = db.query(GlobalVariable).all()
    # Mask secret values
    for var in variables:
        if var.is_secret:
            var.value = "********"
    return variables


@router.post("", response_model=GlobalVariableResponse)
def create_global_variable(
    request: GlobalVariableCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    existing = db.query(GlobalVariable).filter(GlobalVariable.key == request.key).first()
    if existing:
        raise HTTPException(status_code=400, detail="Variable key already exists")

    variable = GlobalVariable(**request.model_dump())
    db.add(variable)
    db.commit()
    db.refresh(variable)
    return variable


@router.put("/{variable_id}", response_model=GlobalVariableResponse)
def update_global_variable(
    variable_id: UUID,
    request: GlobalVariableUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    variable = db.query(GlobalVariable).filter(GlobalVariable.id == variable_id).first()
    if not variable:
        raise HTTPException(status_code=404, detail="Variable not found")

    for key, value in request.model_dump(exclude_unset=True).items():
        setattr(variable, key, value)

    db.commit()
    db.refresh(variable)
    return variable


@router.delete("/{variable_id}")
def delete_global_variable(
    variable_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_admin),
):
    variable = db.query(GlobalVariable).filter(GlobalVariable.id == variable_id).first()
    if not variable:
        raise HTTPException(status_code=404, detail="Variable not found")

    db.delete(variable)
    db.commit()
    return {"success": True}


@router.get("/by-key/{key}")
def get_variable_by_key(
    key: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    variable = db.query(GlobalVariable).filter(GlobalVariable.key == key).first()
    if not variable:
        raise HTTPException(status_code=404, detail="Variable not found")

    return {"key": variable.key, "value": variable.value if not variable.is_secret else None}

Add to main.py:

from app.routers import global_variables
app.include_router(global_variables.router)

Commit: feat(fase4): add GlobalVariable API routes


Task 8: Flow Templates Model and API

Files:

  • Create: services/api-gateway/app/models/flow_template.py
  • Create: services/api-gateway/app/schemas/flow_template.py
  • Create: services/api-gateway/app/routers/flow_templates.py
  • Modify: services/api-gateway/app/models/__init__.py
  • Modify: services/api-gateway/app/main.py

Code for flow_template.py:

import uuid
from datetime import datetime
from sqlalchemy import Column, String, Text, DateTime
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.core.database import Base


class FlowTemplate(Base):
    __tablename__ = "flow_templates"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(100), nullable=False)
    description = Column(Text, nullable=True)
    category = Column(String(50), default="general", nullable=False)
    nodes = Column(JSONB, default=list)
    edges = Column(JSONB, default=list)
    variables = Column(JSONB, default=dict)
    preview_image = Column(String(500), nullable=True)
    is_public = Column(Boolean, default=True, nullable=False)
    created_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)

Add missing imports to flow_template.py:

from sqlalchemy import Column, String, Text, DateTime, Boolean, ForeignKey

Code for schemas/flow_template.py:

from pydantic import BaseModel
from typing import Optional, List
from uuid import UUID
from datetime import datetime


class FlowTemplateCreate(BaseModel):
    name: str
    description: Optional[str] = None
    category: str = "general"
    nodes: List[dict] = []
    edges: List[dict] = []
    variables: dict = {}
    is_public: bool = True


class FlowTemplateResponse(BaseModel):
    id: UUID
    name: str
    description: Optional[str]
    category: str
    nodes: List[dict]
    edges: List[dict]
    variables: dict
    preview_image: Optional[str]
    is_public: bool
    created_by: UUID
    created_at: datetime

    class Config:
        from_attributes = True

Code for routers/flow_templates.py:

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from uuid import UUID
from app.core.database import get_db
from app.core.security import get_current_user
from app.models.user import User
from app.models.flow_template import FlowTemplate
from app.models.flow import Flow
from app.schemas.flow_template import FlowTemplateCreate, FlowTemplateResponse

router = APIRouter(prefix="/api/flow-templates", tags=["flow-templates"])


@router.get("", response_model=List[FlowTemplateResponse])
def list_templates(
    category: str = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    query = db.query(FlowTemplate).filter(FlowTemplate.is_public == True)
    if category:
        query = query.filter(FlowTemplate.category == category)
    return query.all()


@router.post("", response_model=FlowTemplateResponse)
def create_template(
    request: FlowTemplateCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    template = FlowTemplate(
        **request.model_dump(),
        created_by=current_user.id,
    )
    db.add(template)
    db.commit()
    db.refresh(template)
    return template


@router.post("/{template_id}/use")
def use_template(
    template_id: UUID,
    flow_name: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create a new flow from template"""
    template = db.query(FlowTemplate).filter(FlowTemplate.id == template_id).first()
    if not template:
        raise HTTPException(status_code=404, detail="Template not found")

    from app.models.flow import Flow, TriggerType

    flow = Flow(
        name=flow_name,
        description=f"Created from template: {template.name}",
        trigger_type=TriggerType.KEYWORD,
        nodes=template.nodes,
        edges=template.edges,
        variables=template.variables,
        is_active=False,
    )
    db.add(flow)
    db.commit()
    db.refresh(flow)

    return {"success": True, "flow_id": str(flow.id)}


@router.delete("/{template_id}")
def delete_template(
    template_id: UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    template = db.query(FlowTemplate).filter(FlowTemplate.id == template_id).first()
    if not template:
        raise HTTPException(status_code=404, detail="Template not found")

    if template.created_by != current_user.id:
        raise HTTPException(status_code=403, detail="Not authorized")

    db.delete(template)
    db.commit()
    return {"success": True}

Update models/init.py:

from app.models.flow_template import FlowTemplate

Add to main.py:

from app.routers import flow_templates
app.include_router(flow_templates.router)

Commit: feat(fase4): add FlowTemplate model and API


Task 9: Refactor Engine to Use NodeRegistry

Files:

  • Modify: services/flow-engine/app/engine.py

Replace _execute_node method and add initialization:

from typing import Optional
import httpx
from app.config import get_settings
from app.context import FlowContext
from app.nodes import NodeRegistry
from app.nodes.basic import (
    TriggerExecutor, MessageExecutor, ButtonsExecutor,
    WaitInputExecutor, SetVariableExecutor, ConditionExecutor
)
from app.nodes.advanced import (
    SwitchExecutor, DelayExecutor, RandomExecutor, LoopExecutor, GoToExecutor
)
from app.nodes.validation import (
    ValidateEmailExecutor, ValidatePhoneExecutor, ValidateNumberExecutor,
    ValidateDateExecutor, ValidateRegexExecutor, ValidateOptionsExecutor
)
from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor
from app.nodes.ai import AIResponseExecutor, AISentimentExecutor

settings = get_settings()


class FlowEngine:
    """Executes flow nodes based on incoming messages"""

    def __init__(self, db_session):
        self.db = db_session
        self._register_nodes()

    def _register_nodes(self):
        """Register all available node executors"""
        # Basic nodes
        NodeRegistry.register("trigger", TriggerExecutor())
        NodeRegistry.register("message", MessageExecutor(self._send_message))
        NodeRegistry.register("buttons", ButtonsExecutor(self._send_message))
        NodeRegistry.register("wait_input", WaitInputExecutor())
        NodeRegistry.register("set_variable", SetVariableExecutor())
        NodeRegistry.register("condition", ConditionExecutor())

        # Advanced nodes
        NodeRegistry.register("switch", SwitchExecutor())
        NodeRegistry.register("delay", DelayExecutor())
        NodeRegistry.register("random", RandomExecutor())
        NodeRegistry.register("loop", LoopExecutor())
        NodeRegistry.register("goto", GoToExecutor())

        # Validation nodes
        NodeRegistry.register("validate_email", ValidateEmailExecutor())
        NodeRegistry.register("validate_phone", ValidatePhoneExecutor())
        NodeRegistry.register("validate_number", ValidateNumberExecutor())
        NodeRegistry.register("validate_date", ValidateDateExecutor())
        NodeRegistry.register("validate_regex", ValidateRegexExecutor())
        NodeRegistry.register("validate_options", ValidateOptionsExecutor())

        # Script nodes
        NodeRegistry.register("javascript", JavaScriptExecutor())
        NodeRegistry.register("http_request", HttpRequestExecutor())

        # AI nodes
        NodeRegistry.register("ai_response", AIResponseExecutor())
        NodeRegistry.register("ai_sentiment", AISentimentExecutor())

    async def _execute_node(
        self,
        node_type: str,
        config: dict,
        context: FlowContext,
        session
    ) -> Optional[str]:
        """Execute a single node using NodeRegistry"""
        executor = NodeRegistry.get(node_type)
        if executor:
            result = await executor.execute(config, context, session)

            # Handle special results
            if result == "goto" and context.get("_goto_node_id"):
                session.current_node_id = context.get("_goto_node_id")
                context.set("_goto_node_id", None)

            return result

        return "default"

    # ... rest of the class remains the same (process_message, _find_matching_flow, etc.)

Commit: feat(fase4): refactor FlowEngine to use NodeRegistry


Task 10: Frontend - Advanced Node Components

Files:

  • Modify: frontend/src/pages/FlowBuilder.tsx

Add new node components and update nodeTypes:

// Add these new node components after existing ones

const SwitchNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #13c2c2', borderRadius: 8, background: '#e6fffb' }}>
    <strong>🔀 Switch</strong>
    <div style={{ fontSize: 11, marginTop: 4 }}>{data.config?.variable || 'variable'}</div>
  </div>
);

const DelayNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #595959', borderRadius: 8, background: '#fafafa' }}>
    <strong>⏱️ Delay</strong>
    <div style={{ fontSize: 11, marginTop: 4 }}>{data.config?.seconds || 0}s</div>
  </div>
);

const RandomNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #eb2f96', borderRadius: 8, background: '#fff0f6' }}>
    <strong>🎲 Random</strong>
    <div style={{ fontSize: 11, marginTop: 4 }}>A/B Test</div>
  </div>
);

const LoopNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #fa8c16', borderRadius: 8, background: '#fff7e6' }}>
    <strong>🔄 Loop</strong>
    <div style={{ fontSize: 11, marginTop: 4 }}>max: {data.config?.max_iterations || 10}</div>
  </div>
);

const ValidateNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #52c41a', borderRadius: 8, background: '#f6ffed' }}>
    <strong> Validate</strong>
    <div style={{ fontSize: 11, marginTop: 4 }}>{data.config?.type || 'email'}</div>
  </div>
);

const HttpNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #2f54eb', borderRadius: 8, background: '#f0f5ff' }}>
    <strong>🌐 HTTP</strong>
    <div style={{ fontSize: 11, marginTop: 4 }}>{data.config?.method || 'GET'}</div>
  </div>
);

const AINode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #722ed1', borderRadius: 8, background: '#f9f0ff' }}>
    <strong>🤖 AI Response</strong>
  </div>
);

const SetVariableNode = ({ data }: { data: any }) => (
  <div style={{ padding: 10, border: '2px solid #8c8c8c', borderRadius: 8, background: '#f5f5f5' }}>
    <strong>📝 Variable</strong>
    <div style={{ fontSize: 11, marginTop: 4 }}>{data.config?.variable || 'var'}</div>
  </div>
);

// Update nodeTypes
const nodeTypes: NodeTypes = {
  trigger: TriggerNode,
  message: MessageNode,
  condition: ConditionNode,
  wait_input: WaitInputNode,
  switch: SwitchNode,
  delay: DelayNode,
  random: RandomNode,
  loop: LoopNode,
  validate: ValidateNode,
  http_request: HttpNode,
  ai_response: AINode,
  set_variable: SetVariableNode,
};

Update the toolbar to add new node buttons:

<Space wrap>
  <Button onClick={() => addNode('trigger')}>+ Trigger</Button>
  <Button onClick={() => addNode('message')}>+ Mensaje</Button>
  <Button onClick={() => addNode('condition')}>+ Condición</Button>
  <Button onClick={() => addNode('wait_input')}>+ Esperar</Button>
  <Button onClick={() => addNode('switch')}>+ Switch</Button>
  <Button onClick={() => addNode('delay')}>+ Delay</Button>
  <Button onClick={() => addNode('random')}>+ Random</Button>
  <Button onClick={() => addNode('loop')}>+ Loop</Button>
  <Button onClick={() => addNode('validate')}>+ Validar</Button>
  <Button onClick={() => addNode('http_request')}>+ HTTP</Button>
  <Button onClick={() => addNode('ai_response')}>+ AI</Button>
  <Button onClick={() => addNode('set_variable')}>+ Variable</Button>
  <Button type="primary" icon={<SaveOutlined />} onClick={() => saveMutation.mutate()} loading={saveMutation.isPending} style={{ background: '#25D366' }}>
    Guardar
  </Button>
</Space>

Commit: feat(fase4): add advanced node components to FlowBuilder


Task 11: Frontend - Global Variables Page

Files:

  • Create: frontend/src/pages/GlobalVariables.tsx

Code:

import { useState } from 'react';
import {
  Card, Table, Button, Modal, Form, Input, Select, Switch, Space, Tag, Popconfirm, message, Typography,
} from 'antd';
import { PlusOutlined, EditOutlined, DeleteOutlined, LockOutlined } from '@ant-design/icons';
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';
import { apiClient } from '../api/client';

const { Title } = Typography;

interface GlobalVariable {
  id: string;
  key: string;
  value: string | null;
  value_type: string;
  description: string | null;
  is_secret: boolean;
}

export default function GlobalVariables() {
  const [isModalOpen, setIsModalOpen] = useState(false);
  const [selectedVar, setSelectedVar] = useState<GlobalVariable | null>(null);
  const [form] = Form.useForm();
  const queryClient = useQueryClient();

  const { data: variables, isLoading } = useQuery({
    queryKey: ['global-variables'],
    queryFn: () => apiClient.get<GlobalVariable[]>('/api/global-variables'),
  });

  const createMutation = useMutation({
    mutationFn: (data: Partial<GlobalVariable>) => apiClient.post('/api/global-variables', data),
    onSuccess: () => {
      message.success('Variable creada');
      queryClient.invalidateQueries({ queryKey: ['global-variables'] });
      closeModal();
    },
  });

  const updateMutation = useMutation({
    mutationFn: ({ id, data }: { id: string; data: Partial<GlobalVariable> }) =>
      apiClient.put(`/api/global-variables/${id}`, data),
    onSuccess: () => {
      message.success('Variable actualizada');
      queryClient.invalidateQueries({ queryKey: ['global-variables'] });
      closeModal();
    },
  });

  const deleteMutation = useMutation({
    mutationFn: (id: string) => apiClient.delete(`/api/global-variables/${id}`),
    onSuccess: () => {
      message.success('Variable eliminada');
      queryClient.invalidateQueries({ queryKey: ['global-variables'] });
    },
  });

  const closeModal = () => {
    setIsModalOpen(false);
    setSelectedVar(null);
    form.resetFields();
  };

  const handleSubmit = (values: Partial<GlobalVariable>) => {
    if (selectedVar) {
      updateMutation.mutate({ id: selectedVar.id, data: values });
    } else {
      createMutation.mutate(values);
    }
  };

  const handleEdit = (variable: GlobalVariable) => {
    setSelectedVar(variable);
    form.setFieldsValue(variable);
    setIsModalOpen(true);
  };

  const columns = [
    {
      title: 'Clave',
      dataIndex: 'key',
      key: 'key',
      render: (key: string, record: GlobalVariable) => (
        <Space>
          <code>{'{{global.' + key + '}}'}</code>
          {record.is_secret && <LockOutlined style={{ color: '#faad14' }} />}
        </Space>
      ),
    },
    {
      title: 'Valor',
      dataIndex: 'value',
      key: 'value',
      render: (value: string, record: GlobalVariable) =>
        record.is_secret ? <Tag>Secreto</Tag> : value,
    },
    {
      title: 'Tipo',
      dataIndex: 'value_type',
      key: 'value_type',
      render: (type: string) => <Tag>{type}</Tag>,
    },
    {
      title: 'Descripción',
      dataIndex: 'description',
      key: 'description',
    },
    {
      title: 'Acciones',
      key: 'actions',
      render: (_: unknown, record: GlobalVariable) => (
        <Space>
          <Button size="small" icon={<EditOutlined />} onClick={() => handleEdit(record)} />
          <Popconfirm title="¿Eliminar variable?" onConfirm={() => deleteMutation.mutate(record.id)}>
            <Button size="small" danger icon={<DeleteOutlined />} />
          </Popconfirm>
        </Space>
      ),
    },
  ];

  return (
    <div>
      <div style={{ display: 'flex', justifyContent: 'space-between', marginBottom: 16 }}>
        <Title level={4} style={{ margin: 0 }}>Variables Globales</Title>
        <Button type="primary" icon={<PlusOutlined />} onClick={() => setIsModalOpen(true)}>
          Nueva Variable
        </Button>
      </div>

      <Card>
        <Table dataSource={variables} columns={columns} rowKey="id" loading={isLoading} pagination={false} />
      </Card>

      <Modal
        title={selectedVar ? 'Editar Variable' : 'Nueva Variable'}
        open={isModalOpen}
        onCancel={closeModal}
        footer={null}
      >
        <Form form={form} layout="vertical" onFinish={handleSubmit}>
          <Form.Item name="key" label="Clave" rules={[{ required: true }]}>
            <Input placeholder="api_key" disabled={!!selectedVar} />
          </Form.Item>
          <Form.Item name="value" label="Valor">
            <Input.TextArea rows={2} />
          </Form.Item>
          <Form.Item name="value_type" label="Tipo" initialValue="string">
            <Select>
              <Select.Option value="string">String</Select.Option>
              <Select.Option value="number">Number</Select.Option>
              <Select.Option value="boolean">Boolean</Select.Option>
              <Select.Option value="json">JSON</Select.Option>
            </Select>
          </Form.Item>
          <Form.Item name="description" label="Descripción">
            <Input />
          </Form.Item>
          <Form.Item name="is_secret" label="¿Es secreto?" valuePropName="checked">
            <Switch />
          </Form.Item>
          <Form.Item>
            <Space>
              <Button type="primary" htmlType="submit" loading={createMutation.isPending || updateMutation.isPending}>
                {selectedVar ? 'Actualizar' : 'Crear'}
              </Button>
              <Button onClick={closeModal}>Cancelar</Button>
            </Space>
          </Form.Item>
        </Form>
      </Modal>
    </div>
  );
}

Commit: feat(fase4): add GlobalVariables frontend page


Task 12: Frontend - Flow Templates Page

Files:

  • Create: frontend/src/pages/FlowTemplates.tsx

Code:

import { useState } from 'react';
import {
  Card, Row, Col, Button, Modal, Input, Typography, Tag, Empty, message,
} from 'antd';
import { PlusOutlined, CopyOutlined } from '@ant-design/icons';
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';
import { useNavigate } from 'react-router-dom';
import { apiClient } from '../api/client';

const { Title, Text, Paragraph } = Typography;

interface FlowTemplate {
  id: string;
  name: string;
  description: string | null;
  category: string;
  nodes: any[];
}

export default function FlowTemplates() {
  const [isCreateModalOpen, setIsCreateModalOpen] = useState(false);
  const [selectedTemplate, setSelectedTemplate] = useState<FlowTemplate | null>(null);
  const [flowName, setFlowName] = useState('');
  const navigate = useNavigate();
  const queryClient = useQueryClient();

  const { data: templates, isLoading } = useQuery({
    queryKey: ['flow-templates'],
    queryFn: () => apiClient.get<FlowTemplate[]>('/api/flow-templates'),
  });

  const useTemplateMutation = useMutation({
    mutationFn: async ({ templateId, name }: { templateId: string; name: string }) => {
      const response = await apiClient.post<{ flow_id: string }>(`/api/flow-templates/${templateId}/use?flow_name=${encodeURIComponent(name)}`, {});
      return response;
    },
    onSuccess: (data) => {
      message.success('Flujo creado desde plantilla');
      setIsCreateModalOpen(false);
      setSelectedTemplate(null);
      setFlowName('');
      navigate(`/flows/${data.flow_id}`);
    },
  });

  const handleUseTemplate = (template: FlowTemplate) => {
    setSelectedTemplate(template);
    setFlowName(`${template.name} - Copia`);
    setIsCreateModalOpen(true);
  };

  const categoryColors: Record<string, string> = {
    general: 'blue',
    sales: 'green',
    support: 'orange',
    marketing: 'purple',
  };

  return (
    <div>
      <div style={{ display: 'flex', justifyContent: 'space-between', marginBottom: 24 }}>
        <Title level={4} style={{ margin: 0 }}>Plantillas de Flujos</Title>
      </div>

      {isLoading ? (
        <Card loading />
      ) : templates?.length === 0 ? (
        <Empty description="No hay plantillas disponibles" />
      ) : (
        <Row gutter={[16, 16]}>
          {templates?.map((template) => (
            <Col key={template.id} xs={24} sm={12} md={8} lg={6}>
              <Card
                hoverable
                actions={[
                  <Button type="link" icon={<CopyOutlined />} onClick={() => handleUseTemplate(template)}>
                    Usar
                  </Button>,
                ]}
              >
                <Card.Meta
                  title={template.name}
                  description={
                    <>
                      <Tag color={categoryColors[template.category] || 'default'}>{template.category}</Tag>
                      <Paragraph ellipsis={{ rows: 2 }} style={{ marginTop: 8, marginBottom: 0 }}>
                        {template.description || 'Sin descripción'}
                      </Paragraph>
                      <Text type="secondary" style={{ fontSize: 12 }}>
                        {template.nodes.length} nodos
                      </Text>
                    </>
                  }
                />
              </Card>
            </Col>
          ))}
        </Row>
      )}

      <Modal
        title="Crear flujo desde plantilla"
        open={isCreateModalOpen}
        onCancel={() => { setIsCreateModalOpen(false); setSelectedTemplate(null); }}
        onOk={() => {
          if (selectedTemplate && flowName) {
            useTemplateMutation.mutate({ templateId: selectedTemplate.id, name: flowName });
          }
        }}
        okText="Crear"
        confirmLoading={useTemplateMutation.isPending}
      >
        <div style={{ marginBottom: 16 }}>
          <Text strong>Plantilla: </Text>
          <Text>{selectedTemplate?.name}</Text>
        </div>
        <Input
          placeholder="Nombre del nuevo flujo"
          value={flowName}
          onChange={(e) => setFlowName(e.target.value)}
        />
      </Modal>
    </div>
  );
}

Commit: feat(fase4): add FlowTemplates frontend page


Task 13: Update MainLayout with New Routes

Files:

  • Modify: frontend/src/layouts/MainLayout.tsx

Add imports:

import { GlobalOutlined, AppstoreOutlined } from '@ant-design/icons';
import GlobalVariables from '../pages/GlobalVariables';
import FlowTemplates from '../pages/FlowTemplates';

Add menu items (after Flujos):

    {
      key: '/templates',
      icon: <AppstoreOutlined />,
      label: 'Plantillas',
    },
    {
      key: '/variables',
      icon: <GlobalOutlined />,
      label: 'Variables',
    },

Add routes:

            <Route path="/templates" element={<FlowTemplates />} />
            <Route path="/variables" element={<GlobalVariables />} />

Commit: feat(fase4): add Templates and Variables routes to MainLayout


Task 14: Update Docker Compose for OpenAI

Files:

  • Modify: docker-compose.yml

Add environment variable to flow-engine service:

  flow-engine:
    build: ./services/flow-engine
    environment:
      DATABASE_URL: postgresql://${DB_USER}:${DB_PASSWORD}@postgres:5432/whatsapp_central
      REDIS_URL: redis://redis:6379
      API_GATEWAY_URL: http://api-gateway:8000
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      OPENAI_MODEL: ${OPENAI_MODEL:-gpt-3.5-turbo}

Add to .env.example:

OPENAI_API_KEY=sk-your-api-key
OPENAI_MODEL=gpt-3.5-turbo

Commit: feat(fase4): add OpenAI configuration to Docker Compose


Task 15: Final Integration Commit

Final verification and commit marking Fase 4 complete.

Commit: feat(fase4): complete Flow Engine Avanzado phase


Summary

This plan implements:

  1. Node Architecture: Modular NodeExecutor system with registry
  2. Basic Nodes: Refactored trigger, message, buttons, wait_input, set_variable, condition
  3. Advanced Nodes: switch, delay, random, loop, goto
  4. Validation Nodes: email, phone, number, date, regex, options
  5. Script Nodes: JavaScript (Python eval), HTTP request
  6. AI Nodes: AI response (OpenAI), sentiment analysis
  7. Global Variables: Database model, API, frontend management
  8. Flow Templates: Reusable templates system
  9. Frontend: New node components, GlobalVariables page, FlowTemplates page

Key Features:

  • A/B testing with random node (weighted distribution)
  • AI-powered responses using OpenAI
  • HTTP requests for external API integration
  • Input validation with multiple types
  • Reusable flow templates
  • Global variables for shared configuration