""" Tareas de generación de contenido. Usa el nuevo Content Generation Engine v2 con: - Quality scoring - Anti-repetición via Context Engine - Almacenamiento en Content Memory """ import asyncio import logging from datetime import datetime, timedelta from typing import Optional, Dict, Any from worker.celery_app import celery_app from app.core.database import SessionLocal from app.models.post import Post from app.models.content_calendar import ContentCalendar from app.models.tip_template import TipTemplate from app.models.product import Product from app.models.service import Service from app.services.content_generator import content_generator logger = logging.getLogger(__name__) def run_async(coro): """Helper para ejecutar coroutines en Celery.""" loop = asyncio.get_event_loop() return loop.run_until_complete(coro) def _save_to_memory( post_id: int, content: str, content_type: str, platform: str, quality_score: Optional[int] = None, quality_breakdown: Optional[Dict] = None, template_used: Optional[str] = None ): """ Encolar guardado en memoria como tarea async. Evita bloquear la generación principal. """ from worker.tasks.content_memory import analyze_and_save_content analyze_and_save_content.delay( post_id=post_id, content=content, content_type=content_type, platform=platform, quality_score=quality_score, quality_breakdown=quality_breakdown, template_used=template_used ) @celery_app.task(name="worker.tasks.generate_content.generate_scheduled_content") def generate_scheduled_content(): """ Generar contenido según el calendario. Se ejecuta cada hora. """ db = SessionLocal() try: now = datetime.now() current_day = now.weekday() current_hour = now.hour # Obtener entradas del calendario para esta hora entries = db.query(ContentCalendar).filter( ContentCalendar.day_of_week == current_day, ContentCalendar.is_active == True ).all() generated = 0 for entry in entries: # Verificar si es la hora correcta if entry.time.hour != current_hour: continue # Generar contenido según el tipo if entry.content_type == "tip_tech": generate_tip_post.delay( platforms=entry.platforms, category_filter=entry.category_filter, requires_approval=entry.requires_approval ) generated += 1 elif entry.content_type == "producto": generate_product_post.delay( platforms=entry.platforms, requires_approval=entry.requires_approval ) generated += 1 elif entry.content_type == "servicio": generate_service_post.delay( platforms=entry.platforms, requires_approval=entry.requires_approval ) generated += 1 return f"Procesadas {len(entries)} entradas, {generated} generaciones iniciadas" finally: db.close() @celery_app.task(name="worker.tasks.generate_content.generate_tip_post") def generate_tip_post( platforms: list, category_filter: str = None, requires_approval: bool = False, use_quality_check: bool = True ): """ Generar un post de tip tech con quality scoring. Args: platforms: Lista de plataformas destino category_filter: Categoría específica (opcional) requires_approval: Si requiere aprobación manual use_quality_check: Si usar validación de calidad """ db = SessionLocal() try: # Seleccionar un tip que no se haya usado recientemente query = db.query(TipTemplate).filter( TipTemplate.is_active == True ) if category_filter: query = query.filter(TipTemplate.category == category_filter) # Ordenar por uso (menos usado primero) tip = query.order_by( TipTemplate.used_count.asc(), TipTemplate.last_used.asc().nullsfirst() ).first() if not tip: return {"status": "skipped", "reason": "no_tips_available"} # Generar contenido para cada plataforma content_by_platform = {} quality_info = {} for platform in platforms: if use_quality_check: # Usar generación con validación de calidad result = run_async( content_generator.generate_with_quality_check( template_name="tip_tech", variables={ "category": tip.category, "difficulty_level": "principiante", "target_audience": "profesionales tech" }, platform=platform, db=db, max_attempts=2 ) ) content_by_platform[platform] = result["content"] quality_info[platform] = { "score": result.get("quality_score"), "breakdown": result.get("score_breakdown"), "attempts": result.get("attempts", 1) } else: # Generación simple (fallback) content = run_async( content_generator.generate_tip_tech( category=tip.category, platform=platform, template=tip.template, db=db ) ) content_by_platform[platform] = content # Obtener mejor score entre plataformas best_score = None best_breakdown = None total_attempts = 1 if quality_info: scores = [q.get("score") for q in quality_info.values() if q.get("score")] if scores: best_score = max(scores) breakdowns = [q.get("breakdown") for q in quality_info.values() if q.get("breakdown")] if breakdowns: best_breakdown = breakdowns[0] attempts = [q.get("attempts", 1) for q in quality_info.values()] if attempts: total_attempts = max(attempts) # Crear post main_platform = platforms[0] post = Post( content=content_by_platform.get(main_platform, ""), content_type="tip_tech", platforms=platforms, content_x=content_by_platform.get("x"), content_threads=content_by_platform.get("threads"), content_instagram=content_by_platform.get("instagram"), content_facebook=content_by_platform.get("facebook"), status="pending_approval" if requires_approval else "scheduled", scheduled_at=datetime.utcnow() + timedelta(minutes=5), approval_required=requires_approval, tip_template_id=tip.id, quality_score=best_score, score_breakdown=best_breakdown, generation_attempts=total_attempts ) db.add(post) # Actualizar uso del tip tip.used_count += 1 tip.last_used = datetime.utcnow() db.commit() # Guardar en memoria (async) _save_to_memory( post_id=post.id, content=post.content, content_type="tip_tech", platform=main_platform, quality_score=best_score, quality_breakdown=best_breakdown, template_used="tip_tech" ) logger.info( f"Tip generado: post_id={post.id}, score={best_score}, " f"attempts={total_attempts}, category={tip.category}" ) return { "status": "success", "post_id": post.id, "quality_score": best_score, "attempts": total_attempts } except Exception as e: logger.error(f"Error generando tip: {e}") db.rollback() raise finally: db.close() @celery_app.task(name="worker.tasks.generate_content.generate_product_post") def generate_product_post( platforms: list, product_id: int = None, requires_approval: bool = True, use_quality_check: bool = True ): """ Generar un post de producto con quality scoring. Args: platforms: Lista de plataformas destino product_id: ID del producto específico (opcional) requires_approval: Si requiere aprobación manual use_quality_check: Si usar validación de calidad """ db = SessionLocal() try: # Seleccionar producto if product_id: product = db.query(Product).filter(Product.id == product_id).first() else: # Seleccionar uno aleatorio que no se haya publicado recientemente product = db.query(Product).filter( Product.is_available == True ).order_by( Product.last_posted_at.asc().nullsfirst() ).first() if not product: return {"status": "skipped", "reason": "no_products_available"} # Generar contenido content_by_platform = {} quality_info = {} for platform in platforms: if use_quality_check: result = run_async( content_generator.generate_with_quality_check( template_name="product_post", variables={ "product_name": product.name, "product_description": product.description or "", "price": product.price, "category": product.category, "specs": product.specs or {}, "highlights": product.highlights or [] }, platform=platform, db=db, max_attempts=2 ) ) content_by_platform[platform] = result["content"] quality_info[platform] = { "score": result.get("quality_score"), "breakdown": result.get("score_breakdown"), "attempts": result.get("attempts", 1) } else: content = run_async( content_generator.generate_product_post( product=product.to_dict(), platform=platform, db=db ) ) content_by_platform[platform] = content # Obtener mejor score best_score = None best_breakdown = None total_attempts = 1 if quality_info: scores = [q.get("score") for q in quality_info.values() if q.get("score")] if scores: best_score = max(scores) breakdowns = [q.get("breakdown") for q in quality_info.values() if q.get("breakdown")] if breakdowns: best_breakdown = breakdowns[0] attempts = [q.get("attempts", 1) for q in quality_info.values()] if attempts: total_attempts = max(attempts) # Crear post main_platform = platforms[0] post = Post( content=content_by_platform.get(main_platform, ""), content_type="producto", platforms=platforms, content_x=content_by_platform.get("x"), content_threads=content_by_platform.get("threads"), content_instagram=content_by_platform.get("instagram"), content_facebook=content_by_platform.get("facebook"), status="pending_approval" if requires_approval else "scheduled", scheduled_at=datetime.utcnow() + timedelta(minutes=5), approval_required=requires_approval, product_id=product.id, image_url=product.main_image, quality_score=best_score, score_breakdown=best_breakdown, generation_attempts=total_attempts ) db.add(post) # Actualizar última publicación del producto product.last_posted_at = datetime.utcnow() db.commit() # Guardar en memoria _save_to_memory( post_id=post.id, content=post.content, content_type="producto", platform=main_platform, quality_score=best_score, quality_breakdown=best_breakdown, template_used="product_post" ) logger.info( f"Producto generado: post_id={post.id}, product={product.name}, " f"score={best_score}" ) return { "status": "success", "post_id": post.id, "product_id": product.id, "quality_score": best_score } except Exception as e: logger.error(f"Error generando producto: {e}") db.rollback() raise finally: db.close() @celery_app.task(name="worker.tasks.generate_content.generate_service_post") def generate_service_post( platforms: list, service_id: int = None, requires_approval: bool = True, use_quality_check: bool = True ): """ Generar un post de servicio con quality scoring. Args: platforms: Lista de plataformas destino service_id: ID del servicio específico (opcional) requires_approval: Si requiere aprobación manual use_quality_check: Si usar validación de calidad """ db = SessionLocal() try: # Seleccionar servicio if service_id: service = db.query(Service).filter(Service.id == service_id).first() else: service = db.query(Service).filter( Service.is_active == True ).order_by( Service.last_posted_at.asc().nullsfirst() ).first() if not service: return {"status": "skipped", "reason": "no_services_available"} # Generar contenido content_by_platform = {} quality_info = {} for platform in platforms: if use_quality_check: result = run_async( content_generator.generate_with_quality_check( template_name="service_post", variables={ "service_name": service.name, "service_description": service.description or "", "category": service.category, "target_sectors": service.target_sectors or [], "benefits": service.benefits or [], "call_to_action": service.call_to_action or "Contáctanos" }, platform=platform, db=db, max_attempts=2 ) ) content_by_platform[platform] = result["content"] quality_info[platform] = { "score": result.get("quality_score"), "breakdown": result.get("score_breakdown"), "attempts": result.get("attempts", 1) } else: content = run_async( content_generator.generate_service_post( service=service.to_dict(), platform=platform, db=db ) ) content_by_platform[platform] = content # Obtener mejor score best_score = None best_breakdown = None total_attempts = 1 if quality_info: scores = [q.get("score") for q in quality_info.values() if q.get("score")] if scores: best_score = max(scores) breakdowns = [q.get("breakdown") for q in quality_info.values() if q.get("breakdown")] if breakdowns: best_breakdown = breakdowns[0] attempts = [q.get("attempts", 1) for q in quality_info.values()] if attempts: total_attempts = max(attempts) # Crear post main_platform = platforms[0] post = Post( content=content_by_platform.get(main_platform, ""), content_type="servicio", platforms=platforms, content_x=content_by_platform.get("x"), content_threads=content_by_platform.get("threads"), content_instagram=content_by_platform.get("instagram"), content_facebook=content_by_platform.get("facebook"), status="pending_approval" if requires_approval else "scheduled", scheduled_at=datetime.utcnow() + timedelta(minutes=5), approval_required=requires_approval, service_id=service.id, image_url=service.main_image, quality_score=best_score, score_breakdown=best_breakdown, generation_attempts=total_attempts ) db.add(post) # Actualizar última publicación del servicio service.last_posted_at = datetime.utcnow() db.commit() # Guardar en memoria _save_to_memory( post_id=post.id, content=post.content, content_type="servicio", platform=main_platform, quality_score=best_score, quality_breakdown=best_breakdown, template_used="service_post" ) logger.info( f"Servicio generado: post_id={post.id}, service={service.name}, " f"score={best_score}" ) return { "status": "success", "post_id": post.id, "service_id": service.id, "quality_score": best_score } except Exception as e: logger.error(f"Error generando servicio: {e}") db.rollback() raise finally: db.close() @celery_app.task(name="worker.tasks.generate_content.generate_threads_manual_posts") def generate_threads_manual_posts(count: int = 2): """ Generar posts para Threads que se publicarán manualmente. Genera tips de tecnología variados optimizados para Threads, los guarda con status 'draft' y envía notificación a Telegram con el contenido listo para copiar. Args: count: Número de posts a generar (default 2) """ from app.services.notifications import telegram_notify db = SessionLocal() try: created = [] # Categorías para variar el contenido categories = ["productividad", "ia", "seguridad", "automatización", "python", "negocios"] for i in range(count): # Seleccionar categoría diferente para cada post category = categories[i % len(categories)] # Generar contenido con IA usando el template tip_tech result = run_async( content_generator.generate_with_quality_check( template_name="tip_tech", variables={ "category": category, "difficulty_level": "principiante", "target_audience": "profesionales y emprendedores" }, platform="threads", db=db, max_attempts=2 ) ) content = result.get("content", "") if not content: logger.warning(f"No se pudo generar contenido para Threads (categoria: {category})") continue # Crear post con status draft (para publicación manual) post = Post( content=content, content_threads=content, content_type="tip_tech", platforms=["threads"], status="draft", # Para publicación manual approval_required=False, quality_score=result.get("quality_score"), score_breakdown=result.get("score_breakdown"), generation_attempts=result.get("attempts", 1) ) db.add(post) db.flush() # Para obtener el ID created.append({"category": category, "post_id": post.id, "content": content}) # Enviar notificación a Telegram con el contenido quality_score = result.get("quality_score", "N/A") telegram_message = ( f"🧵 *Nuevo post para Threads*\n" f"📂 Categoría: {category}\n" f"⭐ Calidad: {quality_score}/100\n\n" f"━━━━━━━━━━━━━━━━━━━━━\n" f"📋 *COPIAR Y PUBLICAR:*\n" f"━━━━━━━━━━━━━━━━━━━━━\n\n" f"`{content}`\n\n" f"━━━━━━━━━━━━━━━━━━━━━\n" f"🔗 [Abrir Threads](https://threads.net)\n" f"📱 Post #{post.id} | {len(content)} caracteres" ) run_async(telegram_notify(telegram_message, parse_mode="Markdown")) db.commit() logger.info(f"Generados {len(created)} posts para Threads (manual): categorías={[c['category'] for c in created]}") return { "status": "success", "generated": len(created), "categories": [c["category"] for c in created], "post_ids": [c["post_id"] for c in created] } except Exception as e: logger.error(f"Error generando posts para Threads: {e}") db.rollback() return {"status": "error", "error": str(e)} finally: db.close() @celery_app.task(name="worker.tasks.generate_content.generate_thread") def generate_thread( topic: str, num_posts: int = 5, requires_approval: bool = True ): """ Generar un hilo educativo. Args: topic: Tema del hilo num_posts: Número de posts requires_approval: Si requiere aprobación """ db = SessionLocal() try: # Generar hilo posts = run_async( content_generator.generate_thread( topic=topic, num_posts=num_posts, db=db ) ) if not posts: return {"status": "error", "reason": "no_posts_generated"} # Crear posts individuales (vinculados) created_posts = [] for i, content in enumerate(posts): post = Post( content=content, content_type="hilo_educativo", platforms=["x"], # Hilos principalmente para X content_x=content, status="pending_approval" if requires_approval else "scheduled", scheduled_at=datetime.utcnow() + timedelta(minutes=5 + i), # Escalonado approval_required=requires_approval, ) db.add(post) created_posts.append(post) db.commit() # Guardar primer post en memoria (representa el hilo) if created_posts: _save_to_memory( post_id=created_posts[0].id, content="\n\n".join(posts), content_type="hilo_educativo", platform="x", template_used="thread" ) logger.info(f"Hilo generado: {len(created_posts)} posts sobre '{topic}'") return { "status": "success", "post_ids": [p.id for p in created_posts], "topic": topic } except Exception as e: logger.error(f"Error generando hilo: {e}") db.rollback() raise finally: db.close()