| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371 |
- """
- Knowledge Manager Agent
- 基于 AgentRunner 驱动,有完整的 trace 记录。
- 通过 IM Client 事件驱动,收到消息时传给 AgentRunner 处理。
- 架构:
- IM 消息 → user message → AgentRunner → LLM 自主决策 → 工具调用 → trace 记录
- """
- import asyncio
- import json
- import logging
- import sys
- from pathlib import Path
- from typing import Optional
- # 确保项目路径可用
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from agent.core.runner import AgentRunner, RunConfig
- from agent.trace import FileSystemTraceStore, Trace, Message
- from agent.llm import create_qwen_llm_call
- from agent.llm.prompts import SimplePrompt
- from agent.tools.builtin.knowledge import KnowledgeConfig
- logger = logging.getLogger("agents.knowledge_manager")
- # ===== 配置项 =====
- ENABLE_DATABASE_COMMIT = False # 是否允许入库(False=只缓存,True=可入库)
- # Knowledge Manager Agent 配置
def get_knowledge_manager_config(enable_db_commit: bool = ENABLE_DATABASE_COMMIT) -> RunConfig:
    """Build the RunConfig used by the Knowledge Manager agent.

    The tool list depends on whether database commits are allowed.

    Args:
        enable_db_commit: When true, ``commit_to_database`` is appended to the
            tool list; when false that tool is left out.

    Returns:
        A freshly constructed RunConfig with its own tool list.
    """
    knowledge_tools = (
        "knowledge_search",
        "knowledge_save",
        "knowledge_list",
        "knowledge_update",
    )
    resource_and_file_tools = (
        "resource_save",
        "resource_get",
        "read_file",
        "write_file",
    )
    # Local cache tools.
    cache_tools = (
        "cache_research_data",
        "organize_cached_data",
        "list_cache_status",
    )
    # The commit tool is only exposed when committing is enabled.
    commit_tools = ("commit_to_database",) if enable_db_commit else ()

    allowed_tools = [
        *knowledge_tools,
        *resource_and_file_tools,
        *cache_tools,
        *commit_tools,
    ]

    # All knowledge extraction / injection features are switched off.
    knowledge_settings = KnowledgeConfig(
        enable_extraction=False,
        enable_completion_extraction=False,
        enable_injection=False,
    )

    return RunConfig(
        model="qwen3.5-plus",
        temperature=0.2,
        max_iterations=50,
        agent_type="knowledge_manager",
        name="知识库管理",
        goal_compression="none",
        knowledge=knowledge_settings,
        tools=allowed_tools,
    )
async def start_knowledge_manager(
    contact_id: str = "knowledge_manager",
    server_url: str = "ws://localhost:58005",
    chat_id: str = "main",
    enable_db_commit: bool = ENABLE_DATABASE_COMMIT
):
    """
    Start the Knowledge Manager (AgentRunner-driven, event-driven).

    Incoming IM messages are routed by their content prefix:

    * ``[ASK]``     - answered immediately by a background task that queries
      ``knowledge_search`` and ``list_cache_status``; never enters the queue.
    * ``[UPLOAD``   - buffered; every new upload restarts a 5 second timer,
      after which the buffered uploads are merged into one queued message.
    * anything else - put on the queue as-is.

    Queued messages are consumed one at a time and passed to AgentRunner as a
    user message. The first run creates a trace; later runs reuse its id.

    The coroutine returns early (after logging) if the IM client module cannot
    be imported; otherwise it runs until ``client.run()`` returns or raises.

    Args:
        contact_id: IM identity of this agent; messages from it are ignored.
        server_url: IM server address.
        chat_id: Chat window id used for opening the window and for replies.
        enable_db_commit: Whether committing to the database is allowed
            (False = cache only, True = commits allowed).
    """
    logger.info("正在启动 Knowledge Manager...")
    logger.info(f" - Contact ID: {contact_id}")
    logger.info(f" - Server: {server_url}")
    logger.info(f" - 入库功能: {'启用' if enable_db_commit else '禁用(仅缓存)'}")

    # Register the internal cache-management tools. This is best effort:
    # a failure is logged and startup continues.
    try:
        sys.path.insert(0, str(Path(__file__).parent.parent))
        from internal_tools.cache_manager import (
            cache_research_data,
            organize_cached_data,
            commit_to_database,
            list_cache_status,
        )
        from agent.tools import get_tool_registry

        registry = get_tool_registry()
        registry.register(cache_research_data)
        registry.register(organize_cached_data)
        registry.register(commit_to_database)
        registry.register(list_cache_status)
        logger.info(" ✓ 已注册缓存管理工具")
    except Exception as e:
        logger.error(f" ✗ 注册缓存工具失败: {e}")

    # Import the IM client; without it nothing can run.
    try:
        sys.path.insert(0, str(Path(__file__).parent.parent.parent / "im-client"))
        from client import IMClient
    except ImportError as e:
        logger.error(f"无法导入 IM Client: {e}")
        return

    # --- Initialise AgentRunner ---
    store = FileSystemTraceStore(base_path=".trace")
    # Take the model name from the same config factory used for each run
    # (the previous code referenced an undefined module-level constant).
    base_config = get_knowledge_manager_config(enable_db_commit)
    llm_call = create_qwen_llm_call(model=base_config.model)
    runner = AgentRunner(
        trace_store=store,
        llm_call=llm_call,
        debug=True,
        logger_name="agents.knowledge_manager"
    )

    # Load the system prompt that lives next to this file.
    prompt_path = Path(__file__).parent / "knowledge_manager.prompt"
    prompt = SimplePrompt(prompt_path)
    system_messages = prompt.build_messages()

    # Trace state: one trace is appended to across messages to keep context.
    current_trace_id = None
    message_queue = asyncio.Queue()  # queued messages, so none are dropped
    upload_buffer = []  # buffered upload messages awaiting batching
    upload_timer = None  # debounce task for the upload batch
    # Strong references to fire-and-forget tasks: the event loop only keeps
    # weak references, so an unreferenced task may be garbage-collected
    # before it finishes.
    background_tasks = set()

    # --- Initialise IM client ---
    client = IMClient(
        contact_id=contact_id,
        server_url=server_url,
        data_dir=str(Path.home() / ".knowhub" / "im_data")
    )

    # Silent notifier: messages are handled by the on_message callback, so
    # notifier notifications are not needed.
    class _SilentNotifier:
        async def notify(self, count, from_contacts):
            pass

    # Open the chat window.
    client.open_window(chat_id=chat_id, notifier=_SilentNotifier())

    # --- Message processor (takes messages off the queue one by one) ---
    async def message_processor():
        nonlocal current_trace_id
        while True:
            msg = await message_queue.get()  # suspends until a message arrives
            # Everything per-message runs inside the try so that one bad
            # message cannot terminate the only consumer of the queue.
            try:
                sender = msg.get("sender")
                content = msg.get("content") or ""

                # Clear pending messages so the notifier is not triggered
                # repeatedly.
                client.read_pending(chat_id)

                logger.info(f"[KM] <- 处理消息: {sender}")
                logger.info(f"[KM] 内容: {content[:120]}{'...' if len(content) > 120 else ''}")

                # Continue the same trace: the first run (id None) also sends
                # the system messages; later runs send only the user message.
                if current_trace_id is None:
                    messages = system_messages + [{"role": "user", "content": content}]
                else:
                    messages = [{"role": "user", "content": content}]

                # Build the per-run configuration.
                km_config = get_knowledge_manager_config(enable_db_commit)
                config = RunConfig(
                    model=km_config.model,
                    temperature=km_config.temperature,
                    max_iterations=km_config.max_iterations,
                    agent_type=km_config.agent_type,
                    name=km_config.name,
                    goal_compression=km_config.goal_compression,
                    tools=km_config.tools,
                    knowledge=km_config.knowledge,
                    trace_id=current_trace_id,  # reuse the trace across messages
                    context={
                        "km_queue_size": message_queue.qsize(),  # messages still waiting
                        "current_sender": sender,  # origin of the current message
                    }
                )

                # Run AgentRunner and keep the latest assistant text.
                response_text = ""
                async for item in runner.run(messages=messages, config=config):
                    if isinstance(item, Trace):
                        current_trace_id = item.trace_id
                        if item.status == "running":
                            logger.info(f"[KM] Trace: {item.trace_id[:8]}...")
                        elif item.status == "completed":
                            logger.info(f"[KM] Trace 完成 (消息: {item.total_messages})")
                        elif item.status == "failed":
                            logger.error(f"[KM] Trace 失败: {item.error_message}")
                    elif isinstance(item, Message):
                        if item.role == "assistant":
                            msg_content = item.content
                            if isinstance(msg_content, dict):
                                text = msg_content.get("text", "")
                                tool_calls = msg_content.get("tool_calls")
                                if text:
                                    # Always keep the newest text; the last
                                    # one is the final reply.
                                    response_text = text
                                    if tool_calls:
                                        logger.info(f"[KM] 思考: {text[:80]}...")
                            elif isinstance(msg_content, str) and msg_content:
                                response_text = msg_content
                        elif item.role == "tool":
                            tool_content = item.content
                            tool_name = tool_content.get("tool_name", "?") if isinstance(tool_content, dict) else "?"
                            logger.info(f"[KM] 工具: {tool_name}")

                # Reply to the sender.
                if response_text:
                    client.send_message(
                        chat_id=chat_id,
                        receiver=sender,
                        content=response_text
                    )
                    logger.info(f"[KM] -> 已回复: {sender} ({len(response_text)} 字符)")
                else:
                    logger.warning("[KM] AgentRunner 没有生成回复")
            except Exception as e:
                logger.error(f"[KM] 处理失败: {e}", exc_info=True)
            finally:
                message_queue.task_done()

    # --- Batch processing of upload messages ---
    async def process_upload_batch():
        """Wait 5 seconds, then merge all buffered uploads into one queued message."""
        nonlocal upload_buffer, upload_timer
        await asyncio.sleep(5)  # collect further uploads during this window
        if not upload_buffer:
            upload_timer = None
            return

        # Take a snapshot of the buffer and reset the shared state.
        batch = upload_buffer.copy()
        upload_buffer.clear()
        upload_timer = None
        logger.info(f"[KM] 批处理 {len(batch)} 条 upload 消息")

        # Merge everything into a single message and enqueue it.
        header = f"[UPLOAD:BATCH] 收到 {len(batch)} 条上传请求,请合并处理:\n"
        sections = [
            f"\n--- 第 {i} 条 ---\n{msg['content']}\n"
            for i, msg in enumerate(batch, 1)
        ]
        merged_content = header + "".join(sections)
        await message_queue.put({
            "sender": batch[0]["sender"],
            "content": merged_content
        })

    # --- Fast path for ask queries (bypasses the queue) ---
    async def handle_ask_query(sender: str, content: str):
        """Answer an ask query right away without waiting for queued work."""
        try:
            logger.info(f"[KM] <- 快速查询: {sender}")

            # 1. Query the database.
            from agent.tools.builtin.knowledge import knowledge_search
            db_result = await knowledge_search(query=content, top_k=5, min_score=3)

            # 2. Read the cache status (upload data still being processed).
            from knowhub.internal_tools.cache_manager import list_cache_status
            cache_status = await list_cache_status()

            # 3. Assemble the reply.
            response_parts = []
            if db_result.output and db_result.output != "未找到相关知识":
                response_parts.append("## 数据库中的知识\n\n" + db_result.output)
            if cache_status.metadata and cache_status.metadata.get("files"):
                cache_files = cache_status.metadata["files"]
                response_parts.append(
                    f"## 缓存中的数据\n\n"
                    f"正在处理 {len(cache_files)} 个缓存文件,包含最新调研数据(尚未入库)。\n"
                    f"如需查看详情,请稍后再次查询。"
                )
            if not response_parts:
                response_parts.append("暂无相关知识,数据库和缓存均为空。")
            response_text = "\n\n".join(response_parts)

            # 4. Reply immediately.
            client.send_message(
                chat_id=chat_id,
                receiver=sender,
                content=response_text
            )
            logger.info(f"[KM] -> 快速回复: {sender} ({len(response_text)} 字符)")
        except Exception as e:
            logger.error(f"[KM] 快速查询失败: {e}", exc_info=True)
            client.send_message(
                chat_id=chat_id,
                receiver=sender,
                content=f"查询失败: {e}"
            )

    # --- Message callback (ask fast path, upload batching, default queue) ---
    async def on_message(msg: dict):
        nonlocal upload_timer
        sender = msg.get("sender")
        # Treat a missing or None content as empty text.
        content = msg.get("content") or ""

        # Ignore messages sent by this agent itself.
        if sender == contact_id:
            return

        # Decide the route from the prefix marker.
        if content.startswith("[ASK]"):
            # Fast path: handle right away, do not enqueue.
            query = content[5:].strip()  # drop the "[ASK]" prefix
            logger.info(f"[KM] <- 收到查询消息: {sender} (快速通道)")
            ask_task = asyncio.create_task(handle_ask_query(sender, query))
            # Keep the task referenced until it completes.
            background_tasks.add(ask_task)
            ask_task.add_done_callback(background_tasks.discard)
        elif content.startswith("[UPLOAD"):
            # Batching: buffer the message and (re)start the debounce timer.
            logger.info(f"[KM] <- 收到上传消息: {sender} (加入批处理缓冲区)")
            upload_buffer.append(msg)
            if upload_timer:
                upload_timer.cancel()
            upload_timer = asyncio.create_task(process_upload_batch())
        else:
            # Any other message goes onto the queue.
            logger.info(f"[KM] <- 收到消息: {sender} (入队,队列长度: {message_queue.qsize() + 1})")
            await message_queue.put(msg)

    client.on_message(on_message, chat_id="*")

    # Start the queue consumer as a background task.
    processor_task = asyncio.create_task(message_processor())

    # Run the IM client (event driven) until it stops.
    logger.info("✅ Knowledge Manager 已启动(AgentRunner 驱动)")
    try:
        await client.run()
    finally:
        processor_task.cancel()
        # A pending upload timer would only enqueue into a queue that no
        # longer has a consumer, so stop it as well.
        if upload_timer:
            upload_timer.cancel()
        try:
            await processor_task
        except asyncio.CancelledError:
            pass
|