| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- """
- 实现 ``gateway.core.channels.protocols.TraceBackend``:
- prepare_workspace → bind_agent_trace(与 Agent API 返回的 trace_id 对齐)。
- """
- from __future__ import annotations
- import asyncio
- import logging
- from typing import Any
- from gateway.core.lifecycle.trace.manager import TraceManager
logger = logging.getLogger(__name__)


class LifecycleTraceBackend:
    """Trace backend that delegates to a ``TraceManager`` and tracks bindings.

    Per the module docstring this implements the ``TraceBackend`` protocol.
    The instance keeps an in-memory map from ``(channel, user_id)`` to the
    agent trace id most recently bound for that pair.

    ``self._lock`` guards the map only. It is released before any call into
    the trace manager is awaited, so manager calls never run under the lock.

    NOTE(review): because the lock is not held across the manager calls, two
    concurrent ``bind_agent_trace_id`` calls for the same key are not
    serialized against each other -- confirm whether callers can overlap.
    """

    def __init__(self, trace_manager: TraceManager) -> None:
        """Store the manager and start with an empty binding map."""
        self._tm = trace_manager
        self._lock = asyncio.Lock()
        # (channel, user_id) -> agent trace id currently bound for that pair.
        self._channel_user_trace: dict[tuple[str, str], str] = {}

    async def prepare_session(
        self,
        *,
        channel: str,
        user_id: str,
        workspace_id: str,
        agent_type: str,
        metadata: dict[str, object],
    ) -> None:
        """Ask the trace manager to prepare the session for ``workspace_id``.

        Only ``workspace_id`` is forwarded; the remaining keyword arguments
        are accepted but not used by this implementation.
        """
        del channel, user_id, agent_type, metadata  # intentionally unused
        await self._tm.prepare_workspace_session(workspace_id)

    async def get_existing_trace_id(self, channel: str, user_id: str) -> str | None:
        """Return the trace id recorded for ``(channel, user_id)``, or ``None``."""
        async with self._lock:
            return self._channel_user_trace.get((channel, user_id))

    async def bind_agent_trace_id(
        self,
        *,
        channel: str,
        user_id: str,
        workspace_id: str,
        agent_trace_id: str,
        agent_type: str,
        metadata: dict[str, object],
    ) -> None:
        """Bind ``agent_trace_id`` for ``(channel, user_id)``.

        Does nothing when the same trace id is already recorded for the pair.
        When a different, non-empty trace id is recorded, it is released
        through the trace manager first. The new trace is then bound with a
        copy of ``metadata`` extended by ``channel`` and ``user_id`` (the
        caller's dict is not mutated), and the map is updated.

        Any exception raised by the trace manager propagates. If binding
        fails after the previous trace was released, the pair is left
        without a recorded trace instead of pointing at the released one.
        """
        key = (channel, user_id)
        async with self._lock:
            prev_tid = self._channel_user_trace.get(key)
        if prev_tid == agent_trace_id:
            return

        if prev_tid:
            await self._tm.release_agent_trace(workspace_id, prev_tid)
            # The old trace is gone on the manager side: drop the stale entry
            # right away so a failing bind below cannot leave the map
            # pointing at a released trace. Only remove it if nobody replaced
            # it while the lock was not held.
            async with self._lock:
                if self._channel_user_trace.get(key) == prev_tid:
                    del self._channel_user_trace[key]
            logger.info(
                "Lifecycle: 已解除旧 trace_id=%s workspace_id=%s(将绑定新 trace)",
                prev_tid,
                workspace_id,
            )

        # Later keys win, so channel/user_id override same-named metadata keys.
        meta_any: dict[str, Any] = {
            **metadata,
            "channel": channel,
            "user_id": user_id,
        }
        await self._tm.bind_agent_trace(
            workspace_id,
            agent_trace_id,
            agent_type,
            metadata=meta_any,
        )
        async with self._lock:
            self._channel_user_trace[key] = agent_trace_id
        logger.info(
            "Lifecycle: 已绑定 Agent trace_id=%s workspace_id=%s channel=%s user=%s",
            agent_trace_id,
            workspace_id,
            channel,
            user_id,
        )

    async def forget_trace_binding(self, channel: str, user_id: str, *, workspace_id: str) -> None:
        """Drop the ``(channel, user_id)`` mapping and release its trace.

        Clears the channel-side mapping and, when a non-empty trace id was
        recorded, calls ``release_agent_trace`` for it. Per the original
        note, this is used to give up resuming the same trace once the trace
        has reached a terminal state. Does nothing when no trace is recorded.
        """
        async with self._lock:
            tid = self._channel_user_trace.pop((channel, user_id), None)
        if tid:
            await self._tm.release_agent_trace(workspace_id, tid)
            logger.info(
                "Lifecycle: 已 forget 绑定 trace_id=%s workspace_id=%s channel=%s user=%s",
                tid,
                workspace_id,
                channel,
                user_id,
            )
|