knowledge_manager_v2.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. """
  2. Knowledge Manager V2 - 轻量级缓存 + 按需 Agent
  3. 架构:
  4. 1. 收到 upload 消息 → 直接调用 cache_research_data,不启动 Agent
  5. 2. 收到 query/organize 消息 → 启动 Agent 处理
  6. 3. 消息队列保证不丢消息
  7. 优势:
  8. - upload 操作轻量快速
  9. - 只在需要时才启动 Agent(查询、整理、入库)
  10. """
  11. import asyncio
  12. import json
  13. import logging
  14. import sys
  15. from pathlib import Path
  16. from typing import Optional
  17. # 确保项目路径可用
  18. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  19. from agent.core.runner import AgentRunner, RunConfig
  20. from agent.trace import FileSystemTraceStore, Trace, Message
  21. from agent.llm import create_qwen_llm_call
  22. from agent.llm.prompts import SimplePrompt
  23. from agent.tools.builtin.knowledge import KnowledgeConfig
  24. # 导入缓存工具
  25. from knowhub.internal_tools.cache_manager import cache_research_data
  26. logger = logging.getLogger("agents.knowledge_manager_v2")
  27. # Knowledge Manager Agent 配置
  28. KNOWLEDGE_MANAGER_CONFIG = RunConfig(
  29. model="qwen3.5-plus",
  30. temperature=0.2,
  31. max_iterations=50,
  32. agent_type="knowledge_manager",
  33. name="知识库管理",
  34. goal_compression="none",
  35. knowledge=KnowledgeConfig(
  36. enable_extraction=False,
  37. enable_completion_extraction=False,
  38. enable_injection=False,
  39. ),
  40. tools=[
  41. "knowledge_search",
  42. "knowledge_save",
  43. "knowledge_list",
  44. "knowledge_update",
  45. "resource_save",
  46. "resource_get",
  47. "read_file",
  48. "write_file",
  49. "cache_research_data",
  50. "organize_cached_data",
  51. "commit_to_database",
  52. "list_cache_status",
  53. ],
  54. )
  55. async def start_knowledge_manager_v2(
  56. contact_id: str = "knowledge_manager",
  57. server_url: str = "ws://localhost:58005",
  58. chat_id: str = "main"
  59. ):
  60. """
  61. 启动 Knowledge Manager V2(轻量级缓存 + 按需 Agent)
  62. 消息类型:
  63. 1. upload 消息 → 直接缓存,不启动 Agent
  64. 2. query/organize/commit 消息 → 启动 Agent 处理
  65. """
  66. logger.info(f"正在启动 Knowledge Manager V2...")
  67. logger.info(f" - Contact ID: {contact_id}")
  68. logger.info(f" - Server: {server_url}")
  69. # 导入 IM Client
  70. try:
  71. sys.path.insert(0, str(Path(__file__).parent.parent.parent / "im-client"))
  72. from client import IMClient
  73. except ImportError as e:
  74. logger.error(f"无法导入 IM Client: {e}")
  75. return
  76. # --- 初始化 AgentRunner ---
  77. store = FileSystemTraceStore(base_path=".trace")
  78. llm_call = create_qwen_llm_call(model=KNOWLEDGE_MANAGER_CONFIG.model)
  79. runner = AgentRunner(
  80. trace_store=store,
  81. llm_call=llm_call,
  82. debug=True
  83. )
  84. # 加载 system prompt
  85. prompt_path = Path(__file__).parent / "knowledge_manager.prompt"
  86. prompt = SimplePrompt(prompt_path)
  87. system_messages = prompt.build_messages()
  88. # Trace 状态
  89. current_trace_id = None
  90. message_queue = asyncio.Queue()
  91. # --- 初始化 IM Client ---
  92. client = IMClient(
  93. contact_id=contact_id,
  94. server_url=server_url,
  95. data_dir=str(Path.home() / ".knowhub" / "im_data")
  96. )
  97. client.open_window(chat_id=chat_id)
  98. # --- 消息分类处理器 ---
  99. async def message_processor():
  100. nonlocal current_trace_id
  101. while True:
  102. msg = await message_queue.get()
  103. sender = msg.get("sender")
  104. content = msg.get("content", "")
  105. logger.info(f"[KM] <- 收到消息: {sender}")
  106. logger.info(f"[KM] 内容: {content[:120]}{'...' if len(content) > 120 else ''}")
  107. try:
  108. # 判断消息类型
  109. is_upload = "增量上传" in content or "最终提交" in content
  110. if is_upload:
  111. # 直接缓存,不启动 Agent
  112. await handle_upload_message(content, sender, client, chat_id)
  113. else:
  114. # 启动 Agent 处理
  115. await handle_agent_message(
  116. content, sender, client, chat_id,
  117. runner, system_messages, current_trace_id
  118. )
  119. except Exception as e:
  120. logger.error(f"[KM] 处理失败: {e}", exc_info=True)
  121. # 回复错误
  122. client.send_message(
  123. chat_id=chat_id,
  124. receiver=sender,
  125. content=f"处理失败: {str(e)}"
  126. )
  127. message_queue.task_done()
  128. async def handle_upload_message(content: str, sender: str, client, chat_id: str):
  129. """处理 upload 消息:直接缓存,不启动 Agent"""
  130. logger.info(f"[KM] 处理 upload 消息(轻量级)")
  131. # 提取 JSON 数据
  132. try:
  133. # 找到 JSON 部分
  134. json_start = content.find("{")
  135. if json_start == -1:
  136. raise ValueError("消息中没有 JSON 数据")
  137. json_str = content[json_start:]
  138. data = json.loads(json_str)
  139. # 直接调用缓存工具
  140. result = await cache_research_data(data=data, source=sender)
  141. # 回复
  142. response = result.output if hasattr(result, 'output') else str(result)
  143. client.send_message(
  144. chat_id=chat_id,
  145. receiver=sender,
  146. content=response
  147. )
  148. logger.info(f"[KM] -> 已缓存并回复: {sender}")
  149. except Exception as e:
  150. logger.error(f"[KM] 缓存失败: {e}")
  151. client.send_message(
  152. chat_id=chat_id,
  153. receiver=sender,
  154. content=f"缓存失败: {str(e)}"
  155. )
  156. async def handle_agent_message(
  157. content: str, sender: str, client, chat_id: str,
  158. runner, system_messages, trace_id
  159. ):
  160. """处理需要 Agent 的消息:查询、整理、入库"""
  161. logger.info(f"[KM] 处理 Agent 消息(重量级)")
  162. nonlocal current_trace_id
  163. # 构造 user message
  164. if current_trace_id is None:
  165. messages = system_messages + [{"role": "user", "content": content}]
  166. else:
  167. messages = [{"role": "user", "content": content}]
  168. config = RunConfig(
  169. model=KNOWLEDGE_MANAGER_CONFIG.model,
  170. temperature=KNOWLEDGE_MANAGER_CONFIG.temperature,
  171. max_iterations=KNOWLEDGE_MANAGER_CONFIG.max_iterations,
  172. agent_type=KNOWLEDGE_MANAGER_CONFIG.agent_type,
  173. name=KNOWLEDGE_MANAGER_CONFIG.name,
  174. goal_compression=KNOWLEDGE_MANAGER_CONFIG.goal_compression,
  175. tools=KNOWLEDGE_MANAGER_CONFIG.tools,
  176. knowledge=KNOWLEDGE_MANAGER_CONFIG.knowledge,
  177. trace_id=current_trace_id,
  178. )
  179. # 执行 AgentRunner
  180. response_text = ""
  181. async for item in runner.run(messages=messages, config=config):
  182. if isinstance(item, Trace):
  183. current_trace_id = item.trace_id
  184. if item.status == "running":
  185. logger.info(f"[KM] Trace: {item.trace_id[:8]}...")
  186. elif item.status == "completed":
  187. logger.info(f"[KM] Trace 完成")
  188. elif item.status == "failed":
  189. logger.error(f"[KM] Trace 失败: {item.error_message}")
  190. elif isinstance(item, Message):
  191. if item.role == "assistant":
  192. msg_content = item.content
  193. if isinstance(msg_content, dict):
  194. text = msg_content.get("text", "")
  195. tool_calls = msg_content.get("tool_calls")
  196. if text and not tool_calls:
  197. response_text = text
  198. elif item.role == "tool":
  199. tool_content = item.content
  200. tool_name = tool_content.get("tool_name", "?") if isinstance(tool_content, dict) else "?"
  201. logger.info(f"[KM] 工具: {tool_name}")
  202. # 回复
  203. if response_text:
  204. client.send_message(
  205. chat_id=chat_id,
  206. receiver=sender,
  207. content=response_text
  208. )
  209. logger.info(f"[KM] -> 已回复: {sender}")
  210. else:
  211. logger.warning(f"[KM] AgentRunner 没有生成回复")
  212. # --- 消息回调(只入队)---
  213. async def on_message(msg: dict):
  214. sender = msg.get("sender")
  215. if sender == contact_id:
  216. return
  217. logger.info(f"[KM] <- 入队: {sender} (队列长度: {message_queue.qsize() + 1})")
  218. await message_queue.put(msg)
  219. client.on_message(on_message, chat_id="*")
  220. # 启动消息处理器
  221. processor_task = asyncio.create_task(message_processor())
  222. # 启动 IM Client
  223. logger.info("✅ Knowledge Manager V2 已启动(轻量级缓存 + 按需 Agent)")
  224. try:
  225. await client.run()
  226. finally:
  227. processor_task.cancel()
  228. try:
  229. await processor_task
  230. except asyncio.CancelledError:
  231. pass
# Script entry point: run the manager with its default arguments.
if __name__ == "__main__":
    asyncio.run(start_knowledge_manager_v2())