"""IM Agent Tools — 供 Agent 在 tool-use loop 中调用的工具函数。 新架构:一个 Agent (contact_id) = 一个 IMClient 实例,该实例管理多个窗口 (chat_id)。 使用方式: 1. 调用 setup(contact_id) 初始化 Agent 的 IMClient 2. 调用 open_window(contact_id, chat_id) 打开窗口 3. 在每轮 loop 中调用 check_notification(contact_id, chat_id) 检查该窗口的新消息 4. 有通知时调用 receive_messages(contact_id, chat_id) 读取消息 5. 发消息调用 send_message(contact_id, chat_id, receiver, content) """ import asyncio import httpx from client import IMClient from notifier import AgentNotifier # ── 全局状态 ── _clients: dict[str, IMClient] = {} _tasks: dict[str, asyncio.Task] = {} _notifications: dict[tuple[str, str], dict] = {} # (contact_id, chat_id) -> 通知 class _ToolNotifier(AgentNotifier): """内部通知器:按 (contact_id, chat_id) 分发通知。""" def __init__(self, contact_id: str, chat_id: str): self._key = (contact_id, chat_id) async def notify(self, count: int, from_contacts: list[str]): _notifications[self._key] = {"count": count, "from": from_contacts} # ── Tool 1: 初始化 Agent ── def setup(contact_id: str, server_url: str = "ws://localhost:8000", notify_interval: float = 10.0) -> str: """初始化一个 Agent 的 IMClient(一个实例管理多个窗口)。 Args: contact_id: Agent 的身份 ID server_url: Server 地址 notify_interval: 检查新消息的间隔秒数 Returns: 状态描述 """ if contact_id in _clients: return f"已连接: {contact_id}" client = IMClient(contact_id=contact_id, server_url=server_url, notify_interval=notify_interval) _clients[contact_id] = client loop = asyncio.get_event_loop() _tasks[contact_id] = loop.create_task(client.run()) return f"已启动 IM Client: {contact_id}" def teardown(contact_id: str) -> str: """停止并移除一个 Agent 的 IMClient。""" task = _tasks.pop(contact_id, None) if task: task.cancel() _clients.pop(contact_id, None) # 清理该 contact_id 的所有通知 keys_to_remove = [k for k in _notifications if k[0] == contact_id] for k in keys_to_remove: _notifications.pop(k, None) return f"已停止: {contact_id}" # ── Tool 2: 窗口管理 ── def open_window(contact_id: str, chat_id: str | None = None) -> str: """为某个 Agent 打开一个新窗口。 Args: contact_id: Agent ID chat_id: 窗口 ID(留空自动生成) Returns: 窗口的 chat_id """ client = _clients.get(contact_id) if client is None: return f"错误: {contact_id} 未初始化" actual_chat_id = client.open_window(chat_id=chat_id, notifier=_ToolNotifier(contact_id, chat_id or "")) # 更新 notifier 的 chat_id if chat_id is None: client._notifiers[actual_chat_id] = _ToolNotifier(contact_id, actual_chat_id) return actual_chat_id def close_window(contact_id: str, chat_id: str) -> str: """关闭某个窗口。""" client = _clients.get(contact_id) if client is None: return f"错误: {contact_id} 未初始化" client.close_window(chat_id) _notifications.pop((contact_id, chat_id), None) return f"已关闭窗口: {chat_id}" def list_windows(contact_id: str) -> list[str]: """列出某个 Agent 的所有窗口。""" client = _clients.get(contact_id) if client is None: return [] return client.list_windows() # ── Tool 3: 检查通知 ── def check_notification(contact_id: str, chat_id: str) -> dict | None: """检查某个窗口是否有新消息通知。 Returns: 有新消息: {"count": 3, "from": ["alice", "bob"]} 没有新消息: None """ return _notifications.pop((contact_id, chat_id), None) # ── Tool 4: 接收消息 ── def receive_messages(contact_id: str, chat_id: str) -> list[dict]: """读取某个窗口的待处理消息,读取后自动清空。 Returns: 消息列表,每条格式: { "sender": "alice", "sender_chat_id": "...", "content": "你好", "msg_type": "chat" } """ client = _clients.get(contact_id) if client is None: return [] raw = client.read_pending(chat_id) return [ { "sender": m.get("sender", "unknown"), "sender_chat_id": m.get("sender_chat_id"), "content": m.get("content", ""), "msg_type": m.get("msg_type", "chat"), } for m in raw ] # ── Tool 5: 发送消息 ── def send_message( contact_id: str, chat_id: str, receiver: str, content: str, msg_type: str = "chat", receiver_chat_id: str | None = None, ) -> str: """从某个窗口发送消息。 Args: contact_id: 发送方 Agent ID chat_id: 发送方窗口 ID receiver: 接收方 contact_id content: 消息内容 msg_type: 消息类型 receiver_chat_id: 接收方窗口 ID(不指定则广播) Returns: 状态描述 """ client = _clients.get(contact_id) if client is None: return f"错误: {contact_id} 未初始化" client.send_message(chat_id, receiver, content, msg_type, receiver_chat_id) target = f"{receiver}:{receiver_chat_id}" if receiver_chat_id else f"{receiver}:*" return f"[{contact_id}:{chat_id}] 已发送给 {target}: {content[:50]}" # ── Tool 6: 查询联系人 ── def get_contacts(contact_id: str, server_http_url: str = "http://localhost:8000") -> dict: """查询某个 Agent 的联系人列表和在线用户。""" if contact_id not in _clients: return {"error": f"{contact_id} 未初始化"} result = {} with httpx.Client() as http: try: r = http.get(f"{server_http_url}/contacts/{contact_id}") result["contacts"] = r.json().get("contacts", []) except Exception as e: result["contacts_error"] = str(e) try: r = http.get(f"{server_http_url}/health") result["online"] = r.json().get("online", {}) except Exception as e: result["online_error"] = str(e) return result # ── Tool 7: 查询聊天历史 ── def get_chat_history(contact_id: str, chat_id: str, peer_id: str | None = None, limit: int = 20) -> list[dict]: """查询某个窗口的聊天历史。""" client = _clients.get(contact_id) if client is None: return [] return client.get_chat_history(chat_id, peer_id, limit)