""" KnowHub Server Agent 工具使用经验的共享平台。 FastAPI + SQLite,单文件部署。 """ import os import re import json import sqlite3 import asyncio from contextlib import asynccontextmanager from datetime import datetime, timezone from typing import Optional from pathlib import Path from fastapi import FastAPI, HTTPException, Query from pydantic import BaseModel, Field # 导入 LLM 调用(需要 agent 模块在 Python path 中) import sys sys.path.insert(0, str(Path(__file__).parent.parent)) from agent.llm.openrouter import openrouter_llm_call BRAND_NAME = os.getenv("BRAND_NAME", "KnowHub") BRAND_API_ENV = os.getenv("BRAND_API_ENV", "KNOWHUB_API") BRAND_DB = os.getenv("BRAND_DB", "knowhub.db") DB_PATH = Path(__file__).parent / BRAND_DB # --- 数据库 --- def get_db() -> sqlite3.Connection: conn = sqlite3.connect(str(DB_PATH)) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") return conn def init_db(): conn = get_db() conn.execute(""" CREATE TABLE IF NOT EXISTS experiences ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, url TEXT DEFAULT '', category TEXT DEFAULT '', task TEXT NOT NULL, score INTEGER CHECK(score BETWEEN 1 AND 5), outcome TEXT DEFAULT '', tips TEXT DEFAULT '', content_id TEXT DEFAULT '', submitted_by TEXT DEFAULT '', created_at TEXT NOT NULL ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_name ON experiences(name)") conn.execute(""" CREATE TABLE IF NOT EXISTS contents ( id TEXT PRIMARY KEY, title TEXT DEFAULT '', body TEXT NOT NULL, sort_order INTEGER DEFAULT 0, submitted_by TEXT DEFAULT '', created_at TEXT NOT NULL ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS knowledge ( id TEXT PRIMARY KEY, message_id TEXT DEFAULT '', tags_type TEXT NOT NULL, scenario TEXT NOT NULL, content TEXT NOT NULL, source_urls TEXT DEFAULT '', source_agent_id TEXT DEFAULT '', source_timestamp TEXT NOT NULL, eval_score INTEGER DEFAULT 3 CHECK(eval_score BETWEEN 1 AND 5), eval_helpful INTEGER DEFAULT 0, eval_harmful INTEGER DEFAULT 0, eval_helpful_history TEXT DEFAULT '[]', eval_harmful_history TEXT DEFAULT '[]', metrics_helpful INTEGER DEFAULT 1, metrics_harmful INTEGER DEFAULT 0, created_at TEXT NOT NULL, updated_at TEXT DEFAULT '' ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_knowledge_tags ON knowledge(tags_type)") conn.execute("CREATE INDEX IF NOT EXISTS idx_knowledge_scenario ON knowledge(scenario)") conn.commit() conn.close() # --- Models --- class ExperienceIn(BaseModel): name: str url: str = "" category: str = "" task: str score: int = Field(ge=1, le=5) outcome: str = "" tips: str = "" content_id: str = "" submitted_by: str = "" class ExperienceOut(BaseModel): task: str score: int outcome: str tips: str content_id: str submitted_by: str created_at: str class ResourceResult(BaseModel): name: str url: str relevant_experiences: list[ExperienceOut] avg_score: float experience_count: int class SearchResponse(BaseModel): results: list[ResourceResult] class ResourceDetailResponse(BaseModel): name: str url: str category: str avg_score: float experience_count: int experiences: list[ExperienceOut] class ContentIn(BaseModel): id: str title: str = "" body: str sort_order: int = 0 submitted_by: str = "" # Knowledge Models class KnowledgeIn(BaseModel): scenario: str content: str tags_type: list[str] urls: list[str] = [] agent_id: str = "research_agent" score: int = Field(default=3, ge=1, le=5) message_id: str = "" class KnowledgeOut(BaseModel): id: str message_id: str tags: dict scenario: str content: str source: dict eval: dict metrics: dict created_at: str updated_at: str class KnowledgeUpdateIn(BaseModel): add_helpful_case: Optional[dict] = None add_harmful_case: Optional[dict] = None update_score: Optional[int] = Field(default=None, ge=1, le=5) evolve_feedback: Optional[str] = None class KnowledgeBatchUpdateIn(BaseModel): feedback_list: list[dict] class KnowledgeSearchResponse(BaseModel): results: list[dict] count: int class ContentNode(BaseModel): id: str title: str class ContentOut(BaseModel): id: str title: str body: str toc: Optional[ContentNode] = None children: list[ContentNode] prev: Optional[ContentNode] = None next: Optional[ContentNode] = None # --- App --- @asynccontextmanager async def lifespan(app: FastAPI): init_db() yield app = FastAPI(title=BRAND_NAME, lifespan=lifespan) def _search_rows(conn: sqlite3.Connection, q: str, category: Optional[str]) -> list[sqlite3.Row]: """LIKE 搜索,拆词后 AND 连接,匹配 task + tips + outcome + name""" terms = q.split() if not terms: return [] conditions = [] params: list[str] = [] for term in terms: like = f"%{term}%" conditions.append( "(task LIKE ? OR tips LIKE ? OR outcome LIKE ? OR name LIKE ?)" ) params.extend([like, like, like, like]) if category: conditions.append("category = ?") params.append(category) sql = ( "SELECT name, url, category, task, score, outcome, tips, content_id, " "submitted_by, created_at FROM experiences WHERE " + " AND ".join(conditions) + " ORDER BY created_at DESC" ) return conn.execute(sql, params).fetchall() def _group_by_resource(rows: list[sqlite3.Row], limit: int) -> list[ResourceResult]: """按 name 分组并聚合""" groups: dict[str, list[sqlite3.Row]] = {} for row in rows: name = row["name"] if name not in groups: groups[name] = [] groups[name].append(row) results = [] for resource_name, resource_rows in groups.items(): scores = [r["score"] for r in resource_rows] avg = sum(scores) / len(scores) results.append(ResourceResult( name=resource_name, url=resource_rows[0]["url"], relevant_experiences=[ ExperienceOut( task=r["task"], score=r["score"], outcome=r["outcome"], tips=r["tips"], content_id=r["content_id"], submitted_by=r["submitted_by"], created_at=r["created_at"], ) for r in resource_rows ], avg_score=round(avg, 1), experience_count=len(resource_rows), )) results.sort(key=lambda r: r.avg_score * r.experience_count, reverse=True) return results[:limit] @app.get("/api/search", response_model=SearchResponse) def search_experiences( q: str = Query(..., min_length=1), category: Optional[str] = None, limit: int = Query(default=10, ge=1, le=50), ): conn = get_db() try: rows = _search_rows(conn, q, category) return SearchResponse(results=_group_by_resource(rows, limit)) finally: conn.close() @app.post("/api/experience", status_code=201) def submit_experience(exp: ExperienceIn): conn = get_db() try: now = datetime.now(timezone.utc).isoformat() conn.execute( "INSERT INTO experiences" "(name, url, category, task, score, outcome, tips, content_id, submitted_by, created_at)" " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", (exp.name, exp.url, exp.category, exp.task, exp.score, exp.outcome, exp.tips, exp.content_id, exp.submitted_by, now), ) conn.commit() return {"status": "ok"} finally: conn.close() @app.get("/api/resource/{name}", response_model=ResourceDetailResponse) def get_resource_experiences(name: str): conn = get_db() try: rows = conn.execute( "SELECT name, url, category, task, score, outcome, tips, content_id, " "submitted_by, created_at FROM experiences " "WHERE name = ? ORDER BY created_at DESC", (name,), ).fetchall() if not rows: raise HTTPException(status_code=404, detail=f"No experiences found for resource: {name}") scores = [r["score"] for r in rows] avg = sum(scores) / len(scores) return ResourceDetailResponse( name=name, url=rows[0]["url"], category=rows[0]["category"], avg_score=round(avg, 1), experience_count=len(rows), experiences=[ ExperienceOut( task=r["task"], score=r["score"], outcome=r["outcome"], tips=r["tips"], content_id=r["content_id"], submitted_by=r["submitted_by"], created_at=r["created_at"], ) for r in rows ], ) finally: conn.close() @app.post("/api/content", status_code=201) def submit_content(content: ContentIn): conn = get_db() try: now = datetime.now(timezone.utc).isoformat() conn.execute( "INSERT OR REPLACE INTO contents" "(id, title, body, sort_order, submitted_by, created_at)" " VALUES (?, ?, ?, ?, ?, ?)", (content.id, content.title, content.body, content.sort_order, content.submitted_by, now), ) conn.commit() return {"status": "ok"} finally: conn.close() @app.get("/api/content/{content_id:path}", response_model=ContentOut) def get_content(content_id: str): conn = get_db() try: row = conn.execute( "SELECT id, title, body, sort_order FROM contents WHERE id = ?", (content_id,), ).fetchone() if not row: raise HTTPException(status_code=404, detail=f"Content not found: {content_id}") # 计算导航上下文 root_id = content_id.split("/")[0] if "/" in content_id else content_id # TOC (根节点) toc = None if "/" in content_id: toc_row = conn.execute( "SELECT id, title FROM contents WHERE id = ?", (root_id,), ).fetchone() if toc_row: toc = ContentNode(id=toc_row["id"], title=toc_row["title"]) # Children (子节点) children = [] children_rows = conn.execute( "SELECT id, title FROM contents WHERE id LIKE ? AND id != ? ORDER BY sort_order", (f"{content_id}/%", content_id), ).fetchall() children = [ContentNode(id=r["id"], title=r["title"]) for r in children_rows] # Prev/Next (同级节点) prev_node = None next_node = None if "/" in content_id: siblings = conn.execute( "SELECT id, title, sort_order FROM contents WHERE id LIKE ? AND id NOT LIKE ? ORDER BY sort_order", (f"{root_id}/%", f"{root_id}/%/%"), ).fetchall() for i, sib in enumerate(siblings): if sib["id"] == content_id: if i > 0: prev_node = ContentNode(id=siblings[i-1]["id"], title=siblings[i-1]["title"]) if i < len(siblings) - 1: next_node = ContentNode(id=siblings[i+1]["id"], title=siblings[i+1]["title"]) break return ContentOut( id=row["id"], title=row["title"], body=row["body"], toc=toc, children=children, prev=prev_node, next=next_node, ) finally: conn.close() # ===== Knowledge API ===== # 两阶段检索逻辑 async def _route_knowledge_by_llm(query_text: str, metadata_list: list[dict], k: int = 5) -> list[str]: """ 第一阶段:语义路由。 让 LLM 挑选出 2*k 个语义相关的 ID。 """ if not metadata_list: return [] routing_k = k * 2 routing_data = [ { "id": m["id"], "tags": m["tags"], "scenario": m["scenario"][:100] } for m in metadata_list ] prompt = f""" 你是一个知识检索专家。根据用户的当前任务需求,从下列原子知识元数据中挑选出最相关的最多 {routing_k} 个知识 ID。 任务需求:"{query_text}" 可选知识列表: {json.dumps(routing_data, ensure_ascii=False, indent=1)} 请直接输出 ID 列表,用逗号分隔(例如: knowledge-20260302-001, research-20260302-002)。若无相关项请输出 "None"。 """ try: print(f"\n[Step 1: 知识语义路由] 任务: '{query_text}' | 候选总数: {len(metadata_list)} | 目标提取数: {routing_k}") response = await openrouter_llm_call( messages=[{"role": "user", "content": prompt}], model="google/gemini-2.0-flash-001" ) content = response.get("content", "").strip() selected_ids = [idx.strip() for idx in re.split(r'[,\s]+', content) if idx.strip().startswith(("knowledge-", "research-"))] print(f"[Step 1: 知识语义路由] LLM 初选 ID ({len(selected_ids)}个): {selected_ids}") return selected_ids except Exception as e: print(f"LLM 知识路由失败: {e}") return [] async def _search_knowledge_two_stage( query_text: str, top_k: int = 5, min_score: int = 3, tags_filter: Optional[list[str]] = None, conn: sqlite3.Connection = None ) -> list[dict]: """ 两阶段检索:语义路由 + 质量精排 """ if conn is None: conn = get_db() should_close = True else: should_close = False try: # 阶段 1: 解析所有知识 query = "SELECT * FROM knowledge" rows = conn.execute(query).fetchall() if not rows: return [] content_map = {} metadata_list = [] for row in rows: kid = row["id"] tags_type = row["tags_type"].split(",") if row["tags_type"] else [] # 标签过滤 if tags_filter: if not any(tag in tags_type for tag in tags_filter): continue scenario = row["scenario"] content_text = row["content"] meta_item = { "id": kid, "tags": {"type": tags_type}, "scenario": scenario, "score": row["eval_score"], "helpful": row["metrics_helpful"], "harmful": row["metrics_harmful"], } metadata_list.append(meta_item) content_map[kid] = { "scenario": scenario, "content": content_text, "tags": {"type": tags_type}, "score": meta_item["score"], "helpful": meta_item["helpful"], "harmful": meta_item["harmful"], "message_id": row["message_id"], "source": { "urls": row["source_urls"].split(",") if row["source_urls"] else [], "agent_id": row["source_agent_id"], "timestamp": row["source_timestamp"] }, "created_at": row["created_at"], "updated_at": row["updated_at"] } if not metadata_list: return [] # 阶段 2: 语义路由 (取 2*k) candidate_ids = await _route_knowledge_by_llm(query_text, metadata_list, k=top_k) # 阶段 3: 质量精排 print(f"[Step 2: 知识质量精排] 正在根据评分和反馈进行打分...") scored_items = [] for kid in candidate_ids: if kid in content_map: item = content_map[kid] score = item["score"] helpful = item["helpful"] harmful = item["harmful"] # 计算综合分:基础分 + helpful - harmful*2 quality_score = score + helpful - (harmful * 2.0) # 过滤门槛 if score < min_score or quality_score < 0: print(f" - 剔除低质量知识: {kid} (Score: {score}, Helpful: {helpful}, Harmful: {harmful})") continue scored_items.append({ "id": kid, "message_id": item["message_id"], "scenario": item["scenario"], "content": item["content"], "tags": item["tags"], "score": score, "quality_score": quality_score, "metrics": { "helpful": helpful, "harmful": harmful }, "source": item["source"], "created_at": item["created_at"], "updated_at": item["updated_at"] }) # 按照质量分排序 final_sorted = sorted(scored_items, key=lambda x: x["quality_score"], reverse=True) # 截取最终的 top_k result = final_sorted[:top_k] print(f"[Step 2: 知识质量精排] 最终选定知识: {[it['id'] for it in result]}") print(f"[Knowledge System] 检索结束。\n") return result finally: if should_close: conn.close() @app.get("/api/knowledge/search") async def search_knowledge_api( q: str = Query(..., description="查询文本"), top_k: int = Query(default=5, ge=1, le=20), min_score: int = Query(default=3, ge=1, le=5), tags_type: Optional[str] = None ): """检索知识(两阶段:语义路由 + 质量精排)""" conn = get_db() try: tags_filter = tags_type.split(",") if tags_type else None results = await _search_knowledge_two_stage( query_text=q, top_k=top_k, min_score=min_score, tags_filter=tags_filter, conn=conn ) return {"results": results, "count": len(results)} finally: conn.close() @app.post("/api/knowledge", status_code=201) def save_knowledge(knowledge: KnowledgeIn): """保存新知识""" import uuid conn = get_db() try: # 生成 ID timestamp = datetime.now().strftime('%Y%m%d-%H%M%S') random_suffix = uuid.uuid4().hex[:4] knowledge_id = f"knowledge-{timestamp}-{random_suffix}" now = datetime.now(timezone.utc).isoformat() conn.execute( """INSERT INTO knowledge (id, message_id, tags_type, scenario, content, source_urls, source_agent_id, source_timestamp, eval_score, eval_helpful, eval_harmful, eval_helpful_history, eval_harmful_history, metrics_helpful, metrics_harmful, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( knowledge_id, knowledge.message_id, ",".join(knowledge.tags_type), knowledge.scenario, knowledge.content, ",".join(knowledge.urls), knowledge.agent_id, now, knowledge.score, 0, # eval_helpful 0, # eval_harmful "[]", # eval_helpful_history "[]", # eval_harmful_history 1, # metrics_helpful 0, # metrics_harmful now, now, ), ) conn.commit() return {"status": "ok", "knowledge_id": knowledge_id} finally: conn.close() @app.get("/api/knowledge") def list_knowledge( limit: int = Query(default=10, ge=1, le=100), tags_type: Optional[str] = None ): """列出知识""" conn = get_db() try: query = "SELECT * FROM knowledge" params = [] if tags_type: query += " WHERE tags_type LIKE ?" params.append(f"%{tags_type}%") query += " ORDER BY created_at DESC LIMIT ?" params.append(limit) rows = conn.execute(query, params).fetchall() results = [] for row in rows: results.append({ "id": row["id"], "message_id": row["message_id"], "tags": {"type": row["tags_type"].split(",") if row["tags_type"] else []}, "scenario": row["scenario"], "content": row["content"], "source": { "urls": row["source_urls"].split(",") if row["source_urls"] else [], "agent_id": row["source_agent_id"], "timestamp": row["source_timestamp"] }, "eval": { "score": row["eval_score"], "helpful": row["eval_helpful"], "harmful": row["eval_harmful"] }, "metrics": { "helpful": row["metrics_helpful"], "harmful": row["metrics_harmful"] }, "created_at": row["created_at"], "updated_at": row["updated_at"] }) return {"results": results, "count": len(results)} finally: conn.close() @app.get("/api/knowledge/{knowledge_id}") def get_knowledge(knowledge_id: str): """获取单条知识""" conn = get_db() try: row = conn.execute( "SELECT * FROM knowledge WHERE id = ?", (knowledge_id,) ).fetchone() if not row: raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}") return { "id": row["id"], "message_id": row["message_id"], "tags": {"type": row["tags_type"].split(",") if row["tags_type"] else []}, "scenario": row["scenario"], "content": row["content"], "source": { "urls": row["source_urls"].split(",") if row["source_urls"] else [], "agent_id": row["source_agent_id"], "timestamp": row["source_timestamp"] }, "eval": { "score": row["eval_score"], "helpful": row["eval_helpful"], "harmful": row["eval_harmful"], "helpful_history": [], "harmful_history": [] }, "metrics": { "helpful": row["metrics_helpful"], "harmful": row["metrics_harmful"] }, "created_at": row["created_at"], "updated_at": row["updated_at"] } finally: conn.close() async def _evolve_knowledge_with_llm(old_content: str, feedback: str) -> str: """使用 LLM 进行知识进化重写""" prompt = f"""你是一个 AI Agent 知识库管理员。请根据反馈建议,对现有的知识内容进行重写进化。 【原知识内容】: {old_content} 【实战反馈建议】: {feedback} 【重写要求】: 1. 融合知识:将反馈中的避坑指南、新参数或修正后的选择逻辑融入原知识,使其更具通用性和准确性。 2. 保持结构:如果原内容有特定格式(如 Markdown、代码示例等),请保持该格式。 3. 语言:简洁直接,使用中文。 4. 禁止:严禁输出任何开场白、解释语或额外的 Markdown 标题,直接返回重写后的正文。 """ try: response = await openrouter_llm_call( messages=[{"role": "user", "content": prompt}], model="google/gemini-2.0-flash-001" ) evolved = response.get("content", "").strip() if len(evolved) < 5: raise ValueError("LLM output too short") return evolved except Exception as e: print(f"知识进化失败,采用追加模式回退: {e}") return f"{old_content}\n\n---\n[Update {datetime.now().strftime('%Y-%m-%d')}]: {feedback}" @app.put("/api/knowledge/{knowledge_id}") async def update_knowledge(knowledge_id: str, update: KnowledgeUpdateIn): """更新知识评估,支持知识进化""" conn = get_db() try: row = conn.execute("SELECT * FROM knowledge WHERE id = ?", (knowledge_id,)).fetchone() if not row: raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}") now = datetime.now(timezone.utc).isoformat() updates = {"updated_at": now} if update.update_score is not None: updates["eval_score"] = update.update_score if update.add_helpful_case: helpful_history = json.loads(row["eval_helpful_history"] or "[]") helpful_history.append(update.add_helpful_case) updates["eval_helpful"] = row["eval_helpful"] + 1 updates["eval_helpful_history"] = json.dumps(helpful_history, ensure_ascii=False) updates["metrics_helpful"] = row["metrics_helpful"] + 1 if update.add_harmful_case: harmful_history = json.loads(row["eval_harmful_history"] or "[]") harmful_history.append(update.add_harmful_case) updates["eval_harmful"] = row["eval_harmful"] + 1 updates["eval_harmful_history"] = json.dumps(harmful_history, ensure_ascii=False) updates["metrics_harmful"] = row["metrics_harmful"] + 1 if update.evolve_feedback: evolved_content = await _evolve_knowledge_with_llm(row["content"], update.evolve_feedback) updates["content"] = evolved_content updates["metrics_helpful"] = updates.get("metrics_helpful", row["metrics_helpful"]) + 1 set_clause = ", ".join(f"{k} = ?" for k in updates) values = list(updates.values()) + [knowledge_id] conn.execute(f"UPDATE knowledge SET {set_clause} WHERE id = ?", values) conn.commit() return {"status": "ok", "knowledge_id": knowledge_id} finally: conn.close() @app.post("/api/knowledge/batch_update") async def batch_update_knowledge(batch: KnowledgeBatchUpdateIn): """批量反馈知识有效性""" if not batch.feedback_list: return {"status": "ok", "updated": 0} conn = get_db() try: # 先处理无需进化的,收集需要进化的 evolution_tasks = [] # [(knowledge_id, old_content, feedback)] simple_updates = [] # [(knowledge_id, is_effective)] for item in batch.feedback_list: knowledge_id = item.get("knowledge_id") is_effective = item.get("is_effective") feedback = item.get("feedback", "") if not knowledge_id: continue row = conn.execute("SELECT * FROM knowledge WHERE id = ?", (knowledge_id,)).fetchone() if not row: continue if is_effective and feedback: evolution_tasks.append((knowledge_id, row["content"], feedback, row["metrics_helpful"])) else: simple_updates.append((knowledge_id, is_effective, row["metrics_helpful"], row["metrics_harmful"])) # 执行简单更新 now = datetime.now(timezone.utc).isoformat() for knowledge_id, is_effective, cur_helpful, cur_harmful in simple_updates: if is_effective: conn.execute( "UPDATE knowledge SET metrics_helpful = ?, updated_at = ? WHERE id = ?", (cur_helpful + 1, now, knowledge_id) ) else: conn.execute( "UPDATE knowledge SET metrics_harmful = ?, updated_at = ? WHERE id = ?", (cur_harmful + 1, now, knowledge_id) ) # 并发执行知识进化 if evolution_tasks: print(f"🧬 并发处理 {len(evolution_tasks)} 条知识进化...") evolved_results = await asyncio.gather( *[_evolve_knowledge_with_llm(old, fb) for _, old, fb, _ in evolution_tasks] ) for (knowledge_id, _, _, cur_helpful), evolved_content in zip(evolution_tasks, evolved_results): conn.execute( "UPDATE knowledge SET content = ?, metrics_helpful = ?, updated_at = ? WHERE id = ?", (evolved_content, cur_helpful + 1, now, knowledge_id) ) conn.commit() return {"status": "ok", "updated": len(simple_updates) + len(evolution_tasks)} finally: conn.close() @app.post("/api/knowledge/slim") async def slim_knowledge(model: str = "anthropic/claude-sonnet-4-5"): """知识库瘦身:合并语义相似知识""" conn = get_db() try: rows = conn.execute("SELECT * FROM knowledge ORDER BY metrics_helpful DESC").fetchall() if len(rows) < 2: return {"status": "ok", "message": f"知识库仅有 {len(rows)} 条,无需瘦身"} # 构造发给大模型的内容 entries_text = "" for row in rows: entries_text += f"[ID: {row['id']}] [Tags: {row['tags_type']}] " entries_text += f"[Helpful: {row['metrics_helpful']}, Harmful: {row['metrics_harmful']}] [Score: {row['eval_score']}]\n" entries_text += f"Scenario: {row['scenario']}\n" entries_text += f"Content: {row['content'][:200]}...\n\n" prompt = f"""你是一个 AI Agent 知识库管理员。以下是当前知识库的全部条目,请执行瘦身操作: 【任务】: 1. 识别语义高度相似或重复的知识,将它们合并为一条更精炼、更通用的知识。 2. 合并时保留 helpful 最高的那条的 ID(metrics_helpful 取各条之和)。 3. 对于独立的、无重复的知识,保持原样不动。 【当前知识库】: {entries_text} 【输出格式要求】: 严格按以下格式输出每条知识,条目之间用 === 分隔: ID: <保留的id> TAGS: <逗号分隔的type列表> HELPFUL: <合并后的helpful计数> HARMFUL: <合并后的harmful计数> SCORE: <评分> SCENARIO: <场景描述> CONTENT: <合并后的知识内容> === 最后输出合并报告: REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。 禁止输出任何开场白或解释。""" print(f"\n[知识瘦身] 正在调用 {model} 分析 {len(rows)} 条知识...") response = await openrouter_llm_call( messages=[{"role": "user", "content": prompt}], model=model ) content = response.get("content", "").strip() if not content: raise HTTPException(status_code=500, detail="LLM 返回为空") # 解析大模型输出 report_line = "" new_entries = [] blocks = [b.strip() for b in content.split("===") if b.strip()] for block in blocks: if block.startswith("REPORT:"): report_line = block continue lines = block.split("\n") kid, tags, helpful, harmful, score, scenario, content_lines = None, "", 0, 0, 3, "", [] current_field = None for line in lines: if line.startswith("ID:"): kid = line[3:].strip() current_field = None elif line.startswith("TAGS:"): tags = line[5:].strip() current_field = None elif line.startswith("HELPFUL:"): try: helpful = int(line[8:].strip()) except Exception: helpful = 0 current_field = None elif line.startswith("HARMFUL:"): try: harmful = int(line[8:].strip()) except Exception: harmful = 0 current_field = None elif line.startswith("SCORE:"): try: score = int(line[6:].strip()) except Exception: score = 3 current_field = None elif line.startswith("SCENARIO:"): scenario = line[9:].strip() current_field = "scenario" elif line.startswith("CONTENT:"): content_lines.append(line[8:].strip()) current_field = "content" elif current_field == "scenario": scenario += "\n" + line elif current_field == "content": content_lines.append(line) if kid and content_lines: new_entries.append({ "id": kid, "tags": tags, "helpful": helpful, "harmful": harmful, "score": score, "scenario": scenario.strip(), "content": "\n".join(content_lines).strip() }) if not new_entries: raise HTTPException(status_code=500, detail="解析大模型输出失败") # 原子化写回 now = datetime.now(timezone.utc).isoformat() conn.execute("DELETE FROM knowledge") for e in new_entries: conn.execute( """INSERT INTO knowledge (id, message_id, tags_type, scenario, content, source_urls, source_agent_id, source_timestamp, eval_score, eval_helpful, eval_harmful, eval_helpful_history, eval_harmful_history, metrics_helpful, metrics_harmful, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (e["id"], "", e["tags"], e["scenario"], e["content"], "", "slim", now, e["score"], 0, 0, "[]", "[]", e["helpful"], e["harmful"], now, now) ) conn.commit() result_msg = f"瘦身完成:{len(rows)} → {len(new_entries)} 条知识" if report_line: result_msg += f"\n{report_line}" print(f"[知识瘦身] {result_msg}") return {"status": "ok", "before": len(rows), "after": len(new_entries), "report": report_line} finally: conn.close() if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)