Просмотр исходного кода

add research agent & sync_atomic_capabilities

guantao 19 часов назад
Родитель
Commit
8b9a2258cf

+ 83 - 1
agent/tools/builtin/knowledge.py

@@ -191,6 +191,8 @@ async def knowledge_save(
     submitted_by: str = "",
     score: int = 3,
     message_id: str = "",
+    capability_ids: Optional[List[str]] = None,
+    tool_ids: Optional[List[str]] = None,
     context: Optional[ToolContext] = None,
 ) -> ToolResult:
     """
@@ -244,7 +246,9 @@ async def knowledge_save(
                 "helpful": 1,
                 "harmful": 0,
                 "confidence": 0.5,
-            }
+            },
+            "capability_ids": capability_ids or [],
+            "tool_ids": tool_ids or []
         }
 
         async with httpx.AsyncClient(timeout=30.0) as client:
@@ -778,3 +782,81 @@ async def relation_search(
         return ToolResult(title=f"❌ {table_name} 检索失败", output=f"HTTP Error: {e.response.text}", error=str(e))
     except Exception as e:
         return ToolResult(title=f"❌ {table_name} 检索失败", output=str(e), error=str(e))
+
+
@tool(groups=["knowledge_internal"], hidden_params=["context"])
async def tool_create(
    id: str,
    name: str = "",
    version: Optional[str] = None,
    introduction: str = "",
    tutorial: str = "",
    input: str = "",
    output: str = "",
    status: str = "未接入",
    capability_ids: Optional[List[str]] = None,
    knowledge_ids: Optional[List[str]] = None,
    provider_ids: Optional[List[str]] = None,
    context: Optional[ToolContext] = None
) -> ToolResult:
    """创建或更新工具(直接存入数据库)"""
    # Upsert a tool entity via the KnowHub HTTP API.  List-valued fields
    # default to [] so the server never receives null for them.
    try:
        payload = {
            "id": id, "name": name, "version": version, "introduction": introduction,
            "tutorial": tutorial, "input": input, "output": output, "status": status,
            "capability_ids": capability_ids or [], "knowledge_ids": knowledge_ids or [],
            "provider_ids": provider_ids or []
        }
        async with httpx.AsyncClient(timeout=30.0) as client:
            res = await client.post(f"{KNOWHUB_API}/api/tool", json=payload)
            res.raise_for_status()
        return ToolResult(title="✅ 工具保存成功", output=f"成功创建/更新工具: {id}")
    except httpx.HTTPStatusError as e:
        # Surface the server's error body, mirroring relation_search's handling.
        return ToolResult(title="❌ 工具保存失败", output=f"HTTP Error: {e.response.text}", error=str(e))
    except Exception as e:
        return ToolResult(title="❌ 工具保存失败", output=str(e), error=str(e))
+
+
@tool(groups=["knowledge_internal"], hidden_params=["context"])
async def capability_create(
    id: str,
    name: str = "",
    criterion: str = "",
    description: str = "",
    requirement_ids: Optional[List[str]] = None,
    implements: Optional[Dict[str, str]] = None,
    tool_ids: Optional[List[str]] = None,
    knowledge_ids: Optional[List[str]] = None,
    context: Optional[ToolContext] = None
) -> ToolResult:
    """创建或更新能力(直接存入数据库)"""
    # Upsert a capability entity via the KnowHub HTTP API.  Collection-valued
    # fields default to empty containers so the server never receives null.
    try:
        payload = {
            "id": id, "name": name, "criterion": criterion, "description": description,
            "requirement_ids": requirement_ids or [], "implements": implements or {},
            "tool_ids": tool_ids or [], "knowledge_ids": knowledge_ids or []
        }
        async with httpx.AsyncClient(timeout=30.0) as client:
            res = await client.post(f"{KNOWHUB_API}/api/capability", json=payload)
            res.raise_for_status()
        return ToolResult(title="✅ 能力保存成功", output=f"成功创建/更新能力: {id}")
    except httpx.HTTPStatusError as e:
        # Surface the server's error body, mirroring relation_search's handling.
        return ToolResult(title="❌ 能力保存失败", output=f"HTTP Error: {e.response.text}", error=str(e))
    except Exception as e:
        return ToolResult(title="❌ 能力保存失败", output=str(e), error=str(e))
+
+
@tool(groups=["knowledge_internal"], hidden_params=["context"])
async def requirement_create(
    id: str,
    description: str = "",
    capability_ids: Optional[List[str]] = None,
    context: Optional[ToolContext] = None
) -> ToolResult:
    """创建或更新需求(直接存入数据库)"""
    # Upsert a requirement entity via the KnowHub HTTP API.
    try:
        payload = {
            "id": id, "description": description, "capability_ids": capability_ids or []
        }
        async with httpx.AsyncClient(timeout=30.0) as client:
            res = await client.post(f"{KNOWHUB_API}/api/requirement", json=payload)
            res.raise_for_status()
        return ToolResult(title="✅ 需求保存成功", output=f"成功创建/更新需求: {id}")
    except httpx.HTTPStatusError as e:
        # Surface the server's error body, mirroring relation_search's handling.
        return ToolResult(title="❌ 需求保存失败", output=f"HTTP Error: {e.response.text}", error=str(e))
    except Exception as e:
        return ToolResult(title="❌ 需求保存失败", output=str(e), error=str(e))

+ 4 - 2
knowhub/agents/librarian.py

@@ -45,6 +45,7 @@ def get_librarian_config(enable_db_commit: bool = ENABLE_DATABASE_COMMIT) -> Run
         "read_file", "write_file",
         "list_cache_status",
         "match_tree_nodes",
+        "sync_atomic_capabilities",
         "skill",
     ]
 
@@ -67,12 +68,11 @@ def get_librarian_config(enable_db_commit: bool = ENABLE_DATABASE_COMMIT) -> Run
             enable_injection=False,
         ),
         tools=tools,
-        exclude_tools=["ask_knowledge", "upload_knowledge", "bash_command", "grep_content", "glob_files"],
     )
 
 
 def _register_internal_tools():
-    """注册内部工具(缓存管理 + 树匹配),只需调用一次"""
+    """注册内部工具"""
     try:
         sys.path.insert(0, str(Path(__file__).parent.parent))
         from internal_tools.cache_manager import (
@@ -82,6 +82,7 @@ def _register_internal_tools():
             list_cache_status,
         )
         from internal_tools.tree_matcher import match_tree_nodes
+        from internal_tools.capability_extractor import sync_atomic_capabilities
         from agent.tools import get_tool_registry
         registry = get_tool_registry()
         registry.register(cache_research_data)
@@ -89,6 +90,7 @@ def _register_internal_tools():
         registry.register(commit_to_database)
         registry.register(list_cache_status)
         registry.register(match_tree_nodes)
+        registry.register(sync_atomic_capabilities)
         logger.info("✓ 已注册 Librarian 内部工具")
     except Exception as e:
         logger.error(f"✗ 注册内部工具失败: {e}")

+ 6 - 1
knowhub/agents/librarian_agent.prompt

@@ -10,7 +10,12 @@ $system$
 你是一个知识库管理员。你有两项核心职责:
 
 1. **检索整合**:面对查询时,跨多张表检索,顺着关联链拼出完整上下文,给出精准回答
-2. **入库编排**:收到新数据时,与已有知识比对去重,识别关联关系,整理为结构化条目归入正确位置。**注意:你所有的归档与起草工作,必须严格并且仅限于编辑 `.cache/.knowledge/pre_upload_list.json` 这个草稿文件,严禁在根目录或任何其他地方擅自创建诸如 `knowledge/` 或 `tools/` 的散装文件夹和文件!**
+2. **入库编排**:收到新数据时,与已有知识比对去重,识别关联关系,整理为结构化条目归入正确位置。
+   **【红线警告:绝对禁止的存储行为】**
+   - **严禁**创建 `drafts/`、`knowledge/` 或 `tools/` 等任何散装文件夹!
+   - **严禁**使用 `write_file` 去保存单独的文件,也**严禁**用它尝试拼接修改庞大的 JSON 字典!
+   - 你所有的草稿与起草必须,并且**只能**使用专用工具 `cache_research_data` 逐条将组装好的实体对象安全存入草稿箱。
+   - 当所有实体对象准备完毕要上传数据库时,直接调用 `commit_to_database()`。
 
 你只做整理和检索,不自行创造知识内容。
 

+ 163 - 0
knowhub/agents/research.py

@@ -0,0 +1,163 @@
+import asyncio
+import json
+import logging
+import sys
+from pathlib import Path
+from typing import Dict, Any, Optional
+
+# 确保项目路径可用
+sys.path.insert(0, str(Path(__file__).parent.parent.parent))
+
+from agent.core.runner import AgentRunner
+from agent.trace import FileSystemTraceStore, Trace, Message
+from agent.llm import create_qwen_llm_call
+from agent.llm.prompts import SimplePrompt
+
+logger = logging.getLogger(__name__)
+
+# 文件保存 trace 映射关系,持久化续跑
+TRACE_MAP_FILE = Path(".cache/research_trace_map.json")
+
+
def _load_trace_map() -> Dict[str, str]:
    """Read the caller→research trace-id mapping from disk ({} if absent)."""
    if not TRACE_MAP_FILE.exists():
        return {}
    return json.loads(TRACE_MAP_FILE.read_text(encoding="utf-8"))
+
+
def _save_trace_map(mapping: Dict[str, str]) -> None:
    """Persist the caller→research trace-id mapping, creating parent dirs."""
    TRACE_MAP_FILE.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(mapping, indent=2, ensure_ascii=False)
    TRACE_MAP_FILE.write_text(serialized, encoding="utf-8")
+
+
def get_research_trace_id(caller_trace_id: str) -> Optional[str]:
    """Look up the Research trace id recorded for a caller trace id, if any."""
    if not caller_trace_id:
        return None
    return _load_trace_map().get(caller_trace_id)
+
+
def set_research_trace_id(caller_trace_id: str, research_trace_id: str):
    """Record a caller→research trace-id pair; no-op when caller id is empty."""
    if not caller_trace_id:
        return
    current = _load_trace_map()
    current[caller_trace_id] = research_trace_id
    _save_trace_map(current)
+
+
+# ===== 单例 Runner =====
+
+_runner: Optional[AgentRunner] = None
+_prompt_messages = None
+_initialized = False
+
+
def _ensure_initialized():
    """Lazily build the module-level Runner and prompt messages on first call."""
    global _runner, _prompt_messages, _initialized
    if _initialized:
        return
    # NOTE(review): the flag is raised before initialization completes, so a
    # failure below leaves the module half-initialized with no retry — confirm
    # this is acceptable.
    _initialized = True

    # Built-in tools are expected to auto-load from the project's agent/tools
    # package; skills_dir points at an optional "skills" folder next to this
    # module and is only passed when it actually exists.
    skills_dir = Path(__file__).parent / "skills"

    _runner = AgentRunner(
        trace_store=FileSystemTraceStore(base_path=".trace"),
        llm_call=create_qwen_llm_call(model="qwen3.5-plus"),  # default; may be overridden by prompt meta below
        skills_dir=str(skills_dir) if skills_dir.exists() else None,
        debug=True,
        logger_name="agents.research",
    )

    prompt_path = Path(__file__).parent / "research_agent.prompt"
    if prompt_path.exists():
        prompt = SimplePrompt(prompt_path)
        _prompt_messages = prompt.build_messages()

        # Honor an explicit model set in the prompt file's front-matter meta.
        if getattr(prompt, "meta", None) and prompt.meta.get("model"):
            model_name = prompt.meta["model"]
            _runner.llm_call = create_qwen_llm_call(model=model_name)
    else:
        _prompt_messages = []
        logger.warning(f"Research prompt 文件不存在: {prompt_path}")

    logger.info("✓ Research Agent 已初始化")
+
+
+# ===== 核心方法 =====
+
async def research(query: str, caller_trace_id: str = "") -> Dict[str, Any]:
    """
    Run the Research Agent synchronously and return its findings.

    Args:
        query: the research topic or question to investigate
        caller_trace_id: caller's trace id; repeated calls with the same id
            resume the previously created research trace

    Returns:
        {"response": str, "source_ids": [str], "sources": [dict]}
    """
    _ensure_initialized()

    # Best-effort init of the cloud headless browser (hosted deployments get
    # an isolated session); failure only logs a warning and research proceeds.
    try:
        from agent.tools.builtin.browser import init_browser_session
        await init_browser_session(browser_type="cloud")
    except Exception as e:
        logger.warning(f"Failed to init cloud browser: {e}")

    # Reuse an existing research trace for this caller, if one was recorded.
    research_trace_id = get_research_trace_id(caller_trace_id)

    from agent.core.runner import RunConfig
    config = RunConfig(
        model="qwen3.5-plus",
        temperature=0.3,
        max_iterations=200,
        tool_groups=["core", "content", "browser"],
        skills=["planning", "research", "browser"],
    )
    config.trace_id = research_trace_id  # None = new trace, value = resume

    # The system prompt is only prepended on a brand-new trace; a resumed
    # trace already contains it.
    content = f"[RESEARCH TASK] {query}"
    if research_trace_id is None:
        messages = _prompt_messages + [{"role": "user", "content": content}]
    else:
        messages = [{"role": "user", "content": content}]

    # Drain the runner's event stream, keeping the last assistant text seen.
    response_text = ""
    actual_trace_id = None

    async for item in _runner.run(
        messages=messages,
        config=config,
    ):
        if isinstance(item, Trace):
            actual_trace_id = item.trace_id
        elif isinstance(item, Message):
            if item.role == "assistant":
                # Assistant content may be a dict payload ({"text": ...}) or a
                # plain string depending on the runner.
                msg_content = item.content
                if isinstance(msg_content, dict):
                    text = msg_content.get("text", "")
                    if text:
                        response_text = text
                elif isinstance(msg_content, str) and msg_content:
                    response_text = msg_content

    # Persist the mapping so the next call with this caller id resumes.
    if actual_trace_id and caller_trace_id:
        set_research_trace_id(caller_trace_id, actual_trace_id)

    # NOTE(review): source_ids/sources are always empty for now — confirm
    # callers tolerate this until source extraction is implemented.
    return {
        "response": response_text,
        "source_ids": [],
        "sources": [],
    }

+ 141 - 0
knowhub/agents/research_agent.prompt

@@ -0,0 +1,141 @@
+---
+model: sonnet-4.6
+temperature: 0.3
+---
+
+$system$
+## 角色
+你是一个调研专家,负责根据指令搜索并如实记录调研发现。
+
+**你的边界**:只负责搜索和记录,不负责制定策略。发现的工序流程、方案、案例都要如实记录,但不要自己设计工序。
+**调研结果的形式可以多样**:单个工具、工序流程、真实案例都可以。但无论哪种形式,**必须落到具体工具**——每个步骤用什么工具来执行,需要明确。
+
+## 可用工具
+### 内容搜索工具
+- `search_posts(keyword, channel, cursor="0", max_count=20)`: 搜索帖子
+  - **channel 参数**:xhs(小红书), gzh(公众号), zhihu(知乎), bili(B站), douyin(抖音), toutiao(头条), weibo(微博)
+  - 示例:`search_posts("flux 2.0", channel="xhs", max_count=20)`
+- `select_post(index)`: 查看帖子详情(需先调用 search_posts)
+  - 示例:`select_post(index=1)`
+- `youtube_search(keyword)`: 搜索 YouTube 视频
+  - 示例:`youtube_search("flux 2.0 tutorial")`
+- `youtube_detail(content_id, include_captions=True)`: 获取 YouTube 视频详情和字幕
+  - 示例:`youtube_detail("视频ID", include_captions=True)`
+- `x_search(keyword)`: 搜索 X (Twitter) 内容
+  - 示例:`x_search("flux 2.0 max")`
+- `knowledge_search`: 搜索知识库
+- `browser-use`: 浏览器搜索(search_posts 不好用时使用)
+
+## 执行流程
+
+### 第一步:理解调研目标
+
+### 第二步:执行搜索
+
+**调研渠道策略**:
+1. **官网** - 获取官方介绍、技术规格、API 文档
+2. **内容平台** - 获取真实用例和使用经验
+   - 公众号:`search_posts(keyword="...", channel="gzh")`
+   - X:`x_search(keyword="...")`
+   - 知乎:`search_posts(keyword="...", channel="zhihu")`
+   - 小红书:`search_posts(keyword="...", channel="xhs")`
+3. **视频平台** - 获取用法教程和实操演示
+   - YouTube:`youtube_search(keyword="...")` → `youtube_detail(content_id="...")`
+   - B站:`search_posts(keyword="...", channel="bili")`
+
+**重要**:
+- **必须优先使用专用搜索工具**(search_posts、youtube_search、x_search)
+- **禁止使用 browser-use 搜索公众号、知乎、小红书、B站等已有专用工具的平台**
+- browser-use 仅用于搜索没有专用工具的平台或官网
+
+**Query 策略**(从以下角度搜索):
+1. **找官网** - "[工具名] 官网"、"[工具名] official website"
+2. **找用例** - "[工具名] 用例"、"[工具名] 使用案例"、"[工具名] tutorial"
+3. **找评测** - "[工具名] 评测"、"[工具名] review"、"[工具名] 测试"
+4. **找竞品讨论** - "[工具名] vs [竞品]"、"[工具名] 和 [竞品] 谁更强"
+5. **找排行** - "2026 年最强 [领域] 工具"、"[领域] 工具排行"
+
+**搜索优先级**:
+1. **知识库优先**:用 `knowledge_search` 按需求关键词搜索,查看已有策略经验、工具评估、工作流总结
+2. **线上调研**:知识库结果不充分时,进行线上搜索
+
+### 第三步:反思与调整
+
+在搜索过程中,你需要主动进行反思和调整:
+每完成 1-2 轮搜索后,在继续前先评估:
+- 当前方向是否有效?是否偏离需求?
+- 结果质量如何?下一轮应该调整 query 还是换角度?
+- 可选调用 `reflect` 工具辅助判断
+根据反思结果调整后续搜索策略,直到你认为信息充分或遇到明确的阻塞。
+
+### 第四步:结束与输出
+
+**何时结束**:
+- 信息已充分覆盖调研目标
+- 搜索结果开始重复,无新信息
+- 方向不明确,需要用户指导
+
+**如何结束**:
+1. **必须**使用 `write_file` 将调研结果按照下面的 JSON 格式写入到调用方在 task 中指定的输出文件路径
+2. 输出文件路径由调用方在 task 中指定,如未指定则输出为纯文本消息
+
+
+## 输出格式
+
+**Schema**:
+
+```jsonschema
+{
+  "搜索主题": "string — 本次搜索主题",
+  "搜索轨迹": "string — 搜索过程:尝试了哪些 query、如何调整方向等",
+  "调研发现": [
+    {
+      "名称": "string — 发现项名称(工具名/方案名/案例名)",
+      "类型": "tool | workflow | case — 单个工具 / 工序流程或整体方案 / 真实案例",
+      "来源": "string — 来源(knowledge_id / URL / 帖子链接)",
+      "核心描述": "string — 核心思路或能力描述",
+      "工序步骤": [
+        {
+          "步骤名称": "string — 步骤名称(如:生成线稿、角色一致性处理)",
+          "使用工具": "string — 该步骤使用的具体工具名称",
+          "说明": "string — 该步骤的操作说明"
+        }
+      ],
+      "工具信息": {
+        "工具名称": "string — 工具名称(类型为 tool 时必填)",
+        "仓库或链接": "string — 仓库或官网链接",
+        "输入格式": "string — 输入格式",
+        "输出格式": "string — 输出格式",
+        "最近更新": "string — 最近更新时间",
+        "能力": ["string — 工具能力"],
+        "限制": ["string — 工具限制"]
+      },
+      "外部评价": {
+        "专家或KOL推荐": ["string — 来源 + 评价摘要"],
+        "社区反馈": ["string — 来源 + 反馈摘要"],
+        "热度指标": "string — 提及次数、榜单排名、帖子热度等"
+      },
+      "使用案例": [
+        {
+          "描述": "string — 用例描述",
+          "来源链接": "string — 来源链接",
+          "相似度": "high | medium | low"
+        }
+      ],
+      "优点": ["string"],
+      "缺点": ["string"],
+      "风险": ["string"]
+    }
+  ]
+}
+```
+
+**字段说明**:
+- `工序步骤`:类型为 `workflow` 或 `case` 时填写,逐步骤记录用了什么工具
+- `工具信息`:类型为 `tool` 时必填;`workflow`/`case` 类型中,如果整体方案依赖某个核心工具(如 ComfyUI),也可填写
+- `外部评价`:尽量填写,是主 agent 选择工具时的重要参考;找不到可留空
+
+
+## 注意事项
+- `search_posts` 不好用时改用 `browser-use`
+- 如果调研过程中遇到不确定的问题,要停下来询问用户

+ 137 - 21
knowhub/internal_tools/cache_manager.py

@@ -29,13 +29,99 @@ def _ensure_dirs():
 
 @tool()
 async def organize_cached_data(merge: bool = True) -> ToolResult:
-    """为了兼容旧指令保留。现在实际上不再需要独立调用。"""
-    return ToolResult(title="ℹ️ 提示", output="请直接使用 read_file 和 write_file 编辑 pre_upload_list.json。")
+    """旧指令保留口。现在已废弃,无需调用。"""
+    return ToolResult(title="ℹ️ 提示", output="请直接使用 cache_research_data。")
 
 @tool()
-async def cache_research_data(data: str | Dict[str, Any], source: str = "unknown") -> ToolResult:
-    """为了兼容旧指令保留。现在实际上不再需要独立调用。"""
-    return ToolResult(title="ℹ️ 提示", output="请直接使用 read_file 和 write_file 编辑 pre_upload_list.json。")
async def cache_research_data(entity_type: str, data: Dict[str, Any]) -> ToolResult:
    """
    【极端重要】由于通过文本级读写组装极庞大的嵌套 JSON 文件很容易导致大模型截断、忘闭合引发毁灭性覆盖或奔溃,
    任何要在 JSON 缓存中「安全追加」一条信息的操作请仅限调用此工具!不要使用 write_file!
    
    Args:
        entity_type: 所属的数据类别,仅能填入 "requirements", "capabilities", "tools" 或是 "knowledge"。
        data: 具体那单独一条你想草拟或记录的数据实体结构(请直接传递 Json Object, 我们会在底层完成拼接保存)。
        
    Returns:
        缓存操作的执行结果
    """
    _ensure_dirs()
    # Reject unknown entity buckets up front.
    if entity_type not in ("requirements", "capabilities", "tools", "knowledge"):
        return ToolResult(
            title="❌ 参数异常",
            output=f"传入的 entity_type = {entity_type} 不合法。必须是 requirements, capabilities, tools, knowledge。请重新确认参数类型!",
            error="Invalid entity_type"
        )

    # 0. Hard schema validation — fail fast with actionable messages so the
    # calling agent can self-correct its JSON before anything touches disk.
    if entity_type == "knowledge":
        if "task" not in data:
            return ToolResult(
                title="❌ 校验失败",
                output="【严重错误】写入 knowledge 必须包含 'task' 字段!不能用 'title' 代替。请修正 JSON 结构后重新调用本工具。",
                error="Missing 'task' field"
            )
        if "tags" in data and not isinstance(data["tags"], dict):
            return ToolResult(
                title="❌ 校验失败",
                output="【严重错误】knowledge 的 'tags' 字段强制要求为字典格式 (如 `{\"标签名\": \"\"}`),绝对不能是数组(List)。请修正后重新调用本工具。",
                error="Invalid 'tags' format"
            )
    elif entity_type == "capabilities":
        if "name" not in data or "description" not in data:
            return ToolResult(
                title="❌ 校验失败",
                output="写入 capabilities 必须包含 'name' 和 'description' 字段。请修正后再调用。",
                error="Missing capability fields"
            )
    elif entity_type == "requirements":
        if "description" not in data:
            return ToolResult(
                title="❌ 校验失败",
                output="写入 requirements 必须包含 'description' 字段。请修正后再调用。",
                error="Missing requirement fields"
            )

    try:
        # 1. Safe in-memory read; a corrupted file is preserved as a timestamped
        # backup and the cache is re-initialized instead of crashing.
        if PRE_UPLOAD_FILE.exists():
            with open(PRE_UPLOAD_FILE, "r", encoding="utf-8") as f:
                try:
                    cache_dict = json.load(f)
                except json.JSONDecodeError:
                    backup_file = CACHE_DIR / f"pre_upload_list_backup_{int(datetime.now().timestamp())}.json"
                    os.rename(PRE_UPLOAD_FILE, backup_file)
                    cache_dict = {"requirements": [], "capabilities": [], "tools": [], "knowledge": []}
        else:
            cache_dict = {"requirements": [], "capabilities": [], "tools": [], "knowledge": []}

        # 2. Upsert by "id": replace an existing entry in place, else append.
        if entity_type not in cache_dict:
            cache_dict[entity_type] = []

        data_id = data.get("id")
        replaced = False
        if data_id:
            for idx, existing in enumerate(cache_dict[entity_type]):
                if existing.get("id") == data_id:
                    cache_dict[entity_type][idx] = data
                    replaced = True
                    break
        if not replaced:
            cache_dict[entity_type].append(data)

        # 3. Atomic write-back: dump to a sibling temp file first, then
        # os.replace() it into place so a crash mid-write can never leave
        # pre_upload_list.json truncated (the failure mode this tool exists
        # to prevent).
        tmp_file = PRE_UPLOAD_FILE.parent / (PRE_UPLOAD_FILE.name + ".tmp")
        with open(tmp_file, "w", encoding="utf-8") as f:
            json.dump(cache_dict, f, ensure_ascii=False, indent=2)
        os.replace(tmp_file, PRE_UPLOAD_FILE)

        action = "更新" if replaced else "新建"
        return ToolResult(title="✅ 存入草稿箱成功", output=f"成功将一条 {entity_type} {action}写入到了缓存文件!当前此类别规模: {len(cache_dict[entity_type])} 个。")

    except Exception as e:
        logger.error(f"Cache save failed: {e}")
        return ToolResult(title="❌ 系统异常", output=f"执行时发生底层错误: {str(e)}", error=str(e))
 
 @tool(
     description=(
@@ -103,12 +189,19 @@ async def commit_to_database() -> ToolResult:
         from agent.tools.builtin.knowledge import knowledge_save
         for k in knowledges:
             try:
+                raw_tags = k.get("tags", {})
+                if isinstance(raw_tags, list):
+                    raw_tags = {str(item): "" for item in raw_tags}
+                
                 await knowledge_save(
-                    task=k.get("task", "补充知识"),
+                    task=k.get("task", k.get("title", "补充知识")),
                     content=k.get("content", ""),
                     types=k.get("types", []),
                     score=k.get("score", 3),
-                    source_category=k.get("source", {}).get("category", "exp")
+                    source_category=k.get("source", {}).get("category", "exp"),
+                    capability_ids=k.get("capability_ids", []),
+                    tool_ids=k.get("tool_ids", []),
+                    tags=raw_tags
                 )
                 saved_knows += 1
             except Exception as e:
@@ -142,11 +235,13 @@ async def commit_to_database() -> ToolResult:
 
 
 @tool(
-    description="查看当前预整理草稿的统计信息。"
+    description="查看当前预整理草稿的统计明细,或者通过传入 entity_id 获取某条特定草稿的完整 JSON 详情。"
 )
-async def list_cache_status() -> ToolResult:
+async def list_cache_status(entity_id: Optional[str] = None) -> ToolResult:
     """
-    查看草稿状态(pre_upload_list.json)
+    查看草稿状态(pre_upload_list.json)或特定记录的详情。
+    Args:
+        entity_id: (可选) 如果传入实体 ID(如 'REQ_001'),将返回该条目的完整草稿详情。不传则仅列出概要。
     """
     _ensure_dirs()
     if not PRE_UPLOAD_FILE.exists():
@@ -155,17 +250,38 @@ async def list_cache_status() -> ToolResult:
     try:
         with open(PRE_UPLOAD_FILE, "r", encoding="utf-8") as f:
             data = json.load(f)
+            
+        if entity_id:
+            # 查找具体详情
+            for group in ("requirements", "capabilities", "tools", "knowledge"):
+                for item in data.get(group, []):
+                    if item.get("id") == entity_id:
+                        return ToolResult(
+                            title=f"📄 草稿详情: {entity_id}", 
+                            output=json.dumps(item, ensure_ascii=False, indent=2)
+                        )
+            return ToolResult(title="⚠️ 找不到对象", output=f"在草稿箱中未找到 ID 为 {entity_id} 的实体。")
         
-        reqs = len(data.get("requirements", []))
-        caps = len(data.get("capabilities", []))
-        tools = len(data.get("tools", []))
-        knows = len(data.get("knowledge", []))
-
-        output = (f"当前草稿数据({PRE_UPLOAD_FILE.name}):\n"
-                  f" - 需求: {reqs}\n"
-                  f" - 能力: {caps}\n"
-                  f" - 工具: {tools}\n"
-                  f" - 知识: {knows}\n")
-        return ToolResult(title="📁 草稿状态", output=output)
+        # 仅返回统计明细
+        reqs = data.get("requirements", [])
+        caps = data.get("capabilities", [])
+        tools = data.get("tools", [])
+        knows = data.get("knowledge", [])
+
+        output_lines = [f"当前草稿数据({PRE_UPLOAD_FILE.name}):(通过传参 entity_id 查看具体完整JSON)"]
+        
+        output_lines.append(f"\n- 需求 ({len(reqs)}条):")
+        for r in reqs: output_lines.append(f"  • [{r.get('id')}] {r.get('description', '')[:50]}...")
+            
+        output_lines.append(f"\n- 能力 ({len(caps)}条):")
+        for c in caps: output_lines.append(f"  • [{c.get('id')}] {c.get('name', '')}")
+            
+        output_lines.append(f"\n- 工具 ({len(tools)}条):")
+        for t in tools: output_lines.append(f"  • [{t.get('id')}] {t.get('name', '')}")
+            
+        output_lines.append(f"\n- 知识 ({len(knows)}条):")
+        for k in knows: output_lines.append(f"  • [{k.get('id')}] {k.get('title', '')[:50]}...")
+
+        return ToolResult(title="📁 草稿状态明细", output="\n".join(output_lines))
     except Exception as e:
         return ToolResult(title="❌ 读取状态失败", output=str(e), error=str(e))

+ 224 - 0
knowhub/internal_tools/capability_extractor.py

@@ -0,0 +1,224 @@
+import json
+import logging
+import uuid
+import os
+import asyncio
+from typing import List
+from agent.tools import tool, ToolResult
+from agent.llm.openrouter import openrouter_llm_call
+
+# 导入底层 Postgres 资产表依赖
+from tool_agent.tool.tool_store import PostgreSQLToolStore
+from tool_agent.tool.capability import PostgreSQLCapabilityStore
+
+logger = logging.getLogger(__name__)
+
+SYSTEM_PROMPT_CAPABILITY = """你是一个专业的能力分析师。
+你的任务是从给定的【待分析新工具】的使用介绍和它挂载的【相关背景知识文章】中,提取出它对整网【原子能力表】的贡献,并选择新建或是融合。
+
+## 定义与格式
+1. 原子能力是面向需求、跨工具的独立完整业务单元。
+2. 端到端型工具(如Midjourney)直接抽取能力;编排平台工具(如ComfyUI节点群)从实际搭建的工作流中提取能力视角(不要原子化平台或独立节点本身)。
+
+请输出严格的 JSON 数组结构:
+[
+  {
+    "action": "create",
+    "tool_id": "<当前待分析的工具ID>",
+    "knowledge_ids": ["<哪些传入的相关知识促成了这个能力,填入真实的知识ID>"],
+    "capability_id": "NEW_<任意数字字母临时ID>",
+    "name": "<总结提炼的新能力统称名>",
+    "criterion": "<客观统一的判定标准>",
+    "description": "<抽象的能力需求场景说明>",
+    "implement_description": "<在该特定工具里是如何实现该能力的(具体操作或调用链)>"
+  },
+  {
+    "action": "attach",
+    "tool_id": "<当前待分析的工具ID>",
+    "knowledge_ids": ["<关联的知识ID,可为空数组>"],
+    "capability_id": "<来自下面【已有全量能力库】字典中完全等价功能的真实ID>",
+    "implement_description": "<在该特定工具里实现此老能力的具体手法>"
+  },
+  {
+    "action": "update_and_attach",
+    "tool_id": "<当前待分析的工具ID>",
+    "knowledge_ids": ["<关联的知识ID>"],
+    "capability_id": "<来自【已有全量能力库】的真实ID>",
+    "name": "<更统整包容的全局更好命名(如果不改就留空不变)>",
+    "criterion": "<更新优化后的判定标准>",
+    "description": "<更新优化后的描述>",
+    "implement_description": "<在该特定工具中如何实现>"
+  }
+]
+
+请绝对不要输出 markdown 包装,仅输出原生的合法 JSON。如果一个工具覆盖了多个独立原子能力,请为每个能力出具一条动作操作。
+"""
+
+def _fetch_knowledge_map(cursor, k_ids: list):
+    if not k_ids: return {}
+    placeholders = ','.join(['%s'] * len(k_ids))
+    cursor.execute(f"SELECT row_to_json(knowledge) as data FROM knowledge WHERE id IN ({placeholders})", list(k_ids))
+    mapping = {}
+    for r in cursor.fetchall():
+        d = r['data']
+        text = str(d.get('content', d.get('markdown', d.get('description', ''))))
+        mapping[d['id']] = {"title": d.get('title', ''), "content": text[:4000]}
+    return mapping
+
+
async def extract_capabilities_with_claude(existing_caps, tool_batch, knowledge_map):
    """
    Ask Claude (via OpenRouter) how a batch of tools maps onto the atomic
    capability table.

    Returns a list of operation dicts, each carrying an "action" of
    create / attach / update_and_attach; returns [] on API failure or when
    the model output cannot be parsed as JSON.
    """
    # Compact JSON projections of the inputs keep the prompt small.
    cap_str = json.dumps([{"id": c["id"], "name": c["name"], "criterion": c.get("criterion", "")} for c in existing_caps], ensure_ascii=False)
    tool_str = json.dumps([{"id": t["id"], "name": t["name"], "desc": t["introduction"], "docs": t["tutorial"], "associated_knowledge": t.get("knowledge_ids", [])} for t in tool_batch], ensure_ascii=False)
    knowledge_str = json.dumps(knowledge_map, ensure_ascii=False)

    prompt = f"【现有全量原子能力库字典】:\n{cap_str}\n\n【相关背景知识文章】:\n{knowledge_str}\n\n【本次待分析抽取合并的工具列表】:\n{tool_str}\n\n请严格输出JSON操作数组:"

    messages = [
        {"role": "system", "content": SYSTEM_PROMPT_CAPABILITY},
        {"role": "user", "content": prompt}
    ]

    result_text = ""
    try:
        result = await openrouter_llm_call(
            messages=messages,
            model="anthropic/claude-sonnet-4-5",
            temperature=0.2
        )
        result_text = result.get("content", "")
    except Exception as e:
        logger.error(f"OpenRouter API failed: {e}")
        return []

    # NOTE(review): appends raw model output to a log file in the process CWD —
    # confirm this relative path is intended for deployment.
    with open("raw_capability_responses.log", "a", encoding="utf-8") as f:
        f.write(f"\n--- Synchronous Capability Batch Output ---\n{result_text}\n")

    try:
        # Strip optional markdown code fences before parsing.
        clean_json = result_text.strip()
        if clean_json.startswith("```json"): clean_json = clean_json[7:]
        elif clean_json.startswith("```"): clean_json = clean_json[3:]
        if clean_json.endswith("```"): clean_json = clean_json[:-3]

        data = json.loads(clean_json.strip())
        # Accept either a single op object or a list; keep only dicts that
        # actually declare an "action".
        if isinstance(data, dict):
            if "action" in data: return [data]
            return []
        elif isinstance(data, list):
            return [item for item in data if isinstance(item, dict) and "action" in item]
        return []
    except Exception as e:
        logger.error(f"Failed to parse capability JSON: {e}")
        return []
+
+
@tool()
async def sync_atomic_capabilities(target_tool_ids: List[str]) -> ToolResult:
    """
    一键式强同步工具(为Librarian等智能体量身打造)。
    针对新发现的或发生变动的特定工具/知识源,它能在数十秒内完成关联获取、大模型分析并直接完成底层 PostgreSQL 更新操作。
    直接返回应用成功后的增减战报。
    
    Args:
        target_tool_ids: 必须提供。指定要被大模型执行能力审查提取的工具 ID 列表。建议每次传入数量极少(如 1-3个)以保证 15 秒内同步快速返回。
    """
    # Full-table scans are disallowed: callers must name explicit tool ids.
    if not target_tool_ids:
        return ToolResult(title="❌ 参数错误", output="必须提供 target_tool_ids,独立系统不再允许发起全量全局扫描避免阻塞。", error="Missing target_tool_ids")

    logger.info(f"开启单通道同步能力萃取 (目标: {target_tool_ids})...")

    cap_store = PostgreSQLCapabilityStore()
    tool_store = PostgreSQLToolStore()
    # NOTE(review): reaches into the store's private cursor API — confirm
    # PostgreSQLCapabilityStore intends to expose raw cursors this way.
    k_cursor = cap_store._get_cursor()
    stats = {"created": 0, "attached": 0, "updated": 0, "knowledge_inherited": 0}

    try:
        # Loads up to 5000 capabilities / 2000 tools into memory per call —
        # acceptable for small stores; revisit if the tables grow.
        existing_caps = cap_store.list_all(limit=5000)
        all_tools = tool_store.list_all(limit=2000)

        target_tools = [t for t in all_tools if t.get("id") in target_tool_ids]
        if not target_tools:
            return ToolResult(title="❌ 未找到工具", output=f"找不到任何由 {target_tool_ids} 制定的接入工具")

        # Pull the knowledge articles directly bound to the target tools.
        batch_k_ids = set([k for t in target_tools for k in t.get("knowledge_ids", [])])
        k_map = _fetch_knowledge_map(k_cursor, list(batch_k_ids))

        # LLM pass: produce create/attach/update_and_attach operations.
        ops = await extract_capabilities_with_claude(existing_caps, target_tools, k_map)

        if not ops:
            return ToolResult(title="ℹ️ 分析完成", output="大模型判定当前工具没有提取出任何有效或创新的功能资产。")

        # Maps the LLM's temporary "NEW_*" ids to the real ids minted below.
        temp_id_mapping = {}

        # Phase 1: create brand-new atomic capabilities first, so later ops
        # referencing a temporary id can resolve to the freshly minted one.
        for op in ops:
            if op.get("action") == "create" and op.get("capability_id") and op.get("tool_id"):
                real_id = f"cap-{uuid.uuid4().hex[:12]}"
                temp_id_mapping[op.get("capability_id")] = real_id

                t_id = op.get("tool_id")
                inherited_knowledge = op.get("knowledge_ids", [])
                stats["knowledge_inherited"] += len(inherited_knowledge)

                cap_store.insert_or_update({
                    "id": real_id,
                    "name": op.get("name", ""),
                    "criterion": op.get("criterion", ""),
                    "description": op.get("description", ""),
                    "tool_ids": [t_id],
                    "implements": {t_id: op.get("implement_description", "")},
                    "knowledge_ids": inherited_knowledge
                })
                stats["created"] += 1

        # Phase 2: attach tools to existing capabilities and apply optional
        # metadata refreshes; update_and_attach keeps the old value whenever
        # the LLM left a field empty.
        for op in ops:
            action = op.get("action")
            if action in ("attach", "update_and_attach") and op.get("capability_id") and op.get("tool_id"):
                c_id = temp_id_mapping.get(op.get("capability_id"), op.get("capability_id"))
                existing_cap = cap_store.get_by_id(c_id)
                if not existing_cap: continue

                if action == "update_and_attach":
                    existing_cap["name"] = op.get("name") or existing_cap.get("name")
                    existing_cap["criterion"] = op.get("criterion") or existing_cap.get("criterion")
                    existing_cap["description"] = op.get("description") or existing_cap.get("description")
                    stats["updated"] += 1

                t_id = op.get("tool_id")
                imp_desc = op.get("implement_description", "")

                # Idempotent tool linkage plus per-tool implementation notes.
                tool_ids = existing_cap.get("tool_ids", [])
                if t_id not in tool_ids: tool_ids.append(t_id)
                existing_cap["tool_ids"] = tool_ids

                implements = existing_cap.get("implements", {})
                implements[t_id] = imp_desc
                existing_cap["implements"] = implements

                # Merge newly cited knowledge ids without duplicates.
                op_k_ids = op.get("knowledge_ids", [])
                if op_k_ids:
                    existing_k_ids = set(existing_cap.get("knowledge_ids", []))
                    new_k_ids = [k for k in op_k_ids if k not in existing_k_ids]
                    if new_k_ids:
                        existing_k_ids.update(new_k_ids)
                        existing_cap["knowledge_ids"] = list(existing_k_ids)
                        stats["knowledge_inherited"] += len(new_k_ids)

                cap_store.insert_or_update(existing_cap)
                stats["attached"] += 1

        return ToolResult(
            title="✅ 强同步萃取完成",
            output=f"强同步萃取完毕并入库: 新生能力 {stats['created']}, 修缮扩写 {stats['updated']}, 同化挂载 {stats['attached']} (沿袭知识网脉络 {stats['knowledge_inherited']} 条).\n\n详情记录:\n" + json.dumps(ops, ensure_ascii=False, indent=2)
        )

    except Exception as e:
        logger.error(f"Sync capability extraction failed: {e}")
        # NOTE(review): assumes the store exposes its raw connection as .conn
        # for rollback — confirm against PostgreSQLCapabilityStore's API.
        cap_store.conn.rollback()
        return ToolResult(title="❌ 系统异常", output=f"执行时发生错误: {str(e)}", error=str(e))
    finally:
        k_cursor.close()
        cap_store.close()
        tool_store.close()

+ 27 - 0
knowhub/server.py

@@ -980,6 +980,15 @@ class KnowledgeAskResponse(BaseModel):
     sources: list[dict] = []  # [{id, task, content}]
 
 
class KnowledgeResearchRequest(BaseModel):
    """Request body for POST /api/knowledge/research."""
    query: str
    trace_id: str  # required: caller's trace_id, used to resume an existing research run

class KnowledgeResearchResponse(BaseModel):
    """Response body: the agent's write-up; source lists default to empty."""
    response: str
    source_ids: list[str] = []
    sources: list[dict] = []
+
 class KnowledgeUploadRequest(BaseModel):
     data: dict  # {tools, resources, knowledge}
     trace_id: str  # 必填:调用方的 trace_id
@@ -1004,6 +1013,24 @@ async def ask_knowledge_api(req: KnowledgeAskRequest):
         raise HTTPException(status_code=500, detail=str(e))
 
 
@app.post("/api/knowledge/research")
async def research_knowledge_api(req: KnowledgeResearchRequest):
    """
    Deep research endpoint: runs the Research Agent to search and summarize,
    blocking until the agent finishes, then returns its findings.
    """
    try:
        # Imported lazily so the agent stack is only loaded on first use.
        from agents.research import research
        result = await research(query=req.query, caller_trace_id=req.trace_id)
        return KnowledgeResearchResponse(**result)

    except Exception as e:
        import traceback
        traceback.print_exc()
        # NOTE(review): uses print instead of the app's logger — confirm/align
        # with the module's logging convention.
        print(f"[Knowledge Research] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
+
+
 @app.post("/api/knowledge/upload", status_code=202)
 async def upload_knowledge_api(req: KnowledgeUploadRequest, background_tasks: BackgroundTasks):
     """