Files
WhatsAppCentralizado/services/flow-engine/app/engine.py
Claude AI 8824f65a62 feat(fase2): add Flow schemas and Flow Engine core
API Gateway:
- Add Pydantic schemas for Flow API (create, update, response, nodes, edges)

Flow Engine:
- Add FlowContext for variable management and interpolation
- Add FlowEngine for executing chatbot flows
- Support node types: trigger, message, buttons, wait_input, condition, set_variable

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 10:19:11 +00:00

221 lines
7.4 KiB
Python

import logging
from typing import Optional

import httpx

from app.config import get_settings
from app.context import FlowContext
# Module-level settings object, resolved once when this module is imported.
# FlowEngine._send_message reads settings.API_GATEWAY_URL from it.
settings = get_settings()
class FlowEngine:
    """Executes chatbot flow nodes in response to incoming messages.

    A flow is a small graph stored on a ``Flow`` row:

    * ``flow.nodes`` is a list of dicts; each node has an ``id`` and a
      ``data`` dict carrying ``type`` and ``config``.
    * ``flow.edges`` is a list of dicts with ``source``, ``target`` and an
      optional ``sourceHandle`` naming the branch the edge belongs to.

    Progress of a conversation through a flow is persisted on a
    ``FlowSession`` row (``flow_id``, ``current_node_id``,
    ``waiting_for_input``, ``variables``).
    """

    def __init__(self, db_session):
        """Keep the database session used for every query and commit."""
        self.db = db_session

    async def process_message(
        self,
        conversation_id: str,
        contact: dict,
        conversation: dict,
        message: dict,
    ) -> bool:
        """Route an incoming message through the flow engine.

        If the conversation has a session that is waiting for input, that
        flow is resumed. Otherwise a flow is selected by trigger and started
        from its first node.

        Args:
            conversation_id: Identifier used to look up / create the session.
            contact: Contact data handed to ``FlowContext``.
            conversation: Conversation data handed to ``FlowContext``.
            message: Incoming message; its ``content`` key is matched
                against flow triggers.

        Returns:
            True if the message was handled by a flow, False otherwise.
        """
        from app.models import Flow, FlowSession

        session = (
            self.db.query(FlowSession)
            .filter(FlowSession.conversation_id == conversation_id)
            .first()
        )

        # Resume a paused flow, provided the flow it refers to still exists.
        if session and session.waiting_for_input:
            flow = self.db.query(Flow).filter(Flow.id == session.flow_id).first()
            if flow:
                context = FlowContext(contact, conversation, message, session.variables)
                await self._execute_from_node(flow, session, context)
                return True

        # "content" may be present but null, so do not rely on the
        # dict default alone.
        flow = self._find_matching_flow(message.get("content") or "")
        if not flow:
            return False

        context = FlowContext(contact, conversation, message)
        if session is None:
            session = FlowSession(
                conversation_id=conversation_id,
                flow_id=flow.id,
                variables={},
            )
            self.db.add(session)
        else:
            # Reuse the conversation's existing row instead of adding a second
            # one: the lookup above takes ``.first()``, so duplicate rows would
            # make it ambiguous which session gets resumed.
            session.flow_id = flow.id
            session.variables = {}
            session.current_node_id = None
            session.waiting_for_input = False
        self.db.commit()

        await self._execute_flow(flow, session, context)
        return True

    def _find_matching_flow(self, message_text: str) -> Optional["Flow"]:
        """Pick the active flow whose trigger matches ``message_text``.

        Precedence: first keyword flow with a keyword contained in the
        message (case-insensitive substring match), then the welcome flow,
        then the fallback flow.

        Returns:
            The selected ``Flow`` or None when no active flow applies.
        """
        from app.models import Flow, TriggerType

        def active_flows(trigger_type):
            return self.db.query(Flow).filter(
                Flow.trigger_type == trigger_type,
                Flow.is_active == True,  # noqa: E712 - SQL column comparison
            )

        message_lower = (message_text or "").lower().strip()

        for flow in active_flows(TriggerType.KEYWORD).all():
            # trigger_value holds a comma-separated keyword list.
            keywords = [k.strip().lower() for k in (flow.trigger_value or "").split(",")]
            if any(kw in message_lower for kw in keywords if kw):
                return flow

        # NOTE(review): an active welcome flow is returned for every message
        # that matches no keyword, so the fallback flow is only reachable
        # when no welcome flow is active - confirm this is intended.
        welcome_flow = active_flows(TriggerType.WELCOME).first()
        if welcome_flow:
            return welcome_flow

        return active_flows(TriggerType.FALLBACK).first()

    @staticmethod
    def _node_data(node: dict) -> dict:
        """Return the node's ``data`` dict, tolerating a missing/null value."""
        return node.get("data") or {}

    async def _execute_flow(self, flow, session, context: "FlowContext"):
        """Start executing ``flow`` from its entry node.

        The entry node is the first node of type ``trigger``; when there is
        none, the first node in ``flow.nodes`` is used. Does nothing for a
        flow without nodes.
        """
        nodes = flow.nodes or []
        if not nodes:
            return

        start_node = next(
            (n for n in nodes if self._node_data(n).get("type") == "trigger"),
            nodes[0],
        )
        if not start_node:
            return

        session.current_node_id = start_node["id"]
        await self._execute_from_node(flow, session, context)

    async def _execute_from_node(self, flow, session, context: "FlowContext"):
        """Run nodes starting at ``session.current_node_id``.

        Nodes are executed one after another, following the edge selected by
        each node's result, until a node asks to wait for user input or no
        further node is reachable. Session state is committed in both cases.

        When a node returns ``"wait"`` the session is advanced to that node's
        successor *before* pausing, so the next incoming message resumes
        after the prompt instead of re-sending it. A waiting node without a
        successor simply ends the flow.
        """
        nodes = {n["id"]: n for n in (flow.nodes or [])}
        edges = flow.edges or []
        current_id = session.current_node_id

        while current_id:
            node = nodes.get(current_id)
            if not node:
                break

            data = self._node_data(node)
            node_type = data.get("type", "")
            config = data.get("config") or {}

            result = await self._execute_node(node_type, config, context, session)
            paused = result == "wait"

            # A waiting node continues along its default route once the
            # user has answered.
            route = "default" if paused else result
            next_edge = self._find_next_edge(edges, current_id, route)
            current_id = next_edge["target"] if next_edge else None
            session.current_node_id = current_id

            if paused and current_id:
                session.waiting_for_input = True
                session.variables = context.to_dict()
                self.db.commit()
                return

        session.waiting_for_input = False
        session.variables = context.to_dict()
        self.db.commit()

    async def _execute_node(
        self,
        node_type: str,
        config: dict,
        context: "FlowContext",
        session
    ) -> Optional[str]:
        """Execute a single node and return its routing result.

        Returns:
            ``"wait"`` when the flow must pause for user input (``buttons``),
            the matched branch name for ``condition`` nodes, and
            ``"default"`` for every other node type, including unknown ones.
        """
        if node_type == "trigger":
            return "default"

        if node_type == "message":
            text = context.interpolate(config.get("text", ""))
            await self._send_message(session.conversation_id, text)
            return "default"

        if node_type == "buttons":
            text = context.interpolate(config.get("text", ""))
            buttons = config.get("buttons") or []
            # Buttons are rendered as plain text, one label per line.
            button_text = "\n".join(str(b.get("label", "")) for b in buttons)
            await self._send_message(session.conversation_id, f"{text}\n\n{button_text}")
            return "wait"

        if node_type == "wait_input":
            # Capture the content of the message currently being processed.
            variable = config.get("variable", "user_input")
            context.set(variable, context.message.get("content", ""))
            return "default"

        if node_type == "condition":
            return self._evaluate_condition(config, context)

        if node_type == "set_variable":
            var_name = config.get("variable", "")
            var_value = context.interpolate(config.get("value", ""))
            if var_name:  # nothing sensible to store under an empty name
                context.set(var_name, var_value)
            return "default"

        return "default"

    def _evaluate_condition(self, config: dict, context: "FlowContext") -> str:
        """Return the branch of the first condition that holds.

        Each entry of ``config["conditions"]`` has ``field``, ``operator``
        (``equals``, ``contains`` or ``starts_with``; defaults to
        ``equals``), ``value`` and ``branch``. Comparisons are done on
        lower-cased string forms. Unknown operators never match.

        Returns:
            The matching condition's ``branch``, or ``"default"`` when no
            condition matches.
        """
        checks = {
            "equals": lambda actual, expected: actual == expected,
            "contains": lambda actual, expected: expected in actual,
            "starts_with": lambda actual, expected: actual.startswith(expected),
        }

        for cond in config.get("conditions", []):
            field = cond.get("field", "")
            operator = cond.get("operator", "equals")
            value = cond.get("value", "")
            branch = cond.get("branch", "default")

            # NOTE(review): any falsy field value (unset, "", 0) falls back to
            # the message content - confirm that is intended for 0 / "".
            actual = context.get(field) or context.message.get("content", "")

            check = checks.get(operator)
            if check and check(str(actual).lower(), str(value).lower()):
                return branch

        return "default"

    def _find_next_edge(
        self,
        edges: list,
        source_id: str,
        handle: Optional[str] = None,
    ) -> Optional[dict]:
        """Find the edge to follow out of ``source_id``.

        Args:
            edges: Edge dicts with ``source``, ``target`` and optionally
                ``sourceHandle``.
            source_id: Id of the node being left.
            handle: Routing result of the node. When empty/None the first
                outgoing edge is returned regardless of its handle.

        Returns:
            The first outgoing edge whose ``sourceHandle`` equals ``handle``.
            For the ``"default"`` handle, an edge without any
            ``sourceHandle`` is accepted when no exact match exists, so
            plain unlabelled connections are followed. None if nothing
            matches.
        """
        outgoing = [edge for edge in edges if edge.get("source") == source_id]

        if not handle:
            return outgoing[0] if outgoing else None

        exact = next((e for e in outgoing if e.get("sourceHandle") == handle), None)
        if exact is not None or handle != "default":
            return exact

        return next((e for e in outgoing if not e.get("sourceHandle")), None)

    async def _send_message(self, conversation_id: str, text: str):
        """Send a text message for the conversation via the API Gateway.

        Delivery is best-effort: any failure, including a non-2xx response,
        is logged and swallowed so one failed send does not abort the flow.
        """
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    f"{settings.API_GATEWAY_URL}/api/internal/flow/send",
                    json={
                        "conversation_id": str(conversation_id),
                        "content": text,
                        "type": "text"
                    },
                    timeout=30
                )
                # Surface gateway-side rejections instead of ignoring them.
                response.raise_for_status()
            except Exception as e:  # deliberate best-effort boundary
                logging.getLogger(__name__).exception("Failed to send message: %s", e)