# Channels(外部渠道接入) **模块:** `gateway/core/channels/` ## 文档维护规范 0. **先改文档,再动代码** - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现 1. **文档分层,链接代码** - 关键实现需标注代码文件路径;格式:`module/file.py:function_name` 2. **简洁快照,日志分离** - 只记录已确认的设计 --- ## 模块职责 外部渠道接入,包括: - **飞书集成**:接收飞书消息,发送回复 - **消息路由**:将飞书消息路由到对应的 Agent - **渠道管理**:管理渠道配置和状态 **说明:** Channels 模块只用于个人助理型 Agent 的飞书接入(使命/职能对话) --- ## 核心概念 ### Channel(渠道) - **定义**:外部消息来源(飞书、微信等) - **特点**: - 每个渠道有独立的配置 - 每个渠道有独立的消息处理逻辑 ### 路由规则 - **个人助理型 Agent**: - 飞书用户 ID → Trace ID - 每个用户有独立的 Trace - 首次对话自动创建 Trace ### 消息流转 ``` 飞书用户 → 飞书 Webhook → Channels 模块 → Executor 模块 → Agent 执行 → Executor → Channels → 飞书 ``` --- ## 模块结构 ``` gateway/core/channels/ ├── manager.py # ChannelRegistry(通用注册/启停/状态) ├── router.py # ChannelTraceRouter(与 IM 无关的 trace 解析) ├── loader.py # 渠道加载入口 ├── types.py # RouteResult 等 ├── protocols.py # TraceBackend 等协议 │ └── feishu/ # 飞书渠道 ├── manager.py # FeishuChannelManager、FeishuChannelConfig ├── connector.py # FeishuConnector(HTTP 入站解析等) ├── router.py # FeishuMessageRouter ├── bridge.py # FeishuHttpRunApiExecutor(调 Gateway Executor / Agent) ├── api.py # FastAPI 路由装配(Webhook 等) ├── identity.py # 用户身份解析 └── types.py # 飞书侧类型 ``` 飞书侧还包含 `openclaw-lark` 等子项目(Node/TS IM 逻辑),Compose 中可由独立 `feishu` 服务承载 HTTP;Gateway 通过 `FEISHU_HTTP_BASE_URL` 等与其通信。 --- ## 关键功能 ### FeishuConnector **实现位置:** `gateway/core/channels/feishu/connector.py` **职责:** - 接收飞书 Webhook 消息 - 调用飞书 API 发送回复 - 管理飞书连接状态 **核心接口:** ```python class FeishuConnector: def __init__(self, app_id: str, app_secret: str): """初始化飞书连接器""" pass def handle_webhook(self, event: dict) -> dict: """处理飞书 Webhook 事件""" pass def send_message(self, user_id: str, text: str): """发送消息给飞书用户""" pass def get_user_info(self, user_id: str) -> dict: """获取飞书用户信息""" pass ``` ### ChannelTraceRouter **实现位置:** `gateway/core/channels/router.py` **职责:** 与具体 IM 无关;按 `workspace_prefix` 生成 `workspace_id`,并委托 `TraceBackend` 解析已绑定的 Agent `trace_id`(**不**在 Gateway 内预分配 UUID)。 ```python class ChannelTraceRouter: async def get_trace_id( self, channel: str, user_id: str, *, create_if_missing: bool = True ) -> str: ... async def create_trace_for_user(self, channel: str, user_id: str) -> str: ... ``` ### FeishuMessageRouter **实现位置:** `gateway/core/channels/feishu/router.py` **职责:** 飞书消息 → `workspace_id` / 路由决策;与 `FeishuChannelManager`、`bridge` 协同。 ### FeishuChannelManager 与 Bridge **实现位置:** `gateway/core/channels/feishu/manager.py`、`feishu/bridge.py` - **FeishuChannelManager**:组装配置(`FeishuChannelConfig`)、依赖(Trace/Workspace/Executor)、Webhook 处理与跟单策略。 - **FeishuHttpRunApiExecutor**:在渠道流程中调用 Gateway **Executor**(或等价 HTTP)提交 `submit_task`,并在 Agent 返回后 `TraceManager.bind_agent_trace`。 - 配置项含 `workspace_prefix`、`auto_create_trace`、`dispatch_card_actions`、`stop_container_on_trace_terminal` 等(见 `FeishuChannelConfig` 字段注释)。 ### ChannelRegistry **实现位置:** `gateway/core/channels/manager.py` **职责:** 通用渠道注册表、启停与状态查询(`register_channel`、`start_channel`、`stop_channel`、`get_channel_status`)。 --- ## 典型流程 ### 用户首次通过飞书发送消息(与当前实现一致) 1. 飞书 Webhook → `FeishuConnector` / `api` 解析事件与用户标识。 2. 解析 `workspace_id`(如 `feishu:`);`TraceManager.prepare_workspace_session(workspace_id)` → **Workspace 目录 + 沙箱**(`ensure_session`)。 3. 通过 **Bridge** 调用 Executor:`TaskManager.submit_task`;内部 HTTP 调 Agent `POST .../run` 创建/续跑 Trace。 4. Agent 返回 `trace_id` 后,`TraceManager.bind_agent_trace(workspace_id, trace_id, ...)`。 5. 渠道侧跟单(WebSocket/HTTP)将 Assistant 内容回写飞书;Typing、卡片回调等由 `FeishuChannelConfig` 开关控制。 ### 用户再次发送消息 1. 已绑定 `trace_id` 时,可直接 `submit_task(trace_id, ...)` 续跑。 2. Workspace/沙箱通常 **跨多轮复用**(默认不在单次 Trace 终态停容器,见 `stop_container_on_trace_terminal`)。 --- ## 与其他模块的集成 ### 与 Lifecycle 模块 ```python # 准备会话目录与沙箱;Agent 返回 trace_id 后再绑定 await trace_mgr.prepare_workspace_session(f"feishu:{user_id}") # ... Agent run 得到 trace_id 后: await trace_mgr.bind_agent_trace( f"feishu:{user_id}", agent_trace_id=trace_id, agent_type="personal_assistant", ) ``` 详见 [Lifecycle 模块](./lifecycle.md)(目录布局、命名卷、`volume_subpath`)。 ### 与 Executor 模块 ```python task_id = await task_mgr.submit_task( trace_id=trace_id, task_description=message_text, mode="async", ) ``` --- ## 配置示例 ```yaml # config.yaml channels: feishu: enabled: true app_id: "cli_xxx" app_secret: "xxx" webhook_url: "https://gateway.example.com/webhook/feishu" # 路由配置 routing: # 自动为新用户创建 Trace auto_create_trace: true # Workspace ID 前缀 workspace_prefix: "feishu" ``` --- ## 与 IM Server 的区别 ### Gateway Channels(个人助理型) - **对话性质**:使命/职能对话 - **飞书账号**:个人助理账号 - **Trace 管理**:每个用户独立 Trace - **消息处理**:转换为任务提交给 Executor ### IM Server Channels(数字员工型) - **对话性质**:主体间平等交流 - **飞书账号**:数字员工账号 - **Trace 管理**:所有用户共享 Trace - **消息处理**:通过 IM Client 管理消息队列 --- ## 错误处理 ### Webhook 处理失败 - 飞书 Webhook 验证失败 → 返回 401 - 消息格式错误 → 记录日志,返回 400 - 内部错误 → 返回 500,通知管理员 ### 任务提交失败 - Trace 创建失败 → 向用户发送错误提示 - Executor 繁忙 → 消息入队,稍后处理 - Agent 执行超时 → 向用户发送超时提示 ### 飞书 API 调用失败 - Token 过期 → 自动刷新 Token - 限流 → 等待后重试 - 网络错误 → 重试 3 次,失败后记录日志 --- ## 相关文档 - [需求规划](../requirements.md):外部渠道接入需求 - [架构设计](../architecture.md):模块在整体架构中的位置 - [Lifecycle 模块](./lifecycle.md):Workspace、沙箱卷、Trace 绑定 - [Executor 模块](./executor.md):任务提交、`gateway_exec`、Agent HTTP - 仓库根目录 `docker-compose.yml`:`gateway` / `feishu` / `api` 服务与环境变量