- Add processed_ids set to prevent duplicate inserts when a tweet is both a mention and a comment (same external_id) - Enhance get_mentions() to fetch usernames via user expansions - Update fetch_interactions to prefer username over numeric author_id Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
280 lines
8.4 KiB
Python
280 lines
8.4 KiB
Python
"""
|
|
Publisher para X (Twitter).
|
|
"""
|
|
|
|
from typing import Optional, List, Dict
|
|
import tweepy
|
|
|
|
from app.core.config import settings
|
|
from app.publishers.base import BasePublisher, PublishResult
|
|
|
|
|
|
class XPublisher(BasePublisher):
|
|
"""Publisher para X (Twitter) usando Tweepy."""
|
|
|
|
platform = "x"
|
|
char_limit = 280
|
|
|
|
def __init__(self):
|
|
self.client = None
|
|
self.api = None
|
|
self._init_client()
|
|
|
|
def _init_client(self):
|
|
"""Inicializar cliente de Twitter."""
|
|
if not all([
|
|
settings.X_API_KEY,
|
|
settings.X_API_SECRET,
|
|
settings.X_ACCESS_TOKEN,
|
|
settings.X_ACCESS_TOKEN_SECRET
|
|
]):
|
|
return
|
|
|
|
# Cliente v2 para publicar
|
|
self.client = tweepy.Client(
|
|
consumer_key=settings.X_API_KEY,
|
|
consumer_secret=settings.X_API_SECRET,
|
|
access_token=settings.X_ACCESS_TOKEN,
|
|
access_token_secret=settings.X_ACCESS_TOKEN_SECRET,
|
|
bearer_token=settings.X_BEARER_TOKEN
|
|
)
|
|
|
|
# API v1.1 para subir imágenes
|
|
auth = tweepy.OAuth1UserHandler(
|
|
settings.X_API_KEY,
|
|
settings.X_API_SECRET,
|
|
settings.X_ACCESS_TOKEN,
|
|
settings.X_ACCESS_TOKEN_SECRET
|
|
)
|
|
self.api = tweepy.API(auth)
|
|
|
|
def validate_content(self, content: str) -> bool:
|
|
"""Validar longitud del tweet."""
|
|
return len(content) <= self.char_limit
|
|
|
|
async def publish(
|
|
self,
|
|
content: str,
|
|
image_path: Optional[str] = None
|
|
) -> PublishResult:
|
|
"""Publicar un tweet."""
|
|
if not self.client:
|
|
return PublishResult(
|
|
success=False,
|
|
error_message="Cliente de X no configurado"
|
|
)
|
|
|
|
try:
|
|
media_ids = None
|
|
|
|
# Subir imagen si existe
|
|
if image_path and self.api:
|
|
media = self.api.media_upload(filename=image_path)
|
|
media_ids = [media.media_id]
|
|
|
|
# Publicar tweet
|
|
response = self.client.create_tweet(
|
|
text=content,
|
|
media_ids=media_ids
|
|
)
|
|
|
|
tweet_id = response.data['id']
|
|
return PublishResult(
|
|
success=True,
|
|
post_id=tweet_id,
|
|
url=f"https://x.com/i/web/status/{tweet_id}"
|
|
)
|
|
|
|
except tweepy.TweepyException as e:
|
|
return PublishResult(
|
|
success=False,
|
|
error_message=str(e)
|
|
)
|
|
|
|
async def publish_thread(
|
|
self,
|
|
posts: List[str],
|
|
images: Optional[List[str]] = None
|
|
) -> PublishResult:
|
|
"""Publicar un hilo de tweets."""
|
|
if not self.client:
|
|
return PublishResult(
|
|
success=False,
|
|
error_message="Cliente de X no configurado"
|
|
)
|
|
|
|
try:
|
|
previous_tweet_id = None
|
|
first_tweet_id = None
|
|
|
|
for i, post in enumerate(posts):
|
|
media_ids = None
|
|
|
|
# Subir imagen si existe para este post
|
|
if images and i < len(images) and images[i] and self.api:
|
|
media = self.api.media_upload(filename=images[i])
|
|
media_ids = [media.media_id]
|
|
|
|
# Publicar tweet (como respuesta al anterior si existe)
|
|
response = self.client.create_tweet(
|
|
text=post,
|
|
media_ids=media_ids,
|
|
in_reply_to_tweet_id=previous_tweet_id
|
|
)
|
|
|
|
previous_tweet_id = response.data['id']
|
|
|
|
if i == 0:
|
|
first_tweet_id = previous_tweet_id
|
|
|
|
return PublishResult(
|
|
success=True,
|
|
post_id=first_tweet_id,
|
|
url=f"https://x.com/i/web/status/{first_tweet_id}"
|
|
)
|
|
|
|
except tweepy.TweepyException as e:
|
|
return PublishResult(
|
|
success=False,
|
|
error_message=str(e)
|
|
)
|
|
|
|
async def reply(
|
|
self,
|
|
post_id: str,
|
|
content: str
|
|
) -> PublishResult:
|
|
"""Responder a un tweet."""
|
|
if not self.client:
|
|
return PublishResult(
|
|
success=False,
|
|
error_message="Cliente de X no configurado"
|
|
)
|
|
|
|
try:
|
|
response = self.client.create_tweet(
|
|
text=content,
|
|
in_reply_to_tweet_id=post_id
|
|
)
|
|
|
|
return PublishResult(
|
|
success=True,
|
|
post_id=response.data['id']
|
|
)
|
|
|
|
except tweepy.TweepyException as e:
|
|
return PublishResult(
|
|
success=False,
|
|
error_message=str(e)
|
|
)
|
|
|
|
async def like(self, post_id: str) -> bool:
|
|
"""Dar like a un tweet."""
|
|
if not self.client:
|
|
return False
|
|
|
|
try:
|
|
self.client.like(post_id)
|
|
return True
|
|
except tweepy.TweepyException:
|
|
return False
|
|
|
|
async def get_mentions(self, since_id: Optional[str] = None) -> List[Dict]:
|
|
"""Obtener menciones recientes."""
|
|
if not self.client:
|
|
return []
|
|
|
|
try:
|
|
# Obtener ID del usuario autenticado
|
|
me = self.client.get_me()
|
|
user_id = me.data.id
|
|
|
|
# Obtener menciones con información de usuarios
|
|
mentions = self.client.get_users_mentions(
|
|
id=user_id,
|
|
since_id=since_id,
|
|
max_results=50,
|
|
tweet_fields=['created_at', 'author_id', 'conversation_id'],
|
|
user_fields=['username'],
|
|
expansions=['author_id']
|
|
)
|
|
|
|
if not mentions.data:
|
|
return []
|
|
|
|
# Crear mapa de usuarios para obtener usernames
|
|
users_map = {}
|
|
if mentions.includes and 'users' in mentions.includes:
|
|
for user in mentions.includes['users']:
|
|
users_map[str(user.id)] = user.username
|
|
|
|
return [
|
|
{
|
|
"id": str(tweet.id),
|
|
"text": tweet.text,
|
|
"author_id": str(tweet.author_id),
|
|
"username": users_map.get(str(tweet.author_id), str(tweet.author_id)),
|
|
"created_at": tweet.created_at.isoformat() if tweet.created_at else None
|
|
}
|
|
for tweet in mentions.data
|
|
]
|
|
|
|
except tweepy.TweepyException:
|
|
return []
|
|
|
|
async def get_comments(self, post_id: str) -> List[Dict]:
|
|
"""Obtener respuestas a un tweet (excluyendo las propias)."""
|
|
if not self.client:
|
|
return []
|
|
|
|
try:
|
|
# Obtener ID del usuario autenticado para filtrar auto-respuestas
|
|
me = self.client.get_me()
|
|
my_user_id = str(me.data.id) if me.data else None
|
|
|
|
# Buscar respuestas al tweet
|
|
query = f"conversation_id:{post_id}"
|
|
tweets = self.client.search_recent_tweets(
|
|
query=query,
|
|
max_results=50,
|
|
tweet_fields=['created_at', 'author_id', 'in_reply_to_user_id'],
|
|
user_fields=['username'],
|
|
expansions=['author_id']
|
|
)
|
|
|
|
if not tweets.data:
|
|
return []
|
|
|
|
# Crear mapa de usuarios para obtener usernames
|
|
users_map = {}
|
|
if tweets.includes and 'users' in tweets.includes:
|
|
for user in tweets.includes['users']:
|
|
users_map[str(user.id)] = user.username
|
|
|
|
# Filtrar tweets propios (auto-respuestas del hilo)
|
|
return [
|
|
{
|
|
"id": str(tweet.id),
|
|
"text": tweet.text,
|
|
"author_id": str(tweet.author_id),
|
|
"username": users_map.get(str(tweet.author_id), "unknown"),
|
|
"created_at": tweet.created_at.isoformat() if tweet.created_at else None
|
|
}
|
|
for tweet in tweets.data
|
|
if str(tweet.author_id) != my_user_id # Excluir tweets propios
|
|
]
|
|
|
|
except tweepy.TweepyException:
|
|
return []
|
|
|
|
async def delete(self, post_id: str) -> bool:
|
|
"""Eliminar un tweet."""
|
|
if not self.client:
|
|
return False
|
|
|
|
try:
|
|
self.client.delete_tweet(post_id)
|
|
return True
|
|
except tweepy.TweepyException:
|
|
return False
|