"""
Content memory management tasks.

Includes:
- Periodic refresh of engagement scores
- Flagging of top performers
- Syncing metrics from posts into content memory
- Cleanup of old memory records
"""

import logging
from datetime import datetime, timedelta
from typing import Optional

from worker.celery_app import celery_app
from app.core.database import SessionLocal
from app.models.post import Post
from app.models.post_metrics import PostMetrics
from app.models.content_memory import ContentMemory

logger = logging.getLogger(__name__)


@celery_app.task(name="worker.tasks.content_memory.update_engagement_scores")
def update_engagement_scores():
    """
    Refresh engagement scores from the metrics currently stored on posts.

    Steps:
    1. Load ContentMemory rows created in the last 90 days together with
       their posts.
    2. Call ``memory.update_engagement(post.metrics)`` for every row whose
       post has metrics.
    3. Commit, then recompute the top-performer flags.

    The original note says this is scheduled every 6 hours; the schedule
    itself is configured outside this module.

    Returns:
        Dict with ``updated`` (rows refreshed) and ``new_top_performers``
        (rows newly flagged as top performer).

    Raises:
        Exception: any error is logged, the session rolled back, and the
            error re-raised so Celery records the failure.
    """
    db = SessionLocal()
    try:
        updated_count = 0
        window_start = datetime.utcnow() - timedelta(days=90)

        # Single joined query instead of one Post lookup per memory row.
        # The inner join also drops memories whose post no longer exists,
        # which the per-row lookup previously skipped by hand.
        rows = (
            db.query(ContentMemory, Post)
            .join(Post, Post.id == ContentMemory.post_id)
            .filter(ContentMemory.created_at >= window_start)
            .all()
        )

        for memory, post in rows:
            if not post.metrics:
                continue

            old_score = memory.engagement_score
            memory.update_engagement(post.metrics)
            updated_count += 1

            # Treat a missing new score as 0 so both the comparison and the
            # ".1f" formatting below are safe. Compare the old score against
            # None explicitly: a previous score of 0 is a real value.
            new_score = memory.engagement_score or 0
            if old_score is not None and abs(new_score - old_score) > 10:
                logger.info(
                    "Post %s engagement cambió: %.1f -> %.1f",
                    post.id,
                    old_score,
                    new_score,
                )

        db.commit()

        # Scores are persisted; now re-derive who counts as top performer.
        new_top_performers = _recalculate_top_performers(db)

        logger.info(
            "Engagement actualizado: %s posts, %s nuevos top performers",
            updated_count,
            new_top_performers,
        )

        return {
            "updated": updated_count,
            "new_top_performers": new_top_performers,
        }

    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"Error actualizando engagement: {e}")
        db.rollback()
        raise
    finally:
        db.close()


def _recalculate_top_performers(db, top_percentile: int = 20) -> int:
    """
    Recompute which ContentMemory rows are flagged as top performers.

    The threshold is the score found at the ``top_percentile`` position of
    all non-null engagement scores sorted in descending order. Rows at or
    above the threshold are flagged; rows below it are unflagged. Rows with
    a NULL score are left untouched by both updates.

    Args:
        db: Database session.
        top_percentile: Share (in percent) of rows considered top
            performers. Values outside 0-100 are tolerated: the threshold
            index is clamped to the valid range.

    Returns:
        Number of rows newly flagged as top performer.
    """
    score_rows = db.query(ContentMemory.engagement_score).filter(
        ContentMemory.engagement_score.isnot(None)
    ).all()

    if not score_rows:
        return 0

    scores = sorted((row[0] for row in score_rows), reverse=True)

    # Clamp on both sides: small tables or percentiles <= 0 would give a
    # negative index, percentiles > 100 would run past the end of the list.
    raw_idx = int(len(scores) * top_percentile / 100) - 1
    threshold_idx = min(len(scores) - 1, max(0, raw_idx))
    threshold_score = scores[threshold_idx]

    # Flag rows that reached the threshold and were not flagged yet.
    new_tops = db.query(ContentMemory).filter(
        ContentMemory.engagement_score >= threshold_score,
        ContentMemory.is_top_performer == False  # noqa: E712 (SQL expression)
    ).update({"is_top_performer": True})

    # Unflag rows that dropped below the threshold.
    db.query(ContentMemory).filter(
        ContentMemory.engagement_score < threshold_score,
        ContentMemory.is_top_performer == True  # noqa: E712 (SQL expression)
    ).update({"is_top_performer": False})

    db.commit()

    return new_tops


@celery_app.task(name="worker.tasks.content_memory.sync_post_metrics")
def sync_post_metrics(post_id: int):
    """
    Sync the metrics of a single post into its ContentMemory row.

    Intended to be called after new metrics have been fetched for the post.

    Args:
        post_id: ID of the post.

    Returns:
        ``{"status": "skipped", "reason": ...}`` when the post has no
        metrics or no ContentMemory row exists, otherwise a dict with
        ``status == "updated"``, the ``post_id`` and the new
        ``engagement_score``.

    Raises:
        Exception: any error is logged, the session rolled back, and the
            error re-raised.
    """
    db = SessionLocal()
    try:
        post = db.query(Post).filter(Post.id == post_id).first()
        if not post or not post.metrics:
            return {"status": "skipped", "reason": "no_metrics"}

        memory = db.query(ContentMemory).filter(
            ContentMemory.post_id == post_id
        ).first()
        if not memory:
            return {"status": "skipped", "reason": "no_memory"}

        memory.update_engagement(post.metrics)
        db.commit()

        return {
            "status": "updated",
            "post_id": post_id,
            "engagement_score": memory.engagement_score,
        }
    except Exception:
        # Match the sibling tasks: log with traceback, roll back, re-raise.
        logger.exception("Error sincronizando métricas del post %s", post_id)
        db.rollback()
        raise
    finally:
        db.close()


@celery_app.task(name="worker.tasks.content_memory.analyze_and_save_content")
def analyze_and_save_content(
    post_id: int,
    content: str,
    content_type: str,
    platform: str,
    quality_score: Optional[int] = None,
    quality_breakdown: Optional[dict] = None,
    template_used: Optional[str] = None,
    personality_used: Optional[str] = None
):
    """
    Analyze generated content and store it in content memory.

    Intended to be called after a new post has been generated. Does nothing
    if a ContentMemory row already exists for the post.

    Args:
        post_id: ID of the created post.
        content: Generated content.
        content_type: Content type.
        platform: Main platform.
        quality_score: Quality score, if the content was evaluated.
        quality_breakdown: Breakdown of the quality score.
        template_used: Template used for generation.
        personality_used: Personality used for generation.

    Returns:
        ``{"status": "skipped", "reason": "already_exists"}`` when a row
        already exists, otherwise a dict with ``status == "created"`` and
        the new row's ``memory_id``, ``topics`` and ``hook_type``.

    Raises:
        Exception: any error is logged, the session rolled back, and the
            error re-raised.
    """
    db = SessionLocal()
    try:
        # Guard against duplicates: one memory row per post.
        existing = db.query(ContentMemory).filter(
            ContentMemory.post_id == post_id
        ).first()
        if existing:
            return {"status": "skipped", "reason": "already_exists"}

        # Imported here rather than at module level, as in the original.
        from app.services.ai import context_engine

        # NOTE(review): this task never commits itself; presumably
        # save_to_memory commits the new row — confirm.
        memory = context_engine.save_to_memory(
            db=db,
            post_id=post_id,
            content=content,
            content_type=content_type,
            platform=platform,
            quality_score=quality_score,
            quality_breakdown=quality_breakdown,
            template_used=template_used,
            personality_used=personality_used,
        )

        return {
            "status": "created",
            "memory_id": memory.id,
            "topics": memory.topics,
            "hook_type": memory.hook_type,
        }

    except Exception as e:
        logger.exception(f"Error guardando en memoria: {e}")
        db.rollback()
        raise
    finally:
        db.close()


@celery_app.task(name="worker.tasks.content_memory.cleanup_old_memory")
def cleanup_old_memory(days_to_keep: int = 180):
    """
    Delete old content memory records.

    Rows flagged as top performer are never deleted by this task.

    Args:
        days_to_keep: Age in days beyond which non-top-performer rows are
            deleted. Must not be negative.

    Returns:
        Dict with ``deleted``: the number of rows removed.

    Raises:
        ValueError: if ``days_to_keep`` is negative (the cutoff would lie in
            the future and wipe every non-top-performer row).
        Exception: any database error is logged, rolled back and re-raised.
    """
    if days_to_keep < 0:
        raise ValueError("days_to_keep must be non-negative")

    db = SessionLocal()
    try:
        cutoff_date = datetime.utcnow() - timedelta(days=days_to_keep)

        # Bulk-delete old rows that are NOT top performers.
        deleted = db.query(ContentMemory).filter(
            ContentMemory.created_at < cutoff_date,
            ContentMemory.is_top_performer == False  # noqa: E712 (SQL expression)
        ).delete()

        db.commit()

        logger.info(f"Limpieza de memoria: {deleted} registros eliminados")

        return {"deleted": deleted}
    except Exception:
        # Match the sibling tasks: log with traceback, roll back, re-raise.
        logger.exception("Error limpiando memoria antigua")
        db.rollback()
        raise
    finally:
        db.close()


@celery_app.task(name="worker.tasks.content_memory.refresh_best_posts_yaml")
def refresh_best_posts_yaml():
    """
    Rewrite ``best_posts.yaml`` with the current top performers.

    For each content type, up to five top performers (highest engagement
    score first) are exported as examples. The original note says this runs
    weekly; the schedule itself is configured outside this module.

    Returns:
        ``{"status": "skipped", "reason": "no_top_performers"}`` when
        nothing qualifies, otherwise ``status == "updated"`` plus the number
        of examples per content type.

    Raises:
        FileNotFoundError: if the YAML file does not exist.
        TypeError: if the YAML file's top level is not a mapping.
        Exception: any other error is logged and re-raised.
    """
    import yaml
    from pathlib import Path

    db = SessionLocal()
    try:
        content_types = ["tip_tech", "producto", "servicio"]
        examples = {}

        for content_type in content_types:
            top = db.query(ContentMemory).filter(
                ContentMemory.content_type == content_type,
                ContentMemory.is_top_performer == True  # noqa: E712 (SQL expression)
            ).order_by(
                ContentMemory.engagement_score.desc()
            ).limit(5).all()

            if not top:
                continue

            examples[content_type] = []
            for mem in top:
                post = db.query(Post).filter(Post.id == mem.post_id).first()
                if not post:
                    continue
                examples[content_type].append({
                    "content": post.content,
                    "platform": mem.platform,
                    "engagement_score": mem.engagement_score,
                    # NOTE(review): if post.metrics is an ORM object rather
                    # than plain data, yaml.dump emits python-specific tags
                    # that yaml.safe_load cannot read back — confirm.
                    "metrics": post.metrics,
                    "analysis": {
                        "hook_type": mem.hook_type,
                        "topics": mem.topics,
                    },
                })

        if not examples:
            return {"status": "skipped", "reason": "no_top_performers"}

        yaml_path = (
            Path(__file__).parent.parent.parent
            / "app" / "prompts" / "examples" / "best_posts.yaml"
        )

        # Load the existing file so unrelated keys are preserved.
        with open(yaml_path, "r", encoding="utf-8") as f:
            loaded = yaml.safe_load(f)

        # An empty file parses to None; start from an empty mapping then.
        if loaded is None:
            data = {}
        elif isinstance(loaded, dict):
            data = loaded
        else:
            raise TypeError(
                f"{yaml_path} must contain a mapping at the top level, "
                f"got {type(loaded).__name__}"
            )

        # Tolerate a missing or null "metadata" section.
        metadata = data.get("metadata")
        if not isinstance(metadata, dict):
            metadata = {}
            data["metadata"] = metadata

        data["examples"] = examples
        metadata["last_updated"] = datetime.utcnow().isoformat()
        metadata["auto_update"] = True

        # Serialize BEFORE opening the file for writing: opening with "w"
        # truncates immediately, so a failing dump would otherwise leave an
        # empty or partial file behind.
        serialized = yaml.dump(data, default_flow_style=False, allow_unicode=True)
        with open(yaml_path, "w", encoding="utf-8") as f:
            f.write(serialized)

        logger.info(f"best_posts.yaml actualizado con {sum(len(v) for v in examples.values())} ejemplos")

        return {
            "status": "updated",
            "examples_by_type": {k: len(v) for k, v in examples.items()},
        }

    except Exception as e:
        logger.exception(f"Error actualizando best_posts.yaml: {e}")
        raise
    finally:
        db.close()