Files
social-media-automation/app/services/recycling_service.py
Consultoría AS ecc2ca73ea feat: Add Analytics, Odoo Integration, A/B Testing, and Content features
Phase 1 - Analytics y Reportes:
- PostMetrics and AnalyticsReport models for tracking engagement
- Analytics service with dashboard stats, top posts, optimal times
- 8 API endpoints at /api/analytics/*
- Interactive dashboard with Chart.js charts
- Celery tasks for metrics fetch (15min) and weekly reports

Phase 2 - Integración Odoo:
- Lead and OdooSyncLog models for CRM integration
- Odoo fields added to Product and Service models
- XML-RPC service for bidirectional sync
- Lead management API at /api/leads/*
- Leads dashboard template
- Celery tasks for product/service sync and lead export

Phase 3 - A/B Testing y Recycling:
- ABTest, ABTestVariant, RecycledPost models
- Statistical winner analysis using chi-square test
- Content recycling with engagement-based scoring
- APIs at /api/ab-tests/* and /api/recycling/*
- Automated test evaluation and content recycling tasks

Phase 4 - Thread Series y Templates:
- ThreadSeries and ThreadPost models for multi-post threads
- AI-powered thread generation
- Enhanced ImageTemplate with HTML template support
- APIs at /api/threads/* and /api/templates/*
- Thread scheduling with reply chain support

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 03:10:42 +00:00

328 lines
11 KiB
Python

"""
Content Recycling Service - Republish high-performing evergreen content.
"""
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import logging
from app.core.database import SessionLocal
from app.models.post import Post
from app.models.recycled_post import RecycledPost
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class RecyclingService:
    """Service for recycling high-performing content.

    Finds published posts whose stored metrics show a high engagement rate,
    creates scheduled copies of them, and records each recycling in a
    ``RecycledPost`` row. Every public method opens its own database session
    through ``_get_db`` and closes it before returning.
    """

    # Default settings
    MIN_DAYS_SINCE_PUBLISH = 30  # Don't recycle recent posts
    MIN_ENGAGEMENT_RATE = 2.0  # Minimum engagement rate (percent) to consider
    MAX_RECYCLE_COUNT = 3  # Maximum times to recycle a post

    # Scoring knobs
    RECENCY_SATURATION_DAYS = 90  # Recency factor reaches 1.0 at this age
    RECYCLE_PENALTY_STEP = 0.2  # Score reduction per previous recycle (20%)
    PREVIEW_LENGTH = 100  # Characters of content shown in candidate previews

    def _get_db(self):
        """Return a new database session; the caller must close it."""
        return SessionLocal()

    def _calculate_engagement_rate(self, metrics: Optional[dict]) -> float:
        """Calculate the engagement rate, in percent, from a metrics dict.

        Engagements are the sum of ``likes``, ``comments``, ``shares`` and
        ``retweets``; the rate is engagements divided by ``impressions``,
        times 100.

        Args:
            metrics: Metrics mapping, or None. Missing keys and keys whose
                value is None count as 0.

        Returns:
            The engagement rate, or 0.0 when there are no metrics or no
            positive impression count to divide by.
        """
        if not metrics:
            return 0.0

        # `or 0` also covers keys that are present but hold None.
        impressions = metrics.get("impressions") or 0
        if impressions <= 0:
            return 0.0

        engagements = sum(
            metrics.get(key) or 0
            for key in ("likes", "comments", "shares", "retweets")
        )
        return (engagements / impressions) * 100

    def _recycling_score(
        self,
        engagement_rate: float,
        days_since_publish: int,
        recycle_count: int
    ) -> float:
        """Combine engagement, age and past recycles into a ranking score.

        Args:
            engagement_rate: Engagement rate in percent.
            days_since_publish: Age of the post in days.
            recycle_count: How many times the post was already recycled.

        Returns:
            ``engagement_rate * recency_factor * recycled_penalty``.
        """
        # Older posts score higher, saturating at RECENCY_SATURATION_DAYS.
        recency_factor = min(days_since_publish / self.RECENCY_SATURATION_DAYS, 1.0)
        # Each previous recycle removes RECYCLE_PENALTY_STEP of the score.
        recycled_penalty = 1 - (recycle_count * self.RECYCLE_PENALTY_STEP)
        return engagement_rate * recency_factor * recycled_penalty

    def _preview(self, text: str) -> str:
        """Return ``text`` cut to PREVIEW_LENGTH characters plus "..." if longer."""
        if len(text) > self.PREVIEW_LENGTH:
            return text[:self.PREVIEW_LENGTH] + "..."
        return text

    async def find_recyclable_posts(
        self,
        platform: Optional[str] = None,
        content_type: Optional[str] = None,
        min_engagement_rate: Optional[float] = None,
        min_days_since_publish: Optional[int] = None,
        limit: int = 20
    ) -> List[Dict]:
        """
        Find posts that are good candidates for recycling.

        Args:
            platform: Filter by platform
            content_type: Filter by content type
            min_engagement_rate: Minimum engagement rate threshold
                (None means MIN_ENGAGEMENT_RATE; an explicit 0 is honoured)
            min_days_since_publish: Minimum days since original publish
                (None means MIN_DAYS_SINCE_PUBLISH; an explicit 0 is honoured)
            limit: Maximum candidates to return

        Returns:
            List of recyclable post candidates with scores, best score first
        """
        # Compare against None so that a caller-supplied 0 is not replaced
        # by the default.
        min_eng = (
            self.MIN_ENGAGEMENT_RATE
            if min_engagement_rate is None
            else min_engagement_rate
        )
        min_days = (
            self.MIN_DAYS_SINCE_PUBLISH
            if min_days_since_publish is None
            else min_days_since_publish
        )

        # One timestamp for the cutoff and for every age calculation below.
        # NOTE(review): naive UTC is kept from the original code; presumably
        # Post.published_at is stored as naive UTC - confirm.
        now = datetime.utcnow()
        cutoff_date = now - timedelta(days=min_days)

        db = self._get_db()
        try:
            # Query for published posts
            query = db.query(Post).filter(
                Post.status == "published",
                Post.published_at <= cutoff_date,
                Post.is_recyclable == True,  # noqa: E712 - SQL column comparison
                Post.recycle_count < self.MAX_RECYCLE_COUNT,
                Post.metrics.isnot(None)
            )

            if platform:
                query = query.filter(Post.platforms.contains([platform]))

            if content_type:
                query = query.filter(Post.content_type == content_type)

            # Engagement is computed in Python from the metrics dict, so the
            # threshold filter and the ranking happen after the query.
            candidates = []
            for post in query.all():
                engagement_rate = self._calculate_engagement_rate(post.metrics)
                if engagement_rate < min_eng:
                    continue

                days_since_publish = (now - post.published_at).days
                score = self._recycling_score(
                    engagement_rate, days_since_publish, post.recycle_count
                )

                candidates.append({
                    "id": post.id,
                    "content": self._preview(post.content or ""),
                    "full_content": post.content,
                    "content_type": post.content_type,
                    "platforms": post.platforms,
                    "published_at": post.published_at.isoformat(),
                    "days_since_publish": days_since_publish,
                    "engagement_rate": round(engagement_rate, 2),
                    "recycle_count": post.recycle_count,
                    "score": round(score, 2),
                    "metrics": post.metrics
                })

            # Sort by score
            candidates.sort(key=lambda x: x["score"], reverse=True)

            return candidates[:limit]

        finally:
            db.close()

    async def recycle_post(
        self,
        post_id: int,
        modifications: Optional[Dict] = None,
        scheduled_for: Optional[datetime] = None,
        platforms: Optional[List[str]] = None,
        reason: str = "manual"
    ) -> Dict:
        """
        Create a recycled version of a post.

        Args:
            post_id: Original post ID
            modifications: Dict of modifications {content, hashtags, image_url}
            scheduled_for: When to publish (defaults to now + 1 hour)
            platforms: Override platforms (defaults to original)
            reason: Reason for recycling (high_performer, evergreen, seasonal, manual)

        Returns:
            Dict with new post info on success, or
            {"success": False, "error": ...} when the post is missing, not
            recyclable, already at MAX_RECYCLE_COUNT, or an exception occurs
        """
        db = self._get_db()
        try:
            original = db.query(Post).filter(Post.id == post_id).first()
            if not original:
                return {"success": False, "error": "Original post not found"}

            if not original.is_recyclable:
                return {"success": False, "error": "Post is marked as not recyclable"}

            # Treat an unset counter as "never recycled".
            current_count = original.recycle_count or 0
            if current_count >= self.MAX_RECYCLE_COUNT:
                return {"success": False, "error": f"Post has been recycled {current_count} times (max {self.MAX_RECYCLE_COUNT})"}

            # Create new post; any field without a modification falls back
            # to the original's value.
            changes = modifications or {}
            new_content = changes.get("content")
            new_hashtags = changes.get("hashtags")
            new_image = changes.get("image_url")

            new_post = Post(
                content=new_content or original.content,
                content_type=original.content_type,
                platforms=platforms or original.platforms,
                status="scheduled",
                scheduled_at=scheduled_for or (datetime.utcnow() + timedelta(hours=1)),
                hashtags=new_hashtags or original.hashtags,
                image_url=new_image or original.image_url,
                recycled_from_id=original.id,
                is_recyclable=True
            )

            db.add(new_post)
            # Flush so new_post.id is assigned before the tracking row uses it.
            db.flush()

            # Track the recycling
            recycle_record = RecycledPost(
                original_post_id=original.id,
                new_post_id=new_post.id,
                recycle_number=current_count + 1,
                modifications={
                    "content_changed": bool(new_content),
                    "hashtags_updated": bool(new_hashtags),
                    "image_changed": bool(new_image)
                },
                original_engagement_rate=self._calculate_engagement_rate(original.metrics),
                reason=reason,
                status="pending",
                scheduled_for=new_post.scheduled_at
            )

            db.add(recycle_record)

            # Update original's recycle count
            original.recycle_count = current_count + 1

            db.commit()
            db.refresh(new_post)

            return {
                "success": True,
                "new_post_id": new_post.id,
                "recycle_record_id": recycle_record.id,
                "scheduled_for": new_post.scheduled_at.isoformat(),
                "platforms": new_post.platforms
            }

        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception("Error recycling post: %s", e)
            db.rollback()
            return {"success": False, "error": str(e)}

        finally:
            db.close()

    async def auto_recycle(
        self,
        platform: Optional[str] = None,
        count: int = 1,
        min_engagement_rate: Optional[float] = None
    ) -> Dict:
        """
        Automatically select and recycle top-performing posts.

        Args:
            platform: Filter by platform
            count: Number of posts to recycle
            min_engagement_rate: Minimum engagement rate threshold

        Returns:
            Dict with recycled posts info
        """
        nothing_to_do = {
            "success": True,
            "recycled": 0,
            "message": "No eligible posts found for recycling"
        }

        # A non-positive count can never recycle anything; skip the query.
        if count <= 0:
            return nothing_to_do

        # No session is opened here: the two methods called below each
        # manage their own.
        candidates = await self.find_recyclable_posts(
            platform=platform,
            min_engagement_rate=min_engagement_rate,
            limit=count * 2  # Get extra in case some fail
        )

        if not candidates:
            return nothing_to_do

        recycled = []
        # Walk the whole candidate list so the spare candidates replace
        # any that fail, stopping once `count` posts have been recycled.
        for candidate in candidates:
            if len(recycled) >= count:
                break

            result = await self.recycle_post(
                post_id=candidate["id"],
                reason="high_performer"
            )

            if result.get("success"):
                recycled.append({
                    "original_id": candidate["id"],
                    "new_post_id": result["new_post_id"],
                    "engagement_rate": candidate["engagement_rate"]
                })

        return {
            "success": True,
            "recycled": len(recycled),
            "posts": recycled
        }

    async def get_recycling_history(
        self,
        original_post_id: Optional[int] = None,
        limit: int = 50
    ) -> List[Dict]:
        """
        Get recycling history.

        Args:
            original_post_id: Filter by original post (None means no filter)
            limit: Maximum records to return

        Returns:
            List of recycling records as dicts, most recently recycled first
        """
        db = self._get_db()
        try:
            query = db.query(RecycledPost)

            if original_post_id is not None:
                query = query.filter(RecycledPost.original_post_id == original_post_id)

            records = query.order_by(RecycledPost.recycled_at.desc()).limit(limit).all()
            return [r.to_dict() for r in records]

        finally:
            db.close()

    async def mark_post_not_recyclable(self, post_id: int) -> Dict:
        """Mark a post as not eligible for recycling.

        Args:
            post_id: ID of the post to update

        Returns:
            {"success": True, "message": ...} after the flag is saved, or
            {"success": False, "error": "Post not found"} for an unknown ID
        """
        db = self._get_db()
        try:
            post = db.query(Post).filter(Post.id == post_id).first()
            if not post:
                return {"success": False, "error": "Post not found"}

            post.is_recyclable = False
            db.commit()

            return {"success": True, "message": "Post marked as not recyclable"}

        finally:
            db.close()
# Global instance: a single RecyclingService created when this module is imported.
recycling_service = RecyclingService()