| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- """
- Explore 工具 - 并行探索多个方案
- 启动多个 Sub-Trace 并行执行不同的探索方向,汇总结果返回。
- """
- import asyncio
- from typing import List, Optional, Dict, Any
- from datetime import datetime
- from .models import Trace, Message
- from .trace_id import generate_sub_trace_id
- from .goal_models import Goal
- async def explore_tool(
- current_trace_id: str,
- current_goal_id: str,
- branches: List[str],
- background: Optional[str] = None,
- store=None,
- run_agent=None
- ) -> str:
- """
- 并行探索多个方向,汇总结果
- Args:
- current_trace_id: 当前主 Trace ID
- current_goal_id: 当前 Goal ID
- branches: 探索方向列表(每个元素是一个探索任务描述)
- background: 可选,背景信息(如果提供则用作各 Sub-Trace 的初始 context)
- store: TraceStore 实例
- run_agent: 运行 Agent 的函数
- Returns:
- 汇总结果字符串
- Example:
- >>> result = await explore_tool(
- ... current_trace_id="abc123",
- ... current_goal_id="2",
- ... branches=["JWT 方案", "Session 方案"],
- ... store=store,
- ... run_agent=run_agent_func
- ... )
- """
- if not store:
- raise ValueError("store parameter is required")
- if not run_agent:
- raise ValueError("run_agent parameter is required")
- # 1. 创建 agent_call Goal
- goal = Goal(
- id=current_goal_id,
- type="agent_call",
- description=f"并行探索 {len(branches)} 个方案",
- reason="探索多个可行方案",
- agent_call_mode="explore",
- sub_trace_ids=[],
- status="in_progress"
- )
- # 更新 Goal(标记为 agent_call)
- await store.update_goal(current_trace_id, current_goal_id,
- type="agent_call",
- agent_call_mode="explore",
- status="in_progress")
- # 2. 为每个分支创建 Sub-Trace
- sub_traces = []
- sub_trace_ids = []
- for i, desc in enumerate(branches):
- # 生成 Sub-Trace ID
- sub_trace_id = generate_sub_trace_id(current_trace_id, "explore")
- # 创建 Sub-Trace
- sub_trace = Trace(
- trace_id=sub_trace_id,
- mode="agent",
- task=desc,
- parent_trace_id=current_trace_id,
- parent_goal_id=current_goal_id,
- agent_type="explore",
- context={
- "allowed_tools": ["read", "grep", "glob"], # 探索模式:只读权限
- "max_turns": 20,
- "background": background
- },
- status="running",
- created_at=datetime.now()
- )
- # 保存 Sub-Trace
- await store.create_trace(sub_trace)
- sub_traces.append(sub_trace)
- sub_trace_ids.append(sub_trace_id)
- # 推送 sub_trace_started 事件
- await store.append_event(current_trace_id, "sub_trace_started", {
- "trace_id": sub_trace_id,
- "parent_trace_id": current_trace_id,
- "parent_goal_id": current_goal_id,
- "agent_type": "explore",
- "task": desc
- })
- # 更新主 Goal 的 sub_trace_ids
- await store.update_goal(current_trace_id, current_goal_id, sub_trace_ids=sub_trace_ids)
- # 3. 并行执行所有 Sub-Traces
- results = await asyncio.gather(
- *[run_agent(st, background=background) for st in sub_traces],
- return_exceptions=True
- )
- # 4. 收集元数据并汇总结果
- sub_trace_metadata = {}
- summary_parts = ["## 探索结果\n"]
- for i, (sub_trace, result) in enumerate(zip(sub_traces, results), 1):
- branch_name = chr(ord('A') + i - 1) # A, B, C...
- if isinstance(result, Exception):
- # 处理异常情况
- summary_parts.append(f"### 方案 {branch_name}: {sub_trace.task}")
- summary_parts.append(f"⚠️ 执行出错: {str(result)}\n")
- sub_trace_metadata[sub_trace.trace_id] = {
- "task": sub_trace.task,
- "status": "failed",
- "summary": f"执行出错: {str(result)}",
- "last_message": None,
- "stats": {
- "message_count": 0,
- "total_tokens": 0,
- "total_cost": 0.0
- }
- }
- else:
- # 获取 Sub-Trace 的最终状态
- updated_trace = await store.get_trace(sub_trace.trace_id)
- # 获取最后一条 assistant 消息
- messages = await store.get_trace_messages(sub_trace.trace_id)
- last_message = None
- for msg in reversed(messages):
- if msg.role == "assistant":
- last_message = msg
- break
- # 构建元数据
- # 优先使用 result 中的 summary,否则使用最后一条消息的内容
- summary_text = None
- if isinstance(result, dict) and result.get("summary"):
- summary_text = result.get("summary")
- elif last_message and last_message.content:
- # 使用最后一条消息的内容作为 summary(截断至 200 字符)
- content_text = last_message.content
- if isinstance(content_text, dict) and "text" in content_text:
- content_text = content_text["text"]
- elif not isinstance(content_text, str):
- content_text = str(content_text)
- summary_text = content_text[:200] if content_text else "执行完成"
- else:
- summary_text = "执行完成"
- sub_trace_metadata[sub_trace.trace_id] = {
- "task": sub_trace.task,
- "status": updated_trace.status if updated_trace else "unknown",
- "summary": summary_text,
- "last_message": {
- "role": last_message.role,
- "description": last_message.description,
- "content": last_message.content[:500] if last_message.content else None,
- "created_at": last_message.created_at.isoformat()
- } if last_message else None,
- "stats": {
- "message_count": updated_trace.total_messages if updated_trace else 0,
- "total_tokens": updated_trace.total_tokens if updated_trace else 0,
- "total_cost": updated_trace.total_cost if updated_trace else 0.0
- }
- }
- # 组装摘要文本
- summary_parts.append(f"### 方案 {branch_name}: {sub_trace.task}")
- if updated_trace and updated_trace.status == "completed":
- summary_parts.append(f"{summary_text}\n")
- summary_parts.append(f"📊 统计: {updated_trace.total_messages} 条消息, "
- f"{updated_trace.total_tokens} tokens, "
- f"成本 ${updated_trace.total_cost:.4f}\n")
- else:
- summary_parts.append(f"未完成\n")
- # 推送 sub_trace_completed 事件
- await store.append_event(current_trace_id, "sub_trace_completed", {
- "trace_id": sub_trace.trace_id,
- "status": "completed" if not isinstance(result, Exception) else "failed",
- "summary": result.get("summary", "") if isinstance(result, dict) else ""
- })
- summary_parts.append("\n---")
- summary_parts.append(f"已完成 {len(branches)} 个方案的探索,请根据结果选择继续的方向。")
- summary = "\n".join(summary_parts)
- # 5. 完成主 Goal,保存元数据
- await store.update_goal(current_trace_id, current_goal_id,
- status="completed",
- summary=f"探索了 {len(branches)} 个方案",
- sub_trace_metadata=sub_trace_metadata)
- return summary
- def create_explore_tool_schema() -> Dict[str, Any]:
- """
- 创建 explore 工具的 JSON Schema
- Returns:
- 工具的 JSON Schema
- """
- return {
- "type": "function",
- "function": {
- "name": "explore",
- "description": "并行探索多个方向,汇总结果。用于需要对比多个方案或尝试不同实现方式的场景。",
- "parameters": {
- "type": "object",
- "properties": {
- "branches": {
- "type": "array",
- "items": {"type": "string"},
- "description": "探索方向列表,每个元素是一个探索任务的描述",
- "minItems": 2,
- "maxItems": 5
- },
- "background": {
- "type": "string",
- "description": "可选的背景信息,用于初始化各 Sub-Trace 的上下文"
- }
- },
- "required": ["branches"]
- }
- }
- }
|