| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- """IM 工具 — 将 im-client 接入 Agent 框架。
- 新架构:一个 Agent (contact_id) = 一个 IMClient 实例,该实例管理多个窗口 (chat_id)。
- """
- import asyncio
- import json
- import logging
- import os
- import sys
- from typing import Optional
- from agent.tools import tool, ToolResult, ToolContext
# Make the im-client directory importable: it is resolved four levels above
# this file's directory and put at the front of sys.path if not already listed.
_IM_CLIENT_DIR = os.path.abspath(
    os.path.join(os.path.dirname(__file__), *([".."] * 4), "im-client")
)
if _IM_CLIENT_DIR not in sys.path:
    sys.path.insert(0, _IM_CLIENT_DIR)
- from client import IMClient # noqa: E402
- from notifier import AgentNotifier # noqa: E402
logger = logging.getLogger(__name__)

# ── Module-level state ──
# IMClient instances, keyed by contact_id.
_clients: dict[str, IMClient] = {}
# asyncio tasks created for each client's run() coroutine, keyed by contact_id.
_tasks: dict[str, asyncio.Task] = {}
# Most recent notification payload per window, keyed by (contact_id, chat_id).
_notifications: dict[tuple[str, str], dict] = {}
class _ToolNotifier(AgentNotifier):
    """Notifier bound to a single ``(contact_id, chat_id)`` pair.

    ``notify`` writes its payload into the module-level ``_notifications``
    dict under that pair, replacing whatever was stored there before.
    """

    def __init__(self, contact_id: str, chat_id: str):
        # The base-class initializer is intentionally not called here,
        # exactly as in the original implementation.
        self._key = (contact_id, chat_id)

    async def notify(self, count: int, from_contacts: list[str]):
        """Record the latest notification for this notifier's window."""
        payload = {"count": count, "from": from_contacts}
        _notifications[self._key] = payload
# ── Tool 1: set up the connection ──
@tool(
    hidden_params=["context"],
    display={
        "zh": {"name": "初始化 IM 连接", "params": {"contact_id": "你的身份 ID", "server_url": "服务器地址"}},
        "en": {"name": "Setup IM Connection", "params": {"contact_id": "Your identity ID", "server_url": "Server URL"}},
    }
)
async def im_setup(
    contact_id: str,
    server_url: str = "ws://localhost:8000",
    notify_interval: float = 10.0,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Create an IM client for ``contact_id`` and start it in the background.

    If a client is already registered for ``contact_id`` nothing is created and
    an "already connected" result is returned. Otherwise an ``IMClient`` is
    built, its ``run()`` coroutine is scheduled as a task on the running event
    loop, and both are stored in the module-level registries. After a short
    grace period the task is inspected: if it has already finished, the
    registration is rolled back and an error result is returned.

    Args:
        contact_id: Your identity ID in the IM system.
        server_url: WebSocket address of the server.
        notify_interval: Interval in seconds between new-message checks
            (forwarded to ``IMClient``).
    """
    if contact_id in _clients:
        return ToolResult(title="IM 已连接", output=f"已连接: {contact_id}")

    client = IMClient(contact_id=contact_id, server_url=server_url, notify_interval=notify_interval)
    _clients[contact_id] = client

    # We are inside a coroutine, so the running loop is the one to schedule on;
    # get_running_loop() is the documented replacement for get_event_loop() here.
    task = asyncio.get_running_loop().create_task(client.run())
    _tasks[contact_id] = task

    # Give the connection attempt a brief moment to start (does not block the loop).
    await asyncio.sleep(0.5)

    if task.done():
        # run() ended during the grace period, so there is no live client.
        # Drop the stale registration so a later im_setup call can retry.
        _clients.pop(contact_id, None)
        _tasks.pop(contact_id, None)
        failure = None if task.cancelled() else task.exception()
        reason = str(failure) if failure is not None else "客户端已退出"
        return ToolResult(
            title="IM 连接失败",
            output=f"错误: {contact_id} 启动失败: {reason}",
            error=reason,
        )

    return ToolResult(title="IM 连接成功", output=f"已启动 IM Client: {contact_id},后台连接中...")
# ── Tool 2: window management ──
@tool(
    hidden_params=["context"],
    display={
        "zh": {"name": "打开 IM 窗口", "params": {"contact_id": "Agent ID", "chat_id": "窗口 ID"}},
        "en": {"name": "Open IM Window", "params": {"contact_id": "Agent ID", "chat_id": "Window ID"}},
    }
)
async def im_open_window(
    contact_id: str,
    chat_id: str | None = None,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Open a new chat window on the client registered for ``contact_id``.

    Returns an error result when no client is registered for ``contact_id``;
    otherwise the result output contains the ID of the opened window as
    returned by ``client.open_window``.

    Args:
        contact_id: Agent ID.
        chat_id: Window ID (leave empty to have one generated).
    """
    client = _clients.get(contact_id)
    if client is None:
        return ToolResult(title="未连接", output=f"错误: {contact_id} 未初始化", error="未初始化")

    # The final window ID is only known after open_window() returns, so the
    # notifier starts with a provisional key and is re-keyed right afterwards.
    notifier = _ToolNotifier(contact_id, chat_id or "")
    actual_chat_id = client.open_window(chat_id=chat_id, notifier=notifier)

    # Re-key the same notifier object instead of swapping a new one into the
    # client's private ``_notifiers`` mapping. This also covers the case where
    # the caller passed an empty string and the client picked a different ID.
    # NOTE(review): assumes open_window() keeps the notifier object it was given.
    notifier._key = (contact_id, actual_chat_id)

    return ToolResult(title="窗口已打开", output=f"窗口 ID: {actual_chat_id}")
@tool(
    hidden_params=["context"],
    display={
        "zh": {"name": "关闭 IM 窗口", "params": {"contact_id": "Agent ID", "chat_id": "窗口 ID"}},
        "en": {"name": "Close IM Window", "params": {"contact_id": "Agent ID", "chat_id": "Window ID"}},
    }
)
async def im_close_window(
    contact_id: str,
    chat_id: str,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Close one chat window and discard its stored notification.

    Returns an error result when no client is registered for ``contact_id``.

    Args:
        contact_id: Agent ID.
        chat_id: Window ID.
    """
    if (client := _clients.get(contact_id)) is None:
        return ToolResult(title="未连接", output=f"错误: {contact_id} 未初始化", error="未初始化")

    client.close_window(chat_id)

    # Forget any notification still recorded for the window that was just closed.
    window_key = (contact_id, chat_id)
    _notifications.pop(window_key, None)

    return ToolResult(title="窗口已关闭", output=f"已关闭窗口: {chat_id}")
# ── Tool 3: check notifications ──
@tool(
    hidden_params=["context"],
    display={
        "zh": {"name": "检查 IM 新消息通知", "params": {"contact_id": "Agent ID", "chat_id": "窗口 ID"}},
        "en": {"name": "Check IM Notifications", "params": {"contact_id": "Agent ID", "chat_id": "Window ID"}},
    }
)
async def im_check_notification(
    contact_id: str,
    chat_id: str,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Check whether a window has a new-message notification.

    The stored notification for the window is removed when it is read, so a
    second call returns "no new messages" until another notification arrives.

    Args:
        contact_id: Agent ID.
        chat_id: Window ID.
    """
    window_key = (contact_id, chat_id)
    pending = _notifications.pop(window_key, None)

    if pending is not None:
        return ToolResult(
            title=f"有 {pending['count']} 条新消息",
            output=json.dumps(pending, ensure_ascii=False),
            metadata=pending,
        )

    return ToolResult(title="无新消息", output="当前没有新消息通知")
# ── Tool 4: receive messages ──
@tool(
    hidden_params=["context"],
    display={
        "zh": {"name": "接收 IM 消息", "params": {"contact_id": "Agent ID", "chat_id": "窗口 ID"}},
        "en": {"name": "Receive IM Messages", "params": {"contact_id": "Agent ID", "chat_id": "Window ID"}},
    }
)
async def im_receive_messages(
    contact_id: str,
    chat_id: str,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Read the pending messages of one window.

    Each message returned by ``client.read_pending`` is reduced to the fields
    ``sender``, ``sender_chat_id``, ``content`` and ``msg_type``; missing fields
    fall back to ``"unknown"``, ``None``, ``""`` and ``"chat"`` respectively.
    Returns an error result when no client is registered for ``contact_id``.

    Args:
        contact_id: Agent ID.
        chat_id: Window ID.
    """
    if (client := _clients.get(contact_id)) is None:
        return ToolResult(title="未连接", output=f"错误: {contact_id} 未初始化", error="未初始化")

    pending = client.read_pending(chat_id)
    if not pending:
        return ToolResult(title="无待处理消息", output="[]")

    messages = []
    for item in pending:
        messages.append(
            {
                "sender": item.get("sender", "unknown"),
                "sender_chat_id": item.get("sender_chat_id"),
                "content": item.get("content", ""),
                "msg_type": item.get("msg_type", "chat"),
            }
        )

    summary = f"收到 {len(messages)} 条消息"
    body = json.dumps(messages, ensure_ascii=False, indent=2)
    return ToolResult(title=summary, output=body, metadata={"messages": messages})
# ── Tool 5: send a message ──
@tool(
    hidden_params=["context"],
    display={
        "zh": {
            "name": "发送 IM 消息",
            "params": {"contact_id": "Agent ID", "chat_id": "窗口 ID", "receiver": "接收者 ID", "content": "消息内容"},
        },
        "en": {
            "name": "Send IM Message",
            "params": {"contact_id": "Agent ID", "chat_id": "Window ID", "receiver": "Receiver ID", "content": "Message content"},
        },
    }
)
async def im_send_message(
    contact_id: str,
    chat_id: str,
    receiver: str,
    content: str,
    msg_type: str = "chat",
    receiver_chat_id: str | None = None,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Send a message from one window.

    The result echoes the target as ``receiver:receiver_chat_id`` (or
    ``receiver:*`` when no receiver window is given) together with the first
    50 characters of the content. Returns an error result when no client is
    registered for ``contact_id``.

    Args:
        contact_id: Sender Agent ID.
        chat_id: Sender window ID.
        receiver: Receiver contact_id.
        content: Message content.
        msg_type: Message type.
        receiver_chat_id: Receiver window ID (broadcast when not specified).
    """
    if (client := _clients.get(contact_id)) is None:
        return ToolResult(title="未连接", output=f"错误: {contact_id} 未初始化", error="未初始化")

    client.send_message(chat_id, receiver, content, msg_type, receiver_chat_id)

    # "*" stands in for the receiver window when none was specified.
    window_label = receiver_chat_id or "*"
    target = f"{receiver}:{window_label}"
    preview = content[:50]

    return ToolResult(
        title=f"已发送给 {target}",
        output=f"[{contact_id}:{chat_id}] 已发送给 {target}: {preview}",
    )
# ── Tool 6: look up contacts ──
@tool(
    hidden_params=["context"],
    display={
        "zh": {"name": "查询 IM 联系人", "params": {"contact_id": "Agent ID", "server_http_url": "服务器 HTTP 地址"}},
        "en": {"name": "Get IM Contacts", "params": {"contact_id": "Agent ID", "server_http_url": "Server HTTP URL"}},
    }
)
async def im_get_contacts(
    contact_id: str,
    server_http_url: str = "http://localhost:8000",
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Query the contact list and the currently online users.

    Two independent GET requests are made: ``/contacts/<contact_id>`` and
    ``/health``. Each is best-effort: on failure (network error, HTTP error
    status, invalid JSON) the error text is stored under ``contacts_error`` or
    ``online_error`` and the other request is still attempted. Returns an
    error result when no client is registered for ``contact_id``.

    Args:
        contact_id: Agent ID.
        server_http_url: HTTP address of the server.
    """
    if contact_id not in _clients:
        return ToolResult(title="未连接", output=f"错误: {contact_id} 未初始化", error="未初始化")

    # Imported lazily so the module can be loaded without httpx installed.
    import httpx
    from urllib.parse import quote

    # Avoid a double slash when the caller passes a URL with a trailing "/".
    base_url = server_http_url.rstrip("/")
    # Escape the ID so characters such as "/", "?" or "#" cannot alter the path.
    contact_path = quote(contact_id, safe="")

    result = {}
    async with httpx.AsyncClient() as http:
        try:
            r = await http.get(f"{base_url}/contacts/{contact_path}")
            # Treat 4xx/5xx as failures instead of parsing an error body as data.
            r.raise_for_status()
            result["contacts"] = r.json().get("contacts", [])
        except Exception as e:  # deliberate best-effort boundary
            logger.warning("IM contacts lookup failed for %s: %s", contact_id, e)
            result["contacts_error"] = str(e)

        try:
            r = await http.get(f"{base_url}/health")
            r.raise_for_status()
            result["online"] = r.json().get("online", {})
        except Exception as e:  # deliberate best-effort boundary
            logger.warning("IM health lookup failed for %s: %s", contact_id, e)
            result["online_error"] = str(e)

    return ToolResult(
        title="联系人查询完成",
        output=json.dumps(result, ensure_ascii=False, indent=2),
        metadata=result,
    )
# ── Tool 7: query chat history ──
@tool(
    hidden_params=["context"],
    display={
        "zh": {
            "name": "查询 IM 聊天历史",
            "params": {"contact_id": "Agent ID", "chat_id": "窗口 ID", "peer_id": "联系人 ID", "limit": "最大条数"},
        },
        "en": {
            "name": "Get IM Chat History",
            "params": {"contact_id": "Agent ID", "chat_id": "Window ID", "peer_id": "Contact ID", "limit": "Max records"},
        },
    }
)
async def im_get_chat_history(
    contact_id: str,
    chat_id: str,
    peer_id: str | None = None,
    limit: int = 20,
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """Query the chat history of one window.

    The records come from ``client.get_chat_history`` and are returned both as
    pretty-printed JSON and in the result metadata. Returns an error result
    when no client is registered for ``contact_id``.

    Args:
        contact_id: Agent ID.
        chat_id: Window ID.
        peer_id: Only return the chat with this contact (leave empty for all).
        limit: Maximum number of records to return.
    """
    if (client := _clients.get(contact_id)) is None:
        return ToolResult(title="未连接", output=f"错误: {contact_id} 未初始化", error="未初始化")

    history = client.get_chat_history(chat_id, peer_id, limit)

    heading = f"查到 {len(history)} 条记录"
    serialized = json.dumps(history, ensure_ascii=False, indent=2)
    return ToolResult(title=heading, output=serialized, metadata={"messages": history})
|