""" 将 messages 转为可视化 HTML 结构展示 功能: - 每条 message 有清晰的类型标识(系统 / 用户 / 助手 / 工具) - 工具类型标注工具名称和工具输出 - 内容过长时支持展开/收起 """ import json from pathlib import Path from typing import Any, List, Union # 展开阈值:超过此字符数则默认折叠 COLLAPSE_THRESHOLD = 300 def _ensure_messages(messages: List[Any]) -> List[dict]: """将 Message 对象或 dict 统一转为 dict 列表""" result = [] for m in messages: if hasattr(m, "to_dict"): result.append(m.to_dict()) elif isinstance(m, dict): result.append(m) else: result.append({"role": "unknown", "content": str(m)}) return result def _get_message_type_info(msg: dict) -> tuple[str, str, str]: """ 根据消息内容返回 (类型标签, 简短说明, 样式类) """ role = msg.get("role", "unknown") content = msg.get("content") desc = msg.get("description", "") if role == "system": return "系统", "系统指令", "msg-system" if role == "user": return "用户", "用户输入", "msg-user" if role == "assistant": if isinstance(content, dict): text = content.get("text", "") tool_calls = content.get("tool_calls") if tool_calls: names = [ tc.get("function", {}).get("name", "?") for tc in (tool_calls if isinstance(tool_calls, list) else []) ] label = f"工具调用: {', '.join(names)}" if names else "工具调用" return "助手", label, "msg-assistant-tool" if text: return "助手", "文本回复", "msg-assistant" return "助手", desc or "助手消息", "msg-assistant" if role == "tool": tool_name = "unknown" if isinstance(content, dict): tool_name = content.get("tool_name", content.get("name", "unknown")) return "工具", tool_name, "msg-tool" return "未知", str(role), "msg-unknown" def _extract_display_content(msg: dict) -> str: """提取用于展示的文本内容""" role = msg.get("role", "unknown") content = msg.get("content") if role == "system" or role == "user": return str(content) if content else "" if role == "assistant" and isinstance(content, dict): return content.get("text", "") or "" if role == "tool" and isinstance(content, dict): result = content.get("result", content) if isinstance(result, list): return json.dumps(result, ensure_ascii=False, indent=2) return str(result) if result else "" return str(content) if content else "" def _extract_tool_info(msg: dict) -> tuple[str, str]: """提取 tool 消息的工具名和输出""" content = msg.get("content") if not isinstance(content, dict): return "unknown", str(content or "") tool_name = content.get("tool_name", content.get("name", msg.get("description", "unknown"))) result = content.get("result", content.get("output", content)) if isinstance(result, dict) or isinstance(result, list): output = json.dumps(result, ensure_ascii=False, indent=2) else: output = str(result) if result is not None else "" return tool_name, output def _render_collapsible(content: str, block_id: str = "") -> str: """生成可展开/收起的 HTML 片段""" content = content.strip() if not content: return '
'

    escaped = content.replace("&", "&").replace("<", "<").replace(">", ">")
    should_collapse = len(content) > COLLAPSE_THRESHOLD
    safe_id = "".join(c if c.isalnum() or c in "-_" else "-" for c in block_id) or "x"

    if should_collapse:
        preview = escaped[:COLLAPSE_THRESHOLD] + "…"
        full = escaped
        return f'''
{preview}
''' return f'
{escaped}
' def _render_message(msg: dict, index: int) -> str: """渲染单条消息为 HTML""" type_label, short_desc, css_class = _get_message_type_info(msg) seq = msg.get("sequence", index) role = msg.get("role", "unknown") bid = f"m{index}" # 头部:类型 + 简短说明 header = f'
{type_label} {short_desc}
' body_parts = [] if role == "assistant": content = msg.get("content") if isinstance(content, dict): tool_calls = content.get("tool_calls") text = content.get("text", "") if tool_calls: for tc in tool_calls: fn = tc.get("function", {}) name = fn.get("name", "?") args_str = fn.get("arguments", "{}") try: args_json = json.loads(args_str) args_preview = json.dumps(args_json, ensure_ascii=False)[:200] if len(json.dumps(args_json)) > 200: args_preview += "…" except Exception: args_preview = args_str[:200] + ("…" if len(args_str) > 200 else "") body_parts.append( f'
🛠 {name}
{args_preview}
' ) if text: body_parts.append(_render_collapsible(text, f"{bid}-text")) elif role == "tool": tool_name, output = _extract_tool_info(msg) body_parts.append(f'
🛠 {tool_name}
') body_parts.append(_render_collapsible(output, f"{bid}-tool")) else: content = _extract_display_content(msg) body_parts.append(_render_collapsible(content, bid)) body = "\n".join(body_parts) return f'
{header}
{body}
' def _build_html(messages: List[dict], title: str = "Messages") -> str: """构建完整 HTML 文档""" items_html = "\n".join(_render_message(m, i) for i, m in enumerate(messages)) return f""" {title}

{title}

{items_html}
""" def messages_to_html( messages: List[Any], output_path: Union[str, Path], title: str = "Messages 可视化", ) -> Path: """ 将 messages 转为 HTML 并写入文件 Args: messages: Message 对象或 dict 列表 output_path: 输出 HTML 文件路径 title: 页面标题 Returns: 输出文件的 Path """ data = _ensure_messages(messages) html = _build_html(data, title) out = Path(output_path) out.parent.mkdir(parents=True, exist_ok=True) out.write_text(html, encoding="utf-8") return out async def trace_to_html( trace_id: str, output_path: Union[str, Path], base_path: str = ".trace", title: str | None = None, ) -> Path: """ 从 Trace 加载 messages 并生成 HTML Args: trace_id: Trace ID output_path: 输出 HTML 文件路径 base_path: Trace 存储根目录 title: 页面标题,默认使用 trace_id Returns: 输出文件的 Path """ from agent.trace import FileSystemTraceStore store = FileSystemTraceStore(base_path=base_path) messages = await store.get_trace_messages(trace_id) if not messages: raise FileNotFoundError(f"Trace {trace_id} 下没有找到 messages") page_title = title or f"Trace {trace_id[:8]}... Messages" return messages_to_html(messages, output_path, title=page_title) if __name__ == "__main__": import asyncio import sys from pathlib import Path # 添加项目根目录,使 agent 模块可被导入 _project_root = Path(__file__).resolve().parent.parent.parent if str(_project_root) not in sys.path: sys.path.insert(0, str(_project_root)) async def _main(): import argparse parser = argparse.ArgumentParser(description="将 trace messages 转为 HTML") parser.add_argument("--trace", required=True, help="Trace ID") parser.add_argument("-o", "--output", default="messages.html", help="输出文件路径") parser.add_argument("--base-path", default=".trace", help="Trace 存储根目录") args = parser.parse_args() out = await trace_to_html(args.trace, args.output, base_path=args.base_path) print(f"已生成: {out.absolute()}") asyncio.run(_main())