from typing import Optional import httpx from app.config import get_settings from app.context import FlowContext from app.nodes import NodeRegistry from app.nodes.basic import ( TriggerExecutor, MessageExecutor, ButtonsExecutor, WaitInputExecutor, SetVariableExecutor, ConditionExecutor ) from app.nodes.advanced import ( SwitchExecutor, DelayExecutor, RandomExecutor, LoopExecutor, GoToExecutor ) from app.nodes.validation import ( ValidateEmailExecutor, ValidatePhoneExecutor, ValidateNumberExecutor, ValidateDateExecutor, ValidateRegexExecutor, ValidateOptionsExecutor ) from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor from app.nodes.ai import AIResponseExecutor, AISentimentExecutor from app.nodes.odoo import ( OdooSearchPartnerExecutor, OdooCreatePartnerExecutor, OdooGetBalanceExecutor, OdooSearchOrdersExecutor, OdooGetOrderExecutor, OdooSearchProductsExecutor, OdooCheckStockExecutor, OdooCreateLeadExecutor, ) settings = get_settings() async def send_message(conversation_id: str, text: str): """Module-level send message function for use by node executors""" async with httpx.AsyncClient() as client: try: await client.post( f"{settings.API_GATEWAY_URL}/api/internal/flow/send", json={ "conversation_id": str(conversation_id), "content": text, "type": "text" }, timeout=30 ) except Exception as e: print(f"Failed to send message: {e}") def _register_executors(): """Register all node executors with the NodeRegistry""" NodeRegistry.register("trigger", TriggerExecutor()) NodeRegistry.register("message", MessageExecutor(send_message)) NodeRegistry.register("buttons", ButtonsExecutor(send_message)) NodeRegistry.register("wait_input", WaitInputExecutor()) NodeRegistry.register("set_variable", SetVariableExecutor()) NodeRegistry.register("condition", ConditionExecutor()) NodeRegistry.register("switch", SwitchExecutor()) NodeRegistry.register("delay", DelayExecutor()) NodeRegistry.register("random", RandomExecutor()) NodeRegistry.register("loop", LoopExecutor()) NodeRegistry.register("goto", GoToExecutor()) NodeRegistry.register("validate_email", ValidateEmailExecutor()) NodeRegistry.register("validate_phone", ValidatePhoneExecutor()) NodeRegistry.register("validate_number", ValidateNumberExecutor()) NodeRegistry.register("validate_date", ValidateDateExecutor()) NodeRegistry.register("validate_regex", ValidateRegexExecutor()) NodeRegistry.register("validate_options", ValidateOptionsExecutor()) NodeRegistry.register("javascript", JavaScriptExecutor()) NodeRegistry.register("http_request", HttpRequestExecutor()) NodeRegistry.register("ai_response", AIResponseExecutor()) NodeRegistry.register("ai_sentiment", AISentimentExecutor()) NodeRegistry.register("odoo_search_partner", OdooSearchPartnerExecutor()) NodeRegistry.register("odoo_create_partner", OdooCreatePartnerExecutor()) NodeRegistry.register("odoo_get_balance", OdooGetBalanceExecutor()) NodeRegistry.register("odoo_search_orders", OdooSearchOrdersExecutor()) NodeRegistry.register("odoo_get_order", OdooGetOrderExecutor()) NodeRegistry.register("odoo_search_products", OdooSearchProductsExecutor()) NodeRegistry.register("odoo_check_stock", OdooCheckStockExecutor()) NodeRegistry.register("odoo_create_lead", OdooCreateLeadExecutor()) _register_executors() class FlowEngine: """Executes flow nodes based on incoming messages""" def __init__(self, db_session): self.db = db_session async def process_message( self, conversation_id: str, contact: dict, conversation: dict, message: dict, ) -> bool: """ Process incoming message through flow engine. Returns True if handled by a flow, False otherwise. """ from app.models import Flow, FlowSession, TriggerType # Check for active flow session session = self.db.query(FlowSession).filter( FlowSession.conversation_id == conversation_id ).first() if session and session.waiting_for_input: # Continue existing flow flow = self.db.query(Flow).filter(Flow.id == session.flow_id).first() if flow: context = FlowContext(contact, conversation, message, session.variables) await self._execute_from_node(flow, session, context) return True # Find matching flow by trigger flow = self._find_matching_flow(message.get("content", "")) if flow: context = FlowContext(contact, conversation, message) session = FlowSession( conversation_id=conversation_id, flow_id=flow.id, variables={}, ) self.db.add(session) self.db.commit() await self._execute_flow(flow, session, context) return True return False def _find_matching_flow(self, message_text: str) -> Optional["Flow"]: from app.models import Flow, TriggerType message_lower = message_text.lower().strip() # Check keyword triggers keyword_flows = self.db.query(Flow).filter( Flow.trigger_type == TriggerType.KEYWORD, Flow.is_active == True ).all() for flow in keyword_flows: keywords = [k.strip().lower() for k in (flow.trigger_value or "").split(",")] if any(kw in message_lower for kw in keywords if kw): return flow # Check welcome trigger welcome_flow = self.db.query(Flow).filter( Flow.trigger_type == TriggerType.WELCOME, Flow.is_active == True ).first() if welcome_flow: return welcome_flow # Fallback return self.db.query(Flow).filter( Flow.trigger_type == TriggerType.FALLBACK, Flow.is_active == True ).first() async def _execute_flow(self, flow, session, context: FlowContext): """Start flow execution from first node""" nodes = flow.nodes or [] if not nodes: return start_node = next( (n for n in nodes if n.get("data", {}).get("type") == "trigger"), nodes[0] if nodes else None ) if start_node: session.current_node_id = start_node["id"] await self._execute_from_node(flow, session, context) async def _execute_from_node(self, flow, session, context: FlowContext): """Execute flow starting from current node""" nodes = {n["id"]: n for n in (flow.nodes or [])} edges = flow.edges or [] current_id = session.current_node_id while current_id: node = nodes.get(current_id) if not node: break node_type = node.get("data", {}).get("type", "") config = node.get("data", {}).get("config", {}) result = await self._execute_node(node_type, config, context, session) if result == "wait": session.waiting_for_input = True session.variables = context.to_dict() self.db.commit() return next_edge = self._find_next_edge(edges, current_id, result) current_id = next_edge["target"] if next_edge else None session.current_node_id = current_id session.waiting_for_input = False session.variables = context.to_dict() self.db.commit() async def _execute_node( self, node_type: str, config: dict, context: FlowContext, session ) -> Optional[str]: """Execute a single node using registered executor""" executor = NodeRegistry.get(node_type) if executor: return await executor.execute(config, context, session) print(f"Warning: Unknown node type '{node_type}'") return "default" def _find_next_edge(self, edges: list, source_id: str, handle: str = None) -> Optional[dict]: """Find the next edge from source node""" for edge in edges: if edge.get("source") == source_id: if handle and edge.get("sourceHandle") != handle: continue return edge return None