فهرست منبع

refactor: unified msg list build after compression & multi-owner filter in knowledge inject

Talegorithm 2 ساعت پیش
والد
کامیت
a2c784f011

+ 1 - 0
agent/core/prompts/__init__.py

@@ -31,6 +31,7 @@ from agent.core.prompts.compression import (
     COMPRESSION_EVAL_PROMPT_TEMPLATE,
     SUMMARY_HEADER_TEMPLATE,
     build_compression_eval_prompt,
+    build_single_turn_prompt,
     build_summary_header,
 )
 

+ 21 - 0
agent/core/prompts/compression.py

@@ -28,6 +28,23 @@ COMPRESSION_PROMPT_TEMPLATE = """请对以上对话历史进行压缩总结。
 # 保留旧名以兼容 compaction.py 的调用
 COMPRESSION_EVAL_PROMPT_TEMPLATE = COMPRESSION_PROMPT_TEMPLATE
 
+SINGLE_TURN_PROMPT = """请对以上对话历史进行压缩总结。
+
+### 摘要要求
+1. 保留关键决策、结论和产出(如创建的文件、修改的代码、得出的分析结论)
+2. 保留重要的上下文(如用户的要求、约束条件、之前的讨论结果)
+3. 省略中间探索过程、重复的工具调用细节
+4. 使用结构化格式(标题 + 要点 + 相关资源引用,若有)
+5. 控制在 2000 字以内
+
+当前 GoalTree 状态:
+{goal_tree_prompt}
+
+格式要求:
+[[SUMMARY]]
+(此处填写结构化的摘要内容)
+"""
+
 SUMMARY_HEADER_TEMPLATE = """## 对话历史摘要(自动压缩)
 
 {summary_text}
@@ -48,5 +65,9 @@ def build_compression_eval_prompt(
     )
 
 
+def build_single_turn_prompt(goal_tree_prompt: str) -> str:
+    return SINGLE_TURN_PROMPT.format(goal_tree_prompt=goal_tree_prompt)
+
+
 def build_summary_header(summary_text: str) -> str:
     return SUMMARY_HEADER_TEMPLATE.format(summary_text=summary_text)

+ 90 - 149
agent/core/runner.py

@@ -788,6 +788,7 @@ class AgentRunner:
             config.force_side_branch = ["reflection", "compression"]
             return history, head_seq, sequence, True
 
+        # 以下为未启用反思、需要压缩的情况,直接进行level 1压缩,并检查是否需要进行level 2压缩(进入侧分支)
         # Level 1 压缩:Goal 完成压缩
         if config.goal_compression != "none" and self.trace_store and goal_tree:
             if head_seq > 0:
@@ -854,15 +855,15 @@ class AgentRunner:
         history: List[Dict],
         goal_tree: Optional[GoalTree],
         config: RunConfig,
-        sequence: int,
-        start_head_seq: int,
-    ) -> Tuple[List[Dict], int, int]:
-        """单次 LLM 调用压缩(fallback 方案)"""
+    ) -> str:
+        """单次 LLM 调用生成压缩摘要,返回 summary 文本"""
 
-        logger.info("执行单次 LLM 压缩(fallback)")
+        logger.info("执行单次 LLM 压缩")
 
-        # 构建压缩 prompt
-        compress_prompt = build_compression_prompt(goal_tree)
+        # 构建压缩 prompt(使用 SINGLE_TURN_PROMPT)
+        from agent.core.prompts import build_single_turn_prompt
+        goal_prompt = goal_tree.to_prompt(include_summary=True) if goal_tree else ""
+        compress_prompt = build_single_turn_prompt(goal_prompt)
         compress_messages = list(history) + [
             {"role": "user", "content": compress_prompt}
         ]
@@ -889,32 +890,7 @@ class AgentRunner:
                 summary_text.index("[[SUMMARY]]") + len("[[SUMMARY]]"):
             ].strip()
 
-        if not summary_text:
-            logger.warning("单次压缩未返回有效内容,跳过压缩")
-            return history, start_head_seq, sequence
-
-        # 创建 summary 消息
-        from agent.core.prompts import build_summary_header
-        summary_msg = Message.create(
-            trace_id=trace_id,
-            role="user",
-            sequence=sequence,
-            parent_sequence=start_head_seq,
-            branch_type=None,  # 主路径
-            content=build_summary_header(summary_text),
-        )
-
-        if self.trace_store:
-            await self.trace_store.add_message(summary_msg)
-
-        new_history = self._rebuild_history_after_compression(
-            history, summary_msg.to_llm_dict(), label="单次压缩"
-        )
-
-        new_head_seq = sequence
-        sequence += 1
-
-        return new_history, new_head_seq, sequence
+        return summary_text
 
     async def _agent_loop(
         self,
@@ -938,13 +914,14 @@ class AgentRunner:
         if trace.context.get("active_side_branch"):
             side_branch_data = trace.context["active_side_branch"]
             branch_id = side_branch_data["branch_id"]
+            start_sequence = side_branch_data["start_sequence"]
 
-            # 从数据库查询侧分支消息
+            # 从数据库查询侧分支消息(按 sequence 范围)
             if self.trace_store:
                 all_messages = await self.trace_store.get_trace_messages(trace_id)
                 side_messages = [
                     m for m in all_messages
-                    if m.branch_id == branch_id
+                    if m.sequence >= start_sequence
                 ]
 
                 # 恢复侧分支上下文
@@ -1206,115 +1183,78 @@ class AgentRunner:
             if side_branch_ctx:
                 # 计算侧分支已执行的轮次
                 turns_in_branch = iteration - side_branch_ctx.start_iteration
+                should_exit = turns_in_branch >= side_branch_ctx.max_turns or not tool_calls
 
-                # 检查是否达到最大轮次
                 if turns_in_branch >= side_branch_ctx.max_turns:
                     logger.warning(
                         f"侧分支 {side_branch_ctx.type} 达到最大轮次 "
                         f"{side_branch_ctx.max_turns},强制退出"
                     )
 
-                    if side_branch_ctx.type == "compression":
-                        # 压缩侧分支:fallback 到单次 LLM 调用
-                        logger.info("Fallback 到单次 LLM 压缩")
-
-                        # 清除侧分支状态
-                        trace.context.pop("active_side_branch", None)
-                        if self.trace_store:
-                            await self.trace_store.update_trace(
-                                trace_id, context=trace.context
-                            )
-
-                        # 恢复到侧分支开始前的 history
-                        if self.trace_store:
-                            main_path_messages = await self.trace_store.get_main_path_messages(
-                                trace_id, side_branch_ctx.start_head_seq
-                            )
-                            history = [m.to_llm_dict() for m in main_path_messages]
-
-                        # 执行单次 LLM 压缩
-                        history, head_seq, sequence = await self._single_turn_compress(
-                            trace_id, history, goal_tree, config, sequence,
-                            side_branch_ctx.start_head_seq
+                if should_exit and side_branch_ctx.type == "compression":
+                    # === 压缩侧分支退出(超时 + 正常完成统一处理)===
+                    summary_text = ""
+
+                    # 1. 从当前回复提取
+                    if response_content:
+                        if "[[SUMMARY]]" in response_content:
+                            summary_text = response_content[
+                                response_content.index("[[SUMMARY]]") + len("[[SUMMARY]]"):
+                            ].strip()
+                        elif response_content.strip():
+                            summary_text = response_content.strip()
+
+                    # 2. 从持久化存储按 sequence 范围查询
+                    if not summary_text and self.trace_store:
+                        all_messages = await self.trace_store.get_trace_messages(trace_id)
+                        side_messages = [
+                            m for m in all_messages
+                            if m.sequence >= side_branch_ctx.start_sequence
+                        ]
+                        for msg in reversed(side_messages):
+                            if msg.role == "assistant" and isinstance(msg.content, dict):
+                                text = msg.content.get("text", "")
+                                if "[[SUMMARY]]" in text:
+                                    summary_text = text[text.index("[[SUMMARY]]") + len("[[SUMMARY]]"):].strip()
+                                    break
+                                elif text:
+                                    summary_text = text
+                                    break
+
+                    # 3. 单次 LLM 调用
+                    if not summary_text:
+                        logger.warning("侧分支未生成有效 summary,fallback 到单次 LLM 压缩")
+                        pre_branch_history = history[:side_branch_ctx.start_history_length]
+                        summary_text = await self._single_turn_compress(
+                            trace_id, pre_branch_history, goal_tree, config,
                         )
 
-                        # 清除强制侧分支配置
-                        config.force_side_branch = None
-
-                        side_branch_ctx = None
-                        continue
-
-                    elif side_branch_ctx.type == "reflection":
-                        # 反思侧分支:直接退出,不管结果
-                        logger.info("反思侧分支超时,直接退出")
-
-                        # 清除侧分支状态
-                        trace.context.pop("active_side_branch", None)
-
-                        # 队列中如果还有侧分支(如 compression),保持;否则清空
-                        if not config.force_side_branch or len(config.force_side_branch) == 0:
-                            config.force_side_branch = None
-                            logger.info("反思超时,队列为空")
-
-                        if self.trace_store:
-                            await self.trace_store.update_trace(
-                                trace_id, context=trace.context
-                            )
-
-                        # 恢复到侧分支开始前的 history
-                        if self.trace_store:
-                            main_path_messages = await self.trace_store.get_main_path_messages(
-                                trace_id, side_branch_ctx.start_head_seq
-                            )
-                            history = [m.to_llm_dict() for m in main_path_messages]
-                            head_seq = side_branch_ctx.start_head_seq
-
-                        side_branch_ctx = None
-                        continue
-
-                # 检查是否无工具调用(侧分支完成)
-                if not tool_calls:
-                    logger.info(f"侧分支 {side_branch_ctx.type} 完成(无工具调用)")
-
-                    # 提取结果
-                    if side_branch_ctx.type == "compression":
-                        # 从数据库查询侧分支消息并提取 summary
-                        summary_text = ""
-                        if self.trace_store:
-                            all_messages = await self.trace_store.get_trace_messages(trace_id)
-                            side_messages = [
-                                m for m in all_messages
-                                if m.branch_id == side_branch_ctx.branch_id
-                            ]
-
-                            for msg in side_messages:
-                                if msg.role == "assistant" and isinstance(msg.content, dict):
-                                    text = msg.content.get("text", "")
-                                    if "[[SUMMARY]]" in text:
-                                        summary_text = text[text.index("[[SUMMARY]]") + len("[[SUMMARY]]"):].strip()
-                                        break
-                                    elif text:
-                                        summary_text = text
-
-                        if not summary_text:
-                            logger.warning("侧分支未生成有效 summary,使用默认")
-                            summary_text = "压缩完成"
-
-                        # 创建主路径的 summary 消息(末尾追加详细 GoalTree)
+                    # 创建主路径 summary 消息并重建 history
+                    if summary_text:
                         from agent.core.prompts import build_summary_header
                         summary_content = build_summary_header(summary_text)
 
-                        # 追加详细 GoalTree(压缩后立即注入)
                         if goal_tree and goal_tree.goals:
                             goal_tree_detail = goal_tree.to_prompt(include_summary=True)
                             summary_content += f"\n\n## Current Plan\n\n{goal_tree_detail}"
 
+                        # 找第一条 user message 的 sequence 作为 parent
+                        # 续跑时 get_main_path_messages 沿 parent 链回溯,
+                        # 指向 first_user 可以跳过所有被压缩的中间消息
+                        first_user_seq = None
+                        if self.trace_store:
+                            all_msgs = await self.trace_store.get_trace_messages(trace_id)
+                            for m in all_msgs:
+                                if m.role == "user":
+                                    first_user_seq = m.sequence
+                                    break
+
                         summary_msg = Message.create(
                             trace_id=trace_id,
                             role="user",
                             sequence=sequence,
-                            parent_sequence=side_branch_ctx.start_head_seq,
-                            branch_type=None,  # 回到主路径
+                            parent_sequence=first_user_seq,
+                            branch_type=None,
                             content=summary_content,
                         )
 
@@ -1324,41 +1264,42 @@ class AgentRunner:
                         history = self._rebuild_history_after_compression(
                             history, summary_msg.to_llm_dict(), label="压缩侧分支"
                         )
-
                         head_seq = sequence
                         sequence += 1
+                    else:
+                        logger.error("所有压缩方案均未生成有效 summary,跳过压缩")
 
-                        # 清除侧分支队列
-                        config.force_side_branch = None
-
-                    elif side_branch_ctx.type == "reflection":
-                        # 反思侧分支:直接恢复主路径
-                        logger.info("反思侧分支完成")
+                    # 清理
+                    trace.context.pop("active_side_branch", None)
+                    config.force_side_branch = None
+                    if self.trace_store:
+                        await self.trace_store.update_trace(
+                            trace_id, context=trace.context, head_sequence=head_seq,
+                        )
+                    side_branch_ctx = None
+                    continue
 
-                        if self.trace_store:
-                            main_path_messages = await self.trace_store.get_main_path_messages(
-                                trace_id, side_branch_ctx.start_head_seq
-                            )
-                            history = [m.to_llm_dict() for m in main_path_messages]
-                            head_seq = side_branch_ctx.start_head_seq
+                elif should_exit and side_branch_ctx.type == "reflection":
+                    # === 反思侧分支退出(超时 + 正常完成统一处理)===
+                    logger.info("反思侧分支退出")
 
-                        # 队列中如果还有侧分支,保持 force_side_branch;否则清空
-                        if not config.force_side_branch or len(config.force_side_branch) == 0:
-                            config.force_side_branch = None
-                            logger.info("反思完成,队列为空")
+                    # 恢复主路径
+                    if self.trace_store:
+                        main_path_messages = await self.trace_store.get_main_path_messages(
+                            trace_id, side_branch_ctx.start_head_seq
+                        )
+                        history = [m.to_llm_dict() for m in main_path_messages]
+                        head_seq = side_branch_ctx.start_head_seq
 
-                    # 清除侧分支状态
+                    # 清
                     trace.context.pop("active_side_branch", None)
+                    if not config.force_side_branch or len(config.force_side_branch) == 0:
+                        config.force_side_branch = None
+                        logger.info("反思完成,队列为空")
                     if self.trace_store:
                         await self.trace_store.update_trace(
-                            trace_id,
-                            context=trace.context,
-                            head_sequence=head_seq,
+                            trace_id, context=trace.context, head_sequence=head_seq,
                         )
-
-                    # 注意:不在这里清除 force_side_branch,因为反思侧分支可能已经设置了下一个侧分支
-                    # force_side_branch 的清除由各个分支类型自己处理
-
                     side_branch_ctx = None
                     continue
 

+ 2 - 2
agent/tools/builtin/knowledge.py

@@ -41,7 +41,7 @@ class KnowledgeConfig:
     default_tags: Optional[Dict[str, str]] = None      # 默认 tags(会与工具调用参数合并)
     default_scopes: Optional[List[str]] = None         # 默认 scopes(空则用 ["org:cybertogether"])
     default_search_types: Optional[List[str]] = None   # 默认搜索类型过滤
-    default_search_owner: str = ""                     # 默认搜索 owner 过滤(空则不过滤)
+    default_search_owner: str = ""                     # 默认搜索 owner 过滤(空则不过滤,支持多个owner用逗号分隔,如 "user1@example.com,user2@example.com"
 
     def get_reflect_prompt(self) -> str:
         """压缩时反思 prompt"""
@@ -106,7 +106,7 @@ async def knowledge_search(
         top_k: 返回数量(默认 5)
         min_score: 最低评分过滤(默认 3)
         types: 按类型过滤(user_profile/strategy/tool/usecase/definition/plan)
-        owner: 按所有者过滤(可选)
+        owner: 按所有者过滤(可选,支持多个owner用逗号分隔的字符串,如 "user1@example.com,user2@example.com"
         context: 工具上下文
 
     Returns:

+ 1 - 1
examples/research/config.py

@@ -36,7 +36,7 @@ RUN_CONFIG = RunConfig(
         default_tags={"project": "research", "domain": "ai_agent"},  # 默认 tags(会与工具调用参数合并)
         default_scopes=["org:cybertogether"],  # 默认 scopes
         default_search_types=[],  # 默认搜索类型过滤
-        default_search_owner=""  # 默认搜索 owner 过滤(空则不过滤)
+        default_search_owner=""  # 默认搜索 owner 过滤(空则不过滤,支持多个owner用逗号分隔,如 "user1@example.com,user2@example.com"
     )
 )
 

+ 2 - 2
knowhub/docs/knowledge-management.md

@@ -108,7 +108,7 @@ KnowHub 采用 Milvus Lite 单一存储架构(详见 `knowhub/docs/decisions.m
 - **id**: 唯一标识,格式 `knowledge-{timestamp}-{random}`
 - **message_id**: 来源 Message ID(用于精确溯源到具体消息)
 - **types**: 知识类型数组(可多选)
-  - `user_profile`: 用户偏好、习惯、背景
+  - `user_profile`: 特定用户偏好、习惯、背景
   - `strategy`: 执行经验(从反思中获得)
   - `tool`: 工具使用方法、优缺点、代码示例
   - `usecase`: 用户背景、方案、步骤、效果
@@ -459,7 +459,7 @@ return ToolResult(
 - `top_k`: 返回数量(默认 5)
 - `min_score`: 最低评分过滤(默认 3)
 - `types`: 按类型过滤(可选,逗号分隔)
-- `owner`: 按所有者过滤(可选)
+- `owner`: 按所有者过滤(可选,支持多个owner用逗号分隔,如 "user1@example.com,user2@example.com"
 
 **检索流程**:
 

+ 14 - 2
knowhub/server.py

@@ -612,7 +612,13 @@ async def search_knowledge_api(
             for t in type_list:
                 filters.append(f'array_contains(types, "{t}")')
         if owner:
-            filters.append(f'owner == "{owner}"')
+            owner_list = [o.strip() for o in owner.split(',') if o.strip()]
+            if len(owner_list) == 1:
+                filters.append(f'owner == "{owner_list[0]}"')
+            else:
+                # 多个owner用OR连接
+                owner_filters = [f'owner == "{o}"' for o in owner_list]
+                filters.append(f'({" or ".join(owner_filters)})')
 
         # 添加 min_score 过滤
         filters.append(f'eval["score"] >= {min_score}')
@@ -747,7 +753,13 @@ def list_knowledge(
             filters.append(f'array_contains(scopes, "{scopes}")')
 
         if owner:
-            filters.append(f'owner like "%{owner}%"')
+            owner_list = [o.strip() for o in owner.split(',') if o.strip()]
+            if len(owner_list) == 1:
+                filters.append(f'owner like "%{owner_list[0]}%"')
+            else:
+                # 多个owner用OR连接
+                owner_filters = [f'owner like "%{o}%"' for o in owner_list]
+                filters.append(f'({" or ".join(owner_filters)})')
 
         # tags 支持多个,用 AND 连接(使用 tag_keys 数组进行高效筛选)
         if tags: