Browse Source

feat: add tag_keys array field for efficient tag filtering

guantao 3 giờ trước
mục cha
commit
1b10d0f3b2
3 tập tin đã thay đổi với 23 bổ sung và 19 xóa
  1. 17 16
      knowhub/server.py
  2. 5 3
      knowhub/vector_store.py
  3. 1 0
      migrate_knowledge.py

+ 17 - 16
knowhub/server.py

@@ -689,6 +689,9 @@ async def save_knowledge(knowledge: KnowledgeIn):
         # 生成向量(只基于 task,因为搜索时用户描述的是任务场景)
         embedding = await get_embedding(knowledge.task)
 
+        # 提取 tag keys(用于高效筛选)
+        tag_keys = list(knowledge.tags.keys()) if isinstance(knowledge.tags, dict) else []
+
         # 准备插入数据
         insert_data = {
             "id": knowledge_id,
@@ -698,6 +701,7 @@ async def save_knowledge(knowledge: KnowledgeIn):
             "content": knowledge.content,
             "types": knowledge.types,
             "tags": knowledge.tags,
+            "tag_keys": tag_keys,
             "scopes": knowledge.scopes,
             "owner": owner,
             "resource_ids": knowledge.resource_ids,
@@ -729,7 +733,7 @@ def list_knowledge(
 ):
     """列出知识(支持后端筛选)"""
     try:
-        # 构建过滤表达式(不包含 tags,因为 Milvus 不支持 JSON key 存在性检查)
+        # 构建过滤表达式
         filters = []
 
         # types 支持多个,用 AND 连接(交集:必须同时包含所有选中的type)
@@ -744,28 +748,21 @@ def list_knowledge(
         if owner:
             filters.append(f'owner like "%{owner}%"')
 
+        # tags 支持多个,用 AND 连接(使用 tag_keys 数组进行高效筛选)
+        if tags:
+            tag_list = [t.strip() for t in tags.split(',') if t.strip()]
+            for t in tag_list:
+                filters.append(f'array_contains(tag_keys, "{t}")')
+
         # 如果没有过滤条件,查询所有
         filter_expr = ' and '.join(filters) if filters else 'id != ""'
 
-        # 查询 Milvus(使用更大的 limit 以便后续过滤)
-        query_limit = limit * 10 if tags else limit
-        results = milvus_store.query(filter_expr, limit=query_limit)
+        # 查询 Milvus
+        results = milvus_store.query(filter_expr, limit=limit)
 
         # 转换为可序列化的格式
         serialized_results = [serialize_milvus_result(r) for r in results]
 
-        # 在应用层进行 tags 过滤(因为 Milvus 不支持 JSON key 存在性检查)
-        if tags:
-            tag_list = [t.strip() for t in tags.split(',') if t.strip()]
-            filtered_results = []
-            for item in serialized_results:
-                tags_dict = item.get("tags", {})
-                if isinstance(tags_dict, dict):
-                    # 检查是否包含所有指定的 tag keys
-                    if all(tag_key in tags_dict for tag_key in tag_list):
-                        filtered_results.append(item)
-            serialized_results = filtered_results[:limit]
-
         return {"results": serialized_results, "count": len(serialized_results)}
 
     except Exception as e:
@@ -930,6 +927,8 @@ async def patch_knowledge(knowledge_id: str, patch: KnowledgePatchIn):
 
         if patch.tags is not None:
             updates["tags"] = patch.tags
+            # 同时更新 tag_keys
+            updates["tag_keys"] = list(patch.tags.keys()) if isinstance(patch.tags, dict) else []
 
         if patch.scopes is not None:
             updates["scopes"] = patch.scopes
@@ -1183,6 +1182,7 @@ REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
                 "content": e["content"],
                 "types": e["types"],
                 "tags": {},
+                "tag_keys": [],
                 "scopes": ["org:cybertogether"],
                 "owner": "agent:slim",
                 "resource_ids": [],
@@ -1343,6 +1343,7 @@ async def extract_knowledge_from_messages(extract_req: MessageExtractIn):
                 "content": knowledge_content,
                 "types": types,
                 "tags": {},
+                "tag_keys": [],
                 "scopes": ["org:cybertogether"],
                 "owner": extract_req.submitted_by,
                 "resource_ids": [],

+ 5 - 3
knowhub/vector_store.py

@@ -87,6 +87,8 @@ class MilvusStore:
                 FieldSchema(name="types", dtype=DataType.ARRAY,
                            element_type=DataType.VARCHAR, max_capacity=20, max_length=50),
                 FieldSchema(name="tags", dtype=DataType.JSON),
+                FieldSchema(name="tag_keys", dtype=DataType.ARRAY,
+                           element_type=DataType.VARCHAR, max_capacity=50, max_length=100),
                 FieldSchema(name="scopes", dtype=DataType.ARRAY,
                            element_type=DataType.VARCHAR, max_capacity=20, max_length=100),
                 FieldSchema(name="owner", dtype=DataType.VARCHAR,
@@ -158,7 +160,7 @@ class MilvusStore:
             limit=limit,
             expr=filters,
             output_fields=["id", "message_id", "task", "content", "types",
-                          "tags", "scopes", "owner", "resource_ids",
+                          "tags", "tag_keys", "scopes", "owner", "resource_ids",
                           "source", "eval", "created_at", "updated_at"]
         )
 
@@ -181,7 +183,7 @@ class MilvusStore:
         results = self.collection.query(
             expr=filters,
             output_fields=["id", "message_id", "task", "content", "types",
-                          "tags", "scopes", "owner", "resource_ids",
+                          "tags", "tag_keys", "scopes", "owner", "resource_ids",
                           "source", "eval", "created_at", "updated_at"],
             limit=limit
         )
@@ -200,7 +202,7 @@ class MilvusStore:
         results = self.collection.query(
             expr=f'id == "{knowledge_id}"',
             output_fields=["id", "message_id", "task", "content", "types",
-                          "tags", "scopes", "owner", "resource_ids",
+                          "tags", "tag_keys", "scopes", "owner", "resource_ids",
                           "source", "eval", "created_at", "updated_at"]
         )
         return results[0] if results else None

+ 1 - 0
migrate_knowledge.py

@@ -98,6 +98,7 @@ async def migrate_knowledge():
                 "content": row['content'],
                 "types": types,
                 "tags": tags,
+                "tag_keys": list(tags.keys()) if isinstance(tags, dict) else [],
                 "scopes": scopes,
                 "owner": row['owner'] or "agent:unknown",
                 "resource_ids": resource_ids,