# weixin_tools.py — WeChat (微信) crawler tool wrappers.
  1. from __future__ import annotations
  2. import json
  3. import logging
  4. from agent.tools import tool, ToolContext, ToolResult
  5. from src.infra.shared.http_client import AsyncHttpClient
  6. from src.infra.shared.common import extract_history_articles
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Crawler service base URL (provided by the AIGC team).
# NOTE(review): hard-coded endpoint — consider moving to configuration.
base_url = "http://crawler-cn.aiddit.com/crawler/wei_xin"
# All crawler endpoints accept JSON request bodies.
headers = {"Content-Type": "application/json"}
  11. def _build_success_result(title: str, response: dict) -> ToolResult:
  12. """把上游响应规范为 ToolResult。"""
  13. output = response.get("output")
  14. if not output:
  15. output = json.dumps(response, ensure_ascii=False)
  16. metadata = response.get("metadata")
  17. if not isinstance(metadata, dict):
  18. metadata = {"raw_data": response}
  19. elif "raw_data" not in metadata:
  20. metadata["raw_data"] = response
  21. return ToolResult(title=title, output=output, metadata=metadata)
  22. @tool(description="通过关键词搜索微信文章")
  23. async def weixin_search(keyword: str, page: str = "1", ctx: ToolContext = None) -> ToolResult:
  24. """
  25. 微信关键词搜索
  26. 通过关键词搜索微信的文章信息,page 用于控制翻页
  27. Args:
  28. keyword: 搜索关键词
  29. Returns:
  30. ToolResult: 包含以下内容:
  31. - output: 文本格式的搜索结果摘要
  32. - metadata.search_results: 结构化的搜索结果列表
  33. - title: 文章标题
  34. - url: 文章链接
  35. - statistics: 统计数据
  36. - time: 文章发布时间戳(秒)
  37. - metadata.raw_data: 原始 API 返回数据
  38. Note:
  39. - 使用 next_cursor 参数可以获取下一页结果
  40. - 建议从 metadata.search_results 获取结构化数据,而非解析 output 文本
  41. - 返回的 next_cursor 值可用于下一次搜索的 cursor 参数
  42. """
  43. url = "{}/keyword".format(base_url)
  44. payload = json.dumps({"keyword": keyword, "cursor": page})
  45. try:
  46. async with AsyncHttpClient(timeout=120) as http_client:
  47. response = await http_client.post(url=url, headers=headers, data=payload)
  48. return _build_success_result("微信文章搜索结果", response)
  49. except Exception as e:
  50. logger.exception("weixin_search failed")
  51. return ToolResult(
  52. title="微信文章搜索失败",
  53. output="",
  54. error=str(e),
  55. metadata={"keyword": keyword, "page": page},
  56. )
  57. @tool(description="通过公众号文章链接获取公众号详情信息")
  58. async def fetch_weixin_account(content_link: str, ctx: ToolContext = None) -> ToolResult:
  59. """
  60. 通过公众号文章链接获取公众号的详情信息
  61. Args:
  62. content_link: 公众号文章链接
  63. Returns:
  64. ToolResult: 包含以下内容:
  65. - output: 文本格式的公众号详情摘要
  66. - metadata.account_info: 公众号详情信息
  67. - account_name: 公众号名称
  68. - wx_gh: 公众号ID
  69. - biz_info: 公众号biz信息
  70. - channel_account_id: 公众号账号内部ID
  71. - metadata.raw_data: 原始 API 返回数据
  72. Note:
  73. - 建议从 metadata.account_info 获取结构化数据,而非解析 output 文本
  74. """
  75. url = "{}/account_info".format(base_url)
  76. payload = json.dumps({"content_link": content_link, "is_cache": False})
  77. try:
  78. async with AsyncHttpClient(timeout=120) as http_client:
  79. response = await http_client.post(url=url, headers=headers, data=payload)
  80. return _build_success_result("公众号详情信息", response)
  81. except Exception as e:
  82. logger.exception("fetch_weixin_account failed")
  83. return ToolResult(
  84. title="公众号详情获取失败",
  85. output="",
  86. error=str(e),
  87. metadata={"content_link": content_link},
  88. )
  89. @tool(description="通过微信公众号的 wx_gh 获取微信公众号的历史发文列表")
  90. async def fetch_account_article_list(
  91. wx_gh: str,
  92. index: str | None = None,
  93. is_cache: bool = True,
  94. ctx: ToolContext = None,
  95. ) -> ToolResult:
  96. """
  97. 通过公众号的 wx_gh 获取历史发文列表
  98. Args:
  99. wx_gh: 公众号ID
  100. index: 分页索引
  101. is_cache: 是否使用缓存
  102. Returns:
  103. ToolResult: 包含以下内容:
  104. - output: 文本格式历史发文列表摘要
  105. - metadata.next_cursor: 游标,用于下一页查询
  106. - metadata.articles: 历史发文列表
  107. - msg_id: 发布消息ID
  108. - title: 文章标题
  109. - digest: 文章摘要描述
  110. - content_url: 文章链接
  111. - cover_url: 封面链接
  112. - create_time: 文章发布时间戳
  113. - position: 文章位置
  114. - statistics: 统计数据
  115. - view_count: 文章阅读量
  116. - like_count: 文章点赞量
  117. - pay_count: 文章付费量
  118. - zs_count: 文章赞赏量
  119. - metadata.raw_data: 原始 API 返回数据
  120. Note:
  121. - 使用 next_cursor 参数可以获取下一页结果
  122. - 建议从 metadata.history_articles 获取结构化数据,而非解析 output 文本
  123. - metadata.raw_data: 原始 API 返回数据
  124. """
  125. url = "{}/blogger".format(base_url)
  126. payload = json.dumps(
  127. {
  128. "account_id": wx_gh,
  129. "cursor": index,
  130. "token": "1fa4c0ad5c66e43ebd525611f3869f53",
  131. "is_cache": is_cache,
  132. }
  133. )
  134. try:
  135. async with AsyncHttpClient(timeout=120) as http_client:
  136. response = await http_client.post(url=url, headers=headers, data=payload)
  137. normalized = extract_history_articles(response)
  138. return _build_success_result("公众号历史发文列表", normalized)
  139. except Exception as e:
  140. logger.exception("fetch_account_article_list failed")
  141. return ToolResult(
  142. title="公众号历史发文获取失败",
  143. output="",
  144. error=str(e),
  145. metadata={"wx_gh": wx_gh, "index": index, "is_cache": is_cache},
  146. )
  147. @tool(description="通过公众号文章链接获取文章详情")
  148. async def fetch_article_detail(
  149. article_link: str,
  150. is_count: bool = False,
  151. is_cache: bool = True,
  152. ctx: ToolContext = None,
  153. ) -> ToolResult:
  154. """
  155. 通过公众号的 文章链接获取文章详情
  156. Args:
  157. article_link: 文章链接
  158. is_count: 是否统计文章阅读量 默认 False
  159. is_cache: 是否使用缓存 默认 True
  160. Returns:
  161. ToolResult: 包含以下内容:
  162. - output: 文本格式文章详情摘要
  163. - metadata.article_info: 文章详情信息
  164. - title: 文章标题
  165. - channel_content_id: 文章内部ID
  166. - content_link: 文章链接
  167. - body_text: 文章正文文本
  168. - mini_program: 文章嵌入小程序信息【若无则是空数组】
  169. - image_url_list: 文章图片列表【若无则是空数组】
  170. - publish_timestamp: 文章发布时间戳【毫秒时间戳】
  171. - metadata.raw_data: 原始 API 返回数据
  172. Note:
  173. - 建议从 metadata.article_info 获取结构化数据,而非解析 output 文本
  174. - metadata.raw_data: 原始 API 返回数据
  175. """
  176. target_url = f"{base_url}/detail"
  177. payload = json.dumps(
  178. {
  179. "content_link": article_link,
  180. "is_count": is_count,
  181. "is_ad": False,
  182. "is_cache": is_cache,
  183. }
  184. )
  185. try:
  186. async with AsyncHttpClient(timeout=10) as http_client:
  187. response = await http_client.post(target_url, headers=headers, data=payload)
  188. return _build_success_result("文章详情信息", response)
  189. except Exception as e:
  190. logger.exception("fetch_article_detail failed")
  191. return ToolResult(
  192. title="文章详情获取失败",
  193. output="",
  194. error=str(e),
  195. metadata={
  196. "article_link": article_link,
  197. "is_count": is_count,
  198. "is_cache": is_cache,
  199. },
  200. )
  201. if __name__ == "__main__":
  202. url = "http://mp.weixin.qq.com/s?__biz=MjM5ODI5NTE2MA==&mid=2651871172&idx=1&sn=791630221da3b28fc23949c48c994218&chksm=bc39e9a2a29ea779aef9f6a510f24c3b0addfbc08c86d2d20f8bce0c132fc9b0bed98dc6c8ee&scene=7#rd"
  203. async def run():
  204. response = await fetch_article_detail(url)
  205. import json
  206. print(json.dumps(response, ensure_ascii=False, indent=4))
  207. import asyncio
  208. asyncio.run(run())