""" Gateway Client SDK 通用的 Gateway 客户端实现,提供: - Agent 注册和连接管理 - 消息发送和接收 - 在线状态查询 - Agent 列表查询 不依赖任何特定的 Agent 框架。 """ import asyncio import json import logging from dataclasses import dataclass, field from datetime import datetime from typing import Any, Callable, Dict, List, Optional from urllib.parse import urlencode import httpx logger = logging.getLogger(__name__) @dataclass class Message: """MAMP 协议消息""" conversation_id: str content: List[Dict[str, Any]] # 多模态内容 metadata: Dict[str, Any] = field(default_factory=dict) message_id: Optional[str] = None timestamp: Optional[str] = None @dataclass class AgentCard: """Agent 身份卡片""" agent_id: str agent_name: str agent_type: Optional[str] = None capabilities: List[str] = field(default_factory=list) description: Optional[str] = None version: Optional[str] = None metadata: Dict[str, Any] = field(default_factory=dict) class GatewayClient: """ Gateway 客户端 使用示例: ```python client = GatewayClient( gateway_url="http://localhost:8001", agent_card=AgentCard( agent_id="my-agent-001", agent_name="My Agent", capabilities=["search", "analyze"] ) ) # 发送消息 await client.send_message( to_agent_id="target-agent", content=[{"type": "text", "text": "Hello"}], conversation_id="conv-123" ) # 查询在线 Agent agents = await client.list_agents() ``` """ def __init__( self, gateway_url: str, agent_card: AgentCard, on_message: Optional[Callable[[Message], None]] = None, auto_reconnect: bool = True, heartbeat_interval: int = 30, ): """ 初始化 Gateway 客户端 Args: gateway_url: Gateway HTTP API 地址(如 http://localhost:8001) agent_card: Agent 身份信息 on_message: 收到消息时的回调函数 auto_reconnect: 是否自动重连 heartbeat_interval: 心跳间隔(秒) """ self.gateway_url = gateway_url.rstrip("/") self.agent_card = agent_card self.on_message = on_message self.auto_reconnect = auto_reconnect self.heartbeat_interval = heartbeat_interval self._http_client: Optional[httpx.AsyncClient] = None self._connected = False async def __aenter__(self): """异步上下文管理器入口""" await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """异步上下文管理器出口""" await self.disconnect() async def connect(self): """连接到 Gateway""" if self._connected: return self._http_client = httpx.AsyncClient(timeout=30.0) # 注册 Agent try: response = await self._http_client.post( f"{self.gateway_url}/gateway/register", json={ "agent_id": self.agent_card.agent_id, "agent_name": self.agent_card.agent_name, "agent_type": self.agent_card.agent_type, "capabilities": self.agent_card.capabilities, "description": self.agent_card.description, "version": self.agent_card.version, "metadata": self.agent_card.metadata, } ) response.raise_for_status() self._connected = True logger.info(f"Connected to Gateway: {self.agent_card.agent_id}") except Exception as e: logger.error(f"Failed to connect to Gateway: {e}") raise async def disconnect(self): """断开连接""" if not self._connected: return try: if self._http_client: await self._http_client.post( f"{self.gateway_url}/gateway/unregister", json={"agent_id": self.agent_card.agent_id} ) await self._http_client.aclose() self._http_client = None self._connected = False logger.info(f"Disconnected from Gateway: {self.agent_card.agent_id}") except Exception as e: logger.error(f"Error during disconnect: {e}") async def send_message( self, to_agent_id: str, content: List[Dict[str, Any]], conversation_id: str, metadata: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ 发送消息到其他 Agent Args: to_agent_id: 目标 Agent ID content: 消息内容(MAMP 格式) conversation_id: 对话 ID metadata: 元数据 Returns: 发送结果 """ if not self._connected: raise RuntimeError("Not connected to Gateway") response = await self._http_client.post( f"{self.gateway_url}/gateway/send", json={ "from_agent_id": self.agent_card.agent_id, "to_agent_id": to_agent_id, "conversation_id": conversation_id, "content": content, "metadata": metadata or {} } ) response.raise_for_status() return response.json() async def list_agents( self, online_only: bool = True, agent_type: Optional[str] = None ) -> List[Dict[str, Any]]: """ 查询 Agent 列表 Args: online_only: 只返回在线 Agent agent_type: 过滤 Agent 类型 Returns: Agent 列表 """ if not self._connected: raise RuntimeError("Not connected to Gateway") params = {"online_only": online_only} if agent_type: params["agent_type"] = agent_type response = await self._http_client.get( f"{self.gateway_url}/gateway/agents", params=params ) response.raise_for_status() data = response.json() return data.get("agents", []) async def get_agent_status(self, agent_id: str) -> Dict[str, Any]: """ 查询指定 Agent 的状态 Args: agent_id: Agent ID Returns: Agent 状态信息 """ if not self._connected: raise RuntimeError("Not connected to Gateway") from urllib.parse import quote encoded_id = quote(agent_id, safe='') response = await self._http_client.get( f"{self.gateway_url}/gateway/status/{encoded_id}" ) response.raise_for_status() return response.json() async def heartbeat(self): """发送心跳""" if not self._connected: return try: response = await self._http_client.post( f"{self.gateway_url}/gateway/heartbeat", json={"agent_id": self.agent_card.agent_id} ) response.raise_for_status() except Exception as e: logger.error(f"Heartbeat failed: {e}") if self.auto_reconnect: await self.connect()