| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- from __future__ import annotations
- import json
- import logging
- from agent.tools import tool, ToolContext, ToolResult
- from src.infra.shared.http_client import AsyncHttpClient
- from src.infra.shared.common import extract_history_articles
# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)

# Root of the WeChat crawler HTTP API; each tool appends its own path segment.
# (The original note says this URL comes from "aigc".)
base_url: str = "http://crawler-cn.aiddit.com/crawler/wei_xin"

# Every request body in this module is a JSON document.
headers: dict[str, str] = {"Content-Type": "application/json"}
def _build_success_result(title: str, response: dict) -> ToolResult:
    """Normalize an upstream response into a ``ToolResult``.

    Args:
        title: Title to put on the result.
        response: Decoded upstream response. Its ``output`` and ``metadata``
            keys are used when present. The dict is never modified.

    Returns:
        A ``ToolResult`` whose ``output`` is the upstream ``output`` value, or
        the whole response serialized as JSON when that value is missing or
        empty. Its ``metadata`` is the upstream metadata (when it is a dict)
        with the response added under ``raw_data`` unless that key is already
        present; otherwise it is ``{"raw_data": response}``.
    """
    output = response.get("output") or json.dumps(response, ensure_ascii=False)

    upstream_metadata = response.get("metadata")
    if isinstance(upstream_metadata, dict):
        # Work on a shallow copy: writing ``raw_data`` into the upstream dict
        # would mutate the caller's response and make it contain itself
        # (response -> metadata -> raw_data -> response), which breaks
        # ``json.dumps`` and any other recursive traversal of the result.
        metadata = dict(upstream_metadata)
        metadata.setdefault("raw_data", response)
    else:
        metadata = {"raw_data": response}

    return ToolResult(title=title, output=output, metadata=metadata)
@tool(description="通过关键词搜索微信文章")
async def weixin_search(keyword: str, page: str = "1", ctx: ToolContext = None) -> ToolResult:
    """Search WeChat articles by keyword.

    Posts the keyword and the page cursor to the crawler's ``/keyword``
    endpoint and wraps the response with ``_build_success_result``.

    Args:
        keyword: Search keyword.
        page: Paging value, sent upstream as ``cursor``. Defaults to ``"1"``.
        ctx: Tool context; not referenced in this function's body.

    Returns:
        ToolResult: On success, the normalized upstream response. According
        to the original notes (not verifiable from this file; confirm
        against the crawler API) it carries:
            - output: text summary of the search results
            - metadata.search_results: structured list of results
                - title: article title
                - url: article link
                - statistics: statistics
                - time: publish timestamp (seconds)
            - metadata.raw_data: raw API response
        On any exception, a failure result with empty ``output``, ``error``
        set to the exception text, and the request arguments in ``metadata``.

    Note:
        - Prefer the structured metadata over parsing the ``output`` text.
        - NOTE(review): the original notes mention a returned ``next_cursor``
          for fetching the next page; presumably it is meant to be passed
          back as ``page`` — confirm.
    """
    endpoint = f"{base_url}/keyword"
    body = json.dumps({"keyword": keyword, "cursor": page})
    try:
        async with AsyncHttpClient(timeout=120) as client:
            upstream = await client.post(url=endpoint, headers=headers, data=body)
            return _build_success_result("微信文章搜索结果", upstream)
    except Exception as exc:
        # Report the failure as a result instead of raising to the caller.
        logger.exception("weixin_search failed")
        request_args = {"keyword": keyword, "page": page}
        return ToolResult(
            title="微信文章搜索失败",
            output="",
            error=str(exc),
            metadata=request_args,
        )
@tool(description="通过公众号文章链接获取公众号详情信息")
async def fetch_weixin_account(content_link: str, ctx: ToolContext = None) -> ToolResult:
    """Fetch official-account details from one of the account's article links.

    Posts the link to the crawler's ``/account_info`` endpoint with
    ``is_cache`` set to ``False`` and wraps the response with
    ``_build_success_result``.

    Args:
        content_link: Link to an article published by the official account.
        ctx: Tool context; not referenced in this function's body.

    Returns:
        ToolResult: On success, the normalized upstream response. According
        to the original notes (not verifiable from this file; confirm
        against the crawler API) it carries:
            - output: text summary of the account details
            - metadata.account_info: account details
                - account_name: account name
                - wx_gh: account ID
                - biz_info: account biz information
                - channel_account_id: internal account ID
            - metadata.raw_data: raw API response
        On any exception, a failure result with empty ``output``, ``error``
        set to the exception text, and ``content_link`` in ``metadata``.

    Note:
        - Prefer the structured metadata over parsing the ``output`` text.
    """
    endpoint = f"{base_url}/account_info"
    body = json.dumps({"content_link": content_link, "is_cache": False})
    try:
        async with AsyncHttpClient(timeout=120) as client:
            upstream = await client.post(url=endpoint, headers=headers, data=body)
            return _build_success_result("公众号详情信息", upstream)
    except Exception as exc:
        # Report the failure as a result instead of raising to the caller.
        logger.exception("fetch_weixin_account failed")
        request_args = {"content_link": content_link}
        return ToolResult(
            title="公众号详情获取失败",
            output="",
            error=str(exc),
            metadata=request_args,
        )
@tool(description="通过微信公众号的 wx_gh 获取微信公众号的历史发文列表")
async def fetch_account_article_list(
    wx_gh: str,
    index: str | None = None,
    is_cache: bool = True,
    ctx: ToolContext = None,
) -> ToolResult:
    """Fetch the published-article history of an official account.

    Posts the account ID and paging cursor to the crawler's ``/blogger``
    endpoint, passes the response through ``extract_history_articles``, and
    wraps the outcome with ``_build_success_result``.

    Args:
        wx_gh: Official-account ID, sent upstream as ``account_id``.
        index: Paging index, sent upstream as ``cursor``; ``None`` is sent
            as JSON ``null``.
        is_cache: Whether the upstream may use its cache.
        ctx: Tool context; not referenced in this function's body.

    Returns:
        ToolResult: On success, the normalized article history. According to
        the original notes (not verifiable from this file; confirm against
        ``extract_history_articles`` and the crawler API) it carries:
            - output: text summary of the article history
            - metadata.next_cursor: cursor for the next page
            - metadata.articles: list of articles
                - msg_id: publish message ID
                - title: article title
                - digest: article summary
                - content_url: article link
                - cover_url: cover image link
                - create_time: publish timestamp
                - position: article position
                - statistics: statistics
                    - view_count: read count
                    - like_count: like count
                    - pay_count: paid count
                    - zs_count: tip count
            - metadata.raw_data: raw API response
        On any exception, a failure result with empty ``output``, ``error``
        set to the exception text, and the request arguments in ``metadata``.

    Note:
        - Prefer the structured metadata over parsing the ``output`` text.
        - NOTE(review): the original notes name the structured list both
          ``metadata.articles`` and ``metadata.history_articles`` — confirm
          which key is actually produced.
        - NOTE(review): presumably ``metadata.next_cursor`` is what should be
          passed back as ``index`` for the next page — confirm.
    """
    endpoint = f"{base_url}/blogger"
    request_fields = {
        "account_id": wx_gh,
        "cursor": index,
        # NOTE(review): this token is hard-coded in source; consider moving
        # it to configuration.
        "token": "1fa4c0ad5c66e43ebd525611f3869f53",
        "is_cache": is_cache,
    }
    body = json.dumps(request_fields)
    try:
        async with AsyncHttpClient(timeout=120) as client:
            upstream = await client.post(url=endpoint, headers=headers, data=body)
            history = extract_history_articles(upstream)
            return _build_success_result("公众号历史发文列表", history)
    except Exception as exc:
        # Report the failure as a result instead of raising to the caller.
        logger.exception("fetch_account_article_list failed")
        request_args = {"wx_gh": wx_gh, "index": index, "is_cache": is_cache}
        return ToolResult(
            title="公众号历史发文获取失败",
            output="",
            error=str(exc),
            metadata=request_args,
        )
@tool(description="通过公众号文章链接获取文章详情")
async def fetch_article_detail(
    article_link: str,
    is_count: bool = False,
    is_cache: bool = True,
    ctx: ToolContext = None,
) -> ToolResult:
    """Fetch the details of an official-account article from its link.

    Posts the link to the crawler's ``/detail`` endpoint (always with
    ``is_ad`` set to ``False``) and wraps the response with
    ``_build_success_result``. This call uses a 10-second client timeout.

    Args:
        article_link: Article link, sent upstream as ``content_link``.
        is_count: Whether to collect the article's read statistics.
            Defaults to ``False``.
        is_cache: Whether the upstream may use its cache. Defaults to
            ``True``.
        ctx: Tool context; not referenced in this function's body.

    Returns:
        ToolResult: On success, the normalized upstream response. According
        to the original notes (not verifiable from this file; confirm
        against the crawler API) it carries:
            - output: text summary of the article details
            - metadata.article_info: article details
                - title: article title
                - channel_content_id: internal article ID
                - content_link: article link
                - body_text: article body text
                - mini_program: embedded mini-program info (empty list if none)
                - image_url_list: article images (empty list if none)
                - publish_timestamp: publish timestamp (milliseconds)
            - metadata.raw_data: raw API response
        On any exception, a failure result with empty ``output``, ``error``
        set to the exception text, and the request arguments in ``metadata``.

    Note:
        - Prefer the structured metadata over parsing the ``output`` text.
    """
    target_url = "{}/detail".format(base_url)
    request_fields = {
        "content_link": article_link,
        "is_count": is_count,
        "is_ad": False,
        "is_cache": is_cache,
    }
    body = json.dumps(request_fields)
    try:
        async with AsyncHttpClient(timeout=10) as client:
            upstream = await client.post(target_url, headers=headers, data=body)
            return _build_success_result("文章详情信息", upstream)
    except Exception as exc:
        # Report the failure as a result instead of raising to the caller.
        logger.exception("fetch_article_detail failed")
        request_args = {
            "article_link": article_link,
            "is_count": is_count,
            "is_cache": is_cache,
        }
        return ToolResult(
            title="文章详情获取失败",
            output="",
            error=str(exc),
            metadata=request_args,
        )
if __name__ == "__main__":
    import asyncio
    import json

    # Manual smoke test: fetch one article's details and print them as JSON.
    sample_link = "http://mp.weixin.qq.com/s?__biz=MjM5ODI5NTE2MA==&mid=2651871172&idx=1&sn=791630221da3b28fc23949c48c994218&chksm=bc39e9a2a29ea779aef9f6a510f24c3b0addfbc08c86d2d20f8bce0c132fc9b0bed98dc6c8ee&scene=7#rd"

    async def _demo():
        detail = await fetch_article_detail(sample_link)
        # NOTE(review): assumes the returned value is JSON-serializable — confirm.
        rendered = json.dumps(detail, ensure_ascii=False, indent=4)
        print(rendered)

    asyncio.run(_demo())
|