import logging
from typing import Optional

import httpx

from app.config import get_settings
from app.context import FlowContext

settings = get_settings()
logger = logging.getLogger(__name__)

# Upper bound on the number of nodes executed in a single pass. A flow graph
# containing a cycle with no waiting node would otherwise loop (and send
# messages) forever.
_MAX_NODES_PER_RUN = 500


class FlowEngine:
    """Executes flow nodes based on incoming messages.

    A flow is read from a ``Flow`` row as a list of ``nodes`` (dicts with an
    ``id`` and a ``data`` dict holding ``type`` and ``config``) and a list of
    ``edges`` (dicts with ``source``, ``target`` and an optional
    ``sourceHandle``). Per-conversation progress is kept in a ``FlowSession``
    row (``current_node_id``, ``waiting_for_input``, ``variables``).
    """

    def __init__(self, db_session):
        # ORM session used for every query and commit made by the engine.
        self.db = db_session

    async def process_message(
        self,
        conversation_id: str,
        contact: dict,
        conversation: dict,
        message: dict,
    ) -> bool:
        """Process an incoming message through the flow engine.

        A session that is waiting for input is resumed first; otherwise a
        flow is looked up by trigger and started in a new session.

        Returns:
            True if the message was handled by a flow, False otherwise.
        """
        # Imported lazily, as in the rest of this class (presumably to avoid
        # a circular import with app.models -- TODO confirm).
        from app.models import Flow, FlowSession

        # Only a *waiting* session can be resumed. Filtering in the query
        # matters because a new FlowSession row is added for every triggered
        # flow: an unfiltered .first() could return an old, finished session
        # and hide the one that is actually waiting.
        session = (
            self.db.query(FlowSession)
            .filter(
                FlowSession.conversation_id == conversation_id,
                FlowSession.waiting_for_input == True,  # noqa: E712 (SQL expression)
            )
            .first()
        )

        if session:
            flow = self.db.query(Flow).filter(Flow.id == session.flow_id).first()
            if flow:
                context = FlowContext(contact, conversation, message, session.variables)
                await self._execute_from_node(flow, session, context)
                return True

            # The flow this session belonged to no longer exists: release the
            # session so it does not stay "waiting" forever, then fall through
            # to normal trigger matching.
            session.waiting_for_input = False
            self.db.commit()

        # Find matching flow by trigger. ``or ""`` also covers an explicit
        # ``"content": None`` in the message payload.
        flow = self._find_matching_flow(message.get("content") or "")
        if flow:
            context = FlowContext(contact, conversation, message)
            session = FlowSession(
                conversation_id=conversation_id,
                flow_id=flow.id,
                variables={},
            )
            self.db.add(session)
            self.db.commit()
            await self._execute_flow(flow, session, context)
            return True

        return False

    def _find_matching_flow(self, message_text: str) -> Optional["Flow"]:
        """Return the active flow whose trigger matches ``message_text``.

        Lookup order: keyword flows (any comma-separated keyword contained in
        the lower-cased message), then the welcome flow, then the fallback
        flow. Returns None when nothing matches.
        """
        from app.models import Flow, TriggerType

        message_lower = message_text.lower().strip()

        # Check keyword triggers
        keyword_flows = self.db.query(Flow).filter(
            Flow.trigger_type == TriggerType.KEYWORD,
            Flow.is_active == True,  # noqa: E712 (SQL expression)
        ).all()

        for flow in keyword_flows:
            keywords = [k.strip().lower() for k in (flow.trigger_value or "").split(",")]
            # Empty entries (e.g. from a trailing comma) are skipped; an empty
            # keyword would otherwise be "contained" in every message.
            if any(kw in message_lower for kw in keywords if kw):
                return flow

        # Check welcome trigger
        # NOTE(review): an active welcome flow matches *every* message that
        # hits no keyword, so the fallback lookup below is only reached when
        # no welcome flow is active. Restricting the welcome flow to the first
        # message of a conversation needs information this method does not
        # receive -- confirm the intended behaviour.
        welcome_flow = self.db.query(Flow).filter(
            Flow.trigger_type == TriggerType.WELCOME,
            Flow.is_active == True,  # noqa: E712 (SQL expression)
        ).first()
        if welcome_flow:
            return welcome_flow

        # Fallback
        return self.db.query(Flow).filter(
            Flow.trigger_type == TriggerType.FALLBACK,
            Flow.is_active == True,  # noqa: E712 (SQL expression)
        ).first()

    async def _execute_flow(self, flow, session, context: FlowContext):
        """Start flow execution from the first node.

        The first node is the first one whose ``data.type`` is ``"trigger"``,
        or simply the first node in the list when there is no trigger node.
        A flow without nodes is a no-op.
        """
        nodes = flow.nodes or []
        if not nodes:
            return

        start_node = next(
            (n for n in nodes if n.get("data", {}).get("type") == "trigger"),
            nodes[0],
        )
        if start_node:
            session.current_node_id = start_node["id"]
            await self._execute_from_node(flow, session, context)

    async def _execute_from_node(self, flow, session, context: FlowContext):
        """Execute the flow starting from ``session.current_node_id``.

        Nodes are executed one after another, following the edge selected by
        each node's result, until a node asks to wait, no next node exists,
        or the per-pass node limit is reached. Session state is committed
        before returning.

        When the session is marked as waiting on entry, the call is a resume:
        the node stored in ``current_node_id`` already ran (it is the one that
        asked to wait), so it is skipped and execution continues along its
        ``"default"`` edge. The user's reply is available to the following
        nodes through ``context.message``.
        """
        nodes = {n["id"]: n for n in (flow.nodes or [])}
        edges = flow.edges or []
        current_id = session.current_node_id

        resuming = bool(session.waiting_for_input)
        executed = 0

        while current_id:
            node = nodes.get(current_id)
            if not node:
                break

            if executed >= _MAX_NODES_PER_RUN:
                logger.warning(
                    "Flow %s stopped after %d nodes in one pass (cycle without a wait node?)",
                    getattr(flow, "id", None),
                    executed,
                )
                break
            executed += 1

            if resuming:
                # Do not re-run the node that is waiting: re-running it would
                # resend its prompt and wait again, so the flow could never
                # progress past it.
                # NOTE(review): per-button branching would require knowing
                # how button edges are labelled, which is not visible here;
                # the reply is routed along the default edge instead.
                resuming = False
                result = "default"
            else:
                node_type = node.get("data", {}).get("type", "")
                config = node.get("data", {}).get("config", {})
                result = await self._execute_node(node_type, config, context, session)

                if result == "wait":
                    # Remember *which* node is waiting so the next message
                    # resumes after it rather than from the start of the flow.
                    session.current_node_id = current_id
                    session.waiting_for_input = True
                    session.variables = context.to_dict()
                    self.db.commit()
                    return

            next_edge = self._find_next_edge(edges, current_id, result)
            current_id = next_edge["target"] if next_edge else None

        session.current_node_id = current_id
        session.waiting_for_input = False
        session.variables = context.to_dict()
        self.db.commit()

    async def _execute_node(
        self,
        node_type: str,
        config: dict,
        context: FlowContext,
        session,
    ) -> Optional[str]:
        """Execute a single node and return its routing result.

        Returns ``"wait"`` when the flow must pause for user input, the
        branch name for condition nodes, and ``"default"`` otherwise
        (including for unknown node types).
        """
        if node_type == "trigger":
            return "default"

        if node_type == "message":
            text = context.interpolate(config.get("text", ""))
            await self._send_message(session.conversation_id, text)
            return "default"

        if node_type == "buttons":
            text = context.interpolate(config.get("text", ""))
            buttons = config.get("buttons", [])
            # Buttons are rendered as a plain-text bullet list. ``.get`` keeps
            # a button saved without a label from aborting the whole flow.
            button_text = "\n".join(f"• {b.get('label', '')}" for b in buttons)
            await self._send_message(session.conversation_id, f"{text}\n\n{button_text}")
            return "wait"

        if node_type == "wait_input":
            # Stores the content of the message currently in the context; it
            # does not pause the flow by itself.
            variable = config.get("variable", "user_input")
            context.set(variable, context.message.get("content", ""))
            return "default"

        if node_type == "condition":
            return self._evaluate_condition(config, context)

        if node_type == "set_variable":
            var_name = config.get("variable", "")
            var_value = context.interpolate(config.get("value", ""))
            context.set(var_name, var_value)
            return "default"

        return "default"

    def _evaluate_condition(self, config: dict, context: FlowContext) -> str:
        """Evaluate the configured conditions and return the branch name.

        Conditions are checked in order and the first match wins. Supported
        operators are ``equals``, ``contains`` and ``starts_with``; all
        comparisons are case-insensitive on the string form of the values.
        Returns ``"default"`` when no condition matches.
        """
        conditions = config.get("conditions", [])

        for cond in conditions:
            field = cond.get("field", "")
            operator = cond.get("operator", "equals")
            value = cond.get("value", "")
            branch = cond.get("branch", "default")

            # A falsy context value falls back to the message content.
            actual = context.get(field) or context.message.get("content", "")
            actual_text = str(actual).lower()
            expected_text = str(value).lower()

            if operator == "equals" and actual_text == expected_text:
                return branch
            if operator == "contains" and expected_text in actual_text:
                return branch
            if operator == "starts_with" and actual_text.startswith(expected_text):
                return branch

        return "default"

    def _find_next_edge(
        self,
        edges: list,
        source_id: str,
        handle: Optional[str] = None,
    ) -> Optional[dict]:
        """Find the next edge leaving ``source_id``.

        Without a ``handle`` the first outgoing edge is returned. With a
        handle, the first outgoing edge whose ``sourceHandle`` equals it is
        returned. For the ``"default"`` handle only, an outgoing edge that has
        no ``sourceHandle`` at all is used when no exact match exists, so
        single-output nodes connected by unlabelled edges still advance.
        Returns None when no edge qualifies.
        """
        unlabelled = None

        for edge in edges:
            if edge.get("source") != source_id:
                continue

            edge_handle = edge.get("sourceHandle")
            if not handle or edge_handle == handle:
                return edge

            if handle == "default" and not edge_handle and unlabelled is None:
                unlabelled = edge

        return unlabelled

    async def _send_message(self, conversation_id: str, text: str):
        """Send a text message via the API Gateway.

        Delivery is best-effort: any failure, including a non-2xx response,
        is logged and swallowed so that flow execution continues.
        """
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    f"{settings.API_GATEWAY_URL}/api/internal/flow/send",
                    json={
                        "conversation_id": str(conversation_id),
                        "content": text,
                        "type": "text",
                    },
                    timeout=30,
                )
                # httpx does not raise on 4xx/5xx by itself; without this a
                # rejected message would go completely unnoticed.
                response.raise_for_status()
            except Exception:
                logger.exception(
                    "Failed to send message for conversation %s", conversation_id
                )