""" Knowledge Manager 工具 - 通过 IM 与知识库管理 Agent 交互 提供两个工具: - ask_knowledge: 查询知识库(同步,等待回复) - upload_knowledge: 上传调研结果(异步,立即返回) 依赖:IM Client 已初始化(im_setup + im_open_window) """ import asyncio import json import logging from typing import Optional, Dict, Any, List from agent.tools import tool, ToolResult, ToolContext logger = logging.getLogger(__name__) # IM 工具的全局引用(延迟导入) _im_chat = None def _get_im_chat(): global _im_chat if _im_chat is None: from agent.tools.builtin.im import chat as im_chat_module _im_chat = im_chat_module return _im_chat def _get_client_and_chat_id(contact_id: str): """获取 IM Client 实例和当前窗口的 chat_id""" im = _get_im_chat() client = im._clients.get(contact_id) if client is None: return None, None # 取第一个打开的窗口 windows = client.list_windows() chat_id = windows[0] if windows else None return client, chat_id def _clear_notifications(contact_id: str, chat_id: str): """清空 IM 通知计数""" im = _get_im_chat() im._notifications.pop((contact_id, chat_id), None) @tool( hidden_params=["context"], inject_params={ "contact_id": {"mode": "default", "key": "im_contact_id"}, } ) async def ask_knowledge( query: str, contact_id: str = "agent_research", context: Optional[ToolContext] = None, ) -> ToolResult: """ 向 Knowledge Manager 查询知识库信息(同步,等待回复) Args: query: 查询内容(如:"查询 ControlNet 相关信息") contact_id: 当前 Agent 的 IM contact_id context: 工具上下文 Returns: 查询结果(已有工具、资源、知识及建议) """ try: client, chat_id = _get_client_and_chat_id(contact_id) if client is None or chat_id is None: return ToolResult( title="❌ 查询失败", output="IM Client 未初始化,请先调用 im_setup", error="im_not_initialized" ) # 发送查询(带类型标记) message = f"[ASK] {query}" client.send_message( chat_id=chat_id, receiver="knowledge_manager", content=message ) # 等待回复(最多 30 秒) for _ in range(30): await asyncio.sleep(1) pending = client.read_pending(chat_id) for msg in pending: if msg.get("sender") == "knowledge_manager": content = msg.get("content", "") # 清空 IM 通知计数,防止 notifier 反复提醒 _clear_notifications(contact_id, chat_id) return ToolResult( title="📚 知识库查询结果", output=content, metadata={"source": "knowledge_manager"} ) # 超时保底:直接调用 knowledge_search 返回原始结果 logger.warning("Knowledge Manager 超时,fallback 到 knowledge_search") from agent.tools.builtin.knowledge import knowledge_search fallback_result = await knowledge_search(query=query, top_k=5, min_score=3) return ToolResult( title="📚 知识库查询结果(直连)", output=fallback_result.output, metadata={"source": "fallback", "raw": fallback_result.metadata} ) except Exception as e: logger.error(f"查询知识库失败: {e}") return ToolResult( title="❌ 查询失败", output=f"错误: {str(e)}", error=str(e) ) @tool( hidden_params=["context"], inject_params={ "contact_id": {"mode": "default", "key": "im_contact_id"}, } ) async def upload_knowledge( data: Dict[str, Any], finalize: bool = False, contact_id: str = "agent_research", context: Optional[ToolContext] = None, ) -> ToolResult: """ 上传调研结果到知识库(异步,立即返回) Args: data: 调研结果,包含: - tools: 工具列表 - resources: 资源列表 - knowledge: 知识列表 finalize: 是否最终提交(True=入库,False=仅缓冲) contact_id: 当前 Agent 的 IM contact_id context: 工具上下文 Returns: 上传确认(立即返回,不等待处理完成) """ try: client, chat_id = _get_client_and_chat_id(contact_id) if client is None or chat_id is None: return ToolResult( title="❌ 上传失败", output="IM Client 未初始化,请先调用 im_setup", error="im_not_initialized" ) # 构造消息(带类型标记) if finalize: action = "最终提交" message = f"[UPLOAD:FINALIZE] {json.dumps(data, ensure_ascii=False)}" else: action = "增量上传" message = f"[UPLOAD] {json.dumps(data, ensure_ascii=False)}" # 发送(不等待回复) client.send_message( chat_id=chat_id, receiver="knowledge_manager", content=message ) # 等待一小段时间让 KM 回复,然后清空 pending(避免 notifier 反复通知) await asyncio.sleep(0.5) client.read_pending(chat_id) # 清空回复,不处理内容 summary = [] if data.get("tools"): summary.append(f"工具: {len(data['tools'])} 个") if data.get("resources"): summary.append(f"资源: {len(data['resources'])} 个") if data.get("knowledge"): summary.append(f"知识: {len(data['knowledge'])} 个") return ToolResult( title=f"✅ {action}成功", output=f"已发送到 Knowledge Manager\n\n" + "\n".join(f"- {s}" for s in summary), long_term_memory=f"{action}: {', '.join(summary)}", metadata={"finalize": finalize} ) except Exception as e: logger.error(f"上传知识失败: {e}") return ToolResult( title="❌ 上传失败", output=f"错误: {str(e)}", error=str(e) )