Talegorithm пре 1 месец
родитељ
комит
eb53b80ddf

+ 9 - 4
agent/README.md

@@ -261,12 +261,17 @@ examples/research/
 | `extract_video_clip` | YouTube 视频片段截取 |
 | `import_content` | 批量导入文章到 CMS |
 
-### knowledge — 知识管理(2)
+### 远端 Agent — 统一通过 `agent` 工具
 
-| 工具 | 说明 |
+知识查询、知识上传、深度调研等远端服务**不再作为独立工具**暴露,全部通过统一的 `agent` 工具调用。Librarian 一个 Agent 多种模式,通过 `skills` 参数切换:
+
+| 用法 | 说明 |
 |------|------|
-| `ask_knowledge` | 向知识库查询(通过 Librarian Agent) |
-| `upload_knowledge` | 上传调研结果到知识库 |
+| `agent(agent_type="remote_librarian", task=..., skills=["ask_strategy"])` | 知识查询,返回带引用的整合回答 |
+| `agent(agent_type="remote_librarian", task=json.dumps({...}), skills=["upload_strategy"])` | 知识上传(task 为 JSON 字符串) |
+| `agent(agent_type="remote_research", task=...)` | 深度调研 |
+
+详见 [tools.md § Agent 工具](./docs/tools.md#agent-工具)。
 
 ### toolhub — 远程工具库(3)
 

+ 16 - 4
agent/core/runner.py

@@ -105,6 +105,7 @@ class RunConfig:
     max_iterations: int = 200
     tools: Optional[List[str]] = None          # None = 按 tool_groups 过滤;显式列表 = 精确指定
     tool_groups: Optional[List[str]] = field(default_factory=lambda: ["core"])  # 工具分组白名单;默认仅 core,项目按需追加
+    exclude_tools: List[str] = field(default_factory=list)  # 从 tools / tool_groups 结果中再排除的工具名(如远程 agent 禁用 agent/evaluate)
     side_branch_max_turns: int = 5             # 侧分支最大轮次(压缩/反思)
     goal_compression: Literal["none", "on_complete", "on_overflow"] = "on_overflow"  # Goal 压缩模式
 
@@ -320,6 +321,7 @@ class AgentRunner:
         messages: List[Dict],
         config: Optional[RunConfig] = None,
         on_event: Optional[Callable] = None,
+        inject_skills: Optional[List[str]] = None,
     ) -> Dict[str, Any]:
         """
         结果模式 — 消费 run(),返回结构化结果。
@@ -328,11 +330,12 @@ class AgentRunner:
 
         Args:
             on_event: 可选回调,每个 Trace/Message 事件触发一次,用于实时输出子 Agent 执行过程。
+            inject_skills: 本次调用需要指定注入的 skill 列表(透传给 run())。
         """
         last_assistant_text = ""
         final_trace: Optional[Trace] = None
 
-        async for item in self.run(messages=messages, config=config):
+        async for item in self.run(messages=messages, config=config, inject_skills=inject_skills):
             if on_event:
                 on_event(item)
             if isinstance(item, Message) and item.role == "assistant":
@@ -471,7 +474,7 @@ class AgentRunner:
         task_name = config.name or await self._generate_task_name(messages)
 
         # 准备工具 Schema
-        tool_schemas = self._get_tool_schemas(config.tools, config.tool_groups)
+        tool_schemas = self._get_tool_schemas(config.tools, config.tool_groups, config.exclude_tools)
 
         trace_obj = Trace(
             trace_id=trace_id,
@@ -1005,7 +1008,7 @@ class AgentRunner:
     ) -> AsyncIterator[Union[Trace, Message]]:
         """ReAct 循环"""
         trace_id = trace.trace_id
-        tool_schemas = self._get_tool_schemas(config.tools, config.tool_groups)
+        tool_schemas = self._get_tool_schemas(config.tools, config.tool_groups, config.exclude_tools)
 
         # 当前主路径头节点的 sequence(用于设置 parent_sequence)
         head_seq = trace.head_sequence
@@ -2757,7 +2760,12 @@ class AgentRunner:
         )
         return messages
 
-    def _get_tool_schemas(self, tools: Optional[List[str]] = None, tool_groups: Optional[List[str]] = None) -> List[Dict]:
+    def _get_tool_schemas(
+        self,
+        tools: Optional[List[str]] = None,
+        tool_groups: Optional[List[str]] = None,
+        exclude_tools: Optional[List[str]] = None,
+    ) -> List[Dict]:
         """
         获取工具 Schema
 
@@ -2765,6 +2773,8 @@ class AgentRunner:
         - tool_groups 非空: 按分组白名单过滤得到基础工具集
         - tools 非空: 追加指定的工具名(与 tool_groups 结果取并集)
         - 两者都为 None: 返回所有已注册工具
+
+        最后再用 exclude_tools 减去禁用的工具(如远程 agent 禁止 agent/evaluate)。
         """
         if tool_groups is not None:
             tool_names = set(self.tools.get_tool_names(groups=tool_groups))
@@ -2772,6 +2782,8 @@ class AgentRunner:
             tool_names = set(self.tools.get_tool_names())
         if tools is not None:
             tool_names |= set(tools)
+        if exclude_tools:
+            tool_names -= set(exclude_tools)
         return self.tools.get_schemas(list(tool_names))
 
     # 默认 system prompt 前缀(当 config.system_prompt 和前端都未提供 system message 时使用)

+ 33 - 0
agent/docs/decisions.md

@@ -1336,4 +1336,37 @@ context = {
 
 **实现**:`agent/core/runner.py:_agent_loop`, `agent/trace/models.py:Message`, `agent/trace/compaction.py`
 
+---
+
+## Decision 25: Agent 工具统一化 — `remote_` 前缀路由
+
+**日期**:2026-04-14
+
+**背景**:原先远端 Agent(Librarian、Research)通过各自独立的客户端工具(`ask_knowledge`、`ask_research`)调用,和本地 `agent` 工具签名不同;每加一个远端 Agent 要写一个新工具,模型要记忆多套调用约定。
+
+**决策**:
+1. 合并:所有远端服务(查询 / 上传 / 调研)走同一个 `agent` 工具,同一个服务器端点 `/api/agent`。
+2. 路由:`agent_type` 带 `remote_` 前缀 → HTTP 调用;否则本地执行。
+3. 全部同步:服务器处理完成后才返回结果。不保留原来 upload 的 202 异步语义——upload 数据只是 JSON,等同于一条 message,没必要特殊化。
+4. 完全删除客户端 `ask_knowledge` 和 `upload_knowledge` 工具,以及 `/api/knowledge/ask`、`/api/knowledge/research`、`/api/knowledge/upload` 端点。
+5. **一个 agent 多种模式**:Librarian 用 `skills` 参数切换模式(`ask_strategy` 查询 / `upload_strategy` 上传),**不**为每种模式拆独立 `agent_type`。这比依赖 task 内容或动态触发更显式、更可靠。
+6. **Skill 白名单机制**:远端每个 `agent_type` 定义 `ALLOWED_SKILLS`,调用方传的 skill 经白名单过滤。其他配置(`tools`/`model`/prompt)仍然纯服务器端。
+7. IO 契约:不发明 per-agent-type schema——Agent 之间通过 message 交流,结构化信息由 Agent 的 prompt 约定写进 message 文本,caller 自己 parse。
+8. 续跑:服务器不维护 `caller_trace_id → sub_trace_id` 映射;caller 显式传 `continue_from`。
+9. CLI:`python -m agent.tools.builtin.subagent --agent_type=remote_xxx --task=... [--skills=a,b]` 让 Claude Code 可通过 skill 调用。
+10. 远端 Agent 的三条安全约束,每个 handler 直接在自己的 `RunConfig` 里配置(不搞抽象 helper):
+    - 禁止调用 `agent` / `evaluate`(防递归)——用 `tools=[...]` 精确列表 或 `exclude_tools=["agent","evaluate"]`
+    - 关闭自动知识提取 / 复盘(`enable_extraction` / `enable_completion_extraction` = False)
+    - 关闭自动知识注入(`enable_injection` = False,否则远端服务器会回调自己)
+    配合的底层机制:`RunConfig.exclude_tools: List[str]` 字段(重新引入),在 tool_groups / tools 基础上再扣减——一个通用字段,不只为远端场景。
+
+**理由**:
+- 加远端 Agent = 服务器注册 `agent_type`,客户端零改动
+- 服务器端无状态化(去掉 trace_map)
+- `remote_` 前缀让模型知道通信通道限制(不能用本地文件路径)
+
+**可用 agent_type 不做动态发现**:项目通过 prompt 显式告诉主 Agent 可用的远端类型,避免启动时依赖服务器。
+
+**实现**:`agent/tools/builtin/subagent.py`(路由)、`knowhub/server.py::agent_api`(`/api/agent`)、`knowhub/agents/librarian.py` / `research.py`(去 trace_map)。文档:`agent/docs/tools.md § Agent 工具`、`knowhub/docs/remote-agents.md`、`knowhub/docs/api.md`。
+
 ---

+ 78 - 33
agent/docs/tools.md

@@ -859,7 +859,7 @@ RunConfig(tools=["knowledge_search", "read_file"]) # 精确指定(优先于 to
 | `bash_command` | 执行 shell 命令 | opencode bash.ts |
 | `glob_files` | 文件模式匹配 | opencode glob.ts |
 | `grep_content` | 内容搜索(正则表达式) | opencode grep.ts |
-| `agent` | 创建 Agent 执行任务(单任务 delegate / 多任务并行 explore) | 自研 |
+| `agent` | 创建子 Agent 执行任务(本地执行或路由到远端服务器,由 `agent_type` 决定) | 自研 |
 | `evaluate` | 评估目标执行结果是否满足要求 | 自研 |
 | `toolhub_health` | 检查 ToolHub 远程工具库服务状态 | 自研 |
 | `toolhub_search` | 搜索/发现 ToolHub 远程工具 | 自研 |
@@ -870,8 +870,6 @@ RunConfig(tools=["knowledge_search", "read_file"]) # 精确指定(优先于 to
 | `content_suggest` | 搜索关键词补全建议 | 自研 |
 | `extract_video_clip` | 截取已下载 YouTube 视频的片段 | 自研 |
 | `import_content` | 批量导入文章到 CMS | 自研 |
-| `ask_knowledge` | 向知识库查询信息(通过 KnowHub Librarian) | 自研 |
-| `upload_knowledge` | 上传调研结果到知识库 | 自研 |
 
 #### `read_file` vs `read_images`
 
@@ -903,15 +901,10 @@ RunConfig(tools=["knowledge_search", "read_file"]) # 精确指定(优先于 to
 
 ### Agent 工具
 
-创建子 Agent 执行任务。通过 `task` 参数的类型自动区分模式:
-
-| task 类型 | 模式 | 并行执行 | 工具权限 |
-|-----------|------|---------|---------|
-| `str`(单任务) | delegate | ❌ | 完整(除 agent/evaluate 外) |
-| `List[str]`(多任务) | explore | ✅ | 只读(read_file, grep_content, glob_files, goal) |
+创建子 Agent 执行任务。同一个 `agent` 工具既可以启动**本地**子 Agent,也可以路由到**远端服务器**上的 Agent,由 `agent_type` 前缀决定。
 
 ```python
-@tool(description="创建 Agent 执行任务")
+@tool(description="创建子 Agent 执行任务", groups=["core"])
 async def agent(
     task: Union[str, List[str]],
     messages: Optional[Union[Messages, List[Messages]]] = None,
@@ -922,13 +915,34 @@ async def agent(
 ) -> Dict[str, Any]:
 ```
 
-**参数说明**:
-- `task`: 任务描述(字符串=单任务,列表=多任务并行)
-- `messages`: 预置消息(None/1D 列表/2D 列表)
-- `continue_from`: 继续已有 trace(仅单任务)
-- `agent_type`: 子 Agent 类型,决定 preset 和默认 skills(如 "tool_research")
-- `skills`: 附加到 system prompt 的 skill 名称列表,覆盖 preset 默认值
-- `context`: 框架自动注入的上下文
+#### 本地 vs 远端:`remote_` 前缀约定
+
+| `agent_type` | 执行位置 | 通信通道 | 典型用途 |
+|--------------|---------|---------|---------|
+| 无前缀(`delegate` / `explore` / `deconstruct` 等) | **本地** 进程内 | 共享文件系统 + message | 项目内委托任务,可通过文件路径传递大数据 |
+| `remote_` 前缀(`remote_librarian` / `remote_research` 等) | **远端** KnowHub 服务器 | **仅 message 通道** | 跨项目复用的能力(知识查询、深度调研) |
+
+**为什么要前缀**:远端 Agent 无法访问调用方的本地文件。前缀告诉模型"不要在 task 里引用本地路径,所有输入必须 inline;所有输出在 response message 里拿"。要传大文件时,先通过 `upload_knowledge` / `toolhub` 等基础设施上传到共享存储,再把 ID 传给远端 Agent。
+
+**哪些远端类型可用**:不通过动态发现,由项目的 prompt 或 preset 显式告诉主 Agent。例如:
+```
+# prompt 片段
+你可以调用以下远端 Agent:
+- remote_librarian(skills=["ask_strategy"]): 整合知识库查询结果,返回带引用的回答
+- remote_librarian(skills=["upload_strategy"]): 上传调研结果(task 为 JSON 字符串)
+- remote_research: 深度调研,全网搜集+总结
+```
+
+**Skill 白名单**:远端 `agent_type` 由服务器定义一个允许调用方注入的 skill 列表(如 `remote_librarian` 的 `["ask_strategy", "upload_strategy"]`)。调用方传的 skill 经白名单过滤后生效——这比拆出多个 agent_type 更简洁:**一个 Agent 多种模式,模式由 skill 触发**。
+
+#### 本地模式:单任务 vs 多任务
+
+本地调用(`agent_type` 无 `remote_` 前缀)根据 `task` 类型分两种模式:
+
+| task 类型 | 模式 | 并行执行 | 工具权限 |
+|-----------|------|---------|---------|
+| `str`(单任务) | delegate | ❌ | 完整(除 agent/evaluate 外) |
+| `List[str]`(多任务) | explore | ✅ | 只读(read_file, grep_content, glob_files, goal) |
 
 **messages 参数**:
 - `None`:无预置消息
@@ -937,22 +951,53 @@ async def agent(
 
 运行时判断:`messages[0]` 是 dict → 1D 共享;是 list → 2D per-agent。
 
-**agent_type 与 Presets**:
-- 通过 `agent_type` 参数指定预定义的 Agent 配置(工具权限、system prompt、skills 等)
-- 项目可在 `presets.json` 中定义自定义 preset,支持从 `.prompt` 文件加载 system prompt
-- 详见 `agent/docs/architecture.md` 的 "Agent 预设" 章节和 `examples/production/` 示例
-
-**单任务(delegate)**:
-- 适合委托专门任务(如代码分析、文档生成)
-- 完整工具权限,可执行复杂操作
-- 支持 `continue_from` 参数续跑已有 Sub-Trace
-
-**多任务(explore)**:
-- 适合对比多个方案(如技术选型、架构设计)
-- 使用 `asyncio.gather()` 并行执行,显著提升效率
-- 每个任务创建独立的 Sub-Trace,互不干扰
-- 只读权限(文件系统层面),可使用 goal 工具管理计划
-- 不支持 `continue_from`
+**单任务(delegate)**:适合委托专门任务,完整工具权限;支持 `continue_from` 续跑已有 Sub-Trace。
+**多任务(explore)**:适合对比多个方案,并行执行,只读权限,不支持 `continue_from`。
+
+#### 远端模式
+
+`remote_` 前缀的 `agent_type` 通过 HTTP 调用 KnowHub 服务器的 `POST /api/agent` 端点执行。
+
+| 字段 | 客户端传 | 服务器说了算 |
+|------|---------|-------------|
+| `agent_type` / `task` / `messages` / `continue_from` | ✓ | — |
+| `skills` / `tool_groups` / `model` / prompt | 传了也会被服务器忽略 | ✓(由服务器 preset 决定) |
+
+**理由**:固定服务器端的能力包络避免越权(如客户端请求 `tool_groups=["knowledge_internal"]`),同时让 Agent 升级变成服务器单方面部署。
+
+**限制**:
+- 远端模式**只支持单任务**(`task: str`)。需要并行时在客户端 fan-out 发多个请求。
+- 不支持跨终端文件共享——输入输出全走 message。
+
+**续跑**:完全由 caller 负责记住和传入 `continue_from`(服务器不再维护 `caller_trace_id → sub_trace_id` 的映射)。首次调用不传 `continue_from`,服务器创建新 trace 并在返回值中给出 `sub_trace_id`;下次调用时 caller 把它作为 `continue_from` 传回。
+
+**返回值**:本地和远端统一返回 `{status, sub_trace_id, summary, stats}`。`summary` 是 Agent 最终产出的 message 文本——Agent 之间通过 message 通信,不定义 per-agent 的返回 schema。如果需要结构化输出(引用来源、ID 列表等),由 Agent 的 prompt 约定写进 message 文本,调用方自行 parse。
+
+#### CLI 使用
+
+`agent` 工具支持命令行调用,主要用于远端 Agent(本地 Agent 需要父 trace 上下文,不适合 CLI)。实现:`agent/tools/builtin/subagent.py` 的 `__main__` 入口。
+
+```bash
+# 首次调用
+python -m agent.tools.builtin.subagent \
+    --agent_type=remote_librarian \
+    --task="ControlNet 相关的工具知识"
+# 返回 JSON,包含 sub_trace_id
+
+# 续跑
+python -m agent.tools.builtin.subagent \
+    --agent_type=remote_librarian \
+    --task="再补充 IP-Adapter 的" \
+    --continue_from=<sub_trace_id>
+```
+
+基于此 CLI,项目在 `~/.claude/skills/` 或项目 skills 目录可以注册 user skill,让 Claude Code 也能通过 skill 调用远端 Agent——模式和 `toolhub` / `knowhub` skill 一致。
+
+#### `agent_type` 与 Presets
+
+- 本地 `agent_type`:在项目 `presets.json` 中定义(工具权限、system prompt、skills 等),支持从 `.prompt` 文件加载 system prompt
+- 远端 `agent_type`:在**服务器** `knowhub/agents/` 下定义(如 `knowhub/agents/research.py`),客户端 presets 不需要配置
+- 详见 `agent/docs/architecture.md` 的 "Agent 预设" 章节
 
 ### Evaluate 工具
 

+ 5 - 4
agent/tools/builtin/__init__.py

@@ -18,7 +18,10 @@ from agent.tools.builtin.skill import skill, list_skills
 from agent.tools.builtin.subagent import agent, evaluate
 # sandbox 工具已废弃(2026-04);search.py / crawler.py 已重构为 content/ 工具族(2026-04)
 from agent.tools.builtin.knowledge import(knowledge_search,knowledge_save,knowledge_list,knowledge_update,knowledge_batch_update,knowledge_slim)
-from agent.tools.builtin.librarian import ask_knowledge, upload_knowledge
+# 知识上传/查询已统一到 agent 工具:
+#   agent(agent_type="remote_librarian", task=...)         # 查询
+#   agent(agent_type="remote_librarian_ingest", task=...)  # 上传(异步)
+#   agent(agent_type="remote_research", task=...)          # 深度调研
 from agent.tools.builtin.context import get_current_context
 from agent.tools.builtin.toolhub import toolhub_health, toolhub_search, toolhub_call
 from agent.tools.builtin.resource import resource_list_tools, resource_get_tool
@@ -44,9 +47,7 @@ __all__ = [
     # 系统工具
     "bash_command",
     "skill",
-    # 知识管理(新架构 - 通过 IM 与 Knowledge Manager 交互)
-    "ask_knowledge",
-    "upload_knowledge",
+    # 知识管理:统一通过 agent(agent_type="remote_librarian" / "remote_librarian_ingest" / "remote_research")
     # 知识管理(旧架构 - 直接 HTTP API,仅供 Knowledge Manager 内部使用)
     # "knowledge_search",
     # "knowledge_save",

+ 0 - 236
agent/tools/builtin/librarian.py

@@ -1,236 +0,0 @@
-"""
-Knowledge Manager 工具 - 通过 HTTP API 与 KnowHub 交互
-
-提供两个工具:
-- ask_knowledge: 查询知识库(同步阻塞,等待 Librarian Agent 整合回答)
-- upload_knowledge: 上传调研结果(异步,校验后立即返回)
-
-通过 KnowHub HTTP API 调用,不依赖 IM。
-"""
-
-import os
-import json
-import logging
-from typing import Optional, Dict, Any
-import httpx
-from agent.tools import tool, ToolResult, ToolContext
-
-logger = logging.getLogger(__name__)
-
-KNOWHUB_API = os.getenv("KNOWHUB_API", "http://localhost:9999").rstrip("/")
-
-
-@tool(
-    hidden_params=["context"],
-    inject_params={
-        "trace_id": {"mode": "default", "key": "trace_id"},
-    },
-    groups=["knowledge"],
-)
-async def ask_knowledge(
-    query: str,
-    trace_id: str = "",
-    context: Optional[ToolContext] = None,
-) -> ToolResult:
-    """
-    向知识库查询信息(同步阻塞,等待整合回答)
-
-    KnowHub 内部使用 Librarian Agent 整合检索结果,返回带引用的回答。
-    同一 trace_id 的多次查询复用同一个 Librarian Agent,积累任务理解。
-
-    Args:
-        query: 查询内容(如:"ControlNet 相关的工具知识")
-        trace_id: 调用方的 trace_id,用于 Librarian Agent 续跑
-        context: 工具上下文
-
-    Returns:
-        整合回答 + source_ids + 各 source 摘要
-    """
-    try:
-        async with httpx.AsyncClient(timeout=300.0) as client:
-            response = await client.post(
-                f"{KNOWHUB_API}/api/knowledge/ask",
-                json={
-                    "query": query,
-                    "trace_id": trace_id,
-                }
-            )
-            response.raise_for_status()
-            result = response.json()
-
-        source_ids = result.get("source_ids", [])
-        sources = result.get("sources", [])
-        resp_text = result.get("response", "")
-
-        return ToolResult(
-            title=f"📚 知识库查询结果({len(source_ids)} 条来源)",
-            output=resp_text,
-            metadata={
-                "source_ids": source_ids,
-                "sources": sources,
-            }
-        )
-
-    except httpx.HTTPStatusError as e:
-        # ask 端点不可用时降级到直接搜索
-        if e.response.status_code == 404:
-            logger.warning("ask 端点不可用,降级到 knowledge_search")
-            from agent.tools.builtin.knowledge import knowledge_search
-            fallback = await knowledge_search(query=query, top_k=5, min_score=3)
-            return ToolResult(
-                title="📚 知识库查询结果(直连)",
-                output=fallback.output,
-                metadata={"source": "fallback", "raw": fallback.metadata}
-            )
-        raise
-
-    except Exception as e:
-        logger.error(f"查询知识库失败: {e}")
-        # 网络错误也降级
-        logger.warning("ask 请求失败,降级到 knowledge_search")
-        try:
-            from agent.tools.builtin.knowledge import knowledge_search
-            fallback = await knowledge_search(query=query, top_k=5, min_score=3)
-            return ToolResult(
-                title="📚 知识库查询结果(直连)",
-                output=fallback.output,
-                metadata={"source": "fallback", "raw": fallback.metadata}
-            )
-        except Exception as e2:
-            return ToolResult(
-                title="❌ 查询失败",
-                output=f"错误: {str(e)}(降级也失败: {str(e2)})",
-                error=str(e)
-            )
-
-
-@tool(
-    hidden_params=["context"],
-    inject_params={
-        "trace_id": {"mode": "default", "key": "trace_id"},
-    },
-    groups=["knowledge"],
-)
-async def upload_knowledge(
-    data: Dict[str, Any],
-    source_type: str = "research",
-    finalize: bool = False,
-    trace_id: str = "",
-    context: Optional[ToolContext] = None,
-) -> ToolResult:
-    """
-    上传调研结果或执行经验到知识库(异步,校验后立即返回)
-
-    KnowHub 校验格式后立即返回,后台队列处理去重和入库。
-
-    Args:
-        data: 结构化数据,包含:
-            - tools: 工具列表
-            - resources: 资源列表
-            - knowledge: 知识列表
-        source_type: 数据来源分类。调研结果填 "research",执行经验填 "execution"。
-        finalize: 是否最终提交(True=入库,False=仅缓冲)
-        trace_id: 调用方的 trace_id
-        context: 工具上下文
-
-    Returns:
-        上传确认(立即返回,不等待处理完成)
-    """
-    try:
-        if isinstance(data, str):
-            import json
-            data = json.loads(data)
-
-        # 标记来源类型
-        if "knowledge" in data and isinstance(data["knowledge"], list):
-            for k in data["knowledge"]:
-                if "source" not in k:
-                    k["source"] = {}
-                if "category" not in k["source"]:
-                    k["source"]["category"] = source_type
-
-        async with httpx.AsyncClient(timeout=30.0) as client:
-            response = await client.post(
-                f"{KNOWHUB_API}/api/knowledge/upload",
-                json={
-                    "data": data,
-                    "trace_id": trace_id,
-                    "finalize": finalize,
-                }
-            )
-            response.raise_for_status()
-
-        summary = []
-        if data.get("tools"):
-            summary.append(f"工具: {len(data['tools'])} 个")
-        if data.get("resources"):
-            summary.append(f"资源: {len(data['resources'])} 个")
-        if data.get("knowledge"):
-            summary.append(f"知识: {len(data['knowledge'])} 个")
-
-        action = "最终提交" if finalize else f"增量上传({source_type})"
-
-        return ToolResult(
-            title=f"✅ {action}成功",
-            output=f"已提交到 KnowHub\n\n" + "\n".join(f"- {s}" for s in summary),
-            long_term_memory=f"{action}: {', '.join(summary)}",
-            metadata={"finalize": finalize}
-        )
-
-    except Exception as e:
-        logger.error(f"上传知识失败: {e}")
-        return ToolResult(
-            title="❌ 上传失败",
-            output=f"错误: {str(e)}",
-            error=str(e)
-        )
-
-
-if __name__ == "__main__":
-    import sys
-    import asyncio
-
-    COMMANDS = {
-        "ask": ask_knowledge,
-        "upload": upload_knowledge,
-    }
-
-    def _parse_args(argv):
-        kwargs = {}
-        for arg in argv:
-            if arg.startswith("--") and "=" in arg:
-                k, v = arg.split("=", 1)
-                k = k.lstrip("-").replace("-", "_")
-                try:
-                    import json as _json
-                    v = _json.loads(v)
-                except (json.JSONDecodeError, ValueError):
-                    pass
-                kwargs[k] = v
-        return kwargs
-
-    if len(sys.argv) < 2 or sys.argv[1] in ("-h", "--help"):
-        print(f"用法: python {sys.argv[0]} <command> [--key=value ...]")
-        print(f"可用命令: {', '.join(COMMANDS.keys())}")
-        print(f"示例: python {sys.argv[0]} ask --query='ControlNet 相关的工具'")
-        sys.exit(0)
-
-    cmd = sys.argv[1]
-    if cmd not in COMMANDS:
-        print(f"未知命令: {cmd},可用: {', '.join(COMMANDS.keys())}")
-        sys.exit(1)
-
-    kwargs = _parse_args(sys.argv[2:])
-
-    # trace_id:CLI 参数 > 环境变量 > 自动生成
-    if "trace_id" not in kwargs:
-        import uuid
-        kwargs["trace_id"] = os.getenv("TRACE_ID", f"cli-{uuid.uuid4().hex[:8]}")
-
-    result = asyncio.run(COMMANDS[cmd](**kwargs))
-    out = {"trace_id": kwargs.get("trace_id", ""), "output": result.output}
-    if result.error:
-        out["error"] = result.error
-    if result.metadata:
-        out["metadata"] = result.metadata
-    print(json.dumps(out, ensure_ascii=False, indent=2))

+ 144 - 8
agent/tools/builtin/subagent.py

@@ -1,11 +1,14 @@
 """
 Sub-Agent 工具 - agent / evaluate
 
-agent: 创建 Agent 执行任务(单任务 delegate 或多任务并行 explore)
+agent: 创建子 Agent 执行任务。
+  - 本地:`agent_type` 无 `remote_` 前缀,进程内执行(单任务 delegate / 多任务并行 explore)
+  - 远端:`agent_type` 以 `remote_` 开头,HTTP 路由到 KnowHub 服务器的 /api/agent
 evaluate: 评估目标执行结果是否满足要求
 """
 
 import asyncio
+import os
 from datetime import datetime
 from typing import Any, Dict, List, Optional, Union
 
@@ -16,6 +19,13 @@ from agent.trace.goal_models import GoalTree
 from agent.trace.websocket import broadcast_sub_trace_started, broadcast_sub_trace_completed
 
 
+# ===== 远端路由常量 =====
+
+REMOTE_PREFIX = "remote_"
+KNOWHUB_API = os.getenv("KNOWHUB_API", "http://localhost:9999").rstrip("/")
+REMOTE_AGENT_TIMEOUT = float(os.getenv("REMOTE_AGENT_TIMEOUT", "600"))  # 秒
+
+
 # ===== prompts =====
 
 # ===== 评估任务 =====
@@ -608,9 +618,66 @@ async def _run_agents(
         return formatted
 
 
+# ===== 远端 Agent 路由 =====
+
+async def _run_remote_agent(
+    agent_type: str,
+    task: str,
+    messages: Optional[Messages],
+    continue_from: Optional[str],
+    skills: Optional[List[str]] = None,
+) -> Dict[str, Any]:
+    """
+    通过 HTTP 调用 KnowHub 服务器上的远端 Agent。
+
+    远端 Agent 的 tools / model / prompt 由服务器端 preset 决定。
+    skills 由 caller 指定,服务器按 agent_type 的白名单过滤。
+    """
+    import httpx
+
+    payload = {
+        "agent_type": agent_type,
+        "task": task,
+        "messages": messages,
+        "continue_from": continue_from,
+        "skills": skills,
+    }
+
+    try:
+        async with httpx.AsyncClient(timeout=REMOTE_AGENT_TIMEOUT) as client:
+            response = await client.post(f"{KNOWHUB_API}/api/agent", json=payload)
+            response.raise_for_status()
+            result = response.json()
+
+        return {
+            "mode": "remote",
+            "agent_type": agent_type,
+            "sub_trace_id": result.get("sub_trace_id"),
+            "status": result.get("status", "completed"),
+            "summary": result.get("summary", ""),
+            "stats": result.get("stats", {}),
+            "error": result.get("error"),
+        }
+
+    except httpx.HTTPStatusError as e:
+        return {
+            "mode": "remote",
+            "agent_type": agent_type,
+            "status": "failed",
+            "error": f"HTTP {e.response.status_code}: {e.response.text[:200]}",
+        }
+    except Exception as e:
+        return {
+            "mode": "remote",
+            "agent_type": agent_type,
+            "status": "failed",
+            "error": f"远端调用失败: {type(e).__name__}: {e}",
+        }
+
+
 # ===== 工具定义 =====
 
-@tool(description="创建 Agent 执行任务", hidden_params=["context"], groups=["core"])
+@tool(description="创建 Agent 执行任务(本地执行或路由到远端服务器,由 agent_type 决定)", hidden_params=["context"], groups=["core"])
 async def agent(
     task: Union[str, List[str]],
     messages: Optional[Union[Messages, List[Messages]]] = None,
@@ -620,19 +687,41 @@ async def agent(
     context: Optional[dict] = None,
 ) -> Dict[str, Any]:
     """
-    创建 Agent 执行任务。
+    创建 Agent 执行任务。
 
-    单任务 (task: str): delegate 模式,全量工具
-    多任务 (task: List[str]): explore 模式,只读工具,并行执行
+    路由规则:
+    - agent_type 以 "remote_" 开头:HTTP 调用 KnowHub 服务器的 /api/agent(仅单任务,无本地文件访问)
+    - 否则本地执行:单任务 str = delegate(全量工具);多任务 List[str] = explore(并行、只读)
 
     Args:
-        task: 任务描述。字符串=单任务,列表=多任务并行
+        task: 任务描述。字符串=单任务,列表=多任务并行(远端模式只支持单任务)
         messages: 预置消息。1D 列表=所有 agent 共享;2D 列表=per-agent
         continue_from: 继续已有 trace(仅单任务)
-        agent_type: 子 Agent 类型,决定 preset 和默认 skills(如 "deconstruct")
-        skills: 附加到 system prompt 的 skill 名称列表,覆盖 preset 默认值
+        agent_type: 子 Agent 类型。带 "remote_" 前缀走远端;否则本地 preset
+        skills: 指定本次调用使用的 skill 列表
+                - 本地:附加到 system prompt
+                - 远端:由服务器按 agent_type 白名单过滤(如 remote_librarian 允许 ask_strategy / upload_strategy)
         context: 框架自动注入的上下文
     """
+    # 远端路由:agent_type 以 remote_ 开头
+    if agent_type and agent_type.startswith(REMOTE_PREFIX):
+        if not isinstance(task, str):
+            return {"status": "failed", "error": "remote agent 只支持单任务 (task: str)"}
+        # 归一化 messages:远端只接受 1D Messages 或 None
+        remote_msgs: Optional[Messages] = None
+        if messages is not None:
+            if messages and isinstance(messages[0], list):
+                return {"status": "failed", "error": "remote agent 不支持 2D messages (per-agent)"}
+            remote_msgs = messages
+        return await _run_remote_agent(
+            agent_type=agent_type,
+            task=task,
+            messages=remote_msgs,
+            continue_from=continue_from,
+            skills=skills,
+        )
+
+    # 本地路径:需要 context
     if not context:
         return {"status": "failed", "error": "context is required"}
 
@@ -841,3 +930,50 @@ async def evaluate(
             "error": error_msg,
             "sub_trace_id": sub_trace_id,
         }
+
+
+# ===== CLI 入口(仅支持远端 agent;本地需要父 trace 上下文,不适合 CLI)=====
+
+if __name__ == "__main__":
+    import argparse
+    import json as _json
+    import sys as _sys
+
+    parser = argparse.ArgumentParser(
+        description="调用远端 Agent(KnowHub 服务器端 /api/agent)",
+    )
+    parser.add_argument("--agent_type", required=True,
+                        help="远端 Agent 类型,必须以 'remote_' 开头(如 remote_librarian, remote_research)")
+    parser.add_argument("--task", required=True, help="任务描述")
+    parser.add_argument("--continue_from", default=None,
+                        help="已有 sub_trace_id,传入则续跑该 trace")
+    parser.add_argument("--messages", default=None,
+                        help="预置消息(JSON 字符串,OpenAI 格式的 list of dict)")
+    parser.add_argument("--skills", default=None,
+                        help="指定 skill 列表,逗号分隔(如 ask_strategy,upload_strategy);服务器按白名单过滤")
+    args = parser.parse_args()
+
+    if not args.agent_type.startswith(REMOTE_PREFIX):
+        print(f"错误:agent_type 必须以 '{REMOTE_PREFIX}' 开头,收到: {args.agent_type}",
+              file=_sys.stderr)
+        _sys.exit(2)
+
+    msgs = None
+    if args.messages:
+        try:
+            msgs = _json.loads(args.messages)
+        except _json.JSONDecodeError as e:
+            print(f"错误:--messages 不是合法 JSON: {e}", file=_sys.stderr)
+            _sys.exit(2)
+
+    skills_list = [s.strip() for s in args.skills.split(",") if s.strip()] if args.skills else None
+
+    result = asyncio.run(_run_remote_agent(
+        agent_type=args.agent_type,
+        task=args.task,
+        messages=msgs,
+        continue_from=args.continue_from,
+        skills=skills_list,
+    ))
+    print(_json.dumps(result, ensure_ascii=False, indent=2))
+    _sys.exit(0 if result.get("status") == "completed" else 1)

+ 21 - 16
agent/trace/goal_tool.py

@@ -46,31 +46,39 @@ async def inject_knowledge_for_goal(
         return None
 
     try:
-        from agent.tools.builtin.librarian import ask_knowledge
+        import re
+        from agent.tools.builtin.subagent import _run_remote_agent
 
         logger.info(f"[Knowledge Inject] goal: {goal.id}, query: {goal.description[:80]}")
 
-        # 通过 ask 接口获取整合回答
-        ask_result = await ask_knowledge(
-            query=goal.description,
-            trace_id=trace_id or "",
+        # 通过统一 agent 工具调用远端 Librarian,明确走 ask 策略
+        result = await _run_remote_agent(
+            agent_type="remote_librarian",
+            task=goal.description,
+            messages=None,
+            continue_from=None,  # 知识注入每次独立查询,不续跑
+            skills=["ask_strategy"],
         )
 
-        metadata = ask_result.metadata or {}
-        source_ids = metadata.get("source_ids", [])
-        sources = metadata.get("sources", [])
-        response_text = ask_result.output or ""
+        if result.get("status") != "completed":
+            logger.warning(f"[Knowledge Inject] 远端 Librarian 调用失败: {result.get('error')}")
+            goal.knowledge = []
+            return None
+
+        response_text = result.get("summary", "")
+
+        # 从 summary 中解析 source_ids(Librarian prompt 约定以 knowledge-xxx 形式引用)
+        source_ids = re.findall(r'\[?(knowledge-[a-zA-Z0-9_-]+)\]?', response_text)
+        source_ids = list(dict.fromkeys(source_ids))  # 去重保序
 
         if source_ids:
-            # 构建 goal.knowledge(兼容现有格式)
-            goal.knowledge = sources if sources else [{"id": sid} for sid in source_ids]
+            goal.knowledge = [{"id": sid} for sid in source_ids]
             knowledge_count = len(source_ids)
             logger.info(f"[Knowledge Inject] 注入 {knowledge_count} 条知识到 goal {goal.id}")
 
             if store and trace_id:
                 await store.update_goal_tree(trace_id, tree)
 
-                # 写入 cognition_log: query 事件
                 if sequence is not None:
                     await store.append_cognition_event(
                         trace_id=trace_id,
@@ -81,10 +89,7 @@ async def inject_knowledge_for_goal(
                             "query": goal.description,
                             "response": response_text[:2000],
                             "source_ids": source_ids,
-                            "sources": [
-                                {"id": s.get("id", ""), "task": s.get("task", ""), "content": s.get("content", "")[:500]}
-                                for s in sources
-                            ],
+                            "sources": [],
                         }
                     )
                     logger.info(f"[Knowledge Inject] 已记录 query 事件到 cognition_log")

+ 64 - 0
examples/process/config.py

@@ -0,0 +1,64 @@
+"""
+项目配置
+"""
+
+from agent.core.runner import KnowledgeConfig, RunConfig
+
+
+# ===== Stage 1:调研配置 =====
+
+RESEARCH_RUN_CONFIG = RunConfig(
+    model="qwen3.5-plus",
+    temperature=0.3,
+    max_iterations=1000,
+    extra_llm_params={"extra_body": {"enable_thinking": True}},
+    agent_type="main",
+    name="工具调研与文档生成",
+    knowledge=KnowledgeConfig(
+        enable_extraction=False,
+        enable_completion_extraction=True,
+        enable_injection=False,
+        owner="sunlit.howard@gmail.com",
+        default_tags={"project": "tool_research", "domain": "ai_agent"},
+        default_scopes=["org:cybertogether"],
+        default_search_types=["tool", "guide"],
+        default_search_owner="sunlit.howard@gmail.com"
+    )
+)
+
+# ===== Stage 2:分析配置 =====
+
+ANALYSIS_RUN_CONFIG = RunConfig(
+    model="anthropic/claude-sonnet-4-6",
+    temperature=0.3,
+    max_iterations=500,
+    agent_type="coordinator",
+    name="工作流分析 Pipeline",
+)
+
+ANALYSIS_MODEL = "anthropic/claude-sonnet-4-6"
+
+# ===== 输出目录 =====
+
+RESEARCH_OUTPUT_DIR = "examples/tool_research_v2/output"            # Stage 1 输出根目录
+ANALYSIS_OUTPUT_DIR = "examples/tool_research_v2/output/analysis"   # Stage 2 输出目录
+
+
+# ===== 基础设施配置 =====
+
+SKILLS_DIR = "./skills"
+TRACE_STORE_PATH = ".trace"
+DEBUG = True
+LOG_LEVEL = "INFO"
+LOG_FILE = None
+
+# ===== 浏览器配置 =====
+BROWSER_TYPE = "local"
+HEADLESS = False
+
+# ===== IM 配置 =====
+IM_ENABLED = True
+IM_CONTACT_ID = "agent_research"
+IM_SERVER_URL = "ws://43.106.118.91:8105"
+IM_WINDOW_MODE = True
+IM_NOTIFY_INTERVAL = 10.0

+ 27 - 0
examples/process/presets.json

@@ -0,0 +1,27 @@
+{
+  "main": {
+    "max_iterations": 1000,
+    "skills": ["planning"],
+    "description": "主 Agent - 调研任务管理与协调"
+  },
+  "research": {
+    "system_prompt_file": "prompts/research.prompt",
+    "max_iterations": 200,
+    "temperature": 0.3,
+    "skills": ["planning", "research", "browser"],
+    "description": "调研 Agent - 根据指令搜索策略、工具、方法论等信息"
+  },
+  "coordinator": {
+    "system_prompt_file": "prompts/coordinator.prompt",
+    "max_iterations": 500,
+    "skills": ["planning"],
+    "description": "工作流分析协调器:编排4步分析流程"
+  },
+  "analyst": {
+    "system_prompt_file": "prompts/analyst.prompt",
+    "max_iterations": 200,
+    "temperature": 0.3,
+    "skills": [],
+    "description": "分析子 Agent:执行单步分析任务(意图归纳、聚类、变体发现、粗工序提取)"
+  }
+}

+ 23 - 0
examples/process/prompts/analyst.prompt

@@ -0,0 +1,23 @@
+---
+temperature: 0.3
+---
+
+$system$
+
+## 角色
+你是一个专注的工作流分析专家。你负责执行工作流分析 Pipeline 中的单步分析任务,包括:
+- 意图归纳:将一条工作流的技术步骤归纳为意图级描述
+- 语义聚类:对跨工作流的意图列表做语义聚类,发现能力模块
+- 变体发现:在一个能力模块内发现不同实现变体
+- 粗工序总结:对一个品类的工作流总结因果性编排思路
+
+## 核心原则
+1. **忠实于数据**:不臆造不存在于原始步骤中的意图或工具
+2. **意图粒度判断**:归纳意图时,由你根据上下文判断合并粒度,目标是让不同工作流可比较
+3. **结构化输出**:始终返回符合任务要求的 JSON 格式
+4. **因果推理**:总结粗工序时,必须解释"为什么先做 A 再做 B",而不是简单列举频次
+
+## 输出规范
+- 意图描述:20 字以内,动宾结构(如"锁定产品物理结构")
+- 能力模块名称:4-8 字,品类无关(如"定义风格"而非"电商风格定义")
+- 所有输出必须是合法 JSON,不要包含 markdown 代码块标记

+ 179 - 0
examples/process/prompts/coordinator.prompt

@@ -0,0 +1,179 @@
+---
+temperature: 0.3
+---
+
+$system$
+
+## 角色
+你是一个工作流分析协调器。你的任务是对已采集的 AI 创作工作流数据进行结构化分析,提炼出可复用的工序体系(细工序 + 粗工序),并输出完整的结构化 JSON 报告。
+
+## 输入数据格式
+你将收到一个 JSON 对象,包含:
+- `workflows`:工作流列表,每条工作流有 `id`(如 `wf_001`)、`name`、`category`(可能为空)、`source_channel`、`steps`(步骤数组,每步有 `步骤描述`、`使用工具`、`用户输入`、`输出结果`)
+
+## 可用工具
+- `agent`:调用 analyst 子 agent 执行分析任务
+- `write_file`:写中间结果和最终输出
+- `read_file`:读取中间结果
+
+## 原子能力表
+在 `%output_dir%/atomic_capabilities.json` 中已预先准备好原子能力表,格式为:
+```json
+{
+  "atomic_capabilities": [
+    {
+      "id": "能力ID",
+      "name": "能力名称",
+      "description": "能力描述",
+      "criterion": "判断标准"
+    }
+  ]
+}
+```
+在 Step 2b 匹配时,使用 `read_file` 读取该文件。
+
+## 执行流程
+
+### Step 1:意图归纳(粒度统一化)
+
+对**每一条**工作流,调用一个 analyst 子 agent,任务如下:
+```
+读取工作流 {wf_id} 的完整步骤,将步骤重新归纳为「意图级描述」。
+规则:
+- 允许把多个连续步骤合并为一个意图(如9个技术步骤 → 3个意图)
+- 允许一个步骤就对应一个意图
+- 意图粒度由你根据上下文判断,目标是让不同工作流的步骤在意图层面可比较
+- 每个意图记录:意图描述(20字以内)、涉及的原始步骤序号、核心工具名称
+输出为 JSON 数组。
+```
+
+将所有工作流的意图归纳结果合并,写入 `%output_dir%/step1_intents.json`,格式:
+```json
+{
+  "wf_001": [
+    {"intent": "意图描述", "source_steps": [步骤序号], "tools": ["工具名称"]},
+    ...
+  ]
+}
+```
+
+**注意**:Step 1 可以并行调用多个 analyst(每条工作流一个),提高效率。
+
+---
+
+### Step 2:能力模块聚类(细工序发现)
+
+读取 `step1_intents.json`,收集所有工作流的所有意图描述(约 N×3 个)。
+
+**Step 2a:LLM 聚类**
+
+调用一个 analyst 子 agent,任务如下:
+```
+对以下意图列表做跨工作流语义聚类,发现品类无关的「能力模块」(细工序)。
+规则:
+- 语义相近的意图归为一个能力模块
+- 能力模块应品类无关,可跨品类复用
+- 目标:10-20 个能力模块
+每个模块输出:模块ID(cm_001起)、模块名称、描述、包含的意图列表(带来源工作流ID)
+```
+
+**Step 2b:原子能力匹配**
+
+用 `read_file` 读取 `%output_dir%/atomic_capabilities.json`,对每个能力模块在原子能力表中查找语义最接近的原子能力:
+- 如果找到语义相近的原子能力,记录其 `id` 和 `name`,并估算相似度分数(0-1)
+- 如果没有语义相近的原子能力,则标记 `atomic_capability_id: null`,表示这是新发现的能力模块
+
+将最终结果写入 `%output_dir%/step2_modules.json`,每个模块包含以下字段:
+- 匹配成功:`atomic_capability_id`(原子能力ID)、`atomic_capability_name`(原子能力名称)、`atomic_match_score`(相似度分数)
+- 未匹配:三个字段均为 `null`
+
+---
+
+### Step 3:变体发现
+
+读取 `step2_modules.json` 和原始工作流数据。
+
+对**每个能力模块**,调用一个 analyst 子 agent,任务如下:
+```
+分析能力模块「{module_name}」下的所有原始步骤(来自不同工作流),发现实现变体。
+规则:
+- 按工具/方法的相似性聚类
+- 对每个变体,聚合其典型执行步骤(按步骤功能去重合并,保留最具代表性的描述)
+- 注明每个变体来自哪些工作流
+```
+
+将所有模块的变体结果合并,更新 `step2_modules.json` 中对应模块的 `variants` 字段。
+
+---
+
+### Step 4:粗工序提取
+
+读取原始工作流数据和 `step2_modules.json`。
+
+**4a 品类分组**:调用一个 analyst 子 agent,按内容品类对所有工作流分组。
+
+**4b 粗工序总结**:对**每个品类**,调用一个 analyst 子 agent,任务如下:
+```
+读取品类「{category}」下所有工作流的能力模块编排顺序,总结:
+1. 做这类内容的典型思路(能力模块的编排顺序)
+2. 为什么先做 A 再做 B(因果推理,不是统计频次)
+输出:粗工序名称、步骤列表(能力模块引用)、因果推理说明、来源工作流列表
+```
+
+---
+
+### 最终输出
+
+将所有结果整合,写入 `%output_path%`,格式见 `输出格式规范`。
+
+## 输出格式规范
+
+```json
+{
+  "capability_modules": [
+    {
+      "id": "模块ID",
+      "name": "模块名称",
+      "description": "模块描述",
+      "atomic_capability_id": "原子能力ID或null",
+      "atomic_capability_name": "原子能力名称或null",
+      "atomic_match_score": "相似度分数或null",
+      "variants": [
+        {
+          "name": "变体名称",
+          "typical_steps": [
+            {"step": "步骤名称", "detail": "步骤详情"}
+          ],
+          "source_workflows": ["工作流ID"]
+        }
+      ]
+    }
+  ],
+  "coarse_workflows": [
+    {
+      "id": "粗工序ID",
+      "category": "品类名称",
+      "rationale": "因果推理说明",
+      "steps": [
+        {"module_id": "能力模块ID", "module_name": "能力模块名称", "note": "备注"}
+      ],
+      "source_workflows": ["工作流ID"]
+    }
+  ],
+  "provenance": {
+    "工作流ID": {
+      "source_channel": "来源渠道",
+      "source_name": "来源名称",
+      "category": "品类"
+    }
+  }
+}
+```
+
+$user$
+请开始分析以下工作流数据:
+
+%workflows_json%
+
+输出目录(中间文件):%output_dir%
+最终输出路径:%output_path%

+ 80 - 0
examples/process/prompts/research.prompt

@@ -0,0 +1,80 @@
+---
+temperature: 0.3
+---
+
+$system$
+## 角色
+你是一个专注的渠道调研专家。你负责在指定的单个渠道(如小红书、X、youtube)进行完整的广度调研,包括多关键词搜索、适度查看内容,并输出结构化的【工序(工作流)】调研结果。
+
+## 核心原则
+1. **渠道专注**:你只负责一个渠道的完整调研,绝不跨渠道。
+2. **唯工序论**:本调研**只关注能解决问题的多步工序(Workflow/组合方案)**,不搜集单一零散工具。
+3. **结构化提取**:必须将工序严格拆解为按序执行的步骤,并明确每个步骤的工具、输入与输出。
+4. **相关性过滤**:只记录与调研目标相关的**图片生成**工序(排除视频、音频及非AI桌面软件如PS)。
+
+## 可用工具
+### 内容搜索工具
+- `search_posts(keyword, channel, cursor="0", max_count=20)`: 搜索帖子
+  - **channel 参数**:xhs(小红书), gzh(公众号), zhihu(知乎), bili(B站), douyin(抖音), toutiao(头条), weibo(微博)
+  - 示例:`search_posts("AI 生图工作流", channel="xhs", max_count=20)`
+- `select_post(index)`: 查看帖子详情(需先调用 search_posts)
+  - 示例:`select_post(index=1)`
+- `youtube_search(keyword)`: 搜索 YouTube 视频
+  - 示例:`youtube_search("AI image workflow tutorial")`
+- `youtube_detail(content_id, include_captions=True)`: 获取 YouTube 视频详情和字幕
+  - 示例:`youtube_detail("视频ID", include_captions=True)`
+- `x_search(keyword)`: 搜索 X (Twitter) 内容
+  - 示例:`x_search("AI workflow comfyui")`
+- `browser-use`: 浏览器搜索(search_posts 不好用时使用)
+
+### 接口失败处理策略
+如果搜索接口失败或返回为空:检查参数 -> 重试2次 -> 若仍失败,在JSON中标注 `"渠道状态": "接口失败"` 并返回空数据。
+
+## 执行流程
+
+### 第一步:广度扫描(发现工序)
+1. **初始搜索**:使用任务提供的关键词,每个关键词搜索 20 条结果。
+2. **适度查看内容**:对点赞数高或标题符合业务需求的帖子查看详情(可看图片)。
+3. **提取工序信息**:
+   - 重点识别:这是一套怎样的完整工作流?
+   - **严格拆解步骤**:必须提取每一步的 `工具`、`用户输入(Prompt/参数/参考图)`、`输出结果(阶段性产物)`。
+
+### 第二步:反思与补充搜索
+每完成一轮关键词搜索后,主动评估:
+- 已发现的工序是否覆盖了主要方案?
+- 是否有明显遗漏的渠道热点或关键词变体?
+- 如有不足,调整关键词后补充搜索 1-2 轮,直到结果开始重复为止。
+
+## 输出格式(必须使用 `write_file` 写入指定路径)
+```jsonschema
+{
+  "渠道名称": "string - 小红书/X/YouTube等",
+  "初始关键词": ["string"],
+  "采集时间": "string - ISO 8601",
+  "渠道状态": "string - 正常/接口失败",
+  "工序发现": [
+    {
+      "方案名称": "string - 如 Midjourney + ControlNet + 局部重绘工作流",
+      "最新提及时间": "string",
+      "工序步骤": [
+        {
+          "步骤序号": "number - 如 1, 2, 3",
+          "步骤描述": "string - 该步骤的核心目的",
+          "使用工具": "string - 该步骤使用的具体工具/模型/节点",
+          "用户输入": "string - 用户的真实输入(如:特定风格的Prompt、垫图、Lora权重等)",
+          "输出结果": "string - 该步骤生成的阶段性产物或最终图片效果"
+        }
+      ],
+      "帖子链接": ["string"]
+    }
+  ],
+  "调研轨迹": {
+    "搜索次数": "number",
+    "查看详情数": "number",
+    "发现工序数": "number"
+  }
+}
+```
+
+$user$
+%task%

+ 78 - 0
examples/process/prompts/tool_research.prompt

@@ -0,0 +1,78 @@
+---
+temperature: 0.3
+---
+
+$system$
+
+## 角色
+你是一个工序调研协调器。你负责制定调研策略、分发广度采集任务、交叉验证数据,并输出结构化的工序(Workflow)调研报告。
+
+**核心目标**:
+寻找并汇总能解决特定生图需求的**多步工序(Workflow)**。必须结构化记录每个工序的执行步骤(Step、输入、输出、工具)。
+
+## 可用工具
+- `agent`: 调用 research 子 agent 执行渠道采集
+- `write_file`: 输出调研报告和结构化数据
+
+## 工作流程:广度扫描与汇总
+
+### 第一步:理解需求并构造搜索关键词
+- 将用户的业务能力需求,转换为**AI生图工序/工作流**的搜索词。
+- 关键词应偏向寻找"教程"、"工作流"、"全套方案",而非单一软件名。
+- 准备 3-5 个关键词。
+
+### 第二步:并行调用渠道调研 agent
+默认使用小红书、X、youtube 这三个渠道进行调研。为每个渠道启动一个独立的 research 子 agent。
+
+**任务描述规范示例**:
+```
+[小红书调研] 使用以下关键词进行广度扫描:[关键词1, 关键词2, 关键词3]。
+要求:
+- 每个关键词搜索 20 条结果。
+- 只提取多步工序(Workflow),必须严格拆解每个工序的步骤(包含输入、输出、工具)。
+- 输出路径:%output_dir%/<渠道名>.json
+```
+
+每个子 agent 会将自己的采集结果写入对应的渠道 JSON 文件。
+
+### 第三步:汇总并输出结构化数据
+
+等所有渠道子 agent 完成后,将三个渠道的工序发现合并,执行以下两个输出:
+
+**输出 1:可读报告**(Markdown 格式)
+用 `write_file` 写入 `%output_dir%/report.md`,内容包括:
+- 调研概况表格(渠道、搜索词数、获取结果数、提取工序数)
+- 每个工序的详细步骤拆解(表格形式)
+- 核心工具汇总
+
+**输出 2:结构化工序数据**(JSON 格式,供后续分析使用)
+用 `write_file` 写入 `%output_dir%/workflows.json`,格式如下:
+
+```json
+{
+  "需求": "用户的原始需求描述",
+  "工序发现": [
+    {
+      "方案名称": "方案的简短名称,如 ComfyUI + ControlNet 工作流",
+      "来源渠道": "小红书/X/YouTube",
+      "工序步骤": [
+        {
+          "步骤序号": 1,
+          "步骤描述": "该步骤的核心目的",
+          "使用工具": "该步骤使用的具体工具/模型/节点",
+          "用户输入": "用户的真实输入(如:特定风格的Prompt、垫图、LoRA权重等)",
+          "输出结果": "该步骤生成的阶段性产物或最终图片效果"
+        }
+      ],
+      "帖子链接": ["https://..."]
+    }
+  ]
+}
+```
+
+**重要**:`workflows.json` 是必须输出的,不可省略。即使某些工序步骤信息不完整,也要尽量填充已知字段。
+
+$user$
+请开始工作:调研 %requirement%
+
+输出目录:%output_dir%/

+ 7 - 0
examples/process/requirements.json

@@ -0,0 +1,7 @@
+[
+  "生成真实户外场景中的人物活动照片,画面要呈现自然光线下的街道、公园、游乐场等具体地点环境,人物动作自然生动,背景环境细节丰富真实",
+  "生成人物局部特写画面,如放大呈现嘴巴咬食物、手持物品、耳朵佩戴饰品、鼻子、指甲等身体局部细节,画面填充感强,细节清晰可见",
+  "生成人物近景半身或胸部以上的画面,突出人物面部表情和情绪,背景适当虚化,让观看者能清楚看到人物的神态与互动感",
+  "生成具有强烈光影对比的场景图,画面中光源明显(如阳光折射、水面反光、彩虹色光晕),暗部极深、亮部极亮,整体呈现出戏剧性的明暗反差和光线质感",
+  "生成带有明显颗粒感或纸张纹理的插画风格图片,画面整体像是印刷在粗糙介质上,物体表面有细腻的颗粒噪点或手工绘制的笔触肌理"
+]

+ 62 - 0
examples/process/research.prompt

@@ -0,0 +1,62 @@
+---
+model: qwen3.5-plus
+temperature: 0.3
+---
+
+$system$
+## 角色
+你是一个专注的渠道调研专家。你负责在指定的单个渠道(如小红书、X、youtube)进行完整的广度调研,包括多关键词搜索、适度查看内容,并输出结构化的【工序(工作流)】调研结果。
+
+## 核心原则
+1. **渠道专注**:你只负责一个渠道的完整调研,绝不跨渠道。
+2. **唯工序论**:本调研**只关注能解决问题的多步工序(Workflow/组合方案)**,不搜集单一零散工具。
+3. **结构化提取**:必须将工序严格拆解为按序执行的步骤,并明确每个步骤的工具、输入与输出。
+4. **相关性过滤**:只记录与调研目标相关的**图片生成**工序(排除视频、音频及非AI桌面软件如PS)。
+
+## 可用工具
+### 内容搜索工具
+- `search_posts(keyword, channel, cursor="0", max_count=20)`: 搜索帖子(channel参数必须用双引号小写,如 `"xhs"`)
+- `select_post(index)`: 查看帖子详情
+- `youtube_search(keyword)` / `youtube_detail(content_id)`: 搜索/查看 YouTube
+- `x_search(keyword)`: 搜索 X (Twitter) 内容
+
+### 接口失败处理策略
+如果搜索接口失败或返回为空:检查参数 -> 重试2次 -> 若仍失败,在JSON中标注 `"渠道状态": "接口失败"` 并返回空数据。
+
+## 执行流程:广度扫描(发现工序)
+1. **初始搜索**:使用任务提供的关键词,每个关键词搜索 20 条结果。
+2. **适度查看内容**:对点赞数高或标题符合业务需求的帖子查看详情(可看图片)。
+3. **提取工序信息**:
+   - 重点识别:这是一套怎样的完整工作流?
+   - **严格拆解步骤**:必须提取每一步的 `工具`、`用户输入(Prompt/参数/参考图)`、`输出结果(阶段性产物)`。
+
+第二部结构化输出:
+
+## 输出格式(必须使用 `write_file` 写入指定路径)
+```jsonschema
+{
+  "渠道名称": "string - 小红书/X/公众号等",
+  "初始关键词": ["string"],
+  "采集时间": "string - ISO 8601",
+  "渠道状态": "string - 正常/接口失败",
+  "工序发现": [
+    {
+      "方案名称": "string - 如 Midjourney + ControlNet + 局部重绘工作流",
+      "最新提及时间": "string",
+      "工序步骤": [
+        {
+          "步骤序号": "number - 如 1, 2, 3",
+          "步骤描述": "string - 该步骤的核心目的",
+          "使用工具": "string - 该步骤使用的具体工具/模型/节点",
+          "用户输入": "string - 用户的真实输入(如:特定风格的Prompt、垫图、Lora权重等)",
+          "输出结果": "string - 该步骤生成的阶段性产物或最终图片效果"
+        }
+      ],
+      "帖子链接": {"string"}
+  ]
+  "调研轨迹": {
+    "搜索次数": "number",
+    "查看详情数": "number",
+    "发现工序数": "number"
+  }
+}

+ 546 - 0
examples/process/run.py

@@ -0,0 +1,546 @@
+"""
+两阶段 Pipeline:工具调研 + 工作流分析
+
+Stage 1:批量调研(qwen3.5-plus),每个需求输出到 output/research/NN/
+Stage 2:工作流分析(claude-sonnet),读取 Stage 1 输出,生成 output/analysis/result.json
+
+用法:
+  python run.py                        # 完整两阶段(默认)
+  python run.py --stage research       # 只跑调研
+  python run.py --stage analysis       # 只跑分析(用已有调研结果)
+  python run.py --stage research --from 2  # 从第3个需求续跑
+"""
+
+import argparse
+import json
+import os
+import sys
+import asyncio
+from pathlib import Path
+
+
+sys.path.insert(0, str(Path(__file__).parent.parent.parent))
+
+from dotenv import load_dotenv
+load_dotenv()
+
+from agent.llm.prompts import SimplePrompt
+from agent.core.runner import AgentRunner, RunConfig
+from agent.trace import FileSystemTraceStore, Trace, Message
+from agent.llm import create_qwen_llm_call, create_openrouter_llm_call
+from agent.cli import InteractiveController
+from agent.utils import setup_logging
+
+from config import (
+    RESEARCH_RUN_CONFIG, ANALYSIS_RUN_CONFIG,
+    RESEARCH_OUTPUT_DIR, ANALYSIS_OUTPUT_DIR,
+    SKILLS_DIR, TRACE_STORE_PATH, DEBUG, LOG_LEVEL, LOG_FILE,
+    IM_ENABLED, IM_CONTACT_ID, IM_SERVER_URL, IM_WINDOW_MODE, IM_NOTIFY_INTERVAL,
+)
+
+
+# ─────────────────────────────────────────────
+# Stage 1 helpers
+# ─────────────────────────────────────────────
+
+async def run_single(
+    runner: AgentRunner,
+    interactive: InteractiveController,
+    store: FileSystemTraceStore,
+    prompt: SimplePrompt,
+    requirement: str,
+    output_dir: Path,
+    task_name: str,
+    req_index: int,
+) -> tuple[str, bool]:
+    """执行单个需求的完整调研流程,返回 (最终响应文本, 是否应退出)。"""
+
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    messages = prompt.build_messages(
+        requirement=requirement,
+        output_dir=str(output_dir),
+    )
+
+    prompt_model = prompt.config.get("model", None)
+    run_config = RunConfig(
+        model=prompt_model or RESEARCH_RUN_CONFIG.model,
+        temperature=RESEARCH_RUN_CONFIG.temperature,
+        max_iterations=RESEARCH_RUN_CONFIG.max_iterations,
+        extra_llm_params=RESEARCH_RUN_CONFIG.extra_llm_params,
+        agent_type=RESEARCH_RUN_CONFIG.agent_type,
+        name=f"{task_name}:需求{req_index:02d}",
+        knowledge=RESEARCH_RUN_CONFIG.knowledge,
+    )
+
+    print(f"\n{'=' * 60}")
+    print(f"[{req_index:02d}] 开始调研")
+    print(f"需求:{requirement[:80]}{'...' if len(requirement) > 80 else ''}")
+    print(f"输出:{output_dir}")
+    print(f"{'=' * 60}")
+
+    current_trace_id = None
+    current_sequence = 0
+    final_response = ""
+    should_exit = False
+
+    try:
+        async for item in runner.run(messages=messages, config=run_config):
+            cmd = interactive.check_stdin()
+            if cmd == 'pause':
+                print("\n⏸️ 正在暂停执行...")
+                if current_trace_id:
+                    await runner.stop(current_trace_id)
+                await asyncio.sleep(0.5)
+                menu_result = await interactive.show_menu(current_trace_id, current_sequence)
+                if menu_result["action"] == "stop":
+                    should_exit = True
+                    break
+                elif menu_result["action"] == "continue":
+                    new_messages = menu_result.get("messages", [])
+                    run_config.after_sequence = menu_result.get("after_sequence")
+                    if new_messages:
+                        messages = new_messages
+                    break
+            elif cmd == 'quit':
+                print("\n🛑 用户请求停止...")
+                if current_trace_id:
+                    await runner.stop(current_trace_id)
+                should_exit = True
+                break
+
+            if isinstance(item, Trace):
+                current_trace_id = item.trace_id
+                if item.status == "running":
+                    print(f"[Trace] 开始: {item.trace_id[:8]}...")
+                elif item.status == "completed":
+                    print(f"\n[Trace] ✅ 完成  messages={item.total_messages}  cost=${item.total_cost:.4f}")
+                elif item.status == "failed":
+                    print(f"\n[Trace] ❌ 失败: {item.error_message}")
+                elif item.status == "stopped":
+                    print(f"\n[Trace] ⏸️ 已停止")
+
+            elif isinstance(item, Message):
+                current_sequence = item.sequence
+                if item.role == "assistant":
+                    content = item.content
+                    if isinstance(content, dict):
+                        text = content.get("text", "")
+                        tool_calls = content.get("tool_calls")
+                        if text and not tool_calls:
+                            final_response = text
+                            print(f"\n[Response] Agent 回复:")
+                            print(text)
+                        elif text:
+                            preview = text[:150] + "..." if len(text) > 150 else text
+                            print(f"[Assistant] {preview}")
+                elif item.role == "tool":
+                    content = item.content
+                    tool_name = "unknown"
+                    if isinstance(content, dict):
+                        tool_name = content.get("tool_name", "unknown")
+                    if item.description and item.description != tool_name:
+                        desc = item.description[:80] if len(item.description) > 80 else item.description
+                        print(f"[Tool Result] ✅ {tool_name}: {desc}...")
+                    else:
+                        print(f"[Tool Result] ✅ {tool_name}")
+
+    except Exception as e:
+        print(f"\n执行出错: {e}")
+        import traceback
+        traceback.print_exc()
+
+    if final_response:
+        output_file = output_dir / "result.txt"
+        with open(output_file, 'w', encoding='utf-8') as f:
+            f.write(final_response)
+        print(f"\n✓ 结果已保存到: {output_file}")
+
+    if current_trace_id:
+        print(f"  Trace ID: {current_trace_id}")
+
+    return final_response, should_exit
+
+
+# ─────────────────────────────────────────────
+# Stage 2 helpers
+# ─────────────────────────────────────────────
+
+def load_workflows_from_dir(research_dir: Path) -> list[dict]:
+    """
+    扫描 research_dir 下所有子目录(00/, 01/ ...),合并工序发现列表。
+
+    优先读取 workflows.json(Stage 1 新格式);
+    若不存在则把目录内 *.md 文件内容作为文本传给 coordinator(兜底)。
+    """
+    workflows = []
+    wf_index = 1
+
+    subdirs = sorted(
+        [d for d in research_dir.iterdir() if d.is_dir()],
+        key=lambda d: d.name,
+    )
+
+    if not subdirs:
+        # 单次调研输出(直接含 JSON 文件)
+        subdirs = [research_dir]
+
+    for subdir in subdirs:
+        workflows_json_path = subdir / "workflows.json"
+
+        # ── 优先:读取 workflows.json ──
+        if workflows_json_path.exists():
+            try:
+                with open(workflows_json_path, encoding='utf-8') as f:
+                    data = json.load(f)
+                discovered = data.get("工序发现", [])
+                for item in discovered:
+                    wf_id = f"wf_{wf_index:03d}"
+                    wf_index += 1
+                    workflows.append({
+                        "id": wf_id,
+                        "name": item.get("方案名称", "未命名工序"),
+                        "category": "",
+                        "source_channel": item.get("来源渠道", "未知"),
+                        "source_file": str(workflows_json_path.relative_to(research_dir)),
+                        "steps": item.get("工序步骤", []),
+                        "post_links": list(item.get("帖子链接", [])),
+                    })
+                    print(f"   + {wf_id}: {item.get('方案名称', '未命名')[:50]}")
+                continue
+            except (json.JSONDecodeError, IOError) as e:
+                print(f"   [警告] workflows.json 解析失败: {subdir.name} ({e}),尝试 Markdown 兜底")
+
+        # ── 兜底:读取 *.md 文件内容 ──
+        md_files = sorted(subdir.glob("*.md"))
+        if md_files:
+            for md_file in md_files:
+                try:
+                    content = md_file.read_text(encoding='utf-8')
+                    wf_id = f"wf_{wf_index:03d}"
+                    wf_index += 1
+                    workflows.append({
+                        "id": wf_id,
+                        "name": md_file.stem,
+                        "category": "",
+                        "source_channel": "Markdown报告",
+                        "source_file": str(md_file.relative_to(research_dir)),
+                        "steps": [],
+                        "raw_markdown": content,  # coordinator 可直接阅读
+                    })
+                    print(f"   + {wf_id}: [MD兜底] {md_file.name}")
+                except IOError as e:
+                    print(f"   [警告] 无法读取 {md_file.name}: {e}")
+        else:
+            print(f"   [跳过] {subdir.name}:无 workflows.json 也无 .md 文件")
+
+    return workflows
+
+
+async def fetch_atomic_capabilities() -> list[dict]:
+    """从 knowhub API 获取全量原子能力表。"""
+    import urllib.request
+    knowhub_api = os.getenv("KNOWHUB_API", "http://43.106.118.91:9999")
+    url = f"{knowhub_api}/api/capability?limit=500"
+    try:
+        with urllib.request.urlopen(url, timeout=10) as resp:
+            data = json.loads(resp.read().decode())
+        capabilities = data.get("results", [])
+        print(f"   已获取原子能力表:{len(capabilities)} 条")
+        return capabilities
+    except Exception as e:
+        print(f"   [警告] 获取原子能力表失败:{e},将跳过匹配")
+        return []
+
+
+async def run_analysis(
+    research_dir: Path,
+    analysis_dir: Path,
+    store: FileSystemTraceStore,
+    prompt_path: Path,
+) -> bool:
+    """执行 Stage 2 分析,返回是否成功。"""
+
+    print(f"\n{'=' * 60}")
+    print("Stage 2:工作流分析")
+    print(f"输入:{research_dir}")
+    print(f"输出:{analysis_dir}")
+    print(f"{'=' * 60}")
+
+    # 扫描工作流数据
+    print("扫描调研结果...")
+    workflows = load_workflows_from_dir(research_dir)
+    if not workflows:
+        print("   错误: 未找到任何工序数据,请先运行 Stage 1")
+        return False
+    print(f"   共加载 {len(workflows)} 条工作流")
+
+    analysis_dir.mkdir(parents=True, exist_ok=True)
+
+    # 获取原子能力表并写入文件
+    print("获取原子能力表...")
+    atomic_capabilities = await fetch_atomic_capabilities()
+    atomic_capabilities_path = analysis_dir / "atomic_capabilities.json"
+    atomic_capabilities_path.write_text(
+        json.dumps({"atomic_capabilities": atomic_capabilities}, ensure_ascii=False, indent=2),
+        encoding='utf-8'
+    )
+    print(f"   已写入:{atomic_capabilities_path}")
+    output_path = analysis_dir / "result.json"
+
+    # 加载 coordinator prompt
+    prompt = SimplePrompt(prompt_path)
+    workflows_json = json.dumps({"workflows": workflows}, ensure_ascii=False, indent=2)
+    messages = prompt.build_messages(
+        workflows_json=workflows_json,
+        output_dir=str(analysis_dir),
+        output_path=str(output_path),
+    )
+
+    # 创建 Runner(OpenRouter / Claude)
+    prompt_model = prompt.config.get("model", None) or ANALYSIS_RUN_CONFIG.model
+    print(f"   模型: {prompt_model}")
+
+    runner = AgentRunner(
+        trace_store=store,
+        llm_call=create_openrouter_llm_call(model=prompt_model),
+        skills_dir=SKILLS_DIR,
+        debug=DEBUG,
+    )
+    interactive = InteractiveController(runner=runner, store=store, enable_stdin_check=True)
+    runner.stdin_check = interactive.check_stdin
+
+    run_config = RunConfig(
+        model=prompt_model,
+        temperature=ANALYSIS_RUN_CONFIG.temperature,
+        max_iterations=ANALYSIS_RUN_CONFIG.max_iterations,
+        agent_type=ANALYSIS_RUN_CONFIG.agent_type,
+        name=f"工作流分析:{len(workflows)} 条工作流",
+    )
+
+    current_trace_id = None
+    current_sequence = 0
+
+    try:
+        async for item in runner.run(messages=messages, config=run_config):
+            cmd = interactive.check_stdin()
+            if cmd == 'pause':
+                print("\n⏸️ 正在暂停...")
+                if current_trace_id:
+                    await runner.stop(current_trace_id)
+                await asyncio.sleep(0.5)
+                menu_result = await interactive.show_menu(current_trace_id, current_sequence)
+                if menu_result["action"] == "stop":
+                    break
+                elif menu_result["action"] == "continue":
+                    new_messages = menu_result.get("messages", [])
+                    run_config.after_sequence = menu_result.get("after_sequence")
+                    if new_messages:
+                        messages = new_messages
+                    break
+            elif cmd == 'quit':
+                print("\n🛑 停止执行...")
+                if current_trace_id:
+                    await runner.stop(current_trace_id)
+                break
+
+            if isinstance(item, Trace):
+                current_trace_id = item.trace_id
+                if item.status == "running":
+                    print(f"[Trace] 开始: {item.trace_id[:8]}...")
+                elif item.status == "completed":
+                    print(f"\n[Trace] ✅ 完成  messages={item.total_messages}  cost=${item.total_cost:.4f}")
+                elif item.status == "failed":
+                    print(f"\n[Trace] ❌ 失败: {item.error_message}")
+                elif item.status == "stopped":
+                    print(f"\n[Trace] ⏸️ 已停止")
+
+            elif isinstance(item, Message):
+                current_sequence = item.sequence
+                if item.role == "assistant":
+                    content = item.content
+                    if isinstance(content, dict):
+                        text = content.get("text", "")
+                        tool_calls = content.get("tool_calls")
+                        if text and not tool_calls:
+                            print(f"\n[Response]\n{text}")
+                        elif text:
+                            preview = text[:150] + "..." if len(text) > 150 else text
+                            print(f"[Assistant] {preview}")
+                elif item.role == "tool":
+                    content = item.content
+                    tool_name = "unknown"
+                    if isinstance(content, dict):
+                        tool_name = content.get("tool_name", "unknown")
+                    if item.description and item.description != tool_name:
+                        desc = item.description[:80] if len(item.description) > 80 else item.description
+                        print(f"[Tool] ✅ {tool_name}: {desc}...")
+                    else:
+                        print(f"[Tool] ✅ {tool_name}")
+
+    except Exception as e:
+        print(f"\n执行出错: {e}")
+        import traceback
+        traceback.print_exc()
+    except KeyboardInterrupt:
+        print("\n\n用户中断 (Ctrl+C)")
+        if current_trace_id:
+            await runner.stop(current_trace_id)
+
+    # 结果摘要
+    print()
+    print("=" * 60)
+    if output_path.exists():
+        print(f"✅ 分析完成,结果已写入:{output_path}")
+        try:
+            with open(output_path, encoding='utf-8') as f:
+                result = json.load(f)
+            n_modules = len(result.get("capability_modules", []))
+            n_coarse = len(result.get("coarse_workflows", []))
+            print(f"   - 能力模块(细工序):{n_modules} 个")
+            print(f"   - 粗工序:{n_coarse} 个品类")
+        except Exception:
+            pass
+        return True
+    else:
+        print("⚠️  未检测到最终输出文件,分析可能未完成")
+        print(f"   期望路径:{output_path}")
+        return False
+
+
+# ─────────────────────────────────────────────
+# Main
+# ─────────────────────────────────────────────
+
+async def main():
+    parser = argparse.ArgumentParser(description="两阶段 Pipeline:工具调研 + 工作流分析")
+    parser.add_argument(
+        "--stage", choices=["research", "analysis", "all"], default="all",
+        help="执行阶段:research=只调研, analysis=只分析, all=完整流程(默认)",
+    )
+    parser.add_argument(
+        "--from", dest="from_index", type=int, default=0,
+        help="从第几个需求开始(0-based,仅 stage=research/all 时有效)",
+    )
+    parser.add_argument(
+        "--requirements", type=str, default=None,
+        help="需求列表 JSON 文件路径(默认 requirements.json)",
+    )
+    args = parser.parse_args()
+
+    base_dir = Path(__file__).parent
+    project_root = base_dir.parent.parent
+    research_output_dir = project_root / RESEARCH_OUTPUT_DIR
+    analysis_output_dir = project_root / ANALYSIS_OUTPUT_DIR
+
+    setup_logging(level=LOG_LEVEL, file=LOG_FILE)
+
+    # 加载 presets
+    presets_path = base_dir / "presets.json"
+    if presets_path.exists():
+        from agent.core.presets import load_presets_from_json
+        load_presets_from_json(str(presets_path))
+        print("已加载 presets")
+
+    store = FileSystemTraceStore(base_path=TRACE_STORE_PATH)
+
+    # ── Stage 1: Research ──
+    if args.stage in ("all", "research"):
+        req_path = Path(args.requirements) if args.requirements else base_dir / "requirements.json"
+        if not req_path.exists():
+            print(f"错误: 需求文件不存在: {req_path}")
+            sys.exit(1)
+        with open(req_path, encoding='utf-8') as f:
+            requirements = json.load(f)
+        if not isinstance(requirements, list) or len(requirements) == 0:
+            print("错误: 需求文件必须是非空 JSON 数组")
+            sys.exit(1)
+
+        research_output_dir.mkdir(parents=True, exist_ok=True)
+        prompt_path = base_dir / "prompts" / "tool_research.prompt"
+        prompt = SimplePrompt(prompt_path)
+
+        # IM 初始化(可选)
+        if IM_ENABLED:
+            from agent.tools.builtin.im.chat import im_setup, im_open_window
+            result = await im_setup(
+                contact_id=IM_CONTACT_ID,
+                server_url=IM_SERVER_URL,
+                notify_interval=IM_NOTIFY_INTERVAL,
+            )
+            print(f"IM: {result.output}")
+            if IM_WINDOW_MODE:
+                window_result = await im_open_window(contact_id=IM_CONTACT_ID)
+                print(f"IM: {window_result.output}")
+
+        prompt_model = prompt.config.get("model", None) or RESEARCH_RUN_CONFIG.model
+        runner = AgentRunner(
+            trace_store=store,
+            llm_call=create_qwen_llm_call(model=prompt_model),
+            skills_dir=SKILLS_DIR,
+            debug=DEBUG,
+        )
+        interactive = InteractiveController(runner=runner, store=store, enable_stdin_check=True)
+        runner.stdin_check = interactive.check_stdin
+
+        task_name = RESEARCH_RUN_CONFIG.name or base_dir.name
+        total = len(requirements)
+        start = args.from_index
+
+        print("=" * 60)
+        print(f"Stage 1:{task_name}")
+        print(f"共 {total} 个需求,从第 {start} 个开始")
+        print("=" * 60)
+        print("💡 输入 'p' 暂停,'q' 退出")
+        print("=" * 60)
+
+        completed = 0
+        try:
+            for i, requirement in enumerate(requirements):
+                if i < start:
+                    continue
+                req_output_dir = research_output_dir / f"{i:02d}"
+                _, should_exit = await run_single(
+                    runner=runner,
+                    interactive=interactive,
+                    store=store,
+                    prompt=prompt,
+                    requirement=requirement,
+                    output_dir=req_output_dir,
+                    task_name=task_name,
+                    req_index=i,
+                )
+                completed += 1
+                if should_exit:
+                    print(f"\n🛑 用户中止,已完成 {completed}/{total - start} 个需求")
+                    break
+        except KeyboardInterrupt:
+            print(f"\n\n用户中断 (Ctrl+C),已完成 {completed}/{total - start} 个需求")
+
+        print()
+        print("=" * 60)
+        print(f"Stage 1 完成:{completed}/{total - start} 个需求")
+        print(f"输出根目录:{research_output_dir}")
+        print("=" * 60)
+
+        if args.stage == "all":
+            # 统计已采集工作流数量(粗略)
+            wf_count = sum(
+                1 for d in research_output_dir.iterdir()
+                if d.is_dir() and (d / "workflows.json").exists()
+            )
+            print(f"\n[Stage 1 完成] 共 {wf_count} 个目录含 workflows.json,自动进入 Stage 2 分析...")
+
+    # ── Stage 2: Analysis ──
+    if args.stage in ("all", "analysis"):
+        coordinator_prompt_path = base_dir / "prompts" / "coordinator.prompt"
+        await run_analysis(
+            research_dir=research_output_dir,
+            analysis_dir=analysis_output_dir,
+            store=store,
+            prompt_path=coordinator_prompt_path,
+        )
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 37 - 0
examples/process/tool_research.prompt

@@ -0,0 +1,37 @@
+---
+temperature: 0.3
+---
+
+$system$
+
+## 角色
+你是一个工序调研协调器。你负责制定调研策略、分发广度采集任务、交叉验证数据,并输出结构化的工序(Workflow)调研报告。
+
+**核心目标**:
+寻找并汇总能解决特定生图需求的**多步工序(Workflow)**。必须结构化记录每个工序的执行步骤(Step、输入、输出、工具)。
+
+## 可用工具
+- `agent`: 调用 research 子 agent 执行渠道采集
+- `write_file`: 输出调研报告
+
+## 工作流程:广度扫描与汇总
+
+### 第一步:理解需求并构造搜索关键词
+- 将用户的业务能力需求,转换为**AI生图工序/工作流**的搜索词。
+- 关键词应偏向寻找“教程”、“工作流”、“全套方案”,而非单一软件名。
+- 准备 3-5 个关键词。
+
+### 第二步:并行调用渠道调研 agent
+默认使用小红书、X、youtube 这三个渠道进行调研。为每个渠道启动一个独立的 research 子 agent。
+
+**任务描述规范示例**:
+```text
+[小红书调研] 使用以下关键词进行广度扫描:[关键词1, 关键词2, 关键词3]。
+要求:
+- 每个关键词搜索 20 条结果。
+- 只提取多步工序(Workflow),必须严格拆解每个工序的步骤(包含输入、输出、工具)。
+
+$user$
+请开始工作:调研 %requirement%
+
+输出目录:%output_dir%/

+ 1 - 1
examples/production_restore/config.py

@@ -24,7 +24,7 @@ RUN_CONFIG = RunConfig(
     # 额外工具:需求库检索(在 core 工具组基础上追加)
     tools=["requirement_search", "search_category_tree", "extract_requirements_from_table"],
 
-    # 工具分组:core(基础能力)+ knowledge(ask_knowledge / upload_knowledge)
+    # 工具分组:core(基础能力)+ knowledge(upload_knowledge;知识查询改用 agent(agent_type="remote_librarian")
     tool_groups=["core", "knowledge", "category"],
 
     # 任务名称

+ 1 - 1
examples/production_restore/run.py

@@ -44,7 +44,7 @@ from agent.tools.builtin.browser.baseClass import init_browser_session, kill_bro
 # 导入自定义工具(触发 @tool 注册)
 from agent.tools.builtin.toolhub import toolhub_health, toolhub_search, toolhub_call, image_uploader, image_downloader  # noqa: F401
 from agent.tools.builtin.knowledge import requirement_search, requirement_list  # noqa: F401
-from agent.tools.builtin.librarian import ask_knowledge, upload_knowledge  # noqa: F401
+from agent.tools.builtin.librarian import upload_knowledge  # noqa: F401
 from evaluate_tool import evaluate_image  # noqa: F401
 from examples.production_restore.tools.category_query import search_category_tree, extract_requirements_from_table  # noqa: F401
 

+ 1 - 1
knowhub/README.md

@@ -49,7 +49,7 @@ Agent(端侧)
 |------|------|
 | [数据模型](docs/schema.md) | 13张表(5实体+8关联)、字段定义、向量策略 |
 | [REST API](docs/api.md) | API 端点参考 |
-| [Librarian Agent](docs/librarian-agent.md) | 知识管理 Agent 架构和接口 |
+| [Remote Agents](docs/remote-agents.md) | 远端 Agent(Librarian、Research)架构和调用链路 |
 | [知识处理流水线](docs/processing-pipeline.md) | 去重、工具关联分析、状态流转 |
 | [DB层](knowhub_db/README.md) | 数据库访问层封装类和运维脚本 |
 

+ 92 - 142
knowhub/agents/librarian.py

@@ -1,25 +1,25 @@
 """
 Librarian Agent — KnowHub 的知识管理 Agent
 
-通过 HTTP API 被 FastAPI server 调用,每次请求是一次 AgentRunner.run()。
-状态全部持久化在 trace 中,通过 trace_id 续跑实现跨请求上下文积累。
+同一个 Agent 处理多种任务模式(查询 / 上传等),调用方通过 skills 参数选择策略:
+- skills=["ask_strategy"] → 知识库查询整合
+- skills=["upload_strategy"] → 知识上传图谱编排
 
-两种调用模式:
-- ask: 同步,运行 Agent 处理查询,等待完成后返回结果
-- upload: 异步,存 buffer 后由后台任务运行 Agent 处理
+通过 HTTP API 被 FastAPI server 调用,每次请求一次 AgentRunner.run()。
+续跑由 caller 显式传入 continue_from 指定。
 """
 
 import json
 import logging
 import sys
 from pathlib import Path
-from typing import Optional, Dict, Any
+from typing import Optional, Dict, Any, List
 
 # 确保项目路径可用
 sys.path.insert(0, str(Path(__file__).parent.parent.parent))
 
 from agent.core.runner import AgentRunner, RunConfig
-from agent.trace import FileSystemTraceStore, Trace, Message
+from agent.trace import FileSystemTraceStore
 from agent.llm import create_qwen_llm_call
 from agent.llm.prompts import SimplePrompt
 from agent.tools.builtin.knowledge import KnowledgeConfig
@@ -30,8 +30,9 @@ logger = logging.getLogger("agents.librarian")
 
 ENABLE_DATABASE_COMMIT = False
 
-# caller trace_id → librarian trace_id 的映射持久化文件
-TRACE_MAP_FILE = Path(".cache/.knowledge/trace_map.json")
+# Librarian 允许调用方注入的 skill 白名单。不在列表中的 skill 会被过滤掉(只打 warning,不报错)。
+ALLOWED_SKILLS = ["ask_strategy", "upload_strategy"]
+DEFAULT_SKILLS = ["ask_strategy"]  # 调用方不传时的默认值
 
 
 def get_librarian_config(enable_db_commit: bool = ENABLE_DATABASE_COMMIT) -> RunConfig:
@@ -69,7 +70,6 @@ def get_librarian_config(enable_db_commit: bool = ENABLE_DATABASE_COMMIT) -> Run
         ),
         tools=tools,
         tool_groups=[],  # 精确指定工具,不从分组加载
-        exclude_tools=["ask_knowledge", "upload_knowledge", "bash_command", "grep_content", "glob_files"],
     )
 
 
@@ -98,36 +98,6 @@ def _register_internal_tools():
         logger.error(f"✗ 注册内部工具失败: {e}")
 
 
-# ===== trace_id 映射 =====
-
-def _load_trace_map() -> Dict[str, str]:
-    if TRACE_MAP_FILE.exists():
-        return json.loads(TRACE_MAP_FILE.read_text(encoding="utf-8"))
-    return {}
-
-
-def _save_trace_map(mapping: Dict[str, str]):
-    TRACE_MAP_FILE.parent.mkdir(parents=True, exist_ok=True)
-    TRACE_MAP_FILE.write_text(json.dumps(mapping, indent=2, ensure_ascii=False), encoding="utf-8")
-
-
-def get_librarian_trace_id(caller_trace_id: str) -> Optional[str]:
-    """根据调用方 trace_id 查找对应的 Librarian trace_id"""
-    if not caller_trace_id:
-        return None
-    mapping = _load_trace_map()
-    return mapping.get(caller_trace_id)
-
-
-def set_librarian_trace_id(caller_trace_id: str, librarian_trace_id: str):
-    """记录映射"""
-    if not caller_trace_id:
-        return
-    mapping = _load_trace_map()
-    mapping[caller_trace_id] = librarian_trace_id
-    _save_trace_map(mapping)
-
-
 # ===== 单例 Runner =====
 
 _runner: Optional[AgentRunner] = None
@@ -165,130 +135,110 @@ def _ensure_initialized():
 
 # ===== 核心方法 =====
 
-async def ask(query: str, caller_trace_id: str = "") -> Dict[str, Any]:
+async def run_librarian(
+    query: str,
+    continue_from: Optional[str] = None,
+    skills: Optional[List[str]] = None,
+) -> Dict[str, Any]:
     """
-    同步查询知识库。运行 Librarian Agent 处理查询,返回整合结果。
+    同步运行 Librarian Agent。由 skills 参数决定当前是什么模式:
+      - skills=["ask_strategy"]     → 查询模式(默认)
+      - skills=["upload_strategy"]  → 上传模式(query 应为 JSON 字符串)
 
     Args:
-        query: 查询内容
-        caller_trace_id: 调用方 trace_id,用于续跑
+        query: 任务内容。查询模式为自然语言问题;上传模式为 JSON 字符串 {knowledge, tools, resources}
+        continue_from: 已有 sub_trace_id,传入则续跑该 trace
+        skills: 调用方指定的 skill 列表,会被 ALLOWED_SKILLS 过滤
 
     Returns:
-        {"response": str, "source_ids": [str], "sources": [dict]}
+        {"status", "sub_trace_id", "summary", "stats", "error"?}
     """
     _ensure_initialized()
 
-    # 查找或创建 trace
-    librarian_trace_id = get_librarian_trace_id(caller_trace_id)
+    # Skill 过滤
+    skills = _filter_skills(skills)
+
+    # 判断模式:upload_strategy 走上传路径(会写 buffer + 解析 JSON)
+    is_upload = "upload_strategy" in skills
 
+    buffer_file = None
+    if is_upload:
+        try:
+            data = json.loads(query) if isinstance(query, str) else query
+        except json.JSONDecodeError as e:
+            return _fail(f"upload 模式下 query 不是合法 JSON: {e}")
+        if not isinstance(data, dict):
+            return _fail("upload 模式下 query 应为 JSON 对象")
+        if not (data.get("knowledge") or data.get("tools") or data.get("resources")):
+            return _fail("upload 模式下 data 中无有效条目")
+        buffer_file = _write_upload_buffer(data)
+        content = f"[UPLOAD:BATCH] 收到上传请求,请处理:\n{json.dumps(data, ensure_ascii=False)}"
+    else:
+        content = f"[ASK] {query}"
+
+    # 运行 Librarian
     config = get_librarian_config()
-    config.trace_id = librarian_trace_id  # None = 新建, 有值 = 续跑
+    config.trace_id = continue_from
 
-    # 构建消息
-    content = f"[ASK] {query}"
-    if librarian_trace_id is None:
+    if continue_from is None:
         messages = _prompt_messages + [{"role": "user", "content": content}]
     else:
         messages = [{"role": "user", "content": content}]
 
-    # 运行 Agent(指定注入 ask_strategy skill)
-    response_text = ""
-    actual_trace_id = None
-
-    async for item in _runner.run(
-        messages=messages, config=config,
-        inject_skills=["ask_strategy"],
-        skill_recency_threshold=20,
-    ):
-        if isinstance(item, Trace):
-            actual_trace_id = item.trace_id
-        elif isinstance(item, Message):
-            if item.role == "assistant":
-                msg_content = item.content
-                if isinstance(msg_content, dict):
-                    text = msg_content.get("text", "")
-                    if text:
-                        response_text = text
-                elif isinstance(msg_content, str) and msg_content:
-                    response_text = msg_content
-
-    # 记录 trace 映射
-    if actual_trace_id and caller_trace_id:
-        set_librarian_trace_id(caller_trace_id, actual_trace_id)
-
-    # 解析 source_ids(从 Agent 回复中提取,或从工具调用结果中提取)
-    # Agent 回复中会引用 knowledge ID,格式如 [knowledge-xxx]
-    import re
-    source_ids = re.findall(r'\[?(knowledge-[a-zA-Z0-9_-]+)\]?', response_text)
-    source_ids = list(dict.fromkeys(source_ids))  # 去重保序
-
-    return {
-        "response": response_text,
-        "source_ids": source_ids,
-        "sources": [],  # TODO: 从 trace 的工具调用结果中提取 source 详情
-    }
-
-
-async def process_upload(
-    data: Dict[str, Any],
-    caller_trace_id: str = "",
-    buffer_file: Optional[str] = None,
-    max_retries: int = 2,
-):
-    """
-    处理上传数据。运行 Librarian Agent 做图谱编排。
-    失败时重试,最终失败记录到 buffer 文件的状态中。
-
-    Args:
-        data: 上传数据 {knowledge, tools, resources}
-        caller_trace_id: 调用方 trace_id
-        buffer_file: 对应的 buffer 文件路径(用于更新状态)
-        max_retries: 最大重试次数
-    """
-    _ensure_initialized()
+    try:
+        result = await _runner.run_result(
+            messages=messages,
+            config=config,
+            inject_skills=skills,
+        )
+        actual_trace_id = result.get("trace_id")
+        if buffer_file:
+            _update_buffer_status(buffer_file, "completed", trace_id=actual_trace_id)
 
-    librarian_trace_id = get_librarian_trace_id(caller_trace_id)
+        return {
+            "status": result.get("status", "completed"),
+            "sub_trace_id": actual_trace_id,
+            "summary": result.get("summary", ""),
+            "stats": result.get("stats", {}),
+            "error": result.get("error"),
+        }
+    except Exception as e:
+        if buffer_file:
+            _update_buffer_status(buffer_file, "failed", error=str(e))
+        logger.error(f"[Librarian] run 失败: {e}")
+        return _fail(str(e))
 
-    config = get_librarian_config()
-    config.trace_id = librarian_trace_id
 
-    content = f"[UPLOAD:BATCH] 收到上传请求,请处理:\n{json.dumps(data, ensure_ascii=False)}"
+def _filter_skills(skills: Optional[List[str]]) -> List[str]:
+    """把调用方传的 skills 按 ALLOWED_SKILLS 过滤;空列表退化到 DEFAULT_SKILLS。"""
+    if not skills:
+        return list(DEFAULT_SKILLS)
+    allowed = [s for s in skills if s in ALLOWED_SKILLS]
+    dropped = [s for s in skills if s not in ALLOWED_SKILLS]
+    if dropped:
+        logger.warning(f"[Librarian] 忽略不在白名单的 skills: {dropped}(允许: {ALLOWED_SKILLS})")
+    return allowed or list(DEFAULT_SKILLS)
 
-    if librarian_trace_id is None:
-        messages = _prompt_messages + [{"role": "user", "content": content}]
-    else:
-        messages = [{"role": "user", "content": content}]
 
-    last_error = None
-    for attempt in range(max_retries + 1):
-        try:
-            actual_trace_id = None
-            async for item in _runner.run(
-                messages=messages, config=config,
-                inject_skills=["upload_strategy"],
-                skill_recency_threshold=10,
-            ):
-                if isinstance(item, Trace):
-                    actual_trace_id = item.trace_id
-
-            if actual_trace_id and caller_trace_id:
-                set_librarian_trace_id(caller_trace_id, actual_trace_id)
-
-            # 成功:更新 buffer 文件状态
-            _update_buffer_status(buffer_file, "completed", trace_id=actual_trace_id)
-            logger.info(f"[Librarian] upload 处理完成,trace: {actual_trace_id}")
-            return
+def _fail(error: str) -> Dict[str, Any]:
+    return {"status": "failed", "sub_trace_id": None, "summary": "", "stats": {}, "error": error}
 
-        except Exception as e:
-            last_error = str(e)
-            logger.warning(f"[Librarian] upload 处理失败 (attempt {attempt + 1}/{max_retries + 1}): {e}")
-            if attempt < max_retries:
-                import asyncio
-                await asyncio.sleep(2 ** attempt)  # 1s, 2s 指数退避
 
-    # 所有重试都失败
-    _update_buffer_status(buffer_file, "failed", error=last_error)
-    logger.error(f"[Librarian] upload 处理最终失败: {last_error}")
+def _write_upload_buffer(data: Dict[str, Any]) -> Optional[str]:
+    """把 upload 数据写到 buffer 目录,便于审计和失败重跑。"""
+    try:
+        from datetime import datetime as dt
+        buffer_dir = Path(".cache/.knowledge/buffer")
+        buffer_dir.mkdir(parents=True, exist_ok=True)
+        timestamp = dt.now().strftime("%Y%m%d_%H%M%S")
+        path = buffer_dir / f"upload_{timestamp}.json"
+        path.write_text(json.dumps({
+            "data": data, "received_at": dt.now().isoformat(),
+        }, ensure_ascii=False, indent=2), encoding="utf-8")
+        return str(path)
+    except Exception as e:
+        logger.warning(f"写 buffer 失败: {e}")
+        return None
 
 
 def _update_buffer_status(buffer_file: Optional[str], status: str, trace_id: str = None, error: str = None):

+ 44 - 81
knowhub/agents/research.py

@@ -1,50 +1,28 @@
-import asyncio
-import json
+"""
+Research Agent — 深度调研 Agent,部署在 KnowHub 服务器端。
+
+通过 HTTP API 被 FastAPI server 调用,每次请求是一次 AgentRunner.run()。
+续跑由 caller 显式传入 continue_from 指定。
+"""
+
 import logging
 import sys
 from pathlib import Path
-from typing import Dict, Any, Optional
+from typing import Dict, Any, Optional, List
 
 # 确保项目路径可用
 sys.path.insert(0, str(Path(__file__).parent.parent.parent))
 
-from agent.core.runner import AgentRunner
-from agent.trace import FileSystemTraceStore, Trace, Message
+from agent.core.runner import AgentRunner, RunConfig
+from agent.trace import FileSystemTraceStore
 from agent.llm import create_qwen_llm_call
 from agent.llm.prompts import SimplePrompt
+from agent.tools.builtin.knowledge import KnowledgeConfig
 
-logger = logging.getLogger(__name__)
-
-# 文件保存 trace 映射关系,持久化续跑
-TRACE_MAP_FILE = Path(".cache/research_trace_map.json")
-
-
-def _load_trace_map() -> Dict[str, str]:
-    if TRACE_MAP_FILE.exists():
-        return json.loads(TRACE_MAP_FILE.read_text(encoding="utf-8"))
-    return {}
-
-
-def _save_trace_map(mapping: Dict[str, str]):
-    TRACE_MAP_FILE.parent.mkdir(parents=True, exist_ok=True)
-    TRACE_MAP_FILE.write_text(json.dumps(mapping, indent=2, ensure_ascii=False), encoding="utf-8")
+logger = logging.getLogger("agents.research")
 
-
-def get_research_trace_id(caller_trace_id: str) -> Optional[str]:
-    """根据调用方 trace_id 查找对应的 Research trace_id"""
-    if not caller_trace_id:
-        return None
-    mapping = _load_trace_map()
-    return mapping.get(caller_trace_id)
-
-
-def set_research_trace_id(caller_trace_id: str, research_trace_id: str):
-    """记录映射"""
-    if not caller_trace_id:
-        return
-    mapping = _load_trace_map()
-    mapping[caller_trace_id] = research_trace_id
-    _save_trace_map(mapping)
+# Research 目前不支持调用方注入 skill(skills 走 config.skills 服务器端固定)
+ALLOWED_SKILLS: List[str] = []
 
 
 # ===== 单例 Runner =====
@@ -61,14 +39,11 @@ def _ensure_initialized():
         return
     _initialized = True
 
-    # 初始化 Runner。工具会自动从 __file__.parent.parent.parent / agent / tools 加载吗?
-    # 根据用户环境,内置通用工具大概是在 agent/tools,或者自动全局识别
-    # 在这里,我们将 skills_dir 也设为此处寻找特定技能,如果需要的话可以扩展。
     skills_dir = Path(__file__).parent / "skills"
-    
+
     _runner = AgentRunner(
         trace_store=FileSystemTraceStore(base_path=".trace"),
-        llm_call=create_qwen_llm_call(model="qwen3.5-plus"),  # prompt使用sonnet,但如果想和系统对齐可保留qwen,按照之前的设定
+        llm_call=create_qwen_llm_call(model="qwen3.5-plus"),
         skills_dir=str(skills_dir) if skills_dir.exists() else None,
         debug=True,
         logger_name="agents.research",
@@ -78,8 +53,7 @@ def _ensure_initialized():
     if prompt_path.exists():
         prompt = SimplePrompt(prompt_path)
         _prompt_messages = prompt.build_messages()
-        
-        # 尝试通过 prompt meta 获取模型设置
+
         if getattr(prompt, "meta", None) and prompt.meta.get("model"):
             model_name = prompt.meta["model"]
             _runner.llm_call = create_qwen_llm_call(model=model_name)
@@ -92,72 +66,61 @@ def _ensure_initialized():
 
 # ===== 核心方法 =====
 
-async def research(query: str, caller_trace_id: str = "") -> Dict[str, Any]:
+async def research(
+    query: str,
+    continue_from: Optional[str] = None,
+    skills: Optional[List[str]] = None,
+) -> Dict[str, Any]:
     """
-    同步执行深度调研。运行 Research Agent,返回调查结果。
+    同步执行深度调研。运行 Research Agent,返回标准 agent 结果。
 
     Args:
         query: 用户设定的研究主题或查询
-        caller_trace_id: 调用方 trace_id,用于续跑
+        continue_from: 已有 sub_trace_id,传入则续跑该 trace
+        skills: 保留参数(Research 当前无白名单 skill);传入会被忽略
 
     Returns:
-        {"response": str, "source_ids": [str], "sources": [dict]}
+        {"status", "sub_trace_id", "summary", "stats", "error"?}
     """
+    if skills:
+        logger.warning(f"[Research] 忽略 skills 参数(Research 不接受动态 skill 注入)")
     _ensure_initialized()
 
-    # 初始化云端无头浏览器(因为是部署在线上,必须防卡顿并自动分配独立环境
+    # 初始化云端无头浏览器(线上部署必须用云浏览器
     try:
         from agent.tools.builtin.browser import init_browser_session
         await init_browser_session(browser_type="cloud")
     except Exception as e:
         logger.warning(f"Failed to init cloud browser: {e}")
 
-    # 查找或创建 trace
-    research_trace_id = get_research_trace_id(caller_trace_id)
-
-    from agent.core.runner import RunConfig
     config = RunConfig(
         model="qwen3.5-plus",
         temperature=0.3,
         max_iterations=200,
         tool_groups=["core", "content", "browser"],
+        exclude_tools=["agent", "evaluate"],  # 远端 Agent 禁止递归派生子 Agent
         skills=["planning", "research", "browser"],
+        # 远端 Agent 关闭自动知识操作(否则 injection 会回调 remote_librarian 形成递归)
+        knowledge=KnowledgeConfig(
+            enable_extraction=False,
+            enable_completion_extraction=False,
+            enable_injection=False,
+        ),
     )
-    config.trace_id = research_trace_id  # None = 新建, 有值 = 续跑
+    config.trace_id = continue_from  # None = 新建;有值 = 续跑
 
-    # 构建消息
     content = f"[RESEARCH TASK] {query}"
-    if research_trace_id is None:
+    if continue_from is None:
         messages = _prompt_messages + [{"role": "user", "content": content}]
     else:
         messages = [{"role": "user", "content": content}]
 
-    # 运行 Agent
-    response_text = ""
-    actual_trace_id = None
-
-    async for item in _runner.run(
-        messages=messages, 
-        config=config,
-    ):
-        if isinstance(item, Trace):
-            actual_trace_id = item.trace_id
-        elif isinstance(item, Message):
-            if item.role == "assistant":
-                msg_content = item.content
-                if isinstance(msg_content, dict):
-                    text = msg_content.get("text", "")
-                    if text:
-                        response_text = text
-                elif isinstance(msg_content, str) and msg_content:
-                    response_text = msg_content
-
-    # 记录 trace 映射
-    if actual_trace_id and caller_trace_id:
-        set_research_trace_id(caller_trace_id, actual_trace_id)
+    result = await _runner.run_result(messages=messages, config=config)
 
     return {
-        "response": response_text,
-        "source_ids": [],
-        "sources": [], 
+        "status": result.get("status", "unknown"),
+        "sub_trace_id": result.get("trace_id"),
+        "summary": result.get("summary", ""),
+        "stats": result.get("stats", {}),
+        "error": result.get("error"),
     }

+ 29 - 8
knowhub/docs/api.md

@@ -186,18 +186,39 @@ secure_body 在有组织密钥时自动加密(AES-256-GCM)。
 
 ---
 
-### `POST /api/knowledge/ask` — 智能知识查询
+### `POST /api/agent` — 统一远端 Agent 入口
 
-同步阻塞。向量检索 + 结果整合,返回 response + source_ids + sources
+同步阻塞。运行由 `agent_type` 指定的远端 Agent(如 `remote_librarian`、`remote_research`),返回标准的 Agent 结果
 
-运行 Librarian Agent 检索 + 整合。详见 [librarian-agent.md](librarian-agent.md)。
+**请求体**:
+| 字段 | 类型 | 说明 |
+|------|------|------|
+| `agent_type` | str | 必填。必须是 `remote_` 前缀,如 `remote_librarian`、`remote_research` |
+| `task` | str | 必填。单任务描述(远端模式不支持 `List[str]` 多任务) |
+| `messages` | List | 可选。预置 OpenAI 格式消息,作为 Agent 初始上下文 |
+| `continue_from` | str | 可选。已有 `sub_trace_id`,传入则续跑该 trace |
+| `skills` | List[str] | 可选。调用方指定的 skill 列表;服务器按每个 agent_type 的 `ALLOWED_SKILLS` 过滤 |
 
-请求体:query(必填), trace_id, top_k(默认5)
+**约束**:客户端传的 `tool_groups` / `model` / prompt 由服务器 preset 完全决定,客户端无法干预。`skills` 是例外——可被指定但受白名单约束。
 
-### `POST /api/knowledge/upload` — 异步知识上传
+**响应**:
+```json
+{
+  "sub_trace_id": "...",
+  "status": "completed",
+  "summary": "...",
+  "stats": {"total_messages": N, "total_tokens": N, "total_cost": 0.xxx}
+}
+```
 
-校验格式后写入 buffer 目录,立即返回 202。Librarian Agent 异步处理图谱编排和去重。
+`summary` 是 Agent 最终产出的 message 文本——Agent 之间通过 message 通信,不定义 per-agent 的结构化字段。如果 Agent 需要返回结构化数据(引用来源、ID 列表等),由其 prompt 约定写进 message 文本(例如 JSON 代码块),调用方自行 parse
 
-请求体:data({knowledge, resources, tools}), trace_id, finalize
+**续跑语义**:服务器不维护 `caller_trace_id → sub_trace_id` 映射。首次调用不传 `continue_from` 创建新 trace;caller 自行记住返回的 `sub_trace_id`,下次作为 `continue_from` 传回。
 
-详见 [librarian-agent.md](librarian-agent.md)。
+详见 [remote-agents.md](remote-agents.md)。
+
+### `GET /api/knowledge/upload/pending` / `POST /api/knowledge/upload/retry` — 运维
+
+查询与重跑历史 upload buffer 中的 pending / failed 记录。仅用于人工排查,不在主 agent 调用流中。
+
+> 知识上传本身已并入 `POST /api/agent`,使用 `agent_type="remote_librarian_ingest"`;`task` 为 JSON 字符串(`{knowledge, tools, resources}`)。详见 [remote-agents.md](remote-agents.md)。

+ 0 - 87
knowhub/docs/librarian-agent.md

@@ -1,87 +0,0 @@
-# Librarian Agent
-
-## 文档维护规范
-
-0. **先改文档,再动代码**
-1. **文档分层,链接代码** — 格式:`module/file.py:function_name`
-2. **简洁快照,日志分离** — 决策依据记录在 `knowhub/docs/decisions.md`
-
----
-
-## 定位
-
-Librarian Agent(原 Knowledge Manager Agent)是 KnowHub 的智能层,负责:
-- 知识查询的整合回答(将散点搜索结果用 LLM 整合为带引用的回答)
-- 知识上传的去重和处理
-- 工具与知识的关联分析
-
----
-
-## 架构
-
-```
-Agent(端侧)
-  ↓ ask_knowledge / upload_knowledge 工具(HTTP)
-KnowHub Server(FastAPI)
-  ├── POST /api/knowledge/ask(同步)
-  │   → 运行 Librarian Agent(AgentRunner.run)
-  │   → Agent 检索 + LLM 整合 → 返回结果
-  │
-  └── POST /api/knowledge/upload(异步 202)
-      → 存 buffer → BackgroundTasks 运行 Librarian Agent
-      → Agent 做图谱编排(去重、关联 capability/tool)
-```
-
-Librarian Agent 不是常驻后台进程。每次 HTTP 请求触发一次 `AgentRunner.run()`,所有状态持久化在 trace 中。通过 trace_id 续跑实现跨请求的上下文积累。
-
-实现:
-- Agent 核心:`knowhub/agents/librarian.py`(`ask()` / `process_upload()`)
-- Prompt:`knowhub/agents/librarian_agent.prompt`
-- Agent 侧工具:`agent/tools/builtin/librarian.py`
-- Server 端点:`knowhub/server.py:ask_knowledge_api` / `upload_knowledge_api`
-
-### trace_id 续跑
-
-同一个 caller trace_id 映射到同一个 Librarian trace_id(映射持久化在 `.cache/.knowledge/trace_map.json`)。首次请求创建新 trace,后续请求续跑该 trace,Agent 保持对调用方任务的上下文理解。
-
-### ask 接口
-
-```
-POST /api/knowledge/ask
-{
-  "query": "ControlNet 相关的工具知识",
-  "trace_id": "caller-trace-xxx",
-  "top_k": 5
-}
-```
-
-同步阻塞。Librarian Agent 通过 knowledge_search、tool_search、capability_search 等工具跨表检索,用 LLM 综合分析后返回结构化回答。
-
-响应:`{"response": "...", "source_ids": [...], "sources": [...]}`
-
-### upload 接口
-
-```
-POST /api/knowledge/upload
-{
-  "data": {"knowledge": [...], "resources": [...], "tools": [...]},
-  "trace_id": "caller-trace-xxx",
-  "finalize": false
-}
-```
-
-立即返回 202。数据同时写入 buffer 目录(`.cache/.knowledge/buffer/`,便于回溯),Librarian Agent 在后台运行图谱编排:检索已有实体去重、挂载 capability、构建关系、写入草稿池。
-
-### 知识注入
-
-`inject_knowledge_for_goal`(`agent/trace/goal_tool.py`)通过 ask 接口查询,结果记录为 cognition_log 的 query 事件(详见 `agent/docs/cognition-log-plan.md`)。
-
----
-
-## 与 Cognition Log 的关系
-
-ask 接口的每次调用在 Agent 侧产生一个 `query` 事件,记录查询、整合回答和 source_ids。后续评估以 query 为单位,逐 source 评估。详见 [cognition-log-plan.md](cognition-log-plan.md)。
-
-## 与知识处理流水线的关系
-
-upload 提交的知识进入处理流水线(去重、工具关联分析)。当前这部分逻辑在 `server.py:KnowledgeProcessor` 中。详见 [processing-pipeline.md](processing-pipeline.md)。

+ 141 - 0
knowhub/docs/remote-agents.md

@@ -0,0 +1,141 @@
+# Remote Agents
+
+## 文档维护规范
+
+0. **先改文档,再动代码**
+1. **文档分层,链接代码** — 格式:`module/file.py:function_name`
+2. **简洁快照,日志分离** — 决策依据记录在 `knowhub/docs/decisions.md`
+
+---
+
+## 定位
+
+KnowHub 服务器托管的远端 Agent,供客户端通过统一的 `agent` 工具(`agent_type="remote_xxx"`)调用。
+
+目前部署:
+
+| agent_type | 职责 | 允许的 skills | 核心实现 |
+|------------|------|---------------|---------|
+| `remote_librarian` | 知识库查询整合 / 知识上传图谱编排 | `ask_strategy`(默认)/ `upload_strategy` | `knowhub/agents/librarian.py::run_librarian` |
+| `remote_research` | 深度调研,全网搜集 + 总结 | —(不支持动态 skill) | `knowhub/agents/research.py::research` |
+
+**一个 Agent,多种模式**:Librarian 是同一个 AgentRunner,调用方通过 `skills` 参数选择当前的策略——这比通过 task 内容或独立 agent_type 区分模式更显式、更可靠。
+
+**skills 的语义**:
+- 由 caller 指定(`agent` 工具的 `skills` 参数透传到 HTTP body)
+- 服务器按每个 agent_type 的 `ALLOWED_SKILLS` 白名单过滤(非法 skill 被忽略并打 warning)
+- 不在白名单的 skill 不会影响 Agent 行为,也不会导致请求失败
+
+所有 agent 调用同步执行——服务器处理完成后才返回结果。
+
+---
+
+## 远端 Agent 的安全约束
+
+新增远端 Agent 时,handler 构造的 `RunConfig` 必须显式满足三条:
+
+| 约束 | 如何配置 |
+|------|---------|
+| **禁止调用 `agent` / `evaluate` 工具** | 用 `tools=[...]` 精确列表不含它们;或 `tool_groups + exclude_tools=["agent","evaluate"]` |
+| **关闭自动知识提取 / 复盘** | `knowledge=KnowledgeConfig(enable_extraction=False, enable_completion_extraction=False, ...)` |
+| **关闭自动知识注入** | `knowledge=KnowledgeConfig(enable_injection=False, ...)` |
+
+原因:
+- 递归 `agent`:remote agent 在服务器上再派生子 Agent 会污染服务器 trace;调 `remote_*` 则 HTTP 回调自己
+- 自动知识操作:`enable_injection=True` 会在 focus goal 时调 `remote_librarian` → 服务器自己调自己 → 递归
+
+当前实现参考:`knowhub/agents/librarian.py::get_librarian_config`(显式 tool 列表 + knowledge 全关)、`knowhub/agents/research.py::research`(`tool_groups + exclude_tools` + knowledge 全关)。
+
+新增远端 Agent 只需:在 `knowhub/agents/` 下写实现 + prompt + 定义 `ALLOWED_SKILLS`,在 `knowhub/server.py::_get_remote_agent_dispatch` 的分发表里登记 `agent_type`。客户端零改动。
+
+---
+
+## 架构
+
+```
+端侧 Agent
+  └── agent 工具(统一入口,按 agent_type 路由)
+        ↓ HTTP
+      POST /api/agent
+        → 服务器按 agent_type 分发到对应 AgentRunner
+```
+
+远端 Agent 不是常驻进程。每次 HTTP 请求触发一次 `AgentRunner.run()`,状态持久化到服务器本地的 `.trace/`。
+
+**续跑**:客户端显式传 `continue_from=<sub_trace_id>`。服务器不维护 caller → sub_trace 映射,caller 自己记住并回传。
+
+**知识上传**和查询用同一个 `remote_librarian`,靠 `skills=["upload_strategy"]` 切换到上传模式。`task` 此时是 JSON 字符串 `{knowledge, tools, resources}`。服务器同步跑完图谱编排后返回——不再是独立的 `/api/knowledge/upload` 端点。
+
+---
+
+## 调用示例
+
+统一端点 `POST /api/agent`,详见 [api.md § POST /api/agent](api.md#post-apiagent)。
+
+**知识查询**(Librarian + ask_strategy):
+
+```json
+{
+  "agent_type": "remote_librarian",
+  "task": "ControlNet 相关的工具知识",
+  "skills": ["ask_strategy"],
+  "continue_from": null
+}
+```
+
+**知识上传**(Librarian + upload_strategy;`task` 用 JSON 字符串承载结构化数据):
+
+```json
+{
+  "agent_type": "remote_librarian",
+  "task": "{\"knowledge\": [...], \"tools\": [...], \"resources\": [...]}",
+  "skills": ["upload_strategy"],
+  "continue_from": null
+}
+```
+
+**深度调研**(`remote_research` 不接受 skill):
+
+```json
+{
+  "agent_type": "remote_research",
+  "task": "深度调研 Nano Banana 多图融合",
+  "continue_from": null
+}
+```
+
+所有调用同步返回标准 Agent 结果 `{sub_trace_id, status, summary, stats}`。`summary` 是 Agent 最终产出的 message 文本——结构化信息(引用来源、ID 列表等)由 Agent 的 prompt 约定写进文本,由调用方 parse。不定义 per-agent 的强 schema。
+
+### 审计与运维
+
+知识上传请求会在服务器 `.cache/.knowledge/buffer/` 下写一份 buffer 文件(审计用)。处理失败的文件可通过运维端点查询与重跑:
+
+- `GET /api/knowledge/upload/pending` — 列出 pending 或 failed 的 buffer
+- `POST /api/knowledge/upload/retry` — 同步重跑所有 failed
+
+### 知识注入(框架级)
+
+`inject_knowledge_for_goal`(`agent/trace/goal_tool.py`)在 Goal 开始时自动通过 `agent_type="remote_librarian"` 调用 `/api/agent`,把返回的 summary 作为 cognition_log 的 `query` 事件记录(详见 `agent/docs/cognition-log-plan.md`)。
+
+---
+
+## 实现位置
+
+| 组件 | 位置 |
+|------|------|
+| Librarian 核心 | `knowhub/agents/librarian.py::run_librarian`(skill 决定模式;`ALLOWED_SKILLS` 定义白名单) |
+| Research 核心 | `knowhub/agents/research.py`(`research()`) |
+| Prompt | `knowhub/agents/librarian_agent.prompt` / `research_agent.prompt` |
+| Server 端点 | `knowhub/server.py::agent_api`(`/api/agent`)+ 运维 `/api/knowledge/upload/{pending,retry}` |
+| 客户端入口 | `agent/tools/builtin/subagent.py::agent`(按 `remote_` 前缀路由) |
+| CLI | `python -m agent.tools.builtin.subagent --agent_type=remote_xxx --task=...` |
+
+---
+
+## 与 Cognition Log 的关系
+
+每次 `remote_librarian` 调用在 Agent 侧产生一个 `query` 事件,记录查询和整合回答。后续评估以 query 为单位。详见 [cognition-log-plan.md](cognition-log-plan.md)。
+
+## 与知识处理流水线的关系
+
+upload 提交的知识进入处理流水线(去重、工具关联分析)。逻辑在 `server.py:KnowledgeProcessor` 中。详见 [processing-pipeline.md](processing-pipeline.md)。

+ 70 - 112
knowhub/server.py

@@ -974,120 +974,76 @@ async def _llm_rerank(query: str, candidates: list[dict], top_k: int) -> list[st
 # --- Knowledge Ask / Upload API (Librarian Agent HTTP 接口) ---
 
 
-class KnowledgeAskRequest(BaseModel):
-    query: str
-    trace_id: str  # 必填:调用方的 trace_id,用于 Librarian 续跑
-
-
-class KnowledgeAskResponse(BaseModel):
-    response: str  # 整合后的回答
-    source_ids: list[str] = []
-    sources: list[dict] = []  # [{id, task, content}]
-
-
-class KnowledgeResearchRequest(BaseModel):
-    query: str
-    trace_id: str  # 必填:调用方的 trace_id,用于续跑
-
-class KnowledgeResearchResponse(BaseModel):
-    response: str
-    source_ids: list[str] = []
-    sources: list[dict] = []
-
-class KnowledgeUploadRequest(BaseModel):
-    data: dict  # {tools, resources, knowledge}
-    trace_id: str  # 必填:调用方的 trace_id
-    finalize: bool = False
+class AgentRequest(BaseModel):
+    """POST /api/agent 请求体(统一远端 Agent 入口)"""
+    agent_type: str  # 必填,必须是 remote_ 前缀
+    task: str  # 必填,单任务描述
+    messages: Optional[list[dict]] = None  # 预置 OpenAI 格式消息
+    continue_from: Optional[str] = None  # 已有 sub_trace_id,传入则续跑
+    skills: Optional[list[str]] = None  # 调用方指定 skill,服务器按 agent_type 的白名单过滤
+
+
+class AgentResponse(BaseModel):
+    """POST /api/agent 响应体(标准 Agent 结果)"""
+    sub_trace_id: Optional[str] = None
+    status: str
+    summary: str = ""
+    stats: dict = {}
+    error: Optional[str] = None
+
+
+# agent_type → handler 的映射。handler 签名: (query, continue_from, skills) -> dict
+# 新增远端 Agent 在这里登记。
+_REMOTE_AGENT_DISPATCH = {}
+
+
+def _get_remote_agent_dispatch():
+    """延迟导入 agent 实现,避免循环导入和启动时加载重型依赖"""
+    global _REMOTE_AGENT_DISPATCH
+    if not _REMOTE_AGENT_DISPATCH:
+        from agents.librarian import run_librarian
+        from agents.research import research as _research
+        _REMOTE_AGENT_DISPATCH = {
+            "remote_librarian": run_librarian,
+            "remote_research": _research,
+        }
+    return _REMOTE_AGENT_DISPATCH
 
 
-@app.post("/api/knowledge/ask")
-async def ask_knowledge_api(req: KnowledgeAskRequest):
+@app.post("/api/agent", response_model=AgentResponse)
+async def agent_api(req: AgentRequest):
     """
-    智能知识查询。运行 Librarian Agent 检索 + LLM 整合,返回带引用的结构化结果。
+    统一远端 Agent 入口。按 agent_type 分发到对应实现
 
-    同步阻塞:Agent 运行完成后返回。
-    trace_id 用于续跑:同一 caller trace_id 复用同一个 Librarian trace,积累上下文。
+    全部同步:Agent 运行完成后返回标准 {status, sub_trace_id, summary, stats}。
+    续跑由 caller 传入 continue_from 显式指定(服务器不维护 caller→trace 映射)。
+    Skills 由 caller 指定,每个 handler 用自己的白名单过滤。
     """
-    try:
-        from agents.librarian import ask
-        result = await ask(query=req.query, caller_trace_id=req.trace_id)
-        return KnowledgeAskResponse(**result)
-
-    except Exception as e:
-        print(f"[Knowledge Ask] 错误: {e}")
-        raise HTTPException(status_code=500, detail=str(e))
+    if not req.agent_type.startswith("remote_"):
+        raise HTTPException(
+            status_code=400,
+            detail=f"agent_type 必须以 'remote_' 开头,收到: {req.agent_type}",
+        )
 
+    dispatch = _get_remote_agent_dispatch()
+    handler = dispatch.get(req.agent_type)
+    if handler is None:
+        raise HTTPException(
+            status_code=404,
+            detail=f"未注册的 agent_type: {req.agent_type},可用: {list(dispatch.keys())}",
+        )
 
-@app.post("/api/knowledge/research")
-async def research_knowledge_api(req: KnowledgeResearchRequest):
-    """
-    智能深度调研。运行 Research Agent 执行全网搜集和总结,返回深度调研结果。
-    同步阻塞:Agent 运行完成后返回。
-    """
     try:
-        from agents.research import research
-        result = await research(query=req.query, caller_trace_id=req.trace_id)
-        return KnowledgeResearchResponse(**result)
-
+        result = await handler(
+            query=req.task,
+            continue_from=req.continue_from,
+            skills=req.skills,
+        )
+        return AgentResponse(**result)
     except Exception as e:
         import traceback
         traceback.print_exc()
-        print(f"[Knowledge Research] 错误: {e}")
-        raise HTTPException(status_code=500, detail=str(e))
-
-
-@app.post("/api/knowledge/upload", status_code=202)
-async def upload_knowledge_api(req: KnowledgeUploadRequest, background_tasks: BackgroundTasks):
-    """
-    异步知识上传。校验后立即返回 202,后台运行 Librarian Agent 处理。
-
-    Librarian Agent 负责图谱编排:去重、关联已有 capability/tool、构建关系。
-    """
-    try:
-        data = req.data
-        knowledge_list = data.get("knowledge", [])
-        tools_list = data.get("tools", [])
-        resources_list = data.get("resources", [])
-        total_items = len(knowledge_list) + len(tools_list) + len(resources_list)
-
-        if total_items == 0:
-            raise HTTPException(status_code=400, detail="data 中无有效条目")
-
-        # 存 buffer(便于回溯)
-        from datetime import datetime as dt
-        buffer_dir = Path(".cache/.knowledge/buffer")
-        buffer_dir.mkdir(parents=True, exist_ok=True)
-        timestamp = dt.now().strftime("%Y%m%d_%H%M%S")
-        trace_suffix = f"_{req.trace_id[:8]}" if req.trace_id else ""
-        buffer_file = buffer_dir / f"upload_{timestamp}{trace_suffix}.json"
-        buffer_file.write_text(json.dumps({
-            "data": data, "trace_id": req.trace_id, "finalize": req.finalize,
-            "received_at": dt.now().isoformat(),
-        }, ensure_ascii=False, indent=2), encoding="utf-8")
-
-        # 后台运行 Librarian Agent 处理
-        from agents.librarian import process_upload
-        background_tasks.add_task(
-            process_upload,
-            data=data,
-            caller_trace_id=req.trace_id,
-            buffer_file=str(buffer_file),
-        )
-
-        summary = []
-        if tools_list: summary.append(f"工具: {len(tools_list)}")
-        if resources_list: summary.append(f"资源: {len(resources_list)}")
-        if knowledge_list: summary.append(f"知识: {len(knowledge_list)}")
-
-        return {
-            "status": "accepted",
-            "message": f"已接收 {', '.join(summary)},Librarian Agent 后台处理中",
-        }
-
-    except HTTPException:
-        raise
-    except Exception as e:
-        print(f"[Knowledge Upload] 错误: {e}")
+        print(f"[/api/agent] {req.agent_type} 错误: {e}")
         raise HTTPException(status_code=500, detail=str(e))
 
 
@@ -1100,24 +1056,26 @@ async def list_pending_uploads_api():
 
 
 @app.post("/api/knowledge/upload/retry")
-async def retry_pending_uploads_api(background_tasks: BackgroundTasks):
-    """重跑所有失败的 upload 任务"""
-    from agents.librarian import list_pending_uploads, process_upload
+async def retry_pending_uploads_api():
+    """重跑所有失败的 upload 任务(同步执行,按顺序处理)"""
+    from agents.librarian import list_pending_uploads, run_librarian
 
     pending = list_pending_uploads()
     failed = [p for p in pending if p["status"] == "failed"]
 
+    results = []
     for item in failed:
         buffer_file = item["file"]
         data = json.loads(Path(buffer_file).read_text(encoding="utf-8"))
-        background_tasks.add_task(
-            process_upload,
-            data=data.get("data", {}),
-            caller_trace_id=data.get("trace_id", ""),
-            buffer_file=buffer_file,
+        payload = data.get("data", {})
+        result = await run_librarian(
+            query=json.dumps(payload),
+            continue_from=None,
+            skills=["upload_strategy"],
         )
+        results.append({"file": buffer_file, "status": result.get("status"), "error": result.get("error")})
 
-    return {"retried": len(failed), "message": f"已触发 {len(failed)} 个失败任务的重跑"}
+    return {"retried": len(failed), "results": results}
 
 
 @app.get("/api/knowledge/search")