feat(fase4): add advanced nodes - switch, delay, random, loop, goto
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -16,3 +16,11 @@ from app.nodes.basic import (
|
|||||||
WaitInputExecutor,
|
WaitInputExecutor,
|
||||||
)
|
)
|
||||||
from app.nodes.script import HttpRequestExecutor, JavaScriptExecutor
|
from app.nodes.script import HttpRequestExecutor, JavaScriptExecutor
|
||||||
|
from app.nodes.validation import (
|
||||||
|
ValidateDateExecutor,
|
||||||
|
ValidateEmailExecutor,
|
||||||
|
ValidateNumberExecutor,
|
||||||
|
ValidateOptionsExecutor,
|
||||||
|
ValidatePhoneExecutor,
|
||||||
|
ValidateRegexExecutor,
|
||||||
|
)
|
||||||
|
|||||||
110
services/flow-engine/app/nodes/advanced.py
Normal file
110
services/flow-engine/app/nodes/advanced.py
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
import asyncio
|
||||||
|
import random as random_module
|
||||||
|
from typing import Any, Optional
|
||||||
|
|
||||||
|
from app.context import FlowContext
|
||||||
|
from app.nodes.base import NodeExecutor
|
||||||
|
|
||||||
|
|
||||||
|
class SwitchExecutor(NodeExecutor):
|
||||||
|
"""Multi-branch switch based on variable value"""
|
||||||
|
|
||||||
|
async def execute(
|
||||||
|
self, config: dict, context: FlowContext, session: Any
|
||||||
|
) -> Optional[str]:
|
||||||
|
variable = config.get("variable", "")
|
||||||
|
cases = config.get("cases", [])
|
||||||
|
|
||||||
|
actual = context.get(variable) or context.message.get("content", "")
|
||||||
|
actual_str = str(actual).lower().strip()
|
||||||
|
|
||||||
|
for case in cases:
|
||||||
|
case_value = str(case.get("value", "")).lower().strip()
|
||||||
|
if actual_str == case_value:
|
||||||
|
return case.get("branch", "default")
|
||||||
|
|
||||||
|
return config.get("default_branch", "default")
|
||||||
|
|
||||||
|
|
||||||
|
class DelayExecutor(NodeExecutor):
|
||||||
|
"""Wait for a specified duration before continuing"""
|
||||||
|
|
||||||
|
async def execute(
|
||||||
|
self, config: dict, context: FlowContext, session: Any
|
||||||
|
) -> Optional[str]:
|
||||||
|
delay_seconds = config.get("seconds", 0)
|
||||||
|
delay_type = config.get("type", "fixed")
|
||||||
|
|
||||||
|
if delay_type == "random":
|
||||||
|
min_delay = config.get("min_seconds", 1)
|
||||||
|
max_delay = config.get("max_seconds", 5)
|
||||||
|
delay_seconds = random_module.uniform(min_delay, max_delay)
|
||||||
|
|
||||||
|
if delay_seconds > 0:
|
||||||
|
await asyncio.sleep(min(delay_seconds, 30))
|
||||||
|
|
||||||
|
return "default"
|
||||||
|
|
||||||
|
|
||||||
|
class RandomExecutor(NodeExecutor):
|
||||||
|
"""Random branch selection for A/B testing"""
|
||||||
|
|
||||||
|
async def execute(
|
||||||
|
self, config: dict, context: FlowContext, session: Any
|
||||||
|
) -> Optional[str]:
|
||||||
|
branches = config.get("branches", [])
|
||||||
|
|
||||||
|
if not branches:
|
||||||
|
return "default"
|
||||||
|
|
||||||
|
total_weight = sum(b.get("weight", 1) for b in branches)
|
||||||
|
rand_value = random_module.uniform(0, total_weight)
|
||||||
|
|
||||||
|
cumulative = 0
|
||||||
|
for branch in branches:
|
||||||
|
cumulative += branch.get("weight", 1)
|
||||||
|
if rand_value <= cumulative:
|
||||||
|
test_name = config.get("test_name", "ab_test")
|
||||||
|
context.set(f"_ab_{test_name}", branch.get("branch", "default"))
|
||||||
|
return branch.get("branch", "default")
|
||||||
|
|
||||||
|
return branches[-1].get("branch", "default") if branches else "default"
|
||||||
|
|
||||||
|
|
||||||
|
class LoopExecutor(NodeExecutor):
|
||||||
|
"""Loop a certain number of times"""
|
||||||
|
|
||||||
|
async def execute(
|
||||||
|
self, config: dict, context: FlowContext, session: Any
|
||||||
|
) -> Optional[str]:
|
||||||
|
loop_var = config.get("counter_variable", "_loop_counter")
|
||||||
|
max_iterations = config.get("max_iterations", 10)
|
||||||
|
|
||||||
|
current = int(context.get(loop_var) or 0)
|
||||||
|
|
||||||
|
if current < max_iterations:
|
||||||
|
context.set(loop_var, current + 1)
|
||||||
|
return "continue"
|
||||||
|
else:
|
||||||
|
context.set(loop_var, 0)
|
||||||
|
return "done"
|
||||||
|
|
||||||
|
|
||||||
|
class GoToExecutor(NodeExecutor):
|
||||||
|
"""Jump to another node or flow"""
|
||||||
|
|
||||||
|
async def execute(
|
||||||
|
self, config: dict, context: FlowContext, session: Any
|
||||||
|
) -> Optional[str]:
|
||||||
|
target_node_id = config.get("target_node_id")
|
||||||
|
target_flow_id = config.get("target_flow_id")
|
||||||
|
|
||||||
|
if target_flow_id:
|
||||||
|
context.set("_goto_flow_id", target_flow_id)
|
||||||
|
return "sub_flow"
|
||||||
|
|
||||||
|
if target_node_id:
|
||||||
|
context.set("_goto_node_id", target_node_id)
|
||||||
|
return "goto"
|
||||||
|
|
||||||
|
return "default"
|
||||||
Reference in New Issue
Block a user