protocols.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. """
  2. 渠道层通用 Protocol——所有 IM 渠道(飞书、微信等)共用的最小接口约定。
  3. 各渠道可在自己的模块中声明更严格的子 Protocol(窄化参数类型),
  4. 实现 TraceBackend / ExecutorBackend 时只需满足此处的宽松签名即可跨渠道复用。
  5. """
  6. from __future__ import annotations
  7. from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
  8. if TYPE_CHECKING:
  9. from fastapi import APIRouter
  10. @runtime_checkable
  11. class TraceBackend(Protocol):
  12. """渠道会话与 Agent ``trace_id``:先准备 Workspace,再在 API 返回后绑定。"""
  13. async def prepare_session(
  14. self,
  15. *,
  16. channel: str,
  17. user_id: str,
  18. workspace_id: str,
  19. agent_type: str,
  20. metadata: dict[str, object],
  21. ) -> None:
  22. ...
  23. async def get_existing_trace_id(self, channel: str, user_id: str) -> str | None:
  24. ...
  25. async def bind_agent_trace_id(
  26. self,
  27. *,
  28. channel: str,
  29. user_id: str,
  30. workspace_id: str,
  31. agent_trace_id: str,
  32. agent_type: str,
  33. metadata: dict[str, object],
  34. ) -> None:
  35. ...
  36. @runtime_checkable
  37. class UserIdentityResolver(Protocol):
  38. """将渠道入站事件映射为网关内统一 user_id。
  39. 各渠道实现可声明更严格的事件类型(如 ``IncomingFeishuEvent``),
  40. runtime_checkable 仅校验方法存在性,不校验参数类型。
  41. """
  42. def resolve_user_id(self, event: Any) -> str:
  43. ...
  44. @runtime_checkable
  45. class ExecutorBackend(Protocol):
  46. """接收解析后的入站消息,触发业务处理,返回 task_id。
  47. ``reply_context`` 与 ``event`` 的具体类型由各渠道自行约束;
  48. ``connector`` 须实现 ``send_text(reply_context, text) -> dict`` 以便向用户发送状态或结果。
  49. """
  50. async def handle_inbound_message(
  51. self,
  52. existing_agent_trace_id: str,
  53. text: str,
  54. reply_context: Any,
  55. connector: Any,
  56. *,
  57. event: Any,
  58. ) -> tuple[str, str]:
  59. """返回 ``(task_id, agent_trace_id)``;无已绑定 trace 时 ``existing_agent_trace_id`` 传空串。"""
  60. ...
  61. @runtime_checkable
  62. class ChannelPlugin(Protocol):
  63. """渠道插件接口——每个渠道的 Api 类须实现此接口以支持自动注册。
  64. ``from_env()`` 负责从环境变量读取渠道配置并构造实例;
  65. ``build_router()`` 返回该渠道挂载到 FastAPI 的 ``APIRouter``。
  66. """
  67. @classmethod
  68. def from_env(cls) -> ChannelPlugin:
  69. ...
  70. def build_router(self) -> APIRouter:
  71. ...