| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- """
- Trace 元数据以 Agent API(HTTP)为准;Gateway 在 Agent 返回 trace_id 后通过 ``bind_agent_trace`` 登记引用。
- ``get_trace`` / ``list_traces``:优先请求 Agent API,失败时返回本地登记信息。
- """
- from __future__ import annotations
- import logging
- from typing import Any
- import httpx
- from utils.env_parse import env_float, env_str
- from gateway.core.lifecycle.errors import LifecycleError
- from gateway.core.lifecycle.workspace import WorkspaceManager
- logger = logging.getLogger(__name__)
class TraceManager:
    """Gateway-side registry and lookup facade for Agent traces.

    The Agent API (HTTP) is the primary source of trace data.  The gateway
    only records which trace ids were bound to which workspace, and serves
    that local record when the Agent API cannot answer.
    """

    def __init__(
        self,
        *,
        workspace_manager: WorkspaceManager,
        agent_api_base_url: str,
        http_timeout: float,
    ) -> None:
        """Create a manager.

        Args:
            workspace_manager: Collaborator used to store and resolve
                workspace <-> trace references.
            agent_api_base_url: Base URL of the Agent API; trailing slashes
                are stripped so paths can be appended with a single ``/``.
            http_timeout: Timeout passed to every ``httpx.AsyncClient``.
        """
        self._wm = workspace_manager
        self._base = agent_api_base_url.rstrip("/")
        self._timeout = http_timeout
        # trace_id -> locally registered description of the bound trace.
        self._local_meta: dict[str, dict[str, Any]] = {}

    @classmethod
    def from_env(cls, workspace_manager: WorkspaceManager) -> TraceManager:
        """Build a manager from ``GATEWAY_AGENT_API_BASE_URL`` and
        ``GATEWAY_AGENT_API_TIMEOUT`` (defaults: local port 8000, 60 s)."""
        return cls(
            workspace_manager=workspace_manager,
            agent_api_base_url=env_str("GATEWAY_AGENT_API_BASE_URL", "http://127.0.0.1:8000"),
            http_timeout=env_float("GATEWAY_AGENT_API_TIMEOUT", 60.0),
        )

    async def prepare_workspace_session(self, workspace_id: str) -> None:
        """Delegate to the workspace manager's ``ensure_session``."""
        await self._wm.ensure_session(workspace_id)

    async def bind_agent_trace(
        self,
        workspace_id: str,
        agent_trace_id: str,
        agent_type: str,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Register an Agent-issued trace id against a workspace.

        The reference is first added through the workspace manager, then a
        local record is stored (overwriting any previous record for the same
        trace id).  ``metadata`` is shallow-copied so later changes to the
        caller's dict do not affect the stored record.
        """
        await self._wm.add_trace_ref(workspace_id, agent_trace_id)
        self._local_meta[agent_trace_id] = {
            "trace_id": agent_trace_id,
            "workspace_id": workspace_id,
            "agent_type": agent_type,
            "metadata": dict(metadata or {}),
            "agent_api_status": "bound",
        }

    async def release_agent_trace(self, workspace_id: str, agent_trace_id: str) -> None:
        """Unbind a trace from a workspace (workspace reference + local record).

        Releasing a trace id that has no local record is not an error.
        """
        await self._wm.remove_trace_ref(workspace_id, agent_trace_id)
        self._local_meta.pop(agent_trace_id, None)

    @staticmethod
    def _json_object(response: Any) -> dict[str, Any] | None:
        """Return the response body as a dict, or ``None`` if unusable.

        ``None`` is returned when the body is not valid JSON (``json()``
        raises ``ValueError``) or when it decodes to something other than a
        JSON object, so callers can fall back instead of crashing.
        """
        try:
            payload = response.json()
        except ValueError:
            return None
        return payload if isinstance(payload, dict) else None

    async def get_trace(self, trace_id: str) -> dict[str, Any]:
        """Look up one trace, preferring the Agent API.

        Resolution order:
          1. Agent API ``GET /api/traces/{trace_id}``: used when it answers
             200 with a JSON object whose ``"trace"`` value is a dict.
          2. The local record created by ``bind_agent_trace``.
          3. A minimal record built from the workspace manager's
             trace -> workspace mapping.

        Returns:
            The trace fields plus a ``"source"`` key (``"agent_api"`` or
            ``"gateway_local"``).

        Raises:
            LifecycleError: if none of the three sources knows the trace.
        """
        async with httpx.AsyncClient(timeout=self._timeout) as client:
            try:
                resp = await client.get(f"{self._base}/api/traces/{trace_id}")
            except httpx.RequestError as e:
                logger.warning("TraceManager.get_trace HTTP 失败 trace_id=%s err=%s", trace_id, e)
            else:
                # Non-200 answers (e.g. unknown trace) fall through to the
                # local lookup without logging, as before.
                if resp.status_code == 200:
                    trace = (self._json_object(resp) or {}).get("trace")
                    if isinstance(trace, dict):
                        return {"source": "agent_api", **trace}
                    logger.warning("TraceManager.get_trace 响应格式异常 trace_id=%s", trace_id)

        local = self._local_meta.get(trace_id)
        if local:
            return {"source": "gateway_local", **local}

        wid = self._wm.get_workspace_id_for_trace(trace_id)
        if wid:
            return {
                "source": "gateway_local",
                "trace_id": trace_id,
                "workspace_id": wid,
                "agent_api_status": "unknown",
            }
        raise LifecycleError(f"Trace 不存在: {trace_id}")

    async def list_traces(
        self,
        workspace_id: str | None = None,
        agent_type: str | None = None,
        *,
        limit: int = 50,
    ) -> list[dict[str, Any]]:
        """List traces, preferring the Agent API.

        Args:
            workspace_id: Optional workspace filter.
            agent_type: Optional agent type filter.
            limit: Maximum number of traces requested from the Agent API;
                values above 100 are reduced to 100.

        Returns:
            The Agent API's ``"traces"`` list when it answers 200 with a
            usable JSON object; otherwise copies of the locally registered
            records matching ``workspace_id`` and ``agent_type``.
        """
        params: dict[str, str | int] = {"limit": min(limit, 100)}
        if agent_type:
            params["agent_type"] = agent_type

        # The Agent API filters by uid.  On the Feishu side the uid is usually
        # the bare user_id, while workspace_id has the form ``feishu:<uid>``.
        # NOTE(review): for any other workspace_id no uid filter is sent, so
        # the Agent API result is presumably not scoped to that workspace;
        # confirm this is intended.
        prefix, _, uid = (workspace_id or "").partition(":")
        if prefix == "feishu" and uid:
            params["uid"] = uid

        async with httpx.AsyncClient(timeout=self._timeout) as client:
            try:
                resp = await client.get(f"{self._base}/api/traces", params=params)
            except httpx.RequestError as e:
                logger.warning("TraceManager.list_traces HTTP 失败 err=%s", e)
            else:
                if resp.status_code == 200:
                    traces = (self._json_object(resp) or {}).get("traces")
                    if isinstance(traces, list):
                        return traces
                    logger.warning("TraceManager.list_traces 响应格式异常")

        return self._local_traces(workspace_id, agent_type)

    def _local_traces(
        self,
        workspace_id: str | None,
        agent_type: str | None,
    ) -> list[dict[str, Any]]:
        """Return copies of local records matching the given filters.

        A falsy filter matches everything.  Shallow copies are returned so
        callers cannot modify the manager's registry through the result.
        """
        # NOTE(review): ``limit`` is not applied to this fallback; which
        # records to keep depends on the Agent API's ordering. Confirm.
        return [
            dict(meta)
            for meta in self._local_meta.values()
            if (not workspace_id or meta.get("workspace_id") == workspace_id)
            and (not agent_type or meta.get("agent_type") == agent_type)
        ]

    def get_workspace_id(self, trace_id: str) -> str:
        """Resolve the workspace a trace belongs to.

        The workspace manager's mapping is consulted first, then the local
        record.

        Raises:
            LifecycleError: if neither source yields a workspace id.
        """
        wid = self._wm.get_workspace_id_for_trace(trace_id)
        if wid:
            return wid
        record = self._local_meta.get(trace_id)
        if record and record.get("workspace_id"):
            return str(record["workspace_id"])
        raise LifecycleError(f"无法解析 Trace 的 workspace_id: {trace_id}")
|