- Estructura completa del proyecto con FastAPI - Modelos de base de datos (productos, servicios, posts, calendario, interacciones) - Publishers para X, Threads, Instagram, Facebook - Generador de contenido con DeepSeek API - Worker de Celery con tareas programadas - Dashboard básico con templates HTML - Docker Compose para despliegue - Documentación completa Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
267 lines
8.2 KiB
Python
267 lines
8.2 KiB
Python
"""
|
|
Content generation tasks.
|
|
"""
|
|
|
|
import asyncio
|
|
from datetime import datetime, timedelta
|
|
|
|
from worker.celery_app import celery_app
|
|
from app.core.database import SessionLocal
|
|
from app.models.post import Post
|
|
from app.models.content_calendar import ContentCalendar
|
|
from app.models.tip_template import TipTemplate
|
|
from app.models.product import Product
|
|
from app.models.service import Service
|
|
from app.services.content_generator import content_generator
|
|
|
|
|
|
def run_async(coro):
    """Run a coroutine to completion from synchronous (Celery task) code.

    Reuses the calling thread's event loop when one is available and still
    open. Otherwise a fresh loop is created and installed for the thread:
    ``asyncio.get_event_loop()`` raises ``RuntimeError`` when the thread has
    no loop set, and a loop that has been closed cannot run anything.

    Args:
        coro: The coroutine (or any awaitable accepted by
            ``loop.run_until_complete``) to execute.

    Returns:
        Whatever the coroutine returns.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No event loop has been set for this thread.
        loop = None

    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        # Install it so later calls on this thread reuse the same loop.
        asyncio.set_event_loop(loop)

    return loop.run_until_complete(coro)
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.generate_content.generate_scheduled_content")
def generate_scheduled_content():
    """Queue content-generation tasks for the calendar entries due now.

    Loads the active ``ContentCalendar`` entries for the current weekday,
    keeps those whose ``time.hour`` equals the current hour, and queues the
    generation task that matches each entry's ``content_type``
    (``"tip_tech"``, ``"producto"`` or ``"servicio"``). Entries with any
    other content type are ignored.

    Intended to be triggered once per hour (the schedule itself is
    configured outside this function).

    Returns:
        str: Summary message with the number of entries for which a
        generation task was actually queued.
    """
    db = SessionLocal()

    try:
        # NOTE(review): this uses local server time, while the generation
        # tasks in this module stamp posts with utcnow() — confirm which
        # timezone the calendar's day/time values are expressed in.
        now = datetime.now()
        current_day = now.weekday()
        current_hour = now.hour

        # Active calendar entries for today; the hour is checked per entry.
        entries = db.query(ContentCalendar).filter(
            ContentCalendar.day_of_week == current_day,
            ContentCalendar.is_active == True
        ).all()

        dispatched = 0
        for entry in entries:
            # Only entries scheduled for the current hour are due.
            if entry.time.hour != current_hour:
                continue

            if entry.content_type == "tip_tech":
                generate_tip_post.delay(
                    platforms=entry.platforms,
                    category_filter=entry.category_filter,
                    requires_approval=entry.requires_approval
                )
            elif entry.content_type == "producto":
                generate_product_post.delay(
                    platforms=entry.platforms,
                    requires_approval=entry.requires_approval
                )
            elif entry.content_type == "servicio":
                generate_service_post.delay(
                    platforms=entry.platforms,
                    requires_approval=entry.requires_approval
                )
            else:
                # Unknown content type: nothing was queued, so do not count it.
                continue

            dispatched += 1

        # Report what was really dispatched, not every entry of the day.
        return f"Procesadas {dispatched} entradas del calendario"

    finally:
        db.close()
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.generate_content.generate_tip_post")
def generate_tip_post(
    platforms: list,
    category_filter: str = None,
    requires_approval: bool = False
):
    """Generate a "tip tech" post from the least-used active tip template.

    Picks the active ``TipTemplate`` with the lowest ``used_count`` (ties
    broken by oldest ``last_used``, never-used first), generates one text
    per platform through ``content_generator.generate_tip_tech``, stores a
    ``Post`` scheduled five minutes from now (UTC) and updates the
    template's usage counters.

    Args:
        platforms: Platform identifiers to generate content for. The text
            generated for the first platform becomes the post's main
            ``content``.
        category_filter: When given, only templates of this category are
            considered.
        requires_approval: When true the post is stored as
            ``"pending_approval"`` instead of ``"scheduled"``.

    Returns:
        str: A status message: the created post id, or the reason nothing
        was generated (no platforms given / no tip available).
    """
    # Without a target platform there is nothing to generate, and indexing
    # platforms[0] below would fail after a tip had already been selected.
    if not platforms:
        return "No hay plataformas especificadas"

    db = SessionLocal()

    try:
        query = db.query(TipTemplate).filter(
            TipTemplate.is_active == True
        )

        if category_filter:
            query = query.filter(TipTemplate.category == category_filter)

        # Least used first; among equals, the one used longest ago.
        tip = query.order_by(
            TipTemplate.used_count.asc(),
            TipTemplate.last_used.asc().nullsfirst()
        ).first()

        if not tip:
            return "No hay tips disponibles"

        # One generated text per requested platform.
        content_by_platform = {
            platform: run_async(
                content_generator.generate_tip_tech(
                    category=tip.category,
                    platform=platform,
                    template=tip.template
                )
            )
            for platform in platforms
        }

        post = Post(
            content=content_by_platform.get(platforms[0], ""),
            content_type="tip_tech",
            platforms=platforms,
            content_x=content_by_platform.get("x"),
            content_threads=content_by_platform.get("threads"),
            content_instagram=content_by_platform.get("instagram"),
            content_facebook=content_by_platform.get("facebook"),
            status="pending_approval" if requires_approval else "scheduled",
            scheduled_at=datetime.utcnow() + timedelta(minutes=5),
            approval_required=requires_approval,
            tip_template_id=tip.id
        )

        db.add(post)

        # Record the usage so the next run prefers a different template.
        tip.used_count += 1
        tip.last_used = datetime.utcnow()

        db.commit()

        return f"Post de tip generado: {post.id}"

    finally:
        db.close()
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.generate_content.generate_product_post")
def generate_product_post(
    platforms: list,
    product_id: int = None,
    requires_approval: bool = True
):
    """Generate a promotional post for a product.

    Uses the product with ``product_id`` when given; otherwise picks the
    available product that was posted longest ago (never-posted first).
    Generates one text per platform through
    ``content_generator.generate_product_post``, stores a ``Post``
    scheduled five minutes from now (UTC) and updates the product's
    ``last_posted_at``.

    Args:
        platforms: Platform identifiers to generate content for. The text
            generated for the first platform becomes the post's main
            ``content``.
        product_id: Specific product to promote; when omitted one is
            selected automatically.
        requires_approval: When true the post is stored as
            ``"pending_approval"`` instead of ``"scheduled"``.

    Returns:
        str: A status message: the created post id, or the reason nothing
        was generated (no platforms given / no product found).
    """
    # Without a target platform there is nothing to generate, and indexing
    # platforms[0] below would fail after a product had been selected.
    if not platforms:
        return "No hay plataformas especificadas"

    db = SessionLocal()

    try:
        if product_id:
            product = db.query(Product).filter(Product.id == product_id).first()
        else:
            # Not random: the available product posted longest ago wins,
            # with never-posted products first.
            product = db.query(Product).filter(
                Product.is_available == True
            ).order_by(
                Product.last_posted_at.asc().nullsfirst()
            ).first()

        if not product:
            return "No hay productos disponibles"

        # One generated text per requested platform.
        content_by_platform = {
            platform: run_async(
                content_generator.generate_product_post(
                    product=product.to_dict(),
                    platform=platform
                )
            )
            for platform in platforms
        }

        post = Post(
            content=content_by_platform.get(platforms[0], ""),
            content_type="producto",
            platforms=platforms,
            content_x=content_by_platform.get("x"),
            content_threads=content_by_platform.get("threads"),
            content_instagram=content_by_platform.get("instagram"),
            content_facebook=content_by_platform.get("facebook"),
            status="pending_approval" if requires_approval else "scheduled",
            scheduled_at=datetime.utcnow() + timedelta(minutes=5),
            approval_required=requires_approval,
            product_id=product.id,
            image_url=product.main_image
        )

        db.add(post)

        # Record the publication so the next run prefers another product.
        product.last_posted_at = datetime.utcnow()

        db.commit()

        return f"Post de producto generado: {post.id}"

    finally:
        db.close()
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.generate_content.generate_service_post")
def generate_service_post(
    platforms: list,
    service_id: int = None,
    requires_approval: bool = True
):
    """Generate a promotional post for a service.

    Uses the service with ``service_id`` when given; otherwise picks the
    active service that was posted longest ago (never-posted first).
    Generates one text per platform through
    ``content_generator.generate_service_post``, stores a ``Post``
    scheduled five minutes from now (UTC) and updates the service's
    ``last_posted_at``.

    Args:
        platforms: Platform identifiers to generate content for. The text
            generated for the first platform becomes the post's main
            ``content``.
        service_id: Specific service to promote; when omitted one is
            selected automatically.
        requires_approval: When true the post is stored as
            ``"pending_approval"`` instead of ``"scheduled"``.

    Returns:
        str: A status message: the created post id, or the reason nothing
        was generated (no platforms given / no service found).
    """
    # Without a target platform there is nothing to generate, and indexing
    # platforms[0] below would fail after a service had been selected.
    if not platforms:
        return "No hay plataformas especificadas"

    db = SessionLocal()

    try:
        if service_id:
            service = db.query(Service).filter(Service.id == service_id).first()
        else:
            # The active service posted longest ago wins, never-posted first.
            service = db.query(Service).filter(
                Service.is_active == True
            ).order_by(
                Service.last_posted_at.asc().nullsfirst()
            ).first()

        if not service:
            return "No hay servicios disponibles"

        # One generated text per requested platform.
        content_by_platform = {
            platform: run_async(
                content_generator.generate_service_post(
                    service=service.to_dict(),
                    platform=platform
                )
            )
            for platform in platforms
        }

        post = Post(
            content=content_by_platform.get(platforms[0], ""),
            content_type="servicio",
            platforms=platforms,
            content_x=content_by_platform.get("x"),
            content_threads=content_by_platform.get("threads"),
            content_instagram=content_by_platform.get("instagram"),
            content_facebook=content_by_platform.get("facebook"),
            status="pending_approval" if requires_approval else "scheduled",
            scheduled_at=datetime.utcnow() + timedelta(minutes=5),
            approval_required=requires_approval,
            service_id=service.id,
            image_url=service.main_image
        )

        db.add(post)

        # Record the publication so the next run prefers another service.
        service.last_posted_at = datetime.utcnow()

        db.commit()

        return f"Post de servicio generado: {post.id}"

    finally:
        db.close()
|