Files
social-media-automation/app/services/analytics_service.py
Consultoría AS ecc2ca73ea feat: Add Analytics, Odoo Integration, A/B Testing, and Content features
Phase 1 - Analytics y Reportes:
- PostMetrics and AnalyticsReport models for tracking engagement
- Analytics service with dashboard stats, top posts, optimal times
- 8 API endpoints at /api/analytics/*
- Interactive dashboard with Chart.js charts
- Celery tasks for metrics fetch (15min) and weekly reports

Phase 2 - Integración Odoo:
- Lead and OdooSyncLog models for CRM integration
- Odoo fields added to Product and Service models
- XML-RPC service for bidirectional sync
- Lead management API at /api/leads/*
- Leads dashboard template
- Celery tasks for product/service sync and lead export

Phase 3 - A/B Testing y Recycling:
- ABTest, ABTestVariant, RecycledPost models
- Statistical winner analysis using chi-square test
- Content recycling with engagement-based scoring
- APIs at /api/ab-tests/* and /api/recycling/*
- Automated test evaluation and content recycling tasks

Phase 4 - Thread Series y Templates:
- ThreadSeries and ThreadPost models for multi-post threads
- AI-powered thread generation
- Enhanced ImageTemplate with HTML template support
- APIs at /api/threads/* and /api/templates/*
- Thread scheduling with reply chain support

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 03:10:42 +00:00

424 lines
15 KiB
Python

"""
Analytics Service - Track and analyze post performance.
"""
from datetime import datetime, date, timedelta
from typing import List, Dict, Optional
from sqlalchemy import func, desc
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
from app.models.post import Post
from app.models.post_metrics import PostMetrics
from app.models.analytics_report import AnalyticsReport
from app.models.interaction import Interaction
class AnalyticsService:
    """Service for analytics and reporting.

    Every public coroutine opens its own session through ``_get_db`` and
    closes it before returning. Engagement is defined in one place
    (``_metric_counts``) as likes + comments + shares, where shares include
    retweets, so totals, breakdowns and rates all agree with each other.

    NOTE(review): time windows are built from naive ``datetime.utcnow()`` and
    ``date.today()`` values; this presumably matches how ``Post.published_at``
    is stored (naive UTC) -- confirm against the model.
    """

    # Weekday abbreviations indexed by ``datetime.weekday()`` (0 = Monday).
    _DAY_NAMES = ("Lun", "Mar", "Mié", "Jue", "Vie", "Sáb", "Dom")

    def __init__(self):
        pass

    def _get_db(self) -> "Session":
        """Get database session."""
        return SessionLocal()

    @staticmethod
    def _metric_counts(metrics: Optional[Dict]) -> tuple:
        """Return ``(likes, comments, shares, impressions)`` for a metrics dict.

        Missing keys, ``None`` values and a missing/empty dict all count as 0.
        Retweets are folded into shares.
        """
        if not metrics:
            return 0, 0, 0, 0

        def count(key):
            # ``or 0`` also covers keys that are present but hold None.
            return metrics.get(key) or 0

        return (
            count("likes"),
            count("comments"),
            count("shares") + count("retweets"),
            count("impressions"),
        )

    @staticmethod
    def _engagement_rate(engagements, impressions) -> float:
        """Return engagements as a percentage of impressions.

        Returns 0.0 when there are no impressions: without impression data
        the rate is undefined, and it must not outrank measured posts.
        """
        if impressions > 0:
            return engagements / impressions * 100
        return 0.0

    @staticmethod
    def _published_posts(
        db,
        start: datetime,
        *,
        end: Optional[datetime] = None,
        platform: Optional[str] = None,
        require_metrics: bool = False
    ):
        """Build the query for published posts in a time window.

        Args:
            db: Open database session.
            start: Inclusive lower bound on ``Post.published_at``.
            end: Optional inclusive upper bound on ``Post.published_at``.
            platform: If given, keep only posts whose platforms contain it.
            require_metrics: If true, drop posts whose metrics are NULL.

        Returns:
            The un-executed query object.
        """
        query = db.query(Post).filter(
            Post.published_at >= start,
            Post.status == "published",
        )
        if end is not None:
            query = query.filter(Post.published_at <= end)
        if require_metrics:
            query = query.filter(Post.metrics.isnot(None))
        if platform:
            query = query.filter(Post.platforms.contains([platform]))
        return query

    @classmethod
    def _summarize_post(cls, post) -> Dict:
        """Return the per-post summary dict used in top-post listings."""
        likes, comments, shares, impressions = cls._metric_counts(post.metrics)
        rate = cls._engagement_rate(likes + comments + shares, impressions)

        # Preview is capped at 100 characters; a missing body becomes "".
        content = post.content or ""
        if len(content) > 100:
            content = content[:100] + "..."

        return {
            "id": post.id,
            "content": content,
            "content_type": post.content_type,
            "platforms": post.platforms,
            "published_at": post.published_at.isoformat() if post.published_at else None,
            "likes": likes,
            "comments": comments,
            "shares": shares,
            "impressions": impressions,
            "engagement_rate": round(rate, 2),
        }

    @classmethod
    def _rank_posts(cls, posts, limit: int) -> List[Dict]:
        """Summarize posts that have metrics, best engagement rate first."""
        ranked = [cls._summarize_post(post) for post in posts if post.metrics]
        ranked.sort(key=lambda item: item["engagement_rate"], reverse=True)
        return ranked[:limit]

    async def get_dashboard_stats(
        self,
        days: int = 30,
        platform: Optional[str] = None
    ) -> Dict:
        """Get dashboard statistics.

        Args:
            days: Size of the look-back window, counted back from now.
            platform: If given, only posts published to this platform.

        Returns:
            Dict with the period length, post count, summed impressions,
            engagements, likes, comments and shares, the overall engagement
            rate (percent, 2 decimals), per-platform and per-content-type
            breakdowns, and the number of pending interactions.
        """
        db = self._get_db()
        try:
            start_date = datetime.utcnow() - timedelta(days=days)
            posts = self._published_posts(db, start_date, platform=platform).all()

            total_likes = 0
            total_comments = 0
            total_shares = 0
            total_impressions = 0
            platform_breakdown = {}
            content_breakdown = {}

            for post in posts:
                likes, comments, shares, impressions = self._metric_counts(post.metrics)
                engagements = likes + comments + shares

                total_likes += likes
                total_comments += comments
                total_shares += shares
                total_impressions += impressions

                # Posts without metrics still count as posts, with 0 engagements.
                for name in post.platforms or []:
                    bucket = platform_breakdown.setdefault(
                        name, {"posts": 0, "engagements": 0}
                    )
                    bucket["posts"] += 1
                    bucket["engagements"] += engagements

                bucket = content_breakdown.setdefault(
                    post.content_type, {"posts": 0, "engagements": 0}
                )
                bucket["posts"] += 1
                bucket["engagements"] += engagements

            total_engagements = total_likes + total_comments + total_shares
            avg_engagement_rate = self._engagement_rate(
                total_engagements, total_impressions
            )

            # Interactions that are neither answered nor archived.
            pending_interactions = db.query(Interaction).filter(
                Interaction.responded == False,  # noqa: E712 - SQL expression
                Interaction.is_archived == False  # noqa: E712 - SQL expression
            ).count()

            return {
                "period_days": days,
                "total_posts": len(posts),
                "total_impressions": total_impressions,
                "total_engagements": total_engagements,
                "total_likes": total_likes,
                "total_comments": total_comments,
                "total_shares": total_shares,
                "avg_engagement_rate": round(avg_engagement_rate, 2),
                "platform_breakdown": platform_breakdown,
                "content_breakdown": content_breakdown,
                "pending_interactions": pending_interactions
            }
        finally:
            db.close()

    async def get_top_posts(
        self,
        days: int = 30,
        limit: int = 10,
        platform: Optional[str] = None
    ) -> List[Dict]:
        """Get top performing posts by engagement.

        Args:
            days: Size of the look-back window, counted back from now.
            limit: Maximum number of posts to return.
            platform: If given, only posts published to this platform.

        Returns:
            Post summaries sorted by engagement rate, highest first. Posts
            without impression data get a rate of 0.
        """
        db = self._get_db()
        try:
            start_date = datetime.utcnow() - timedelta(days=days)
            posts = self._published_posts(
                db, start_date, platform=platform, require_metrics=True
            ).all()
            return self._rank_posts(posts, limit)
        finally:
            db.close()

    async def get_optimal_times(
        self,
        platform: Optional[str] = None,
        days: int = 90
    ) -> List[Dict]:
        """Calculate optimal posting times based on historical data.

        Posts are grouped by (weekday, hour) of ``published_at`` and the
        engagement rates inside each slot are averaged.

        Args:
            platform: If given, only posts published to this platform.
            days: Size of the look-back window, counted back from now.

        Returns:
            One dict per populated slot, sorted by average engagement rate,
            highest first; ``sample_size`` is the number of posts in the slot.
        """
        db = self._get_db()
        try:
            start_date = datetime.utcnow() - timedelta(days=days)
            posts = self._published_posts(
                db, start_date, platform=platform, require_metrics=True
            ).all()

            # {(weekday, hour): [engagement rate of each post in that slot]}
            time_slots = {}
            for post in posts:
                if not (post.published_at and post.metrics):
                    continue
                likes, comments, shares, impressions = self._metric_counts(post.metrics)
                rate = self._engagement_rate(likes + comments + shares, impressions)
                slot = (post.published_at.weekday(), post.published_at.hour)
                time_slots.setdefault(slot, []).append(rate)

            results = [
                {
                    "day": day,
                    "day_name": self._DAY_NAMES[day],
                    "hour": hour,
                    "hour_formatted": f"{hour:02d}:00",
                    "avg_engagement_rate": round(sum(rates) / len(rates), 2),
                    "sample_size": len(rates)
                }
                for (day, hour), rates in time_slots.items()
            ]
            results.sort(key=lambda item: item["avg_engagement_rate"], reverse=True)
            return results
        finally:
            db.close()

    async def generate_weekly_report(
        self,
        week_start: Optional[date] = None
    ) -> "AnalyticsReport":
        """Generate weekly analytics report.

        The report covers ``week_start`` through ``week_start + 6 days`` and
        is compared against the seven days immediately before it.

        Args:
            week_start: First day of the week to report on. Defaults to the
                Monday of the last complete week.

        Returns:
            The ``AnalyticsReport`` row after it has been added, committed
            and refreshed.
        """
        db = self._get_db()
        try:
            if week_start is None:
                # Monday of the last complete week.
                today = date.today()
                week_start = today - timedelta(days=today.weekday() + 7)
            week_end = week_start + timedelta(days=6)

            start_dt = datetime.combine(week_start, datetime.min.time())
            end_dt = datetime.combine(week_end, datetime.max.time())
            # Same window shifted back one week, for comparison.
            prev_start_dt = start_dt - timedelta(days=7)
            prev_end_dt = end_dt - timedelta(days=7)

            posts = self._published_posts(db, start_dt, end=end_dt).all()
            prev_posts = self._published_posts(
                db, prev_start_dt, end=prev_end_dt
            ).all()

            total_likes = 0
            total_comments = 0
            total_shares = 0
            total_impressions = 0
            platform_breakdown = {}
            content_performance = {}

            for post in posts:
                # Posts without metrics count towards total_posts only.
                if not post.metrics:
                    continue
                likes, comments, shares, impressions = self._metric_counts(post.metrics)
                engagements = likes + comments + shares

                total_likes += likes
                total_comments += comments
                total_shares += shares
                total_impressions += impressions

                for name in post.platforms or []:
                    bucket = platform_breakdown.setdefault(
                        name, {"posts": 0, "engagements": 0}
                    )
                    bucket["posts"] += 1
                    bucket["engagements"] += engagements

                bucket = content_performance.setdefault(
                    post.content_type,
                    {"posts": 0, "engagements": 0, "impressions": 0}
                )
                bucket["posts"] += 1
                bucket["engagements"] += engagements
                bucket["impressions"] += impressions

            total_engagements = total_likes + total_comments + total_shares

            # Previous week uses the same engagement definition (including
            # retweets) so the percentage change compares like with like.
            prev_engagements = sum(
                sum(self._metric_counts(post.metrics)[:3]) for post in prev_posts
            )

            posts_change = (
                (len(posts) - len(prev_posts)) / len(prev_posts) * 100
            ) if prev_posts else 0
            engagement_change = (
                (total_engagements - prev_engagements) / prev_engagements * 100
            ) if prev_engagements else 0

            # Rank the posts of the reported week itself, not "the last 7
            # days from now", so the list matches period_start/period_end.
            top_posts = self._rank_posts(posts, limit=5)

            # Best times come from the 30 days before now (separate session).
            best_times = await self.get_optimal_times(days=30)

            avg_engagement_rate = self._engagement_rate(
                total_engagements, total_impressions
            )
            avg_impressions = total_impressions / len(posts) if posts else 0
            avg_engagements = total_engagements / len(posts) if posts else 0

            report = AnalyticsReport(
                report_type="weekly",
                period_start=week_start,
                period_end=week_end,
                total_posts=len(posts),
                total_impressions=total_impressions,
                total_engagements=total_engagements,
                total_likes=total_likes,
                total_comments=total_comments,
                total_shares=total_shares,
                avg_engagement_rate=avg_engagement_rate,
                avg_impressions_per_post=avg_impressions,
                avg_engagements_per_post=avg_engagements,
                posts_change_pct=posts_change,
                engagement_change_pct=engagement_change,
                top_posts=top_posts,
                best_times=best_times[:10],
                content_performance=content_performance,
                platform_breakdown=platform_breakdown
            )

            # Generate summary text before the row is persisted.
            report.generate_telegram_summary()

            db.add(report)
            db.commit()
            db.refresh(report)
            return report
        finally:
            db.close()

    async def get_reports(
        self,
        report_type: str = "weekly",
        limit: int = 10
    ) -> List[Dict]:
        """Get historical reports.

        Args:
            report_type: Value matched against ``AnalyticsReport.report_type``.
            limit: Maximum number of reports to return.

        Returns:
            ``to_dict()`` of each matching report, newest period first.
        """
        db = self._get_db()
        try:
            reports = (
                db.query(AnalyticsReport)
                .filter(AnalyticsReport.report_type == report_type)
                .order_by(desc(AnalyticsReport.period_start))
                .limit(limit)
                .all()
            )
            return [report.to_dict() for report in reports]
        finally:
            db.close()

    async def record_post_metrics(
        self,
        post_id: int,
        platform: str,
        metrics: Dict
    ) -> "PostMetrics":
        """Record metrics snapshot for a post.

        Args:
            post_id: Id of the post the snapshot belongs to.
            platform: Platform the metrics were read from.
            metrics: Raw counters; missing or ``None`` entries are stored as 0.

        Returns:
            The ``PostMetrics`` row after it has been added, committed and
            refreshed.
        """
        counters = (
            "likes", "comments", "shares", "impressions", "reach",
            "saves", "clicks", "replies", "quotes",
        )
        db = self._get_db()
        try:
            post_metrics = PostMetrics(
                post_id=post_id,
                platform=platform,
                **{name: metrics.get(name) or 0 for name in counters}
            )
            post_metrics.calculate_engagement_rate()

            db.add(post_metrics)
            db.commit()
            db.refresh(post_metrics)
            return post_metrics
        finally:
            db.close()
# Module-level AnalyticsService instance, constructed once when this module is imported.
analytics_service = AnalyticsService()