Files
Claude AI d1d1aa58e1 feat(flow-engine): add Odoo node executors
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 22:28:26 +00:00

273 lines
9.2 KiB
Python

from typing import Any, Optional
from urllib.parse import quote

import httpx

from app.config import get_settings
from app.context import FlowContext
from app.nodes.base import NodeExecutor
# Settings object obtained once when this module is imported; the executors
# below read settings.INTEGRATIONS_URL from it to build their request URLs.
settings = get_settings()
class OdooSearchPartnerExecutor(NodeExecutor):
    """Look up an Odoo partner by phone number via the integrations service."""

    async def execute(
        self, config: dict, context: FlowContext, session: Any
    ) -> Optional[str]:
        """Query the partner search endpoint and store the outcome in the context.

        Config keys read:
            phone: template for the phone number
                (default ``{{contact.phone_number}}``).
            output_variable: context key that receives the result
                (default ``_odoo_partner``).

        Returns:
            ``"found"`` on HTTP 200 (decoded JSON stored under the output key),
            ``"not_found"`` on HTTP 404 (``None`` stored under the output key),
            ``"error"`` for any other status or any exception, with the
            response body or exception text stored under ``_odoo_error``.

        ``session`` is not used by this executor.
        """
        phone_template = config.get("phone", "{{contact.phone_number}}")
        phone_number = context.interpolate(phone_template)
        result_key = config.get("output_variable", "_odoo_partner")

        try:
            async with httpx.AsyncClient() as http:
                endpoint = f"{settings.INTEGRATIONS_URL}/api/odoo/partners/search"
                reply = await http.get(
                    endpoint,
                    params={"phone": phone_number},
                    timeout=15,
                )
                status = reply.status_code

                if status == 200:
                    context.set(result_key, reply.json())
                    return "found"

                if status == 404:
                    # Clear the output key so it does not keep an earlier value.
                    context.set(result_key, None)
                    return "not_found"

                context.set("_odoo_error", reply.text)
                return "error"
        except Exception as exc:
            context.set("_odoo_error", str(exc))
            return "error"
class OdooCreatePartnerExecutor(NodeExecutor):
    """Create an Odoo partner via the integrations service."""

    async def execute(
        self, config: dict, context: FlowContext, session: Any
    ) -> Optional[str]:
        """POST a new partner and store its id in the flow context.

        Config keys read:
            name: template for the partner name (default ``{{contact.name}}``).
            phone: template sent as ``mobile``
                (default ``{{contact.phone_number}}``).
            email: optional template; only sent when present and truthy.

        Returns:
            ``"success"`` after storing the ``id`` field of the JSON response
            under ``_odoo_partner_id``; ``"error"`` otherwise, with the
            response body or exception text stored under ``_odoo_error``.

        ``session`` is not used by this executor.
        """
        data = {
            "name": context.interpolate(config.get("name", "{{contact.name}}")),
            "mobile": context.interpolate(
                config.get("phone", "{{contact.phone_number}}")
            ),
        }
        if config.get("email"):
            data["email"] = context.interpolate(config["email"])

        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{settings.INTEGRATIONS_URL}/api/odoo/partners",
                    json=data,
                    timeout=15,
                )
                # 201 Created is the conventional reply to a POST that creates
                # a resource. Treating it as a failure would report "error"
                # for a partner that now exists and invite duplicate retries.
                if response.status_code in (200, 201):
                    result = response.json()
                    context.set("_odoo_partner_id", result["id"])
                    return "success"

                context.set("_odoo_error", response.text)
                return "error"
        except Exception as e:
            # Also reached when the body is not JSON or has no "id" field.
            context.set("_odoo_error", str(e))
            return "error"
class OdooGetBalanceExecutor(NodeExecutor):
    """Fetch a partner's balance via the integrations service."""

    async def execute(
        self, config: dict, context: FlowContext, session: Any
    ) -> Optional[str]:
        """Request the balance endpoint for a partner and store the JSON reply.

        Config keys read:
            partner_id: partner to query; when missing or falsy the value of
                ``context.get("_odoo_partner.id")`` is used instead.
            output_variable: context key that receives the result
                (default ``_odoo_balance``).

        Returns:
            ``"success"`` on HTTP 200 (decoded JSON stored under the output
            key); ``"error"`` when no partner id is available, on any other
            status, or on any exception. Every error path stores a
            description under ``_odoo_error``, matching the other Odoo
            executors that already do so.

        ``session`` is not used by this executor.
        """
        partner_id = config.get("partner_id") or context.get("_odoo_partner.id")
        output_var = config.get("output_variable", "_odoo_balance")

        if not partner_id:
            context.set("_odoo_error", "partner_id is required")
            return "error"

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{settings.INTEGRATIONS_URL}/api/odoo/partners/{partner_id}/balance",
                    timeout=15,
                )
                if response.status_code == 200:
                    context.set(output_var, response.json())
                    return "success"

                # Keep the upstream body so the flow can inspect the failure.
                context.set("_odoo_error", response.text)
                return "error"
        except Exception as e:
            context.set("_odoo_error", str(e))
            return "error"
class OdooSearchOrdersExecutor(NodeExecutor):
    """List sales orders for a partner via the integrations service."""

    async def execute(
        self, config: dict, context: FlowContext, session: Any
    ) -> Optional[str]:
        """Request a partner's orders and store the JSON reply in the context.

        Config keys read:
            partner_id: partner to query; when missing or falsy the value of
                ``context.get("_odoo_partner.id")`` is used instead.
            state: optional filter, sent as the ``state`` query parameter
                only when truthy.
            limit: sent as the ``limit`` query parameter (default ``5``).
            output_variable: context key that receives the result
                (default ``_odoo_orders``).

        Returns:
            On HTTP 200 the decoded JSON is stored under the output key and
            the result is ``"found"`` when it is truthy, ``"not_found"``
            otherwise. ``"error"`` when no partner id is available, on any
            other status, or on any exception. Every error path stores a
            description under ``_odoo_error``.

        ``session`` is not used by this executor.
        """
        partner_id = config.get("partner_id") or context.get("_odoo_partner.id")
        state = config.get("state")
        limit = config.get("limit", 5)
        output_var = config.get("output_variable", "_odoo_orders")

        if not partner_id:
            context.set("_odoo_error", "partner_id is required")
            return "error"

        params = {"limit": limit}
        if state:
            params["state"] = state

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{settings.INTEGRATIONS_URL}/api/odoo/sales/partner/{partner_id}",
                    params=params,
                    timeout=15,
                )
                if response.status_code == 200:
                    orders = response.json()
                    context.set(output_var, orders)
                    return "found" if orders else "not_found"

                # Keep the upstream body so the flow can inspect the failure.
                context.set("_odoo_error", response.text)
                return "error"
        except Exception as e:
            context.set("_odoo_error", str(e))
            return "error"
class OdooGetOrderExecutor(NodeExecutor):
    """Fetch one sales order by id or by name via the integrations service."""

    async def execute(
        self, config: dict, context: FlowContext, session: Any
    ) -> Optional[str]:
        """Request a single order and store the JSON reply in the context.

        Config keys read:
            order_id: used verbatim in the URL path; takes precedence over
                ``order_name`` when truthy.
            order_name: template, interpolated against the context and then
                percent-encoded before being placed in the URL path.
            output_variable: context key that receives the result
                (default ``_odoo_order``).

        Returns:
            ``"found"`` on HTTP 200 (decoded JSON stored under the output
            key), ``"not_found"`` on HTTP 404, and ``"error"`` when neither
            ``order_id`` nor ``order_name`` is configured, on any other
            status, or on any exception. Error paths store a description
            under ``_odoo_error``.

        ``session`` is not used by this executor.
        """
        order_id = config.get("order_id")
        order_name = config.get("order_name")
        output_var = config.get("output_variable", "_odoo_order")

        try:
            # Resolve the URL first so no HTTP client is opened when the node
            # is misconfigured.
            if order_id:
                url = f"{settings.INTEGRATIONS_URL}/api/odoo/sales/{order_id}"
            elif order_name:
                name = context.interpolate(order_name)
                # The interpolated value is not controlled by this node.
                # Percent-encode it so characters such as "/", "?" or "#"
                # stay inside the last path segment and cannot redirect the
                # request to another endpoint.
                encoded_name = quote(str(name), safe="")
                url = f"{settings.INTEGRATIONS_URL}/api/odoo/sales/name/{encoded_name}"
            else:
                context.set("_odoo_error", "order_id or order_name is required")
                return "error"

            async with httpx.AsyncClient() as client:
                response = await client.get(url, timeout=15)
                if response.status_code == 200:
                    context.set(output_var, response.json())
                    return "found"
                if response.status_code == 404:
                    return "not_found"

                # Keep the upstream body so the flow can inspect the failure.
                context.set("_odoo_error", response.text)
                return "error"
        except Exception as e:
            context.set("_odoo_error", str(e))
            return "error"
class OdooSearchProductsExecutor(NodeExecutor):
    """Search Odoo products via the integrations service."""

    async def execute(
        self, config: dict, context: FlowContext, session: Any
    ) -> Optional[str]:
        """Run a product search and store the JSON reply in the context.

        Config keys read:
            query: template, interpolated and sent as the ``q`` query
                parameter (default empty string).
            limit: sent as the ``limit`` query parameter (default ``10``).
            output_variable: context key that receives the result
                (default ``_odoo_products``).

        Returns:
            On HTTP 200 the decoded JSON is stored under the output key and
            the result is ``"found"`` when it is truthy, ``"not_found"``
            otherwise. ``"error"`` on any other status or any exception,
            with the response body or exception text stored under
            ``_odoo_error``.

        ``session`` is not used by this executor.
        """
        query = context.interpolate(config.get("query", ""))
        limit = config.get("limit", 10)
        output_var = config.get("output_variable", "_odoo_products")

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{settings.INTEGRATIONS_URL}/api/odoo/products",
                    params={"q": query, "limit": limit},
                    timeout=15,
                )
                if response.status_code == 200:
                    products = response.json()
                    context.set(output_var, products)
                    return "found" if products else "not_found"

                # Keep the upstream body so the flow can inspect the failure.
                context.set("_odoo_error", response.text)
                return "error"
        except Exception as e:
            context.set("_odoo_error", str(e))
            return "error"
class OdooCheckStockExecutor(NodeExecutor):
    """Check product availability via the integrations service."""

    async def execute(
        self, config: dict, context: FlowContext, session: Any
    ) -> Optional[str]:
        """Query the availability endpoint for a product and store the reply.

        Config keys read:
            product_id: required; used verbatim in the URL path.
            quantity: sent as the ``quantity`` query parameter (default ``1``).
            output_variable: context key that receives the result
                (default ``_odoo_stock``).

        Returns:
            On HTTP 200 the decoded JSON is stored under the output key and
            the result is ``"available"`` when its ``available`` field is
            truthy, ``"unavailable"`` otherwise. ``"error"`` when
            ``product_id`` is missing, on any other status, or on any
            exception (including a reply without an ``available`` field).
            Every error path stores a description under ``_odoo_error``.

        ``session`` is not used by this executor.
        """
        product_id = config.get("product_id")
        quantity = config.get("quantity", 1)
        output_var = config.get("output_variable", "_odoo_stock")

        if not product_id:
            context.set("_odoo_error", "product_id is required")
            return "error"

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{settings.INTEGRATIONS_URL}/api/odoo/products/{product_id}/availability",
                    params={"quantity": quantity},
                    timeout=15,
                )
                if response.status_code == 200:
                    result = response.json()
                    context.set(output_var, result)
                    return "available" if result["available"] else "unavailable"

                # Keep the upstream body so the flow can inspect the failure.
                context.set("_odoo_error", response.text)
                return "error"
        except Exception as e:
            context.set("_odoo_error", str(e))
            return "error"
class OdooCreateLeadExecutor(NodeExecutor):
    """Create a CRM lead in Odoo via the integrations service."""

    async def execute(
        self, config: dict, context: FlowContext, session: Any
    ) -> Optional[str]:
        """POST a new lead and store its id in the flow context.

        Config keys read:
            name: template for the lead title
                (default ``Lead desde WhatsApp``).
            contact_name: template (default ``{{contact.name}}``).
            phone: template (default ``{{contact.phone_number}}``).
            email: optional template, sent as ``email_from`` when truthy.
            description: optional template, sent when truthy.
            expected_revenue: optional value, sent as-is when truthy.

        If the context holds a dict under ``_odoo_partner`` with a truthy
        ``id``, that id is sent as ``partner_id``.

        Returns:
            ``"success"`` after storing the ``id`` field of the JSON response
            under ``_odoo_lead_id``; ``"error"`` otherwise, with the response
            body or exception text stored under ``_odoo_error``.

        ``session`` is not used by this executor.
        """
        data = {
            "name": context.interpolate(config.get("name", "Lead desde WhatsApp")),
            "contact_name": context.interpolate(
                config.get("contact_name", "{{contact.name}}")
            ),
            "phone": context.interpolate(
                config.get("phone", "{{contact.phone_number}}")
            ),
        }
        if config.get("email"):
            data["email_from"] = context.interpolate(config["email"])
        if config.get("description"):
            data["description"] = context.interpolate(config["description"])
        if config.get("expected_revenue"):
            data["expected_revenue"] = config["expected_revenue"]

        partner = context.get("_odoo_partner")
        if partner and isinstance(partner, dict) and partner.get("id"):
            data["partner_id"] = partner["id"]

        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{settings.INTEGRATIONS_URL}/api/odoo/crm/leads",
                    json=data,
                    timeout=15,
                )
                # 201 Created is the conventional reply to a POST that creates
                # a resource. Treating it as a failure would report "error"
                # for a lead that now exists and invite duplicate retries.
                if response.status_code in (200, 201):
                    result = response.json()
                    context.set("_odoo_lead_id", result["id"])
                    return "success"

                context.set("_odoo_error", response.text)
                return "error"
        except Exception as e:
            # Also reached when the body is not JSON or has no "id" field.
            context.set("_odoo_error", str(e))
            return "error"