- Add get_followers() method to X publisher - Track new followers as "follow" interaction type - Update daily reports to show followers separately - Store follower name, username, bio, and profile image Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
186 lines
7.0 KiB
Python
186 lines
7.0 KiB
Python
"""
|
|
Tareas para obtener interacciones de redes sociales.
|
|
"""
|
|
|
|
import asyncio
|
|
from datetime import datetime
|
|
|
|
from worker.celery_app import celery_app
|
|
from app.core.database import SessionLocal
|
|
from app.models.interaction import Interaction
|
|
from app.models.post import Post
|
|
from app.publishers import get_publisher
|
|
|
|
|
|
def run_async(coro):
    """Run a coroutine to completion from synchronous (Celery task) code.

    The current event loop is reused when one is available and still open.
    Otherwise a fresh loop is created and installed as the current loop, so
    that later calls keep reusing it.

    Args:
        coro: The coroutine (or other awaitable accepted by
            ``loop.run_until_complete``) to execute.

    Returns:
        Whatever the coroutine returns.

    Raises:
        Any exception raised by the coroutine is propagated unchanged.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current loop for this thread (e.g. non-main thread, or newer
        # Python versions that no longer create one implicitly).
        loop = None

    # A closed loop cannot run anything: run_until_complete() would raise
    # "Event loop is closed", so replace it with a new one.
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    return loop.run_until_complete(coro)
|
|
|
|
|
|
@celery_app.task(name="worker.tasks.fetch_interactions.fetch_all_interactions")
def fetch_all_interactions():
    """Queue one interaction-fetch task per supported platform.

    Each platform is dispatched as its own ``fetch_platform_interactions``
    task, so a failure to enqueue one platform does not prevent the others
    from being enqueued.

    NOTE(review): the original docstring states this task runs every
    5 minutes; the schedule itself is not defined in this block.

    Returns:
        list[str]: One status line per platform, either
        ``"<platform>: tarea enviada"`` or ``"<platform>: error - <exc>"``.
    """
    platforms = ["x", "threads", "instagram", "facebook"]
    results = []

    for platform in platforms:
        try:
            # Fire-and-forget: the AsyncResult is intentionally not kept.
            fetch_platform_interactions.delay(platform)
        except Exception as e:
            # Enqueueing failed (e.g. broker problem); report and carry on
            # with the remaining platforms.
            results.append(f"{platform}: error - {e}")
        else:
            results.append(f"{platform}: tarea enviada")

    return results
|
|
|
|
|
|
def _parse_interaction_time(raw):
    """Convert a platform timestamp string into a ``datetime``.

    Falls back to ``datetime.utcnow()`` when the value is missing, is not a
    string, or cannot be parsed, so a single malformed timestamp cannot abort
    the whole fetch.

    NOTE(review): successfully parsed values carry the offset found in the
    string (tz-aware) while the fallback is naive, exactly as in the previous
    inline code; confirm how the ``interaction_at`` column handles both.
    """
    if not raw or not isinstance(raw, str):
        return datetime.utcnow()

    try:
        # A trailing "Z" is only understood by fromisoformat() from
        # Python 3.11 on, so normalise it to an explicit UTC offset.
        return datetime.fromisoformat(raw.replace("Z", "+00:00"))
    except ValueError:
        pass

    try:
        # Offsets written without a colon (e.g. "+0000") are rejected by
        # fromisoformat() before Python 3.11; strptime's %z accepts them.
        return datetime.strptime(raw, "%Y-%m-%dT%H:%M:%S%z")
    except ValueError:
        return datetime.utcnow()


def _is_new_interaction(db, external_id, processed_ids):
    """Return True if ``external_id`` was neither handled in this run nor stored."""
    if external_id in processed_ids:
        return False

    existing = db.query(Interaction).filter(
        Interaction.external_id == external_id
    ).first()
    return existing is None


@celery_app.task(name="worker.tasks.fetch_interactions.fetch_platform_interactions")
def fetch_platform_interactions(platform: str):
    """Fetch and store new interactions for a single platform.

    Three kinds of interaction are collected through the platform's
    publisher and stored as ``Interaction`` rows:

    * mentions (``publisher.get_mentions()``),
    * comments on the 10 most recently published posts that have an ID on
      this platform (``publisher.get_comments(platform_id)``),
    * new followers, only when ``platform == "x"`` and the publisher exposes
      ``get_followers``.

    Items whose ``external_id`` already exists in the database, or was already
    handled earlier in the same run, are skipped. Items without an ID are
    skipped as well, since they cannot be de-duplicated.

    Args:
        platform: Platform key understood by ``get_publisher``.

    Returns:
        str: A summary such as ``"x: 2 menciones, 1 comentarios"`` (with
        ``", N follows nuevos"`` appended when followers were added), or
        ``"<platform>: error - <exc>"`` if anything failed. On failure the
        session is rolled back and the exception is not re-raised.
    """
    db = SessionLocal()

    try:
        publisher = get_publisher(platform)

        # external_ids handled during this run. Prevents duplicates when the
        # same item shows up both as a mention and as a comment.
        processed_ids = set()

        # --- Mentions -----------------------------------------------------
        new_mentions = 0
        mentions = run_async(publisher.get_mentions()) or []

        for mention in mentions:
            external_id = mention.get("id")
            if not external_id:
                continue
            if not _is_new_interaction(db, external_id, processed_ids):
                continue

            # Prefer the human-readable username over the numeric author_id.
            author_username = mention.get("username") or mention.get("author_id", "unknown")

            db.add(Interaction(
                platform=platform,
                interaction_type="mention",
                external_id=external_id,
                author_username=author_username,
                author_name=mention.get("name"),
                content=mention.get("text", mention.get("message")),
                interaction_at=_parse_interaction_time(mention.get("created_at")),
            ))
            processed_ids.add(external_id)
            new_mentions += 1

        # --- Comments on recent posts ------------------------------------
        recent_posts = db.query(Post).filter(
            Post.platform_post_ids.isnot(None),
            Post.platforms.contains([platform])
        ).order_by(Post.published_at.desc()).limit(10).all()

        new_comments = 0

        for post in recent_posts:
            platform_id = post.platform_post_ids.get(platform)
            if not platform_id:
                continue

            comments = run_async(publisher.get_comments(platform_id)) or []

            for comment in comments:
                external_id = comment.get("id")
                if not external_id:
                    continue
                # Skips items already stored, or already saved as a mention
                # earlier in this run.
                if not _is_new_interaction(db, external_id, processed_ids):
                    continue

                # Author data: some payloads carry a flat "username", others
                # a nested "from" object with "id" and "name".
                from_data = comment.get("from")
                if not isinstance(from_data, dict):
                    from_data = {}
                author_username = comment.get("username") or from_data.get("id", "unknown")
                author_name = from_data.get("name")

                # Use the first timestamp field that actually has a value; a
                # key that is present but empty must not mask the others.
                raw_time = (
                    comment.get("created_at")
                    or comment.get("timestamp")
                    or comment.get("created_time")
                )

                db.add(Interaction(
                    platform=platform,
                    interaction_type="comment",
                    post_id=post.id,
                    external_id=external_id,
                    external_post_id=platform_id,
                    author_username=author_username,
                    author_name=author_name,
                    content=comment.get("text", comment.get("message")),
                    interaction_at=_parse_interaction_time(raw_time),
                ))
                processed_ids.add(external_id)
                new_comments += 1

        # --- New followers (X only for now) -------------------------------
        new_follows = 0
        if platform == "x" and hasattr(publisher, 'get_followers'):
            followers = run_async(publisher.get_followers(max_results=100)) or []

            for follower in followers:
                follower_id = follower.get("id")
                if not follower_id:
                    # Without an ID every such follower would collapse into
                    # the same "follow_None" record.
                    continue

                # "follow_" prefix keeps follower IDs distinct from tweet IDs.
                external_id = f"follow_{follower_id}"
                if not _is_new_interaction(db, external_id, processed_ids):
                    continue

                db.add(Interaction(
                    platform=platform,
                    interaction_type="follow",
                    external_id=external_id,
                    author_username=follower.get("username", "unknown"),
                    author_name=follower.get("name"),
                    author_avatar_url=follower.get("profile_image_url"),
                    content=follower.get("description"),  # user's bio
                    # The exact follow time is unknown; use the fetch time.
                    interaction_at=datetime.utcnow(),
                ))
                processed_ids.add(external_id)
                new_follows += 1

        db.commit()

        result = f"{platform}: {new_mentions} menciones, {new_comments} comentarios"
        if new_follows > 0:
            result += f", {new_follows} follows nuevos"
        return result

    except Exception as e:
        # Task boundary: discard the partial batch and report the failure in
        # the task result instead of raising.
        db.rollback()
        return f"{platform}: error - {e}"

    finally:
        db.close()
|