# Fase 4: Flow Engine Avanzado - Implementation Plan > **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. **Goal:** Extender el Flow Engine con nodos avanzados (switch, loop, random, sub-flow, delay), validaciones, templates reutilizables, variables globales, ejecución de JavaScript y nodo de IA. **Architecture:** Extender FlowEngine._execute_node() con nuevos tipos de nodos. Crear sistema de NodeExecutor modular para separar lógica de cada nodo. Agregar modelo FlowTemplate para templates reutilizables y GlobalVariable para variables globales. Frontend: nuevos componentes de nodo en FlowBuilder. **Tech Stack:** Python/FastAPI (backend), PostgreSQL (DB), React/TypeScript/React Flow (frontend), OpenAI API (AI node) --- ## Task 1: Node Executor Architecture **Files:** - Create: `services/flow-engine/app/nodes/__init__.py` - Create: `services/flow-engine/app/nodes/base.py` - Create: `services/flow-engine/app/nodes/basic.py` **Code for base.py:** ```python from abc import ABC, abstractmethod from typing import Optional, Any from app.context import FlowContext class NodeExecutor(ABC): """Base class for all node executors""" @abstractmethod async def execute( self, config: dict, context: FlowContext, session: Any ) -> Optional[str]: """ Execute the node logic. Returns: branch name for routing (e.g., "default", "true", "false") or "wait" to pause execution """ pass class NodeRegistry: """Registry of all available node executors""" _executors: dict[str, NodeExecutor] = {} @classmethod def register(cls, node_type: str, executor: NodeExecutor): cls._executors[node_type] = executor @classmethod def get(cls, node_type: str) -> Optional[NodeExecutor]: return cls._executors.get(node_type) @classmethod def execute(cls, node_type: str, config: dict, context: FlowContext, session: Any) -> Optional[str]: executor = cls.get(node_type) if executor: return executor.execute(config, context, session) return "default" ``` **Code for basic.py:** ```python from typing import Optional, Any from app.nodes.base import NodeExecutor, NodeRegistry from app.context import FlowContext class TriggerExecutor(NodeExecutor): async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: return "default" class MessageExecutor(NodeExecutor): def __init__(self, send_message_fn): self.send_message = send_message_fn async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: text = context.interpolate(config.get("text", "")) await self.send_message(session.conversation_id, text) return "default" class ButtonsExecutor(NodeExecutor): def __init__(self, send_message_fn): self.send_message = send_message_fn async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: text = context.interpolate(config.get("text", "")) buttons = config.get("buttons", []) button_text = "\n".join([f"• {b['label']}" for b in buttons]) await self.send_message(session.conversation_id, f"{text}\n\n{button_text}") return "wait" class WaitInputExecutor(NodeExecutor): async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: variable = config.get("variable", "user_input") context.set(variable, context.message.get("content", "")) return "default" class SetVariableExecutor(NodeExecutor): async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: var_name = config.get("variable", "") var_value = context.interpolate(config.get("value", "")) context.set(var_name, var_value) return "default" class ConditionExecutor(NodeExecutor): async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: conditions = config.get("conditions", []) for cond in conditions: field = cond.get("field", "") operator = cond.get("operator", "equals") value = cond.get("value", "") branch = cond.get("branch", "default") actual = context.get(field) or context.message.get("content", "") if operator == "equals" and str(actual).lower() == str(value).lower(): return branch elif operator == "contains" and str(value).lower() in str(actual).lower(): return branch elif operator == "starts_with" and str(actual).lower().startswith(str(value).lower()): return branch elif operator == "not_equals" and str(actual).lower() != str(value).lower(): return branch elif operator == "greater_than": try: if float(actual) > float(value): return branch except ValueError: pass elif operator == "less_than": try: if float(actual) < float(value): return branch except ValueError: pass return "default" ``` **Code for __init__.py:** ```python from app.nodes.base import NodeExecutor, NodeRegistry from app.nodes.basic import ( TriggerExecutor, MessageExecutor, ButtonsExecutor, WaitInputExecutor, SetVariableExecutor, ConditionExecutor ) ``` **Commit:** `feat(fase4): add NodeExecutor architecture with basic nodes` --- ## Task 2: Advanced Nodes - Switch, Delay, Random **Files:** - Create: `services/flow-engine/app/nodes/advanced.py` - Modify: `services/flow-engine/app/nodes/__init__.py` **Code for advanced.py:** ```python import asyncio import random as random_module from typing import Optional, Any from app.nodes.base import NodeExecutor from app.context import FlowContext class SwitchExecutor(NodeExecutor): """Multi-branch switch based on variable value""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: variable = config.get("variable", "") cases = config.get("cases", []) # [{value: "x", branch: "case_x"}, ...] actual = context.get(variable) or context.message.get("content", "") actual_str = str(actual).lower().strip() for case in cases: case_value = str(case.get("value", "")).lower().strip() if actual_str == case_value: return case.get("branch", "default") return config.get("default_branch", "default") class DelayExecutor(NodeExecutor): """Wait for a specified duration before continuing""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: delay_seconds = config.get("seconds", 0) delay_type = config.get("type", "fixed") # fixed or random if delay_type == "random": min_delay = config.get("min_seconds", 1) max_delay = config.get("max_seconds", 5) delay_seconds = random_module.uniform(min_delay, max_delay) if delay_seconds > 0: await asyncio.sleep(min(delay_seconds, 30)) # Max 30 seconds return "default" class RandomExecutor(NodeExecutor): """Random branch selection for A/B testing""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: branches = config.get("branches", []) # [{branch: "a", weight: 50}, {branch: "b", weight: 50}] if not branches: return "default" total_weight = sum(b.get("weight", 1) for b in branches) rand_value = random_module.uniform(0, total_weight) cumulative = 0 for branch in branches: cumulative += branch.get("weight", 1) if rand_value <= cumulative: # Track A/B test result test_name = config.get("test_name", "ab_test") context.set(f"_ab_{test_name}", branch.get("branch", "default")) return branch.get("branch", "default") return branches[-1].get("branch", "default") if branches else "default" class LoopExecutor(NodeExecutor): """Loop a certain number of times or until condition""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: loop_var = config.get("counter_variable", "_loop_counter") max_iterations = config.get("max_iterations", 10) current = int(context.get(loop_var) or 0) if current < max_iterations: context.set(loop_var, current + 1) return "continue" # Continue loop else: context.set(loop_var, 0) # Reset counter return "done" # Exit loop class GoToExecutor(NodeExecutor): """Jump to another node or flow""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: target_node_id = config.get("target_node_id") target_flow_id = config.get("target_flow_id") if target_flow_id: # Store for sub-flow execution context.set("_goto_flow_id", target_flow_id) return "sub_flow" if target_node_id: context.set("_goto_node_id", target_node_id) return "goto" return "default" ``` **Update __init__.py:** ```python from app.nodes.base import NodeExecutor, NodeRegistry from app.nodes.basic import ( TriggerExecutor, MessageExecutor, ButtonsExecutor, WaitInputExecutor, SetVariableExecutor, ConditionExecutor ) from app.nodes.advanced import ( SwitchExecutor, DelayExecutor, RandomExecutor, LoopExecutor, GoToExecutor ) ``` **Commit:** `feat(fase4): add advanced nodes - switch, delay, random, loop, goto` --- ## Task 3: Validation Nodes **Files:** - Create: `services/flow-engine/app/nodes/validation.py` - Modify: `services/flow-engine/app/nodes/__init__.py` **Code for validation.py:** ```python import re from typing import Optional, Any from app.nodes.base import NodeExecutor from app.context import FlowContext class ValidateEmailExecutor(NodeExecutor): """Validate email format""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: variable = config.get("variable", "") value = context.get(variable) or context.message.get("content", "") email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if re.match(email_pattern, str(value).strip()): return "valid" return "invalid" class ValidatePhoneExecutor(NodeExecutor): """Validate phone number format""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: variable = config.get("variable", "") value = context.get(variable) or context.message.get("content", "") # Remove common separators cleaned = re.sub(r'[\s\-\(\)\+]', '', str(value)) # Check if it's mostly digits and reasonable length if cleaned.isdigit() and 8 <= len(cleaned) <= 15: return "valid" return "invalid" class ValidateNumberExecutor(NodeExecutor): """Validate numeric value within range""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: variable = config.get("variable", "") value = context.get(variable) or context.message.get("content", "") min_val = config.get("min") max_val = config.get("max") try: num = float(str(value).strip()) if min_val is not None and num < float(min_val): return "invalid" if max_val is not None and num > float(max_val): return "invalid" return "valid" except ValueError: return "invalid" class ValidateDateExecutor(NodeExecutor): """Validate date format""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: variable = config.get("variable", "") value = context.get(variable) or context.message.get("content", "") date_format = config.get("format", "%Y-%m-%d") from datetime import datetime try: datetime.strptime(str(value).strip(), date_format) return "valid" except ValueError: return "invalid" class ValidateRegexExecutor(NodeExecutor): """Validate against custom regex pattern""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: variable = config.get("variable", "") value = context.get(variable) or context.message.get("content", "") pattern = config.get("pattern", ".*") try: if re.match(pattern, str(value).strip()): return "valid" return "invalid" except re.error: return "invalid" class ValidateOptionsExecutor(NodeExecutor): """Validate against list of allowed options""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: variable = config.get("variable", "") value = context.get(variable) or context.message.get("content", "") options = config.get("options", []) case_sensitive = config.get("case_sensitive", False) value_str = str(value).strip() if not case_sensitive: value_str = value_str.lower() options = [str(o).lower() for o in options] if value_str in options: return "valid" return "invalid" ``` **Update __init__.py:** ```python from app.nodes.validation import ( ValidateEmailExecutor, ValidatePhoneExecutor, ValidateNumberExecutor, ValidateDateExecutor, ValidateRegexExecutor, ValidateOptionsExecutor ) ``` **Commit:** `feat(fase4): add validation nodes for email, phone, number, date, regex, options` --- ## Task 4: JavaScript Node **Files:** - Create: `services/flow-engine/app/nodes/script.py` - Modify: `services/flow-engine/app/nodes/__init__.py` **Code for script.py:** ```python import json from typing import Optional, Any from app.nodes.base import NodeExecutor from app.context import FlowContext class JavaScriptExecutor(NodeExecutor): """Execute JavaScript code (using Python eval with restricted globals)""" ALLOWED_BUILTINS = { 'abs': abs, 'all': all, 'any': any, 'bool': bool, 'dict': dict, 'float': float, 'int': int, 'len': len, 'list': list, 'max': max, 'min': min, 'round': round, 'str': str, 'sum': sum, 'sorted': sorted, } async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: code = config.get("code", "") output_variable = config.get("output_variable", "_result") if not code: return "default" # Build safe execution context exec_globals = { '__builtins__': self.ALLOWED_BUILTINS, 'context': { 'contact': context.contact, 'conversation': context.conversation, 'message': context.message, 'variables': context.variables.copy(), }, 'variables': context.variables.copy(), } try: # Simple expression evaluation (not full JS, but Python expressions) # For safety, we use a restricted eval result = eval(code, exec_globals, {}) if result is not None: context.set(output_variable, result) return "success" except Exception as e: context.set("_script_error", str(e)) return "error" class HttpRequestExecutor(NodeExecutor): """Make HTTP requests to external APIs""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: import httpx url = context.interpolate(config.get("url", "")) method = config.get("method", "GET").upper() headers = config.get("headers", {}) body = config.get("body") output_variable = config.get("output_variable", "_http_response") timeout = config.get("timeout", 10) if not url: return "error" # Interpolate headers and body headers = {k: context.interpolate(str(v)) for k, v in headers.items()} if body and isinstance(body, str): body = context.interpolate(body) try: async with httpx.AsyncClient() as client: response = await client.request( method=method, url=url, headers=headers, json=json.loads(body) if body and isinstance(body, str) else body, timeout=timeout ) context.set(output_variable, { "status": response.status_code, "body": response.text, "json": response.json() if response.headers.get("content-type", "").startswith("application/json") else None }) if 200 <= response.status_code < 300: return "success" return "error" except Exception as e: context.set("_http_error", str(e)) return "error" ``` **Update __init__.py** to add: ```python from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor ``` **Commit:** `feat(fase4): add JavaScript and HTTP request nodes` --- ## Task 5: AI Response Node **Files:** - Create: `services/flow-engine/app/nodes/ai.py` - Modify: `services/flow-engine/app/config.py` - Modify: `services/flow-engine/app/nodes/__init__.py` **Add to config.py:** ```python # OpenAI OPENAI_API_KEY: str = "" OPENAI_MODEL: str = "gpt-3.5-turbo" ``` **Code for ai.py:** ```python from typing import Optional, Any from app.nodes.base import NodeExecutor from app.context import FlowContext from app.config import get_settings settings = get_settings() class AIResponseExecutor(NodeExecutor): """Generate AI response using OpenAI""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: import httpx prompt = context.interpolate(config.get("prompt", "")) system_prompt = config.get("system_prompt", "Eres un asistente útil.") output_variable = config.get("output_variable", "_ai_response") max_tokens = config.get("max_tokens", 500) temperature = config.get("temperature", 0.7) if not settings.OPENAI_API_KEY: context.set("_ai_error", "OpenAI API key not configured") return "error" if not prompt: return "error" messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": prompt} ] # Include conversation history if configured if config.get("include_history", False): history = context.get("_conversation_history") or [] messages = [{"role": "system", "content": system_prompt}] + history + [{"role": "user", "content": prompt}] try: async with httpx.AsyncClient() as client: response = await client.post( "https://api.openai.com/v1/chat/completions", headers={ "Authorization": f"Bearer {settings.OPENAI_API_KEY}", "Content-Type": "application/json" }, json={ "model": settings.OPENAI_MODEL, "messages": messages, "max_tokens": max_tokens, "temperature": temperature }, timeout=30 ) if response.status_code == 200: data = response.json() ai_response = data["choices"][0]["message"]["content"] context.set(output_variable, ai_response) return "success" else: context.set("_ai_error", response.text) return "error" except Exception as e: context.set("_ai_error", str(e)) return "error" class AISentimentExecutor(NodeExecutor): """Analyze sentiment of user message""" async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: import httpx text = context.get(config.get("variable", "")) or context.message.get("content", "") output_variable = config.get("output_variable", "_sentiment") if not settings.OPENAI_API_KEY or not text: return "neutral" try: async with httpx.AsyncClient() as client: response = await client.post( "https://api.openai.com/v1/chat/completions", headers={ "Authorization": f"Bearer {settings.OPENAI_API_KEY}", "Content-Type": "application/json" }, json={ "model": "gpt-3.5-turbo", "messages": [ {"role": "system", "content": "Analyze the sentiment. Reply with only one word: positive, negative, or neutral"}, {"role": "user", "content": text} ], "max_tokens": 10, "temperature": 0 }, timeout=15 ) if response.status_code == 200: data = response.json() sentiment = data["choices"][0]["message"]["content"].lower().strip() context.set(output_variable, sentiment) if "positive" in sentiment: return "positive" elif "negative" in sentiment: return "negative" return "neutral" except Exception: pass return "neutral" ``` **Update __init__.py:** ```python from app.nodes.ai import AIResponseExecutor, AISentimentExecutor ``` **Commit:** `feat(fase4): add AI response and sentiment analysis nodes` --- ## Task 6: Global Variables Model **Files:** - Create: `services/api-gateway/app/models/global_variable.py` - Modify: `services/api-gateway/app/models/__init__.py` **Code for global_variable.py:** ```python import uuid from datetime import datetime from sqlalchemy import Column, String, Text, DateTime, Boolean from sqlalchemy.dialects.postgresql import UUID, JSONB from app.core.database import Base class GlobalVariable(Base): __tablename__ = "global_variables" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) key = Column(String(100), nullable=False, unique=True, index=True) value = Column(Text, nullable=True) value_type = Column(String(20), default="string", nullable=False) # string, number, boolean, json description = Column(String(500), nullable=True) is_secret = Column(Boolean, default=False, nullable=False) # Hide value in UI created_at = Column(DateTime, default=datetime.utcnow, nullable=False) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) ``` **Update models/__init__.py:** ```python from app.models.global_variable import GlobalVariable ``` **Commit:** `feat(fase4): add GlobalVariable database model` --- ## Task 7: Global Variables API **Files:** - Create: `services/api-gateway/app/schemas/global_variable.py` - Create: `services/api-gateway/app/routers/global_variables.py` - Modify: `services/api-gateway/app/main.py` **Code for schemas/global_variable.py:** ```python from pydantic import BaseModel from typing import Optional from uuid import UUID from datetime import datetime class GlobalVariableCreate(BaseModel): key: str value: Optional[str] = None value_type: str = "string" description: Optional[str] = None is_secret: bool = False class GlobalVariableUpdate(BaseModel): value: Optional[str] = None value_type: Optional[str] = None description: Optional[str] = None is_secret: Optional[bool] = None class GlobalVariableResponse(BaseModel): id: UUID key: str value: Optional[str] value_type: str description: Optional[str] is_secret: bool created_at: datetime updated_at: datetime class Config: from_attributes = True ``` **Code for routers/global_variables.py:** ```python from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from typing import List from uuid import UUID from app.core.database import get_db from app.core.security import get_current_user from app.models.user import User, UserRole from app.models.global_variable import GlobalVariable from app.schemas.global_variable import ( GlobalVariableCreate, GlobalVariableUpdate, GlobalVariableResponse ) router = APIRouter(prefix="/api/global-variables", tags=["global-variables"]) def require_admin(current_user: User = Depends(get_current_user)): if current_user.role != UserRole.ADMIN: raise HTTPException(status_code=403, detail="Admin required") return current_user @router.get("", response_model=List[GlobalVariableResponse]) def list_global_variables( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): variables = db.query(GlobalVariable).all() # Mask secret values for var in variables: if var.is_secret: var.value = "********" return variables @router.post("", response_model=GlobalVariableResponse) def create_global_variable( request: GlobalVariableCreate, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): existing = db.query(GlobalVariable).filter(GlobalVariable.key == request.key).first() if existing: raise HTTPException(status_code=400, detail="Variable key already exists") variable = GlobalVariable(**request.model_dump()) db.add(variable) db.commit() db.refresh(variable) return variable @router.put("/{variable_id}", response_model=GlobalVariableResponse) def update_global_variable( variable_id: UUID, request: GlobalVariableUpdate, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): variable = db.query(GlobalVariable).filter(GlobalVariable.id == variable_id).first() if not variable: raise HTTPException(status_code=404, detail="Variable not found") for key, value in request.model_dump(exclude_unset=True).items(): setattr(variable, key, value) db.commit() db.refresh(variable) return variable @router.delete("/{variable_id}") def delete_global_variable( variable_id: UUID, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): variable = db.query(GlobalVariable).filter(GlobalVariable.id == variable_id).first() if not variable: raise HTTPException(status_code=404, detail="Variable not found") db.delete(variable) db.commit() return {"success": True} @router.get("/by-key/{key}") def get_variable_by_key( key: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): variable = db.query(GlobalVariable).filter(GlobalVariable.key == key).first() if not variable: raise HTTPException(status_code=404, detail="Variable not found") return {"key": variable.key, "value": variable.value if not variable.is_secret else None} ``` **Add to main.py:** ```python from app.routers import global_variables app.include_router(global_variables.router) ``` **Commit:** `feat(fase4): add GlobalVariable API routes` --- ## Task 8: Flow Templates Model and API **Files:** - Create: `services/api-gateway/app/models/flow_template.py` - Create: `services/api-gateway/app/schemas/flow_template.py` - Create: `services/api-gateway/app/routers/flow_templates.py` - Modify: `services/api-gateway/app/models/__init__.py` - Modify: `services/api-gateway/app/main.py` **Code for flow_template.py:** ```python import uuid from datetime import datetime from sqlalchemy import Column, String, Text, DateTime from sqlalchemy.dialects.postgresql import UUID, JSONB from app.core.database import Base class FlowTemplate(Base): __tablename__ = "flow_templates" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) name = Column(String(100), nullable=False) description = Column(Text, nullable=True) category = Column(String(50), default="general", nullable=False) nodes = Column(JSONB, default=list) edges = Column(JSONB, default=list) variables = Column(JSONB, default=dict) preview_image = Column(String(500), nullable=True) is_public = Column(Boolean, default=True, nullable=False) created_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False) created_at = Column(DateTime, default=datetime.utcnow, nullable=False) ``` Add missing imports to flow_template.py: ```python from sqlalchemy import Column, String, Text, DateTime, Boolean, ForeignKey ``` **Code for schemas/flow_template.py:** ```python from pydantic import BaseModel from typing import Optional, List from uuid import UUID from datetime import datetime class FlowTemplateCreate(BaseModel): name: str description: Optional[str] = None category: str = "general" nodes: List[dict] = [] edges: List[dict] = [] variables: dict = {} is_public: bool = True class FlowTemplateResponse(BaseModel): id: UUID name: str description: Optional[str] category: str nodes: List[dict] edges: List[dict] variables: dict preview_image: Optional[str] is_public: bool created_by: UUID created_at: datetime class Config: from_attributes = True ``` **Code for routers/flow_templates.py:** ```python from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from typing import List from uuid import UUID from app.core.database import get_db from app.core.security import get_current_user from app.models.user import User from app.models.flow_template import FlowTemplate from app.models.flow import Flow from app.schemas.flow_template import FlowTemplateCreate, FlowTemplateResponse router = APIRouter(prefix="/api/flow-templates", tags=["flow-templates"]) @router.get("", response_model=List[FlowTemplateResponse]) def list_templates( category: str = None, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): query = db.query(FlowTemplate).filter(FlowTemplate.is_public == True) if category: query = query.filter(FlowTemplate.category == category) return query.all() @router.post("", response_model=FlowTemplateResponse) def create_template( request: FlowTemplateCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): template = FlowTemplate( **request.model_dump(), created_by=current_user.id, ) db.add(template) db.commit() db.refresh(template) return template @router.post("/{template_id}/use") def use_template( template_id: UUID, flow_name: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Create a new flow from template""" template = db.query(FlowTemplate).filter(FlowTemplate.id == template_id).first() if not template: raise HTTPException(status_code=404, detail="Template not found") from app.models.flow import Flow, TriggerType flow = Flow( name=flow_name, description=f"Created from template: {template.name}", trigger_type=TriggerType.KEYWORD, nodes=template.nodes, edges=template.edges, variables=template.variables, is_active=False, ) db.add(flow) db.commit() db.refresh(flow) return {"success": True, "flow_id": str(flow.id)} @router.delete("/{template_id}") def delete_template( template_id: UUID, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): template = db.query(FlowTemplate).filter(FlowTemplate.id == template_id).first() if not template: raise HTTPException(status_code=404, detail="Template not found") if template.created_by != current_user.id: raise HTTPException(status_code=403, detail="Not authorized") db.delete(template) db.commit() return {"success": True} ``` **Update models/__init__.py:** ```python from app.models.flow_template import FlowTemplate ``` **Add to main.py:** ```python from app.routers import flow_templates app.include_router(flow_templates.router) ``` **Commit:** `feat(fase4): add FlowTemplate model and API` --- ## Task 9: Refactor Engine to Use NodeRegistry **Files:** - Modify: `services/flow-engine/app/engine.py` **Replace _execute_node method and add initialization:** ```python from typing import Optional import httpx from app.config import get_settings from app.context import FlowContext from app.nodes import NodeRegistry from app.nodes.basic import ( TriggerExecutor, MessageExecutor, ButtonsExecutor, WaitInputExecutor, SetVariableExecutor, ConditionExecutor ) from app.nodes.advanced import ( SwitchExecutor, DelayExecutor, RandomExecutor, LoopExecutor, GoToExecutor ) from app.nodes.validation import ( ValidateEmailExecutor, ValidatePhoneExecutor, ValidateNumberExecutor, ValidateDateExecutor, ValidateRegexExecutor, ValidateOptionsExecutor ) from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor from app.nodes.ai import AIResponseExecutor, AISentimentExecutor settings = get_settings() class FlowEngine: """Executes flow nodes based on incoming messages""" def __init__(self, db_session): self.db = db_session self._register_nodes() def _register_nodes(self): """Register all available node executors""" # Basic nodes NodeRegistry.register("trigger", TriggerExecutor()) NodeRegistry.register("message", MessageExecutor(self._send_message)) NodeRegistry.register("buttons", ButtonsExecutor(self._send_message)) NodeRegistry.register("wait_input", WaitInputExecutor()) NodeRegistry.register("set_variable", SetVariableExecutor()) NodeRegistry.register("condition", ConditionExecutor()) # Advanced nodes NodeRegistry.register("switch", SwitchExecutor()) NodeRegistry.register("delay", DelayExecutor()) NodeRegistry.register("random", RandomExecutor()) NodeRegistry.register("loop", LoopExecutor()) NodeRegistry.register("goto", GoToExecutor()) # Validation nodes NodeRegistry.register("validate_email", ValidateEmailExecutor()) NodeRegistry.register("validate_phone", ValidatePhoneExecutor()) NodeRegistry.register("validate_number", ValidateNumberExecutor()) NodeRegistry.register("validate_date", ValidateDateExecutor()) NodeRegistry.register("validate_regex", ValidateRegexExecutor()) NodeRegistry.register("validate_options", ValidateOptionsExecutor()) # Script nodes NodeRegistry.register("javascript", JavaScriptExecutor()) NodeRegistry.register("http_request", HttpRequestExecutor()) # AI nodes NodeRegistry.register("ai_response", AIResponseExecutor()) NodeRegistry.register("ai_sentiment", AISentimentExecutor()) async def _execute_node( self, node_type: str, config: dict, context: FlowContext, session ) -> Optional[str]: """Execute a single node using NodeRegistry""" executor = NodeRegistry.get(node_type) if executor: result = await executor.execute(config, context, session) # Handle special results if result == "goto" and context.get("_goto_node_id"): session.current_node_id = context.get("_goto_node_id") context.set("_goto_node_id", None) return result return "default" # ... rest of the class remains the same (process_message, _find_matching_flow, etc.) ``` **Commit:** `feat(fase4): refactor FlowEngine to use NodeRegistry` --- ## Task 10: Frontend - Advanced Node Components **Files:** - Modify: `frontend/src/pages/FlowBuilder.tsx` **Add new node components and update nodeTypes:** ```tsx // Add these new node components after existing ones const SwitchNode = ({ data }: { data: any }) => (
{'{{global.' + key + '}}'}
{record.is_secret &&