235 lines
8.4 KiB
Python
235 lines
8.4 KiB
Python
from typing import Optional
|
|
import httpx
|
|
from app.config import get_settings
|
|
from app.context import FlowContext
|
|
from app.nodes import NodeRegistry
|
|
from app.nodes.basic import (
|
|
TriggerExecutor, MessageExecutor, ButtonsExecutor,
|
|
WaitInputExecutor, SetVariableExecutor, ConditionExecutor
|
|
)
|
|
from app.nodes.advanced import (
|
|
SwitchExecutor, DelayExecutor, RandomExecutor, LoopExecutor, GoToExecutor
|
|
)
|
|
from app.nodes.validation import (
|
|
ValidateEmailExecutor, ValidatePhoneExecutor, ValidateNumberExecutor,
|
|
ValidateDateExecutor, ValidateRegexExecutor, ValidateOptionsExecutor
|
|
)
|
|
from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor
|
|
from app.nodes.ai import AIResponseExecutor, AISentimentExecutor
|
|
from app.nodes.odoo import (
|
|
OdooSearchPartnerExecutor,
|
|
OdooCreatePartnerExecutor,
|
|
OdooGetBalanceExecutor,
|
|
OdooSearchOrdersExecutor,
|
|
OdooGetOrderExecutor,
|
|
OdooSearchProductsExecutor,
|
|
OdooCheckStockExecutor,
|
|
OdooCreateLeadExecutor,
|
|
)
|
|
|
|
settings = get_settings()
|
|
|
|
|
|
async def send_message(conversation_id: str, text: str):
|
|
"""Module-level send message function for use by node executors"""
|
|
async with httpx.AsyncClient() as client:
|
|
try:
|
|
await client.post(
|
|
f"{settings.API_GATEWAY_URL}/api/internal/flow/send",
|
|
json={
|
|
"conversation_id": str(conversation_id),
|
|
"content": text,
|
|
"type": "text"
|
|
},
|
|
timeout=30
|
|
)
|
|
except Exception as e:
|
|
print(f"Failed to send message: {e}")
|
|
|
|
|
|
def _register_executors():
|
|
"""Register all node executors with the NodeRegistry"""
|
|
NodeRegistry.register("trigger", TriggerExecutor())
|
|
NodeRegistry.register("message", MessageExecutor(send_message))
|
|
NodeRegistry.register("buttons", ButtonsExecutor(send_message))
|
|
NodeRegistry.register("wait_input", WaitInputExecutor())
|
|
NodeRegistry.register("set_variable", SetVariableExecutor())
|
|
NodeRegistry.register("condition", ConditionExecutor())
|
|
NodeRegistry.register("switch", SwitchExecutor())
|
|
NodeRegistry.register("delay", DelayExecutor())
|
|
NodeRegistry.register("random", RandomExecutor())
|
|
NodeRegistry.register("loop", LoopExecutor())
|
|
NodeRegistry.register("goto", GoToExecutor())
|
|
NodeRegistry.register("validate_email", ValidateEmailExecutor())
|
|
NodeRegistry.register("validate_phone", ValidatePhoneExecutor())
|
|
NodeRegistry.register("validate_number", ValidateNumberExecutor())
|
|
NodeRegistry.register("validate_date", ValidateDateExecutor())
|
|
NodeRegistry.register("validate_regex", ValidateRegexExecutor())
|
|
NodeRegistry.register("validate_options", ValidateOptionsExecutor())
|
|
NodeRegistry.register("javascript", JavaScriptExecutor())
|
|
NodeRegistry.register("http_request", HttpRequestExecutor())
|
|
NodeRegistry.register("ai_response", AIResponseExecutor())
|
|
NodeRegistry.register("ai_sentiment", AISentimentExecutor())
|
|
NodeRegistry.register("odoo_search_partner", OdooSearchPartnerExecutor())
|
|
NodeRegistry.register("odoo_create_partner", OdooCreatePartnerExecutor())
|
|
NodeRegistry.register("odoo_get_balance", OdooGetBalanceExecutor())
|
|
NodeRegistry.register("odoo_search_orders", OdooSearchOrdersExecutor())
|
|
NodeRegistry.register("odoo_get_order", OdooGetOrderExecutor())
|
|
NodeRegistry.register("odoo_search_products", OdooSearchProductsExecutor())
|
|
NodeRegistry.register("odoo_check_stock", OdooCheckStockExecutor())
|
|
NodeRegistry.register("odoo_create_lead", OdooCreateLeadExecutor())
|
|
|
|
|
|
_register_executors()
|
|
|
|
|
|
class FlowEngine:
|
|
"""Executes flow nodes based on incoming messages"""
|
|
|
|
def __init__(self, db_session):
|
|
self.db = db_session
|
|
|
|
async def process_message(
|
|
self,
|
|
conversation_id: str,
|
|
contact: dict,
|
|
conversation: dict,
|
|
message: dict,
|
|
) -> bool:
|
|
"""
|
|
Process incoming message through flow engine.
|
|
Returns True if handled by a flow, False otherwise.
|
|
"""
|
|
from app.models import Flow, FlowSession, TriggerType
|
|
|
|
# Check for active flow session
|
|
session = self.db.query(FlowSession).filter(
|
|
FlowSession.conversation_id == conversation_id
|
|
).first()
|
|
|
|
if session and session.waiting_for_input:
|
|
# Continue existing flow
|
|
flow = self.db.query(Flow).filter(Flow.id == session.flow_id).first()
|
|
if flow:
|
|
context = FlowContext(contact, conversation, message, session.variables)
|
|
await self._execute_from_node(flow, session, context)
|
|
return True
|
|
|
|
# Find matching flow by trigger
|
|
flow = self._find_matching_flow(message.get("content", ""))
|
|
|
|
if flow:
|
|
context = FlowContext(contact, conversation, message)
|
|
session = FlowSession(
|
|
conversation_id=conversation_id,
|
|
flow_id=flow.id,
|
|
variables={},
|
|
)
|
|
self.db.add(session)
|
|
self.db.commit()
|
|
|
|
await self._execute_flow(flow, session, context)
|
|
return True
|
|
|
|
return False
|
|
|
|
def _find_matching_flow(self, message_text: str) -> Optional["Flow"]:
|
|
from app.models import Flow, TriggerType
|
|
|
|
message_lower = message_text.lower().strip()
|
|
|
|
# Check keyword triggers
|
|
keyword_flows = self.db.query(Flow).filter(
|
|
Flow.trigger_type == TriggerType.KEYWORD,
|
|
Flow.is_active == True
|
|
).all()
|
|
|
|
for flow in keyword_flows:
|
|
keywords = [k.strip().lower() for k in (flow.trigger_value or "").split(",")]
|
|
if any(kw in message_lower for kw in keywords if kw):
|
|
return flow
|
|
|
|
# Check welcome trigger
|
|
welcome_flow = self.db.query(Flow).filter(
|
|
Flow.trigger_type == TriggerType.WELCOME,
|
|
Flow.is_active == True
|
|
).first()
|
|
|
|
if welcome_flow:
|
|
return welcome_flow
|
|
|
|
# Fallback
|
|
return self.db.query(Flow).filter(
|
|
Flow.trigger_type == TriggerType.FALLBACK,
|
|
Flow.is_active == True
|
|
).first()
|
|
|
|
async def _execute_flow(self, flow, session, context: FlowContext):
|
|
"""Start flow execution from first node"""
|
|
nodes = flow.nodes or []
|
|
if not nodes:
|
|
return
|
|
|
|
start_node = next(
|
|
(n for n in nodes if n.get("data", {}).get("type") == "trigger"),
|
|
nodes[0] if nodes else None
|
|
)
|
|
|
|
if start_node:
|
|
session.current_node_id = start_node["id"]
|
|
await self._execute_from_node(flow, session, context)
|
|
|
|
async def _execute_from_node(self, flow, session, context: FlowContext):
|
|
"""Execute flow starting from current node"""
|
|
nodes = {n["id"]: n for n in (flow.nodes or [])}
|
|
edges = flow.edges or []
|
|
|
|
current_id = session.current_node_id
|
|
|
|
while current_id:
|
|
node = nodes.get(current_id)
|
|
if not node:
|
|
break
|
|
|
|
node_type = node.get("data", {}).get("type", "")
|
|
config = node.get("data", {}).get("config", {})
|
|
|
|
result = await self._execute_node(node_type, config, context, session)
|
|
|
|
if result == "wait":
|
|
session.waiting_for_input = True
|
|
session.variables = context.to_dict()
|
|
self.db.commit()
|
|
return
|
|
|
|
next_edge = self._find_next_edge(edges, current_id, result)
|
|
current_id = next_edge["target"] if next_edge else None
|
|
session.current_node_id = current_id
|
|
session.waiting_for_input = False
|
|
|
|
session.variables = context.to_dict()
|
|
self.db.commit()
|
|
|
|
async def _execute_node(
|
|
self,
|
|
node_type: str,
|
|
config: dict,
|
|
context: FlowContext,
|
|
session
|
|
) -> Optional[str]:
|
|
"""Execute a single node using registered executor"""
|
|
executor = NodeRegistry.get(node_type)
|
|
if executor:
|
|
return await executor.execute(config, context, session)
|
|
print(f"Warning: Unknown node type '{node_type}'")
|
|
return "default"
|
|
|
|
def _find_next_edge(self, edges: list, source_id: str, handle: str = None) -> Optional[dict]:
|
|
"""Find the next edge from source node"""
|
|
for edge in edges:
|
|
if edge.get("source") == source_id:
|
|
if handle and edge.get("sourceHandle") != handle:
|
|
continue
|
|
return edge
|
|
return None
|