|
|
@@ -23,6 +23,7 @@ from typing import Any, Dict, List, Optional
|
|
|
# 默认输出路径
|
|
|
DEFAULT_DUMP_PATH = ".trace/tree.txt"
|
|
|
DEFAULT_JSON_PATH = ".trace/tree.json"
|
|
|
+DEFAULT_MD_PATH = ".trace/tree.md"
|
|
|
|
|
|
|
|
|
class StepTreeDumper:
|
|
|
@@ -228,6 +229,258 @@ class StepTreeDumper:
|
|
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
+ def dump_markdown(
|
|
|
+ self,
|
|
|
+ trace: Optional[Dict[str, Any]] = None,
|
|
|
+ steps: Optional[List[Dict[str, Any]]] = None,
|
|
|
+ title: str = "Step Tree Debug",
|
|
|
+ output_path: Optional[str] = None,
|
|
|
+ ) -> str:
|
|
|
+ """
|
|
|
+ 输出 Markdown 格式(支持折叠,完整内容)
|
|
|
+
|
|
|
+ Args:
|
|
|
+ trace: Trace 字典(可选)
|
|
|
+ steps: Step 字典列表
|
|
|
+ title: 输出标题
|
|
|
+ output_path: 输出路径(默认 .trace/tree.md)
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 输出的 Markdown 内容
|
|
|
+ """
|
|
|
+ lines = []
|
|
|
+
|
|
|
+ # 标题
|
|
|
+ lines.append(f"# {title}")
|
|
|
+ lines.append("")
|
|
|
+ lines.append(f"*Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*")
|
|
|
+ lines.append("")
|
|
|
+
|
|
|
+ # Trace 信息
|
|
|
+ if trace:
|
|
|
+ lines.append("## Trace")
|
|
|
+ lines.append("")
|
|
|
+ lines.append(f"- **trace_id**: `{trace.get('trace_id', 'N/A')}`")
|
|
|
+ lines.append(f"- **task**: {trace.get('task', 'N/A')}")
|
|
|
+ lines.append(f"- **status**: {trace.get('status', 'N/A')}")
|
|
|
+ lines.append(f"- **total_steps**: {trace.get('total_steps', 0)}")
|
|
|
+ lines.append(f"- **total_tokens**: {trace.get('total_tokens', 0)}")
|
|
|
+ lines.append(f"- **total_cost**: ${trace.get('total_cost', 0.0):.4f}")
|
|
|
+ lines.append("")
|
|
|
+
|
|
|
+ # Steps
|
|
|
+ if steps:
|
|
|
+ lines.append("## Steps")
|
|
|
+ lines.append("")
|
|
|
+
|
|
|
+ # 构建树并渲染为 Markdown
|
|
|
+ tree = self._build_tree(steps)
|
|
|
+ step_map = {s.get("step_id"): s for s in steps}
|
|
|
+ md_output = self._render_markdown_tree(tree, step_map, level=3)
|
|
|
+ lines.append(md_output)
|
|
|
+
|
|
|
+ content = "\n".join(lines)
|
|
|
+
|
|
|
+ # 写入文件
|
|
|
+ if output_path is None:
|
|
|
+ output_path = str(self.output_path).replace(".txt", ".md")
|
|
|
+
|
|
|
+ Path(output_path).write_text(content, encoding="utf-8")
|
|
|
+ return content
|
|
|
+
|
|
|
+ def _render_markdown_tree(
|
|
|
+ self,
|
|
|
+ tree: Dict[str, List[str]],
|
|
|
+ step_map: Dict[str, Dict[str, Any]],
|
|
|
+ parent_id: str = "__root__",
|
|
|
+ level: int = 3,
|
|
|
+ ) -> str:
|
|
|
+ """递归渲染 Markdown 树"""
|
|
|
+ lines = []
|
|
|
+ child_ids = tree.get(parent_id, [])
|
|
|
+
|
|
|
+ for step_id in child_ids:
|
|
|
+ step = step_map.get(step_id, {})
|
|
|
+
|
|
|
+ # 渲染节点
|
|
|
+ node_md = self._render_markdown_node(step, level)
|
|
|
+ lines.append(node_md)
|
|
|
+
|
|
|
+ # 递归子节点
|
|
|
+ if step_id in tree:
|
|
|
+ child_md = self._render_markdown_tree(tree, step_map, step_id, level + 1)
|
|
|
+ lines.append(child_md)
|
|
|
+
|
|
|
+ return "\n".join(lines)
|
|
|
+
|
|
|
+ def _render_markdown_node(self, step: Dict[str, Any], level: int) -> str:
|
|
|
+ """渲染单个节点的 Markdown"""
|
|
|
+ lines = []
|
|
|
+
|
|
|
+ # 标题
|
|
|
+ status = step.get("status", "unknown")
|
|
|
+ status_icons = {
|
|
|
+ "completed": "✓",
|
|
|
+ "in_progress": "→",
|
|
|
+ "planned": "○",
|
|
|
+ "failed": "✗",
|
|
|
+ "skipped": "⊘",
|
|
|
+ }
|
|
|
+ icon = status_icons.get(status, "?")
|
|
|
+
|
|
|
+ step_type = step.get("step_type", "unknown")
|
|
|
+ description = step.get("description", "")
|
|
|
+ heading = "#" * level
|
|
|
+
|
|
|
+ lines.append(f"{heading} [{icon}] {step_type}: {description}")
|
|
|
+ lines.append("")
|
|
|
+
|
|
|
+ # 基本信息
|
|
|
+ lines.append("**基本信息**")
|
|
|
+ lines.append("")
|
|
|
+ step_id = step.get("step_id", "")[:16]
|
|
|
+ lines.append(f"- **id**: `{step_id}...`")
|
|
|
+
|
|
|
+ if step.get("duration_ms") is not None:
|
|
|
+ lines.append(f"- **duration**: {step.get('duration_ms')}ms")
|
|
|
+ if step.get("tokens") is not None:
|
|
|
+ lines.append(f"- **tokens**: {step.get('tokens')}")
|
|
|
+ if step.get("cost") is not None:
|
|
|
+ lines.append(f"- **cost**: ${step.get('cost'):.4f}")
|
|
|
+
|
|
|
+ created_at = step.get("created_at", "")
|
|
|
+ if created_at:
|
|
|
+ if isinstance(created_at, str):
|
|
|
+ time_part = created_at.split("T")[-1][:8] if "T" in created_at else created_at
|
|
|
+ else:
|
|
|
+ time_part = created_at.strftime("%H:%M:%S")
|
|
|
+ lines.append(f"- **time**: {time_part}")
|
|
|
+
|
|
|
+ lines.append("")
|
|
|
+
|
|
|
+ # Summary
|
|
|
+ if step.get("summary"):
|
|
|
+ lines.append("<details>")
|
|
|
+ lines.append("<summary><b>📝 Summary</b></summary>")
|
|
|
+ lines.append("")
|
|
|
+ lines.append(f"```\n{step.get('summary')}\n```")
|
|
|
+ lines.append("")
|
|
|
+ lines.append("</details>")
|
|
|
+ lines.append("")
|
|
|
+
|
|
|
+ # Data(完整输出,不截断)
|
|
|
+ data = step.get("data", {})
|
|
|
+ if data:
|
|
|
+ lines.append(self._render_markdown_data(data))
|
|
|
+ lines.append("")
|
|
|
+
|
|
|
+ return "\n".join(lines)
|
|
|
+
|
|
|
+ def _render_markdown_data(self, data: Dict[str, Any]) -> str:
|
|
|
+ """渲染 data 字典为可折叠的 Markdown"""
|
|
|
+ lines = []
|
|
|
+
|
|
|
+ # 定义输出顺序(重要的放前面)
|
|
|
+ key_order = ["messages", "tools", "response", "content", "tool_calls", "model"]
|
|
|
+
|
|
|
+ # 先按顺序输出重要的 key
|
|
|
+ remaining_keys = set(data.keys())
|
|
|
+ for key in key_order:
|
|
|
+ if key in data:
|
|
|
+ lines.append(self._render_data_item(key, data[key]))
|
|
|
+ remaining_keys.remove(key)
|
|
|
+
|
|
|
+ # 再输出剩余的 key
|
|
|
+ for key in sorted(remaining_keys):
|
|
|
+ lines.append(self._render_data_item(key, data[key]))
|
|
|
+
|
|
|
+ return "\n".join(lines)
|
|
|
+
|
|
|
+ def _render_data_item(self, key: str, value: Any) -> str:
|
|
|
+ """渲染单个 data 项"""
|
|
|
+ # 确定图标
|
|
|
+ icon_map = {
|
|
|
+ "messages": "📨",
|
|
|
+ "response": "🤖",
|
|
|
+ "tools": "🛠️",
|
|
|
+ "tool_calls": "🔧",
|
|
|
+ "model": "🎯",
|
|
|
+ "error": "❌",
|
|
|
+ "content": "💬",
|
|
|
+ }
|
|
|
+ icon = icon_map.get(key, "📄")
|
|
|
+
|
|
|
+ # 特殊处理:跳过 None 值
|
|
|
+ if value is None:
|
|
|
+ return ""
|
|
|
+
|
|
|
+ # 判断是否需要折叠(长内容或复杂结构)
|
|
|
+ needs_collapse = False
|
|
|
+ if isinstance(value, str):
|
|
|
+ needs_collapse = len(value) > 100 or "\n" in value
|
|
|
+ elif isinstance(value, (dict, list)):
|
|
|
+ needs_collapse = True
|
|
|
+
|
|
|
+ if needs_collapse:
|
|
|
+ lines = []
|
|
|
+ # 可折叠块
|
|
|
+ lines.append("<details>")
|
|
|
+ lines.append(f"<summary><b>{icon} {key.capitalize()}</b></summary>")
|
|
|
+ lines.append("")
|
|
|
+
|
|
|
+ # 格式化内容
|
|
|
+ if isinstance(value, str):
|
|
|
+ # 检查是否包含图片 base64
|
|
|
+ if "data:image" in value or (isinstance(value, str) and len(value) > 10000):
|
|
|
+ lines.append("```")
|
|
|
+ lines.append(f"[IMAGE DATA: {len(value)} chars, truncated for display]")
|
|
|
+ lines.append(value[:200] + "...")
|
|
|
+ lines.append("```")
|
|
|
+ else:
|
|
|
+ lines.append("```")
|
|
|
+ lines.append(value)
|
|
|
+ lines.append("```")
|
|
|
+ elif isinstance(value, (dict, list)):
|
|
|
+ # 递归截断图片 base64
|
|
|
+ truncated_value = self._truncate_image_data(value)
|
|
|
+ lines.append("```json")
|
|
|
+ lines.append(json.dumps(truncated_value, ensure_ascii=False, indent=2))
|
|
|
+ lines.append("```")
|
|
|
+
|
|
|
+ lines.append("")
|
|
|
+ lines.append("</details>")
|
|
|
+ return "\n".join(lines)
|
|
|
+ else:
|
|
|
+ # 简单值,直接显示
|
|
|
+ return f"- **{icon} {key}**: `{value}`"
|
|
|
+
|
|
|
+ def _truncate_image_data(self, obj: Any, max_length: int = 200) -> Any:
|
|
|
+ """递归截断对象中的图片 base64 数据"""
|
|
|
+ if isinstance(obj, dict):
|
|
|
+ result = {}
|
|
|
+ for key, value in obj.items():
|
|
|
+ # 检测图片 URL(data:image/...;base64,...)
|
|
|
+ if isinstance(value, str) and value.startswith("data:image"):
|
|
|
+ # 提取 MIME 类型和数据长度
|
|
|
+ header_end = value.find(",")
|
|
|
+ if header_end > 0:
|
|
|
+ header = value[:header_end]
|
|
|
+ data = value[header_end+1:]
|
|
|
+ data_size_kb = len(data) / 1024
|
|
|
+ result[key] = f"<IMAGE_DATA: {data_size_kb:.1f}KB, {header}, preview: {data[:50]}...>"
|
|
|
+ else:
|
|
|
+ result[key] = value[:max_length] + f"... ({len(value)} chars)"
|
|
|
+ else:
|
|
|
+ result[key] = self._truncate_image_data(value, max_length)
|
|
|
+ return result
|
|
|
+ elif isinstance(obj, list):
|
|
|
+ return [self._truncate_image_data(item, max_length) for item in obj]
|
|
|
+ elif isinstance(obj, str) and len(obj) > 100000:
|
|
|
+ # 超长字符串(可能是未检测到的 base64)
|
|
|
+ return obj[:max_length] + f"... (TRUNCATED: {len(obj)} chars total)"
|
|
|
+ else:
|
|
|
+ return obj
|
|
|
+
|
|
|
|
|
|
def dump_tree(
|
|
|
trace: Optional[Any] = None,
|
|
|
@@ -315,3 +568,47 @@ def dump_json(
|
|
|
path.write_text(content, encoding="utf-8")
|
|
|
|
|
|
return content
|
|
|
+
|
|
|
+
|
|
|
+def dump_markdown(
|
|
|
+ trace: Optional[Any] = None,
|
|
|
+ steps: Optional[List[Any]] = None,
|
|
|
+ output_path: str = DEFAULT_MD_PATH,
|
|
|
+ title: str = "Step Tree Debug",
|
|
|
+) -> str:
|
|
|
+ """
|
|
|
+ 便捷函数:输出 Markdown 格式(支持折叠,完整内容)
|
|
|
+
|
|
|
+ Args:
|
|
|
+ trace: Trace 对象或字典
|
|
|
+ steps: Step 对象或字典列表
|
|
|
+ output_path: 输出文件路径(默认 .trace/tree.md)
|
|
|
+ title: 输出标题
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 输出的 Markdown 内容
|
|
|
+
|
|
|
+ 示例:
|
|
|
+ from agent.debug import dump_markdown
|
|
|
+
|
|
|
+ # 输出完整可折叠的 Markdown
|
|
|
+ dump_markdown(trace, steps)
|
|
|
+
|
|
|
+ # 自定义路径
|
|
|
+ dump_markdown(trace, steps, output_path=".debug/debug.md")
|
|
|
+ """
|
|
|
+ # 转换为字典
|
|
|
+ trace_dict = None
|
|
|
+ if trace is not None:
|
|
|
+ trace_dict = trace.to_dict() if hasattr(trace, "to_dict") else trace
|
|
|
+
|
|
|
+ steps_list = []
|
|
|
+ if steps:
|
|
|
+ for step in steps:
|
|
|
+ if hasattr(step, "to_dict"):
|
|
|
+ steps_list.append(step.to_dict())
|
|
|
+ else:
|
|
|
+ steps_list.append(step)
|
|
|
+
|
|
|
+ dumper = StepTreeDumper(output_path)
|
|
|
+ return dumper.dump_markdown(trace_dict, steps_list, title, output_path)
|