# Fase 4: Flow Engine Avanzado - Implementation Plan > **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. **Goal:** Extender el Flow Engine con nodos avanzados (switch, loop, random, sub-flow, delay), validaciones, templates reutilizables, variables globales, ejecución de JavaScript y nodo de IA. **Architecture:** Extender FlowEngine._execute_node() con nuevos tipos de nodos. Crear sistema de NodeExecutor modular para separar lógica de cada nodo. Agregar modelo FlowTemplate para templates reutilizables y GlobalVariable para variables globales. Frontend: nuevos componentes de nodo en FlowBuilder. **Tech Stack:** Python/FastAPI (backend), PostgreSQL (DB), React/TypeScript/React Flow (frontend), OpenAI API (AI node) --- ## Task 1: Node Executor Architecture **Files:** - Create: `services/flow-engine/app/nodes/__init__.py` - Create: `services/flow-engine/app/nodes/base.py` - Create: `services/flow-engine/app/nodes/basic.py` **Code for base.py:** ```python from abc import ABC, abstractmethod from typing import Optional, Any from app.context import FlowContext class NodeExecutor(ABC): """Base class for all node executors""" @abstractmethod async def execute( self, config: dict, context: FlowContext, session: Any ) -> Optional[str]: """ Execute the node logic. Returns: branch name for routing (e.g., "default", "true", "false") or "wait" to pause execution """ pass class NodeRegistry: """Registry of all available node executors""" _executors: dict[str, NodeExecutor] = {} @classmethod def register(cls, node_type: str, executor: NodeExecutor): cls._executors[node_type] = executor @classmethod def get(cls, node_type: str) -> Optional[NodeExecutor]: return cls._executors.get(node_type) @classmethod def execute(cls, node_type: str, config: dict, context: FlowContext, session: Any) -> Optional[str]: executor = cls.get(node_type) if executor: return executor.execute(config, context, session) return "default" ``` **Code for basic.py:** ```python from typing import Optional, Any from app.nodes.base import NodeExecutor, NodeRegistry from app.context import FlowContext class TriggerExecutor(NodeExecutor): async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: return "default" class MessageExecutor(NodeExecutor): def __init__(self, send_message_fn): self.send_message = send_message_fn async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: text = context.interpolate(config.get("text", "")) await self.send_message(session.conversation_id, text) return "default" class ButtonsExecutor(NodeExecutor): def __init__(self, send_message_fn): self.send_message = send_message_fn async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: text = context.interpolate(config.get("text", "")) buttons = config.get("buttons", []) button_text = "\n".join([f"• {b['label']}" for b in buttons]) await self.send_message(session.conversation_id, f"{text}\n\n{button_text}") return "wait" class WaitInputExecutor(NodeExecutor): async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: variable = config.get("variable", "user_input") context.set(variable, context.message.get("content", "")) return "default" class SetVariableExecutor(NodeExecutor): async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: var_name = config.get("variable", "") var_value = context.interpolate(config.get("value", "")) context.set(var_name, var_value) return "default" class ConditionExecutor(NodeExecutor): async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: conditions = config.get("conditions", []) for cond in conditions: field = cond.get("field", "") operator = cond.get("operator", "equals") value = cond.get("value", "") branch = cond.get("branch", "default") actual = context.get(field) or context.message.get("content", "") if operator == "equals" and str(actual).lower() == str(value).lower(): return branch elif operator == "contains" and str(value).lower() in str(actual).lower(): return branch elif operator == "starts_with" and str(actual).lower().startswith(str(value).lower()): return branch elif operator == "not_equals" and str(actual).lower() != str(value).lower(): return branch elif operator == "greater_than": try: if float(actual) > float(value): return branch except ValueError: pass elif operator == "less_than": try: if float(actual) < float(value): return branch except ValueError: pass return "default" ``` **Code for __init__.py:** ```python from app.nodes.base import NodeExecutor, NodeRegistry from app.nodes.basic import ( TriggerExecutor, MessageExecutor, ButtonsExecutor, WaitInputExecutor, SetVariableExecutor, ConditionExecutor ) ``` **Commit:** `feat(fase4): add NodeExecutor architecture with basic nodes` --- ## Task 2: Advanced Nodes - Switch, Delay, Random **Files:** - Create: `services/flow-engine/app/nodes/advanced.py` - Modify: `services/flow-engine/app/nodes/__init__.py` **Code for advanced.py:** ```python import asyncio import random as random_module from typing import Optional, Any from app.nodes.base import NodeExecutor from app.context import FlowContext class SwitchExecutor(NodeExecutor): """Multi-branch switch based on variable value""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: variable = config.get("variable", "") cases = config.get("cases", []) # [{value: "x", branch: "case_x"}, ...] actual = context.get(variable) or context.message.get("content", "") actual_str = str(actual).lower().strip() for case in cases: case_value = str(case.get("value", "")).lower().strip() if actual_str == case_value: return case.get("branch", "default") return config.get("default_branch", "default") class DelayExecutor(NodeExecutor): """Wait for a specified duration before continuing""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: delay_seconds = config.get("seconds", 0) delay_type = config.get("type", "fixed") # fixed or random if delay_type == "random": min_delay = config.get("min_seconds", 1) max_delay = config.get("max_seconds", 5) delay_seconds = random_module.uniform(min_delay, max_delay) if delay_seconds > 0: await asyncio.sleep(min(delay_seconds, 30)) # Max 30 seconds return "default" class RandomExecutor(NodeExecutor): """Random branch selection for A/B testing""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: branches = config.get("branches", []) # [{branch: "a", weight: 50}, {branch: "b", weight: 50}] if not branches: return "default" total_weight = sum(b.get("weight", 1) for b in branches) rand_value = random_module.uniform(0, total_weight) cumulative = 0 for branch in branches: cumulative += branch.get("weight", 1) if rand_value <= cumulative: # Track A/B test result test_name = config.get("test_name", "ab_test") context.set(f"_ab_{test_name}", branch.get("branch", "default")) return branch.get("branch", "default") return branches[-1].get("branch", "default") if branches else "default" class LoopExecutor(NodeExecutor): """Loop a certain number of times or until condition""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: loop_var = config.get("counter_variable", "_loop_counter") max_iterations = config.get("max_iterations", 10) current = int(context.get(loop_var) or 0) if current < max_iterations: context.set(loop_var, current + 1) return "continue" # Continue loop else: context.set(loop_var, 0) # Reset counter return "done" # Exit loop class GoToExecutor(NodeExecutor): """Jump to another node or flow""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: target_node_id = config.get("target_node_id") target_flow_id = config.get("target_flow_id") if target_flow_id: # Store for sub-flow execution context.set("_goto_flow_id", target_flow_id) return "sub_flow" if target_node_id: context.set("_goto_node_id", target_node_id) return "goto" return "default" ``` **Update __init__.py:** ```python from app.nodes.base import NodeExecutor, NodeRegistry from app.nodes.basic import ( TriggerExecutor, MessageExecutor, ButtonsExecutor, WaitInputExecutor, SetVariableExecutor, ConditionExecutor ) from app.nodes.advanced import ( SwitchExecutor, DelayExecutor, RandomExecutor, LoopExecutor, GoToExecutor ) ``` **Commit:** `feat(fase4): add advanced nodes - switch, delay, random, loop, goto` --- ## Task 3: Validation Nodes **Files:** - Create: `services/flow-engine/app/nodes/validation.py` - Modify: `services/flow-engine/app/nodes/__init__.py` **Code for validation.py:** ```python import re from typing import Optional, Any from app.nodes.base import NodeExecutor from app.context import FlowContext class ValidateEmailExecutor(NodeExecutor): """Validate email format""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: variable = config.get("variable", "") value = context.get(variable) or context.message.get("content", "") email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if re.match(email_pattern, str(value).strip()): return "valid" return "invalid" class ValidatePhoneExecutor(NodeExecutor): """Validate phone number format""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: variable = config.get("variable", "") value = context.get(variable) or context.message.get("content", "") # Remove common separators cleaned = re.sub(r'[\s\-\(\)\+]', '', str(value)) # Check if it's mostly digits and reasonable length if cleaned.isdigit() and 8 <= len(cleaned) <= 15: return "valid" return "invalid" class ValidateNumberExecutor(NodeExecutor): """Validate numeric value within range""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: variable = config.get("variable", "") value = context.get(variable) or context.message.get("content", "") min_val = config.get("min") max_val = config.get("max") try: num = float(str(value).strip()) if min_val is not None and num < float(min_val): return "invalid" if max_val is not None and num > float(max_val): return "invalid" return "valid" except ValueError: return "invalid" class ValidateDateExecutor(NodeExecutor): """Validate date format""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: variable = config.get("variable", "") value = context.get(variable) or context.message.get("content", "") date_format = config.get("format", "%Y-%m-%d") from datetime import datetime try: datetime.strptime(str(value).strip(), date_format) return "valid" except ValueError: return "invalid" class ValidateRegexExecutor(NodeExecutor): """Validate against custom regex pattern""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: variable = config.get("variable", "") value = context.get(variable) or context.message.get("content", "") pattern = config.get("pattern", ".*") try: if re.match(pattern, str(value).strip()): return "valid" return "invalid" except re.error: return "invalid" class ValidateOptionsExecutor(NodeExecutor): """Validate against list of allowed options""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: variable = config.get("variable", "") value = context.get(variable) or context.message.get("content", "") options = config.get("options", []) case_sensitive = config.get("case_sensitive", False) value_str = str(value).strip() if not case_sensitive: value_str = value_str.lower() options = [str(o).lower() for o in options] if value_str in options: return "valid" return "invalid" ``` **Update __init__.py:** ```python from app.nodes.validation import ( ValidateEmailExecutor, ValidatePhoneExecutor, ValidateNumberExecutor, ValidateDateExecutor, ValidateRegexExecutor, ValidateOptionsExecutor ) ``` **Commit:** `feat(fase4): add validation nodes for email, phone, number, date, regex, options` --- ## Task 4: JavaScript Node **Files:** - Create: `services/flow-engine/app/nodes/script.py` - Modify: `services/flow-engine/app/nodes/__init__.py` **Code for script.py:** ```python import json from typing import Optional, Any from app.nodes.base import NodeExecutor from app.context import FlowContext class JavaScriptExecutor(NodeExecutor): """Execute JavaScript code (using Python eval with restricted globals)""" ALLOWED_BUILTINS = { 'abs': abs, 'all': all, 'any': any, 'bool': bool, 'dict': dict, 'float': float, 'int': int, 'len': len, 'list': list, 'max': max, 'min': min, 'round': round, 'str': str, 'sum': sum, 'sorted': sorted, } async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: code = config.get("code", "") output_variable = config.get("output_variable", "_result") if not code: return "default" # Build safe execution context exec_globals = { '__builtins__': self.ALLOWED_BUILTINS, 'context': { 'contact': context.contact, 'conversation': context.conversation, 'message': context.message, 'variables': context.variables.copy(), }, 'variables': context.variables.copy(), } try: # Simple expression evaluation (not full JS, but Python expressions) # For safety, we use a restricted eval result = eval(code, exec_globals, {}) if result is not None: context.set(output_variable, result) return "success" except Exception as e: context.set("_script_error", str(e)) return "error" class HttpRequestExecutor(NodeExecutor): """Make HTTP requests to external APIs""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: import httpx url = context.interpolate(config.get("url", "")) method = config.get("method", "GET").upper() headers = config.get("headers", {}) body = config.get("body") output_variable = config.get("output_variable", "_http_response") timeout = config.get("timeout", 10) if not url: return "error" # Interpolate headers and body headers = {k: context.interpolate(str(v)) for k, v in headers.items()} if body and isinstance(body, str): body = context.interpolate(body) try: async with httpx.AsyncClient() as client: response = await client.request( method=method, url=url, headers=headers, json=json.loads(body) if body and isinstance(body, str) else body, timeout=timeout ) context.set(output_variable, { "status": response.status_code, "body": response.text, "json": response.json() if response.headers.get("content-type", "").startswith("application/json") else None }) if 200 <= response.status_code < 300: return "success" return "error" except Exception as e: context.set("_http_error", str(e)) return "error" ``` **Update __init__.py** to add: ```python from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor ``` **Commit:** `feat(fase4): add JavaScript and HTTP request nodes` --- ## Task 5: AI Response Node **Files:** - Create: `services/flow-engine/app/nodes/ai.py` - Modify: `services/flow-engine/app/config.py` - Modify: `services/flow-engine/app/nodes/__init__.py` **Add to config.py:** ```python # OpenAI OPENAI_API_KEY: str = "" OPENAI_MODEL: str = "gpt-3.5-turbo" ``` **Code for ai.py:** ```python from typing import Optional, Any from app.nodes.base import NodeExecutor from app.context import FlowContext from app.config import get_settings settings = get_settings() class AIResponseExecutor(NodeExecutor): """Generate AI response using OpenAI""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: import httpx prompt = context.interpolate(config.get("prompt", "")) system_prompt = config.get("system_prompt", "Eres un asistente útil.") output_variable = config.get("output_variable", "_ai_response") max_tokens = config.get("max_tokens", 500) temperature = config.get("temperature", 0.7) if not settings.OPENAI_API_KEY: context.set("_ai_error", "OpenAI API key not configured") return "error" if not prompt: return "error" messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": prompt} ] # Include conversation history if configured if config.get("include_history", False): history = context.get("_conversation_history") or [] messages = [{"role": "system", "content": system_prompt}] + history + [{"role": "user", "content": prompt}] try: async with httpx.AsyncClient() as client: response = await client.post( "https://api.openai.com/v1/chat/completions", headers={ "Authorization": f"Bearer {settings.OPENAI_API_KEY}", "Content-Type": "application/json" }, json={ "model": settings.OPENAI_MODEL, "messages": messages, "max_tokens": max_tokens, "temperature": temperature }, timeout=30 ) if response.status_code == 200: data = response.json() ai_response = data["choices"][0]["message"]["content"] context.set(output_variable, ai_response) return "success" else: context.set("_ai_error", response.text) return "error" except Exception as e: context.set("_ai_error", str(e)) return "error" class AISentimentExecutor(NodeExecutor): """Analyze sentiment of user message""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: import httpx text = context.get(config.get("variable", "")) or context.message.get("content", "") output_variable = config.get("output_variable", "_sentiment") if not settings.OPENAI_API_KEY or not text: return "neutral" try: async with httpx.AsyncClient() as client: response = await client.post( "https://api.openai.com/v1/chat/completions", headers={ "Authorization": f"Bearer {settings.OPENAI_API_KEY}", "Content-Type": "application/json" }, json={ "model": "gpt-3.5-turbo", "messages": [ {"role": "system", "content": "Analyze the sentiment. Reply with only one word: positive, negative, or neutral"}, {"role": "user", "content": text} ], "max_tokens": 10, "temperature": 0 }, timeout=15 ) if response.status_code == 200: data = response.json() sentiment = data["choices"][0]["message"]["content"].lower().strip() context.set(output_variable, sentiment) if "positive" in sentiment: return "positive" elif "negative" in sentiment: return "negative" return "neutral" except Exception: pass return "neutral" ``` **Update __init__.py:** ```python from app.nodes.ai import AIResponseExecutor, AISentimentExecutor ``` **Commit:** `feat(fase4): add AI response and sentiment analysis nodes` --- ## Task 6: Global Variables Model **Files:** - Create: `services/api-gateway/app/models/global_variable.py` - Modify: `services/api-gateway/app/models/__init__.py` **Code for global_variable.py:** ```python import uuid from datetime import datetime from sqlalchemy import Column, String, Text, DateTime, Boolean from sqlalchemy.dialects.postgresql import UUID, JSONB from app.core.database import Base class GlobalVariable(Base): __tablename__ = "global_variables" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) key = Column(String(100), nullable=False, unique=True, index=True) value = Column(Text, nullable=True) value_type = Column(String(20), default="string", nullable=False) # string, number, boolean, json description = Column(String(500), nullable=True) is_secret = Column(Boolean, default=False, nullable=False) # Hide value in UI created_at = Column(DateTime, default=datetime.utcnow, nullable=False) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) ``` **Update models/__init__.py:** ```python from app.models.global_variable import GlobalVariable ``` **Commit:** `feat(fase4): add GlobalVariable database model` --- ## Task 7: Global Variables API **Files:** - Create: `services/api-gateway/app/schemas/global_variable.py` - Create: `services/api-gateway/app/routers/global_variables.py` - Modify: `services/api-gateway/app/main.py` **Code for schemas/global_variable.py:** ```python from pydantic import BaseModel from typing import Optional from uuid import UUID from datetime import datetime class GlobalVariableCreate(BaseModel): key: str value: Optional[str] = None value_type: str = "string" description: Optional[str] = None is_secret: bool = False class GlobalVariableUpdate(BaseModel): value: Optional[str] = None value_type: Optional[str] = None description: Optional[str] = None is_secret: Optional[bool] = None class GlobalVariableResponse(BaseModel): id: UUID key: str value: Optional[str] value_type: str description: Optional[str] is_secret: bool created_at: datetime updated_at: datetime class Config: from_attributes = True ``` **Code for routers/global_variables.py:** ```python from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from typing import List from uuid import UUID from app.core.database import get_db from app.core.security import get_current_user from app.models.user import User, UserRole from app.models.global_variable import GlobalVariable from app.schemas.global_variable import ( GlobalVariableCreate, GlobalVariableUpdate, GlobalVariableResponse ) router = APIRouter(prefix="/api/global-variables", tags=["global-variables"]) def require_admin(current_user: User = Depends(get_current_user)): if current_user.role != UserRole.ADMIN: raise HTTPException(status_code=403, detail="Admin required") return current_user @router.get("", response_model=List[GlobalVariableResponse]) def list_global_variables( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): variables = db.query(GlobalVariable).all() # Mask secret values for var in variables: if var.is_secret: var.value = "********" return variables @router.post("", response_model=GlobalVariableResponse) def create_global_variable( request: GlobalVariableCreate, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): existing = db.query(GlobalVariable).filter(GlobalVariable.key == request.key).first() if existing: raise HTTPException(status_code=400, detail="Variable key already exists") variable = GlobalVariable(**request.model_dump()) db.add(variable) db.commit() db.refresh(variable) return variable @router.put("/{variable_id}", response_model=GlobalVariableResponse) def update_global_variable( variable_id: UUID, request: GlobalVariableUpdate, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): variable = db.query(GlobalVariable).filter(GlobalVariable.id == variable_id).first() if not variable: raise HTTPException(status_code=404, detail="Variable not found") for key, value in request.model_dump(exclude_unset=True).items(): setattr(variable, key, value) db.commit() db.refresh(variable) return variable @router.delete("/{variable_id}") def delete_global_variable( variable_id: UUID, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): variable = db.query(GlobalVariable).filter(GlobalVariable.id == variable_id).first() if not variable: raise HTTPException(status_code=404, detail="Variable not found") db.delete(variable) db.commit() return {"success": True} @router.get("/by-key/{key}") def get_variable_by_key( key: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): variable = db.query(GlobalVariable).filter(GlobalVariable.key == key).first() if not variable: raise HTTPException(status_code=404, detail="Variable not found") return {"key": variable.key, "value": variable.value if not variable.is_secret else None} ``` **Add to main.py:** ```python from app.routers import global_variables app.include_router(global_variables.router) ``` **Commit:** `feat(fase4): add GlobalVariable API routes` --- ## Task 8: Flow Templates Model and API **Files:** - Create: `services/api-gateway/app/models/flow_template.py` - Create: `services/api-gateway/app/schemas/flow_template.py` - Create: `services/api-gateway/app/routers/flow_templates.py` - Modify: `services/api-gateway/app/models/__init__.py` - Modify: `services/api-gateway/app/main.py` **Code for flow_template.py:** ```python import uuid from datetime import datetime from sqlalchemy import Column, String, Text, DateTime from sqlalchemy.dialects.postgresql import UUID, JSONB from app.core.database import Base class FlowTemplate(Base): __tablename__ = "flow_templates" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) name = Column(String(100), nullable=False) description = Column(Text, nullable=True) category = Column(String(50), default="general", nullable=False) nodes = Column(JSONB, default=list) edges = Column(JSONB, default=list) variables = Column(JSONB, default=dict) preview_image = Column(String(500), nullable=True) is_public = Column(Boolean, default=True, nullable=False) created_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False) created_at = Column(DateTime, default=datetime.utcnow, nullable=False) ``` Add missing imports to flow_template.py: ```python from sqlalchemy import Column, String, Text, DateTime, Boolean, ForeignKey ``` **Code for schemas/flow_template.py:** ```python from pydantic import BaseModel from typing import Optional, List from uuid import UUID from datetime import datetime class FlowTemplateCreate(BaseModel): name: str description: Optional[str] = None category: str = "general" nodes: List[dict] = [] edges: List[dict] = [] variables: dict = {} is_public: bool = True class FlowTemplateResponse(BaseModel): id: UUID name: str description: Optional[str] category: str nodes: List[dict] edges: List[dict] variables: dict preview_image: Optional[str] is_public: bool created_by: UUID created_at: datetime class Config: from_attributes = True ``` **Code for routers/flow_templates.py:** ```python from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from typing import List from uuid import UUID from app.core.database import get_db from app.core.security import get_current_user from app.models.user import User from app.models.flow_template import FlowTemplate from app.models.flow import Flow from app.schemas.flow_template import FlowTemplateCreate, FlowTemplateResponse router = APIRouter(prefix="/api/flow-templates", tags=["flow-templates"]) @router.get("", response_model=List[FlowTemplateResponse]) def list_templates( category: str = None, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): query = db.query(FlowTemplate).filter(FlowTemplate.is_public == True) if category: query = query.filter(FlowTemplate.category == category) return query.all() @router.post("", response_model=FlowTemplateResponse) def create_template( request: FlowTemplateCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): template = FlowTemplate( **request.model_dump(), created_by=current_user.id, ) db.add(template) db.commit() db.refresh(template) return template @router.post("/{template_id}/use") def use_template( template_id: UUID, flow_name: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Create a new flow from template""" template = db.query(FlowTemplate).filter(FlowTemplate.id == template_id).first() if not template: raise HTTPException(status_code=404, detail="Template not found") from app.models.flow import Flow, TriggerType flow = Flow( name=flow_name, description=f"Created from template: {template.name}", trigger_type=TriggerType.KEYWORD, nodes=template.nodes, edges=template.edges, variables=template.variables, is_active=False, ) db.add(flow) db.commit() db.refresh(flow) return {"success": True, "flow_id": str(flow.id)} @router.delete("/{template_id}") def delete_template( template_id: UUID, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): template = db.query(FlowTemplate).filter(FlowTemplate.id == template_id).first() if not template: raise HTTPException(status_code=404, detail="Template not found") if template.created_by != current_user.id: raise HTTPException(status_code=403, detail="Not authorized") db.delete(template) db.commit() return {"success": True} ``` **Update models/__init__.py:** ```python from app.models.flow_template import FlowTemplate ``` **Add to main.py:** ```python from app.routers import flow_templates app.include_router(flow_templates.router) ``` **Commit:** `feat(fase4): add FlowTemplate model and API` --- ## Task 9: Refactor Engine to Use NodeRegistry **Files:** - Modify: `services/flow-engine/app/engine.py` **Replace _execute_node method and add initialization:** ```python from typing import Optional import httpx from app.config import get_settings from app.context import FlowContext from app.nodes import NodeRegistry from app.nodes.basic import ( TriggerExecutor, MessageExecutor, ButtonsExecutor, WaitInputExecutor, SetVariableExecutor, ConditionExecutor ) from app.nodes.advanced import ( SwitchExecutor, DelayExecutor, RandomExecutor, LoopExecutor, GoToExecutor ) from app.nodes.validation import ( ValidateEmailExecutor, ValidatePhoneExecutor, ValidateNumberExecutor, ValidateDateExecutor, ValidateRegexExecutor, ValidateOptionsExecutor ) from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor from app.nodes.ai import AIResponseExecutor, AISentimentExecutor settings = get_settings() class FlowEngine: """Executes flow nodes based on incoming messages""" def __init__(self, db_session): self.db = db_session self._register_nodes() def _register_nodes(self): """Register all available node executors""" # Basic nodes NodeRegistry.register("trigger", TriggerExecutor()) NodeRegistry.register("message", MessageExecutor(self._send_message)) NodeRegistry.register("buttons", ButtonsExecutor(self._send_message)) NodeRegistry.register("wait_input", WaitInputExecutor()) NodeRegistry.register("set_variable", SetVariableExecutor()) NodeRegistry.register("condition", ConditionExecutor()) # Advanced nodes NodeRegistry.register("switch", SwitchExecutor()) NodeRegistry.register("delay", DelayExecutor()) NodeRegistry.register("random", RandomExecutor()) NodeRegistry.register("loop", LoopExecutor()) NodeRegistry.register("goto", GoToExecutor()) # Validation nodes NodeRegistry.register("validate_email", ValidateEmailExecutor()) NodeRegistry.register("validate_phone", ValidatePhoneExecutor()) NodeRegistry.register("validate_number", ValidateNumberExecutor()) NodeRegistry.register("validate_date", ValidateDateExecutor()) NodeRegistry.register("validate_regex", ValidateRegexExecutor()) NodeRegistry.register("validate_options", ValidateOptionsExecutor()) # Script nodes NodeRegistry.register("javascript", JavaScriptExecutor()) NodeRegistry.register("http_request", HttpRequestExecutor()) # AI nodes NodeRegistry.register("ai_response", AIResponseExecutor()) NodeRegistry.register("ai_sentiment", AISentimentExecutor()) async def _execute_node( self, node_type: str, config: dict, context: FlowContext, session ) -> Optional[str]: """Execute a single node using NodeRegistry""" executor = NodeRegistry.get(node_type) if executor: result = await executor.execute(config, context, session) # Handle special results if result == "goto" and context.get("_goto_node_id"): session.current_node_id = context.get("_goto_node_id") context.set("_goto_node_id", None) return result return "default" # ... rest of the class remains the same (process_message, _find_matching_flow, etc.) ``` **Commit:** `feat(fase4): refactor FlowEngine to use NodeRegistry` --- ## Task 10: Frontend - Advanced Node Components **Files:** - Modify: `frontend/src/pages/FlowBuilder.tsx` **Add new node components and update nodeTypes:** ```tsx // Add these new node components after existing ones const SwitchNode = ({ data }: { data: any }) => (
🔀 Switch
{data.config?.variable || 'variable'}
); const DelayNode = ({ data }: { data: any }) => (
⏱️ Delay
{data.config?.seconds || 0}s
); const RandomNode = ({ data }: { data: any }) => (
🎲 Random
A/B Test
); const LoopNode = ({ data }: { data: any }) => (
🔄 Loop
max: {data.config?.max_iterations || 10}
); const ValidateNode = ({ data }: { data: any }) => (
✅ Validate
{data.config?.type || 'email'}
); const HttpNode = ({ data }: { data: any }) => (
🌐 HTTP
{data.config?.method || 'GET'}
); const AINode = ({ data }: { data: any }) => (
🤖 AI Response
); const SetVariableNode = ({ data }: { data: any }) => (
📝 Variable
{data.config?.variable || 'var'}
); // Update nodeTypes const nodeTypes: NodeTypes = { trigger: TriggerNode, message: MessageNode, condition: ConditionNode, wait_input: WaitInputNode, switch: SwitchNode, delay: DelayNode, random: RandomNode, loop: LoopNode, validate: ValidateNode, http_request: HttpNode, ai_response: AINode, set_variable: SetVariableNode, }; ``` **Update the toolbar to add new node buttons:** ```tsx ``` **Commit:** `feat(fase4): add advanced node components to FlowBuilder` --- ## Task 11: Frontend - Global Variables Page **Files:** - Create: `frontend/src/pages/GlobalVariables.tsx` **Code:** ```tsx import { useState } from 'react'; import { Card, Table, Button, Modal, Form, Input, Select, Switch, Space, Tag, Popconfirm, message, Typography, } from 'antd'; import { PlusOutlined, EditOutlined, DeleteOutlined, LockOutlined } from '@ant-design/icons'; import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query'; import { apiClient } from '../api/client'; const { Title } = Typography; interface GlobalVariable { id: string; key: string; value: string | null; value_type: string; description: string | null; is_secret: boolean; } export default function GlobalVariables() { const [isModalOpen, setIsModalOpen] = useState(false); const [selectedVar, setSelectedVar] = useState(null); const [form] = Form.useForm(); const queryClient = useQueryClient(); const { data: variables, isLoading } = useQuery({ queryKey: ['global-variables'], queryFn: () => apiClient.get('/api/global-variables'), }); const createMutation = useMutation({ mutationFn: (data: Partial) => apiClient.post('/api/global-variables', data), onSuccess: () => { message.success('Variable creada'); queryClient.invalidateQueries({ queryKey: ['global-variables'] }); closeModal(); }, }); const updateMutation = useMutation({ mutationFn: ({ id, data }: { id: string; data: Partial }) => apiClient.put(`/api/global-variables/${id}`, data), onSuccess: () => { message.success('Variable actualizada'); queryClient.invalidateQueries({ queryKey: ['global-variables'] }); closeModal(); }, }); const deleteMutation = useMutation({ mutationFn: (id: string) => apiClient.delete(`/api/global-variables/${id}`), onSuccess: () => { message.success('Variable eliminada'); queryClient.invalidateQueries({ queryKey: ['global-variables'] }); }, }); const closeModal = () => { setIsModalOpen(false); setSelectedVar(null); form.resetFields(); }; const handleSubmit = (values: Partial) => { if (selectedVar) { updateMutation.mutate({ id: selectedVar.id, data: values }); } else { createMutation.mutate(values); } }; const handleEdit = (variable: GlobalVariable) => { setSelectedVar(variable); form.setFieldsValue(variable); setIsModalOpen(true); }; const columns = [ { title: 'Clave', dataIndex: 'key', key: 'key', render: (key: string, record: GlobalVariable) => ( {'{{global.' + key + '}}'} {record.is_secret && } ), }, { title: 'Valor', dataIndex: 'value', key: 'value', render: (value: string, record: GlobalVariable) => record.is_secret ? Secreto : value, }, { title: 'Tipo', dataIndex: 'value_type', key: 'value_type', render: (type: string) => {type}, }, { title: 'Descripción', dataIndex: 'description', key: 'description', }, { title: 'Acciones', key: 'actions', render: (_: unknown, record: GlobalVariable) => (
); } ``` **Commit:** `feat(fase4): add GlobalVariables frontend page` --- ## Task 12: Frontend - Flow Templates Page **Files:** - Create: `frontend/src/pages/FlowTemplates.tsx` **Code:** ```tsx import { useState } from 'react'; import { Card, Row, Col, Button, Modal, Input, Typography, Tag, Empty, message, } from 'antd'; import { PlusOutlined, CopyOutlined } from '@ant-design/icons'; import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query'; import { useNavigate } from 'react-router-dom'; import { apiClient } from '../api/client'; const { Title, Text, Paragraph } = Typography; interface FlowTemplate { id: string; name: string; description: string | null; category: string; nodes: any[]; } export default function FlowTemplates() { const [isCreateModalOpen, setIsCreateModalOpen] = useState(false); const [selectedTemplate, setSelectedTemplate] = useState(null); const [flowName, setFlowName] = useState(''); const navigate = useNavigate(); const queryClient = useQueryClient(); const { data: templates, isLoading } = useQuery({ queryKey: ['flow-templates'], queryFn: () => apiClient.get('/api/flow-templates'), }); const useTemplateMutation = useMutation({ mutationFn: async ({ templateId, name }: { templateId: string; name: string }) => { const response = await apiClient.post<{ flow_id: string }>(`/api/flow-templates/${templateId}/use?flow_name=${encodeURIComponent(name)}`, {}); return response; }, onSuccess: (data) => { message.success('Flujo creado desde plantilla'); setIsCreateModalOpen(false); setSelectedTemplate(null); setFlowName(''); navigate(`/flows/${data.flow_id}`); }, }); const handleUseTemplate = (template: FlowTemplate) => { setSelectedTemplate(template); setFlowName(`${template.name} - Copia`); setIsCreateModalOpen(true); }; const categoryColors: Record = { general: 'blue', sales: 'green', support: 'orange', marketing: 'purple', }; return (
Plantillas de Flujos
{isLoading ? ( ) : templates?.length === 0 ? ( ) : ( {templates?.map((template) => (
} onClick={() => handleUseTemplate(template)}> Usar , ]} > {template.category} {template.description || 'Sin descripción'} {template.nodes.length} nodos } /> ))} )} { setIsCreateModalOpen(false); setSelectedTemplate(null); }} onOk={() => { if (selectedTemplate && flowName) { useTemplateMutation.mutate({ templateId: selectedTemplate.id, name: flowName }); } }} okText="Crear" confirmLoading={useTemplateMutation.isPending} >
Plantilla: {selectedTemplate?.name}
setFlowName(e.target.value)} />
); } ``` **Commit:** `feat(fase4): add FlowTemplates frontend page` --- ## Task 13: Update MainLayout with New Routes **Files:** - Modify: `frontend/src/layouts/MainLayout.tsx` **Add imports:** ```tsx import { GlobalOutlined, AppstoreOutlined } from '@ant-design/icons'; import GlobalVariables from '../pages/GlobalVariables'; import FlowTemplates from '../pages/FlowTemplates'; ``` **Add menu items (after Flujos):** ```tsx { key: '/templates', icon: , label: 'Plantillas', }, { key: '/variables', icon: , label: 'Variables', }, ``` **Add routes:** ```tsx } /> } /> ``` **Commit:** `feat(fase4): add Templates and Variables routes to MainLayout` --- ## Task 14: Update Docker Compose for OpenAI **Files:** - Modify: `docker-compose.yml` **Add environment variable to flow-engine service:** ```yaml flow-engine: build: ./services/flow-engine environment: DATABASE_URL: postgresql://${DB_USER}:${DB_PASSWORD}@postgres:5432/whatsapp_central REDIS_URL: redis://redis:6379 API_GATEWAY_URL: http://api-gateway:8000 OPENAI_API_KEY: ${OPENAI_API_KEY} OPENAI_MODEL: ${OPENAI_MODEL:-gpt-3.5-turbo} ``` **Add to .env.example:** ``` OPENAI_API_KEY=sk-your-api-key OPENAI_MODEL=gpt-3.5-turbo ``` **Commit:** `feat(fase4): add OpenAI configuration to Docker Compose` --- ## Task 15: Final Integration Commit **Final verification and commit marking Fase 4 complete.** **Commit:** `feat(fase4): complete Flow Engine Avanzado phase` --- ## Summary This plan implements: 1. **Node Architecture**: Modular NodeExecutor system with registry 2. **Basic Nodes**: Refactored trigger, message, buttons, wait_input, set_variable, condition 3. **Advanced Nodes**: switch, delay, random, loop, goto 4. **Validation Nodes**: email, phone, number, date, regex, options 5. **Script Nodes**: JavaScript (Python eval), HTTP request 6. **AI Nodes**: AI response (OpenAI), sentiment analysis 7. **Global Variables**: Database model, API, frontend management 8. **Flow Templates**: Reusable templates system 9. **Frontend**: New node components, GlobalVariables page, FlowTemplates page **Key Features:** - A/B testing with random node (weighted distribution) - AI-powered responses using OpenAI - HTTP requests for external API integration - Input validation with multiple types - Reusable flow templates - Global variables for shared configuration