feat(phase-2): Complete social media API integration

- Add ImageUploadService for public URL generation (Meta APIs)
- Create PublisherManager for unified multi-platform publishing
- Add /api/publish endpoints (single, multiple, thread, test)
- Add compose page in dashboard for creating posts
- Add connection test script (scripts/test_connections.py)
- Update navigation with compose link and logout

New endpoints:
- POST /api/publish/single - Publish to one platform
- POST /api/publish/multiple - Publish to multiple platforms
- POST /api/publish/thread - Publish thread (X/Threads)
- GET /api/publish/test - Test all API connections
- GET /api/publish/platforms - List available platforms

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-28 01:43:22 +00:00
parent cda224f852
commit 3caf2a67fb
9 changed files with 1596 additions and 6 deletions

343
app/publishers/manager.py Normal file
View File

@@ -0,0 +1,343 @@
"""
PublisherManager - Gestor unificado de publicaciones.
Coordina todos los publishers y proporciona una interfaz única.
"""
import asyncio
from typing import Optional, List, Dict, Any
from dataclasses import dataclass
from enum import Enum
from app.publishers.base import PublishResult
from app.publishers.x_publisher import XPublisher
from app.publishers.threads_publisher import ThreadsPublisher
from app.publishers.facebook_publisher import FacebookPublisher
from app.publishers.instagram_publisher import InstagramPublisher
from app.services.image_upload import image_upload
class Platform(str, Enum):
"""Plataformas soportadas."""
X = "x"
THREADS = "threads"
FACEBOOK = "facebook"
INSTAGRAM = "instagram"
@dataclass
class MultiPublishResult:
"""Resultado de publicación en múltiples plataformas."""
success: bool
results: Dict[str, PublishResult]
errors: List[str]
@property
def successful_platforms(self) -> List[str]:
"""Plataformas donde se publicó exitosamente."""
return [p for p, r in self.results.items() if r.success]
@property
def failed_platforms(self) -> List[str]:
"""Plataformas donde falló la publicación."""
return [p for p, r in self.results.items() if not r.success]
class PublisherManager:
"""
Gestor unificado de publicaciones.
Proporciona una interfaz única para publicar contenido
en todas las plataformas soportadas.
"""
def __init__(self):
self._publishers = {}
self._init_publishers()
def _init_publishers(self):
"""Inicializar publishers disponibles."""
self._publishers = {
Platform.X: XPublisher(),
Platform.THREADS: ThreadsPublisher(),
Platform.FACEBOOK: FacebookPublisher(),
Platform.INSTAGRAM: InstagramPublisher(),
}
def get_publisher(self, platform: Platform):
"""Obtener publisher para una plataforma."""
return self._publishers.get(platform)
def get_available_platforms(self) -> List[str]:
"""Obtener lista de plataformas con credenciales configuradas."""
available = []
for platform, publisher in self._publishers.items():
if self._is_configured(platform):
available.append(platform.value)
return available
def _is_configured(self, platform: Platform) -> bool:
"""Verificar si una plataforma tiene credenciales configuradas."""
publisher = self._publishers.get(platform)
if not publisher:
return False
if platform == Platform.X:
return publisher.client is not None
elif platform == Platform.THREADS:
return bool(publisher.access_token and publisher.user_id)
elif platform == Platform.FACEBOOK:
return bool(publisher.access_token and publisher.page_id)
elif platform == Platform.INSTAGRAM:
return bool(publisher.access_token and publisher.account_id)
return False
async def publish(
self,
platform: Platform,
content: str,
image_path: Optional[str] = None
) -> PublishResult:
"""
Publicar en una plataforma específica.
Args:
platform: Plataforma destino
content: Texto del post
image_path: Ruta local a imagen (opcional)
Returns:
PublishResult con el resultado
"""
publisher = self._publishers.get(platform)
if not publisher:
return PublishResult(
success=False,
error_message=f"Plataforma no soportada: {platform}"
)
# Validar contenido
if not publisher.validate_content(content):
return PublishResult(
success=False,
error_message=f"Contenido excede límite de {publisher.char_limit} caracteres"
)
# Subir imagen si es necesario para Meta APIs
public_image_url = None
if image_path and platform in [Platform.THREADS, Platform.FACEBOOK, Platform.INSTAGRAM]:
try:
public_image_url = await image_upload.upload_from_path(image_path)
except Exception as e:
return PublishResult(
success=False,
error_message=f"Error subiendo imagen: {e}"
)
# Publicar
try:
if platform == Platform.X:
return await publisher.publish(content, image_path)
else:
return await publisher.publish(content, public_image_url or image_path)
except Exception as e:
return PublishResult(
success=False,
error_message=f"Error al publicar: {e}"
)
async def publish_to_multiple(
self,
platforms: List[Platform],
content: Dict[str, str],
image_path: Optional[str] = None,
parallel: bool = True
) -> MultiPublishResult:
"""
Publicar en múltiples plataformas.
Args:
platforms: Lista de plataformas
content: Dict con contenido por plataforma {platform: texto}
o string único para todas
image_path: Ruta a imagen (opcional)
parallel: Si True, publica en paralelo
Returns:
MultiPublishResult con resultados por plataforma
"""
results = {}
errors = []
# Normalizar contenido
if isinstance(content, str):
content = {p.value: content for p in platforms}
# Subir imagen una sola vez si es necesario
public_image_url = None
if image_path:
try:
public_image_url = await image_upload.upload_from_path(image_path)
except Exception as e:
errors.append(f"Error subiendo imagen: {e}")
async def publish_to_platform(platform: Platform):
platform_content = content.get(platform.value, "")
if not platform_content:
return platform, PublishResult(
success=False,
error_message="Sin contenido para esta plataforma"
)
# X usa ruta local, Meta usa URL pública
img = image_path if platform == Platform.X else public_image_url
result = await self.publish(platform, platform_content, img)
return platform, result
if parallel:
# Publicar en paralelo
tasks = [publish_to_platform(p) for p in platforms]
platform_results = await asyncio.gather(*tasks, return_exceptions=True)
for item in platform_results:
if isinstance(item, Exception):
errors.append(str(item))
else:
platform, result = item
results[platform.value] = result
else:
# Publicar secuencialmente
for platform in platforms:
try:
_, result = await publish_to_platform(platform)
results[platform.value] = result
except Exception as e:
errors.append(f"{platform.value}: {e}")
results[platform.value] = PublishResult(
success=False,
error_message=str(e)
)
# Determinar éxito global
success = any(r.success for r in results.values())
return MultiPublishResult(
success=success,
results=results,
errors=errors
)
async def publish_thread(
self,
platform: Platform,
posts: List[str],
images: Optional[List[str]] = None
) -> PublishResult:
"""Publicar un hilo en una plataforma."""
publisher = self._publishers.get(platform)
if not publisher:
return PublishResult(
success=False,
error_message=f"Plataforma no soportada: {platform}"
)
return await publisher.publish_thread(posts, images)
async def test_connection(self, platform: Platform) -> Dict[str, Any]:
"""
Probar conexión con una plataforma.
Returns:
Dict con status y detalles
"""
result = {
"platform": platform.value,
"configured": self._is_configured(platform),
"connected": False,
"error": None,
"details": {}
}
if not result["configured"]:
result["error"] = "Credenciales no configuradas"
return result
publisher = self._publishers.get(platform)
try:
if platform == Platform.X:
me = publisher.client.get_me()
if me.data:
result["connected"] = True
result["details"] = {
"username": me.data.username,
"name": me.data.name,
"id": str(me.data.id)
}
elif platform == Platform.THREADS:
import httpx
async with httpx.AsyncClient() as client:
url = f"{publisher.base_url}/{publisher.user_id}"
params = {
"fields": "id,username",
"access_token": publisher.access_token
}
response = await client.get(url, params=params)
if response.status_code == 200:
data = response.json()
result["connected"] = True
result["details"] = data
elif platform == Platform.FACEBOOK:
import httpx
async with httpx.AsyncClient() as client:
url = f"{publisher.base_url}/{publisher.page_id}"
params = {
"fields": "id,name,followers_count",
"access_token": publisher.access_token
}
response = await client.get(url, params=params)
if response.status_code == 200:
data = response.json()
result["connected"] = True
result["details"] = data
elif platform == Platform.INSTAGRAM:
import httpx
async with httpx.AsyncClient() as client:
url = f"{publisher.base_url}/{publisher.account_id}"
params = {
"fields": "id,username,followers_count,media_count",
"access_token": publisher.access_token
}
response = await client.get(url, params=params)
if response.status_code == 200:
data = response.json()
result["connected"] = True
result["details"] = data
except Exception as e:
result["error"] = str(e)
return result
async def test_all_connections(self) -> Dict[str, Dict[str, Any]]:
"""Probar conexión con todas las plataformas."""
results = {}
for platform in Platform:
results[platform.value] = await self.test_connection(platform)
return results
# Instancia global
publisher_manager = PublisherManager()