# tree_dump.py
"""
Step tree debug output.

Writes the Step tree to a file in full detail to help with development and
debugging.

Usage:
    1. Watch the file live from the command line:
        watch -n 0.5 cat .trace/tree.txt
    2. Open the file in VS Code (it refreshes automatically):
        code .trace/tree.txt
    3. Call it from code:
        from agent.debug import dump_tree
        dump_tree(trace, steps)
"""
  13. import json
  14. from datetime import datetime
  15. from pathlib import Path
  16. from typing import Any, Dict, List, Optional
  17. # 默认输出路径
  18. DEFAULT_DUMP_PATH = ".trace/tree.txt"
  19. DEFAULT_JSON_PATH = ".trace/tree.json"
  20. DEFAULT_MD_PATH = ".trace/tree.md"
  21. class StepTreeDumper:
  22. """Step 树 Debug 输出器"""
  23. def __init__(self, output_path: str = DEFAULT_DUMP_PATH):
  24. self.output_path = Path(output_path)
  25. self.output_path.parent.mkdir(parents=True, exist_ok=True)
  26. def dump(
  27. self,
  28. trace: Optional[Dict[str, Any]] = None,
  29. steps: Optional[List[Dict[str, Any]]] = None,
  30. title: str = "Step Tree Debug",
  31. ) -> str:
  32. """
  33. 输出完整的树形结构到文件
  34. Args:
  35. trace: Trace 字典(可选)
  36. steps: Step 字典列表
  37. title: 输出标题
  38. Returns:
  39. 输出的文本内容
  40. """
  41. lines = []
  42. # 标题和时间
  43. lines.append("=" * 60)
  44. lines.append(f" {title}")
  45. lines.append(f" Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  46. lines.append("=" * 60)
  47. lines.append("")
  48. # Trace 信息
  49. if trace:
  50. lines.append("## Trace")
  51. lines.append(f" trace_id: {trace.get('trace_id', 'N/A')}")
  52. lines.append(f" task: {trace.get('task', 'N/A')}")
  53. lines.append(f" status: {trace.get('status', 'N/A')}")
  54. lines.append(f" total_steps: {trace.get('total_steps', 0)}")
  55. lines.append(f" total_tokens: {trace.get('total_tokens', 0)}")
  56. lines.append(f" total_cost: {trace.get('total_cost', 0.0):.4f}")
  57. lines.append("")
  58. # Step 树
  59. if steps:
  60. lines.append("## Steps")
  61. lines.append("")
  62. # 构建树结构
  63. tree = self._build_tree(steps)
  64. tree_output = self._render_tree(tree, steps)
  65. lines.append(tree_output)
  66. content = "\n".join(lines)
  67. # 写入文件
  68. self.output_path.write_text(content, encoding="utf-8")
  69. return content
  70. def _build_tree(self, steps: List[Dict[str, Any]]) -> Dict[str, List[str]]:
  71. """构建父子关系映射"""
  72. # parent_id -> [child_ids]
  73. children: Dict[str, List[str]] = {"__root__": []}
  74. for step in steps:
  75. step_id = step.get("step_id", "")
  76. parent_id = step.get("parent_id")
  77. if parent_id is None:
  78. children["__root__"].append(step_id)
  79. else:
  80. if parent_id not in children:
  81. children[parent_id] = []
  82. children[parent_id].append(step_id)
  83. return children
  84. def _render_tree(
  85. self,
  86. tree: Dict[str, List[str]],
  87. steps: List[Dict[str, Any]],
  88. parent_id: str = "__root__",
  89. indent: int = 0,
  90. ) -> str:
  91. """递归渲染树结构"""
  92. # step_id -> step 映射
  93. step_map = {s.get("step_id"): s for s in steps}
  94. lines = []
  95. child_ids = tree.get(parent_id, [])
  96. for i, step_id in enumerate(child_ids):
  97. step = step_map.get(step_id, {})
  98. is_last = i == len(child_ids) - 1
  99. # 渲染当前节点
  100. node_output = self._render_node(step, indent, is_last)
  101. lines.append(node_output)
  102. # 递归渲染子节点
  103. if step_id in tree:
  104. child_output = self._render_tree(tree, steps, step_id, indent + 1)
  105. lines.append(child_output)
  106. return "\n".join(lines)
  107. def _render_node(self, step: Dict[str, Any], indent: int, is_last: bool) -> str:
  108. """渲染单个节点的完整信息"""
  109. lines = []
  110. # 缩进和连接符
  111. prefix = " " * indent
  112. connector = "└── " if is_last else "├── "
  113. child_prefix = " " * indent + (" " if is_last else "│ ")
  114. # 状态图标
  115. status = step.get("status", "unknown")
  116. status_icons = {
  117. "completed": "✓",
  118. "in_progress": "→",
  119. "planned": "○",
  120. "failed": "✗",
  121. "skipped": "⊘",
  122. }
  123. icon = status_icons.get(status, "?")
  124. # 类型和描述
  125. step_type = step.get("step_type", "unknown")
  126. description = step.get("description", "")
  127. # 第一行:类型和描述
  128. lines.append(f"{prefix}{connector}[{icon}] {step_type}: {description}")
  129. # 详细信息
  130. step_id = step.get("step_id", "")[:8] # 只显示前 8 位
  131. lines.append(f"{child_prefix}id: {step_id}...")
  132. # 执行指标
  133. if step.get("duration_ms") is not None:
  134. lines.append(f"{child_prefix}duration: {step.get('duration_ms')}ms")
  135. if step.get("tokens") is not None:
  136. lines.append(f"{child_prefix}tokens: {step.get('tokens')}")
  137. if step.get("cost") is not None:
  138. lines.append(f"{child_prefix}cost: ${step.get('cost'):.4f}")
  139. # summary(如果有)
  140. if step.get("summary"):
  141. summary = step.get("summary", "")
  142. # 截断长 summary
  143. if len(summary) > 100:
  144. summary = summary[:100] + "..."
  145. lines.append(f"{child_prefix}summary: {summary}")
  146. # data 内容(格式化输出)
  147. data = step.get("data", {})
  148. if data:
  149. lines.append(f"{child_prefix}data:")
  150. data_lines = self._format_data(data, child_prefix + " ")
  151. lines.append(data_lines)
  152. # 时间
  153. created_at = step.get("created_at", "")
  154. if created_at:
  155. if isinstance(created_at, str):
  156. # 只显示时间部分
  157. time_part = created_at.split("T")[-1][:8] if "T" in created_at else created_at
  158. else:
  159. time_part = created_at.strftime("%H:%M:%S")
  160. lines.append(f"{child_prefix}time: {time_part}")
  161. lines.append("") # 空行分隔
  162. return "\n".join(lines)
  163. def _format_data(self, data: Dict[str, Any], prefix: str, max_value_len: int = 200) -> str:
  164. """格式化 data 字典"""
  165. lines = []
  166. for key, value in data.items():
  167. # 格式化值
  168. if isinstance(value, str):
  169. if len(value) > max_value_len:
  170. value_str = value[:max_value_len] + f"... ({len(value)} chars)"
  171. else:
  172. value_str = value
  173. # 处理多行字符串
  174. if "\n" in value_str:
  175. first_line = value_str.split("\n")[0]
  176. value_str = first_line + f"... ({value_str.count(chr(10))+1} lines)"
  177. elif isinstance(value, (dict, list)):
  178. value_str = json.dumps(value, ensure_ascii=False, indent=2)
  179. if len(value_str) > max_value_len:
  180. value_str = value_str[:max_value_len] + "..."
  181. # 缩进多行
  182. value_str = value_str.replace("\n", "\n" + prefix + " ")
  183. else:
  184. value_str = str(value)
  185. lines.append(f"{prefix}{key}: {value_str}")
  186. return "\n".join(lines)
  187. def dump_markdown(
  188. self,
  189. trace: Optional[Dict[str, Any]] = None,
  190. steps: Optional[List[Dict[str, Any]]] = None,
  191. title: str = "Step Tree Debug",
  192. output_path: Optional[str] = None,
  193. ) -> str:
  194. """
  195. 输出 Markdown 格式(支持折叠,完整内容)
  196. Args:
  197. trace: Trace 字典(可选)
  198. steps: Step 字典列表
  199. title: 输出标题
  200. output_path: 输出路径(默认 .trace/tree.md)
  201. Returns:
  202. 输出的 Markdown 内容
  203. """
  204. lines = []
  205. # 标题
  206. lines.append(f"# {title}")
  207. lines.append("")
  208. lines.append(f"*Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*")
  209. lines.append("")
  210. # Trace 信息
  211. if trace:
  212. lines.append("## Trace")
  213. lines.append("")
  214. lines.append(f"- **trace_id**: `{trace.get('trace_id', 'N/A')}`")
  215. lines.append(f"- **task**: {trace.get('task', 'N/A')}")
  216. lines.append(f"- **status**: {trace.get('status', 'N/A')}")
  217. lines.append(f"- **total_steps**: {trace.get('total_steps', 0)}")
  218. lines.append(f"- **total_tokens**: {trace.get('total_tokens', 0)}")
  219. lines.append(f"- **total_cost**: ${trace.get('total_cost', 0.0):.4f}")
  220. lines.append("")
  221. # Steps
  222. if steps:
  223. lines.append("## Steps")
  224. lines.append("")
  225. # 构建树并渲染为 Markdown
  226. tree = self._build_tree(steps)
  227. step_map = {s.get("step_id"): s for s in steps}
  228. md_output = self._render_markdown_tree(tree, step_map, level=3)
  229. lines.append(md_output)
  230. content = "\n".join(lines)
  231. # 写入文件
  232. if output_path is None:
  233. output_path = str(self.output_path).replace(".txt", ".md")
  234. Path(output_path).write_text(content, encoding="utf-8")
  235. return content
  236. def _render_markdown_tree(
  237. self,
  238. tree: Dict[str, List[str]],
  239. step_map: Dict[str, Dict[str, Any]],
  240. parent_id: str = "__root__",
  241. level: int = 3,
  242. ) -> str:
  243. """递归渲染 Markdown 树"""
  244. lines = []
  245. child_ids = tree.get(parent_id, [])
  246. for step_id in child_ids:
  247. step = step_map.get(step_id, {})
  248. # 渲染节点
  249. node_md = self._render_markdown_node(step, level)
  250. lines.append(node_md)
  251. # 递归子节点
  252. if step_id in tree:
  253. child_md = self._render_markdown_tree(tree, step_map, step_id, level + 1)
  254. lines.append(child_md)
  255. return "\n".join(lines)
  256. def _render_markdown_node(self, step: Dict[str, Any], level: int) -> str:
  257. """渲染单个节点的 Markdown"""
  258. lines = []
  259. # 标题
  260. status = step.get("status", "unknown")
  261. status_icons = {
  262. "completed": "✓",
  263. "in_progress": "→",
  264. "planned": "○",
  265. "failed": "✗",
  266. "skipped": "⊘",
  267. }
  268. icon = status_icons.get(status, "?")
  269. step_type = step.get("step_type", "unknown")
  270. description = step.get("description", "")
  271. heading = "#" * level
  272. lines.append(f"{heading} [{icon}] {step_type}: {description}")
  273. lines.append("")
  274. # 基本信息
  275. lines.append("**基本信息**")
  276. lines.append("")
  277. step_id = step.get("step_id", "")[:16]
  278. lines.append(f"- **id**: `{step_id}...`")
  279. if step.get("duration_ms") is not None:
  280. lines.append(f"- **duration**: {step.get('duration_ms')}ms")
  281. if step.get("tokens") is not None:
  282. lines.append(f"- **tokens**: {step.get('tokens')}")
  283. if step.get("cost") is not None:
  284. lines.append(f"- **cost**: ${step.get('cost'):.4f}")
  285. created_at = step.get("created_at", "")
  286. if created_at:
  287. if isinstance(created_at, str):
  288. time_part = created_at.split("T")[-1][:8] if "T" in created_at else created_at
  289. else:
  290. time_part = created_at.strftime("%H:%M:%S")
  291. lines.append(f"- **time**: {time_part}")
  292. lines.append("")
  293. # Summary
  294. if step.get("summary"):
  295. lines.append("<details>")
  296. lines.append("<summary><b>📝 Summary</b></summary>")
  297. lines.append("")
  298. lines.append(f"```\n{step.get('summary')}\n```")
  299. lines.append("")
  300. lines.append("</details>")
  301. lines.append("")
  302. # Data(完整输出,不截断)
  303. data = step.get("data", {})
  304. if data:
  305. lines.append(self._render_markdown_data(data))
  306. lines.append("")
  307. return "\n".join(lines)
  308. def _render_markdown_data(self, data: Dict[str, Any]) -> str:
  309. """渲染 data 字典为可折叠的 Markdown"""
  310. lines = []
  311. # 定义输出顺序(重要的放前面)
  312. key_order = ["messages", "tools", "response", "content", "tool_calls", "model"]
  313. # 先按顺序输出重要的 key
  314. remaining_keys = set(data.keys())
  315. for key in key_order:
  316. if key in data:
  317. lines.append(self._render_data_item(key, data[key]))
  318. remaining_keys.remove(key)
  319. # 再输出剩余的 key
  320. for key in sorted(remaining_keys):
  321. lines.append(self._render_data_item(key, data[key]))
  322. return "\n".join(lines)
  323. def _render_data_item(self, key: str, value: Any) -> str:
  324. """渲染单个 data 项"""
  325. # 确定图标
  326. icon_map = {
  327. "messages": "📨",
  328. "response": "🤖",
  329. "tools": "🛠️",
  330. "tool_calls": "🔧",
  331. "model": "🎯",
  332. "error": "❌",
  333. "content": "💬",
  334. }
  335. icon = icon_map.get(key, "📄")
  336. # 特殊处理:跳过 None 值
  337. if value is None:
  338. return ""
  339. # 判断是否需要折叠(长内容或复杂结构)
  340. needs_collapse = False
  341. if isinstance(value, str):
  342. needs_collapse = len(value) > 100 or "\n" in value
  343. elif isinstance(value, (dict, list)):
  344. needs_collapse = True
  345. if needs_collapse:
  346. lines = []
  347. # 可折叠块
  348. lines.append("<details>")
  349. lines.append(f"<summary><b>{icon} {key.capitalize()}</b></summary>")
  350. lines.append("")
  351. # 格式化内容
  352. if isinstance(value, str):
  353. # 检查是否包含图片 base64
  354. if "data:image" in value or (isinstance(value, str) and len(value) > 10000):
  355. lines.append("```")
  356. lines.append(f"[IMAGE DATA: {len(value)} chars, truncated for display]")
  357. lines.append(value[:200] + "...")
  358. lines.append("```")
  359. else:
  360. lines.append("```")
  361. lines.append(value)
  362. lines.append("```")
  363. elif isinstance(value, (dict, list)):
  364. # 递归截断图片 base64
  365. truncated_value = self._truncate_image_data(value)
  366. lines.append("```json")
  367. lines.append(json.dumps(truncated_value, ensure_ascii=False, indent=2))
  368. lines.append("```")
  369. lines.append("")
  370. lines.append("</details>")
  371. return "\n".join(lines)
  372. else:
  373. # 简单值,直接显示
  374. return f"- **{icon} {key}**: `{value}`"
  375. def _truncate_image_data(self, obj: Any, max_length: int = 200) -> Any:
  376. """递归截断对象中的图片 base64 数据"""
  377. if isinstance(obj, dict):
  378. result = {}
  379. for key, value in obj.items():
  380. # 检测图片 URL(data:image/...;base64,...)
  381. if isinstance(value, str) and value.startswith("data:image"):
  382. # 提取 MIME 类型和数据长度
  383. header_end = value.find(",")
  384. if header_end > 0:
  385. header = value[:header_end]
  386. data = value[header_end+1:]
  387. data_size_kb = len(data) / 1024
  388. result[key] = f"<IMAGE_DATA: {data_size_kb:.1f}KB, {header}, preview: {data[:50]}...>"
  389. else:
  390. result[key] = value[:max_length] + f"... ({len(value)} chars)"
  391. else:
  392. result[key] = self._truncate_image_data(value, max_length)
  393. return result
  394. elif isinstance(obj, list):
  395. return [self._truncate_image_data(item, max_length) for item in obj]
  396. elif isinstance(obj, str) and len(obj) > 100000:
  397. # 超长字符串(可能是未检测到的 base64)
  398. return obj[:max_length] + f"... (TRUNCATED: {len(obj)} chars total)"
  399. else:
  400. return obj
  401. def dump_tree(
  402. trace: Optional[Any] = None,
  403. steps: Optional[List[Any]] = None,
  404. output_path: str = DEFAULT_DUMP_PATH,
  405. title: str = "Step Tree Debug",
  406. ) -> str:
  407. """
  408. 便捷函数:输出 Step 树到文件
  409. Args:
  410. trace: Trace 对象或字典
  411. steps: Step 对象或字典列表
  412. output_path: 输出文件路径
  413. title: 输出标题
  414. Returns:
  415. 输出的文本内容
  416. 示例:
  417. from agent.debug import dump_tree
  418. # 每次 step 变化后调用
  419. dump_tree(trace, steps)
  420. # 自定义路径
  421. dump_tree(trace, steps, output_path=".debug/my_trace.txt")
  422. """
  423. # 转换为字典
  424. trace_dict = None
  425. if trace is not None:
  426. trace_dict = trace.to_dict() if hasattr(trace, "to_dict") else trace
  427. steps_list = []
  428. if steps:
  429. for step in steps:
  430. if hasattr(step, "to_dict"):
  431. steps_list.append(step.to_dict())
  432. else:
  433. steps_list.append(step)
  434. dumper = StepTreeDumper(output_path)
  435. return dumper.dump(trace_dict, steps_list, title)
  436. def dump_json(
  437. trace: Optional[Any] = None,
  438. steps: Optional[List[Any]] = None,
  439. output_path: str = DEFAULT_JSON_PATH,
  440. ) -> str:
  441. """
  442. 输出完整的 JSON 格式(用于程序化分析)
  443. Args:
  444. trace: Trace 对象或字典
  445. steps: Step 对象或字典列表
  446. output_path: 输出文件路径
  447. Returns:
  448. JSON 字符串
  449. """
  450. path = Path(output_path)
  451. path.parent.mkdir(parents=True, exist_ok=True)
  452. # 转换为字典
  453. trace_dict = None
  454. if trace is not None:
  455. trace_dict = trace.to_dict() if hasattr(trace, "to_dict") else trace
  456. steps_list = []
  457. if steps:
  458. for step in steps:
  459. if hasattr(step, "to_dict"):
  460. steps_list.append(step.to_dict())
  461. else:
  462. steps_list.append(step)
  463. data = {
  464. "generated_at": datetime.now().isoformat(),
  465. "trace": trace_dict,
  466. "steps": steps_list,
  467. }
  468. content = json.dumps(data, ensure_ascii=False, indent=2)
  469. path.write_text(content, encoding="utf-8")
  470. return content
  471. def dump_markdown(
  472. trace: Optional[Any] = None,
  473. steps: Optional[List[Any]] = None,
  474. output_path: str = DEFAULT_MD_PATH,
  475. title: str = "Step Tree Debug",
  476. ) -> str:
  477. """
  478. 便捷函数:输出 Markdown 格式(支持折叠,完整内容)
  479. Args:
  480. trace: Trace 对象或字典
  481. steps: Step 对象或字典列表
  482. output_path: 输出文件路径(默认 .trace/tree.md)
  483. title: 输出标题
  484. Returns:
  485. 输出的 Markdown 内容
  486. 示例:
  487. from agent.debug import dump_markdown
  488. # 输出完整可折叠的 Markdown
  489. dump_markdown(trace, steps)
  490. # 自定义路径
  491. dump_markdown(trace, steps, output_path=".debug/debug.md")
  492. """
  493. # 转换为字典
  494. trace_dict = None
  495. if trace is not None:
  496. trace_dict = trace.to_dict() if hasattr(trace, "to_dict") else trace
  497. steps_list = []
  498. if steps:
  499. for step in steps:
  500. if hasattr(step, "to_dict"):
  501. steps_list.append(step.to_dict())
  502. else:
  503. steps_list.append(step)
  504. dumper = StepTreeDumper(output_path)
  505. return dumper.dump_markdown(trace_dict, steps_list, title, output_path)