Phase 1 - Analytics y Reportes: - PostMetrics and AnalyticsReport models for tracking engagement - Analytics service with dashboard stats, top posts, optimal times - 8 API endpoints at /api/analytics/* - Interactive dashboard with Chart.js charts - Celery tasks for metrics fetch (15min) and weekly reports Phase 2 - Integración Odoo: - Lead and OdooSyncLog models for CRM integration - Odoo fields added to Product and Service models - XML-RPC service for bidirectional sync - Lead management API at /api/leads/* - Leads dashboard template - Celery tasks for product/service sync and lead export Phase 3 - A/B Testing y Recycling: - ABTest, ABTestVariant, RecycledPost models - Statistical winner analysis using chi-square test - Content recycling with engagement-based scoring - APIs at /api/ab-tests/* and /api/recycling/* - Automated test evaluation and content recycling tasks Phase 4 - Thread Series y Templates: - ThreadSeries and ThreadPost models for multi-post threads - AI-powered thread generation - Enhanced ImageTemplate with HTML template support - APIs at /api/threads/* and /api/templates/* - Thread scheduling with reply chain support Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
424 lines
15 KiB
Python
424 lines
15 KiB
Python
"""
|
|
Analytics Service - Track and analyze post performance.
|
|
"""
|
|
|
|
from datetime import datetime, date, timedelta
|
|
from typing import List, Dict, Optional
|
|
from sqlalchemy import func, desc
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.database import SessionLocal
|
|
from app.models.post import Post
|
|
from app.models.post_metrics import PostMetrics
|
|
from app.models.analytics_report import AnalyticsReport
|
|
from app.models.interaction import Interaction
|
|
|
|
|
|
class AnalyticsService:
    """Service for analytics and reporting.

    Each public coroutine opens its own database session through
    ``_get_db`` and closes it in a ``finally`` block.  Throughout the class
    "engagements" means likes + comments + shares, where shares also
    includes retweets.

    NOTE(review): the methods are declared ``async`` but the queries are
    issued without ``await``, so they appear to block the event loop while
    running - confirm that ``SessionLocal`` is a synchronous session factory.
    """

    # Short Spanish weekday labels indexed by ``datetime.weekday()`` (0 = Monday).
    _DAY_NAMES = ("Lun", "Mar", "Mié", "Jue", "Vie", "Sáb", "Dom")

    # Counter names copied verbatim from a metrics payload into ``PostMetrics``.
    _SNAPSHOT_FIELDS = (
        "likes",
        "comments",
        "shares",
        "impressions",
        "reach",
        "saves",
        "clicks",
        "replies",
        "quotes",
    )

    def _get_db(self) -> "Session":
        """Get database session."""
        return SessionLocal()

    @staticmethod
    def _metric_counts(metrics: Optional[Dict], default_impressions: int = 0) -> Dict:
        """Normalise a raw metrics mapping into numeric counters.

        Args:
            metrics: Mapping stored on a post; may be ``None`` or empty.
            default_impressions: Value used when ``impressions`` is missing
                or ``None``.

        Returns:
            Dict with ``likes``, ``comments``, ``shares`` (shares plus
            retweets), ``impressions`` and ``engagements`` (likes +
            comments + shares).  Missing or ``None`` counters become 0.
        """
        metrics = metrics or {}

        def count(key: str, default: int = 0) -> int:
            value = metrics.get(key, default)
            # A key that is present but holds None would break the arithmetic.
            return default if value is None else value

        likes = count("likes")
        comments = count("comments")
        shares = count("shares") + count("retweets")
        return {
            "likes": likes,
            "comments": comments,
            "shares": shares,
            "impressions": count("impressions", default_impressions),
            "engagements": likes + comments + shares,
        }

    @staticmethod
    def _engagement_rate(engagements: float, impressions: float) -> float:
        """Return engagements per impression as a percentage (0.0 without impressions)."""
        if impressions > 0:
            return engagements / impressions * 100
        return 0.0

    async def get_dashboard_stats(
        self,
        days: int = 30,
        platform: Optional[str] = None
    ) -> Dict:
        """Get dashboard statistics.

        Args:
            days: Size of the look-back window, counted back from
                ``datetime.utcnow()``.
            platform: When given, only posts whose ``platforms`` contain
                this value are considered.

        Returns:
            Dict with the period length, post count, summed counters, the
            overall engagement rate (rounded to 2 decimals), per-platform
            and per-content-type breakdowns, and the number of interactions
            that are neither responded nor archived.
        """
        db = self._get_db()
        try:
            start_date = datetime.utcnow() - timedelta(days=days)

            posts_query = db.query(Post).filter(
                Post.published_at >= start_date,
                Post.status == "published"
            )
            if platform:
                posts_query = posts_query.filter(Post.platforms.contains([platform]))

            posts = posts_query.all()

            totals = {"likes": 0, "comments": 0, "shares": 0, "impressions": 0}
            platform_breakdown: Dict = {}
            content_breakdown: Dict = {}

            # Single pass: every post is counted in the breakdowns, and posts
            # without metrics simply contribute zero engagements.
            for post in posts:
                counts = self._metric_counts(post.metrics)
                for key in totals:
                    totals[key] += counts[key]

                for name in post.platforms or []:
                    bucket = platform_breakdown.setdefault(
                        name, {"posts": 0, "engagements": 0}
                    )
                    bucket["posts"] += 1
                    bucket["engagements"] += counts["engagements"]

                bucket = content_breakdown.setdefault(
                    post.content_type, {"posts": 0, "engagements": 0}
                )
                bucket["posts"] += 1
                bucket["engagements"] += counts["engagements"]

            total_engagements = totals["likes"] + totals["comments"] + totals["shares"]
            avg_engagement_rate = self._engagement_rate(
                total_engagements, totals["impressions"]
            )

            # ``== False`` is intentional: SQLAlchemy turns it into a SQL comparison.
            pending_interactions = db.query(Interaction).filter(
                Interaction.responded == False,  # noqa: E712
                Interaction.is_archived == False  # noqa: E712
            ).count()

            return {
                "period_days": days,
                "total_posts": len(posts),
                "total_impressions": totals["impressions"],
                "total_engagements": total_engagements,
                "total_likes": totals["likes"],
                "total_comments": totals["comments"],
                "total_shares": totals["shares"],
                "avg_engagement_rate": round(avg_engagement_rate, 2),
                "platform_breakdown": platform_breakdown,
                "content_breakdown": content_breakdown,
                "pending_interactions": pending_interactions
            }

        finally:
            db.close()

    async def get_top_posts(
        self,
        days: int = 30,
        limit: int = 10,
        platform: Optional[str] = None,
        *,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None
    ) -> List[Dict]:
        """Get top performing posts by engagement rate.

        Args:
            days: Look-back window counted from ``datetime.utcnow()``;
                ignored when ``start`` is given.
            limit: Maximum number of posts returned.
            platform: Optional platform filter.
            start: Explicit inclusive lower bound on ``published_at``.
            end: Optional inclusive upper bound on ``published_at``.

        Returns:
            Up to ``limit`` dicts describing published posts that have
            metrics, sorted by ``engagement_rate`` in descending order.
        """
        db = self._get_db()
        try:
            if start is None:
                start = datetime.utcnow() - timedelta(days=days)

            posts_query = db.query(Post).filter(
                Post.published_at >= start,
                Post.status == "published",
                Post.metrics.isnot(None)
            )
            if end is not None:
                posts_query = posts_query.filter(Post.published_at <= end)
            if platform:
                posts_query = posts_query.filter(Post.platforms.contains([platform]))

            ranked = []
            for post in posts_query.all():
                if not post.metrics:
                    continue

                # NOTE(review): a missing impressions value is treated as 1, so
                # such posts get a rate of engagements * 100 and are reported
                # with "impressions": 1 - confirm this fallback is intended.
                counts = self._metric_counts(post.metrics, default_impressions=1)
                rate = self._engagement_rate(counts["engagements"], counts["impressions"])

                text = post.content or ""
                preview = text[:100] + "..." if len(text) > 100 else text

                ranked.append({
                    "id": post.id,
                    "content": preview,
                    "content_type": post.content_type,
                    "platforms": post.platforms,
                    "published_at": post.published_at.isoformat() if post.published_at else None,
                    "likes": counts["likes"],
                    "comments": counts["comments"],
                    "shares": counts["shares"],
                    "impressions": counts["impressions"],
                    "engagement_rate": round(rate, 2)
                })

            ranked.sort(key=lambda item: item["engagement_rate"], reverse=True)
            return ranked[:limit]

        finally:
            db.close()

    async def get_optimal_times(
        self,
        platform: Optional[str] = None,
        days: int = 90
    ) -> List[Dict]:
        """Calculate optimal posting times based on historical data.

        Posts are grouped by (weekday, hour) of ``published_at`` and the
        engagement rates inside each slot are averaged.

        Args:
            platform: Optional platform filter.
            days: Look-back window counted from ``datetime.utcnow()``.

        Returns:
            One dict per populated slot (``day``, ``day_name``, ``hour``,
            ``hour_formatted``, ``avg_engagement_rate``, ``sample_size``),
            sorted by average engagement rate in descending order.
        """
        db = self._get_db()
        try:
            start_date = datetime.utcnow() - timedelta(days=days)

            posts_query = db.query(Post).filter(
                Post.published_at >= start_date,
                Post.status == "published",
                Post.metrics.isnot(None)
            )
            if platform:
                posts_query = posts_query.filter(Post.platforms.contains([platform]))

            # {(weekday, hour): [engagement rates]}
            time_slots: Dict = {}
            for post in posts_query.all():
                if not (post.published_at and post.metrics):
                    continue

                # NOTE(review): same impressions fallback of 1 as in
                # get_top_posts - confirm it is intended.
                counts = self._metric_counts(post.metrics, default_impressions=1)
                rate = self._engagement_rate(counts["engagements"], counts["impressions"])

                # NOTE(review): the hour is read from published_at as stored,
                # presumably UTC - confirm before showing it as local time.
                slot = (post.published_at.weekday(), post.published_at.hour)
                time_slots.setdefault(slot, []).append(rate)

            results = []
            for (day, hour), rates in time_slots.items():
                avg_rate = sum(rates) / len(rates) if rates else 0
                results.append({
                    "day": day,
                    "day_name": self._DAY_NAMES[day],
                    "hour": hour,
                    "hour_formatted": f"{hour:02d}:00",
                    "avg_engagement_rate": round(avg_rate, 2),
                    "sample_size": len(rates)
                })

            results.sort(key=lambda item: item["avg_engagement_rate"], reverse=True)
            return results

        finally:
            db.close()

    async def generate_weekly_report(
        self,
        week_start: Optional[date] = None
    ) -> "AnalyticsReport":
        """Generate and persist the weekly analytics report.

        Args:
            week_start: First day of the reported week.  Defaults to the
                Monday of the last complete week relative to
                ``date.today()``.

        Returns:
            The ``AnalyticsReport`` row after it has been added, committed
            and refreshed.
        """
        db = self._get_db()
        try:
            if week_start is None:
                # Last complete week: go back to this week's Monday, then 7 more days.
                today = date.today()
                week_start = today - timedelta(days=today.weekday() + 7)

            week_end = week_start + timedelta(days=6)
            start_dt = datetime.combine(week_start, datetime.min.time())
            end_dt = datetime.combine(week_end, datetime.max.time())

            # Previous week, used only for the change percentages.
            one_week = timedelta(days=7)
            prev_start_dt = datetime.combine(week_start - one_week, datetime.min.time())
            prev_end_dt = datetime.combine(week_end - one_week, datetime.max.time())

            posts = db.query(Post).filter(
                Post.published_at >= start_dt,
                Post.published_at <= end_dt,
                Post.status == "published"
            ).all()

            prev_posts = db.query(Post).filter(
                Post.published_at >= prev_start_dt,
                Post.published_at <= prev_end_dt,
                Post.status == "published"
            ).all()

            total_impressions = 0
            total_likes = 0
            total_comments = 0
            total_shares = 0
            platform_breakdown: Dict = {}
            content_performance: Dict = {}

            for post in posts:
                # Only posts that carry metrics feed the totals and breakdowns.
                if not post.metrics:
                    continue

                counts = self._metric_counts(post.metrics)
                total_likes += counts["likes"]
                total_comments += counts["comments"]
                total_shares += counts["shares"]
                total_impressions += counts["impressions"]

                for name in post.platforms or []:
                    bucket = platform_breakdown.setdefault(
                        name, {"posts": 0, "engagements": 0}
                    )
                    bucket["posts"] += 1
                    bucket["engagements"] += counts["engagements"]

                bucket = content_performance.setdefault(
                    post.content_type, {"posts": 0, "engagements": 0, "impressions": 0}
                )
                bucket["posts"] += 1
                bucket["engagements"] += counts["engagements"]
                bucket["impressions"] += counts["impressions"]

            total_engagements = total_likes + total_comments + total_shares

            # Same engagement definition as the current week (retweets
            # included), otherwise the change percentage would be biased.
            prev_engagements = sum(
                self._metric_counts(post.metrics)["engagements"] for post in prev_posts
            )

            posts_change = (
                (len(posts) - len(prev_posts)) / len(prev_posts) * 100
            ) if prev_posts else 0
            engagement_change = (
                (total_engagements - prev_engagements) / prev_engagements * 100
            ) if prev_engagements else 0

            # Rank posts published inside the reported week, not the last
            # seven days relative to the moment the report is generated.
            top_posts = await self.get_top_posts(limit=5, start=start_dt, end=end_dt)

            # Best times use the 30 days before now, independent of the week.
            best_times = await self.get_optimal_times(days=30)

            avg_engagement_rate = self._engagement_rate(total_engagements, total_impressions)
            avg_impressions = total_impressions / len(posts) if posts else 0
            avg_engagements = total_engagements / len(posts) if posts else 0

            report = AnalyticsReport(
                report_type="weekly",
                period_start=week_start,
                period_end=week_end,
                total_posts=len(posts),
                total_impressions=total_impressions,
                total_engagements=total_engagements,
                total_likes=total_likes,
                total_comments=total_comments,
                total_shares=total_shares,
                avg_engagement_rate=avg_engagement_rate,
                avg_impressions_per_post=avg_impressions,
                avg_engagements_per_post=avg_engagements,
                posts_change_pct=posts_change,
                engagement_change_pct=engagement_change,
                top_posts=top_posts[:5],
                best_times=best_times[:10],
                content_performance=content_performance,
                platform_breakdown=platform_breakdown
            )

            # Generate summary text
            report.generate_telegram_summary()

            db.add(report)
            db.commit()
            db.refresh(report)

            return report

        finally:
            db.close()

    async def get_reports(
        self,
        report_type: str = "weekly",
        limit: int = 10
    ) -> List[Dict]:
        """Get historical reports.

        Args:
            report_type: Value matched against ``AnalyticsReport.report_type``.
            limit: Maximum number of reports returned.

        Returns:
            ``to_dict()`` of each matching report, newest ``period_start`` first.
        """
        db = self._get_db()
        try:
            reports = db.query(AnalyticsReport).filter(
                AnalyticsReport.report_type == report_type
            ).order_by(desc(AnalyticsReport.period_start)).limit(limit).all()

            return [report.to_dict() for report in reports]

        finally:
            db.close()

    async def record_post_metrics(
        self,
        post_id: int,
        platform: str,
        metrics: Dict
    ) -> "PostMetrics":
        """Record metrics snapshot for a post.

        Args:
            post_id: Identifier of the post the snapshot belongs to.
            platform: Platform the metrics were fetched from.
            metrics: Raw counters; any counter missing from the mapping is
                stored as 0.

        Returns:
            The ``PostMetrics`` row after it has been added, committed and
            refreshed.
        """
        db = self._get_db()
        try:
            counters = {name: metrics.get(name, 0) for name in self._SNAPSHOT_FIELDS}
            post_metrics = PostMetrics(post_id=post_id, platform=platform, **counters)

            post_metrics.calculate_engagement_rate()

            db.add(post_metrics)
            db.commit()
            db.refresh(post_metrics)

            return post_metrics

        finally:
            db.close()
|
|
|
|
|
|
# Global instance
|
|
# Created once when this module is imported, so every importer gets the same object.
# NOTE(review): presumably shared by the API routes and Celery tasks - confirm against callers.
analytics_service = AnalyticsService()
|