| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091 |
- """
- KnowHub Server
- Agent 工具使用经验的共享平台。
- FastAPI + SQLite,单文件部署。
- """
- import os
- import re
- import json
- import sqlite3
- import asyncio
- from contextlib import asynccontextmanager
- from datetime import datetime, timezone
- from typing import Optional
- from pathlib import Path
- from fastapi import FastAPI, HTTPException, Query
- from pydantic import BaseModel, Field
- # 导入 LLM 调用(需要 agent 模块在 Python path 中)
- import sys
- sys.path.insert(0, str(Path(__file__).parent.parent))
- # 加载环境变量
- from dotenv import load_dotenv
- load_dotenv(Path(__file__).parent.parent / ".env")
- from agent.llm.openrouter import openrouter_llm_call
- BRAND_NAME = os.getenv("BRAND_NAME", "KnowHub")
- BRAND_API_ENV = os.getenv("BRAND_API_ENV", "KNOWHUB_API")
- BRAND_DB = os.getenv("BRAND_DB", "knowhub.db")
- DB_PATH = Path(__file__).parent / BRAND_DB
- # --- 数据库 ---
def get_db() -> sqlite3.Connection:
    """Open a connection to the app database.

    Rows come back as sqlite3.Row (name-indexable) and the database runs
    in WAL journal mode so readers do not block the writer.
    """
    connection = sqlite3.connect(str(DB_PATH))
    connection.row_factory = sqlite3.Row
    connection.execute("PRAGMA journal_mode=WAL")
    return connection
def init_db():
    """Create the schema (tables + indexes) if it does not already exist.

    Idempotent: every statement uses IF NOT EXISTS. The connection is now
    closed in a ``finally`` block, so a failing DDL statement no longer
    leaks it (the original only closed on the success path).
    """
    conn = get_db()
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS experiences (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                url TEXT DEFAULT '',
                category TEXT DEFAULT '',
                task TEXT NOT NULL,
                score INTEGER CHECK(score BETWEEN 1 AND 5),
                outcome TEXT DEFAULT '',
                tips TEXT DEFAULT '',
                content_id TEXT DEFAULT '',
                submitted_by TEXT DEFAULT '',
                created_at TEXT NOT NULL
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_name ON experiences(name)")
        conn.execute("""
            CREATE TABLE IF NOT EXISTS contents (
                id TEXT PRIMARY KEY,
                title TEXT DEFAULT '',
                body TEXT NOT NULL,
                sort_order INTEGER DEFAULT 0,
                submitted_by TEXT DEFAULT '',
                created_at TEXT NOT NULL
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS knowledge (
                id TEXT PRIMARY KEY,
                message_id TEXT DEFAULT '',
                types TEXT NOT NULL, -- JSON array: ["strategy", "tool"]
                task TEXT NOT NULL,
                tags TEXT DEFAULT '{}', -- JSON object: {"category": "...", "domain": "..."}
                scopes TEXT DEFAULT '["org:cybertogether"]', -- JSON array
                owner TEXT DEFAULT '',
                content TEXT NOT NULL,
                source TEXT DEFAULT '{}', -- JSON object: {name, category, urls, agent_id, submitted_by, timestamp}
                eval TEXT DEFAULT '{}', -- JSON object: {score, helpful, harmful, confidence, histories}
                created_at TEXT NOT NULL,
                updated_at TEXT DEFAULT ''
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_knowledge_types ON knowledge(types)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_knowledge_task ON knowledge(task)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_knowledge_owner ON knowledge(owner)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_knowledge_scopes ON knowledge(scopes)")
        conn.commit()
    finally:
        conn.close()
- # --- Models ---
class ExperienceIn(BaseModel):
    """Request body for submitting one tool-usage experience."""
    name: str                      # resource (tool) name, grouping key
    url: str = ""
    category: str = ""
    task: str                      # what the agent was trying to do
    score: int = Field(ge=1, le=5)  # 1-5 rating, mirrors the DB CHECK constraint
    outcome: str = ""
    tips: str = ""
    content_id: str = ""           # optional link into the contents table
    submitted_by: str = ""
class ExperienceOut(BaseModel):
    """One experience as returned by search/detail endpoints (no resource fields)."""
    task: str
    score: int
    outcome: str
    tips: str
    content_id: str
    submitted_by: str
    created_at: str  # UTC ISO-8601 string
class ResourceResult(BaseModel):
    """Per-resource aggregate in search results."""
    name: str
    url: str
    relevant_experiences: list[ExperienceOut]
    avg_score: float        # mean score rounded to 1 decimal
    experience_count: int
class SearchResponse(BaseModel):
    """Envelope for /api/search results."""
    results: list[ResourceResult]
class ResourceDetailResponse(BaseModel):
    """Full detail for a single resource: aggregates plus every experience."""
    name: str
    url: str
    category: str
    avg_score: float
    experience_count: int
    experiences: list[ExperienceOut]
class ContentIn(BaseModel):
    """Request body for upserting a content document (id is caller-chosen)."""
    id: str            # hierarchical id, e.g. "guide" or "guide/chapter-1"
    title: str = ""
    body: str
    sort_order: int = 0  # ordering among siblings
    submitted_by: str = ""
# Knowledge Models
class KnowledgeIn(BaseModel):
    """Request body for saving a new knowledge entry.

    Mutable defaults now use ``Field(default_factory=...)`` — the documented
    pydantic idiom — instead of shared literal objects in the class body.
    Effective defaults are unchanged.
    """
    task: str
    content: str
    types: list[str] = Field(default_factory=lambda: ["strategy"])
    tags: dict = Field(default_factory=dict)
    scopes: list[str] = Field(default_factory=lambda: ["org:cybertogether"])
    owner: str = ""
    message_id: str = ""
    source: dict = Field(default_factory=dict)  # {name, category, urls, agent_id, submitted_by, timestamp}
    eval: dict = Field(default_factory=dict)  # {score, helpful, harmful, confidence}
class KnowledgeOut(BaseModel):
    """Fully materialized knowledge entry (JSON columns already decoded)."""
    id: str
    message_id: str
    types: list[str]
    task: str
    tags: dict
    scopes: list[str]
    owner: str
    content: str
    source: dict
    eval: dict
    created_at: str
    updated_at: str
class KnowledgeUpdateIn(BaseModel):
    """Partial update for one knowledge entry; all fields optional."""
    add_helpful_case: Optional[dict] = None   # appended to eval.helpful_history
    add_harmful_case: Optional[dict] = None   # appended to eval.harmful_history
    update_score: Optional[int] = Field(default=None, ge=1, le=5)
    evolve_feedback: Optional[str] = None     # triggers LLM content rewrite
class KnowledgeBatchUpdateIn(BaseModel):
    """Batch feedback payload; each item: {knowledge_id, is_effective, feedback?}."""
    feedback_list: list[dict]
class KnowledgeSearchResponse(BaseModel):
    """Envelope for knowledge search results."""
    results: list[dict]
    count: int
class ContentNode(BaseModel):
    """Lightweight navigation reference to a content document."""
    id: str
    title: str
class ContentOut(BaseModel):
    """A content document plus its navigation context."""
    id: str
    title: str
    body: str
    toc: Optional[ContentNode] = None   # root document, when this is a sub-page
    children: list[ContentNode]         # direct and nested sub-pages
    prev: Optional[ContentNode] = None  # previous sibling by sort_order
    next: Optional[ContentNode] = None  # next sibling by sort_order
# --- App ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: create tables on startup; nothing to tear down."""
    init_db()
    yield
app = FastAPI(title=BRAND_NAME, lifespan=lifespan)
- def _search_rows(conn: sqlite3.Connection, q: str, category: Optional[str]) -> list[sqlite3.Row]:
- """LIKE 搜索,拆词后 AND 连接,匹配 task + tips + outcome + name"""
- terms = q.split()
- if not terms:
- return []
- conditions = []
- params: list[str] = []
- for term in terms:
- like = f"%{term}%"
- conditions.append(
- "(task LIKE ? OR tips LIKE ? OR outcome LIKE ? OR name LIKE ?)"
- )
- params.extend([like, like, like, like])
- if category:
- conditions.append("category = ?")
- params.append(category)
- sql = (
- "SELECT name, url, category, task, score, outcome, tips, content_id, "
- "submitted_by, created_at FROM experiences WHERE "
- + " AND ".join(conditions)
- + " ORDER BY created_at DESC"
- )
- return conn.execute(sql, params).fetchall()
def _group_by_resource(rows: list[sqlite3.Row], limit: int) -> list[ResourceResult]:
    """Group experience rows by resource name and aggregate.

    Results are ranked by avg_score * experience_count (well-rated AND
    frequently used resources first) and truncated to *limit*.
    Empty input yields an empty list.
    """
    groups: dict[str, list[sqlite3.Row]] = {}
    for row in rows:
        # setdefault replaces the manual "if name not in groups" dance
        groups.setdefault(row["name"], []).append(row)
    results = []
    for resource_name, resource_rows in groups.items():
        scores = [r["score"] for r in resource_rows]
        avg = sum(scores) / len(scores)
        results.append(ResourceResult(
            name=resource_name,
            # rows are newest-first, so this is the most recent URL on record
            url=resource_rows[0]["url"],
            relevant_experiences=[
                ExperienceOut(
                    task=r["task"],
                    score=r["score"],
                    outcome=r["outcome"],
                    tips=r["tips"],
                    content_id=r["content_id"],
                    submitted_by=r["submitted_by"],
                    created_at=r["created_at"],
                )
                for r in resource_rows
            ],
            avg_score=round(avg, 1),
            experience_count=len(resource_rows),
        ))
    results.sort(key=lambda r: r.avg_score * r.experience_count, reverse=True)
    return results[:limit]
@app.get("/api/search", response_model=SearchResponse)
def search_experiences(
    q: str = Query(..., min_length=1),
    category: Optional[str] = None,
    limit: int = Query(default=10, ge=1, le=50),
):
    """Search experiences and return them grouped per resource."""
    db = get_db()
    try:
        matched = _search_rows(db, q, category)
        grouped = _group_by_resource(matched, limit)
        return SearchResponse(results=grouped)
    finally:
        db.close()
@app.post("/api/experience", status_code=201)
def submit_experience(exp: ExperienceIn):
    """Persist one experience record; created_at is UTC ISO-8601."""
    db = get_db()
    try:
        created_at = datetime.now(timezone.utc).isoformat()
        values = (
            exp.name, exp.url, exp.category, exp.task, exp.score,
            exp.outcome, exp.tips, exp.content_id, exp.submitted_by,
            created_at,
        )
        db.execute(
            "INSERT INTO experiences"
            "(name, url, category, task, score, outcome, tips, content_id, submitted_by, created_at)"
            " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            values,
        )
        db.commit()
        return {"status": "ok"}
    finally:
        db.close()
@app.get("/api/resource/{name}", response_model=ResourceDetailResponse)
def get_resource_experiences(name: str):
    """Return every experience for one resource, newest first, plus aggregates.

    404s when the resource has no recorded experiences.
    """
    db = get_db()
    try:
        rows = db.execute(
            "SELECT name, url, category, task, score, outcome, tips, content_id, "
            "submitted_by, created_at FROM experiences "
            "WHERE name = ? ORDER BY created_at DESC",
            (name,),
        ).fetchall()
        if not rows:
            raise HTTPException(status_code=404, detail=f"No experiences found for resource: {name}")
        # single pass: accumulate the score total while building the list
        experiences = []
        score_total = 0
        for r in rows:
            score_total += r["score"]
            experiences.append(ExperienceOut(
                task=r["task"],
                score=r["score"],
                outcome=r["outcome"],
                tips=r["tips"],
                content_id=r["content_id"],
                submitted_by=r["submitted_by"],
                created_at=r["created_at"],
            ))
        return ResourceDetailResponse(
            name=name,
            url=rows[0]["url"],
            category=rows[0]["category"],
            avg_score=round(score_total / len(rows), 1),
            experience_count=len(rows),
            experiences=experiences,
        )
    finally:
        db.close()
@app.post("/api/content", status_code=201)
def submit_content(content: ContentIn):
    """Upsert a content document (INSERT OR REPLACE keyed on id)."""
    db = get_db()
    try:
        created_at = datetime.now(timezone.utc).isoformat()
        db.execute(
            "INSERT OR REPLACE INTO contents"
            "(id, title, body, sort_order, submitted_by, created_at)"
            " VALUES (?, ?, ?, ?, ?, ?)",
            (content.id, content.title, content.body,
             content.sort_order, content.submitted_by, created_at),
        )
        db.commit()
        return {"status": "ok"}
    finally:
        db.close()
@app.get("/api/content/{content_id:path}", response_model=ContentOut)
def get_content(content_id: str):
    """Fetch one content document plus its navigation context.

    Ids are hierarchical, slash-separated (e.g. "guide/chapter-1").
    Navigation: toc = root document (sub-pages only), children = all
    pages nested under this id, prev/next = adjacent top-level siblings
    under the same root, ordered by sort_order. 404 if the id is unknown.
    """
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT id, title, body, sort_order FROM contents WHERE id = ?",
            (content_id,),
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail=f"Content not found: {content_id}")
        # Compute navigation context
        root_id = content_id.split("/")[0] if "/" in content_id else content_id
        # TOC (root node) — only meaningful when viewing a sub-page
        toc = None
        if "/" in content_id:
            toc_row = conn.execute(
                "SELECT id, title FROM contents WHERE id = ?",
                (root_id,),
            ).fetchone()
            if toc_row:
                toc = ContentNode(id=toc_row["id"], title=toc_row["title"])
        # Children: every id strictly below this one (any depth, via LIKE prefix)
        children = []
        children_rows = conn.execute(
            "SELECT id, title FROM contents WHERE id LIKE ? AND id != ? ORDER BY sort_order",
            (f"{content_id}/%", content_id),
        ).fetchall()
        children = [ContentNode(id=r["id"], title=r["title"]) for r in children_rows]
        # Prev/Next among siblings: direct children of the root only —
        # the NOT LIKE 'root/%/%' pattern excludes deeper descendants
        prev_node = None
        next_node = None
        if "/" in content_id:
            siblings = conn.execute(
                "SELECT id, title, sort_order FROM contents WHERE id LIKE ? AND id NOT LIKE ? ORDER BY sort_order",
                (f"{root_id}/%", f"{root_id}/%/%"),
            ).fetchall()
            for i, sib in enumerate(siblings):
                if sib["id"] == content_id:
                    if i > 0:
                        prev_node = ContentNode(id=siblings[i-1]["id"], title=siblings[i-1]["title"])
                    if i < len(siblings) - 1:
                        next_node = ContentNode(id=siblings[i+1]["id"], title=siblings[i+1]["title"])
                    break
        return ContentOut(
            id=row["id"],
            title=row["title"],
            body=row["body"],
            toc=toc,
            children=children,
            prev=prev_node,
            next=next_node,
        )
    finally:
        conn.close()
# ===== Knowledge API =====
# Two-stage retrieval logic
async def _route_knowledge_by_llm(query_text: str, metadata_list: list[dict], k: int = 5) -> list[str]:
    """
    Stage 1: semantic routing.
    Ask the LLM to pick up to 2*k semantically relevant knowledge IDs.

    metadata_list items must have "id", "types" and "task" keys; tasks are
    truncated to 100 chars to keep the prompt small. Returns the selected
    ids (only tokens starting with "knowledge-" or "research-" are kept),
    or [] on empty input or any LLM failure (best-effort by design).
    """
    if not metadata_list:
        return []
    routing_k = k * 2
    routing_data = [
        {
            "id": m["id"],
            "types": m["types"],
            "task": m["task"][:100]
        } for m in metadata_list
    ]
    prompt = f"""
你是一个知识检索专家。根据用户的当前任务需求,从下列原子知识元数据中挑选出最相关的最多 {routing_k} 个知识 ID。
任务需求:"{query_text}"
可选知识列表:
{json.dumps(routing_data, ensure_ascii=False, indent=1)}
请直接输出 ID 列表,用逗号分隔(例如: knowledge-20260302-001, research-20260302-002)。若无相关项请输出 "None"。
"""
    try:
        print(f"\n[Step 1: 知识语义路由] 任务: '{query_text}' | 候选总数: {len(metadata_list)} | 目标提取数: {routing_k}")
        response = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model="google/gemini-2.0-flash-001"
        )
        content = response.get("content", "").strip()
        # split on commas/whitespace; the prefix filter drops any chatter
        # the model adds around the ids (and handles the "None" reply)
        selected_ids = [idx.strip() for idx in re.split(r'[,\s]+', content) if idx.strip().startswith(("knowledge-", "research-"))]
        print(f"[Step 1: 知识语义路由] LLM 初选 ID ({len(selected_ids)}个): {selected_ids}")
        return selected_ids
    except Exception as e:
        print(f"LLM 知识路由失败: {e}")
        return []
async def _search_knowledge_two_stage(
    query_text: str,
    top_k: int = 5,
    min_score: int = 3,
    types_filter: Optional[list[str]] = None,
    conn: Optional[sqlite3.Connection] = None
) -> list[dict]:
    """
    Two-stage retrieval: semantic routing + quality re-ranking.

    Loads all knowledge rows (with optional types filter), asks the LLM
    to shortlist ~2*top_k candidates, then scores each candidate as
    score + helpful - 2*harmful, drops entries below min_score or with a
    negative quality score, and returns the top_k by quality score.
    Opens (and closes) its own connection when *conn* is not supplied.
    """
    if conn is None:
        conn = get_db()
        should_close = True
    else:
        should_close = False  # caller owns the connection
    try:
        # Stage 1: load and decode all knowledge rows
        query = "SELECT * FROM knowledge"
        rows = conn.execute(query).fetchall()
        if not rows:
            return []
        content_map = {}
        metadata_list = []
        for row in rows:
            kid = row["id"]
            types = json.loads(row["types"])
            # types filter: keep the row if ANY requested type matches
            if types_filter:
                if not any(t in types for t in types_filter):
                    continue
            task = row["task"]
            content_text = row["content"]
            eval_data = json.loads(row["eval"])
            source = json.loads(row["source"])
            meta_item = {
                "id": kid,
                "types": types,
                "task": task,
                "score": eval_data.get("score", 3),
                "helpful": eval_data.get("helpful", 0),
                "harmful": eval_data.get("harmful", 0),
            }
            metadata_list.append(meta_item)
            # full payload kept separately so only metadata goes to the LLM
            content_map[kid] = {
                "task": task,
                "content": content_text,
                "types": types,
                "tags": json.loads(row["tags"]),
                "scopes": json.loads(row["scopes"]),
                "owner": row["owner"],
                "score": meta_item["score"],
                "helpful": meta_item["helpful"],
                "harmful": meta_item["harmful"],
                "message_id": row["message_id"],
                "source": source,
                "eval": eval_data,
                "created_at": row["created_at"],
                "updated_at": row["updated_at"]
            }
        if not metadata_list:
            return []
        # Stage 2: semantic routing (shortlist up to 2*k ids)
        candidate_ids = await _route_knowledge_by_llm(query_text, metadata_list, k=top_k)
        # Stage 3: quality re-ranking
        print(f"[Step 2: 知识质量精排] 正在根据评分和反馈进行打分...")
        scored_items = []
        for kid in candidate_ids:
            # skip hallucinated ids the LLM may have invented
            if kid in content_map:
                item = content_map[kid]
                score = item["score"]
                helpful = item["helpful"]
                harmful = item["harmful"]
                # composite: base score + helpful - harmful*2 (harm weighs double)
                quality_score = score + helpful - (harmful * 2.0)
                # quality gate
                if score < min_score or quality_score < 0:
                    print(f" - 剔除低质量知识: {kid} (Score: {score}, Helpful: {helpful}, Harmful: {harmful})")
                    continue
                scored_items.append({
                    "id": kid,
                    "message_id": item["message_id"],
                    "types": item["types"],
                    "task": item["task"],
                    "tags": item["tags"],
                    "scopes": item["scopes"],
                    "owner": item["owner"],
                    "content": item["content"],
                    "source": item["source"],
                    "eval": item["eval"],
                    "quality_score": quality_score,
                    "created_at": item["created_at"],
                    "updated_at": item["updated_at"]
                })
        # sort by composite quality score, best first
        final_sorted = sorted(scored_items, key=lambda x: x["quality_score"], reverse=True)
        # truncate to the final top_k
        result = final_sorted[:top_k]
        print(f"[Step 2: 知识质量精排] 最终选定知识: {[it['id'] for it in result]}")
        print(f"[Knowledge System] 检索结束。\n")
        return result
    finally:
        if should_close:
            conn.close()
@app.get("/api/knowledge/search")
async def search_knowledge_api(
    q: str = Query(..., description="查询文本"),
    top_k: int = Query(default=5, ge=1, le=20),
    min_score: int = Query(default=3, ge=1, le=5),
    types: Optional[str] = None  # comma-separated type filter, e.g. "strategy,tool"
):
    """Search knowledge (two-stage: semantic routing + quality re-ranking).

    NOTE: this route is registered before /api/knowledge/{knowledge_id},
    which is what keeps "search" from being captured as an id.
    """
    conn = get_db()
    try:
        types_filter = types.split(",") if types else None
        results = await _search_knowledge_two_stage(
            query_text=q,
            top_k=top_k,
            min_score=min_score,
            types_filter=types_filter,
            conn=conn
        )
        return {"results": results, "count": len(results)}
    finally:
        conn.close()
@app.post("/api/knowledge", status_code=201)
def save_knowledge(knowledge: KnowledgeIn):
    """Save a new knowledge entry and return its generated id.

    Fix: the id's timestamp segment previously used *local* time while
    created_at used UTC; both now come from a single UTC clock read, so
    an id's embedded timestamp matches its created_at.
    """
    import uuid
    conn = get_db()
    try:
        # one UTC instant for the id, source timestamp and created/updated_at
        created = datetime.now(timezone.utc)
        now = created.isoformat()
        random_suffix = uuid.uuid4().hex[:4]  # avoids collisions within one second
        knowledge_id = f"knowledge-{created.strftime('%Y%m%d-%H%M%S')}-{random_suffix}"
        # fall back to the submitting agent as owner
        owner = knowledge.owner or f"agent:{knowledge.source.get('agent_id', 'unknown')}"
        # normalize source: fill every expected key with a default
        source = {
            "name": knowledge.source.get("name", ""),
            "category": knowledge.source.get("category", ""),
            "urls": knowledge.source.get("urls", []),
            "agent_id": knowledge.source.get("agent_id", "unknown"),
            "submitted_by": knowledge.source.get("submitted_by", ""),
            "timestamp": now,
            "message_id": knowledge.message_id
        }
        # normalize eval: defaults + empty feedback histories
        eval_data = {
            "score": knowledge.eval.get("score", 3),
            "helpful": knowledge.eval.get("helpful", 1),
            "harmful": knowledge.eval.get("harmful", 0),
            "confidence": knowledge.eval.get("confidence", 0.5),
            "helpful_history": [],
            "harmful_history": []
        }
        conn.execute(
            """INSERT INTO knowledge
            (id, message_id, types, task, tags, scopes, owner, content,
             source, eval, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                knowledge_id,
                knowledge.message_id,
                json.dumps(knowledge.types),
                knowledge.task,
                json.dumps(knowledge.tags),
                json.dumps(knowledge.scopes),
                owner,
                knowledge.content,
                json.dumps(source),
                json.dumps(eval_data),
                now,
                now,
            ),
        )
        conn.commit()
        return {"status": "ok", "knowledge_id": knowledge_id}
    finally:
        conn.close()
@app.get("/api/knowledge")
def list_knowledge(
    limit: int = Query(default=10, ge=1, le=100),
    types: Optional[str] = None,
    scopes: Optional[str] = None
):
    """List knowledge entries, newest first, with optional substring filters."""
    conn = get_db()
    try:
        filters = []
        params = []
        # both filters are substring LIKE matches against the JSON text columns
        for column, value in (("types", types), ("scopes", scopes)):
            if value:
                filters.append(column + " LIKE ?")
                params.append(f"%{value}%")
        sql = "SELECT * FROM knowledge"
        if filters:
            sql += " WHERE " + " AND ".join(filters)
        sql += " ORDER BY created_at DESC LIMIT ?"
        params.append(limit)
        results = [
            {
                "id": row["id"],
                "message_id": row["message_id"],
                "types": json.loads(row["types"]),
                "task": row["task"],
                "tags": json.loads(row["tags"]),
                "scopes": json.loads(row["scopes"]),
                "owner": row["owner"],
                "content": row["content"],
                "source": json.loads(row["source"]),
                "eval": json.loads(row["eval"]),
                "created_at": row["created_at"],
                "updated_at": row["updated_at"],
            }
            for row in conn.execute(sql, params).fetchall()
        ]
        return {"results": results, "count": len(results)}
    finally:
        conn.close()
@app.get("/api/knowledge/{knowledge_id}")
def get_knowledge(knowledge_id: str):
    """Fetch one knowledge entry by id; 404 when it does not exist."""
    conn = get_db()
    try:
        record = conn.execute(
            "SELECT * FROM knowledge WHERE id = ?",
            (knowledge_id,)
        ).fetchone()
        if record is None:
            raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
        # decode the JSON-typed columns; pass the rest through untouched
        payload = {key: record[key] for key in (
            "id", "message_id", "task", "owner", "content",
            "created_at", "updated_at",
        )}
        for json_key in ("types", "tags", "scopes", "source", "eval"):
            payload[json_key] = json.loads(record[json_key])
        return {
            "id": payload["id"],
            "message_id": payload["message_id"],
            "types": payload["types"],
            "task": payload["task"],
            "tags": payload["tags"],
            "scopes": payload["scopes"],
            "owner": payload["owner"],
            "content": payload["content"],
            "source": payload["source"],
            "eval": payload["eval"],
            "created_at": payload["created_at"],
            "updated_at": payload["updated_at"]
        }
    finally:
        conn.close()
async def _evolve_knowledge_with_llm(old_content: str, feedback: str) -> str:
    """Rewrite (evolve) knowledge content via the LLM, merging in feedback.

    On any LLM failure — or an implausibly short (<5 chars) reply — falls
    back to appending the feedback to the original content with a dated
    separator, so the update is never lost.
    """
    prompt = f"""你是一个 AI Agent 知识库管理员。请根据反馈建议,对现有的知识内容进行重写进化。
【原知识内容】:
{old_content}
【实战反馈建议】:
{feedback}
【重写要求】:
1. 融合知识:将反馈中的避坑指南、新参数或修正后的选择逻辑融入原知识,使其更具通用性和准确性。
2. 保持结构:如果原内容有特定格式(如 Markdown、代码示例等),请保持该格式。
3. 语言:简洁直接,使用中文。
4. 禁止:严禁输出任何开场白、解释语或额外的 Markdown 标题,直接返回重写后的正文。
"""
    try:
        response = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model="google/gemini-2.0-flash-001"
        )
        evolved = response.get("content", "").strip()
        if len(evolved) < 5:
            # treat a near-empty reply as a failure → fallback path below
            raise ValueError("LLM output too short")
        return evolved
    except Exception as e:
        print(f"知识进化失败,采用追加模式回退: {e}")
        return f"{old_content}\n\n---\n[Update {datetime.now().strftime('%Y-%m-%d')}]: {feedback}"
@app.put("/api/knowledge/{knowledge_id}")
async def update_knowledge(knowledge_id: str, update: KnowledgeUpdateIn):
    """Update one knowledge entry's evaluation; optionally evolve its content.

    Any combination of the KnowledgeUpdateIn fields may be set; each one
    applies independently. evolve_feedback triggers an LLM rewrite of the
    content and also counts as a helpful signal. 404 on unknown id.
    (Idiom cleanup: history appends use dict.setdefault instead of the
    membership-check-then-create dance.)
    """
    conn = get_db()
    try:
        row = conn.execute("SELECT * FROM knowledge WHERE id = ?", (knowledge_id,)).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
        now = datetime.now(timezone.utc).isoformat()
        eval_data = json.loads(row["eval"])
        # overwrite the score if provided
        if update.update_score is not None:
            eval_data["score"] = update.update_score
        # record a helpful case
        if update.add_helpful_case:
            eval_data["helpful"] = eval_data.get("helpful", 0) + 1
            eval_data.setdefault("helpful_history", []).append(update.add_helpful_case)
        # record a harmful case
        if update.add_harmful_case:
            eval_data["harmful"] = eval_data.get("harmful", 0) + 1
            eval_data.setdefault("harmful_history", []).append(update.add_harmful_case)
        # knowledge evolution: rewrite content and credit a helpful signal
        content = row["content"]
        if update.evolve_feedback:
            content = await _evolve_knowledge_with_llm(content, update.evolve_feedback)
            eval_data["helpful"] = eval_data.get("helpful", 0) + 1
        # persist content + eval together (updated_at refreshed even if unchanged)
        conn.execute(
            "UPDATE knowledge SET content = ?, eval = ?, updated_at = ? WHERE id = ?",
            (content, json.dumps(eval_data, ensure_ascii=False), now, knowledge_id)
        )
        conn.commit()
        return {"status": "ok", "knowledge_id": knowledge_id}
    finally:
        conn.close()
@app.post("/api/knowledge/batch_update")
async def batch_update_knowledge(batch: KnowledgeBatchUpdateIn):
    """Batch-apply effectiveness feedback to knowledge entries.

    Each feedback item: {knowledge_id, is_effective, feedback?}. Items that
    are effective AND carry textual feedback trigger an LLM content
    evolution (run concurrently); everything else is a simple helpful /
    harmful counter bump. Unknown or id-less items are skipped silently.
    Returns the number of entries touched.
    """
    if not batch.feedback_list:
        return {"status": "ok", "updated": 0}
    conn = get_db()
    try:
        # Split the batch: simple counter updates vs. LLM evolution jobs
        evolution_tasks = []  # [(knowledge_id, old_content, feedback, eval_data)]
        simple_updates = []  # [(knowledge_id, is_effective, eval_data)]
        for item in batch.feedback_list:
            knowledge_id = item.get("knowledge_id")
            is_effective = item.get("is_effective")
            feedback = item.get("feedback", "")
            if not knowledge_id:
                continue
            row = conn.execute("SELECT * FROM knowledge WHERE id = ?", (knowledge_id,)).fetchone()
            if not row:
                continue
            eval_data = json.loads(row["eval"])
            if is_effective and feedback:
                evolution_tasks.append((knowledge_id, row["content"], feedback, eval_data))
            else:
                simple_updates.append((knowledge_id, is_effective, eval_data))
        # Apply the simple counter updates first
        now = datetime.now(timezone.utc).isoformat()
        for knowledge_id, is_effective, eval_data in simple_updates:
            if is_effective:
                eval_data["helpful"] = eval_data.get("helpful", 0) + 1
            else:
                eval_data["harmful"] = eval_data.get("harmful", 0) + 1
            conn.execute(
                "UPDATE knowledge SET eval = ?, updated_at = ? WHERE id = ?",
                (json.dumps(eval_data, ensure_ascii=False), now, knowledge_id)
            )
        # Run all knowledge evolutions concurrently, then write back in order
        # (gather preserves input order, so zip pairs results correctly)
        if evolution_tasks:
            print(f"🧬 并发处理 {len(evolution_tasks)} 条知识进化...")
            evolved_results = await asyncio.gather(
                *[_evolve_knowledge_with_llm(old, fb) for _, old, fb, _ in evolution_tasks]
            )
            for (knowledge_id, _, _, eval_data), evolved_content in zip(evolution_tasks, evolved_results):
                # evolution implies the knowledge was helpful
                eval_data["helpful"] = eval_data.get("helpful", 0) + 1
                conn.execute(
                    "UPDATE knowledge SET content = ?, eval = ?, updated_at = ? WHERE id = ?",
                    (evolved_content, json.dumps(eval_data, ensure_ascii=False), now, knowledge_id)
                )
        conn.commit()
        return {"status": "ok", "updated": len(simple_updates) + len(evolution_tasks)}
    finally:
        conn.close()
@app.post("/api/knowledge/slim")
async def slim_knowledge(model: str = "google/gemini-2.0-flash-001"):
    """Slim the knowledge base: ask an LLM to merge semantically similar entries.

    Sends every entry (content truncated to 200 chars) to the LLM, parses
    its line-oriented reply back into entries, then DESTRUCTIVELY replaces
    the whole knowledge table with the merged set. Raises 500 if the LLM
    reply is empty or unparseable (in which case nothing is deleted —
    the DELETE only happens after parsing succeeds).
    NOTE(review): feedback histories and original tags/scopes/sources are
    discarded by the rewrite — presumably acceptable for this tool; confirm.
    """
    conn = get_db()
    try:
        rows = conn.execute("SELECT * FROM knowledge").fetchall()
        if len(rows) < 2:
            return {"status": "ok", "message": f"知识库仅有 {len(rows)} 条,无需瘦身"}
        # Build the entry listing sent to the LLM
        entries_text = ""
        for row in rows:
            eval_data = json.loads(row["eval"])
            types = json.loads(row["types"])
            entries_text += f"[ID: {row['id']}] [Types: {','.join(types)}] "
            entries_text += f"[Helpful: {eval_data.get('helpful', 0)}, Harmful: {eval_data.get('harmful', 0)}] [Score: {eval_data.get('score', 3)}]\n"
            entries_text += f"Task: {row['task']}\n"
            entries_text += f"Content: {row['content'][:200]}...\n\n"
        prompt = f"""你是一个 AI Agent 知识库管理员。以下是当前知识库的全部条目,请执行瘦身操作:
【任务】:
1. 识别语义高度相似或重复的知识,将它们合并为一条更精炼、更通用的知识。
2. 合并时保留 helpful 最高的那条的 ID(helpful 取各条之和)。
3. 对于独立的、无重复的知识,保持原样不动。
【当前知识库】:
{entries_text}
【输出格式要求】:
严格按以下格式输出每条知识,条目之间用 === 分隔:
ID: <保留的id>
TYPES: <逗号分隔的type列表>
HELPFUL: <合并后的helpful计数>
HARMFUL: <合并后的harmful计数>
SCORE: <评分>
TASK: <任务描述>
CONTENT: <合并后的知识内容>
===
最后输出合并报告:
REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
禁止输出任何开场白或解释。"""
        print(f"\n[知识瘦身] 正在调用 {model} 分析 {len(rows)} 条知识...")
        response = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model=model
        )
        content = response.get("content", "").strip()
        if not content:
            raise HTTPException(status_code=500, detail="LLM 返回为空")
        # Parse the LLM output: "===" separates entries; one block is the REPORT
        report_line = ""
        new_entries = []
        blocks = [b.strip() for b in content.split("===") if b.strip()]
        for block in blocks:
            if block.startswith("REPORT:"):
                report_line = block
                continue
            lines = block.split("\n")
            # per-entry accumulator with safe defaults
            kid, types, helpful, harmful, score, task, content_lines = None, [], 0, 0, 3, "", []
            # tracks which multi-line field (task/content) continuation lines belong to
            current_field = None
            for line in lines:
                if line.startswith("ID:"):
                    kid = line[3:].strip()
                    current_field = None
                elif line.startswith("TYPES:"):
                    types_str = line[6:].strip()
                    types = [t.strip() for t in types_str.split(",") if t.strip()]
                    current_field = None
                elif line.startswith("HELPFUL:"):
                    try:
                        helpful = int(line[8:].strip())
                    except Exception:
                        helpful = 0  # tolerate non-numeric LLM output
                    current_field = None
                elif line.startswith("HARMFUL:"):
                    try:
                        harmful = int(line[8:].strip())
                    except Exception:
                        harmful = 0
                    current_field = None
                elif line.startswith("SCORE:"):
                    try:
                        score = int(line[6:].strip())
                    except Exception:
                        score = 3
                    current_field = None
                elif line.startswith("TASK:"):
                    task = line[5:].strip()
                    current_field = "task"
                elif line.startswith("CONTENT:"):
                    content_lines.append(line[8:].strip())
                    current_field = "content"
                elif current_field == "task":
                    task += "\n" + line
                elif current_field == "content":
                    content_lines.append(line)
            # an entry is usable only with an id and some content
            if kid and content_lines:
                new_entries.append({
                    "id": kid,
                    "types": types if types else ["strategy"],
                    "helpful": helpful,
                    "harmful": harmful,
                    "score": score,
                    "task": task.strip(),
                    "content": "\n".join(content_lines).strip()
                })
        if not new_entries:
            raise HTTPException(status_code=500, detail="解析大模型输出失败")
        # Rewrite: DELETE + re-INSERT in one transaction, committed atomically
        now = datetime.now(timezone.utc).isoformat()
        conn.execute("DELETE FROM knowledge")
        for e in new_entries:
            # fresh eval/source records; per-case histories are reset
            eval_data = {
                "score": e["score"],
                "helpful": e["helpful"],
                "harmful": e["harmful"],
                "confidence": 0.9,
                "helpful_history": [],
                "harmful_history": []
            }
            source = {
                "name": "slim",
                "category": "exp",
                "urls": [],
                "agent_id": "slim",
                "submitted_by": "system",
                "timestamp": now
            }
            conn.execute(
                """INSERT INTO knowledge
                (id, message_id, types, task, tags, scopes, owner, content, source, eval, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    e["id"],
                    "",
                    json.dumps(e["types"]),
                    e["task"],
                    json.dumps({}),
                    json.dumps(["org:cybertogether"]),
                    "agent:slim",
                    e["content"],
                    json.dumps(source, ensure_ascii=False),
                    json.dumps(eval_data, ensure_ascii=False),
                    now,
                    now
                )
            )
        conn.commit()
        result_msg = f"瘦身完成:{len(rows)} → {len(new_entries)} 条知识"
        if report_line:
            result_msg += f"\n{report_line}"
        print(f"[知识瘦身] {result_msg}")
        return {"status": "ok", "before": len(rows), "after": len(new_entries), "report": report_line}
    finally:
        conn.close()
if __name__ == "__main__":
    # Dev entry point: serve on all interfaces, port 9999
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=9999)
|