# Channels(外部渠道接入) **模块:** `gateway/core/channels/` ## 文档维护规范 0. **先改文档,再动代码** - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现 1. **文档分层,链接代码** - 关键实现需标注代码文件路径;格式:`module/file.py:function_name` 2. **简洁快照,日志分离** - 只记录已确认的设计 --- ## 模块职责 外部渠道接入,包括: - **飞书集成**:接收飞书消息,发送回复 - **消息路由**:将飞书消息路由到对应的 Agent - **渠道管理**:管理渠道配置和状态 **说明:** Channels 模块只用于个人助理型 Agent 的飞书接入(使命/职能对话) --- ## 核心概念 ### Channel(渠道) - **定义**:外部消息来源(飞书、微信等) - **特点**: - 每个渠道有独立的配置 - 每个渠道有独立的消息处理逻辑 ### 路由规则 - **个人助理型 Agent**: - 飞书用户 ID → Trace ID - 每个用户有独立的 Trace - 首次对话自动创建 Trace ### 消息流转 ``` 飞书用户 → 飞书 Webhook → Channels 模块 → Executor 模块 → Agent 执行 → Executor → Channels → 飞书 ``` --- ## 模块结构 ``` gateway/core/channels/ ├── feishu/ # 飞书集成 │ ├── connector.py # 飞书连接器 │ ├── webhook.py # Webhook 处理 │ └── api.py # 飞书 API 调用 │ ├── router.py # 消息路由 └── channel_manager.py # 渠道管理 ``` --- ## 关键功能 ### FeishuConnector **实现位置:** `gateway/core/channels/feishu/connector.py` **职责:** - 接收飞书 Webhook 消息 - 调用飞书 API 发送回复 - 管理飞书连接状态 **核心接口:** ```python class FeishuConnector: def __init__(self, app_id: str, app_secret: str): """初始化飞书连接器""" pass def handle_webhook(self, event: dict) -> dict: """处理飞书 Webhook 事件""" pass def send_message(self, user_id: str, text: str): """发送消息给飞书用户""" pass def get_user_info(self, user_id: str) -> dict: """获取飞书用户信息""" pass ``` ### MessageRouter **实现位置:** `gateway/core/channels/router.py` **职责:** - 根据飞书用户 ID 查找对应的 Trace - 如果 Trace 不存在,自动创建 - 将消息转换为任务提交给 Executor **核心接口:** ```python class MessageRouter: def route_message(self, channel: str, user_id: str, message: dict) -> str: """路由消息,返回 task_id""" pass def get_trace_id(self, channel: str, user_id: str) -> str: """获取或创建 Trace ID""" pass def create_trace_for_user(self, channel: str, user_id: str) -> str: """为用户创建 Trace""" pass ``` ### ChannelManager **实现位置:** `gateway/core/channels/channel_manager.py` **职责:** - 管理所有渠道的配置 - 启动和停止渠道 - 监控渠道状态 **核心接口:** ```python class ChannelManager: def register_channel(self, channel_id: str, config: dict): """注册渠道""" pass def start_channel(self, channel_id: str): """启动渠道""" pass def stop_channel(self, channel_id: str): """停止渠道""" pass def get_channel_status(self, channel_id: str) -> dict: """获取渠道状态""" pass ``` --- ## 典型流程 ### 用户首次通过飞书发送消息 1. 飞书用户发送消息 2. 飞书 Webhook 触发,调用 `FeishuConnector.handle_webhook()` 3. FeishuConnector 提取用户 ID 和消息内容 4. 调用 `MessageRouter.route_message()` 5. MessageRouter 查询是否有对应的 Trace 6. 无 Trace → 调用 `Lifecycle.TraceManager.create_trace()` 7. 创建 Trace(workspace_id=feishu_user_id, agent_type="personal_assistant") 8. 调用 `Executor.TaskManager.submit_task()` 9. Executor 调度 Agent 执行 10. Agent 执行完成,返回结果 11. Executor 调用 `FeishuConnector.send_message()` 发送回复 12. 用户在飞书收到回复 ### 用户再次发送消息 1. 飞书用户发送消息 2. FeishuConnector 处理 Webhook 3. MessageRouter 查询到已有 Trace 4. 直接提交任务给 Executor 5. Agent 执行并回复 --- ## 与其他模块的集成 ### 与 Lifecycle 模块 ```python # 创建 Trace from gateway.core.lifecycle import TraceManager trace_mgr = TraceManager() trace_id = trace_mgr.create_trace( workspace_id=f"feishu:{user_id}", agent_type="personal_assistant" ) ``` ### 与 Executor 模块 ```python # 提交任务 from gateway.core.executor import TaskManager task_mgr = TaskManager() task_id = task_mgr.submit_task( trace_id=trace_id, task_description=message_text, mode="async" ) ``` --- ## 配置示例 ```yaml # config.yaml channels: feishu: enabled: true app_id: "cli_xxx" app_secret: "xxx" webhook_url: "https://gateway.example.com/webhook/feishu" # 路由配置 routing: # 自动为新用户创建 Trace auto_create_trace: true # Workspace ID 前缀 workspace_prefix: "feishu" ``` --- ## 与 IM Server 的区别 ### Gateway Channels(个人助理型) - **对话性质**:使命/职能对话 - **飞书账号**:个人助理账号 - **Trace 管理**:每个用户独立 Trace - **消息处理**:转换为任务提交给 Executor ### IM Server Channels(数字员工型) - **对话性质**:主体间平等交流 - **飞书账号**:数字员工账号 - **Trace 管理**:所有用户共享 Trace - **消息处理**:通过 IM Client 管理消息队列 --- ## 错误处理 ### Webhook 处理失败 - 飞书 Webhook 验证失败 → 返回 401 - 消息格式错误 → 记录日志,返回 400 - 内部错误 → 返回 500,通知管理员 ### 任务提交失败 - Trace 创建失败 → 向用户发送错误提示 - Executor 繁忙 → 消息入队,稍后处理 - Agent 执行超时 → 向用户发送超时提示 ### 飞书 API 调用失败 - Token 过期 → 自动刷新 Token - 限流 → 等待后重试 - 网络错误 → 重试 3 次,失败后记录日志 --- ## 相关文档 - [需求规划](../requirements.md):外部渠道接入需求 - [架构设计](../architecture.md):模块在整体架构中的位置 - [Lifecycle 模块](./lifecycle.md):生命周期管理模块 - [Executor 模块](./executor.md):任务执行调度模块