feat(fase2): add Flow schemas and Flow Engine core
API Gateway: - Add Pydantic schemas for Flow API (create, update, response, nodes, edges) Flow Engine: - Add FlowContext for variable management and interpolation - Add FlowEngine for executing chatbot flows - Support node types: trigger, message, buttons, wait_input, condition, set_variable Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
67
services/flow-engine/app/context.py
Normal file
67
services/flow-engine/app/context.py
Normal file
@@ -0,0 +1,67 @@
|
||||
from typing import Any, Optional
|
||||
from datetime import datetime
|
||||
import re
|
||||
|
||||
|
||||
class FlowContext:
|
||||
"""Manages variables and state during flow execution"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
contact: dict,
|
||||
conversation: dict,
|
||||
message: dict,
|
||||
session_vars: dict = None
|
||||
):
|
||||
self.contact = contact
|
||||
self.conversation = conversation
|
||||
self.message = message
|
||||
self.variables = session_vars or {}
|
||||
self._system = {
|
||||
"date": datetime.now().strftime("%Y-%m-%d"),
|
||||
"time": datetime.now().strftime("%H:%M"),
|
||||
"day_of_week": datetime.now().strftime("%A"),
|
||||
}
|
||||
|
||||
def get(self, key: str) -> Any:
|
||||
"""Get variable by dot notation: contact.name, variables.email"""
|
||||
parts = key.split(".")
|
||||
|
||||
if parts[0] == "contact":
|
||||
return self._get_nested(self.contact, parts[1:])
|
||||
elif parts[0] == "conversation":
|
||||
return self._get_nested(self.conversation, parts[1:])
|
||||
elif parts[0] == "message":
|
||||
return self._get_nested(self.message, parts[1:])
|
||||
elif parts[0] == "system":
|
||||
return self._get_nested(self._system, parts[1:])
|
||||
elif parts[0] == "variables":
|
||||
return self._get_nested(self.variables, parts[1:])
|
||||
else:
|
||||
return self.variables.get(key)
|
||||
|
||||
def set(self, key: str, value: Any):
|
||||
"""Set a session variable"""
|
||||
self.variables[key] = value
|
||||
|
||||
def _get_nested(self, obj: dict, keys: list) -> Any:
|
||||
for key in keys:
|
||||
if isinstance(obj, dict):
|
||||
obj = obj.get(key)
|
||||
else:
|
||||
return None
|
||||
return obj
|
||||
|
||||
def interpolate(self, text: str) -> str:
|
||||
"""Replace {{variable}} with actual values"""
|
||||
pattern = r'\{\{([^}]+)\}\}'
|
||||
|
||||
def replace(match):
|
||||
key = match.group(1).strip()
|
||||
value = self.get(key)
|
||||
return str(value) if value is not None else ""
|
||||
|
||||
return re.sub(pattern, replace, text)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return self.variables.copy()
|
||||
Reference in New Issue
Block a user