Procházet zdrojové kódy

Merge remote-tracking branch 'refs/remotes/origin/main'

Talegorithm před 11 hodinami
rodič
revize
fb9aed12bb

+ 211 - 4
agent/core/runner.py

@@ -67,8 +67,8 @@ class ContextUsage:
 
 @dataclass
 class SideBranchContext:
-    """侧分支上下文(压缩/反思)"""
-    type: Literal["compression", "reflection"]
+    """侧分支上下文(压缩/反思/知识评估)"""
+    type: Literal["compression", "reflection", "knowledge_eval"]
     branch_id: str
     start_head_seq: int          # 侧分支起点的 head_seq
     start_sequence: int          # 侧分支第一条消息的 sequence
@@ -783,6 +783,28 @@ class AgentRunner:
         if not needs_compression:
             return history, head_seq, sequence, False
 
+        # 检查是否有待评估知识(压缩前必须先评估)
+        if self.trace_store and not config.force_side_branch:
+            pending = await self.trace_store.get_pending_knowledge_entries(trace_id)
+            if pending:
+                # 设置侧分支队列:反思 → 知识评估 → 压缩
+                # 反思放在前面,确保反思期间完成的 goal 产生的新知识也能在压缩前被评估
+                if config.knowledge.enable_extraction:
+                    config.force_side_branch = ["reflection", "knowledge_eval", "compression"]
+                else:
+                    config.force_side_branch = ["knowledge_eval", "compression"]
+
+                # 在 trace.context 中设置触发事件
+                trace = await self.trace_store.get_trace(trace_id)
+                if trace:
+                    if not trace.context:
+                        trace.context = {}
+                    trace.context["knowledge_eval_trigger"] = "compression"
+                    await self.trace_store.update_trace(trace_id, context=trace.context)
+
+                logger.info(f"[Knowledge Eval] 压缩前触发知识评估,待评估: {len(pending)} 条")
+                return history, head_seq, sequence, True
+
         # 知识提取:在任何压缩发生前,用完整 history 做反思(进入反思侧分支)
         if config.knowledge.enable_extraction and not config.force_side_branch:
             # 设置侧分支队列:先反思,再压缩
@@ -846,6 +868,74 @@ class AgentRunner:
 
         return history, head_seq, sequence, False
 
+    async def _build_knowledge_eval_prompt(
+        self,
+        trace_id: str,
+        goal_tree: Optional[GoalTree]
+    ) -> str:
+        """构建知识评估 prompt"""
+        if not self.trace_store:
+            return ""
+
+        pending = await self.trace_store.get_pending_knowledge_entries(trace_id)
+        if not pending:
+            return ""
+
+        # 获取mission
+        trace = await self.trace_store.get_trace(trace_id)
+        mission = trace.task if trace else "未知任务"
+
+        # 获取当前Goal
+        current_goal = goal_tree.find(goal_tree.current_id) if goal_tree and goal_tree.current_id else None
+        goal_desc = current_goal.description if current_goal else "无当前目标"
+
+        # 构建知识列表
+        knowledge_list = []
+        for idx, entry in enumerate(pending, 1):
+            knowledge_list.append(
+                f"### 知识 {idx}: {entry['knowledge_id']}\n"
+                f"- task: {entry['task']}\n"
+                f"- content: {entry['content']}\n"
+                f"- 注入于: sequence {entry['injected_at_sequence']}, goal {entry['goal_id']}"
+            )
+
+        prompt = f"""你是知识评估助手。请评估以下知识在本次任务执行中的实际效果。
+
+## 当前任务(Mission)
+{mission}
+
+## 当前 Goal
+{goal_desc}
+
+## 待评估知识列表
+{chr(10).join(knowledge_list)}
+
+## 评估维度
+1. **helpfulness**: 知识内容是否对完成任务有实质帮助?
+2. **relevance**: 执行过程中是否体现了该知识的内容?
+
+## 评估分类
+- irrelevant: task与当前任务无关
+- unused: 相关但未使用
+- helpful: 有帮助
+- harmful: 有负面作用
+- neutral: 无明显作用
+
+## 输出格式
+请直接输出评估结果,使用JSON格式:
+
+{{
+  "evaluations": [
+    {{
+      "knowledge_id": "knowledge-xxx",
+      "eval_status": "helpful",
+      "reason": "1-2句评估理由"
+    }}
+  ]
+}}
+"""
+        return prompt
+
     async def _single_turn_compress(
         self,
         trace_id: str,
@@ -1052,6 +1142,17 @@ class AgentRunner:
                         yield trace_obj
                 return
 
+            # 检查Goal完成触发的知识评估
+            if not side_branch_ctx and self.trace_store:
+                trace = await self.trace_store.get_trace(trace_id)
+                if trace and trace.context and trace.context.get("pending_knowledge_eval"):
+                    # 清除标志
+                    trace.context.pop("pending_knowledge_eval", None)
+                    await self.trace_store.update_trace(trace_id, context=trace.context)
+                    # 设置侧分支队列
+                    config.force_side_branch = ["knowledge_eval"]
+                    logger.info("[Knowledge Eval] 检测到Goal完成触发,进入知识评估侧分支")
+
             # Context 管理(仅主路径)
             needs_enter_side_branch = False
             if not side_branch_ctx:
@@ -1071,9 +1172,15 @@ class AgentRunner:
 
             # 进入侧分支
             if needs_enter_side_branch and not side_branch_ctx:
+                # 刷新 trace,获取 _manage_context_usage 可能写入 DB 的 knowledge_eval_trigger
+                if self.trace_store:
+                    fresh = await self.trace_store.get_trace(trace_id)
+                    if fresh:
+                        trace = fresh
                 # 从队列中取出第一个侧分支类型
+                branch_type: Literal["compression", "reflection", "knowledge_eval"]
                 if config.force_side_branch and isinstance(config.force_side_branch, list) and len(config.force_side_branch) > 0:
-                    branch_type = config.force_side_branch.pop(0)
+                    branch_type = config.force_side_branch.pop(0)  # type: ignore
                     logger.info(f"从队列取出侧分支: {branch_type}, 剩余队列: {config.force_side_branch}")
                 elif config.knowledge.enable_extraction:
                     # 兼容旧的单值模式(如果 force_side_branch 是字符串)
@@ -1096,6 +1203,9 @@ class AgentRunner:
 
                 # 持久化侧分支状态
                 if self.trace_store:
+                    # 获取触发事件(如果是 knowledge_eval 分支)
+                    trigger_event = trace.context.get("knowledge_eval_trigger", "unknown") if branch_type == "knowledge_eval" else None
+
                     trace.context["active_side_branch"] = {
                         "type": side_branch_ctx.type,
                         "branch_id": side_branch_ctx.branch_id,
@@ -1105,6 +1215,13 @@ class AgentRunner:
                         "max_turns": side_branch_ctx.max_turns,
                         "started_at": datetime.now().isoformat(),
                     }
+
+                    # 如果是 knowledge_eval 分支,添加 trigger_event
+                    if trigger_event:
+                        trace.context["active_side_branch"]["trigger_event"] = trigger_event
+                        # 清除触发事件标记
+                        trace.context.pop("knowledge_eval_trigger", None)
+
                     await self.trace_store.update_trace(
                         trace_id,
                         context=trace.context
@@ -1113,6 +1230,8 @@ class AgentRunner:
                 # 追加侧分支 prompt
                 if branch_type == "reflection":
                     prompt = config.knowledge.get_reflect_prompt()
+                elif branch_type == "knowledge_eval":
+                    prompt = await self._build_knowledge_eval_prompt(trace_id, goal_tree)
                 else:  # compression
                     from agent.trace.compaction import build_compression_prompt
                     prompt = build_compression_prompt(goal_tree)
@@ -1249,6 +1368,48 @@ class AgentRunner:
                     cache_read_tokens=cache_read_tokens or 0,
                 )
 
+            # 知识评估侧分支:即时检测并写入评估结果
+            if side_branch_ctx and side_branch_ctx.type == "knowledge_eval":
+                text = response_content if isinstance(response_content, str) else ""
+                eval_results = None
+
+                try:
+                    eval_results = json.loads(text.strip())
+                    if "evaluations" not in eval_results:
+                        eval_results = None
+                except json.JSONDecodeError:
+                    import re
+                    json_match = re.search(r'```json\s*(\{.*?\})\s*```', text, re.DOTALL)
+                    if json_match:
+                        try:
+                            eval_results = json.loads(json_match.group(1))
+                        except json.JSONDecodeError:
+                            pass
+
+                    if not eval_results:
+                        json_match = re.search(r'\{[^{]*"evaluations"[^}]*\[[^\]]*\][^}]*\}', text, re.DOTALL)
+                        if json_match:
+                            try:
+                                eval_results = json.loads(json_match.group(0))
+                            except json.JSONDecodeError:
+                                pass
+
+                if eval_results and self.trace_store:
+                    current_trace = await self.trace_store.get_trace(trace_id)
+                    trigger_event = current_trace.context.get("active_side_branch", {}).get("trigger_event", "unknown")
+
+                    for eval_item in eval_results.get("evaluations", []):
+                        await self.trace_store.update_knowledge_evaluation(
+                            trace_id=trace_id,
+                            knowledge_id=eval_item["knowledge_id"],
+                            eval_result={
+                                "eval_status": eval_item["eval_status"],
+                                "reason": eval_item.get("reason", "")
+                            },
+                            trigger_event=trigger_event
+                        )
+                    logger.info(f"[Knowledge Eval] 已写入 {len(eval_results.get('evaluations', []))} 条评估结果")
+
             # 如果在侧分支,记录到 assistant_msg(已持久化,不需要额外维护)
 
             yield assistant_msg
@@ -1386,6 +1547,30 @@ class AgentRunner:
                     side_branch_ctx = None
                     continue
 
+                elif should_exit and side_branch_ctx.type == "knowledge_eval":
+                    # === 知识评估侧分支退出 ===
+                    logger.info("知识评估侧分支退出")
+
+                    # 恢复主路径
+                    if self.trace_store:
+                        main_path_messages = await self.trace_store.get_main_path_messages(
+                            trace_id, side_branch_ctx.start_head_seq
+                        )
+                        history = [m.to_llm_dict() for m in main_path_messages]
+                        head_seq = side_branch_ctx.start_head_seq
+
+                    # 清理
+                    trace.context.pop("active_side_branch", None)
+                    if not config.force_side_branch or len(config.force_side_branch) == 0:
+                        config.force_side_branch = None
+                        logger.info("知识评估完成,队列为空")
+                    if self.trace_store:
+                        await self.trace_store.update_trace(
+                            trace_id, context=trace.context, head_sequence=head_seq,
+                        )
+                    side_branch_ctx = None
+                    continue
+
             # 处理工具调用
             # 截断兜底:finish_reason == "length" 说明响应被 max_tokens 截断,
             # tool call 参数很可能不完整,不应执行,改为提示模型分批操作
@@ -1451,6 +1636,13 @@ class AgentRunner:
                     args_display = args_str[:100] + "..." if len(args_str) > 100 else args_str
                     logger.info(f"[Tool Call] {tool_name}({args_display})")
 
+                    # 获取trigger_event(如果在knowledge_eval侧分支中)
+                    trigger_event_for_tool = None
+                    if side_branch_ctx and side_branch_ctx.type == "knowledge_eval" and self.trace_store:
+                        current_trace = await self.trace_store.get_trace(trace_id)
+                        if current_trace:
+                            trigger_event_for_tool = current_trace.context.get("active_side_branch", {}).get("trigger_event", "unknown")
+
                     tool_result = await self.tools.execute(
                         tool_name,
                         tool_args,
@@ -1462,12 +1654,14 @@ class AgentRunner:
                             "runner": self,
                             "goal_tree": goal_tree,
                             "knowledge_config": config.knowledge,
+                            "sequence": sequence,  # 添加sequence用于知识注入记录
                             # 新增:侧分支信息
                             "side_branch": {
                                 "type": side_branch_ctx.type,
                                 "branch_id": side_branch_ctx.branch_id,
                                 "is_side_branch": True,
                                 "max_turns": side_branch_ctx.max_turns,
+                                "trigger_event": trigger_event_for_tool,
                             } if side_branch_ctx else None,
                         },
                     )
@@ -1609,7 +1803,20 @@ class AgentRunner:
 
             # 无工具调用
             # 如果在侧分支中,已经在上面处理过了(不会走到这里)
-            # 主路径无工具调用 → 任务完成,检查是否需要完成后反思
+            # 主路径无工具调用 → 任务完成,检查是否需要完成后反思或知识评估
+
+            # 检查是否有待评估的知识
+            if not side_branch_ctx and self.trace_store:
+                pending = await self.trace_store.get_pending_knowledge_entries(trace_id)
+                if pending:
+                    logger.info(f"任务即将结束,但仍有 {len(pending)} 条知识未评估,强制触发评估")
+                    config.force_side_branch = ["knowledge_eval"]
+                    trace = await self.trace_store.get_trace(trace_id)
+                    if trace:
+                        trace.context["knowledge_eval_trigger"] = "task_completion"
+                        await self.trace_store.update_trace(trace_id, context=trace.context)
+                    continue
+
             if not side_branch_ctx and config.knowledge.enable_completion_extraction and not break_after_side_branch:
                 config.force_side_branch = ["reflection"]
                 break_after_side_branch = True

+ 21 - 2
agent/trace/goal_tool.py

@@ -24,6 +24,7 @@ async def inject_knowledge_for_goal(
     store: Optional["TraceStore"] = None,
     trace_id: Optional[str] = None,
     knowledge_config: Optional[dict] = None,
+    sequence: Optional[int] = None,
 ) -> Optional[str]:
     """
     为指定 goal 注入相关知识。
@@ -34,6 +35,7 @@ async def inject_knowledge_for_goal(
         store: TraceStore(用于持久化)
         trace_id: Trace ID
         knowledge_config: 知识管理配置(KnowledgeConfig 对象)
+        sequence: 当前消息序列号(用于记录注入时机)
 
     Returns:
         注入结果描述(如 "📚 已注入 3 条相关知识"),无结果返回 None
@@ -74,6 +76,19 @@ async def inject_knowledge_for_goal(
             if store and trace_id:
                 await store.update_goal_tree(trace_id, tree)
 
+                # 写入 knowledge_log
+                if sequence is not None:
+                    for item in goal.knowledge:
+                        await store.append_knowledge_entry(
+                            trace_id=trace_id,
+                            knowledge_id=item.get("id", ""),
+                            goal_id=goal.id,
+                            injected_at_sequence=sequence,
+                            task=item.get("task", ""),
+                            content=item.get("content", "")
+                        )
+                    logger.info(f"[Knowledge Inject] 已记录 {knowledge_count} 条知识到 knowledge_log")
+
             return f"📚 已注入 {knowledge_count} 条相关知识"
         else:
             goal.knowledge = []
@@ -136,7 +151,8 @@ async def goal(
         done=done,
         abandon=abandon,
         focus=focus,
-        knowledge_config=knowledge_config
+        knowledge_config=knowledge_config,
+        context=context
     )
 
 
@@ -155,6 +171,7 @@ async def goal_tool(
     abandon: Optional[str] = None,
     focus: Optional[str] = None,
     knowledge_config: Optional[object] = None,
+    context: Optional[dict] = None,
 ) -> str:
     """
     管理执行计划。
@@ -213,7 +230,9 @@ async def goal_tool(
         changes.append(f"切换焦点: {display_id}. {goal.description}")
 
         # 自动注入知识
-        inject_msg = await inject_knowledge_for_goal(goal, tree, store, trace_id, knowledge_config)
+        inject_msg = await inject_knowledge_for_goal(
+            goal, tree, store, trace_id, knowledge_config, sequence=context.get("sequence")
+        )
         if inject_msg:
             changes.append(inject_msg)
 

+ 2 - 2
agent/trace/models.py

@@ -178,7 +178,7 @@ class Message:
     content: Any = None                  # 消息内容(和 LLM API 格式一致)
 
     # 侧分支标记
-    branch_type: Optional[Literal["compression", "reflection"]] = None  # 侧分支类型(None = 主路径)
+    branch_type: Optional[Literal["compression", "reflection", "knowledge_eval"]] = None  # 侧分支类型(None = 主路径)
     branch_id: Optional[str] = None      # 侧分支 ID(同一侧分支的消息共享)
 
     # 元数据
@@ -316,7 +316,7 @@ class Message:
         content: Any = None,
         tool_call_id: Optional[str] = None,
         parent_sequence: Optional[int] = None,
-        branch_type: Optional[Literal["compression", "reflection"]] = None,
+        branch_type: Optional[Literal["compression", "reflection", "knowledge_eval"]] = None,
         branch_id: Optional[str] = None,
         prompt_tokens: Optional[int] = None,
         completion_tokens: Optional[int] = None,

+ 89 - 0
agent/trace/store.py

@@ -249,6 +249,20 @@ class FileSystemTraceStore:
         })
         print(f"[DEBUG] Pushed goal_updated event: goal_id={goal_id}, updates={updates}, affected={len(affected_goals)}")
 
+        # Goal 完成时触发知识评估
+        if updates.get("status") in ["completed", "abandoned"]:
+            pending = await self.get_pending_knowledge_entries(trace_id)
+            if pending:
+                # 在trace.context中设置标志,由runner主循环检查
+                trace = await self.get_trace(trace_id)
+                if trace:
+                    if not trace.context:
+                        trace.context = {}
+                    trace.context["pending_knowledge_eval"] = True
+                    trace.context["knowledge_eval_trigger"] = "goal_completion"
+                    await self.update_trace(trace_id, context=trace.context)
+                    logger.info(f"[Knowledge Eval] Goal {goal_id} 完成,设置评估标志,待评估知识: {len(pending)} 条")
+
     async def _check_cascade_completion(
         self,
         trace_id: str,
@@ -750,3 +764,78 @@ class FileSystemTraceStore:
             f.write(json.dumps(event, ensure_ascii=False) + '\n')
 
         return event_id
+
+    # ===== Knowledge Log 管理 =====
+
+    def _get_knowledge_log_file(self, trace_id: str) -> Path:
+        """获取 knowledge_log.json 文件路径"""
+        return self._get_trace_dir(trace_id) / "knowledge_log.json"
+
+    async def get_knowledge_log(self, trace_id: str) -> Dict[str, Any]:
+        """读取知识日志"""
+        log_file = self._get_knowledge_log_file(trace_id)
+        if not log_file.exists():
+            return {"trace_id": trace_id, "entries": []}
+        return json.loads(log_file.read_text(encoding="utf-8"))
+
+    async def append_knowledge_entry(
+        self,
+        trace_id: str,
+        knowledge_id: str,
+        goal_id: str,
+        injected_at_sequence: int,
+        task: str,
+        content: str
+    ) -> None:
+        """追加知识注入记录"""
+        log = await self.get_knowledge_log(trace_id)
+        log["entries"].append({
+            "knowledge_id": knowledge_id,
+            "goal_id": goal_id,
+            "injected_at_sequence": injected_at_sequence,
+            "injected_at": datetime.now().isoformat(),
+            "task": task,
+            "content": content[:500],  # 限制长度
+            "eval_result": None,
+            "evaluated_at": None,
+            "evaluated_at_trigger": None
+        })
+        log_file = self._get_knowledge_log_file(trace_id)
+        log_file.write_text(json.dumps(log, indent=2, ensure_ascii=False), encoding="utf-8")
+
+    async def update_knowledge_evaluation(
+        self,
+        trace_id: str,
+        knowledge_id: str,
+        eval_result: Dict[str, Any],
+        trigger_event: str
+    ) -> None:
+        """更新知识评估结果
+
+        当同一个knowledge_id在不同goal中被多次注入时,
+        优先更新最近一个未评估的条目(按injected_at_sequence倒序)
+        """
+        log = await self.get_knowledge_log(trace_id)
+
+        # 找到所有匹配且未评估的条目
+        matching_entries = [
+            (i, entry) for i, entry in enumerate(log["entries"])
+            if entry["knowledge_id"] == knowledge_id and entry["eval_result"] is None
+        ]
+
+        if matching_entries:
+            # 按injected_at_sequence倒序排序,取最近的一个
+            matching_entries.sort(key=lambda x: x[1]["injected_at_sequence"], reverse=True)
+            idx, entry = matching_entries[0]
+
+            entry["eval_result"] = eval_result
+            entry["evaluated_at"] = datetime.now().isoformat()
+            entry["evaluated_at_trigger"] = trigger_event
+
+        log_file = self._get_knowledge_log_file(trace_id)
+        log_file.write_text(json.dumps(log, indent=2, ensure_ascii=False), encoding="utf-8")
+
+    async def get_pending_knowledge_entries(self, trace_id: str) -> List[Dict[str, Any]]:
+        """获取所有待评估的知识条目"""
+        log = await self.get_knowledge_log(trace_id)
+        return [e for e in log["entries"] if e["eval_result"] is None]

+ 392 - 0
knowhub/docs/dedup-design.md

@@ -0,0 +1,392 @@
+# 知识入库前智能去重与关系判断系统 — 设计文档
+
+## 文档维护规范
+
+0. **先改文档,再动代码** - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现;除非改动较小、不被文档涵盖
+1. **文档分层,链接代码** - 重要或复杂设计可以另有详细文档;关键实现需标注代码文件路径;格式:`module/file.py:function_name`
+2. **简洁快照,日志分离** - 只记录最重要的、与代码准确对应的或者明确的已完成的设计的信息,避免推测、建议、决策历史、修改日志、大量代码;决策依据或修改日志若有必要,可在 `knowhub/docs/decisions.md` 另行记录
+
+---
+
+## 可行性结论
+
+**整体可行,无阻塞性问题。**
+
+---
+
+## 一、去重流程
+
+```
+新知识进入 (status=pending)
+         │
+         ▼
+[Step 1] 复用已存储的 embedding(入队时已生成,不重复调用)
+         │
+         ▼
+[Step 2] 向量召回 top-10 相似知识
+         filter: status == "approved" or status == "checked"
+         │
+         ▼
+[Step 2.5] 相似度预过滤(阈值 0.75)
+         过滤掉 COSINE score < 0.75 的候选
+         无候选 → 直接 approved
+         │
+         ▼
+[Step 3] LLM 关系判断(见第五节 Prompt)
+         LLM 自主判断关系类型和 final_decision
+         │
+    ┌────┴──────────────────────────────────┐
+    ▼                                       ▼
+final_decision=rejected              final_decision=approved
+旧知识 helpful+1(记录到 history)    双向写入 relationships
+                                     更新关系缓存表
+```
+
+---
+
+## 二、同 task 下多条知识的处理策略
+
+只拒绝 `duplicate` 和 `subset`,其他关系两条都保留,并**双向写入**关系标注。引入**关系缓存表**管理关系复杂度。
+
+---
+
+## 三、关系类型定义与保存方式
+
+### 关系类型
+
+关系类型是**开放的**,LLM 可以根据实际情况提出新的关系类型,并自行判断对应的处理动作(approved/rejected)。
+
+| type | 含义                                       | 处理动作 |
+|---|------------------------------------------|---|
+| `duplicate` | task 和 content 语义完全相同                    | 新知识 **rejected**,旧知识 helpful+1 |
+| `subset` | task语义一致,新知识信息被旧知识完全覆盖                   | 新知识 **rejected** |
+| `superset` | task语义一致,新知识比旧知识更全面                      | 两条都 **approved** |
+| `conflict` | 同一 task 下结论矛盾                            | 两条都 **approved** |
+| `complement` | 同一 task 的不同角度,互补                         | 两条都 **approved** |
+| `none` | task 语义不同,或无实质关系(**task 不同时必须判定为 none**) | 新知识直接 **approved**,不写入关系 |
+| *(LLM 自定义)* | LLM 发现的其他关系类型                            | 由 LLM 自行判断 |
+
+### 关系的方向性与双向标注
+
+所有关系都是**有向的**,且**双向写入**:两条知识的 `relationships` 字段都会记录对方,但各自记录的是**从自己出发的出边**。
+
+以 A superset B 为例:
+- A 的 relationships 追加:`{type: "superset", target: "B"}` (A 包含 B)
+- B 的 relationships 追加:`{type: "subset", target: "A"}` (B 被 A 包含)
+
+以 A conflict B 为例:
+- A 的 relationships 追加:`{type: "conflict", target: "B"}`
+- B 的 relationships 追加:`{type: "conflict", target: "A"}`
+
+### 写入规则
+
+- `final_decision = "rejected"`:新知识 status=rejected,**不写入任何 relationships**;遍历 relations,对所有 type 为 `duplicate` 或 `subset` 的旧知识 helpful+1,记录到 helpful_history
+- `final_decision = "approved"`:新知识 status=approved;遍历 relations,对所有 type 不是 `none` 的关系双向写入 relationships,同时更新关系缓存表
+- `none`:不写入 relationships,不更新缓存表
+
+### 关系缓存表
+
+实现位置:`knowhub/server.py:RelationCache`
+
+独立于知识条目存储,结构如下:
+
+```json
+{
+  "conflict":    ["knowledge-A", "knowledge-B", "knowledge-C"],
+  "superset":    ["knowledge-D", "knowledge-E"],
+  "complement":  ["knowledge-F", "knowledge-G"],
+  "custom_type": ["knowledge-I"]
+}
+```
+
+每个关系类型对应一个列表,记录**所有参与该关系的知识 ID**(不区分方向)。LLM 提出新关系类型时,自动在缓存表中新增对应字段。
+
+---
+
+## 四、更新后的知识条目数据结构
+
+### 新增 2 个字段
+
+| 字段 | 类型 | 默认值 | 说明 |
+|---|---|---|---|
+| `status` | VARCHAR(20) | `"pending"` | 入库状态:pending / processing / approved / checked / rejected |
+| `relationships` | JSON | `[]` | 与其他知识的关系列表 |
+
+### status 字段语义
+
+| 值 | 含义 | 可被检索 |
+|---|---|---|
+| `pending` | 刚入队,等待处理 | 否 |
+| `processing` | 正在处理(防并发乐观锁) | 否 |
+| `approved` | 已通过去重,正式入库 | 是 |
+| `checked` | 经人类审核确认 | 是 |
+| `rejected` | 被判定为重复,已丢弃 | 否 |
+
+### relationships 字段结构
+
+每条记录代表一条**出边**(从当前知识出发的关系):
+
+```json
+[
+  {
+    "type": "superset",
+    "target": "knowledge-20260305-a1b2"
+  }
+]
+```
+
+### helpful_history / harmful_history 格式
+
+实现位置:`knowhub/server.py:KnowledgeProcessor._apply_decision`
+
+```json
+{
+  "helpful_history": [
+    {
+      "source": "dedup",
+      "related_id": "knowledge-20260317-new-xxxx",
+      "relation_type": "duplicate",
+      "timestamp": 1710000000
+    }
+  ]
+}
+```
+
+- `source: "dedup"`:标识这条反馈来自去重流程
+- `related_id`:触发这次反馈的新知识 ID(被 rejected 的那条)
+- `relation_type`:触发反馈的关系类型
+
+### 完整知识条目结构
+
+```json
+{
+  "id": "knowledge-20260317-143022-a1b2",
+  "embedding": [...],
+  "message_id": "",
+  "task": "...",
+  "content": "...",
+  "types": ["strategy"],
+  "tags": {},
+  "tag_keys": [],
+  "scopes": ["org:cybertogether"],
+  "owner": "agent:runner",
+  "resource_ids": [],
+  "source": {},
+  "eval": {
+    "score": 3,
+    "helpful": 1,
+    "harmful": 0,
+    "confidence": 0.7,
+    "helpful_history": [],
+    "harmful_history": []
+  },
+  "created_at": 1710000000,
+  "updated_at": 1710000000,
+  "status": "pending",
+  "relationships": []
+}
+```
+
+---
+
+## 五、LLM 关系判断 Prompt
+
+实现位置:`knowhub/server.py:KnowledgeProcessor._llm_judge_relations`
+
+```python
+DEDUP_RELATION_PROMPT = """你是知识库管理专家。请判断【新知识】与【相似知识列表】中每条知识的关系。
+
+【新知识】
+Task: {new_task}
+Content: {new_content}
+
+【相似知识列表】(向量召回 top-10,按相似度排序)
+{existing_list}
+格式: [序号] ID: xxx | Task: xxx | Content: xxx
+
+【已知关系类型参考】
+- duplicate: task 和 content 语义完全相同,无新增信息
+- subset: task语义一致,新知识的content信息完全被某条已有知识覆盖
+- superset: task语义一致,新知识包含某条已有知识的全部信息,且有额外内容
+- conflict: 同一 task 下给出相互矛盾的结论
+- complement: 描述同一 task 的不同方面,互补
+- none: task 语义不同,或无实质关系(task 不同时必须判定为 none,只有 task 语义一致才可能存在其他关系)
+
+**重要**:如果以上类型无法准确描述关系,你可以自定义新的关系类型(英文小写下划线命名),并自行判断新知识应该 approved 还是 rejected。
+
+【输出格式】(严格 JSON,不要其他内容)
+{{
+  "final_decision": "approved",
+  "relations": [
+    {{
+      "old_id": "knowledge-xxx",
+      "type": "superset",
+      "reverse_type": "subset"
+    }}
+  ]
+}}
+
+"""
+```
+
+### LLM 输出字段的处理逻辑
+
+实现位置:`knowhub/server.py:KnowledgeProcessor._apply_decision`
+
+**final_decision**: "approved" 或 "rejected"
+- 用途:设置新知识的 status 字段
+- 只要 relations 中有任意一条 type 为 duplicate 或 subset,LLM 应输出 rejected
+
+**relations**: 关系列表
+- **old_id**: 旧知识 ID
+  - 用途:定位需要更新的旧知识记录
+- **type**: 从新知识指向旧知识的关系类型
+  - 如果 final_decision="rejected":仅对 type="duplicate" 或 type="subset" 的旧知识 eval.helpful +1,写入 helpful_history;其余关系忽略
+  - 如果 final_decision="approved" 且 type 不是 "none":新知识的 relationships 追加 `{"type": type, "target": old_id}`,同时更新关系缓存表
+- **reverse_type**: 从旧知识指向新知识的反向关系类型
+  - 仅在 final_decision="approved" 且 reverse_type 不是 "none" 时:旧知识的 relationships 追加 `{"type": reverse_type, "target": new_id}`
+
+---
+
+## 六、异步处理架构
+
+### 整体架构
+
+```
+POST /api/knowledge
+  → 生成 embedding
+  → 插入 Milvus (status=pending)
+  → 立即返回 {"status": "pending", "knowledge_id": "..."}
+  → background_tasks.add_task(processor.process_pending)  ← 非阻塞触发
+
+KnowledgeProcessor(后台处理器)
+  → 查询所有 status=pending 的知识(每批50条)
+  → 逐条处理:pending → processing → approved/rejected
+  → asyncio.Lock 防止并发
+
+定时兜底(每60秒)
+  → asyncio.create_task(_periodic_processor())
+  → 检测超时的 processing 条目(>5分钟)并回滚到 pending
+```
+
+### 错误处理策略
+
+| 场景 | 处理方式 |
+|---|---|
+| LLM 调用失败 | 重试 2 次,仍失败则 status=approved(宁可放行,不丢数据) |
+| LLM 输出无法解析 | 同上,fallback 到 approved |
+| 处理超时(>5分钟) | 定时任务检测 processing 状态并回滚到 pending |
+| 并发写入相同知识 | processing 状态作为乐观锁,第二个处理器跳过 |
+| task 语义不相关(score < 0.75) | 预过滤直接排除,不进入 LLM 判断,视为 none |
+
+---
+
+## 七、API 接口设计
+
+### 新增/改造接口
+
+| 接口 | 变化 |
+|---|---|
+| `POST /api/knowledge` | 插入 status=pending,触发后台任务,立即返回 pending 状态 |
+| `POST /api/extract` | 批量插入时每条 status=pending,插入后触发后台任务 |
+| `POST /api/knowledge/slim` | 重建知识时显式传入 status=approved,跳过去重(已精炼知识) |
+| `GET /api/knowledge` | 追加 `status in ["approved", "checked"]` 过滤 |
+| `GET /api/knowledge/search` | 追加 `status in ["approved", "checked"]` 过滤 |
+| `POST /api/knowledge/migrate` | **新增**:手动触发 schema 迁移(中转 collection 模式),返回迁移条数 |
+| `GET /api/knowledge/pending` | **新增**:查询待处理队列 |
+| `POST /api/knowledge/process` | **新增**:手动触发处理,`force=true` 可回滚卡死的 processing 条目 |
+| `GET /api/knowledge/status/{id}` | **新增**:查询单条知识的处理状态和关系 |
+
+### POST /api/knowledge 响应变化
+
+```json
+{
+  "status": "pending",
+  "knowledge_id": "knowledge-20260317-143022-a1b2",
+  "message": "知识已入队,正在处理去重..."
+}
+```
+
+### 迁移脚本处理
+
+`migrate_knowledge.py`:历史数据迁移,迁移的是已存在的知识,插入时显式传入 `status="approved"`,`relationships=[]`,跳过去重流程。
+
+---
+
+## 八、Milvus 关系筛选可行性
+
+### 可行的查询
+
+```python
+# status 过滤(高效,建议加 Trie 索引)
+'status == "approved"'
+'status == "pending" or status == "processing"'
+
+# relationships 非空(Milvus 2.3+ JSON 查询)
+'json_length(relationships) > 0'
+```
+
+### 关系查询方案
+
+**正向查询**(从知识 A 查询它的所有关系):直接读取 A 的 `relationships` 字段,O(1)。
+
+**反向查询**(查询"哪些知识与 A 有 conflict 关系"):通过**关系缓存表**实现,无需全表扫描。
+
+**复杂查询**(查询"所有存在 conflict 关系的知识对"):直接读取关系缓存表的 `conflict` 字段。
+
+### 性能评估
+
+| 查询类型 | 方案 | 性能 |
+|---|---|---|
+| status 过滤 | Milvus Trie 索引 | 极快 |
+| 向量召回 + status 过滤 | HNSW + 标量过滤 | 快(现有机制) |
+| relationships 正向读取 | 直接读 JSON 字段 | O(1) |
+| relationships 反向/复杂查询 | 关系缓存表 | O(1) |
+
+---
+
+## 九、实现步骤与文件清单
+
+### 关键文件修改清单
+
+| 文件 | 修改内容 |
+| --- | --- |
+| `knowhub/vector_store.py` | 新增 status/relationships 字段;更新所有 output_fields;为 status 添加 Trie 索引 |
+| `knowhub/server.py` | 新增 `KnowledgeProcessor` 类(~200行);改造 `save_knowledge` / `extract_knowledge_from_messages`;改造 list/search 追加 status 过滤;新增 3 个接口;更新 `KnowledgeIn` 模型;实现关系缓存表管理 |
+| `migrate_knowledge.py` | 插入时显式传入 `status="approved"`,`relationships=[]` |
+
+### 实现阶段
+
+**Phase 1 — Schema 扩展**(`knowhub/vector_store.py`)
+1. 新增 2 个字段:status、relationships
+2. 更新 search/query/get_by_id 的 output_fields
+3. 为 status 添加 Trie 标量索引
+4. 初始化关系缓存表存储
+
+**Phase 2 — 处理器核心逻辑**(`knowhub/server.py`)
+1. 实现 `KnowledgeProcessor` 类
+2. 实现 `_llm_judge_relations` 方法(使用上面的 Prompt)
+3. 实现 `_apply_decision` 方法(写入 status 和 relationships,同步更新关系缓存表)
+4. 在 `lifespan` 中初始化处理器实例 + 启动定时任务
+5. 实现关系缓存表的读写接口
+
+**Phase 3 — API 改造**(`knowhub/server.py`)
+1. 改造 `POST /api/knowledge`:status=pending,触发后台任务
+2. 改造 `GET /api/knowledge` 和 `GET /api/knowledge/search`:追加 status 过滤
+3. 新增 3 个接口:pending / process / status/{id}
+
+### 数据迁移方案
+
+Milvus Lite 不支持 ALTER COLLECTION 和 rename_collection,采用**软兼容 + 手动触发迁移接口**策略:
+
+- **平时(软兼容)**:读取时用 `.get("status", "approved")` / `.get("relationships", []) or []` 兼容旧数据,旧数据被视为 approved,不影响检索和去重逻辑
+- **迁移(手动触发 `POST /api/knowledge/migrate`)**:采用"中转 collection"模式(Milvus Lite 不支持 rename):
+  1. 创建 `knowledge_migration`(新 schema)
+  2. 从 `knowledge` 逐条读取,补 `status="approved"`, `relationships=[]`,插入 `knowledge_migration`
+  3. drop `knowledge`
+  4. 创建 `knowledge`(新 schema,空)
+  5. 从 `knowledge_migration` 逐条读取,插入 `knowledge`
+  6. drop `knowledge_migration`
+  7. 更新 `self.collection` 引用
+
+  实现位置:`knowhub/vector_store.py:MilvusStore.migrate_schema`

+ 269 - 0
knowhub/docs/feedback-timing-design.md

@@ -0,0 +1,269 @@
+# 知识反馈时机设计文档
+
+## 文档维护规范
+
+0. **先改文档,再动代码** - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现;除非改动较小、不被文档涵盖
+1. **文档分层,链接代码** - 重要或复杂设计可以另有详细文档;关键实现需标注代码文件路径;格式:`module/file.py:function_name`
+2. **简洁快照,日志分离** - 只记录最重要的、与代码准确对应的或者明确的已完成的设计的信息,避免推测、建议、决策历史、修改日志、大量代码;决策依据或修改日志若有必要,可在 `knowhub/docs/decisions.md` 另行记录
+
+---
+
+## 背景
+
+### 现有反馈机制的缺陷
+
+当前的知识反馈存在以下问题(来自 `feedback-optimization-proposal.md`):
+
+- **反馈时机不明确**:没有明确定义何时、由谁来评估知识的有效性
+- **缺少使用状态追踪**:知识被注入后,无法知道它是否真的被用到了
+- **评估粒度粗糙**:只有 helpful/harmful 计数,缺少"为什么有用/无用"的上下文
+
+### 设计目标
+
+1. 记录每条知识的完整生命周期(注入 → 使用 → 评估)
+2. 在自然的执行节点(Goal 完成、压缩、任务结束)触发评估,不打断主流程
+3. 为后续上报 KnowHub 提供结构化的评估数据
+
+---
+
+## 核心概念
+
+### Knowledge Log(知识注入日志)
+
+每个 trace 维护一个 `knowledge_log.json`,记录该 trace 中所有被注入的知识及其评估状态。
+
+**位置**:`.trace/{trace_id}/knowledge_log.json`
+
+**数据结构**:
+
+```json
+{
+  "trace_id": "trace-xxx",
+  "entries": [
+    {
+      "knowledge_id": "knowledge-20260305-a1b2",
+      "goal_id": "1",
+      "injected_at_sequence": 42,
+      "injected_at": "2026-03-20T10:00:00.000000",
+      "task": "知识的原始task描述",
+      "content": "知识内容摘要(截断至500字符)",
+      "eval_result": {
+        "eval_status": "helpful",
+        "reason": "评估理由"
+      },
+      "evaluated_at": "2026-03-20T10:05:00.000000",
+      "evaluated_at_trigger": "goal_completion"
+    }
+  ]
+}
+```
+
+**字段说明**:
+
+| 字段 | 类型 | 说明 |
+|---|---|---|
+| `knowledge_id` | string | KnowHub 中的知识 ID |
+| `goal_id` | string | 注入时的 Goal ID(如 `"1"`, `"2.1"`) |
+| `injected_at_sequence` | int | 注入时的消息序列号 |
+| `injected_at` | datetime | 注入时间(ISO 格式,含毫秒) |
+| `task` | string | 知识的原始 task 描述 |
+| `content` | string | 知识内容(写入时截断至 500 字符) |
+| `eval_result` | object/null | 评估结果对象;未评估时为 `null` |
+| `evaluated_at` | datetime/null | 评估时间;未评估时为 `null` |
+| `evaluated_at_trigger` | string/null | 触发评估的事件(见下表);未评估时为 `null` |
+
+**`evaluated_at_trigger` 可能的值**:
+
+| 值 | 含义 |
+|---|---|
+| `"goal_completion"` | 由 Goal 完成(`completed` 或 `abandoned`)触发 |
+| `"compression"` | 由上下文压缩触发(压缩前必须先评估) |
+| `"task_completion"` | 由任务自然结束触发(主路径无工具调用退出时兜底) |
+
+> 注意:同一个 `knowledge_id` 可能在不同 Goal 中被多次注入,每次产生独立 entry。评估时优先更新最近注入(`injected_at_sequence` 最大)的未评估条目。
+
+---
+
+## 评估触发机制
+
+### 触发点 1:Goal 完成
+
+**时机**:Goal status 变为 `completed` 或 `abandoned`
+
+**触发逻辑**(`agent/trace/store.py:update_goal`):
+
+```
+Goal 完成
+  ↓
+查询 knowledge_log 中 eval_result == null 的条目
+  ↓
+如果有待评估条目
+  → 在 trace.context 中设置标志:
+      pending_knowledge_eval = true
+      knowledge_eval_trigger = "goal_completion"
+  ↓
+Runner 主循环下一次迭代开头检测到标志(agent/core/runner.py:_agent_loop)
+  → 清除标志
+  → 将 "knowledge_eval" 加入 force_side_branch 队列
+```
+
+### 触发点 2:压缩(Compression)
+
+**时机**:上下文 token 数超过阈值,即将执行压缩
+
+**触发逻辑**(`agent/core/runner.py:_manage_context_usage`):
+
+```
+压缩条件触发
+  ↓
+查询 knowledge_log 中 eval_result == null 的条目
+  ↓
+如果有待评估条目
+  → 在 trace.context 中设置:
+      knowledge_eval_trigger = "compression"
+  → 将侧分支队列设为:
+      ["reflection", "knowledge_eval", "compression"](启用知识提取时)
+      ["knowledge_eval", "compression"](未启用知识提取时)
+  → 返回"需要进入侧分支"信号,暂缓压缩
+  ↓
+依次执行侧分支队列后再压缩
+```
+
+**原因**:压缩会删除消息历史,必须在压缩前完成评估,否则执行上下文永久丢失。
+
+### 触发点 3:任务结束(兜底)
+
+**时机**:主路径出现无工具调用的回复,Agent 即将结束任务
+
+**触发逻辑**(`agent/core/runner.py:_agent_loop`,无工具调用分支):
+
+```
+主路径无工具调用(任务即将结束)
+  ↓
+查询 knowledge_log 中 eval_result == null 的条目
+  ↓
+如果有待评估条目
+  → 在 trace.context 中设置:
+      knowledge_eval_trigger = "task_completion"
+  → 将 ["knowledge_eval"] 加入 force_side_branch 队列
+  → continue(不 break,下一轮执行评估侧分支)
+  ↓
+评估完成后再退出
+```
+
+---
+
+## 评估分类(eval_status)
+
+| 状态 | 含义 |
+|---|---|
+| `irrelevant` | 知识的 task 与当前任务无关 |
+| `unused` | 知识与任务相关,但执行过程中没有被使用 |
+| `helpful` | 知识对当前任务有实质帮助 |
+| `harmful` | 知识对当前任务产生了负面作用 |
+| `neutral` | 知识与任务相关但无明显影响 |
+
+---
+
+## 侧分支评估流程
+
+### 侧分支类型
+
+复用现有 `SideBranchContext` 机制,新增 `"knowledge_eval"` 类型(`agent/trace/models.py:Message.branch_type`):
+
+```python
+SideBranchContext(
+    type="knowledge_eval",
+    branch_id=f"knowledge_eval_{uuid.uuid4().hex[:8]}",  # 如 "knowledge_eval_1c5fffaf"
+    max_turns=config.side_branch_max_turns               # 默认 5
+)
+```
+
+`trigger_event` 记录在 `trace.context["active_side_branch"]["trigger_event"]` 中,侧分支退出后写入 `evaluated_at_trigger`。
+
+### 评估 Prompt 结构
+
+完整实现见 `agent/core/runner.py:_build_knowledge_eval_prompt`,结构如下:
+
+```
+你是知识评估助手。请评估以下知识在本次任务执行中的实际效果。
+
+## 当前任务(Mission)       ← trace.task
+## 当前 Goal                ← goal_tree.current 的 description
+## 待评估知识列表            ← 所有 eval_result == null 的条目
+  - knowledge_id / task / content / injected_at_sequence / goal_id
+## 评估维度                  ← helpfulness + relevance
+## 评估分类                  ← 5 个 eval_status 选项
+## 输出格式                  ← JSON
+```
+
+> Prompt 中**不包含消息历史**。LLM 依据对话上下文中已有的执行过程作出判断。
+
+### 评估输出格式
+
+LLM 直接输出 JSON,**无需调用工具**:
+
+```json
+{
+  "evaluations": [
+    {
+      "knowledge_id": "knowledge-20260305-a1b2",
+      "eval_status": "helpful",
+      "reason": "1-2句评估理由"
+    }
+  ]
+}
+```
+
+### 即时写入机制(`agent/core/runner.py:_agent_loop`)
+
+每次 LLM 回复后立即尝试解析,三种策略依次降级:整体解析 → ` ```json ` 代码块 → 正则裸对象。
+
+```
+LLM 输出评估 JSON
+  ↓
+解析成功 → 立即调用 store.update_knowledge_evaluation() 写入每条评估结果
+  ↓
+侧分支达到退出条件(无工具调用 或 超过 max_turns)→ 恢复主路径
+```
+
+解析失败时记录日志,不中断主流程。
+
+---
+
+## 数据流
+
+```
+知识注入(agent/trace/goal_tool.py:inject_knowledge_for_goal)
+  ↓
+写入 knowledge_log.json(eval_result=null)
+  ↓
+  ┌─────────────────────────────────────────────┐
+  │  触发点 A:Goal 完成(goal_completion)       │
+  │  触发点 B:压缩执行前(compression)          │
+  │  触发点 C:任务自然结束(task_completion)    │
+  └─────────────────────────────────────────────┘
+  ↓
+Runner 进入 knowledge_eval 侧分支
+  ↓
+LLM 直接输出 JSON 评估结果(无工具调用)
+  ↓
+Runner 每轮即时解析并写入 knowledge_log.json
+  ↓
+侧分支退出 → 恢复主路径
+```
+
+---
+
+## 与现有系统的集成点
+
+| 集成位置 | 文件 | 说明 |
+|---|---|---|
+| 知识注入时写 log | `agent/trace/goal_tool.py:inject_knowledge_for_goal` | `goal(focus=...)` 触发知识搜索后写入 `knowledge_log.json` |
+| Goal 完成时设置标志 | `agent/trace/store.py:update_goal` | 设置 `trace.context["pending_knowledge_eval"]` 标志 |
+| 主循环检测 Goal 完成标志 | `agent/core/runner.py:_agent_loop` | 每轮迭代开头检测标志,触发 `["knowledge_eval"]` 侧分支 |
+| 压缩前触发评估 | `agent/core/runner.py:_manage_context_usage` | 压缩前检查 pending,先评估再压缩 |
+| 任务结束兜底 | `agent/core/runner.py:_agent_loop` | 任务退出前检查 pending,强制触发评估 |
+| 侧分支类型扩展 | `agent/trace/models.py:Message.branch_type` | Literal 中包含 `"knowledge_eval"` |
+| 即时写入评估结果 | `agent/core/runner.py:_agent_loop` | 存储 assistant 消息后即时解析 JSON 并写入 |
+| Log 文件管理 | `agent/trace/store.py` | `append_knowledge_entry` / `update_knowledge_evaluation` / `get_pending_knowledge_entries` |