"""
Thread Series Service - Create and manage multi-post threads.
"""
import json
import logging
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional

from app.core.database import SessionLocal
from app.core.config import settings
from app.models.thread_series import ThreadSeries, ThreadPost
from app.models.post import Post

logger = logging.getLogger(__name__)

# Greedy match from the first "[" to the last "]" so that any prose the model
# puts before or after the JSON array is discarded.
_JSON_ARRAY_RE = re.compile(r"\[[\s\S]*\]")


class ThreadService:
    """Service for managing thread series.

    Every public method opens its own database session via ``_get_db`` and
    closes it in a ``finally`` block before returning.
    """

    def _get_db(self):
        """Get database session."""
        return SessionLocal()

    async def create_series(
        self,
        name: str,
        platform: str,
        posts_content: List[Dict],
        description: Optional[str] = None,
        topic: Optional[str] = None,
        schedule_type: str = "sequential",
        interval_minutes: int = 5,
        start_time: Optional[datetime] = None,
        hashtags: Optional[List[str]] = None,
        ai_generated: bool = False,
        generation_prompt: Optional[str] = None
    ) -> ThreadSeries:
        """
        Create a new thread series with posts.

        The series is stored with status "draft" and one ThreadPost with
        status "pending" is created per entry of ``posts_content``, numbered
        from 1 in list order.

        Args:
            name: Series name
            platform: Target platform (x, threads)
            posts_content: List of post content dicts
                [{"content": "...", "image_url": "..."}]; a missing
                "content" key is stored as an empty string
            description: Series description
            topic: Topic for categorization
            schedule_type: "sequential" or "timed"
            interval_minutes: Minutes between posts
            start_time: When to start publishing
            hashtags: Common hashtags for the series
            ai_generated: Whether content was AI-generated
            generation_prompt: The prompt used for AI generation

        Returns:
            Created ThreadSeries object

        Raises:
            Exception: Any error raised while persisting is logged, the
                transaction is rolled back, and the error is re-raised.
        """
        db = self._get_db()
        try:
            # Create series
            series = ThreadSeries(
                name=name,
                description=description,
                topic=topic,
                platform=platform,
                schedule_type=schedule_type,
                interval_minutes=interval_minutes,
                start_time=start_time,
                total_posts=len(posts_content),
                status="draft",
                hashtags=hashtags,
                ai_generated=ai_generated,
                generation_prompt=generation_prompt
            )
            db.add(series)
            # Flush so series.id is populated before the child rows use it.
            db.flush()

            # Create thread posts
            for sequence_number, content_data in enumerate(posts_content, 1):
                db.add(ThreadPost(
                    series_id=series.id,
                    sequence_number=sequence_number,
                    content=content_data.get("content", ""),
                    image_url=content_data.get("image_url"),
                    status="pending"
                ))

            db.commit()
            db.refresh(series)
            return series

        except Exception as e:
            logger.exception("Error creating thread series: %s", e)
            db.rollback()
            raise
        finally:
            db.close()

    async def generate_thread_with_ai(
        self,
        topic: str,
        platform: str,
        num_posts: int = 5,
        style: str = "educational"
    ) -> Dict:
        """
        Generate a thread series using AI.

        Nothing is persisted; the generated posts are only returned.

        Args:
            topic: The topic to generate content about
            platform: Target platform
            num_posts: Number of posts in the thread
            style: Style of content (educational, storytelling, tips)

        Returns:
            Dict with generated content. On success:
            {"success": True, "topic", "platform", "style", "posts", "count"}.
            On failure: {"success": False, "error": "<message>"}; this method
            does not raise.
        """
        if not settings.DEEPSEEK_API_KEY:
            return {"success": False, "error": "AI API not configured"}

        try:
            # Imported at call time rather than module import time
            # (NOTE(review): presumably to avoid an import cycle - confirm).
            from app.services.content_generator import content_generator

            # Generate thread content
            prompt = f"""Genera un hilo de {num_posts} publicaciones sobre: {topic}

Estilo: {style}
Plataforma: {platform}

Requisitos:
1. Cada publicación debe ser concisa (max 280 caracteres para X, 500 para Threads)
2. La primera publicación debe captar atención
3. Cada publicación debe añadir valor y conectar con la siguiente
4. La última debe tener un llamado a la acción
5. Usa emojis apropiadamente
6. Mantén el tono profesional pero accesible

Formato de respuesta (JSON):
[
    {{"sequence": 1, "content": "..."}},
    {{"sequence": 2, "content": "..."}},
    ...
]
"""

            # Use the content generator to get AI response
            response = await content_generator._call_deepseek(
                prompt,
                temperature=0.7
            )

            # Try to extract JSON from response
            json_match = _JSON_ARRAY_RE.search(response or "")
            if not json_match:
                return {"success": False, "error": "Could not parse AI response"}

            try:
                posts_data = json.loads(json_match.group())
            except json.JSONDecodeError as e:
                # Report malformed JSON the same way as "no JSON found"
                # instead of leaking the raw decoder message to the caller.
                logger.warning("AI response contained invalid JSON: %s", e)
                return {"success": False, "error": "Could not parse AI response"}

            # The regex guarantees a leading "[", so valid JSON here is a
            # list; the check guards against anything unexpected.
            if not isinstance(posts_data, list):
                return {"success": False, "error": "Could not parse AI response"}

            return {
                "success": True,
                "topic": topic,
                "platform": platform,
                "style": style,
                "posts": posts_data,
                "count": len(posts_data)
            }

        except Exception as e:
            logger.exception("Error generating thread with AI: %s", e)
            return {"success": False, "error": str(e)}

    async def schedule_series(
        self,
        series_id: int,
        start_time: Optional[datetime] = None
    ) -> Dict:
        """
        Schedule a thread series for publishing.

        Only series in status "draft" or "paused" can be scheduled. Thread
        posts already marked "published" are skipped, so re-scheduling a
        paused series does not queue them again. The remaining posts are
        spaced ``series.interval_minutes`` apart starting at ``start_time``,
        in sequence order. A thread post that is already linked to an
        existing Post row reuses that row instead of creating a duplicate.

        Args:
            series_id: ID of the series
            start_time: When to start (defaults to now + 5 minutes, UTC)

        Returns:
            Dict with scheduling info. On success:
            {"success": True, "series_id", "start_time" (ISO string),
            "posts_scheduled"}. On failure:
            {"success": False, "error": "<message>"}; errors are logged and
            rolled back rather than raised.
        """
        db = self._get_db()
        try:
            series = db.query(ThreadSeries).filter(ThreadSeries.id == series_id).first()
            if not series:
                return {"success": False, "error": "Series not found"}

            if series.status not in ["draft", "paused"]:
                return {"success": False, "error": f"Series is {series.status}, cannot schedule"}

            thread_posts = db.query(ThreadPost).filter(
                ThreadPost.series_id == series_id
            ).order_by(ThreadPost.sequence_number).all()

            if not thread_posts:
                return {"success": False, "error": "Series has no posts"}

            # A paused series may already have published some of its posts;
            # those must not be queued a second time.
            to_schedule = [tp for tp in thread_posts if tp.status != "published"]
            if not to_schedule:
                return {"success": False, "error": "Series has no unpublished posts to schedule"}

            # Set start time
            start = start_time or (datetime.utcnow() + timedelta(minutes=5))
            series.start_time = start

            # Load, in one query, the Post rows that were created by an
            # earlier scheduling pass so they can be reused.
            linked_ids = [tp.post_id for tp in to_schedule if tp.post_id]
            existing_posts = {}
            if linked_ids:
                existing_posts = {
                    post.id: post
                    for post in db.query(Post).filter(Post.id.in_(linked_ids)).all()
                }

            # Schedule each post
            for offset, thread_post in enumerate(to_schedule):
                scheduled_at = start + timedelta(minutes=offset * series.interval_minutes)

                post = existing_posts.get(thread_post.post_id)
                if post is None:
                    # Create the actual post
                    post = Post(
                        content=thread_post.content,
                        content_type="thread",
                        platforms=[series.platform],
                        status="scheduled",
                        scheduled_at=scheduled_at,
                        hashtags=series.hashtags,
                        image_url=thread_post.image_url
                    )
                    db.add(post)
                    # Flush so post.id is available for the link below.
                    db.flush()
                    thread_post.post_id = post.id
                else:
                    post.scheduled_at = scheduled_at
                    post.status = "scheduled"

                thread_post.scheduled_at = scheduled_at
                thread_post.status = "scheduled"

            series.status = "scheduled"
            db.commit()

            return {
                "success": True,
                "series_id": series_id,
                "start_time": start.isoformat(),
                "posts_scheduled": len(to_schedule)
            }

        except Exception as e:
            logger.exception("Error scheduling series: %s", e)
            db.rollback()
            return {"success": False, "error": str(e)}
        finally:
            db.close()

    async def publish_next_post(self, series_id: int) -> Dict:
        """
        Publish the next pending post in a series.

        This method does not itself publish anything or change the status of
        the post it returns. It looks up the lowest-numbered thread post with
        status "pending" or "scheduled", updates the series bookkeeping
        (status and ``posts_published``) and reports which post is next.
        When no such post remains, the series is marked "completed".

        Args:
            series_id: ID of the series

        Returns:
            Dict with publish result. If a post is pending:
            {"success": True, "post_id", "sequence_number", "total_posts"}.
            If none remain: {"success": True, "message",
            "series_completed": True}. On failure:
            {"success": False, "error": "<message>"}.
        """
        db = self._get_db()
        try:
            series = db.query(ThreadSeries).filter(ThreadSeries.id == series_id).first()
            if not series:
                return {"success": False, "error": "Series not found"}

            # Find next unpublished post
            next_post = db.query(ThreadPost).filter(
                ThreadPost.series_id == series_id,
                ThreadPost.status.in_(["pending", "scheduled"])
            ).order_by(ThreadPost.sequence_number).first()

            if not next_post:
                # All posts published
                series.status = "completed"
                series.completed_at = datetime.utcnow()
                db.commit()
                return {
                    "success": True,
                    "message": "All posts in series have been published",
                    "series_completed": True
                }

            # Publish the post (this would normally trigger the publish task)
            # For now the post itself is left untouched and the scheduler is
            # expected to handle it; only the series state is updated here.
            if next_post.sequence_number == 1:
                series.status = "publishing"

            # Update series progress
            series.posts_published = db.query(ThreadPost).filter(
                ThreadPost.series_id == series_id,
                ThreadPost.status == "published"
            ).count()

            db.commit()

            return {
                "success": True,
                "post_id": next_post.post_id,
                "sequence_number": next_post.sequence_number,
                "total_posts": series.total_posts
            }

        except Exception as e:
            logger.exception("Error publishing next post: %s", e)
            db.rollback()
            return {"success": False, "error": str(e)}
        finally:
            db.close()

    async def get_series(self, series_id: int) -> Optional[Dict]:
        """Get a series with its posts.

        Returns:
            ``series.to_dict(include_posts=True)`` for the matching series,
            or None when no series has that ID.
        """
        db = self._get_db()
        try:
            series = db.query(ThreadSeries).filter(ThreadSeries.id == series_id).first()
            if series:
                return series.to_dict(include_posts=True)
            return None
        finally:
            db.close()

    async def get_series_list(
        self,
        status: Optional[str] = None,
        platform: Optional[str] = None,
        limit: int = 20
    ) -> List[Dict]:
        """Get list of thread series.

        Args:
            status: If given, only series with this status are returned
            platform: If given, only series for this platform are returned
            limit: Maximum number of series to return

        Returns:
            Series dicts (without posts), newest ``created_at`` first.
        """
        db = self._get_db()
        try:
            query = db.query(ThreadSeries)

            if status:
                query = query.filter(ThreadSeries.status == status)
            if platform:
                query = query.filter(ThreadSeries.platform == platform)

            series_list = query.order_by(ThreadSeries.created_at.desc()).limit(limit).all()
            return [s.to_dict(include_posts=False) for s in series_list]
        finally:
            db.close()

    async def cancel_series(self, series_id: int) -> Dict:
        """Cancel a series and its scheduled posts.

        The series is set to "cancelled". Thread posts with status
        "scheduled" are set to "cancelled", and so are their linked Post
        rows that are still "scheduled". Thread posts in any other status
        are left unchanged.

        Args:
            series_id: ID of the series

        Returns:
            {"success": True, "message": "Series cancelled"} on success,
            otherwise {"success": False, "error": "<message>"}.
        """
        db = self._get_db()
        try:
            series = db.query(ThreadSeries).filter(ThreadSeries.id == series_id).first()
            if not series:
                return {"success": False, "error": "Series not found"}

            series.status = "cancelled"

            # Cancel scheduled posts
            thread_posts = db.query(ThreadPost).filter(
                ThreadPost.series_id == series_id,
                ThreadPost.status == "scheduled"
            ).all()

            for tp in thread_posts:
                tp.status = "cancelled"

            # Cancel the linked Post rows with one query instead of one
            # lookup per thread post.
            post_ids = [tp.post_id for tp in thread_posts if tp.post_id]
            if post_ids:
                linked_posts = db.query(Post).filter(
                    Post.id.in_(post_ids),
                    Post.status == "scheduled"
                ).all()
                for post in linked_posts:
                    post.status = "cancelled"

            db.commit()

            return {"success": True, "message": "Series cancelled"}

        except Exception as e:
            logger.exception("Error cancelling series: %s", e)
            db.rollback()
            return {"success": False, "error": str(e)}
        finally:
            db.close()


# Global instance
thread_service = ThreadService()