""" Comparison analysis for Sales Bot Compares sales data across different time periods """ import logging from datetime import datetime, timedelta, timezone from typing import Dict, Optional, List logger = logging.getLogger(__name__) # Mexico timezone TZ_MEXICO = timezone(timedelta(hours=-6)) class ComparisonAnalyzer: """ Analyzes and compares sales data across different periods. """ def __init__(self, nocodb_client): self.nocodb = nocodb_client def comparar_semanas(self) -> Dict: """ Compara la semana actual con la semana anterior. Returns: dict con comparación semanal """ try: hoy = datetime.now(TZ_MEXICO).date() # Semana actual (lunes a hoy) dias_desde_lunes = hoy.weekday() inicio_semana_actual = hoy - timedelta(days=dias_desde_lunes) # Semana anterior inicio_semana_anterior = inicio_semana_actual - timedelta(days=7) fin_semana_anterior = inicio_semana_actual - timedelta(days=1) # Obtener ventas ventas_actual = self._obtener_ventas_rango(inicio_semana_actual, hoy) ventas_anterior = self._obtener_ventas_rango(inicio_semana_anterior, fin_semana_anterior) monto_actual = sum(float(v.get('monto', 0) or 0) for v in ventas_actual) monto_anterior = sum(float(v.get('monto', 0) or 0) for v in ventas_anterior) # Calcular diferencia if monto_anterior > 0: diff_pct = ((monto_actual - monto_anterior) / monto_anterior) * 100 else: diff_pct = 100 if monto_actual > 0 else 0 return { 'current_week': round(monto_actual, 2), 'previous_week': round(monto_anterior, 2), 'difference': round(monto_actual - monto_anterior, 2), 'diff_percent': round(diff_pct, 1), 'ventas_actual': len(ventas_actual), 'ventas_anterior': len(ventas_anterior), 'periodo_actual': { 'inicio': inicio_semana_actual.strftime('%Y-%m-%d'), 'fin': hoy.strftime('%Y-%m-%d') }, 'periodo_anterior': { 'inicio': inicio_semana_anterior.strftime('%Y-%m-%d'), 'fin': fin_semana_anterior.strftime('%Y-%m-%d') } } except Exception as e: logger.error(f"Error comparando semanas: {str(e)}") return { 'current_week': 0, 'previous_week': 0, 'difference': 0, 'diff_percent': 0, 'error': str(e) } def comparar_meses(self, mes1: str = None, mes2: str = None) -> Dict: """ Compara dos meses. Args: mes1: Primer mes (formato YYYY-MM), default mes actual mes2: Segundo mes (formato YYYY-MM), default mes anterior Returns: dict con comparación mensual """ try: hoy = datetime.now(TZ_MEXICO).date() # Determinar meses a comparar if not mes1: mes1 = hoy.strftime('%Y-%m') if not mes2: # Mes anterior primer_dia_mes_actual = hoy.replace(day=1) ultimo_dia_mes_anterior = primer_dia_mes_actual - timedelta(days=1) mes2 = ultimo_dia_mes_anterior.strftime('%Y-%m') # Obtener ventas de cada mes ventas_mes1 = self._obtener_ventas_mes(mes1) ventas_mes2 = self._obtener_ventas_mes(mes2) monto_mes1 = sum(float(v.get('monto', 0) or 0) for v in ventas_mes1) monto_mes2 = sum(float(v.get('monto', 0) or 0) for v in ventas_mes2) # Calcular diferencia if monto_mes2 > 0: diff_pct = ((monto_mes1 - monto_mes2) / monto_mes2) * 100 else: diff_pct = 100 if monto_mes1 > 0 else 0 # Calcular mes anterior al mes2 para tendencia de 3 meses anio2, mes2_num = map(int, mes2.split('-')) if mes2_num == 1: mes3 = f"{anio2 - 1}-12" else: mes3 = f"{anio2}-{mes2_num - 1:02d}" ventas_mes3 = self._obtener_ventas_mes(mes3) monto_mes3 = sum(float(v.get('monto', 0) or 0) for v in ventas_mes3) return { 'current_month': round(monto_mes1, 2), 'previous_month': round(monto_mes2, 2), 'month_2': round(monto_mes3, 2), 'difference': round(monto_mes1 - monto_mes2, 2), 'diff_percent': round(diff_pct, 1), 'ventas_actual': len(ventas_mes1), 'ventas_anterior': len(ventas_mes2), 'months': [mes3, mes2, mes1], 'values': [round(monto_mes3, 2), round(monto_mes2, 2), round(monto_mes1, 2)], 'mes1': mes1, 'mes2': mes2 } except Exception as e: logger.error(f"Error comparando meses: {str(e)}") return { 'current_month': 0, 'previous_month': 0, 'difference': 0, 'diff_percent': 0, 'error': str(e) } def comparar_anios(self, anio1: int = None, anio2: int = None) -> Dict: """ Compara ventas anuales (hasta la fecha actual del año). Args: anio1: Primer año, default año actual anio2: Segundo año, default año anterior Returns: dict con comparación anual """ try: hoy = datetime.now(TZ_MEXICO).date() if not anio1: anio1 = hoy.year if not anio2: anio2 = anio1 - 1 # Obtener ventas YTD de cada año dia_mes = hoy.strftime('%m-%d') # Para comparación justa, comparamos hasta el mismo día del año ventas_anio1 = self._obtener_ventas_ytd(anio1, dia_mes) ventas_anio2 = self._obtener_ventas_ytd(anio2, dia_mes) monto_anio1 = sum(float(v.get('monto', 0) or 0) for v in ventas_anio1) monto_anio2 = sum(float(v.get('monto', 0) or 0) for v in ventas_anio2) # Calcular diferencia if monto_anio2 > 0: diff_pct = ((monto_anio1 - monto_anio2) / monto_anio2) * 100 else: diff_pct = 100 if monto_anio1 > 0 else 0 return { 'anio_actual': anio1, 'anio_anterior': anio2, 'monto_actual': round(monto_anio1, 2), 'monto_anterior': round(monto_anio2, 2), 'difference': round(monto_anio1 - monto_anio2, 2), 'diff_percent': round(diff_pct, 1), 'ventas_actual': len(ventas_anio1), 'ventas_anterior': len(ventas_anio2), 'comparacion_hasta': hoy.strftime('%d de %B') } except Exception as e: logger.error(f"Error comparando años: {str(e)}") return { 'monto_actual': 0, 'monto_anterior': 0, 'difference': 0, 'diff_percent': 0, 'error': str(e) } def _obtener_ventas_rango(self, inicio: datetime.date, fin: datetime.date) -> List[Dict]: """Obtiene ventas en un rango de fechas.""" try: # Obtener todas las ventas recientes ventas = self.nocodb.get_ventas_mes() if hasattr(self.nocodb, 'get_ventas_mes') else [] ventas_filtradas = [] for venta in ventas: try: fecha_str = venta.get('fecha_venta', '') if 'T' in fecha_str: fecha = datetime.fromisoformat(fecha_str.replace('Z', '+00:00')) else: fecha = datetime.strptime(fecha_str[:10], '%Y-%m-%d') if fecha.tzinfo is None: fecha = fecha.replace(tzinfo=timezone.utc) fecha_local = fecha.astimezone(TZ_MEXICO).date() if inicio <= fecha_local <= fin: ventas_filtradas.append(venta) except: continue return ventas_filtradas except Exception as e: logger.error(f"Error obteniendo ventas del rango: {str(e)}") return [] def _obtener_ventas_mes(self, mes: str) -> List[Dict]: """Obtiene ventas de un mes específico.""" try: # Usar método existente si está disponible if hasattr(self.nocodb, 'get_ventas_mes'): ventas = self.nocodb.get_ventas_mes() else: return [] # Filtrar por mes ventas_filtradas = [] for venta in ventas: fecha_str = venta.get('fecha_venta', '') if fecha_str and fecha_str[:7] == mes: ventas_filtradas.append(venta) return ventas_filtradas except Exception as e: logger.error(f"Error obteniendo ventas del mes {mes}: {str(e)}") return [] def _obtener_ventas_ytd(self, anio: int, hasta_dia_mes: str) -> List[Dict]: """Obtiene ventas del año hasta una fecha específica.""" try: inicio = datetime(anio, 1, 1).date() mes, dia = map(int, hasta_dia_mes.split('-')) fin = datetime(anio, mes, dia).date() return self._obtener_ventas_rango(inicio, fin) except Exception as e: logger.error(f"Error obteniendo ventas YTD {anio}: {str(e)}") return [] def get_comparison_summary(self, tipo: str = 'weekly') -> Dict: """ Obtiene resumen de comparación según el tipo solicitado. Args: tipo: 'weekly', 'monthly', o 'yearly' Returns: dict con comparación """ if tipo == 'weekly': return self.comparar_semanas() elif tipo == 'monthly': return self.comparar_meses() elif tipo == 'yearly': return self.comparar_anios() else: return {'error': f'Tipo de comparación no válido: {tipo}'}