"""IM tools — plug the im-client package into the Agent framework.

Architecture: one Agent (``contact_id``) owns exactly one ``IMClient``
instance, and that instance manages any number of windows (``chat_id``).
"""

import asyncio
import json
import logging
import os
import sys
from typing import Optional

from agent.tools import tool, ToolResult, ToolContext

# Put the im-client directory on sys.path so `client` / `notifier` import below.
_IM_CLIENT_DIR = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", "..", "..", "..", "im-client")
)
if _IM_CLIENT_DIR not in sys.path:
    sys.path.insert(0, _IM_CLIENT_DIR)

from client import IMClient  # noqa: E402
from notifier import AgentNotifier  # noqa: E402

logger = logging.getLogger(__name__)

# ── Module-level state ──

# contact_id -> the IMClient created for that agent by im_setup.
_clients: dict[str, IMClient] = {}
# contact_id -> the asyncio task running that client's ``run()`` coroutine.
_tasks: dict[str, asyncio.Task] = {}
# (contact_id, chat_id) -> latest unread notification for that window.
_notifications: dict[tuple[str, str], dict] = {}


class _ToolNotifier(AgentNotifier):
    """Internal notifier that files notifications under (contact_id, chat_id)."""

    def __init__(self, contact_id: str, chat_id: str):
        # NOTE(review): AgentNotifier.__init__ is intentionally not called,
        # matching the original code; its signature is not visible here.
        self._key = (contact_id, chat_id)

    def retarget(self, chat_id: str) -> None:
        """Point this notifier at another window of the same contact."""
        self._key = (self._key[0], chat_id)

    async def notify(self, count: int, from_contacts: list[str]):
        """Record the latest notification, replacing any unread one for the key."""
        _notifications[self._key] = {"count": count, "from": from_contacts}


def _task_failure(task: Optional[asyncio.Task]) -> Optional[str]:
    """Describe why *task* failed, or return None if it did not fail.

    A task that is missing, still running, or that finished without raising
    counts as "not failed". Calling ``task.exception()`` here also marks the
    exception as retrieved, so asyncio does not log it as lost later.
    """
    if task is None or not task.done():
        return None
    if task.cancelled():
        return "cancelled"
    exc = task.exception()
    if exc is None:
        return None
    return f"{type(exc).__name__}: {exc}"


def _forget_client(contact_id: str) -> None:
    """Remove a contact's client and task from the module registries."""
    _clients.pop(contact_id, None)
    _tasks.pop(contact_id, None)


async def _fetch_json_field(http, url: str, field: str, default):
    """GET *url*, require a success status, and return ``body[field]``.

    ``default`` is returned when the JSON object has no such field.
    """
    response = await http.get(url)
    # Without this, an error response would be parsed like a normal payload
    # and could be reported to the caller as "no data".
    response.raise_for_status()
    return response.json().get(field, default)


# ── Tool 1: set up the connection ──

@tool(
    hidden_params=["context"],
    display={
        "zh": {"name": "初始化 IM 连接", "params": {"contact_id": "你的身份 ID", "server_url": "服务器地址"}},
        "en": {"name": "Setup IM Connection", "params": {"contact_id": "Your identity ID", "server_url": "Server URL"}},
    }
)
async def im_setup(
    contact_id: str,
    server_url: str = "ws://localhost:8000",
    notify_interval: float = 10.0,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Create the IM client for an agent and start connecting to the server.

    Args:
        contact_id: Your identity ID in the IM system
        server_url: WebSocket address of the server
        notify_interval: Interval in seconds between new-message checks
    """
    if contact_id in _clients:
        stale_reason = _task_failure(_tasks.get(contact_id))
        if stale_reason is None:
            return ToolResult(title="IM 已连接", output=f"已连接: {contact_id}")
        # The previous background task crashed; drop it so this call can
        # start a fresh client instead of reporting a dead one as connected.
        logger.warning("IM client %s had stopped (%s); restarting", contact_id, stale_reason)
        _forget_client(contact_id)

    client = IMClient(contact_id=contact_id, server_url=server_url, notify_interval=notify_interval)
    _clients[contact_id] = client
    # We are inside a coroutine, so create_task binds to the running loop.
    task = asyncio.create_task(client.run())
    _tasks[contact_id] = task

    # Give the connection attempt a short head start without blocking for long.
    await asyncio.sleep(0.5)

    failure = _task_failure(task)
    if failure is not None:
        # The client died during start-up: report it rather than claiming success.
        logger.error("IM client %s failed to start: %s", contact_id, failure)
        _forget_client(contact_id)
        return ToolResult(
            title="IM 连接失败",
            output=f"错误: {contact_id} 连接失败: {failure}",
            error=failure,
        )

    return ToolResult(title="IM 连接成功", output=f"已启动 IM Client: {contact_id},后台连接中...")


# ── Tool 2: window management ──

@tool(
    hidden_params=["context"],
    display={
        "zh": {"name": "打开 IM 窗口", "params": {"contact_id": "Agent ID", "chat_id": "窗口 ID"}},
        "en": {"name": "Open IM Window", "params": {"contact_id": "Agent ID", "chat_id": "Window ID"}},
    }
)
async def im_open_window(
    contact_id: str,
    chat_id: str | None = None,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Open a new chat window.

    Args:
        contact_id: Agent ID
        chat_id: Window ID (leave empty to have one generated)
    """
    client = _clients.get(contact_id)
    if client is None:
        return ToolResult(title="未连接", output=f"错误: {contact_id} 未初始化", error="未初始化")

    notifier = _ToolNotifier(contact_id, chat_id or "")
    actual_chat_id = client.open_window(chat_id=chat_id, notifier=notifier)
    # Key notifications on the id the client actually assigned, so that
    # im_check_notification finds them under the id returned to the caller.
    notifier.retarget(actual_chat_id)
    if chat_id is None:
        # Kept from the original: register the notifier under the generated id.
        # NOTE(review): this reaches into a private IMClient attribute; confirm
        # whether open_window already stores the notifier under that id.
        client._notifiers[actual_chat_id] = notifier

    return ToolResult(title="窗口已打开", output=f"窗口 ID: {actual_chat_id}")


@tool(
    hidden_params=["context"],
    display={
        "zh": {"name": "关闭 IM 窗口", "params": {"contact_id": "Agent ID", "chat_id": "窗口 ID"}},
        "en": {"name": "Close IM Window", "params": {"contact_id": "Agent ID", "chat_id": "Window ID"}},
    }
)
async def im_close_window(
    contact_id: str,
    chat_id: str,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Close a chat window.

    Args:
        contact_id: Agent ID
        chat_id: Window ID
    """
    client = _clients.get(contact_id)
    if client is None:
        return ToolResult(title="未连接", output=f"错误: {contact_id} 未初始化", error="未初始化")

    client.close_window(chat_id)
    # Discard any unread notification still filed under the closed window.
    _notifications.pop((contact_id, chat_id), None)
    return ToolResult(title="窗口已关闭", output=f"已关闭窗口: {chat_id}")


# ── Tool 3: check notifications ──

@tool(
    hidden_params=["context"],
    display={
        "zh": {"name": "检查 IM 新消息通知", "params": {"contact_id": "Agent ID", "chat_id": "窗口 ID"}},
        "en": {"name": "Check IM Notifications", "params": {"contact_id": "Agent ID", "chat_id": "Window ID"}},
    }
)
async def im_check_notification(
    contact_id: str,
    chat_id: str,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Check whether a window has a new-message notification.

    Reading a notification consumes it: it is removed from the pending store.

    Args:
        contact_id: Agent ID
        chat_id: Window ID
    """
    notification = _notifications.pop((contact_id, chat_id), None)
    if notification is None:
        return ToolResult(title="无新消息", output="当前没有新消息通知")

    return ToolResult(
        title=f"有 {notification['count']} 条新消息",
        output=json.dumps(notification, ensure_ascii=False),
        metadata=notification,
    )


# ── Tool 4: receive messages ──

@tool(
    hidden_params=["context"],
    display={
        "zh": {"name": "接收 IM 消息", "params": {"contact_id": "Agent ID", "chat_id": "窗口 ID"}},
        "en": {"name": "Receive IM Messages", "params": {"contact_id": "Agent ID", "chat_id": "Window ID"}},
    }
)
async def im_receive_messages(
    contact_id: str,
    chat_id: str,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Read the pending messages of a window.

    Args:
        contact_id: Agent ID
        chat_id: Window ID
    """
    client = _clients.get(contact_id)
    if client is None:
        return ToolResult(title="未连接", output=f"错误: {contact_id} 未初始化", error="未初始化")

    raw = client.read_pending(chat_id)
    if not raw:
        return ToolResult(title="无待处理消息", output="[]")

    # Expose only the four fields the agent needs, with defaults for missing ones.
    messages = [
        {
            "sender": m.get("sender", "unknown"),
            "sender_chat_id": m.get("sender_chat_id"),
            "content": m.get("content", ""),
            "msg_type": m.get("msg_type", "chat"),
        }
        for m in raw
    ]
    return ToolResult(
        title=f"收到 {len(messages)} 条消息",
        output=json.dumps(messages, ensure_ascii=False, indent=2),
        metadata={"messages": messages},
    )


# ── Tool 5: send a message ──

@tool(
    hidden_params=["context"],
    display={
        "zh": {"name": "发送 IM 消息", "params": {"contact_id": "Agent ID", "chat_id": "窗口 ID", "receiver": "接收者 ID", "content": "消息内容"}},
        "en": {"name": "Send IM Message", "params": {"contact_id": "Agent ID", "chat_id": "Window ID", "receiver": "Receiver ID", "content": "Message content"}},
    }
)
async def im_send_message(
    contact_id: str,
    chat_id: str,
    receiver: str,
    content: str,
    msg_type: str = "chat",
    receiver_chat_id: str | None = None,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Send a message from a window.

    Args:
        contact_id: Sender Agent ID
        chat_id: Sender window ID
        receiver: Receiver contact_id
        content: Message content
        msg_type: Message type
        receiver_chat_id: Receiver window ID (broadcast when not given)
    """
    client = _clients.get(contact_id)
    if client is None:
        return ToolResult(title="未连接", output=f"错误: {contact_id} 未初始化", error="未初始化")

    client.send_message(chat_id, receiver, content, msg_type, receiver_chat_id)

    # "*" marks a broadcast to all of the receiver's windows.
    target = f"{receiver}:{receiver_chat_id}" if receiver_chat_id else f"{receiver}:*"
    return ToolResult(
        title=f"已发送给 {target}",
        # Only the first 50 characters of the content are echoed back.
        output=f"[{contact_id}:{chat_id}] 已发送给 {target}: {content[:50]}",
    )


# ── Tool 6: look up contacts ──

@tool(
    hidden_params=["context"],
    display={
        "zh": {"name": "查询 IM 联系人", "params": {"contact_id": "Agent ID", "server_http_url": "服务器 HTTP 地址"}},
        "en": {"name": "Get IM Contacts", "params": {"contact_id": "Agent ID", "server_http_url": "Server HTTP URL"}},
    }
)
async def im_get_contacts(
    contact_id: str,
    server_http_url: str = "http://localhost:8000",
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Look up the contact list and the users currently online.

    The two lookups are independent and best-effort: if one fails, its error
    is reported under ``<field>_error`` and the other is still attempted.

    Args:
        contact_id: Agent ID
        server_http_url: HTTP address of the server
    """
    if contact_id not in _clients:
        return ToolResult(title="未连接", output=f"错误: {contact_id} 未初始化", error="未初始化")

    import httpx  # imported lazily, as in the original

    # Tolerate a trailing slash so the joined URLs never contain "//".
    base_url = server_http_url.rstrip("/")
    lookups = (
        ("contacts", f"{base_url}/contacts/{contact_id}", []),
        ("online", f"{base_url}/health", {}),
    )

    result = {}
    async with httpx.AsyncClient() as http:
        for field, url, default in lookups:
            try:
                result[field] = await _fetch_json_field(http, url, field, default)
            except Exception as e:  # deliberate best-effort boundary
                logger.warning("IM %s lookup failed for %s: %s", field, contact_id, e)
                result[f"{field}_error"] = str(e)

    return ToolResult(
        title="联系人查询完成",
        output=json.dumps(result, ensure_ascii=False, indent=2),
        metadata=result,
    )


# ── Tool 7: query chat history ──

@tool(
    hidden_params=["context"],
    display={
        "zh": {"name": "查询 IM 聊天历史", "params": {"contact_id": "Agent ID", "chat_id": "窗口 ID", "peer_id": "联系人 ID", "limit": "最大条数"}},
        "en": {"name": "Get IM Chat History", "params": {"contact_id": "Agent ID", "chat_id": "Window ID", "peer_id": "Contact ID", "limit": "Max records"}},
    }
)
async def im_get_chat_history(
    contact_id: str,
    chat_id: str,
    peer_id: str | None = None,
    limit: int = 20,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Query the chat history of a window.

    Args:
        contact_id: Agent ID
        chat_id: Window ID
        peer_id: Only return the chat with this contact (leave empty for all)
        limit: Maximum number of records to return
    """
    client = _clients.get(contact_id)
    if client is None:
        return ToolResult(title="未连接", output=f"错误: {contact_id} 未初始化", error="未初始化")

    messages = client.get_chat_history(chat_id, peer_id, limit)
    return ToolResult(
        title=f"查到 {len(messages)} 条记录",
        output=json.dumps(messages, ensure_ascii=False, indent=2),
        metadata={"messages": messages},
    )