channels.md 7.8 KB

Channels(外部渠道接入)

模块: gateway/core/channels/

文档维护规范

  1. 先改文档,再动代码 - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现
  2. 文档分层,链接代码 - 关键实现需标注代码文件路径;格式:module/file.py:function_name
  3. 简洁快照,日志分离 - 只记录已确认的设计

模块职责

外部渠道接入,包括:

  • 飞书集成:接收飞书消息,发送回复
  • 消息路由:将飞书消息路由到对应的 Agent
  • 渠道管理:管理渠道配置和状态

说明: Channels 模块只用于个人助理型 Agent 的飞书接入(使命/职能对话)


核心概念

Channel(渠道)

  • 定义:外部消息来源(飞书、微信等)
  • 特点
    • 每个渠道有独立的配置
    • 每个渠道有独立的消息处理逻辑

路由规则

  • 个人助理型 Agent
    • 飞书用户 ID → Trace ID
    • 每个用户有独立的 Trace
    • 首次对话自动创建 Trace

消息流转

飞书用户 → 飞书 Webhook → Channels 模块 → Executor 模块 → Agent 执行 → Executor → Channels → 飞书

模块结构

gateway/core/channels/
├── manager.py                 # ChannelRegistry(通用注册/启停/状态)
├── router.py                  # ChannelTraceRouter(与 IM 无关的 trace 解析)
├── loader.py                  # 渠道加载入口
├── types.py                   # RouteResult 等
├── protocols.py               # TraceBackend 等协议
│
└── feishu/                    # 飞书渠道
    ├── manager.py             # FeishuChannelManager、FeishuChannelConfig
    ├── connector.py           # FeishuConnector(HTTP 入站解析等)
    ├── router.py              # FeishuMessageRouter
    ├── bridge.py              # FeishuHttpRunApiExecutor(调 Gateway Executor / Agent)
    ├── api.py                 # FastAPI 路由装配(Webhook 等)
    ├── identity.py            # 用户身份解析
    └── types.py               # 飞书侧类型

飞书侧还包含 openclaw-lark 等子项目(Node/TS IM 逻辑),Compose 中可由独立 feishu 服务承载 HTTP;Gateway 通过 FEISHU_HTTP_BASE_URL 等与其通信。


关键功能

FeishuConnector

实现位置: gateway/core/channels/feishu/connector.py

职责:

  • 接收飞书 Webhook 消息
  • 调用飞书 API 发送回复
  • 管理飞书连接状态

核心接口:

class FeishuConnector:
    def __init__(self, app_id: str, app_secret: str):
        """初始化飞书连接器"""
        pass

    def handle_webhook(self, event: dict) -> dict:
        """处理飞书 Webhook 事件"""
        pass

    def send_message(self, user_id: str, text: str):
        """发送消息给飞书用户"""
        pass

    def get_user_info(self, user_id: str) -> dict:
        """获取飞书用户信息"""
        pass

ChannelTraceRouter

实现位置: gateway/core/channels/router.py

职责: 与具体 IM 无关;按 workspace_prefix 生成 workspace_id,并委托 TraceBackend 解析已绑定的 Agent trace_id在 Gateway 内预分配 UUID)。

class ChannelTraceRouter:
    async def get_trace_id(
        self, channel: str, user_id: str, *, create_if_missing: bool = True
    ) -> str: ...

    async def create_trace_for_user(self, channel: str, user_id: str) -> str: ...

FeishuMessageRouter

实现位置: gateway/core/channels/feishu/router.py

职责: 飞书消息 → workspace_id / 路由决策;与 FeishuChannelManagerbridge 协同。

FeishuChannelManager 与 Bridge

实现位置: gateway/core/channels/feishu/manager.pyfeishu/bridge.py

  • FeishuChannelManager:组装配置(FeishuChannelConfig)、依赖(Trace/Workspace/Executor)、Webhook 处理与跟单策略。
  • FeishuHttpRunApiExecutor:在渠道流程中调用 Gateway Executor(或等价 HTTP)提交 submit_task,并在 Agent 返回后 TraceManager.bind_agent_trace
  • 配置项含 workspace_prefixauto_create_tracedispatch_card_actionsstop_container_on_trace_terminal 等(见 FeishuChannelConfig 字段注释)。

ChannelRegistry

实现位置: gateway/core/channels/manager.py

职责: 通用渠道注册表、启停与状态查询(register_channelstart_channelstop_channelget_channel_status)。


典型流程

用户首次通过飞书发送消息(与当前实现一致)

  1. 飞书 Webhook → FeishuConnector / api 解析事件与用户标识。
  2. 解析 workspace_id(如 feishu:<open_id>);TraceManager.prepare_workspace_session(workspace_id)Workspace 目录 + 沙箱ensure_session)。
  3. 通过 Bridge 调用 Executor:TaskManager.submit_task;内部 HTTP 调 Agent POST .../run 创建/续跑 Trace。
  4. Agent 返回 trace_id 后,TraceManager.bind_agent_trace(workspace_id, trace_id, ...)
  5. 渠道侧跟单(WebSocket/HTTP)将 Assistant 内容回写飞书;Typing、卡片回调等由 FeishuChannelConfig 开关控制。

用户再次发送消息

  1. 已绑定 trace_id 时,可直接 submit_task(trace_id, ...) 续跑。
  2. Workspace/沙箱通常 跨多轮复用(默认不在单次 Trace 终态停容器,见 stop_container_on_trace_terminal)。

与其他模块的集成

与 Lifecycle 模块

# 准备会话目录与沙箱;Agent 返回 trace_id 后再绑定
await trace_mgr.prepare_workspace_session(f"feishu:{user_id}")
# ... Agent run 得到 trace_id 后:
await trace_mgr.bind_agent_trace(
    f"feishu:{user_id}",
    agent_trace_id=trace_id,
    agent_type="personal_assistant",
)

详见 Lifecycle 模块(目录布局、命名卷、volume_subpath)。

与 Executor 模块

task_id = await task_mgr.submit_task(
    trace_id=trace_id,
    task_description=message_text,
    mode="async",
)

配置示例

# config.yaml
channels:
  feishu:
    enabled: true
    app_id: "cli_xxx"
    app_secret: "xxx"
    webhook_url: "https://gateway.example.com/webhook/feishu"

    # 路由配置
    routing:
      # 自动为新用户创建 Trace
      auto_create_trace: true
      # Workspace ID 前缀
      workspace_prefix: "feishu"

与 IM Server 的区别

Gateway Channels(个人助理型)

  • 对话性质:使命/职能对话
  • 飞书账号:个人助理账号
  • Trace 管理:每个用户独立 Trace
  • 消息处理:转换为任务提交给 Executor

IM Server Channels(数字员工型)

  • 对话性质:主体间平等交流
  • 飞书账号:数字员工账号
  • Trace 管理:所有用户共享 Trace
  • 消息处理:通过 IM Client 管理消息队列

错误处理

Webhook 处理失败

  • 飞书 Webhook 验证失败 → 返回 401
  • 消息格式错误 → 记录日志,返回 400
  • 内部错误 → 返回 500,通知管理员

任务提交失败

  • Trace 创建失败 → 向用户发送错误提示
  • Executor 繁忙 → 消息入队,稍后处理
  • Agent 执行超时 → 向用户发送超时提示

飞书 API 调用失败

  • Token 过期 → 自动刷新 Token
  • 限流 → 等待后重试
  • 网络错误 → 重试 3 次,失败后记录日志

相关文档

  • 需求规划:外部渠道接入需求
  • 架构设计:模块在整体架构中的位置
  • Lifecycle 模块:Workspace、沙箱卷、Trace 绑定
  • Executor 模块:任务提交、gateway_exec、Agent HTTP
  • 仓库根目录 docker-compose.ymlgateway / feishu / api 服务与环境变量