"""
Step tree debug output.

Writes a Step tree to a file in full detail so it can be inspected while
developing and debugging.

Usage:
    1. Watch it live from the command line:
       watch -n 0.5 cat .trace/tree.txt

    2. Open the file in VS Code (it refreshes automatically):
       code .trace/tree.txt

    3. From code:
       from agent.trace import dump_tree
       dump_tree(trace, steps)
"""

import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Default output paths
DEFAULT_DUMP_PATH = ".trace/tree.txt"
DEFAULT_JSON_PATH = ".trace/tree.json"
DEFAULT_MD_PATH = ".trace/tree.md"

# Key under which top-level steps are stored in the parent -> children map.
_ROOT_KEY = "__root__"

# Icon shown next to a step for each known status; anything else gets "?".
_STATUS_ICONS = {
    "completed": "✓",
    "in_progress": "→",
    "planned": "○",
    "failed": "✗",
    "skipped": "⊘",
    "awaiting_approval": "⏸",
}

# One nesting level of the text tree. Same width as the "├── " connector so
# that detail lines line up underneath the node they belong to.
_TREE_INDENT = "    "
_TREE_PIPE = "│   "

# Markdown only defines heading levels 1-6.
_MAX_HEADING_LEVEL = 6


def _dumps(value: Any) -> str:
    """Serialize *value* as indented JSON.

    Values JSON cannot represent natively (for example ``datetime`` objects)
    are written via ``str()`` instead of aborting the whole debug dump.
    """
    return json.dumps(value, ensure_ascii=False, indent=2, default=str)


def _to_dict(obj: Any) -> Any:
    """Return ``obj.to_dict()`` when the object offers it, else *obj* itself."""
    return obj.to_dict() if hasattr(obj, "to_dict") else obj


def _normalize_inputs(
    trace: Optional[Any], steps: Optional[List[Any]]
) -> Tuple[Optional[Any], List[Any]]:
    """Convert a trace and its steps to plain dictionaries where possible."""
    trace_dict = None if trace is None else _to_dict(trace)
    steps_list = [_to_dict(step) for step in steps] if steps else []
    return trace_dict, steps_list


class StepTreeDumper:
    """Debug dumper that renders a Step tree as plain text or Markdown."""

    def __init__(self, output_path: str = DEFAULT_DUMP_PATH):
        """Remember the output path and create its parent directory."""
        self.output_path = Path(output_path)
        self.output_path.parent.mkdir(parents=True, exist_ok=True)

    def dump(
        self,
        trace: Optional[Dict[str, Any]] = None,
        steps: Optional[List[Dict[str, Any]]] = None,
        title: str = "Step Tree Debug",
    ) -> str:
        """
        Write the complete tree as plain text to ``self.output_path``.

        Args:
            trace: Trace dictionary (optional)
            steps: List of Step dictionaries
            title: Title printed in the header

        Returns:
            The text that was written
        """
        lines = []

        # Header with title and generation time
        lines.append("=" * 60)
        lines.append(f" {title}")
        lines.append(f" Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        lines.append("=" * 60)
        lines.append("")

        # Trace information
        if trace:
            total_cost = trace.get("total_cost") or 0.0
            lines.append("## Trace")
            lines.append(f" trace_id: {trace.get('trace_id', 'N/A')}")
            lines.append(f" task: {trace.get('task', 'N/A')}")
            lines.append(f" status: {trace.get('status', 'N/A')}")
            lines.append(f" total_steps: {trace.get('total_steps', 0)}")
            lines.append(f" total_tokens: {trace.get('total_tokens', 0)}")
            lines.append(f" total_cost: {total_cost:.4f}")
            lines.append("")

        if steps:
            # Statistics summary
            stats = self._calculate_statistics(steps)
            lines.append("## Statistics")
            lines.append(f" Total steps: {stats['total']}")
            lines.append(" By type:")
            for step_type, count in sorted(stats["by_type"].items()):
                lines.append(f" {step_type}: {count}")
            lines.append(" By status:")
            for status, count in sorted(stats["by_status"].items()):
                lines.append(f" {status}: {count}")
            if stats["total_duration_ms"] > 0:
                lines.append(f" Total duration: {stats['total_duration_ms']}ms")
            if stats["total_tokens"] > 0:
                lines.append(f" Total tokens: {stats['total_tokens']}")
            if stats["total_cost"] > 0:
                lines.append(f" Total cost: ${stats['total_cost']:.4f}")
            lines.append("")

            # Step tree
            lines.append("## Steps")
            lines.append("")
            tree = self._build_tree(steps)
            lines.append(self._render_tree(tree, steps))

        content = "\n".join(lines)
        self.output_path.write_text(content, encoding="utf-8")
        return content

    def _calculate_statistics(self, steps: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Count steps by type and status and sum duration, tokens and cost."""
        stats: Dict[str, Any] = {
            "total": len(steps),
            "by_type": {},
            "by_status": {},
            "total_duration_ms": 0,
            "total_tokens": 0,
            "total_cost": 0.0,
        }

        for step in steps:
            step_type = step.get("step_type", "unknown")
            stats["by_type"][step_type] = stats["by_type"].get(step_type, 0) + 1

            status = step.get("status", "unknown")
            stats["by_status"][status] = stats["by_status"].get(status, 0) + 1

            # Missing or None metrics contribute nothing.
            stats["total_duration_ms"] += step.get("duration_ms") or 0
            stats["total_tokens"] += step.get("tokens") or 0
            stats["total_cost"] += step.get("cost") or 0.0

        return stats

    def _build_tree(self, steps: List[Dict[str, Any]]) -> Dict[str, List[str]]:
        """Build the ``parent_id -> [child_ids]`` map.

        Top-level steps are listed under ``"__root__"``. A step whose
        ``parent_id`` does not match any step in *steps* (or points at the
        step itself) is also listed under the root, so that it still shows
        up in the rendered tree instead of silently disappearing. Steps that
        only reach each other through a longer parent cycle remain
        unreachable from the root.
        """
        known_ids = {step.get("step_id", "") for step in steps}
        children: Dict[str, List[str]] = {_ROOT_KEY: []}

        for step in steps:
            step_id = step.get("step_id", "")
            parent_id = step.get("parent_id")
            is_root = (
                parent_id is None
                or parent_id == step_id
                or parent_id not in known_ids
            )
            if is_root:
                children[_ROOT_KEY].append(step_id)
            else:
                children.setdefault(parent_id, []).append(step_id)

        return children

    def _render_tree(
        self,
        tree: Dict[str, List[str]],
        steps: List[Dict[str, Any]],
        parent_id: str = "__root__",
        indent: int = 0,
        step_map: Optional[Dict[Any, Dict[str, Any]]] = None,
    ) -> str:
        """Recursively render the children of *parent_id* as text.

        Args:
            tree: Map produced by ``_build_tree``
            steps: List of Step dictionaries
            parent_id: Node whose children are rendered
            indent: Nesting depth of those children
            step_map: ``step_id -> step`` lookup; built from *steps* on the
                outermost call and reused by the recursive calls

        Returns:
            The rendered subtree
        """
        if step_map is None:
            # Built once here rather than once per recursion level.
            step_map = {s.get("step_id"): s for s in steps}

        lines = []
        child_ids = tree.get(parent_id, [])
        last_index = len(child_ids) - 1

        for index, step_id in enumerate(child_ids):
            step = step_map.get(step_id, {})
            lines.append(self._render_node(step, indent, index == last_index))

            if step_id in tree:
                lines.append(
                    self._render_tree(tree, steps, step_id, indent + 1, step_map)
                )

        return "\n".join(lines)

    @staticmethod
    def _format_time(created_at: Any) -> str:
        """Reduce a creation timestamp to its time-of-day part.

        Strings containing ``"T"`` are cut to the eight characters after the
        last ``"T"``; other strings are returned unchanged; any other object
        is formatted with ``strftime("%H:%M:%S")``.
        """
        if isinstance(created_at, str):
            if "T" in created_at:
                return created_at.split("T")[-1][:8]
            return created_at
        return created_at.strftime("%H:%M:%S")

    def _render_node(self, step: Dict[str, Any], indent: int, is_last: bool) -> str:
        """Render one step, with all of its details, for the text tree."""
        lines = []

        # Indentation and connector
        prefix = _TREE_INDENT * indent
        connector = "└── " if is_last else "├── "
        child_prefix = prefix + (_TREE_INDENT if is_last else _TREE_PIPE)

        status = step.get("status", "unknown")
        icon = _STATUS_ICONS.get(status, "?")
        step_type = step.get("step_type", "unknown")
        description = step.get("description", "")

        # First line: type and description
        lines.append(f"{prefix}{connector}[{icon}] {step_type}: {description}")

        # Only the first 8 characters of the id are shown
        step_id = str(step.get("step_id") or "")[:8]
        lines.append(f"{child_prefix}id: {step_id}...")

        # Key fields: sequence, status, parent_id
        sequence = step.get("sequence")
        if sequence is not None:
            lines.append(f"{child_prefix}sequence: {sequence}")
        lines.append(f"{child_prefix}status: {status}")

        parent_id = step.get("parent_id")
        if parent_id:
            lines.append(f"{child_prefix}parent_id: {str(parent_id)[:8]}...")

        # Execution metrics
        duration_ms = step.get("duration_ms")
        if duration_ms is not None:
            lines.append(f"{child_prefix}duration: {duration_ms}ms")
        tokens = step.get("tokens")
        if tokens is not None:
            lines.append(f"{child_prefix}tokens: {tokens}")
        cost = step.get("cost")
        if cost is not None:
            lines.append(f"{child_prefix}cost: ${cost:.4f}")

        # Summary, if any (long summaries are cut)
        summary = step.get("summary")
        if summary:
            if len(summary) > 100:
                summary = summary[:100] + "..."
            lines.append(f"{child_prefix}summary: {summary}")

        # Error information, shown field by field
        error = step.get("error")
        if error:
            error_msg = str(error.get("message") or "")
            if len(error_msg) > 200:
                error_msg = error_msg[:200] + "..."
            lines.append(f"{child_prefix}error:")
            lines.append(f"{child_prefix} code: {error.get('code', 'UNKNOWN')}")
            lines.append(f"{child_prefix} message: {error_msg}")
            lines.append(f"{child_prefix} retryable: {error.get('retryable', True)}")

        # Data payload (formatted, aggressively truncated)
        data = step.get("data", {})
        if data:
            lines.append(f"{child_prefix}data:")
            lines.append(
                self._format_data(data, child_prefix + " ", max_value_len=150)
            )

        # Creation time
        created_at = step.get("created_at", "")
        if created_at:
            lines.append(f"{child_prefix}time: {self._format_time(created_at)}")

        lines.append("")  # blank separator line
        return "\n".join(lines)

    def _format_data(
        self, data: Dict[str, Any], prefix: str, max_value_len: int = 150
    ) -> str:
        """Format a ``data`` dictionary for the text tree, truncating values.

        Args:
            data: The dictionary to format
            prefix: Text placed in front of every key
            max_value_len: Maximum number of characters kept per value

        Returns:
            One ``key: value`` entry per key, joined with newlines
        """
        lines = []

        for key, value in data.items():
            if isinstance(value, str):
                # Image payloads (data URLs, or very long strings without an
                # early line break) are replaced by a short marker.
                looks_like_blob = len(value) > 10000 and "\n" not in value[:100]
                if value.startswith("data:image") or looks_like_blob:
                    lines.append(
                        f"{prefix}{key}: [IMAGE_DATA: {len(value)} chars, truncated]"
                    )
                    continue

                if len(value) > max_value_len:
                    value_str = value[:max_value_len] + f"... ({len(value)} chars)"
                else:
                    value_str = value

                # Multi-line strings: keep the first line and the line count
                if "\n" in value_str:
                    first_line = value_str.split("\n")[0]
                    line_count = value.count("\n") + 1
                    value_str = first_line + f"... ({line_count} lines)"
            elif isinstance(value, (dict, list)):
                value_str = _dumps(value)
                if len(value_str) > max_value_len:
                    value_str = value_str[:max_value_len] + "..."
                # Indent continuation lines below the key
                value_str = value_str.replace("\n", "\n" + prefix + " ")
            else:
                value_str = str(value)

            lines.append(f"{prefix}{key}: {value_str}")

        return "\n".join(lines)

    def dump_markdown(
        self,
        trace: Optional[Dict[str, Any]] = None,
        steps: Optional[List[Dict[str, Any]]] = None,
        title: str = "Step Tree Debug",
        output_path: Optional[str] = None,
    ) -> str:
        """
        Write the tree as Markdown with collapsible sections.

        Args:
            trace: Trace dictionary (optional)
            steps: List of Step dictionaries
            title: Document title
            output_path: Target file. When omitted, ``self.output_path`` with
                its suffix replaced by ``.md`` is used.

        Returns:
            The Markdown that was written
        """
        lines = []

        # Title
        lines.append(f"# {title}")
        lines.append("")
        lines.append(f"*Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*")
        lines.append("")

        # Trace information
        if trace:
            total_cost = trace.get("total_cost") or 0.0
            lines.append("## Trace")
            lines.append("")
            lines.append(f"- **trace_id**: `{trace.get('trace_id', 'N/A')}`")
            lines.append(f"- **task**: {trace.get('task', 'N/A')}")
            lines.append(f"- **status**: {trace.get('status', 'N/A')}")
            lines.append(f"- **total_steps**: {trace.get('total_steps', 0)}")
            lines.append(f"- **total_tokens**: {trace.get('total_tokens', 0)}")
            lines.append(f"- **total_cost**: ${total_cost:.4f}")
            lines.append("")

        if steps:
            # Statistics summary
            stats = self._calculate_statistics(steps)
            lines.append("## Statistics")
            lines.append("")
            lines.append(f"- **Total steps**: {stats['total']}")
            lines.append("")
            lines.append("**By type:**")
            lines.append("")
            for step_type, count in sorted(stats["by_type"].items()):
                lines.append(f"- `{step_type}`: {count}")
            lines.append("")
            lines.append("**By status:**")
            lines.append("")
            for status, count in sorted(stats["by_status"].items()):
                lines.append(f"- `{status}`: {count}")
            lines.append("")
            if stats["total_duration_ms"] > 0:
                lines.append(f"- **Total duration**: {stats['total_duration_ms']}ms")
            if stats["total_tokens"] > 0:
                lines.append(f"- **Total tokens**: {stats['total_tokens']}")
            if stats["total_cost"] > 0:
                lines.append(f"- **Total cost**: ${stats['total_cost']:.4f}")
            lines.append("")

            # Steps
            lines.append("## Steps")
            lines.append("")
            tree = self._build_tree(steps)
            step_map = {s.get("step_id"): s for s in steps}
            lines.append(self._render_markdown_tree(tree, step_map, level=3))

        content = "\n".join(lines)

        if output_path is None:
            # Swap only the suffix; a plain substring replace would also
            # touch ".txt" inside directory names and would leave paths with
            # any other suffix pointing at the text dump itself.
            target = self.output_path.with_suffix(".md")
        else:
            target = Path(output_path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")
        return content

    def _render_markdown_tree(
        self,
        tree: Dict[str, List[str]],
        step_map: Dict[str, Dict[str, Any]],
        parent_id: str = "__root__",
        level: int = 3,
    ) -> str:
        """Recursively render the children of *parent_id* as Markdown.

        Each nesting level uses a heading one level deeper than its parent.
        """
        lines = []

        for step_id in tree.get(parent_id, []):
            step = step_map.get(step_id, {})
            lines.append(self._render_markdown_node(step, level))

            if step_id in tree:
                lines.append(
                    self._render_markdown_tree(tree, step_map, step_id, level + 1)
                )

        return "\n".join(lines)

    @staticmethod
    def _details_block(summary: str, body: List[str]) -> List[str]:
        """Wrap *body* lines in a collapsible ``<details>`` element."""
        return [
            "<details>",
            f"<summary><b>{summary}</b></summary>",
            "",
            *body,
            "",
            "</details>",
        ]

    def _render_markdown_node(self, step: Dict[str, Any], level: int) -> str:
        """Render one step as a Markdown section with heading depth *level*."""
        lines = []

        status = step.get("status", "unknown")
        icon = _STATUS_ICONS.get(status, "?")
        step_type = step.get("step_type", "unknown")
        description = step.get("description", "")

        # Heading; deeper nesting than level 6 stays at level 6 because
        # Markdown has no deeper heading.
        heading = "#" * min(level, _MAX_HEADING_LEVEL)
        lines.append(f"{heading} [{icon}] {step_type}: {description}")
        lines.append("")

        # Basic information
        lines.append("**基本信息**")
        lines.append("")
        step_id = str(step.get("step_id") or "")[:16]
        lines.append(f"- **id**: `{step_id}...`")

        # Key fields
        sequence = step.get("sequence")
        if sequence is not None:
            lines.append(f"- **sequence**: {sequence}")
        lines.append(f"- **status**: {status}")

        parent_id = step.get("parent_id")
        if parent_id:
            lines.append(f"- **parent_id**: `{str(parent_id)[:16]}...`")

        # Execution metrics
        duration_ms = step.get("duration_ms")
        if duration_ms is not None:
            lines.append(f"- **duration**: {duration_ms}ms")
        tokens = step.get("tokens")
        if tokens is not None:
            lines.append(f"- **tokens**: {tokens}")
        cost = step.get("cost")
        if cost is not None:
            lines.append(f"- **cost**: ${cost:.4f}")

        created_at = step.get("created_at", "")
        if created_at:
            lines.append(f"- **time**: {self._format_time(created_at)}")

        lines.append("")

        # Error information
        error = step.get("error")
        if error:
            error_msg = str(error.get("message") or "")
            if len(error_msg) > 500:
                error_msg = error_msg[:500] + "..."
            body = [
                f"- **code**: `{error.get('code', 'UNKNOWN')}`",
                f"- **retryable**: {error.get('retryable', True)}",
                "- **message**:",
                "```",
                error_msg,
                "```",
            ]
            lines.extend(self._details_block("❌ Error", body))
            lines.append("")

        # Summary
        summary = step.get("summary")
        if summary:
            if len(summary) > 1000:
                summary = summary[:1000] + "..."
            lines.extend(
                self._details_block("📝 Summary", [f"```\n{summary}\n```"])
            )
            lines.append("")

        # Data (aggressively truncated)
        data = step.get("data", {})
        if data:
            lines.append(self._render_markdown_data(data))
            lines.append("")

        return "\n".join(lines)

    def _render_markdown_data(self, data: Dict[str, Any]) -> str:
        """Render a ``data`` dictionary as collapsible Markdown.

        Well-known keys come first in a fixed order; the remaining keys
        follow in sorted order. Items that render to nothing (``None``
        values) are left out.
        """
        # Output order: the important keys first
        key_order = ["messages", "tools", "response", "content", "tool_calls", "model"]

        ordered_keys = [key for key in key_order if key in data]
        ordered_keys.extend(sorted(set(data) - set(ordered_keys)))

        rendered = (self._render_data_item(key, data[key]) for key in ordered_keys)
        return "\n".join(item for item in rendered if item)

    @staticmethod
    def _blob_image(item: Any) -> Optional[Dict[str, Any]]:
        """Return the ``image_url`` dict of a ``blob://`` image item, else None."""
        if not isinstance(item, dict) or item.get("type") != "image_url":
            return None
        image_url = item.get("image_url")
        if not isinstance(image_url, dict):
            return None
        url = image_url.get("url", "")
        if isinstance(url, str) and url.startswith("blob://"):
            return image_url
        return None

    def _count_blob_images(self, messages: List[Any]) -> int:
        """Count ``blob://`` image items in the list-valued message contents."""
        count = 0
        for msg in messages:
            if not isinstance(msg, dict):
                continue
            content = msg.get("content", [])
            if isinstance(content, list):
                count += sum(1 for item in content if self._blob_image(item))
        return count

    def _simplify_messages(self, messages: List[Any]) -> List[Any]:
        """Copy *messages*, replacing ``blob://`` images by a short marker."""
        simplified_messages = []
        for msg in messages:
            if not isinstance(msg, dict):
                simplified_messages.append(msg)
                continue

            simplified_msg = msg.copy()
            content = msg.get("content", [])
            if isinstance(content, list):
                new_content = []
                for item in content:
                    image_url = self._blob_image(item)
                    if image_url is None:
                        new_content.append(item)
                        continue
                    blob_ref = image_url["url"].replace("blob://", "")
                    size = image_url.get("size") or 0
                    size_kb = size / 1024 if size > 0 else 0
                    new_content.append({
                        "type": "image_url",
                        "image_url": {
                            "url": f"[IMAGE: {blob_ref[:8]}... ({size_kb:.1f}KB)]"
                        },
                    })
                simplified_msg["content"] = new_content
            simplified_messages.append(simplified_msg)
        return simplified_messages

    def _render_data_item(self, key: str, value: Any) -> str:
        """Render one ``data`` entry as Markdown, truncating large values.

        Args:
            key: Name of the entry
            value: Value of the entry

        Returns:
            An empty string for ``None``, a one-line bullet for short scalar
            values, and a collapsible block for long strings, multi-line
            strings, dictionaries and lists.
        """
        icon_map = {
            "messages": "📨",
            "response": "🤖",
            "tools": "🛠️",
            "tool_calls": "🔧",
            "model": "🎯",
            "error": "❌",
            "content": "💬",
            "output": "📤",
            "arguments": "⚙️",
        }
        icon = icon_map.get(key, "📄")

        # None values are skipped
        if value is None:
            return ""

        # Messages that reference images get a summary with simplified images
        if key == "messages" and isinstance(value, list):
            image_count = self._count_blob_images(value)
            if image_count > 0:
                body = ["```json", _dumps(self._simplify_messages(value)), "```"]
                return "\n".join(
                    self._details_block(
                        f"📨 Messages (含 {image_count} 张图片)", body
                    )
                )

        # Long or structured content is collapsed
        if isinstance(value, str):
            needs_collapse = len(value) > 100 or "\n" in value
        else:
            needs_collapse = isinstance(value, (dict, list))

        if not needs_collapse:
            # Simple value, shown inline
            return f"- **{icon} {key}**: `{value}`"

        if isinstance(value, str):
            looks_like_blob = len(value) > 10000 and "\n" not in value[:100]
            if "data:image" in value or looks_like_blob:
                # Probably base64 image data
                text = f"[IMAGE DATA: {len(value)} chars, truncated for display]"
                body = ["```", text, "```"]
            elif len(value) > 2000:
                # Very long text: only the first 500 characters are shown
                body = [
                    "```",
                    value[:500],
                    f"... (truncated, total {len(value)} chars)",
                    "```",
                ]
            else:
                body = ["```", value, "```"]
        else:
            # Strip embedded image data first, then cap the JSON length
            json_str = _dumps(self._truncate_image_data(value))
            if len(json_str) > 3000:
                json_str = json_str[:3000] + "\n... (truncated)"
            body = ["```json", json_str, "```"]

        return "\n".join(self._details_block(f"{icon} {key.capitalize()}", body))

    @staticmethod
    def _truncate_string(value: str, max_length: int) -> str:
        """Replace image data, blob references and huge strings by markers."""
        if value.startswith("data:image"):
            # Data URL: keep the header (MIME type etc.) and the payload size
            header_end = value.find(",")
            if header_end > 0:
                header = value[:header_end]
                data_size_kb = len(value[header_end + 1:]) / 1024
                return f"[IMAGE_DATA: {header}, {data_size_kb:.1f}KB]"
            return value[:max_length] + f"... ({len(value)} chars)"
        if value.startswith("blob://"):
            blob_ref = value.replace("blob://", "")
            return f"[BLOB: {blob_ref[:8]}...]"
        if len(value) > 100000:
            # Extremely long string (possibly undetected base64)
            return value[:max_length] + f"... (TRUNCATED: {len(value)} chars total)"
        return value

    def _truncate_image_data(self, obj: Any, max_length: int = 200) -> Any:
        """Recursively replace image payloads inside *obj* by short markers.

        Dictionaries and lists are copied with their contents processed;
        strings are shortened by ``_truncate_string``; every other value is
        returned unchanged.

        Args:
            obj: Value to process
            max_length: Number of leading characters kept when a string is
                cut rather than replaced

        Returns:
            A copy of *obj* without large image payloads
        """
        if isinstance(obj, dict):
            return {
                key: self._truncate_image_data(value, max_length)
                for key, value in obj.items()
            }
        if isinstance(obj, list):
            return [self._truncate_image_data(item, max_length) for item in obj]
        if isinstance(obj, str):
            return self._truncate_string(obj, max_length)
        return obj


def dump_tree(
    trace: Optional[Any] = None,
    steps: Optional[List[Any]] = None,
    output_path: str = DEFAULT_DUMP_PATH,
    title: str = "Step Tree Debug",
) -> str:
    """
    Convenience function: write the Step tree to a text file.

    Args:
        trace: Trace object or dictionary
        steps: List of Step objects or dictionaries
        output_path: Output file path
        title: Title printed in the header

    Returns:
        The text that was written

    Example:
        from agent.debug import dump_tree

        # Call after every step change
        dump_tree(trace, steps)

        # Custom path
        dump_tree(trace, steps, output_path=".debug/my_trace.txt")
    """
    trace_dict, steps_list = _normalize_inputs(trace, steps)
    dumper = StepTreeDumper(output_path)
    return dumper.dump(trace_dict, steps_list, title)


def dump_json(
    trace: Optional[Any] = None,
    steps: Optional[List[Any]] = None,
    output_path: str = DEFAULT_JSON_PATH,
) -> str:
    """
    Write the complete trace and steps as JSON (for programmatic analysis).

    Values JSON cannot represent natively are written via ``str()``.

    Args:
        trace: Trace object or dictionary
        steps: List of Step objects or dictionaries
        output_path: Output file path

    Returns:
        The JSON string that was written
    """
    path = Path(output_path)
    path.parent.mkdir(parents=True, exist_ok=True)

    trace_dict, steps_list = _normalize_inputs(trace, steps)
    data = {
        "generated_at": datetime.now().isoformat(),
        "trace": trace_dict,
        "steps": steps_list,
    }

    content = _dumps(data)
    path.write_text(content, encoding="utf-8")
    return content


def dump_markdown(
    trace: Optional[Any] = None,
    steps: Optional[List[Any]] = None,
    output_path: str = DEFAULT_MD_PATH,
    title: str = "Step Tree Debug",
) -> str:
    """
    Convenience function: write the Step tree as collapsible Markdown.

    Args:
        trace: Trace object or dictionary
        steps: List of Step objects or dictionaries
        output_path: Output file path (default .trace/tree.md)
        title: Document title

    Returns:
        The Markdown that was written

    Example:
        from agent.debug import dump_markdown

        # Write the complete, collapsible Markdown
        dump_markdown(trace, steps)

        # Custom path
        dump_markdown(trace, steps, output_path=".debug/debug.md")
    """
    trace_dict, steps_list = _normalize_inputs(trace, steps)
    dumper = StepTreeDumper(output_path)
    return dumper.dump_markdown(trace_dict, steps_list, title, output_path)