Files
Claude AI 8824f65a62 feat(fase2): add Flow schemas and Flow Engine core
API Gateway:
- Add Pydantic schemas for Flow API (create, update, response, nodes, edges)

Flow Engine:
- Add FlowContext for variable management and interpolation
- Add FlowEngine for executing chatbot flows
- Support node types: trigger, message, buttons, wait_input, condition, set_variable

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 10:19:11 +00:00

68 lines
2.1 KiB
Python

from typing import Any, Optional
from datetime import datetime
import re
class FlowContext:
    """Hold the data and session variables available while a flow executes.

    Values are addressed with dot notation. The first segment selects a
    namespace (``contact``, ``conversation``, ``message``, ``system`` or
    ``variables``) and the remaining segments walk nested dicts inside it.
    A key whose first segment is not one of those namespaces is looked up
    verbatim (dots included) in the session variables.
    """

    # Matches ``{{ key }}`` placeholders; compiled once and shared.
    _PLACEHOLDER = re.compile(r'\{\{([^}]+)\}\}')

    def __init__(
        self,
        contact: dict,
        conversation: dict,
        message: dict,
        session_vars: Optional[dict] = None,
    ):
        """Create a context for one flow execution.

        Args:
            contact: Data exposed under the ``contact.`` prefix.
            conversation: Data exposed under the ``conversation.`` prefix.
            message: Data exposed under the ``message.`` prefix.
            session_vars: Existing session variables. The dict is used as-is
                (not copied), so ``set`` writes into it. When omitted, a
                fresh empty dict is created.
        """
        self.contact = contact
        self.conversation = conversation
        self.message = message
        # Compare against None instead of relying on truthiness: an empty
        # dict passed by the caller must be kept, exactly like a non-empty
        # one, rather than being silently swapped for a new dict.
        self.variables = {} if session_vars is None else session_vars

        # Take a single clock reading so date, time and weekday always
        # describe the same instant, even across a minute/midnight rollover.
        now = datetime.now()
        self._system = {
            "date": now.strftime("%Y-%m-%d"),
            "time": now.strftime("%H:%M"),
            "day_of_week": now.strftime("%A"),
        }

    def get(self, key: str) -> Any:
        """Return the value addressed by *key*, or ``None`` if it is absent.

        Examples of keys: ``contact.name``, ``variables.email``,
        ``system.date``. A key without a known namespace prefix is read
        directly from the session variables using the full key string.
        """
        namespace, *path = key.split(".")
        # Built per call so reassigned attributes are always honoured.
        sources = {
            "contact": self.contact,
            "conversation": self.conversation,
            "message": self.message,
            "system": self._system,
            "variables": self.variables,
        }
        if namespace in sources:
            return self._get_nested(sources[namespace], path)
        return self.variables.get(key)

    def set(self, key: str, value: Any):
        """Store *value* as a session variable under the literal *key*."""
        self.variables[key] = value

    def _get_nested(self, obj: dict, keys: list) -> Any:
        """Walk *keys* through nested dicts starting at *obj*.

        Returns ``None`` as soon as a non-dict value is reached while keys
        remain, or when a key is missing. With no keys, *obj* itself is
        returned.
        """
        for key in keys:
            if not isinstance(obj, dict):
                return None
            obj = obj.get(key)
        return obj

    def interpolate(self, text: str) -> str:
        """Replace every ``{{key}}`` placeholder in *text* with its value.

        Whitespace around the key is ignored. Keys that resolve to ``None``
        are replaced with an empty string; other values go through ``str``.
        """
        def replace(match):
            value = self.get(match.group(1).strip())
            return "" if value is None else str(value)

        return self._PLACEHOLDER.sub(replace, text)

    def to_dict(self) -> dict:
        """Return a shallow copy of the session variables."""
        return self.variables.copy()