""" Trace 树可视化工具 读取 trace 目录并生成树形结构的可视化输出 """ import json import sys from pathlib import Path from datetime import datetime def load_trace_meta(trace_dir): """加载 trace 的 meta.json""" meta_file = trace_dir / "meta.json" if not meta_file.exists(): return None with open(meta_file, 'r', encoding='utf-8') as f: return json.load(f) def format_duration(start_str, end_str): """计算并格式化持续时间""" if not start_str or not end_str: return "N/A" try: start = datetime.fromisoformat(start_str) end = datetime.fromisoformat(end_str) duration = (end - start).total_seconds() return f"{duration:.1f}s" except: return "N/A" def extract_mode_from_trace_id(trace_id): """从 trace_id 中提取模式""" if '@delegate-' in trace_id: return 'delegate' elif '@explore-' in trace_id: return 'explore' elif '@evaluate-' in trace_id: return 'evaluate' return 'main' def print_trace_tree(trace_base_path, output_file=None): """打印 trace 树结构""" trace_base = Path(trace_base_path) if not trace_base.exists(): print(f"错误: Trace 目录不存在: {trace_base}") return # 查找所有 trace 目录 all_traces = {} main_trace_id = None for trace_dir in sorted(trace_base.iterdir()): if not trace_dir.is_dir(): continue meta = load_trace_meta(trace_dir) if not meta: continue trace_id = meta['trace_id'] all_traces[trace_id] = { 'meta': meta, 'dir': trace_dir, 'children': [] } # 找到主 trace if meta.get('parent_trace_id') is None: main_trace_id = trace_id if not main_trace_id: print("错误: 未找到主 trace") return # 构建树结构 for trace_id, trace_info in all_traces.items(): parent_id = trace_info['meta'].get('parent_trace_id') if parent_id and parent_id in all_traces: all_traces[parent_id]['children'].append(trace_id) # 输出函数 def output(text): print(text) if output_file: output_file.write(text + '\n') # 打印树 output("=" * 80) output("Trace 执行树") output("=" * 80) output("") def print_node(trace_id, prefix="", is_last=True): trace_info = all_traces[trace_id] meta = trace_info['meta'] # 树形连接符 connector = "└── " if is_last else "├── " # 提取信息 mode = extract_mode_from_trace_id(trace_id) task = meta.get('task', 'N/A') if len(task) > 60: task = task[:60] + "..." status = meta.get('status', 'unknown') messages = meta.get('total_messages', 0) tokens = meta.get('total_tokens', 0) duration = format_duration( meta.get('created_at'), meta.get('completed_at') ) # 状态符号 status_symbol = { 'completed': '✓', 'failed': '✗', 'running': '⟳', }.get(status, '?') # 打印节点 output(f"{prefix}{connector}[{mode}] {status_symbol} {trace_id[:8]}") output(f"{prefix}{' ' if is_last else '│ '}Task: {task}") output(f"{prefix}{' ' if is_last else '│ '}Stats: {messages} msgs, {tokens:,} tokens, {duration}") # 打印子节点 children = trace_info['children'] for i, child_id in enumerate(children): is_last_child = (i == len(children) - 1) child_prefix = prefix + (" " if is_last else "│ ") print_node(child_id, child_prefix, is_last_child) # 从主 trace 开始打印 print_node(main_trace_id) output("") output("=" * 80) output("统计信息") output("=" * 80) # 统计各模式的数量 mode_counts = {} total_messages = 0 total_tokens = 0 for trace_info in all_traces.values(): meta = trace_info['meta'] mode = extract_mode_from_trace_id(meta['trace_id']) mode_counts[mode] = mode_counts.get(mode, 0) + 1 total_messages += meta.get('total_messages', 0) total_tokens += meta.get('total_tokens', 0) output(f"总 Trace 数: {len(all_traces)}") output(f" - main: {mode_counts.get('main', 0)}") output(f" - delegate: {mode_counts.get('delegate', 0)}") output(f" - explore: {mode_counts.get('explore', 0)}") output(f" - evaluate: {mode_counts.get('evaluate', 0)}") output(f"") output(f"总消息数: {total_messages}") output(f"总 Token 数: {total_tokens:,}") output("=" * 80) if __name__ == "__main__": if len(sys.argv) < 2: print("用法: python visualize_trace.py [output_file]") print("示例: python visualize_trace.py .trace") sys.exit(1) trace_dir = sys.argv[1] output_path = sys.argv[2] if len(sys.argv) > 2 else None output_file = None if output_path: output_file = open(output_path, 'w', encoding='utf-8') try: print_trace_tree(trace_dir, output_file) finally: if output_file: output_file.close() print(f"\n✓ 输出已保存到: {output_path}")