- Add generate_threads_manual_posts() task that creates 2 posts daily - Posts are saved with status 'draft' for manual copy/paste to Threads - Uses tip_tech template with rotating categories (productividad, ia, etc.) - Scheduled to run daily at 7:00 AM (Tijuana timezone) - Posts appear in dashboard for easy access This is a workaround while waiting for Threads API approval. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
701 lines
22 KiB
Python
701 lines
22 KiB
Python
"""
|
|
Tareas de generación de contenido.
|
|
|
|
Usa el nuevo Content Generation Engine v2 con:
|
|
- Quality scoring
|
|
- Anti-repetición via Context Engine
|
|
- Almacenamiento en Content Memory
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Dict, Any
|
|
|
|
from worker.celery_app import celery_app
|
|
from app.core.database import SessionLocal
|
|
from app.models.post import Post
|
|
from app.models.content_calendar import ContentCalendar
|
|
from app.models.tip_template import TipTemplate
|
|
from app.models.product import Product
|
|
from app.models.service import Service
|
|
from app.services.content_generator import content_generator
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def run_async(coro):
|
|
"""Helper para ejecutar coroutines en Celery."""
|
|
loop = asyncio.get_event_loop()
|
|
return loop.run_until_complete(coro)
|
|
|
|
|
|
def _save_to_memory(
|
|
post_id: int,
|
|
content: str,
|
|
content_type: str,
|
|
platform: str,
|
|
quality_score: Optional[int] = None,
|
|
quality_breakdown: Optional[Dict] = None,
|
|
template_used: Optional[str] = None
|
|
):
|
|
"""
|
|
Encolar guardado en memoria como tarea async.
|
|
Evita bloquear la generación principal.
|
|
"""
|
|
from worker.tasks.content_memory import analyze_and_save_content
|
|
analyze_and_save_content.delay(
|
|
post_id=post_id,
|
|
content=content,
|
|
content_type=content_type,
|
|
platform=platform,
|
|
quality_score=quality_score,
|
|
quality_breakdown=quality_breakdown,
|
|
template_used=template_used
|
|
)
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.generate_content.generate_scheduled_content")
|
|
def generate_scheduled_content():
|
|
"""
|
|
Generar contenido según el calendario.
|
|
Se ejecuta cada hora.
|
|
"""
|
|
db = SessionLocal()
|
|
|
|
try:
|
|
now = datetime.now()
|
|
current_day = now.weekday()
|
|
current_hour = now.hour
|
|
|
|
# Obtener entradas del calendario para esta hora
|
|
entries = db.query(ContentCalendar).filter(
|
|
ContentCalendar.day_of_week == current_day,
|
|
ContentCalendar.is_active == True
|
|
).all()
|
|
|
|
generated = 0
|
|
for entry in entries:
|
|
# Verificar si es la hora correcta
|
|
if entry.time.hour != current_hour:
|
|
continue
|
|
|
|
# Generar contenido según el tipo
|
|
if entry.content_type == "tip_tech":
|
|
generate_tip_post.delay(
|
|
platforms=entry.platforms,
|
|
category_filter=entry.category_filter,
|
|
requires_approval=entry.requires_approval
|
|
)
|
|
generated += 1
|
|
|
|
elif entry.content_type == "producto":
|
|
generate_product_post.delay(
|
|
platforms=entry.platforms,
|
|
requires_approval=entry.requires_approval
|
|
)
|
|
generated += 1
|
|
|
|
elif entry.content_type == "servicio":
|
|
generate_service_post.delay(
|
|
platforms=entry.platforms,
|
|
requires_approval=entry.requires_approval
|
|
)
|
|
generated += 1
|
|
|
|
return f"Procesadas {len(entries)} entradas, {generated} generaciones iniciadas"
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.generate_content.generate_tip_post")
|
|
def generate_tip_post(
|
|
platforms: list,
|
|
category_filter: str = None,
|
|
requires_approval: bool = False,
|
|
use_quality_check: bool = True
|
|
):
|
|
"""
|
|
Generar un post de tip tech con quality scoring.
|
|
|
|
Args:
|
|
platforms: Lista de plataformas destino
|
|
category_filter: Categoría específica (opcional)
|
|
requires_approval: Si requiere aprobación manual
|
|
use_quality_check: Si usar validación de calidad
|
|
"""
|
|
db = SessionLocal()
|
|
|
|
try:
|
|
# Seleccionar un tip que no se haya usado recientemente
|
|
query = db.query(TipTemplate).filter(
|
|
TipTemplate.is_active == True
|
|
)
|
|
|
|
if category_filter:
|
|
query = query.filter(TipTemplate.category == category_filter)
|
|
|
|
# Ordenar por uso (menos usado primero)
|
|
tip = query.order_by(
|
|
TipTemplate.used_count.asc(),
|
|
TipTemplate.last_used.asc().nullsfirst()
|
|
).first()
|
|
|
|
if not tip:
|
|
return {"status": "skipped", "reason": "no_tips_available"}
|
|
|
|
# Generar contenido para cada plataforma
|
|
content_by_platform = {}
|
|
quality_info = {}
|
|
|
|
for platform in platforms:
|
|
if use_quality_check:
|
|
# Usar generación con validación de calidad
|
|
result = run_async(
|
|
content_generator.generate_with_quality_check(
|
|
template_name="tip_tech",
|
|
variables={
|
|
"category": tip.category,
|
|
"difficulty_level": "principiante",
|
|
"target_audience": "profesionales tech"
|
|
},
|
|
platform=platform,
|
|
db=db,
|
|
max_attempts=2
|
|
)
|
|
)
|
|
content_by_platform[platform] = result["content"]
|
|
quality_info[platform] = {
|
|
"score": result.get("quality_score"),
|
|
"breakdown": result.get("score_breakdown"),
|
|
"attempts": result.get("attempts", 1)
|
|
}
|
|
else:
|
|
# Generación simple (fallback)
|
|
content = run_async(
|
|
content_generator.generate_tip_tech(
|
|
category=tip.category,
|
|
platform=platform,
|
|
template=tip.template,
|
|
db=db
|
|
)
|
|
)
|
|
content_by_platform[platform] = content
|
|
|
|
# Obtener mejor score entre plataformas
|
|
best_score = None
|
|
best_breakdown = None
|
|
total_attempts = 1
|
|
|
|
if quality_info:
|
|
scores = [q.get("score") for q in quality_info.values() if q.get("score")]
|
|
if scores:
|
|
best_score = max(scores)
|
|
breakdowns = [q.get("breakdown") for q in quality_info.values() if q.get("breakdown")]
|
|
if breakdowns:
|
|
best_breakdown = breakdowns[0]
|
|
attempts = [q.get("attempts", 1) for q in quality_info.values()]
|
|
if attempts:
|
|
total_attempts = max(attempts)
|
|
|
|
# Crear post
|
|
main_platform = platforms[0]
|
|
post = Post(
|
|
content=content_by_platform.get(main_platform, ""),
|
|
content_type="tip_tech",
|
|
platforms=platforms,
|
|
content_x=content_by_platform.get("x"),
|
|
content_threads=content_by_platform.get("threads"),
|
|
content_instagram=content_by_platform.get("instagram"),
|
|
content_facebook=content_by_platform.get("facebook"),
|
|
status="pending_approval" if requires_approval else "scheduled",
|
|
scheduled_at=datetime.utcnow() + timedelta(minutes=5),
|
|
approval_required=requires_approval,
|
|
tip_template_id=tip.id,
|
|
quality_score=best_score,
|
|
score_breakdown=best_breakdown,
|
|
generation_attempts=total_attempts
|
|
)
|
|
|
|
db.add(post)
|
|
|
|
# Actualizar uso del tip
|
|
tip.used_count += 1
|
|
tip.last_used = datetime.utcnow()
|
|
|
|
db.commit()
|
|
|
|
# Guardar en memoria (async)
|
|
_save_to_memory(
|
|
post_id=post.id,
|
|
content=post.content,
|
|
content_type="tip_tech",
|
|
platform=main_platform,
|
|
quality_score=best_score,
|
|
quality_breakdown=best_breakdown,
|
|
template_used="tip_tech"
|
|
)
|
|
|
|
logger.info(
|
|
f"Tip generado: post_id={post.id}, score={best_score}, "
|
|
f"attempts={total_attempts}, category={tip.category}"
|
|
)
|
|
|
|
return {
|
|
"status": "success",
|
|
"post_id": post.id,
|
|
"quality_score": best_score,
|
|
"attempts": total_attempts
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error generando tip: {e}")
|
|
db.rollback()
|
|
raise
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.generate_content.generate_product_post")
|
|
def generate_product_post(
|
|
platforms: list,
|
|
product_id: int = None,
|
|
requires_approval: bool = True,
|
|
use_quality_check: bool = True
|
|
):
|
|
"""
|
|
Generar un post de producto con quality scoring.
|
|
|
|
Args:
|
|
platforms: Lista de plataformas destino
|
|
product_id: ID del producto específico (opcional)
|
|
requires_approval: Si requiere aprobación manual
|
|
use_quality_check: Si usar validación de calidad
|
|
"""
|
|
db = SessionLocal()
|
|
|
|
try:
|
|
# Seleccionar producto
|
|
if product_id:
|
|
product = db.query(Product).filter(Product.id == product_id).first()
|
|
else:
|
|
# Seleccionar uno aleatorio que no se haya publicado recientemente
|
|
product = db.query(Product).filter(
|
|
Product.is_available == True
|
|
).order_by(
|
|
Product.last_posted_at.asc().nullsfirst()
|
|
).first()
|
|
|
|
if not product:
|
|
return {"status": "skipped", "reason": "no_products_available"}
|
|
|
|
# Generar contenido
|
|
content_by_platform = {}
|
|
quality_info = {}
|
|
|
|
for platform in platforms:
|
|
if use_quality_check:
|
|
result = run_async(
|
|
content_generator.generate_with_quality_check(
|
|
template_name="product_post",
|
|
variables={
|
|
"product_name": product.name,
|
|
"product_description": product.description or "",
|
|
"price": product.price,
|
|
"category": product.category,
|
|
"specs": product.specs or {},
|
|
"highlights": product.highlights or []
|
|
},
|
|
platform=platform,
|
|
db=db,
|
|
max_attempts=2
|
|
)
|
|
)
|
|
content_by_platform[platform] = result["content"]
|
|
quality_info[platform] = {
|
|
"score": result.get("quality_score"),
|
|
"breakdown": result.get("score_breakdown"),
|
|
"attempts": result.get("attempts", 1)
|
|
}
|
|
else:
|
|
content = run_async(
|
|
content_generator.generate_product_post(
|
|
product=product.to_dict(),
|
|
platform=platform,
|
|
db=db
|
|
)
|
|
)
|
|
content_by_platform[platform] = content
|
|
|
|
# Obtener mejor score
|
|
best_score = None
|
|
best_breakdown = None
|
|
total_attempts = 1
|
|
|
|
if quality_info:
|
|
scores = [q.get("score") for q in quality_info.values() if q.get("score")]
|
|
if scores:
|
|
best_score = max(scores)
|
|
breakdowns = [q.get("breakdown") for q in quality_info.values() if q.get("breakdown")]
|
|
if breakdowns:
|
|
best_breakdown = breakdowns[0]
|
|
attempts = [q.get("attempts", 1) for q in quality_info.values()]
|
|
if attempts:
|
|
total_attempts = max(attempts)
|
|
|
|
# Crear post
|
|
main_platform = platforms[0]
|
|
post = Post(
|
|
content=content_by_platform.get(main_platform, ""),
|
|
content_type="producto",
|
|
platforms=platforms,
|
|
content_x=content_by_platform.get("x"),
|
|
content_threads=content_by_platform.get("threads"),
|
|
content_instagram=content_by_platform.get("instagram"),
|
|
content_facebook=content_by_platform.get("facebook"),
|
|
status="pending_approval" if requires_approval else "scheduled",
|
|
scheduled_at=datetime.utcnow() + timedelta(minutes=5),
|
|
approval_required=requires_approval,
|
|
product_id=product.id,
|
|
image_url=product.main_image,
|
|
quality_score=best_score,
|
|
score_breakdown=best_breakdown,
|
|
generation_attempts=total_attempts
|
|
)
|
|
|
|
db.add(post)
|
|
|
|
# Actualizar última publicación del producto
|
|
product.last_posted_at = datetime.utcnow()
|
|
|
|
db.commit()
|
|
|
|
# Guardar en memoria
|
|
_save_to_memory(
|
|
post_id=post.id,
|
|
content=post.content,
|
|
content_type="producto",
|
|
platform=main_platform,
|
|
quality_score=best_score,
|
|
quality_breakdown=best_breakdown,
|
|
template_used="product_post"
|
|
)
|
|
|
|
logger.info(
|
|
f"Producto generado: post_id={post.id}, product={product.name}, "
|
|
f"score={best_score}"
|
|
)
|
|
|
|
return {
|
|
"status": "success",
|
|
"post_id": post.id,
|
|
"product_id": product.id,
|
|
"quality_score": best_score
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error generando producto: {e}")
|
|
db.rollback()
|
|
raise
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.generate_content.generate_service_post")
|
|
def generate_service_post(
|
|
platforms: list,
|
|
service_id: int = None,
|
|
requires_approval: bool = True,
|
|
use_quality_check: bool = True
|
|
):
|
|
"""
|
|
Generar un post de servicio con quality scoring.
|
|
|
|
Args:
|
|
platforms: Lista de plataformas destino
|
|
service_id: ID del servicio específico (opcional)
|
|
requires_approval: Si requiere aprobación manual
|
|
use_quality_check: Si usar validación de calidad
|
|
"""
|
|
db = SessionLocal()
|
|
|
|
try:
|
|
# Seleccionar servicio
|
|
if service_id:
|
|
service = db.query(Service).filter(Service.id == service_id).first()
|
|
else:
|
|
service = db.query(Service).filter(
|
|
Service.is_active == True
|
|
).order_by(
|
|
Service.last_posted_at.asc().nullsfirst()
|
|
).first()
|
|
|
|
if not service:
|
|
return {"status": "skipped", "reason": "no_services_available"}
|
|
|
|
# Generar contenido
|
|
content_by_platform = {}
|
|
quality_info = {}
|
|
|
|
for platform in platforms:
|
|
if use_quality_check:
|
|
result = run_async(
|
|
content_generator.generate_with_quality_check(
|
|
template_name="service_post",
|
|
variables={
|
|
"service_name": service.name,
|
|
"service_description": service.description or "",
|
|
"category": service.category,
|
|
"target_sectors": service.target_sectors or [],
|
|
"benefits": service.benefits or [],
|
|
"call_to_action": service.call_to_action or "Contáctanos"
|
|
},
|
|
platform=platform,
|
|
db=db,
|
|
max_attempts=2
|
|
)
|
|
)
|
|
content_by_platform[platform] = result["content"]
|
|
quality_info[platform] = {
|
|
"score": result.get("quality_score"),
|
|
"breakdown": result.get("score_breakdown"),
|
|
"attempts": result.get("attempts", 1)
|
|
}
|
|
else:
|
|
content = run_async(
|
|
content_generator.generate_service_post(
|
|
service=service.to_dict(),
|
|
platform=platform,
|
|
db=db
|
|
)
|
|
)
|
|
content_by_platform[platform] = content
|
|
|
|
# Obtener mejor score
|
|
best_score = None
|
|
best_breakdown = None
|
|
total_attempts = 1
|
|
|
|
if quality_info:
|
|
scores = [q.get("score") for q in quality_info.values() if q.get("score")]
|
|
if scores:
|
|
best_score = max(scores)
|
|
breakdowns = [q.get("breakdown") for q in quality_info.values() if q.get("breakdown")]
|
|
if breakdowns:
|
|
best_breakdown = breakdowns[0]
|
|
attempts = [q.get("attempts", 1) for q in quality_info.values()]
|
|
if attempts:
|
|
total_attempts = max(attempts)
|
|
|
|
# Crear post
|
|
main_platform = platforms[0]
|
|
post = Post(
|
|
content=content_by_platform.get(main_platform, ""),
|
|
content_type="servicio",
|
|
platforms=platforms,
|
|
content_x=content_by_platform.get("x"),
|
|
content_threads=content_by_platform.get("threads"),
|
|
content_instagram=content_by_platform.get("instagram"),
|
|
content_facebook=content_by_platform.get("facebook"),
|
|
status="pending_approval" if requires_approval else "scheduled",
|
|
scheduled_at=datetime.utcnow() + timedelta(minutes=5),
|
|
approval_required=requires_approval,
|
|
service_id=service.id,
|
|
image_url=service.main_image,
|
|
quality_score=best_score,
|
|
score_breakdown=best_breakdown,
|
|
generation_attempts=total_attempts
|
|
)
|
|
|
|
db.add(post)
|
|
|
|
# Actualizar última publicación del servicio
|
|
service.last_posted_at = datetime.utcnow()
|
|
|
|
db.commit()
|
|
|
|
# Guardar en memoria
|
|
_save_to_memory(
|
|
post_id=post.id,
|
|
content=post.content,
|
|
content_type="servicio",
|
|
platform=main_platform,
|
|
quality_score=best_score,
|
|
quality_breakdown=best_breakdown,
|
|
template_used="service_post"
|
|
)
|
|
|
|
logger.info(
|
|
f"Servicio generado: post_id={post.id}, service={service.name}, "
|
|
f"score={best_score}"
|
|
)
|
|
|
|
return {
|
|
"status": "success",
|
|
"post_id": post.id,
|
|
"service_id": service.id,
|
|
"quality_score": best_score
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error generando servicio: {e}")
|
|
db.rollback()
|
|
raise
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.generate_content.generate_threads_manual_posts")
|
|
def generate_threads_manual_posts(count: int = 2):
|
|
"""
|
|
Generar posts para Threads que se publicarán manualmente.
|
|
|
|
Genera tips de tecnología variados optimizados para Threads
|
|
y los guarda con status 'draft' para copiar y publicar manualmente.
|
|
|
|
Args:
|
|
count: Número de posts a generar (default 2)
|
|
"""
|
|
db = SessionLocal()
|
|
|
|
try:
|
|
created = []
|
|
# Categorías para variar el contenido
|
|
categories = ["productividad", "ia", "seguridad", "automatización", "python", "negocios"]
|
|
|
|
for i in range(count):
|
|
# Seleccionar categoría diferente para cada post
|
|
category = categories[i % len(categories)]
|
|
|
|
# Generar contenido con IA usando el template tip_tech
|
|
result = run_async(
|
|
content_generator.generate_with_quality_check(
|
|
template_name="tip_tech",
|
|
variables={
|
|
"category": category,
|
|
"difficulty_level": "principiante",
|
|
"target_audience": "profesionales y emprendedores"
|
|
},
|
|
platform="threads",
|
|
db=db,
|
|
max_attempts=2
|
|
)
|
|
)
|
|
|
|
content = result.get("content", "")
|
|
|
|
if not content:
|
|
logger.warning(f"No se pudo generar contenido para Threads (categoria: {category})")
|
|
continue
|
|
|
|
# Crear post con status draft (para publicación manual)
|
|
post = Post(
|
|
content=content,
|
|
content_threads=content,
|
|
content_type="tip_tech",
|
|
platforms=["threads"],
|
|
status="draft", # Para publicación manual
|
|
approval_required=False,
|
|
quality_score=result.get("quality_score"),
|
|
score_breakdown=result.get("score_breakdown"),
|
|
generation_attempts=result.get("attempts", 1)
|
|
)
|
|
|
|
db.add(post)
|
|
created.append(category)
|
|
|
|
db.commit()
|
|
|
|
logger.info(f"Generados {len(created)} posts para Threads (manual): categorías={created}")
|
|
|
|
return {
|
|
"status": "success",
|
|
"generated": len(created),
|
|
"categories": created
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error generando posts para Threads: {e}")
|
|
db.rollback()
|
|
return {"status": "error", "error": str(e)}
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.generate_content.generate_thread")
|
|
def generate_thread(
|
|
topic: str,
|
|
num_posts: int = 5,
|
|
requires_approval: bool = True
|
|
):
|
|
"""
|
|
Generar un hilo educativo.
|
|
|
|
Args:
|
|
topic: Tema del hilo
|
|
num_posts: Número de posts
|
|
requires_approval: Si requiere aprobación
|
|
"""
|
|
db = SessionLocal()
|
|
|
|
try:
|
|
# Generar hilo
|
|
posts = run_async(
|
|
content_generator.generate_thread(
|
|
topic=topic,
|
|
num_posts=num_posts,
|
|
db=db
|
|
)
|
|
)
|
|
|
|
if not posts:
|
|
return {"status": "error", "reason": "no_posts_generated"}
|
|
|
|
# Crear posts individuales (vinculados)
|
|
created_posts = []
|
|
for i, content in enumerate(posts):
|
|
post = Post(
|
|
content=content,
|
|
content_type="hilo_educativo",
|
|
platforms=["x"], # Hilos principalmente para X
|
|
content_x=content,
|
|
status="pending_approval" if requires_approval else "scheduled",
|
|
scheduled_at=datetime.utcnow() + timedelta(minutes=5 + i), # Escalonado
|
|
approval_required=requires_approval,
|
|
)
|
|
db.add(post)
|
|
created_posts.append(post)
|
|
|
|
db.commit()
|
|
|
|
# Guardar primer post en memoria (representa el hilo)
|
|
if created_posts:
|
|
_save_to_memory(
|
|
post_id=created_posts[0].id,
|
|
content="\n\n".join(posts),
|
|
content_type="hilo_educativo",
|
|
platform="x",
|
|
template_used="thread"
|
|
)
|
|
|
|
logger.info(f"Hilo generado: {len(created_posts)} posts sobre '{topic}'")
|
|
|
|
return {
|
|
"status": "success",
|
|
"post_ids": [p.id for p in created_posts],
|
|
"topic": topic
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error generando hilo: {e}")
|
|
db.rollback()
|
|
raise
|
|
|
|
finally:
|
|
db.close()
|