executor.md 6.8 KB

Executor(任务执行调度)

模块: gateway/core/executor/

文档维护规范

  1. 先改文档,再动代码 - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现
  2. 文档分层,链接代码 - 关键实现需标注代码文件路径;格式:module/file.py:function_name
  3. 简洁快照,日志分离 - 只记录已确认的设计

模块职责

任务执行调度,包括:

  • 任务管理:接收、存储、查询用户任务
  • 执行调度:调度 Agent 框架执行任务
  • 状态管理:管理任务执行状态和结果

说明: Executor 负责"使命/职能对话"的执行部分


核心概念

Task(任务)

  • 定义:用户分配给 Agent 的工作任务
  • 特点
    • 有明确的目标和预期结果
    • 是"管理-执行"关系的体现
    • 不同于 IM 系统的平等对话

任务状态

  • pending:待执行
  • running:执行中
  • completed:已完成
  • failed:执行失败
  • cancelled:已取消

执行模式

  • 同步执行:提交任务后等待结果返回
  • 异步执行:提交任务后立即返回,通过查询获取结果

模块结构

gateway/core/executor/
├── task_manager.py        # 任务管理
├── scheduler.py           # 调度器
└── execution_context.py   # 执行上下文

关键功能

TaskManager

实现位置: gateway/core/executor/task_manager.py

职责:

  • 接收用户任务
  • 存储任务信息
  • 查询任务状态和结果

核心接口:

class TaskManager:
    def submit_task(
        self,
        trace_id: str,
        task_description: str,
        mode: str = "async",  # "sync" | "async"
        metadata: dict = None
    ) -> str:
        """提交任务,返回 task_id"""
        pass

    def get_task(self, task_id: str) -> dict:
        """查询任务信息"""
        pass

    def list_tasks(
        self,
        trace_id: str = None,
        status: str = None
    ) -> list[dict]:
        """查询任务列表"""
        pass

    def cancel_task(self, task_id: str):
        """取消任务"""
        pass

Scheduler

实现位置: gateway/core/executor/scheduler.py

职责:

  • 调度 Agent 框架执行任务
  • 管理执行队列
  • 处理并发和串行逻辑

核心接口:

class Scheduler:
    def schedule(self, task_id: str):
        """调度任务执行"""
        pass

    def execute(self, task_id: str) -> dict:
        """执行任务(调用 Agent 框架)"""
        pass

    def get_queue_status(self, trace_id: str) -> dict:
        """获取队列状态"""
        pass

ExecutionContext

实现位置: gateway/core/executor/execution_context.py

职责:

  • 管理任务执行上下文
  • 提供 Agent 执行所需的环境信息
  • 记录执行日志

核心接口:

class ExecutionContext:
    def create_context(self, task_id: str, trace_id: str) -> dict:
        """创建执行上下文"""
        pass

    def get_workspace_path(self, trace_id: str) -> str:
        """获取 Workspace 路径"""
        pass

    def log_execution(self, task_id: str, log_entry: dict):
        """记录执行日志"""
        pass

两种 Agent 类型的调度策略

个人助理型 Agent

调度策略:

  • 每个用户的任务独立调度
  • 支持并发执行(不同用户的任务)
  • 同一用户的任务需要排队

实现:

# 每个 Trace 有独立的任务队列
queue_key = f"task_queue:{trace_id}"

# 同一 Trace 的任务串行执行
# 不同 Trace 的任务可以并发执行

数字员工型 Agent

调度策略:

  • 所有任务串行处理
  • 任务队列全局共享
  • Agent 主动查看和处理任务

实现:

# 数字员工型 Agent 共享一个全局队列
queue_key = f"task_queue:digital_employee:{agent_id}"

# 所有任务串行执行
# Agent 通过工具主动查看队列

说明: 数字员工型 Agent 的任务队列可能由 IM Client 管理,Executor 只负责任务的提交和状态管理


典型流程

同步任务执行

  1. 用户通过 API 提交任务
  2. TaskManager 创建任务记录(状态:pending)
  3. Scheduler 立即调度执行
  4. 调用 Agent 框架执行任务
  5. 等待执行完成
  6. 更新任务状态(completed/failed)
  7. 返回结果给用户

异步任务执行

  1. 用户通过 API 提交任务
  2. TaskManager 创建任务记录(状态:pending)
  3. 立即返回 task_id 给用户
  4. Scheduler 异步调度执行
  5. 调用 Agent 框架执行任务
  6. 更新任务状态(running → completed/failed)
  7. 用户通过 task_id 查询结果

任务取消

  1. 用户通过 API 取消任务
  2. TaskManager 检查任务状态
  3. 如果任务未开始(pending)→ 直接标记为 cancelled
  4. 如果任务执行中(running)→ 通知 Agent 框架停止执行
  5. 更新任务状态为 cancelled

与 Lifecycle 模块的集成

Executor 依赖 Lifecycle 模块:

  1. 获取 Trace 信息

    trace_info = lifecycle.trace_manager.get_trace(trace_id)
    
    1. 获取 Workspace 路径python workspace_path = lifecycle.workspace_manager.get_workspace_path(workspace_id)
  2. 检查 Trace 状态

    # 确保 Trace 存在且可用
    if not trace_info:
       raise TraceNotFoundError(trace_id)
    

    与 IM 系统的集成

    Agent 执行任务时可能需要通过 IM 系统通信:

    1. Agent 请求协作
    2. Agent 执行过程中通过 IM Client 发送消息给其他 Agent
    3. 等待其他 Agent 回复
    4. 继续执行任务

    5. Agent 通知用户

    6. Agent 执行过程中通过 IM Client 发送消息给用户

    7. 用户在飞书等渠道收到通知

    8. 任务完成通知

    9. 任务完成后,可以通过 IM Client 通知用户

    10. 或者用户主动查询任务状态


    错误处理

    任务提交失败

    • Trace 不存在 → 返回错误,提示创建 Trace
    • Workspace 不可用 → 返回错误,提示检查 Workspace

    任务执行失败

    • Agent 框架执行异常 → 记录错误日志,更新任务状态为 failed
    • 超时 → 标记任务为 failed,记录超时信息
    • 资源不足 → 任务重新入队,等待资源释放

    并发控制

    • 同一 Trace 的任务串行执行,避免冲突
    • 全局并发数限制,避免资源耗尽
    • 任务队列满时拒绝新任务

    相关文档