""" 实现 ``gateway.core.channels.protocols.TraceBackend``: prepare_workspace → bind_agent_trace(与 Agent API 返回的 trace_id 对齐)。 """ from __future__ import annotations import asyncio import logging from typing import Any from gateway.core.lifecycle.trace.manager import TraceManager logger = logging.getLogger(__name__) class LifecycleTraceBackend: def __init__( self, trace_manager: TraceManager, *, release_previous_trace_ref_on_bind: bool = False, ) -> None: self._tm = trace_manager self._release_previous_trace_ref_on_bind = release_previous_trace_ref_on_bind self._lock = asyncio.Lock() self._channel_user_trace: dict[tuple[str, str], str] = {} async def prepare_session( self, *, channel: str, user_id: str, workspace_id: str, agent_type: str, metadata: dict[str, object], ) -> None: _ = user_id, agent_type, metadata await self._tm.prepare_workspace_session(workspace_id) async def get_existing_trace_id(self, channel: str, user_id: str) -> str | None: async with self._lock: return self._channel_user_trace.get((channel, user_id)) async def bind_agent_trace_id( self, *, channel: str, user_id: str, workspace_id: str, agent_trace_id: str, agent_type: str, metadata: dict[str, object], ) -> None: key = (channel, user_id) async with self._lock: prev_tid = self._channel_user_trace.get(key) if prev_tid == agent_trace_id: return if prev_tid and self._release_previous_trace_ref_on_bind: await self._tm.release_agent_trace(workspace_id, prev_tid) logger.info( "Lifecycle: 已解除旧 trace_id=%s workspace_id=%s(将绑定新 trace)", prev_tid, workspace_id, ) meta_any: dict[str, Any] = {k: v for k, v in metadata.items()} meta_any["channel"] = channel meta_any["user_id"] = user_id await self._tm.bind_agent_trace( workspace_id, agent_trace_id, agent_type, metadata=meta_any, ) async with self._lock: self._channel_user_trace[key] = agent_trace_id logger.info( "Lifecycle: 已绑定 Agent trace_id=%s workspace_id=%s channel=%s user=%s", agent_trace_id, workspace_id, channel, user_id, ) async def forget_trace_binding(self, channel: str, user_id: str, *, workspace_id: str) -> None: """清除渠道侧 (channel,user)→trace 映射,并 release_agent_trace(用于 Trace 终态后放弃续跑同 trace)。""" async with self._lock: tid = self._channel_user_trace.pop((channel, user_id), None) if tid: await self._tm.release_agent_trace(workspace_id, tid) logger.info( "Lifecycle: 已 forget 绑定 trace_id=%s workspace_id=%s channel=%s user=%s", tid, workspace_id, channel, user_id, )