""" Tareas de publicación de posts. """ import asyncio from datetime import datetime from worker.celery_app import celery_app from app.core.database import SessionLocal from app.models.post import Post from app.publishers import get_publisher def run_async(coro): """Helper para ejecutar coroutines en Celery.""" loop = asyncio.get_event_loop() return loop.run_until_complete(coro) @celery_app.task(name="worker.tasks.publish_post.publish_scheduled_posts") def publish_scheduled_posts(): """ Publicar posts que están programados para ahora. Se ejecuta cada minuto. """ db = SessionLocal() try: now = datetime.utcnow() # Obtener posts listos para publicar posts = db.query(Post).filter( Post.status == "scheduled", Post.scheduled_at <= now ).all() results = [] for post in posts: # Marcar como en proceso post.status = "publishing" db.commit() # Publicar en cada plataforma success_count = 0 platform_ids = {} errors = [] for platform in post.platforms: result = publish_to_platform.delay(post.id, platform) # En producción, esto sería asíncrono # Por ahora, ejecutamos secuencialmente results.append(f"Post {post.id} enviado a publicación") return f"Procesados {len(posts)} posts" finally: db.close() @celery_app.task( name="worker.tasks.publish_post.publish_to_platform", bind=True, max_retries=3, default_retry_delay=60 ) def publish_to_platform(self, post_id: int, platform: str): """Publicar un post en una plataforma específica.""" import json db = SessionLocal() try: post = db.query(Post).filter(Post.id == post_id).first() if not post: return f"Post {post_id} no encontrado" # Obtener publisher try: publisher = get_publisher(platform) except ValueError as e: return f"Error: {e}" # Verificar si es un hilo if post.content_type == "hilo_educativo": # Parsear posts del hilo desde content_x (JSON) try: thread_posts = json.loads(post.content_x) if post.content_x else [] if not thread_posts: # Fallback: separar contenido por doble salto de línea thread_posts = [p.strip() for p in post.content.split("\n\n") if p.strip()] except json.JSONDecodeError: thread_posts = [p.strip() for p in post.content.split("\n\n") if p.strip()] # Publicar como hilo result = run_async( publisher.publish_thread(thread_posts) ) else: # Publicación normal content = post.get_content_for_platform(platform) # Validar longitud antes de publicar if hasattr(publisher, 'char_limit') and len(content) > publisher.char_limit: error_msg = ( f"Contenido excede límite: {len(content)}/{publisher.char_limit} " f"caracteres (sobran {len(content) - publisher.char_limit})" ) post.error_message = f"{platform}: {error_msg}" post.status = "failed" db.commit() return f"Error en {platform}: {error_msg}" result = run_async( publisher.publish(content, post.image_url) ) if result.success: # Guardar ID del post en la plataforma platform_ids = post.platform_post_ids or {} platform_ids[platform] = result.post_id post.platform_post_ids = platform_ids # Verificar si todas las plataformas están publicadas all_published = all( p in platform_ids for p in post.platforms ) if all_published: post.status = "published" post.published_at = datetime.utcnow() db.commit() return f"Publicado en {platform}: {result.post_id}" else: # Error en publicación post.error_message = f"{platform}: {result.error_message}" post.retry_count = (post.retry_count or 0) + 1 if post.retry_count >= 3: post.status = "failed" db.commit() # Reintentar raise self.retry( exc=Exception(result.error_message), countdown=60 * post.retry_count ) except Exception as e: db.rollback() raise finally: db.close() @celery_app.task(name="worker.tasks.publish_post.publish_now") def publish_now(post_id: int): """Publicar un post inmediatamente.""" db = SessionLocal() try: post = db.query(Post).filter(Post.id == post_id).first() if not post: return f"Post {post_id} no encontrado" # Cambiar estado post.status = "publishing" post.scheduled_at = datetime.utcnow() db.commit() # Publicar en cada plataforma for platform in post.platforms: publish_to_platform.delay(post_id, platform) return f"Post {post_id} enviado a publicación inmediata" finally: db.close()