| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422 |
- from __future__ import annotations
- import asyncio
- import json
- import logging
- from agent.tools import tool, ToolContext, ToolResult
- from src.infra.shared.http_client import AsyncHttpClient
- from src.infra.shared.common import extract_history_articles
- from src.infra.trace.logging.tool_logging import format_tool_result_for_log, log_tool_call
logger = logging.getLogger(__name__)

# Crawler endpoint base URL (provided by the AIGC side).
base_url = "http://crawler-cn.aiddit.com/crawler/wei_xin"
headers = {"Content-Type": "application/json"}

# Serialization lock for fetch_article_detail: forces detail requests to run
# one at a time so concurrent calls do not overwhelm the upstream crawler.
_detail_lock = asyncio.Lock()

# Retry configuration shared by every tool in this module.
_MAX_RETRIES = 3
_RETRY_DELAYS = (2, 4, 8)  # exponential backoff delays, in seconds
class _UpstreamError(Exception):
    """
    Business-level failure reported by the upstream service.

    Raised when the HTTP request itself succeeds (status 200) but the
    response body carries a non-zero ``code`` with ``data`` set to None,
    e.g. ``{"code": 10000, "msg": "unknown error", "data": null}``.
    Such responses need to be retried, the same as network errors.
    """
- def _check_upstream(response: dict, tool_name: str) -> None:
- """
- 检查上游响应是否为业务级失败,若是则抛出 _UpstreamError。
- 判定条件:code 字段非 0 且 data 字段为 None。
- 调用方应在拿到 response 后立即调用此函数,
- 从而让外层重试逻辑统一捕获。
- """
- code = response.get("code")
- data = response.get("data")
- if code != 0 and data is None:
- msg = response.get("msg", "")
- raise _UpstreamError(
- f"[{tool_name}] 上游业务错误: code={code}, msg={msg}"
- )
def _build_success_result(title: str, response: dict) -> ToolResult:
    """
    Normalize an upstream *response* payload into a ToolResult.

    Falls back to the JSON-serialized payload when no truthy ``output``
    text is present, and guarantees ``metadata`` is a dict carrying the
    raw response under the ``raw_data`` key.
    """
    text = response.get("output") or json.dumps(response, ensure_ascii=False)
    meta = response.get("metadata")
    if isinstance(meta, dict):
        meta.setdefault("raw_data", response)
    else:
        meta = {"raw_data": response}
    return ToolResult(title=title, output=text, metadata=meta)
@tool(description="通过关键词搜索微信文章")
async def weixin_search(keyword: str, page: str = "1", ctx: ToolContext = None) -> ToolResult:
    """
    Search WeChat articles by keyword.

    Args:
        keyword: Search keyword.
        page: Page cursor, "1" for the first page; pass the returned
            ``metadata.next_cursor`` value here to fetch the next page.
        ctx: Optional tool context injected by the framework.

    Returns:
        ToolResult with:
            - output: text summary embedding the article list as JSON
            - metadata.search_results: structured result list, each item has
                - title / url / cover_url / nick_name / biz
                - statistics.time: publish timestamp (seconds)
            - metadata.next_cursor: cursor for the next page
            - metadata.raw_data: raw API response

    Note:
        - Prefer metadata.search_results over parsing the output text.
        - On repeated failure this returns a ToolResult whose ``error``
          field is set; it does not raise.
    """
    url = f"{base_url}/keyword"
    payload = json.dumps({"keyword": keyword, "cursor": page})
    params = {"keyword": keyword, "page": page}
    last_error: Exception | None = None
    for attempt in range(1, _MAX_RETRIES + 1):
        try:
            async with AsyncHttpClient(timeout=120) as http_client:
                response = await http_client.post(url=url, headers=headers, data=payload)
            # Was an unconditional print() of the full payload to stdout;
            # use lazy debug logging instead.
            logger.debug("weixin_search raw response: %s", json.dumps(response, ensure_ascii=False))
            # Business-level failure (code != 0 and data is None) is retryable.
            _check_upstream(response, "weixin_search")
            # API shape: {"code": 0, "data": {"data": [...], "next_cursor": "2"}};
            # map data.data to search_results, wrapping `time` into statistics.
            data_block = response.get("data") or {}
            raw_items = data_block.get("data") or []
            search_results = [
                {
                    "title": item.get("title", ""),
                    "url": item.get("url", ""),
                    "statistics": {"time": item.get("time", 0)},
                    "cover_url": item.get("cover_url", ""),
                    "nick_name": item.get("nick_name", ""),
                    "biz": item.get("biz", ""),
                }
                for item in raw_items
            ]
            next_cursor = data_block.get("next_cursor")
            # Embed the structured articles in output so both the agent and
            # downstream parsers can read the full result.
            articles_json = json.dumps(search_results, ensure_ascii=False)
            output_text = (
                f"搜索关键词「{keyword}」返回 {len(search_results)} 条结果\n"
                f"```json\n{articles_json}\n```"
            )
            normalized = {
                "output": output_text,
                "metadata": {
                    "search_results": search_results,
                    "next_cursor": next_cursor,
                    "raw_data": response,
                },
            }
            result = _build_success_result("微信文章搜索结果", normalized)
            log_tool_call("weixin_search", params, format_tool_result_for_log(result))
            return result
        except Exception as e:
            last_error = e
            logger.warning("weixin_search 第 %d/%d 次失败: %s", attempt, _MAX_RETRIES, e)
            if attempt < _MAX_RETRIES:
                await asyncio.sleep(_RETRY_DELAYS[attempt - 1])
    logger.error("weixin_search 重试 %d 次后仍失败", _MAX_RETRIES)
    result = ToolResult(
        title="微信文章搜索失败",
        output="",
        error=str(last_error),
        metadata=params,
    )
    log_tool_call("weixin_search", params, format_tool_result_for_log(result))
    return result
@tool(description="通过公众号文章链接获取公众号详情信息")
async def fetch_weixin_account(content_link: str, ctx: ToolContext = None) -> ToolResult:
    """
    Fetch WeChat official-account details from one of its article links.

    Args:
        content_link: URL of an article published by the account.
        ctx: Optional tool context injected by the framework.

    Returns:
        ToolResult with:
            - output: one-line account summary
            - metadata.account_info: account_name / wx_gh / biz_info /
              channel_account_id
            - metadata.raw_data: raw API response

    Note:
        - Prefer metadata.account_info over parsing the output text.
    """
    endpoint = f"{base_url}/account_info"
    body = json.dumps({"content_link": content_link, "is_cache": False})
    params = {"content_link": content_link}
    failure: Exception | None = None
    attempt = 0
    while attempt < _MAX_RETRIES:
        attempt += 1
        try:
            async with AsyncHttpClient(timeout=120) as client:
                reply = await client.post(url=endpoint, headers=headers, data=body)
            # Business-level failure (code != 0 and data is None) is retryable.
            _check_upstream(reply, "fetch_weixin_account")
            # API shape: {"code": 0, "data": {"data": {"account_name": ..., ...}}}
            detail = (reply.get("data") or {}).get("data") or {}
            account_info = {
                "account_name": detail.get("account_name", ""),
                "wx_gh": detail.get("wx_gh", ""),
                "biz_info": detail.get("biz_info", {}),
                "channel_account_id": detail.get("channel_account_id", ""),
            }
            summary = f"公众号: {account_info['account_name']} (wx_gh={account_info['wx_gh']})"
            result = _build_success_result(
                "公众号详情信息",
                {
                    "output": summary,
                    "metadata": {"account_info": account_info, "raw_data": reply},
                },
            )
            log_tool_call("fetch_weixin_account", params, format_tool_result_for_log(result))
            return result
        except Exception as exc:
            failure = exc
            logger.warning("fetch_weixin_account 第 %d/%d 次失败: %s", attempt, _MAX_RETRIES, exc)
            if attempt < _MAX_RETRIES:
                await asyncio.sleep(_RETRY_DELAYS[attempt - 1])
    logger.error("fetch_weixin_account 重试 %d 次后仍失败", _MAX_RETRIES)
    result = ToolResult(
        title="公众号详情获取失败",
        output="",
        error=str(failure),
        metadata=params,
    )
    log_tool_call("fetch_weixin_account", params, format_tool_result_for_log(result))
    return result
@tool(description="通过微信公众号的 wx_gh 获取微信公众号的历史发文列表")
async def fetch_account_article_list(
    wx_gh: str,
    index: str | None = None,
    is_cache: bool = True,
    ctx: ToolContext = None,
) -> ToolResult:
    """
    Fetch the historical article list of an official account by its wx_gh.

    Args:
        wx_gh: Official-account id.
        index: Pagination cursor; None fetches the first page.
        is_cache: Whether the upstream may serve a cached result.
        ctx: Optional tool context injected by the framework.

    Returns:
        ToolResult with:
            - output: one-line summary of how many articles were returned
            - metadata.next_cursor: cursor for the next page
            - metadata.articles: article list; each item carries
              msg_id / title / digest / content_url / cover_url /
              create_time / position plus a statistics dict with
              view_count / like_count / pay_count / zs_count
              (per extract_history_articles — confirm against that helper)
            - metadata.raw_data: raw API response

    Note:
        - Pass the returned next_cursor as ``index`` to fetch the next page.
        - Prefer metadata.articles over parsing the output text
          (previous docs said "history_articles"; the actual key is "articles").
    """
    url = f"{base_url}/blogger"
    # NOTE(review): hard-coded upstream API token in source; consider moving
    # it to configuration / secret storage.
    payload = json.dumps(
        {
            "account_id": wx_gh,
            "cursor": index,
            "token": "1fa4c0ad5c66e43ebd525611f3869f53",
            "is_cache": is_cache,
        }
    )
    params = {"wx_gh": wx_gh, "index": index, "is_cache": is_cache}
    last_error: Exception | None = None
    for attempt in range(1, _MAX_RETRIES + 1):
        try:
            async with AsyncHttpClient(timeout=120) as http_client:
                response = await http_client.post(url=url, headers=headers, data=payload)
            # Business-level failure (code != 0 and data is None) is retryable.
            _check_upstream(response, "fetch_account_article_list")
            extracted = extract_history_articles(response)
            articles = extracted.get("articles", [])
            normalized = {
                "output": f"公众号 {wx_gh} 历史发文 {len(articles)} 篇",
                "metadata": {
                    "next_cursor": extracted.get("next_cursor"),
                    "articles": articles,
                    "raw_data": response,
                },
            }
            result = _build_success_result("公众号历史发文列表", normalized)
            log_tool_call("fetch_account_article_list", params, format_tool_result_for_log(result))
            return result
        except Exception as e:
            last_error = e
            logger.warning(
                "fetch_account_article_list 第 %d/%d 次失败: %s", attempt, _MAX_RETRIES, e
            )
            if attempt < _MAX_RETRIES:
                await asyncio.sleep(_RETRY_DELAYS[attempt - 1])
    logger.error("fetch_account_article_list 重试 %d 次后仍失败", _MAX_RETRIES)
    result = ToolResult(
        title="公众号历史发文获取失败",
        output="",
        error=str(last_error),
        metadata=params,
    )
    log_tool_call("fetch_account_article_list", params, format_tool_result_for_log(result))
    return result
@tool(description="通过公众号文章链接获取文章详情")
async def fetch_article_detail(
    article_link: str,
    is_count: bool = True,
    is_cache: bool = True,
    ctx: ToolContext = None,
) -> ToolResult:
    """
    Fetch the full detail of a WeChat article from its link.

    Args:
        article_link: Article URL.
        is_count: Whether to fetch engagement statistics. Defaults to True
            (previous docs incorrectly said False).
        is_cache: Whether the upstream may serve a cached result. Defaults
            to True.
        ctx: Optional tool context injected by the framework.

    Returns:
        ToolResult with:
            - output: short summary containing the article title
            - metadata.article_info:
                - title / channel_content_id / content_link / body_text
                - mini_program: embedded mini-program info ([] when absent)
                - image_url_list: article images ([] when absent)
                - publish_timestamp: publish time in milliseconds
                - view_count / like_count / share_count / looking_count
                  (None values from upstream are normalized to 0)
            - metadata.raw_data: raw API response

    Note:
        - Calls are serialized through a module-level lock to avoid
          overloading the upstream crawler.
        - Prefer metadata.article_info over parsing the output text.
    """
    target_url = f"{base_url}/detail"
    payload = json.dumps(
        {
            "content_link": article_link,
            "is_count": is_count,
            "is_ad": False,
            "is_cache": is_cache,
        }
    )
    params = {"article_link": article_link, "is_count": is_count, "is_cache": is_cache}
    last_error: Exception | None = None
    async with _detail_lock:
        for attempt in range(1, _MAX_RETRIES + 1):
            try:
                async with AsyncHttpClient(timeout=30) as http_client:
                    response = await http_client.post(target_url, headers=headers, data=payload)
                # Business-level failure (code != 0 and data is None) is retryable.
                _check_upstream(response, "fetch_article_detail")
                # API shape: {"code": 0, "data": {"data": {"title": ..., ...}}}
                raw_detail = (response.get("data") or {}).get("data") or {}
                article_info = {
                    "title": raw_detail.get("title", ""),
                    "channel_content_id": raw_detail.get("channel_content_id", ""),
                    "content_link": raw_detail.get("content_link", article_link),
                    "body_text": raw_detail.get("body_text", ""),
                    "mini_program": raw_detail.get("mini_program", []),
                    "image_url_list": raw_detail.get("image_url_list", []),
                    "publish_timestamp": raw_detail.get("publish_timestamp", 0),
                    # `or 0` also maps explicit nulls from upstream to 0.
                    "view_count": raw_detail.get("view_count") or 0,
                    "like_count": raw_detail.get("like_count") or 0,
                    "share_count": raw_detail.get("share_count") or 0,
                    "looking_count": raw_detail.get("looking_count") or 0,
                }
                normalized = {
                    "output": f"文章详情: {article_info['title']}",
                    "metadata": {
                        "article_info": article_info,
                        "raw_data": response,
                    },
                }
                result = _build_success_result(article_info["title"] or "文章详情", normalized)
                log_tool_call("fetch_article_detail", params, format_tool_result_for_log(result))
                return result
            except Exception as e:
                last_error = e
                logger.warning(
                    "fetch_article_detail 第 %d/%d 次失败: %s", attempt, _MAX_RETRIES, e
                )
                if attempt < _MAX_RETRIES:
                    await asyncio.sleep(_RETRY_DELAYS[attempt - 1])
    logger.error("fetch_article_detail 重试 %d 次后仍失败", _MAX_RETRIES)
    result = ToolResult(
        title="文章详情获取失败",
        output="",
        error=str(last_error),
        metadata=params,
    )
    log_tool_call("fetch_article_detail", params, format_tool_result_for_log(result))
    return result
- if __name__ == "__main__":
- url = "http://mp.weixin.qq.com/s?__biz=MjM5ODI5NTE2MA==&mid=2651871172&idx=1&sn=791630221da3b28fc23949c48c994218&chksm=bc39e9a2a29ea779aef9f6a510f24c3b0addfbc08c86d2d20f8bce0c132fc9b0bed98dc6c8ee&scene=7#rd"
- async def run():
- # response = await fetch_article_detail(url)
- response = await weixin_search("伊朗局势")
- import json
- logger.info(json.dumps(response, ensure_ascii=False, indent=4))
- import asyncio
- asyncio.run(run())
|