From edc0e5577b876dd3de52a756afb4a51ce79dfc4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Consultor=C3=ADa=20AS?= Date: Wed, 28 Jan 2026 01:56:10 +0000 Subject: [PATCH] feat(phase-4): Complete scheduling and automation system - Add Celery worker with 5 scheduled tasks (Beat) - Create ContentScheduler for optimal posting times - Add calendar endpoints for scheduled posts management - Implement Telegram notification service - Add notification API with setup guide Celery Beat Schedule: - check_scheduled_posts: Every minute - generate_daily_content: Daily at 6 AM - sync_interactions: Every 15 minutes - send_daily_summary: Daily at 9 PM - cleanup_old_data: Weekly on Sundays New endpoints: - GET /api/calendar/posts/scheduled - List scheduled posts - GET /api/calendar/posts/view - Calendar view - GET /api/calendar/posts/slots - Available time slots - POST /api/calendar/posts/{id}/schedule - Schedule post - POST /api/calendar/posts/{id}/publish-now - Publish immediately - GET /api/notifications/status - Check notification config - POST /api/notifications/test - Send test notification - GET /api/notifications/setup-guide - Configuration guide Co-Authored-By: Claude Opus 4.5 --- app/api/routes/calendar.py | 226 +++++++++++- app/api/routes/notifications.py | 143 ++++++++ app/main.py | 3 +- app/services/notifications.py | 147 ++++++++ app/services/scheduler.py | 337 +++++++++++++++++ app/worker/__init__.py | 7 + app/worker/celery_app.py | 76 ++++ app/worker/tasks.py | 621 ++++++++++++++++++++++++++++++++ 8 files changed, 1558 insertions(+), 2 deletions(-) create mode 100644 app/api/routes/notifications.py create mode 100644 app/services/notifications.py create mode 100644 app/services/scheduler.py create mode 100644 app/worker/__init__.py create mode 100644 app/worker/celery_app.py create mode 100644 app/worker/tasks.py diff --git a/app/api/routes/calendar.py b/app/api/routes/calendar.py index b4ae038..7f8d8f2 100644 --- a/app/api/routes/calendar.py +++ b/app/api/routes/calendar.py @@ -2,7 +2,7 @@ API Routes para gestión del Calendario de Contenido. """ -from datetime import datetime, time +from datetime import datetime, time, timedelta from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.orm import Session @@ -199,3 +199,227 @@ async def toggle_calendar_entry(entry_id: int, db: Session = Depends(get_db)): "message": f"Entrada {'activada' if entry.is_active else 'desactivada'}", "is_active": entry.is_active } + + +# =========================================== +# SCHEDULED POSTS ENDPOINTS +# =========================================== + +@router.get("/posts/scheduled") +async def get_scheduled_posts( + start_date: Optional[str] = Query(None, description="YYYY-MM-DD"), + end_date: Optional[str] = Query(None, description="YYYY-MM-DD"), + platform: Optional[str] = Query(None), + db: Session = Depends(get_db) +): + """ + Obtener posts programados en un rango de fechas. + + Por defecto muestra los próximos 7 días. + """ + from app.models.post import Post + + # Parse dates + if start_date: + start = datetime.strptime(start_date, "%Y-%m-%d") + else: + start = datetime.utcnow() + + if end_date: + end = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=23, minute=59) + else: + end = start + timedelta(days=7) + + query = db.query(Post).filter( + Post.scheduled_at >= start, + Post.scheduled_at <= end, + Post.status.in_(["scheduled", "pending_approval"]) + ) + + if platform: + query = query.filter(Post.platforms.contains([platform])) + + posts = query.order_by(Post.scheduled_at).all() + + return [p.to_dict() for p in posts] + + +@router.get("/posts/view") +async def get_calendar_view( + start_date: Optional[str] = Query(None, description="YYYY-MM-DD"), + days: int = Query(7, ge=1, le=30), + platforms: Optional[str] = Query(None, description="Comma-separated"), + db: Session = Depends(get_db) +): + """ + Obtener vista de calendario con posts agrupados por fecha. + """ + from app.services.scheduler import content_scheduler + + if start_date: + start = datetime.strptime(start_date, "%Y-%m-%d") + else: + start = datetime.utcnow() + + end = start + timedelta(days=days) + + platform_list = None + if platforms: + platform_list = [p.strip() for p in platforms.split(",")] + + calendar = content_scheduler.get_calendar(start, end, platform_list) + + return { + "start_date": start.strftime("%Y-%m-%d"), + "end_date": end.strftime("%Y-%m-%d"), + "days": days, + "calendar": calendar + } + + +@router.get("/posts/slots") +async def get_available_slots( + platform: str, + start_date: Optional[str] = Query(None), + days: int = Query(7, ge=1, le=14), + db: Session = Depends(get_db) +): + """ + Obtener slots disponibles para programar en una plataforma. + """ + from app.services.scheduler import content_scheduler + + if start_date: + start = datetime.strptime(start_date, "%Y-%m-%d") + else: + start = datetime.utcnow() + + slots = content_scheduler.get_available_slots(platform, start, days) + + return { + "platform": platform, + "slots": [ + { + "datetime": s.datetime.isoformat(), + "available": s.available + } + for s in slots + ] + } + + +@router.post("/posts/{post_id}/schedule") +async def schedule_post( + post_id: int, + scheduled_at: Optional[str] = Query(None, description="ISO format datetime"), + auto: bool = Query(False, description="Auto-select optimal time"), + db: Session = Depends(get_db) +): + """ + Programar un post para publicación. + + - Con `scheduled_at`: programa para esa fecha/hora específica + - Con `auto=true`: selecciona automáticamente el mejor horario + """ + from app.models.post import Post + from app.services.scheduler import content_scheduler + + post = db.query(Post).filter(Post.id == post_id).first() + if not post: + raise HTTPException(status_code=404, detail="Post no encontrado") + + if post.status == "published": + raise HTTPException(status_code=400, detail="Post ya publicado") + + if scheduled_at: + schedule_time = datetime.fromisoformat(scheduled_at.replace("Z", "+00:00")) + elif auto: + platform = post.platforms[0] if post.platforms else "x" + schedule_time = content_scheduler.get_next_available_slot(platform) + else: + raise HTTPException( + status_code=400, + detail="Proporciona scheduled_at o usa auto=true" + ) + + post.scheduled_at = schedule_time + post.status = "scheduled" + db.commit() + + return { + "message": "Post programado", + "post_id": post_id, + "scheduled_at": schedule_time.isoformat() + } + + +@router.post("/posts/{post_id}/reschedule") +async def reschedule_post( + post_id: int, + scheduled_at: str = Query(..., description="Nueva fecha/hora ISO"), + db: Session = Depends(get_db) +): + """Reprogramar un post a una nueva fecha/hora.""" + from app.models.post import Post + + post = db.query(Post).filter(Post.id == post_id).first() + if not post: + raise HTTPException(status_code=404, detail="Post no encontrado") + + if post.status == "published": + raise HTTPException(status_code=400, detail="No se puede reprogramar un post publicado") + + new_time = datetime.fromisoformat(scheduled_at.replace("Z", "+00:00")) + post.scheduled_at = new_time + post.status = "scheduled" + db.commit() + + return { + "message": "Post reprogramado", + "post_id": post_id, + "scheduled_at": new_time.isoformat() + } + + +@router.post("/posts/{post_id}/cancel") +async def cancel_scheduled_post(post_id: int, db: Session = Depends(get_db)): + """Cancelar un post programado (vuelve a draft).""" + from app.models.post import Post + + post = db.query(Post).filter(Post.id == post_id).first() + if not post: + raise HTTPException(status_code=404, detail="Post no encontrado") + + if post.status not in ["scheduled", "pending_approval"]: + raise HTTPException( + status_code=400, + detail=f"No se puede cancelar un post con status '{post.status}'" + ) + + post.status = "draft" + post.scheduled_at = None + db.commit() + + return {"message": "Post cancelado", "post_id": post_id} + + +@router.post("/posts/{post_id}/publish-now") +async def publish_post_now(post_id: int, db: Session = Depends(get_db)): + """Publicar un post inmediatamente (sin esperar al horario programado).""" + from app.models.post import Post + + post = db.query(Post).filter(Post.id == post_id).first() + if not post: + raise HTTPException(status_code=404, detail="Post no encontrado") + + if post.status == "published": + raise HTTPException(status_code=400, detail="Post ya publicado") + + # Queue for immediate publishing + from app.worker.tasks import publish_post + publish_post.delay(post_id) + + return { + "message": "Post enviado a publicación", + "post_id": post_id + } diff --git a/app/api/routes/notifications.py b/app/api/routes/notifications.py new file mode 100644 index 0000000..b63d0c4 --- /dev/null +++ b/app/api/routes/notifications.py @@ -0,0 +1,143 @@ +""" +API Routes for notification management. +""" + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel +from typing import Optional + +from app.core.config import settings +from app.services.notifications import telegram_notify, notification_service + + +router = APIRouter() + + +class TestNotificationRequest(BaseModel): + """Request for testing notifications.""" + message: Optional[str] = "Test desde Social Media Automation" + + +class NotificationSettingsResponse(BaseModel): + """Response with notification settings.""" + telegram_configured: bool + bot_token_set: bool + chat_id_set: bool + + +@router.get("/status") +async def get_notification_status(): + """ + Verificar estado de las notificaciones. + + Muestra si Telegram está configurado correctamente. + """ + return NotificationSettingsResponse( + telegram_configured=notification_service.telegram_enabled, + bot_token_set=bool(settings.TELEGRAM_BOT_TOKEN), + chat_id_set=bool(settings.TELEGRAM_CHAT_ID) + ) + + +@router.post("/test") +async def test_notification(request: TestNotificationRequest): + """ + Enviar notificación de prueba. + + Útil para verificar que la configuración de Telegram funciona. + """ + if not notification_service.telegram_enabled: + raise HTTPException( + status_code=503, + detail="Telegram no configurado. Agrega TELEGRAM_BOT_TOKEN y TELEGRAM_CHAT_ID en .env" + ) + + message = f"🧪 *Test de Notificación*\n\n{request.message}\n\n✅ Si ves esto, las notificaciones funcionan correctamente." + + success = await telegram_notify(message) + + if success: + return {"success": True, "message": "Notificación enviada"} + else: + raise HTTPException( + status_code=500, + detail="Error al enviar notificación. Verifica las credenciales." + ) + + +@router.post("/send") +async def send_custom_notification( + message: str, + parse_mode: str = "Markdown" +): + """ + Enviar notificación personalizada. + + - **message**: Texto del mensaje (soporta Markdown) + - **parse_mode**: "Markdown" o "HTML" + """ + if not notification_service.telegram_enabled: + raise HTTPException( + status_code=503, + detail="Telegram no configurado" + ) + + success = await telegram_notify(message, parse_mode) + + return {"success": success} + + +@router.get("/setup-guide") +async def get_setup_guide(): + """ + Obtener guía de configuración de Telegram. + """ + return { + "title": "Configuración de Notificaciones Telegram", + "steps": [ + { + "step": 1, + "title": "Crear Bot de Telegram", + "instructions": [ + "Abre Telegram y busca @BotFather", + "Envía el comando /newbot", + "Sigue las instrucciones para nombrar tu bot", + "Guarda el token que te proporciona" + ] + }, + { + "step": 2, + "title": "Obtener Chat ID", + "instructions": [ + "Inicia una conversación con tu nuevo bot", + "Envía cualquier mensaje", + "Visita: https://api.telegram.org/bot/getUpdates", + "Busca el campo 'chat': {'id': XXXXXXX}", + "Ese número es tu CHAT_ID" + ] + }, + { + "step": 3, + "title": "Configurar Variables", + "instructions": [ + "Edita tu archivo .env", + "Agrega: TELEGRAM_BOT_TOKEN=tu_token_aqui", + "Agrega: TELEGRAM_CHAT_ID=tu_chat_id_aqui", + "Reinicia la aplicación" + ] + }, + { + "step": 4, + "title": "Verificar", + "instructions": [ + "Usa el endpoint POST /api/notifications/test", + "Deberías recibir un mensaje en Telegram" + ] + } + ], + "current_status": { + "configured": notification_service.telegram_enabled, + "bot_token": "✓ Configurado" if settings.TELEGRAM_BOT_TOKEN else "✗ Falta", + "chat_id": "✓ Configurado" if settings.TELEGRAM_CHAT_ID else "✗ Falta" + } + } diff --git a/app/main.py b/app/main.py index 9cfd8db..7d64bea 100644 --- a/app/main.py +++ b/app/main.py @@ -11,7 +11,7 @@ from fastapi import FastAPI from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware -from app.api.routes import posts, products, services, calendar, dashboard, interactions, auth, publish, generate +from app.api.routes import posts, products, services, calendar, dashboard, interactions, auth, publish, generate, notifications from app.core.config import settings from app.core.database import engine from app.models import Base @@ -64,6 +64,7 @@ app.include_router(calendar.router, prefix="/api/calendar", tags=["Calendar"]) app.include_router(interactions.router, prefix="/api/interactions", tags=["Interactions"]) app.include_router(publish.router, prefix="/api/publish", tags=["Publish"]) app.include_router(generate.router, prefix="/api/generate", tags=["AI Generation"]) +app.include_router(notifications.router, prefix="/api/notifications", tags=["Notifications"]) @app.get("/api/health") diff --git a/app/services/notifications.py b/app/services/notifications.py new file mode 100644 index 0000000..1bc2b50 --- /dev/null +++ b/app/services/notifications.py @@ -0,0 +1,147 @@ +""" +Notification service for alerts and summaries. +""" + +import httpx +from typing import Optional + +from app.core.config import settings + + +async def telegram_notify( + message: str, + parse_mode: str = "Markdown", + disable_notification: bool = False +) -> bool: + """ + Send a notification via Telegram. + + Args: + message: Message text (supports Markdown) + parse_mode: "Markdown" or "HTML" + disable_notification: If True, send silently + + Returns: + True if sent successfully + """ + if not settings.TELEGRAM_BOT_TOKEN or not settings.TELEGRAM_CHAT_ID: + return False + + url = f"https://api.telegram.org/bot{settings.TELEGRAM_BOT_TOKEN}/sendMessage" + + payload = { + "chat_id": settings.TELEGRAM_CHAT_ID, + "text": message, + "parse_mode": parse_mode, + "disable_notification": disable_notification + } + + try: + async with httpx.AsyncClient() as client: + response = await client.post(url, json=payload) + return response.status_code == 200 + except Exception: + return False + + +async def telegram_notify_error( + error: str, + context: Optional[str] = None +) -> bool: + """Send error notification.""" + message = f"🚨 *Error*\n\n{error}" + if context: + message += f"\n\n📍 Contexto: {context}" + + return await telegram_notify(message) + + +async def telegram_notify_success( + title: str, + details: Optional[str] = None +) -> bool: + """Send success notification.""" + message = f"✅ *{title}*" + if details: + message += f"\n\n{details}" + + return await telegram_notify(message) + + +class NotificationService: + """Service for managing notifications.""" + + def __init__(self): + self.telegram_enabled = bool( + settings.TELEGRAM_BOT_TOKEN and settings.TELEGRAM_CHAT_ID + ) + + async def notify_publish( + self, + platform: str, + content: str, + url: Optional[str] = None, + success: bool = True + ): + """Notify about a publication.""" + if not self.telegram_enabled: + return + + emoji = "✅" if success else "❌" + status = "Publicado" if success else "Error al publicar" + + message = f"{emoji} *{status} en {platform}*\n\n" + message += f"📝 {content[:150]}..." + + if url: + message += f"\n\n🔗 {url}" + + await telegram_notify(message) + + async def notify_interaction( + self, + platform: str, + author: str, + content: str, + interaction_type: str = "comentario" + ): + """Notify about a new interaction.""" + if not self.telegram_enabled: + return + + message = f"💬 *Nueva interacción en {platform}*\n\n" + message += f"👤 @{author}\n" + message += f"📝 {content[:200]}" + + await telegram_notify(message) + + async def notify_daily_summary( + self, + published: int, + failed: int, + scheduled: int, + interactions: int + ): + """Send daily summary.""" + if not self.telegram_enabled: + return + + message = "📊 *Resumen del día*\n\n" + message += f"✅ Publicados: {published}\n" + if failed > 0: + message += f"❌ Fallidos: {failed}\n" + message += f"📅 Programados: {scheduled}\n" + message += f"💬 Interacciones: {interactions}" + + await telegram_notify(message) + + async def notify_error(self, error: str, context: str = None): + """Send error notification.""" + if not self.telegram_enabled: + return + + await telegram_notify_error(error, context) + + +# Global instance +notification_service = NotificationService() diff --git a/app/services/scheduler.py b/app/services/scheduler.py new file mode 100644 index 0000000..27a64b7 --- /dev/null +++ b/app/services/scheduler.py @@ -0,0 +1,337 @@ +""" +Content scheduler service. +Manages scheduling of posts according to optimal times and calendar. +""" + +from datetime import datetime, timedelta +from typing import List, Optional, Dict +from dataclasses import dataclass + +from app.core.database import SessionLocal +from app.data.content_templates import OPTIMAL_POSTING_TIMES + + +@dataclass +class ScheduleSlot: + """A time slot for scheduling.""" + datetime: datetime + platform: str + available: bool = True + + +class ContentScheduler: + """ + Service for scheduling content according to optimal times. + """ + + def __init__(self): + self.posting_times = OPTIMAL_POSTING_TIMES + + def get_next_available_slot( + self, + platform: str, + after: Optional[datetime] = None, + min_gap_hours: int = 2 + ) -> datetime: + """ + Get the next available time slot for a platform. + + Args: + platform: Target platform + after: Start searching after this time (default: now) + min_gap_hours: Minimum hours between posts + + Returns: + Next available datetime + """ + if after is None: + after = datetime.utcnow() + + db = SessionLocal() + try: + from app.models.post import Post + + # Get platform's optimal times + times = self.posting_times.get(platform, self.posting_times["x"]) + + # Start from tomorrow if after work hours + current = after + if current.hour >= 21: + current = current.replace( + hour=0, minute=0, second=0, microsecond=0 + ) + timedelta(days=1) + + # Search next 7 days + for day_offset in range(7): + check_date = current.date() + timedelta(days=day_offset) + is_weekend = check_date.weekday() >= 5 + + day_times = times["weekend"] if is_weekend else times["weekday"] + + for time_str in day_times: + hour, minute = map(int, time_str.split(":")) + slot_time = datetime.combine(check_date, datetime.min.time()) + slot_time = slot_time.replace(hour=hour, minute=minute) + + # Skip past times + if slot_time <= after: + continue + + # Check if slot is free (no posts within min_gap) + window_start = slot_time - timedelta(hours=min_gap_hours) + window_end = slot_time + timedelta(hours=min_gap_hours) + + existing = db.query(Post).filter( + Post.platforms.contains([platform]), + Post.status.in_(["scheduled", "published"]), + Post.scheduled_at >= window_start, + Post.scheduled_at <= window_end + ).first() + + if not existing: + return slot_time + + # Fallback: tomorrow at best time + tomorrow = after.date() + timedelta(days=1) + best_time = times.get("best", "12:00") + hour, minute = map(int, best_time.split(":")) + + return datetime.combine(tomorrow, datetime.min.time()).replace( + hour=hour, minute=minute + ) + + finally: + db.close() + + def get_available_slots( + self, + platform: str, + start_date: datetime, + days: int = 7 + ) -> List[ScheduleSlot]: + """ + Get all available slots for a platform within a date range. + + Args: + platform: Target platform + start_date: Start of range + days: Number of days to check + + Returns: + List of available slots + """ + db = SessionLocal() + try: + from app.models.post import Post + + slots = [] + times = self.posting_times.get(platform, self.posting_times["x"]) + + for day_offset in range(days): + check_date = start_date.date() + timedelta(days=day_offset) + is_weekend = check_date.weekday() >= 5 + + day_times = times["weekend"] if is_weekend else times["weekday"] + + for time_str in day_times: + hour, minute = map(int, time_str.split(":")) + slot_time = datetime.combine(check_date, datetime.min.time()) + slot_time = slot_time.replace(hour=hour, minute=minute) + + # Check availability + existing = db.query(Post).filter( + Post.platforms.contains([platform]), + Post.status.in_(["scheduled", "published"]), + Post.scheduled_at == slot_time + ).first() + + slots.append(ScheduleSlot( + datetime=slot_time, + platform=platform, + available=existing is None + )) + + return slots + + finally: + db.close() + + def schedule_post( + self, + post_id: int, + scheduled_at: Optional[datetime] = None, + platform: Optional[str] = None + ) -> datetime: + """ + Schedule a post for publishing. + + Args: + post_id: Post to schedule + scheduled_at: Specific time (or auto-select if None) + platform: Platform for auto-scheduling + + Returns: + Scheduled datetime + """ + db = SessionLocal() + try: + from app.models.post import Post + + post = db.query(Post).filter(Post.id == post_id).first() + if not post: + raise ValueError(f"Post {post_id} not found") + + # Auto-select time if not provided + if scheduled_at is None: + target_platform = platform or (post.platforms[0] if post.platforms else "x") + scheduled_at = self.get_next_available_slot(target_platform) + + post.scheduled_at = scheduled_at + post.status = "scheduled" + + db.commit() + + return scheduled_at + + finally: + db.close() + + def reschedule_post( + self, + post_id: int, + new_time: datetime + ) -> bool: + """Reschedule a post to a new time.""" + db = SessionLocal() + try: + from app.models.post import Post + + post = db.query(Post).filter(Post.id == post_id).first() + if not post: + return False + + if post.status == "published": + return False # Can't reschedule published posts + + post.scheduled_at = new_time + post.status = "scheduled" + + db.commit() + return True + + finally: + db.close() + + def cancel_scheduled(self, post_id: int) -> bool: + """Cancel a scheduled post.""" + db = SessionLocal() + try: + from app.models.post import Post + + post = db.query(Post).filter(Post.id == post_id).first() + if not post or post.status != "scheduled": + return False + + post.status = "draft" + post.scheduled_at = None + + db.commit() + return True + + finally: + db.close() + + def get_calendar( + self, + start_date: datetime, + end_date: datetime, + platforms: Optional[List[str]] = None + ) -> Dict: + """ + Get calendar view of scheduled posts. + + Returns: + Dict with posts grouped by date + """ + db = SessionLocal() + try: + from app.models.post import Post + + query = db.query(Post).filter( + Post.scheduled_at >= start_date, + Post.scheduled_at <= end_date, + Post.status.in_(["scheduled", "pending_approval", "published"]) + ) + + if platforms: + # Filter by platforms (any match) + query = query.filter( + Post.platforms.overlap(platforms) + ) + + posts = query.order_by(Post.scheduled_at).all() + + # Group by date + calendar = {} + for post in posts: + date_key = post.scheduled_at.strftime("%Y-%m-%d") + + if date_key not in calendar: + calendar[date_key] = [] + + calendar[date_key].append({ + "id": post.id, + "content": post.content[:100] + "..." if len(post.content) > 100 else post.content, + "platforms": post.platforms, + "status": post.status, + "scheduled_at": post.scheduled_at.isoformat(), + "content_type": post.content_type + }) + + return calendar + + finally: + db.close() + + def auto_fill_calendar( + self, + start_date: datetime, + days: int, + platforms: List[str], + posts_per_day: Dict[str, int] = None + ) -> List[ScheduleSlot]: + """ + Get suggested slots to fill the calendar. + + Args: + start_date: Start filling from + days: Number of days + platforms: Platforms to schedule for + posts_per_day: Posts per day per platform + + Returns: + List of empty slots that need content + """ + if posts_per_day is None: + posts_per_day = {"x": 4, "threads": 3, "instagram": 2, "facebook": 1} + + empty_slots = [] + + for platform in platforms: + target_count = posts_per_day.get(platform, 2) + slots = self.get_available_slots(platform, start_date, days) + + available = [s for s in slots if s.available] + + # Distribute evenly + slots_needed = target_count * days + step = max(1, len(available) // slots_needed) if available else 1 + + for i in range(0, min(len(available), slots_needed), step): + empty_slots.append(available[i]) + + return sorted(empty_slots, key=lambda s: s.datetime) + + +# Global instance +content_scheduler = ContentScheduler() diff --git a/app/worker/__init__.py b/app/worker/__init__.py new file mode 100644 index 0000000..2dd7b7d --- /dev/null +++ b/app/worker/__init__.py @@ -0,0 +1,7 @@ +""" +Celery worker module for background tasks. +""" + +from app.worker.celery_app import celery_app + +__all__ = ["celery_app"] diff --git a/app/worker/celery_app.py b/app/worker/celery_app.py new file mode 100644 index 0000000..25d61d1 --- /dev/null +++ b/app/worker/celery_app.py @@ -0,0 +1,76 @@ +""" +Celery application configuration. +""" + +from celery import Celery +from celery.schedules import crontab + +from app.core.config import settings + + +# Create Celery app +celery_app = Celery( + "social_automation", + broker=settings.REDIS_URL, + backend=settings.REDIS_URL, + include=[ + "app.worker.tasks", + ] +) + +# Celery configuration +celery_app.conf.update( + # Serialization + task_serializer="json", + accept_content=["json"], + result_serializer="json", + + # Timezone + timezone="America/Tijuana", + enable_utc=True, + + # Task settings + task_track_started=True, + task_time_limit=300, # 5 minutes max per task + task_soft_time_limit=240, # Soft limit 4 minutes + + # Result backend settings + result_expires=86400, # Results expire after 24 hours + + # Worker settings + worker_prefetch_multiplier=1, + worker_concurrency=4, + + # Beat schedule for periodic tasks + beat_schedule={ + # Check and publish scheduled posts every minute + "check-scheduled-posts": { + "task": "app.worker.tasks.check_scheduled_posts", + "schedule": 60.0, # Every minute + }, + + # Generate daily content at 6 AM + "generate-daily-content": { + "task": "app.worker.tasks.generate_daily_content", + "schedule": crontab(hour=6, minute=0), + }, + + # Sync interactions every 15 minutes + "sync-interactions": { + "task": "app.worker.tasks.sync_interactions", + "schedule": crontab(minute="*/15"), + }, + + # Send daily summary at 9 PM + "send-daily-summary": { + "task": "app.worker.tasks.send_daily_summary", + "schedule": crontab(hour=21, minute=0), + }, + + # Cleanup old data weekly + "cleanup-old-data": { + "task": "app.worker.tasks.cleanup_old_data", + "schedule": crontab(hour=3, minute=0, day_of_week=0), # Sunday 3 AM + }, + }, +) diff --git a/app/worker/tasks.py b/app/worker/tasks.py new file mode 100644 index 0000000..b701950 --- /dev/null +++ b/app/worker/tasks.py @@ -0,0 +1,621 @@ +""" +Celery tasks for social media automation. +""" + +import asyncio +from datetime import datetime, timedelta +from typing import List, Optional + +from celery import shared_task +from celery.utils.log import get_task_logger + +from app.core.database import SessionLocal +from app.core.config import settings + +logger = get_task_logger(__name__) + + +def run_async(coro): + """Helper to run async code in sync context.""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + +# ============================================================ +# PUBLISHING TASKS +# ============================================================ + +@shared_task(bind=True, max_retries=3) +def publish_post(self, post_id: int): + """ + Publish a single post to its designated platforms. + + Args: + post_id: ID of the post to publish + """ + logger.info(f"Publishing post {post_id}") + + db = SessionLocal() + + try: + from app.models.post import Post + from app.publishers.manager import publisher_manager, Platform + + post = db.query(Post).filter(Post.id == post_id).first() + + if not post: + logger.error(f"Post {post_id} not found") + return {"success": False, "error": "Post not found"} + + if post.status not in ["scheduled", "pending_approval"]: + logger.warning(f"Post {post_id} status is {post.status}, skipping") + return {"success": False, "error": f"Invalid status: {post.status}"} + + # Publish to each platform + results = {} + for platform_name in post.platforms: + try: + platform = Platform(platform_name) + result = run_async( + publisher_manager.publish( + platform=platform, + content=post.content, + image_path=post.image_url + ) + ) + + results[platform_name] = { + "success": result.success, + "post_id": result.post_id, + "url": result.url, + "error": result.error_message + } + + if result.success: + logger.info(f"Published to {platform_name}: {result.url}") + else: + logger.error(f"Failed {platform_name}: {result.error_message}") + + except Exception as e: + logger.error(f"Error publishing to {platform_name}: {e}") + results[platform_name] = {"success": False, "error": str(e)} + + # Update post status + any_success = any(r.get("success") for r in results.values()) + + if any_success: + post.status = "published" + post.published_at = datetime.utcnow() + post.publish_results = results + else: + post.status = "failed" + post.publish_results = results + + db.commit() + + # Send notification + send_publish_notification.delay(post_id, results) + + return {"success": any_success, "results": results} + + except Exception as e: + logger.error(f"Error in publish_post: {e}") + db.rollback() + + # Retry on failure + raise self.retry(exc=e, countdown=60) + + finally: + db.close() + + +@shared_task +def publish_to_platform(post_id: int, platform: str): + """Publish a post to a specific platform.""" + logger.info(f"Publishing post {post_id} to {platform}") + + db = SessionLocal() + + try: + from app.models.post import Post + from app.publishers.manager import publisher_manager, Platform + + post = db.query(Post).filter(Post.id == post_id).first() + if not post: + return {"success": False, "error": "Post not found"} + + plat = Platform(platform) + result = run_async( + publisher_manager.publish( + platform=plat, + content=post.content, + image_path=post.image_url + ) + ) + + return { + "success": result.success, + "post_id": result.post_id, + "url": result.url, + "error": result.error_message + } + + except Exception as e: + logger.error(f"Error publishing to {platform}: {e}") + return {"success": False, "error": str(e)} + + finally: + db.close() + + +@shared_task +def check_scheduled_posts(): + """ + Check for posts scheduled to be published now. + Runs every minute via Celery Beat. + """ + logger.info("Checking scheduled posts...") + + db = SessionLocal() + + try: + from app.models.post import Post + + now = datetime.utcnow() + window_start = now - timedelta(minutes=1) + + # Find posts scheduled for now + posts = db.query(Post).filter( + Post.status == "scheduled", + Post.scheduled_at <= now, + Post.scheduled_at > window_start + ).all() + + logger.info(f"Found {len(posts)} posts to publish") + + for post in posts: + # Queue publish task + publish_post.delay(post.id) + logger.info(f"Queued post {post.id} for publishing") + + return {"checked": len(posts)} + + except Exception as e: + logger.error(f"Error checking scheduled posts: {e}") + return {"error": str(e)} + + finally: + db.close() + + +# ============================================================ +# CONTENT GENERATION TASKS +# ============================================================ + +@shared_task +def generate_daily_content(): + """ + Generate content for the day. + Runs at 6 AM via Celery Beat. + """ + logger.info("Generating daily content...") + + if not settings.DEEPSEEK_API_KEY: + logger.warning("DeepSeek API not configured, skipping content generation") + return {"success": False, "error": "API not configured"} + + db = SessionLocal() + + try: + from app.models.post import Post + from app.services.content_generator import content_generator + from app.data.content_templates import get_optimal_times + + today = datetime.utcnow().date() + is_weekend = datetime.utcnow().weekday() >= 5 + + # Check if content already exists for today + existing = db.query(Post).filter( + Post.scheduled_at >= datetime.combine(today, datetime.min.time()), + Post.scheduled_at < datetime.combine(today + timedelta(days=1), datetime.min.time()) + ).count() + + if existing >= 4: + logger.info(f"Already have {existing} posts for today, skipping") + return {"success": True, "message": "Content already exists"} + + # Generate tips for each platform + platforms = ["x", "threads"] + categories = ["productividad", "seguridad", "ia", "general"] + generated = 0 + + for i, platform in enumerate(platforms): + category = categories[i % len(categories)] + + try: + content = run_async( + content_generator.generate_tip_tech( + category=category, + platform=platform + ) + ) + + # Get optimal time + times = get_optimal_times(platform, is_weekend) + time_str = times[i % len(times)] + hour, minute = map(int, time_str.split(":")) + + scheduled_at = datetime.combine( + today, + datetime.min.time() + ).replace(hour=hour, minute=minute) + + # Create post + post = Post( + content=content, + content_type="tip", + platforms=[platform], + status="scheduled", + scheduled_at=scheduled_at, + metadata={"auto_generated": True, "category": category} + ) + + db.add(post) + generated += 1 + + except Exception as e: + logger.error(f"Error generating for {platform}: {e}") + + db.commit() + logger.info(f"Generated {generated} posts for today") + + return {"success": True, "generated": generated} + + except Exception as e: + logger.error(f"Error in generate_daily_content: {e}") + db.rollback() + return {"success": False, "error": str(e)} + + finally: + db.close() + + +@shared_task +def generate_content_batch( + platforms: List[str], + days: int = 7, + categories: Optional[List[str]] = None +): + """ + Generate a batch of content for multiple days. + + Args: + platforms: List of platforms + days: Number of days to generate for + categories: Tip categories to use + """ + logger.info(f"Generating batch content for {days} days") + + if not settings.DEEPSEEK_API_KEY: + return {"success": False, "error": "API not configured"} + + try: + from app.services.batch_generator import batch_generator + + result = run_async( + batch_generator._generate_batch( + platforms=platforms, + start_date=None, + days=days, + tip_categories=categories + ) + ) + + # Save posts to database + db = SessionLocal() + try: + from app.models.post import Post + + saved = 0 + for post_data in result.posts: + post = Post( + content=post_data.content, + content_type=post_data.content_type.value, + platforms=[post_data.platform], + status="pending_approval", # Require approval for batch + scheduled_at=post_data.scheduled_at, + metadata=post_data.metadata + ) + db.add(post) + saved += 1 + + db.commit() + + return { + "success": True, + "generated": result.total_generated, + "saved": saved, + "errors": result.errors + } + + finally: + db.close() + + except Exception as e: + logger.error(f"Error in generate_content_batch: {e}") + return {"success": False, "error": str(e)} + + +# ============================================================ +# INTERACTION TASKS +# ============================================================ + +@shared_task +def sync_interactions(): + """ + Sync interactions from all platforms. + Runs every 15 minutes via Celery Beat. + """ + logger.info("Syncing interactions...") + + db = SessionLocal() + + try: + from app.models.interaction import Interaction + from app.models.post import Post + from app.publishers.manager import publisher_manager, Platform + + synced = 0 + + # Get recent published posts + recent_posts = db.query(Post).filter( + Post.status == "published", + Post.published_at >= datetime.utcnow() - timedelta(days=7) + ).all() + + for post in recent_posts: + for platform_name in post.platforms: + try: + platform = Platform(platform_name) + publisher = publisher_manager.get_publisher(platform) + + if not publisher: + continue + + # Get post ID from results + results = post.publish_results or {} + platform_result = results.get(platform_name, {}) + platform_post_id = platform_result.get("post_id") + + if not platform_post_id: + continue + + # Fetch comments + comments = run_async( + publisher.get_comments(platform_post_id) + ) + + for comment in comments: + # Check if already exists + existing = db.query(Interaction).filter( + Interaction.platform == platform_name, + Interaction.platform_interaction_id == str(comment.get("id")) + ).first() + + if not existing: + interaction = Interaction( + post_id=post.id, + platform=platform_name, + platform_interaction_id=str(comment.get("id")), + interaction_type="comment", + author_id=str(comment.get("author_id", comment.get("from", {}).get("id", ""))), + author_username=comment.get("username", comment.get("from", {}).get("name", "")), + content=comment.get("text", comment.get("message", "")), + interaction_at=datetime.fromisoformat( + comment.get("created_at", comment.get("timestamp", datetime.utcnow().isoformat())) + ) if comment.get("created_at") or comment.get("timestamp") else datetime.utcnow() + ) + db.add(interaction) + synced += 1 + + except Exception as e: + logger.error(f"Error syncing {platform_name} for post {post.id}: {e}") + + db.commit() + logger.info(f"Synced {synced} new interactions") + + return {"success": True, "synced": synced} + + except Exception as e: + logger.error(f"Error in sync_interactions: {e}") + db.rollback() + return {"success": False, "error": str(e)} + + finally: + db.close() + + +# ============================================================ +# NOTIFICATION TASKS +# ============================================================ + +@shared_task +def send_publish_notification(post_id: int, results: dict): + """Send notification about publish result.""" + if not settings.TELEGRAM_BOT_TOKEN or not settings.TELEGRAM_CHAT_ID: + return {"success": False, "error": "Telegram not configured"} + + try: + from app.services.notifications import telegram_notify + + db = SessionLocal() + try: + from app.models.post import Post + post = db.query(Post).filter(Post.id == post_id).first() + + if not post: + return {"success": False, "error": "Post not found"} + + # Build message + success_count = sum(1 for r in results.values() if r.get("success")) + total = len(results) + + if success_count == total: + emoji = "✅" + status = "Publicado" + elif success_count > 0: + emoji = "⚠️" + status = "Parcialmente publicado" + else: + emoji = "❌" + status = "Falló" + + message = f"{emoji} *{status}*\n\n" + message += f"📝 {post.content[:100]}...\n\n" + + for platform, result in results.items(): + icon = "✓" if result.get("success") else "✗" + message += f"{icon} {platform}" + if result.get("url"): + message += f": {result['url']}" + elif result.get("error"): + message += f": {result['error'][:50]}" + message += "\n" + + run_async(telegram_notify(message)) + + return {"success": True} + + finally: + db.close() + + except Exception as e: + logger.error(f"Error sending notification: {e}") + return {"success": False, "error": str(e)} + + +@shared_task +def send_daily_summary(): + """ + Send daily summary of activity. + Runs at 9 PM via Celery Beat. + """ + if not settings.TELEGRAM_BOT_TOKEN or not settings.TELEGRAM_CHAT_ID: + return {"success": False, "error": "Telegram not configured"} + + db = SessionLocal() + + try: + from app.models.post import Post + from app.models.interaction import Interaction + from app.services.notifications import telegram_notify + + today = datetime.utcnow().date() + today_start = datetime.combine(today, datetime.min.time()) + + # Stats + published = db.query(Post).filter( + Post.status == "published", + Post.published_at >= today_start + ).count() + + failed = db.query(Post).filter( + Post.status == "failed", + Post.published_at >= today_start + ).count() + + scheduled = db.query(Post).filter( + Post.status == "scheduled", + Post.scheduled_at >= datetime.utcnow() + ).count() + + pending = db.query(Post).filter( + Post.status == "pending_approval" + ).count() + + new_interactions = db.query(Interaction).filter( + Interaction.interaction_at >= today_start, + Interaction.responded == False + ).count() + + # Build message + message = "📊 *Resumen del día*\n\n" + message += f"✅ Publicados: {published}\n" + if failed > 0: + message += f"❌ Fallidos: {failed}\n" + message += f"📅 Programados: {scheduled}\n" + message += f"⏳ Pendientes aprobación: {pending}\n" + message += f"💬 Nuevas interacciones: {new_interactions}\n" + + if new_interactions > 0: + message += f"\n⚠️ Hay {new_interactions} interacciones sin responder" + + run_async(telegram_notify(message)) + + return {"success": True} + + except Exception as e: + logger.error(f"Error sending daily summary: {e}") + return {"success": False, "error": str(e)} + + finally: + db.close() + + +# ============================================================ +# MAINTENANCE TASKS +# ============================================================ + +@shared_task +def cleanup_old_data(): + """ + Clean up old data to prevent database bloat. + Runs weekly via Celery Beat. + """ + logger.info("Running cleanup...") + + db = SessionLocal() + + try: + from app.models.post import Post + from app.models.interaction import Interaction + + cutoff = datetime.utcnow() - timedelta(days=90) + + # Archive old published posts + old_posts = db.query(Post).filter( + Post.status == "published", + Post.published_at < cutoff + ).count() + + # Delete old archived interactions + deleted = db.query(Interaction).filter( + Interaction.is_archived == True, + Interaction.interaction_at < cutoff + ).delete() + + db.commit() + + # Clean up old images + from app.services.image_upload import image_upload + images_deleted = image_upload.cleanup_old(days=30) + + logger.info(f"Cleanup: {deleted} interactions, {images_deleted} images") + + return { + "success": True, + "interactions_deleted": deleted, + "images_deleted": images_deleted + } + + except Exception as e: + logger.error(f"Error in cleanup: {e}") + db.rollback() + return {"success": False, "error": str(e)} + + finally: + db.close()