Major improvements to AI content generation: ## New Components (app/services/ai/) - PromptLibrary: YAML-based prompt templates with inheritance - ContextEngine: Anti-repetition and best performers tracking - ContentGeneratorV2: Enhanced generation with dynamic parameters - PlatformAdapter: Platform-specific content adaptation - ContentValidator: AI-powered quality scoring (0-100) ## Prompt Library (app/prompts/) - 3 personalities: default, educational, promotional - 5 templates: tip_tech, product_post, service_post, thread, response - 4 platform configs: x, threads, instagram, facebook - Few-shot examples by category: ia, productividad, seguridad ## Database Changes - New table: content_memory (tracks generated content) - New columns in posts: quality_score, score_breakdown, generation_attempts ## New API Endpoints (/api/v2/generate/) - POST /generate - Generation with quality check - POST /generate/batch - Batch generation - POST /quality/evaluate - Evaluate content quality - GET /templates, /personalities, /platforms - List configs ## Celery Tasks - update_engagement_scores (every 6h) - cleanup_old_memory (monthly) - refresh_best_posts_yaml (weekly) ## Tests - Comprehensive tests for all AI engine components Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
343 lines
9.8 KiB
Python
343 lines
9.8 KiB
Python
"""
|
|
Tareas de gestión de memoria de contenido.
|
|
|
|
Incluye:
|
|
- Actualización periódica de engagement scores
|
|
- Marcado de top performers
|
|
- Sincronización de métricas desde posts
|
|
- Limpieza de memoria antigua
|
|
"""
|
|
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
|
|
from worker.celery_app import celery_app
|
|
from app.core.database import SessionLocal
|
|
from app.models.post import Post
|
|
from app.models.post_metrics import PostMetrics
|
|
from app.models.content_memory import ContentMemory
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.content_memory.update_engagement_scores")
|
|
def update_engagement_scores():
|
|
"""
|
|
Actualizar scores de engagement basado en métricas reales.
|
|
|
|
Esta tarea:
|
|
1. Obtiene métricas recientes de posts
|
|
2. Calcula engagement score normalizado
|
|
3. Actualiza ContentMemory
|
|
4. Marca top performers
|
|
|
|
Se ejecuta cada 6 horas.
|
|
"""
|
|
db = SessionLocal()
|
|
|
|
try:
|
|
updated_count = 0
|
|
new_top_performers = 0
|
|
|
|
# Obtener posts con métricas que tengan ContentMemory
|
|
memories = db.query(ContentMemory).filter(
|
|
ContentMemory.created_at >= datetime.utcnow() - timedelta(days=90)
|
|
).all()
|
|
|
|
for memory in memories:
|
|
# Obtener el post asociado
|
|
post = db.query(Post).filter(Post.id == memory.post_id).first()
|
|
if not post or not post.metrics:
|
|
continue
|
|
|
|
# Calcular engagement score
|
|
metrics = post.metrics
|
|
old_score = memory.engagement_score
|
|
|
|
memory.update_engagement(metrics)
|
|
updated_count += 1
|
|
|
|
# Logging si cambió significativamente
|
|
if old_score and abs((memory.engagement_score or 0) - old_score) > 10:
|
|
logger.info(
|
|
f"Post {post.id} engagement cambió: {old_score:.1f} -> {memory.engagement_score:.1f}"
|
|
)
|
|
|
|
db.commit()
|
|
|
|
# Recalcular top performers
|
|
new_top_performers = _recalculate_top_performers(db)
|
|
|
|
logger.info(
|
|
f"Engagement actualizado: {updated_count} posts, "
|
|
f"{new_top_performers} nuevos top performers"
|
|
)
|
|
|
|
return {
|
|
"updated": updated_count,
|
|
"new_top_performers": new_top_performers
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error actualizando engagement: {e}")
|
|
db.rollback()
|
|
raise
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def _recalculate_top_performers(db, top_percentile: int = 20) -> int:
|
|
"""
|
|
Recalcular qué posts son top performers.
|
|
|
|
Args:
|
|
db: Sesión de DB
|
|
top_percentile: Percentil para considerar top performer
|
|
|
|
Returns:
|
|
Número de nuevos top performers marcados
|
|
"""
|
|
# Obtener todos los scores
|
|
all_scores = db.query(ContentMemory.engagement_score).filter(
|
|
ContentMemory.engagement_score.isnot(None)
|
|
).all()
|
|
|
|
if not all_scores:
|
|
return 0
|
|
|
|
scores = sorted([s[0] for s in all_scores], reverse=True)
|
|
|
|
# Calcular umbral
|
|
threshold_idx = max(0, int(len(scores) * top_percentile / 100) - 1)
|
|
threshold_score = scores[threshold_idx]
|
|
|
|
# Marcar nuevos top performers
|
|
new_tops = db.query(ContentMemory).filter(
|
|
ContentMemory.engagement_score >= threshold_score,
|
|
ContentMemory.is_top_performer == False
|
|
).update({"is_top_performer": True})
|
|
|
|
# Desmarcar los que ya no califican
|
|
db.query(ContentMemory).filter(
|
|
ContentMemory.engagement_score < threshold_score,
|
|
ContentMemory.is_top_performer == True
|
|
).update({"is_top_performer": False})
|
|
|
|
db.commit()
|
|
|
|
return new_tops
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.content_memory.sync_post_metrics")
|
|
def sync_post_metrics(post_id: int):
|
|
"""
|
|
Sincronizar métricas de un post específico a ContentMemory.
|
|
|
|
Se llama después de obtener nuevas métricas.
|
|
|
|
Args:
|
|
post_id: ID del post
|
|
"""
|
|
db = SessionLocal()
|
|
|
|
try:
|
|
post = db.query(Post).filter(Post.id == post_id).first()
|
|
if not post or not post.metrics:
|
|
return {"status": "skipped", "reason": "no_metrics"}
|
|
|
|
memory = db.query(ContentMemory).filter(
|
|
ContentMemory.post_id == post_id
|
|
).first()
|
|
|
|
if not memory:
|
|
return {"status": "skipped", "reason": "no_memory"}
|
|
|
|
memory.update_engagement(post.metrics)
|
|
db.commit()
|
|
|
|
return {
|
|
"status": "updated",
|
|
"post_id": post_id,
|
|
"engagement_score": memory.engagement_score
|
|
}
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.content_memory.analyze_and_save_content")
|
|
def analyze_and_save_content(
|
|
post_id: int,
|
|
content: str,
|
|
content_type: str,
|
|
platform: str,
|
|
quality_score: Optional[int] = None,
|
|
quality_breakdown: Optional[dict] = None,
|
|
template_used: Optional[str] = None,
|
|
personality_used: Optional[str] = None
|
|
):
|
|
"""
|
|
Analizar contenido generado y guardarlo en memoria.
|
|
|
|
Se llama después de generar un nuevo post.
|
|
|
|
Args:
|
|
post_id: ID del post creado
|
|
content: Contenido generado
|
|
content_type: Tipo de contenido
|
|
platform: Plataforma principal
|
|
quality_score: Score de calidad (si se evaluó)
|
|
quality_breakdown: Breakdown del score
|
|
template_used: Template usado
|
|
personality_used: Personalidad usada
|
|
"""
|
|
db = SessionLocal()
|
|
|
|
try:
|
|
# Verificar que no exista ya
|
|
existing = db.query(ContentMemory).filter(
|
|
ContentMemory.post_id == post_id
|
|
).first()
|
|
|
|
if existing:
|
|
return {"status": "skipped", "reason": "already_exists"}
|
|
|
|
# Importar context engine para análisis
|
|
from app.services.ai import context_engine
|
|
|
|
# Guardar en memoria
|
|
memory = context_engine.save_to_memory(
|
|
db=db,
|
|
post_id=post_id,
|
|
content=content,
|
|
content_type=content_type,
|
|
platform=platform,
|
|
quality_score=quality_score,
|
|
quality_breakdown=quality_breakdown,
|
|
template_used=template_used,
|
|
personality_used=personality_used
|
|
)
|
|
|
|
return {
|
|
"status": "created",
|
|
"memory_id": memory.id,
|
|
"topics": memory.topics,
|
|
"hook_type": memory.hook_type
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error guardando en memoria: {e}")
|
|
db.rollback()
|
|
raise
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.content_memory.cleanup_old_memory")
|
|
def cleanup_old_memory(days_to_keep: int = 180):
|
|
"""
|
|
Limpiar registros de memoria antiguos.
|
|
|
|
Mantiene top performers indefinidamente.
|
|
|
|
Args:
|
|
days_to_keep: Días de registros a mantener (excepto top performers)
|
|
"""
|
|
db = SessionLocal()
|
|
|
|
try:
|
|
cutoff_date = datetime.utcnow() - timedelta(days=days_to_keep)
|
|
|
|
# Eliminar registros viejos que NO son top performers
|
|
deleted = db.query(ContentMemory).filter(
|
|
ContentMemory.created_at < cutoff_date,
|
|
ContentMemory.is_top_performer == False
|
|
).delete()
|
|
|
|
db.commit()
|
|
|
|
logger.info(f"Limpieza de memoria: {deleted} registros eliminados")
|
|
|
|
return {"deleted": deleted}
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.content_memory.refresh_best_posts_yaml")
|
|
def refresh_best_posts_yaml():
|
|
"""
|
|
Actualizar el archivo best_posts.yaml con top performers reales.
|
|
|
|
Se ejecuta semanalmente para mantener ejemplos actualizados.
|
|
"""
|
|
import yaml
|
|
from pathlib import Path
|
|
|
|
db = SessionLocal()
|
|
|
|
try:
|
|
# Obtener top performers por tipo de contenido
|
|
content_types = ["tip_tech", "producto", "servicio"]
|
|
examples = {}
|
|
|
|
for content_type in content_types:
|
|
top = db.query(ContentMemory).filter(
|
|
ContentMemory.content_type == content_type,
|
|
ContentMemory.is_top_performer == True
|
|
).order_by(
|
|
ContentMemory.engagement_score.desc()
|
|
).limit(5).all()
|
|
|
|
if top:
|
|
examples[content_type] = []
|
|
for mem in top:
|
|
post = db.query(Post).filter(Post.id == mem.post_id).first()
|
|
if post:
|
|
examples[content_type].append({
|
|
"content": post.content,
|
|
"platform": mem.platform,
|
|
"engagement_score": mem.engagement_score,
|
|
"metrics": post.metrics,
|
|
"analysis": {
|
|
"hook_type": mem.hook_type,
|
|
"topics": mem.topics
|
|
}
|
|
})
|
|
|
|
if not examples:
|
|
return {"status": "skipped", "reason": "no_top_performers"}
|
|
|
|
# Actualizar archivo YAML
|
|
yaml_path = Path(__file__).parent.parent.parent / "app" / "prompts" / "examples" / "best_posts.yaml"
|
|
|
|
# Cargar archivo existente
|
|
with open(yaml_path, "r", encoding="utf-8") as f:
|
|
data = yaml.safe_load(f)
|
|
|
|
# Actualizar ejemplos
|
|
data["examples"] = examples
|
|
data["metadata"]["last_updated"] = datetime.utcnow().isoformat()
|
|
data["metadata"]["auto_update"] = True
|
|
|
|
# Guardar
|
|
with open(yaml_path, "w", encoding="utf-8") as f:
|
|
yaml.dump(data, f, default_flow_style=False, allow_unicode=True)
|
|
|
|
logger.info(f"best_posts.yaml actualizado con {sum(len(v) for v in examples.values())} ejemplos")
|
|
|
|
return {
|
|
"status": "updated",
|
|
"examples_by_type": {k: len(v) for k, v in examples.items()}
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error actualizando best_posts.yaml: {e}")
|
|
raise
|
|
|
|
finally:
|
|
db.close()
|