""" 渠道层通用 Protocol——所有 IM 渠道(飞书、微信等)共用的最小接口约定。 各渠道可在自己的模块中声明更严格的子 Protocol(窄化参数类型), 实现 TraceBackend / ExecutorBackend 时只需满足此处的宽松签名即可跨渠道复用。 """ from __future__ import annotations from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable if TYPE_CHECKING: from fastapi import APIRouter @runtime_checkable class TraceBackend(Protocol): """渠道会话与 Agent ``trace_id``:先准备 Workspace,再在 API 返回后绑定。""" async def prepare_session( self, *, channel: str, user_id: str, workspace_id: str, agent_type: str, metadata: dict[str, object], ) -> None: ... async def get_existing_trace_id(self, channel: str, user_id: str) -> str | None: ... async def bind_agent_trace_id( self, *, channel: str, user_id: str, workspace_id: str, agent_trace_id: str, agent_type: str, metadata: dict[str, object], ) -> None: ... @runtime_checkable class UserIdentityResolver(Protocol): """将渠道入站事件映射为网关内统一 user_id。 各渠道实现可声明更严格的事件类型(如 ``IncomingFeishuEvent``), runtime_checkable 仅校验方法存在性,不校验参数类型。 """ def resolve_user_id(self, event: Any) -> str: ... @runtime_checkable class ExecutorBackend(Protocol): """接收解析后的入站消息,触发业务处理,返回 task_id。 ``reply_context`` 与 ``event`` 的具体类型由各渠道自行约束; ``connector`` 须实现 ``send_text(reply_context, text) -> dict`` 以便向用户发送状态或结果。 """ async def handle_inbound_message( self, existing_agent_trace_id: str, text: str, reply_context: Any, connector: Any, *, event: Any, ) -> tuple[str, str]: """返回 ``(task_id, agent_trace_id)``;无已绑定 trace 时 ``existing_agent_trace_id`` 传空串。""" ... @runtime_checkable class ChannelPlugin(Protocol): """渠道插件接口——每个渠道的 Api 类须实现此接口以支持自动注册。 ``from_env()`` 负责从环境变量读取渠道配置并构造实例; ``build_router()`` 返回该渠道挂载到 FastAPI 的 ``APIRouter``。 """ @classmethod def from_env(cls) -> ChannelPlugin: ... def build_router(self) -> APIRouter: ...