librarian.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. """
  2. Knowledge Manager 工具 - 通过 HTTP API 与 KnowHub 交互
  3. 提供两个工具:
  4. - ask_knowledge: 查询知识库(同步阻塞,等待 Librarian Agent 整合回答)
  5. - upload_knowledge: 上传调研结果(异步,校验后立即返回)
  6. 通过 KnowHub HTTP API 调用,不依赖 IM。
  7. """
  8. import os
  9. import json
  10. import logging
  11. from typing import Optional, Dict, Any
  12. import httpx
  13. from agent.tools import tool, ToolResult, ToolContext
  14. logger = logging.getLogger(__name__)
  15. KNOWHUB_API = os.getenv("KNOWHUB_API", "http://localhost:8000").rstrip("/")
  16. @tool(
  17. hidden_params=["context"],
  18. inject_params={
  19. "trace_id": {"mode": "default", "key": "trace_id"},
  20. }
  21. )
  22. async def ask_knowledge(
  23. query: str,
  24. trace_id: str = "",
  25. context: Optional[ToolContext] = None,
  26. ) -> ToolResult:
  27. """
  28. 向知识库查询信息(同步阻塞,等待整合回答)
  29. KnowHub 内部使用 Librarian Agent 整合检索结果,返回带引用的回答。
  30. 同一 trace_id 的多次查询复用同一个 Librarian Agent,积累任务理解。
  31. Args:
  32. query: 查询内容(如:"ControlNet 相关的工具知识")
  33. trace_id: 调用方的 trace_id,用于 Librarian Agent 续跑
  34. context: 工具上下文
  35. Returns:
  36. 整合回答 + source_ids + 各 source 摘要
  37. """
  38. try:
  39. async with httpx.AsyncClient(timeout=60.0) as client:
  40. response = await client.post(
  41. f"{KNOWHUB_API}/api/knowledge/ask",
  42. json={
  43. "query": query,
  44. "trace_id": trace_id,
  45. }
  46. )
  47. response.raise_for_status()
  48. result = response.json()
  49. source_ids = result.get("source_ids", [])
  50. sources = result.get("sources", [])
  51. resp_text = result.get("response", "")
  52. return ToolResult(
  53. title=f"📚 知识库查询结果({len(source_ids)} 条来源)",
  54. output=resp_text,
  55. metadata={
  56. "source_ids": source_ids,
  57. "sources": sources,
  58. }
  59. )
  60. except httpx.HTTPStatusError as e:
  61. # ask 端点不可用时降级到直接搜索
  62. if e.response.status_code == 404:
  63. logger.warning("ask 端点不可用,降级到 knowledge_search")
  64. from agent.tools.builtin.knowledge import knowledge_search
  65. fallback = await knowledge_search(query=query, top_k=5, min_score=3)
  66. return ToolResult(
  67. title="📚 知识库查询结果(直连)",
  68. output=fallback.output,
  69. metadata={"source": "fallback", "raw": fallback.metadata}
  70. )
  71. raise
  72. except Exception as e:
  73. logger.error(f"查询知识库失败: {e}")
  74. # 网络错误也降级
  75. logger.warning("ask 请求失败,降级到 knowledge_search")
  76. try:
  77. from agent.tools.builtin.knowledge import knowledge_search
  78. fallback = await knowledge_search(query=query, top_k=5, min_score=3)
  79. return ToolResult(
  80. title="📚 知识库查询结果(直连)",
  81. output=fallback.output,
  82. metadata={"source": "fallback", "raw": fallback.metadata}
  83. )
  84. except Exception as e2:
  85. return ToolResult(
  86. title="❌ 查询失败",
  87. output=f"错误: {str(e)}(降级也失败: {str(e2)})",
  88. error=str(e)
  89. )
  90. @tool(
  91. hidden_params=["context"],
  92. inject_params={
  93. "trace_id": {"mode": "default", "key": "trace_id"},
  94. }
  95. )
  96. async def upload_knowledge(
  97. data: Dict[str, Any],
  98. source_type: str = "research",
  99. finalize: bool = False,
  100. trace_id: str = "",
  101. context: Optional[ToolContext] = None,
  102. ) -> ToolResult:
  103. """
  104. 上传调研结果或执行经验到知识库(异步,校验后立即返回)
  105. KnowHub 校验格式后立即返回,后台队列处理去重和入库。
  106. Args:
  107. data: 结构化数据,包含:
  108. - tools: 工具列表
  109. - resources: 资源列表
  110. - knowledge: 知识列表
  111. source_type: 数据来源分类。调研结果填 "research",执行经验填 "execution"。
  112. finalize: 是否最终提交(True=入库,False=仅缓冲)
  113. trace_id: 调用方的 trace_id
  114. context: 工具上下文
  115. Returns:
  116. 上传确认(立即返回,不等待处理完成)
  117. """
  118. try:
  119. # 标记来源类型
  120. if "knowledge" in data and isinstance(data["knowledge"], list):
  121. for k in data["knowledge"]:
  122. if "source" not in k:
  123. k["source"] = {}
  124. if "category" not in k["source"]:
  125. k["source"]["category"] = source_type
  126. async with httpx.AsyncClient(timeout=30.0) as client:
  127. response = await client.post(
  128. f"{KNOWHUB_API}/api/knowledge/upload",
  129. json={
  130. "data": data,
  131. "trace_id": trace_id,
  132. "finalize": finalize,
  133. }
  134. )
  135. response.raise_for_status()
  136. summary = []
  137. if data.get("tools"):
  138. summary.append(f"工具: {len(data['tools'])} 个")
  139. if data.get("resources"):
  140. summary.append(f"资源: {len(data['resources'])} 个")
  141. if data.get("knowledge"):
  142. summary.append(f"知识: {len(data['knowledge'])} 个")
  143. action = "最终提交" if finalize else f"增量上传({source_type})"
  144. return ToolResult(
  145. title=f"✅ {action}成功",
  146. output=f"已提交到 KnowHub\n\n" + "\n".join(f"- {s}" for s in summary),
  147. long_term_memory=f"{action}: {', '.join(summary)}",
  148. metadata={"finalize": finalize}
  149. )
  150. except Exception as e:
  151. logger.error(f"上传知识失败: {e}")
  152. return ToolResult(
  153. title="❌ 上传失败",
  154. output=f"错误: {str(e)}",
  155. error=str(e)
  156. )