- Add /api/posts/{id}/publish endpoint for API-based publishing
- Add /api/posts/{id}/mark-published endpoint for manual workflow
- Add content length validation before publishing
- Update modal with "Ya lo publiqué" and "Publicar (API)" buttons
- Fix retry_count handling for None values
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
184 lines
5.3 KiB
Python
184 lines
5.3 KiB
Python
"""
|
|
Tareas de publicación de posts.
|
|
"""
|
|
|
|
import asyncio
|
|
from datetime import datetime
|
|
|
|
from worker.celery_app import celery_app
|
|
from app.core.database import SessionLocal
|
|
from app.models.post import Post
|
|
from app.publishers import get_publisher
|
|
|
|
|
|
def run_async(coro):
|
|
"""Helper para ejecutar coroutines en Celery."""
|
|
loop = asyncio.get_event_loop()
|
|
return loop.run_until_complete(coro)
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.publish_post.publish_scheduled_posts")
|
|
def publish_scheduled_posts():
|
|
"""
|
|
Publicar posts que están programados para ahora.
|
|
Se ejecuta cada minuto.
|
|
"""
|
|
db = SessionLocal()
|
|
|
|
try:
|
|
now = datetime.utcnow()
|
|
|
|
# Obtener posts listos para publicar
|
|
posts = db.query(Post).filter(
|
|
Post.status == "scheduled",
|
|
Post.scheduled_at <= now
|
|
).all()
|
|
|
|
results = []
|
|
|
|
for post in posts:
|
|
# Marcar como en proceso
|
|
post.status = "publishing"
|
|
db.commit()
|
|
|
|
# Publicar en cada plataforma
|
|
success_count = 0
|
|
platform_ids = {}
|
|
errors = []
|
|
|
|
for platform in post.platforms:
|
|
result = publish_to_platform.delay(post.id, platform)
|
|
# En producción, esto sería asíncrono
|
|
# Por ahora, ejecutamos secuencialmente
|
|
|
|
results.append(f"Post {post.id} enviado a publicación")
|
|
|
|
return f"Procesados {len(posts)} posts"
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@celery_app.task(
|
|
name="worker.tasks.publish_post.publish_to_platform",
|
|
bind=True,
|
|
max_retries=3,
|
|
default_retry_delay=60
|
|
)
|
|
def publish_to_platform(self, post_id: int, platform: str):
|
|
"""Publicar un post en una plataforma específica."""
|
|
import json
|
|
db = SessionLocal()
|
|
|
|
try:
|
|
post = db.query(Post).filter(Post.id == post_id).first()
|
|
if not post:
|
|
return f"Post {post_id} no encontrado"
|
|
|
|
# Obtener publisher
|
|
try:
|
|
publisher = get_publisher(platform)
|
|
except ValueError as e:
|
|
return f"Error: {e}"
|
|
|
|
# Verificar si es un hilo
|
|
if post.content_type == "hilo_educativo":
|
|
# Parsear posts del hilo desde content_x (JSON)
|
|
try:
|
|
thread_posts = json.loads(post.content_x) if post.content_x else []
|
|
if not thread_posts:
|
|
# Fallback: separar contenido por doble salto de línea
|
|
thread_posts = [p.strip() for p in post.content.split("\n\n") if p.strip()]
|
|
except json.JSONDecodeError:
|
|
thread_posts = [p.strip() for p in post.content.split("\n\n") if p.strip()]
|
|
|
|
# Publicar como hilo
|
|
result = run_async(
|
|
publisher.publish_thread(thread_posts)
|
|
)
|
|
else:
|
|
# Publicación normal
|
|
content = post.get_content_for_platform(platform)
|
|
|
|
# Validar longitud antes de publicar
|
|
if hasattr(publisher, 'char_limit') and len(content) > publisher.char_limit:
|
|
error_msg = (
|
|
f"Contenido excede límite: {len(content)}/{publisher.char_limit} "
|
|
f"caracteres (sobran {len(content) - publisher.char_limit})"
|
|
)
|
|
post.error_message = f"{platform}: {error_msg}"
|
|
post.status = "failed"
|
|
db.commit()
|
|
return f"Error en {platform}: {error_msg}"
|
|
|
|
result = run_async(
|
|
publisher.publish(content, post.image_url)
|
|
)
|
|
|
|
if result.success:
|
|
# Guardar ID del post en la plataforma
|
|
platform_ids = post.platform_post_ids or {}
|
|
platform_ids[platform] = result.post_id
|
|
post.platform_post_ids = platform_ids
|
|
|
|
# Verificar si todas las plataformas están publicadas
|
|
all_published = all(
|
|
p in platform_ids for p in post.platforms
|
|
)
|
|
|
|
if all_published:
|
|
post.status = "published"
|
|
post.published_at = datetime.utcnow()
|
|
|
|
db.commit()
|
|
|
|
return f"Publicado en {platform}: {result.post_id}"
|
|
|
|
else:
|
|
# Error en publicación
|
|
post.error_message = f"{platform}: {result.error_message}"
|
|
post.retry_count = (post.retry_count or 0) + 1
|
|
|
|
if post.retry_count >= 3:
|
|
post.status = "failed"
|
|
|
|
db.commit()
|
|
|
|
# Reintentar
|
|
raise self.retry(
|
|
exc=Exception(result.error_message),
|
|
countdown=60 * post.retry_count
|
|
)
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
raise
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.publish_post.publish_now")
|
|
def publish_now(post_id: int):
|
|
"""Publicar un post inmediatamente."""
|
|
db = SessionLocal()
|
|
|
|
try:
|
|
post = db.query(Post).filter(Post.id == post_id).first()
|
|
if not post:
|
|
return f"Post {post_id} no encontrado"
|
|
|
|
# Cambiar estado
|
|
post.status = "publishing"
|
|
post.scheduled_at = datetime.utcnow()
|
|
db.commit()
|
|
|
|
# Publicar en cada plataforma
|
|
for platform in post.platforms:
|
|
publish_to_platform.delay(post_id, platform)
|
|
|
|
return f"Post {post_id} enviado a publicación inmediata"
|
|
|
|
finally:
|
|
db.close()
|