diff --git a/services/api-gateway/app/main.py b/services/api-gateway/app/main.py index 196fb72..b15f56f 100644 --- a/services/api-gateway/app/main.py +++ b/services/api-gateway/app/main.py @@ -2,7 +2,7 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from app.core.config import get_settings from app.core.database import engine, Base -from app.routers import auth, whatsapp +from app.routers import auth, whatsapp, flows settings = get_settings() @@ -28,6 +28,7 @@ app.add_middleware( # Routers app.include_router(auth.router) app.include_router(whatsapp.router) +app.include_router(flows.router) @app.get("/health") diff --git a/services/api-gateway/app/routers/__init__.py b/services/api-gateway/app/routers/__init__.py index 1f2148e..b1d4d82 100644 --- a/services/api-gateway/app/routers/__init__.py +++ b/services/api-gateway/app/routers/__init__.py @@ -1,4 +1,5 @@ from app.routers.auth import router as auth_router from app.routers.whatsapp import router as whatsapp_router +from app.routers.flows import router as flows_router -__all__ = ["auth_router", "whatsapp_router"] +__all__ = ["auth_router", "whatsapp_router", "flows_router"] diff --git a/services/api-gateway/app/routers/flows.py b/services/api-gateway/app/routers/flows.py new file mode 100644 index 0000000..7311d1c --- /dev/null +++ b/services/api-gateway/app/routers/flows.py @@ -0,0 +1,131 @@ +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.orm import Session +from typing import List +from uuid import UUID +from app.core.database import get_db +from app.core.security import get_current_user +from app.models.user import User, UserRole +from app.models.flow import Flow, TriggerType +from app.schemas.flow import FlowCreate, FlowUpdate, FlowResponse, FlowListResponse + +router = APIRouter(prefix="/api/flows", tags=["flows"]) + + +def require_admin(current_user: User = Depends(get_current_user)): + if current_user.role != UserRole.ADMIN: + raise HTTPException(status_code=403, detail="Admin required") + return current_user + + +@router.get("", response_model=List[FlowListResponse]) +def list_flows( + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + flows = db.query(Flow).order_by(Flow.updated_at.desc()).all() + return flows + + +@router.post("", response_model=FlowResponse) +def create_flow( + request: FlowCreate, + db: Session = Depends(get_db), + current_user: User = Depends(require_admin), +): + flow = Flow( + name=request.name, + description=request.description, + trigger_type=request.trigger_type, + trigger_value=request.trigger_value, + nodes=[], + edges=[], + variables={}, + ) + db.add(flow) + db.commit() + db.refresh(flow) + return flow + + +@router.get("/{flow_id}", response_model=FlowResponse) +def get_flow( + flow_id: UUID, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_user), +): + flow = db.query(Flow).filter(Flow.id == flow_id).first() + if not flow: + raise HTTPException(status_code=404, detail="Flow not found") + return flow + + +@router.put("/{flow_id}", response_model=FlowResponse) +def update_flow( + flow_id: UUID, + request: FlowUpdate, + db: Session = Depends(get_db), + current_user: User = Depends(require_admin), +): + flow = db.query(Flow).filter(Flow.id == flow_id).first() + if not flow: + raise HTTPException(status_code=404, detail="Flow not found") + + update_data = request.model_dump(exclude_unset=True) + for field, value in update_data.items(): + setattr(flow, field, value) + + flow.version += 1 + db.commit() + db.refresh(flow) + return flow + + +@router.delete("/{flow_id}") +def delete_flow( + flow_id: UUID, + db: Session = Depends(get_db), + current_user: User = Depends(require_admin), +): + flow = db.query(Flow).filter(Flow.id == flow_id).first() + if not flow: + raise HTTPException(status_code=404, detail="Flow not found") + + db.delete(flow) + db.commit() + return {"success": True} + + +@router.post("/{flow_id}/activate") +def activate_flow( + flow_id: UUID, + db: Session = Depends(get_db), + current_user: User = Depends(require_admin), +): + flow = db.query(Flow).filter(Flow.id == flow_id).first() + if not flow: + raise HTTPException(status_code=404, detail="Flow not found") + + if flow.trigger_type in [TriggerType.WELCOME, TriggerType.FALLBACK]: + db.query(Flow).filter( + Flow.trigger_type == flow.trigger_type, + Flow.id != flow_id + ).update({"is_active": False}) + + flow.is_active = True + db.commit() + return {"success": True} + + +@router.post("/{flow_id}/deactivate") +def deactivate_flow( + flow_id: UUID, + db: Session = Depends(get_db), + current_user: User = Depends(require_admin), +): + flow = db.query(Flow).filter(Flow.id == flow_id).first() + if not flow: + raise HTTPException(status_code=404, detail="Flow not found") + + flow.is_active = False + db.commit() + return {"success": True} diff --git a/services/flow-engine/app/main.py b/services/flow-engine/app/main.py new file mode 100644 index 0000000..1ad4d47 --- /dev/null +++ b/services/flow-engine/app/main.py @@ -0,0 +1,49 @@ +from fastapi import FastAPI, Depends +from pydantic import BaseModel +from typing import Optional +from sqlalchemy.orm import Session +from app.models import get_db +from app.engine import FlowEngine +from app.config import get_settings + +settings = get_settings() + +app = FastAPI( + title="WhatsApp Centralizado - Flow Engine", + version="1.0.0", +) + + +class ProcessMessageRequest(BaseModel): + conversation_id: str + contact: dict + conversation: dict + message: dict + + +class ProcessMessageResponse(BaseModel): + handled: bool + flow_id: Optional[str] = None + + +@app.get("/health") +def health_check(): + return {"status": "ok", "service": "flow-engine"} + + +@app.post("/process", response_model=ProcessMessageResponse) +async def process_message( + request: ProcessMessageRequest, + db: Session = Depends(get_db), +): + """Process an incoming message through the flow engine""" + engine = FlowEngine(db) + + handled = await engine.process_message( + conversation_id=request.conversation_id, + contact=request.contact, + conversation=request.conversation, + message=request.message, + ) + + return ProcessMessageResponse(handled=handled) diff --git a/services/flow-engine/app/models.py b/services/flow-engine/app/models.py new file mode 100644 index 0000000..72feb10 --- /dev/null +++ b/services/flow-engine/app/models.py @@ -0,0 +1,58 @@ +from sqlalchemy import Column, String, Boolean, DateTime, Text, Integer, Enum as SQLEnum, create_engine +from sqlalchemy.dialects.postgresql import UUID, JSONB +from sqlalchemy.orm import sessionmaker, declarative_base +import enum +import uuid +from datetime import datetime +from app.config import get_settings + +settings = get_settings() +engine = create_engine(settings.DATABASE_URL, pool_pre_ping=True) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) +Base = declarative_base() + + +class TriggerType(str, enum.Enum): + WELCOME = "welcome" + KEYWORD = "keyword" + FALLBACK = "fallback" + EVENT = "event" + MANUAL = "manual" + + +class Flow(Base): + __tablename__ = "flows" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + name = Column(String(100), nullable=False) + description = Column(Text, nullable=True) + trigger_type = Column(SQLEnum(TriggerType), nullable=False) + trigger_value = Column(String(255), nullable=True) + nodes = Column(JSONB, default=list) + edges = Column(JSONB, default=list) + variables = Column(JSONB, default=dict) + is_active = Column(Boolean, default=False, nullable=False) + version = Column(Integer, default=1, nullable=False) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, nullable=False) + + +class FlowSession(Base): + __tablename__ = "flow_sessions" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + conversation_id = Column(UUID(as_uuid=True), nullable=False, index=True) + flow_id = Column(UUID(as_uuid=True), nullable=False) + current_node_id = Column(String(100), nullable=True) + variables = Column(JSONB, default=dict) + waiting_for_input = Column(Boolean, default=False) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, nullable=False) + + +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close()