""" Analytics Service - Track and analyze post performance. """ from datetime import datetime, date, timedelta from typing import List, Dict, Optional from sqlalchemy import func, desc from sqlalchemy.orm import Session from app.core.database import SessionLocal from app.models.post import Post from app.models.post_metrics import PostMetrics from app.models.analytics_report import AnalyticsReport from app.models.interaction import Interaction class AnalyticsService: """Service for analytics and reporting.""" def __init__(self): pass def _get_db(self) -> Session: """Get database session.""" return SessionLocal() async def get_dashboard_stats( self, days: int = 30, platform: Optional[str] = None ) -> Dict: """Get dashboard statistics.""" db = self._get_db() try: start_date = datetime.utcnow() - timedelta(days=days) # Base query for posts posts_query = db.query(Post).filter( Post.published_at >= start_date, Post.status == "published" ) if platform: posts_query = posts_query.filter(Post.platforms.contains([platform])) posts = posts_query.all() # Aggregate metrics total_impressions = 0 total_engagements = 0 total_likes = 0 total_comments = 0 total_shares = 0 for post in posts: if post.metrics: total_likes += post.metrics.get("likes", 0) total_comments += post.metrics.get("comments", 0) total_shares += post.metrics.get("shares", 0) + post.metrics.get("retweets", 0) total_impressions += post.metrics.get("impressions", 0) total_engagements = total_likes + total_comments + total_shares # Calculate engagement rate avg_engagement_rate = 0.0 if total_impressions > 0: avg_engagement_rate = (total_engagements / total_impressions) * 100 # Posts by platform platform_breakdown = {} for post in posts: for p in post.platforms: if p not in platform_breakdown: platform_breakdown[p] = {"posts": 0, "engagements": 0} platform_breakdown[p]["posts"] += 1 if post.metrics: platform_breakdown[p]["engagements"] += ( post.metrics.get("likes", 0) + post.metrics.get("comments", 0) + post.metrics.get("shares", 0) ) # Posts by content type content_breakdown = {} for post in posts: ct = post.content_type if ct not in content_breakdown: content_breakdown[ct] = {"posts": 0, "engagements": 0} content_breakdown[ct]["posts"] += 1 if post.metrics: content_breakdown[ct]["engagements"] += ( post.metrics.get("likes", 0) + post.metrics.get("comments", 0) ) # Pending interactions pending_interactions = db.query(Interaction).filter( Interaction.responded == False, Interaction.is_archived == False ).count() return { "period_days": days, "total_posts": len(posts), "total_impressions": total_impressions, "total_engagements": total_engagements, "total_likes": total_likes, "total_comments": total_comments, "total_shares": total_shares, "avg_engagement_rate": round(avg_engagement_rate, 2), "platform_breakdown": platform_breakdown, "content_breakdown": content_breakdown, "pending_interactions": pending_interactions } finally: db.close() async def get_top_posts( self, days: int = 30, limit: int = 10, platform: Optional[str] = None ) -> List[Dict]: """Get top performing posts by engagement.""" db = self._get_db() try: start_date = datetime.utcnow() - timedelta(days=days) posts_query = db.query(Post).filter( Post.published_at >= start_date, Post.status == "published", Post.metrics.isnot(None) ) if platform: posts_query = posts_query.filter(Post.platforms.contains([platform])) posts = posts_query.all() # Calculate engagement for each post and sort posts_with_engagement = [] for post in posts: if post.metrics: engagement = ( post.metrics.get("likes", 0) + post.metrics.get("comments", 0) + post.metrics.get("shares", 0) + post.metrics.get("retweets", 0) ) impressions = post.metrics.get("impressions", 1) engagement_rate = (engagement / impressions * 100) if impressions > 0 else 0 posts_with_engagement.append({ "id": post.id, "content": post.content[:100] + "..." if len(post.content) > 100 else post.content, "content_type": post.content_type, "platforms": post.platforms, "published_at": post.published_at.isoformat() if post.published_at else None, "likes": post.metrics.get("likes", 0), "comments": post.metrics.get("comments", 0), "shares": post.metrics.get("shares", 0) + post.metrics.get("retweets", 0), "impressions": impressions, "engagement_rate": round(engagement_rate, 2) }) # Sort by engagement rate posts_with_engagement.sort(key=lambda x: x["engagement_rate"], reverse=True) return posts_with_engagement[:limit] finally: db.close() async def get_optimal_times( self, platform: Optional[str] = None, days: int = 90 ) -> List[Dict]: """Calculate optimal posting times based on historical data.""" db = self._get_db() try: start_date = datetime.utcnow() - timedelta(days=days) posts_query = db.query(Post).filter( Post.published_at >= start_date, Post.status == "published", Post.metrics.isnot(None) ) if platform: posts_query = posts_query.filter(Post.platforms.contains([platform])) posts = posts_query.all() # Group by day of week and hour time_slots = {} # {(day, hour): [engagement_rates]} for post in posts: if post.published_at and post.metrics: day = post.published_at.weekday() hour = post.published_at.hour engagement = ( post.metrics.get("likes", 0) + post.metrics.get("comments", 0) + post.metrics.get("shares", 0) ) impressions = post.metrics.get("impressions", 1) rate = (engagement / impressions * 100) if impressions > 0 else 0 key = (day, hour) if key not in time_slots: time_slots[key] = [] time_slots[key].append(rate) # Calculate averages results = [] for (day, hour), rates in time_slots.items(): avg_rate = sum(rates) / len(rates) if rates else 0 results.append({ "day": day, "day_name": ["Lun", "Mar", "Mié", "Jue", "Vie", "Sáb", "Dom"][day], "hour": hour, "hour_formatted": f"{hour:02d}:00", "avg_engagement_rate": round(avg_rate, 2), "sample_size": len(rates) }) # Sort by engagement rate results.sort(key=lambda x: x["avg_engagement_rate"], reverse=True) return results finally: db.close() async def generate_weekly_report( self, week_start: Optional[date] = None ) -> AnalyticsReport: """Generate weekly analytics report.""" db = self._get_db() try: if week_start is None: # Last complete week today = date.today() week_start = today - timedelta(days=today.weekday() + 7) week_end = week_start + timedelta(days=6) start_dt = datetime.combine(week_start, datetime.min.time()) end_dt = datetime.combine(week_end, datetime.max.time()) # Previous week for comparison prev_start = week_start - timedelta(days=7) prev_end = week_end - timedelta(days=7) prev_start_dt = datetime.combine(prev_start, datetime.min.time()) prev_end_dt = datetime.combine(prev_end, datetime.max.time()) # Current week posts posts = db.query(Post).filter( Post.published_at >= start_dt, Post.published_at <= end_dt, Post.status == "published" ).all() # Previous week posts prev_posts = db.query(Post).filter( Post.published_at >= prev_start_dt, Post.published_at <= prev_end_dt, Post.status == "published" ).all() # Calculate current week metrics total_impressions = 0 total_engagements = 0 total_likes = 0 total_comments = 0 total_shares = 0 platform_breakdown = {} content_performance = {} for post in posts: if post.metrics: likes = post.metrics.get("likes", 0) comments = post.metrics.get("comments", 0) shares = post.metrics.get("shares", 0) + post.metrics.get("retweets", 0) impressions = post.metrics.get("impressions", 0) total_likes += likes total_comments += comments total_shares += shares total_impressions += impressions # Platform breakdown for p in post.platforms: if p not in platform_breakdown: platform_breakdown[p] = {"posts": 0, "engagements": 0} platform_breakdown[p]["posts"] += 1 platform_breakdown[p]["engagements"] += likes + comments + shares # Content type performance ct = post.content_type if ct not in content_performance: content_performance[ct] = {"posts": 0, "engagements": 0, "impressions": 0} content_performance[ct]["posts"] += 1 content_performance[ct]["engagements"] += likes + comments + shares content_performance[ct]["impressions"] += impressions total_engagements = total_likes + total_comments + total_shares # Calculate previous week totals for comparison prev_engagements = 0 for post in prev_posts: if post.metrics: prev_engagements += ( post.metrics.get("likes", 0) + post.metrics.get("comments", 0) + post.metrics.get("shares", 0) ) # Calculate changes posts_change = ((len(posts) - len(prev_posts)) / len(prev_posts) * 100) if prev_posts else 0 engagement_change = ((total_engagements - prev_engagements) / prev_engagements * 100) if prev_engagements else 0 # Get top posts top_posts = await self.get_top_posts(days=7, limit=5) # Get best times best_times = await self.get_optimal_times(days=30) # Calculate averages avg_engagement_rate = (total_engagements / total_impressions * 100) if total_impressions > 0 else 0 avg_impressions = total_impressions / len(posts) if posts else 0 avg_engagements = total_engagements / len(posts) if posts else 0 # Create report report = AnalyticsReport( report_type="weekly", period_start=week_start, period_end=week_end, total_posts=len(posts), total_impressions=total_impressions, total_engagements=total_engagements, total_likes=total_likes, total_comments=total_comments, total_shares=total_shares, avg_engagement_rate=avg_engagement_rate, avg_impressions_per_post=avg_impressions, avg_engagements_per_post=avg_engagements, posts_change_pct=posts_change, engagement_change_pct=engagement_change, top_posts=top_posts[:5], best_times=best_times[:10], content_performance=content_performance, platform_breakdown=platform_breakdown ) # Generate summary text report.generate_telegram_summary() db.add(report) db.commit() db.refresh(report) return report finally: db.close() async def get_reports( self, report_type: str = "weekly", limit: int = 10 ) -> List[Dict]: """Get historical reports.""" db = self._get_db() try: reports = db.query(AnalyticsReport).filter( AnalyticsReport.report_type == report_type ).order_by(desc(AnalyticsReport.period_start)).limit(limit).all() return [r.to_dict() for r in reports] finally: db.close() async def record_post_metrics( self, post_id: int, platform: str, metrics: Dict ) -> PostMetrics: """Record metrics snapshot for a post.""" db = self._get_db() try: post_metrics = PostMetrics( post_id=post_id, platform=platform, likes=metrics.get("likes", 0), comments=metrics.get("comments", 0), shares=metrics.get("shares", 0), impressions=metrics.get("impressions", 0), reach=metrics.get("reach", 0), saves=metrics.get("saves", 0), clicks=metrics.get("clicks", 0), replies=metrics.get("replies", 0), quotes=metrics.get("quotes", 0) ) post_metrics.calculate_engagement_rate() db.add(post_metrics) db.commit() db.refresh(post_metrics) return post_metrics finally: db.close() # Global instance analytics_service = AnalyticsService()