| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606 |
- """
- Sub-Agent 工具 - 统一 explore/delegate/evaluate
- 作为普通工具运行:创建(或继承)子 Trace,执行并返回结构化结果。
- """
- import asyncio
- from datetime import datetime
- from typing import Any, Dict, List, Optional
- from agent.tools import tool
- from agent.trace.models import Trace
- from agent.trace.trace_id import generate_sub_trace_id
- from agent.trace.goal_models import GoalTree
- from agent.trace.websocket import broadcast_sub_trace_started, broadcast_sub_trace_completed
def _build_explore_prompt(branches: List[str], background: Optional[str]) -> str:
    """Render a Markdown prompt asking an agent to explore the given branches.

    Args:
        branches: Candidate approaches; each becomes a numbered item (from 1).
        background: Optional context paragraph placed between the title and
            the branch list; omitted when empty or None.

    Returns:
        The prompt text with its lines joined by newlines.
    """
    header = ["# 探索任务", ""]
    context_block = [background, ""] if background else []
    numbered = [f"{idx}. {text}" for idx, text in enumerate(branches, start=1)]
    return "\n".join(header + context_block + ["请探索以下方案:"] + numbered)
async def _build_evaluate_prompt(
    store,
    trace_id: str,
    target_goal_id: str,
    evaluation_input: Dict[str, Any],
    requirements: Optional[str],
) -> str:
    """Build the Markdown prompt handed to an evaluation sub-agent.

    The goal description is chosen, in order of preference, from
    ``evaluation_input["goal_description"]``, from the description of
    ``target_goal_id`` in the trace's goal tree, or from the placeholder
    ``"Goal <target_goal_id>"``.

    Args:
        store: Trace store; only ``await store.get_goal_tree(trace_id)`` is used.
        trace_id: Trace whose goal tree is searched for ``target_goal_id``.
        target_goal_id: Goal whose execution result is being evaluated.
        evaluation_input: May hold ``goal_description`` and ``actual_result``;
            a missing ``actual_result`` is rendered as "(无执行结果)".
        requirements: Optional extra criteria, added as their own section
            when non-empty.

    Returns:
        The full prompt text, ending with the expected output format.
    """
    tree = await store.get_goal_tree(trace_id)
    goal = tree.find(target_goal_id) if tree else None
    description_from_tree = goal.description if goal else ""

    goal_text = (
        evaluation_input.get("goal_description")
        or description_from_tree
        or f"Goal {target_goal_id}"
    )
    result_text = evaluation_input.get("actual_result", "(无执行结果)")

    def section(title: str, body: Any) -> List[str]:
        # Heading, blank line, body, blank line.
        return [f"## {title}", "", body, ""]

    prompt_lines = ["# 评估任务", "", "请评估以下任务的执行结果是否满足要求。", ""]
    prompt_lines += section("目标描述", str(goal_text))
    prompt_lines += section("执行结果", str(result_text))
    if requirements:
        prompt_lines += section("评估要求", requirements)
    # Output template the evaluator is asked to follow.
    prompt_lines += [
        "## 输出格式",
        "",
        "## 评估结论",
        "[通过/不通过]",
        "",
        "## 评估理由",
        "[详细说明通过或不通过原因]",
        "",
        "## 修改建议(如果不通过)",
        "1. [建议1]",
        "2. [建议2]",
    ]
    return "\n".join(prompt_lines)
- # ===== 辅助函数 =====
async def _update_goal_start(
    store, trace_id: str, goal_id: str, mode: str, sub_trace_ids: List[str]
) -> None:
    """Mark ``goal_id`` as an in-progress agent call.

    A falsy ``goal_id`` makes this a no-op. Otherwise the goal is updated via
    ``store.update_goal`` with type ``"agent_call"``, the sub-agent ``mode``,
    status ``"in_progress"`` and the spawned ``sub_trace_ids``.
    """
    if goal_id:
        fields = {
            "type": "agent_call",
            "agent_call_mode": mode,
            "status": "in_progress",
            "sub_trace_ids": sub_trace_ids,
        }
        await store.update_goal(trace_id, goal_id, **fields)
async def _update_goal_complete(
    store, trace_id: str, goal_id: str,
    status: str, summary: str, sub_trace_ids: List[str]
) -> None:
    """Record the final ``status`` and ``summary`` of ``goal_id``.

    A falsy ``goal_id`` makes this a no-op. Otherwise the goal's status,
    summary and sub-trace id list are written through ``store.update_goal``.
    """
    if not goal_id:
        return None
    payload = dict(status=status, summary=summary, sub_trace_ids=sub_trace_ids)
    await store.update_goal(trace_id, goal_id, **payload)
def _branch_label(index: int) -> str:
    """Return the spreadsheet-style label for a 0-based index (A..Z, AA, AB, ...)."""
    label = ""
    index += 1
    while index > 0:
        index, remainder = divmod(index - 1, 26)
        label = chr(ord("A") + remainder) + label
    return label


def _format_explore_results(
    branches: List[str],
    results: List[Dict[str, Any]],
    max_summary_len: int = 200,
) -> str:
    """Render per-branch explore results plus an overall tally as Markdown.

    Branches are labelled A, B, C, ... (continuing AA, AB, ... after Z) and
    are paired positionally with ``results``. ``zip`` stops at the shorter
    list. A dict result counts as successful only when its ``status`` is
    ``"completed"``. Any other dict counts as failed, and a non-dict entry is
    reported as an error ("异常") and counted as failed.

    Args:
        branches: Branch descriptions, in the order the sub-agents were started.
        results: Per-branch result dicts (``status``, ``summary``, ``stats``).
        max_summary_len: Summaries longer than this are cut and suffixed with
            "..."; shorter summaries are shown unchanged.

    Returns:
        The Markdown report.
    """
    lines = ["## 探索结果\n"]
    successful = 0
    failed = 0
    total_tokens = 0
    total_cost = 0.0

    for i, (branch, result) in enumerate(zip(branches, results)):
        lines.append(f"### 方案 {_branch_label(i)}: {branch}")

        if not isinstance(result, dict):
            # Defensive: e.g. an exception object the caller did not normalise.
            lines.append("**状态**: ✗ 异常")
            failed += 1
            lines.append("")
            continue

        if result.get("status", "unknown") == "completed":
            lines.append("**状态**: ✓ 完成")
            successful += 1
        else:
            lines.append("**状态**: ✗ 失败")
            failed += 1

        summary = result.get("summary", "")
        if summary:
            summary = str(summary)
            # Only signal truncation when something was actually cut off.
            if len(summary) > max_summary_len:
                summary = summary[:max_summary_len] + "..."
            lines.append(f"**摘要**: {summary}")

        stats = result.get("stats") or {}
        if stats:
            # `or` guards against explicit None counters, which would break
            # the numeric format spec and the running totals.
            messages = stats.get("total_messages") or 0
            tokens = stats.get("total_tokens") or 0
            cost = stats.get("total_cost") or 0.0
            lines.append(f"**统计**: {messages} messages, {tokens} tokens, ${cost:.4f}")
            total_tokens += tokens
            total_cost += cost

        lines.append("")

    lines.append("---\n")
    lines.append("## 总结")
    lines.append(f"- 总分支数: {len(branches)}")
    lines.append(f"- 成功: {successful}")
    lines.append(f"- 失败: {failed}")
    lines.append(f"- 总 tokens: {total_tokens}")
    lines.append(f"- 总成本: ${total_cost:.4f}")
    return "\n".join(lines)
def _format_delegate_result(result: Dict[str, Any]) -> str:
    """Render a delegated task's outcome as Markdown.

    The sub-agent's ``summary`` (if non-empty) comes first. It is followed by
    a separator and an execution-statistics section built from
    ``result["stats"]``. The statistics header is always emitted, even when
    ``stats`` is missing or empty.
    """
    body = ["## 委托任务完成\n"]
    summary_text = result.get("summary", "")
    if summary_text:
        body += [summary_text, ""]
    body += ["---\n", "**执行统计**:"]
    stats = result.get("stats", {})
    if stats:
        body += [
            f"- 消息数: {stats.get('total_messages', 0)}",
            f"- Tokens: {stats.get('total_tokens', 0)}",
            f"- 成本: ${stats.get('total_cost', 0.0):.4f}",
        ]
    return "\n".join(body)
def _format_evaluate_result(result: Dict[str, Any]) -> str:
    """Return the evaluation report produced by the evaluate sub-agent.

    The sub-agent's ``summary`` is expected to already follow the requested
    report format, so it is passed through unchanged. An empty string is
    returned when the key is absent.
    """
    return result.get("summary", "")
def _get_allowed_tools_for_mode(mode: str, context: dict) -> Optional[List[str]]:
    """Return the tool whitelist for a sub-agent mode.

    - ``explore``: a fixed set of file reading/searching tools plus ``goal``.
    - ``delegate`` / ``evaluate``: every tool name in
      ``context["runner"].tools.registry`` except ``subagent`` (presumably to
      stop sub-agents from spawning further sub-agents). Returns ``None``
      when the runner or its registry is unavailable.
    - Any other mode: ``None``.

    ``None`` means "use the runner's default (all tools)".
    """
    if mode == "explore":
        return ["read_file", "grep_content", "glob_files", "goal"]
    if mode not in ("delegate", "evaluate"):
        return None

    runner = context.get("runner")
    has_registry = bool(runner) and hasattr(runner, "tools") and hasattr(runner.tools, "registry")
    if not has_registry:
        return None
    return [name for name in runner.tools.registry.keys() if name != "subagent"]
def _aggregate_stats(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Sum message, token and cost statistics across sub-agent results.

    Non-dict entries and results without stats are skipped. A ``stats``
    value of ``None`` and any missing or ``None`` counter count as zero
    instead of raising.

    Returns:
        ``{"total_messages": int, "total_tokens": int, "total_cost": float}``.
    """
    total_messages = 0
    total_tokens = 0
    total_cost = 0.0
    for result in results:
        if not isinstance(result, dict):
            continue
        stats = result.get("stats") or {}
        total_messages += stats.get("total_messages") or 0
        total_tokens += stats.get("total_tokens") or 0
        total_cost += stats.get("total_cost") or 0.0
    return {
        "total_messages": total_messages,
        "total_tokens": total_tokens,
        "total_cost": total_cost,
    }
- # ===== 模式处理函数 =====
async def _handle_explore_mode(
    branches: List[str],
    background: Optional[str],
    continue_from: Optional[str],
    store, current_trace_id: str, current_goal_id: str, runner
) -> Dict[str, Any]:
    """Explore mode: run one restricted-tool sub-agent per branch, concurrently.

    For every branch a child Trace (``agent_type="explore"``) is created under
    ``current_trace_id`` and a ``sub_trace_started`` event is broadcast. The
    parent goal is then marked in progress and all branches run concurrently
    via ``asyncio.gather``. A ``sub_trace_completed`` event is broadcast per
    branch, and the parent goal is closed with a Markdown report.

    Args:
        branches: One task description per sub-agent.
        background: Optional shared context. When given, each branch's task is
            wrapped with it via ``_build_explore_prompt``.
        continue_from: Not supported in this mode; any non-empty value fails.
        store: Trace store used to create traces and update goals.
        current_trace_id: Parent trace that owns the sub-traces.
        current_goal_id: Parent goal to update (may be empty/None).
        runner: Object whose ``run_result`` coroutine executes a sub-agent.

    Returns:
        A dict with ``mode``, ``status``, ``summary``, ``sub_trace_ids``,
        ``branches`` and aggregated ``stats``. ``status`` is "completed" if at
        least one branch completed, otherwise "failed". A failure dict is
        returned instead when ``continue_from`` is given.
    """
    # continue_from cannot be mapped onto several parallel branches.
    if continue_from:
        return {
            "status": "failed",
            "error": "explore mode does not support continue_from parameter"
        }

    # Shared whitelist helper, so the explore tool set is defined in one place.
    explore_tools = _get_allowed_tools_for_mode("explore", {})

    # Loop-invariant: the parent trace supplies uid/model for every branch.
    parent_trace = await store.get_trace(current_trace_id)
    parent_uid = parent_trace.uid if parent_trace else None
    parent_model = parent_trace.model if parent_trace else None

    # 1. Create and announce every sub-trace.
    sub_trace_ids: List[str] = []
    branch_tasks: List[str] = []
    for i, branch in enumerate(branches):
        sub_trace_id = generate_sub_trace_id(current_trace_id, f"explore-{i+1:03d}")
        # `background` used to be accepted but silently dropped; fold it into
        # the branch task so the sub-agent actually receives it.
        branch_task = _build_explore_prompt([branch], background) if background else branch
        sub_trace = Trace(
            trace_id=sub_trace_id,
            mode="agent",
            task=branch_task,
            parent_trace_id=current_trace_id,
            parent_goal_id=current_goal_id,
            agent_type="explore",
            uid=parent_uid,
            model=parent_model,
            status="running",
            context={"subagent_mode": "explore", "created_by_tool": "subagent"},
            created_at=datetime.now(),
        )
        await store.create_trace(sub_trace)
        await store.update_goal_tree(sub_trace_id, GoalTree(mission=branch_task))
        await broadcast_sub_trace_started(
            current_trace_id, sub_trace_id, current_goal_id or "",
            "explore", branch
        )
        sub_trace_ids.append(sub_trace_id)
        branch_tasks.append(branch_task)

    # 2. Mark the parent goal as in progress.
    await _update_goal_start(store, current_trace_id, current_goal_id, "explore", sub_trace_ids)

    # 3. Run all branches concurrently. The coroutines are created only here,
    # after every sub-trace exists, so a failure during setup cannot leave
    # never-awaited coroutines behind. return_exceptions=True makes each
    # branch's exception its result instead of propagating the first one.
    results = await asyncio.gather(
        *(
            runner.run_result(
                task=task_text,
                trace_id=sub_trace_id,
                agent_type="explore",
                tools=list(explore_tools),
            )
            for sub_trace_id, task_text in zip(sub_trace_ids, branch_tasks)
        ),
        return_exceptions=True,
    )

    # 4. Normalise results and announce completion of each branch.
    processed_results: List[Dict[str, Any]] = []
    for sub_trace_id, result in zip(sub_trace_ids, results):
        if isinstance(result, dict):
            processed_results.append(result)
            await broadcast_sub_trace_completed(
                current_trace_id, sub_trace_id,
                result.get("status", "completed"),
                result.get("summary", ""),
                result.get("stats", {})
            )
            continue

        # Check BaseException, not Exception: gather also returns
        # CancelledError instances, which would otherwise reach `.get`.
        if isinstance(result, BaseException):
            error_text = str(result)
        else:
            error_text = f"unexpected result from run_result: {result!r}"
        processed_results.append({
            "status": "failed",
            "summary": f"执行出错: {error_text}",
            "stats": {"total_messages": 0, "total_tokens": 0, "total_cost": 0.0},
        })
        await broadcast_sub_trace_completed(
            current_trace_id, sub_trace_id,
            "failed", error_text, {}
        )

    # 5. Build the report and close the parent goal.
    aggregated_summary = _format_explore_results(branches, processed_results)
    overall_status = "completed" if any(
        r.get("status") == "completed" for r in processed_results
    ) else "failed"
    await _update_goal_complete(
        store, current_trace_id, current_goal_id,
        overall_status, aggregated_summary, sub_trace_ids
    )

    return {
        "mode": "explore",
        "status": overall_status,
        "summary": aggregated_summary,
        "sub_trace_ids": sub_trace_ids,
        "branches": branches,
        "stats": _aggregate_stats(processed_results)
    }
async def _handle_delegate_mode(
    task: str,
    continue_from: Optional[str],
    store, current_trace_id: str, current_goal_id: str, runner, context: dict
) -> Dict[str, Any]:
    """Delegate mode: hand a single task to one sub-agent.

    When ``continue_from`` names an existing trace, that trace is reused as
    the sub-trace. Otherwise a new child Trace (``agent_type="delegate"``) is
    created under ``current_trace_id`` and a ``sub_trace_started`` event is
    broadcast. The sub-agent's tools come from
    ``_get_allowed_tools_for_mode("delegate", context)``.

    Args:
        task: Task text for the sub-agent.
        continue_from: Optional id of an existing trace to continue.
        store: Trace store used to read/create traces and update goals.
        current_trace_id: Parent trace.
        current_goal_id: Parent goal to update (may be empty/None).
        runner: Object whose ``run_result`` coroutine executes the sub-agent.
        context: Tool context, used to look up the tool registry.

    Returns:
        On success, the runner's result keys plus ``mode``, ``sub_trace_id``,
        ``continue_from`` (bool) and a Markdown ``summary``. On failure,
        ``mode``, ``status="failed"`` and ``error``, plus ``sub_trace_id``
        once one exists.
    """
    # 1. Reuse the continue_from trace or create a new sub-trace.
    if continue_from:
        existing_trace = await store.get_trace(continue_from)
        if not existing_trace:
            return {
                "mode": "delegate",
                "status": "failed",
                "error": f"Continue-from trace not found: {continue_from}",
            }
        # NOTE(review): a continued trace was already announced when it was
        # created, so no sub_trace_started event is emitted here — confirm.
        sub_trace_id = continue_from
    else:
        parent_trace = await store.get_trace(current_trace_id)
        sub_trace_id = generate_sub_trace_id(current_trace_id, "delegate")
        sub_trace = Trace(
            trace_id=sub_trace_id,
            mode="agent",
            task=task,
            parent_trace_id=current_trace_id,
            parent_goal_id=current_goal_id,
            agent_type="delegate",
            uid=parent_trace.uid if parent_trace else None,
            model=parent_trace.model if parent_trace else None,
            status="running",
            context={"subagent_mode": "delegate", "created_by_tool": "subagent"},
            created_at=datetime.now(),
        )
        await store.create_trace(sub_trace)
        await store.update_goal_tree(sub_trace_id, GoalTree(mission=task))
        await broadcast_sub_trace_started(
            current_trace_id, sub_trace_id, current_goal_id or "",
            "delegate", task
        )
    sub_trace_ids = [sub_trace_id]

    # 2. Mark the parent goal as in progress.
    await _update_goal_start(store, current_trace_id, current_goal_id, "delegate", sub_trace_ids)

    # 3. Run the sub-agent. Only the run itself is guarded. Previously, a
    # failure while *reporting* a successful run was caught here too, which
    # marked the goal failed and emitted a second, contradictory
    # sub_trace_completed event.
    try:
        allowed_tools = _get_allowed_tools_for_mode("delegate", context)
        result = await runner.run_result(
            task=task,
            trace_id=sub_trace_id,
            agent_type="delegate",
            tools=allowed_tools
        )
        if not isinstance(result, dict):
            raise TypeError(f"run_result returned {type(result).__name__}, expected dict")
    except Exception as e:
        error_msg = str(e)
        await broadcast_sub_trace_completed(
            current_trace_id, sub_trace_id,
            "failed", error_msg, {}
        )
        await _update_goal_complete(
            store, current_trace_id, current_goal_id,
            "failed", f"委托任务失败: {error_msg}", sub_trace_ids
        )
        return {
            "mode": "delegate",
            "status": "failed",
            "error": error_msg,
            "sub_trace_id": sub_trace_id
        }

    # 4. Announce completion, format the report and close the parent goal.
    status = result.get("status", "completed")
    await broadcast_sub_trace_completed(
        current_trace_id, sub_trace_id,
        status,
        result.get("summary", ""),
        result.get("stats", {})
    )
    formatted_summary = _format_delegate_result(result)
    await _update_goal_complete(
        store, current_trace_id, current_goal_id,
        status, formatted_summary, sub_trace_ids
    )

    return {
        "mode": "delegate",
        "sub_trace_id": sub_trace_id,
        "continue_from": bool(continue_from),
        **result,
        "summary": formatted_summary
    }
async def _handle_evaluate_mode(
    target_goal_id: str,
    evaluation_input: Dict[str, Any],
    requirements: Optional[str],
    continue_from: Optional[str],
    store, current_trace_id: str, current_goal_id: str, runner, context: dict
) -> Dict[str, Any]:
    """Evaluate mode: have one sub-agent judge a goal's execution result.

    The evaluation prompt is built by ``_build_evaluate_prompt``. When
    ``continue_from`` names an existing trace, that trace is reused.
    Otherwise a new child Trace (``agent_type="evaluate"``) is created under
    ``current_trace_id`` and a ``sub_trace_started`` event is broadcast. The
    sub-agent's ``summary`` is returned as the evaluation report.

    Args:
        target_goal_id: Goal whose result is evaluated.
        evaluation_input: Dict with optional ``goal_description`` /
            ``actual_result``.
        requirements: Optional extra evaluation criteria.
        continue_from: Optional id of an existing trace to continue.
        store: Trace store used to read/create traces and update goals.
        current_trace_id: Parent trace.
        current_goal_id: Parent goal to update (may be empty/None).
        runner: Object whose ``run_result`` coroutine executes the sub-agent.
        context: Tool context, used to look up the tool registry.

    Returns:
        On success, the runner's result keys plus ``mode``, ``sub_trace_id``,
        ``continue_from`` (bool) and ``summary``. On failure, ``mode``,
        ``status="failed"`` and ``error``, plus ``sub_trace_id`` once one
        exists.
    """
    # 1. Build the evaluation prompt.
    task_prompt = await _build_evaluate_prompt(
        store, current_trace_id, target_goal_id,
        evaluation_input, requirements
    )

    # 2. Reuse the continue_from trace or create a new sub-trace.
    if continue_from:
        existing_trace = await store.get_trace(continue_from)
        if not existing_trace:
            return {
                "mode": "evaluate",
                "status": "failed",
                "error": f"Continue-from trace not found: {continue_from}",
            }
        # NOTE(review): a continued trace was already announced when it was
        # created, so no sub_trace_started event is emitted here — confirm.
        sub_trace_id = continue_from
    else:
        parent_trace = await store.get_trace(current_trace_id)
        sub_trace_id = generate_sub_trace_id(current_trace_id, "evaluate")
        sub_trace = Trace(
            trace_id=sub_trace_id,
            mode="agent",
            task=task_prompt,
            parent_trace_id=current_trace_id,
            parent_goal_id=current_goal_id,
            agent_type="evaluate",
            uid=parent_trace.uid if parent_trace else None,
            model=parent_trace.model if parent_trace else None,
            status="running",
            context={"subagent_mode": "evaluate", "created_by_tool": "subagent"},
            created_at=datetime.now(),
        )
        await store.create_trace(sub_trace)
        await store.update_goal_tree(sub_trace_id, GoalTree(mission=task_prompt))
        await broadcast_sub_trace_started(
            current_trace_id, sub_trace_id, current_goal_id or "",
            "evaluate", task_prompt
        )
    sub_trace_ids = [sub_trace_id]

    # 3. Mark the parent goal as in progress.
    await _update_goal_start(store, current_trace_id, current_goal_id, "evaluate", sub_trace_ids)

    # 4. Run the evaluator. Only the run itself is guarded, so a failure while
    # reporting a successful run is not misreported as an evaluation failure
    # (which previously also emitted a second completion event).
    try:
        allowed_tools = _get_allowed_tools_for_mode("evaluate", context)
        result = await runner.run_result(
            task=task_prompt,
            trace_id=sub_trace_id,
            agent_type="evaluate",
            tools=allowed_tools
        )
        if not isinstance(result, dict):
            raise TypeError(f"run_result returned {type(result).__name__}, expected dict")
    except Exception as e:
        error_msg = str(e)
        await broadcast_sub_trace_completed(
            current_trace_id, sub_trace_id,
            "failed", error_msg, {}
        )
        await _update_goal_complete(
            store, current_trace_id, current_goal_id,
            "failed", f"评估任务失败: {error_msg}", sub_trace_ids
        )
        return {
            "mode": "evaluate",
            "status": "failed",
            "error": error_msg,
            "sub_trace_id": sub_trace_id
        }

    # 5. Announce completion, format the report and close the parent goal.
    status = result.get("status", "completed")
    await broadcast_sub_trace_completed(
        current_trace_id, sub_trace_id,
        status,
        result.get("summary", ""),
        result.get("stats", {})
    )
    formatted_summary = _format_evaluate_result(result)
    await _update_goal_complete(
        store, current_trace_id, current_goal_id,
        status, formatted_summary, sub_trace_ids
    )

    return {
        "mode": "evaluate",
        "sub_trace_id": sub_trace_id,
        "continue_from": bool(continue_from),
        **result,
        "summary": formatted_summary
    }
@tool(description="创建 Sub-Agent 执行任务(evaluate/delegate/explore)")
async def subagent(
    mode: str,
    task: Optional[str] = None,
    target_goal_id: Optional[str] = None,
    evaluation_input: Optional[Dict[str, Any]] = None,
    requirements: Optional[str] = None,
    branches: Optional[List[str]] = None,
    background: Optional[str] = None,
    continue_from: Optional[str] = None,
    context: Optional[dict] = None,
) -> Dict[str, Any]:
    """Create a sub-agent that evaluates, delegates or explores.

    Args:
        mode: "evaluate", "delegate" or "explore".
        task: Task for the sub-agent (required for delegate).
        target_goal_id: Goal whose result is evaluated (required for evaluate).
        evaluation_input: Dict with optional "goal_description" and
            "actual_result" keys (required for evaluate).
        requirements: Extra evaluation criteria (evaluate only).
        branches: Non-empty list of non-empty strings, one per approach to
            explore (required for explore).
        background: Optional background context, passed to the explore handler.
        continue_from: Existing sub-trace id to continue (delegate/evaluate;
            explore rejects it).
        context: Runtime context providing "store", "trace_id", "runner" and
            optionally "goal_id"; presumably injected by the tool runtime.

    Returns:
        The selected mode handler's result dict, or
        ``{"status": "failed", "error": ...}`` when validation fails.
    """
    # 1. Validate the runtime context.
    if not context:
        return {"status": "failed", "error": "context is required"}

    store = context.get("store")
    current_trace_id = context.get("trace_id")
    current_goal_id = context.get("goal_id")
    runner = context.get("runner")

    missing = [
        name
        for name, value in (("store", store), ("trace_id", current_trace_id), ("runner", runner))
        if not value
    ]
    if missing:
        return {"status": "failed", "error": f"Missing required context: {', '.join(missing)}"}

    # 2. Validate the mode.
    if mode not in {"evaluate", "delegate", "explore"}:
        return {"status": "failed", "error": "Invalid mode: must be evaluate/delegate/explore"}

    # 3. Validate mode-specific arguments.
    if mode == "delegate" and not task:
        return {"status": "failed", "error": "delegate mode requires task"}
    if mode == "explore":
        if not branches:
            return {"status": "failed", "error": "explore mode requires branches"}
        # A bare string would otherwise be iterated character by character,
        # spawning one sub-agent per character.
        if not isinstance(branches, (list, tuple)) or not all(
            isinstance(b, str) and b.strip() for b in branches
        ):
            return {
                "status": "failed",
                "error": "explore mode requires branches to be a list of non-empty strings",
            }
    if mode == "evaluate":
        if not target_goal_id or evaluation_input is None:
            return {"status": "failed", "error": "evaluate mode requires target_goal_id and evaluation_input"}
        # The evaluate prompt builder calls evaluation_input.get(...).
        if not isinstance(evaluation_input, dict):
            return {"status": "failed", "error": "evaluate mode requires evaluation_input to be a dict"}

    # 4. Dispatch to the mode handler.
    if mode == "explore":
        return await _handle_explore_mode(
            list(branches), background, continue_from,
            store, current_trace_id, current_goal_id, runner
        )
    if mode == "delegate":
        return await _handle_delegate_mode(
            task, continue_from,
            store, current_trace_id, current_goal_id, runner, context
        )
    return await _handle_evaluate_mode(
        target_goal_id, evaluation_input, requirements, continue_from,
        store, current_trace_id, current_goal_id, runner, context
    )
|