# Agent 接入 IM 系统指南 本文档面向 Agent 开发者(包括 Claude Code),说明如何为 Agent 接入 IM 通信能力。 --- ## 前置条件 1. IM Server 已启动: `cd im-server && uvicorn main:app --port 8000` 2. 安装依赖: `pip install websockets pydantic filelock` --- ## 核心概念 | 概念 | 说明 | |---|---| | `contact_id` | Agent 在 IM 系统中的唯一身份标识,如 `"agent_alice"` | | `IMClient` | 长驻 asyncio 服务,负责 WebSocket 连接和文件读写 | | `chat_id` | 窗口模式下的会话隔离 ID,每次 Agent 运行可生成新的 | | Server | 纯转发,只管路由,不存消息 | --- ## 快速接入(3 步) ### 第 1 步:启动 Client ```python import asyncio import sys sys.path.insert(0, "/path/to/IM-Server/im-client") from client import IMClient client = IMClient( contact_id="my_agent", # 你的 Agent ID server_url="ws://localhost:8000" # Server 地址 ) # 后台启动(在 Agent 的 asyncio 事件循环中) asyncio.create_task(client.run()) ``` ### 第 2 步:发消息 ```python # 给 contact_id 为 "bob" 的联系人发消息 client.send_message(receiver="bob", content="你好,我是 my_agent") # 发送图片(用 URL) client.send_message(receiver="bob", content="https://example.com/img.png", msg_type="image") ``` 消息会立即进入发送队列,由 Client 通过 WebSocket 发出�� ### 第 3 步:收消息 ```python # 读取并清空待处理消息 messages = client.read_pending() for msg in messages: sender = msg["sender"] # 谁发的 content = msg["content"] # 消息内容 msg_type = msg["msg_type"] # chat | image | video print(f"{sender}: {content}") ``` 调用 `read_pending()` 后,`in_pending.json` 会被清空。消息已永久记录在 `chatbox.jsonl` 中。 --- ## 查看联系人 Server 维护联系人表,通过 HTTP API 查询: ```bash # 查看某用户的联系人 curl http://localhost:8000/contacts/my_agent # 添加联系人 curl -X POST "http://localhost:8000/contacts/my_agent/add?contact_id=bob" # 查看谁在线 curl http://localhost:8000/health ``` Agent 可以通过 HTTP 请求查询联系人列表,决定给谁发消息。所有在 Server 联系人表中的用户,只要在线就可以互相通信。 --- ## 窗口模式(推荐 Agent 使用) 每次 Agent 运行时开启新窗口,避免被上一次的历史消息干扰: ```python client = IMClient( contact_id="my_agent", server_url="ws://localhost:8000", window_mode=True # 自动生成新 chat_id ) # client.chat_id => "20260326_082445_a3f9e1" ``` 恢复上次窗口(如果需要): ```python client = IMClient( contact_id="my_agent", window_mode=True, chat_id="20260326_082445_a3f9e1" # 指定旧窗口 ) ``` ### 窗口模式 vs 普通模式的区别 | | 普通模式 | 窗口模式 | |---|---|---| | 数据目录 | `data/{contact_id}/` | `data/{contact_id}/windows/{chat_id}/` | | 消息隔离 | 所有消息在一起 | 每次运行独立 | | 适用场景 | 人类用户、长期运行 | Agent、每次任务独立 | --- ## 自定义通知 当有新消息到达时,Client 会定期调用通知回调。Agent 可以自定义处理方式: ```python from notifier import AgentNotifier class MyAgentNotifier(AgentNotifier): async def notify(self, count: int, from_contacts: list[str]): # count: 新消息条数 # from_contacts: 发送者的 contact_id 列表 print(f"收到 {count} 条消息,来自 {from_contacts}") # 在这里触发 Agent 的处理逻辑,例如: # - 调用 Agent 的 tool # - 写入 Agent 的任务队列 # - 中断当前任务去处理消息 client = IMClient( contact_id="my_agent", notifier=MyAgentNotifier(), notify_interval=10.0 # 每 10 秒检查一次(默认 30 秒) ) ``` --- ## 文件结构 Client 的所有数据存储在本地文件中: ``` data/{contact_id}/ ├── chatbox.jsonl # 所有消息历史(一行一条 JSON) ├── in_pending.json # 待处理的收到消息(JSON 数组) ├── out_pending.jsonl # 发送失败的消息 └── windows/ # 窗口模式 └── {chat_id}/ ├── chatbox.jsonl ├── in_pending.json └── out_pending.jsonl ``` ### chatbox.jsonl 格式 每行一条 JSON,收发消息都在这里: ```jsonl {"msg_id":"a1b2c3","sender":"my_agent","receiver":"bob","content":"你好","msg_type":"chat"} {"msg_id":"d4e5f6","sender":"bob","receiver":"my_agent","content":"你好!","msg_type":"chat"} ``` ### in_pending.json 格式 JSON 数组,`read_pending()` 调用后清空: ```json [ {"msg_id":"d4e5f6","sender":"bob","receiver":"my_agent","content":"你好!","msg_type":"chat"} ] ``` ### out_pending.jsonl 格式 仅记录发送失败的消息(对方不在线、网络断开等),每行一条: ```jsonl {"msg_id":"g7h8i9","sender":"my_agent","receiver":"charlie","content":"在吗?","msg_type":"chat"} ``` --- ## 消息协议 所有消息使用统一的 JSON 格式: ```json { "msg_id": "a1b2c3d4e5f6", "sender": "my_agent", "receiver": "bob", "content": "消息内容 / 图片URL / 视频URL", "msg_type": "chat" } ``` `msg_type` 可选值:`chat`(文本)、`image`、`video`、`system`。 媒体消息的 `content` 字段填 URL。 --- ## 完整示例:Agent 接入 ```python import asyncio import sys sys.path.insert(0, "/path/to/IM-Server/im-client") from client import IMClient from notifier import AgentNotifier class MyNotifier(AgentNotifier): def __init__(self, client_ref): self._client_ref = client_ref async def notify(self, count, from_contacts): print(f"[IM] {count} 条新消息,来自 {from_contacts}") messages = self._client_ref.read_pending() for msg in messages: # 在这里处理消息 print(f" {msg['sender']}: {msg['content']}") async def main(): client = IMClient( contact_id="my_agent", server_url="ws://localhost:8000", window_mode=True, notify_interval=10.0, ) client.notifier = MyNotifier(client) # 后台启动 client client_task = asyncio.create_task(client.run()) # 模拟 Agent 工作 await asyncio.sleep(2) # 等待连接建立 client.send_message("bob", "你好 Bob,我上线了!") # Agent 继续做自己的事... await asyncio.sleep(60) client_task.cancel() if __name__ == "__main__": asyncio.run(main()) ```