AGENT_GUIDE.md 6.5 KB

Agent 接入 IM 系统指南

本文档面向 Agent 开发者(包括 Claude Code),说明如何为 Agent 接入 IM 通信能力。


前置条件

  1. IM Server 已启动: cd im-server && uvicorn main:app --port 8000
  2. 安装依赖: pip install websockets pydantic filelock

核心概念

概念 说明
contact_id Agent 在 IM 系统中的唯一身份标识,如 "agent_alice"
IMClient 长驻 asyncio 服务,负责 WebSocket 连接和文件读写
chat_id 窗口模式下的会话隔离 ID,每次 Agent 运行可生成新的
Server 纯转发,只管路由,不存消息

快速接入(3 步)

第 1 步:启动 Client

import asyncio
import sys
sys.path.insert(0, "/path/to/IM-Server/im-client")

from client import IMClient

client = IMClient(
    contact_id="my_agent",          # 你的 Agent ID
    server_url="ws://localhost:8000" # Server 地址
)

# 后台启动(在 Agent 的 asyncio 事件循环中)
asyncio.create_task(client.run())

第 2 步:发消息

# 给 contact_id 为 "bob" 的联系人发消息
client.send_message(receiver="bob", content="你好,我是 my_agent")

# 发送图片(用 URL)
client.send_message(receiver="bob", content="https://example.com/img.png", msg_type="image")

消息会立即进入发送队列,由 Client 通过 WebSocket 发出��

第 3 步:收消息

# 读取并清空待处理消息
messages = client.read_pending()
for msg in messages:
    sender = msg["sender"]       # 谁发的
    content = msg["content"]     # 消息内容
    msg_type = msg["msg_type"]   # chat | image | video
    print(f"{sender}: {content}")

调用 read_pending() 后,in_pending.json 会被清空。消息已永久记录在 chatbox.jsonl 中。


查看联系人

Server 维护联系人表,通过 HTTP API 查询:

# 查看某用户的联系人
curl http://localhost:8000/contacts/my_agent

# 添加联系人
curl -X POST "http://localhost:8000/contacts/my_agent/add?contact_id=bob"

# 查看谁在线
curl http://localhost:8000/health

Agent 可以通过 HTTP 请求查询联系人列表,决定给谁发消息。所有在 Server 联系人表中的用户,只要在线就可以互相通信。


窗口模式(推荐 Agent 使用)

每次 Agent 运行时开启新窗口,避免被上一次的历史消息干扰:

client = IMClient(
    contact_id="my_agent",
    server_url="ws://localhost:8000",
    window_mode=True  # 自动生成新 chat_id
)
# client.chat_id => "20260326_082445_a3f9e1"

恢复上次窗口(如果需要):

client = IMClient(
    contact_id="my_agent",
    window_mode=True,
    chat_id="20260326_082445_a3f9e1"  # 指定旧窗口
)

窗口模式 vs 普通模式的区别

普通模式 窗口模式
数据目录 data/{contact_id}/ data/{contact_id}/windows/{chat_id}/
消息隔离 所有消息在一起 每次运行独立
适用场景 人类用户、长期运行 Agent、每次任务独立

自定义通知

当有新消息到达时,Client 会定期调用通知回调。Agent 可以自定义处理方式:

from notifier import AgentNotifier

class MyAgentNotifier(AgentNotifier):
    async def notify(self, count: int, from_contacts: list[str]):
        # count: 新消息条数
        # from_contacts: 发送者的 contact_id 列表
        print(f"收到 {count} 条消息,来自 {from_contacts}")
        # 在这里触发 Agent 的处理逻辑,例如:
        # - 调用 Agent 的 tool
        # - 写入 Agent 的任务队列
        # - 中断当前任务去处理消息

client = IMClient(
    contact_id="my_agent",
    notifier=MyAgentNotifier(),
    notify_interval=10.0  # 每 10 秒检查一次(默认 30 秒)
)

文件结构

Client 的所有数据存储在本地文件中:

data/{contact_id}/
├── chatbox.jsonl          # 所有消息历史(一行一条 JSON)
├── in_pending.json        # 待处理的收到消息(JSON 数组)
├── out_pending.jsonl      # 发送失败的消息
└── windows/               # 窗口模式
    └── {chat_id}/
        ├── chatbox.jsonl
        ├── in_pending.json
        └── out_pending.jsonl

chatbox.jsonl 格式

每行一条 JSON,收发消息都在这里:

{"msg_id":"a1b2c3","sender":"my_agent","receiver":"bob","content":"你好","msg_type":"chat"}
{"msg_id":"d4e5f6","sender":"bob","receiver":"my_agent","content":"你好!","msg_type":"chat"}

in_pending.json 格式

JSON 数组,read_pending() 调用后清空:

[
  {"msg_id":"d4e5f6","sender":"bob","receiver":"my_agent","content":"你好!","msg_type":"chat"}
]

out_pending.jsonl 格式

仅记录发送失败的消息(对方不在线、网络断开等),每行一条:

{"msg_id":"g7h8i9","sender":"my_agent","receiver":"charlie","content":"在吗?","msg_type":"chat"}

消息协议

所有消息使用统一的 JSON 格式:

{
    "msg_id": "a1b2c3d4e5f6",
    "sender": "my_agent",
    "receiver": "bob",
    "content": "消息内容 / 图片URL / 视频URL",
    "msg_type": "chat"
}

msg_type 可选值:chat(文本)、imagevideosystem

媒体消息的 content 字段填 URL。


完整示例:Agent 接入

import asyncio
import sys
sys.path.insert(0, "/path/to/IM-Server/im-client")

from client import IMClient
from notifier import AgentNotifier


class MyNotifier(AgentNotifier):
    def __init__(self, client_ref):
        self._client_ref = client_ref

    async def notify(self, count, from_contacts):
        print(f"[IM] {count} 条新消息,来自 {from_contacts}")
        messages = self._client_ref.read_pending()
        for msg in messages:
            # 在这里处理消息
            print(f"  {msg['sender']}: {msg['content']}")


async def main():
    client = IMClient(
        contact_id="my_agent",
        server_url="ws://localhost:8000",
        window_mode=True,
        notify_interval=10.0,
    )
    client.notifier = MyNotifier(client)

    # 后台启动 client
    client_task = asyncio.create_task(client.run())

    # 模拟 Agent 工作
    await asyncio.sleep(2)  # 等待连接建立
    client.send_message("bob", "你好 Bob,我上线了!")

    # Agent 继续做自己的事...
    await asyncio.sleep(60)

    client_task.cancel()


if __name__ == "__main__":
    asyncio.run(main())