diff --git a/services/flow-engine/app/nodes/__init__.py b/services/flow-engine/app/nodes/__init__.py new file mode 100644 index 0000000..b9ab663 --- /dev/null +++ b/services/flow-engine/app/nodes/__init__.py @@ -0,0 +1,5 @@ +from app.nodes.base import NodeExecutor, NodeRegistry +from app.nodes.basic import ( + TriggerExecutor, MessageExecutor, ButtonsExecutor, + WaitInputExecutor, SetVariableExecutor, ConditionExecutor +) diff --git a/services/flow-engine/app/nodes/base.py b/services/flow-engine/app/nodes/base.py new file mode 100644 index 0000000..7310f4e --- /dev/null +++ b/services/flow-engine/app/nodes/base.py @@ -0,0 +1,34 @@ +from abc import ABC, abstractmethod +from typing import Optional, Any +from app.context import FlowContext + + +class NodeExecutor(ABC): + """Base class for all node executors""" + + @abstractmethod + async def execute( + self, + config: dict, + context: FlowContext, + session: Any + ) -> Optional[str]: + """ + Execute the node logic. + Returns: branch name for routing (e.g., "default", "true", "false") + or "wait" to pause execution + """ + pass + + +class NodeRegistry: + """Registry of all available node executors""" + _executors: dict = {} + + @classmethod + def register(cls, node_type: str, executor: NodeExecutor): + cls._executors[node_type] = executor + + @classmethod + def get(cls, node_type: str) -> Optional[NodeExecutor]: + return cls._executors.get(node_type) diff --git a/services/flow-engine/app/nodes/basic.py b/services/flow-engine/app/nodes/basic.py new file mode 100644 index 0000000..e3c3305 --- /dev/null +++ b/services/flow-engine/app/nodes/basic.py @@ -0,0 +1,81 @@ +from typing import Optional, Any +from app.nodes.base import NodeExecutor +from app.context import FlowContext + + +class TriggerExecutor(NodeExecutor): + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + return "default" + + +class MessageExecutor(NodeExecutor): + def __init__(self, send_message_fn): + self.send_message = send_message_fn + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + text = context.interpolate(config.get("text", "")) + await self.send_message(session.conversation_id, text) + return "default" + + +class ButtonsExecutor(NodeExecutor): + def __init__(self, send_message_fn): + self.send_message = send_message_fn + + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + text = context.interpolate(config.get("text", "")) + buttons = config.get("buttons", []) + button_text = "\n".join([f"• {b['label']}" for b in buttons]) + await self.send_message(session.conversation_id, f"{text}\n\n{button_text}") + return "wait" + + +class WaitInputExecutor(NodeExecutor): + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + variable = config.get("variable", "user_input") + context.set(variable, context.message.get("content", "")) + return "default" + + +class SetVariableExecutor(NodeExecutor): + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + var_name = config.get("variable", "") + var_value = context.interpolate(config.get("value", "")) + context.set(var_name, var_value) + return "default" + + +class ConditionExecutor(NodeExecutor): + async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]: + conditions = config.get("conditions", []) + + for cond in conditions: + field = cond.get("field", "") + operator = cond.get("operator", "equals") + value = cond.get("value", "") + branch = cond.get("branch", "default") + + actual = context.get(field) or context.message.get("content", "") + + if operator == "equals" and str(actual).lower() == str(value).lower(): + return branch + elif operator == "contains" and str(value).lower() in str(actual).lower(): + return branch + elif operator == "starts_with" and str(actual).lower().startswith(str(value).lower()): + return branch + elif operator == "not_equals" and str(actual).lower() != str(value).lower(): + return branch + elif operator == "greater_than": + try: + if float(actual) > float(value): + return branch + except ValueError: + pass + elif operator == "less_than": + try: + if float(actual) < float(value): + return branch + except ValueError: + pass + + return "default"