Forráskód Böngészése

feat: 流程优化

jihuaqiang 12 órája
szülő
commit
a1ff36d14d

+ 4 - 4
agent/core/prompts/compression.py

@@ -9,10 +9,10 @@
 COMPRESSION_PROMPT_TEMPLATE = """请对以上对话历史进行压缩总结。
 
 ### 摘要要求
-1. 保留关键决策、结论和产出(如创建的文件、修改的代码、得出的分析结论)
-2. 保留重要的上下文(如用户的要求、约束条件、之前的讨论结果)
-3. 省略中间探索过程、重复的工具调用细节
-4. 使用结构化格式(标题 + 要点 + 相关资源引用,若有)
+1. 寻找阶段保留内容寻找关键的过程,基于什么策略,case出发/特征出发,从哪个灵感点/特征词出发,搜索了哪个词,找到了哪些视频
+2. 筛选阶段保留关键的过程,执行了什么步骤,依据什么规则,筛选了哪些视频
+3. 对于已经淘汰的视频,无需进行记录。
+4. 省略中间探索过程、重复的工具调用细节
 5. 控制在 2000 字以内
 
 当前 GoalTree 状态:

+ 38 - 35
examples/content_finder/content_finder.md

@@ -4,7 +4,15 @@ temperature: 0.3
 ---
 
 $system$
-你是一个专业的内容寻找助手,帮助运营人员在抖音平台上寻找符合特征的视频内容。
+
+## 身份
+你是一个专业的内容寻找专家,擅长寻找符合[输入特征]的视频内容。
+
+## 寻找到的内容用于投放在下面的场景
+- 投放载体:微信小程序
+- 核心用户群:95% 是 50 岁以上中老年人
+- 增长方式:微信分享裂变
+- 核心指标:分享率、DAU
 
 ## 思考输出要求(非常重要)
 你在执行过程中,**必须在文本中主动输出你的思考和推理**,而不是只调用工具。具体要求:
@@ -16,47 +24,46 @@ $system$
 
 ## 可用工具(按目的)
 - 获取高赞视频的选题点: `get_video_topic`
-- 抖音视频搜索:`douyin_search`
+- 抖音视频搜索:`douyin_search`,失败后证明不可用,立即使用`douyin_search_tikhub`重试
 - 抖音视频搜索(Tikhub):`douyin_search_tikhub`
 - 订阅账号作品搜索:`douyin_user_videos`
-- 数据库作者检索(按搜索词找历史优质作者):`find_authors_from_db`
-- 批量画像(**筛选阶段优先**:一次调用拉齐多条内容画像并按规则账号兜底):`batch_fetch_portraits`(参数 `candidates_json` 为 JSON 数组字符串)。工具返回的同一条 `tool` 消息正文末尾会附带 `## metadata (JSON)`(含 `results`);同时会写入 `{OUTPUT_DIR}/{trace_id}/batch_portraits.json`,上下文丢失时用 `read_file` 读取该文件即可恢复结构化结果。
-- 作品画像(单条):`get_content_fans_portrait`
-- 作者画像(单条兜底):`get_account_fans_portrait`
+- 数据库作者检索(`query` 与 `demand_find_author.content_tags` 文字匹配,默认 top3):`find_authors_from_db`
+- 批量画像(**唯一的画像获取方式**):`batch_fetch_portraits`(参数 `candidates_json` 为 JSON 数组字符串)。工具内部会优先尝试每条内容的点赞画像,如无内容画像且来源为搜索类视频,则在工具内部自动补拉账号画像,最后统一在 `metadata.results` 中输出。
 - 过程记录:`think_and_plan`
+- 候选池增删改:`video_poll_action`
 - 存储结果至数据库:`store_results_mysql`
 - 创建aigc计划:`create_crawler_plan_by_douyin_content_id`、`create_crawler_plan_by_douyin_account_id`
-- 过程策略表格(记录每条内容的选择依据):`exec_summary`(**最后一步**,见执行流程第 9 步)
 
 ## 重要约束
 - **严格禁止**调用任何名称以 `browser_` 开头的浏览器工具
 - 每个结论都必须有工具调用证据。
 - Agent执行过程中会在 OUTPUT_DIR 下存储执行的log,当遇到上下文丢失的情况时可从该文件读取。
 
-## 运营人员平台背景
-- 平台载体:微信小程序
-- 核心用户群:95% 是 50 岁以上中老年人
-- 增长方式:微信分享裂变
-- 核心指标:分享率、DAU
-
 ## 执行流程(按顺序,禁止跳步)
-1. **需求理解阶段**: 按 `demand_analysis` 执行
-2. **内容寻找**:按 `content_finding_strategy` 执行
-3. **筛选阶段**:按 `content_filtering_strategy` 执行
-4. **优质账号扩展**: 对于**筛选阶段**获取到用户画像的优质作者,按`high_quality_analysis`执行
-5. **输出阶段**:先按 `output_schema` 写入 `output.json`
+1. **需求理解阶段**: 按 [demand_analysis] 执行
+2. **内容寻找**:按 [content_finding_strategy] 执行
+3. **筛选阶段**:按 [content_filtering_strategy] 执行
+4. **优质账号扩展**: 对于**筛选阶段**获取到用户画像的优质作者,按[high_quality_account]执行
+5. **输出阶段**:先按 [output_schema] 写入 `output.json`
 6. **Schema 校验阶段**:逐字段自检;不符合就重写 `output.json`
-7. **入库阶段**:仅在 Schema 校验通过后,调用 `store_results_mysql(trace_id)` 存储到远程数据库
-8. **接入平台阶段**:按 `aigc_platform_plan` 生成 AIGC 爬取计划
-9. **过程摘要阶段(内容策略表格)**:在以上全部完成后,按 `exec_summary_rows` 的要求生成 `summary_json`,并调用 `exec_summary(trace_id, summary_json, log_path)` tool(`log_path` 传入 `{output_dir}/{trace_id}/log.txt`),将**每条入选的选择策略**整理成表格形式的 JSON,写入 `{output_dir}/{trace_id}/process_trace.json`。
+7. **入库阶段**:仅在 Schema 校验通过后,调用 [store_results_mysql(trace_id)] 存储到远程数据库
+8. **接入平台阶段**:按 [aigc_platform_plan] 生成 AIGC 爬取计划
 
 ## 强制要求(违反即为错误)
 
 ### 寻找内容阶段
 1. 最多搜索次数:len(高赞case出发搜索词) * 2 + len(特征出发搜索词) * 2 , 即每个搜索词最多搜2页。
 2. 搜索阶段只能使用"高赞case出发搜索词" 和 "特征出发搜索词",**禁止扩展搜索词**
-3. **非常重要**: 达到**最多搜索次数**后即使不满足要求的输出数量也直接输出,不再继续扩展搜索。
-4. 对每个搜索词,先确定寻找策略优先级,再按优先级执行所有的策略,不能跳过订阅账号作品搜索的策略。
+3. 候选池的数量达到20条时,先进入筛选阶段。
+4. **非常重要**: 达到**最多搜索次数**后即使不满足要求的输出数量也直接输出,不再继续扩展搜索。
+5. 对每个搜索词,先确定寻找策略优先级,再按优先级执行所有的策略,不能跳过订阅账号作品搜索的策略。
+6. 对每条寻找到的内容,都冗余补充以下字段作为该条内容的过程记录
+  - `strategy_type`:寻找策略。值为[case出发] or [特征出发]。
+  - `from_case_aweme_id`:case出发策略有,值为case出发策略关联的内容id
+  - `from_case_point`: case出发策略有,值为关联的灵感点。
+  - `search_keyword`: 搜索词,该内容从哪个搜索词来。
+  - `channel`:暂时都写为抖音
+  - `find_way`: 寻找方式 "搜索" / "索引榜单搜索" / "垂类推荐流" / "订阅账号作品搜索"
 
 ### 需求理解阶段
 1. 必须按照 `demand_analysis` 的**两阶段执行步骤**:先做“实质特征/形式特征”划分,再仅对“实质特征”细分“上层特征/下层特征”,然后再根据该结果选择策略;此步骤严禁大模型联想输出。
@@ -90,15 +97,14 @@ $system$
 }
 ```
 
-### 筛选阶段必须按照 `content_filtering_strategy` 的步骤进行,对于**case出发**的搜索结果,满足6分即可输出不需要查看画像;其他结果按顺序查看画像
-1. **优先**对本轮待画像的候选列表调用一次 `batch_fetch_portraits`:在 `candidates_json` 中传入数组,每项含 `aweme_id`、可选 `author_sec_uid`;来自 `douyin_user_videos` 的条目设 `try_account_fallback: false`,来自 `douyin_search` / `douyin_search_tikhub` 的条目设 `true`(默认)。根据返回的 `metadata.results` 逐条读取 `content.has_portrait` / `account.has_portrait` 与 `portrait_data`(逻辑同原单条工具)。
-2. 仅当批量不适用(例如单条补拉)时,再使用 `get_content_fans_portrait`,检查 `metadata.has_portrait`;若 `has_portrait=False` 且来源为搜索类视频,再调用 `get_account_fans_portrait` 兜底;`douyin_user_videos` 来源不调用账号兜底。
-补充:`douyin_search` 失败后再调用 `douyin_search_tikhub` 作为兜底。
-3. **不允许跳过画像获取直接输出**
+### 筛选阶段必须按照 `content_filtering_strategy` 的步骤进行,
+**需要记录每条内容的`decision_basis`(筛选依据), 值为"基于case出发策略筛选"/"内容点赞用户画像"/"账号粉丝画像"/"其他".
+1. 对于**case出发**的搜索结果,满足6分即可输出不需要查看画像;其他结果按以下顺序查看画像
+2. 对本轮待筛选的候选列表**统一调用一次 `batch_fetch_portraits` 获取画像**:在 `candidates_json` 中传入数组,每项含 `aweme_id`、可选 `author_sec_uid`;来自 `douyin_user_videos` 的条目设 `try_account_fallback: false`,来自 `douyin_search` / `douyin_search_tikhub` 的条目设 `true`(默认)。工具内部会先尝试内容侧画像,如无内容画像且允许兜底,则自动补拉账号画像,并在 `metadata.results` 中统一输出 `content` / `account` 的 `has_portrait` 与 `portrait_data`。
 
-### 输出字段必须严格遵循 Schema
+### 输出字段必须严格遵循[output_schema]
 - 顶层字段只能有:`trace_id`、`query`、`demand_id`、`summary`、`good_account_expansion`、`contents`
-- 每条内容字段只能有:`title`、`aweme_id`、`rank`、`video_url`、`author_nickname`、`author_sec_uid`、`author_url`、`statistics`、`portrait_data`、`reason`
+- 每条内容字段只能有:`title`、`aweme_id`、`rank`、`video_url`、`author_nickname`、`author_sec_uid`、`author_url`、`statistics`、`portrait_data`、`reason`、`strategy_type`、`from_case_aweme_id`、`from_case_point`、`search_keyword`、`channel`、`find_way`、`decision_basis`
 - **禁止自创字段**(如 `results`、`metrics`、`tags`、`platform` 等)
 - **禁止使用中文 key**
 
@@ -120,10 +126,10 @@ $system$
 - **禁止**:未校验 Schema 就直接入库。
 
 ### 4.Schema 合规闸门(入库前必须通过)
-- 在调用 `store_results_mysql` 前,必须逐项核对 `output.json` 是否满足 `output_schema`;**不通过就先重写 JSON,不得入库**。
+- 在调用 `store_results_mysql` 前,必须逐项核对 `output.json` 是否满足 [output_schema];**不通过就先重写 JSON,不得入库**。
 - 顶层字段必须且仅能是:`trace_id`、`query`、`demand_id`、`summary`、`good_account_expansion`、`contents`。
-- `summary` 必须是对象,且包含:`candidate_count`、`portrait_content_like_count`、`portrait_account_fans_count`、`portrait_none_count`、`filtered_in_count`(禁止用字符串 summary)。
 - `good_account_expansion` 必须是对象:`{"enabled": <bool>, "accounts": [...]}`;`accounts` 每项字段必须是:`author_nickname`、`author_sec_uid`、`age_50_plus_ratio`、`age_50_plus_tgi`、`content_tags`(禁止 `account_name`、`sec_uid` 等别名)。
+- 每条内容字段只能且必须有:`title`、`aweme_id`、`rank`、`video_url`、`author_nickname`、`author_sec_uid`、`author_url`、`statistics`、`portrait_data`、`reason`、`strategy_type`、`from_case_aweme_id`、`from_case_point`、`search_keyword`、`channel`、`find_way`、`decision_basis`
 - 每条 `contents` 的 `statistics` 字段必须是:`digg_count`、`comment_count`、`share_count`(禁止 `likes` / `comments` / `shares`)。
 - 每条 `contents` 的 `portrait_data.source` 只允许:`content_like`、`account_fans`、`none`(禁止 `content`、`account` 等缩写)。
 - 每条 `contents` 的 `portrait_data` 必须包含:`source`、`age_50_plus_ratio`、`age_50_plus_tgi`、`url`。
@@ -133,9 +139,6 @@ $system$
 - `contents` 中入选视频是否在**入库成功后**已按 `aigc_platform_plan` 调用 `create_crawler_plan_by_douyin_content_id`?
 - **禁止**:写完库就认为任务结束、不创建爬取计划。若某条创建失败,须在回复中说明原因;仅当入选视频已创建或已说明失败原因时,方可视为本阶段完成。
 
-### 6.过程摘要是否已写入
-- 是否在 **AIGC 计划阶段完成后** 调用了 `exec_summary`生成了每条视频的过程记录,尤其是case出发的策略,是否对每个内容关联了灵感点.
-
 
 $user$
 任务:找最多10个以「%query%」为特征的视频。

+ 1 - 5
examples/content_finder/core.py

@@ -86,13 +86,12 @@ from tools import (
     think_and_plan,
     find_authors_from_db,
     get_video_topic,
-    exec_summary,
 )
 
 logger = logging.getLogger(__name__)
 
 # 默认搜索词
-DEFAULT_QUERY = "婆媳矛盾,反转式"
+DEFAULT_QUERY = "朱镕基,一百口棺材"
 DEFAULT_DEMAND_ID = 1
 
 
@@ -173,8 +172,6 @@ async def run_agent(
         "douyin_search",
         "douyin_search_tikhub",
         "douyin_user_videos",
-        "get_content_fans_portrait",
-        "get_account_fans_portrait",
         "batch_fetch_portraits",
         "find_authors_from_db",
         "store_results_mysql",
@@ -182,7 +179,6 @@ async def run_agent(
         "create_crawler_plan_by_douyin_account_id",
         "think_and_plan",
         "get_video_topic",
-        "exec_summary",
     ]
 
     runner = AgentRunner(

+ 35 - 3
examples/content_finder/db/store_results.py

@@ -2,7 +2,7 @@
 推荐结果写入(demand_find_author、demand_find_content_result 表)
 """
 import json
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple
 
 from .connection import get_connection
 
@@ -20,6 +20,36 @@ def _normalize_content_tags(value: Any) -> str:
     return str(value)
 
 
+# demand_find_content_result.process_trace:由寻找过程字段拼成可读文本
+_PROCESS_TRACE_FIELDS: Tuple[Tuple[str, str], ...] = (
+    ("strategy_type", "寻找策略"),
+    ("from_case_aweme_id", "case内容id"),
+    ("from_case_point", "灵感点"),
+    ("search_keyword", "搜索词"),
+    ("channel", "渠道"),
+    ("find_way", "寻找方式"),
+)
+
+
+def _format_process_trace_text(item: Dict[str, Any]) -> str:
+    """将 contents 条目中与寻找过程相关的字段拼成多行文本写入 process_trace。"""
+    lines: List[str] = []
+    for key, label in _PROCESS_TRACE_FIELDS:
+        val = item.get(key)
+        if val is None:
+            continue
+        s = str(val).strip()
+        if not s:
+            continue
+        lines.append(f"{label}: {s}")
+    if lines:
+        return "\n".join(lines)
+    legacy = item.get("process_trace")
+    if legacy is None:
+        return ""
+    return str(legacy).strip()
+
+
 def upsert_good_authors(
     conn,
     trace_id: str,
@@ -121,12 +151,12 @@ def insert_contents(
       trace_id, query, rank_no, aweme_id, video_url, title, author_name, author_id, author_link,
       digg_count, comment_count, share_count,
       portrait_source, elderly_ratio, elderly_tgi, recommendation_reason,
-      demand_content_id, dt
+      demand_content_id, dt, channel, process_trace
     ) VALUES (
       %s, %s, %s, %s, %s, %s, %s, %s, %s,
       %s, %s, %s,
       %s, %s, %s, %s,
-      %s, %s
+      %s, %s, %s, %s
     )
     """
     with conn.cursor() as cur:
@@ -161,6 +191,8 @@ def insert_contents(
                     item.get("reason") or "",
                     demand_content_id,
                     dt,
+                    item.get("channel") or "",
+                    _format_process_trace_text(item),
                 ),
             )
             rows += cur.rowcount

+ 18 - 59
examples/content_finder/render_log_html.py

@@ -82,10 +82,9 @@ INPUT_LOG_PATH = os.getenv("INPUT_LOG_PATH", ".cache/input_log")
 OUTPUT_HTML_PATH: str | None = os.getenv("OUTPUT_HTML_PATH") or None
 # 产物输出目录(content_finder 的标准 output 目录)
 OUTPUT_DIR = os.getenv("OUTPUT_DIR", ".cache/output")
-# 置顶摘要表格数据源(可选)。不填则默认取 input_log 同目录下的 process_trace.json / output.json
-PROCESS_TRACE_PATH: str | None = os.getenv("PROCESS_TRACE_PATH") or None
+# 置顶摘要表格数据源:仅读 output.json(contents[])。不填则默认取 input_log 同目录下的 output.json
 OUTPUT_JSON_PATH: str | None = os.getenv("OUTPUT_JSON_PATH") or None
-# 如果未显式指定 PROCESS_TRACE_PATH/OUTPUT_JSON_PATH,且同目录不存在文件,则尝试从该 trace_id 推导 .cache/output/{trace_id}/...
+# 若未显式指定 OUTPUT_JSON_PATH 且同目录无文件,可设 TRACE_ID 推导 .cache/output/{trace_id}/output.json
 TRACE_ID: str | None = os.getenv("TRACE_ID") or None
 # 是否默认折叠所有 [FOLD] 块
 COLLAPSE_ALL_FOLDS = False
@@ -242,55 +241,26 @@ def _read_json_file(path: Path) -> dict:
     return json.loads(path.read_text(encoding="utf-8"))
 
 
-def _build_aweme_id_to_video_url(output_json_path: Path) -> dict[str, str]:
-    """
-    从 output.json 的 contents[] 构建 {aweme_id: video_url} 映射。
-
-    约定:
-    - output.json 中每条 content 都包含 aweme_id 与 video_url(字符串)
-    """
-    data = _read_json_file(output_json_path)
-    contents = data.get("contents") or []
-    if not isinstance(contents, list):
-        return {}
-
-    mapping: dict[str, str] = {}
-    for item in contents:
-        if not isinstance(item, dict):
-            continue
-        aweme_id = _safe_str(item.get("aweme_id")).strip()
-        video_url = _safe_str(item.get("video_url")).strip()
-        if aweme_id and video_url:
-            mapping[aweme_id] = video_url
-    return mapping
-
-
-def _build_process_trace_table_html(*, process_trace_path: Path, output_json_path: Path) -> str:
+def _build_output_summary_table_html(*, output_json_path: Path) -> str:
     """
     生成置顶摘要表格。
 
-    数据来源:
-    - process_trace.json: rows[]
-    - output.json: contents[],按 aweme_id 补齐 video_url
+    数据仅来自 output.json 的 contents[];decision_notes 对应每条 reason。
     """
-    if not process_trace_path.exists() or not output_json_path.exists():
+    if not output_json_path.exists():
         return ""
 
     try:
-        trace_data = _read_json_file(process_trace_path)
+        data = _read_json_file(output_json_path)
     except Exception as e:
-        logger.warning("read process_trace.json failed: path=%s err=%s", process_trace_path, e)
+        logger.warning("read output.json failed: path=%s err=%s", output_json_path, e)
         return ""
 
-    rows = trace_data.get("rows") or []
+    rows = data.get("contents") or []
     if not isinstance(rows, list) or not rows:
         return ""
 
-    aweme_to_url: dict[str, str] = {}
-    try:
-        aweme_to_url = _build_aweme_id_to_video_url(output_json_path)
-    except Exception as e:
-        logger.warning("read output.json failed: path=%s err=%s", output_json_path, e)
+    default_query = _safe_str(data.get("query"))
 
     headers: list[tuple[str, str]] = [
         ("input_features", "特征词"),
@@ -316,7 +286,7 @@ def _build_process_trace_table_html(*, process_trace_path: Path, output_json_pat
         if not isinstance(r, dict):
             continue
         aweme_id = _safe_str(r.get("aweme_id")).strip()
-        video_url = aweme_to_url.get(aweme_id, "")
+        video_url = _safe_str(r.get("video_url")).strip()
 
         values: dict[str, str] = {
             "strategy_type": _safe_str(r.get("strategy_type")),
@@ -327,8 +297,8 @@ def _build_process_trace_table_html(*, process_trace_path: Path, output_json_pat
             "author_nickname": _safe_str(r.get("author_nickname")),
             "channel": _safe_str(r.get("channel")),
             "decision_basis": _safe_str(r.get("decision_basis")),
-            "decision_notes": _safe_str(r.get("decision_notes")),
-            "input_features": _safe_str(r.get("input_features")),
+            "decision_notes": _safe_str(r.get("reason")),
+            "input_features": _safe_str(r.get("input_features")) or default_query,
             "video_url": video_url,
         }
 
@@ -366,8 +336,8 @@ def _build_process_trace_table_html(*, process_trace_path: Path, output_json_pat
     thead = "".join(f"<th>{html.escape(label)}</th>" for _key, label in headers)
     return (
         '<div class="summary-panel">'
-        '<div class="summary-title">过程追踪摘要</div>'
-        f'<div class="summary-meta">{html.escape(process_trace_path.name)}</div>'
+        '<div class="summary-title">内容寻找过程摘要</div>'
+        f'<div class="summary-meta">{html.escape(output_json_path.name)}</div>'
         '<div class="table-wrap">'
         '<table class="summary-table">'
         f"<thead><tr>{thead}</tr></thead>"
@@ -599,27 +569,16 @@ def generate_html(
         collapse_keywords=collapse_keywords,
         collapse_all=collapse_all,
     )
-    if PROCESS_TRACE_PATH:
-        process_trace_path = resolve_config_path(PROCESS_TRACE_PATH)
-    else:
-        process_trace_path = input_path.with_name("process_trace.json")
-
     if OUTPUT_JSON_PATH:
         output_json_path = resolve_config_path(OUTPUT_JSON_PATH)
     else:
         output_json_path = input_path.with_name("output.json")
 
-    if TRACE_ID and (not process_trace_path.exists() or not output_json_path.exists()):
+    if TRACE_ID and not output_json_path.exists():
         trace_dir = resolve_config_path(f".cache/output/{TRACE_ID}")
-        if not process_trace_path.exists():
-            process_trace_path = trace_dir / "process_trace.json"
-        if not output_json_path.exists():
-            output_json_path = trace_dir / "output.json"
-
-    summary_table_html = _build_process_trace_table_html(
-        process_trace_path=process_trace_path,
-        output_json_path=output_json_path,
-    )
+        output_json_path = trace_dir / "output.json"
+
+    summary_table_html = _build_output_summary_table_html(output_json_path=output_json_path)
     html_content = build_html(body=body, source_name=input_path.name, summary_table_html=summary_table_html)
     output_path.parent.mkdir(parents=True, exist_ok=True)
     output_path.write_text(html_content, encoding="utf-8")

+ 1 - 1
examples/content_finder/skills/content_finding_strategy.md

@@ -40,7 +40,7 @@ description: 内容寻找方法论
 **兜底策略**:`douyin_search` 失败或无结果时,使用 `douyin_search_tikhub`。
 
 ### 订阅账号作品搜索
-- 先调用 `find_authors_from_db(query)`:从数据库历史沉淀中按搜索词找到相关优质作者(返回 `author_sec_uid`)
+- 先调用 `find_authors_from_db(query)`:用 `query` 与历史表 `content_tags` 文字匹配,取匹配度最高的作者(默认最多 3 个,返回 `author_sec_uid`)
 - 再对 Top 作者调用 `douyin_user_videos(account_id=author_sec_uid)` 拉作品,作为候选池补充
 **仍需遵守数量控制**:作者扩展拿到的作品也计入候选数量,总量不要超过 **N = M × 2**。
 

+ 0 - 41
examples/content_finder/skills/exec_summary_rows.md

@@ -1,41 +0,0 @@
----
-name: exec_summary_rows
-description: 仅在需要写入 process_trace.json 时,用于记录最终输出的每条视频的寻找过程
----
-
-## 目标
-生成用于记录最终输出的每条视频的寻找过程的**summary_json**
-
-## 强约束(必须遵守)
-1. **视频选择**:只能对 `output.json.contents` 中出现的 `aweme_id` 生成 rows;不得输出任何不在 contents 的视频(包括淘汰候选/搜索过程中的视频)。
-2. **rows 数量必须等于 contents 数量**:一条入选内容必须对应且仅对应一行 row。
-3. **字段固定且统一**:每行 row 只允许包含下列 key(不得增删改名):
-   - `aweme_id`:视频id
-   - `title`:视频标题
-   - `author_nickname`:作者名称。
-   - `strategy_type`:寻找策略。"case出发" / "特征出发"。
-   - `from_case_aweme_id`:case出发策略关联的内容id
-   - `from_case_point`: case出发策略 关联的灵感点。
-   - `from_feature`: 特征出发 关联的特征词。
-   - `search_keyword`: 搜索词,该内容从哪个搜索词来。
-   - `channel`:寻找方式 "抖音搜索" / "索引榜单搜索" / "垂类推荐流" / "订阅账号作品搜索"
-   - `decision_basis`:筛选的方式 "基于case出发策略筛选" / "内容点赞用户画像" / "账号粉丝画像" / "其他"
-   - `decision_notes`:筛选的理由
-   - `input_features`: Agent起始输入的特征词
-4. **值使用中文枚举**
-5. **input_features**:必须是 `list[str]`;默认从 `output.json.query` 按逗号拆分得到(兼容中文逗号)。
-
-## 依据
-- `output.json`(必须读取并以 `contents` 为准)
-  - `query`:用于 `input_features` 拆分
-  - `contents[]`:每条入选内容,含 `aweme_id/title/author_nickname/reason/portrait_data.source` 等
-- `log.txt`:用于判断内容的 strategy_type(来自哪种策略)、from_case_point(来自哪个灵感点)、search_keyword(搜索词)与渠道等
-
-
-## summary_json输出格式(必须严格)
-只输出一个 JSON 对象(不要 Markdown、不要解释、不要多余文本):
-
-```json
-{"rows":[{...},{...}]}
-```
-

+ 10 - 5
examples/content_finder/skills/output_schema.md

@@ -1,10 +1,8 @@
 ---
 name: output_schema
-description: 输出结果指南
+description: output.json文件schema定义
 ---
 
-## 输出结果指南
-
 ### 输出目录
 输出 JSON 写入到output_dir目录下当次执行的 trace_id 目录内的 `output.json` 文件。
 **获取路径方式**:先调用 `get_current_context` 获取 `trace_id` 和 `output_dir`,再使用 `write_file` 写入 `{output_dir}/{trace_id}/output.json`。
@@ -41,6 +39,12 @@ description: 输出结果指南
     {
       "title": "<来自 metadata 的标题/desc>",
       "aweme_id": "内容id",
+      "strategy_type": "寻找策略。值为[case出发]or[特征出发]",
+      "from_case_aweme_id": "case出发策略有,值为case出发策略关联的内容id",
+      "from_case_point": "case出发策略有,值为关联的灵感点",
+      "search_keyword": "搜索词,该内容从哪个搜索词来",
+      "channel": "抖音",
+      "find_way": "寻找方式, 值可为[搜索]/[索引榜单搜索]/[垂类推荐流]/[订阅账号作品搜索]",
       "rank": 1,
       "video_url": "https://www.douyin.com/video/<aweme_id>",
       "author_nickname": "作者名",
@@ -53,10 +57,11 @@ description: 输出结果指南
       },
       "portrait_data": {
         "source": "content_like | account_fans | none",
-        "age_50_plus_ratio": null,
-        "age_50_plus_tgi": null,
+        "age_50_plus_ratio": "百分比格式",
+        "age_50_plus_tgi": "tgi值",
         "url": "画像链接"
       },
+      "decision_basis": "筛选的方式,取值为[基于case出发策略筛选]/[内容点赞用户画像]/[账号粉丝画像]/[其他]",
       "reason": "<入选理由>"
     }
   ]

+ 0 - 2
examples/content_finder/tools/__init__.py

@@ -15,7 +15,6 @@ from .aigc_platform_api import create_crawler_plan_by_douyin_content_id, create_
 from .think_and_plan import think_and_plan
 from .find_authors_from_db import find_authors_from_db
 from .get_video_topic import get_video_topic
-from .exec_summary import exec_summary
 
 __all__ = [
     "douyin_search",
@@ -30,5 +29,4 @@ __all__ = [
     "think_and_plan",
     "find_authors_from_db",
     "get_video_topic",
-    "exec_summary",
 ]

+ 13 - 1
examples/content_finder/tools/aigc_platform_api.py

@@ -27,7 +27,19 @@ def _log_aigc_return(label: str, params: Dict[str, Any], r: ToolResult) -> ToolR
     log_tool_call(label, params, format_tool_result_for_log(r))
     return r
 
-CAN_NOT_CREATE_PLAN = False
+
+def _env_bool(name: str, default: bool = False) -> bool:
+    """Read boolean from env; unset or empty uses default. truthy: 1/true/yes/on (case-insensitive)."""
+    raw = os.getenv(name)
+    if raw is None:
+        return default
+    s = raw.strip().lower()
+    if s == "":
+        return default
+    return s in ("1", "true", "yes", "on")
+
+
+CAN_NOT_CREATE_PLAN = _env_bool("CAN_NOT_CREATE_PLAN", False)
 
 AIGC_BASE_URL = "https://aigc-api.aiddit.com"
 CRAWLER_PLAN_CREATE_URL = f"{AIGC_BASE_URL}/aigc/crawler/plan/save"

+ 0 - 422
examples/content_finder/tools/exec_summary.py

@@ -1,422 +0,0 @@
-"""
-在流程结束后写入**内容策略表格** JSON,并回写 MySQL。
-
-输出路径:{OUTPUT_DIR}/{trace_id}/process_trace.json
-每条策略行另按 (trace_id, aweme_id) 更新 demand_find_content_result.process_trace(TEXT)。
-"""
-
-from __future__ import annotations
-
-import json
-import logging
-import os
-from pathlib import Path
-from typing import Any, Dict, List, Optional, Tuple
-
-from agent.tools import tool, ToolResult
-from utils.tool_logging import format_tool_result_for_log, log_tool_call
-
-from db import update_process_trace_by_aweme_id
-
-_LOG_LABEL = "工具调用:exec_summary -> 写入过程 trace JSON"
-
-logger = logging.getLogger(__name__)
-
-
-def _output_dir_path() -> Path:
-    # 与 store_results_mysql / output.json 目录约定一致
-    return Path(os.getenv("OUTPUT_DIR", ".cache/output"))
-
-
-def _parse_payload(summary_json: str) -> Dict[str, Any]:
-    """
-    解析并规范化 LLM 传入的表格数据。
-
-    - 如果是数组:视为“表格行列表”,包成 {"rows": [...]}
-    - 如果是对象:直接返回(用于后续扩展字段)
-    """
-    data = json.loads(summary_json)
-    if isinstance(data, list):
-        return {"rows": data}
-    if not isinstance(data, dict):
-        raise ValueError("summary_json 解析后必须是 JSON 对象或数组")
-    return data
-
-
-def _split_input_features(raw: str) -> List[str]:
-    s = (raw or "").strip()
-    if not s:
-        return []
-    parts = s.replace(",", ",").split(",")
-    out: List[str] = []
-    for p in parts:
-        t = p.strip()
-        if t:
-            out.append(t)
-    return out
-
-
-def _load_output_json(*, trace_id: str) -> Optional[Dict[str, Any]]:
-    path = _output_dir_path() / trace_id / "output.json"
-    try:
-        with path.open("r", encoding="utf-8") as f:
-            data = json.load(f)
-    except FileNotFoundError:
-        return None
-    except Exception:
-        logger.warning("读取 output.json 失败: %s", str(path), exc_info=True)
-        return None
-    return data if isinstance(data, dict) else None
-
-
-def _extract_contents(*, trace_id: str) -> List[Dict[str, Any]]:
-    """
-    从 output.json 读取最终入选 contents。
-
-    约定:
-    - 只允许对 output.json.contents 内的 aweme_id 生成/写入 process_trace rows
-    """
-    output_json = _load_output_json(trace_id=trace_id) or {}
-    contents = output_json.get("contents")
-    if not isinstance(contents, list):
-        return []
-    out: List[Dict[str, Any]] = []
-    for item in contents:
-        if isinstance(item, dict):
-            out.append(item)
-    return out
-
-
-def _map_strategy_type(value: Any) -> str:
-    v = str(value or "").strip()
-    if v in ("case_based", "case", "case出发"):
-        return "case出发"
-    if v in ("feature_based", "feature", "特征出发"):
-        return "特征出发"
-    return v
-
-
-def _map_channel(value: Any) -> str:
-    v = str(value or "").strip()
-    mapping = {
-        "search": "抖音搜索",
-        "author": "订阅账号",
-        "ranking": "榜单",
-        "other": "其他",
-        "抖音搜索": "抖音搜索",
-        "订阅账号": "订阅账号",
-        "榜单": "榜单",
-        "其他": "其他",
-    }
-    return mapping.get(v, v)
-
-
-def _map_decision_basis(value: Any) -> str:
-    v = str(value or "").strip()
-    mapping = {
-        "content_portrait": "内容画像匹配",
-        "author_portrait": "作者画像匹配",
-        "demand_filtering": "需求筛选",
-        "other": "其他",
-        "画像缺失": "画像缺失",
-        "内容画像匹配": "内容画像匹配",
-        "作者画像匹配": "作者画像匹配",
-        "需求筛选": "需求筛选",
-        "其他": "其他",
-    }
-    return mapping.get(v, v)
-
-
-def _infer_decision_basis_from_output_content(content: Dict[str, Any]) -> str:
-    portrait = content.get("portrait_data") or {}
-    source = str(portrait.get("source") or "").strip()
-    if source == "content_like":
-        return "内容画像匹配"
-    if source == "account_fans":
-        return "作者画像匹配"
-    if source == "none":
-        return "画像缺失"
-    return ""
-
-
-def _build_base_row(*, trace_id: str, content: Dict[str, Any], input_features: List[str], query: str) -> Dict[str, Any]:
-    return {
-        "trace_id": trace_id,
-        "aweme_id": str(content.get("aweme_id") or "").strip(),
-        "title": str(content.get("title") or "").strip(),
-        "author_nickname": str(content.get("author_nickname") or "").strip(),
-        "strategy_type": "",
-        "from_case_aweme_id": "",
-        "from_case_point": "",
-        "from_feature": "",
-        "search_keyword": str(query or "").strip(),
-        "channel": "抖音搜索",
-        "decision_basis": _infer_decision_basis_from_output_content(content),
-        "decision_notes": str(content.get("reason") or "").strip(),
-        "input_features": input_features,
-    }
-
-
-_ROW_KEYS: Tuple[str, ...] = (
-    "trace_id",
-    "aweme_id",
-    "title",
-    "author_nickname",
-    "strategy_type",
-    "from_case_aweme_id",
-    "from_case_point",
-    "from_feature",
-    "search_keyword",
-    "channel",
-    "decision_basis",
-    "decision_notes",
-    "input_features",
-)
-
-
-def _sanitize_row(row: Dict[str, Any]) -> Dict[str, Any]:
-    """只保留固定字段,并把枚举值规范成中文。"""
-    out: Dict[str, Any] = {k: row.get(k, "") for k in _ROW_KEYS}
-    out["strategy_type"] = _map_strategy_type(out.get("strategy_type"))
-    out["channel"] = _map_channel(out.get("channel"))
-    out["decision_basis"] = _map_decision_basis(out.get("decision_basis"))
-    # input_features 规范为 list[str]
-    feats = out.get("input_features")
-    if isinstance(feats, list):
-        out["input_features"] = [str(x).strip() for x in feats if str(x).strip()]
-    elif isinstance(feats, str):
-        out["input_features"] = _split_input_features(feats)
-    else:
-        out["input_features"] = []
-    return out
-
-
-def _normalize_payload(*, trace_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
-    # tool 只做最小职责:过滤/补全/规范化;复杂推理由 skill 生成 summary_json 来完成
-    raw_rows = payload.get("rows")
-    rows_in_payload: List[Dict[str, Any]] = []
-    if isinstance(raw_rows, list):
-        for item in raw_rows:
-            if isinstance(item, dict):
-                rows_in_payload.append(item)
-
-    output_json = _load_output_json(trace_id=trace_id) or {}
-    query = str(output_json.get("query") or "").strip()
-    input_features = _split_input_features(query)
-    contents = _extract_contents(trace_id=trace_id)
-    contents_by_aweme_id: Dict[str, Dict[str, Any]] = {
-        str(c.get("aweme_id") or "").strip(): c for c in contents if str(c.get("aweme_id") or "").strip()
-    }
-
-    # 先把 payload rows 归并到 aweme_id
-    payload_by_aweme_id: Dict[str, Dict[str, Any]] = {}
-    for r in rows_in_payload:
-        aweme_id = str(r.get("aweme_id") or r.get("awemeId") or "").strip()
-        if not aweme_id:
-            continue
-        payload_by_aweme_id[aweme_id] = dict(r)
-
-    # 只允许 payload 覆盖“策略/来源/解释”字段,避免覆盖 output.json.contents 的身份字段(title/author 等)
-    allowed_payload_keys: set[str] = {
-        "strategy_type",
-        "from_case_aweme_id",
-        "from_case_point",
-        "from_feature",
-        "search_keyword",
-        "channel",
-        "decision_basis",
-        "decision_notes",
-        "input_features",
-    }
-
-    # 兼容 payload 的常见别名/驼峰 key(模型输出不稳定时,尽量不丢信息)
-    alias_map: Dict[str, Tuple[str, ...]] = {
-        "strategy_type": ("strategy_type", "strategyType"),
-        "from_case_aweme_id": ("from_case_aweme_id", "fromCaseAwemeId", "case_aweme_id", "caseAwemeId"),
-        "from_case_point": ("from_case_point", "fromCasePoint", "case_point", "casePoint"),
-        "from_feature": ("from_feature", "fromFeature", "feature", "from_feature_name"),
-        "search_keyword": ("search_keyword", "searchKeyword", "keyword"),
-        "channel": ("channel", "source_channel", "sourceChannel", "source"),
-        "decision_basis": ("decision_basis", "decisionBasis"),
-        "decision_notes": ("decision_notes", "decisionNotes", "notes"),
-        "input_features": ("input_features", "inputFeatures"),
-    }
-
-    def _pick(provided: Dict[str, Any], key: str) -> Any:
-        for k in alias_map.get(key, (key,)):
-            if k in provided:
-                return provided.get(k)
-        return None
-
-    normalized: List[Dict[str, Any]] = []
-    for aweme_id, content in contents_by_aweme_id.items():
-        base = _build_base_row(trace_id=trace_id, content=content, input_features=input_features, query=query)
-        provided = payload_by_aweme_id.get(aweme_id) or {}
-
-        merged = dict(base)
-        # 只合并允许覆盖的字段
-        for k in allowed_payload_keys:
-            v = _pick(provided, k)
-            if v is not None:
-                merged[k] = v
-
-        # 身份字段强制以 output.json.contents 为准(即使 payload 传了也不采纳)
-        merged["aweme_id"] = str(content.get("aweme_id") or "").strip()
-        merged["title"] = str(content.get("title") or "").strip()
-        merged["author_nickname"] = str(content.get("author_nickname") or "").strip()
-
-        # 如果缺失 input_features,用 query 拆分补齐
-        if "input_features" not in merged or not merged.get("input_features"):
-            merged["input_features"] = input_features
-
-        normalized.append(_sanitize_row(merged))
-
-    # 保持稳定顺序:按 rank(若有)或 aweme_id
-    def _sort_key(r: Dict[str, Any]) -> Tuple[int, str]:
-        c = contents_by_aweme_id.get(str(r.get("aweme_id") or "").strip()) or {}
-        try:
-            rank = int(c.get("rank") or 0)
-        except Exception:
-            rank = 0
-        return (rank if rank > 0 else 10**9, str(r.get("aweme_id") or ""))
-
-    normalized.sort(key=_sort_key)
-    return {"rows": normalized}
-
-
-def _write_process_trace(*, trace_id: str, payload: Dict[str, Any]) -> Path:
-    out_dir = _output_dir_path() / trace_id
-    out_dir.mkdir(parents=True, exist_ok=True)
-    path = out_dir / "process_trace.json"
-    # 输出格式收敛:只允许 {"rows": [...]}
-    doc = {"rows": payload.get("rows") or []}
-    with path.open("w", encoding="utf-8") as f:
-        json.dump(doc, f, ensure_ascii=False, indent=2)
-    return path
-
-
-def _sync_process_trace_rows_to_mysql(*, trace_id: str, rows: List[Dict[str, Any]]) -> Dict[str, Any]:
-    """
-    将每条归一化后的策略行序列化为 JSON 文本,按 (trace_id, aweme_id) 更新 process_trace 与 channel。
-
-    channel 当前统一为「抖音」(与 process_trace.json 内 channel「抖音搜索」区分)。
-    表中无匹配行时 rowcount 为 0,计入 skipped。
-    """
-    updated = 0
-    skipped = 0
-    errors: List[str] = []
-    for row in rows:
-        aweme_id = str(row.get("aweme_id") or "").strip()
-        if not aweme_id:
-            skipped += 1
-            continue
-        text = json.dumps(row, ensure_ascii=False)
-        try:
-            n = update_process_trace_by_aweme_id(
-                trace_id=trace_id,
-                aweme_id=aweme_id,
-                process_trace_text=text,
-                channel="抖音",
-            )
-            if n > 0:
-                updated += 1
-            else:
-                skipped += 1
-        except Exception as e:
-            logger.warning(
-                "process_trace 回写 MySQL 失败 trace_id=%s aweme_id=%s: %s",
-                trace_id,
-                aweme_id,
-                e,
-                exc_info=True,
-            )
-            errors.append(f"{aweme_id}: {e}")
-    return {"updated": updated, "skipped": skipped, "errors": errors}
-
-
-@tool(
-    description=(
-        "在**全部流程执行完毕之后**调用:把每条最终入选内容的「选择策略」整理成表格 JSON,"
-        "写入当前任务的 output 目录下的 process_trace.json,便于后续复盘;"
-        "并将每一行策略 JSON 序列化为文本,按 trace_id + aweme_id 回写到 "
-        "demand_find_content_result.process_trace,并同步将 channel 字段设为「抖音」。"
-        "参数 summary_json 为 JSON 字符串,可以是数组或对象(对象需包含 rows)。"
-        "可选参数 log_path/log_text 用于传入本次运行日志(便于复盘留档/未来扩展)。"
-    ),
-)
-async def exec_summary(
-    trace_id: str,
-    summary_json: str,
-    log_path: str = "",
-    log_text: str = "",
-) -> ToolResult:
-    call_params = {
-        "trace_id": trace_id,
-        "summary_json": "<json>",
-        "log_path": (log_path or "").strip(),
-        "log_text": "<text>",
-    }
-    tid = (trace_id or "").strip()
-    if not tid:
-        err = ToolResult(
-            title="过程摘要",
-            output="trace_id 不能为空",
-            metadata={"ok": False, "error": "empty trace_id"},
-        )
-        log_tool_call(_LOG_LABEL, call_params, format_tool_result_for_log(err))
-        return err
-
-    try:
-        payload = _parse_payload(summary_json)
-    except json.JSONDecodeError as e:
-        err = ToolResult(
-            title="过程摘要",
-            output=f"summary_json 不是合法 JSON: {e}",
-            metadata={"ok": False, "error": str(e)},
-        )
-        log_tool_call(_LOG_LABEL, call_params, format_tool_result_for_log(err))
-        return err
-    except ValueError as e:
-        err = ToolResult(
-            title="过程摘要",
-            output=str(e),
-            metadata={"ok": False, "error": str(e)},
-        )
-        log_tool_call(_LOG_LABEL, call_params, format_tool_result_for_log(err))
-        return err
-
-    payload = _normalize_payload(trace_id=tid, payload=payload)
-
-    try:
-        path = _write_process_trace(trace_id=tid, payload=payload)
-    except OSError as e:
-        msg = f"写入 process_trace.json 失败: {e}"
-        logger.error(msg, exc_info=True)
-        err = ToolResult(title="过程摘要", output=msg, metadata={"ok": False, "error": str(e)})
-        log_tool_call(_LOG_LABEL, call_params, format_tool_result_for_log(err))
-        return err
-
-    rows = payload.get("rows") or []
-    mysql_meta: Dict[str, Any]
-    try:
-        mysql_meta = _sync_process_trace_rows_to_mysql(trace_id=tid, rows=rows if isinstance(rows, list) else [])
-    except Exception as e:
-        logger.warning("process_trace 批量回写 MySQL 异常: %s", e, exc_info=True)
-        mysql_meta = {"updated": 0, "skipped": 0, "errors": [str(e)]}
-
-    out = ToolResult(
-        title="过程摘要",
-        output=f"已写入 {path};MySQL process_trace 已更新 {mysql_meta.get('updated', 0)} 条",
-        metadata={
-            "ok": True,
-            "trace_id": tid,
-            "path": str(path),
-            "log_path": (log_path or "").strip(),
-            "log_text_len": len((log_text or "").strip()),
-            "mysql_process_trace_updated": mysql_meta.get("updated", 0),
-            "mysql_process_trace_skipped": mysql_meta.get("skipped", 0),
-            "mysql_process_trace_errors": mysql_meta.get("errors") or [],
-        },
-    )
-    log_tool_call(_LOG_LABEL, {"trace_id": tid}, format_tool_result_for_log(out))
-    return out

+ 28 - 16
examples/content_finder/tools/find_authors_from_db.py

@@ -1,8 +1,9 @@
 """
-从数据库中按“搜索词 query”检索历史任务沉淀的优质作者(demand_find_author)。
+从 demand_find_author 中,用调用方传入的 query 与 content_tags 做文字匹配,
+按匹配度优先返回作者(sec_uid / 链接)。
 
 用途:
-- 先用该工具找到相关作者(sec_uid / 链接)
+- 先用该工具找到相关作者
 - 再调用 douyin_user_videos(account_id=sec_uid) 获取其作品做二次筛选
 """
 
@@ -10,14 +11,14 @@ from __future__ import annotations
 
 import json
 import re
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List
 
 from agent.tools import ToolResult, tool
 from utils.tool_logging import format_tool_result_for_log, log_tool_call
 
 from db import get_connection
 
-_LOG_LABEL = "工具调用:find_authors_from_db -> 数据库按搜索词检索优质作者"
+_LOG_LABEL = "工具调用:find_authors_from_db -> 按 content_tags 匹配优质作者"
 
 
 _DOUYIN_USER_URL_RE = re.compile(r"^https?://www\.douyin\.com/user/(?P<sec_uid>[^/?#]+)")
@@ -35,7 +36,8 @@ def _query_authors(conn, query: str, limit: int) -> List[Dict[str, Any]]:
     if not q:
         return []
 
-    # demand_find_author 本身不存 query,需要通过 trace_id 关联 demand_find_content_result.query
+    # 仅用 query 与 content_tags 文字匹配;匹配度:全等 > 前缀匹配 > 子串匹配,再按标签更短、画像字段排序
+    # content_tags 必须出现在 SELECT 中:MySQL 下 DISTINCT + ORDER BY 引用列需一致(否则 3065)
     sql = """
     SELECT DISTINCT
       a.author_name,
@@ -43,27 +45,37 @@ def _query_authors(conn, query: str, limit: int) -> List[Dict[str, Any]]:
       a.elderly_ratio,
       a.elderly_tgi,
       a.remark,
-      a.trace_id
+      a.trace_id,
+      a.content_tags
     FROM demand_find_author a
-    INNER JOIN demand_find_content_result r
-      ON r.trace_id = a.trace_id
-    WHERE r.query LIKE %s
-    ORDER BY a.elderly_ratio DESC, a.elderly_tgi DESC
+    WHERE a.content_tags IS NOT NULL
+      AND TRIM(a.content_tags) <> ''
+      AND a.content_tags LIKE %s
+    ORDER BY
+      CASE
+        WHEN a.content_tags = %s THEN 0
+        WHEN a.content_tags LIKE %s THEN 1
+        ELSE 2
+      END ASC,
+      CHAR_LENGTH(a.content_tags) ASC,
+      a.elderly_ratio DESC,
+      a.elderly_tgi DESC
     LIMIT %s
     """
-    like = f"%{q}%"
+    like_contains = f"%{q}%"
+    like_prefix = f"{q}%"
     with conn.cursor() as cur:
-        cur.execute(sql, (like, int(limit)))
+        cur.execute(sql, (like_contains, q, like_prefix, int(limit)))
         rows = cur.fetchall() or []
         return [dict(r) for r in rows]
 
 
-@tool(description="从 demand_find_author 中按搜索词查找相关作者")
-async def find_authors_from_db(query: str, limit: int = 20) -> ToolResult:
+@tool(description="从优质作者库中按搜索词匹配查找作者")
+async def find_authors_from_db(query: str, limit: int = 3) -> ToolResult:
     """
     Args:
-        query: 搜索词(与历史 demand_find_content_result.query 模糊匹配
-        limit: 返回作者数量上限
+        query: 与 content_tags 做匹配的关键词(子串匹配;匹配度优先:全等、前缀、包含
+        limit: 返回作者数量上限(默认 3)
     """
     call_params = {"query": query, "limit": limit}
     conn = get_connection()

+ 1 - 1
examples/content_finder/tools/get_video_topic.py

@@ -183,7 +183,7 @@ def _query_points_by_post_ids(conn, post_ids: List[str]) -> Dict[str, JsonDict]:
     return result
 
 
-@tool(description="根据特征匹配高赞视频,并返回每个视频的灵感点/目的点/关键点列表(当前占位返回空)")
+@tool(description="根据特征匹配高赞视频,并返回每个视频的灵感点/目的点/关键点列表")
 async def get_video_topic(
     features: str = "",
     limit: int = 20,

+ 2 - 50
examples/content_finder/tools/hotspot_profile.py

@@ -22,44 +22,6 @@ _LABEL_CONTENT = "工具调用:get_content_fans_portrait -> 内容点赞用户
 _LABEL_BATCH = "工具调用:batch_fetch_portraits -> 批量获取内容/账号画像(热点宝)"
 
 BATCH_MAX_ITEMS = 30
-_BATCH_SNAPSHOT_NAME = "batch_portraits.json"
-
-
-def _repo_root_from_this_file() -> Path:
-    # examples/content_finder/tools/hotspot_profile.py -> Agent 仓库根
-    return Path(__file__).resolve().parents[3]
-
-
-def _resolve_output_dir_path() -> Path:
-    raw = (os.getenv("OUTPUT_DIR") or ".cache/output").strip()
-    p = Path(raw).expanduser()
-    return p.resolve() if p.is_absolute() else (_repo_root_from_this_file() / p).resolve()
-
-
-def _persist_batch_portraits_json(
-    trace_id: Optional[str],
-    results: List[Dict[str, Any]],
-    count: int,
-) -> Optional[str]:
-    """将批量画像结果写入 OUTPUT_DIR/<trace_id>/batch_portraits.json,便于 read_file 与排障。"""
-    if not trace_id:
-        return None
-    try:
-        out_dir = _resolve_output_dir_path() / trace_id
-        out_dir.mkdir(parents=True, exist_ok=True)
-        path = out_dir / _BATCH_SNAPSHOT_NAME
-        path.write_text(
-            json.dumps(
-                {"trace_id": trace_id, "count": count, "results": results},
-                ensure_ascii=False,
-                indent=2,
-            ),
-            encoding="utf-8",
-        )
-        return str(path)
-    except OSError as e:
-        logger.warning("batch portrait snapshot write failed: %s", e)
-        return None
 
 
 ACCOUNT_FANS_PORTRAIT_API = "http://crawapi.piaoquantv.com/crawler/dou_yin/re_dian_bao/account_fans_portrait"
@@ -485,7 +447,7 @@ async def get_content_fans_portrait(
     description=(
         "批量获取多条候选视频的画像:工具内依次请求内容点赞画像;"
         "若无画像且允许兜底则再请求作者粉丝画像。一次调用返回所有条目,减少对话轮次。"
-        "完整结构化结果在同一条 tool 消息的 metadata JSON 中,并写入 OUTPUT_DIR/<trace_id>/batch_portraits.json。"
+        "完整结构化结果在同一条 tool 消息的 metadata JSON 中"
     ),
     hidden_params=["context"],
 )
@@ -694,19 +656,12 @@ async def batch_fetch_portraits(
         tid = context.get("trace_id")
         if isinstance(tid, str) and tid.strip():
             trace_id = tid.strip()
-    snapshot_path = _persist_batch_portraits_json(trace_id, results, len(results))
 
     out_display = (os.getenv("OUTPUT_DIR") or ".cache/output").strip()
-    rel_hint = (
-        f"{out_display}/{trace_id}/{_BATCH_SNAPSHOT_NAME}"
-        if trace_id
-        else f"{out_display}/<trace_id>/{_BATCH_SNAPSHOT_NAME}"
-    )
     meta_hint = (
         "\n\n本条 tool 消息在标题与摘要后附有 ## metadata (JSON),其中 results 含每条 "
         "content/account 的 has_portrait 与 portrait_data;若上下文被压缩,可用 read_file 读取:"
-        f" {rel_hint}"
-        + (f"(本机路径: {snapshot_path})" if snapshot_path else "")
+        f" {out_display}/{trace_id}/output.json"
     )
     output_body = full_text + meta_hint
 
@@ -716,7 +671,6 @@ async def batch_fetch_portraits(
             "count": len(results),
             "candidates": len(parsed),
             "trace_id": trace_id,
-            "snapshot_path": snapshot_path,
         },
     )
 
@@ -724,8 +678,6 @@ async def batch_fetch_portraits(
         "results": results,
         "count": len(results),
     }
-    if snapshot_path:
-        meta["snapshot_path"] = snapshot_path
 
     return _log_return(
         _LABEL_BATCH,

+ 1 - 1
examples/content_finder/tools/store_results_mysql.py

@@ -35,7 +35,7 @@ def _load_output(trace_id: str) -> Dict[str, Any]:
         return json.load(f)
 
 
-@tool(description="将推荐结果写入 MySQL")
+@tool(description="将内容寻找结果写入 MySQL")
 async def store_results_mysql(trace_id: str) -> ToolResult:
     """
     根据 trace_id 读取 output.json,并写入 MySQL。