"""
Step 树 Debug 输出
将 Step 树以完整格式输出到文件,便于开发调试。
使用方式:
1. 命令行实时查看:
watch -n 0.5 cat .trace/tree.txt
2. VS Code 打开文件自动刷新:
code .trace/tree.txt
3. 代码中使用:
from agent.execution import dump_tree
dump_tree(trace, steps)
"""
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
# Default output paths for the text, JSON and Markdown dumps.
DEFAULT_DUMP_PATH = ".trace/tree.txt"
DEFAULT_JSON_PATH = ".trace/tree.json"
DEFAULT_MD_PATH = ".trace/tree.md"


class StepTreeDumper:
    """Write a trace and its step tree to a file for debugging.

    Two formats are produced: a plain-text tree (``dump``) and a Markdown
    document with collapsible ``<details>`` sections (``dump_markdown``).
    Steps and traces are plain dictionaries; parent/child links are read from
    each step's ``step_id`` and ``parent_id`` keys.
    """

    # Key under which top-level steps are stored in the parent -> children map.
    _ROOT = "__root__"

    # Status value -> icon shown in front of every node.
    _STATUS_ICONS = {
        "completed": "✓",
        "in_progress": "→",
        "planned": "○",
        "failed": "✗",
        "skipped": "⊘",
        "awaiting_approval": "⏸",
    }

    # Data key -> icon used in the Markdown output.
    _DATA_ICONS = {
        "messages": "📨",
        "response": "🤖",
        "tools": "🛠️",
        "tool_calls": "🔧",
        "model": "🎯",
        "error": "❌",
        "content": "💬",
        "output": "📤",
        "arguments": "⚙️",
    }

    # Data keys rendered first (in this order) in the Markdown output.
    _DATA_KEY_ORDER = ("messages", "tools", "response", "content", "tool_calls", "model")

    def __init__(self, output_path: str = DEFAULT_DUMP_PATH):
        """Remember the output file and create its parent directory.

        Args:
            output_path: File the plain-text dump is written to.
        """
        self.output_path = Path(output_path)
        self.output_path.parent.mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _short_id(value: Any, length: int) -> str:
        """Return the first ``length`` characters of an id; tolerate None/non-str ids."""
        return ("" if value is None else str(value))[:length]

    @staticmethod
    def _truncate(text: str, limit: int) -> str:
        """Cut ``text`` to ``limit`` characters, appending ``...`` when shortened."""
        return text[:limit] + "..." if len(text) > limit else text

    @staticmethod
    def _format_time(created_at: Any) -> str:
        """Return the HH:MM:SS part of an ISO string or datetime-like value."""
        if isinstance(created_at, str):
            # ISO strings: keep only the time-of-day after the "T".
            return created_at.split("T")[-1][:8] if "T" in created_at else created_at
        if hasattr(created_at, "strftime"):
            return created_at.strftime("%H:%M:%S")
        return str(created_at)

    @staticmethod
    def _to_json(value: Any) -> str:
        """Pretty-print ``value`` as JSON.

        ``default=str`` is deliberate: this is debug output, so a value that
        JSON cannot represent (e.g. a datetime) is shown as text instead of
        aborting the whole dump.
        """
        return json.dumps(value, ensure_ascii=False, indent=2, default=str)

    @staticmethod
    def _sorted_counts(counts: Dict[Any, int]) -> List[Any]:
        """Return ``(key, count)`` pairs sorted by key text (safe for None keys)."""
        return sorted(counts.items(), key=lambda item: str(item[0]))

    @staticmethod
    def _looks_like_raw_blob(value: str) -> bool:
        """Heuristic for undeclared base64: very long and no newline near the start."""
        return len(value) > 10000 and "\n" not in value[:100]

    @staticmethod
    def _collapsible(summary: str, body: List[str]) -> List[str]:
        """Wrap ``body`` lines in a Markdown-compatible ``<details>`` block."""
        return ["<details>", f"<summary>{summary}</summary>", "", *body, "", "</details>"]

    @staticmethod
    def _error_fields(error: Any) -> tuple:
        """Return ``(code, message, retryable)`` for a dict or free-form error."""
        if isinstance(error, dict):
            return (
                error.get('code', 'UNKNOWN'),
                str(error.get('message') or ''),
                error.get('retryable', True),
            )
        # Not structured: show whatever was stored as the message.
        return ('UNKNOWN', str(error), True)

    @staticmethod
    def _blob_image(item: Any) -> Optional[Dict[str, Any]]:
        """Return the ``image_url`` dict of a blob-backed image part, else None."""
        if not isinstance(item, dict) or item.get('type') != 'image_url':
            return None
        image_url = item.get('image_url')
        if not isinstance(image_url, dict):
            return None
        url = image_url.get('url', '')
        if isinstance(url, str) and url.startswith('blob://'):
            return image_url
        return None

    # ------------------------------------------------------------------
    # Plain-text dump
    # ------------------------------------------------------------------

    def dump(
        self,
        trace: Optional[Dict[str, Any]] = None,
        steps: Optional[List[Dict[str, Any]]] = None,
        title: str = "Step Tree Debug",
    ) -> str:
        """Write the full tree as plain text to ``self.output_path``.

        Args:
            trace: Trace dictionary (optional).
            steps: List of step dictionaries.
            title: Title printed in the header.

        Returns:
            The text that was written.
        """
        rule = "=" * 60
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        lines: List[str] = [rule, f"  {title}", f"  Generated: {stamp}", rule, ""]

        if trace:
            # A present-but-None cost must not break the float format below.
            total_cost = trace.get('total_cost') or 0.0
            lines.append("## Trace")
            lines.append(f"  trace_id: {trace.get('trace_id', 'N/A')}")
            lines.append(f"  task: {trace.get('task', 'N/A')}")
            lines.append(f"  status: {trace.get('status', 'N/A')}")
            lines.append(f"  total_steps: {trace.get('total_steps', 0)}")
            lines.append(f"  total_tokens: {trace.get('total_tokens', 0)}")
            lines.append(f"  total_cost: {total_cost:.4f}")
            lines.append("")

        if steps:
            stats = self._calculate_statistics(steps)
            lines.append("## Statistics")
            lines.append(f"  Total steps: {stats['total']}")
            lines.append("  By type:")
            for step_type, count in self._sorted_counts(stats['by_type']):
                lines.append(f"    {step_type}: {count}")
            lines.append("  By status:")
            for status, count in self._sorted_counts(stats['by_status']):
                lines.append(f"    {status}: {count}")
            if stats['total_duration_ms'] > 0:
                lines.append(f"  Total duration: {stats['total_duration_ms']}ms")
            if stats['total_tokens'] > 0:
                lines.append(f"  Total tokens: {stats['total_tokens']}")
            if stats['total_cost'] > 0:
                lines.append(f"  Total cost: ${stats['total_cost']:.4f}")
            lines.append("")

            lines.append("## Steps")
            lines.append("")
            lines.append(self._render_tree(self._build_tree(steps), steps))

        content = "\n".join(lines)
        self.output_path.write_text(content, encoding="utf-8")
        return content

    def _calculate_statistics(self, steps: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Count steps by type/status and sum duration, tokens and cost.

        Missing or falsy metric values contribute nothing to the sums.
        """
        by_type: Dict[Any, int] = {}
        by_status: Dict[Any, int] = {}
        total_duration_ms = 0
        total_tokens = 0
        total_cost = 0.0

        for step in steps:
            step_type = step.get('step_type', 'unknown')
            by_type[step_type] = by_type.get(step_type, 0) + 1

            status = step.get('status', 'unknown')
            by_status[status] = by_status.get(status, 0) + 1

            total_duration_ms += step.get('duration_ms') or 0
            total_tokens += step.get('tokens') or 0
            total_cost += step.get('cost') or 0.0

        return {
            'total': len(steps),
            'by_type': by_type,
            'by_status': by_status,
            'total_duration_ms': total_duration_ms,
            'total_tokens': total_tokens,
            'total_cost': total_cost,
        }

    def _build_tree(self, steps: List[Dict[str, Any]]) -> Dict[str, List[str]]:
        """Build the ``parent_id -> [child step_ids]`` mapping.

        Top-level steps are stored under ``"__root__"``. A step whose parent is
        not in ``steps`` (or that names itself as parent) is also attached to
        the root so it still appears in the dump instead of being dropped.
        Steps that only reference each other in a parent cycle remain
        unreachable from the root and are not rendered.
        """
        known_ids = {step.get("step_id", "") for step in steps}
        children: Dict[str, List[str]] = {self._ROOT: []}

        for step in steps:
            step_id = step.get("step_id", "")
            parent_id = step.get("parent_id")
            is_top_level = (
                parent_id is None
                or parent_id not in known_ids
                or parent_id == step_id
            )
            if is_top_level:
                children[self._ROOT].append(step_id)
            else:
                children.setdefault(parent_id, []).append(step_id)

        return children

    def _render_tree(
        self,
        tree: Dict[str, List[str]],
        steps: List[Dict[str, Any]],
        parent_id: str = "__root__",
        indent: int = 0,
        step_map: Optional[Dict[Any, Dict[str, Any]]] = None,
    ) -> str:
        """Recursively render the children of ``parent_id`` as text.

        Args:
            tree: Mapping produced by ``_build_tree``.
            steps: All step dictionaries.
            parent_id: Node whose children are rendered.
            indent: Nesting depth of those children.
            step_map: ``step_id -> step`` lookup; built once on the first call
                and passed down so recursion does not rebuild it per node.
        """
        if step_map is None:
            step_map = {s.get("step_id", ""): s for s in steps}

        child_ids = tree.get(parent_id, [])
        last_index = len(child_ids) - 1
        chunks: List[str] = []

        for position, step_id in enumerate(child_ids):
            step = step_map.get(step_id, {})
            chunks.append(self._render_node(step, indent, position == last_index))
            if step_id in tree:
                chunks.append(
                    self._render_tree(tree, steps, step_id, indent + 1, step_map)
                )

        return "\n".join(chunks)

    def _render_node(self, step: Dict[str, Any], indent: int, is_last: bool) -> str:
        """Render one step: a connector line followed by its detail lines.

        Args:
            step: Step dictionary.
            indent: Nesting depth (four columns per level).
            is_last: Whether this is the last child of its parent; selects the
                ``└──`` connector and drops the vertical guide on detail lines.
        """
        prefix = "    " * indent
        connector = "└── " if is_last else "├── "
        # Same width as the connector so details line up under the node text.
        child_prefix = prefix + ("    " if is_last else "│   ")

        status = step.get("status", "unknown")
        icon = self._STATUS_ICONS.get(status, "?")
        step_type = step.get("step_type", "unknown")
        description = step.get("description", "")

        lines = [f"{prefix}{connector}[{icon}] {step_type}: {description}"]

        # Only the first 8 characters of ids are shown.
        lines.append(f"{child_prefix}id: {self._short_id(step.get('step_id'), 8)}...")

        sequence = step.get("sequence")
        if sequence is not None:
            lines.append(f"{child_prefix}sequence: {sequence}")
        lines.append(f"{child_prefix}status: {status}")
        parent_id = step.get("parent_id")
        if parent_id:
            lines.append(f"{child_prefix}parent_id: {self._short_id(parent_id, 8)}...")

        # Execution metrics
        if step.get("duration_ms") is not None:
            lines.append(f"{child_prefix}duration: {step.get('duration_ms')}ms")
        if step.get("tokens") is not None:
            lines.append(f"{child_prefix}tokens: {step.get('tokens')}")
        if step.get("cost") is not None:
            lines.append(f"{child_prefix}cost: ${step.get('cost'):.4f}")

        if step.get("summary"):
            summary = self._truncate(str(step.get("summary")), 100)
            lines.append(f"{child_prefix}summary: {summary}")

        error = step.get("error")
        if error:
            code, message, retryable = self._error_fields(error)
            lines.append(f"{child_prefix}error:")
            lines.append(f"{child_prefix}  code: {code}")
            lines.append(f"{child_prefix}  message: {self._truncate(message, 200)}")
            lines.append(f"{child_prefix}  retryable: {retryable}")

        data = step.get("data", {})
        if data:
            lines.append(f"{child_prefix}data:")
            lines.append(self._format_data(data, child_prefix + "  ", max_value_len=150))

        created_at = step.get("created_at", "")
        if created_at:
            lines.append(f"{child_prefix}time: {self._format_time(created_at)}")

        lines.append("")  # blank line between nodes
        return "\n".join(lines)

    def _format_data(self, data: Dict[str, Any], prefix: str, max_value_len: int = 150) -> str:
        """Format a step's ``data`` dict for the text dump, truncating values.

        Args:
            data: Mapping to render, one ``key: value`` entry per item.
            prefix: Text put in front of every entry.
            max_value_len: Maximum characters kept per value.
        """
        lines: List[str] = []
        for key, value in data.items():
            if isinstance(value, str):
                # Image payloads are replaced by a size note.
                if value.startswith("data:image") or self._looks_like_raw_blob(value):
                    lines.append(f"{prefix}{key}: [IMAGE_DATA: {len(value)} chars, truncated]")
                    continue

                if len(value) > max_value_len:
                    value_str = value[:max_value_len] + f"... ({len(value)} chars)"
                else:
                    value_str = value

                # Multi-line strings: keep the first line plus a line count.
                if "\n" in value_str:
                    first_line = value_str.split("\n")[0]
                    line_count = value.count("\n") + 1
                    value_str = first_line + f"... ({line_count} lines)"
            elif isinstance(value, (dict, list)):
                value_str = self._to_json(value)
                if len(value_str) > max_value_len:
                    value_str = value_str[:max_value_len] + "..."
                # Indent continuation lines under the key.
                value_str = value_str.replace("\n", "\n" + prefix + "  ")
            else:
                value_str = str(value)

            lines.append(f"{prefix}{key}: {value_str}")

        return "\n".join(lines)

    # ------------------------------------------------------------------
    # Markdown dump
    # ------------------------------------------------------------------

    def dump_markdown(
        self,
        trace: Optional[Dict[str, Any]] = None,
        steps: Optional[List[Dict[str, Any]]] = None,
        title: str = "Step Tree Debug",
        output_path: Optional[str] = None,
    ) -> str:
        """Write the tree as Markdown with collapsible sections.

        Args:
            trace: Trace dictionary (optional).
            steps: List of step dictionaries.
            title: Document title.
            output_path: Target file. When omitted, ``self.output_path`` with
                its suffix replaced by ``.md`` is used.

        Returns:
            The Markdown that was written.
        """
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        lines: List[str] = [f"# {title}", "", f"*Generated: {stamp}*", ""]

        if trace:
            total_cost = trace.get('total_cost') or 0.0
            lines.append("## Trace")
            lines.append("")
            lines.append(f"- **trace_id**: `{trace.get('trace_id', 'N/A')}`")
            lines.append(f"- **task**: {trace.get('task', 'N/A')}")
            lines.append(f"- **status**: {trace.get('status', 'N/A')}")
            lines.append(f"- **total_steps**: {trace.get('total_steps', 0)}")
            lines.append(f"- **total_tokens**: {trace.get('total_tokens', 0)}")
            lines.append(f"- **total_cost**: ${total_cost:.4f}")
            lines.append("")

        if steps:
            stats = self._calculate_statistics(steps)
            lines.append("## Statistics")
            lines.append("")
            lines.append(f"- **Total steps**: {stats['total']}")
            lines.append("")
            lines.append("**By type:**")
            lines.append("")
            for step_type, count in self._sorted_counts(stats['by_type']):
                lines.append(f"- `{step_type}`: {count}")
            lines.append("")
            lines.append("**By status:**")
            lines.append("")
            for status, count in self._sorted_counts(stats['by_status']):
                lines.append(f"- `{status}`: {count}")
            lines.append("")
            if stats['total_duration_ms'] > 0:
                lines.append(f"- **Total duration**: {stats['total_duration_ms']}ms")
            if stats['total_tokens'] > 0:
                lines.append(f"- **Total tokens**: {stats['total_tokens']}")
            if stats['total_cost'] > 0:
                lines.append(f"- **Total cost**: ${stats['total_cost']:.4f}")
            lines.append("")

            lines.append("## Steps")
            lines.append("")
            tree = self._build_tree(steps)
            step_map = {s.get("step_id", ""): s for s in steps}
            lines.append(self._render_markdown_tree(tree, step_map, level=3))

        content = "\n".join(lines)

        if output_path is None:
            # Swap only the final suffix; a text replace would also touch
            # directory names and would leave non-.txt paths unchanged.
            target = self.output_path.with_suffix(".md")
        else:
            target = Path(output_path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")
        return content

    def _render_markdown_tree(
        self,
        tree: Dict[str, List[str]],
        step_map: Dict[str, Dict[str, Any]],
        parent_id: str = "__root__",
        level: int = 3,
    ) -> str:
        """Recursively render the children of ``parent_id`` as Markdown.

        Args:
            tree: Mapping produced by ``_build_tree``.
            step_map: ``step_id -> step`` lookup.
            parent_id: Node whose children are rendered.
            level: Heading level used for those children.
        """
        chunks: List[str] = []
        for step_id in tree.get(parent_id, []):
            chunks.append(self._render_markdown_node(step_map.get(step_id, {}), level))
            if step_id in tree:
                chunks.append(
                    self._render_markdown_tree(tree, step_map, step_id, level + 1)
                )
        return "\n".join(chunks)

    def _render_markdown_node(self, step: Dict[str, Any], level: int) -> str:
        """Render one step as a Markdown heading plus detail sections.

        Args:
            step: Step dictionary.
            level: Requested heading level; clamped to 1..6 because Markdown
                has no deeper headings.
        """
        status = step.get("status", "unknown")
        icon = self._STATUS_ICONS.get(status, "?")
        step_type = step.get("step_type", "unknown")
        description = step.get("description", "")
        heading = "#" * min(max(level, 1), 6)

        lines: List[str] = [f"{heading} [{icon}] {step_type}: {description}", ""]

        # Basic information
        lines.append("**基本信息**")
        lines.append("")
        lines.append(f"- **id**: `{self._short_id(step.get('step_id'), 16)}...`")

        sequence = step.get("sequence")
        if sequence is not None:
            lines.append(f"- **sequence**: {sequence}")
        lines.append(f"- **status**: {status}")
        parent_id = step.get("parent_id")
        if parent_id:
            lines.append(f"- **parent_id**: `{self._short_id(parent_id, 16)}...`")

        # Execution metrics
        if step.get("duration_ms") is not None:
            lines.append(f"- **duration**: {step.get('duration_ms')}ms")
        if step.get("tokens") is not None:
            lines.append(f"- **tokens**: {step.get('tokens')}")
        if step.get("cost") is not None:
            lines.append(f"- **cost**: ${step.get('cost'):.4f}")

        created_at = step.get("created_at", "")
        if created_at:
            lines.append(f"- **time**: {self._format_time(created_at)}")
        lines.append("")

        error = step.get("error")
        if error:
            code, message, retryable = self._error_fields(error)
            lines.extend(self._collapsible("❌ Error", [
                f"- **code**: `{code}`",
                f"- **retryable**: {retryable}",
                "- **message**:",
                "```",
                self._truncate(message, 500),
                "```",
            ]))
            lines.append("")

        if step.get("summary"):
            summary = self._truncate(str(step.get('summary')), 1000)
            lines.extend(self._collapsible("📝 Summary", [f"```\n{summary}\n```"]))
            lines.append("")

        data = step.get("data", {})
        if data:
            lines.append(self._render_markdown_data(data))
            lines.append("")

        return "\n".join(lines)

    def _render_markdown_data(self, data: Dict[str, Any]) -> str:
        """Render a ``data`` dict as Markdown, well-known keys first.

        Keys listed in ``_DATA_KEY_ORDER`` come first in that order; all other
        keys follow sorted by name.
        """
        ordered = [key for key in self._DATA_KEY_ORDER if key in data]
        rest = sorted((key for key in data if key not in ordered), key=str)
        return "\n".join(self._render_data_item(key, data[key]) for key in ordered + rest)

    def _render_data_item(self, key: str, value: Any) -> str:
        """Render one ``data`` entry, collapsing long or structured values.

        ``None`` yields an empty string. ``messages`` lists containing
        blob-backed images get a summary that shows each image as a short
        placeholder. Short single-line scalars are rendered inline; everything
        else goes into a ``<details>`` block with truncation applied.
        """
        icon = self._DATA_ICONS.get(key, "📄")

        if value is None:
            return ""

        if key == 'messages' and isinstance(value, list):
            rendered = self._render_image_messages(value)
            if rendered is not None:
                return rendered

        if isinstance(value, str):
            needs_collapse = len(value) > 100 or "\n" in value
        else:
            needs_collapse = isinstance(value, (dict, list))

        if not needs_collapse:
            # Simple value: show inline.
            return f"- **{icon} {key}**: `{value}`"

        if isinstance(value, str):
            if "data:image" in value or self._looks_like_raw_blob(value):
                body = ["```", f"[IMAGE DATA: {len(value)} chars, truncated for display]", "```"]
            elif len(value) > 2000:
                # Very long text: keep only the first 500 characters.
                body = [
                    "```",
                    value[:500],
                    f"... (truncated, total {len(value)} chars)",
                    "```",
                ]
            else:
                body = ["```", value, "```"]
        else:
            # Strip embedded image payloads before serialising.
            json_str = self._to_json(self._truncate_image_data(value))
            if len(json_str) > 3000:
                json_str = json_str[:3000] + "\n... (truncated)"
            body = ["```json", json_str, "```"]

        return "\n".join(self._collapsible(f"{icon} {key.capitalize()}", body))

    def _render_image_messages(self, messages: List[Any]) -> Optional[str]:
        """Render ``messages`` with blob images shown as placeholders.

        Returns None when no message contains a blob-backed image, so the
        caller falls back to the generic rendering.
        """
        image_count = 0
        for msg in messages:
            if isinstance(msg, dict) and isinstance(msg.get('content'), list):
                image_count += sum(
                    1 for item in msg['content'] if self._blob_image(item) is not None
                )
        if image_count == 0:
            return None

        simplified_messages = []
        for msg in messages:
            if not isinstance(msg, dict) or not isinstance(msg.get('content'), list):
                simplified_messages.append(msg)
                continue

            new_content = []
            for item in msg['content']:
                image_url = self._blob_image(item)
                if image_url is None:
                    new_content.append(item)
                    continue
                blob_ref = image_url['url'][len('blob://'):]
                size = image_url.get('size') or 0
                size_kb = size / 1024 if size > 0 else 0
                new_content.append({
                    'type': 'image_url',
                    'image_url': {
                        'url': f'[IMAGE: {blob_ref[:8]}... ({size_kb:.1f}KB)]'
                    }
                })

            simplified_msg = msg.copy()
            simplified_msg['content'] = new_content
            simplified_messages.append(simplified_msg)

        body = ["```json", self._to_json(simplified_messages), "```"]
        return "\n".join(
            self._collapsible(f"📨 Messages (含 {image_count} 张图片)", body)
        )

    def _truncate_image_data(self, obj: Any, max_length: int = 200) -> Any:
        """Return a copy of ``obj`` with image payloads replaced by placeholders.

        Dicts and lists are walked recursively. Strings are replaced when they
        are ``data:image`` URIs, ``blob://`` references, or longer than 100000
        characters (possibly undetected base64); other values pass through.

        Args:
            obj: Value to sanitise.
            max_length: Characters kept when a string is cut rather than
                replaced by a placeholder.
        """
        if isinstance(obj, dict):
            return {
                key: self._truncate_image_data(value, max_length)
                for key, value in obj.items()
            }
        if isinstance(obj, list):
            return [self._truncate_image_data(item, max_length) for item in obj]
        if not isinstance(obj, str):
            return obj

        if obj.startswith("data:image"):
            # data:image/<type>;base64,<payload> -> keep the header, report the size.
            header_end = obj.find(",")
            if header_end > 0:
                header = obj[:header_end]
                data_size_kb = (len(obj) - header_end - 1) / 1024
                return f"[IMAGE_DATA: {header}, {data_size_kb:.1f}KB]"
            return obj[:max_length] + f"... ({len(obj)} chars)"

        if obj.startswith("blob://"):
            blob_ref = obj[len("blob://"):]
            return f"[BLOB_REF: {blob_ref[:8]}...]"

        if len(obj) > 100000:
            return obj[:max_length] + f"... (TRUNCATED: {len(obj)} chars total)"

        return obj
def dump_tree(
    trace: Optional[Any] = None,
    steps: Optional[List[Any]] = None,
    output_path: str = DEFAULT_DUMP_PATH,
    title: str = "Step Tree Debug",
) -> str:
    """Convenience wrapper: write the step tree as plain text.

    Objects exposing ``to_dict()`` are converted with it; anything else is
    passed through unchanged.

    Args:
        trace: Trace object or dictionary.
        steps: List of step objects or dictionaries.
        output_path: File to write.
        title: Title printed in the header.

    Returns:
        The text that was written.

    Example:
        # call after every step change
        dump_tree(trace, steps)
        # custom path
        dump_tree(trace, steps, output_path=".debug/my_trace.txt")
    """
    if trace is not None and hasattr(trace, "to_dict"):
        trace_dict = trace.to_dict()
    else:
        trace_dict = trace

    steps_list = (
        [item.to_dict() if hasattr(item, "to_dict") else item for item in steps]
        if steps
        else []
    )

    return StepTreeDumper(output_path).dump(trace_dict, steps_list, title)
def dump_json(
    trace: Optional[Any] = None,
    steps: Optional[List[Any]] = None,
    output_path: str = DEFAULT_JSON_PATH,
) -> str:
    """Write the trace and steps as JSON for programmatic analysis.

    Objects exposing ``to_dict()`` are converted with it; anything else is
    passed through unchanged. The document has the keys ``generated_at``,
    ``trace`` and ``steps``.

    Args:
        trace: Trace object or dictionary.
        steps: List of step objects or dictionaries.
        output_path: File to write; its parent directory is created if needed.

    Returns:
        The JSON string that was written.
    """
    path = Path(output_path)
    path.parent.mkdir(parents=True, exist_ok=True)

    def _as_plain(obj: Any) -> Any:
        return obj.to_dict() if hasattr(obj, "to_dict") else obj

    def _json_fallback(value: Any) -> Any:
        # Deliberate fallback for values json cannot encode natively: this is
        # a debug dump, so degrade to text rather than raise TypeError.
        if hasattr(value, "isoformat"):
            return value.isoformat()
        if hasattr(value, "to_dict"):
            return value.to_dict()
        return str(value)

    data = {
        "generated_at": datetime.now().isoformat(),
        "trace": None if trace is None else _as_plain(trace),
        "steps": [_as_plain(step) for step in steps] if steps else [],
    }

    content = json.dumps(data, ensure_ascii=False, indent=2, default=_json_fallback)
    path.write_text(content, encoding="utf-8")
    return content
def dump_markdown(
    trace: Optional[Any] = None,
    steps: Optional[List[Any]] = None,
    output_path: str = DEFAULT_MD_PATH,
    title: str = "Step Tree Debug",
) -> str:
    """Convenience wrapper: write the step tree as Markdown.

    Objects exposing ``to_dict()`` are converted with it; anything else is
    passed through unchanged.

    Args:
        trace: Trace object or dictionary.
        steps: List of step objects or dictionaries.
        output_path: File to write (defaults to ``.trace/tree.md``).
        title: Document title.

    Returns:
        The Markdown that was written.

    Example:
        dump_markdown(trace, steps)
        # custom path
        dump_markdown(trace, steps, output_path=".debug/debug.md")
    """
    if trace is not None and hasattr(trace, "to_dict"):
        trace_dict = trace.to_dict()
    else:
        trace_dict = trace

    steps_list = (
        [item.to_dict() if hasattr(item, "to_dict") else item for item in steps]
        if steps
        else []
    )

    return StepTreeDumper(output_path).dump_markdown(
        trace_dict, steps_list, title, output_path
    )