router.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
"""
Gateway Router
Message routing and online-status queries.
"""
  5. import json
  6. import logging
  7. from typing import Dict, Any, Optional
  8. from fastapi import APIRouter, WebSocket, WebSocketDisconnect, HTTPException
  9. from pydantic import BaseModel
  10. from .registry import AgentRegistry
  11. logger = logging.getLogger(__name__)
class SendMessageRequest(BaseModel):
    """Request body for sending a message to an agent through the gateway."""
    to: str  # target Agent URI
    content: Any  # message content (a string or a multimodal array)
    # Optional conversation to attach the message to; send_message generates
    # a fresh "conv-<uuid>" id when this is omitted.
    conversation_id: Optional[str] = None
    # Optional free-form metadata; forwarded as {} when omitted.
    metadata: Optional[Dict] = None
class SendMessageResponse(BaseModel):
    """Response returned after a message has been handed to the gateway."""
    message_id: str  # gateway-generated id of the form "msg-<uuid>"
    conversation_id: str  # caller-supplied id, or a generated "conv-<uuid>"
    status: str  # "sent" | "queued"
class AgentStatusResponse(BaseModel):
    """Response describing an agent's online status."""
    agent_uri: str
    status: str  # "online" | "offline"
    # ISO-format string of the agent's last heartbeat; None when the agent
    # is not known to the registry.
    last_seen: Optional[str] = None
  28. class GatewayRouter:
  29. """Gateway 路由器"""
  30. def __init__(self, registry: AgentRegistry):
  31. self.registry = registry
  32. self.router = APIRouter(prefix="/gateway", tags=["gateway"])
  33. # 注册路由
  34. self.router.add_api_websocket_route("/connect", self.handle_websocket)
  35. self.router.add_api_route("/send", self.send_message, methods=["POST"])
  36. self.router.add_api_route("/status/{agent_uri:path}", self.get_agent_status, methods=["GET"])
  37. self.router.add_api_route("/agents", self.list_agents, methods=["GET"])
  38. async def handle_websocket(self, websocket: WebSocket):
  39. """
  40. 处理 WebSocket 连接
  41. Agent 通过此端点注册并保持长连接
  42. """
  43. await websocket.accept()
  44. agent_uri = None
  45. try:
  46. # 等待注册消息
  47. data = await websocket.receive_text()
  48. msg = json.loads(data)
  49. if msg.get("type") != "register":
  50. await websocket.send_text(json.dumps({
  51. "type": "error",
  52. "message": "First message must be register"
  53. }))
  54. await websocket.close()
  55. return
  56. agent_uri = msg.get("agent_uri")
  57. if not agent_uri:
  58. await websocket.send_text(json.dumps({
  59. "type": "error",
  60. "message": "agent_uri is required"
  61. }))
  62. await websocket.close()
  63. return
  64. # 注册 Agent
  65. await self.registry.register(
  66. agent_uri=agent_uri,
  67. connection_type="websocket",
  68. websocket=websocket,
  69. capabilities=msg.get("capabilities", []),
  70. metadata=msg.get("metadata", {})
  71. )
  72. # 发送注册成功消息
  73. await websocket.send_text(json.dumps({
  74. "type": "registered",
  75. "agent_uri": agent_uri
  76. }))
  77. logger.info(f"Agent connected: {agent_uri}")
  78. # 保持连接,处理消息
  79. while True:
  80. data = await websocket.receive_text()
  81. msg = json.loads(data)
  82. msg_type = msg.get("type")
  83. if msg_type == "heartbeat":
  84. # 更新心跳
  85. await self.registry.heartbeat(agent_uri)
  86. await websocket.send_text(json.dumps({
  87. "type": "heartbeat_ack"
  88. }))
  89. elif msg_type == "result":
  90. # Agent 返回任务结果
  91. # TODO: 将结果转发给调用方
  92. logger.info(f"Received result from {agent_uri}: {msg.get('task_id')}")
  93. else:
  94. logger.warning(f"Unknown message type: {msg_type}")
  95. except WebSocketDisconnect:
  96. logger.info(f"Agent disconnected: {agent_uri}")
  97. except Exception as e:
  98. logger.error(f"WebSocket error: {e}")
  99. finally:
  100. if agent_uri:
  101. await self.registry.unregister(agent_uri)
  102. async def send_message(self, request: SendMessageRequest) -> SendMessageResponse:
  103. """
  104. 发送消息到目标 Agent
  105. 通过 Gateway 路由消息
  106. """
  107. import uuid
  108. # 查找目标 Agent
  109. connection = self.registry.lookup(request.to)
  110. if not connection:
  111. raise HTTPException(status_code=404, detail=f"Agent not found: {request.to}")
  112. if not self.registry.is_online(request.to):
  113. raise HTTPException(status_code=503, detail=f"Agent offline: {request.to}")
  114. # 生成消息 ID
  115. message_id = f"msg-{uuid.uuid4()}"
  116. conversation_id = request.conversation_id or f"conv-{uuid.uuid4()}"
  117. # 构建消息
  118. message = {
  119. "type": "message",
  120. "message_id": message_id,
  121. "conversation_id": conversation_id,
  122. "from": "gateway", # TODO: 从请求中获取发送方
  123. "content": request.content,
  124. "metadata": request.metadata or {}
  125. }
  126. # 根据连接类型发送
  127. if connection.connection_type == "websocket":
  128. # 通过 WebSocket 发送
  129. await connection.websocket.send_text(json.dumps(message))
  130. status = "sent"
  131. elif connection.connection_type == "http":
  132. # 通过 HTTP 发送
  133. # TODO: 实现 HTTP 转发
  134. status = "queued"
  135. else:
  136. raise HTTPException(status_code=500, detail="Unknown connection type")
  137. return SendMessageResponse(
  138. message_id=message_id,
  139. conversation_id=conversation_id,
  140. status=status
  141. )
  142. async def get_agent_status(self, agent_uri: str) -> AgentStatusResponse:
  143. """查询 Agent 在线状态"""
  144. connection = self.registry.lookup(agent_uri)
  145. if not connection:
  146. return AgentStatusResponse(
  147. agent_uri=agent_uri,
  148. status="offline",
  149. last_seen=None
  150. )
  151. is_online = self.registry.is_online(agent_uri)
  152. return AgentStatusResponse(
  153. agent_uri=agent_uri,
  154. status="online" if is_online else "offline",
  155. last_seen=connection.last_heartbeat.isoformat() if connection else None
  156. )
  157. async def list_agents(
  158. self,
  159. connection_type: Optional[str] = None,
  160. online_only: bool = True
  161. ) -> Dict[str, Any]:
  162. """列出所有 Agent"""
  163. agents = self.registry.list_agents(
  164. connection_type=connection_type,
  165. online_only=online_only
  166. )
  167. return {
  168. "agents": [
  169. {
  170. "agent_uri": a.agent_uri,
  171. "connection_type": a.connection_type,
  172. "capabilities": a.capabilities,
  173. "registered_at": a.registered_at.isoformat(),
  174. "last_heartbeat": a.last_heartbeat.isoformat()
  175. }
  176. for a in agents
  177. ],
  178. "total": len(agents)
  179. }