diff --git a/docs/plans/2026-01-29-fase-4-flow-engine-avanzado.md b/docs/plans/2026-01-29-fase-4-flow-engine-avanzado.md new file mode 100644 index 0000000..7bd3106 --- /dev/null +++ b/docs/plans/2026-01-29-fase-4-flow-engine-avanzado.md @@ -0,0 +1,1669 @@ +# Fase 4: Flow Engine Avanzado - Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Extender el Flow Engine con nodos avanzados (switch, loop, random, sub-flow, delay), validaciones, templates reutilizables, variables globales, ejecución de JavaScript y nodo de IA. + +**Architecture:** Extender FlowEngine._execute_node() con nuevos tipos de nodos. Crear sistema de NodeExecutor modular para separar lógica de cada nodo. Agregar modelo FlowTemplate para templates reutilizables y GlobalVariable para variables globales. Frontend: nuevos componentes de nodo en FlowBuilder. + +**Tech Stack:** Python/FastAPI (backend), PostgreSQL (DB), React/TypeScript/React Flow (frontend), OpenAI API (AI node) + +--- + +## Task 1: Node Executor Architecture + +**Files:** +- Create: `services/flow-engine/app/nodes/__init__.py` +- Create: `services/flow-engine/app/nodes/base.py` +- Create: `services/flow-engine/app/nodes/basic.py` + +**Code for base.py:** +```python +from abc import ABC, abstractmethod +from typing import Optional, Any +from app.context import FlowContext + + +class NodeExecutor(ABC): + """Base class for all node executors""" + + @abstractmethod + async def execute( + self, + config: dict, + context: FlowContext, + session: Any + ) -> Optional[str]: + """ + Execute the node logic. + Returns: branch name for routing (e.g., "default", "true", "false") + or "wait" to pause execution + """ + pass + + +class NodeRegistry: + """Registry of all available node executors""" + _executors: dict[str, NodeExecutor] = {} + + @classmethod + def register(cls, node_type: str, executor: NodeExecutor): + cls._executors[node_type] = executor + + @classmethod + def get(cls, node_type: str) -> Optional[NodeExecutor]: + return cls._executors.get(node_type) + + @classmethod + def execute(cls, node_type: str, config: dict, context: FlowContext, session: Any) -> Optional[str]: + executor = cls.get(node_type) + if executor: + return executor.execute(config, context, session) + return "default" +``` + +**Code for basic.py:** +```python +from typing import Optional, Any +from app.nodes.base import NodeExecutor, NodeRegistry +from app.context import FlowContext + + +class TriggerExecutor(NodeExecutor): + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + return "default" + + +class MessageExecutor(NodeExecutor): + def __init__(self, send_message_fn): + self.send_message = send_message_fn + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + text = context.interpolate(config.get("text", "")) + await self.send_message(session.conversation_id, text) + return "default" + + +class ButtonsExecutor(NodeExecutor): + def __init__(self, send_message_fn): + self.send_message = send_message_fn + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + text = context.interpolate(config.get("text", "")) + buttons = config.get("buttons", []) + button_text = "\n".join([f"• {b['label']}" for b in buttons]) + await self.send_message(session.conversation_id, f"{text}\n\n{button_text}") + return "wait" + + +class WaitInputExecutor(NodeExecutor): + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + variable = config.get("variable", "user_input") + context.set(variable, context.message.get("content", "")) + return "default" + + +class SetVariableExecutor(NodeExecutor): + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + var_name = config.get("variable", "") + var_value = context.interpolate(config.get("value", "")) + context.set(var_name, var_value) + return "default" + + +class ConditionExecutor(NodeExecutor): + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + conditions = config.get("conditions", []) + + for cond in conditions: + field = cond.get("field", "") + operator = cond.get("operator", "equals") + value = cond.get("value", "") + branch = cond.get("branch", "default") + + actual = context.get(field) or context.message.get("content", "") + + if operator == "equals" and str(actual).lower() == str(value).lower(): + return branch + elif operator == "contains" and str(value).lower() in str(actual).lower(): + return branch + elif operator == "starts_with" and str(actual).lower().startswith(str(value).lower()): + return branch + elif operator == "not_equals" and str(actual).lower() != str(value).lower(): + return branch + elif operator == "greater_than": + try: + if float(actual) > float(value): + return branch + except ValueError: + pass + elif operator == "less_than": + try: + if float(actual) < float(value): + return branch + except ValueError: + pass + + return "default" +``` + +**Code for __init__.py:** +```python +from app.nodes.base import NodeExecutor, NodeRegistry +from app.nodes.basic import ( + TriggerExecutor, MessageExecutor, ButtonsExecutor, + WaitInputExecutor, SetVariableExecutor, ConditionExecutor +) +``` + +**Commit:** `feat(fase4): add NodeExecutor architecture with basic nodes` + +--- + +## Task 2: Advanced Nodes - Switch, Delay, Random + +**Files:** +- Create: `services/flow-engine/app/nodes/advanced.py` +- Modify: `services/flow-engine/app/nodes/__init__.py` + +**Code for advanced.py:** +```python +import asyncio +import random as random_module +from typing import Optional, Any +from app.nodes.base import NodeExecutor +from app.context import FlowContext + + +class SwitchExecutor(NodeExecutor): + """Multi-branch switch based on variable value""" + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + variable = config.get("variable", "") + cases = config.get("cases", []) # [{value: "x", branch: "case_x"}, ...] + + actual = context.get(variable) or context.message.get("content", "") + actual_str = str(actual).lower().strip() + + for case in cases: + case_value = str(case.get("value", "")).lower().strip() + if actual_str == case_value: + return case.get("branch", "default") + + return config.get("default_branch", "default") + + +class DelayExecutor(NodeExecutor): + """Wait for a specified duration before continuing""" + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + delay_seconds = config.get("seconds", 0) + delay_type = config.get("type", "fixed") # fixed or random + + if delay_type == "random": + min_delay = config.get("min_seconds", 1) + max_delay = config.get("max_seconds", 5) + delay_seconds = random_module.uniform(min_delay, max_delay) + + if delay_seconds > 0: + await asyncio.sleep(min(delay_seconds, 30)) # Max 30 seconds + + return "default" + + +class RandomExecutor(NodeExecutor): + """Random branch selection for A/B testing""" + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + branches = config.get("branches", []) # [{branch: "a", weight: 50}, {branch: "b", weight: 50}] + + if not branches: + return "default" + + total_weight = sum(b.get("weight", 1) for b in branches) + rand_value = random_module.uniform(0, total_weight) + + cumulative = 0 + for branch in branches: + cumulative += branch.get("weight", 1) + if rand_value <= cumulative: + # Track A/B test result + test_name = config.get("test_name", "ab_test") + context.set(f"_ab_{test_name}", branch.get("branch", "default")) + return branch.get("branch", "default") + + return branches[-1].get("branch", "default") if branches else "default" + + +class LoopExecutor(NodeExecutor): + """Loop a certain number of times or until condition""" + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + loop_var = config.get("counter_variable", "_loop_counter") + max_iterations = config.get("max_iterations", 10) + + current = int(context.get(loop_var) or 0) + + if current < max_iterations: + context.set(loop_var, current + 1) + return "continue" # Continue loop + else: + context.set(loop_var, 0) # Reset counter + return "done" # Exit loop + + +class GoToExecutor(NodeExecutor): + """Jump to another node or flow""" + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + target_node_id = config.get("target_node_id") + target_flow_id = config.get("target_flow_id") + + if target_flow_id: + # Store for sub-flow execution + context.set("_goto_flow_id", target_flow_id) + return "sub_flow" + + if target_node_id: + context.set("_goto_node_id", target_node_id) + return "goto" + + return "default" +``` + +**Update __init__.py:** +```python +from app.nodes.base import NodeExecutor, NodeRegistry +from app.nodes.basic import ( + TriggerExecutor, MessageExecutor, ButtonsExecutor, + WaitInputExecutor, SetVariableExecutor, ConditionExecutor +) +from app.nodes.advanced import ( + SwitchExecutor, DelayExecutor, RandomExecutor, + LoopExecutor, GoToExecutor +) +``` + +**Commit:** `feat(fase4): add advanced nodes - switch, delay, random, loop, goto` + +--- + +## Task 3: Validation Nodes + +**Files:** +- Create: `services/flow-engine/app/nodes/validation.py` +- Modify: `services/flow-engine/app/nodes/__init__.py` + +**Code for validation.py:** +```python +import re +from typing import Optional, Any +from app.nodes.base import NodeExecutor +from app.context import FlowContext + + +class ValidateEmailExecutor(NodeExecutor): + """Validate email format""" + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + variable = config.get("variable", "") + value = context.get(variable) or context.message.get("content", "") + + email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' + + if re.match(email_pattern, str(value).strip()): + return "valid" + return "invalid" + + +class ValidatePhoneExecutor(NodeExecutor): + """Validate phone number format""" + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + variable = config.get("variable", "") + value = context.get(variable) or context.message.get("content", "") + + # Remove common separators + cleaned = re.sub(r'[\s\-\(\)\+]', '', str(value)) + + # Check if it's mostly digits and reasonable length + if cleaned.isdigit() and 8 <= len(cleaned) <= 15: + return "valid" + return "invalid" + + +class ValidateNumberExecutor(NodeExecutor): + """Validate numeric value within range""" + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + variable = config.get("variable", "") + value = context.get(variable) or context.message.get("content", "") + + min_val = config.get("min") + max_val = config.get("max") + + try: + num = float(str(value).strip()) + + if min_val is not None and num < float(min_val): + return "invalid" + if max_val is not None and num > float(max_val): + return "invalid" + + return "valid" + except ValueError: + return "invalid" + + +class ValidateDateExecutor(NodeExecutor): + """Validate date format""" + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + variable = config.get("variable", "") + value = context.get(variable) or context.message.get("content", "") + date_format = config.get("format", "%Y-%m-%d") + + from datetime import datetime + + try: + datetime.strptime(str(value).strip(), date_format) + return "valid" + except ValueError: + return "invalid" + + +class ValidateRegexExecutor(NodeExecutor): + """Validate against custom regex pattern""" + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + variable = config.get("variable", "") + value = context.get(variable) or context.message.get("content", "") + pattern = config.get("pattern", ".*") + + try: + if re.match(pattern, str(value).strip()): + return "valid" + return "invalid" + except re.error: + return "invalid" + + +class ValidateOptionsExecutor(NodeExecutor): + """Validate against list of allowed options""" + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + variable = config.get("variable", "") + value = context.get(variable) or context.message.get("content", "") + options = config.get("options", []) + case_sensitive = config.get("case_sensitive", False) + + value_str = str(value).strip() + if not case_sensitive: + value_str = value_str.lower() + options = [str(o).lower() for o in options] + + if value_str in options: + return "valid" + return "invalid" +``` + +**Update __init__.py:** +```python +from app.nodes.validation import ( + ValidateEmailExecutor, ValidatePhoneExecutor, ValidateNumberExecutor, + ValidateDateExecutor, ValidateRegexExecutor, ValidateOptionsExecutor +) +``` + +**Commit:** `feat(fase4): add validation nodes for email, phone, number, date, regex, options` + +--- + +## Task 4: JavaScript Node + +**Files:** +- Create: `services/flow-engine/app/nodes/script.py` +- Modify: `services/flow-engine/app/nodes/__init__.py` + +**Code for script.py:** +```python +import json +from typing import Optional, Any +from app.nodes.base import NodeExecutor +from app.context import FlowContext + + +class JavaScriptExecutor(NodeExecutor): + """Execute JavaScript code (using Python eval with restricted globals)""" + + ALLOWED_BUILTINS = { + 'abs': abs, 'all': all, 'any': any, 'bool': bool, + 'dict': dict, 'float': float, 'int': int, 'len': len, + 'list': list, 'max': max, 'min': min, 'round': round, + 'str': str, 'sum': sum, 'sorted': sorted, + } + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + code = config.get("code", "") + output_variable = config.get("output_variable", "_result") + + if not code: + return "default" + + # Build safe execution context + exec_globals = { + '__builtins__': self.ALLOWED_BUILTINS, + 'context': { + 'contact': context.contact, + 'conversation': context.conversation, + 'message': context.message, + 'variables': context.variables.copy(), + }, + 'variables': context.variables.copy(), + } + + try: + # Simple expression evaluation (not full JS, but Python expressions) + # For safety, we use a restricted eval + result = eval(code, exec_globals, {}) + + if result is not None: + context.set(output_variable, result) + + return "success" + except Exception as e: + context.set("_script_error", str(e)) + return "error" + + +class HttpRequestExecutor(NodeExecutor): + """Make HTTP requests to external APIs""" + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + import httpx + + url = context.interpolate(config.get("url", "")) + method = config.get("method", "GET").upper() + headers = config.get("headers", {}) + body = config.get("body") + output_variable = config.get("output_variable", "_http_response") + timeout = config.get("timeout", 10) + + if not url: + return "error" + + # Interpolate headers and body + headers = {k: context.interpolate(str(v)) for k, v in headers.items()} + if body and isinstance(body, str): + body = context.interpolate(body) + + try: + async with httpx.AsyncClient() as client: + response = await client.request( + method=method, + url=url, + headers=headers, + json=json.loads(body) if body and isinstance(body, str) else body, + timeout=timeout + ) + + context.set(output_variable, { + "status": response.status_code, + "body": response.text, + "json": response.json() if response.headers.get("content-type", "").startswith("application/json") else None + }) + + if 200 <= response.status_code < 300: + return "success" + return "error" + + except Exception as e: + context.set("_http_error", str(e)) + return "error" +``` + +**Update __init__.py** to add: +```python +from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor +``` + +**Commit:** `feat(fase4): add JavaScript and HTTP request nodes` + +--- + +## Task 5: AI Response Node + +**Files:** +- Create: `services/flow-engine/app/nodes/ai.py` +- Modify: `services/flow-engine/app/config.py` +- Modify: `services/flow-engine/app/nodes/__init__.py` + +**Add to config.py:** +```python + # OpenAI + OPENAI_API_KEY: str = "" + OPENAI_MODEL: str = "gpt-3.5-turbo" +``` + +**Code for ai.py:** +```python +from typing import Optional, Any +from app.nodes.base import NodeExecutor +from app.context import FlowContext +from app.config import get_settings + +settings = get_settings() + + +class AIResponseExecutor(NodeExecutor): + """Generate AI response using OpenAI""" + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + import httpx + + prompt = context.interpolate(config.get("prompt", "")) + system_prompt = config.get("system_prompt", "Eres un asistente útil.") + output_variable = config.get("output_variable", "_ai_response") + max_tokens = config.get("max_tokens", 500) + temperature = config.get("temperature", 0.7) + + if not settings.OPENAI_API_KEY: + context.set("_ai_error", "OpenAI API key not configured") + return "error" + + if not prompt: + return "error" + + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": prompt} + ] + + # Include conversation history if configured + if config.get("include_history", False): + history = context.get("_conversation_history") or [] + messages = [{"role": "system", "content": system_prompt}] + history + [{"role": "user", "content": prompt}] + + try: + async with httpx.AsyncClient() as client: + response = await client.post( + "https://api.openai.com/v1/chat/completions", + headers={ + "Authorization": f"Bearer {settings.OPENAI_API_KEY}", + "Content-Type": "application/json" + }, + json={ + "model": settings.OPENAI_MODEL, + "messages": messages, + "max_tokens": max_tokens, + "temperature": temperature + }, + timeout=30 + ) + + if response.status_code == 200: + data = response.json() + ai_response = data["choices"][0]["message"]["content"] + context.set(output_variable, ai_response) + return "success" + else: + context.set("_ai_error", response.text) + return "error" + + except Exception as e: + context.set("_ai_error", str(e)) + return "error" + + +class AISentimentExecutor(NodeExecutor): + """Analyze sentiment of user message""" + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + import httpx + + text = context.get(config.get("variable", "")) or context.message.get("content", "") + output_variable = config.get("output_variable", "_sentiment") + + if not settings.OPENAI_API_KEY or not text: + return "neutral" + + try: + async with httpx.AsyncClient() as client: + response = await client.post( + "https://api.openai.com/v1/chat/completions", + headers={ + "Authorization": f"Bearer {settings.OPENAI_API_KEY}", + "Content-Type": "application/json" + }, + json={ + "model": "gpt-3.5-turbo", + "messages": [ + {"role": "system", "content": "Analyze the sentiment. Reply with only one word: positive, negative, or neutral"}, + {"role": "user", "content": text} + ], + "max_tokens": 10, + "temperature": 0 + }, + timeout=15 + ) + + if response.status_code == 200: + data = response.json() + sentiment = data["choices"][0]["message"]["content"].lower().strip() + context.set(output_variable, sentiment) + + if "positive" in sentiment: + return "positive" + elif "negative" in sentiment: + return "negative" + return "neutral" + + except Exception: + pass + + return "neutral" +``` + +**Update __init__.py:** +```python +from app.nodes.ai import AIResponseExecutor, AISentimentExecutor +``` + +**Commit:** `feat(fase4): add AI response and sentiment analysis nodes` + +--- + +## Task 6: Global Variables Model + +**Files:** +- Create: `services/api-gateway/app/models/global_variable.py` +- Modify: `services/api-gateway/app/models/__init__.py` + +**Code for global_variable.py:** +```python +import uuid +from datetime import datetime +from sqlalchemy import Column, String, Text, DateTime, Boolean +from sqlalchemy.dialects.postgresql import UUID, JSONB +from app.core.database import Base + + +class GlobalVariable(Base): + __tablename__ = "global_variables" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + key = Column(String(100), nullable=False, unique=True, index=True) + value = Column(Text, nullable=True) + value_type = Column(String(20), default="string", nullable=False) # string, number, boolean, json + description = Column(String(500), nullable=True) + is_secret = Column(Boolean, default=False, nullable=False) # Hide value in UI + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) +``` + +**Update models/__init__.py:** +```python +from app.models.global_variable import GlobalVariable +``` + +**Commit:** `feat(fase4): add GlobalVariable database model` + +--- + +## Task 7: Global Variables API + +**Files:** +- Create: `services/api-gateway/app/schemas/global_variable.py` +- Create: `services/api-gateway/app/routers/global_variables.py` +- Modify: `services/api-gateway/app/main.py` + +**Code for schemas/global_variable.py:** +```python +from pydantic import BaseModel +from typing import Optional +from uuid import UUID +from datetime import datetime + + +class GlobalVariableCreate(BaseModel): + key: str + value: Optional[str] = None + value_type: str = "string" + description: Optional[str] = None + is_secret: bool = False + + +class GlobalVariableUpdate(BaseModel): + value: Optional[str] = None + value_type: Optional[str] = None + description: Optional[str] = None + is_secret: Optional[bool] = None + + +class GlobalVariableResponse(BaseModel): + id: UUID + key: str + value: Optional[str] + value_type: str + description: Optional[str] + is_secret: bool + created_at: datetime + updated_at: datetime + + class Config: + from_attributes = True +``` + +**Code for routers/global_variables.py:** +```python +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.orm import Session +from typing import List +from uuid import UUID +from app.core.database import get_db +from app.core.security import get_current_user +from app.models.user import User, UserRole +from app.models.global_variable import GlobalVariable +from app.schemas.global_variable import ( + GlobalVariableCreate, GlobalVariableUpdate, GlobalVariableResponse +) + +router = APIRouter(prefix="/api/global-variables", tags=["global-variables"]) + + +def require_admin(current_user: User = Depends(get_current_user)): + if current_user.role != UserRole.ADMIN: + raise HTTPException(status_code=403, detail="Admin required") + return current_user + + +@router.get("", response_model=List[GlobalVariableResponse]) +def list_global_variables( + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + variables = db.query(GlobalVariable).all() + # Mask secret values + for var in variables: + if var.is_secret: + var.value = "********" + return variables + + +@router.post("", response_model=GlobalVariableResponse) +def create_global_variable( + request: GlobalVariableCreate, + db: Session = Depends(get_db), + current_user: User = Depends(require_admin), +): + existing = db.query(GlobalVariable).filter(GlobalVariable.key == request.key).first() + if existing: + raise HTTPException(status_code=400, detail="Variable key already exists") + + variable = GlobalVariable(**request.model_dump()) + db.add(variable) + db.commit() + db.refresh(variable) + return variable + + +@router.put("/{variable_id}", response_model=GlobalVariableResponse) +def update_global_variable( + variable_id: UUID, + request: GlobalVariableUpdate, + db: Session = Depends(get_db), + current_user: User = Depends(require_admin), +): + variable = db.query(GlobalVariable).filter(GlobalVariable.id == variable_id).first() + if not variable: + raise HTTPException(status_code=404, detail="Variable not found") + + for key, value in request.model_dump(exclude_unset=True).items(): + setattr(variable, key, value) + + db.commit() + db.refresh(variable) + return variable + + +@router.delete("/{variable_id}") +def delete_global_variable( + variable_id: UUID, + db: Session = Depends(get_db), + current_user: User = Depends(require_admin), +): + variable = db.query(GlobalVariable).filter(GlobalVariable.id == variable_id).first() + if not variable: + raise HTTPException(status_code=404, detail="Variable not found") + + db.delete(variable) + db.commit() + return {"success": True} + + +@router.get("/by-key/{key}") +def get_variable_by_key( + key: str, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + variable = db.query(GlobalVariable).filter(GlobalVariable.key == key).first() + if not variable: + raise HTTPException(status_code=404, detail="Variable not found") + + return {"key": variable.key, "value": variable.value if not variable.is_secret else None} +``` + +**Add to main.py:** +```python +from app.routers import global_variables +app.include_router(global_variables.router) +``` + +**Commit:** `feat(fase4): add GlobalVariable API routes` + +--- + +## Task 8: Flow Templates Model and API + +**Files:** +- Create: `services/api-gateway/app/models/flow_template.py` +- Create: `services/api-gateway/app/schemas/flow_template.py` +- Create: `services/api-gateway/app/routers/flow_templates.py` +- Modify: `services/api-gateway/app/models/__init__.py` +- Modify: `services/api-gateway/app/main.py` + +**Code for flow_template.py:** +```python +import uuid +from datetime import datetime +from sqlalchemy import Column, String, Text, DateTime +from sqlalchemy.dialects.postgresql import UUID, JSONB +from app.core.database import Base + + +class FlowTemplate(Base): + __tablename__ = "flow_templates" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + name = Column(String(100), nullable=False) + description = Column(Text, nullable=True) + category = Column(String(50), default="general", nullable=False) + nodes = Column(JSONB, default=list) + edges = Column(JSONB, default=list) + variables = Column(JSONB, default=dict) + preview_image = Column(String(500), nullable=True) + is_public = Column(Boolean, default=True, nullable=False) + created_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) +``` + +Add missing imports to flow_template.py: +```python +from sqlalchemy import Column, String, Text, DateTime, Boolean, ForeignKey +``` + +**Code for schemas/flow_template.py:** +```python +from pydantic import BaseModel +from typing import Optional, List +from uuid import UUID +from datetime import datetime + + +class FlowTemplateCreate(BaseModel): + name: str + description: Optional[str] = None + category: str = "general" + nodes: List[dict] = [] + edges: List[dict] = [] + variables: dict = {} + is_public: bool = True + + +class FlowTemplateResponse(BaseModel): + id: UUID + name: str + description: Optional[str] + category: str + nodes: List[dict] + edges: List[dict] + variables: dict + preview_image: Optional[str] + is_public: bool + created_by: UUID + created_at: datetime + + class Config: + from_attributes = True +``` + +**Code for routers/flow_templates.py:** +```python +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.orm import Session +from typing import List +from uuid import UUID +from app.core.database import get_db +from app.core.security import get_current_user +from app.models.user import User +from app.models.flow_template import FlowTemplate +from app.models.flow import Flow +from app.schemas.flow_template import FlowTemplateCreate, FlowTemplateResponse + +router = APIRouter(prefix="/api/flow-templates", tags=["flow-templates"]) + + +@router.get("", response_model=List[FlowTemplateResponse]) +def list_templates( + category: str = None, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + query = db.query(FlowTemplate).filter(FlowTemplate.is_public == True) + if category: + query = query.filter(FlowTemplate.category == category) + return query.all() + + +@router.post("", response_model=FlowTemplateResponse) +def create_template( + request: FlowTemplateCreate, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + template = FlowTemplate( + **request.model_dump(), + created_by=current_user.id, + ) + db.add(template) + db.commit() + db.refresh(template) + return template + + +@router.post("/{template_id}/use") +def use_template( + template_id: UUID, + flow_name: str, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + """Create a new flow from template""" + template = db.query(FlowTemplate).filter(FlowTemplate.id == template_id).first() + if not template: + raise HTTPException(status_code=404, detail="Template not found") + + from app.models.flow import Flow, TriggerType + + flow = Flow( + name=flow_name, + description=f"Created from template: {template.name}", + trigger_type=TriggerType.KEYWORD, + nodes=template.nodes, + edges=template.edges, + variables=template.variables, + is_active=False, + ) + db.add(flow) + db.commit() + db.refresh(flow) + + return {"success": True, "flow_id": str(flow.id)} + + +@router.delete("/{template_id}") +def delete_template( + template_id: UUID, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + template = db.query(FlowTemplate).filter(FlowTemplate.id == template_id).first() + if not template: + raise HTTPException(status_code=404, detail="Template not found") + + if template.created_by != current_user.id: + raise HTTPException(status_code=403, detail="Not authorized") + + db.delete(template) + db.commit() + return {"success": True} +``` + +**Update models/__init__.py:** +```python +from app.models.flow_template import FlowTemplate +``` + +**Add to main.py:** +```python +from app.routers import flow_templates +app.include_router(flow_templates.router) +``` + +**Commit:** `feat(fase4): add FlowTemplate model and API` + +--- + +## Task 9: Refactor Engine to Use NodeRegistry + +**Files:** +- Modify: `services/flow-engine/app/engine.py` + +**Replace _execute_node method and add initialization:** + +```python +from typing import Optional +import httpx +from app.config import get_settings +from app.context import FlowContext +from app.nodes import NodeRegistry +from app.nodes.basic import ( + TriggerExecutor, MessageExecutor, ButtonsExecutor, + WaitInputExecutor, SetVariableExecutor, ConditionExecutor +) +from app.nodes.advanced import ( + SwitchExecutor, DelayExecutor, RandomExecutor, LoopExecutor, GoToExecutor +) +from app.nodes.validation import ( + ValidateEmailExecutor, ValidatePhoneExecutor, ValidateNumberExecutor, + ValidateDateExecutor, ValidateRegexExecutor, ValidateOptionsExecutor +) +from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor +from app.nodes.ai import AIResponseExecutor, AISentimentExecutor + +settings = get_settings() + + +class FlowEngine: + """Executes flow nodes based on incoming messages""" + + def __init__(self, db_session): + self.db = db_session + self._register_nodes() + + def _register_nodes(self): + """Register all available node executors""" + # Basic nodes + NodeRegistry.register("trigger", TriggerExecutor()) + NodeRegistry.register("message", MessageExecutor(self._send_message)) + NodeRegistry.register("buttons", ButtonsExecutor(self._send_message)) + NodeRegistry.register("wait_input", WaitInputExecutor()) + NodeRegistry.register("set_variable", SetVariableExecutor()) + NodeRegistry.register("condition", ConditionExecutor()) + + # Advanced nodes + NodeRegistry.register("switch", SwitchExecutor()) + NodeRegistry.register("delay", DelayExecutor()) + NodeRegistry.register("random", RandomExecutor()) + NodeRegistry.register("loop", LoopExecutor()) + NodeRegistry.register("goto", GoToExecutor()) + + # Validation nodes + NodeRegistry.register("validate_email", ValidateEmailExecutor()) + NodeRegistry.register("validate_phone", ValidatePhoneExecutor()) + NodeRegistry.register("validate_number", ValidateNumberExecutor()) + NodeRegistry.register("validate_date", ValidateDateExecutor()) + NodeRegistry.register("validate_regex", ValidateRegexExecutor()) + NodeRegistry.register("validate_options", ValidateOptionsExecutor()) + + # Script nodes + NodeRegistry.register("javascript", JavaScriptExecutor()) + NodeRegistry.register("http_request", HttpRequestExecutor()) + + # AI nodes + NodeRegistry.register("ai_response", AIResponseExecutor()) + NodeRegistry.register("ai_sentiment", AISentimentExecutor()) + + async def _execute_node( + self, + node_type: str, + config: dict, + context: FlowContext, + session + ) -> Optional[str]: + """Execute a single node using NodeRegistry""" + executor = NodeRegistry.get(node_type) + if executor: + result = await executor.execute(config, context, session) + + # Handle special results + if result == "goto" and context.get("_goto_node_id"): + session.current_node_id = context.get("_goto_node_id") + context.set("_goto_node_id", None) + + return result + + return "default" + + # ... rest of the class remains the same (process_message, _find_matching_flow, etc.) +``` + +**Commit:** `feat(fase4): refactor FlowEngine to use NodeRegistry` + +--- + +## Task 10: Frontend - Advanced Node Components + +**Files:** +- Modify: `frontend/src/pages/FlowBuilder.tsx` + +**Add new node components and update nodeTypes:** + +```tsx +// Add these new node components after existing ones + +const SwitchNode = ({ data }: { data: any }) => ( +
+ 🔀 Switch +
{data.config?.variable || 'variable'}
+
+); + +const DelayNode = ({ data }: { data: any }) => ( +
+ ⏱️ Delay +
{data.config?.seconds || 0}s
+
+); + +const RandomNode = ({ data }: { data: any }) => ( +
+ 🎲 Random +
A/B Test
+
+); + +const LoopNode = ({ data }: { data: any }) => ( +
+ 🔄 Loop +
max: {data.config?.max_iterations || 10}
+
+); + +const ValidateNode = ({ data }: { data: any }) => ( +
+ ✅ Validate +
{data.config?.type || 'email'}
+
+); + +const HttpNode = ({ data }: { data: any }) => ( +
+ 🌐 HTTP +
{data.config?.method || 'GET'}
+
+); + +const AINode = ({ data }: { data: any }) => ( +
+ 🤖 AI Response +
+); + +const SetVariableNode = ({ data }: { data: any }) => ( +
+ 📝 Variable +
{data.config?.variable || 'var'}
+
+); + +// Update nodeTypes +const nodeTypes: NodeTypes = { + trigger: TriggerNode, + message: MessageNode, + condition: ConditionNode, + wait_input: WaitInputNode, + switch: SwitchNode, + delay: DelayNode, + random: RandomNode, + loop: LoopNode, + validate: ValidateNode, + http_request: HttpNode, + ai_response: AINode, + set_variable: SetVariableNode, +}; +``` + +**Update the toolbar to add new node buttons:** +```tsx + + + + + + + + + + + + + + + +``` + +**Commit:** `feat(fase4): add advanced node components to FlowBuilder` + +--- + +## Task 11: Frontend - Global Variables Page + +**Files:** +- Create: `frontend/src/pages/GlobalVariables.tsx` + +**Code:** +```tsx +import { useState } from 'react'; +import { + Card, Table, Button, Modal, Form, Input, Select, Switch, Space, Tag, Popconfirm, message, Typography, +} from 'antd'; +import { PlusOutlined, EditOutlined, DeleteOutlined, LockOutlined } from '@ant-design/icons'; +import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query'; +import { apiClient } from '../api/client'; + +const { Title } = Typography; + +interface GlobalVariable { + id: string; + key: string; + value: string | null; + value_type: string; + description: string | null; + is_secret: boolean; +} + +export default function GlobalVariables() { + const [isModalOpen, setIsModalOpen] = useState(false); + const [selectedVar, setSelectedVar] = useState(null); + const [form] = Form.useForm(); + const queryClient = useQueryClient(); + + const { data: variables, isLoading } = useQuery({ + queryKey: ['global-variables'], + queryFn: () => apiClient.get('/api/global-variables'), + }); + + const createMutation = useMutation({ + mutationFn: (data: Partial) => apiClient.post('/api/global-variables', data), + onSuccess: () => { + message.success('Variable creada'); + queryClient.invalidateQueries({ queryKey: ['global-variables'] }); + closeModal(); + }, + }); + + const updateMutation = useMutation({ + mutationFn: ({ id, data }: { id: string; data: Partial }) => + apiClient.put(`/api/global-variables/${id}`, data), + onSuccess: () => { + message.success('Variable actualizada'); + queryClient.invalidateQueries({ queryKey: ['global-variables'] }); + closeModal(); + }, + }); + + const deleteMutation = useMutation({ + mutationFn: (id: string) => apiClient.delete(`/api/global-variables/${id}`), + onSuccess: () => { + message.success('Variable eliminada'); + queryClient.invalidateQueries({ queryKey: ['global-variables'] }); + }, + }); + + const closeModal = () => { + setIsModalOpen(false); + setSelectedVar(null); + form.resetFields(); + }; + + const handleSubmit = (values: Partial) => { + if (selectedVar) { + updateMutation.mutate({ id: selectedVar.id, data: values }); + } else { + createMutation.mutate(values); + } + }; + + const handleEdit = (variable: GlobalVariable) => { + setSelectedVar(variable); + form.setFieldsValue(variable); + setIsModalOpen(true); + }; + + const columns = [ + { + title: 'Clave', + dataIndex: 'key', + key: 'key', + render: (key: string, record: GlobalVariable) => ( + + {'{{global.' + key + '}}'} + {record.is_secret && } + + ), + }, + { + title: 'Valor', + dataIndex: 'value', + key: 'value', + render: (value: string, record: GlobalVariable) => + record.is_secret ? Secreto : value, + }, + { + title: 'Tipo', + dataIndex: 'value_type', + key: 'value_type', + render: (type: string) => {type}, + }, + { + title: 'Descripción', + dataIndex: 'description', + key: 'description', + }, + { + title: 'Acciones', + key: 'actions', + render: (_: unknown, record: GlobalVariable) => ( + + + + + + + + + +
+ + + + + + + + + + + + + + + + + + + + + + +
+ + ); +} +``` + +**Commit:** `feat(fase4): add GlobalVariables frontend page` + +--- + +## Task 12: Frontend - Flow Templates Page + +**Files:** +- Create: `frontend/src/pages/FlowTemplates.tsx` + +**Code:** +```tsx +import { useState } from 'react'; +import { + Card, Row, Col, Button, Modal, Input, Typography, Tag, Empty, message, +} from 'antd'; +import { PlusOutlined, CopyOutlined } from '@ant-design/icons'; +import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query'; +import { useNavigate } from 'react-router-dom'; +import { apiClient } from '../api/client'; + +const { Title, Text, Paragraph } = Typography; + +interface FlowTemplate { + id: string; + name: string; + description: string | null; + category: string; + nodes: any[]; +} + +export default function FlowTemplates() { + const [isCreateModalOpen, setIsCreateModalOpen] = useState(false); + const [selectedTemplate, setSelectedTemplate] = useState(null); + const [flowName, setFlowName] = useState(''); + const navigate = useNavigate(); + const queryClient = useQueryClient(); + + const { data: templates, isLoading } = useQuery({ + queryKey: ['flow-templates'], + queryFn: () => apiClient.get('/api/flow-templates'), + }); + + const useTemplateMutation = useMutation({ + mutationFn: async ({ templateId, name }: { templateId: string; name: string }) => { + const response = await apiClient.post<{ flow_id: string }>(`/api/flow-templates/${templateId}/use?flow_name=${encodeURIComponent(name)}`, {}); + return response; + }, + onSuccess: (data) => { + message.success('Flujo creado desde plantilla'); + setIsCreateModalOpen(false); + setSelectedTemplate(null); + setFlowName(''); + navigate(`/flows/${data.flow_id}`); + }, + }); + + const handleUseTemplate = (template: FlowTemplate) => { + setSelectedTemplate(template); + setFlowName(`${template.name} - Copia`); + setIsCreateModalOpen(true); + }; + + const categoryColors: Record = { + general: 'blue', + sales: 'green', + support: 'orange', + marketing: 'purple', + }; + + return ( +
+
+ Plantillas de Flujos +
+ + {isLoading ? ( + + ) : templates?.length === 0 ? ( + + ) : ( + + {templates?.map((template) => ( +
+ } onClick={() => handleUseTemplate(template)}> + Usar + , + ]} + > + + {template.category} + + {template.description || 'Sin descripción'} + + + {template.nodes.length} nodos + + + } + /> + + + ))} + + )} + + { setIsCreateModalOpen(false); setSelectedTemplate(null); }} + onOk={() => { + if (selectedTemplate && flowName) { + useTemplateMutation.mutate({ templateId: selectedTemplate.id, name: flowName }); + } + }} + okText="Crear" + confirmLoading={useTemplateMutation.isPending} + > +
+ Plantilla: + {selectedTemplate?.name} +
+ setFlowName(e.target.value)} + /> +
+ + ); +} +``` + +**Commit:** `feat(fase4): add FlowTemplates frontend page` + +--- + +## Task 13: Update MainLayout with New Routes + +**Files:** +- Modify: `frontend/src/layouts/MainLayout.tsx` + +**Add imports:** +```tsx +import { GlobalOutlined, AppstoreOutlined } from '@ant-design/icons'; +import GlobalVariables from '../pages/GlobalVariables'; +import FlowTemplates from '../pages/FlowTemplates'; +``` + +**Add menu items (after Flujos):** +```tsx + { + key: '/templates', + icon: , + label: 'Plantillas', + }, + { + key: '/variables', + icon: , + label: 'Variables', + }, +``` + +**Add routes:** +```tsx + } /> + } /> +``` + +**Commit:** `feat(fase4): add Templates and Variables routes to MainLayout` + +--- + +## Task 14: Update Docker Compose for OpenAI + +**Files:** +- Modify: `docker-compose.yml` + +**Add environment variable to flow-engine service:** +```yaml + flow-engine: + build: ./services/flow-engine + environment: + DATABASE_URL: postgresql://${DB_USER}:${DB_PASSWORD}@postgres:5432/whatsapp_central + REDIS_URL: redis://redis:6379 + API_GATEWAY_URL: http://api-gateway:8000 + OPENAI_API_KEY: ${OPENAI_API_KEY} + OPENAI_MODEL: ${OPENAI_MODEL:-gpt-3.5-turbo} +``` + +**Add to .env.example:** +``` +OPENAI_API_KEY=sk-your-api-key +OPENAI_MODEL=gpt-3.5-turbo +``` + +**Commit:** `feat(fase4): add OpenAI configuration to Docker Compose` + +--- + +## Task 15: Final Integration Commit + +**Final verification and commit marking Fase 4 complete.** + +**Commit:** `feat(fase4): complete Flow Engine Avanzado phase` + +--- + +## Summary + +This plan implements: + +1. **Node Architecture**: Modular NodeExecutor system with registry +2. **Basic Nodes**: Refactored trigger, message, buttons, wait_input, set_variable, condition +3. **Advanced Nodes**: switch, delay, random, loop, goto +4. **Validation Nodes**: email, phone, number, date, regex, options +5. **Script Nodes**: JavaScript (Python eval), HTTP request +6. **AI Nodes**: AI response (OpenAI), sentiment analysis +7. **Global Variables**: Database model, API, frontend management +8. **Flow Templates**: Reusable templates system +9. **Frontend**: New node components, GlobalVariables page, FlowTemplates page + +**Key Features:** +- A/B testing with random node (weighted distribution) +- AI-powered responses using OpenAI +- HTTP requests for external API integration +- Input validation with multiple types +- Reusable flow templates +- Global variables for shared configuration