- Add Celery worker with 5 scheduled tasks (Beat)
- Create ContentScheduler for optimal posting times
- Add calendar endpoints for scheduled posts management
- Implement Telegram notification service
- Add notification API with setup guide
Celery Beat Schedule:
- check_scheduled_posts: Every minute
- generate_daily_content: Daily at 6 AM
- sync_interactions: Every 15 minutes
- send_daily_summary: Daily at 9 PM
- cleanup_old_data: Weekly on Sundays
New endpoints:
- GET /api/calendar/posts/scheduled - List scheduled posts
- GET /api/calendar/posts/view - Calendar view
- GET /api/calendar/posts/slots - Available time slots
- POST /api/calendar/posts/{id}/schedule - Schedule post
- POST /api/calendar/posts/{id}/publish-now - Publish immediately
- GET /api/notifications/status - Check notification config
- POST /api/notifications/test - Send test notification
- GET /api/notifications/setup-guide - Configuration guide
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
338 lines
10 KiB
Python
338 lines
10 KiB
Python
"""
Content scheduler service.

Manages scheduling of posts according to optimal times and calendar.
"""
|
|
|
|
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional

from app.core.database import SessionLocal
from app.data.content_templates import OPTIMAL_POSTING_TIMES
|
|
|
|
|
|
@dataclass
class ScheduleSlot:
    """A candidate posting time for a single platform."""

    # Date and time of the slot (ContentScheduler builds these as naive datetimes).
    datetime: datetime
    # Name of the platform the slot belongs to, e.g. "x".
    platform: str
    # False when ContentScheduler found an existing post at exactly this time.
    available: bool = True
|
|
|
|
|
|
class ContentScheduler:
    """
    Service for scheduling content according to optimal times.

    ``posting_times`` maps a platform name to a table holding "weekday" and
    "weekend" lists of "HH:MM" strings plus an optional "best" entry. Every
    method that touches the database opens its own ``SessionLocal`` session
    and closes it before returning.
    """

    # Platform whose timing table is used when the requested platform has none.
    _FALLBACK_PLATFORM = "x"

    def __init__(self):
        """Bind the scheduler to the module-level ``OPTIMAL_POSTING_TIMES`` table."""
        self.posting_times = OPTIMAL_POSTING_TIMES

    def _times_for(self, platform: str) -> Dict:
        """Return the timing table for *platform*, falling back to the "x" table."""
        # The fallback is only looked up when it is needed, so a configured
        # platform keeps working even if the fallback entry is absent.
        try:
            return self.posting_times[platform]
        except KeyError:
            return self.posting_times[self._FALLBACK_PLATFORM]

    @staticmethod
    def _at_time(day, time_str: str) -> datetime:
        """Combine the date *day* with an "HH:MM" string into a naive datetime."""
        hour, minute = map(int, time_str.split(":"))
        return datetime.combine(day, datetime.min.time()).replace(
            hour=hour, minute=minute
        )

    @classmethod
    def _day_slots(cls, times: Dict, day) -> List[datetime]:
        """Return the configured slot datetimes for the date *day*, in table order."""
        # Saturday (5) and Sunday (6) use the weekend list.
        key = "weekend" if day.weekday() >= 5 else "weekday"
        return [cls._at_time(day, time_str) for time_str in times[key]]

    def get_next_available_slot(
        self,
        platform: str,
        after: Optional[datetime] = None,
        min_gap_hours: int = 2
    ) -> datetime:
        """
        Get the next available time slot for a platform.

        Args:
            platform: Target platform
            after: Start searching after this time (default: current UTC
                time). Must be naive, because it is compared with the naive
                slot datetimes built from the timing table.
            min_gap_hours: Minimum hours between posts

        Returns:
            The first configured slot later than ``after`` in the 7-day
            search window that has no scheduled or published post for the
            platform within ``min_gap_hours`` on either side. If every slot
            is taken, the day after ``after`` at the table's "best" time
            (default "12:00") is returned without a conflict check.
        """
        if after is None:
            # Naive UTC "now": same value as the deprecated datetime.utcnow().
            after = datetime.now(timezone.utc).replace(tzinfo=None)

        times = self._times_for(platform)

        # From 21:00 onwards the search starts on the next calendar day.
        first_day = after.date()
        if after.hour >= 21:
            first_day += timedelta(days=1)

        gap = timedelta(hours=min_gap_hours)

        db = SessionLocal()
        try:
            # Imported lazily (presumably to avoid an import cycle - confirm).
            from app.models.post import Post

            # Search next 7 days
            for day_offset in range(7):
                day = first_day + timedelta(days=day_offset)

                for slot_time in self._day_slots(times, day):
                    # Skip past times
                    if slot_time <= after:
                        continue

                    # Slot is free when no post sits within the gap window.
                    conflict = db.query(Post).filter(
                        Post.platforms.contains([platform]),
                        Post.status.in_(["scheduled", "published"]),
                        Post.scheduled_at >= slot_time - gap,
                        Post.scheduled_at <= slot_time + gap
                    ).first()

                    if conflict is None:
                        return slot_time
        finally:
            db.close()

        # Fallback: tomorrow at best time
        tomorrow = after.date() + timedelta(days=1)
        return self._at_time(tomorrow, times.get("best", "12:00"))

    def get_available_slots(
        self,
        platform: str,
        start_date: datetime,
        days: int = 7
    ) -> "List[ScheduleSlot]":
        """
        Get all configured slots for a platform within a date range.

        Args:
            platform: Target platform
            start_date: Start of range (only its date part is used)
            days: Number of days to check

        Returns:
            One ScheduleSlot per configured time, in chronological day order.
            ``available`` is False when a scheduled or published post for
            the platform exists at exactly that time.
        """
        times = self._times_for(platform)
        first_day = start_date.date()

        db = SessionLocal()
        try:
            # Imported lazily (presumably to avoid an import cycle - confirm).
            from app.models.post import Post

            slots = []
            for day_offset in range(days):
                day = first_day + timedelta(days=day_offset)

                for slot_time in self._day_slots(times, day):
                    # NOTE(review): one query per slot; a single range query
                    # would be cheaper if scheduled_at values compare equal
                    # in Python the same way they do in the database.
                    taken = db.query(Post).filter(
                        Post.platforms.contains([platform]),
                        Post.status.in_(["scheduled", "published"]),
                        Post.scheduled_at == slot_time
                    ).first()

                    slots.append(ScheduleSlot(
                        datetime=slot_time,
                        platform=platform,
                        available=taken is None
                    ))

            return slots
        finally:
            db.close()

    def schedule_post(
        self,
        post_id: int,
        scheduled_at: Optional[datetime] = None,
        platform: Optional[str] = None
    ) -> datetime:
        """
        Schedule a post for publishing.

        Args:
            post_id: Post to schedule
            scheduled_at: Specific time (or auto-select if None)
            platform: Platform for auto-scheduling; defaults to the post's
                first platform, or "x" when the post has none

        Returns:
            Scheduled datetime

        Raises:
            ValueError: If no post with ``post_id`` exists.
        """
        db = SessionLocal()
        try:
            # Imported lazily (presumably to avoid an import cycle - confirm).
            from app.models.post import Post

            post = db.query(Post).filter(Post.id == post_id).first()
            if not post:
                raise ValueError(f"Post {post_id} not found")

            # NOTE(review): unlike reschedule_post, a published post is not
            # rejected here and would be set back to "scheduled" - confirm
            # that this is intended.

            # Auto-select time if not provided
            if scheduled_at is None:
                target_platform = platform or (post.platforms[0] if post.platforms else "x")
                scheduled_at = self.get_next_available_slot(target_platform)

            post.scheduled_at = scheduled_at
            post.status = "scheduled"

            db.commit()

            return scheduled_at
        finally:
            db.close()

    def reschedule_post(
        self,
        post_id: int,
        new_time: datetime
    ) -> bool:
        """
        Reschedule a post to a new time.

        Returns:
            True when the post was updated; False when it does not exist or
            has already been published.
        """
        db = SessionLocal()
        try:
            # Imported lazily (presumably to avoid an import cycle - confirm).
            from app.models.post import Post

            post = db.query(Post).filter(Post.id == post_id).first()
            if not post:
                return False

            if post.status == "published":
                return False  # Can't reschedule published posts

            post.scheduled_at = new_time
            post.status = "scheduled"

            db.commit()
            return True
        finally:
            db.close()

    def cancel_scheduled(self, post_id: int) -> bool:
        """
        Cancel a scheduled post.

        The post goes back to "draft" and its scheduled time is cleared.

        Returns:
            True when the post was cancelled; False when it does not exist
            or its status is not "scheduled".
        """
        db = SessionLocal()
        try:
            # Imported lazily (presumably to avoid an import cycle - confirm).
            from app.models.post import Post

            post = db.query(Post).filter(Post.id == post_id).first()
            if not post or post.status != "scheduled":
                return False

            post.status = "draft"
            post.scheduled_at = None

            db.commit()
            return True
        finally:
            db.close()

    def get_calendar(
        self,
        start_date: datetime,
        end_date: datetime,
        platforms: Optional[List[str]] = None
    ) -> Dict:
        """
        Get calendar view of scheduled posts.

        Args:
            start_date: Earliest scheduled time to include (inclusive)
            end_date: Latest scheduled time to include (inclusive)
            platforms: When given, keep only posts targeting at least one
                of these platforms

        Returns:
            Dict mapping "YYYY-MM-DD" to a list of post summaries ordered by
            scheduled time. Content is shortened to 100 characters plus
            "..." when longer.
        """
        db = SessionLocal()
        try:
            # Imported lazily (presumably to avoid an import cycle - confirm).
            from app.models.post import Post

            query = db.query(Post).filter(
                Post.scheduled_at >= start_date,
                Post.scheduled_at <= end_date,
                Post.status.in_(["scheduled", "pending_approval", "published"])
            )

            if platforms:
                # Filter by platforms (any match)
                query = query.filter(
                    Post.platforms.overlap(platforms)
                )

            posts = query.order_by(Post.scheduled_at).all()

            # Group by date
            calendar: Dict[str, list] = {}
            for post in posts:
                # A post without content yields an empty preview, not a TypeError.
                text = post.content or ""
                preview = text[:100] + "..." if len(text) > 100 else text

                date_key = post.scheduled_at.strftime("%Y-%m-%d")
                calendar.setdefault(date_key, []).append({
                    "id": post.id,
                    "content": preview,
                    "platforms": post.platforms,
                    "status": post.status,
                    "scheduled_at": post.scheduled_at.isoformat(),
                    "content_type": post.content_type
                })

            return calendar
        finally:
            db.close()

    def auto_fill_calendar(
        self,
        start_date: datetime,
        days: int,
        platforms: List[str],
        posts_per_day: Optional[Dict[str, int]] = None
    ) -> "List[ScheduleSlot]":
        """
        Get suggested slots to fill the calendar.

        Args:
            start_date: Start filling from
            days: Number of days
            platforms: Platforms to schedule for
            posts_per_day: Posts per day per platform (platforms missing
                from the mapping default to 2)

        Returns:
            Empty slots that need content, sorted by time. For each platform
            at most ``posts_per_day * days`` slots are picked, spaced evenly
            over the available slots of the whole range.
        """
        if posts_per_day is None:
            posts_per_day = {"x": 4, "threads": 3, "instagram": 2, "facebook": 1}

        empty_slots = []

        for platform in platforms:
            slots_needed = posts_per_day.get(platform, 2) * days
            if slots_needed <= 0:
                # Nothing requested; also avoids dividing by zero below.
                continue

            slots = self.get_available_slots(platform, start_date, days)
            available = [s for s in slots if s.available]

            # Distribute evenly: stride over the full list of free slots,
            # then cap the selection at the number of slots requested.
            step = max(1, len(available) // slots_needed)
            empty_slots.extend(available[::step][:slots_needed])

        return sorted(empty_slots, key=lambda s: s.datetime)
|
|
|
|
|
|
# Global instance: a single scheduler created when this module is imported.
content_scheduler = ContentScheduler()
|