""" KnowHub Server Agent 工具使用经验的共享平台。 FastAPI + Milvus Lite(知识)+ SQLite(资源),单文件部署。 """ import os import re import json import sqlite3 import asyncio import base64 import time import uuid from contextlib import asynccontextmanager from datetime import datetime, timezone from typing import Optional from pathlib import Path from cryptography.hazmat.primitives.ciphers.aead import AESGCM from fastapi import FastAPI, HTTPException, Query, Header from fastapi.responses import HTMLResponse from pydantic import BaseModel, Field # 导入 LLM 调用(需要 agent 模块在 Python path 中) import sys sys.path.insert(0, str(Path(__file__).parent.parent)) # 加载环境变量 from dotenv import load_dotenv load_dotenv(Path(__file__).parent.parent / ".env") from agent.llm.openrouter import openrouter_llm_call # 导入向量存储和 embedding from knowhub.vector_store import MilvusStore from knowhub.embeddings import get_embedding, get_embeddings_batch BRAND_NAME = os.getenv("BRAND_NAME", "KnowHub") BRAND_API_ENV = os.getenv("BRAND_API_ENV", "KNOWHUB_API") BRAND_DB = os.getenv("BRAND_DB", "knowhub.db") # 组织密钥配置(格式:org1:key1_base64,org2:key2_base64) ORG_KEYS_RAW = os.getenv("ORG_KEYS", "") ORG_KEYS = {} if ORG_KEYS_RAW: for pair in ORG_KEYS_RAW.split(","): if ":" in pair: org, key_b64 = pair.split(":", 1) ORG_KEYS[org.strip()] = key_b64.strip() DB_PATH = Path(__file__).parent / BRAND_DB MILVUS_DATA_DIR = Path(__file__).parent / "milvus_data" # 全局 Milvus 存储实例 milvus_store: Optional[MilvusStore] = None # --- 数据库 --- def get_db() -> sqlite3.Connection: conn = sqlite3.connect(str(DB_PATH)) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") return conn # --- 加密/解密 --- def get_org_key(resource_id: str) -> Optional[bytes]: """从content_id提取组织前缀,返回对应密钥""" if "/" in resource_id: org = resource_id.split("/")[0] if org in ORG_KEYS: return base64.b64decode(ORG_KEYS[org]) return None def encrypt_content(resource_id: str, plaintext: str) -> str: """加密内容,返回格式:encrypted:AES256-GCM:{base64_data}""" if not plaintext: return "" key = get_org_key(resource_id) if not key: # 没有配置密钥,明文存储(不推荐) return plaintext aesgcm = AESGCM(key) nonce = os.urandom(12) # 96-bit nonce ciphertext = aesgcm.encrypt(nonce, plaintext.encode("utf-8"), None) # 组合 nonce + ciphertext encrypted_data = nonce + ciphertext encoded = base64.b64encode(encrypted_data).decode("ascii") return f"encrypted:AES256-GCM:{encoded}" def decrypt_content(resource_id: str, encrypted_text: str, provided_key: Optional[str] = None) -> str: """解密内容,如果没有提供密钥或密钥错误,返回[ENCRYPTED]""" if not encrypted_text: return "" if not encrypted_text.startswith("encrypted:AES256-GCM:"): # 未加密的内容,直接返回 return encrypted_text # 提取加密数据 encoded = encrypted_text.split(":", 2)[2] encrypted_data = base64.b64decode(encoded) nonce = encrypted_data[:12] ciphertext = encrypted_data[12:] # 获取密钥 key = None if provided_key: # 使用提供的密钥 try: key = base64.b64decode(provided_key) except Exception: return "[ENCRYPTED]" else: # 从配置中获取 key = get_org_key(resource_id) if not key: return "[ENCRYPTED]" try: aesgcm = AESGCM(key) plaintext = aesgcm.decrypt(nonce, ciphertext, None) return plaintext.decode("utf-8") except Exception: return "[ENCRYPTED]" def init_db(): """初始化 SQLite(仅用于 resources)""" conn = get_db() conn.execute(""" CREATE TABLE IF NOT EXISTS experiences ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, url TEXT DEFAULT '', category TEXT DEFAULT '', task TEXT NOT NULL, score INTEGER CHECK(score BETWEEN 1 AND 5), outcome TEXT DEFAULT '', tips TEXT DEFAULT '', content_id TEXT DEFAULT '', submitted_by TEXT DEFAULT '', created_at TEXT NOT NULL ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_name ON experiences(name)") conn.execute(""" CREATE TABLE IF NOT EXISTS resources ( id TEXT PRIMARY KEY, title TEXT DEFAULT '', body TEXT NOT NULL, secure_body TEXT DEFAULT '', content_type TEXT DEFAULT 'text', metadata TEXT DEFAULT '{}', sort_order INTEGER DEFAULT 0, submitted_by TEXT DEFAULT '', created_at TEXT NOT NULL, updated_at TEXT DEFAULT '' ) """) conn.commit() conn.close() # --- Models --- class ResourceIn(BaseModel): id: str title: str = "" body: str secure_body: str = "" content_type: str = "text" # text|code|credential|cookie metadata: dict = {} sort_order: int = 0 submitted_by: str = "" class ResourcePatchIn(BaseModel): """PATCH /api/resource/{id} 请求体""" title: Optional[str] = None body: Optional[str] = None secure_body: Optional[str] = None content_type: Optional[str] = None metadata: Optional[dict] = None # Knowledge Models class KnowledgeIn(BaseModel): task: str content: str types: list[str] = ["strategy"] tags: dict = {} scopes: list[str] = ["org:cybertogether"] owner: str = "" message_id: str = "" resource_ids: list[str] = [] source: dict = {} # {name, category, urls, agent_id, submitted_by, timestamp} eval: dict = {} # {score, helpful, harmful, confidence} class KnowledgeOut(BaseModel): id: str message_id: str types: list[str] task: str tags: dict scopes: list[str] owner: str content: str resource_ids: list[str] source: dict eval: dict created_at: str updated_at: str class KnowledgeUpdateIn(BaseModel): add_helpful_case: Optional[dict] = None add_harmful_case: Optional[dict] = None update_score: Optional[int] = Field(default=None, ge=1, le=5) evolve_feedback: Optional[str] = None class KnowledgePatchIn(BaseModel): """PATCH /api/knowledge/{id} 请求体(直接字段编辑)""" task: Optional[str] = None content: Optional[str] = None types: Optional[list[str]] = None tags: Optional[dict] = None scopes: Optional[list[str]] = None owner: Optional[str] = None class MessageExtractIn(BaseModel): """POST /api/extract 请求体(消息历史提取)""" messages: list[dict] # [{role: str, content: str}, ...] agent_id: str = "unknown" submitted_by: str # 必填,作为 owner session_key: str = "" class KnowledgeBatchUpdateIn(BaseModel): feedback_list: list[dict] class KnowledgeSearchResponse(BaseModel): results: list[dict] count: int class ResourceNode(BaseModel): id: str title: str class ResourceOut(BaseModel): id: str title: str body: str secure_body: str = "" content_type: str = "text" metadata: dict = {} toc: Optional[ResourceNode] = None children: list[ResourceNode] prev: Optional[ResourceNode] = None next: Optional[ResourceNode] = None # --- App --- @asynccontextmanager async def lifespan(app: FastAPI): global milvus_store # 初始化 SQLite(resources) init_db() # 初始化 Milvus Lite(knowledge) milvus_store = MilvusStore(data_dir=str(MILVUS_DATA_DIR)) yield # 清理(Milvus Lite 会自动处理) app = FastAPI(title=BRAND_NAME, lifespan=lifespan) # --- Knowledge API --- @app.post("/api/resource", status_code=201) def submit_resource(resource: ResourceIn): conn = get_db() try: now = datetime.now(timezone.utc).isoformat() # 加密敏感内容 encrypted_secure_body = encrypt_content(resource.id, resource.secure_body) conn.execute( "INSERT OR REPLACE INTO resources" "(id, title, body, secure_body, content_type, metadata, sort_order, submitted_by, created_at, updated_at)" " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ( resource.id, resource.title, resource.body, encrypted_secure_body, resource.content_type, json.dumps(resource.metadata), resource.sort_order, resource.submitted_by, now, now, ), ) conn.commit() return {"status": "ok", "id": resource.id} finally: conn.close() @app.get("/api/resource/{resource_id:path}", response_model=ResourceOut) def get_resource(resource_id: str, x_org_key: Optional[str] = Header(None)): conn = get_db() try: row = conn.execute( "SELECT id, title, body, secure_body, content_type, metadata, sort_order FROM resources WHERE id = ?", (resource_id,), ).fetchone() if not row: raise HTTPException(status_code=404, detail=f"Resource not found: {resource_id}") # 解密敏感内容 secure_body = decrypt_content(resource_id, row["secure_body"] or "", x_org_key) # 解析metadata metadata = json.loads(row["metadata"] or "{}") # 计算导航上下文 root_id = resource_id.split("/")[0] if "/" in resource_id else resource_id # TOC (根节点) toc = None if "/" in resource_id: toc_row = conn.execute( "SELECT id, title FROM resources WHERE id = ?", (root_id,), ).fetchone() if toc_row: toc = ResourceNode(id=toc_row["id"], title=toc_row["title"]) # Children (子节点) children = [] children_rows = conn.execute( "SELECT id, title FROM resources WHERE id LIKE ? AND id != ? ORDER BY sort_order", (f"{resource_id}/%", resource_id), ).fetchall() children = [ResourceNode(id=r["id"], title=r["title"]) for r in children_rows] # Prev/Next (同级节点) prev_node = None next_node = None if "/" in resource_id: siblings = conn.execute( "SELECT id, title, sort_order FROM resources WHERE id LIKE ? AND id NOT LIKE ? ORDER BY sort_order", (f"{root_id}/%", f"{root_id}/%/%"), ).fetchall() for i, sib in enumerate(siblings): if sib["id"] == resource_id: if i > 0: prev_node = ResourceNode(id=siblings[i-1]["id"], title=siblings[i-1]["title"]) if i < len(siblings) - 1: next_node = ResourceNode(id=siblings[i+1]["id"], title=siblings[i+1]["title"]) break return ResourceOut( id=row["id"], title=row["title"], body=row["body"], secure_body=secure_body, content_type=row["content_type"], metadata=metadata, toc=toc, children=children, prev=prev_node, next=next_node, ) finally: conn.close() @app.patch("/api/resource/{resource_id:path}") def patch_resource(resource_id: str, patch: ResourcePatchIn): """更新resource字段""" conn = get_db() try: # 检查是否存在 row = conn.execute("SELECT id FROM resources WHERE id = ?", (resource_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail=f"Resource not found: {resource_id}") # 构建更新语句 updates = [] params = [] if patch.title is not None: updates.append("title = ?") params.append(patch.title) if patch.body is not None: updates.append("body = ?") params.append(patch.body) if patch.secure_body is not None: encrypted = encrypt_content(resource_id, patch.secure_body) updates.append("secure_body = ?") params.append(encrypted) if patch.content_type is not None: updates.append("content_type = ?") params.append(patch.content_type) if patch.metadata is not None: updates.append("metadata = ?") params.append(json.dumps(patch.metadata)) if not updates: return {"status": "ok", "message": "No fields to update"} # 添加updated_at updates.append("updated_at = ?") params.append(datetime.now(timezone.utc).isoformat()) # 执行更新 params.append(resource_id) sql = f"UPDATE resources SET {', '.join(updates)} WHERE id = ?" conn.execute(sql, params) conn.commit() return {"status": "ok", "id": resource_id} finally: conn.close() @app.get("/api/resource") def list_resources( content_type: Optional[str] = Query(None), limit: int = Query(100, ge=1, le=1000) ): """列出所有resource""" conn = get_db() try: sql = "SELECT id, title, content_type, metadata, created_at FROM resources" params = [] if content_type: sql += " WHERE content_type = ?" params.append(content_type) sql += " ORDER BY id LIMIT ?" params.append(limit) rows = conn.execute(sql, params).fetchall() results = [] for row in rows: results.append({ "id": row["id"], "title": row["title"], "content_type": row["content_type"], "metadata": json.loads(row["metadata"] or "{}"), "created_at": row["created_at"], }) return {"results": results, "count": len(results)} finally: conn.close() # --- Knowledge API --- # ===== Knowledge API ===== async def _llm_rerank(query: str, candidates: list[dict], top_k: int) -> list[str]: """ 使用 LLM 对候选知识进行精排 Args: query: 查询文本 candidates: 候选知识列表 top_k: 返回数量 Returns: 排序后的知识 ID 列表 """ if not candidates: return [] # 构造 prompt candidates_text = "\n".join([ f"[{i+1}] ID: {c['id']}\nTask: {c['task']}\nContent: {c['content'][:200]}..." for i, c in enumerate(candidates) ]) prompt = f"""你是知识检索专家。根据用户查询,从候选知识中选出最相关的 {top_k} 条。 用户查询:"{query}" 候选知识: {candidates_text} 请输出最相关的 {top_k} 个知识 ID,按相关性从高到低排序,用逗号分隔。 只输出 ID,不要其他内容。""" try: response = await openrouter_llm_call( messages=[{"role": "user", "content": prompt}], model="google/gemini-2.5-flash-lite" ) content = response.get("content", "").strip() # 解析 ID 列表 selected_ids = [ idx.strip() for idx in re.split(r'[,\s]+', content) if idx.strip().startswith(("knowledge-", "research-")) ] return selected_ids[:top_k] except Exception as e: print(f"[LLM Rerank] 失败: {e}") return [] @app.get("/api/knowledge/search") async def search_knowledge_api( q: str = Query(..., description="查询文本"), top_k: int = Query(default=5, ge=1, le=20), min_score: int = Query(default=3, ge=1, le=5), types: Optional[str] = None, owner: Optional[str] = None ): """检索知识(向量召回 + LLM 精排)""" try: # 1. 生成查询向量 query_embedding = await get_embedding(q) # 2. 构建过滤表达式 filters = [] if types: type_list = [t.strip() for t in types.split(',') if t.strip()] for t in type_list: filters.append(f'JSON_CONTAINS(types, "{t}")') if owner: filters.append(f'owner == "{owner}"') # 添加 min_score 过滤 filters.append(f'JSON_EXTRACT(eval, "$.score") >= {min_score}') filter_expr = ' and '.join(filters) if filters else None # 3. 向量召回(3*k 个候选) recall_limit = top_k * 3 candidates = milvus_store.search( query_embedding=query_embedding, filters=filter_expr, limit=recall_limit ) if not candidates: return {"results": [], "count": 0, "reranked": False} # 4. LLM 精排 reranked_ids = await _llm_rerank(q, candidates, top_k) if reranked_ids: # 按 LLM 排序返回 id_to_candidate = {c["id"]: c for c in candidates} results = [id_to_candidate[id] for id in reranked_ids if id in id_to_candidate] return {"results": results, "count": len(results), "reranked": True} else: # Fallback:直接返回向量召回的 top k print(f"[Knowledge Search] LLM 精排失败,fallback 到向量 top-{top_k}") return {"results": candidates[:top_k], "count": len(candidates[:top_k]), "reranked": False} except Exception as e: print(f"[Knowledge Search] 错误: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/knowledge", status_code=201) async def save_knowledge(knowledge: KnowledgeIn): """保存新知识""" try: # 生成 ID timestamp = datetime.now().strftime('%Y%m%d-%H%M%S') random_suffix = uuid.uuid4().hex[:4] knowledge_id = f"knowledge-{timestamp}-{random_suffix}" now = int(time.time()) # 设置默认值 owner = knowledge.owner or f"agent:{knowledge.source.get('agent_id', 'unknown')}" # 准备 source source = { "name": knowledge.source.get("name", ""), "category": knowledge.source.get("category", ""), "urls": knowledge.source.get("urls", []), "agent_id": knowledge.source.get("agent_id", "unknown"), "submitted_by": knowledge.source.get("submitted_by", ""), "timestamp": datetime.now(timezone.utc).isoformat(), "message_id": knowledge.message_id } # 准备 eval eval_data = { "score": knowledge.eval.get("score", 3), "helpful": knowledge.eval.get("helpful", 1), "harmful": knowledge.eval.get("harmful", 0), "confidence": knowledge.eval.get("confidence", 0.5), "helpful_history": [], "harmful_history": [] } # 生成向量 text = f"{knowledge.task}\n{knowledge.content}" embedding = await get_embedding(text) # 插入 Milvus milvus_store.insert({ "id": knowledge_id, "embedding": embedding, "message_id": knowledge.message_id, "task": knowledge.task, "content": knowledge.content, "types": knowledge.types, "tags": knowledge.tags, "scopes": knowledge.scopes, "owner": owner, "resource_ids": knowledge.resource_ids, "source": source, "eval": eval_data, "created_at": now, "updated_at": now, }) return {"status": "ok", "knowledge_id": knowledge_id} except Exception as e: print(f"[Save Knowledge] 错误: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/knowledge") def list_knowledge( limit: int = Query(default=100, ge=1, le=1000), types: Optional[str] = None, scopes: Optional[str] = None, owner: Optional[str] = None, tags: Optional[str] = None ): """列出知识(支持后端筛选)""" try: # 构建过滤表达式 filters = [] # types 支持多个,用 AND 连接(交集:必须同时包含所有选中的type) if types: type_list = [t.strip() for t in types.split(',') if t.strip()] for t in type_list: filters.append(f'JSON_CONTAINS(types, "{t}")') if scopes: filters.append(f'JSON_CONTAINS(scopes, "{scopes}")') if owner: filters.append(f'owner like "%{owner}%"') # tags 支持多个,用 AND 连接(交集:必须同时包含所有选中的tag) if tags: tag_list = [t.strip() for t in tags.split(',') if t.strip()] for t in tag_list: filters.append(f'JSON_CONTAINS_ANY(tags, ["{t}"])') # 如果没有过滤条件,查询所有 filter_expr = ' and '.join(filters) if filters else 'id != ""' # 查询 Milvus results = milvus_store.query(filter_expr, limit=limit) return {"results": results, "count": len(results)} except Exception as e: print(f"[List Knowledge] 错误: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/knowledge/meta/tags") def get_all_tags(): """获取所有已有的 tags""" try: # 查询所有知识 results = milvus_store.query('id != ""', limit=10000) all_tags = set() for item in results: tags_dict = item.get("tags", {}) if isinstance(tags_dict, dict): for key in tags_dict.keys(): all_tags.add(key) return {"tags": sorted(list(all_tags))} except Exception as e: print(f"[Get Tags] 错误: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/knowledge/{knowledge_id}") def get_knowledge(knowledge_id: str): """获取单条知识""" try: result = milvus_store.get_by_id(knowledge_id) if not result: raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}") return result except HTTPException: raise except Exception as e: print(f"[Get Knowledge] 错误: {e}") raise HTTPException(status_code=500, detail=str(e)) async def _evolve_knowledge_with_llm(old_content: str, feedback: str) -> str: """使用 LLM 进行知识进化重写""" prompt = f"""你是一个 AI Agent 知识库管理员。请根据反馈建议,对现有的知识内容进行重写进化。 【原知识内容】: {old_content} 【实战反馈建议】: {feedback} 【重写要求】: 1. 融合知识:将反馈中的避坑指南、新参数或修正后的选择逻辑融入原知识,使其更具通用性和准确性。 2. 保持结构:如果原内容有特定格式(如 Markdown、代码示例等),请保持该格式。 3. 语言:简洁直接,使用中文。 4. 禁止:严禁输出任何开场白、解释语或额外的 Markdown 标题,直接返回重写后的正文。 """ try: response = await openrouter_llm_call( messages=[{"role": "user", "content": prompt}], model="google/gemini-2.5-flash-lite" ) evolved = response.get("content", "").strip() if len(evolved) < 5: raise ValueError("LLM output too short") return evolved except Exception as e: print(f"知识进化失败,采用追加模式回退: {e}") return f"{old_content}\n\n---\n[Update {datetime.now().strftime('%Y-%m-%d')}]: {feedback}" @app.put("/api/knowledge/{knowledge_id}") async def update_knowledge(knowledge_id: str, update: KnowledgeUpdateIn): """更新知识评估,支持知识进化""" try: # 获取现有知识 existing = milvus_store.get_by_id(knowledge_id) if not existing: raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}") eval_data = existing.get("eval", {}) # 更新评分 if update.update_score is not None: eval_data["score"] = update.update_score # 添加有效案例 if update.add_helpful_case: eval_data["helpful"] = eval_data.get("helpful", 0) + 1 if "helpful_history" not in eval_data: eval_data["helpful_history"] = [] eval_data["helpful_history"].append(update.add_helpful_case) # 添加有害案例 if update.add_harmful_case: eval_data["harmful"] = eval_data.get("harmful", 0) + 1 if "harmful_history" not in eval_data: eval_data["harmful_history"] = [] eval_data["harmful_history"].append(update.add_harmful_case) # 知识进化 content = existing["content"] need_reembed = False if update.evolve_feedback: content = await _evolve_knowledge_with_llm(content, update.evolve_feedback) eval_data["helpful"] = eval_data.get("helpful", 0) + 1 need_reembed = True # 准备更新数据 updates = { "content": content, "eval": eval_data, } # 如果内容变化,重新生成向量 if need_reembed: text = f"{existing['task']}\n{content}" embedding = await get_embedding(text) updates["embedding"] = embedding # 更新 Milvus milvus_store.update(knowledge_id, updates) return {"status": "ok", "knowledge_id": knowledge_id} except HTTPException: raise except Exception as e: print(f"[Update Knowledge] 错误: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.patch("/api/knowledge/{knowledge_id}") async def patch_knowledge(knowledge_id: str, patch: KnowledgePatchIn): """直接编辑知识字段""" try: # 获取现有知识 existing = milvus_store.get_by_id(knowledge_id) if not existing: raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}") updates = {} need_reembed = False if patch.task is not None: updates["task"] = patch.task need_reembed = True if patch.content is not None: updates["content"] = patch.content need_reembed = True if patch.types is not None: updates["types"] = patch.types if patch.tags is not None: updates["tags"] = patch.tags if patch.scopes is not None: updates["scopes"] = patch.scopes if patch.owner is not None: updates["owner"] = patch.owner if not updates: return {"status": "ok", "knowledge_id": knowledge_id} # 如果 task 或 content 变化,重新生成向量 if need_reembed: task = updates.get("task", existing["task"]) content = updates.get("content", existing["content"]) text = f"{task}\n{content}" embedding = await get_embedding(text) updates["embedding"] = embedding # 更新 Milvus milvus_store.update(knowledge_id, updates) return {"status": "ok", "knowledge_id": knowledge_id} except HTTPException: raise except Exception as e: print(f"[Patch Knowledge] 错误: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/knowledge/batch_update") async def batch_update_knowledge(batch: KnowledgeBatchUpdateIn): """批量反馈知识有效性""" if not batch.feedback_list: return {"status": "ok", "updated": 0} try: # 先处理无需进化的,收集需要进化的 evolution_tasks = [] # [(knowledge_id, old_content, feedback, eval_data)] simple_updates = [] # [(knowledge_id, is_effective, eval_data)] for item in batch.feedback_list: knowledge_id = item.get("knowledge_id") is_effective = item.get("is_effective") feedback = item.get("feedback", "") if not knowledge_id: continue existing = milvus_store.get_by_id(knowledge_id) if not existing: continue eval_data = existing.get("eval", {}) if is_effective and feedback: evolution_tasks.append((knowledge_id, existing["content"], feedback, eval_data, existing["task"])) else: simple_updates.append((knowledge_id, is_effective, eval_data)) # 执行简单更新 for knowledge_id, is_effective, eval_data in simple_updates: if is_effective: eval_data["helpful"] = eval_data.get("helpful", 0) + 1 else: eval_data["harmful"] = eval_data.get("harmful", 0) + 1 milvus_store.update(knowledge_id, {"eval": eval_data}) # 并发执行知识进化 if evolution_tasks: print(f"🧬 并发处理 {len(evolution_tasks)} 条知识进化...") evolved_results = await asyncio.gather( *[_evolve_knowledge_with_llm(old, fb) for _, old, fb, _, _ in evolution_tasks] ) for (knowledge_id, _, _, eval_data, task), evolved_content in zip(evolution_tasks, evolved_results): eval_data["helpful"] = eval_data.get("helpful", 0) + 1 # 重新生成向量 text = f"{task}\n{evolved_content}" embedding = await get_embedding(text) milvus_store.update(knowledge_id, { "content": evolved_content, "eval": eval_data, "embedding": embedding }) return {"status": "ok", "updated": len(simple_updates) + len(evolution_tasks)} except Exception as e: print(f"[Batch Update] 错误: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/knowledge/slim") async def slim_knowledge(model: str = "google/gemini-2.5-flash-lite"): """知识库瘦身:合并语义相似知识""" try: # 获取所有知识 all_knowledge = milvus_store.query('id != ""', limit=10000) if len(all_knowledge) < 2: return {"status": "ok", "message": f"知识库仅有 {len(all_knowledge)} 条,无需瘦身"} # 构造发给大模型的内容 entries_text = "" for item in all_knowledge: eval_data = item.get("eval", {}) types = item.get("types", []) entries_text += f"[ID: {item['id']}] [Types: {','.join(types)}] " entries_text += f"[Helpful: {eval_data.get('helpful', 0)}, Harmful: {eval_data.get('harmful', 0)}] [Score: {eval_data.get('score', 3)}]\n" entries_text += f"Task: {item['task']}\n" entries_text += f"Content: {item['content'][:200]}...\n\n" prompt = f"""你是一个 AI Agent 知识库管理员。以下是当前知识库的全部条目,请执行瘦身操作: 【任务】: 1. 识别语义高度相似或重复的知识,将它们合并为一条更精炼、更通用的知识。 2. 合并时保留 helpful 最高的那条的 ID(helpful 取各条之和)。 3. 对于独立的、无重复的知识,保持原样不动。 【当前知识库】: {entries_text} 【输出格式要求】: 严格按以下格式输出每条知识,条目之间用 === 分隔: ID: <保留的id> TYPES: <逗号分隔的type列表> HELPFUL: <合并后的helpful计数> HARMFUL: <合并后的harmful计数> SCORE: <评分> TASK: <任务描述> CONTENT: <合并后的知识内容> === 最后输出合并报告: REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。 禁止输出任何开场白或解释。""" print(f"\n[知识瘦身] 正在调用 {model} 分析 {len(all_knowledge)} 条知识...") response = await openrouter_llm_call( messages=[{"role": "user", "content": prompt}], model=model ) content = response.get("content", "").strip() if not content: raise HTTPException(status_code=500, detail="LLM 返回为空") # 解析大模型输出 report_line = "" new_entries = [] blocks = [b.strip() for b in content.split("===") if b.strip()] for block in blocks: if block.startswith("REPORT:"): report_line = block continue lines = block.split("\n") kid, types, helpful, harmful, score, task, content_lines = None, [], 0, 0, 3, "", [] current_field = None for line in lines: if line.startswith("ID:"): kid = line[3:].strip() current_field = None elif line.startswith("TYPES:"): types_str = line[6:].strip() types = [t.strip() for t in types_str.split(",") if t.strip()] current_field = None elif line.startswith("HELPFUL:"): try: helpful = int(line[8:].strip()) except Exception: helpful = 0 current_field = None elif line.startswith("HARMFUL:"): try: harmful = int(line[8:].strip()) except Exception: harmful = 0 current_field = None elif line.startswith("SCORE:"): try: score = int(line[6:].strip()) except Exception: score = 3 current_field = None elif line.startswith("TASK:"): task = line[5:].strip() current_field = "task" elif line.startswith("CONTENT:"): content_lines.append(line[8:].strip()) current_field = "content" elif current_field == "task": task += "\n" + line elif current_field == "content": content_lines.append(line) if kid and content_lines: new_entries.append({ "id": kid, "types": types if types else ["strategy"], "helpful": helpful, "harmful": harmful, "score": score, "task": task.strip(), "content": "\n".join(content_lines).strip() }) if not new_entries: raise HTTPException(status_code=500, detail="解析大模型输出失败") # 生成向量并重建知识库 print(f"[知识瘦身] 正在为 {len(new_entries)} 条知识生成向量...") # 批量生成向量 texts = [f"{e['task']}\n{e['content']}" for e in new_entries] embeddings = await get_embeddings_batch(texts) # 清空并重建 now = int(time.time()) milvus_store.drop_collection() milvus_store._init_collection() knowledge_list = [] for e, embedding in zip(new_entries, embeddings): eval_data = { "score": e["score"], "helpful": e["helpful"], "harmful": e["harmful"], "confidence": 0.9, "helpful_history": [], "harmful_history": [] } source = { "name": "slim", "category": "exp", "urls": [], "agent_id": "slim", "submitted_by": "system", "timestamp": datetime.now(timezone.utc).isoformat() } knowledge_list.append({ "id": e["id"], "embedding": embedding, "message_id": "", "task": e["task"], "content": e["content"], "types": e["types"], "tags": {}, "scopes": ["org:cybertogether"], "owner": "agent:slim", "resource_ids": [], "source": source, "eval": eval_data, "created_at": now, "updated_at": now }) milvus_store.insert_batch(knowledge_list) result_msg = f"瘦身完成:{len(all_knowledge)} → {len(new_entries)} 条知识" if report_line: result_msg += f"\n{report_line}" print(f"[知识瘦身] {result_msg}") return {"status": "ok", "before": len(all_knowledge), "after": len(new_entries), "report": report_line} except HTTPException: raise except Exception as e: print(f"[Slim Knowledge] 错误: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/extract") async def extract_knowledge_from_messages(extract_req: MessageExtractIn): """从消息历史中提取知识(LLM 分析)""" if not extract_req.submitted_by: raise HTTPException(status_code=400, detail="submitted_by is required") messages = extract_req.messages if not messages or len(messages) == 0: return {"status": "ok", "extracted_count": 0, "knowledge_ids": []} # 构造消息历史文本 messages_text = "" for msg in messages: role = msg.get("role", "unknown") content = msg.get("content", "") messages_text += f"[{role}]: {content}\n\n" # LLM 提取知识 prompt = f"""你是一个知识提取专家。请从以下 Agent 对话历史中提取有价值的知识。 【对话历史】: {messages_text} 【提取要求】: 1. 识别对话中的关键知识点(工具使用经验、问题解决方案、最佳实践、踩坑经验等) 2. 每条知识必须包含: - task: 任务场景描述(在什么情况下,要完成什么目标) - content: 核心知识内容(具体可操作的方法、注意事项) - types: 知识类型(从 strategy/tool/user_profile/usecase/definition/plan 中选择) - score: 评分 1-5(根据知识的价值和可操作性) 3. 只提取有实际价值的知识,不要提取泛泛而谈的内容,一次就成功或比较简单的经验就不要记录了。 4. 如果没有值得提取的知识,返回空列表 【输出格式】: 严格按以下 JSON 格式输出,每条知识之间用逗号分隔: [ {{ "task": "任务场景描述", "content": "核心知识内容", "types": ["strategy"], "score": 4 }}, {{ "task": "另一个任务场景", "content": "另一个知识内容", "types": ["tool"], "score": 5 }} ] 如果没有知识,输出: [] **注意**:只记录经过多次尝试、或经过用户指导才成功的知识,一次就成功或比较简单的经验就不要记录了。 禁止输出任何解释或额外文本,只输出 JSON 数组。""" try: print(f"\n[Extract] 正在从 {len(messages)} 条消息中提取知识...") response = await openrouter_llm_call( messages=[{"role": "user", "content": prompt}], model="google/gemini-2.5-flash-lite" ) content = response.get("content", "").strip() # 尝试解析 JSON # 移除可能的 markdown 代码块标记 if content.startswith("```json"): content = content[7:] if content.startswith("```"): content = content[3:] if content.endswith("```"): content = content[:-3] content = content.strip() extracted_knowledge = json.loads(content) if not isinstance(extracted_knowledge, list): raise ValueError("LLM output is not a list") if not extracted_knowledge: return {"status": "ok", "extracted_count": 0, "knowledge_ids": []} # 批量生成向量 texts = [f"{item.get('task', '')}\n{item.get('content', '')}" for item in extracted_knowledge] embeddings = await get_embeddings_batch(texts) # 保存提取的知识 knowledge_ids = [] now = int(time.time()) knowledge_list = [] for item, embedding in zip(extracted_knowledge, embeddings): task = item.get("task", "") knowledge_content = item.get("content", "") types = item.get("types", ["strategy"]) score = item.get("score", 3) if not task or not knowledge_content: continue # 生成 ID timestamp = datetime.now().strftime('%Y%m%d-%H%M%S') random_suffix = uuid.uuid4().hex[:4] knowledge_id = f"knowledge-{timestamp}-{random_suffix}" # 准备数据 source = { "name": "message_extraction", "category": "exp", "urls": [], "agent_id": extract_req.agent_id, "submitted_by": extract_req.submitted_by, "timestamp": datetime.now(timezone.utc).isoformat(), "session_key": extract_req.session_key } eval_data = { "score": score, "helpful": 1, "harmful": 0, "confidence": 0.7, "helpful_history": [], "harmful_history": [] } knowledge_list.append({ "id": knowledge_id, "embedding": embedding, "message_id": "", "task": task, "content": knowledge_content, "types": types, "tags": {}, "scopes": ["org:cybertogether"], "owner": extract_req.submitted_by, "resource_ids": [], "source": source, "eval": eval_data, "created_at": now, "updated_at": now, }) knowledge_ids.append(knowledge_id) # 批量插入 if knowledge_list: milvus_store.insert_batch(knowledge_list) print(f"[Extract] 成功提取并保存 {len(knowledge_ids)} 条知识") return { "status": "ok", "extracted_count": len(knowledge_ids), "knowledge_ids": knowledge_ids } except json.JSONDecodeError as e: print(f"[Extract] JSON 解析失败: {e}") print(f"[Extract] LLM 输出: {content[:500]}") return {"status": "error", "error": "Failed to parse LLM output", "extracted_count": 0} except Exception as e: print(f"[Extract] 提取失败: {e}") return {"status": "error", "error": str(e), "extracted_count": 0} @app.get("/", response_class=HTMLResponse) def frontend(): """KnowHub 管理前端""" return """ KnowHub 管理

KnowHub 全局知识库

加载中...

""" if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=9999)