模块: gateway/core/executor/
module/file.py:function_name任务执行调度,包括:
说明: Executor 负责"使命/职能对话"的执行部分
pending:待执行running:执行中completed:已完成failed:执行失败cancelled:已取消gateway/core/executor/
├── task_manager.py # 任务管理
├── scheduler.py # 调度器
└── execution_context.py # 执行上下文
实现位置: gateway/core/executor/task_manager.py
职责:
核心接口:
class TaskManager:
def submit_task(
self,
trace_id: str,
task_description: str,
mode: str = "async", # "sync" | "async"
metadata: dict = None
) -> str:
"""提交任务,返回 task_id"""
pass
def get_task(self, task_id: str) -> dict:
"""查询任务信息"""
pass
def list_tasks(
self,
trace_id: str = None,
status: str = None
) -> list[dict]:
"""查询任务列表"""
pass
def cancel_task(self, task_id: str):
"""取消任务"""
pass
实现位置: gateway/core/executor/scheduler.py
职责:
核心接口:
class Scheduler:
def schedule(self, task_id: str):
"""调度任务执行"""
pass
def execute(self, task_id: str) -> dict:
"""执行任务(调用 Agent 框架)"""
pass
def get_queue_status(self, trace_id: str) -> dict:
"""获取队列状态"""
pass
实现位置: gateway/core/executor/execution_context.py
职责:
核心接口:
class ExecutionContext:
def create_context(self, task_id: str, trace_id: str) -> dict:
"""创建执行上下文"""
pass
def get_workspace_path(self, trace_id: str) -> str:
"""获取 Workspace 路径"""
pass
def log_execution(self, task_id: str, log_entry: dict):
"""记录执行日志"""
pass
调度策略:
实现:
# 每个 Trace 有独立的任务队列
queue_key = f"task_queue:{trace_id}"
# 同一 Trace 的任务串行执行
# 不同 Trace 的任务可以并发执行
调度策略:
实现:
# 数字员工型 Agent 共享一个全局队列
queue_key = f"task_queue:digital_employee:{agent_id}"
# 所有任务串行执行
# Agent 通过工具主动查看队列
说明: 数字员工型 Agent 的任务队列可能由 IM Client 管理,Executor 只负责任务的提交和状态管理
Executor 依赖 Lifecycle 模块:
获取 Trace 信息:
trace_info = lifecycle.trace_manager.get_trace(trace_id)
python
workspace_path = lifecycle.workspace_manager.get_workspace_path(workspace_id)
检查 Trace 状态:
# 确保 Trace 存在且可用
if not trace_info:
raise TraceNotFoundError(trace_id)
Agent 执行任务时可能需要通过 IM 系统通信:
继续执行任务
Agent 通知用户:
Agent 执行过程中通过 IM Client 发送消息给用户
用户在飞书等渠道收到通知
任务完成通知:
任务完成后,可以通过 IM Client 通知用户
或者用户主动查询任务状态