diff --git a/services/flow-engine/app/nodes/__init__.py b/services/flow-engine/app/nodes/__init__.py index 8d0a748..d1949e2 100644 --- a/services/flow-engine/app/nodes/__init__.py +++ b/services/flow-engine/app/nodes/__init__.py @@ -16,3 +16,11 @@ from app.nodes.basic import ( WaitInputExecutor, ) from app.nodes.script import HttpRequestExecutor, JavaScriptExecutor +from app.nodes.validation import ( + ValidateDateExecutor, + ValidateEmailExecutor, + ValidateNumberExecutor, + ValidateOptionsExecutor, + ValidatePhoneExecutor, + ValidateRegexExecutor, +) diff --git a/services/flow-engine/app/nodes/advanced.py b/services/flow-engine/app/nodes/advanced.py new file mode 100644 index 0000000..7f846e4 --- /dev/null +++ b/services/flow-engine/app/nodes/advanced.py @@ -0,0 +1,110 @@ +import asyncio +import random as random_module +from typing import Any, Optional + +from app.context import FlowContext +from app.nodes.base import NodeExecutor + + +class SwitchExecutor(NodeExecutor): + """Multi-branch switch based on variable value""" + + async def execute( + self, config: dict, context: FlowContext, session: Any + ) -> Optional[str]: + variable = config.get("variable", "") + cases = config.get("cases", []) + + actual = context.get(variable) or context.message.get("content", "") + actual_str = str(actual).lower().strip() + + for case in cases: + case_value = str(case.get("value", "")).lower().strip() + if actual_str == case_value: + return case.get("branch", "default") + + return config.get("default_branch", "default") + + +class DelayExecutor(NodeExecutor): + """Wait for a specified duration before continuing""" + + async def execute( + self, config: dict, context: FlowContext, session: Any + ) -> Optional[str]: + delay_seconds = config.get("seconds", 0) + delay_type = config.get("type", "fixed") + + if delay_type == "random": + min_delay = config.get("min_seconds", 1) + max_delay = config.get("max_seconds", 5) + delay_seconds = random_module.uniform(min_delay, max_delay) + + if delay_seconds > 0: + await asyncio.sleep(min(delay_seconds, 30)) + + return "default" + + +class RandomExecutor(NodeExecutor): + """Random branch selection for A/B testing""" + + async def execute( + self, config: dict, context: FlowContext, session: Any + ) -> Optional[str]: + branches = config.get("branches", []) + + if not branches: + return "default" + + total_weight = sum(b.get("weight", 1) for b in branches) + rand_value = random_module.uniform(0, total_weight) + + cumulative = 0 + for branch in branches: + cumulative += branch.get("weight", 1) + if rand_value <= cumulative: + test_name = config.get("test_name", "ab_test") + context.set(f"_ab_{test_name}", branch.get("branch", "default")) + return branch.get("branch", "default") + + return branches[-1].get("branch", "default") if branches else "default" + + +class LoopExecutor(NodeExecutor): + """Loop a certain number of times""" + + async def execute( + self, config: dict, context: FlowContext, session: Any + ) -> Optional[str]: + loop_var = config.get("counter_variable", "_loop_counter") + max_iterations = config.get("max_iterations", 10) + + current = int(context.get(loop_var) or 0) + + if current < max_iterations: + context.set(loop_var, current + 1) + return "continue" + else: + context.set(loop_var, 0) + return "done" + + +class GoToExecutor(NodeExecutor): + """Jump to another node or flow""" + + async def execute( + self, config: dict, context: FlowContext, session: Any + ) -> Optional[str]: + target_node_id = config.get("target_node_id") + target_flow_id = config.get("target_flow_id") + + if target_flow_id: + context.set("_goto_flow_id", target_flow_id) + return "sub_flow" + + if target_node_id: + context.set("_goto_node_id", target_node_id) + return "goto" + + return "default"