模块: gateway/core/executor/
module/file.py:function_name任务执行调度,包括:
说明: Executor 负责"使命/职能对话"的执行部分
pending:待执行running:执行中completed:已完成failed:执行失败cancelled:已取消gateway/core/executor/
├── __init__.py # 导出 TaskManager、ExecutionContext、build_executor_router 等
├── task_manager.py # TaskManager、ExecutionContext、AgentTraceHttpClient、执行管道
├── models.py # TaskRecord、TaskStatus、RunMode
├── api.py # FastAPI 路由 build_executor_router
└── errors.py # ExecutorError、TaskNotFoundError
说明: 无独立 scheduler.py;按 trace_id 互斥 的串行由 task_manager.py 内 _TraceSerialLocks + asyncio 任务完成。
实现位置: gateway/core/executor/task_manager.py
职责:
TraceManager.get_trace)_InMemoryTaskStore);同一 trace_id 下任务 串行(_TraceSerialLocks)AgentTraceHttpClient 调用 Agent:POST /api/traces/{trace_id}/run,轮询 GET /api/traces/{trace_id} 直至终态WorkspaceManager.get_workspace_container_id 有值,在请求体中附带 gateway_exec(docker_container、container_workdir=/home/agent/workspace 等),使 Agent 在沙箱内执行核心接口:
class TaskManager:
@classmethod
def from_env(
cls,
workspace_manager: WorkspaceManager,
trace_manager: TraceManager,
) -> TaskManager: ...
async def submit_task(
self,
trace_id: str,
task_description: str,
mode: RunMode = "async", # "sync" | "async"
metadata: dict | None = None,
) -> str:
"""提交任务,返回 task_id;sync 模式下会阻塞至任务结束"""
pass
def get_task(self, task_id: str) -> dict:
"""查询任务信息"""
pass
def list_tasks(
self,
trace_id: str | None = None,
status: str | None = None,
) -> list[dict]:
"""查询任务列表"""
pass
async def cancel_task(self, task_id: str) -> None:
"""取消任务;运行中会 POST Agent /stop"""
pass
def get_execution_logs(self, task_id: str) -> list[dict]:
"""内存执行日志"""
pass
实现位置: 同 task_manager.py
trace_id 解析 workspace_id 与 Workspace 路径(WorkspaceManager.get_workspace_path);内存记录每任务的执行日志。GATEWAY_AGENT_API_BASE_URL)的 HTTP 封装(post_run、get_trace_status、post_stop)。实现位置: gateway/core/executor/api.py 的 build_executor_router,挂载 Gateway 应用的 Executor API。
调度策略:
实现:
# 每个 Trace 有独立的任务队列
queue_key = f"task_queue:{trace_id}"
# 同一 Trace 的任务串行执行
# 不同 Trace 的任务可以并发执行
调度策略:
实现:
# 数字员工型 Agent 共享一个全局队列
queue_key = f"task_queue:digital_employee:{agent_id}"
# 所有任务串行执行
# Agent 通过工具主动查看队列
说明: 数字员工型 Agent 的任务队列可能由 IM Client 管理,Executor 只负责任务的提交和状态管理
await submit_task(..., mode="sync")TaskRecord(pending)并启动后台 pipelinedone_ev.wait() 上阻塞直至任务结束trace_id 锁 → POST .../run → 轮询 Trace 终态task_id 在返回时任务已处于终态await submit_task(..., mode="async")task_id;pipeline 在后台运行get_task / list_tasks 查询状态;get_execution_logs 查看内存日志completed / failed / cancelledExecutor 构造时注入 WorkspaceManager 与 TraceManager(见 TaskManager.from_env)。
submit_task 内 await trace_manager.get_trace(trace_id)。ExecutionContext 通过 trace_manager.get_workspace_id(trace_id) → workspace_manager.get_workspace_path(workspace_id)。_GatewayExecResolver.resolve(trace_id) 读取 workspace_manager.get_workspace_container_id(workspace_id),非空则写入 gateway_exec 传给 Agent。Workspace 目录、meta、Docker 卷与 Compose 配置 见 Lifecycle 模块(ensure_session、volume_subpath、命名卷名等)。
| 变量 | 说明 |
|---|---|
GATEWAY_AGENT_API_BASE_URL |
Agent API 基址(Compose 中常为 http://api:8000) |
GATEWAY_AGENT_API_TIMEOUT |
HTTP 超时(秒) |
GATEWAY_EXECUTOR_POLL_INTERVAL |
轮询 Trace 状态间隔(秒) |
GATEWAY_EXECUTOR_POLL_MAX_SECONDS |
最长轮询时间(秒) |
Agent 执行任务时可能需要通过 IM 系统通信:
Agent 请求协作:
Agent 通知用户:
任务完成通知: