knowledge_manager.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
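"""
Knowledge Manager Agent.

Driven by AgentRunner, with a complete trace record of every run.
Event-driven through the IM Client: each received message is handed to the
AgentRunner for processing.

Architecture:
    IM message -> user message -> AgentRunner -> LLM decides autonomously
    -> tool calls -> trace record
"""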
  8. import asyncio
  9. import json
  10. import logging
  11. import sys
  12. from pathlib import Path
  13. from typing import Optional
  14. # 确保项目路径可用
  15. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  16. from agent.core.runner import AgentRunner, RunConfig
  17. from agent.trace import FileSystemTraceStore, Trace, Message
  18. from agent.llm import create_qwen_llm_call
  19. from agent.llm.prompts import SimplePrompt
  20. from agent.tools.builtin.knowledge import KnowledgeConfig
logger = logging.getLogger("agents.knowledge_manager")

# ===== Configuration =====
# Whether committing to the database is allowed (False = cache only,
# True = database commits allowed). Used as the default for the
# `enable_db_commit` parameters below.
ENABLE_DATABASE_COMMIT: bool = False
  24. # Knowledge Manager Agent 配置
  25. def get_knowledge_manager_config(enable_db_commit: bool = ENABLE_DATABASE_COMMIT) -> RunConfig:
  26. """获取 Knowledge Manager 配置(根据是否允许入库动态调整工具列表)"""
  27. tools = [
  28. # 只读查询工具(用于跨表检索和关联分析)
  29. "knowledge_search",
  30. "knowledge_list",
  31. "tool_search",
  32. "tool_list",
  33. "capability_search",
  34. "capability_list",
  35. "requirement_search",
  36. "requirement_list",
  37. # 文件工具(用于维护 pre_upload_list.json 草稿)
  38. "read_file",
  39. "write_file",
  40. # 本地缓存工具
  41. "list_cache_status",
  42. ]
  43. # 只有启用入库时才开放 commit_to_database 工具
  44. if enable_db_commit:
  45. tools.append("commit_to_database")
  46. return RunConfig(
  47. model="qwen3.5-plus",
  48. temperature=0.2,
  49. max_iterations=50,
  50. agent_type="knowledge_manager",
  51. name="知识库管理",
  52. goal_compression="none",
  53. # 禁用所有知识提取和反思
  54. knowledge=KnowledgeConfig(
  55. enable_extraction=False,
  56. enable_completion_extraction=False,
  57. enable_injection=False,
  58. ),
  59. tools=tools,
  60. )
  61. async def start_knowledge_manager(
  62. contact_id: str = "knowledge_manager",
  63. server_url: str = "ws://43.106.118.91:8105",
  64. chat_id: str = "main",
  65. enable_db_commit: bool = ENABLE_DATABASE_COMMIT
  66. ):
  67. """
  68. 启动 Knowledge Manager(AgentRunner 驱动 + 事件驱动)
  69. 收到 IM 消息时,将消息作为 user message 传给 AgentRunner。
  70. LLM 自主决策调用什么工具,所有操作记录到 trace。
  71. Args:
  72. contact_id: IM 身份 ID
  73. server_url: IM Server 地址
  74. chat_id: 聊天窗口 ID
  75. enable_db_commit: 是否允许入库(False=只缓存,True=可入库)
  76. """
  77. logger.info(f"正在启动 Knowledge Manager...")
  78. logger.info(f" - Contact ID: {contact_id}")
  79. logger.info(f" - Server: {server_url}")
  80. logger.info(f" - 入库功能: {'启用' if enable_db_commit else '禁用(仅缓存)'}")
  81. # 注册内部工具(缓存管理)
  82. try:
  83. sys.path.insert(0, str(Path(__file__).parent.parent))
  84. from internal_tools.cache_manager import (
  85. cache_research_data,
  86. organize_cached_data,
  87. commit_to_database,
  88. list_cache_status,
  89. )
  90. from agent.tools import get_tool_registry
  91. registry = get_tool_registry()
  92. registry.register(cache_research_data)
  93. registry.register(organize_cached_data)
  94. registry.register(commit_to_database)
  95. registry.register(list_cache_status)
  96. logger.info(" ✓ 已注册缓存管理工具")
  97. except Exception as e:
  98. logger.error(f" ✗ 注册缓存工具失败: {e}")
  99. # 导入 IM Client
  100. try:
  101. sys.path.insert(0, str(Path(__file__).parent.parent.parent / "im-client"))
  102. from client import IMClient
  103. except ImportError as e:
  104. logger.error(f"无法导入 IM Client: {e}")
  105. return
  106. # --- 初始化 AgentRunner ---
  107. km_config = get_knowledge_manager_config(enable_db_commit)
  108. store = FileSystemTraceStore(base_path=".trace")
  109. llm_call = create_qwen_llm_call(model=km_config.model)
  110. runner = AgentRunner(
  111. trace_store=store,
  112. llm_call=llm_call,
  113. debug=True,
  114. logger_name="agents.knowledge_manager"
  115. )
  116. # 加载 system prompt
  117. prompt_path = Path(__file__).parent / "knowledge_manager.prompt"
  118. prompt = SimplePrompt(prompt_path)
  119. system_messages = prompt.build_messages()
  120. # Trace 状态(同一个 trace 持续追加,保持上下文)
  121. current_trace_id = None
  122. message_queue = asyncio.Queue() # 消息队列,防止丢消息
  123. upload_buffer = [] # upload 消息缓冲区(用于批处理)
  124. upload_timer = None # 延迟处理定时器
  125. # --- 初始化 IM Client ---
  126. client = IMClient(
  127. contact_id=contact_id,
  128. server_url=server_url,
  129. data_dir=str(Path.home() / ".knowhub" / "im_data")
  130. )
  131. # 静默 notifier(消息由 on_message 回调处理,不需要 notifier 通知)
  132. class _SilentNotifier:
  133. async def notify(self, count, from_contacts):
  134. pass
  135. # 打开窗口
  136. client.open_window(chat_id=chat_id, notifier=_SilentNotifier())
  137. # --- 消息处理器(从队列中取消息,逐条处理)---
  138. async def message_processor():
  139. nonlocal current_trace_id
  140. while True:
  141. msg = await message_queue.get() # 阻塞等待,零消耗
  142. sender = msg.get("sender")
  143. content = msg.get("content", "")
  144. # 处理完消息后清空 pending,防止 notifier 反复通知
  145. client.read_pending(chat_id)
  146. logger.info(f"[KM] <- 处理消息: {sender}")
  147. logger.info(f"[KM] 内容: {content[:120]}{'...' if len(content) > 120 else ''}")
  148. try:
  149. # 续跑同一个 trace(首次为 None 会新建,后续复用)
  150. if current_trace_id is None:
  151. messages = system_messages + [{"role": "user", "content": content}]
  152. else:
  153. messages = [{"role": "user", "content": content}]
  154. # 获取配置
  155. km_config = get_knowledge_manager_config(enable_db_commit)
  156. config = RunConfig(
  157. model=km_config.model,
  158. temperature=km_config.temperature,
  159. max_iterations=km_config.max_iterations,
  160. agent_type=km_config.agent_type,
  161. name=km_config.name,
  162. goal_compression=km_config.goal_compression,
  163. tools=km_config.tools,
  164. knowledge=km_config.knowledge,
  165. trace_id=current_trace_id, # 复用 trace,保持完整生命周期
  166. context={
  167. "km_queue_size": message_queue.qsize(), # 当前队列中待处理消息数
  168. "current_sender": sender, # 当前处理的消息来源
  169. }
  170. )
  171. # 执行 AgentRunner
  172. response_text = ""
  173. async for item in runner.run(messages=messages, config=config):
  174. if isinstance(item, Trace):
  175. current_trace_id = item.trace_id
  176. if item.status == "running":
  177. logger.info(f"[KM] Trace: {item.trace_id[:8]}...")
  178. elif item.status == "completed":
  179. logger.info(f"[KM] Trace 完成 (消息: {item.total_messages})")
  180. elif item.status == "failed":
  181. logger.error(f"[KM] Trace 失败: {item.error_message}")
  182. elif isinstance(item, Message):
  183. if item.role == "assistant":
  184. msg_content = item.content
  185. if isinstance(msg_content, dict):
  186. text = msg_content.get("text", "")
  187. tool_calls = msg_content.get("tool_calls")
  188. if text:
  189. # 始终记录最新的文本(最后一条就是最终回复)
  190. response_text = text
  191. if tool_calls:
  192. logger.info(f"[KM] 思考: {text[:80]}...")
  193. elif isinstance(msg_content, str) and msg_content:
  194. response_text = msg_content
  195. elif item.role == "tool":
  196. tool_content = item.content
  197. tool_name = tool_content.get("tool_name", "?") if isinstance(tool_content, dict) else "?"
  198. logger.info(f"[KM] 工具: {tool_name}")
  199. # 回复
  200. if response_text:
  201. client.send_message(
  202. chat_id=chat_id,
  203. receiver=sender,
  204. content=response_text
  205. )
  206. logger.info(f"[KM] -> 已回复: {sender} ({len(response_text)} 字符)")
  207. else:
  208. logger.warning(f"[KM] AgentRunner 没有生成回复")
  209. except Exception as e:
  210. logger.error(f"[KM] 处理失败: {e}", exc_info=True)
  211. message_queue.task_done()
  212. # --- 批处理 upload 消息 ---
  213. async def process_upload_batch():
  214. """批处理 upload 消息(延迟 5 秒,合并多个 upload)"""
  215. nonlocal upload_buffer, upload_timer
  216. await asyncio.sleep(5) # 等待 5 秒,收集更多 upload
  217. if not upload_buffer:
  218. upload_timer = None
  219. return
  220. # 合并所有 upload 消息
  221. batch = upload_buffer.copy()
  222. upload_buffer.clear()
  223. upload_timer = None
  224. logger.info(f"[KM] 批处理 {len(batch)} 条 upload 消息")
  225. # 合并成一条消息入队
  226. merged_content = f"[UPLOAD:BATCH] 收到 {len(batch)} 条上传请求,请合并处理:\n"
  227. for i, msg in enumerate(batch, 1):
  228. merged_content += f"\n--- 第 {i} 条 ---\n{msg['content']}\n"
  229. await message_queue.put({
  230. "sender": batch[0]["sender"],
  231. "content": merged_content
  232. })
  233. # --- 消息回调(所有消息统一经过 AgentRunner 处理)---
  234. async def on_message(msg: dict):
  235. nonlocal upload_timer
  236. sender = msg.get("sender")
  237. content = msg.get("content", "")
  238. # 忽略自己发的消息
  239. if sender == contact_id:
  240. return
  241. # 判断消息类型(根据前缀标记)
  242. if content.startswith("[ASK]"):
  243. # ASK 查询:入队,由 AgentRunner 驱动(带 Trace、可跨表推理)
  244. logger.info(f"[KM] <- 收到查询消息: {sender} (入队)")
  245. await message_queue.put(msg)
  246. elif content.startswith("[UPLOAD"):
  247. # 批处理:加入缓冲区,延迟处理
  248. logger.info(f"[KM] <- 收到上传消息: {sender} (加入批处理缓冲区)")
  249. upload_buffer.append(msg)
  250. # 启动或重置定时器
  251. if upload_timer:
  252. upload_timer.cancel()
  253. upload_timer = asyncio.create_task(process_upload_batch())
  254. else:
  255. # 其他消息:正常入队
  256. logger.info(f"[KM] <- 收到消息: {sender} (入队,队列长度: {message_queue.qsize() + 1})")
  257. await message_queue.put(msg)
  258. client.on_message(on_message, chat_id="*")
  259. # 启动消息处理器(后台任务)
  260. processor_task = asyncio.create_task(message_processor())
  261. # 启动 IM Client(事件驱动)
  262. logger.info("✅ Knowledge Manager 已启动(AgentRunner 驱动)")
  263. try:
  264. await client.run()
  265. finally:
  266. processor_task.cancel()
  267. try:
  268. await processor_task
  269. except asyncio.CancelledError:
  270. pass