Sfoglia il codice sorgente

寻找策略修改

jihuaqiang 1 giorno fa
parent
commit
e09afb3c90

+ 34 - 35
examples/content_finder/content_finder.md

@@ -18,19 +18,19 @@ $system$
 - 获取高赞视频的选题点: `get_video_topic`
 - 抖音视频搜索:`douyin_search`
 - 抖音视频搜索(Tikhub):`douyin_search_tikhub`
-- 抖音作者作品搜索:`douyin_user_videos`
+- 订阅账号作品搜索:`douyin_user_videos`
 - 数据库作者检索(按搜索词找历史优质作者):`find_authors_from_db`
 - 作品画像获取:`get_content_fans_portrait`
 - 作者画像获取:`get_account_fans_portrait`
- - 过程记录:`think_and_plan`
- - 存储结果至数据库:`store_results_mysql`
- - 创建aigc计划:`create_crawler_plan_by_douyin_content_id`、`create_crawler_plan_by_douyin_account_id`
- - 过程策略表格(记录每条内容的选择依据):`exec_summary`(**最后一步**,见执行流程第 9 步)
+- 过程记录:`think_and_plan`
+- 存储结果至数据库:`store_results_mysql`
+- 创建aigc计划:`create_crawler_plan_by_douyin_content_id`、`create_crawler_plan_by_douyin_account_id`
+- 过程策略表格(记录每条内容的选择依据):`exec_summary`(**最后一步**,见执行流程第 9 步)
 
 ## 重要约束
-- 只在抖音平台搜索,不要切换到其他平台(小红书、B站等)
 - **严格禁止**调用任何名称以 `browser_` 开头的浏览器工具
 - 每个结论都必须有工具调用证据。
+- Agent执行过程中会在 OUTPUT_DIR 下存储执行的log,当遇到上下文丢失的情况时可从该文件读取。
 
 ## 运营人员平台背景
 - 平台载体:微信小程序
@@ -47,28 +47,24 @@ $system$
 6. **Schema 校验阶段**:逐字段自检;不符合就重写 `output.json`
 7. **入库阶段**:仅在 Schema 校验通过后,调用 `store_results_mysql(trace_id)` 存储到远程数据库
 8. **接入平台阶段**:按 `aigc_platform_plan` 生成 AIGC 爬取计划
-9. **过程摘要阶段(内容策略表格)**:在以上全部完成后,调用 `exec_summary(trace_id, summary_json)`,将**每条入选/候选内容的选择策略**整理成表格形式的 JSON,写入 `{output_dir}/{trace_id}/process_trace.json`。`summary_json` 必须是合法 JSON,可以是数组或对象:
-   - 如果是数组:每个元素代表一行记录,最终会被包成 `{ "rows": [...] }`
-   - 如果是对象:应包含 `rows` 字段,`rows` 为行列表
+9. **过程摘要阶段(内容策略表格)**:在以上全部完成后,按 `exec_summary_rows` 的要求生成 `summary_json`,并调用 `exec_summary(trace_id, summary_json, log_path)` tool(`log_path` 传入 `{output_dir}/{trace_id}/log.txt`),将**每条入选的选择策略**整理成表格形式的 JSON,写入 `{output_dir}/{trace_id}/process_trace.json`。
    
-   建议每行至少包含以下字段(键名必须是英文):
-   - `strategy_type`: `"case_based"` 或 `"feature_based"`,表示是从高赞 case 出发还是从特征出发
-   - `from_case_aweme_id` / `from_case_point` / `from_feature`: 具体来源的 case 选题点或特征词
-   - `search_keyword`: 使用的搜索词
-   - `channel`: 获取方式,例如 `"search"`(搜索)、`"author"`(账号)、`"ranking"`(榜单,预留)、`"other"`
-   - `decision_basis`: 主要保留依据,例如 `"demand_filtering"`(需求理解阶段的筛选方案)、`"content_portrait"`(内容画像匹配)、`"author_portrait"`(作者画像匹配)、`"other"`
-   - `decision_notes`: 补充说明文本,用于解释本条内容为什么被保留/淘汰
 
 ## 强制要求(违反即为错误)
 
+### 寻找内容阶段
+1. 最多搜索次数:len(高赞case出发搜索词) * 2 + len(特征出发搜索词) * 2 , 即每个搜索词最多搜2页。
+2. 搜索阶段只能使用"高赞case出发搜索词" 和 "特征出发搜索词",**禁止扩展搜索词**
+3. **非常重要**: 达到**最多搜索次数**后即使不满足要求的输出数量也直接输出,不再继续扩展搜索。
+4. 对每个搜索词,先确定寻找策略优先级,再按优先级执行所有的策略,不能跳过订阅账号作品搜索的策略。
+
 ### 需求理解阶段
-1. 禁止使用特征作为搜索词。
-2. 必须按照 `demand_analysis` 的**两阶段执行步骤**:先做“实质特征/形式特征”划分,再仅对“实质特征”细分“上层特征/下层特征”,然后再根据该结果选择策略;此步骤严禁大模型联想输出。
-3. **特征分层归类**本质是对输入特征的筛选与重组,必须使用原词,不能联想新词;上/下层特征均来自实质特征,形式特征不参与上/下层细分。
-4. 当实质特征不为空时,必须满足:上层特征和下层特征不能同时为空,且应满足 `上层特征 ∪ 下层特征 = 实质特征`(允许同一原词在不同阶段被引用)。
-5. 不管下层特征是否具体,都需要调用**高赞case工具**,不能直接发起搜索,搜索词和输出字段**必须基于`get_video_topic`工具返回的metadata.videos字段**进行填充
-6. 所有`下层特征`的特征词必须根据**高赞视频选题点提取**的结果进行后续步骤,不需要再和原始的特征词关联。
-7. 此阶段必须输出下面的结构(举例)
+1. 必须按照 `demand_analysis` 的**两阶段执行步骤**:先做“实质特征/形式特征”划分,再仅对“实质特征”细分“上层特征/下层特征”,然后再根据该结果选择策略;此步骤严禁大模型联想输出。
+2. **特征分层归类**本质是对输入特征的筛选与重组,必须使用原词,不能联想新词;上/下层特征均来自实质特征,形式特征不参与上/下层细分。
+3. 当实质特征不为空时,必须满足:上层特征和下层特征不能同时为空,且应满足 `上层特征 ∪ 下层特征 = 实质特征`(允许同一原词在不同阶段被引用)。
+4. 命中**case出发策略**时,不管下层特征是否具体,都需要调用**高赞case工具**,不能直接发起搜索,搜索词和输出字段**必须基于`get_video_topic`工具返回的metadata.videos字段**进行原值填充,所有`下层特征`的特征词必须根据**高赞视频选题点提取**的结果进行后续步骤,不需要再和原始的特征词关联,也不允许联想或者新生成。
+5. 命中**特征出发策略**时,使用原始的特征词填充特征出发搜索词
+6. 此阶段必须输出下面的结构(举例)
 ```json
 {
   "特征归类": {
@@ -78,8 +74,8 @@ $system$
     "上层特征": ["特征词2"]
   },
   "起点策略": {
-    "高赞case出发搜索词": [],
-    "特征出发待寻找账号列表": [],
+    "高赞case出发搜索词": ["case出发的灵感点"],
+    "特征出发搜索词": ["使用上层特征or下层特征填充"],
     "是否调用高赞case工具": true,
     "高赞case_灵感点": [],
     "高赞case_目的点": [],
@@ -94,8 +90,7 @@ $system$
 }
 ```
 
-### 画像工具必须调用
-对每条候选内容,**必须**按以下顺序获取画像:
+### 筛选阶段必须按照 `content_filtering_strategy` 的步骤进行,对于**case出发**的搜索结果,满足6分即可输出不需要查看画像;其他结果按顺序查看画像
 1. 先调用 `get_content_fans_portrait`,检查 `metadata.has_portrait`。
 2. 若 `has_portrait=False`,如果是 `douyin_search` 或 `douyin_search_tikhub` 获取到的视频,再调用 `get_account_fans_portrait` 兜底,如果是`douyin_user_videos`则不需要再次调用`get_account_fans_portrait`。
 补充:`douyin_search` 失败后再调用 `douyin_search_tikhub` 作为兜底。
@@ -111,16 +106,20 @@ $system$
 
 **在宣称任务完成或结束对话前,必须逐项确认;任一项未满足则继续执行,不得提前收尾。**
 
-### 1.画像(内容 + 账号)是否已获取
-- 对**最终写入 `contents` 的每一条**视频,是否都已调用过 `get_content_fans_portrait(aweme_id)`?
+### 1.寻找阶段策略是否都已执行
+根据**需求理解阶段**输出的case出发和特征出发搜索词都已经执行了内容寻找
+
+### 2.筛选阶段是否按规则执行
+- 对于所有`基于case出发策略`的搜索结果,优先进行 **基础筛选**和**基于case出发策略筛选**,满足6分条件直接进入最终输出池,不需要调用画像数据。
+- 其他策略获取的视频或达不到6分的视频,是否都已调用过 `get_content_fans_portrait(aweme_id)`?
 - 对其中 `metadata.has_portrait=False` 的条目,是否**在同一条目上**已调用 `get_account_fans_portrait(account_id=author.sec_uid)` 作为兜底?
 - **禁止**:仅因内容侧无画像就跳过账号画像、直接把 `portrait_data` 当空或来源标为 `none` 而未尝试账号接口(除非两次调用均失败且已在理由中说明)。
 
-### 输出、校验、入库顺序是否正确
+### 3.输出、校验、入库顺序是否正确
 - 是否已先写 `output.json`,再完成 Schema 校验,最后才调用 `store_results_mysql(trace_id)`?
 - **禁止**:未校验 Schema 就直接入库。
 
-### Schema 合规闸门(入库前必须通过)
+### 4.Schema 合规闸门(入库前必须通过)
 - 在调用 `store_results_mysql` 前,必须逐项核对 `output.json` 是否满足 `output_schema`;**不通过就先重写 JSON,不得入库**。
 - 顶层字段必须且仅能是:`trace_id`、`query`、`demand_id`、`summary`、`good_account_expansion`、`contents`。
 - `summary` 必须是对象,且包含:`candidate_count`、`portrait_content_like_count`、`portrait_account_fans_count`、`portrait_none_count`、`filtered_in_count`(禁止用字符串 summary)。
@@ -130,16 +129,16 @@ $system$
 - 每条 `contents` 的 `portrait_data` 必须包含:`source`、`age_50_plus_ratio`、`age_50_plus_tgi`、`url`。
 - 字符串值中若有双引号 `"`,必须写成 `\"`(反斜杠 + 双引号)
 
-### AIGC 接入(爬取计划)是否已接入
+### 5.AIGC 接入(爬取计划)是否已接入
 - `contents` 中入选视频是否在**入库成功后**已按 `aigc_platform_plan` 调用 `create_crawler_plan_by_douyin_content_id`?
 - **禁止**:写完库就认为任务结束、不创建爬取计划。若某条创建失败,须在回复中说明原因;仅当入选视频已创建或已说明失败原因时,方可视为本阶段完成。
 
-### 过程摘要是否已写入
-- 是否在 **AIGC 计划阶段完成后** 调用了 `exec_summary`,且 `summary_json` 以“表格”的形式逐条记录了入选/候选内容的选择策略(起点类型、来源 case/特征、搜索词、信道、保留依据等)?
+### 6.过程摘要是否已写入
+- 是否在 **AIGC 计划阶段完成后** 调用了 `exec_summary`生成了每条视频的过程记录.
 
 
 $user$
-任务:找10个以「%query%」为特征的视频。
+任务:找最多10个以「%query%」为特征的视频。
 
 特征词: %query%
 搜索词id: %demand_id%(如有)

+ 21 - 5
examples/content_finder/core.py

@@ -10,7 +10,7 @@ import sys
 import os
 from pathlib import Path
 from typing import Optional, Dict, Any
-from utils.log_capture import build_log, log
+from utils.log_capture import attach_log_file, build_log, log
 from datetime import datetime
 import uuid
 
@@ -91,7 +91,7 @@ from tools import (
 logger = logging.getLogger(__name__)
 
 # 默认搜索词
-DEFAULT_QUERY = "正能量"
+DEFAULT_QUERY = "体制内,近亲繁殖"
 DEFAULT_DEMAND_ID = 1
 
 
@@ -222,6 +222,17 @@ async def run_agent(
             async for item in runner.run(messages=messages, config=config):
                 if isinstance(item, Trace):
                     trace_id = item.trace_id
+                    # 一旦拿到 trace_id,立即绑定日志文件,确保后续步骤(含 exec_summary)能读到实时 log.txt
+                    try:
+                        log_file_path = _resolve_log_file_path(
+                            content_finder_root=content_finder_root,
+                            output_dir_path=output_dir_path,
+                            trace_id=trace_id,
+                            execution_id=execution_id,
+                        )
+                        attach_log_file(execution_id, log_file_path)
+                    except Exception as e:
+                        logger.warning(f"绑定实时 log.txt 失败: trace_id={trace_id}, err={e}")
 
                     if item.status == "completed":
                         logger.info(f"Agent 执行完成: trace_id={trace_id}")
@@ -260,9 +271,14 @@ async def run_agent(
                 trace_id=trace_id,
                 execution_id=execution_id,
             )
-            log_file_path.parent.mkdir(parents=True, exist_ok=True)
-            with open(log_file_path, "w", encoding="utf-8") as f:
-                f.write(full_log)
+            # 兜底:如果实时落盘失败/未绑定,则在结束时一次性写入
+            try:
+                if not log_file_path.exists():
+                    log_file_path.parent.mkdir(parents=True, exist_ok=True)
+                    with open(log_file_path, "w", encoding="utf-8") as f:
+                        f.write(full_log)
+            except Exception as e:
+                logger.warning(f"写入 log.txt 兜底失败: trace_id={trace_id}, err={e}")
 
             try:
                 from render_log_html import render_log_html_and_upload

+ 4 - 0
examples/content_finder/db/__init__.py

@@ -12,6 +12,7 @@ from .schedule import (
     get_next_unprocessed_demand,
     get_daily_unprocessed_pool,
     create_task_record,
+    fetch_trace_ids_created_after,
     update_task_status,
     update_task_on_complete,
 )
@@ -20,6 +21,7 @@ from .store_results import (
     insert_contents,
     update_content_plan_ids,
     update_web_html_url,
+    update_process_trace_by_aweme_id,
 )
 
 __all__ = [
@@ -28,10 +30,12 @@ __all__ = [
     "get_next_unprocessed_demand",
     "get_daily_unprocessed_pool",
     "create_task_record",
+    "fetch_trace_ids_created_after",
     "update_task_status",
     "update_task_on_complete",
     "upsert_good_authors",
     "insert_contents",
     "update_content_plan_ids",
     "update_web_html_url",
+    "update_process_trace_by_aweme_id",
 ]

+ 30 - 0
examples/content_finder/db/schedule.py

@@ -6,6 +6,7 @@ demand_find_task: 执行记录表,通过 demand_content_id 关联
 """
 
 import logging
+from datetime import datetime
 from typing import Any, Dict, List, Optional
 
 from .connection import get_connection
@@ -161,6 +162,35 @@ def update_task_on_complete(demand_content_id: int, trace_id: str, status: int)
             conn.close()
 
 
+def fetch_trace_ids_created_after(cutoff: datetime) -> list[str]:
+    """
+    查询 demand_find_task 中 created_at 晚于 cutoff 的去重 trace_id(排除空串)。
+
+    cutoff 与表字段比较规则与 MySQL DATETIME/TIMESTAMP 一致。
+    """
+    sql = """
+    SELECT DISTINCT trace_id
+    FROM demand_find_task
+    WHERE created_at > %s
+      AND trace_id IS NOT NULL
+      AND trace_id <> ''
+    ORDER BY trace_id
+    """
+    conn = None
+    try:
+        conn = get_connection()
+        with conn.cursor() as cur:
+            cur.execute(sql, (cutoff,))
+            rows = cur.fetchall() or []
+            return [str(r["trace_id"]) for r in rows]
+    except Exception as e:
+        logger.error(f"fetch_trace_ids_created_after 失败: {e}", exc_info=True)
+        raise
+    finally:
+        if conn:
+            conn.close()
+
+
 def update_task_status(trace_id: str, demand_content_id: int, status: int) -> None:
     """
     更新 demand_find_task 中指定记录的状态。

+ 29 - 0
examples/content_finder/db/store_results.py

@@ -226,3 +226,32 @@ def update_web_html_url(trace_id: str, web_html_url: str) -> int:
             return cur.rowcount
     finally:
         conn.close()
+
+
+def update_process_trace_by_aweme_id(*, trace_id: str, aweme_id: str, process_trace_text: str) -> int:
+    """
+    根据 (trace_id, aweme_id) 回写 demand_find_content_result.process_trace(TEXT)。
+
+    约定:
+    - trace_id 为 output 子目录名
+    - aweme_id 为内容唯一 id(表中 demand_find_content_result.aweme_id)
+    - process_trace_text 为 JSON 序列化后的字符串(或原始文本)
+    """
+    t = (trace_id or "").strip()
+    a = (aweme_id or "").strip()
+    text = (process_trace_text or "").strip()
+    if not t or not a or not text:
+        return 0
+
+    sql = """
+    UPDATE demand_find_content_result
+    SET process_trace = %s
+    WHERE trace_id = %s AND aweme_id = %s
+    """
+    conn = get_connection()
+    try:
+        with conn.cursor() as cur:
+            cur.execute(sql, (text, t, a))
+            return cur.rowcount
+    finally:
+        conn.close()

+ 4 - 4
examples/content_finder/render_log_html.py

@@ -292,17 +292,17 @@ def _build_process_trace_table_html(*, process_trace_path: Path, output_json_pat
         logger.warning("read output.json failed: path=%s err=%s", output_json_path, e)
 
     headers: list[tuple[str, str]] = [
-        ("input_features", "特征"),
+        ("input_features", "特征"),
         ("aweme_id", "视频id"),
         ("title", "标题"),
         ("video_url", "视频链接"),
         ("author_nickname", "作者"),
-        ("from_case_point", "参考点"),
         ("strategy_type", "策略"),
+        ("from_case_point", "参考点"),
         ("channel", "渠道"),
         ("search_keyword", "搜索词"),
-        ("decision_basis", "依据"),
-        ("decision_notes", "理由"),
+        ("decision_basis", "筛选依据"),
+        ("decision_notes", "筛选理由"),
     ]
 
     def td(text: str, *, muted: bool = False, title: str | None = None) -> str:

+ 6 - 10
examples/content_finder/skills/content_filtering_strategy.md

@@ -6,28 +6,24 @@ description: 内容筛选方法论
 # 内容筛选方法论
 
 ## 核心流程
-基础筛选 -> 基于高赞case筛选 → 画像匹配 → 去重排序
+基础筛选 -> 基于case出发策略筛选 → 画像匹配 → 去重排序
 
 ---
 ## 筛选步骤
 ### 阶段1:基础筛选
-
-在获取画像前先快速过滤,减少不必要的 API 调用。
-
+在获取画像前先快速过滤掉和搜索词明显不相关的视频以及点赞量一般的视频,减少不必要的 API 调用。
 **热度参考标准**:
-
 | 点赞量 | 热度等级 |
 |---|---|
 | 1000+ | 一般热度 |
 | 5000+ | 较高热度 |
 | 10000+ | 高热度 |
 | 50000+ | 爆款 |
-
 评估维度:digg_count(点赞)、comment_count(评论)、share_count(分享)
 
-### 阶段2:**基于高赞case筛选**
+### 阶段2:**基于case出发策略筛选**(仅适用于case出发的搜索结果)
 
-使用 `demand_analysis` 的结果里的**筛选方案**做高赞case筛选,目的在于:
+使用 `demand_analysis` 的结果里的**筛选方案**做case出发策略筛选,目的在于:
 - 先把“明显不满足筛选方案”的内容尽早淘汰,减少无效画像调用
 - 在每条内容的 `reason` 中给出“需求对齐的依据”(来自标题/描述可读信息 + goodcase 选题点)
 
@@ -49,13 +45,13 @@ description: 内容筛选方法论
    - 命中:加分或作为排序
    - 不命中:不直接淘汰,但在 `reason` 中标注“不匹配/不确定”
 
+注意: 此步骤**超过6分的直接进入输出池**,不需要再根据画像数据判断。
 #### 在输出 reason 中必须包含的要素
 
 对于进入后续画像阶段的候选,在其 `reason` 中至少写明:
 至少包含四项:命中的 `目的点` 状态;命中的 `灵感点` 状态;`关键点`(命中/部分/缺失)与缺失说明或不确定点;形式规则是命中还是不确定(如无法从标题/描述判断)
 
 ### 阶段3:画像匹配筛选
-
 **分批处理**:先处理前 10 条候选内容,筛选后 >= M 则停止,不足再继续下一批。  
 **并行限制**:每次最多并行调用 3 个画像工具。  
 **停止条件**:已获取画像数量 >= M × 1.5 时,立即停止,进入下一阶段。
@@ -92,7 +88,7 @@ description: 内容筛选方法论
 
 ## 关键原则
 **标准来自需求**:评估维度随需求变化,不固化。
-**分阶段筛选**:先基础筛选,再基于高赞case筛选,最后画像匹配,提高效率。
+**分阶段筛选**:先基础筛选,再基于case出发的筛选,最后画像匹配,提高效率。
 **画像兜底策略**:优先内容画像,缺失时用账号画像,确保数据覆盖。
 **说明评估逻辑**:让用户理解为什么这条内容被推荐或排除。
 **承认不确定性**:数据不足以判断时,如实说明,而不是强行打分。

+ 23 - 26
examples/content_finder/skills/content_finding_strategy.md

@@ -1,42 +1,51 @@
 ---
 name: content_finding_strategy
-description: 内容搜索方法论
+description: 内容寻找方法论
 ---
 
-# 内容搜索方法论
+# 内容寻找方法论
 
-## 核心流程:关键词提取 → 串行搜索 → 结果评估 → 按需补充
+## 核心流程:关键词提取 → 寻找策略确定 → 策略内容 → 结果评估
 
----
 
-## 第步:关键词提取
+## 第1步:关键词提取
 
-- 搜索词来自于`需求分析`步骤的输出,禁止将特征作为搜索词
+- 搜索词来自于`需求分析`步骤的输出,提取**高赞case出发搜索词**和**特征出发搜索词**
 - 确定目标数量 **M**(如"找10条",则 M = 10)
+- 所有的搜索词必须取自**高赞case出发搜索词**和**特征出发搜索词**,不允许联想其他词。
 
----
+## 第2步:寻找策略
+
+### 策略汇总
+1. 抖音搜索(已实现)
+2. 索引榜单搜索(暂未实现,可不执行)
+3. 垂类推荐流(暂未实现, 可不执行)
+4. 订阅账号作品搜索(已实现)
 
-## 第二步:串行关键词搜索
+### 寻找策略确定
+1. 搜索词只能来源于 **第一步:关键词提取**
+2. ‼️重要:**严谨联想或扩展**搜索词。
+3. 具象的搜索词优先 **抖音搜索**,抽象的搜索词优先**订阅账号作品搜索**
+4. 无论具象或者抽象,都需要按优先级执行所有的策略,**不能跳过**。
 
-### 优先:抖音搜索
+## 第3步:策略执行
+### 抖音搜索
 **搜索词限制**: 仅搜索第一步中输出的搜索词,严谨联想或者扩展其他词搜索。
 **数量控制**:只搜索 **N = M × 2** 条,搜到后立即停止,不超出此限制。
 **数据读取规则**:
 - 搜索结果从 `metadata.search_results` 获取,**不要解析工具的 output 文本**
 - 账号作品从 `metadata.user_videos` 获取
 - 数据库作者从 `find_authors_from_db` 的 `metadata.authors` 获取(优先使用其中的 `author_sec_uid`)
-**分页策略**:第一次使用默认 cursor(`"0"` 或 `""`),需要更多时使用返回的 cursor 继续获取。
+**分页策略**:第一次使用默认 cursor(`"0"` 或 `""`),需要更多时使用返回的 cursor 继续获取,**最多搜索2页**
 **兜底策略**:`douyin_search` 失败或无结果时,使用 `douyin_search_tikhub`。
 
-### 备选:历史优质作者扩展(备选策略)
-当关键词搜索结果质量不稳定、或需要更贴近目标人群的内容时,可走“作者→作品”的扩展路径:
+### 订阅账号作品搜索
 - 先调用 `find_authors_from_db(query)`:从数据库历史沉淀中按搜索词找到相关优质作者(返回 `author_sec_uid`)
 - 再对 Top 作者调用 `douyin_user_videos(account_id=author_sec_uid)` 拉作品,作为候选池补充
 **仍需遵守数量控制**:作者扩展拿到的作品也计入候选数量,总量不要超过 **N = M × 2**。
 
----
 
-## 第三步:数据真实性规范(严格遵守)
+## 第4步:结果评估
 
 **禁止编造数据**,所有字段必须来自工具返回的 metadata。
 
@@ -59,18 +68,6 @@ sec_uid = item["author"]["sec_uid"]  # 完整复制,约 80 字符
 
 **违反后果**:编造数据会导致 404 错误,严重影响用户体验。
 
----
-
-## 第四步:结果评估与补充
-
-经 `content_filtering_strategy` 筛选后,统计符合要求的内容数量 **C**:
-
-- **C >= M**:完成,进入输出阶段
-- **C < M × 0.8**:内容不足,选下一个关键词,回到第二步
-- **M × 0.8 <= C < M**:接近目标,可选择继续补充或直接输出
-
----
-
 ## 错误处理
 
 | 错误类型 | 处理策略 |

+ 10 - 14
examples/content_finder/skills/demand_analysis.md

@@ -29,14 +29,13 @@ description: 需求分析
 
 > 重要:**形式特征不参与“上层/下层”分层**,它们只进入后续的判别规则(如表达结构、节奏、可分享程度)。
 
-### 步骤2. 根据步骤1的归类选择策略
-根据上述分层结果决定要执行哪些起点策略:
-- **当 `下层特征` 非空**:必须执行 **A. 高赞视频选题点提取**(用 `get_video_topic` 拉“灵感点/目的点/关键点”)
-- **当 `上层特征` 非空**:执行 **B. 特征出发**(构建主题树 → 下钻出可执行词)
+### 步骤2. 根据步骤1的归类执行策略
+- **当 `下层特征` 非空**:必须执行 **case出发策略**(用 `get_video_topic` 拉“灵感点/目的点/关键点”)
+- **当 `下层特征`或者`上层特征` 非空**:必须执行 **特征出发策略**(使用特征填充搜索词)
 - **两者都非空**:A + B **都执行**,最后合并去重
 - **只有形式词/实质词为空**:承认信息不足,只能先按用户原话/补充提问(或用最少假设)生成候选搜索词包;不要编造“高赞case选题点”
 
-### A. 高赞视频选题点提取
+### case出发策略
 
 适用:`下层特征` 非空必须执行此步骤。  
 要求:必须**基于`get_video_topic`的metadata.videos内的所有视频的选题点进行**,不允许自行联想填充字段。
@@ -47,18 +46,15 @@ description: 需求分析
    对每条内容的灵感点和`features`进行相关性判别,选出最贴合特征词的3条内容作为**最佳选题筛选结果**。
   - 步骤2.2 选题点提取
    - 先根据步骤2.1的**最佳选题筛选结果**填充 **起点策略.高赞case_灵感点**,**起点策略.高赞case_目的点**,**起点策略.高赞case_关键点** 这些字段内容,注意直接使用原词填充。
-   - 用步骤2.1的所有**最佳选题筛选结果**里面的灵感点填充**起点策略.高赞case出发搜索词**字段,由灵感点扩展出的即时搜索词(允许 3-5 个同义/上下位词),即时搜索词不允许和原始特征`features`一样。
+   - 用步骤2.1的所有**最佳选题筛选结果**里面的灵感点填充**起点策略.高赞case出发搜索词**字段,直接使用灵感点填充,不需要联想或者修改。即时搜索词不允许和原始特征`features`一样。
    - 用步骤2.1的所有**最佳选题筛选结果**里面的目的点补充**筛选方案.目的点对齐规则**字段
    - 用步骤2.1的所有**最佳选题筛选结果**里面的关键点补充**筛选方案.关键点打分说明**字段
 
-### B. 特征出发(仅用于上层特征)
-适用:`上层特征` 非空必须执行此步骤。 
+### 特征出发策略
+适用:`下层特征`或`上层特征` 有一个非空必须执行此步骤。 
 动作:
-1. 输入:使用 `上层特征` 作为主题根
-2. 以上层特征构建主题树(主题 -> 子主题 -> 关键词)
-3. 将树上的子主题/关键词**下钻成可执行搜索词**(落到能直接丢给 `douyin_search` 的词)
-4. 结合库内优质作者特征做扩展(可选:`find_authors_from_db` → `douyin_user_videos`)
-5. 合并得到 `特征出发待寻找账号列表`,进入搜索阶段
+1. 使用所有`下层特征`和`上层特征` 填充**起点策略.特征出发搜索词**
+2. 不允许修改原本的特征词
 
 > 两条起点可并行,不互斥;最后合并去重。
 
@@ -80,7 +76,7 @@ description: 需求分析
   },
   "起点策略": {
     "高赞case出发搜索词": [],
-    "特征出发待寻找账号列表": [],
+    "特征出发搜索词": [],
     "是否调用高赞case工具": true,
     "高赞case_灵感点": [],
     "高赞case_目的点": [],

+ 58 - 0
examples/content_finder/skills/exec_summary_rows.md

@@ -0,0 +1,58 @@
+---
+name: exec_summary_rows
+description: 仅在需要写入 process_trace.json 时,用于记录最终输出的每条视频的寻找过程
+---
+
+## 目标
+生成用于记录最终输出的每条视频的寻找过程的json
+
+## 强约束(必须遵守)
+1. **只基于入选内容**:只能对 `output.json.contents` 中出现的 `aweme_id` 生成 rows;不得输出任何不在 contents 的视频(包括淘汰候选/搜索过程中的视频)。
+2. **rows 数量必须等于 contents 数量**:一条入选内容必须对应且仅对应一行 row。
+3. **字段固定且统一**:每行 row 只允许包含下列 key(不得增删改名):
+   - `aweme_id`
+   - `title`
+   - `author_nickname`
+   - `strategy_type`
+   - `from_case_aweme_id`
+   - `from_case_point`
+   - `from_feature`
+   - `search_keyword`
+   - `channel`
+   - `decision_basis`
+   - `decision_notes`
+   - `input_features`
+4. **值使用中文枚举**:
+   - `strategy_type`: `"case出发策略"` / `"特征出发策略"`
+   - `channel`: `"抖音搜索"` / `"订阅账号"` / `"榜单"` / `"其他"`
+   - `decision_basis`: `"内容画像匹配"` / `"作者画像匹配"` / `"需求筛选"` / `"画像缺失"` / `"其他"`
+5. **input_features**:必须是 `list[str]`;默认从 `output.json.query` 按逗号拆分得到(兼容中文逗号)。
+6. **允许为空的字段**:`from_case_aweme_id/from_case_point/from_feature` 若无法确定可为空字符串,但不能缺 key。
+
+## 依据
+- `output.json`(必须读取并以 `contents` 为准)
+  - `query`:用于 `input_features` 拆分
+  - `contents[]`:每条入选内容,含 `aweme_id/title/author_nickname/reason/portrait_data.source` 等
+- `log.txt`:用于判断内容来自哪种策略、来自哪个 case出发/特征出发、搜索词与渠道等
+
+## 生成规则(建议优先级)
+对每个 `content in output.json.contents`:
+1. `aweme_id/title/author_nickname` 直接来自 content(必须与 contents 一致)
+2. `decision_notes`:优先用 `content.reason`(入选理由)
+3. `decision_basis`:
+   - 若 `content.portrait_data.source == "content_like"` → `"内容画像匹配"`
+   - 若 `== "account_fans"` → `"作者画像匹配"`
+   - 若 `== "none"` → `"画像缺失"`
+   - 否则 `"其他"`
+4. `strategy_type/channel/search_keyword`:
+   - 若日志/上下文能明确来源,按事实填(且用上面的中文枚举)
+   - 否则 字段留空
+5. `from_case_* / from_feature`:能确定就填;不确定可空串。
+
+## 输出格式(必须严格)
+只输出一个 JSON 对象(不要 Markdown、不要解释、不要多余文本):
+
+```json
+{"rows":[{...},{...}]}
+```
+

+ 2 - 2
examples/content_finder/skills/high_quality_account.md

@@ -17,9 +17,9 @@ description: 优质账号扩展
 
 ### 扩展策略
 1. 调用 `douyin_user_videos(account_id=author.sec_uid)`,获取 10-20 条近期作品
-2. 对找到的作品进行大改的分析,掌握账号内容特征,内容特征可以用若干类型词语描述,比如"历史人物/生活科普/搞笑娱乐"等
+2. 对找到的作品进行分析,综合视频内容的占比,给当前作者一个唯一的品类特征,比如"近代历史故事分享/生活科普/搞笑娱乐"等
 3. 通过筛选的作品加入候选池,标注来源"优质账号扩展"
 
 ### 必须在输出中说明
-- 发现优质账号:说明账号名、目标人群占比、tgi,以及扩展了哪些作品、账号内容特征(使用若干词语输出,如历史人物/生活科普/搞笑娱乐等特征标签)
+- 发现优质账号:说明账号名、目标人群占比、tgi,以及扩展了哪些作品、账号品类特征(近代历史故事分享/生活科普/搞笑娱乐等特征标签)
 - 未发现:说明"未发现符合扩展条件的优质账号(需占比 > 60% 且 tgi > 120)"

+ 1 - 1
examples/content_finder/skills/output_schema.md

@@ -33,7 +33,7 @@ description: 输出结果指南
         "author_sec_uid": "<完整 sec_uid>",
         "age_50_plus_ratio": null,
         "age_50_plus_tgi": null,
-        "content_tags": "账号内容特征"
+        "content_tags": "账号品类特征"
       }
     ]
   },

+ 219 - 147
examples/content_finder/tools/exec_summary.py

@@ -9,9 +9,8 @@ from __future__ import annotations
 import json
 import logging
 import os
-from datetime import datetime, timezone
 from pathlib import Path
-from typing import Any, Dict, List, Optional, Sequence, Tuple
+from typing import Any, Dict, List, Optional, Tuple
 
 from agent.tools import tool, ToolResult
 from utils.tool_logging import format_tool_result_for_log, log_tool_call
@@ -67,158 +66,227 @@ def _load_output_json(*, trace_id: str) -> Optional[Dict[str, Any]]:
     return data if isinstance(data, dict) else None
 
 
-def _extract_get_video_topic_videos(*, trace_id: str) -> List[Dict[str, Any]]:
+def _extract_contents(*, trace_id: str) -> List[Dict[str, Any]]:
     """
-    从 log.txt 中提取 get_video_topic 的返回 metadata.videos(原始选题点)。
-
-    期望日志片段形态(render_log_html 同源格式):
-    [FOLD:🔧 工具调用:get_video_topic ...]
-    ...
-    [FOLD:📤 返回内容]
-    <json array>
-    [/FOLD]
+    从 output.json 读取最终入选 contents。
+
+    约定:
+    - 只允许对 output.json.contents 内的 aweme_id 生成/写入 process_trace rows
     """
-    log_path = _output_dir_path() / trace_id / "log.txt"
-    try:
-        text = log_path.read_text(encoding="utf-8")
-    except FileNotFoundError:
-        return []
-    except Exception:
-        logger.warning("读取 log.txt 失败: %s", str(log_path), exc_info=True)
+    output_json = _load_output_json(trace_id=trace_id) or {}
+    contents = output_json.get("contents")
+    if not isinstance(contents, list):
         return []
+    out: List[Dict[str, Any]] = []
+    for item in contents:
+        if isinstance(item, dict):
+            out.append(item)
+    return out
 
-    marker = "[FOLD:🔧 工具调用:get_video_topic"
-    start = text.find(marker)
-    if start < 0:
-        return []
-    snippet = text[start:]
 
-    out_marker = "[FOLD:📤 返回内容]"
-    out_start = snippet.find(out_marker)
-    if out_start < 0:
-        return []
-    after = snippet[out_start + len(out_marker) :]
+def _map_strategy_type(value: Any) -> str:
+    v = str(value or "").strip()
+    if v in ("case_based", "case", "case出发"):
+        return "case出发"
+    if v in ("feature_based", "feature", "特征出发"):
+        return "特征出发"
+    return v
+
+
+def _map_channel(value: Any) -> str:
+    v = str(value or "").strip()
+    mapping = {
+        "search": "抖音搜索",
+        "author": "订阅账号",
+        "ranking": "榜单",
+        "other": "其他",
+        "抖音搜索": "抖音搜索",
+        "订阅账号": "订阅账号",
+        "榜单": "榜单",
+        "其他": "其他",
+    }
+    return mapping.get(v, v)
+
+
+def _map_decision_basis(value: Any) -> str:
+    v = str(value or "").strip()
+    mapping = {
+        "content_portrait": "内容画像匹配",
+        "author_portrait": "作者画像匹配",
+        "demand_filtering": "需求筛选",
+        "other": "其他",
+        "画像缺失": "画像缺失",
+        "内容画像匹配": "内容画像匹配",
+        "作者画像匹配": "作者画像匹配",
+        "需求筛选": "需求筛选",
+        "其他": "其他",
+    }
+    return mapping.get(v, v)
 
-    json_start = after.find("[")
-    if json_start < 0:
-        return []
-    json_end = after.find("[/FOLD]")
-    if json_end < 0:
-        return []
 
-    raw = after[json_start:json_end].strip()
-    try:
-        parsed = json.loads(raw)
-    except Exception:
-        logger.warning("解析 get_video_topic 返回 JSON 失败", exc_info=True)
-        return []
+def _infer_decision_basis_from_output_content(content: Dict[str, Any]) -> str:
+    portrait = content.get("portrait_data") or {}
+    source = str(portrait.get("source") or "").strip()
+    if source == "content_like":
+        return "内容画像匹配"
+    if source == "account_fans":
+        return "作者画像匹配"
+    if source == "none":
+        return "画像缺失"
+    return ""
 
-    return parsed if isinstance(parsed, list) else []
 
+def _build_base_row(*, trace_id: str, content: Dict[str, Any], input_features: List[str], query: str) -> Dict[str, Any]:
+    return {
+        "trace_id": trace_id,
+        "aweme_id": str(content.get("aweme_id") or "").strip(),
+        "title": str(content.get("title") or "").strip(),
+        "author_nickname": str(content.get("author_nickname") or "").strip(),
+        "strategy_type": "",
+        "from_case_aweme_id": "",
+        "from_case_point": "",
+        "from_feature": "",
+        "search_keyword": str(query or "").strip(),
+        "channel": "抖音搜索",
+        "decision_basis": _infer_decision_basis_from_output_content(content),
+        "decision_notes": str(content.get("reason") or "").strip(),
+        "input_features": input_features,
+    }
 
-def _flatten_case_points_text(video: Dict[str, Any]) -> str:
-    tp = video.get("选题点")
-    if not isinstance(tp, dict):
-        return ""
-    tokens: List[str] = []
-    for k in ("灵感点", "目的点", "关键点"):
-        v = tp.get(k)
-        if isinstance(v, list):
-            for x in v:
-                if isinstance(x, str) and x.strip():
-                    tokens.append(x.strip())
-    return " ".join(tokens)
 
+_ROW_KEYS: Tuple[str, ...] = (
+    "trace_id",
+    "aweme_id",
+    "title",
+    "author_nickname",
+    "strategy_type",
+    "from_case_aweme_id",
+    "from_case_point",
+    "from_feature",
+    "search_keyword",
+    "channel",
+    "decision_basis",
+    "decision_notes",
+    "input_features",
+)
 
-def _score_match(*, row_text: str, candidate_text: str) -> int:
-    """
-    简单可控的匹配评分:按“子串命中次数”计分,避免引入分词依赖。
-    """
-    rt = (row_text or "").strip()
-    ct = (candidate_text or "").strip()
-    if not rt or not ct:
-        return 0
-    score = 0
-    for token in _split_input_features(rt):
-        if token and token in ct:
-            score += 2
-    # 再做一次整体包含(更强信号)
-    if rt and rt in ct:
-        score += 3
-    return score
-
-
-def _pick_best_case_video(
-    *, row: Dict[str, Any], case_videos: Sequence[Dict[str, Any]]
-) -> Optional[Dict[str, Any]]:
-    if not case_videos:
-        return None
-    row_text = " ".join(
-        [
-            str(row.get("from_case_point") or ""),
-            str(row.get("search_keyword") or ""),
-            str(row.get("title") or ""),
-        ]
-    ).strip()
-    scored: List[Tuple[int, int]] = []
-    for i, v in enumerate(case_videos):
-        scored.append((_score_match(row_text=row_text, candidate_text=_flatten_case_points_text(v)), i))
-    scored.sort(reverse=True)
-    best_score, best_idx = scored[0]
-    # 低于 1 视为“不确定”,但仍给出一个稳定的默认(第一个)
-    if best_score <= 0:
-        return case_videos[0]
-    return case_videos[best_idx]
+
+def _sanitize_row(row: Dict[str, Any]) -> Dict[str, Any]:
+    """只保留固定字段,并把枚举值规范成中文。"""
+    out: Dict[str, Any] = {k: row.get(k, "") for k in _ROW_KEYS}
+    out["strategy_type"] = _map_strategy_type(out.get("strategy_type"))
+    out["channel"] = _map_channel(out.get("channel"))
+    out["decision_basis"] = _map_decision_basis(out.get("decision_basis"))
+    # input_features 规范为 list[str]
+    feats = out.get("input_features")
+    if isinstance(feats, list):
+        out["input_features"] = [str(x).strip() for x in feats if str(x).strip()]
+    elif isinstance(feats, str):
+        out["input_features"] = _split_input_features(feats)
+    else:
+        out["input_features"] = []
+    return out
 
 
 def _normalize_payload(*, trace_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
-    rows = payload.get("rows")
-    if not isinstance(rows, list):
-        return payload
+    # tool 只做最小职责:过滤/补全/规范化;复杂推理由 skill 生成 summary_json 来完成
+    raw_rows = payload.get("rows")
+    rows_in_payload: List[Dict[str, Any]] = []
+    if isinstance(raw_rows, list):
+        for item in raw_rows:
+            if isinstance(item, dict):
+                rows_in_payload.append(item)
 
     output_json = _load_output_json(trace_id=trace_id) or {}
-    input_features = _split_input_features(str(output_json.get("query") or ""))
-    case_videos = _extract_get_video_topic_videos(trace_id=trace_id)
+    query = str(output_json.get("query") or "").strip()
+    input_features = _split_input_features(query)
+    contents = _extract_contents(trace_id=trace_id)
+    contents_by_aweme_id: Dict[str, Dict[str, Any]] = {
+        str(c.get("aweme_id") or "").strip(): c for c in contents if str(c.get("aweme_id") or "").strip()
+    }
 
-    normalized_rows: List[Any] = []
-    for item in rows:
-        if not isinstance(item, dict):
-            normalized_rows.append(item)
+    # 先把 payload rows 归并到 aweme_id
+    payload_by_aweme_id: Dict[str, Dict[str, Any]] = {}
+    for r in rows_in_payload:
+        aweme_id = str(r.get("aweme_id") or r.get("awemeId") or "").strip()
+        if not aweme_id:
             continue
-        row = dict(item)
-
-        # 1) 每条视频都体现原始输入特征词
-        if "input_features" not in row:
-            row["input_features"] = input_features
-
-        # 2) from_case_point:尽量输出“原始选题点信息”,而不是联想词
-        if "from_case_point" in row and case_videos:
-            original = _pick_best_case_video(row=row, case_videos=case_videos)
-            if isinstance(original, dict) and isinstance(original.get("选题点"), dict):
-                # 保留模型原先写的联想/归类结果,便于排查,但不作为主字段
-                if isinstance(row.get("from_case_point"), str) and row.get("from_case_point"):
-                    row["from_case_point_guess"] = row["from_case_point"]
-                row["from_case_point"] = original.get("选题点")
-                if "from_case_aweme_id" not in row:
-                    row["from_case_aweme_id"] = str(original.get("id") or "").strip() or None
-
-        normalized_rows.append(row)
-
-    out = dict(payload)
-    out["rows"] = normalized_rows
-    return out
+        payload_by_aweme_id[aweme_id] = dict(r)
+
+    # 只允许 payload 覆盖“策略/来源/解释”字段,避免覆盖 output.json.contents 的身份字段(title/author 等)
+    allowed_payload_keys: set[str] = {
+        "strategy_type",
+        "from_case_aweme_id",
+        "from_case_point",
+        "from_feature",
+        "search_keyword",
+        "channel",
+        "decision_basis",
+        "decision_notes",
+        "input_features",
+    }
+
+    # 兼容 payload 的常见别名/驼峰 key(模型输出不稳定时,尽量不丢信息)
+    alias_map: Dict[str, Tuple[str, ...]] = {
+        "strategy_type": ("strategy_type", "strategyType"),
+        "from_case_aweme_id": ("from_case_aweme_id", "fromCaseAwemeId", "case_aweme_id", "caseAwemeId"),
+        "from_case_point": ("from_case_point", "fromCasePoint", "case_point", "casePoint"),
+        "from_feature": ("from_feature", "fromFeature", "feature", "from_feature_name"),
+        "search_keyword": ("search_keyword", "searchKeyword", "keyword"),
+        "channel": ("channel", "source_channel", "sourceChannel", "source"),
+        "decision_basis": ("decision_basis", "decisionBasis"),
+        "decision_notes": ("decision_notes", "decisionNotes", "notes"),
+        "input_features": ("input_features", "inputFeatures"),
+    }
+
+    def _pick(provided: Dict[str, Any], key: str) -> Any:
+        for k in alias_map.get(key, (key,)):
+            if k in provided:
+                return provided.get(k)
+        return None
+
+    normalized: List[Dict[str, Any]] = []
+    for aweme_id, content in contents_by_aweme_id.items():
+        base = _build_base_row(trace_id=trace_id, content=content, input_features=input_features, query=query)
+        provided = payload_by_aweme_id.get(aweme_id) or {}
+
+        merged = dict(base)
+        # 只合并允许覆盖的字段
+        for k in allowed_payload_keys:
+            v = _pick(provided, k)
+            if v is not None:
+                merged[k] = v
+
+        # 身份字段强制以 output.json.contents 为准(即使 payload 传了也不采纳)
+        merged["aweme_id"] = str(content.get("aweme_id") or "").strip()
+        merged["title"] = str(content.get("title") or "").strip()
+        merged["author_nickname"] = str(content.get("author_nickname") or "").strip()
+
+        # 如果缺失 input_features,用 query 拆分补齐
+        if "input_features" not in merged or not merged.get("input_features"):
+            merged["input_features"] = input_features
+
+        normalized.append(_sanitize_row(merged))
+
+    # 保持稳定顺序:按 rank(若有)或 aweme_id
+    def _sort_key(r: Dict[str, Any]) -> Tuple[int, str]:
+        c = contents_by_aweme_id.get(str(r.get("aweme_id") or "").strip()) or {}
+        try:
+            rank = int(c.get("rank") or 0)
+        except Exception:
+            rank = 0
+        return (rank if rank > 0 else 10**9, str(r.get("aweme_id") or ""))
+
+    normalized.sort(key=_sort_key)
+    return {"rows": normalized}
 
 
 def _write_process_trace(*, trace_id: str, payload: Dict[str, Any]) -> Path:
     out_dir = _output_dir_path() / trace_id
     out_dir.mkdir(parents=True, exist_ok=True)
     path = out_dir / "process_trace.json"
-    doc = {
-        **payload,
-        "schema_version": "1.0",
-        "trace_id": trace_id,
-        "generated_at": datetime.now(timezone.utc).isoformat(),
-    }
+    # 输出格式收敛:只允许 {"rows": [...]}
+    doc = {"rows": payload.get("rows") or []}
     with path.open("w", encoding="utf-8") as f:
         json.dump(doc, f, ensure_ascii=False, indent=2)
     return path
@@ -226,26 +294,24 @@ def _write_process_trace(*, trace_id: str, payload: Dict[str, Any]) -> Path:
 
 @tool(
     description=(
-        "在**全部流程执行完毕之后**调用:把每条入选/候选内容的「选择策略」整理成表格 JSON,"
+        "在**全部流程执行完毕之后**调用:把每条最终入选内容的「选择策略」整理成表格 JSON,"
         "写入当前任务的 output 目录下的 process_trace.json,便于后续复盘。"
-        "参数 summary_json 为 JSON 字符串,可以是:"
-        "1)数组:每一项是一行记录;会被包成 {\"rows\": [...]};"
-        "2)对象:应包含 rows 字段,rows 为行列表。"
-        "建议每行至少包含:strategy_type(\"case_based\" | \"feature_based\")、"
-        "from_case_aweme_id / from_feature(来源 case 的选题点或特征)、"
-        "search_keyword(使用的搜索词)、"
-        "channel(\"search\" | \"author\" | \"ranking\" | \"other\" 等)、"
-        "decision_basis(如 \"demand_filtering\" | \"content_portrait\" | \"author_portrait\" | \"other\")、"
-        "decision_notes(自由文本补充原因)。"
+        "参数 summary_json 为 JSON 字符串,可以是数组或对象(对象需包含 rows)。"
+        "可选参数 log_path/log_text 用于传入本次运行日志(便于复盘留档/未来扩展)。"
     ),
 )
-async def exec_summary(trace_id: str, summary_json: str) -> ToolResult:
-    """
-    Args:
-        trace_id: 本次任务 trace_id(与 output.json 同目录)。
-        summary_json: JSON 字符串。对象或数组均可;数组会包成 {\"rows\": [...] }。
-    """
-    call_params = {"trace_id": trace_id, "summary_json": "<json>"}
+async def exec_summary(
+    trace_id: str,
+    summary_json: str,
+    log_path: str = "",
+    log_text: str = "",
+) -> ToolResult:
+    call_params = {
+        "trace_id": trace_id,
+        "summary_json": "<json>",
+        "log_path": (log_path or "").strip(),
+        "log_text": "<text>",
+    }
     tid = (trace_id or "").strip()
     if not tid:
         err = ToolResult(
@@ -289,7 +355,13 @@ async def exec_summary(trace_id: str, summary_json: str) -> ToolResult:
     out = ToolResult(
         title="过程摘要",
         output=f"已写入 {path}",
-        metadata={"ok": True, "trace_id": tid, "path": str(path)},
+        metadata={
+            "ok": True,
+            "trace_id": tid,
+            "path": str(path),
+            "log_path": (log_path or "").strip(),
+            "log_text_len": len((log_text or "").strip()),
+        },
     )
     log_tool_call(_LOG_LABEL, {"trace_id": tid}, format_tool_result_for_log(out))
     return out

+ 54 - 5
examples/content_finder/utils/log_capture.py

@@ -10,21 +10,61 @@ import io
 import sys
 import contextvars
 import threading
+from pathlib import Path
+from typing import TextIO, Union
 from contextlib import contextmanager
 
 # 当前 Agent 执行绑定的 build_id(通过 contextvars 跨 asyncio.to_thread 传播)
-_current_build_id: contextvars.ContextVar[int | None] = contextvars.ContextVar(
-    'log_build_id', default=None
+# 兼容历史:可能是 int,也可能是 uuid 字符串
+BuildId = Union[int, str]
+_current_build_id: contextvars.ContextVar[BuildId | None] = contextvars.ContextVar(
+    "log_build_id", default=None
 )
 
 # build_id → StringIO buffer 的全局注册表(线程安全)
-_buffers: dict[int, io.StringIO] = {}
+_buffers: dict[BuildId, io.StringIO] = {}
 _buffers_lock = threading.Lock()
 
+# build_id → 追加写入的文件句柄(线程安全)
+_file_handles: dict[BuildId, TextIO] = {}
+_files_lock = threading.Lock()
+
 # 保存真实 stdout(进程启动时的值,不会被覆盖)
 _real_stdout = sys.stdout
 
 
+def attach_log_file(build_id: BuildId, file_path: str | Path) -> None:
+    """
+    绑定实时落盘文件:后续 log() 会在写 stdout/buffer 的同时 append 到该文件。
+
+    - 会自动创建父目录
+    - 会以 utf-8 追加模式打开
+    - 重复绑定同一个 build_id 会关闭旧句柄并替换
+    """
+    path = Path(file_path).expanduser()
+    path.parent.mkdir(parents=True, exist_ok=True)
+    fh = path.open("a", encoding="utf-8")
+    with _files_lock:
+        old = _file_handles.pop(build_id, None)
+        if old is not None:
+            try:
+                old.close()
+            except Exception:
+                pass
+        _file_handles[build_id] = fh
+
+
+def detach_log_file(build_id: BuildId) -> None:
+    """解除绑定并关闭文件句柄(若存在)。"""
+    with _files_lock:
+        fh = _file_handles.pop(build_id, None)
+    if fh is not None:
+        try:
+            fh.close()
+        except Exception:
+            pass
+
+
 def log(*args, **kwargs):
     """并发安全的日志函数,替代 print()。
 
@@ -40,10 +80,18 @@ def log(*args, **kwargs):
         buf = _buffers.get(build_id)
         if buf is not None:
             print(*args, file=buf, **kwargs)
+        fh = _file_handles.get(build_id)
+        if fh is not None:
+            try:
+                print(*args, file=fh, **kwargs)
+                fh.flush()
+            except Exception:
+                # 文件写入失败不应影响主流程
+                pass
 
 
 @contextmanager
-def build_log(build_id: int):
+def build_log(build_id: BuildId):
     """Agent 执行的日志上下文管理器。
 
     使用方式:
@@ -64,6 +112,7 @@ def build_log(build_id: int):
         # 清理
         with _buffers_lock:
             _buffers.pop(build_id, None)
+        detach_log_file(build_id)
         _current_build_id.reset(token)
         buf.close()
 
@@ -78,7 +127,7 @@ def log_fold(label: str):
         log("[/FOLD]")
 
 
-def get_log_content(build_id: int) -> str | None:
+def get_log_content(build_id: BuildId) -> str | None:
     """获取指定 build 当前已收集的日志内容(用于实时查看)"""
     buf = _buffers.get(build_id)
     return buf.getvalue() if buf else None