# knowledge_manager.py (13 KB)
"""
Knowledge Manager Agent

Driven by AgentRunner, with complete trace recording.
Event-driven through the IM Client: every received message is handed to
AgentRunner for processing.

Architecture:
    IM message → user message → AgentRunner → autonomous LLM decision
    → tool call → trace record
"""
  8. import asyncio
  9. import json
  10. import logging
  11. import sys
  12. from datetime import datetime
  13. from pathlib import Path
  14. from typing import Optional
  15. # 确保项目路径可用
  16. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  17. from agent.core.runner import AgentRunner, RunConfig
  18. from agent.trace import FileSystemTraceStore, Trace, Message
  19. from agent.llm import create_qwen_llm_call
  20. from agent.llm.prompts import SimplePrompt
  21. from agent.tools.builtin.knowledge import KnowledgeConfig
# Module-level logger shared by everything in this file.
logger = logging.getLogger("agents.knowledge_manager")

# ===== Configuration =====
# Whether committing to the database is allowed
# (False = cache only, True = commits are allowed).
ENABLE_DATABASE_COMMIT = False
  25. # Knowledge Manager Agent 配置
  26. def get_knowledge_manager_config(enable_db_commit: bool = ENABLE_DATABASE_COMMIT) -> RunConfig:
  27. """获取 Knowledge Manager 配置(根据是否允许入库动态调整工具列表)"""
  28. tools = [
  29. # 只读查询工具(用于跨表检索和关联分析)
  30. "knowledge_search",
  31. "knowledge_list",
  32. "tool_search",
  33. "tool_list",
  34. "capability_search",
  35. "capability_list",
  36. "requirement_search",
  37. "requirement_list",
  38. # 文件工具(用于维护 pre_upload_list.json 草稿)
  39. "read_file",
  40. "write_file",
  41. # 本地缓存工具
  42. "list_cache_status",
  43. # 树节点匹配工具(需求 → 内容树挂载)
  44. "match_tree_nodes",
  45. ]
  46. # 只有启用入库时才开放 commit_to_database 工具
  47. if enable_db_commit:
  48. tools.append("commit_to_database")
  49. return RunConfig(
  50. model="qwen3.5-plus",
  51. temperature=0.2,
  52. max_iterations=50,
  53. agent_type="knowledge_manager",
  54. name="知识库管理",
  55. goal_compression="none",
  56. # 禁用所有知识提取和反思
  57. knowledge=KnowledgeConfig(
  58. enable_extraction=False,
  59. enable_completion_extraction=False,
  60. enable_injection=False,
  61. ),
  62. tools=tools,
  63. )
  64. async def start_knowledge_manager(
  65. contact_id: str = "knowledge_manager",
  66. server_url: str = "ws://43.106.118.91:8105",
  67. chat_id: str = "main",
  68. enable_db_commit: bool = ENABLE_DATABASE_COMMIT
  69. ):
  70. """
  71. 启动 Knowledge Manager(AgentRunner 驱动 + 事件驱动)
  72. 收到 IM 消息时,将消息作为 user message 传给 AgentRunner。
  73. LLM 自主决策调用什么工具,所有操作记录到 trace。
  74. Args:
  75. contact_id: IM 身份 ID
  76. server_url: IM Server 地址
  77. chat_id: 聊天窗口 ID
  78. enable_db_commit: 是否允许入库(False=只缓存,True=可入库)
  79. """
  80. logger.info(f"正在启动 Knowledge Manager...")
  81. logger.info(f" - Contact ID: {contact_id}")
  82. logger.info(f" - Server: {server_url}")
  83. logger.info(f" - 入库功能: {'启用' if enable_db_commit else '禁用(仅缓存)'}")
  84. # 注册内部工具(缓存管理 + 树匹配)
  85. try:
  86. sys.path.insert(0, str(Path(__file__).parent.parent))
  87. from internal_tools.cache_manager import (
  88. cache_research_data,
  89. organize_cached_data,
  90. commit_to_database,
  91. list_cache_status,
  92. )
  93. from internal_tools.tree_matcher import match_tree_nodes
  94. from agent.tools import get_tool_registry
  95. registry = get_tool_registry()
  96. registry.register(cache_research_data)
  97. registry.register(organize_cached_data)
  98. registry.register(commit_to_database)
  99. registry.register(list_cache_status)
  100. registry.register(match_tree_nodes)
  101. logger.info(" ✓ 已注册缓存管理工具 + 树节点匹配工具")
  102. except Exception as e:
  103. logger.error(f" ✗ 注册内部工具失败: {e}")
  104. # 导入 IM Client
  105. try:
  106. sys.path.insert(0, str(Path(__file__).parent.parent.parent / "im-client"))
  107. from client import IMClient
  108. except ImportError as e:
  109. logger.error(f"无法导入 IM Client: {e}")
  110. return
  111. # --- 初始化 AgentRunner ---
  112. km_config = get_knowledge_manager_config(enable_db_commit)
  113. store = FileSystemTraceStore(base_path=".trace")
  114. llm_call = create_qwen_llm_call(model=km_config.model)
  115. runner = AgentRunner(
  116. trace_store=store,
  117. llm_call=llm_call,
  118. debug=True,
  119. logger_name="agents.knowledge_manager"
  120. )
  121. # 加载 system prompt
  122. prompt_path = Path(__file__).parent / "knowledge_manager.prompt"
  123. prompt = SimplePrompt(prompt_path)
  124. system_messages = prompt.build_messages()
  125. # Trace 状态(同一个 trace 持续追加,保持上下文)
  126. current_trace_id = None
  127. message_queue = asyncio.Queue() # 消息队列,防止丢消息
  128. upload_buffer = [] # upload 消息缓冲区(用于批处理)
  129. upload_timer = None # 延迟处理定时器
  130. # --- 初始化 IM Client ---
  131. client = IMClient(
  132. contact_id=contact_id,
  133. server_url=server_url,
  134. data_dir=str(Path.home() / ".knowhub" / "im_data")
  135. )
  136. # 静默 notifier(消息由 on_message 回调处理,不需要 notifier 通知)
  137. class _SilentNotifier:
  138. async def notify(self, count, from_contacts):
  139. pass
  140. # 打开窗口
  141. client.open_window(chat_id=chat_id, notifier=_SilentNotifier())
  142. # --- 消息处理器(从队列中取消息,逐条处理)---
  143. async def message_processor():
  144. nonlocal current_trace_id
  145. while True:
  146. msg = await message_queue.get() # 阻塞等待,零消耗
  147. sender = msg.get("sender")
  148. content = msg.get("content", "")
  149. # 处理完消息后清空 pending,防止 notifier 反复通知
  150. client.read_pending(chat_id)
  151. logger.info(f"[KM] <- 处理消息: {sender}")
  152. logger.info(f"[KM] 内容: {content[:120]}{'...' if len(content) > 120 else ''}")
  153. try:
  154. # 续跑同一个 trace(首次为 None 会新建,后续复用)
  155. if current_trace_id is None:
  156. messages = system_messages + [{"role": "user", "content": content}]
  157. else:
  158. messages = [{"role": "user", "content": content}]
  159. # 获取配置
  160. km_config = get_knowledge_manager_config(enable_db_commit)
  161. config = RunConfig(
  162. model=km_config.model,
  163. temperature=km_config.temperature,
  164. max_iterations=km_config.max_iterations,
  165. agent_type=km_config.agent_type,
  166. name=km_config.name,
  167. goal_compression=km_config.goal_compression,
  168. tools=km_config.tools,
  169. knowledge=km_config.knowledge,
  170. trace_id=current_trace_id, # 复用 trace,保持完整生命周期
  171. context={
  172. "km_queue_size": message_queue.qsize(), # 当前队列中待处理消息数
  173. "current_sender": sender, # 当前处理的消息来源
  174. }
  175. )
  176. # 执行 AgentRunner
  177. response_text = ""
  178. async for item in runner.run(messages=messages, config=config):
  179. if isinstance(item, Trace):
  180. current_trace_id = item.trace_id
  181. if item.status == "running":
  182. logger.info(f"[KM] Trace: {item.trace_id[:8]}...")
  183. elif item.status == "completed":
  184. logger.info(f"[KM] Trace 完成 (消息: {item.total_messages})")
  185. elif item.status == "failed":
  186. logger.error(f"[KM] Trace 失败: {item.error_message}")
  187. elif isinstance(item, Message):
  188. if item.role == "assistant":
  189. msg_content = item.content
  190. if isinstance(msg_content, dict):
  191. text = msg_content.get("text", "")
  192. tool_calls = msg_content.get("tool_calls")
  193. if text:
  194. # 始终记录最新的文本(最后一条就是最终回复)
  195. response_text = text
  196. if tool_calls:
  197. logger.info(f"[KM] 思考: {text[:80]}...")
  198. elif isinstance(msg_content, str) and msg_content:
  199. response_text = msg_content
  200. elif item.role == "tool":
  201. tool_content = item.content
  202. tool_name = tool_content.get("tool_name", "?") if isinstance(tool_content, dict) else "?"
  203. logger.info(f"[KM] 工具: {tool_name}")
  204. # 回复
  205. if response_text:
  206. client.send_message(
  207. chat_id=chat_id,
  208. receiver=sender,
  209. content=response_text
  210. )
  211. logger.info(f"[KM] -> 已回复: {sender} ({len(response_text)} 字符)")
  212. else:
  213. logger.warning(f"[KM] AgentRunner 没有生成回复")
  214. except Exception as e:
  215. logger.error(f"[KM] 处理失败: {e}", exc_info=True)
  216. message_queue.task_done()
  217. # --- 批处理 upload 消息 ---
  218. async def process_upload_batch():
  219. """批处理 upload 消息(延迟 5 秒,合并多个 upload)"""
  220. nonlocal upload_buffer, upload_timer
  221. await asyncio.sleep(5) # 等待 5 秒,收集更多 upload
  222. if not upload_buffer:
  223. upload_timer = None
  224. return
  225. # 合并所有 upload 消息
  226. batch = upload_buffer.copy()
  227. upload_buffer.clear()
  228. upload_timer = None
  229. logger.info(f"[KM] 批处理 {len(batch)} 条 upload 消息")
  230. # 保存原始消息到 buffer 目录(便于回溯和重跑)
  231. try:
  232. buffer_dir = Path(".cache/.knowledge/buffer")
  233. buffer_dir.mkdir(parents=True, exist_ok=True)
  234. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  235. buffer_file = buffer_dir / f"upload_{timestamp}.json"
  236. buffer_data = [{"sender": m.get("sender"), "content": m.get("content", "")} for m in batch]
  237. buffer_file.write_text(json.dumps(buffer_data, ensure_ascii=False, indent=2), encoding="utf-8")
  238. logger.info(f"[KM] 原始上传数据已保存: {buffer_file}")
  239. except Exception as e:
  240. logger.warning(f"[KM] 保存原始上传数据失败: {e}")
  241. # 合并成一条消息入队
  242. merged_content = f"[UPLOAD:BATCH] 收到 {len(batch)} 条上传请求,请合并处理:\n"
  243. for i, msg in enumerate(batch, 1):
  244. merged_content += f"\n--- 第 {i} 条 ---\n{msg['content']}\n"
  245. await message_queue.put({
  246. "sender": batch[0]["sender"],
  247. "content": merged_content
  248. })
  249. # --- 消息回调(所有消息统一经过 AgentRunner 处理)---
  250. async def on_message(msg: dict):
  251. nonlocal upload_timer
  252. sender = msg.get("sender")
  253. content = msg.get("content", "")
  254. # 忽略自己发的消息
  255. if sender == contact_id:
  256. return
  257. # 判断消息类型(根据前缀标记)
  258. if content.startswith("[ASK]"):
  259. # ASK 查询:入队,由 AgentRunner 驱动(带 Trace、可跨表推理)
  260. logger.info(f"[KM] <- 收到查询消息: {sender} (入队)")
  261. await message_queue.put(msg)
  262. elif content.startswith("[UPLOAD"):
  263. # 批处理:加入缓冲区,延迟处理
  264. logger.info(f"[KM] <- 收到上传消息: {sender} (加入批处理缓冲区)")
  265. upload_buffer.append(msg)
  266. # 启动或重置定时器
  267. if upload_timer:
  268. upload_timer.cancel()
  269. upload_timer = asyncio.create_task(process_upload_batch())
  270. else:
  271. # 其他消息:正常入队
  272. logger.info(f"[KM] <- 收到消息: {sender} (入队,队列长度: {message_queue.qsize() + 1})")
  273. await message_queue.put(msg)
  274. client.on_message(on_message, chat_id="*")
  275. # 启动消息处理器(后台任务)
  276. processor_task = asyncio.create_task(message_processor())
  277. # 启动 IM Client(事件驱动)
  278. logger.info("✅ Knowledge Manager 已启动(AgentRunner 驱动)")
  279. try:
  280. await client.run()
  281. finally:
  282. processor_task.cancel()
  283. try:
  284. await processor_task
  285. except asyncio.CancelledError:
  286. pass