""" Sub-Agent 工具 - 统一 explore/delegate/evaluate 作为普通工具运行:创建(或继承)子 Trace,执行并返回结构化结果。 """ import asyncio from datetime import datetime from typing import Any, Dict, List, Optional from agent.tools import tool from agent.trace.models import Trace from agent.trace.trace_id import generate_sub_trace_id from agent.trace.goal_models import GoalTree from agent.trace.websocket import broadcast_sub_trace_started, broadcast_sub_trace_completed def _build_explore_prompt(branches: List[str], background: Optional[str]) -> str: lines = ["# 探索任务", ""] if background: lines.extend([background, ""]) lines.append("请探索以下方案:") for i, branch in enumerate(branches, 1): lines.append(f"{i}. {branch}") return "\n".join(lines) async def _build_evaluate_prompt( store, trace_id: str, target_goal_id: str, evaluation_input: Dict[str, Any], requirements: Optional[str], ) -> str: goal_tree = await store.get_goal_tree(trace_id) target_desc = "" if goal_tree: target_goal = goal_tree.find(target_goal_id) if target_goal: target_desc = target_goal.description goal_description = evaluation_input.get("goal_description") or target_desc or f"Goal {target_goal_id}" actual_result = evaluation_input.get("actual_result", "(无执行结果)") lines = [ "# 评估任务", "", "请评估以下任务的执行结果是否满足要求。", "", "## 目标描述", "", str(goal_description), "", "## 执行结果", "", str(actual_result), "", ] if requirements: lines.extend(["## 评估要求", "", requirements, ""]) lines.extend( [ "## 输出格式", "", "## 评估结论", "[通过/不通过]", "", "## 评估理由", "[详细说明通过或不通过原因]", "", "## 修改建议(如果不通过)", "1. [建议1]", "2. [建议2]", ] ) return "\n".join(lines) # ===== 辅助函数 ===== async def _update_goal_start( store, trace_id: str, goal_id: str, mode: str, sub_trace_ids: List[str] ) -> None: """标记 Goal 开始执行""" if not goal_id: return await store.update_goal( trace_id, goal_id, type="agent_call", agent_call_mode=mode, status="in_progress", sub_trace_ids=sub_trace_ids ) async def _update_goal_complete( store, trace_id: str, goal_id: str, status: str, summary: str, sub_trace_ids: List[str] ) -> None: """标记 Goal 完成""" if not goal_id: return await store.update_goal( trace_id, goal_id, status=status, summary=summary, sub_trace_ids=sub_trace_ids ) def _format_explore_results( branches: List[str], results: List[Dict[str, Any]] ) -> str: """格式化 explore 模式的汇总结果(Markdown)""" lines = ["## 探索结果\n"] successful = 0 failed = 0 total_tokens = 0 total_cost = 0.0 for i, (branch, result) in enumerate(zip(branches, results)): branch_name = chr(ord('A') + i) # A, B, C... lines.append(f"### 方案 {branch_name}: {branch}") if isinstance(result, dict): status = result.get("status", "unknown") if status == "completed": lines.append("**状态**: ✓ 完成") successful += 1 else: lines.append("**状态**: ✗ 失败") failed += 1 summary = result.get("summary", "") if summary: lines.append(f"**摘要**: {summary[:200]}...") # 限制长度 stats = result.get("stats", {}) if stats: messages = stats.get("total_messages", 0) tokens = stats.get("total_tokens", 0) cost = stats.get("total_cost", 0.0) lines.append(f"**统计**: {messages} messages, {tokens} tokens, ${cost:.4f}") total_tokens += tokens total_cost += cost else: lines.append("**状态**: ✗ 异常") failed += 1 lines.append("") lines.append("---\n") lines.append("## 总结") lines.append(f"- 总分支数: {len(branches)}") lines.append(f"- 成功: {successful}") lines.append(f"- 失败: {failed}") lines.append(f"- 总 tokens: {total_tokens}") lines.append(f"- 总成本: ${total_cost:.4f}") return "\n".join(lines) def _format_delegate_result(result: Dict[str, Any]) -> str: """格式化 delegate 模式的详细结果""" lines = ["## 委托任务完成\n"] summary = result.get("summary", "") if summary: lines.append(summary) lines.append("") lines.append("---\n") lines.append("**执行统计**:") stats = result.get("stats", {}) if stats: lines.append(f"- 消息数: {stats.get('total_messages', 0)}") lines.append(f"- Tokens: {stats.get('total_tokens', 0)}") lines.append(f"- 成本: ${stats.get('total_cost', 0.0):.4f}") return "\n".join(lines) def _format_evaluate_result(result: Dict[str, Any]) -> str: """格式化 evaluate 模式的评估结果""" summary = result.get("summary", "") return summary # evaluate 的 summary 已经是格式化的评估结果 def _get_allowed_tools_for_mode(mode: str, context: dict) -> Optional[List[str]]: """获取模式对应的允许工具列表""" if mode == "explore": return ["read_file", "grep_content", "glob_files", "goal"] elif mode in ["delegate", "evaluate"]: # 获取所有工具,排除 subagent runner = context.get("runner") if runner and hasattr(runner, "tools") and hasattr(runner.tools, "registry"): all_tools = list(runner.tools.registry.keys()) return [t for t in all_tools if t != "subagent"] return None # 使用默认(所有工具) def _aggregate_stats(results: List[Dict[str, Any]]) -> Dict[str, Any]: """聚合多个结果的统计信息""" total_messages = 0 total_tokens = 0 total_cost = 0.0 for result in results: if isinstance(result, dict) and "stats" in result: stats = result["stats"] total_messages += stats.get("total_messages", 0) total_tokens += stats.get("total_tokens", 0) total_cost += stats.get("total_cost", 0.0) return { "total_messages": total_messages, "total_tokens": total_tokens, "total_cost": total_cost } # ===== 模式处理函数 ===== async def _handle_explore_mode( branches: List[str], background: Optional[str], continue_from: Optional[str], store, current_trace_id: str, current_goal_id: str, runner ) -> Dict[str, Any]: """Explore 模式:并行探索多个方案""" # 1. 检查 continue_from(不支持) if continue_from: return { "status": "failed", "error": "explore mode does not support continue_from parameter" } # 2. 创建所有 Sub-Traces sub_trace_ids = [] tasks = [] for i, branch in enumerate(branches): # 生成唯一的 sub_trace_id sub_trace_id = generate_sub_trace_id(current_trace_id, f"explore-{i+1:03d}") sub_trace_ids.append({ "trace_id": sub_trace_id, "mission": branch }) # 创建 Sub-Trace parent_trace = await store.get_trace(current_trace_id) sub_trace = Trace( trace_id=sub_trace_id, mode="agent", task=branch, parent_trace_id=current_trace_id, parent_goal_id=current_goal_id, agent_type="explore", uid=parent_trace.uid if parent_trace else None, model=parent_trace.model if parent_trace else None, status="running", context={"subagent_mode": "explore", "created_by_tool": "subagent"}, created_at=datetime.now(), ) await store.create_trace(sub_trace) await store.update_goal_tree(sub_trace_id, GoalTree(mission=branch)) # 广播 sub_trace_started await broadcast_sub_trace_started( current_trace_id, sub_trace_id, current_goal_id or "", "explore", branch ) # 创建执行任务 task_coro = runner.run_result( task=branch, trace_id=sub_trace_id, agent_type="explore", tools=["read_file", "grep_content", "glob_files", "goal"] ) tasks.append(task_coro) # 3. 更新主 Goal 为 in_progress await _update_goal_start(store, current_trace_id, current_goal_id, "explore", sub_trace_ids) # 4. 并行执行所有分支 results = await asyncio.gather(*tasks, return_exceptions=True) # 5. 处理结果并广播完成事件 processed_results = [] for i, result in enumerate(results): if isinstance(result, Exception): # 异常处理 error_result = { "status": "failed", "summary": f"执行出错: {str(result)}", "stats": {"total_messages": 0, "total_tokens": 0, "total_cost": 0.0} } processed_results.append(error_result) await broadcast_sub_trace_completed( current_trace_id, sub_trace_ids[i]["trace_id"], "failed", str(result), {} ) else: processed_results.append(result) await broadcast_sub_trace_completed( current_trace_id, sub_trace_ids[i]["trace_id"], result.get("status", "completed"), result.get("summary", ""), result.get("stats", {}) ) # 6. 格式化汇总结果 aggregated_summary = _format_explore_results(branches, processed_results) # 7. 更新主 Goal 为 completed overall_status = "completed" if any( r.get("status") == "completed" for r in processed_results if isinstance(r, dict) ) else "failed" await _update_goal_complete( store, current_trace_id, current_goal_id, overall_status, aggregated_summary, sub_trace_ids ) # 8. 返回结果 return { "mode": "explore", "status": overall_status, "summary": aggregated_summary, "sub_trace_ids": sub_trace_ids, "branches": branches, "stats": _aggregate_stats(processed_results) } async def _handle_delegate_mode( task: str, continue_from: Optional[str], store, current_trace_id: str, current_goal_id: str, runner, context: dict ) -> Dict[str, Any]: """Delegate 模式:委托单个任务""" # 1. 处理 continue_from 或创建新 Sub-Trace if continue_from: existing_trace = await store.get_trace(continue_from) if not existing_trace: return {"status": "failed", "error": f"Continue-from trace not found: {continue_from}"} sub_trace_id = continue_from # 获取 mission goal_tree = await store.get_goal_tree(continue_from) mission = goal_tree.mission if goal_tree else task sub_trace_ids = [{"trace_id": sub_trace_id, "mission": mission}] else: parent_trace = await store.get_trace(current_trace_id) sub_trace_id = generate_sub_trace_id(current_trace_id, "delegate") sub_trace = Trace( trace_id=sub_trace_id, mode="agent", task=task, parent_trace_id=current_trace_id, parent_goal_id=current_goal_id, agent_type="delegate", uid=parent_trace.uid if parent_trace else None, model=parent_trace.model if parent_trace else None, status="running", context={"subagent_mode": "delegate", "created_by_tool": "subagent"}, created_at=datetime.now(), ) await store.create_trace(sub_trace) await store.update_goal_tree(sub_trace_id, GoalTree(mission=task)) sub_trace_ids = [{"trace_id": sub_trace_id, "mission": task}] # 广播 sub_trace_started await broadcast_sub_trace_started( current_trace_id, sub_trace_id, current_goal_id or "", "delegate", task ) # 2. 更新主 Goal 为 in_progress await _update_goal_start(store, current_trace_id, current_goal_id, "delegate", sub_trace_ids) # 3. 执行任务 try: allowed_tools = _get_allowed_tools_for_mode("delegate", context) result = await runner.run_result( task=task, trace_id=sub_trace_id, agent_type="delegate", tools=allowed_tools ) # 4. 广播 sub_trace_completed await broadcast_sub_trace_completed( current_trace_id, sub_trace_id, result.get("status", "completed"), result.get("summary", ""), result.get("stats", {}) ) # 5. 格式化结果 formatted_summary = _format_delegate_result(result) # 6. 更新主 Goal 为 completed await _update_goal_complete( store, current_trace_id, current_goal_id, result.get("status", "completed"), formatted_summary, sub_trace_ids ) # 7. 返回结果 return { "mode": "delegate", "sub_trace_id": sub_trace_id, "continue_from": bool(continue_from), **result, "summary": formatted_summary } except Exception as e: # 错误处理 error_msg = str(e) await broadcast_sub_trace_completed( current_trace_id, sub_trace_id, "failed", error_msg, {} ) await _update_goal_complete( store, current_trace_id, current_goal_id, "failed", f"委托任务失败: {error_msg}", sub_trace_ids ) return { "mode": "delegate", "status": "failed", "error": error_msg, "sub_trace_id": sub_trace_id } async def _handle_evaluate_mode( target_goal_id: str, evaluation_input: Dict[str, Any], requirements: Optional[str], continue_from: Optional[str], store, current_trace_id: str, current_goal_id: str, runner, context: dict ) -> Dict[str, Any]: """Evaluate 模式:评估任务结果""" # 1. 构建评估 prompt task_prompt = await _build_evaluate_prompt( store, current_trace_id, target_goal_id, evaluation_input, requirements ) # 2. 处理 continue_from 或创建新 Sub-Trace if continue_from: existing_trace = await store.get_trace(continue_from) if not existing_trace: return {"status": "failed", "error": f"Continue-from trace not found: {continue_from}"} sub_trace_id = continue_from # 获取 mission goal_tree = await store.get_goal_tree(continue_from) mission = goal_tree.mission if goal_tree else task_prompt sub_trace_ids = [{"trace_id": sub_trace_id, "mission": mission}] else: parent_trace = await store.get_trace(current_trace_id) sub_trace_id = generate_sub_trace_id(current_trace_id, "evaluate") sub_trace = Trace( trace_id=sub_trace_id, mode="agent", task=task_prompt, parent_trace_id=current_trace_id, parent_goal_id=current_goal_id, agent_type="evaluate", uid=parent_trace.uid if parent_trace else None, model=parent_trace.model if parent_trace else None, status="running", context={"subagent_mode": "evaluate", "created_by_tool": "subagent"}, created_at=datetime.now(), ) await store.create_trace(sub_trace) await store.update_goal_tree(sub_trace_id, GoalTree(mission=task_prompt)) sub_trace_ids = [{"trace_id": sub_trace_id, "mission": task_prompt}] # 广播 sub_trace_started await broadcast_sub_trace_started( current_trace_id, sub_trace_id, current_goal_id or "", "evaluate", task_prompt ) # 3. 更新主 Goal 为 in_progress await _update_goal_start(store, current_trace_id, current_goal_id, "evaluate", sub_trace_ids) # 4. 执行评估 try: allowed_tools = _get_allowed_tools_for_mode("evaluate", context) result = await runner.run_result( task=task_prompt, trace_id=sub_trace_id, agent_type="evaluate", tools=allowed_tools ) # 5. 广播 sub_trace_completed await broadcast_sub_trace_completed( current_trace_id, sub_trace_id, result.get("status", "completed"), result.get("summary", ""), result.get("stats", {}) ) # 6. 格式化结果 formatted_summary = _format_evaluate_result(result) # 7. 更新主 Goal 为 completed await _update_goal_complete( store, current_trace_id, current_goal_id, result.get("status", "completed"), formatted_summary, sub_trace_ids ) # 8. 返回结果 return { "mode": "evaluate", "sub_trace_id": sub_trace_id, "continue_from": bool(continue_from), **result, "summary": formatted_summary } except Exception as e: # 错误处理 error_msg = str(e) await broadcast_sub_trace_completed( current_trace_id, sub_trace_id, "failed", error_msg, {} ) await _update_goal_complete( store, current_trace_id, current_goal_id, "failed", f"评估任务失败: {error_msg}", sub_trace_ids ) return { "mode": "evaluate", "status": "failed", "error": error_msg, "sub_trace_id": sub_trace_id } @tool(description="创建 Sub-Agent 执行任务(evaluate/delegate/explore)") async def subagent( mode: str, task: Optional[str] = None, target_goal_id: Optional[str] = None, evaluation_input: Optional[Dict[str, Any]] = None, requirements: Optional[str] = None, branches: Optional[List[str]] = None, background: Optional[str] = None, continue_from: Optional[str] = None, context: Optional[dict] = None, ) -> Dict[str, Any]: # 1. 验证 context if not context: return {"status": "failed", "error": "context is required"} store = context.get("store") current_trace_id = context.get("trace_id") current_goal_id = context.get("goal_id") runner = context.get("runner") missing = [] if not store: missing.append("store") if not current_trace_id: missing.append("trace_id") if not runner: missing.append("runner") if missing: return {"status": "failed", "error": f"Missing required context: {', '.join(missing)}"} # 2. 验证 mode if mode not in {"evaluate", "delegate", "explore"}: return {"status": "failed", "error": "Invalid mode: must be evaluate/delegate/explore"} # 3. 验证模式特定参数 if mode == "delegate" and not task: return {"status": "failed", "error": "delegate mode requires task"} if mode == "explore" and not branches: return {"status": "failed", "error": "explore mode requires branches"} if mode == "evaluate" and (not target_goal_id or evaluation_input is None): return {"status": "failed", "error": "evaluate mode requires target_goal_id and evaluation_input"} # 4. 路由到模式处理函数 if mode == "explore": return await _handle_explore_mode( branches, background, continue_from, store, current_trace_id, current_goal_id, runner ) elif mode == "delegate": return await _handle_delegate_mode( task, continue_from, store, current_trace_id, current_goal_id, runner, context ) else: # evaluate return await _handle_evaluate_mode( target_goal_id, evaluation_input, requirements, continue_from, store, current_trace_id, current_goal_id, runner, context )