manager.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
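"""
Trace metadata is owned by the Agent API (HTTP); once the Agent returns a trace_id, the Gateway registers a reference to it via ``bind_agent_trace``.
``get_trace`` / ``list_traces`` query the Agent API first and fall back to the locally registered information on failure.
"""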
  5. from __future__ import annotations
  6. import logging
  7. from typing import Any
  8. import httpx
  9. from utils.env_parse import env_float, env_str
  10. from gateway.core.lifecycle.errors import LifecycleError
  11. from gateway.core.lifecycle.workspace import WorkspaceManager
  12. logger = logging.getLogger(__name__)
  13. class TraceManager:
  14. def __init__(
  15. self,
  16. *,
  17. workspace_manager: WorkspaceManager,
  18. agent_api_base_url: str,
  19. http_timeout: float,
  20. ) -> None:
  21. self._wm = workspace_manager
  22. self._base = agent_api_base_url.rstrip("/")
  23. self._timeout = http_timeout
  24. self._local_meta: dict[str, dict[str, Any]] = {}
  25. @classmethod
  26. def from_env(cls, workspace_manager: WorkspaceManager) -> TraceManager:
  27. return cls(
  28. workspace_manager=workspace_manager,
  29. agent_api_base_url=env_str("GATEWAY_AGENT_API_BASE_URL", "http://127.0.0.1:8000"),
  30. http_timeout=env_float("GATEWAY_AGENT_API_TIMEOUT", 60.0),
  31. )
  32. async def prepare_workspace_session(self, workspace_id: str) -> None:
  33. await self._wm.ensure_session(workspace_id)
  34. async def bind_agent_trace(
  35. self,
  36. workspace_id: str,
  37. agent_trace_id: str,
  38. agent_type: str,
  39. metadata: dict[str, Any] | None = None,
  40. ) -> None:
  41. await self._wm.add_trace_ref(workspace_id, agent_trace_id)
  42. self._local_meta[agent_trace_id] = {
  43. "trace_id": agent_trace_id,
  44. "workspace_id": workspace_id,
  45. "agent_type": agent_type,
  46. "metadata": dict(metadata or {}),
  47. "agent_api_status": "bound",
  48. }
  49. async def release_agent_trace(self, workspace_id: str, agent_trace_id: str) -> None:
  50. """解除 Trace 与 Workspace 的绑定(Gateway 本地登记 + meta 引用)。"""
  51. await self._wm.remove_trace_ref(workspace_id, agent_trace_id)
  52. self._local_meta.pop(agent_trace_id, None)
  53. async def get_trace(self, trace_id: str) -> dict[str, Any]:
  54. async with httpx.AsyncClient(timeout=self._timeout) as client:
  55. try:
  56. r = await client.get(f"{self._base}/api/traces/{trace_id}")
  57. if r.status_code == 200:
  58. body = r.json()
  59. trace = body.get("trace")
  60. if isinstance(trace, dict):
  61. return {"source": "agent_api", **trace}
  62. except httpx.RequestError as e:
  63. logger.warning("TraceManager.get_trace HTTP 失败 trace_id=%s err=%s", trace_id, e)
  64. local = self._local_meta.get(trace_id)
  65. if local:
  66. return {"source": "gateway_local", **local}
  67. wid = self._wm.get_workspace_id_for_trace(trace_id)
  68. if wid:
  69. return {
  70. "source": "gateway_local",
  71. "trace_id": trace_id,
  72. "workspace_id": wid,
  73. "agent_api_status": "unknown",
  74. }
  75. raise LifecycleError(f"Trace 不存在: {trace_id}")
  76. async def list_traces(
  77. self,
  78. workspace_id: str | None = None,
  79. agent_type: str | None = None,
  80. *,
  81. limit: int = 50,
  82. ) -> list[dict[str, Any]]:
  83. params: dict[str, str | int] = {"limit": min(limit, 100)}
  84. if agent_type:
  85. params["agent_type"] = agent_type
  86. # Agent API 使用 uid 过滤;飞书侧 uid 常为裸 user_id,workspace_id 形如 feishu:<uid>
  87. if workspace_id and ":" in workspace_id:
  88. prefix, _, rest = workspace_id.partition(":")
  89. if prefix == "feishu" and rest:
  90. params["uid"] = rest
  91. async with httpx.AsyncClient(timeout=self._timeout) as client:
  92. try:
  93. r = await client.get(f"{self._base}/api/traces", params=params)
  94. if r.status_code == 200:
  95. data = r.json()
  96. traces = data.get("traces")
  97. if isinstance(traces, list):
  98. return traces
  99. except httpx.RequestError as e:
  100. logger.warning("TraceManager.list_traces HTTP 失败 err=%s", e)
  101. if workspace_id:
  102. return [m for m in self._local_meta.values() if m.get("workspace_id") == workspace_id]
  103. return list(self._local_meta.values())
  104. def get_workspace_id(self, trace_id: str) -> str:
  105. wid = self._wm.get_workspace_id_for_trace(trace_id)
  106. if wid:
  107. return wid
  108. m = self._local_meta.get(trace_id)
  109. if m and m.get("workspace_id"):
  110. return str(m["workspace_id"])
  111. raise LifecycleError(f"无法解析 Trace 的 workspace_id: {trace_id}")