| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- """IM Agent Tools — 供 Agent 在 tool-use loop 中调用的工具函数。
- 新架构:一个 Agent (contact_id) = 一个 IMClient 实例,该实例管理多个窗口 (chat_id)。
- 使用方式:
- 1. 调用 setup(contact_id) 初始化 Agent 的 IMClient
- 2. 调用 open_window(contact_id, chat_id) 打开窗口
- 3. 在每轮 loop 中调用 check_notification(contact_id, chat_id) 检查该窗口的新消息
- 4. 有通知时调用 receive_messages(contact_id, chat_id) 读取消息
- 5. 发消息调用 send_message(contact_id, chat_id, receiver, content)
- """
- import asyncio
- import httpx
- from client import IMClient
- from notifier import AgentNotifier
- # ── 全局状态 ──
- _clients: dict[str, IMClient] = {}
- _tasks: dict[str, asyncio.Task] = {}
- _notifications: dict[tuple[str, str], dict] = {} # (contact_id, chat_id) -> 通知
- class _ToolNotifier(AgentNotifier):
- """内部通知器:按 (contact_id, chat_id) 分发通知。"""
- def __init__(self, contact_id: str, chat_id: str):
- self._key = (contact_id, chat_id)
- async def notify(self, count: int, from_contacts: list[str]):
- _notifications[self._key] = {"count": count, "from": from_contacts}
- # ── Tool 1: 初始化 Agent ──
- def setup(contact_id: str, server_url: str = "ws://localhost:8000", notify_interval: float = 10.0) -> str:
- """初始化一个 Agent 的 IMClient(一个实例管理多个窗口)。
- Args:
- contact_id: Agent 的身份 ID
- server_url: Server 地址
- notify_interval: 检查新消息的间隔秒数
- Returns:
- 状态描述
- """
- if contact_id in _clients:
- return f"已连接: {contact_id}"
- client = IMClient(contact_id=contact_id, server_url=server_url, notify_interval=notify_interval)
- _clients[contact_id] = client
- loop = asyncio.get_event_loop()
- _tasks[contact_id] = loop.create_task(client.run())
- return f"已启动 IM Client: {contact_id}"
- def teardown(contact_id: str) -> str:
- """停止并移除一个 Agent 的 IMClient。"""
- task = _tasks.pop(contact_id, None)
- if task:
- task.cancel()
- _clients.pop(contact_id, None)
- # 清理该 contact_id 的所有通知
- keys_to_remove = [k for k in _notifications if k[0] == contact_id]
- for k in keys_to_remove:
- _notifications.pop(k, None)
- return f"已停止: {contact_id}"
- # ── Tool 2: 窗口管理 ──
- def open_window(contact_id: str, chat_id: str | None = None) -> str:
- """为某个 Agent 打开一个新窗口。
- Args:
- contact_id: Agent ID
- chat_id: 窗口 ID(留空自动生成)
- Returns:
- 窗口的 chat_id
- """
- client = _clients.get(contact_id)
- if client is None:
- return f"错误: {contact_id} 未初始化"
- actual_chat_id = client.open_window(chat_id=chat_id, notifier=_ToolNotifier(contact_id, chat_id or ""))
- # 更新 notifier 的 chat_id
- if chat_id is None:
- client._notifiers[actual_chat_id] = _ToolNotifier(contact_id, actual_chat_id)
- return actual_chat_id
- def close_window(contact_id: str, chat_id: str) -> str:
- """关闭某个窗口。"""
- client = _clients.get(contact_id)
- if client is None:
- return f"错误: {contact_id} 未初始化"
- client.close_window(chat_id)
- _notifications.pop((contact_id, chat_id), None)
- return f"已关闭窗口: {chat_id}"
- def list_windows(contact_id: str) -> list[str]:
- """列出某个 Agent 的所有窗口。"""
- client = _clients.get(contact_id)
- if client is None:
- return []
- return client.list_windows()
- # ── Tool 3: 检查通知 ──
- def check_notification(contact_id: str, chat_id: str) -> dict | None:
- """检查某个窗口是否有新消息通知。
- Returns:
- 有新消息: {"count": 3, "from": ["alice", "bob"]}
- 没有新消息: None
- """
- return _notifications.pop((contact_id, chat_id), None)
- # ── Tool 4: 接收消息 ──
- def receive_messages(contact_id: str, chat_id: str) -> list[dict]:
- """读取某个窗口的待处理消息,读取后自动清空。
- Returns:
- 消息列表,每条格式:
- {
- "sender": "alice",
- "sender_chat_id": "...",
- "content": "你好",
- "msg_type": "chat"
- }
- """
- client = _clients.get(contact_id)
- if client is None:
- return []
- raw = client.read_pending(chat_id)
- return [
- {
- "sender": m.get("sender", "unknown"),
- "sender_chat_id": m.get("sender_chat_id"),
- "content": m.get("content", ""),
- "msg_type": m.get("msg_type", "chat"),
- }
- for m in raw
- ]
- # ── Tool 5: 发送消息 ──
- def send_message(
- contact_id: str,
- chat_id: str,
- receiver: str,
- content: str,
- msg_type: str = "chat",
- receiver_chat_id: str | None = None,
- ) -> str:
- """从某个窗口发送消息。
- Args:
- contact_id: 发送方 Agent ID
- chat_id: 发送方窗口 ID
- receiver: 接收方 contact_id
- content: 消息内容
- msg_type: 消息类型
- receiver_chat_id: 接收方窗口 ID(不指定则广播)
- Returns:
- 状态描述
- """
- client = _clients.get(contact_id)
- if client is None:
- return f"错误: {contact_id} 未初始化"
- client.send_message(chat_id, receiver, content, msg_type, receiver_chat_id)
- target = f"{receiver}:{receiver_chat_id}" if receiver_chat_id else f"{receiver}:*"
- return f"[{contact_id}:{chat_id}] 已发送给 {target}: {content[:50]}"
- # ── Tool 6: 查询联系人 ──
- def get_contacts(contact_id: str, server_http_url: str = "http://localhost:8000") -> dict:
- """查询某个 Agent 的联系人列表和在线用户。"""
- if contact_id not in _clients:
- return {"error": f"{contact_id} 未初始化"}
- result = {}
- with httpx.Client() as http:
- try:
- r = http.get(f"{server_http_url}/contacts/{contact_id}")
- result["contacts"] = r.json().get("contacts", [])
- except Exception as e:
- result["contacts_error"] = str(e)
- try:
- r = http.get(f"{server_http_url}/health")
- result["online"] = r.json().get("online", {})
- except Exception as e:
- result["online_error"] = str(e)
- return result
- # ── Tool 7: 查询聊天历史 ──
- def get_chat_history(contact_id: str, chat_id: str, peer_id: str | None = None, limit: int = 20) -> list[dict]:
- """查询某个窗口的聊天历史。"""
- client = _clients.get(contact_id)
- if client is None:
- return []
- return client.get_chat_history(chat_id, peer_id, limit)
|