""" Celery tasks for social media automation. """ import asyncio from datetime import datetime, timedelta from typing import List, Optional from celery import shared_task from celery.utils.log import get_task_logger from app.core.database import SessionLocal from app.core.config import settings logger = get_task_logger(__name__) def run_async(coro): """Helper to run async code in sync context.""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete(coro) finally: loop.close() # ============================================================ # PUBLISHING TASKS # ============================================================ @shared_task(bind=True, max_retries=3) def publish_post(self, post_id: int): """ Publish a single post to its designated platforms. Args: post_id: ID of the post to publish """ logger.info(f"Publishing post {post_id}") db = SessionLocal() try: from app.models.post import Post from app.publishers.manager import publisher_manager, Platform post = db.query(Post).filter(Post.id == post_id).first() if not post: logger.error(f"Post {post_id} not found") return {"success": False, "error": "Post not found"} if post.status not in ["scheduled", "pending_approval"]: logger.warning(f"Post {post_id} status is {post.status}, skipping") return {"success": False, "error": f"Invalid status: {post.status}"} # Publish to each platform results = {} for platform_name in post.platforms: try: platform = Platform(platform_name) result = run_async( publisher_manager.publish( platform=platform, content=post.content, image_path=post.image_url ) ) results[platform_name] = { "success": result.success, "post_id": result.post_id, "url": result.url, "error": result.error_message } if result.success: logger.info(f"Published to {platform_name}: {result.url}") else: logger.error(f"Failed {platform_name}: {result.error_message}") except Exception as e: logger.error(f"Error publishing to {platform_name}: {e}") results[platform_name] = {"success": False, "error": str(e)} # Update post status any_success = any(r.get("success") for r in results.values()) if any_success: post.status = "published" post.published_at = datetime.utcnow() post.publish_results = results else: post.status = "failed" post.publish_results = results db.commit() # Send notification send_publish_notification.delay(post_id, results) return {"success": any_success, "results": results} except Exception as e: logger.error(f"Error in publish_post: {e}") db.rollback() # Retry on failure raise self.retry(exc=e, countdown=60) finally: db.close() @shared_task def publish_to_platform(post_id: int, platform: str): """Publish a post to a specific platform.""" logger.info(f"Publishing post {post_id} to {platform}") db = SessionLocal() try: from app.models.post import Post from app.publishers.manager import publisher_manager, Platform post = db.query(Post).filter(Post.id == post_id).first() if not post: return {"success": False, "error": "Post not found"} plat = Platform(platform) result = run_async( publisher_manager.publish( platform=plat, content=post.content, image_path=post.image_url ) ) return { "success": result.success, "post_id": result.post_id, "url": result.url, "error": result.error_message } except Exception as e: logger.error(f"Error publishing to {platform}: {e}") return {"success": False, "error": str(e)} finally: db.close() @shared_task def check_scheduled_posts(): """ Check for posts scheduled to be published now. Runs every minute via Celery Beat. """ logger.info("Checking scheduled posts...") db = SessionLocal() try: from app.models.post import Post now = datetime.utcnow() window_start = now - timedelta(minutes=1) # Find posts scheduled for now posts = db.query(Post).filter( Post.status == "scheduled", Post.scheduled_at <= now, Post.scheduled_at > window_start ).all() logger.info(f"Found {len(posts)} posts to publish") for post in posts: # Queue publish task publish_post.delay(post.id) logger.info(f"Queued post {post.id} for publishing") return {"checked": len(posts)} except Exception as e: logger.error(f"Error checking scheduled posts: {e}") return {"error": str(e)} finally: db.close() # ============================================================ # CONTENT GENERATION TASKS # ============================================================ @shared_task def generate_daily_content(): """ Generate content for the day. Runs at 6 AM via Celery Beat. """ logger.info("Generating daily content...") if not settings.DEEPSEEK_API_KEY: logger.warning("DeepSeek API not configured, skipping content generation") return {"success": False, "error": "API not configured"} db = SessionLocal() try: from app.models.post import Post from app.services.content_generator import content_generator from app.data.content_templates import get_optimal_times today = datetime.utcnow().date() is_weekend = datetime.utcnow().weekday() >= 5 # Check if content already exists for today existing = db.query(Post).filter( Post.scheduled_at >= datetime.combine(today, datetime.min.time()), Post.scheduled_at < datetime.combine(today + timedelta(days=1), datetime.min.time()) ).count() if existing >= 4: logger.info(f"Already have {existing} posts for today, skipping") return {"success": True, "message": "Content already exists"} # Generate tips for each platform platforms = ["x", "threads"] categories = ["productividad", "seguridad", "ia", "general"] generated = 0 for i, platform in enumerate(platforms): category = categories[i % len(categories)] try: content = run_async( content_generator.generate_tip_tech( category=category, platform=platform ) ) # Get optimal time times = get_optimal_times(platform, is_weekend) time_str = times[i % len(times)] hour, minute = map(int, time_str.split(":")) scheduled_at = datetime.combine( today, datetime.min.time() ).replace(hour=hour, minute=minute) # Create post post = Post( content=content, content_type="tip", platforms=[platform], status="scheduled", scheduled_at=scheduled_at, metadata={"auto_generated": True, "category": category} ) db.add(post) generated += 1 except Exception as e: logger.error(f"Error generating for {platform}: {e}") db.commit() logger.info(f"Generated {generated} posts for today") return {"success": True, "generated": generated} except Exception as e: logger.error(f"Error in generate_daily_content: {e}") db.rollback() return {"success": False, "error": str(e)} finally: db.close() @shared_task def generate_content_batch( platforms: List[str], days: int = 7, categories: Optional[List[str]] = None ): """ Generate a batch of content for multiple days. Args: platforms: List of platforms days: Number of days to generate for categories: Tip categories to use """ logger.info(f"Generating batch content for {days} days") if not settings.DEEPSEEK_API_KEY: return {"success": False, "error": "API not configured"} try: from app.services.batch_generator import batch_generator result = run_async( batch_generator._generate_batch( platforms=platforms, start_date=None, days=days, tip_categories=categories ) ) # Save posts to database db = SessionLocal() try: from app.models.post import Post saved = 0 for post_data in result.posts: post = Post( content=post_data.content, content_type=post_data.content_type.value, platforms=[post_data.platform], status="pending_approval", # Require approval for batch scheduled_at=post_data.scheduled_at, metadata=post_data.metadata ) db.add(post) saved += 1 db.commit() return { "success": True, "generated": result.total_generated, "saved": saved, "errors": result.errors } finally: db.close() except Exception as e: logger.error(f"Error in generate_content_batch: {e}") return {"success": False, "error": str(e)} # ============================================================ # INTERACTION TASKS # ============================================================ @shared_task def sync_interactions(): """ Sync interactions from all platforms. Runs every 15 minutes via Celery Beat. """ logger.info("Syncing interactions...") db = SessionLocal() try: from app.models.interaction import Interaction from app.models.post import Post from app.publishers.manager import publisher_manager, Platform synced = 0 # Get recent published posts recent_posts = db.query(Post).filter( Post.status == "published", Post.published_at >= datetime.utcnow() - timedelta(days=7) ).all() for post in recent_posts: for platform_name in post.platforms: try: platform = Platform(platform_name) publisher = publisher_manager.get_publisher(platform) if not publisher: continue # Get post ID from results results = post.publish_results or {} platform_result = results.get(platform_name, {}) platform_post_id = platform_result.get("post_id") if not platform_post_id: continue # Fetch comments comments = run_async( publisher.get_comments(platform_post_id) ) for comment in comments: # Check if already exists existing = db.query(Interaction).filter( Interaction.platform == platform_name, Interaction.platform_interaction_id == str(comment.get("id")) ).first() if not existing: interaction = Interaction( post_id=post.id, platform=platform_name, platform_interaction_id=str(comment.get("id")), interaction_type="comment", author_id=str(comment.get("author_id", comment.get("from", {}).get("id", ""))), author_username=comment.get("username", comment.get("from", {}).get("name", "")), content=comment.get("text", comment.get("message", "")), interaction_at=datetime.fromisoformat( comment.get("created_at", comment.get("timestamp", datetime.utcnow().isoformat())) ) if comment.get("created_at") or comment.get("timestamp") else datetime.utcnow() ) db.add(interaction) synced += 1 except Exception as e: logger.error(f"Error syncing {platform_name} for post {post.id}: {e}") db.commit() logger.info(f"Synced {synced} new interactions") return {"success": True, "synced": synced} except Exception as e: logger.error(f"Error in sync_interactions: {e}") db.rollback() return {"success": False, "error": str(e)} finally: db.close() # ============================================================ # NOTIFICATION TASKS # ============================================================ @shared_task def send_publish_notification(post_id: int, results: dict): """Send notification about publish result.""" if not settings.TELEGRAM_BOT_TOKEN or not settings.TELEGRAM_CHAT_ID: return {"success": False, "error": "Telegram not configured"} try: from app.services.notifications import telegram_notify db = SessionLocal() try: from app.models.post import Post post = db.query(Post).filter(Post.id == post_id).first() if not post: return {"success": False, "error": "Post not found"} # Build message success_count = sum(1 for r in results.values() if r.get("success")) total = len(results) if success_count == total: emoji = "✅" status = "Publicado" elif success_count > 0: emoji = "⚠️" status = "Parcialmente publicado" else: emoji = "❌" status = "Falló" message = f"{emoji} *{status}*\n\n" message += f"📝 {post.content[:100]}...\n\n" for platform, result in results.items(): icon = "✓" if result.get("success") else "✗" message += f"{icon} {platform}" if result.get("url"): message += f": {result['url']}" elif result.get("error"): message += f": {result['error'][:50]}" message += "\n" run_async(telegram_notify(message)) return {"success": True} finally: db.close() except Exception as e: logger.error(f"Error sending notification: {e}") return {"success": False, "error": str(e)} @shared_task def send_daily_summary(): """ Send daily summary of activity. Runs at 9 PM via Celery Beat. """ if not settings.TELEGRAM_BOT_TOKEN or not settings.TELEGRAM_CHAT_ID: return {"success": False, "error": "Telegram not configured"} db = SessionLocal() try: from app.models.post import Post from app.models.interaction import Interaction from app.services.notifications import telegram_notify today = datetime.utcnow().date() today_start = datetime.combine(today, datetime.min.time()) # Stats published = db.query(Post).filter( Post.status == "published", Post.published_at >= today_start ).count() failed = db.query(Post).filter( Post.status == "failed", Post.published_at >= today_start ).count() scheduled = db.query(Post).filter( Post.status == "scheduled", Post.scheduled_at >= datetime.utcnow() ).count() pending = db.query(Post).filter( Post.status == "pending_approval" ).count() new_interactions = db.query(Interaction).filter( Interaction.interaction_at >= today_start, Interaction.responded == False ).count() # Build message message = "📊 *Resumen del día*\n\n" message += f"✅ Publicados: {published}\n" if failed > 0: message += f"❌ Fallidos: {failed}\n" message += f"📅 Programados: {scheduled}\n" message += f"⏳ Pendientes aprobación: {pending}\n" message += f"💬 Nuevas interacciones: {new_interactions}\n" if new_interactions > 0: message += f"\n⚠️ Hay {new_interactions} interacciones sin responder" run_async(telegram_notify(message)) return {"success": True} except Exception as e: logger.error(f"Error sending daily summary: {e}") return {"success": False, "error": str(e)} finally: db.close() # ============================================================ # THREAD SERIES TASKS # ============================================================ @shared_task def check_thread_schedules(): """ Check for thread posts that need to be published. Runs every minute via Celery Beat. """ logger.info("Checking thread schedules...") db = SessionLocal() try: from app.models.thread_series import ThreadSeries, ThreadPost now = datetime.utcnow() window_start = now - timedelta(minutes=1) # Find thread posts scheduled for now scheduled_posts = db.query(ThreadPost).filter( ThreadPost.status == "scheduled", ThreadPost.scheduled_at <= now, ThreadPost.scheduled_at > window_start ).all() published = 0 for thread_post in scheduled_posts: try: # Queue the publish task for the actual post if thread_post.post_id: publish_post.delay(thread_post.post_id) published += 1 logger.info(f"Queued thread post {thread_post.id} (post {thread_post.post_id})") except Exception as e: logger.error(f"Error publishing thread post {thread_post.id}: {e}") thread_post.status = "failed" thread_post.error_message = str(e) db.commit() logger.info(f"Queued {published} thread posts for publishing") return {"success": True, "published": published} except Exception as e: logger.error(f"Error in check_thread_schedules: {e}") return {"success": False, "error": str(e)} finally: db.close() @shared_task def update_thread_post_status(post_id: int, platform_post_id: str = None): """ Update thread post status after publishing. Args: post_id: The Post ID that was published platform_post_id: The platform-specific post ID (for reply chains) """ db = SessionLocal() try: from app.models.thread_series import ThreadSeries, ThreadPost from app.models.post import Post # Find the thread post associated with this post thread_post = db.query(ThreadPost).filter( ThreadPost.post_id == post_id ).first() if not thread_post: return {"success": True, "message": "Not a thread post"} post = db.query(Post).filter(Post.id == post_id).first() if post and post.status == "published": thread_post.status = "published" thread_post.published_at = datetime.utcnow() if platform_post_id: thread_post.platform_post_id = platform_post_id # Update series progress series = db.query(ThreadSeries).filter( ThreadSeries.id == thread_post.series_id ).first() if series: series.posts_published = db.query(ThreadPost).filter( ThreadPost.series_id == series.id, ThreadPost.status == "published" ).count() # Check if series is complete if series.posts_published >= series.total_posts: series.status = "completed" series.completed_at = datetime.utcnow() # Store first post ID for reply chain if thread_post.sequence_number == 1 and platform_post_id: series.first_platform_post_id = platform_post_id elif post and post.status == "failed": thread_post.status = "failed" thread_post.error_message = post.error_message db.commit() return {"success": True} except Exception as e: logger.error(f"Error updating thread post status: {e}") return {"success": False, "error": str(e)} finally: db.close() # ============================================================ # A/B TESTING & RECYCLING TASKS # ============================================================ @shared_task def evaluate_ab_tests(): """ Evaluate running A/B tests and update metrics. Runs every hour via Celery Beat. """ logger.info("Evaluating A/B tests...") db = SessionLocal() try: from app.models.ab_test import ABTest from app.services.ab_testing_service import ab_testing_service # Get running tests running_tests = db.query(ABTest).filter( ABTest.status == "running" ).all() evaluated = 0 for test in running_tests: try: # Check if test duration has elapsed if test.started_at: elapsed_hours = (datetime.utcnow() - test.started_at).total_seconds() / 3600 if elapsed_hours >= test.duration_hours: # Evaluate and complete the test result = run_async(ab_testing_service.evaluate_test(test.id)) logger.info(f"Evaluated A/B test {test.id}: {result}") evaluated += 1 else: # Just update metrics run_async(ab_testing_service.update_variant_metrics(test.id)) except Exception as e: logger.error(f"Error evaluating test {test.id}: {e}") logger.info(f"Evaluated {evaluated} A/B tests") return {"success": True, "evaluated": evaluated} except Exception as e: logger.error(f"Error in evaluate_ab_tests: {e}") return {"success": False, "error": str(e)} finally: db.close() @shared_task def auto_recycle_content(): """ Automatically recycle high-performing content. Runs daily at 2 AM via Celery Beat. """ logger.info("Auto-recycling content...") try: from app.services.recycling_service import recycling_service # Recycle 1 post per platform with high engagement platforms = ["x", "threads"] total_recycled = 0 for platform in platforms: result = run_async(recycling_service.auto_recycle( platform=platform, count=1, min_engagement_rate=3.0 # Only recycle really good posts )) if result.get("success"): total_recycled += result.get("recycled", 0) logger.info(f"Auto-recycled {result.get('recycled', 0)} posts for {platform}") return {"success": True, "recycled": total_recycled} except Exception as e: logger.error(f"Error in auto_recycle_content: {e}") return {"success": False, "error": str(e)} # ============================================================ # ODOO SYNC TASKS # ============================================================ @shared_task def sync_products_from_odoo(): """ Sync products from Odoo ERP. Runs daily at 6 AM via Celery Beat. """ logger.info("Syncing products from Odoo...") if not settings.ODOO_SYNC_ENABLED: logger.info("Odoo sync disabled, skipping") return {"success": False, "error": "Odoo sync disabled"} try: from app.services.odoo_service import odoo_service result = run_async(odoo_service.sync_products(limit=200)) if result.get("success"): logger.info(f"Synced {result.get('processed', 0)} products from Odoo") else: logger.error(f"Odoo product sync failed: {result.get('error')}") return result except Exception as e: logger.error(f"Error in sync_products_from_odoo: {e}") return {"success": False, "error": str(e)} @shared_task def sync_services_from_odoo(): """ Sync services from Odoo ERP. Runs daily at 6 AM via Celery Beat. """ logger.info("Syncing services from Odoo...") if not settings.ODOO_SYNC_ENABLED: logger.info("Odoo sync disabled, skipping") return {"success": False, "error": "Odoo sync disabled"} try: from app.services.odoo_service import odoo_service result = run_async(odoo_service.sync_services(limit=100)) if result.get("success"): logger.info(f"Synced {result.get('processed', 0)} services from Odoo") else: logger.error(f"Odoo service sync failed: {result.get('error')}") return result except Exception as e: logger.error(f"Error in sync_services_from_odoo: {e}") return {"success": False, "error": str(e)} @shared_task def export_leads_to_odoo(): """ Export unsynced leads to Odoo CRM. Runs every hour via Celery Beat. """ logger.info("Exporting leads to Odoo...") if not settings.ODOO_SYNC_ENABLED: logger.info("Odoo sync disabled, skipping") return {"success": False, "error": "Odoo sync disabled"} try: from app.services.odoo_service import odoo_service result = run_async(odoo_service.export_leads_to_odoo()) if result.get("success"): logger.info(f"Exported {result.get('created', 0)} leads to Odoo") else: logger.error(f"Odoo lead export failed: {result.get('error')}") return result except Exception as e: logger.error(f"Error in export_leads_to_odoo: {e}") return {"success": False, "error": str(e)} # ============================================================ # ANALYTICS TASKS # ============================================================ @shared_task def fetch_post_metrics(): """ Fetch and record metrics for recent posts. Runs every 15 minutes via Celery Beat. """ logger.info("Fetching post metrics...") db = SessionLocal() try: from app.models.post import Post from app.services.analytics_service import analytics_service from app.publishers.manager import publisher_manager, Platform # Get posts published in the last 7 days recent_posts = db.query(Post).filter( Post.status == "published", Post.published_at >= datetime.utcnow() - timedelta(days=7) ).all() updated = 0 for post in recent_posts: for platform_name in post.platforms: try: platform = Platform(platform_name) publisher = publisher_manager.get_publisher(platform) if not publisher: continue # Get platform post ID platform_ids = post.platform_post_ids or {} platform_post_id = platform_ids.get(platform_name) if not platform_post_id: continue # Fetch metrics from platform API metrics = run_async(publisher.get_post_metrics(platform_post_id)) if metrics: # Record metrics snapshot run_async(analytics_service.record_post_metrics( post_id=post.id, platform=platform_name, metrics=metrics )) # Update post.metrics with latest if not post.metrics: post.metrics = {} post.metrics.update({ "likes": metrics.get("likes", post.metrics.get("likes", 0)), "comments": metrics.get("comments", post.metrics.get("comments", 0)), "shares": metrics.get("shares", 0) + metrics.get("retweets", 0), "impressions": metrics.get("impressions", post.metrics.get("impressions", 0)), "reach": metrics.get("reach", post.metrics.get("reach", 0)) }) updated += 1 except Exception as e: logger.error(f"Error fetching metrics for post {post.id} on {platform_name}: {e}") db.commit() logger.info(f"Updated metrics for {updated} post-platform combinations") return {"success": True, "updated": updated} except Exception as e: logger.error(f"Error in fetch_post_metrics: {e}") db.rollback() return {"success": False, "error": str(e)} finally: db.close() @shared_task def generate_weekly_analytics_report(): """ Generate and optionally send weekly analytics report. Runs every Sunday at 9 AM via Celery Beat. """ logger.info("Generating weekly analytics report...") try: from app.services.analytics_service import analytics_service from app.services.notifications import notification_service # Generate report report = run_async(analytics_service.generate_weekly_report()) logger.info(f"Generated report {report.id} for {report.period_start} - {report.period_end}") # Send via Telegram if configured if settings.TELEGRAM_BOT_TOKEN and settings.TELEGRAM_CHAT_ID: if report.summary_text: success = run_async(notification_service.notify_daily_summary({ "custom_message": report.summary_text })) if success: logger.info("Weekly report sent to Telegram") else: logger.warning("Failed to send report to Telegram") return { "success": True, "report_id": report.id, "total_posts": report.total_posts, "total_engagements": report.total_engagements } except Exception as e: logger.error(f"Error generating weekly report: {e}") return {"success": False, "error": str(e)} @shared_task def recalculate_optimal_times(): """ Recalculate optimal posting times based on historical data. Runs weekly via Celery Beat. """ logger.info("Recalculating optimal posting times...") try: from app.services.analytics_service import analytics_service # Calculate for each platform platforms = ["x", "threads", "instagram", "facebook", None] results = {} for platform in platforms: times = run_async(analytics_service.get_optimal_times( platform=platform, days=90 )) platform_key = platform or "all" results[platform_key] = len(times) logger.info(f"Calculated {len(times)} optimal time slots for {platform_key}") return {"success": True, "results": results} except Exception as e: logger.error(f"Error recalculating optimal times: {e}") return {"success": False, "error": str(e)} # ============================================================ # MAINTENANCE TASKS # ============================================================ @shared_task def cleanup_old_data(): """ Clean up old data to prevent database bloat. Runs weekly via Celery Beat. """ logger.info("Running cleanup...") db = SessionLocal() try: from app.models.post import Post from app.models.interaction import Interaction cutoff = datetime.utcnow() - timedelta(days=90) # Archive old published posts old_posts = db.query(Post).filter( Post.status == "published", Post.published_at < cutoff ).count() # Delete old archived interactions deleted = db.query(Interaction).filter( Interaction.is_archived == True, Interaction.interaction_at < cutoff ).delete() db.commit() # Clean up old images from app.services.image_upload import image_upload images_deleted = image_upload.cleanup_old(days=30) logger.info(f"Cleanup: {deleted} interactions, {images_deleted} images") return { "success": True, "interactions_deleted": deleted, "images_deleted": images_deleted } except Exception as e: logger.error(f"Error in cleanup: {e}") db.rollback() return {"success": False, "error": str(e)} finally: db.close()