# weixin_tools.py
  1. from __future__ import annotations
  2. import asyncio
  3. import json
  4. import logging
  5. from agent.tools import tool, ToolContext, ToolResult
  6. from src.infra.shared.http_client import AsyncHttpClient
  7. from src.infra.shared.common import extract_history_articles
  8. from src.infra.trace.logging.tool_logging import format_tool_result_for_log, log_tool_call
logger = logging.getLogger(__name__)

# Base URL of the crawler endpoint (provided by the aigc service).
base_url = "http://crawler-cn.aiddit.com/crawler/wei_xin"
headers = {"Content-Type": "application/json"}

# Serialization lock for fetch_article_detail: prevents concurrent
# requests from overwhelming the upstream crawler.
_detail_lock = asyncio.Lock()

# Retry configuration shared by all tools in this module.
_MAX_RETRIES = 3
_RETRY_DELAYS = (2, 4, 8)  # exponential backoff delays, in seconds
  18. class _UpstreamError(Exception):
  19. """
  20. 上游返回业务级失败。
  21. 触发场景:HTTP 200 但响应体中 code != 0 且 data 为 None,
  22. 例如 {"code": 10000, "msg": "未知错误", "data": null}。
  23. 这种情况与网络异常一样需要重试。
  24. """
  25. def _check_upstream(response: dict, tool_name: str) -> None:
  26. """
  27. 检查上游响应是否为业务级失败,若是则抛出 _UpstreamError。
  28. 判定条件:code 字段非 0 且 data 字段为 None。
  29. 调用方应在拿到 response 后立即调用此函数,
  30. 从而让外层重试逻辑统一捕获。
  31. """
  32. code = response.get("code")
  33. data = response.get("data")
  34. if code != 0 and data is None:
  35. msg = response.get("msg", "")
  36. raise _UpstreamError(
  37. f"[{tool_name}] 上游业务错误: code={code}, msg={msg}"
  38. )
  39. def _build_success_result(title: str, response: dict) -> ToolResult:
  40. """把上游响应规范为 ToolResult。"""
  41. output = response.get("output")
  42. if not output:
  43. output = json.dumps(response, ensure_ascii=False)
  44. metadata = response.get("metadata")
  45. if not isinstance(metadata, dict):
  46. metadata = {"raw_data": response}
  47. elif "raw_data" not in metadata:
  48. metadata["raw_data"] = response
  49. return ToolResult(title=title, output=output, metadata=metadata)
  50. @tool(description="通过关键词搜索微信文章")
  51. async def weixin_search(keyword: str, page: str = "1", ctx: ToolContext = None) -> ToolResult:
  52. """
  53. 微信关键词搜索
  54. 通过关键词搜索微信的文章信息,page 用于控制翻页
  55. Args:
  56. keyword: 搜索关键词
  57. Returns:
  58. ToolResult: 包含以下内容:
  59. - output: 文本格式的搜索结果摘要
  60. - metadata.search_results: 结构化的搜索结果列表
  61. - title: 文章标题
  62. - url: 文章链接
  63. - statistics: 统计数据
  64. - time: 文章发布时间戳(秒)
  65. - metadata.raw_data: 原始 API 返回数据
  66. Note:
  67. - 使用 next_cursor 参数可以获取下一页结果
  68. - 建议从 metadata.search_results 获取结构化数据,而非解析 output 文本
  69. - 返回的 next_cursor 值可用于下一次搜索的 cursor 参数
  70. """
  71. url = "{}/keyword".format(base_url)
  72. payload = json.dumps({"keyword": keyword, "cursor": page})
  73. params = {"keyword": keyword, "page": page}
  74. last_error: Exception | None = None
  75. for attempt in range(1, _MAX_RETRIES + 1):
  76. try:
  77. async with AsyncHttpClient(timeout=120) as http_client:
  78. response = await http_client.post(url=url, headers=headers, data=payload)
  79. print(json.dumps(response, ensure_ascii=False, indent=4))
  80. # 业务级失败(code 非 0 且 data 为 None)视为可重试错误
  81. _check_upstream(response, "weixin_search")
  82. # API 返回格式: {"code": 0, "data": {"data": [...], "next_cursor": "2"}}
  83. # 需要将 data.data 映射为 search_results,每条记录的 time 字段包装到 statistics 中
  84. raw_items = (response.get("data") or {}).get("data") or []
  85. search_results = []
  86. for item in raw_items:
  87. search_results.append({
  88. "title": item.get("title", ""),
  89. "url": item.get("url", ""),
  90. "statistics": {"time": item.get("time", 0)},
  91. "cover_url": item.get("cover_url", ""),
  92. "nick_name": item.get("nick_name", ""),
  93. "biz": item.get("biz", ""),
  94. })
  95. next_cursor = (response.get("data") or {}).get("next_cursor")
  96. # output 中嵌入结构化文章数据,使 Agent 和下游解析都能获取完整结果
  97. articles_json = json.dumps(search_results, ensure_ascii=False)
  98. output_text = (
  99. f"搜索关键词「{keyword}」返回 {len(search_results)} 条结果\n"
  100. f"```json\n{articles_json}\n```"
  101. )
  102. normalized = {
  103. "output": output_text,
  104. "metadata": {
  105. "search_results": search_results,
  106. "next_cursor": next_cursor,
  107. "raw_data": response,
  108. },
  109. }
  110. result = _build_success_result("微信文章搜索结果", normalized)
  111. log_tool_call("weixin_search", params, format_tool_result_for_log(result))
  112. return result
  113. except Exception as e:
  114. last_error = e
  115. logger.warning("weixin_search 第 %d/%d 次失败: %s", attempt, _MAX_RETRIES, e)
  116. if attempt < _MAX_RETRIES:
  117. await asyncio.sleep(_RETRY_DELAYS[attempt - 1])
  118. logger.error("weixin_search 重试 %d 次后仍失败", _MAX_RETRIES)
  119. result = ToolResult(
  120. title="微信文章搜索失败",
  121. output="",
  122. error=str(last_error),
  123. metadata=params,
  124. )
  125. log_tool_call("weixin_search", params, format_tool_result_for_log(result))
  126. return result
  127. @tool(description="通过公众号文章链接获取公众号详情信息")
  128. async def fetch_weixin_account(content_link: str, ctx: ToolContext = None) -> ToolResult:
  129. """
  130. 通过公众号文章链接获取公众号的详情信息
  131. Args:
  132. content_link: 公众号文章链接
  133. Returns:
  134. ToolResult: 包含以下内容:
  135. - output: 文本格式的公众号详情摘要
  136. - metadata.account_info: 公众号详情信息
  137. - account_name: 公众号名称
  138. - wx_gh: 公众号ID
  139. - biz_info: 公众号biz信息
  140. - channel_account_id: 公众号账号内部ID
  141. - metadata.raw_data: 原始 API 返回数据
  142. Note:
  143. - 建议从 metadata.account_info 获取结构化数据,而非解析 output 文本
  144. """
  145. url = "{}/account_info".format(base_url)
  146. payload = json.dumps({"content_link": content_link, "is_cache": False})
  147. params = {"content_link": content_link}
  148. last_error: Exception | None = None
  149. for attempt in range(1, _MAX_RETRIES + 1):
  150. try:
  151. async with AsyncHttpClient(timeout=120) as http_client:
  152. response = await http_client.post(url=url, headers=headers, data=payload)
  153. # 业务级失败(code 非 0 且 data 为 None)视为可重试错误
  154. _check_upstream(response, "fetch_weixin_account")
  155. # API 返回格式: {"code": 0, "data": {"data": {"account_name": ..., "wx_gh": ..., ...}}}
  156. raw_data = (response.get("data") or {}).get("data") or {}
  157. account_info = {
  158. "account_name": raw_data.get("account_name", ""),
  159. "wx_gh": raw_data.get("wx_gh", ""),
  160. "biz_info": raw_data.get("biz_info", {}),
  161. "channel_account_id": raw_data.get("channel_account_id", ""),
  162. }
  163. normalized = {
  164. "output": f"公众号: {account_info['account_name']} (wx_gh={account_info['wx_gh']})",
  165. "metadata": {
  166. "account_info": account_info,
  167. "raw_data": response,
  168. },
  169. }
  170. result = _build_success_result("公众号详情信息", normalized)
  171. log_tool_call("fetch_weixin_account", params, format_tool_result_for_log(result))
  172. return result
  173. except Exception as e:
  174. last_error = e
  175. logger.warning("fetch_weixin_account 第 %d/%d 次失败: %s", attempt, _MAX_RETRIES, e)
  176. if attempt < _MAX_RETRIES:
  177. await asyncio.sleep(_RETRY_DELAYS[attempt - 1])
  178. logger.error("fetch_weixin_account 重试 %d 次后仍失败", _MAX_RETRIES)
  179. result = ToolResult(
  180. title="公众号详情获取失败",
  181. output="",
  182. error=str(last_error),
  183. metadata=params,
  184. )
  185. log_tool_call("fetch_weixin_account", params, format_tool_result_for_log(result))
  186. return result
  187. @tool(description="通过微信公众号的 wx_gh 获取微信公众号的历史发文列表")
  188. async def fetch_account_article_list(
  189. wx_gh: str,
  190. index: str | None = None,
  191. is_cache: bool = True,
  192. ctx: ToolContext = None,
  193. ) -> ToolResult:
  194. """
  195. 通过公众号的 wx_gh 获取历史发文列表
  196. Args:
  197. wx_gh: 公众号ID
  198. index: 分页索引
  199. is_cache: 是否使用缓存
  200. Returns:
  201. ToolResult: 包含以下内容:
  202. - output: 文本格式历史发文列表摘要
  203. - metadata.next_cursor: 游标,用于下一页查询
  204. - metadata.articles: 历史发文列表
  205. - msg_id: 发布消息ID
  206. - title: 文章标题
  207. - digest: 文章摘要描述
  208. - content_url: 文章链接
  209. - cover_url: 封面链接
  210. - create_time: 文章发布时间戳
  211. - position: 文章位置
  212. - statistics: 统计数据
  213. - view_count: 文章阅读量
  214. - like_count: 文章点赞量
  215. - pay_count: 文章付费量
  216. - zs_count: 文章赞赏量
  217. - metadata.raw_data: 原始 API 返回数据
  218. Note:
  219. - 使用 next_cursor 参数可以获取下一页结果
  220. - 建议从 metadata.history_articles 获取结构化数据,而非解析 output 文本
  221. - metadata.raw_data: 原始 API 返回数据
  222. """
  223. url = "{}/blogger".format(base_url)
  224. payload = json.dumps(
  225. {
  226. "account_id": wx_gh,
  227. "cursor": index,
  228. "token": "1fa4c0ad5c66e43ebd525611f3869f53",
  229. "is_cache": is_cache,
  230. }
  231. )
  232. params = {"wx_gh": wx_gh, "index": index, "is_cache": is_cache}
  233. last_error: Exception | None = None
  234. for attempt in range(1, _MAX_RETRIES + 1):
  235. try:
  236. async with AsyncHttpClient(timeout=120) as http_client:
  237. response = await http_client.post(url=url, headers=headers, data=payload)
  238. # 业务级失败(code 非 0 且 data 为 None)视为可重试错误
  239. _check_upstream(response, "fetch_account_article_list")
  240. extracted = extract_history_articles(response)
  241. articles = extracted.get("articles", [])
  242. normalized = {
  243. "output": f"公众号 {wx_gh} 历史发文 {len(articles)} 篇",
  244. "metadata": {
  245. "next_cursor": extracted.get("next_cursor"),
  246. "articles": articles,
  247. "raw_data": response,
  248. },
  249. }
  250. result = _build_success_result("公众号历史发文列表", normalized)
  251. log_tool_call("fetch_account_article_list", params, format_tool_result_for_log(result))
  252. return result
  253. except Exception as e:
  254. last_error = e
  255. logger.warning(
  256. "fetch_account_article_list 第 %d/%d 次失败: %s", attempt, _MAX_RETRIES, e
  257. )
  258. if attempt < _MAX_RETRIES:
  259. await asyncio.sleep(_RETRY_DELAYS[attempt - 1])
  260. logger.error("fetch_account_article_list 重试 %d 次后仍失败", _MAX_RETRIES)
  261. result = ToolResult(
  262. title="公众号历史发文获取失败",
  263. output="",
  264. error=str(last_error),
  265. metadata=params,
  266. )
  267. log_tool_call("fetch_account_article_list", params, format_tool_result_for_log(result))
  268. return result
  269. @tool(description="通过公众号文章链接获取文章详情")
  270. async def fetch_article_detail(
  271. article_link: str,
  272. is_count: bool = True,
  273. is_cache: bool = True,
  274. ctx: ToolContext = None,
  275. ) -> ToolResult:
  276. """
  277. 通过公众号的 文章链接获取文章详情
  278. Args:
  279. article_link: 文章链接
  280. is_count: 是否统计文章阅读量 默认 False
  281. is_cache: 是否使用缓存 默认 True
  282. Returns:
  283. ToolResult: 包含以下内容:
  284. - output: 文本格式文章详情摘要
  285. - metadata.article_info: 文章详情信息
  286. - title: 文章标题
  287. - channel_content_id: 文章内部ID
  288. - content_link: 文章链接
  289. - body_text: 文章正文文本
  290. - mini_program: 文章嵌入小程序信息【若无则是空数组】
  291. - image_url_list: 文章图片列表【若无则是空数组】
  292. - publish_timestamp: 文章发布时间戳【毫秒时间戳】
  293. - view_count: 文章阅读量
  294. - like_count: 文章点赞量
  295. - share_count: 文章分享量
  296. - looking_count: 文章在看量
  297. - metadata.raw_data: 原始 API 返回数据
  298. Note:
  299. - 建议从 metadata.article_info 获取结构化数据,而非解析 output 文本
  300. - metadata.raw_data: 原始 API 返回数据
  301. """
  302. target_url = f"{base_url}/detail"
  303. payload = json.dumps(
  304. {
  305. "content_link": article_link,
  306. "is_count": is_count,
  307. "is_ad": False,
  308. "is_cache": is_cache,
  309. }
  310. )
  311. params = {"article_link": article_link, "is_count": is_count, "is_cache": is_cache}
  312. last_error: Exception | None = None
  313. async with _detail_lock:
  314. for attempt in range(1, _MAX_RETRIES + 1):
  315. try:
  316. async with AsyncHttpClient(timeout=30) as http_client:
  317. response = await http_client.post(target_url, headers=headers, data=payload)
  318. # 业务级失败(code 非 0 且 data 为 None)视为可重试错误
  319. _check_upstream(response, "fetch_article_detail")
  320. # API 返回格式: {"code": 0, "data": {"data": {"title": ..., "body_text": ..., ...}}}
  321. raw_detail = (response.get("data") or {}).get("data") or {}
  322. article_info = {
  323. "title": raw_detail.get("title", ""),
  324. "channel_content_id": raw_detail.get("channel_content_id", ""),
  325. "content_link": raw_detail.get("content_link", article_link),
  326. "body_text": raw_detail.get("body_text", ""),
  327. "mini_program": raw_detail.get("mini_program", []),
  328. "image_url_list": raw_detail.get("image_url_list", []),
  329. "publish_timestamp": raw_detail.get("publish_timestamp", 0),
  330. "view_count": raw_detail.get("view_count") or 0,
  331. "like_count": raw_detail.get("like_count") or 0,
  332. "share_count": raw_detail.get("share_count") or 0,
  333. "looking_count": raw_detail.get("looking_count") or 0,
  334. }
  335. normalized = {
  336. "output": f"文章详情: {article_info['title']}",
  337. "metadata": {
  338. "article_info": article_info,
  339. "raw_data": response,
  340. },
  341. }
  342. result = _build_success_result(article_info["title"] or "文章详情", normalized)
  343. log_tool_call("fetch_article_detail", params, format_tool_result_for_log(result))
  344. return result
  345. except Exception as e:
  346. last_error = e
  347. logger.warning(
  348. "fetch_article_detail 第 %d/%d 次失败: %s", attempt, _MAX_RETRIES, e
  349. )
  350. if attempt < _MAX_RETRIES:
  351. await asyncio.sleep(_RETRY_DELAYS[attempt - 1])
  352. logger.error("fetch_article_detail 重试 %d 次后仍失败", _MAX_RETRIES)
  353. result = ToolResult(
  354. title="文章详情获取失败",
  355. output="",
  356. error=str(last_error),
  357. metadata=params,
  358. )
  359. log_tool_call("fetch_article_detail", params, format_tool_result_for_log(result))
  360. return result
  361. if __name__ == "__main__":
  362. url = "http://mp.weixin.qq.com/s?__biz=MjM5ODI5NTE2MA==&mid=2651871172&idx=1&sn=791630221da3b28fc23949c48c994218&chksm=bc39e9a2a29ea779aef9f6a510f24c3b0addfbc08c86d2d20f8bce0c132fc9b0bed98dc6c8ee&scene=7#rd"
  363. async def run():
  364. # response = await fetch_article_detail(url)
  365. response = await weixin_search("伊朗局势")
  366. import json
  367. logger.info(json.dumps(response, ensure_ascii=False, indent=4))
  368. import asyncio
  369. asyncio.run(run())