knowledge_manager.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. """
  2. Knowledge Manager Agent
  3. 基于 AgentRunner 驱动,有完整的 trace 记录。
  4. 通过 IM Client 事件驱动,收到消息时传给 AgentRunner 处理。
  5. 架构:
  6. IM 消息 → user message → AgentRunner → LLM 自主决策 → 工具调用 → trace 记录
  7. """
  8. import asyncio
  9. import json
  10. import logging
  11. import sys
  12. from pathlib import Path
  13. from typing import Optional
  14. # 确保项目路径可用
  15. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  16. from agent.core.runner import AgentRunner, RunConfig
  17. from agent.trace import FileSystemTraceStore, Trace, Message
  18. from agent.llm import create_qwen_llm_call
  19. from agent.llm.prompts import SimplePrompt
  20. from agent.tools.builtin.knowledge import KnowledgeConfig
logger = logging.getLogger("agents.knowledge_manager")

# ===== Configuration =====
# Default for whether the Knowledge Manager may commit to the database.
# False = cache only (the commit_to_database tool is not offered to the LLM);
# True = commit_to_database is added to the agent's tool list.
# Used as the default for get_knowledge_manager_config() and start_knowledge_manager().
ENABLE_DATABASE_COMMIT = False
  24. # Knowledge Manager Agent 配置
  25. def get_knowledge_manager_config(enable_db_commit: bool = ENABLE_DATABASE_COMMIT) -> RunConfig:
  26. """获取 Knowledge Manager 配置(根据是否允许入库动态调整工具列表)"""
  27. tools = [
  28. "knowledge_search",
  29. "knowledge_save",
  30. "knowledge_list",
  31. "knowledge_update",
  32. "resource_save",
  33. "resource_get",
  34. "read_file",
  35. "write_file",
  36. # 本地缓存工具
  37. "cache_research_data",
  38. "organize_cached_data",
  39. "list_cache_status",
  40. ]
  41. # 只有启用入库时才开放 commit_to_database 工具
  42. if enable_db_commit:
  43. tools.append("commit_to_database")
  44. return RunConfig(
  45. model="qwen3.5-plus",
  46. temperature=0.2,
  47. max_iterations=50,
  48. agent_type="knowledge_manager",
  49. name="知识库管理",
  50. goal_compression="none",
  51. # 禁用所有知识提取和反思
  52. knowledge=KnowledgeConfig(
  53. enable_extraction=False,
  54. enable_completion_extraction=False,
  55. enable_injection=False,
  56. ),
  57. tools=tools,
  58. )
  59. async def start_knowledge_manager(
  60. contact_id: str = "knowledge_manager",
  61. server_url: str = "ws://43.106.118.91:8105",
  62. chat_id: str = "main",
  63. enable_db_commit: bool = ENABLE_DATABASE_COMMIT
  64. ):
  65. """
  66. 启动 Knowledge Manager(AgentRunner 驱动 + 事件驱动)
  67. 收到 IM 消息时,将消息作为 user message 传给 AgentRunner。
  68. LLM 自主决策调用什么工具,所有操作记录到 trace。
  69. Args:
  70. contact_id: IM 身份 ID
  71. server_url: IM Server 地址
  72. chat_id: 聊天窗口 ID
  73. enable_db_commit: 是否允许入库(False=只缓存,True=可入库)
  74. """
  75. logger.info(f"正在启动 Knowledge Manager...")
  76. logger.info(f" - Contact ID: {contact_id}")
  77. logger.info(f" - Server: {server_url}")
  78. logger.info(f" - 入库功能: {'启用' if enable_db_commit else '禁用(仅缓存)'}")
  79. # 注册内部工具(缓存管理)
  80. try:
  81. sys.path.insert(0, str(Path(__file__).parent.parent))
  82. from internal_tools.cache_manager import (
  83. cache_research_data,
  84. organize_cached_data,
  85. commit_to_database,
  86. list_cache_status,
  87. )
  88. from agent.tools import get_tool_registry
  89. registry = get_tool_registry()
  90. registry.register(cache_research_data)
  91. registry.register(organize_cached_data)
  92. registry.register(commit_to_database)
  93. registry.register(list_cache_status)
  94. logger.info(" ✓ 已注册缓存管理工具")
  95. except Exception as e:
  96. logger.error(f" ✗ 注册缓存工具失败: {e}")
  97. # 导入 IM Client
  98. try:
  99. sys.path.insert(0, str(Path(__file__).parent.parent.parent / "im-client"))
  100. from client import IMClient
  101. except ImportError as e:
  102. logger.error(f"无法导入 IM Client: {e}")
  103. return
  104. # --- 初始化 AgentRunner ---
  105. km_config = get_knowledge_manager_config(enable_db_commit)
  106. store = FileSystemTraceStore(base_path=".trace")
  107. llm_call = create_qwen_llm_call(model=km_config.model)
  108. runner = AgentRunner(
  109. trace_store=store,
  110. llm_call=llm_call,
  111. debug=True,
  112. logger_name="agents.knowledge_manager"
  113. )
  114. # 加载 system prompt
  115. prompt_path = Path(__file__).parent / "knowledge_manager.prompt"
  116. prompt = SimplePrompt(prompt_path)
  117. system_messages = prompt.build_messages()
  118. # Trace 状态(同一个 trace 持续追加,保持上下文)
  119. current_trace_id = None
  120. message_queue = asyncio.Queue() # 消息队列,防止丢消息
  121. upload_buffer = [] # upload 消息缓冲区(用于批处理)
  122. upload_timer = None # 延迟处理定时器
  123. # --- 初始化 IM Client ---
  124. client = IMClient(
  125. contact_id=contact_id,
  126. server_url=server_url,
  127. data_dir=str(Path.home() / ".knowhub" / "im_data")
  128. )
  129. # 静默 notifier(消息由 on_message 回调处理,不需要 notifier 通知)
  130. class _SilentNotifier:
  131. async def notify(self, count, from_contacts):
  132. pass
  133. # 打开窗口
  134. client.open_window(chat_id=chat_id, notifier=_SilentNotifier())
  135. # --- 消息处理器(从队列中取消息,逐条处理)---
  136. async def message_processor():
  137. nonlocal current_trace_id
  138. while True:
  139. msg = await message_queue.get() # 阻塞等待,零消耗
  140. sender = msg.get("sender")
  141. content = msg.get("content", "")
  142. # 处理完消息后清空 pending,防止 notifier 反复通知
  143. client.read_pending(chat_id)
  144. logger.info(f"[KM] <- 处理消息: {sender}")
  145. logger.info(f"[KM] 内容: {content[:120]}{'...' if len(content) > 120 else ''}")
  146. try:
  147. # 续跑同一个 trace(首次为 None 会新建,后续复用)
  148. if current_trace_id is None:
  149. messages = system_messages + [{"role": "user", "content": content}]
  150. else:
  151. messages = [{"role": "user", "content": content}]
  152. # 获取配置
  153. km_config = get_knowledge_manager_config(enable_db_commit)
  154. config = RunConfig(
  155. model=km_config.model,
  156. temperature=km_config.temperature,
  157. max_iterations=km_config.max_iterations,
  158. agent_type=km_config.agent_type,
  159. name=km_config.name,
  160. goal_compression=km_config.goal_compression,
  161. tools=km_config.tools,
  162. knowledge=km_config.knowledge,
  163. trace_id=current_trace_id, # 复用 trace,保持完整生命周期
  164. context={
  165. "km_queue_size": message_queue.qsize(), # 当前队列中待处理消息数
  166. "current_sender": sender, # 当前处理的消息来源
  167. }
  168. )
  169. # 执行 AgentRunner
  170. response_text = ""
  171. async for item in runner.run(messages=messages, config=config):
  172. if isinstance(item, Trace):
  173. current_trace_id = item.trace_id
  174. if item.status == "running":
  175. logger.info(f"[KM] Trace: {item.trace_id[:8]}...")
  176. elif item.status == "completed":
  177. logger.info(f"[KM] Trace 完成 (消息: {item.total_messages})")
  178. elif item.status == "failed":
  179. logger.error(f"[KM] Trace 失败: {item.error_message}")
  180. elif isinstance(item, Message):
  181. if item.role == "assistant":
  182. msg_content = item.content
  183. if isinstance(msg_content, dict):
  184. text = msg_content.get("text", "")
  185. tool_calls = msg_content.get("tool_calls")
  186. if text:
  187. # 始终记录最新的文本(最后一条就是最终回复)
  188. response_text = text
  189. if tool_calls:
  190. logger.info(f"[KM] 思考: {text[:80]}...")
  191. elif isinstance(msg_content, str) and msg_content:
  192. response_text = msg_content
  193. elif item.role == "tool":
  194. tool_content = item.content
  195. tool_name = tool_content.get("tool_name", "?") if isinstance(tool_content, dict) else "?"
  196. logger.info(f"[KM] 工具: {tool_name}")
  197. # 回复
  198. if response_text:
  199. client.send_message(
  200. chat_id=chat_id,
  201. receiver=sender,
  202. content=response_text
  203. )
  204. logger.info(f"[KM] -> 已回复: {sender} ({len(response_text)} 字符)")
  205. else:
  206. logger.warning(f"[KM] AgentRunner 没有生成回复")
  207. except Exception as e:
  208. logger.error(f"[KM] 处理失败: {e}", exc_info=True)
  209. message_queue.task_done()
  210. # --- 批处理 upload 消息 ---
  211. async def process_upload_batch():
  212. """批处理 upload 消息(延迟 5 秒,合并多个 upload)"""
  213. nonlocal upload_buffer, upload_timer
  214. await asyncio.sleep(5) # 等待 5 秒,收集更多 upload
  215. if not upload_buffer:
  216. upload_timer = None
  217. return
  218. # 合并所有 upload 消息
  219. batch = upload_buffer.copy()
  220. upload_buffer.clear()
  221. upload_timer = None
  222. logger.info(f"[KM] 批处理 {len(batch)} 条 upload 消息")
  223. # 合并成一条消息入队
  224. merged_content = f"[UPLOAD:BATCH] 收到 {len(batch)} 条上传请求,请合并处理:\n"
  225. for i, msg in enumerate(batch, 1):
  226. merged_content += f"\n--- 第 {i} 条 ---\n{msg['content']}\n"
  227. await message_queue.put({
  228. "sender": batch[0]["sender"],
  229. "content": merged_content
  230. })
  231. # --- 快速处理 ask 查询(不经过队列)---
  232. async def handle_ask_query(sender: str, content: str):
  233. """快速响应 ask 查询,不阻塞 upload 处理"""
  234. try:
  235. logger.info(f"[KM] <- 快速查询: {sender}")
  236. # 1. 查询数据库
  237. from agent.tools.builtin.knowledge import knowledge_search
  238. db_result = await knowledge_search(query=content, top_k=5, min_score=3)
  239. # 2. 读取缓存(正在处理的 upload 数据)
  240. from knowhub.internal_tools.cache_manager import list_cache_status
  241. cache_status = await list_cache_status()
  242. # 3. 组合回复
  243. response_parts = []
  244. if db_result.output and db_result.output != "未找到相关知识":
  245. response_parts.append("## 数据库中的知识\n\n" + db_result.output)
  246. if cache_status.metadata and cache_status.metadata.get("files"):
  247. cache_files = cache_status.metadata["files"]
  248. if cache_files:
  249. response_parts.append(
  250. f"## 缓存中的数据\n\n"
  251. f"正在处理 {len(cache_files)} 个缓存文件,包含最新调研数据(尚未入库)。\n"
  252. f"如需查看详情,请稍后再次查询。"
  253. )
  254. if not response_parts:
  255. response_parts.append("暂无相关知识,数据库和缓存均为空。")
  256. response_text = "\n\n".join(response_parts)
  257. # 4. 立即回复
  258. client.send_message(
  259. chat_id=chat_id,
  260. receiver=sender,
  261. content=response_text
  262. )
  263. logger.info(f"[KM] -> 快速回复: {sender} ({len(response_text)} 字符)")
  264. except Exception as e:
  265. logger.error(f"[KM] 快速查询失败: {e}", exc_info=True)
  266. client.send_message(
  267. chat_id=chat_id,
  268. receiver=sender,
  269. content=f"查询失败: {e}"
  270. )
  271. # --- 消息回调(ask 快速通道,upload 批处理)---
  272. async def on_message(msg: dict):
  273. nonlocal upload_timer
  274. sender = msg.get("sender")
  275. content = msg.get("content", "")
  276. # 忽略自己发的消息
  277. if sender == contact_id:
  278. return
  279. # 判断消息类型(根据前缀标记)
  280. if content.startswith("[ASK]"):
  281. # 快速通道:立即处理,不入队
  282. query = content[5:].strip() # 去掉 [ASK] 前缀
  283. logger.info(f"[KM] <- 收到查询消息: {sender} (快速通道)")
  284. asyncio.create_task(handle_ask_query(sender, query))
  285. elif content.startswith("[UPLOAD"):
  286. # 批处理:加入缓冲区,延迟处理
  287. logger.info(f"[KM] <- 收到上传消息: {sender} (加入批处理缓冲区)")
  288. upload_buffer.append(msg)
  289. # 启动或重置定时器
  290. if upload_timer:
  291. upload_timer.cancel()
  292. upload_timer = asyncio.create_task(process_upload_batch())
  293. else:
  294. # 其他消息:正常入队
  295. logger.info(f"[KM] <- 收到消息: {sender} (入队,队列长度: {message_queue.qsize() + 1})")
  296. await message_queue.put(msg)
  297. client.on_message(on_message, chat_id="*")
  298. # 启动消息处理器(后台任务)
  299. processor_task = asyncio.create_task(message_processor())
  300. # 启动 IM Client(事件驱动)
  301. logger.info("✅ Knowledge Manager 已启动(AgentRunner 驱动)")
  302. try:
  303. await client.run()
  304. finally:
  305. processor_task.cancel()
  306. try:
  307. await processor_task
  308. except asyncio.CancelledError:
  309. pass