knowledge_manager.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. """
  2. Knowledge Manager Agent
  3. 基于 AgentRunner 驱动,有完整的 trace 记录。
  4. 通过 IM Client 事件驱动,收到消息时传给 AgentRunner 处理。
  5. 架构:
  6. IM 消息 → user message → AgentRunner → LLM 自主决策 → 工具调用 → trace 记录
  7. """
  8. import asyncio
  9. import json
  10. import logging
  11. import sys
  12. from pathlib import Path
  13. from typing import Optional
  14. # 确保项目路径可用
  15. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  16. from agent.core.runner import AgentRunner, RunConfig
  17. from agent.trace import FileSystemTraceStore, Trace, Message
  18. from agent.llm import create_qwen_llm_call
  19. from agent.llm.prompts import SimplePrompt
  20. from agent.tools.builtin.knowledge import KnowledgeConfig
logger = logging.getLogger("agents.knowledge_manager")
# ===== Configuration =====
ENABLE_DATABASE_COMMIT = False  # Whether committing to the database is allowed (False = cache only, True = commits allowed)
  24. # Knowledge Manager Agent 配置
  25. def get_knowledge_manager_config(enable_db_commit: bool = ENABLE_DATABASE_COMMIT) -> RunConfig:
  26. """获取 Knowledge Manager 配置(根据是否允许入库动态调整工具列表)"""
  27. tools = [
  28. "knowledge_search",
  29. "knowledge_save",
  30. "knowledge_list",
  31. "knowledge_update",
  32. "resource_save",
  33. "resource_get",
  34. "read_file",
  35. "write_file",
  36. # 本地缓存工具
  37. "cache_research_data",
  38. "organize_cached_data",
  39. "list_cache_status",
  40. ]
  41. # 只有启用入库时才开放 commit_to_database 工具
  42. if enable_db_commit:
  43. tools.append("commit_to_database")
  44. return RunConfig(
  45. model="qwen3.5-plus",
  46. temperature=0.2,
  47. max_iterations=50,
  48. agent_type="knowledge_manager",
  49. name="知识库管理",
  50. goal_compression="none",
  51. # 禁用所有知识提取和反思
  52. knowledge=KnowledgeConfig(
  53. enable_extraction=False,
  54. enable_completion_extraction=False,
  55. enable_injection=False,
  56. ),
  57. tools=tools,
  58. )
  59. async def start_knowledge_manager(
  60. contact_id: str = "knowledge_manager",
  61. server_url: str = "ws://localhost:58005",
  62. chat_id: str = "main",
  63. enable_db_commit: bool = ENABLE_DATABASE_COMMIT
  64. ):
  65. """
  66. 启动 Knowledge Manager(AgentRunner 驱动 + 事件驱动)
  67. 收到 IM 消息时,将消息作为 user message 传给 AgentRunner。
  68. LLM 自主决策调用什么工具,所有操作记录到 trace。
  69. Args:
  70. contact_id: IM 身份 ID
  71. server_url: IM Server 地址
  72. chat_id: 聊天窗口 ID
  73. enable_db_commit: 是否允许入库(False=只缓存,True=可入库)
  74. """
  75. logger.info(f"正在启动 Knowledge Manager...")
  76. logger.info(f" - Contact ID: {contact_id}")
  77. logger.info(f" - Server: {server_url}")
  78. logger.info(f" - 入库功能: {'启用' if enable_db_commit else '禁用(仅缓存)'}")
  79. # 注册内部工具(缓存管理)
  80. try:
  81. sys.path.insert(0, str(Path(__file__).parent.parent))
  82. from internal_tools.cache_manager import (
  83. cache_research_data,
  84. organize_cached_data,
  85. commit_to_database,
  86. list_cache_status,
  87. )
  88. from agent.tools import get_tool_registry
  89. registry = get_tool_registry()
  90. registry.register(cache_research_data)
  91. registry.register(organize_cached_data)
  92. registry.register(commit_to_database)
  93. registry.register(list_cache_status)
  94. logger.info(" ✓ 已注册缓存管理工具")
  95. except Exception as e:
  96. logger.error(f" ✗ 注册缓存工具失败: {e}")
  97. # 导入 IM Client
  98. try:
  99. sys.path.insert(0, str(Path(__file__).parent.parent.parent / "im-client"))
  100. from client import IMClient
  101. except ImportError as e:
  102. logger.error(f"无法导入 IM Client: {e}")
  103. return
  104. # --- 初始化 AgentRunner ---
  105. store = FileSystemTraceStore(base_path=".trace")
  106. llm_call = create_qwen_llm_call(model=KNOWLEDGE_MANAGER_CONFIG.model)
  107. runner = AgentRunner(
  108. trace_store=store,
  109. llm_call=llm_call,
  110. debug=True,
  111. logger_name="agents.knowledge_manager"
  112. )
  113. # 加载 system prompt
  114. prompt_path = Path(__file__).parent / "knowledge_manager.prompt"
  115. prompt = SimplePrompt(prompt_path)
  116. system_messages = prompt.build_messages()
  117. # Trace 状态(同一个 trace 持续追加,保持上下文)
  118. current_trace_id = None
  119. message_queue = asyncio.Queue() # 消息队列,防止丢消息
  120. upload_buffer = [] # upload 消息缓冲区(用于批处理)
  121. upload_timer = None # 延迟处理定时器
  122. # --- 初始化 IM Client ---
  123. client = IMClient(
  124. contact_id=contact_id,
  125. server_url=server_url,
  126. data_dir=str(Path.home() / ".knowhub" / "im_data")
  127. )
  128. # 静默 notifier(消息由 on_message 回调处理,不需要 notifier 通知)
  129. class _SilentNotifier:
  130. async def notify(self, count, from_contacts):
  131. pass
  132. # 打开窗口
  133. client.open_window(chat_id=chat_id, notifier=_SilentNotifier())
  134. # --- 消息处理器(从队列中取消息,逐条处理)---
  135. async def message_processor():
  136. nonlocal current_trace_id
  137. while True:
  138. msg = await message_queue.get() # 阻塞等待,零消耗
  139. sender = msg.get("sender")
  140. content = msg.get("content", "")
  141. # 处理完消息后清空 pending,防止 notifier 反复通知
  142. client.read_pending(chat_id)
  143. logger.info(f"[KM] <- 处理消息: {sender}")
  144. logger.info(f"[KM] 内容: {content[:120]}{'...' if len(content) > 120 else ''}")
  145. try:
  146. # 续跑同一个 trace(首次为 None 会新建,后续复用)
  147. if current_trace_id is None:
  148. messages = system_messages + [{"role": "user", "content": content}]
  149. else:
  150. messages = [{"role": "user", "content": content}]
  151. # 获取配置
  152. km_config = get_knowledge_manager_config(enable_db_commit)
  153. config = RunConfig(
  154. model=km_config.model,
  155. temperature=km_config.temperature,
  156. max_iterations=km_config.max_iterations,
  157. agent_type=km_config.agent_type,
  158. name=km_config.name,
  159. goal_compression=km_config.goal_compression,
  160. tools=km_config.tools,
  161. knowledge=km_config.knowledge,
  162. trace_id=current_trace_id, # 复用 trace,保持完整生命周期
  163. context={
  164. "km_queue_size": message_queue.qsize(), # 当前队列中待处理消息数
  165. "current_sender": sender, # 当前处理的消息来源
  166. }
  167. )
  168. # 执行 AgentRunner
  169. response_text = ""
  170. async for item in runner.run(messages=messages, config=config):
  171. if isinstance(item, Trace):
  172. current_trace_id = item.trace_id
  173. if item.status == "running":
  174. logger.info(f"[KM] Trace: {item.trace_id[:8]}...")
  175. elif item.status == "completed":
  176. logger.info(f"[KM] Trace 完成 (消息: {item.total_messages})")
  177. elif item.status == "failed":
  178. logger.error(f"[KM] Trace 失败: {item.error_message}")
  179. elif isinstance(item, Message):
  180. if item.role == "assistant":
  181. msg_content = item.content
  182. if isinstance(msg_content, dict):
  183. text = msg_content.get("text", "")
  184. tool_calls = msg_content.get("tool_calls")
  185. if text:
  186. # 始终记录最新的文本(最后一条就是最终回复)
  187. response_text = text
  188. if tool_calls:
  189. logger.info(f"[KM] 思考: {text[:80]}...")
  190. elif isinstance(msg_content, str) and msg_content:
  191. response_text = msg_content
  192. elif item.role == "tool":
  193. tool_content = item.content
  194. tool_name = tool_content.get("tool_name", "?") if isinstance(tool_content, dict) else "?"
  195. logger.info(f"[KM] 工具: {tool_name}")
  196. # 回复
  197. if response_text:
  198. client.send_message(
  199. chat_id=chat_id,
  200. receiver=sender,
  201. content=response_text
  202. )
  203. logger.info(f"[KM] -> 已回复: {sender} ({len(response_text)} 字符)")
  204. else:
  205. logger.warning(f"[KM] AgentRunner 没有生成回复")
  206. except Exception as e:
  207. logger.error(f"[KM] 处理失败: {e}", exc_info=True)
  208. message_queue.task_done()
  209. # --- 批处理 upload 消息 ---
  210. async def process_upload_batch():
  211. """批处理 upload 消息(延迟 5 秒,合并多个 upload)"""
  212. nonlocal upload_buffer, upload_timer
  213. await asyncio.sleep(5) # 等待 5 秒,收集更多 upload
  214. if not upload_buffer:
  215. upload_timer = None
  216. return
  217. # 合并所有 upload 消息
  218. batch = upload_buffer.copy()
  219. upload_buffer.clear()
  220. upload_timer = None
  221. logger.info(f"[KM] 批处理 {len(batch)} 条 upload 消息")
  222. # 合并成一条消息入队
  223. merged_content = f"[UPLOAD:BATCH] 收到 {len(batch)} 条上传请求,请合并处理:\n"
  224. for i, msg in enumerate(batch, 1):
  225. merged_content += f"\n--- 第 {i} 条 ---\n{msg['content']}\n"
  226. await message_queue.put({
  227. "sender": batch[0]["sender"],
  228. "content": merged_content
  229. })
  230. # --- 快速处理 ask 查询(不经过队列)---
  231. async def handle_ask_query(sender: str, content: str):
  232. """快速响应 ask 查询,不阻塞 upload 处理"""
  233. try:
  234. logger.info(f"[KM] <- 快速查询: {sender}")
  235. # 1. 查询数据库
  236. from agent.tools.builtin.knowledge import knowledge_search
  237. db_result = await knowledge_search(query=content, top_k=5, min_score=3)
  238. # 2. 读取缓存(正在处理的 upload 数据)
  239. from knowhub.internal_tools.cache_manager import list_cache_status
  240. cache_status = await list_cache_status()
  241. # 3. 组合回复
  242. response_parts = []
  243. if db_result.output and db_result.output != "未找到相关知识":
  244. response_parts.append("## 数据库中的知识\n\n" + db_result.output)
  245. if cache_status.metadata and cache_status.metadata.get("files"):
  246. cache_files = cache_status.metadata["files"]
  247. if cache_files:
  248. response_parts.append(
  249. f"## 缓存中的数据\n\n"
  250. f"正在处理 {len(cache_files)} 个缓存文件,包含最新调研数据(尚未入库)。\n"
  251. f"如需查看详情,请稍后再次查询。"
  252. )
  253. if not response_parts:
  254. response_parts.append("暂无相关知识,数据库和缓存均为空。")
  255. response_text = "\n\n".join(response_parts)
  256. # 4. 立即回复
  257. client.send_message(
  258. chat_id=chat_id,
  259. receiver=sender,
  260. content=response_text
  261. )
  262. logger.info(f"[KM] -> 快速回复: {sender} ({len(response_text)} 字符)")
  263. except Exception as e:
  264. logger.error(f"[KM] 快速查询失败: {e}", exc_info=True)
  265. client.send_message(
  266. chat_id=chat_id,
  267. receiver=sender,
  268. content=f"查询失败: {e}"
  269. )
  270. # --- 消息回调(ask 快速通道,upload 批处理)---
  271. async def on_message(msg: dict):
  272. nonlocal upload_timer
  273. sender = msg.get("sender")
  274. content = msg.get("content", "")
  275. # 忽略自己发的消息
  276. if sender == contact_id:
  277. return
  278. # 判断消息类型(根据前缀标记)
  279. if content.startswith("[ASK]"):
  280. # 快速通道:立即处理,不入队
  281. query = content[5:].strip() # 去掉 [ASK] 前缀
  282. logger.info(f"[KM] <- 收到查询消息: {sender} (快速通道)")
  283. asyncio.create_task(handle_ask_query(sender, query))
  284. elif content.startswith("[UPLOAD"):
  285. # 批处理:加入缓冲区,延迟处理
  286. logger.info(f"[KM] <- 收到上传消息: {sender} (加入批处理缓冲区)")
  287. upload_buffer.append(msg)
  288. # 启动或重置定时器
  289. if upload_timer:
  290. upload_timer.cancel()
  291. upload_timer = asyncio.create_task(process_upload_batch())
  292. else:
  293. # 其他消息:正常入队
  294. logger.info(f"[KM] <- 收到消息: {sender} (入队,队列长度: {message_queue.qsize() + 1})")
  295. await message_queue.put(msg)
  296. client.on_message(on_message, chat_id="*")
  297. # 启动消息处理器(后台任务)
  298. processor_task = asyncio.create_task(message_processor())
  299. # 启动 IM Client(事件驱动)
  300. logger.info("✅ Knowledge Manager 已启动(AgentRunner 驱动)")
  301. try:
  302. await client.run()
  303. finally:
  304. processor_task.cancel()
  305. try:
  306. await processor_task
  307. except asyncio.CancelledError:
  308. pass