From ecc2ca73eaab07bc78d31f1dd587fb30668eb20f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Consultor=C3=ADa=20AS?= Date: Wed, 28 Jan 2026 03:10:42 +0000 Subject: [PATCH] feat: Add Analytics, Odoo Integration, A/B Testing, and Content features MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1 - Analytics y Reportes: - PostMetrics and AnalyticsReport models for tracking engagement - Analytics service with dashboard stats, top posts, optimal times - 8 API endpoints at /api/analytics/* - Interactive dashboard with Chart.js charts - Celery tasks for metrics fetch (15min) and weekly reports Phase 2 - Integración Odoo: - Lead and OdooSyncLog models for CRM integration - Odoo fields added to Product and Service models - XML-RPC service for bidirectional sync - Lead management API at /api/leads/* - Leads dashboard template - Celery tasks for product/service sync and lead export Phase 3 - A/B Testing y Recycling: - ABTest, ABTestVariant, RecycledPost models - Statistical winner analysis using chi-square test - Content recycling with engagement-based scoring - APIs at /api/ab-tests/* and /api/recycling/* - Automated test evaluation and content recycling tasks Phase 4 - Thread Series y Templates: - ThreadSeries and ThreadPost models for multi-post threads - AI-powered thread generation - Enhanced ImageTemplate with HTML template support - APIs at /api/threads/* and /api/templates/* - Thread scheduling with reply chain support Co-Authored-By: Claude Opus 4.5 --- app/api/routes/ab_testing.py | 216 +++++++++++ app/api/routes/analytics.py | 257 +++++++++++++ app/api/routes/dashboard.py | 26 ++ app/api/routes/image_templates.py | 271 ++++++++++++++ app/api/routes/leads.py | 317 ++++++++++++++++ app/api/routes/odoo.py | 123 +++++++ app/api/routes/recycling.py | 181 ++++++++++ app/api/routes/threads.py | 255 +++++++++++++ app/core/config.py | 12 + app/main.py | 9 +- app/models/__init__.py | 18 +- app/models/ab_test.py | 165 +++++++++ app/models/analytics_report.py | 116 ++++++ app/models/image_template.py | 15 +- app/models/lead.py | 116 ++++++ app/models/odoo_sync_log.py | 79 ++++ app/models/post.py | 18 +- app/models/post_metrics.py | 79 ++++ app/models/product.py | 8 +- app/models/recycled_post.py | 74 ++++ app/models/service.py | 8 +- app/models/thread_series.py | 160 +++++++++ app/services/ab_testing_service.py | 396 ++++++++++++++++++++ app/services/analytics_service.py | 423 ++++++++++++++++++++++ app/services/odoo_service.py | 493 +++++++++++++++++++++++++ app/services/recycling_service.py | 327 +++++++++++++++++ app/services/thread_service.py | 393 ++++++++++++++++++++ app/worker/celery_app.py | 54 +++ app/worker/tasks.py | 461 ++++++++++++++++++++++++ dashboard/templates/analytics.html | 446 +++++++++++++++++++++++ dashboard/templates/leads.html | 557 +++++++++++++++++++++++++++++ 31 files changed, 6067 insertions(+), 6 deletions(-) create mode 100644 app/api/routes/ab_testing.py create mode 100644 app/api/routes/analytics.py create mode 100644 app/api/routes/image_templates.py create mode 100644 app/api/routes/leads.py create mode 100644 app/api/routes/odoo.py create mode 100644 app/api/routes/recycling.py create mode 100644 app/api/routes/threads.py create mode 100644 app/models/ab_test.py create mode 100644 app/models/analytics_report.py create mode 100644 app/models/lead.py create mode 100644 app/models/odoo_sync_log.py create mode 100644 app/models/post_metrics.py create mode 100644 app/models/recycled_post.py create mode 100644 app/models/thread_series.py create mode 100644 app/services/ab_testing_service.py create mode 100644 app/services/analytics_service.py create mode 100644 app/services/odoo_service.py create mode 100644 app/services/recycling_service.py create mode 100644 app/services/thread_service.py create mode 100644 dashboard/templates/analytics.html create mode 100644 dashboard/templates/leads.html diff --git a/app/api/routes/ab_testing.py b/app/api/routes/ab_testing.py new file mode 100644 index 0000000..036e50d --- /dev/null +++ b/app/api/routes/ab_testing.py @@ -0,0 +1,216 @@ +""" +API Routes for A/B Testing. +""" + +from typing import List, Optional +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.orm import Session +from pydantic import BaseModel + +from app.core.database import get_db +from app.services.ab_testing_service import ab_testing_service + + +router = APIRouter() + + +class VariantCreate(BaseModel): + """Schema for creating a test variant.""" + name: str # A, B, C, etc. + content: str + hashtags: Optional[List[str]] = None + image_url: Optional[str] = None + + +class ABTestCreate(BaseModel): + """Schema for creating an A/B test.""" + name: str + platform: str + variants: List[VariantCreate] + test_type: str = "content" + duration_hours: int = 24 + min_sample_size: int = 100 + success_metric: str = "engagement_rate" + description: Optional[str] = None + + +@router.get("/") +async def list_tests( + status: Optional[str] = None, + platform: Optional[str] = None, + limit: int = Query(20, ge=1, le=100), + db: Session = Depends(get_db) +): + """ + List all A/B tests. + + - **status**: Filter by status (draft, running, completed, cancelled) + - **platform**: Filter by platform + """ + tests = await ab_testing_service.get_tests( + status=status, + platform=platform, + limit=limit + ) + return {"tests": tests, "count": len(tests)} + + +@router.get("/{test_id}") +async def get_test( + test_id: int, + db: Session = Depends(get_db) +): + """ + Get a specific A/B test with its variants. + """ + test = await ab_testing_service.get_test(test_id) + if not test: + raise HTTPException(status_code=404, detail="Test not found") + return test + + +@router.post("/") +async def create_test( + test_data: ABTestCreate, + db: Session = Depends(get_db) +): + """ + Create a new A/B test. + + Requires at least 2 variants. Variants should have: + - **name**: Identifier (A, B, C, etc.) + - **content**: The content to test + - **hashtags**: Optional hashtag list + - **image_url**: Optional image URL + """ + if len(test_data.variants) < 2: + raise HTTPException( + status_code=400, + detail="A/B test requires at least 2 variants" + ) + + if len(test_data.variants) > 4: + raise HTTPException( + status_code=400, + detail="Maximum 4 variants allowed per test" + ) + + try: + test = await ab_testing_service.create_test( + name=test_data.name, + platform=test_data.platform, + variants=[v.dict() for v in test_data.variants], + test_type=test_data.test_type, + duration_hours=test_data.duration_hours, + min_sample_size=test_data.min_sample_size, + success_metric=test_data.success_metric, + description=test_data.description + ) + + return { + "message": "A/B test created successfully", + "test": test.to_dict() + } + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/{test_id}/start") +async def start_test( + test_id: int, + db: Session = Depends(get_db) +): + """ + Start an A/B test. + + This will create and schedule posts for each variant. + """ + result = await ab_testing_service.start_test(test_id) + + if not result.get("success"): + raise HTTPException( + status_code=400, + detail=result.get("error", "Failed to start test") + ) + + return { + "message": "A/B test started successfully", + **result + } + + +@router.post("/{test_id}/evaluate") +async def evaluate_test( + test_id: int, + db: Session = Depends(get_db) +): + """ + Evaluate A/B test results and determine winner. + + Updates metrics from posts and calculates statistical significance. + """ + result = await ab_testing_service.evaluate_test(test_id) + + if not result.get("success"): + raise HTTPException( + status_code=400, + detail=result.get("error", "Failed to evaluate test") + ) + + return result + + +@router.get("/{test_id}/results") +async def get_test_results( + test_id: int, + db: Session = Depends(get_db) +): + """ + Get current results for an A/B test. + """ + # First update metrics + await ab_testing_service.update_variant_metrics(test_id) + + test = await ab_testing_service.get_test(test_id) + if not test: + raise HTTPException(status_code=404, detail="Test not found") + + # Calculate current standings + variants = test.get("variants", []) + if variants: + sorted_variants = sorted( + variants, + key=lambda v: v.get("engagement_rate", 0), + reverse=True + ) + else: + sorted_variants = [] + + return { + "test_id": test_id, + "status": test.get("status"), + "started_at": test.get("started_at"), + "winning_variant_id": test.get("winning_variant_id"), + "confidence_level": test.get("confidence_level"), + "variants": sorted_variants + } + + +@router.post("/{test_id}/cancel") +async def cancel_test( + test_id: int, + db: Session = Depends(get_db) +): + """ + Cancel a running A/B test. + """ + result = await ab_testing_service.cancel_test(test_id) + + if not result.get("success"): + raise HTTPException( + status_code=400, + detail=result.get("error", "Failed to cancel test") + ) + + return {"message": "Test cancelled successfully"} diff --git a/app/api/routes/analytics.py b/app/api/routes/analytics.py new file mode 100644 index 0000000..d88e7d7 --- /dev/null +++ b/app/api/routes/analytics.py @@ -0,0 +1,257 @@ +""" +API Routes for Analytics. +""" + +from datetime import date, datetime +from typing import Optional +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.orm import Session +from pydantic import BaseModel + +from app.core.database import get_db +from app.services.analytics_service import analytics_service +from app.services.notifications import notification_service + + +router = APIRouter() + + +class DashboardResponse(BaseModel): + period_days: int + total_posts: int + total_impressions: int + total_engagements: int + total_likes: int + total_comments: int + total_shares: int + avg_engagement_rate: float + platform_breakdown: dict + content_breakdown: dict + pending_interactions: int + + +@router.get("/dashboard") +async def get_analytics_dashboard( + days: int = Query(30, ge=1, le=365), + platform: Optional[str] = None, + db: Session = Depends(get_db) +): + """ + Get analytics dashboard data. + + - **days**: Number of days to analyze (default 30) + - **platform**: Filter by platform (optional) + """ + stats = await analytics_service.get_dashboard_stats(days=days, platform=platform) + return stats + + +@router.get("/top-posts") +async def get_top_posts( + days: int = Query(30, ge=1, le=365), + limit: int = Query(10, ge=1, le=50), + platform: Optional[str] = None, + db: Session = Depends(get_db) +): + """ + Get top performing posts by engagement rate. + + - **days**: Number of days to analyze + - **limit**: Maximum number of posts to return + - **platform**: Filter by platform (optional) + """ + posts = await analytics_service.get_top_posts( + days=days, + limit=limit, + platform=platform + ) + return {"posts": posts, "count": len(posts)} + + +@router.get("/optimal-times") +async def get_optimal_times( + platform: Optional[str] = None, + days: int = Query(90, ge=30, le=365), + db: Session = Depends(get_db) +): + """ + Get optimal posting times based on historical performance. + + - **platform**: Filter by platform (optional) + - **days**: Days of historical data to analyze + """ + times = await analytics_service.get_optimal_times( + platform=platform, + days=days + ) + return { + "optimal_times": times[:20], + "analysis_period_days": days, + "platform": platform + } + + +@router.get("/reports") +async def get_reports( + report_type: str = Query("weekly", regex="^(daily|weekly|monthly)$"), + limit: int = Query(10, ge=1, le=52), + db: Session = Depends(get_db) +): + """ + Get historical analytics reports. + + - **report_type**: Type of report (daily, weekly, monthly) + - **limit**: Maximum number of reports to return + """ + reports = await analytics_service.get_reports( + report_type=report_type, + limit=limit + ) + return {"reports": reports, "count": len(reports)} + + +@router.post("/reports/generate") +async def generate_report( + report_type: str = Query("weekly", regex="^(daily|weekly|monthly)$"), + db: Session = Depends(get_db) +): + """ + Generate a new analytics report. + + - **report_type**: Type of report to generate + """ + if report_type == "weekly": + report = await analytics_service.generate_weekly_report() + return { + "message": "Reporte generado exitosamente", + "report": report.to_dict() + } + else: + raise HTTPException( + status_code=400, + detail=f"Report type '{report_type}' not implemented yet" + ) + + +@router.post("/reports/send-telegram") +async def send_report_telegram( + db: Session = Depends(get_db) +): + """ + Generate and send weekly report via Telegram. + """ + try: + # Generate report + report = await analytics_service.generate_weekly_report() + + # Send via Telegram + if report.summary_text: + success = await notification_service.notify_daily_summary({ + "custom_message": report.summary_text + }) + + if success: + return { + "message": "Reporte enviado a Telegram", + "report_id": report.id + } + else: + return { + "message": "Reporte generado pero no se pudo enviar a Telegram", + "report_id": report.id + } + else: + return { + "message": "Reporte generado sin resumen", + "report_id": report.id + } + + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Error generando reporte: {str(e)}" + ) + + +@router.get("/posts/{post_id}/metrics") +async def get_post_metrics( + post_id: int, + db: Session = Depends(get_db) +): + """ + Get detailed metrics for a specific post. + """ + from app.models.post import Post + from app.models.post_metrics import PostMetrics + + post = db.query(Post).filter(Post.id == post_id).first() + if not post: + raise HTTPException(status_code=404, detail="Post not found") + + # Get metrics history + metrics_history = db.query(PostMetrics).filter( + PostMetrics.post_id == post_id + ).order_by(PostMetrics.recorded_at.desc()).limit(100).all() + + return { + "post_id": post_id, + "current_metrics": post.metrics, + "published_at": post.published_at.isoformat() if post.published_at else None, + "platforms": post.platforms, + "metrics_history": [m.to_dict() for m in metrics_history] + } + + +@router.get("/engagement-trend") +async def get_engagement_trend( + days: int = Query(30, ge=7, le=365), + platform: Optional[str] = None, + db: Session = Depends(get_db) +): + """ + Get engagement trend over time for charting. + """ + from app.models.post import Post + from datetime import timedelta + + start_date = datetime.utcnow() - timedelta(days=days) + + posts_query = db.query(Post).filter( + Post.published_at >= start_date, + Post.status == "published" + ) + + if platform: + posts_query = posts_query.filter(Post.platforms.contains([platform])) + + posts = posts_query.order_by(Post.published_at).all() + + # Group by day + daily_data = {} + for post in posts: + if post.published_at: + day_key = post.published_at.strftime("%Y-%m-%d") + if day_key not in daily_data: + daily_data[day_key] = { + "date": day_key, + "posts": 0, + "impressions": 0, + "engagements": 0 + } + daily_data[day_key]["posts"] += 1 + if post.metrics: + daily_data[day_key]["impressions"] += post.metrics.get("impressions", 0) + daily_data[day_key]["engagements"] += ( + post.metrics.get("likes", 0) + + post.metrics.get("comments", 0) + + post.metrics.get("shares", 0) + ) + + # Sort by date + trend = sorted(daily_data.values(), key=lambda x: x["date"]) + + return { + "trend": trend, + "period_days": days, + "platform": platform + } diff --git a/app/api/routes/dashboard.py b/app/api/routes/dashboard.py index 2df9b8b..ede0590 100644 --- a/app/api/routes/dashboard.py +++ b/app/api/routes/dashboard.py @@ -180,3 +180,29 @@ async def dashboard_settings(request: Request, db: Session = Depends(get_db)): "request": request, "user": user.to_dict() }) + + +@router.get("/analytics", response_class=HTMLResponse) +async def dashboard_analytics(request: Request, db: Session = Depends(get_db)): + """Página de analytics.""" + user = require_auth(request, db) + if not user: + return RedirectResponse(url="/login", status_code=302) + + return templates.TemplateResponse("analytics.html", { + "request": request, + "user": user.to_dict() + }) + + +@router.get("/leads", response_class=HTMLResponse) +async def dashboard_leads(request: Request, db: Session = Depends(get_db)): + """Página de leads.""" + user = require_auth(request, db) + if not user: + return RedirectResponse(url="/login", status_code=302) + + return templates.TemplateResponse("leads.html", { + "request": request, + "user": user.to_dict() + }) diff --git a/app/api/routes/image_templates.py b/app/api/routes/image_templates.py new file mode 100644 index 0000000..082a427 --- /dev/null +++ b/app/api/routes/image_templates.py @@ -0,0 +1,271 @@ +""" +API Routes for Image Templates. +""" + +from typing import Optional, Dict, Any +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.orm import Session +from pydantic import BaseModel + +from app.core.database import get_db +from app.models.image_template import ImageTemplate + + +router = APIRouter() + + +class TemplateCreate(BaseModel): + """Schema for creating an image template.""" + name: str + description: Optional[str] = None + category: str + template_type: str = "general" + html_template: Optional[str] = None + template_file: Optional[str] = None + variables: list + design_config: Optional[Dict[str, Any]] = None + output_sizes: Optional[Dict[str, Any]] = None + + +class TemplateUpdate(BaseModel): + """Schema for updating a template.""" + name: Optional[str] = None + description: Optional[str] = None + category: Optional[str] = None + template_type: Optional[str] = None + html_template: Optional[str] = None + variables: Optional[list] = None + design_config: Optional[Dict[str, Any]] = None + output_sizes: Optional[Dict[str, Any]] = None + is_active: Optional[bool] = None + + +class PreviewRequest(BaseModel): + """Schema for previewing a template.""" + template_id: Optional[int] = None + html_template: Optional[str] = None + variables: Dict[str, str] + output_size: Optional[Dict[str, int]] = None + + +@router.get("/") +async def list_templates( + category: Optional[str] = None, + template_type: Optional[str] = None, + active_only: bool = True, + limit: int = Query(50, ge=1, le=200), + db: Session = Depends(get_db) +): + """ + List all image templates. + + - **category**: Filter by category (tip, producto, servicio, etc.) + - **template_type**: Filter by type (tip_card, product_card, quote, etc.) + - **active_only**: Only show active templates + """ + query = db.query(ImageTemplate) + + if category: + query = query.filter(ImageTemplate.category == category) + if template_type: + query = query.filter(ImageTemplate.template_type == template_type) + if active_only: + query = query.filter(ImageTemplate.is_active == True) + + templates = query.order_by(ImageTemplate.name).limit(limit).all() + + return { + "templates": [t.to_dict() for t in templates], + "count": len(templates) + } + + +@router.get("/{template_id}") +async def get_template( + template_id: int, + db: Session = Depends(get_db) +): + """ + Get a specific template with full details. + """ + template = db.query(ImageTemplate).filter(ImageTemplate.id == template_id).first() + if not template: + raise HTTPException(status_code=404, detail="Template not found") + + result = template.to_dict() + # Include full HTML template + if template.html_template: + result["full_html_template"] = template.html_template + + return result + + +@router.post("/") +async def create_template( + template_data: TemplateCreate, + db: Session = Depends(get_db) +): + """ + Create a new image template. + + Templates can be defined with: + - **html_template**: Inline HTML/CSS template + - **template_file**: Path to a template file + + Variables are placeholders like: ["title", "content", "accent_color"] + """ + if not template_data.html_template and not template_data.template_file: + raise HTTPException( + status_code=400, + detail="Either html_template or template_file is required" + ) + + template = ImageTemplate( + name=template_data.name, + description=template_data.description, + category=template_data.category, + template_type=template_data.template_type, + html_template=template_data.html_template, + template_file=template_data.template_file, + variables=template_data.variables, + design_config=template_data.design_config, + output_sizes=template_data.output_sizes, + is_active=True + ) + + db.add(template) + db.commit() + db.refresh(template) + + return { + "message": "Template created successfully", + "template": template.to_dict() + } + + +@router.put("/{template_id}") +async def update_template( + template_id: int, + template_data: TemplateUpdate, + db: Session = Depends(get_db) +): + """ + Update an existing template. + """ + template = db.query(ImageTemplate).filter(ImageTemplate.id == template_id).first() + if not template: + raise HTTPException(status_code=404, detail="Template not found") + + update_data = template_data.dict(exclude_unset=True) + for field, value in update_data.items(): + if value is not None: + setattr(template, field, value) + + db.commit() + db.refresh(template) + + return { + "message": "Template updated successfully", + "template": template.to_dict() + } + + +@router.delete("/{template_id}") +async def delete_template( + template_id: int, + db: Session = Depends(get_db) +): + """ + Delete a template. + """ + template = db.query(ImageTemplate).filter(ImageTemplate.id == template_id).first() + if not template: + raise HTTPException(status_code=404, detail="Template not found") + + db.delete(template) + db.commit() + + return {"message": "Template deleted successfully"} + + +@router.post("/preview") +async def preview_template( + preview_data: PreviewRequest, + db: Session = Depends(get_db) +): + """ + Generate a preview of a template with variables. + + You can either provide: + - **template_id**: To use an existing template + - **html_template**: To preview custom HTML + + The preview returns the rendered HTML (image generation requires separate processing). + """ + html_template = preview_data.html_template + + if preview_data.template_id: + template = db.query(ImageTemplate).filter( + ImageTemplate.id == preview_data.template_id + ).first() + + if not template: + raise HTTPException(status_code=404, detail="Template not found") + + html_template = template.html_template + + if not html_template: + raise HTTPException( + status_code=400, + detail="No HTML template available" + ) + + # Simple variable substitution + rendered_html = html_template + for var_name, var_value in preview_data.variables.items(): + rendered_html = rendered_html.replace(f"{{{{{var_name}}}}}", str(var_value)) + + # Get output size + output_size = preview_data.output_size or {"width": 1080, "height": 1080} + + return { + "rendered_html": rendered_html, + "output_size": output_size, + "variables_used": list(preview_data.variables.keys()) + } + + +@router.get("/categories/list") +async def list_categories( + db: Session = Depends(get_db) +): + """ + Get list of available template categories. + """ + from sqlalchemy import distinct + + categories = db.query(distinct(ImageTemplate.category)).filter( + ImageTemplate.is_active == True + ).all() + + return { + "categories": [c[0] for c in categories if c[0]] + } + + +@router.get("/types/list") +async def list_template_types( + db: Session = Depends(get_db) +): + """ + Get list of available template types. + """ + from sqlalchemy import distinct + + types = db.query(distinct(ImageTemplate.template_type)).filter( + ImageTemplate.is_active == True + ).all() + + return { + "types": [t[0] for t in types if t[0]] + } diff --git a/app/api/routes/leads.py b/app/api/routes/leads.py new file mode 100644 index 0000000..bb7863e --- /dev/null +++ b/app/api/routes/leads.py @@ -0,0 +1,317 @@ +""" +API Routes for Leads Management. +""" + +from datetime import datetime +from typing import Optional, List +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.orm import Session +from pydantic import BaseModel + +from app.core.database import get_db +from app.models.lead import Lead +from app.models.interaction import Interaction +from app.services.odoo_service import odoo_service + + +router = APIRouter() + + +class LeadCreate(BaseModel): + """Schema for creating a lead.""" + name: Optional[str] = None + email: Optional[str] = None + phone: Optional[str] = None + company: Optional[str] = None + platform: str + username: Optional[str] = None + profile_url: Optional[str] = None + interest: Optional[str] = None + notes: Optional[str] = None + priority: str = "medium" + products_interested: Optional[List[int]] = None + services_interested: Optional[List[int]] = None + tags: Optional[List[str]] = None + + +class LeadUpdate(BaseModel): + """Schema for updating a lead.""" + name: Optional[str] = None + email: Optional[str] = None + phone: Optional[str] = None + company: Optional[str] = None + interest: Optional[str] = None + notes: Optional[str] = None + status: Optional[str] = None + priority: Optional[str] = None + assigned_to: Optional[str] = None + products_interested: Optional[List[int]] = None + services_interested: Optional[List[int]] = None + tags: Optional[List[str]] = None + + +@router.get("/") +async def list_leads( + status: Optional[str] = None, + priority: Optional[str] = None, + platform: Optional[str] = None, + synced: Optional[bool] = None, + limit: int = Query(50, ge=1, le=200), + offset: int = Query(0, ge=0), + db: Session = Depends(get_db) +): + """ + List all leads with optional filters. + + - **status**: Filter by status (new, contacted, qualified, proposal, won, lost) + - **priority**: Filter by priority (low, medium, high, urgent) + - **platform**: Filter by source platform + - **synced**: Filter by Odoo sync status + """ + query = db.query(Lead) + + if status: + query = query.filter(Lead.status == status) + if priority: + query = query.filter(Lead.priority == priority) + if platform: + query = query.filter(Lead.platform == platform) + if synced is not None: + query = query.filter(Lead.synced_to_odoo == synced) + + total = query.count() + leads = query.order_by(Lead.created_at.desc()).offset(offset).limit(limit).all() + + return { + "leads": [lead.to_dict() for lead in leads], + "total": total, + "limit": limit, + "offset": offset + } + + +@router.get("/{lead_id}") +async def get_lead( + lead_id: int, + db: Session = Depends(get_db) +): + """ + Get a specific lead by ID. + """ + lead = db.query(Lead).filter(Lead.id == lead_id).first() + if not lead: + raise HTTPException(status_code=404, detail="Lead not found") + + return lead.to_dict() + + +@router.post("/") +async def create_lead( + lead_data: LeadCreate, + db: Session = Depends(get_db) +): + """ + Create a new lead manually. + """ + lead = Lead( + name=lead_data.name, + email=lead_data.email, + phone=lead_data.phone, + company=lead_data.company, + platform=lead_data.platform, + username=lead_data.username, + profile_url=lead_data.profile_url, + interest=lead_data.interest, + notes=lead_data.notes, + priority=lead_data.priority, + products_interested=lead_data.products_interested, + services_interested=lead_data.services_interested, + tags=lead_data.tags, + status="new" + ) + + db.add(lead) + db.commit() + db.refresh(lead) + + return { + "message": "Lead created successfully", + "lead": lead.to_dict() + } + + +@router.post("/from-interaction/{interaction_id}") +async def create_lead_from_interaction( + interaction_id: int, + interest: Optional[str] = None, + priority: str = "medium", + notes: Optional[str] = None, + db: Session = Depends(get_db) +): + """ + Create a lead from an existing interaction. + + - **interaction_id**: ID of the interaction to convert + - **interest**: What the lead is interested in + - **priority**: Lead priority (low, medium, high, urgent) + - **notes**: Additional notes + """ + # Get interaction + interaction = db.query(Interaction).filter(Interaction.id == interaction_id).first() + if not interaction: + raise HTTPException(status_code=404, detail="Interaction not found") + + # Check if lead already exists for this interaction + existing = db.query(Lead).filter(Lead.interaction_id == interaction_id).first() + if existing: + raise HTTPException( + status_code=400, + detail="Lead already exists for this interaction" + ) + + # Create lead + lead = Lead( + interaction_id=interaction_id, + platform=interaction.platform, + username=interaction.author_username, + profile_url=interaction.author_profile_url, + source_content=interaction.content, + interest=interest or f"Interest shown in post {interaction.post_id}", + notes=notes, + priority=priority, + status="new" + ) + + # Mark interaction as potential lead + interaction.is_potential_lead = True + + db.add(lead) + db.commit() + db.refresh(lead) + + return { + "message": "Lead created from interaction", + "lead": lead.to_dict() + } + + +@router.put("/{lead_id}") +async def update_lead( + lead_id: int, + lead_data: LeadUpdate, + db: Session = Depends(get_db) +): + """ + Update an existing lead. + """ + lead = db.query(Lead).filter(Lead.id == lead_id).first() + if not lead: + raise HTTPException(status_code=404, detail="Lead not found") + + # Update only provided fields + update_data = lead_data.dict(exclude_unset=True) + for field, value in update_data.items(): + if value is not None: + setattr(lead, field, value) + + lead.updated_at = datetime.utcnow() + + # If status changed to contacted, update last_contacted_at + if lead_data.status == "contacted": + lead.last_contacted_at = datetime.utcnow() + + db.commit() + db.refresh(lead) + + return { + "message": "Lead updated successfully", + "lead": lead.to_dict() + } + + +@router.delete("/{lead_id}") +async def delete_lead( + lead_id: int, + db: Session = Depends(get_db) +): + """ + Delete a lead. + """ + lead = db.query(Lead).filter(Lead.id == lead_id).first() + if not lead: + raise HTTPException(status_code=404, detail="Lead not found") + + db.delete(lead) + db.commit() + + return {"message": "Lead deleted successfully"} + + +@router.post("/{lead_id}/sync-odoo") +async def sync_lead_to_odoo( + lead_id: int, + db: Session = Depends(get_db) +): + """ + Sync a specific lead to Odoo CRM. + """ + lead = db.query(Lead).filter(Lead.id == lead_id).first() + if not lead: + raise HTTPException(status_code=404, detail="Lead not found") + + if lead.synced_to_odoo: + return { + "message": "Lead already synced to Odoo", + "odoo_lead_id": lead.odoo_lead_id + } + + result = await odoo_service.create_lead(lead) + + if not result.get("success"): + raise HTTPException( + status_code=500, + detail=result.get("error", "Sync failed") + ) + + lead.odoo_lead_id = result["odoo_lead_id"] + lead.synced_to_odoo = True + lead.odoo_synced_at = datetime.utcnow() + db.commit() + + return { + "message": "Lead synced to Odoo successfully", + "odoo_lead_id": result["odoo_lead_id"] + } + + +@router.get("/stats/summary") +async def get_leads_summary( + db: Session = Depends(get_db) +): + """ + Get leads summary statistics. + """ + from sqlalchemy import func + + total = db.query(Lead).count() + by_status = db.query( + Lead.status, func.count(Lead.id) + ).group_by(Lead.status).all() + + by_platform = db.query( + Lead.platform, func.count(Lead.id) + ).group_by(Lead.platform).all() + + by_priority = db.query( + Lead.priority, func.count(Lead.id) + ).group_by(Lead.priority).all() + + unsynced = db.query(Lead).filter(Lead.synced_to_odoo == False).count() + + return { + "total": total, + "by_status": {status: count for status, count in by_status}, + "by_platform": {platform: count for platform, count in by_platform}, + "by_priority": {priority: count for priority, count in by_priority}, + "unsynced_to_odoo": unsynced + } diff --git a/app/api/routes/odoo.py b/app/api/routes/odoo.py new file mode 100644 index 0000000..e99b06c --- /dev/null +++ b/app/api/routes/odoo.py @@ -0,0 +1,123 @@ +""" +API Routes for Odoo Integration. +""" + +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.orm import Session + +from app.core.database import get_db +from app.services.odoo_service import odoo_service + + +router = APIRouter() + + +@router.get("/status") +async def get_odoo_status(db: Session = Depends(get_db)): + """ + Get Odoo connection status. + """ + status = await odoo_service.test_connection() + return status + + +@router.post("/sync/products") +async def sync_products( + limit: int = 100, + db: Session = Depends(get_db) +): + """ + Sync products from Odoo to local database. + + - **limit**: Maximum number of products to sync + """ + result = await odoo_service.sync_products(limit=limit) + + if not result.get("success"): + raise HTTPException( + status_code=500, + detail=result.get("error", "Sync failed") + ) + + return { + "message": "Products synced successfully", + **result + } + + +@router.post("/sync/services") +async def sync_services( + limit: int = 100, + db: Session = Depends(get_db) +): + """ + Sync services from Odoo to local database. + + - **limit**: Maximum number of services to sync + """ + result = await odoo_service.sync_services(limit=limit) + + if not result.get("success"): + raise HTTPException( + status_code=500, + detail=result.get("error", "Sync failed") + ) + + return { + "message": "Services synced successfully", + **result + } + + +@router.post("/sync/leads") +async def export_leads(db: Session = Depends(get_db)): + """ + Export unsynced leads to Odoo CRM. + """ + result = await odoo_service.export_leads_to_odoo() + + if not result.get("success"): + raise HTTPException( + status_code=500, + detail=result.get("error", "Export failed") + ) + + return { + "message": "Leads exported successfully", + **result + } + + +@router.get("/sync/logs") +async def get_sync_logs( + limit: int = 20, + db: Session = Depends(get_db) +): + """ + Get Odoo sync history. + + - **limit**: Maximum number of logs to return + """ + logs = await odoo_service.get_sync_logs(limit=limit) + return {"logs": logs, "count": len(logs)} + + +@router.get("/sales") +async def get_sales_summary( + days: int = 30, + db: Session = Depends(get_db) +): + """ + Get sales summary from Odoo. + + - **days**: Number of days to look back + """ + result = await odoo_service.get_sales_summary(days=days) + + if not result.get("success"): + raise HTTPException( + status_code=500, + detail=result.get("error", "Failed to get sales data") + ) + + return result diff --git a/app/api/routes/recycling.py b/app/api/routes/recycling.py new file mode 100644 index 0000000..511b05c --- /dev/null +++ b/app/api/routes/recycling.py @@ -0,0 +1,181 @@ +""" +API Routes for Content Recycling. +""" + +from datetime import datetime +from typing import List, Optional +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.orm import Session +from pydantic import BaseModel + +from app.core.database import get_db +from app.services.recycling_service import recycling_service + + +router = APIRouter() + + +class RecycleRequest(BaseModel): + """Schema for recycling a post.""" + content: Optional[str] = None + hashtags: Optional[List[str]] = None + image_url: Optional[str] = None + scheduled_for: Optional[datetime] = None + platforms: Optional[List[str]] = None + reason: str = "manual" + + +@router.get("/candidates") +async def get_recyclable_candidates( + platform: Optional[str] = None, + content_type: Optional[str] = None, + min_engagement_rate: float = Query(2.0, ge=0), + min_days: int = Query(30, ge=7, le=365), + limit: int = Query(20, ge=1, le=50), + db: Session = Depends(get_db) +): + """ + Get posts that are good candidates for recycling. + + - **platform**: Filter by platform + - **content_type**: Filter by content type + - **min_engagement_rate**: Minimum engagement rate (default 2.0%) + - **min_days**: Minimum days since original publish (default 30) + """ + candidates = await recycling_service.find_recyclable_posts( + platform=platform, + content_type=content_type, + min_engagement_rate=min_engagement_rate, + min_days_since_publish=min_days, + limit=limit + ) + + return { + "candidates": candidates, + "count": len(candidates), + "filters": { + "platform": platform, + "content_type": content_type, + "min_engagement_rate": min_engagement_rate, + "min_days": min_days + } + } + + +@router.post("/{post_id}") +async def recycle_post( + post_id: int, + recycle_data: RecycleRequest = None, + db: Session = Depends(get_db) +): + """ + Recycle a post by creating a new version. + + You can optionally modify: + - **content**: New content text + - **hashtags**: New hashtag list + - **image_url**: New image URL + - **scheduled_for**: When to publish (default: 1 hour from now) + - **platforms**: Override platforms + - **reason**: Reason for recycling (manual, high_performer, evergreen, seasonal) + """ + modifications = None + scheduled_for = None + platforms = None + reason = "manual" + + if recycle_data: + modifications = {} + if recycle_data.content: + modifications["content"] = recycle_data.content + if recycle_data.hashtags: + modifications["hashtags"] = recycle_data.hashtags + if recycle_data.image_url: + modifications["image_url"] = recycle_data.image_url + + scheduled_for = recycle_data.scheduled_for + platforms = recycle_data.platforms + reason = recycle_data.reason + + result = await recycling_service.recycle_post( + post_id=post_id, + modifications=modifications if modifications else None, + scheduled_for=scheduled_for, + platforms=platforms, + reason=reason + ) + + if not result.get("success"): + raise HTTPException( + status_code=400, + detail=result.get("error", "Failed to recycle post") + ) + + return { + "message": "Post recycled successfully", + **result + } + + +@router.post("/auto") +async def auto_recycle_posts( + platform: Optional[str] = None, + count: int = Query(1, ge=1, le=5), + min_engagement_rate: float = Query(2.0, ge=0), + db: Session = Depends(get_db) +): + """ + Automatically recycle top-performing posts. + + - **platform**: Filter by platform + - **count**: Number of posts to recycle (max 5) + - **min_engagement_rate**: Minimum engagement rate threshold + """ + result = await recycling_service.auto_recycle( + platform=platform, + count=count, + min_engagement_rate=min_engagement_rate + ) + + return result + + +@router.get("/history") +async def get_recycling_history( + original_post_id: Optional[int] = None, + limit: int = Query(50, ge=1, le=200), + db: Session = Depends(get_db) +): + """ + Get recycling history. + + - **original_post_id**: Filter by original post + """ + history = await recycling_service.get_recycling_history( + original_post_id=original_post_id, + limit=limit + ) + + return { + "history": history, + "count": len(history) + } + + +@router.post("/{post_id}/disable") +async def disable_recycling( + post_id: int, + db: Session = Depends(get_db) +): + """ + Mark a post as not eligible for recycling. + """ + result = await recycling_service.mark_post_not_recyclable(post_id) + + if not result.get("success"): + raise HTTPException( + status_code=400, + detail=result.get("error", "Failed to disable recycling") + ) + + return result diff --git a/app/api/routes/threads.py b/app/api/routes/threads.py new file mode 100644 index 0000000..a70ffbf --- /dev/null +++ b/app/api/routes/threads.py @@ -0,0 +1,255 @@ +""" +API Routes for Thread Series Management. +""" + +from datetime import datetime +from typing import List, Optional +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlalchemy.orm import Session +from pydantic import BaseModel + +from app.core.database import get_db +from app.services.thread_service import thread_service + + +router = APIRouter() + + +class ThreadPostCreate(BaseModel): + """Schema for a post in a thread.""" + content: str + image_url: Optional[str] = None + + +class ThreadSeriesCreate(BaseModel): + """Schema for creating a thread series.""" + name: str + platform: str + posts: List[ThreadPostCreate] + description: Optional[str] = None + topic: Optional[str] = None + schedule_type: str = "sequential" + interval_minutes: int = 5 + hashtags: Optional[List[str]] = None + + +class ThreadGenerateRequest(BaseModel): + """Schema for AI thread generation.""" + topic: str + platform: str + num_posts: int = 5 + style: str = "educational" + name: Optional[str] = None + + +class ScheduleRequest(BaseModel): + """Schema for scheduling a series.""" + start_time: Optional[datetime] = None + + +@router.get("/") +async def list_thread_series( + status: Optional[str] = None, + platform: Optional[str] = None, + limit: int = Query(20, ge=1, le=100), + db: Session = Depends(get_db) +): + """ + List all thread series. + + - **status**: Filter by status (draft, scheduled, publishing, completed) + - **platform**: Filter by platform + """ + series_list = await thread_service.get_series_list( + status=status, + platform=platform, + limit=limit + ) + return {"series": series_list, "count": len(series_list)} + + +@router.get("/{series_id}") +async def get_thread_series( + series_id: int, + db: Session = Depends(get_db) +): + """ + Get a specific thread series with all its posts. + """ + series = await thread_service.get_series(series_id) + if not series: + raise HTTPException(status_code=404, detail="Series not found") + return series + + +@router.post("/") +async def create_thread_series( + series_data: ThreadSeriesCreate, + db: Session = Depends(get_db) +): + """ + Create a new thread series manually. + + - **name**: Series name + - **platform**: Target platform (x, threads) + - **posts**: List of posts with content and optional image_url + - **schedule_type**: "sequential" (posts one after another) or "timed" + - **interval_minutes**: Minutes between posts (default 5) + """ + if len(series_data.posts) < 2: + raise HTTPException( + status_code=400, + detail="Thread series requires at least 2 posts" + ) + + if len(series_data.posts) > 20: + raise HTTPException( + status_code=400, + detail="Maximum 20 posts per thread series" + ) + + try: + series = await thread_service.create_series( + name=series_data.name, + platform=series_data.platform, + posts_content=[p.dict() for p in series_data.posts], + description=series_data.description, + topic=series_data.topic, + schedule_type=series_data.schedule_type, + interval_minutes=series_data.interval_minutes, + hashtags=series_data.hashtags + ) + + return { + "message": "Thread series created successfully", + "series": series.to_dict() + } + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/generate") +async def generate_thread_with_ai( + gen_data: ThreadGenerateRequest, + db: Session = Depends(get_db) +): + """ + Generate a thread series using AI. + + - **topic**: The topic to generate content about + - **platform**: Target platform + - **num_posts**: Number of posts (2-10) + - **style**: Content style (educational, storytelling, tips) + """ + if gen_data.num_posts < 2 or gen_data.num_posts > 10: + raise HTTPException( + status_code=400, + detail="Number of posts must be between 2 and 10" + ) + + result = await thread_service.generate_thread_with_ai( + topic=gen_data.topic, + platform=gen_data.platform, + num_posts=gen_data.num_posts, + style=gen_data.style + ) + + if not result.get("success"): + raise HTTPException( + status_code=500, + detail=result.get("error", "AI generation failed") + ) + + # If a name was provided, also create the series + if gen_data.name and result.get("posts"): + posts_content = [{"content": p["content"]} for p in result["posts"]] + + series = await thread_service.create_series( + name=gen_data.name, + platform=gen_data.platform, + posts_content=posts_content, + topic=gen_data.topic, + ai_generated=True, + generation_prompt=f"Topic: {gen_data.topic}, Style: {gen_data.style}" + ) + + return { + "message": "Thread generated and saved", + "series": series.to_dict(), + "generated_posts": result["posts"] + } + + return { + "message": "Thread generated (not saved)", + "generated_posts": result["posts"], + "count": result["count"] + } + + +@router.post("/{series_id}/schedule") +async def schedule_series( + series_id: int, + schedule_data: ScheduleRequest = None, + db: Session = Depends(get_db) +): + """ + Schedule a thread series for publishing. + + - **start_time**: When to start publishing (optional, defaults to now + 5 min) + """ + start_time = schedule_data.start_time if schedule_data else None + + result = await thread_service.schedule_series( + series_id=series_id, + start_time=start_time + ) + + if not result.get("success"): + raise HTTPException( + status_code=400, + detail=result.get("error", "Failed to schedule") + ) + + return { + "message": "Series scheduled successfully", + **result + } + + +@router.post("/{series_id}/publish-next") +async def publish_next_post( + series_id: int, + db: Session = Depends(get_db) +): + """ + Manually trigger publishing the next post in a series. + """ + result = await thread_service.publish_next_post(series_id) + + if not result.get("success"): + raise HTTPException( + status_code=400, + detail=result.get("error", "Failed to publish") + ) + + return result + + +@router.post("/{series_id}/cancel") +async def cancel_series( + series_id: int, + db: Session = Depends(get_db) +): + """ + Cancel a thread series and its scheduled posts. + """ + result = await thread_service.cancel_series(series_id) + + if not result.get("success"): + raise HTTPException( + status_code=400, + detail=result.get("error", "Failed to cancel") + ) + + return {"message": "Series cancelled successfully"} diff --git a/app/core/config.py b/app/core/config.py index de0f58a..4b8bd9c 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -51,6 +51,18 @@ class Settings(BaseSettings): TELEGRAM_BOT_TOKEN: Optional[str] = None TELEGRAM_CHAT_ID: Optional[str] = None + # Analytics + ANALYTICS_FETCH_INTERVAL: int = 15 # minutes + TELEGRAM_REPORT_ENABLED: bool = True + TELEGRAM_REPORT_DAY: int = 6 # Sunday (0=Monday, 6=Sunday) + + # Odoo Integration + ODOO_URL: Optional[str] = None # e.g., "https://mycompany.odoo.com" + ODOO_DB: Optional[str] = None + ODOO_USERNAME: Optional[str] = None + ODOO_PASSWORD: Optional[str] = None # API key or password + ODOO_SYNC_ENABLED: bool = False + class Config: env_file = ".env" case_sensitive = True diff --git a/app/main.py b/app/main.py index 7d64bea..a896cc0 100644 --- a/app/main.py +++ b/app/main.py @@ -11,7 +11,7 @@ from fastapi import FastAPI from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware -from app.api.routes import posts, products, services, calendar, dashboard, interactions, auth, publish, generate, notifications +from app.api.routes import posts, products, services, calendar, dashboard, interactions, auth, publish, generate, notifications, analytics, odoo, leads, ab_testing, recycling, threads, image_templates from app.core.config import settings from app.core.database import engine from app.models import Base @@ -65,6 +65,13 @@ app.include_router(interactions.router, prefix="/api/interactions", tags=["Inter app.include_router(publish.router, prefix="/api/publish", tags=["Publish"]) app.include_router(generate.router, prefix="/api/generate", tags=["AI Generation"]) app.include_router(notifications.router, prefix="/api/notifications", tags=["Notifications"]) +app.include_router(analytics.router, prefix="/api/analytics", tags=["Analytics"]) +app.include_router(odoo.router, prefix="/api/odoo", tags=["Odoo"]) +app.include_router(leads.router, prefix="/api/leads", tags=["Leads"]) +app.include_router(ab_testing.router, prefix="/api/ab-tests", tags=["A/B Testing"]) +app.include_router(recycling.router, prefix="/api/recycling", tags=["Recycling"]) +app.include_router(threads.router, prefix="/api/threads", tags=["Thread Series"]) +app.include_router(image_templates.router, prefix="/api/templates", tags=["Image Templates"]) @app.get("/api/health") diff --git a/app/models/__init__.py b/app/models/__init__.py index 090cae0..fb43302 100644 --- a/app/models/__init__.py +++ b/app/models/__init__.py @@ -12,6 +12,13 @@ from app.models.post import Post from app.models.content_calendar import ContentCalendar from app.models.image_template import ImageTemplate from app.models.interaction import Interaction +from app.models.post_metrics import PostMetrics +from app.models.analytics_report import AnalyticsReport +from app.models.lead import Lead +from app.models.odoo_sync_log import OdooSyncLog +from app.models.ab_test import ABTest, ABTestVariant +from app.models.recycled_post import RecycledPost +from app.models.thread_series import ThreadSeries, ThreadPost __all__ = [ "Base", @@ -22,5 +29,14 @@ __all__ = [ "Post", "ContentCalendar", "ImageTemplate", - "Interaction" + "Interaction", + "PostMetrics", + "AnalyticsReport", + "Lead", + "OdooSyncLog", + "ABTest", + "ABTestVariant", + "RecycledPost", + "ThreadSeries", + "ThreadPost" ] diff --git a/app/models/ab_test.py b/app/models/ab_test.py new file mode 100644 index 0000000..092ef69 --- /dev/null +++ b/app/models/ab_test.py @@ -0,0 +1,165 @@ +""" +A/B Test Models - Test different content variants to optimize engagement. +""" + +from datetime import datetime +from sqlalchemy import Column, Integer, String, Text, Float, Boolean, DateTime, ForeignKey, JSON, Enum +from sqlalchemy.orm import relationship +import enum + +from app.core.database import Base + + +class ABTestStatus(enum.Enum): + """Status options for A/B tests.""" + DRAFT = "draft" + RUNNING = "running" + COMPLETED = "completed" + CANCELLED = "cancelled" + + +class ABTestType(enum.Enum): + """Types of A/B tests.""" + CONTENT = "content" # Test different content/copy + TIMING = "timing" # Test different posting times + HASHTAGS = "hashtags" # Test different hashtag sets + IMAGE = "image" # Test different images + + +class ABTest(Base): + """ + A/B Test model for testing content variations. + """ + __tablename__ = "ab_tests" + + id = Column(Integer, primary_key=True, index=True) + + # Test info + name = Column(String(255), nullable=False) + description = Column(Text, nullable=True) + test_type = Column(String(50), nullable=False, default="content") + + # Platform targeting + platform = Column(String(50), nullable=False, index=True) + # x, threads, instagram, facebook + + # Status + status = Column(String(20), default="draft", index=True) + + # Timing + started_at = Column(DateTime, nullable=True) + ended_at = Column(DateTime, nullable=True) + duration_hours = Column(Integer, default=24) # How long to run the test + + # Results + winning_variant_id = Column(Integer, ForeignKey("ab_test_variants.id"), nullable=True) + confidence_level = Column(Float, nullable=True) # Statistical confidence + + # Configuration + min_sample_size = Column(Integer, default=100) # Min impressions per variant + success_metric = Column(String(50), default="engagement_rate") + # Options: engagement_rate, likes, comments, shares, clicks + + # Metadata + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + created_by = Column(String(100), nullable=True) + + # Relationships + variants = relationship("ABTestVariant", back_populates="test", foreign_keys="ABTestVariant.test_id") + + def __repr__(self): + return f"" + + def to_dict(self): + """Convert to dictionary.""" + return { + "id": self.id, + "name": self.name, + "description": self.description, + "test_type": self.test_type, + "platform": self.platform, + "status": self.status, + "started_at": self.started_at.isoformat() if self.started_at else None, + "ended_at": self.ended_at.isoformat() if self.ended_at else None, + "duration_hours": self.duration_hours, + "winning_variant_id": self.winning_variant_id, + "confidence_level": self.confidence_level, + "min_sample_size": self.min_sample_size, + "success_metric": self.success_metric, + "variants": [v.to_dict() for v in self.variants] if self.variants else [], + "created_at": self.created_at.isoformat() if self.created_at else None + } + + +class ABTestVariant(Base): + """ + Variant within an A/B test. + """ + __tablename__ = "ab_test_variants" + + id = Column(Integer, primary_key=True, index=True) + + # Parent test + test_id = Column(Integer, ForeignKey("ab_tests.id", ondelete="CASCADE"), nullable=False) + + # Variant info + name = Column(String(10), nullable=False) # A, B, C, etc. + content = Column(Text, nullable=False) + hashtags = Column(JSON, nullable=True) + image_url = Column(String(500), nullable=True) + + # Associated post (once published) + post_id = Column(Integer, ForeignKey("posts.id"), nullable=True) + + # Metrics (populated after publishing) + impressions = Column(Integer, default=0) + reach = Column(Integer, default=0) + likes = Column(Integer, default=0) + comments = Column(Integer, default=0) + shares = Column(Integer, default=0) + clicks = Column(Integer, default=0) + engagement_rate = Column(Float, default=0.0) + + # Status + is_winner = Column(Boolean, default=False) + published_at = Column(DateTime, nullable=True) + + # Timestamps + created_at = Column(DateTime, default=datetime.utcnow) + + # Relationships + test = relationship("ABTest", back_populates="variants", foreign_keys=[test_id]) + post = relationship("Post", backref="ab_test_variant") + + def __repr__(self): + return f"" + + def to_dict(self): + """Convert to dictionary.""" + return { + "id": self.id, + "test_id": self.test_id, + "name": self.name, + "content": self.content[:100] + "..." if len(self.content) > 100 else self.content, + "full_content": self.content, + "hashtags": self.hashtags, + "image_url": self.image_url, + "post_id": self.post_id, + "impressions": self.impressions, + "reach": self.reach, + "likes": self.likes, + "comments": self.comments, + "shares": self.shares, + "clicks": self.clicks, + "engagement_rate": round(self.engagement_rate, 2), + "is_winner": self.is_winner, + "published_at": self.published_at.isoformat() if self.published_at else None + } + + def calculate_engagement_rate(self): + """Calculate engagement rate for this variant.""" + if self.impressions > 0: + total_engagements = self.likes + self.comments + self.shares + self.engagement_rate = (total_engagements / self.impressions) * 100 + return self.engagement_rate diff --git a/app/models/analytics_report.py b/app/models/analytics_report.py new file mode 100644 index 0000000..cce963c --- /dev/null +++ b/app/models/analytics_report.py @@ -0,0 +1,116 @@ +""" +Analytics Report Model - Aggregated analytics snapshots. +""" + +from datetime import datetime, date +from sqlalchemy import Column, Integer, Float, String, DateTime, Date, JSON, Text + +from app.core.database import Base + + +class AnalyticsReport(Base): + """ + Stores aggregated analytics reports (daily, weekly, monthly). + """ + __tablename__ = "analytics_reports" + + id = Column(Integer, primary_key=True, index=True) + + # Report type and period + report_type = Column(String(20), nullable=False, index=True) # daily, weekly, monthly + period_start = Column(Date, nullable=False, index=True) + period_end = Column(Date, nullable=False) + platform = Column(String(50), nullable=True) # null = all platforms + + # Aggregated metrics + total_posts = Column(Integer, default=0) + total_impressions = Column(Integer, default=0) + total_reach = Column(Integer, default=0) + total_engagements = Column(Integer, default=0) + total_likes = Column(Integer, default=0) + total_comments = Column(Integer, default=0) + total_shares = Column(Integer, default=0) + + # Calculated averages + avg_engagement_rate = Column(Float, default=0.0) + avg_impressions_per_post = Column(Float, default=0.0) + avg_engagements_per_post = Column(Float, default=0.0) + + # Comparison with previous period + posts_change_pct = Column(Float, nullable=True) + engagement_change_pct = Column(Float, nullable=True) + impressions_change_pct = Column(Float, nullable=True) + + # Top performing data (JSON) + top_posts = Column(JSON, nullable=True) + # [{"post_id": 1, "content": "...", "engagement_rate": 5.2, "platform": "x"}] + + best_times = Column(JSON, nullable=True) + # [{"day": 1, "hour": 12, "avg_engagement": 4.5}] + + content_performance = Column(JSON, nullable=True) + # {"tip": {"posts": 10, "avg_engagement": 3.2}, "product": {...}} + + platform_breakdown = Column(JSON, nullable=True) + # {"x": {"posts": 20, "engagement": 150}, "threads": {...}} + + # Report content for Telegram + summary_text = Column(Text, nullable=True) + + # Metadata + generated_at = Column(DateTime, default=datetime.utcnow) + sent_to_telegram = Column(DateTime, nullable=True) + + def to_dict(self): + return { + "id": self.id, + "report_type": self.report_type, + "period_start": self.period_start.isoformat() if self.period_start else None, + "period_end": self.period_end.isoformat() if self.period_end else None, + "platform": self.platform, + "total_posts": self.total_posts, + "total_impressions": self.total_impressions, + "total_reach": self.total_reach, + "total_engagements": self.total_engagements, + "avg_engagement_rate": round(self.avg_engagement_rate, 2), + "avg_impressions_per_post": round(self.avg_impressions_per_post, 1), + "posts_change_pct": round(self.posts_change_pct, 1) if self.posts_change_pct else None, + "engagement_change_pct": round(self.engagement_change_pct, 1) if self.engagement_change_pct else None, + "top_posts": self.top_posts, + "best_times": self.best_times, + "content_performance": self.content_performance, + "platform_breakdown": self.platform_breakdown, + "generated_at": self.generated_at.isoformat() if self.generated_at else None + } + + def generate_telegram_summary(self) -> str: + """Generate formatted summary for Telegram.""" + lines = [ + f"📊 *Reporte {self.report_type.title()}*", + f"📅 {self.period_start} - {self.period_end}", + "", + f"📝 Posts publicados: *{self.total_posts}*", + f"👁 Impresiones: *{self.total_impressions:,}*", + f"💬 Interacciones: *{self.total_engagements:,}*", + f"📈 Engagement rate: *{self.avg_engagement_rate:.2f}%*", + ] + + if self.engagement_change_pct is not None: + emoji = "📈" if self.engagement_change_pct > 0 else "📉" + lines.append(f"{emoji} vs anterior: *{self.engagement_change_pct:+.1f}%*") + + if self.platform_breakdown: + lines.append("") + lines.append("*Por plataforma:*") + for platform, data in self.platform_breakdown.items(): + lines.append(f" • {platform}: {data.get('posts', 0)} posts, {data.get('engagements', 0)} interacciones") + + if self.top_posts and len(self.top_posts) > 0: + lines.append("") + lines.append("*Top 3 posts:*") + for i, post in enumerate(self.top_posts[:3], 1): + content = post.get('content', '')[:50] + "..." if len(post.get('content', '')) > 50 else post.get('content', '') + lines.append(f" {i}. {content} ({post.get('engagement_rate', 0):.1f}%)") + + self.summary_text = "\n".join(lines) + return self.summary_text diff --git a/app/models/image_template.py b/app/models/image_template.py index c6be86e..f2902c6 100644 --- a/app/models/image_template.py +++ b/app/models/image_template.py @@ -25,7 +25,17 @@ class ImageTemplate(Base): # Categorías: tip, producto, servicio, promocion, etc. # Archivo de plantilla - template_file = Column(String(255), nullable=False) # Ruta al archivo HTML/template + template_file = Column(String(255), nullable=True) # Ruta al archivo HTML/template + + # HTML template content (for inline templates) + html_template = Column(Text, nullable=True) + + # Template type + template_type = Column(String(50), default="general") + # Types: tip_card, product_card, quote, promo, announcement + + # Preview image + preview_url = Column(String(500), nullable=True) # Variables que acepta la plantilla variables = Column(ARRAY(String), nullable=False) @@ -67,6 +77,9 @@ class ImageTemplate(Base): "description": self.description, "category": self.category, "template_file": self.template_file, + "html_template": self.html_template[:100] + "..." if self.html_template and len(self.html_template) > 100 else self.html_template, + "template_type": self.template_type, + "preview_url": self.preview_url, "variables": self.variables, "design_config": self.design_config, "output_sizes": self.output_sizes, diff --git a/app/models/lead.py b/app/models/lead.py new file mode 100644 index 0000000..e12dd3c --- /dev/null +++ b/app/models/lead.py @@ -0,0 +1,116 @@ +""" +Lead Model - Leads generated from social media interactions. +""" + +from datetime import datetime +from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, ForeignKey, JSON, Enum +from sqlalchemy.dialects.postgresql import ARRAY +from sqlalchemy.orm import relationship +import enum + +from app.core.database import Base + + +class LeadStatus(enum.Enum): + """Lead status options.""" + NEW = "new" + CONTACTED = "contacted" + QUALIFIED = "qualified" + PROPOSAL = "proposal" + WON = "won" + LOST = "lost" + + +class LeadPriority(enum.Enum): + """Lead priority levels.""" + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + URGENT = "urgent" + + +class Lead(Base): + """ + Lead model for tracking potential customers from social media. + """ + __tablename__ = "leads" + + id = Column(Integer, primary_key=True, index=True) + + # Source information + interaction_id = Column(Integer, ForeignKey("interactions.id"), nullable=True) + platform = Column(String(50), nullable=False, index=True) + + # Contact information + name = Column(String(255), nullable=True) + email = Column(String(255), nullable=True, index=True) + phone = Column(String(50), nullable=True) + company = Column(String(255), nullable=True) + + # Social media info + username = Column(String(100), nullable=True) + profile_url = Column(String(500), nullable=True) + + # Interest and context + interest = Column(Text, nullable=True) # What they're interested in + source_content = Column(Text, nullable=True) # Original interaction content + notes = Column(Text, nullable=True) + + # Products/services interest + products_interested = Column(ARRAY(Integer), nullable=True) # Product IDs + services_interested = Column(ARRAY(Integer), nullable=True) # Service IDs + + # Status tracking + status = Column(String(20), default="new", index=True) + priority = Column(String(20), default="medium", index=True) + + # Assignment + assigned_to = Column(String(100), nullable=True) + + # Odoo integration + odoo_lead_id = Column(Integer, nullable=True, unique=True, index=True) + synced_to_odoo = Column(Boolean, default=False) + odoo_synced_at = Column(DateTime, nullable=True) + + # Metadata + tags = Column(ARRAY(String), nullable=True) + custom_fields = Column(JSON, nullable=True) + + # Timestamps + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + last_contacted_at = Column(DateTime, nullable=True) + + # Relationship + interaction = relationship("Interaction", backref="leads") + + def __repr__(self): + return f"" + + def to_dict(self): + """Convert to dictionary.""" + return { + "id": self.id, + "interaction_id": self.interaction_id, + "platform": self.platform, + "name": self.name, + "email": self.email, + "phone": self.phone, + "company": self.company, + "username": self.username, + "profile_url": self.profile_url, + "interest": self.interest, + "source_content": self.source_content, + "notes": self.notes, + "products_interested": self.products_interested, + "services_interested": self.services_interested, + "status": self.status, + "priority": self.priority, + "assigned_to": self.assigned_to, + "odoo_lead_id": self.odoo_lead_id, + "synced_to_odoo": self.synced_to_odoo, + "tags": self.tags, + "created_at": self.created_at.isoformat() if self.created_at else None, + "updated_at": self.updated_at.isoformat() if self.updated_at else None, + "last_contacted_at": self.last_contacted_at.isoformat() if self.last_contacted_at else None + } diff --git a/app/models/odoo_sync_log.py b/app/models/odoo_sync_log.py new file mode 100644 index 0000000..9f3da7e --- /dev/null +++ b/app/models/odoo_sync_log.py @@ -0,0 +1,79 @@ +""" +Odoo Sync Log Model - Track synchronization history with Odoo. +""" + +from datetime import datetime +from sqlalchemy import Column, Integer, String, Text, DateTime, JSON + +from app.core.database import Base + + +class OdooSyncLog(Base): + """ + Log of synchronization operations with Odoo ERP. + """ + __tablename__ = "odoo_sync_logs" + + id = Column(Integer, primary_key=True, index=True) + + # Sync operation details + sync_type = Column(String(50), nullable=False, index=True) + # Types: products, services, leads, sales + + direction = Column(String(20), nullable=False) + # Direction: import (from Odoo), export (to Odoo) + + status = Column(String(20), nullable=False, index=True) + # Status: started, completed, failed, partial + + # Statistics + records_processed = Column(Integer, default=0) + records_created = Column(Integer, default=0) + records_updated = Column(Integer, default=0) + records_failed = Column(Integer, default=0) + + # Error details + error_message = Column(Text, nullable=True) + error_details = Column(JSON, nullable=True) + # Contains list of failed records with error details + + # Sync details + sync_filter = Column(JSON, nullable=True) + # Filters applied during sync (e.g., date range, categories) + + # Timestamps + started_at = Column(DateTime, default=datetime.utcnow) + completed_at = Column(DateTime, nullable=True) + + def __repr__(self): + return f"" + + def to_dict(self): + """Convert to dictionary.""" + return { + "id": self.id, + "sync_type": self.sync_type, + "direction": self.direction, + "status": self.status, + "records_processed": self.records_processed, + "records_created": self.records_created, + "records_updated": self.records_updated, + "records_failed": self.records_failed, + "error_message": self.error_message, + "error_details": self.error_details, + "started_at": self.started_at.isoformat() if self.started_at else None, + "completed_at": self.completed_at.isoformat() if self.completed_at else None, + "duration_seconds": (self.completed_at - self.started_at).total_seconds() if self.completed_at and self.started_at else None + } + + def mark_completed(self): + """Mark sync as completed.""" + self.status = "completed" + self.completed_at = datetime.utcnow() + + def mark_failed(self, error_message: str, details: dict = None): + """Mark sync as failed with error details.""" + self.status = "failed" + self.error_message = error_message + self.error_details = details + self.completed_at = datetime.utcnow() diff --git a/app/models/post.py b/app/models/post.py index 7b3ea97..b3e6613 100644 --- a/app/models/post.py +++ b/app/models/post.py @@ -5,6 +5,7 @@ Modelo de Post - Posts generados y programados. from datetime import datetime from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, ForeignKey, JSON, Enum from sqlalchemy.dialects.postgresql import ARRAY +from sqlalchemy.orm import relationship import enum from app.core.database import Base @@ -93,10 +94,21 @@ class Post(Base): metrics = Column(JSON, nullable=True) # Ejemplo: {"likes": 10, "retweets": 5, "comments": 3} + # A/B Testing + ab_test_id = Column(Integer, ForeignKey("ab_tests.id"), nullable=True, index=True) + + # Recycling + is_recyclable = Column(Boolean, default=True) + recycled_from_id = Column(Integer, ForeignKey("posts.id"), nullable=True) + recycle_count = Column(Integer, default=0) # Times this post has been recycled + # Timestamps created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + # Relationships + metrics_history = relationship("PostMetrics", back_populates="post", cascade="all, delete-orphan") + def __repr__(self): return f"" @@ -120,7 +132,11 @@ class Post(Base): "approval_required": self.approval_required, "hashtags": self.hashtags, "metrics": self.metrics, - "created_at": self.created_at.isoformat() if self.created_at else None + "created_at": self.created_at.isoformat() if self.created_at else None, + "ab_test_id": self.ab_test_id, + "is_recyclable": self.is_recyclable, + "recycled_from_id": self.recycled_from_id, + "recycle_count": self.recycle_count } def get_content_for_platform(self, platform: str) -> str: diff --git a/app/models/post_metrics.py b/app/models/post_metrics.py new file mode 100644 index 0000000..052f506 --- /dev/null +++ b/app/models/post_metrics.py @@ -0,0 +1,79 @@ +""" +Post Metrics Model - Tracking engagement metrics per post. +""" + +from datetime import datetime +from sqlalchemy import Column, Integer, Float, String, DateTime, ForeignKey, Index +from sqlalchemy.orm import relationship + +from app.core.database import Base + + +class PostMetrics(Base): + """ + Stores engagement metrics for posts. + Multiple records per post to track metrics over time. + """ + __tablename__ = "post_metrics" + + id = Column(Integer, primary_key=True, index=True) + post_id = Column(Integer, ForeignKey("posts.id", ondelete="CASCADE"), nullable=False) + platform = Column(String(50), nullable=False) + + # Core engagement metrics + likes = Column(Integer, default=0) + comments = Column(Integer, default=0) + shares = Column(Integer, default=0) # retweets, reposts + impressions = Column(Integer, default=0) + reach = Column(Integer, default=0) + saves = Column(Integer, default=0) # bookmarks + clicks = Column(Integer, default=0) # link clicks + replies = Column(Integer, default=0) + quotes = Column(Integer, default=0) # quote tweets + + # Calculated metrics + engagement_rate = Column(Float, default=0.0) + engagement_total = Column(Integer, default=0) + + # Timestamps + recorded_at = Column(DateTime, default=datetime.utcnow, index=True) + created_at = Column(DateTime, default=datetime.utcnow) + + # Relationships + post = relationship("Post", back_populates="metrics_history") + + # Indexes for efficient queries + __table_args__ = ( + Index("ix_post_metrics_post_platform", "post_id", "platform"), + Index("ix_post_metrics_recorded", "recorded_at"), + ) + + def calculate_engagement_rate(self): + """Calculate engagement rate based on impressions.""" + self.engagement_total = ( + self.likes + self.comments + self.shares + + self.saves + self.replies + self.quotes + ) + if self.impressions > 0: + self.engagement_rate = (self.engagement_total / self.impressions) * 100 + else: + self.engagement_rate = 0.0 + + def to_dict(self): + return { + "id": self.id, + "post_id": self.post_id, + "platform": self.platform, + "likes": self.likes, + "comments": self.comments, + "shares": self.shares, + "impressions": self.impressions, + "reach": self.reach, + "saves": self.saves, + "clicks": self.clicks, + "replies": self.replies, + "quotes": self.quotes, + "engagement_rate": round(self.engagement_rate, 2), + "engagement_total": self.engagement_total, + "recorded_at": self.recorded_at.isoformat() if self.recorded_at else None + } diff --git a/app/models/product.py b/app/models/product.py index dfd35d8..d73b35f 100644 --- a/app/models/product.py +++ b/app/models/product.py @@ -49,6 +49,10 @@ class Product(Base): is_featured = Column(Boolean, default=False) # Producto destacado last_posted_at = Column(DateTime, nullable=True) # Última vez que se publicó + # Odoo integration + odoo_product_id = Column(Integer, nullable=True, unique=True, index=True) + odoo_last_synced = Column(DateTime, nullable=True) + # Timestamps created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) @@ -76,5 +80,7 @@ class Product(Base): "tags": self.tags, "highlights": self.highlights, "is_featured": self.is_featured, - "created_at": self.created_at.isoformat() if self.created_at else None + "created_at": self.created_at.isoformat() if self.created_at else None, + "odoo_product_id": self.odoo_product_id, + "odoo_last_synced": self.odoo_last_synced.isoformat() if self.odoo_last_synced else None } diff --git a/app/models/recycled_post.py b/app/models/recycled_post.py new file mode 100644 index 0000000..034da3f --- /dev/null +++ b/app/models/recycled_post.py @@ -0,0 +1,74 @@ +""" +Recycled Post Model - Track content recycling for evergreen posts. +""" + +from datetime import datetime +from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, JSON, Boolean +from sqlalchemy.orm import relationship + +from app.core.database import Base + + +class RecycledPost(Base): + """ + Tracks when posts are recycled/republished. + Evergreen content that performed well can be republished with modifications. + """ + __tablename__ = "recycled_posts" + + id = Column(Integer, primary_key=True, index=True) + + # Original post reference + original_post_id = Column(Integer, ForeignKey("posts.id"), nullable=False, index=True) + + # New post created from recycling + new_post_id = Column(Integer, ForeignKey("posts.id"), nullable=True, index=True) + + # Recycle count (how many times this original has been recycled) + recycle_number = Column(Integer, default=1) + + # Modifications made + modifications = Column(JSON, nullable=True) + # Example: {"content_changed": true, "hashtags_updated": true, "image_changed": false} + + modification_notes = Column(Text, nullable=True) + + # Performance comparison + original_engagement_rate = Column(Integer, nullable=True) + new_engagement_rate = Column(Integer, nullable=True) + + # Status + status = Column(String(20), default="pending") + # pending, published, cancelled + + # Reason for recycling + reason = Column(String(100), nullable=True) + # high_performer, evergreen, seasonal, manual + + # Timestamps + recycled_at = Column(DateTime, default=datetime.utcnow) + scheduled_for = Column(DateTime, nullable=True) + + # Relationships + original_post = relationship("Post", foreign_keys=[original_post_id], backref="recycled_versions") + new_post = relationship("Post", foreign_keys=[new_post_id]) + + def __repr__(self): + return f"" + + def to_dict(self): + """Convert to dictionary.""" + return { + "id": self.id, + "original_post_id": self.original_post_id, + "new_post_id": self.new_post_id, + "recycle_number": self.recycle_number, + "modifications": self.modifications, + "modification_notes": self.modification_notes, + "original_engagement_rate": self.original_engagement_rate, + "new_engagement_rate": self.new_engagement_rate, + "status": self.status, + "reason": self.reason, + "recycled_at": self.recycled_at.isoformat() if self.recycled_at else None, + "scheduled_for": self.scheduled_for.isoformat() if self.scheduled_for else None + } diff --git a/app/models/service.py b/app/models/service.py index 787f669..421468f 100644 --- a/app/models/service.py +++ b/app/models/service.py @@ -58,6 +58,10 @@ class Service(Base): is_featured = Column(Boolean, default=False) last_posted_at = Column(DateTime, nullable=True) + # Odoo integration + odoo_service_id = Column(Integer, nullable=True, unique=True, index=True) + odoo_last_synced = Column(DateTime, nullable=True) + # Timestamps created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) @@ -86,5 +90,7 @@ class Service(Base): "call_to_action": self.call_to_action, "is_active": self.is_active, "is_featured": self.is_featured, - "created_at": self.created_at.isoformat() if self.created_at else None + "created_at": self.created_at.isoformat() if self.created_at else None, + "odoo_service_id": self.odoo_service_id, + "odoo_last_synced": self.odoo_last_synced.isoformat() if self.odoo_last_synced else None } diff --git a/app/models/thread_series.py b/app/models/thread_series.py new file mode 100644 index 0000000..c20b019 --- /dev/null +++ b/app/models/thread_series.py @@ -0,0 +1,160 @@ +""" +Thread Series Models - Multi-post thread content scheduling. +""" + +from datetime import datetime +from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, JSON, Boolean +from sqlalchemy.orm import relationship +import enum + +from app.core.database import Base + + +class ThreadSeriesStatus(enum.Enum): + """Status options for thread series.""" + DRAFT = "draft" + SCHEDULED = "scheduled" + PUBLISHING = "publishing" + COMPLETED = "completed" + PAUSED = "paused" + CANCELLED = "cancelled" + + +class ThreadSeries(Base): + """ + A series of related posts published as a thread. + E.g., Educational threads, story threads, tip series. + """ + __tablename__ = "thread_series" + + id = Column(Integer, primary_key=True, index=True) + + # Series info + name = Column(String(255), nullable=False) + description = Column(Text, nullable=True) + topic = Column(String(100), nullable=True) + + # Platform (threads work best on X and Threads) + platform = Column(String(50), nullable=False, index=True) + + # Schedule configuration + schedule_type = Column(String(20), default="sequential") + # sequential: posts one after another + # timed: posts at specific intervals + + interval_minutes = Column(Integer, default=5) # Time between posts + start_time = Column(DateTime, nullable=True) # When to start publishing + + # Thread structure + total_posts = Column(Integer, default=0) + posts_published = Column(Integer, default=0) + + # Status + status = Column(String(20), default="draft", index=True) + + # First post in chain (for reply chain) + first_platform_post_id = Column(String(100), nullable=True) + + # AI generation settings + ai_generated = Column(Boolean, default=False) + generation_prompt = Column(Text, nullable=True) + + # Metadata + hashtags = Column(JSON, nullable=True) # Common hashtags for the series + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + completed_at = Column(DateTime, nullable=True) + + # Relationships + posts = relationship("ThreadPost", back_populates="series", order_by="ThreadPost.sequence_number") + + def __repr__(self): + return f"" + + def to_dict(self, include_posts: bool = True): + """Convert to dictionary.""" + result = { + "id": self.id, + "name": self.name, + "description": self.description, + "topic": self.topic, + "platform": self.platform, + "schedule_type": self.schedule_type, + "interval_minutes": self.interval_minutes, + "start_time": self.start_time.isoformat() if self.start_time else None, + "total_posts": self.total_posts, + "posts_published": self.posts_published, + "status": self.status, + "ai_generated": self.ai_generated, + "hashtags": self.hashtags, + "created_at": self.created_at.isoformat() if self.created_at else None, + "completed_at": self.completed_at.isoformat() if self.completed_at else None + } + + if include_posts and self.posts: + result["posts"] = [p.to_dict() for p in self.posts] + + return result + + +class ThreadPost(Base): + """ + Individual post within a thread series. + """ + __tablename__ = "thread_posts" + + id = Column(Integer, primary_key=True, index=True) + + # Parent series + series_id = Column(Integer, ForeignKey("thread_series.id", ondelete="CASCADE"), nullable=False) + + # Position in thread + sequence_number = Column(Integer, nullable=False) # 1, 2, 3, ... + + # Content + content = Column(Text, nullable=False) + image_url = Column(String(500), nullable=True) + + # Associated post (once created) + post_id = Column(Integer, ForeignKey("posts.id"), nullable=True) + + # Platform post ID (for reply chain) + platform_post_id = Column(String(100), nullable=True) + reply_to_platform_id = Column(String(100), nullable=True) # ID of post to reply to + + # Schedule + scheduled_at = Column(DateTime, nullable=True) + + # Status + status = Column(String(20), default="pending") + # pending, scheduled, published, failed + + error_message = Column(Text, nullable=True) + published_at = Column(DateTime, nullable=True) + + # Timestamps + created_at = Column(DateTime, default=datetime.utcnow) + + # Relationships + series = relationship("ThreadSeries", back_populates="posts") + post = relationship("Post", backref="thread_post") + + def __repr__(self): + return f"" + + def to_dict(self): + """Convert to dictionary.""" + return { + "id": self.id, + "series_id": self.series_id, + "sequence_number": self.sequence_number, + "content": self.content[:100] + "..." if len(self.content) > 100 else self.content, + "full_content": self.content, + "image_url": self.image_url, + "post_id": self.post_id, + "platform_post_id": self.platform_post_id, + "scheduled_at": self.scheduled_at.isoformat() if self.scheduled_at else None, + "status": self.status, + "error_message": self.error_message, + "published_at": self.published_at.isoformat() if self.published_at else None + } diff --git a/app/services/ab_testing_service.py b/app/services/ab_testing_service.py new file mode 100644 index 0000000..147a286 --- /dev/null +++ b/app/services/ab_testing_service.py @@ -0,0 +1,396 @@ +""" +A/B Testing Service - Create and manage content experiments. +""" + +from datetime import datetime, timedelta +from typing import List, Dict, Optional +import logging +from scipy import stats + +from app.core.database import SessionLocal +from app.models.ab_test import ABTest, ABTestVariant +from app.models.post import Post + +logger = logging.getLogger(__name__) + + +class ABTestingService: + """Service for A/B testing content variants.""" + + def _get_db(self): + """Get database session.""" + return SessionLocal() + + async def create_test( + self, + name: str, + platform: str, + variants: List[Dict], + test_type: str = "content", + duration_hours: int = 24, + min_sample_size: int = 100, + success_metric: str = "engagement_rate", + description: str = None + ) -> ABTest: + """ + Create a new A/B test with variants. + + Args: + name: Test name + platform: Target platform + variants: List of variant data [{"name": "A", "content": "...", "hashtags": [...]}] + test_type: Type of test (content, timing, hashtags, image) + duration_hours: How long to run the test + min_sample_size: Minimum impressions per variant + success_metric: Metric to optimize for + description: Optional description + + Returns: + Created ABTest object + """ + db = self._get_db() + + try: + # Create test + test = ABTest( + name=name, + description=description, + test_type=test_type, + platform=platform, + status="draft", + duration_hours=duration_hours, + min_sample_size=min_sample_size, + success_metric=success_metric + ) + + db.add(test) + db.flush() # Get the ID + + # Create variants + for variant_data in variants: + variant = ABTestVariant( + test_id=test.id, + name=variant_data.get("name", "A"), + content=variant_data.get("content", ""), + hashtags=variant_data.get("hashtags"), + image_url=variant_data.get("image_url") + ) + db.add(variant) + + db.commit() + db.refresh(test) + + return test + + except Exception as e: + logger.error(f"Error creating A/B test: {e}") + db.rollback() + raise + + finally: + db.close() + + async def start_test(self, test_id: int) -> Dict: + """ + Start an A/B test by creating and scheduling posts for each variant. + + Args: + test_id: ID of the test to start + + Returns: + Dict with status and created post IDs + """ + db = self._get_db() + + try: + test = db.query(ABTest).filter(ABTest.id == test_id).first() + if not test: + return {"success": False, "error": "Test not found"} + + if test.status != "draft": + return {"success": False, "error": f"Test is already {test.status}"} + + variants = db.query(ABTestVariant).filter( + ABTestVariant.test_id == test_id + ).all() + + if len(variants) < 2: + return {"success": False, "error": "Test needs at least 2 variants"} + + # Create posts for each variant + post_ids = [] + now = datetime.utcnow() + + for i, variant in enumerate(variants): + # Schedule variants slightly apart to ensure fair distribution + scheduled_at = now + timedelta(minutes=i * 5) + + post = Post( + content=variant.content, + content_type="ab_test", + platforms=[test.platform], + status="scheduled", + scheduled_at=scheduled_at, + hashtags=variant.hashtags, + image_url=variant.image_url, + ab_test_id=test.id + ) + + db.add(post) + db.flush() + + variant.post_id = post.id + variant.published_at = scheduled_at + post_ids.append(post.id) + + # Update test status + test.status = "running" + test.started_at = now + + db.commit() + + return { + "success": True, + "test_id": test.id, + "post_ids": post_ids, + "variants_count": len(variants) + } + + except Exception as e: + logger.error(f"Error starting A/B test: {e}") + db.rollback() + return {"success": False, "error": str(e)} + + finally: + db.close() + + async def update_variant_metrics(self, test_id: int) -> Dict: + """ + Update metrics for all variants in a test from their posts. + + Args: + test_id: ID of the test + + Returns: + Dict with updated metrics + """ + db = self._get_db() + + try: + test = db.query(ABTest).filter(ABTest.id == test_id).first() + if not test: + return {"success": False, "error": "Test not found"} + + variants = db.query(ABTestVariant).filter( + ABTestVariant.test_id == test_id + ).all() + + for variant in variants: + if variant.post_id: + post = db.query(Post).filter(Post.id == variant.post_id).first() + if post and post.metrics: + variant.likes = post.metrics.get("likes", 0) + variant.comments = post.metrics.get("comments", 0) + variant.shares = post.metrics.get("shares", 0) + post.metrics.get("retweets", 0) + variant.impressions = post.metrics.get("impressions", 0) + variant.reach = post.metrics.get("reach", 0) + variant.calculate_engagement_rate() + + db.commit() + + return { + "success": True, + "variants_updated": len(variants) + } + + except Exception as e: + logger.error(f"Error updating variant metrics: {e}") + db.rollback() + return {"success": False, "error": str(e)} + + finally: + db.close() + + async def evaluate_test(self, test_id: int) -> Dict: + """ + Evaluate A/B test results and determine winner. + + Args: + test_id: ID of the test to evaluate + + Returns: + Dict with evaluation results + """ + db = self._get_db() + + try: + test = db.query(ABTest).filter(ABTest.id == test_id).first() + if not test: + return {"success": False, "error": "Test not found"} + + # First update metrics + await self.update_variant_metrics(test_id) + + variants = db.query(ABTestVariant).filter( + ABTestVariant.test_id == test_id + ).all() + + if len(variants) < 2: + return {"success": False, "error": "Not enough variants"} + + # Check minimum sample size + min_impressions = min(v.impressions for v in variants) + if min_impressions < test.min_sample_size: + return { + "success": True, + "status": "insufficient_data", + "min_impressions": min_impressions, + "required": test.min_sample_size + } + + # Determine winner based on success metric + if test.success_metric == "engagement_rate": + sorted_variants = sorted(variants, key=lambda v: v.engagement_rate, reverse=True) + elif test.success_metric == "likes": + sorted_variants = sorted(variants, key=lambda v: v.likes, reverse=True) + elif test.success_metric == "comments": + sorted_variants = sorted(variants, key=lambda v: v.comments, reverse=True) + else: + sorted_variants = sorted(variants, key=lambda v: v.engagement_rate, reverse=True) + + winner = sorted_variants[0] + runner_up = sorted_variants[1] + + # Statistical significance test (chi-square for engagement) + try: + winner_engagements = winner.likes + winner.comments + winner.shares + runner_up_engagements = runner_up.likes + runner_up.comments + runner_up.shares + + contingency = [ + [winner_engagements, winner.impressions - winner_engagements], + [runner_up_engagements, runner_up.impressions - runner_up_engagements] + ] + + chi2, p_value, dof, expected = stats.chi2_contingency(contingency) + confidence = (1 - p_value) * 100 + + except Exception: + # If statistical test fails, just use raw comparison + confidence = None + p_value = None + + # Update test with winner + winner.is_winner = True + test.winning_variant_id = winner.id + test.confidence_level = confidence + + # Check if test should be concluded + if test.started_at: + elapsed = datetime.utcnow() - test.started_at + if elapsed.total_seconds() >= test.duration_hours * 3600: + test.status = "completed" + test.ended_at = datetime.utcnow() + + db.commit() + + return { + "success": True, + "winner": { + "variant_id": winner.id, + "name": winner.name, + "engagement_rate": winner.engagement_rate, + "impressions": winner.impressions + }, + "runner_up": { + "variant_id": runner_up.id, + "name": runner_up.name, + "engagement_rate": runner_up.engagement_rate, + "impressions": runner_up.impressions + }, + "confidence_level": confidence, + "p_value": p_value, + "test_status": test.status + } + + except Exception as e: + logger.error(f"Error evaluating A/B test: {e}") + db.rollback() + return {"success": False, "error": str(e)} + + finally: + db.close() + + async def get_test(self, test_id: int) -> Optional[Dict]: + """Get a test with its variants.""" + db = self._get_db() + + try: + test = db.query(ABTest).filter(ABTest.id == test_id).first() + if test: + return test.to_dict() + return None + + finally: + db.close() + + async def get_tests( + self, + status: str = None, + platform: str = None, + limit: int = 20 + ) -> List[Dict]: + """Get tests with optional filters.""" + db = self._get_db() + + try: + query = db.query(ABTest) + + if status: + query = query.filter(ABTest.status == status) + if platform: + query = query.filter(ABTest.platform == platform) + + tests = query.order_by(ABTest.created_at.desc()).limit(limit).all() + return [t.to_dict() for t in tests] + + finally: + db.close() + + async def cancel_test(self, test_id: int) -> Dict: + """Cancel a running test.""" + db = self._get_db() + + try: + test = db.query(ABTest).filter(ABTest.id == test_id).first() + if not test: + return {"success": False, "error": "Test not found"} + + test.status = "cancelled" + test.ended_at = datetime.utcnow() + + # Cancel any scheduled posts + variants = db.query(ABTestVariant).filter( + ABTestVariant.test_id == test_id + ).all() + + for variant in variants: + if variant.post_id: + post = db.query(Post).filter(Post.id == variant.post_id).first() + if post and post.status == "scheduled": + post.status = "cancelled" + + db.commit() + + return {"success": True, "message": "Test cancelled"} + + except Exception as e: + logger.error(f"Error cancelling A/B test: {e}") + db.rollback() + return {"success": False, "error": str(e)} + + finally: + db.close() + + +# Global instance +ab_testing_service = ABTestingService() diff --git a/app/services/analytics_service.py b/app/services/analytics_service.py new file mode 100644 index 0000000..2322797 --- /dev/null +++ b/app/services/analytics_service.py @@ -0,0 +1,423 @@ +""" +Analytics Service - Track and analyze post performance. +""" + +from datetime import datetime, date, timedelta +from typing import List, Dict, Optional +from sqlalchemy import func, desc +from sqlalchemy.orm import Session + +from app.core.database import SessionLocal +from app.models.post import Post +from app.models.post_metrics import PostMetrics +from app.models.analytics_report import AnalyticsReport +from app.models.interaction import Interaction + + +class AnalyticsService: + """Service for analytics and reporting.""" + + def __init__(self): + pass + + def _get_db(self) -> Session: + """Get database session.""" + return SessionLocal() + + async def get_dashboard_stats( + self, + days: int = 30, + platform: Optional[str] = None + ) -> Dict: + """Get dashboard statistics.""" + db = self._get_db() + try: + start_date = datetime.utcnow() - timedelta(days=days) + + # Base query for posts + posts_query = db.query(Post).filter( + Post.published_at >= start_date, + Post.status == "published" + ) + if platform: + posts_query = posts_query.filter(Post.platforms.contains([platform])) + + posts = posts_query.all() + + # Aggregate metrics + total_impressions = 0 + total_engagements = 0 + total_likes = 0 + total_comments = 0 + total_shares = 0 + + for post in posts: + if post.metrics: + total_likes += post.metrics.get("likes", 0) + total_comments += post.metrics.get("comments", 0) + total_shares += post.metrics.get("shares", 0) + post.metrics.get("retweets", 0) + total_impressions += post.metrics.get("impressions", 0) + + total_engagements = total_likes + total_comments + total_shares + + # Calculate engagement rate + avg_engagement_rate = 0.0 + if total_impressions > 0: + avg_engagement_rate = (total_engagements / total_impressions) * 100 + + # Posts by platform + platform_breakdown = {} + for post in posts: + for p in post.platforms: + if p not in platform_breakdown: + platform_breakdown[p] = {"posts": 0, "engagements": 0} + platform_breakdown[p]["posts"] += 1 + if post.metrics: + platform_breakdown[p]["engagements"] += ( + post.metrics.get("likes", 0) + + post.metrics.get("comments", 0) + + post.metrics.get("shares", 0) + ) + + # Posts by content type + content_breakdown = {} + for post in posts: + ct = post.content_type + if ct not in content_breakdown: + content_breakdown[ct] = {"posts": 0, "engagements": 0} + content_breakdown[ct]["posts"] += 1 + if post.metrics: + content_breakdown[ct]["engagements"] += ( + post.metrics.get("likes", 0) + + post.metrics.get("comments", 0) + ) + + # Pending interactions + pending_interactions = db.query(Interaction).filter( + Interaction.responded == False, + Interaction.is_archived == False + ).count() + + return { + "period_days": days, + "total_posts": len(posts), + "total_impressions": total_impressions, + "total_engagements": total_engagements, + "total_likes": total_likes, + "total_comments": total_comments, + "total_shares": total_shares, + "avg_engagement_rate": round(avg_engagement_rate, 2), + "platform_breakdown": platform_breakdown, + "content_breakdown": content_breakdown, + "pending_interactions": pending_interactions + } + + finally: + db.close() + + async def get_top_posts( + self, + days: int = 30, + limit: int = 10, + platform: Optional[str] = None + ) -> List[Dict]: + """Get top performing posts by engagement.""" + db = self._get_db() + try: + start_date = datetime.utcnow() - timedelta(days=days) + + posts_query = db.query(Post).filter( + Post.published_at >= start_date, + Post.status == "published", + Post.metrics.isnot(None) + ) + + if platform: + posts_query = posts_query.filter(Post.platforms.contains([platform])) + + posts = posts_query.all() + + # Calculate engagement for each post and sort + posts_with_engagement = [] + for post in posts: + if post.metrics: + engagement = ( + post.metrics.get("likes", 0) + + post.metrics.get("comments", 0) + + post.metrics.get("shares", 0) + + post.metrics.get("retweets", 0) + ) + impressions = post.metrics.get("impressions", 1) + engagement_rate = (engagement / impressions * 100) if impressions > 0 else 0 + + posts_with_engagement.append({ + "id": post.id, + "content": post.content[:100] + "..." if len(post.content) > 100 else post.content, + "content_type": post.content_type, + "platforms": post.platforms, + "published_at": post.published_at.isoformat() if post.published_at else None, + "likes": post.metrics.get("likes", 0), + "comments": post.metrics.get("comments", 0), + "shares": post.metrics.get("shares", 0) + post.metrics.get("retweets", 0), + "impressions": impressions, + "engagement_rate": round(engagement_rate, 2) + }) + + # Sort by engagement rate + posts_with_engagement.sort(key=lambda x: x["engagement_rate"], reverse=True) + + return posts_with_engagement[:limit] + + finally: + db.close() + + async def get_optimal_times( + self, + platform: Optional[str] = None, + days: int = 90 + ) -> List[Dict]: + """Calculate optimal posting times based on historical data.""" + db = self._get_db() + try: + start_date = datetime.utcnow() - timedelta(days=days) + + posts_query = db.query(Post).filter( + Post.published_at >= start_date, + Post.status == "published", + Post.metrics.isnot(None) + ) + + if platform: + posts_query = posts_query.filter(Post.platforms.contains([platform])) + + posts = posts_query.all() + + # Group by day of week and hour + time_slots = {} # {(day, hour): [engagement_rates]} + + for post in posts: + if post.published_at and post.metrics: + day = post.published_at.weekday() + hour = post.published_at.hour + + engagement = ( + post.metrics.get("likes", 0) + + post.metrics.get("comments", 0) + + post.metrics.get("shares", 0) + ) + impressions = post.metrics.get("impressions", 1) + rate = (engagement / impressions * 100) if impressions > 0 else 0 + + key = (day, hour) + if key not in time_slots: + time_slots[key] = [] + time_slots[key].append(rate) + + # Calculate averages + results = [] + for (day, hour), rates in time_slots.items(): + avg_rate = sum(rates) / len(rates) if rates else 0 + results.append({ + "day": day, + "day_name": ["Lun", "Mar", "Mié", "Jue", "Vie", "Sáb", "Dom"][day], + "hour": hour, + "hour_formatted": f"{hour:02d}:00", + "avg_engagement_rate": round(avg_rate, 2), + "sample_size": len(rates) + }) + + # Sort by engagement rate + results.sort(key=lambda x: x["avg_engagement_rate"], reverse=True) + + return results + + finally: + db.close() + + async def generate_weekly_report( + self, + week_start: Optional[date] = None + ) -> AnalyticsReport: + """Generate weekly analytics report.""" + db = self._get_db() + try: + if week_start is None: + # Last complete week + today = date.today() + week_start = today - timedelta(days=today.weekday() + 7) + + week_end = week_start + timedelta(days=6) + start_dt = datetime.combine(week_start, datetime.min.time()) + end_dt = datetime.combine(week_end, datetime.max.time()) + + # Previous week for comparison + prev_start = week_start - timedelta(days=7) + prev_end = week_end - timedelta(days=7) + prev_start_dt = datetime.combine(prev_start, datetime.min.time()) + prev_end_dt = datetime.combine(prev_end, datetime.max.time()) + + # Current week posts + posts = db.query(Post).filter( + Post.published_at >= start_dt, + Post.published_at <= end_dt, + Post.status == "published" + ).all() + + # Previous week posts + prev_posts = db.query(Post).filter( + Post.published_at >= prev_start_dt, + Post.published_at <= prev_end_dt, + Post.status == "published" + ).all() + + # Calculate current week metrics + total_impressions = 0 + total_engagements = 0 + total_likes = 0 + total_comments = 0 + total_shares = 0 + platform_breakdown = {} + content_performance = {} + + for post in posts: + if post.metrics: + likes = post.metrics.get("likes", 0) + comments = post.metrics.get("comments", 0) + shares = post.metrics.get("shares", 0) + post.metrics.get("retweets", 0) + impressions = post.metrics.get("impressions", 0) + + total_likes += likes + total_comments += comments + total_shares += shares + total_impressions += impressions + + # Platform breakdown + for p in post.platforms: + if p not in platform_breakdown: + platform_breakdown[p] = {"posts": 0, "engagements": 0} + platform_breakdown[p]["posts"] += 1 + platform_breakdown[p]["engagements"] += likes + comments + shares + + # Content type performance + ct = post.content_type + if ct not in content_performance: + content_performance[ct] = {"posts": 0, "engagements": 0, "impressions": 0} + content_performance[ct]["posts"] += 1 + content_performance[ct]["engagements"] += likes + comments + shares + content_performance[ct]["impressions"] += impressions + + total_engagements = total_likes + total_comments + total_shares + + # Calculate previous week totals for comparison + prev_engagements = 0 + for post in prev_posts: + if post.metrics: + prev_engagements += ( + post.metrics.get("likes", 0) + + post.metrics.get("comments", 0) + + post.metrics.get("shares", 0) + ) + + # Calculate changes + posts_change = ((len(posts) - len(prev_posts)) / len(prev_posts) * 100) if prev_posts else 0 + engagement_change = ((total_engagements - prev_engagements) / prev_engagements * 100) if prev_engagements else 0 + + # Get top posts + top_posts = await self.get_top_posts(days=7, limit=5) + + # Get best times + best_times = await self.get_optimal_times(days=30) + + # Calculate averages + avg_engagement_rate = (total_engagements / total_impressions * 100) if total_impressions > 0 else 0 + avg_impressions = total_impressions / len(posts) if posts else 0 + avg_engagements = total_engagements / len(posts) if posts else 0 + + # Create report + report = AnalyticsReport( + report_type="weekly", + period_start=week_start, + period_end=week_end, + total_posts=len(posts), + total_impressions=total_impressions, + total_engagements=total_engagements, + total_likes=total_likes, + total_comments=total_comments, + total_shares=total_shares, + avg_engagement_rate=avg_engagement_rate, + avg_impressions_per_post=avg_impressions, + avg_engagements_per_post=avg_engagements, + posts_change_pct=posts_change, + engagement_change_pct=engagement_change, + top_posts=top_posts[:5], + best_times=best_times[:10], + content_performance=content_performance, + platform_breakdown=platform_breakdown + ) + + # Generate summary text + report.generate_telegram_summary() + + db.add(report) + db.commit() + db.refresh(report) + + return report + + finally: + db.close() + + async def get_reports( + self, + report_type: str = "weekly", + limit: int = 10 + ) -> List[Dict]: + """Get historical reports.""" + db = self._get_db() + try: + reports = db.query(AnalyticsReport).filter( + AnalyticsReport.report_type == report_type + ).order_by(desc(AnalyticsReport.period_start)).limit(limit).all() + + return [r.to_dict() for r in reports] + + finally: + db.close() + + async def record_post_metrics( + self, + post_id: int, + platform: str, + metrics: Dict + ) -> PostMetrics: + """Record metrics snapshot for a post.""" + db = self._get_db() + try: + post_metrics = PostMetrics( + post_id=post_id, + platform=platform, + likes=metrics.get("likes", 0), + comments=metrics.get("comments", 0), + shares=metrics.get("shares", 0), + impressions=metrics.get("impressions", 0), + reach=metrics.get("reach", 0), + saves=metrics.get("saves", 0), + clicks=metrics.get("clicks", 0), + replies=metrics.get("replies", 0), + quotes=metrics.get("quotes", 0) + ) + + post_metrics.calculate_engagement_rate() + + db.add(post_metrics) + db.commit() + db.refresh(post_metrics) + + return post_metrics + + finally: + db.close() + + +# Global instance +analytics_service = AnalyticsService() diff --git a/app/services/odoo_service.py b/app/services/odoo_service.py new file mode 100644 index 0000000..becd84e --- /dev/null +++ b/app/services/odoo_service.py @@ -0,0 +1,493 @@ +""" +Odoo Integration Service - Sync products, services, and leads with Odoo ERP. +""" + +import xmlrpc.client +from datetime import datetime +from typing import List, Dict, Optional, Any +import logging + +from app.core.config import settings +from app.core.database import SessionLocal +from app.models.product import Product +from app.models.service import Service +from app.models.lead import Lead +from app.models.odoo_sync_log import OdooSyncLog + +logger = logging.getLogger(__name__) + + +class OdooService: + """Service for Odoo ERP integration.""" + + def __init__(self): + self.url = settings.ODOO_URL + self.db = settings.ODOO_DB + self.username = settings.ODOO_USERNAME + self.password = settings.ODOO_PASSWORD + self.enabled = settings.ODOO_SYNC_ENABLED + self._uid = None + self._common = None + self._models = None + + def _get_db(self): + """Get database session.""" + return SessionLocal() + + def _connect(self) -> bool: + """Establish connection to Odoo.""" + if not self.enabled or not all([self.url, self.db, self.username, self.password]): + logger.warning("Odoo not configured or disabled") + return False + + try: + # Common endpoint for authentication + self._common = xmlrpc.client.ServerProxy(f"{self.url}/xmlrpc/2/common") + + # Authenticate + self._uid = self._common.authenticate( + self.db, self.username, self.password, {} + ) + + if not self._uid: + logger.error("Odoo authentication failed") + return False + + # Models endpoint for data operations + self._models = xmlrpc.client.ServerProxy(f"{self.url}/xmlrpc/2/object") + + return True + + except Exception as e: + logger.error(f"Odoo connection error: {e}") + return False + + def _execute(self, model: str, method: str, *args, **kwargs) -> Any: + """Execute an Odoo model method.""" + if not self._models or not self._uid: + if not self._connect(): + raise ConnectionError("Cannot connect to Odoo") + + return self._models.execute_kw( + self.db, self._uid, self.password, + model, method, list(args), kwargs + ) + + async def test_connection(self) -> Dict: + """Test connection to Odoo.""" + if not self.enabled: + return { + "connected": False, + "error": "Odoo integration not enabled", + "configured": bool(self.url) + } + + try: + if self._connect(): + version = self._common.version() + return { + "connected": True, + "version": version.get("server_version"), + "uid": self._uid + } + else: + return { + "connected": False, + "error": "Authentication failed" + } + except Exception as e: + return { + "connected": False, + "error": str(e) + } + + async def sync_products(self, limit: int = 100) -> Dict: + """ + Sync products from Odoo to local database. + + Returns: + Dict with sync statistics + """ + db = self._get_db() + log = OdooSyncLog( + sync_type="products", + direction="import", + status="started" + ) + db.add(log) + db.commit() + + try: + if not self._connect(): + log.mark_failed("Cannot connect to Odoo") + db.commit() + return {"success": False, "error": "Connection failed"} + + # Fetch products from Odoo + product_ids = self._execute( + "product.template", "search", + [["sale_ok", "=", True], ["active", "=", True]], + limit=limit + ) + + products_data = self._execute( + "product.template", "read", + product_ids, + fields=["id", "name", "description_sale", "list_price", "categ_id", + "image_1920", "qty_available", "default_code"] + ) + + created = 0 + updated = 0 + failed = 0 + + for odoo_product in products_data: + try: + # Check if product exists locally + local_product = db.query(Product).filter( + Product.odoo_product_id == odoo_product["id"] + ).first() + + product_data = { + "name": odoo_product["name"], + "description": odoo_product.get("description_sale") or "", + "price": odoo_product.get("list_price", 0), + "category": odoo_product.get("categ_id", [0, "general"])[1] if odoo_product.get("categ_id") else "general", + "stock": int(odoo_product.get("qty_available", 0)), + "is_available": odoo_product.get("qty_available", 0) > 0, + "odoo_product_id": odoo_product["id"], + "odoo_last_synced": datetime.utcnow() + } + + if local_product: + # Update existing + for key, value in product_data.items(): + setattr(local_product, key, value) + updated += 1 + else: + # Create new + local_product = Product(**product_data) + db.add(local_product) + created += 1 + + except Exception as e: + logger.error(f"Error syncing product {odoo_product.get('id')}: {e}") + failed += 1 + + db.commit() + + log.records_processed = len(products_data) + log.records_created = created + log.records_updated = updated + log.records_failed = failed + log.mark_completed() + db.commit() + + return { + "success": True, + "processed": len(products_data), + "created": created, + "updated": updated, + "failed": failed + } + + except Exception as e: + logger.error(f"Error in sync_products: {e}") + log.mark_failed(str(e)) + db.commit() + return {"success": False, "error": str(e)} + + finally: + db.close() + + async def sync_services(self, limit: int = 100) -> Dict: + """ + Sync services from Odoo to local database. + Services in Odoo are typically products with type='service'. + + Returns: + Dict with sync statistics + """ + db = self._get_db() + log = OdooSyncLog( + sync_type="services", + direction="import", + status="started" + ) + db.add(log) + db.commit() + + try: + if not self._connect(): + log.mark_failed("Cannot connect to Odoo") + db.commit() + return {"success": False, "error": "Connection failed"} + + # Fetch service-type products from Odoo + service_ids = self._execute( + "product.template", "search", + [["type", "=", "service"], ["sale_ok", "=", True], ["active", "=", True]], + limit=limit + ) + + services_data = self._execute( + "product.template", "read", + service_ids, + fields=["id", "name", "description_sale", "list_price", "categ_id"] + ) + + created = 0 + updated = 0 + failed = 0 + + for odoo_service in services_data: + try: + # Check if service exists locally + local_service = db.query(Service).filter( + Service.odoo_service_id == odoo_service["id"] + ).first() + + service_data = { + "name": odoo_service["name"], + "description": odoo_service.get("description_sale") or "", + "category": odoo_service.get("categ_id", [0, "general"])[1] if odoo_service.get("categ_id") else "general", + "price_range": f"${odoo_service.get('list_price', 0):,.0f} MXN" if odoo_service.get("list_price") else None, + "odoo_service_id": odoo_service["id"], + "odoo_last_synced": datetime.utcnow() + } + + if local_service: + # Update existing + for key, value in service_data.items(): + setattr(local_service, key, value) + updated += 1 + else: + # Create new + local_service = Service(**service_data) + db.add(local_service) + created += 1 + + except Exception as e: + logger.error(f"Error syncing service {odoo_service.get('id')}: {e}") + failed += 1 + + db.commit() + + log.records_processed = len(services_data) + log.records_created = created + log.records_updated = updated + log.records_failed = failed + log.mark_completed() + db.commit() + + return { + "success": True, + "processed": len(services_data), + "created": created, + "updated": updated, + "failed": failed + } + + except Exception as e: + logger.error(f"Error in sync_services: {e}") + log.mark_failed(str(e)) + db.commit() + return {"success": False, "error": str(e)} + + finally: + db.close() + + async def create_lead(self, lead: Lead) -> Dict: + """ + Create a lead in Odoo CRM. + + Args: + lead: Local lead object + + Returns: + Dict with Odoo lead ID and status + """ + try: + if not self._connect(): + return {"success": False, "error": "Cannot connect to Odoo"} + + # Prepare lead data for Odoo + odoo_lead_data = { + "name": lead.interest or f"Lead from {lead.platform}", + "contact_name": lead.name or lead.username, + "email_from": lead.email, + "phone": lead.phone, + "partner_name": lead.company, + "description": f""" +Source: {lead.platform} +Username: @{lead.username} +Profile: {lead.profile_url} + +Original Content: +{lead.source_content} + +Notes: +{lead.notes or 'No notes'} + """.strip(), + "type": "lead", + "priority": "2" if lead.priority == "high" else "1" if lead.priority == "medium" else "0", + } + + # Create lead in Odoo + odoo_lead_id = self._execute( + "crm.lead", "create", + [odoo_lead_data] + ) + + return { + "success": True, + "odoo_lead_id": odoo_lead_id + } + + except Exception as e: + logger.error(f"Error creating Odoo lead: {e}") + return {"success": False, "error": str(e)} + + async def export_leads_to_odoo(self) -> Dict: + """ + Export unsynced leads to Odoo CRM. + + Returns: + Dict with export statistics + """ + db = self._get_db() + log = OdooSyncLog( + sync_type="leads", + direction="export", + status="started" + ) + db.add(log) + db.commit() + + try: + if not self._connect(): + log.mark_failed("Cannot connect to Odoo") + db.commit() + return {"success": False, "error": "Connection failed"} + + # Get unsynced leads + unsynced_leads = db.query(Lead).filter( + Lead.synced_to_odoo == False + ).all() + + created = 0 + failed = 0 + errors = [] + + for lead in unsynced_leads: + try: + result = await self.create_lead(lead) + + if result["success"]: + lead.odoo_lead_id = result["odoo_lead_id"] + lead.synced_to_odoo = True + lead.odoo_synced_at = datetime.utcnow() + created += 1 + else: + failed += 1 + errors.append({ + "lead_id": lead.id, + "error": result.get("error") + }) + + except Exception as e: + logger.error(f"Error exporting lead {lead.id}: {e}") + failed += 1 + errors.append({ + "lead_id": lead.id, + "error": str(e) + }) + + db.commit() + + log.records_processed = len(unsynced_leads) + log.records_created = created + log.records_failed = failed + if errors: + log.error_details = errors + log.mark_completed() + db.commit() + + return { + "success": True, + "processed": len(unsynced_leads), + "created": created, + "failed": failed, + "errors": errors if errors else None + } + + except Exception as e: + logger.error(f"Error in export_leads_to_odoo: {e}") + log.mark_failed(str(e)) + db.commit() + return {"success": False, "error": str(e)} + + finally: + db.close() + + async def get_sales_summary(self, days: int = 30) -> Dict: + """ + Get sales summary from Odoo. + + Args: + days: Number of days to look back + + Returns: + Dict with sales statistics + """ + try: + if not self._connect(): + return {"success": False, "error": "Cannot connect to Odoo"} + + from datetime import timedelta + start_date = (datetime.utcnow() - timedelta(days=days)).strftime("%Y-%m-%d") + + # Get confirmed sales orders + order_ids = self._execute( + "sale.order", "search", + [["state", "in", ["sale", "done"]], ["date_order", ">=", start_date]] + ) + + orders_data = self._execute( + "sale.order", "read", + order_ids, + fields=["id", "name", "amount_total", "date_order", "partner_id", "state"] + ) + + total_revenue = sum(order.get("amount_total", 0) for order in orders_data) + total_orders = len(orders_data) + + return { + "success": True, + "period_days": days, + "total_orders": total_orders, + "total_revenue": total_revenue, + "avg_order_value": total_revenue / total_orders if total_orders > 0 else 0, + "orders": orders_data[:10] # Return latest 10 + } + + except Exception as e: + logger.error(f"Error getting sales summary: {e}") + return {"success": False, "error": str(e)} + + async def get_sync_logs(self, limit: int = 20) -> List[Dict]: + """Get recent sync logs.""" + db = self._get_db() + try: + logs = db.query(OdooSyncLog).order_by( + OdooSyncLog.started_at.desc() + ).limit(limit).all() + + return [log.to_dict() for log in logs] + + finally: + db.close() + + +# Global instance +odoo_service = OdooService() diff --git a/app/services/recycling_service.py b/app/services/recycling_service.py new file mode 100644 index 0000000..6a930e1 --- /dev/null +++ b/app/services/recycling_service.py @@ -0,0 +1,327 @@ +""" +Content Recycling Service - Republish high-performing evergreen content. +""" + +from datetime import datetime, timedelta +from typing import List, Dict, Optional +import logging + +from app.core.database import SessionLocal +from app.models.post import Post +from app.models.recycled_post import RecycledPost + +logger = logging.getLogger(__name__) + + +class RecyclingService: + """Service for recycling high-performing content.""" + + # Default settings + MIN_DAYS_SINCE_PUBLISH = 30 # Don't recycle recent posts + MIN_ENGAGEMENT_RATE = 2.0 # Minimum engagement rate to consider + MAX_RECYCLE_COUNT = 3 # Maximum times to recycle a post + + def _get_db(self): + """Get database session.""" + return SessionLocal() + + def _calculate_engagement_rate(self, metrics: dict) -> float: + """Calculate engagement rate from metrics.""" + if not metrics: + return 0.0 + + impressions = metrics.get("impressions", 0) + if impressions == 0: + return 0.0 + + engagements = ( + metrics.get("likes", 0) + + metrics.get("comments", 0) + + metrics.get("shares", 0) + + metrics.get("retweets", 0) + ) + + return (engagements / impressions) * 100 + + async def find_recyclable_posts( + self, + platform: str = None, + content_type: str = None, + min_engagement_rate: float = None, + min_days_since_publish: int = None, + limit: int = 20 + ) -> List[Dict]: + """ + Find posts that are good candidates for recycling. + + Args: + platform: Filter by platform + content_type: Filter by content type + min_engagement_rate: Minimum engagement rate threshold + min_days_since_publish: Minimum days since original publish + limit: Maximum candidates to return + + Returns: + List of recyclable post candidates with scores + """ + db = self._get_db() + + try: + min_eng = min_engagement_rate or self.MIN_ENGAGEMENT_RATE + min_days = min_days_since_publish or self.MIN_DAYS_SINCE_PUBLISH + + cutoff_date = datetime.utcnow() - timedelta(days=min_days) + + # Query for published posts + query = db.query(Post).filter( + Post.status == "published", + Post.published_at <= cutoff_date, + Post.is_recyclable == True, + Post.recycle_count < self.MAX_RECYCLE_COUNT, + Post.metrics.isnot(None) + ) + + if platform: + query = query.filter(Post.platforms.contains([platform])) + + if content_type: + query = query.filter(Post.content_type == content_type) + + posts = query.all() + + # Calculate engagement and filter/sort + candidates = [] + for post in posts: + engagement_rate = self._calculate_engagement_rate(post.metrics) + + if engagement_rate >= min_eng: + # Calculate a recycling score + days_since_publish = (datetime.utcnow() - post.published_at).days + recency_factor = min(days_since_publish / 90, 1.0) # Max out at 90 days + recycled_penalty = 1 - (post.recycle_count * 0.2) # 20% penalty per recycle + + score = engagement_rate * recency_factor * recycled_penalty + + candidates.append({ + "id": post.id, + "content": post.content[:100] + "..." if len(post.content) > 100 else post.content, + "full_content": post.content, + "content_type": post.content_type, + "platforms": post.platforms, + "published_at": post.published_at.isoformat(), + "days_since_publish": days_since_publish, + "engagement_rate": round(engagement_rate, 2), + "recycle_count": post.recycle_count, + "score": round(score, 2), + "metrics": post.metrics + }) + + # Sort by score + candidates.sort(key=lambda x: x["score"], reverse=True) + + return candidates[:limit] + + finally: + db.close() + + async def recycle_post( + self, + post_id: int, + modifications: Dict = None, + scheduled_for: datetime = None, + platforms: List[str] = None, + reason: str = "manual" + ) -> Dict: + """ + Create a recycled version of a post. + + Args: + post_id: Original post ID + modifications: Dict of modifications {content, hashtags, image_url} + scheduled_for: When to publish (defaults to now + 1 hour) + platforms: Override platforms (defaults to original) + reason: Reason for recycling (high_performer, evergreen, seasonal, manual) + + Returns: + Dict with new post info + """ + db = self._get_db() + + try: + original = db.query(Post).filter(Post.id == post_id).first() + if not original: + return {"success": False, "error": "Original post not found"} + + if not original.is_recyclable: + return {"success": False, "error": "Post is marked as not recyclable"} + + if original.recycle_count >= self.MAX_RECYCLE_COUNT: + return {"success": False, "error": f"Post has been recycled {original.recycle_count} times (max {self.MAX_RECYCLE_COUNT})"} + + # Create new post + new_content = modifications.get("content") if modifications else None + new_hashtags = modifications.get("hashtags") if modifications else None + new_image = modifications.get("image_url") if modifications else None + + new_post = Post( + content=new_content or original.content, + content_type=original.content_type, + platforms=platforms or original.platforms, + status="scheduled", + scheduled_at=scheduled_for or (datetime.utcnow() + timedelta(hours=1)), + hashtags=new_hashtags or original.hashtags, + image_url=new_image or original.image_url, + recycled_from_id=original.id, + is_recyclable=True + ) + + db.add(new_post) + db.flush() + + # Track the recycling + recycle_record = RecycledPost( + original_post_id=original.id, + new_post_id=new_post.id, + recycle_number=original.recycle_count + 1, + modifications={ + "content_changed": bool(new_content), + "hashtags_updated": bool(new_hashtags), + "image_changed": bool(new_image) + }, + original_engagement_rate=self._calculate_engagement_rate(original.metrics), + reason=reason, + status="pending", + scheduled_for=new_post.scheduled_at + ) + + db.add(recycle_record) + + # Update original's recycle count + original.recycle_count += 1 + + db.commit() + db.refresh(new_post) + + return { + "success": True, + "new_post_id": new_post.id, + "recycle_record_id": recycle_record.id, + "scheduled_for": new_post.scheduled_at.isoformat(), + "platforms": new_post.platforms + } + + except Exception as e: + logger.error(f"Error recycling post: {e}") + db.rollback() + return {"success": False, "error": str(e)} + + finally: + db.close() + + async def auto_recycle( + self, + platform: str = None, + count: int = 1, + min_engagement_rate: float = None + ) -> Dict: + """ + Automatically select and recycle top-performing posts. + + Args: + platform: Filter by platform + count: Number of posts to recycle + min_engagement_rate: Minimum engagement rate threshold + + Returns: + Dict with recycled posts info + """ + db = self._get_db() + + try: + # Find candidates + candidates = await self.find_recyclable_posts( + platform=platform, + min_engagement_rate=min_engagement_rate or self.MIN_ENGAGEMENT_RATE, + limit=count * 2 # Get extra in case some fail + ) + + if not candidates: + return { + "success": True, + "recycled": 0, + "message": "No eligible posts found for recycling" + } + + recycled = [] + for candidate in candidates[:count]: + result = await self.recycle_post( + post_id=candidate["id"], + reason="high_performer" + ) + + if result.get("success"): + recycled.append({ + "original_id": candidate["id"], + "new_post_id": result["new_post_id"], + "engagement_rate": candidate["engagement_rate"] + }) + + return { + "success": True, + "recycled": len(recycled), + "posts": recycled + } + + finally: + db.close() + + async def get_recycling_history( + self, + original_post_id: int = None, + limit: int = 50 + ) -> List[Dict]: + """ + Get recycling history. + + Args: + original_post_id: Filter by original post + limit: Maximum records to return + + Returns: + List of recycling records + """ + db = self._get_db() + + try: + query = db.query(RecycledPost) + + if original_post_id: + query = query.filter(RecycledPost.original_post_id == original_post_id) + + records = query.order_by(RecycledPost.recycled_at.desc()).limit(limit).all() + + return [r.to_dict() for r in records] + + finally: + db.close() + + async def mark_post_not_recyclable(self, post_id: int) -> Dict: + """Mark a post as not eligible for recycling.""" + db = self._get_db() + + try: + post = db.query(Post).filter(Post.id == post_id).first() + if not post: + return {"success": False, "error": "Post not found"} + + post.is_recyclable = False + db.commit() + + return {"success": True, "message": "Post marked as not recyclable"} + + finally: + db.close() + + +# Global instance +recycling_service = RecyclingService() diff --git a/app/services/thread_service.py b/app/services/thread_service.py new file mode 100644 index 0000000..7b966c2 --- /dev/null +++ b/app/services/thread_service.py @@ -0,0 +1,393 @@ +""" +Thread Series Service - Create and manage multi-post threads. +""" + +from datetime import datetime, timedelta +from typing import List, Dict, Optional +import logging + +from app.core.database import SessionLocal +from app.core.config import settings +from app.models.thread_series import ThreadSeries, ThreadPost +from app.models.post import Post + +logger = logging.getLogger(__name__) + + +class ThreadService: + """Service for managing thread series.""" + + def _get_db(self): + """Get database session.""" + return SessionLocal() + + async def create_series( + self, + name: str, + platform: str, + posts_content: List[Dict], + description: str = None, + topic: str = None, + schedule_type: str = "sequential", + interval_minutes: int = 5, + start_time: datetime = None, + hashtags: List[str] = None, + ai_generated: bool = False, + generation_prompt: str = None + ) -> ThreadSeries: + """ + Create a new thread series with posts. + + Args: + name: Series name + platform: Target platform (x, threads) + posts_content: List of post content dicts [{"content": "...", "image_url": "..."}] + description: Series description + topic: Topic for categorization + schedule_type: "sequential" or "timed" + interval_minutes: Minutes between posts + start_time: When to start publishing + hashtags: Common hashtags for the series + ai_generated: Whether content was AI-generated + generation_prompt: The prompt used for AI generation + + Returns: + Created ThreadSeries object + """ + db = self._get_db() + + try: + # Create series + series = ThreadSeries( + name=name, + description=description, + topic=topic, + platform=platform, + schedule_type=schedule_type, + interval_minutes=interval_minutes, + start_time=start_time, + total_posts=len(posts_content), + status="draft", + hashtags=hashtags, + ai_generated=ai_generated, + generation_prompt=generation_prompt + ) + + db.add(series) + db.flush() + + # Create thread posts + for i, content_data in enumerate(posts_content, 1): + thread_post = ThreadPost( + series_id=series.id, + sequence_number=i, + content=content_data.get("content", ""), + image_url=content_data.get("image_url"), + status="pending" + ) + db.add(thread_post) + + db.commit() + db.refresh(series) + + return series + + except Exception as e: + logger.error(f"Error creating thread series: {e}") + db.rollback() + raise + + finally: + db.close() + + async def generate_thread_with_ai( + self, + topic: str, + platform: str, + num_posts: int = 5, + style: str = "educational" + ) -> Dict: + """ + Generate a thread series using AI. + + Args: + topic: The topic to generate content about + platform: Target platform + num_posts: Number of posts in the thread + style: Style of content (educational, storytelling, tips) + + Returns: + Dict with generated content + """ + if not settings.DEEPSEEK_API_KEY: + return {"success": False, "error": "AI API not configured"} + + try: + from app.services.content_generator import content_generator + + # Generate thread content + prompt = f"""Genera un hilo de {num_posts} publicaciones sobre: {topic} + +Estilo: {style} +Plataforma: {platform} + +Requisitos: +1. Cada publicación debe ser concisa (max 280 caracteres para X, 500 para Threads) +2. La primera publicación debe captar atención +3. Cada publicación debe añadir valor y conectar con la siguiente +4. La última debe tener un llamado a la acción +5. Usa emojis apropiadamente +6. Mantén el tono profesional pero accesible + +Formato de respuesta (JSON): +[ + {{"sequence": 1, "content": "..."}}, + {{"sequence": 2, "content": "..."}}, + ... +] +""" + + # Use the content generator to get AI response + response = await content_generator._call_deepseek( + prompt, + temperature=0.7 + ) + + # Parse the response + import json + import re + + # Try to extract JSON from response + json_match = re.search(r'\[[\s\S]*\]', response) + if json_match: + posts_data = json.loads(json_match.group()) + + return { + "success": True, + "topic": topic, + "platform": platform, + "style": style, + "posts": posts_data, + "count": len(posts_data) + } + else: + return {"success": False, "error": "Could not parse AI response"} + + except Exception as e: + logger.error(f"Error generating thread with AI: {e}") + return {"success": False, "error": str(e)} + + async def schedule_series( + self, + series_id: int, + start_time: datetime = None + ) -> Dict: + """ + Schedule a thread series for publishing. + + Args: + series_id: ID of the series + start_time: When to start (defaults to now + 5 minutes) + + Returns: + Dict with scheduling info + """ + db = self._get_db() + + try: + series = db.query(ThreadSeries).filter(ThreadSeries.id == series_id).first() + if not series: + return {"success": False, "error": "Series not found"} + + if series.status not in ["draft", "paused"]: + return {"success": False, "error": f"Series is {series.status}, cannot schedule"} + + thread_posts = db.query(ThreadPost).filter( + ThreadPost.series_id == series_id + ).order_by(ThreadPost.sequence_number).all() + + if not thread_posts: + return {"success": False, "error": "Series has no posts"} + + # Set start time + start = start_time or (datetime.utcnow() + timedelta(minutes=5)) + series.start_time = start + + # Schedule each post + for i, thread_post in enumerate(thread_posts): + scheduled_at = start + timedelta(minutes=i * series.interval_minutes) + + # Create the actual post + post = Post( + content=thread_post.content, + content_type="thread", + platforms=[series.platform], + status="scheduled", + scheduled_at=scheduled_at, + hashtags=series.hashtags, + image_url=thread_post.image_url + ) + + db.add(post) + db.flush() + + thread_post.post_id = post.id + thread_post.scheduled_at = scheduled_at + thread_post.status = "scheduled" + + series.status = "scheduled" + db.commit() + + return { + "success": True, + "series_id": series_id, + "start_time": start.isoformat(), + "posts_scheduled": len(thread_posts) + } + + except Exception as e: + logger.error(f"Error scheduling series: {e}") + db.rollback() + return {"success": False, "error": str(e)} + + finally: + db.close() + + async def publish_next_post(self, series_id: int) -> Dict: + """ + Publish the next pending post in a series. + + Args: + series_id: ID of the series + + Returns: + Dict with publish result + """ + db = self._get_db() + + try: + series = db.query(ThreadSeries).filter(ThreadSeries.id == series_id).first() + if not series: + return {"success": False, "error": "Series not found"} + + # Find next unpublished post + next_post = db.query(ThreadPost).filter( + ThreadPost.series_id == series_id, + ThreadPost.status.in_(["pending", "scheduled"]) + ).order_by(ThreadPost.sequence_number).first() + + if not next_post: + # All posts published + series.status = "completed" + series.completed_at = datetime.utcnow() + db.commit() + return { + "success": True, + "message": "All posts in series have been published", + "series_completed": True + } + + # Publish the post (this would normally trigger the publish task) + # For now, just mark it and let the scheduler handle it + + if next_post.sequence_number == 1: + series.status = "publishing" + + # Update series progress + series.posts_published = db.query(ThreadPost).filter( + ThreadPost.series_id == series_id, + ThreadPost.status == "published" + ).count() + + db.commit() + + return { + "success": True, + "post_id": next_post.post_id, + "sequence_number": next_post.sequence_number, + "total_posts": series.total_posts + } + + except Exception as e: + logger.error(f"Error publishing next post: {e}") + db.rollback() + return {"success": False, "error": str(e)} + + finally: + db.close() + + async def get_series(self, series_id: int) -> Optional[Dict]: + """Get a series with its posts.""" + db = self._get_db() + + try: + series = db.query(ThreadSeries).filter(ThreadSeries.id == series_id).first() + if series: + return series.to_dict(include_posts=True) + return None + + finally: + db.close() + + async def get_series_list( + self, + status: str = None, + platform: str = None, + limit: int = 20 + ) -> List[Dict]: + """Get list of thread series.""" + db = self._get_db() + + try: + query = db.query(ThreadSeries) + + if status: + query = query.filter(ThreadSeries.status == status) + if platform: + query = query.filter(ThreadSeries.platform == platform) + + series_list = query.order_by(ThreadSeries.created_at.desc()).limit(limit).all() + return [s.to_dict(include_posts=False) for s in series_list] + + finally: + db.close() + + async def cancel_series(self, series_id: int) -> Dict: + """Cancel a series and its scheduled posts.""" + db = self._get_db() + + try: + series = db.query(ThreadSeries).filter(ThreadSeries.id == series_id).first() + if not series: + return {"success": False, "error": "Series not found"} + + series.status = "cancelled" + + # Cancel scheduled posts + thread_posts = db.query(ThreadPost).filter( + ThreadPost.series_id == series_id, + ThreadPost.status == "scheduled" + ).all() + + for tp in thread_posts: + tp.status = "cancelled" + if tp.post_id: + post = db.query(Post).filter(Post.id == tp.post_id).first() + if post and post.status == "scheduled": + post.status = "cancelled" + + db.commit() + + return {"success": True, "message": "Series cancelled"} + + except Exception as e: + logger.error(f"Error cancelling series: {e}") + db.rollback() + return {"success": False, "error": str(e)} + + finally: + db.close() + + +# Global instance +thread_service = ThreadService() diff --git a/app/worker/celery_app.py b/app/worker/celery_app.py index 25d61d1..a9aeb37 100644 --- a/app/worker/celery_app.py +++ b/app/worker/celery_app.py @@ -49,6 +49,12 @@ celery_app.conf.update( "schedule": 60.0, # Every minute }, + # Check and publish thread series posts every minute + "check-thread-schedules": { + "task": "app.worker.tasks.check_thread_schedules", + "schedule": 60.0, # Every minute + }, + # Generate daily content at 6 AM "generate-daily-content": { "task": "app.worker.tasks.generate_daily_content", @@ -72,5 +78,53 @@ celery_app.conf.update( "task": "app.worker.tasks.cleanup_old_data", "schedule": crontab(hour=3, minute=0, day_of_week=0), # Sunday 3 AM }, + + # Fetch post metrics every 15 minutes + "fetch-post-metrics": { + "task": "app.worker.tasks.fetch_post_metrics", + "schedule": crontab(minute="*/15"), + }, + + # Generate weekly analytics report on Sunday at 9 AM + "generate-weekly-report": { + "task": "app.worker.tasks.generate_weekly_analytics_report", + "schedule": crontab(hour=9, minute=0, day_of_week=0), # Sunday 9 AM + }, + + # Recalculate optimal times weekly on Monday at 2 AM + "recalculate-optimal-times": { + "task": "app.worker.tasks.recalculate_optimal_times", + "schedule": crontab(hour=2, minute=0, day_of_week=1), # Monday 2 AM + }, + + # Sync products from Odoo daily at 6 AM + "sync-products-odoo": { + "task": "app.worker.tasks.sync_products_from_odoo", + "schedule": crontab(hour=6, minute=0), + }, + + # Sync services from Odoo daily at 6:05 AM + "sync-services-odoo": { + "task": "app.worker.tasks.sync_services_from_odoo", + "schedule": crontab(hour=6, minute=5), + }, + + # Export leads to Odoo every hour + "export-leads-odoo": { + "task": "app.worker.tasks.export_leads_to_odoo", + "schedule": crontab(minute=30), # Every hour at :30 + }, + + # Evaluate A/B tests every hour + "evaluate-ab-tests": { + "task": "app.worker.tasks.evaluate_ab_tests", + "schedule": crontab(minute=0), # Every hour at :00 + }, + + # Auto-recycle content daily at 2 AM + "auto-recycle-content": { + "task": "app.worker.tasks.auto_recycle_content", + "schedule": crontab(hour=2, minute=0), + }, }, ) diff --git a/app/worker/tasks.py b/app/worker/tasks.py index b701950..c3f45f9 100644 --- a/app/worker/tasks.py +++ b/app/worker/tasks.py @@ -566,6 +566,467 @@ def send_daily_summary(): db.close() +# ============================================================ +# THREAD SERIES TASKS +# ============================================================ + +@shared_task +def check_thread_schedules(): + """ + Check for thread posts that need to be published. + Runs every minute via Celery Beat. + """ + logger.info("Checking thread schedules...") + + db = SessionLocal() + + try: + from app.models.thread_series import ThreadSeries, ThreadPost + + now = datetime.utcnow() + window_start = now - timedelta(minutes=1) + + # Find thread posts scheduled for now + scheduled_posts = db.query(ThreadPost).filter( + ThreadPost.status == "scheduled", + ThreadPost.scheduled_at <= now, + ThreadPost.scheduled_at > window_start + ).all() + + published = 0 + + for thread_post in scheduled_posts: + try: + # Queue the publish task for the actual post + if thread_post.post_id: + publish_post.delay(thread_post.post_id) + published += 1 + logger.info(f"Queued thread post {thread_post.id} (post {thread_post.post_id})") + + except Exception as e: + logger.error(f"Error publishing thread post {thread_post.id}: {e}") + thread_post.status = "failed" + thread_post.error_message = str(e) + + db.commit() + + logger.info(f"Queued {published} thread posts for publishing") + + return {"success": True, "published": published} + + except Exception as e: + logger.error(f"Error in check_thread_schedules: {e}") + return {"success": False, "error": str(e)} + + finally: + db.close() + + +@shared_task +def update_thread_post_status(post_id: int, platform_post_id: str = None): + """ + Update thread post status after publishing. + + Args: + post_id: The Post ID that was published + platform_post_id: The platform-specific post ID (for reply chains) + """ + db = SessionLocal() + + try: + from app.models.thread_series import ThreadSeries, ThreadPost + from app.models.post import Post + + # Find the thread post associated with this post + thread_post = db.query(ThreadPost).filter( + ThreadPost.post_id == post_id + ).first() + + if not thread_post: + return {"success": True, "message": "Not a thread post"} + + post = db.query(Post).filter(Post.id == post_id).first() + + if post and post.status == "published": + thread_post.status = "published" + thread_post.published_at = datetime.utcnow() + + if platform_post_id: + thread_post.platform_post_id = platform_post_id + + # Update series progress + series = db.query(ThreadSeries).filter( + ThreadSeries.id == thread_post.series_id + ).first() + + if series: + series.posts_published = db.query(ThreadPost).filter( + ThreadPost.series_id == series.id, + ThreadPost.status == "published" + ).count() + + # Check if series is complete + if series.posts_published >= series.total_posts: + series.status = "completed" + series.completed_at = datetime.utcnow() + + # Store first post ID for reply chain + if thread_post.sequence_number == 1 and platform_post_id: + series.first_platform_post_id = platform_post_id + + elif post and post.status == "failed": + thread_post.status = "failed" + thread_post.error_message = post.error_message + + db.commit() + + return {"success": True} + + except Exception as e: + logger.error(f"Error updating thread post status: {e}") + return {"success": False, "error": str(e)} + + finally: + db.close() + + +# ============================================================ +# A/B TESTING & RECYCLING TASKS +# ============================================================ + +@shared_task +def evaluate_ab_tests(): + """ + Evaluate running A/B tests and update metrics. + Runs every hour via Celery Beat. + """ + logger.info("Evaluating A/B tests...") + + db = SessionLocal() + + try: + from app.models.ab_test import ABTest + from app.services.ab_testing_service import ab_testing_service + + # Get running tests + running_tests = db.query(ABTest).filter( + ABTest.status == "running" + ).all() + + evaluated = 0 + + for test in running_tests: + try: + # Check if test duration has elapsed + if test.started_at: + elapsed_hours = (datetime.utcnow() - test.started_at).total_seconds() / 3600 + if elapsed_hours >= test.duration_hours: + # Evaluate and complete the test + result = run_async(ab_testing_service.evaluate_test(test.id)) + logger.info(f"Evaluated A/B test {test.id}: {result}") + evaluated += 1 + else: + # Just update metrics + run_async(ab_testing_service.update_variant_metrics(test.id)) + + except Exception as e: + logger.error(f"Error evaluating test {test.id}: {e}") + + logger.info(f"Evaluated {evaluated} A/B tests") + + return {"success": True, "evaluated": evaluated} + + except Exception as e: + logger.error(f"Error in evaluate_ab_tests: {e}") + return {"success": False, "error": str(e)} + + finally: + db.close() + + +@shared_task +def auto_recycle_content(): + """ + Automatically recycle high-performing content. + Runs daily at 2 AM via Celery Beat. + """ + logger.info("Auto-recycling content...") + + try: + from app.services.recycling_service import recycling_service + + # Recycle 1 post per platform with high engagement + platforms = ["x", "threads"] + total_recycled = 0 + + for platform in platforms: + result = run_async(recycling_service.auto_recycle( + platform=platform, + count=1, + min_engagement_rate=3.0 # Only recycle really good posts + )) + + if result.get("success"): + total_recycled += result.get("recycled", 0) + logger.info(f"Auto-recycled {result.get('recycled', 0)} posts for {platform}") + + return {"success": True, "recycled": total_recycled} + + except Exception as e: + logger.error(f"Error in auto_recycle_content: {e}") + return {"success": False, "error": str(e)} + + +# ============================================================ +# ODOO SYNC TASKS +# ============================================================ + +@shared_task +def sync_products_from_odoo(): + """ + Sync products from Odoo ERP. + Runs daily at 6 AM via Celery Beat. + """ + logger.info("Syncing products from Odoo...") + + if not settings.ODOO_SYNC_ENABLED: + logger.info("Odoo sync disabled, skipping") + return {"success": False, "error": "Odoo sync disabled"} + + try: + from app.services.odoo_service import odoo_service + + result = run_async(odoo_service.sync_products(limit=200)) + + if result.get("success"): + logger.info(f"Synced {result.get('processed', 0)} products from Odoo") + else: + logger.error(f"Odoo product sync failed: {result.get('error')}") + + return result + + except Exception as e: + logger.error(f"Error in sync_products_from_odoo: {e}") + return {"success": False, "error": str(e)} + + +@shared_task +def sync_services_from_odoo(): + """ + Sync services from Odoo ERP. + Runs daily at 6 AM via Celery Beat. + """ + logger.info("Syncing services from Odoo...") + + if not settings.ODOO_SYNC_ENABLED: + logger.info("Odoo sync disabled, skipping") + return {"success": False, "error": "Odoo sync disabled"} + + try: + from app.services.odoo_service import odoo_service + + result = run_async(odoo_service.sync_services(limit=100)) + + if result.get("success"): + logger.info(f"Synced {result.get('processed', 0)} services from Odoo") + else: + logger.error(f"Odoo service sync failed: {result.get('error')}") + + return result + + except Exception as e: + logger.error(f"Error in sync_services_from_odoo: {e}") + return {"success": False, "error": str(e)} + + +@shared_task +def export_leads_to_odoo(): + """ + Export unsynced leads to Odoo CRM. + Runs every hour via Celery Beat. + """ + logger.info("Exporting leads to Odoo...") + + if not settings.ODOO_SYNC_ENABLED: + logger.info("Odoo sync disabled, skipping") + return {"success": False, "error": "Odoo sync disabled"} + + try: + from app.services.odoo_service import odoo_service + + result = run_async(odoo_service.export_leads_to_odoo()) + + if result.get("success"): + logger.info(f"Exported {result.get('created', 0)} leads to Odoo") + else: + logger.error(f"Odoo lead export failed: {result.get('error')}") + + return result + + except Exception as e: + logger.error(f"Error in export_leads_to_odoo: {e}") + return {"success": False, "error": str(e)} + + +# ============================================================ +# ANALYTICS TASKS +# ============================================================ + +@shared_task +def fetch_post_metrics(): + """ + Fetch and record metrics for recent posts. + Runs every 15 minutes via Celery Beat. + """ + logger.info("Fetching post metrics...") + + db = SessionLocal() + + try: + from app.models.post import Post + from app.services.analytics_service import analytics_service + from app.publishers.manager import publisher_manager, Platform + + # Get posts published in the last 7 days + recent_posts = db.query(Post).filter( + Post.status == "published", + Post.published_at >= datetime.utcnow() - timedelta(days=7) + ).all() + + updated = 0 + + for post in recent_posts: + for platform_name in post.platforms: + try: + platform = Platform(platform_name) + publisher = publisher_manager.get_publisher(platform) + + if not publisher: + continue + + # Get platform post ID + platform_ids = post.platform_post_ids or {} + platform_post_id = platform_ids.get(platform_name) + + if not platform_post_id: + continue + + # Fetch metrics from platform API + metrics = run_async(publisher.get_post_metrics(platform_post_id)) + + if metrics: + # Record metrics snapshot + run_async(analytics_service.record_post_metrics( + post_id=post.id, + platform=platform_name, + metrics=metrics + )) + + # Update post.metrics with latest + if not post.metrics: + post.metrics = {} + + post.metrics.update({ + "likes": metrics.get("likes", post.metrics.get("likes", 0)), + "comments": metrics.get("comments", post.metrics.get("comments", 0)), + "shares": metrics.get("shares", 0) + metrics.get("retweets", 0), + "impressions": metrics.get("impressions", post.metrics.get("impressions", 0)), + "reach": metrics.get("reach", post.metrics.get("reach", 0)) + }) + + updated += 1 + + except Exception as e: + logger.error(f"Error fetching metrics for post {post.id} on {platform_name}: {e}") + + db.commit() + logger.info(f"Updated metrics for {updated} post-platform combinations") + + return {"success": True, "updated": updated} + + except Exception as e: + logger.error(f"Error in fetch_post_metrics: {e}") + db.rollback() + return {"success": False, "error": str(e)} + + finally: + db.close() + + +@shared_task +def generate_weekly_analytics_report(): + """ + Generate and optionally send weekly analytics report. + Runs every Sunday at 9 AM via Celery Beat. + """ + logger.info("Generating weekly analytics report...") + + try: + from app.services.analytics_service import analytics_service + from app.services.notifications import notification_service + + # Generate report + report = run_async(analytics_service.generate_weekly_report()) + + logger.info(f"Generated report {report.id} for {report.period_start} - {report.period_end}") + + # Send via Telegram if configured + if settings.TELEGRAM_BOT_TOKEN and settings.TELEGRAM_CHAT_ID: + if report.summary_text: + success = run_async(notification_service.notify_daily_summary({ + "custom_message": report.summary_text + })) + + if success: + logger.info("Weekly report sent to Telegram") + else: + logger.warning("Failed to send report to Telegram") + + return { + "success": True, + "report_id": report.id, + "total_posts": report.total_posts, + "total_engagements": report.total_engagements + } + + except Exception as e: + logger.error(f"Error generating weekly report: {e}") + return {"success": False, "error": str(e)} + + +@shared_task +def recalculate_optimal_times(): + """ + Recalculate optimal posting times based on historical data. + Runs weekly via Celery Beat. + """ + logger.info("Recalculating optimal posting times...") + + try: + from app.services.analytics_service import analytics_service + + # Calculate for each platform + platforms = ["x", "threads", "instagram", "facebook", None] + results = {} + + for platform in platforms: + times = run_async(analytics_service.get_optimal_times( + platform=platform, + days=90 + )) + + platform_key = platform or "all" + results[platform_key] = len(times) + + logger.info(f"Calculated {len(times)} optimal time slots for {platform_key}") + + return {"success": True, "results": results} + + except Exception as e: + logger.error(f"Error recalculating optimal times: {e}") + return {"success": False, "error": str(e)} + + # ============================================================ # MAINTENANCE TASKS # ============================================================ diff --git a/dashboard/templates/analytics.html b/dashboard/templates/analytics.html new file mode 100644 index 0000000..42b6c24 --- /dev/null +++ b/dashboard/templates/analytics.html @@ -0,0 +1,446 @@ + + + + + + Analytics - Social Media Automation + + + + + + +
+
+

+ Consultoría AS - Analytics +

+ +
+
+ +
+ +
+

Dashboard de Analytics

+
+ + + +
+
+ + +
+
+
-
+
Posts Publicados
+
+
+
-
+
Impresiones
+
+
+
-
+
Interacciones
+
+
+
-
+
Engagement Rate
+
+
+
-
+
Por Responder
+
+
+ + +
+
+
+
Likes
+
-
+
+
❤️
+
+
+
+
Comentarios
+
-
+
+
💬
+
+
+
+
Compartidos
+
-
+
+
🔄
+
+
+ +
+ +
+

Tendencia de Engagement

+ +
+ + +
+

Por Plataforma

+
+ +
+
+
+ +
+ +
+

Top Posts por Engagement

+
+ +
+
+ + +
+

Mejores Horarios

+
+ +
+
+
+ + +
+

Rendimiento por Tipo de Contenido

+
+ +
+
+ + +
+
+

Reportes Anteriores

+ +
+
+ +
+
+
+ + + + diff --git a/dashboard/templates/leads.html b/dashboard/templates/leads.html new file mode 100644 index 0000000..b27af6e --- /dev/null +++ b/dashboard/templates/leads.html @@ -0,0 +1,557 @@ + + + + + + Leads - Social Media Automation + + + + + +
+
+

+ Consultoría AS - Leads +

+ +
+
+ +
+ +
+
+
-
+
Total Leads
+
+
+
-
+
Nuevos
+
+
+
-
+
Contactados
+
+
+
-
+
Calificados
+
+
+
-
+
Sin Sincronizar
+
+
+ + +
+
+ + + +
+
+ + +
+
+ + +
+
+ +
+
+ + +
+ + Página 1 + +
+
+ + + + + + + + + +