| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040 |
- """
- KnowHub Server
- A shared platform for agents' tool-usage experience.
- FastAPI + PostgreSQL (knowledge and resources), single-file deployment.
- """
- import os
- import re
- import json
- import asyncio
- import base64
- import time
- import uuid
- from contextlib import asynccontextmanager
- from datetime import datetime, timezone
- from typing import Optional, List, Dict
- from pathlib import Path
- from cryptography.hazmat.primitives.ciphers.aead import AESGCM
- from fastapi import FastAPI, HTTPException, Query, Header, Body, BackgroundTasks
- from fastapi.responses import HTMLResponse, FileResponse
- from fastapi.staticfiles import StaticFiles
- from pydantic import BaseModel, Field
- # 导入 LLM 调用(需要 agent 模块在 Python path 中)
- import sys
- sys.path.insert(0, str(Path(__file__).parent.parent))
- # 加载环境变量
- from dotenv import load_dotenv
- load_dotenv(Path(__file__).parent.parent / ".env")
- from agent.llm import create_openrouter_llm_call, create_qwen_llm_call
- _dedup_llm = create_openrouter_llm_call(model="google/gemini-2.5-flash-lite")
- _tool_analysis_llm = create_qwen_llm_call(model="qwen3.5-plus")
- # 导入向量存储和 embedding
- from knowhub.knowhub_db.pg_store import PostgreSQLStore
- from knowhub.knowhub_db.pg_resource_store import PostgreSQLResourceStore
- from knowhub.embeddings import get_embedding, get_embeddings_batch
# Branding / deployment settings, each overridable through the environment.
BRAND_NAME = os.getenv("BRAND_NAME", "KnowHub")
BRAND_API_ENV = os.getenv("BRAND_API_ENV", "KNOWHUB_API")
BRAND_DB = os.getenv("BRAND_DB", "knowhub.db")

# Per-organisation key configuration, formatted as
# "org1:key1_base64,org2:key2_base64". Entries without a ":" are ignored and
# only the first ":" of an entry separates the org name from its key.
ORG_KEYS_RAW = os.getenv("ORG_KEYS", "")
ORG_KEYS = {
    org.strip(): key_b64.strip()
    for org, key_b64 in (
        pair.split(":", 1) for pair in ORG_KEYS_RAW.split(",") if ":" in pair
    )
}

DB_PATH = Path(__file__).parent / BRAND_DB

# Process-wide PostgreSQL store handles; they start as None and are assigned
# by the application lifespan handler.
pg_store: Optional[PostgreSQLStore] = None
pg_resource_store: Optional[PostgreSQLResourceStore] = None
- # --- 加密/解密 ---
def get_org_key(resource_id: str) -> Optional[bytes]:
    """Return the decoded key of the organisation that owns ``resource_id``.

    The organisation is the part of the id before the first "/". ``None`` is
    returned when the id contains no "/" or the organisation has no entry in
    ``ORG_KEYS``.
    """
    if "/" not in resource_id:
        return None
    org_prefix = resource_id.split("/")[0]
    if org_prefix not in ORG_KEYS:
        return None
    return base64.b64decode(ORG_KEYS[org_prefix])
def encrypt_content(resource_id: str, plaintext: str) -> str:
    """Encrypt ``plaintext`` with the key of the organisation owning ``resource_id``.

    Returns:
        ``""`` for empty input; the plaintext unchanged when no key is
        configured for the organisation (stored in clear, not recommended);
        otherwise ``"encrypted:AES256-GCM:{base64(nonce + ciphertext)}"``.
    """
    if not plaintext:
        return ""

    org_key = get_org_key(resource_id)
    if not org_key:
        return plaintext

    cipher = AESGCM(org_key)
    nonce = os.urandom(12)  # 96-bit nonce, freshly generated per message
    sealed = cipher.encrypt(nonce, plaintext.encode("utf-8"), None)
    # The nonce is stored in front of the ciphertext so decryption can recover it.
    payload = base64.b64encode(nonce + sealed).decode("ascii")
    return f"encrypted:AES256-GCM:{payload}"
def decrypt_content(resource_id: str, encrypted_text: str, provided_key: Optional[str] = None) -> str:
    """Decrypt a value produced by ``encrypt_content``.

    Args:
        resource_id: Id of the owning resource; used to look up the configured
            organisation key when ``provided_key`` is not given.
        encrypted_text: Stored value. Values without the
            ``encrypted:AES256-GCM:`` prefix are treated as plaintext.
        provided_key: Optional base64-encoded key that overrides the
            configured one.

    Returns:
        ``""`` for empty input, the input itself when it is not encrypted,
        the decrypted text on success, and the placeholder ``"[ENCRYPTED]"``
        when the payload is malformed, no key is available, or the key is
        wrong.
    """
    if not encrypted_text:
        return ""

    marker = "encrypted:AES256-GCM:"
    if not encrypted_text.startswith(marker):
        # Not encrypted: hand it back untouched.
        return encrypted_text

    # A corrupted payload must yield the placeholder, not an exception.
    # binascii.Error is a ValueError subclass.
    try:
        encrypted_data = base64.b64decode(encrypted_text[len(marker):])
    except ValueError:
        return "[ENCRYPTED]"

    # Layout written by encrypt_content: 12-byte nonce followed by ciphertext.
    nonce = encrypted_data[:12]
    ciphertext = encrypted_data[12:]

    if provided_key:
        try:
            key = base64.b64decode(provided_key)
        except (ValueError, TypeError):
            return "[ENCRYPTED]"
    else:
        key = get_org_key(resource_id)

    if not key:
        return "[ENCRYPTED]"

    try:
        plaintext = AESGCM(key).decrypt(nonce, ciphertext, None)
        return plaintext.decode("utf-8")
    except Exception:
        # Deliberately broad: wrong key, bad key length, truncated data and
        # undecodable bytes all map to the same placeholder.
        return "[ENCRYPTED]"
def serialize_milvus_result(data):
    """Recursively convert a store result into plain, serialisable Python values.

    Conversion rules, tried in order:
      1. ``None`` / str / int / float / bool are returned unchanged.
      2. dicts are converted value by value.
      3. lists and tuples become lists.
      4. objects with a callable ``to_dict()`` are converted through it.
      5. other iterables (except str / bytes / dict) become lists.
      6. objects with a ``__dict__`` are converted through ``vars()``.
      7. anything else becomes ``None`` rather than a string representation.

    A step that raises falls through to the next one. Only ``Exception`` is
    caught, so KeyboardInterrupt / SystemExit still propagate.
    """
    if data is None or isinstance(data, (str, int, float, bool)):
        return data

    if isinstance(data, dict):
        return {k: serialize_milvus_result(v) for k, v in data.items()}

    if isinstance(data, (list, tuple)):
        return [serialize_milvus_result(item) for item in data]

    to_dict = getattr(data, 'to_dict', None)
    if callable(to_dict):
        try:
            return serialize_milvus_result(to_dict())
        except Exception:
            pass  # fall through to the next strategy

    # Generic iterables, e.g. protobuf repeated containers.
    if hasattr(data, '__iter__') and not isinstance(data, (str, bytes, dict)):
        try:
            return [serialize_milvus_result(item) for item in data]
        except Exception:
            pass  # fall through to the next strategy

    if hasattr(data, '__dict__'):
        try:
            return serialize_milvus_result(vars(data))
        except Exception:
            pass  # fall through to the final fallback

    # Last resort: drop the value instead of emitting an opaque string.
    return None
- # --- Models ---
class ResourceIn(BaseModel):
    """Request body of POST /api/resource."""

    id: str
    title: str = ""
    body: str
    # Sensitive part; the submit endpoint passes it through encrypt_content().
    secure_body: str = ""
    # Expected values: text | code | credential | cookie (not validated here).
    content_type: str = "text"
    metadata: dict = {}
    sort_order: int = 0
    submitted_by: str = ""
class ResourcePatchIn(BaseModel):
    """Request body of PATCH /api/resource/{id}; ``None`` means "leave unchanged"."""

    title: Optional[str] = None
    body: Optional[str] = None
    secure_body: Optional[str] = None
    content_type: Optional[str] = None
    metadata: Optional[dict] = None
- # Knowledge Models
class KnowledgeIn(BaseModel):
    """Payload describing a new knowledge entry."""

    task: str
    content: str
    types: list[str] = ["strategy"]
    tags: dict = {}
    scopes: list[str] = ["org:cybertogether"]
    owner: str = ""
    message_id: str = ""
    resource_ids: list[str] = []
    # Expected keys: name, category, urls, agent_id, submitted_by, timestamp.
    source: dict = {}
    # Expected keys: score, helpful, harmful, confidence.
    eval: dict = {}
class KnowledgeOut(BaseModel):
    """Full representation of a stored knowledge entry; every field is required."""

    id: str
    message_id: str
    types: list[str]
    task: str
    tags: dict
    scopes: list[str]
    owner: str
    content: str
    resource_ids: list[str]
    source: dict
    eval: dict
    created_at: str
    updated_at: str
class KnowledgeUpdateIn(BaseModel):
    """Feedback-style update for a knowledge entry; all fields are optional."""

    add_helpful_case: Optional[dict] = None
    add_harmful_case: Optional[dict] = None
    # When given, must lie in the inclusive range 1..5.
    update_score: Optional[int] = Field(default=None, ge=1, le=5)
    evolve_feedback: Optional[str] = None
class KnowledgePatchIn(BaseModel):
    """Request body of PATCH /api/knowledge/{id} (direct field editing)."""

    task: Optional[str] = None
    content: Optional[str] = None
    types: Optional[list[str]] = None
    tags: Optional[dict] = None
    scopes: Optional[list[str]] = None
    owner: Optional[str] = None
class MessageExtractIn(BaseModel):
    """Request body of POST /api/extract (extraction from a message history)."""

    # Conversation items shaped like {role: str, content: str}.
    messages: list[dict]
    agent_id: str = "unknown"
    # Required; used as the owner.
    submitted_by: str
    session_key: str = ""
class KnowledgeBatchUpdateIn(BaseModel):
    """Batch feedback payload; the shape of each item is not constrained here."""

    feedback_list: list[dict]
class KnowledgeVerifyIn(BaseModel):
    """Verification request for a single knowledge entry."""

    # Expected values: "approve" | "reject" (not validated by the model).
    action: str
    verified_by: str = "user"
class KnowledgeBatchVerifyIn(BaseModel):
    """Verification request covering several knowledge entries at once."""

    knowledge_ids: List[str]
    # Expected value: "approve" (not validated by the model).
    action: str
    verified_by: str
class KnowledgeSearchResponse(BaseModel):
    """Search response: the matching entries plus a count field."""

    results: list[dict]
    count: int
class ResourceNode(BaseModel):
    """Lightweight reference (id + title) to a resource, used for navigation links."""

    id: str
    title: str
class ResourceOut(BaseModel):
    """Response model of GET /api/resource/{id}: content plus navigation context."""

    id: str
    title: str
    body: str
    secure_body: str = ""
    content_type: str = "text"
    metadata: dict = {}
    # Navigation: root node, direct children, previous / next sibling.
    toc: Optional[ResourceNode] = None
    children: list[ResourceNode]
    prev: Optional[ResourceNode] = None
    next: Optional[ResourceNode] = None
- # --- Dedup: Globals & Prompt ---
# Singleton dedup processor; starts as None and is assigned by the lifespan handler.
knowledge_processor: Optional["KnowledgeProcessor"] = None

# Prompt for the dedup LLM. str.format placeholders: {new_task}, {new_content},
# {existing_list}. Literal braces of the JSON examples are doubled ({{ }}).
DEDUP_RELATION_PROMPT = """你是知识库管理专家。请判断【新知识】与【相似知识列表】中每条知识的关系。
【新知识】
Task: {new_task}
Content: {new_content}
【相似知识列表】(向量召回 top-10,按相似度排序)
{existing_list}
格式: [序号] ID: xxx | Task: xxx | Content: xxx
【关系类型定义】
- duplicate: task 和 content 语义完全相同,无新增信息 → 新知识应 rejected
- subset: task语义一致,新知识的content信息完全被某条已有知识覆盖 → 新知识应 rejected
- superset: task语义一致,新知识包含某条已有知识的全部信息,且有额外内容 → 新知识应 approved
- conflict: 同一 task 下给出相互矛盾的结论 → 新知识应 approved
- complement: 描述同一 task 的不同方面,互补 → 新知识应 approved
- none: task 语义不同,或无实质关系 → 新知识应 approved,不写入 relations
【判断步骤】
第一步:逐条比较新知识的 task 与列表中每条知识的 task 语义是否一致。
- task 语义一致 = 两者描述的是同一个问题或目标(即使措辞不同)
- task 语义不同 = 描述的是不同的问题、不同的工具、不同的场景
- 如果 task 语义不同,该条关系直接判定为 none,**不再看 content**
- 只有 task 语义一致时,才进入第二步比较 content
第二步:对 task 语义一致的知识,比较 content,判断具体关系类型(duplicate/subset/superset/conflict/complement)。
**规则**:
1. 如果以上类型无法准确描述,可自定义关系类型(英文小写下划线),并自行决定 approved/rejected
2. final_decision 为 rejected 时,relations 中必须至少有一条关系说明拒绝原因(type 不能为 none)
【输出格式】(严格 JSON,不要其他内容)
示例1 - 无关知识(task 不同):
{{
"final_decision": "approved",
"relations": []
}}
示例2 - 重复知识:
{{
"final_decision": "rejected",
"relations": [
{{
"old_id": "knowledge-xxx",
"type": "duplicate",
"reverse_type": "duplicate"
}}
]
}}
示例3 - 互补知识:
{{
"final_decision": "approved",
"relations": [
{{
"old_id": "knowledge-xxx",
"type": "complement",
"reverse_type": "complement"
}}
]
}}
"""
# Prompt for the tool-analysis LLM. str.format placeholders: {task}, {content}.
# Literal braces of the JSON template are doubled ({{ }}).
TOOL_ANALYSIS_PROMPT = """\
分析以下知识条目,判断是否涉及"图像创作或解构任务中使用的工具"。
工具范畴(包括但不限于):
- AI 生图平台/模型:Midjourney、Stable Diffusion、DALL-E、Flux、ComfyUI
- SD 插件/节点:ControlNet、IP-Adapter、InstantID、DWPose、DSINE
- 图像处理库:rembg、PIL/Pillow、OpenCV、scikit-image
- LoRA/checkpoint 模型、ComfyUI 自定义节点、AI 绘图辅助工具
知识条目:
task: {task}
content: {content}
要求:
- 如果涉及上述工具,提取每个工具的信息并以 JSON 格式返回。
- 如果不涉及任何工具,返回 {{"has_tools": false}}。
- 只输出 JSON,不要输出其他内容。
输出格式:
{{
"has_tools": true,
"tools": [
{{
"name": "工具名称(原名)",
"slug": "小写英文短名,空格换下划线,如 controlnet、ip_adapter",
"category": "image_gen | image_process | model | plugin | workflow | other",
"version": "版本号或 null",
"description": "一句话功能介绍",
"usage": "核心用法",
"scenarios": ["应用场景1", "应用场景2"],
"input": "输入类型描述或 null",
"output": "输出类型描述或 null",
"source": "来源/文档链接或 null",
"status": "未接入"
}}
]
}}
"""
- # --- Dedup: RelationCache ---
class RelationCache:
    """In-memory map from relation type to the knowledge ids seen with it."""

    def __init__(self):
        self._cache: Dict[str, List[str]] = {}

    def load(self) -> dict:
        """Return the underlying mapping itself (not a copy)."""
        return self._cache

    def save(self, cache: dict):
        """Replace the whole mapping with ``cache``."""
        self._cache = cache

    def add_relation(self, relation_type: str, knowledge_id: str):
        """Record ``knowledge_id`` under ``relation_type``, ignoring repeats."""
        members = self._cache.setdefault(relation_type, [])
        if knowledge_id not in members:
            members.append(knowledge_id)
- # --- Dedup: KnowledgeProcessor ---
class KnowledgeProcessor:
    """Moves knowledge entries through deduplication and tool analysis.

    Status transitions written by this class:
      pending      -> processing -> dedup_passed | rejected
      dedup_passed -> analyzing  -> approved | rejected
    When a step fails, the entry is put back into the status it was taken
    from so that a later run can retry it.
    """

    # Relation types that force the new entry to be rejected.
    _REJECTING_TYPES = ("duplicate", "subset")

    def __init__(self):
        # Ensures only one process_pending() run is active at a time.
        self._lock = asyncio.Lock()
        self._relation_cache = RelationCache()

    async def process_pending(self):
        """Drain the ``pending`` queue (dedup), then ``dedup_passed`` (tool analysis).

        Returns immediately when another run already holds the lock.
        """
        if self._lock.locked():
            return
        async with self._lock:
            await self._drain_queue("pending", self._process_one)
            await self._drain_queue("dedup_passed", self._analyze_tool_relation)

    async def _drain_queue(self, status: str, handler):
        """Feed entries in ``status`` to ``handler`` until no unseen entry is returned.

        Each id is handed to ``handler`` at most once per run. An entry whose
        handler fails is put back into ``status``; without this bookkeeping
        the same failing entry would be re-queried and retried forever.
        Trade-off: if a whole page of 50 consists of already-attempted
        entries, the run stops and the rest waits for the next run.
        """
        attempted = set()
        while True:
            try:
                batch = pg_store.query(f'status == "{status}"', limit=50)
            except Exception as e:
                print(f"[KnowledgeProcessor] 查询 {status} 失败: {e}")
                break
            fresh = [item for item in (batch or []) if item["id"] not in attempted]
            if not fresh:
                break
            for knowledge in fresh:
                attempted.add(knowledge["id"])
                await handler(knowledge)

    async def _process_one(self, knowledge: dict):
        """Run deduplication for one pending entry."""
        kid = knowledge["id"]
        now = int(time.time())  # timestamps are stored in seconds

        # Claim the entry: pending -> processing. Note this is a plain,
        # unconditional status update.
        try:
            pg_store.update(kid, {"status": "processing", "updated_at": now})
        except Exception as e:
            print(f"[KnowledgeProcessor] 锁定 {kid} 失败: {e}")
            return

        try:
            embedding = knowledge.get("embedding")
            if not embedding:
                embedding = await get_embedding(knowledge["task"])

            # Vector recall of the top 10 among approved / checked entries.
            candidates = pg_store.search(
                query_embedding=embedding,
                filters='(status == "approved" or status == "checked")',
                limit=10
            )
            # Drop the entry itself and anything below the 0.75 similarity
            # threshold; such candidates are treated as unrelated.
            candidates = [
                c for c in candidates
                if c["id"] != kid and c.get("score", 0) >= 0.75
            ]

            if not candidates:
                pg_store.update(kid, {"status": "dedup_passed", "updated_at": now})
                return

            llm_result = await self._llm_judge_relations(knowledge, candidates)
            await self._apply_decision(knowledge, llm_result)
        except Exception as e:
            print(f"[KnowledgeProcessor] 处理 {kid} 失败: {e},回退到 pending")
            try:
                pg_store.update(kid, {"status": "pending", "updated_at": int(time.time())})
            except Exception:
                pass  # best effort; the periodic rollback task covers stuck entries

    @staticmethod
    def _parse_judge_response(content: str) -> dict:
        """Extract and validate the JSON verdict from the dedup LLM reply.

        If the reply contains markdown code fences, the first fenced segment
        that parses to an object with a ``final_decision`` key is used.

        Raises:
            ValueError: the reply is not valid JSON, or ``final_decision`` is
                neither "approved" nor "rejected".
        """
        content = content.strip()
        if "```" in content:
            for part in content.split("```"):
                part = part.strip()
                if part.startswith("json"):
                    part = part[4:].strip()
                try:
                    parsed = json.loads(part)
                except ValueError:
                    continue
                if isinstance(parsed, dict) and "final_decision" in parsed:
                    content = part
                    break
        result = json.loads(content)
        # Explicit check instead of `assert`, which is stripped under -O.
        if not isinstance(result, dict) or result.get("final_decision") not in ("approved", "rejected"):
            raise ValueError(f"unexpected dedup verdict: {result!r}")
        return result

    async def _llm_judge_relations(self, new_knowledge: dict, candidates: list) -> dict:
        """Ask the dedup LLM how the new entry relates to each candidate.

        Tries up to 3 times with a 1 second pause between attempts. When all
        attempts fail, falls back to an "approved" verdict with no relations.
        """
        existing_list = "\n".join(
            f"[{i+1}] ID: {c['id']} | Task: {c['task']} | Content: {(c.get('content') or '')[:300]}"
            for i, c in enumerate(candidates)
        )
        prompt = DEDUP_RELATION_PROMPT.format(
            new_task=new_knowledge["task"],
            new_content=new_knowledge["content"],
            existing_list=existing_list
        )

        for attempt in range(3):
            try:
                response = await _dedup_llm(
                    messages=[{"role": "user", "content": prompt}],
                )
                return self._parse_judge_response(response.get("content") or "")
            except Exception as e:
                print(f"[LLM Judge] 第{attempt+1}次失败: {e}")
                if attempt < 2:
                    await asyncio.sleep(1)

        return {"final_decision": "approved", "relations": []}

    @staticmethod
    def _as_list(value) -> list:
        """Return ``value`` as a new list, decoding it first when it is a JSON string.

        ``relationships`` is written with ``json.dumps``; this accepts either
        a decoded list or the raw JSON text when reading it back.
        """
        if not value:
            return []
        if isinstance(value, str):
            value = json.loads(value)
        return list(value)

    async def _apply_decision(self, new_knowledge: dict, llm_result: dict):
        """Persist the LLM verdict for the new entry and update related entries."""
        kid = new_knowledge["id"]
        final_decision = llm_result.get("final_decision", "approved")
        relations = llm_result.get("relations") or []
        now = int(time.time())

        # Hard rule: any duplicate / subset relation forces a rejection.
        if any(rel.get("type") in self._REJECTING_TYPES for rel in relations):
            final_decision = "rejected"

        if final_decision == "rejected":
            self._record_rejection(kid, relations, now)
        else:
            self._record_acceptance(kid, relations, now)

    def _record_rejection(self, kid: str, relations: list, now: int):
        """Mark ``kid`` rejected, keep the reasons, and credit the entries it duplicates."""
        rejected_relationships = []
        for rel in relations:
            old_id = rel.get("old_id")
            rel_type = rel.get("type", "none")
            if not old_id or rel_type == "none":
                continue
            # Kept on the rejected entry so the reason can be traced later.
            rejected_relationships.append({"type": rel_type, "target": old_id})
            if rel_type in self._REJECTING_TYPES:
                self._credit_existing(old_id, kid, rel_type, now)

        pg_store.update(kid, {
            "status": "rejected",
            "relationships": json.dumps(rejected_relationships),
            "updated_at": now
        })

    def _credit_existing(self, old_id: str, kid: str, rel_type: str, now: int):
        """Increment the ``helpful`` counter of an existing entry (best effort)."""
        try:
            old = pg_store.get_by_id(old_id)
            if not old:
                return
            eval_data = old.get("eval") or {}
            eval_data["helpful"] = eval_data.get("helpful", 0) + 1
            helpful_history = eval_data.get("helpful_history") or []
            helpful_history.append({
                "source": "dedup",
                "related_id": kid,
                "relation_type": rel_type,
                "timestamp": now
            })
            eval_data["helpful_history"] = helpful_history
            pg_store.update(old_id, {"eval": eval_data, "updated_at": now})
        except Exception as e:
            print(f"[Apply Decision] 更新旧知识 {old_id} helpful 失败: {e}")

    def _record_acceptance(self, kid: str, relations: list, now: int):
        """Mark ``kid`` as dedup_passed and store relations in both directions."""
        new_relationships = []
        for rel in relations:
            rel_type = rel.get("type", "none")
            reverse_type = rel.get("reverse_type", "none")
            old_id = rel.get("old_id")
            if not old_id or rel_type == "none":
                continue

            new_relationships.append({"type": rel_type, "target": old_id})
            self._relation_cache.add_relation(rel_type, kid)
            self._relation_cache.add_relation(rel_type, old_id)

            if reverse_type and reverse_type != "none":
                try:
                    old = pg_store.get_by_id(old_id)
                    if old:
                        old_rels = self._as_list(old.get("relationships"))
                        old_rels.append({"type": reverse_type, "target": kid})
                        pg_store.update(old_id, {"relationships": json.dumps(old_rels), "updated_at": now})
                        self._relation_cache.add_relation(reverse_type, old_id)
                        self._relation_cache.add_relation(reverse_type, kid)
                except Exception as e:
                    print(f"[Apply Decision] 更新旧知识关系 {old_id} 失败: {e}")

        pg_store.update(kid, {
            "status": "dedup_passed",
            "relationships": json.dumps(new_relationships),
            "updated_at": now
        })

    @staticmethod
    def _strip_code_fence(raw: str) -> str:
        """Return the text inside markdown fences when ``raw`` starts with one."""
        if not raw.startswith("```"):
            return raw
        inner = []
        in_block = False
        for line in raw.split("\n"):
            if line.startswith("```"):
                in_block = not in_block
                continue
            if in_block:
                inner.append(line)
        return "\n".join(inner).strip()

    async def _llm_analyze_tools(self, knowledge: dict) -> dict:
        """Ask the tool-analysis LLM which tools the entry involves.

        ``task`` is truncated to 600 characters and ``content`` to 1200
        before being put into the prompt. Any failure is logged and re-raised.
        """
        task = (knowledge.get("task") or "")[:600]
        content = (knowledge.get("content") or "")[:1200]
        prompt = TOOL_ANALYSIS_PROMPT.format(task=task, content=content)
        try:
            response = await _tool_analysis_llm(
                messages=[{"role": "user", "content": prompt}],
                max_tokens=2048,
                temperature=0.1,
            )
            raw = (response.get("content") or "").strip()
            return json.loads(self._strip_code_fence(raw))
        except Exception as e:
            print(f"[Tool Analysis LLM] 调用失败: {e}")
            raise

    async def _create_or_get_tool_resource(self, tool_info: dict) -> Optional[str]:
        """Ensure a ``tool_table`` row exists for the tool and return its id.

        The id has the form ``tools/{category}/{slug}``. Returns ``None``
        when the tool has no slug.
        """
        category = tool_info.get("category") or "other"
        slug = tool_info.get("slug", "")
        if not slug:
            return None
        tool_id = f"tools/{category}/{slug}"

        now_ts = int(time.time())
        cursor = pg_store._get_cursor()
        try:
            cursor.execute("SELECT id FROM tool_table WHERE id = %s", (tool_id,))
            if cursor.fetchone():
                return tool_id
            cursor.execute("""
                INSERT INTO tool_table (id, name, version, introduction, tutorial, input, output,
                    updated_time, status, knowledge, case_knowledge, process_knowledge)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, (
                tool_id,
                tool_info.get("name", slug),
                tool_info.get("version") or None,
                tool_info.get("description", ""),
                tool_info.get("usage", ""),
                json.dumps(tool_info.get("input", "")),
                json.dumps(tool_info.get("output", "")),
                now_ts,
                tool_info.get("status", "未接入"),
                json.dumps([]),
                json.dumps([]),
                json.dumps([]),
            ))
            pg_store.conn.commit()
            print(f"[Tool Resource] 创建新工具: {tool_id}")
            return tool_id
        except Exception:
            # NOTE(review): assumes pg_store.conn is a DB-API connection; roll
            # back so a failed statement does not poison the shared connection.
            pg_store.conn.rollback()
            raise
        finally:
            cursor.close()

    async def _update_tool_knowledge_index(self, tool_id: str, knowledge_id: str):
        """Add ``knowledge_id`` to the ``knowledge`` list of the tool row, if missing."""
        now_ts = int(time.time())
        cursor = pg_store._get_cursor()
        try:
            cursor.execute("SELECT knowledge FROM tool_table WHERE id = %s", (tool_id,))
            row = cursor.fetchone()
            if not row:
                return
            stored = row["knowledge"]
            # The column may come back decoded (list) or as JSON text.
            knowledge_ids = stored if isinstance(stored, list) else json.loads(stored or "[]")
            if knowledge_id not in knowledge_ids:
                knowledge_ids.append(knowledge_id)
                cursor.execute(
                    "UPDATE tool_table SET knowledge = %s, updated_time = %s WHERE id = %s",
                    (json.dumps(knowledge_ids), now_ts, tool_id)
                )
                pg_store.conn.commit()
        except Exception:
            # Same reasoning as in _create_or_get_tool_resource.
            pg_store.conn.rollback()
            raise
        finally:
            cursor.close()

    async def _analyze_tool_relation(self, knowledge: dict):
        """Run tool analysis for one dedup_passed entry and link the tools found."""
        kid = knowledge["id"]
        now = int(time.time())

        # Claim the entry: dedup_passed -> analyzing (plain status update).
        try:
            pg_store.update(kid, {"status": "analyzing", "updated_at": now})
        except Exception as e:
            print(f"[Tool Analysis] 锁定 {kid} 失败: {e}")
            return

        try:
            tool_analysis = await self._llm_analyze_tools(knowledge)
            has_tools = bool(tool_analysis and tool_analysis.get("has_tools"))

            existing_tags = knowledge.get("tags") or {}
            has_tool_tag = existing_tags.get("tool") is True

            # Case 1: the LLM sees no tool but the entry is tagged as one ->
            # ask once more before deciding.
            if not has_tools and has_tool_tag:
                print(f"[Tool Analysis] {kid} LLM 判定无工具但有 tool tag,重新分析")
                tool_analysis = await self._llm_analyze_tools(knowledge)
                has_tools = bool(tool_analysis and tool_analysis.get("has_tools"))
                # Still no tool: tag and analysis disagree, reject the entry.
                if not has_tools:
                    pg_store.update(kid, {"status": "rejected", "updated_at": now})
                    print(f"[Tool Analysis] {kid} 两次判定不一致,知识模糊,rejected")
                    return

            # Case 2: no tool and no tool tag -> approve as is.
            if not has_tools:
                pg_store.update(kid, {"status": "approved", "updated_at": now})
                return

            # Case 3/4: tools found -> create the tool rows and link them.
            tool_ids = []
            for tool_info in (tool_analysis.get("tools") or []):
                tool_id = await self._create_or_get_tool_resource(tool_info)
                if tool_id:
                    tool_ids.append(tool_id)

            existing_resource_ids = knowledge.get("resource_ids") or []
            # De-duplicate while keeping first-seen order (a set would
            # reorder the ids arbitrarily).
            updated_resource_ids = list(dict.fromkeys(existing_resource_ids + tool_ids))

            updates: dict = {
                "status": "approved",
                "resource_ids": updated_resource_ids,
                "updated_at": now
            }

            # Tools found but no tool tag yet -> add it.
            if not has_tool_tag:
                updated_tags = dict(existing_tags)
                updated_tags["tool"] = True
                updates["tags"] = updated_tags
                print(f"[Tool Analysis] {kid} 添加 tool tag")

            pg_store.update(kid, updates)

            for tool_id in tool_ids:
                await self._update_tool_knowledge_index(tool_id, kid)

            print(f"[Tool Analysis] {kid} 关联了 {len(tool_ids)} 个工具")
        except Exception as e:
            print(f"[Tool Analysis] {kid} 分析失败: {e},回退到 dedup_passed")
            try:
                pg_store.update(kid, {"status": "dedup_passed", "updated_at": int(time.time())})
            except Exception:
                pass  # best effort; the periodic rollback task covers stuck entries
async def _periodic_processor():
    """Every 60 seconds, roll back entries stuck in an intermediate status.

    ``processing`` entries not updated for more than 5 minutes go back to
    ``pending``; ``analyzing`` entries not updated for more than 10 minutes
    go back to ``dedup_passed``. Errors are logged and the loop continues.
    """
    # (stuck status, allowed age in seconds, status to restore)
    rollback_rules = (
        ("processing", 300, "pending"),
        ("analyzing", 600, "dedup_passed"),
    )
    while True:
        await asyncio.sleep(60)
        try:
            now = int(time.time())
            for stuck_status, max_age, restore_status in rollback_rules:
                deadline = now - max_age
                for item in pg_store.query(f'status == "{stuck_status}"', limit=200):
                    stamp = item.get("updated_at", 0) or 0
                    # Values this large are taken to be milliseconds and are
                    # reduced to seconds before comparing.
                    if stamp > 1_000_000_000_000:
                        stamp = stamp // 1000
                    if stamp < deadline:
                        print(f"[Periodic] 回滚超时 {stuck_status} → {restore_status}: {item['id']}")
                        pg_store.update(item["id"], {"status": restore_status, "updated_at": int(time.time())})
        except Exception as e:
            print(f"[Periodic] 定时任务错误: {e}")
- # --- App ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: open the stores and start the periodic task, undo both on exit.

    Cleanup runs in ``finally`` so the background task is cancelled and both
    stores are closed even when the context is left through an exception or
    a cancellation.
    """
    global pg_store, pg_resource_store, knowledge_processor

    # PostgreSQL-backed stores for knowledge and resources.
    pg_store = PostgreSQLStore()
    pg_resource_store = PostgreSQLResourceStore()

    # Dedup processor plus the periodic fallback task.
    knowledge_processor = KnowledgeProcessor()
    periodic_task = asyncio.create_task(_periodic_processor())

    try:
        yield
    finally:
        periodic_task.cancel()
        try:
            await periodic_task
        except asyncio.CancelledError:
            pass
        # Close the second store even if closing the first one fails.
        try:
            pg_store.close()
        finally:
            pg_resource_store.close()
app = FastAPI(title=BRAND_NAME, lifespan=lifespan)

# Serve the bundled static files only when the directory exists next to this module.
STATIC_DIR = Path(__file__).parent / "static"
if STATIC_DIR.exists():
    static_files = StaticFiles(directory=str(STATIC_DIR))
    app.mount("/static", static_files, name="static")
- # --- Resource API ---
@app.post("/api/resource", status_code=201)
def submit_resource(resource: ResourceIn):
    """Create or overwrite a resource; ``secure_body`` is encrypted before storage.

    Any failure is reported as HTTP 500 with the error text as detail.
    """
    try:
        record = {
            'id': resource.id,
            'title': resource.title,
            'body': resource.body,
            # Sensitive content goes through encrypt_content() first.
            'secure_body': encrypt_content(resource.id, resource.secure_body),
            'content_type': resource.content_type,
            'metadata': resource.metadata,
            'sort_order': resource.sort_order,
            'submitted_by': resource.submitted_by
        }
        pg_resource_store.insert_or_update(record)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"status": "ok", "id": resource.id}
@app.get("/api/resource/{resource_id:path}", response_model=ResourceOut)
def get_resource(resource_id: str, x_org_key: Optional[str] = Header(None)):
    """Return one resource together with its navigation context.

    ``secure_body`` is decrypted with the ``X-Org-Key`` header when given,
    otherwise with the configured organisation key. Responds 404 when the
    resource does not exist and 500 on any other error.
    """
    try:
        row = pg_resource_store.get_by_id(resource_id)
        if not row:
            raise HTTPException(status_code=404, detail=f"Resource not found: {resource_id}")

        decrypted_body = decrypt_content(resource_id, row.get("secure_body", ""), x_org_key)

        # TOC: the root node, only reported for nested ids.
        toc_node = None
        if "/" in resource_id:
            root_row = pg_resource_store.get_by_id(resource_id.split("/")[0])
            if root_row:
                toc_node = ResourceNode(id=root_row["id"], title=root_row["title"])

        # Children: ids under this prefix that sit exactly one level deeper.
        child_depth = resource_id.count("/") + 1
        descendants = pg_resource_store.list_resources(prefix=f"{resource_id}/", limit=1000)
        child_nodes = [
            ResourceNode(id=r["id"], title=r["title"])
            for r in descendants
            if r["id"].count("/") == child_depth
        ]

        # Previous / next siblings as reported by the store.
        prev_row, next_row = pg_resource_store.get_siblings(resource_id)
        prev_node = ResourceNode(id=prev_row["id"], title=prev_row["title"]) if prev_row else None
        next_node = ResourceNode(id=next_row["id"], title=next_row["title"]) if next_row else None

        return ResourceOut(
            id=row["id"],
            title=row["title"],
            body=row["body"],
            secure_body=decrypted_body,
            content_type=row["content_type"],
            metadata=row.get("metadata", {}),
            toc=toc_node,
            children=child_nodes,
            prev=prev_node,
            next=next_node,
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.patch("/api/resource/{resource_id:path}")
def patch_resource(resource_id: str, patch: ResourcePatchIn):
    """Update the fields of a resource that are present (not ``None``) in ``patch``.

    Responds 404 when the resource does not exist and 500 on any other
    error. A patch with no fields returns "ok" without touching the store.
    """
    try:
        if not pg_resource_store.get_by_id(resource_id):
            raise HTTPException(status_code=404, detail=f"Resource not found: {resource_id}")

        updates = {}
        for field_name in ('title', 'body', 'secure_body', 'content_type', 'metadata'):
            value = getattr(patch, field_name)
            if value is None:
                continue
            if field_name == 'secure_body':
                # Sensitive content is encrypted before it is stored.
                value = encrypt_content(resource_id, value)
            updates[field_name] = value

        if not updates:
            return {"status": "ok", "message": "No fields to update"}

        pg_resource_store.update(resource_id, updates)
        return {"status": "ok", "id": resource_id}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/resource")
def list_resources(
    content_type: Optional[str] = Query(None),
    limit: int = Query(100, ge=1, le=1000)
):
    """List resources from PostgreSQL, optionally filtered by content type.

    Returns:
        A dict with the rows under ``results`` and their number under ``count``.

    Raises:
        HTTPException: 500 when the store call fails.
    """
    try:
        rows = pg_resource_store.list_resources(content_type=content_type, limit=limit)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"results": rows, "count": len(rows)}
@app.delete("/api/resource/{resource_id:path}")
def delete_resource(resource_id: str):
    """Delete a single resource from PostgreSQL.

    Raises:
        HTTPException: 404 when the resource does not exist, 500 for any
            other failure.
    """
    try:
        found = pg_resource_store.get_by_id(resource_id)
        if found:
            pg_resource_store.delete(resource_id)
            return {"status": "ok", "id": resource_id}
        raise HTTPException(status_code=404, detail=f"Resource not found: {resource_id}")
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
- # --- Knowledge API ---
- # ===== Knowledge API =====
- async def _llm_rerank(query: str, candidates: list[dict], top_k: int) -> list[str]:
- """
- 使用 LLM 对候选知识进行精排
- Args:
- query: 查询文本
- candidates: 候选知识列表
- top_k: 返回数量
- Returns:
- 排序后的知识 ID 列表
- """
- if not candidates:
- return []
- # 构造 prompt
- candidates_text = "\n".join([
- f"[{i+1}] ID: {c['id']}\nTask: {c['task']}\nContent: {c['content'][:200]}..."
- for i, c in enumerate(candidates)
- ])
- prompt = f"""你是知识检索专家。根据用户查询,从候选知识中选出最相关的 {top_k} 条。
- 用户查询:"{query}"
- 候选知识:
- {candidates_text}
- 请输出最相关的 {top_k} 个知识 ID,按相关性从高到低排序,用逗号分隔。
- 只输出 ID,不要其他内容。"""
- try:
- response = await _dedup_llm(
- messages=[{"role": "user", "content": prompt}],
- )
- content = response.get("content", "").strip()
- # 解析 ID 列表
- selected_ids = [
- idx.strip()
- for idx in re.split(r'[,\s]+', content)
- if idx.strip().startswith(("knowledge-", "research-"))
- ]
- return selected_ids[:top_k]
- except Exception as e:
- print(f"[LLM Rerank] 失败: {e}")
- return []
@app.get("/api/knowledge/search")
async def search_knowledge_api(
    q: str = Query(..., description="查询文本"),
    top_k: int = Query(default=5, ge=1, le=20),
    min_score: int = Query(default=3, ge=1, le=5),
    types: Optional[str] = None,
    owner: Optional[str] = None
):
    """Search knowledge: vector recall followed by LLM re-ranking.

    Args:
        q: Query text.
        top_k: Number of results to return.
        min_score: Minimum ``eval["score"]`` a knowledge entry must have.
        types: Comma-separated types; an entry must contain all of them.
        owner: Comma-separated owners; an entry must match any of them.

    Returns:
        A dict with ``results``, ``count`` and ``reranked`` (``False`` when
        the vector order was used as a fallback).

    Raises:
        HTTPException: 400 when a filter value contains a quote or backslash,
            500 for any other failure.
    """
    def _csv_values(raw: str, field: str) -> list:
        # Values are interpolated into a quoted filter expression, so quotes
        # and backslashes are rejected rather than allowed to rewrite it.
        values = [part.strip() for part in raw.split(',') if part.strip()]
        for value in values:
            if '"' in value or '\\' in value:
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid character in {field} filter: {value!r}",
                )
        return values

    try:
        # 1. Build the filter expression (validated before any embedding call).
        filters = []
        if types:
            for t in _csv_values(types, "types"):
                filters.append(f'array_contains(types, "{t}")')
        if owner:
            owner_list = _csv_values(owner, "owner")
            if len(owner_list) == 1:
                filters.append(f'owner == "{owner_list[0]}"')
            elif owner_list:
                # Several owners are OR-ed; an empty list adds no clause at all
                # (previously it produced an invalid "()" expression).
                owner_filters = [f'owner == "{o}"' for o in owner_list]
                filters.append(f'({" or ".join(owner_filters)})')
        # Minimum score filter.
        filters.append(f'eval["score"] >= {min_score}')
        # Only approved and checked knowledge is searchable.
        filters.append('(status == "approved" or status == "checked")')
        filter_expr = ' and '.join(filters)

        # 2. Embed the query.
        query_embedding = await get_embedding(q)

        # 3. Vector recall (3 * k candidates).
        recall_limit = top_k * 3
        candidates = pg_store.search(
            query_embedding=query_embedding,
            filters=filter_expr,
            limit=recall_limit
        )
        if not candidates:
            return {"results": [], "count": 0, "reranked": False}

        # Convert to a serializable form.
        serialized_candidates = [serialize_milvus_result(c) for c in candidates]

        # 4. LLM re-ranking.
        reranked_ids = await _llm_rerank(q, serialized_candidates, top_k)
        id_to_candidate = {c["id"]: c for c in serialized_candidates}
        results = []
        seen = set()
        for kid in reranked_ids:
            # Skip unknown ids and repeats so each candidate appears once.
            if kid in id_to_candidate and kid not in seen:
                seen.add(kid)
                results.append(id_to_candidate[kid])
        if results:
            return {"results": results, "count": len(results), "reranked": True}

        # Fallback: return the vector-recall top k unchanged.
        print(f"[Knowledge Search] LLM 精排失败,fallback 到向量 top-{top_k}")
        top_candidates = serialized_candidates[:top_k]
        return {"results": top_candidates, "count": len(top_candidates), "reranked": False}
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Knowledge Search] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/knowledge", status_code=201)
async def save_knowledge(knowledge: KnowledgeIn, background_tasks: BackgroundTasks):
    """Store a new knowledge entry with status ``pending``.

    The entry is embedded from its ``task`` text only, inserted through
    ``pg_store`` and then handed to the background dedup processor.

    Returns:
        A dict with the ``pending`` status, the generated ``knowledge_id`` and
        a message.

    Raises:
        HTTPException: 500 when anything fails.
    """
    try:
        # Id = local timestamp plus a short random suffix.
        stamp = datetime.now().strftime('%Y%m%d-%H%M%S')
        suffix = uuid.uuid4().hex[:4]
        knowledge_id = f"knowledge-{stamp}-{suffix}"
        now = int(time.time())

        submitted_source = knowledge.source
        submitted_eval = knowledge.eval

        # Default owner is derived from the submitting agent.
        owner = knowledge.owner or f"agent:{submitted_source.get('agent_id', 'unknown')}"

        # Normalised source record.
        source = {
            "name": submitted_source.get("name", ""),
            "category": submitted_source.get("category", ""),
            "urls": submitted_source.get("urls", []),
            "agent_id": submitted_source.get("agent_id", "unknown"),
            "submitted_by": submitted_source.get("submitted_by", ""),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "message_id": knowledge.message_id
        }

        # Evaluation record with defaults and empty histories.
        eval_data = {
            "score": submitted_eval.get("score", 3),
            "helpful": submitted_eval.get("helpful", 1),
            "harmful": submitted_eval.get("harmful", 0),
            "confidence": submitted_eval.get("confidence", 0.5),
            "helpful_history": [],
            "harmful_history": []
        }

        # Embedding is based on the task only, because searches describe a task.
        embedding = await get_embedding(knowledge.task)

        # Tag keys are stored separately for filtering.
        if isinstance(knowledge.tags, dict):
            tag_keys = list(knowledge.tags.keys())
        else:
            tag_keys = []

        insert_data = {
            "id": knowledge_id,
            "embedding": embedding,
            "message_id": knowledge.message_id,
            "task": knowledge.task,
            "content": knowledge.content,
            "types": knowledge.types,
            "tags": knowledge.tags,
            "tag_keys": tag_keys,
            "scopes": knowledge.scopes,
            "owner": owner,
            "resource_ids": knowledge.resource_ids,
            "source": source,
            "eval": eval_data,
            "created_at": now,
            "updated_at": now,
            "status": "pending",
            "relationships": json.dumps([]),
        }

        # Log everything except the embedding vector.
        loggable = {k: v for k, v in insert_data.items() if k != 'embedding'}
        print(f"[Save Knowledge] 插入数据: {json.dumps(loggable, ensure_ascii=False)}")

        pg_store.insert(insert_data)

        # Kick off dedup processing in the background.
        background_tasks.add_task(knowledge_processor.process_pending)
        return {"status": "pending", "knowledge_id": knowledge_id, "message": "知识已入队,正在处理去重..."}
    except Exception as e:
        print(f"[Save Knowledge] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/knowledge")
def list_knowledge(
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=200, ge=1, le=500),
    types: Optional[str] = None,
    scopes: Optional[str] = None,
    owner: Optional[str] = None,
    tags: Optional[str] = None,
    status: Optional[str] = None
):
    """List knowledge with server-side filtering and pagination.

    Args:
        page: 1-based page number.
        page_size: Entries per page.
        types: Comma-separated types; an entry must contain all of them.
        scopes: A single scope the entry must contain.
        owner: Comma-separated owners; an entry must match any of them.
        tags: Comma-separated tag keys; an entry must contain all of them.
        status: Comma-separated statuses; defaults to ``approved,checked``.

    Returns:
        A dict with the page ``results`` and a ``pagination`` summary.

    Raises:
        HTTPException: 400 when a filter value contains a quote or backslash,
            500 for any other failure.
    """
    def _checked(value: str, field: str) -> str:
        # Values are interpolated into a quoted filter expression, so quotes
        # and backslashes are rejected rather than allowed to rewrite it.
        if '"' in value or '\\' in value:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid character in {field} filter: {value!r}",
            )
        return value

    def _csv_values(raw: str, field: str) -> list:
        return [_checked(part.strip(), field) for part in raw.split(',') if part.strip()]

    try:
        filters = []

        # types: AND-ed, the entry must contain every selected type.
        if types:
            for t in _csv_values(types, "types"):
                filters.append(f'array_contains(types, "{t}")')

        if scopes:
            filters.append(f'array_contains(scopes, "{_checked(scopes, "scopes")}")')

        if owner:
            owner_list = _csv_values(owner, "owner")
            if len(owner_list) == 1:
                filters.append(f'owner == "{owner_list[0]}"')
            elif owner_list:
                # Several owners are OR-ed; an empty list adds no clause
                # (previously it produced an invalid "()" expression).
                owner_filters = [f'owner == "{o}"' for o in owner_list]
                filters.append(f'({" or ".join(owner_filters)})')

        # tags: AND-ed, matched against the tag_keys array.
        if tags:
            for t in _csv_values(tags, "tags"):
                filters.append(f'array_contains(tag_keys, "{t}")')

        # Restrict to the requested statuses; a missing or effectively empty
        # value (e.g. ",") falls back to approved + checked.
        default_status = "approved,checked"
        status_list = _csv_values(status or default_status, "status")
        if not status_list:
            status_list = default_status.split(',')
        status_conditions = ' or '.join([f'status == "{s}"' for s in status_list])
        filters.append(f'({status_conditions})')

        # The status clause is always present, so the list is never empty.
        filter_expr = ' and '.join(filters)

        # Fetch every matching row (capped) and paginate in memory.
        max_limit = 10000
        results = pg_store.query(filter_expr, limit=max_limit)

        serialized_results = [serialize_milvus_result(r) for r in results]

        # Newest first.
        serialized_results.sort(key=lambda x: x.get('created_at', 0), reverse=True)

        total = len(serialized_results)
        total_pages = (total + page_size - 1) // page_size  # ceiling division
        start_idx = (page - 1) * page_size
        page_results = serialized_results[start_idx:start_idx + page_size]

        return {
            "results": page_results,
            "pagination": {
                "page": page,
                "page_size": page_size,
                "total": total,
                "total_pages": total_pages
            }
        }
    except HTTPException:
        raise
    except Exception as e:
        print(f"[List Knowledge] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/knowledge/meta/tags")
def get_all_tags():
    """Return the sorted set of tag keys used by stored knowledge.

    Reads up to 10000 entries and collects the keys of every ``tags`` dict.

    Raises:
        HTTPException: 500 when the query fails.
    """
    try:
        tag_names = set()
        for raw_item in pg_store.query('id != ""', limit=10000):
            # Work on the plain-dict form of the row.
            tags_dict = serialize_milvus_result(raw_item).get("tags", {})
            if isinstance(tags_dict, dict):
                tag_names.update(tags_dict)
        return {"tags": sorted(tag_names)}
    except Exception as e:
        print(f"[Get Tags] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/knowledge/pending")
def get_pending_knowledge(limit: int = Query(default=50, ge=1, le=200)):
    """Return the processing queue: pending, processing, dedup_passed and analyzing.

    Raises:
        HTTPException: 500 when the query fails.
    """
    queue_filter = 'status == "pending" or status == "processing" or status == "dedup_passed" or status == "analyzing"'
    try:
        rows = pg_store.query(queue_filter, limit=limit)
        serialized = []
        for row in rows:
            serialized.append(serialize_milvus_result(row))
        return {"results": serialized, "count": len(serialized)}
    except Exception as e:
        print(f"[Pending] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# Strong references to manually triggered processing tasks. The event loop
# only keeps weak references, so an unreferenced task may be garbage-collected
# before it finishes.
_manual_process_tasks: set = set()


@app.post("/api/knowledge/process")
async def trigger_process(force: bool = Query(default=False)):
    """Manually trigger dedup processing.

    Args:
        force: When true, first roll back every ``processing`` entry to
            ``pending`` and every ``analyzing`` entry to ``dedup_passed``
            (up to 200 of each).

    Returns:
        A status dict; processing itself continues in the background.

    Raises:
        HTTPException: 500 when the rollback or scheduling fails.
    """
    try:
        if force:
            for stuck_status, restored_status in (
                ("processing", "pending"),
                ("analyzing", "dedup_passed"),
            ):
                stuck_items = pg_store.query(f'status == "{stuck_status}"', limit=200)
                for item in stuck_items:
                    pg_store.update(
                        item["id"],
                        {"status": restored_status, "updated_at": int(time.time())},
                    )
                print(f"[Manual Process] 回滚 {len(stuck_items)} 条 {stuck_status} → {restored_status}")

        # Keep a reference until the task completes, then drop it.
        task = asyncio.create_task(knowledge_processor.process_pending())
        _manual_process_tasks.add(task)
        task.add_done_callback(_manual_process_tasks.discard)
        return {"status": "ok", "message": "处理任务已触发"}
    except Exception as e:
        print(f"[Manual Process] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/knowledge/migrate")
async def migrate_knowledge_schema():
    """Schema migration endpoint; always answers with a fixed no-op message."""
    response = {"status": "ok", "message": "PostgreSQL不需要schema迁移"}
    return response
@app.get("/api/knowledge/status/{knowledge_id}")
def get_knowledge_status(knowledge_id: str):
    """Return the processing status and relationships of one knowledge entry.

    Raises:
        HTTPException: 404 when the entry does not exist, 500 for any other
            failure.
    """
    try:
        record = pg_store.get_by_id(knowledge_id)
        if not record:
            raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")

        info = serialize_milvus_result(record)
        # Missing status/relationships fall back to "approved" / [].
        summary = {"id": knowledge_id}
        summary["status"] = info.get("status", "approved")
        summary["relationships"] = info.get("relationships", [])
        summary["created_at"] = info.get("created_at")
        summary["updated_at"] = info.get("updated_at")
        return summary
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Knowledge Status] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/knowledge/{knowledge_id}")
def get_knowledge(knowledge_id: str):
    """Return a single knowledge entry in serialized form.

    Raises:
        HTTPException: 404 when the entry does not exist, 500 for any other
            failure.
    """
    try:
        record = pg_store.get_by_id(knowledge_id)
        if record:
            return serialize_milvus_result(record)
        raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Get Knowledge] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
- async def _evolve_knowledge_with_llm(old_content: str, feedback: str) -> str:
- """使用 LLM 进行知识进化重写"""
- prompt = f"""你是一个 AI Agent 知识库管理员。请根据反馈建议,对现有的知识内容进行重写进化。
- 【原知识内容】:
- {old_content}
- 【实战反馈建议】:
- {feedback}
- 【重写要求】:
- 1. 融合知识:将反馈中的避坑指南、新参数或修正后的选择逻辑融入原知识,使其更具通用性和准确性。
- 2. 保持结构:如果原内容有特定格式(如 Markdown、代码示例等),请保持该格式。
- 3. 语言:简洁直接,使用中文。
- 4. 禁止:严禁输出任何开场白、解释语或额外的 Markdown 标题,直接返回重写后的正文。
- """
- try:
- response = await _dedup_llm(
- messages=[{"role": "user", "content": prompt}],
- )
- evolved = response.get("content", "").strip()
- if len(evolved) < 5:
- raise ValueError("LLM output too short")
- return evolved
- except Exception as e:
- print(f"知识进化失败,采用追加模式回退: {e}")
- return f"{old_content}\n\n---\n[Update {datetime.now().strftime('%Y-%m-%d')}]: {feedback}"
@app.put("/api/knowledge/{knowledge_id}")
async def update_knowledge(knowledge_id: str, update: KnowledgeUpdateIn):
    """Update the evaluation of a knowledge entry, optionally evolving its content.

    Args:
        knowledge_id: Id of the entry to update.
        update: May set a new score, add a helpful/harmful case and/or supply
            ``evolve_feedback`` to rewrite the content via the LLM.

    Returns:
        A status dict with the ``knowledge_id``.

    Raises:
        HTTPException: 404 when the entry does not exist, 500 for any other
            failure.
    """
    try:
        existing = pg_store.get_by_id(knowledge_id)
        if not existing:
            raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")

        # A stored eval of None is treated as empty, matching batch_verify.
        eval_data = existing.get("eval") or {}

        # Score.
        if update.update_score is not None:
            eval_data["score"] = update.update_score

        # Helpful case.
        if update.add_helpful_case:
            eval_data["helpful"] = eval_data.get("helpful", 0) + 1
            eval_data.setdefault("helpful_history", []).append(update.add_helpful_case)

        # Harmful case.
        if update.add_harmful_case:
            eval_data["harmful"] = eval_data.get("harmful", 0) + 1
            eval_data.setdefault("harmful_history", []).append(update.add_harmful_case)

        # Knowledge evolution counts as one more helpful use.
        content = existing["content"]
        need_reembed = False
        if update.evolve_feedback:
            content = await _evolve_knowledge_with_llm(content, update.evolve_feedback)
            eval_data["helpful"] = eval_data.get("helpful", 0) + 1
            need_reembed = True

        updates = {
            "content": content,
            "eval": eval_data,
        }

        # The embedding is regenerated after an evolution; it is computed
        # from the task text.
        if need_reembed:
            updates["embedding"] = await get_embedding(existing['task'])

        pg_store.update(knowledge_id, updates)
        return {"status": "ok", "knowledge_id": knowledge_id}
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Update Knowledge] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.patch("/api/knowledge/{knowledge_id}")
async def patch_knowledge(knowledge_id: str, patch: KnowledgePatchIn):
    """Edit fields of a knowledge entry directly.

    Only fields that are not ``None`` on ``patch`` are written. Changing
    ``task`` regenerates the embedding; changing ``tags`` also rewrites
    ``tag_keys``.

    Raises:
        HTTPException: 404 when the entry does not exist, 500 for any other
            failure.
    """
    try:
        existing = pg_store.get_by_id(knowledge_id)
        if not existing:
            raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")

        updates = {}
        task_changed = patch.task is not None
        if task_changed:
            updates["task"] = patch.task
        # A content change alone needs no new embedding (it is task-based).
        if patch.content is not None:
            updates["content"] = patch.content
        if patch.types is not None:
            updates["types"] = patch.types
        if patch.tags is not None:
            updates["tags"] = patch.tags
            # Keep tag_keys in sync with the new tags.
            if isinstance(patch.tags, dict):
                updates["tag_keys"] = list(patch.tags.keys())
            else:
                updates["tag_keys"] = []
        if patch.scopes is not None:
            updates["scopes"] = patch.scopes
        if patch.owner is not None:
            updates["owner"] = patch.owner

        if updates:
            # Re-embed when the task text changed.
            if task_changed:
                task_text = updates.get("task", existing["task"])
                updates["embedding"] = await get_embedding(task_text)
            pg_store.update(knowledge_id, updates)

        return {"status": "ok", "knowledge_id": knowledge_id}
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Patch Knowledge] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.delete("/api/knowledge/{knowledge_id}")
def delete_knowledge(knowledge_id: str):
    """Delete a single knowledge entry.

    Raises:
        HTTPException: 404 when the entry does not exist, 500 for any other
            failure.
    """
    try:
        # Only existing entries can be deleted.
        if not pg_store.get_by_id(knowledge_id):
            raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")

        pg_store.delete(knowledge_id)
        print(f"[Delete Knowledge] 已删除知识: {knowledge_id}")
        outcome = {"status": "ok", "knowledge_id": knowledge_id}
        return outcome
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Delete Knowledge] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/knowledge/batch_delete")
def batch_delete_knowledge(knowledge_ids: List[str] = Body(...)):
    """Delete several knowledge entries in one statement.

    Args:
        knowledge_ids: Ids to delete; must not be empty.

    Returns:
        A status dict with ``deleted_count`` (the cursor's row count).

    Raises:
        HTTPException: 400 when ``knowledge_ids`` is empty, 500 for any other
            failure.
    """
    try:
        if not knowledge_ids:
            raise HTTPException(status_code=400, detail="knowledge_ids cannot be empty")

        cursor = pg_store._get_cursor()
        try:
            cursor.execute(
                "DELETE FROM knowledge WHERE id = ANY(%s)",
                (knowledge_ids,)
            )
            pg_store.conn.commit()
            deleted_count = cursor.rowcount
        except Exception:
            # Roll back so a failed statement does not leave the connection
            # stuck in an aborted transaction for later requests.
            pg_store.conn.rollback()
            raise
        finally:
            cursor.close()

        print(f"[Batch Delete] 已删除 {deleted_count} 条知识")
        return {"status": "ok", "deleted_count": deleted_count}
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Batch Delete] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/knowledge/batch_verify")
async def batch_verify_knowledge(batch: KnowledgeBatchVerifyIn):
    """Mark several knowledge entries as verified (status ``checked``).

    Ids that do not exist are skipped silently.

    Returns:
        A status dict with the number of ``updated`` entries.

    Raises:
        HTTPException: 500 when a lookup or update fails.
    """
    if not batch.knowledge_ids:
        return {"status": "ok", "updated": 0}
    try:
        verified_at = datetime.now(timezone.utc).isoformat()
        updated_count = 0
        for knowledge_id in batch.knowledge_ids:
            record = pg_store.get_by_id(knowledge_id)
            if record:
                eval_data = record.get("eval") or {}
                # Overwrite any earlier verification record.
                eval_data["verification"] = {
                    "status": "checked",
                    "verified_by": batch.verified_by,
                    "verified_at": verified_at,
                    "note": None,
                    "issue_type": None,
                    "issue_action": None,
                }
                changes = {"eval": eval_data, "status": "checked", "updated_at": int(time.time())}
                pg_store.update(knowledge_id, changes)
                updated_count += 1
        return {"status": "ok", "updated": updated_count}
    except Exception as e:
        print(f"[Batch Verify] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/knowledge/{knowledge_id}/verify")
async def verify_knowledge(knowledge_id: str, verify: KnowledgeVerifyIn):
    """Verify a knowledge entry.

    ``approve`` toggles the status: ``checked`` becomes ``approved`` and any
    other status becomes ``checked``. ``reject`` sets the status to
    ``rejected``.

    Raises:
        HTTPException: 404 when the entry does not exist, 400 for an unknown
            action, 500 for any other failure.
    """
    try:
        existing = pg_store.get_by_id(knowledge_id)
        if not existing:
            raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")

        action = verify.action
        if action not in ("approve", "reject"):
            raise HTTPException(status_code=400, detail=f"Unknown action: {verify.action}")

        if action == "reject":
            new_status = "rejected"
            message = "已拒绝"
        elif existing.get("status", "approved") == "checked":
            # Approving an already checked entry revokes the verification.
            new_status = "approved"
            message = "已取消验证"
        else:
            new_status = "checked"
            message = "验证通过"

        pg_store.update(knowledge_id, {
            "status": new_status,
            "updated_at": int(time.time())
        })
        return {"status": "ok", "new_status": new_status, "message": message}
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Verify Knowledge] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/knowledge/batch_update")
async def batch_update_knowledge(batch: KnowledgeBatchUpdateIn):
    """Apply a batch of effectiveness feedback to knowledge entries.

    Each feedback item is a dict with ``knowledge_id``, ``is_effective`` and
    an optional ``feedback`` text. Effective items with feedback text are
    evolved through the LLM and re-embedded; every other item only bumps the
    helpful or harmful counter. Items without an id or with an unknown id are
    skipped.

    Returns:
        A status dict with the number of ``updated`` entries.

    Raises:
        HTTPException: 500 when anything fails.
    """
    if not batch.feedback_list:
        return {"status": "ok", "updated": 0}
    try:
        # Sort the feedback into LLM evolutions and plain counter updates.
        pending_evolutions = []  # (knowledge_id, old_content, feedback, eval_data, task)
        counter_updates = []  # (knowledge_id, is_effective, eval_data)
        for entry in batch.feedback_list:
            kid = entry.get("knowledge_id")
            effective = entry.get("is_effective")
            note = entry.get("feedback", "")
            if not kid:
                continue
            record = pg_store.get_by_id(kid)
            if not record:
                continue
            stats = record.get("eval", {})
            if effective and note:
                pending_evolutions.append((kid, record["content"], note, stats, record["task"]))
            else:
                counter_updates.append((kid, effective, stats))

        # Plain updates: bump exactly one counter.
        for kid, effective, stats in counter_updates:
            counter = "helpful" if effective else "harmful"
            stats[counter] = stats.get(counter, 0) + 1
            pg_store.update(kid, {"eval": stats})

        # Evolutions: run all LLM rewrites concurrently, then store them.
        if pending_evolutions:
            print(f"🧬 并发处理 {len(pending_evolutions)} 条知识进化...")
            rewrite_jobs = [
                _evolve_knowledge_with_llm(job[1], job[2]) for job in pending_evolutions
            ]
            rewritten_contents = await asyncio.gather(*rewrite_jobs)
            for job, new_content in zip(pending_evolutions, rewritten_contents):
                kid, stats, task = job[0], job[3], job[4]
                stats["helpful"] = stats.get("helpful", 0) + 1
                # The embedding is regenerated from the task text only.
                embedding = await get_embedding(task)
                pg_store.update(kid, {
                    "content": new_content,
                    "eval": stats,
                    "embedding": embedding
                })

        return {"status": "ok", "updated": len(counter_updates) + len(pending_evolutions)}
    except Exception as e:
        print(f"[Batch Update] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
def _parse_slim_output(content: str):
    """Parse the slim LLM answer into ``(entries, report_line)``.

    Blocks are separated by ``===``. A block starting with ``REPORT:`` is the
    report; every other block is read field by field (ID, TYPES, HELPFUL,
    HARMFUL, SCORE, TASK, CONTENT), where TASK and CONTENT may span several
    lines. Blocks without an id or without content are dropped.
    """
    report_line = ""
    entries = []
    for block in (b.strip() for b in content.split("===")):
        if not block:
            continue
        if block.startswith("REPORT:"):
            report_line = block
            continue

        kid, types, helpful, harmful, score, task, content_lines = None, [], 0, 0, 3, "", []
        current_field = None  # multi-line field currently being continued
        for line in block.split("\n"):
            if line.startswith("ID:"):
                kid = line[3:].strip()
                current_field = None
            elif line.startswith("TYPES:"):
                types = [t.strip() for t in line[6:].strip().split(",") if t.strip()]
                current_field = None
            elif line.startswith("HELPFUL:"):
                try:
                    helpful = int(line[8:].strip())
                except ValueError:
                    helpful = 0
                current_field = None
            elif line.startswith("HARMFUL:"):
                try:
                    harmful = int(line[8:].strip())
                except ValueError:
                    harmful = 0
                current_field = None
            elif line.startswith("SCORE:"):
                try:
                    score = int(line[6:].strip())
                except ValueError:
                    score = 3
                current_field = None
            elif line.startswith("TASK:"):
                task = line[5:].strip()
                current_field = "task"
            elif line.startswith("CONTENT:"):
                content_lines.append(line[8:].strip())
                current_field = "content"
            elif current_field == "task":
                task += "\n" + line
            elif current_field == "content":
                content_lines.append(line)

        if kid and content_lines:
            entries.append({
                "id": kid,
                "types": types if types else ["strategy"],
                "helpful": helpful,
                "harmful": harmful,
                "score": score,
                "task": task.strip(),
                "content": "\n".join(content_lines).strip()
            })
    return entries, report_line


@app.post("/api/knowledge/slim")
async def slim_knowledge(model: str = "google/gemini-2.5-flash-lite"):
    """Slim the knowledge base by letting an LLM merge similar entries.

    All entries (up to 10000) are summarised for the LLM, its answer is
    parsed, the merged entries are embedded from their task text, and the
    ``knowledge`` table is truncated and refilled with the merged entries.

    Args:
        model: OpenRouter model name used for the merge.

    Returns:
        A dict with the entry counts ``before``/``after`` and the LLM report.

    Raises:
        HTTPException: 500 when the LLM answer is empty or unparsable, or for
            any other failure.
    """
    try:
        all_knowledge = pg_store.query('id != ""', limit=10000)
        all_knowledge = [serialize_milvus_result(item) for item in all_knowledge]

        if len(all_knowledge) < 2:
            return {"status": "ok", "message": f"知识库仅有 {len(all_knowledge)} 条,无需瘦身"}

        # Summary sent to the LLM (content truncated to 200 characters).
        entry_parts = []
        for item in all_knowledge:
            eval_data = item.get("eval", {})
            types = item.get("types", [])
            entry_parts.append(f"[ID: {item['id']}] [Types: {','.join(types)}] ")
            entry_parts.append(
                f"[Helpful: {eval_data.get('helpful', 0)}, Harmful: {eval_data.get('harmful', 0)}] [Score: {eval_data.get('score', 3)}]\n"
            )
            entry_parts.append(f"Task: {item['task']}\n")
            entry_parts.append(f"Content: {item['content'][:200]}...\n\n")
        entries_text = "".join(entry_parts)

        prompt = f"""你是一个 AI Agent 知识库管理员。以下是当前知识库的全部条目,请执行瘦身操作:
【任务】:
1. 识别语义高度相似或重复的知识,将它们合并为一条更精炼、更通用的知识。
2. 合并时保留 helpful 最高的那条的 ID(helpful 取各条之和)。
3. 对于独立的、无重复的知识,保持原样不动。
【当前知识库】:
{entries_text}
【输出格式要求】:
严格按以下格式输出每条知识,条目之间用 === 分隔:
ID: <保留的id>
TYPES: <逗号分隔的type列表>
HELPFUL: <合并后的helpful计数>
HARMFUL: <合并后的harmful计数>
SCORE: <评分>
TASK: <任务描述>
CONTENT: <合并后的知识内容>
===
最后输出合并报告:
REPORT: 原有 X 条,合并后 Y 条,精简了 Z 条。
禁止输出任何开场白或解释。"""

        print(f"\n[知识瘦身] 正在调用 {model} 分析 {len(all_knowledge)} 条知识...")
        slim_llm = create_openrouter_llm_call(model=model)
        response = await slim_llm(
            messages=[{"role": "user", "content": prompt}],
        )
        content = response.get("content", "").strip()
        if not content:
            raise HTTPException(status_code=500, detail="LLM 返回为空")

        new_entries, report_line = _parse_slim_output(content)
        if not new_entries:
            raise HTTPException(status_code=500, detail="解析大模型输出失败")

        # Embed the merged entries (task text only).
        print(f"[知识瘦身] 正在为 {len(new_entries)} 条知识生成向量...")
        texts = [e['task'] for e in new_entries]
        embeddings = await get_embeddings_batch(texts)
        if len(embeddings) != len(new_entries):
            # zip() below would silently drop entries; stop before truncating.
            raise ValueError(
                f"expected {len(new_entries)} embeddings, got {len(embeddings)}"
            )

        # Build every replacement row BEFORE the table is emptied, so an
        # error here can no longer wipe the knowledge base. ``now`` was
        # previously undefined, which raised after the TRUNCATE was committed.
        now = int(time.time())
        knowledge_list = []
        for e, embedding in zip(new_entries, embeddings):
            eval_data = {
                "score": e["score"],
                "helpful": e["helpful"],
                "harmful": e["harmful"],
                "confidence": 0.9,
                "helpful_history": [],
                "harmful_history": []
            }
            source = {
                "name": "slim",
                "category": "exp",
                "urls": [],
                "agent_id": "slim",
                "submitted_by": "system",
                "timestamp": datetime.now(timezone.utc).isoformat()
            }
            knowledge_list.append({
                "id": e["id"],
                "embedding": embedding,
                "message_id": "",
                "task": e["task"],
                "content": e["content"],
                "types": e["types"],
                "tags": {},
                "tag_keys": [],
                "scopes": ["org:cybertogether"],
                "owner": "agent:slim",
                "resource_ids": [],
                "source": source,
                "eval": eval_data,
                "created_at": now,
                "updated_at": now,
                "status": "approved",
                "relationships": json.dumps([])
            })

        # Empty the table, then refill it.
        # NOTE(review): truncate and insert_batch are separate steps; if the
        # insert fails the table stays empty — consider a single transaction.
        cursor = pg_store._get_cursor()
        try:
            cursor.execute("TRUNCATE TABLE knowledge")
            pg_store.conn.commit()
        finally:
            cursor.close()
        pg_store.insert_batch(knowledge_list)

        result_msg = f"瘦身完成:{len(all_knowledge)} → {len(new_entries)} 条知识"
        if report_line:
            result_msg += f"\n{report_line}"
        print(f"[知识瘦身] {result_msg}")
        return {"status": "ok", "before": len(all_knowledge), "after": len(new_entries), "report": report_line}
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Slim Knowledge] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/extract")
async def extract_knowledge_from_messages(extract_req: MessageExtractIn, background_tasks: BackgroundTasks):
    """Extract knowledge from a message history with the LLM and queue it.

    The LLM is asked for a JSON array of ``{task, content, types, score}``
    items. Valid items are embedded from their task text, inserted with
    status ``pending`` and handed to the background dedup processor.

    Returns:
        On success a dict with ``extracted_count`` and ``knowledge_ids``; on
        a parsing or processing failure a dict with ``status`` ``error``.

    Raises:
        HTTPException: 400 when ``submitted_by`` is missing.
    """
    if not extract_req.submitted_by:
        raise HTTPException(status_code=400, detail="submitted_by is required")

    messages = extract_req.messages
    if not messages:
        return {"status": "ok", "extracted_count": 0, "knowledge_ids": []}

    # Flatten the history into "[role]: content" paragraphs.
    messages_text = "".join(
        f"[{msg.get('role', 'unknown')}]: {msg.get('content', '')}\n\n"
        for msg in messages
    )

    prompt = f"""你是一个知识提取专家。请从以下 Agent 对话历史中提取有价值的知识。
【对话历史】:
{messages_text}
【提取要求】:
1. 识别对话中的关键知识点(工具使用经验、问题解决方案、最佳实践、踩坑经验等)
2. 每条知识必须包含:
- task: 任务场景描述(在什么情况下,要完成什么目标)
- content: 核心知识内容(具体可操作的方法、注意事项)
- types: 知识类型(从 strategy/tool/user_profile/usecase/definition/plan 中选择)
- score: 评分 1-5(根据知识的价值和可操作性)
3. 只提取有实际价值的知识,不要提取泛泛而谈的内容,一次就成功或比较简单的经验就不要记录了。
4. 如果没有值得提取的知识,返回空列表
【输出格式】:
严格按以下 JSON 格式输出,每条知识之间用逗号分隔:
[
{{
"task": "任务场景描述",
"content": "核心知识内容",
"types": ["strategy"],
"score": 4
}},
{{
"task": "另一个任务场景",
"content": "另一个知识内容",
"types": ["tool"],
"score": 5
}}
]
如果没有知识,输出: []
**注意**:只记录经过多次尝试、或经过用户指导才成功的知识,一次就成功或比较简单的经验就不要记录了。
禁止输出任何解释或额外文本,只输出 JSON 数组。"""

    # Defined up front so the JSON error handler can always log it.
    content = ""
    try:
        print(f"\n[Extract] 正在从 {len(messages)} 条消息中提取知识...")
        response = await _dedup_llm(
            messages=[{"role": "user", "content": prompt}],
        )
        content = response.get("content", "").strip()

        # Strip a surrounding markdown code fence, if any.
        if content.startswith("```json"):
            content = content[7:]
        if content.startswith("```"):
            content = content[3:]
        if content.endswith("```"):
            content = content[:-3]
        content = content.strip()

        extracted_knowledge = json.loads(content)
        if not isinstance(extracted_knowledge, list):
            raise ValueError("LLM output is not a list")

        # Keep only well-formed items BEFORE embedding, so no embedding is
        # requested for entries that would be discarded anyway.
        valid_items = [
            item for item in extracted_knowledge
            if isinstance(item, dict) and item.get("task", "") and item.get("content", "")
        ]
        if not valid_items:
            return {"status": "ok", "extracted_count": 0, "knowledge_ids": []}

        # Batch-embed the task texts.
        texts = [item["task"] for item in valid_items]
        embeddings = await get_embeddings_batch(texts)

        knowledge_ids = []
        now = int(time.time())
        knowledge_list = []
        for item, embedding in zip(valid_items, embeddings):
            # Id = local timestamp plus a short random suffix.
            timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
            random_suffix = uuid.uuid4().hex[:4]
            knowledge_id = f"knowledge-{timestamp}-{random_suffix}"

            source = {
                "name": "message_extraction",
                "category": "exp",
                "urls": [],
                "agent_id": extract_req.agent_id,
                "submitted_by": extract_req.submitted_by,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "session_key": extract_req.session_key
            }
            eval_data = {
                "score": item.get("score", 3),
                "helpful": 1,
                "harmful": 0,
                "confidence": 0.7,
                "helpful_history": [],
                "harmful_history": []
            }
            knowledge_list.append({
                "id": knowledge_id,
                "embedding": embedding,
                "message_id": "",
                "task": item["task"],
                "content": item["content"],
                "types": item.get("types", ["strategy"]),
                "tags": {},
                "tag_keys": [],
                "scopes": ["org:cybertogether"],
                "owner": extract_req.submitted_by,
                "resource_ids": [],
                "source": source,
                "eval": eval_data,
                "created_at": now,
                "updated_at": now,
                "status": "pending",
                "relationships": json.dumps([]),
            })
            knowledge_ids.append(knowledge_id)

        # Batch insert, then trigger dedup processing in the background.
        if knowledge_list:
            pg_store.insert_batch(knowledge_list)
            background_tasks.add_task(knowledge_processor.process_pending)

        print(f"[Extract] 成功提取并保存 {len(knowledge_ids)} 条知识")
        return {
            "status": "ok",
            "extracted_count": len(knowledge_ids),
            "knowledge_ids": knowledge_ids
        }
    except json.JSONDecodeError as e:
        print(f"[Extract] JSON 解析失败: {e}")
        print(f"[Extract] LLM 输出: {content[:500]}")
        return {"status": "error", "error": "Failed to parse LLM output", "extracted_count": 0}
    except Exception as e:
        print(f"[Extract] 提取失败: {e}")
        return {"status": "error", "error": str(e), "extracted_count": 0}
@app.get("/", response_class=FileResponse)
def frontend():
    """Serve the KnowHub admin frontend (``index.html`` under ``STATIC_DIR``).

    Returns a 404 HTML page when the file is missing.
    """
    index_file = STATIC_DIR / "index.html"
    if index_file.exists():
        return FileResponse(str(index_file))
    return HTMLResponse("<h1>KnowHub Frontend Not Found</h1><p>Please ensure knowhub/static/index.html exists.</p>", status_code=404)
# Script entry point: serve the FastAPI app with uvicorn when run directly.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when launching this way.
    import uvicorn
    # Listen on all interfaces (0.0.0.0), port 9999.
    uvicorn.run(app, host="0.0.0.0", port=9999)
|