luojunhui před 1 dnem
rodič
revize
36c5a922db

+ 86 - 0
src/infra/shared/common.py

@@ -0,0 +1,86 @@
+from typing import Any, Dict, List, Optional
+
+
def show_desc_to_sta(show_desc):
    """Parse a WeChat article "show desc" string into stat counts.

    The raw string looks like ``"阅读\u20062.3万\u2004\u2005赞\u2006100"``:
    key/value pairs are separated by "\u2004\u2005" and each key is
    separated from its value by "\u2006".

    :param show_desc: raw show-desc string scraped from WeChat
    :return: dict with ``show_view_count`` / ``show_like_count`` /
             ``show_pay_count`` / ``show_zs_count`` (missing keys are 0)
    """

    def decode_show_v(show_v):
        """Convert a display value like "2.3万" to an int (23000).

        Chinese magnitude suffixes are mapped to scientific notation and
        parsed with float() -- deliberately NOT eval(), because the input
        is scraped, untrusted text and eval() would execute it as code.

        :param show_v: display value, e.g. "100" or "2.3万"
        :return: integer count
        """
        normalized = show_v.replace("千", "e3").replace("万", "e4").replace("亿", "e8")
        return int(float(normalized))

    def decode_show_k(show_k):
        """Map a Chinese stat label to its canonical field name.

        :param show_k: label such as "阅读" or "赞"
        :return: canonical key, or "show_unknown" for unrecognized labels
        """
        this_dict = {
            "阅读": "show_view_count",  # text article
            "看过": "show_view_count",  # image/text post
            "观看": "show_view_count",  # video
            "赞": "show_like_count",
            "付费": "show_pay_count",
            "赞赏": "show_zs_count",
        }
        if show_k not in this_dict:
            print(f"error from decode_show_k, show_k not found: {show_k}")
        return this_dict.get(show_k, "show_unknown")

    # "10万+" style values: drop the trailing "+" before parsing.
    show_desc = show_desc.replace("+", "")
    sta = {}
    for show_kv in show_desc.split("\u2004\u2005"):
        if not show_kv:
            continue
        show_k, show_v = show_kv.split("\u2006")
        sta[decode_show_k(show_k)] = decode_show_v(show_v)
    return {
        "show_view_count": sta.get("show_view_count", 0),
        "show_like_count": sta.get("show_like_count", 0),
        "show_pay_count": sta.get("show_pay_count", 0),
        "show_zs_count": sta.get("show_zs_count", 0),
    }
+
+
def extract_history_articles(article_response: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten a WeChat history-article API response into a simple list.

    :param article_response: raw API response; expected shape is
        ``{"code": 0, "data": {"next_cursor": ..., "data": [group, ...]}}``
        where each group carries ``AppMsg.BaseInfo`` and ``AppMsg.DetailInfo``.
    :return: ``{"next_cursor": <cursor or None>, "articles": [...]}``;
        a non-zero ``code`` yields ``{"next_cursor": None, "articles": []}``.
    """
    if article_response.get("code") != 0:
        return {"next_cursor": None, "articles": []}

    response = article_response.get("data") or {}
    cursor = response.get("next_cursor")

    groups: List[Dict[str, Any]] = response.get("data") or []

    article_list: List[Dict[str, Any]] = []
    for group in groups:
        app_msg = (group or {}).get("AppMsg") or {}
        base_info = app_msg.get("BaseInfo") or {}
        create_time = base_info.get("CreateTime")

        detail_info_list: List[Dict[str, Any]] = app_msg.get("DetailInfo") or []
        for article in detail_info_list:
            # .get() with a fallback so a record missing "ShowDesc" does not
            # abort the whole extraction with a KeyError; "" parses to all-zero
            # stats in show_desc_to_sta.
            detail = show_desc_to_sta(article.get("ShowDesc") or "")
            article_list.append(
                {
                    "title": article.get("Title"),
                    "digest": article.get("Digest"),
                    "content_url": article.get("ContentUrl"),
                    "create_time": create_time,
                    "position": article.get("ItemIndex"),
                    "cover_url": article.get("CoverImgUrl"),
                    "view_count": detail.get("show_view_count"),
                    "like_count": detail.get("show_like_count"),
                    "pay_count": detail.get("show_pay_count"),
                    "zs_count": detail.get("show_zs_count"),
                    "msg_id": base_info.get("AppMsgId"),
                }
            )

    return {"next_cursor": cursor, "articles": article_list}

+ 99 - 0
src/infra/shared/http_client.py

@@ -0,0 +1,99 @@
+import aiohttp
+from typing import Optional, Union, Dict, Any
+
+
class AsyncHttpClient:
    """Minimal async HTTP client built on a shared aiohttp session.

    Intended usage::

        async with AsyncHttpClient() as client:
            data = await client.get("https://example.com/api")
    """

    def __init__(
        self,
        timeout: int = 10,
        max_connections: int = 100,
        default_headers: Optional[Dict[str, str]] = None,
    ):
        """
        Simplified async HTTP client.

        :param timeout: total request timeout in seconds
        :param max_connections: maximum size of the connection pool
        :param default_headers: headers applied to every request
        """
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.connector = aiohttp.TCPConnector(limit=max_connections)
        self.default_headers = default_headers or {}
        # Created lazily in __aenter__; None means "not started yet".
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            connector=self.connector, timeout=self.timeout, headers=self.default_headers
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Guard: __aexit__ can run even if __aenter__ never completed.
        if self.session is not None:
            await self.session.close()
            self.session = None

    async def request(
        self,
        method: str,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        data: Optional[Union[Dict[str, Any], str, bytes]] = None,
        json: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
    ) -> Union[Dict[str, Any], str]:
        """Core request method.

        Returns parsed JSON when the response declares an
        ``application/json`` content type, otherwise the body text.

        :raises RuntimeError: if called outside ``async with``
        :raises aiohttp.ClientResponseError: on a non-2xx status
        :raises aiohttp.ClientError: on network failures
        """
        if self.session is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise RuntimeError(
                "AsyncHttpClient session not started; use 'async with AsyncHttpClient(...)'"
            )
        request_headers = {**self.default_headers, **(headers or {})}

        try:
            async with self.session.request(
                method,
                url,
                params=params,
                data=data,
                json=json,
                headers=request_headers,
            ) as response:
                response.raise_for_status()
                content_type = response.headers.get("Content-Type", "")

                if "application/json" in content_type:
                    return await response.json()
                return await response.text()

        except aiohttp.ClientResponseError as e:
            print(f"HTTP error: {e.status} {e.message}")
            raise
        except aiohttp.ClientError as e:
            print(f"Network error: {str(e)}")
            raise

    async def get(
        self,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
    ) -> Union[Dict[str, Any], str]:
        """GET request."""
        return await self.request("GET", url, params=params, headers=headers)

    async def post(
        self,
        url: str,
        data: Optional[Union[Dict[str, Any], str, bytes]] = None,
        json: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
    ) -> Union[Dict[str, Any], str]:
        """POST request."""
        return await self.request("POST", url, data=data, json=json, headers=headers)

    async def put(
        self,
        url: str,
        data: Optional[Union[Dict[str, Any], str, bytes]] = None,
        json: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
    ) -> Union[Dict[str, Any], str]:
        """
        PUT request.

        Typically used to update a resource; accepts form data or JSON.
        """
        return await self.request("PUT", url, data=data, json=json, headers=headers)

+ 35 - 40
tests/content_finder.prompt

@@ -4,70 +4,65 @@ temperature: 0.3
 ---
 ---
 
 
 $system$
 $system$
-你是一个专业的内容寻找助手,帮助运营人员在抖音平台上寻找符合要求的视频内容。
+你是一个专业的内容寻找助手,帮助运营人员在微信平台上寻找符合要求的文章内容。
 
 
 ## 重要约束
 ## 重要约束
-- 只在抖音平台搜索,不要切换到其他平台(小红书、B站等)
-- 可用工具:`douyin_search`、`douyin_user_videos`、`get_content_fans_portrait`、`get_account_fans_portrait`、`store_results_mysql`、`create_crawler_plan_by_douyin_content_id`、`create_crawler_plan_by_douyin_account_id`
+- 只在微信平台搜索,不要切换到其他平台(小红书、B站、抖音等)
+- 可用工具:`wechat_search`、`fetch_weixin_account`、`fetch_account_article_list`、`fetch_article_detail`
 - **严格禁止**调用任何名称以 `browser_` 开头的浏览器工具
 - **严格禁止**调用任何名称以 `browser_` 开头的浏览器工具
 
 
 ## 平台背景
 ## 平台背景
-- 平台载体:微信小程序
+- 平台载体:微信公众号
 - 核心用户群:95% 是 50 岁以上中老年人
 - 核心用户群:95% 是 50 岁以上中老年人
-- 增长方式:微信分享裂变
-- 核心指标:分享率、DAU
+- 增长方式:微信文章阅读率
+- 核心指标:阅读量
 
 
 ## 执行流程(按顺序,禁止跳步)
 ## 执行流程(按顺序,禁止跳步)
-1. **搜索阶段**:按 `content_finding_strategy` 执行
-2. **筛选阶段**:按 `content_filtering_strategy` 执行
-3. **输出阶段**:先按 `output_schema` 写入 `output.json`
-4. **Schema 校验阶段**:逐字段自检;不符合就重写 `output.json`
-5. **入库阶段**:仅在 Schema 校验通过后,调用 `store_results_mysql(trace_id)` 存储到远程数据库
-6. **接入平台阶段**:最后按 `aigc_platform_plan` 生成 AIGC 爬取计划
+1. **搜索阶段(article_finding_strategy)**
+   - 基于 `%query%` 提取关键词并串行搜索。
+   - 搜索结果只从工具返回的结构化字段读取,不解析工具 output 文本。
+   - 先收集候选文章池(数量建议为目标数量 M 的 2 倍)。
+
+2. **过滤阶段(article_filter_strategy)**
+   - 输入:`input_query=%query%` + 候选文章池。
+   - 对候选文章调用 `fetch_article_detail` 获取详情后再过滤。
+   - 按三段执行:
+     - 相关性判断:文章与 `input_query` 的核心意图、对象、场景是否匹配。
+     - 硬性淘汰:低质、夸大、明显跑题内容直接剔除。
+     - 兴趣评分:以 query 需求价值为核心,结合可读性、可信度、情感适配、时效性综合排序。
+   - 若筛选后数量不足(C < M × 0.8),回到搜索阶段补充候选再筛选。
+
+3. **账号沉淀阶段(account_precipitation)**
+   - 对通过过滤的文章逐条调用 `fetch_weixin_account` 获取账号信息。
+   - 账号聚合去重规则:优先 `account_id`,缺失时用 `account_name` 兜底。
+   - 统计账号命中文章数与代表文章(用于内部沉淀与后续复用)。
+   - 账号沉淀为流程内步骤,不改变本次最终输出 Schema。
+
+4. **输出阶段**
+   - 先按 `output_schema` 生成并写入 `output.json`(JSON 格式)。
+   - 仅输出通过过滤后的文章结果(按综合排序取前 M 条)。
 
 
 ## 强制要求(违反即为错误)
 ## 强制要求(违反即为错误)
 
 
-### 画像工具必须调用
-对每条候选内容,**必须**按以下顺序获取画像:
-1. 先调用 `get_content_fans_portrait`,检查 `metadata.has_portrait`
-2. 若 `has_portrait=False`,再调用 `get_account_fans_portrait` 兜底
-3. **不允许跳过画像获取直接输出**
-
 ### 输出字段必须严格遵循 Schema
 ### 输出字段必须严格遵循 Schema
-- 顶层字段只能有:`trace_id`、`query`、`demand_id`、`summary`、`good_account_expansion`、`contents`
-- 每条内容字段只能有:`title`、`aweme_id`、`rank`、`video_url`、`author_nickname`、`author_sec_uid`、`author_url`、`statistics`、`portrait_data`、`reason`
+- 顶层字段只能有:`trace_id`、`query`、`demand_id`、`summary`、`contents`
+- 每条内容字段只能有:`title`、`url`、`statistics`
 - **禁止自创字段**(如 `results`、`metrics`、`tags`、`platform` 等)
 - **禁止自创字段**(如 `results`、`metrics`、`tags`、`platform` 等)
 - **禁止使用中文 key**
 - **禁止使用中文 key**
+- `summary` 中需简要说明三阶段执行情况(搜索数量、过滤后数量、沉淀账号数)
 
 
 ## 流程自检
 ## 流程自检
 
 
 **在宣称任务完成或结束对话前,必须逐项确认;任一项未满足则继续执行,不得提前收尾。**
 **在宣称任务完成或结束对话前,必须逐项确认;任一项未满足则继续执行,不得提前收尾。**
 
 
-### 1.画像(内容 + 账号)是否已获取
-- 对**最终写入 `contents` 的每一条**视频,是否都已调用过 `get_content_fans_portrait(aweme_id)`?
-- 对其中 `metadata.has_portrait=False` 的条目,是否**在同一条目上**已调用 `get_account_fans_portrait(account_id=author.sec_uid)` 作为兜底?
-- **禁止**:仅因内容侧无画像就跳过账号画像、直接把 `portrait_data` 当空或来源标为 `none` 而未尝试账号接口(除非两次调用均失败且已在理由中说明)。
-
 ### 输出、校验、入库顺序是否正确
 ### 输出、校验、入库顺序是否正确
-- 是否已先写 `output.json`,再完成 Schema 校验,最后才调用 `store_results_mysql(trace_id)`?
+- 无需写数据库,直接写入 `output.json` 即可。
+- 在 `summary` 中说明沉淀的账号数量(账号沉淀为流程内步骤,不改变输出 Schema)。
 - **禁止**:未校验 Schema 就直接入库。
 - **禁止**:未校验 Schema 就直接入库。
 
 
-### Schema 合规闸门(入库前必须通过)
-- 在调用 `store_results_mysql` 前,必须逐项核对 `output.json` 是否满足 `output_schema`;**不通过就先重写 JSON,不得入库**。
-- 顶层字段必须且仅能是:`trace_id`、`query`、`demand_id`、`summary`、`good_account_expansion`、`contents`。
-- `summary` 必须是对象,且包含:`candidate_count`、`portrait_content_like_count`、`portrait_account_fans_count`、`portrait_none_count`、`filtered_in_count`(禁止用字符串 summary)。
-- `good_account_expansion` 必须是对象:`{"enabled": <bool>, "accounts": [...]}`;`accounts` 每项字段必须是:`author_nickname`、`author_sec_uid`、`age_50_plus_ratio`、`age_50_plus_tgi`(禁止 `account_name`、`sec_uid` 等别名)。
-- 每条 `contents` 的 `statistics` 字段必须是:`digg_count`、`comment_count`、`share_count`(禁止 `likes` / `comments` / `shares`)。
-- 每条 `contents` 的 `portrait_data.source` 只允许:`content_like`、`account_fans`、`none`(禁止 `content`、`account` 等缩写)。
-- 每条 `contents` 的 `portrait_data` 必须包含:`source`、`age_50_plus_ratio`、`age_50_plus_tgi`、`url`。
-
-### AIGC 接入(爬取计划)是否已接入
-- `contents` 中入选视频是否在**入库成功后**已按 `aigc_platform_plan` 调用 `create_crawler_plan_by_douyin_content_id`?
-- **禁止**:写完库就认为任务结束、不创建爬取计划。若某条创建失败,须在回复中说明原因;仅当入选视频已创建或已说明失败原因时,方可视为本阶段完成。
-
 
 
 $user$
 $user$
-任务:找10个与「%query%」相关的、老年人感兴趣的视频
+任务:找10个与「%query%」相关的、老年人感兴趣的文章。
 要求:
 要求:
 - 适合老年人分享观看
 - 适合老年人分享观看
 - 热度要高,质量要好
 - 热度要高,质量要好

+ 9 - 11
tests/run_single.py

@@ -1,13 +1,16 @@
 from typing import Dict, Any, Optional
 from typing import Dict, Any, Optional
 import os
 import os
 from pathlib import Path
 from pathlib import Path
+
+from tools import fetch_account_article_list, fetch_weixin_account, weixin_search
+
 from agent import AgentRunner, RunConfig, FileSystemTraceStore, Trace, Message
 from agent import AgentRunner, RunConfig, FileSystemTraceStore, Trace, Message
 from agent.llm import create_openrouter_llm_call
 from agent.llm import create_openrouter_llm_call
 from agent.llm.prompts import SimplePrompt
 from agent.llm.prompts import SimplePrompt
 from agent.tools.builtin.knowledge import KnowledgeConfig
 from agent.tools.builtin.knowledge import KnowledgeConfig
 
 
 # 默认搜索词
 # 默认搜索词
-DEFAULT_QUERY = "戏曲表演"
+DEFAULT_QUERY = "伊朗、以色列、和平是永恒的主题"
 DEFAULT_DEMAND_ID = 1
 DEFAULT_DEMAND_ID = 1
 
 
 import logging
 import logging
@@ -40,7 +43,7 @@ async def run_agent(
     demand_id = demand_id or DEFAULT_DEMAND_ID
     demand_id = demand_id or DEFAULT_DEMAND_ID
 
 
     # 加载 prompt
     # 加载 prompt
-    prompt_path = PROJECT_ROOT / "tests" / "content_finder.prompt"
+    prompt_path = PROJECT_ROOT / "content_finder.prompt"
     prompt = SimplePrompt(prompt_path)
     prompt = SimplePrompt(prompt_path)
 
 
     # output 目录
     # output 目录
@@ -52,7 +55,6 @@ async def run_agent(
 
 
     # 初始化配置
     # 初始化配置
     api_key = "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"
     api_key = "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"
-    print(api_key)
     if not api_key:
     if not api_key:
         raise ValueError("OPEN_ROUTER_API_KEY 未设置")
         raise ValueError("OPEN_ROUTER_API_KEY 未设置")
 
 
@@ -60,7 +62,7 @@ async def run_agent(
     model = os.getenv("MODEL", f"anthropic/claude-{model_name}")
     model = os.getenv("MODEL", f"anthropic/claude-{model_name}")
     temperature = float(prompt.config.get("temperature", 0.3))
     temperature = float(prompt.config.get("temperature", 0.3))
     max_iterations = 30
     max_iterations = 30
-    trace_dir = str(PROJECT_ROOT / "tests" / "traces")
+    trace_dir = str(PROJECT_ROOT / "traces")
 
 
     skills_dir = str(PROJECT_ROOT / "skills")
     skills_dir = str(PROJECT_ROOT / "skills")
 
 
@@ -69,13 +71,9 @@ async def run_agent(
     store = FileSystemTraceStore(base_path=trace_dir)
     store = FileSystemTraceStore(base_path=trace_dir)
 
 
     allowed_tools = [
     allowed_tools = [
-        "douyin_search",
-        "douyin_user_videos",
-        "get_content_fans_portrait",
-        "get_account_fans_portrait",
-        "store_results_mysql",
-        "create_crawler_plan_by_douyin_content_id",
-        "create_crawler_plan_by_douyin_account_id",
+        "weixin_search",
+        "fetch_weixin_account",
+        "fetch_account_article_list",
     ]
     ]
 
 
     runner = AgentRunner(
     runner = AgentRunner(

+ 76 - 0
tests/skills/article_finding_strategy.md

@@ -0,0 +1,76 @@
+---
+name: article_finding_strategy
+description: 内容搜索方法论
+---
+
+# 内容搜索方法论
+
+## 核心流程:关键词提取 → 串行搜索 → 结果评估 → 按需补充
+
+---
+
+## 第一步:需求分析与关键词提取
+
+- 从用户需求中提取核心关键词和扩展关键词,优先使用用户原话
+- 按相关性排序:用户明确说的 > 用户暗示的 > 推测的
+- 确定目标数量 **M**(如"找10条",则 M = 10)
+
+---
+
+## 第二步:串行关键词搜索
+
+**数量控制**:只搜索 **N = M × 2** 条,搜到后立即停止,不超出此限制。
+
+**数据读取规则**:
+- 搜索结果从 `metadata.search_results` 获取,**不要解析工具的 output 文本**
+
+**分页策略**:第一次使用默认 cursor(`"0"` 或 `""`),需要更多时使用返回的 cursor 继续获取。
+
+---
+
+## 第三步:数据真实性规范(严格遵守)
+
+**禁止编造数据**,所有字段必须来自工具返回的 metadata。
+
+### 字段完整性要求
+- `url`:文章链接,必须**逐字符完整复制**,不能截断或修改。
+- `title`: 文章标题,必须来自**同一条记录**,不能混用,去掉标题中的 html 符号(如 `<p>`、`</p>` 等)。
+- `title`: 标题中若出现英文双引号(`"`),需要把标题中的双引号换成中文双引号(`“”`)。
+- `statistics.time`: 文章发布时间戳(秒),必须来自**同一条记录**,不能混用。
+### 正确做法
+```python
+item = metadata.search_results[0]
+url = item["url"]
+title = item["title"].replace('"', '“')  # 逐字符完整复制,并将英文双引号替换为中文双引号
+```
+
+### 禁止行为
+❌ 编造 url  
+❌ 截断 url  
+❌ 从 output 文本中解析数据  
+❌ 混用不同记录的字段  
+
+**违反后果**:编造数据会导致 404 错误,严重影响用户体验。
+
+---
+
+## 第四步:结果评估与补充
+
+经 `article_filter_strategy` 筛选后,统计符合要求的内容数量 **C**:
+
+- **C >= M**:完成,进入输出阶段
+- **C < M × 0.8**:内容不足,选下一个关键词,回到第二步
+- **M × 0.8 <= C < M**:接近目标,可选择继续补充或直接输出
+
+---
+
+## 错误处理
+
+| 错误类型 | 处理策略 |
+|---|---|
+| HTTP 502/503/504 | 服务暂时不可用,最多重试 1 次,失败则告知用户 |
+| HTTP 400/404 | 检查参数格式,调整后重试 |
+| Timeout | 重试 1 次,仍超时则告知用户 |
+| 网络错误 | 重试 1-2 次,持续失败则告知用户 |
+
+不要切换到其他平台或工具。

+ 85 - 0
tests/skills/output_schema.md

@@ -0,0 +1,85 @@
+---
+name: output_schema
+description: 输出结果指南
+---
+
+## 输出结果指南
+
+### 输出目录
+输出 JSON 写入到output_dir目录下当次执行的 trace_id 目录内的 `output.json` 文件。
+**获取路径方式**:先调用 `get_current_context` 获取 `trace_id` 和 `output_dir`,再使用 `write_file` 写入 `{output_dir}/{trace_id}/output.json`。
+
+### **输出 JSON Schema**
+
+> ⚠️ 所有字段名必须与下面完全一致,禁止自创字段名(如 `results`、`metrics`、`like_count`、`age_distribution`、`platform` 等)
+
+```json
+{
+  "trace_id": "<由系统生成的真实 trace_id;如果你不知道就填空字符串,程序会覆盖修正>",
+  "query": "<本次任务的 query>",
+  "demand_id": "<来自 user 消息的搜索词 id>",
+  "summary": {
+    "candidate_count": 0,
+    "portrait_content_like_count": 0,
+    "portrait_account_fans_count": 0,
+    "portrait_none_count": 0,
+    "filtered_in_count": 0
+  },
+  "good_account_expansion": {
+    "enabled": false,
+    "accounts": [
+      {
+        "author_nickname": "<作者名>",
+        "author_sec_uid": "<完整 sec_uid>",
+        "age_50_plus_ratio": null,
+        "age_50_plus_tgi": null
+      }
+    ]
+  },
+  "contents": [
+    {
+      "title": "<来自 metadata 的标题/desc>",
+      "aweme_id": "内容id",
+      "rank": 1,
+      "video_url": "https://www.douyin.com/video/<aweme_id>",
+      "author_nickname": "作者名",
+      "author_sec_uid": "作者id",
+      "author_url": "https://www.douyin.com/user/<author_sec_uid>",
+      "statistics": {
+        "digg_count": 0,
+        "comment_count": 0,
+        "share_count": 0
+      },
+      "portrait_data": {
+        "source": "content_like | account_fans | none",
+        "age_50_plus_ratio": null,
+        "age_50_plus_tgi": null,
+        "url": "画像链接"
+      },
+      "reason": "<入选理由>"
+    }
+  ]
+}
+```
+
+### 易错字段说明
+
+| 字段 | 正确写法 | 错误写法(禁止) |
+|---|---|---|
+| 点赞数 | `statistics.digg_count` | `statistics.like_count` / `metrics.likes` |
+| 50岁以上占比 | `portrait_data.age_50_plus_ratio` | `portrait_data.age_distribution["50+"]` |
+| 50岁以上偏好度 | `portrait_data.age_50_plus_tgi` | 任何其他写法 |
+| 画像来源 | `portrait_data.source` 值为 `content_like` / `account_fans` / `none` | `"content"` / `"account"` 等缩写 |
+| 优质账号扩展 | `good_account_expansion` 为**对象**,含 `enabled` + `accounts` | 直接输出为**数组** |
+| 摘要 | `summary` 为**对象**,含 `candidate_count` 等字段 | `summary` 为字符串 |
+
+### portrait_data 字段规则
+
+- `source="content_like"` → `url = https://douhot.douyin.com/video/detail?active_tab=video_fans&video_id={aweme_id}`
+- `source="account_fans"` → `url = https://douhot.douyin.com/creator/detail?active_tab=creator_fans_portrait&creator_id={author_sec_uid}`
+- `source="none"` → `url=null`,`age_50_plus_ratio=null`,`age_50_plus_tgi=null`
+
+## JSON 编写规范
+- 字符串值中若有双引号 `"`,必须写成 `\"`(反斜杠 + 双引号)
+- 若有反斜杠 `\`,必须写成 `\\`
+- 若标题含引号,建议使用中文引号「」避免转义,或严格转义为 \"

+ 12 - 0
tests/tools/__init__.py

@@ -0,0 +1,12 @@
"""Tool package initialization."""

from .weixin_tools import weixin_search, fetch_weixin_account, fetch_account_article_list, fetch_article_detail

__all__ = [
    "weixin_search",
    "fetch_weixin_account",
    "fetch_account_article_list",
    "fetch_article_detail",
]

+ 467 - 0
tests/tools/aigc_platform_api.py

@@ -0,0 +1,467 @@
+"""
+AIGC接口调用
+调用AIGC接口创建爬取计划,绑定生成计划
+"""
+import json
+import logging
+import os
+from datetime import datetime
+from pathlib import Path
+from typing import List, Dict, Union, Tuple, Any
+
+import requests
+
+from agent import ToolResult, tool
+from db import update_content_plan_ids
+
+logger = logging.getLogger(__name__)
+
+AIGC_BASE_URL = "https://aigc-api.aiddit.com"
+CRAWLER_PLAN_CREATE_URL = f"{AIGC_BASE_URL}/aigc/crawler/plan/save"
+GET_PRODUCE_PLAN_DETAIL_BY_ID = f"{AIGC_BASE_URL}/aigc/produce/plan/detail"
+PRODUCE_PLAN_SAVE = f"{AIGC_BASE_URL}/aigc/produce/plan/save"
+DEFAULT_TOKEN = "8bf14f27fc3a486788f3383452422d72"
+DEFAULT_TIMEOUT = 60.0
+
+
+def _load_output_json(trace_id: str, output_dir: str) -> Dict[str, Any]:
+    """Load {output_dir}/{trace_id}/output.json."""
+    path = Path(output_dir) / trace_id / "output.json"
+    if not path.exists():
+        raise FileNotFoundError(f"output.json not found: {path}")
+    with path.open("r", encoding="utf-8") as f:
+        return json.load(f)
+
+
+def _extract_content_ids(data: Dict[str, Any]) -> List[str]:
+    """Extract aweme_id list from output json."""
+    contents = data.get("contents") or []
+    if not isinstance(contents, list):
+        return []
+    content_ids: List[str] = []
+    for item in contents:
+        if not isinstance(item, dict):
+            continue
+        aweme_id = item.get("aweme_id")
+        if aweme_id is None:
+            continue
+        aweme_id_str = str(aweme_id).strip()
+        if aweme_id_str:
+            content_ids.append(aweme_id_str)
+    return content_ids
+
+
+def _get_produce_plan_ids_from_env() -> List[str]:
+    """Read AIGC_DEMAND_DOUYIN_CONTENT_PRODUCE_PLAN_ID from env."""
+    raw = os.getenv("AIGC_DEMAND_DOUYIN_CONTENT_PRODUCE_PLAN_ID", "").strip()
+    if not raw:
+        return []
+    # 接口需要 List[str],因此把 env 字段(字符串)包装成 list。
+    return [raw]
+
+
@tool(description="根据抖音账号ID创建爬取计划")
async def create_crawler_plan_by_douyin_account_id(
        account_id: str,
        sort_type: str = "最新",
        produce_plan_ids: Union[List[str], None] = None
) -> ToolResult:
    """
    Create a crawler plan from a DouYin account ID.

    Args:
        account_id: DouYin account sec_uid (must start with "MS4wLjABAAAA")
        sort_type: video ordering when crawling ("最新" / "最热"); default "最新"
        produce_plan_ids: produce-plan IDs to bind to the crawler plan;
            defaults to none (mutable default replaced by None to avoid the
            shared-list pitfall)

    Returns:
        ToolResult containing:
            - output: human-readable summary of the created crawler plan
            - metadata.result: structured creation result
                - crawler_info:
                    - crawler_plan_id: ID of the created crawler plan
                    - crawler_plan_name: name of the created crawler plan
                    - sort_type: ordering used
                - produce_plan_infos: one entry per bound produce plan
                    - produce_plan_id: produce plan ID
                    - produce_plan_name: produce plan name
                    - is_success: True if binding succeeded, False otherwise
                    - msg: error message on failure, "成功" on success
    Note:
        - Prefer metadata.result over parsing the output text.
    """

    # Validate account_id before hitting the remote API.
    if not account_id or not isinstance(account_id, str):
        logger.error(f"create_crawler_plan_by_douyin_account_id invalid account_id: {account_id}")
        return ToolResult(
            title="根据抖音账号ID创建爬取计划失败",
            output="",
            error="account_id 参数无效:必须是非空字符串",
        )

    if not account_id.startswith("MS4wLjABAAAA"):
        logger.error(f"create_crawler_plan_by_douyin_account_id invalid sec_uid format account_id:{account_id}")
        return ToolResult(
            title="根据抖音账号ID创建爬取计划失败",
            output="",
            error=f"account_id 格式错误:必须以 MS4wLjABAAAA 开头,当前值: {account_id[:min(20, len(account_id))]}...",
        )

    if produce_plan_ids is None:
        produce_plan_ids = []

    # %H/%M/%S for hour/minute/second ("%h"/"%s" are non-portable glibc
    # extensions meaning abbreviated month name / epoch seconds).
    dt = datetime.now().strftime("%Y%m%d%H%M%S")
    crawler_plan_name = f"【内容寻找Agent自动创建】{dt}_抖音账号ID爬取计划_{account_id[:min(30, len(account_id))]}"
    params = {
        "accountFilters": [],
        "channel": 2,
        "contentFilters": [],
        "contentModal": 4,
        "crawlerComment": 0,
        "crawlerMode": 4,
        "filterAccountMatchMode": 2,
        "filterContentMatchMode": 2,
        "frequencyType": 1,
        "inputModeValues": [
            account_id
        ],
        "modelValueConfig": {
            "sortType": sort_type
        },
        "name": crawler_plan_name,
        "planType": 2,
        "searchModeValues": [],
        "selectModeValues": [],
        "srtExtractFlag": 1,
        "videoKeyFrameType": 1,
        "voiceExtractFlag": 1
    }

    try:
        summary_lines = [f"抖音账号【{account_id}】创建爬取计划"]

        response_json = post(CRAWLER_PLAN_CREATE_URL, params)
        if response_json.get("code") != 0:
            return ToolResult(
                title="根据抖音账号ID创建爬取计划失败",
                output=response_json.get("msg", "接口异常"),
                error="create crawler plan interface error",
            )

        crawler_plan_id = response_json.get("data", {}).get("id", "")
        summary_lines.append(f"爬取计划名称: {crawler_plan_name}")
        summary_lines.append(f"    抖音账号ID: {account_id}")
        summary_lines.append(f"    爬取计划ID: {crawler_plan_id}")
        summary_lines.append(f"    爬取计划排序方式: {sort_type}")
        produce_plan_infos: List[Dict[str, str]] = []
        if produce_plan_ids:
            input_source_info = {
                "contentType": 1,
                "inputSourceType": 2,
                "inputSourceValue": crawler_plan_id,
                "inputSourceLabel": f"原始帖子-视频-抖音-内容添加计划-{crawler_plan_name}",
                "inputSourceModal": 4,
                "inputSourceChannel": 2
            }
            produce_plan_infos, msg = crawler_plan_bind_produce_plan(input_source_info, produce_plan_ids)
            if produce_plan_infos:
                # Header once, then one entry per bound plan.
                summary_lines.append("    绑定的生成计划列表: ")
                for produce_plan_info in produce_plan_infos:
                    summary_lines.append(f"        生成计划名称: {produce_plan_info.get('produce_plan_name', '')}")
                    summary_lines.append(f"            生成计划ID: {produce_plan_info.get('produce_plan_id', '')}")
                    summary_lines.append(f"            绑定结果: {'绑定成功' if not produce_plan_info.get('msg') else '绑定失败'}")
                    summary_lines.append(f"            信息: {produce_plan_info.get('msg', '成功')}")

        return ToolResult(
            title="根据抖音账号ID创建爬取计划",
            output="\n".join(summary_lines),
            metadata={
                "result": {
                    "crawler_info": {
                        "crawler_plan_id": crawler_plan_id,
                        "crawler_plan_name": crawler_plan_name,
                        "sort_type": sort_type,
                    },
                    "produce_plan_infos": [
                        {
                            "produce_plan_id": produce_plan_info.get("produce_plan_id", ""),
                            "produce_plan_name": produce_plan_info.get("produce_plan_name", ""),
                            # Boolean, as promised by the docstring ("msg" is
                            # only present when binding failed).
                            "is_success": not produce_plan_info.get("msg"),
                            "msg": produce_plan_info.get("msg", "成功"),
                        }
                        for produce_plan_info in produce_plan_infos
                    ]
                }
            },
            long_term_memory="Create crawler plan by DouYin Account ID",
        )
    except Exception as e:
        logger.error(f"create douyin account crawler plan error: {str(e)}, account_id: {account_id} ")
        return ToolResult(
            title="根据抖音账号ID创建爬取计划失败",
            output="",
            error=f"创建爬取计划错误:{str(e)}",
        )
+
+
@tool(description="根据抖音视频ID创建爬取计划")
async def create_crawler_plan_by_douyin_content_id(
        trace_id: str,
) -> ToolResult:
    """
    Create a crawler plan from the DouYin video IDs of a finished run.

    Args:
        trace_id: content-finding task trace_id (used to read
            {output_dir}/{trace_id}/output.json)

    Returns:
        ToolResult containing:
            - output: human-readable summary of the created crawler plan
            - metadata.result: structured creation result
                - crawler_info:
                    - crawler_plan_id: ID of the created crawler plan
                    - crawler_plan_name: name of the created crawler plan
                    - content_ids: DouYin video ID list
                - produce_plan_infos: one entry per bound produce plan
                    - produce_plan_id: produce plan ID
                    - produce_plan_name: produce plan name
                    - is_success: True if binding succeeded, False otherwise
                    - msg: error message on failure, "成功" on success
            - metadata.db.updated_rows: rows updated when persisting plan IDs
    Note:
        - Prefer metadata.result over parsing the output text.
    """
    if not trace_id or not isinstance(trace_id, str):
        logger.error(f"create_crawler_plan_by_douyin_content_id invalid trace_id: {trace_id}")
        return ToolResult(
            title="根据抖音内容创建爬取计划失败",
            output="",
            error="trace_id 参数无效: trace_id 必须是非空字符串",
        )

    output_dir = os.getenv("OUTPUT_DIR", ".cache/output")
    try:
        data = _load_output_json(trace_id=trace_id, output_dir=output_dir)
        content_ids = _extract_content_ids(data)
    except Exception as e:
        msg = f"加载/解析 output.json 失败: {e}"
        logger.error(msg, exc_info=True)
        return ToolResult(
            title="根据抖音内容创建爬取计划失败",
            output="",
            error=msg,
        )

    if not content_ids:
        return ToolResult(
            title="根据抖音内容创建爬取计划失败",
            output="",
            error="未在 output.json.contents 中找到有效 aweme_id",
        )
    if len(content_ids) > 100:
        logger.error(
            "create_crawler_plan_by_douyin_content_id invalid content_ids length. "
            f"content_ids.length: {len(content_ids)}"
        )
        return ToolResult(
            title="根据抖音内容创建爬取计划失败",
            output="",
            error=f"content_ids 长度异常: 期望1~100, 实际{len(content_ids)}",
        )

    produce_plan_ids = _get_produce_plan_ids_from_env()
    # %H/%M/%S for hour/minute/second ("%h"/"%s" are non-portable glibc
    # extensions meaning abbreviated month name / epoch seconds).
    dt = datetime.now().strftime("%Y%m%d%H%M%S")
    crawler_plan_name = f"【内容寻找Agent自动创建】抖音视频直接抓取-{dt}-抖音"
    params = {
        "channel": 2,
        "contentModal": 4,
        "crawlerComment": 0,
        "crawlerMode": 5,
        "filterAccountMatchMode": 2,
        "filterContentMatchMode": 2,
        "frequencyType": 2,
        "inputModeValues": content_ids,
        "name": crawler_plan_name,
        "planType": 2,
        "searchModeValues": [],
        "srtExtractFlag": 1,
        "videoKeyFrameType": 1,
        "voiceExtractFlag": 1
    }

    try:
        summary_lines = ["抖音视频爬取计划"]

        response_json = post(CRAWLER_PLAN_CREATE_URL, params)
        if response_json.get("code") != 0:
            return ToolResult(
                title="根据抖音内容ID创建爬取计划失败",
                output=response_json.get("msg", "接口异常"),
                error="create crawler plan interface error",
            )

        crawler_plan_id = response_json.get("data", {}).get("id", "")
        summary_lines.append(f"爬取计划名称: {crawler_plan_name}")
        summary_lines.append(f"    抖音视频IDs: {','.join(content_ids)}")
        summary_lines.append(f"    爬取计划ID: {crawler_plan_id}")
        produce_plan_infos: List[Dict[str, str]] = []
        db_updated_rows = 0
        # Produce-plan ID from env (a string); used for persistence regardless
        # of whether the binding call runs.
        env_produce_plan_id = (produce_plan_ids[0] if produce_plan_ids else "").strip()

        if produce_plan_ids:
            input_source_info = {
                "contentType": 1,
                "inputSourceType": 2,
                "inputSourceValue": crawler_plan_id,
                "inputSourceLabel": f"原始帖子-视频-抖音-内容添加计划-{crawler_plan_name}",
                "inputSourceModal": 4,
                "inputSourceChannel": 2
            }
            produce_plan_infos, msg = crawler_plan_bind_produce_plan(input_source_info, produce_plan_ids)
            if produce_plan_infos:
                # Header once, then one entry per bound plan.
                summary_lines.append("    绑定的生成计划列表: ")
                for produce_plan_info in produce_plan_infos:
                    summary_lines.append(f"        生成计划名称: {produce_plan_info.get('produce_plan_name', '')}")
                    summary_lines.append(f"            生成计划ID: {produce_plan_info.get('produce_plan_id', '')}")
                    summary_lines.append(f"            绑定结果: {'绑定成功' if not produce_plan_info.get('msg') else '绑定失败'}")
                    summary_lines.append(f"            信息: {produce_plan_info.get('msg', '成功')}")

        # Persist when either the crawler plan ID or the produce plan ID
        # exists (independent of whether binding was attempted).
        if (crawler_plan_id or "").strip() or env_produce_plan_id:
            try:
                db_updated_rows = update_content_plan_ids(
                    trace_id=trace_id,
                    aweme_ids=content_ids,
                    crawler_plan_id=crawler_plan_id or "",
                    produce_plan_id=env_produce_plan_id,
                )
            except Exception as e:
                # Best-effort persistence: log and continue; the plan itself
                # was created successfully.
                logger.error(f"update content plan ids failed: {e}", exc_info=True)

        return ToolResult(
            title="根据抖音内容ID创建爬取计划",
            output="\n".join(summary_lines),
            metadata={
                "result": {
                    "crawler_info": {
                        "crawler_plan_id": crawler_plan_id,
                        "crawler_plan_name": crawler_plan_name,
                    },
                    "produce_plan_infos": [
                        {
                            "produce_plan_id": produce_plan_info.get("produce_plan_id", ""),
                            "produce_plan_name": produce_plan_info.get("produce_plan_name", ""),
                            # Boolean, as promised by the docstring ("msg" is
                            # only present when binding failed).
                            "is_success": not produce_plan_info.get("msg"),
                            "msg": produce_plan_info.get("msg", "成功"),
                        }
                        for produce_plan_info in produce_plan_infos
                    ]
                },
                "db": {"updated_rows": db_updated_rows},
            },
            long_term_memory="Create crawler plan by DouYin Content IDs",
        )
    except Exception as e:
        logger.error(f"create douyin content crawler plan error. content_ids: {content_ids}, error: {str(e)}")
        return ToolResult(
            title="根据抖音内容ID创建爬取计划失败",
            output="",
            error=f"创建爬取计划错误:{str(e)}",
        )
+
+
def crawler_plan_bind_produce_plan(
        input_source_info: Dict[str, Any],
        produce_plan_ids: List[str],
) -> Tuple[Union[List[Dict[str, str]], None], str]:
    """Bind a crawler plan (as an input source) to each of the given produce plans.

    For every produce plan id: fetch the plan detail, append
    ``input_source_info`` to a matching input-source group, then save the
    mutated detail back through the AIGC platform save endpoint.

    Args:
        input_source_info: Input-source descriptor of the crawler plan
            (contentType / inputSourceType / inputSourceValue / label /
            inputSourceModal / inputSourceChannel).
        produce_plan_ids: Ids of the produce plans to bind the crawler plan to.

    Returns:
        (results, msg): ``results`` is a per-plan list of dicts each holding
        ``produce_plan_id``, optionally ``produce_plan_name``, and a ``msg``
        key only when that plan's binding failed; ``results`` is None with a
        non-empty ``msg`` when the whole call failed up front or raised.
    """
    if not input_source_info or not produce_plan_ids:
        return None, f"input_source_info or produce_plan_ids is invalid"
    # Keys compared to decide whether an existing group already holds input
    # sources of the same modality / channel / content type as the new one.
    input_source_check_key = ["inputSourceModal", "inputSourceChannel", "contentType"]
    try:
        if not isinstance(produce_plan_ids, list):
            return None, f"produce_plan_ids is not list"
        result: List[Dict[str, str]] = []
        for produce_plan_id in produce_plan_ids:
            produce_plan_info = {
                "produce_plan_id": produce_plan_id,
            }
            result.append(produce_plan_info)
            # Fetch the produce-plan detail; a non-empty msg means the fetch failed.
            produce_plan_detail_info, msg = find_produce_plan_info_by_id(produce_plan_id)
            if msg:
                produce_plan_info["msg"] = msg
                continue

            produce_plan_info["produce_plan_name"] = produce_plan_detail_info.get("name", "")

            input_source_groups = produce_plan_detail_info.get("inputSourceGroups", [])
            if not input_source_groups:
                produce_plan_info["msg"] = "生成计划没有输入源组"
                continue
            # Locate the index of the input-source group this crawler plan
            # should be added to. NOTE(review): falls back to group 0 when no
            # group's first source matches — presumably intentional; confirm.
            input_source_index = 0
            for i in range(len(input_source_groups)):
                input_source_group = input_source_groups[i]
                if not input_source_group.get("inputSources", []):
                    continue
                first_input_source = input_source_group.get("inputSources")[0]
                if all(input_source_info.get(k, 0) == first_input_source.get(k, -1) for k in input_source_check_key):
                    input_source_index = i
                    break

            # Append the new input source to the chosen group (mutates the
            # fetched detail in place), then save the whole plan back.
            input_source_group = input_source_groups[input_source_index]
            input_source_group.get("inputSources", []).append(input_source_info)

            response_json = post(PRODUCE_PLAN_SAVE, produce_plan_detail_info)
            if response_json.get("code") != 0 or not response_json.get("data", {}):
                produce_plan_info["msg"] = response_json.get("msg", "爬取计划绑定生成计划异常")

        return result, ""
    except Exception as e:
        logger.error(f"crawler_plan_bind_produce_plan error. input_source_info: {json.dumps(input_source_info)}, produce_plan_ids: {produce_plan_ids}, error: {str(e)},")
        return None, str(e)
+
+
def find_produce_plan_info_by_id(
        produce_plan_id: str,
) -> Tuple[Union[Dict[str, str], None], str]:
    """Fetch the detail of a produce plan by its id.

    Returns:
        (detail, ""): the plan detail dict on success.
        (None, msg): a non-empty error message on any failure.
    """
    try:
        # Reject missing or non-string ids before touching the API.
        if not produce_plan_id or not isinstance(produce_plan_id, str):
            return None, f"非法的produce_plan_id: {produce_plan_id}"

        response_json = post(GET_PRODUCE_PLAN_DETAIL_BY_ID, {"id": produce_plan_id})

        detail = response_json.get("data", {})
        if response_json.get("code") != 0 or not detail:
            return None, response_json.get("msg", "获取生成计划详情异常")
        return detail, ""
    except Exception as e:
        logger.error(f"find_produce_plan_info_by_id error. produce_plan_id: {produce_plan_id}, error: {str(e)},")
        return None, str(e)
+
+
def post(url: str, params: Any) -> Dict[str, Any]:
    """POST ``params`` to the AIGC platform, wrapped in the standard envelope.

    The request body is ``{"baseInfo": {"token": ...}, "params": params}``.
    Returns the decoded JSON response; any error is logged and an empty dict
    is returned (best-effort, never raises).
    """
    body = {
        "baseInfo": {
            "token": DEFAULT_TOKEN,
        },
        "params": params,
    }
    try:
        resp = requests.post(
            url=url,
            json=body,
            headers={"Content-Type": "application/json"},
            timeout=DEFAULT_TIMEOUT,
        )
        resp.raise_for_status()
        return resp.json()
    except Exception as e:
        logger.error(f"invoke aigc platform error. url: {url}, request: {json.dumps(body)}, error: {str(e)}")
    return {}

+ 199 - 0
tests/tools/douyin_user_videos.py

@@ -0,0 +1,199 @@
+"""
+抖音账号历史作品工具(示例)
+
+调用内部爬虫服务获取指定账号的历史作品列表。
+"""
+import asyncio
+import logging
+import time
+from typing import Optional
+
+import requests
+
+from agent.tools import tool, ToolResult
+
+logger = logging.getLogger(__name__)
+
+
+# API 基础配置
+DOUYIN_BLOGGER_API = "http://crawapi.piaoquantv.com/crawler/dou_yin/blogger"
+DEFAULT_TIMEOUT = 60.0
+
+
@tool(description="根据账号ID获取抖音历史作品,支持排序与游标")
async def douyin_user_videos(
    account_id: str,
    sort_type: str = "最新",
    cursor: str = "",
    timeout: Optional[float] = None,
) -> ToolResult:
    """
    Fetch the historical videos of a Douyin account.

    Supports sorting and cursor-based pagination via the internal crawler
    service (DOUYIN_BLOGGER_API).

    Args:
        account_id: Douyin account id (the author's ``sec_uid``).
        sort_type: Sort order, "最新" (latest) or "最热" (hottest); defaults to "最新".
        cursor: Pagination cursor returned by a previous call; defaults to "".
        timeout: Request timeout in seconds; defaults to 60.

    Returns:
        ToolResult with:
            - output: text summary of the videos (first 5 shown)
            - metadata.user_videos: structured video list (same shape as
              search_results so the same downstream logic applies):
                - aweme_id: video id
                - desc: description (truncated to 100 chars)
                - author: {nickname, sec_uid}
                - statistics: {digg_count, comment_count, share_count}
            - metadata.raw_data: raw API response

    Note:
        - ``account_id`` must be the author's ``sec_uid`` (~80 chars).
        - Pass the returned next_cursor back as ``cursor`` for the next page.
        - Prefer metadata.user_videos over parsing the ``output`` text.
    """
    start_time = time.time()

    try:
        payload = {
            "account_id": account_id,
            "sort_type": sort_type,
            "cursor": cursor,
        }

        request_timeout = timeout if timeout is not None else DEFAULT_TIMEOUT

        response = requests.post(
            DOUYIN_BLOGGER_API,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=request_timeout
        )
        response.raise_for_status()
        data = response.json()

        # Build the human-readable summary.
        summary_lines = [f"账号 {account_id} 的作品列表"]

        # The service nests the list as data.data; guard every level against
        # unexpected shapes so a malformed response degrades to an empty list.
        data_block = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
        items = data_block.get("data", []) if isinstance(data_block.get("data"), list) else []
        has_more = data_block.get("has_more", False)
        cursor_value = data_block.get("next_cursor", "")

        summary_lines.append(f"找到 {len(items)} 个作品" + (f",还有更多(cursor={cursor_value})" if has_more else ""))
        summary_lines.append("")

        # Show at most the first 5 items in the text summary.
        for i, item in enumerate(items[:5], 1):
            aweme_id = item.get("aweme_id", "unknown")
            desc = (item.get("desc") or item.get("item_title") or "无标题")[:50]

            author = item.get("author", {})
            author_name = author.get("nickname", "未知作者")
            author_id = author.get("sec_uid", "")

            stats = item.get("statistics", {})
            digg_count = stats.get("digg_count", 0)
            comment_count = stats.get("comment_count", 0)
            share_count = stats.get("share_count", 0)

            summary_lines.append(f"{i}. {desc}")
            summary_lines.append(f"   ID: {aweme_id}")
            summary_lines.append(f"   链接: https://www.douyin.com/video/{aweme_id}")
            summary_lines.append(f"   作者: {author_name}")
            summary_lines.append(f"   sec_uid: {author_id}")
            summary_lines.append(f"   数据: 点赞 {digg_count:,} | 评论 {comment_count:,} | 分享 {share_count:,}")
            summary_lines.append("")

        if len(items) > 5:
            summary_lines.append(f"... 还有 {len(items) - 5} 条结果")

        duration_ms = int((time.time() - start_time) * 1000)
        logger.info(
            "douyin_user_videos completed",
            extra={
                "account_id": account_id,
                "results_count": len(items),
                "has_more": has_more,
                "cursor": cursor_value,
                "duration_ms": duration_ms
            }
        )

        return ToolResult(
            title=f"账号作品: {account_id}",
            output="\n".join(summary_lines),
            long_term_memory=f"Fetched {len(items)} videos for account '{account_id}'",
            metadata={
                "raw_data": data,
                "user_videos": [  # structured data, kept consistent with search_results
                    {
                        "aweme_id": item.get("aweme_id"),
                        "desc": (item.get("desc") or item.get("item_title") or "无标题")[:100],
                        "author": {
                            "nickname": item.get("author", {}).get("nickname", "未知作者"),
                            "sec_uid": item.get("author", {}).get("sec_uid", ""),
                        },
                        "statistics": {
                            "digg_count": item.get("statistics", {}).get("digg_count", 0),
                            "comment_count": item.get("statistics", {}).get("comment_count", 0),
                            "share_count": item.get("statistics", {}).get("share_count", 0),
                        }
                    }
                    for item in items
                ]
            }
        )
    except requests.exceptions.HTTPError as e:
        logger.error(
            "douyin_user_videos HTTP error",
            extra={
                "account_id": account_id,
                "status_code": e.response.status_code,
                "error": str(e)
            }
        )
        return ToolResult(
            title="账号作品获取失败",
            output="",
            error=f"HTTP {e.response.status_code}: {e.response.text}",
        )
    except requests.exceptions.Timeout:
        # request_timeout is always bound here: Timeout can only be raised by
        # requests.post, which runs after the assignment above.
        logger.error("douyin_user_videos timeout", extra={"account_id": account_id, "timeout": request_timeout})
        return ToolResult(
            title="账号作品获取失败",
            output="",
            error=f"请求超时({request_timeout}秒)",
        )
    except requests.exceptions.RequestException as e:
        logger.error("douyin_user_videos network error", extra={"account_id": account_id, "error": str(e)})
        return ToolResult(
            title="账号作品获取失败",
            output="",
            error=f"网络错误: {str(e)}",
        )
    except Exception as e:
        logger.error("douyin_user_videos unexpected error", extra={"account_id": account_id, "error": str(e)}, exc_info=True)
        return ToolResult(
            title="账号作品获取失败",
            output="",
            error=f"未知错误: {str(e)}",
        )
+
async def main():
    """Ad-hoc manual check: fetch one account's latest videos and print the summary."""
    params = dict(
        account_id="MS4wLjABAAAAPRCMGPAFM1VGcJrxRuvTXgJp0Sk95EW1DynNmbKSPg8",
        sort_type="最新",
        cursor="",
    )
    result = await douyin_user_videos(**params)
    print(result.output)


if __name__ == "__main__":
    asyncio.run(main())

+ 404 - 0
tests/tools/hotspot_profile.py

@@ -0,0 +1,404 @@
+"""
+热点宝画像数据工具(示例)
+
+调用内部爬虫服务获取账号/内容的粉丝画像。
+"""
+import asyncio
+import logging
+import time
+from typing import Optional, Dict, Any, List, Tuple
+
+import requests
+
+from agent.tools import tool, ToolResult
+
+logger = logging.getLogger(__name__)
+
+
+ACCOUNT_FANS_PORTRAIT_API = "http://crawapi.piaoquantv.com/crawler/dou_yin/re_dian_bao/account_fans_portrait"
+CONTENT_FANS_PORTRAIT_API = "http://crawapi.piaoquantv.com/crawler/dou_yin/re_dian_bao/video_like_portrait"
+DEFAULT_TIMEOUT = 60.0
+
+
@tool(description="获取抖音账号粉丝画像(热点宝),支持选择画像维度")
async def get_account_fans_portrait(
    account_id: str,
    need_province: bool = False,
    need_city: bool = False,
    need_city_level: bool = False,
    need_gender: bool = False,
    need_age: bool = True,
    need_phone_brand: bool = False,
    need_phone_price: bool = False,
    timeout: Optional[float] = None,
) -> ToolResult:
    """
    Fetch the fan portrait of a Douyin account (Hotspot/热点宝 data).

    Portrait dimensions include age, gender, region, city tier, phone brand
    and phone price; each is requested only when the corresponding flag is on.

    Args:
        account_id: Douyin account id (the author's ``sec_uid``).
        need_province: Whether to fetch the province distribution; default False.
        need_city: Whether to fetch the city distribution; default False.
        need_city_level: Whether to fetch the city-tier distribution; default False.
        need_gender: Whether to fetch the gender distribution; default False.
        need_age: Whether to fetch the age distribution; default True.
        need_phone_brand: Whether to fetch the phone-brand distribution; default False.
        need_phone_price: Whether to fetch the phone-price distribution; default False.
        timeout: Request timeout in seconds; defaults to 60.

    Returns:
        ToolResult with:
            - output: text summary of the portrait
            - metadata.has_portrait: True when at least one dimension has data
            - metadata.portrait_data: dict keyed by dimension name (e.g. "年龄")
              mapping to its distribution, where each entry holds
              ``percentage`` (e.g. "48.35%") and ``preference`` (TGI, e.g. "210.05")
            - metadata.raw_data: raw API response

    Note:
        - Only the age distribution is returned by default.
        - Province/city data is summarised as TOP5 only.
        - TGI > 100 means above-average preference, 100 average, < 100 below.
        - Check metadata.has_portrait instead of parsing the output text.
    """
    start_time = time.time()

    # Validate account_id before issuing the request.
    if not account_id or not isinstance(account_id, str):
        logger.error("get_account_fans_portrait invalid account_id", extra={"account_id": account_id})
        return ToolResult(
            title="账号粉丝画像获取失败",
            output="",
            error="account_id 参数无效:必须是非空字符串",
        )

    # All sec_uid values share this fixed prefix; anything else is a wrong id
    # (e.g. a numeric uid or fabricated data).
    if not account_id.startswith("MS4wLjABAAAA"):
        logger.error("get_account_fans_portrait invalid sec_uid format", extra={"account_id": account_id})
        return ToolResult(
            title="账号粉丝画像获取失败",
            output="",
            error=f"account_id 格式错误:必须以 MS4wLjABAAAA 开头,当前值: {account_id[:min(20, len(account_id))]}...",
        )

    # NOTE(review): length validation deliberately disabled; re-enable if
    # truncated/fabricated ids become a problem again.
    # if len(account_id) < 70 or len(account_id) > 90:
    #     logger.error("get_account_fans_portrait invalid sec_uid length", extra={"account_id": account_id, "length": len(account_id)})
    #     return ToolResult(
    #         title="账号粉丝画像获取失败",
    #         output="",
    #         error=f"account_id 长度异常:期望 70-90 字符,实际 {len(account_id)} 字符。这可能是编造或截断的数据。",
    #     )

    try:
        payload = {
            "account_id": account_id,
            "need_province": need_province,
            "need_city": need_city,
            "need_city_level": need_city_level,
            "need_gender": need_gender,
            "need_age": need_age,
            "need_phone_brand": need_phone_brand,
            "need_phone_price": need_phone_price,
        }

        request_timeout = timeout if timeout is not None else DEFAULT_TIMEOUT

        response = requests.post(
            ACCOUNT_FANS_PORTRAIT_API,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=request_timeout
        )
        response.raise_for_status()
        data = response.json()

        # The portrait is nested as data.data; guard each level's shape.
        data_block = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
        portrait = data_block.get("data", {}) if isinstance(data_block.get("data"), dict) else {}

        # Build the human-readable summary.
        summary_lines = [f"账号 {account_id} 的粉丝画像"]
        summary_lines.append(f"画像链接:https://douhot.douyin.com/creator/detail?active_tab=creator_fans_portrait&creator_id={account_id}")
        summary_lines.append("")
        for k, v in portrait.items():
            if not isinstance(v, dict):
                continue
            # Province/city dimensions can be long; only show the TOP5.
            if k in ("省份", "城市"):
                summary_lines.append(f"【{k} TOP5】分布")
                items = _top_k(v, 5)
            else:
                summary_lines.append(f"【{k}】分布")
                items = v.items()

            for name, metrics in items:
                ratio = metrics.get("percentage")
                tgi = metrics.get("preference")
                summary_lines.append(f"  {name}: {ratio} (偏好度: {tgi})")
            summary_lines.append("")

        duration_ms = int((time.time() - start_time) * 1000)
        # Valid only when at least one dimension holds a non-empty dict.
        has_valid_portrait = bool(portrait and any(
            isinstance(v, dict) and v for v in portrait.values()
        ))

        logger.info(
            "get_account_fans_portrait completed",
            extra={
                "account_id": account_id,
                "has_portrait": has_valid_portrait,
                "portrait_dimensions": list(portrait.keys()) if portrait else [],
                "duration_ms": duration_ms
            }
        )

        return ToolResult(
            title=f"账号粉丝画像: {account_id}",
            output="\n".join(summary_lines),
            long_term_memory=f"Fetched fans portrait for account '{account_id}'",
            metadata={
                "raw_data": data,
                "has_portrait": has_valid_portrait,
                "portrait_data": portrait
            }
        )
    except requests.exceptions.HTTPError as e:
        logger.error(
            "get_account_fans_portrait HTTP error",
            extra={
                "account_id": account_id,
                "status_code": e.response.status_code,
                "error": str(e)
            }
        )
        return ToolResult(
            title="账号粉丝画像获取失败",
            output="",
            error=f"HTTP {e.response.status_code}: {e.response.text}",
        )
    except requests.exceptions.Timeout:
        logger.error("get_account_fans_portrait timeout", extra={"account_id": account_id, "timeout": request_timeout})
        return ToolResult(
            title="账号粉丝画像获取失败",
            output="",
            error=f"请求超时({request_timeout}秒)",
        )
    except requests.exceptions.RequestException as e:
        logger.error("get_account_fans_portrait network error", extra={"account_id": account_id, "error": str(e)})
        return ToolResult(
            title="账号粉丝画像获取失败",
            output="",
            error=f"网络错误: {str(e)}",
        )
    except Exception as e:
        logger.error("get_account_fans_portrait unexpected error", extra={"account_id": account_id, "error": str(e)}, exc_info=True)
        return ToolResult(
            title="账号粉丝画像获取失败",
            output="",
            error=f"未知错误: {str(e)}",
        )
+
+
@tool(description="获取抖音内容点赞用户画像(热点宝),支持选择画像维度")
async def get_content_fans_portrait(
    content_id: str,
    need_province: bool = False,
    need_city: bool = False,
    need_city_level: bool = False,
    need_gender: bool = False,
    need_age: bool = True,
    need_phone_brand: bool = False,
    need_phone_price: bool = False,
    timeout: Optional[float] = None,
) -> ToolResult:
    """
    Fetch the portrait of users who liked a Douyin video (Hotspot/热点宝 data).

    Dimensions include age, gender, region, city tier, phone brand and phone
    price; each is requested only when the corresponding flag is on.

    Args:
        content_id: Douyin content id (the video's ``aweme_id``, digits only).
        need_province: Whether to fetch the province distribution; default False.
        need_city: Whether to fetch the city distribution; default False.
        need_city_level: Whether to fetch the city-tier distribution; default False.
        need_gender: Whether to fetch the gender distribution; default False.
        need_age: Whether to fetch the age distribution; default True.
        need_phone_brand: Whether to fetch the phone-brand distribution; default False.
        need_phone_price: Whether to fetch the phone-price distribution; default False.
        timeout: Request timeout in seconds; defaults to 60.

    Returns:
        ToolResult with:
            - output: text summary of the portrait
            - metadata.has_portrait: True when at least one dimension has data;
              when False, fall back to get_account_fans_portrait
            - metadata.portrait_data: dict keyed by dimension name (e.g. "年龄")
              mapping to its distribution, where each entry holds
              ``percentage`` (e.g. "48.35%") and ``preference`` (TGI, e.g. "210.05")
            - metadata.raw_data: raw API response

    Note:
        - Only the age distribution is returned by default.
        - Province/city data is summarised as TOP5 only.
        - TGI > 100 means above-average preference, 100 average, < 100 below.
        - Check metadata.has_portrait instead of parsing the output text.
    """
    start_time = time.time()

    # Validate content_id before issuing the request.
    if not content_id or not isinstance(content_id, str):
        logger.error("get_content_fans_portrait invalid content_id", extra={"content_id": content_id})
        return ToolResult(
            title="内容点赞用户画像获取失败",
            output="",
            error="content_id 参数无效:必须是非空字符串",
        )

    # An aweme_id is a pure-digit string of roughly 19 characters.
    if not content_id.isdigit():
        logger.error("get_content_fans_portrait invalid aweme_id format", extra={"content_id": content_id})
        return ToolResult(
            title="内容点赞用户画像获取失败",
            output="",
            error=f"content_id 格式错误:aweme_id 应该是纯数字,当前值: {content_id[:20]}...",
        )

    if len(content_id) < 15 or len(content_id) > 25:
        logger.error("get_content_fans_portrait invalid aweme_id length", extra={"content_id": content_id, "length": len(content_id)})
        return ToolResult(
            title="内容点赞用户画像获取失败",
            output="",
            error=f"content_id 长度异常:期望 15-25 位数字,实际 {len(content_id)} 位",
        )

    try:
        payload = {
            "content_id": content_id,
            "need_province": need_province,
            "need_city": need_city,
            "need_city_level": need_city_level,
            "need_gender": need_gender,
            "need_age": need_age,
            "need_phone_brand": need_phone_brand,
            "need_phone_price": need_phone_price,
        }

        request_timeout = timeout if timeout is not None else DEFAULT_TIMEOUT

        response = requests.post(
            CONTENT_FANS_PORTRAIT_API,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=request_timeout
        )
        response.raise_for_status()
        data = response.json()

        # The portrait is nested as data.data; guard each level's shape.
        data_block = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
        portrait = data_block.get("data", {}) if isinstance(data_block.get("data"), dict) else {}

        # Build the human-readable summary.
        summary_lines = [f"内容 {content_id} 的点赞用户画像"]
        summary_lines.append(f"画像链接:https://douhot.douyin.com/video/detail?active_tab=video_fans&video_id={content_id}")
        summary_lines.append("")

        for k, v in portrait.items():
            if not isinstance(v, dict):
                continue
            # Province/city dimensions can be long; only show the TOP5.
            if k in ("省份", "城市"):
                summary_lines.append(f"【{k} TOP5】分布")
                items = _top_k(v, 5)
            else:
                summary_lines.append(f"【{k}】分布")
                items = v.items()

            for name, metrics in items:
                ratio = metrics.get("percentage")
                tgi = metrics.get("preference")
                summary_lines.append(f"  {name}: {ratio} (偏好度: {tgi})")
            summary_lines.append("")

        duration_ms = int((time.time() - start_time) * 1000)
        # Valid only when at least one dimension holds a non-empty dict.
        has_valid_portrait = bool(portrait and any(
            isinstance(v, dict) and v for v in portrait.values()
        ))

        logger.info(
            "get_content_fans_portrait completed",
            extra={
                "content_id": content_id,
                "has_portrait": has_valid_portrait,
                "portrait_dimensions": list(portrait.keys()) if portrait else [],
                "duration_ms": duration_ms
            }
        )

        return ToolResult(
            title=f"内容点赞用户画像: {content_id}",
            output="\n".join(summary_lines),
            long_term_memory=f"Fetched fans portrait for content '{content_id}'",
            metadata={
                "raw_data": data,
                "has_portrait": has_valid_portrait,
                "portrait_data": portrait
            }
        )
    except requests.exceptions.HTTPError as e:
        logger.error(
            "get_content_fans_portrait HTTP error",
            extra={
                "content_id": content_id,
                "status_code": e.response.status_code,
                "error": str(e)
            }
        )
        return ToolResult(
            title="内容点赞用户画像获取失败",
            output="",
            error=f"HTTP {e.response.status_code}: {e.response.text}",
        )
    except requests.exceptions.Timeout:
        logger.error("get_content_fans_portrait timeout", extra={"content_id": content_id, "timeout": request_timeout})
        return ToolResult(
            title="内容点赞用户画像获取失败",
            output="",
            error=f"请求超时({request_timeout}秒)",
        )
    except requests.exceptions.RequestException as e:
        logger.error("get_content_fans_portrait network error", extra={"content_id": content_id, "error": str(e)})
        return ToolResult(
            title="内容点赞用户画像获取失败",
            output="",
            error=f"网络错误: {str(e)}",
        )
    except Exception as e:
        logger.error("get_content_fans_portrait unexpected error", extra={"content_id": content_id, "error": str(e)}, exc_info=True)
        return ToolResult(
            title="内容点赞用户画像获取失败",
            output="",
            error=f"未知错误: {str(e)}",
        )
+
+def _top_k(items: Dict[str, Any], k: int) -> List[Tuple[str, Any]]:
+    def percent_value(entry: Tuple[str, Any]) -> float:
+        metrics = entry[1] if isinstance(entry[1], dict) else {}
+        return metrics.get("percentage")
+
+    return sorted(items.items(), key=percent_value, reverse=True)[:k]

+ 100 - 0
tests/tools/store_results_mysql.py

@@ -0,0 +1,100 @@
+"""
+将推荐结果写入 MySQL(优质作者表 + 内容表)。
+
+约定:
+- 输入参数:trace_id(字符串)
+- 数据来源:{TRACE_DIR}/{trace_id}/output.json
+- 表结构:good_authors, contents(字段见下面 SQL 注释)
+"""
+import asyncio
+import json
+import logging
+import os
+from pathlib import Path
+from typing import Any, Dict
+
+from agent.tools import tool, ToolResult
+
+from db import get_connection, insert_contents, upsert_good_authors
+
+logger = logging.getLogger(__name__)
+
+
+def _load_output(trace_id: str) -> Dict[str, Any]:
+    """从 {output_dir}/{trace_id}/output.json 读取输出数据。"""
+    output_dir = Path(os.getenv("OUTPUT_DIR", ".cache/output"))
+    path = output_dir / trace_id / "output.json"
+
+    if not path.exists():
+        raise FileNotFoundError(f"output.json not found for output_dir={output_dir}: {path}")
+
+    with path.open("r", encoding="utf-8") as f:
+        return json.load(f)
+
+
@tool(description="将推荐结果写入 MySQL")
async def store_results_mysql(trace_id: str) -> ToolResult:
    """Persist the recommendation output for ``trace_id`` into MySQL.

    Reads {OUTPUT_DIR}/{trace_id}/output.json, then writes the good-author
    block and the content list via the ``db`` helpers. ``demand_content_id``
    comes from the output's ``demand_id`` field, which must be present in the
    output_schema (it originates from the search-term id in the user message).

    Returns:
        ToolResult whose metadata carries ``ok`` plus, on success, the number
        of affected author rows and inserted content rows.
    """
    try:
        data = _load_output(trace_id)
    except Exception as e:
        msg = f"加载 output.json 失败: {e}"
        logger.error(msg)
        return ToolResult(title="存储推荐结果", output=msg, metadata={"ok": False, "error": str(e)})

    # demand_id may arrive as a string; coerce to int, treating any failure
    # as "missing" so the explicit error below fires.
    demand_content_id = data.get("demand_id")
    if demand_content_id is not None and not isinstance(demand_content_id, int):
        try:
            demand_content_id = int(demand_content_id)
        except (ValueError, TypeError):
            demand_content_id = None
    if demand_content_id is None:
        msg = "demand_id 必填:请在 output 的 demand_id 字段中输出(来自 user 消息的搜索词 id)"
        logger.error(msg)
        return ToolResult(title="存储推荐结果", output=msg, metadata={"ok": False, "error": msg})

    conn = None
    try:
        conn = get_connection()
        good_block = data.get("good_account_expansion")
        contents = data.get("contents") or []
        query = data.get("query") or ""

        # Authors are upserted (idempotent); contents are plain inserts.
        authors_rows = upsert_good_authors(conn, trace_id, good_block)
        contents_rows = insert_contents(conn, trace_id, query, demand_content_id, contents)

        output = (
            f"MySQL 写入完成:demand_find_author 影响行数={authors_rows}, "
            f"demand_find_content_result 插入条数={contents_rows}"
        )
        logger.info(output)
        return ToolResult(
            title="存储推荐结果",
            output=output,
            metadata={
                "ok": True,
                "trace_id": trace_id,
                "good_authors_affected": authors_rows,
                "contents_inserted": contents_rows,
            },
        )
    except Exception as e:
        msg = f"写入 MySQL 失败: {e}"
        logger.error(msg, exc_info=True)
        return ToolResult(title="存储推荐结果", output=msg, metadata={"ok": False, "error": str(e)})
    finally:
        # Always release the connection, whether the write succeeded or not.
        if conn is not None:
            conn.close()
+
async def main():
    """Manual smoke test: store one trace's output into MySQL and dump the result."""
    trace = "7b211fa6-f0d6-4f98-a6f5-689e6af64748"
    result = await store_results_mysql(trace_id=trace)
    # ToolResult is a dataclass, so vars() shows every field.
    print(vars(result))


if __name__ == "__main__":
    asyncio.run(main())

+ 198 - 0
tests/tools/weixin_tools.py

@@ -0,0 +1,198 @@
+from __future__ import annotations
+
+import json
+import logging
+
+from agent.tools import tool, ToolResult
+from src.infra.shared.http_client import AsyncHttpClient
+from src.infra.shared.common import extract_history_articles
+
+logger = logging.getLogger(__name__)
+
# Base URL of the internal AIGC crawler service's WeChat (Weixin) endpoints.
base_url = "http://crawler-cn.aiddit.com/crawler/wei_xin"
# Every endpoint below expects a JSON-encoded request body.
headers = {"Content-Type": "application/json"}
+
+
@tool(description="通过关键词搜索微信文章")
async def weixin_search(keyword: str, page: str = "1") -> dict | None:
    """
    Search WeChat articles by keyword.

    Args:
        keyword: Search keyword.
        page: Pagination cursor sent as the API's ``cursor`` field; start at
            "1" and feed back the cursor from a previous response to page on.

    Returns:
        The raw API response dict on success, or None if the HTTP request
        raised (network error, timeout, bad response parsing).
    """
    url = "{}/keyword".format(base_url)
    payload = json.dumps({"keyword": keyword, "cursor": page})
    try:
        async with AsyncHttpClient(timeout=120) as http_client:
            response = await http_client.post(url=url, headers=headers, data=payload)
    except Exception as e:
        # Log instead of print so failures show up in service logs (matches sibling tools).
        logger.error("weixin_search failed for keyword=%r: %s", keyword, e)
        return None
    # Debug-level dump of the raw payload; previously a bare print().
    logger.debug(json.dumps(response, ensure_ascii=False, indent=4))
    return response
+
+
@tool(description="通过公众号文章链接获取公众号详情信息")
async def fetch_weixin_account(content_link: str) -> dict | None:
    """
    Fetch WeChat official-account details from one of its article links.

    Args:
        content_link: URL of an article published by the target account.

    Returns:
        The raw API response dict on success, or None if the HTTP request
        raised (network error, timeout, bad response parsing).
    """
    url = "{}/account_info".format(base_url)
    # is_cache=False forces a fresh upstream lookup instead of a cached record.
    payload = json.dumps({"content_link": content_link, "is_cache": False})

    try:
        async with AsyncHttpClient(timeout=120) as http_client:
            response = await http_client.post(url=url, headers=headers, data=payload)
    except Exception as e:
        logger.error("fetch_weixin_account failed for %s: %s", content_link, e)
        return None
    # Debug-level dump of the raw payload; previously a bare print().
    logger.debug(json.dumps(response, ensure_ascii=False, indent=4))
    return response
+
+
@tool(description="通过微信公众号的 wx_gh 获取微信公众号的历史发文列表")
async def fetch_account_article_list(wx_gh: str, index=None, is_cache=True) -> dict | None:
    """
    Fetch the historical article list of an official account by its wx_gh id.

    Args:
        wx_gh: Official-account id (sent as the API's ``account_id``).
        index: Pagination cursor; pass None for the first page, then feed back
            the ``next_cursor`` from a previous call to page on.
        is_cache: Whether the backend may serve a cached copy. Defaults to True.

    Returns:
        The dict produced by extract_history_articles:
            - next_cursor: cursor for the next page (None on upstream error)
            - articles: list of parsed article dicts
        Returns None if the HTTP request itself raised.
    """
    url = "{}/blogger".format(base_url)
    # HACK: hard-coded upstream access token — should be loaded from config/env,
    # not committed in source. Flagged for follow-up, behavior unchanged.
    payload = json.dumps(
        {
            "account_id": wx_gh,
            "cursor": index,
            "token": "1fa4c0ad5c66e43ebd525611f3869f53",
            "is_cache": is_cache,
        }
    )

    try:
        async with AsyncHttpClient(timeout=120) as http_client:
            response = await http_client.post(url=url, headers=headers, data=payload)
    except Exception as e:
        logger.error("fetch_account_article_list failed for %s: %s", wx_gh, e)
        return None

    return extract_history_articles(response)
+
+
@tool(description="通过公众号文章链接获取文章详情")
async def fetch_article_detail(article_link: str, is_count: bool = False, is_cache: bool = True) -> dict | None:
    """
    Fetch article detail by a WeChat article link.

    Args:
        article_link: Article URL (sent as the API's ``content_link``).
        is_count: Whether to also fetch read-count statistics. Defaults to False.
        is_cache: Whether the backend may serve a cached copy. Defaults to True.

    Returns:
        The raw API response dict on success, or None if the HTTP request
        raised (network error, timeout, bad response parsing).
    """
    target_url = f"{base_url}/detail"
    payload = json.dumps(
        {
            "content_link": article_link,
            "is_count": is_count,
            "is_ad": False,
            "is_cache": is_cache,
        }
    )
    try:
        # NOTE(review): 10s timeout here vs 120s for the other endpoints — confirm intentional.
        async with AsyncHttpClient(timeout=10) as http_client:
            response = await http_client.post(target_url, headers=headers, data=payload)
    except Exception as e:
        # Log instead of print so failures show up in service logs (matches sibling tools).
        logger.error("fetch_article_detail failed for %s: %s", article_link, e)
        return None

    return response
+
+
if __name__ == "__main__":
    import asyncio

    # Ad-hoc manual smoke test against a real article link.
    url = "http://mp.weixin.qq.com/s?__biz=MjM5ODI5NTE2MA==&mid=2651871172&idx=1&sn=791630221da3b28fc23949c48c994218&chksm=bc39e9a2a29ea779aef9f6a510f24c3b0addfbc08c86d2d20f8bce0c132fc9b0bed98dc6c8ee&scene=7#rd"

    async def run():
        response = await fetch_article_detail(url)
        # json is already imported at module level; the local re-import was redundant.
        print(json.dumps(response, ensure_ascii=False, indent=4))

    asyncio.run(run())