knowledge_manager.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. """
  2. Knowledge Manager 工具 - 通过 IM 与知识库管理 Agent 交互
  3. 提供两个工具:
  4. - ask_knowledge: 查询知识库(同步,等待回复)
  5. - upload_knowledge: 上传调研结果(异步,立即返回)
  6. 依赖:IM Client 已初始化(im_setup + im_open_window)
  7. """
  8. import asyncio
  9. import json
  10. import logging
  11. from typing import Optional, Dict, Any, List
  12. from agent.tools import tool, ToolResult, ToolContext
  13. logger = logging.getLogger(__name__)
  14. # IM 工具的全局引用(延迟导入)
  15. _im_chat = None
  16. def _get_im_chat():
  17. global _im_chat
  18. if _im_chat is None:
  19. from agent.tools.builtin.im import chat as im_chat_module
  20. _im_chat = im_chat_module
  21. return _im_chat
  22. def _get_client_and_chat_id(contact_id: str):
  23. """获取 IM Client 实例和当前窗口的 chat_id"""
  24. im = _get_im_chat()
  25. client = im._clients.get(contact_id)
  26. if client is None:
  27. return None, None
  28. # 取第一个打开的窗口
  29. windows = client.list_windows()
  30. chat_id = windows[0] if windows else None
  31. return client, chat_id
  32. def _clear_notifications(contact_id: str, chat_id: str):
  33. """清空 IM 通知计数"""
  34. im = _get_im_chat()
  35. im._notifications.pop((contact_id, chat_id), None)
  36. @tool(
  37. hidden_params=["context"],
  38. inject_params={
  39. "contact_id": {"mode": "default", "key": "im_contact_id"},
  40. }
  41. )
  42. async def ask_knowledge(
  43. query: str,
  44. contact_id: str = "agent_research",
  45. context: Optional[ToolContext] = None,
  46. ) -> ToolResult:
  47. """
  48. 向 Knowledge Manager 查询知识库信息(同步,等待回复)
  49. Args:
  50. query: 查询内容(如:"查询 ControlNet 相关信息")
  51. contact_id: 当前 Agent 的 IM contact_id
  52. context: 工具上下文
  53. Returns:
  54. 查询结果(已有工具、资源、知识及建议)
  55. """
  56. try:
  57. client, chat_id = _get_client_and_chat_id(contact_id)
  58. if client is None or chat_id is None:
  59. return ToolResult(
  60. title="❌ 查询失败",
  61. output="IM Client 未初始化,请先调用 im_setup",
  62. error="im_not_initialized"
  63. )
  64. # 发送查询(带类型标记)
  65. message = f"[ASK] {query}"
  66. client.send_message(
  67. chat_id=chat_id,
  68. receiver="knowledge_manager",
  69. content=message
  70. )
  71. # 等待回复(最多 30 秒)
  72. for _ in range(30):
  73. await asyncio.sleep(1)
  74. pending = client.read_pending(chat_id)
  75. for msg in pending:
  76. if msg.get("sender") == "knowledge_manager":
  77. content = msg.get("content", "")
  78. # 清空 IM 通知计数,防止 notifier 反复提醒
  79. _clear_notifications(contact_id, chat_id)
  80. return ToolResult(
  81. title="📚 知识库查询结果",
  82. output=content,
  83. metadata={"source": "knowledge_manager"}
  84. )
  85. # 超时保底:直接调用 knowledge_search 返回原始结果
  86. logger.warning("Knowledge Manager 超时,fallback 到 knowledge_search")
  87. from agent.tools.builtin.knowledge import knowledge_search
  88. fallback_result = await knowledge_search(query=query, top_k=5, min_score=3)
  89. return ToolResult(
  90. title="📚 知识库查询结果(直连)",
  91. output=fallback_result.output,
  92. metadata={"source": "fallback", "raw": fallback_result.metadata}
  93. )
  94. except Exception as e:
  95. logger.error(f"查询知识库失败: {e}")
  96. return ToolResult(
  97. title="❌ 查询失败",
  98. output=f"错误: {str(e)}",
  99. error=str(e)
  100. )
  101. @tool(
  102. hidden_params=["context"],
  103. inject_params={
  104. "contact_id": {"mode": "default", "key": "im_contact_id"},
  105. }
  106. )
  107. async def upload_knowledge(
  108. data: Dict[str, Any],
  109. finalize: bool = False,
  110. contact_id: str = "agent_research",
  111. context: Optional[ToolContext] = None,
  112. ) -> ToolResult:
  113. """
  114. 上传调研结果到知识库(异步,立即返回)
  115. Args:
  116. data: 调研结果,包含:
  117. - tools: 工具列表
  118. - resources: 资源列表
  119. - knowledge: 知识列表
  120. finalize: 是否最终提交(True=入库,False=仅缓冲)
  121. contact_id: 当前 Agent 的 IM contact_id
  122. context: 工具上下文
  123. Returns:
  124. 上传确认(立即返回,不等待处理完成)
  125. """
  126. try:
  127. client, chat_id = _get_client_and_chat_id(contact_id)
  128. if client is None or chat_id is None:
  129. return ToolResult(
  130. title="❌ 上传失败",
  131. output="IM Client 未初始化,请先调用 im_setup",
  132. error="im_not_initialized"
  133. )
  134. # 构造消息(带类型标记)
  135. if finalize:
  136. action = "最终提交"
  137. message = f"[UPLOAD:FINALIZE] {json.dumps(data, ensure_ascii=False)}"
  138. else:
  139. action = "增量上传"
  140. message = f"[UPLOAD] {json.dumps(data, ensure_ascii=False)}"
  141. # 发送(不等待回复)
  142. client.send_message(
  143. chat_id=chat_id,
  144. receiver="knowledge_manager",
  145. content=message
  146. )
  147. # 等待一小段时间让 KM 回复,然后清空 pending(避免 notifier 反复通知)
  148. await asyncio.sleep(0.5)
  149. client.read_pending(chat_id) # 清空回复,不处理内容
  150. summary = []
  151. if data.get("tools"):
  152. summary.append(f"工具: {len(data['tools'])} 个")
  153. if data.get("resources"):
  154. summary.append(f"资源: {len(data['resources'])} 个")
  155. if data.get("knowledge"):
  156. summary.append(f"知识: {len(data['knowledge'])} 个")
  157. return ToolResult(
  158. title=f"✅ {action}成功",
  159. output=f"已发送到 Knowledge Manager\n\n" + "\n".join(f"- {s}" for s in summary),
  160. long_term_memory=f"{action}: {', '.join(summary)}",
  161. metadata={"finalize": finalize}
  162. )
  163. except Exception as e:
  164. logger.error(f"上传知识失败: {e}")
  165. return ToolResult(
  166. title="❌ 上传失败",
  167. output=f"错误: {str(e)}",
  168. error=str(e)
  169. )