# Executor(任务执行调度) **模块:** `gateway/core/executor/` ## 文档维护规范 0. **先改文档,再动代码** - 新功能或重大修改需先完成文档更新、并完成审阅后,再进行代码实现 1. **文档分层,链接代码** - 关键实现需标注代码文件路径;格式:`module/file.py:function_name` 2. **简洁快照,日志分离** - 只记录已确认的设计 --- ## 模块职责 任务执行调度,包括: - **任务管理**:接收、存储、查询用户任务 - **执行调度**:调度 Agent 框架执行任务 - **状态管理**:管理任务执行状态和结果 **说明:** Executor 负责"使命/职能对话"的执行部分 --- ## 核心概念 ### Task(任务) - **定义**:用户分配给 Agent 的工作任务 - **特点**: - 有明确的目标和预期结果 - 是"管理-执行"关系的体现 - 不同于 IM 系统的平等对话 ### 任务状态 - `pending`:待执行 - `running`:执行中 - `completed`:已完成 - `failed`:执行失败 - `cancelled`:已取消 ### 执行模式 - **同步执行**:提交任务后等待结果返回 - **异步执行**:提交任务后立即返回,通过查询获取结果 --- ## 模块结构 ``` gateway/core/executor/ ├── task_manager.py # 任务管理 ├── scheduler.py # 调度器 └── execution_context.py # 执行上下文 ``` --- ## 关键功能 ### TaskManager **实现位置:** `gateway/core/executor/task_manager.py` **职责:** - 接收用户任务 - 存储任务信息 - 查询任务状态和结果 **核心接口:** ```python class TaskManager: def submit_task( self, trace_id: str, task_description: str, mode: str = "async", # "sync" | "async" metadata: dict = None ) -> str: """提交任务,返回 task_id""" pass def get_task(self, task_id: str) -> dict: """查询任务信息""" pass def list_tasks( self, trace_id: str = None, status: str = None ) -> list[dict]: """查询任务列表""" pass def cancel_task(self, task_id: str): """取消任务""" pass ``` ### Scheduler **实现位置:** `gateway/core/executor/scheduler.py` **职责:** - 调度 Agent 框架执行任务 - 管理执行队列 - 处理并发和串行逻辑 **核心接口:** ```python class Scheduler: def schedule(self, task_id: str): """调度任务执行""" pass def execute(self, task_id: str) -> dict: """执行任务(调用 Agent 框架)""" pass def get_queue_status(self, trace_id: str) -> dict: """获取队列状态""" pass ``` ### ExecutionContext **实现位置:** `gateway/core/executor/execution_context.py` **职责:** - 管理任务执行上下文 - 提供 Agent 执行所需的环境信息 - 记录执行日志 **核心接口:** ```python class ExecutionContext: def create_context(self, task_id: str, trace_id: str) -> dict: """创建执行上下文""" pass def get_workspace_path(self, trace_id: str) -> str: """获取 Workspace 路径""" pass def log_execution(self, task_id: str, log_entry: dict): """记录执行日志""" pass ``` --- ## 两种 Agent 类型的调度策略 ### 个人助理型 Agent **调度策略:** - 每个用户的任务独立调度 - 支持并发执行(不同用户的任务) - 同一用户的任务需要排队 **实现:** ```python # 每个 Trace 有独立的任务队列 queue_key = f"task_queue:{trace_id}" # 同一 Trace 的任务串行执行 # 不同 Trace 的任务可以并发执行 ``` ### 数字员工型 Agent **调度策略:** - 所有任务串行处理 - 任务队列全局共享 - Agent 主动查看和处理任务 **实现:** ```python # 数字员工型 Agent 共享一个全局队列 queue_key = f"task_queue:digital_employee:{agent_id}" # 所有任务串行执行 # Agent 通过工具主动查看队列 ``` **说明:** 数字员工型 Agent 的任务队列可能由 IM Client 管理,Executor 只负责任务的提交和状态管理 --- ## 典型流程 ### 同步任务执行 1. 用户通过 API 提交任务 2. TaskManager 创建任务记录(状态:pending) 3. Scheduler 立即调度执行 4. 调用 Agent 框架执行任务 5. 等待执行完成 6. 更新任务状态(completed/failed) 7. 返回结果给用户 ### 异步任务执行 1. 用户通过 API 提交任务 2. TaskManager 创建任务记录(状态:pending) 3. 立即返回 task_id 给用户 4. Scheduler 异步调度执行 5. 调用 Agent 框架执行任务 6. 更新任务状态(running → completed/failed) 7. 用户通过 task_id 查询结果 ### 任务取消 1. 用户通过 API 取消任务 2. TaskManager 检查任务状态 3. 如果任务未开始(pending)→ 直接标记为 cancelled 4. 如果任务执行中(running)→ 通知 Agent 框架停止执行 5. 更新任务状态为 cancelled --- ## 与 Lifecycle 模块的集成 Executor 依赖 Lifecycle 模块: 1. **获取 Trace 信息**: ```python trace_info = lifecycle.trace_manager.get_trace(trace_id) ``` 2. **获取 Workspace 路径**: ```python workspace_path = lifecycle.workspace_manager.get_workspace_path(workspace_id) ``` 3. **检查 Trace 状态**: ```python # 确保 Trace 存在且可用 if not trace_info: raise TraceNotFoundError(trace_id) ``` --- ## 与 IM 系统的集成 Agent 执行任务时可能需要通过 IM 系统通信: 1. **Agent 请求协作**: - Agent 执行过程中通过 IM Client 发送消息给其他 Agent - 等待其他 Agent 回复 - 继续执行任务 2. **Agent 通知用户**: - Agent 执行过程中通过 IM Client 发送消息给用户 - 用户在飞书等渠道收到通知 3. **任务完成通知**: - 任务完成后,可以通过 IM Client 通知用户 - 或者用户主动查询任务状态 --- ## 错误处理 ### 任务提交失败 - Trace 不存在 → 返回错误,提示创建 Trace - Workspace 不可用 → 返回错误,提示检查 Workspace ### 任务执行失败 - Agent 框架执行异常 → 记录错误日志,更新任务状态为 failed - 超时 → 标记任务为 failed,记录超时信息 - 资源不足 → 任务重新入队,等待资源释放 ### 并发控制 - 同一 Trace 的任务串行执行,避免冲突 - 全局并发数限制,避免资源耗尽 - 任务队列满时拒绝新任务 --- ## 相关文档 - [需求规划](../requirements.md):任务执行调度需求 - [架构设计](../architecture.md):模块在整体架构中的位置 - [Lifecycle 模块](./lifecycle.md):生命周期管理模块 - [Agent Core 架构](../../../agent/docs/architecture.md):Agent 框架核心设计