module/file.py:function_nameknowhub/docs/decisions.md 另行记录整体可行,无阻塞性问题。
新知识进入 (status=pending)
│
▼
[Step 1] 复用已存储的 embedding(入队时已生成,不重复调用)
│
▼
[Step 2] 向量召回 top-10 相似知识
filter: status == "approved" or status == "checked"
│
▼
[Step 2.5] 相似度预过滤(阈值 0.75)
过滤掉 COSINE score < 0.75 的候选
无候选 → 直接 approved
│
▼
[Step 3] LLM 关系判断(见第五节 Prompt)
LLM 自主判断关系类型和 final_decision
│
┌────┴──────────────────────────────────┐
▼ ▼
final_decision=rejected final_decision=approved
旧知识 helpful+1(记录到 history) 双向写入 relationships
更新关系缓存表
只拒绝 duplicate 和 subset,其他关系两条都保留,并双向写入关系标注。引入关系缓存表管理关系复杂度。
关系类型是开放的,LLM 可以根据实际情况提出新的关系类型,并自行判断对应的处理动作(approved/rejected)。
| type | 含义 | 处理动作 |
|---|---|---|
duplicate |
task 和 content 语义完全相同 | 新知识 rejected,旧知识 helpful+1 |
subset |
task语义一致,新知识信息被旧知识完全覆盖 | 新知识 rejected |
superset |
task语义一致,新知识比旧知识更全面 | 两条都 approved |
conflict |
同一 task 下结论矛盾 | 两条都 approved |
complement |
同一 task 的不同角度,互补 | 两条都 approved |
none |
task 语义不同,或无实质关系(task 不同时必须判定为 none) | 新知识直接 approved,不写入关系 |
| (LLM 自定义) | LLM 发现的其他关系类型 | 由 LLM 自行判断 |
所有关系都是有向的,且双向写入:两条知识的 relationships 字段都会记录对方,但各自记录的是从自己出发的出边。
以 A superset B 为例:
{type: "superset", target: "B"} (A 包含 B){type: "subset", target: "A"} (B 被 A 包含)以 A conflict B 为例:
{type: "conflict", target: "B"}{type: "conflict", target: "A"}final_decision = "rejected":新知识 status=rejected,不写入任何 relationships;遍历 relations,对所有 type 为 duplicate 或 subset 的旧知识 helpful+1,记录到 helpful_historyfinal_decision = "approved":新知识 status=approved;遍历 relations,对所有 type 不是 none 的关系双向写入 relationships,同时更新关系缓存表none:不写入 relationships,不更新缓存表实现位置:knowhub/server.py:RelationCache
独立于知识条目存储,结构如下:
{
"conflict": ["knowledge-A", "knowledge-B", "knowledge-C"],
"superset": ["knowledge-D", "knowledge-E"],
"complement": ["knowledge-F", "knowledge-G"],
"custom_type": ["knowledge-I"]
}
每个关系类型对应一个列表,记录所有参与该关系的知识 ID(不区分方向)。LLM 提出新关系类型时,自动在缓存表中新增对应字段。
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
status |
VARCHAR(20) | "pending" |
入库状态:pending / processing / approved / checked / rejected |
relationships |
JSON | [] |
与其他知识的关系列表 |
| 值 | 含义 | 可被检索 |
|---|---|---|
pending |
刚入队,等待处理 | 否 |
processing |
正在处理(防并发乐观锁) | 否 |
approved |
已通过去重,正式入库 | 是 |
checked |
经人类审核确认 | 是 |
rejected |
被判定为重复,已丢弃 | 否 |
每条记录代表一条出边(从当前知识出发的关系):
[
{
"type": "superset",
"target": "knowledge-20260305-a1b2"
}
]
实现位置:knowhub/server.py:KnowledgeProcessor._apply_decision
{
"helpful_history": [
{
"source": "dedup",
"related_id": "knowledge-20260317-new-xxxx",
"relation_type": "duplicate",
"timestamp": 1710000000
}
]
}
source: "dedup":标识这条反馈来自去重流程related_id:触发这次反馈的新知识 ID(被 rejected 的那条)relation_type:触发反馈的关系类型{
"id": "knowledge-20260317-143022-a1b2",
"embedding": [...],
"message_id": "",
"task": "...",
"content": "...",
"types": ["strategy"],
"tags": {},
"tag_keys": [],
"scopes": ["org:cybertogether"],
"owner": "agent:runner",
"resource_ids": [],
"source": {},
"eval": {
"score": 3,
"helpful": 1,
"harmful": 0,
"confidence": 0.7,
"helpful_history": [],
"harmful_history": []
},
"created_at": 1710000000,
"updated_at": 1710000000,
"status": "pending",
"relationships": []
}
实现位置:knowhub/server.py:KnowledgeProcessor._llm_judge_relations
DEDUP_RELATION_PROMPT = """你是知识库管理专家。请判断【新知识】与【相似知识列表】中每条知识的关系。
【新知识】
Task: {new_task}
Content: {new_content}
【相似知识列表】(向量召回 top-10,按相似度排序)
{existing_list}
格式: [序号] ID: xxx | Task: xxx | Content: xxx
【已知关系类型参考】
- duplicate: task 和 content 语义完全相同,无新增信息
- subset: task语义一致,新知识的content信息完全被某条已有知识覆盖
- superset: task语义一致,新知识包含某条已有知识的全部信息,且有额外内容
- conflict: 同一 task 下给出相互矛盾的结论
- complement: 描述同一 task 的不同方面,互补
- none: task 语义不同,或无实质关系(task 不同时必须判定为 none,只有 task 语义一致才可能存在其他关系)
**重要**:如果以上类型无法准确描述关系,你可以自定义新的关系类型(英文小写下划线命名),并自行判断新知识应该 approved 还是 rejected。
【输出格式】(严格 JSON,不要其他内容)
{{
"final_decision": "approved",
"relations": [
{{
"old_id": "knowledge-xxx",
"type": "superset",
"reverse_type": "subset"
}}
]
}}
"""
实现位置:knowhub/server.py:KnowledgeProcessor._apply_decision
final_decision: "approved" 或 "rejected"
relations: 关系列表
{"type": type, "target": old_id},同时更新关系缓存表{"type": reverse_type, "target": new_id}POST /api/knowledge
→ 生成 embedding
→ 插入 Milvus (status=pending)
→ 立即返回 {"status": "pending", "knowledge_id": "..."}
→ background_tasks.add_task(processor.process_pending) ← 非阻塞触发
KnowledgeProcessor(后台处理器)
→ 查询所有 status=pending 的知识(每批50条)
→ 逐条处理:pending → processing → approved/rejected
→ asyncio.Lock 防止并发
定时兜底(每60秒)
→ asyncio.create_task(_periodic_processor())
→ 检测超时的 processing 条目(>5分钟)并回滚到 pending
| 场景 | 处理方式 |
|---|---|
| LLM 调用失败 | 重试 2 次,仍失败则 status=approved(宁可放行,不丢数据) |
| LLM 输出无法解析 | 同上,fallback 到 approved |
| 处理超时(>5分钟) | 定时任务检测 processing 状态并回滚到 pending |
| 并发写入相同知识 | processing 状态作为乐观锁,第二个处理器跳过 |
| task 语义不相关(score < 0.75) | 预过滤直接排除,不进入 LLM 判断,视为 none |
| 接口 | 变化 |
|---|---|
POST /api/knowledge |
插入 status=pending,触发后台任务,立即返回 pending 状态 |
POST /api/extract |
批量插入时每条 status=pending,插入后触发后台任务 |
POST /api/knowledge/slim |
重建知识时显式传入 status=approved,跳过去重(已精炼知识) |
GET /api/knowledge |
追加 status in ["approved", "checked"] 过滤 |
GET /api/knowledge/search |
追加 status in ["approved", "checked"] 过滤 |
POST /api/knowledge/migrate |
新增:手动触发 schema 迁移(中转 collection 模式),返回迁移条数 |
GET /api/knowledge/pending |
新增:查询待处理队列 |
POST /api/knowledge/process |
新增:手动触发处理,force=true 可回滚卡死的 processing 条目 |
GET /api/knowledge/status/{id} |
新增:查询单条知识的处理状态和关系 |
{
"status": "pending",
"knowledge_id": "knowledge-20260317-143022-a1b2",
"message": "知识已入队,正在处理去重..."
}
migrate_knowledge.py:历史数据迁移,迁移的是已存在的知识,插入时显式传入 status="approved",relationships=[],跳过去重流程。
# status 过滤(高效,建议加 Trie 索引)
'status == "approved"'
'status == "pending" or status == "processing"'
# relationships 非空(Milvus 2.3+ JSON 查询)
'json_length(relationships) > 0'
正向查询(从知识 A 查询它的所有关系):直接读取 A 的 relationships 字段,O(1)。
反向查询(查询"哪些知识与 A 有 conflict 关系"):通过关系缓存表实现,无需全表扫描。
复杂查询(查询"所有存在 conflict 关系的知识对"):直接读取关系缓存表的 conflict 字段。
| 查询类型 | 方案 | 性能 |
|---|---|---|
| status 过滤 | Milvus Trie 索引 | 极快 |
| 向量召回 + status 过滤 | HNSW + 标量过滤 | 快(现有机制) |
| relationships 正向读取 | 直接读 JSON 字段 | O(1) |
| relationships 反向/复杂查询 | 关系缓存表 | O(1) |
| 文件 | 修改内容 |
|---|---|
knowhub/vector_store.py |
新增 status/relationships 字段;更新所有 output_fields;为 status 添加 Trie 索引 |
knowhub/server.py |
新增 KnowledgeProcessor 类(~200行);改造 save_knowledge / extract_knowledge_from_messages;改造 list/search 追加 status 过滤;新增 3 个接口;更新 KnowledgeIn 模型;实现关系缓存表管理 |
migrate_knowledge.py |
插入时显式传入 status="approved",relationships=[] |
Phase 1 — Schema 扩展(knowhub/vector_store.py)
Phase 2 — 处理器核心逻辑(knowhub/server.py)
KnowledgeProcessor 类_llm_judge_relations 方法(使用上面的 Prompt)_apply_decision 方法(写入 status 和 relationships,同步更新关系缓存表)lifespan 中初始化处理器实例 + 启动定时任务Phase 3 — API 改造(knowhub/server.py)
POST /api/knowledge:status=pending,触发后台任务GET /api/knowledge 和 GET /api/knowledge/search:追加 status 过滤Milvus Lite 不支持 ALTER COLLECTION 和 rename_collection,采用软兼容 + 手动触发迁移接口策略:
.get("status", "approved") / .get("relationships", []) or [] 兼容旧数据,旧数据被视为 approved,不影响检索和去重逻辑POST /api/knowledge/migrate):采用"中转 collection"模式(Milvus Lite 不支持 rename):
knowledge_migration(新 schema)knowledge 逐条读取,补 status="approved", relationships=[],插入 knowledge_migrationknowledgeknowledge(新 schema,空)knowledge_migration 逐条读取,插入 knowledgeknowledge_migrationself.collection 引用实现位置:knowhub/vector_store.py:MilvusStore.migrate_schema