dedup-design.md 15 KB

知识入库前智能去重与关系判断系统 — 设计文档

文档维护规范

  1. 先改文档,再动代码 - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现;除非改动较小、不被文档涵盖
  2. 文档分层,链接代码 - 重要或复杂设计可以另有详细文档;关键实现需标注代码文件路径;格式:module/file.py:function_name
  3. 简洁快照,日志分离 - 只记录最重要的、与代码准确对应的或者明确的已完成的设计的信息,避免推测、建议、决策历史、修改日志、大量代码;决策依据或修改日志若有必要,可在 knowhub/docs/decisions.md 另行记录

可行性结论

整体可行,无阻塞性问题。


一、去重流程

新知识进入 (status=pending)
         │
         ▼
[Step 1] 复用已存储的 embedding(入队时已生成,不重复调用)
         │
         ▼
[Step 2] 向量召回 top-10 相似知识
         filter: status == "approved" or status == "checked"
         │
         ▼
[Step 2.5] 相似度预过滤(阈值 0.75)
         过滤掉 COSINE score < 0.75 的候选
         无候选 → 直接 approved
         │
         ▼
[Step 3] LLM 关系判断(见第五节 Prompt)
         LLM 自主判断关系类型和 final_decision
         │
    ┌────┴──────────────────────────────────┐
    ▼                                       ▼
final_decision=rejected              final_decision=approved
旧知识 helpful+1(记录到 history)    双向写入 relationships
                                     更新关系缓存表

二、同 task 下多条知识的处理策略

只拒绝 duplicatesubset,其他关系两条都保留,并双向写入关系标注。引入关系缓存表管理关系复杂度。


三、关系类型定义与保存方式

关系类型

关系类型是开放的,LLM 可以根据实际情况提出新的关系类型,并自行判断对应的处理动作(approved/rejected)。

type 含义 处理动作
duplicate task 和 content 语义完全相同 新知识 rejected,旧知识 helpful+1
subset task语义一致,新知识信息被旧知识完全覆盖 新知识 rejected
superset task语义一致,新知识比旧知识更全面 两条都 approved
conflict 同一 task 下结论矛盾 两条都 approved
complement 同一 task 的不同角度,互补 两条都 approved
none task 语义不同,或无实质关系(task 不同时必须判定为 none 新知识直接 approved,不写入关系
(LLM 自定义) LLM 发现的其他关系类型 由 LLM 自行判断

关系的方向性与双向标注

所有关系都是有向的,且双向写入:两条知识的 relationships 字段都会记录对方,但各自记录的是从自己出发的出边

以 A superset B 为例:

  • A 的 relationships 追加:{type: "superset", target: "B"} (A 包含 B)
  • B 的 relationships 追加:{type: "subset", target: "A"} (B 被 A 包含)

以 A conflict B 为例:

  • A 的 relationships 追加:{type: "conflict", target: "B"}
  • B 的 relationships 追加:{type: "conflict", target: "A"}

写入规则

  • final_decision = "rejected":新知识 status=rejected,不写入任何 relationships;遍历 relations,对所有 type 为 duplicatesubset 的旧知识 helpful+1,记录到 helpful_history
  • final_decision = "approved":新知识 status=approved;遍历 relations,对所有 type 不是 none 的关系双向写入 relationships,同时更新关系缓存表
  • none:不写入 relationships,不更新缓存表

关系缓存表

实现位置:knowhub/server.py:RelationCache

独立于知识条目存储,结构如下:

{
  "conflict":    ["knowledge-A", "knowledge-B", "knowledge-C"],
  "superset":    ["knowledge-D", "knowledge-E"],
  "complement":  ["knowledge-F", "knowledge-G"],
  "custom_type": ["knowledge-I"]
}

每个关系类型对应一个列表,记录所有参与该关系的知识 ID(不区分方向)。LLM 提出新关系类型时,自动在缓存表中新增对应字段。


四、更新后的知识条目数据结构

新增 2 个字段

字段 类型 默认值 说明
status VARCHAR(20) "pending" 入库状态:pending / processing / approved / checked / rejected
relationships JSON [] 与其他知识的关系列表

status 字段语义

含义 可被检索
pending 刚入队,等待处理
processing 正在处理(防并发乐观锁)
approved 已通过去重,正式入库
checked 经人类审核确认
rejected 被判定为重复,已丢弃

relationships 字段结构

每条记录代表一条出边(从当前知识出发的关系):

[
  {
    "type": "superset",
    "target": "knowledge-20260305-a1b2"
  }
]

helpful_history / harmful_history 格式

实现位置:knowhub/server.py:KnowledgeProcessor._apply_decision

{
  "helpful_history": [
    {
      "source": "dedup",
      "related_id": "knowledge-20260317-new-xxxx",
      "relation_type": "duplicate",
      "timestamp": 1710000000
    }
  ]
}
  • source: "dedup":标识这条反馈来自去重流程
  • related_id:触发这次反馈的新知识 ID(被 rejected 的那条)
  • relation_type:触发反馈的关系类型

完整知识条目结构

{
  "id": "knowledge-20260317-143022-a1b2",
  "embedding": [...],
  "message_id": "",
  "task": "...",
  "content": "...",
  "types": ["strategy"],
  "tags": {},
  "tag_keys": [],
  "scopes": ["org:cybertogether"],
  "owner": "agent:runner",
  "resource_ids": [],
  "source": {},
  "eval": {
    "score": 3,
    "helpful": 1,
    "harmful": 0,
    "confidence": 0.7,
    "helpful_history": [],
    "harmful_history": []
  },
  "created_at": 1710000000,
  "updated_at": 1710000000,
  "status": "pending",
  "relationships": []
}

五、LLM 关系判断 Prompt

实现位置:knowhub/server.py:KnowledgeProcessor._llm_judge_relations

DEDUP_RELATION_PROMPT = """你是知识库管理专家。请判断【新知识】与【相似知识列表】中每条知识的关系。

【新知识】
Task: {new_task}
Content: {new_content}

【相似知识列表】(向量召回 top-10,按相似度排序)
{existing_list}
格式: [序号] ID: xxx | Task: xxx | Content: xxx

【已知关系类型参考】
- duplicate: task 和 content 语义完全相同,无新增信息
- subset: task语义一致,新知识的content信息完全被某条已有知识覆盖
- superset: task语义一致,新知识包含某条已有知识的全部信息,且有额外内容
- conflict: 同一 task 下给出相互矛盾的结论
- complement: 描述同一 task 的不同方面,互补
- none: task 语义不同,或无实质关系(task 不同时必须判定为 none,只有 task 语义一致才可能存在其他关系)

**重要**:如果以上类型无法准确描述关系,你可以自定义新的关系类型(英文小写下划线命名),并自行判断新知识应该 approved 还是 rejected。

【输出格式】(严格 JSON,不要其他内容)
{{
  "final_decision": "approved",
  "relations": [
    {{
      "old_id": "knowledge-xxx",
      "type": "superset",
      "reverse_type": "subset"
    }}
  ]
}}

"""

LLM 输出字段的处理逻辑

实现位置:knowhub/server.py:KnowledgeProcessor._apply_decision

final_decision: "approved" 或 "rejected"

  • 用途:设置新知识的 status 字段
  • 只要 relations 中有任意一条 type 为 duplicate 或 subset,LLM 应输出 rejected

relations: 关系列表

  • old_id: 旧知识 ID
    • 用途:定位需要更新的旧知识记录
  • type: 从新知识指向旧知识的关系类型
    • 如果 final_decision="rejected":仅对 type="duplicate" 或 type="subset" 的旧知识 eval.helpful +1,写入 helpful_history;其余关系忽略
    • 如果 final_decision="approved" 且 type 不是 "none":新知识的 relationships 追加 {"type": type, "target": old_id},同时更新关系缓存表
  • reverse_type: 从旧知识指向新知识的反向关系类型
    • 仅在 final_decision="approved" 且 reverse_type 不是 "none" 时:旧知识的 relationships 追加 {"type": reverse_type, "target": new_id}

六、异步处理架构

整体架构

POST /api/knowledge
  → 生成 embedding
  → 插入 Milvus (status=pending)
  → 立即返回 {"status": "pending", "knowledge_id": "..."}
  → background_tasks.add_task(processor.process_pending)  ← 非阻塞触发

KnowledgeProcessor(后台处理器)
  → 查询所有 status=pending 的知识(每批50条)
  → 逐条处理:pending → processing → approved/rejected
  → asyncio.Lock 防止并发

定时兜底(每60秒)
  → asyncio.create_task(_periodic_processor())
  → 检测超时的 processing 条目(>5分钟)并回滚到 pending

错误处理策略

场景 处理方式
LLM 调用失败 重试 2 次,仍失败则 status=approved(宁可放行,不丢数据)
LLM 输出无法解析 同上,fallback 到 approved
处理超时(>5分钟) 定时任务检测 processing 状态并回滚到 pending
并发写入相同知识 processing 状态作为乐观锁,第二个处理器跳过
task 语义不相关(score < 0.75) 预过滤直接排除,不进入 LLM 判断,视为 none

七、API 接口设计

新增/改造接口

接口 变化
POST /api/knowledge 插入 status=pending,触发后台任务,立即返回 pending 状态
POST /api/extract 批量插入时每条 status=pending,插入后触发后台任务
POST /api/knowledge/slim 重建知识时显式传入 status=approved,跳过去重(已精炼知识)
GET /api/knowledge 追加 status in ["approved", "checked"] 过滤
GET /api/knowledge/search 追加 status in ["approved", "checked"] 过滤
POST /api/knowledge/migrate 新增:手动触发 schema 迁移(中转 collection 模式),返回迁移条数
GET /api/knowledge/pending 新增:查询待处理队列
POST /api/knowledge/process 新增:手动触发处理,force=true 可回滚卡死的 processing 条目
GET /api/knowledge/status/{id} 新增:查询单条知识的处理状态和关系

POST /api/knowledge 响应变化

{
  "status": "pending",
  "knowledge_id": "knowledge-20260317-143022-a1b2",
  "message": "知识已入队,正在处理去重..."
}

迁移脚本处理

migrate_knowledge.py:历史数据迁移,迁移的是已存在的知识,插入时显式传入 status="approved"relationships=[],跳过去重流程。


八、Milvus 关系筛选可行性

可行的查询

# status 过滤(高效,建议加 Trie 索引)
'status == "approved"'
'status == "pending" or status == "processing"'

# relationships 非空(Milvus 2.3+ JSON 查询)
'json_length(relationships) > 0'

关系查询方案

正向查询(从知识 A 查询它的所有关系):直接读取 A 的 relationships 字段,O(1)。

反向查询(查询"哪些知识与 A 有 conflict 关系"):通过关系缓存表实现,无需全表扫描。

复杂查询(查询"所有存在 conflict 关系的知识对"):直接读取关系缓存表的 conflict 字段。

性能评估

查询类型 方案 性能
status 过滤 Milvus Trie 索引 极快
向量召回 + status 过滤 HNSW + 标量过滤 快(现有机制)
relationships 正向读取 直接读 JSON 字段 O(1)
relationships 反向/复杂查询 关系缓存表 O(1)

九、实现步骤与文件清单

关键文件修改清单

文件 修改内容
knowhub/vector_store.py 新增 status/relationships 字段;更新所有 output_fields;为 status 添加 Trie 索引
knowhub/server.py 新增 KnowledgeProcessor 类(~200行);改造 save_knowledge / extract_knowledge_from_messages;改造 list/search 追加 status 过滤;新增 3 个接口;更新 KnowledgeIn 模型;实现关系缓存表管理
migrate_knowledge.py 插入时显式传入 status="approved"relationships=[]

实现阶段

Phase 1 — Schema 扩展knowhub/vector_store.py

  1. 新增 2 个字段:status、relationships
  2. 更新 search/query/get_by_id 的 output_fields
  3. 为 status 添加 Trie 标量索引
  4. 初始化关系缓存表存储

Phase 2 — 处理器核心逻辑knowhub/server.py

  1. 实现 KnowledgeProcessor
  2. 实现 _llm_judge_relations 方法(使用上面的 Prompt)
  3. 实现 _apply_decision 方法(写入 status 和 relationships,同步更新关系缓存表)
  4. lifespan 中初始化处理器实例 + 启动定时任务
  5. 实现关系缓存表的读写接口

Phase 3 — API 改造knowhub/server.py

  1. 改造 POST /api/knowledge:status=pending,触发后台任务
  2. 改造 GET /api/knowledgeGET /api/knowledge/search:追加 status 过滤
  3. 新增 3 个接口:pending / process / status/{id}

数据迁移方案

Milvus Lite 不支持 ALTER COLLECTION 和 rename_collection,采用软兼容 + 手动触发迁移接口策略:

  • 平时(软兼容):读取时用 .get("status", "approved") / .get("relationships", []) or [] 兼容旧数据,旧数据被视为 approved,不影响检索和去重逻辑
  • 迁移(手动触发 POST /api/knowledge/migrate:采用"中转 collection"模式(Milvus Lite 不支持 rename):
    1. 创建 knowledge_migration(新 schema)
    2. knowledge 逐条读取,补 status="approved", relationships=[],插入 knowledge_migration
    3. drop knowledge
    4. 创建 knowledge(新 schema,空)
    5. knowledge_migration 逐条读取,插入 knowledge
    6. drop knowledge_migration
    7. 更新 self.collection 引用

实现位置:knowhub/vector_store.py:MilvusStore.migrate_schema