| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- """
- Knowledge Manager 工具 - 通过 HTTP API 与 KnowHub 交互
- 提供两个工具:
- - ask_knowledge: 查询知识库(同步阻塞,等待 Librarian Agent 整合回答)
- - upload_knowledge: 上传调研结果(异步,校验后立即返回)
- 通过 KnowHub HTTP API 调用,不依赖 IM。
- """
- import os
- import json
- import logging
- from typing import Optional, Dict, Any
- import httpx
- from agent.tools import tool, ToolResult, ToolContext
- logger = logging.getLogger(__name__)
- KNOWHUB_API = os.getenv("KNOWHUB_API", "http://localhost:8000").rstrip("/")
- @tool(
- hidden_params=["context"],
- inject_params={
- "trace_id": {"mode": "default", "key": "trace_id"},
- }
- )
- async def ask_knowledge(
- query: str,
- trace_id: str = "",
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 向知识库查询信息(同步阻塞,等待整合回答)
- KnowHub 内部使用 Librarian Agent 整合检索结果,返回带引用的回答。
- 同一 trace_id 的多次查询复用同一个 Librarian Agent,积累任务理解。
- Args:
- query: 查询内容(如:"ControlNet 相关的工具知识")
- trace_id: 调用方的 trace_id,用于 Librarian Agent 续跑
- context: 工具上下文
- Returns:
- 整合回答 + source_ids + 各 source 摘要
- """
- try:
- async with httpx.AsyncClient(timeout=60.0) as client:
- response = await client.post(
- f"{KNOWHUB_API}/api/knowledge/ask",
- json={
- "query": query,
- "trace_id": trace_id,
- }
- )
- response.raise_for_status()
- result = response.json()
- source_ids = result.get("source_ids", [])
- sources = result.get("sources", [])
- resp_text = result.get("response", "")
- return ToolResult(
- title=f"📚 知识库查询结果({len(source_ids)} 条来源)",
- output=resp_text,
- metadata={
- "source_ids": source_ids,
- "sources": sources,
- }
- )
- except httpx.HTTPStatusError as e:
- # ask 端点不可用时降级到直接搜索
- if e.response.status_code == 404:
- logger.warning("ask 端点不可用,降级到 knowledge_search")
- from agent.tools.builtin.knowledge import knowledge_search
- fallback = await knowledge_search(query=query, top_k=5, min_score=3)
- return ToolResult(
- title="📚 知识库查询结果(直连)",
- output=fallback.output,
- metadata={"source": "fallback", "raw": fallback.metadata}
- )
- raise
- except Exception as e:
- logger.error(f"查询知识库失败: {e}")
- # 网络错误也降级
- logger.warning("ask 请求失败,降级到 knowledge_search")
- try:
- from agent.tools.builtin.knowledge import knowledge_search
- fallback = await knowledge_search(query=query, top_k=5, min_score=3)
- return ToolResult(
- title="📚 知识库查询结果(直连)",
- output=fallback.output,
- metadata={"source": "fallback", "raw": fallback.metadata}
- )
- except Exception as e2:
- return ToolResult(
- title="❌ 查询失败",
- output=f"错误: {str(e)}(降级也失败: {str(e2)})",
- error=str(e)
- )
- @tool(
- hidden_params=["context"],
- inject_params={
- "trace_id": {"mode": "default", "key": "trace_id"},
- }
- )
- async def upload_knowledge(
- data: Dict[str, Any],
- source_type: str = "research",
- finalize: bool = False,
- trace_id: str = "",
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 上传调研结果或执行经验到知识库(异步,校验后立即返回)
- KnowHub 校验格式后立即返回,后台队列处理去重和入库。
- Args:
- data: 结构化数据,包含:
- - tools: 工具列表
- - resources: 资源列表
- - knowledge: 知识列表
- source_type: 数据来源分类。调研结果填 "research",执行经验填 "execution"。
- finalize: 是否最终提交(True=入库,False=仅缓冲)
- trace_id: 调用方的 trace_id
- context: 工具上下文
- Returns:
- 上传确认(立即返回,不等待处理完成)
- """
- try:
- # 标记来源类型
- if "knowledge" in data and isinstance(data["knowledge"], list):
- for k in data["knowledge"]:
- if "source" not in k:
- k["source"] = {}
- if "category" not in k["source"]:
- k["source"]["category"] = source_type
- async with httpx.AsyncClient(timeout=30.0) as client:
- response = await client.post(
- f"{KNOWHUB_API}/api/knowledge/upload",
- json={
- "data": data,
- "trace_id": trace_id,
- "finalize": finalize,
- }
- )
- response.raise_for_status()
- summary = []
- if data.get("tools"):
- summary.append(f"工具: {len(data['tools'])} 个")
- if data.get("resources"):
- summary.append(f"资源: {len(data['resources'])} 个")
- if data.get("knowledge"):
- summary.append(f"知识: {len(data['knowledge'])} 个")
- action = "最终提交" if finalize else f"增量上传({source_type})"
- return ToolResult(
- title=f"✅ {action}成功",
- output=f"已提交到 KnowHub\n\n" + "\n".join(f"- {s}" for s in summary),
- long_term_memory=f"{action}: {', '.join(summary)}",
- metadata={"finalize": finalize}
- )
- except Exception as e:
- logger.error(f"上传知识失败: {e}")
- return ToolResult(
- title="❌ 上传失败",
- output=f"错误: {str(e)}",
- error=str(e)
- )
|