| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825 |
- """
- Step 树 Debug 输出
- 将 Step 树以完整格式输出到文件,便于开发调试。
- 使用方式:
- 1. 命令行实时查看:
- watch -n 0.5 cat .trace/tree.txt
- 2. VS Code 打开文件自动刷新:
- code .trace/tree.txt
- 3. 代码中使用:
- from agent.trace import dump_tree
- dump_tree(trace, steps)
- """
- import json
- from datetime import datetime
- from pathlib import Path
- from typing import Any, Dict, List, Optional
# Default output paths: plain-text tree, JSON dump and Markdown dump.
DEFAULT_DUMP_PATH = ".trace/tree.txt"
DEFAULT_JSON_PATH = ".trace/tree.json"
DEFAULT_MD_PATH = ".trace/tree.md"
class StepTreeDumper:
    """Render a Step tree to a text or Markdown file for debugging.

    Steps are plain dicts. The renderers read the keys ``step_id``,
    ``parent_id``, ``step_type``, ``status``, ``description``, ``sequence``,
    ``duration_ms``, ``tokens``, ``cost``, ``summary``, ``error``, ``data``
    and ``created_at``; every key is optional.
    """

    # Key of the parent -> children mapping that holds the top-level steps.
    _ROOT_KEY = "__root__"

    # Status -> icon shown in front of every node; unknown statuses get "?".
    _STATUS_ICONS = {
        "completed": "✓",
        "in_progress": "→",
        "planned": "○",
        "failed": "✗",
        "skipped": "⊘",
        "awaiting_approval": "⏸",
    }

    # Markdown ATX headings only exist for 1-6 "#" characters; deeper nodes
    # are clamped to this level so they still render as headings.
    _MAX_HEADING_LEVEL = 6

    def __init__(self, output_path: str = DEFAULT_DUMP_PATH):
        """Remember the output file and create its parent directory.

        Args:
            output_path: File the text dump is written to.
        """
        self.output_path = Path(output_path)
        self.output_path.parent.mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _clip(text: Any, limit: int) -> str:
        """Return *text* as a string, cut to *limit* chars plus "..." if longer."""
        text = "" if text is None else str(text)
        return text[:limit] + "..." if len(text) > limit else text

    @staticmethod
    def _format_time(created_at: Any) -> str:
        """Return the time-of-day part of *created_at*.

        Strings containing "T" are treated as ISO timestamps and reduced to
        the first 8 characters after the "T"; other strings are returned
        unchanged. Non-string values must provide ``strftime``.
        """
        if isinstance(created_at, str):
            if "T" in created_at:
                return created_at.split("T")[-1][:8]
            return created_at
        return created_at.strftime("%H:%M:%S")

    @staticmethod
    def _blob_image_payload(item: Any) -> Optional[Dict[str, Any]]:
        """Return the ``image_url`` dict of a blob-backed image part, else None."""
        if not isinstance(item, dict) or item.get("type") != "image_url":
            return None
        payload = item.get("image_url")
        if not isinstance(payload, dict):
            return None
        url = payload.get("url")
        if isinstance(url, str) and url.startswith("blob://"):
            return payload
        return None

    # ------------------------------------------------------------------
    # Plain-text output
    # ------------------------------------------------------------------

    def dump(
        self,
        trace: Optional[Dict[str, Any]] = None,
        steps: Optional[List[Dict[str, Any]]] = None,
        title: str = "Step Tree Debug",
    ) -> str:
        """Write the full tree as plain text to ``self.output_path``.

        Args:
            trace: Trace dict (optional).
            steps: List of step dicts.
            title: Title printed in the header.

        Returns:
            The text that was written.
        """
        lines = [
            "=" * 60,
            f" {title}",
            f" Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "=" * 60,
            "",
        ]

        if trace:
            # "or 0.0" keeps the float format spec from failing on None.
            total_cost = trace.get('total_cost') or 0.0
            lines.extend([
                "## Trace",
                f" trace_id: {trace.get('trace_id', 'N/A')}",
                f" task: {trace.get('task', 'N/A')}",
                f" status: {trace.get('status', 'N/A')}",
                f" total_steps: {trace.get('total_steps', 0)}",
                f" total_tokens: {trace.get('total_tokens', 0)}",
                f" total_cost: {total_cost:.4f}",
                "",
            ])

        if steps:
            stats = self._calculate_statistics(steps)
            lines.append("## Statistics")
            lines.append(f" Total steps: {stats['total']}")
            lines.append(" By type:")
            for step_type, count in sorted(stats['by_type'].items()):
                lines.append(f" {step_type}: {count}")
            lines.append(" By status:")
            for status, count in sorted(stats['by_status'].items()):
                lines.append(f" {status}: {count}")
            if stats['total_duration_ms'] > 0:
                lines.append(f" Total duration: {stats['total_duration_ms']}ms")
            if stats['total_tokens'] > 0:
                lines.append(f" Total tokens: {stats['total_tokens']}")
            if stats['total_cost'] > 0:
                lines.append(f" Total cost: ${stats['total_cost']:.4f}")
            lines.append("")

            lines.append("## Steps")
            lines.append("")
            tree = self._build_tree(steps)
            lines.append(self._render_tree(tree, steps))

        content = "\n".join(lines)
        self.output_path.write_text(content, encoding="utf-8")
        return content

    def _calculate_statistics(self, steps: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Count steps by type and status and sum duration, tokens and cost.

        Missing ``step_type`` / ``status`` count as ``'unknown'``; missing or
        falsy metrics contribute nothing to the totals.
        """
        by_type: Dict[Any, int] = {}
        by_status: Dict[Any, int] = {}
        total_duration_ms = 0
        total_tokens = 0
        total_cost = 0.0

        for step in steps:
            step_type = step.get('step_type', 'unknown')
            by_type[step_type] = by_type.get(step_type, 0) + 1

            status = step.get('status', 'unknown')
            by_status[status] = by_status.get(status, 0) + 1

            total_duration_ms += step.get('duration_ms') or 0
            total_tokens += step.get('tokens') or 0
            total_cost += step.get('cost') or 0.0

        return {
            'total': len(steps),
            'by_type': by_type,
            'by_status': by_status,
            'total_duration_ms': total_duration_ms,
            'total_tokens': total_tokens,
            'total_cost': total_cost,
        }

    def _build_tree(self, steps: List[Dict[str, Any]]) -> Dict[str, List[str]]:
        """Build the ``parent_id -> [child step_ids]`` mapping.

        Top-level steps are listed under ``"__root__"``. A step whose
        ``parent_id`` is ``None``, refers to no step in *steps*, or refers to
        the step itself is treated as top-level, so it is still rendered
        instead of being filed under a parent that is never visited.
        """
        known_ids = {step.get("step_id", "") for step in steps}
        children: Dict[str, List[str]] = {self._ROOT_KEY: []}

        for step in steps:
            step_id = step.get("step_id", "")
            parent_id = step.get("parent_id")
            is_top_level = (
                parent_id is None
                or parent_id == step_id
                or parent_id not in known_ids
            )
            if is_top_level:
                children[self._ROOT_KEY].append(step_id)
            else:
                children.setdefault(parent_id, []).append(step_id)

        return children

    def _render_tree(
        self,
        tree: Dict[str, List[str]],
        steps: List[Dict[str, Any]],
        parent_id: str = "__root__",
        indent: int = 0,
        step_map: Optional[Dict[Any, Dict[str, Any]]] = None,
    ) -> str:
        """Recursively render the children of *parent_id* as text.

        Args:
            tree: Mapping produced by ``_build_tree``.
            steps: All step dicts (used to build *step_map* on the first call).
            parent_id: Key in *tree* whose children are rendered.
            indent: Nesting depth of the rendered children.
            step_map: ``step_id -> step`` lookup; built once at the top-level
                call and passed down so recursion does not rebuild it.
        """
        if step_map is None:
            step_map = {s.get("step_id"): s for s in steps}

        child_ids = tree.get(parent_id, [])
        last_index = len(child_ids) - 1
        rendered = []
        for index, step_id in enumerate(child_ids):
            step = step_map.get(step_id, {})
            rendered.append(self._render_node(step, indent, index == last_index))
            if step_id in tree:
                rendered.append(
                    self._render_tree(tree, steps, step_id, indent + 1, step_map)
                )
        return "\n".join(rendered)

    def _render_node(self, step: Dict[str, Any], indent: int, is_last: bool) -> str:
        """Render one step (header line plus detail lines) as text.

        Args:
            step: Step dict.
            indent: Nesting depth, used for the leading prefix.
            is_last: Whether the step is the last child of its parent; selects
                the connector glyph.
        """
        prefix = " " * indent
        connector = "└── " if is_last else "├── "
        child_prefix = prefix + (" " if is_last else "│ ")

        status = step.get("status", "unknown")
        icon = self._STATUS_ICONS.get(status, "?")
        step_type = step.get("step_type", "unknown")
        description = step.get("description", "")

        lines = [f"{prefix}{connector}[{icon}] {step_type}: {description}"]

        # Only the first 8 characters of ids are shown; str() keeps a None or
        # non-string id from breaking the slice.
        short_id = str(step.get("step_id") or "")[:8]
        lines.append(f"{child_prefix}id: {short_id}...")

        sequence = step.get("sequence")
        if sequence is not None:
            lines.append(f"{child_prefix}sequence: {sequence}")
        lines.append(f"{child_prefix}status: {status}")
        parent_id = step.get("parent_id")
        if parent_id:
            lines.append(f"{child_prefix}parent_id: {str(parent_id)[:8]}...")

        if step.get("duration_ms") is not None:
            lines.append(f"{child_prefix}duration: {step.get('duration_ms')}ms")
        if step.get("tokens") is not None:
            lines.append(f"{child_prefix}tokens: {step.get('tokens')}")
        if step.get("cost") is not None:
            lines.append(f"{child_prefix}cost: ${step.get('cost'):.4f}")

        if step.get("summary"):
            lines.append(f"{child_prefix}summary: {self._clip(step.get('summary'), 100)}")

        error = step.get("error")
        if error:
            lines.append(f"{child_prefix}error:")
            lines.append(f"{child_prefix} code: {error.get('code', 'UNKNOWN')}")
            lines.append(f"{child_prefix} message: {self._clip(error.get('message'), 200)}")
            lines.append(f"{child_prefix} retryable: {error.get('retryable', True)}")

        data = step.get("data", {})
        if data:
            lines.append(f"{child_prefix}data:")
            lines.append(self._format_data(data, child_prefix + " ", max_value_len=150))

        created_at = step.get("created_at", "")
        if created_at:
            lines.append(f"{child_prefix}time: {self._format_time(created_at)}")

        lines.append("")  # blank separator line after every node
        return "\n".join(lines)

    def _format_data(self, data: Dict[str, Any], prefix: str, max_value_len: int = 150) -> str:
        """Format a ``data`` dict as ``key: value`` lines with heavy truncation.

        Strings that look like image payloads are replaced by a placeholder,
        long strings are cut to *max_value_len*, multi-line strings are reduced
        to their first line, and dict/list values are rendered as (truncated)
        indented JSON.
        """
        lines = []
        for key, value in data.items():
            if isinstance(value, str):
                # Image heuristic: a data URL, or a very long string with no
                # newline in its first 100 characters.
                looks_like_image = value.startswith("data:image") or (
                    len(value) > 10000 and "\n" not in value[:100]
                )
                if looks_like_image:
                    lines.append(f"{prefix}{key}: [IMAGE_DATA: {len(value)} chars, truncated]")
                    continue

                if len(value) > max_value_len:
                    value_str = value[:max_value_len] + f"... ({len(value)} chars)"
                else:
                    value_str = value

                if "\n" in value_str:
                    first_line = value_str.split("\n")[0]
                    line_count = value.count("\n") + 1
                    value_str = first_line + f"... ({line_count} lines)"
            elif isinstance(value, (dict, list)):
                # default=str: a debug dump should not crash on values JSON
                # cannot encode natively (e.g. datetime objects).
                value_str = json.dumps(value, ensure_ascii=False, indent=2, default=str)
                if len(value_str) > max_value_len:
                    value_str = value_str[:max_value_len] + "..."
                value_str = value_str.replace("\n", "\n" + prefix + " ")
            else:
                value_str = str(value)

            lines.append(f"{prefix}{key}: {value_str}")

        return "\n".join(lines)

    # ------------------------------------------------------------------
    # Markdown output
    # ------------------------------------------------------------------

    def dump_markdown(
        self,
        trace: Optional[Dict[str, Any]] = None,
        steps: Optional[List[Dict[str, Any]]] = None,
        title: str = "Step Tree Debug",
        output_path: Optional[str] = None,
    ) -> str:
        """Write the tree as Markdown with collapsible ``<details>`` sections.

        Args:
            trace: Trace dict (optional).
            steps: List of step dicts.
            title: Document title.
            output_path: Target file. When omitted, the dumper's own output
                path is used with its suffix replaced by ``.md``.

        Returns:
            The Markdown that was written.
        """
        lines = [
            f"# {title}",
            "",
            f"*Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*",
            "",
        ]

        if trace:
            total_cost = trace.get('total_cost') or 0.0
            lines.extend([
                "## Trace",
                "",
                f"- **trace_id**: `{trace.get('trace_id', 'N/A')}`",
                f"- **task**: {trace.get('task', 'N/A')}",
                f"- **status**: {trace.get('status', 'N/A')}",
                f"- **total_steps**: {trace.get('total_steps', 0)}",
                f"- **total_tokens**: {trace.get('total_tokens', 0)}",
                f"- **total_cost**: ${total_cost:.4f}",
                "",
            ])

        if steps:
            stats = self._calculate_statistics(steps)
            lines.append("## Statistics")
            lines.append("")
            lines.append(f"- **Total steps**: {stats['total']}")
            lines.append("")
            lines.append("**By type:**")
            lines.append("")
            for step_type, count in sorted(stats['by_type'].items()):
                lines.append(f"- `{step_type}`: {count}")
            lines.append("")
            lines.append("**By status:**")
            lines.append("")
            for status, count in sorted(stats['by_status'].items()):
                lines.append(f"- `{status}`: {count}")
            lines.append("")
            if stats['total_duration_ms'] > 0:
                lines.append(f"- **Total duration**: {stats['total_duration_ms']}ms")
            if stats['total_tokens'] > 0:
                lines.append(f"- **Total tokens**: {stats['total_tokens']}")
            if stats['total_cost'] > 0:
                lines.append(f"- **Total cost**: ${stats['total_cost']:.4f}")
            lines.append("")

            lines.append("## Steps")
            lines.append("")
            tree = self._build_tree(steps)
            step_map = {s.get("step_id"): s for s in steps}
            lines.append(self._render_markdown_tree(tree, step_map, level=3))

        content = "\n".join(lines)

        if output_path is None:
            # Swap only the file suffix; a textual replace of ".txt" would also
            # rewrite directory names and do nothing for other suffixes.
            target = self.output_path.with_suffix(".md")
        else:
            target = Path(output_path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")
        return content

    def _render_markdown_tree(
        self,
        tree: Dict[str, List[str]],
        step_map: Dict[str, Dict[str, Any]],
        parent_id: str = "__root__",
        level: int = 3,
    ) -> str:
        """Recursively render the children of *parent_id* as Markdown sections.

        Args:
            tree: Mapping produced by ``_build_tree``.
            step_map: ``step_id -> step`` lookup.
            parent_id: Key in *tree* whose children are rendered.
            level: Heading level for the rendered children.
        """
        sections = []
        for step_id in tree.get(parent_id, []):
            sections.append(self._render_markdown_node(step_map.get(step_id, {}), level))
            if step_id in tree:
                sections.append(
                    self._render_markdown_tree(tree, step_map, step_id, level + 1)
                )
        return "\n".join(sections)

    def _render_markdown_node(self, step: Dict[str, Any], level: int) -> str:
        """Render one step as a Markdown section.

        Args:
            step: Step dict.
            level: Requested heading level; clamped to 6 because Markdown has
                no deeper heading.
        """
        status = step.get("status", "unknown")
        icon = self._STATUS_ICONS.get(status, "?")
        step_type = step.get("step_type", "unknown")
        description = step.get("description", "")

        heading = "#" * min(level, self._MAX_HEADING_LEVEL)
        lines = [
            f"{heading} [{icon}] {step_type}: {description}",
            "",
            "**基本信息**",
            "",
        ]

        short_id = str(step.get("step_id") or "")[:16]
        lines.append(f"- **id**: `{short_id}...`")

        sequence = step.get("sequence")
        if sequence is not None:
            lines.append(f"- **sequence**: {sequence}")
        lines.append(f"- **status**: {status}")
        parent_id = step.get("parent_id")
        if parent_id:
            lines.append(f"- **parent_id**: `{str(parent_id)[:16]}...`")

        if step.get("duration_ms") is not None:
            lines.append(f"- **duration**: {step.get('duration_ms')}ms")
        if step.get("tokens") is not None:
            lines.append(f"- **tokens**: {step.get('tokens')}")
        if step.get("cost") is not None:
            lines.append(f"- **cost**: ${step.get('cost'):.4f}")

        created_at = step.get("created_at", "")
        if created_at:
            lines.append(f"- **time**: {self._format_time(created_at)}")
        lines.append("")

        error = step.get("error")
        if error:
            lines.extend([
                "<details>",
                "<summary><b>❌ Error</b></summary>",
                "",
                f"- **code**: `{error.get('code', 'UNKNOWN')}`",
                f"- **retryable**: {error.get('retryable', True)}",
                "- **message**:",
                "```",
                self._clip(error.get('message'), 500),
                "```",
                "",
                "</details>",
                "",
            ])

        if step.get("summary"):
            summary = self._clip(step.get('summary'), 1000)
            lines.extend([
                "<details>",
                "<summary><b>📝 Summary</b></summary>",
                "",
                f"```\n{summary}\n```",
                "",
                "</details>",
                "",
            ])

        data = step.get("data", {})
        if data:
            lines.append(self._render_markdown_data(data))
            lines.append("")

        return "\n".join(lines)

    def _render_markdown_data(self, data: Dict[str, Any]) -> str:
        """Render a ``data`` dict as Markdown, well-known keys first.

        The keys in ``key_order`` are emitted in that order when present; all
        other keys follow in sorted order.
        """
        key_order = ["messages", "tools", "response", "content", "tool_calls", "model"]

        leading = [key for key in key_order if key in data]
        trailing = sorted(set(data.keys()) - set(leading))
        return "\n".join(
            self._render_data_item(key, data[key]) for key in leading + trailing
        )

    def _render_data_item(self, key: str, value: Any) -> str:
        """Render one ``data`` entry, collapsing long or structured values.

        ``None`` renders as an empty string. ``messages`` lists that contain
        blob-backed images get a summary block with the image references
        shortened. Long or multi-line strings and all dict/list values go into
        a ``<details>`` block; anything else is rendered as a one-line bullet.
        """
        icon_map = {
            "messages": "📨",
            "response": "🤖",
            "tools": "🛠️",
            "tool_calls": "🔧",
            "model": "🎯",
            "error": "❌",
            "content": "💬",
            "output": "📤",
            "arguments": "⚙️",
        }
        icon = icon_map.get(key, "📄")

        if value is None:
            return ""

        if key == 'messages' and isinstance(value, list):
            rendered = self._render_messages_with_images(value)
            if rendered is not None:
                return rendered

        if isinstance(value, str):
            needs_collapse = len(value) > 100 or "\n" in value
        else:
            needs_collapse = isinstance(value, (dict, list))

        if not needs_collapse:
            return f"- **{icon} {key}**: `{value}`"

        lines = [
            "<details>",
            f"<summary><b>{icon} {key.capitalize()}</b></summary>",
            "",
        ]

        if isinstance(value, str):
            looks_like_image = "data:image" in value or (
                len(value) > 10000 and "\n" not in value[:100]
            )
            lines.append("```")
            if looks_like_image:
                lines.append(f"[IMAGE DATA: {len(value)} chars, truncated for display]")
            elif len(value) > 2000:
                # Very long text: show only the first 500 characters.
                lines.append(value[:500])
                lines.append(f"... (truncated, total {len(value)} chars)")
            else:
                lines.append(value)
            lines.append("```")
        else:
            truncated_value = self._truncate_image_data(value)
            json_str = json.dumps(truncated_value, ensure_ascii=False, indent=2, default=str)
            if len(json_str) > 3000:
                json_str = json_str[:3000] + "\n... (truncated)"
            lines.append("```json")
            lines.append(json_str)
            lines.append("```")

        lines.append("")
        lines.append("</details>")
        return "\n".join(lines)

    def _render_messages_with_images(self, messages: List[Any]) -> Optional[str]:
        """Render a messages list that contains blob-backed images.

        Returns ``None`` when no message part is a blob-backed image, so the
        caller falls back to the generic rendering.
        """
        image_count = 0
        for msg in messages:
            if isinstance(msg, dict) and isinstance(msg.get('content', []), list):
                image_count += sum(
                    1
                    for item in msg.get('content', [])
                    if self._blob_image_payload(item) is not None
                )
        if image_count == 0:
            return None

        simplified_messages = []
        for msg in messages:
            if not isinstance(msg, dict):
                simplified_messages.append(msg)
                continue

            simplified_msg = msg.copy()
            content = msg.get('content', [])
            if isinstance(content, list):
                new_content = []
                for item in content:
                    payload = self._blob_image_payload(item)
                    if payload is None:
                        new_content.append(item)
                        continue
                    blob_ref = payload['url'][len('blob://'):]
                    size = payload.get('size') or 0
                    size_kb = size / 1024 if size > 0 else 0
                    new_content.append({
                        'type': 'image_url',
                        'image_url': {
                            'url': f'[IMAGE: {blob_ref[:8]}... ({size_kb:.1f}KB)]'
                        }
                    })
                simplified_msg['content'] = new_content
            simplified_messages.append(simplified_msg)

        # Inline base64 images may sit next to the blob references; shorten
        # them too so they are not written out in full.
        simplified_messages = self._truncate_image_data(simplified_messages)

        return "\n".join([
            "<details>",
            f"<summary><b>📨 Messages (含 {image_count} 张图片)</b></summary>",
            "",
            "```json",
            json.dumps(simplified_messages, ensure_ascii=False, indent=2, default=str),
            "```",
            "",
            "</details>",
        ])

    def _truncate_image_data(self, obj: Any, max_length: int = 200) -> Any:
        """Return a copy of *obj* with image payloads and huge strings shortened.

        Dicts and lists are walked recursively. A string is replaced when it
        is a ``data:image`` URL, a ``blob://`` reference, or longer than
        100000 characters; this applies wherever the string sits (dict value,
        list item or top level). Other values are returned unchanged.
        """
        if isinstance(obj, dict):
            return {
                key: self._truncate_image_data(value, max_length)
                for key, value in obj.items()
            }
        if isinstance(obj, list):
            return [self._truncate_image_data(item, max_length) for item in obj]
        if not isinstance(obj, str):
            return obj

        if obj.startswith("data:image"):
            # Split "data:image/...;base64" from the payload at the first comma.
            header_end = obj.find(",")
            if header_end > 0:
                header = obj[:header_end]
                payload = obj[header_end + 1:]
                data_size_kb = len(payload) / 1024
                return f"<IMAGE_DATA: {data_size_kb:.1f}KB, {header}, preview: {payload[:50]}...>"
            return obj[:max_length] + f"... ({len(obj)} chars)"

        if obj.startswith("blob://"):
            blob_ref = obj[len("blob://"):]
            return f"<BLOB_REF: {blob_ref[:8]}...>"

        if len(obj) > 100000:
            # Very long string, possibly base64 that was not recognised above.
            return obj[:max_length] + f"... (TRUNCATED: {len(obj)} chars total)"

        return obj
def dump_tree(
    trace: Optional[Any] = None,
    steps: Optional[List[Any]] = None,
    output_path: str = DEFAULT_DUMP_PATH,
    title: str = "Step Tree Debug",
) -> str:
    """Convenience wrapper: write the Step tree as plain text to a file.

    Objects that expose ``to_dict()`` are converted with it; anything else is
    passed through unchanged.

    Args:
        trace: Trace object or dict.
        steps: List of Step objects or dicts.
        output_path: File to write.
        title: Title printed in the header.

    Returns:
        The text that was written.

    Example:
        # call after every step change
        dump_tree(trace, steps)

        # custom path
        dump_tree(trace, steps, output_path=".debug/my_trace.txt")
    """
    if trace is None:
        trace_payload = None
    elif hasattr(trace, "to_dict"):
        trace_payload = trace.to_dict()
    else:
        trace_payload = trace

    step_payloads = [
        item.to_dict() if hasattr(item, "to_dict") else item
        for item in (steps or [])
    ]

    return StepTreeDumper(output_path).dump(trace_payload, step_payloads, title)
def dump_json(
    trace: Optional[Any] = None,
    steps: Optional[List[Any]] = None,
    output_path: str = DEFAULT_JSON_PATH,
) -> str:
    """Write the trace and steps as a JSON document for programmatic analysis.

    Objects that expose ``to_dict()`` are converted with it; anything else is
    passed through unchanged. The parent directory of *output_path* is created
    if needed.

    Args:
        trace: Trace object or dict.
        steps: List of Step objects or dicts.
        output_path: File to write.

    Returns:
        The JSON string that was written.
    """
    path = Path(output_path)
    path.parent.mkdir(parents=True, exist_ok=True)

    if trace is None:
        trace_dict = None
    elif hasattr(trace, "to_dict"):
        trace_dict = trace.to_dict()
    else:
        trace_dict = trace

    steps_list = [
        step.to_dict() if hasattr(step, "to_dict") else step
        for step in (steps or [])
    ]

    data = {
        "generated_at": datetime.now().isoformat(),
        "trace": trace_dict,
        "steps": steps_list,
    }

    # default=str: steps may carry values JSON cannot encode natively (the
    # text/Markdown renderers accept datetime objects for created_at, for
    # example); stringify them instead of aborting the whole debug dump.
    content = json.dumps(data, ensure_ascii=False, indent=2, default=str)
    path.write_text(content, encoding="utf-8")
    return content
def dump_markdown(
    trace: Optional[Any] = None,
    steps: Optional[List[Any]] = None,
    output_path: str = DEFAULT_MD_PATH,
    title: str = "Step Tree Debug",
) -> str:
    """Convenience wrapper: write the Step tree as collapsible Markdown.

    Objects that expose ``to_dict()`` are converted with it; anything else is
    passed through unchanged.

    Args:
        trace: Trace object or dict.
        steps: List of Step objects or dicts.
        output_path: File to write (defaults to .trace/tree.md).
        title: Document title.

    Returns:
        The Markdown that was written.

    Example:
        # full, collapsible Markdown
        dump_markdown(trace, steps)

        # custom path
        dump_markdown(trace, steps, output_path=".debug/debug.md")
    """
    if trace is None:
        trace_payload = None
    elif hasattr(trace, "to_dict"):
        trace_payload = trace.to_dict()
    else:
        trace_payload = trace

    step_payloads = [
        item.to_dict() if hasattr(item, "to_dict") else item
        for item in (steps or [])
    ]

    return StepTreeDumper(output_path).dump_markdown(
        trace_payload, step_payloads, title, output_path
    )
|