x.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. """
  2. X (Twitter) 平台实现
  3. 后端:crawler.aiddit.com/crawler/x
  4. """
  5. import json
  6. from typing import Any, Dict, List, Optional
  7. import httpx
  8. from agent.tools.models import ToolResult
  9. from agent.tools.utils.image import build_image_grid, encode_base64, load_images
  10. from agent.tools.builtin.content.registry import PlatformDef, register_platform
  11. CRAWLER_URL = "http://crawler.aiddit.com/crawler/x/keyword"
  12. COMMENT_URL = "http://crawler.aiddit.com/crawler/x/comment"
  13. DEFAULT_TIMEOUT = 60.0
  14. AUTHOR_COMMENT_TOP_N = 10
  15. async def search(
  16. platform_id: str,
  17. keyword: str,
  18. max_count: int = 20,
  19. cursor: str = "",
  20. extras: Optional[Dict[str, Any]] = None,
  21. ) -> ToolResult:
  22. try:
  23. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  24. response = await client.post(CRAWLER_URL, json={"keyword": keyword})
  25. response.raise_for_status()
  26. data = response.json()
  27. if data.get("code") != 0:
  28. return ToolResult(title="X 搜索失败", output="", error=data.get("msg", "未知错误"))
  29. result_data = data.get("data", {})
  30. tweets = result_data.get("data", []) if isinstance(result_data, dict) else []
  31. # 动态导入评价模块
  32. try:
  33. from examples.process_pipeline.script.evaluate_source_quality import SourceQualityEvaluator
  34. evaluator = SourceQualityEvaluator()
  35. except ImportError:
  36. evaluator = None
  37. # 视频帖在评分前先并发探测 mp4 duration(HTTP Range,不下载视频流),
  38. # 让 evaluator 用真实时长替代 body 长度作为内容信号。
  39. if evaluator and tweets:
  40. try:
  41. from agent.tools.builtin.content.transcription import probe_durations_for_posts
  42. await probe_durations_for_posts("x", tweets[:max_count], concurrency=8)
  43. except Exception as e:
  44. import logging
  45. logging.getLogger(__name__).info("duration probe failed for x: %s", e)
  46. summary_list = []
  47. for idx, tweet in enumerate(tweets[:max_count], 1):
  48. text = tweet.get("body_text", "")
  49. score_info = {}
  50. if evaluator:
  51. try:
  52. eval_res = evaluator.evaluate_post(tweet)
  53. score_info = {
  54. "quality_score": eval_res["total_score"],
  55. "quality_grade": eval_res["grade"]
  56. }
  57. tweet["_quality_score"] = eval_res["total_score"]
  58. except Exception:
  59. pass
  60. summary_item = {
  61. "index": idx,
  62. "author": tweet.get("channel_account_name", ""),
  63. "body_text": text[:100] + ("..." if len(text) > 100 else ""),
  64. "like_count": tweet.get("like_count"),
  65. "comment_count": tweet.get("comment_count"),
  66. "link": tweet.get("link"),
  67. }
  68. summary_item.update(score_info)
  69. summary_list.append(summary_item)
  70. # 拼图
  71. images = []
  72. collage_obj = await _build_tweet_collage(tweets[:max_count])
  73. if collage_obj:
  74. images.append(collage_obj)
  75. return ToolResult(
  76. title=f"X: {keyword}",
  77. output=json.dumps({"data": summary_list}, ensure_ascii=False, indent=2),
  78. long_term_memory=f"Searched X for '{keyword}', {len(tweets)} results.",
  79. images=images,
  80. metadata={"posts": tweets[:max_count]},
  81. )
  82. except Exception as e:
  83. return ToolResult(title="X 搜索异常", output="", error=str(e))
  84. MAX_DETAIL_IMAGES = 10
  85. KEEP_INDIVIDUAL = 8
  86. async def _build_images_collage(urls: List[str]) -> Optional[Dict[str, Any]]:
  87. """将一组图片 URL 拼成单张网格图"""
  88. if not urls:
  89. return None
  90. loaded = await load_images(urls)
  91. valid_images = [img for (_, img) in loaded if img is not None]
  92. if not valid_images:
  93. return None
  94. grid = build_image_grid(images=valid_images, labels=None)
  95. import io
  96. buf = io.BytesIO()
  97. grid.save(buf, format="PNG")
  98. img_bytes = buf.getvalue()
  99. try:
  100. from agent.tools.builtin.file.image_cdn import _upload_bytes_to_oss
  101. import hashlib
  102. md5_hash = hashlib.md5(img_bytes).hexdigest()[:12]
  103. filename = f"x_detail_collage_{md5_hash}.png"
  104. cdn_url = await _upload_bytes_to_oss(img_bytes, filename)
  105. return {"type": "url", "url": cdn_url}
  106. except Exception as e:
  107. import logging
  108. logging.getLogger(__name__).warning("Failed to upload x detail collage to CDN: %s", e)
  109. b64, _ = encode_base64(grid, format="PNG")
  110. return {"type": "base64", "media_type": "image/png", "data": b64}
  111. async def _fetch_author_comments(content_id: str, author_id: str) -> List[Dict[str, Any]]:
  112. """拉取该推文评论,仅保留原作者本人发布的回复,按点赞数降序取 Top N。"""
  113. if not content_id or not author_id:
  114. return []
  115. try:
  116. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  117. resp = await client.post(COMMENT_URL, json={"content_id": content_id})
  118. resp.raise_for_status()
  119. payload = resp.json()
  120. except Exception as e:
  121. import logging
  122. logging.getLogger(__name__).warning("Failed to fetch x comments for %s: %s", content_id, e)
  123. return []
  124. if payload.get("code") != 0:
  125. return []
  126. inner = payload.get("data", {})
  127. raw_comments = inner.get("data", []) if isinstance(inner, dict) else []
  128. author_id_str = str(author_id)
  129. author_comments = []
  130. for c in raw_comments:
  131. author = c.get("author") or {}
  132. if str(author.get("rest_id", "")) != author_id_str:
  133. continue
  134. author_comments.append({
  135. "text": c.get("display_text") or c.get("text", ""),
  136. "likes": c.get("likes", 0) or 0,
  137. "replies": c.get("replies", 0) or 0,
  138. "created_at": c.get("created_at", ""),
  139. })
  140. author_comments.sort(key=lambda x: x["likes"], reverse=True)
  141. return author_comments[:AUTHOR_COMMENT_TOP_N]
  142. async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None) -> ToolResult:
  143. """X 的详情直接从缓存的搜索结果取完整数据,并补拉作者本人的热门补充评论。"""
  144. author = post.get("channel_account_name", "")
  145. author_id = post.get("channel_account_id", "")
  146. content_id = post.get("channel_content_id", "")
  147. text = post.get("body_text", "")[:30]
  148. img_urls = []
  149. for img_item in post.get("image_url_list", []):
  150. url = img_item.get("image_url") if isinstance(img_item, dict) else img_item
  151. if url:
  152. img_urls.append(url)
  153. all_images = []
  154. if len(img_urls) > MAX_DETAIL_IMAGES:
  155. for u in img_urls[:KEEP_INDIVIDUAL]:
  156. all_images.append({"type": "url", "url": u})
  157. collage = await _build_images_collage(img_urls[KEEP_INDIVIDUAL:])
  158. if collage:
  159. all_images.append(collage)
  160. else:
  161. for u in img_urls:
  162. all_images.append({"type": "url", "url": u})
  163. author_comments = await _fetch_author_comments(content_id, author_id)
  164. extras_d = extras or {}
  165. trace_id = extras_d.get("__trace_id__")
  166. if not trace_id:
  167. import os as _os
  168. trace_id = _os.getenv("TRACE_ID")
  169. # 把作者评论写回 cache,让下游离线流程(如 extract_sources)也能拿到
  170. if author_comments:
  171. from agent.tools.builtin.content import cache as _cache
  172. if trace_id and content_id:
  173. _cache.update_post_field(trace_id, "x", content_id, "author_comments", author_comments)
  174. # 视频字幕:检测到 video_url_list 时通过 Deepgram 转写 (default on, opt-out via extras)
  175. transcript_text: Optional[str] = post.get("video_transcript") # cache hit reuse
  176. if not transcript_text and extras_d.get("include_transcript", True):
  177. from agent.tools.builtin.content.transcription import transcribe_video_from_post
  178. transcript_text = await transcribe_video_from_post("x", post)
  179. if transcript_text:
  180. post["video_transcript"] = transcript_text
  181. from agent.tools.builtin.content import cache as _cache
  182. if trace_id and content_id:
  183. _cache.update_post_field(trace_id, "x", content_id, "video_transcript", transcript_text)
  184. output_json = json.dumps(post, ensure_ascii=False, indent=2)
  185. sections = [output_json]
  186. if author_comments:
  187. lines = [f"=== 作者 @{author} 在评论区的补充(按点赞 Top {len(author_comments)}) ==="]
  188. for i, c in enumerate(author_comments, 1):
  189. lines.append(f"{i}. [赞{c['likes']} · 回复{c['replies']}] {c['text']}")
  190. sections.append("\n".join(lines))
  191. # transcript already embedded as post["video_transcript"] inside output_json above;
  192. # no need to repeat as a separate section.
  193. output_text = "\n\n".join(sections)
  194. memory_extras = []
  195. if author_comments:
  196. memory_extras.append(f"{len(author_comments)} author replies")
  197. if transcript_text:
  198. memory_extras.append("+transcript")
  199. memory_suffix = " + " + ", ".join(memory_extras) if memory_extras else ""
  200. return ToolResult(
  201. title=f"X 详情: @{author}",
  202. output=output_text,
  203. long_term_memory=f"Viewed X post by @{author}: {text}{memory_suffix}",
  204. images=all_images,
  205. )
  206. async def _build_tweet_collage(tweets: List[Dict[str, Any]]) -> Optional[str]:
  207. urls, titles = [], []
  208. for tweet in tweets:
  209. thumb = None
  210. for img_item in tweet.get("image_url_list", []):
  211. url = img_item.get("image_url") if isinstance(img_item, dict) else img_item
  212. if url:
  213. thumb = url
  214. break
  215. if not thumb:
  216. thumb = tweet.get("cover_url")
  217. if thumb:
  218. urls.append(thumb)
  219. base_title = f"@{tweet.get('channel_account_name', '')}"
  220. score = tweet.get("_quality_score")
  221. if score is not None:
  222. title_with_score = f"[{score}分] {base_title}"
  223. else:
  224. title_with_score = base_title
  225. titles.append(title_with_score)
  226. if not urls:
  227. return None
  228. loaded = await load_images(urls)
  229. valid_images, valid_labels = [], []
  230. for (_, img), title in zip(loaded, titles):
  231. if img is not None:
  232. valid_images.append(img)
  233. valid_labels.append(title)
  234. if not valid_images:
  235. return None
  236. grid = build_image_grid(images=valid_images, labels=valid_labels)
  237. import io
  238. buf = io.BytesIO()
  239. grid.save(buf, format="PNG")
  240. img_bytes = buf.getvalue()
  241. try:
  242. from agent.tools.builtin.file.image_cdn import _upload_bytes_to_oss
  243. import hashlib
  244. md5_hash = hashlib.md5(img_bytes).hexdigest()[:12]
  245. filename = f"x_collage_{md5_hash}.png"
  246. cdn_url = await _upload_bytes_to_oss(img_bytes, filename)
  247. return {"type": "url", "url": cdn_url}
  248. except Exception as e:
  249. import logging
  250. logging.getLogger(__name__).warning("Failed to upload x collage to CDN: %s", e)
  251. b64, _ = encode_base64(grid, format="PNG")
  252. return {"type": "base64", "media_type": "image/png", "data": b64}
  253. # ── 注册 ──
  254. _X = PlatformDef(
  255. id="x",
  256. name="X (Twitter)",
  257. aliases=["twitter", "推特"],
  258. )
  259. _X.search_impl = search
  260. _X.detail_impl = detail
  261. register_platform(_X)