elksmmx 11 часов назад
Родитель
Сommit
9e3b43a2ed
3 измененных файлов с 602 добавлено и 17 удалено
  1. 437 11
      knowhub/server.py
  2. 163 6
      knowhub/vector_store.py
  3. 2 0
      migrate_knowledge.py

+ 437 - 11
knowhub/server.py

@@ -19,7 +19,7 @@ from typing import Optional, List
 from pathlib import Path
 from cryptography.hazmat.primitives.ciphers.aead import AESGCM
 
-from fastapi import FastAPI, HTTPException, Query, Header, Body
+from fastapi import FastAPI, HTTPException, Query, Header, Body, BackgroundTasks
 from fastapi.responses import HTMLResponse
 from pydantic import BaseModel, Field
 
@@ -215,6 +215,14 @@ def init_db():
         )
     """)
 
+    conn.execute("""
+        CREATE TABLE IF NOT EXISTS relation_cache (
+            id   INTEGER PRIMARY KEY CHECK(id = 1),
+            data TEXT NOT NULL DEFAULT '{}'
+        )
+    """)
+    conn.execute("INSERT OR IGNORE INTO relation_cache(id, data) VALUES(1, '{}')")
+
     conn.commit()
     conn.close()
 
@@ -323,11 +331,290 @@ class ResourceOut(BaseModel):
     next: Optional[ResourceNode] = None
 
 
+# --- Dedup: Globals & Prompt ---
+
+knowledge_processor: Optional["KnowledgeProcessor"] = None
+
+DEDUP_RELATION_PROMPT = """你是知识库管理专家。请判断【新知识】与【相似知识列表】中每条知识的关系。
+
+【新知识】
+Task: {new_task}
+Content: {new_content}
+
+【相似知识列表】(向量召回 top-10,按相似度排序)
+{existing_list}
+格式: [序号] ID: xxx | Task: xxx | Content: xxx
+
+【关系类型定义】
+- duplicate: task 和 content 语义完全相同,无新增信息 → 新知识应 rejected
+- subset: task语义一致,新知识的content信息完全被某条已有知识覆盖 → 新知识应 rejected
+- superset: task语义一致,新知识包含某条已有知识的全部信息,且有额外内容 → 新知识应 approved
+- conflict: 同一 task 下给出相互矛盾的结论 → 新知识应 approved
+- complement: 描述同一 task 的不同方面,互补 → 新知识应 approved
+- none: task 语义不同,或无实质关系 → 新知识应 approved,不写入 relations
+
+【判断步骤】
+第一步:逐条比较新知识的 task 与列表中每条知识的 task 语义是否一致。
+- task 语义一致 = 两者描述的是同一个问题或目标(即使措辞不同)
+- task 语义不同 = 描述的是不同的问题、不同的工具、不同的场景
+- 如果 task 语义不同,该条关系直接判定为 none,**不再看 content**
+- 只有 task 语义一致时,才进入第二步比较 content
+
+第二步:对 task 语义一致的知识,比较 content,判断具体关系类型(duplicate/subset/superset/conflict/complement)。
+
+**规则**:
+1. 如果以上类型无法准确描述,可自定义关系类型(英文小写下划线),并自行决定 approved/rejected
+2. final_decision 为 rejected 时,relations 中必须至少有一条关系说明拒绝原因(type 不能为 none)
+
+【输出格式】(严格 JSON,不要其他内容)
+
+示例1 - 无关知识(task 不同):
+{{
+  "final_decision": "approved",
+  "relations": []
+}}
+
+示例2 - 重复知识:
+{{
+  "final_decision": "rejected",
+  "relations": [
+    {{
+      "old_id": "knowledge-xxx",
+      "type": "duplicate",
+      "reverse_type": "duplicate"
+    }}
+  ]
+}}
+
+示例3 - 互补知识:
+{{
+  "final_decision": "approved",
+  "relations": [
+    {{
+      "old_id": "knowledge-xxx",
+      "type": "complement",
+      "reverse_type": "complement"
+    }}
+  ]
+}}
+
+"""
+
+
+# --- Dedup: RelationCache ---
+
+class RelationCache:
+    """关系缓存,存储在 SQLite relation_cache 表(单行 JSON)"""
+
+    def load(self) -> dict:
+        conn = get_db()
+        try:
+            row = conn.execute("SELECT data FROM relation_cache WHERE id=1").fetchone()
+            return json.loads(row["data"]) if row else {}
+        finally:
+            conn.close()
+
+    def save(self, cache: dict):
+        conn = get_db()
+        try:
+            conn.execute("UPDATE relation_cache SET data=? WHERE id=1", (json.dumps(cache),))
+            conn.commit()
+        finally:
+            conn.close()
+
+    def add_relation(self, relation_type: str, knowledge_id: str):
+        cache = self.load()
+        if relation_type not in cache:
+            cache[relation_type] = []
+        if knowledge_id not in cache[relation_type]:
+            cache[relation_type].append(knowledge_id)
+        self.save(cache)
+
+
+# --- Dedup: KnowledgeProcessor ---
+
+class KnowledgeProcessor:
+    def __init__(self):
+        self._lock = asyncio.Lock()
+        self._relation_cache = RelationCache()
+
+    async def process_pending(self):
+        """持续处理 pending 知识直到队列为空,有锁防并发"""
+        if self._lock.locked():
+            return
+        async with self._lock:
+            while True:
+                try:
+                    pending = milvus_store.query('status == "pending"', limit=50)
+                except Exception as e:
+                    print(f"[KnowledgeProcessor] 查询 pending 失败: {e}")
+                    break
+                if not pending:
+                    break
+                for knowledge in pending:
+                    await self._process_one(knowledge)
+
+    async def _process_one(self, knowledge: dict):
+        kid = knowledge["id"]
+        now = int(time.time())
+        # 乐观锁:pending → processing(时间戳存秒级)
+        try:
+            milvus_store.update(kid, {"status": "processing", "updated_at": now})
+        except Exception as e:
+            print(f"[KnowledgeProcessor] 锁定 {kid} 失败: {e}")
+            return
+        try:
+            # 向量召回 top-10(只召回 approved/checked)
+            embedding = knowledge.get("embedding")
+            if not embedding:
+                embedding = await get_embedding(knowledge["task"])
+            candidates = milvus_store.search(
+                query_embedding=embedding,
+                filters='(status == "approved" or status == "checked")',
+                limit=10
+            )
+            candidates = [c for c in candidates if c["id"] != kid]
+
+            if not candidates:
+                milvus_store.update(kid, {"status": "approved", "updated_at": now})
+                return
+
+            llm_result = await self._llm_judge_relations(knowledge, candidates)
+            await self._apply_decision(knowledge, llm_result)
+
+        except Exception as e:
+            print(f"[KnowledgeProcessor] 处理 {kid} 失败: {e},fallback 到 approved")
+            try:
+                milvus_store.update(kid, {"status": "approved", "updated_at": int(time.time())})
+            except Exception:
+                pass
+
+    async def _llm_judge_relations(self, new_knowledge: dict, candidates: list) -> dict:
+        existing_list = "\n".join([
+            f"[{i+1}] ID: {c['id']} | Task: {c['task']} | Content: {c['content'][:300]}"
+            for i, c in enumerate(candidates)
+        ])
+        prompt = DEDUP_RELATION_PROMPT.format(
+            new_task=new_knowledge["task"],
+            new_content=new_knowledge["content"],
+            existing_list=existing_list
+        )
+        for attempt in range(3):
+            try:
+                response = await openrouter_llm_call(
+                    messages=[{"role": "user", "content": prompt}],
+                    model="google/gemini-2.5-flash-lite"
+                )
+                content = response.get("content", "").strip()
+                # 清理 markdown 代码块
+                if "```" in content:
+                    parts = content.split("```")
+                    for part in parts:
+                        part = part.strip()
+                        if part.startswith("json"):
+                            part = part[4:].strip()
+                        try:
+                            result = json.loads(part)
+                            if "final_decision" in result:
+                                content = part
+                                break
+                        except Exception:
+                            continue
+                result = json.loads(content)
+                assert result.get("final_decision") in ("approved", "rejected")
+                return result
+            except Exception as e:
+                print(f"[LLM Judge] 第{attempt+1}次失败: {e}")
+                if attempt < 2:
+                    await asyncio.sleep(1)
+        return {"final_decision": "approved", "relations": []}
+
+    async def _apply_decision(self, new_knowledge: dict, llm_result: dict):
+        kid = new_knowledge["id"]
+        final_decision = llm_result.get("final_decision", "approved")
+        relations = llm_result.get("relations", [])
+        now = int(time.time())
+
+        # 强制规则:如果存在 duplicate 或 subset 关系,必须 rejected
+        if any(rel.get("type") in ("duplicate", "subset") for rel in relations):
+            final_decision = "rejected"
+
+        if final_decision == "rejected":
+            milvus_store.update(kid, {"status": "rejected", "updated_at": now})
+            for rel in relations:
+                if rel.get("type") in ("duplicate", "subset"):
+                    old_id = rel.get("old_id")
+                    if not old_id:
+                        continue
+                    try:
+                        old = milvus_store.get_by_id(old_id)
+                        if not old:
+                            continue
+                        eval_data = old.get("eval") or {}
+                        eval_data["helpful"] = eval_data.get("helpful", 0) + 1
+                        helpful_history = eval_data.get("helpful_history") or []
+                        helpful_history.append({
+                            "source": "dedup",
+                            "related_id": kid,
+                            "relation_type": rel["type"],
+                            "timestamp": now
+                        })
+                        eval_data["helpful_history"] = helpful_history
+                        milvus_store.update(old_id, {"eval": eval_data, "updated_at": now})
+                    except Exception as e:
+                        print(f"[Apply Decision] 更新旧知识 {old_id} helpful 失败: {e}")
+        else:
+            new_relationships = []
+            for rel in relations:
+                rel_type = rel.get("type", "none")
+                reverse_type = rel.get("reverse_type", "none")
+                old_id = rel.get("old_id")
+                if not old_id or rel_type == "none":
+                    continue
+                new_relationships.append({"type": rel_type, "target": old_id})
+                self._relation_cache.add_relation(rel_type, kid)
+                self._relation_cache.add_relation(rel_type, old_id)
+                if reverse_type and reverse_type != "none":
+                    try:
+                        old = milvus_store.get_by_id(old_id)
+                        if old:
+                            old_rels = old.get("relationships") or []
+                            old_rels.append({"type": reverse_type, "target": kid})
+                            milvus_store.update(old_id, {"relationships": json.dumps(old_rels), "updated_at": now})
+                            self._relation_cache.add_relation(reverse_type, old_id)
+                            self._relation_cache.add_relation(reverse_type, kid)
+                    except Exception as e:
+                        print(f"[Apply Decision] 更新旧知识关系 {old_id} 失败: {e}")
+            milvus_store.update(kid, {
+                "status": "approved",
+                "relationships": json.dumps(new_relationships),
+                "updated_at": now
+            })
+
+
+async def _periodic_processor():
+    """每60秒检测超时 processing 条目(>5分钟)并回滚到 pending"""
+    while True:
+        await asyncio.sleep(60)
+        try:
+            timeout_threshold = int(time.time()) - 300
+            processing = milvus_store.query('status == "processing"', limit=200)
+            for item in processing:
+                updated_at = item.get("updated_at", 0) or 0
+                # query 返回的 updated_at 已乘以 1000(毫秒),转回秒
+                updated_at_sec = updated_at // 1000 if updated_at > 1_000_000_000_000 else updated_at
+                if updated_at_sec < timeout_threshold:
+                    print(f"[Periodic] 回滚超时 processing: {item['id']}")
+                    milvus_store.update(item["id"], {"status": "pending", "updated_at": int(time.time())})
+        except Exception as e:
+            print(f"[Periodic] 定时任务错误: {e}")
+
+
 # --- App ---
 
 @asynccontextmanager
 async def lifespan(app: FastAPI):
-    global milvus_store
+    global milvus_store, knowledge_processor
 
     # 初始化 SQLite(resources)
     init_db()
@@ -335,9 +622,18 @@ async def lifespan(app: FastAPI):
     # 初始化 Milvus Lite(knowledge)
     milvus_store = MilvusStore(data_dir=str(MILVUS_DATA_DIR))
 
+    # 初始化去重处理器 + 启动定时兜底任务
+    knowledge_processor = KnowledgeProcessor()
+    periodic_task = asyncio.create_task(_periodic_processor())
+
     yield
 
-    # 清理(Milvus Lite 会自动处理)
+    # 清理
+    periodic_task.cancel()
+    try:
+        await periodic_task
+    except asyncio.CancelledError:
+        pass
 
 
 app = FastAPI(title=BRAND_NAME, lifespan=lifespan)
@@ -623,6 +919,9 @@ async def search_knowledge_api(
         # 添加 min_score 过滤
         filters.append(f'eval["score"] >= {min_score}')
 
+        # 只搜索 approved 和 checked 的知识
+        filters.append('(status == "approved" or status == "checked")')
+
         filter_expr = ' and '.join(filters) if filters else None
 
         # 3. 向量召回(3*k 个候选)
@@ -658,7 +957,7 @@ async def search_knowledge_api(
 
 
 @app.post("/api/knowledge", status_code=201)
-async def save_knowledge(knowledge: KnowledgeIn):
+async def save_knowledge(knowledge: KnowledgeIn, background_tasks: BackgroundTasks):
     """保存新知识"""
     try:
         # 生成 ID
@@ -715,6 +1014,8 @@ async def save_knowledge(knowledge: KnowledgeIn):
             "eval": eval_data,
             "created_at": now,
             "updated_at": now,
+            "status": "pending",
+            "relationships": json.dumps([]),
         }
 
         print(f"[Save Knowledge] 插入数据: {json.dumps({k: v for k, v in insert_data.items() if k != 'embedding'}, ensure_ascii=False)}")
@@ -722,7 +1023,10 @@ async def save_knowledge(knowledge: KnowledgeIn):
         # 插入 Milvus
         milvus_store.insert(insert_data)
 
-        return {"status": "ok", "knowledge_id": knowledge_id}
+        # 触发后台去重处理
+        background_tasks.add_task(knowledge_processor.process_pending)
+
+        return {"status": "pending", "knowledge_id": knowledge_id, "message": "知识已入队,正在处理去重..."}
 
     except Exception as e:
         print(f"[Save Knowledge] 错误: {e}")
@@ -736,7 +1040,8 @@ def list_knowledge(
     types: Optional[str] = None,
     scopes: Optional[str] = None,
     owner: Optional[str] = None,
-    tags: Optional[str] = None
+    tags: Optional[str] = None,
+    status: Optional[str] = None
 ):
     """列出知识(支持后端筛选和分页)"""
     try:
@@ -767,6 +1072,11 @@ def list_knowledge(
             for t in tag_list:
                 filters.append(f'array_contains(tag_keys, "{t}")')
 
+        # 只返回指定 status 的知识(默认 approved 和 checked)
+        status_list = [s.strip() for s in (status or "approved,checked").split(',') if s.strip()]
+        status_conditions = ' or '.join([f'status == "{s}"' for s in status_list])
+        filters.append(f'({status_conditions})')
+
         # 如果没有过滤条件,查询所有
         filter_expr = ' and '.join(filters) if filters else 'id != ""'
 
@@ -826,12 +1136,72 @@ def get_all_tags():
         raise HTTPException(status_code=500, detail=str(e))
 
 
+@app.get("/api/knowledge/pending")
+def get_pending_knowledge(limit: int = Query(default=50, ge=1, le=200)):
+    """查询待处理队列(pending + processing)"""
+    try:
+        pending = milvus_store.query('status == "pending" or status == "processing"', limit=limit)
+        serialized = [serialize_milvus_result(r) for r in pending]
+        return {"results": serialized, "count": len(serialized)}
+    except Exception as e:
+        print(f"[Pending] 错误: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+@app.post("/api/knowledge/process")
+async def trigger_process(force: bool = Query(default=False)):
+    """手动触发去重处理。force=true 时先回滚所有 processing → pending"""
+    try:
+        if force:
+            processing = milvus_store.query('status == "processing"', limit=200)
+            for item in processing:
+                milvus_store.update(item["id"], {"status": "pending", "updated_at": int(time.time())})
+            print(f"[Manual Process] 回滚 {len(processing)} 条 processing")
+        asyncio.create_task(knowledge_processor.process_pending())
+        return {"status": "ok", "message": "处理任务已触发"}
+    except Exception as e:
+        print(f"[Manual Process] 错误: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+@app.post("/api/knowledge/migrate")
+async def migrate_knowledge_schema():
+    """手动触发 schema 迁移(中转 collection 模式,将旧数据升级到含 status/relationships 的新 schema)"""
+    try:
+        count = milvus_store.migrate_schema()
+        return {"status": "ok", "migrated": count, "message": f"迁移完成,共迁移 {count} 条知识"}
+    except Exception as e:
+        print(f"[Migrate] 错误: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+@app.get("/api/knowledge/status/{knowledge_id}")
+def get_knowledge_status(knowledge_id: str):
+    """查询单条知识的处理状态和关系"""
+    try:
+        result = milvus_store.get_by_id(knowledge_id)
+        if not result:
+            raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
+        serialized = serialize_milvus_result(result)
+        return {
+            "id": knowledge_id,
+            "status": serialized.get("status", "approved"),
+            "relationships": serialized.get("relationships", []),
+            "created_at": serialized.get("created_at"),
+            "updated_at": serialized.get("updated_at"),
+        }
+    except HTTPException:
+        raise
+    except Exception as e:
+        print(f"[Knowledge Status] 错误: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+
 @app.get("/api/knowledge/{knowledge_id}")
 def get_knowledge(knowledge_id: str):
     """获取单条知识"""
     try:
         result = milvus_store.get_by_id(knowledge_id)
-
         if not result:
             raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
 
@@ -1268,7 +1638,9 @@ REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
                 "source": source,
                 "eval": eval_data,
                 "created_at": now,
-                "updated_at": now
+                "updated_at": now,
+                "status": "approved",
+                "relationships": json.dumps([])
             })
 
         milvus_store.insert_batch(knowledge_list)
@@ -1288,7 +1660,7 @@ REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
 
 
 @app.post("/api/extract")
-async def extract_knowledge_from_messages(extract_req: MessageExtractIn):
+async def extract_knowledge_from_messages(extract_req: MessageExtractIn, background_tasks: BackgroundTasks):
     """从消息历史中提取知识(LLM 分析)"""
     if not extract_req.submitted_by:
         raise HTTPException(status_code=400, detail="submitted_by is required")
@@ -1430,12 +1802,15 @@ async def extract_knowledge_from_messages(extract_req: MessageExtractIn):
                 "eval": eval_data,
                 "created_at": now,
                 "updated_at": now,
+                "status": "pending",
+                "relationships": json.dumps([]),
             })
             knowledge_ids.append(knowledge_id)
 
         # 批量插入
         if knowledge_list:
             milvus_store.insert_batch(knowledge_list)
+            background_tasks.add_task(knowledge_processor.process_pending)
 
         print(f"[Extract] 成功提取并保存 {len(knowledge_ids)} 条知识")
 
@@ -1500,7 +1875,7 @@ def frontend():
 
         <!-- 筛选栏 -->
         <div class="bg-white rounded-lg shadow p-6 mb-6">
-            <div class="grid grid-cols-1 md:grid-cols-4 gap-4">
+            <div class="grid grid-cols-1 md:grid-cols-5 gap-4">
                 <div>
                     <label class="block text-sm font-medium text-gray-700 mb-2">类型 (Types)</label>
                     <div class="space-y-2">
@@ -1526,6 +1901,15 @@ def frontend():
                     <label class="block text-sm font-medium text-gray-700 mb-2">Scopes</label>
                     <input type="text" id="scopesFilter" placeholder="输入 scope" class="w-full border rounded px-3 py-2">
                 </div>
+                <div>
+                    <label class="block text-sm font-medium text-gray-700 mb-2">Status</label>
+                    <div class="space-y-2">
+                        <label class="flex items-center"><input type="checkbox" value="approved" class="mr-2 status-filter" checked> Approved</label>
+                        <label class="flex items-center"><input type="checkbox" value="checked" class="mr-2 status-filter" checked> Checked</label>
+                        <label class="flex items-center"><input type="checkbox" value="rejected" class="mr-2 status-filter"> Rejected</label>
+                        <label class="flex items-center"><input type="checkbox" value="pending" class="mr-2 status-filter"> Pending</label>
+                    </div>
+                </div>
             </div>
             <button onclick="applyFilters()" class="mt-4 bg-gray-600 hover:bg-gray-700 text-white px-4 py-2 rounded">
                 应用筛选
@@ -1584,6 +1968,10 @@ def frontend():
                     <label class="block text-sm font-medium mb-1">Owner</label>
                     <input type="text" id="ownerInput" class="w-full border rounded px-3 py-2">
                 </div>
+                <div id="relationshipsSection" class="hidden">
+                    <label class="block text-sm font-medium text-gray-700 mb-2">关联知识</label>
+                    <div id="relationshipsList" class="space-y-1 text-sm bg-gray-50 rounded p-3"></div>
+                </div>
                 <div class="flex gap-2 pt-4">
                     <button type="submit" class="bg-blue-600 hover:bg-blue-700 text-white px-6 py-2 rounded">保存</button>
                     <button type="button" onclick="closeModal()" class="bg-gray-300 hover:bg-gray-400 px-6 py-2 rounded">取消</button>
@@ -1645,6 +2033,11 @@ def frontend():
                 params.append('scopes', scopesFilter);
             }
 
+            const selectedStatus = Array.from(document.querySelectorAll('.status-filter:checked')).map(el => el.value);
+            if (selectedStatus.length > 0) {
+                params.append('status', selectedStatus.join(','));
+            }
+
             try {
                 const res = await fetch(`/api/knowledge?${params.toString()}`);
                 if (!res.ok) {
@@ -1777,6 +2170,15 @@ def frontend():
                 }
                 const eval_data = k.eval || {};
                 const isChecked = selectedIds.has(k.id);
+                const statusColor = {
+                    'approved': 'bg-green-100 text-green-800',
+                    'checked':  'bg-blue-100 text-blue-800',
+                    'rejected': 'bg-red-100 text-red-800',
+                    'pending':  'bg-yellow-100 text-yellow-800',
+                    'processing': 'bg-orange-100 text-orange-800',
+                };
+                const statusClass = statusColor[k.status] || 'bg-gray-100 text-gray-800';
+                const statusLabel = k.status || 'approved';
 
                 return `
                 <div class="bg-white rounded-lg shadow p-6 hover:shadow-lg transition relative">
@@ -1790,7 +2192,10 @@ def frontend():
                             <div class="flex gap-2 flex-wrap">
                                 ${types.map(t => `<span class="bg-blue-100 text-blue-800 text-xs px-2 py-1 rounded">${t}</span>`).join('')}
                             </div>
-                            <span class="text-sm text-gray-500">${eval_data.score || 3}/5</span>
+                            <div class="flex items-center gap-2">
+                                <span class="text-xs px-2 py-1 rounded ${statusClass}">${statusLabel}</span>
+                                <span class="text-sm text-gray-500">${eval_data.score || 3}/5</span>
+                            </div>
                         </div>
                         <h3 class="text-lg font-semibold text-gray-800 mb-2">${escapeHtml(k.task)}</h3>
                         <p class="text-sm text-gray-600 mb-2">${escapeHtml(k.content.substring(0, 150))}${k.content.length > 150 ? '...' : ''}</p>
@@ -1901,6 +2306,27 @@ def frontend():
                 el.checked = types.includes(el.value);
             });
 
+            // 填充 relationships
+            const rels = Array.isArray(k.relationships) ? k.relationships : [];
+            const section = document.getElementById('relationshipsSection');
+            if (rels.length > 0) {
+                const typeColor = {
+                    superset: 'text-green-700', subset: 'text-orange-600',
+                    conflict: 'text-red-600', complement: 'text-blue-600',
+                    duplicate: 'text-gray-500'
+                };
+                document.getElementById('relationshipsList').innerHTML = rels.map(r =>
+                    `<div class="flex gap-2 items-center">
+                        <span class="font-medium ${typeColor[r.type] || 'text-gray-700'}">[${r.type}]</span>
+                        <span class="font-mono text-xs text-gray-500 cursor-pointer hover:underline"
+                              onclick="openEditModal('${r.target}')">${r.target}</span>
+                    </div>`
+                ).join('');
+                section.classList.remove('hidden');
+            } else {
+                section.classList.add('hidden');
+            }
+
             document.getElementById('modal').classList.remove('hidden');
         }
 

+ 163 - 6
knowhub/vector_store.py

@@ -99,6 +99,10 @@ class MilvusStore:
                 FieldSchema(name="eval", dtype=DataType.JSON),
                 FieldSchema(name="created_at", dtype=DataType.INT64),
                 FieldSchema(name="updated_at", dtype=DataType.INT64),
+                FieldSchema(name="status", dtype=DataType.VARCHAR,
+                           max_length=20, default_value="approved"),
+                FieldSchema(name="relationships", dtype=DataType.VARCHAR,
+                           max_length=10000, default_value="[]"),
             ]
 
             schema = CollectionSchema(fields, description="KnowHub Knowledge")
@@ -112,6 +116,12 @@ class MilvusStore:
             }
             self.collection.create_index("embedding", index_params)
 
+            # 为 status 创建 Trie 标量索引(加速过滤)
+            try:
+                self.collection.create_index("status", {"index_type": "Trie"})
+            except Exception:
+                pass
+
         self.collection.load()
 
     def insert(self, knowledge: Dict):
@@ -161,7 +171,8 @@ class MilvusStore:
             expr=filters,
             output_fields=["id", "message_id", "task", "content", "types",
                           "tags", "tag_keys", "scopes", "owner", "resource_ids",
-                          "source", "eval", "created_at", "updated_at"]
+                          "source", "eval", "created_at", "updated_at",
+                          "status", "relationships"]
         )
 
         if not results or not results[0]:
@@ -185,6 +196,8 @@ class MilvusStore:
                 "eval": hit.entity.get("eval"),
                 "created_at": hit.entity.get("created_at") * 1000 if hit.entity.get("created_at") else None,
                 "updated_at": hit.entity.get("updated_at") * 1000 if hit.entity.get("updated_at") else None,
+                "status": hit.entity.get("status", "approved"),
+                "relationships": json.loads(hit.entity.get("relationships") or "[]"),
             }
             for hit in results[0]
         ]
@@ -204,7 +217,8 @@ class MilvusStore:
             expr=filters,
             output_fields=["id", "message_id", "task", "content", "types",
                           "tags", "tag_keys", "scopes", "owner", "resource_ids",
-                          "source", "eval", "created_at", "updated_at"],
+                          "source", "eval", "created_at", "updated_at",
+                          "status", "relationships"],
             limit=limit
         )
 
@@ -223,6 +237,13 @@ class MilvusStore:
                 r["scopes"] = list(r["scopes"])
             if r.get("resource_ids") and not isinstance(r["resource_ids"], list):
                 r["resource_ids"] = list(r["resource_ids"])
+            # 兼容旧数据(无 status/relationships 字段)
+            if "status" not in r:
+                r["status"] = "approved"
+            if "relationships" not in r or r["relationships"] is None:
+                r["relationships"] = []
+            else:
+                r["relationships"] = json.loads(r["relationships"]) if isinstance(r["relationships"], str) else r["relationships"]
 
         return results
 
@@ -238,9 +259,10 @@ class MilvusStore:
         """
         results = self.collection.query(
             expr=f'id == "{knowledge_id}"',
-            output_fields=["id", "message_id", "task", "content", "types",
+            output_fields=["id", "embedding", "message_id", "task", "content", "types",
                           "tags", "tag_keys", "scopes", "owner", "resource_ids",
-                          "source", "eval", "created_at", "updated_at"]
+                          "source", "eval", "created_at", "updated_at",
+                          "status", "relationships"]
         )
 
         if not results:
@@ -260,6 +282,13 @@ class MilvusStore:
             r["scopes"] = list(r["scopes"])
         if r.get("resource_ids") and not isinstance(r["resource_ids"], list):
             r["resource_ids"] = list(r["resource_ids"])
+        # 兼容旧数据
+        if "status" not in r:
+            r["status"] = "approved"
+        if "relationships" not in r or r["relationships"] is None:
+            r["relationships"] = []
+        else:
+            r["relationships"] = json.loads(r["relationships"]) if isinstance(r["relationships"], str) else r["relationships"]
 
         return r
 
@@ -280,10 +309,18 @@ class MilvusStore:
         existing.update(updates)
         existing["updated_at"] = int(time.time())
 
-        # 3. 删除旧数据
+        # 3. 还原 get_by_id 的展示层转换,确保存储格式正确
+        # created_at 被 get_by_id 乘以 1000(毫秒),需还原为秒
+        if existing.get("created_at") and existing["created_at"] > 1_000_000_000_000:
+            existing["created_at"] = existing["created_at"] // 1000
+        # relationships 被 get_by_id 反序列化为 list,需还原为 JSON 字符串
+        if isinstance(existing.get("relationships"), list):
+            existing["relationships"] = json.dumps(existing["relationships"])
+
+        # 4. 删除旧数据
         self.delete(knowledge_id)
 
-        # 4. 插入新数据
+        # 5. 插入新数据
         self.insert(existing)
 
     def delete(self, knowledge_id: str):
@@ -303,3 +340,123 @@ class MilvusStore:
     def drop_collection(self):
         """删除 collection(危险操作)"""
         utility.drop_collection("knowledge")
+
+    def migrate_schema(self) -> int:
+        """
+        将旧 collection(无 status/relationships 字段)迁移到新 schema。
+        采用中转 collection 模式,Step 3 之前数据始终有两份副本。
+        返回迁移的知识条数。
+        """
+        MIGRATION_NAME = "knowledge_migration"
+        MAIN_NAME = "knowledge"
+
+        # 如果中转 collection 已存在(上次迁移中途失败),先清理
+        if utility.has_collection(MIGRATION_NAME):
+            print(f"[Migrate] 检测到残留中转 collection,清理...")
+            utility.drop_collection(MIGRATION_NAME)
+
+        # Step 1: 创建中转 collection(新 schema)
+        print(f"[Migrate] Step 1: 创建中转 collection {MIGRATION_NAME}...")
+        fields = [
+            FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=100, is_primary=True),
+            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),
+            FieldSchema(name="message_id", dtype=DataType.VARCHAR, max_length=100),
+            FieldSchema(name="task", dtype=DataType.VARCHAR, max_length=2000),
+            FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=50000),
+            FieldSchema(name="types", dtype=DataType.ARRAY, element_type=DataType.VARCHAR, max_capacity=20, max_length=50),
+            FieldSchema(name="tags", dtype=DataType.JSON),
+            FieldSchema(name="tag_keys", dtype=DataType.ARRAY, element_type=DataType.VARCHAR, max_capacity=50, max_length=100),
+            FieldSchema(name="scopes", dtype=DataType.ARRAY, element_type=DataType.VARCHAR, max_capacity=20, max_length=100),
+            FieldSchema(name="owner", dtype=DataType.VARCHAR, max_length=200),
+            FieldSchema(name="resource_ids", dtype=DataType.ARRAY, element_type=DataType.VARCHAR, max_capacity=50, max_length=200),
+            FieldSchema(name="source", dtype=DataType.JSON),
+            FieldSchema(name="eval", dtype=DataType.JSON),
+            FieldSchema(name="created_at", dtype=DataType.INT64),
+            FieldSchema(name="updated_at", dtype=DataType.INT64),
+            FieldSchema(name="status", dtype=DataType.VARCHAR, max_length=20, default_value="approved"),
+            FieldSchema(name="relationships", dtype=DataType.VARCHAR, max_length=10000, default_value="[]"),
+        ]
+        schema = CollectionSchema(fields, description="KnowHub Knowledge")
+        migration_col = Collection(MIGRATION_NAME, schema)
+        migration_col.create_index("embedding", {"metric_type": "COSINE", "index_type": "HNSW", "params": {"M": 16, "efConstruction": 200}})
+        try:
+            migration_col.create_index("status", {"index_type": "Trie"})
+        except Exception:
+            pass
+        migration_col.load()
+
+        # Step 2: 从旧 collection 逐批读取,补字段,插入中转
+        print(f"[Migrate] Step 2: 读取旧数据并插入中转 collection...")
+        batch_size = 200
+        offset = 0
+        total = 0
+        while True:
+            batch = self.collection.query(
+                expr='id != ""',
+                output_fields=["id", "embedding", "message_id", "task", "content", "types",
+                               "tags", "tag_keys", "scopes", "owner", "resource_ids",
+                               "source", "eval", "created_at", "updated_at"],
+                limit=batch_size,
+                offset=offset
+            )
+            if not batch:
+                break
+            for item in batch:
+                item["status"] = item.get("status", "approved")
+                item["relationships"] = item.get("relationships") or []
+                # 时间戳已是秒级(query 返回原始值,未乘 1000)
+            migration_col.insert(batch)
+            migration_col.flush()
+            total += len(batch)
+            offset += len(batch)
+            print(f"[Migrate] 已迁移 {total} 条...")
+            if len(batch) < batch_size:
+                break
+
+        # Step 3: drop 旧 collection
+        print(f"[Migrate] Step 3: drop 旧 collection {MAIN_NAME}...")
+        self.collection.release()
+        utility.drop_collection(MAIN_NAME)
+
+        # Step 4: 创建新 collection(同名,新 schema)
+        print(f"[Migrate] Step 4: 创建新 collection {MAIN_NAME}...")
+        new_col = Collection(MAIN_NAME, schema)
+        new_col.create_index("embedding", {"metric_type": "COSINE", "index_type": "HNSW", "params": {"M": 16, "efConstruction": 200}})
+        try:
+            new_col.create_index("status", {"index_type": "Trie"})
+        except Exception:
+            pass
+        new_col.load()
+
+        # Step 5: 从中转 collection 读取,插入新 collection
+        print(f"[Migrate] Step 5: 从中转 collection 回写到新 collection...")
+        offset = 0
+        while True:
+            batch = migration_col.query(
+                expr='id != ""',
+                output_fields=["id", "embedding", "message_id", "task", "content", "types",
+                               "tags", "tag_keys", "scopes", "owner", "resource_ids",
+                               "source", "eval", "created_at", "updated_at",
+                               "status", "relationships"],
+                limit=batch_size,
+                offset=offset
+            )
+            if not batch:
+                break
+            new_col.insert(batch)
+            new_col.flush()
+            offset += len(batch)
+            if len(batch) < batch_size:
+                break
+
+        # Step 6: drop 中转 collection
+        print(f"[Migrate] Step 6: drop 中转 collection {MIGRATION_NAME}...")
+        migration_col.release()
+        utility.drop_collection(MIGRATION_NAME)
+
+        # Step 7: 更新 self.collection 引用
+        print(f"[Migrate] Step 7: 更新 collection 引用...")
+        self.collection = new_col
+
+        print(f"[Migrate] 迁移完成,共迁移 {total} 条知识。")
+        return total

+ 2 - 0
migrate_knowledge.py

@@ -106,6 +106,8 @@ async def migrate_knowledge():
                 "eval": eval_data,
                 "created_at": created_at,
                 "updated_at": updated_at,
+                "status": "approved",
+                "relationships": [],
             })
 
             # 收集 task 用于生成 embedding(只基于 task)