Files
Consultoría AS 29520a00f6 feat: Add publish and mark-published endpoints with validation
- Add /api/posts/{id}/publish endpoint for API-based publishing
- Add /api/posts/{id}/mark-published endpoint for manual workflow
- Add content length validation before publishing
- Update modal with "Ya lo publiqué" and "Publicar (API)" buttons
- Fix retry_count handling for None values

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 23:01:28 +00:00

304 lines
9.7 KiB
Python

"""
API Routes para gestión de Posts.
"""
from datetime import datetime
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from pydantic import BaseModel
from app.core.database import get_db
from app.models.post import Post
from app.services.ai.platform_adapter import platform_adapter
router = APIRouter()
# ===========================================
# SCHEMAS
# ===========================================
class PostCreate(BaseModel):
content: str
content_type: str
platforms: List[str]
scheduled_at: Optional[datetime] = None
image_url: Optional[str] = None
approval_required: bool = False
hashtags: Optional[List[str]] = None
class Config:
from_attributes = True
class PostUpdate(BaseModel):
content: Optional[str] = None
content_x: Optional[str] = None
content_threads: Optional[str] = None
content_instagram: Optional[str] = None
content_facebook: Optional[str] = None
platforms: Optional[List[str]] = None
scheduled_at: Optional[datetime] = None
status: Optional[str] = None
image_url: Optional[str] = None
hashtags: Optional[List[str]] = None
class Config:
from_attributes = True
class PostResponse(BaseModel):
id: int
content: str
content_type: str
platforms: List[str]
status: str
scheduled_at: Optional[datetime]
published_at: Optional[datetime]
image_url: Optional[str]
approval_required: bool
created_at: datetime
class Config:
from_attributes = True
# ===========================================
# ENDPOINTS
# ===========================================
@router.get("/", response_model=List[PostResponse])
async def list_posts(
status: Optional[str] = Query(None, description="Filtrar por estado"),
content_type: Optional[str] = Query(None, description="Filtrar por tipo"),
platform: Optional[str] = Query(None, description="Filtrar por plataforma"),
limit: int = Query(50, le=100),
offset: int = Query(0),
db: Session = Depends(get_db)
):
"""Listar posts con filtros opcionales."""
query = db.query(Post)
if status:
query = query.filter(Post.status == status)
if content_type:
query = query.filter(Post.content_type == content_type)
if platform:
query = query.filter(Post.platforms.contains([platform]))
posts = query.order_by(Post.created_at.desc()).offset(offset).limit(limit).all()
return posts
@router.get("/pending")
async def list_pending_posts(db: Session = Depends(get_db)):
"""Listar posts pendientes de aprobación."""
posts = db.query(Post).filter(
Post.status == "pending_approval"
).order_by(Post.scheduled_at.asc()).all()
return [post.to_dict() for post in posts]
@router.get("/scheduled")
async def list_scheduled_posts(db: Session = Depends(get_db)):
"""Listar posts programados."""
posts = db.query(Post).filter(
Post.status == "scheduled",
Post.scheduled_at >= datetime.utcnow()
).order_by(Post.scheduled_at.asc()).all()
return [post.to_dict() for post in posts]
@router.get("/{post_id}")
async def get_post(post_id: int, db: Session = Depends(get_db)):
"""Obtener un post por ID."""
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post no encontrado")
return post.to_dict()
@router.post("/", response_model=PostResponse)
async def create_post(post_data: PostCreate, db: Session = Depends(get_db)):
"""Crear un nuevo post con auto-adaptación de contenido por plataforma."""
# Adaptar contenido para cada plataforma
adapted_content = await platform_adapter.adapt_for_all_platforms_smart(
post_data.content,
post_data.platforms
)
post = Post(
content=post_data.content,
content_type=post_data.content_type,
platforms=post_data.platforms,
scheduled_at=post_data.scheduled_at,
image_url=post_data.image_url,
approval_required=post_data.approval_required,
hashtags=post_data.hashtags,
status="pending_approval" if post_data.approval_required else "scheduled",
# Contenido adaptado por plataforma
content_x=adapted_content.get("x"),
content_threads=adapted_content.get("threads"),
content_instagram=adapted_content.get("instagram"),
content_facebook=adapted_content.get("facebook"),
)
db.add(post)
db.commit()
db.refresh(post)
return post
@router.put("/{post_id}")
async def update_post(post_id: int, post_data: PostUpdate, db: Session = Depends(get_db)):
"""Actualizar un post con re-adaptación automática si cambia el contenido."""
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post no encontrado")
update_data = post_data.dict(exclude_unset=True)
# Si cambia el contenido principal, re-adaptar para todas las plataformas
if "content" in update_data:
platforms = update_data.get("platforms", post.platforms)
adapted_content = await platform_adapter.adapt_for_all_platforms_smart(
update_data["content"],
platforms
)
# Solo actualizar campos de plataforma si no vienen explícitos en la request
if "content_x" not in update_data and "x" in platforms:
update_data["content_x"] = adapted_content.get("x")
if "content_threads" not in update_data and "threads" in platforms:
update_data["content_threads"] = adapted_content.get("threads")
if "content_instagram" not in update_data and "instagram" in platforms:
update_data["content_instagram"] = adapted_content.get("instagram")
if "content_facebook" not in update_data and "facebook" in platforms:
update_data["content_facebook"] = adapted_content.get("facebook")
for field, value in update_data.items():
setattr(post, field, value)
post.updated_at = datetime.utcnow()
db.commit()
db.refresh(post)
return post.to_dict()
@router.post("/{post_id}/approve")
async def approve_post(post_id: int, db: Session = Depends(get_db)):
"""Aprobar un post pendiente."""
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post no encontrado")
if post.status != "pending_approval":
raise HTTPException(status_code=400, detail="El post no está pendiente de aprobación")
post.status = "scheduled"
post.approved_at = datetime.utcnow()
db.commit()
return {"message": "Post aprobado", "post_id": post_id}
@router.post("/{post_id}/reject")
async def reject_post(post_id: int, db: Session = Depends(get_db)):
"""Rechazar un post pendiente."""
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post no encontrado")
post.status = "cancelled"
db.commit()
return {"message": "Post rechazado", "post_id": post_id}
@router.post("/{post_id}/publish")
async def publish_post_now(post_id: int, db: Session = Depends(get_db)):
"""Publicar un post inmediatamente."""
from worker.tasks.publish_post import publish_to_platform
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post no encontrado")
if post.status not in ["draft", "pending_approval", "scheduled", "failed"]:
raise HTTPException(
status_code=400,
detail=f"No se puede publicar un post con status '{post.status}'"
)
# Cambiar estado a publishing
post.status = "publishing"
db.commit()
# Encolar tarea de publicación para cada plataforma
for platform in post.platforms:
publish_to_platform.delay(post.id, platform)
return {
"success": True,
"message": "Post enviado a publicación",
"post_id": post_id,
"platforms": post.platforms
}
@router.post("/{post_id}/mark-published")
async def mark_post_as_published(post_id: int, db: Session = Depends(get_db)):
"""Marcar un post como publicado manualmente (sin usar API)."""
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post no encontrado")
if post.status not in ["draft", "pending_approval", "scheduled", "failed"]:
raise HTTPException(
status_code=400,
detail=f"No se puede marcar un post con status '{post.status}'"
)
post.status = "published"
post.published_at = datetime.utcnow()
post.error_message = None
db.commit()
return {
"success": True,
"message": "Post marcado como publicado",
"post_id": post_id
}
@router.post("/{post_id}/regenerate")
async def regenerate_post(post_id: int, db: Session = Depends(get_db)):
"""Regenerar contenido de un post con IA."""
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post no encontrado")
# TODO: Implementar regeneración con DeepSeek
# from app.services.content_generator import regenerate_content
# new_content = await regenerate_content(post)
return {"message": "Regeneración en desarrollo", "post_id": post_id}
@router.delete("/{post_id}")
async def delete_post(post_id: int, db: Session = Depends(get_db)):
"""Eliminar un post."""
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post no encontrado")
db.delete(post)
db.commit()
return {"message": "Post eliminado", "post_id": post_id}