diff --git a/services/flow-engine/app/config.py b/services/flow-engine/app/config.py index 396f48e..16b34cc 100644 --- a/services/flow-engine/app/config.py +++ b/services/flow-engine/app/config.py @@ -13,6 +13,7 @@ class Settings(BaseSettings): API_GATEWAY_URL: str = "http://localhost:8000" WHATSAPP_CORE_URL: str = "http://localhost:3001" + INTEGRATIONS_URL: str = "http://localhost:8002" class Config: env_file = ".env" diff --git a/services/flow-engine/app/engine.py b/services/flow-engine/app/engine.py index dcc11b0..15a4bc0 100644 --- a/services/flow-engine/app/engine.py +++ b/services/flow-engine/app/engine.py @@ -16,6 +16,16 @@ from app.nodes.validation import ( ) from app.nodes.script import JavaScriptExecutor, HttpRequestExecutor from app.nodes.ai import AIResponseExecutor, AISentimentExecutor +from app.nodes.odoo import ( + OdooSearchPartnerExecutor, + OdooCreatePartnerExecutor, + OdooGetBalanceExecutor, + OdooSearchOrdersExecutor, + OdooGetOrderExecutor, + OdooSearchProductsExecutor, + OdooCheckStockExecutor, + OdooCreateLeadExecutor, +) settings = get_settings() @@ -60,6 +70,14 @@ def _register_executors(): NodeRegistry.register("http_request", HttpRequestExecutor()) NodeRegistry.register("ai_response", AIResponseExecutor()) NodeRegistry.register("ai_sentiment", AISentimentExecutor()) + NodeRegistry.register("odoo_search_partner", OdooSearchPartnerExecutor()) + NodeRegistry.register("odoo_create_partner", OdooCreatePartnerExecutor()) + NodeRegistry.register("odoo_get_balance", OdooGetBalanceExecutor()) + NodeRegistry.register("odoo_search_orders", OdooSearchOrdersExecutor()) + NodeRegistry.register("odoo_get_order", OdooGetOrderExecutor()) + NodeRegistry.register("odoo_search_products", OdooSearchProductsExecutor()) + NodeRegistry.register("odoo_check_stock", OdooCheckStockExecutor()) + NodeRegistry.register("odoo_create_lead", OdooCreateLeadExecutor()) _register_executors() diff --git a/services/flow-engine/app/nodes/__init__.py b/services/flow-engine/app/nodes/__init__.py index d1949e2..d172e5d 100644 --- a/services/flow-engine/app/nodes/__init__.py +++ b/services/flow-engine/app/nodes/__init__.py @@ -24,3 +24,13 @@ from app.nodes.validation import ( ValidatePhoneExecutor, ValidateRegexExecutor, ) +from app.nodes.odoo import ( + OdooSearchPartnerExecutor, + OdooCreatePartnerExecutor, + OdooGetBalanceExecutor, + OdooSearchOrdersExecutor, + OdooGetOrderExecutor, + OdooSearchProductsExecutor, + OdooCheckStockExecutor, + OdooCreateLeadExecutor, +) diff --git a/services/flow-engine/app/nodes/odoo.py b/services/flow-engine/app/nodes/odoo.py new file mode 100644 index 0000000..cf048dd --- /dev/null +++ b/services/flow-engine/app/nodes/odoo.py @@ -0,0 +1,272 @@ +from typing import Any, Optional +import httpx + +from app.config import get_settings +from app.context import FlowContext +from app.nodes.base import NodeExecutor + +settings = get_settings() + + +class OdooSearchPartnerExecutor(NodeExecutor): + """Search Odoo partner by phone""" + + async def execute( + self, config: dict, context: FlowContext, session: Any + ) -> Optional[str]: + phone = context.interpolate(config.get("phone", "{{contact.phone_number}}")) + output_var = config.get("output_variable", "_odoo_partner") + + try: + async with httpx.AsyncClient() as client: + response = await client.get( + f"{settings.INTEGRATIONS_URL}/api/odoo/partners/search", + params={"phone": phone}, + timeout=15, + ) + + if response.status_code == 200: + context.set(output_var, response.json()) + return "found" + elif response.status_code == 404: + context.set(output_var, None) + return "not_found" + else: + context.set("_odoo_error", response.text) + return "error" + except Exception as e: + context.set("_odoo_error", str(e)) + return "error" + + +class OdooCreatePartnerExecutor(NodeExecutor): + """Create Odoo partner""" + + async def execute( + self, config: dict, context: FlowContext, session: Any + ) -> Optional[str]: + data = { + "name": context.interpolate(config.get("name", "{{contact.name}}")), + "mobile": context.interpolate(config.get("phone", "{{contact.phone_number}}")), + } + + if config.get("email"): + data["email"] = context.interpolate(config["email"]) + + try: + async with httpx.AsyncClient() as client: + response = await client.post( + f"{settings.INTEGRATIONS_URL}/api/odoo/partners", + json=data, + timeout=15, + ) + + if response.status_code == 200: + result = response.json() + context.set("_odoo_partner_id", result["id"]) + return "success" + else: + context.set("_odoo_error", response.text) + return "error" + except Exception as e: + context.set("_odoo_error", str(e)) + return "error" + + +class OdooGetBalanceExecutor(NodeExecutor): + """Get partner balance""" + + async def execute( + self, config: dict, context: FlowContext, session: Any + ) -> Optional[str]: + partner_id = config.get("partner_id") or context.get("_odoo_partner.id") + output_var = config.get("output_variable", "_odoo_balance") + + if not partner_id: + return "error" + + try: + async with httpx.AsyncClient() as client: + response = await client.get( + f"{settings.INTEGRATIONS_URL}/api/odoo/partners/{partner_id}/balance", + timeout=15, + ) + + if response.status_code == 200: + context.set(output_var, response.json()) + return "success" + else: + return "error" + except Exception: + return "error" + + +class OdooSearchOrdersExecutor(NodeExecutor): + """Search orders for partner""" + + async def execute( + self, config: dict, context: FlowContext, session: Any + ) -> Optional[str]: + partner_id = config.get("partner_id") or context.get("_odoo_partner.id") + state = config.get("state") + limit = config.get("limit", 5) + output_var = config.get("output_variable", "_odoo_orders") + + if not partner_id: + return "error" + + params = {"limit": limit} + if state: + params["state"] = state + + try: + async with httpx.AsyncClient() as client: + response = await client.get( + f"{settings.INTEGRATIONS_URL}/api/odoo/sales/partner/{partner_id}", + params=params, + timeout=15, + ) + + if response.status_code == 200: + orders = response.json() + context.set(output_var, orders) + return "found" if orders else "not_found" + else: + return "error" + except Exception: + return "error" + + +class OdooGetOrderExecutor(NodeExecutor): + """Get order details by ID or name""" + + async def execute( + self, config: dict, context: FlowContext, session: Any + ) -> Optional[str]: + order_id = config.get("order_id") + order_name = config.get("order_name") + output_var = config.get("output_variable", "_odoo_order") + + try: + async with httpx.AsyncClient() as client: + if order_id: + url = f"{settings.INTEGRATIONS_URL}/api/odoo/sales/{order_id}" + elif order_name: + name = context.interpolate(order_name) + url = f"{settings.INTEGRATIONS_URL}/api/odoo/sales/name/{name}" + else: + return "error" + + response = await client.get(url, timeout=15) + + if response.status_code == 200: + context.set(output_var, response.json()) + return "found" + elif response.status_code == 404: + return "not_found" + else: + return "error" + except Exception: + return "error" + + +class OdooSearchProductsExecutor(NodeExecutor): + """Search products""" + + async def execute( + self, config: dict, context: FlowContext, session: Any + ) -> Optional[str]: + query = context.interpolate(config.get("query", "")) + limit = config.get("limit", 10) + output_var = config.get("output_variable", "_odoo_products") + + try: + async with httpx.AsyncClient() as client: + response = await client.get( + f"{settings.INTEGRATIONS_URL}/api/odoo/products", + params={"q": query, "limit": limit}, + timeout=15, + ) + + if response.status_code == 200: + products = response.json() + context.set(output_var, products) + return "found" if products else "not_found" + else: + return "error" + except Exception: + return "error" + + +class OdooCheckStockExecutor(NodeExecutor): + """Check product stock""" + + async def execute( + self, config: dict, context: FlowContext, session: Any + ) -> Optional[str]: + product_id = config.get("product_id") + quantity = config.get("quantity", 1) + output_var = config.get("output_variable", "_odoo_stock") + + if not product_id: + return "error" + + try: + async with httpx.AsyncClient() as client: + response = await client.get( + f"{settings.INTEGRATIONS_URL}/api/odoo/products/{product_id}/availability", + params={"quantity": quantity}, + timeout=15, + ) + + if response.status_code == 200: + result = response.json() + context.set(output_var, result) + return "available" if result["available"] else "unavailable" + else: + return "error" + except Exception: + return "error" + + +class OdooCreateLeadExecutor(NodeExecutor): + """Create CRM lead""" + + async def execute( + self, config: dict, context: FlowContext, session: Any + ) -> Optional[str]: + data = { + "name": context.interpolate(config.get("name", "Lead desde WhatsApp")), + "contact_name": context.interpolate(config.get("contact_name", "{{contact.name}}")), + "phone": context.interpolate(config.get("phone", "{{contact.phone_number}}")), + } + + if config.get("email"): + data["email_from"] = context.interpolate(config["email"]) + if config.get("description"): + data["description"] = context.interpolate(config["description"]) + if config.get("expected_revenue"): + data["expected_revenue"] = config["expected_revenue"] + + partner = context.get("_odoo_partner") + if partner and isinstance(partner, dict) and partner.get("id"): + data["partner_id"] = partner["id"] + + try: + async with httpx.AsyncClient() as client: + response = await client.post( + f"{settings.INTEGRATIONS_URL}/api/odoo/crm/leads", + json=data, + timeout=15, + ) + + if response.status_code == 200: + result = response.json() + context.set("_odoo_lead_id", result["id"]) + return "success" + else: + context.set("_odoo_error", response.text) + return "error" + except Exception as e: + context.set("_odoo_error", str(e)) + return "error"