# Executor(任务执行调度) **模块:** `gateway/core/executor/` ## 文档维护规范 0. **先改文档,再动代码** - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现 1. **文档分层,链接代码** - 关键实现需标注代码文件路径;格式:`module/file.py:function_name` 2. **简洁快照,日志分离** - 只记录已确认的设计 --- ## 模块职责 任务执行调度,包括: - **任务管理**:接收、存储、查询用户任务 - **执行调度**:调度 Agent 框架执行任务 - **状态管理**:管理任务执行状态和结果 **说明:** Executor 负责"使命/职能对话"的执行部分 --- ## 核心概念 ### Task(任务) - **定义**:用户分配给 Agent 的工作任务 - **特点**: - 有明确的目标和预期结果 - 是"管理-执行"关系的体现 - 不同于 IM 系统的平等对话 ### 任务状态 - `pending`:待执行 - `running`:执行中 - `completed`:已完成 - `failed`:执行失败 - `cancelled`:已取消 ### 执行模式 - **同步执行**:提交任务后等待结果返回 - **异步执行**:提交任务后立即返回,通过查询获取结果 --- ## 模块结构 ``` gateway/core/executor/ ├── __init__.py # 导出 TaskManager、ExecutionContext、build_executor_router 等 ├── task_manager.py # TaskManager、ExecutionContext、AgentTraceHttpClient、执行管道 ├── models.py # TaskRecord、TaskStatus、RunMode ├── api.py # FastAPI 路由 build_executor_router └── errors.py # ExecutorError、TaskNotFoundError ``` **说明:** 无独立 `scheduler.py`;按 **trace_id 互斥** 的串行由 `task_manager.py` 内 `_TraceSerialLocks` + `asyncio` 任务完成。 --- ## 关键功能 ### TaskManager **实现位置:** `gateway/core/executor/task_manager.py` **职责:** - 校验 Trace 存在(`TraceManager.get_trace`) - **内存任务表**(`_InMemoryTaskStore`);同一 `trace_id` 下任务 **串行**(`_TraceSerialLocks`) - 通过 `AgentTraceHttpClient` 调用 Agent:`POST /api/traces/{trace_id}/run`,轮询 `GET /api/traces/{trace_id}` 直至终态 - 若 `WorkspaceManager.get_workspace_container_id` 有值,在请求体中附带 **`gateway_exec`**(`docker_container`、`container_workdir=/home/agent/workspace` 等),使 Agent 在沙箱内执行 **核心接口:** ```python class TaskManager: @classmethod def from_env( cls, workspace_manager: WorkspaceManager, trace_manager: TraceManager, ) -> TaskManager: ... async def submit_task( self, trace_id: str, task_description: str, mode: RunMode = "async", # "sync" | "async" metadata: dict | None = None, ) -> str: """提交任务,返回 task_id;sync 模式下会阻塞至任务结束""" pass def get_task(self, task_id: str) -> dict: """查询任务信息""" pass def list_tasks( self, trace_id: str | None = None, status: str | None = None, ) -> list[dict]: """查询任务列表""" pass async def cancel_task(self, task_id: str) -> None: """取消任务;运行中会 POST Agent /stop""" pass def get_execution_logs(self, task_id: str) -> list[dict]: """内存执行日志""" pass ``` ### ExecutionContext 与 AgentTraceHttpClient **实现位置:** 同 `task_manager.py` - **ExecutionContext**:按 `trace_id` 解析 `workspace_id` 与 **Workspace 路径**(`WorkspaceManager.get_workspace_path`);内存记录每任务的执行日志。 - **AgentTraceHttpClient**:对 Agent 基址(`GATEWAY_AGENT_API_BASE_URL`)的 HTTP 封装(`post_run`、`get_trace_status`、`post_stop`)。 ### HTTP 路由 **实现位置:** `gateway/core/executor/api.py` 的 `build_executor_router`,挂载 Gateway 应用的 Executor API。 --- ## 两种 Agent 类型的调度策略 ### 个人助理型 Agent **调度策略:** - 每个用户的任务独立调度 - 支持并发执行(不同用户的任务) - 同一用户的任务需要排队 **实现:** ```python # 每个 Trace 有独立的任务队列 queue_key = f"task_queue:{trace_id}" # 同一 Trace 的任务串行执行 # 不同 Trace 的任务可以并发执行 ``` ### 数字员工型 Agent **调度策略:** - 所有任务串行处理 - 任务队列全局共享 - Agent 主动查看和处理任务 **实现:** ```python # 数字员工型 Agent 共享一个全局队列 queue_key = f"task_queue:digital_employee:{agent_id}" # 所有任务串行执行 # Agent 通过工具主动查看队列 ``` **说明:** 数字员工型 Agent 的任务队列可能由 IM Client 管理,Executor 只负责任务的提交和状态管理 --- ## 典型流程 ### 同步任务执行 1. 调用 `await submit_task(..., mode="sync")` 2. 内存写入 `TaskRecord`(pending)并启动后台 pipeline 3. **当前协程**在 `done_ev.wait()` 上阻塞直至任务结束 4. Pipeline 持 `trace_id` 锁 → `POST .../run` → 轮询 Trace 终态 5. 返回的 `task_id` 在返回时任务已处于终态 ### 异步任务执行 1. 调用 `await submit_task(..., mode="async")` 2. 立即返回 `task_id`;pipeline 在后台运行 3. 通过 `get_task` / `list_tasks` 查询状态;`get_execution_logs` 查看内存日志 4. 终态:`completed` / `failed` / `cancelled` ### 任务取消 1. 用户通过 API 取消任务 2. TaskManager 检查任务状态 3. 如果任务未开始(pending)→ 直接标记为 cancelled 4. 如果任务执行中(running)→ 通知 Agent 框架停止执行 5. 更新任务状态为 cancelled --- ## 与 Lifecycle 模块的集成 Executor 构造时注入 `WorkspaceManager` 与 `TraceManager`(见 `TaskManager.from_env`)。 1. **提交前校验**:`submit_task` 内 `await trace_manager.get_trace(trace_id)`。 2. **Workspace 路径**:`ExecutionContext` 通过 `trace_manager.get_workspace_id(trace_id)` → `workspace_manager.get_workspace_path(workspace_id)`。 3. **沙箱执行**:`_GatewayExecResolver.resolve(trace_id)` 读取 `workspace_manager.get_workspace_container_id(workspace_id)`,非空则写入 `gateway_exec` 传给 Agent。 **Workspace 目录、meta、Docker 卷与 Compose 配置** 见 [Lifecycle 模块](./lifecycle.md)(`ensure_session`、`volume_subpath`、命名卷名等)。 ### 环境变量(Executor / Agent HTTP) | 变量 | 说明 | |------|------| | `GATEWAY_AGENT_API_BASE_URL` | Agent API 基址(Compose 中常为 `http://api:8000`) | | `GATEWAY_AGENT_API_TIMEOUT` | HTTP 超时(秒) | | `GATEWAY_EXECUTOR_POLL_INTERVAL` | 轮询 Trace 状态间隔(秒) | | `GATEWAY_EXECUTOR_POLL_MAX_SECONDS` | 最长轮询时间(秒) | --- ## 与 IM 系统的集成 Agent 执行任务时可能需要通过 IM 系统通信: 1. **Agent 请求协作**: - Agent 执行过程中通过 IM Client 发送消息给其他 Agent - 等待其他 Agent 回复 - 继续执行任务 2. **Agent 通知用户**: - Agent 执行过程中通过 IM Client 发送消息给用户 - 用户在飞书等渠道收到通知 3. **任务完成通知**: - 任务完成后,可以通过 IM Client 通知用户 - 或者用户主动查询任务状态 --- ## 错误处理 ### 任务提交失败 - Trace 不存在 → 返回错误,提示创建 Trace - Workspace 不可用 → 返回错误,提示检查 Workspace ### 任务执行失败 - Agent 框架执行异常 → 记录错误日志,更新任务状态为 failed - 超时 → 标记任务为 failed,记录超时信息 - 资源不足 → 任务重新入队,等待资源释放 ### 并发控制 - 同一 Trace 的任务串行执行,避免冲突 - 全局并发数限制,避免资源耗尽 - 任务队列满时拒绝新任务 --- ## 相关文档 - [需求规划](../requirements.md):任务执行调度需求 - [架构设计](../architecture.md):模块在整体架构中的位置 - [Lifecycle 模块](./lifecycle.md):生命周期管理模块 - [Agent Core 架构](../../../agent/docs/architecture.md):Agent 框架核心设计