"""
Step tree debug output.

Dump a step tree to a file in a complete, human-readable format so it can be
inspected while developing and debugging.

Usage:
    1. Watch the file from the command line:
       watch -n 0.5 cat .trace/tree.txt

    2. Open the file in VS Code (it refreshes automatically):
       code .trace/tree.txt

    3. Call it from code:
       from agent.debug import dump_tree
       dump_tree(trace, steps)
"""

import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

# Default output paths
DEFAULT_DUMP_PATH = ".trace/tree.txt"
DEFAULT_JSON_PATH = ".trace/tree.json"
DEFAULT_MD_PATH = ".trace/tree.md"


def _json_default(obj: Any) -> str:
    """Fallback serializer for values ``json.dumps`` cannot encode.

    The node renderers accept non-string ``created_at`` values (they call
    ``strftime`` on them), so step dicts may carry date/time objects. A debug
    dump should degrade to text instead of raising ``TypeError``.
    """
    isoformat = getattr(obj, "isoformat", None)
    if callable(isoformat):
        return isoformat()
    return str(obj)


def _to_dict(obj: Any) -> Any:
    """Return ``obj.to_dict()`` when the object provides it, else ``obj`` itself."""
    return obj.to_dict() if hasattr(obj, "to_dict") else obj


def _normalize(trace: Optional[Any], steps: Optional[List[Any]]):
    """Convert a trace and a list of steps (objects or dicts) to plain dicts.

    Returns:
        A ``(trace_dict, steps_list)`` tuple. ``trace_dict`` is ``None`` when
        no trace was given; ``steps_list`` is ``[]`` when no steps were given.
    """
    trace_dict = None if trace is None else _to_dict(trace)
    steps_list = [_to_dict(step) for step in steps] if steps else []
    return trace_dict, steps_list


class StepTreeDumper:
    """Debug dumper that renders a step tree as plain text or Markdown."""

    # Key under which top-level steps are stored in the parent -> children map.
    _ROOT_KEY = "__root__"

    # One nesting level in the text tree. Same width as the 4-column
    # connectors ("├── " / "└── ") so children and detail lines align.
    _TREE_INDENT = "    "

    # Step status -> icon, shared by the text and Markdown renderers.
    _STATUS_ICONS = {
        "completed": "✓",
        "in_progress": "→",
        "planned": "○",
        "failed": "✗",
        "skipped": "⊘",
    }

    # Data key -> icon used by the Markdown renderer.
    _DATA_ICONS = {
        "messages": "📨",
        "response": "🤖",
        "tools": "🛠️",
        "tool_calls": "🔧",
        "model": "🎯",
        "error": "❌",
        "content": "💬",
    }

    # Data keys rendered first (most important first); the rest are sorted.
    _DATA_KEY_ORDER = ("messages", "tools", "response", "content", "tool_calls", "model")

    def __init__(self, output_path: str = DEFAULT_DUMP_PATH):
        """Remember the output path and create its parent directory.

        Args:
            output_path: File that ``dump`` writes to.
        """
        self.output_path = Path(output_path)
        self.output_path.parent.mkdir(parents=True, exist_ok=True)

    def dump(
        self,
        trace: Optional[Dict[str, Any]] = None,
        steps: Optional[List[Dict[str, Any]]] = None,
        title: str = "Step Tree Debug",
    ) -> str:
        """Write the full tree as plain text to ``self.output_path``.

        Args:
            trace: Trace dict (optional).
            steps: List of step dicts.
            title: Title shown in the header.

        Returns:
            The text that was written.
        """
        # Header with title and generation time
        lines = [
            "=" * 60,
            f" {title}",
            f" Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "=" * 60,
            "",
        ]

        # Trace summary
        if trace:
            # `or 0.0` keeps the format spec from failing on an explicit None.
            total_cost = trace.get("total_cost") or 0.0
            lines.extend(
                [
                    "## Trace",
                    f" trace_id: {trace.get('trace_id', 'N/A')}",
                    f" task: {trace.get('task', 'N/A')}",
                    f" status: {trace.get('status', 'N/A')}",
                    f" total_steps: {trace.get('total_steps', 0)}",
                    f" total_tokens: {trace.get('total_tokens', 0)}",
                    f" total_cost: {total_cost:.4f}",
                    "",
                ]
            )

        # Step tree
        if steps:
            lines.append("## Steps")
            lines.append("")
            tree = self._build_tree(steps)
            lines.append(self._render_tree(tree, steps))

        content = "\n".join(lines)
        self.output_path.write_text(content, encoding="utf-8")
        return content

    def _build_tree(self, steps: List[Dict[str, Any]]) -> Dict[str, List[str]]:
        """Build the ``parent_id -> [child step_ids]`` mapping.

        Top-level steps are stored under ``"__root__"``. A step whose
        ``parent_id`` does not match any step in ``steps`` is also placed
        under the root, so a partial list still renders every step instead
        of silently dropping orphans.
        """
        known_ids = {step.get("step_id", "") for step in steps}
        children: Dict[str, List[str]] = {self._ROOT_KEY: []}
        for step in steps:
            step_id = step.get("step_id", "")
            parent_id = step.get("parent_id")
            if parent_id is None or parent_id not in known_ids:
                children[self._ROOT_KEY].append(step_id)
            else:
                children.setdefault(parent_id, []).append(step_id)
        return children

    def _render_tree(
        self,
        tree: Dict[str, List[str]],
        steps: List[Dict[str, Any]],
        parent_id: str = "__root__",
        indent: int = 0,
        step_map: Optional[Dict[Any, Dict[str, Any]]] = None,
    ) -> str:
        """Recursively render the children of ``parent_id`` as text.

        Args:
            tree: Mapping produced by ``_build_tree``.
            steps: List of step dicts.
            parent_id: Node whose children are rendered.
            indent: Current nesting depth.
            step_map: ``step_id -> step`` lookup; built once on the outermost
                call and passed down so it is not rebuilt at every level.
        """
        if step_map is None:
            step_map = {s.get("step_id"): s for s in steps}

        lines = []
        child_ids = tree.get(parent_id, [])
        last_index = len(child_ids) - 1
        for index, step_id in enumerate(child_ids):
            step = step_map.get(step_id, {})
            lines.append(self._render_node(step, indent, index == last_index))
            # Recurse into this node's own children, if any.
            if step_id in tree:
                lines.append(
                    self._render_tree(tree, steps, step_id, indent + 1, step_map)
                )
        return "\n".join(lines)

    @staticmethod
    def _short_id(step: Dict[str, Any], limit: int) -> str:
        """Return the step id cut to ``limit`` characters.

        The ellipsis is only appended when something was actually cut off.
        A missing or ``None`` id yields an empty string.
        """
        step_id = str(step.get("step_id") or "")
        return step_id[:limit] + "..." if len(step_id) > limit else step_id

    @staticmethod
    def _format_time(created_at: Any) -> str:
        """Return the time-of-day part of ``created_at``.

        Strings containing ``"T"`` yield the first 8 characters after the
        last ``"T"``; other strings are returned unchanged; non-strings are
        formatted with ``strftime("%H:%M:%S")``.
        """
        if isinstance(created_at, str):
            if "T" in created_at:
                return created_at.split("T")[-1][:8]
            return created_at
        return created_at.strftime("%H:%M:%S")

    def _render_node(self, step: Dict[str, Any], indent: int, is_last: bool) -> str:
        """Render one node with all of its details as text.

        Args:
            step: Step dict.
            indent: Nesting depth of the node.
            is_last: Whether the node is the last child of its parent.
        """
        # Indentation and connector; every prefix is 4 columns wide so the
        # detail lines sit directly under the node label.
        prefix = self._TREE_INDENT * indent
        connector = "└── " if is_last else "├── "
        child_prefix = prefix + ("    " if is_last else "│   ")

        icon = self._STATUS_ICONS.get(step.get("status", "unknown"), "?")
        step_type = step.get("step_type", "unknown")
        description = step.get("description", "")

        # First line: type and description, then the shortened id.
        lines = [
            f"{prefix}{connector}[{icon}] {step_type}: {description}",
            f"{child_prefix}id: {self._short_id(step, 8)}",
        ]

        # Execution metrics
        if step.get("duration_ms") is not None:
            lines.append(f"{child_prefix}duration: {step.get('duration_ms')}ms")
        if step.get("tokens") is not None:
            lines.append(f"{child_prefix}tokens: {step.get('tokens')}")
        if step.get("cost") is not None:
            lines.append(f"{child_prefix}cost: ${step.get('cost'):.4f}")

        # Summary, truncated when long
        summary = step.get("summary")
        if summary:
            if len(summary) > 100:
                summary = summary[:100] + "..."
            lines.append(f"{child_prefix}summary: {summary}")

        # Data payload, formatted
        data = step.get("data", {})
        if data:
            lines.append(f"{child_prefix}data:")
            lines.append(self._format_data(data, child_prefix + " "))

        # Creation time
        created_at = step.get("created_at", "")
        if created_at:
            lines.append(f"{child_prefix}time: {self._format_time(created_at)}")

        lines.append("")  # blank separator line
        return "\n".join(lines)

    def _format_data(
        self, data: Dict[str, Any], prefix: str, max_value_len: int = 200
    ) -> str:
        """Format a data dict as ``key: value`` lines for the text dump.

        Args:
            data: The step's data dict.
            prefix: Text put in front of every line.
            max_value_len: Values longer than this are truncated.
        """
        lines = []
        for key, value in data.items():
            if isinstance(value, str):
                if len(value) > max_value_len:
                    value_str = value[:max_value_len] + f"... ({len(value)} chars)"
                else:
                    value_str = value
                # Multi-line strings: keep the first line only. The line
                # count comes from the full value, not the truncated text.
                if "\n" in value_str:
                    first_line = value_str.split("\n")[0]
                    value_str = first_line + f"... ({value.count(chr(10)) + 1} lines)"
            elif isinstance(value, (dict, list)):
                value_str = json.dumps(
                    value, ensure_ascii=False, indent=2, default=_json_default
                )
                if len(value_str) > max_value_len:
                    value_str = value_str[:max_value_len] + "..."
                # Indent continuation lines
                value_str = value_str.replace("\n", "\n" + prefix + " ")
            else:
                value_str = str(value)
            lines.append(f"{prefix}{key}: {value_str}")
        return "\n".join(lines)

    def dump_markdown(
        self,
        trace: Optional[Dict[str, Any]] = None,
        steps: Optional[List[Dict[str, Any]]] = None,
        title: str = "Step Tree Debug",
        output_path: Optional[str] = None,
    ) -> str:
        """Write the tree as Markdown with collapsible sections.

        Args:
            trace: Trace dict (optional).
            steps: List of step dicts.
            title: Document title.
            output_path: Target file. Defaults to ``self.output_path`` with
                its suffix replaced by ``.md``.

        Returns:
            The Markdown that was written.
        """
        # Title
        lines = [
            f"# {title}",
            "",
            f"*Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*",
            "",
        ]

        # Trace summary
        if trace:
            total_cost = trace.get("total_cost") or 0.0
            lines.extend(
                [
                    "## Trace",
                    "",
                    f"- **trace_id**: `{trace.get('trace_id', 'N/A')}`",
                    f"- **task**: {trace.get('task', 'N/A')}",
                    f"- **status**: {trace.get('status', 'N/A')}",
                    f"- **total_steps**: {trace.get('total_steps', 0)}",
                    f"- **total_tokens**: {trace.get('total_tokens', 0)}",
                    f"- **total_cost**: ${total_cost:.4f}",
                    "",
                ]
            )

        # Steps
        if steps:
            lines.append("## Steps")
            lines.append("")
            tree = self._build_tree(steps)
            step_map = {s.get("step_id"): s for s in steps}
            lines.append(self._render_markdown_tree(tree, step_map, level=3))

        content = "\n".join(lines)

        # Swap only the final suffix; a textual replace of ".txt" would also
        # rewrite directory names and do nothing for other suffixes.
        if output_path is None:
            target = self.output_path.with_suffix(".md")
        else:
            target = Path(output_path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")
        return content

    def _render_markdown_tree(
        self,
        tree: Dict[str, List[str]],
        step_map: Dict[str, Dict[str, Any]],
        parent_id: str = "__root__",
        level: int = 3,
    ) -> str:
        """Recursively render the children of ``parent_id`` as Markdown.

        Args:
            tree: Mapping produced by ``_build_tree``.
            step_map: ``step_id -> step`` lookup.
            parent_id: Node whose children are rendered.
            level: Heading level used for these children.
        """
        lines = []
        for step_id in tree.get(parent_id, []):
            step = step_map.get(step_id, {})
            lines.append(self._render_markdown_node(step, level))
            # Children get one heading level deeper.
            if step_id in tree:
                lines.append(
                    self._render_markdown_tree(tree, step_map, step_id, level + 1)
                )
        return "\n".join(lines)

    def _render_markdown_node(self, step: Dict[str, Any], level: int) -> str:
        """Render one node as a Markdown section.

        Args:
            step: Step dict.
            level: Heading level (number of ``#``).
        """
        icon = self._STATUS_ICONS.get(step.get("status", "unknown"), "?")
        step_type = step.get("step_type", "unknown")
        description = step.get("description", "")

        # Heading followed by the basic-information list
        lines = [
            f"{'#' * level} [{icon}] {step_type}: {description}",
            "",
            "**基本信息**",
            "",
            f"- **id**: `{self._short_id(step, 16)}`",
        ]

        if step.get("duration_ms") is not None:
            lines.append(f"- **duration**: {step.get('duration_ms')}ms")
        if step.get("tokens") is not None:
            lines.append(f"- **tokens**: {step.get('tokens')}")
        if step.get("cost") is not None:
            lines.append(f"- **cost**: ${step.get('cost'):.4f}")

        created_at = step.get("created_at", "")
        if created_at:
            lines.append(f"- **time**: {self._format_time(created_at)}")

        lines.append("")

        # Summary in a collapsible block
        if step.get("summary"):
            lines.append("<details>")
            lines.append("<summary><b>📝 Summary</b></summary>")
            lines.append("")
            lines.append(f"```\n{step.get('summary')}\n```")
            lines.append("")
            lines.append("</details>")
            lines.append("")

        # Data (full output, not truncated)
        data = step.get("data", {})
        if data:
            lines.append(self._render_markdown_data(data))
            lines.append("")

        return "\n".join(lines)

    def _render_markdown_data(self, data: Dict[str, Any]) -> str:
        """Render a data dict as Markdown items.

        Keys listed in ``_DATA_KEY_ORDER`` come first, in that order; all
        other keys follow in sorted order. Items that render to an empty
        string (``None`` values) are left out so they add no blank lines.
        """
        ordered_keys = [key for key in self._DATA_KEY_ORDER if key in data]
        remaining_keys = set(data.keys()) - set(ordered_keys)
        ordered_keys.extend(sorted(remaining_keys))

        rendered = (self._render_data_item(key, data[key]) for key in ordered_keys)
        return "\n".join(item for item in rendered if item)

    def _render_data_item(self, key: str, value: Any) -> str:
        """Render one data entry.

        Long or multi-line strings and all dicts/lists go into a collapsible
        ``<details>`` block; short scalar values become a one-line bullet.
        ``None`` renders as an empty string.
        """
        # None values are skipped
        if value is None:
            return ""

        icon = self._DATA_ICONS.get(key, "📄")

        # Collapse long content and complex structures
        if isinstance(value, str):
            needs_collapse = len(value) > 100 or "\n" in value
        else:
            needs_collapse = isinstance(value, (dict, list))

        if not needs_collapse:
            # Simple value, shown inline
            return f"- **{icon} {key}**: `{value}`"

        lines = [
            "<details>",
            f"<summary><b>{icon} {key.capitalize()}</b></summary>",
            "",
        ]

        if isinstance(value, str):
            # Inline base64 images and very long strings are shown as a stub.
            if "data:image" in value or len(value) > 10000:
                lines.append("```")
                lines.append(f"[IMAGE DATA: {len(value)} chars, truncated for display]")
                lines.append(value[:200] + "...")
                lines.append("```")
            else:
                lines.append("```")
                lines.append(value)
                lines.append("```")
        else:
            # Strip embedded base64 images before serializing
            truncated_value = self._truncate_image_data(value)
            lines.append("```json")
            lines.append(
                json.dumps(
                    truncated_value,
                    ensure_ascii=False,
                    indent=2,
                    default=_json_default,
                )
            )
            lines.append("```")

        lines.append("")
        lines.append("</details>")
        return "\n".join(lines)

    def _truncate_image_data(self, obj: Any, max_length: int = 200) -> Any:
        """Recursively replace base64 image data inside ``obj``.

        Strings starting with ``data:image`` become a short placeholder that
        names the header and payload size. This applies to dict values, list
        items and a top-level string alike. Strings longer than 100000
        characters are cut to ``max_length``. Other values are returned
        unchanged. The input is not modified; dicts and lists are rebuilt.
        """
        if isinstance(obj, dict):
            return {
                key: self._truncate_image_data(value, max_length)
                for key, value in obj.items()
            }
        if isinstance(obj, list):
            return [self._truncate_image_data(item, max_length) for item in obj]
        if isinstance(obj, str):
            # Image data URL: data:image/...;base64,<payload>
            if obj.startswith("data:image"):
                header_end = obj.find(",")
                if header_end > 0:
                    header = obj[:header_end]
                    data_size_kb = len(obj[header_end + 1:]) / 1024
                    return f"<IMAGE_DATA: {header}, size={data_size_kb:.1f}KB, truncated>"
                return obj[:max_length] + f"... ({len(obj)} chars)"
            # Very long string (possibly undetected base64)
            if len(obj) > 100000:
                return obj[:max_length] + f"... (TRUNCATED: {len(obj)} chars total)"
        return obj


def dump_tree(
    trace: Optional[Any] = None,
    steps: Optional[List[Any]] = None,
    output_path: str = DEFAULT_DUMP_PATH,
    title: str = "Step Tree Debug",
) -> str:
    """Convenience function: write the step tree as text to a file.

    Args:
        trace: Trace object or dict.
        steps: List of step objects or dicts.
        output_path: Output file path.
        title: Title shown in the header.

    Returns:
        The text that was written.

    Example:
        from agent.debug import dump_tree

        # Call after every step change
        dump_tree(trace, steps)

        # Custom path
        dump_tree(trace, steps, output_path=".debug/my_trace.txt")
    """
    trace_dict, steps_list = _normalize(trace, steps)
    return StepTreeDumper(output_path).dump(trace_dict, steps_list, title)


def dump_json(
    trace: Optional[Any] = None,
    steps: Optional[List[Any]] = None,
    output_path: str = DEFAULT_JSON_PATH,
) -> str:
    """Write the complete trace and steps as JSON (for programmatic analysis).

    Values the JSON encoder cannot handle are written as text (ISO format
    for objects that provide ``isoformat``) instead of raising.

    Args:
        trace: Trace object or dict.
        steps: List of step objects or dicts.
        output_path: Output file path.

    Returns:
        The JSON string that was written.
    """
    path = Path(output_path)
    path.parent.mkdir(parents=True, exist_ok=True)

    trace_dict, steps_list = _normalize(trace, steps)
    data = {
        "generated_at": datetime.now().isoformat(),
        "trace": trace_dict,
        "steps": steps_list,
    }

    content = json.dumps(data, ensure_ascii=False, indent=2, default=_json_default)
    path.write_text(content, encoding="utf-8")
    return content


def dump_markdown(
    trace: Optional[Any] = None,
    steps: Optional[List[Any]] = None,
    output_path: str = DEFAULT_MD_PATH,
    title: str = "Step Tree Debug",
) -> str:
    """Convenience function: write the step tree as collapsible Markdown.

    Args:
        trace: Trace object or dict.
        steps: List of step objects or dicts.
        output_path: Output file path (default ``.trace/tree.md``).
        title: Document title.

    Returns:
        The Markdown that was written.

    Example:
        from agent.debug import dump_markdown

        # Write the complete, collapsible Markdown
        dump_markdown(trace, steps)

        # Custom path
        dump_markdown(trace, steps, output_path=".debug/debug.md")
    """
    trace_dict, steps_list = _normalize(trace, steps)
    dumper = StepTreeDumper(output_path)
    return dumper.dump_markdown(trace_dict, steps_list, title, output_path)