diff --git a/services/flow-engine/app/engine.py b/services/flow-engine/app/engine.py index adb0791..dcc11b0 100644 --- a/services/flow-engine/app/engine.py +++ b/services/flow-engine/app/engine.py @@ -2,10 +2,69 @@ from typing import Optional import httpx from app.config import get_settings from app.context import FlowContext +from app.nodes import NodeRegistry +from app.nodes.basic import ( + TriggerExecutor, MessageExecutor, ButtonsExecutor, + WaitInputExecutor, SetVariableExecutor, ConditionExecutor +) +from app.nodes.advanced import ( + SwitchExecutor, DelayExecutor, RandomExecutor, LoopExecutor, GoToExecutor +) +from app.nodes.validation import ( + ValidateEmailExecutor, ValidatePhoneExecutor, ValidateNumberExecutor, + ValidateDateExecutor, ValidateRegexExecutor, ValidateOptionsExecutor +) +from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor +from app.nodes.ai import AIResponseExecutor, AISentimentExecutor settings = get_settings() +async def send_message(conversation_id: str, text: str): + """Module-level send message function for use by node executors""" + async with httpx.AsyncClient() as client: + try: + await client.post( + f"{settings.API_GATEWAY_URL}/api/internal/flow/send", + json={ + "conversation_id": str(conversation_id), + "content": text, + "type": "text" + }, + timeout=30 + ) + except Exception as e: + print(f"Failed to send message: {e}") + + +def _register_executors(): + """Register all node executors with the NodeRegistry""" + NodeRegistry.register("trigger", TriggerExecutor()) + NodeRegistry.register("message", MessageExecutor(send_message)) + NodeRegistry.register("buttons", ButtonsExecutor(send_message)) + NodeRegistry.register("wait_input", WaitInputExecutor()) + NodeRegistry.register("set_variable", SetVariableExecutor()) + NodeRegistry.register("condition", ConditionExecutor()) + NodeRegistry.register("switch", SwitchExecutor()) + NodeRegistry.register("delay", DelayExecutor()) + NodeRegistry.register("random", RandomExecutor()) + NodeRegistry.register("loop", LoopExecutor()) + NodeRegistry.register("goto", GoToExecutor()) + NodeRegistry.register("validate_email", ValidateEmailExecutor()) + NodeRegistry.register("validate_phone", ValidatePhoneExecutor()) + NodeRegistry.register("validate_number", ValidateNumberExecutor()) + NodeRegistry.register("validate_date", ValidateDateExecutor()) + NodeRegistry.register("validate_regex", ValidateRegexExecutor()) + NodeRegistry.register("validate_options", ValidateOptionsExecutor()) + NodeRegistry.register("javascript", JavaScriptExecutor()) + NodeRegistry.register("http_request", HttpRequestExecutor()) + NodeRegistry.register("ai_response", AIResponseExecutor()) + NodeRegistry.register("ai_sentiment", AISentimentExecutor()) + + +_register_executors() + + class FlowEngine: """Executes flow nodes based on incoming messages""" @@ -140,58 +199,11 @@ class FlowEngine: context: FlowContext, session ) -> Optional[str]: - """Execute a single node, return result for routing""" - - if node_type == "trigger": - return "default" - - elif node_type == "message": - text = context.interpolate(config.get("text", "")) - await self._send_message(session.conversation_id, text) - return "default" - - elif node_type == "buttons": - text = context.interpolate(config.get("text", "")) - buttons = config.get("buttons", []) - button_text = "\n".join([f"• {b['label']}" for b in buttons]) - await self._send_message(session.conversation_id, f"{text}\n\n{button_text}") - return "wait" - - elif node_type == "wait_input": - variable = config.get("variable", "user_input") - context.set(variable, context.message.get("content", "")) - return "default" - - elif node_type == "condition": - return self._evaluate_condition(config, context) - - elif node_type == "set_variable": - var_name = config.get("variable", "") - var_value = context.interpolate(config.get("value", "")) - context.set(var_name, var_value) - return "default" - - return "default" - - def _evaluate_condition(self, config: dict, context: FlowContext) -> str: - """Evaluate condition and return branch name""" - conditions = config.get("conditions", []) - - for cond in conditions: - field = cond.get("field", "") - operator = cond.get("operator", "equals") - value = cond.get("value", "") - branch = cond.get("branch", "default") - - actual = context.get(field) or context.message.get("content", "") - - if operator == "equals" and str(actual).lower() == str(value).lower(): - return branch - elif operator == "contains" and str(value).lower() in str(actual).lower(): - return branch - elif operator == "starts_with" and str(actual).lower().startswith(str(value).lower()): - return branch - + """Execute a single node using registered executor""" + executor = NodeRegistry.get(node_type) + if executor: + return await executor.execute(config, context, session) + print(f"Warning: Unknown node type '{node_type}'") return "default" def _find_next_edge(self, edges: list, source_id: str, handle: str = None) -> Optional[dict]: @@ -202,19 +214,3 @@ class FlowEngine: continue return edge return None - - async def _send_message(self, conversation_id: str, text: str): - """Send message via API Gateway""" - async with httpx.AsyncClient() as client: - try: - await client.post( - f"{settings.API_GATEWAY_URL}/api/internal/flow/send", - json={ - "conversation_id": str(conversation_id), - "content": text, - "type": "text" - }, - timeout=30 - ) - except Exception as e: - print(f"Failed to send message: {e}")