Explorar o código

feat: 日志以及可视化调整

jihuaqiang hai 1 día
pai
achega
03997f8b73

+ 10 - 0
agent/core/runner.py

@@ -311,6 +311,16 @@ class AgentRunner:
             raise ValueError("llm_call function not provided")
 
         config = config or RunConfig()
+
+        disable_side_branches = os.getenv("AGENT_DISABLE_SIDE_BRANCHES", "").lower() in {
+            "1",
+            "true",
+            "yes",
+            "on",
+        }
+        if disable_side_branches:
+            # 外部强制禁用侧分支:压缩/反思都不进入
+            config.force_side_branch = None
         trace = None
 
         try:

+ 18 - 1
examples/content_finder/content_finder.prompt

@@ -6,9 +6,26 @@ temperature: 0.3
 $system$
 你是一个专业的内容寻找助手,帮助运营人员在抖音平台上寻找符合要求的视频内容。
 
+## 思考输出要求(非常重要)
+你在执行过程中,**必须在文本中主动输出你的思考和推理**,而不是只调用工具。具体要求:
+1. **行动前先说理由**:每次调用工具之前,先用 1-2 句话说明你为什么要调这个工具、你期望从中得到什么信息、你当前的思路是什么。
+2. **拿到结果后立刻分析**:工具返回数据后,立即输出你对结果的解读——数据说明了什么?有哪些关键发现?是否符合预期?是否需要调整策略?
+3. **阶段性总结**:每个阶段结束时,输出一段简要总结:本阶段做了什么、得到了哪些关键结论、对下一步有什么影响。
+4. **决策透明化**:当你做出筛选/保留/淘汰决策时,必须在文本中明确说明理由(如"视频点赞用户画像和老年群体不匹配,50+用户点赞占比仅5%,故淘汰")。
+5. **`think_and_plan` 用于结构化记录**:`think_and_plan` 仍然用于记录计划和关键节点,但它不能替代你在对话中直接输出的思考文本。两者互补,缺一不可。
+
+## 可用工具(按目的)
+- 抖音视频搜索:`douyin_search`
+- 抖音作者作品搜索:`douyin_user_videos`
+- 数据库作者检索(按搜索词找历史优质作者):`find_authors_from_db`
+- 作品画像获取:`get_content_fans_portrait`
+- 作者画像获取:`get_account_fans_portrait`
+- 过程记录:`think_and_plan`
+- 存储结果至数据库:`store_results_mysql`
+- 创建aigc计划:`create_crawler_plan_by_douyin_content_id`、`create_crawler_plan_by_douyin_account_id`
+
 ## 重要约束
 - 只在抖音平台搜索,不要切换到其他平台(小红书、B站等)
-- 可用工具:`douyin_search`、`douyin_user_videos`、`get_content_fans_portrait`、`get_account_fans_portrait`、`store_results_mysql`、`create_crawler_plan_by_douyin_content_id`、`create_crawler_plan_by_douyin_account_id`
 - **严格禁止**调用任何名称以 `browser_` 开头的浏览器工具
 
 ## 平台背景

+ 97 - 72
examples/content_finder/core.py

@@ -10,11 +10,26 @@ import sys
 import os
 from pathlib import Path
 from typing import Optional, Dict, Any
+from utils.log_capture import build_log, log
+from datetime import datetime
+import uuid
+
+
+def _resolve_input_log_dir(content_finder_root: Path) -> Path:
+    """Resolve the INPUT_LOG_PATH setting (same semantics as in .env): a
+    directory; a relative path is resolved against the content_finder root.
+    """
+    raw = os.getenv("INPUT_LOG_PATH", ".cache/input_log")
+    p = Path(raw).expanduser()
+    if p.is_absolute():
+        # Heuristic: a suffix suggests a file path, so use its parent dir.
+        # NOTE(review): suffix-based detection misreads directories named
+        # like "logs.d" — confirm this matches how INPUT_LOG_PATH is set.
+        return p if not p.suffix else p.parent
+    return (content_finder_root / p).resolve()
 
 sys.path.insert(0, str(Path(__file__).parent.parent.parent))
 
 from dotenv import load_dotenv
+
 load_dotenv()
+# 保证从仓库根目录运行时也能读到 content_finder 下的 .env(INPUT_LOG_PATH 等)
+load_dotenv(dotenv_path=Path(__file__).resolve().parent / ".env", override=True)
 
 from agent import (
     AgentRunner,
@@ -36,15 +51,31 @@ from tools import (
     create_crawler_plan_by_douyin_content_id,
     create_crawler_plan_by_douyin_account_id,
     store_results_mysql,
+    think_and_plan,
+    find_authors_from_db,
 )
 
 logger = logging.getLogger(__name__)
 
 # 默认搜索词
-DEFAULT_QUERY = "戏曲表演"
+DEFAULT_QUERY = "伟人功绩"
 DEFAULT_DEMAND_ID = 1
 
 
+def extract_assistant_text(message: Message) -> str:
+    """Return the assistant-authored text of *message*, or "" if none.
+
+    Handles both plain-string content and dict content with a "text" key.
+    """
+    if message.role != "assistant":
+        return ""
+    content = message.content
+    if isinstance(content, str):
+        return content
+    if isinstance(content, dict):
+        text = content.get("text", "")
+        # Even when this turn also contains tool calls, surface the model's
+        # text so every step of the run is observable in the log.
+        if text:
+            return text
+    return ""
+
+
 async def run_agent(
     query: Optional[str] = None,
     demand_id: Optional[int] = None,
@@ -72,12 +103,18 @@ async def run_agent(
     prompt_path = Path(__file__).parent / "content_finder.prompt"
     prompt = SimplePrompt(prompt_path)
 
-    # output 目录
+    # output 目录(相对路径相对 content_finder)
+    content_finder_root = Path(__file__).resolve().parent
     output_dir = os.getenv("OUTPUT_DIR", ".cache/output")
+    output_dir_path = Path(output_dir).expanduser()
+    if not output_dir_path.is_absolute():
+        output_dir_path = (content_finder_root / output_dir_path).resolve()
 
     # 构建消息(替换 %query%、%output_dir%、%demand_id%)
     demand_id_str = str(demand_id) if demand_id is not None else ""
-    messages = prompt.build_messages(query=query, output_dir=output_dir, demand_id=demand_id_str)
+    messages = prompt.build_messages(
+        query=query, output_dir=str(output_dir_path), demand_id=demand_id_str
+    )
 
     # 初始化配置
     api_key = os.getenv("OPEN_ROUTER_API_KEY")
@@ -101,9 +138,11 @@ async def run_agent(
         "douyin_user_videos",
         "get_content_fans_portrait",
         "get_account_fans_portrait",
+        "find_authors_from_db",
         "store_results_mysql",
         "create_crawler_plan_by_douyin_content_id",
         "create_crawler_plan_by_douyin_account_id",
+        "think_and_plan",
     ]
 
     runner = AgentRunner(
@@ -119,81 +158,67 @@ async def run_agent(
         max_iterations=max_iterations,
         tools=allowed_tools,
         extra_llm_params={"max_tokens": 8192},
-        knowledge=KnowledgeConfig(
-            enable_extraction=True,
-            enable_completion_extraction=True,
-            enable_injection=True,
-            owner="content_finder_agent",
-            default_tags={"project": "content_finder"},
-            default_scopes=["com.piaoquantv.supply"],
-            default_search_types=["tool", "usecase", "definition"],
-            default_search_owner="content_finder_agent"
-        )
+        # knowledge=KnowledgeConfig(
+        #     enable_extraction=True,
+        #     enable_completion_extraction=True,
+        #     enable_injection=True,
+        #     owner="content_finder_agent",
+        #     default_tags={"project": "content_finder"},
+        #     default_scopes=["com.piaoquantv.supply"],
+        #     default_search_types=["tool", "usecase", "definition"],
+        #     default_search_owner="content_finder_agent"
+        # )
     )
 
     # 执行
     trace_id = None
+    execution_id = str(uuid.uuid4())
 
     try:
-        async for item in runner.run(messages=messages, config=config):
-            if isinstance(item, Trace):
-                trace_id = item.trace_id
-
-                if item.status == "completed":
-                    logger.info(f"Agent 执行完成: trace_id={trace_id}")
-                    return {
-                        "trace_id": trace_id,
-                        "status": "completed"
-                    }
-                elif item.status == "failed":
-                    logger.error(f"Agent 执行失败: {item.error_message}")
-                    return {
-                        "trace_id": trace_id,
-                        "status": "failed",
-                        "error": item.error_message
-                    }
-
-            elif isinstance(item, Message) and stream_output:
-                # 流式输出(仅 run.py 需要)
-                if item.role == "assistant":
-                    content = item.content
-                    if isinstance(content, dict):
-                        text = content.get("text", "")
-                        tool_calls = content.get("tool_calls", [])
-
-                        if text:
-                            # 如果有推荐结果,完整输出
-                            if len(text) > 500 and ("推荐结果" in text or "推荐内容" in text or "🎯" in text):
-                                print(f"\n{text}")
-                            # 如果有工具调用且文本较短,只输出摘要
-                            elif tool_calls and len(text) > 100:
-                                print(f"[思考] {text[:100]}...")
-                            # 其他情况输出完整文本
-                            else:
-                                print(f"\n{text}")
-
-                        # 输出工具调用信息
-                        if tool_calls:
-                            for tc in tool_calls:
-                                tool_name = tc.get("function", {}).get("name", "unknown")
-                                # 跳过 goal 工具的输出,减少噪音
-                                if tool_name != "goal":
-                                    print(f"[工具] {tool_name}")
-                    elif isinstance(content, str) and content:
-                        print(f"\n{content}")
-
-                elif item.role == "tool":
-                    content = item.content
-                    if isinstance(content, dict):
-                        tool_name = content.get("tool_name", "unknown")
-                        print(f"[结果] {tool_name} ✓")
-
-        # 如果循环结束但没有返回,说明异常退出
-        return {
-            "trace_id": trace_id,
-            "status": "failed",
-            "error": "Agent 异常退出"
-        }
+        log_dir = _resolve_input_log_dir(content_finder_root)
+        log_dir.mkdir(parents=True, exist_ok=True)
+        log_file_path = log_dir / f"run_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
+
+        run_result: Optional[Dict[str, Any]] = None
+
+        with build_log(execution_id) as log_buffer:
+            async for item in runner.run(messages=messages, config=config):
+                if isinstance(item, Trace):
+                    trace_id = item.trace_id
+
+                    if item.status == "completed":
+                        logger.info(f"Agent 执行完成: trace_id={trace_id}")
+                        run_result = {
+                            "trace_id": trace_id,
+                            "status": "completed",
+                        }
+                        break
+                    if item.status == "failed":
+                        logger.error(f"Agent 执行失败: {item.error_message}")
+                        run_result = {
+                            "trace_id": trace_id,
+                            "status": "failed",
+                            "error": item.error_message,
+                        }
+                        break
+
+                elif isinstance(item, Message) and stream_output:
+                    text = extract_assistant_text(item)
+                    if text:
+                        log(f"[assistant] {text}")
+
+            if run_result is None:
+                run_result = {
+                    "trace_id": trace_id,
+                    "status": "failed",
+                    "error": "Agent 异常退出",
+                }
+
+            full_log = log_buffer.getvalue()
+            with open(log_file_path, "w", encoding="utf-8") as f:
+                f.write(full_log)
+
+        return run_result
 
     except KeyboardInterrupt:
         logger.info("用户中断")

+ 402 - 0
examples/content_finder/render_log_html.py

@@ -0,0 +1,402 @@
+#!/usr/bin/env python3
+"""Render run_log text as a collapsible HTML page.
+
+Edit INPUT_LOG_PATH / OUTPUT_HTML_PATH in the script (or via .env) and run:
+    python examples/content_finder/render_log_html.py
+"""
+
+from __future__ import annotations
+
+import html
+import os
+from dataclasses import dataclass, field
+from pathlib import Path
+from dotenv import load_dotenv
+
+# Load examples/content_finder/.env regardless of the working directory.
+load_dotenv(dotenv_path=Path(__file__).resolve().parent / ".env", override=False)
+
+
+@dataclass
+class Node:
+    # Fold title; None marks the root (non-fold) container node.
+    title: str | None = None
+    # Child entries in document order: raw text lines or nested fold Nodes.
+    entries: list[str | "Node"] = field(default_factory=list)
+
+    @property
+    def is_fold(self) -> bool:
+        """True for a [FOLD:...] block; False only for the root node."""
+        return self.title is not None
+
+
+def parse_log(content: str) -> Node:
+    """Parse raw log text into a tree of fold Nodes.
+
+    Lines matching "[FOLD:<title>]" open a nested node and "[/FOLD]" closes
+    the innermost one; all other lines are kept verbatim as text entries.
+    """
+    root = Node(title=None)
+    stack: list[Node] = [root]
+
+    for raw_line in content.splitlines():
+        line = raw_line.rstrip("\n")
+        tag = line.strip()
+        if tag.startswith("[FOLD:") and tag.endswith("]"):
+            title = tag[len("[FOLD:") : -1]
+            node = Node(title=title)
+            stack[-1].entries.append(node)
+            stack.append(node)
+            continue
+        if tag == "[/FOLD]":
+            # Fault tolerance: ignore a surplus [/FOLD] instead of treating
+            # it as body text.
+            if len(stack) > 1:
+                stack.pop()
+            continue
+        stack[-1].entries.append(line)
+
+    while len(stack) > 1:
+        unclosed = stack.pop()
+        # Fault tolerance: if a [/FOLD] is missing, keep the collected
+        # content so no log lines are lost.
+        stack[-1].entries.append(unclosed)
+    return root
+
+
+DEFAULT_COLLAPSE_PREFIXES = ["🔧", "📥", "📤"]
+DEFAULT_COLLAPSE_KEYWORDS = ["调用参数", "返回内容"]
+
+# Tool capability summaries (static map, shown in the log visualization).
+TOOL_DESCRIPTION_MAP: dict[str, str] = {
+    "think_and_plan": "系统化记录思考、计划与下一步行动(只记录不获取新信息)。",
+    "douyin_search": "通过关键词在抖音上搜索视频内容。",
+    "douyin_user_videos": "通过账号/作者 sec_uid 获取其历史作品列表。",
+    "get_content_fans_portrait": "获取视频点赞用户画像(热点宝),判断 metadata.has_portrait。",
+    "get_account_fans_portrait": "获取作者粉丝画像(热点宝),用于内容画像缺失兜底。",
+    "store_results_mysql": "将 output.json 写入 MySQL(作者表与内容表)。",
+    "create_crawler_plan_by_douyin_content_id": "为入选视频生成 AIGC 爬取计划。",
+    "create_crawler_plan_by_douyin_account_id": "为入选账号生成 AIGC 爬取计划。",
+}
+
+# =========================
+# Run configuration (defaults read from .env)
+# =========================
+INPUT_LOG_PATH = os.getenv("INPUT_LOG_PATH", ".cache/input_log")
+# When None, the output defaults to the input file's name with .html suffix.
+OUTPUT_HTML_PATH: str | None = os.getenv("OUTPUT_HTML_PATH") or None
+# Whether every [FOLD] block starts collapsed.
+COLLAPSE_ALL_FOLDS = False
+# Fold blocks matching these prefixes/keywords start collapsed.
+COLLAPSE_PREFIXES = DEFAULT_COLLAPSE_PREFIXES
+COLLAPSE_KEYWORDS = DEFAULT_COLLAPSE_KEYWORDS
+
+
+def resolve_config_path(path_str: str) -> Path:
+    """Resolve a configured path, tolerating runs from the project root or
+    the script directory.
+
+    Tries, in order: the path as absolute; relative to the CWD; relative to
+    this script's directory; relative to the project root.
+    """
+    raw = Path(path_str).expanduser()
+    if raw.is_absolute():
+        return raw.resolve()
+
+    cwd_candidate = (Path.cwd() / raw).resolve()
+    if cwd_candidate.exists():
+        return cwd_candidate
+
+    script_dir = Path(__file__).resolve().parent
+    script_candidate = (script_dir / raw).resolve()
+    if script_candidate.exists():
+        return script_candidate
+
+    project_root = script_dir.parent.parent
+    project_candidate = (project_root / raw).resolve()
+    if project_candidate.exists():
+        return project_candidate
+
+    # If nothing exists, return the project-root-joined result so error
+    # messages stay deterministic.
+    return project_candidate
+
+
+def should_collapse(
+    title: str,
+    collapse_prefixes: list[str],
+    collapse_keywords: list[str],
+    collapse_all: bool,
+) -> bool:
+    """Decide whether the fold block with *title* should start collapsed."""
+    if collapse_all:
+        return True
+    if any(title.startswith(prefix) for prefix in collapse_prefixes):
+        return True
+    return any(keyword in title for keyword in collapse_keywords)
+
+
+def render_text_block(lines: list[str]) -> str:
+    """Render plain text lines as an HTML-escaped <pre> block.
+
+    Trims leading/trailing blank lines and squeezes interior runs of blank
+    lines to a single one; returns "" when nothing remains.
+    """
+    if not lines:
+        return ""
+
+    normalized = lines[:]
+    while normalized and normalized[0].strip() == "":
+        normalized.pop(0)
+    while normalized and normalized[-1].strip() == "":
+        normalized.pop()
+    if not normalized:
+        return ""
+
+    compact: list[str] = []
+    empty_streak = 0
+    for line in normalized:
+        if line.strip() == "":
+            empty_streak += 1
+            if empty_streak <= 1:
+                compact.append("")
+        else:
+            empty_streak = 0
+            compact.append(line)
+
+    escaped = html.escape("\n".join(compact))
+    return f'<pre class="log-text">{escaped}</pre>'
+
+
+def enrich_fold_title(title: str) -> str:
+    """Append the known tool description to a "🔧 <tool>" fold title;
+    titles without the tool prefix or an unknown tool are returned as-is."""
+    tool_prefix = "🔧 "
+    if not title.startswith(tool_prefix):
+        return title
+
+    tool_name = title[len(tool_prefix):].strip()
+    description = TOOL_DESCRIPTION_MAP.get(tool_name)
+    if not description:
+        return title
+    return f"{tool_prefix}{tool_name}({description})"
+
+
+def render_node(
+    node: Node,
+    collapse_prefixes: list[str],
+    collapse_keywords: list[str],
+    collapse_all: bool,
+) -> str:
+    """Recursively render *node* (text runs plus nested folds) to HTML."""
+    parts: list[str] = []
+    text_buffer: list[str] = []
+
+    def flush_text_buffer() -> None:
+        # Emit any accumulated plain-text lines before starting a fold.
+        if text_buffer:
+            parts.append(render_text_block(text_buffer))
+            text_buffer.clear()
+
+    for entry in node.entries:
+        if isinstance(entry, str):
+            text_buffer.append(entry)
+            continue
+
+        child = entry
+        # Child Nodes produced by parse_log always carry a title, so this
+        # branch is taken for every non-string entry.
+        if child.is_fold:
+            flush_text_buffer()
+            title = child.title or ""
+            is_collapsed = should_collapse(
+                title=title,
+                collapse_prefixes=collapse_prefixes,
+                collapse_keywords=collapse_keywords,
+                collapse_all=collapse_all,
+            )
+            folded_class = "fold tool-fold" if is_collapsed else "fold normal-fold"
+            open_attr = "" if is_collapsed else " open"
+            display_title = enrich_fold_title(title)
+            inner = render_node(
+                child,
+                collapse_prefixes=collapse_prefixes,
+                collapse_keywords=collapse_keywords,
+                collapse_all=collapse_all,
+            )
+            parts.append(
+                f'<details class="{folded_class}"{open_attr}>'
+                f'<summary>{html.escape(display_title)}</summary>'
+                f"{inner}"
+                "</details>"
+            )
+
+    flush_text_buffer()
+
+    return "".join(parts)
+
+
+def build_html(body: str, source_name: str) -> str:
+    return f"""<!doctype html>
+<html lang="zh-CN">
+<head>
+  <meta charset="UTF-8" />
+  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
+  <title>Run Log 可视化 - {html.escape(source_name)}</title>
+  <style>
+    :root {{
+      --bg: #0b1020;
+      --panel: #131a2a;
+      --text: #e8edf7;
+      --muted: #98a2b3;
+      --accent: #6ea8fe;
+      --border: #263146;
+    }}
+    * {{
+      box-sizing: border-box;
+    }}
+    body {{
+      margin: 0;
+      background: var(--bg);
+      color: var(--text);
+      font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, "Noto Sans", sans-serif;
+    }}
+    .wrap {{
+      max-width: 1200px;
+      margin: 0 auto;
+      padding: 20px;
+    }}
+    .header {{
+      margin-bottom: 14px;
+      display: flex;
+      align-items: center;
+      gap: 10px;
+      flex-wrap: wrap;
+    }}
+    .title {{
+      font-size: 18px;
+      font-weight: 700;
+    }}
+    .source {{
+      color: var(--muted);
+      font-size: 13px;
+    }}
+    button {{
+      border: 1px solid var(--border);
+      background: var(--panel);
+      color: var(--text);
+      padding: 6px 10px;
+      border-radius: 8px;
+      cursor: pointer;
+    }}
+    button:hover {{
+      border-color: var(--accent);
+      color: var(--accent);
+    }}
+    .content {{
+      background: var(--panel);
+      border: 1px solid var(--border);
+      border-radius: 10px;
+      padding: 10px;
+    }}
+    details {{
+      margin: 6px 0;
+      border: 1px solid var(--border);
+      border-radius: 8px;
+      background: rgba(255, 255, 255, 0.01);
+    }}
+    details > summary {{
+      cursor: pointer;
+      padding: 8px 10px;
+      font-size: 13px;
+      list-style: none;
+      user-select: none;
+      color: #cdd6e5;
+    }}
+    details > summary::-webkit-details-marker {{
+      display: none;
+    }}
+    details > summary::before {{
+      content: "▶";
+      display: inline-block;
+      margin-right: 6px;
+      transform: rotate(0deg);
+      transition: transform 120ms ease;
+      color: var(--muted);
+    }}
+    details[open] > summary::before {{
+      transform: rotate(90deg);
+    }}
+    .tool-fold > summary {{
+      color: #f6cf76;
+    }}
+    .log-text {{
+      margin: 0;
+      padding: 10px;
+      border-top: 1px dashed var(--border);
+      color: var(--text);
+      white-space: pre-wrap;
+      word-break: break-word;
+      line-height: 1.4;
+      font-size: 13px;
+      font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", monospace;
+    }}
+  </style>
+</head>
+<body>
+  <div class="wrap">
+    <div class="header">
+      <div class="title">Run Log 可视化</div>
+      <div class="source">{html.escape(source_name)}</div>
+      <button id="expand-tools">展开全部工具调用</button>
+      <button id="collapse-tools">折叠全部工具调用</button>
+    </div>
+    <div class="content">{body}</div>
+  </div>
+  <script>
+    const toolFolds = Array.from(document.querySelectorAll("details.tool-fold"));
+    document.getElementById("expand-tools").addEventListener("click", () => {{
+      toolFolds.forEach((el) => (el.open = true));
+    }});
+    document.getElementById("collapse-tools").addEventListener("click", () => {{
+      toolFolds.forEach((el) => (el.open = false));
+    }});
+  </script>
+</body>
+</html>
+"""
+
+
+def generate_html(
+    input_path: Path,
+    output_path: Path,
+    collapse_prefixes: list[str],
+    collapse_keywords: list[str],
+    collapse_all: bool = False,
+) -> None:
+    """Read a run-log file, render it, and write the HTML to *output_path*."""
+    content = input_path.read_text(encoding="utf-8")
+    tree = parse_log(content)
+    body = render_node(
+        tree,
+        collapse_prefixes=collapse_prefixes,
+        collapse_keywords=collapse_keywords,
+        collapse_all=collapse_all,
+    )
+    html_content = build_html(body=body, source_name=input_path.name)
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+    output_path.write_text(html_content, encoding="utf-8")
+
+
+def main() -> None:
+    """Resolve the input path, pick the newest log file, and emit HTML."""
+    input_base = resolve_config_path(INPUT_LOG_PATH)
+    if input_base.is_file():
+        input_path = input_base
+    elif input_base.is_dir():
+        # Prefer rendering the newest run_log_*.txt, then fall back to any *.txt.
+        candidates = sorted(
+            input_base.glob("run_log_*.txt"),
+            key=lambda p: p.stat().st_mtime,
+            reverse=True,
+        )
+        if not candidates:
+            candidates = sorted(
+                input_base.glob("*.txt"),
+                key=lambda p: p.stat().st_mtime,
+                reverse=True,
+            )
+        if not candidates:
+            raise FileNotFoundError(f"目录下未找到可渲染日志文件: {input_base}")
+        input_path = candidates[0]
+    else:
+        raise FileNotFoundError(f"输入日志路径不存在: {input_base}")
+
+    if OUTPUT_HTML_PATH:
+        output_path = resolve_config_path(OUTPUT_HTML_PATH)
+    else:
+        output_path = input_path.with_suffix(".html")
+
+    generate_html(
+        input_path=input_path,
+        output_path=output_path,
+        collapse_prefixes=COLLAPSE_PREFIXES,
+        collapse_keywords=COLLAPSE_KEYWORDS,
+        collapse_all=COLLAPSE_ALL_FOLDS,
+    )
+    print(f"HTML 已生成: {output_path}")
+
+
+if __name__ == "__main__":
+    main()

+ 10 - 0
examples/content_finder/skills/content_finding_strategy.md

@@ -21,9 +21,19 @@ description: 内容搜索方法论
 
 **数量控制**:只搜索 **N = M × 2** 条,搜到后立即停止,不超出此限制。
 
+### 备选:历史优质作者扩展(备选策略)
+
+当关键词搜索结果质量不稳定、或需要更贴近目标人群的内容时,可走“作者→作品”的扩展路径:
+
+- 先调用 `find_authors_from_db(query)`:从数据库历史沉淀中按搜索词找到相关优质作者(返回 `author_sec_uid`)
+- 再对 Top 作者调用 `douyin_user_videos(account_id=author_sec_uid)` 拉作品,作为候选池补充
+
+**仍需遵守数量控制**:作者扩展拿到的作品也计入候选数量,总量不要超过 **N = M × 2**。
+
 **数据读取规则**:
 - 搜索结果从 `metadata.search_results` 获取,**不要解析工具的 output 文本**
 - 账号作品从 `metadata.user_videos` 获取
+- 数据库作者从 `find_authors_from_db` 的 `metadata.authors` 获取(优先使用其中的 `author_sec_uid`)
 
 **分页策略**:第一次使用默认 cursor(`"0"` 或 `""`),需要更多时使用返回的 cursor 继续获取。
 

+ 4 - 0
examples/content_finder/tools/__init__.py

@@ -7,6 +7,8 @@ from .douyin_user_videos import douyin_user_videos
 from .hotspot_profile import get_content_fans_portrait, get_account_fans_portrait
 from .store_results_mysql import store_results_mysql
 from .aigc_platform_api import create_crawler_plan_by_douyin_content_id, create_crawler_plan_by_douyin_account_id
+from .think_and_plan import think_and_plan
+from .find_authors_from_db import find_authors_from_db
 
 __all__ = [
     "douyin_search",
@@ -16,4 +18,6 @@ __all__ = [
     "store_results_mysql",
     "create_crawler_plan_by_douyin_content_id",
     "create_crawler_plan_by_douyin_account_id",
+    "think_and_plan",
+    "find_authors_from_db",
 ]

+ 104 - 0
examples/content_finder/tools/find_authors_from_db.py

@@ -0,0 +1,104 @@
+"""
+Look up high-quality authors accumulated by past tasks (demand_find_author)
+in the database, keyed by a search query.
+
+Usage:
+- First use this tool to find related authors (sec_uid / profile links)
+- Then call douyin_user_videos(account_id=sec_uid) to fetch their works for
+  a second-pass screening
+"""
+
+from __future__ import annotations
+
+import re
+from typing import Any, Dict, List, Optional
+
+from agent.tools import ToolResult, tool
+
+from db import get_connection
+
+
+# Matches a douyin.com user profile URL and captures its sec_uid segment.
+_DOUYIN_USER_URL_RE = re.compile(r"^https?://www\.douyin\.com/user/(?P<sec_uid>[^/?#]+)")
+
+
+def _extract_sec_uid(author_link: str) -> str:
+    """Extract the sec_uid segment from a douyin.com user URL ("" if no match)."""
+    if not author_link:
+        return ""
+    m = _DOUYIN_USER_URL_RE.match(author_link.strip())
+    return m.group("sec_uid") if m else ""
+
+
+def _query_authors(conn, query: str, limit: int) -> List[Dict[str, Any]]:
+    """Fetch up to *limit* author rows whose historical query matches *query*.
+
+    NOTE(review): ``dict(r)`` assumes the connection yields mapping rows
+    (e.g. a DictCursor) — confirm against db.get_connection().
+    """
+    q = (query or "").strip()
+    if not q:
+        return []
+
+    # demand_find_author does not store the query itself; join through
+    # trace_id to demand_find_content_result.query instead.
+    sql = """
+    SELECT DISTINCT
+      a.author_name,
+      a.author_link,
+      a.elderly_ratio,
+      a.elderly_tgi,
+      a.remark,
+      a.trace_id
+    FROM demand_find_author a
+    INNER JOIN demand_find_content_result r
+      ON r.trace_id = a.trace_id
+    WHERE r.query LIKE %s
+    ORDER BY a.elderly_ratio DESC, a.elderly_tgi DESC
+    LIMIT %s
+    """
+    like = f"%{q}%"
+    with conn.cursor() as cur:
+        # Parameterized query: LIKE pattern and LIMIT are bound, not interpolated.
+        cur.execute(sql, (like, int(limit)))
+        rows = cur.fetchall() or []
+        return [dict(r) for r in rows]
+
+
+@tool(description="从 demand_find_author 中按搜索词查找相关作者")
+async def find_authors_from_db(query: str, limit: int = 20) -> ToolResult:
+    """Search the database for authors related to a query.
+
+    Args:
+        query: Search term (fuzzy-matched against historical demand_find_content_result.query)
+        limit: Maximum number of authors to return
+    """
+    conn = get_connection()
+    try:
+        rows = _query_authors(conn, query=query, limit=limit)
+    finally:
+        # Always release the connection, even when the query raises.
+        conn.close()
+
+    authors: List[Dict[str, Any]] = []
+    for r in rows:
+        author_link = r.get("author_link") or ""
+        authors.append(
+            {
+                "author_nickname": r.get("author_name") or "",
+                "author_url": author_link,
+                "author_sec_uid": _extract_sec_uid(author_link),
+                "age_50_plus_ratio": r.get("elderly_ratio") or "",
+                "age_50_plus_tgi": r.get("elderly_tgi") or "",
+                "remark": r.get("remark") or "",
+                "trace_id": r.get("trace_id") or "",
+            }
+        )
+
+    # Human-readable summary for the model; structured data goes in metadata.
+    lines = [f"按搜索词「{query}」在数据库中找到 {len(authors)} 个相关作者:", ""]
+    for i, a in enumerate(authors, 1):
+        lines.append(f"{i}. {a['author_nickname']}")
+        if a["author_sec_uid"]:
+            lines.append(f"   sec_uid: {a['author_sec_uid']}")
+        if a["author_url"]:
+            lines.append(f"   链接: {a['author_url']}")
+        if a["age_50_plus_ratio"] != "" or a["age_50_plus_tgi"] != "":
+            lines.append(f"   画像: 50+占比={a['age_50_plus_ratio']} | TGI={a['age_50_plus_tgi']}")
+        if a["remark"]:
+            lines.append(f"   备注: {a['remark']}")
+        lines.append("")
+
+    return ToolResult(
+        title="数据库作者检索",
+        output="\n".join(lines).strip(),
+        metadata={"authors": authors, "query": query, "limit": limit},
+        long_term_memory=f"DB author search for '{query}', found {len(authors)} authors",
+    )
+

+ 46 - 0
examples/content_finder/tools/think_and_plan.py

@@ -0,0 +1,46 @@
+import json
+
+from agent.tools import tool
+from utils.log_capture import log, log_fold
+
+
+def _log_tool_call(tool_name: str, params: dict, result: str) -> None:
+    """Emit a tool call's parameters and result as nested fold blocks
+    (routed through log() into the build_log buffer)."""
+    with log_fold(f"🔧 {tool_name}"):
+        with log_fold("📥 调用参数"):
+            log(json.dumps(params, ensure_ascii=False, indent=2))
+        with log_fold("📤 返回内容"):
+            log(result)
+
+
+@tool(
+    description="系统化思考与规划工具。不会获取新信息或更改数据库,只用于记录思考过程。",
+)
+def think_and_plan(thought: str, thought_number: int, action: str, plan: str) -> str:
+    """Systematic thinking-and-planning tool for staging thoughts, plans and
+    action steps on complex topic-building tasks. It fetches no new
+    information and changes no database state; it only appends the idea to
+    memory.
+
+    Args:
+        thought: The current thought: analysis, hypothesis, insight, reflection, or a summary of the previous step.
+        thought_number: Sequence number of this thinking step, used to track and revisit the process.
+        action: The next action step suggested by the current thought and plan.
+        plan: The plan or approach drafted for the current task.
+
+    Returns:
+        A string describing the thought, plan, and action steps.
+    """
+    params = {
+        "thought": thought,
+        "thought_number": thought_number,
+        "action": action,
+        "plan": plan,
+    }
+
+    result = (
+        f"[思考 #{thought_number}]\n"
+        f"思考: {thought}\n"
+        f"计划: {plan}\n"
+        f"下一步: {action}\n"
+        f"(此工具仅用于记录思考过程,不会修改任何数据)"
+    )
+    _log_tool_call("think_and_plan", params, result)
+    return result

+ 134 - 0
examples/content_finder/utils/log_capture.py

@@ -0,0 +1,134 @@
+"""
+Tee-style log capture utilities.
+
+Supports multiple agents running concurrently:
+  - Each agent registers its own log buffer via build_log(build_id)
+  - log() routes writes to the current agent's buffer via contextvars
+  - Output also goes to the real stdout; sys.stdout is never hijacked
+"""
+import io
+import sys
+import contextvars
+import threading
+from contextlib import contextmanager
+
+# build_id bound to the current agent execution (propagated across
+# asyncio.to_thread via contextvars). NOTE(review): annotated as int, but
+# callers also pass str UUIDs — any hashable id works in practice.
+_current_build_id: contextvars.ContextVar[int | None] = contextvars.ContextVar(
+    'log_build_id', default=None
+)
+
+# Global registry mapping build_id -> StringIO buffer (guarded by the lock).
+_buffers: dict[int, io.StringIO] = {}
+_buffers_lock = threading.Lock()
+
+# Keep the real stdout (value at process start; never overwritten here).
+_real_stdout = sys.stdout
+
+
+def log(*args, **kwargs):
+    """Concurrency-safe replacement for print().
+
+    Writes to stdout and, when inside an agent context, also to that
+    agent's log buffer; outside any context it behaves like plain print().
+    """
+    # 1. Always write to the real stdout.
+    print(*args, file=_real_stdout, **kwargs)
+
+    # 2. Inside an agent context, additionally write to its buffer.
+    build_id = _current_build_id.get()
+    if build_id is not None:
+        buf = _buffers.get(build_id)
+        if buf is not None:
+            print(*args, file=buf, **kwargs)
+
+
+@contextmanager
+def build_log(build_id: int):
+    """Log context manager for one agent execution.
+
+    Usage:
+        with build_log(build_id):
+            log("this line is captured in the buffer")
+            ...
+        # leaving the with-block only cleans up the in-memory buffer
+    """
+    buf = io.StringIO()
+    token = _current_build_id.set(build_id)
+
+    with _buffers_lock:
+        _buffers[build_id] = buf
+
+    try:
+        yield buf
+    finally:
+        # Cleanup: unregister the buffer, restore the previous context var
+        # value, and close the buffer (it is unusable after this point).
+        with _buffers_lock:
+            _buffers.pop(build_id, None)
+        _current_build_id.reset(token)
+        buf.close()
+
+
+@contextmanager
+def log_fold(label: str):
+    """Emit a collapsible [FOLD:label] ... [/FOLD] block around the body;
+    the closing marker is written even if the body raises."""
+    log(f"[FOLD:{label}]")
+    try:
+        yield
+    finally:
+        log("[/FOLD]")
+
+
+def get_log_content(build_id: int) -> str | None:
+    """Return the log collected so far for *build_id* (for live inspection)."""
+    buf = _buffers.get(build_id)
+    return buf.getvalue() if buf else None
+
+
+def _save_to_db(build_id: int, content: str) -> bool:
+    """Legacy-compat shim: DB persistence is disabled; always returns False."""
+    return False
+
+
+# ============================================================================
+# 兼容旧接口 — TeeStream(仅供单线程场景使用,如 run_build_topic_agent.py)
+# ============================================================================
+
+class TeeStream(io.TextIOBase):
+    """Tee output stream: writes to the original stdout and an internal buffer.
+
+    ⚠️ For single-process, single-agent use only (e.g. command-line runs);
+    use build_log() in concurrent scenarios.
+    """
+
+    def __init__(self, original_stdout):
+        super().__init__()
+        self.original_stdout = original_stdout
+        self._buffer = io.StringIO()
+
+    def write(self, s):
+        # Mirror every non-empty write to both sinks; report chars written.
+        if s:
+            self.original_stdout.write(s)
+            self._buffer.write(s)
+        return len(s) if s else 0
+
+    def flush(self):
+        self.original_stdout.flush()
+        self._buffer.flush()
+
+    def get_log(self) -> str:
+        """Return everything captured in the internal buffer so far."""
+        return self._buffer.getvalue()
+
+    def save_to_db(self, build_id: int) -> bool:
+        # Legacy-compat: DB persistence is disabled; always False.
+        return False
+
+    @property
+    def encoding(self):
+        # Delegate so callers that inspect .encoding see the real stream's.
+        return self.original_stdout.encoding
+
+    def isatty(self):
+        return False
+
+    def readable(self):
+        return False
+
+    def writable(self):
+        return True