Implementación inicial del sistema de automatización de redes sociales

- Estructura completa del proyecto con FastAPI
- Modelos de base de datos (productos, servicios, posts, calendario, interacciones)
- Publishers para X, Threads, Instagram, Facebook
- Generador de contenido con DeepSeek API
- Worker de Celery con tareas programadas
- Dashboard básico con templates HTML
- Docker Compose para despliegue
- Documentación completa

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-28 01:11:44 +00:00
commit 049d2133f9
53 changed files with 5876 additions and 0 deletions

1
app/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Social Media Automation - Consultoría AS

1
app/api/__init__.py Normal file
View File

@@ -0,0 +1 @@
# API Routes

View File

@@ -0,0 +1 @@
# API Route modules

201
app/api/routes/calendar.py Normal file
View File

@@ -0,0 +1,201 @@
"""
API Routes para gestión del Calendario de Contenido.
"""
from datetime import datetime, time
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from pydantic import BaseModel
from app.core.database import get_db
from app.models.content_calendar import ContentCalendar
router = APIRouter()
# ===========================================
# SCHEMAS
# ===========================================
class CalendarEntryCreate(BaseModel):
day_of_week: int # 0=Lunes, 6=Domingo
time: str # Formato "HH:MM"
content_type: str
platforms: List[str]
requires_approval: bool = False
category_filter: Optional[str] = None
priority: int = 0
description: Optional[str] = None
class Config:
from_attributes = True
class CalendarEntryUpdate(BaseModel):
day_of_week: Optional[int] = None
time: Optional[str] = None
content_type: Optional[str] = None
platforms: Optional[List[str]] = None
is_active: Optional[bool] = None
requires_approval: Optional[bool] = None
category_filter: Optional[str] = None
priority: Optional[int] = None
description: Optional[str] = None
class Config:
from_attributes = True
# ===========================================
# ENDPOINTS
# ===========================================
@router.get("/")
async def list_calendar_entries(
day_of_week: Optional[int] = Query(None),
content_type: Optional[str] = Query(None),
platform: Optional[str] = Query(None),
is_active: Optional[bool] = Query(True),
db: Session = Depends(get_db)
):
"""Listar entradas del calendario."""
query = db.query(ContentCalendar)
if day_of_week is not None:
query = query.filter(ContentCalendar.day_of_week == day_of_week)
if content_type:
query = query.filter(ContentCalendar.content_type == content_type)
if is_active is not None:
query = query.filter(ContentCalendar.is_active == is_active)
if platform:
query = query.filter(ContentCalendar.platforms.contains([platform]))
entries = query.order_by(
ContentCalendar.day_of_week,
ContentCalendar.time
).all()
return [e.to_dict() for e in entries]
@router.get("/week")
async def get_week_schedule(db: Session = Depends(get_db)):
"""Obtener el calendario semanal completo."""
entries = db.query(ContentCalendar).filter(
ContentCalendar.is_active == True
).order_by(
ContentCalendar.day_of_week,
ContentCalendar.time
).all()
# Organizar por día
week = {i: [] for i in range(7)}
for entry in entries:
week[entry.day_of_week].append(entry.to_dict())
days = ["Lunes", "Martes", "Miércoles", "Jueves", "Viernes", "Sábado", "Domingo"]
return {
days[i]: week[i] for i in range(7)
}
@router.get("/today")
async def get_today_schedule(db: Session = Depends(get_db)):
"""Obtener el calendario de hoy."""
today = datetime.now().weekday()
entries = db.query(ContentCalendar).filter(
ContentCalendar.day_of_week == today,
ContentCalendar.is_active == True
).order_by(ContentCalendar.time).all()
return [e.to_dict() for e in entries]
@router.get("/{entry_id}")
async def get_calendar_entry(entry_id: int, db: Session = Depends(get_db)):
"""Obtener una entrada del calendario por ID."""
entry = db.query(ContentCalendar).filter(ContentCalendar.id == entry_id).first()
if not entry:
raise HTTPException(status_code=404, detail="Entrada no encontrada")
return entry.to_dict()
@router.post("/")
async def create_calendar_entry(entry_data: CalendarEntryCreate, db: Session = Depends(get_db)):
"""Crear una nueva entrada en el calendario."""
# Parsear hora
hour, minute = map(int, entry_data.time.split(":"))
entry_time = time(hour=hour, minute=minute)
entry = ContentCalendar(
day_of_week=entry_data.day_of_week,
time=entry_time,
content_type=entry_data.content_type,
platforms=entry_data.platforms,
requires_approval=entry_data.requires_approval,
category_filter=entry_data.category_filter,
priority=entry_data.priority,
description=entry_data.description
)
db.add(entry)
db.commit()
db.refresh(entry)
return entry.to_dict()
@router.put("/{entry_id}")
async def update_calendar_entry(
entry_id: int,
entry_data: CalendarEntryUpdate,
db: Session = Depends(get_db)
):
"""Actualizar una entrada del calendario."""
entry = db.query(ContentCalendar).filter(ContentCalendar.id == entry_id).first()
if not entry:
raise HTTPException(status_code=404, detail="Entrada no encontrada")
update_data = entry_data.dict(exclude_unset=True)
# Parsear hora si se proporciona
if "time" in update_data and update_data["time"]:
hour, minute = map(int, update_data["time"].split(":"))
update_data["time"] = time(hour=hour, minute=minute)
for field, value in update_data.items():
setattr(entry, field, value)
entry.updated_at = datetime.utcnow()
db.commit()
db.refresh(entry)
return entry.to_dict()
@router.delete("/{entry_id}")
async def delete_calendar_entry(entry_id: int, db: Session = Depends(get_db)):
"""Eliminar una entrada del calendario."""
entry = db.query(ContentCalendar).filter(ContentCalendar.id == entry_id).first()
if not entry:
raise HTTPException(status_code=404, detail="Entrada no encontrada")
db.delete(entry)
db.commit()
return {"message": "Entrada eliminada", "entry_id": entry_id}
@router.post("/{entry_id}/toggle")
async def toggle_calendar_entry(entry_id: int, db: Session = Depends(get_db)):
"""Activar/desactivar una entrada del calendario."""
entry = db.query(ContentCalendar).filter(ContentCalendar.id == entry_id).first()
if not entry:
raise HTTPException(status_code=404, detail="Entrada no encontrada")
entry.is_active = not entry.is_active
db.commit()
return {
"message": f"Entrada {'activada' if entry.is_active else 'desactivada'}",
"is_active": entry.is_active
}

118
app/api/routes/dashboard.py Normal file
View File

@@ -0,0 +1,118 @@
"""
API Routes para el Dashboard.
"""
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
from sqlalchemy import func
from app.core.database import get_db
from app.models.post import Post
from app.models.interaction import Interaction
router = APIRouter()
templates = Jinja2Templates(directory="dashboard/templates")
@router.get("/", response_class=HTMLResponse)
async def dashboard_home(request: Request, db: Session = Depends(get_db)):
"""Página principal del dashboard."""
# Estadísticas
now = datetime.utcnow()
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
week_start = today_start - timedelta(days=now.weekday())
stats = {
"posts_today": db.query(Post).filter(
Post.published_at >= today_start
).count(),
"posts_week": db.query(Post).filter(
Post.published_at >= week_start
).count(),
"pending_approval": db.query(Post).filter(
Post.status == "pending_approval"
).count(),
"scheduled": db.query(Post).filter(
Post.status == "scheduled"
).count(),
"interactions_pending": db.query(Interaction).filter(
Interaction.responded == False,
Interaction.is_archived == False
).count()
}
# Posts pendientes
pending_posts = db.query(Post).filter(
Post.status == "pending_approval"
).order_by(Post.scheduled_at.asc()).limit(5).all()
# Próximas publicaciones
scheduled_posts = db.query(Post).filter(
Post.status == "scheduled",
Post.scheduled_at >= now
).order_by(Post.scheduled_at.asc()).limit(5).all()
# Interacciones recientes
recent_interactions = db.query(Interaction).filter(
Interaction.responded == False
).order_by(Interaction.interaction_at.desc()).limit(5).all()
return templates.TemplateResponse("index.html", {
"request": request,
"stats": stats,
"pending_posts": [p.to_dict() for p in pending_posts],
"scheduled_posts": [p.to_dict() for p in scheduled_posts],
"recent_interactions": [i.to_dict() for i in recent_interactions]
})
@router.get("/posts", response_class=HTMLResponse)
async def dashboard_posts(request: Request, db: Session = Depends(get_db)):
"""Página de gestión de posts."""
posts = db.query(Post).order_by(Post.created_at.desc()).limit(50).all()
return templates.TemplateResponse("posts.html", {
"request": request,
"posts": [p.to_dict() for p in posts]
})
@router.get("/calendar", response_class=HTMLResponse)
async def dashboard_calendar(request: Request):
"""Página de calendario."""
return templates.TemplateResponse("calendar.html", {
"request": request
})
@router.get("/interactions", response_class=HTMLResponse)
async def dashboard_interactions(request: Request, db: Session = Depends(get_db)):
"""Página de interacciones."""
interactions = db.query(Interaction).filter(
Interaction.is_archived == False
).order_by(Interaction.interaction_at.desc()).limit(50).all()
return templates.TemplateResponse("interactions.html", {
"request": request,
"interactions": [i.to_dict() for i in interactions]
})
@router.get("/products", response_class=HTMLResponse)
async def dashboard_products(request: Request):
"""Página de productos."""
return templates.TemplateResponse("products.html", {
"request": request
})
@router.get("/services", response_class=HTMLResponse)
async def dashboard_services(request: Request):
"""Página de servicios."""
return templates.TemplateResponse("services.html", {
"request": request
})

View File

@@ -0,0 +1,226 @@
"""
API Routes para gestión de Interacciones.
"""
from datetime import datetime
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from pydantic import BaseModel
from app.core.database import get_db
from app.models.interaction import Interaction
router = APIRouter()
# ===========================================
# SCHEMAS
# ===========================================
class InteractionResponse(BaseModel):
content: str
class Config:
from_attributes = True
# ===========================================
# ENDPOINTS
# ===========================================
@router.get("/")
async def list_interactions(
platform: Optional[str] = Query(None),
interaction_type: Optional[str] = Query(None),
responded: Optional[bool] = Query(None),
is_lead: Optional[bool] = Query(None),
is_read: Optional[bool] = Query(None),
limit: int = Query(50, le=100),
offset: int = Query(0),
db: Session = Depends(get_db)
):
"""Listar interacciones con filtros."""
query = db.query(Interaction)
if platform:
query = query.filter(Interaction.platform == platform)
if interaction_type:
query = query.filter(Interaction.interaction_type == interaction_type)
if responded is not None:
query = query.filter(Interaction.responded == responded)
if is_lead is not None:
query = query.filter(Interaction.is_lead == is_lead)
if is_read is not None:
query = query.filter(Interaction.is_read == is_read)
interactions = query.order_by(
Interaction.interaction_at.desc()
).offset(offset).limit(limit).all()
return [i.to_dict() for i in interactions]
@router.get("/pending")
async def list_pending_interactions(db: Session = Depends(get_db)):
"""Listar interacciones sin responder."""
interactions = db.query(Interaction).filter(
Interaction.responded == False,
Interaction.is_archived == False
).order_by(
Interaction.priority.desc(),
Interaction.interaction_at.desc()
).all()
return [i.to_dict() for i in interactions]
@router.get("/leads")
async def list_leads(db: Session = Depends(get_db)):
"""Listar interacciones marcadas como leads."""
interactions = db.query(Interaction).filter(
Interaction.is_lead == True
).order_by(Interaction.interaction_at.desc()).all()
return [i.to_dict() for i in interactions]
@router.get("/stats")
async def get_interaction_stats(db: Session = Depends(get_db)):
"""Obtener estadísticas de interacciones."""
total = db.query(Interaction).count()
pending = db.query(Interaction).filter(
Interaction.responded == False,
Interaction.is_archived == False
).count()
leads = db.query(Interaction).filter(Interaction.is_lead == True).count()
# Por plataforma
by_platform = {}
for platform in ["x", "threads", "instagram", "facebook"]:
by_platform[platform] = db.query(Interaction).filter(
Interaction.platform == platform
).count()
return {
"total": total,
"pending": pending,
"leads": leads,
"by_platform": by_platform
}
@router.get("/{interaction_id}")
async def get_interaction(interaction_id: int, db: Session = Depends(get_db)):
"""Obtener una interacción por ID."""
interaction = db.query(Interaction).filter(Interaction.id == interaction_id).first()
if not interaction:
raise HTTPException(status_code=404, detail="Interacción no encontrada")
return interaction.to_dict()
@router.post("/{interaction_id}/respond")
async def respond_to_interaction(
interaction_id: int,
response_data: InteractionResponse,
db: Session = Depends(get_db)
):
"""Responder a una interacción."""
interaction = db.query(Interaction).filter(Interaction.id == interaction_id).first()
if not interaction:
raise HTTPException(status_code=404, detail="Interacción no encontrada")
# TODO: Enviar respuesta a la plataforma correspondiente
# from app.publishers import get_publisher
# publisher = get_publisher(interaction.platform)
# result = await publisher.reply(interaction.external_id, response_data.content)
interaction.responded = True
interaction.response_content = response_data.content
interaction.responded_at = datetime.utcnow()
db.commit()
return {
"message": "Respuesta guardada",
"interaction_id": interaction_id,
"note": "La publicación a la plataforma está en desarrollo"
}
@router.post("/{interaction_id}/suggest-response")
async def suggest_response(interaction_id: int, db: Session = Depends(get_db)):
"""Generar sugerencia de respuesta con IA."""
interaction = db.query(Interaction).filter(Interaction.id == interaction_id).first()
if not interaction:
raise HTTPException(status_code=404, detail="Interacción no encontrada")
# TODO: Implementar generación con DeepSeek
# from app.services.content_generator import generate_response_suggestion
# suggestions = await generate_response_suggestion(interaction)
# Respuestas placeholder
suggestions = [
f"¡Gracias por tu comentario! Nos da gusto que te interese nuestro contenido.",
f"¡Hola! Gracias por escribirnos. ¿En qué podemos ayudarte?",
f"¡Excelente pregunta! Te respondemos por DM para darte más detalles."
]
return {
"suggestions": suggestions,
"note": "La generación con IA está en desarrollo"
}
@router.post("/{interaction_id}/mark-as-lead")
async def mark_as_lead(interaction_id: int, db: Session = Depends(get_db)):
"""Marcar interacción como lead potencial."""
interaction = db.query(Interaction).filter(Interaction.id == interaction_id).first()
if not interaction:
raise HTTPException(status_code=404, detail="Interacción no encontrada")
interaction.is_lead = not interaction.is_lead
db.commit()
return {
"message": f"{'Marcado' if interaction.is_lead else 'Desmarcado'} como lead",
"is_lead": interaction.is_lead
}
@router.post("/{interaction_id}/mark-as-read")
async def mark_as_read(interaction_id: int, db: Session = Depends(get_db)):
"""Marcar interacción como leída."""
interaction = db.query(Interaction).filter(Interaction.id == interaction_id).first()
if not interaction:
raise HTTPException(status_code=404, detail="Interacción no encontrada")
interaction.is_read = True
db.commit()
return {"message": "Marcado como leído", "interaction_id": interaction_id}
@router.post("/{interaction_id}/archive")
async def archive_interaction(interaction_id: int, db: Session = Depends(get_db)):
"""Archivar una interacción."""
interaction = db.query(Interaction).filter(Interaction.id == interaction_id).first()
if not interaction:
raise HTTPException(status_code=404, detail="Interacción no encontrada")
interaction.is_archived = True
db.commit()
return {"message": "Interacción archivada", "interaction_id": interaction_id}
@router.delete("/{interaction_id}")
async def delete_interaction(interaction_id: int, db: Session = Depends(get_db)):
"""Eliminar una interacción."""
interaction = db.query(Interaction).filter(Interaction.id == interaction_id).first()
if not interaction:
raise HTTPException(status_code=404, detail="Interacción no encontrada")
db.delete(interaction)
db.commit()
return {"message": "Interacción eliminada", "interaction_id": interaction_id}

216
app/api/routes/posts.py Normal file
View File

@@ -0,0 +1,216 @@
"""
API Routes para gestión de Posts.
"""
from datetime import datetime
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from pydantic import BaseModel
from app.core.database import get_db
from app.models.post import Post
router = APIRouter()
# ===========================================
# SCHEMAS
# ===========================================
class PostCreate(BaseModel):
content: str
content_type: str
platforms: List[str]
scheduled_at: Optional[datetime] = None
image_url: Optional[str] = None
approval_required: bool = False
hashtags: Optional[List[str]] = None
class Config:
from_attributes = True
class PostUpdate(BaseModel):
content: Optional[str] = None
content_x: Optional[str] = None
content_threads: Optional[str] = None
content_instagram: Optional[str] = None
content_facebook: Optional[str] = None
platforms: Optional[List[str]] = None
scheduled_at: Optional[datetime] = None
status: Optional[str] = None
image_url: Optional[str] = None
hashtags: Optional[List[str]] = None
class Config:
from_attributes = True
class PostResponse(BaseModel):
id: int
content: str
content_type: str
platforms: List[str]
status: str
scheduled_at: Optional[datetime]
published_at: Optional[datetime]
image_url: Optional[str]
approval_required: bool
created_at: datetime
class Config:
from_attributes = True
# ===========================================
# ENDPOINTS
# ===========================================
@router.get("/", response_model=List[PostResponse])
async def list_posts(
status: Optional[str] = Query(None, description="Filtrar por estado"),
content_type: Optional[str] = Query(None, description="Filtrar por tipo"),
platform: Optional[str] = Query(None, description="Filtrar por plataforma"),
limit: int = Query(50, le=100),
offset: int = Query(0),
db: Session = Depends(get_db)
):
"""Listar posts con filtros opcionales."""
query = db.query(Post)
if status:
query = query.filter(Post.status == status)
if content_type:
query = query.filter(Post.content_type == content_type)
if platform:
query = query.filter(Post.platforms.contains([platform]))
posts = query.order_by(Post.created_at.desc()).offset(offset).limit(limit).all()
return posts
@router.get("/pending")
async def list_pending_posts(db: Session = Depends(get_db)):
"""Listar posts pendientes de aprobación."""
posts = db.query(Post).filter(
Post.status == "pending_approval"
).order_by(Post.scheduled_at.asc()).all()
return [post.to_dict() for post in posts]
@router.get("/scheduled")
async def list_scheduled_posts(db: Session = Depends(get_db)):
"""Listar posts programados."""
posts = db.query(Post).filter(
Post.status == "scheduled",
Post.scheduled_at >= datetime.utcnow()
).order_by(Post.scheduled_at.asc()).all()
return [post.to_dict() for post in posts]
@router.get("/{post_id}")
async def get_post(post_id: int, db: Session = Depends(get_db)):
"""Obtener un post por ID."""
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post no encontrado")
return post.to_dict()
@router.post("/", response_model=PostResponse)
async def create_post(post_data: PostCreate, db: Session = Depends(get_db)):
"""Crear un nuevo post."""
post = Post(
content=post_data.content,
content_type=post_data.content_type,
platforms=post_data.platforms,
scheduled_at=post_data.scheduled_at,
image_url=post_data.image_url,
approval_required=post_data.approval_required,
hashtags=post_data.hashtags,
status="pending_approval" if post_data.approval_required else "scheduled"
)
db.add(post)
db.commit()
db.refresh(post)
return post
@router.put("/{post_id}")
async def update_post(post_id: int, post_data: PostUpdate, db: Session = Depends(get_db)):
"""Actualizar un post."""
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post no encontrado")
update_data = post_data.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(post, field, value)
post.updated_at = datetime.utcnow()
db.commit()
db.refresh(post)
return post.to_dict()
@router.post("/{post_id}/approve")
async def approve_post(post_id: int, db: Session = Depends(get_db)):
"""Aprobar un post pendiente."""
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post no encontrado")
if post.status != "pending_approval":
raise HTTPException(status_code=400, detail="El post no está pendiente de aprobación")
post.status = "scheduled"
post.approved_at = datetime.utcnow()
db.commit()
return {"message": "Post aprobado", "post_id": post_id}
@router.post("/{post_id}/reject")
async def reject_post(post_id: int, db: Session = Depends(get_db)):
"""Rechazar un post pendiente."""
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post no encontrado")
post.status = "cancelled"
db.commit()
return {"message": "Post rechazado", "post_id": post_id}
@router.post("/{post_id}/regenerate")
async def regenerate_post(post_id: int, db: Session = Depends(get_db)):
"""Regenerar contenido de un post con IA."""
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post no encontrado")
# TODO: Implementar regeneración con DeepSeek
# from app.services.content_generator import regenerate_content
# new_content = await regenerate_content(post)
return {"message": "Regeneración en desarrollo", "post_id": post_id}
@router.delete("/{post_id}")
async def delete_post(post_id: int, db: Session = Depends(get_db)):
"""Eliminar un post."""
post = db.query(Post).filter(Post.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post no encontrado")
db.delete(post)
db.commit()
return {"message": "Post eliminado", "post_id": post_id}

180
app/api/routes/products.py Normal file
View File

@@ -0,0 +1,180 @@
"""
API Routes para gestión de Productos.
"""
from datetime import datetime
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from pydantic import BaseModel
from app.core.database import get_db
from app.models.product import Product
router = APIRouter()
# ===========================================
# SCHEMAS
# ===========================================
class ProductCreate(BaseModel):
name: str
description: Optional[str] = None
short_description: Optional[str] = None
category: str
subcategory: Optional[str] = None
brand: Optional[str] = None
model: Optional[str] = None
price: float
price_usd: Optional[float] = None
stock: int = 0
specs: Optional[dict] = None
images: Optional[List[str]] = None
main_image: Optional[str] = None
tags: Optional[List[str]] = None
highlights: Optional[List[str]] = None
is_featured: bool = False
class Config:
from_attributes = True
class ProductUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
short_description: Optional[str] = None
category: Optional[str] = None
subcategory: Optional[str] = None
brand: Optional[str] = None
model: Optional[str] = None
price: Optional[float] = None
price_usd: Optional[float] = None
stock: Optional[int] = None
is_available: Optional[bool] = None
specs: Optional[dict] = None
images: Optional[List[str]] = None
main_image: Optional[str] = None
tags: Optional[List[str]] = None
highlights: Optional[List[str]] = None
is_featured: Optional[bool] = None
class Config:
from_attributes = True
# ===========================================
# ENDPOINTS
# ===========================================
@router.get("/")
async def list_products(
category: Optional[str] = Query(None),
brand: Optional[str] = Query(None),
is_available: Optional[bool] = Query(None),
is_featured: Optional[bool] = Query(None),
min_price: Optional[float] = Query(None),
max_price: Optional[float] = Query(None),
limit: int = Query(50, le=100),
offset: int = Query(0),
db: Session = Depends(get_db)
):
"""Listar productos con filtros."""
query = db.query(Product)
if category:
query = query.filter(Product.category == category)
if brand:
query = query.filter(Product.brand == brand)
if is_available is not None:
query = query.filter(Product.is_available == is_available)
if is_featured is not None:
query = query.filter(Product.is_featured == is_featured)
if min_price is not None:
query = query.filter(Product.price >= min_price)
if max_price is not None:
query = query.filter(Product.price <= max_price)
products = query.order_by(Product.created_at.desc()).offset(offset).limit(limit).all()
return [p.to_dict() for p in products]
@router.get("/categories")
async def list_categories(db: Session = Depends(get_db)):
"""Listar categorías únicas de productos."""
categories = db.query(Product.category).distinct().all()
return [c[0] for c in categories if c[0]]
@router.get("/featured")
async def list_featured_products(db: Session = Depends(get_db)):
"""Listar productos destacados."""
products = db.query(Product).filter(
Product.is_featured == True,
Product.is_available == True
).all()
return [p.to_dict() for p in products]
@router.get("/{product_id}")
async def get_product(product_id: int, db: Session = Depends(get_db)):
"""Obtener un producto por ID."""
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise HTTPException(status_code=404, detail="Producto no encontrado")
return product.to_dict()
@router.post("/")
async def create_product(product_data: ProductCreate, db: Session = Depends(get_db)):
"""Crear un nuevo producto."""
product = Product(**product_data.dict())
db.add(product)
db.commit()
db.refresh(product)
return product.to_dict()
@router.put("/{product_id}")
async def update_product(product_id: int, product_data: ProductUpdate, db: Session = Depends(get_db)):
"""Actualizar un producto."""
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise HTTPException(status_code=404, detail="Producto no encontrado")
update_data = product_data.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(product, field, value)
product.updated_at = datetime.utcnow()
db.commit()
db.refresh(product)
return product.to_dict()
@router.delete("/{product_id}")
async def delete_product(product_id: int, db: Session = Depends(get_db)):
"""Eliminar un producto."""
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise HTTPException(status_code=404, detail="Producto no encontrado")
db.delete(product)
db.commit()
return {"message": "Producto eliminado", "product_id": product_id}
@router.post("/{product_id}/toggle-featured")
async def toggle_featured(product_id: int, db: Session = Depends(get_db)):
"""Alternar estado de producto destacado."""
product = db.query(Product).filter(Product.id == product_id).first()
if not product:
raise HTTPException(status_code=404, detail="Producto no encontrado")
product.is_featured = not product.is_featured
db.commit()
return {
"message": f"Producto {'destacado' if product.is_featured else 'no destacado'}",
"is_featured": product.is_featured
}

174
app/api/routes/services.py Normal file
View File

@@ -0,0 +1,174 @@
"""
API Routes para gestión de Servicios.
"""
from datetime import datetime
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from pydantic import BaseModel
from app.core.database import get_db
from app.models.service import Service
router = APIRouter()
# ===========================================
# SCHEMAS
# ===========================================
class ServiceCreate(BaseModel):
name: str
description: str
short_description: Optional[str] = None
category: str
target_sectors: Optional[List[str]] = None
benefits: Optional[List[str]] = None
features: Optional[List[str]] = None
case_studies: Optional[List[dict]] = None
icon: Optional[str] = None
images: Optional[List[str]] = None
main_image: Optional[str] = None
price_range: Optional[str] = None
has_free_demo: bool = False
tags: Optional[List[str]] = None
call_to_action: Optional[str] = None
is_featured: bool = False
class Config:
from_attributes = True
class ServiceUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
short_description: Optional[str] = None
category: Optional[str] = None
target_sectors: Optional[List[str]] = None
benefits: Optional[List[str]] = None
features: Optional[List[str]] = None
case_studies: Optional[List[dict]] = None
icon: Optional[str] = None
images: Optional[List[str]] = None
main_image: Optional[str] = None
price_range: Optional[str] = None
has_free_demo: Optional[bool] = None
tags: Optional[List[str]] = None
call_to_action: Optional[str] = None
is_active: Optional[bool] = None
is_featured: Optional[bool] = None
class Config:
from_attributes = True
# ===========================================
# ENDPOINTS
# ===========================================
@router.get("/")
async def list_services(
category: Optional[str] = Query(None),
target_sector: Optional[str] = Query(None),
is_active: Optional[bool] = Query(True),
is_featured: Optional[bool] = Query(None),
limit: int = Query(50, le=100),
offset: int = Query(0),
db: Session = Depends(get_db)
):
"""Listar servicios con filtros."""
query = db.query(Service)
if category:
query = query.filter(Service.category == category)
if is_active is not None:
query = query.filter(Service.is_active == is_active)
if is_featured is not None:
query = query.filter(Service.is_featured == is_featured)
if target_sector:
query = query.filter(Service.target_sectors.contains([target_sector]))
services = query.order_by(Service.created_at.desc()).offset(offset).limit(limit).all()
return [s.to_dict() for s in services]
@router.get("/categories")
async def list_categories(db: Session = Depends(get_db)):
"""Listar categorías únicas de servicios."""
categories = db.query(Service.category).distinct().all()
return [c[0] for c in categories if c[0]]
@router.get("/featured")
async def list_featured_services(db: Session = Depends(get_db)):
"""Listar servicios destacados."""
services = db.query(Service).filter(
Service.is_featured == True,
Service.is_active == True
).all()
return [s.to_dict() for s in services]
@router.get("/{service_id}")
async def get_service(service_id: int, db: Session = Depends(get_db)):
"""Obtener un servicio por ID."""
service = db.query(Service).filter(Service.id == service_id).first()
if not service:
raise HTTPException(status_code=404, detail="Servicio no encontrado")
return service.to_dict()
@router.post("/")
async def create_service(service_data: ServiceCreate, db: Session = Depends(get_db)):
"""Crear un nuevo servicio."""
service = Service(**service_data.dict())
db.add(service)
db.commit()
db.refresh(service)
return service.to_dict()
@router.put("/{service_id}")
async def update_service(service_id: int, service_data: ServiceUpdate, db: Session = Depends(get_db)):
"""Actualizar un servicio."""
service = db.query(Service).filter(Service.id == service_id).first()
if not service:
raise HTTPException(status_code=404, detail="Servicio no encontrado")
update_data = service_data.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(service, field, value)
service.updated_at = datetime.utcnow()
db.commit()
db.refresh(service)
return service.to_dict()
@router.delete("/{service_id}")
async def delete_service(service_id: int, db: Session = Depends(get_db)):
"""Eliminar un servicio."""
service = db.query(Service).filter(Service.id == service_id).first()
if not service:
raise HTTPException(status_code=404, detail="Servicio no encontrado")
db.delete(service)
db.commit()
return {"message": "Servicio eliminado", "service_id": service_id}
@router.post("/{service_id}/toggle-featured")
async def toggle_featured(service_id: int, db: Session = Depends(get_db)):
"""Alternar estado de servicio destacado."""
service = db.query(Service).filter(Service.id == service_id).first()
if not service:
raise HTTPException(status_code=404, detail="Servicio no encontrado")
service.is_featured = not service.is_featured
db.commit()
return {
"message": f"Servicio {'destacado' if service.is_featured else 'no destacado'}",
"is_featured": service.is_featured
}

1
app/core/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Core module

60
app/core/config.py Normal file
View File

@@ -0,0 +1,60 @@
"""
Configuración central del sistema.
Carga variables de entorno y define settings globales.
"""
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
"""Configuración de la aplicación."""
# Aplicación
APP_NAME: str = "social-media-automation"
APP_ENV: str = "development"
DEBUG: bool = True
SECRET_KEY: str = "change-this-in-production"
# Base de datos
DATABASE_URL: str = "postgresql://social_user:social_pass@localhost:5432/social_automation"
# Redis
REDIS_URL: str = "redis://localhost:6379/0"
# DeepSeek API
DEEPSEEK_API_KEY: Optional[str] = None
DEEPSEEK_BASE_URL: str = "https://api.deepseek.com/v1"
# X (Twitter) API
X_API_KEY: Optional[str] = None
X_API_SECRET: Optional[str] = None
X_ACCESS_TOKEN: Optional[str] = None
X_ACCESS_TOKEN_SECRET: Optional[str] = None
X_BEARER_TOKEN: Optional[str] = None
# Meta API (Facebook, Instagram, Threads)
META_APP_ID: Optional[str] = None
META_APP_SECRET: Optional[str] = None
META_ACCESS_TOKEN: Optional[str] = None
FACEBOOK_PAGE_ID: Optional[str] = None
INSTAGRAM_ACCOUNT_ID: Optional[str] = None
THREADS_USER_ID: Optional[str] = None
# Información del negocio
BUSINESS_NAME: str = "Consultoría AS"
BUSINESS_LOCATION: str = "Tijuana, México"
BUSINESS_WEBSITE: str = "https://consultoria-as.com"
CONTENT_TONE: str = "profesional pero cercano, educativo, orientado a soluciones"
# Notificaciones
TELEGRAM_BOT_TOKEN: Optional[str] = None
TELEGRAM_CHAT_ID: Optional[str] = None
class Config:
env_file = ".env"
case_sensitive = True
# Instancia global de configuración
settings = Settings()

31
app/core/database.py Normal file
View File

@@ -0,0 +1,31 @@
"""
Configuración de la base de datos PostgreSQL.
"""
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from app.core.config import settings
# Crear engine de SQLAlchemy
engine = create_engine(
settings.DATABASE_URL,
pool_pre_ping=True,
pool_size=10,
max_overflow=20
)
# Crear sesión
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Base para modelos
Base = declarative_base()
def get_db():
"""Dependency para obtener sesión de base de datos."""
db = SessionLocal()
try:
yield db
finally:
db.close()

78
app/core/security.py Normal file
View File

@@ -0,0 +1,78 @@
"""
Configuración de seguridad y autenticación.
"""
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from app.core.config import settings
# Configuración de hashing de contraseñas
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Configuración de JWT
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 horas
# Security scheme
security = HTTPBearer()
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verificar contraseña contra hash."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Generar hash de contraseña."""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Crear token JWT."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def decode_token(token: str) -> dict:
"""Decodificar y validar token JWT."""
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token inválido o expirado",
headers={"WWW-Authenticate": "Bearer"},
)
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> dict:
"""Obtener usuario actual desde el token."""
token = credentials.credentials
payload = decode_token(token)
username = payload.get("sub")
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token inválido"
)
return {"username": username}

71
app/main.py Normal file
View File

@@ -0,0 +1,71 @@
"""
Social Media Automation - Consultoría AS
=========================================
Sistema automatizado para la creación y publicación de contenido
en redes sociales (X, Threads, Instagram, Facebook).
"""
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from app.api.routes import posts, products, services, calendar, dashboard, interactions
from app.core.config import settings
from app.core.database import engine
from app.models import Base
# Crear tablas en la base de datos
Base.metadata.create_all(bind=engine)
# Inicializar aplicación FastAPI
app = FastAPI(
title="Social Media Automation",
description="Sistema de automatización de redes sociales para Consultoría AS",
version="1.0.0",
docs_url="/api/docs",
redoc_url="/api/redoc",
)
# Configurar CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # En producción, especificar dominios
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Montar archivos estáticos
app.mount("/static", StaticFiles(directory="dashboard/static"), name="static")
# Registrar rutas
app.include_router(dashboard.router, prefix="", tags=["Dashboard"])
app.include_router(posts.router, prefix="/api/posts", tags=["Posts"])
app.include_router(products.router, prefix="/api/products", tags=["Products"])
app.include_router(services.router, prefix="/api/services", tags=["Services"])
app.include_router(calendar.router, prefix="/api/calendar", tags=["Calendar"])
app.include_router(interactions.router, prefix="/api/interactions", tags=["Interactions"])
@app.get("/api/health")
async def health_check():
"""Verificar estado del sistema."""
return {
"status": "healthy",
"app": settings.APP_NAME,
"version": "1.0.0"
}
@app.get("/api/stats")
async def get_stats():
"""Obtener estadísticas generales del sistema."""
# TODO: Implementar estadísticas reales desde la BD
return {
"posts_today": 0,
"posts_week": 0,
"posts_month": 0,
"pending_approval": 0,
"scheduled": 0,
"interactions_pending": 0
}

24
app/models/__init__.py Normal file
View File

@@ -0,0 +1,24 @@
"""
Modelos de base de datos SQLAlchemy.
"""
from app.core.database import Base
from app.models.product import Product
from app.models.service import Service
from app.models.tip_template import TipTemplate
from app.models.post import Post
from app.models.content_calendar import ContentCalendar
from app.models.image_template import ImageTemplate
from app.models.interaction import Interaction
__all__ = [
"Base",
"Product",
"Service",
"TipTemplate",
"Post",
"ContentCalendar",
"ImageTemplate",
"Interaction"
]

View File

@@ -0,0 +1,66 @@
"""
Modelo de ContentCalendar - Calendario de publicación.
"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, Time, Boolean, DateTime
from sqlalchemy.dialects.postgresql import ARRAY
from app.core.database import Base
class ContentCalendar(Base):
"""Modelo para el calendario de contenido."""
__tablename__ = "content_calendar"
id = Column(Integer, primary_key=True, index=True)
# Programación
day_of_week = Column(Integer, nullable=False) # 0=Lunes, 6=Domingo
time = Column(Time, nullable=False) # Hora de publicación
# Tipo de contenido
content_type = Column(String(50), nullable=False)
# Tipos: tip_tech, dato_curioso, producto, servicio, etc.
# Plataformas
platforms = Column(ARRAY(String), nullable=False)
# Ejemplo: ["x", "threads", "instagram"]
# Configuración
is_active = Column(Boolean, default=True)
requires_approval = Column(Boolean, default=False)
# Restricciones opcionales
category_filter = Column(String(100), nullable=True) # Solo tips de esta categoría
priority = Column(Integer, default=0) # Mayor = más prioritario
# Descripción
description = Column(String(255), nullable=True)
# Ejemplo: "Tip tech diario matutino"
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<ContentCalendar {self.day_of_week} {self.time} - {self.content_type}>"
def to_dict(self):
"""Convertir a diccionario."""
days = ["Lunes", "Martes", "Miércoles", "Jueves", "Viernes", "Sábado", "Domingo"]
return {
"id": self.id,
"day_of_week": self.day_of_week,
"day_name": days[self.day_of_week] if 0 <= self.day_of_week <= 6 else "Desconocido",
"time": self.time.strftime("%H:%M") if self.time else None,
"content_type": self.content_type,
"platforms": self.platforms,
"is_active": self.is_active,
"requires_approval": self.requires_approval,
"category_filter": self.category_filter,
"priority": self.priority,
"description": self.description,
"created_at": self.created_at.isoformat() if self.created_at else None
}

View File

@@ -0,0 +1,75 @@
"""
Modelo de ImageTemplate - Plantillas para generar imágenes.
"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, JSON
from sqlalchemy.dialects.postgresql import ARRAY
from app.core.database import Base
class ImageTemplate(Base):
"""Modelo para plantillas de imágenes."""
__tablename__ = "image_templates"
id = Column(Integer, primary_key=True, index=True)
# Información básica
name = Column(String(100), nullable=False, index=True)
description = Column(String(255), nullable=True)
# Categoría
category = Column(String(50), nullable=False)
# Categorías: tip, producto, servicio, promocion, etc.
# Archivo de plantilla
template_file = Column(String(255), nullable=False) # Ruta al archivo HTML/template
# Variables que acepta la plantilla
variables = Column(ARRAY(String), nullable=False)
# Ejemplo: ["titulo", "contenido", "hashtags", "logo"]
# Configuración de diseño
design_config = Column(JSON, nullable=True)
# Ejemplo: {
# "width": 1080,
# "height": 1080,
# "background_color": "#1a1a2e",
# "accent_color": "#d4a574",
# "font_family": "Inter"
# }
# Tamaños de salida
output_sizes = Column(JSON, nullable=True)
# Ejemplo: {
# "instagram": {"width": 1080, "height": 1080},
# "x": {"width": 1200, "height": 675},
# "facebook": {"width": 1200, "height": 630}
# }
# Estado
is_active = Column(Boolean, default=True)
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<ImageTemplate {self.name}>"
def to_dict(self):
"""Convertir a diccionario."""
return {
"id": self.id,
"name": self.name,
"description": self.description,
"category": self.category,
"template_file": self.template_file,
"variables": self.variables,
"design_config": self.design_config,
"output_sizes": self.output_sizes,
"is_active": self.is_active,
"created_at": self.created_at.isoformat() if self.created_at else None
}

87
app/models/interaction.py Normal file
View File

@@ -0,0 +1,87 @@
"""
Modelo de Interaction - Interacciones en redes sociales.
"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, ForeignKey
from app.core.database import Base
class Interaction(Base):
"""Modelo para interacciones (comentarios, DMs, menciones)."""
__tablename__ = "interactions"
id = Column(Integer, primary_key=True, index=True)
# Plataforma
platform = Column(String(50), nullable=False, index=True)
# Valores: x, threads, instagram, facebook
# Tipo de interacción
interaction_type = Column(String(50), nullable=False, index=True)
# Tipos: comment, reply, dm, mention, like, repost, quote
# Post relacionado (si aplica)
post_id = Column(Integer, ForeignKey("posts.id"), nullable=True)
# ID externo en la plataforma
external_id = Column(String(100), nullable=False, unique=True)
external_post_id = Column(String(100), nullable=True) # ID del post en la plataforma
# Autor de la interacción
author_username = Column(String(100), nullable=False)
author_name = Column(String(255), nullable=True)
author_profile_url = Column(String(500), nullable=True)
author_avatar_url = Column(String(500), nullable=True)
# Contenido
content = Column(Text, nullable=True)
# Respuesta
responded = Column(Boolean, default=False, index=True)
response_content = Column(Text, nullable=True)
responded_at = Column(DateTime, nullable=True)
response_external_id = Column(String(100), nullable=True)
# Clasificación
is_lead = Column(Boolean, default=False) # Potencial cliente
sentiment = Column(String(50), nullable=True) # positive, negative, neutral
priority = Column(Integer, default=0) # 0=normal, 1=importante, 2=urgente
# Estado
is_read = Column(Boolean, default=False)
is_archived = Column(Boolean, default=False)
# Timestamps
interaction_at = Column(DateTime, nullable=False) # Cuándo ocurrió en la plataforma
created_at = Column(DateTime, default=datetime.utcnow) # Cuándo se guardó aquí
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<Interaction {self.platform} - {self.interaction_type} - {self.author_username}>"
def to_dict(self):
"""Convertir a diccionario."""
return {
"id": self.id,
"platform": self.platform,
"interaction_type": self.interaction_type,
"post_id": self.post_id,
"external_id": self.external_id,
"author_username": self.author_username,
"author_name": self.author_name,
"author_profile_url": self.author_profile_url,
"author_avatar_url": self.author_avatar_url,
"content": self.content,
"responded": self.responded,
"response_content": self.response_content,
"responded_at": self.responded_at.isoformat() if self.responded_at else None,
"is_lead": self.is_lead,
"sentiment": self.sentiment,
"priority": self.priority,
"is_read": self.is_read,
"interaction_at": self.interaction_at.isoformat() if self.interaction_at else None,
"created_at": self.created_at.isoformat() if self.created_at else None
}

134
app/models/post.py Normal file
View File

@@ -0,0 +1,134 @@
"""
Modelo de Post - Posts generados y programados.
"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, ForeignKey, JSON, Enum
from sqlalchemy.dialects.postgresql import ARRAY
import enum
from app.core.database import Base
class PostStatus(enum.Enum):
"""Estados posibles de un post."""
DRAFT = "draft"
PENDING_APPROVAL = "pending_approval"
APPROVED = "approved"
SCHEDULED = "scheduled"
PUBLISHING = "publishing"
PUBLISHED = "published"
FAILED = "failed"
CANCELLED = "cancelled"
class ContentType(enum.Enum):
"""Tipos de contenido."""
TIP_TECH = "tip_tech"
DATO_CURIOSO = "dato_curioso"
FRASE_MOTIVACIONAL = "frase_motivacional"
EFEMERIDE = "efemeride"
PRODUCTO = "producto"
SERVICIO = "servicio"
HILO_EDUCATIVO = "hilo_educativo"
CASO_EXITO = "caso_exito"
PROMOCION = "promocion"
ANUNCIO = "anuncio"
MANUAL = "manual"
class Post(Base):
"""Modelo para posts de redes sociales."""
__tablename__ = "posts"
id = Column(Integer, primary_key=True, index=True)
# Contenido
content = Column(Text, nullable=False)
content_type = Column(String(50), nullable=False, index=True)
# Contenido adaptado por plataforma (opcional)
content_x = Column(Text, nullable=True) # Versión para X (280 chars)
content_threads = Column(Text, nullable=True)
content_instagram = Column(Text, nullable=True)
content_facebook = Column(Text, nullable=True)
# Plataformas destino
platforms = Column(ARRAY(String), nullable=False)
# Ejemplo: ["x", "threads", "instagram", "facebook"]
# Estado y programación
status = Column(String(50), default="draft", index=True)
scheduled_at = Column(DateTime, nullable=True, index=True)
published_at = Column(DateTime, nullable=True)
# Imagen
image_url = Column(String(500), nullable=True)
image_template_id = Column(Integer, ForeignKey("image_templates.id"), nullable=True)
# IDs de publicación en cada plataforma
platform_post_ids = Column(JSON, nullable=True)
# Ejemplo: {"x": "123456", "instagram": "789012", ...}
# Errores de publicación
error_message = Column(Text, nullable=True)
retry_count = Column(Integer, default=0)
# Aprobación
approval_required = Column(Boolean, default=False)
approved_by = Column(String(100), nullable=True)
approved_at = Column(DateTime, nullable=True)
# Relaciones con contenido fuente
product_id = Column(Integer, ForeignKey("products.id"), nullable=True)
service_id = Column(Integer, ForeignKey("services.id"), nullable=True)
tip_template_id = Column(Integer, ForeignKey("tip_templates.id"), nullable=True)
# Metadatos
hashtags = Column(ARRAY(String), nullable=True)
mentions = Column(ARRAY(String), nullable=True)
# Métricas (actualizadas después de publicar)
metrics = Column(JSON, nullable=True)
# Ejemplo: {"likes": 10, "retweets": 5, "comments": 3}
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<Post {self.id} - {self.status}>"
def to_dict(self):
"""Convertir a diccionario."""
return {
"id": self.id,
"content": self.content,
"content_type": self.content_type,
"content_x": self.content_x,
"content_threads": self.content_threads,
"content_instagram": self.content_instagram,
"content_facebook": self.content_facebook,
"platforms": self.platforms,
"status": self.status,
"scheduled_at": self.scheduled_at.isoformat() if self.scheduled_at else None,
"published_at": self.published_at.isoformat() if self.published_at else None,
"image_url": self.image_url,
"platform_post_ids": self.platform_post_ids,
"error_message": self.error_message,
"approval_required": self.approval_required,
"hashtags": self.hashtags,
"metrics": self.metrics,
"created_at": self.created_at.isoformat() if self.created_at else None
}
def get_content_for_platform(self, platform: str) -> str:
"""Obtener contenido adaptado para una plataforma específica."""
platform_content = {
"x": self.content_x,
"threads": self.content_threads,
"instagram": self.content_instagram,
"facebook": self.content_facebook
}
return platform_content.get(platform) or self.content

80
app/models/product.py Normal file
View File

@@ -0,0 +1,80 @@
"""
Modelo de Producto - Catálogo de equipos de cómputo e impresoras 3D.
"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, Float, Boolean, DateTime, JSON
from sqlalchemy.dialects.postgresql import ARRAY
from app.core.database import Base
class Product(Base):
"""Modelo para productos del catálogo."""
__tablename__ = "products"
id = Column(Integer, primary_key=True, index=True)
# Información básica
name = Column(String(255), nullable=False, index=True)
description = Column(Text, nullable=True)
short_description = Column(String(500), nullable=True) # Para posts
# Categorización
category = Column(String(100), nullable=False, index=True) # laptop, desktop, impresora_3d, etc.
subcategory = Column(String(100), nullable=True)
brand = Column(String(100), nullable=True)
model = Column(String(100), nullable=True)
# Precio y stock
price = Column(Float, nullable=False)
price_usd = Column(Float, nullable=True) # Precio en dólares (opcional)
stock = Column(Integer, default=0)
is_available = Column(Boolean, default=True)
# Especificaciones técnicas (JSON flexible)
specs = Column(JSON, nullable=True)
# Ejemplo: {"cpu": "Intel i7", "ram": "16GB", "storage": "512GB SSD"}
# Imágenes
images = Column(ARRAY(String), nullable=True) # URLs de imágenes
main_image = Column(String(500), nullable=True)
# SEO y marketing
tags = Column(ARRAY(String), nullable=True) # Tags para búsqueda
highlights = Column(ARRAY(String), nullable=True) # Puntos destacados
# Control de publicación
is_featured = Column(Boolean, default=False) # Producto destacado
last_posted_at = Column(DateTime, nullable=True) # Última vez que se publicó
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<Product {self.name}>"
def to_dict(self):
"""Convertir a diccionario."""
return {
"id": self.id,
"name": self.name,
"description": self.description,
"short_description": self.short_description,
"category": self.category,
"subcategory": self.subcategory,
"brand": self.brand,
"model": self.model,
"price": self.price,
"stock": self.stock,
"is_available": self.is_available,
"specs": self.specs,
"images": self.images,
"main_image": self.main_image,
"tags": self.tags,
"highlights": self.highlights,
"is_featured": self.is_featured,
"created_at": self.created_at.isoformat() if self.created_at else None
}

90
app/models/service.py Normal file
View File

@@ -0,0 +1,90 @@
"""
Modelo de Servicio - Servicios de consultoría de Consultoría AS.
"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime, JSON
from sqlalchemy.dialects.postgresql import ARRAY
from app.core.database import Base
class Service(Base):
"""Modelo para servicios de consultoría."""
__tablename__ = "services"
id = Column(Integer, primary_key=True, index=True)
# Información básica
name = Column(String(255), nullable=False, index=True)
description = Column(Text, nullable=False)
short_description = Column(String(500), nullable=True) # Para posts
# Categorización
category = Column(String(100), nullable=False, index=True)
# Categorías: ia, automatizacion, desarrollo, iot, voip, crm, etc.
# Sectores objetivo
target_sectors = Column(ARRAY(String), nullable=True)
# Ejemplo: ["hoteles", "construccion", "logistica", "retail"]
# Beneficios y características
benefits = Column(ARRAY(String), nullable=True)
# Ejemplo: ["Ahorro de tiempo", "Reducción de errores", "24/7"]
features = Column(ARRAY(String), nullable=True)
# Características técnicas del servicio
# Casos de éxito
case_studies = Column(JSON, nullable=True)
# Ejemplo: [{"client": "Hotel X", "result": "50% menos tiempo en reservas"}]
# Imágenes e íconos
icon = Column(String(100), nullable=True) # Nombre del ícono
images = Column(ARRAY(String), nullable=True)
main_image = Column(String(500), nullable=True)
# Precios (opcional, puede ser "cotizar")
price_range = Column(String(100), nullable=True) # "Desde $5,000 MXN"
has_free_demo = Column(Boolean, default=False)
# SEO y marketing
tags = Column(ARRAY(String), nullable=True)
call_to_action = Column(String(255), nullable=True) # "Agenda una demo"
# Control de publicación
is_active = Column(Boolean, default=True)
is_featured = Column(Boolean, default=False)
last_posted_at = Column(DateTime, nullable=True)
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<Service {self.name}>"
def to_dict(self):
"""Convertir a diccionario."""
return {
"id": self.id,
"name": self.name,
"description": self.description,
"short_description": self.short_description,
"category": self.category,
"target_sectors": self.target_sectors,
"benefits": self.benefits,
"features": self.features,
"case_studies": self.case_studies,
"icon": self.icon,
"images": self.images,
"main_image": self.main_image,
"price_range": self.price_range,
"has_free_demo": self.has_free_demo,
"tags": self.tags,
"call_to_action": self.call_to_action,
"is_active": self.is_active,
"is_featured": self.is_featured,
"created_at": self.created_at.isoformat() if self.created_at else None
}

View File

@@ -0,0 +1,77 @@
"""
Modelo de TipTemplate - Banco de tips para generar contenido automático.
"""
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, Boolean, DateTime
from sqlalchemy.dialects.postgresql import ARRAY
from app.core.database import Base
class TipTemplate(Base):
"""Modelo para templates de tips tech."""
__tablename__ = "tip_templates"
id = Column(Integer, primary_key=True, index=True)
# Categoría del tip
category = Column(String(100), nullable=False, index=True)
# Categorías: hardware, software, seguridad, productividad, ia, redes, etc.
subcategory = Column(String(100), nullable=True)
# Contenido del tip
title = Column(String(255), nullable=False) # Título corto
template = Column(Text, nullable=False) # Template con variables
# Variables que se pueden reemplazar
# Ejemplo: ["sistema_operativo", "herramienta", "beneficio"]
variables = Column(ARRAY(String), nullable=True)
# Variaciones del tip (diferentes formas de decirlo)
variations = Column(ARRAY(String), nullable=True)
# Metadatos
difficulty = Column(String(50), nullable=True) # basico, intermedio, avanzado
target_audience = Column(String(100), nullable=True) # empresas, desarrolladores, general
# Control de uso
used_count = Column(Integer, default=0)
last_used = Column(DateTime, nullable=True)
# Plataformas recomendadas
recommended_platforms = Column(ARRAY(String), nullable=True)
# Ejemplo: ["x", "threads", "instagram"]
# Estado
is_active = Column(Boolean, default=True)
is_evergreen = Column(Boolean, default=True) # ¿Siempre relevante?
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<TipTemplate {self.title}>"
def to_dict(self):
"""Convertir a diccionario."""
return {
"id": self.id,
"category": self.category,
"subcategory": self.subcategory,
"title": self.title,
"template": self.template,
"variables": self.variables,
"variations": self.variations,
"difficulty": self.difficulty,
"target_audience": self.target_audience,
"used_count": self.used_count,
"last_used": self.last_used.isoformat() if self.last_used else None,
"recommended_platforms": self.recommended_platforms,
"is_active": self.is_active,
"is_evergreen": self.is_evergreen,
"created_at": self.created_at.isoformat() if self.created_at else None
}

View File

@@ -0,0 +1,44 @@
Eres el Community Manager de Consultoría AS, una empresa de tecnología ubicada en Tijuana, México.
SOBRE LA EMPRESA:
- Especializada en soluciones de IA, automatización y transformación digital
- Vende equipos de cómputo e impresoras 3D
- Más de 6 años de experiencia
- Soporte 24/7
- Sitio web: https://consultoria-as.com
SERVICIOS PRINCIPALES:
- Integración de IA y automatización inteligente
- Sistemas de gestión para hoteles (nómina, inventario, dashboards)
- Software para construcción (gestión de proyectos, diagramas Gantt)
- Chatbots para WhatsApp e Instagram
- Soluciones IoT
- Monitoreo GPS de flotillas
- Sistemas CRM personalizados
- Comunicaciones VoIP empresariales
PRODUCTOS:
- Laptops y computadoras de escritorio
- Impresoras 3D
- Accesorios y periféricos
TONO DE COMUNICACIÓN:
- Profesional pero cercano
- Educativo y de valor
- Orientado a soluciones, no a vender directamente
- Uso moderado de emojis (1-3 por post)
- Hashtags relevantes (máximo 3-5)
ESTILO INSPIRADO EN:
- @midudev: Tips cortos y accionables
- @MoureDev: Contenido educativo y de comunidad
- @SoyDalto: Accesible para todos los niveles
- @SuGE3K: Tips prácticos y herramientas
REGLAS IMPORTANTES:
- Nunca uses lenguaje ofensivo
- No hagas promesas exageradas
- Sé honesto y transparente
- Enfócate en ayudar y educar
- Adapta el contenido a cada plataforma
- No inventes información técnica

View File

@@ -0,0 +1,35 @@
"""
Publishers para cada plataforma de redes sociales.
"""
from app.publishers.base import BasePublisher
from app.publishers.x_publisher import XPublisher
from app.publishers.threads_publisher import ThreadsPublisher
from app.publishers.instagram_publisher import InstagramPublisher
from app.publishers.facebook_publisher import FacebookPublisher
def get_publisher(platform: str) -> BasePublisher:
"""Obtener el publisher para una plataforma específica."""
publishers = {
"x": XPublisher(),
"threads": ThreadsPublisher(),
"instagram": InstagramPublisher(),
"facebook": FacebookPublisher()
}
publisher = publishers.get(platform.lower())
if not publisher:
raise ValueError(f"Plataforma no soportada: {platform}")
return publisher
__all__ = [
"BasePublisher",
"XPublisher",
"ThreadsPublisher",
"InstagramPublisher",
"FacebookPublisher",
"get_publisher"
]

74
app/publishers/base.py Normal file
View File

@@ -0,0 +1,74 @@
"""
Clase base para publishers de redes sociales.
"""
from abc import ABC, abstractmethod
from typing import Optional, List, Dict
from dataclasses import dataclass
@dataclass
class PublishResult:
"""Resultado de una publicación."""
success: bool
post_id: Optional[str] = None
url: Optional[str] = None
error_message: Optional[str] = None
class BasePublisher(ABC):
"""Clase base abstracta para publishers."""
platform: str = "base"
@abstractmethod
async def publish(
self,
content: str,
image_path: Optional[str] = None
) -> PublishResult:
"""Publicar contenido en la plataforma."""
pass
@abstractmethod
async def publish_thread(
self,
posts: List[str],
images: Optional[List[str]] = None
) -> PublishResult:
"""Publicar un hilo de posts."""
pass
@abstractmethod
async def reply(
self,
post_id: str,
content: str
) -> PublishResult:
"""Responder a un post."""
pass
@abstractmethod
async def like(self, post_id: str) -> bool:
"""Dar like a un post."""
pass
@abstractmethod
async def get_mentions(self, since_id: Optional[str] = None) -> List[Dict]:
"""Obtener menciones recientes."""
pass
@abstractmethod
async def get_comments(self, post_id: str) -> List[Dict]:
"""Obtener comentarios de un post."""
pass
@abstractmethod
async def delete(self, post_id: str) -> bool:
"""Eliminar un post."""
pass
def validate_content(self, content: str) -> bool:
"""Validar que el contenido cumple con los límites de la plataforma."""
# Implementar en subclases según límites específicos
return True

View File

@@ -0,0 +1,252 @@
"""
Publisher para Facebook Pages (Meta Graph API).
"""
from typing import Optional, List, Dict
import httpx
from app.core.config import settings
from app.publishers.base import BasePublisher, PublishResult
class FacebookPublisher(BasePublisher):
"""Publisher para Facebook Pages usando Meta Graph API."""
platform = "facebook"
char_limit = 63206 # Límite real de Facebook
base_url = "https://graph.facebook.com/v18.0"
def __init__(self):
self.access_token = settings.META_ACCESS_TOKEN
self.page_id = settings.FACEBOOK_PAGE_ID
def validate_content(self, content: str) -> bool:
"""Validar longitud del post."""
return len(content) <= self.char_limit
async def publish(
self,
content: str,
image_path: Optional[str] = None
) -> PublishResult:
"""Publicar en Facebook Page."""
if not self.access_token or not self.page_id:
return PublishResult(
success=False,
error_message="Credenciales de Facebook no configuradas"
)
try:
async with httpx.AsyncClient() as client:
if image_path:
# Publicar con imagen
url = f"{self.base_url}/{self.page_id}/photos"
payload = {
"caption": content,
"url": image_path, # URL pública de la imagen
"access_token": self.access_token
}
else:
# Publicar solo texto
url = f"{self.base_url}/{self.page_id}/feed"
payload = {
"message": content,
"access_token": self.access_token
}
response = await client.post(url, data=payload)
response.raise_for_status()
post_id = response.json().get("id")
return PublishResult(
success=True,
post_id=post_id,
url=f"https://www.facebook.com/{post_id}"
)
except httpx.HTTPError as e:
return PublishResult(
success=False,
error_message=str(e)
)
async def publish_thread(
self,
posts: List[str],
images: Optional[List[str]] = None
) -> PublishResult:
"""Publicar como un solo post largo en Facebook."""
# Facebook no tiene threads, concatenamos el contenido
combined_content = "\n\n".join(posts)
# Usar la primera imagen si existe
image = images[0] if images else None
return await self.publish(combined_content, image)
async def reply(
self,
post_id: str,
content: str
) -> PublishResult:
"""Responder a un comentario en Facebook."""
if not self.access_token:
return PublishResult(
success=False,
error_message="Token de acceso no configurado"
)
try:
async with httpx.AsyncClient() as client:
url = f"{self.base_url}/{post_id}/comments"
payload = {
"message": content,
"access_token": self.access_token
}
response = await client.post(url, data=payload)
response.raise_for_status()
comment_id = response.json().get("id")
return PublishResult(
success=True,
post_id=comment_id
)
except httpx.HTTPError as e:
return PublishResult(
success=False,
error_message=str(e)
)
async def like(self, post_id: str) -> bool:
"""Dar like a un post/comentario."""
if not self.access_token:
return False
try:
async with httpx.AsyncClient() as client:
url = f"{self.base_url}/{post_id}/likes"
payload = {"access_token": self.access_token}
response = await client.post(url, data=payload)
response.raise_for_status()
return response.json().get("success", False)
except httpx.HTTPError:
return False
async def get_mentions(self, since_id: Optional[str] = None) -> List[Dict]:
"""Obtener menciones de la página."""
if not self.access_token or not self.page_id:
return []
try:
async with httpx.AsyncClient() as client:
url = f"{self.base_url}/{self.page_id}/tagged"
params = {
"fields": "id,message,from,created_time,permalink_url",
"access_token": self.access_token
}
response = await client.get(url, params=params)
response.raise_for_status()
data = response.json()
return data.get("data", [])
except httpx.HTTPError:
return []
async def get_comments(self, post_id: str) -> List[Dict]:
"""Obtener comentarios de un post."""
if not self.access_token:
return []
try:
async with httpx.AsyncClient() as client:
url = f"{self.base_url}/{post_id}/comments"
params = {
"fields": "id,message,from,created_time,like_count",
"access_token": self.access_token
}
response = await client.get(url, params=params)
response.raise_for_status()
data = response.json()
return data.get("data", [])
except httpx.HTTPError:
return []
async def get_page_messages(self) -> List[Dict]:
"""Obtener mensajes de la página (inbox)."""
if not self.access_token or not self.page_id:
return []
try:
async with httpx.AsyncClient() as client:
url = f"{self.base_url}/{self.page_id}/conversations"
params = {
"fields": "id,participants,messages{message,from,created_time}",
"access_token": self.access_token
}
response = await client.get(url, params=params)
response.raise_for_status()
data = response.json()
return data.get("data", [])
except httpx.HTTPError:
return []
async def send_message(self, recipient_id: str, message: str) -> PublishResult:
"""Enviar mensaje directo a un usuario."""
if not self.access_token or not self.page_id:
return PublishResult(
success=False,
error_message="Credenciales no configuradas"
)
try:
async with httpx.AsyncClient() as client:
url = f"{self.base_url}/{self.page_id}/messages"
payload = {
"recipient": {"id": recipient_id},
"message": {"text": message},
"access_token": self.access_token
}
response = await client.post(url, json=payload)
response.raise_for_status()
message_id = response.json().get("message_id")
return PublishResult(
success=True,
post_id=message_id
)
except httpx.HTTPError as e:
return PublishResult(
success=False,
error_message=str(e)
)
async def delete(self, post_id: str) -> bool:
"""Eliminar un post."""
if not self.access_token:
return False
try:
async with httpx.AsyncClient() as client:
url = f"{self.base_url}/{post_id}"
params = {"access_token": self.access_token}
response = await client.delete(url, params=params)
response.raise_for_status()
return response.json().get("success", False)
except httpx.HTTPError:
return False

View File

@@ -0,0 +1,240 @@
"""
Publisher para Instagram (Meta Graph API).
"""
from typing import Optional, List, Dict
import httpx
from app.core.config import settings
from app.publishers.base import BasePublisher, PublishResult
class InstagramPublisher(BasePublisher):
"""Publisher para Instagram usando Meta Graph API."""
platform = "instagram"
char_limit = 2200
base_url = "https://graph.facebook.com/v18.0"
def __init__(self):
self.access_token = settings.META_ACCESS_TOKEN
self.account_id = settings.INSTAGRAM_ACCOUNT_ID
def validate_content(self, content: str) -> bool:
"""Validar longitud del caption."""
return len(content) <= self.char_limit
async def publish(
self,
content: str,
image_path: Optional[str] = None
) -> PublishResult:
"""Publicar en Instagram (requiere imagen)."""
if not self.access_token or not self.account_id:
return PublishResult(
success=False,
error_message="Credenciales de Instagram no configuradas"
)
if not image_path:
return PublishResult(
success=False,
error_message="Instagram requiere una imagen para publicar"
)
try:
async with httpx.AsyncClient() as client:
# Paso 1: Crear contenedor de media
# Nota: La imagen debe estar en una URL pública
create_url = f"{self.base_url}/{self.account_id}/media"
payload = {
"caption": content,
"image_url": image_path, # Debe ser URL pública
"access_token": self.access_token
}
response = await client.post(create_url, data=payload)
response.raise_for_status()
container_id = response.json().get("id")
# Paso 2: Publicar el contenedor
publish_url = f"{self.base_url}/{self.account_id}/media_publish"
publish_payload = {
"creation_id": container_id,
"access_token": self.access_token
}
response = await client.post(publish_url, data=publish_payload)
response.raise_for_status()
post_id = response.json().get("id")
return PublishResult(
success=True,
post_id=post_id,
url=f"https://www.instagram.com/p/{post_id}"
)
except httpx.HTTPError as e:
return PublishResult(
success=False,
error_message=str(e)
)
async def publish_thread(
self,
posts: List[str],
images: Optional[List[str]] = None
) -> PublishResult:
"""Publicar carrusel en Instagram."""
if not self.access_token or not self.account_id:
return PublishResult(
success=False,
error_message="Credenciales de Instagram no configuradas"
)
if not images or len(images) < 2:
return PublishResult(
success=False,
error_message="Un carrusel de Instagram requiere al menos 2 imágenes"
)
try:
async with httpx.AsyncClient() as client:
# Paso 1: Crear contenedores para cada imagen
children_ids = []
for image_url in images[:10]: # Máximo 10 imágenes
create_url = f"{self.base_url}/{self.account_id}/media"
payload = {
"image_url": image_url,
"is_carousel_item": True,
"access_token": self.access_token
}
response = await client.post(create_url, data=payload)
response.raise_for_status()
children_ids.append(response.json().get("id"))
# Paso 2: Crear contenedor del carrusel
carousel_url = f"{self.base_url}/{self.account_id}/media"
carousel_payload = {
"media_type": "CAROUSEL",
"caption": posts[0] if posts else "",
"children": ",".join(children_ids),
"access_token": self.access_token
}
response = await client.post(carousel_url, data=carousel_payload)
response.raise_for_status()
carousel_id = response.json().get("id")
# Paso 3: Publicar
publish_url = f"{self.base_url}/{self.account_id}/media_publish"
publish_payload = {
"creation_id": carousel_id,
"access_token": self.access_token
}
response = await client.post(publish_url, data=publish_payload)
response.raise_for_status()
post_id = response.json().get("id")
return PublishResult(
success=True,
post_id=post_id
)
except httpx.HTTPError as e:
return PublishResult(
success=False,
error_message=str(e)
)
async def reply(
self,
post_id: str,
content: str
) -> PublishResult:
"""Responder a un comentario en Instagram."""
if not self.access_token:
return PublishResult(
success=False,
error_message="Token de acceso no configurado"
)
try:
async with httpx.AsyncClient() as client:
url = f"{self.base_url}/{post_id}/replies"
payload = {
"message": content,
"access_token": self.access_token
}
response = await client.post(url, data=payload)
response.raise_for_status()
reply_id = response.json().get("id")
return PublishResult(
success=True,
post_id=reply_id
)
except httpx.HTTPError as e:
return PublishResult(
success=False,
error_message=str(e)
)
async def like(self, post_id: str) -> bool:
"""Dar like (no disponible vía API para cuentas de negocio)."""
return False
async def get_mentions(self, since_id: Optional[str] = None) -> List[Dict]:
"""Obtener menciones en comentarios e historias."""
if not self.access_token or not self.account_id:
return []
try:
async with httpx.AsyncClient() as client:
url = f"{self.base_url}/{self.account_id}/tags"
params = {
"fields": "id,caption,media_type,permalink,timestamp,username",
"access_token": self.access_token
}
response = await client.get(url, params=params)
response.raise_for_status()
data = response.json()
return data.get("data", [])
except httpx.HTTPError:
return []
async def get_comments(self, post_id: str) -> List[Dict]:
"""Obtener comentarios de un post."""
if not self.access_token:
return []
try:
async with httpx.AsyncClient() as client:
url = f"{self.base_url}/{post_id}/comments"
params = {
"fields": "id,text,username,timestamp,like_count",
"access_token": self.access_token
}
response = await client.get(url, params=params)
response.raise_for_status()
data = response.json()
return data.get("data", [])
except httpx.HTTPError:
return []
async def delete(self, post_id: str) -> bool:
"""Eliminar un post (no disponible vía API)."""
# La API de Instagram no permite eliminar posts
return False

View File

@@ -0,0 +1,227 @@
"""
Publisher para Threads (Meta).
"""
from typing import Optional, List, Dict
import httpx
from app.core.config import settings
from app.publishers.base import BasePublisher, PublishResult
class ThreadsPublisher(BasePublisher):
"""Publisher para Threads usando Meta Graph API."""
platform = "threads"
char_limit = 500
base_url = "https://graph.threads.net/v1.0"
def __init__(self):
self.access_token = settings.META_ACCESS_TOKEN
self.user_id = settings.THREADS_USER_ID
def validate_content(self, content: str) -> bool:
"""Validar longitud del post."""
return len(content) <= self.char_limit
async def publish(
self,
content: str,
image_path: Optional[str] = None
) -> PublishResult:
"""Publicar en Threads."""
if not self.access_token or not self.user_id:
return PublishResult(
success=False,
error_message="Credenciales de Threads no configuradas"
)
try:
async with httpx.AsyncClient() as client:
# Paso 1: Crear el contenedor del post
create_url = f"{self.base_url}/{self.user_id}/threads"
payload = {
"text": content,
"media_type": "TEXT",
"access_token": self.access_token
}
# Si hay imagen, subirla primero
if image_path:
# TODO: Implementar subida de imagen a Threads
payload["media_type"] = "IMAGE"
# payload["image_url"] = uploaded_image_url
response = await client.post(create_url, data=payload)
response.raise_for_status()
container_id = response.json().get("id")
# Paso 2: Publicar el contenedor
publish_url = f"{self.base_url}/{self.user_id}/threads_publish"
publish_payload = {
"creation_id": container_id,
"access_token": self.access_token
}
response = await client.post(publish_url, data=publish_payload)
response.raise_for_status()
post_id = response.json().get("id")
return PublishResult(
success=True,
post_id=post_id,
url=f"https://www.threads.net/post/{post_id}"
)
except httpx.HTTPError as e:
return PublishResult(
success=False,
error_message=str(e)
)
async def publish_thread(
self,
posts: List[str],
images: Optional[List[str]] = None
) -> PublishResult:
"""Publicar un hilo en Threads."""
if not self.access_token or not self.user_id:
return PublishResult(
success=False,
error_message="Credenciales de Threads no configuradas"
)
try:
first_post_id = None
reply_to_id = None
for i, post in enumerate(posts):
async with httpx.AsyncClient() as client:
create_url = f"{self.base_url}/{self.user_id}/threads"
payload = {
"text": post,
"media_type": "TEXT",
"access_token": self.access_token
}
if reply_to_id:
payload["reply_to_id"] = reply_to_id
response = await client.post(create_url, data=payload)
response.raise_for_status()
container_id = response.json().get("id")
# Publicar
publish_url = f"{self.base_url}/{self.user_id}/threads_publish"
publish_payload = {
"creation_id": container_id,
"access_token": self.access_token
}
response = await client.post(publish_url, data=publish_payload)
response.raise_for_status()
post_id = response.json().get("id")
if i == 0:
first_post_id = post_id
reply_to_id = post_id
return PublishResult(
success=True,
post_id=first_post_id,
url=f"https://www.threads.net/post/{first_post_id}"
)
except httpx.HTTPError as e:
return PublishResult(
success=False,
error_message=str(e)
)
async def reply(
self,
post_id: str,
content: str
) -> PublishResult:
"""Responder a un post en Threads."""
if not self.access_token or not self.user_id:
return PublishResult(
success=False,
error_message="Credenciales de Threads no configuradas"
)
try:
async with httpx.AsyncClient() as client:
create_url = f"{self.base_url}/{self.user_id}/threads"
payload = {
"text": content,
"media_type": "TEXT",
"reply_to_id": post_id,
"access_token": self.access_token
}
response = await client.post(create_url, data=payload)
response.raise_for_status()
container_id = response.json().get("id")
# Publicar
publish_url = f"{self.base_url}/{self.user_id}/threads_publish"
publish_payload = {
"creation_id": container_id,
"access_token": self.access_token
}
response = await client.post(publish_url, data=publish_payload)
response.raise_for_status()
reply_id = response.json().get("id")
return PublishResult(
success=True,
post_id=reply_id
)
except httpx.HTTPError as e:
return PublishResult(
success=False,
error_message=str(e)
)
async def like(self, post_id: str) -> bool:
"""Dar like a un post (no soportado actualmente por la API)."""
# La API de Threads no soporta likes programáticos actualmente
return False
async def get_mentions(self, since_id: Optional[str] = None) -> List[Dict]:
"""Obtener menciones (limitado en la API de Threads)."""
# TODO: Implementar cuando la API lo soporte
return []
async def get_comments(self, post_id: str) -> List[Dict]:
"""Obtener respuestas a un post."""
if not self.access_token:
return []
try:
async with httpx.AsyncClient() as client:
url = f"{self.base_url}/{post_id}/replies"
params = {
"access_token": self.access_token,
"fields": "id,text,username,timestamp"
}
response = await client.get(url, params=params)
response.raise_for_status()
data = response.json()
return data.get("data", [])
except httpx.HTTPError:
return []
async def delete(self, post_id: str) -> bool:
"""Eliminar un post de Threads (no soportado actualmente)."""
# La API de Threads no soporta eliminación actualmente
return False

View File

@@ -0,0 +1,255 @@
"""
Publisher para X (Twitter).
"""
from typing import Optional, List, Dict
import tweepy
from app.core.config import settings
from app.publishers.base import BasePublisher, PublishResult
class XPublisher(BasePublisher):
"""Publisher para X (Twitter) usando Tweepy."""
platform = "x"
char_limit = 280
def __init__(self):
self.client = None
self.api = None
self._init_client()
def _init_client(self):
"""Inicializar cliente de Twitter."""
if not all([
settings.X_API_KEY,
settings.X_API_SECRET,
settings.X_ACCESS_TOKEN,
settings.X_ACCESS_TOKEN_SECRET
]):
return
# Cliente v2 para publicar
self.client = tweepy.Client(
consumer_key=settings.X_API_KEY,
consumer_secret=settings.X_API_SECRET,
access_token=settings.X_ACCESS_TOKEN,
access_token_secret=settings.X_ACCESS_TOKEN_SECRET,
bearer_token=settings.X_BEARER_TOKEN
)
# API v1.1 para subir imágenes
auth = tweepy.OAuth1UserHandler(
settings.X_API_KEY,
settings.X_API_SECRET,
settings.X_ACCESS_TOKEN,
settings.X_ACCESS_TOKEN_SECRET
)
self.api = tweepy.API(auth)
def validate_content(self, content: str) -> bool:
"""Validar longitud del tweet."""
return len(content) <= self.char_limit
async def publish(
self,
content: str,
image_path: Optional[str] = None
) -> PublishResult:
"""Publicar un tweet."""
if not self.client:
return PublishResult(
success=False,
error_message="Cliente de X no configurado"
)
try:
media_ids = None
# Subir imagen si existe
if image_path and self.api:
media = self.api.media_upload(filename=image_path)
media_ids = [media.media_id]
# Publicar tweet
response = self.client.create_tweet(
text=content,
media_ids=media_ids
)
tweet_id = response.data['id']
return PublishResult(
success=True,
post_id=tweet_id,
url=f"https://x.com/i/web/status/{tweet_id}"
)
except tweepy.TweepyException as e:
return PublishResult(
success=False,
error_message=str(e)
)
async def publish_thread(
self,
posts: List[str],
images: Optional[List[str]] = None
) -> PublishResult:
"""Publicar un hilo de tweets."""
if not self.client:
return PublishResult(
success=False,
error_message="Cliente de X no configurado"
)
try:
previous_tweet_id = None
first_tweet_id = None
for i, post in enumerate(posts):
media_ids = None
# Subir imagen si existe para este post
if images and i < len(images) and images[i] and self.api:
media = self.api.media_upload(filename=images[i])
media_ids = [media.media_id]
# Publicar tweet (como respuesta al anterior si existe)
response = self.client.create_tweet(
text=post,
media_ids=media_ids,
in_reply_to_tweet_id=previous_tweet_id
)
previous_tweet_id = response.data['id']
if i == 0:
first_tweet_id = previous_tweet_id
return PublishResult(
success=True,
post_id=first_tweet_id,
url=f"https://x.com/i/web/status/{first_tweet_id}"
)
except tweepy.TweepyException as e:
return PublishResult(
success=False,
error_message=str(e)
)
async def reply(
self,
post_id: str,
content: str
) -> PublishResult:
"""Responder a un tweet."""
if not self.client:
return PublishResult(
success=False,
error_message="Cliente de X no configurado"
)
try:
response = self.client.create_tweet(
text=content,
in_reply_to_tweet_id=post_id
)
return PublishResult(
success=True,
post_id=response.data['id']
)
except tweepy.TweepyException as e:
return PublishResult(
success=False,
error_message=str(e)
)
async def like(self, post_id: str) -> bool:
"""Dar like a un tweet."""
if not self.client:
return False
try:
self.client.like(post_id)
return True
except tweepy.TweepyException:
return False
async def get_mentions(self, since_id: Optional[str] = None) -> List[Dict]:
"""Obtener menciones recientes."""
if not self.client:
return []
try:
# Obtener ID del usuario autenticado
me = self.client.get_me()
user_id = me.data.id
# Obtener menciones
mentions = self.client.get_users_mentions(
id=user_id,
since_id=since_id,
max_results=50,
tweet_fields=['created_at', 'author_id', 'conversation_id']
)
if not mentions.data:
return []
return [
{
"id": str(tweet.id),
"text": tweet.text,
"author_id": str(tweet.author_id),
"created_at": tweet.created_at.isoformat() if tweet.created_at else None
}
for tweet in mentions.data
]
except tweepy.TweepyException:
return []
async def get_comments(self, post_id: str) -> List[Dict]:
"""Obtener respuestas a un tweet."""
if not self.client:
return []
try:
# Buscar respuestas al tweet
query = f"conversation_id:{post_id}"
tweets = self.client.search_recent_tweets(
query=query,
max_results=50,
tweet_fields=['created_at', 'author_id', 'in_reply_to_user_id']
)
if not tweets.data:
return []
return [
{
"id": str(tweet.id),
"text": tweet.text,
"author_id": str(tweet.author_id),
"created_at": tweet.created_at.isoformat() if tweet.created_at else None
}
for tweet in tweets.data
]
except tweepy.TweepyException:
return []
async def delete(self, post_id: str) -> bool:
"""Eliminar un tweet."""
if not self.client:
return False
try:
self.client.delete_tweet(post_id)
return True
except tweepy.TweepyException:
return False

1
app/services/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Services module

View File

@@ -0,0 +1,314 @@
"""
Servicio de generación de contenido con DeepSeek API.
"""
import json
from typing import Optional, List, Dict
from openai import OpenAI
from app.core.config import settings
class ContentGenerator:
"""Generador de contenido usando DeepSeek API."""
def __init__(self):
self.client = OpenAI(
api_key=settings.DEEPSEEK_API_KEY,
base_url=settings.DEEPSEEK_BASE_URL
)
self.model = "deepseek-chat"
def _get_system_prompt(self) -> str:
"""Obtener el prompt del sistema con la personalidad de la marca."""
return f"""Eres el Community Manager de {settings.BUSINESS_NAME}, una empresa de tecnología ubicada en {settings.BUSINESS_LOCATION}.
SOBRE LA EMPRESA:
- Especializada en soluciones de IA, automatización y transformación digital
- Vende equipos de cómputo e impresoras 3D
- Sitio web: {settings.BUSINESS_WEBSITE}
TONO DE COMUNICACIÓN:
{settings.CONTENT_TONE}
ESTILO (inspirado en @midudev, @MoureDev, @SoyDalto):
- Tips cortos y accionables
- Contenido educativo de valor
- Cercano pero profesional
- Uso moderado de emojis
- Hashtags relevantes (máximo 3-5)
REGLAS:
- Nunca uses lenguaje ofensivo
- No hagas promesas exageradas
- Sé honesto y transparente
- Enfócate en ayudar, no en vender directamente
- Adapta el contenido a cada plataforma"""
async def generate_tip_tech(
self,
category: str,
platform: str,
template: Optional[str] = None
) -> str:
"""Generar un tip tech."""
char_limits = {
"x": 280,
"threads": 500,
"instagram": 2200,
"facebook": 500
}
prompt = f"""Genera un tip de tecnología para la categoría: {category}
PLATAFORMA: {platform}
LÍMITE DE CARACTERES: {char_limits.get(platform, 500)}
{f'USA ESTE TEMPLATE COMO BASE: {template}' if template else ''}
REQUISITOS:
- Tip práctico y accionable
- Fácil de entender
- Incluye un emoji relevante al inicio
- Termina con 2-3 hashtags relevantes
- NO incluyas enlaces
Responde SOLO con el texto del post, sin explicaciones."""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self._get_system_prompt()},
{"role": "user", "content": prompt}
],
max_tokens=300,
temperature=0.7
)
return response.choices[0].message.content.strip()
async def generate_product_post(
self,
product: Dict,
platform: str
) -> str:
"""Generar post para un producto."""
char_limits = {
"x": 280,
"threads": 500,
"instagram": 2200,
"facebook": 1000
}
prompt = f"""Genera un post promocional para este producto:
PRODUCTO: {product['name']}
DESCRIPCIÓN: {product.get('description', 'N/A')}
PRECIO: ${product['price']:,.2f} MXN
CATEGORÍA: {product['category']}
ESPECIFICACIONES: {json.dumps(product.get('specs', {}), ensure_ascii=False)}
PUNTOS DESTACADOS: {', '.join(product.get('highlights', []))}
PLATAFORMA: {platform}
LÍMITE DE CARACTERES: {char_limits.get(platform, 500)}
REQUISITOS:
- Destaca los beneficios principales
- Incluye el precio
- Usa emojis relevantes
- Incluye CTA sutil (ej: "Contáctanos", "Más info en DM")
- Termina con 2-3 hashtags
- NO inventes especificaciones
Responde SOLO con el texto del post."""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self._get_system_prompt()},
{"role": "user", "content": prompt}
],
max_tokens=400,
temperature=0.7
)
return response.choices[0].message.content.strip()
async def generate_service_post(
self,
service: Dict,
platform: str
) -> str:
"""Generar post para un servicio."""
char_limits = {
"x": 280,
"threads": 500,
"instagram": 2200,
"facebook": 1000
}
prompt = f"""Genera un post promocional para este servicio:
SERVICIO: {service['name']}
DESCRIPCIÓN: {service.get('description', 'N/A')}
CATEGORÍA: {service['category']}
SECTORES OBJETIVO: {', '.join(service.get('target_sectors', []))}
BENEFICIOS: {', '.join(service.get('benefits', []))}
CTA: {service.get('call_to_action', 'Contáctanos para más información')}
PLATAFORMA: {platform}
LÍMITE DE CARACTERES: {char_limits.get(platform, 500)}
REQUISITOS:
- Enfócate en el problema que resuelve
- Destaca 2-3 beneficios clave
- Usa emojis relevantes (✅, 🚀, 💡)
- Incluye el CTA
- Termina con 2-3 hashtags
- Tono consultivo, no vendedor
Responde SOLO con el texto del post."""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self._get_system_prompt()},
{"role": "user", "content": prompt}
],
max_tokens=400,
temperature=0.7
)
return response.choices[0].message.content.strip()
async def generate_thread(
self,
topic: str,
num_posts: int = 5
) -> List[str]:
"""Generar un hilo educativo."""
prompt = f"""Genera un hilo educativo de {num_posts} posts sobre: {topic}
REQUISITOS:
- Post 1: Gancho que capture atención
- Posts 2-{num_posts-1}: Contenido educativo de valor
- Post {num_posts}: Conclusión con CTA
FORMATO:
- Cada post máximo 280 caracteres
- Numera cada post (1/, 2/, etc.)
- Usa emojis relevantes
- El último post incluye hashtags
Responde con cada post en una línea separada, sin explicaciones."""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self._get_system_prompt()},
{"role": "user", "content": prompt}
],
max_tokens=1500,
temperature=0.7
)
content = response.choices[0].message.content.strip()
# Separar posts por líneas no vacías
posts = [p.strip() for p in content.split('\n') if p.strip()]
return posts
async def generate_response_suggestion(
self,
interaction_content: str,
interaction_type: str,
context: Optional[str] = None
) -> List[str]:
"""Generar sugerencias de respuesta para una interacción."""
prompt = f"""Un usuario escribió esto en redes sociales:
"{interaction_content}"
TIPO DE INTERACCIÓN: {interaction_type}
{f'CONTEXTO ADICIONAL: {context}' if context else ''}
Genera 3 opciones de respuesta diferentes:
1. Respuesta corta y amigable
2. Respuesta que invite a continuar la conversación
3. Respuesta que dirija a más información/contacto
REQUISITOS:
- Máximo 280 caracteres cada una
- Tono amigable y profesional
- Si es una queja, sé empático
- Si es una pregunta técnica, sé útil
Responde con las 3 opciones numeradas, una por línea."""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self._get_system_prompt()},
{"role": "user", "content": prompt}
],
max_tokens=500,
temperature=0.8
)
content = response.choices[0].message.content.strip()
suggestions = [s.strip() for s in content.split('\n') if s.strip()]
# Limpiar numeración si existe
cleaned = []
for s in suggestions:
if s[0].isdigit() and (s[1] == '.' or s[1] == ')'):
s = s[2:].strip()
cleaned.append(s)
return cleaned[:3] # Máximo 3 sugerencias
async def adapt_content_for_platform(
self,
content: str,
target_platform: str
) -> str:
"""Adaptar contenido existente a una plataforma específica."""
char_limits = {
"x": 280,
"threads": 500,
"instagram": 2200,
"facebook": 1000
}
prompt = f"""Adapta este contenido para {target_platform}:
CONTENIDO ORIGINAL:
{content}
LÍMITE DE CARACTERES: {char_limits.get(target_platform, 500)}
REQUISITOS PARA {target_platform.upper()}:
{"- Muy conciso, directo al punto" if target_platform == "x" else ""}
{"- Puede ser más extenso, incluir más contexto" if target_platform == "instagram" else ""}
{"- Tono más casual y cercano" if target_platform == "threads" else ""}
{"- Puede incluir links, más profesional" if target_platform == "facebook" else ""}
- Mantén la esencia del mensaje
- Ajusta hashtags según la plataforma
Responde SOLO con el contenido adaptado."""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self._get_system_prompt()},
{"role": "user", "content": prompt}
],
max_tokens=400,
temperature=0.6
)
return response.choices[0].message.content.strip()
# Instancia global
content_generator = ContentGenerator()

View File

@@ -0,0 +1,174 @@
"""
Servicio de generación de imágenes para posts.
"""
import os
from typing import Dict, Optional
from pathlib import Path
from html2image import Html2Image
from PIL import Image
from jinja2 import Environment, FileSystemLoader
from app.core.config import settings
class ImageGenerator:
"""Generador de imágenes usando plantillas HTML."""
def __init__(self):
self.templates_dir = Path("templates")
self.output_dir = Path("uploads/generated")
self.output_dir.mkdir(parents=True, exist_ok=True)
self.jinja_env = Environment(
loader=FileSystemLoader(self.templates_dir)
)
self.hti = Html2Image(
output_path=str(self.output_dir),
custom_flags=['--no-sandbox', '--disable-gpu']
)
def _render_template(self, template_name: str, variables: Dict) -> str:
"""Renderizar una plantilla HTML con variables."""
template = self.jinja_env.get_template(template_name)
return template.render(**variables)
async def generate_tip_card(
self,
title: str,
content: str,
category: str,
output_name: str
) -> str:
"""Generar imagen de tip tech."""
variables = {
"title": title,
"content": content,
"category": category,
"logo_url": f"{settings.BUSINESS_WEBSITE}/logo.png",
"website": settings.BUSINESS_WEBSITE,
"business_name": settings.BUSINESS_NAME
}
html_content = self._render_template("tip_card.html", variables)
# Generar imagen
output_file = f"{output_name}.png"
self.hti.screenshot(
html_str=html_content,
save_as=output_file,
size=(1080, 1080)
)
return str(self.output_dir / output_file)
async def generate_product_card(
self,
name: str,
price: float,
image_url: str,
highlights: list,
output_name: str
) -> str:
"""Generar imagen de producto."""
variables = {
"name": name,
"price": f"${price:,.2f} MXN",
"image_url": image_url,
"highlights": highlights[:3], # Máximo 3 highlights
"logo_url": f"{settings.BUSINESS_WEBSITE}/logo.png",
"website": settings.BUSINESS_WEBSITE,
"business_name": settings.BUSINESS_NAME
}
html_content = self._render_template("product_card.html", variables)
output_file = f"{output_name}.png"
self.hti.screenshot(
html_str=html_content,
save_as=output_file,
size=(1080, 1080)
)
return str(self.output_dir / output_file)
async def generate_service_card(
self,
name: str,
tagline: str,
benefits: list,
icon: str,
output_name: str
) -> str:
"""Generar imagen de servicio."""
variables = {
"name": name,
"tagline": tagline,
"benefits": benefits[:4], # Máximo 4 beneficios
"icon": icon,
"logo_url": f"{settings.BUSINESS_WEBSITE}/logo.png",
"website": settings.BUSINESS_WEBSITE,
"business_name": settings.BUSINESS_NAME
}
html_content = self._render_template("service_card.html", variables)
output_file = f"{output_name}.png"
self.hti.screenshot(
html_str=html_content,
save_as=output_file,
size=(1080, 1080)
)
return str(self.output_dir / output_file)
async def resize_for_platform(
self,
image_path: str,
platform: str
) -> str:
"""Redimensionar imagen para una plataforma específica."""
sizes = {
"x": (1200, 675), # 16:9
"threads": (1080, 1080), # 1:1
"instagram": (1080, 1080), # 1:1
"facebook": (1200, 630) # ~1.9:1
}
target_size = sizes.get(platform, (1080, 1080))
img = Image.open(image_path)
# Crear nueva imagen con el tamaño objetivo
new_img = Image.new('RGB', target_size, (26, 26, 46)) # Color de fondo de la marca
# Calcular posición para centrar
img_ratio = img.width / img.height
target_ratio = target_size[0] / target_size[1]
if img_ratio > target_ratio:
# Imagen más ancha
new_width = target_size[0]
new_height = int(new_width / img_ratio)
else:
# Imagen más alta
new_height = target_size[1]
new_width = int(new_height * img_ratio)
img_resized = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
# Centrar en la nueva imagen
x = (target_size[0] - new_width) // 2
y = (target_size[1] - new_height) // 2
new_img.paste(img_resized, (x, y))
# Guardar
output_path = image_path.replace('.png', f'_{platform}.png')
new_img.save(output_path, 'PNG', quality=95)
return output_path
# Instancia global
image_generator = ImageGenerator()