executor.md 8.1 KB

Executor(任务执行调度)

模块: gateway/core/executor/

文档维护规范

  1. 先改文档,再动代码 - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现
  2. 文档分层,链接代码 - 关键实现需标注代码文件路径;格式:module/file.py:function_name
  3. 简洁快照,日志分离 - 只记录已确认的设计

模块职责

任务执行调度,包括:

  • 任务管理:接收、存储、查询用户任务
  • 执行调度:调度 Agent 框架执行任务
  • 状态管理:管理任务执行状态和结果

说明: Executor 负责"使命/职能对话"的执行部分


核心概念

Task(任务)

  • 定义:用户分配给 Agent 的工作任务
  • 特点
    • 有明确的目标和预期结果
    • 是"管理-执行"关系的体现
    • 不同于 IM 系统的平等对话

任务状态

  • pending:待执行
  • running:执行中
  • completed:已完成
  • failed:执行失败
  • cancelled:已取消

执行模式

  • 同步执行:提交任务后等待结果返回
  • 异步执行:提交任务后立即返回,通过查询获取结果

模块结构

gateway/core/executor/
├── __init__.py            # 导出 TaskManager、ExecutionContext、build_executor_router 等
├── task_manager.py        # TaskManager、ExecutionContext、AgentTraceHttpClient、执行管道
├── models.py              # TaskRecord、TaskStatus、RunMode
├── api.py                 # FastAPI 路由 build_executor_router
└── errors.py              # ExecutorError、TaskNotFoundError

说明: 无独立 scheduler.py;按 trace_id 互斥 的串行由 task_manager.py_TraceSerialLocks + asyncio 任务完成。


关键功能

TaskManager

实现位置: gateway/core/executor/task_manager.py

职责:

  • 校验 Trace 存在(TraceManager.get_trace
  • 内存任务表_InMemoryTaskStore);同一 trace_id 下任务 串行_TraceSerialLocks
  • 通过 AgentTraceHttpClient 调用 Agent:POST /api/traces/{trace_id}/run,轮询 GET /api/traces/{trace_id} 直至终态
  • WorkspaceManager.get_workspace_container_id 有值,在请求体中附带 gateway_execdocker_containercontainer_workdir=/home/agent/workspace 等),使 Agent 在沙箱内执行

核心接口:

class TaskManager:
    @classmethod
    def from_env(
        cls,
        workspace_manager: WorkspaceManager,
        trace_manager: TraceManager,
    ) -> TaskManager: ...

    async def submit_task(
        self,
        trace_id: str,
        task_description: str,
        mode: RunMode = "async",  # "sync" | "async"
        metadata: dict | None = None,
    ) -> str:
        """提交任务,返回 task_id;sync 模式下会阻塞至任务结束"""
        pass

    def get_task(self, task_id: str) -> dict:
        """查询任务信息"""
        pass

    def list_tasks(
        self,
        trace_id: str | None = None,
        status: str | None = None,
    ) -> list[dict]:
        """查询任务列表"""
        pass

    async def cancel_task(self, task_id: str) -> None:
        """取消任务;运行中会 POST Agent /stop"""
        pass

    def get_execution_logs(self, task_id: str) -> list[dict]:
        """内存执行日志"""
        pass

ExecutionContext 与 AgentTraceHttpClient

实现位置:task_manager.py

  • ExecutionContext:按 trace_id 解析 workspace_idWorkspace 路径WorkspaceManager.get_workspace_path);内存记录每任务的执行日志。
  • AgentTraceHttpClient:对 Agent 基址(GATEWAY_AGENT_API_BASE_URL)的 HTTP 封装(post_runget_trace_statuspost_stop)。

HTTP 路由

实现位置: gateway/core/executor/api.pybuild_executor_router,挂载 Gateway 应用的 Executor API。


两种 Agent 类型的调度策略

个人助理型 Agent

调度策略:

  • 每个用户的任务独立调度
  • 支持并发执行(不同用户的任务)
  • 同一用户的任务需要排队

实现:

# 每个 Trace 有独立的任务队列
queue_key = f"task_queue:{trace_id}"

# 同一 Trace 的任务串行执行
# 不同 Trace 的任务可以并发执行

数字员工型 Agent

调度策略:

  • 所有任务串行处理
  • 任务队列全局共享
  • Agent 主动查看和处理任务

实现:

# 数字员工型 Agent 共享一个全局队列
queue_key = f"task_queue:digital_employee:{agent_id}"

# 所有任务串行执行
# Agent 通过工具主动查看队列

说明: 数字员工型 Agent 的任务队列可能由 IM Client 管理,Executor 只负责任务的提交和状态管理


典型流程

同步任务执行

  1. 调用 await submit_task(..., mode="sync")
  2. 内存写入 TaskRecord(pending)并启动后台 pipeline
  3. 当前协程done_ev.wait() 上阻塞直至任务结束
  4. Pipeline 持 trace_id 锁 → POST .../run → 轮询 Trace 终态
  5. 返回的 task_id 在返回时任务已处于终态

异步任务执行

  1. 调用 await submit_task(..., mode="async")
  2. 立即返回 task_id;pipeline 在后台运行
  3. 通过 get_task / list_tasks 查询状态;get_execution_logs 查看内存日志
  4. 终态:completed / failed / cancelled

任务取消

  1. 用户通过 API 取消任务
  2. TaskManager 检查任务状态
  3. 如果任务未开始(pending)→ 直接标记为 cancelled
  4. 如果任务执行中(running)→ 通知 Agent 框架停止执行
  5. 更新任务状态为 cancelled

与 Lifecycle 模块的集成

Executor 构造时注入 WorkspaceManagerTraceManager(见 TaskManager.from_env)。

  1. 提交前校验submit_taskawait trace_manager.get_trace(trace_id)
  2. Workspace 路径ExecutionContext 通过 trace_manager.get_workspace_id(trace_id)workspace_manager.get_workspace_path(workspace_id)
  3. 沙箱执行_GatewayExecResolver.resolve(trace_id) 读取 workspace_manager.get_workspace_container_id(workspace_id),非空则写入 gateway_exec 传给 Agent。

Workspace 目录、meta、Docker 卷与 Compose 配置Lifecycle 模块ensure_sessionvolume_subpath、命名卷名等)。

环境变量(Executor / Agent HTTP)

变量 说明
GATEWAY_AGENT_API_BASE_URL Agent API 基址(Compose 中常为 http://api:8000
GATEWAY_AGENT_API_TIMEOUT HTTP 超时(秒)
GATEWAY_EXECUTOR_POLL_INTERVAL 轮询 Trace 状态间隔(秒)
GATEWAY_EXECUTOR_POLL_MAX_SECONDS 最长轮询时间(秒)

与 IM 系统的集成

Agent 执行任务时可能需要通过 IM 系统通信:

  1. Agent 请求协作

    • Agent 执行过程中通过 IM Client 发送消息给其他 Agent
    • 等待其他 Agent 回复
    • 继续执行任务
  2. Agent 通知用户

    • Agent 执行过程中通过 IM Client 发送消息给用户
    • 用户在飞书等渠道收到通知
  3. 任务完成通知

    • 任务完成后,可以通过 IM Client 通知用户
    • 或者用户主动查询任务状态

错误处理

任务提交失败

  • Trace 不存在 → 返回错误,提示创建 Trace
  • Workspace 不可用 → 返回错误,提示检查 Workspace

任务执行失败

  • Agent 框架执行异常 → 记录错误日志,更新任务状态为 failed
  • 超时 → 标记任务为 failed,记录超时信息
  • 资源不足 → 任务重新入队,等待资源释放

并发控制

  • 同一 Trace 的任务串行执行,避免冲突
  • 全局并发数限制,避免资源耗尽
  • 任务队列满时拒绝新任务

相关文档