refactor(flow-engine): use NodeRegistry for modular node execution
Replace hardcoded if/elif chain in _execute_node with dynamic node executor lookup via NodeRegistry. All node executors are registered at module load time, enabling extensibility without modifying engine code. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -2,10 +2,69 @@ from typing import Optional
|
|||||||
import httpx
|
import httpx
|
||||||
from app.config import get_settings
|
from app.config import get_settings
|
||||||
from app.context import FlowContext
|
from app.context import FlowContext
|
||||||
|
from app.nodes import NodeRegistry
|
||||||
|
from app.nodes.basic import (
|
||||||
|
TriggerExecutor, MessageExecutor, ButtonsExecutor,
|
||||||
|
WaitInputExecutor, SetVariableExecutor, ConditionExecutor
|
||||||
|
)
|
||||||
|
from app.nodes.advanced import (
|
||||||
|
SwitchExecutor, DelayExecutor, RandomExecutor, LoopExecutor, GoToExecutor
|
||||||
|
)
|
||||||
|
from app.nodes.validation import (
|
||||||
|
ValidateEmailExecutor, ValidatePhoneExecutor, ValidateNumberExecutor,
|
||||||
|
ValidateDateExecutor, ValidateRegexExecutor, ValidateOptionsExecutor
|
||||||
|
)
|
||||||
|
from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor
|
||||||
|
from app.nodes.ai import AIResponseExecutor, AISentimentExecutor
|
||||||
|
|
||||||
settings = get_settings()
|
settings = get_settings()
|
||||||
|
|
||||||
|
|
||||||
|
async def send_message(conversation_id: str, text: str):
|
||||||
|
"""Module-level send message function for use by node executors"""
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
try:
|
||||||
|
await client.post(
|
||||||
|
f"{settings.API_GATEWAY_URL}/api/internal/flow/send",
|
||||||
|
json={
|
||||||
|
"conversation_id": str(conversation_id),
|
||||||
|
"content": text,
|
||||||
|
"type": "text"
|
||||||
|
},
|
||||||
|
timeout=30
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Failed to send message: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
def _register_executors():
|
||||||
|
"""Register all node executors with the NodeRegistry"""
|
||||||
|
NodeRegistry.register("trigger", TriggerExecutor())
|
||||||
|
NodeRegistry.register("message", MessageExecutor(send_message))
|
||||||
|
NodeRegistry.register("buttons", ButtonsExecutor(send_message))
|
||||||
|
NodeRegistry.register("wait_input", WaitInputExecutor())
|
||||||
|
NodeRegistry.register("set_variable", SetVariableExecutor())
|
||||||
|
NodeRegistry.register("condition", ConditionExecutor())
|
||||||
|
NodeRegistry.register("switch", SwitchExecutor())
|
||||||
|
NodeRegistry.register("delay", DelayExecutor())
|
||||||
|
NodeRegistry.register("random", RandomExecutor())
|
||||||
|
NodeRegistry.register("loop", LoopExecutor())
|
||||||
|
NodeRegistry.register("goto", GoToExecutor())
|
||||||
|
NodeRegistry.register("validate_email", ValidateEmailExecutor())
|
||||||
|
NodeRegistry.register("validate_phone", ValidatePhoneExecutor())
|
||||||
|
NodeRegistry.register("validate_number", ValidateNumberExecutor())
|
||||||
|
NodeRegistry.register("validate_date", ValidateDateExecutor())
|
||||||
|
NodeRegistry.register("validate_regex", ValidateRegexExecutor())
|
||||||
|
NodeRegistry.register("validate_options", ValidateOptionsExecutor())
|
||||||
|
NodeRegistry.register("javascript", JavaScriptExecutor())
|
||||||
|
NodeRegistry.register("http_request", HttpRequestExecutor())
|
||||||
|
NodeRegistry.register("ai_response", AIResponseExecutor())
|
||||||
|
NodeRegistry.register("ai_sentiment", AISentimentExecutor())
|
||||||
|
|
||||||
|
|
||||||
|
_register_executors()
|
||||||
|
|
||||||
|
|
||||||
class FlowEngine:
|
class FlowEngine:
|
||||||
"""Executes flow nodes based on incoming messages"""
|
"""Executes flow nodes based on incoming messages"""
|
||||||
|
|
||||||
@@ -140,58 +199,11 @@ class FlowEngine:
|
|||||||
context: FlowContext,
|
context: FlowContext,
|
||||||
session
|
session
|
||||||
) -> Optional[str]:
|
) -> Optional[str]:
|
||||||
"""Execute a single node, return result for routing"""
|
"""Execute a single node using registered executor"""
|
||||||
|
executor = NodeRegistry.get(node_type)
|
||||||
if node_type == "trigger":
|
if executor:
|
||||||
return "default"
|
return await executor.execute(config, context, session)
|
||||||
|
print(f"Warning: Unknown node type '{node_type}'")
|
||||||
elif node_type == "message":
|
|
||||||
text = context.interpolate(config.get("text", ""))
|
|
||||||
await self._send_message(session.conversation_id, text)
|
|
||||||
return "default"
|
|
||||||
|
|
||||||
elif node_type == "buttons":
|
|
||||||
text = context.interpolate(config.get("text", ""))
|
|
||||||
buttons = config.get("buttons", [])
|
|
||||||
button_text = "\n".join([f"• {b['label']}" for b in buttons])
|
|
||||||
await self._send_message(session.conversation_id, f"{text}\n\n{button_text}")
|
|
||||||
return "wait"
|
|
||||||
|
|
||||||
elif node_type == "wait_input":
|
|
||||||
variable = config.get("variable", "user_input")
|
|
||||||
context.set(variable, context.message.get("content", ""))
|
|
||||||
return "default"
|
|
||||||
|
|
||||||
elif node_type == "condition":
|
|
||||||
return self._evaluate_condition(config, context)
|
|
||||||
|
|
||||||
elif node_type == "set_variable":
|
|
||||||
var_name = config.get("variable", "")
|
|
||||||
var_value = context.interpolate(config.get("value", ""))
|
|
||||||
context.set(var_name, var_value)
|
|
||||||
return "default"
|
|
||||||
|
|
||||||
return "default"
|
|
||||||
|
|
||||||
def _evaluate_condition(self, config: dict, context: FlowContext) -> str:
|
|
||||||
"""Evaluate condition and return branch name"""
|
|
||||||
conditions = config.get("conditions", [])
|
|
||||||
|
|
||||||
for cond in conditions:
|
|
||||||
field = cond.get("field", "")
|
|
||||||
operator = cond.get("operator", "equals")
|
|
||||||
value = cond.get("value", "")
|
|
||||||
branch = cond.get("branch", "default")
|
|
||||||
|
|
||||||
actual = context.get(field) or context.message.get("content", "")
|
|
||||||
|
|
||||||
if operator == "equals" and str(actual).lower() == str(value).lower():
|
|
||||||
return branch
|
|
||||||
elif operator == "contains" and str(value).lower() in str(actual).lower():
|
|
||||||
return branch
|
|
||||||
elif operator == "starts_with" and str(actual).lower().startswith(str(value).lower()):
|
|
||||||
return branch
|
|
||||||
|
|
||||||
return "default"
|
return "default"
|
||||||
|
|
||||||
def _find_next_edge(self, edges: list, source_id: str, handle: str = None) -> Optional[dict]:
|
def _find_next_edge(self, edges: list, source_id: str, handle: str = None) -> Optional[dict]:
|
||||||
@@ -202,19 +214,3 @@ class FlowEngine:
|
|||||||
continue
|
continue
|
||||||
return edge
|
return edge
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def _send_message(self, conversation_id: str, text: str):
|
|
||||||
"""Send message via API Gateway"""
|
|
||||||
async with httpx.AsyncClient() as client:
|
|
||||||
try:
|
|
||||||
await client.post(
|
|
||||||
f"{settings.API_GATEWAY_URL}/api/internal/flow/send",
|
|
||||||
json={
|
|
||||||
"conversation_id": str(conversation_id),
|
|
||||||
"content": text,
|
|
||||||
"type": "text"
|
|
||||||
},
|
|
||||||
timeout=30
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Failed to send message: {e}")
|
|
||||||
|
|||||||
Reference in New Issue
Block a user