feat(fase4): add NodeExecutor architecture with basic nodes

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Claude AI
2026-01-29 11:10:21 +00:00
parent 64b186314f
commit 835c4aa387
3 changed files with 120 additions and 0 deletions

View File

@@ -0,0 +1,5 @@
from app.nodes.base import NodeExecutor, NodeRegistry
from app.nodes.basic import (
TriggerExecutor, MessageExecutor, ButtonsExecutor,
WaitInputExecutor, SetVariableExecutor, ConditionExecutor
)

View File

@@ -0,0 +1,34 @@
from abc import ABC, abstractmethod
from typing import Optional, Any
from app.context import FlowContext
class NodeExecutor(ABC):
"""Base class for all node executors"""
@abstractmethod
async def execute(
self,
config: dict,
context: FlowContext,
session: Any
) -> Optional[str]:
"""
Execute the node logic.
Returns: branch name for routing (e.g., "default", "true", "false")
or "wait" to pause execution
"""
pass
class NodeRegistry:
"""Registry of all available node executors"""
_executors: dict = {}
@classmethod
def register(cls, node_type: str, executor: NodeExecutor):
cls._executors[node_type] = executor
@classmethod
def get(cls, node_type: str) -> Optional[NodeExecutor]:
return cls._executors.get(node_type)

View File

@@ -0,0 +1,81 @@
from typing import Optional, Any
from app.nodes.base import NodeExecutor
from app.context import FlowContext
class TriggerExecutor(NodeExecutor):
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
return "default"
class MessageExecutor(NodeExecutor):
def __init__(self, send_message_fn):
self.send_message = send_message_fn
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
text = context.interpolate(config.get("text", ""))
await self.send_message(session.conversation_id, text)
return "default"
class ButtonsExecutor(NodeExecutor):
def __init__(self, send_message_fn):
self.send_message = send_message_fn
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
text = context.interpolate(config.get("text", ""))
buttons = config.get("buttons", [])
button_text = "\n".join([f"{b['label']}" for b in buttons])
await self.send_message(session.conversation_id, f"{text}\n\n{button_text}")
return "wait"
class WaitInputExecutor(NodeExecutor):
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
variable = config.get("variable", "user_input")
context.set(variable, context.message.get("content", ""))
return "default"
class SetVariableExecutor(NodeExecutor):
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
var_name = config.get("variable", "")
var_value = context.interpolate(config.get("value", ""))
context.set(var_name, var_value)
return "default"
class ConditionExecutor(NodeExecutor):
async def execute(self, config: dict, context: FlowContext, session: Any) -> Optional[str]:
conditions = config.get("conditions", [])
for cond in conditions:
field = cond.get("field", "")
operator = cond.get("operator", "equals")
value = cond.get("value", "")
branch = cond.get("branch", "default")
actual = context.get(field) or context.message.get("content", "")
if operator == "equals" and str(actual).lower() == str(value).lower():
return branch
elif operator == "contains" and str(value).lower() in str(actual).lower():
return branch
elif operator == "starts_with" and str(actual).lower().startswith(str(value).lower()):
return branch
elif operator == "not_equals" and str(actual).lower() != str(value).lower():
return branch
elif operator == "greater_than":
try:
if float(actual) > float(value):
return branch
except ValueError:
pass
elif operator == "less_than":
try:
if float(actual) < float(value):
return branch
except ValueError:
pass
return "default"