| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330 |
- """
- KnowHub Server
- Agent 工具使用经验的共享平台。
- FastAPI + Milvus Lite(知识)+ SQLite(资源),单文件部署。
- """
- import os
- import re
- import json
- import asyncio
- import base64
- import time
- import uuid
- from contextlib import asynccontextmanager
- from datetime import datetime, timezone
- from typing import Optional, List, Dict
- from pathlib import Path
- from cryptography.hazmat.primitives.ciphers.aead import AESGCM
- from fastapi import FastAPI, HTTPException, Query, Header, Body, BackgroundTasks
- from fastapi.responses import HTMLResponse, FileResponse
- from fastapi.staticfiles import StaticFiles
- from pydantic import BaseModel, Field
- # 导入 LLM 调用(需要 agent 模块在 Python path 中)
- import sys
- sys.path.insert(0, str(Path(__file__).parent.parent))
- # 加载环境变量
- from dotenv import load_dotenv
- load_dotenv(Path(__file__).parent.parent / ".env")
- from agent.llm import create_openrouter_llm_call, create_qwen_llm_call
- from knowhub.kb_manage_prompts import (
- DEDUP_RELATION_PROMPT,
- TOOL_ANALYSIS_PROMPT,
- RERANK_PROMPT_TEMPLATE,
- KNOWLEDGE_EVOLVE_PROMPT_TEMPLATE,
- KNOWLEDGE_SLIM_PROMPT_TEMPLATE,
- MESSAGE_EXTRACT_PROMPT_TEMPLATE,
- )
- _dedup_llm = create_openrouter_llm_call(model="google/gemini-2.5-flash-lite")
- _tool_analysis_llm = create_qwen_llm_call(model="qwen3.5-plus")
- # 导入向量存储和 embedding
- from knowhub.knowhub_db.pg_store import PostgreSQLStore
- from knowhub.knowhub_db.pg_resource_store import PostgreSQLResourceStore
- from knowhub.knowhub_db.pg_tool_store import PostgreSQLToolStore
- from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
- from knowhub.knowhub_db.pg_requirement_store import PostgreSQLRequirementStore
- from knowhub.embeddings import get_embedding, get_embeddings_batch
BRAND_NAME = os.getenv("BRAND_NAME", "KnowHub")
BRAND_API_ENV = os.getenv("BRAND_API_ENV", "KNOWHUB_API")
BRAND_DB = os.getenv("BRAND_DB", "knowhub.db")
# Per-organization encryption keys, configured as "org1:key1_base64,org2:key2_base64".
ORG_KEYS_RAW = os.getenv("ORG_KEYS", "")
ORG_KEYS = {
    org.strip(): key_b64.strip()
    for org, sep, key_b64 in (pair.partition(":") for pair in ORG_KEYS_RAW.split(","))
    if sep
}
DB_PATH = Path(__file__).parent / BRAND_DB
# Global PostgreSQL store singletons; populated by the FastAPI lifespan handler.
pg_store: Optional[PostgreSQLStore] = None
pg_resource_store: Optional[PostgreSQLResourceStore] = None
pg_tool_store: Optional[PostgreSQLToolStore] = None
pg_capability_store: Optional[PostgreSQLCapabilityStore] = None
pg_requirement_store: Optional[PostgreSQLRequirementStore] = None
- # --- 加密/解密 ---
def get_org_key(resource_id: str) -> Optional[bytes]:
    """Return the AES key for the org prefix of *resource_id*, or None.

    The org is the part of the id before the first "/"; ids without a "/"
    (or orgs with no configured key) yield None.
    """
    org, sep, _rest = resource_id.partition("/")
    if sep and org in ORG_KEYS:
        return base64.b64decode(ORG_KEYS[org])
    return None
def encrypt_content(resource_id: str, plaintext: str) -> str:
    """Encrypt *plaintext* with the org's AES-256-GCM key.

    Returns "encrypted:AES256-GCM:{base64(nonce + ciphertext)}". Empty input
    yields ""; when no org key is configured the plaintext is stored as-is
    (not recommended).
    """
    if not plaintext:
        return ""
    key = get_org_key(resource_id)
    if not key:
        # No key configured for this org — fall back to plaintext storage.
        return plaintext
    nonce = os.urandom(12)  # standard 96-bit GCM nonce
    sealed = AESGCM(key).encrypt(nonce, plaintext.encode("utf-8"), None)
    # Persist nonce alongside ciphertext so decryption is self-contained.
    payload = base64.b64encode(nonce + sealed).decode("ascii")
    return f"encrypted:AES256-GCM:{payload}"
def decrypt_content(resource_id: str, encrypted_text: str, provided_key: Optional[str] = None) -> str:
    """Decrypt an "encrypted:AES256-GCM:..." payload.

    Unencrypted input is returned verbatim; "" stays "". When no usable key
    is available, the key is malformed, or decryption fails, the constant
    "[ENCRYPTED]" is returned instead of raising.
    """
    if not encrypted_text:
        return ""
    if not encrypted_text.startswith("encrypted:AES256-GCM:"):
        # Legacy / plaintext content — pass through untouched.
        return encrypted_text
    # Payload layout: base64(nonce[12] + ciphertext).
    blob = base64.b64decode(encrypted_text.split(":", 2)[2])
    nonce, ciphertext = blob[:12], blob[12:]
    if provided_key:
        # A caller-supplied key (e.g. from a header) takes precedence.
        try:
            key = base64.b64decode(provided_key)
        except Exception:
            return "[ENCRYPTED]"
    else:
        key = get_org_key(resource_id)
        if not key:
            return "[ENCRYPTED]"
    try:
        return AESGCM(key).decrypt(nonce, ciphertext, None).decode("utf-8")
    except Exception:
        return "[ENCRYPTED]"
def serialize_milvus_result(data):
    """Recursively convert a Milvus result object into JSON-serializable primitives.

    Handles dicts/lists/tuples recursively, objects exposing ``to_dict()``,
    generic iterables (e.g. protobuf RepeatedScalarContainer), and plain
    objects via their attribute dict. Anything that cannot be converted
    (including ``bytes``) becomes None so the result is always serializable.
    """
    # Primitives pass through unchanged.
    if data is None or isinstance(data, (str, int, float, bool)):
        return data
    # Mappings: convert values recursively.
    if isinstance(data, dict):
        return {k: serialize_milvus_result(v) for k, v in data.items()}
    # Sequences: convert items recursively (tuples become lists).
    if isinstance(data, (list, tuple)):
        return [serialize_milvus_result(item) for item in data]
    # Objects with a to_dict() method (common on Milvus SDK types).
    if hasattr(data, 'to_dict') and callable(getattr(data, 'to_dict')):
        try:
            return serialize_milvus_result(data.to_dict())
        except Exception:  # fix: was a bare `except:` that also swallowed KeyboardInterrupt/SystemExit
            pass
    # Generic iterables (e.g. RepeatedScalarContainer). str/dict are handled
    # above; bytes is excluded on purpose and falls through to None.
    if hasattr(data, '__iter__') and not isinstance(data, (str, bytes, dict)):
        try:
            return [serialize_milvus_result(item) for item in data]
        except Exception:
            pass
    # Plain objects: serialize their attribute dict.
    if hasattr(data, '__dict__'):
        try:
            return serialize_milvus_result(vars(data))
        except Exception:
            pass
    # Final fallback: None rather than str(data), so the output never carries
    # opaque, non-round-trippable string representations.
    return None
# --- Models ---
class ResourceIn(BaseModel):
    """Request body for submitting a resource."""
    id: str
    title: str = ""
    body: str
    # Sensitive payload; encrypted at rest when an org key is configured.
    secure_body: str = ""
    content_type: str = "text"  # text|code|credential|cookie
    metadata: dict = {}
    sort_order: int = 0
    submitted_by: str = ""


class ResourcePatchIn(BaseModel):
    """PATCH /api/resource/{id} request body (all fields optional)."""
    title: Optional[str] = None
    body: Optional[str] = None
    secure_body: Optional[str] = None
    content_type: Optional[str] = None
    metadata: Optional[dict] = None
# Knowledge Models
class KnowledgeIn(BaseModel):
    """Request body for submitting a knowledge item."""
    task: str
    content: str
    types: list[str] = ["strategy"]
    tags: dict = {}
    scopes: list[str] = ["org:cybertogether"]
    owner: str = ""
    message_id: str = ""
    resource_ids: list[str] = []
    source: dict = {}  # {name, category, urls, agent_id, submitted_by, timestamp}
    eval: dict = {}  # {score, helpful, harmful, confidence}
    support_capability: list[str] = []
    tools: list[str] = []


class KnowledgeOut(BaseModel):
    """Knowledge record as returned to API clients."""
    id: str
    message_id: str
    types: list[str]
    task: str
    tags: dict
    scopes: list[str]
    owner: str
    content: str
    resource_ids: list[str]
    source: dict
    eval: dict
    created_at: str
    updated_at: str


class KnowledgeUpdateIn(BaseModel):
    """Feedback-style knowledge update: add cases, adjust score, or evolve."""
    add_helpful_case: Optional[dict] = None
    add_harmful_case: Optional[dict] = None
    # Score is constrained to the 1..5 range by pydantic validation.
    update_score: Optional[int] = Field(default=None, ge=1, le=5)
    evolve_feedback: Optional[str] = None


class KnowledgePatchIn(BaseModel):
    """PATCH /api/knowledge/{id} request body (direct field edits, all optional)."""
    task: Optional[str] = None
    content: Optional[str] = None
    types: Optional[list[str]] = None
    tags: Optional[dict] = None
    scopes: Optional[list[str]] = None
    owner: Optional[str] = None
    support_capability: Optional[list[str]] = None
    tools: Optional[list[str]] = None


class MessageExtractIn(BaseModel):
    """POST /api/extract request body (knowledge extraction from message history)."""
    messages: list[dict]  # [{role: str, content: str}, ...]
    agent_id: str = "unknown"
    submitted_by: str  # required; used as the knowledge owner
    session_key: str = ""


class KnowledgeBatchUpdateIn(BaseModel):
    # One feedback dict per knowledge item.
    feedback_list: list[dict]


class KnowledgeVerifyIn(BaseModel):
    action: str  # "approve" | "reject"
    verified_by: str = "user"


class KnowledgeBatchVerifyIn(BaseModel):
    knowledge_ids: List[str]
    action: str  # "approve"
    verified_by: str


class KnowledgeSearchResponse(BaseModel):
    results: list[dict]
    count: int
# --- Tool Models ---
class ToolIn(BaseModel):
    """Tool record payload."""
    id: str
    name: str = ""
    version: Optional[str] = None
    introduction: str = ""
    tutorial: str = ""
    input: dict | str = ""
    output: dict | str = ""
    status: str = "未接入"
    capabilities: list[str] = []
    # Knowledge-id indexes linked to this tool (usage / use-case / process).
    tool_knowledge: list[str] = []
    case_knowledge: list[str] = []
    process_knowledge: list[str] = []
    implemented_tool_ids: list[str] = []


class ToolPatchIn(BaseModel):
    """Partial tool update (all fields optional)."""
    name: Optional[str] = None
    version: Optional[str] = None
    introduction: Optional[str] = None
    tutorial: Optional[str] = None
    input: Optional[dict | str] = None
    output: Optional[dict | str] = None
    status: Optional[str] = None
    capabilities: Optional[list[str]] = None
    tool_knowledge: Optional[list[str]] = None
    case_knowledge: Optional[list[str]] = None
    process_knowledge: Optional[list[str]] = None
    implemented_tool_ids: Optional[list[str]] = None
# --- Capability Models ---
class CapabilityIn(BaseModel):
    """Capability record payload."""
    id: str
    name: str = ""
    criterion: str = ""
    description: str = ""
    requirements: list[str] = []
    implements: dict = {}
    tools: list[str] = []
    source_knowledge: list[str] = []


class CapabilityPatchIn(BaseModel):
    """Partial capability update (all fields optional)."""
    name: Optional[str] = None
    criterion: Optional[str] = None
    description: Optional[str] = None
    requirements: Optional[list[str]] = None
    implements: Optional[dict] = None
    tools: Optional[list[str]] = None
    source_knowledge: Optional[list[str]] = None
# --- Requirement Models ---
class RequirementIn(BaseModel):
    """Requirement record payload."""
    id: str
    description: str = ""
    atomics: list[str] = []
    source_nodes: list[dict] = []
    status: str = "未满足"
    match_result: str = ""


class RequirementPatchIn(BaseModel):
    """Partial requirement update (all fields optional)."""
    description: Optional[str] = None
    atomics: Optional[list[str]] = None
    source_nodes: Optional[list[dict]] = None
    status: Optional[str] = None
    match_result: Optional[str] = None
class ResourceNode(BaseModel):
    """Lightweight reference to a resource, used for navigation links."""
    id: str
    title: str


class ResourceOut(BaseModel):
    """Resource detail plus navigation context (TOC root, children, prev/next siblings)."""
    id: str
    title: str
    body: str
    # Decrypted on read; "[ENCRYPTED]" when no usable key was supplied.
    secure_body: str = ""
    content_type: str = "text"
    metadata: dict = {}
    toc: Optional[ResourceNode] = None
    children: list[ResourceNode]
    prev: Optional[ResourceNode] = None
    next: Optional[ResourceNode] = None
# --- Dedup: Globals & Prompt ---
knowledge_processor: Optional["KnowledgeProcessor"] = None
# --- Dedup: RelationCache ---
class RelationCache:
    """In-memory map of relation type -> ordered list of knowledge ids."""

    def __init__(self):
        self._cache: Dict[str, List[str]] = {}

    def load(self) -> dict:
        """Return the live cache mapping."""
        return self._cache

    def save(self, cache: dict):
        """Replace the cache wholesale with *cache*."""
        self._cache = cache

    def add_relation(self, relation_type: str, knowledge_id: str):
        """Record *knowledge_id* under *relation_type*, skipping duplicates."""
        bucket = self._cache.setdefault(relation_type, [])
        if knowledge_id not in bucket:
            bucket.append(knowledge_id)
# --- Dedup: KnowledgeProcessor ---
class KnowledgeProcessor:
    """Async pipeline that dedups submitted knowledge and links it to tools.

    Status flow driven here:
    pending -> processing -> dedup_passed -> analyzing -> approved,
    with rejected branches (duplicate/subset, ambiguous tool analysis) and
    rollback to the previous status on failure.
    """

    def __init__(self):
        self._lock = asyncio.Lock()
        self._relation_cache = RelationCache()

    async def process_pending(self):
        """Drain 'pending' then 'dedup_passed' knowledge until both queues are empty; the lock prevents concurrent drains."""
        if self._lock.locked():
            return
        async with self._lock:
            # Stage 1: dedup every 'pending' item.
            while True:
                try:
                    pending = pg_store.query('status == "pending"', limit=50)
                except Exception as e:
                    print(f"[KnowledgeProcessor] 查询 pending 失败: {e}")
                    break
                if not pending:
                    break
                for knowledge in pending:
                    await self._process_one(knowledge)
            # Stage 2: tool association for every 'dedup_passed' item.
            while True:
                try:
                    dedup_passed = pg_store.query('status == "dedup_passed"', limit=50)
                except Exception as e:
                    print(f"[KnowledgeProcessor] 查询 dedup_passed 失败: {e}")
                    break
                if not dedup_passed:
                    break
                for knowledge in dedup_passed:
                    await self._analyze_tool_relation(knowledge)

    async def _process_one(self, knowledge: dict):
        """Dedup one pending item against approved/checked knowledge via vector recall + LLM judgment."""
        kid = knowledge["id"]
        now = int(time.time())
        # Optimistic lock: pending -> processing (timestamps stored in seconds).
        try:
            pg_store.update(kid, {"status": "processing", "updated_at": now})
        except Exception as e:
            print(f"[KnowledgeProcessor] 锁定 {kid} 失败: {e}")
            return
        try:
            # Vector recall: top-10 candidates among approved/checked only.
            embedding = knowledge.get("task_embedding") or knowledge.get("embedding")
            if not embedding:
                embedding = await get_embedding(knowledge["task"])
            candidates = pg_store.search(
                query_embedding=embedding,
                filters='(status == "approved" or status == "checked")',
                limit=10
            )
            candidates = [c for c in candidates if c["id"] != kid]
            # Keep only similarity >= 0.75; below that the tasks are too
            # semantically different and the relation is treated as none.
            candidates = [c for c in candidates if c.get("score", 0) >= 0.75]
            if not candidates:
                pg_store.update(kid, {"status": "dedup_passed", "updated_at": now})
                return
            llm_result = await self._llm_judge_relations(knowledge, candidates)
            await self._apply_decision(knowledge, llm_result)
        except Exception as e:
            print(f"[KnowledgeProcessor] 处理 {kid} 失败: {e},回退到 pending")
            # Roll back so the item gets retried on the next drain.
            try:
                pg_store.update(kid, {"status": "pending", "updated_at": int(time.time())})
            except Exception:
                pass

    async def _llm_judge_relations(self, new_knowledge: dict, candidates: list) -> dict:
        """Ask the dedup LLM to relate the new item to each candidate; up to 3 attempts, defaulting to approved."""
        existing_list = "\n".join([
            f"[{i+1}] ID: {c['id']} | Task: {c['task']} | Content: {c['content'][:300]}"
            for i, c in enumerate(candidates)
        ])
        prompt = DEDUP_RELATION_PROMPT.format(
            new_task=new_knowledge["task"],
            new_content=new_knowledge["content"],
            existing_list=existing_list
        )
        for attempt in range(3):
            try:
                response = await _dedup_llm(
                    messages=[{"role": "user", "content": prompt}],
                )
                content = response.get("content", "").strip()
                # Strip markdown code fences; keep the first fenced part that
                # parses as JSON and carries a final_decision.
                if "```" in content:
                    parts = content.split("```")
                    for part in parts:
                        part = part.strip()
                        if part.startswith("json"):
                            part = part[4:].strip()
                        try:
                            result = json.loads(part)
                            if "final_decision" in result:
                                content = part
                                break
                        except Exception:
                            continue
                result = json.loads(content)
                assert result.get("final_decision") in ("approved", "rejected")
                return result
            except Exception as e:
                print(f"[LLM Judge] 第{attempt+1}次失败: {e}")
                if attempt < 2:
                    await asyncio.sleep(1)
        # All attempts failed: default to approval with no relations.
        return {"final_decision": "approved", "relations": []}

    async def _apply_decision(self, new_knowledge: dict, llm_result: dict):
        """Persist the LLM dedup verdict: rejected (with traceable relations) or dedup_passed (with bidirectional links)."""
        kid = new_knowledge["id"]
        final_decision = llm_result.get("final_decision", "approved")
        relations = llm_result.get("relations", [])
        now = int(time.time())
        # Hard rule: any duplicate/subset relation forces rejection.
        if any(rel.get("type") in ("duplicate", "subset") for rel in relations):
            final_decision = "rejected"
        if final_decision == "rejected":
            # Record the relations on the rejected item so the rejection is traceable.
            rejected_relationships = []
            for rel in relations:
                old_id = rel.get("old_id")
                rel_type = rel.get("type", "none")
                if old_id and rel_type != "none":
                    rejected_relationships.append({"type": rel_type, "target": old_id})
                if rel_type in ("duplicate", "subset") and old_id:
                    # Credit the surviving knowledge: bump its helpful count.
                    try:
                        old = pg_store.get_by_id(old_id)
                        if not old:
                            continue
                        eval_data = old.get("eval") or {}
                        eval_data["helpful"] = eval_data.get("helpful", 0) + 1
                        helpful_history = eval_data.get("helpful_history") or []
                        helpful_history.append({
                            "source": "dedup",
                            "related_id": kid,
                            "relation_type": rel_type,
                            "timestamp": now
                        })
                        eval_data["helpful_history"] = helpful_history
                        pg_store.update(old_id, {"eval": eval_data, "updated_at": now})
                    except Exception as e:
                        print(f"[Apply Decision] 更新旧知识 {old_id} helpful 失败: {e}")
            pg_store.update(kid, {"status": "rejected", "relationships": json.dumps(rejected_relationships), "updated_at": now})
        else:
            # Accepted: store forward links on the new item and reverse links on the old ones.
            new_relationships = []
            for rel in relations:
                rel_type = rel.get("type", "none")
                reverse_type = rel.get("reverse_type", "none")
                old_id = rel.get("old_id")
                if not old_id or rel_type == "none":
                    continue
                new_relationships.append({"type": rel_type, "target": old_id})
                self._relation_cache.add_relation(rel_type, kid)
                self._relation_cache.add_relation(rel_type, old_id)
                if reverse_type and reverse_type != "none":
                    try:
                        old = pg_store.get_by_id(old_id)
                        if old:
                            old_rels = old.get("relationships") or []
                            old_rels.append({"type": reverse_type, "target": kid})
                            pg_store.update(old_id, {"relationships": json.dumps(old_rels), "updated_at": now})
                            self._relation_cache.add_relation(reverse_type, old_id)
                            self._relation_cache.add_relation(reverse_type, kid)
                    except Exception as e:
                        print(f"[Apply Decision] 更新旧知识关系 {old_id} 失败: {e}")
            pg_store.update(kid, {
                "status": "dedup_passed",
                "relationships": json.dumps(new_relationships),
                "updated_at": now
            })

    async def _llm_analyze_tools(self, knowledge: dict) -> dict:
        """Use the LLM to identify tools referenced by this knowledge (mirrors the migration-script logic); re-raises on failure."""
        # Truncate inputs to keep the prompt bounded.
        task = (knowledge.get("task") or "")[:600]
        content = (knowledge.get("content") or "")[:1200]
        prompt = TOOL_ANALYSIS_PROMPT.format(task=task, content=content)
        try:
            response = await _tool_analysis_llm(
                messages=[{"role": "user", "content": prompt}],
                max_tokens=2048,
                temperature=0.1,
            )
            raw = (response.get("content") or "").strip()
            # Strip a surrounding markdown code fence, if present.
            if raw.startswith("```"):
                lines = raw.split("\n")
                inner = []
                in_block = False
                for line in lines:
                    if line.startswith("```"):
                        in_block = not in_block
                        continue
                    if in_block:
                        inner.append(line)
                raw = "\n".join(inner).strip()
            return json.loads(raw)
        except Exception as e:
            print(f"[Tool Analysis LLM] 调用失败: {e}")
            raise

    async def _create_or_get_tool_resource(self, tool_info: dict) -> Optional[str]:
        """Insert the tool into tool_table if missing; return its id, or None when *tool_info* has no slug."""
        category = tool_info.get("category", "other")
        slug = tool_info.get("slug", "")
        if not slug:
            return None
        tool_id = f"tools/{category}/{slug}"
        now_ts = int(time.time())
        cursor = pg_store._get_cursor()
        try:
            # Idempotent: reuse an existing row when the tool is already known.
            cursor.execute("SELECT id FROM tool_table WHERE id = %s", (tool_id,))
            if cursor.fetchone():
                return tool_id
            cursor.execute("""
                INSERT INTO tool_table (id, name, version, introduction, tutorial, input, output,
                    updated_time, status, tool_knowledge, case_knowledge, process_knowledge)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, (
                tool_id,
                tool_info.get("name", slug),
                tool_info.get("version") or None,
                tool_info.get("description", ""),
                tool_info.get("usage", ""),
                json.dumps(tool_info.get("input", "")),
                json.dumps(tool_info.get("output", "")),
                now_ts,
                tool_info.get("status", "未接入"),
                json.dumps([]),
                json.dumps([]),
                json.dumps([]),
            ))
            pg_store.conn.commit()
            print(f"[Tool Resource] 创建新工具: {tool_id}")
            return tool_id
        finally:
            cursor.close()

    async def _update_tool_knowledge_index(self, tool_id: str, knowledge_id: str, knowledge_types: list = None):
        """Append *knowledge_id* to the tool's index column selected by knowledge type
        ('plan' -> process_knowledge, 'usecase' -> case_knowledge, else tool_knowledge)."""
        # Pick the target column from the knowledge types.
        if knowledge_types and 'plan' in knowledge_types:
            target_field = 'process_knowledge'
        elif knowledge_types and 'usecase' in knowledge_types:
            target_field = 'case_knowledge'
        else:
            target_field = 'tool_knowledge'
        now_ts = int(time.time())
        cursor = pg_store._get_cursor()
        try:
            cursor.execute(f"SELECT {target_field} FROM tool_table WHERE id = %s", (tool_id,))
            row = cursor.fetchone()
            if not row:
                return
            # Column may come back as an already-decoded list or as JSON text.
            knowledge_ids = row[target_field] if isinstance(row[target_field], list) else json.loads(row[target_field] or "[]")
            if knowledge_id not in knowledge_ids:
                knowledge_ids.append(knowledge_id)
                cursor.execute(
                    f"UPDATE tool_table SET {target_field} = %s, updated_time = %s WHERE id = %s",
                    (json.dumps(knowledge_ids), now_ts, tool_id)
                )
                pg_store.conn.commit()
        finally:
            cursor.close()

    async def _analyze_tool_relation(self, knowledge: dict):
        """Analyze which tools this knowledge involves and link them; drives dedup_passed -> approved/rejected."""
        kid = knowledge["id"]
        now = int(time.time())
        # Optimistic lock: dedup_passed -> analyzing.
        try:
            pg_store.update(kid, {"status": "analyzing", "updated_at": now})
        except Exception as e:
            print(f"[Tool Analysis] 锁定 {kid} 失败: {e}")
            return
        try:
            tool_analysis = await self._llm_analyze_tools(knowledge)
            has_tools = bool(tool_analysis and tool_analysis.get("has_tools"))
            existing_tags = knowledge.get("tags") or {}
            has_tool_tag = existing_tags.get("tool") is True
            # Case 1: LLM says no tools but the item carries a tool tag -> retry once.
            if not has_tools and has_tool_tag:
                print(f"[Tool Analysis] {kid} LLM 判定无工具但有 tool tag,重新分析")
                tool_analysis = await self._llm_analyze_tools(knowledge)
                has_tools = bool(tool_analysis and tool_analysis.get("has_tools"))
                # Still inconsistent after retry -> ambiguous knowledge, reject.
                if not has_tools:
                    pg_store.update(kid, {"status": "rejected", "updated_at": now})
                    print(f"[Tool Analysis] {kid} 两次判定不一致,知识模糊,rejected")
                    return
            # Case 2: no tools and no tool tag -> approve directly.
            if not has_tools:
                pg_store.update(kid, {"status": "approved", "updated_at": now})
                return
            # Cases 3/4: tools found -> create/get tool resources and link them.
            tool_ids = []
            for tool_info in (tool_analysis.get("tools") or []):
                tool_id = await self._create_or_get_tool_resource(tool_info)
                if tool_id:
                    tool_ids.append(tool_id)
            existing_resource_ids = knowledge.get("resource_ids") or []
            updated_resource_ids = list(set(existing_resource_ids + tool_ids))
            updates: dict = {
                "status": "approved",
                "resource_ids": updated_resource_ids,
                "updated_at": now
            }
            # Tools found but no tool tag -> add the tag.
            if not has_tool_tag:
                updated_tags = dict(existing_tags)
                updated_tags["tool"] = True
                updates["tags"] = updated_tags
                print(f"[Tool Analysis] {kid} 添加 tool tag")
            pg_store.update(kid, updates)
            k_types = knowledge.get("types") or []
            for tool_id in tool_ids:
                await self._update_tool_knowledge_index(tool_id, kid, k_types)
            print(f"[Tool Analysis] {kid} 关联了 {len(tool_ids)} 个工具")
        except Exception as e:
            print(f"[Tool Analysis] {kid} 分析失败: {e},回退到 dedup_passed")
            # Roll back so the item gets retried on the next drain.
            try:
                pg_store.update(kid, {"status": "dedup_passed", "updated_at": int(time.time())})
            except Exception:
                pass
async def _periodic_processor():
    """Every 60s, roll back items stuck in a transient status.

    processing older than 5 minutes -> pending;
    analyzing older than 10 minutes -> dedup_passed.
    Runs forever; each sweep is wrapped so one failure never kills the loop.
    """

    def _to_seconds(ts: int) -> int:
        # Stored timestamps may be milliseconds or seconds; normalize to seconds.
        return ts // 1000 if ts > 1_000_000_000_000 else ts

    # (status to scan, timeout in seconds, status to roll back to)
    ROLLBACK_RULES = (
        ("processing", 300, "pending"),
        ("analyzing", 600, "dedup_passed"),
    )
    while True:
        await asyncio.sleep(60)
        try:
            now = int(time.time())
            for status, timeout, fallback in ROLLBACK_RULES:
                deadline = now - timeout
                for item in pg_store.query(f'status == "{status}"', limit=200):
                    updated_at_sec = _to_seconds(item.get("updated_at", 0) or 0)
                    if updated_at_sec < deadline:
                        print(f"[Periodic] 回滚超时 {status} → {fallback}: {item['id']}")
                        pg_store.update(item["id"], {"status": fallback, "updated_at": int(time.time())})
        except Exception as e:
            print(f"[Periodic] 定时任务错误: {e}")
# --- App ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: open every PostgreSQL store and start the background
    rollback task on startup; cancel the task and close the stores on shutdown."""
    global pg_store, pg_resource_store, pg_tool_store, pg_capability_store, pg_requirement_store, knowledge_processor
    # Initialize PostgreSQL stores (knowledge + resources + tools + capabilities + requirements).
    pg_store = PostgreSQLStore()
    pg_resource_store = PostgreSQLResourceStore()
    pg_tool_store = PostgreSQLToolStore()
    pg_capability_store = PostgreSQLCapabilityStore()
    pg_requirement_store = PostgreSQLRequirementStore()
    # Create the dedup processor and start the periodic timeout-rollback task.
    knowledge_processor = KnowledgeProcessor()
    periodic_task = asyncio.create_task(_periodic_processor())
    yield
    # Shutdown: cancel the background task, await its cancellation, then close every store.
    periodic_task.cancel()
    try:
        await periodic_task
    except asyncio.CancelledError:
        pass
    pg_store.close()
    pg_resource_store.close()
    pg_tool_store.close()
    pg_capability_store.close()
    pg_requirement_store.close()
app = FastAPI(title=BRAND_NAME, lifespan=lifespan)
# Serve the built frontend bundle when a dist/ directory exists.
STATIC_DIR = Path(__file__).parent / "frontend" / "dist"
if STATIC_DIR.exists():
    app.mount("/assets", StaticFiles(directory=str(STATIC_DIR / "assets")), name="assets")
# --- Resource API ---
@app.post("/api/resource", status_code=201)
def submit_resource(resource: ResourceIn):
    """Create or update a resource in the PostgreSQL resources table.

    secure_body is encrypted with the org key before it is persisted.
    Storage failures surface as HTTP 500.
    """
    try:
        record = {
            'id': resource.id,
            'title': resource.title,
            'body': resource.body,
            # Encrypt the sensitive part before it touches the database.
            'secure_body': encrypt_content(resource.id, resource.secure_body),
            'content_type': resource.content_type,
            'metadata': resource.metadata,
            'sort_order': resource.sort_order,
            'submitted_by': resource.submitted_by,
        }
        pg_resource_store.insert_or_update(record)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"status": "ok", "id": resource.id}
@app.get("/api/resource/{resource_id:path}", response_model=ResourceOut)
def get_resource(resource_id: str, x_org_key: Optional[str] = Header(None)):
    """Fetch a resource plus its navigation context (TOC, children, siblings).

    Args:
        resource_id: hierarchical id, e.g. "root/chapter/section".
        x_org_key: optional org key header used to decrypt the secure body.

    Raises:
        HTTPException: 404 when the resource does not exist, 500 on store errors.
    """
    try:
        row = pg_resource_store.get_by_id(resource_id)
        if not row:
            raise HTTPException(status_code=404, detail=f"Resource not found: {resource_id}")
        # Decrypt the sensitive content.
        secure_body = decrypt_content(resource_id, row.get("secure_body", ""), x_org_key)
        # Navigation context: the first path segment is the root of the tree.
        root_id = resource_id.split("/")[0] if "/" in resource_id else resource_id
        # TOC: link back to the root node (only meaningful for non-root nodes).
        toc = None
        if "/" in resource_id:
            toc_row = pg_resource_store.get_by_id(root_id)
            if toc_row:
                toc = ResourceNode(id=toc_row["id"], title=toc_row["title"])
        # Children: keep only direct descendants (exactly one segment deeper).
        children_rows = pg_resource_store.list_resources(prefix=f"{resource_id}/", limit=1000)
        children = [ResourceNode(id=r["id"], title=r["title"]) for r in children_rows
                    if r["id"].count("/") == resource_id.count("/") + 1]
        # Prev/next siblings. Fix: locals renamed so the `next` builtin is not shadowed.
        prev_row, next_row = pg_resource_store.get_siblings(resource_id)
        prev_node = ResourceNode(id=prev_row["id"], title=prev_row["title"]) if prev_row else None
        next_node = ResourceNode(id=next_row["id"], title=next_row["title"]) if next_row else None
        return ResourceOut(
            id=row["id"],
            title=row["title"],
            body=row["body"],
            secure_body=secure_body,
            content_type=row["content_type"],
            metadata=row.get("metadata", {}),
            toc=toc,
            children=children,
            prev=prev_node,
            next=next_node,
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.patch("/api/resource/{resource_id:path}")
def patch_resource(resource_id: str, patch: ResourcePatchIn):
    """Partially update a resource's fields (PostgreSQL)."""
    try:
        # The resource must already exist.
        if not pg_resource_store.get_by_id(resource_id):
            raise HTTPException(status_code=404, detail=f"Resource not found: {resource_id}")
        # Collect only the fields the caller actually supplied; the secure
        # body is encrypted before it is stored.
        candidates = {
            'title': patch.title,
            'body': patch.body,
            'secure_body': (encrypt_content(resource_id, patch.secure_body)
                            if patch.secure_body is not None else None),
            'content_type': patch.content_type,
            'metadata': patch.metadata,
        }
        updates = {field: value for field, value in candidates.items() if value is not None}
        if not updates:
            return {"status": "ok", "message": "No fields to update"}
        pg_resource_store.update(resource_id, updates)
        return {"status": "ok", "id": resource_id}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/resource")
def list_resources(
    content_type: Optional[str] = Query(None),
    limit: int = Query(100, ge=1, le=1000)
):
    """List resources, optionally filtered by content type (PostgreSQL)."""
    try:
        rows = pg_resource_store.list_resources(content_type=content_type, limit=limit)
        return {"results": rows, "count": len(rows)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.delete("/api/resource/{resource_id:path}")
def delete_resource(resource_id: str):
    """Delete a single resource by id (PostgreSQL)."""
    try:
        # 404 when the row never existed.
        if pg_resource_store.get_by_id(resource_id):
            pg_resource_store.delete(resource_id)
            return {"status": "ok", "id": resource_id}
        raise HTTPException(status_code=404, detail=f"Resource not found: {resource_id}")
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# ===== Knowledge API =====
async def _llm_rerank(query: str, candidates: list[dict], top_k: int) -> list[str]:
    """Rerank recalled knowledge candidates with an LLM.

    Args:
        query: the search text.
        candidates: recalled knowledge rows (need `id`, `task`, `content`).
        top_k: number of ids to return.

    Returns:
        Knowledge ids in ranked order; empty list when there is nothing
        to rank or the LLM call fails.
    """
    if not candidates:
        return []
    # Numbered candidate listing shown to the model (content truncated to 200 chars).
    listing = []
    for pos, cand in enumerate(candidates, start=1):
        listing.append(f"[{pos}] ID: {cand['id']}\nTask: {cand['task']}\nContent: {cand['content'][:200]}...")
    prompt = RERANK_PROMPT_TEMPLATE.format(
        top_k=top_k,
        query=query,
        candidates_text="\n".join(listing),
    )
    try:
        response = await _dedup_llm(
            messages=[{"role": "user", "content": prompt}],
        )
        raw = response.get("content", "").strip()
        # The model answers with ids separated by commas/whitespace; keep
        # only tokens that look like knowledge ids.
        ranked = [
            token.strip()
            for token in re.split(r'[,\s]+', raw)
            if token.strip().startswith(("knowledge-", "research-"))
        ]
        return ranked[:top_k]
    except Exception as e:
        print(f"[LLM Rerank] 失败: {e}")
        return []
@app.get("/api/knowledge/search")
async def search_knowledge_api(
    q: str = Query(..., description="查询文本"),
    top_k: int = Query(default=5, ge=1, le=20),
    min_score: int = Query(default=3, ge=1, le=5),
    types: Optional[str] = None,
    owner: Optional[str] = None
):
    """Search knowledge by vector recall and return the top-k hits.

    LLM reranking is deliberately skipped for latency (see comment below),
    so `reranked` in the response is always False.
    """
    try:
        # 1. Embed the query text.
        query_embedding = await get_embedding(q)
        # 2. Build the filter expression.
        filters = []
        if types:
            # Comma-separated types; each one must be present (AND semantics).
            type_list = [t.strip() for t in types.split(',') if t.strip()]
            for t in type_list:
                filters.append(f'array_contains(types, "{t}")')
        if owner:
            owner_list = [o.strip() for o in owner.split(',') if o.strip()]
            if len(owner_list) == 1:
                filters.append(f'owner == "{owner_list[0]}"')
            else:
                # Multiple owners are OR-ed together.
                owner_filters = [f'owner == "{o}"' for o in owner_list]
                filters.append(f'({" or ".join(owner_filters)})')
        # Minimum quality score filter.
        filters.append(f'eval["score"] >= {min_score}')
        # Only search approved / checked knowledge.
        filters.append('(status == "approved" or status == "checked")')
        filter_expr = ' and '.join(filters) if filters else None
        # 3. Vector recall with 3*k candidates.
        recall_limit = top_k * 3
        candidates = pg_store.search(
            query_embedding=query_embedding,
            filters=filter_expr,
            limit=recall_limit
        )
        if not candidates:
            return {"results": [], "count": 0, "reranked": False}
        # Convert store rows into a JSON-serialisable form.
        serialized_candidates = [serialize_milvus_result(c) for c in candidates]
        # For search speed, return the vector-recall top-k directly and skip
        # the (slow) LLM rerank step.
        return {"results": serialized_candidates[:top_k], "count": len(serialized_candidates[:top_k]), "reranked": False}

    except Exception as e:
        print(f"[Knowledge Search] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/knowledge", status_code=201)
async def save_knowledge(knowledge: KnowledgeIn, background_tasks: BackgroundTasks):
    """Persist a new knowledge entry as `pending` and queue dedup processing.

    Generates a timestamped id, fills source/eval defaults, computes the
    task and content embeddings, inserts the row, then schedules the
    background dedup pass.
    """
    try:
        # Id format: knowledge-YYYYMMDD-HHMMSS-xxxx (random suffix avoids collisions).
        timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
        random_suffix = uuid.uuid4().hex[:4]
        knowledge_id = f"knowledge-{timestamp}-{random_suffix}"
        now = int(time.time())
        # Default owner is derived from the submitting agent.
        owner = knowledge.owner or f"agent:{knowledge.source.get('agent_id', 'unknown')}"
        # Source provenance.
        source = {
            "name": knowledge.source.get("name", ""),
            "category": knowledge.source.get("category", ""),
            "urls": knowledge.source.get("urls", []),
            "agent_id": knowledge.source.get("agent_id", "unknown"),
            "submitted_by": knowledge.source.get("submitted_by", ""),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "message_id": knowledge.message_id
        }
        # Evaluation defaults.
        eval_data = {
            "score": knowledge.eval.get("score", 3),
            "helpful": knowledge.eval.get("helpful", 1),
            "harmful": knowledge.eval.get("harmful", 0),
            "confidence": knowledge.eval.get("confidence", 0.5),
            "helpful_history": [],
            "harmful_history": []
        }
        # Dual embeddings: task text + content text.
        task_embedding = await get_embedding(knowledge.task)
        content_embedding = await get_embedding(knowledge.content)
        # Tag keys duplicated into an array for efficient filtering.
        tag_keys = list(knowledge.tags.keys()) if isinstance(knowledge.tags, dict) else []
        insert_data = {
            "id": knowledge_id,
            "task_embedding": task_embedding,
            "content_embedding": content_embedding,
            "message_id": knowledge.message_id,
            "task": knowledge.task,
            "content": knowledge.content,
            "types": knowledge.types,
            "tags": knowledge.tags,
            "tag_keys": tag_keys,
            "scopes": knowledge.scopes,
            "owner": owner,
            "resource_ids": knowledge.resource_ids,
            "source": source,
            "eval": eval_data,
            "created_at": now,
            "updated_at": now,
            "status": "pending",
            "relationships": json.dumps([]),
            "support_capability": knowledge.support_capability,
            "tools": knowledge.tools,
        }
        # BUGFIX: the old filter `k != 'embedding'` matched no key (the real
        # keys are task_embedding/content_embedding), so the full float
        # vectors were dumped into the log. Exclude every *_embedding key.
        print(f"[Save Knowledge] 插入数据: {json.dumps({k: v for k, v in insert_data.items() if not k.endswith('_embedding')}, ensure_ascii=False)}")
        pg_store.insert(insert_data)
        # Kick off dedup processing in the background.
        background_tasks.add_task(knowledge_processor.process_pending)
        return {"status": "pending", "knowledge_id": knowledge_id, "message": "知识已入队,正在处理去重..."}
    except Exception as e:
        print(f"[Save Knowledge] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/knowledge")
def list_knowledge(
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=20, ge=1, le=1000),
    types: Optional[str] = None,
    scopes: Optional[str] = None,
    owner: Optional[str] = None,
    tags: Optional[str] = None,
    status: Optional[str] = None
):
    """List knowledge entries with server-side filtering and pagination."""
    try:
        # Build the filter expression.
        filters = []
        # Multiple types are AND-ed (entry must contain every selected type).
        if types:
            type_list = [t.strip() for t in types.split(',') if t.strip()]
            for t in type_list:
                filters.append(f'array_contains(types, "{t}")')
        if scopes:
            filters.append(f'array_contains(scopes, "{scopes}")')
        if owner:
            owner_list = [o.strip() for o in owner.split(',') if o.strip()]
            if len(owner_list) == 1:
                filters.append(f'owner == "{owner_list[0]}"')
            else:
                # Multiple owners are OR-ed together.
                owner_filters = [f'owner == "{o}"' for o in owner_list]
                filters.append(f'({" or ".join(owner_filters)})')
        # Multiple tags are AND-ed; tag_keys is the denormalised array kept
        # alongside the tags dict for efficient filtering.
        if tags:
            tag_list = [t.strip() for t in tags.split(',') if t.strip()]
            for t in tag_list:
                filters.append(f'array_contains(tag_keys, "{t}")')
        # Status filter; defaults to approved + checked when unspecified.
        status_list = [s.strip() for s in (status or "approved,checked").split(',') if s.strip()]
        status_conditions = ' or '.join([f'status == "{s}"' for s in status_list])
        filters.append(f'({status_conditions})')
        # With no filters at all, match everything.
        filter_expr = ' and '.join(filters) if filters else 'id != ""'
        # Query the store with a generous cap; pagination is done in-process
        # because the store's limit applies to the whole result set.
        max_limit = 10000  # reasonable upper bound
        results = pg_store.query(filter_expr, limit=max_limit)
        # Convert rows to a JSON-serialisable form.
        serialized_results = [serialize_milvus_result(r) for r in results]
        # Newest first.
        serialized_results.sort(key=lambda x: x.get('created_at', 0), reverse=True)
        # Compute pagination bounds.
        total = len(serialized_results)
        total_pages = (total + page_size - 1) // page_size  # ceiling division
        start_idx = (page - 1) * page_size
        end_idx = start_idx + page_size
        page_results = serialized_results[start_idx:end_idx]
        return {
            "results": page_results,
            "pagination": {
                "page": page,
                "page_size": page_size,
                "total": total,
                "total_pages": total_pages
            }
        }
    except Exception as e:
        print(f"[List Knowledge] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/knowledge/meta/tags")
def get_all_tags():
    """Return the sorted set of tag keys used across all knowledge entries."""
    try:
        # Scan every entry and collect distinct tag keys.
        rows = pg_store.query('id != ""', limit=10000)
        tag_names = set()
        for row in rows:
            tags_dict = serialize_milvus_result(row).get("tags", {})
            if isinstance(tags_dict, dict):
                tag_names.update(tags_dict.keys())
        return {"tags": sorted(tag_names)}
    except Exception as e:
        print(f"[Get Tags] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/knowledge/pending")
def get_pending_knowledge(limit: int = Query(default=50, ge=1, le=200)):
    """Return the processing queue: entries in any in-flight status."""
    try:
        # All four in-flight statuses count as "pending work".
        rows = pg_store.query(
            'status == "pending" or status == "processing" or status == "dedup_passed" or status == "analyzing"',
            limit=limit
        )
        payload = [serialize_milvus_result(row) for row in rows]
        return {"results": payload, "count": len(payload)}
    except Exception as e:
        print(f"[Pending] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/knowledge/process")
async def trigger_process(force: bool = Query(default=False)):
    """Manually trigger dedup processing.

    With force=true, first roll back stuck items: processing → pending and
    analyzing → dedup_passed, so the processor can pick them up again.
    """
    try:
        if force:
            rollbacks = (
                ("processing", "pending", "processing → pending"),
                ("analyzing", "dedup_passed", "analyzing → dedup_passed"),
            )
            for from_status, to_status, label in rollbacks:
                stuck = pg_store.query(f'status == "{from_status}"', limit=200)
                for row in stuck:
                    pg_store.update(row["id"], {"status": to_status, "updated_at": int(time.time())})
                print(f"[Manual Process] 回滚 {len(stuck)} 条 {label}")
        # Fire-and-forget: the processor runs in the event loop.
        asyncio.create_task(knowledge_processor.process_pending())
        return {"status": "ok", "message": "处理任务已触发"}
    except Exception as e:
        print(f"[Manual Process] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/knowledge/migrate")
async def migrate_knowledge_schema():
    """Schema-migration endpoint kept for API compatibility; PostgreSQL needs none."""
    return {"status": "ok", "message": "PostgreSQL不需要schema迁移"}
@app.get("/api/knowledge/status/{knowledge_id}")
def get_knowledge_status(knowledge_id: str):
    """Report one knowledge entry's processing status and relationships."""
    try:
        row = pg_store.get_by_id(knowledge_id)
        if not row:
            raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
        data = serialize_milvus_result(row)
        return {
            "id": knowledge_id,
            "status": data.get("status", "approved"),
            "relationships": data.get("relationships", []),
            "created_at": data.get("created_at"),
            "updated_at": data.get("updated_at"),
        }
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Knowledge Status] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/knowledge/{knowledge_id}")
def get_knowledge(knowledge_id: str):
    """Fetch a single knowledge entry in serialised form."""
    try:
        row = pg_store.get_by_id(knowledge_id)
        if row:
            return serialize_milvus_result(row)
        raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Get Knowledge] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
async def _evolve_knowledge_with_llm(old_content: str, feedback: str) -> str:
    """Rewrite knowledge content via LLM, folding in new feedback.

    Falls back to appending the feedback (dated) when the LLM call fails
    or returns something implausibly short.
    """
    prompt = KNOWLEDGE_EVOLVE_PROMPT_TEMPLATE.format(
        old_content=old_content,
        feedback=feedback
    )
    try:
        reply = await _dedup_llm(
            messages=[{"role": "user", "content": prompt}],
        )
        rewritten = reply.get("content", "").strip()
        # Guard against degenerate LLM output.
        if len(rewritten) < 5:
            raise ValueError("LLM output too short")
    except Exception as e:
        print(f"知识进化失败,采用追加模式回退: {e}")
        return f"{old_content}\n\n---\n[Update {datetime.now().strftime('%Y-%m-%d')}]: {feedback}"
    return rewritten
@app.put("/api/knowledge/{knowledge_id}")
async def update_knowledge(knowledge_id: str, update: KnowledgeUpdateIn):
    """Update a knowledge entry's evaluation; optionally evolve its content via LLM."""
    try:
        # Load the existing entry.
        existing = pg_store.get_by_id(knowledge_id)
        if not existing:
            raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
        eval_data = existing.get("eval", {})
        # Score override.
        if update.update_score is not None:
            eval_data["score"] = update.update_score
        # Record a helpful case.
        if update.add_helpful_case:
            eval_data["helpful"] = eval_data.get("helpful", 0) + 1
            if "helpful_history" not in eval_data:
                eval_data["helpful_history"] = []
            eval_data["helpful_history"].append(update.add_helpful_case)
        # Record a harmful case.
        if update.add_harmful_case:
            eval_data["harmful"] = eval_data.get("harmful", 0) + 1
            if "harmful_history" not in eval_data:
                eval_data["harmful_history"] = []
            eval_data["harmful_history"].append(update.add_harmful_case)
        # Knowledge evolution: rewrite the content from feedback via LLM;
        # evolution also counts as one helpful signal.
        content = existing["content"]
        need_reembed = False
        if update.evolve_feedback:
            content = await _evolve_knowledge_with_llm(content, update.evolve_feedback)
            eval_data["helpful"] = eval_data.get("helpful", 0) + 1
            need_reembed = True
        # Assemble the update payload.
        updates = {
            "content": content,
            "eval": eval_data,
        }
        # Re-embed when the content evolved.
        # NOTE(review): only task_embedding is regenerated here (from the
        # unchanged task text) even though `content` changed — confirm that
        # leaving content_embedding stale is intended.
        if need_reembed:
            embedding = await get_embedding(existing['task'])
            updates["task_embedding"] = embedding
        pg_store.update(knowledge_id, updates)
        return {"status": "ok", "knowledge_id": knowledge_id}
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Update Knowledge] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.patch("/api/knowledge/{knowledge_id}")
async def patch_knowledge(knowledge_id: str, patch: KnowledgePatchIn):
    """Directly edit knowledge fields, re-embedding task/content when they change."""
    try:
        # Load the existing entry.
        existing = pg_store.get_by_id(knowledge_id)
        if not existing:
            raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
        updates = {}
        need_reembed = False
        need_content_reembed = False
        if patch.task is not None:
            updates["task"] = patch.task
            need_reembed = True
        if patch.content is not None:
            updates["content"] = patch.content
            need_content_reembed = True
        if patch.types is not None:
            updates["types"] = patch.types
        if patch.tags is not None:
            updates["tags"] = patch.tags
            # Keep the denormalised tag_keys array in sync with the tags dict.
            updates["tag_keys"] = list(patch.tags.keys()) if isinstance(patch.tags, dict) else []
        if patch.scopes is not None:
            updates["scopes"] = patch.scopes
        if patch.owner is not None:
            updates["owner"] = patch.owner
        if patch.support_capability is not None:
            updates["support_capability"] = patch.support_capability
        if patch.tools is not None:
            updates["tools"] = patch.tools
        # Nothing supplied: succeed without touching the store.
        if not updates:
            return {"status": "ok", "knowledge_id": knowledge_id}
        # Task changed: regenerate task_embedding.
        if need_reembed:
            task = updates.get("task", existing["task"])
            embedding = await get_embedding(task)
            updates["task_embedding"] = embedding
        # Content changed: regenerate content_embedding.
        if need_content_reembed:
            content = updates.get("content", existing["content"])
            content_embedding = await get_embedding(content)
            updates["content_embedding"] = content_embedding
        pg_store.update(knowledge_id, updates)
        return {"status": "ok", "knowledge_id": knowledge_id}
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Patch Knowledge] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.delete("/api/knowledge/{knowledge_id}")
def delete_knowledge(knowledge_id: str):
    """Delete a single knowledge entry by id."""
    try:
        # 404 when the entry never existed.
        if not pg_store.get_by_id(knowledge_id):
            raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
        pg_store.delete(knowledge_id)
        print(f"[Delete Knowledge] 已删除知识: {knowledge_id}")
        return {"status": "ok", "knowledge_id": knowledge_id}
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Delete Knowledge] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/knowledge/batch_delete")
def batch_delete_knowledge(knowledge_ids: List[str] = Body(...)):
    """Delete several knowledge rows with a single SQL statement."""
    try:
        if not knowledge_ids:
            raise HTTPException(status_code=400, detail="knowledge_ids cannot be empty")
        # One round-trip delete over the whole id list.
        cursor = pg_store._get_cursor()
        try:
            cursor.execute(
                "DELETE FROM knowledge WHERE id = ANY(%s)",
                (knowledge_ids,)
            )
            pg_store.conn.commit()
            removed = cursor.rowcount
        finally:
            cursor.close()
        print(f"[Batch Delete] 已删除 {removed} 条知识")
        return {"status": "ok", "deleted_count": removed}
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Batch Delete] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/knowledge/batch_verify")
async def batch_verify_knowledge(batch: KnowledgeBatchVerifyIn):
    """Bulk-verify knowledge entries (approved → checked)."""
    if not batch.knowledge_ids:
        return {"status": "ok", "updated": 0}
    try:
        verified_at = datetime.now(timezone.utc).isoformat()
        updated = 0
        for kid in batch.knowledge_ids:
            row = pg_store.get_by_id(kid)
            if not row:
                # Missing ids are skipped silently.
                continue
            eval_data = row.get("eval") or {}
            # Stamp the verification record onto the eval payload.
            eval_data["verification"] = {
                "status": "checked",
                "verified_by": batch.verified_by,
                "verified_at": verified_at,
                "note": None,
                "issue_type": None,
                "issue_action": None,
            }
            pg_store.update(kid, {"eval": eval_data, "status": "checked", "updated_at": int(time.time())})
            updated += 1
        return {"status": "ok", "updated": updated}
    except Exception as e:
        print(f"[Batch Verify] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/knowledge/{knowledge_id}/verify")
async def verify_knowledge(knowledge_id: str, verify: KnowledgeVerifyIn):
    """Verify one entry: 'approve' toggles approved↔checked, 'reject' marks rejected."""
    try:
        existing = pg_store.get_by_id(knowledge_id)
        if not existing:
            raise HTTPException(status_code=404, detail=f"Knowledge not found: {knowledge_id}")
        if verify.action not in ("approve", "reject"):
            raise HTTPException(status_code=400, detail=f"Unknown action: {verify.action}")
        current_status = existing.get("status", "approved")
        if verify.action == "approve":
            # checked → approved (un-verify); anything else → checked.
            new_status = "approved" if current_status == "checked" else "checked"
            message = "已取消验证" if new_status == "approved" else "验证通过"
        else:
            new_status = "rejected"
            message = "已拒绝"
        pg_store.update(knowledge_id, {
            "status": new_status,
            "updated_at": int(time.time())
        })
        return {"status": "ok", "new_status": new_status, "message": message}
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Verify Knowledge] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/knowledge/batch_update")
async def batch_update_knowledge(batch: KnowledgeBatchUpdateIn):
    """Bulk feedback on knowledge effectiveness; effective + feedback triggers evolution."""
    if not batch.feedback_list:
        return {"status": "ok", "updated": 0}
    try:
        # Split into simple counter updates vs entries that need LLM evolution.
        evolution_tasks = []  # [(knowledge_id, old_content, feedback, eval_data, task)]
        simple_updates = []   # [(knowledge_id, is_effective, eval_data)]
        for item in batch.feedback_list:
            knowledge_id = item.get("knowledge_id")
            is_effective = item.get("is_effective")
            feedback = item.get("feedback", "")
            if not knowledge_id:
                continue
            existing = pg_store.get_by_id(knowledge_id)
            if not existing:
                continue
            eval_data = existing.get("eval", {})
            if is_effective and feedback:
                # Effective with textual feedback: evolve the content.
                evolution_tasks.append((knowledge_id, existing["content"], feedback, eval_data, existing["task"]))
            else:
                simple_updates.append((knowledge_id, is_effective, eval_data))
        # Apply the simple counter updates.
        for knowledge_id, is_effective, eval_data in simple_updates:
            if is_effective:
                eval_data["helpful"] = eval_data.get("helpful", 0) + 1
            else:
                eval_data["harmful"] = eval_data.get("harmful", 0) + 1
            pg_store.update(knowledge_id, {"eval": eval_data})
        # Run all evolutions concurrently; gather preserves input order, so
        # zip pairs each result with its originating task tuple.
        if evolution_tasks:
            print(f"🧬 并发处理 {len(evolution_tasks)} 条知识进化...")
            evolved_results = await asyncio.gather(
                *[_evolve_knowledge_with_llm(old, fb) for _, old, fb, _, _ in evolution_tasks]
            )
            for (knowledge_id, _, _, eval_data, task), evolved_content in zip(evolution_tasks, evolved_results):
                eval_data["helpful"] = eval_data.get("helpful", 0) + 1
                # Regenerate the embedding (task-based only).
                embedding = await get_embedding(task)
                pg_store.update(knowledge_id, {
                    "content": evolved_content,
                    "eval": eval_data,
                    "task_embedding": embedding
                })
        return {"status": "ok", "updated": len(simple_updates) + len(evolution_tasks)}
    except Exception as e:
        print(f"[Batch Update] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
def _parse_slim_output(content: str) -> tuple[str, list[dict]]:
    """Parse the slim-LLM output into (report_line, entries).

    The model emits blocks separated by `===`; a block is either a
    `REPORT:` line or a knowledge entry with ID/TYPES/HELPFUL/HARMFUL/
    SCORE/TASK/CONTENT fields, where TASK and CONTENT may span lines.
    """
    report_line = ""
    new_entries = []
    blocks = [b.strip() for b in content.split("===") if b.strip()]
    for block in blocks:
        if block.startswith("REPORT:"):
            report_line = block
            continue
        lines = block.split("\n")
        kid, types, helpful, harmful, score, task, content_lines = None, [], 0, 0, 3, "", []
        current_field = None
        for line in lines:
            if line.startswith("ID:"):
                kid = line[3:].strip()
                current_field = None
            elif line.startswith("TYPES:"):
                types_str = line[6:].strip()
                types = [t.strip() for t in types_str.split(",") if t.strip()]
                current_field = None
            elif line.startswith("HELPFUL:"):
                try:
                    helpful = int(line[8:].strip())
                except Exception:
                    helpful = 0
                current_field = None
            elif line.startswith("HARMFUL:"):
                try:
                    harmful = int(line[8:].strip())
                except Exception:
                    harmful = 0
                current_field = None
            elif line.startswith("SCORE:"):
                try:
                    score = int(line[6:].strip())
                except Exception:
                    score = 3
                current_field = None
            elif line.startswith("TASK:"):
                task = line[5:].strip()
                current_field = "task"
            elif line.startswith("CONTENT:"):
                content_lines.append(line[8:].strip())
                current_field = "content"
            elif current_field == "task":
                # Continuation line of a multi-line TASK.
                task += "\n" + line
            elif current_field == "content":
                content_lines.append(line)
        # Only blocks with an id and some content become entries.
        if kid and content_lines:
            new_entries.append({
                "id": kid,
                "types": types if types else ["strategy"],
                "helpful": helpful,
                "harmful": harmful,
                "score": score,
                "task": task.strip(),
                "content": "\n".join(content_lines).strip()
            })
    return report_line, new_entries
@app.post("/api/knowledge/slim")
async def slim_knowledge(model: str = "google/gemini-2.5-flash-lite"):
    """Slim the knowledge base: merge semantically similar entries via LLM.

    Reads every entry, asks the model to consolidate them, then TRUNCATEs
    the table and rebuilds it from the consolidated entries.
    """
    try:
        # Load and serialise every knowledge entry.
        all_knowledge = pg_store.query('id != ""', limit=10000)
        all_knowledge = [serialize_milvus_result(item) for item in all_knowledge]
        if len(all_knowledge) < 2:
            return {"status": "ok", "message": f"知识库仅有 {len(all_knowledge)} 条,无需瘦身"}
        # Build the entry listing sent to the model (content truncated).
        entries_text = ""
        for item in all_knowledge:
            eval_data = item.get("eval", {})
            types = item.get("types", [])
            entries_text += f"[ID: {item['id']}] [Types: {','.join(types)}] "
            entries_text += f"[Helpful: {eval_data.get('helpful', 0)}, Harmful: {eval_data.get('harmful', 0)}] [Score: {eval_data.get('score', 3)}]\n"
            entries_text += f"Task: {item['task']}\n"
            entries_text += f"Content: {item['content'][:200]}...\n\n"
        prompt = KNOWLEDGE_SLIM_PROMPT_TEMPLATE.format(entries_text=entries_text)
        print(f"\n[知识瘦身] 正在调用 {model} 分析 {len(all_knowledge)} 条知识...")
        slim_llm = create_openrouter_llm_call(model=model)
        response = await slim_llm(
            messages=[{"role": "user", "content": prompt}],
        )
        content = response.get("content", "").strip()
        if not content:
            raise HTTPException(status_code=500, detail="LLM 返回为空")
        # Parse the model output into a report line + consolidated entries.
        report_line, new_entries = _parse_slim_output(content)
        if not new_entries:
            raise HTTPException(status_code=500, detail="解析大模型输出失败")
        # Embed the consolidated entries (task text only) in one batch.
        print(f"[知识瘦身] 正在为 {len(new_entries)} 条知识生成向量...")
        texts = [e['task'] for e in new_entries]
        embeddings = await get_embeddings_batch(texts)
        # Wipe and rebuild (PostgreSQL TRUNCATE).
        cursor = pg_store._get_cursor()
        try:
            cursor.execute("TRUNCATE TABLE knowledge")
            pg_store.conn.commit()
        finally:
            cursor.close()
        # BUGFIX: `now` was previously undefined here, so the rebuild always
        # raised NameError (surfacing as a 500) after truncating the table.
        now = int(time.time())
        knowledge_list = []
        for e, embedding in zip(new_entries, embeddings):
            eval_data = {
                "score": e["score"],
                "helpful": e["helpful"],
                "harmful": e["harmful"],
                "confidence": 0.9,
                "helpful_history": [],
                "harmful_history": []
            }
            source = {
                "name": "slim",
                "category": "exp",
                "urls": [],
                "agent_id": "slim",
                "submitted_by": "system",
                "timestamp": datetime.now(timezone.utc).isoformat()
            }
            knowledge_list.append({
                "id": e["id"],
                "task_embedding": embedding,
                "message_id": "",
                "task": e["task"],
                "content": e["content"],
                "types": e["types"],
                "tags": {},
                "tag_keys": [],
                "scopes": ["org:cybertogether"],
                "owner": "agent:slim",
                "resource_ids": [],
                "source": source,
                "eval": eval_data,
                "created_at": now,
                "updated_at": now,
                "status": "approved",
                "relationships": json.dumps([])
            })
        pg_store.insert_batch(knowledge_list)
        result_msg = f"瘦身完成:{len(all_knowledge)} → {len(new_entries)} 条知识"
        if report_line:
            result_msg += f"\n{report_line}"
        print(f"[知识瘦身] {result_msg}")
        return {"status": "ok", "before": len(all_knowledge), "after": len(new_entries), "report": report_line}
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Slim Knowledge] 错误: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/extract")
async def extract_knowledge_from_messages(extract_req: MessageExtractIn, background_tasks: BackgroundTasks):
    """Extract knowledge entries from a message history via LLM analysis."""
    if not extract_req.submitted_by:
        raise HTTPException(status_code=400, detail="submitted_by is required")
    messages = extract_req.messages
    if not messages or len(messages) == 0:
        return {"status": "ok", "extracted_count": 0, "knowledge_ids": []}
    # Flatten the message history into "[role]: content" text.
    messages_text = ""
    for msg in messages:
        role = msg.get("role", "unknown")
        content = msg.get("content", "")
        messages_text += f"[{role}]: {content}\n\n"
    # Ask the LLM to extract knowledge as a JSON list.
    prompt = MESSAGE_EXTRACT_PROMPT_TEMPLATE.format(messages_text=messages_text)
    try:
        print(f"\n[Extract] 正在从 {len(messages)} 条消息中提取知识...")
        response = await _dedup_llm(
            messages=[{"role": "user", "content": prompt}],
        )
        content = response.get("content", "").strip()
        # Strip any markdown code fences around the JSON payload.
        if content.startswith("```json"):
            content = content[7:]
        if content.startswith("```"):
            content = content[3:]
        if content.endswith("```"):
            content = content[:-3]
        content = content.strip()
        extracted_knowledge = json.loads(content)
        if not isinstance(extracted_knowledge, list):
            raise ValueError("LLM output is not a list")
        if not extracted_knowledge:
            return {"status": "ok", "extracted_count": 0, "knowledge_ids": []}
        # Batch-embed the extracted tasks (task text only).
        texts = [item.get('task', '') for item in extracted_knowledge]
        embeddings = await get_embeddings_batch(texts)
        # Persist the extracted knowledge.
        knowledge_ids = []
        now = int(time.time())
        knowledge_list = []
        for item, embedding in zip(extracted_knowledge, embeddings):
            task = item.get("task", "")
            knowledge_content = item.get("content", "")
            types = item.get("types", ["strategy"])
            score = item.get("score", 3)
            # Skip incomplete extractions.
            if not task or not knowledge_content:
                continue
            # Id format: knowledge-YYYYMMDD-HHMMSS-xxxx.
            timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
            random_suffix = uuid.uuid4().hex[:4]
            knowledge_id = f"knowledge-{timestamp}-{random_suffix}"
            # Provenance record for the extraction.
            source = {
                "name": "message_extraction",
                "category": "exp",
                "urls": [],
                "agent_id": extract_req.agent_id,
                "submitted_by": extract_req.submitted_by,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "session_key": extract_req.session_key
            }
            eval_data = {
                "score": score,
                "helpful": 1,
                "harmful": 0,
                "confidence": 0.7,
                "helpful_history": [],
                "harmful_history": []
            }
            knowledge_list.append({
                "id": knowledge_id,
                "task_embedding": embedding,
                "message_id": "",
                "task": task,
                "content": knowledge_content,
                "types": types,
                "tags": {},
                "tag_keys": [],
                "scopes": ["org:cybertogether"],
                "owner": extract_req.submitted_by,
                "resource_ids": [],
                "source": source,
                "eval": eval_data,
                "created_at": now,
                "updated_at": now,
                "status": "pending",
                "relationships": json.dumps([]),
            })
            knowledge_ids.append(knowledge_id)
        # Bulk insert, then schedule the dedup pass.
        if knowledge_list:
            pg_store.insert_batch(knowledge_list)
            background_tasks.add_task(knowledge_processor.process_pending)
        print(f"[Extract] 成功提取并保存 {len(knowledge_ids)} 条知识")
        return {
            "status": "ok",
            "extracted_count": len(knowledge_ids),
            "knowledge_ids": knowledge_ids
        }
    except json.JSONDecodeError as e:
        # `content` is always bound here: JSONDecodeError can only come from json.loads above.
        print(f"[Extract] JSON 解析失败: {e}")
        print(f"[Extract] LLM 输出: {content[:500]}")
        return {"status": "error", "error": "Failed to parse LLM output", "extracted_count": 0}
    except Exception as e:
        print(f"[Extract] 提取失败: {e}")
        return {"status": "error", "error": str(e), "extracted_count": 0}
- # ===== Tool API =====
@app.post("/api/tool", status_code=201)
async def create_tool(tool: ToolIn):
    """Create or update a tool record, embedding its name + introduction."""
    try:
        now = int(time.time())
        # The search embedding covers the tool's name and introduction text.
        embedding = await get_embedding(f"{tool.name} {tool.introduction}")
        record = {
            'id': tool.id,
            'name': tool.name,
            'version': tool.version,
            'introduction': tool.introduction,
            'tutorial': tool.tutorial,
            'input': tool.input,
            'output': tool.output,
            'updated_time': now,
            'status': tool.status,
            'capabilities': tool.capabilities,
            'tool_knowledge': tool.tool_knowledge,
            'case_knowledge': tool.case_knowledge,
            'process_knowledge': tool.process_knowledge,
            'embedding': embedding,
        }
        pg_tool_store.insert_or_update(record)
        return {"status": "ok", "id": tool.id}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
- @app.get("/api/tool")
- def list_tools(
- status: Optional[str] = Query(None),
- limit: int = Query(100, ge=1, le=1000),
- offset: int = Query(0, ge=0),
- ):
- """列出工具"""
- try:
- results = pg_tool_store.list_all(limit=limit, offset=offset, status=status)
- total = pg_tool_store.count(status=status)
- return {"results": results, "total": total}
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
- @app.get("/api/tool/search")
- async def search_tools(
- q: str = Query(..., description="查询文本"),
- top_k: int = Query(5, ge=1, le=100),
- status: Optional[str] = None,
- ):
- """向量检索工具"""
- try:
- query_embedding = await get_embedding(q)
- results = pg_tool_store.search(query_embedding, limit=top_k, status=status)
- return {"results": results, "count": len(results)}
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
- @app.get("/api/tool/{tool_id:path}")
- def get_tool(tool_id: str):
- """获取单个工具详情"""
- try:
- result = pg_tool_store.get_by_id(tool_id)
- if not result:
- raise HTTPException(status_code=404, detail=f"Tool not found: {tool_id}")
- return result
- except HTTPException:
- raise
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
- @app.patch("/api/tool/{tool_id:path}")
- async def patch_tool(tool_id: str, patch: ToolPatchIn):
- """更新工具字段"""
- try:
- if not pg_tool_store.get_by_id(tool_id):
- raise HTTPException(status_code=404, detail=f"Tool not found: {tool_id}")
- updates = {}
- need_reembed = False
- for field in ('name', 'version', 'introduction', 'tutorial', 'input', 'output',
- 'status', 'capabilities', 'tool_knowledge', 'case_knowledge', 'process_knowledge'):
- value = getattr(patch, field)
- if value is not None:
- updates[field] = value
- if field in ('name', 'introduction'):
- need_reembed = True
- if not updates:
- return {"status": "ok", "id": tool_id}
- updates['updated_time'] = int(time.time())
- if need_reembed:
- existing = pg_tool_store.get_by_id(tool_id)
- name = updates.get('name', existing['name'])
- intro = updates.get('introduction', existing['introduction'])
- updates['embedding'] = await get_embedding(f"{name} {intro}")
- pg_tool_store.update(tool_id, updates)
- return {"status": "ok", "id": tool_id}
- except HTTPException:
- raise
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
- @app.delete("/api/tool/{tool_id:path}")
- def delete_tool(tool_id: str):
- """删除工具"""
- try:
- if not pg_tool_store.get_by_id(tool_id):
- raise HTTPException(status_code=404, detail=f"Tool not found: {tool_id}")
- pg_tool_store.delete(tool_id)
- return {"status": "ok", "id": tool_id}
- except HTTPException:
- raise
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
# ===== Capability API =====
@app.post("/api/capability", status_code=201)
async def create_capability(cap: CapabilityIn):
    """Create an atomic capability, or overwrite an existing one (upsert by id)."""
    try:
        # Name + description form the embedding text for vector retrieval.
        vector = await get_embedding(f"{cap.name} {cap.description}")
        record = {
            field: getattr(cap, field)
            for field in ('id', 'name', 'criterion', 'description',
                          'requirements', 'implements', 'tools', 'source_knowledge')
        }
        record['embedding'] = vector
        pg_capability_store.insert_or_update(record)
        return {"status": "ok", "id": cap.id}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
- @app.get("/api/capability")
- def list_capabilities(
- limit: int = Query(100, ge=1, le=1000),
- offset: int = Query(0, ge=0),
- ):
- """列出原子能力"""
- try:
- results = pg_capability_store.list_all(limit=limit, offset=offset)
- total = pg_capability_store.count()
- return {"results": results, "total": total}
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
- @app.get("/api/capability/search")
- async def search_capabilities(
- q: str = Query(..., description="查询文本"),
- top_k: int = Query(5, ge=1, le=100),
- ):
- """向量检索原子能力"""
- try:
- query_embedding = await get_embedding(q)
- results = pg_capability_store.search(query_embedding, limit=top_k)
- return {"results": results, "count": len(results)}
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
- @app.get("/api/capability/{cap_id}")
- def get_capability(cap_id: str):
- """获取单个原子能力"""
- try:
- result = pg_capability_store.get_by_id(cap_id)
- if not result:
- raise HTTPException(status_code=404, detail=f"Capability not found: {cap_id}")
- return result
- except HTTPException:
- raise
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
- @app.patch("/api/capability/{cap_id}")
- async def patch_capability(cap_id: str, patch: CapabilityPatchIn):
- """更新原子能力字段"""
- try:
- existing = pg_capability_store.get_by_id(cap_id)
- if not existing:
- raise HTTPException(status_code=404, detail=f"Capability not found: {cap_id}")
- updates = {}
- need_reembed = False
- for field in ('name', 'criterion', 'description', 'requirements',
- 'implements', 'tools', 'source_knowledge'):
- value = getattr(patch, field)
- if value is not None:
- updates[field] = value
- if field in ('name', 'description'):
- need_reembed = True
- if not updates:
- return {"status": "ok", "id": cap_id}
- if need_reembed:
- name = updates.get('name', existing['name'])
- desc = updates.get('description', existing['description'])
- updates['embedding'] = await get_embedding(f"{name} {desc}")
- pg_capability_store.update(cap_id, updates)
- return {"status": "ok", "id": cap_id}
- except HTTPException:
- raise
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
- @app.delete("/api/capability/{cap_id}")
- def delete_capability(cap_id: str):
- """删除原子能力"""
- try:
- if not pg_capability_store.get_by_id(cap_id):
- raise HTTPException(status_code=404, detail=f"Capability not found: {cap_id}")
- pg_capability_store.delete(cap_id)
- return {"status": "ok", "id": cap_id}
- except HTTPException:
- raise
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
# ===== Requirement API =====
@app.post("/api/requirement", status_code=201)
async def create_requirement(req: RequirementIn):
    """Create a requirement, or overwrite an existing one (upsert by id)."""
    try:
        # Requirements embed on their description alone.
        vector = await get_embedding(req.description)
        record = {
            field: getattr(req, field)
            for field in ('id', 'description', 'atomics', 'source_nodes',
                          'status', 'match_result')
        }
        record['embedding'] = vector
        pg_requirement_store.insert_or_update(record)
        return {"status": "ok", "id": req.id}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
- @app.get("/api/requirement")
- def list_requirements(
- status: Optional[str] = Query(None),
- limit: int = Query(100, ge=1, le=1000),
- offset: int = Query(0, ge=0),
- ):
- """列出需求"""
- try:
- results = pg_requirement_store.list_all(limit=limit, offset=offset, status=status)
- total = pg_requirement_store.count(status=status)
- return {"results": results, "total": total}
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
- @app.get("/api/requirement/search")
- async def search_requirements(
- q: str = Query(..., description="查询文本"),
- top_k: int = Query(5, ge=1, le=100),
- ):
- """向量检索需求"""
- try:
- query_embedding = await get_embedding(q)
- results = pg_requirement_store.search(query_embedding, limit=top_k)
- return {"results": results, "count": len(results)}
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
- @app.get("/api/requirement/{req_id}")
- def get_requirement(req_id: str):
- """获取单个需求"""
- try:
- result = pg_requirement_store.get_by_id(req_id)
- if not result:
- raise HTTPException(status_code=404, detail=f"Requirement not found: {req_id}")
- return result
- except HTTPException:
- raise
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
- @app.patch("/api/requirement/{req_id}")
- async def patch_requirement(req_id: str, patch: RequirementPatchIn):
- """更新需求字段"""
- try:
- existing = pg_requirement_store.get_by_id(req_id)
- if not existing:
- raise HTTPException(status_code=404, detail=f"Requirement not found: {req_id}")
- updates = {}
- need_reembed = False
- for field in ('description', 'atomics', 'source_nodes', 'status', 'match_result'):
- value = getattr(patch, field)
- if value is not None:
- updates[field] = value
- if field == 'description':
- need_reembed = True
- if not updates:
- return {"status": "ok", "id": req_id}
- if need_reembed:
- updates['embedding'] = await get_embedding(updates['description'])
- pg_requirement_store.update(req_id, updates)
- return {"status": "ok", "id": req_id}
- except HTTPException:
- raise
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
- @app.delete("/api/requirement/{req_id}")
- def delete_requirement(req_id: str):
- """删除需求"""
- try:
- if not pg_requirement_store.get_by_id(req_id):
- raise HTTPException(status_code=404, detail=f"Requirement not found: {req_id}")
- pg_requirement_store.delete(req_id)
- return {"status": "ok", "id": req_id}
- except HTTPException:
- raise
- except Exception as e:
- raise HTTPException(status_code=500, detail=str(e))
- @app.get("/")
- def frontend():
- """KnowHub 管理前端"""
- index_file = STATIC_DIR / "index.html"
- if not index_file.exists():
- return HTMLResponse("<h1>KnowHub Frontend Not Found</h1><p>Please ensure knowhub/frontend/dist/index.html exists. Run 'yarn build' in frontend directory.</p>", status_code=404)
- return FileResponse(str(index_file))
- @app.get("/category_tree.json")
- def serve_category_tree():
- """类别树JSON数据"""
- tree_file = STATIC_DIR / "category_tree.json"
- if not tree_file.exists():
- return {"error": "Not Found"}
- return FileResponse(str(tree_file))
- if __name__ == "__main__":
- import uvicorn
- uvicorn.run(app, host="0.0.0.0", port=9999)
|