| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 |
- from __future__ import annotations
- from typing import Protocol, runtime_checkable
@runtime_checkable
class TraceBackend(Protocol):
    """Abstraction used ahead of wiring into Lifecycle.TraceManager.

    Implementations resolve a ``trace_id`` for a user on a given channel.
    """

    async def get_or_create_trace(
        self,
        *,
        channel: str,
        user_id: str,
        workspace_id: str,
        agent_type: str,
        metadata: dict[str, object],
    ) -> str:
        """Return the trace id for the given channel user.

        All arguments are keyword-only. The protocol fixes only the
        signature; lookup and creation semantics belong to the implementation.
        """
        ...
@runtime_checkable
class ChannelRegistration(Protocol):
    """Minimum shape of a channel config accepted by ``ChannelRegistry``.

    A registered configuration must expose at least an ``enabled`` attribute.
    """

    # Used by ChannelRegistry as the initial running state of the channel.
    enabled: bool
class ChannelRegistry:
    """IM-agnostic bookkeeping for channels: registration, start/stop, status.

    The registry only records state. It keeps each channel's configuration
    and a running flag, and does not itself start or stop any transport.
    """

    def __init__(self) -> None:
        """Create an empty registry."""
        # channel_id -> configuration object passed to register_channel.
        self._registry: dict[str, ChannelRegistration] = {}
        # channel_id -> running flag. start_channel/stop_channel do not check
        # registration, so this may contain ids absent from _registry.
        self._running: dict[str, bool] = {}

    def register_channel(self, channel_id: str, cfg: ChannelRegistration) -> None:
        """Register (or replace) the configuration for ``channel_id``.

        The running flag is initialised from ``cfg.enabled`` only the first
        time the id is seen. Re-registering, or registering after an explicit
        start/stop, leaves the existing running flag untouched.
        """
        self._registry[channel_id] = cfg
        if channel_id not in self._running:
            self._running[channel_id] = cfg.enabled

    def start_channel(self, channel_id: str) -> None:
        """Mark ``channel_id`` as running (no registration check is made)."""
        self._running[channel_id] = True

    def stop_channel(self, channel_id: str) -> None:
        """Mark ``channel_id`` as not running (no registration check is made)."""
        self._running[channel_id] = False

    def get_channel_status(self, channel_id: str) -> dict[str, str | bool]:
        """Return a status snapshot for ``channel_id``.

        Returns:
            A dict with keys ``channel_id`` (echoed back), ``registered``
            (a config exists), ``running`` (current running flag, ``False``
            if unknown) and ``enabled`` (the config's ``enabled`` value,
            ``False`` when the channel is not registered).
        """
        cfg = self._registry.get(channel_id)
        # Use an identity check, not truthiness: a config object may define
        # __bool__/__len__ and evaluate falsy while still being registered.
        registered = cfg is not None
        return {
            "channel_id": channel_id,
            "registered": registered,
            "running": bool(self._running.get(channel_id)),
            "enabled": registered and bool(cfg.enabled),
        }
|