channels.md 6.5 KB

Channels(外部渠道接入)

模块: gateway/core/channels/

文档维护规范

  1. 先改文档,再动代码 - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现
  2. 文档分层,链接代码 - 关键实现需标注代码文件路径;格式:module/file.py:function_name
  3. 简洁快照,日志分离 - 只记录已确认的设计

模块职责

外部渠道接入,包括:

  • 飞书集成:接收飞书消息,发送回复
  • 消息路由:将飞书消息路由到对应的 Agent
  • 渠道管理:管理渠道配置和状态

说明: Channels 模块只用于个人助理型 Agent 的飞书接入(使命/职能对话)


核心概念

Channel(渠道)

  • 定义:外部消息来源(飞书、微信等)
  • 特点
    • 每个渠道有独立的配置
    • 每个渠道有独立的消息处理逻辑

路由规则

  • 个人助理型 Agent
    • 飞书用户 ID → Trace ID
    • 每个用户有独立的 Trace
    • 首次对话自动创建 Trace

消息流转

飞书用户 → 飞书 Webhook → Channels 模块 → Executor 模块 → Agent 执行 → Executor → Channels → 飞书

模块结构

gateway/core/channels/
├── feishu/                    # 飞书集成
│   ├── connector.py           # 飞书连接器
│   ├── webhook.py             # Webhook 处理
│   └── api.py                 # 飞书 API 调用
│
├── router.py                  # 消息路由
└── channel_manager.py         # 渠道管理

关键功能

FeishuConnector

实现位置: gateway/core/channels/feishu/connector.py

职责:

  • 接收飞书 Webhook 消息
  • 调用飞书 API 发送回复
  • 管理飞书连接状态

核心接口:

class FeishuConnector:
    def __init__(self, app_id: str, app_secret: str):
        """初始化飞书连接器"""
        pass

    def handle_webhook(self, event: dict) -> dict:
        """处理飞书 Webhook 事件"""
        pass

    def send_message(self, user_id: str, text: str):
        """发送消息给飞书用户"""
        pass

    def get_user_info(self, user_id: str) -> dict:
        """获取飞书用户信息"""
        pass

MessageRouter

实现位置: gateway/core/channels/router.py

职责:

  • 根据飞书用户 ID 查找对应的 Trace
  • 如果 Trace 不存在,自动创建
  • 将消息转换为任务提交给 Executor

核心接口:

class MessageRouter:
    def route_message(self, channel: str, user_id: str, message: dict) -> str:
        """路由消息,返回 task_id"""
        pass

    def get_trace_id(self, channel: str, user_id: str) -> str:
        """获取或创建 Trace ID"""
        pass

    def create_trace_for_user(self, channel: str, user_id: str) -> str:
        """为用户创建 Trace"""
        pass

ChannelManager

实现位置: gateway/core/channels/channel_manager.py

职责:

  • 管理所有渠道的配置
  • 启动和停止渠道
  • 监控渠道状态

核心接口:

class ChannelManager:
    def register_channel(self, channel_id: str, config: dict):
        """注册渠道"""
        pass

    def start_channel(self, channel_id: str):
        """启动渠道"""
        pass

    def stop_channel(self, channel_id: str):
        """停止渠道"""
        pass

    def get_channel_status(self, channel_id: str) -> dict:
        """获取渠道状态"""
        pass

典型流程

用户首次通过飞书发送消息

  1. 飞书用户发送消息
  2. 飞书 Webhook 触发,调用 FeishuConnector.handle_webhook()
  3. FeishuConnector 提取用户 ID 和消息内容
  4. 调用 MessageRouter.route_message()
  5. MessageRouter 查询是否有对应的 Trace
  6. 无 Trace → 调用 Lifecycle.TraceManager.create_trace()
  7. 创建 Trace(workspace_id=feishu_user_id, agent_type="personal_assistant")
  8. 调用 Executor.TaskManager.submit_task()
  9. Executor 调度 Agent 执行
  10. Agent 执行完成,返回结果
  11. Executor 调用 FeishuConnector.send_message() 发送回复
  12. 用户在飞书收到回复

用户再次发送消息

  1. 飞书用户发送消息
  2. FeishuConnector 处理 Webhook
  3. MessageRouter 查询到已有 Trace
  4. 直接提交任务给 Executor
  5. Agent 执行并回复

与其他模块的集成

与 Lifecycle 模块

# 创建 Trace
from gateway.core.lifecycle import TraceManager

trace_mgr = TraceManager()
trace_id = trace_mgr.create_trace(
    workspace_id=f"feishu:{user_id}",
    agent_type="personal_assistant"
)

与 Executor 模块

# 提交任务
from gateway.core.executor import TaskManager

task_mgr = TaskManager()
task_id = task_mgr.submit_task(
    trace_id=trace_id,
    task_description=message_text,
    mode="async"
)

配置示例

# config.yaml
channels:
  feishu:
    enabled: true
    app_id: "cli_xxx"
    app_secret: "xxx"
    webhook_url: "https://gateway.example.com/webhook/feishu"

    # 路由配置
    routing:
      # 自动为新用户创建 Trace
      auto_create_trace: true
      # Workspace ID 前缀
      workspace_prefix: "feishu"

与 IM Server 的区别

Gateway Channels(个人助理型)

  • 对话性质:使命/职能对话
  • 飞书账号:个人助理账号
  • Trace 管理:每个用户独立 Trace
  • 消息处理:转换为任务提交给 Executor

IM Server Channels(数字员工型)

  • 对话性质:主体间平等交流
  • 飞书账号:数字员工账号
  • Trace 管理:所有用户共享 Trace
  • 消息处理:通过 IM Client 管理消息队列

错误处理

Webhook 处理失败

  • 飞书 Webhook 验证失败 → 返回 401
  • 消息格式错误 → 记录日志,返回 400
  • 内部错误 → 返回 500,通知管理员

任务提交失败

  • Trace 创建失败 → 向用户发送错误提示
  • Executor 繁忙 → 消息入队,稍后处理
  • Agent 执行超时 → 向用户发送超时提示

飞书 API 调用失败

  • Token 过期 → 自动刷新 Token
  • 限流 → 等待后重试
  • 网络错误 → 重试 3 次,失败后记录日志

相关文档