| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- """
- Knowledge Manager V2 - 轻量级缓存 + 按需 Agent
- 架构:
- 1. 收到 upload 消息 → 直接调用 cache_research_data,不启动 Agent
- 2. 收到 query/organize 消息 → 启动 Agent 处理
- 3. 消息队列保证不丢消息
- 优势:
- - upload 操作轻量快速
- - 只在需要时才启动 Agent(查询、整理、入库)
- """
- import asyncio
- import json
- import logging
- import sys
- from pathlib import Path
- from typing import Optional
- # 确保项目路径可用
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from agent.core.runner import AgentRunner, RunConfig
- from agent.trace import FileSystemTraceStore, Trace, Message
- from agent.llm import create_qwen_llm_call
- from agent.llm.prompts import SimplePrompt
- from agent.tools.builtin.knowledge import KnowledgeConfig
- # 导入缓存工具
- from knowhub.internal_tools.cache_manager import cache_research_data
logger = logging.getLogger("agents.knowledge_manager_v2")

# Shared settings for every knowledge-manager agent run.
KNOWLEDGE_MANAGER_CONFIG = RunConfig(
    # Agent identity.
    agent_type="knowledge_manager",
    name="知识库管理",
    # Model / sampling / loop settings.
    model="qwen3.5-plus",
    temperature=0.2,
    max_iterations=50,
    goal_compression="none",
    # Every automatic knowledge feature exposed by KnowledgeConfig is switched
    # off for this agent.
    knowledge=KnowledgeConfig(
        enable_injection=False,
        enable_extraction=False,
        enable_completion_extraction=False,
    ),
    # Names of the tools the agent is allowed to call, grouped by name.
    tools=[
        # knowledge_* tools
        "knowledge_search",
        "knowledge_save",
        "knowledge_list",
        "knowledge_update",
        # resource_* tools
        "resource_save",
        "resource_get",
        # file tools
        "read_file",
        "write_file",
        # cache-pipeline tools (cache_research_data is also called directly,
        # without an agent, for upload messages)
        "cache_research_data",
        "organize_cached_data",
        "commit_to_database",
        "list_cache_status",
    ],
)
- async def start_knowledge_manager_v2(
- contact_id: str = "knowledge_manager",
- server_url: str = "ws://localhost:58005",
- chat_id: str = "main"
- ):
- """
- 启动 Knowledge Manager V2(轻量级缓存 + 按需 Agent)
- 消息类型:
- 1. upload 消息 → 直接缓存,不启动 Agent
- 2. query/organize/commit 消息 → 启动 Agent 处理
- """
- logger.info(f"正在启动 Knowledge Manager V2...")
- logger.info(f" - Contact ID: {contact_id}")
- logger.info(f" - Server: {server_url}")
- # 导入 IM Client
- try:
- sys.path.insert(0, str(Path(__file__).parent.parent.parent / "im-client"))
- from client import IMClient
- except ImportError as e:
- logger.error(f"无法导入 IM Client: {e}")
- return
- # --- 初始化 AgentRunner ---
- store = FileSystemTraceStore(base_path=".trace")
- llm_call = create_qwen_llm_call(model=KNOWLEDGE_MANAGER_CONFIG.model)
- runner = AgentRunner(
- trace_store=store,
- llm_call=llm_call,
- debug=True
- )
- # 加载 system prompt
- prompt_path = Path(__file__).parent / "knowledge_manager.prompt"
- prompt = SimplePrompt(prompt_path)
- system_messages = prompt.build_messages()
- # Trace 状态
- current_trace_id = None
- message_queue = asyncio.Queue()
- # --- 初始化 IM Client ---
- client = IMClient(
- contact_id=contact_id,
- server_url=server_url,
- data_dir=str(Path.home() / ".knowhub" / "im_data")
- )
- client.open_window(chat_id=chat_id)
- # --- 消息分类处理器 ---
- async def message_processor():
- nonlocal current_trace_id
- while True:
- msg = await message_queue.get()
- sender = msg.get("sender")
- content = msg.get("content", "")
- logger.info(f"[KM] <- 收到消息: {sender}")
- logger.info(f"[KM] 内容: {content[:120]}{'...' if len(content) > 120 else ''}")
- try:
- # 判断消息类型
- is_upload = "增量上传" in content or "最终提交" in content
- if is_upload:
- # 直接缓存,不启动 Agent
- await handle_upload_message(content, sender, client, chat_id)
- else:
- # 启动 Agent 处理
- await handle_agent_message(
- content, sender, client, chat_id,
- runner, system_messages, current_trace_id
- )
- except Exception as e:
- logger.error(f"[KM] 处理失败: {e}", exc_info=True)
- # 回复错误
- client.send_message(
- chat_id=chat_id,
- receiver=sender,
- content=f"处理失败: {str(e)}"
- )
- message_queue.task_done()
- async def handle_upload_message(content: str, sender: str, client, chat_id: str):
- """处理 upload 消息:直接缓存,不启动 Agent"""
- logger.info(f"[KM] 处理 upload 消息(轻量级)")
- # 提取 JSON 数据
- try:
- # 找到 JSON 部分
- json_start = content.find("{")
- if json_start == -1:
- raise ValueError("消息中没有 JSON 数据")
- json_str = content[json_start:]
- data = json.loads(json_str)
- # 直接调用缓存工具
- result = await cache_research_data(data=data, source=sender)
- # 回复
- response = result.output if hasattr(result, 'output') else str(result)
- client.send_message(
- chat_id=chat_id,
- receiver=sender,
- content=response
- )
- logger.info(f"[KM] -> 已缓存并回复: {sender}")
- except Exception as e:
- logger.error(f"[KM] 缓存失败: {e}")
- client.send_message(
- chat_id=chat_id,
- receiver=sender,
- content=f"缓存失败: {str(e)}"
- )
- async def handle_agent_message(
- content: str, sender: str, client, chat_id: str,
- runner, system_messages, trace_id
- ):
- """处理需要 Agent 的消息:查询、整理、入库"""
- logger.info(f"[KM] 处理 Agent 消息(重量级)")
- nonlocal current_trace_id
- # 构造 user message
- if current_trace_id is None:
- messages = system_messages + [{"role": "user", "content": content}]
- else:
- messages = [{"role": "user", "content": content}]
- config = RunConfig(
- model=KNOWLEDGE_MANAGER_CONFIG.model,
- temperature=KNOWLEDGE_MANAGER_CONFIG.temperature,
- max_iterations=KNOWLEDGE_MANAGER_CONFIG.max_iterations,
- agent_type=KNOWLEDGE_MANAGER_CONFIG.agent_type,
- name=KNOWLEDGE_MANAGER_CONFIG.name,
- goal_compression=KNOWLEDGE_MANAGER_CONFIG.goal_compression,
- tools=KNOWLEDGE_MANAGER_CONFIG.tools,
- knowledge=KNOWLEDGE_MANAGER_CONFIG.knowledge,
- trace_id=current_trace_id,
- )
- # 执行 AgentRunner
- response_text = ""
- async for item in runner.run(messages=messages, config=config):
- if isinstance(item, Trace):
- current_trace_id = item.trace_id
- if item.status == "running":
- logger.info(f"[KM] Trace: {item.trace_id[:8]}...")
- elif item.status == "completed":
- logger.info(f"[KM] Trace 完成")
- elif item.status == "failed":
- logger.error(f"[KM] Trace 失败: {item.error_message}")
- elif isinstance(item, Message):
- if item.role == "assistant":
- msg_content = item.content
- if isinstance(msg_content, dict):
- text = msg_content.get("text", "")
- tool_calls = msg_content.get("tool_calls")
- if text and not tool_calls:
- response_text = text
- elif item.role == "tool":
- tool_content = item.content
- tool_name = tool_content.get("tool_name", "?") if isinstance(tool_content, dict) else "?"
- logger.info(f"[KM] 工具: {tool_name}")
- # 回复
- if response_text:
- client.send_message(
- chat_id=chat_id,
- receiver=sender,
- content=response_text
- )
- logger.info(f"[KM] -> 已回复: {sender}")
- else:
- logger.warning(f"[KM] AgentRunner 没有生成回复")
- # --- 消息回调(只入队)---
- async def on_message(msg: dict):
- sender = msg.get("sender")
- if sender == contact_id:
- return
- logger.info(f"[KM] <- 入队: {sender} (队列长度: {message_queue.qsize() + 1})")
- await message_queue.put(msg)
- client.on_message(on_message, chat_id="*")
- # 启动消息处理器
- processor_task = asyncio.create_task(message_processor())
- # 启动 IM Client
- logger.info("✅ Knowledge Manager V2 已启动(轻量级缓存 + 按需 Agent)")
- try:
- await client.run()
- finally:
- processor_task.cancel()
- try:
- await processor_task
- except asyncio.CancelledError:
- pass
if __name__ == "__main__":
    # Python's default WARNING threshold would drop all of the module's
    # INFO-level "[KM]" progress logs. basicConfig is a no-op if logging has
    # already been configured elsewhere.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    try:
        asyncio.run(start_knowledge_manager_v2())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the service; exit without a traceback.
        pass
|