""" Notification service for alerts and summaries. """ import httpx from typing import Optional from app.core.config import settings async def telegram_notify( message: str, parse_mode: str = "Markdown", disable_notification: bool = False ) -> bool: """ Send a notification via Telegram. Args: message: Message text (supports Markdown) parse_mode: "Markdown" or "HTML" disable_notification: If True, send silently Returns: True if sent successfully """ if not settings.TELEGRAM_BOT_TOKEN or not settings.TELEGRAM_CHAT_ID: return False url = f"https://api.telegram.org/bot{settings.TELEGRAM_BOT_TOKEN}/sendMessage" payload = { "chat_id": settings.TELEGRAM_CHAT_ID, "text": message, "parse_mode": parse_mode, "disable_notification": disable_notification } try: async with httpx.AsyncClient() as client: response = await client.post(url, json=payload) return response.status_code == 200 except Exception: return False async def telegram_notify_error( error: str, context: Optional[str] = None ) -> bool: """Send error notification.""" message = f"🚨 *Error*\n\n{error}" if context: message += f"\n\nπŸ“ Contexto: {context}" return await telegram_notify(message) async def telegram_notify_success( title: str, details: Optional[str] = None ) -> bool: """Send success notification.""" message = f"βœ… *{title}*" if details: message += f"\n\n{details}" return await telegram_notify(message) class NotificationService: """Service for managing notifications.""" def __init__(self): self.telegram_enabled = bool( settings.TELEGRAM_BOT_TOKEN and settings.TELEGRAM_CHAT_ID ) async def notify_publish( self, platform: str, content: str, url: Optional[str] = None, success: bool = True ): """Notify about a publication.""" if not self.telegram_enabled: return emoji = "βœ…" if success else "❌" status = "Publicado" if success else "Error al publicar" message = f"{emoji} *{status} en {platform}*\n\n" message += f"πŸ“ {content[:150]}..." if url: message += f"\n\nπŸ”— {url}" await telegram_notify(message) async def notify_interaction( self, platform: str, author: str, content: str, interaction_type: str = "comentario" ): """Notify about a new interaction.""" if not self.telegram_enabled: return message = f"πŸ’¬ *Nueva interacciΓ³n en {platform}*\n\n" message += f"πŸ‘€ @{author}\n" message += f"πŸ“ {content[:200]}" await telegram_notify(message) async def notify_daily_summary( self, published: int, failed: int, scheduled: int, interactions: int ): """Send daily summary.""" if not self.telegram_enabled: return message = "πŸ“Š *Resumen del dΓ­a*\n\n" message += f"βœ… Publicados: {published}\n" if failed > 0: message += f"❌ Fallidos: {failed}\n" message += f"πŸ“… Programados: {scheduled}\n" message += f"πŸ’¬ Interacciones: {interactions}" await telegram_notify(message) async def notify_error(self, error: str, context: str = None): """Send error notification.""" if not self.telegram_enabled: return await telegram_notify_error(error, context) # Global instance notification_service = NotificationService()