""" Knowledge Manager Agent 基于 AgentRunner 驱动,有完整的 trace 记录。 通过 IM Client 事件驱动,收到消息时传给 AgentRunner 处理。 架构: IM 消息 → user message → AgentRunner → LLM 自主决策 → 工具调用 → trace 记录 """ import asyncio import json import logging import sys from pathlib import Path from typing import Optional # 确保项目路径可用 sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from agent.core.runner import AgentRunner, RunConfig from agent.trace import FileSystemTraceStore, Trace, Message from agent.llm import create_qwen_llm_call from agent.llm.prompts import SimplePrompt from agent.tools.builtin.knowledge import KnowledgeConfig logger = logging.getLogger("agents.knowledge_manager") # ===== 配置项 ===== ENABLE_DATABASE_COMMIT = False # 是否允许入库(False=只缓存,True=可入库) # Knowledge Manager Agent 配置 def get_knowledge_manager_config(enable_db_commit: bool = ENABLE_DATABASE_COMMIT) -> RunConfig: """获取 Knowledge Manager 配置(根据是否允许入库动态调整工具列表)""" tools = [ "knowledge_search", "knowledge_save", "knowledge_list", "knowledge_update", "resource_save", "resource_get", "read_file", "write_file", # 本地缓存工具 "cache_research_data", "organize_cached_data", "list_cache_status", ] # 只有启用入库时才开放 commit_to_database 工具 if enable_db_commit: tools.append("commit_to_database") return RunConfig( model="qwen3.5-plus", temperature=0.2, max_iterations=50, agent_type="knowledge_manager", name="知识库管理", goal_compression="none", # 禁用所有知识提取和反思 knowledge=KnowledgeConfig( enable_extraction=False, enable_completion_extraction=False, enable_injection=False, ), tools=tools, ) async def start_knowledge_manager( contact_id: str = "knowledge_manager", server_url: str = "ws://43.106.118.91:8105", chat_id: str = "main", enable_db_commit: bool = ENABLE_DATABASE_COMMIT ): """ 启动 Knowledge Manager(AgentRunner 驱动 + 事件驱动) 收到 IM 消息时,将消息作为 user message 传给 AgentRunner。 LLM 自主决策调用什么工具,所有操作记录到 trace。 Args: contact_id: IM 身份 ID server_url: IM Server 地址 chat_id: 聊天窗口 ID enable_db_commit: 是否允许入库(False=只缓存,True=可入库) """ logger.info(f"正在启动 Knowledge Manager...") logger.info(f" - Contact ID: {contact_id}") logger.info(f" - Server: {server_url}") logger.info(f" - 入库功能: {'启用' if enable_db_commit else '禁用(仅缓存)'}") # 注册内部工具(缓存管理) try: sys.path.insert(0, str(Path(__file__).parent.parent)) from internal_tools.cache_manager import ( cache_research_data, organize_cached_data, commit_to_database, list_cache_status, ) from agent.tools import get_tool_registry registry = get_tool_registry() registry.register(cache_research_data) registry.register(organize_cached_data) registry.register(commit_to_database) registry.register(list_cache_status) logger.info(" ✓ 已注册缓存管理工具") except Exception as e: logger.error(f" ✗ 注册缓存工具失败: {e}") # 导入 IM Client try: sys.path.insert(0, str(Path(__file__).parent.parent.parent / "im-client")) from client import IMClient except ImportError as e: logger.error(f"无法导入 IM Client: {e}") return # --- 初始化 AgentRunner --- km_config = get_knowledge_manager_config(enable_db_commit) store = FileSystemTraceStore(base_path=".trace") llm_call = create_qwen_llm_call(model=km_config.model) runner = AgentRunner( trace_store=store, llm_call=llm_call, debug=True, logger_name="agents.knowledge_manager" ) # 加载 system prompt prompt_path = Path(__file__).parent / "knowledge_manager.prompt" prompt = SimplePrompt(prompt_path) system_messages = prompt.build_messages() # Trace 状态(同一个 trace 持续追加,保持上下文) current_trace_id = None message_queue = asyncio.Queue() # 消息队列,防止丢消息 upload_buffer = [] # upload 消息缓冲区(用于批处理) upload_timer = None # 延迟处理定时器 # --- 初始化 IM Client --- client = IMClient( contact_id=contact_id, server_url=server_url, data_dir=str(Path.home() / ".knowhub" / "im_data") ) # 静默 notifier(消息由 on_message 回调处理,不需要 notifier 通知) class _SilentNotifier: async def notify(self, count, from_contacts): pass # 打开窗口 client.open_window(chat_id=chat_id, notifier=_SilentNotifier()) # --- 消息处理器(从队列中取消息,逐条处理)--- async def message_processor(): nonlocal current_trace_id while True: msg = await message_queue.get() # 阻塞等待,零消耗 sender = msg.get("sender") content = msg.get("content", "") # 处理完消息后清空 pending,防止 notifier 反复通知 client.read_pending(chat_id) logger.info(f"[KM] <- 处理消息: {sender}") logger.info(f"[KM] 内容: {content[:120]}{'...' if len(content) > 120 else ''}") try: # 续跑同一个 trace(首次为 None 会新建,后续复用) if current_trace_id is None: messages = system_messages + [{"role": "user", "content": content}] else: messages = [{"role": "user", "content": content}] # 获取配置 km_config = get_knowledge_manager_config(enable_db_commit) config = RunConfig( model=km_config.model, temperature=km_config.temperature, max_iterations=km_config.max_iterations, agent_type=km_config.agent_type, name=km_config.name, goal_compression=km_config.goal_compression, tools=km_config.tools, knowledge=km_config.knowledge, trace_id=current_trace_id, # 复用 trace,保持完整生命周期 context={ "km_queue_size": message_queue.qsize(), # 当前队列中待处理消息数 "current_sender": sender, # 当前处理的消息来源 } ) # 执行 AgentRunner response_text = "" async for item in runner.run(messages=messages, config=config): if isinstance(item, Trace): current_trace_id = item.trace_id if item.status == "running": logger.info(f"[KM] Trace: {item.trace_id[:8]}...") elif item.status == "completed": logger.info(f"[KM] Trace 完成 (消息: {item.total_messages})") elif item.status == "failed": logger.error(f"[KM] Trace 失败: {item.error_message}") elif isinstance(item, Message): if item.role == "assistant": msg_content = item.content if isinstance(msg_content, dict): text = msg_content.get("text", "") tool_calls = msg_content.get("tool_calls") if text: # 始终记录最新的文本(最后一条就是最终回复) response_text = text if tool_calls: logger.info(f"[KM] 思考: {text[:80]}...") elif isinstance(msg_content, str) and msg_content: response_text = msg_content elif item.role == "tool": tool_content = item.content tool_name = tool_content.get("tool_name", "?") if isinstance(tool_content, dict) else "?" logger.info(f"[KM] 工具: {tool_name}") # 回复 if response_text: client.send_message( chat_id=chat_id, receiver=sender, content=response_text ) logger.info(f"[KM] -> 已回复: {sender} ({len(response_text)} 字符)") else: logger.warning(f"[KM] AgentRunner 没有生成回复") except Exception as e: logger.error(f"[KM] 处理失败: {e}", exc_info=True) message_queue.task_done() # --- 批处理 upload 消息 --- async def process_upload_batch(): """批处理 upload 消息(延迟 5 秒,合并多个 upload)""" nonlocal upload_buffer, upload_timer await asyncio.sleep(5) # 等待 5 秒,收集更多 upload if not upload_buffer: upload_timer = None return # 合并所有 upload 消息 batch = upload_buffer.copy() upload_buffer.clear() upload_timer = None logger.info(f"[KM] 批处理 {len(batch)} 条 upload 消息") # 合并成一条消息入队 merged_content = f"[UPLOAD:BATCH] 收到 {len(batch)} 条上传请求,请合并处理:\n" for i, msg in enumerate(batch, 1): merged_content += f"\n--- 第 {i} 条 ---\n{msg['content']}\n" await message_queue.put({ "sender": batch[0]["sender"], "content": merged_content }) # --- 快速处理 ask 查询(不经过队列)--- async def handle_ask_query(sender: str, content: str): """快速响应 ask 查询,不阻塞 upload 处理""" try: logger.info(f"[KM] <- 快速查询: {sender}") # 1. 查询数据库 from agent.tools.builtin.knowledge import knowledge_search db_result = await knowledge_search(query=content, top_k=5, min_score=3) # 2. 读取缓存(正在处理的 upload 数据) from knowhub.internal_tools.cache_manager import list_cache_status cache_status = await list_cache_status() # 3. 组合回复 response_parts = [] if db_result.output and db_result.output != "未找到相关知识": response_parts.append("## 数据库中的知识\n\n" + db_result.output) if cache_status.metadata and cache_status.metadata.get("files"): cache_files = cache_status.metadata["files"] if cache_files: response_parts.append( f"## 缓存中的数据\n\n" f"正在处理 {len(cache_files)} 个缓存文件,包含最新调研数据(尚未入库)。\n" f"如需查看详情,请稍后再次查询。" ) if not response_parts: response_parts.append("暂无相关知识,数据库和缓存均为空。") response_text = "\n\n".join(response_parts) # 4. 立即回复 client.send_message( chat_id=chat_id, receiver=sender, content=response_text ) logger.info(f"[KM] -> 快速回复: {sender} ({len(response_text)} 字符)") except Exception as e: logger.error(f"[KM] 快速查询失败: {e}", exc_info=True) client.send_message( chat_id=chat_id, receiver=sender, content=f"查询失败: {e}" ) # --- 消息回调(ask 快速通道,upload 批处理)--- async def on_message(msg: dict): nonlocal upload_timer sender = msg.get("sender") content = msg.get("content", "") # 忽略自己发的消息 if sender == contact_id: return # 判断消息类型(根据前缀标记) if content.startswith("[ASK]"): # 快速通道:立即处理,不入队 query = content[5:].strip() # 去掉 [ASK] 前缀 logger.info(f"[KM] <- 收到查询消息: {sender} (快速通道)") asyncio.create_task(handle_ask_query(sender, query)) elif content.startswith("[UPLOAD"): # 批处理:加入缓冲区,延迟处理 logger.info(f"[KM] <- 收到上传消息: {sender} (加入批处理缓冲区)") upload_buffer.append(msg) # 启动或重置定时器 if upload_timer: upload_timer.cancel() upload_timer = asyncio.create_task(process_upload_batch()) else: # 其他消息:正常入队 logger.info(f"[KM] <- 收到消息: {sender} (入队,队列长度: {message_queue.qsize() + 1})") await message_queue.put(msg) client.on_message(on_message, chat_id="*") # 启动消息处理器(后台任务) processor_task = asyncio.create_task(message_processor()) # 启动 IM Client(事件驱动) logger.info("✅ Knowledge Manager 已启动(AgentRunner 驱动)") try: await client.run() finally: processor_task.cancel() try: await processor_task except asyncio.CancelledError: pass