""" Explore 工具 - 并行探索多个方案 启动多个 Sub-Trace 并行执行不同的探索方向,汇总结果返回。 """ import asyncio from typing import List, Optional, Dict, Any from datetime import datetime from .models import Trace, Message from .trace_id import generate_sub_trace_id from .goal_models import Goal async def explore_tool( current_trace_id: str, current_goal_id: str, branches: List[str], background: Optional[str] = None, store=None, run_agent=None ) -> str: """ 并行探索多个方向,汇总结果 Args: current_trace_id: 当前主 Trace ID current_goal_id: 当前 Goal ID branches: 探索方向列表(每个元素是一个探索任务描述) background: 可选,背景信息(如果提供则用作各 Sub-Trace 的初始 context) store: TraceStore 实例 run_agent: 运行 Agent 的函数 Returns: 汇总结果字符串 Example: >>> result = await explore_tool( ... current_trace_id="abc123", ... current_goal_id="2", ... branches=["JWT 方案", "Session 方案"], ... store=store, ... run_agent=run_agent_func ... ) """ if not store: raise ValueError("store parameter is required") if not run_agent: raise ValueError("run_agent parameter is required") # 1. 创建 agent_call Goal goal = Goal( id=current_goal_id, type="agent_call", description=f"并行探索 {len(branches)} 个方案", reason="探索多个可行方案", agent_call_mode="explore", sub_trace_ids=[], status="in_progress" ) # 更新 Goal(标记为 agent_call) await store.update_goal(current_trace_id, current_goal_id, type="agent_call", agent_call_mode="explore", status="in_progress") # 2. 为每个分支创建 Sub-Trace sub_traces = [] sub_trace_ids = [] for i, desc in enumerate(branches): # 生成 Sub-Trace ID sub_trace_id = generate_sub_trace_id(current_trace_id, "explore") # 创建 Sub-Trace sub_trace = Trace( trace_id=sub_trace_id, mode="agent", task=desc, parent_trace_id=current_trace_id, parent_goal_id=current_goal_id, agent_type="explore", context={ "allowed_tools": ["read", "grep", "glob"], # 探索模式:只读权限 "max_turns": 20, "background": background }, status="running", created_at=datetime.now() ) # 保存 Sub-Trace await store.create_trace(sub_trace) sub_traces.append(sub_trace) sub_trace_ids.append(sub_trace_id) # 推送 sub_trace_started 事件 await store.append_event(current_trace_id, "sub_trace_started", { "trace_id": sub_trace_id, "parent_trace_id": current_trace_id, "parent_goal_id": current_goal_id, "agent_type": "explore", "task": desc }) # 更新主 Goal 的 sub_trace_ids await store.update_goal(current_trace_id, current_goal_id, sub_trace_ids=sub_trace_ids) # 3. 并行执行所有 Sub-Traces results = await asyncio.gather( *[run_agent(st, background=background) for st in sub_traces], return_exceptions=True ) # 4. 收集元数据并汇总结果 sub_trace_metadata = {} summary_parts = ["## 探索结果\n"] for i, (sub_trace, result) in enumerate(zip(sub_traces, results), 1): branch_name = chr(ord('A') + i - 1) # A, B, C... if isinstance(result, Exception): # 处理异常情况 summary_parts.append(f"### 方案 {branch_name}: {sub_trace.task}") summary_parts.append(f"⚠️ 执行出错: {str(result)}\n") sub_trace_metadata[sub_trace.trace_id] = { "task": sub_trace.task, "status": "failed", "summary": f"执行出错: {str(result)}", "last_message": None, "stats": { "message_count": 0, "total_tokens": 0, "total_cost": 0.0 } } else: # 获取 Sub-Trace 的最终状态 updated_trace = await store.get_trace(sub_trace.trace_id) # 获取最后一条 assistant 消息 messages = await store.get_trace_messages(sub_trace.trace_id) last_message = None for msg in reversed(messages): if msg.role == "assistant": last_message = msg break # 构建元数据 # 优先使用 result 中的 summary,否则使用最后一条消息的内容 summary_text = None if isinstance(result, dict) and result.get("summary"): summary_text = result.get("summary") elif last_message and last_message.content: # 使用最后一条消息的内容作为 summary(截断至 200 字符) content_text = last_message.content if isinstance(content_text, dict) and "text" in content_text: content_text = content_text["text"] elif not isinstance(content_text, str): content_text = str(content_text) summary_text = content_text[:200] if content_text else "执行完成" else: summary_text = "执行完成" sub_trace_metadata[sub_trace.trace_id] = { "task": sub_trace.task, "status": updated_trace.status if updated_trace else "unknown", "summary": summary_text, "last_message": { "role": last_message.role, "description": last_message.description, "content": last_message.content[:500] if last_message.content else None, "created_at": last_message.created_at.isoformat() } if last_message else None, "stats": { "message_count": updated_trace.total_messages if updated_trace else 0, "total_tokens": updated_trace.total_tokens if updated_trace else 0, "total_cost": updated_trace.total_cost if updated_trace else 0.0 } } # 组装摘要文本 summary_parts.append(f"### 方案 {branch_name}: {sub_trace.task}") if updated_trace and updated_trace.status == "completed": summary_parts.append(f"{summary_text}\n") summary_parts.append(f"📊 统计: {updated_trace.total_messages} 条消息, " f"{updated_trace.total_tokens} tokens, " f"成本 ${updated_trace.total_cost:.4f}\n") else: summary_parts.append(f"未完成\n") # 推送 sub_trace_completed 事件 await store.append_event(current_trace_id, "sub_trace_completed", { "trace_id": sub_trace.trace_id, "status": "completed" if not isinstance(result, Exception) else "failed", "summary": result.get("summary", "") if isinstance(result, dict) else "" }) summary_parts.append("\n---") summary_parts.append(f"已完成 {len(branches)} 个方案的探索,请根据结果选择继续的方向。") summary = "\n".join(summary_parts) # 5. 完成主 Goal,保存元数据 await store.update_goal(current_trace_id, current_goal_id, status="completed", summary=f"探索了 {len(branches)} 个方案", sub_trace_metadata=sub_trace_metadata) return summary def create_explore_tool_schema() -> Dict[str, Any]: """ 创建 explore 工具的 JSON Schema Returns: 工具的 JSON Schema """ return { "type": "function", "function": { "name": "explore", "description": "并行探索多个方向,汇总结果。用于需要对比多个方案或尝试不同实现方式的场景。", "parameters": { "type": "object", "properties": { "branches": { "type": "array", "items": {"type": "string"}, "description": "探索方向列表,每个元素是一个探索任务的描述", "minItems": 2, "maxItems": 5 }, "background": { "type": "string", "description": "可选的背景信息,用于初始化各 Sub-Trace 的上下文" } }, "required": ["branches"] } } }