""" Trace 元数据以 Agent API(HTTP)为准;Gateway 在 Agent 返回 trace_id 后通过 ``bind_agent_trace`` 登记引用。 ``get_trace`` / ``list_traces``:优先请求 Agent API,失败时返回本地登记信息。 """ from __future__ import annotations import logging from typing import Any import httpx from utils.env_parse import env_float, env_str from gateway.core.lifecycle.errors import LifecycleError from gateway.core.lifecycle.workspace import WorkspaceManager logger = logging.getLogger(__name__) class TraceManager: def __init__( self, *, workspace_manager: WorkspaceManager, agent_api_base_url: str, http_timeout: float, ) -> None: self._wm = workspace_manager self._base = agent_api_base_url.rstrip("/") self._timeout = http_timeout self._local_meta: dict[str, dict[str, Any]] = {} @classmethod def from_env(cls, workspace_manager: WorkspaceManager) -> TraceManager: return cls( workspace_manager=workspace_manager, agent_api_base_url=env_str("GATEWAY_AGENT_API_BASE_URL", "http://127.0.0.1:8000"), http_timeout=env_float("GATEWAY_AGENT_API_TIMEOUT", 60.0), ) async def prepare_workspace_session(self, workspace_id: str) -> None: await self._wm.ensure_session(workspace_id) async def bind_agent_trace( self, workspace_id: str, agent_trace_id: str, agent_type: str, metadata: dict[str, Any] | None = None, ) -> None: await self._wm.add_trace_ref(workspace_id, agent_trace_id) self._local_meta[agent_trace_id] = { "trace_id": agent_trace_id, "workspace_id": workspace_id, "agent_type": agent_type, "metadata": dict(metadata or {}), "agent_api_status": "bound", } async def release_agent_trace(self, workspace_id: str, agent_trace_id: str) -> None: """解除 Trace 与 Workspace 的绑定(Gateway 本地登记 + meta 引用)。""" await self._wm.remove_trace_ref(workspace_id, agent_trace_id) self._local_meta.pop(agent_trace_id, None) async def get_trace(self, trace_id: str) -> dict[str, Any]: async with httpx.AsyncClient(timeout=self._timeout) as client: try: r = await client.get(f"{self._base}/api/traces/{trace_id}") if r.status_code == 200: body = r.json() trace = body.get("trace") if isinstance(trace, dict): return {"source": "agent_api", **trace} except httpx.RequestError as e: logger.warning("TraceManager.get_trace HTTP 失败 trace_id=%s err=%s", trace_id, e) local = self._local_meta.get(trace_id) if local: return {"source": "gateway_local", **local} wid = self._wm.get_workspace_id_for_trace(trace_id) if wid: return { "source": "gateway_local", "trace_id": trace_id, "workspace_id": wid, "agent_api_status": "unknown", } raise LifecycleError(f"Trace 不存在: {trace_id}") async def list_traces( self, workspace_id: str | None = None, agent_type: str | None = None, *, limit: int = 50, ) -> list[dict[str, Any]]: params: dict[str, str | int] = {"limit": min(limit, 100)} if agent_type: params["agent_type"] = agent_type # Agent API 使用 uid 过滤;飞书侧 uid 常为裸 user_id,workspace_id 形如 feishu: if workspace_id and ":" in workspace_id: prefix, _, rest = workspace_id.partition(":") if prefix == "feishu" and rest: params["uid"] = rest async with httpx.AsyncClient(timeout=self._timeout) as client: try: r = await client.get(f"{self._base}/api/traces", params=params) if r.status_code == 200: data = r.json() traces = data.get("traces") if isinstance(traces, list): return traces except httpx.RequestError as e: logger.warning("TraceManager.list_traces HTTP 失败 err=%s", e) if workspace_id: return [m for m in self._local_meta.values() if m.get("workspace_id") == workspace_id] return list(self._local_meta.values()) def get_workspace_id(self, trace_id: str) -> str: wid = self._wm.get_workspace_id_for_trace(trace_id) if wid: return wid m = self._local_meta.get(trace_id) if m and m.get("workspace_id"): return str(m["workspace_id"]) raise LifecycleError(f"无法解析 Trace 的 workspace_id: {trace_id}")