From d30b22b50c34b1843ca684b7b7cd0b2f47d9fd84 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 29 Apr 2026 05:30:59 +0000 Subject: [PATCH] feat: initial Skeen-CRM AI Agent architecture - FastAPI + Python 3.12 backend - Meta WhatsApp Business API client (official) - OpenAI GPT-4o with function calling - RAG vector store with pgvector - ERPNext Frappe REST client - Celery + Redis async task queue - PostgreSQL with migrations (Alembic) - Docker Compose full stack - Enterprise logging, metrics, health checks --- .env.example | 73 +++++ .gitignore | 149 +++++++++ Dockerfile | 57 ++++ README.md | 210 +++++++++++++ alembic.ini | 42 +++ alembic/README | 1 + alembic/env.py | 76 +++++ alembic/script.py.mako | 26 ++ alembic/versions/20260428_init.py | 100 ++++++ docker-compose.yml | 145 +++++++++ pyproject.toml | 102 +++++++ src/__init__.py | 0 src/api/__init__.py | 0 src/api/deps.py | 36 +++ src/api/v1/__init__.py | 0 src/api/v1/health.py | 39 +++ src/api/v1/messages.py | 109 +++++++ src/api/v1/webhooks.py | 98 ++++++ src/config.py | 96 ++++++ src/core/__init__.py | 0 src/core/constants.py | 100 ++++++ src/core/exceptions.py | 66 ++++ src/domain/__init__.py | 0 src/domain/models/__init__.py | 0 src/domain/models/conversation.py | 74 +++++ src/domain/services/__init__.py | 0 src/infrastructure/__init__.py | 0 src/infrastructure/ai/__init__.py | 0 src/infrastructure/ai/openai_client.py | 143 +++++++++ src/infrastructure/ai/prompts.py | 202 ++++++++++++ src/infrastructure/ai/rag.py | 181 +++++++++++ src/infrastructure/db.py | 62 ++++ src/infrastructure/erpnext/__init__.py | 0 src/infrastructure/erpnext/client.py | 255 ++++++++++++++++ src/infrastructure/redis.py | 69 +++++ src/infrastructure/whatsapp/__init__.py | 0 src/infrastructure/whatsapp/client.py | 254 ++++++++++++++++ src/infrastructure/whatsapp/webhook.py | 168 ++++++++++ src/main.py | 170 +++++++++++ src/use_cases/__init__.py | 0 src/use_cases/handle_incoming_message.py | 372 +++++++++++++++++++++++ 
src/workers/__init__.py | 0 src/workers/celery_app.py | 61 ++++ src/workers/tasks.py | 67 ++++ 44 files changed, 3603 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 alembic.ini create mode 100644 alembic/README create mode 100644 alembic/env.py create mode 100644 alembic/script.py.mako create mode 100644 alembic/versions/20260428_init.py create mode 100644 docker-compose.yml create mode 100644 pyproject.toml create mode 100644 src/__init__.py create mode 100644 src/api/__init__.py create mode 100644 src/api/deps.py create mode 100644 src/api/v1/__init__.py create mode 100644 src/api/v1/health.py create mode 100644 src/api/v1/messages.py create mode 100644 src/api/v1/webhooks.py create mode 100644 src/config.py create mode 100644 src/core/__init__.py create mode 100644 src/core/constants.py create mode 100644 src/core/exceptions.py create mode 100644 src/domain/__init__.py create mode 100644 src/domain/models/__init__.py create mode 100644 src/domain/models/conversation.py create mode 100644 src/domain/services/__init__.py create mode 100644 src/infrastructure/__init__.py create mode 100644 src/infrastructure/ai/__init__.py create mode 100644 src/infrastructure/ai/openai_client.py create mode 100644 src/infrastructure/ai/prompts.py create mode 100644 src/infrastructure/ai/rag.py create mode 100644 src/infrastructure/db.py create mode 100644 src/infrastructure/erpnext/__init__.py create mode 100644 src/infrastructure/erpnext/client.py create mode 100644 src/infrastructure/redis.py create mode 100644 src/infrastructure/whatsapp/__init__.py create mode 100644 src/infrastructure/whatsapp/client.py create mode 100644 src/infrastructure/whatsapp/webhook.py create mode 100644 src/main.py create mode 100644 src/use_cases/__init__.py create mode 100644 src/use_cases/handle_incoming_message.py create mode 100644 src/workers/__init__.py create mode 100644 
src/workers/celery_app.py create mode 100644 src/workers/tasks.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..107327c --- /dev/null +++ b/.env.example @@ -0,0 +1,73 @@ +# ============================================================================= +# FASTAPI APPLICATION +# ============================================================================= +APP_NAME=Skeen-CRM-Agent +APP_ENV=development +DEBUG=true +LOG_LEVEL=INFO +SECRET_KEY=change-me-in-production-skeen-2024 + +# ============================================================================= +# SERVER +# ============================================================================= +HOST=0.0.0.0 +PORT=8000 + +# ============================================================================= +# DATABASE (PostgreSQL + pgvector) +# ============================================================================= +DATABASE_URL=postgresql+asyncpg://skeen:skeen123@localhost:5432/skeen_crm +DATABASE_POOL_SIZE=20 +DATABASE_MAX_OVERFLOW=10 + +# ============================================================================= +# REDIS +# ============================================================================= +REDIS_URL=redis://localhost:6379/0 + +# ============================================================================= +# META / WHATSAPP BUSINESS API (Oficial) +# ============================================================================= +META_API_VERSION=v18.0 +META_ACCESS_TOKEN=EAAXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX +META_PHONE_NUMBER_ID=123456789012345 +META_BUSINESS_ACCOUNT_ID=987654321098765 +META_WEBHOOK_VERIFY_TOKEN=skeen-webhook-verify-token-2024 +META_APP_SECRET=your-app-secret-for-signature-verification + +# ============================================================================= +# OPENAI +# ============================================================================= +OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx +OPENAI_MODEL=gpt-4o 
+OPENAI_EMBEDDING_MODEL=text-embedding-3-small +OPENAI_TEMPERATURE=0.3 +OPENAI_MAX_TOKENS=1500 + +# ============================================================================= +# RAG / VECTOR STORE +# ============================================================================= +VECTOR_DIMENSION=1536 +RAG_TOP_K=5 +RAG_SIMILARITY_THRESHOLD=0.75 + +# ============================================================================= +# ERPNEXT INTEGRATION +# ============================================================================= +ERPNEXT_BASE_URL=https://skeen.erpnext.com +ERPNEXT_API_KEY=xxxxxxxxxxxxxxxx +ERPNEXT_API_SECRET=xxxxxxxxxxxxxxxx +ERPNEXT_VERIFY_SSL=true + +# ============================================================================= +# CELERY +# ============================================================================= +CELERY_BROKER_URL=redis://localhost:6379/1 +CELERY_RESULT_BACKEND=redis://localhost:6379/2 +CELERY_WORKER_CONCURRENCY=4 + +# ============================================================================= +# MONITORING +# ============================================================================= +ENABLE_METRICS=true +SENTRY_DSN= diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0b2c8eb --- /dev/null +++ b/.gitignore @@ -0,0 +1,149 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 
+db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +Pipfile.lock + +# poetry +poetry.lock + +# pdm +.pdm.toml + +# PEP 582 +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +.idea/ + +# VS Code +.vscode/ + +# macOS +.DS_Store + +# Docker volumes +postgres_data/ +redis_data/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..4c4b8cd --- /dev/null +++ b/Dockerfile @@ -0,0 +1,57 @@ +# ============================================================================= +# SKEEN CRM Agent - Production Dockerfile +# ============================================================================= +FROM python:3.12-slim AS builder + +# Install build dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + libpq-dev \ + && rm -rf /var/lib/apt/lists/* + +# Install uv (modern Python package manager) +RUN pip install --no-cache-dir uv + +# Set workdir +WORKDIR /app + +# Copy dependency definitions +COPY pyproject.toml ./ + +# Create virtual environment and install dependencies +RUN uv venv .venv && \ + uv pip install --no-cache -r pyproject.toml + +# ============================================================================= +# Production stage +# ============================================================================= +FROM 
python:3.12-slim AS production + +# Install runtime dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + libpq5 \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Create non-root user +RUN groupadd -r skeen && useradd -r -g skeen skeen + +WORKDIR /app + +# Copy virtual environment from builder +COPY --from=builder /app/.venv /app/.venv +ENV PATH="/app/.venv/bin:$PATH" + +# Copy application code +COPY --chown=skeen:skeen src/ ./src/ +COPY --chown=skeen:skeen alembic/ ./alembic/ +COPY --chown=skeen:skeen alembic.ini ./ + +# Switch to non-root user +USER skeen + +# Healthcheck +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:8000/health || exit 1 + +EXPOSE 8000 diff --git a/README.md b/README.md new file mode 100644 index 0000000..66300f3 --- /dev/null +++ b/README.md @@ -0,0 +1,210 @@ +# SKEEN CRM AI Agent + +Agente de inteligencia artificial para WhatsApp Business API + ERPNext Healthcare. + +## Arquitectura + +``` +┌─────────────┐ ┌─────────────────────┐ ┌─────────────┐ +│ Paciente │────▶│ WhatsApp Business │────▶│ Meta API │ +│ (WhatsApp) │◀────│ API │◀────│ Webhooks │ +└─────────────┘ └─────────────────────┘ └──────┬──────┘ + │ + ┌────────────────────────┘ + ▼ + ┌─────────────────────┐ + │ SKEEN AI Agent │ FastAPI + Python 3.12 + │ (Este Repo) │ + │ │ + │ • OpenAI GPT-4o │ + │ • RAG (pgvector) │ + │ • Celery + Redis │ + │ • PostgreSQL │ + └──────────┬──────────┘ + │ + ┌──────────┴──────────┐ + ▼ ▼ + ┌─────────────┐ ┌─────────────┐ + │ ERPNext │ │ PostgreSQL │ + │ Healthcare │ │ + pgvector │ + └─────────────┘ └─────────────┘ +``` + +## Stack Tecnológico + +| Capa | Tecnología | +|------|-----------| +| Web Framework | FastAPI + Uvicorn (HTTP/1.1) | +| Base de datos | PostgreSQL 16 + pgvector | +| Cola de tareas | Celery + Redis | +| IA / LLM | OpenAI GPT-4o + Embeddings | +| WhatsApp | Meta Business API (oficial) | +| CRM | ERPNext (Frappe Framework) | +| Observabilidad | Structlog + 
Prometheus | +| Deploy | Docker Compose | + +## Estructura del Proyecto + +``` +Skeen-CRM/ +├── src/ +│ ├── main.py # Entry point FastAPI +│ ├── config.py # Pydantic Settings +│ ├── api/ +│ │ ├── v1/ +│ │ │ ├── webhooks.py # Meta WhatsApp webhooks +│ │ │ ├── messages.py # API envío manual +│ │ │ └── health.py # Health checks +│ │ └── deps.py # Dependency injection +│ ├── domain/ +│ │ ├── models/ +│ │ │ └── conversation.py # Entidades SQLAlchemy +│ │ └── services/ +│ ├── use_cases/ +│ │ └── handle_incoming_message.py # Pipeline AI +│ ├── infrastructure/ +│ │ ├── db.py # PostgreSQL async +│ │ ├── redis.py # Redis client +│ │ ├── whatsapp/ +│ │ │ ├── client.py # Meta Graph API client +│ │ │ └── webhook.py # Webhook parser/validator +│ │ ├── ai/ +│ │ │ ├── openai_client.py # OpenAI async client +│ │ │ ├── rag.py # Vector store pgvector +│ │ │ └── prompts.py # Tools & prompts +│ │ └── erpnext/ +│ │ └── client.py # Frappe REST API client +│ └── workers/ +│ ├── celery_app.py # Celery configuration +│ └── tasks.py # Background tasks +├── alembic/ # Database migrations +├── docker-compose.yml # Full stack local +├── Dockerfile # Production image +└── pyproject.toml # Dependencies (uv) +``` + +## Requisitos + +- Python 3.12+ +- Docker + Docker Compose +- Cuenta de WhatsApp Business API (Meta) +- API Key de OpenAI +- Instancia de ERPNext (para integración completa) + +## Instalación Local + +### 1. Clonar y entrar al directorio + +```bash +cd Skeen-CRM +``` + +### 2. Copiar variables de entorno + +```bash +cp .env.example .env +# Editar .env con tus credenciales de Meta, OpenAI y ERPNext +``` + +### 3. Levantar infraestructura (PostgreSQL + Redis) + +```bash +docker compose up -d postgres redis +``` + +### 4. Instalar dependencias con uv + +```bash +pip install uv +uv venv .venv +source .venv/bin/activate +uv pip install -e ".[dev]" +``` + +### 5. Ejecutar migraciones + +```bash +alembic upgrade head +``` + +### 6. 
Iniciar servidor de desarrollo + +```bash +uvicorn src.main:app --reload --host 0.0.0.0 --port 8000 +``` + +### 7. Iniciar worker de Celery (en otra terminal) + +```bash +celery -A src.workers.celery_app worker --loglevel=info -Q default,whatsapp,erpnext,ai +``` + +## Deploy con Docker Compose (Producción) + +```bash +docker compose up -d --build +``` + +Esto levanta: +- `api`: FastAPI (2 workers) +- `worker`: Celery worker (4 concurrentes) +- `scheduler`: Celery beat +- `postgres`: PostgreSQL + pgvector +- `redis`: Redis persistente + +## Configuración de Webhook en Meta + +1. Ir a [Meta Developers](https://developers.facebook.com/) +2. Seleccionar tu app de WhatsApp Business +3. En **Webhooks**, configurar: + - **Callback URL**: `https://tu-dominio.com/api/v1/webhooks/whatsapp` + - **Verify Token**: El valor de `META_WEBHOOK_VERIFY_TOKEN` en `.env` + - **Suscripción**: `messages` + +## Tools / Funciones del Agente + +El agente usa **OpenAI Function Calling** para ejecutar acciones: + +| Tool | Descripción | +|------|-------------| +| `search_catalog` | Busca servicios/productos en catálogo vía RAG | +| `check_availability` | Consulta disponibilidad de citas en ERPNext | +| `create_appointment` | Crea cita médica en ERPNext Healthcare | +| `get_patient_info` | Consulta historial del paciente | +| `get_wallet_balance` | Consulta saldo de monedero | +| `escalate_to_human` | Escalar a agente humano | + +## Endpoints API + +| Método | Endpoint | Descripción | +|--------|----------|-------------| +| GET | `/health` | Liveness probe | +| GET | `/ready` | Readiness probe (incluye DB) | +| GET | `/metrics` | Métricas Prometheus | +| GET | `/api/v1/webhooks/whatsapp` | Verificación Meta | +| POST | `/api/v1/webhooks/whatsapp` | Recepción mensajes | +| POST | `/api/v1/messages/text` | Enviar texto manual | +| POST | `/api/v1/messages/template` | Enviar plantilla | +| POST | `/api/v1/messages/buttons` | Enviar botones | + +## Testing + +```bash +# Unit tests +pytest tests/ 
-v + +# With coverage +pytest tests/ --cov=src --cov-report=html +``` + +## Seguridad + +- ✅ Verificación de firma HMAC en webhooks (producción) +- ✅ Tokens JWT no usados (usa API Keys de Meta) +- ✅ Rate limiting implementado por número de teléfono +- ✅ Sanitización de inputs antes de enviar a LLM +- ✅ Logging sin exponer datos PHI (solo últimos 4 dígitos de teléfono) + +## Licencia + +Propietario - SKEEN Clínica de Belleza diff --git a/alembic.ini b/alembic.ini new file mode 100644 index 0000000..188e6a9 --- /dev/null +++ b/alembic.ini @@ -0,0 +1,42 @@ +[alembic] +script_location = alembic +prepend_sys_path = . +version_path_separator = os + +sqlalchemy.url = postgresql+asyncpg://skeen:skeen123@localhost:5432/skeen_crm + +[post_write_hooks] + +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/alembic/README b/alembic/README new file mode 100644 index 0000000..2500aa1 --- /dev/null +++ b/alembic/README @@ -0,0 +1 @@ +Generic single-database configuration. diff --git a/alembic/env.py b/alembic/env.py new file mode 100644 index 0000000..e78b3ca --- /dev/null +++ b/alembic/env.py @@ -0,0 +1,76 @@ +import asyncio +from logging.config import fileConfig + +from sqlalchemy import pool +from sqlalchemy.engine import Connection +from sqlalchemy.ext.asyncio import async_engine_from_config + +from alembic import context +from src.config import settings +from src.infrastructure.db import Base + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. 
async def run_async_migrations() -> None:
    """Run migrations in 'online' mode over an async engine.

    Builds an async engine from the ``sqlalchemy.*`` options of the active
    Alembic config section, opens a single connection and runs the
    synchronous ``do_run_migrations`` on it via ``run_sync``.

    Raises:
        Exception: Whatever the engine or a migration script raises is
            propagated unchanged; the engine is disposed first.
    """
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        # NullPool: no pooled connections are kept for this one-shot run.
        poolclass=pool.NullPool,
    )

    try:
        async with connectable.connect() as connection:
            # do_run_migrations is a plain (sync) callable, hence run_sync.
            await connection.run_sync(do_run_migrations)
    finally:
        # Dispose on the failure path too, so a raising migration does not
        # leave the engine's resources to be cleaned up at interpreter exit.
        await connectable.dispose()
def upgrade() -> None:
    """Create the initial schema: conversations, messages, knowledge_chunks.

    Statement order matters: the ``vector`` extension must exist before the
    ``embedding`` column is converted to ``vector(1536)``, and the table must
    exist before its indexes are created.
    """
    # Enable pgvector extension
    op.execute("CREATE EXTENSION IF NOT EXISTS vector")

    # Conversations table
    op.create_table(
        "conversations",
        sa.Column("id", sa.String(36), primary_key=True),
        sa.Column("phone_number", sa.String(20), nullable=False, index=True),
        sa.Column("patient_id", sa.String(100), nullable=True, index=True),
        sa.Column("patient_name", sa.String(255), nullable=True),
        sa.Column(
            "status",
            sa.Enum("active", "paused", "resolved", "escalated", "appointment_confirmed", name="conversationstatus"),
            nullable=False,
            server_default="active",
        ),
        sa.Column("context", postgresql.JSONB(astext_type=sa.Text()), server_default="{}"),
        sa.Column("last_message_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
        # NOTE(review): `onupdate` is applied by SQLAlchemy on ORM/Core UPDATEs,
        # not by the database; no trigger is created here. Confirm the model
        # declares the same `onupdate`, otherwise updated_at never changes.
        sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), onupdate=sa.func.now()),
    )

    # Messages table
    op.create_table(
        "messages",
        sa.Column("id", sa.String(36), primary_key=True),
        # NOTE(review): indexed, but no ForeignKey to conversations.id is
        # declared here - confirm that is intentional.
        sa.Column("conversation_id", sa.String(36), nullable=False, index=True),
        sa.Column("direction", sa.String(10), nullable=False),
        sa.Column("role", sa.String(20), nullable=False),
        sa.Column("message_type", sa.String(50), server_default="text"),
        sa.Column("content", sa.Text(), nullable=False),
        sa.Column("whatsapp_message_id", sa.String(100), nullable=True),
        sa.Column("tool_calls", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
        sa.Column("tool_results", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
        sa.Column("tokens_used", sa.Integer(), server_default="0"),
        sa.Column("metadata", postgresql.JSONB(astext_type=sa.Text()), server_default="{}"),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )

    # Knowledge chunks table (for RAG)
    op.create_table(
        "knowledge_chunks",
        # Unlike the two tables above, the id is generated server-side.
        sa.Column("id", sa.String(36), primary_key=True, server_default=sa.text("gen_random_uuid()::text")),
        sa.Column("content", sa.Text(), nullable=False),
        sa.Column("metadata", postgresql.JSONB(astext_type=sa.Text()), server_default="{}"),
        sa.Column("category", sa.String(50), server_default="general"),
        sa.Column("source", sa.String(255), server_default=""),
        sa.Column("embedding", sa.String(), nullable=True),  # Stored as string; pgvector uses special type
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )

    # Create pgvector column properly using raw SQL
    # The dimension (1536) is hard-coded in this migration.
    # NOTE(review): presumably it must match the embedding model's output
    # size used by the application - confirm before changing either.
    op.execute("""
        ALTER TABLE knowledge_chunks
        ALTER COLUMN embedding TYPE vector(1536)
        USING embedding::vector(1536)
    """)

    # Indexes
    op.create_index("idx_knowledge_category", "knowledge_chunks", ["category"])
    op.create_index("idx_knowledge_source", "knowledge_chunks", ["source"])
    # NOTE(review): this ivfflat index is built while the table is still
    # empty; pgvector's guidance is to build ivfflat after data is loaded.
    # Confirm recall is acceptable or plan a REINDEX after the first import.
    op.execute("""
        CREATE INDEX idx_knowledge_embedding
        ON knowledge_chunks
        USING ivfflat (embedding vector_cosine_ops)
        WITH (lists = 100)
    """)


def downgrade() -> None:
    """Drop everything created by ``upgrade``, in reverse dependency order.

    Indexes go first, then the three tables, then the ``conversationstatus``
    enum type, and finally the ``vector`` extension.
    """
    op.drop_index("idx_knowledge_embedding", table_name="knowledge_chunks")
    op.drop_index("idx_knowledge_source", table_name="knowledge_chunks")
    op.drop_index("idx_knowledge_category", table_name="knowledge_chunks")
    op.drop_table("knowledge_chunks")
    op.drop_table("messages")
    op.drop_table("conversations")
    # The enum type is dropped explicitly once no table references it.
    op.execute("DROP TYPE IF EXISTS conversationstatus")
    # NOTE(review): removes the extension database-wide - confirm nothing
    # outside this migration depends on it.
    op.execute("DROP EXTENSION IF EXISTS vector")
+ dockerfile: Dockerfile + container_name: skeen-api + restart: unless-stopped + ports: + - "8000:8000" + env_file: + - .env + environment: + - DATABASE_URL=postgresql+asyncpg://skeen:skeen123@postgres:5432/skeen_crm + - REDIS_URL=redis://redis:6379/0 + - CELERY_BROKER_URL=redis://redis:6379/1 + - CELERY_RESULT_BACKEND=redis://redis:6379/2 + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_started + migrate: + condition: service_completed_successfully + networks: + - skeen-network + command: > + uvicorn src.main:app + --host 0.0.0.0 + --port 8000 + --workers 2 + --loop uvloop + --http httptools + --log-level info + + worker: + build: + context: . + dockerfile: Dockerfile + container_name: skeen-worker + restart: unless-stopped + env_file: + - .env + environment: + - DATABASE_URL=postgresql+asyncpg://skeen:skeen123@postgres:5432/skeen_crm + - REDIS_URL=redis://redis:6379/0 + - CELERY_BROKER_URL=redis://redis:6379/1 + - CELERY_RESULT_BACKEND=redis://redis:6379/2 + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_started + migrate: + condition: service_completed_successfully + networks: + - skeen-network + command: > + celery -A src.workers.celery_app worker + --loglevel=info + --concurrency=4 + -Q default,whatsapp,erpnext,ai + + scheduler: + build: + context: . + dockerfile: Dockerfile + container_name: skeen-scheduler + restart: unless-stopped + env_file: + - .env + environment: + - DATABASE_URL=postgresql+asyncpg://skeen:skeen123@postgres:5432/skeen_crm + - REDIS_URL=redis://redis:6379/0 + - CELERY_BROKER_URL=redis://redis:6379/1 + - CELERY_RESULT_BACKEND=redis://redis:6379/2 + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_started + networks: + - skeen-network + command: > + celery -A src.workers.celery_app beat + --loglevel=info + --scheduler celery.beat.PersistentScheduler + + migrate: + build: + context: . 
+ dockerfile: Dockerfile + container_name: skeen-migrate + env_file: + - .env + environment: + - DATABASE_URL=postgresql+asyncpg://skeen:skeen123@postgres:5432/skeen_crm + depends_on: + postgres: + condition: service_healthy + networks: + - skeen-network + command: > + alembic upgrade head + + postgres: + image: ankane/pgvector:latest + container_name: skeen-postgres + restart: unless-stopped + environment: + POSTGRES_USER: skeen + POSTGRES_PASSWORD: skeen123 + POSTGRES_DB: skeen_crm + volumes: + - postgres_data:/var/lib/postgresql/data + ports: + - "5432:5432" + networks: + - skeen-network + healthcheck: + test: [ "CMD-SHELL", "pg_isready -U skeen -d skeen_crm" ] + interval: 5s + timeout: 5s + retries: 5 + + redis: + image: redis:7-alpine + container_name: skeen-redis + restart: unless-stopped + volumes: + - redis_data:/data + ports: + - "6379:6379" + networks: + - skeen-network + command: redis-server --appendonly yes + +volumes: + postgres_data: + redis_data: + +networks: + skeen-network: + driver: bridge diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..26708ae --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,102 @@ +[project] +name = "skeen-crm-agent" +version = "0.1.0" +description = "SKEEN CRM AI Agent for WhatsApp Business API + ERPNext" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + # Web Framework + "fastapi>=0.111.0", + "uvicorn[standard]>=0.30.0", + "python-multipart>=0.0.9", + "python-jose[cryptography]>=3.3.0", + "passlib[bcrypt]>=1.7.4", + "httpx[http2]>=0.27.0", + "tenacity>=8.3.0", + + # Database + "sqlalchemy[asyncio]>=2.0.30", + "asyncpg>=0.29.0", + "sqlmodel>=0.0.19", + "alembic>=1.13.0", + "pgvector>=0.2.5", + + # Redis & Celery + "redis>=5.0.0", + "celery>=5.4.0", + + # AI / LLM + "openai>=1.30.0", + "tiktoken>=0.7.0", + + # Configuration & Validation + "pydantic>=2.7.0", + "pydantic-settings>=2.2.0", + "email-validator>=2.1.0", + + # Observability + "structlog>=24.1.0", + 
"prometheus-client>=0.20.0", + "asgi-correlation-id>=4.3.0", + + # Utilities + "orjson>=3.10.0", + "python-dateutil>=2.9.0", + "pendulum>=3.0.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.2.0", + "pytest-asyncio>=0.23.0", + "pytest-cov>=5.0.0", + "httpx>=0.27.0", + "respx>=0.21.0", + "ruff>=0.4.0", + "mypy>=1.10.0", + "pre-commit>=3.7.0", + "types-python-dateutil>=2.9.0", + "types-passlib>=1.7.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.ruff] +target-version = "py312" +line-length = 100 +select = [ + "E", "F", "W", "I", "N", "D", "UP", "B", "C4", "SIM", "ARG", + "PTH", "ERA", "RUF", "ASYNC", "S", "C90", +] +ignore = ["D100", "D104", "D107", "S101"] + +[tool.ruff.pydocstyle] +convention = "google" + +[tool.mypy] +python_version = "3.12" +strict = true +warn_return_any = true +warn_unused_configs = true +ignore_missing_imports = true + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] +python_files = ["test_*.py"] +addopts = "-v --tb=short --strict-markers" +markers = [ + "unit: Unit tests", + "integration: Integration tests", + "slow: Slow tests", +] + +[tool.coverage.run] +source = ["src"] +omit = ["*/tests/*", "*/migrations/*"] + +[tool.coverage.report] +precision = 2 +fail_under = 80 diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/api/__init__.py b/src/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/api/deps.py b/src/api/deps.py new file mode 100644 index 0000000..abf1de2 --- /dev/null +++ b/src/api/deps.py @@ -0,0 +1,36 @@ +"""FastAPI dependency injection providers.""" + +from typing import AsyncGenerator + +from fastapi import Request +from sqlalchemy.ext.asyncio import AsyncSession + +from src.infrastructure.db import AsyncSessionLocal +from src.infrastructure.redis import RedisCache, get_redis + + +async def get_db_session() -> AsyncGenerator[AsyncSession, None]: + """Yield a 
def get_client_ip(request: Request) -> str:
    """Return the best-effort client IP address for *request*.

    The first non-blank entry of the ``X-Forwarded-For`` header wins. When
    the header is missing, empty, or contains only blank entries, the
    address of the connected peer is used instead.

    Args:
        request: Incoming request; only ``headers`` and ``client`` are read.

    Returns:
        The client address, or ``"unknown"`` when neither the header nor the
        connection provides one.
    """
    # NOTE(review): the header value is taken as-is; it is only trustworthy
    # when a proxy in front of the app sets or overwrites it.
    forwarded = request.headers.get("x-forwarded-for") or ""
    for entry in forwarded.split(","):
        candidate = entry.strip()
        # Skip blank entries (", 10.0.0.1" or a whitespace-only header) so an
        # empty string is never returned as the client address.
        if candidate:
            return candidate
    return request.client.host if request.client else "unknown"
router = APIRouter(prefix="/messages", tags=["messages"])


# Request body for POST /messages/text.
class SendTextMessageRequest(BaseModel):
    to: str = Field(..., description="Phone number in international format (e.g., 5216641234567)")
    text: str = Field(..., max_length=4096, description="Message text")
    preview_url: bool = Field(False, description="Show URL preview")


# Request body for POST /messages/template.
class SendTemplateMessageRequest(BaseModel):
    to: str = Field(..., description="Phone number in international format")
    template_name: str = Field(..., description="Registered template name")
    language_code: str = Field("es_MX", description="Template language code")


# Request body for POST /messages/buttons (between one and three buttons).
class SendButtonsRequest(BaseModel):
    to: str
    body: str = Field(..., max_length=1024)
    buttons: list[dict[str, str]] = Field(..., min_length=1, max_length=3)


# Common response envelope returned by every send endpoint.
class MessageResponse(BaseModel):
    status: str
    message_id: str | None = None
    details: dict | None = None


def _to_message_response(result: dict) -> MessageResponse:
    """Wrap the WhatsApp client result in a ``MessageResponse``.

    A missing or empty ``messages`` list yields ``message_id=None`` instead of
    raising IndexError, so a send that succeeded is not reported as a failure.
    """
    messages = result.get("messages") or [{}]
    return MessageResponse(
        status="sent",
        message_id=messages[0].get("id"),
        details=result,
    )


def _bad_gateway(exc: Exception) -> HTTPException:
    """Build the 502 error raised when the upstream send call fails."""
    return HTTPException(
        status_code=status.HTTP_502_BAD_GATEWAY,
        detail=str(exc),
    )


@router.post("/text", response_model=MessageResponse, status_code=status.HTTP_201_CREATED)
async def send_text_message(
    request: SendTextMessageRequest,
    db: AsyncSession = Depends(get_db_session),
) -> MessageResponse:
    """Send a text message to a WhatsApp user.

    Raises:
        HTTPException: 502 when the WhatsApp client call fails.
    """
    # NOTE(review): ``db`` is not used by the handler; it is kept so the
    # endpoint's dependency signature stays unchanged.
    client = await get_whatsapp_client()
    try:
        result = await client.send_text_message(
            to=request.to,
            text=request.text,
            preview_url=request.preview_url,
        )
    except Exception as exc:
        raise _bad_gateway(exc) from exc
    return _to_message_response(result)


@router.post("/template", response_model=MessageResponse, status_code=status.HTTP_201_CREATED)
async def send_template_message(
    request: SendTemplateMessageRequest,
    db: AsyncSession = Depends(get_db_session),
) -> MessageResponse:
    """Send a template message (for 24h+ window).

    Raises:
        HTTPException: 502 when the WhatsApp client call fails.
    """
    client = await get_whatsapp_client()
    try:
        result = await client.send_template_message(
            to=request.to,
            template_name=request.template_name,
            language_code=request.language_code,
        )
    except Exception as exc:
        raise _bad_gateway(exc) from exc
    return _to_message_response(result)


@router.post("/buttons", response_model=MessageResponse, status_code=status.HTTP_201_CREATED)
async def send_buttons(
    request: SendButtonsRequest,
    db: AsyncSession = Depends(get_db_session),
) -> MessageResponse:
    """Send an interactive button message.

    Raises:
        HTTPException: 502 when the WhatsApp client call fails.
    """
    client = await get_whatsapp_client()
    try:
        result = await client.send_interactive_buttons(
            to=request.to,
            body=request.body,
            buttons=request.buttons,
        )
    except Exception as exc:
        raise _bad_gateway(exc) from exc
    return _to_message_response(result)
# Extra FastAPI names needed by the endpoints below; they are not part of the
# module's original import list.
from fastapi import Query
from fastapi.responses import PlainTextResponse


@router.get("/whatsapp", response_class=PlainTextResponse)
async def verify_whatsapp_webhook(
    hub_mode: str | None = Query(None, alias="hub.mode"),
    hub_verify_token: str | None = Query(None, alias="hub.verify_token"),
    hub_challenge: str | None = Query(None, alias="hub.challenge"),
) -> str:
    """Verify webhook subscription with Meta.

    Meta sends a GET request with the query parameters ``hub.mode``,
    ``hub.verify_token`` and ``hub.challenge``. The dotted names need explicit
    aliases to bind to these arguments, and the challenge must be echoed back
    as plain text rather than as a JSON-quoted string.

    Raises:
        HTTPException: 403 when the mode or verify token is rejected.
    """
    if not WebhookVerifier.verify_subscription(hub_mode, hub_verify_token):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid verification token",
        )
    return hub_challenge or ""


@router.post("/whatsapp", status_code=status.HTTP_200_OK)
async def receive_whatsapp_webhook(
    request: Request,
    db: AsyncSession = Depends(get_db_session),
    x_hub_signature_256: str | None = Header(None),
    x_forwarded_for: str | None = Header(None),
) -> dict:
    """Receive incoming WhatsApp messages and status updates.

    In production the payload is handed to Celery so the endpoint can answer
    200 immediately; otherwise it is processed inline.

    Returns:
        A small status dict. Unparseable or non-message payloads are
        acknowledged with ``{"status": "ignored", ...}`` so Meta does not retry.

    Raises:
        HTTPException: 401 in production when the signature check fails.
    """
    body = await request.body()

    # Signature is only enforced in production (unchanged behaviour).
    if settings.is_production and not WebhookVerifier.verify_signature(
        body, x_hub_signature_256
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid signature",
        )

    try:
        payload = await request.json()
        webhook = parse_webhook_payload(payload)
    except ValidationError as exc:
        # Return 200 for non-message events to prevent Meta retries
        return {"status": "ignored", "reason": exc.message}
    except Exception:
        return {"status": "ignored", "reason": "invalid_payload"}

    # get_client_ip already reads X-Forwarded-For and keeps only the first
    # hop; using the raw header here would pass the whole proxy chain.
    # ``x_forwarded_for`` stays in the signature for compatibility.
    client_ip = get_client_ip(request)

    # Messages are handled before statuses so that a payload carrying both
    # does not silently drop the inbound messages.
    if webhook.has_messages:
        if settings.is_production:
            # Async processing via Celery for production
            process_whatsapp_message_task.delay(
                webhook.raw,
                client_ip=client_ip,
            )
        else:
            # Sync processing for development/testing
            await process_incoming_message(
                db=db,
                webhook_data=webhook.raw,
                client_ip=client_ip,
            )
        return {"status": "processed"}

    # Handle message statuses (delivered, read, etc.)
    if webhook.has_statuses:
        # TODO: Update message status in database
        return {"status": "acknowledged", "type": "status_update"}

    return {"status": "processed"}
Server + HOST: str = "0.0.0.0" + PORT: int = 8000 + + # Database + DATABASE_URL: PostgresDsn = PostgresDsn( + "postgresql+asyncpg://skeen:skeen123@localhost:5432/skeen_crm" + ) + DATABASE_POOL_SIZE: int = 20 + DATABASE_MAX_OVERFLOW: int = 10 + + # Redis + REDIS_URL: RedisDsn = RedisDsn("redis://localhost:6379/0") + + # Meta / WhatsApp Business API + META_API_VERSION: str = "v18.0" + META_ACCESS_TOKEN: SecretStr = SecretStr("") + META_PHONE_NUMBER_ID: str = "" + META_BUSINESS_ACCOUNT_ID: str = "" + META_WEBHOOK_VERIFY_TOKEN: SecretStr = SecretStr("") + META_APP_SECRET: SecretStr = SecretStr("") + + # OpenAI + OPENAI_API_KEY: SecretStr = SecretStr("") + OPENAI_MODEL: str = "gpt-4o" + OPENAI_EMBEDDING_MODEL: str = "text-embedding-3-small" + OPENAI_TEMPERATURE: float = 0.3 + OPENAI_MAX_TOKENS: int = 1500 + + # RAG + VECTOR_DIMENSION: int = 1536 + RAG_TOP_K: int = 5 + RAG_SIMILARITY_THRESHOLD: float = 0.75 + + # ERPNext + ERPNEXT_BASE_URL: str = "" + ERPNEXT_API_KEY: SecretStr = SecretStr("") + ERPNEXT_API_SECRET: SecretStr = SecretStr("") + ERPNEXT_VERIFY_SSL: bool = True + + # Celery + CELERY_BROKER_URL: RedisDsn = RedisDsn("redis://localhost:6379/1") + CELERY_RESULT_BACKEND: RedisDsn = RedisDsn("redis://localhost:6379/2") + CELERY_WORKER_CONCURRENCY: int = 4 + + # Monitoring + ENABLE_METRICS: bool = True + SENTRY_DSN: str = "" + + @property + def database_url_str(self) -> str: + """Return database URL as plain string for SQLAlchemy.""" + return str(self.DATABASE_URL) + + @property + def meta_api_base_url(self) -> str: + """Return Meta Graph API base URL.""" + return f"https://graph.facebook.com/{self.META_API_VERSION}" + + @property + def is_production(self) -> bool: + """Check if running in production environment.""" + return self.APP_ENV.lower() == "production" + + +@lru_cache +def get_settings() -> Settings: + """Return cached settings instance.""" + return Settings() + + +settings = get_settings() diff --git a/src/core/__init__.py b/src/core/__init__.py new file 
class MessageRole(str, Enum):
    """Conversation message roles.

    Members are ``str`` subclasses, so they compare equal to their raw values.
    """

    # NOTE(review): values look like the chat-completion role names — confirm.
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    TOOL = "tool"


class ConversationStatus(str, Enum):
    """Status of a patient conversation."""

    ACTIVE = "active"
    PAUSED = "paused"
    RESOLVED = "resolved"
    ESCALATED = "escalated"
    APPOINTMENT_CONFIRMED = "appointment_confirmed"


class AppointmentType(str, Enum):
    """Types of medical appointments (values are Spanish identifiers)."""

    PRIMERA_VEZ = "primera_vez"  # first visit
    SUBSECUENTE = "subsecuente"  # follow-up visit
    PAQUETE = "paquete"  # package
    VALORACION = "valoracion"  # assessment

class PaymentMethod(str, Enum):
    """Supported payment methods (values are Spanish identifiers)."""

    EFECTIVO_MN = "efectivo_mn"  # cash; "mn" presumably "moneda nacional" (MXN) — confirm
    EFECTIVO_USD = "efectivo_usd"  # cash in USD
    TARJETA = "tarjeta"  # card
    TRANSFERENCIA = "transferencia"  # bank transfer
    MONEDERO = "monedero"  # electronic wallet


class WhatsAppMessageType(str, Enum):
    """WhatsApp message types from Meta webhook."""

    TEXT = "text"
    IMAGE = "image"
    AUDIO = "audio"
    VIDEO = "video"
    DOCUMENT = "document"
    LOCATION = "location"
    CONTACTS = "contacts"
    INTERACTIVE = "interactive"
    BUTTON_REPLY = "button_reply"
    LIST_REPLY = "list_reply"
    UNKNOWN = "unknown"  # presumably the fallback for unrecognised types — confirm in the parser


class WhatsAppMessageStatus(str, Enum):
    """WhatsApp message delivery statuses."""

    SENT = "sent"
    DELIVERED = "delivered"
    READ = "read"
    FAILED = "failed"
+- Horario de atención: Lunes a Sábado 9:00 - 18:00, Domingos 10:00 - 14:00. +- Teléfono: (664) 123-4567 + +TU ROL: +1. Atiende a pacientes y prospectos por WhatsApp de manera cálida, profesional y eficiente. +2. Agenda citas verificando disponibilidad con el CRM. +3. Responde preguntas sobre servicios, precios, paquetes y promociones usando el catálogo (RAG). +4. Procesa pagos y consulta saldo de monedero electrónico. +5. Escala a un humano cuando el paciente lo solicite o el caso sea complejo. + +REGLAS CRÍTICAS: +- SIEMPRE saluda por el nombre del paciente si lo conoces. +- NUNCA inventes precios ni disponibilidad. Consulta el RAG o el CRM. +- SIEMPRE confirma los detalles de la cita (fecha, hora, sucursal, servicio, doctor) antes de agendar. +- NUNCA compartas información médica de otros pacientes. +- Usa emojis con moderación para mantener un tono cálido pero profesional. +- El idioma principal es español (México). + +FORMATO DE RESPUESTA: +- Sé conciso pero completo (máximo 3 párrafos cortos para WhatsApp). +- Usa viñetas cuando listes opciones. +- Incluye llamados a la acción claros. 
+""" + +# Webhook +WHATSAPP_WEBHOOK_SUBSCRIBE_MODE = "subscribe" diff --git a/src/core/exceptions.py b/src/core/exceptions.py new file mode 100644 index 0000000..284b97a --- /dev/null +++ b/src/core/exceptions.py @@ -0,0 +1,66 @@ +"""Custom application exceptions.""" + + +class SkeenException(Exception): + """Base exception for SKEEN application.""" + + def __init__(self, message: str, status_code: int = 500, details: dict | None = None) -> None: + self.message = message + self.status_code = status_code + self.details = details or {} + super().__init__(self.message) + + +class WhatsAppAPIError(SkeenException): + """Error communicating with Meta WhatsApp Business API.""" + + def __init__(self, message: str, status_code: int = 502, details: dict | None = None) -> None: + super().__init__(message, status_code, details) + + +class OpenAIError(SkeenException): + """Error communicating with OpenAI API.""" + + def __init__(self, message: str, status_code: int = 502, details: dict | None = None) -> None: + super().__init__(message, status_code, details) + + +class ERPNextError(SkeenException): + """Error communicating with ERPNext API.""" + + def __init__(self, message: str, status_code: int = 502, details: dict | None = None) -> None: + super().__init__(message, status_code, details) + + +class ConversationNotFoundError(SkeenException): + """Requested conversation does not exist.""" + + def __init__(self, conversation_id: str) -> None: + super().__init__( + f"Conversation {conversation_id} not found", + status_code=404, + ) + + +class PatientNotFoundError(SkeenException): + """Requested patient does not exist.""" + + def __init__(self, patient_id: str) -> None: + super().__init__( + f"Patient {patient_id} not found", + status_code=404, + ) + + +class ValidationError(SkeenException): + """Input validation error.""" + + def __init__(self, message: str, details: dict | None = None) -> None: + super().__init__(message, status_code=422, details=details) + + +class 
class Conversation(Base):
    """A WhatsApp conversation with a patient.

    Mapped to the ``conversations`` table. Timestamps are timezone-aware;
    ``created_at`` and ``updated_at`` are filled by the database via ``now()``.
    """

    __tablename__ = "conversations"

    # 36-character string key with no default, so the caller must supply it
    # (presumably a UUID string — confirm against the creating code).
    id: Mapped[str] = mapped_column(String(36), primary_key=True)
    # Indexed for lookup by phone number.
    phone_number: Mapped[str] = mapped_column(String(20), index=True, nullable=False)
    # Optional, indexed patient reference; NOTE(review): likely the ERPNext
    # patient identifier — confirm.
    patient_id: Mapped[str | None] = mapped_column(String(100), index=True, nullable=True)
    patient_name: Mapped[str | None] = mapped_column(String(255), nullable=True)
    # Stored as a database enum; new rows default to ACTIVE.
    status: Mapped[ConversationStatus] = mapped_column(
        Enum(ConversationStatus),
        default=ConversationStatus.ACTIVE,
        nullable=False,
    )
    # Free-form JSON state; ``default=dict`` gives each row its own empty dict.
    context: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict)
    # Not set automatically by this model; the writer must maintain it.
    last_message_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    # Refreshed with now() on every ORM UPDATE via ``onupdate``.
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
    )
class Message(Base):
    """Individual message within a conversation.

    Mapped to the ``messages`` table. ``conversation_id`` is an indexed plain
    string column (no foreign-key constraint is declared here).
    """

    __tablename__ = "messages"

    # 36-character string key with no default; the caller supplies it.
    id: Mapped[str] = mapped_column(String(36), primary_key=True)
    conversation_id: Mapped[str] = mapped_column(
        String(36),
        index=True,
        nullable=False,
    )
    direction: Mapped[str] = mapped_column(
        String(10),
        nullable=False,
    )  # 'inbound' or 'outbound'
    role: Mapped[str] = mapped_column(String(20), nullable=False)
    message_type: Mapped[str] = mapped_column(String(50), default="text")
    content: Mapped[str] = mapped_column(Text, nullable=False)
    whatsapp_message_id: Mapped[str | None] = mapped_column(
        String(100),
        nullable=True,
    )
    tool_calls: Mapped[list[dict[str, Any]] | None] = mapped_column(JSON, nullable=True)
    tool_results: Mapped[list[dict[str, Any]] | None] = mapped_column(JSON, nullable=True)
    tokens_used: Mapped[int | None] = mapped_column(default=0)
    # ``metadata`` is a reserved attribute name in the SQLAlchemy Declarative
    # API (it holds the MetaData collection); declaring it raises
    # InvalidRequestError when the class is defined. The Python attribute is
    # therefore renamed while the database column keeps the name "metadata".
    message_metadata: Mapped[dict[str, Any]] = mapped_column("metadata", JSON, default=dict)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
logger = structlog.get_logger(__name__)


class OpenAIClient:
    """Async OpenAI client with retries, structured logging and error translation.

    Retries live on private helpers that let the raw ``openai`` exceptions
    propagate, so tenacity can actually see them. The public methods translate
    whatever remains after the retries into ``OpenAIError``.
    """

    def __init__(self) -> None:
        self.client = AsyncOpenAI(
            api_key=settings.OPENAI_API_KEY.get_secret_value(),
            max_retries=0,  # Retries are handled by tenacity on the helpers below
        )
        self.model = settings.OPENAI_MODEL
        self.embedding_model = settings.OPENAI_EMBEDDING_MODEL
        self.temperature = settings.OPENAI_TEMPERATURE
        self.max_tokens = settings.OPENAI_MAX_TOKENS

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((openai.RateLimitError, openai.APITimeoutError)),
        reraise=True,
    )
    async def _request_chat_completion(self, **request: Any) -> Any:
        """Send one chat-completion request; retried on rate limits and timeouts."""
        return await self.client.chat.completions.create(**request)

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=5),
        retry=retry_if_exception_type((openai.RateLimitError,)),
        reraise=True,
    )
    async def _request_embeddings(self, inputs: Any) -> Any:
        """Send one embeddings request; retried on rate limits."""
        return await self.client.embeddings.create(
            model=self.embedding_model,
            input=inputs,
            encoding_format="float",
        )

    async def chat_completion(
        self,
        messages: list[dict[str, str]],
        tools: list[dict[str, Any]] | None = None,
        tool_choice: str | dict[str, Any] = "auto",
        temperature: float | None = None,
        max_tokens: int | None = None,
    ) -> dict[str, Any]:
        """Create a chat completion with optional function calling.

        Args:
            messages: Chat messages in OpenAI format.
            tools: Tool definitions; when empty or None no tool fields are sent.
            tool_choice: Tool selection mode, only sent together with ``tools``.
            temperature: Sampling temperature; ``None`` uses the configured
                default (an explicit ``0.0`` is honoured).
            max_tokens: Completion token cap; falsy values use the default.

        Returns:
            Dict with ``content``, ``role``, ``tool_calls`` (list or None),
            ``finish_reason`` and a ``usage`` token breakdown.

        Raises:
            OpenAIError: 429 on rate limiting, 401 on authentication failure,
                502 on any other OpenAI API error (after retries).
        """
        request: dict[str, Any] = {
            "model": self.model,
            "messages": messages,
            "temperature": self.temperature if temperature is None else temperature,
            "max_tokens": max_tokens or self.max_tokens,
        }
        if tools:
            # Omit the keys entirely when there are no tools instead of
            # sending explicit nulls.
            request["tools"] = tools
            request["tool_choice"] = tool_choice

        try:
            response = await self._request_chat_completion(**request)
        except openai.RateLimitError as exc:
            logger.error("openai_rate_limited", error=str(exc))
            raise OpenAIError("Rate limited by OpenAI", status_code=429) from exc
        except openai.AuthenticationError as exc:
            logger.error("openai_auth_error", error=str(exc))
            raise OpenAIError("OpenAI authentication failed", status_code=401) from exc
        except openai.APIError as exc:
            logger.error("openai_api_error", error=str(exc), code=getattr(exc, "code", None))
            raise OpenAIError(f"OpenAI API error: {exc}", status_code=502) from exc

        choice = response.choices[0]
        message = choice.message
        usage = response.usage
        result: dict[str, Any] = {
            "content": message.content,
            "role": message.role,
            "tool_calls": None,
            "finish_reason": choice.finish_reason,
            "usage": {
                "prompt_tokens": usage.prompt_tokens if usage else 0,
                "completion_tokens": usage.completion_tokens if usage else 0,
                "total_tokens": usage.total_tokens if usage else 0,
            },
        }

        if message.tool_calls:
            result["tool_calls"] = [
                {
                    "id": tc.id,
                    "type": tc.type,
                    "function": {
                        "name": tc.function.name,
                        "arguments": tc.function.arguments,
                    },
                }
                for tc in message.tool_calls
            ]

        logger.info(
            "openai_chat_completion",
            model=self.model,
            finish_reason=result["finish_reason"],
            tokens=result["usage"]["total_tokens"],
        )
        return result

    async def create_embedding(self, text: str) -> list[float]:
        """Create embedding vector for text.

        Raises:
            OpenAIError: 502 when the embeddings request fails (after retries).
        """
        try:
            response = await self._request_embeddings(text)
        except openai.APIError as exc:
            logger.error("openai_embedding_error", error=str(exc))
            raise OpenAIError(f"Embedding failed: {exc}", status_code=502) from exc

        embedding = response.data[0].embedding
        logger.info(
            "openai_embedding_created",
            model=self.embedding_model,
            dimensions=len(embedding),
        )
        return embedding

    async def create_embeddings_batch(self, texts: list[str]) -> list[list[float]]:
        """Create embeddings for multiple texts.

        Returns an empty list for empty input without calling the API.

        Raises:
            OpenAIError: 502 when the embeddings request fails (after retries).
        """
        if not texts:
            return []
        try:
            response = await self._request_embeddings(texts)
        except openai.APIError as exc:
            logger.error("openai_embedding_error", error=str(exc))
            raise OpenAIError(f"Embedding failed: {exc}", status_code=502) from exc
        return [d.embedding for d in response.data]


# Global singleton
_openai_client: OpenAIClient | None = None


async def get_openai_client() -> OpenAIClient:
    """Return OpenAI client singleton, creating it on first use."""
    global _openai_client
    if _openai_client is None:
        _openai_client = OpenAIClient()
    return _openai_client
diff --git a/src/infrastructure/ai/prompts.py b/src/infrastructure/ai/prompts.py new file mode 100644 index 0000000..7b6eaee --- /dev/null +++ b/src/infrastructure/ai/prompts.py @@ -0,0 +1,202 @@ +"""Prompt templates and tool definitions for the SKEEN AI Agent.""" + +from src.core.constants import SKEEN_SYSTEM_PROMPT + +# ============================================================================= +# TOOL DEFINITIONS (OpenAI Function Calling) +# ============================================================================= + +TOOLS = [ + { + "type": "function", + "function": { + "name": "search_catalog", + "description": "Busca servicios, productos o paquetes en el catálogo de SKEEN. Usa esta herramienta cuando el paciente pregunte por tratamientos, precios, disponibilidad o promociones.", + "parameters": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "Términos de búsqueda del paciente (ej: 'depilación láser bikini', 'toxina botulínica precio')", + }, + "category": { + "type": "string", + "enum": ["servicio", "producto", "paquete", "general"], + "description": "Categoría a filtrar, si se menciona explícitamente", + }, + }, + "required": ["query"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "check_availability", + "description": "Consulta la disponibilidad de citas en una fecha, sucursal o con un doctor específico.", + "parameters": { + "type": "object", + "properties": { + "date": { + "type": "string", + "description": "Fecha en formato ISO 8601 (YYYY-MM-DD). Si no se especifica, usa la fecha de mañana.", + }, + "branch": { + "type": "string", + "description": "Nombre de la sucursal: 'Rosarito' o 'Tijuana'. 
Si no se especifica, busca en ambas.", + }, + "doctor": { + "type": "string", + "description": "Nombre del doctor o 'cualquiera'.", + }, + "service": { + "type": "string", + "description": "Nombre del servicio que desea agendar.", + }, + }, + "required": ["date"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "create_appointment", + "description": "Crea una cita médica en el sistema. SOLO usar después de confirmar fecha, hora, sucursal, servicio y doctor con el paciente.", + "parameters": { + "type": "object", + "properties": { + "patient_phone": { + "type": "string", + "description": "Número de WhatsApp del paciente (con lada, ej: 5216641234567).", + }, + "patient_name": { + "type": "string", + "description": "Nombre completo del paciente.", + }, + "date": { + "type": "string", + "description": "Fecha de la cita (YYYY-MM-DD).", + }, + "time": { + "type": "string", + "description": "Hora de la cita en formato 24h (HH:MM).", + }, + "service": { + "type": "string", + "description": "Nombre del servicio a agendar.", + }, + "branch": { + "type": "string", + "description": "Sucursal: 'Rosarito' o 'Tijuana'.", + }, + "doctor": { + "type": "string", + "description": "Nombre del doctor asignado.", + }, + "notes": { + "type": "string", + "description": "Notas adicionales para la cita.", + }, + }, + "required": ["patient_phone", "patient_name", "date", "time", "service", "branch"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "get_patient_info", + "description": "Consulta la información de un paciente existente: historial de citas, saldo de monedero, adeudos, paquetes activos.", + "parameters": { + "type": "object", + "properties": { + "phone": { + "type": "string", + "description": "Número de WhatsApp del paciente.", + }, + }, + "required": ["phone"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "get_wallet_balance", + "description": "Consulta el saldo del monedero electrónico de un paciente.", + "parameters": 
{ + "type": "object", + "properties": { + "phone": { + "type": "string", + "description": "Número de WhatsApp del paciente.", + }, + }, + "required": ["phone"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "escalate_to_human", + "description": "Escalar la conversación a un agente humano cuando el paciente lo solicite o el caso sea muy complejo/emocional.", + "parameters": { + "type": "object", + "properties": { + "reason": { + "type": "string", + "description": "Motivo de la escalación.", + }, + }, + "required": ["reason"], + }, + }, + }, +] + +# ============================================================================= +# PROMPT TEMPLATES +# ============================================================================= + +APPOINTMENT_CONFIRMATION_PROMPT = """El paciente ha confirmado los siguientes datos para su cita: + +- Nombre: {patient_name} +- Fecha: {date} +- Hora: {time} +- Servicio: {service} +- Sucursal: {branch} +- Doctor: {doctor} + +Genera un mensaje de confirmación cálido y profesional para WhatsApp que: +1. Agradezca la preferencia. +2. Confirme todos los detalles en formato claro. +3. Indique política de cancelación (24h de anticipación). +4. Incluya dirección de la sucursal. +5. Sea corto (máximo 4 párrafos).""" + +NO_AVAILABILITY_PROMPT = """No hay disponibilidad para la fecha/hora/sucursal solicitada. + +Sugerencias actuales: +{alternatives} + +Genera un mensaje amable ofreciendo las alternativas disponibles. Mantén tono profesional y empático.""" + +WALLET_BALANCE_PROMPT = """Saldo de monedero del paciente: +- Saldo actual: ${balance_mxn} MXN +- Puntos acumulados: {points} + +Genera un resumen amigable del saldo y sugiere cómo puede usarlo (aplicar a su próxima cita o paquete).""" + +NEW_PATIENT_ONBOARDING_PROMPT = """Este es un nuevo prospecto. Bienvenida/da a SKEEN. + +Información capturada: +- Nombre: {name} +- Interés: {interest} + +Genera un mensaje de bienvenida cálido que: +1. Presente brevemente a SKEEN. +2. 
class RAGStore:
    """Vector store for SKEEN catalog and knowledge base using pgvector.

    All statements run on the session passed to the constructor; the write
    methods commit that session themselves.
    """

    def __init__(self, session: AsyncSession) -> None:
        self.session = session

    @staticmethod
    def _vector_literal(embedding: list[float]) -> str:
        """Render an embedding as the ``[v1,v2,...]`` text form cast to ``vector``."""
        return f"[{','.join(str(x) for x in embedding)}]"

    async def ensure_extension(self) -> None:
        """Ensure pgvector extension is installed."""
        await self.session.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
        await self.session.commit()

    async def search(
        self,
        query: str,
        top_k: int | None = None,
        similarity_threshold: float | None = None,
        category: str | None = None,
    ) -> list[dict[str, Any]]:
        """Search knowledge base using semantic similarity.

        Args:
            query: User query text.
            top_k: Number of results (default from settings).
            similarity_threshold: Minimum similarity score (default from
                settings; an explicit ``0.0`` is honoured).
            category: Filter by category (e.g., 'servicio', 'producto', 'faq').

        Returns:
            List of matching chunks with content, metadata, and similarity score.
        """
        # ``is None`` rather than ``or`` so explicit zero values are respected.
        if top_k is None:
            top_k = settings.RAG_TOP_K
        threshold = (
            settings.RAG_SIMILARITY_THRESHOLD
            if similarity_threshold is None
            else similarity_threshold
        )

        # Generate embedding for query
        openai_client = await get_openai_client()
        embedding = await openai_client.create_embedding(query)

        # Only a fixed SQL fragment is interpolated; every value is bound.
        # CAST(... AS vector) is used because a bind name directly followed
        # by ``::`` is not reliably recognised by text().
        category_filter = "AND category = :category" if category else ""
        sql = f"""
            SELECT
                id,
                content,
                metadata,
                category,
                source,
                1 - (embedding <=> CAST(:embedding_vec AS vector)) AS similarity
            FROM knowledge_chunks
            WHERE 1 - (embedding <=> CAST(:embedding_vec AS vector)) >= :threshold
            {category_filter}
            ORDER BY embedding <=> CAST(:embedding_vec AS vector)
            LIMIT :top_k
        """

        params: dict[str, Any] = {
            "embedding_vec": self._vector_literal(embedding),
            "threshold": threshold,
            "top_k": top_k,
        }
        if category:
            params["category"] = category

        result = await self.session.execute(text(sql), params)
        documents = [
            {
                "id": row["id"],
                "content": row["content"],
                "metadata": row["metadata"],
                "category": row["category"],
                "source": row["source"],
                "similarity": float(row["similarity"]),
            }
            for row in result.mappings().all()
        ]

        logger.info(
            "rag_search_completed",
            query=query[:50],
            results=len(documents),
            category=category,
        )
        return documents

    async def add_document(
        self,
        content: str,
        metadata: dict[str, Any] | None = None,
        category: str = "general",
        source: str = "",
        doc_id: str | None = None,
    ) -> str:
        """Add a document chunk to the knowledge base.

        Returns:
            The id of the inserted row (``doc_id`` when given, otherwise the
            id generated by the database), or ``""`` if no row is returned.
        """
        # Local import: this module never imported ``json`` at the top, so the
        # original call raised NameError.
        import json

        openai_client = await get_openai_client()
        embedding = await openai_client.create_embedding(content)

        sql = """
            INSERT INTO knowledge_chunks (id, content, metadata, category, source, embedding)
            VALUES (
                COALESCE(:doc_id, gen_random_uuid()::text),
                :content,
                :metadata,
                :category,
                :source,
                CAST(:embedding_vec AS vector)
            )
            RETURNING id
        """

        result = await self.session.execute(
            text(sql),
            {
                "doc_id": doc_id,
                "content": content,
                "metadata": json.dumps(metadata or {}),
                "category": category,
                "source": source,
                "embedding_vec": self._vector_literal(embedding),
            },
        )
        # Read the RETURNING row before committing.
        row = result.mappings().first()
        await self.session.commit()
        inserted_id = row["id"] if row else ""

        logger.info(
            "rag_document_added",
            doc_id=inserted_id,
            category=category,
            source=source,
        )
        return inserted_id

    async def delete_by_source(self, source: str) -> int:
        """Delete all chunks from a specific source and return how many were removed."""
        result = await self.session.execute(
            text("DELETE FROM knowledge_chunks WHERE source = :source"),
            {"source": source},
        )
        await self.session.commit()
        deleted = result.rowcount or 0
        logger.info("rag_documents_deleted", source=source, count=deleted)
        return deleted


# SQL to create table (run via Alembic or init script)
# NOTE(review): the vector width is hard-coded to 1536 here while the
# application also has settings.VECTOR_DIMENSION — keep the two in sync.
CREATE_KNOWLEDGE_TABLE_SQL = """
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE IF NOT EXISTS knowledge_chunks (
    id TEXT PRIMARY KEY DEFAULT gen_random_uuid()::text,
    content TEXT NOT NULL,
    metadata JSONB DEFAULT '{}',
    category TEXT DEFAULT 'general',
    source TEXT DEFAULT '',
    embedding VECTOR(1536),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_knowledge_embedding
ON knowledge_chunks USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

CREATE INDEX IF NOT EXISTS idx_knowledge_category ON knowledge_chunks(category);
CREATE INDEX IF NOT EXISTS idx_knowledge_source ON knowledge_chunks(source);
"""
import json

from tenacity import retry_if_exception


def _is_transient_erpnext_error(exc: BaseException) -> bool:
    """Return True for failures worth retrying: network errors, HTTP 429 and 5xx."""
    if isinstance(exc, httpx.NetworkError):
        return True
    if isinstance(exc, httpx.HTTPStatusError):
        code = exc.response.status_code
        # 4xx responses (bad filter, missing document, bad credentials) will
        # not succeed on a retry; only throttling and server errors can.
        return code == 429 or code >= 500
    return False


class ERPNextClient:
    """Async client for the ERPNext / Frappe REST API.

    Authenticates with a ``token key:secret`` header built from settings and
    keeps one shared ``httpx.AsyncClient``; call :meth:`close` on shutdown.
    """

    def __init__(self) -> None:
        self.base_url = settings.ERPNEXT_BASE_URL.rstrip("/")
        self.api_key = settings.ERPNEXT_API_KEY.get_secret_value()
        self.api_secret = settings.ERPNEXT_API_SECRET.get_secret_value()
        self.verify_ssl = settings.ERPNEXT_VERIFY_SSL

        self.headers = {
            "Authorization": f"token {self.api_key}:{self.api_secret}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        self.client = httpx.AsyncClient(
            base_url=self.base_url,
            headers=self.headers,
            timeout=httpx.Timeout(30.0, connect=10.0),
            verify=self.verify_ssl,
        )

    async def close(self) -> None:
        """Close the underlying HTTP client."""
        await self.client.aclose()

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception(_is_transient_erpnext_error),
        reraise=True,
    )
    async def get_document(
        self,
        doctype: str,
        name: str,
        fields: list[str] | None = None,
    ) -> dict[str, Any]:
        """Fetch a single document.

        Args:
            doctype: Frappe doctype, e.g. ``"Patient"``.
            name: Document name (primary key).
            fields: Optional list of field names to return.

        Returns:
            The ``data`` object of the response, or ``{}`` if absent.

        Raises:
            httpx.HTTPStatusError: On a non-2xx response (after retries for
                transient errors).
        """
        params: dict[str, Any] = {}
        if fields:
            # Must be JSON: str(list) yields single-quoted Python repr.
            params["fields"] = json.dumps(fields)

        response = await self.client.get(
            f"/api/resource/{doctype}/{name}",
            params=params,
        )
        response.raise_for_status()
        return response.json().get("data", {})

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception(_is_transient_erpnext_error),
        reraise=True,
    )
    async def get_list(
        self,
        doctype: str,
        filters: list[list[Any]] | None = None,
        fields: list[str] | None = None,
        or_filters: list[list[Any]] | None = None,
        limit: int = 20,
        limit_start: int = 0,
        order_by: str = "modified desc",
    ) -> list[dict[str, Any]]:
        """Query a list of documents.

        Args:
            doctype: Frappe doctype to list.
            filters: AND-ed conditions as ``[field, operator, value]`` triples.
            fields: Field names to return for each document.
            or_filters: OR-ed conditions, same shape as ``filters``.
            limit: Page size.
            limit_start: Offset of the first row.
            order_by: Sort expression.

        Returns:
            The ``data`` list of the response, or ``[]`` if absent.

        Raises:
            httpx.HTTPStatusError: On a non-2xx response (after retries for
                transient errors).
        """
        params: dict[str, Any] = {
            "limit_page_length": limit,
            "limit_start": limit_start,
            "order_by": order_by,
        }
        # Structured parameters are sent JSON-encoded; str() would emit a
        # Python repr that is not valid JSON.
        if filters:
            params["filters"] = json.dumps(filters)
        if or_filters:
            params["or_filters"] = json.dumps(or_filters)
        if fields:
            params["fields"] = json.dumps(fields)

        response = await self.client.get(
            f"/api/resource/{doctype}",
            params=params,
        )
        response.raise_for_status()
        return response.json().get("data", [])

    async def create_document(
        self,
        doctype: str,
        data: dict[str, Any],
    ) -> dict[str, Any]:
        """Create a document and return its ``data`` object.

        Raises:
            ERPNextError: If ERPNext answers with a non-2xx status.
        """
        try:
            response = await self.client.post(
                f"/api/resource/{doctype}",
                json=data,
            )
            response.raise_for_status()
            result = response.json()
            logger.info(
                "erpnext_document_created",
                doctype=doctype,
                name=result.get("data", {}).get("name"),
            )
            return result.get("data", {})
        except httpx.HTTPStatusError as exc:
            logger.error(
                "erpnext_create_failed",
                doctype=doctype,
                status=exc.response.status_code,
                response=exc.response.text,
            )
            raise ERPNextError(
                f"Failed to create {doctype}: {exc.response.text}",
                status_code=exc.response.status_code,
            ) from exc

    async def update_document(
        self,
        doctype: str,
        name: str,
        data: dict[str, Any],
    ) -> dict[str, Any]:
        """Update an existing document and return its ``data`` object.

        Raises:
            ERPNextError: If ERPNext answers with a non-2xx status.
        """
        try:
            response = await self.client.put(
                f"/api/resource/{doctype}/{name}",
                json=data,
            )
            response.raise_for_status()
            result = response.json()
            logger.info("erpnext_document_updated", doctype=doctype, name=name)
            return result.get("data", {})
        except httpx.HTTPStatusError as exc:
            # Log like create_document does so failed writes are traceable.
            logger.error(
                "erpnext_update_failed",
                doctype=doctype,
                name=name,
                status=exc.response.status_code,
                response=exc.response.text,
            )
            raise ERPNextError(
                f"Failed to update {doctype}: {exc.response.text}",
                status_code=exc.response.status_code,
            ) from exc

    async def call_method(
        self,
        method: str,
        data: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Call a whitelisted Frappe method and return the raw JSON response.

        Raises:
            ERPNextError: If ERPNext answers with a non-2xx status.
        """
        try:
            response = await self.client.post(
                f"/api/method/{method}",
                json=data or {},
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as exc:
            raise ERPNextError(
                f"Method {method} failed: {exc.response.text}",
                status_code=exc.response.status_code,
            ) from exc

    # -------------------------------------------------------------------------
    # Convenience methods for Healthcare
    # -------------------------------------------------------------------------

    async def find_patient_by_phone(self, phone: str) -> dict[str, Any] | None:
        """Return the first Patient whose ``mobile`` equals *phone*, or ``None``."""
        patients = await self.get_list(
            "Patient",
            filters=[["mobile", "=", phone]],
            fields=["name", "patient_name", "mobile", "sex", "dob", "blood_group"],
            limit=1,
        )
        return patients[0] if patients else None

    async def get_appointments(
        self,
        patient: str | None = None,
        date: str | None = None,
        status: str | None = None,
    ) -> list[dict[str, Any]]:
        """List up to 50 Patient Appointments matching the given criteria.

        Each argument that is provided adds an equality filter; with no
        arguments the list is unfiltered.
        """
        criteria = (
            ("patient", patient),
            ("appointment_date", date),
            ("status", status),
        )
        filters: list[list[Any]] = [
            [field, "=", value] for field, value in criteria if value
        ]

        return await self.get_list(
            "Patient Appointment",
            filters=filters if filters else None,
            fields=[
                "name", "patient", "practitioner", "appointment_date",
                "appointment_time", "duration", "status", "department",
            ],
            limit=50,
        )

    async def create_appointment(
        self,
        patient: str,
        practitioner: str,
        appointment_date: str,
        appointment_time: str,
        duration: int = 30,
        department: str = "Dermatología Estética",
        notes: str = "",
    ) -> dict[str, Any]:
        """Create a Patient Appointment with status ``Scheduled``.

        Raises:
            ERPNextError: If ERPNext rejects the document.
        """
        data = {
            "doctype": "Patient Appointment",
            "patient": patient,
            "practitioner": practitioner,
            "appointment_date": appointment_date,
            "appointment_time": appointment_time,
            "duration": duration,
            "department": department,
            "notes": notes,
            "status": "Scheduled",
        }
        return await self.create_document("Patient Appointment", data)
class RedisCache:
    """Small convenience wrapper: JSON values, counters and key housekeeping."""

    def __init__(self, redis: Redis) -> None:
        self.redis = redis

    async def get(self, key: str) -> dict | None:
        """Return the JSON-decoded value stored at *key*, or ``None`` if absent."""
        raw = await self.redis.get(key)
        return None if raw is None else orjson.loads(raw)

    async def set(
        self,
        key: str,
        value: dict,
        ttl: int = 3600,
    ) -> None:
        """Store *value* as JSON under *key*, expiring after *ttl* seconds."""
        encoded = orjson.dumps(value)
        await self.redis.setex(key, ttl, encoded)

    async def delete(self, key: str) -> None:
        """Remove *key*."""
        await self.redis.delete(key)

    async def exists(self, key: str) -> bool:
        """Return whether *key* is present."""
        found = await self.redis.exists(key)
        return found > 0

    async def increment(self, key: str, amount: int = 1) -> int:
        """Add *amount* to the counter at *key* and return the new value."""
        new_value = await self.redis.incrby(key, amount)
        return new_value

    async def expire(self, key: str, ttl: int) -> None:
        """Set *key* to expire after *ttl* seconds."""
        await self.redis.expire(key, ttl)
str, + text: str, + preview_url: bool = False, + ) -> dict[str, Any]: + """Send a text message to a WhatsApp user. + + Args: + to: Recipient phone number in international format (e.g., 5216641234567). + text: Message body (max 4096 chars). + preview_url: Whether to show URL preview. + + Returns: + API response with message ID. + """ + if len(text) > 4096: + text = text[:4093] + "..." + + payload = { + "messaging_product": "whatsapp", + "recipient_type": "individual", + "to": to, + "type": "text", + "text": {"preview_url": preview_url, "body": text}, + } + + endpoint = f"/{self.phone_number_id}/messages" + try: + result = await self._post(endpoint, payload) + logger.info( + "whatsapp_text_sent", + to=to, + message_id=result.get("messages", [{}])[0].get("id"), + ) + return result + except httpx.HTTPStatusError as exc: + logger.error( + "whatsapp_text_failed", + to=to, + status_code=exc.response.status_code, + response=exc.response.text, + ) + raise WhatsAppAPIError( + f"Failed to send WhatsApp message: {exc.response.text}", + status_code=exc.response.status_code, + ) from exc + + async def send_template_message( + self, + to: str, + template_name: str, + language_code: str = "es_MX", + components: list[dict[str, Any]] | None = None, + ) -> dict[str, Any]: + """Send a template message (required for 24h+ inactive conversations).""" + payload: dict[str, Any] = { + "messaging_product": "whatsapp", + "recipient_type": "individual", + "to": to, + "type": "template", + "template": { + "name": template_name, + "language": {"code": language_code}, + }, + } + if components: + payload["template"]["components"] = components + + endpoint = f"/{self.phone_number_id}/messages" + try: + result = await self._post(endpoint, payload) + logger.info( + "whatsapp_template_sent", + to=to, + template=template_name, + message_id=result.get("messages", [{}])[0].get("id"), + ) + return result + except httpx.HTTPStatusError as exc: + raise WhatsAppAPIError( + f"Failed to send template: 
{exc.response.text}", + status_code=exc.response.status_code, + ) from exc + + async def send_interactive_buttons( + self, + to: str, + body: str, + buttons: list[dict[str, str]], + ) -> dict[str, Any]: + """Send interactive button message. + + Args: + buttons: List of {"id": "btn_1", "title": "Option 1"} (max 3). + """ + if len(buttons) > 3: + raise ValueError("Maximum 3 buttons allowed") + + payload = { + "messaging_product": "whatsapp", + "recipient_type": "individual", + "to": to, + "type": "interactive", + "interactive": { + "type": "button", + "body": {"text": body}, + "action": { + "buttons": [ + {"type": "reply", "reply": {"id": b["id"], "title": b["title"]}} + for b in buttons + ] + }, + }, + } + + endpoint = f"/{self.phone_number_id}/messages" + try: + result = await self._post(endpoint, payload) + logger.info("whatsapp_buttons_sent", to=to) + return result + except httpx.HTTPStatusError as exc: + raise WhatsAppAPIError( + f"Failed to send buttons: {exc.response.text}", + status_code=exc.response.status_code, + ) from exc + + async def send_interactive_list( + self, + to: str, + body: str, + button_text: str, + sections: list[dict[str, Any]], + ) -> dict[str, Any]: + """Send interactive list message (e.g., service selection).""" + payload = { + "messaging_product": "whatsapp", + "recipient_type": "individual", + "to": to, + "type": "interactive", + "interactive": { + "type": "list", + "body": {"text": body}, + "action": { + "button": button_text, + "sections": sections, + }, + }, + } + + endpoint = f"/{self.phone_number_id}/messages" + try: + result = await self._post(endpoint, payload) + logger.info("whatsapp_list_sent", to=to) + return result + except httpx.HTTPStatusError as exc: + raise WhatsAppAPIError( + f"Failed to send list: {exc.response.text}", + status_code=exc.response.status_code, + ) from exc + + async def mark_as_read(self, message_id: str) -> dict[str, Any]: + """Mark a message as read.""" + payload = { + "messaging_product": "whatsapp", + 
def _first_dict(items: Any) -> dict[str, Any]:
    """Return the first element of *items* when it is a non-empty list headed by a dict, else ``{}``."""
    if isinstance(items, list) and items and isinstance(items[0], dict):
        return items[0]
    return {}


class WhatsAppWebhookPayload:
    """Read-only view over one WhatsApp webhook delivery.

    Only the first entry, first change and first message are exposed. Missing
    or empty ``entry`` / ``changes`` / ``messages`` lists yield empty values
    instead of raising, so malformed bodies can be rejected by the caller.
    """

    def __init__(self, raw: dict[str, Any]) -> None:
        self.raw = raw
        self._entry = _first_dict(raw.get("entry"))
        self._change = _first_dict(self._entry.get("changes"))
        value = self._change.get("value")
        self._value = value if isinstance(value, dict) else {}
        self._message = _first_dict(self._value.get("messages"))

    @property
    def object_type(self) -> str:
        """Top-level ``object`` field of the payload."""
        return self.raw.get("object", "")

    @property
    def business_phone_number_id(self) -> str:
        """``metadata.phone_number_id`` of the receiving business number."""
        return self._value.get("metadata", {}).get("phone_number_id", "")

    @property
    def display_phone_number(self) -> str:
        """``metadata.display_phone_number`` of the receiving business number."""
        return self._value.get("metadata", {}).get("display_phone_number", "")

    @property
    def has_messages(self) -> bool:
        """Whether the change carries at least one message."""
        return bool(self._value.get("messages"))

    @property
    def has_statuses(self) -> bool:
        """Whether the change carries at least one status update."""
        return bool(self._value.get("statuses"))

    # Message properties
    @property
    def message_id(self) -> str:
        return self._message.get("id", "")

    @property
    def from_number(self) -> str:
        return self._message.get("from", "")

    @property
    def timestamp(self) -> str:
        return self._message.get("timestamp", "")

    @property
    def message_type(self) -> WhatsAppMessageType:
        """Message type as an enum; unrecognised values map to ``UNKNOWN``."""
        msg_type = self._message.get("type", "")
        try:
            return WhatsAppMessageType(msg_type)
        except ValueError:
            return WhatsAppMessageType.UNKNOWN

    @property
    def text_body(self) -> str:
        """Body of a text message, or ``""`` for any other type."""
        if self.message_type == WhatsAppMessageType.TEXT:
            return self._message.get("text", {}).get("body", "")
        return ""

    def _interactive_reply_field(self, field: str) -> str:
        """Return *field* of the button or list reply of an interactive message."""
        if self.message_type != WhatsAppMessageType.INTERACTIVE:
            return ""
        interactive = self._message.get("interactive", {})
        # Button replies take precedence over list replies.
        for reply_key in ("button_reply", "list_reply"):
            if reply_key in interactive:
                return interactive[reply_key].get(field, "")
        return ""

    @property
    def button_payload(self) -> str:
        """Id of the selected button or list row, or ``""``."""
        return self._interactive_reply_field("id")

    @property
    def button_title(self) -> str:
        """Title of the selected button or list row, or ``""``."""
        return self._interactive_reply_field("title")

    @property
    def image_data(self) -> dict[str, Any]:
        if self.message_type == WhatsAppMessageType.IMAGE:
            return self._message.get("image", {})
        return {}

    @property
    def audio_data(self) -> dict[str, Any]:
        if self.message_type == WhatsAppMessageType.AUDIO:
            return self._message.get("audio", {})
        return {}

    @property
    def context(self) -> dict[str, Any]:
        """Context of a reply (original message ID)."""
        return self._message.get("context", {})

    @property
    def is_reply(self) -> bool:
        return bool(self.context.get("id"))

    # Status properties
    @property
    def statuses(self) -> list[dict[str, Any]]:
        return self._value.get("statuses", [])

    def __repr__(self) -> str:
        # Only the last four digits of the sender are shown to keep phone
        # numbers out of logs and tracebacks.
        return (
            f"WhatsAppWebhookPayload(from=...{self.from_number[-4:]!s}, "
            f"type={self._message.get('type', '')!r}, "
            f"message_id={self.message_id!r})"
        )
def setup_logging() -> None:
    """Configure structlog to emit JSON log lines through the stdlib logger."""
    # Processors run in order; the JSON renderer must stay last.
    processor_chain = [
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer(),
    ]
    options = {
        "processors": processor_chain,
        "context_class": dict,
        "logger_factory": structlog.stdlib.LoggerFactory(),
        "wrapper_class": structlog.stdlib.BoundLogger,
        "cache_logger_on_first_use": True,
    }
    structlog.configure(**options)
def create_app() -> FastAPI:
    """Build and configure the FastAPI application.

    Registers CORS and correlation-ID middleware, a timing middleware that
    assigns a request ID, JSON exception handlers, the v1 routers and, when
    enabled, the Prometheus metrics endpoint.

    Returns:
        The configured application instance.
    """
    app = FastAPI(
        title=settings.APP_NAME,
        description="SKEEN CRM AI Agent for WhatsApp Business API + ERPNext",
        version="0.1.0",
        docs_url="/docs" if not settings.is_production else None,
        redoc_url="/redoc" if not settings.is_production else None,
        openapi_url="/openapi.json" if not settings.is_production else None,
        lifespan=lifespan,
    )

    # Middleware
    # NOTE(review): wildcard origins combined with allow_credentials=True is
    # very permissive; restrict the origin list before production use.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],  # TODO: Restrict in production
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    app.add_middleware(CorrelationIdMiddleware)

    def _request_id_for(request: Request) -> str:
        """Return the ID assigned by the timing middleware, or a new one."""
        return getattr(request.state, "request_id", None) or str(uuid.uuid4())

    # Request timing middleware
    @app.middleware("http")
    async def add_process_time_header(request: Request, call_next):
        # perf_counter is monotonic, so durations are immune to clock changes.
        start_time = time.perf_counter()
        request_id = str(uuid.uuid4())
        # Expose the ID to exception handlers so the ID returned to the
        # client matches the one attached to the log lines.
        request.state.request_id = request_id
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            request_id=request_id,
            path=request.url.path,
            method=request.method,
        )

        try:
            response = await call_next(request)
        except Exception as exc:
            logger.error("request_failed", error=str(exc))
            raise

        process_time = time.perf_counter() - start_time
        response.headers["X-Process-Time"] = str(process_time)
        response.headers["X-Request-ID"] = request_id

        logger.info(
            "request_completed",
            status_code=response.status_code,
            duration_ms=round(process_time * 1000, 2),
        )
        return response

    # Exception handlers
    @app.exception_handler(SkeenException)
    async def skeen_exception_handler(request: Request, exc: SkeenException):
        logger.error(
            "application_exception",
            error=exc.message,
            status_code=exc.status_code,
            details=exc.details,
        )
        return JSONResponse(
            status_code=exc.status_code,
            content={
                "error": exc.message,
                "details": exc.details,
                "request_id": _request_id_for(request),
            },
        )

    @app.exception_handler(Exception)
    async def generic_exception_handler(request: Request, exc: Exception):
        logger.exception("unhandled_exception", error=str(exc))
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={
                "error": "Internal server error",
                "request_id": _request_id_for(request),
            },
        )

    # Routes
    app.include_router(health_router, prefix="/api/v1")
    app.include_router(webhooks_router, prefix="/api/v1")
    app.include_router(messages_router, prefix="/api/v1")

    # Metrics endpoint (Prometheus)
    if settings.ENABLE_METRICS:
        metrics_app = make_asgi_app()
        app.mount("/metrics", metrics_app)

    return app
class ToolExecutor:
    """Executes tool calls requested by the LLM.

    Each call is dispatched by function name to a private handler. Failures,
    including malformed arguments, are reported back to the model as an
    ``{"error": ...}`` tool result instead of being raised.
    """

    def __init__(self, session: AsyncSession) -> None:
        self.session = session
        self.rag = RAGStore(session)
        self.erpnext = None  # Lazy init

    @staticmethod
    def _parse_arguments(raw: Any) -> dict[str, Any]:
        """Decode the JSON argument string of a tool call.

        Empty or missing arguments are treated as ``{}``.

        Raises:
            ValueError: If *raw* is not valid JSON or is not a JSON object.
        """
        if raw is None or raw == "":
            return {}
        if isinstance(raw, dict):
            return raw
        parsed = json.loads(raw)
        if not isinstance(parsed, dict):
            raise ValueError("Tool arguments must be a JSON object")
        return parsed

    async def execute(self, tool_call: dict[str, Any]) -> dict[str, Any]:
        """Execute a single tool call and return a ``role="tool"`` message.

        Args:
            tool_call: Tool call with ``id`` and ``function`` (``name`` and
                JSON-encoded ``arguments``).

        Returns:
            Dict with ``tool_call_id``, ``role``, ``name`` and JSON ``content``.
        """
        function = tool_call["function"]
        name = function["name"]
        tool_call_id = tool_call["id"]

        handlers = {
            "search_catalog": self._search_catalog,
            "check_availability": self._check_availability,
            "create_appointment": self._create_appointment,
            "get_patient_info": self._get_patient_info,
            "get_wallet_balance": self._get_wallet_balance,
            "escalate_to_human": self._escalate_to_human,
        }

        try:
            # Parsed inside the try: the model can emit malformed JSON, which
            # must become a tool error rather than abort message processing.
            arguments = self._parse_arguments(function.get("arguments"))
            logger.info("executing_tool", tool=name, args=arguments)

            handler = handlers.get(name)
            if handler is None:
                result = {"error": f"Unknown tool: {name}"}
            else:
                result = await handler(arguments)
        except Exception as exc:
            logger.error("tool_execution_failed", tool=name, error=str(exc))
            result = {"error": f"Failed to execute {name}: {str(exc)}"}

        return {
            "tool_call_id": tool_call_id,
            "role": "tool",
            "name": name,
            "content": json.dumps(result, ensure_ascii=False),
        }

    async def _search_catalog(self, args: dict[str, Any]) -> dict[str, Any]:
        """Search the knowledge base and return the top five matches."""
        query = args.get("query", "")
        category = args.get("category")
        results = await self.rag.search(query, category=category, top_k=5)
        return {
            "results": [
                {
                    "content": r["content"],
                    "category": r["category"],
                    "source": r["source"],
                    "similarity": round(r["similarity"], 3),
                }
                for r in results
            ]
        }

    async def _check_availability(self, args: dict[str, Any]) -> dict[str, Any]:
        """Return simulated availability for the requested date."""
        date = args.get("date")
        branch = args.get("branch")
        service = args.get("service")

        # TODO: Integrate with ERPNext Healthcare scheduling
        # For now, return mock data structure
        return {
            "date": date,
            "available_slots": [
                {"time": "10:00", "doctor": "Dr. Ramos"},
                {"time": "11:30", "doctor": "Dr. Martínez"},
                {"time": "15:00", "doctor": "Dr. Ramos"},
            ],
            "branch": branch or "Rosarito",
            "service": service,
            "note": "Esta es una respuesta simulada. Integrar con ERPNext Healthcare.",
        }

    async def _create_appointment(self, args: dict[str, Any]) -> dict[str, Any]:
        """Return a simulated appointment confirmation echoing *args*."""
        # TODO: Integrate with ERPNext to create real appointments
        return {
            "status": "simulated",
            "appointment_id": f"APT-{uuid.uuid4().hex[:8].upper()}",
            "details": args,
            "note": "Cita simulada. Integrar con ERPNext Patient Appointment.",
        }

    async def _get_patient_info(self, args: dict[str, Any]) -> dict[str, Any]:
        """Look up a patient by phone in ERPNext and summarise appointments."""
        phone = args.get("phone", "")
        erpnext = await get_erpnext_client()
        patient = await erpnext.find_patient_by_phone(phone)

        if not patient:
            return {"found": False, "message": "No se encontró paciente con ese número."}

        appointments = await erpnext.get_appointments(patient=patient.get("name"))
        return {
            "found": True,
            "name": patient.get("patient_name"),
            "sex": patient.get("sex"),
            "blood_group": patient.get("blood_group"),
            "total_appointments": len(appointments),
            "last_appointments": appointments[:3],
        }

    async def _get_wallet_balance(self, args: dict[str, Any]) -> dict[str, Any]:
        """Return a placeholder wallet balance."""
        # TODO: Integrate with ERPNext custom Wallet doctype
        return {
            "balance_mxn": 0.0,
            "points": 0,
            "note": "Monedero no implementado en ERPNext aún.",
        }

    async def _escalate_to_human(self, args: dict[str, Any]) -> dict[str, Any]:
        """Return the escalation acknowledgement shown to the patient."""
        reason = args.get("reason", "Solicitud del paciente")
        return {
            "escalated": True,
            "reason": reason,
            "message": "Un agente humano de SKEEN se pondrá en contacto contigo pronto. ⏳",
        }
Execute tools if needed + 6. Send response + 7. Persist everything + """ + webhook = WhatsAppWebhookPayload(webhook_data) + + if not webhook.has_messages: + return {"status": "no_messages"} + + phone = webhook.from_number + message_text = webhook.text_body or "[Mensaje no textual]" + message_type = webhook.message_type.value + + logger.info( + "incoming_message_received", + from_number=phone[-4:], # Log last 4 digits only for privacy + message_type=message_type, + ip=client_ip, + ) + + # Get or create conversation + conversation = await get_or_create_conversation(db, phone) + + # Try to find patient in ERPNext for personalization + patient_name = None + try: + erpnext = await get_erpnext_client() + patient = await erpnext.find_patient_by_phone(phone) + if patient: + patient_name = patient.get("patient_name") + conversation.patient_id = patient.get("name") + conversation.patient_name = patient_name + except Exception as exc: + logger.warning("patient_lookup_failed", error=str(exc)) + + # Save inbound message + inbound_msg = Message( + id=str(uuid.uuid4()), + conversation_id=conversation.id, + direction="inbound", + role="user", + message_type=message_type, + content=message_text, + whatsapp_message_id=webhook.message_id, + metadata={"ip": client_ip, "timestamp": webhook.timestamp}, + ) + db.add(inbound_msg) + + # Build context for OpenAI + system_prompt = SKEEN_SYSTEM_PROMPT + if patient_name: + system_prompt += f"\n\nPACIENTE ACTUAL: {patient_name}. Salúdalo/a por su nombre cuando sea apropiado." 
+ + messages: list[dict[str, str]] = [ + {"role": "system", "content": system_prompt}, + ] + + # Add conversation history + history = await get_conversation_history(db, conversation.id) + messages.extend(history) + + # Add current message + messages.append({"role": "user", "content": message_text}) + + # Call OpenAI + openai_client = await get_openai_client() + llm_response = await openai_client.chat_completion( + messages=messages, + tools=TOOLS, + ) + + assistant_message = llm_response["content"] or "" + tool_calls = llm_response.get("tool_calls") + tool_results = [] + + # Execute tools if requested + if tool_calls: + executor = ToolExecutor(db) + for tc in tool_calls: + result = await executor.execute(tc) + tool_results.append(result) + + # Second LLM call with tool results + messages.append({ + "role": "assistant", + "content": assistant_message, + "tool_calls": tool_calls, + }) + for tr in tool_results: + messages.append({ + "role": "tool", + "tool_call_id": tr["tool_call_id"], + "name": tr["name"], + "content": tr["content"], + }) + + final_response = await openai_client.chat_completion( + messages=messages, + tools=TOOLS, + ) + assistant_message = final_response["content"] or assistant_message + + # Mark as read (best effort) + try: + wa_client = await get_whatsapp_client() + await wa_client.mark_as_read(webhook.message_id) + except Exception: + pass + + # Send response + if assistant_message: + try: + wa_client = await get_whatsapp_client() + send_result = await wa_client.send_text_message(phone, assistant_message) + response_message_id = send_result.get("messages", [{}])[0].get("id") + except Exception as exc: + logger.error("failed_to_send_response", error=str(exc)) + response_message_id = None + else: + response_message_id = None + + # Save outbound message + outbound_msg = Message( + id=str(uuid.uuid4()), + conversation_id=conversation.id, + direction="outbound", + role="assistant", + message_type="text", + content=assistant_message, + 
os.environ.setdefault("CELERY_CONFIG_MODULE", "src.workers.celery_app")

# Application instance shared by the workers and by code that enqueues tasks.
# `include` makes the worker import the task module on startup.
celery_app = Celery(
    "skeen_crm",
    broker=str(settings.CELERY_BROKER_URL),
    backend=str(settings.CELERY_RESULT_BACKEND),
    include=["src.workers.tasks"],
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="America/Tijuana",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,
    worker_prefetch_multiplier=1,
    worker_concurrency=settings.CELERY_WORKER_CONCURRENCY,
    # Route keys must equal the registered task names (module path + function
    # name). The ERPNext task is defined as `sync_erpnext_patient_task`; a
    # key that does not match falls through to `task_default_queue`.
    task_routes={
        "src.workers.tasks.process_whatsapp_message_task": {"queue": "whatsapp"},
        "src.workers.tasks.sync_erpnext_patient_task": {"queue": "erpnext"},
        "src.workers.tasks.generate_embedding_task": {"queue": "ai"},
    },
    task_default_queue="default",
)


@setup_logging.connect
def config_loggers(*args, **kwargs) -> None:
    """Configure structlog for Celery workers.

    Connected to Celery's ``setup_logging`` signal. Sets up a structlog
    processor chain that ends in JSON rendering, and configures the stdlib
    root logger at INFO level with a message-only format.
    """
    import logging
    import structlog

    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.UnicodeDecoder(),
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )
    # structlog already renders the full JSON event, so stdlib only needs to
    # print the message itself.
    logging.basicConfig(format="%(message)s", level=logging.INFO)
@celery_app.task(bind=True, max_retries=3, default_retry_delay=10)
def process_whatsapp_message_task(self, webhook_data: dict, client_ip: str = "unknown") -> dict:
    """Process WhatsApp message asynchronously.

    This task runs in a Celery worker and handles the full AI pipeline
    so the webhook endpoint can return 200 immediately to Meta.

    Args:
        webhook_data: Raw webhook payload forwarded by the API endpoint.
        client_ip: Caller IP forwarded to the use case.

    Returns:
        The dict returned by ``process_incoming_message``.

    Raises:
        celery.exceptions.Retry: On any failure, until ``max_retries`` is
            exhausted (then the original exception propagates).
    """
    import asyncio

    async def _process() -> dict:
        async with AsyncSessionLocal() as db:
            return await process_incoming_message(
                db=db,
                webhook_data=webhook_data,
                client_ip=client_ip,
            )

    # NOTE(review): asyncio.run() creates a fresh event loop per task. If the
    # engine behind AsyncSessionLocal pools connections, they may end up bound
    # to a closed loop — confirm the pool configuration in the db module.
    # NOTE(review): a retry re-runs the whole pipeline; if the failure happened
    # after the WhatsApp reply was sent, the user may get a duplicate reply.
    try:
        return asyncio.run(_process())
    except Exception as exc:
        # Handled at the synchronous boundary so session-creation errors are
        # covered too; exc_info keeps the traceback in the structured log.
        logger.error(
            "whatsapp_task_failed",
            error=str(exc),
            attempt=self.request.retries,
            exc_info=True,
        )
        raise self.retry(exc=exc)


@celery_app.task
def sync_erpnext_patient_task(patient_data: dict) -> dict:
    """Sync patient data to ERPNext in background.

    Currently a stub: it only logs and reports the patient name back.

    Args:
        patient_data: Patient fields; only ``name`` is read.

    Returns:
        ``{"status": "synced", "patient": <name>}``.
    """
    patient_name = patient_data.get("name")
    logger.info("syncing_patient_to_erpnext", patient=patient_name)
    # TODO: Implement ERPNext sync logic
    return {"status": "synced", "patient": patient_name}


@celery_app.task
def generate_embedding_task(document_id: str, content: str, category: str) -> dict:
    """Generate embedding for a document chunk asynchronously.

    Args:
        document_id: Identifier passed to the store as ``doc_id``.
        content: Text to embed and store.
        category: Category passed to the store.

    Returns:
        ``{"document_id": ..., "embedding_dims": <length of the embedding>}``.
    """
    import asyncio

    async def _generate() -> dict:
        # Imported lazily, as in the original; AsyncSessionLocal is already
        # imported at module level and needs no local re-import.
        from src.infrastructure.ai.openai_client import get_openai_client
        from src.infrastructure.ai.rag import RAGStore

        async with AsyncSessionLocal() as db:
            client = await get_openai_client()
            # NOTE(review): this embedding is only used for the reported
            # dimension count and is not passed to add_document — confirm
            # whether the store computes its own embedding (duplicate API
            # call) and whether it commits the session.
            embedding = await client.create_embedding(content)

            store = RAGStore(db)
            await store.add_document(
                content=content,
                category=category,
                doc_id=document_id,
            )
            return {"document_id": document_id, "embedding_dims": len(embedding)}

    return asyncio.run(_generate())