""" Celery tasks for social media automation. """ import asyncio from datetime import datetime, timedelta from typing import List, Optional from celery import shared_task from celery.utils.log import get_task_logger from app.core.database import SessionLocal from app.core.config import settings logger = get_task_logger(__name__) def run_async(coro): """Helper to run async code in sync context.""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete(coro) finally: loop.close() # ============================================================ # PUBLISHING TASKS # ============================================================ @shared_task(bind=True, max_retries=3) def publish_post(self, post_id: int): """ Publish a single post to its designated platforms. Args: post_id: ID of the post to publish """ logger.info(f"Publishing post {post_id}") db = SessionLocal() try: from app.models.post import Post from app.publishers.manager import publisher_manager, Platform post = db.query(Post).filter(Post.id == post_id).first() if not post: logger.error(f"Post {post_id} not found") return {"success": False, "error": "Post not found"} if post.status not in ["scheduled", "pending_approval"]: logger.warning(f"Post {post_id} status is {post.status}, skipping") return {"success": False, "error": f"Invalid status: {post.status}"} # Publish to each platform results = {} for platform_name in post.platforms: try: platform = Platform(platform_name) result = run_async( publisher_manager.publish( platform=platform, content=post.content, image_path=post.image_url ) ) results[platform_name] = { "success": result.success, "post_id": result.post_id, "url": result.url, "error": result.error_message } if result.success: logger.info(f"Published to {platform_name}: {result.url}") else: logger.error(f"Failed {platform_name}: {result.error_message}") except Exception as e: logger.error(f"Error publishing to {platform_name}: {e}") results[platform_name] = {"success": False, "error": str(e)} # Update post status any_success = any(r.get("success") for r in results.values()) if any_success: post.status = "published" post.published_at = datetime.utcnow() post.publish_results = results else: post.status = "failed" post.publish_results = results db.commit() # Send notification send_publish_notification.delay(post_id, results) return {"success": any_success, "results": results} except Exception as e: logger.error(f"Error in publish_post: {e}") db.rollback() # Retry on failure raise self.retry(exc=e, countdown=60) finally: db.close() @shared_task def publish_to_platform(post_id: int, platform: str): """Publish a post to a specific platform.""" logger.info(f"Publishing post {post_id} to {platform}") db = SessionLocal() try: from app.models.post import Post from app.publishers.manager import publisher_manager, Platform post = db.query(Post).filter(Post.id == post_id).first() if not post: return {"success": False, "error": "Post not found"} plat = Platform(platform) result = run_async( publisher_manager.publish( platform=plat, content=post.content, image_path=post.image_url ) ) return { "success": result.success, "post_id": result.post_id, "url": result.url, "error": result.error_message } except Exception as e: logger.error(f"Error publishing to {platform}: {e}") return {"success": False, "error": str(e)} finally: db.close() @shared_task def check_scheduled_posts(): """ Check for posts scheduled to be published now. Runs every minute via Celery Beat. """ logger.info("Checking scheduled posts...") db = SessionLocal() try: from app.models.post import Post now = datetime.utcnow() window_start = now - timedelta(minutes=1) # Find posts scheduled for now posts = db.query(Post).filter( Post.status == "scheduled", Post.scheduled_at <= now, Post.scheduled_at > window_start ).all() logger.info(f"Found {len(posts)} posts to publish") for post in posts: # Queue publish task publish_post.delay(post.id) logger.info(f"Queued post {post.id} for publishing") return {"checked": len(posts)} except Exception as e: logger.error(f"Error checking scheduled posts: {e}") return {"error": str(e)} finally: db.close() # ============================================================ # CONTENT GENERATION TASKS # ============================================================ @shared_task def generate_daily_content(): """ Generate content for the day. Runs at 6 AM via Celery Beat. """ logger.info("Generating daily content...") if not settings.DEEPSEEK_API_KEY: logger.warning("DeepSeek API not configured, skipping content generation") return {"success": False, "error": "API not configured"} db = SessionLocal() try: from app.models.post import Post from app.services.content_generator import content_generator from app.data.content_templates import get_optimal_times today = datetime.utcnow().date() is_weekend = datetime.utcnow().weekday() >= 5 # Check if content already exists for today existing = db.query(Post).filter( Post.scheduled_at >= datetime.combine(today, datetime.min.time()), Post.scheduled_at < datetime.combine(today + timedelta(days=1), datetime.min.time()) ).count() if existing >= 4: logger.info(f"Already have {existing} posts for today, skipping") return {"success": True, "message": "Content already exists"} # Generate tips for each platform platforms = ["x", "threads"] categories = ["productividad", "seguridad", "ia", "general"] generated = 0 for i, platform in enumerate(platforms): category = categories[i % len(categories)] try: content = run_async( content_generator.generate_tip_tech( category=category, platform=platform ) ) # Get optimal time times = get_optimal_times(platform, is_weekend) time_str = times[i % len(times)] hour, minute = map(int, time_str.split(":")) scheduled_at = datetime.combine( today, datetime.min.time() ).replace(hour=hour, minute=minute) # Create post post = Post( content=content, content_type="tip", platforms=[platform], status="scheduled", scheduled_at=scheduled_at, metadata={"auto_generated": True, "category": category} ) db.add(post) generated += 1 except Exception as e: logger.error(f"Error generating for {platform}: {e}") db.commit() logger.info(f"Generated {generated} posts for today") return {"success": True, "generated": generated} except Exception as e: logger.error(f"Error in generate_daily_content: {e}") db.rollback() return {"success": False, "error": str(e)} finally: db.close() @shared_task def generate_content_batch( platforms: List[str], days: int = 7, categories: Optional[List[str]] = None ): """ Generate a batch of content for multiple days. Args: platforms: List of platforms days: Number of days to generate for categories: Tip categories to use """ logger.info(f"Generating batch content for {days} days") if not settings.DEEPSEEK_API_KEY: return {"success": False, "error": "API not configured"} try: from app.services.batch_generator import batch_generator result = run_async( batch_generator._generate_batch( platforms=platforms, start_date=None, days=days, tip_categories=categories ) ) # Save posts to database db = SessionLocal() try: from app.models.post import Post saved = 0 for post_data in result.posts: post = Post( content=post_data.content, content_type=post_data.content_type.value, platforms=[post_data.platform], status="pending_approval", # Require approval for batch scheduled_at=post_data.scheduled_at, metadata=post_data.metadata ) db.add(post) saved += 1 db.commit() return { "success": True, "generated": result.total_generated, "saved": saved, "errors": result.errors } finally: db.close() except Exception as e: logger.error(f"Error in generate_content_batch: {e}") return {"success": False, "error": str(e)} # ============================================================ # INTERACTION TASKS # ============================================================ @shared_task def sync_interactions(): """ Sync interactions from all platforms. Runs every 15 minutes via Celery Beat. """ logger.info("Syncing interactions...") db = SessionLocal() try: from app.models.interaction import Interaction from app.models.post import Post from app.publishers.manager import publisher_manager, Platform synced = 0 # Get recent published posts recent_posts = db.query(Post).filter( Post.status == "published", Post.published_at >= datetime.utcnow() - timedelta(days=7) ).all() for post in recent_posts: for platform_name in post.platforms: try: platform = Platform(platform_name) publisher = publisher_manager.get_publisher(platform) if not publisher: continue # Get post ID from results results = post.publish_results or {} platform_result = results.get(platform_name, {}) platform_post_id = platform_result.get("post_id") if not platform_post_id: continue # Fetch comments comments = run_async( publisher.get_comments(platform_post_id) ) for comment in comments: # Check if already exists existing = db.query(Interaction).filter( Interaction.platform == platform_name, Interaction.platform_interaction_id == str(comment.get("id")) ).first() if not existing: interaction = Interaction( post_id=post.id, platform=platform_name, platform_interaction_id=str(comment.get("id")), interaction_type="comment", author_id=str(comment.get("author_id", comment.get("from", {}).get("id", ""))), author_username=comment.get("username", comment.get("from", {}).get("name", "")), content=comment.get("text", comment.get("message", "")), interaction_at=datetime.fromisoformat( comment.get("created_at", comment.get("timestamp", datetime.utcnow().isoformat())) ) if comment.get("created_at") or comment.get("timestamp") else datetime.utcnow() ) db.add(interaction) synced += 1 except Exception as e: logger.error(f"Error syncing {platform_name} for post {post.id}: {e}") db.commit() logger.info(f"Synced {synced} new interactions") return {"success": True, "synced": synced} except Exception as e: logger.error(f"Error in sync_interactions: {e}") db.rollback() return {"success": False, "error": str(e)} finally: db.close() # ============================================================ # NOTIFICATION TASKS # ============================================================ @shared_task def send_publish_notification(post_id: int, results: dict): """Send notification about publish result.""" if not settings.TELEGRAM_BOT_TOKEN or not settings.TELEGRAM_CHAT_ID: return {"success": False, "error": "Telegram not configured"} try: from app.services.notifications import telegram_notify db = SessionLocal() try: from app.models.post import Post post = db.query(Post).filter(Post.id == post_id).first() if not post: return {"success": False, "error": "Post not found"} # Build message success_count = sum(1 for r in results.values() if r.get("success")) total = len(results) if success_count == total: emoji = "✅" status = "Publicado" elif success_count > 0: emoji = "⚠️" status = "Parcialmente publicado" else: emoji = "❌" status = "Falló" message = f"{emoji} *{status}*\n\n" message += f"📝 {post.content[:100]}...\n\n" for platform, result in results.items(): icon = "✓" if result.get("success") else "✗" message += f"{icon} {platform}" if result.get("url"): message += f": {result['url']}" elif result.get("error"): message += f": {result['error'][:50]}" message += "\n" run_async(telegram_notify(message)) return {"success": True} finally: db.close() except Exception as e: logger.error(f"Error sending notification: {e}") return {"success": False, "error": str(e)} @shared_task def send_daily_summary(): """ Send daily summary of activity. Runs at 9 PM via Celery Beat. """ if not settings.TELEGRAM_BOT_TOKEN or not settings.TELEGRAM_CHAT_ID: return {"success": False, "error": "Telegram not configured"} db = SessionLocal() try: from app.models.post import Post from app.models.interaction import Interaction from app.services.notifications import telegram_notify today = datetime.utcnow().date() today_start = datetime.combine(today, datetime.min.time()) # Stats published = db.query(Post).filter( Post.status == "published", Post.published_at >= today_start ).count() failed = db.query(Post).filter( Post.status == "failed", Post.published_at >= today_start ).count() scheduled = db.query(Post).filter( Post.status == "scheduled", Post.scheduled_at >= datetime.utcnow() ).count() pending = db.query(Post).filter( Post.status == "pending_approval" ).count() new_interactions = db.query(Interaction).filter( Interaction.interaction_at >= today_start, Interaction.responded == False ).count() # Build message message = "📊 *Resumen del día*\n\n" message += f"✅ Publicados: {published}\n" if failed > 0: message += f"❌ Fallidos: {failed}\n" message += f"📅 Programados: {scheduled}\n" message += f"⏳ Pendientes aprobación: {pending}\n" message += f"💬 Nuevas interacciones: {new_interactions}\n" if new_interactions > 0: message += f"\n⚠️ Hay {new_interactions} interacciones sin responder" run_async(telegram_notify(message)) return {"success": True} except Exception as e: logger.error(f"Error sending daily summary: {e}") return {"success": False, "error": str(e)} finally: db.close() # ============================================================ # MAINTENANCE TASKS # ============================================================ @shared_task def cleanup_old_data(): """ Clean up old data to prevent database bloat. Runs weekly via Celery Beat. """ logger.info("Running cleanup...") db = SessionLocal() try: from app.models.post import Post from app.models.interaction import Interaction cutoff = datetime.utcnow() - timedelta(days=90) # Archive old published posts old_posts = db.query(Post).filter( Post.status == "published", Post.published_at < cutoff ).count() # Delete old archived interactions deleted = db.query(Interaction).filter( Interaction.is_archived == True, Interaction.interaction_at < cutoff ).delete() db.commit() # Clean up old images from app.services.image_upload import image_upload images_deleted = image_upload.cleanup_old(days=30) logger.info(f"Cleanup: {deleted} interactions, {images_deleted} images") return { "success": True, "interactions_deleted": deleted, "images_deleted": images_deleted } except Exception as e: logger.error(f"Error in cleanup: {e}") db.rollback() return {"success": False, "error": str(e)} finally: db.close()