x.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. """
  2. X (Twitter) 平台实现
  3. 后端:crawler.aiddit.com/crawler/x
  4. """
  5. import json
  6. from typing import Any, Dict, List, Optional
  7. import httpx
  8. from agent.tools.models import ToolResult
  9. from agent.tools.utils.image import build_image_grid, encode_base64, load_images
  10. from agent.tools.builtin.content.registry import PlatformDef, register_platform
# Endpoint of the crawler backend used for X (Twitter) keyword search.
CRAWLER_URL = "http://crawler.aiddit.com/crawler/x/keyword"
# Per-request timeout (seconds) for the crawler HTTP call.
DEFAULT_TIMEOUT = 60.0
  13. async def search(
  14. platform_id: str,
  15. keyword: str,
  16. max_count: int = 20,
  17. cursor: str = "",
  18. extras: Optional[Dict[str, Any]] = None,
  19. ) -> ToolResult:
  20. try:
  21. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  22. response = await client.post(CRAWLER_URL, json={"keyword": keyword})
  23. response.raise_for_status()
  24. data = response.json()
  25. if data.get("code") != 0:
  26. return ToolResult(title="X 搜索失败", output="", error=data.get("msg", "未知错误"))
  27. result_data = data.get("data", {})
  28. tweets = result_data.get("data", []) if isinstance(result_data, dict) else []
  29. # 动态导入评价模块
  30. try:
  31. from examples.process_pipeline.script.evaluate_source_quality import SourceQualityEvaluator
  32. evaluator = SourceQualityEvaluator()
  33. except ImportError:
  34. evaluator = None
  35. summary_list = []
  36. for idx, tweet in enumerate(tweets[:max_count], 1):
  37. text = tweet.get("body_text", "")
  38. score_info = {}
  39. if evaluator:
  40. try:
  41. eval_res = evaluator.evaluate_post(tweet)
  42. score_info = {
  43. "quality_score": eval_res["total_score"],
  44. "quality_grade": eval_res["grade"]
  45. }
  46. tweet["_quality_score"] = eval_res["total_score"]
  47. except Exception:
  48. pass
  49. summary_item = {
  50. "index": idx,
  51. "author": tweet.get("channel_account_name", ""),
  52. "body_text": text[:100] + ("..." if len(text) > 100 else ""),
  53. "like_count": tweet.get("like_count"),
  54. "comment_count": tweet.get("comment_count"),
  55. "link": tweet.get("link"),
  56. }
  57. summary_item.update(score_info)
  58. summary_list.append(summary_item)
  59. # 拼图
  60. images = []
  61. collage_obj = await _build_tweet_collage(tweets[:max_count])
  62. if collage_obj:
  63. images.append(collage_obj)
  64. return ToolResult(
  65. title=f"X: {keyword}",
  66. output=json.dumps({"data": summary_list}, ensure_ascii=False, indent=2),
  67. long_term_memory=f"Searched X for '{keyword}', {len(tweets)} results.",
  68. images=images,
  69. metadata={"posts": tweets[:max_count]},
  70. )
  71. except Exception as e:
  72. return ToolResult(title="X 搜索异常", output="", error=str(e))
# Cap on the number of images attached individually to a detail view.
MAX_DETAIL_IMAGES = 10
# When over the cap: attach this many individually, collage the remainder.
KEEP_INDIVIDUAL = 8
  75. async def _build_images_collage(urls: List[str]) -> Optional[Dict[str, Any]]:
  76. """将一组图片 URL 拼成单张网格图"""
  77. if not urls:
  78. return None
  79. loaded = await load_images(urls)
  80. valid_images = [img for (_, img) in loaded if img is not None]
  81. if not valid_images:
  82. return None
  83. grid = build_image_grid(images=valid_images, labels=None)
  84. import io
  85. buf = io.BytesIO()
  86. grid.save(buf, format="PNG")
  87. img_bytes = buf.getvalue()
  88. try:
  89. from agent.tools.builtin.file.image_cdn import _upload_bytes_to_oss
  90. import hashlib
  91. md5_hash = hashlib.md5(img_bytes).hexdigest()[:12]
  92. filename = f"x_detail_collage_{md5_hash}.png"
  93. cdn_url = await _upload_bytes_to_oss(img_bytes, filename)
  94. return {"type": "url", "url": cdn_url}
  95. except Exception as e:
  96. import logging
  97. logging.getLogger(__name__).warning("Failed to upload x detail collage to CDN: %s", e)
  98. b64, _ = encode_base64(grid, format="PNG")
  99. return {"type": "base64", "media_type": "image/png", "data": b64}
  100. async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None) -> ToolResult:
  101. """X 的详情直接从缓存的搜索结果取完整数据"""
  102. author = post.get("channel_account_name", "")
  103. text = post.get("body_text", "")[:30]
  104. img_urls = []
  105. for img_item in post.get("image_url_list", []):
  106. url = img_item.get("image_url") if isinstance(img_item, dict) else img_item
  107. if url:
  108. img_urls.append(url)
  109. all_images = []
  110. if len(img_urls) > MAX_DETAIL_IMAGES:
  111. for u in img_urls[:KEEP_INDIVIDUAL]:
  112. all_images.append({"type": "url", "url": u})
  113. collage = await _build_images_collage(img_urls[KEEP_INDIVIDUAL:])
  114. if collage:
  115. all_images.append(collage)
  116. else:
  117. for u in img_urls:
  118. all_images.append({"type": "url", "url": u})
  119. output_json = json.dumps(post, ensure_ascii=False, indent=2)
  120. output_text = (
  121. output_json
  122. + "\n\n---\n请基于以上内容,从信息完整度、内容质量和实用价值三个角度,给出一句简短的内容评价。"
  123. )
  124. return ToolResult(
  125. title=f"X 详情: @{author}",
  126. output=output_text,
  127. long_term_memory=f"Viewed X post by @{author}: {text}",
  128. images=all_images,
  129. )
  130. async def _build_tweet_collage(tweets: List[Dict[str, Any]]) -> Optional[str]:
  131. urls, titles = [], []
  132. for tweet in tweets:
  133. thumb = None
  134. for img_item in tweet.get("image_url_list", []):
  135. url = img_item.get("image_url") if isinstance(img_item, dict) else img_item
  136. if url:
  137. thumb = url
  138. break
  139. if not thumb:
  140. thumb = tweet.get("cover_url")
  141. if thumb:
  142. urls.append(thumb)
  143. base_title = f"@{tweet.get('channel_account_name', '')}"
  144. score = tweet.get("_quality_score")
  145. if score is not None:
  146. title_with_score = f"[{score}分] {base_title}"
  147. else:
  148. title_with_score = base_title
  149. titles.append(title_with_score)
  150. if not urls:
  151. return None
  152. loaded = await load_images(urls)
  153. valid_images, valid_labels = [], []
  154. for (_, img), title in zip(loaded, titles):
  155. if img is not None:
  156. valid_images.append(img)
  157. valid_labels.append(title)
  158. if not valid_images:
  159. return None
  160. grid = build_image_grid(images=valid_images, labels=valid_labels)
  161. import io
  162. buf = io.BytesIO()
  163. grid.save(buf, format="PNG")
  164. img_bytes = buf.getvalue()
  165. try:
  166. from agent.tools.builtin.file.image_cdn import _upload_bytes_to_oss
  167. import hashlib
  168. md5_hash = hashlib.md5(img_bytes).hexdigest()[:12]
  169. filename = f"x_collage_{md5_hash}.png"
  170. cdn_url = await _upload_bytes_to_oss(img_bytes, filename)
  171. return {"type": "url", "url": cdn_url}
  172. except Exception as e:
  173. import logging
  174. logging.getLogger(__name__).warning("Failed to upload x collage to CDN: %s", e)
  175. b64, _ = encode_base64(grid, format="PNG")
  176. return {"type": "base64", "media_type": "image/png", "data": b64}
# ── Platform registration ──
# Register the X (Twitter) platform with its search/detail implementations.
_X = PlatformDef(
    id="x",
    name="X (Twitter)",
    aliases=["twitter", "推特"],
)
_X.search_impl = search
_X.detail_impl = detail
register_platform(_X)