"""
Sub-Agent manager: a single entry point for creating and running Sub-Agents.

Unifies the management of the three Sub-Agent modes: evaluate, delegate and
explore.
"""

import asyncio
import json
from datetime import datetime
from typing import Any, Awaitable, Callable, Dict, List, Optional

from agent.execution.models import Trace, Message
from agent.execution.trace_id import generate_sub_trace_id
from agent.models.goal import Goal, GoalTree
from agent.services.subagent.signals import Signal


class SubAgentManager:
    """
    Unified Sub-Agent manager.

    Creates, configures and runs Sub-Agents for the different modes. A
    Sub-Agent always runs as a background asyncio task; its outcome is
    published on the signal bus (when one is configured) and recorded on the
    parent trace and goal through the store.
    """

    # Tool allow-lists per mode. Modes that are absent get ``None``, which the
    # original code documents as "full permissions".
    _READ_ONLY_TOOLS = ("read_file", "grep_content", "glob_files")
    _ALLOWED_TOOLS_BY_MODE = {
        "evaluate": _READ_ONLY_TOOLS,
        "explore": _READ_ONLY_TOOLS,
    }

    # Maximum number of turns per mode, and the fallback for unknown modes.
    _MAX_TURNS_BY_MODE = {"evaluate": 10, "explore": 20, "delegate": 50}
    _DEFAULT_MAX_TURNS = 30

    def __init__(self, store, signal_bus=None):
        """
        Initialise the manager.

        Args:
            store: TraceStore instance used to read/write traces, goals,
                events and messages.
            signal_bus: SignalBus instance (optional, used for asynchronous
                communication).
        """
        self.store = store
        self.signal_bus = signal_bus
        # Strong references to in-flight background tasks (see _track_task).
        self._background_tasks = set()

    def _track_task(self, task: "asyncio.Task") -> None:
        """Keep a strong reference to ``task`` until it finishes.

        The event loop only holds weak references to tasks, so a task nobody
        references can be garbage-collected before it completes.
        """
        # Created lazily so instances built without __init__ still work.
        tasks = getattr(self, "_background_tasks", None)
        if tasks is None:
            tasks = self._background_tasks = set()
        tasks.add(task)
        task.add_done_callback(tasks.discard)

    async def execute(
        self,
        mode: str,
        current_trace_id: str,
        current_goal_id: str,
        options: Dict[str, Any],
        continue_from: Optional[str] = None,
        wait: bool = True,
        run_agent: Optional[Callable[[Trace], Awaitable[Any]]] = None,
        timeout: float = 300.0,
    ) -> Dict[str, Any]:
        """
        Unified, signal-driven execution of a Sub-Agent.

        Args:
            mode: "evaluate" | "delegate" | "explore".
            current_trace_id: ID of the current (parent) trace.
            current_goal_id: ID of the current goal.
            options: Mode-specific options.
            continue_from: ID of an existing trace to continue (optional).
            wait: True waits for the Sub-Agent to finish; False returns
                immediately.
            run_agent: Awaitable callable that runs the agent; it is called
                with the sub-trace object.
            timeout: Maximum number of seconds to wait when ``wait`` is True.

        Returns:
            When ``wait`` is True, the mode-specific formatted result.
            Otherwise a dict with ``subagent_id``, ``status`` and ``mode``.

        Raises:
            ValueError: If ``run_agent`` is missing, or the sub-trace cannot
                be prepared.
            TimeoutError: If waiting exceeds ``timeout`` seconds.
            Exception: If the Sub-Agent reports a failure while waiting.
        """
        if not run_agent:
            raise ValueError("run_agent parameter is required")

        # 1. Create (or reuse) the sub-trace.
        sub_trace_id = await self._create_sub_trace(
            mode, current_trace_id, current_goal_id, options, continue_from
        )

        # 2. Start the Sub-Agent in the background and keep the task alive.
        task = asyncio.create_task(
            self._run_subagent_background(
                mode,
                sub_trace_id,
                current_trace_id,
                current_goal_id,
                options,
                run_agent,
            )
        )
        self._track_task(task)

        # 3. Emit the start signal.
        if self.signal_bus:
            self.signal_bus.emit(Signal(
                type="subagent.start",
                trace_id=sub_trace_id,
                data={
                    "parent_trace_id": current_trace_id,
                    "mode": mode,
                    "task": self._get_task_summary(mode, options),
                },
            ))

        # 4b. Fire-and-forget: return immediately.
        if not wait:
            return {
                "subagent_id": sub_trace_id,
                "status": "running",
                "mode": mode,
            }

        # 4a. Wait for completion (signal, or the task itself as a fallback).
        return await self._wait_for_completion(
            sub_trace_id, current_trace_id, mode, timeout=timeout, task=task
        )

    async def _create_sub_trace(
        self,
        mode: str,
        current_trace_id: str,
        current_goal_id: str,
        options: Dict[str, Any],
        continue_from: Optional[str] = None,
    ) -> str:
        """
        Create the sub-trace (creation only, nothing is executed here).

        Marks the current goal as an in-progress ``agent_call``, then either
        stores a new sub-trace or appends the task prompt as a user message to
        the trace named by ``continue_from``.

        Returns:
            The ID of the sub-trace to run.

        Raises:
            ValueError: If ``continue_from`` does not name an existing trace,
                or the task prompt cannot be built.
        """
        # 1. Permissions and parameters.
        allowed_tools = self._get_allowed_tools(mode)
        # NOTE(review): for the documented modes `mode` is never "evaluation",
        # so this always yields `mode`. Confirm whether "evaluate" was meant
        # to map to "evaluator" before changing it.
        agent_type = mode if mode != "evaluation" else "evaluator"

        # Validate the trace to continue BEFORE touching the goal, so that a
        # bad ID does not leave the goal stuck in "in_progress".
        if continue_from:
            existing_trace = await self.store.get_trace(continue_from)
            if not existing_trace:
                raise ValueError(f"Continue-from trace not found: {continue_from}")

        # 2. Turn the current goal into an agent_call goal.
        update_data = {
            "type": "agent_call",
            "agent_call_mode": mode,
            "status": "in_progress",
        }
        # Evaluate mode records what is being evaluated.
        if mode == "evaluate":
            update_data["target_goal_id"] = options.get("target_goal_id")
            update_data["evaluation_input"] = options.get("evaluation_input")

        await self.store.update_goal(current_trace_id, current_goal_id, **update_data)

        # 3. Generate a new sub-trace ID or reuse the given one.
        if continue_from:
            sub_trace_id = continue_from
        else:
            sub_trace_id = generate_sub_trace_id(current_trace_id, mode)

        # 4. Build the task prompt.
        task_prompt = await self._build_task_prompt(
            mode, options, current_trace_id, continue_from
        )

        # 5. Continuous memory: keep going on the existing trace.
        if continue_from:
            await self.store.append_message(sub_trace_id, Message(
                role="user",
                content=task_prompt,
                created_at=datetime.now(),
            ))
            return sub_trace_id

        # 5. Otherwise create a brand-new sub-trace.
        sub_trace = Trace(
            trace_id=sub_trace_id,
            mode="agent",
            task=task_prompt,
            parent_trace_id=current_trace_id,
            parent_goal_id=current_goal_id,
            agent_type=agent_type,
            context={
                "allowed_tools": allowed_tools,
                "max_turns": self._get_max_turns(mode),
            },
            status="running",
            created_at=datetime.now(),
        )
        await self.store.create_trace(sub_trace)
        await self.store.update_goal(
            current_trace_id, current_goal_id, sub_trace_ids=[sub_trace_id]
        )

        # Publish the sub_trace_started event on the parent trace.
        await self.store.append_event(current_trace_id, "sub_trace_started", {
            "trace_id": sub_trace_id,
            "parent_trace_id": current_trace_id,
            "parent_goal_id": current_goal_id,
            "agent_type": agent_type,
            "task": self._get_task_summary(mode, options),
        })

        return sub_trace_id

    async def _run_subagent_background(
        self,
        mode: str,
        sub_trace_id: str,
        current_trace_id: str,
        current_goal_id: str,
        options: Dict[str, Any],
        run_agent,
    ) -> Dict[str, Any]:
        """
        Run the Sub-Agent in the background and signal when it is done.

        Failures of the agent are not propagated: they are reported through
        the error signal, a failed ``sub_trace_completed`` event and the goal
        status.

        Returns:
            ``{"status": "completed", "result": ...}`` on success or
            ``{"status": "failed", "error": ...}`` on failure, so a caller
            holding the task can obtain the outcome without a signal bus.
        """
        try:
            # Fetch the trace object and run the agent on it.
            sub_trace = await self.store.get_trace(sub_trace_id)
            result = await run_agent(sub_trace)

            # Re-read the trace to pick up its final state.
            updated_trace = await self.store.get_trace(sub_trace_id)

            formatted_result = await self._format_result(
                mode, result, updated_trace, options, current_trace_id
            )

            # Emit the completion signal.
            if self.signal_bus:
                self.signal_bus.emit(Signal(
                    type="subagent.complete",
                    trace_id=sub_trace_id,
                    data={
                        "parent_trace_id": current_trace_id,
                        "result": formatted_result,
                        "status": "completed",
                    },
                ))

            # Publish the completion event on the parent trace.
            await self.store.append_event(current_trace_id, "sub_trace_completed", {
                "trace_id": sub_trace_id,
                "status": "completed",
                "result": formatted_result,
                "stats": {
                    "total_messages": updated_trace.total_messages if updated_trace else 0,
                    "total_tokens": updated_trace.total_tokens if updated_trace else 0,
                    "total_cost": updated_trace.total_cost if updated_trace else 0,
                },
            })

            # Update the parent goal.
            await self._update_goal_after_completion(
                mode, current_trace_id, current_goal_id, formatted_result, options
            )
            return {"status": "completed", "result": formatted_result}

        except Exception as e:
            # Emit the error signal.
            if self.signal_bus:
                self.signal_bus.emit(Signal(
                    type="subagent.error",
                    trace_id=sub_trace_id,
                    data={
                        "parent_trace_id": current_trace_id,
                        "error": str(e),
                        "mode": mode,
                    },
                ))

            # Publish the failure event on the parent trace.
            await self.store.append_event(current_trace_id, "sub_trace_completed", {
                "trace_id": sub_trace_id,
                "status": "failed",
                "error": str(e),
            })

            # Mark the parent goal as failed.
            await self.store.update_goal(
                current_trace_id,
                current_goal_id,
                status="failed",
                summary=f"{mode} 失败: {str(e)}",
            )
            return {"status": "failed", "error": str(e)}

    async def _wait_for_completion(
        self,
        sub_trace_id: str,
        current_trace_id: str,
        mode: str,
        timeout: float = 300.0,  # 5 minutes
        task: Optional["asyncio.Task"] = None,
    ) -> Dict[str, Any]:
        """
        Wait until the Sub-Agent completes.

        Polls the signal bus buffer of the parent trace for a completion or
        error signal of ``sub_trace_id``. When ``task`` is given and has
        finished without a matching signal being seen (for example because no
        signal bus is configured), the task's own outcome is used instead.

        Args:
            sub_trace_id: ID of the sub-trace being waited for.
            current_trace_id: ID of the parent trace whose buffer is polled.
            mode: Sub-Agent mode, used in error messages.
            timeout: Maximum number of seconds to wait.
            task: The background task running the Sub-Agent (optional).

        Returns:
            The formatted result of the Sub-Agent.

        Raises:
            TimeoutError: If ``timeout`` seconds elapse first. The background
                task is not cancelled.
            Exception: If the Sub-Agent failed.
        """
        loop = asyncio.get_running_loop()
        started_at = loop.time()

        while True:
            # Check the timeout.
            if loop.time() - started_at > timeout:
                raise TimeoutError(f"{mode} Sub-Agent 超时({timeout}秒)")

            # Check the signals buffered for the parent trace.
            if self.signal_bus:
                for signal in self.signal_bus.check_buffer(current_trace_id):
                    if signal.trace_id != sub_trace_id:
                        continue
                    if signal.type == "subagent.complete":
                        return signal.data["result"]
                    if signal.type == "subagent.error":
                        error = signal.data.get("error", "Unknown error")
                        raise Exception(f"{mode} 失败: {error}")

            # Fallback: the task is done but no signal was seen.
            if task is not None and task.done():
                return self._result_from_finished_task(task, mode)

            # Sleep briefly to avoid busy-waiting.
            await asyncio.sleep(0.1)

    @staticmethod
    def _result_from_finished_task(task: "asyncio.Task", mode: str) -> Dict[str, Any]:
        """Translate a finished background task into a result or an exception."""
        if task.cancelled():
            raise Exception(f"{mode} 失败: Sub-Agent task was cancelled")

        task_error = task.exception()
        if task_error is not None:
            # Only happens when the failure reporting itself failed.
            raise Exception(f"{mode} 失败: {task_error}") from task_error

        outcome = task.result()
        if isinstance(outcome, dict) and outcome.get("status") == "completed":
            return outcome.get("result")

        error = "Unknown error"
        if isinstance(outcome, dict):
            error = outcome.get("error", error)
        raise Exception(f"{mode} 失败: {error}")

    def _get_allowed_tools(self, mode: str) -> Optional[List[str]]:
        """Return the list of tools allowed for ``mode`` (``None`` = full permissions)."""
        tools = self._ALLOWED_TOOLS_BY_MODE.get(mode)
        # Hand out a fresh list so callers cannot mutate the shared constant.
        return list(tools) if tools is not None else None

    def _get_max_turns(self, mode: str) -> int:
        """Return the maximum number of turns for ``mode``."""
        return self._MAX_TURNS_BY_MODE.get(mode, self._DEFAULT_MAX_TURNS)

    def _get_task_summary(self, mode: str, options: Dict[str, Any]) -> str:
        """Return a short task summary (used in events and signals)."""
        if mode == "evaluate":
            target_goal_id = options.get("target_goal_id", "unknown")
            return f"评估 Goal {target_goal_id}"
        if mode == "delegate":
            return options.get("task", "委托任务")
        if mode == "explore":
            branches = options.get("branches", [])
            return f"探索 {len(branches)} 个方案"
        return "Sub-Agent 任务"

    async def _build_task_prompt(
        self,
        mode: str,
        options: Dict[str, Any],
        current_trace_id: str,
        continue_from: Optional[str],
    ) -> str:
        """Build the task prompt for ``mode`` (empty string for unknown modes)."""
        if mode == "evaluate":
            return await self._build_evaluation_prompt(
                options, current_trace_id, continue_from
            )
        if mode == "delegate":
            return options.get("task", "")
        if mode == "explore":
            return self._build_exploration_prompt(options)
        return ""

    async def _build_evaluation_prompt(
        self,
        options: Dict[str, Any],
        current_trace_id: str,
        continue_from: Optional[str],
    ) -> str:
        """
        Build the evaluation prompt (modelled on evaluate.py).

        Reads ``target_goal_id``, ``evaluation_input`` and ``requirements``
        from ``options`` and looks the target goal up in the goal tree of the
        parent trace.

        Raises:
            ValueError: If the goal tree or the target goal cannot be found.
        """
        target_goal_id = options.get("target_goal_id")
        # An explicit None is treated like a missing evaluation_input.
        evaluation_input = options.get("evaluation_input") or {}
        requirements = options.get("requirements")

        # Look up the goal under evaluation.
        goal_tree = await self.store.get_goal_tree(current_trace_id)
        if not goal_tree:
            raise ValueError(f"Goal tree not found for trace: {current_trace_id}")

        target_goal = goal_tree.find(target_goal_id)
        if not target_goal:
            raise ValueError(f"Target goal not found: {target_goal_id}")

        # Previous evaluation result (only with continuous memory).
        previous_results = []
        if continue_from and target_goal.evaluation_result:
            previous_results.append(target_goal.evaluation_result)

        goal_description = evaluation_input.get(
            "goal_description", target_goal.description
        )

        # Assemble the prompt.
        lines = [
            "# 评估任务",
            "",
            "请评估以下任务的执行结果是否满足要求。",
            "",
            "## 目标描述",
            "",
            goal_description,
            "",
            "## 执行结果",
            "",
        ]

        actual_result = evaluation_input.get("actual_result")
        if actual_result is None:
            lines.append("(无执行结果)")
        elif isinstance(actual_result, str):
            lines.append(actual_result)
        else:
            # Structured results are rendered as a fenced JSON block.
            lines.append("```json")
            lines.append(json.dumps(actual_result, ensure_ascii=False, indent=2))
            lines.append("```")
        lines.append("")

        if requirements:
            lines.extend(["## 评估要求", "", requirements, ""])

        if previous_results:
            lines.extend(["## 历史评估记录", ""])
            for i, prev in enumerate(previous_results, 1):
                lines.append(f"### 评估 #{i}")
                lines.append(f"- **结论**: {'通过' if prev.get('passed') else '不通过'}")
                lines.append(f"- **理由**: {prev.get('reason', '无')}")
                if prev.get('suggestions'):
                    lines.append(f"- **建议**: {', '.join(prev.get('suggestions', []))}")
                lines.append("")

        lines.extend([
            "## 输出格式",
            "",
            "请按照以下格式输出评估结果:",
            "",
            "## 评估结论",
            "[通过/不通过]",
            "",
            "## 评估理由",
            "[详细说明为什么通过或不通过]",
            "",
            "## 修改建议(如果不通过)",
            "1. [具体的、可操作的建议1]",
            "2. [具体的、可操作的建议2]",
        ])

        return "\n".join(lines)

    def _build_exploration_prompt(self, options: Dict[str, Any]) -> str:
        """Build the exploration prompt from ``branches`` and ``background``."""
        branches = options.get("branches", [])
        background = options.get("background", "")

        lines = ["# 探索任务", ""]

        if background:
            lines.append(background)
            lines.append("")

        lines.append("请探索以下方案:")
        for i, branch in enumerate(branches, 1):
            lines.append(f"{i}. {branch}")

        return "\n".join(lines)

    async def _format_result(
        self,
        mode: str,
        result: Any,
        trace: Trace,
        options: Dict[str, Any],
        current_trace_id: str,
    ) -> Dict[str, Any]:
        """Format the raw agent result according to ``mode`` (``{}`` if unknown)."""
        if mode == "evaluate":
            return self._parse_evaluation_result(result)

        if mode == "delegate":
            summary = result.get("summary", "任务完成") if isinstance(result, dict) else "任务完成"
            return {
                "summary": summary,
                "stats": {
                    "total_messages": trace.total_messages if trace else 0,
                    "total_tokens": trace.total_tokens if trace else 0,
                    "total_cost": trace.total_cost if trace else 0,
                },
            }

        if mode == "explore":
            return {"summary": result if isinstance(result, str) else "探索完成"}

        return {}

    @staticmethod
    def _coerce_result_to_text(agent_result: Any) -> str:
        """Return the textual form of an agent result ("" when there is none)."""
        if not agent_result:
            return ""
        if isinstance(agent_result, str):
            return agent_result
        # Presumably dict results carry their text under "summary", as the
        # delegate branch of _format_result assumes; verify against run_agent.
        if isinstance(agent_result, dict) and agent_result.get("summary"):
            return str(agent_result["summary"])
        return str(agent_result)

    @staticmethod
    def _extract_section(text: str, heading: str) -> str:
        """Return the text between ``heading`` and the next "##" ("" if absent)."""
        if heading not in text:
            return ""
        return text.split(heading, 1)[1].split("##")[0].strip()

    def _parse_evaluation_result(self, agent_result: Any) -> Dict[str, Any]:
        """
        Parse the evaluator's response (modelled on evaluate.py).

        Returns:
            A dict with ``passed``, ``reason``, ``suggestions`` and
            ``details``. When no "## 评估理由" section is found, ``reason``
            is the first 200 characters of the response.
        """
        last_message = self._coerce_result_to_text(agent_result)

        if not last_message:
            return {
                "passed": False,
                "reason": "评估 Agent 未返回结果",
                "suggestions": [],
                "details": {},
            }

        # Verdict: read it from its own section when present. Scanning the
        # whole response would turn a pass into a failure whenever the
        # response repeats the "## 修改建议(如果不通过)" heading.
        verdict_text = self._extract_section(last_message, "## 评估结论") or last_message
        passed = "通过" in verdict_text and "不通过" not in verdict_text

        # Reasoning.
        reason = self._extract_section(last_message, "## 评估理由")

        # Suggestions: bullet or numbered lines of the suggestions section.
        suggestions = []
        suggestions_section = self._extract_section(last_message, "## 修改建议")
        for line in suggestions_section.split("\n"):
            line = line.strip()
            if line and (line.startswith(("-", "*")) or line[0].isdigit()):
                suggestion = line.lstrip("-*0123456789. ").strip()
                if suggestion:
                    suggestions.append(suggestion)

        return {
            "passed": passed,
            "reason": reason if reason else last_message[:200],
            "suggestions": suggestions,
            "details": {"full_response": last_message},
        }

    async def _update_goal_after_completion(
        self,
        mode: str,
        current_trace_id: str,
        current_goal_id: str,
        result: Dict[str, Any],
        options: Dict[str, Any],
    ):
        """Mark the parent goal as completed once the Sub-Agent has finished."""
        if mode == "evaluate":
            await self.store.update_goal(
                current_trace_id,
                current_goal_id,
                evaluation_result=result,
                status="completed",
                summary=f"评估{'通过' if result.get('passed') else '不通过'}",
            )
        elif mode == "delegate":
            task = options.get("task", "任务")
            await self.store.update_goal(
                current_trace_id,
                current_goal_id,
                status="completed",
                summary=f"已委托完成: {task}",
            )
        elif mode == "explore":
            await self.store.update_goal(
                current_trace_id,
                current_goal_id,
                status="completed",
                summary="探索完成",
            )