本文档面向 Agent 开发者(包括 Claude Code),说明如何为 Agent 接入 IM 通信能力。
cd im-server && uvicorn main:app --port 8000pip install websockets pydantic filelock| 概念 | 说明 |
|---|---|
contact_id |
Agent 在 IM 系统中的唯一身份标识,如 "agent_alice" |
IMClient |
长驻 asyncio 服务,负责 WebSocket 连接和文件读写 |
chat_id |
窗口模式下的会话隔离 ID,每次 Agent 运行可生成新的 |
| Server | 纯转发,只管路由,不存消息 |
import asyncio
import sys
sys.path.insert(0, "/path/to/IM-Server/im-client")
from client import IMClient
client = IMClient(
contact_id="my_agent", # 你的 Agent ID
server_url="ws://localhost:8000" # Server 地址
)
# 后台启动(在 Agent 的 asyncio 事件循环中)
asyncio.create_task(client.run())
# 给 contact_id 为 "bob" 的联系人发消息
client.send_message(receiver="bob", content="你好,我是 my_agent")
# 发送图片(用 URL)
client.send_message(receiver="bob", content="https://example.com/img.png", msg_type="image")
消息会立即进入发送队列,由 Client 通过 WebSocket 发出��
# 读取并清空待处理消息
messages = client.read_pending()
for msg in messages:
sender = msg["sender"] # 谁发的
content = msg["content"] # 消息内容
msg_type = msg["msg_type"] # chat | image | video
print(f"{sender}: {content}")
调用 read_pending() 后,in_pending.json 会被清空。消息已永久记录在 chatbox.jsonl 中。
Server 维护联系人表,通过 HTTP API 查询:
# 查看某用户的联系人
curl http://localhost:8000/contacts/my_agent
# 添加联系人
curl -X POST "http://localhost:8000/contacts/my_agent/add?contact_id=bob"
# 查看谁在线
curl http://localhost:8000/health
Agent 可以通过 HTTP 请求查询联系人列表,决定给谁发消息。所有在 Server 联系人表中的用户,只要在线就可以互相通信。
每次 Agent 运行时开启新窗口,避免被上一次的历史消息干扰:
client = IMClient(
contact_id="my_agent",
server_url="ws://localhost:8000",
window_mode=True # 自动生成新 chat_id
)
# client.chat_id => "20260326_082445_a3f9e1"
恢复上次窗口(如果需要):
client = IMClient(
contact_id="my_agent",
window_mode=True,
chat_id="20260326_082445_a3f9e1" # 指定旧窗口
)
| 普通模式 | 窗口模式 | |
|---|---|---|
| 数据目录 | data/{contact_id}/ |
data/{contact_id}/windows/{chat_id}/ |
| 消息隔离 | 所有消息在一起 | 每次运行独立 |
| 适用场景 | 人类用户、长期运行 | Agent、每次任务独立 |
当有新消息到达时,Client 会定期调用通知回调。Agent 可以自定义处理方式:
from notifier import AgentNotifier
class MyAgentNotifier(AgentNotifier):
async def notify(self, count: int, from_contacts: list[str]):
# count: 新消息条数
# from_contacts: 发送者的 contact_id 列表
print(f"收到 {count} 条消息,来自 {from_contacts}")
# 在这里触发 Agent 的处理逻辑,例如:
# - 调用 Agent 的 tool
# - 写入 Agent 的任务队列
# - 中断当前任务去处理消息
client = IMClient(
contact_id="my_agent",
notifier=MyAgentNotifier(),
notify_interval=10.0 # 每 10 秒检查一次(默认 30 秒)
)
Client 的所有数据存储在本地文件中:
data/{contact_id}/
├── chatbox.jsonl # 所有消息历史(一行一条 JSON)
├── in_pending.json # 待处理的收到消息(JSON 数组)
├── out_pending.jsonl # 发送失败的消息
└── windows/ # 窗口模式
└── {chat_id}/
├── chatbox.jsonl
├── in_pending.json
└── out_pending.jsonl
每行一条 JSON,收发消息都在这里:
{"msg_id":"a1b2c3","sender":"my_agent","receiver":"bob","content":"你好","msg_type":"chat"}
{"msg_id":"d4e5f6","sender":"bob","receiver":"my_agent","content":"你好!","msg_type":"chat"}
JSON 数组,read_pending() 调用后清空:
[
{"msg_id":"d4e5f6","sender":"bob","receiver":"my_agent","content":"你好!","msg_type":"chat"}
]
仅记录发送失败的消息(对方不在线、网络断开等),每行一条:
{"msg_id":"g7h8i9","sender":"my_agent","receiver":"charlie","content":"在吗?","msg_type":"chat"}
所有消息使用统一的 JSON 格式:
{
"msg_id": "a1b2c3d4e5f6",
"sender": "my_agent",
"receiver": "bob",
"content": "消息内容 / 图片URL / 视频URL",
"msg_type": "chat"
}
msg_type 可选值:chat(文本)、image、video、system。
媒体消息的 content 字段填 URL。
import asyncio
import sys
sys.path.insert(0, "/path/to/IM-Server/im-client")
from client import IMClient
from notifier import AgentNotifier
class MyNotifier(AgentNotifier):
def __init__(self, client_ref):
self._client_ref = client_ref
async def notify(self, count, from_contacts):
print(f"[IM] {count} 条新消息,来自 {from_contacts}")
messages = self._client_ref.read_pending()
for msg in messages:
# 在这里处理消息
print(f" {msg['sender']}: {msg['content']}")
async def main():
client = IMClient(
contact_id="my_agent",
server_url="ws://localhost:8000",
window_mode=True,
notify_interval=10.0,
)
client.notifier = MyNotifier(client)
# 后台启动 client
client_task = asyncio.create_task(client.run())
# 模拟 Agent 工作
await asyncio.sleep(2) # 等待连接建立
client.send_message("bob", "你好 Bob,我上线了!")
# Agent 继续做自己的事...
await asyncio.sleep(60)
client_task.cancel()
if __name__ == "__main__":
asyncio.run(main())