""" A/B Testing Service - Create and manage content experiments. """ from datetime import datetime, timedelta from typing import List, Dict, Optional import logging from scipy import stats from app.core.database import SessionLocal from app.models.ab_test import ABTest, ABTestVariant from app.models.post import Post logger = logging.getLogger(__name__) class ABTestingService: """Service for A/B testing content variants.""" def _get_db(self): """Get database session.""" return SessionLocal() async def create_test( self, name: str, platform: str, variants: List[Dict], test_type: str = "content", duration_hours: int = 24, min_sample_size: int = 100, success_metric: str = "engagement_rate", description: str = None ) -> ABTest: """ Create a new A/B test with variants. Args: name: Test name platform: Target platform variants: List of variant data [{"name": "A", "content": "...", "hashtags": [...]}] test_type: Type of test (content, timing, hashtags, image) duration_hours: How long to run the test min_sample_size: Minimum impressions per variant success_metric: Metric to optimize for description: Optional description Returns: Created ABTest object """ db = self._get_db() try: # Create test test = ABTest( name=name, description=description, test_type=test_type, platform=platform, status="draft", duration_hours=duration_hours, min_sample_size=min_sample_size, success_metric=success_metric ) db.add(test) db.flush() # Get the ID # Create variants for variant_data in variants: variant = ABTestVariant( test_id=test.id, name=variant_data.get("name", "A"), content=variant_data.get("content", ""), hashtags=variant_data.get("hashtags"), image_url=variant_data.get("image_url") ) db.add(variant) db.commit() db.refresh(test) return test except Exception as e: logger.error(f"Error creating A/B test: {e}") db.rollback() raise finally: db.close() async def start_test(self, test_id: int) -> Dict: """ Start an A/B test by creating and scheduling posts for each variant. Args: test_id: ID of the test to start Returns: Dict with status and created post IDs """ db = self._get_db() try: test = db.query(ABTest).filter(ABTest.id == test_id).first() if not test: return {"success": False, "error": "Test not found"} if test.status != "draft": return {"success": False, "error": f"Test is already {test.status}"} variants = db.query(ABTestVariant).filter( ABTestVariant.test_id == test_id ).all() if len(variants) < 2: return {"success": False, "error": "Test needs at least 2 variants"} # Create posts for each variant post_ids = [] now = datetime.utcnow() for i, variant in enumerate(variants): # Schedule variants slightly apart to ensure fair distribution scheduled_at = now + timedelta(minutes=i * 5) post = Post( content=variant.content, content_type="ab_test", platforms=[test.platform], status="scheduled", scheduled_at=scheduled_at, hashtags=variant.hashtags, image_url=variant.image_url, ab_test_id=test.id ) db.add(post) db.flush() variant.post_id = post.id variant.published_at = scheduled_at post_ids.append(post.id) # Update test status test.status = "running" test.started_at = now db.commit() return { "success": True, "test_id": test.id, "post_ids": post_ids, "variants_count": len(variants) } except Exception as e: logger.error(f"Error starting A/B test: {e}") db.rollback() return {"success": False, "error": str(e)} finally: db.close() async def update_variant_metrics(self, test_id: int) -> Dict: """ Update metrics for all variants in a test from their posts. Args: test_id: ID of the test Returns: Dict with updated metrics """ db = self._get_db() try: test = db.query(ABTest).filter(ABTest.id == test_id).first() if not test: return {"success": False, "error": "Test not found"} variants = db.query(ABTestVariant).filter( ABTestVariant.test_id == test_id ).all() for variant in variants: if variant.post_id: post = db.query(Post).filter(Post.id == variant.post_id).first() if post and post.metrics: variant.likes = post.metrics.get("likes", 0) variant.comments = post.metrics.get("comments", 0) variant.shares = post.metrics.get("shares", 0) + post.metrics.get("retweets", 0) variant.impressions = post.metrics.get("impressions", 0) variant.reach = post.metrics.get("reach", 0) variant.calculate_engagement_rate() db.commit() return { "success": True, "variants_updated": len(variants) } except Exception as e: logger.error(f"Error updating variant metrics: {e}") db.rollback() return {"success": False, "error": str(e)} finally: db.close() async def evaluate_test(self, test_id: int) -> Dict: """ Evaluate A/B test results and determine winner. Args: test_id: ID of the test to evaluate Returns: Dict with evaluation results """ db = self._get_db() try: test = db.query(ABTest).filter(ABTest.id == test_id).first() if not test: return {"success": False, "error": "Test not found"} # First update metrics await self.update_variant_metrics(test_id) variants = db.query(ABTestVariant).filter( ABTestVariant.test_id == test_id ).all() if len(variants) < 2: return {"success": False, "error": "Not enough variants"} # Check minimum sample size min_impressions = min(v.impressions for v in variants) if min_impressions < test.min_sample_size: return { "success": True, "status": "insufficient_data", "min_impressions": min_impressions, "required": test.min_sample_size } # Determine winner based on success metric if test.success_metric == "engagement_rate": sorted_variants = sorted(variants, key=lambda v: v.engagement_rate, reverse=True) elif test.success_metric == "likes": sorted_variants = sorted(variants, key=lambda v: v.likes, reverse=True) elif test.success_metric == "comments": sorted_variants = sorted(variants, key=lambda v: v.comments, reverse=True) else: sorted_variants = sorted(variants, key=lambda v: v.engagement_rate, reverse=True) winner = sorted_variants[0] runner_up = sorted_variants[1] # Statistical significance test (chi-square for engagement) try: winner_engagements = winner.likes + winner.comments + winner.shares runner_up_engagements = runner_up.likes + runner_up.comments + runner_up.shares contingency = [ [winner_engagements, winner.impressions - winner_engagements], [runner_up_engagements, runner_up.impressions - runner_up_engagements] ] chi2, p_value, dof, expected = stats.chi2_contingency(contingency) confidence = (1 - p_value) * 100 except Exception: # If statistical test fails, just use raw comparison confidence = None p_value = None # Update test with winner winner.is_winner = True test.winning_variant_id = winner.id test.confidence_level = confidence # Check if test should be concluded if test.started_at: elapsed = datetime.utcnow() - test.started_at if elapsed.total_seconds() >= test.duration_hours * 3600: test.status = "completed" test.ended_at = datetime.utcnow() db.commit() return { "success": True, "winner": { "variant_id": winner.id, "name": winner.name, "engagement_rate": winner.engagement_rate, "impressions": winner.impressions }, "runner_up": { "variant_id": runner_up.id, "name": runner_up.name, "engagement_rate": runner_up.engagement_rate, "impressions": runner_up.impressions }, "confidence_level": confidence, "p_value": p_value, "test_status": test.status } except Exception as e: logger.error(f"Error evaluating A/B test: {e}") db.rollback() return {"success": False, "error": str(e)} finally: db.close() async def get_test(self, test_id: int) -> Optional[Dict]: """Get a test with its variants.""" db = self._get_db() try: test = db.query(ABTest).filter(ABTest.id == test_id).first() if test: return test.to_dict() return None finally: db.close() async def get_tests( self, status: str = None, platform: str = None, limit: int = 20 ) -> List[Dict]: """Get tests with optional filters.""" db = self._get_db() try: query = db.query(ABTest) if status: query = query.filter(ABTest.status == status) if platform: query = query.filter(ABTest.platform == platform) tests = query.order_by(ABTest.created_at.desc()).limit(limit).all() return [t.to_dict() for t in tests] finally: db.close() async def cancel_test(self, test_id: int) -> Dict: """Cancel a running test.""" db = self._get_db() try: test = db.query(ABTest).filter(ABTest.id == test_id).first() if not test: return {"success": False, "error": "Test not found"} test.status = "cancelled" test.ended_at = datetime.utcnow() # Cancel any scheduled posts variants = db.query(ABTestVariant).filter( ABTestVariant.test_id == test_id ).all() for variant in variants: if variant.post_id: post = db.query(Post).filter(Post.id == variant.post_id).first() if post and post.status == "scheduled": post.status = "cancelled" db.commit() return {"success": True, "message": "Test cancelled"} except Exception as e: logger.error(f"Error cancelling A/B test: {e}") db.rollback() return {"success": False, "error": str(e)} finally: db.close() # Global instance ab_testing_service = ABTestingService()