Add exclude_company_ids config in settings.yaml to filter out projects by Odoo company_id. Currently excludes company_id=2 (CAS 3D), keeping only Consultoria AS and unassigned projects. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
105 lines
3.3 KiB
Python
105 lines
3.3 KiB
Python
from typing import Any
|
|
|
|
import httpx
|
|
|
|
|
|
class OdooClient:
|
|
def __init__(self, url: str, database: str, username: str, password: str):
|
|
self.url = url.rstrip("/")
|
|
self.database = database
|
|
self.username = username
|
|
self.password = password
|
|
self.uid: int | None = None
|
|
self._client: httpx.AsyncClient | None = None
|
|
|
|
async def _get_client(self) -> httpx.AsyncClient:
|
|
if self._client is None or self._client.is_closed:
|
|
self._client = httpx.AsyncClient(timeout=30.0)
|
|
return self._client
|
|
|
|
async def close(self) -> None:
|
|
if self._client and not self._client.is_closed:
|
|
await self._client.aclose()
|
|
|
|
async def _jsonrpc(self, endpoint: str, params: dict[str, Any]) -> Any:
|
|
payload = {
|
|
"jsonrpc": "2.0",
|
|
"method": "call",
|
|
"params": params,
|
|
"id": 1,
|
|
}
|
|
client = await self._get_client()
|
|
response = await client.post(
|
|
f"{self.url}{endpoint}",
|
|
json=payload,
|
|
)
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
if "error" in result:
|
|
raise Exception(f"Odoo error: {result['error']}")
|
|
return result.get("result")
|
|
|
|
async def authenticate(self) -> int:
|
|
result = await self._jsonrpc("/web/session/authenticate", {
|
|
"db": self.database,
|
|
"login": self.username,
|
|
"password": self.password,
|
|
})
|
|
if isinstance(result, dict):
|
|
self.uid = result.get("uid")
|
|
else:
|
|
self.uid = result
|
|
return self.uid
|
|
|
|
async def search_read(
|
|
self,
|
|
model: str,
|
|
domain: list | None = None,
|
|
fields: list[str] | None = None,
|
|
limit: int = 0,
|
|
order: str = "",
|
|
) -> list[dict[str, Any]]:
|
|
return await self._jsonrpc(f"/web/dataset/call_kw/{model}/search_read", {
|
|
"model": model,
|
|
"method": "search_read",
|
|
"args": [domain or []],
|
|
"kwargs": {
|
|
"fields": fields or [],
|
|
"limit": limit,
|
|
"order": order,
|
|
},
|
|
})
|
|
|
|
async def get_projects(self) -> list[dict[str, Any]]:
|
|
return await self.search_read(
|
|
model="project.project",
|
|
domain=[("active", "=", True)],
|
|
fields=["name", "task_count", "color", "company_id"],
|
|
)
|
|
|
|
async def get_tasks(self, project_id: int | None = None) -> list[dict[str, Any]]:
|
|
domain: list = [("active", "=", True)]
|
|
if project_id:
|
|
domain.append(("project_id", "=", project_id))
|
|
return await self.search_read(
|
|
model="project.task",
|
|
domain=domain,
|
|
fields=[
|
|
"name", "project_id", "stage_id", "user_ids",
|
|
"priority", "date_deadline",
|
|
],
|
|
)
|
|
|
|
async def get_calendar_events(
|
|
self, date_from: str, date_to: str
|
|
) -> list[dict[str, Any]]:
|
|
return await self.search_read(
|
|
model="calendar.event",
|
|
domain=[
|
|
("start", ">=", f"{date_from} 00:00:00"),
|
|
("start", "<=", f"{date_to} 23:59:59"),
|
|
],
|
|
fields=["name", "start", "stop", "location", "description", "partner_ids"],
|
|
order="start asc",
|
|
)
|