"""
Knowledge Manager V2 - lightweight cache + on-demand Agent.

Architecture:
1. An upload message is handed straight to cache_research_data; no Agent
   is started.
2. A query/organize message starts an Agent to handle it.
3. Incoming messages go through a queue so that none are dropped.

Advantages:
- Upload handling is lightweight and fast.
- The Agent is only started when it is needed (query, organize, commit).
"""

import asyncio
import json
import logging
import sys
from pathlib import Path
from typing import Optional

# Make sure the project root is importable.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from agent.core.runner import AgentRunner, RunConfig
from agent.trace import FileSystemTraceStore, Trace, Message
from agent.llm import create_qwen_llm_call
from agent.llm.prompts import SimplePrompt
from agent.tools.builtin.knowledge import KnowledgeConfig

# Cache tool that upload messages are routed to directly.
from knowhub.internal_tools.cache_manager import cache_research_data

logger = logging.getLogger("agents.knowledge_manager_v2")

# Run configuration used for every Knowledge Manager Agent invocation.
KNOWLEDGE_MANAGER_CONFIG = RunConfig(
    model="qwen3.5-plus",
    temperature=0.2,
    max_iterations=50,
    agent_type="knowledge_manager",
    name="知识库管理",
    goal_compression="none",
    knowledge=KnowledgeConfig(
        enable_extraction=False,
        enable_completion_extraction=False,
        enable_injection=False,
    ),
    tools=[
        "knowledge_search",
        "knowledge_save",
        "knowledge_list",
        "knowledge_update",
        "resource_save",
        "resource_get",
        "read_file",
        "write_file",
        "cache_research_data",
        "organize_cached_data",
        "commit_to_database",
        "list_cache_status",
    ],
)


def _extract_json_payload(content: str) -> dict:
    """Decode the first JSON object embedded in *content*.

    Parsing starts at the first ``{`` and stops at the end of that JSON
    object, so any text that follows the object is ignored instead of
    triggering an "Extra data" error.

    Raises:
        ValueError: if *content* contains no ``{`` at all, or if the text
            starting at the first ``{`` is not valid JSON
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    json_start = content.find("{")
    if json_start == -1:
        raise ValueError("消息中没有 JSON 数据")
    data, _end = json.JSONDecoder().raw_decode(content[json_start:])
    return data


async def start_knowledge_manager_v2(
    contact_id: str = "knowledge_manager",
    server_url: str = "ws://43.106.118.91:8105",
    chat_id: str = "main"
):
    """
    Start Knowledge Manager V2 (lightweight cache + on-demand Agent).

    Message routing:
    1. Upload messages (content contains "增量上传" or "最终提交") are
       cached directly without starting an Agent.
    2. Every other message (query/organize/commit) is handled by an Agent.

    Args:
        contact_id: Identity this manager uses on the IM server; messages
            whose sender equals it are ignored.
        server_url: URL passed to the IM client.
        chat_id: Chat window to open and to send replies to.

    Returns:
        None. Returns early (after logging an error) when the IM client
        module cannot be imported; otherwise runs until ``client.run()``
        finishes.
    """
    logger.info("正在启动 Knowledge Manager V2...")
    logger.info(f" - Contact ID: {contact_id}")
    logger.info(f" - Server: {server_url}")

    # The IM client lives in a sibling "im-client" directory, so its path is
    # added to sys.path before importing it.
    try:
        sys.path.insert(0, str(Path(__file__).parent.parent.parent / "im-client"))
        from client import IMClient
    except ImportError as e:
        logger.error(f"无法导入 IM Client: {e}")
        return

    # --- Initialise the AgentRunner ---
    store = FileSystemTraceStore(base_path=".trace")
    llm_call = create_qwen_llm_call(model=KNOWLEDGE_MANAGER_CONFIG.model)
    runner = AgentRunner(
        trace_store=store,
        llm_call=llm_call,
        debug=True
    )

    # Load the system prompt that sits next to this file.
    prompt_path = Path(__file__).parent / "knowledge_manager.prompt"
    prompt = SimplePrompt(prompt_path)
    system_messages = prompt.build_messages()

    # Trace state shared by the nested handlers: the id of the most recent
    # trace (None until the first Agent run) and the inbound message queue.
    current_trace_id = None
    message_queue = asyncio.Queue()

    # --- Initialise the IM client ---
    client = IMClient(
        contact_id=contact_id,
        server_url=server_url,
        data_dir=str(Path.home() / ".knowhub" / "im_data")
    )
    client.open_window(chat_id=chat_id)

    # --- Message classification / dispatch loop ---
    async def message_processor():
        """Consume queued messages forever, routing each to its handler.

        Everything that touches the message is inside the ``try`` so that a
        malformed message cannot terminate this loop, and ``task_done()`` is
        in ``finally`` so the queue's unfinished-task counter stays balanced
        even when handling (or the error reply itself) fails.
        """
        while True:
            msg = await message_queue.get()
            sender = None
            try:
                sender = msg.get("sender")
                # "or ''" also covers an explicit None value for the key.
                content = msg.get("content") or ""

                preview_suffix = "..." if len(content) > 120 else ""
                logger.info(f"[KM] <- 收到消息: {sender}")
                logger.info(f"[KM] 内容: {content[:120]}{preview_suffix}")

                # Classify the message by its marker phrases.
                is_upload = "增量上传" in content or "最终提交" in content

                if is_upload:
                    # Cache directly; no Agent is started.
                    await handle_upload_message(content, sender, client, chat_id)
                else:
                    # Everything else is handled by an Agent run.
                    await handle_agent_message(
                        content, sender, client, chat_id,
                        runner, system_messages, current_trace_id
                    )
            except Exception as e:
                logger.error(f"[KM] 处理失败: {e}", exc_info=True)
                # Report the failure to the sender; a failure to send that
                # report must not take the processor down.
                try:
                    client.send_message(
                        chat_id=chat_id,
                        receiver=sender,
                        content=f"处理失败: {e}"
                    )
                except Exception:
                    logger.error("[KM] 发送错误回复失败", exc_info=True)
            finally:
                message_queue.task_done()

    async def handle_upload_message(content: str, sender: str, client, chat_id: str):
        """Handle an upload message: cache its JSON payload, no Agent involved.

        The payload is the first JSON object found in *content*. The result
        of ``cache_research_data`` (its ``output`` attribute when present,
        otherwise ``str(result)``) is sent back to *sender*. Any failure is
        logged and reported to *sender* as "缓存失败: ...".
        """
        logger.info("[KM] 处理 upload 消息(轻量级)")

        try:
            data = _extract_json_payload(content)

            # Call the cache tool directly.
            result = await cache_research_data(data=data, source=sender)

            response = result.output if hasattr(result, "output") else str(result)
            # NOTE(review): send_message is called without await, as in the
            # rest of this file — presumably synchronous; confirm.
            client.send_message(
                chat_id=chat_id,
                receiver=sender,
                content=response
            )
            logger.info(f"[KM] -> 已缓存并回复: {sender}")

        except Exception as e:
            logger.error(f"[KM] 缓存失败: {e}", exc_info=True)
            client.send_message(
                chat_id=chat_id,
                receiver=sender,
                content=f"缓存失败: {e}"
            )

    async def handle_agent_message(
        content: str,
        sender: str,
        client,
        chat_id: str,
        runner,
        system_messages,
        trace_id
    ):
        """Handle a message that needs an Agent (query, organize, commit).

        Runs the AgentRunner on *content* and sends the last assistant text
        that carried no tool calls back to *sender*. The system prompt is
        only prepended on the first run; later runs continue the trace whose
        id is stored in the enclosing ``current_trace_id``.

        The *trace_id* parameter is not read; the trace id is taken from
        (and written back to) the enclosing ``current_trace_id``.
        """
        logger.info("[KM] 处理 Agent 消息(重量级)")

        nonlocal current_trace_id

        # Build the input messages for this run.
        user_message = {"role": "user", "content": content}
        if current_trace_id is None:
            messages = system_messages + [user_message]
        else:
            messages = [user_message]

        config = RunConfig(
            model=KNOWLEDGE_MANAGER_CONFIG.model,
            temperature=KNOWLEDGE_MANAGER_CONFIG.temperature,
            max_iterations=KNOWLEDGE_MANAGER_CONFIG.max_iterations,
            agent_type=KNOWLEDGE_MANAGER_CONFIG.agent_type,
            name=KNOWLEDGE_MANAGER_CONFIG.name,
            goal_compression=KNOWLEDGE_MANAGER_CONFIG.goal_compression,
            tools=KNOWLEDGE_MANAGER_CONFIG.tools,
            knowledge=KNOWLEDGE_MANAGER_CONFIG.knowledge,
            trace_id=current_trace_id,
        )

        # Run the Agent and keep the latest plain-text assistant answer.
        response_text = ""
        async for item in runner.run(messages=messages, config=config):
            if isinstance(item, Trace):
                # Remember the trace so the next message continues it.
                current_trace_id = item.trace_id
                if item.status == "running":
                    logger.info(f"[KM] Trace: {item.trace_id[:8]}...")
                elif item.status == "completed":
                    logger.info("[KM] Trace 完成")
                elif item.status == "failed":
                    logger.error(f"[KM] Trace 失败: {item.error_message}")
            elif isinstance(item, Message):
                if item.role == "assistant":
                    msg_content = item.content
                    # Only dict-shaped content is inspected; other shapes
                    # are ignored, as before.
                    if isinstance(msg_content, dict):
                        text = msg_content.get("text", "")
                        tool_calls = msg_content.get("tool_calls")
                        if text and not tool_calls:
                            response_text = text
                elif item.role == "tool":
                    tool_content = item.content
                    if isinstance(tool_content, dict):
                        tool_name = tool_content.get("tool_name", "?")
                    else:
                        tool_name = "?"
                    logger.info(f"[KM] 工具: {tool_name}")

        # Reply to the sender, if the run produced an answer.
        if response_text:
            client.send_message(
                chat_id=chat_id,
                receiver=sender,
                content=response_text
            )
            logger.info(f"[KM] -> 已回复: {sender}")
        else:
            logger.warning("[KM] AgentRunner 没有生成回复")

    # --- Message callback (enqueue only) ---
    async def on_message(msg: dict):
        """Enqueue an incoming message, skipping the manager's own messages."""
        sender = msg.get("sender")
        if sender == contact_id:
            return

        logger.info(f"[KM] <- 入队: {sender} (队列长度: {message_queue.qsize() + 1})")
        await message_queue.put(msg)

    client.on_message(on_message, chat_id="*")

    # Start the background processor that drains the queue.
    processor_task = asyncio.create_task(message_processor())

    # Run the IM client until it stops, then shut the processor down.
    logger.info("✅ Knowledge Manager V2 已启动(轻量级缓存 + 按需 Agent)")
    try:
        await client.run()
    finally:
        processor_task.cancel()
        try:
            await processor_task
        except asyncio.CancelledError:
            pass


if __name__ == "__main__":
    asyncio.run(start_knowledge_manager_v2())