Browse Source

Merge branch 'main' of https://git.yishihui.com/howard/Agent

guantao 9 hours ago
parent
commit
9d9906e853
7 changed files with 365 additions and 253 deletions
  1. 4 1
      .gitignore
  2. 66 28
      config/feishu_contacts.json
  3. 107 39
      knowhub/server.py
  4. 19 6
      knowhub/vector_store.py
  5. 169 0
      migrate_knowledge.py
  6. 0 59
      test_embeddings.py
  7. 0 120
      test_vector_search.py

+ 4 - 1
.gitignore

@@ -69,4 +69,7 @@ frontend/react-template/yarn.lock
 # data
 # data
 knowhub/knowhub.db
 knowhub/knowhub.db
 knowhub/knowhub.db-shm
 knowhub/knowhub.db-shm
-knowhub/knowhub.db-wal
+knowhub/knowhub.db-wal
+
+# Milvus data
+knowhub/milvus_data/

+ 66 - 28
config/feishu_contacts.json

@@ -1,30 +1,68 @@
 [
 [
-  {
-    "name": "谭景玉",
-    "description": "",
-    "open_id": "ou_11fdbd559cc6513ab53ff06d6c63413d",
-    "chat_id": "oc_56e85f0e2c97405d176729b62d8f56e5"
-  },
-  {
-    "name": "王华东",
-    "description": "",
-    "open_id": "ou_82340312cf9d215f49a41b67fa9c02c2"
-  },
-  {
-    "name": "孙若天",
-    "description": "",
-    "open_id": "ou_ede69f28c2617bf80a7574f059879c8d",
-    "chat_id": "oc_98019f9a0419b46a215ca604b04c5cc6"
-  },
-  {
-    "name": "刘斌",
-    "description": "",
-    "open_id": "ou_50c2307c3531e6293b3d5533d14592e9"
-  },
-  {
-    "name": "关涛",
-    "description": "",
-    "open_id": "ou_90b80ed994fe41b7f038a63cb9182f72",
-    "chat_id": "oc_ac9633d2c61f43b5049c425305482491"
-  }
+    {
+        "name": "谭景玉",
+        "description": "",
+        "open_id": "ou_11fdbd559cc6513ab53ff06d6c63413d",
+        "chat_id": "oc_56e85f0e2c97405d176729b62d8f56e5"
+    },
+    {
+        "name": "王华东",
+        "description": "",
+        "open_id": "ou_82340312cf9d215f49a41b67fa9c02c2"
+    },
+    {
+        "name": "孙若天",
+        "description": "",
+        "open_id": "ou_ede69f28c2617bf80a7574f059879c8d"
+    },
+    {
+        "name": "刘斌",
+        "description": "",
+        "open_id": "ou_50c2307c3531e6293b3d5533d14592e9"
+    },
+    {
+        "name": "关涛",
+        "description": "",
+        "open_id": "ou_90b80ed994fe41b7f038a63cb9182f72"
+    },
+    {
+        "name": "刘龙",
+        "description": "",
+        "open_id": "ou_2d25b935d720c4404e4ea88b70bcdce2"
+    },
+    {
+        "name": "马晗",
+        "description": "",
+        "open_id": "ou_405c6b8581ee36b153ff8159cfc9f29e"
+    },
+    {
+        "name": "王雅萱",
+        "description": "",
+        "open_id": "ou_22b070f27496faae0c4d841b82eda5c1"
+    },
+    {
+        "name": "苏朋",
+        "description": "",
+        "open_id": "ou_1bca759ca8efc4870f262c97c1a05e52"
+    },
+    {
+        "name": "杨孝辉",
+        "description": "",
+        "open_id": "ou_68709278795778df53519a577e3da486"
+    },
+    {
+        "name": "刘志恒",
+        "description": "",
+        "open_id": "ou_1922d32126d44e1cdbe658e6c184f502"
+    },
+    {
+        "name": "聂琦",
+        "description": "",
+        "open_id": "ou_3707d21dd00fa8753b9827214b834142"
+    },
+    {
+        "name": "蒋德敏",
+        "description": "",
+        "open_id": "ou_f6a659e740ebd110cfe27dc38bbddb37"
+    }
 ]
 ]

+ 107 - 39
knowhub/server.py

@@ -136,6 +136,50 @@ def decrypt_content(resource_id: str, encrypted_text: str, provided_key: Optiona
         return "[ENCRYPTED]"
         return "[ENCRYPTED]"
 
 
 
 
+def serialize_milvus_result(data):
+    """将 Milvus 返回的数据转换为可序列化的字典"""
+    # 基本类型直接返回
+    if data is None or isinstance(data, (str, int, float, bool)):
+        return data
+
+    # 字典类型递归处理
+    if isinstance(data, dict):
+        return {k: serialize_milvus_result(v) for k, v in data.items()}
+
+    # 列表/元组类型递归处理
+    if isinstance(data, (list, tuple)):
+        return [serialize_milvus_result(item) for item in data]
+
+    # 尝试转换为字典(对于有 to_dict 方法的对象)
+    if hasattr(data, 'to_dict') and callable(getattr(data, 'to_dict')):
+        try:
+            return serialize_milvus_result(data.to_dict())
+        except:
+            pass
+
+    # 尝试转换为列表(对于可迭代对象,如 RepeatedScalarContainer)
+    if hasattr(data, '__iter__') and not isinstance(data, (str, bytes, dict)):
+        try:
+            # 强制转换为列表并递归处理
+            result = []
+            for item in data:
+                result.append(serialize_milvus_result(item))
+            return result
+        except:
+            pass
+
+    # 尝试获取对象的属性字典
+    if hasattr(data, '__dict__'):
+        try:
+            return serialize_milvus_result(vars(data))
+        except:
+            pass
+
+    # 最后的 fallback:对于无法处理的类型,返回 None 而不是字符串表示
+    # 这样可以避免产生无法序列化的字符串
+    return None
+
+
 def init_db():
 def init_db():
     """初始化 SQLite(仅用于 resources)"""
     """初始化 SQLite(仅用于 resources)"""
     conn = get_db()
     conn = get_db()
@@ -566,12 +610,12 @@ async def search_knowledge_api(
         if types:
         if types:
             type_list = [t.strip() for t in types.split(',') if t.strip()]
             type_list = [t.strip() for t in types.split(',') if t.strip()]
             for t in type_list:
             for t in type_list:
-                filters.append(f'JSON_CONTAINS(types, "{t}")')
+                filters.append(f'array_contains(types, "{t}")')
         if owner:
         if owner:
             filters.append(f'owner == "{owner}"')
             filters.append(f'owner == "{owner}"')
 
 
         # 添加 min_score 过滤
         # 添加 min_score 过滤
-        filters.append(f'JSON_EXTRACT(eval, "$.score") >= {min_score}')
+        filters.append(f'eval["score"] >= {min_score}')
 
 
         filter_expr = ' and '.join(filters) if filters else None
         filter_expr = ' and '.join(filters) if filters else None
 
 
@@ -586,18 +630,21 @@ async def search_knowledge_api(
         if not candidates:
         if not candidates:
             return {"results": [], "count": 0, "reranked": False}
             return {"results": [], "count": 0, "reranked": False}
 
 
+        # 转换为可序列化的格式
+        serialized_candidates = [serialize_milvus_result(c) for c in candidates]
+
         # 4. LLM 精排
         # 4. LLM 精排
-        reranked_ids = await _llm_rerank(q, candidates, top_k)
+        reranked_ids = await _llm_rerank(q, serialized_candidates, top_k)
 
 
         if reranked_ids:
         if reranked_ids:
             # 按 LLM 排序返回
             # 按 LLM 排序返回
-            id_to_candidate = {c["id"]: c for c in candidates}
+            id_to_candidate = {c["id"]: c for c in serialized_candidates}
             results = [id_to_candidate[id] for id in reranked_ids if id in id_to_candidate]
             results = [id_to_candidate[id] for id in reranked_ids if id in id_to_candidate]
             return {"results": results, "count": len(results), "reranked": True}
             return {"results": results, "count": len(results), "reranked": True}
         else:
         else:
             # Fallback:直接返回向量召回的 top k
             # Fallback:直接返回向量召回的 top k
             print(f"[Knowledge Search] LLM 精排失败,fallback 到向量 top-{top_k}")
             print(f"[Knowledge Search] LLM 精排失败,fallback 到向量 top-{top_k}")
-            return {"results": candidates[:top_k], "count": len(candidates[:top_k]), "reranked": False}
+            return {"results": serialized_candidates[:top_k], "count": len(serialized_candidates[:top_k]), "reranked": False}
 
 
     except Exception as e:
     except Exception as e:
         print(f"[Knowledge Search] 错误: {e}")
         print(f"[Knowledge Search] 错误: {e}")
@@ -639,12 +686,14 @@ async def save_knowledge(knowledge: KnowledgeIn):
             "harmful_history": []
             "harmful_history": []
         }
         }
 
 
-        # 生成向量
-        text = f"{knowledge.task}\n{knowledge.content}"
-        embedding = await get_embedding(text)
+        # 生成向量(只基于 task,因为搜索时用户描述的是任务场景)
+        embedding = await get_embedding(knowledge.task)
 
 
-        # 插入 Milvus
-        milvus_store.insert({
+        # 提取 tag keys(用于高效筛选)
+        tag_keys = list(knowledge.tags.keys()) if isinstance(knowledge.tags, dict) else []
+
+        # 准备插入数据
+        insert_data = {
             "id": knowledge_id,
             "id": knowledge_id,
             "embedding": embedding,
             "embedding": embedding,
             "message_id": knowledge.message_id,
             "message_id": knowledge.message_id,
@@ -652,6 +701,7 @@ async def save_knowledge(knowledge: KnowledgeIn):
             "content": knowledge.content,
             "content": knowledge.content,
             "types": knowledge.types,
             "types": knowledge.types,
             "tags": knowledge.tags,
             "tags": knowledge.tags,
+            "tag_keys": tag_keys,
             "scopes": knowledge.scopes,
             "scopes": knowledge.scopes,
             "owner": owner,
             "owner": owner,
             "resource_ids": knowledge.resource_ids,
             "resource_ids": knowledge.resource_ids,
@@ -659,7 +709,12 @@ async def save_knowledge(knowledge: KnowledgeIn):
             "eval": eval_data,
             "eval": eval_data,
             "created_at": now,
             "created_at": now,
             "updated_at": now,
             "updated_at": now,
-        })
+        }
+
+        print(f"[Save Knowledge] 插入数据: {json.dumps({k: v for k, v in insert_data.items() if k != 'embedding'}, ensure_ascii=False)}")
+
+        # 插入 Milvus
+        milvus_store.insert(insert_data)
 
 
         return {"status": "ok", "knowledge_id": knowledge_id}
         return {"status": "ok", "knowledge_id": knowledge_id}
 
 
@@ -685,19 +740,19 @@ def list_knowledge(
         if types:
         if types:
             type_list = [t.strip() for t in types.split(',') if t.strip()]
             type_list = [t.strip() for t in types.split(',') if t.strip()]
             for t in type_list:
             for t in type_list:
-                filters.append(f'JSON_CONTAINS(types, "{t}")')
+                filters.append(f'array_contains(types, "{t}")')
 
 
         if scopes:
         if scopes:
-            filters.append(f'JSON_CONTAINS(scopes, "{scopes}")')
+            filters.append(f'array_contains(scopes, "{scopes}")')
 
 
         if owner:
         if owner:
             filters.append(f'owner like "%{owner}%"')
             filters.append(f'owner like "%{owner}%"')
 
 
-        # tags 支持多个,用 AND 连接(交集:必须同时包含所有选中的tag
+        # tags 支持多个,用 AND 连接(使用 tag_keys 数组进行高效筛选
         if tags:
         if tags:
             tag_list = [t.strip() for t in tags.split(',') if t.strip()]
             tag_list = [t.strip() for t in tags.split(',') if t.strip()]
             for t in tag_list:
             for t in tag_list:
-                filters.append(f'JSON_CONTAINS_ANY(tags, ["{t}"])')
+                filters.append(f'array_contains(tag_keys, "{t}")')
 
 
         # 如果没有过滤条件,查询所有
         # 如果没有过滤条件,查询所有
         filter_expr = ' and '.join(filters) if filters else 'id != ""'
         filter_expr = ' and '.join(filters) if filters else 'id != ""'
@@ -705,7 +760,10 @@ def list_knowledge(
         # 查询 Milvus
         # 查询 Milvus
         results = milvus_store.query(filter_expr, limit=limit)
         results = milvus_store.query(filter_expr, limit=limit)
 
 
-        return {"results": results, "count": len(results)}
+        # 转换为可序列化的格式
+        serialized_results = [serialize_milvus_result(r) for r in results]
+
+        return {"results": serialized_results, "count": len(serialized_results)}
 
 
     except Exception as e:
     except Exception as e:
         print(f"[List Knowledge] 错误: {e}")
         print(f"[List Knowledge] 错误: {e}")
@@ -721,7 +779,9 @@ def get_all_tags():
 
 
         all_tags = set()
         all_tags = set()
         for item in results:
         for item in results:
-            tags_dict = item.get("tags", {})
+            # 转换为标准字典
+            serialized_item = serialize_milvus_result(item)
+            tags_dict = serialized_item.get("tags", {})
             if isinstance(tags_dict, dict):
             if isinstance(tags_dict, dict):
                 for key in tags_dict.keys():
                 for key in tags_dict.keys():
                     all_tags.add(key)
                     all_tags.add(key)
@@ -742,7 +802,7 @@ def get_knowledge(knowledge_id: str):
         if not result:
         if not result:
             raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
             raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
 
 
-        return result
+        return serialize_milvus_result(result)
 
 
     except HTTPException:
     except HTTPException:
         raise
         raise
@@ -827,8 +887,7 @@ async def update_knowledge(knowledge_id: str, update: KnowledgeUpdateIn):
 
 
         # 如果内容变化,重新生成向量
         # 如果内容变化,重新生成向量
         if need_reembed:
         if need_reembed:
-            text = f"{existing['task']}\n{content}"
-            embedding = await get_embedding(text)
+            embedding = await get_embedding(existing['task'])
             updates["embedding"] = embedding
             updates["embedding"] = embedding
 
 
         # 更新 Milvus
         # 更新 Milvus
@@ -861,13 +920,15 @@ async def patch_knowledge(knowledge_id: str, patch: KnowledgePatchIn):
 
 
         if patch.content is not None:
         if patch.content is not None:
             updates["content"] = patch.content
             updates["content"] = patch.content
-            need_reembed = True
+            # content 变化不需要重新生成 embedding(只基于 task)
 
 
         if patch.types is not None:
         if patch.types is not None:
             updates["types"] = patch.types
             updates["types"] = patch.types
 
 
         if patch.tags is not None:
         if patch.tags is not None:
             updates["tags"] = patch.tags
             updates["tags"] = patch.tags
+            # 同时更新 tag_keys
+            updates["tag_keys"] = list(patch.tags.keys()) if isinstance(patch.tags, dict) else []
 
 
         if patch.scopes is not None:
         if patch.scopes is not None:
             updates["scopes"] = patch.scopes
             updates["scopes"] = patch.scopes
@@ -878,12 +939,10 @@ async def patch_knowledge(knowledge_id: str, patch: KnowledgePatchIn):
         if not updates:
         if not updates:
             return {"status": "ok", "knowledge_id": knowledge_id}
             return {"status": "ok", "knowledge_id": knowledge_id}
 
 
-        # 如果 task 或 content 变化,重新生成向量
+        # 如果 task 变化,重新生成向量
         if need_reembed:
         if need_reembed:
             task = updates.get("task", existing["task"])
             task = updates.get("task", existing["task"])
-            content = updates.get("content", existing["content"])
-            text = f"{task}\n{content}"
-            embedding = await get_embedding(text)
+            embedding = await get_embedding(task)
             updates["embedding"] = embedding
             updates["embedding"] = embedding
 
 
         # 更新 Milvus
         # 更新 Milvus
@@ -947,9 +1006,8 @@ async def batch_update_knowledge(batch: KnowledgeBatchUpdateIn):
             for (knowledge_id, _, _, eval_data, task), evolved_content in zip(evolution_tasks, evolved_results):
             for (knowledge_id, _, _, eval_data, task), evolved_content in zip(evolution_tasks, evolved_results):
                 eval_data["helpful"] = eval_data.get("helpful", 0) + 1
                 eval_data["helpful"] = eval_data.get("helpful", 0) + 1
 
 
-                # 重新生成向量
-                text = f"{task}\n{evolved_content}"
-                embedding = await get_embedding(text)
+                # 重新生成向量(只基于 task)
+                embedding = await get_embedding(task)
 
 
                 milvus_store.update(knowledge_id, {
                 milvus_store.update(knowledge_id, {
                     "content": evolved_content,
                     "content": evolved_content,
@@ -970,6 +1028,8 @@ async def slim_knowledge(model: str = "google/gemini-2.5-flash-lite"):
     try:
     try:
         # 获取所有知识
         # 获取所有知识
         all_knowledge = milvus_store.query('id != ""', limit=10000)
         all_knowledge = milvus_store.query('id != ""', limit=10000)
+        # 转换为可序列化的格式
+        all_knowledge = [serialize_milvus_result(item) for item in all_knowledge]
 
 
         if len(all_knowledge) < 2:
         if len(all_knowledge) < 2:
             return {"status": "ok", "message": f"知识库仅有 {len(all_knowledge)} 条,无需瘦身"}
             return {"status": "ok", "message": f"知识库仅有 {len(all_knowledge)} 条,无需瘦身"}
@@ -1087,8 +1147,8 @@ REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
         # 生成向量并重建知识库
         # 生成向量并重建知识库
         print(f"[知识瘦身] 正在为 {len(new_entries)} 条知识生成向量...")
         print(f"[知识瘦身] 正在为 {len(new_entries)} 条知识生成向量...")
 
 
-        # 批量生成向量
-        texts = [f"{e['task']}\n{e['content']}" for e in new_entries]
+        # 批量生成向量(只基于 task)
+        texts = [e['task'] for e in new_entries]
         embeddings = await get_embeddings_batch(texts)
         embeddings = await get_embeddings_batch(texts)
 
 
         # 清空并重建
         # 清空并重建
@@ -1122,6 +1182,7 @@ REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
                 "content": e["content"],
                 "content": e["content"],
                 "types": e["types"],
                 "types": e["types"],
                 "tags": {},
                 "tags": {},
+                "tag_keys": [],
                 "scopes": ["org:cybertogether"],
                 "scopes": ["org:cybertogether"],
                 "owner": "agent:slim",
                 "owner": "agent:slim",
                 "resource_ids": [],
                 "resource_ids": [],
@@ -1231,8 +1292,8 @@ async def extract_knowledge_from_messages(extract_req: MessageExtractIn):
         if not extracted_knowledge:
         if not extracted_knowledge:
             return {"status": "ok", "extracted_count": 0, "knowledge_ids": []}
             return {"status": "ok", "extracted_count": 0, "knowledge_ids": []}
 
 
-        # 批量生成向量
-        texts = [f"{item.get('task', '')}\n{item.get('content', '')}" for item in extracted_knowledge]
+        # 批量生成向量(只基于 task)
+        texts = [item.get('task', '') for item in extracted_knowledge]
         embeddings = await get_embeddings_batch(texts)
         embeddings = await get_embeddings_batch(texts)
 
 
         # 保存提取的知识
         # 保存提取的知识
@@ -1282,6 +1343,7 @@ async def extract_knowledge_from_messages(extract_req: MessageExtractIn):
                 "content": knowledge_content,
                 "content": knowledge_content,
                 "types": types,
                 "types": types,
                 "tags": {},
                 "tags": {},
+                "tag_keys": [],
                 "scopes": ["org:cybertogether"],
                 "scopes": ["org:cybertogether"],
                 "owner": extract_req.submitted_by,
                 "owner": extract_req.submitted_by,
                 "resource_ids": [],
                 "resource_ids": [],
@@ -1543,14 +1605,20 @@ def frontend():
 
 
             document.getElementById('modalTitle').textContent = '编辑知识';
             document.getElementById('modalTitle').textContent = '编辑知识';
             document.getElementById('editId').value = k.id;
             document.getElementById('editId').value = k.id;
-            document.getElementById('taskInput').value = k.task;
-            document.getElementById('contentInput').value = k.content;
-            document.getElementById('tagsInput').value = JSON.stringify(k.tags);
-            document.getElementById('scopesInput').value = k.scopes.join(', ');
-            document.getElementById('ownerInput').value = k.owner;
+            document.getElementById('taskInput').value = k.task || '';
+            document.getElementById('contentInput').value = k.content || '';
+            document.getElementById('tagsInput').value = JSON.stringify(k.tags || {});
+
+            // 防御性检查:确保 scopes 是数组
+            const scopes = Array.isArray(k.scopes) ? k.scopes : [];
+            document.getElementById('scopesInput').value = scopes.join(', ');
+
+            document.getElementById('ownerInput').value = k.owner || '';
 
 
+            // 防御性检查:确保 types 是数组
+            const types = Array.isArray(k.types) ? k.types : [];
             document.querySelectorAll('.type-checkbox').forEach(el => {
             document.querySelectorAll('.type-checkbox').forEach(el => {
-                el.checked = k.types.includes(el.value);
+                el.checked = types.includes(el.value);
             });
             });
 
 
             document.getElementById('modal').classList.remove('hidden');
             document.getElementById('modal').classList.remove('hidden');
@@ -1625,4 +1693,4 @@ def frontend():
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
     import uvicorn
     import uvicorn
-    uvicorn.run(app, host="0.0.0.0", port=9998)
+    uvicorn.run(app, host="0.0.0.0", port=9999)

+ 19 - 6
knowhub/vector_store.py

@@ -41,6 +41,14 @@ class MilvusStore:
             try:
             try:
                 default_server.start()
                 default_server.start()
                 print(f"[Milvus] Milvus Lite 启动成功 (端口 {default_server.listen_port})")
                 print(f"[Milvus] Milvus Lite 启动成功 (端口 {default_server.listen_port})")
+                # 启动后建立连接
+                connections.connect(
+                    alias="default",
+                    host='127.0.0.1',
+                    port=default_server.listen_port,
+                    timeout=5
+                )
+                print(f"[Milvus] 已连接到新启动的实例")
             except Exception as e:
             except Exception as e:
                 print(f"[Milvus] 启动失败: {e}")
                 print(f"[Milvus] 启动失败: {e}")
                 # 尝试连接到可能已经在运行的实例
                 # 尝试连接到可能已经在运行的实例
@@ -76,12 +84,17 @@ class MilvusStore:
                            max_length=2000),
                            max_length=2000),
                 FieldSchema(name="content", dtype=DataType.VARCHAR,
                 FieldSchema(name="content", dtype=DataType.VARCHAR,
                            max_length=50000),
                            max_length=50000),
-                FieldSchema(name="types", dtype=DataType.JSON),
+                FieldSchema(name="types", dtype=DataType.ARRAY,
+                           element_type=DataType.VARCHAR, max_capacity=20, max_length=50),
                 FieldSchema(name="tags", dtype=DataType.JSON),
                 FieldSchema(name="tags", dtype=DataType.JSON),
-                FieldSchema(name="scopes", dtype=DataType.JSON),
+                FieldSchema(name="tag_keys", dtype=DataType.ARRAY,
+                           element_type=DataType.VARCHAR, max_capacity=50, max_length=100),
+                FieldSchema(name="scopes", dtype=DataType.ARRAY,
+                           element_type=DataType.VARCHAR, max_capacity=20, max_length=100),
                 FieldSchema(name="owner", dtype=DataType.VARCHAR,
                 FieldSchema(name="owner", dtype=DataType.VARCHAR,
                            max_length=200),
                            max_length=200),
-                FieldSchema(name="resource_ids", dtype=DataType.JSON),
+                FieldSchema(name="resource_ids", dtype=DataType.ARRAY,
+                           element_type=DataType.VARCHAR, max_capacity=50, max_length=200),
                 FieldSchema(name="source", dtype=DataType.JSON),
                 FieldSchema(name="source", dtype=DataType.JSON),
                 FieldSchema(name="eval", dtype=DataType.JSON),
                 FieldSchema(name="eval", dtype=DataType.JSON),
                 FieldSchema(name="created_at", dtype=DataType.INT64),
                 FieldSchema(name="created_at", dtype=DataType.INT64),
@@ -147,7 +160,7 @@ class MilvusStore:
             limit=limit,
             limit=limit,
             expr=filters,
             expr=filters,
             output_fields=["id", "message_id", "task", "content", "types",
             output_fields=["id", "message_id", "task", "content", "types",
-                          "tags", "scopes", "owner", "resource_ids",
+                          "tags", "tag_keys", "scopes", "owner", "resource_ids",
                           "source", "eval", "created_at", "updated_at"]
                           "source", "eval", "created_at", "updated_at"]
         )
         )
 
 
@@ -170,7 +183,7 @@ class MilvusStore:
         results = self.collection.query(
         results = self.collection.query(
             expr=filters,
             expr=filters,
             output_fields=["id", "message_id", "task", "content", "types",
             output_fields=["id", "message_id", "task", "content", "types",
-                          "tags", "scopes", "owner", "resource_ids",
+                          "tags", "tag_keys", "scopes", "owner", "resource_ids",
                           "source", "eval", "created_at", "updated_at"],
                           "source", "eval", "created_at", "updated_at"],
             limit=limit
             limit=limit
         )
         )
@@ -189,7 +202,7 @@ class MilvusStore:
         results = self.collection.query(
         results = self.collection.query(
             expr=f'id == "{knowledge_id}"',
             expr=f'id == "{knowledge_id}"',
             output_fields=["id", "message_id", "task", "content", "types",
             output_fields=["id", "message_id", "task", "content", "types",
-                          "tags", "scopes", "owner", "resource_ids",
+                          "tags", "tag_keys", "scopes", "owner", "resource_ids",
                           "source", "eval", "created_at", "updated_at"]
                           "source", "eval", "created_at", "updated_at"]
         )
         )
         return results[0] if results else None
         return results[0] if results else None

+ 169 - 0
migrate_knowledge.py

@@ -0,0 +1,169 @@
+#!/usr/bin/env python
+"""
+知识库迁移脚本: SQLite -> Milvus
+从旧的 SQLite 数据库迁移知识数据到新的 Milvus 向量数据库
+"""
+
+import sys
+import json
+import sqlite3
+import asyncio
+from pathlib import Path
+from datetime import datetime
+
+# 添加项目路径
+sys.path.insert(0, str(Path(__file__).parent))
+
+from knowhub.vector_store import MilvusStore
+from knowhub.embeddings import get_embeddings_batch
+
+
+async def migrate_knowledge():
+    """迁移知识数据"""
+
+    # 源数据库路径
+    source_db = Path("/root/knowhub_backup_20260309_204451.db")
+    if not source_db.exists():
+        print(f"❌ 源数据库不存在: {source_db}")
+        return
+
+    # 目标 Milvus 存储
+    milvus_data_dir = Path(__file__).parent / "knowhub/milvus_data"
+    target_store = MilvusStore(str(milvus_data_dir))
+
+    print(f"📂 源数据库: {source_db}")
+    print(f"📂 目标 Milvus: {milvus_data_dir}")
+    print(f"📊 当前 Milvus 中的知识数量: {target_store.count()}")
+
+    # 读取源数据
+    print("\n📖 正在读取源数据...")
+    conn = sqlite3.connect(str(source_db))
+    conn.row_factory = sqlite3.Row
+    cursor = conn.cursor()
+
+    cursor.execute("SELECT * FROM knowledge ORDER BY created_at")
+    rows = cursor.fetchall()
+    conn.close()
+
+    print(f"✅ 读取到 {len(rows)} 条知识数据")
+
+    if len(rows) == 0:
+        print("⚠️  没有数据需要迁移")
+        return
+
+    # 显示迁移信息
+    print(f"\n⚠️  即将迁移 {len(rows)} 条知识到 Milvus")
+    print(f"   当前 Milvus 中已有 {target_store.count()} 条知识")
+    print("   开始迁移...")
+
+    # 转换数据格式
+    print("\n🔄 正在转换数据格式...")
+    knowledge_list = []
+    tasks = []  # 用于批量生成 embedding
+
+    for row in rows:
+        try:
+            # 解析 JSON 字段
+            types = json.loads(row['types']) if row['types'] else ["strategy"]
+            tags = json.loads(row['tags']) if row['tags'] else {}
+            scopes = json.loads(row['scopes']) if row['scopes'] else ["org:cybertogether"]
+            source = json.loads(row['source']) if row['source'] else {}
+            eval_data = json.loads(row['eval']) if row['eval'] else {
+                "score": 3, "helpful": 1, "harmful": 0, "confidence": 0.5,
+                "helpful_history": [], "harmful_history": []
+            }
+            resource_ids = json.loads(row['resource_ids']) if row['resource_ids'] else []
+
+            # 解析时间戳
+            created_at = row['created_at']
+            updated_at = row['updated_at'] if row['updated_at'] else created_at
+
+            # 转换为时间戳(如果是 ISO 格式字符串)
+            if isinstance(created_at, str):
+                try:
+                    created_at = int(datetime.fromisoformat(created_at.replace('Z', '+00:00')).timestamp())
+                except:
+                    created_at = int(datetime.now().timestamp())
+
+            if isinstance(updated_at, str):
+                try:
+                    updated_at = int(datetime.fromisoformat(updated_at.replace('Z', '+00:00')).timestamp())
+                except:
+                    updated_at = created_at
+
+            knowledge_list.append({
+                "id": row['id'],
+                "message_id": row['message_id'] or "",
+                "task": row['task'],
+                "content": row['content'],
+                "types": types,
+                "tags": tags,
+                "tag_keys": list(tags.keys()) if isinstance(tags, dict) else [],
+                "scopes": scopes,
+                "owner": row['owner'] or "agent:unknown",
+                "resource_ids": resource_ids,
+                "source": source,
+                "eval": eval_data,
+                "created_at": created_at,
+                "updated_at": updated_at,
+            })
+
+            # 收集 task 用于生成 embedding(只基于 task)
+            tasks.append(row['task'])
+
+        except Exception as e:
+            print(f"⚠️  跳过无效数据 {row['id']}: {e}")
+            continue
+
+    print(f"✅ 成功转换 {len(knowledge_list)} 条知识")
+
+    # 批量生成 embeddings
+    print(f"\n🧮 正在生成 embeddings (只基于 task 字段)...")
+    batch_size = 100
+    all_embeddings = []
+
+    for i in range(0, len(tasks), batch_size):
+        batch_tasks = tasks[i:i+batch_size]
+        print(f"   处理 {i+1}-{min(i+batch_size, len(tasks))}/{len(tasks)}...")
+
+        try:
+            embeddings = await get_embeddings_batch(batch_tasks)
+            all_embeddings.extend(embeddings)
+        except Exception as e:
+            print(f"❌ 生成 embeddings 失败: {e}")
+            return
+
+    print(f"✅ 成功生成 {len(all_embeddings)} 个 embeddings")
+
+    # 添加 embeddings 到知识数据
+    for knowledge, embedding in zip(knowledge_list, all_embeddings):
+        knowledge["embedding"] = embedding
+
+    # 批量插入到 Milvus
+    print(f"\n💾 正在插入数据到 Milvus...")
+    batch_size = 100
+
+    for i in range(0, len(knowledge_list), batch_size):
+        batch = knowledge_list[i:i+batch_size]
+        try:
+            target_store.insert_batch(batch)
+            print(f"   已插入 {min(i+batch_size, len(knowledge_list))}/{len(knowledge_list)}")
+        except Exception as e:
+            print(f"❌ 插入失败: {e}")
+            print(f"   失败的批次: {i}-{i+batch_size}")
+            # 尝试逐条插入
+            for j, item in enumerate(batch):
+                try:
+                    target_store.insert(item)
+                except Exception as e2:
+                    print(f"   ⚠️  跳过 {item['id']}: {e2}")
+
+    # 验证
+    final_count = target_store.count()
+    print(f"\n✅ 迁移完成!")
+    print(f"   Milvus 中的知识总数: {final_count}")
+    print(f"   新增: {final_count - target_store.count() + len(knowledge_list)}")
+
+
+if __name__ == "__main__":
+    asyncio.run(migrate_knowledge())

+ 0 - 59
test_embeddings.py

@@ -1,59 +0,0 @@
-"""
-测试 Embeddings 模块(不依赖 Milvus)
-"""
-
-import asyncio
-import sys
-from pathlib import Path
-
-sys.path.insert(0, str(Path(__file__).parent))
-
-# 加载环境变量
-from dotenv import load_dotenv
-load_dotenv(Path(__file__).parent / ".env")
-
-from knowhub.embeddings import get_embedding, get_embeddings_batch
-
-
-async def test_embeddings():
-    print("=" * 60)
-    print("测试 Embeddings 模块")
-    print("=" * 60)
-
-    # 测试单条
-    print("\n1. 测试单条 embedding 生成...")
-    text = "如何使用 Python 读取 PDF 文件"
-    try:
-        embedding = await get_embedding(text)
-        print(f"✓ 成功生成 embedding")
-        print(f"  文本: {text}")
-        print(f"  向量维度: {len(embedding)}")
-        print(f"  前 5 个值: {embedding[:5]}")
-    except Exception as e:
-        print(f"✗ 失败: {e}")
-        return
-
-    # 测试批量
-    print("\n2. 测试批量 embedding 生成...")
-    texts = [
-        "使用 pymupdf 读取 PDF",
-        "使用 selenium 进行网页自动化",
-        "使用 pandas 处理数据"
-    ]
-    try:
-        embeddings = await get_embeddings_batch(texts)
-        print(f"✓ 成功生成批量 embeddings")
-        print(f"  文本数量: {len(texts)}")
-        print(f"  向量数量: {len(embeddings)}")
-        print(f"  每个向量维度: {len(embeddings[0])}")
-    except Exception as e:
-        print(f"✗ 失败: {e}")
-        return
-
-    print("\n" + "=" * 60)
-    print("Embeddings 模块测试通过!")
-    print("=" * 60)
-
-
-if __name__ == "__main__":
-    asyncio.run(test_embeddings())

+ 0 - 120
test_vector_search.py

@@ -1,120 +0,0 @@
-"""
-测试 Milvus Lite 向量检索实现
-
-运行前确保:
-1. pip install -r knowhub/requirements.txt
-2. 设置环境变量 OPENROUTER_API_KEY
-"""
-
-import asyncio
-import sys
-from pathlib import Path
-
-# 添加项目路径
-sys.path.insert(0, str(Path(__file__).parent))
-
-from knowhub.vector_store import MilvusStore
-from knowhub.embeddings import get_embedding, get_embeddings_batch
-
-
-async def test_basic():
-    """测试基本功能"""
-    print("=" * 60)
-    print("测试 1: 初始化 Milvus Lite")
-    print("=" * 60)
-
-    store = MilvusStore(data_dir="./test_milvus_data")
-    print(f"✓ Milvus Lite 初始化成功")
-    print(f"  当前知识数量: {store.count()}")
-
-    print("\n" + "=" * 60)
-    print("测试 2: 生成 Embedding")
-    print("=" * 60)
-
-    text = "如何使用 Python 读取 PDF 文件"
-    embedding = await get_embedding(text)
-    print(f"✓ 单条 embedding 生成成功")
-    print(f"  文本: {text}")
-    print(f"  向量维度: {len(embedding)}")
-
-    texts = ["测试文本1", "测试文本2", "测试文本3"]
-    embeddings = await get_embeddings_batch(texts)
-    print(f"✓ 批量 embedding 生成成功")
-    print(f"  文本数量: {len(texts)}")
-    print(f"  向量数量: {len(embeddings)}")
-
-    print("\n" + "=" * 60)
-    print("测试 3: 插入知识")
-    print("=" * 60)
-
-    import time
-    knowledge = {
-        "id": "test-001",
-        "embedding": embedding,
-        "message_id": "",
-        "task": "读取 PDF 文件",
-        "content": "使用 pymupdf 库可以高效读取 PDF 文件内容",
-        "types": ["tool"],
-        "tags": {"category": "file_processing"},
-        "scopes": ["org:test"],
-        "owner": "test_user",
-        "resource_ids": [],
-        "source": {"name": "test"},
-        "eval": {"score": 4, "helpful": 0, "harmful": 0},
-        "created_at": int(time.time()),
-        "updated_at": int(time.time()),
-    }
-
-    store.insert(knowledge)
-    print(f"✓ 知识插入成功")
-    print(f"  ID: {knowledge['id']}")
-    print(f"  当前知识数量: {store.count()}")
-
-    print("\n" + "=" * 60)
-    print("测试 4: 查询知识")
-    print("=" * 60)
-
-    result = store.get_by_id("test-001")
-    print(f"✓ 按 ID 查询成功")
-    print(f"  Task: {result['task']}")
-    print(f"  Content: {result['content']}")
-
-    print("\n" + "=" * 60)
-    print("测试 5: 向量检索")
-    print("=" * 60)
-
-    query_text = "怎么处理 PDF"
-    query_embedding = await get_embedding(query_text)
-    results = store.search(query_embedding, limit=5)
-    print(f"✓ 向量检索成功")
-    print(f"  查询: {query_text}")
-    print(f"  结果数量: {len(results)}")
-    if results:
-        print(f"  Top 1: {results[0]['task']}")
-
-    print("\n" + "=" * 60)
-    print("测试 6: 更新知识")
-    print("=" * 60)
-
-    store.update("test-001", {"content": "使用 pymupdf 库(推荐)或 PyPDF2 库读取 PDF"})
-    updated = store.get_by_id("test-001")
-    print(f"✓ 知识更新成功")
-    print(f"  新内容: {updated['content']}")
-
-    print("\n" + "=" * 60)
-    print("测试 7: 删除知识")
-    print("=" * 60)
-
-    store.delete("test-001")
-    deleted = store.get_by_id("test-001")
-    print(f"✓ 知识删除成功")
-    print(f"  删除后查询结果: {deleted}")
-    print(f"  当前知识数量: {store.count()}")
-
-    print("\n" + "=" * 60)
-    print("所有测试通过!")
-    print("=" * 60)
-
-
-if __name__ == "__main__":
-    asyncio.run(test_basic())