# Fase 2: Flow Engine Básico - Plan de Implementación > **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. **Goal:** Crear el motor de chatbot con Flow Builder visual para automatizar conversaciones de WhatsApp. **Architecture:** Flow Engine (Python) procesa mensajes entrantes, evalúa triggers, ejecuta nodos y gestiona estado. Frontend usa React Flow para editor visual drag & drop. Los flujos se almacenan como JSON en PostgreSQL. **Tech Stack:** Python 3.11, FastAPI, React Flow, Zustand, PostgreSQL JSONB, Redis para estado de sesiones. --- ## Task 1: Database Models para Flows **Files:** - Create: `services/api-gateway/app/models/flow.py` - Modify: `services/api-gateway/app/models/__init__.py` **Step 1: Create flow.py** ```python import uuid from datetime import datetime from sqlalchemy import Column, String, Boolean, DateTime, Text, Integer, Enum as SQLEnum from sqlalchemy.dialects.postgresql import UUID, JSONB import enum from app.core.database import Base class TriggerType(str, enum.Enum): WELCOME = "welcome" KEYWORD = "keyword" FALLBACK = "fallback" EVENT = "event" MANUAL = "manual" class Flow(Base): __tablename__ = "flows" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) name = Column(String(100), nullable=False) description = Column(Text, nullable=True) trigger_type = Column(SQLEnum(TriggerType), nullable=False) trigger_value = Column(String(255), nullable=True) # keywords, event name, etc. nodes = Column(JSONB, default=list) # Array of node definitions edges = Column(JSONB, default=list) # React Flow edges variables = Column(JSONB, default=dict) # Flow-level variables is_active = Column(Boolean, default=False, nullable=False) version = Column(Integer, default=1, nullable=False) created_at = Column(DateTime, default=datetime.utcnow, nullable=False) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) class FlowSession(Base): """Active flow execution state per conversation""" __tablename__ = "flow_sessions" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) conversation_id = Column(UUID(as_uuid=True), nullable=False, index=True) flow_id = Column(UUID(as_uuid=True), nullable=False) current_node_id = Column(String(100), nullable=True) variables = Column(JSONB, default=dict) # Session variables waiting_for_input = Column(Boolean, default=False) created_at = Column(DateTime, default=datetime.utcnow, nullable=False) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) ``` **Step 2: Update models/__init__.py** ```python from app.models.user import User from app.models.whatsapp import WhatsAppAccount, Contact, Conversation, Message from app.models.flow import Flow, FlowSession, TriggerType __all__ = [ "User", "WhatsAppAccount", "Contact", "Conversation", "Message", "Flow", "FlowSession", "TriggerType" ] ``` **Step 3: Commit** ```bash git add services/api-gateway/app/models/ git commit -m "feat(api-gateway): add Flow and FlowSession database models" ``` --- ## Task 2: Flow API Schemas **Files:** - Create: `services/api-gateway/app/schemas/flow.py` - Modify: `services/api-gateway/app/schemas/__init__.py` **Step 1: Create flow.py** ```python from pydantic import BaseModel from typing import Optional, List, Any from uuid import UUID from datetime import datetime from app.models.flow import TriggerType class NodeData(BaseModel): label: str type: str # message, condition, wait_input, buttons, etc. config: dict = {} class FlowNode(BaseModel): id: str type: str position: dict # {x: number, y: number} data: NodeData class FlowEdge(BaseModel): id: str source: str target: str sourceHandle: Optional[str] = None targetHandle: Optional[str] = None class FlowCreate(BaseModel): name: str description: Optional[str] = None trigger_type: TriggerType trigger_value: Optional[str] = None class FlowUpdate(BaseModel): name: Optional[str] = None description: Optional[str] = None trigger_type: Optional[TriggerType] = None trigger_value: Optional[str] = None nodes: Optional[List[dict]] = None edges: Optional[List[dict]] = None variables: Optional[dict] = None is_active: Optional[bool] = None class FlowResponse(BaseModel): id: UUID name: str description: Optional[str] trigger_type: TriggerType trigger_value: Optional[str] nodes: List[dict] edges: List[dict] variables: dict is_active: bool version: int created_at: datetime updated_at: datetime class Config: from_attributes = True class FlowListResponse(BaseModel): id: UUID name: str trigger_type: TriggerType is_active: bool version: int updated_at: datetime class Config: from_attributes = True ``` **Step 2: Update schemas/__init__.py** ```python from app.schemas import auth, whatsapp, flow __all__ = ["auth", "whatsapp", "flow"] ``` **Step 3: Commit** ```bash git add services/api-gateway/app/schemas/ git commit -m "feat(api-gateway): add Flow API schemas" ``` --- ## Task 3: Flow API Routes **Files:** - Create: `services/api-gateway/app/routers/flows.py` - Modify: `services/api-gateway/app/routers/__init__.py` - Modify: `services/api-gateway/app/main.py` **Step 1: Create flows.py** ```python from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from typing import List from uuid import UUID from app.core.database import get_db from app.core.security import get_current_user from app.models.user import User, UserRole from app.models.flow import Flow, TriggerType from app.schemas.flow import FlowCreate, FlowUpdate, FlowResponse, FlowListResponse router = APIRouter(prefix="/api/flows", tags=["flows"]) def require_admin(current_user: User = Depends(get_current_user)): if current_user.role != UserRole.ADMIN: raise HTTPException(status_code=403, detail="Admin required") return current_user @router.get("", response_model=List[FlowListResponse]) def list_flows( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): flows = db.query(Flow).order_by(Flow.updated_at.desc()).all() return flows @router.post("", response_model=FlowResponse) def create_flow( request: FlowCreate, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): flow = Flow( name=request.name, description=request.description, trigger_type=request.trigger_type, trigger_value=request.trigger_value, nodes=[], edges=[], variables={}, ) db.add(flow) db.commit() db.refresh(flow) return flow @router.get("/{flow_id}", response_model=FlowResponse) def get_flow( flow_id: UUID, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): flow = db.query(Flow).filter(Flow.id == flow_id).first() if not flow: raise HTTPException(status_code=404, detail="Flow not found") return flow @router.put("/{flow_id}", response_model=FlowResponse) def update_flow( flow_id: UUID, request: FlowUpdate, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): flow = db.query(Flow).filter(Flow.id == flow_id).first() if not flow: raise HTTPException(status_code=404, detail="Flow not found") update_data = request.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(flow, field, value) flow.version += 1 db.commit() db.refresh(flow) return flow @router.delete("/{flow_id}") def delete_flow( flow_id: UUID, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): flow = db.query(Flow).filter(Flow.id == flow_id).first() if not flow: raise HTTPException(status_code=404, detail="Flow not found") db.delete(flow) db.commit() return {"success": True} @router.post("/{flow_id}/activate") def activate_flow( flow_id: UUID, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): flow = db.query(Flow).filter(Flow.id == flow_id).first() if not flow: raise HTTPException(status_code=404, detail="Flow not found") # Deactivate other flows with same trigger if welcome/fallback if flow.trigger_type in [TriggerType.WELCOME, TriggerType.FALLBACK]: db.query(Flow).filter( Flow.trigger_type == flow.trigger_type, Flow.id != flow_id ).update({"is_active": False}) flow.is_active = True db.commit() return {"success": True} @router.post("/{flow_id}/deactivate") def deactivate_flow( flow_id: UUID, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): flow = db.query(Flow).filter(Flow.id == flow_id).first() if not flow: raise HTTPException(status_code=404, detail="Flow not found") flow.is_active = False db.commit() return {"success": True} ``` **Step 2: Update routers/__init__.py** ```python from app.routers import auth, whatsapp, flows __all__ = ["auth", "whatsapp", "flows"] ``` **Step 3: Update main.py - add import and router** Add to imports: ```python from app.routers import auth, whatsapp, flows ``` Add router: ```python app.include_router(flows.router) ``` **Step 4: Commit** ```bash git add services/api-gateway/app/routers/ services/api-gateway/app/main.py git commit -m "feat(api-gateway): add Flow CRUD API routes" ``` --- ## Task 4: Flow Engine Service Setup **Files:** - Create: `services/flow-engine/requirements.txt` - Create: `services/flow-engine/Dockerfile` - Create: `services/flow-engine/app/__init__.py` - Create: `services/flow-engine/app/config.py` **Step 1: Create requirements.txt** ``` fastapi==0.115.6 uvicorn[standard]==0.34.0 sqlalchemy==2.0.36 psycopg2-binary==2.9.10 redis==5.2.1 httpx==0.28.1 pydantic==2.10.4 pydantic-settings==2.7.1 ``` **Step 2: Create Dockerfile** ```dockerfile FROM python:3.11-slim WORKDIR /app RUN apt-get update && apt-get install -y --no-install-recommends \ gcc \ libpq-dev \ && rm -rf /var/lib/apt/lists/* COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY app ./app EXPOSE 8001 CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8001"] ``` **Step 3: Create app/__init__.py** ```python # WhatsApp Centralizado - Flow Engine ``` **Step 4: Create app/config.py** ```python from pydantic_settings import BaseSettings from functools import lru_cache class Settings(BaseSettings): DATABASE_URL: str = "postgresql://whatsapp_admin:password@localhost:5432/whatsapp_central" REDIS_URL: str = "redis://localhost:6379" API_GATEWAY_URL: str = "http://localhost:8000" WHATSAPP_CORE_URL: str = "http://localhost:3001" class Config: env_file = ".env" @lru_cache def get_settings() -> Settings: return Settings() ``` **Step 5: Commit** ```bash git add services/flow-engine/ git commit -m "feat(flow-engine): setup project structure" ``` --- ## Task 5: Flow Engine Core **Files:** - Create: `services/flow-engine/app/engine.py` - Create: `services/flow-engine/app/context.py` **Step 1: Create context.py** ```python from typing import Any, Optional from datetime import datetime import re class FlowContext: """Manages variables and state during flow execution""" def __init__( self, contact: dict, conversation: dict, message: dict, session_vars: dict = None ): self.contact = contact self.conversation = conversation self.message = message self.variables = session_vars or {} self._system = { "date": datetime.now().strftime("%Y-%m-%d"), "time": datetime.now().strftime("%H:%M"), "day_of_week": datetime.now().strftime("%A"), } def get(self, key: str) -> Any: """Get variable by dot notation: contact.name, variables.email""" parts = key.split(".") if parts[0] == "contact": return self._get_nested(self.contact, parts[1:]) elif parts[0] == "conversation": return self._get_nested(self.conversation, parts[1:]) elif parts[0] == "message": return self._get_nested(self.message, parts[1:]) elif parts[0] == "system": return self._get_nested(self._system, parts[1:]) elif parts[0] == "variables": return self._get_nested(self.variables, parts[1:]) else: return self.variables.get(key) def set(self, key: str, value: Any): """Set a session variable""" self.variables[key] = value def _get_nested(self, obj: dict, keys: list) -> Any: for key in keys: if isinstance(obj, dict): obj = obj.get(key) else: return None return obj def interpolate(self, text: str) -> str: """Replace {{variable}} with actual values""" pattern = r'\{\{([^}]+)\}\}' def replace(match): key = match.group(1).strip() value = self.get(key) return str(value) if value is not None else "" return re.sub(pattern, replace, text) def to_dict(self) -> dict: return self.variables.copy() ``` **Step 2: Create engine.py** ```python from typing import Optional, Tuple import httpx from app.config import get_settings from app.context import FlowContext settings = get_settings() class FlowEngine: """Executes flow nodes based on incoming messages""" def __init__(self, db_session): self.db = db_session async def process_message( self, conversation_id: str, contact: dict, conversation: dict, message: dict, ) -> bool: """ Process incoming message through flow engine. Returns True if handled by a flow, False otherwise. """ from app.models import Flow, FlowSession, TriggerType # Check for active flow session session = self.db.query(FlowSession).filter( FlowSession.conversation_id == conversation_id ).first() if session and session.waiting_for_input: # Continue existing flow flow = self.db.query(Flow).filter(Flow.id == session.flow_id).first() if flow: context = FlowContext(contact, conversation, message, session.variables) await self._execute_from_node(flow, session, context) return True # Find matching flow by trigger flow = self._find_matching_flow(message.get("content", "")) if flow: context = FlowContext(contact, conversation, message) session = FlowSession( conversation_id=conversation_id, flow_id=flow.id, variables={}, ) self.db.add(session) self.db.commit() await self._execute_flow(flow, session, context) return True return False def _find_matching_flow(self, message_text: str) -> Optional["Flow"]: from app.models import Flow, TriggerType message_lower = message_text.lower().strip() # Check keyword triggers keyword_flows = self.db.query(Flow).filter( Flow.trigger_type == TriggerType.KEYWORD, Flow.is_active == True ).all() for flow in keyword_flows: keywords = [k.strip().lower() for k in (flow.trigger_value or "").split(",")] if any(kw in message_lower for kw in keywords if kw): return flow # Check welcome trigger (first message - simplified) welcome_flow = self.db.query(Flow).filter( Flow.trigger_type == TriggerType.WELCOME, Flow.is_active == True ).first() if welcome_flow: return welcome_flow # Fallback return self.db.query(Flow).filter( Flow.trigger_type == TriggerType.FALLBACK, Flow.is_active == True ).first() async def _execute_flow(self, flow, session, context: FlowContext): """Start flow execution from first node""" nodes = flow.nodes or [] if not nodes: return # Find start node (trigger node) start_node = next( (n for n in nodes if n.get("data", {}).get("type") == "trigger"), nodes[0] if nodes else None ) if start_node: session.current_node_id = start_node["id"] await self._execute_from_node(flow, session, context) async def _execute_from_node(self, flow, session, context: FlowContext): """Execute flow starting from current node""" nodes = {n["id"]: n for n in (flow.nodes or [])} edges = flow.edges or [] current_id = session.current_node_id while current_id: node = nodes.get(current_id) if not node: break node_type = node.get("data", {}).get("type", "") config = node.get("data", {}).get("config", {}) # Execute node result = await self._execute_node(node_type, config, context, session) if result == "wait": # Node requires input, save state and exit session.waiting_for_input = True session.variables = context.to_dict() self.db.commit() return # Find next node next_edge = self._find_next_edge(edges, current_id, result) current_id = next_edge["target"] if next_edge else None session.current_node_id = current_id session.waiting_for_input = False # Flow completed session.variables = context.to_dict() self.db.commit() async def _execute_node( self, node_type: str, config: dict, context: FlowContext, session ) -> Optional[str]: """Execute a single node, return result for routing""" if node_type == "trigger": return "default" elif node_type == "message": text = context.interpolate(config.get("text", "")) await self._send_message(session.conversation_id, text) return "default" elif node_type == "buttons": text = context.interpolate(config.get("text", "")) buttons = config.get("buttons", []) # For now, send as text with options button_text = "\n".join([f"• {b['label']}" for b in buttons]) await self._send_message(session.conversation_id, f"{text}\n\n{button_text}") return "wait" elif node_type == "wait_input": variable = config.get("variable", "user_input") context.set(variable, context.message.get("content", "")) return "default" elif node_type == "condition": return self._evaluate_condition(config, context) elif node_type == "set_variable": var_name = config.get("variable", "") var_value = context.interpolate(config.get("value", "")) context.set(var_name, var_value) return "default" return "default" def _evaluate_condition(self, config: dict, context: FlowContext) -> str: """Evaluate condition and return branch name""" conditions = config.get("conditions", []) for cond in conditions: field = cond.get("field", "") operator = cond.get("operator", "equals") value = cond.get("value", "") branch = cond.get("branch", "default") actual = context.get(field) or context.message.get("content", "") if operator == "equals" and str(actual).lower() == str(value).lower(): return branch elif operator == "contains" and str(value).lower() in str(actual).lower(): return branch elif operator == "starts_with" and str(actual).lower().startswith(str(value).lower()): return branch return "default" def _find_next_edge(self, edges: list, source_id: str, handle: str = None) -> Optional[dict]: """Find the next edge from source node""" for edge in edges: if edge.get("source") == source_id: if handle and edge.get("sourceHandle") != handle: continue return edge return None async def _send_message(self, conversation_id: str, text: str): """Send message via API Gateway""" async with httpx.AsyncClient() as client: try: await client.post( f"{settings.API_GATEWAY_URL}/api/internal/flow/send", json={ "conversation_id": conversation_id, "content": text, "type": "text" }, timeout=30 ) except Exception as e: print(f"Failed to send message: {e}") ``` **Step 3: Commit** ```bash git add services/flow-engine/app/ git commit -m "feat(flow-engine): add FlowContext and FlowEngine core" ``` --- ## Task 6: Flow Engine Main & API **Files:** - Create: `services/flow-engine/app/main.py` - Create: `services/flow-engine/app/models.py` **Step 1: Create models.py (shared models import)** ```python from sqlalchemy import Column, String, Boolean, DateTime, Text, Integer, Enum as SQLEnum, create_engine from sqlalchemy.dialects.postgresql import UUID, JSONB from sqlalchemy.orm import sessionmaker, declarative_base import enum import uuid from datetime import datetime from app.config import get_settings settings = get_settings() engine = create_engine(settings.DATABASE_URL, pool_pre_ping=True) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) Base = declarative_base() class TriggerType(str, enum.Enum): WELCOME = "welcome" KEYWORD = "keyword" FALLBACK = "fallback" EVENT = "event" MANUAL = "manual" class Flow(Base): __tablename__ = "flows" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) name = Column(String(100), nullable=False) description = Column(Text, nullable=True) trigger_type = Column(SQLEnum(TriggerType), nullable=False) trigger_value = Column(String(255), nullable=True) nodes = Column(JSONB, default=list) edges = Column(JSONB, default=list) variables = Column(JSONB, default=dict) is_active = Column(Boolean, default=False, nullable=False) version = Column(Integer, default=1, nullable=False) created_at = Column(DateTime, default=datetime.utcnow, nullable=False) updated_at = Column(DateTime, default=datetime.utcnow, nullable=False) class FlowSession(Base): __tablename__ = "flow_sessions" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) conversation_id = Column(UUID(as_uuid=True), nullable=False, index=True) flow_id = Column(UUID(as_uuid=True), nullable=False) current_node_id = Column(String(100), nullable=True) variables = Column(JSONB, default=dict) waiting_for_input = Column(Boolean, default=False) created_at = Column(DateTime, default=datetime.utcnow, nullable=False) updated_at = Column(DateTime, default=datetime.utcnow, nullable=False) def get_db(): db = SessionLocal() try: yield db finally: db.close() ``` **Step 2: Create main.py** ```python from fastapi import FastAPI, Depends from pydantic import BaseModel from typing import Optional from sqlalchemy.orm import Session from app.models import get_db from app.engine import FlowEngine from app.config import get_settings settings = get_settings() app = FastAPI( title="WhatsApp Centralizado - Flow Engine", version="1.0.0", ) class ProcessMessageRequest(BaseModel): conversation_id: str contact: dict conversation: dict message: dict class ProcessMessageResponse(BaseModel): handled: bool flow_id: Optional[str] = None @app.get("/health") def health_check(): return {"status": "ok", "service": "flow-engine"} @app.post("/process", response_model=ProcessMessageResponse) async def process_message( request: ProcessMessageRequest, db: Session = Depends(get_db), ): """Process an incoming message through the flow engine""" engine = FlowEngine(db) handled = await engine.process_message( conversation_id=request.conversation_id, contact=request.contact, conversation=request.conversation, message=request.message, ) return ProcessMessageResponse(handled=handled) ``` **Step 3: Commit** ```bash git add services/flow-engine/app/ git commit -m "feat(flow-engine): add main API and database models" ``` --- ## Task 7: Update Docker Compose **Files:** - Modify: `docker-compose.yml` **Step 1: Add flow-engine service** Add after api-gateway service: ```yaml flow-engine: build: context: ./services/flow-engine dockerfile: Dockerfile container_name: wac_flow_engine restart: unless-stopped environment: DATABASE_URL: postgresql://${DB_USER:-whatsapp_admin}:${DB_PASSWORD}@postgres:5432/${DB_NAME:-whatsapp_central} REDIS_URL: redis://redis:6379 API_GATEWAY_URL: http://api-gateway:8000 WHATSAPP_CORE_URL: http://whatsapp-core:3001 depends_on: postgres: condition: service_healthy redis: condition: service_healthy networks: - wac_network ``` **Step 2: Commit** ```bash git add docker-compose.yml git commit -m "feat(docker): add flow-engine service" ``` --- ## Task 8: API Gateway - Internal Flow Routes **Files:** - Modify: `services/api-gateway/app/routers/whatsapp.py` **Step 1: Add internal endpoint for flow messages** Add at the end of whatsapp.py: ```python class FlowSendRequest(BaseModel): conversation_id: str content: str type: str = "text" @router.post("/internal/flow/send") async def flow_send_message( request: FlowSendRequest, db: Session = Depends(get_db), ): """Internal endpoint for flow engine to send messages""" conversation = db.query(Conversation).filter( Conversation.id == request.conversation_id ).first() if not conversation: raise HTTPException(status_code=404, detail="Conversation not found") # Create message in DB message = Message( conversation_id=conversation.id, direction=MessageDirection.OUTBOUND, type=MessageType.TEXT, content=request.content, status=MessageStatus.PENDING, ) db.add(message) db.commit() db.refresh(message) # Send via WhatsApp Core async with httpx.AsyncClient() as client: try: response = await client.post( f"{settings.WHATSAPP_CORE_URL}/api/sessions/{conversation.whatsapp_account_id}/messages", json={ "to": conversation.contact.phone_number, "type": "text", "content": {"text": request.content}, }, timeout=30, ) if response.status_code == 200: data = response.json() message.whatsapp_message_id = data.get("messageId") message.status = MessageStatus.SENT else: message.status = MessageStatus.FAILED except Exception: message.status = MessageStatus.FAILED db.commit() return {"success": True, "message_id": str(message.id)} ``` **Step 2: Add Pydantic import for FlowSendRequest** Add to imports at top of whatsapp.py: ```python from pydantic import BaseModel ``` **Step 3: Commit** ```bash git add services/api-gateway/app/routers/whatsapp.py git commit -m "feat(api-gateway): add internal flow send message endpoint" ``` --- ## Task 9: Integrate Flow Engine with Message Handler **Files:** - Modify: `services/api-gateway/app/routers/whatsapp.py` **Step 1: Update handle_whatsapp_event to call flow engine** In the `elif event.type == "message":` block, after creating the message, add flow engine call: ```python # After creating message and before db.commit() # Call flow engine try: async with httpx.AsyncClient() as client: await client.post( "http://flow-engine:8001/process", json={ "conversation_id": str(conversation.id), "contact": { "id": str(contact.id), "phone_number": contact.phone_number, "name": contact.name, }, "conversation": { "id": str(conversation.id), "status": conversation.status.value, }, "message": { "id": str(message.id), "content": content, "type": "text", }, }, timeout=30, ) except Exception as e: print(f"Flow engine error: {e}") ``` **Step 2: Commit** ```bash git add services/api-gateway/app/routers/whatsapp.py git commit -m "feat(api-gateway): integrate flow engine on message events" ``` --- ## Task 10: Frontend - Flow Builder Page Setup **Files:** - Modify: `frontend/package.json` - Create: `frontend/src/pages/FlowBuilder.tsx` - Modify: `frontend/src/layouts/MainLayout.tsx` **Step 1: Add reactflow dependency to package.json** Add to dependencies: ```json "reactflow": "^11.11.4" ``` **Step 2: Create FlowBuilder.tsx** ```tsx import { useState, useCallback } from 'react'; import { useParams, useNavigate } from 'react-router-dom'; import ReactFlow, { Node, Edge, Controls, Background, useNodesState, useEdgesState, addEdge, Connection, NodeTypes, } from 'reactflow'; import 'reactflow/dist/style.css'; import { Button, message, Space } from 'antd'; import { SaveOutlined, ArrowLeftOutlined } from '@ant-design/icons'; import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query'; import { apiClient } from '../api/client'; // Custom node components (simplified for now) const TriggerNode = ({ data }: { data: any }) => (