Files
social-media-automation/worker/tasks/generate_content.py
Consultoría AS 11b0ba46fa feat: Add Content Generation Engine v2 with quality scoring
Major improvements to AI content generation:

## New Components (app/services/ai/)
- PromptLibrary: YAML-based prompt templates with inheritance
- ContextEngine: Anti-repetition and best performers tracking
- ContentGeneratorV2: Enhanced generation with dynamic parameters
- PlatformAdapter: Platform-specific content adaptation
- ContentValidator: AI-powered quality scoring (0-100)

## Prompt Library (app/prompts/)
- 3 personalities: default, educational, promotional
- 5 templates: tip_tech, product_post, service_post, thread, response
- 4 platform configs: x, threads, instagram, facebook
- Few-shot examples by category: ia, productividad, seguridad

## Database Changes
- New table: content_memory (tracks generated content)
- New columns in posts: quality_score, score_breakdown, generation_attempts

## New API Endpoints (/api/v2/generate/)
- POST /generate - Generation with quality check
- POST /generate/batch - Batch generation
- POST /quality/evaluate - Evaluate content quality
- GET /templates, /personalities, /platforms - List configs

## Celery Tasks
- update_engagement_scores (every 6h)
- cleanup_old_memory (monthly)
- refresh_best_posts_yaml (weekly)

## Tests
- Comprehensive tests for all AI engine components

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 20:55:28 +00:00

623 lines
20 KiB
Python

"""
Tareas de generación de contenido.
Usa el nuevo Content Generation Engine v2 con:
- Quality scoring
- Anti-repetición via Context Engine
- Almacenamiento en Content Memory
"""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from worker.celery_app import celery_app
from app.core.database import SessionLocal
from app.models.post import Post
from app.models.content_calendar import ContentCalendar
from app.models.tip_template import TipTemplate
from app.models.product import Product
from app.models.service import Service
from app.services.content_generator import content_generator
logger = logging.getLogger(__name__)
def run_async(coro):
    """Run a coroutine to completion from synchronous (Celery task) code.

    The current thread's event loop is reused when it exists and is still
    open. If there is no current loop, or the current one has been closed,
    a new loop is created and installed as the thread's current loop so
    that later calls keep reusing it.

    Args:
        coro: Coroutine (or other awaitable) to execute.

    Returns:
        The value produced by ``coro``.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current loop for this thread (e.g. a non-main thread, or a
        # Python version that no longer creates one implicitly).
        loop = None

    if loop is None or loop.is_closed():
        # run_until_complete() on a closed loop raises RuntimeError, so a
        # stale loop must be replaced rather than reused.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    return loop.run_until_complete(coro)
def _save_to_memory(
    post_id: int,
    content: str,
    content_type: str,
    platform: str,
    quality_score: Optional[int] = None,
    quality_breakdown: Optional[Dict] = None,
    template_used: Optional[str] = None
):
    """Queue the content-memory save as a background Celery task.

    All arguments are forwarded unchanged, as keyword arguments, to
    ``analyze_and_save_content.delay`` so the calling generation task does
    not wait for the analysis/save step.

    Args:
        post_id: Identifier of the post the content belongs to.
        content: Generated text to store.
        content_type: Content type label (e.g. ``"tip_tech"``).
        platform: Platform the stored content was generated for.
        quality_score: Optional quality score of the content.
        quality_breakdown: Optional per-criterion score breakdown.
        template_used: Optional name of the prompt template used.
    """
    # Function-level import, kept from the original implementation
    # (presumably to avoid a circular import between task modules - confirm).
    from worker.tasks.content_memory import analyze_and_save_content

    payload = {
        "post_id": post_id,
        "content": content,
        "content_type": content_type,
        "platform": platform,
        "quality_score": quality_score,
        "quality_breakdown": quality_breakdown,
        "template_used": template_used,
    }
    analyze_and_save_content.delay(**payload)
@celery_app.task(name="worker.tasks.generate_content.generate_scheduled_content")
def generate_scheduled_content():
    """Dispatch content-generation tasks for the current calendar slot.

    Loads the active ``ContentCalendar`` entries for today's weekday, keeps
    those whose ``time.hour`` equals the current local hour, and enqueues
    the generation task matching each entry's ``content_type``. Entries
    with an unrecognised ``content_type`` are logged and skipped instead of
    being dropped silently.

    Intended to run hourly according to the original docstring; the beat
    schedule itself is not defined in this module.

    Returns:
        A summary string with the number of calendar entries loaded for the
        day and the number of generation tasks enqueued.
    """
    db = SessionLocal()
    try:
        now = datetime.now()
        current_hour = now.hour

        # All active entries for today; the hour match is applied below.
        entries = db.query(ContentCalendar).filter(
            ContentCalendar.day_of_week == now.weekday(),
            ContentCalendar.is_active == True  # noqa: E712 - SQL expression
        ).all()

        generated = 0
        for entry in entries:
            if entry.time.hour != current_hour:
                continue

            if entry.content_type == "tip_tech":
                generate_tip_post.delay(
                    platforms=entry.platforms,
                    category_filter=entry.category_filter,
                    requires_approval=entry.requires_approval
                )
            elif entry.content_type == "producto":
                generate_product_post.delay(
                    platforms=entry.platforms,
                    requires_approval=entry.requires_approval
                )
            elif entry.content_type == "servicio":
                generate_service_post.delay(
                    platforms=entry.platforms,
                    requires_approval=entry.requires_approval
                )
            else:
                # Make misconfigured calendar rows visible in the logs.
                logger.warning(
                    "Tipo de contenido no soportado en calendario: %r",
                    entry.content_type,
                )
                continue
            generated += 1

        return f"Procesadas {len(entries)} entradas, {generated} generaciones iniciadas"
    finally:
        db.close()
@celery_app.task(name="worker.tasks.generate_content.generate_tip_post")
def generate_tip_post(
    platforms: list,
    category_filter: str = None,
    requires_approval: bool = False,
    use_quality_check: bool = True
):
    """Generate a tech-tip post, optionally with quality scoring.

    Picks the active ``TipTemplate`` with the lowest ``used_count`` (ties
    broken by oldest/NULL ``last_used``), generates content for every
    requested platform, stores a ``Post`` and enqueues a content-memory
    save for the first platform's content.

    Args:
        platforms: Target platforms; the first one is treated as the main
            platform whose text becomes ``Post.content``.
        category_filter: Restrict tip selection to this category (optional).
        requires_approval: When true the post is created as
            ``"pending_approval"`` instead of ``"scheduled"``.
        use_quality_check: Use ``generate_with_quality_check`` (with score
            tracking) instead of the plain ``generate_tip_tech`` call.

    Returns:
        ``{"status": "skipped", "reason": ...}`` when there are no
        platforms or no tip is available, otherwise a dict with
        ``status``, ``post_id``, ``quality_score`` and ``attempts``.

    Raises:
        Exception: Any error during generation or persistence is logged,
            the session is rolled back, and the error is re-raised.
    """
    if not platforms:
        # Nothing to generate for; previously this surfaced later as an
        # IndexError on ``platforms[0]``.
        logger.warning("Tip omitido: lista de plataformas vacía")
        return {"status": "skipped", "reason": "no_platforms"}

    db = SessionLocal()
    try:
        query = db.query(TipTemplate).filter(
            TipTemplate.is_active == True  # noqa: E712 - SQL expression
        )
        if category_filter:
            query = query.filter(TipTemplate.category == category_filter)

        # Least-used tip first, then the one used longest ago (NULL first).
        tip = query.order_by(
            TipTemplate.used_count.asc(),
            TipTemplate.last_used.asc().nullsfirst()
        ).first()
        if not tip:
            return {"status": "skipped", "reason": "no_tips_available"}

        content_by_platform = {}
        quality_info = {}
        for platform in platforms:
            if use_quality_check:
                result = run_async(
                    content_generator.generate_with_quality_check(
                        template_name="tip_tech",
                        variables={
                            "category": tip.category,
                            "difficulty_level": "principiante",
                            "target_audience": "profesionales tech"
                        },
                        platform=platform,
                        db=db,
                        max_attempts=2
                    )
                )
                content_by_platform[platform] = result["content"]
                quality_info[platform] = {
                    "score": result.get("quality_score"),
                    "breakdown": result.get("score_breakdown"),
                    "attempts": result.get("attempts", 1)
                }
            else:
                # Plain generation without quality scoring.
                content_by_platform[platform] = run_async(
                    content_generator.generate_tip_tech(
                        category=tip.category,
                        platform=platform,
                        template=tip.template,
                        db=db
                    )
                )

        # Aggregate quality data across platforms.
        best_score = None
        best_breakdown = None
        total_attempts = 1
        if quality_info:
            entries = list(quality_info.values())
            # ``is not None`` so that a legitimate score of 0 is kept.
            scored = [q for q in entries if q.get("score") is not None]
            if scored:
                # Keep score and breakdown from the same platform.
                best = max(scored, key=lambda q: q["score"])
                best_score = best["score"]
                best_breakdown = best.get("breakdown") or None
            else:
                # No scores at all: fall back to the first breakdown found.
                best_breakdown = next(
                    (q["breakdown"] for q in entries if q.get("breakdown")),
                    None
                )
            total_attempts = max(q.get("attempts", 1) for q in entries)

        main_platform = platforms[0]
        post = Post(
            content=content_by_platform.get(main_platform, ""),
            content_type="tip_tech",
            platforms=platforms,
            content_x=content_by_platform.get("x"),
            content_threads=content_by_platform.get("threads"),
            content_instagram=content_by_platform.get("instagram"),
            content_facebook=content_by_platform.get("facebook"),
            status="pending_approval" if requires_approval else "scheduled",
            scheduled_at=datetime.utcnow() + timedelta(minutes=5),
            approval_required=requires_approval,
            tip_template_id=tip.id,
            quality_score=best_score,
            score_breakdown=best_breakdown,
            generation_attempts=total_attempts
        )
        db.add(post)

        # Record tip usage so the rotation above moves on to other tips.
        tip.used_count += 1
        tip.last_used = datetime.utcnow()
        db.commit()

        # NOTE(review): the stored score is the best across platforms while
        # the stored content is the main platform's - confirm this is wanted.
        _save_to_memory(
            post_id=post.id,
            content=post.content,
            content_type="tip_tech",
            platform=main_platform,
            quality_score=best_score,
            quality_breakdown=best_breakdown,
            template_used="tip_tech"
        )
        logger.info(
            f"Tip generado: post_id={post.id}, score={best_score}, "
            f"attempts={total_attempts}, category={tip.category}"
        )
        return {
            "status": "success",
            "post_id": post.id,
            "quality_score": best_score,
            "attempts": total_attempts
        }
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"Error generando tip: {e}")
        db.rollback()
        raise
    finally:
        db.close()
@celery_app.task(name="worker.tasks.generate_content.generate_product_post")
def generate_product_post(
    platforms: list,
    product_id: int = None,
    requires_approval: bool = True,
    use_quality_check: bool = True
):
    """Generate a product post, optionally with quality scoring.

    Uses the product identified by ``product_id`` when given; otherwise the
    available product with the oldest (or NULL) ``last_posted_at``. Content
    is generated per platform, a ``Post`` is stored, the product's
    ``last_posted_at`` is updated and a content-memory save is enqueued.

    Args:
        platforms: Target platforms; the first one is treated as the main
            platform whose text becomes ``Post.content``.
        product_id: Specific product to post about (optional).
        requires_approval: When true the post is created as
            ``"pending_approval"`` instead of ``"scheduled"``.
        use_quality_check: Use ``generate_with_quality_check`` (with score
            tracking) instead of the plain ``generate_product_post`` call.

    Returns:
        ``{"status": "skipped", "reason": ...}`` when there are no
        platforms or no product is found, otherwise a dict with
        ``status``, ``post_id``, ``product_id`` and ``quality_score``.

    Raises:
        Exception: Any error during generation or persistence is logged,
            the session is rolled back, and the error is re-raised.
    """
    if not platforms:
        # Nothing to generate for; previously this surfaced later as an
        # IndexError on ``platforms[0]``.
        logger.warning("Producto omitido: lista de plataformas vacía")
        return {"status": "skipped", "reason": "no_platforms"}

    db = SessionLocal()
    try:
        if product_id:
            product = db.query(Product).filter(Product.id == product_id).first()
        else:
            # Not random: the least recently posted available product wins,
            # with never-posted products (NULL) first.
            product = db.query(Product).filter(
                Product.is_available == True  # noqa: E712 - SQL expression
            ).order_by(
                Product.last_posted_at.asc().nullsfirst()
            ).first()
        if not product:
            return {"status": "skipped", "reason": "no_products_available"}

        content_by_platform = {}
        quality_info = {}
        for platform in platforms:
            if use_quality_check:
                result = run_async(
                    content_generator.generate_with_quality_check(
                        template_name="product_post",
                        variables={
                            "product_name": product.name,
                            "product_description": product.description or "",
                            "price": product.price,
                            "category": product.category,
                            "specs": product.specs or {},
                            "highlights": product.highlights or []
                        },
                        platform=platform,
                        db=db,
                        max_attempts=2
                    )
                )
                content_by_platform[platform] = result["content"]
                quality_info[platform] = {
                    "score": result.get("quality_score"),
                    "breakdown": result.get("score_breakdown"),
                    "attempts": result.get("attempts", 1)
                }
            else:
                # Plain generation without quality scoring.
                content_by_platform[platform] = run_async(
                    content_generator.generate_product_post(
                        product=product.to_dict(),
                        platform=platform,
                        db=db
                    )
                )

        # Aggregate quality data across platforms.
        best_score = None
        best_breakdown = None
        total_attempts = 1
        if quality_info:
            entries = list(quality_info.values())
            # ``is not None`` so that a legitimate score of 0 is kept.
            scored = [q for q in entries if q.get("score") is not None]
            if scored:
                # Keep score and breakdown from the same platform.
                best = max(scored, key=lambda q: q["score"])
                best_score = best["score"]
                best_breakdown = best.get("breakdown") or None
            else:
                # No scores at all: fall back to the first breakdown found.
                best_breakdown = next(
                    (q["breakdown"] for q in entries if q.get("breakdown")),
                    None
                )
            total_attempts = max(q.get("attempts", 1) for q in entries)

        main_platform = platforms[0]
        post = Post(
            content=content_by_platform.get(main_platform, ""),
            content_type="producto",
            platforms=platforms,
            content_x=content_by_platform.get("x"),
            content_threads=content_by_platform.get("threads"),
            content_instagram=content_by_platform.get("instagram"),
            content_facebook=content_by_platform.get("facebook"),
            status="pending_approval" if requires_approval else "scheduled",
            scheduled_at=datetime.utcnow() + timedelta(minutes=5),
            approval_required=requires_approval,
            product_id=product.id,
            image_url=product.main_image,
            quality_score=best_score,
            score_breakdown=best_breakdown,
            generation_attempts=total_attempts
        )
        db.add(post)

        # Feeds the least-recently-posted ordering used for selection.
        product.last_posted_at = datetime.utcnow()
        db.commit()

        _save_to_memory(
            post_id=post.id,
            content=post.content,
            content_type="producto",
            platform=main_platform,
            quality_score=best_score,
            quality_breakdown=best_breakdown,
            template_used="product_post"
        )
        logger.info(
            f"Producto generado: post_id={post.id}, product={product.name}, "
            f"score={best_score}"
        )
        return {
            "status": "success",
            "post_id": post.id,
            "product_id": product.id,
            "quality_score": best_score
        }
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"Error generando producto: {e}")
        db.rollback()
        raise
    finally:
        db.close()
@celery_app.task(name="worker.tasks.generate_content.generate_service_post")
def generate_service_post(
    platforms: list,
    service_id: int = None,
    requires_approval: bool = True,
    use_quality_check: bool = True
):
    """Generate a service post, optionally with quality scoring.

    Uses the service identified by ``service_id`` when given; otherwise the
    active service with the oldest (or NULL) ``last_posted_at``. Content is
    generated per platform, a ``Post`` is stored, the service's
    ``last_posted_at`` is updated and a content-memory save is enqueued.

    Args:
        platforms: Target platforms; the first one is treated as the main
            platform whose text becomes ``Post.content``.
        service_id: Specific service to post about (optional).
        requires_approval: When true the post is created as
            ``"pending_approval"`` instead of ``"scheduled"``.
        use_quality_check: Use ``generate_with_quality_check`` (with score
            tracking) instead of the plain ``generate_service_post`` call.

    Returns:
        ``{"status": "skipped", "reason": ...}`` when there are no
        platforms or no service is found, otherwise a dict with
        ``status``, ``post_id``, ``service_id`` and ``quality_score``.

    Raises:
        Exception: Any error during generation or persistence is logged,
            the session is rolled back, and the error is re-raised.
    """
    if not platforms:
        # Nothing to generate for; previously this surfaced later as an
        # IndexError on ``platforms[0]``.
        logger.warning("Servicio omitido: lista de plataformas vacía")
        return {"status": "skipped", "reason": "no_platforms"}

    db = SessionLocal()
    try:
        if service_id:
            service = db.query(Service).filter(Service.id == service_id).first()
        else:
            # Least recently posted active service, never-posted (NULL) first.
            service = db.query(Service).filter(
                Service.is_active == True  # noqa: E712 - SQL expression
            ).order_by(
                Service.last_posted_at.asc().nullsfirst()
            ).first()
        if not service:
            return {"status": "skipped", "reason": "no_services_available"}

        content_by_platform = {}
        quality_info = {}
        for platform in platforms:
            if use_quality_check:
                result = run_async(
                    content_generator.generate_with_quality_check(
                        template_name="service_post",
                        variables={
                            "service_name": service.name,
                            "service_description": service.description or "",
                            "category": service.category,
                            "target_sectors": service.target_sectors or [],
                            "benefits": service.benefits or [],
                            "call_to_action": service.call_to_action or "Contáctanos"
                        },
                        platform=platform,
                        db=db,
                        max_attempts=2
                    )
                )
                content_by_platform[platform] = result["content"]
                quality_info[platform] = {
                    "score": result.get("quality_score"),
                    "breakdown": result.get("score_breakdown"),
                    "attempts": result.get("attempts", 1)
                }
            else:
                # Plain generation without quality scoring.
                content_by_platform[platform] = run_async(
                    content_generator.generate_service_post(
                        service=service.to_dict(),
                        platform=platform,
                        db=db
                    )
                )

        # Aggregate quality data across platforms.
        best_score = None
        best_breakdown = None
        total_attempts = 1
        if quality_info:
            entries = list(quality_info.values())
            # ``is not None`` so that a legitimate score of 0 is kept.
            scored = [q for q in entries if q.get("score") is not None]
            if scored:
                # Keep score and breakdown from the same platform.
                best = max(scored, key=lambda q: q["score"])
                best_score = best["score"]
                best_breakdown = best.get("breakdown") or None
            else:
                # No scores at all: fall back to the first breakdown found.
                best_breakdown = next(
                    (q["breakdown"] for q in entries if q.get("breakdown")),
                    None
                )
            total_attempts = max(q.get("attempts", 1) for q in entries)

        main_platform = platforms[0]
        post = Post(
            content=content_by_platform.get(main_platform, ""),
            content_type="servicio",
            platforms=platforms,
            content_x=content_by_platform.get("x"),
            content_threads=content_by_platform.get("threads"),
            content_instagram=content_by_platform.get("instagram"),
            content_facebook=content_by_platform.get("facebook"),
            status="pending_approval" if requires_approval else "scheduled",
            scheduled_at=datetime.utcnow() + timedelta(minutes=5),
            approval_required=requires_approval,
            service_id=service.id,
            image_url=service.main_image,
            quality_score=best_score,
            score_breakdown=best_breakdown,
            generation_attempts=total_attempts
        )
        db.add(post)

        # Feeds the least-recently-posted ordering used for selection.
        service.last_posted_at = datetime.utcnow()
        db.commit()

        _save_to_memory(
            post_id=post.id,
            content=post.content,
            content_type="servicio",
            platform=main_platform,
            quality_score=best_score,
            quality_breakdown=best_breakdown,
            template_used="service_post"
        )
        logger.info(
            f"Servicio generado: post_id={post.id}, service={service.name}, "
            f"score={best_score}"
        )
        return {
            "status": "success",
            "post_id": post.id,
            "service_id": service.id,
            "quality_score": best_score
        }
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"Error generando servicio: {e}")
        db.rollback()
        raise
    finally:
        db.close()
@celery_app.task(name="worker.tasks.generate_content.generate_thread")
def generate_thread(
    topic: str,
    num_posts: int = 5,
    requires_approval: bool = True
):
    """Generate an educational thread and store it as individual posts.

    Each generated text becomes its own ``Post`` targeting platform ``"x"``,
    scheduled one minute after the previous one. After committing, the
    whole thread (texts joined by blank lines) is enqueued for content
    memory under the first post's id.

    Args:
        topic: Subject of the thread.
        num_posts: Number of posts requested from the generator.
        requires_approval: When true posts are created as
            ``"pending_approval"`` instead of ``"scheduled"``.

    Returns:
        ``{"status": "error", "reason": "no_posts_generated"}`` when the
        generator yields nothing, otherwise a dict with ``status``,
        ``post_ids`` and ``topic``.

    Raises:
        Exception: Any error is logged, the session is rolled back, and the
            error is re-raised.
    """
    db = SessionLocal()
    try:
        thread_parts = run_async(
            content_generator.generate_thread(
                topic=topic,
                num_posts=num_posts,
                db=db
            )
        )
        if not thread_parts:
            return {"status": "error", "reason": "no_posts_generated"}

        initial_status = "pending_approval" if requires_approval else "scheduled"

        created_posts = []
        for offset, text in enumerate(thread_parts):
            # Stagger publication: 5 minutes from now plus one per position.
            publish_at = datetime.utcnow() + timedelta(minutes=5 + offset)
            entry = Post(
                content=text,
                content_type="hilo_educativo",
                platforms=["x"],
                content_x=text,
                status=initial_status,
                scheduled_at=publish_at,
                approval_required=requires_approval,
            )
            db.add(entry)
            created_posts.append(entry)
        db.commit()

        # The first post stands in for the whole thread in content memory.
        if created_posts:
            first_post = created_posts[0]
            full_text = "\n\n".join(thread_parts)
            _save_to_memory(
                post_id=first_post.id,
                content=full_text,
                content_type="hilo_educativo",
                platform="x",
                template_used="thread"
            )

        logger.info(f"Hilo generado: {len(created_posts)} posts sobre '{topic}'")
        new_ids = [created.id for created in created_posts]
        return {
            "status": "success",
            "post_ids": new_ids,
            "topic": topic
        }
    except Exception as e:
        logger.error(f"Error generando hilo: {e}")
        db.rollback()
        raise
    finally:
        db.close()