feat: Add Analytics, Odoo Integration, A/B Testing, and Content features

Phase 1 - Analytics y Reportes:
- PostMetrics and AnalyticsReport models for tracking engagement
- Analytics service with dashboard stats, top posts, optimal times
- 8 API endpoints at /api/analytics/*
- Interactive dashboard with Chart.js charts
- Celery tasks for metrics fetch (15min) and weekly reports

Phase 2 - Integración Odoo:
- Lead and OdooSyncLog models for CRM integration
- Odoo fields added to Product and Service models
- XML-RPC service for bidirectional sync
- Lead management API at /api/leads/*
- Leads dashboard template
- Celery tasks for product/service sync and lead export

Phase 3 - A/B Testing y Recycling:
- ABTest, ABTestVariant, RecycledPost models
- Statistical winner analysis using chi-square test
- Content recycling with engagement-based scoring
- APIs at /api/ab-tests/* and /api/recycling/*
- Automated test evaluation and content recycling tasks

Phase 4 - Thread Series y Templates:
- ThreadSeries and ThreadPost models for multi-post threads
- AI-powered thread generation
- Enhanced ImageTemplate with HTML template support
- APIs at /api/threads/* and /api/templates/*
- Thread scheduling with reply chain support

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-28 03:10:42 +00:00
parent 03b5f9f2e2
commit ecc2ca73ea
31 changed files with 6067 additions and 6 deletions

View File

@@ -12,6 +12,13 @@ from app.models.post import Post
from app.models.content_calendar import ContentCalendar
from app.models.image_template import ImageTemplate
from app.models.interaction import Interaction
from app.models.post_metrics import PostMetrics
from app.models.analytics_report import AnalyticsReport
from app.models.lead import Lead
from app.models.odoo_sync_log import OdooSyncLog
from app.models.ab_test import ABTest, ABTestVariant
from app.models.recycled_post import RecycledPost
from app.models.thread_series import ThreadSeries, ThreadPost
__all__ = [
"Base",
@@ -22,5 +29,14 @@ __all__ = [
"Post",
"ContentCalendar",
"ImageTemplate",
"Interaction"
"Interaction",
"PostMetrics",
"AnalyticsReport",
"Lead",
"OdooSyncLog",
"ABTest",
"ABTestVariant",
"RecycledPost",
"ThreadSeries",
"ThreadPost"
]

165
app/models/ab_test.py Normal file
View File

@@ -0,0 +1,165 @@
"""
A/B Test Models - Test different content variants to optimize engagement.
"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, Float, Boolean, DateTime, ForeignKey, JSON, Enum
from sqlalchemy.orm import relationship
import enum
from app.core.database import Base
class ABTestStatus(enum.Enum):
"""Status options for A/B tests."""
DRAFT = "draft"
RUNNING = "running"
COMPLETED = "completed"
CANCELLED = "cancelled"
class ABTestType(enum.Enum):
"""Types of A/B tests."""
CONTENT = "content" # Test different content/copy
TIMING = "timing" # Test different posting times
HASHTAGS = "hashtags" # Test different hashtag sets
IMAGE = "image" # Test different images
class ABTest(Base):
"""
A/B Test model for testing content variations.
"""
__tablename__ = "ab_tests"
id = Column(Integer, primary_key=True, index=True)
# Test info
name = Column(String(255), nullable=False)
description = Column(Text, nullable=True)
test_type = Column(String(50), nullable=False, default="content")
# Platform targeting
platform = Column(String(50), nullable=False, index=True)
# x, threads, instagram, facebook
# Status
status = Column(String(20), default="draft", index=True)
# Timing
started_at = Column(DateTime, nullable=True)
ended_at = Column(DateTime, nullable=True)
duration_hours = Column(Integer, default=24) # How long to run the test
# Results
winning_variant_id = Column(Integer, ForeignKey("ab_test_variants.id"), nullable=True)
confidence_level = Column(Float, nullable=True) # Statistical confidence
# Configuration
min_sample_size = Column(Integer, default=100) # Min impressions per variant
success_metric = Column(String(50), default="engagement_rate")
# Options: engagement_rate, likes, comments, shares, clicks
# Metadata
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
created_by = Column(String(100), nullable=True)
# Relationships
variants = relationship("ABTestVariant", back_populates="test", foreign_keys="ABTestVariant.test_id")
def __repr__(self):
return f"<ABTest {self.id} - {self.name}>"
def to_dict(self):
"""Convert to dictionary."""
return {
"id": self.id,
"name": self.name,
"description": self.description,
"test_type": self.test_type,
"platform": self.platform,
"status": self.status,
"started_at": self.started_at.isoformat() if self.started_at else None,
"ended_at": self.ended_at.isoformat() if self.ended_at else None,
"duration_hours": self.duration_hours,
"winning_variant_id": self.winning_variant_id,
"confidence_level": self.confidence_level,
"min_sample_size": self.min_sample_size,
"success_metric": self.success_metric,
"variants": [v.to_dict() for v in self.variants] if self.variants else [],
"created_at": self.created_at.isoformat() if self.created_at else None
}
class ABTestVariant(Base):
"""
Variant within an A/B test.
"""
__tablename__ = "ab_test_variants"
id = Column(Integer, primary_key=True, index=True)
# Parent test
test_id = Column(Integer, ForeignKey("ab_tests.id", ondelete="CASCADE"), nullable=False)
# Variant info
name = Column(String(10), nullable=False) # A, B, C, etc.
content = Column(Text, nullable=False)
hashtags = Column(JSON, nullable=True)
image_url = Column(String(500), nullable=True)
# Associated post (once published)
post_id = Column(Integer, ForeignKey("posts.id"), nullable=True)
# Metrics (populated after publishing)
impressions = Column(Integer, default=0)
reach = Column(Integer, default=0)
likes = Column(Integer, default=0)
comments = Column(Integer, default=0)
shares = Column(Integer, default=0)
clicks = Column(Integer, default=0)
engagement_rate = Column(Float, default=0.0)
# Status
is_winner = Column(Boolean, default=False)
published_at = Column(DateTime, nullable=True)
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow)
# Relationships
test = relationship("ABTest", back_populates="variants", foreign_keys=[test_id])
post = relationship("Post", backref="ab_test_variant")
def __repr__(self):
return f"<ABTestVariant {self.name} - Test {self.test_id}>"
def to_dict(self):
"""Convert to dictionary."""
return {
"id": self.id,
"test_id": self.test_id,
"name": self.name,
"content": self.content[:100] + "..." if len(self.content) > 100 else self.content,
"full_content": self.content,
"hashtags": self.hashtags,
"image_url": self.image_url,
"post_id": self.post_id,
"impressions": self.impressions,
"reach": self.reach,
"likes": self.likes,
"comments": self.comments,
"shares": self.shares,
"clicks": self.clicks,
"engagement_rate": round(self.engagement_rate, 2),
"is_winner": self.is_winner,
"published_at": self.published_at.isoformat() if self.published_at else None
}
def calculate_engagement_rate(self):
"""Calculate engagement rate for this variant."""
if self.impressions > 0:
total_engagements = self.likes + self.comments + self.shares
self.engagement_rate = (total_engagements / self.impressions) * 100
return self.engagement_rate

View File

@@ -0,0 +1,116 @@
"""
Analytics Report Model - Aggregated analytics snapshots.
"""
from datetime import datetime, date
from sqlalchemy import Column, Integer, Float, String, DateTime, Date, JSON, Text
from app.core.database import Base
class AnalyticsReport(Base):
"""
Stores aggregated analytics reports (daily, weekly, monthly).
"""
__tablename__ = "analytics_reports"
id = Column(Integer, primary_key=True, index=True)
# Report type and period
report_type = Column(String(20), nullable=False, index=True) # daily, weekly, monthly
period_start = Column(Date, nullable=False, index=True)
period_end = Column(Date, nullable=False)
platform = Column(String(50), nullable=True) # null = all platforms
# Aggregated metrics
total_posts = Column(Integer, default=0)
total_impressions = Column(Integer, default=0)
total_reach = Column(Integer, default=0)
total_engagements = Column(Integer, default=0)
total_likes = Column(Integer, default=0)
total_comments = Column(Integer, default=0)
total_shares = Column(Integer, default=0)
# Calculated averages
avg_engagement_rate = Column(Float, default=0.0)
avg_impressions_per_post = Column(Float, default=0.0)
avg_engagements_per_post = Column(Float, default=0.0)
# Comparison with previous period
posts_change_pct = Column(Float, nullable=True)
engagement_change_pct = Column(Float, nullable=True)
impressions_change_pct = Column(Float, nullable=True)
# Top performing data (JSON)
top_posts = Column(JSON, nullable=True)
# [{"post_id": 1, "content": "...", "engagement_rate": 5.2, "platform": "x"}]
best_times = Column(JSON, nullable=True)
# [{"day": 1, "hour": 12, "avg_engagement": 4.5}]
content_performance = Column(JSON, nullable=True)
# {"tip": {"posts": 10, "avg_engagement": 3.2}, "product": {...}}
platform_breakdown = Column(JSON, nullable=True)
# {"x": {"posts": 20, "engagement": 150}, "threads": {...}}
# Report content for Telegram
summary_text = Column(Text, nullable=True)
# Metadata
generated_at = Column(DateTime, default=datetime.utcnow)
sent_to_telegram = Column(DateTime, nullable=True)
def to_dict(self):
return {
"id": self.id,
"report_type": self.report_type,
"period_start": self.period_start.isoformat() if self.period_start else None,
"period_end": self.period_end.isoformat() if self.period_end else None,
"platform": self.platform,
"total_posts": self.total_posts,
"total_impressions": self.total_impressions,
"total_reach": self.total_reach,
"total_engagements": self.total_engagements,
"avg_engagement_rate": round(self.avg_engagement_rate, 2),
"avg_impressions_per_post": round(self.avg_impressions_per_post, 1),
"posts_change_pct": round(self.posts_change_pct, 1) if self.posts_change_pct else None,
"engagement_change_pct": round(self.engagement_change_pct, 1) if self.engagement_change_pct else None,
"top_posts": self.top_posts,
"best_times": self.best_times,
"content_performance": self.content_performance,
"platform_breakdown": self.platform_breakdown,
"generated_at": self.generated_at.isoformat() if self.generated_at else None
}
def generate_telegram_summary(self) -> str:
"""Generate formatted summary for Telegram."""
lines = [
f"📊 *Reporte {self.report_type.title()}*",
f"📅 {self.period_start} - {self.period_end}",
"",
f"📝 Posts publicados: *{self.total_posts}*",
f"👁 Impresiones: *{self.total_impressions:,}*",
f"💬 Interacciones: *{self.total_engagements:,}*",
f"📈 Engagement rate: *{self.avg_engagement_rate:.2f}%*",
]
if self.engagement_change_pct is not None:
emoji = "📈" if self.engagement_change_pct > 0 else "📉"
lines.append(f"{emoji} vs anterior: *{self.engagement_change_pct:+.1f}%*")
if self.platform_breakdown:
lines.append("")
lines.append("*Por plataforma:*")
for platform, data in self.platform_breakdown.items():
lines.append(f"{platform}: {data.get('posts', 0)} posts, {data.get('engagements', 0)} interacciones")
if self.top_posts and len(self.top_posts) > 0:
lines.append("")
lines.append("*Top 3 posts:*")
for i, post in enumerate(self.top_posts[:3], 1):
content = post.get('content', '')[:50] + "..." if len(post.get('content', '')) > 50 else post.get('content', '')
lines.append(f" {i}. {content} ({post.get('engagement_rate', 0):.1f}%)")
self.summary_text = "\n".join(lines)
return self.summary_text

View File

@@ -25,7 +25,17 @@ class ImageTemplate(Base):
# Categorías: tip, producto, servicio, promocion, etc.
# Archivo de plantilla
template_file = Column(String(255), nullable=False) # Ruta al archivo HTML/template
template_file = Column(String(255), nullable=True) # Ruta al archivo HTML/template
# HTML template content (for inline templates)
html_template = Column(Text, nullable=True)
# Template type
template_type = Column(String(50), default="general")
# Types: tip_card, product_card, quote, promo, announcement
# Preview image
preview_url = Column(String(500), nullable=True)
# Variables que acepta la plantilla
variables = Column(ARRAY(String), nullable=False)
@@ -67,6 +77,9 @@ class ImageTemplate(Base):
"description": self.description,
"category": self.category,
"template_file": self.template_file,
"html_template": self.html_template[:100] + "..." if self.html_template and len(self.html_template) > 100 else self.html_template,
"template_type": self.template_type,
"preview_url": self.preview_url,
"variables": self.variables,
"design_config": self.design_config,
"output_sizes": self.output_sizes,

116
app/models/lead.py Normal file
View File

@@ -0,0 +1,116 @@
"""
Lead Model - Leads generated from social media interactions.
"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, ForeignKey, JSON, Enum
from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.orm import relationship
import enum
from app.core.database import Base
class LeadStatus(enum.Enum):
"""Lead status options."""
NEW = "new"
CONTACTED = "contacted"
QUALIFIED = "qualified"
PROPOSAL = "proposal"
WON = "won"
LOST = "lost"
class LeadPriority(enum.Enum):
"""Lead priority levels."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
URGENT = "urgent"
class Lead(Base):
"""
Lead model for tracking potential customers from social media.
"""
__tablename__ = "leads"
id = Column(Integer, primary_key=True, index=True)
# Source information
interaction_id = Column(Integer, ForeignKey("interactions.id"), nullable=True)
platform = Column(String(50), nullable=False, index=True)
# Contact information
name = Column(String(255), nullable=True)
email = Column(String(255), nullable=True, index=True)
phone = Column(String(50), nullable=True)
company = Column(String(255), nullable=True)
# Social media info
username = Column(String(100), nullable=True)
profile_url = Column(String(500), nullable=True)
# Interest and context
interest = Column(Text, nullable=True) # What they're interested in
source_content = Column(Text, nullable=True) # Original interaction content
notes = Column(Text, nullable=True)
# Products/services interest
products_interested = Column(ARRAY(Integer), nullable=True) # Product IDs
services_interested = Column(ARRAY(Integer), nullable=True) # Service IDs
# Status tracking
status = Column(String(20), default="new", index=True)
priority = Column(String(20), default="medium", index=True)
# Assignment
assigned_to = Column(String(100), nullable=True)
# Odoo integration
odoo_lead_id = Column(Integer, nullable=True, unique=True, index=True)
synced_to_odoo = Column(Boolean, default=False)
odoo_synced_at = Column(DateTime, nullable=True)
# Metadata
tags = Column(ARRAY(String), nullable=True)
custom_fields = Column(JSON, nullable=True)
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
last_contacted_at = Column(DateTime, nullable=True)
# Relationship
interaction = relationship("Interaction", backref="leads")
def __repr__(self):
return f"<Lead {self.id} - {self.name or self.username}>"
def to_dict(self):
"""Convert to dictionary."""
return {
"id": self.id,
"interaction_id": self.interaction_id,
"platform": self.platform,
"name": self.name,
"email": self.email,
"phone": self.phone,
"company": self.company,
"username": self.username,
"profile_url": self.profile_url,
"interest": self.interest,
"source_content": self.source_content,
"notes": self.notes,
"products_interested": self.products_interested,
"services_interested": self.services_interested,
"status": self.status,
"priority": self.priority,
"assigned_to": self.assigned_to,
"odoo_lead_id": self.odoo_lead_id,
"synced_to_odoo": self.synced_to_odoo,
"tags": self.tags,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
"last_contacted_at": self.last_contacted_at.isoformat() if self.last_contacted_at else None
}

View File

@@ -0,0 +1,79 @@
"""
Odoo Sync Log Model - Track synchronization history with Odoo.
"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, DateTime, JSON
from app.core.database import Base
class OdooSyncLog(Base):
"""
Log of synchronization operations with Odoo ERP.
"""
__tablename__ = "odoo_sync_logs"
id = Column(Integer, primary_key=True, index=True)
# Sync operation details
sync_type = Column(String(50), nullable=False, index=True)
# Types: products, services, leads, sales
direction = Column(String(20), nullable=False)
# Direction: import (from Odoo), export (to Odoo)
status = Column(String(20), nullable=False, index=True)
# Status: started, completed, failed, partial
# Statistics
records_processed = Column(Integer, default=0)
records_created = Column(Integer, default=0)
records_updated = Column(Integer, default=0)
records_failed = Column(Integer, default=0)
# Error details
error_message = Column(Text, nullable=True)
error_details = Column(JSON, nullable=True)
# Contains list of failed records with error details
# Sync details
sync_filter = Column(JSON, nullable=True)
# Filters applied during sync (e.g., date range, categories)
# Timestamps
started_at = Column(DateTime, default=datetime.utcnow)
completed_at = Column(DateTime, nullable=True)
def __repr__(self):
return f"<OdooSyncLog {self.id} - {self.sync_type} {self.status}>"
def to_dict(self):
"""Convert to dictionary."""
return {
"id": self.id,
"sync_type": self.sync_type,
"direction": self.direction,
"status": self.status,
"records_processed": self.records_processed,
"records_created": self.records_created,
"records_updated": self.records_updated,
"records_failed": self.records_failed,
"error_message": self.error_message,
"error_details": self.error_details,
"started_at": self.started_at.isoformat() if self.started_at else None,
"completed_at": self.completed_at.isoformat() if self.completed_at else None,
"duration_seconds": (self.completed_at - self.started_at).total_seconds() if self.completed_at and self.started_at else None
}
def mark_completed(self):
"""Mark sync as completed."""
self.status = "completed"
self.completed_at = datetime.utcnow()
def mark_failed(self, error_message: str, details: dict = None):
"""Mark sync as failed with error details."""
self.status = "failed"
self.error_message = error_message
self.error_details = details
self.completed_at = datetime.utcnow()

View File

@@ -5,6 +5,7 @@ Modelo de Post - Posts generados y programados.
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, ForeignKey, JSON, Enum
from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.orm import relationship
import enum
from app.core.database import Base
@@ -93,10 +94,21 @@ class Post(Base):
metrics = Column(JSON, nullable=True)
# Ejemplo: {"likes": 10, "retweets": 5, "comments": 3}
# A/B Testing
ab_test_id = Column(Integer, ForeignKey("ab_tests.id"), nullable=True, index=True)
# Recycling
is_recyclable = Column(Boolean, default=True)
recycled_from_id = Column(Integer, ForeignKey("posts.id"), nullable=True)
recycle_count = Column(Integer, default=0) # Times this post has been recycled
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
metrics_history = relationship("PostMetrics", back_populates="post", cascade="all, delete-orphan")
def __repr__(self):
return f"<Post {self.id} - {self.status}>"
@@ -120,7 +132,11 @@ class Post(Base):
"approval_required": self.approval_required,
"hashtags": self.hashtags,
"metrics": self.metrics,
"created_at": self.created_at.isoformat() if self.created_at else None
"created_at": self.created_at.isoformat() if self.created_at else None,
"ab_test_id": self.ab_test_id,
"is_recyclable": self.is_recyclable,
"recycled_from_id": self.recycled_from_id,
"recycle_count": self.recycle_count
}
def get_content_for_platform(self, platform: str) -> str:

View File

@@ -0,0 +1,79 @@
"""
Post Metrics Model - Tracking engagement metrics per post.
"""
from datetime import datetime
from sqlalchemy import Column, Integer, Float, String, DateTime, ForeignKey, Index
from sqlalchemy.orm import relationship
from app.core.database import Base
class PostMetrics(Base):
"""
Stores engagement metrics for posts.
Multiple records per post to track metrics over time.
"""
__tablename__ = "post_metrics"
id = Column(Integer, primary_key=True, index=True)
post_id = Column(Integer, ForeignKey("posts.id", ondelete="CASCADE"), nullable=False)
platform = Column(String(50), nullable=False)
# Core engagement metrics
likes = Column(Integer, default=0)
comments = Column(Integer, default=0)
shares = Column(Integer, default=0) # retweets, reposts
impressions = Column(Integer, default=0)
reach = Column(Integer, default=0)
saves = Column(Integer, default=0) # bookmarks
clicks = Column(Integer, default=0) # link clicks
replies = Column(Integer, default=0)
quotes = Column(Integer, default=0) # quote tweets
# Calculated metrics
engagement_rate = Column(Float, default=0.0)
engagement_total = Column(Integer, default=0)
# Timestamps
recorded_at = Column(DateTime, default=datetime.utcnow, index=True)
created_at = Column(DateTime, default=datetime.utcnow)
# Relationships
post = relationship("Post", back_populates="metrics_history")
# Indexes for efficient queries
__table_args__ = (
Index("ix_post_metrics_post_platform", "post_id", "platform"),
Index("ix_post_metrics_recorded", "recorded_at"),
)
def calculate_engagement_rate(self):
"""Calculate engagement rate based on impressions."""
self.engagement_total = (
self.likes + self.comments + self.shares +
self.saves + self.replies + self.quotes
)
if self.impressions > 0:
self.engagement_rate = (self.engagement_total / self.impressions) * 100
else:
self.engagement_rate = 0.0
def to_dict(self):
return {
"id": self.id,
"post_id": self.post_id,
"platform": self.platform,
"likes": self.likes,
"comments": self.comments,
"shares": self.shares,
"impressions": self.impressions,
"reach": self.reach,
"saves": self.saves,
"clicks": self.clicks,
"replies": self.replies,
"quotes": self.quotes,
"engagement_rate": round(self.engagement_rate, 2),
"engagement_total": self.engagement_total,
"recorded_at": self.recorded_at.isoformat() if self.recorded_at else None
}

View File

@@ -49,6 +49,10 @@ class Product(Base):
is_featured = Column(Boolean, default=False) # Producto destacado
last_posted_at = Column(DateTime, nullable=True) # Última vez que se publicó
# Odoo integration
odoo_product_id = Column(Integer, nullable=True, unique=True, index=True)
odoo_last_synced = Column(DateTime, nullable=True)
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
@@ -76,5 +80,7 @@ class Product(Base):
"tags": self.tags,
"highlights": self.highlights,
"is_featured": self.is_featured,
"created_at": self.created_at.isoformat() if self.created_at else None
"created_at": self.created_at.isoformat() if self.created_at else None,
"odoo_product_id": self.odoo_product_id,
"odoo_last_synced": self.odoo_last_synced.isoformat() if self.odoo_last_synced else None
}

View File

@@ -0,0 +1,74 @@
"""
Recycled Post Model - Track content recycling for evergreen posts.
"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, JSON, Boolean
from sqlalchemy.orm import relationship
from app.core.database import Base
class RecycledPost(Base):
"""
Tracks when posts are recycled/republished.
Evergreen content that performed well can be republished with modifications.
"""
__tablename__ = "recycled_posts"
id = Column(Integer, primary_key=True, index=True)
# Original post reference
original_post_id = Column(Integer, ForeignKey("posts.id"), nullable=False, index=True)
# New post created from recycling
new_post_id = Column(Integer, ForeignKey("posts.id"), nullable=True, index=True)
# Recycle count (how many times this original has been recycled)
recycle_number = Column(Integer, default=1)
# Modifications made
modifications = Column(JSON, nullable=True)
# Example: {"content_changed": true, "hashtags_updated": true, "image_changed": false}
modification_notes = Column(Text, nullable=True)
# Performance comparison
original_engagement_rate = Column(Integer, nullable=True)
new_engagement_rate = Column(Integer, nullable=True)
# Status
status = Column(String(20), default="pending")
# pending, published, cancelled
# Reason for recycling
reason = Column(String(100), nullable=True)
# high_performer, evergreen, seasonal, manual
# Timestamps
recycled_at = Column(DateTime, default=datetime.utcnow)
scheduled_for = Column(DateTime, nullable=True)
# Relationships
original_post = relationship("Post", foreign_keys=[original_post_id], backref="recycled_versions")
new_post = relationship("Post", foreign_keys=[new_post_id])
def __repr__(self):
return f"<RecycledPost {self.id} - Original: {self.original_post_id}>"
def to_dict(self):
"""Convert to dictionary."""
return {
"id": self.id,
"original_post_id": self.original_post_id,
"new_post_id": self.new_post_id,
"recycle_number": self.recycle_number,
"modifications": self.modifications,
"modification_notes": self.modification_notes,
"original_engagement_rate": self.original_engagement_rate,
"new_engagement_rate": self.new_engagement_rate,
"status": self.status,
"reason": self.reason,
"recycled_at": self.recycled_at.isoformat() if self.recycled_at else None,
"scheduled_for": self.scheduled_for.isoformat() if self.scheduled_for else None
}

View File

@@ -58,6 +58,10 @@ class Service(Base):
is_featured = Column(Boolean, default=False)
last_posted_at = Column(DateTime, nullable=True)
# Odoo integration
odoo_service_id = Column(Integer, nullable=True, unique=True, index=True)
odoo_last_synced = Column(DateTime, nullable=True)
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
@@ -86,5 +90,7 @@ class Service(Base):
"call_to_action": self.call_to_action,
"is_active": self.is_active,
"is_featured": self.is_featured,
"created_at": self.created_at.isoformat() if self.created_at else None
"created_at": self.created_at.isoformat() if self.created_at else None,
"odoo_service_id": self.odoo_service_id,
"odoo_last_synced": self.odoo_last_synced.isoformat() if self.odoo_last_synced else None
}

160
app/models/thread_series.py Normal file
View File

@@ -0,0 +1,160 @@
"""
Thread Series Models - Multi-post thread content scheduling.
"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, JSON, Boolean
from sqlalchemy.orm import relationship
import enum
from app.core.database import Base
class ThreadSeriesStatus(enum.Enum):
"""Status options for thread series."""
DRAFT = "draft"
SCHEDULED = "scheduled"
PUBLISHING = "publishing"
COMPLETED = "completed"
PAUSED = "paused"
CANCELLED = "cancelled"
class ThreadSeries(Base):
"""
A series of related posts published as a thread.
E.g., Educational threads, story threads, tip series.
"""
__tablename__ = "thread_series"
id = Column(Integer, primary_key=True, index=True)
# Series info
name = Column(String(255), nullable=False)
description = Column(Text, nullable=True)
topic = Column(String(100), nullable=True)
# Platform (threads work best on X and Threads)
platform = Column(String(50), nullable=False, index=True)
# Schedule configuration
schedule_type = Column(String(20), default="sequential")
# sequential: posts one after another
# timed: posts at specific intervals
interval_minutes = Column(Integer, default=5) # Time between posts
start_time = Column(DateTime, nullable=True) # When to start publishing
# Thread structure
total_posts = Column(Integer, default=0)
posts_published = Column(Integer, default=0)
# Status
status = Column(String(20), default="draft", index=True)
# First post in chain (for reply chain)
first_platform_post_id = Column(String(100), nullable=True)
# AI generation settings
ai_generated = Column(Boolean, default=False)
generation_prompt = Column(Text, nullable=True)
# Metadata
hashtags = Column(JSON, nullable=True) # Common hashtags for the series
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
completed_at = Column(DateTime, nullable=True)
# Relationships
posts = relationship("ThreadPost", back_populates="series", order_by="ThreadPost.sequence_number")
def __repr__(self):
return f"<ThreadSeries {self.id} - {self.name}>"
def to_dict(self, include_posts: bool = True):
"""Convert to dictionary."""
result = {
"id": self.id,
"name": self.name,
"description": self.description,
"topic": self.topic,
"platform": self.platform,
"schedule_type": self.schedule_type,
"interval_minutes": self.interval_minutes,
"start_time": self.start_time.isoformat() if self.start_time else None,
"total_posts": self.total_posts,
"posts_published": self.posts_published,
"status": self.status,
"ai_generated": self.ai_generated,
"hashtags": self.hashtags,
"created_at": self.created_at.isoformat() if self.created_at else None,
"completed_at": self.completed_at.isoformat() if self.completed_at else None
}
if include_posts and self.posts:
result["posts"] = [p.to_dict() for p in self.posts]
return result
class ThreadPost(Base):
"""
Individual post within a thread series.
"""
__tablename__ = "thread_posts"
id = Column(Integer, primary_key=True, index=True)
# Parent series
series_id = Column(Integer, ForeignKey("thread_series.id", ondelete="CASCADE"), nullable=False)
# Position in thread
sequence_number = Column(Integer, nullable=False) # 1, 2, 3, ...
# Content
content = Column(Text, nullable=False)
image_url = Column(String(500), nullable=True)
# Associated post (once created)
post_id = Column(Integer, ForeignKey("posts.id"), nullable=True)
# Platform post ID (for reply chain)
platform_post_id = Column(String(100), nullable=True)
reply_to_platform_id = Column(String(100), nullable=True) # ID of post to reply to
# Schedule
scheduled_at = Column(DateTime, nullable=True)
# Status
status = Column(String(20), default="pending")
# pending, scheduled, published, failed
error_message = Column(Text, nullable=True)
published_at = Column(DateTime, nullable=True)
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow)
# Relationships
series = relationship("ThreadSeries", back_populates="posts")
post = relationship("Post", backref="thread_post")
def __repr__(self):
return f"<ThreadPost {self.id} - Series {self.series_id} #{self.sequence_number}>"
def to_dict(self):
"""Convert to dictionary."""
return {
"id": self.id,
"series_id": self.series_id,
"sequence_number": self.sequence_number,
"content": self.content[:100] + "..." if len(self.content) > 100 else self.content,
"full_content": self.content,
"image_url": self.image_url,
"post_id": self.post_id,
"platform_post_id": self.platform_post_id,
"scheduled_at": self.scheduled_at.isoformat() if self.scheduled_at else None,
"status": self.status,
"error_message": self.error_message,
"published_at": self.published_at.isoformat() if self.published_at else None
}