# 知识入库前智能去重与关系判断系统 — 设计文档 ## 文档维护规范 0. **先改文档,再动代码** - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现;除非改动较小、不被文档涵盖 1. **文档分层,链接代码** - 重要或复杂设计可以另有详细文档;关键实现需标注代码文件路径;格式:`module/file.py:function_name` 2. **简洁快照,日志分离** - 只记录最重要的、与代码准确对应的或者明确的已完成的设计的信息,避免推测、建议、决策历史、修改日志、大量代码;决策依据或修改日志若有必要,可在 `knowhub/docs/decisions.md` 另行记录 --- ## 可行性结论 **整体可行,无阻塞性问题。** --- ## 一、去重流程 ``` 新知识进入 (status=pending) │ ▼ [Step 1] 复用已存储的 embedding(入队时已生成,不重复调用) │ ▼ [Step 2] 向量召回 top-10 相似知识 filter: status == "approved" or status == "checked" │ ▼ [Step 2.5] 相似度预过滤(阈值 0.75) 过滤掉 COSINE score < 0.75 的候选 无候选 → 直接 approved │ ▼ [Step 3] LLM 关系判断(见第五节 Prompt) LLM 自主判断关系类型和 final_decision │ ┌────┴──────────────────────────────────┐ ▼ ▼ final_decision=rejected final_decision=approved 旧知识 helpful+1(记录到 history) 双向写入 relationships 更新关系缓存表 ``` --- ## 二、同 task 下多条知识的处理策略 只拒绝 `duplicate` 和 `subset`,其他关系两条都保留,并**双向写入**关系标注。引入**关系缓存表**管理关系复杂度。 --- ## 三、关系类型定义与保存方式 ### 关系类型 关系类型是**开放的**,LLM 可以根据实际情况提出新的关系类型,并自行判断对应的处理动作(approved/rejected)。 | type | 含义 | 处理动作 | |---|------------------------------------------|---| | `duplicate` | task 和 content 语义完全相同 | 新知识 **rejected**,旧知识 helpful+1 | | `subset` | task语义一致,新知识信息被旧知识完全覆盖 | 新知识 **rejected** | | `superset` | task语义一致,新知识比旧知识更全面 | 两条都 **approved** | | `conflict` | 同一 task 下结论矛盾 | 两条都 **approved** | | `complement` | 同一 task 的不同角度,互补 | 两条都 **approved** | | `none` | task 语义不同,或无实质关系(**task 不同时必须判定为 none**) | 新知识直接 **approved**,不写入关系 | | *(LLM 自定义)* | LLM 发现的其他关系类型 | 由 LLM 自行判断 | ### 关系的方向性与双向标注 所有关系都是**有向的**,且**双向写入**:两条知识的 `relationships` 字段都会记录对方,但各自记录的是**从自己出发的出边**。 以 A superset B 为例: - A 的 relationships 追加:`{type: "superset", target: "B"}` (A 包含 B) - B 的 relationships 追加:`{type: "subset", target: "A"}` (B 被 A 包含) 以 A conflict B 为例: - A 的 relationships 追加:`{type: "conflict", target: "B"}` - B 的 relationships 追加:`{type: "conflict", target: "A"}` ### 写入规则 - `final_decision = "rejected"`:新知识 status=rejected,**不写入任何 relationships**;遍历 relations,对所有 type 为 `duplicate` 或 `subset` 的旧知识 helpful+1,记录到 helpful_history - `final_decision = "approved"`:新知识 status=approved;遍历 relations,对所有 type 不是 `none` 的关系双向写入 relationships,同时更新关系缓存表 - `none`:不写入 relationships,不更新缓存表 ### 关系缓存表 实现位置:`knowhub/server.py:RelationCache` 独立于知识条目存储,结构如下: ```json { "conflict": ["knowledge-A", "knowledge-B", "knowledge-C"], "superset": ["knowledge-D", "knowledge-E"], "complement": ["knowledge-F", "knowledge-G"], "custom_type": ["knowledge-I"] } ``` 每个关系类型对应一个列表,记录**所有参与该关系的知识 ID**(不区分方向)。LLM 提出新关系类型时,自动在缓存表中新增对应字段。 --- ## 四、更新后的知识条目数据结构 ### 新增 2 个字段 | 字段 | 类型 | 默认值 | 说明 | |---|---|---|---| | `status` | VARCHAR(20) | `"pending"` | 入库状态:pending / processing / approved / checked / rejected | | `relationships` | JSON | `[]` | 与其他知识的关系列表 | ### status 字段语义 | 值 | 含义 | 可被检索 | |---|---|---| | `pending` | 刚入队,等待处理 | 否 | | `processing` | 正在处理(防并发乐观锁) | 否 | | `approved` | 已通过去重,正式入库 | 是 | | `checked` | 经人类审核确认 | 是 | | `rejected` | 被判定为重复,已丢弃 | 否 | ### relationships 字段结构 每条记录代表一条**出边**(从当前知识出发的关系): ```json [ { "type": "superset", "target": "knowledge-20260305-a1b2" } ] ``` ### helpful_history / harmful_history 格式 实现位置:`knowhub/server.py:KnowledgeProcessor._apply_decision` ```json { "helpful_history": [ { "source": "dedup", "related_id": "knowledge-20260317-new-xxxx", "relation_type": "duplicate", "timestamp": 1710000000 } ] } ``` - `source: "dedup"`:标识这条反馈来自去重流程 - `related_id`:触发这次反馈的新知识 ID(被 rejected 的那条) - `relation_type`:触发反馈的关系类型 ### 完整知识条目结构 ```json { "id": "knowledge-20260317-143022-a1b2", "embedding": [...], "message_id": "", "task": "...", "content": "...", "types": ["strategy"], "tags": {}, "tag_keys": [], "scopes": ["org:cybertogether"], "owner": "agent:runner", "resource_ids": [], "source": {}, "eval": { "score": 3, "helpful": 1, "harmful": 0, "confidence": 0.7, "helpful_history": [], "harmful_history": [] }, "created_at": 1710000000, "updated_at": 1710000000, "status": "pending", "relationships": [] } ``` --- ## 五、LLM 关系判断 Prompt 实现位置:`knowhub/server.py:KnowledgeProcessor._llm_judge_relations` ```python DEDUP_RELATION_PROMPT = """你是知识库管理专家。请判断【新知识】与【相似知识列表】中每条知识的关系。 【新知识】 Task: {new_task} Content: {new_content} 【相似知识列表】(向量召回 top-10,按相似度排序) {existing_list} 格式: [序号] ID: xxx | Task: xxx | Content: xxx 【已知关系类型参考】 - duplicate: task 和 content 语义完全相同,无新增信息 - subset: task语义一致,新知识的content信息完全被某条已有知识覆盖 - superset: task语义一致,新知识包含某条已有知识的全部信息,且有额外内容 - conflict: 同一 task 下给出相互矛盾的结论 - complement: 描述同一 task 的不同方面,互补 - none: task 语义不同,或无实质关系(task 不同时必须判定为 none,只有 task 语义一致才可能存在其他关系) **重要**:如果以上类型无法准确描述关系,你可以自定义新的关系类型(英文小写下划线命名),并自行判断新知识应该 approved 还是 rejected。 【输出格式】(严格 JSON,不要其他内容) {{ "final_decision": "approved", "relations": [ {{ "old_id": "knowledge-xxx", "type": "superset", "reverse_type": "subset" }} ] }} """ ``` ### LLM 输出字段的处理逻辑 实现位置:`knowhub/server.py:KnowledgeProcessor._apply_decision` **final_decision**: "approved" 或 "rejected" - 用途:设置新知识的 status 字段 - 只要 relations 中有任意一条 type 为 duplicate 或 subset,LLM 应输出 rejected **relations**: 关系列表 - **old_id**: 旧知识 ID - 用途:定位需要更新的旧知识记录 - **type**: 从新知识指向旧知识的关系类型 - 如果 final_decision="rejected":仅对 type="duplicate" 或 type="subset" 的旧知识 eval.helpful +1,写入 helpful_history;其余关系忽略 - 如果 final_decision="approved" 且 type 不是 "none":新知识的 relationships 追加 `{"type": type, "target": old_id}`,同时更新关系缓存表 - **reverse_type**: 从旧知识指向新知识的反向关系类型 - 仅在 final_decision="approved" 且 reverse_type 不是 "none" 时:旧知识的 relationships 追加 `{"type": reverse_type, "target": new_id}` --- ## 六、异步处理架构 ### 整体架构 ``` POST /api/knowledge → 生成 embedding → 插入 Milvus (status=pending) → 立即返回 {"status": "pending", "knowledge_id": "..."} → background_tasks.add_task(processor.process_pending) ← 非阻塞触发 KnowledgeProcessor(后台处理器) → 查询所有 status=pending 的知识(每批50条) → 逐条处理:pending → processing → approved/rejected → asyncio.Lock 防止并发 定时兜底(每60秒) → asyncio.create_task(_periodic_processor()) → 检测超时的 processing 条目(>5分钟)并回滚到 pending ``` ### 错误处理策略 | 场景 | 处理方式 | |---|---| | LLM 调用失败 | 重试 2 次,仍失败则 status=approved(宁可放行,不丢数据) | | LLM 输出无法解析 | 同上,fallback 到 approved | | 处理超时(>5分钟) | 定时任务检测 processing 状态并回滚到 pending | | 并发写入相同知识 | processing 状态作为乐观锁,第二个处理器跳过 | | task 语义不相关(score < 0.75) | 预过滤直接排除,不进入 LLM 判断,视为 none | --- ## 七、API 接口设计 ### 新增/改造接口 | 接口 | 变化 | |---|---| | `POST /api/knowledge` | 插入 status=pending,触发后台任务,立即返回 pending 状态 | | `POST /api/extract` | 批量插入时每条 status=pending,插入后触发后台任务 | | `POST /api/knowledge/slim` | 重建知识时显式传入 status=approved,跳过去重(已精炼知识) | | `GET /api/knowledge` | 追加 `status in ["approved", "checked"]` 过滤 | | `GET /api/knowledge/search` | 追加 `status in ["approved", "checked"]` 过滤 | | `POST /api/knowledge/migrate` | **新增**:手动触发 schema 迁移(中转 collection 模式),返回迁移条数 | | `GET /api/knowledge/pending` | **新增**:查询待处理队列 | | `POST /api/knowledge/process` | **新增**:手动触发处理,`force=true` 可回滚卡死的 processing 条目 | | `GET /api/knowledge/status/{id}` | **新增**:查询单条知识的处理状态和关系 | ### POST /api/knowledge 响应变化 ```json { "status": "pending", "knowledge_id": "knowledge-20260317-143022-a1b2", "message": "知识已入队,正在处理去重..." } ``` ### 迁移脚本处理 `migrate_knowledge.py`:历史数据迁移,迁移的是已存在的知识,插入时显式传入 `status="approved"`,`relationships=[]`,跳过去重流程。 --- ## 八、Milvus 关系筛选可行性 ### 可行的查询 ```python # status 过滤(高效,建议加 Trie 索引) 'status == "approved"' 'status == "pending" or status == "processing"' # relationships 非空(Milvus 2.3+ JSON 查询) 'json_length(relationships) > 0' ``` ### 关系查询方案 **正向查询**(从知识 A 查询它的所有关系):直接读取 A 的 `relationships` 字段,O(1)。 **反向查询**(查询"哪些知识与 A 有 conflict 关系"):通过**关系缓存表**实现,无需全表扫描。 **复杂查询**(查询"所有存在 conflict 关系的知识对"):直接读取关系缓存表的 `conflict` 字段。 ### 性能评估 | 查询类型 | 方案 | 性能 | |---|---|---| | status 过滤 | Milvus Trie 索引 | 极快 | | 向量召回 + status 过滤 | HNSW + 标量过滤 | 快(现有机制) | | relationships 正向读取 | 直接读 JSON 字段 | O(1) | | relationships 反向/复杂查询 | 关系缓存表 | O(1) | --- ## 九、实现步骤与文件清单 ### 关键文件修改清单 | 文件 | 修改内容 | | --- | --- | | `knowhub/vector_store.py` | 新增 status/relationships 字段;更新所有 output_fields;为 status 添加 Trie 索引 | | `knowhub/server.py` | 新增 `KnowledgeProcessor` 类(~200行);改造 `save_knowledge` / `extract_knowledge_from_messages`;改造 list/search 追加 status 过滤;新增 3 个接口;更新 `KnowledgeIn` 模型;实现关系缓存表管理 | | `migrate_knowledge.py` | 插入时显式传入 `status="approved"`,`relationships=[]` | ### 实现阶段 **Phase 1 — Schema 扩展**(`knowhub/vector_store.py`) 1. 新增 2 个字段:status、relationships 2. 更新 search/query/get_by_id 的 output_fields 3. 为 status 添加 Trie 标量索引 4. 初始化关系缓存表存储 **Phase 2 — 处理器核心逻辑**(`knowhub/server.py`) 1. 实现 `KnowledgeProcessor` 类 2. 实现 `_llm_judge_relations` 方法(使用上面的 Prompt) 3. 实现 `_apply_decision` 方法(写入 status 和 relationships,同步更新关系缓存表) 4. 在 `lifespan` 中初始化处理器实例 + 启动定时任务 5. 实现关系缓存表的读写接口 **Phase 3 — API 改造**(`knowhub/server.py`) 1. 改造 `POST /api/knowledge`:status=pending,触发后台任务 2. 改造 `GET /api/knowledge` 和 `GET /api/knowledge/search`:追加 status 过滤 3. 新增 3 个接口:pending / process / status/{id} ### 数据迁移方案 Milvus Lite 不支持 ALTER COLLECTION 和 rename_collection,采用**软兼容 + 手动触发迁移接口**策略: - **平时(软兼容)**:读取时用 `.get("status", "approved")` / `.get("relationships", []) or []` 兼容旧数据,旧数据被视为 approved,不影响检索和去重逻辑 - **迁移(手动触发 `POST /api/knowledge/migrate`)**:采用"中转 collection"模式(Milvus Lite 不支持 rename): 1. 创建 `knowledge_migration`(新 schema) 2. 从 `knowledge` 逐条读取,补 `status="approved"`, `relationships=[]`,插入 `knowledge_migration` 3. drop `knowledge` 4. 创建 `knowledge`(新 schema,空) 5. 从 `knowledge_migration` 逐条读取,插入 `knowledge` 6. drop `knowledge_migration` 7. 更新 `self.collection` 引用 实现位置:`knowhub/vector_store.py:MilvusStore.migrate_schema`