""" Sub-Agent 工具 - agent / evaluate agent: 创建 Agent 执行任务(单任务 delegate 或多任务并行 explore) evaluate: 评估目标执行结果是否满足要求 """ import asyncio from datetime import datetime from typing import Any, Dict, List, Optional, Union from agent.tools import tool from agent.trace.models import Trace, Messages from agent.trace.trace_id import generate_sub_trace_id from agent.trace.goal_models import GoalTree from agent.trace.websocket import broadcast_sub_trace_started, broadcast_sub_trace_completed def _make_run_config(**kwargs): """延迟导入 RunConfig 以避免循环导入""" from agent.core.runner import RunConfig return RunConfig(**kwargs) # ===== 辅助函数 ===== async def _update_collaborator( store, trace_id: str, name: str, sub_trace_id: str, status: str, summary: str = "", ) -> None: """ 更新 trace.context["collaborators"] 中的协作者信息。 如果同名协作者已存在则更新,否则追加。 """ trace = await store.get_trace(trace_id) if not trace: return collaborators = trace.context.get("collaborators", []) # 查找已有记录 existing = None for c in collaborators: if c.get("trace_id") == sub_trace_id: existing = c break if existing: existing["status"] = status if summary: existing["summary"] = summary else: collaborators.append({ "name": name, "type": "agent", "trace_id": sub_trace_id, "status": status, "summary": summary, }) trace.context["collaborators"] = collaborators await store.update_trace(trace_id, context=trace.context) async def _update_goal_start( store, trace_id: str, goal_id: str, mode: str, sub_trace_ids: List[str] ) -> None: """标记 Goal 开始执行""" if not goal_id: return await store.update_goal( trace_id, goal_id, type="agent_call", agent_call_mode=mode, status="in_progress", sub_trace_ids=sub_trace_ids ) async def _update_goal_complete( store, trace_id: str, goal_id: str, status: str, summary: str, sub_trace_ids: List[str] ) -> None: """标记 Goal 完成""" if not goal_id: return await store.update_goal( trace_id, goal_id, status=status, summary=summary, sub_trace_ids=sub_trace_ids ) def _aggregate_stats(results: List[Dict[str, Any]]) -> Dict[str, Any]: """聚合多个结果的统计信息""" total_messages = 0 total_tokens = 0 total_cost = 0.0 for result in results: if isinstance(result, dict) and "stats" in result: stats = result["stats"] total_messages += stats.get("total_messages", 0) total_tokens += stats.get("total_tokens", 0) total_cost += stats.get("total_cost", 0.0) return { "total_messages": total_messages, "total_tokens": total_tokens, "total_cost": total_cost } def _get_allowed_tools(single: bool, context: dict) -> Optional[List[str]]: """获取允许工具列表。single=True: 全部(去掉 agent/evaluate); single=False: 只读""" if not single: return ["read_file", "grep_content", "glob_files", "goal"] # single (delegate): 获取所有工具,排除 agent 和 evaluate runner = context.get("runner") if runner and hasattr(runner, "tools") and hasattr(runner.tools, "registry"): all_tools = list(runner.tools.registry.keys()) return [t for t in all_tools if t not in ("agent", "evaluate")] return None def _format_single_result(result: Dict[str, Any], sub_trace_id: str, continued: bool) -> Dict[str, Any]: """格式化单任务(delegate)结果""" lines = ["## 委托任务完成\n"] summary = result.get("summary", "") if summary: lines.append(summary) lines.append("") lines.append("---\n") lines.append("**执行统计**:") stats = result.get("stats", {}) if stats: lines.append(f"- 消息数: {stats.get('total_messages', 0)}") lines.append(f"- Tokens: {stats.get('total_tokens', 0)}") lines.append(f"- 成本: ${stats.get('total_cost', 0.0):.4f}") formatted_summary = "\n".join(lines) return { "mode": "delegate", "sub_trace_id": sub_trace_id, "continue_from": continued, **result, "summary": formatted_summary, } def _format_multi_result( tasks: List[str], results: List[Dict[str, Any]], sub_trace_ids: List[Dict] ) -> Dict[str, Any]: """格式化多任务(explore)聚合结果""" lines = ["## 探索结果\n"] successful = 0 failed = 0 total_tokens = 0 total_cost = 0.0 for i, (task_item, result) in enumerate(zip(tasks, results)): branch_name = chr(ord('A') + i) lines.append(f"### 方案 {branch_name}: {task_item}") if isinstance(result, dict): status = result.get("status", "unknown") if status == "completed": lines.append("**状态**: ✓ 完成") successful += 1 else: lines.append("**状态**: ✗ 失败") failed += 1 summary = result.get("summary", "") if summary: lines.append(f"**摘要**: {summary[:200]}...") stats = result.get("stats", {}) if stats: messages = stats.get("total_messages", 0) tokens = stats.get("total_tokens", 0) cost = stats.get("total_cost", 0.0) lines.append(f"**统计**: {messages} messages, {tokens} tokens, ${cost:.4f}") total_tokens += tokens total_cost += cost else: lines.append("**状态**: ✗ 异常") failed += 1 lines.append("") lines.append("---\n") lines.append("## 总结") lines.append(f"- 总分支数: {len(tasks)}") lines.append(f"- 成功: {successful}") lines.append(f"- 失败: {failed}") lines.append(f"- 总 tokens: {total_tokens}") lines.append(f"- 总成本: ${total_cost:.4f}") aggregated_summary = "\n".join(lines) overall_status = "completed" if successful > 0 else "failed" return { "mode": "explore", "status": overall_status, "summary": aggregated_summary, "sub_trace_ids": sub_trace_ids, "tasks": tasks, "stats": _aggregate_stats(results), } async def _get_goal_description(store, trace_id: str, goal_id: str) -> str: """从 GoalTree 获取目标描述""" if not goal_id: return "" goal_tree = await store.get_goal_tree(trace_id) if goal_tree: target_goal = goal_tree.find(goal_id) if target_goal: return target_goal.description return f"Goal {goal_id}" def _build_evaluate_prompt(goal_description: str, messages: Optional[Messages]) -> str: """ 构建评估 prompt。 Args: goal_description: 代码从 GoalTree 注入的目标描述 messages: 模型提供的消息(执行结果+上下文) """ # 从 messages 提取文本内容 result_text = "" if messages: parts = [] for msg in messages: content = msg.get("content", "") if isinstance(content, str): parts.append(content) elif isinstance(content, list): # 多模态内容,提取文本部分 for item in content: if isinstance(item, dict) and item.get("type") == "text": parts.append(item.get("text", "")) result_text = "\n".join(parts) lines = [ "# 评估任务", "", "请评估以下任务的执行结果是否满足要求。", "", "## 目标描述", "", goal_description, "", "## 执行结果", "", result_text or "(无执行结果)", "", "## 输出格式", "", "## 评估结论", "[通过/不通过]", "", "## 评估理由", "[详细说明通过或不通过原因]", "", "## 修改建议(如果不通过)", "1. [建议1]", "2. [建议2]", ] return "\n".join(lines) def _make_event_printer(label: str): """ 创建子 Agent 执行过程打印函数。 当父 runner.debug=True 时,传给 run_result(on_event=...), 实时输出子 Agent 的工具调用和助手消息。 """ prefix = f" [{label}]" def on_event(item): from agent.trace.models import Trace, Message if isinstance(item, Message): if item.role == "assistant": content = item.content if isinstance(content, dict): text = content.get("text", "") tool_calls = content.get("tool_calls") if text: preview = text[:120] + "..." if len(text) > 120 else text print(f"{prefix} {preview}") if tool_calls: for tc in tool_calls: name = tc.get("function", {}).get("name", "unknown") print(f"{prefix} 🛠️ {name}") elif item.role == "tool": content = item.content if isinstance(content, dict): name = content.get("tool_name", "unknown") desc = item.description or "" desc_short = (desc[:60] + "...") if len(desc) > 60 else desc suffix = f": {desc_short}" if desc_short else "" print(f"{prefix} ✅ {name}{suffix}") elif isinstance(item, Trace): if item.status == "completed": print(f"{prefix} ✓ 完成") elif item.status == "failed": err = (item.error_message or "")[:80] print(f"{prefix} ✗ 失败: {err}") return on_event # ===== 统一内部执行函数 ===== async def _run_agents( tasks: List[str], per_agent_msgs: List[Messages], continue_from: Optional[str], store, trace_id: str, goal_id: str, runner, context: dict, agent_type: Optional[str] = None, skills: Optional[List[str]] = None, ) -> Dict[str, Any]: """ 统一 agent 执行逻辑。 single (len(tasks)==1): delegate 模式,全量工具(排除 agent/evaluate) multi (len(tasks)>1): explore 模式,只读工具,并行执行 """ single = len(tasks) == 1 parent_trace = await store.get_trace(trace_id) # continue_from: 复用已有 trace(仅 single) sub_trace_id = None continued = False if single and continue_from: existing = await store.get_trace(continue_from) if not existing: return {"status": "failed", "error": f"Continue-from trace not found: {continue_from}"} sub_trace_id = continue_from continued = True goal_tree = await store.get_goal_tree(continue_from) mission = goal_tree.mission if goal_tree else tasks[0] sub_trace_ids = [{"trace_id": sub_trace_id, "mission": mission}] else: sub_trace_ids = [] # 创建 sub-traces 和执行协程 coros = [] all_sub_trace_ids = list(sub_trace_ids) # copy for continue_from case for i, (task_item, msgs) in enumerate(zip(tasks, per_agent_msgs)): if single and continued: # continue_from 已经设置了 sub_trace_id pass else: resolved_agent_type = agent_type or ("delegate" if single else "explore") suffix = "delegate" if single else f"explore-{i+1:03d}" stid = generate_sub_trace_id(trace_id, suffix) sub_trace = Trace( trace_id=stid, mode="agent", task=task_item, parent_trace_id=trace_id, parent_goal_id=goal_id, agent_type=resolved_agent_type, uid=parent_trace.uid if parent_trace else None, model=parent_trace.model if parent_trace else None, status="running", context={"created_by_tool": "agent"}, created_at=datetime.now(), ) await store.create_trace(sub_trace) await store.update_goal_tree(stid, GoalTree(mission=task_item)) all_sub_trace_ids.append({"trace_id": stid, "mission": task_item}) # 广播 sub_trace_started await broadcast_sub_trace_started( trace_id, stid, goal_id or "", resolved_agent_type, task_item, ) if single: sub_trace_id = stid # 注册为活跃协作者 cur_stid = sub_trace_id if single else all_sub_trace_ids[-1]["trace_id"] collab_name = task_item[:30] if single and not continued else ( f"delegate-{cur_stid[:8]}" if single else f"explore-{i+1}" ) await _update_collaborator( store, trace_id, name=collab_name, sub_trace_id=cur_stid, status="running", summary=task_item[:80], ) # 构建消息 agent_msgs = list(msgs) + [{"role": "user", "content": task_item}] allowed_tools = _get_allowed_tools(single, context) debug = getattr(runner, 'debug', False) agent_label = (agent_type or ("delegate" if single else f"explore-{i+1}")) on_event = _make_event_printer(agent_label) if debug else None coro = runner.run_result( messages=agent_msgs, config=_make_run_config( trace_id=cur_stid, agent_type=agent_type or ("delegate" if single else "explore"), model=parent_trace.model if parent_trace else "gpt-4o", uid=parent_trace.uid if parent_trace else None, tools=allowed_tools, name=task_item[:50], skills=skills, ), on_event=on_event, ) coros.append((i, cur_stid, collab_name, coro)) # 更新主 Goal 为 in_progress await _update_goal_start( store, trace_id, goal_id, "delegate" if single else "explore", all_sub_trace_ids, ) # 执行 if single: # 单任务直接执行(带异常处理) _, stid, collab_name, coro = coros[0] try: result = await coro await broadcast_sub_trace_completed( trace_id, stid, result.get("status", "completed"), result.get("summary", ""), result.get("stats", {}), ) await _update_collaborator( store, trace_id, name=collab_name, sub_trace_id=stid, status=result.get("status", "completed"), summary=result.get("summary", "")[:80], ) formatted = _format_single_result(result, stid, continued) await _update_goal_complete( store, trace_id, goal_id, result.get("status", "completed"), formatted["summary"], all_sub_trace_ids, ) return formatted except Exception as e: error_msg = str(e) await broadcast_sub_trace_completed( trace_id, stid, "failed", error_msg, {}, ) await _update_collaborator( store, trace_id, name=collab_name, sub_trace_id=stid, status="failed", summary=error_msg[:80], ) await _update_goal_complete( store, trace_id, goal_id, "failed", f"委托任务失败: {error_msg}", all_sub_trace_ids, ) return { "mode": "delegate", "status": "failed", "error": error_msg, "sub_trace_id": stid, } else: # 多任务并行执行 raw_results = await asyncio.gather( *(coro for _, _, _, coro in coros), return_exceptions=True, ) processed_results = [] for idx, raw in enumerate(raw_results): _, stid, collab_name, _ = coros[idx] if isinstance(raw, Exception): error_result = { "status": "failed", "summary": f"执行出错: {str(raw)}", "stats": {"total_messages": 0, "total_tokens": 0, "total_cost": 0.0}, } processed_results.append(error_result) await broadcast_sub_trace_completed( trace_id, stid, "failed", str(raw), {}, ) await _update_collaborator( store, trace_id, name=collab_name, sub_trace_id=stid, status="failed", summary=str(raw)[:80], ) else: processed_results.append(raw) await broadcast_sub_trace_completed( trace_id, stid, raw.get("status", "completed"), raw.get("summary", ""), raw.get("stats", {}), ) await _update_collaborator( store, trace_id, name=collab_name, sub_trace_id=stid, status=raw.get("status", "completed"), summary=raw.get("summary", "")[:80], ) formatted = _format_multi_result(tasks, processed_results, all_sub_trace_ids) await _update_goal_complete( store, trace_id, goal_id, formatted["status"], formatted["summary"], all_sub_trace_ids, ) return formatted # ===== 工具定义 ===== @tool(description="创建 Agent 执行任务") async def agent( task: Union[str, List[str]], messages: Optional[Union[Messages, List[Messages]]] = None, continue_from: Optional[str] = None, agent_type: Optional[str] = None, skills: Optional[List[str]] = None, context: Optional[dict] = None, ) -> Dict[str, Any]: """ 创建 Agent 执行任务。 单任务 (task: str): delegate 模式,全量工具 多任务 (task: List[str]): explore 模式,只读工具,并行执行 Args: task: 任务描述。字符串=单任务,列表=多任务并行 messages: 预置消息。1D 列表=所有 agent 共享;2D 列表=per-agent continue_from: 继续已有 trace(仅单任务) agent_type: 子 Agent 类型,决定 preset 和默认 skills(如 "deconstruct") skills: 附加到 system prompt 的 skill 名称列表,覆盖 preset 默认值 context: 框架自动注入的上下文 """ if not context: return {"status": "failed", "error": "context is required"} store = context.get("store") trace_id = context.get("trace_id") goal_id = context.get("goal_id") runner = context.get("runner") missing = [] if not store: missing.append("store") if not trace_id: missing.append("trace_id") if not runner: missing.append("runner") if missing: return {"status": "failed", "error": f"Missing required context: {', '.join(missing)}"} # 归一化 task → list single = isinstance(task, str) tasks = [task] if single else task if not tasks: return {"status": "failed", "error": "task is required"} # 归一化 messages → List[Messages](per-agent) if messages is None: per_agent_msgs: List[Messages] = [[] for _ in tasks] elif messages and isinstance(messages[0], list): per_agent_msgs = messages # 2D: per-agent else: per_agent_msgs = [messages] * len(tasks) # 1D: 共享 if continue_from and not single: return {"status": "failed", "error": "continue_from requires single task"} return await _run_agents( tasks, per_agent_msgs, continue_from, store, trace_id, goal_id, runner, context, agent_type=agent_type, skills=skills, ) @tool(description="评估目标执行结果是否满足要求") async def evaluate( messages: Optional[Messages] = None, target_goal_id: Optional[str] = None, continue_from: Optional[str] = None, context: Optional[dict] = None, ) -> Dict[str, Any]: """ 评估目标执行结果是否满足要求。 代码自动从 GoalTree 注入目标描述。模型把执行结果和上下文放在 messages 中。 Args: messages: 执行结果和上下文消息(OpenAI 格式) target_goal_id: 要评估的目标 ID(默认当前 goal_id) continue_from: 继续已有评估 trace context: 框架自动注入的上下文 """ if not context: return {"status": "failed", "error": "context is required"} store = context.get("store") trace_id = context.get("trace_id") current_goal_id = context.get("goal_id") runner = context.get("runner") missing = [] if not store: missing.append("store") if not trace_id: missing.append("trace_id") if not runner: missing.append("runner") if missing: return {"status": "failed", "error": f"Missing required context: {', '.join(missing)}"} # target_goal_id 默认 context["goal_id"] goal_id = target_goal_id or current_goal_id # 从 GoalTree 获取目标描述 goal_desc = await _get_goal_description(store, trace_id, goal_id) # 构建 evaluator prompt eval_prompt = _build_evaluate_prompt(goal_desc, messages) # 获取父 Trace 信息 parent_trace = await store.get_trace(trace_id) # 处理 continue_from 或创建新 Sub-Trace if continue_from: existing_trace = await store.get_trace(continue_from) if not existing_trace: return {"status": "failed", "error": f"Continue-from trace not found: {continue_from}"} sub_trace_id = continue_from goal_tree = await store.get_goal_tree(continue_from) mission = goal_tree.mission if goal_tree else eval_prompt sub_trace_ids = [{"trace_id": sub_trace_id, "mission": mission}] else: sub_trace_id = generate_sub_trace_id(trace_id, "evaluate") sub_trace = Trace( trace_id=sub_trace_id, mode="agent", task=eval_prompt, parent_trace_id=trace_id, parent_goal_id=current_goal_id, agent_type="evaluate", uid=parent_trace.uid if parent_trace else None, model=parent_trace.model if parent_trace else None, status="running", context={"created_by_tool": "evaluate"}, created_at=datetime.now(), ) await store.create_trace(sub_trace) await store.update_goal_tree(sub_trace_id, GoalTree(mission=eval_prompt)) sub_trace_ids = [{"trace_id": sub_trace_id, "mission": eval_prompt}] # 广播 sub_trace_started await broadcast_sub_trace_started( trace_id, sub_trace_id, current_goal_id or "", "evaluate", eval_prompt, ) # 更新主 Goal 为 in_progress await _update_goal_start(store, trace_id, current_goal_id, "evaluate", sub_trace_ids) # 注册为活跃协作者 eval_name = f"评估: {(goal_id or 'unknown')[:20]}" await _update_collaborator( store, trace_id, name=eval_name, sub_trace_id=sub_trace_id, status="running", summary=f"评估 Goal {goal_id}", ) # 执行评估 try: # evaluate 使用只读工具 + goal allowed_tools = ["read_file", "grep_content", "glob_files", "goal"] result = await runner.run_result( messages=[{"role": "user", "content": eval_prompt}], config=_make_run_config( trace_id=sub_trace_id, agent_type="evaluate", model=parent_trace.model if parent_trace else "gpt-4o", uid=parent_trace.uid if parent_trace else None, tools=allowed_tools, name=f"评估: {goal_id}", ), on_event=_make_event_printer("evaluate") if getattr(runner, 'debug', False) else None, ) await broadcast_sub_trace_completed( trace_id, sub_trace_id, result.get("status", "completed"), result.get("summary", ""), result.get("stats", {}), ) await _update_collaborator( store, trace_id, name=eval_name, sub_trace_id=sub_trace_id, status=result.get("status", "completed"), summary=result.get("summary", "")[:80], ) formatted_summary = result.get("summary", "") await _update_goal_complete( store, trace_id, current_goal_id, result.get("status", "completed"), formatted_summary, sub_trace_ids, ) return { "mode": "evaluate", "sub_trace_id": sub_trace_id, "continue_from": bool(continue_from), **result, "summary": formatted_summary, } except Exception as e: error_msg = str(e) await broadcast_sub_trace_completed( trace_id, sub_trace_id, "failed", error_msg, {}, ) await _update_collaborator( store, trace_id, name=eval_name, sub_trace_id=sub_trace_id, status="failed", summary=error_msg[:80], ) await _update_goal_complete( store, trace_id, current_goal_id, "failed", f"评估任务失败: {error_msg}", sub_trace_ids, ) return { "mode": "evaluate", "status": "failed", "error": error_msg, "sub_trace_id": sub_trace_id, }