| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304 |
- """
- 将 messages 转为可视化 HTML 结构展示
- 功能:
- - 每条 message 有清晰的类型标识(系统 / 用户 / 助手 / 工具)
- - 工具类型标注工具名称和工具输出
- - 内容过长时支持展开/收起
- """
import html
import json
from pathlib import Path
from typing import Any, List, Union
# Collapse threshold: content longer than this many characters is folded by default.
COLLAPSE_THRESHOLD = 300
- def _ensure_messages(messages: List[Any]) -> List[dict]:
- """将 Message 对象或 dict 统一转为 dict 列表"""
- result = []
- for m in messages:
- if hasattr(m, "to_dict"):
- result.append(m.to_dict())
- elif isinstance(m, dict):
- result.append(m)
- else:
- result.append({"role": "unknown", "content": str(m)})
- return result
- def _get_message_type_info(msg: dict) -> tuple[str, str, str]:
- """
- 根据消息内容返回 (类型标签, 简短说明, 样式类)
- """
- role = msg.get("role", "unknown")
- content = msg.get("content")
- desc = msg.get("description", "")
- if role == "system":
- return "系统", "系统指令", "msg-system"
- if role == "user":
- return "用户", "用户输入", "msg-user"
- if role == "assistant":
- if isinstance(content, dict):
- text = content.get("text", "")
- tool_calls = content.get("tool_calls")
- if tool_calls:
- names = [
- tc.get("function", {}).get("name", "?")
- for tc in (tool_calls if isinstance(tool_calls, list) else [])
- ]
- label = f"工具调用: {', '.join(names)}" if names else "工具调用"
- return "助手", label, "msg-assistant-tool"
- if text:
- return "助手", "文本回复", "msg-assistant"
- return "助手", desc or "助手消息", "msg-assistant"
- if role == "tool":
- tool_name = "unknown"
- if isinstance(content, dict):
- tool_name = content.get("tool_name", content.get("name", "unknown"))
- return "工具", tool_name, "msg-tool"
- return "未知", str(role), "msg-unknown"
- def _extract_display_content(msg: dict) -> str:
- """提取用于展示的文本内容"""
- role = msg.get("role", "unknown")
- content = msg.get("content")
- if role == "system" or role == "user":
- return str(content) if content else ""
- if role == "assistant" and isinstance(content, dict):
- return content.get("text", "") or ""
- if role == "tool" and isinstance(content, dict):
- result = content.get("result", content)
- if isinstance(result, list):
- return json.dumps(result, ensure_ascii=False, indent=2)
- return str(result) if result else ""
- return str(content) if content else ""
- def _extract_tool_info(msg: dict) -> tuple[str, str]:
- """提取 tool 消息的工具名和输出"""
- content = msg.get("content")
- if not isinstance(content, dict):
- return "unknown", str(content or "")
- tool_name = content.get("tool_name", content.get("name", msg.get("description", "unknown")))
- result = content.get("result", content.get("output", content))
- if isinstance(result, dict) or isinstance(result, list):
- output = json.dumps(result, ensure_ascii=False, indent=2)
- else:
- output = str(result) if result is not None else ""
- return tool_name, output
- def _render_collapsible(content: str, block_id: str = "") -> str:
- """生成可展开/收起的 HTML 片段"""
- content = content.strip()
- if not content:
- return '<pre class="content-body"></pre>'
- escaped = content.replace("&", "&").replace("<", "<").replace(">", ">")
- should_collapse = len(content) > COLLAPSE_THRESHOLD
- safe_id = "".join(c if c.isalnum() or c in "-_" else "-" for c in block_id) or "x"
- if should_collapse:
- preview = escaped[:COLLAPSE_THRESHOLD] + "…"
- full = escaped
- return f'''<div class="collapsible-wrap">
- <pre class="content-body content-preview" id="preview-{safe_id}">{preview}</pre>
- <pre class="content-body content-full" id="full-{safe_id}" style="display:none">{full}</pre>
- <button class="btn-toggle" onclick="toggleExpand('{safe_id}')">展开全部</button>
- </div>'''
- return f'<pre class="content-body">{escaped}</pre>'
def _preview_tool_arguments(raw_args: Any, limit: int = 200) -> str:
    """Return a compact one-line preview of a tool call's arguments.

    String arguments are parsed as JSON and re-serialized compactly; strings
    that are not valid JSON are shown as-is. Non-string arguments are
    serialized directly. The result is cut to ``limit`` characters, with an
    ellipsis appended only when something was actually cut.
    """
    if isinstance(raw_args, str):
        try:
            compact = json.dumps(json.loads(raw_args), ensure_ascii=False)
        except ValueError:
            compact = raw_args
    else:
        try:
            compact = json.dumps(raw_args, ensure_ascii=False)
        except (TypeError, ValueError):
            compact = str(raw_args)
    if len(compact) > limit:
        return compact[:limit] + "…"
    return compact


def _render_message(msg: dict, index: int) -> str:
    """Render one message as an HTML ``msg-item`` block.

    Args:
        msg: Message dict; ``role``, ``content`` and ``sequence`` are read.
        index: Position of the message in the list. It seeds the element ids
            and is used as the sequence number when ``sequence`` is absent.

    Returns:
        The HTML for the message header and body. Every value taken from the
        message is HTML-escaped before being interpolated.
    """
    type_label, short_desc, css_class = _get_message_type_info(msg)
    seq = msg.get("sequence", index)
    role = msg.get("role", "unknown")
    bid = f"m{index}"
    content = msg.get("content")

    # Header: type badge plus short description.
    header = (
        f'<div class="msg-header"><span class="msg-type {css_class}">'
        f"{html.escape(str(type_label))}</span> "
        f'<span class="msg-desc">{html.escape(str(short_desc))}</span></div>'
    )

    body_parts = []
    if role == "assistant" and isinstance(content, dict):
        tool_calls = content.get("tool_calls")
        # Same guard as _get_message_type_info: only a list is iterated.
        for tc in tool_calls if isinstance(tool_calls, list) else []:
            fn = tc.get("function", {}) if isinstance(tc, dict) else {}
            if not isinstance(fn, dict):
                fn = {}
            name = html.escape(str(fn.get("name", "?")))
            args_preview = html.escape(
                _preview_tool_arguments(fn.get("arguments", "{}")), quote=False
            )
            body_parts.append(
                f'<div class="tool-call-item"><span class="tool-name">🛠 {name}</span><pre class="tool-args">{args_preview}</pre></div>'
            )
        text = content.get("text", "")
        if text:
            body_parts.append(_render_collapsible(str(text), f"{bid}-text"))
    elif role == "assistant":
        # Plain (non-dict) assistant content used to be dropped silently.
        text = _extract_display_content(msg)
        if text:
            body_parts.append(_render_collapsible(str(text), f"{bid}-text"))
    elif role == "tool":
        tool_name, output = _extract_tool_info(msg)
        body_parts.append(
            f'<div class="tool-output-header"><span class="tool-name">🛠 {html.escape(str(tool_name))}</span></div>'
        )
        body_parts.append(_render_collapsible(output, f"{bid}-tool"))
    else:
        body_parts.append(_render_collapsible(_extract_display_content(msg), bid))

    body = "\n".join(body_parts)
    safe_role = html.escape(str(role))
    safe_seq = html.escape(str(seq))
    return f'<div class="msg-item" data-role="{safe_role}" data-seq="{safe_seq}">{header}<div class="msg-body">{body}</div></div>'
- def _build_html(messages: List[dict], title: str = "Messages") -> str:
- """构建完整 HTML 文档"""
- items_html = "\n".join(_render_message(m, i) for i, m in enumerate(messages))
- return f"""<!DOCTYPE html>
- <html lang="zh-CN">
- <head>
- <meta charset="UTF-8">
- <meta name="viewport" content="width=device-width, initial-scale=1">
- <title>{title}</title>
- <style>
- * {{ box-sizing: border-box; }}
- body {{ font-family: ui-sans-serif, system-ui, -apple-system, sans-serif; margin: 0; padding: 20px; background: #f5f5f5; line-height: 1.5; }}
- h1 {{ font-size: 1.25rem; margin-bottom: 16px; color: #333; }}
- .msg-list {{ display: flex; flex-direction: column; gap: 12px; }}
- .msg-item {{ background: #fff; border-radius: 8px; padding: 12px 16px; box-shadow: 0 1px 3px rgba(0,0,0,.08); border-left: 4px solid #94a3b8; }}
- .msg-item[data-role="system"] {{ border-left-color: #64748b; }}
- .msg-item[data-role="user"] {{ border-left-color: #3b82f6; }}
- .msg-item[data-role="assistant"] {{ border-left-color: #22c55e; }}
- .msg-item[data-role="tool"] {{ border-left-color: #f59e0b; }}
- .msg-header {{ margin-bottom: 10px; display: flex; align-items: center; gap: 8px; flex-wrap: wrap; }}
- .msg-type {{ font-size: 0.75rem; font-weight: 600; padding: 2px 8px; border-radius: 4px; }}
- .msg-system {{ background: #e2e8f0; color: #475569; }}
- .msg-user {{ background: #dbeafe; color: #1d4ed8; }}
- .msg-assistant {{ background: #dcfce7; color: #15803d; }}
- .msg-assistant-tool {{ background: #fef3c7; color: #b45309; }}
- .msg-tool {{ background: #fed7aa; color: #c2410c; }}
- .msg-desc {{ font-size: 0.875rem; color: #64748b; }}
- .msg-body {{ font-size: 0.875rem; }}
- .content-body {{ margin: 0; white-space: pre-wrap; word-break: break-word; font-size: 0.8125rem; color: #334155; max-height: 400px; overflow-y: auto; }}
- .tool-call-item {{ margin-bottom: 8px; }}
- .tool-name {{ font-weight: 600; color: #0f172a; }}
- .tool-args {{ margin: 4px 0 0 0; padding: 8px; background: #f8fafc; border-radius: 4px; font-size: 0.75rem; overflow-x: auto; }}
- .tool-output-header {{ margin-bottom: 8px; }}
- .btn-toggle {{ margin-top: 8px; padding: 4px 12px; font-size: 0.75rem; cursor: pointer; background: #e2e8f0; border: 1px solid #cbd5e1; border-radius: 4px; color: #475569; }}
- .btn-toggle:hover {{ background: #cbd5e1; }}
- .collapsible-wrap {{ position: relative; }}
- </style>
- </head>
- <body>
- <h1>{title}</h1>
- <div class="msg-list">{items_html}</div>
- <script>
- function toggleExpand(idSuffix) {{
- var preview = document.getElementById('preview-' + idSuffix);
- var full = document.getElementById('full-' + idSuffix);
- var btn = preview.parentElement.querySelector('.btn-toggle');
- if (!preview || !full) return;
- if (full.style.display === 'none') {{
- preview.style.display = 'none';
- full.style.display = 'block';
- if (btn) btn.textContent = '收起';
- }} else {{
- preview.style.display = 'block';
- full.style.display = 'none';
- if (btn) btn.textContent = '展开全部';
- }}
- }}
- </script>
- </body>
- </html>"""
def messages_to_html(
    messages: List[Any],
    output_path: Union[str, Path],
    title: str = "Messages 可视化",
) -> Path:
    """Render messages to an HTML page and write it to disk.

    Args:
        messages: Message objects or dicts; normalized via ``_ensure_messages``.
        output_path: Destination HTML file. Missing parent directories are
            created.
        title: Page title.

    Returns:
        The output file as a ``Path``.
    """
    document = _build_html(_ensure_messages(messages), title)
    target = Path(output_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(document, encoding="utf-8")
    return target
async def trace_to_html(
    trace_id: str,
    output_path: Union[str, Path],
    base_path: str = ".trace",
    title: str | None = None,
) -> Path:
    """Load a trace's messages from a ``FileSystemTraceStore`` and write them as HTML.

    Args:
        trace_id: Trace ID to load.
        output_path: Destination HTML file.
        base_path: Root directory handed to ``FileSystemTraceStore``.
        title: Page title. When falsy, a title built from the first eight
            characters of ``trace_id`` is used.

    Returns:
        The output file as a ``Path``.

    Raises:
        FileNotFoundError: If the store returns no messages for ``trace_id``.
    """
    # Imported lazily, so the module itself can be imported without ``agent``.
    from agent.trace import FileSystemTraceStore

    trace_store = FileSystemTraceStore(base_path=base_path)
    loaded = await trace_store.get_trace_messages(trace_id)
    if not loaded:
        raise FileNotFoundError(f"Trace {trace_id} 下没有找到 messages")

    if title:
        heading = title
    else:
        heading = f"Trace {trace_id[:8]}... Messages"
    return messages_to_html(loaded, output_path, title=heading)
if __name__ == "__main__":
    import argparse
    import asyncio
    import sys
    from pathlib import Path

    # Put the project root (three levels up from this file) on sys.path so
    # the ``agent`` package can be imported.
    _project_root = Path(__file__).resolve().parent.parent.parent
    if str(_project_root) not in sys.path:
        sys.path.insert(0, str(_project_root))

    async def _main():
        """Parse the command line, render the requested trace, and print the output path."""
        cli = argparse.ArgumentParser(description="将 trace messages 转为 HTML")
        cli.add_argument("--trace", required=True, help="Trace ID")
        cli.add_argument("-o", "--output", default="messages.html", help="输出文件路径")
        cli.add_argument("--base-path", default=".trace", help="Trace 存储根目录")
        options = cli.parse_args()

        written = await trace_to_html(
            options.trace,
            options.output,
            base_path=options.base_path,
        )
        print(f"已生成: {written.absolute()}")

    asyncio.run(_main())
|