diff --git a/worker/celery_app.py b/worker/celery_app.py index 7261fea..002970a 100644 --- a/worker/celery_app.py +++ b/worker/celery_app.py @@ -17,7 +17,8 @@ celery_app = Celery( "worker.tasks.publish_post", "worker.tasks.fetch_interactions", "worker.tasks.cleanup", - "worker.tasks.content_memory" + "worker.tasks.content_memory", + "worker.tasks.daily_reports" ] ) @@ -77,4 +78,16 @@ celery_app.conf.beat_schedule = { "task": "worker.tasks.content_memory.refresh_best_posts_yaml", "schedule": crontab(day_of_week=0, hour=5, minute=0), # Domingos 5 AM }, + + # Reporte matutino por Telegram (8:00 AM Tijuana) + "telegram-morning-report": { + "task": "worker.tasks.daily_reports.morning_report", + "schedule": crontab(hour=8, minute=0), + }, + + # Reporte vespertino por Telegram (6:00 PM Tijuana) + "telegram-afternoon-report": { + "task": "worker.tasks.daily_reports.afternoon_report", + "schedule": crontab(hour=18, minute=0), + }, } diff --git a/worker/tasks/daily_reports.py b/worker/tasks/daily_reports.py new file mode 100644 index 0000000..8fbc282 --- /dev/null +++ b/worker/tasks/daily_reports.py @@ -0,0 +1,195 @@ +""" +Tareas de reportes diarios por Telegram. +""" + +import asyncio +from datetime import datetime, timedelta + +from worker.celery_app import celery_app +from app.core.database import SessionLocal +from app.models.post import Post +from app.models.interaction import Interaction +from app.services.notifications import telegram_notify + + +def run_async(coro): + """Helper para ejecutar coroutines en Celery.""" + loop = asyncio.get_event_loop() + return loop.run_until_complete(coro) + + +@celery_app.task(name="worker.tasks.daily_reports.morning_report") +def morning_report(): + """ + Reporte matutino: posts programados para hoy + nuevas interacciones. + Se ejecuta a las 8:00 AM. + """ + db = SessionLocal() + + try: + now = datetime.utcnow() + today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) + today_end = today_start + timedelta(days=1) + + # Posts programados para hoy + scheduled_posts = db.query(Post).filter( + Post.status == "scheduled", + Post.scheduled_at >= today_start, + Post.scheduled_at < today_end + ).order_by(Post.scheduled_at).all() + + # Interacciones de las ΓΊltimas 24 horas sin responder + yesterday = now - timedelta(hours=24) + new_interactions = db.query(Interaction).filter( + Interaction.interaction_at >= yesterday, + Interaction.responded == False, + Interaction.is_archived == False + ).order_by(Interaction.interaction_at.desc()).all() + + # Construir mensaje + message = "β˜€οΈ *Buenos dΓ­as! Reporte matutino*\n" + message += f"πŸ“… {now.strftime('%d/%m/%Y')}\n\n" + + # SecciΓ³n de posts programados + message += "━━━━━━━━━━━━━━━━━━━━━\n" + message += "πŸ“ *Posts programados para hoy:*\n\n" + + if scheduled_posts: + for post in scheduled_posts: + time_str = post.scheduled_at.strftime("%H:%M") + platforms = ", ".join(post.platforms) if post.platforms else "N/A" + content_preview = post.content[:60] + "..." if len(post.content) > 60 else post.content + # Escapar caracteres especiales de Markdown + content_preview = content_preview.replace("_", "\\_").replace("*", "\\*") + + message += f"⏰ *{time_str}* ({platforms})\n" + message += f" {content_preview}\n\n" + else: + message += " _No hay posts programados para hoy_\n\n" + + # SecciΓ³n de interacciones + message += "━━━━━━━━━━━━━━━━━━━━━\n" + message += "πŸ’¬ *Nuevas interacciones (24h):*\n\n" + + if new_interactions: + # Agrupar por plataforma + by_platform = {} + for interaction in new_interactions: + platform = interaction.platform + if platform not in by_platform: + by_platform[platform] = [] + by_platform[platform].append(interaction) + + for platform, interactions in by_platform.items(): + message += f"πŸ“± *{platform.upper()}* ({len(interactions)})\n" + for i in interactions[:3]: # Mostrar mΓ‘ximo 3 por plataforma + content_preview = i.content[:50] + "..." if i.content and len(i.content) > 50 else (i.content or "N/A") + content_preview = content_preview.replace("_", "\\_").replace("*", "\\*") + message += f" β€’ @{i.author_username}: {content_preview}\n" + if len(interactions) > 3: + message += f" _...y {len(interactions) - 3} mΓ‘s_\n" + message += "\n" + else: + message += " _No hay nuevas interacciones_\n\n" + + # Resumen + message += "━━━━━━━━━━━━━━━━━━━━━\n" + message += f"πŸ“Š *Resumen:* {len(scheduled_posts)} posts | {len(new_interactions)} interacciones" + + # Enviar mensaje + success = run_async(telegram_notify(message)) + + return f"Reporte matutino enviado: {len(scheduled_posts)} posts, {len(new_interactions)} interacciones" + + except Exception as e: + return f"Error en reporte matutino: {e}" + + finally: + db.close() + + +@celery_app.task(name="worker.tasks.daily_reports.afternoon_report") +def afternoon_report(): + """ + Reporte vespertino: solo actualizaciΓ³n de interacciones. + Se ejecuta a las 6:00 PM. + """ + db = SessionLocal() + + try: + now = datetime.utcnow() + + # Interacciones desde la maΓ±ana (ΓΊltimas 10 horas aprox) + since_morning = now - timedelta(hours=10) + + new_interactions = db.query(Interaction).filter( + Interaction.interaction_at >= since_morning, + Interaction.responded == False, + Interaction.is_archived == False + ).order_by(Interaction.interaction_at.desc()).all() + + # Posts publicados hoy + today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) + published_today = db.query(Post).filter( + Post.status == "published", + Post.published_at >= today_start + ).count() + + # Posts pendientes para maΓ±ana + tomorrow_start = today_start + timedelta(days=1) + tomorrow_end = tomorrow_start + timedelta(days=1) + scheduled_tomorrow = db.query(Post).filter( + Post.status == "scheduled", + Post.scheduled_at >= tomorrow_start, + Post.scheduled_at < tomorrow_end + ).count() + + # Construir mensaje + message = "πŸŒ† *Reporte vespertino*\n" + message += f"πŸ“… {now.strftime('%d/%m/%Y %H:%M')}\n\n" + + # Resumen del dΓ­a + message += "━━━━━━━━━━━━━━━━━━━━━\n" + message += "πŸ“Š *Resumen del dΓ­a:*\n\n" + message += f" βœ… Publicados hoy: {published_today}\n" + message += f" πŸ“… Programados maΓ±ana: {scheduled_tomorrow}\n\n" + + # Interacciones pendientes + message += "━━━━━━━━━━━━━━━━━━━━━\n" + message += "πŸ’¬ *Interacciones pendientes:*\n\n" + + if new_interactions: + by_platform = {} + for interaction in new_interactions: + platform = interaction.platform + if platform not in by_platform: + by_platform[platform] = [] + by_platform[platform].append(interaction) + + for platform, interactions in by_platform.items(): + message += f"πŸ“± *{platform.upper()}* ({len(interactions)})\n" + for i in interactions[:5]: # Mostrar mΓ‘s en el reporte de la tarde + content_preview = i.content[:50] + "..." if i.content and len(i.content) > 50 else (i.content or "N/A") + content_preview = content_preview.replace("_", "\\_").replace("*", "\\*") + message += f" β€’ @{i.author_username}: {content_preview}\n" + if len(interactions) > 5: + message += f" _...y {len(interactions) - 5} mΓ‘s_\n" + message += "\n" + + message += "━━━━━━━━━━━━━━━━━━━━━\n" + message += f"⚠️ *{len(new_interactions)} interacciones sin responder*" + else: + message += " βœ… _Todas las interacciones han sido atendidas_\n\n" + message += "━━━━━━━━━━━━━━━━━━━━━\n" + message += "πŸŽ‰ *Excelente trabajo hoy!*" + + # Enviar mensaje + success = run_async(telegram_notify(message)) + + return f"Reporte vespertino enviado: {published_today} publicados, {len(new_interactions)} interacciones" + + except Exception as e: + return f"Error en reporte vespertino: {e}" + + finally: + db.close()