| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- """
- Knowledge Manager 工具 - 通过 IM 与知识库管理 Agent 交互
- 提供两个工具:
- - ask_knowledge: 查询知识库(同步,等待回复)
- - upload_knowledge: 上传调研结果(异步,立即返回)
- 依赖:IM Client 已初始化(im_setup + im_open_window)
- """
- import asyncio
- import json
- import logging
- from typing import Optional, Dict, Any, List
- from agent.tools import tool, ToolResult, ToolContext
- logger = logging.getLogger(__name__)
- # IM 工具的全局引用(延迟导入)
- _im_chat = None
- def _get_im_chat():
- global _im_chat
- if _im_chat is None:
- from agent.tools.builtin.im import chat as im_chat_module
- _im_chat = im_chat_module
- return _im_chat
- def _get_client_and_chat_id(contact_id: str):
- """获取 IM Client 实例和当前窗口的 chat_id"""
- im = _get_im_chat()
- client = im._clients.get(contact_id)
- if client is None:
- return None, None
- # 取第一个打开的窗口
- windows = client.list_windows()
- chat_id = windows[0] if windows else None
- return client, chat_id
- def _clear_notifications(contact_id: str, chat_id: str):
- """清空 IM 通知计数"""
- im = _get_im_chat()
- im._notifications.pop((contact_id, chat_id), None)
- @tool(
- hidden_params=["context"],
- inject_params={
- "contact_id": {"mode": "default", "key": "im_contact_id"},
- }
- )
- async def ask_knowledge(
- query: str,
- contact_id: str = "agent_research",
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 向 Knowledge Manager 查询知识库信息(同步,等待回复)
- Args:
- query: 查询内容(如:"查询 ControlNet 相关信息")
- contact_id: 当前 Agent 的 IM contact_id
- context: 工具上下文
- Returns:
- 查询结果(已有工具、资源、知识及建议)
- """
- try:
- client, chat_id = _get_client_and_chat_id(contact_id)
- if client is None or chat_id is None:
- return ToolResult(
- title="❌ 查询失败",
- output="IM Client 未初始化,请先调用 im_setup",
- error="im_not_initialized"
- )
- # 发送查询(带类型标记)
- message = f"[ASK] {query}"
- client.send_message(
- chat_id=chat_id,
- receiver="knowledge_manager",
- content=message
- )
- # 等待回复(最多 30 秒)
- for _ in range(30):
- await asyncio.sleep(1)
- pending = client.read_pending(chat_id)
- for msg in pending:
- if msg.get("sender") == "knowledge_manager":
- content = msg.get("content", "")
- # 清空 IM 通知计数,防止 notifier 反复提醒
- _clear_notifications(contact_id, chat_id)
- return ToolResult(
- title="📚 知识库查询结果",
- output=content,
- metadata={"source": "knowledge_manager"}
- )
- # 超时保底:直接调用 knowledge_search 返回原始结果
- logger.warning("Knowledge Manager 超时,fallback 到 knowledge_search")
- from agent.tools.builtin.knowledge import knowledge_search
- fallback_result = await knowledge_search(query=query, top_k=5, min_score=3)
- return ToolResult(
- title="📚 知识库查询结果(直连)",
- output=fallback_result.output,
- metadata={"source": "fallback", "raw": fallback_result.metadata}
- )
- except Exception as e:
- logger.error(f"查询知识库失败: {e}")
- return ToolResult(
- title="❌ 查询失败",
- output=f"错误: {str(e)}",
- error=str(e)
- )
- @tool(
- hidden_params=["context"],
- inject_params={
- "contact_id": {"mode": "default", "key": "im_contact_id"},
- }
- )
- async def upload_knowledge(
- data: Dict[str, Any],
- finalize: bool = False,
- contact_id: str = "agent_research",
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 上传调研结果到知识库(异步,立即返回)
- Args:
- data: 调研结果,包含:
- - tools: 工具列表
- - resources: 资源列表
- - knowledge: 知识列表
- finalize: 是否最终提交(True=入库,False=仅缓冲)
- contact_id: 当前 Agent 的 IM contact_id
- context: 工具上下文
- Returns:
- 上传确认(立即返回,不等待处理完成)
- """
- try:
- client, chat_id = _get_client_and_chat_id(contact_id)
- if client is None or chat_id is None:
- return ToolResult(
- title="❌ 上传失败",
- output="IM Client 未初始化,请先调用 im_setup",
- error="im_not_initialized"
- )
- # 构造消息(带类型标记)
- if finalize:
- action = "最终提交"
- message = f"[UPLOAD:FINALIZE] {json.dumps(data, ensure_ascii=False)}"
- else:
- action = "增量上传"
- message = f"[UPLOAD] {json.dumps(data, ensure_ascii=False)}"
- # 发送(不等待回复)
- client.send_message(
- chat_id=chat_id,
- receiver="knowledge_manager",
- content=message
- )
- # 等待一小段时间让 KM 回复,然后清空 pending(避免 notifier 反复通知)
- await asyncio.sleep(0.5)
- client.read_pending(chat_id) # 清空回复,不处理内容
- summary = []
- if data.get("tools"):
- summary.append(f"工具: {len(data['tools'])} 个")
- if data.get("resources"):
- summary.append(f"资源: {len(data['resources'])} 个")
- if data.get("knowledge"):
- summary.append(f"知识: {len(data['knowledge'])} 个")
- return ToolResult(
- title=f"✅ {action}成功",
- output=f"已发送到 Knowledge Manager\n\n" + "\n".join(f"- {s}" for s in summary),
- long_term_memory=f"{action}: {', '.join(summary)}",
- metadata={"finalize": finalize}
- )
- except Exception as e:
- logger.error(f"上传知识失败: {e}")
- return ToolResult(
- title="❌ 上传失败",
- output=f"错误: {str(e)}",
- error=str(e)
- )
|