| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048 |
- """
- KnowHub Server
- Agent 工具使用经验的共享平台。
- FastAPI + SQLite,单文件部署。
- """
- import os
- import re
- import json
- import sqlite3
- import asyncio
- from contextlib import asynccontextmanager
- from datetime import datetime, timezone
- from typing import Optional
- from pathlib import Path
- from fastapi import FastAPI, HTTPException, Query
- from pydantic import BaseModel, Field
- # 导入 LLM 调用(需要 agent 模块在 Python path 中)
- import sys
- sys.path.insert(0, str(Path(__file__).parent.parent))
- from agent.llm.openrouter import openrouter_llm_call
# --- Deployment / branding configuration (each overridable via environment) ---
BRAND_NAME = os.getenv("BRAND_NAME", "KnowHub")        # service title shown in FastAPI docs
BRAND_API_ENV = os.getenv("BRAND_API_ENV", "KNOWHUB_API")  # NOTE(review): unused in this file — presumably the env var clients read for the API endpoint; confirm
BRAND_DB = os.getenv("BRAND_DB", "knowhub.db")         # SQLite database file name
# The database file lives next to this source file.
DB_PATH = Path(__file__).parent / BRAND_DB
# --- Database ---
def get_db() -> sqlite3.Connection:
    """Open a fresh connection to the KnowHub database.

    Rows are returned as :class:`sqlite3.Row` (name-indexable), and WAL
    journaling is enabled for better concurrent read behavior.
    """
    connection = sqlite3.connect(str(DB_PATH))
    connection.row_factory = sqlite3.Row
    connection.execute("PRAGMA journal_mode=WAL")
    return connection
def init_db():
    """Create all tables and indexes if they do not exist yet (idempotent)."""
    conn = get_db()
    # Tool-usage experiences submitted by agents; grouped/queried by `name`.
    conn.execute("""
        CREATE TABLE IF NOT EXISTS experiences (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            url TEXT DEFAULT '',
            category TEXT DEFAULT '',
            task TEXT NOT NULL,
            score INTEGER CHECK(score BETWEEN 1 AND 5),
            outcome TEXT DEFAULT '',
            tips TEXT DEFAULT '',
            content_id TEXT DEFAULT '',
            submitted_by TEXT DEFAULT '',
            created_at TEXT NOT NULL
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_name ON experiences(name)")
    # Hierarchical documents; ids encode the hierarchy with '/' separators.
    conn.execute("""
        CREATE TABLE IF NOT EXISTS contents (
            id TEXT PRIMARY KEY,
            title TEXT DEFAULT '',
            body TEXT NOT NULL,
            sort_order INTEGER DEFAULT 0,
            submitted_by TEXT DEFAULT '',
            created_at TEXT NOT NULL
        )
    """)
    # Atomic knowledge entries: tags_type/source_urls are comma-separated,
    # the *_history columns hold JSON-encoded lists of feedback cases.
    conn.execute("""
        CREATE TABLE IF NOT EXISTS knowledge (
            id TEXT PRIMARY KEY,
            message_id TEXT DEFAULT '',
            tags_type TEXT NOT NULL,
            scenario TEXT NOT NULL,
            content TEXT NOT NULL,
            source_urls TEXT DEFAULT '',
            source_agent_id TEXT DEFAULT '',
            source_timestamp TEXT NOT NULL,
            eval_score INTEGER DEFAULT 3 CHECK(eval_score BETWEEN 1 AND 5),
            eval_helpful INTEGER DEFAULT 0,
            eval_harmful INTEGER DEFAULT 0,
            eval_helpful_history TEXT DEFAULT '[]',
            eval_harmful_history TEXT DEFAULT '[]',
            metrics_helpful INTEGER DEFAULT 1,
            metrics_harmful INTEGER DEFAULT 0,
            created_at TEXT NOT NULL,
            updated_at TEXT DEFAULT ''
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_knowledge_tags ON knowledge(tags_type)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_knowledge_scenario ON knowledge(scenario)")
    conn.commit()
    conn.close()
# --- Models ---
class ExperienceIn(BaseModel):
    """Inbound payload for POST /api/experience."""
    name: str              # resource/tool name — the grouping key for search
    url: str = ""
    category: str = ""
    task: str              # what the agent was trying to do
    score: int = Field(ge=1, le=5)  # 1-5 usefulness rating
    outcome: str = ""
    tips: str = ""
    content_id: str = ""   # optional link into the contents table
    submitted_by: str = ""

class ExperienceOut(BaseModel):
    """One experience as embedded in search/detail responses."""
    task: str
    score: int
    outcome: str
    tips: str
    content_id: str
    submitted_by: str
    created_at: str

class ResourceResult(BaseModel):
    """Aggregated search hit: one resource with its matching experiences."""
    name: str
    url: str
    relevant_experiences: list[ExperienceOut]
    avg_score: float       # mean score rounded to 1 decimal
    experience_count: int

class SearchResponse(BaseModel):
    """Envelope for GET /api/search."""
    results: list[ResourceResult]

class ResourceDetailResponse(BaseModel):
    """Envelope for GET /api/resource/{name}: full history for one resource."""
    name: str
    url: str
    category: str
    avg_score: float
    experience_count: int
    experiences: list[ExperienceOut]

class ContentIn(BaseModel):
    """Inbound payload for POST /api/content (upsert keyed on id)."""
    id: str
    title: str = ""
    body: str
    sort_order: int = 0    # ordering among siblings in navigation
    submitted_by: str = ""
# Knowledge Models
class KnowledgeIn(BaseModel):
    """Inbound payload for POST /api/knowledge."""
    scenario: str          # when this knowledge applies
    content: str           # the knowledge text itself
    tags_type: list[str]   # stored comma-joined in the tags_type column
    urls: list[str] = []   # source URLs, stored comma-joined
    agent_id: str = "research_agent"
    score: int = Field(default=3, ge=1, le=5)  # initial eval_score
    message_id: str = ""

class KnowledgeOut(BaseModel):
    """Full knowledge entry shape.

    NOTE(review): declared but the knowledge endpoints return plain dicts —
    presumably intended as the response_model; confirm.
    """
    id: str
    message_id: str
    tags: dict
    scenario: str
    content: str
    source: dict
    eval: dict
    metrics: dict
    created_at: str
    updated_at: str

class KnowledgeUpdateIn(BaseModel):
    """Inbound payload for PUT /api/knowledge/{id}; all fields optional."""
    add_helpful_case: Optional[dict] = None   # appended to eval_helpful_history
    add_harmful_case: Optional[dict] = None   # appended to eval_harmful_history
    update_score: Optional[int] = Field(default=None, ge=1, le=5)
    evolve_feedback: Optional[str] = None     # triggers LLM rewrite of content

class KnowledgeBatchUpdateIn(BaseModel):
    """Inbound payload for POST /api/knowledge/batch_update."""
    # Each item: {"knowledge_id": ..., "is_effective": bool, "feedback": str}
    feedback_list: list[dict]

class KnowledgeSearchResponse(BaseModel):
    """Envelope for knowledge search results.

    NOTE(review): declared but unused by the endpoints in this file.
    """
    results: list[dict]
    count: int
class ContentNode(BaseModel):
    """Lightweight reference to a content document (for navigation links)."""
    id: str
    title: str

class ContentOut(BaseModel):
    """Envelope for GET /api/content/{id}: the document plus navigation."""
    id: str
    title: str
    body: str
    toc: Optional[ContentNode] = None      # root document, when below the root
    children: list[ContentNode]            # documents nested under this id
    prev: Optional[ContentNode] = None     # previous sibling by sort_order
    next: Optional[ContentNode] = None     # next sibling by sort_order
# --- App ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: make sure the schema exists before serving."""
    init_db()
    yield

app = FastAPI(title=BRAND_NAME, lifespan=lifespan)
- def _search_rows(conn: sqlite3.Connection, q: str, category: Optional[str]) -> list[sqlite3.Row]:
- """LIKE 搜索,拆词后 AND 连接,匹配 task + tips + outcome + name"""
- terms = q.split()
- if not terms:
- return []
- conditions = []
- params: list[str] = []
- for term in terms:
- like = f"%{term}%"
- conditions.append(
- "(task LIKE ? OR tips LIKE ? OR outcome LIKE ? OR name LIKE ?)"
- )
- params.extend([like, like, like, like])
- if category:
- conditions.append("category = ?")
- params.append(category)
- sql = (
- "SELECT name, url, category, task, score, outcome, tips, content_id, "
- "submitted_by, created_at FROM experiences WHERE "
- + " AND ".join(conditions)
- + " ORDER BY created_at DESC"
- )
- return conn.execute(sql, params).fetchall()
def _group_by_resource(rows: list[sqlite3.Row], limit: int) -> list[ResourceResult]:
    """Group experience rows by resource name and aggregate.

    Results are ranked by avg_score * experience_count (popular AND good
    first) and truncated to `limit`.
    """
    buckets: dict[str, list[sqlite3.Row]] = {}
    for row in rows:
        buckets.setdefault(row["name"], []).append(row)
    ranked: list[ResourceResult] = []
    for resource_name, resource_rows in buckets.items():
        mean_score = sum(r["score"] for r in resource_rows) / len(resource_rows)
        experiences = [
            ExperienceOut(
                task=r["task"],
                score=r["score"],
                outcome=r["outcome"],
                tips=r["tips"],
                content_id=r["content_id"],
                submitted_by=r["submitted_by"],
                created_at=r["created_at"],
            )
            for r in resource_rows
        ]
        ranked.append(ResourceResult(
            name=resource_name,
            url=resource_rows[0]["url"],
            relevant_experiences=experiences,
            avg_score=round(mean_score, 1),
            experience_count=len(resource_rows),
        ))
    ranked.sort(key=lambda item: item.avg_score * item.experience_count, reverse=True)
    return ranked[:limit]
@app.get("/api/search", response_model=SearchResponse)
def search_experiences(
    q: str = Query(..., min_length=1),
    category: Optional[str] = None,
    limit: int = Query(default=10, ge=1, le=50),
):
    """Search experiences and return them grouped per resource."""
    conn = get_db()
    try:
        matched = _search_rows(conn, q, category)
        grouped = _group_by_resource(matched, limit)
        return SearchResponse(results=grouped)
    finally:
        conn.close()
@app.post("/api/experience", status_code=201)
def submit_experience(exp: ExperienceIn):
    """Persist one experience row, stamped with the current UTC time."""
    conn = get_db()
    try:
        created = datetime.now(timezone.utc).isoformat()
        values = (
            exp.name, exp.url, exp.category, exp.task, exp.score,
            exp.outcome, exp.tips, exp.content_id, exp.submitted_by, created,
        )
        conn.execute(
            "INSERT INTO experiences"
            "(name, url, category, task, score, outcome, tips, content_id, submitted_by, created_at)"
            " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            values,
        )
        conn.commit()
        return {"status": "ok"}
    finally:
        conn.close()
@app.get("/api/resource/{name}", response_model=ResourceDetailResponse)
def get_resource_experiences(name: str):
    """Return every experience recorded for one resource, newest first.

    Raises 404 when the resource has no experiences at all.
    """
    conn = get_db()
    try:
        rows = conn.execute(
            "SELECT name, url, category, task, score, outcome, tips, content_id, "
            "submitted_by, created_at FROM experiences "
            "WHERE name = ? ORDER BY created_at DESC",
            (name,),
        ).fetchall()
        if not rows:
            raise HTTPException(status_code=404, detail=f"No experiences found for resource: {name}")
        score_total = sum(r["score"] for r in rows)
        experiences = [
            ExperienceOut(
                task=r["task"],
                score=r["score"],
                outcome=r["outcome"],
                tips=r["tips"],
                content_id=r["content_id"],
                submitted_by=r["submitted_by"],
                created_at=r["created_at"],
            )
            for r in rows
        ]
        return ResourceDetailResponse(
            name=name,
            url=rows[0]["url"],
            category=rows[0]["category"],
            avg_score=round(score_total / len(rows), 1),
            experience_count=len(rows),
            experiences=experiences,
        )
    finally:
        conn.close()
@app.post("/api/content", status_code=201)
def submit_content(content: ContentIn):
    """Insert or overwrite a content document (upsert keyed on id)."""
    conn = get_db()
    try:
        stamp = datetime.now(timezone.utc).isoformat()
        conn.execute(
            "INSERT OR REPLACE INTO contents"
            "(id, title, body, sort_order, submitted_by, created_at)"
            " VALUES (?, ?, ?, ?, ?, ?)",
            (content.id, content.title, content.body,
             content.sort_order, content.submitted_by, stamp),
        )
        conn.commit()
        return {"status": "ok"}
    finally:
        conn.close()
@app.get("/api/content/{content_id:path}", response_model=ContentOut)
def get_content(content_id: str):
    """Fetch one content document plus its navigation context.

    Ids encode hierarchy with '/': e.g. "book/chapter1". The ':path'
    converter lets ids contain slashes. Raises 404 for unknown ids.
    """
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT id, title, body, sort_order FROM contents WHERE id = ?",
            (content_id,),
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail=f"Content not found: {content_id}")
        # Navigation context is derived purely from the id's path structure.
        root_id = content_id.split("/")[0] if "/" in content_id else content_id
        # TOC (root node): only populated when we are below the root.
        toc = None
        if "/" in content_id:
            toc_row = conn.execute(
                "SELECT id, title FROM contents WHERE id = ?",
                (root_id,),
            ).fetchone()
            if toc_row:
                toc = ContentNode(id=toc_row["id"], title=toc_row["title"])
        # Children: everything nested under this id.
        # NOTE(review): the LIKE pattern matches ALL descendants
        # ("x/%" also matches "x/a/b") — confirm whether only direct
        # children were intended.
        children = []
        children_rows = conn.execute(
            "SELECT id, title FROM contents WHERE id LIKE ? AND id != ? ORDER BY sort_order",
            (f"{content_id}/%", content_id),
        ).fetchall()
        children = [ContentNode(id=r["id"], title=r["title"]) for r in children_rows]
        # Prev/next among siblings: direct children of the root only
        # (the NOT LIKE clause excludes depth >= 2), ordered by sort_order.
        prev_node = None
        next_node = None
        if "/" in content_id:
            siblings = conn.execute(
                "SELECT id, title, sort_order FROM contents WHERE id LIKE ? AND id NOT LIKE ? ORDER BY sort_order",
                (f"{root_id}/%", f"{root_id}/%/%"),
            ).fetchall()
            for i, sib in enumerate(siblings):
                if sib["id"] == content_id:
                    if i > 0:
                        prev_node = ContentNode(id=siblings[i-1]["id"], title=siblings[i-1]["title"])
                    if i < len(siblings) - 1:
                        next_node = ContentNode(id=siblings[i+1]["id"], title=siblings[i+1]["title"])
                    break
        return ContentOut(
            id=row["id"],
            title=row["title"],
            body=row["body"],
            toc=toc,
            children=children,
            prev=prev_node,
            next=next_node,
        )
    finally:
        conn.close()
# ===== Knowledge API =====
# Two-stage retrieval logic
async def _route_knowledge_by_llm(query_text: str, metadata_list: list[dict], k: int = 5) -> list[str]:
    """Stage 1: semantic routing.

    Ask the LLM to pick up to 2*k semantically relevant knowledge ids from
    the (truncated) metadata. Returns [] on empty input or any LLM failure,
    so retrieval degrades to "no results" rather than crashing.
    """
    if not metadata_list:
        return []
    routing_k = k * 2  # over-fetch; stage 2 re-ranks and trims to k
    # Only id/tags/scenario go to the LLM; scenario capped at 100 chars
    # to keep the prompt small.
    routing_data = [
        {
            "id": m["id"],
            "tags": m["tags"],
            "scenario": m["scenario"][:100]
        } for m in metadata_list
    ]
    prompt = f"""
你是一个知识检索专家。根据用户的当前任务需求,从下列原子知识元数据中挑选出最相关的最多 {routing_k} 个知识 ID。
任务需求:"{query_text}"
可选知识列表:
{json.dumps(routing_data, ensure_ascii=False, indent=1)}
请直接输出 ID 列表,用逗号分隔(例如: knowledge-20260302-001, research-20260302-002)。若无相关项请输出 "None"。
"""
    try:
        print(f"\n[Step 1: 知识语义路由] 任务: '{query_text}' | 候选总数: {len(metadata_list)} | 目标提取数: {routing_k}")
        response = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model="google/gemini-2.0-flash-001"
        )
        content = response.get("content", "").strip()
        # Split on commas/whitespace and keep only tokens that look like real
        # ids; this also discards the literal "None" answer and any chatter.
        selected_ids = [idx.strip() for idx in re.split(r'[,\s]+', content) if idx.strip().startswith(("knowledge-", "research-"))]
        print(f"[Step 1: 知识语义路由] LLM 初选 ID ({len(selected_ids)}个): {selected_ids}")
        return selected_ids
    except Exception as e:
        # Best-effort: routing failure must not take down the search endpoint.
        print(f"LLM 知识路由失败: {e}")
        return []
async def _search_knowledge_two_stage(
    query_text: str,
    top_k: int = 5,
    min_score: int = 3,
    tags_filter: Optional[list[str]] = None,
    conn: sqlite3.Connection = None
) -> list[dict]:
    """Two-stage retrieval: LLM semantic routing, then quality re-ranking.

    Loads the whole knowledge table, optionally filters by tag, lets the LLM
    pick ~2*top_k candidate ids, scores candidates by
    eval_score + metrics_helpful - 2*metrics_harmful, drops entries below
    min_score or with a negative quality score, and returns the top_k best
    as plain dicts.

    If `conn` is None a connection is opened (and closed) internally;
    a caller-supplied connection is left open for the caller.
    """
    if conn is None:
        conn = get_db()
        should_close = True
    else:
        should_close = False
    try:
        # Stage 0: load and index all knowledge rows.
        query = "SELECT * FROM knowledge"
        rows = conn.execute(query).fetchall()
        if not rows:
            return []
        content_map = {}     # id -> full payload (for stage-2 assembly)
        metadata_list = []   # lightweight view sent to the LLM router
        for row in rows:
            kid = row["id"]
            tags_type = row["tags_type"].split(",") if row["tags_type"] else []
            # Tag filter: keep a row if ANY requested tag matches.
            if tags_filter:
                if not any(tag in tags_type for tag in tags_filter):
                    continue
            scenario = row["scenario"]
            content_text = row["content"]
            meta_item = {
                "id": kid,
                "tags": {"type": tags_type},
                "scenario": scenario,
                "score": row["eval_score"],
                "helpful": row["metrics_helpful"],
                "harmful": row["metrics_harmful"],
            }
            metadata_list.append(meta_item)
            content_map[kid] = {
                "scenario": scenario,
                "content": content_text,
                "tags": {"type": tags_type},
                "score": meta_item["score"],
                "helpful": meta_item["helpful"],
                "harmful": meta_item["harmful"],
                "message_id": row["message_id"],
                "source": {
                    "urls": row["source_urls"].split(",") if row["source_urls"] else [],
                    "agent_id": row["source_agent_id"],
                    "timestamp": row["source_timestamp"]
                },
                "created_at": row["created_at"],
                "updated_at": row["updated_at"]
            }
        if not metadata_list:
            return []
        # Stage 1: semantic routing (over-fetches 2*top_k candidates).
        candidate_ids = await _route_knowledge_by_llm(query_text, metadata_list, k=top_k)
        # Stage 2: quality re-ranking of the LLM's candidates.
        print(f"[Step 2: 知识质量精排] 正在根据评分和反馈进行打分...")
        scored_items = []
        for kid in candidate_ids:
            # Ignore ids the LLM hallucinated (not in the candidate map).
            if kid in content_map:
                item = content_map[kid]
                score = item["score"]
                helpful = item["helpful"]
                harmful = item["harmful"]
                # Composite score: base rating + helpful count, harmful
                # feedback penalized at double weight.
                quality_score = score + helpful - (harmful * 2.0)
                # Quality gate: drop low-rated or net-negative entries.
                if score < min_score or quality_score < 0:
                    print(f" - 剔除低质量知识: {kid} (Score: {score}, Helpful: {helpful}, Harmful: {harmful})")
                    continue
                scored_items.append({
                    "id": kid,
                    "message_id": item["message_id"],
                    "scenario": item["scenario"],
                    "content": item["content"],
                    "tags": item["tags"],
                    "score": score,
                    "quality_score": quality_score,
                    "metrics": {
                        "helpful": helpful,
                        "harmful": harmful
                    },
                    "source": item["source"],
                    "created_at": item["created_at"],
                    "updated_at": item["updated_at"]
                })
        # Order by composite quality, best first.
        final_sorted = sorted(scored_items, key=lambda x: x["quality_score"], reverse=True)
        # Trim to the requested top_k.
        result = final_sorted[:top_k]
        print(f"[Step 2: 知识质量精排] 最终选定知识: {[it['id'] for it in result]}")
        print(f"[Knowledge System] 检索结束。\n")
        return result
    finally:
        if should_close:
            conn.close()
@app.get("/api/knowledge/search")
async def search_knowledge_api(
    q: str = Query(..., description="查询文本"),
    top_k: int = Query(default=5, ge=1, le=20),
    min_score: int = Query(default=3, ge=1, le=5),
    tags_type: Optional[str] = None
):
    """Knowledge retrieval endpoint (two stages: LLM routing + quality re-rank)."""
    conn = get_db()
    try:
        wanted_tags = tags_type.split(",") if tags_type else None
        hits = await _search_knowledge_two_stage(
            query_text=q,
            top_k=top_k,
            min_score=min_score,
            tags_filter=wanted_tags,
            conn=conn,
        )
        return {"results": hits, "count": len(hits)}
    finally:
        conn.close()
@app.post("/api/knowledge", status_code=201)
def save_knowledge(knowledge: KnowledgeIn):
    """Persist a new knowledge entry and return its generated id.

    Ids have the shape "knowledge-YYYYMMDD-HHMMSS-xxxx" (4 random hex
    chars), so saves within the same second still get distinct ids.
    """
    import uuid
    conn = get_db()
    try:
        # Fix: derive the id's timestamp from UTC. Every other timestamp in
        # this module (created_at / updated_at / source_timestamp) is UTC,
        # but the original used naive local time here, so the id and
        # created_at could disagree by the host's UTC offset.
        timestamp = datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')
        random_suffix = uuid.uuid4().hex[:4]
        knowledge_id = f"knowledge-{timestamp}-{random_suffix}"
        now = datetime.now(timezone.utc).isoformat()
        conn.execute(
            """INSERT INTO knowledge
            (id, message_id, tags_type, scenario, content,
             source_urls, source_agent_id, source_timestamp,
             eval_score, eval_helpful, eval_harmful,
             eval_helpful_history, eval_harmful_history,
             metrics_helpful, metrics_harmful, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                knowledge_id,
                knowledge.message_id,
                ",".join(knowledge.tags_type),  # tags stored comma-separated
                knowledge.scenario,
                knowledge.content,
                ",".join(knowledge.urls),       # source URLs, comma-separated
                knowledge.agent_id,
                now,                             # source_timestamp
                knowledge.score,
                0,     # eval_helpful
                0,     # eval_harmful
                "[]",  # eval_helpful_history (JSON list)
                "[]",  # eval_harmful_history (JSON list)
                1,     # metrics_helpful — NOTE(review): starts at 1, not 0; confirm intent
                0,     # metrics_harmful
                now,
                now,
            ),
        )
        conn.commit()
        return {"status": "ok", "knowledge_id": knowledge_id}
    finally:
        conn.close()
@app.get("/api/knowledge")
def list_knowledge(
    limit: int = Query(default=10, ge=1, le=100),
    tags_type: Optional[str] = None
):
    """List knowledge entries, newest first, optionally filtered by a
    substring match on the comma-separated tags column."""
    conn = get_db()
    try:
        sql = "SELECT * FROM knowledge"
        args = []
        if tags_type:
            sql += " WHERE tags_type LIKE ?"
            args.append(f"%{tags_type}%")
        sql += " ORDER BY created_at DESC LIMIT ?"
        args.append(limit)
        entries = [
            {
                "id": row["id"],
                "message_id": row["message_id"],
                "tags": {"type": row["tags_type"].split(",") if row["tags_type"] else []},
                "scenario": row["scenario"],
                "content": row["content"],
                "source": {
                    "urls": row["source_urls"].split(",") if row["source_urls"] else [],
                    "agent_id": row["source_agent_id"],
                    "timestamp": row["source_timestamp"]
                },
                "eval": {
                    "score": row["eval_score"],
                    "helpful": row["eval_helpful"],
                    "harmful": row["eval_harmful"]
                },
                "metrics": {
                    "helpful": row["metrics_helpful"],
                    "harmful": row["metrics_harmful"]
                },
                "created_at": row["created_at"],
                "updated_at": row["updated_at"]
            }
            for row in conn.execute(sql, args).fetchall()
        ]
        return {"results": entries, "count": len(entries)}
    finally:
        conn.close()
@app.get("/api/knowledge/{knowledge_id}")
def get_knowledge(knowledge_id: str):
    """Fetch a single knowledge entry by id; 404 when it does not exist."""
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT * FROM knowledge WHERE id = ?",
            (knowledge_id,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
        return {
            "id": row["id"],
            "message_id": row["message_id"],
            "tags": {"type": row["tags_type"].split(",") if row["tags_type"] else []},
            "scenario": row["scenario"],
            "content": row["content"],
            "source": {
                "urls": row["source_urls"].split(",") if row["source_urls"] else [],
                "agent_id": row["source_agent_id"],
                "timestamp": row["source_timestamp"]
            },
            "eval": {
                "score": row["eval_score"],
                "helpful": row["eval_helpful"],
                "harmful": row["eval_harmful"],
                # Fix: surface the stored JSON histories. These columns are
                # appended to by PUT /api/knowledge/{id}, but the original
                # endpoint always returned hard-coded empty lists.
                "helpful_history": json.loads(row["eval_helpful_history"] or "[]"),
                "harmful_history": json.loads(row["eval_harmful_history"] or "[]")
            },
            "metrics": {
                "helpful": row["metrics_helpful"],
                "harmful": row["metrics_harmful"]
            },
            "created_at": row["created_at"],
            "updated_at": row["updated_at"]
        }
    finally:
        conn.close()
async def _evolve_knowledge_with_llm(old_content: str, feedback: str) -> str:
    """Rewrite ("evolve") knowledge content with an LLM, folding in feedback.

    On any LLM failure — or a suspiciously short (<5 chars) answer — falls
    back to appending the feedback to the original content, so an update is
    never silently lost.
    """
    prompt = f"""你是一个 AI Agent 知识库管理员。请根据反馈建议,对现有的知识内容进行重写进化。
【原知识内容】:
{old_content}
【实战反馈建议】:
{feedback}
【重写要求】:
1. 融合知识:将反馈中的避坑指南、新参数或修正后的选择逻辑融入原知识,使其更具通用性和准确性。
2. 保持结构:如果原内容有特定格式(如 Markdown、代码示例等),请保持该格式。
3. 语言:简洁直接,使用中文。
4. 禁止:严禁输出任何开场白、解释语或额外的 Markdown 标题,直接返回重写后的正文。
"""
    try:
        response = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model="google/gemini-2.0-flash-001"
        )
        evolved = response.get("content", "").strip()
        # Guard against empty/near-empty LLM output overwriting real content.
        if len(evolved) < 5:
            raise ValueError("LLM output too short")
        return evolved
    except Exception as e:
        # Fallback: append the raw feedback with a dated marker.
        print(f"知识进化失败,采用追加模式回退: {e}")
        return f"{old_content}\n\n---\n[Update {datetime.now().strftime('%Y-%m-%d')}]: {feedback}"
@app.put("/api/knowledge/{knowledge_id}")
async def update_knowledge(knowledge_id: str, update: KnowledgeUpdateIn):
    """Update a knowledge entry's evaluation; optionally evolve its content.

    All fields of the payload are optional and independent: score override,
    helpful/harmful case appends (which also bump the counters), and an
    LLM-driven content rewrite. 404 for unknown ids.
    """
    conn = get_db()
    try:
        row = conn.execute("SELECT * FROM knowledge WHERE id = ?", (knowledge_id,)).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
        now = datetime.now(timezone.utc).isoformat()
        # Accumulate column -> value; updated_at always changes.
        updates = {"updated_at": now}
        if update.update_score is not None:
            updates["eval_score"] = update.update_score
        if update.add_helpful_case:
            # Append the case to the JSON history and bump both counters.
            helpful_history = json.loads(row["eval_helpful_history"] or "[]")
            helpful_history.append(update.add_helpful_case)
            updates["eval_helpful"] = row["eval_helpful"] + 1
            updates["eval_helpful_history"] = json.dumps(helpful_history, ensure_ascii=False)
            updates["metrics_helpful"] = row["metrics_helpful"] + 1
        if update.add_harmful_case:
            harmful_history = json.loads(row["eval_harmful_history"] or "[]")
            harmful_history.append(update.add_harmful_case)
            updates["eval_harmful"] = row["eval_harmful"] + 1
            updates["eval_harmful_history"] = json.dumps(harmful_history, ensure_ascii=False)
            updates["metrics_harmful"] = row["metrics_harmful"] + 1
        if update.evolve_feedback:
            # Evolution counts as a helpful signal; the .get() keeps a
            # simultaneous add_helpful_case increment from being overwritten.
            evolved_content = await _evolve_knowledge_with_llm(row["content"], update.evolve_feedback)
            updates["content"] = evolved_content
            updates["metrics_helpful"] = updates.get("metrics_helpful", row["metrics_helpful"]) + 1
        # Column names come from this function's own dict keys (not user
        # input), so building the SET clause by interpolation is safe here.
        set_clause = ", ".join(f"{k} = ?" for k in updates)
        values = list(updates.values()) + [knowledge_id]
        conn.execute(f"UPDATE knowledge SET {set_clause} WHERE id = ?", values)
        conn.commit()
        return {"status": "ok", "knowledge_id": knowledge_id}
    finally:
        conn.close()
@app.post("/api/knowledge/batch_update")
async def batch_update_knowledge(batch: KnowledgeBatchUpdateIn):
    """Batch-apply effectiveness feedback to knowledge entries.

    Each feedback item is {"knowledge_id", "is_effective", "feedback"?}.
    Effective items WITH textual feedback trigger a concurrent LLM content
    evolution; everything else just bumps the helpful/harmful counter.
    Items with missing/unknown ids are silently skipped.
    """
    if not batch.feedback_list:
        return {"status": "ok", "updated": 0}
    conn = get_db()
    try:
        # First pass: read current rows, splitting work into "needs LLM
        # evolution" vs. "plain counter bump".
        evolution_tasks = []  # [(knowledge_id, old_content, feedback, cur_helpful)]
        simple_updates = []   # [(knowledge_id, is_effective, cur_helpful, cur_harmful)]
        for item in batch.feedback_list:
            knowledge_id = item.get("knowledge_id")
            is_effective = item.get("is_effective")
            feedback = item.get("feedback", "")
            if not knowledge_id:
                continue
            row = conn.execute("SELECT * FROM knowledge WHERE id = ?", (knowledge_id,)).fetchone()
            if not row:
                continue
            if is_effective and feedback:
                evolution_tasks.append((knowledge_id, row["content"], feedback, row["metrics_helpful"]))
            else:
                # NOTE(review): a missing is_effective counts as harmful here
                # (None is falsy) — confirm that is intended.
                simple_updates.append((knowledge_id, is_effective, row["metrics_helpful"], row["metrics_harmful"]))
        # Second pass: apply the plain counter updates.
        now = datetime.now(timezone.utc).isoformat()
        for knowledge_id, is_effective, cur_helpful, cur_harmful in simple_updates:
            if is_effective:
                conn.execute(
                    "UPDATE knowledge SET metrics_helpful = ?, updated_at = ? WHERE id = ?",
                    (cur_helpful + 1, now, knowledge_id)
                )
            else:
                conn.execute(
                    "UPDATE knowledge SET metrics_harmful = ?, updated_at = ? WHERE id = ?",
                    (cur_harmful + 1, now, knowledge_id)
                )
        # Run all LLM evolutions concurrently, then write results back.
        if evolution_tasks:
            print(f"🧬 并发处理 {len(evolution_tasks)} 条知识进化...")
            evolved_results = await asyncio.gather(
                *[_evolve_knowledge_with_llm(old, fb) for _, old, fb, _ in evolution_tasks]
            )
            # gather() preserves input order, so zip realigns ids and results.
            for (knowledge_id, _, _, cur_helpful), evolved_content in zip(evolution_tasks, evolved_results):
                conn.execute(
                    "UPDATE knowledge SET content = ?, metrics_helpful = ?, updated_at = ? WHERE id = ?",
                    (evolved_content, cur_helpful + 1, now, knowledge_id)
                )
        conn.commit()
        return {"status": "ok", "updated": len(simple_updates) + len(evolution_tasks)}
    finally:
        conn.close()
@app.post("/api/knowledge/slim")
async def slim_knowledge(model: str = "anthropic/claude-sonnet-4-5"):
    """Knowledge-base slimming: ask an LLM to merge semantically similar
    entries, then atomically replace the whole table with the merged set.

    NOTE(review): merged entries lose message_id/source/eval history
    (re-written with blanks and agent_id "slim") — confirm that loss is
    acceptable before running in production.
    """
    conn = get_db()
    try:
        rows = conn.execute("SELECT * FROM knowledge ORDER BY metrics_helpful DESC").fetchall()
        if len(rows) < 2:
            return {"status": "ok", "message": f"知识库仅有 {len(rows)} 条,无需瘦身"}
        # Serialize every entry into a plain-text listing for the prompt;
        # content is truncated to 200 chars per entry to bound prompt size.
        entries_text = ""
        for row in rows:
            entries_text += f"[ID: {row['id']}] [Tags: {row['tags_type']}] "
            entries_text += f"[Helpful: {row['metrics_helpful']}, Harmful: {row['metrics_harmful']}] [Score: {row['eval_score']}]\n"
            entries_text += f"Scenario: {row['scenario']}\n"
            entries_text += f"Content: {row['content'][:200]}...\n\n"
        prompt = f"""你是一个 AI Agent 知识库管理员。以下是当前知识库的全部条目,请执行瘦身操作:
【任务】:
1. 识别语义高度相似或重复的知识,将它们合并为一条更精炼、更通用的知识。
2. 合并时保留 helpful 最高的那条的 ID(metrics_helpful 取各条之和)。
3. 对于独立的、无重复的知识,保持原样不动。
【当前知识库】:
{entries_text}
【输出格式要求】:
严格按以下格式输出每条知识,条目之间用 === 分隔:
ID: <保留的id>
TAGS: <逗号分隔的type列表>
HELPFUL: <合并后的helpful计数>
HARMFUL: <合并后的harmful计数>
SCORE: <评分>
SCENARIO: <场景描述>
CONTENT: <合并后的知识内容>
===
最后输出合并报告:
REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
禁止输出任何开场白或解释。"""
        print(f"\n[知识瘦身] 正在调用 {model} 分析 {len(rows)} 条知识...")
        response = await openrouter_llm_call(
            messages=[{"role": "user", "content": prompt}],
            model=model
        )
        content = response.get("content", "").strip()
        if not content:
            raise HTTPException(status_code=500, detail="LLM 返回为空")
        # Parse the LLM output: "===" separates entries; a REPORT: block is
        # kept aside for the response. Within an entry, SCENARIO/CONTENT may
        # span multiple lines, tracked via `current_field`.
        report_line = ""
        new_entries = []
        blocks = [b.strip() for b in content.split("===") if b.strip()]
        for block in blocks:
            if block.startswith("REPORT:"):
                report_line = block
                continue
            lines = block.split("\n")
            # Defaults if a field is missing: counters 0, score 3.
            kid, tags, helpful, harmful, score, scenario, content_lines = None, "", 0, 0, 3, "", []
            current_field = None  # which multi-line field continuation lines belong to
            for line in lines:
                if line.startswith("ID:"):
                    kid = line[3:].strip()
                    current_field = None
                elif line.startswith("TAGS:"):
                    tags = line[5:].strip()
                    current_field = None
                elif line.startswith("HELPFUL:"):
                    try:
                        helpful = int(line[8:].strip())
                    except Exception:
                        helpful = 0
                    current_field = None
                elif line.startswith("HARMFUL:"):
                    try:
                        harmful = int(line[8:].strip())
                    except Exception:
                        harmful = 0
                    current_field = None
                elif line.startswith("SCORE:"):
                    try:
                        score = int(line[6:].strip())
                    except Exception:
                        score = 3
                    current_field = None
                elif line.startswith("SCENARIO:"):
                    scenario = line[9:].strip()
                    current_field = "scenario"
                elif line.startswith("CONTENT:"):
                    content_lines.append(line[8:].strip())
                    current_field = "content"
                elif current_field == "scenario":
                    scenario += "\n" + line
                elif current_field == "content":
                    content_lines.append(line)
            # Only keep entries that at least have an id and some content.
            if kid and content_lines:
                new_entries.append({
                    "id": kid,
                    "tags": tags,
                    "helpful": helpful,
                    "harmful": harmful,
                    "score": score,
                    "scenario": scenario.strip(),
                    "content": "\n".join(content_lines).strip()
                })
        if not new_entries:
            raise HTTPException(status_code=500, detail="解析大模型输出失败")
        # Atomic rewrite: delete everything and re-insert the merged set in
        # one transaction (committed below; rolled back implicitly on error).
        now = datetime.now(timezone.utc).isoformat()
        conn.execute("DELETE FROM knowledge")
        for e in new_entries:
            conn.execute(
                """INSERT INTO knowledge
                (id, message_id, tags_type, scenario, content,
                 source_urls, source_agent_id, source_timestamp,
                 eval_score, eval_helpful, eval_harmful,
                 eval_helpful_history, eval_harmful_history,
                 metrics_helpful, metrics_harmful, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (e["id"], "", e["tags"], e["scenario"], e["content"],
                 "", "slim", now,
                 e["score"], 0, 0, "[]", "[]",
                 e["helpful"], e["harmful"], now, now)
            )
        conn.commit()
        result_msg = f"瘦身完成:{len(rows)} → {len(new_entries)} 条知识"
        if report_line:
            result_msg += f"\n{report_line}"
        print(f"[知识瘦身] {result_msg}")
        return {"status": "ok", "before": len(rows), "after": len(new_entries), "report": report_line}
    finally:
        conn.close()
if __name__ == "__main__":
    import uvicorn
    # Development entry point: bind all interfaces on port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)
|