Просмотр исходного кода

feat: inject_params & milvus in knowhub

Talegorithm 1 неделя назад
Родитель
Commit
df9fd5e597

+ 4 - 4
README.md

@@ -192,10 +192,10 @@ run_config = RunConfig(
 )
 ```
 
-**参数注入规则**:
-- `owner`:隐藏参数,LLM 不可见,框架自动注入
-- `tags`:框架默认值 + LLM 传递的值合并
-- `scopes`:框架默认值 + LLM 传递的值合并
+**参数注入规则**(通过框架 `inject_params` 机制实现,详见 `agent/docs/tools.md`):
+- `owner`:隐藏参数,LLM 不可见,框架自动注入(`mode: default`)
+- `tags`:LLM 可追加新 key,框架默认 key 不可被覆盖(`mode: merge`)
+- `scopes`:LLM 可追加,与框架默认值合并去重(`mode: merge`)
 
 ### 知识工具
 

+ 29 - 30
agent/core/runner.py

@@ -899,11 +899,10 @@ class AgentRunner:
                         reasons=["系统自动创建:Agent 未显式创建目标"],
                         parent_id=None
                     )
-                    goal_tree.focus(goal_tree.goals[0].id)
                     if self.trace_store:
                         await self.trace_store.add_goal(trace_id, goal_tree.goals[0])
                         await self.trace_store.update_goal_tree(trace_id, goal_tree)
-                    logger.info(f"自动创建 root goal: {goal_tree.goals[0].id}")
+                    logger.info(f"自动创建 root goal: {goal_tree.goals[0].id}(未自动 focus,等待模型决定)")
                 else:
                     logger.debug(f"[Auto Root Goal] 检测到 goal 工具调用,跳过自动创建")
 
@@ -984,27 +983,21 @@ class AgentRunner:
                     elif tool_args is None:
                         tool_args = {}
 
-                    # 注入知识管理工具的默认字段
-                    if tool_name == "knowledge_save":
-                        tool_args.setdefault("owner", config.knowledge.get_owner(config.agent_id))
-                        if config.knowledge.default_tags:
-                            existing_tags = tool_args.get("tags") or {}
-                            merged_tags = {**config.knowledge.default_tags, **existing_tags}
-                            tool_args["tags"] = merged_tags
-                        if config.knowledge.default_scopes:
-                            existing_scopes = tool_args.get("scopes") or []
-                            tool_args["scopes"] = existing_scopes + config.knowledge.default_scopes
-                    elif tool_name == "knowledge_search":
-                        if config.knowledge.default_search_types and "types" not in tool_args:
-                            tool_args["types"] = config.knowledge.default_search_types
-                        if config.knowledge.default_search_owner and "owner" not in tool_args:
-                            tool_args["owner"] = config.knowledge.default_search_owner
-
                     # 记录工具调用(INFO 级别,显示参数)
                     args_str = json.dumps(tool_args, ensure_ascii=False)
                     args_display = args_str[:100] + "..." if len(args_str) > 100 else args_str
                     logger.info(f"[Tool Call] {tool_name}({args_display})")
 
+                    # 构建知识管理注入值
+                    inject_values = None
+                    if config.knowledge:
+                        inject_values = {
+                            "owner": config.knowledge.get_owner(config.uid or "agent"),
+                            "tags": config.knowledge.default_tags,
+                            "scopes": config.knowledge.default_scopes,
+                            "types": config.knowledge.default_search_types,
+                        }
+
                     tool_result = await self.tools.execute(
                         tool_name,
                         tool_args,
@@ -1015,7 +1008,9 @@ class AgentRunner:
                             "goal_id": current_goal_id,
                             "runner": self,
                             "goal_tree": goal_tree,
-                        }
+                            "knowledge_config": config.knowledge,
+                        },
+                        inject_values=inject_values,
                     )
 
                     # 如果是 goal 工具,记录执行后的状态
@@ -1306,21 +1301,20 @@ class AgentRunner:
                 tool_args.setdefault("source_category", "exp")
                 tool_args.setdefault("message_id", trace_id)
 
-                # 注入知识管理默认字段
-                tool_args.setdefault("owner", config.knowledge.get_owner(config.agent_id))
-                if config.knowledge.default_tags:
-                    existing_tags = tool_args.get("tags") or {}
-                    merged_tags = {**config.knowledge.default_tags, **existing_tags}
-                    tool_args["tags"] = merged_tags
-                if config.knowledge.default_scopes:
-                    tool_args.setdefault("scopes", config.knowledge.default_scopes)
-
                 try:
                     await self.tools.execute(
                         "knowledge_save",
                         tool_args,
                         uid=config.uid or "",
-                        context={"store": self.trace_store, "trace_id": trace_id},
+                        context={
+                            "store": self.trace_store,
+                            "trace_id": trace_id,
+                        },
+                        inject_values={
+                            "owner": config.knowledge.get_owner(config.uid or "agent"),
+                            "tags": config.knowledge.default_tags,
+                            "scopes": config.knowledge.default_scopes,
+                        } if config.knowledge else None,
                     )
                     saved_count += 1
                 except Exception as e:
@@ -1600,8 +1594,8 @@ class AgentRunner:
         if goal_tree and goal_tree.goals:
             parts.append(f"## Current Plan\n\n{goal_tree.to_prompt()}")
 
-            # 检测 focus 在有子节点的父目标上:提醒模型 focus 到具体子目标
             if goal_tree.current_id:
+                # 检测 focus 在有子节点的父目标上:提醒模型 focus 到具体子目标
                 children = goal_tree.get_children(goal_tree.current_id)
                 pending_children = [c for c in children if c.status in ("pending", "in_progress")]
                 if pending_children:
@@ -1612,6 +1606,11 @@ class AgentRunner:
                         f"**提醒**:当前焦点在父目标上,建议用 `goal(focus=\"...\")` "
                         f"切换到具体子目标(如 {child_ids})再执行。"
                     )
+            else:
+                # 无焦点:提醒模型 focus
+                parts.append(
+                    "**提醒**:当前没有焦点目标。请用 `goal(focus=\"...\")` 选择一个目标开始执行。"
+                )
 
         # Active Collaborators
         collaborators = trace.context.get("collaborators", [])

+ 39 - 22
agent/docs/tools.md

@@ -60,32 +60,49 @@ async def my_tool(arg: str, context: Optional[ToolContext] = None) -> ToolResult
 
 1. **业务参数**:LLM 可见,由 LLM 填写(如 `query`, `limit`)
 2. **隐藏参数**:LLM 不可见,框架自动注入(如 `context`, `uid`)
-3. **注入参数**:LLM 可见但有默认值,框架自动注入默认值(如 `owner`, `tags`)
+3. **注入参数**:LLM 可见,框架自动注入默认值或与 LLM 值合并(如 `owner`, `tags`)
 
 ```python
 @tool(
-    hidden_params=["context", "uid"],  # 不生成 schema,LLM 看不到
-    inject_params={                     # 自动注入默认值
-        "owner": lambda ctx: ctx.config.knowledge.get_owner(),
-        "tags": lambda ctx, args: {**ctx.config.default_tags, **args.get("tags", {})},
+    hidden_params=["context", "owner"],  # 不生成 schema,LLM 看不到
+    inject_params={                       # 声明注入规则
+        "owner": {"mode": "default", "key": "knowledge_config.owner"},
+        "tags":  {"mode": "merge",   "key": "knowledge_config.default_tags"},
+        "scopes": {"mode": "merge",  "key": "knowledge_config.default_scopes"},
     }
 )
 async def knowledge_save(
     task: str,                          # 业务参数:LLM 填写
     content: str,                       # 业务参数:LLM 填写
     types: List[str],                   # 业务参数:LLM 填写
-    tags: Optional[Dict] = None,        # 注入参数:LLM 可填,框架提供默认值
-    owner: Optional[str] = None,        # 注入参数:LLM 可填,框架提供默认值
+    tags: Optional[Dict] = None,        # 注入参数:LLM 可填,框架合并默认值
+    scopes: Optional[List] = None,      # 注入参数:LLM 可填,框架合并默认值
+    owner: Optional[str] = None,        # 隐藏参数:LLM 看不到,框架注入
     context: Optional[ToolContext] = None,  # 隐藏参数:LLM 看不到
-    uid: str = "",                      # 隐藏参数:LLM 看不到
 ) -> ToolResult:
     """保存知识到知识库"""
     ...
 ```
 
+**inject_params 声明格式**:
+
+```python
+inject_params={
+    "param_name": {
+        "mode": "default" | "merge",  # 注入模式
+        "key": "config_obj.field",    # 从 context 中取值的路径
+    }
+}
+```
+
+- `mode: "default"`:LLM 未提供时注入框架值
+- `mode: "merge"`:框架值与 LLM 值合并。dict 按 key 合并(框架 key 不可被覆盖,LLM 可追加新 key);list 合并去重
+
+**值的来源**:runner 在调用 `execute()` 时传入 `inject_values` 字典,框架按参数名从中取值。`key` 路径写法(如 `"knowledge_config.default_tags"` 表示 `context["knowledge_config"].default_tags`)是声明式的预留设计 —— NOTE(review):当前 `registry.py` 实现只读取 `inject_values`,并未按 `key` 路径从 context 取值,需确认两者以哪个为准。
+
 **注入时机**:
 - Schema 生成时:跳过 `hidden_params`,不暴露给 LLM
-- 工具执行前:注入 `hidden_params` 和 `inject_params` 的默认值
+- 工具执行前:注入 `hidden_params` 和 `inject_params`
 
 **实现位置**:
 - Schema 生成:`agent/tools/schema.py:SchemaGenerator.generate()`
@@ -136,18 +153,20 @@ async def search_notes(
 
 ```python
 @tool(
-    hidden_params=["context"],
+    hidden_params=["context", "owner"],
     inject_params={
-        "owner": lambda ctx: ctx.config.knowledge.get_owner(),
-        "tags": lambda ctx, args: {**ctx.config.default_tags, **args.get("tags", {})},
+        "owner": {"mode": "default", "key": "knowledge_config.owner"},
+        "tags":  {"mode": "merge",   "key": "knowledge_config.default_tags"},
+        "scopes": {"mode": "merge",  "key": "knowledge_config.default_scopes"},
     }
 )
 async def knowledge_save(
     task: str,
     content: str,
     types: List[str],
-    tags: Optional[Dict] = None,  # LLM 可填,框架提供默认值
-    owner: Optional[str] = None,  # LLM 可填,框架提供默认值
+    tags: Optional[Dict] = None,  # LLM 可填,框架合并默认值
+    scopes: Optional[List] = None,  # LLM 可填,框架合并默认值
+    owner: Optional[str] = None,  # LLM 看不到,框架注入
     context: Optional[ToolContext] = None
 ) -> ToolResult:
     """
@@ -157,19 +176,17 @@ async def knowledge_save(
         task: 任务描述
         content: 知识内容
         types: 知识类型
-        tags: 业务标签(可选,默认值)
-        owner: 所有者(可选,有默认值)
+        tags: 业务标签(可选,框架合并默认值)
+        scopes: 可见范围(可选,框架合并默认值)
     """
-    # owner 和 tags 如果 LLM 未提供,框架会注入默认值
     ...
 ```
 
 **注入规则**:
-- `inject_params` 的 value 可以是:
-  - `lambda ctx: ...` - 从 context 计算
-  - `lambda ctx, args: ...` - 从 context 和已有参数计算
-  - 字符串 - 直接使用该值
-- 注入时机:工具执行前,使用 `setdefault` 注入(不覆盖 LLM 提供的值)
+- `inject_params` 的 value 是一个 dict,包含:
+  - `mode`: `"default"`(LLM 未提供则注入)或 `"merge"`(与 LLM 值合并)
+  - `key`: 从 context 中取值的路径(如 `"knowledge_config.default_tags"`)
+- 参数同时在 `hidden_params` 中时,LLM 不可见,框架直接注入
 
 ### 带 UI 元数据
 

+ 10 - 7
agent/tools/builtin/knowledge.py

@@ -16,7 +16,13 @@ logger = logging.getLogger(__name__)
 KNOWHUB_API = os.getenv("KNOWHUB_API", "http://localhost:8000")
 
 
-@tool(hidden_params=["context"])
+@tool(
+    hidden_params=["context"],
+    inject_params={
+        "types": {"mode": "default"},
+        "owner": {"mode": "default"},
+    }
+)
 async def knowledge_search(
     query: str,
     top_k: int = 5,
@@ -98,12 +104,9 @@ async def knowledge_search(
 @tool(
     hidden_params=["context", "owner"],
     inject_params={
-        "owner": lambda ctx: ctx.get("knowledge_config", {}).get("owner") if ctx else None,
-        "tags": lambda ctx, args: {
-            **ctx.get("knowledge_config", {}).get("default_tags", {}),
-            **(args.get("tags") or {})
-        } if ctx else args.get("tags"),
-        "scopes": lambda ctx, args: (args.get("scopes") or []) + (ctx.get("knowledge_config", {}).get("default_scopes") or []) if ctx else args.get("scopes"),
+        "owner": {"mode": "default"},
+        "tags": {"mode": "merge"},
+        "scopes": {"mode": "merge"},
     }
 )
 async def knowledge_save(

+ 32 - 18
agent/tools/registry.py

@@ -229,26 +229,40 @@ class ToolRegistry:
 			if "context" in hidden_params and "context" in sig.parameters:
 				kwargs["context"] = context
 
-			# 注入默认值(inject_params)
+			# 注入参数(inject_params)
 			inject_params = tool_info.get("inject_params", {})
-			for param_name, injector in inject_params.items():
-				if param_name in sig.parameters:
-					# 如果 LLM 已提供值,不覆盖
+			for param_name, rule in inject_params.items():
+				if param_name not in sig.parameters:
+					continue
+
+				if not isinstance(rule, dict) or "mode" not in rule:
+					# 兼容旧格式:直接值或 callable
+					if param_name not in kwargs or kwargs[param_name] is None:
+						kwargs[param_name] = rule() if callable(rule) else rule
+					continue
+
+				mode = rule["mode"]
+				# 从 inject_values 中获取值
+				value = (inject_values or {}).get(param_name)
+
+				if value is None:
+					continue
+
+				if mode == "default":
+					# 默认值模式:LLM 未提供则注入
 					if param_name not in kwargs or kwargs[param_name] is None:
-						if callable(injector):
-							# 检查 injector 的参数数量
-							injector_sig = inspect.signature(injector)
-							if len(injector_sig.parameters) == 1:
-								# lambda ctx: ...
-								kwargs[param_name] = injector(context)
-							elif len(injector_sig.parameters) == 2:
-								# lambda ctx, args: ...
-								kwargs[param_name] = injector(context, kwargs)
-							else:
-								kwargs[param_name] = injector()
-						else:
-							# 直接使用值
-							kwargs[param_name] = injector
+						kwargs[param_name] = value
+				elif mode == "merge":
+					# 合并模式:框架值始终保留,LLM 可追加新内容
+					llm_value = kwargs.get(param_name)
+					if isinstance(value, dict):
+						# dict: LLM 追加新 key,同名 key 以框架值为准
+						kwargs[param_name] = {**(llm_value or {}), **value}
+					elif isinstance(value, list):
+						# list: 合并去重
+						kwargs[param_name] = list(set((llm_value or []) + value))
+					else:
+						kwargs[param_name] = value
 
 			# 执行函数
 			if inspect.iscoroutinefunction(func):

+ 83 - 36
agent/trace/goal_tool.py

@@ -4,14 +4,87 @@ Goal 工具 - 计划管理
 提供 goal 工具供 LLM 管理执行计划。
 """
 
+import logging
 from typing import Optional, List, TYPE_CHECKING
 
 from agent.tools import tool
 
 if TYPE_CHECKING:
-    from .goal_models import GoalTree
+    from .goal_models import GoalTree, Goal
     from .protocols import TraceStore
 
+logger = logging.getLogger(__name__)
+
+
+# ===== 知识注入 =====
+
+async def inject_knowledge_for_goal(
+    goal: "Goal",
+    tree: "GoalTree",
+    store: Optional["TraceStore"] = None,
+    trace_id: Optional[str] = None,
+    knowledge_config: Optional[dict] = None,
+) -> Optional[str]:
+    """
+    为指定 goal 注入相关知识。
+
+    Args:
+        goal: 目标对象
+        tree: GoalTree
+        store: TraceStore(用于持久化)
+        trace_id: Trace ID
+        knowledge_config: 知识管理配置(KnowledgeConfig 对象)
+
+    Returns:
+        注入结果描述(如 "📚 已注入 3 条相关知识"),无结果返回 None
+    """
+    # 检查是否启用知识注入
+    if knowledge_config and not getattr(knowledge_config, 'enable_injection', True):
+        logger.debug(f"[Knowledge Inject] 知识注入已禁用,跳过")
+        return None
+
+    try:
+        from agent.tools.builtin.knowledge import knowledge_search
+
+        logger.info(f"[Knowledge Inject] goal: {goal.id}, query: {goal.description[:80]}")
+
+        # 从配置中获取搜索参数
+        search_types = None
+        search_owner = None
+        if knowledge_config:
+            search_types = getattr(knowledge_config, 'default_search_types', None)
+            search_owner = getattr(knowledge_config, 'default_search_owner', None) or None
+
+        knowledge_result = await knowledge_search(
+            query=goal.description,
+            top_k=3,
+            min_score=3,
+            types=search_types,
+            owner=search_owner,
+            context=None
+        )
+
+        logger.debug(f"[Knowledge Inject] result type: {type(knowledge_result)}, metadata: {getattr(knowledge_result, 'metadata', None)}")
+
+        if knowledge_result.metadata and knowledge_result.metadata.get("items"):
+            goal.knowledge = knowledge_result.metadata["items"]
+            knowledge_count = len(goal.knowledge)
+            logger.info(f"[Knowledge Inject] 注入 {knowledge_count} 条知识到 goal {goal.id}")
+
+            if store and trace_id:
+                await store.update_goal_tree(trace_id, tree)
+
+            return f"📚 已注入 {knowledge_count} 条相关知识"
+        else:
+            goal.knowledge = []
+            logger.info(f"[Knowledge Inject] 未找到相关知识")
+            return None
+
+    except Exception as e:
+        logger.warning(f"[Knowledge Inject] 知识注入失败: {e}")
+        goal.knowledge = []
+        return None
+
 
 # ===== LLM 可调用的 goal 工具 =====
 
@@ -47,9 +120,10 @@ async def goal(
     if tree is None:
         return "错误:GoalTree 未初始化"
 
-    # 从 context 获取 store 和 trace_id
+    # 从 context 获取 store、trace_id 和 knowledge_config
     store = context.get("store") if context else None
     trace_id = context.get("trace_id") if context else None
+    knowledge_config = context.get("knowledge_config") if context else None
 
     return await goal_tool(
         tree=tree,
@@ -61,7 +135,8 @@ async def goal(
         under=under,
         done=done,
         abandon=abandon,
-        focus=focus
+        focus=focus,
+        knowledge_config=knowledge_config
     )
 
 
@@ -79,6 +154,7 @@ async def goal_tool(
     done: Optional[str] = None,
     abandon: Optional[str] = None,
     focus: Optional[str] = None,
+    knowledge_config: Optional[object] = None,
 ) -> str:
     """
     管理执行计划。
@@ -94,6 +170,7 @@ async def goal_tool(
         done: 完成当前目标,值为 summary
         abandon: 放弃当前目标,值为原因
         focus: 切换焦点到指定 ID
+        knowledge_config: 知识管理配置(KnowledgeConfig 对象)
 
     Returns:
         更新后的计划状态文本
@@ -136,33 +213,9 @@ async def goal_tool(
         changes.append(f"切换焦点: {display_id}. {goal.description}")
 
         # 自动注入知识
-        try:
-            from agent.tools.builtin.knowledge import knowledge_search
-
-            knowledge_result = await knowledge_search(
-                query=goal.description,
-                top_k=3,
-                min_score=3,
-                context=None
-            )
-
-            # 将知识保存到 goal 对象
-            if knowledge_result.metadata and knowledge_result.metadata.get("items"):
-                goal.knowledge = knowledge_result.metadata["items"]
-                knowledge_count = len(goal.knowledge)
-                changes.append(f"📚 已注入 {knowledge_count} 条相关知识")
-
-                # 持久化到 store
-                if store and trace_id:
-                    await store.update_goal_tree(trace_id, tree)
-            else:
-                goal.knowledge = []
-
-        except Exception as e:
-            # 知识注入失败不影响 focus 操作
-            import logging
-            logging.getLogger(__name__).warning(f"知识注入失败: {e}")
-            goal.knowledge = []
+        inject_msg = await inject_knowledge_for_goal(goal, tree, store, trace_id, knowledge_config)
+        if inject_msg:
+            changes.append(inject_msg)
 
     # 3. 处理 abandon(放弃当前目标)
     if abandon is not None:
@@ -229,12 +282,6 @@ async def goal_tool(
                 for goal in new_goals:
                     await store.add_goal(trace_id, goal)
 
-            # 如果没有焦点且添加了目标,自动 focus 到第一个新目标
-            if not tree.current_id and new_goals:
-                tree.focus(new_goals[0].id)
-                display_id = tree._generate_display_id(new_goals[0])
-                changes.append(f"自动切换焦点: {display_id}")
-
     # 将完整内存树状态(含 current_id)同步到存储,
     # 因为 store.add_goal / update_goal 各自从磁盘加载,不包含 focus 等内存变更
     if store and trace_id and changes:

+ 181 - 0
knowhub/docs/decisions.md

@@ -196,3 +196,184 @@ Server 零 LLM 成本。
 - 合并内容层的工程收益小于耦合成本
 
 **决策**:两个项目独立推进。唯一预留的接口:两边使用相同的 content hash 方案(sha256),未来如果整合,内容去重天然可行。等 KnowHub 验证"集体记忆"方向后再讨论整合。
+
+---
+
+## 13. 向量检索:Milvus Lite 单一存储架构
+
+**日期**:2026-03-09
+
+**背景**:现有检索方案使用 LLM 语义路由(gemini-2.0-flash-001),从所有知识中挑选候选。存在以下问题:
+- 每次检索都需要调用 LLM,成本和延迟较高
+- 无法利用向量相似度进行精确的语义匹配
+- 难以支持大规模知识库(需要将所有知识元数据传给 LLM)
+
+**方案对比**:
+
+| 方案 | 部署复杂度 | 性能 | 功能完整性 | 迁移成本 |
+|------|-----------|------|-----------|---------|
+| sqlite-vec | 低(单文件) | 中 | 基础向量检索 | 低 |
+| **Milvus Lite** | **低(pip install)** | **高** | **完整(标量过滤+向量检索)** | **中** |
+| Qdrant | 中(需 Docker) | 高 | 完整 | 低 |
+| 完整 Milvus | 高(多组件) | 极高 | 完整 | 高 |
+
+**决策**:采用 Milvus Lite 单一存储架构
+
+**为什么不用 SQLite + Milvus Lite 双存储?**
+- Milvus Lite 支持标量字段存储,可以存储所有知识数据
+- 维护两个数据库增加同步复杂度和一致性风险
+- Milvus Lite 数据也是本地文件存储,备份和迁移同样简单
+- 单一存储简化架构,降低维护成本
+
+**架构设计**:
+
+```
+┌─────────────────────────────────────────────────────┐
+│                   KnowHub Server                     │
+├─────────────────────────────────────────────────────┤
+│                                                      │
+│              ┌─────────────────────┐                │
+│              │   Milvus Lite       │                │
+│              │   (单一存储)         │                │
+│              ├─────────────────────┤                │
+│              │ knowledge 集合       │                │
+│              │ ├─ id (PK)          │                │
+│              │ ├─ embedding (向量)  │                │
+│              │ ├─ task             │                │
+│              │ ├─ content          │                │
+│              │ ├─ types (JSON)     │                │
+│              │ ├─ tags (JSON)      │                │
+│              │ ├─ scopes (JSON)    │                │
+│              │ ├─ owner            │                │
+│              │ ├─ resource_ids     │                │
+│              │ ├─ source (JSON)    │                │
+│              │ ├─ eval (JSON)      │                │
+│              │ ├─ created_at       │                │
+│              │ └─ updated_at       │                │
+│              └─────────────────────┘                │
+│                                                      │
+│              向量索引:HNSW (COSINE)                  │
+│              参数:M=16, efConstruction=200          │
+│                                                      │
+│  ┌──────────────────────────────────────────────┐  │
+│  │           检索流程                            │  │
+│  ├──────────────────────────────────────────────┤  │
+│  │ 1. 向量召回:Milvus 检索 top 3*k 候选        │  │
+│  │ 2. LLM 精排:Gemini 对候选重新排序           │  │
+│  │ 3. Fallback:LLM 失败时直接返回向量 top k    │  │
+│  └──────────────────────────────────────────────┘  │
+└─────────────────────────────────────────────────────┘
+```
+
+**检索流程**(向量召回 + LLM 精排):
+
+```python
+async def knowledge_search(query: str, filters: dict, top_k: int = 5):
+    # 1. 生成查询向量
+    query_embedding = await get_embedding(query)
+
+    # 2. 向量召回(快速、便宜)
+    candidates = await milvus_store.search(
+        embedding=query_embedding,
+        filters=filters,  # types, owner, scopes
+        limit=top_k * 3   # 召回 3*k 个候选
+    )
+
+    # 3. LLM 精排(准确、贵)
+    try:
+        ranked = await llm_rerank(
+            query=query,
+            candidates=candidates,
+            top_k=top_k
+        )
+        return ranked
+    except Exception as e:
+        # 4. Fallback:LLM 失败时直接返回向量 top k
+        logger.warning(f"LLM rerank failed: {e}, fallback to vector top-k")
+        return candidates[:top_k]
+
+async def llm_rerank(query: str, candidates: List[dict], top_k: int):
+    """使用 LLM 对候选进行精排"""
+    # 构造 prompt
+    candidates_text = "\n".join([
+        f"[{i+1}] ID: {c['id']}\nTask: {c['task']}\nContent: {c['content'][:200]}..."
+        for i, c in enumerate(candidates)
+    ])
+
+    prompt = f"""你是知识检索专家。根据用户查询,从候选知识中选出最相关的 {top_k} 条。
+
+用户查询:"{query}"
+
+候选知识:
+{candidates_text}
+
+请输出最相关的 {top_k} 个知识 ID,按相关性从高到低排序,用逗号分隔。
+只输出 ID,不要其他内容。"""
+
+    response = await openrouter_llm_call(
+        messages=[{"role": "user", "content": prompt}],
+        model="google/gemini-2.5-flash-lite"
+    )
+
+    # 解析 LLM 输出
+    selected_ids = parse_ids(response["content"])
+
+    # 按 LLM 排序返回
+    id_to_candidate = {c["id"]: c for c in candidates}
+    return [id_to_candidate[id] for id in selected_ids if id in id_to_candidate]
+```
+
+**Embedding 模型选择**:
+
+优先级:
+1. **OpenAI text-embedding-3-small**(推荐)
+   - 1536 维,性能好,成本低($0.02/1M tokens)
+   - 支持中英文
+2. **本地模型**(备选)
+   - paraphrase-multilingual-MiniLM-L12-v2
+   - 零成本,但需要本地计算资源
+
+**成本分析**:
+
+假设 1000 条知识,每条平均 200 tokens:
+
+| 操作 | 旧方案(纯 LLM 路由) | 新方案(向量召回 + LLM 精排) |
+|------|---------------------|---------------------------|
+| 每次检索 | 200k tokens → $0.04 | 召回:0 成本<br>精排:3k tokens → $0.0006 |
+| 1000 次检索 | $40 | $0.60 |
+| 节省 | - | **98.5%** |
+
+**迁移路径**:
+
+阶段 1:实现 Milvus Lite 存储(2-3 周)
+- 从 SQLite 迁移数据到 Milvus Lite
+- 实现向量召回 + LLM 精排
+- 保留旧 API 兼容性
+
+阶段 2:效果评估(1-2 个月)
+- 对比新旧方案的准确率和成本
+- 收集用户反馈
+- 调优召回倍数(3*k)和精排策略
+
+阶段 3:可能的演进方向
+- 方向 A:优化精排 prompt,提升准确率
+- 方向 B:引入混合检索(向量 + 关键词)
+- 方向 C:升级到完整 Milvus(如果数据量暴增)
+
+**实现位置**:
+- Milvus 封装:`knowhub/vector_store.py`
+- Embedding 生成:`knowhub/embeddings.py`
+- 检索逻辑:`knowhub/server.py:knowledge_search`
+- LLM 精排:`knowhub/server.py:llm_rerank`
+
+**优势**:
+1. 单一存储,架构简单
+2. 向量召回快速且便宜
+3. LLM 精排保证准确性
+4. Fallback 机制保证可用性
+5. 成本降低 98.5%
+
+**权衡**:
+1. 从 SQLite 迁移需要一次性工作
+2. Milvus Lite 的标量查询不如 SQL 灵活(但够用)
+3. 存储空间增加(向量数据)

+ 58 - 12
knowhub/docs/knowledge-management.md

@@ -24,6 +24,43 @@ Agent                           KnowHub Server
 └── resource 资源引用        →   GET /api/resource/{id}
 ```
 
+### 存储架构
+
+KnowHub 采用 Milvus Lite 单一存储架构(详见 `knowhub/docs/decisions.md#13`):
+
+```
+┌─────────────────────────────────────────────────────┐
+│                   KnowHub Server                     │
+├─────────────────────────────────────────────────────┤
+│                                                      │
+│              ┌─────────────────────┐                │
+│              │   Milvus Lite       │                │
+│              │   (单一存储)         │                │
+│              ├─────────────────────┤                │
+│              │ knowledge 集合       │                │
+│              │ - id                │                │
+│              │ - embedding (向量)   │                │
+│              │ - task/content      │                │
+│              │ - types/tags/scopes │                │
+│              │ - owner/eval/source │                │
+│              │ - resource_ids      │                │
+│              └─────────────────────┘                │
+│                                                      │
+│  检索流程:向量召回 → LLM 精排 → 返回 top k          │
+└─────────────────────────────────────────────────────┘
+```
+
+**Milvus Lite**:
+- 存储完整知识数据(所有字段)+ 向量
+- 提供高效的语义向量检索
+- 支持标量字段过滤和查询
+- 本地文件存储,部署简单
+
+实现位置:
+- Milvus 封装:`knowhub/vector_store.py`
+- Embedding 生成:`knowhub/embeddings.py`
+- 检索逻辑:`knowhub/server.py:knowledge_search`
+
 ---
 
 ## 知识结构
@@ -415,25 +452,34 @@ return ToolResult(
 
 ### `GET /api/knowledge/search`
 
-检索知识。核心逻辑在 Server 实现
+检索知识。使用向量召回 + LLM 精排策略
 
 **参数**:
 - `q`: 查询文本
 - `top_k`: 返回数量(默认 5)
 - `min_score`: 最低评分过滤(默认 3)
 - `types`: 按类型过滤(可选,逗号分隔)
+- `owner`: 按所有者过滤(可选)
 
-**检索流程**(两阶段,Server 端实现)
+**检索流程**:
 
-1. **语义路由**:使用 LLM(gemini-2.0-flash-001)从所有知识中挑选 2*k 个语义相关的候选
-   - 输入:query + 知识元数据(id, types, task 前 100 字符)
-   - 输出:候选知识 ID 列表
+1. **向量召回**(快速、便宜)
+   - 生成查询向量(使用 OpenAI text-embedding-3-small 或本地模型)
+   - Milvus Lite 检索语义相似的知识,召回 `top_k * 3` 个候选
+   - 支持标量过滤(types、owner、scopes、min_score)
 
-2. **质量精排**:根据评分和反馈计算质量分,筛选最终的 k 个
-   - 质量分公式:`quality_score = score + helpful - (harmful * 2.0)`
-   - 过滤:`score < min_score` 或 `quality_score < 0` 的知识被剔除
+2. **LLM 精排**(准确、贵)
+   - 使用 LLM(gemini-2.5-flash-lite)对候选重新排序
+   - 根据查询意图和知识内容,选出最相关的 `top_k` 条
+   - 输出按相关性从高到低排序
 
-实现位置:`knowhub/server.py:knowledge_search`
+3. **Fallback**(保证可用性)
+   - 如果 LLM 精排失败或超时,直接返回向量召回的 `top_k` 结果
+   - 保证检索始终可用
+
+实现位置:
+- `knowhub/server.py:knowledge_search` - 主检索逻辑
+- `knowhub/server.py:llm_rerank` - LLM 精排
 
 **响应**:
 
@@ -451,11 +497,11 @@ return ToolResult(
         "helpful": 2,
         "harmful": 0,
         "confidence": 0.9
-      },
-      "quality_score": 5.0
+      }
     }
   ],
-  "count": 3
+  "count": 3,
+  "reranked": true
 }
 ```
 

+ 87 - 0
knowhub/embeddings.py

@@ -0,0 +1,87 @@
+"""
+Embedding 生成模块
+
+使用 OpenRouter 的 openai/text-embedding-3-small 模型生成向量。
+(NOTE(review): 需确认 OpenRouter 是否提供 `/embeddings` 端点 —— 其公开 API 以 chat/completions 为主,若不支持需直连 OpenAI。)
+支持单条和批量处理。
+"""
+
+import os
+import asyncio
+from typing import List, Union
+import httpx
+
+OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
+OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
+EMBEDDING_MODEL = "openai/text-embedding-3-small"
+EMBEDDING_DIM = 1536
+
+
+async def get_embedding(text: str) -> List[float]:
+    """
+    生成单条文本的向量
+
+    Args:
+        text: 输入文本
+
+    Returns:
+        1536 维向量
+    """
+    embeddings = await get_embeddings_batch([text])
+    return embeddings[0]
+
+
+async def get_embeddings_batch(texts: List[str], batch_size: int = 100) -> List[List[float]]:
+    """
+    批量生成文本向量
+
+    Args:
+        texts: 文本列表
+        batch_size: 每批处理数量(OpenAI 限制 2048)
+
+    Returns:
+        向量列表
+    """
+    if not texts:
+        return []
+
+    # 分批处理
+    all_embeddings = []
+    for i in range(0, len(texts), batch_size):
+        batch = texts[i:i + batch_size]
+        embeddings = await _call_embedding_api(batch)
+        all_embeddings.extend(embeddings)
+
+    return all_embeddings
+
+
+async def _call_embedding_api(texts: List[str]) -> List[List[float]]:
+    """
+    调用 OpenRouter embedding API
+
+    Args:
+        texts: 文本列表(单批)
+
+    Returns:
+        向量列表
+    """
+    if not OPENROUTER_API_KEY:
+        raise ValueError("OPENROUTER_API_KEY not set in environment")
+
+    async with httpx.AsyncClient(timeout=30.0) as client:
+        response = await client.post(
+            f"{OPENROUTER_BASE_URL}/embeddings",
+            headers={
+                "Authorization": f"Bearer {OPENROUTER_API_KEY}",
+                "Content-Type": "application/json",
+            },
+            json={
+                "model": EMBEDDING_MODEL,
+                "input": texts,
+            }
+        )
+        response.raise_for_status()
+        data = response.json()
+
+        # 按 index 排序(API 可能乱序返回)
+        embeddings_data = sorted(data["data"], key=lambda x: x["index"])
+        return [item["embedding"] for item in embeddings_data]

+ 5 - 0
knowhub/requirements.txt

@@ -1,3 +1,8 @@
 fastapi
 uvicorn[standard]
 pydantic
+pymilvus
+milvus  # NOTE(review): pymilvus>=2.4 已内置 Milvus Lite,确认是否仍需单独的 milvus 包
+httpx
+cryptography
+python-dotenv

+ 367 - 460
knowhub/server.py

@@ -2,7 +2,7 @@
 KnowHub Server
 
 Agent 工具使用经验的共享平台。
-FastAPI + SQLite,单文件部署。
+FastAPI + Milvus Lite(知识)+ SQLite(资源),单文件部署。
 """
 
 import os
@@ -11,6 +11,8 @@ import json
 import sqlite3
 import asyncio
 import base64
+import time
+import uuid
 from contextlib import asynccontextmanager
 from datetime import datetime, timezone
 from typing import Optional
@@ -31,6 +33,10 @@ load_dotenv(Path(__file__).parent.parent / ".env")
 
 from agent.llm.openrouter import openrouter_llm_call
 
+# 导入向量存储和 embedding
+from knowhub.vector_store import MilvusStore
+from knowhub.embeddings import get_embedding, get_embeddings_batch
+
 BRAND_NAME    = os.getenv("BRAND_NAME", "KnowHub")
 BRAND_API_ENV = os.getenv("BRAND_API_ENV", "KNOWHUB_API")
 BRAND_DB      = os.getenv("BRAND_DB", "knowhub.db")
@@ -45,6 +51,10 @@ if ORG_KEYS_RAW:
             ORG_KEYS[org.strip()] = key_b64.strip()
 
 DB_PATH = Path(__file__).parent / BRAND_DB
+MILVUS_DATA_DIR = Path(__file__).parent / "milvus_data"
+
+# 全局 Milvus 存储实例
+milvus_store: Optional[MilvusStore] = None
 
 # --- 数据库 ---
 
@@ -127,6 +137,7 @@ def decrypt_content(resource_id: str, encrypted_text: str, provided_key: Optiona
 
 
 def init_db():
+    """初始化 SQLite(仅用于 resources)"""
     conn = get_db()
     conn.execute("""
         CREATE TABLE IF NOT EXISTS experiences (
@@ -160,28 +171,6 @@ def init_db():
         )
     """)
 
-    conn.execute("""
-        CREATE TABLE IF NOT EXISTS knowledge (
-            id            TEXT PRIMARY KEY,
-            message_id    TEXT DEFAULT '',
-            types         TEXT NOT NULL,              -- JSON array: ["strategy", "tool"]
-            task          TEXT NOT NULL,
-            tags          TEXT DEFAULT '{}',          -- JSON object: {"category": "...", "domain": "..."}
-            scopes        TEXT DEFAULT '["org:cybertogether"]',  -- JSON array
-            owner         TEXT DEFAULT '',
-            content       TEXT NOT NULL,
-            resource_ids  TEXT DEFAULT '[]',          -- JSON array: ["code/selenium/login", "credentials/website"]
-            source        TEXT DEFAULT '{}',          -- JSON object: {name, category, urls, agent_id, submitted_by, timestamp}
-            eval          TEXT DEFAULT '{}',          -- JSON object: {score, helpful, harmful, confidence, histories}
-            created_at    TEXT NOT NULL,
-            updated_at    TEXT DEFAULT ''
-        )
-    """)
-    conn.execute("CREATE INDEX IF NOT EXISTS idx_knowledge_types ON knowledge(types)")
-    conn.execute("CREATE INDEX IF NOT EXISTS idx_knowledge_task ON knowledge(task)")
-    conn.execute("CREATE INDEX IF NOT EXISTS idx_knowledge_owner ON knowledge(owner)")
-    conn.execute("CREATE INDEX IF NOT EXISTS idx_knowledge_scopes ON knowledge(scopes)")
-
     conn.commit()
     conn.close()
 
@@ -294,9 +283,18 @@ class ResourceOut(BaseModel):
 
 @asynccontextmanager
 async def lifespan(app: FastAPI):
+    global milvus_store
+
+    # 初始化 SQLite(resources)
     init_db()
+
+    # 初始化 Milvus Lite(knowledge)
+    milvus_store = MilvusStore(data_dir=str(MILVUS_DATA_DIR))
+
     yield
 
+    # 清理(Milvus Lite 会自动处理)
+
 
 app = FastAPI(title=BRAND_NAME, lifespan=lifespan)
 
@@ -498,181 +496,58 @@ def list_resources(
 
 # ===== Knowledge API =====
 
-# 两阶段检索逻辑
-async def _route_knowledge_by_llm(query_text: str, metadata_list: list[dict], k: int = 5) -> list[str]:
+async def _llm_rerank(query: str, candidates: list[dict], top_k: int) -> list[str]:
     """
-    第一阶段:语义路由。
-    让 LLM 挑选出 2*k 个语义相关的 ID。
+    使用 LLM 对候选知识进行精排
+
+    Args:
+        query: 查询文本
+        candidates: 候选知识列表
+        top_k: 返回数量
+
+    Returns:
+        排序后的知识 ID 列表
     """
-    if not metadata_list:
+    if not candidates:
         return []
 
-    routing_k = k * 2
+    # 构造 prompt
+    candidates_text = "\n".join([
+        f"[{i+1}] ID: {c['id']}\nTask: {c['task']}\nContent: {c['content'][:200]}..."
+        for i, c in enumerate(candidates)
+    ])
 
-    routing_data = [
-        {
-            "id": m["id"],
-            "types": m["types"],
-            "task": m["task"][:100]
-        } for m in metadata_list
-    ]
+    prompt = f"""你是知识检索专家。根据用户查询,从候选知识中选出最相关的 {top_k} 条。
 
-    prompt = f"""
-你是一个知识检索专家。根据用户的当前任务需求,从下列原子知识元数据中挑选出最相关的最多 {routing_k} 个知识 ID。
-任务需求:"{query_text}"
+用户查询:"{query}"
 
-可选知识列表
-{json.dumps(routing_data, ensure_ascii=False, indent=1)}
+候选知识:
+{candidates_text}
 
-请直接输出 ID 列表,用逗号分隔(例如: knowledge-20260302-001, research-20260302-002)。若无相关项请输出 "None"
-"""
+请输出最相关的 {top_k} 个知识 ID,按相关性从高到低排序,用逗号分隔。
+只输出 ID,不要其他内容。"""
 
     try:
-        print(f"\n[Step 1: 知识语义路由] 任务: '{query_text}' | 候选总数: {len(metadata_list)} | 目标提取数: {routing_k}")
-
         response = await openrouter_llm_call(
             messages=[{"role": "user", "content": prompt}],
             model="google/gemini-2.5-flash-lite"
         )
 
         content = response.get("content", "").strip()
-        selected_ids = [idx.strip() for idx in re.split(r'[,\s]+', content) if idx.strip().startswith(("knowledge-", "research-"))]
+        # 解析 ID 列表
+        selected_ids = [
+            idx.strip()
+            for idx in re.split(r'[,\s]+', content)
+            if idx.strip().startswith(("knowledge-", "research-"))
+        ]
+
+        return selected_ids[:top_k]
 
-        print(f"[Step 1: 知识语义路由] LLM 初选 ID ({len(selected_ids)}个): {selected_ids}")
-        return selected_ids
     except Exception as e:
-        print(f"LLM 知识路由失败: {e}")
+        print(f"[LLM Rerank] 失败: {e}")
         return []
 
 
-async def _search_knowledge_two_stage(
-    query_text: str,
-    top_k: int = 5,
-    min_score: int = 3,
-    types_filter: Optional[list[str]] = None,
-    owner_filter: Optional[str] = None,
-    conn: sqlite3.Connection = None
-) -> list[dict]:
-    """
-    两阶段检索:语义路由 + 质量精排
-    """
-    if conn is None:
-        conn = get_db()
-        should_close = True
-    else:
-        should_close = False
-
-    try:
-        # 阶段 1: 解析所有知识
-        query = "SELECT * FROM knowledge"
-        rows = conn.execute(query).fetchall()
-
-        if not rows:
-            return []
-
-        content_map = {}
-        metadata_list = []
-
-        for row in rows:
-            kid = row["id"]
-            types = json.loads(row["types"])
-
-            # 标签过滤
-            if types_filter:
-                if not any(t in types for t in types_filter):
-                    continue
-
-            # owner 过滤
-            if owner_filter and row["owner"] != owner_filter:
-                continue
-
-            task = row["task"]
-            content_text = row["content"]
-            eval_data = json.loads(row["eval"])
-            source = json.loads(row["source"])
-
-            meta_item = {
-                "id": kid,
-                "types": types,
-                "task": task,
-                "score": eval_data.get("score", 3),
-                "helpful": eval_data.get("helpful", 0),
-                "harmful": eval_data.get("harmful", 0),
-            }
-            metadata_list.append(meta_item)
-            content_map[kid] = {
-                "task": task,
-                "content": content_text,
-                "types": types,
-                "tags": json.loads(row["tags"]),
-                "scopes": json.loads(row["scopes"]),
-                "owner": row["owner"],
-                "score": meta_item["score"],
-                "helpful": meta_item["helpful"],
-                "harmful": meta_item["harmful"],
-                "message_id": row["message_id"],
-                "source": source,
-                "eval": eval_data,
-                "created_at": row["created_at"],
-                "updated_at": row["updated_at"]
-            }
-
-        if not metadata_list:
-            return []
-
-        # 阶段 2: 语义路由 (取 2*k)
-        candidate_ids = await _route_knowledge_by_llm(query_text, metadata_list, k=top_k)
-
-        # 阶段 3: 质量精排
-        print(f"[Step 2: 知识质量精排] 正在根据评分和反馈进行打分...")
-        scored_items = []
-
-        for kid in candidate_ids:
-            if kid in content_map:
-                item = content_map[kid]
-                score = item["score"]
-                helpful = item["helpful"]
-                harmful = item["harmful"]
-
-                # 计算综合分:基础分 + helpful - harmful*2
-                quality_score = score + helpful - (harmful * 2.0)
-
-                # 过滤门槛
-                if score < min_score or quality_score < 0:
-                    print(f"  - 剔除低质量知识: {kid} (Score: {score}, Helpful: {helpful}, Harmful: {harmful})")
-                    continue
-
-                scored_items.append({
-                    "id": kid,
-                    "message_id": item["message_id"],
-                    "types": item["types"],
-                    "task": item["task"],
-                    "tags": item["tags"],
-                    "scopes": item["scopes"],
-                    "owner": item["owner"],
-                    "content": item["content"],
-                    "source": item["source"],
-                    "eval": item["eval"],
-                    "quality_score": quality_score,
-                    "created_at": item["created_at"],
-                    "updated_at": item["updated_at"]
-                })
-
-        # 按照质量分排序
-        final_sorted = sorted(scored_items, key=lambda x: x["quality_score"], reverse=True)
-
-        # 截取最终的 top_k
-        result = final_sorted[:top_k]
-
-        print(f"[Step 2: 知识质量精排] 最终选定知识: {[it['id'] for it in result]}")
-        print(f"[Knowledge System] 检索结束。\n")
-        return result
-
-    finally:
-        if should_close:
-            conn.close()
-
-
 @app.get("/api/knowledge/search")
 async def search_knowledge_api(
     q: str = Query(..., description="查询文本"),
@@ -681,37 +556,64 @@ async def search_knowledge_api(
     types: Optional[str] = None,
     owner: Optional[str] = None
 ):
-    """检索知识(两阶段:语义路由 + 质量精排)"""
-    conn = get_db()
+    """检索知识(向量召回 + LLM 精排)"""
     try:
-        types_filter = types.split(",") if types else None
-
-        results = await _search_knowledge_two_stage(
-            query_text=q,
-            top_k=top_k,
-            min_score=min_score,
-            types_filter=types_filter,
-            owner_filter=owner,
-            conn=conn
+        # 1. 生成查询向量
+        query_embedding = await get_embedding(q)
+
+        # 2. 构建过滤表达式
+        filters = []
+        if types:
+            type_list = [t.strip() for t in types.split(',') if t.strip()]
+            for t in type_list:
+                filters.append(f'JSON_CONTAINS(types, "{t}")')
+        if owner:
+            filters.append(f'owner == "{owner}"')
+
+        # 添加 min_score 过滤
+        filters.append(f'JSON_EXTRACT(eval, "$.score") >= {min_score}')
+
+        filter_expr = ' and '.join(filters) if filters else None
+
+        # 3. 向量召回(3*k 个候选)
+        recall_limit = top_k * 3
+        candidates = milvus_store.search(
+            query_embedding=query_embedding,
+            filters=filter_expr,
+            limit=recall_limit
         )
 
-        return {"results": results, "count": len(results)}
-    finally:
-        conn.close()
+        if not candidates:
+            return {"results": [], "count": 0, "reranked": False}
+
+        # 4. LLM 精排
+        reranked_ids = await _llm_rerank(q, candidates, top_k)
+
+        if reranked_ids:
+            # 按 LLM 排序返回
+            id_to_candidate = {c["id"]: c for c in candidates}
+            results = [id_to_candidate[id] for id in reranked_ids if id in id_to_candidate]
+            return {"results": results, "count": len(results), "reranked": True}
+        else:
+            # Fallback:直接返回向量召回的 top k
+            print(f"[Knowledge Search] LLM 精排失败,fallback 到向量 top-{top_k}")
+            return {"results": candidates[:top_k], "count": len(candidates[:top_k]), "reranked": False}
+
+    except Exception as e:
+        print(f"[Knowledge Search] 错误: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
 
 
 @app.post("/api/knowledge", status_code=201)
-def save_knowledge(knowledge: KnowledgeIn):
+async def save_knowledge(knowledge: KnowledgeIn):
     """保存新知识"""
-    import uuid
-    conn = get_db()
     try:
         # 生成 ID
         timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
         random_suffix = uuid.uuid4().hex[:4]
         knowledge_id = f"knowledge-{timestamp}-{random_suffix}"
 
-        now = datetime.now(timezone.utc).isoformat()
+        now = int(time.time())
 
         # 设置默认值
         owner = knowledge.owner or f"agent:{knowledge.source.get('agent_id', 'unknown')}"
@@ -723,7 +625,7 @@ def save_knowledge(knowledge: KnowledgeIn):
             "urls": knowledge.source.get("urls", []),
             "agent_id": knowledge.source.get("agent_id", "unknown"),
             "submitted_by": knowledge.source.get("submitted_by", ""),
-            "timestamp": now,
+            "timestamp": datetime.now(timezone.utc).isoformat(),
             "message_id": knowledge.message_id
         }
 
@@ -737,31 +639,33 @@ def save_knowledge(knowledge: KnowledgeIn):
             "harmful_history": []
         }
 
-        conn.execute(
-            """INSERT INTO knowledge
-            (id, message_id, types, task, tags, scopes, owner, content,
-             resource_ids, source, eval, created_at, updated_at)
-            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
-            (
-                knowledge_id,
-                knowledge.message_id,
-                json.dumps(knowledge.types),
-                knowledge.task,
-                json.dumps(knowledge.tags),
-                json.dumps(knowledge.scopes),
-                owner,
-                knowledge.content,
-                json.dumps(knowledge.resource_ids),
-                json.dumps(source),
-                json.dumps(eval_data),
-                now,
-                now,
-            ),
-        )
-        conn.commit()
+        # 生成向量
+        text = f"{knowledge.task}\n{knowledge.content}"
+        embedding = await get_embedding(text)
+
+        # 插入 Milvus
+        milvus_store.insert({
+            "id": knowledge_id,
+            "embedding": embedding,
+            "message_id": knowledge.message_id,
+            "task": knowledge.task,
+            "content": knowledge.content,
+            "types": knowledge.types,
+            "tags": knowledge.tags,
+            "scopes": knowledge.scopes,
+            "owner": owner,
+            "resource_ids": knowledge.resource_ids,
+            "source": source,
+            "eval": eval_data,
+            "created_at": now,
+            "updated_at": now,
+        })
+
         return {"status": "ok", "knowledge_id": knowledge_id}
-    finally:
-        conn.close()
+
+    except Exception as e:
+        print(f"[Save Knowledge] 错误: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
 
 
 @app.get("/api/knowledge")
@@ -773,112 +677,78 @@ def list_knowledge(
     tags: Optional[str] = None
 ):
     """列出知识(支持后端筛选)"""
-    conn = get_db()
     try:
-        query = "SELECT * FROM knowledge"
-        params = []
-        conditions = []
+        # 构建过滤表达式
+        filters = []
 
         # types 支持多个,用 AND 连接(交集:必须同时包含所有选中的type)
         if types:
             type_list = [t.strip() for t in types.split(',') if t.strip()]
-            if type_list:
-                for t in type_list:
-                    conditions.append("types LIKE ?")
-                    params.append(f"%{t}%")
+            for t in type_list:
+                filters.append(f'JSON_CONTAINS(types, "{t}")')
 
         if scopes:
-            conditions.append("scopes LIKE ?")
-            params.append(f"%{scopes}%")
+            filters.append(f'JSON_CONTAINS(scopes, "{scopes}")')
 
         if owner:
-            conditions.append("owner LIKE ?")
-            params.append(f"%{owner}%")
+            filters.append(f'owner like "%{owner}%"')
 
         # tags 支持多个,用 AND 连接(交集:必须同时包含所有选中的tag)
         if tags:
             tag_list = [t.strip() for t in tags.split(',') if t.strip()]
-            if tag_list:
-                for t in tag_list:
-                    conditions.append("tags LIKE ?")
-                    params.append(f"%{t}%")
-
-        if conditions:
-            query += " WHERE " + " AND ".join(conditions)
+            for t in tag_list:
+                filters.append(f'JSON_CONTAINS_ANY(tags, ["{t}"])')
 
-        query += " ORDER BY created_at DESC LIMIT ?"
-        params.append(limit)
+        # 如果没有过滤条件,查询所有
+        filter_expr = ' and '.join(filters) if filters else 'id != ""'
 
-        rows = conn.execute(query, params).fetchall()
-
-        results = []
-        for row in rows:
-            results.append({
-                "id": row["id"],
-                "message_id": row["message_id"],
-                "types": json.loads(row["types"]),
-                "task": row["task"],
-                "tags": json.loads(row["tags"]),
-                "scopes": json.loads(row["scopes"]),
-                "owner": row["owner"],
-                "content": row["content"],
-                "source": json.loads(row["source"]),
-                "eval": json.loads(row["eval"]),
-                "created_at": row["created_at"],
-                "updated_at": row["updated_at"]
-            })
+        # 查询 Milvus
+        results = milvus_store.query(filter_expr, limit=limit)
 
         return {"results": results, "count": len(results)}
-    finally:
-        conn.close()
+
+    except Exception as e:
+        print(f"[List Knowledge] 错误: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
 
 
 @app.get("/api/knowledge/meta/tags")
 def get_all_tags():
     """获取所有已有的 tags"""
-    conn = get_db()
     try:
-        rows = conn.execute("SELECT tags FROM knowledge").fetchall()
+        # 查询所有知识
+        results = milvus_store.query('id != ""', limit=10000)
+
         all_tags = set()
-        for row in rows:
-            tags_dict = json.loads(row["tags"])
-            for key in tags_dict.keys():
-                all_tags.add(key)
+        for item in results:
+            tags_dict = item.get("tags", {})
+            if isinstance(tags_dict, dict):
+                for key in tags_dict.keys():
+                    all_tags.add(key)
+
         return {"tags": sorted(list(all_tags))}
-    finally:
-        conn.close()
+
+    except Exception as e:
+        print(f"[Get Tags] 错误: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
 
 
 @app.get("/api/knowledge/{knowledge_id}")
 def get_knowledge(knowledge_id: str):
     """获取单条知识"""
-    conn = get_db()
     try:
-        row = conn.execute(
-            "SELECT * FROM knowledge WHERE id = ?",
-            (knowledge_id,)
-        ).fetchone()
+        result = milvus_store.get_by_id(knowledge_id)
 
-        if not row:
+        if not result:
             raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
 
-        return {
-            "id": row["id"],
-            "message_id": row["message_id"],
-            "types": json.loads(row["types"]),
-            "task": row["task"],
-            "tags": json.loads(row["tags"]),
-            "scopes": json.loads(row["scopes"]),
-            "owner": row["owner"],
-            "content": row["content"],
-            "resource_ids": json.loads(row["resource_ids"]),
-            "source": json.loads(row["source"]),
-            "eval": json.loads(row["eval"]),
-            "created_at": row["created_at"],
-            "updated_at": row["updated_at"]
-        }
-    finally:
-        conn.close()
+        return result
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        print(f"[Get Knowledge] 错误: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
 
 
 async def _evolve_knowledge_with_llm(old_content: str, feedback: str) -> str:
@@ -914,14 +784,13 @@ async def _evolve_knowledge_with_llm(old_content: str, feedback: str) -> str:
 @app.put("/api/knowledge/{knowledge_id}")
 async def update_knowledge(knowledge_id: str, update: KnowledgeUpdateIn):
     """更新知识评估,支持知识进化"""
-    conn = get_db()
     try:
-        row = conn.execute("SELECT * FROM knowledge WHERE id = ?", (knowledge_id,)).fetchone()
-        if not row:
+        # 获取现有知识
+        existing = milvus_store.get_by_id(knowledge_id)
+        if not existing:
             raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
 
-        now = datetime.now(timezone.utc).isoformat()
-        eval_data = json.loads(row["eval"])
+        eval_data = existing.get("eval", {})
 
         # 更新评分
         if update.update_score is not None:
@@ -942,69 +811,91 @@ async def update_knowledge(knowledge_id: str, update: KnowledgeUpdateIn):
             eval_data["harmful_history"].append(update.add_harmful_case)
 
         # 知识进化
-        content = row["content"]
+        content = existing["content"]
+        need_reembed = False
+
         if update.evolve_feedback:
             content = await _evolve_knowledge_with_llm(content, update.evolve_feedback)
             eval_data["helpful"] = eval_data.get("helpful", 0) + 1
+            need_reembed = True
 
-        # 更新数据库
-        conn.execute(
-            "UPDATE knowledge SET content = ?, eval = ?, updated_at = ? WHERE id = ?",
-            (content, json.dumps(eval_data, ensure_ascii=False), now, knowledge_id)
-        )
-        conn.commit()
+        # 准备更新数据
+        updates = {
+            "content": content,
+            "eval": eval_data,
+        }
+
+        # 如果内容变化,重新生成向量
+        if need_reembed:
+            text = f"{existing['task']}\n{content}"
+            embedding = await get_embedding(text)
+            updates["embedding"] = embedding
+
+        # 更新 Milvus
+        milvus_store.update(knowledge_id, updates)
 
         return {"status": "ok", "knowledge_id": knowledge_id}
-    finally:
-        conn.close()
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        print(f"[Update Knowledge] 错误: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
 
 
 @app.patch("/api/knowledge/{knowledge_id}")
-def patch_knowledge(knowledge_id: str, patch: KnowledgePatchIn):
+async def patch_knowledge(knowledge_id: str, patch: KnowledgePatchIn):
     """直接编辑知识字段"""
-    conn = get_db()
     try:
-        row = conn.execute("SELECT * FROM knowledge WHERE id = ?", (knowledge_id,)).fetchone()
-        if not row:
+        # 获取现有知识
+        existing = milvus_store.get_by_id(knowledge_id)
+        if not existing:
             raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
 
-        updates = []
-        params = []
+        updates = {}
+        need_reembed = False
 
         if patch.task is not None:
-            updates.append("task = ?")
-            params.append(patch.task)
+            updates["task"] = patch.task
+            need_reembed = True
+
         if patch.content is not None:
-            updates.append("content = ?")
-            params.append(patch.content)
+            updates["content"] = patch.content
+            need_reembed = True
+
         if patch.types is not None:
-            updates.append("types = ?")
-            params.append(json.dumps(patch.types, ensure_ascii=False))
+            updates["types"] = patch.types
+
         if patch.tags is not None:
-            updates.append("tags = ?")
-            params.append(json.dumps(patch.tags, ensure_ascii=False))
+            updates["tags"] = patch.tags
+
         if patch.scopes is not None:
-            updates.append("scopes = ?")
-            params.append(json.dumps(patch.scopes, ensure_ascii=False))
+            updates["scopes"] = patch.scopes
+
         if patch.owner is not None:
-            updates.append("owner = ?")
-            params.append(patch.owner)
+            updates["owner"] = patch.owner
 
         if not updates:
             return {"status": "ok", "knowledge_id": knowledge_id}
 
-        now = datetime.now(timezone.utc).isoformat()
-        updates.append("updated_at = ?")
-        params.append(now)
-        params.append(knowledge_id)
+        # 如果 task 或 content 变化,重新生成向量
+        if need_reembed:
+            task = updates.get("task", existing["task"])
+            content = updates.get("content", existing["content"])
+            text = f"{task}\n{content}"
+            embedding = await get_embedding(text)
+            updates["embedding"] = embedding
 
-        query = f"UPDATE knowledge SET {', '.join(updates)} WHERE id = ?"
-        conn.execute(query, params)
-        conn.commit()
+        # 更新 Milvus
+        milvus_store.update(knowledge_id, updates)
 
         return {"status": "ok", "knowledge_id": knowledge_id}
-    finally:
-        conn.close()
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        print(f"[Patch Knowledge] 错误: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
 
 
 @app.post("/api/knowledge/batch_update")
@@ -1013,7 +904,6 @@ async def batch_update_knowledge(batch: KnowledgeBatchUpdateIn):
     if not batch.feedback_list:
         return {"status": "ok", "updated": 0}
 
-    conn = get_db()
     try:
         # 先处理无需进化的,收集需要进化的
         evolution_tasks = []   # [(knowledge_id, old_content, feedback, eval_data)]
@@ -1027,67 +917,72 @@ async def batch_update_knowledge(batch: KnowledgeBatchUpdateIn):
             if not knowledge_id:
                 continue
 
-            row = conn.execute("SELECT * FROM knowledge WHERE id = ?", (knowledge_id,)).fetchone()
-            if not row:
+            existing = milvus_store.get_by_id(knowledge_id)
+            if not existing:
                 continue
 
-            eval_data = json.loads(row["eval"])
+            eval_data = existing.get("eval", {})
 
             if is_effective and feedback:
-                evolution_tasks.append((knowledge_id, row["content"], feedback, eval_data))
+                evolution_tasks.append((knowledge_id, existing["content"], feedback, eval_data, existing["task"]))
             else:
                 simple_updates.append((knowledge_id, is_effective, eval_data))
 
         # 执行简单更新
-        now = datetime.now(timezone.utc).isoformat()
         for knowledge_id, is_effective, eval_data in simple_updates:
             if is_effective:
                 eval_data["helpful"] = eval_data.get("helpful", 0) + 1
             else:
                 eval_data["harmful"] = eval_data.get("harmful", 0) + 1
 
-            conn.execute(
-                "UPDATE knowledge SET eval = ?, updated_at = ? WHERE id = ?",
-                (json.dumps(eval_data, ensure_ascii=False), now, knowledge_id)
-            )
+            milvus_store.update(knowledge_id, {"eval": eval_data})
 
         # 并发执行知识进化
         if evolution_tasks:
             print(f"🧬 并发处理 {len(evolution_tasks)} 条知识进化...")
             evolved_results = await asyncio.gather(
-                *[_evolve_knowledge_with_llm(old, fb) for _, old, fb, _ in evolution_tasks]
+                *[_evolve_knowledge_with_llm(old, fb) for _, old, fb, _, _ in evolution_tasks]
             )
-            for (knowledge_id, _, _, eval_data), evolved_content in zip(evolution_tasks, evolved_results):
+
+            for (knowledge_id, _, _, eval_data, task), evolved_content in zip(evolution_tasks, evolved_results):
                 eval_data["helpful"] = eval_data.get("helpful", 0) + 1
-                conn.execute(
-                    "UPDATE knowledge SET content = ?, eval = ?, updated_at = ? WHERE id = ?",
-                    (evolved_content, json.dumps(eval_data, ensure_ascii=False), now, knowledge_id)
-                )
 
-        conn.commit()
+                # 重新生成向量
+                text = f"{task}\n{evolved_content}"
+                embedding = await get_embedding(text)
+
+                milvus_store.update(knowledge_id, {
+                    "content": evolved_content,
+                    "eval": eval_data,
+                    "embedding": embedding
+                })
+
         return {"status": "ok", "updated": len(simple_updates) + len(evolution_tasks)}
-    finally:
-        conn.close()
+
+    except Exception as e:
+        print(f"[Batch Update] 错误: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
 
 
 @app.post("/api/knowledge/slim")
 async def slim_knowledge(model: str = "google/gemini-2.5-flash-lite"):
     """知识库瘦身:合并语义相似知识"""
-    conn = get_db()
     try:
-        rows = conn.execute("SELECT * FROM knowledge").fetchall()
-        if len(rows) < 2:
-            return {"status": "ok", "message": f"知识库仅有 {len(rows)} 条,无需瘦身"}
+        # 获取所有知识
+        all_knowledge = milvus_store.query('id != ""', limit=10000)
+
+        if len(all_knowledge) < 2:
+            return {"status": "ok", "message": f"知识库仅有 {len(all_knowledge)} 条,无需瘦身"}
 
         # 构造发给大模型的内容
         entries_text = ""
-        for row in rows:
-            eval_data = json.loads(row["eval"])
-            types = json.loads(row["types"])
-            entries_text += f"[ID: {row['id']}] [Types: {','.join(types)}] "
+        for item in all_knowledge:
+            eval_data = item.get("eval", {})
+            types = item.get("types", [])
+            entries_text += f"[ID: {item['id']}] [Types: {','.join(types)}] "
             entries_text += f"[Helpful: {eval_data.get('helpful', 0)}, Harmful: {eval_data.get('harmful', 0)}] [Score: {eval_data.get('score', 3)}]\n"
-            entries_text += f"Task: {row['task']}\n"
-            entries_text += f"Content: {row['content'][:200]}...\n\n"
+            entries_text += f"Task: {item['task']}\n"
+            entries_text += f"Content: {item['content'][:200]}...\n\n"
 
         prompt = f"""你是一个 AI Agent 知识库管理员。以下是当前知识库的全部条目,请执行瘦身操作:
 
@@ -1115,7 +1010,7 @@ REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
 
 禁止输出任何开场白或解释。"""
 
-        print(f"\n[知识瘦身] 正在调用 {model} 分析 {len(rows)} 条知识...")
+        print(f"\n[知识瘦身] 正在调用 {model} 分析 {len(all_knowledge)} 条知识...")
         response = await openrouter_llm_call(
             messages=[{"role": "user", "content": prompt}],
             model=model
@@ -1189,10 +1084,20 @@ REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
         if not new_entries:
             raise HTTPException(status_code=500, detail="解析大模型输出失败")
 
-        # 原子化写回
-        now = datetime.now(timezone.utc).isoformat()
-        conn.execute("DELETE FROM knowledge")
-        for e in new_entries:
+        # 生成向量并重建知识库
+        print(f"[知识瘦身] 正在为 {len(new_entries)} 条知识生成向量...")
+
+        # 批量生成向量
+        texts = [f"{e['task']}\n{e['content']}" for e in new_entries]
+        embeddings = await get_embeddings_batch(texts)
+
+        # 清空并重建
+        now = int(time.time())
+        milvus_store.drop_collection()
+        milvus_store._init_collection()
+
+        knowledge_list = []
+        for e, embedding in zip(new_entries, embeddings):
             eval_data = {
                 "score": e["score"],
                 "helpful": e["helpful"],
@@ -1207,37 +1112,39 @@ REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
                 "urls": [],
                 "agent_id": "slim",
                 "submitted_by": "system",
-                "timestamp": now
+                "timestamp": datetime.now(timezone.utc).isoformat()
             }
-            conn.execute(
-                """INSERT INTO knowledge
-                (id, message_id, types, task, tags, scopes, owner, content, source, eval, created_at, updated_at)
-                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
-                (
-                    e["id"],
-                    "",
-                    json.dumps(e["types"]),
-                    e["task"],
-                    json.dumps({}),
-                    json.dumps(["org:cybertogether"]),
-                    "agent:slim",
-                    e["content"],
-                    json.dumps(source, ensure_ascii=False),
-                    json.dumps(eval_data, ensure_ascii=False),
-                    now,
-                    now
-                )
-            )
-        conn.commit()
+            knowledge_list.append({
+                "id": e["id"],
+                "embedding": embedding,
+                "message_id": "",
+                "task": e["task"],
+                "content": e["content"],
+                "types": e["types"],
+                "tags": {},
+                "scopes": ["org:cybertogether"],
+                "owner": "agent:slim",
+                "resource_ids": [],
+                "source": source,
+                "eval": eval_data,
+                "created_at": now,
+                "updated_at": now
+            })
 
-        result_msg = f"瘦身完成:{len(rows)} → {len(new_entries)} 条知识"
+        milvus_store.insert_batch(knowledge_list)
+
+        result_msg = f"瘦身完成:{len(all_knowledge)} → {len(new_entries)} 条知识"
         if report_line:
             result_msg += f"\n{report_line}"
         print(f"[知识瘦身] {result_msg}")
 
-        return {"status": "ok", "before": len(rows), "after": len(new_entries), "report": report_line}
-    finally:
-        conn.close()
+        return {"status": "ok", "before": len(all_knowledge), "after": len(new_entries), "report": report_line}
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        print(f"[Slim Knowledge] 错误: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
 
 
 @app.post("/api/extract")
@@ -1321,81 +1228,81 @@ async def extract_knowledge_from_messages(extract_req: MessageExtractIn):
         if not isinstance(extracted_knowledge, list):
             raise ValueError("LLM output is not a list")
 
+        if not extracted_knowledge:
+            return {"status": "ok", "extracted_count": 0, "knowledge_ids": []}
+
+        # 批量生成向量
+        texts = [f"{item.get('task', '')}\n{item.get('content', '')}" for item in extracted_knowledge]
+        embeddings = await get_embeddings_batch(texts)
+
         # 保存提取的知识
-        conn = get_db()
         knowledge_ids = []
-        now = datetime.now(timezone.utc).isoformat()
+        now = int(time.time())
+        knowledge_list = []
 
-        try:
-            for item in extracted_knowledge:
-                task = item.get("task", "")
-                knowledge_content = item.get("content", "")
-                types = item.get("types", ["strategy"])
-                score = item.get("score", 3)
-
-                if not task or not knowledge_content:
-                    continue
-
-                # 生成 ID
-                import uuid
-                timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
-                random_suffix = uuid.uuid4().hex[:4]
-                knowledge_id = f"knowledge-{timestamp}-{random_suffix}"
-
-                # 准备数据
-                source = {
-                    "name": "message_extraction",
-                    "category": "exp",
-                    "urls": [],
-                    "agent_id": extract_req.agent_id,
-                    "submitted_by": extract_req.submitted_by,
-                    "timestamp": now,
-                    "session_key": extract_req.session_key
-                }
+        for item, embedding in zip(extracted_knowledge, embeddings):
+            task = item.get("task", "")
+            knowledge_content = item.get("content", "")
+            types = item.get("types", ["strategy"])
+            score = item.get("score", 3)
 
-                eval_data = {
-                    "score": score,
-                    "helpful": 1,
-                    "harmful": 0,
-                    "confidence": 0.7,
-                    "helpful_history": [],
-                    "harmful_history": []
-                }
+            if not task or not knowledge_content:
+                continue
+
+            # 生成 ID
+            timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
+            random_suffix = uuid.uuid4().hex[:4]
+            knowledge_id = f"knowledge-{timestamp}-{random_suffix}"
+
+            # 准备数据
+            source = {
+                "name": "message_extraction",
+                "category": "exp",
+                "urls": [],
+                "agent_id": extract_req.agent_id,
+                "submitted_by": extract_req.submitted_by,
+                "timestamp": datetime.now(timezone.utc).isoformat(),
+                "session_key": extract_req.session_key
+            }
 
-                # 插入数据库
-                conn.execute(
-                    """INSERT INTO knowledge
-                    (id, message_id, types, task, tags, scopes, owner, content,
-                     source, eval, created_at, updated_at)
-                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
-                    (
-                        knowledge_id,
-                        "",
-                        json.dumps(types),
-                        task,
-                        json.dumps({}),
-                        json.dumps(["org:cybertogether"]),
-                        extract_req.submitted_by,
-                        knowledge_content,
-                        json.dumps(source, ensure_ascii=False),
-                        json.dumps(eval_data, ensure_ascii=False),
-                        now,
-                        now,
-                    ),
-                )
-                knowledge_ids.append(knowledge_id)
-
-            conn.commit()
-            print(f"[Extract] 成功提取并保存 {len(knowledge_ids)} 条知识")
-
-            return {
-                "status": "ok",
-                "extracted_count": len(knowledge_ids),
-                "knowledge_ids": knowledge_ids
+            eval_data = {
+                "score": score,
+                "helpful": 1,
+                "harmful": 0,
+                "confidence": 0.7,
+                "helpful_history": [],
+                "harmful_history": []
             }
 
-        finally:
-            conn.close()
+            knowledge_list.append({
+                "id": knowledge_id,
+                "embedding": embedding,
+                "message_id": "",
+                "task": task,
+                "content": knowledge_content,
+                "types": types,
+                "tags": {},
+                "scopes": ["org:cybertogether"],
+                "owner": extract_req.submitted_by,
+                "resource_ids": [],
+                "source": source,
+                "eval": eval_data,
+                "created_at": now,
+                "updated_at": now,
+            })
+            knowledge_ids.append(knowledge_id)
+
+        # 批量插入
+        if knowledge_list:
+            milvus_store.insert_batch(knowledge_list)
+
+        print(f"[Extract] 成功提取并保存 {len(knowledge_ids)} 条知识")
+
+        return {
+            "status": "ok",
+            "extracted_count": len(knowledge_ids),
+            "knowledge_ids": knowledge_ids
+        }
 
     except json.JSONDecodeError as e:
         print(f"[Extract] JSON 解析失败: {e}")

+ 213 - 0
knowhub/vector_store.py

@@ -0,0 +1,213 @@
+"""
+Milvus Lite 存储封装
+
+单一存储架构,存储完整知识数据 + 向量。
+"""
+
+from milvus import default_server
+from pymilvus import (
+    connections, Collection, FieldSchema,
+    CollectionSchema, DataType, utility
+)
+from typing import List, Dict, Optional
+import json
+import time
+
+
+class MilvusStore:
+    """Single-store wrapper around an embedded Milvus Lite server.
+
+    Stores complete knowledge records (scalar fields + JSON blobs) together
+    with their embedding vectors in one collection named "knowledge".
+    """
+
+    def __init__(self, data_dir: str = "./milvus_data"):
+        """
+        Initialize the embedded Milvus Lite store.
+
+        Args:
+            data_dir: Directory where Milvus persists its data.
+        """
+        # Start the embedded server.
+        # NOTE(review): default_server is process-global state — creating two
+        # MilvusStore instances with different data_dir values in one process
+        # presumably conflicts; confirm intended usage is a singleton.
+        default_server.set_base_dir(data_dir)
+        default_server.start()
+
+        # Connect to the locally running embedded server.
+        connections.connect(
+            host='127.0.0.1',
+            port=default_server.listen_port
+        )
+
+        self._init_collection()
+
+    def _init_collection(self):
+        """Open the "knowledge" collection, creating schema and index if absent."""
+        collection_name = "knowledge"
+
+        if utility.has_collection(collection_name):
+            self.collection = Collection(collection_name)
+        else:
+            # Define the collection schema. String length limits and the
+            # vector dimension are hard-coded here; changing them later
+            # requires dropping and rebuilding the collection.
+            fields = [
+                FieldSchema(name="id", dtype=DataType.VARCHAR,
+                           max_length=100, is_primary=True),
+                # dim=1536 — assumes the embedding model's output size;
+                # TODO confirm it matches knowhub.embeddings.
+                FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR,
+                           dim=1536),
+                FieldSchema(name="message_id", dtype=DataType.VARCHAR,
+                           max_length=100),
+                FieldSchema(name="task", dtype=DataType.VARCHAR,
+                           max_length=2000),
+                FieldSchema(name="content", dtype=DataType.VARCHAR,
+                           max_length=50000),
+                FieldSchema(name="types", dtype=DataType.JSON),
+                FieldSchema(name="tags", dtype=DataType.JSON),
+                FieldSchema(name="scopes", dtype=DataType.JSON),
+                FieldSchema(name="owner", dtype=DataType.VARCHAR,
+                           max_length=200),
+                FieldSchema(name="resource_ids", dtype=DataType.JSON),
+                FieldSchema(name="source", dtype=DataType.JSON),
+                FieldSchema(name="eval", dtype=DataType.JSON),
+                FieldSchema(name="created_at", dtype=DataType.INT64),
+                FieldSchema(name="updated_at", dtype=DataType.INT64),
+            ]
+
+            schema = CollectionSchema(fields, description="KnowHub Knowledge")
+            self.collection = Collection(collection_name, schema)
+
+            # Build the HNSW index with cosine similarity on the vector field.
+            index_params = {
+                "metric_type": "COSINE",
+                "index_type": "HNSW",
+                "params": {"M": 16, "efConstruction": 200}
+            }
+            self.collection.create_index("embedding", index_params)
+
+        # Load the collection into memory so it is searchable.
+        self.collection.load()
+
+    def insert(self, knowledge: Dict):
+        """
+        Insert a single knowledge record.
+
+        Args:
+            knowledge: Full record dict, including the "embedding" vector.
+        """
+        self.collection.insert([knowledge])
+        # Flush on every call for durability. NOTE(review): per-insert flush
+        # is expensive — prefer insert_batch for bulk writes.
+        self.collection.flush()
+
+    def insert_batch(self, knowledge_list: List[Dict]):
+        """
+        Insert multiple knowledge records in one call.
+
+        Args:
+            knowledge_list: List of record dicts; no-op when empty.
+        """
+        if not knowledge_list:
+            return
+        self.collection.insert(knowledge_list)
+        self.collection.flush()
+
+    def search(self,
+               query_embedding: List[float],
+               filters: Optional[str] = None,
+               limit: int = 10) -> List[Dict]:
+        """
+        Vector search combined with optional scalar filtering.
+
+        Args:
+            query_embedding: Query vector (same dim as the collection, 1536).
+            filters: Milvus boolean filter expression
+                (e.g.: 'owner == "agent"'), or None for no filtering.
+            limit: Maximum number of hits to return.
+
+        Returns:
+            List of knowledge records. NOTE(review): the dict shape comes
+            from pymilvus Hit.entity.to_dict() and may nest fields under an
+            "entity" key depending on pymilvus version — verify with callers.
+        """
+        search_params = {"metric_type": "COSINE", "params": {"ef": 100}}
+
+        results = self.collection.search(
+            data=[query_embedding],
+            anns_field="embedding",
+            param=search_params,
+            limit=limit,
+            expr=filters,
+            output_fields=["id", "message_id", "task", "content", "types",
+                          "tags", "scopes", "owner", "resource_ids",
+                          "source", "eval", "created_at", "updated_at"]
+        )
+
+        if not results or not results[0]:
+            return []
+
+        return [hit.entity.to_dict() for hit in results[0]]
+
+    def query(self, filters: str, limit: int = 100) -> List[Dict]:
+        """
+        Scalar-only query (no vector similarity involved).
+
+        Args:
+            filters: Milvus boolean filter expression (required).
+            limit: Maximum number of records to return.
+
+        Returns:
+            List of matching knowledge records.
+        """
+        results = self.collection.query(
+            expr=filters,
+            output_fields=["id", "message_id", "task", "content", "types",
+                          "tags", "scopes", "owner", "resource_ids",
+                          "source", "eval", "created_at", "updated_at"],
+            limit=limit
+        )
+        return results
+
+    def get_by_id(self, knowledge_id: str) -> Optional[Dict]:
+        """
+        Fetch a single knowledge record by primary key.
+
+        Args:
+            knowledge_id: Primary-key id of the record.
+
+        Returns:
+            The record dict, or None when not found.
+        """
+        # NOTE(review): knowledge_id is interpolated into the filter
+        # expression; ids containing a double quote would break/inject the
+        # expression. Safe for framework-generated ids, but confirm no
+        # user-controlled ids reach here.
+        results = self.collection.query(
+            expr=f'id == "{knowledge_id}"',
+            output_fields=["id", "message_id", "task", "content", "types",
+                          "tags", "scopes", "owner", "resource_ids",
+                          "source", "eval", "created_at", "updated_at"]
+        )
+        return results[0] if results else None
+
+    def update(self, knowledge_id: str, updates: Dict):
+        """
+        Update a knowledge record (implemented as delete-then-insert).
+
+        Args:
+            knowledge_id: Primary-key id of the record to update.
+            updates: Partial dict of fields to overwrite.
+
+        Raises:
+            ValueError: If no record with knowledge_id exists.
+        """
+        # 1. Read the existing record.
+        existing = self.get_by_id(knowledge_id)
+        if not existing:
+            raise ValueError(f"Knowledge not found: {knowledge_id}")
+
+        # 2. Merge in the updates and bump the timestamp.
+        existing.update(updates)
+        existing["updated_at"] = int(time.time())
+
+        # 3. Delete the old record.
+        # NOTE(review): not atomic — if the insert below fails the record is
+        # lost. Consider Milvus upsert if the installed version supports it.
+        self.delete(knowledge_id)
+
+        # 4. Insert the merged record.
+        self.insert(existing)
+
+    def delete(self, knowledge_id: str):
+        """
+        Delete a knowledge record by primary key.
+
+        Args:
+            knowledge_id: Primary-key id of the record to delete.
+        """
+        # Same quoting caveat as get_by_id: id is interpolated into the expr.
+        self.collection.delete(f'id == "{knowledge_id}"')
+        self.collection.flush()
+
+    def count(self) -> int:
+        """Return the total number of entities in the collection.
+
+        NOTE(review): num_entities may still include rows deleted but not
+        yet compacted — treat as approximate; confirm against pymilvus docs.
+        """
+        return self.collection.num_entities
+
+    def drop_collection(self):
+        """Drop the entire "knowledge" collection (destructive, irreversible)."""
+        utility.drop_collection("knowledge")

+ 120 - 0
test_vector_search.py

@@ -0,0 +1,120 @@
+"""
+测试 Milvus Lite 向量检索实现
+
+运行前确保:
+1. pip install -r knowhub/requirements.txt
+2. 设置环境变量 OPENROUTER_API_KEY
+"""
+
+import asyncio
+import sys
+from pathlib import Path
+
+# 添加项目路径
+sys.path.insert(0, str(Path(__file__).parent))
+
+from knowhub.vector_store import MilvusStore
+from knowhub.embeddings import get_embedding, get_embeddings_batch
+
+
+async def test_basic():
+    """Smoke-test the MilvusStore + embeddings pipeline end to end.
+
+    Exercises: store init, embedding generation (single + batch), insert,
+    query-by-id, vector search, update, and delete. Prints progress; raises
+    on failure. Requires OPENROUTER_API_KEY for the embedding calls.
+    """
+    print("=" * 60)
+    print("测试 1: 初始化 Milvus Lite")
+    print("=" * 60)
+
+    # Uses a dedicated data dir so test data does not pollute real storage.
+    store = MilvusStore(data_dir="./test_milvus_data")
+    print(f"✓ Milvus Lite 初始化成功")
+    print(f"  当前知识数量: {store.count()}")
+
+    print("\n" + "=" * 60)
+    print("测试 2: 生成 Embedding")
+    print("=" * 60)
+
+    text = "如何使用 Python 读取 PDF 文件"
+    embedding = await get_embedding(text)
+    print(f"✓ 单条 embedding 生成成功")
+    print(f"  文本: {text}")
+    print(f"  向量维度: {len(embedding)}")
+
+    texts = ["测试文本1", "测试文本2", "测试文本3"]
+    embeddings = await get_embeddings_batch(texts)
+    print(f"✓ 批量 embedding 生成成功")
+    print(f"  文本数量: {len(texts)}")
+    print(f"  向量数量: {len(embeddings)}")
+
+    print("\n" + "=" * 60)
+    print("测试 3: 插入知识")
+    print("=" * 60)
+
+    # Local import: time is only needed for the timestamp fields below.
+    import time
+    # Minimal record covering every field in the collection schema.
+    knowledge = {
+        "id": "test-001",
+        "embedding": embedding,
+        "message_id": "",
+        "task": "读取 PDF 文件",
+        "content": "使用 pymupdf 库可以高效读取 PDF 文件内容",
+        "types": ["tool"],
+        "tags": {"category": "file_processing"},
+        "scopes": ["org:test"],
+        "owner": "test_user",
+        "resource_ids": [],
+        "source": {"name": "test"},
+        "eval": {"score": 4, "helpful": 0, "harmful": 0},
+        "created_at": int(time.time()),
+        "updated_at": int(time.time()),
+    }
+
+    store.insert(knowledge)
+    print(f"✓ 知识插入成功")
+    print(f"  ID: {knowledge['id']}")
+    print(f"  当前知识数量: {store.count()}")
+
+    print("\n" + "=" * 60)
+    print("测试 4: 查询知识")
+    print("=" * 60)
+
+    result = store.get_by_id("test-001")
+    print(f"✓ 按 ID 查询成功")
+    print(f"  Task: {result['task']}")
+    print(f"  Content: {result['content']}")
+
+    print("\n" + "=" * 60)
+    print("测试 5: 向量检索")
+    print("=" * 60)
+
+    # Search with a semantically related query; the inserted record should rank.
+    query_text = "怎么处理 PDF"
+    query_embedding = await get_embedding(query_text)
+    results = store.search(query_embedding, limit=5)
+    print(f"✓ 向量检索成功")
+    print(f"  查询: {query_text}")
+    print(f"  结果数量: {len(results)}")
+    if results:
+        print(f"  Top 1: {results[0]['task']}")
+
+    print("\n" + "=" * 60)
+    print("测试 6: 更新知识")
+    print("=" * 60)
+
+    store.update("test-001", {"content": "使用 pymupdf 库(推荐)或 PyPDF2 库读取 PDF"})
+    updated = store.get_by_id("test-001")
+    print(f"✓ 知识更新成功")
+    print(f"  新内容: {updated['content']}")
+
+    print("\n" + "=" * 60)
+    print("测试 7: 删除知识")
+    print("=" * 60)
+
+    store.delete("test-001")
+    # Expect None after deletion (get_by_id returns None for missing ids).
+    deleted = store.get_by_id("test-001")
+    print(f"✓ 知识删除成功")
+    print(f"  删除后查询结果: {deleted}")
+    print(f"  当前知识数量: {store.count()}")
+
+    print("\n" + "=" * 60)
+    print("所有测试通过!")
+    print("=" * 60)
+
+
+if __name__ == "__main__":
+    asyncio.run(test_basic())