| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- """
- Gateway Client SDK
- 通用的 Gateway 客户端实现,提供:
- - Agent 注册和连接管理
- - 消息发送和接收
- - 在线状态查询
- - Agent 列表查询
- 不依赖任何特定的 Agent 框架。
- """
- import asyncio
- import json
- import logging
- from dataclasses import dataclass, field
- from datetime import datetime
- from typing import Any, Callable, Dict, List, Optional
- from urllib.parse import urlencode
- import httpx
- logger = logging.getLogger(__name__)
@dataclass
class Message:
    """A single MAMP protocol message.

    Attributes:
        conversation_id: Identifier of the conversation the message belongs to.
        content: Multimodal content parts, one dict per part.
        metadata: Free-form metadata; every instance gets its own empty dict
            unless one is supplied.
        message_id: Optional identifier of the message.
        timestamp: Optional timestamp, carried as a string.
    """

    # Required fields (no defaults) must come first for the dataclass.
    conversation_id: str
    content: List[Dict[str, Any]]

    # Optional fields.
    metadata: Dict[str, Any] = field(default_factory=dict)
    message_id: Optional[str] = None
    timestamp: Optional[str] = None
@dataclass
class AgentCard:
    """Identity card describing an agent.

    Attributes:
        agent_id: Identifier of the agent.
        agent_name: Display name of the agent.
        agent_type: Optional type label for the agent.
        capabilities: Capability names; every instance gets its own empty list
            unless one is supplied.
        description: Optional free-text description.
        version: Optional version string.
        metadata: Free-form metadata; every instance gets its own empty dict
            unless one is supplied.
    """

    # Required identity fields.
    agent_id: str
    agent_name: str

    # Optional descriptive fields.
    agent_type: Optional[str] = None
    capabilities: List[str] = field(default_factory=list)
    description: Optional[str] = None
    version: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
class GatewayClient:
    """Async client for the Gateway HTTP API.

    The client registers an agent with the Gateway, sends messages to other
    agents, lists agents, queries agent status and sends heartbeats. It does
    not depend on any particular agent framework.

    Example:
        ```python
        async with GatewayClient(
            gateway_url="http://localhost:8001",
            agent_card=AgentCard(
                agent_id="my-agent-001",
                agent_name="My Agent",
                capabilities=["search", "analyze"],
            ),
        ) as client:
            # Send a message
            await client.send_message(
                to_agent_id="target-agent",
                content=[{"type": "text", "text": "Hello"}],
                conversation_id="conv-123",
            )

            # List online agents
            agents = await client.list_agents()
        ```
    """

    def __init__(
        self,
        gateway_url: str,
        agent_card: "AgentCard",
        on_message: Optional[Callable[["Message"], None]] = None,
        auto_reconnect: bool = True,
        heartbeat_interval: int = 30,
    ):
        """Store the client configuration; no network I/O happens here.

        Args:
            gateway_url: Base URL of the Gateway HTTP API (for example
                ``http://localhost:8001``). Trailing slashes are stripped.
            agent_card: Identity information of the agent to register.
            on_message: Callback for received messages. It is only stored on
                the instance; nothing in this class invokes it.
            auto_reconnect: Whether a failed heartbeat should trigger a
                re-registration attempt.
            heartbeat_interval: Heartbeat interval in seconds. It is only
                stored on the instance; this class does not schedule
                heartbeats itself, so callers invoke ``heartbeat()``.
        """
        self.gateway_url = gateway_url.rstrip("/")
        self.agent_card = agent_card
        self.on_message = on_message
        self.auto_reconnect = auto_reconnect
        self.heartbeat_interval = heartbeat_interval
        self._http_client: Optional[httpx.AsyncClient] = None
        self._connected = False

    async def __aenter__(self):
        """Connect to the Gateway and return the client."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Disconnect from the Gateway when leaving the ``async with`` block."""
        await self.disconnect()

    def _ensure_connected(self) -> None:
        """Raise ``RuntimeError`` unless the client is currently connected."""
        if not self._connected:
            raise RuntimeError("Not connected to Gateway")

    async def _register(self) -> None:
        """POST the agent card to the register endpoint; raise on failure."""
        card = self.agent_card
        response = await self._http_client.post(
            f"{self.gateway_url}/gateway/register",
            json={
                "agent_id": card.agent_id,
                "agent_name": card.agent_name,
                "agent_type": card.agent_type,
                "capabilities": card.capabilities,
                "description": card.description,
                "version": card.version,
                "metadata": card.metadata,
            },
        )
        response.raise_for_status()

    async def _close_http_client(self) -> None:
        """Close and forget the HTTP client; close errors are only logged."""
        client, self._http_client = self._http_client, None
        if client is None:
            return
        try:
            await client.aclose()
        except Exception as e:
            logger.error("Failed to close HTTP client: %s", e)

    async def connect(self):
        """Register the agent with the Gateway.

        Does nothing when already connected. If registration fails, the HTTP
        client is closed again so that a failed attempt leaks nothing.

        Raises:
            Exception: Whatever the HTTP request or the status check raised;
                it is logged and re-raised unchanged.
        """
        if self._connected:
            return

        if self._http_client is None:
            self._http_client = httpx.AsyncClient(timeout=30.0)

        try:
            await self._register()
        except Exception as e:
            logger.error("Failed to connect to Gateway: %s", e)
            # Do not keep a half-initialised client around after a failure.
            await self._close_http_client()
            raise

        self._connected = True
        logger.info("Connected to Gateway: %s", self.agent_card.agent_id)

    async def disconnect(self):
        """Unregister the agent and release the HTTP client.

        Does nothing when not connected. Unregistering is best effort: a
        failure is logged, and the HTTP client is still closed and the client
        is still marked as disconnected. This method does not raise.
        """
        if not self._connected:
            return

        try:
            if self._http_client is not None:
                await self._http_client.post(
                    f"{self.gateway_url}/gateway/unregister",
                    json={"agent_id": self.agent_card.agent_id},
                )
        except Exception as e:
            logger.error("Error during disconnect: %s", e)
        finally:
            # Always release local resources, even if unregistering failed.
            await self._close_http_client()
            self._connected = False

        logger.info("Disconnected from Gateway: %s", self.agent_card.agent_id)

    async def send_message(
        self,
        to_agent_id: str,
        content: List[Dict[str, Any]],
        conversation_id: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Send a message to another agent.

        Args:
            to_agent_id: ID of the target agent.
            content: Message content (MAMP format).
            conversation_id: ID of the conversation.
            metadata: Optional metadata; an empty dict is sent when omitted.

        Returns:
            The decoded JSON body of the Gateway response.

        Raises:
            RuntimeError: If the client is not connected.
        """
        self._ensure_connected()
        response = await self._http_client.post(
            f"{self.gateway_url}/gateway/send",
            json={
                "from_agent_id": self.agent_card.agent_id,
                "to_agent_id": to_agent_id,
                "conversation_id": conversation_id,
                "content": content,
                "metadata": metadata or {},
            },
        )
        response.raise_for_status()
        return response.json()

    async def list_agents(
        self,
        online_only: bool = True,
        agent_type: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """Query the list of agents known to the Gateway.

        Args:
            online_only: Sent as the ``online_only`` query parameter.
            agent_type: Optional type filter; only sent when truthy.

        Returns:
            The ``"agents"`` entry of the response JSON, or an empty list
            when that key is missing.

        Raises:
            RuntimeError: If the client is not connected.
        """
        self._ensure_connected()
        params: Dict[str, Any] = {"online_only": online_only}
        if agent_type:
            params["agent_type"] = agent_type
        response = await self._http_client.get(
            f"{self.gateway_url}/gateway/agents",
            params=params,
        )
        response.raise_for_status()
        return response.json().get("agents", [])

    async def get_agent_status(self, agent_id: str) -> Dict[str, Any]:
        """Query the status of one agent.

        Args:
            agent_id: ID of the agent; it is percent-encoded (including
                ``/``) before being placed in the URL path.

        Returns:
            The decoded JSON body of the Gateway response.

        Raises:
            RuntimeError: If the client is not connected.
        """
        self._ensure_connected()
        from urllib.parse import quote

        # safe="" also encodes "/" so the ID cannot add extra path segments.
        encoded_id = quote(agent_id, safe="")
        response = await self._http_client.get(
            f"{self.gateway_url}/gateway/status/{encoded_id}"
        )
        response.raise_for_status()
        return response.json()

    async def heartbeat(self):
        """Send one heartbeat for this agent.

        Does nothing when not connected. A failed heartbeat is logged; when
        ``auto_reconnect`` is enabled the agent is then registered again.
        This method does not raise: a failed re-registration is logged too,
        and the next heartbeat will retry.
        """
        if not self._connected:
            return

        try:
            response = await self._http_client.post(
                f"{self.gateway_url}/gateway/heartbeat",
                json={"agent_id": self.agent_card.agent_id},
            )
            response.raise_for_status()
        except Exception as e:
            logger.error("Heartbeat failed: %s", e)
            if not self.auto_reconnect:
                return
            # connect() would return immediately here because _connected is
            # still True, so re-register directly instead.
            try:
                await self._register()
            except Exception as reconnect_error:
                logger.error(
                    "Re-registration with Gateway failed: %s", reconnect_error
                )
            else:
                logger.info(
                    "Re-registered with Gateway: %s", self.agent_card.agent_id
                )
|