| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544 |
- """
- Sub-Agent 管理器 - 统一管理 Sub-Agent 创建和执行
- 统一 evaluate、delegate、explore 三种模式的 Sub-Agent 管理
- """
- import asyncio
- from typing import Optional, Dict, Any, List
- from datetime import datetime
- from agent.execution.models import Trace, Message
- from agent.execution.trace_id import generate_sub_trace_id
- from agent.models.goal import Goal, GoalTree
- from agent.services.subagent.signals import Signal
- class SubAgentManager:
- """
- 统一的 Sub-Agent 管理器
- 负责创建、配置和执行不同模式的 Sub-Agent
- """
- def __init__(self, store, signal_bus=None):
- """
- 初始化管理器
- Args:
- store: TraceStore 实例
- signal_bus: SignalBus 实例(可选,用于异步通讯)
- """
- self.store = store
- self.signal_bus = signal_bus
- async def execute(
- self,
- mode: str,
- current_trace_id: str,
- current_goal_id: str,
- options: Dict[str, Any],
- continue_from: Optional[str] = None,
- wait: bool = True,
- run_agent=None
- ) -> Dict[str, Any]:
- """
- 统一的执行逻辑(信号驱动)
- Args:
- mode: 模式 - "evaluate" | "delegate" | "explore"
- current_trace_id: 当前主 Trace ID
- current_goal_id: 当前 Goal ID
- options: 模式特定的选项
- continue_from: 继承的 trace ID(可选)
- wait: True=等待完成信号, False=立即返回
- run_agent: 运行 Agent 的函数
- Returns:
- 根据 mode 返回不同格式的结果
- """
- if not run_agent:
- raise ValueError("run_agent parameter is required")
- # 1. 创建 Sub-Trace
- sub_trace_id = await self._create_sub_trace(
- mode, current_trace_id, current_goal_id,
- options, continue_from
- )
- # 2. 在后台启动 Sub-Agent
- task = asyncio.create_task(
- self._run_subagent_background(
- mode, sub_trace_id, current_trace_id,
- current_goal_id, options, run_agent
- )
- )
- # 3. 发送启动信号
- if self.signal_bus:
- self.signal_bus.emit(Signal(
- type="subagent.start",
- trace_id=sub_trace_id,
- data={
- "parent_trace_id": current_trace_id,
- "mode": mode,
- "task": self._get_task_summary(mode, options)
- }
- ))
- if wait:
- # 4a. 等待完成信号
- return await self._wait_for_completion(
- sub_trace_id, current_trace_id, mode
- )
- else:
- # 4b. 立即返回
- return {
- "subagent_id": sub_trace_id,
- "status": "running",
- "mode": mode
- }
- async def _create_sub_trace(
- self,
- mode: str,
- current_trace_id: str,
- current_goal_id: str,
- options: Dict[str, Any],
- continue_from: Optional[str] = None
- ) -> str:
- """创建 Sub-Trace(不再执行,只创建)"""
- # 1. 配置权限和参数
- allowed_tools = self._get_allowed_tools(mode)
- agent_type = mode if mode != "evaluation" else "evaluator"
- # 2. 更新当前 Goal 为 agent_call 类型
- update_data = {
- "type": "agent_call",
- "agent_call_mode": mode,
- "status": "in_progress"
- }
- # evaluation 模式特殊处理
- if mode == "evaluate":
- update_data["target_goal_id"] = options.get("target_goal_id")
- update_data["evaluation_input"] = options.get("evaluation_input")
- await self.store.update_goal(current_trace_id, current_goal_id, **update_data)
- # 3. 生成或复用 Sub-Trace ID
- if continue_from:
- sub_trace_id = continue_from
- # 验证 trace 存在
- existing_trace = await self.store.get_trace(sub_trace_id)
- if not existing_trace:
- raise ValueError(f"Continue-from trace not found: {continue_from}")
- else:
- sub_trace_id = generate_sub_trace_id(current_trace_id, mode)
- # 4. 构建任务 prompt
- task_prompt = await self._build_task_prompt(mode, options, current_trace_id, continue_from)
- # 5. 创建或复用 Sub-Trace
- if not continue_from:
- # 新建 Sub-Trace
- sub_trace = Trace(
- trace_id=sub_trace_id,
- mode="agent",
- task=task_prompt,
- parent_trace_id=current_trace_id,
- parent_goal_id=current_goal_id,
- agent_type=agent_type,
- context={
- "allowed_tools": allowed_tools,
- "max_turns": self._get_max_turns(mode)
- },
- status="running",
- created_at=datetime.now()
- )
- await self.store.create_trace(sub_trace)
- await self.store.update_goal(current_trace_id, current_goal_id, sub_trace_ids=[sub_trace_id])
- # 推送 sub_trace_started 事件
- await self.store.append_event(current_trace_id, "sub_trace_started", {
- "trace_id": sub_trace_id,
- "parent_trace_id": current_trace_id,
- "parent_goal_id": current_goal_id,
- "agent_type": agent_type,
- "task": self._get_task_summary(mode, options)
- })
- else:
- # 连续记忆:在现有 trace 上继续
- await self.store.append_message(sub_trace_id, Message(
- role="user",
- content=task_prompt,
- created_at=datetime.now()
- ))
- return sub_trace_id
- async def _run_subagent_background(
- self,
- mode: str,
- sub_trace_id: str,
- current_trace_id: str,
- current_goal_id: str,
- options: Dict[str, Any],
- run_agent
- ):
- """在后台运行 Sub-Agent,完成后发送信号"""
- try:
- # 获取 trace 对象
- sub_trace = await self.store.get_trace(sub_trace_id)
- # 运行 agent
- result = await run_agent(sub_trace)
- # 获取最终状态
- updated_trace = await self.store.get_trace(sub_trace_id)
- # 格式化结果
- formatted_result = await self._format_result(
- mode, result, updated_trace, options, current_trace_id
- )
- # 发送完成信号
- if self.signal_bus:
- self.signal_bus.emit(Signal(
- type="subagent.complete",
- trace_id=sub_trace_id,
- data={
- "parent_trace_id": current_trace_id,
- "result": formatted_result,
- "status": "completed"
- }
- ))
- # 推送事件
- await self.store.append_event(current_trace_id, "sub_trace_completed", {
- "trace_id": sub_trace_id,
- "status": "completed",
- "result": formatted_result,
- "stats": {
- "total_messages": updated_trace.total_messages if updated_trace else 0,
- "total_tokens": updated_trace.total_tokens if updated_trace else 0,
- "total_cost": updated_trace.total_cost if updated_trace else 0
- }
- })
- # 更新主 Goal
- await self._update_goal_after_completion(
- mode, current_trace_id, current_goal_id,
- formatted_result, options
- )
- except Exception as e:
- # 发送错误信号
- if self.signal_bus:
- self.signal_bus.emit(Signal(
- type="subagent.error",
- trace_id=sub_trace_id,
- data={
- "parent_trace_id": current_trace_id,
- "error": str(e),
- "mode": mode
- }
- ))
- # 推送失败事件
- await self.store.append_event(current_trace_id, "sub_trace_completed", {
- "trace_id": sub_trace_id,
- "status": "failed",
- "error": str(e)
- })
- # 更新主 Goal 为失败
- await self.store.update_goal(
- current_trace_id, current_goal_id,
- status="failed",
- summary=f"{mode} 失败: {str(e)}"
- )
- async def _wait_for_completion(
- self,
- sub_trace_id: str,
- current_trace_id: str,
- mode: str,
- timeout: float = 300.0 # 5 分钟超时
- ) -> Dict[str, Any]:
- """等待 Sub-Agent 完成信号"""
- start_time = asyncio.get_event_loop().time()
- while True:
- # 检查超时
- if asyncio.get_event_loop().time() - start_time > timeout:
- raise TimeoutError(f"{mode} Sub-Agent 超时({timeout}秒)")
- # 检查信号
- if self.signal_bus:
- signals = self.signal_bus.check_buffer(current_trace_id)
- for signal in signals:
- if signal.trace_id == sub_trace_id:
- if signal.type == "subagent.complete":
- return signal.data["result"]
- elif signal.type == "subagent.error":
- error = signal.data.get("error", "Unknown error")
- raise Exception(f"{mode} 失败: {error}")
- # 短暂休眠,避免忙等待
- await asyncio.sleep(0.1)
- def _get_allowed_tools(self, mode: str) -> Optional[List[str]]:
- """根据 mode 返回允许的工具列表"""
- if mode == "evaluate":
- return ["read_file", "grep_content", "glob_files"]
- elif mode == "explore":
- return ["read_file", "grep_content", "glob_files"]
- elif mode == "delegate":
- return None # 完整权限
- return None
- def _get_max_turns(self, mode: str) -> int:
- """根据 mode 返回最大轮次"""
- if mode == "evaluate":
- return 10
- elif mode == "explore":
- return 20
- elif mode == "delegate":
- return 50
- return 30
- def _get_task_summary(self, mode: str, options: Dict[str, Any]) -> str:
- """获取任务摘要(用于事件)"""
- if mode == "evaluate":
- target_goal_id = options.get("target_goal_id", "unknown")
- return f"评估 Goal {target_goal_id}"
- elif mode == "delegate":
- return options.get("task", "委托任务")
- elif mode == "explore":
- branches = options.get("branches", [])
- return f"探索 {len(branches)} 个方案"
- return "Sub-Agent 任务"
- async def _build_task_prompt(
- self,
- mode: str,
- options: Dict[str, Any],
- current_trace_id: str,
- continue_from: Optional[str]
- ) -> str:
- """构建任务 prompt"""
- if mode == "evaluate":
- return await self._build_evaluation_prompt(options, current_trace_id, continue_from)
- elif mode == "delegate":
- return options.get("task", "")
- elif mode == "explore":
- return self._build_exploration_prompt(options)
- return ""
- async def _build_evaluation_prompt(
- self,
- options: Dict[str, Any],
- current_trace_id: str,
- continue_from: Optional[str]
- ) -> str:
- """构建评估 prompt(参考 evaluate.py)"""
- target_goal_id = options.get("target_goal_id")
- evaluation_input = options.get("evaluation_input", {})
- requirements = options.get("requirements")
- # 获取被评估的 Goal
- goal_tree = await self.store.get_goal_tree(current_trace_id)
- if not goal_tree:
- raise ValueError(f"Goal tree not found for trace: {current_trace_id}")
- target_goal = goal_tree.find(target_goal_id)
- if not target_goal:
- raise ValueError(f"Target goal not found: {target_goal_id}")
- # 获取历史评估结果(如果是连续记忆)
- previous_results = []
- if continue_from and target_goal.evaluation_result:
- previous_results.append(target_goal.evaluation_result)
- # 构建 prompt
- lines = []
- lines.append("# 评估任务")
- lines.append("")
- lines.append("请评估以下任务的执行结果是否满足要求。")
- lines.append("")
- lines.append("## 目标描述")
- lines.append("")
- goal_description = evaluation_input.get("goal_description", target_goal.description)
- lines.append(goal_description)
- lines.append("")
- lines.append("## 执行结果")
- lines.append("")
- actual_result = evaluation_input.get("actual_result")
- if actual_result is not None:
- if isinstance(actual_result, str):
- lines.append(actual_result)
- else:
- import json
- lines.append("```json")
- lines.append(json.dumps(actual_result, ensure_ascii=False, indent=2))
- lines.append("```")
- else:
- lines.append("(无执行结果)")
- lines.append("")
- if requirements:
- lines.append("## 评估要求")
- lines.append("")
- lines.append(requirements)
- lines.append("")
- if previous_results:
- lines.append("## 历史评估记录")
- lines.append("")
- for i, prev in enumerate(previous_results, 1):
- lines.append(f"### 评估 #{i}")
- lines.append(f"- **结论**: {'通过' if prev.get('passed') else '不通过'}")
- lines.append(f"- **理由**: {prev.get('reason', '无')}")
- if prev.get('suggestions'):
- lines.append(f"- **建议**: {', '.join(prev.get('suggestions', []))}")
- lines.append("")
- lines.append("## 输出格式")
- lines.append("")
- lines.append("请按照以下格式输出评估结果:")
- lines.append("")
- lines.append("## 评估结论")
- lines.append("[通过/不通过]")
- lines.append("")
- lines.append("## 评估理由")
- lines.append("[详细说明为什么通过或不通过]")
- lines.append("")
- lines.append("## 修改建议(如果不通过)")
- lines.append("1. [具体的、可操作的建议1]")
- lines.append("2. [具体的、可操作的建议2]")
- return "\n".join(lines)
- def _build_exploration_prompt(self, options: Dict[str, Any]) -> str:
- """构建探索 prompt"""
- branches = options.get("branches", [])
- background = options.get("background", "")
- lines = []
- lines.append("# 探索任务")
- lines.append("")
- if background:
- lines.append(background)
- lines.append("")
- lines.append("请探索以下方案:")
- for i, branch in enumerate(branches, 1):
- lines.append(f"{i}. {branch}")
- return "\n".join(lines)
- async def _format_result(
- self,
- mode: str,
- result: Any,
- trace: Trace,
- options: Dict[str, Any],
- current_trace_id: str
- ) -> Dict[str, Any]:
- """根据 mode 格式化结果"""
- if mode == "evaluate":
- return self._parse_evaluation_result(result)
- elif mode == "delegate":
- summary = result.get("summary", "任务完成") if isinstance(result, dict) else "任务完成"
- return {
- "summary": summary,
- "stats": {
- "total_messages": trace.total_messages if trace else 0,
- "total_tokens": trace.total_tokens if trace else 0,
- "total_cost": trace.total_cost if trace else 0
- }
- }
- elif mode == "explore":
- return {"summary": result if isinstance(result, str) else "探索完成"}
- return {}
- def _parse_evaluation_result(self, agent_result: Any) -> Dict[str, Any]:
- """解析评估结果(参考 evaluate.py)"""
- last_message = agent_result if agent_result else None
- if not last_message:
- return {
- "passed": False,
- "reason": "评估 Agent 未返回结果",
- "suggestions": [],
- "details": {}
- }
- # 解析评估结论
- passed = False
- if "通过" in last_message and "不通过" not in last_message:
- passed = True
- elif "不通过" in last_message:
- passed = False
- # 提取评估理由
- reason = ""
- if "## 评估理由" in last_message:
- parts = last_message.split("## 评估理由")
- if len(parts) > 1:
- reason_section = parts[1].split("##")[0].strip()
- reason = reason_section
- # 提取修改建议
- suggestions = []
- if "## 修改建议" in last_message:
- parts = last_message.split("## 修改建议")
- if len(parts) > 1:
- suggestions_section = parts[1].split("##")[0].strip()
- for line in suggestions_section.split("\n"):
- line = line.strip()
- if line and (line.startswith("-") or line.startswith("*") or line[0].isdigit()):
- suggestion = line.lstrip("-*0123456789. ").strip()
- if suggestion:
- suggestions.append(suggestion)
- return {
- "passed": passed,
- "reason": reason if reason else last_message[:200],
- "suggestions": suggestions,
- "details": {"full_response": last_message}
- }
- async def _update_goal_after_completion(
- self,
- mode: str,
- current_trace_id: str,
- current_goal_id: str,
- result: Dict[str, Any],
- options: Dict[str, Any]
- ):
- """完成后更新 Goal"""
- if mode == "evaluate":
- await self.store.update_goal(
- current_trace_id, current_goal_id,
- evaluation_result=result,
- status="completed",
- summary=f"评估{'通过' if result.get('passed') else '不通过'}"
- )
- elif mode == "delegate":
- task = options.get("task", "任务")
- await self.store.update_goal(
- current_trace_id, current_goal_id,
- status="completed",
- summary=f"已委托完成: {task}"
- )
- elif mode == "explore":
- await self.store.update_goal(
- current_trace_id, current_goal_id,
- status="completed",
- summary="探索完成"
- )
|