Browse Source

feat: 增加过程追踪和可视化展示

jihuaqiang 1 day ago
parent
commit
085c1a98ea

+ 21 - 5
examples/content_finder/content_finder.md

@@ -22,9 +22,10 @@ $system$
 - 数据库作者检索(按搜索词找历史优质作者):`find_authors_from_db`
 - 数据库作者检索(按搜索词找历史优质作者):`find_authors_from_db`
 - 作品画像获取:`get_content_fans_portrait`
 - 作品画像获取:`get_content_fans_portrait`
 - 作者画像获取:`get_account_fans_portrait`
 - 作者画像获取:`get_account_fans_portrait`
-- 过程记录:`think_and_plan`
-- 存储结果至数据库:`store_results_mysql`
-- 创建aigc计划:`create_crawler_plan_by_douyin_content_id`、`create_crawler_plan_by_douyin_account_id`
+ - 过程记录:`think_and_plan`
+ - 存储结果至数据库:`store_results_mysql`
+ - 创建aigc计划:`create_crawler_plan_by_douyin_content_id`、`create_crawler_plan_by_douyin_account_id`
+ - 过程策略表格(记录每条内容的选择依据):`exec_summary`(**最后一步**,见执行流程第 9 步)
 
 
 ## 重要约束
 ## 重要约束
 - 只在抖音平台搜索,不要切换到其他平台(小红书、B站等)
 - 只在抖音平台搜索,不要切换到其他平台(小红书、B站等)
@@ -45,7 +46,18 @@ $system$
 5. **输出阶段**:先按 `output_schema` 写入 `output.json`
 5. **输出阶段**:先按 `output_schema` 写入 `output.json`
 6. **Schema 校验阶段**:逐字段自检;不符合就重写 `output.json`
 6. **Schema 校验阶段**:逐字段自检;不符合就重写 `output.json`
 7. **入库阶段**:仅在 Schema 校验通过后,调用 `store_results_mysql(trace_id)` 存储到远程数据库
 7. **入库阶段**:仅在 Schema 校验通过后,调用 `store_results_mysql(trace_id)` 存储到远程数据库
-8. **接入平台阶段**:最后按 `aigc_platform_plan` 生成 AIGC 爬取计划
+8. **接入平台阶段**:按 `aigc_platform_plan` 生成 AIGC 爬取计划
+9. **过程摘要阶段(内容策略表格)**:在以上全部完成后,调用 `exec_summary(trace_id, summary_json)`,将**每条入选/候选内容的选择策略**整理成表格形式的 JSON,写入 `{output_dir}/{trace_id}/process_trace.json`。`summary_json` 必须是合法 JSON,可以是数组或对象:
+   - 如果是数组:每个元素代表一行记录,最终会被包成 `{ "rows": [...] }`
+   - 如果是对象:应包含 `rows` 字段,`rows` 为行列表
+   
+   建议每行至少包含以下字段(键名必须是英文):
+   - `strategy_type`: `"case_based"` 或 `"feature_based"`,表示是从高赞 case 出发还是从特征出发
+   - `from_case_aweme_id` / `from_case_point` / `from_feature`: 具体来源的 case 选题点或特征词
+   - `search_keyword`: 使用的搜索词
+   - `channel`: 获取方式,例如 `"search"`(搜索)、`"author"`(账号)、`"ranking"`(榜单,预留)、`"other"`
+   - `decision_basis`: 主要保留依据,例如 `"demand_filtering"`(需求理解阶段的筛选方案)、`"content_portrait"`(内容画像匹配)、`"author_portrait"`(作者画像匹配)、`"other"`
+   - `decision_notes`: 补充说明文本,用于解释本条内容为什么被保留/淘汰
 
 
 ## 强制要求(违反即为错误)
 ## 强制要求(违反即为错误)
 
 
@@ -55,7 +67,8 @@ $system$
 3. **特征分层归类**本质是对输入特征的筛选与重组,必须使用原词,不能联想新词;上/下层特征均来自实质特征,形式特征不参与上/下层细分。
 3. **特征分层归类**本质是对输入特征的筛选与重组,必须使用原词,不能联想新词;上/下层特征均来自实质特征,形式特征不参与上/下层细分。
 4. 当实质特征不为空时,必须满足:上层特征和下层特征不能同时为空,且应满足 `上层特征 ∪ 下层特征 = 实质特征`(允许同一原词在不同阶段被引用)。
 4. 当实质特征不为空时,必须满足:上层特征和下层特征不能同时为空,且应满足 `上层特征 ∪ 下层特征 = 实质特征`(允许同一原词在不同阶段被引用)。
 5. 不管下层特征是否具体,都需要调用**高赞case工具**,不能直接发起搜索,搜索词和输出字段**必须基于`get_video_topic`工具返回的metadata.videos字段**进行填充
 5. 不管下层特征是否具体,都需要调用**高赞case工具**,不能直接发起搜索,搜索词和输出字段**必须基于`get_video_topic`工具返回的metadata.videos字段**进行填充
-6. 此阶段必须输出下面的结构(举例)
+6. 所有`下层特征`的特征词必须根据**高赞视频选题点提取**的结果进行后续步骤,不需要再和原始的特征词关联。
+7. 此阶段必须输出下面的结构(举例)
 ```json
 ```json
 {
 {
   "特征归类": {
   "特征归类": {
@@ -121,6 +134,9 @@ $system$
 - `contents` 中入选视频是否在**入库成功后**已按 `aigc_platform_plan` 调用 `create_crawler_plan_by_douyin_content_id`?
 - `contents` 中入选视频是否在**入库成功后**已按 `aigc_platform_plan` 调用 `create_crawler_plan_by_douyin_content_id`?
 - **禁止**:写完库就认为任务结束、不创建爬取计划。若某条创建失败,须在回复中说明原因;仅当入选视频已创建或已说明失败原因时,方可视为本阶段完成。
 - **禁止**:写完库就认为任务结束、不创建爬取计划。若某条创建失败,须在回复中说明原因;仅当入选视频已创建或已说明失败原因时,方可视为本阶段完成。
 
 
+### 过程摘要是否已写入
+- 是否在 **AIGC 计划阶段完成后** 调用了 `exec_summary`,且 `summary_json` 以“表格”的形式逐条记录了入选/候选内容的选择策略(起点类型、来源 case/特征、搜索词、信道、保留依据等)?
+
 
 
 $user$
 $user$
 任务:找10个以「%query%」为特征的视频。
 任务:找10个以「%query%」为特征的视频。

+ 2 - 0
examples/content_finder/core.py

@@ -85,6 +85,7 @@ from tools import (
     think_and_plan,
     think_and_plan,
     find_authors_from_db,
     find_authors_from_db,
     get_video_topic,
     get_video_topic,
+    exec_summary,
 )
 )
 
 
 logger = logging.getLogger(__name__)
 logger = logging.getLogger(__name__)
@@ -179,6 +180,7 @@ async def run_agent(
         "create_crawler_plan_by_douyin_account_id",
         "create_crawler_plan_by_douyin_account_id",
         "think_and_plan",
         "think_and_plan",
         "get_video_topic",
         "get_video_topic",
+        "exec_summary",
     ]
     ]
 
 
     runner = AgentRunner(
     runner = AgentRunner(

+ 315 - 12
examples/content_finder/render_log_html.py

@@ -7,6 +7,8 @@
 
 
 from __future__ import annotations
 from __future__ import annotations
 
 
+import argparse
+import json
 import html
 import html
 import logging
 import logging
 import os
 import os
@@ -77,6 +79,13 @@ TOOL_DESCRIPTION_MAP: dict[str, str] = {
 INPUT_LOG_PATH = os.getenv("INPUT_LOG_PATH", ".cache/input_log")
 INPUT_LOG_PATH = os.getenv("INPUT_LOG_PATH", ".cache/input_log")
 # 设为 None 则默认生成到输入文件同名 .html
 # 设为 None 则默认生成到输入文件同名 .html
 OUTPUT_HTML_PATH: str | None = os.getenv("OUTPUT_HTML_PATH") or None
 OUTPUT_HTML_PATH: str | None = os.getenv("OUTPUT_HTML_PATH") or None
+# 产物输出目录(content_finder 的标准 output 目录)
+OUTPUT_DIR = os.getenv("OUTPUT_DIR", ".cache/output")
+# 置顶摘要表格数据源(可选)。不填则默认取 input_log 同目录下的 process_trace.json / output.json
+PROCESS_TRACE_PATH: str | None = os.getenv("PROCESS_TRACE_PATH") or None
+OUTPUT_JSON_PATH: str | None = os.getenv("OUTPUT_JSON_PATH") or None
+# 如果未显式指定 PROCESS_TRACE_PATH/OUTPUT_JSON_PATH,且同目录不存在文件,则尝试从该 trace_id 推导 .cache/output/{trace_id}/...
+TRACE_ID: str | None = os.getenv("TRACE_ID") or None
 # 是否默认折叠所有 [FOLD] 块
 # 是否默认折叠所有 [FOLD] 块
 COLLAPSE_ALL_FOLDS = False
 COLLAPSE_ALL_FOLDS = False
 # 命中这些前缀/关键词的折叠块默认收起
 # 命中这些前缀/关键词的折叠块默认收起
@@ -213,7 +222,162 @@ def render_node(
     return "".join(parts)
     return "".join(parts)
 
 
 
 
-def build_html(body: str, source_name: str) -> str:
+def _safe_str(v: object) -> str:
+    if v is None:
+        return ""
+    if isinstance(v, (str, int, float, bool)):
+        return str(v)
+    return json.dumps(v, ensure_ascii=False)
+
+
+def _truncate(s: str, max_len: int) -> str:
+    s = s or ""
+    if len(s) <= max_len:
+        return s
+    return s[: max(0, max_len - 1)] + "…"
+
+
+def _read_json_file(path: Path) -> dict:
+    return json.loads(path.read_text(encoding="utf-8"))
+
+
+def _build_aweme_id_to_video_url(output_json_path: Path) -> dict[str, str]:
+    """
+    从 output.json 的 contents[] 构建 {aweme_id: video_url} 映射。
+
+    约定:
+    - output.json 中每条 content 都包含 aweme_id 与 video_url(字符串)
+    """
+    data = _read_json_file(output_json_path)
+    contents = data.get("contents") or []
+    if not isinstance(contents, list):
+        return {}
+
+    mapping: dict[str, str] = {}
+    for item in contents:
+        if not isinstance(item, dict):
+            continue
+        aweme_id = _safe_str(item.get("aweme_id")).strip()
+        video_url = _safe_str(item.get("video_url")).strip()
+        if aweme_id and video_url:
+            mapping[aweme_id] = video_url
+    return mapping
+
+
+def _build_process_trace_table_html(*, process_trace_path: Path, output_json_path: Path) -> str:
+    """
+    生成置顶摘要表格。
+
+    数据来源:
+    - process_trace.json: rows[]
+    - output.json: contents[],按 aweme_id 补齐 video_url
+    """
+    if not process_trace_path.exists() or not output_json_path.exists():
+        return ""
+
+    try:
+        trace_data = _read_json_file(process_trace_path)
+    except Exception as e:
+        logger.warning("read process_trace.json failed: path=%s err=%s", process_trace_path, e)
+        return ""
+
+    rows = trace_data.get("rows") or []
+    if not isinstance(rows, list) or not rows:
+        return ""
+
+    aweme_to_url: dict[str, str] = {}
+    try:
+        aweme_to_url = _build_aweme_id_to_video_url(output_json_path)
+    except Exception as e:
+        logger.warning("read output.json failed: path=%s err=%s", output_json_path, e)
+
+    headers: list[tuple[str, str]] = [
+        ("input_features", "特征"),
+        ("aweme_id", "视频id"),
+        ("title", "标题"),
+        ("video_url", "视频链接"),
+        ("author_nickname", "作者"),
+        ("from_case_point", "参考点"),
+        ("strategy_type", "策略"),
+        ("channel", "渠道"),
+        ("search_keyword", "搜索词"),
+        ("decision_basis", "依据"),
+        ("decision_notes", "理由"),
+    ]
+
+    def td(text: str, *, muted: bool = False, title: str | None = None) -> str:
+        klass = "cell muted" if muted else "cell"
+        title_attr = f' title="{html.escape(title)}"' if title else ""
+        return f'<td class="{klass}"{title_attr}>{html.escape(text)}</td>'
+
+    body_parts: list[str] = []
+    for r in rows:
+        if not isinstance(r, dict):
+            continue
+        aweme_id = _safe_str(r.get("aweme_id")).strip()
+        video_url = aweme_to_url.get(aweme_id, "")
+
+        values: dict[str, str] = {
+            "strategy_type": _safe_str(r.get("strategy_type")),
+            "from_case_point": _safe_str(r.get("from_case_point")),
+            "search_keyword": _safe_str(r.get("search_keyword")),
+            "aweme_id": aweme_id,
+            "title": _safe_str(r.get("title")),
+            "author_nickname": _safe_str(r.get("author_nickname")),
+            "channel": _safe_str(r.get("channel")),
+            "decision_basis": _safe_str(r.get("decision_basis")),
+            "decision_notes": _safe_str(r.get("decision_notes")),
+            "input_features": _safe_str(r.get("input_features")),
+            "video_url": video_url,
+        }
+
+        tds: list[str] = []
+        for key, _label in headers:
+            val = values.get(key, "")
+            if key == "decision_notes":
+                full = val
+                val = _truncate(val, 80)
+                tds.append(td(val, title=full))
+                continue
+            if key == "title":
+                full = val
+                val = _truncate(val, 60)
+                tds.append(td(val, title=full))
+                continue
+            if key == "video_url":
+                if video_url:
+                    safe_url = html.escape(video_url, quote=True)
+                    tds.append(
+                        '<td class="cell link-cell">'
+                        f'<a class="link" href="{safe_url}" target="_blank" rel="noreferrer">打开</a>'
+                        "</td>"
+                    )
+                else:
+                    tds.append(td("", muted=True))
+                continue
+            tds.append(td(val))
+
+        body_parts.append("<tr>" + "".join(tds) + "</tr>")
+
+    if not body_parts:
+        return ""
+
+    thead = "".join(f"<th>{html.escape(label)}</th>" for _key, label in headers)
+    return (
+        '<div class="summary-panel">'
+        '<div class="summary-title">过程追踪摘要</div>'
+        f'<div class="summary-meta">{html.escape(process_trace_path.name)}</div>'
+        '<div class="table-wrap">'
+        '<table class="summary-table">'
+        f"<thead><tr>{thead}</tr></thead>"
+        f"<tbody>{''.join(body_parts)}</tbody>"
+        "</table>"
+        "</div>"
+        "</div>"
+    )
+
+
+def build_html(body: str, source_name: str, *, summary_table_html: str = "") -> str:
     return f"""<!doctype html>
     return f"""<!doctype html>
 <html lang="zh-CN">
 <html lang="zh-CN">
 <head>
 <head>
@@ -276,6 +440,80 @@ def build_html(body: str, source_name: str) -> str:
       border-radius: 10px;
       border-radius: 10px;
       padding: 10px;
       padding: 10px;
     }}
     }}
+    .summary-panel {{
+      background: rgba(255, 255, 255, 0.02);
+      border: 1px solid var(--border);
+      border-radius: 10px;
+      padding: 12px;
+      margin-bottom: 12px;
+    }}
+    .summary-title {{
+      font-size: 14px;
+      font-weight: 700;
+      margin-bottom: 2px;
+      letter-spacing: 0.2px;
+    }}
+    .summary-meta {{
+      color: var(--muted);
+      font-size: 12px;
+      margin-bottom: 10px;
+    }}
+    .table-wrap {{
+      overflow: auto;
+      border: 1px solid var(--border);
+      border-radius: 10px;
+      background: rgba(0, 0, 0, 0.12);
+    }}
+    table.summary-table {{
+      border-collapse: separate;
+      border-spacing: 0;
+      width: 100%;
+      min-width: 1100px;
+      font-size: 12px;
+    }}
+    table.summary-table thead th {{
+      position: sticky;
+      top: 0;
+      z-index: 1;
+      text-align: left;
+      padding: 10px 10px;
+      background: #111a2b;
+      border-bottom: 1px solid var(--border);
+      color: #cdd6e5;
+      white-space: nowrap;
+    }}
+    table.summary-table tbody tr {{
+      border-bottom: 1px solid rgba(255, 255, 255, 0.04);
+    }}
+    table.summary-table tbody tr:nth-child(2n) {{
+      background: rgba(255, 255, 255, 0.02);
+    }}
+    table.summary-table td.cell {{
+      padding: 9px 10px;
+      vertical-align: top;
+      border-bottom: 1px solid rgba(255, 255, 255, 0.04);
+      color: var(--text);
+      line-height: 1.35;
+    }}
+    table.summary-table td.muted {{
+      color: var(--muted);
+    }}
+    td.link-cell {{
+      white-space: nowrap;
+    }}
+    a.link {{
+      color: var(--accent);
+      text-decoration: none;
+      border: 1px solid rgba(110, 168, 254, 0.35);
+      padding: 2px 8px;
+      border-radius: 999px;
+      display: inline-block;
+      background: rgba(110, 168, 254, 0.08);
+    }}
+    a.link:hover {{
+      border-color: var(--accent);
+      background: rgba(110, 168, 254, 0.14);
+    }}
     details {{
     details {{
       margin: 6px 0;
       margin: 6px 0;
       border: 1px solid var(--border);
       border: 1px solid var(--border);
@@ -328,6 +566,7 @@ def build_html(body: str, source_name: str) -> str:
       <button id="expand-tools">展开全部工具调用</button>
       <button id="expand-tools">展开全部工具调用</button>
       <button id="collapse-tools">折叠全部工具调用</button>
       <button id="collapse-tools">折叠全部工具调用</button>
     </div>
     </div>
+    {summary_table_html}
     <div class="content">{body}</div>
     <div class="content">{body}</div>
   </div>
   </div>
   <script>
   <script>
@@ -359,7 +598,28 @@ def generate_html(
         collapse_keywords=collapse_keywords,
         collapse_keywords=collapse_keywords,
         collapse_all=collapse_all,
         collapse_all=collapse_all,
     )
     )
-    html_content = build_html(body=body, source_name=input_path.name)
+    if PROCESS_TRACE_PATH:
+        process_trace_path = resolve_config_path(PROCESS_TRACE_PATH)
+    else:
+        process_trace_path = input_path.with_name("process_trace.json")
+
+    if OUTPUT_JSON_PATH:
+        output_json_path = resolve_config_path(OUTPUT_JSON_PATH)
+    else:
+        output_json_path = input_path.with_name("output.json")
+
+    if TRACE_ID and (not process_trace_path.exists() or not output_json_path.exists()):
+        trace_dir = resolve_config_path(f".cache/output/{TRACE_ID}")
+        if not process_trace_path.exists():
+            process_trace_path = trace_dir / "process_trace.json"
+        if not output_json_path.exists():
+            output_json_path = trace_dir / "output.json"
+
+    summary_table_html = _build_process_trace_table_html(
+        process_trace_path=process_trace_path,
+        output_json_path=output_json_path,
+    )
+    html_content = build_html(body=body, source_name=input_path.name, summary_table_html=summary_table_html)
     output_path.parent.mkdir(parents=True, exist_ok=True)
     output_path.parent.mkdir(parents=True, exist_ok=True)
     output_path.write_text(html_content, encoding="utf-8")
     output_path.write_text(html_content, encoding="utf-8")
 
 
@@ -412,11 +672,40 @@ def render_log_html_and_upload(*, trace_id: str, log_file_path: Path) -> str | N
         return None
         return None
 
 
 
 
-def main() -> None:
-    input_base = resolve_config_path(INPUT_LOG_PATH)
+def _resolve_input_log_path_from_trace_id(*, trace_id: str, output_dir: Path) -> Path:
+    tid = (trace_id or "").strip()
+    if not tid:
+        raise ValueError("trace_id is required")
+
+    run_dir = (output_dir / tid).resolve()
+    if not run_dir.exists():
+        raise FileNotFoundError(f"OUTPUT_DIR 下未找到 trace_id 目录: {run_dir}")
+
+    log_path = run_dir / "log.txt"
+    if log_path.exists():
+        return log_path
+
+    # 兼容:部分任务可能用 run_log_*.txt 命名
+    candidates = sorted(
+        run_dir.glob("run_log_*.txt"),
+        key=lambda p: p.stat().st_mtime,
+        reverse=True,
+    )
+    if not candidates:
+        candidates = sorted(
+            run_dir.glob("*.txt"),
+            key=lambda p: p.stat().st_mtime,
+            reverse=True,
+        )
+    if not candidates:
+        raise FileNotFoundError(f"trace_id 目录下未找到可渲染日志文件: {run_dir}")
+    return candidates[0]
+
+
+def _resolve_input_log_path_from_input_base(input_base: Path) -> Path:
     if input_base.is_file():
     if input_base.is_file():
-        input_path = input_base
-    elif input_base.is_dir():
+        return input_base
+    if input_base.is_dir():
         # 优先渲染最新 run_log_*.txt,其次渲染任意 *.txt
         # 优先渲染最新 run_log_*.txt,其次渲染任意 *.txt
         candidates = sorted(
         candidates = sorted(
             input_base.glob("run_log_*.txt"),
             input_base.glob("run_log_*.txt"),
@@ -431,14 +720,28 @@ def main() -> None:
             )
             )
         if not candidates:
         if not candidates:
             raise FileNotFoundError(f"目录下未找到可渲染日志文件: {input_base}")
             raise FileNotFoundError(f"目录下未找到可渲染日志文件: {input_base}")
-        input_path = candidates[0]
-    else:
-        raise FileNotFoundError(f"输入日志路径不存在: {input_base}")
+        return candidates[0]
+    raise FileNotFoundError(f"输入日志路径不存在: {input_base}")
+
 
 
-    if OUTPUT_HTML_PATH:
-        output_path = resolve_config_path(OUTPUT_HTML_PATH)
+def main(argv: list[str] | None = None) -> None:
+    parser = argparse.ArgumentParser(description="Render run log text to collapsible HTML.")
+    parser.add_argument("--trace-id", dest="trace_id", default="", help="trace_id in OUTPUT_DIR/<trace_id>/")
+    parser.add_argument("trace_id_pos", nargs="?", default="", help="trace_id (positional), same as --trace-id")
+    args = parser.parse_args(argv)
+
+    trace_id = ((args.trace_id or "").strip() or (args.trace_id_pos or "").strip())
+    if trace_id:
+        output_dir = resolve_config_path(OUTPUT_DIR)
+        input_path = _resolve_input_log_path_from_trace_id(trace_id=trace_id, output_dir=output_dir)
+        output_path = input_path.with_name("log.html")
     else:
     else:
-        output_path = input_path.with_suffix(".html")
+        input_base = resolve_config_path(INPUT_LOG_PATH)
+        input_path = _resolve_input_log_path_from_input_base(input_base)
+        if OUTPUT_HTML_PATH:
+            output_path = resolve_config_path(OUTPUT_HTML_PATH)
+        else:
+            output_path = input_path.with_suffix(".html")
 
 
     generate_html(
     generate_html(
         input_path=input_path,
         input_path=input_path,

+ 1 - 0
examples/content_finder/server.py

@@ -245,6 +245,7 @@ async def create_task(request: TaskRequest):
                     "get_content_fans_portrait",
                     "get_content_fans_portrait",
                     "get_account_fans_portrait",
                     "get_account_fans_portrait",
                     "store_results_mysql",
                     "store_results_mysql",
+                    "exec_summary",
                 ]
                 ]
 
 
                 runner = AgentRunner(
                 runner = AgentRunner(

+ 1 - 1
examples/content_finder/skills/demand_analysis.md

@@ -47,7 +47,7 @@ description: 需求分析
    对每条内容的灵感点和`features`进行相关性判别,选出最贴合特征词的3条内容作为**最佳选题筛选结果**。
    对每条内容的灵感点和`features`进行相关性判别,选出最贴合特征词的3条内容作为**最佳选题筛选结果**。
   - 步骤2.2 选题点提取
   - 步骤2.2 选题点提取
    - 先根据步骤2.1的**最佳选题筛选结果**填充 **起点策略.高赞case_灵感点**,**起点策略.高赞case_目的点**,**起点策略.高赞case_关键点** 这些字段内容,注意直接使用原词填充。
    - 先根据步骤2.1的**最佳选题筛选结果**填充 **起点策略.高赞case_灵感点**,**起点策略.高赞case_目的点**,**起点策略.高赞case_关键点** 这些字段内容,注意直接使用原词填充。
-   - 用步骤2.1的所有**最佳选题筛选结果**里面的灵感点填充**起点策略.高赞case出发搜索词**字段,由灵感点扩展出的即时搜索词(允许 3-5 个同义/上下位词)
+   - 用步骤2.1的所有**最佳选题筛选结果**里面的灵感点填充**起点策略.高赞case出发搜索词**字段,由灵感点扩展出的即时搜索词(允许 3-5 个同义/上下位词),即时搜索词不允许和原始特征`features`一样。
    - 用步骤2.1的所有**最佳选题筛选结果**里面的目的点补充**筛选方案.目的点对齐规则**字段
    - 用步骤2.1的所有**最佳选题筛选结果**里面的目的点补充**筛选方案.目的点对齐规则**字段
    - 用步骤2.1的所有**最佳选题筛选结果**里面的关键点补充**筛选方案.关键点打分说明**字段
    - 用步骤2.1的所有**最佳选题筛选结果**里面的关键点补充**筛选方案.关键点打分说明**字段
 
 

+ 2 - 0
examples/content_finder/tools/__init__.py

@@ -11,6 +11,7 @@ from .aigc_platform_api import create_crawler_plan_by_douyin_content_id, create_
 from .think_and_plan import think_and_plan
 from .think_and_plan import think_and_plan
 from .find_authors_from_db import find_authors_from_db
 from .find_authors_from_db import find_authors_from_db
 from .get_video_topic import get_video_topic
 from .get_video_topic import get_video_topic
+from .exec_summary import exec_summary
 
 
 __all__ = [
 __all__ = [
     "douyin_search",
     "douyin_search",
@@ -24,4 +25,5 @@ __all__ = [
     "think_and_plan",
     "think_and_plan",
     "find_authors_from_db",
     "find_authors_from_db",
     "get_video_topic",
     "get_video_topic",
+    "exec_summary",
 ]
 ]

+ 295 - 0
examples/content_finder/tools/exec_summary.py

@@ -0,0 +1,295 @@
+"""
+在流程结束后写入**内容策略表格** JSON。
+
+输出路径:{OUTPUT_DIR}/{trace_id}/process_trace.json
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Sequence, Tuple
+
+from agent.tools import tool, ToolResult
+from utils.tool_logging import format_tool_result_for_log, log_tool_call
+
+_LOG_LABEL = "工具调用:exec_summary -> 写入过程 trace JSON"
+
+logger = logging.getLogger(__name__)
+
+
+def _output_dir_path() -> Path:
+    # 与 store_results_mysql / output.json 目录约定一致
+    return Path(os.getenv("OUTPUT_DIR", ".cache/output"))
+
+
+def _parse_payload(summary_json: str) -> Dict[str, Any]:
+    """
+    解析并规范化 LLM 传入的表格数据。
+
+    - 如果是数组:视为“表格行列表”,包成 {"rows": [...]}
+    - 如果是对象:直接返回(用于后续扩展字段)
+    """
+    data = json.loads(summary_json)
+    if isinstance(data, list):
+        return {"rows": data}
+    if not isinstance(data, dict):
+        raise ValueError("summary_json 解析后必须是 JSON 对象或数组")
+    return data
+
+
+def _split_input_features(raw: str) -> List[str]:
+    s = (raw or "").strip()
+    if not s:
+        return []
+    parts = s.replace(",", ",").split(",")
+    out: List[str] = []
+    for p in parts:
+        t = p.strip()
+        if t:
+            out.append(t)
+    return out
+
+
+def _load_output_json(*, trace_id: str) -> Optional[Dict[str, Any]]:
+    path = _output_dir_path() / trace_id / "output.json"
+    try:
+        with path.open("r", encoding="utf-8") as f:
+            data = json.load(f)
+    except FileNotFoundError:
+        return None
+    except Exception:
+        logger.warning("读取 output.json 失败: %s", str(path), exc_info=True)
+        return None
+    return data if isinstance(data, dict) else None
+
+
+def _extract_get_video_topic_videos(*, trace_id: str) -> List[Dict[str, Any]]:
+    """
+    从 log.txt 中提取 get_video_topic 的返回 metadata.videos(原始选题点)。
+
+    期望日志片段形态(render_log_html 同源格式):
+    [FOLD:🔧 工具调用:get_video_topic ...]
+    ...
+    [FOLD:📤 返回内容]
+    <json array>
+    [/FOLD]
+    """
+    log_path = _output_dir_path() / trace_id / "log.txt"
+    try:
+        text = log_path.read_text(encoding="utf-8")
+    except FileNotFoundError:
+        return []
+    except Exception:
+        logger.warning("读取 log.txt 失败: %s", str(log_path), exc_info=True)
+        return []
+
+    marker = "[FOLD:🔧 工具调用:get_video_topic"
+    start = text.find(marker)
+    if start < 0:
+        return []
+    snippet = text[start:]
+
+    out_marker = "[FOLD:📤 返回内容]"
+    out_start = snippet.find(out_marker)
+    if out_start < 0:
+        return []
+    after = snippet[out_start + len(out_marker) :]
+
+    json_start = after.find("[")
+    if json_start < 0:
+        return []
+    json_end = after.find("[/FOLD]")
+    if json_end < 0:
+        return []
+
+    raw = after[json_start:json_end].strip()
+    try:
+        parsed = json.loads(raw)
+    except Exception:
+        logger.warning("解析 get_video_topic 返回 JSON 失败", exc_info=True)
+        return []
+
+    return parsed if isinstance(parsed, list) else []
+
+
+def _flatten_case_points_text(video: Dict[str, Any]) -> str:
+    tp = video.get("选题点")
+    if not isinstance(tp, dict):
+        return ""
+    tokens: List[str] = []
+    for k in ("灵感点", "目的点", "关键点"):
+        v = tp.get(k)
+        if isinstance(v, list):
+            for x in v:
+                if isinstance(x, str) and x.strip():
+                    tokens.append(x.strip())
+    return " ".join(tokens)
+
+
+def _score_match(*, row_text: str, candidate_text: str) -> int:
+    """
+    简单可控的匹配评分:按“子串命中次数”计分,避免引入分词依赖。
+    """
+    rt = (row_text or "").strip()
+    ct = (candidate_text or "").strip()
+    if not rt or not ct:
+        return 0
+    score = 0
+    for token in _split_input_features(rt):
+        if token and token in ct:
+            score += 2
+    # 再做一次整体包含(更强信号)
+    if rt and rt in ct:
+        score += 3
+    return score
+
+
+def _pick_best_case_video(
+    *, row: Dict[str, Any], case_videos: Sequence[Dict[str, Any]]
+) -> Optional[Dict[str, Any]]:
+    if not case_videos:
+        return None
+    row_text = " ".join(
+        [
+            str(row.get("from_case_point") or ""),
+            str(row.get("search_keyword") or ""),
+            str(row.get("title") or ""),
+        ]
+    ).strip()
+    scored: List[Tuple[int, int]] = []
+    for i, v in enumerate(case_videos):
+        scored.append((_score_match(row_text=row_text, candidate_text=_flatten_case_points_text(v)), i))
+    scored.sort(reverse=True)
+    best_score, best_idx = scored[0]
+    # 低于 1 视为“不确定”,但仍给出一个稳定的默认(第一个)
+    if best_score <= 0:
+        return case_videos[0]
+    return case_videos[best_idx]
+
+
+def _normalize_payload(*, trace_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
+    rows = payload.get("rows")
+    if not isinstance(rows, list):
+        return payload
+
+    output_json = _load_output_json(trace_id=trace_id) or {}
+    input_features = _split_input_features(str(output_json.get("query") or ""))
+    case_videos = _extract_get_video_topic_videos(trace_id=trace_id)
+
+    normalized_rows: List[Any] = []
+    for item in rows:
+        if not isinstance(item, dict):
+            normalized_rows.append(item)
+            continue
+        row = dict(item)
+
+        # 1) 每条视频都体现原始输入特征词
+        if "input_features" not in row:
+            row["input_features"] = input_features
+
+        # 2) from_case_point:尽量输出“原始选题点信息”,而不是联想词
+        if "from_case_point" in row and case_videos:
+            original = _pick_best_case_video(row=row, case_videos=case_videos)
+            if isinstance(original, dict) and isinstance(original.get("选题点"), dict):
+                # 保留模型原先写的联想/归类结果,便于排查,但不作为主字段
+                if isinstance(row.get("from_case_point"), str) and row.get("from_case_point"):
+                    row["from_case_point_guess"] = row["from_case_point"]
+                row["from_case_point"] = original.get("选题点")
+                if "from_case_aweme_id" not in row:
+                    row["from_case_aweme_id"] = str(original.get("id") or "").strip() or None
+
+        normalized_rows.append(row)
+
+    out = dict(payload)
+    out["rows"] = normalized_rows
+    return out
+
+
+def _write_process_trace(*, trace_id: str, payload: Dict[str, Any]) -> Path:
+    out_dir = _output_dir_path() / trace_id
+    out_dir.mkdir(parents=True, exist_ok=True)
+    path = out_dir / "process_trace.json"
+    doc = {
+        **payload,
+        "schema_version": "1.0",
+        "trace_id": trace_id,
+        "generated_at": datetime.now(timezone.utc).isoformat(),
+    }
+    with path.open("w", encoding="utf-8") as f:
+        json.dump(doc, f, ensure_ascii=False, indent=2)
+    return path
+
+
+@tool(
+    description=(
+        "在**全部流程执行完毕之后**调用:把每条入选/候选内容的「选择策略」整理成表格 JSON,"
+        "写入当前任务的 output 目录下的 process_trace.json,便于后续复盘。"
+        "参数 summary_json 为 JSON 字符串,可以是:"
+        "1)数组:每一项是一行记录;会被包成 {\"rows\": [...]};"
+        "2)对象:应包含 rows 字段,rows 为行列表。"
+        "建议每行至少包含:strategy_type(\"case_based\" | \"feature_based\")、"
+        "from_case_aweme_id / from_feature(来源 case 的选题点或特征)、"
+        "search_keyword(使用的搜索词)、"
+        "channel(\"search\" | \"author\" | \"ranking\" | \"other\" 等)、"
+        "decision_basis(如 \"demand_filtering\" | \"content_portrait\" | \"author_portrait\" | \"other\")、"
+        "decision_notes(自由文本补充原因)。"
+    ),
+)
+async def exec_summary(trace_id: str, summary_json: str) -> ToolResult:
+    """
+    Args:
+        trace_id: 本次任务 trace_id(与 output.json 同目录)。
+        summary_json: JSON 字符串。对象或数组均可;数组会包成 {\"rows\": [...] }。
+    """
+    call_params = {"trace_id": trace_id, "summary_json": "<json>"}
+    tid = (trace_id or "").strip()
+    if not tid:
+        err = ToolResult(
+            title="过程摘要",
+            output="trace_id 不能为空",
+            metadata={"ok": False, "error": "empty trace_id"},
+        )
+        log_tool_call(_LOG_LABEL, call_params, format_tool_result_for_log(err))
+        return err
+
+    try:
+        payload = _parse_payload(summary_json)
+    except json.JSONDecodeError as e:
+        err = ToolResult(
+            title="过程摘要",
+            output=f"summary_json 不是合法 JSON: {e}",
+            metadata={"ok": False, "error": str(e)},
+        )
+        log_tool_call(_LOG_LABEL, call_params, format_tool_result_for_log(err))
+        return err
+    except ValueError as e:
+        err = ToolResult(
+            title="过程摘要",
+            output=str(e),
+            metadata={"ok": False, "error": str(e)},
+        )
+        log_tool_call(_LOG_LABEL, call_params, format_tool_result_for_log(err))
+        return err
+
+    payload = _normalize_payload(trace_id=tid, payload=payload)
+
+    try:
+        path = _write_process_trace(trace_id=tid, payload=payload)
+    except OSError as e:
+        msg = f"写入 process_trace.json 失败: {e}"
+        logger.error(msg, exc_info=True)
+        err = ToolResult(title="过程摘要", output=msg, metadata={"ok": False, "error": str(e)})
+        log_tool_call(_LOG_LABEL, call_params, format_tool_result_for_log(err))
+        return err
+
+    out = ToolResult(
+        title="过程摘要",
+        output=f"已写入 {path}",
+        metadata={"ok": True, "trace_id": tid, "path": str(path)},
+    )
+    log_tool_call(_LOG_LABEL, {"trace_id": tid}, format_tool_result_for_log(out))
+    return out