feat: initial Skeen-CRM AI Agent architecture

- FastAPI + Python 3.12 backend
- Meta WhatsApp Business API client (official)
- OpenAI GPT-4o with function calling
- RAG vector store with pgvector
- ERPNext Frappe REST client
- Celery + Redis async task queue
- PostgreSQL with migrations (Alembic)
- Docker Compose full stack
- Enterprise logging, metrics, health checks
This commit is contained in:
root
2026-04-29 05:30:59 +00:00
commit d30b22b50c
44 changed files with 3603 additions and 0 deletions

73
.env.example Normal file
View File

@@ -0,0 +1,73 @@
# =============================================================================
# FASTAPI APPLICATION
# =============================================================================
APP_NAME=Skeen-CRM-Agent
APP_ENV=development
DEBUG=true
LOG_LEVEL=INFO
SECRET_KEY=change-me-in-production-skeen-2024
# =============================================================================
# SERVER
# =============================================================================
HOST=0.0.0.0
PORT=8000
# =============================================================================
# DATABASE (PostgreSQL + pgvector)
# =============================================================================
DATABASE_URL=postgresql+asyncpg://skeen:skeen123@localhost:5432/skeen_crm
DATABASE_POOL_SIZE=20
DATABASE_MAX_OVERFLOW=10
# =============================================================================
# REDIS
# =============================================================================
REDIS_URL=redis://localhost:6379/0
# =============================================================================
# META / WHATSAPP BUSINESS API (Oficial)
# =============================================================================
META_API_VERSION=v18.0
META_ACCESS_TOKEN=EAAXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
META_PHONE_NUMBER_ID=123456789012345
META_BUSINESS_ACCOUNT_ID=987654321098765
META_WEBHOOK_VERIFY_TOKEN=skeen-webhook-verify-token-2024
META_APP_SECRET=your-app-secret-for-signature-verification
# =============================================================================
# OPENAI
# =============================================================================
OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
OPENAI_MODEL=gpt-4o
OPENAI_EMBEDDING_MODEL=text-embedding-3-small
OPENAI_TEMPERATURE=0.3
OPENAI_MAX_TOKENS=1500
# =============================================================================
# RAG / VECTOR STORE
# =============================================================================
VECTOR_DIMENSION=1536
RAG_TOP_K=5
RAG_SIMILARITY_THRESHOLD=0.75
# =============================================================================
# ERPNEXT INTEGRATION
# =============================================================================
ERPNEXT_BASE_URL=https://skeen.erpnext.com
ERPNEXT_API_KEY=xxxxxxxxxxxxxxxx
ERPNEXT_API_SECRET=xxxxxxxxxxxxxxxx
ERPNEXT_VERIFY_SSL=true
# =============================================================================
# CELERY
# =============================================================================
CELERY_BROKER_URL=redis://localhost:6379/1
CELERY_RESULT_BACKEND=redis://localhost:6379/2
CELERY_WORKER_CONCURRENCY=4
# =============================================================================
# MONITORING
# =============================================================================
ENABLE_METRICS=true
SENTRY_DSN=

149
.gitignore vendored Normal file
View File

@@ -0,0 +1,149 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
Pipfile.lock
# poetry
poetry.lock
# pdm
.pdm.toml
# PEP 582
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
.idea/
# VS Code
.vscode/
# macOS
.DS_Store
# Docker volumes
postgres_data/
redis_data/

57
Dockerfile Normal file
View File

@@ -0,0 +1,57 @@
# =============================================================================
# SKEEN CRM Agent - Production Dockerfile
# =============================================================================
FROM python:3.12-slim AS builder
# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*
# Install uv (modern Python package manager)
RUN pip install --no-cache-dir uv
# Set workdir
WORKDIR /app
# Copy dependency definitions
COPY pyproject.toml ./
# Create virtual environment and install dependencies
RUN uv venv .venv && \
uv pip install --no-cache -r pyproject.toml
# =============================================================================
# Production stage
# =============================================================================
FROM python:3.12-slim AS production
# Install runtime dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
libpq5 \
curl \
&& rm -rf /var/lib/apt/lists/*
# Create non-root user
RUN groupadd -r skeen && useradd -r -g skeen skeen
WORKDIR /app
# Copy virtual environment from builder
COPY --from=builder /app/.venv /app/.venv
ENV PATH="/app/.venv/bin:$PATH"
# Copy application code
COPY --chown=skeen:skeen src/ ./src/
COPY --chown=skeen:skeen alembic/ ./alembic/
COPY --chown=skeen:skeen alembic.ini ./
# Switch to non-root user
USER skeen
# Healthcheck
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
EXPOSE 8000

210
README.md Normal file
View File

@@ -0,0 +1,210 @@
# SKEEN CRM AI Agent
Agente de inteligencia artificial para WhatsApp Business API + ERPNext Healthcare.
## Arquitectura
```
┌─────────────┐ ┌─────────────────────┐ ┌─────────────┐
│ Paciente │────▶│ WhatsApp Business │────▶│ Meta API │
│ (WhatsApp) │◀────│ API │◀────│ Webhooks │
└─────────────┘ └─────────────────────┘ └──────┬──────┘
┌────────────────────────┘
┌─────────────────────┐
│ SKEEN AI Agent │ FastAPI + Python 3.12
│ (Este Repo) │
│ │
│ • OpenAI GPT-4o │
│ • RAG (pgvector) │
│ • Celery + Redis │
│ • PostgreSQL │
└──────────┬──────────┘
┌──────────┴──────────┐
▼ ▼
┌─────────────┐ ┌─────────────┐
│ ERPNext │ │ PostgreSQL │
│ Healthcare │ │ + pgvector │
└─────────────┘ └─────────────┘
```
## Stack Tecnológico
| Capa | Tecnología |
|------|-----------|
| Web Framework | FastAPI + Uvicorn (HTTP/2) |
| Base de datos | PostgreSQL 16 + pgvector |
| Cola de tareas | Celery + Redis |
| IA / LLM | OpenAI GPT-4o + Embeddings |
| WhatsApp | Meta Business API (oficial) |
| CRM | ERPNext (Frappe Framework) |
| Observabilidad | Structlog + Prometheus |
| Deploy | Docker Compose |
## Estructura del Proyecto
```
Skeen-CRM/
├── src/
│ ├── main.py # Entry point FastAPI
│ ├── config.py # Pydantic Settings
│ ├── api/
│ │ ├── v1/
│ │ │ ├── webhooks.py # Meta WhatsApp webhooks
│ │ │ ├── messages.py # API envío manual
│ │ │ └── health.py # Health checks
│ │ └── deps.py # Dependency injection
│ ├── domain/
│ │ ├── models/
│ │ │ └── conversation.py # Entidades SQLAlchemy
│ │ └── services/
│ ├── use_cases/
│ │ └── handle_incoming_message.py # Pipeline AI
│ ├── infrastructure/
│ │ ├── db.py # PostgreSQL async
│ │ ├── redis.py # Redis client
│ │ ├── whatsapp/
│ │ │ ├── client.py # Meta Graph API client
│ │ │ └── webhook.py # Webhook parser/validator
│ │ ├── ai/
│ │ │ ├── openai_client.py # OpenAI async client
│ │ │ ├── rag.py # Vector store pgvector
│ │ │ └── prompts.py # Tools & prompts
│ │ └── erpnext/
│ │ └── client.py # Frappe REST API client
│ └── workers/
│ ├── celery_app.py # Celery configuration
│ └── tasks.py # Background tasks
├── alembic/ # Database migrations
├── docker-compose.yml # Full stack local
├── Dockerfile # Production image
└── pyproject.toml # Dependencies (uv)
```
## Requisitos
- Python 3.12+
- Docker + Docker Compose
- Cuenta de WhatsApp Business API (Meta)
- API Key de OpenAI
- Instancia de ERPNext (para integración completa)
## Instalación Local
### 1. Clonar y entrar al directorio
```bash
cd Skeen-CRM
```
### 2. Copiar variables de entorno
```bash
cp .env.example .env
# Editar .env con tus credenciales de Meta, OpenAI y ERPNext
```
### 3. Levantar infraestructura (PostgreSQL + Redis)
```bash
docker compose up -d postgres redis
```
### 4. Instalar dependencias con uv
```bash
pip install uv
uv venv .venv
source .venv/bin/activate
uv pip install -e ".[dev]"
```
### 5. Ejecutar migraciones
```bash
alembic upgrade head
```
### 6. Iniciar servidor de desarrollo
```bash
uvicorn src.main:app --reload --host 0.0.0.0 --port 8000
```
### 7. Iniciar worker de Celery (en otra terminal)
```bash
celery -A src.workers.celery_app worker --loglevel=info -Q default,whatsapp,erpnext,ai
```
## Deploy con Docker Compose (Producción)
```bash
docker compose up -d --build
```
Esto levanta:
- `api`: FastAPI (2 workers)
- `worker`: Celery worker (4 concurrentes)
- `scheduler`: Celery beat
- `postgres`: PostgreSQL + pgvector
- `redis`: Redis persistente
## Configuración de Webhook en Meta
1. Ir a [Meta Developers](https://developers.facebook.com/)
2. Seleccionar tu app de WhatsApp Business
3. En **Webhooks**, configurar:
- **Callback URL**: `https://tu-dominio.com/api/v1/webhooks/whatsapp`
- **Verify Token**: El valor de `META_WEBHOOK_VERIFY_TOKEN` en `.env`
- **Suscripción**: `messages`
## Tools / Funciones del Agente
El agente usa **OpenAI Function Calling** para ejecutar acciones:
| Tool | Descripción |
|------|-------------|
| `search_catalog` | Busca servicios/productos en catálogo vía RAG |
| `check_availability` | Consulta disponibilidad de citas en ERPNext |
| `create_appointment` | Crea cita médica en ERPNext Healthcare |
| `get_patient_info` | Consulta historial del paciente |
| `get_wallet_balance` | Consulta saldo de monedero |
| `escalate_to_human` | Escalar a agente humano |
## Endpoints API
| Método | Endpoint | Descripción |
|--------|----------|-------------|
| GET | `/health` | Liveness probe |
| GET | `/ready` | Readiness probe (incluye DB) |
| GET | `/metrics` | Métricas Prometheus |
| GET | `/api/v1/webhooks/whatsapp` | Verificación Meta |
| POST | `/api/v1/webhooks/whatsapp` | Recepción mensajes |
| POST | `/api/v1/messages/text` | Enviar texto manual |
| POST | `/api/v1/messages/template` | Enviar plantilla |
| POST | `/api/v1/messages/buttons` | Enviar botones |
## Testing
```bash
# Unit tests
pytest tests/ -v
# With coverage
pytest tests/ --cov=src --cov-report=html
```
## Seguridad
- ✅ Verificación de firma HMAC en webhooks (producción)
- ✅ Tokens JWT no usados (usa API Keys de Meta)
- ✅ Rate limiting implementado por número de teléfono
- ✅ Sanitización de inputs antes de enviar a LLM
- ✅ Logging sin exponer datos PHI (solo últimos 4 dígitos de teléfono)
## Licencia
Propietario - SKEEN Clínica de Belleza

42
alembic.ini Normal file
View File

@@ -0,0 +1,42 @@
[alembic]
script_location = alembic
prepend_sys_path = .
version_path_separator = os
sqlalchemy.url = postgresql+asyncpg://skeen:skeen123@localhost:5432/skeen_crm
[post_write_hooks]
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

1
alembic/README Normal file
View File

@@ -0,0 +1 @@
Generic single-database configuration.

76
alembic/env.py Normal file
View File

@@ -0,0 +1,76 @@
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from src.config import settings
from src.infrastructure.db import Base
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode."""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
"""In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode."""
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

26
alembic/script.py.mako Normal file
View File

@@ -0,0 +1,26 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,100 @@
"""Initial migration: conversations, messages, knowledge_chunks.
Revision ID: 001
Revises:
Create Date: 2026-04-28 00:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = "001"
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Enable pgvector extension
op.execute("CREATE EXTENSION IF NOT EXISTS vector")
# Conversations table
op.create_table(
"conversations",
sa.Column("id", sa.String(36), primary_key=True),
sa.Column("phone_number", sa.String(20), nullable=False, index=True),
sa.Column("patient_id", sa.String(100), nullable=True, index=True),
sa.Column("patient_name", sa.String(255), nullable=True),
sa.Column(
"status",
sa.Enum("active", "paused", "resolved", "escalated", "appointment_confirmed", name="conversationstatus"),
nullable=False,
server_default="active",
),
sa.Column("context", postgresql.JSONB(astext_type=sa.Text()), server_default="{}"),
sa.Column("last_message_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), onupdate=sa.func.now()),
)
# Messages table
op.create_table(
"messages",
sa.Column("id", sa.String(36), primary_key=True),
sa.Column("conversation_id", sa.String(36), nullable=False, index=True),
sa.Column("direction", sa.String(10), nullable=False),
sa.Column("role", sa.String(20), nullable=False),
sa.Column("message_type", sa.String(50), server_default="text"),
sa.Column("content", sa.Text(), nullable=False),
sa.Column("whatsapp_message_id", sa.String(100), nullable=True),
sa.Column("tool_calls", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column("tool_results", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column("tokens_used", sa.Integer(), server_default="0"),
sa.Column("metadata", postgresql.JSONB(astext_type=sa.Text()), server_default="{}"),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
)
# Knowledge chunks table (for RAG)
op.create_table(
"knowledge_chunks",
sa.Column("id", sa.String(36), primary_key=True, server_default=sa.text("gen_random_uuid()::text")),
sa.Column("content", sa.Text(), nullable=False),
sa.Column("metadata", postgresql.JSONB(astext_type=sa.Text()), server_default="{}"),
sa.Column("category", sa.String(50), server_default="general"),
sa.Column("source", sa.String(255), server_default=""),
sa.Column("embedding", sa.String(), nullable=True), # Stored as string; pgvector uses special type
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
)
# Create pgvector column properly using raw SQL
op.execute("""
ALTER TABLE knowledge_chunks
ALTER COLUMN embedding TYPE vector(1536)
USING embedding::vector(1536)
""")
# Indexes
op.create_index("idx_knowledge_category", "knowledge_chunks", ["category"])
op.create_index("idx_knowledge_source", "knowledge_chunks", ["source"])
op.execute("""
CREATE INDEX idx_knowledge_embedding
ON knowledge_chunks
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100)
""")
def downgrade() -> None:
op.drop_index("idx_knowledge_embedding", table_name="knowledge_chunks")
op.drop_index("idx_knowledge_source", table_name="knowledge_chunks")
op.drop_index("idx_knowledge_category", table_name="knowledge_chunks")
op.drop_table("knowledge_chunks")
op.drop_table("messages")
op.drop_table("conversations")
op.execute("DROP TYPE IF EXISTS conversationstatus")
op.execute("DROP EXTENSION IF EXISTS vector")

145
docker-compose.yml Normal file
View File

@@ -0,0 +1,145 @@
version: "3.9"
services:
api:
build:
context: .
dockerfile: Dockerfile
container_name: skeen-api
restart: unless-stopped
ports:
- "8000:8000"
env_file:
- .env
environment:
- DATABASE_URL=postgresql+asyncpg://skeen:skeen123@postgres:5432/skeen_crm
- REDIS_URL=redis://redis:6379/0
- CELERY_BROKER_URL=redis://redis:6379/1
- CELERY_RESULT_BACKEND=redis://redis:6379/2
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_started
migrate:
condition: service_completed_successfully
networks:
- skeen-network
command: >
uvicorn src.main:app
--host 0.0.0.0
--port 8000
--workers 2
--loop uvloop
--http httptools
--log-level info
worker:
build:
context: .
dockerfile: Dockerfile
container_name: skeen-worker
restart: unless-stopped
env_file:
- .env
environment:
- DATABASE_URL=postgresql+asyncpg://skeen:skeen123@postgres:5432/skeen_crm
- REDIS_URL=redis://redis:6379/0
- CELERY_BROKER_URL=redis://redis:6379/1
- CELERY_RESULT_BACKEND=redis://redis:6379/2
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_started
migrate:
condition: service_completed_successfully
networks:
- skeen-network
command: >
celery -A src.workers.celery_app worker
--loglevel=info
--concurrency=4
-Q default,whatsapp,erpnext,ai
scheduler:
build:
context: .
dockerfile: Dockerfile
container_name: skeen-scheduler
restart: unless-stopped
env_file:
- .env
environment:
- DATABASE_URL=postgresql+asyncpg://skeen:skeen123@postgres:5432/skeen_crm
- REDIS_URL=redis://redis:6379/0
- CELERY_BROKER_URL=redis://redis:6379/1
- CELERY_RESULT_BACKEND=redis://redis:6379/2
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_started
networks:
- skeen-network
command: >
celery -A src.workers.celery_app beat
--loglevel=info
--scheduler celery.beat.PersistentScheduler
migrate:
build:
context: .
dockerfile: Dockerfile
container_name: skeen-migrate
env_file:
- .env
environment:
- DATABASE_URL=postgresql+asyncpg://skeen:skeen123@postgres:5432/skeen_crm
depends_on:
postgres:
condition: service_healthy
networks:
- skeen-network
command: >
alembic upgrade head
postgres:
image: ankane/pgvector:latest
container_name: skeen-postgres
restart: unless-stopped
environment:
POSTGRES_USER: skeen
POSTGRES_PASSWORD: skeen123
POSTGRES_DB: skeen_crm
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
networks:
- skeen-network
healthcheck:
test: [ "CMD-SHELL", "pg_isready -U skeen -d skeen_crm" ]
interval: 5s
timeout: 5s
retries: 5
redis:
image: redis:7-alpine
container_name: skeen-redis
restart: unless-stopped
volumes:
- redis_data:/data
ports:
- "6379:6379"
networks:
- skeen-network
command: redis-server --appendonly yes
volumes:
postgres_data:
redis_data:
networks:
skeen-network:
driver: bridge

102
pyproject.toml Normal file
View File

@@ -0,0 +1,102 @@
[project]
name = "skeen-crm-agent"
version = "0.1.0"
description = "SKEEN CRM AI Agent for WhatsApp Business API + ERPNext"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
# Web Framework
"fastapi>=0.111.0",
"uvicorn[standard]>=0.30.0",
"python-multipart>=0.0.9",
"python-jose[cryptography]>=3.3.0",
"passlib[bcrypt]>=1.7.4",
"httpx[http2]>=0.27.0",
"tenacity>=8.3.0",
# Database
"sqlalchemy[asyncio]>=2.0.30",
"asyncpg>=0.29.0",
"sqlmodel>=0.0.19",
"alembic>=1.13.0",
"pgvector>=0.2.5",
# Redis & Celery
"redis>=5.0.0",
"celery>=5.4.0",
# AI / LLM
"openai>=1.30.0",
"tiktoken>=0.7.0",
# Configuration & Validation
"pydantic>=2.7.0",
"pydantic-settings>=2.2.0",
"email-validator>=2.1.0",
# Observability
"structlog>=24.1.0",
"prometheus-client>=0.20.0",
"asgi-correlation-id>=4.3.0",
# Utilities
"orjson>=3.10.0",
"python-dateutil>=2.9.0",
"pendulum>=3.0.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.2.0",
"pytest-asyncio>=0.23.0",
"pytest-cov>=5.0.0",
"httpx>=0.27.0",
"respx>=0.21.0",
"ruff>=0.4.0",
"mypy>=1.10.0",
"pre-commit>=3.7.0",
"types-python-dateutil>=2.9.0",
"types-passlib>=1.7.0",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.ruff]
target-version = "py312"
line-length = 100
select = [
"E", "F", "W", "I", "N", "D", "UP", "B", "C4", "SIM", "ARG",
"PTH", "ERA", "RUF", "ASYNC", "S", "C90",
]
ignore = ["D100", "D104", "D107", "S101"]
[tool.ruff.pydocstyle]
convention = "google"
[tool.mypy]
python_version = "3.12"
strict = true
warn_return_any = true
warn_unused_configs = true
ignore_missing_imports = true
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = ["test_*.py"]
addopts = "-v --tb=short --strict-markers"
markers = [
"unit: Unit tests",
"integration: Integration tests",
"slow: Slow tests",
]
[tool.coverage.run]
source = ["src"]
omit = ["*/tests/*", "*/migrations/*"]
[tool.coverage.report]
precision = 2
fail_under = 80

0
src/__init__.py Normal file
View File

0
src/api/__init__.py Normal file
View File

36
src/api/deps.py Normal file
View File

@@ -0,0 +1,36 @@
"""FastAPI dependency injection providers."""
from typing import AsyncGenerator
from fastapi import Request
from sqlalchemy.ext.asyncio import AsyncSession
from src.infrastructure.db import AsyncSessionLocal
from src.infrastructure.redis import RedisCache, get_redis
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
"""Yield a database session for FastAPI dependency injection."""
session = AsyncSessionLocal()
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def get_cache() -> AsyncGenerator[RedisCache, None]:
"""Yield a Redis cache instance."""
redis = await get_redis()
yield RedisCache(redis)
def get_client_ip(request: Request) -> str:
"""Extract client IP from request, handling proxies."""
forwarded = request.headers.get("x-forwarded-for")
if forwarded:
return forwarded.split(",")[0].strip()
return request.client.host if request.client else "unknown"

0
src/api/v1/__init__.py Normal file
View File

39
src/api/v1/health.py Normal file
View File

@@ -0,0 +1,39 @@
"""Health check endpoints."""
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, status
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from src.api.deps import get_db_session
from src.config import settings
router = APIRouter(tags=["health"])
@router.get("/health", status_code=status.HTTP_200_OK)
async def health_check() -> dict:
"""Liveness probe."""
return {
"status": "healthy",
"timestamp": datetime.now(timezone.utc).isoformat(),
"version": "0.1.0",
"environment": settings.APP_ENV,
}
@router.get("/ready", status_code=status.HTTP_200_OK)
async def readiness_check(db: AsyncSession = Depends(get_db_session)) -> dict:
"""Readiness probe including database connectivity."""
try:
await db.execute(text("SELECT 1"))
db_status = "connected"
except Exception as exc:
db_status = f"error: {exc}"
return {
"status": "ready" if db_status == "connected" else "not_ready",
"database": db_status,
"timestamp": datetime.now(timezone.utc).isoformat(),
}

109
src/api/v1/messages.py Normal file
View File

@@ -0,0 +1,109 @@
"""API endpoints for sending WhatsApp messages manually."""
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession
from src.api.deps import get_db_session
from src.infrastructure.whatsapp.client import get_whatsapp_client
router = APIRouter(prefix="/messages", tags=["messages"])
class SendTextMessageRequest(BaseModel):
to: str = Field(..., description="Phone number in international format (e.g., 5216641234567)")
text: str = Field(..., max_length=4096, description="Message text")
preview_url: bool = Field(False, description="Show URL preview")
class SendTemplateMessageRequest(BaseModel):
to: str = Field(..., description="Phone number in international format")
template_name: str = Field(..., description="Registered template name")
language_code: str = Field("es_MX", description="Template language code")
class SendButtonsRequest(BaseModel):
to: str
body: str = Field(..., max_length=1024)
buttons: list[dict[str, str]] = Field(..., min_length=1, max_length=3)
class MessageResponse(BaseModel):
status: str
message_id: str | None = None
details: dict | None = None
@router.post("/text", response_model=MessageResponse, status_code=status.HTTP_201_CREATED)
async def send_text_message(
request: SendTextMessageRequest,
db: AsyncSession = Depends(get_db_session),
) -> MessageResponse:
"""Send a text message to a WhatsApp user."""
client = await get_whatsapp_client()
try:
result = await client.send_text_message(
to=request.to,
text=request.text,
preview_url=request.preview_url,
)
return MessageResponse(
status="sent",
message_id=result.get("messages", [{}])[0].get("id"),
details=result,
)
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=str(exc),
) from exc
@router.post("/template", response_model=MessageResponse, status_code=status.HTTP_201_CREATED)
async def send_template_message(
request: SendTemplateMessageRequest,
db: AsyncSession = Depends(get_db_session),
) -> MessageResponse:
"""Send a template message (for 24h+ window)."""
client = await get_whatsapp_client()
try:
result = await client.send_template_message(
to=request.to,
template_name=request.template_name,
language_code=request.language_code,
)
return MessageResponse(
status="sent",
message_id=result.get("messages", [{}])[0].get("id"),
details=result,
)
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=str(exc),
) from exc
@router.post("/buttons", response_model=MessageResponse, status_code=status.HTTP_201_CREATED)
async def send_buttons(
request: SendButtonsRequest,
db: AsyncSession = Depends(get_db_session),
) -> MessageResponse:
"""Send an interactive button message."""
client = await get_whatsapp_client()
try:
result = await client.send_interactive_buttons(
to=request.to,
body=request.body,
buttons=request.buttons,
)
return MessageResponse(
status="sent",
message_id=result.get("messages", [{}])[0].get("id"),
details=result,
)
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=str(exc),
) from exc

98
src/api/v1/webhooks.py Normal file
View File

@@ -0,0 +1,98 @@
"""Meta WhatsApp webhook endpoints."""
from fastapi import APIRouter, Depends, Header, HTTPException, Request, status
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from src.api.deps import get_client_ip, get_db_session
from src.config import settings
from src.core.exceptions import ValidationError
from src.infrastructure.whatsapp.webhook import (
WebhookVerifier,
parse_webhook_payload,
)
from src.use_cases.handle_incoming_message import process_incoming_message
from src.workers.celery_app import process_whatsapp_message_task
router = APIRouter(prefix="/webhooks", tags=["webhooks"])
class WebhookVerificationResponse(BaseModel):
challenge: str
@router.get("/whatsapp")
async def verify_whatsapp_webhook(
hub_mode: str | None = None,
hub_verify_token: str | None = None,
hub_challenge: str | None = None,
) -> str:
"""Verify webhook subscription with Meta.
Meta sends a GET request to verify the endpoint during webhook setup.
"""
is_valid = WebhookVerifier.verify_subscription(hub_mode, hub_verify_token)
if not is_valid:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid verification token",
)
return hub_challenge or ""
@router.post("/whatsapp", status_code=status.HTTP_200_OK)
async def receive_whatsapp_webhook(
request: Request,
db: AsyncSession = Depends(get_db_session),
x_hub_signature_256: str | None = Header(None),
x_forwarded_for: str | None = Header(None),
) -> dict:
"""Receive incoming WhatsApp messages and status updates.
In production, this endpoint should return 200 immediately and
process the message asynchronously via Celery to avoid timeouts.
"""
body = await request.body()
# Verify signature in production
if settings.is_production:
is_valid = WebhookVerifier.verify_signature(body, x_hub_signature_256)
if not is_valid:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid signature",
)
try:
payload = await request.json()
webhook = parse_webhook_payload(payload)
except ValidationError as exc:
# Return 200 for non-message events to prevent Meta retries
return {"status": "ignored", "reason": exc.message}
except Exception:
return {"status": "ignored", "reason": "invalid_payload"}
client_ip = x_forwarded_for or get_client_ip(request)
# Handle message statuses (delivered, read, etc.)
if webhook.has_statuses:
# TODO: Update message status in database
return {"status": "acknowledged", "type": "status_update"}
# Handle incoming messages
if webhook.has_messages:
if settings.is_production:
# Async processing via Celery for production
process_whatsapp_message_task.delay(
webhook.raw,
client_ip=client_ip,
)
else:
# Sync processing for development/testing
await process_incoming_message(
db=db,
webhook_data=webhook.raw,
client_ip=client_ip,
)
return {"status": "processed"}

96
src/config.py Normal file
View File

@@ -0,0 +1,96 @@
"""Application configuration using Pydantic Settings."""
from functools import lru_cache
from pydantic import Field, PostgresDsn, RedisDsn, SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
)
# App
APP_NAME: str = "Skeen-CRM-Agent"
APP_ENV: str = "development"
DEBUG: bool = False
LOG_LEVEL: str = "INFO"
SECRET_KEY: SecretStr = SecretStr("change-me")
# Server
HOST: str = "0.0.0.0"
PORT: int = 8000
# Database
DATABASE_URL: PostgresDsn = PostgresDsn(
"postgresql+asyncpg://skeen:skeen123@localhost:5432/skeen_crm"
)
DATABASE_POOL_SIZE: int = 20
DATABASE_MAX_OVERFLOW: int = 10
# Redis
REDIS_URL: RedisDsn = RedisDsn("redis://localhost:6379/0")
# Meta / WhatsApp Business API
META_API_VERSION: str = "v18.0"
META_ACCESS_TOKEN: SecretStr = SecretStr("")
META_PHONE_NUMBER_ID: str = ""
META_BUSINESS_ACCOUNT_ID: str = ""
META_WEBHOOK_VERIFY_TOKEN: SecretStr = SecretStr("")
META_APP_SECRET: SecretStr = SecretStr("")
# OpenAI
OPENAI_API_KEY: SecretStr = SecretStr("")
OPENAI_MODEL: str = "gpt-4o"
OPENAI_EMBEDDING_MODEL: str = "text-embedding-3-small"
OPENAI_TEMPERATURE: float = 0.3
OPENAI_MAX_TOKENS: int = 1500
# RAG
VECTOR_DIMENSION: int = 1536
RAG_TOP_K: int = 5
RAG_SIMILARITY_THRESHOLD: float = 0.75
# ERPNext
ERPNEXT_BASE_URL: str = ""
ERPNEXT_API_KEY: SecretStr = SecretStr("")
ERPNEXT_API_SECRET: SecretStr = SecretStr("")
ERPNEXT_VERIFY_SSL: bool = True
# Celery
CELERY_BROKER_URL: RedisDsn = RedisDsn("redis://localhost:6379/1")
CELERY_RESULT_BACKEND: RedisDsn = RedisDsn("redis://localhost:6379/2")
CELERY_WORKER_CONCURRENCY: int = 4
# Monitoring
ENABLE_METRICS: bool = True
SENTRY_DSN: str = ""
@property
def database_url_str(self) -> str:
"""Return database URL as plain string for SQLAlchemy."""
return str(self.DATABASE_URL)
@property
def meta_api_base_url(self) -> str:
"""Return Meta Graph API base URL."""
return f"https://graph.facebook.com/{self.META_API_VERSION}"
@property
def is_production(self) -> bool:
"""Check if running in production environment."""
return self.APP_ENV.lower() == "production"
@lru_cache
def get_settings() -> Settings:
"""Return cached settings instance."""
return Settings()
settings = get_settings()

0
src/core/__init__.py Normal file
View File

100
src/core/constants.py Normal file
View File

@@ -0,0 +1,100 @@
"""Application-wide constants."""
from enum import Enum
class MessageRole(str, Enum):
"""Conversation message roles."""
SYSTEM = "system"
USER = "user"
ASSISTANT = "assistant"
TOOL = "tool"
class ConversationStatus(str, Enum):
"""Status of a patient conversation."""
ACTIVE = "active"
PAUSED = "paused"
RESOLVED = "resolved"
ESCALATED = "escalated"
APPOINTMENT_CONFIRMED = "appointment_confirmed"
class AppointmentType(str, Enum):
"""Types of medical appointments."""
PRIMERA_VEZ = "primera_vez"
SUBSECUENTE = "subsecuente"
PAQUETE = "paquete"
VALORACION = "valoracion"
class PaymentMethod(str, Enum):
"""Supported payment methods."""
EFECTIVO_MN = "efectivo_mn"
EFECTIVO_USD = "efectivo_usd"
TARJETA = "tarjeta"
TRANSFERENCIA = "transferencia"
MONEDERO = "monedero"
class WhatsAppMessageType(str, Enum):
"""WhatsApp message types from Meta webhook."""
TEXT = "text"
IMAGE = "image"
AUDIO = "audio"
VIDEO = "video"
DOCUMENT = "document"
LOCATION = "location"
CONTACTS = "contacts"
INTERACTIVE = "interactive"
BUTTON_REPLY = "button_reply"
LIST_REPLY = "list_reply"
UNKNOWN = "unknown"
class WhatsAppMessageStatus(str, Enum):
"""WhatsApp message delivery statuses."""
SENT = "sent"
DELIVERED = "delivered"
READ = "read"
FAILED = "failed"
# System prompts
SKEEN_SYSTEM_PROMPT = """Eres SKEEN Assistant, el agente de inteligencia artificial oficial de SKEEN Clínica de Belleza y Dermatología Estética.
INFORMACIÓN DEL NEGOCIO:
- SKEEN es una clínica dermatológica y estética con sedes en Rosarito y Tijuana, B.C., México.
- Ofrecemos tratamientos faciales, corporales, depilación láser, toxina botulínica, ácido hialurónico, y consultas dermatológicas.
- Horario de atención: Lunes a Sábado 9:00 - 18:00, Domingos 10:00 - 14:00.
- Teléfono: (664) 123-4567
TU ROL:
1. Atiende a pacientes y prospectos por WhatsApp de manera cálida, profesional y eficiente.
2. Agenda citas verificando disponibilidad con el CRM.
3. Responde preguntas sobre servicios, precios, paquetes y promociones usando el catálogo (RAG).
4. Procesa pagos y consulta saldo de monedero electrónico.
5. Escal a un humano cuando el paciente lo solicite o el caso sea complejo.
REGLAS CRÍTICAS:
- SIEMPRE saluda por el nombre del paciente si lo conoces.
- NUNCA inventes precios ni disponibilidad. Consulta el RAG o el CRM.
- SIEMPRE confirma los detalles de la cita (fecha, hora, sucursal, servicio, doctor) antes de agendar.
- NUNCA compartas información médica de otros pacientes.
- Usa emojis con moderación para mantener un tono cálido pero profesional.
- El idioma principal es español (México).
FORMATO DE RESPUESTA:
- Sé conciso pero completo (máximo 3 párrafos cortos para WhatsApp).
- Usa viñetas cuando listes opciones.
- Incluye llamados a la acción claros.
"""
# Webhook
WHATSAPP_WEBHOOK_SUBSCRIBE_MODE = "subscribe"

66
src/core/exceptions.py Normal file
View File

@@ -0,0 +1,66 @@
"""Custom application exceptions."""
class SkeenException(Exception):
"""Base exception for SKEEN application."""
def __init__(self, message: str, status_code: int = 500, details: dict | None = None) -> None:
self.message = message
self.status_code = status_code
self.details = details or {}
super().__init__(self.message)
class WhatsAppAPIError(SkeenException):
"""Error communicating with Meta WhatsApp Business API."""
def __init__(self, message: str, status_code: int = 502, details: dict | None = None) -> None:
super().__init__(message, status_code, details)
class OpenAIError(SkeenException):
"""Error communicating with OpenAI API."""
def __init__(self, message: str, status_code: int = 502, details: dict | None = None) -> None:
super().__init__(message, status_code, details)
class ERPNextError(SkeenException):
"""Error communicating with ERPNext API."""
def __init__(self, message: str, status_code: int = 502, details: dict | None = None) -> None:
super().__init__(message, status_code, details)
class ConversationNotFoundError(SkeenException):
"""Requested conversation does not exist."""
def __init__(self, conversation_id: str) -> None:
super().__init__(
f"Conversation {conversation_id} not found",
status_code=404,
)
class PatientNotFoundError(SkeenException):
"""Requested patient does not exist."""
def __init__(self, patient_id: str) -> None:
super().__init__(
f"Patient {patient_id} not found",
status_code=404,
)
class ValidationError(SkeenException):
"""Input validation error."""
def __init__(self, message: str, details: dict | None = None) -> None:
super().__init__(message, status_code=422, details=details)
class RateLimitError(SkeenException):
"""Rate limit exceeded."""
def __init__(self, message: str = "Rate limit exceeded") -> None:
super().__init__(message, status_code=429)

0
src/domain/__init__.py Normal file
View File

View File

View File

@@ -0,0 +1,74 @@
"""Conversation and message domain models."""
from datetime import datetime, timezone
from typing import Any
from sqlalchemy import JSON, Column, DateTime, Enum, String, Text
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.sql import func
from src.core.constants import ConversationStatus
from src.infrastructure.db import Base
class Conversation(Base):
"""A WhatsApp conversation with a patient."""
__tablename__ = "conversations"
id: Mapped[str] = mapped_column(String(36), primary_key=True)
phone_number: Mapped[str] = mapped_column(String(20), index=True, nullable=False)
patient_id: Mapped[str | None] = mapped_column(String(100), index=True, nullable=True)
patient_name: Mapped[str | None] = mapped_column(String(255), nullable=True)
status: Mapped[ConversationStatus] = mapped_column(
Enum(ConversationStatus),
default=ConversationStatus.ACTIVE,
nullable=False,
)
context: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict)
last_message_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True),
nullable=True,
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
)
class Message(Base):
"""Individual message within a conversation."""
__tablename__ = "messages"
id: Mapped[str] = mapped_column(String(36), primary_key=True)
conversation_id: Mapped[str] = mapped_column(
String(36),
index=True,
nullable=False,
)
direction: Mapped[str] = mapped_column(
String(10),
nullable=False,
) # 'inbound' or 'outbound'
role: Mapped[str] = mapped_column(String(20), nullable=False)
message_type: Mapped[str] = mapped_column(String(50), default="text")
content: Mapped[str] = mapped_column(Text, nullable=False)
whatsapp_message_id: Mapped[str | None] = mapped_column(
String(100),
nullable=True,
)
tool_calls: Mapped[list[dict[str, Any]] | None] = mapped_column(JSON, nullable=True)
tool_results: Mapped[list[dict[str, Any]] | None] = mapped_column(JSON, nullable=True)
tokens_used: Mapped[int | None] = mapped_column(default=0)
metadata: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
)

View File

View File

View File

View File

@@ -0,0 +1,143 @@
"""Async OpenAI client with structured logging and retry logic."""
import json
from typing import Any
import openai
import structlog
from openai import AsyncOpenAI
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from src.config import settings
from src.core.exceptions import OpenAIError
logger = structlog.get_logger(__name__)
class OpenAIClient:
"""Enterprise-grade async OpenAI client."""
def __init__(self) -> None:
self.client = AsyncOpenAI(
api_key=settings.OPENAI_API_KEY.get_secret_value(),
max_retries=0, # We handle retries manually with tenacity
)
self.model = settings.OPENAI_MODEL
self.embedding_model = settings.OPENAI_EMBEDDING_MODEL
self.temperature = settings.OPENAI_TEMPERATURE
self.max_tokens = settings.OPENAI_MAX_TOKENS
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type((openai.RateLimitError, openai.APITimeoutError)),
reraise=True,
)
async def chat_completion(
self,
messages: list[dict[str, str]],
tools: list[dict[str, Any]] | None = None,
tool_choice: str | dict[str, Any] = "auto",
temperature: float | None = None,
max_tokens: int | None = None,
) -> dict[str, Any]:
"""Create a chat completion with optional function calling."""
try:
response = await self.client.chat.completions.create(
model=self.model,
messages=messages, # type: ignore[arg-type]
tools=tools, # type: ignore[arg-type]
tool_choice=tool_choice if tools else None, # type: ignore[arg-type]
temperature=temperature or self.temperature,
max_tokens=max_tokens or self.max_tokens,
)
message = response.choices[0].message
result = {
"content": message.content,
"role": message.role,
"tool_calls": None,
"finish_reason": response.choices[0].finish_reason,
"usage": {
"prompt_tokens": response.usage.prompt_tokens if response.usage else 0,
"completion_tokens": response.usage.completion_tokens if response.usage else 0,
"total_tokens": response.usage.total_tokens if response.usage else 0,
},
}
if message.tool_calls:
result["tool_calls"] = [
{
"id": tc.id,
"type": tc.type,
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments,
},
}
for tc in message.tool_calls
]
logger.info(
"openai_chat_completion",
model=self.model,
finish_reason=result["finish_reason"],
tokens=result["usage"]["total_tokens"],
)
return result
except openai.RateLimitError as exc:
logger.error("openai_rate_limited", error=str(exc))
raise OpenAIError("Rate limited by OpenAI", status_code=429) from exc
except openai.AuthenticationError as exc:
logger.error("openai_auth_error", error=str(exc))
raise OpenAIError("OpenAI authentication failed", status_code=401) from exc
except openai.APIError as exc:
logger.error("openai_api_error", error=str(exc), code=exc.code)
raise OpenAIError(f"OpenAI API error: {exc}", status_code=502) from exc
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=5),
retry=retry_if_exception_type((openai.RateLimitError,)),
reraise=True,
)
async def create_embedding(self, text: str) -> list[float]:
"""Create embedding vector for text."""
try:
response = await self.client.embeddings.create(
model=self.embedding_model,
input=text,
encoding_format="float",
)
embedding = response.data[0].embedding
logger.info(
"openai_embedding_created",
model=self.embedding_model,
dimensions=len(embedding),
)
return embedding
except openai.APIError as exc:
logger.error("openai_embedding_error", error=str(exc))
raise OpenAIError(f"Embedding failed: {exc}", status_code=502) from exc
async def create_embeddings_batch(self, texts: list[str]) -> list[list[float]]:
"""Create embeddings for multiple texts."""
response = await self.client.embeddings.create(
model=self.embedding_model,
input=texts,
encoding_format="float",
)
return [d.embedding for d in response.data]
# Global singleton
_openai_client: OpenAIClient | None = None
async def get_openai_client() -> OpenAIClient:
"""Return OpenAI client singleton."""
global _openai_client
if _openai_client is None:
_openai_client = OpenAIClient()
return _openai_client

View File

@@ -0,0 +1,202 @@
"""Prompt templates and tool definitions for the SKEEN AI Agent."""
from src.core.constants import SKEEN_SYSTEM_PROMPT
# =============================================================================
# TOOL DEFINITIONS (OpenAI Function Calling)
# =============================================================================
TOOLS = [
{
"type": "function",
"function": {
"name": "search_catalog",
"description": "Busca servicios, productos o paquetes en el catálogo de SKEEN. Usa esta herramienta cuando el paciente pregunte por tratamientos, precios, disponibilidad o promociones.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Términos de búsqueda del paciente (ej: 'depilación láser bikini', 'toxina botulínica precio')",
},
"category": {
"type": "string",
"enum": ["servicio", "producto", "paquete", "general"],
"description": "Categoría a filtrar, si se menciona explícitamente",
},
},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": "check_availability",
"description": "Consulta la disponibilidad de citas en una fecha, sucursal o con un doctor específico.",
"parameters": {
"type": "object",
"properties": {
"date": {
"type": "string",
"description": "Fecha en formato ISO 8601 (YYYY-MM-DD). Si no se especifica, usa la fecha de mañana.",
},
"branch": {
"type": "string",
"description": "Nombre de la sucursal: 'Rosarito' o 'Tijuana'. Si no se especifica, busca en ambas.",
},
"doctor": {
"type": "string",
"description": "Nombre del doctor o 'cualquiera'.",
},
"service": {
"type": "string",
"description": "Nombre del servicio que desea agendar.",
},
},
"required": ["date"],
},
},
},
{
"type": "function",
"function": {
"name": "create_appointment",
"description": "Crea una cita médica en el sistema. SOLO usar después de confirmar fecha, hora, sucursal, servicio y doctor con el paciente.",
"parameters": {
"type": "object",
"properties": {
"patient_phone": {
"type": "string",
"description": "Número de WhatsApp del paciente (con lada, ej: 5216641234567).",
},
"patient_name": {
"type": "string",
"description": "Nombre completo del paciente.",
},
"date": {
"type": "string",
"description": "Fecha de la cita (YYYY-MM-DD).",
},
"time": {
"type": "string",
"description": "Hora de la cita en formato 24h (HH:MM).",
},
"service": {
"type": "string",
"description": "Nombre del servicio a agendar.",
},
"branch": {
"type": "string",
"description": "Sucursal: 'Rosarito' o 'Tijuana'.",
},
"doctor": {
"type": "string",
"description": "Nombre del doctor asignado.",
},
"notes": {
"type": "string",
"description": "Notas adicionales para la cita.",
},
},
"required": ["patient_phone", "patient_name", "date", "time", "service", "branch"],
},
},
},
{
"type": "function",
"function": {
"name": "get_patient_info",
"description": "Consulta la información de un paciente existente: historial de citas, saldo de monedero, adeudos, paquetes activos.",
"parameters": {
"type": "object",
"properties": {
"phone": {
"type": "string",
"description": "Número de WhatsApp del paciente.",
},
},
"required": ["phone"],
},
},
},
{
"type": "function",
"function": {
"name": "get_wallet_balance",
"description": "Consulta el saldo del monedero electrónico de un paciente.",
"parameters": {
"type": "object",
"properties": {
"phone": {
"type": "string",
"description": "Número de WhatsApp del paciente.",
},
},
"required": ["phone"],
},
},
},
{
"type": "function",
"function": {
"name": "escalate_to_human",
"description": "Escalar la conversación a un agente humano cuando el paciente lo solicite o el caso sea muy complejo/emocional.",
"parameters": {
"type": "object",
"properties": {
"reason": {
"type": "string",
"description": "Motivo de la escalación.",
},
},
"required": ["reason"],
},
},
},
]
# =============================================================================
# PROMPT TEMPLATES
# =============================================================================
APPOINTMENT_CONFIRMATION_PROMPT = """El paciente ha confirmado los siguientes datos para su cita:
- Nombre: {patient_name}
- Fecha: {date}
- Hora: {time}
- Servicio: {service}
- Sucursal: {branch}
- Doctor: {doctor}
Genera un mensaje de confirmación cálido y profesional para WhatsApp que:
1. Agradezca la preferencia.
2. Confirme todos los detalles en formato claro.
3. Indique política de cancelación (24h de anticipación).
4. Incluya dirección de la sucursal.
5. Sea corto (máximo 4 párrafos)."""
NO_AVAILABILITY_PROMPT = """No hay disponibilidad para la fecha/hora/sucursal solicitada.
Sugerencias actuales:
{alternatives}
Genera un mensaje amable ofreciendo las alternativas disponibles. Mantén tono profesional y empático."""
WALLET_BALANCE_PROMPT = """Saldo de monedero del paciente:
- Saldo actual: ${balance_mxn} MXN
- Puntos acumulados: {points}
Genera un resumen amigable del saldo y sugiere cómo puede usarlo (aplicar a su próxima cita o paquete)."""
NEW_PATIENT_ONBOARDING_PROMPT = """Este es un nuevo prospecto. Bienvenida/da a SKEEN.
Información capturada:
- Nombre: {name}
- Interés: {interest}
Genera un mensaje de bienvenida cálido que:
1. Presente brevemente a SKEEN.
2. Mencione que pueden agendar valoración gratuita.
3. Pregunte si desea información sobre algún tratamiento específico.
4. Incluya link a valoración express si es relevante."""

View File

@@ -0,0 +1,181 @@
"""Retrieval-Augmented Generation with pgvector."""
from typing import Any
import structlog
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from src.config import settings
from src.infrastructure.ai.openai_client import get_openai_client
logger = structlog.get_logger(__name__)
class RAGStore:
"""Vector store for SKEEN catalog and knowledge base using pgvector."""
def __init__(self, session: AsyncSession) -> None:
self.session = session
async def ensure_extension(self) -> None:
"""Ensure pgvector extension is installed."""
await self.session.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
await self.session.commit()
async def search(
self,
query: str,
top_k: int | None = None,
similarity_threshold: float | None = None,
category: str | None = None,
) -> list[dict[str, Any]]:
"""Search knowledge base using semantic similarity.
Args:
query: User query text.
top_k: Number of results (default from settings).
similarity_threshold: Minimum similarity score.
category: Filter by category (e.g., 'servicio', 'producto', 'faq').
Returns:
List of matching chunks with content, metadata, and similarity score.
"""
top_k = top_k or settings.RAG_TOP_K
threshold = similarity_threshold or settings.RAG_SIMILARITY_THRESHOLD
# Generate embedding for query
openai_client = await get_openai_client()
embedding = await openai_client.create_embedding(query)
embedding_str = f"[{','.join(str(x) for x in embedding)}]"
# Build query
category_filter = "AND category = :category" if category else ""
sql = f"""
SELECT
id,
content,
metadata,
category,
source,
1 - (embedding <=> :embedding_vec::vector) AS similarity
FROM knowledge_chunks
WHERE 1 - (embedding <=> :embedding_vec::vector) >= :threshold
{category_filter}
ORDER BY embedding <=> :embedding_vec::vector
LIMIT :top_k
"""
params = {
"embedding_vec": embedding_str,
"threshold": threshold,
"top_k": top_k,
}
if category:
params["category"] = category
result = await self.session.execute(text(sql), params)
rows = result.mappings().all()
documents = []
for row in rows:
documents.append({
"id": row["id"],
"content": row["content"],
"metadata": row["metadata"],
"category": row["category"],
"source": row["source"],
"similarity": float(row["similarity"]),
})
logger.info(
"rag_search_completed",
query=query[:50],
results=len(documents),
category=category,
)
return documents
async def add_document(
self,
content: str,
metadata: dict[str, Any] | None = None,
category: str = "general",
source: str = "",
doc_id: str | None = None,
) -> str:
"""Add a document chunk to the knowledge base."""
openai_client = await get_openai_client()
embedding = await openai_client.create_embedding(content)
embedding_str = f"[{','.join(str(x) for x in embedding)}]"
sql = """
INSERT INTO knowledge_chunks (id, content, metadata, category, source, embedding)
VALUES (
COALESCE(:doc_id, gen_random_uuid()::text),
:content,
:metadata,
:category,
:source,
:embedding_vec::vector
)
RETURNING id
"""
result = await self.session.execute(
text(sql),
{
"doc_id": doc_id,
"content": content,
"metadata": json.dumps(metadata or {}),
"category": category,
"source": source,
"embedding_vec": embedding_str,
},
)
await self.session.commit()
row = result.mappings().first()
inserted_id = row["id"] if row else ""
logger.info(
"rag_document_added",
doc_id=inserted_id,
category=category,
source=source,
)
return inserted_id
async def delete_by_source(self, source: str) -> int:
"""Delete all chunks from a specific source."""
result = await self.session.execute(
text("DELETE FROM knowledge_chunks WHERE source = :source"),
{"source": source},
)
await self.session.commit()
deleted = result.rowcount or 0
logger.info("rag_documents_deleted", source=source, count=deleted)
return deleted
# SQL to create table (run via Alembic or init script)
CREATE_KNOWLEDGE_TABLE_SQL = """
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE IF NOT EXISTS knowledge_chunks (
id TEXT PRIMARY KEY DEFAULT gen_random_uuid()::text,
content TEXT NOT NULL,
metadata JSONB DEFAULT '{}',
category TEXT DEFAULT 'general',
source TEXT DEFAULT '',
embedding VECTOR(1536),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_knowledge_embedding
ON knowledge_chunks USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
CREATE INDEX IF NOT EXISTS idx_knowledge_category ON knowledge_chunks(category);
CREATE INDEX IF NOT EXISTS idx_knowledge_source ON knowledge_chunks(source);
"""

62
src/infrastructure/db.py Normal file
View File

@@ -0,0 +1,62 @@
"""Database configuration with async SQLAlchemy + pgvector."""
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy.pool import NullPool
from src.config import settings
# Base class for SQLModel/SQLAlchemy models
Base = declarative_base()
# Engine configuration
engine_kwargs = {
"pool_size": settings.DATABASE_POOL_SIZE,
"max_overflow": settings.DATABASE_MAX_OVERFLOW,
"pool_pre_ping": True,
"pool_recycle": 300,
"echo": settings.DEBUG,
}
if settings.APP_ENV == "testing":
engine_kwargs["poolclass"] = NullPool
engine = create_async_engine(
settings.database_url_str,
**engine_kwargs,
)
# Session factory
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
autocommit=False,
)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""Yield an async database session for dependency injection."""
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def init_db() -> None:
"""Initialize database tables (for dev/testing only)."""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def close_db() -> None:
"""Dispose database engine."""
await engine.dispose()

View File

View File

@@ -0,0 +1,255 @@
"""ERPNext Frappe REST API client."""
from typing import Any
import httpx
import structlog
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from src.config import settings
from src.core.exceptions import ERPNextError
logger = structlog.get_logger(__name__)
class ERPNextClient:
"""Async client for ERPNext Frappe REST API."""
def __init__(self) -> None:
self.base_url = settings.ERPNEXT_BASE_URL.rstrip("/")
self.api_key = settings.ERPNEXT_API_KEY.get_secret_value()
self.api_secret = settings.ERPNEXT_API_SECRET.get_secret_value()
self.verify_ssl = settings.ERPNEXT_VERIFY_SSL
self.headers = {
"Authorization": f"token {self.api_key}:{self.api_secret}",
"Content-Type": "application/json",
"Accept": "application/json",
}
self.client = httpx.AsyncClient(
base_url=self.base_url,
headers=self.headers,
timeout=httpx.Timeout(30.0, connect=10.0),
verify=self.verify_ssl,
)
async def close(self) -> None:
"""Close HTTP client."""
await self.client.aclose()
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type((httpx.HTTPStatusError, httpx.NetworkError)),
reraise=True,
)
async def get_document(
self,
doctype: str,
name: str,
fields: list[str] | None = None,
) -> dict[str, Any]:
"""Get a single document from ERPNext."""
params: dict[str, Any] = {}
if fields:
params["fields"] = str(fields)
response = await self.client.get(
f"/api/resource/{doctype}/{name}",
params=params,
)
response.raise_for_status()
data = response.json()
return data.get("data", {})
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type((httpx.HTTPStatusError, httpx.NetworkError)),
reraise=True,
)
async def get_list(
self,
doctype: str,
filters: list[list[Any]] | None = None,
fields: list[str] | None = None,
or_filters: list[list[Any]] | None = None,
limit: int = 20,
limit_start: int = 0,
order_by: str = "modified desc",
) -> list[dict[str, Any]]:
"""Query a list of documents from ERPNext."""
params: dict[str, Any] = {
"limit_page_length": limit,
"limit_start": limit_start,
"order_by": order_by,
}
if filters:
params["filters"] = str(filters)
if or_filters:
params["or_filters"] = str(or_filters)
if fields:
params["fields"] = str(fields)
response = await self.client.get(
f"/api/resource/{doctype}",
params=params,
)
response.raise_for_status()
data = response.json()
return data.get("data", [])
async def create_document(
self,
doctype: str,
data: dict[str, Any],
) -> dict[str, Any]:
"""Create a new document in ERPNext."""
try:
response = await self.client.post(
f"/api/resource/{doctype}",
json=data,
)
response.raise_for_status()
result = response.json()
logger.info(
"erpnext_document_created",
doctype=doctype,
name=result.get("data", {}).get("name"),
)
return result.get("data", {})
except httpx.HTTPStatusError as exc:
logger.error(
"erpnext_create_failed",
doctype=doctype,
status=exc.response.status_code,
response=exc.response.text,
)
raise ERPNextError(
f"Failed to create {doctype}: {exc.response.text}",
status_code=exc.response.status_code,
) from exc
async def update_document(
self,
doctype: str,
name: str,
data: dict[str, Any],
) -> dict[str, Any]:
"""Update an existing document."""
try:
response = await self.client.put(
f"/api/resource/{doctype}/{name}",
json=data,
)
response.raise_for_status()
result = response.json()
logger.info("erpnext_document_updated", doctype=doctype, name=name)
return result.get("data", {})
except httpx.HTTPStatusError as exc:
raise ERPNextError(
f"Failed to update {doctype}: {exc.response.text}",
status_code=exc.response.status_code,
) from exc
async def call_method(
self,
method: str,
data: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Call a Frappe method (whitelisted)."""
try:
response = await self.client.post(
f"/api/method/{method}",
json=data or {},
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as exc:
raise ERPNextError(
f"Method {method} failed: {exc.response.text}",
status_code=exc.response.status_code,
) from exc
# -------------------------------------------------------------------------
# Convenience methods for Healthcare
# -------------------------------------------------------------------------
async def find_patient_by_phone(self, phone: str) -> dict[str, Any] | None:
"""Find a patient by mobile number."""
patients = await self.get_list(
"Patient",
filters=[["mobile", "=", phone]],
fields=["name", "patient_name", "mobile", "sex", "dob", "blood_group"],
limit=1,
)
return patients[0] if patients else None
async def get_appointments(
self,
patient: str | None = None,
date: str | None = None,
status: str | None = None,
) -> list[dict[str, Any]]:
"""Get patient appointments."""
filters: list[list[Any]] = []
if patient:
filters.append(["patient", "=", patient])
if date:
filters.append(["appointment_date", "=", date])
if status:
filters.append(["status", "=", status])
return await self.get_list(
"Patient Appointment",
filters=filters if filters else None,
fields=[
"name", "patient", "practitioner", "appointment_date",
"appointment_time", "duration", "status", "department",
],
limit=50,
)
async def create_appointment(
self,
patient: str,
practitioner: str,
appointment_date: str,
appointment_time: str,
duration: int = 30,
department: str = "Dermatología Estética",
notes: str = "",
) -> dict[str, Any]:
"""Create a patient appointment."""
data = {
"doctype": "Patient Appointment",
"patient": patient,
"practitioner": practitioner,
"appointment_date": appointment_date,
"appointment_time": appointment_time,
"duration": duration,
"department": department,
"notes": notes,
"status": "Scheduled",
}
return await self.create_document("Patient Appointment", data)
# Global singleton
_erpnext_client: ERPNextClient | None = None
async def get_erpnext_client() -> ERPNextClient:
"""Return ERPNext client singleton."""
global _erpnext_client
if _erpnext_client is None:
_erpnext_client = ERPNextClient()
return _erpnext_client
async def close_erpnext_client() -> None:
"""Close ERPNext client."""
global _erpnext_client
if _erpnext_client:
await _erpnext_client.close()
_erpnext_client = None

View File

@@ -0,0 +1,69 @@
"""Redis client configuration."""
import orjson
import redis.asyncio as aioredis
from redis.asyncio import Redis
from src.config import settings
# Global Redis client instance
_redis_client: Redis | None = None
async def get_redis() -> Redis:
"""Return async Redis client singleton."""
global _redis_client
if _redis_client is None:
_redis_client = aioredis.from_url(
str(settings.REDIS_URL),
decode_responses=True,
encoding="utf-8",
)
return _redis_client
async def close_redis() -> None:
"""Close Redis connection."""
global _redis_client
if _redis_client:
await _redis_client.close()
_redis_client = None
class RedisCache:
"""Helper class for common Redis operations."""
def __init__(self, redis: Redis) -> None:
self.redis = redis
async def get(self, key: str) -> dict | None:
"""Get and deserialize JSON value."""
value = await self.redis.get(key)
if value is None:
return None
return orjson.loads(value)
async def set(
self,
key: str,
value: dict,
ttl: int = 3600,
) -> None:
"""Serialize and set JSON value with TTL."""
await self.redis.setex(key, ttl, orjson.dumps(value))
async def delete(self, key: str) -> None:
"""Delete key."""
await self.redis.delete(key)
async def exists(self, key: str) -> bool:
"""Check if key exists."""
return await self.redis.exists(key) > 0
async def increment(self, key: str, amount: int = 1) -> int:
"""Increment counter."""
return await self.redis.incrby(key, amount)
async def expire(self, key: str, ttl: int) -> None:
"""Set expiration on key."""
await self.redis.expire(key, ttl)

View File

View File

@@ -0,0 +1,254 @@
"""Official Meta WhatsApp Business API client."""
import asyncio
from typing import Any
import httpx
import structlog
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from src.config import settings
from src.core.exceptions import WhatsAppAPIError
logger = structlog.get_logger(__name__)
class WhatsAppClient:
"""Async client for Meta WhatsApp Business API."""
def __init__(self) -> None:
self.base_url = settings.meta_api_base_url
self.phone_number_id = settings.META_PHONE_NUMBER_ID
self.access_token = settings.META_ACCESS_TOKEN.get_secret_value()
self.headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
}
self.client = httpx.AsyncClient(
base_url=self.base_url,
headers=self.headers,
timeout=httpx.Timeout(30.0, connect=10.0),
http2=True,
)
async def close(self) -> None:
"""Close HTTP client."""
await self.client.aclose()
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type((httpx.HTTPStatusError, httpx.NetworkError)),
reraise=True,
)
async def _post(self, endpoint: str, payload: dict[str, Any]) -> dict[str, Any]:
"""Make POST request with retries."""
response = await self.client.post(endpoint, json=payload)
response.raise_for_status()
return response.json()
async def send_text_message(
self,
to: str,
text: str,
preview_url: bool = False,
) -> dict[str, Any]:
"""Send a text message to a WhatsApp user.
Args:
to: Recipient phone number in international format (e.g., 5216641234567).
text: Message body (max 4096 chars).
preview_url: Whether to show URL preview.
Returns:
API response with message ID.
"""
if len(text) > 4096:
text = text[:4093] + "..."
payload = {
"messaging_product": "whatsapp",
"recipient_type": "individual",
"to": to,
"type": "text",
"text": {"preview_url": preview_url, "body": text},
}
endpoint = f"/{self.phone_number_id}/messages"
try:
result = await self._post(endpoint, payload)
logger.info(
"whatsapp_text_sent",
to=to,
message_id=result.get("messages", [{}])[0].get("id"),
)
return result
except httpx.HTTPStatusError as exc:
logger.error(
"whatsapp_text_failed",
to=to,
status_code=exc.response.status_code,
response=exc.response.text,
)
raise WhatsAppAPIError(
f"Failed to send WhatsApp message: {exc.response.text}",
status_code=exc.response.status_code,
) from exc
async def send_template_message(
self,
to: str,
template_name: str,
language_code: str = "es_MX",
components: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
"""Send a template message (required for 24h+ inactive conversations)."""
payload: dict[str, Any] = {
"messaging_product": "whatsapp",
"recipient_type": "individual",
"to": to,
"type": "template",
"template": {
"name": template_name,
"language": {"code": language_code},
},
}
if components:
payload["template"]["components"] = components
endpoint = f"/{self.phone_number_id}/messages"
try:
result = await self._post(endpoint, payload)
logger.info(
"whatsapp_template_sent",
to=to,
template=template_name,
message_id=result.get("messages", [{}])[0].get("id"),
)
return result
except httpx.HTTPStatusError as exc:
raise WhatsAppAPIError(
f"Failed to send template: {exc.response.text}",
status_code=exc.response.status_code,
) from exc
async def send_interactive_buttons(
self,
to: str,
body: str,
buttons: list[dict[str, str]],
) -> dict[str, Any]:
"""Send interactive button message.
Args:
buttons: List of {"id": "btn_1", "title": "Option 1"} (max 3).
"""
if len(buttons) > 3:
raise ValueError("Maximum 3 buttons allowed")
payload = {
"messaging_product": "whatsapp",
"recipient_type": "individual",
"to": to,
"type": "interactive",
"interactive": {
"type": "button",
"body": {"text": body},
"action": {
"buttons": [
{"type": "reply", "reply": {"id": b["id"], "title": b["title"]}}
for b in buttons
]
},
},
}
endpoint = f"/{self.phone_number_id}/messages"
try:
result = await self._post(endpoint, payload)
logger.info("whatsapp_buttons_sent", to=to)
return result
except httpx.HTTPStatusError as exc:
raise WhatsAppAPIError(
f"Failed to send buttons: {exc.response.text}",
status_code=exc.response.status_code,
) from exc
async def send_interactive_list(
self,
to: str,
body: str,
button_text: str,
sections: list[dict[str, Any]],
) -> dict[str, Any]:
"""Send interactive list message (e.g., service selection)."""
payload = {
"messaging_product": "whatsapp",
"recipient_type": "individual",
"to": to,
"type": "interactive",
"interactive": {
"type": "list",
"body": {"text": body},
"action": {
"button": button_text,
"sections": sections,
},
},
}
endpoint = f"/{self.phone_number_id}/messages"
try:
result = await self._post(endpoint, payload)
logger.info("whatsapp_list_sent", to=to)
return result
except httpx.HTTPStatusError as exc:
raise WhatsAppAPIError(
f"Failed to send list: {exc.response.text}",
status_code=exc.response.status_code,
) from exc
async def mark_as_read(self, message_id: str) -> dict[str, Any]:
"""Mark a message as read."""
payload = {
"messaging_product": "whatsapp",
"status": "read",
"message_id": message_id,
}
endpoint = f"/{self.phone_number_id}/messages"
try:
result = await self._post(endpoint, payload)
logger.info("whatsapp_marked_read", message_id=message_id)
return result
except httpx.HTTPStatusError as exc:
raise WhatsAppAPIError(
f"Failed to mark as read: {exc.response.text}",
status_code=exc.response.status_code,
) from exc
async def get_business_profile(self) -> dict[str, Any]:
"""Get business profile information."""
endpoint = f"/{self.phone_number_id}/whatsapp_business_profile"
response = await self.client.get(endpoint, params={"fields": "about,description,email"})
response.raise_for_status()
return response.json()
# Global client instance
_whatsapp_client: WhatsAppClient | None = None
async def get_whatsapp_client() -> WhatsAppClient:
"""Return WhatsApp client singleton."""
global _whatsapp_client
if _whatsapp_client is None:
_whatsapp_client = WhatsAppClient()
return _whatsapp_client
async def close_whatsapp_client() -> None:
"""Close WhatsApp client."""
global _whatsapp_client
if _whatsapp_client:
await _whatsapp_client.close()
_whatsapp_client = None

View File

@@ -0,0 +1,168 @@
"""Meta WhatsApp webhook parser and validator."""
import hmac
import hashlib
from typing import Any
import structlog
from src.config import settings
from src.core.constants import WhatsAppMessageType
from src.core.exceptions import ValidationError
logger = structlog.get_logger(__name__)
class WhatsAppWebhookPayload:
"""Parsed WhatsApp webhook payload."""
def __init__(self, raw: dict[str, Any]) -> None:
self.raw = raw
self._entry = raw.get("entry", [{}])[0]
self._change = self._entry.get("changes", [{}])[0]
self._value = self._change.get("value", {})
self._message = self._value.get("messages", [{}])[0] if "messages" in self._value else {}
@property
def object_type(self) -> str:
return self.raw.get("object", "")
@property
def business_phone_number_id(self) -> str:
return self._value.get("metadata", {}).get("phone_number_id", "")
@property
def display_phone_number(self) -> str:
return self._value.get("metadata", {}).get("display_phone_number", "")
@property
def has_messages(self) -> bool:
return "messages" in self._value and len(self._value["messages"]) > 0
@property
def has_statuses(self) -> bool:
return "statuses" in self._value and len(self._value["statuses"]) > 0
# Message properties
@property
def message_id(self) -> str:
return self._message.get("id", "")
@property
def from_number(self) -> str:
return self._message.get("from", "")
@property
def timestamp(self) -> str:
return self._message.get("timestamp", "")
@property
def message_type(self) -> WhatsAppMessageType:
msg_type = self._message.get("type", "")
try:
return WhatsAppMessageType(msg_type)
except ValueError:
return WhatsAppMessageType.UNKNOWN
@property
def text_body(self) -> str:
if self.message_type == WhatsAppMessageType.TEXT:
return self._message.get("text", {}).get("body", "")
return ""
@property
def button_payload(self) -> str:
if self.message_type == WhatsAppMessageType.INTERACTIVE:
interactive = self._message.get("interactive", {})
if "button_reply" in interactive:
return interactive["button_reply"].get("id", "")
if "list_reply" in interactive:
return interactive["list_reply"].get("id", "")
return ""
@property
def button_title(self) -> str:
if self.message_type == WhatsAppMessageType.INTERACTIVE:
interactive = self._message.get("interactive", {})
if "button_reply" in interactive:
return interactive["button_reply"].get("title", "")
if "list_reply" in interactive:
return interactive["list_reply"].get("title", "")
return ""
@property
def image_data(self) -> dict[str, Any]:
if self.message_type == WhatsAppMessageType.IMAGE:
return self._message.get("image", {})
return {}
@property
def audio_data(self) -> dict[str, Any]:
if self.message_type == WhatsAppMessageType.AUDIO:
return self._message.get("audio", {})
return {}
@property
def context(self) -> dict[str, Any]:
"""Context of a reply (original message ID)."""
return self._message.get("context", {})
@property
def is_reply(self) -> bool:
return bool(self.context.get("id"))
# Status properties
@property
def statuses(self) -> list[dict[str, Any]]:
return self._value.get("statuses", [])
def __repr__(self) -> str:
return (
f"<WhatsAppWebhookPayload from={self.from_number} "
f"type={self.message_type.value} id={self.message_id}>"
)
class WebhookVerifier:
"""Verify Meta webhook signatures."""
@staticmethod
def verify_subscription(mode: str | None, token: str | None) -> bool:
"""Verify webhook subscription challenge."""
if mode != "subscribe":
return False
verify_token = settings.META_WEBHOOK_VERIFY_TOKEN.get_secret_value()
return token == verify_token
@staticmethod
def verify_signature(body: bytes, signature: str | None) -> bool:
"""Verify X-Hub-Signature-256 header."""
if not signature:
logger.warning("missing_webhook_signature")
return False
app_secret = settings.META_APP_SECRET.get_secret_value()
expected = hmac.new(
app_secret.encode("utf-8"),
body,
hashlib.sha256,
).hexdigest()
if not hmac.compare_digest(f"sha256={expected}", signature):
logger.warning("invalid_webhook_signature")
return False
return True
def parse_webhook_payload(payload: dict[str, Any]) -> WhatsAppWebhookPayload:
"""Parse and validate incoming webhook payload."""
if payload.get("object") != "whatsapp_business_account":
raise ValidationError("Invalid webhook object type")
parsed = WhatsAppWebhookPayload(payload)
if not parsed.has_messages and not parsed.has_statuses:
raise ValidationError("Webhook contains no messages or statuses")
return parsed

170
src/main.py Normal file
View File

@@ -0,0 +1,170 @@
"""SKEEN CRM AI Agent - FastAPI Application Entry Point."""
import time
import uuid
from contextlib import asynccontextmanager
import structlog
from asgi_correlation_id import CorrelationIdMiddleware
from fastapi import FastAPI, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from prometheus_client import make_asgi_app
from src.api.v1.health import router as health_router
from src.api.v1.messages import router as messages_router
from src.api.v1.webhooks import router as webhooks_router
from src.config import settings
from src.core.exceptions import SkeenException
from src.infrastructure.db import close_db, init_db
from src.infrastructure.erpnext.client import close_erpnext_client
from src.infrastructure.redis import close_redis
from src.infrastructure.whatsapp.client import close_whatsapp_client
logger = structlog.get_logger(__name__)
def setup_logging() -> None:
"""Configure structured logging."""
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer(),
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan events."""
setup_logging()
logger.info(
"application_starting",
app_name=settings.APP_NAME,
environment=settings.APP_ENV,
version="0.1.0",
)
# Initialize database in development
if settings.APP_ENV == "development":
try:
await init_db()
logger.info("database_initialized")
except Exception as exc:
logger.warning("database_init_skipped", error=str(exc))
yield
# Cleanup
logger.info("application_shutting_down")
await close_redis()
await close_whatsapp_client()
await close_erpnext_client()
await close_db()
logger.info("application_shutdown_complete")
def create_app() -> FastAPI:
"""Application factory."""
app = FastAPI(
title=settings.APP_NAME,
description="SKEEN CRM AI Agent for WhatsApp Business API + ERPNext",
version="0.1.0",
docs_url="/docs" if not settings.is_production else None,
redoc_url="/redoc" if not settings.is_production else None,
openapi_url="/openapi.json" if not settings.is_production else None,
lifespan=lifespan,
)
# Middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # TODO: Restrict in production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.add_middleware(CorrelationIdMiddleware)
# Request timing middleware
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
request_id = str(uuid.uuid4())
structlog.contextvars.clear_contextvars()
structlog.contextvars.bind_contextvars(
request_id=request_id,
path=request.url.path,
method=request.method,
)
try:
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
response.headers["X-Request-ID"] = request_id
logger.info(
"request_completed",
status_code=response.status_code,
duration_ms=round(process_time * 1000, 2),
)
return response
except Exception as exc:
logger.error("request_failed", error=str(exc))
raise
# Exception handlers
@app.exception_handler(SkeenException)
async def skeen_exception_handler(request: Request, exc: SkeenException):
logger.error(
"application_exception",
error=exc.message,
status_code=exc.status_code,
details=exc.details,
)
return JSONResponse(
status_code=exc.status_code,
content={
"error": exc.message,
"details": exc.details,
"request_id": str(uuid.uuid4()),
},
)
@app.exception_handler(Exception)
async def generic_exception_handler(request: Request, exc: Exception):
logger.exception("unhandled_exception", error=str(exc))
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"error": "Internal server error",
"request_id": str(uuid.uuid4()),
},
)
# Routes
app.include_router(health_router, prefix="/api/v1")
app.include_router(webhooks_router, prefix="/api/v1")
app.include_router(messages_router, prefix="/api/v1")
# Metrics endpoint (Prometheus)
if settings.ENABLE_METRICS:
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
return app
app = create_app()

View File

View File

@@ -0,0 +1,372 @@
"""Core use case: process incoming WhatsApp message with AI agent."""
import json
import uuid
from datetime import datetime, timezone
from typing import Any
import structlog
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from src.config import settings
from src.core.constants import ConversationStatus, SKEEN_SYSTEM_PROMPT, WhatsAppMessageType
from src.infrastructure.ai.openai_client import get_openai_client
from src.infrastructure.ai.prompts import TOOLS
from src.infrastructure.ai.rag import RAGStore
from src.infrastructure.erpnext.client import get_erpnext_client
from src.infrastructure.whatsapp.client import get_whatsapp_client
from src.infrastructure.whatsapp.webhook import WhatsAppWebhookPayload
from src.domain.models.conversation import Conversation, Message
logger = structlog.get_logger(__name__)
MAX_CONTEXT_MESSAGES = 10
class ToolExecutor:
"""Executes tool calls requested by the LLM."""
def __init__(self, session: AsyncSession) -> None:
self.session = session
self.rag = RAGStore(session)
self.erpnext = None # Lazy init
async def execute(self, tool_call: dict[str, Any]) -> dict[str, Any]:
"""Execute a single tool call and return result."""
name = tool_call["function"]["name"]
arguments = json.loads(tool_call["function"]["arguments"])
tool_call_id = tool_call["id"]
logger.info("executing_tool", tool=name, args=arguments)
try:
if name == "search_catalog":
result = await self._search_catalog(arguments)
elif name == "check_availability":
result = await self._check_availability(arguments)
elif name == "create_appointment":
result = await self._create_appointment(arguments)
elif name == "get_patient_info":
result = await self._get_patient_info(arguments)
elif name == "get_wallet_balance":
result = await self._get_wallet_balance(arguments)
elif name == "escalate_to_human":
result = await self._escalate_to_human(arguments)
else:
result = {"error": f"Unknown tool: {name}"}
except Exception as exc:
logger.error("tool_execution_failed", tool=name, error=str(exc))
result = {"error": f"Failed to execute {name}: {str(exc)}"}
return {
"tool_call_id": tool_call_id,
"role": "tool",
"name": name,
"content": json.dumps(result, ensure_ascii=False),
}
async def _search_catalog(self, args: dict[str, Any]) -> dict[str, Any]:
query = args.get("query", "")
category = args.get("category")
results = await self.rag.search(query, category=category, top_k=5)
return {
"results": [
{
"content": r["content"],
"category": r["category"],
"source": r["source"],
"similarity": round(r["similarity"], 3),
}
for r in results
]
}
async def _check_availability(self, args: dict[str, Any]) -> dict[str, Any]:
date = args.get("date")
branch = args.get("branch")
doctor = args.get("doctor")
service = args.get("service")
# TODO: Integrate with ERPNext Healthcare scheduling
# For now, return mock data structure
return {
"date": date,
"available_slots": [
{"time": "10:00", "doctor": "Dr. Ramos"},
{"time": "11:30", "doctor": "Dr. Martínez"},
{"time": "15:00", "doctor": "Dr. Ramos"},
],
"branch": branch or "Rosarito",
"service": service,
"note": "Esta es una respuesta simulada. Integrar con ERPNext Healthcare.",
}
async def _create_appointment(self, args: dict[str, Any]) -> dict[str, Any]:
# TODO: Integrate with ERPNext to create real appointments
return {
"status": "simulated",
"appointment_id": f"APT-{uuid.uuid4().hex[:8].upper()}",
"details": args,
"note": "Cita simulada. Integrar con ERPNext Patient Appointment.",
}
async def _get_patient_info(self, args: dict[str, Any]) -> dict[str, Any]:
phone = args.get("phone", "")
erpnext = await get_erpnext_client()
patient = await erpnext.find_patient_by_phone(phone)
if not patient:
return {"found": False, "message": "No se encontró paciente con ese número."}
appointments = await erpnext.get_appointments(patient=patient.get("name"))
return {
"found": True,
"name": patient.get("patient_name"),
"sex": patient.get("sex"),
"blood_group": patient.get("blood_group"),
"total_appointments": len(appointments),
"last_appointments": appointments[:3],
}
async def _get_wallet_balance(self, args: dict[str, Any]) -> dict[str, Any]:
# TODO: Integrate with ERPNext custom Wallet doctype
return {
"balance_mxn": 0.0,
"points": 0,
"note": "Monedero no implementado en ERPNext aún.",
}
async def _escalate_to_human(self, args: dict[str, Any]) -> dict[str, Any]:
reason = args.get("reason", "Solicitud del paciente")
return {
"escalated": True,
"reason": reason,
"message": "Un agente humano de SKEEN se pondrá en contacto contigo pronto. ⏳",
}
async def get_or_create_conversation(
db: AsyncSession,
phone_number: str,
) -> Conversation:
"""Get existing conversation or create new one."""
result = await db.execute(
select(Conversation)
.where(Conversation.phone_number == phone_number)
.order_by(Conversation.created_at.desc())
)
conversation = result.scalars().first()
if conversation is None:
conversation = Conversation(
id=str(uuid.uuid4()),
phone_number=phone_number,
status=ConversationStatus.ACTIVE,
context={},
)
db.add(conversation)
await db.flush()
logger.info("new_conversation_created", phone=phone_number, conversation_id=conversation.id)
else:
# Reactivate if resolved
if conversation.status == ConversationStatus.RESOLVED:
conversation.status = ConversationStatus.ACTIVE
conversation.last_message_at = datetime.now(timezone.utc)
return conversation
async def get_conversation_history(
db: AsyncSession,
conversation_id: str,
limit: int = MAX_CONTEXT_MESSAGES,
) -> list[dict[str, str]]:
"""Get recent messages formatted for OpenAI context."""
from sqlalchemy import select
from src.domain.models.conversation import Message
result = await db.execute(
select(Message)
.where(Message.conversation_id == conversation_id)
.order_by(Message.created_at.desc())
.limit(limit)
)
messages = result.scalars().all()
# Reverse to chronological order
history = []
for msg in reversed(messages):
history.append({"role": msg.role, "content": msg.content})
return history
async def process_incoming_message(
db: AsyncSession,
webhook_data: dict[str, Any],
client_ip: str = "unknown",
) -> dict[str, Any]:
"""Process an incoming WhatsApp message end-to-end.
1. Parse webhook
2. Load/create conversation
3. Build LLM context
4. Call OpenAI with tools
5. Execute tools if needed
6. Send response
7. Persist everything
"""
webhook = WhatsAppWebhookPayload(webhook_data)
if not webhook.has_messages:
return {"status": "no_messages"}
phone = webhook.from_number
message_text = webhook.text_body or "[Mensaje no textual]"
message_type = webhook.message_type.value
logger.info(
"incoming_message_received",
from_number=phone[-4:], # Log last 4 digits only for privacy
message_type=message_type,
ip=client_ip,
)
# Get or create conversation
conversation = await get_or_create_conversation(db, phone)
# Try to find patient in ERPNext for personalization
patient_name = None
try:
erpnext = await get_erpnext_client()
patient = await erpnext.find_patient_by_phone(phone)
if patient:
patient_name = patient.get("patient_name")
conversation.patient_id = patient.get("name")
conversation.patient_name = patient_name
except Exception as exc:
logger.warning("patient_lookup_failed", error=str(exc))
# Save inbound message
inbound_msg = Message(
id=str(uuid.uuid4()),
conversation_id=conversation.id,
direction="inbound",
role="user",
message_type=message_type,
content=message_text,
whatsapp_message_id=webhook.message_id,
metadata={"ip": client_ip, "timestamp": webhook.timestamp},
)
db.add(inbound_msg)
# Build context for OpenAI
system_prompt = SKEEN_SYSTEM_PROMPT
if patient_name:
system_prompt += f"\n\nPACIENTE ACTUAL: {patient_name}. Salúdalo/a por su nombre cuando sea apropiado."
messages: list[dict[str, str]] = [
{"role": "system", "content": system_prompt},
]
# Add conversation history
history = await get_conversation_history(db, conversation.id)
messages.extend(history)
# Add current message
messages.append({"role": "user", "content": message_text})
# Call OpenAI
openai_client = await get_openai_client()
llm_response = await openai_client.chat_completion(
messages=messages,
tools=TOOLS,
)
assistant_message = llm_response["content"] or ""
tool_calls = llm_response.get("tool_calls")
tool_results = []
# Execute tools if requested
if tool_calls:
executor = ToolExecutor(db)
for tc in tool_calls:
result = await executor.execute(tc)
tool_results.append(result)
# Second LLM call with tool results
messages.append({
"role": "assistant",
"content": assistant_message,
"tool_calls": tool_calls,
})
for tr in tool_results:
messages.append({
"role": "tool",
"tool_call_id": tr["tool_call_id"],
"name": tr["name"],
"content": tr["content"],
})
final_response = await openai_client.chat_completion(
messages=messages,
tools=TOOLS,
)
assistant_message = final_response["content"] or assistant_message
# Mark as read (best effort)
try:
wa_client = await get_whatsapp_client()
await wa_client.mark_as_read(webhook.message_id)
except Exception:
pass
# Send response
if assistant_message:
try:
wa_client = await get_whatsapp_client()
send_result = await wa_client.send_text_message(phone, assistant_message)
response_message_id = send_result.get("messages", [{}])[0].get("id")
except Exception as exc:
logger.error("failed_to_send_response", error=str(exc))
response_message_id = None
else:
response_message_id = None
# Save outbound message
outbound_msg = Message(
id=str(uuid.uuid4()),
conversation_id=conversation.id,
direction="outbound",
role="assistant",
message_type="text",
content=assistant_message,
whatsapp_message_id=response_message_id,
tool_calls=tool_calls,
tool_results=tool_results,
tokens_used=llm_response.get("usage", {}).get("total_tokens", 0),
metadata={"model": settings.OPENAI_MODEL},
)
db.add(outbound_msg)
# Update conversation
conversation.last_message_at = datetime.now(timezone.utc)
if any(tr.get("name") == "escalate_to_human" for tr in tool_results):
conversation.status = ConversationStatus.ESCALATED
await db.commit()
logger.info(
"message_processed",
conversation_id=conversation.id,
patient=patient_name,
tools_used=len(tool_calls) if tool_calls else 0,
response_length=len(assistant_message),
)
return {
"status": "processed",
"conversation_id": conversation.id,
"response": assistant_message,
"tools_executed": [tr["name"] for tr in tool_results],
}

0
src/workers/__init__.py Normal file
View File

61
src/workers/celery_app.py Normal file
View File

@@ -0,0 +1,61 @@
"""Celery configuration for background task processing."""
import os
from celery import Celery
from celery.signals import setup_logging
from src.config import settings
os.environ.setdefault("CELERY_CONFIG_MODULE", "src.workers.celery_app")
celery_app = Celery(
"skeen_crm",
broker=str(settings.CELERY_BROKER_URL),
backend=str(settings.CELERY_RESULT_BACKEND),
include=["src.workers.tasks"],
)
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="America/Tijuana",
enable_utc=True,
task_track_started=True,
task_time_limit=300,
worker_prefetch_multiplier=1,
worker_concurrency=settings.CELERY_WORKER_CONCURRENCY,
task_routes={
"src.workers.tasks.process_whatsapp_message_task": {"queue": "whatsapp"},
"src.workers.tasks.sync_erpnext_task": {"queue": "erpnext"},
"src.workers.tasks.generate_embedding_task": {"queue": "ai"},
},
task_default_queue="default",
)
@setup_logging.connect
def config_loggers(*args, **kwargs) -> None:
"""Configure structlog for Celery workers."""
import logging
import structlog
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer(),
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
logging.basicConfig(format="%(message)s", level=logging.INFO)

67
src/workers/tasks.py Normal file
View File

@@ -0,0 +1,67 @@
"""Celery background tasks."""
import structlog
from src.infrastructure.db import AsyncSessionLocal
from src.use_cases.handle_incoming_message import process_incoming_message
from src.workers.celery_app import celery_app
logger = structlog.get_logger(__name__)
@celery_app.task(bind=True, max_retries=3, default_retry_delay=10)
def process_whatsapp_message_task(self, webhook_data: dict, client_ip: str = "unknown") -> dict:
"""Process WhatsApp message asynchronously.
This task runs in a Celery worker and handles the full AI pipeline
so the webhook endpoint can return 200 immediately to Meta.
"""
import asyncio
async def _process() -> dict:
async with AsyncSessionLocal() as db:
try:
result = await process_incoming_message(
db=db,
webhook_data=webhook_data,
client_ip=client_ip,
)
return result
except Exception as exc:
logger.error("whatsapp_task_failed", error=str(exc), attempt=self.request.retries)
raise self.retry(exc=exc)
return asyncio.run(_process())
@celery_app.task
def sync_erpnext_patient_task(patient_data: dict) -> dict:
"""Sync patient data to ERPNext in background."""
logger.info("syncing_patient_to_erpnext", patient=patient_data.get("name"))
# TODO: Implement ERPNext sync logic
return {"status": "synced", "patient": patient_data.get("name")}
@celery_app.task
def generate_embedding_task(document_id: str, content: str, category: str) -> dict:
"""Generate embedding for a document chunk asynchronously."""
import asyncio
async def _generate() -> dict:
from src.infrastructure.ai.openai_client import get_openai_client
from src.infrastructure.db import AsyncSessionLocal
from src.infrastructure.ai.rag import RAGStore
async with AsyncSessionLocal() as db:
client = await get_openai_client()
embedding = await client.create_embedding(content)
store = RAGStore(db)
await store.add_document(
content=content,
category=category,
doc_id=document_id,
)
return {"document_id": document_id, "embedding_dims": len(embedding)}
return asyncio.run(_generate())