feat(fase2): add Flow API routes and Flow Engine main app

API Gateway:
- Add flows router with CRUD endpoints
- Add activate/deactivate endpoints
- Auto-deactivate other flows for welcome/fallback triggers

Flow Engine:
- Add main.py with FastAPI app
- Add /process endpoint for message handling
- Add SQLAlchemy models (mirrors api-gateway)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Claude AI
2026-01-29 10:20:35 +00:00
parent 8824f65a62
commit 14a579d5ca
5 changed files with 242 additions and 2 deletions

View File

@@ -0,0 +1,49 @@
from fastapi import FastAPI, Depends
from pydantic import BaseModel
from typing import Optional
from sqlalchemy.orm import Session
from app.models import get_db
from app.engine import FlowEngine
from app.config import get_settings
settings = get_settings()
app = FastAPI(
title="WhatsApp Centralizado - Flow Engine",
version="1.0.0",
)
class ProcessMessageRequest(BaseModel):
conversation_id: str
contact: dict
conversation: dict
message: dict
class ProcessMessageResponse(BaseModel):
handled: bool
flow_id: Optional[str] = None
@app.get("/health")
def health_check():
return {"status": "ok", "service": "flow-engine"}
@app.post("/process", response_model=ProcessMessageResponse)
async def process_message(
request: ProcessMessageRequest,
db: Session = Depends(get_db),
):
"""Process an incoming message through the flow engine"""
engine = FlowEngine(db)
handled = await engine.process_message(
conversation_id=request.conversation_id,
contact=request.contact,
conversation=request.conversation,
message=request.message,
)
return ProcessMessageResponse(handled=handled)

View File

@@ -0,0 +1,58 @@
from sqlalchemy import Column, String, Boolean, DateTime, Text, Integer, Enum as SQLEnum, create_engine
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import sessionmaker, declarative_base
import enum
import uuid
from datetime import datetime
from app.config import get_settings
settings = get_settings()
engine = create_engine(settings.DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
class TriggerType(str, enum.Enum):
WELCOME = "welcome"
KEYWORD = "keyword"
FALLBACK = "fallback"
EVENT = "event"
MANUAL = "manual"
class Flow(Base):
__tablename__ = "flows"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String(100), nullable=False)
description = Column(Text, nullable=True)
trigger_type = Column(SQLEnum(TriggerType), nullable=False)
trigger_value = Column(String(255), nullable=True)
nodes = Column(JSONB, default=list)
edges = Column(JSONB, default=list)
variables = Column(JSONB, default=dict)
is_active = Column(Boolean, default=False, nullable=False)
version = Column(Integer, default=1, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, nullable=False)
class FlowSession(Base):
__tablename__ = "flow_sessions"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
conversation_id = Column(UUID(as_uuid=True), nullable=False, index=True)
flow_id = Column(UUID(as_uuid=True), nullable=False)
current_node_id = Column(String(100), nullable=True)
variables = Column(JSONB, default=dict)
waiting_for_input = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, nullable=False)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()