- Add /api/posts/{id}/publish endpoint for API-based publishing
- Add /api/posts/{id}/mark-published endpoint for manual workflow
- Add content length validation before publishing
- Update modal with "Ya lo publiqué" and "Publicar (API)" buttons
- Fix retry_count handling for None values
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1109 lines
35 KiB
Python
1109 lines
35 KiB
Python
"""
|
|
Celery tasks for social media automation.
|
|
"""
|
|
|
|
import asyncio
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Optional
|
|
|
|
from celery import shared_task
|
|
from celery.utils.log import get_task_logger
|
|
|
|
from app.core.database import SessionLocal
|
|
from app.core.config import settings
|
|
|
|
logger = get_task_logger(__name__)
|
|
|
|
|
|
def run_async(coro):
    """Run *coro* to completion from synchronous code and return its result.

    Celery tasks are synchronous, so async service calls are driven through
    a short-lived event loop per invocation.
    """
    # asyncio.run creates a fresh loop, runs the coroutine, shuts down async
    # generators, and closes the loop. Unlike the manual new_event_loop /
    # run_until_complete / close dance, it never leaves an already-closed
    # loop installed as the thread's current loop via set_event_loop().
    return asyncio.run(coro)
|
|
|
|
|
|
# ============================================================
|
|
# PUBLISHING TASKS
|
|
# ============================================================
|
|
|
|
@shared_task(bind=True, max_retries=3)
def publish_post(self, post_id: int):
    """
    Publish a single post to its designated platforms.

    Loads the post, pre-validates per-platform content length, publishes to
    every platform listed on the post, records the aggregate outcome on the
    post row, and queues a Telegram notification with the results.

    Args:
        post_id: ID of the post to publish

    Returns:
        dict with an overall ``success`` flag and a per-platform ``results``
        map; every platform entry has the same shape
        (``success`` / ``post_id`` / ``url`` / ``error``).
    """
    logger.info(f"Publishing post {post_id}")

    db = SessionLocal()

    try:
        from app.models.post import Post
        from app.publishers.manager import publisher_manager, Platform

        post = db.query(Post).filter(Post.id == post_id).first()

        if not post:
            logger.error(f"Post {post_id} not found")
            return {"success": False, "error": "Post not found"}

        # Only scheduled / pending posts may be published; anything else
        # (already published, failed, draft) is skipped to avoid duplicates.
        if post.status not in ["scheduled", "pending_approval"]:
            logger.warning(f"Post {post_id} status is {post.status}, skipping")
            return {"success": False, "error": f"Invalid status: {post.status}"}

        # Publish to each platform
        results = {}
        for platform_name in post.platforms:
            try:
                platform = Platform(platform_name)

                # Get platform-specific content
                platform_content = post.get_content_for_platform(platform_name)

                # Pre-validate content length with a descriptive error instead
                # of letting the platform API reject the request.
                publisher = publisher_manager.get_publisher(platform)
                if publisher and hasattr(publisher, 'char_limit'):
                    content_length = len(platform_content)
                    if content_length > publisher.char_limit:
                        error_msg = (
                            f"Contenido excede límite: {content_length}/{publisher.char_limit} "
                            f"caracteres (sobra {content_length - publisher.char_limit})"
                        )
                        logger.error(f"Validation failed for {platform_name}: {error_msg}")
                        results[platform_name] = {
                            "success": False,
                            "post_id": None,
                            "url": None,
                            "error": error_msg
                        }
                        continue

                result = run_async(
                    publisher_manager.publish(
                        platform=platform,
                        content=platform_content,
                        image_path=post.image_url
                    )
                )

                results[platform_name] = {
                    "success": result.success,
                    "post_id": result.post_id,
                    "url": result.url,
                    "error": result.error_message
                }

                if result.success:
                    logger.info(f"Published to {platform_name}: {result.url}")
                else:
                    logger.error(f"Failed {platform_name}: {result.error_message}")

            except Exception as e:
                logger.error(f"Error publishing to {platform_name}: {e}")
                # Keep the same dict shape as the other branches so downstream
                # consumers can rely on post_id/url being present.
                results[platform_name] = {
                    "success": False,
                    "post_id": None,
                    "url": None,
                    "error": str(e)
                }

        # Update post status: success on any platform counts as published.
        any_success = any(r.get("success") for r in results.values())

        if any_success:
            post.status = "published"
            post.published_at = datetime.utcnow()
            post.platform_post_ids = {
                k: v.get("post_id") for k, v in results.items() if v.get("success")
            }
        else:
            post.status = "failed"
            # Collect all error messages
            errors = [f"{k}: {v.get('error')}" for k, v in results.items() if v.get("error")]
            post.error_message = "\n".join(errors) if errors else "Unknown error"

        db.commit()

        # Send notification
        send_publish_notification.delay(post_id, results)

        return {"success": any_success, "results": results}

    except Exception as e:
        logger.error(f"Error in publish_post: {e}")
        db.rollback()

        # Retry on failure (up to max_retries, 60s between attempts)
        raise self.retry(exc=e, countdown=60)

    finally:
        db.close()
|
|
|
|
|
|
@shared_task
def publish_to_platform(post_id: int, platform: str):
    """Publish a post to a specific platform."""
    logger.info(f"Publishing post {post_id} to {platform}")

    session = SessionLocal()

    try:
        from app.models.post import Post
        from app.publishers.manager import publisher_manager, Platform

        target = session.query(Post).filter(Post.id == post_id).first()
        if not target:
            return {"success": False, "error": "Post not found"}

        outcome = run_async(
            publisher_manager.publish(
                platform=Platform(platform),
                content=target.content,
                image_path=target.image_url
            )
        )

        return {
            "success": outcome.success,
            "post_id": outcome.post_id,
            "url": outcome.url,
            "error": outcome.error_message
        }

    except Exception as e:
        logger.error(f"Error publishing to {platform}: {e}")
        return {"success": False, "error": str(e)}

    finally:
        session.close()
|
|
|
|
|
|
@shared_task
def check_scheduled_posts():
    """
    Check for posts scheduled to be published now.
    Runs every minute via Celery Beat.
    """
    logger.info("Checking scheduled posts...")

    session = SessionLocal()

    try:
        from app.models.post import Post

        now = datetime.utcnow()

        # One-minute lookback: anything that came due since the last beat tick.
        due_posts = session.query(Post).filter(
            Post.status == "scheduled",
            Post.scheduled_at <= now,
            Post.scheduled_at > now - timedelta(minutes=1)
        ).all()

        logger.info(f"Found {len(due_posts)} posts to publish")

        for due in due_posts:
            publish_post.delay(due.id)
            logger.info(f"Queued post {due.id} for publishing")

        return {"checked": len(due_posts)}

    except Exception as e:
        logger.error(f"Error checking scheduled posts: {e}")
        return {"error": str(e)}

    finally:
        session.close()
|
|
|
|
|
|
# ============================================================
|
|
# CONTENT GENERATION TASKS
|
|
# ============================================================
|
|
|
|
@shared_task
def generate_daily_content():
    """
    Generate content for the day.
    Runs at 6 AM via Celery Beat.
    """
    logger.info("Generating daily content...")

    if not settings.DEEPSEEK_API_KEY:
        logger.warning("DeepSeek API not configured, skipping content generation")
        return {"success": False, "error": "API not configured"}

    session = SessionLocal()

    try:
        from app.models.post import Post
        from app.services.content_generator import content_generator
        from app.data.content_templates import get_optimal_times

        today = datetime.utcnow().date()
        weekend = datetime.utcnow().weekday() >= 5
        day_start = datetime.combine(today, datetime.min.time())
        day_end = datetime.combine(today + timedelta(days=1), datetime.min.time())

        # Skip generation when today's slots are already filled.
        existing = session.query(Post).filter(
            Post.scheduled_at >= day_start,
            Post.scheduled_at < day_end
        ).count()

        if existing >= 4:
            logger.info(f"Already have {existing} posts for today, skipping")
            return {"success": True, "message": "Content already exists"}

        platforms = ["x", "threads"]
        categories = ["productividad", "seguridad", "ia", "general"]
        created = 0

        for idx, platform in enumerate(platforms):
            category = categories[idx % len(categories)]

            try:
                text = run_async(
                    content_generator.generate_tip_tech(
                        category=category,
                        platform=platform
                    )
                )

                # Pick an optimal publishing slot for this platform and day.
                slots = get_optimal_times(platform, weekend)
                hour, minute = map(int, slots[idx % len(slots)].split(":"))
                when = day_start.replace(hour=hour, minute=minute)

                session.add(Post(
                    content=text,
                    content_type="tip",
                    platforms=[platform],
                    status="scheduled",
                    scheduled_at=when,
                    metadata={"auto_generated": True, "category": category}
                ))
                created += 1

            except Exception as e:
                logger.error(f"Error generating for {platform}: {e}")

        session.commit()
        logger.info(f"Generated {created} posts for today")

        return {"success": True, "generated": created}

    except Exception as e:
        logger.error(f"Error in generate_daily_content: {e}")
        session.rollback()
        return {"success": False, "error": str(e)}

    finally:
        session.close()
|
|
|
|
|
|
@shared_task
def generate_content_batch(
    platforms: List[str],
    days: int = 7,
    categories: Optional[List[str]] = None
):
    """
    Generate a batch of content for multiple days.

    Args:
        platforms: List of platforms
        days: Number of days to generate for
        categories: Tip categories to use
    """
    logger.info(f"Generating batch content for {days} days")

    if not settings.DEEPSEEK_API_KEY:
        return {"success": False, "error": "API not configured"}

    try:
        from app.services.batch_generator import batch_generator

        batch = run_async(
            batch_generator._generate_batch(
                platforms=platforms,
                start_date=None,
                days=days,
                tip_categories=categories
            )
        )

        # Persist the generated posts; batch content always goes through
        # the approval queue rather than being scheduled directly.
        session = SessionLocal()
        try:
            from app.models.post import Post

            stored = 0
            for item in batch.posts:
                session.add(Post(
                    content=item.content,
                    content_type=item.content_type.value,
                    platforms=[item.platform],
                    status="pending_approval",  # Require approval for batch
                    scheduled_at=item.scheduled_at,
                    metadata=item.metadata
                ))
                stored += 1

            session.commit()

            return {
                "success": True,
                "generated": batch.total_generated,
                "saved": stored,
                "errors": batch.errors
            }

        finally:
            session.close()

    except Exception as e:
        logger.error(f"Error in generate_content_batch: {e}")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
# ============================================================
|
|
# INTERACTION TASKS
|
|
# ============================================================
|
|
|
|
@shared_task
def sync_interactions():
    """
    Sync interactions from all platforms.
    Runs every 15 minutes via Celery Beat.

    Fetches comments on posts published in the last 7 days and stores any
    that have not been seen before as Interaction rows.
    """
    logger.info("Syncing interactions...")

    db = SessionLocal()

    try:
        from app.models.interaction import Interaction
        from app.models.post import Post
        from app.publishers.manager import publisher_manager, Platform

        synced = 0

        # Get recent published posts
        recent_posts = db.query(Post).filter(
            Post.status == "published",
            Post.published_at >= datetime.utcnow() - timedelta(days=7)
        ).all()

        for post in recent_posts:
            for platform_name in post.platforms:
                try:
                    platform = Platform(platform_name)
                    publisher = publisher_manager.get_publisher(platform)

                    if not publisher:
                        continue

                    # publish_post stores platform ids in post.platform_post_ids
                    # (the mapping fetch_post_metrics also reads); fall back to
                    # the legacy publish_results structure for older rows.
                    platform_post_id = (post.platform_post_ids or {}).get(platform_name)
                    if not platform_post_id:
                        legacy = (post.publish_results or {}).get(platform_name, {})
                        platform_post_id = legacy.get("post_id")

                    if not platform_post_id:
                        continue

                    # Fetch comments
                    comments = run_async(
                        publisher.get_comments(platform_post_id)
                    )

                    for comment in comments:
                        # Skip comments we have already stored
                        existing = db.query(Interaction).filter(
                            Interaction.platform == platform_name,
                            Interaction.platform_interaction_id == str(comment.get("id"))
                        ).first()

                        if not existing:
                            # Platform timestamps may not be strict ISO-8601
                            # (e.g. trailing 'Z'); fall back to "now" rather
                            # than aborting the whole sync.
                            raw_ts = comment.get("created_at") or comment.get("timestamp")
                            if raw_ts:
                                try:
                                    interaction_at = datetime.fromisoformat(raw_ts)
                                except (ValueError, TypeError):
                                    interaction_at = datetime.utcnow()
                            else:
                                interaction_at = datetime.utcnow()

                            interaction = Interaction(
                                post_id=post.id,
                                platform=platform_name,
                                platform_interaction_id=str(comment.get("id")),
                                interaction_type="comment",
                                # X-style payloads use author_id/username/text;
                                # Meta-style payloads nest them under "from".
                                author_id=str(comment.get("author_id", comment.get("from", {}).get("id", ""))),
                                author_username=comment.get("username", comment.get("from", {}).get("name", "")),
                                content=comment.get("text", comment.get("message", "")),
                                interaction_at=interaction_at
                            )
                            db.add(interaction)
                            synced += 1

                except Exception as e:
                    logger.error(f"Error syncing {platform_name} for post {post.id}: {e}")

        db.commit()
        logger.info(f"Synced {synced} new interactions")

        return {"success": True, "synced": synced}

    except Exception as e:
        logger.error(f"Error in sync_interactions: {e}")
        db.rollback()
        return {"success": False, "error": str(e)}

    finally:
        db.close()
|
|
|
|
|
|
# ============================================================
|
|
# NOTIFICATION TASKS
|
|
# ============================================================
|
|
|
|
@shared_task
def send_publish_notification(post_id: int, results: dict):
    """
    Send a Telegram notification summarizing a publish attempt.

    Args:
        post_id: ID of the post that was (attempted to be) published
        results: per-platform result dicts with success/url/error keys,
            as produced by publish_post
    """
    if not settings.TELEGRAM_BOT_TOKEN or not settings.TELEGRAM_CHAT_ID:
        return {"success": False, "error": "Telegram not configured"}

    try:
        from app.services.notifications import telegram_notify

        db = SessionLocal()
        try:
            from app.models.post import Post
            post = db.query(Post).filter(Post.id == post_id).first()

            if not post:
                return {"success": False, "error": "Post not found"}

            # Overall status: full success, partial, or total failure.
            success_count = sum(1 for r in results.values() if r.get("success"))
            total = len(results)

            if success_count == total:
                emoji = "✅"
                status = "Publicado"
            elif success_count > 0:
                emoji = "⚠️"
                status = "Parcialmente publicado"
            else:
                emoji = "❌"
                status = "Falló"

            message = f"{emoji} *{status}*\n\n"
            # Only append the ellipsis when the preview is actually truncated.
            preview = post.content[:100]
            if len(post.content) > 100:
                preview += "..."
            message += f"📝 {preview}\n\n"

            # One line per platform with its link or a truncated error.
            for platform, result in results.items():
                icon = "✓" if result.get("success") else "✗"
                message += f"{icon} {platform}"
                if result.get("url"):
                    message += f": {result['url']}"
                elif result.get("error"):
                    message += f": {result['error'][:50]}"
                message += "\n"

            run_async(telegram_notify(message))

            return {"success": True}

        finally:
            db.close()

    except Exception as e:
        logger.error(f"Error sending notification: {e}")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
@shared_task
def send_daily_summary():
    """
    Send daily summary of activity.
    Runs at 9 PM via Celery Beat.
    """
    if not settings.TELEGRAM_BOT_TOKEN or not settings.TELEGRAM_CHAT_ID:
        return {"success": False, "error": "Telegram not configured"}

    session = SessionLocal()

    try:
        from app.models.post import Post
        from app.models.interaction import Interaction
        from app.services.notifications import telegram_notify

        day_start = datetime.combine(datetime.utcnow().date(), datetime.min.time())

        def count_posts(*criteria):
            # Count posts matching the given filter criteria.
            return session.query(Post).filter(*criteria).count()

        published = count_posts(
            Post.status == "published",
            Post.published_at >= day_start
        )
        # NOTE(review): failed posts may never get published_at set, in which
        # case this count is always 0 -- confirm against the Post model.
        failed = count_posts(
            Post.status == "failed",
            Post.published_at >= day_start
        )
        scheduled = count_posts(
            Post.status == "scheduled",
            Post.scheduled_at >= datetime.utcnow()
        )
        pending = count_posts(Post.status == "pending_approval")

        new_interactions = session.query(Interaction).filter(
            Interaction.interaction_at >= day_start,
            Interaction.responded == False
        ).count()

        # Assemble the Telegram message.
        parts = ["📊 *Resumen del día*\n\n"]
        parts.append(f"✅ Publicados: {published}\n")
        if failed > 0:
            parts.append(f"❌ Fallidos: {failed}\n")
        parts.append(f"📅 Programados: {scheduled}\n")
        parts.append(f"⏳ Pendientes aprobación: {pending}\n")
        parts.append(f"💬 Nuevas interacciones: {new_interactions}\n")
        if new_interactions > 0:
            parts.append(f"\n⚠️ Hay {new_interactions} interacciones sin responder")

        run_async(telegram_notify("".join(parts)))

        return {"success": True}

    except Exception as e:
        logger.error(f"Error sending daily summary: {e}")
        return {"success": False, "error": str(e)}

    finally:
        session.close()
|
|
|
|
|
|
# ============================================================
|
|
# THREAD SERIES TASKS
|
|
# ============================================================
|
|
|
|
@shared_task
def check_thread_schedules():
    """
    Check for thread posts that need to be published.
    Runs every minute via Celery Beat.
    """
    logger.info("Checking thread schedules...")

    session = SessionLocal()

    try:
        from app.models.thread_series import ThreadSeries, ThreadPost

        now = datetime.utcnow()

        # Thread posts that came due within the last beat tick.
        due = session.query(ThreadPost).filter(
            ThreadPost.status == "scheduled",
            ThreadPost.scheduled_at <= now,
            ThreadPost.scheduled_at > now - timedelta(minutes=1)
        ).all()

        queued = 0

        for tp in due:
            try:
                # Delegate to the regular publish pipeline for the linked post.
                if tp.post_id:
                    publish_post.delay(tp.post_id)
                    queued += 1
                    logger.info(f"Queued thread post {tp.id} (post {tp.post_id})")

            except Exception as e:
                logger.error(f"Error publishing thread post {tp.id}: {e}")
                tp.status = "failed"
                tp.error_message = str(e)

        session.commit()

        logger.info(f"Queued {queued} thread posts for publishing")

        return {"success": True, "published": queued}

    except Exception as e:
        logger.error(f"Error in check_thread_schedules: {e}")
        return {"success": False, "error": str(e)}

    finally:
        session.close()
|
|
|
|
|
|
@shared_task
def update_thread_post_status(post_id: int, platform_post_id: str = None):
    """
    Update thread post status after publishing.

    Args:
        post_id: The Post ID that was published
        platform_post_id: The platform-specific post ID (for reply chains)
    """
    session = SessionLocal()

    try:
        from app.models.thread_series import ThreadSeries, ThreadPost
        from app.models.post import Post

        thread_post = session.query(ThreadPost).filter(
            ThreadPost.post_id == post_id
        ).first()

        # Posts outside a thread series need no bookkeeping.
        if not thread_post:
            return {"success": True, "message": "Not a thread post"}

        source = session.query(Post).filter(Post.id == post_id).first()
        source_status = source.status if source else None

        if source_status == "published":
            thread_post.status = "published"
            thread_post.published_at = datetime.utcnow()

            if platform_post_id:
                thread_post.platform_post_id = platform_post_id

            series = session.query(ThreadSeries).filter(
                ThreadSeries.id == thread_post.series_id
            ).first()

            if series:
                # Recount published entries so series progress stays accurate.
                series.posts_published = session.query(ThreadPost).filter(
                    ThreadPost.series_id == series.id,
                    ThreadPost.status == "published"
                ).count()

                if series.posts_published >= series.total_posts:
                    series.status = "completed"
                    series.completed_at = datetime.utcnow()

                # The first post anchors the reply chain for later entries.
                if thread_post.sequence_number == 1 and platform_post_id:
                    series.first_platform_post_id = platform_post_id

        elif source_status == "failed":
            thread_post.status = "failed"
            thread_post.error_message = source.error_message

        session.commit()

        return {"success": True}

    except Exception as e:
        logger.error(f"Error updating thread post status: {e}")
        return {"success": False, "error": str(e)}

    finally:
        session.close()
|
|
|
|
|
|
# ============================================================
|
|
# A/B TESTING & RECYCLING TASKS
|
|
# ============================================================
|
|
|
|
@shared_task
def evaluate_ab_tests():
    """
    Evaluate running A/B tests and update metrics.
    Runs every hour via Celery Beat.
    """
    logger.info("Evaluating A/B tests...")

    session = SessionLocal()

    try:
        from app.models.ab_test import ABTest
        from app.services.ab_testing_service import ab_testing_service

        active = session.query(ABTest).filter(
            ABTest.status == "running"
        ).all()

        completed = 0

        for test in active:
            try:
                # Tests without a start timestamp are left untouched.
                if not test.started_at:
                    continue

                hours_running = (datetime.utcnow() - test.started_at).total_seconds() / 3600

                if hours_running >= test.duration_hours:
                    # Duration elapsed: finalize the test.
                    outcome = run_async(ab_testing_service.evaluate_test(test.id))
                    logger.info(f"Evaluated A/B test {test.id}: {outcome}")
                    completed += 1
                else:
                    # Still running: refresh variant metrics only.
                    run_async(ab_testing_service.update_variant_metrics(test.id))

            except Exception as e:
                logger.error(f"Error evaluating test {test.id}: {e}")

        logger.info(f"Evaluated {completed} A/B tests")

        return {"success": True, "evaluated": completed}

    except Exception as e:
        logger.error(f"Error in evaluate_ab_tests: {e}")
        return {"success": False, "error": str(e)}

    finally:
        session.close()
|
|
|
|
|
|
@shared_task
def auto_recycle_content():
    """
    Automatically recycle high-performing content.
    Runs daily at 2 AM via Celery Beat.
    """
    logger.info("Auto-recycling content...")

    try:
        from app.services.recycling_service import recycling_service

        recycled_total = 0

        # One recycled post per platform, restricted to high-engagement content.
        for platform in ("x", "threads"):
            outcome = run_async(recycling_service.auto_recycle(
                platform=platform,
                count=1,
                min_engagement_rate=3.0  # Only recycle really good posts
            ))

            if outcome.get("success"):
                recycled_total += outcome.get("recycled", 0)
                logger.info(f"Auto-recycled {outcome.get('recycled', 0)} posts for {platform}")

        return {"success": True, "recycled": recycled_total}

    except Exception as e:
        logger.error(f"Error in auto_recycle_content: {e}")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
# ============================================================
|
|
# ODOO SYNC TASKS
|
|
# ============================================================
|
|
|
|
@shared_task
def sync_products_from_odoo():
    """
    Sync products from Odoo ERP.
    Runs daily at 6 AM via Celery Beat.
    """
    logger.info("Syncing products from Odoo...")

    if not settings.ODOO_SYNC_ENABLED:
        logger.info("Odoo sync disabled, skipping")
        return {"success": False, "error": "Odoo sync disabled"}

    try:
        from app.services.odoo_service import odoo_service

        outcome = run_async(odoo_service.sync_products(limit=200))

        if not outcome.get("success"):
            logger.error(f"Odoo product sync failed: {outcome.get('error')}")
        else:
            logger.info(f"Synced {outcome.get('processed', 0)} products from Odoo")

        return outcome

    except Exception as e:
        logger.error(f"Error in sync_products_from_odoo: {e}")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
@shared_task
def sync_services_from_odoo():
    """
    Sync services from Odoo ERP.
    Runs daily at 6 AM via Celery Beat.
    """
    logger.info("Syncing services from Odoo...")

    if not settings.ODOO_SYNC_ENABLED:
        logger.info("Odoo sync disabled, skipping")
        return {"success": False, "error": "Odoo sync disabled"}

    try:
        from app.services.odoo_service import odoo_service

        outcome = run_async(odoo_service.sync_services(limit=100))

        if not outcome.get("success"):
            logger.error(f"Odoo service sync failed: {outcome.get('error')}")
        else:
            logger.info(f"Synced {outcome.get('processed', 0)} services from Odoo")

        return outcome

    except Exception as e:
        logger.error(f"Error in sync_services_from_odoo: {e}")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
@shared_task
def export_leads_to_odoo():
    """
    Export unsynced leads to Odoo CRM.
    Runs every hour via Celery Beat.
    """
    logger.info("Exporting leads to Odoo...")

    if not settings.ODOO_SYNC_ENABLED:
        logger.info("Odoo sync disabled, skipping")
        return {"success": False, "error": "Odoo sync disabled"}

    try:
        from app.services.odoo_service import odoo_service

        outcome = run_async(odoo_service.export_leads_to_odoo())

        if not outcome.get("success"):
            logger.error(f"Odoo lead export failed: {outcome.get('error')}")
        else:
            logger.info(f"Exported {outcome.get('created', 0)} leads to Odoo")

        return outcome

    except Exception as e:
        logger.error(f"Error in export_leads_to_odoo: {e}")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
# ============================================================
|
|
# ANALYTICS TASKS
|
|
# ============================================================
|
|
|
|
@shared_task
def fetch_post_metrics():
    """
    Fetch and record metrics for recent posts.
    Runs every 15 minutes via Celery Beat.

    For each post published in the last 7 days, pulls current metrics from
    the platform API, records a snapshot via the analytics service, and
    refreshes the post's cached ``metrics`` JSON.
    """
    logger.info("Fetching post metrics...")

    db = SessionLocal()

    try:
        from app.models.post import Post
        from app.services.analytics_service import analytics_service
        from app.publishers.manager import publisher_manager, Platform

        # Get posts published in the last 7 days
        recent_posts = db.query(Post).filter(
            Post.status == "published",
            Post.published_at >= datetime.utcnow() - timedelta(days=7)
        ).all()

        updated = 0

        for post in recent_posts:
            for platform_name in post.platforms:
                try:
                    platform = Platform(platform_name)
                    publisher = publisher_manager.get_publisher(platform)

                    if not publisher:
                        continue

                    # Get platform post ID
                    platform_ids = post.platform_post_ids or {}
                    platform_post_id = platform_ids.get(platform_name)

                    if not platform_post_id:
                        continue

                    # Fetch metrics from platform API
                    metrics = run_async(publisher.get_post_metrics(platform_post_id))

                    if metrics:
                        # Record metrics snapshot
                        run_async(analytics_service.record_post_metrics(
                            post_id=post.id,
                            platform=platform_name,
                            metrics=metrics
                        ))

                        current = dict(post.metrics or {})

                        # Like the other fields, keep the previous value when
                        # the platform response has neither shares-style key.
                        if "shares" in metrics or "retweets" in metrics:
                            shares = metrics.get("shares", 0) + metrics.get("retweets", 0)
                        else:
                            shares = current.get("shares", 0)

                        current.update({
                            "likes": metrics.get("likes", current.get("likes", 0)),
                            "comments": metrics.get("comments", current.get("comments", 0)),
                            "shares": shares,
                            "impressions": metrics.get("impressions", current.get("impressions", 0)),
                            "reach": metrics.get("reach", current.get("reach", 0))
                        })

                        # Reassign a new dict rather than mutating in place:
                        # in-place .update() on a plain JSON column is invisible
                        # to SQLAlchemy change tracking unless the model uses
                        # MutableDict -- TODO confirm against the Post model.
                        post.metrics = current

                        updated += 1

                except Exception as e:
                    logger.error(f"Error fetching metrics for post {post.id} on {platform_name}: {e}")

        db.commit()
        logger.info(f"Updated metrics for {updated} post-platform combinations")

        return {"success": True, "updated": updated}

    except Exception as e:
        logger.error(f"Error in fetch_post_metrics: {e}")
        db.rollback()
        return {"success": False, "error": str(e)}

    finally:
        db.close()
|
|
|
|
|
|
@shared_task
def generate_weekly_analytics_report():
    """
    Generate and optionally send weekly analytics report.
    Runs every Sunday at 9 AM via Celery Beat.
    """
    logger.info("Generating weekly analytics report...")

    try:
        from app.services.analytics_service import analytics_service
        from app.services.notifications import notification_service

        report = run_async(analytics_service.generate_weekly_report())
        logger.info(f"Generated report {report.id} for {report.period_start} - {report.period_end}")

        # Push the summary to Telegram when the channel is configured and
        # there is actually a summary to send.
        telegram_ready = bool(settings.TELEGRAM_BOT_TOKEN and settings.TELEGRAM_CHAT_ID)
        if telegram_ready and report.summary_text:
            delivered = run_async(notification_service.notify_daily_summary({
                "custom_message": report.summary_text
            }))

            if delivered:
                logger.info("Weekly report sent to Telegram")
            else:
                logger.warning("Failed to send report to Telegram")

        return {
            "success": True,
            "report_id": report.id,
            "total_posts": report.total_posts,
            "total_engagements": report.total_engagements
        }

    except Exception as e:
        logger.error(f"Error generating weekly report: {e}")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
@shared_task
def recalculate_optimal_times():
    """
    Recalculate optimal posting times based on historical data.
    Runs weekly via Celery Beat.
    """
    logger.info("Recalculating optimal posting times...")

    try:
        from app.services.analytics_service import analytics_service

        results = {}

        # None means "all platforms combined".
        for platform in ("x", "threads", "instagram", "facebook", None):
            slots = run_async(analytics_service.get_optimal_times(
                platform=platform,
                days=90
            ))

            label = platform if platform else "all"
            results[label] = len(slots)

            logger.info(f"Calculated {len(slots)} optimal time slots for {label}")

        return {"success": True, "results": results}

    except Exception as e:
        logger.error(f"Error recalculating optimal times: {e}")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
# ============================================================
|
|
# MAINTENANCE TASKS
|
|
# ============================================================
|
|
|
|
@shared_task
def cleanup_old_data():
    """
    Clean up old data to prevent database bloat.
    Runs weekly via Celery Beat.

    Deletes archived interactions older than 90 days and images older than
    30 days. Old published posts are only counted for now -- archiving them
    is not implemented yet.
    """
    logger.info("Running cleanup...")

    db = SessionLocal()

    try:
        from app.models.post import Post
        from app.models.interaction import Interaction

        cutoff = datetime.utcnow() - timedelta(days=90)

        # Old published posts are not deleted; log the count so the pending
        # archival work stays visible. (Previously this count was computed
        # and silently discarded.)
        old_posts = db.query(Post).filter(
            Post.status == "published",
            Post.published_at < cutoff
        ).count()
        logger.info(f"{old_posts} published posts older than 90 days (archiving not implemented)")

        # Delete old archived interactions
        deleted = db.query(Interaction).filter(
            Interaction.is_archived == True,
            Interaction.interaction_at < cutoff
        ).delete()

        db.commit()

        # Clean up old images
        from app.services.image_upload import image_upload
        images_deleted = image_upload.cleanup_old(days=30)

        logger.info(f"Cleanup: {deleted} interactions, {images_deleted} images")

        return {
            "success": True,
            "interactions_deleted": deleted,
            "images_deleted": images_deleted
        }

    except Exception as e:
        logger.error(f"Error in cleanup: {e}")
        db.rollback()
        return {"success": False, "error": str(e)}

    finally:
        db.close()
|