| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317 |
- """
- Step 树 Debug 输出
- 将 Step 树以完整格式输出到文件,便于开发调试。
- 使用方式:
- 1. 命令行实时查看:
- watch -n 0.5 cat .trace/tree.txt
- 2. VS Code 打开文件自动刷新:
- code .trace/tree.txt
- 3. 代码中使用:
- from agent.debug import dump_tree
- dump_tree(trace, steps)
- """
- import json
- from datetime import datetime
- from pathlib import Path
- from typing import Any, Dict, List, Optional
# Default output locations for the text and JSON dumps.
DEFAULT_DUMP_PATH = ".trace/tree.txt"
DEFAULT_JSON_PATH = ".trace/tree.json"


class StepTreeDumper:
    """Render a step tree as text and write it to a file for debugging."""

    # Key under which top-level steps are grouped in the child mapping.
    _ROOT_KEY = "__root__"

    # Number of leading step_id characters shown for each node.
    _ID_DISPLAY_LEN = 8

    # Icon shown for each known step status; unknown statuses render as "?".
    _STATUS_ICONS = {
        "completed": "✓",
        "in_progress": "→",
        "planned": "○",
        "failed": "✗",
        "skipped": "⊘",
    }

    def __init__(self, output_path: str = DEFAULT_DUMP_PATH):
        """Remember the output file and create its parent directory.

        Args:
            output_path: File the rendered tree is written to.
        """
        self.output_path = Path(output_path)
        self.output_path.parent.mkdir(parents=True, exist_ok=True)

    def dump(
        self,
        trace: Optional[Dict[str, Any]] = None,
        steps: Optional[List[Dict[str, Any]]] = None,
        title: str = "Step Tree Debug",
    ) -> str:
        """Write the full tree rendering to ``self.output_path``.

        Args:
            trace: Trace dictionary (optional).
            steps: List of step dictionaries.
            title: Heading printed at the top of the output.

        Returns:
            The text that was written to the file.
        """
        # Header: title and generation time.
        lines: List[str] = [
            "=" * 60,
            f" {title}",
            f" Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "=" * 60,
            "",
        ]

        # Trace summary.
        if trace:
            # The key may be present with a None value; formatting None with
            # ":.4f" raises TypeError, so fall back to 0.0 in that case too.
            total_cost = trace.get("total_cost") or 0.0
            lines.extend(
                [
                    "## Trace",
                    f" trace_id: {trace.get('trace_id', 'N/A')}",
                    f" task: {trace.get('task', 'N/A')}",
                    f" status: {trace.get('status', 'N/A')}",
                    f" total_steps: {trace.get('total_steps', 0)}",
                    f" total_tokens: {trace.get('total_tokens', 0)}",
                    f" total_cost: {total_cost:.4f}",
                    "",
                ]
            )

        # Step tree.
        if steps:
            lines.extend(["## Steps", ""])
            tree = self._build_tree(steps)
            lines.append(self._render_tree(tree, steps))

        content = "\n".join(lines)
        self.output_path.write_text(content, encoding="utf-8")
        return content

    def _build_tree(self, steps: List[Dict[str, Any]]) -> Dict[str, List[str]]:
        """Build the ``parent_id -> [child step_ids]`` mapping.

        Steps without a parent are grouped under ``"__root__"``. Steps whose
        ``parent_id`` does not match any step in ``steps`` are grouped there
        as well, so they still appear in the output instead of being dropped.
        """
        known_ids = {step.get("step_id", "") for step in steps}
        children: Dict[str, List[str]] = {self._ROOT_KEY: []}
        for step in steps:
            step_id = step.get("step_id", "")
            parent_id = step.get("parent_id")
            if parent_id is None or parent_id not in known_ids:
                children[self._ROOT_KEY].append(step_id)
            else:
                children.setdefault(parent_id, []).append(step_id)
        return children

    def _render_tree(
        self,
        tree: Dict[str, List[str]],
        steps: List[Dict[str, Any]],
        parent_id: str = "__root__",
        indent: int = 0,
        step_map: Optional[Dict[Any, Dict[str, Any]]] = None,
    ) -> str:
        """Recursively render the children of ``parent_id``.

        Args:
            tree: Mapping produced by ``_build_tree``.
            steps: List of step dictionaries.
            parent_id: Node whose children are rendered.
            indent: Nesting depth of the rendered children.
            step_map: ``step_id -> step`` lookup. Built from ``steps`` when
                omitted and then reused by every recursive call.

        Returns:
            The rendered nodes joined with newlines.
        """
        if step_map is None:
            # Build the lookup once instead of once per visited node.
            step_map = {s.get("step_id"): s for s in steps}

        rendered: List[str] = []
        child_ids = tree.get(parent_id, [])
        last_index = len(child_ids) - 1
        for index, step_id in enumerate(child_ids):
            step = step_map.get(step_id, {})
            rendered.append(self._render_node(step, indent, index == last_index))
            # Descend only into nodes that have children.
            if step_id in tree:
                rendered.append(
                    self._render_tree(tree, steps, step_id, indent + 1, step_map)
                )
        return "\n".join(rendered)

    def _render_node(self, step: Dict[str, Any], indent: int, is_last: bool) -> str:
        """Render every available detail of a single node.

        Args:
            step: Step dictionary (may be empty).
            indent: Nesting depth of the node.
            is_last: Whether the node is the last child of its parent.

        Returns:
            The node's lines joined with newlines, ending with a blank
            separator line.
        """
        # Indentation and connector.
        # NOTE(review): the indent unit is one space per level while the
        # connectors are four columns wide - confirm the intended widths.
        prefix = " " * indent
        connector = "└── " if is_last else "├── "
        child_prefix = " " * indent + (" " if is_last else "│ ")

        icon = self._STATUS_ICONS.get(step.get("status", "unknown"), "?")
        step_type = step.get("step_type", "unknown")
        description = step.get("description", "")

        # First line: status icon, type and description.
        lines = [f"{prefix}{connector}[{icon}] {step_type}: {description}"]

        # Shortened id. A missing, None or non-string id must not crash, and
        # the ellipsis is added only when characters were actually cut off.
        raw_id = step.get("step_id")
        full_id = "" if raw_id is None else str(raw_id)
        short_id = full_id[: self._ID_DISPLAY_LEN]
        if len(full_id) > self._ID_DISPLAY_LEN:
            short_id += "..."
        lines.append(f"{child_prefix}id: {short_id}")

        # Execution metrics.
        duration = step.get("duration_ms")
        if duration is not None:
            lines.append(f"{child_prefix}duration: {duration}ms")
        tokens = step.get("tokens")
        if tokens is not None:
            lines.append(f"{child_prefix}tokens: {tokens}")
        cost = step.get("cost")
        if cost is not None:
            lines.append(f"{child_prefix}cost: ${cost:.4f}")

        # Summary, truncated to 100 characters.
        summary = step.get("summary")
        if summary:
            if len(summary) > 100:
                summary = summary[:100] + "..."
            lines.append(f"{child_prefix}summary: {summary}")

        # Formatted data payload.
        data = step.get("data", {})
        if data:
            lines.append(f"{child_prefix}data:")
            lines.append(self._format_data(data, child_prefix + " "))

        # Creation time.
        created_at = step.get("created_at", "")
        if created_at:
            if isinstance(created_at, str):
                # Keep only the time-of-day part of a "...T..." timestamp.
                time_part = (
                    created_at.split("T")[-1][:8] if "T" in created_at else created_at
                )
            else:
                time_part = created_at.strftime("%H:%M:%S")
            lines.append(f"{child_prefix}time: {time_part}")

        lines.append("")  # blank separator line
        return "\n".join(lines)

    def _format_data(
        self, data: Dict[str, Any], prefix: str, max_value_len: int = 200
    ) -> str:
        """Format a data dictionary as one ``key: value`` entry per key.

        Args:
            data: Dictionary to format.
            prefix: Text placed in front of every entry.
            max_value_len: Maximum number of value characters kept before
                truncation.

        Returns:
            The formatted entries joined with newlines.
        """
        lines: List[str] = []
        for key, value in data.items():
            if isinstance(value, str):
                if len(value) > max_value_len:
                    value_str = value[:max_value_len] + f"... ({len(value)} chars)"
                else:
                    value_str = value
                # Collapse multi-line strings to their first line.
                if "\n" in value_str:
                    first_line = value_str.split("\n")[0]
                    # Count lines in the full value, not the truncated text,
                    # so the reported number stays correct for long strings.
                    line_count = value.count("\n") + 1
                    value_str = first_line + f"... ({line_count} lines)"
            elif isinstance(value, (dict, list)):
                # default=str keeps the dump alive when nested values (such
                # as datetime objects) are not JSON-native.
                value_str = json.dumps(
                    value, ensure_ascii=False, indent=2, default=str
                )
                if len(value_str) > max_value_len:
                    value_str = value_str[:max_value_len] + "..."
                # Indent continuation lines under the key.
                value_str = value_str.replace("\n", "\n" + prefix + " ")
            else:
                value_str = str(value)
            lines.append(f"{prefix}{key}: {value_str}")
        return "\n".join(lines)
def dump_tree(
    trace: Optional[Any] = None,
    steps: Optional[List[Any]] = None,
    output_path: str = DEFAULT_DUMP_PATH,
    title: str = "Step Tree Debug",
) -> str:
    """Convenience wrapper: render a step tree and write it to a file.

    Objects exposing ``to_dict()`` are converted through it; anything else
    is passed on unchanged.

    Args:
        trace: Trace object or dictionary.
        steps: List of step objects or dictionaries.
        output_path: File the rendering is written to.
        title: Heading printed at the top of the output.

    Returns:
        The text that was written.

    Example:
        from agent.debug import dump_tree

        # Call after every step change.
        dump_tree(trace, steps)

        # Custom output path.
        dump_tree(trace, steps, output_path=".debug/my_trace.txt")
    """
    # Normalise the trace; None and plain dictionaries pass straight through.
    if trace is not None and hasattr(trace, "to_dict"):
        trace_dict = trace.to_dict()
    else:
        trace_dict = trace

    # Normalise every step the same way; a missing or empty list gives [].
    steps_list = [
        item.to_dict() if hasattr(item, "to_dict") else item
        for item in (steps or [])
    ]

    return StepTreeDumper(output_path).dump(trace_dict, steps_list, title)
def dump_json(
    trace: Optional[Any] = None,
    steps: Optional[List[Any]] = None,
    output_path: str = DEFAULT_JSON_PATH,
) -> str:
    """Write the trace and steps as JSON for programmatic analysis.

    Objects exposing ``to_dict()`` are converted through it; anything else
    is passed on unchanged. Values that are not JSON-native (for example
    ``datetime`` objects inside a step) are written as ``str(value)``
    instead of aborting the dump.

    Args:
        trace: Trace object or dictionary.
        steps: List of step objects or dictionaries.
        output_path: File the JSON is written to; its parent directory is
            created when missing.

    Returns:
        The JSON string that was written.
    """
    path = Path(output_path)
    path.parent.mkdir(parents=True, exist_ok=True)

    # Convert to dictionaries.
    trace_dict = None
    if trace is not None:
        trace_dict = trace.to_dict() if hasattr(trace, "to_dict") else trace

    steps_list = [
        step.to_dict() if hasattr(step, "to_dict") else step
        for step in (steps or [])
    ]

    data = {
        "generated_at": datetime.now().isoformat(),
        "trace": trace_dict,
        "steps": steps_list,
    }
    # default=str is deliberate: this is a debug dump, so a readable string
    # for an unsupported value is preferable to a TypeError.
    content = json.dumps(data, ensure_ascii=False, indent=2, default=str)
    path.write_text(content, encoding="utf-8")
    return content
|