diff --git a/services/api-gateway/app/schemas/__init__.py b/services/api-gateway/app/schemas/__init__.py index 9fa7c0e..287a96c 100644 --- a/services/api-gateway/app/schemas/__init__.py +++ b/services/api-gateway/app/schemas/__init__.py @@ -6,6 +6,15 @@ from app.schemas.auth import ( UserResponse, CreateUserRequest, ) +from app.schemas.flow import ( + NodeData, + FlowNode, + FlowEdge, + FlowCreate, + FlowUpdate, + FlowResponse, + FlowListResponse, +) __all__ = [ "LoginRequest", @@ -14,4 +23,11 @@ __all__ = [ "RefreshRequest", "UserResponse", "CreateUserRequest", + "NodeData", + "FlowNode", + "FlowEdge", + "FlowCreate", + "FlowUpdate", + "FlowResponse", + "FlowListResponse", ] diff --git a/services/api-gateway/app/schemas/flow.py b/services/api-gateway/app/schemas/flow.py new file mode 100644 index 0000000..2f5a48b --- /dev/null +++ b/services/api-gateway/app/schemas/flow.py @@ -0,0 +1,74 @@ +from pydantic import BaseModel +from typing import Optional, List +from uuid import UUID +from datetime import datetime +from app.models.flow import TriggerType + + +class NodeData(BaseModel): + label: str + type: str + config: dict = {} + + +class FlowNode(BaseModel): + id: str + type: str + position: dict + data: NodeData + + +class FlowEdge(BaseModel): + id: str + source: str + target: str + sourceHandle: Optional[str] = None + targetHandle: Optional[str] = None + + +class FlowCreate(BaseModel): + name: str + description: Optional[str] = None + trigger_type: TriggerType + trigger_value: Optional[str] = None + + +class FlowUpdate(BaseModel): + name: Optional[str] = None + description: Optional[str] = None + trigger_type: Optional[TriggerType] = None + trigger_value: Optional[str] = None + nodes: Optional[List[dict]] = None + edges: Optional[List[dict]] = None + variables: Optional[dict] = None + is_active: Optional[bool] = None + + +class FlowResponse(BaseModel): + id: UUID + name: str + description: Optional[str] + trigger_type: TriggerType + trigger_value: Optional[str] + nodes: List[dict] + edges: List[dict] + variables: dict + is_active: bool + version: int + created_at: datetime + updated_at: datetime + + class Config: + from_attributes = True + + +class FlowListResponse(BaseModel): + id: UUID + name: str + trigger_type: TriggerType + is_active: bool + version: int + updated_at: datetime + + class Config: + from_attributes = True diff --git a/services/flow-engine/app/context.py b/services/flow-engine/app/context.py new file mode 100644 index 0000000..f559327 --- /dev/null +++ b/services/flow-engine/app/context.py @@ -0,0 +1,67 @@ +from typing import Any, Optional +from datetime import datetime +import re + + +class FlowContext: + """Manages variables and state during flow execution""" + + def __init__( + self, + contact: dict, + conversation: dict, + message: dict, + session_vars: dict = None + ): + self.contact = contact + self.conversation = conversation + self.message = message + self.variables = session_vars or {} + self._system = { + "date": datetime.now().strftime("%Y-%m-%d"), + "time": datetime.now().strftime("%H:%M"), + "day_of_week": datetime.now().strftime("%A"), + } + + def get(self, key: str) -> Any: + """Get variable by dot notation: contact.name, variables.email""" + parts = key.split(".") + + if parts[0] == "contact": + return self._get_nested(self.contact, parts[1:]) + elif parts[0] == "conversation": + return self._get_nested(self.conversation, parts[1:]) + elif parts[0] == "message": + return self._get_nested(self.message, parts[1:]) + elif parts[0] == "system": + return self._get_nested(self._system, parts[1:]) + elif parts[0] == "variables": + return self._get_nested(self.variables, parts[1:]) + else: + return self.variables.get(key) + + def set(self, key: str, value: Any): + """Set a session variable""" + self.variables[key] = value + + def _get_nested(self, obj: dict, keys: list) -> Any: + for key in keys: + if isinstance(obj, dict): + obj = obj.get(key) + else: + return None + return obj + + def interpolate(self, text: str) -> str: + """Replace {{variable}} with actual values""" + pattern = r'\{\{([^}]+)\}\}' + + def replace(match): + key = match.group(1).strip() + value = self.get(key) + return str(value) if value is not None else "" + + return re.sub(pattern, replace, text) + + def to_dict(self) -> dict: + return self.variables.copy() diff --git a/services/flow-engine/app/engine.py b/services/flow-engine/app/engine.py new file mode 100644 index 0000000..adb0791 --- /dev/null +++ b/services/flow-engine/app/engine.py @@ -0,0 +1,220 @@ +from typing import Optional +import httpx +from app.config import get_settings +from app.context import FlowContext + +settings = get_settings() + + +class FlowEngine: + """Executes flow nodes based on incoming messages""" + + def __init__(self, db_session): + self.db = db_session + + async def process_message( + self, + conversation_id: str, + contact: dict, + conversation: dict, + message: dict, + ) -> bool: + """ + Process incoming message through flow engine. + Returns True if handled by a flow, False otherwise. + """ + from app.models import Flow, FlowSession, TriggerType + + # Check for active flow session + session = self.db.query(FlowSession).filter( + FlowSession.conversation_id == conversation_id + ).first() + + if session and session.waiting_for_input: + # Continue existing flow + flow = self.db.query(Flow).filter(Flow.id == session.flow_id).first() + if flow: + context = FlowContext(contact, conversation, message, session.variables) + await self._execute_from_node(flow, session, context) + return True + + # Find matching flow by trigger + flow = self._find_matching_flow(message.get("content", "")) + + if flow: + context = FlowContext(contact, conversation, message) + session = FlowSession( + conversation_id=conversation_id, + flow_id=flow.id, + variables={}, + ) + self.db.add(session) + self.db.commit() + + await self._execute_flow(flow, session, context) + return True + + return False + + def _find_matching_flow(self, message_text: str) -> Optional["Flow"]: + from app.models import Flow, TriggerType + + message_lower = message_text.lower().strip() + + # Check keyword triggers + keyword_flows = self.db.query(Flow).filter( + Flow.trigger_type == TriggerType.KEYWORD, + Flow.is_active == True + ).all() + + for flow in keyword_flows: + keywords = [k.strip().lower() for k in (flow.trigger_value or "").split(",")] + if any(kw in message_lower for kw in keywords if kw): + return flow + + # Check welcome trigger + welcome_flow = self.db.query(Flow).filter( + Flow.trigger_type == TriggerType.WELCOME, + Flow.is_active == True + ).first() + + if welcome_flow: + return welcome_flow + + # Fallback + return self.db.query(Flow).filter( + Flow.trigger_type == TriggerType.FALLBACK, + Flow.is_active == True + ).first() + + async def _execute_flow(self, flow, session, context: FlowContext): + """Start flow execution from first node""" + nodes = flow.nodes or [] + if not nodes: + return + + start_node = next( + (n for n in nodes if n.get("data", {}).get("type") == "trigger"), + nodes[0] if nodes else None + ) + + if start_node: + session.current_node_id = start_node["id"] + await self._execute_from_node(flow, session, context) + + async def _execute_from_node(self, flow, session, context: FlowContext): + """Execute flow starting from current node""" + nodes = {n["id"]: n for n in (flow.nodes or [])} + edges = flow.edges or [] + + current_id = session.current_node_id + + while current_id: + node = nodes.get(current_id) + if not node: + break + + node_type = node.get("data", {}).get("type", "") + config = node.get("data", {}).get("config", {}) + + result = await self._execute_node(node_type, config, context, session) + + if result == "wait": + session.waiting_for_input = True + session.variables = context.to_dict() + self.db.commit() + return + + next_edge = self._find_next_edge(edges, current_id, result) + current_id = next_edge["target"] if next_edge else None + session.current_node_id = current_id + session.waiting_for_input = False + + session.variables = context.to_dict() + self.db.commit() + + async def _execute_node( + self, + node_type: str, + config: dict, + context: FlowContext, + session + ) -> Optional[str]: + """Execute a single node, return result for routing""" + + if node_type == "trigger": + return "default" + + elif node_type == "message": + text = context.interpolate(config.get("text", "")) + await self._send_message(session.conversation_id, text) + return "default" + + elif node_type == "buttons": + text = context.interpolate(config.get("text", "")) + buttons = config.get("buttons", []) + button_text = "\n".join([f"• {b['label']}" for b in buttons]) + await self._send_message(session.conversation_id, f"{text}\n\n{button_text}") + return "wait" + + elif node_type == "wait_input": + variable = config.get("variable", "user_input") + context.set(variable, context.message.get("content", "")) + return "default" + + elif node_type == "condition": + return self._evaluate_condition(config, context) + + elif node_type == "set_variable": + var_name = config.get("variable", "") + var_value = context.interpolate(config.get("value", "")) + context.set(var_name, var_value) + return "default" + + return "default" + + def _evaluate_condition(self, config: dict, context: FlowContext) -> str: + """Evaluate condition and return branch name""" + conditions = config.get("conditions", []) + + for cond in conditions: + field = cond.get("field", "") + operator = cond.get("operator", "equals") + value = cond.get("value", "") + branch = cond.get("branch", "default") + + actual = context.get(field) or context.message.get("content", "") + + if operator == "equals" and str(actual).lower() == str(value).lower(): + return branch + elif operator == "contains" and str(value).lower() in str(actual).lower(): + return branch + elif operator == "starts_with" and str(actual).lower().startswith(str(value).lower()): + return branch + + return "default" + + def _find_next_edge(self, edges: list, source_id: str, handle: str = None) -> Optional[dict]: + """Find the next edge from source node""" + for edge in edges: + if edge.get("source") == source_id: + if handle and edge.get("sourceHandle") != handle: + continue + return edge + return None + + async def _send_message(self, conversation_id: str, text: str): + """Send message via API Gateway""" + async with httpx.AsyncClient() as client: + try: + await client.post( + f"{settings.API_GATEWAY_URL}/api/internal/flow/send", + json={ + "conversation_id": str(conversation_id), + "content": text, + "type": "text" + }, + timeout=30 + ) + except Exception as e: + print(f"Failed to send message: {e}")