x.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. """
  2. X (Twitter) 平台实现
  3. 后端:crawler.aiddit.com/crawler/x
  4. """
  5. import json
  6. from typing import Any, Dict, List, Optional
  7. import httpx
  8. from agent.tools.models import ToolResult
  9. from agent.tools.utils.image import build_image_grid, encode_base64, load_images
  10. from agent.tools.builtin.content.registry import PlatformDef, register_platform
# Crawler backend endpoint for X (Twitter) keyword search.
CRAWLER_URL = "http://crawler.aiddit.com/crawler/x/keyword"
# Per-request timeout in seconds for the crawler HTTP call.
DEFAULT_TIMEOUT = 60.0
  13. async def search(
  14. platform_id: str,
  15. keyword: str,
  16. max_count: int = 20,
  17. cursor: str = "",
  18. extras: Optional[Dict[str, Any]] = None,
  19. ) -> ToolResult:
  20. try:
  21. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  22. response = await client.post(CRAWLER_URL, json={"keyword": keyword})
  23. response.raise_for_status()
  24. data = response.json()
  25. if data.get("code") != 0:
  26. return ToolResult(title="X 搜索失败", output="", error=data.get("msg", "未知错误"))
  27. result_data = data.get("data", {})
  28. tweets = result_data.get("data", []) if isinstance(result_data, dict) else []
  29. summary_list = []
  30. for idx, tweet in enumerate(tweets[:max_count], 1):
  31. text = tweet.get("body_text", "")
  32. summary_list.append({
  33. "index": idx,
  34. "author": tweet.get("channel_account_name", ""),
  35. "body_text": text[:100] + ("..." if len(text) > 100 else ""),
  36. "like_count": tweet.get("like_count"),
  37. "comment_count": tweet.get("comment_count"),
  38. })
  39. # 拼图
  40. images = []
  41. collage_b64 = await _build_tweet_collage(tweets[:max_count])
  42. if collage_b64:
  43. images.append({"type": "base64", "media_type": "image/png", "data": collage_b64})
  44. return ToolResult(
  45. title=f"X: {keyword}",
  46. output=json.dumps({"data": summary_list}, ensure_ascii=False, indent=2),
  47. long_term_memory=f"Searched X for '{keyword}', {len(tweets)} results.",
  48. images=images,
  49. metadata={"posts": tweets[:max_count]},
  50. )
  51. except Exception as e:
  52. return ToolResult(title="X 搜索异常", output="", error=str(e))
  53. async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None) -> ToolResult:
  54. """X 的详情直接从缓存的搜索结果取完整数据"""
  55. author = post.get("channel_account_name", "")
  56. text = post.get("body_text", "")[:30]
  57. all_images = []
  58. for img_item in post.get("image_url_list", []):
  59. url = img_item.get("image_url") if isinstance(img_item, dict) else img_item
  60. if url:
  61. all_images.append({"type": "url", "url": url})
  62. return ToolResult(
  63. title=f"X 详情: @{author}",
  64. output=json.dumps(post, ensure_ascii=False, indent=2),
  65. long_term_memory=f"Viewed X post by @{author}: {text}",
  66. images=all_images,
  67. )
  68. async def _build_tweet_collage(tweets: List[Dict[str, Any]]) -> Optional[str]:
  69. urls, titles = [], []
  70. for tweet in tweets:
  71. thumb = None
  72. for img_item in tweet.get("image_url_list", []):
  73. url = img_item.get("image_url") if isinstance(img_item, dict) else img_item
  74. if url:
  75. thumb = url
  76. break
  77. if not thumb:
  78. thumb = tweet.get("cover_url")
  79. if thumb:
  80. urls.append(thumb)
  81. titles.append(f"@{tweet.get('channel_account_name', '')}")
  82. if not urls:
  83. return None
  84. loaded = await load_images(urls)
  85. valid_images, valid_labels = [], []
  86. for (_, img), title in zip(loaded, titles):
  87. if img is not None:
  88. valid_images.append(img)
  89. valid_labels.append(title)
  90. if not valid_images:
  91. return None
  92. grid = build_image_grid(images=valid_images, labels=valid_labels)
  93. b64, _ = encode_base64(grid, format="PNG")
  94. return b64
  95. # ── 注册 ──
  96. _X = PlatformDef(
  97. id="x",
  98. name="X (Twitter)",
  99. aliases=["twitter", "推特"],
  100. )
  101. _X.search_impl = search
  102. _X.detail_impl = detail
  103. register_platform(_X)