Files
Claude AI d1d1aa58e1 feat(flow-engine): add Odoo node executors
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 22:28:26 +00:00

235 lines
8.4 KiB
Python

from typing import Optional
import httpx
from app.config import get_settings
from app.context import FlowContext
from app.nodes import NodeRegistry
from app.nodes.basic import (
TriggerExecutor, MessageExecutor, ButtonsExecutor,
WaitInputExecutor, SetVariableExecutor, ConditionExecutor
)
from app.nodes.advanced import (
SwitchExecutor, DelayExecutor, RandomExecutor, LoopExecutor, GoToExecutor
)
from app.nodes.validation import (
ValidateEmailExecutor, ValidatePhoneExecutor, ValidateNumberExecutor,
ValidateDateExecutor, ValidateRegexExecutor, ValidateOptionsExecutor
)
from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor
from app.nodes.ai import AIResponseExecutor, AISentimentExecutor
from app.nodes.odoo import (
OdooSearchPartnerExecutor,
OdooCreatePartnerExecutor,
OdooGetBalanceExecutor,
OdooSearchOrdersExecutor,
OdooGetOrderExecutor,
OdooSearchProductsExecutor,
OdooCheckStockExecutor,
OdooCreateLeadExecutor,
)
# Loaded once at import time; send_message reads API_GATEWAY_URL from it.
settings = get_settings()
async def send_message(conversation_id: str, text: str) -> None:
    """Send a plain-text message to a conversation through the API gateway.

    Defined at module level so it can be handed to node executors as a
    callback. Delivery is best-effort: any failure (connection error, timeout,
    or a non-2xx response from the gateway) is reported on stdout and
    swallowed, so the caller never sees an exception from this function.

    Args:
        conversation_id: Target conversation; coerced to ``str`` in the payload.
        text: Message body, sent as ``content`` with ``type`` set to ``"text"``.
    """
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                f"{settings.API_GATEWAY_URL}/api/internal/flow/send",
                json={
                    "conversation_id": str(conversation_id),
                    "content": text,
                    "type": "text",
                },
                timeout=30,
            )
            # httpx does not raise on 4xx/5xx by itself; without this check a
            # message rejected by the gateway would look like a successful send.
            response.raise_for_status()
        except Exception as e:
            print(f"Failed to send message: {e}")
def _register_executors():
    """Populate the NodeRegistry with one executor instance per node type."""
    # (node type, zero-argument factory) pairs, listed in registration order.
    # Executors that emit messages are built with the send_message callback.
    executor_factories = (
        # Basic nodes
        ("trigger", TriggerExecutor),
        ("message", lambda: MessageExecutor(send_message)),
        ("buttons", lambda: ButtonsExecutor(send_message)),
        ("wait_input", WaitInputExecutor),
        ("set_variable", SetVariableExecutor),
        ("condition", ConditionExecutor),
        # Advanced control flow
        ("switch", SwitchExecutor),
        ("delay", DelayExecutor),
        ("random", RandomExecutor),
        ("loop", LoopExecutor),
        ("goto", GoToExecutor),
        # Input validation
        ("validate_email", ValidateEmailExecutor),
        ("validate_phone", ValidatePhoneExecutor),
        ("validate_number", ValidateNumberExecutor),
        ("validate_date", ValidateDateExecutor),
        ("validate_regex", ValidateRegexExecutor),
        ("validate_options", ValidateOptionsExecutor),
        # Scripting and HTTP
        ("javascript", JavaScriptExecutor),
        ("http_request", HttpRequestExecutor),
        # AI
        ("ai_response", AIResponseExecutor),
        ("ai_sentiment", AISentimentExecutor),
        # Odoo
        ("odoo_search_partner", OdooSearchPartnerExecutor),
        ("odoo_create_partner", OdooCreatePartnerExecutor),
        ("odoo_get_balance", OdooGetBalanceExecutor),
        ("odoo_search_orders", OdooSearchOrdersExecutor),
        ("odoo_get_order", OdooGetOrderExecutor),
        ("odoo_search_products", OdooSearchProductsExecutor),
        ("odoo_check_stock", OdooCheckStockExecutor),
        ("odoo_create_lead", OdooCreateLeadExecutor),
    )
    for node_type, make_executor in executor_factories:
        NodeRegistry.register(node_type, make_executor())
# Runs at import time so the registry is populated as soon as this module loads.
_register_executors()
class FlowEngine:
    """Executes flow nodes in response to incoming conversation messages.

    A flow is treated as a graph: ``flow.nodes`` is a list of node dicts (each
    with an ``id`` and a ``data`` dict holding ``type`` and ``config``) and
    ``flow.edges`` is a list of edge dicts (``source``, ``target`` and an
    optional ``sourceHandle``). Progress is stored on a ``FlowSession`` record
    so execution can pause for user input and resume on a later message.
    """

    def __init__(self, db_session):
        """Keep the database session used for all Flow / FlowSession queries."""
        self.db = db_session

    async def process_message(
        self,
        conversation_id: str,
        contact: dict,
        conversation: dict,
        message: dict,
    ) -> bool:
        """Route an incoming message through the flow engine.

        A session that is waiting for input is resumed first; otherwise a flow
        is selected by trigger and started with a fresh session.

        Args:
            conversation_id: Conversation the message belongs to.
            contact: Contact data handed to the FlowContext.
            conversation: Conversation data handed to the FlowContext.
            message: Incoming message; its ``content`` is used for trigger
                matching.

        Returns:
            True if a flow handled the message, False otherwise.
        """
        from app.models import Flow, FlowSession

        # Only a session that is actually waiting can be resumed. A new session
        # is added for every triggered flow and finished ones stay behind, so
        # filtering on conversation_id alone could return a stale, finished
        # session and hide the one that is waiting for this message.
        session = (
            self.db.query(FlowSession)
            .filter(
                FlowSession.conversation_id == conversation_id,
                FlowSession.waiting_for_input == True,  # noqa: E712 (SQL expression)
            )
            .first()
        )
        if session:
            flow = self.db.query(Flow).filter(Flow.id == session.flow_id).first()
            if flow:
                context = FlowContext(contact, conversation, message, session.variables)
                await self._execute_from_node(flow, session, context)
                return True
            # The flow behind this session no longer exists. Clear the flag so
            # the orphaned session cannot shadow future waiting sessions, then
            # fall through to normal trigger matching.
            session.waiting_for_input = False
            self.db.commit()

        # ``content`` may be present but null; ``or ""`` keeps trigger matching
        # from calling ``.lower()`` on None.
        flow = self._find_matching_flow(message.get("content") or "")
        if not flow:
            return False

        context = FlowContext(contact, conversation, message)
        session = FlowSession(
            conversation_id=conversation_id,
            flow_id=flow.id,
            variables={},
        )
        self.db.add(session)
        self.db.commit()
        await self._execute_flow(flow, session, context)
        return True

    def _find_matching_flow(self, message_text: str) -> Optional["Flow"]:
        """Pick the active flow whose trigger matches ``message_text``.

        Priority: keyword trigger, then welcome trigger, then fallback trigger.

        Args:
            message_text: Raw text of the incoming message.

        Returns:
            The selected Flow, or None when no active flow applies.
        """
        from app.models import Flow, TriggerType

        message_lower = message_text.lower().strip()

        # Keyword triggers: trigger_value is a comma-separated keyword list and
        # a keyword matches when it appears anywhere in the message (substring).
        keyword_flows = self.db.query(Flow).filter(
            Flow.trigger_type == TriggerType.KEYWORD,
            Flow.is_active == True,  # noqa: E712 (SQL expression)
        ).all()
        for flow in keyword_flows:
            keywords = [k.strip().lower() for k in (flow.trigger_value or "").split(",")]
            if any(kw in message_lower for kw in keywords if kw):
                return flow

        # NOTE(review): an active welcome flow is returned for every message
        # that matched no keyword, so the fallback below is only reached when
        # no welcome flow is active. Confirm this is the intended behaviour.
        welcome_flow = self.db.query(Flow).filter(
            Flow.trigger_type == TriggerType.WELCOME,
            Flow.is_active == True,  # noqa: E712 (SQL expression)
        ).first()
        if welcome_flow:
            return welcome_flow

        return self.db.query(Flow).filter(
            Flow.trigger_type == TriggerType.FALLBACK,
            Flow.is_active == True,  # noqa: E712 (SQL expression)
        ).first()

    async def _execute_flow(self, flow, session, context: FlowContext):
        """Start executing ``flow`` from its entry node.

        The entry node is the first node whose ``data.type`` is ``"trigger"``;
        if there is none, the first node in the list is used. A flow without
        nodes is a no-op.
        """
        nodes = flow.nodes or []
        if not nodes:
            return

        start_node = next(
            (n for n in nodes if n.get("data", {}).get("type") == "trigger"),
            nodes[0],
        )
        if start_node:
            session.current_node_id = start_node["id"]
            await self._execute_from_node(flow, session, context)

    async def _execute_from_node(self, flow, session, context: FlowContext):
        """Run nodes starting at ``session.current_node_id``.

        Execution continues along matching edges until an executor returns
        ``"wait"``, the current node id is not found in the flow, or no
        outgoing edge matches. The context is written back to
        ``session.variables`` and committed whenever execution stops.
        """
        nodes = {n["id"]: n for n in (flow.nodes or [])}
        edges = flow.edges or []
        current_id = session.current_node_id

        # NOTE(review): there is no step limit here, so a flow whose edges form
        # a cycle without any waiting node would keep looping.
        while current_id:
            node = nodes.get(current_id)
            if not node:
                break

            node_type = node.get("data", {}).get("type", "")
            config = node.get("data", {}).get("config", {})
            result = await self._execute_node(node_type, config, context, session)

            if result == "wait":
                # current_node_id is left on the waiting node, so that node's
                # executor is invoked again when the session is resumed.
                session.waiting_for_input = True
                session.variables = context.to_dict()
                self.db.commit()
                return

            # The executor's result doubles as the edge handle to follow.
            next_edge = self._find_next_edge(edges, current_id, result)
            current_id = next_edge["target"] if next_edge else None
            session.current_node_id = current_id

        session.waiting_for_input = False
        session.variables = context.to_dict()
        self.db.commit()

    async def _execute_node(
        self,
        node_type: str,
        config: dict,
        context: FlowContext,
        session,
    ) -> Optional[str]:
        """Execute a single node through its registered executor.

        Returns:
            Whatever the executor returns, or ``"default"`` (after printing a
            warning) when no executor is registered for ``node_type``.
        """
        executor = NodeRegistry.get(node_type)
        if executor:
            return await executor.execute(config, context, session)

        print(f"Warning: Unknown node type '{node_type}'")
        return "default"

    def _find_next_edge(
        self, edges: list, source_id: str, handle: Optional[str] = None
    ) -> Optional[dict]:
        """Return the first edge leaving ``source_id`` that matches ``handle``.

        Args:
            edges: Edge dicts with ``source`` and optionally ``sourceHandle``.
            source_id: Id of the node whose outgoing edge is wanted.
            handle: When truthy, only edges whose ``sourceHandle`` equals it
                qualify; when falsy, any edge from ``source_id`` qualifies.

        Returns:
            The matching edge dict, or None if there is none.
        """
        for edge in edges:
            if edge.get("source") != source_id:
                continue
            if handle and edge.get("sourceHandle") != handle:
                continue
            return edge
        return None