Browse Source

feat: step tree & visualization API

Talegorithm 1 month ago
parent
commit
8b204cfba7

+ 1 - 3
.claude/settings.local.json

@@ -6,9 +6,7 @@
       "Bash(pip show:*)",
       "Read(//usr/local/anaconda3/lib/python3.13/site-packages/browser_use/**)",
       "Bash(tee:*)",
-      "Bash(browser-use:*)",
-      "Bash(pip install:*)",
-      "Bash(timeout 60 python:*)"
+      "Bash(browser-use:*)"
     ],
     "deny": [],
     "ask": []

+ 4 - 2
.gitignore

@@ -49,7 +49,7 @@ htmlcov/
 .DS_Store
 Thumbs.db
 
-# .env
+.env
 debug.log
 info.log
 .browser_use_files
@@ -59,4 +59,6 @@ output
 
 # Debug output
 .trace/
-cloud_xhs/
+.trace_test/
+.trace_test2/
+examples/**/output*/

+ 2 - 4
agent/__init__.py

@@ -15,9 +15,8 @@ from agent.core.runner import AgentRunner
 from agent.core.config import AgentConfig, CallResult
 
 # 执行追踪
-from agent.execution.models import Trace, Step, StepType, Status
+from agent.execution.models import Trace, Step, StepType, StepStatus
 from agent.execution.protocols import TraceStore
-from agent.execution.store import MemoryTraceStore
 
 # 记忆系统
 from agent.memory.models import Experience, Skill
@@ -39,9 +38,8 @@ __all__ = [
     "Trace",
     "Step",
     "StepType",
-    "Status",
+    "StepStatus",
     "TraceStore",
-    "MemoryTraceStore",
     # Memory
     "Experience",
     "Skill",

+ 6 - 101
agent/core/runner.py

@@ -19,7 +19,6 @@ from agent.memory.models import Experience, Skill
 from agent.memory.protocols import MemoryStore, StateStore
 from agent.memory.skill_loader import load_skills_from_dir
 from agent.tools import ToolRegistry, get_tool_registry
-from agent.execution import dump_tree, dump_markdown
 
 logger = logging.getLogger(__name__)
 
@@ -68,7 +67,7 @@ class AgentRunner:
             llm_call: LLM 调用函数(必须提供,用于实际调用 LLM)
             config: Agent 配置
             skills_dir: Skills 目录路径(可选,不提供则不加载 skills)
-            debug: 是否启用 debug 模式(输出 step tree 到 .trace/tree.txt)
+            debug: 保留参数(已废弃,请使用 API Server 可视化)
         """
         self.trace_store = trace_store
         self.memory_store = memory_store
@@ -85,17 +84,10 @@ class AgentRunner:
         return str(uuid.uuid4())
 
     async def _dump_debug(self, trace_id: str) -> None:
-        """Debug 模式下输出 step tree(txt + markdown 两种格式)"""
-        if not self.debug or not self.trace_store:
-            return
-        trace = await self.trace_store.get_trace(trace_id)
-        steps = await self.trace_store.get_trace_steps(trace_id)
-
-        # 输出 tree.txt(简洁格式,兼容旧版)
-        dump_tree(trace, steps)
-
-        # 输出 tree.md(完整可折叠格式)
-        dump_markdown(trace, steps)
+        """Debug 模式(已废弃 - 使用 API 可视化替代)"""
+        # 不再自动生成 tree.txt/tree.md/tree.json
+        # 请使用 API Server 进行可视化:python3 api_server.py
+        pass
 
     # ===== 单次调用 =====
 
@@ -415,13 +407,10 @@ class AgentRunner:
                     for tc in tool_calls:
                         tool_name = tc["function"]["name"]
                         tool_args = tc["function"]["arguments"]
-                        if tool_args and isinstance(tool_args, str):
+                        if isinstance(tool_args, str):
                             import json
                             tool_args = json.loads(tool_args)
 
-                        if not tool_args:
-                            tool_args = {}
-
                         # 执行工具
                         tool_result = await self.tools.execute(
                             tool_name,
@@ -541,90 +530,6 @@ class AgentRunner:
                     yield trace_obj
             raise
 
-    # ===== 反馈 =====
-
-    async def add_feedback(
-        self,
-        trace_id: str,
-        target_step_id: str,
-        feedback_type: Literal["positive", "negative", "correction"],
-        content: str,
-        extract_experience: bool = True
-    ) -> Optional[str]:
-        """
-        添加人工反馈
-
-        Args:
-            trace_id: Trace ID
-            target_step_id: 反馈针对的 Step ID
-            feedback_type: 反馈类型
-            content: 反馈内容
-            extract_experience: 是否自动提取经验
-
-        Returns:
-            experience_id: 如果提取了经验
-        """
-        if not self.trace_store:
-            return None
-
-        # 获取 Trace
-        trace = await self.trace_store.get_trace(trace_id)
-        if not trace:
-            logger.warning(f"Trace not found: {trace_id}")
-            return None
-
-        # 创建 feedback Step
-        steps = await self.trace_store.get_trace_steps(trace_id)
-        max_seq = max(s.sequence for s in steps) if steps else 0
-
-        feedback_step = Step.create(
-            trace_id=trace_id,
-            step_type="feedback",
-            sequence=max_seq + 1,
-            status="completed",
-            description=f"{feedback_type}: {content[:50]}...",
-            parent_id=target_step_id,
-            data={
-                "target_step_id": target_step_id,
-                "feedback_type": feedback_type,
-                "content": content
-            }
-        )
-        await self.trace_store.add_step(feedback_step)
-        await self._dump_debug(trace_id)
-
-        # 提取经验
-        exp_id = None
-        if extract_experience and self.memory_store and feedback_type in ("positive", "correction"):
-            exp = Experience.create(
-                scope=f"agent:{trace.agent_type}" if trace.agent_type else "agent:default",
-                condition=f"执行类似 '{trace.task}' 任务时" if trace.task else "通用场景",
-                rule=content,
-                evidence=[target_step_id, feedback_step.step_id],
-                source="feedback",
-                confidence=0.8 if feedback_type == "positive" else 0.6
-            )
-            exp_id = await self.memory_store.add_experience(exp)
-
-            # 记录 memory_write Step
-            mem_step = Step.create(
-                trace_id=trace_id,
-                step_type="memory_write",
-                sequence=max_seq + 2,
-                status="completed",
-                description=f"保存经验: {exp.condition[:30]}...",
-                parent_id=feedback_step.step_id,
-                data={
-                    "experience_id": exp_id,
-                    "condition": exp.condition,
-                    "rule": exp.rule
-                }
-            )
-            await self.trace_store.add_step(mem_step)
-            await self._dump_debug(trace_id)
-
-        return exp_id
-
     # ===== 辅助方法 =====
 
     def _format_skills(self, skills: List[Skill]) -> str:

+ 0 - 71
agent/execution/README.md

@@ -1,71 +0,0 @@
-# Agent Execution - 执行追踪系统
-
-## 职责
-
-执行追踪系统负责记录和可视化 Agent 的执行过程:
-
-1. **数据模型** (`models.py`)
-   - `Trace` - 一次完整的执行轨迹
-   - `Step` - 执行过程中的一个原子操作(形成树结构)
-   - `StepType` - 步骤类型(思考、动作、结果等)
-   - `Status` - 步骤状态(计划中、执行中、完成、失败)
-
-2. **存储接口** (`protocols.py`, `store.py`)
-   - `TraceStore` - Trace 存储接口协议
-   - `MemoryTraceStore` - 内存实现(用于测试)
-
-3. **可视化工具** (`tree_dump.py`)
-   - `dump_tree()` - 输出文本格式的 step tree
-   - `dump_markdown()` - 输出 markdown 格式(可折叠)
-   - `dump_json()` - 输出 JSON 格式
-
-4. **API 和实时推送** (`api.py`, `websocket.py`)
-   - RESTful API - 查询 Trace 和 Step
-   - WebSocket - 实时推送执行更新
-
-## 模块边界
-
-- **只依赖**:core(事件系统)
-- **被依赖**:core.runner(记录 trace)
-- **独立开发**:可视化、API、WebSocket 可独立迭代
-
-## 使用示例
-
-```python
-from agent.execution import MemoryTraceStore, Trace, Step, dump_tree
-
-# 创建存储
-store = MemoryTraceStore()
-
-# 创建 Trace
-trace = Trace.create(mode="agent", task="Complete task")
-trace_id = await store.create_trace(trace)
-
-# 添加 Step
-step = Step.create(
-    trace_id=trace_id,
-    step_type="thought",
-    description="Analyzing task"
-)
-await store.add_step(step)
-
-# 可视化
-trace = await store.get_trace(trace_id)
-steps = await store.get_trace_steps(trace_id)
-dump_tree(trace, steps)
-```
-
-## 文件说明
-
-- `models.py` - Trace 和 Step 数据模型
-- `protocols.py` - TraceStore 接口定义
-- `store.py` - MemoryTraceStore 实现
-- `tree_dump.py` - Step tree 可视化工具
-- `api.py` - RESTful API(可选,需要 FastAPI)
-- `websocket.py` - WebSocket 推送(可选,需要 FastAPI)
-- `__init__.py` - 模块导出
-
-## 适合分工
-
-- **算法同事**:负责 `tree_dump.py` 可视化优化
-- **后端同事**:负责 `api.py` 和 `websocket.py` 开发

+ 8 - 8
agent/execution/__init__.py

@@ -3,20 +3,20 @@ Execution - 执行追踪系统
 
 核心职责:
 1. Trace/Step 模型定义
-2. 存储接口和实现(内存/数据库)
+2. 存储接口和实现(文件系统)
 3. Step 树可视化(文本/markdown/JSON)
-4. RESTful API(可视化查询)
-5. WebSocket 推送(实时更新)
+4. RESTful API(可视化查询,支持 compact/full 视图)
+5. WebSocket 推送(实时更新,支持断线续传)
 """
 
 # 模型(核心,无依赖)
-from agent.execution.models import Trace, Step, StepType, Status
+from agent.execution.models import Trace, Step, StepType, StepStatus
 
 # 存储接口(核心,无依赖)
 from agent.execution.protocols import TraceStore
 
-# 内存存储实现(核心,无依赖)
-from agent.execution.store import MemoryTraceStore
+# 文件系统存储实现(跨进程 + 持久化)
+from agent.execution.fs_store import FileSystemTraceStore
 
 # Debug 工具(可视化)
 from agent.execution.tree_dump import StepTreeDumper, dump_tree, dump_markdown, dump_json
@@ -62,10 +62,10 @@ __all__ = [
     "Trace",
     "Step",
     "StepType",
-    "Status",
+    "StepStatus",
     # 存储
     "TraceStore",
-    "MemoryTraceStore",
+    "FileSystemTraceStore",
     # Debug/可视化
     "StepTreeDumper",
     "dump_tree",

+ 76 - 6
agent/execution/api.py

@@ -1,11 +1,12 @@
 """
 Step 树 RESTful API
 
-提供 Trace 和 Step 的查询接口,支持懒加载
+提供 Trace 和 Step 的查询接口,支持懒加载和 compact/full 视图
 """
 
 from typing import List, Optional, Dict, Any
 from fastapi import APIRouter, HTTPException, Query
+from fastapi.responses import PlainTextResponse
 from pydantic import BaseModel
 
 from agent.execution.protocols import TraceStore
@@ -137,12 +138,18 @@ async def get_trace(trace_id: str):
 
 
 @router.get("/{trace_id}/tree", response_model=TreeResponse)
-async def get_full_tree(trace_id: str):
+async def get_full_tree(
+    trace_id: str,
+    view: str = Query("compact", regex="^(compact|full)$"),
+    max_depth: int = Query(999, ge=1, le=999)
+):
     """
     获取完整 Step 树(小型 Trace 推荐)
 
     Args:
         trace_id: Trace ID
+        view: compact(默认,不含 blob)| full(含 blob)
+        max_depth: 最大深度
     """
     store = get_trace_store()
 
@@ -155,7 +162,7 @@ async def get_full_tree(trace_id: str):
     steps = await store.get_trace_steps(trace_id)
 
     # 构建树结构
-    root_nodes = await _build_tree(store, trace_id, None, expand=True, max_depth=999)
+    root_nodes = await _build_tree(store, trace_id, None, view=view, expand=True, max_depth=max_depth)
 
     return TreeResponse(
         trace_id=trace_id,
@@ -167,6 +174,7 @@ async def get_full_tree(trace_id: str):
 async def get_node(
     trace_id: str,
     step_id: str,
+    view: str = Query("compact", regex="^(compact|full)$"),
     expand: bool = Query(False, description="是否加载子节点"),
     max_depth: int = Query(1, ge=1, le=10, description="递归深度")
 ):
@@ -176,6 +184,7 @@ async def get_node(
     Args:
         trace_id: Trace ID
         step_id: Step ID("null" 表示根节点)
+        view: compact | full
         expand: 是否加载子节点
         max_depth: 递归深度
     """
@@ -196,7 +205,7 @@ async def get_node(
             raise HTTPException(status_code=404, detail="Step not found")
 
     # 构建节点树
-    children = await _build_tree(store, trace_id, actual_step_id, expand, max_depth)
+    children = await _build_tree(store, trace_id, actual_step_id, view=view, expand=expand, max_depth=max_depth)
 
     # 如果是根节点,返回所有根 Steps
     if actual_step_id is None:
@@ -217,6 +226,65 @@ async def get_node(
     )
 
 
+@router.get("/{trace_id}/node/{step_id}/children")
+async def get_children_paginated(
+    trace_id: str,
+    step_id: str,
+    cursor: Optional[int] = Query(None, description="上次最后的 sequence"),
+    limit: int = Query(20, ge=1, le=100),
+    view: str = Query("compact", regex="^(compact|full)$")
+):
+    """
+    分页获取子节点(基于 sequence 游标)
+
+    Args:
+        trace_id: Trace ID
+        step_id: Step ID
+        cursor: 上次最后的 sequence(None 表示从头开始)
+        limit: 每页数量
+        view: compact | full
+
+    Returns:
+        {
+            "children": [...],
+            "next_cursor": 123,  # 下一页游标(None 表示没有更多)
+            "has_more": true
+        }
+    """
+    store = get_trace_store()
+
+    # 验证 trace 存在
+    trace = await store.get_trace(trace_id)
+    if not trace:
+        raise HTTPException(status_code=404, detail="Trace not found")
+
+    # 验证 step 存在
+    step = await store.get_step(step_id)
+    if not step or step.trace_id != trace_id:
+        raise HTTPException(status_code=404, detail="Step not found")
+
+    # 获取所有子节点
+    children = await store.get_step_children(step_id)
+
+    # 过滤 cursor 之后的节点
+    if cursor is not None:
+        children = [s for s in children if s.sequence > cursor]
+
+    # 分页
+    has_more = len(children) > limit
+    page = children[:limit]
+    next_cursor = page[-1].sequence if page and has_more else None
+
+    # 序列化
+    children_dicts = [s.to_dict(view=view) for s in page]
+
+    return {
+        "children": children_dicts,
+        "next_cursor": next_cursor,
+        "has_more": has_more
+    }
+
+
 # ===== 核心算法:懒加载树构建 =====
 
 
@@ -224,6 +292,7 @@ async def _build_tree(
     store: TraceStore,
     trace_id: str,
     step_id: Optional[str],
+    view: str = "compact",  # 新增参数
     expand: bool = False,
     max_depth: int = 1,
     current_depth: int = 0
@@ -238,6 +307,7 @@ async def _build_tree(
         store: TraceStore 实例
         trace_id: Trace ID
         step_id: 当前 Step ID(None 表示根节点)
+        view: "compact" | "full"
         expand: 是否展开子节点
         max_depth: 最大递归深度
         current_depth: 当前递归深度
@@ -257,7 +327,7 @@ async def _build_tree(
     # 2. 构建响应
     result_nodes = []
     for step in current_nodes:
-        node_dict = step.to_dict()
+        node_dict = step.to_dict(view=view)  # 使用 view 参数
         node_dict["children"] = []
 
         # 3. 递归加载子节点(可选)
@@ -266,7 +336,7 @@ async def _build_tree(
             if children:
                 node_dict["children"] = await _build_tree(
                     store, trace_id, step.step_id,
-                    expand=True, max_depth=max_depth,
+                    view=view, expand=True, max_depth=max_depth,
                     current_depth=current_depth + 1
                 )
 

+ 353 - 0
agent/execution/fs_store.py

@@ -0,0 +1,353 @@
+"""
+FileSystem Trace Store - 文件系统存储实现
+
+用于跨进程数据共享,数据持久化到 .trace/ 目录
+"""
+
+import json
+import os
+from pathlib import Path
+from typing import Dict, List, Optional, Any
+from datetime import datetime
+
+from agent.execution.models import Trace, Step
+
+
+class FileSystemTraceStore:
+    """文件系统 Trace 存储
+
+    目录结构:
+    .trace/
+    ├── trace-001/
+    │   ├── meta.json           # Trace 元数据
+    │   ├── steps/
+    │   │   ├── step-1.json     # 每个 step 独立文件
+    │   │   ├── step-2.json
+    │   │   └── step-3.json
+    │   └── events.jsonl        # 事件流(WebSocket 续传)
+    └── trace-002/
+        └── ...
+    """
+
+    def __init__(self, base_path: str = ".trace"):
+        self.base_path = Path(base_path)
+        self.base_path.mkdir(exist_ok=True)
+
+    def _get_trace_dir(self, trace_id: str) -> Path:
+        """获取 trace 目录"""
+        return self.base_path / trace_id
+
+    def _get_meta_file(self, trace_id: str) -> Path:
+        """获取 meta.json 文件路径"""
+        return self._get_trace_dir(trace_id) / "meta.json"
+
+    def _get_steps_dir(self, trace_id: str) -> Path:
+        """获取 steps 目录"""
+        return self._get_trace_dir(trace_id) / "steps"
+
+    def _get_step_file(self, trace_id: str, step_id: str) -> Path:
+        """获取 step 文件路径"""
+        return self._get_steps_dir(trace_id) / f"{step_id}.json"
+
+    def _get_events_file(self, trace_id: str) -> Path:
+        """获取 events.jsonl 文件路径"""
+        return self._get_trace_dir(trace_id) / "events.jsonl"
+
+    # ===== Trace 操作 =====
+
+    async def create_trace(self, trace: Trace) -> str:
+        """创建新的 Trace"""
+        trace_dir = self._get_trace_dir(trace.trace_id)
+        trace_dir.mkdir(exist_ok=True)
+
+        # 创建 steps 目录
+        steps_dir = self._get_steps_dir(trace.trace_id)
+        steps_dir.mkdir(exist_ok=True)
+
+        # 写入 meta.json
+        meta_file = self._get_meta_file(trace.trace_id)
+        meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False))
+
+        # 创建空的 events.jsonl
+        events_file = self._get_events_file(trace.trace_id)
+        events_file.touch()
+
+        return trace.trace_id
+
+    async def get_trace(self, trace_id: str) -> Optional[Trace]:
+        """获取 Trace"""
+        meta_file = self._get_meta_file(trace_id)
+        if not meta_file.exists():
+            return None
+
+        data = json.loads(meta_file.read_text())
+
+        # 解析 datetime 字段
+        if data.get("created_at"):
+            data["created_at"] = datetime.fromisoformat(data["created_at"])
+        if data.get("completed_at"):
+            data["completed_at"] = datetime.fromisoformat(data["completed_at"])
+
+        return Trace(**data)
+
+    async def update_trace(self, trace_id: str, **updates) -> None:
+        """更新 Trace"""
+        trace = await self.get_trace(trace_id)
+        if not trace:
+            return
+
+        # 更新字段
+        for key, value in updates.items():
+            if hasattr(trace, key):
+                setattr(trace, key, value)
+
+        # 写回文件
+        meta_file = self._get_meta_file(trace_id)
+        meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False))
+
+    async def list_traces(
+        self,
+        mode: Optional[str] = None,
+        agent_type: Optional[str] = None,
+        uid: Optional[str] = None,
+        status: Optional[str] = None,
+        limit: int = 50
+    ) -> List[Trace]:
+        """列出 Traces"""
+        traces = []
+
+        # 遍历所有 trace 目录
+        if not self.base_path.exists():
+            return []
+
+        for trace_dir in self.base_path.iterdir():
+            if not trace_dir.is_dir():
+                continue
+
+            meta_file = trace_dir / "meta.json"
+            if not meta_file.exists():
+                continue
+
+            try:
+                data = json.loads(meta_file.read_text())
+
+                # 过滤
+                if mode and data.get("mode") != mode:
+                    continue
+                if agent_type and data.get("agent_type") != agent_type:
+                    continue
+                if uid and data.get("uid") != uid:
+                    continue
+                if status and data.get("status") != status:
+                    continue
+
+                # 解析 datetime
+                if data.get("created_at"):
+                    data["created_at"] = datetime.fromisoformat(data["created_at"])
+                if data.get("completed_at"):
+                    data["completed_at"] = datetime.fromisoformat(data["completed_at"])
+
+                traces.append(Trace(**data))
+            except Exception:
+                # 跳过损坏的文件
+                continue
+
+        # 排序(最新的在前)
+        traces.sort(key=lambda t: t.created_at, reverse=True)
+
+        return traces[:limit]
+
+    # ===== Step 操作 =====
+
+    async def add_step(self, step: Step) -> str:
+        """添加 Step"""
+        trace_id = step.trace_id
+
+        # 1. 写入 step 文件
+        step_file = self._get_step_file(trace_id, step.step_id)
+        step_file.write_text(json.dumps(step.to_dict(view="full"), indent=2, ensure_ascii=False))
+
+        # 2. 更新 trace 的统计信息
+        trace = await self.get_trace(trace_id)
+        if trace:
+            trace.total_steps += 1
+            trace.last_sequence = max(trace.last_sequence, step.sequence)
+
+            # 累加 tokens 和 cost
+            if step.tokens:
+                trace.total_tokens += step.tokens
+            if step.cost:
+                trace.total_cost += step.cost
+            if step.duration_ms:
+                trace.total_duration_ms += step.duration_ms
+
+            # 写回 meta.json
+            meta_file = self._get_meta_file(trace_id)
+            meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False))
+
+        # 3. 更新父节点的 UI 字段
+        if step.parent_id:
+            parent = await self.get_step(step.parent_id)
+            if parent:
+                parent.has_children = True
+                parent.children_count += 1
+
+                # 写回父节点文件
+                parent_file = self._get_step_file(trace_id, step.parent_id)
+                parent_file.write_text(json.dumps(parent.to_dict(view="full"), indent=2, ensure_ascii=False))
+
+        # 4. 追加 step_added 事件
+        await self.append_event(trace_id, "step_added", {
+            "step_id": step.step_id,
+            "step_type": step.step_type,
+            "parent_id": step.parent_id,
+            "sequence": step.sequence,
+        })
+
+        return step.step_id
+
+    async def get_step(self, step_id: str) -> Optional[Step]:
+        """获取 Step(扫描所有 trace)"""
+        for trace_dir in self.base_path.iterdir():
+            if not trace_dir.is_dir():
+                continue
+
+            step_file = trace_dir / "steps" / f"{step_id}.json"
+            if step_file.exists():
+                try:
+                    data = json.loads(step_file.read_text())
+
+                    # 解析 datetime
+                    if data.get("created_at"):
+                        data["created_at"] = datetime.fromisoformat(data["created_at"])
+
+                    return Step(**data)
+                except Exception:
+                    continue
+
+        return None
+
+    async def get_trace_steps(self, trace_id: str) -> List[Step]:
+        """获取 Trace 的所有 Steps"""
+        steps_dir = self._get_steps_dir(trace_id)
+        if not steps_dir.exists():
+            return []
+
+        steps = []
+        for step_file in steps_dir.glob("*.json"):
+            try:
+                data = json.loads(step_file.read_text())
+
+                # 解析 datetime
+                if data.get("created_at"):
+                    data["created_at"] = datetime.fromisoformat(data["created_at"])
+
+                steps.append(Step(**data))
+            except Exception:
+                # 跳过损坏的文件
+                continue
+
+        # 按 sequence 排序
+        steps.sort(key=lambda s: s.sequence)
+        return steps
+
+    async def get_step_children(self, step_id: str) -> List[Step]:
+        """获取 Step 的子节点"""
+        # 需要扫描所有 trace 的所有 steps
+        # TODO: 可以优化为维护索引文件
+        children = []
+
+        for trace_dir in self.base_path.iterdir():
+            if not trace_dir.is_dir():
+                continue
+
+            steps_dir = trace_dir / "steps"
+            if not steps_dir.exists():
+                continue
+
+            for step_file in steps_dir.glob("*.json"):
+                try:
+                    data = json.loads(step_file.read_text())
+                    if data.get("parent_id") == step_id:
+                        # 解析 datetime
+                        if data.get("created_at"):
+                            data["created_at"] = datetime.fromisoformat(data["created_at"])
+                        children.append(Step(**data))
+                except Exception:
+                    continue
+
+        # 按 sequence 排序
+        children.sort(key=lambda s: s.sequence)
+        return children
+
+    async def update_step(self, step_id: str, **updates) -> None:
+        """更新 Step 字段"""
+        step = await self.get_step(step_id)
+        if not step:
+            return
+
+        # 更新字段
+        for key, value in updates.items():
+            if hasattr(step, key):
+                setattr(step, key, value)
+
+        # 写回文件
+        step_file = self._get_step_file(step.trace_id, step_id)
+        step_file.write_text(json.dumps(step.to_dict(view="full"), indent=2, ensure_ascii=False))
+
+    # ===== 事件流操作(用于 WebSocket 断线续传)=====
+
+    async def get_events(
+        self,
+        trace_id: str,
+        since_event_id: int = 0
+    ) -> List[Dict[str, Any]]:
+        """获取事件流"""
+        events_file = self._get_events_file(trace_id)
+        if not events_file.exists():
+            return []
+
+        events = []
+        with events_file.open('r') as f:
+            for line in f:
+                try:
+                    event = json.loads(line.strip())
+                    if event.get("event_id", 0) > since_event_id:
+                        events.append(event)
+                except Exception:
+                    continue
+
+        return events
+
+    async def append_event(
+        self,
+        trace_id: str,
+        event_type: str,
+        payload: Dict[str, Any]
+    ) -> int:
+        """追加事件,返回 event_id"""
+        # 获取 trace 并递增 event_id
+        trace = await self.get_trace(trace_id)
+        if not trace:
+            return 0
+
+        trace.last_event_id += 1
+        event_id = trace.last_event_id
+
+        # 更新 trace 的 last_event_id
+        await self.update_trace(trace_id, last_event_id=event_id)
+
+        # 创建事件
+        event = {
+            "event_id": event_id,
+            "event": event_type,
+            "ts": datetime.now().isoformat(),
+            **payload
+        }
+
+        # 追加到 events.jsonl
+        events_file = self._get_events_file(trace_id)
+        with events_file.open('a') as f:
+            f.write(json.dumps(event, ensure_ascii=False) + '\n')
+
+        return event_id

+ 62 - 16
agent/execution/models.py

@@ -21,19 +21,18 @@ StepType = Literal[
     "evaluation",  # 评估总结(需要 summary)
     "response",    # 最终回复
 
-    # 工具相关
+    # 工具相关(数据结构上分开以保留描述能力,可视化时候可能合并)
     "action",      # 工具调用(tool_call)
     "result",      # 工具结果(tool_result)
 
     # 系统相关
     "memory_read",   # 读取记忆(经验/技能)
     "memory_write",  # 写入记忆
-    "feedback",      # 人工反馈
 ]
 
 
 # Step 状态
-Status = Literal[
+StepStatus = Literal[
     "planned",           # 计划中(未执行)
     "in_progress",       # 执行中
     "awaiting_approval", # 等待用户确认
@@ -68,6 +67,11 @@ class Trace:
     total_steps: int = 0
     total_tokens: int = 0
     total_cost: float = 0.0
+    total_duration_ms: int = 0  # 总耗时(毫秒)
+
+    # 进度追踪(head)
+    last_sequence: int = 0      # 最新 step 的 sequence
+    last_event_id: int = 0      # 最新事件 ID(用于 WS 续传)
 
     # 上下文
     uid: Optional[str] = None
@@ -105,6 +109,9 @@ class Trace:
             "total_steps": self.total_steps,
             "total_tokens": self.total_tokens,
             "total_cost": self.total_cost,
+            "total_duration_ms": self.total_duration_ms,
+            "last_sequence": self.last_sequence,
+            "last_event_id": self.last_event_id,
             "uid": self.uid,
             "context": self.context,
             "current_goal_id": self.current_goal_id,
@@ -119,11 +126,32 @@ class Step:
     执行步骤 - Trace 中的一个原子操作
 
     Step 之间通过 parent_id 形成树结构(单父节点)
+
+    ## 字段设计规则
+
+    **顶层字段**(Step 类属性):
+    - 所有(或大部分)step 都有的字段
+    - 需要筛选/排序/索引的字段(如 tokens, cost, duration_ms)
+    - 结构化、类型明确的字段
+
+    **data 字段**(Dict):
+    - step_type 特定的字段(不同类型有不同 schema)
+    - 详细的业务数据(如 messages, content, arguments, output)
+    - 可能很大的字段
+    - 半结构化、动态的字段
+
+    ## data 字段 schema(按 step_type)
+
+    - thought/response: model, messages, content, tool_calls
+    - action: tool_name, arguments
+    - result: tool_name, output, error
+    - memory_read: experiences_count, skills_count
+    - goal: 自定义(根据具体目标)
     """
     step_id: str
     trace_id: str
     step_type: StepType
-    status: Status
+    status: StepStatus
     sequence: int  # 在 Trace 中的顺序
 
     # 树结构(单父节点)
@@ -138,6 +166,10 @@ class Step:
     # 仅 evaluation 类型需要
     summary: Optional[str] = None
 
+    # UI 优化字段
+    has_children: bool = False      # 是否有子节点
+    children_count: int = 0         # 子节点数量
+
     # 执行指标
     duration_ms: Optional[int] = None
     tokens: Optional[int] = None
@@ -152,7 +184,7 @@ class Step:
         trace_id: str,
         step_type: StepType,
         sequence: int,
-        status: Status = "completed",
+        status: StepStatus = "completed",
         description: str = "",
         data: Dict[str, Any] = None,
         parent_id: Optional[str] = None,
@@ -177,9 +209,15 @@ class Step:
             cost=cost,
         )
 
-    def to_dict(self) -> Dict[str, Any]:
-        """转换为字典"""
-        return {
+    def to_dict(self, view: str = "full") -> Dict[str, Any]:
+        """
+        转换为字典
+
+        Args:
+            view: "compact" - 不返回大字段
+                  "full" - 返回完整数据
+        """
+        result = {
             "step_id": self.step_id,
             "trace_id": self.trace_id,
             "step_type": self.step_type,
@@ -187,14 +225,29 @@ class Step:
             "sequence": self.sequence,
             "parent_id": self.parent_id,
             "description": self.description,
-            "data": self.data,
             "summary": self.summary,
+            "has_children": self.has_children,
+            "children_count": self.children_count,
             "duration_ms": self.duration_ms,
             "tokens": self.tokens,
             "cost": self.cost,
             "created_at": self.created_at.isoformat() if self.created_at else None,
         }
 
+        # 处理 data 字段
+        if view == "compact":
+            # compact 模式:移除 data 中的大字段
+            data_copy = self.data.copy()
+            # 移除可能的大字段(如 output, content 等)
+            for key in ["output", "content", "full_output", "full_content"]:
+                data_copy.pop(key, None)
+            result["data"] = data_copy
+        else:
+            # full 模式:返回完整 data
+            result["data"] = self.data
+
+        return result
+
 
 # Step.data 结构说明
 #
@@ -233,13 +286,6 @@ class Step:
 #       "is_final": True,
 #   }
 #
-# feedback:
-#   {
-#       "target_step_id": "...",
-#       "feedback_type": "positive" | "negative" | "correction",
-#       "content": "..."
-#   }
-#
 # memory_read:
 #   {
 #       "skills": [...],

+ 49 - 1
agent/execution/protocols.py

@@ -4,7 +4,7 @@ Trace Storage Protocol - Trace 存储接口定义
 使用 Protocol 定义接口,允许不同的存储实现(内存、PostgreSQL、Neo4j 等)
 """
 
-from typing import Protocol, List, Optional, runtime_checkable
+from typing import Protocol, List, Optional, Dict, Any, runtime_checkable
 
 from agent.execution.models import Trace, Step
 
@@ -77,3 +77,51 @@ class TraceStore(Protocol):
     async def get_step_children(self, step_id: str) -> List[Step]:
         """获取 Step 的子节点"""
         ...
+
+    async def update_step(self, step_id: str, **updates) -> None:
+        """
+        更新 Step 字段(用于状态变更、错误记录等)
+
+        Args:
+            step_id: Step ID
+            **updates: 要更新的字段
+        """
+        ...
+
+    # ===== 事件流操作(用于 WebSocket 断线续传)=====
+
+    async def get_events(
+        self,
+        trace_id: str,
+        since_event_id: int = 0
+    ) -> List[Dict[str, Any]]:
+        """
+        获取事件流(用于 WS 断线续传)
+
+        Args:
+            trace_id: Trace ID
+            since_event_id: 从哪个事件 ID 开始(0 表示全部)
+
+        Returns:
+            事件列表(按 event_id 排序)
+        """
+        ...
+
+    async def append_event(
+        self,
+        trace_id: str,
+        event_type: str,
+        payload: Dict[str, Any]
+    ) -> int:
+        """
+        追加事件,返回 event_id
+
+        Args:
+            trace_id: Trace ID
+            event_type: 事件类型(step_added/step_updated/trace_completed)
+            payload: 事件数据
+
+        Returns:
+            event_id: 新事件的 ID
+        """
+        ...

+ 0 - 89
agent/execution/store.py

@@ -1,89 +0,0 @@
-"""
-Memory Trace Store - 内存存储实现
-
-用于测试和简单场景,数据不持久化
-"""
-
-from typing import Dict, List, Optional
-
-from agent.execution.models import Trace, Step
-
-
-class MemoryTraceStore:
-    """内存 Trace 存储"""
-
-    def __init__(self):
-        self._traces: Dict[str, Trace] = {}
-        self._steps: Dict[str, Step] = {}
-        self._trace_steps: Dict[str, List[str]] = {}  # trace_id -> [step_ids]
-
-    async def create_trace(self, trace: Trace) -> str:
-        self._traces[trace.trace_id] = trace
-        self._trace_steps[trace.trace_id] = []
-        return trace.trace_id
-
-    async def get_trace(self, trace_id: str) -> Optional[Trace]:
-        return self._traces.get(trace_id)
-
-    async def update_trace(self, trace_id: str, **updates) -> None:
-        trace = self._traces.get(trace_id)
-        if trace:
-            for key, value in updates.items():
-                if hasattr(trace, key):
-                    setattr(trace, key, value)
-
-    async def list_traces(
-        self,
-        mode: Optional[str] = None,
-        agent_type: Optional[str] = None,
-        uid: Optional[str] = None,
-        status: Optional[str] = None,
-        limit: int = 50
-    ) -> List[Trace]:
-        traces = list(self._traces.values())
-
-        # 过滤
-        if mode:
-            traces = [t for t in traces if t.mode == mode]
-        if agent_type:
-            traces = [t for t in traces if t.agent_type == agent_type]
-        if uid:
-            traces = [t for t in traces if t.uid == uid]
-        if status:
-            traces = [t for t in traces if t.status == status]
-
-        # 排序(最新的在前)
-        traces.sort(key=lambda t: t.created_at, reverse=True)
-
-        return traces[:limit]
-
-    async def add_step(self, step: Step) -> str:
-        self._steps[step.step_id] = step
-
-        # 添加到 trace 的 steps 列表
-        if step.trace_id in self._trace_steps:
-            self._trace_steps[step.trace_id].append(step.step_id)
-
-        # 更新 trace 的 total_steps
-        trace = self._traces.get(step.trace_id)
-        if trace:
-            trace.total_steps += 1
-
-        return step.step_id
-
-    async def get_step(self, step_id: str) -> Optional[Step]:
-        return self._steps.get(step_id)
-
-    async def get_trace_steps(self, trace_id: str) -> List[Step]:
-        step_ids = self._trace_steps.get(trace_id, [])
-        steps = [self._steps[sid] for sid in step_ids if sid in self._steps]
-        steps.sort(key=lambda s: s.sequence)
-        return steps
-
-    async def get_step_children(self, step_id: str) -> List[Step]:
-        children = []
-        for step in self._steps.values():
-            if step.parent_id == step_id:
-                children.append(step)
-        children.sort(key=lambda s: s.sequence)
-        return children

+ 223 - 12
agent/execution/tree_dump.py

@@ -70,6 +70,25 @@ class StepTreeDumper:
             lines.append(f"  total_cost: {trace.get('total_cost', 0.0):.4f}")
             lines.append("")
 
+        # 统计摘要
+        if steps:
+            lines.append("## Statistics")
+            stats = self._calculate_statistics(steps)
+            lines.append(f"  Total steps: {stats['total']}")
+            lines.append(f"  By type:")
+            for step_type, count in sorted(stats['by_type'].items()):
+                lines.append(f"    {step_type}: {count}")
+            lines.append(f"  By status:")
+            for status, count in sorted(stats['by_status'].items()):
+                lines.append(f"    {status}: {count}")
+            if stats['total_duration_ms'] > 0:
+                lines.append(f"  Total duration: {stats['total_duration_ms']}ms")
+            if stats['total_tokens'] > 0:
+                lines.append(f"  Total tokens: {stats['total_tokens']}")
+            if stats['total_cost'] > 0:
+                lines.append(f"  Total cost: ${stats['total_cost']:.4f}")
+            lines.append("")
+
         # Step 树
         if steps:
             lines.append("## Steps")
@@ -87,6 +106,36 @@ class StepTreeDumper:
 
         return content
 
+    def _calculate_statistics(self, steps: List[Dict[str, Any]]) -> Dict[str, Any]:
+        """计算统计信息"""
+        stats = {
+            'total': len(steps),
+            'by_type': {},
+            'by_status': {},
+            'total_duration_ms': 0,
+            'total_tokens': 0,
+            'total_cost': 0.0,
+        }
+
+        for step in steps:
+            # 按类型统计
+            step_type = step.get('step_type', 'unknown')
+            stats['by_type'][step_type] = stats['by_type'].get(step_type, 0) + 1
+
+            # 按状态统计
+            status = step.get('status', 'unknown')
+            stats['by_status'][status] = stats['by_status'].get(status, 0) + 1
+
+            # 累计指标
+            if step.get('duration_ms'):
+                stats['total_duration_ms'] += step.get('duration_ms', 0)
+            if step.get('tokens'):
+                stats['total_tokens'] += step.get('tokens', 0)
+            if step.get('cost'):
+                stats['total_cost'] += step.get('cost', 0.0)
+
+        return stats
+
     def _build_tree(self, steps: List[Dict[str, Any]]) -> Dict[str, List[str]]:
         """构建父子关系映射"""
         # parent_id -> [child_ids]
@@ -151,6 +200,7 @@ class StepTreeDumper:
             "planned": "○",
             "failed": "✗",
             "skipped": "⊘",
+            "awaiting_approval": "⏸",
         }
         icon = status_icons.get(status, "?")
 
@@ -165,6 +215,16 @@ class StepTreeDumper:
         step_id = step.get("step_id", "")[:8]  # 只显示前 8 位
         lines.append(f"{child_prefix}id: {step_id}...")
 
+        # 关键字段:sequence, status, parent_id
+        sequence = step.get("sequence")
+        if sequence is not None:
+            lines.append(f"{child_prefix}sequence: {sequence}")
+        lines.append(f"{child_prefix}status: {status}")
+
+        parent_id = step.get("parent_id")
+        if parent_id:
+            lines.append(f"{child_prefix}parent_id: {parent_id[:8]}...")
+
         # 执行指标
         if step.get("duration_ms") is not None:
             lines.append(f"{child_prefix}duration: {step.get('duration_ms')}ms")
@@ -181,11 +241,22 @@ class StepTreeDumper:
                 summary = summary[:100] + "..."
             lines.append(f"{child_prefix}summary: {summary}")
 
-        # data 内容(格式化输出)
+        # 错误信息(结构化显示)
+        error = step.get("error")
+        if error:
+            lines.append(f"{child_prefix}error:")
+            lines.append(f"{child_prefix}  code: {error.get('code', 'UNKNOWN')}")
+            error_msg = error.get('message', '')
+            if len(error_msg) > 200:
+                error_msg = error_msg[:200] + "..."
+            lines.append(f"{child_prefix}  message: {error_msg}")
+            lines.append(f"{child_prefix}  retryable: {error.get('retryable', True)}")
+
+        # data 内容(格式化输出,更激进的截断)
         data = step.get("data", {})
         if data:
             lines.append(f"{child_prefix}data:")
-            data_lines = self._format_data(data, child_prefix + "  ")
+            data_lines = self._format_data(data, child_prefix + "  ", max_value_len=150)
             lines.append(data_lines)
 
         # 时间
@@ -201,13 +272,18 @@ class StepTreeDumper:
         lines.append("")  # 空行分隔
         return "\n".join(lines)
 
-    def _format_data(self, data: Dict[str, Any], prefix: str, max_value_len: int = 200) -> str:
-        """格式化 data 字典"""
+    def _format_data(self, data: Dict[str, Any], prefix: str, max_value_len: int = 150) -> str:
+        """格式化 data 字典(更激进的截断策略)"""
         lines = []
 
         for key, value in data.items():
             # 格式化值
             if isinstance(value, str):
+                # 检测图片数据
+                if value.startswith("data:image") or (len(value) > 10000 and not "\n" in value[:100]):
+                    lines.append(f"{prefix}{key}: [IMAGE_DATA: {len(value)} chars, truncated]")
+                    continue
+
                 if len(value) > max_value_len:
                     value_str = value[:max_value_len] + f"... ({len(value)} chars)"
                 else:
@@ -215,7 +291,8 @@ class StepTreeDumper:
                 # 处理多行字符串
                 if "\n" in value_str:
                     first_line = value_str.split("\n")[0]
-                    value_str = first_line + f"... ({value_str.count(chr(10))+1} lines)"
+                    line_count = value.count("\n") + 1
+                    value_str = first_line + f"... ({line_count} lines)"
             elif isinstance(value, (dict, list)):
                 value_str = json.dumps(value, ensure_ascii=False, indent=2)
                 if len(value_str) > max_value_len:
@@ -268,6 +345,31 @@ class StepTreeDumper:
             lines.append(f"- **total_cost**: ${trace.get('total_cost', 0.0):.4f}")
             lines.append("")
 
+        # 统计摘要
+        if steps:
+            lines.append("## Statistics")
+            lines.append("")
+            stats = self._calculate_statistics(steps)
+            lines.append(f"- **Total steps**: {stats['total']}")
+            lines.append("")
+            lines.append("**By type:**")
+            lines.append("")
+            for step_type, count in sorted(stats['by_type'].items()):
+                lines.append(f"- `{step_type}`: {count}")
+            lines.append("")
+            lines.append("**By status:**")
+            lines.append("")
+            for status, count in sorted(stats['by_status'].items()):
+                lines.append(f"- `{status}`: {count}")
+            lines.append("")
+            if stats['total_duration_ms'] > 0:
+                lines.append(f"- **Total duration**: {stats['total_duration_ms']}ms")
+            if stats['total_tokens'] > 0:
+                lines.append(f"- **Total tokens**: {stats['total_tokens']}")
+            if stats['total_cost'] > 0:
+                lines.append(f"- **Total cost**: ${stats['total_cost']:.4f}")
+            lines.append("")
+
         # Steps
         if steps:
             lines.append("## Steps")
@@ -325,6 +427,7 @@ class StepTreeDumper:
             "planned": "○",
             "failed": "✗",
             "skipped": "⊘",
+            "awaiting_approval": "⏸",
         }
         icon = status_icons.get(status, "?")
 
@@ -341,6 +444,17 @@ class StepTreeDumper:
         step_id = step.get("step_id", "")[:16]
         lines.append(f"- **id**: `{step_id}...`")
 
+        # 关键字段
+        sequence = step.get("sequence")
+        if sequence is not None:
+            lines.append(f"- **sequence**: {sequence}")
+        lines.append(f"- **status**: {status}")
+
+        parent_id = step.get("parent_id")
+        if parent_id:
+            lines.append(f"- **parent_id**: `{parent_id[:16]}...`")
+
+        # 执行指标
         if step.get("duration_ms") is not None:
             lines.append(f"- **duration**: {step.get('duration_ms')}ms")
         if step.get("tokens") is not None:
@@ -358,17 +472,39 @@ class StepTreeDumper:
 
         lines.append("")
 
+        # 错误信息
+        error = step.get("error")
+        if error:
+            lines.append("<details>")
+            lines.append("<summary><b>❌ Error</b></summary>")
+            lines.append("")
+            lines.append(f"- **code**: `{error.get('code', 'UNKNOWN')}`")
+            lines.append(f"- **retryable**: {error.get('retryable', True)}")
+            lines.append(f"- **message**:")
+            lines.append("```")
+            error_msg = error.get('message', '')
+            if len(error_msg) > 500:
+                error_msg = error_msg[:500] + "..."
+            lines.append(error_msg)
+            lines.append("```")
+            lines.append("")
+            lines.append("</details>")
+            lines.append("")
+
         # Summary
         if step.get("summary"):
             lines.append("<details>")
             lines.append("<summary><b>📝 Summary</b></summary>")
             lines.append("")
-            lines.append(f"```\n{step.get('summary')}\n```")
+            summary = step.get('summary', '')
+            if len(summary) > 1000:
+                summary = summary[:1000] + "..."
+            lines.append(f"```\n{summary}\n```")
             lines.append("")
             lines.append("</details>")
             lines.append("")
 
-        # Data(完整输出,不截断)
+        # Data(更激进的截断)
         data = step.get("data", {})
         if data:
             lines.append(self._render_markdown_data(data))
@@ -397,7 +533,7 @@ class StepTreeDumper:
         return "\n".join(lines)
 
     def _render_data_item(self, key: str, value: Any) -> str:
-        """渲染单个 data 项"""
+        """渲染单个 data 项(更激进的截断)"""
         # 确定图标
         icon_map = {
             "messages": "📨",
@@ -407,6 +543,8 @@ class StepTreeDumper:
             "model": "🎯",
             "error": "❌",
             "content": "💬",
+            "output": "📤",
+            "arguments": "⚙️",
         }
         icon = icon_map.get(key, "📄")
 
@@ -414,6 +552,64 @@ class StepTreeDumper:
         if value is None:
             return ""
 
+        # 特殊处理 messages 中的图片引用
+        if key == 'messages' and isinstance(value, list):
+            # 统计图片数量
+            image_count = 0
+            for msg in value:
+                if isinstance(msg, dict):
+                    content = msg.get('content', [])
+                    if isinstance(content, list):
+                        for item in content:
+                            if isinstance(item, dict) and item.get('type') == 'image_url':
+                                url = item.get('image_url', {}).get('url', '')
+                                if url.startswith('blob://'):
+                                    image_count += 1
+
+            if image_count > 0:
+                # 显示图片摘要
+                lines = []
+                lines.append("<details>")
+                lines.append(f"<summary><b>📨 Messages (含 {image_count} 张图片)</b></summary>")
+                lines.append("")
+                lines.append("```json")
+
+                # 渲染消息,图片显示为简化格式
+                simplified_messages = []
+                for msg in value:
+                    if isinstance(msg, dict):
+                        simplified_msg = msg.copy()
+                        content = msg.get('content', [])
+                        if isinstance(content, list):
+                            new_content = []
+                            for item in content:
+                                if isinstance(item, dict) and item.get('type') == 'image_url':
+                                    url = item.get('image_url', {}).get('url', '')
+                                    if url.startswith('blob://'):
+                                        blob_ref = url.replace('blob://', '')
+                                        size = item.get('image_url', {}).get('size', 0)
+                                        size_kb = size / 1024 if size > 0 else 0
+                                        new_content.append({
+                                            'type': 'image_url',
+                                            'image_url': {
+                                                'url': f'[IMAGE: {blob_ref[:8]}... ({size_kb:.1f}KB)]'
+                                            }
+                                        })
+                                    else:
+                                        new_content.append(item)
+                                else:
+                                    new_content.append(item)
+                            simplified_msg['content'] = new_content
+                        simplified_messages.append(simplified_msg)
+                    else:
+                        simplified_messages.append(msg)
+
+                lines.append(json.dumps(simplified_messages, ensure_ascii=False, indent=2))
+                lines.append("```")
+                lines.append("")
+                lines.append("</details>")
+                return "\n".join(lines)
+
         # 判断是否需要折叠(长内容或复杂结构)
         needs_collapse = False
         if isinstance(value, str):
@@ -428,13 +624,18 @@ class StepTreeDumper:
             lines.append(f"<summary><b>{icon} {key.capitalize()}</b></summary>")
             lines.append("")
 
-            # 格式化内容
+            # 格式化内容(更激进的截断)
             if isinstance(value, str):
                 # 检查是否包含图片 base64
-                if "data:image" in value or (isinstance(value, str) and len(value) > 10000):
+                if "data:image" in value or (isinstance(value, str) and len(value) > 10000 and not "\n" in value[:100]):
                     lines.append("```")
                     lines.append(f"[IMAGE DATA: {len(value)} chars, truncated for display]")
-                    lines.append(value[:200] + "...")
+                    lines.append("```")
+                elif len(value) > 2000:
+                    # 超长文本,只显示前500字符
+                    lines.append("```")
+                    lines.append(value[:500])
+                    lines.append(f"... (truncated, total {len(value)} chars)")
                     lines.append("```")
                 else:
                     lines.append("```")
@@ -443,8 +644,14 @@ class StepTreeDumper:
             elif isinstance(value, (dict, list)):
                 # 递归截断图片 base64
                 truncated_value = self._truncate_image_data(value)
+                json_str = json.dumps(truncated_value, ensure_ascii=False, indent=2)
+
+                # 如果 JSON 太长,也截断
+                if len(json_str) > 3000:
+                    json_str = json_str[:3000] + "\n... (truncated)"
+
                 lines.append("```json")
-                lines.append(json.dumps(truncated_value, ensure_ascii=False, indent=2))
+                lines.append(json_str)
                 lines.append("```")
 
             lines.append("")
@@ -470,6 +677,10 @@ class StepTreeDumper:
                         result[key] = f"<IMAGE_DATA: {data_size_kb:.1f}KB, {header}, preview: {data[:50]}...>"
                     else:
                         result[key] = value[:max_length] + f"... ({len(value)} chars)"
+                # 检测 blob 引用
+                elif isinstance(value, str) and value.startswith("blob://"):
+                    blob_ref = value.replace("blob://", "")
+                    result[key] = f"<BLOB_REF: {blob_ref[:8]}...>"
                 else:
                     result[key] = self._truncate_image_data(value, max_length)
             return result

+ 60 - 13
agent/execution/websocket.py

@@ -1,11 +1,12 @@
 """
 Step 树 WebSocket 推送
 
-实时推送进行中 Trace 的 Step 更新
+实时推送进行中 Trace 的 Step 更新,支持断线续传
 """
 
-from typing import Dict, Set
-from fastapi import APIRouter, WebSocket, WebSocketDisconnect
+from typing import Dict, Set, Any
+from datetime import datetime
+from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query
 
 from agent.execution.protocols import TraceStore
 
@@ -37,12 +38,19 @@ def get_trace_store() -> TraceStore:
 
 
 @router.websocket("/{trace_id}/watch")
-async def watch_trace(websocket: WebSocket, trace_id: str):
+async def watch_trace(
+    websocket: WebSocket,
+    trace_id: str,
+    since_event_id: int = Query(0, description="从哪个事件 ID 开始(0=补发所有历史)")
+):
     """
-    监听 Trace 的 Step 更新
+    监听 Trace 的 Step 更新,支持断线续传
 
     Args:
         trace_id: Trace ID
+        since_event_id: 从哪个事件 ID 开始
+            - 0: 补发所有历史事件(初次连接)
+            - >0: 补发指定 ID 之后的事件(断线重连)
     """
     await websocket.accept()
 
@@ -63,12 +71,26 @@ async def watch_trace(websocket: WebSocket, trace_id: str):
     _active_connections[trace_id].add(websocket)
 
     try:
-        # 发送连接成功消息
+        # 发送连接成功消息 + 当前 event_id
         await websocket.send_json({
             "event": "connected",
-            "trace_id": trace_id
+            "trace_id": trace_id,
+            "current_event_id": trace.last_event_id
         })
 
+        # 补发历史事件(since_event_id=0 表示补发所有历史)
+        if since_event_id >= 0:
+            missed_events = await store.get_events(trace_id, since_event_id)
+            # 限制补发数量(最多 100 条)
+            if len(missed_events) > 100:
+                await websocket.send_json({
+                    "event": "error",
+                    "message": f"Too many missed events ({len(missed_events)}), please reload full tree via REST API"
+                })
+            else:
+                for evt in missed_events:
+                    await websocket.send_json(evt)
+
         # 保持连接(等待客户端断开或接收消息)
         while True:
             try:
@@ -93,18 +115,26 @@ async def watch_trace(websocket: WebSocket, trace_id: str):
 
 async def broadcast_step_added(trace_id: str, step_dict: Dict):
     """
-    广播 Step 添加事件
+    广播 Step 添加事件(自动分配 event_id)
 
     Args:
         trace_id: Trace ID
-        step_dict: Step 字典(from step.to_dict())
+        step_dict: Step 字典(from step.to_dict(view="compact"))
     """
     if trace_id not in _active_connections:
         return
 
+    # 从 store 获取最新 event_id(已由 add_step 自动追加)
+    store = get_trace_store()
+    trace = await store.get_trace(trace_id)
+    if not trace:
+        return
+
     message = {
         "event": "step_added",
-        "step": step_dict
+        "event_id": trace.last_event_id,
+        "ts": datetime.now().isoformat(),
+        "step": step_dict  # compact 视图
     }
 
     # 发送给所有监听该 Trace 的客户端
@@ -122,20 +152,30 @@ async def broadcast_step_added(trace_id: str, step_dict: Dict):
 
 async def broadcast_step_updated(trace_id: str, step_id: str, updates: Dict):
     """
-    广播 Step 更新事件
+    广播 Step 更新事件(patch 语义)
 
     Args:
         trace_id: Trace ID
         step_id: Step ID
-        updates: 更新字段
+        updates: 更新字段(patch 格式)
     """
     if trace_id not in _active_connections:
         return
 
+    store = get_trace_store()
+
+    # 追加事件到 store
+    event_id = await store.append_event(trace_id, "step_updated", {
+        "step_id": step_id,
+        "updates": updates
+    })
+
     message = {
         "event": "step_updated",
+        "event_id": event_id,
+        "ts": datetime.now().isoformat(),
         "step_id": step_id,
-        "updates": updates
+        "patch": updates  # JSON Patch 风格:{"status": "completed", "duration_ms": 123}
     }
 
     disconnected = []
@@ -160,8 +200,15 @@ async def broadcast_trace_completed(trace_id: str, total_steps: int):
     if trace_id not in _active_connections:
         return
 
+    store = get_trace_store()
+    event_id = await store.append_event(trace_id, "trace_completed", {
+        "total_steps": total_steps
+    })
+
     message = {
         "event": "trace_completed",
+        "event_id": event_id,
+        "ts": datetime.now().isoformat(),
         "trace_id": trace_id,
         "total_steps": total_steps
     }

+ 0 - 2
agent/memory/stores.py

@@ -2,8 +2,6 @@
 Memory Implementation - 内存存储实现
 
 用于测试和简单场景,数据不持久化
-
-MemoryTraceStore 已移动到 agent.execution.store
 """
 
 from typing import Dict, List, Optional, Any

+ 5 - 5
api_server.py

@@ -9,9 +9,9 @@ from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 import uvicorn
 
-from agent.trace import MemoryTraceStore
-from agent.trace.api import router as api_router, set_trace_store as set_api_trace_store
-from agent.trace.websocket import router as ws_router, set_trace_store as set_ws_trace_store
+from agent.execution import FileSystemTraceStore
+from agent.execution.api import router as api_router, set_trace_store as set_api_trace_store
+from agent.execution.websocket import router as ws_router, set_trace_store as set_ws_trace_store
 
 
 # ===== 日志配置 =====
@@ -43,8 +43,8 @@ app.add_middleware(
 
 # ===== 初始化存储 =====
 
-# 使用内存存储(后续可替换为 PostgreSQL
-trace_store = MemoryTraceStore()
+# 使用文件系统存储(支持跨进程和持久化
+trace_store = FileSystemTraceStore(base_path=".trace")
 
 # 注入到 step_tree 模块
 set_api_trace_store(trace_store)

+ 2 - 1
docs/README.md

@@ -464,6 +464,7 @@ class SkillLoader(Protocol):
 
 **实现策略**:
 - Trace/Step: 文件系统(JSON)
+  - `FileSystemTraceStore` - 文件持久化(支持跨进程)
 - Experience: PostgreSQL + pgvector
 - Skill: 文件系统(Markdown)
 
@@ -482,7 +483,7 @@ agent/
 ├── execution/             # 执行追踪
 │   ├── models.py          # Trace, Step
 │   ├── protocols.py       # TraceStore
-│   ├── store.py           # MemoryTraceStore
+│   ├── fs_store.py        # FileSystemTraceStore
 │   ├── tree_dump.py       # 可视化
 │   ├── api.py             # RESTful API
 │   └── websocket.py       # WebSocket

+ 118 - 0
docs/decisions.md

@@ -518,3 +518,121 @@ Step 工具等核心功能如何让 Agent 知道?
 3. **可扩展性**:通过 Protocol 定义接口,便于后期切换实现
 4. **安全性**:敏感数据占位符、域名匹配等机制保护隐私
 5. **可观测性**:内建统计、完整追踪,便于监控和调试
+
+---
+
+## 10. 删除未使用的结构化错误功能
+
+**日期**: 2026-02-03
+
+### 问题
+在 execution trace v2.0 开发中引入了 `ErrorCode`、`StepError` 和 feedback 机制,但代码审查发现这些功能完全未被使用。
+
+### 决策
+**选择:删除未使用的代码**
+
+**理由**:
+1. **YAGNI 原则**:不应该维护未使用的功能(You Aren't Gonna Need It)
+2. **减少复杂度**:
+   - ErrorCode 枚举难以穷举,且 Python 无法强制约束
+   - StepError 与 ToolResult.error 存在设计重复
+   - feedback 机制缺少使用场景和配套接口
+3. **可恢复性**:需要时可以从 git 历史恢复
+4. **向后兼容**:这些功能从未被使用,删除不影响现有代码
+
+**影响**:
+- ✅ 代码更简洁
+- ✅ 减少维护负担
+- ✅ 不影响现有功能
+- ⚠️ 未来需要结构化错误时需要重新设计
+
+---
+
+## 11. Step 关联统一使用 parent_id
+
+**日期**: 2026-02-03
+
+### 问题
+execution trace v2.0 设计中引入了多个跨节点关联字段(`tool_call_id`、`paired_action_id`、`span_id`),但实际分析后发现这些字段存在冗余。
+
+### 现状分析
+
+**字段使用情况**:
+- `tool_call_id` - Step 对象中未使用,只在 messages 中使用
+- `paired_action_id` - 完全未使用
+- `span_id` - 完全未使用
+
+**关联需求**:
+1. **Action/Result 配对** - 通过 parent_id 已经建立(result.parent_id = action.step_id)
+2. **与 messages 对应** - messages 中包含完整对话历史(含 tool_call_id)
+3. **重试追踪** - 同一个 action 下的多个 result,通过 parent_id 即可
+
+### 设计决策
+
+**保留**:
+- ✅ `parent_id` - 唯一的树结构关联字段
+
+**删除**:
+- ❌ `tool_call_id` - messages 中已包含,Step 不需要重复
+- ❌ `paired_action_id` - 与 parent_id 冗余
+- ❌ `span_id` - 分布式追踪功能,当前用不到
+
+---
+
+## 12. 删除 Blob 存储系统
+
+**日期**: 2026-02-03
+
+### 问题
+execution trace v2.0 引入了 Blob 存储系统用于处理大输出和图片,但实际分析后发现该系统过度设计且与 Agent 现有架构冗余。
+
+### 架构分析
+
+**Agent 的文件处理方式**:
+1. **用户输入**:提供文件路径(不是 base64)
+2. **工具处理**:内置工具直接读写文件系统
+3. **LLM 调用**:Runner 在调用 LLM 时才将文件路径转换为 base64
+4. **Trace 存储**:Step 中存储的是 messages(已包含 base64)
+
+**Blob 系统的问题**:
+1. **冗余提取**:从 messages 中提取 base64 再存储,而 messages 已经在 Step.data 中
+2. **功能重叠**:Agent 内置工具已经提供文件读写能力
+3. **过度设计**:引入 content-addressed storage、deduplication 等复杂功能,但 Agent 场景不需要
+4. **未被使用**:output_preview/blob_ref 字段从未在实际代码中使用
+
+### 设计决策
+
+**删除内容**:
+- ❌ `agent/execution/blob_store.py` 整个文件
+- ❌ `BlobStore` 协议及所有实现
+- ❌ `extract_images_from_messages()` 方法
+- ❌ `restore_images_in_messages()` 方法
+- ❌ `store_large_output()` 方法
+- ❌ Step 字段:`output_preview`、`blob_ref`
+
+**保留方案**:
+- ✅ Step 中的 `data` 字段直接存储 messages(包含 base64)
+- ✅ Agent 内置工具处理文件操作
+- ✅ 用户通过文件路径引用文件(不是 base64)
+
+### 理由
+
+1. **YAGNI 原则**:
+   - 功能从未被使用
+   - 未来需要时可以重新设计
+
+2. **架构更简洁**:
+   - 不需要额外的 blob 存储层
+   - 文件处理统一走工具系统
+
+3. **符合 Agent 场景**:
+   - Agent 运行在本地,直接访问文件系统
+   - 不需要像云服务那样做 blob 存储和 deduplication
+
+4. **数据完整性**:
+   - messages 中的 base64 已经足够
+   - 不需要拆分成 preview + ref
+
+---

+ 1 - 1
docs/project-structure.md

@@ -22,7 +22,7 @@ agent/
 ├── execution/                  # 执行追踪
 │   ├── models.py               # Trace, Step
 │   ├── protocols.py            # TraceStore
-│   ├── store.py                # MemoryTraceStore
+│   ├── fs_store.py             # FileSystemTraceStore
 │   └── tree_dump.py            # 可视化
 ├── memory/                     # 记忆系统
 │   ├── models.py               # Experience, Skill

+ 26 - 1
docs/step-tree.md

@@ -74,6 +74,10 @@ class Step:
     # 仅 evaluation 类型需要
     summary: Optional[str] = None
 
+    # UI 优化字段
+    has_children: bool = False            # 是否有子节点
+    children_count: int = 0               # 子节点数量
+
     # 执行指标
     duration_ms: Optional[int] = None
     cost: Optional[float] = None
@@ -87,6 +91,27 @@ class Step:
 - `parent_id` 是单个值(树结构),不是列表(DAG)
 - `summary` 仅在 `evaluation` 类型节点填充,不是每个节点都需要
 - `planned` 状态的 step 相当于 TODO item
+- `has_children` 和 `children_count` 用于前端 UI 优化(判断可展开、显示统计)
+
+**字段设计规则**:
+
+**顶层字段**(Step 类属性):
+- 所有(或大部分)step 都有的字段
+- 需要筛选/排序/索引的字段(如 tokens, cost, duration_ms)
+- 结构化、类型明确的字段
+
+**data 字段**(Dict):
+- step_type 特定的字段(不同类型有不同 schema)
+- 详细的业务数据(如 messages, content, arguments, output)
+- 可能很大的字段
+- 半结构化、动态的字段
+
+**data 字段 schema(按 step_type)**:
+- `thought` / `response`: model, messages, content, tool_calls
+- `action`: tool_name, arguments
+- `result`: tool_name, output, error
+- `memory_read`: experiences_count, skills_count
+- `goal`: 自定义(根据具体目标)
 
 ---
 
@@ -498,7 +523,7 @@ code .trace/tree.md
 - Step 模型:`agent/execution/models.py:Step`(已实现)
 - Trace 模型:`agent/execution/models.py:Trace`(已实现)
 - 存储接口:`agent/execution/protocols.py:TraceStore`(已实现)
-- 内存存储:`agent/execution/store.py:MemoryTraceStore`(已实现)
+- 文件存储:`agent/execution/fs_store.py:FileSystemTraceStore`(已实现)
 - Debug 工具:`agent/execution/tree_dump.py`(已实现)
 - **Core Skill**:`agent/skills/core.md`(已实现)
 - step 工具:`agent/tools/builtin/step.py`(待实现)

+ 8 - 8
docs/trace-api.md

@@ -102,14 +102,14 @@ class MyCustomStore:
     async def get_step_children(self, step_id: str) -> List[Step]: ...
 ```
 
-### MemoryTraceStore
+### FileSystemTraceStore
 
-内存存储实现(用于开发和测试
+文件系统存储实现(支持跨进程和持久化
 
 ```python
-from agent.trace import MemoryTraceStore
+from agent.execution import FileSystemTraceStore
 
-store = MemoryTraceStore()
+store = FileSystemTraceStore(base_path=".trace")
 
 # 使用方法
 trace_id = await store.create_trace(trace)
@@ -244,10 +244,10 @@ ws.onmessage = (e) => {
 
 ```python
 from agent import AgentRunner
-from agent.trace import MemoryTraceStore
+from agent.execution import FileSystemTraceStore
 
 # 初始化
-store = MemoryTraceStore()
+store = FileSystemTraceStore(base_path=".trace")
 runner = AgentRunner(trace_store=store, llm_call=my_llm_fn)
 
 # 执行 Agent(自动记录 Trace)
@@ -341,8 +341,8 @@ class PostgreSQLTraceStore:
 
 ```python
 # ✅ 推荐导入
-from agent.trace import Trace, Step, StepType, Status
-from agent.trace import TraceStore, MemoryTraceStore
+from agent.execution import Trace, Step, StepType, StepStatus
+from agent.execution import TraceStore, FileSystemTraceStore
 
 # ✅ 顶层导入(等价)
 from agent import Trace, Step, TraceStore

+ 20 - 6
examples/feature_extract/run.py

@@ -17,7 +17,11 @@ load_dotenv()
 
 from agent.llm.prompts import SimplePrompt
 from agent.core.runner import AgentRunner
-from agent.execution import MemoryTraceStore, Trace, Step
+from agent.execution import (
+    FileSystemTraceStore,
+    Trace,
+    Step,
+)
 from agent.llm import create_openrouter_llm_call
 
 
@@ -80,7 +84,7 @@ async def main():
     print(f"   - 模型: Claude Sonnet 4.5 (via OpenRouter)")
 
     runner = AgentRunner(
-        trace_store=MemoryTraceStore(),
+        trace_store=FileSystemTraceStore(base_path=".trace"),
         llm_call=create_openrouter_llm_call(model="anthropic/claude-sonnet-4.5"),
         skills_dir=skills_dir,  # 可选:加载额外的用户自定义 skills(内置 skills 会自动加载)
         debug=True  # 启用 debug,输出到 .trace/
@@ -93,6 +97,7 @@ async def main():
     print()
 
     final_response = ""
+    current_trace_id = None  # 保存 trace_id 用于后续测试
 
     async for item in runner.run(
         task="[图片和特征描述已包含在 messages 中]",  # 占位符
@@ -105,6 +110,7 @@ async def main():
     ):
         # 处理 Trace 对象(整体状态变化)
         if isinstance(item, Trace):
+            current_trace_id = item.trace_id  # 保存 trace_id
             if item.status == "in_progress":
                 print(f"[Trace] 开始: {item.trace_id[:8]}")
             elif item.status == "completed":
@@ -156,10 +162,18 @@ async def main():
     print(f"✓ 结果已保存到: {output_file}")
     print()
 
-    # 8. 提示查看 debug 文件
-    print("Debug 文件:")
-    print(f"  - 完整可折叠: {Path.cwd() / '.trace' / 'tree.md'}")
-    print(f"  - 简洁文本: {Path.cwd() / '.trace' / 'tree.txt'}")
+    # 提示使用 API 可视化
+    print("=" * 60)
+    print("可视化 Step Tree:")
+    print("=" * 60)
+    print("1. 启动 API Server:")
+    print("   python3 api_server.py")
+    print()
+    print("2. 浏览器访问:")
+    print("   http://localhost:8000/api/traces")
+    print()
+    print(f"3. Trace ID: {current_trace_id}")
+    print("=" * 60)
 
 
 if __name__ == "__main__":

+ 659 - 0
frontend/API.md

@@ -0,0 +1,659 @@
+# Agent Execution API - 前端对接文档
+
+> 版本:v2.0
+> 更新日期:2026-02-03
+
+---
+
+## 📋 概览
+
+本 API 提供 Agent 执行过程的实时可视化能力,包括:
+- **REST API** - 查询历史数据、获取完整 Step 树
+- **WebSocket** - 实时推送 Step 更新(支持断线续传)
+
+**核心概念**:
+- **Trace** - 一次完整的任务执行(如一次 Agent 运行)
+- **Step** - 执行过程中的一个原子操作,形成树结构
+- **Event** - Step 的变更事件(用于 WebSocket 推送和断线续传)
+
+可以运行 `python3 examples/feature_extract/run.py` 来生成数据。
+
+---
+
+## 🌐 REST API
+
+### 基础信息
+
+- **Base URL**: `http://localhost:8000`
+- **Content-Type**: `application/json`
+
+---
+
+### 1. 列出 Traces
+
+获取 Trace 列表(支持过滤)
+
+```http
+GET /api/traces?status=running&limit=20
+```
+
+**查询参数**:
+| 参数 | 类型 | 必填 | 说明 |
+|------|------|------|------|
+| `status` | string | 否 | 过滤状态:`running` / `completed` / `failed` |
+| `mode` | string | 否 | 过滤模式:`call` / `agent` |
+| `limit` | int | 否 | 返回数量(默认 50,最大 100)|
+
+**响应示例**:
+```json
+{
+  "traces": [
+    {
+      "trace_id": "abc123",
+      "mode": "agent",
+      "task": "分析代码库结构",
+      "status": "running",
+      "total_steps": 15,
+      "total_tokens": 5000,
+      "total_cost": 0.05,
+      "created_at": "2026-02-03T15:30:00"
+    }
+  ],
+  "total": 1
+}
+```
+
+---
+
+### 2. 获取 Trace 元数据
+
+```http
+GET /api/traces/{trace_id}
+```
+
+**响应示例**:
+```json
+{
+  "trace_id": "abc123",
+  "mode": "agent",
+  "task": "分析代码库结构",
+  "status": "running",
+  "total_steps": 15,
+  "total_tokens": 5000,
+  "total_cost": 0.05,
+  "total_duration_ms": 12345,
+  "last_sequence": 15,
+  "last_event_id": 15,
+  "created_at": "2026-02-03T15:30:00",
+  "completed_at": null
+}
+```
+
+---
+
+### 3. 获取完整 Step 树 ⭐ 重要
+
+获取 Trace 的完整 Step 树(适合小型 Trace,<100 个 Step)
+
+```http
+GET /api/traces/{trace_id}/tree?view=compact&max_depth=10
+```
+
+**查询参数**:
+| 参数 | 类型 | 默认值 | 说明 |
+|------|------|--------|------|
+| `view` | string | `compact` | `compact` - 精简视图 / `full` - 完整视图 |
+| `max_depth` | int | 无限 | 最大树深度 |
+
+**响应示例**:
+```json
+{
+  "trace_id": "abc123",
+  "root_steps": [
+    {
+      "step_id": "step-001",
+      "step_type": "thought",
+      "status": "completed",
+      "sequence": 1,
+      "parent_id": null,
+      "description": "分析项目结构...",
+      "has_children": true,
+      "children_count": 2,
+      "duration_ms": 1234,
+      "tokens": 500,
+      "cost": 0.005,
+      "created_at": "2026-02-03T15:30:01",
+      "data": {
+        "content": "让我先看看项目的目录结构...",
+        "model": "claude-sonnet-4.5"
+      },
+      "children": [
+        {
+          "step_id": "step-002",
+          "step_type": "action",
+          "status": "completed",
+          "parent_id": "step-001",
+          "description": "glob_files(**/*.py)",
+          "data": {
+            "tool_name": "glob_files",
+            "arguments": {"pattern": "**/*.py"}
+          },
+          "children": [
+            {
+              "step_id": "step-003",
+              "step_type": "result",
+              "status": "completed",
+              "parent_id": "step-002",
+              "data": {
+                "tool_name": "glob_files",
+                "output": ["src/main.py", "src/utils.py"]
+              }
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}
+```
+
+**注意**:
+- `children` 字段包含嵌套的子节点(递归结构)
+- `compact` 视图:`data` 中的大字段(如 `messages`)会被省略
+- `full` 视图:返回所有字段(数据量可能很大)
+
+---
+
+### 4. 懒加载单个节点
+
+适用于大型 Trace(>100 Step),按需加载子树
+
+```http
+GET /api/traces/{trace_id}/node/{step_id}?expand=true&max_depth=2
+```
+
+**查询参数**:
+| 参数 | 类型 | 默认值 | 说明 |
+|------|------|--------|------|
+| `expand` | bool | `false` | 是否展开子节点 |
+| `max_depth` | int | 1 | 展开深度 |
+| `view` | string | `compact` | 视图类型 |
+
+**响应**:与 `/tree` 格式相同,但只返回指定节点及其子树。
+
+---
+
+## ⚡ WebSocket API
+
+### 连接
+
+```javascript
+const ws = new WebSocket(
+  'ws://localhost:8000/api/traces/{trace_id}/watch?since_event_id=0'
+)
+```
+
+**查询参数**:
+| 参数 | 类型 | 默认值 | 说明 |
+|------|------|--------|------|
+| `since_event_id` | int | `0` | 从哪个事件 ID 开始<br>`0` = 补发所有历史<br>`>0` = 只补发指定 ID 之后的 |
+
+---
+
+### 事件类型
+
+#### 1. connected(连接成功)
+
+```json
+{
+  "event": "connected",
+  "trace_id": "abc123",
+  "current_event_id": 15
+}
+```
+
+**说明**:连接建立后的第一条消息,包含当前最新的 event_id
+
+**前端处理**:
+```javascript
+if (data.event === 'connected') {
+  // 保存 event_id 用于断线重连
+  localStorage.setItem('last_event_id', data.current_event_id)
+}
+```
+
+---
+
+#### 2. step_added(新增 Step)⭐ 最常用
+
+```json
+{
+  "event": "step_added",
+  "event_id": 16,
+  "ts": "2026-02-03T15:30:10.123456",
+  "step": {
+    "step_id": "step-016",
+    "step_type": "action",
+    "status": "completed",
+    "sequence": 16,
+    "parent_id": "step-001",
+    "description": "read_file(config.yaml)",
+    "has_children": false,
+    "children_count": 0,
+    "duration_ms": 50,
+    "data": {
+      "tool_name": "read_file",
+      "arguments": {"file_path": "config.yaml"}
+    }
+  }
+}
+```
+
+**前端处理**:
+```javascript
+if (data.event === 'step_added') {
+  // 添加到树结构
+  addStepToTree(data.step)
+
+  // 更新 event_id
+  localStorage.setItem('last_event_id', data.event_id)
+}
+```
+
+---
+
+#### 3. step_updated(Step 更新)
+
+```json
+{
+  "event": "step_updated",
+  "event_id": 17,
+  "ts": "2026-02-03T15:30:15.123456",
+  "step_id": "step-016",
+  "patch": {
+    "status": "completed",
+    "duration_ms": 1234
+  }
+}
+```
+
+**说明**:`patch` 是增量更新(只包含变化的字段)
+
+**前端处理**:
+```javascript
+if (data.event === 'step_updated') {
+  const step = findStepById(data.step_id)
+  Object.assign(step, data.patch)
+  updateUI()
+}
+```
+
+---
+
+#### 4. trace_completed(任务完成)
+
+```json
+{
+  "event": "trace_completed",
+  "event_id": 18,
+  "ts": "2026-02-03T15:35:00.123456",
+  "trace_id": "abc123",
+  "total_steps": 18
+}
+```
+
+**前端处理**:
+```javascript
+if (data.event === 'trace_completed') {
+  console.log('Task completed!')
+  ws.close()
+}
+```
+
+---
+
+#### 5. error(错误)
+
+```json
+{
+  "event": "error",
+  "message": "Too many missed events (150), please reload full tree via REST API"
+}
+```
+
+**说明**:
+- 如果断线期间产生超过 100 条事件,会收到此错误
+- 此时应该用 REST API 重新加载完整树
+
+---
+
+### 断线续传
+
+**场景**:网络断开后重新连接,不丢失中间的更新
+
+**实现方式**:
+
+```javascript
+let lastEventId = 0
+
+// 初次连接
+const ws = new WebSocket(
+  `ws://localhost:8000/api/traces/abc123/watch?since_event_id=0`
+)
+
+ws.onmessage = (event) => {
+  const data = JSON.parse(event.data)
+
+  // 保存最新 event_id
+  if (data.event_id) {
+    lastEventId = data.event_id
+    localStorage.setItem('trace_abc123_event_id', lastEventId)
+  }
+}
+
+ws.onclose = () => {
+  // 3 秒后重连
+  setTimeout(() => {
+    // 从上次的 event_id 继续
+    const ws2 = new WebSocket(
+      `ws://localhost:8000/api/traces/abc123/watch?since_event_id=${lastEventId}`
+    )
+    // 服务器会补发 lastEventId 之后的所有事件
+  }, 3000)
+}
+```
+
+**注意**:
+- `step_added`、`step_updated`、`trace_completed` 等事件消息都带有 `event_id` 和 `ts` 字段(`connected` 和 `error` 消息不带这两个字段)
+- 重连时传入 `since_event_id`,服务器自动补发缺失的事件(最多 100 条)
+- 超过 100 条会返回错误,需要用 REST API 重新加载
+
+---
+
+### 心跳检测
+
+保持连接活跃,检测僵尸连接
+
+```javascript
+// 每 30 秒发送心跳
+const heartbeat = setInterval(() => {
+  if (ws.readyState === WebSocket.OPEN) {
+    ws.send('ping')
+  }
+}, 30000)
+
+ws.onmessage = (event) => {
+  const data = JSON.parse(event.data)
+  if (data.event === 'pong') {
+    console.log('Connection alive')
+  }
+}
+```
+
+---
+
+## 📊 数据模型
+
+### Trace
+
+| 字段 | 类型 | 说明 |
+|------|------|------|
+| `trace_id` | string | 唯一 ID |
+| `mode` | string | `call` - 单次调用 / `agent` - Agent 模式 |
+| `task` | string | 任务描述(Agent 模式)|
+| `status` | string | `running` / `completed` / `failed` |
+| `total_steps` | int | Step 总数 |
+| `total_tokens` | int | Token 总消耗 |
+| `total_cost` | float | 成本总和 |
+| `total_duration_ms` | int | 总耗时(毫秒)|
+| `last_sequence` | int | 最新 Step 的 sequence |
+| `last_event_id` | int | 最新事件 ID |
+| `created_at` | string | 创建时间(ISO 8601)|
+| `completed_at` | string \| null | 完成时间 |
+
+---
+
+### Step
+
+#### 顶层字段(所有 Step 共有)
+
+| 字段 | 类型 | 说明 |
+|------|------|------|
+| `step_id` | string | 唯一 ID |
+| `trace_id` | string | 所属 Trace ID |
+| `step_type` | string | 步骤类型(见下表)|
+| `status` | string | 状态(见下表)|
+| `sequence` | int | 在 Trace 中的顺序(递增)|
+| `parent_id` | string \| null | 父节点 ID |
+| `description` | string | 简短描述 |
+| `summary` | string \| null | 总结(仅 `evaluation` 类型)|
+| `has_children` | bool | 是否有子节点 |
+| `children_count` | int | 子节点数量 |
+| `duration_ms` | int \| null | 耗时(毫秒)|
+| `tokens` | int \| null | Token 消耗 |
+| `cost` | float \| null | 成本 |
+| `created_at` | string | 创建时间 |
+| `data` | object | 类型相关的详细数据 |
+
+#### step_type(步骤类型)
+
+| 类型 | 说明 | 来源 |
+|------|------|------|
+| `goal` | 目标/计划 | LLM |
+| `thought` | 思考/分析 | LLM |
+| `evaluation` | 评估总结 | LLM |
+| `response` | 最终回复 | LLM |
+| `action` | 工具调用 | System |
+| `result` | 工具结果 | System |
+| `memory_read` | 读取记忆 | System |
+| `memory_write` | 写入记忆 | System |
+
+#### status(步骤状态)
+
+| 状态 | 说明 |
+|------|------|
+| `planned` | 计划中(未执行)|
+| `in_progress` | 执行中 |
+| `awaiting_approval` | 等待用户确认 |
+| `completed` | 已完成 |
+| `failed` | 失败 |
+| `skipped` | 跳过 |
+
+#### data 字段(按 step_type)
+
+**thought / response**:
+```json
+{
+  "model": "claude-sonnet-4.5",
+  "content": "让我先分析...",
+  "messages": [...],  // full 视图才有
+  "tool_calls": [...]  // 如果有工具调用
+}
+```
+
+**action**:
+```json
+{
+  "tool_name": "read_file",
+  "arguments": {
+    "file_path": "config.yaml"
+  }
+}
+```
+
+**result**:
+```json
+{
+  "tool_name": "read_file",
+  "output": "file content...",
+  "error": null
+}
+```
+
+**memory_read**:
+```json
+{
+  "experiences_count": 5,
+  "skills_count": 3
+}
+```
+
+---
+
+## 🎯 推荐的实现方案
+
+### 方案 1:纯 WebSocket(简单场景)
+
+适用于:实时监控进行中的任务,Step 数量 < 100
+
+```javascript
+// 只用 WebSocket,自动获取历史
+const ws = new WebSocket(
+  'ws://localhost:8000/api/traces/abc123/watch?since_event_id=0'
+)
+
+ws.onmessage = (event) => {
+  const data = JSON.parse(event.data)
+
+  if (data.event === 'step_added') {
+    // 历史 + 新增的 Step 都会收到
+    addStepToTree(data.step)
+  }
+}
+```
+
+- **优点**:代码简单
+- **缺点**:超过 100 个 Step 会失败
+
+---
+
+### 方案 2:REST + WebSocket(生产推荐)
+
+适用于:查看历史任务,或 Step 数量 > 100
+
+```javascript
+// 1. 先用 REST API 获取完整树
+const response = await fetch(
+  `/api/traces/${traceId}/tree?view=compact`
+)
+const treeData = await response.json()
+renderTree(treeData.root_steps)
+
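+// 2. 连接 WebSocket 监听增量更新
+//    since_event_id 使用 Trace 元数据中的 last_event_id;
+//    若传 0,服务器会补发全部历史事件(与已渲染的树重复,且超过 100 条时会返回 error)
+const meta = await fetch(`/api/traces/${traceId}`).then(r => r.json())
+const ws = new WebSocket(
+  `ws://localhost:8000/api/traces/${traceId}/watch?since_event_id=${meta.last_event_id}`
+)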
+
+ws.onmessage = (event) => {
+  const data = JSON.parse(event.data)
+  if (data.event === 'step_added') {
+    addStepToTree(data.step)  // 只处理新增的
+  }
+}
+```
+
+- **优点**:可靠,支持大型 Trace
+- **缺点**:略复杂
+
+---
+
+## 🐛 错误处理
+
+### HTTP 错误码
+
+| 状态码 | 说明 |
+|--------|------|
+| 200 | 成功 |
+| 404 | Trace/Step 不存在 |
+| 400 | 参数错误 |
+| 500 | 服务器错误 |
+
+### WebSocket 错误
+
+```javascript
+ws.onerror = (error) => {
+  console.error('WebSocket error:', error)
+  // 重连
+}
+
+ws.onclose = (event) => {
+  console.log('Connection closed:', event.code, event.reason)
+  // 自动重连
+}
+```
+
+---
+
+## 💡 最佳实践
+
+### 1. 保存 event_id 用于断线重连
+
+```javascript
+ws.onmessage = (event) => {
+  const data = JSON.parse(event.data)
+  if (data.event_id) {
+    localStorage.setItem(
+      `trace_${traceId}_event_id`,
+      data.event_id
+    )
+  }
+}
+```
+
+### 2. 实现自动重连
+
+```javascript
+function connectWebSocket(traceId) {
+  const lastEventId = localStorage.getItem(`trace_${traceId}_event_id`) || 0
+  const ws = new WebSocket(
+    `ws://localhost:8000/api/traces/${traceId}/watch?since_event_id=${lastEventId}`
+  )
+
+  ws.onclose = () => {
+    setTimeout(() => connectWebSocket(traceId), 3000)
+  }
+
+  return ws
+}
+```
+
+### 3. 使用 compact 视图减少流量
+
+```javascript
+// ✅ 推荐
+const response = await fetch(`/api/traces/${id}/tree?view=compact`)
+
+// ❌ 避免(数据量可能很大)
+const response = await fetch(`/api/traces/${id}/tree?view=full`)
+```
+
+### 4. 按需懒加载(大型 Trace)
+
+```javascript
+// 初次只加载第一层
+const tree = await fetch(
+  `/api/traces/${id}/tree?max_depth=1`
+).then(r => r.json())
+
+// 用户点击展开时,懒加载子树
+async function onExpand(stepId) {
+  const node = await fetch(
+    `/api/traces/${id}/node/${stepId}?expand=true&max_depth=1`
+  ).then(r => r.json())
+
+  appendChildren(stepId, node.children)
+}
+```
+
+---
+
+## 🔗 相关文档
+
+- [Step 树结构详解](./step-tree.md)
+- [API 接口规范](./trace-api.md)
+- [架构设计文档](./README.md)
+
+---
+
+## 📞 问题反馈
+
+如有问题请提 Issue:https://github.com/anthropics/agent/issues

+ 387 - 0
frontend/test_api.py

@@ -0,0 +1,387 @@
+#!/usr/bin/env python3
+"""
+Agent Execution API 自动化测试
+
+测试 REST API 和 WebSocket 接口是否正常工作
+
+运行方式:
+    python3 frontend/test_api.py
+
+要求:
+    - API Server 已启动(python3 api_server.py)
+    - 有至少一个 Trace 数据(运行 examples/feature_extract/run.py)
+"""
+
+import asyncio
+import json
+import sys
+from typing import Optional
+from datetime import datetime
+
+try:
+    import httpx
+    import websockets
+except ImportError:
+    print("❌ 缺少依赖,请安装:")
+    print("   pip install httpx websockets")
+    sys.exit(1)
+
+
+# Configuration: base URLs (HTTP and WebSocket) of the API server under test.
+API_BASE = "http://localhost:8000"
+WS_BASE = "ws://localhost:8000"
+
+
+class TestResult:
+    """Outcome of one API check: its name, pass/fail flag, detail message and elapsed ms."""
+
+    def __init__(self, name: str):
+        self.name = name
+        self.passed = False
+        self.message = ""
+        self.duration_ms = 0
+
+    def success(self, message: str = ""):
+        """Mark the check as passed, optionally attaching a detail message."""
+        self.message = message
+        self.passed = True
+
+    def fail(self, message: str):
+        """Mark the check as failed and record the reason."""
+        self.message = message
+        self.passed = False
+
+    def __str__(self):
+        """Render '<icon> <name> (<ms>ms)', followed by an indented message line if one is set."""
+        if self.passed:
+            icon = "✅"
+        else:
+            icon = "❌"
+        headline = f"{icon} {self.name} ({self.duration_ms}ms)"
+        if not self.message:
+            return headline
+        return f"{headline}\n   {self.message}"
+
+
+class APITester:
+    """API 测试器"""
+
+    def __init__(self):
+        # TestResult objects, appended by each test_* method as it finishes.
+        self.results = []
+        # Trace ID stored by test_list_traces; the later tests skip themselves while it is unset.
+        self.test_trace_id: Optional[str] = None
+
+    async def run_all_tests(self):
+        """Run the whole suite in order and report whether every check passed.
+
+        Returns False immediately when check_server() reports the API server
+        as unreachable. Otherwise runs the REST checks, then the WebSocket
+        checks, prints the summary, and returns True only if no recorded
+        result failed.
+        """
+        print("🧪 Agent Execution API 自动化测试")
+        print("=" * 60)
+        print()
+
+        server_up = await self.check_server()
+        if not server_up:
+            print("❌ API Server 未启动,请先运行:")
+            print("   python3 api_server.py")
+            return False
+
+        print("✅ API Server 已启动\n")
+
+        # (heading, checks) pairs, run sequentially. REST comes first because
+        # test_list_traces stores the trace ID the remaining checks rely on.
+        sections = (
+            (
+                "📡 测试 REST API",
+                (self.test_list_traces, self.test_get_trace, self.test_get_tree),
+            ),
+            (
+                "⚡ 测试 WebSocket",
+                (
+                    self.test_websocket_connect,
+                    self.test_websocket_events,
+                    self.test_websocket_reconnect,
+                ),
+            ),
+        )
+        for heading, checks in sections:
+            print(heading)
+            print("-" * 60)
+            for check in checks:
+                await check()
+            print()
+
+        self.print_summary()
+
+        for outcome in self.results:
+            if not outcome.passed:
+                return False
+        return True
+
+    async def check_server(self) -> bool:
+        """Probe GET /api/traces with a 2-second timeout.
+
+        Returns True when the server answers with 200 or 404, and False for
+        any other status or when the request raises.
+        """
+        try:
+            async with httpx.AsyncClient() as http:
+                reply = await http.get(f"{API_BASE}/api/traces", timeout=2.0)
+        except Exception:
+            return False
+        return reply.status_code in (200, 404)
+
+    async def test_list_traces(self):
+        """Check GET /api/traces and remember the first trace ID for the later tests."""
+        result = TestResult("GET /api/traces - 列出 Traces")
+        started_at = datetime.now()
+
+        try:
+            async with httpx.AsyncClient() as http:
+                reply = await http.get(f"{API_BASE}/api/traces?limit=10")
+                elapsed = datetime.now() - started_at
+                result.duration_ms = int(elapsed.total_seconds() * 1000)
+
+                if reply.status_code != 200:
+                    result.fail(f"HTTP {reply.status_code}")
+                else:
+                    payload = reply.json()
+                    if "traces" not in payload:
+                        result.fail("响应缺少 'traces' 字段")
+                    elif len(payload["traces"]) == 0:
+                        result.fail("没有找到 Trace(请先运行 example 生成数据)")
+                    else:
+                        traces = payload["traces"]
+                        # Keep the first trace ID; the remaining tests run against it.
+                        self.test_trace_id = traces[0]["trace_id"]
+                        result.success(f"找到 {len(traces)} 个 Trace")
+        except Exception as e:
+            result.fail(f"请求失败: {e}")
+
+        self.results.append(result)
+        print(result)
+
+    async def test_get_trace(self):
+        """Check GET /api/traces/{id} and that the core metadata fields are present."""
+        result = TestResult("GET /api/traces/{id} - 获取 Trace 元数据")
+        started_at = datetime.now()
+
+        if not self.test_trace_id:
+            result.fail("跳过(没有可用的 Trace ID)")
+            self.results.append(result)
+            print(result)
+            return
+
+        try:
+            async with httpx.AsyncClient() as http:
+                reply = await http.get(f"{API_BASE}/api/traces/{self.test_trace_id}")
+                elapsed = datetime.now() - started_at
+                result.duration_ms = int(elapsed.total_seconds() * 1000)
+
+                if reply.status_code != 200:
+                    result.fail(f"HTTP {reply.status_code}")
+                else:
+                    info = reply.json()
+                    # Collect every required field the response lacks, in declaration order.
+                    missing = []
+                    for field in ("trace_id", "mode", "status", "total_steps"):
+                        if field not in info:
+                            missing.append(field)
+                    if missing:
+                        result.fail(f"响应缺少字段: {missing}")
+                    else:
+                        result.success(f"Status: {info['status']}, Steps: {info['total_steps']}")
+        except Exception as e:
+            result.fail(f"请求失败: {e}")
+
+        self.results.append(result)
+        print(result)
+
+    async def test_get_tree(self):
+        """Check GET /api/traces/{id}/tree (compact view) and count the steps it returns."""
+        result = TestResult("GET /api/traces/{id}/tree - 获取 Step 树")
+        started_at = datetime.now()
+
+        if not self.test_trace_id:
+            result.fail("跳过(没有可用的 Trace ID)")
+            self.results.append(result)
+            print(result)
+            return
+
+        def count_steps(nodes):
+            # Nodes at this level plus, recursively, everything under their "children".
+            return len(nodes) + sum(
+                count_steps(node["children"]) for node in nodes if "children" in node
+            )
+
+        try:
+            async with httpx.AsyncClient() as http:
+                tree_url = f"{API_BASE}/api/traces/{self.test_trace_id}/tree?view=compact"
+                reply = await http.get(tree_url)
+                elapsed = datetime.now() - started_at
+                result.duration_ms = int(elapsed.total_seconds() * 1000)
+
+                if reply.status_code != 200:
+                    result.fail(f"HTTP {reply.status_code}")
+                else:
+                    payload = reply.json()
+                    if "root_steps" not in payload:
+                        result.fail("响应缺少 'root_steps' 字段")
+                    else:
+                        roots = payload["root_steps"]
+                        root_count = len(roots)
+                        total_steps = count_steps(roots)
+                        result.success(f"根节点: {root_count}, 总 Steps: {total_steps}")
+        except Exception as e:
+            result.fail(f"请求失败: {e}")
+
+        self.results.append(result)
+        print(result)
+
+    async def test_websocket_connect(self):
+        """Check that the watch socket opens and greets with a 'connected' event."""
+        result = TestResult("WebSocket - 连接和接收 connected 事件")
+        started_at = datetime.now()
+
+        if not self.test_trace_id:
+            result.fail("跳过(没有可用的 Trace ID)")
+            self.results.append(result)
+            print(result)
+            return
+
+        try:
+            uri = f"{WS_BASE}/api/traces/{self.test_trace_id}/watch?since_event_id=0"
+            async with websockets.connect(uri) as ws:
+                # The first frame is expected to be the 'connected' greeting.
+                raw = await asyncio.wait_for(ws.recv(), timeout=3.0)
+                elapsed = datetime.now() - started_at
+                result.duration_ms = int(elapsed.total_seconds() * 1000)
+
+                greeting = json.loads(raw)
+                kind = greeting.get("event")
+                if kind != "connected":
+                    result.fail(f"首条消息不是 'connected',而是: {kind}")
+                elif "current_event_id" not in greeting:
+                    result.fail("connected 消息缺少 'current_event_id' 字段")
+                else:
+                    result.success(f"Event ID: {greeting['current_event_id']}")
+        except asyncio.TimeoutError:
+            result.fail("连接超时(3秒)")
+        except Exception as e:
+            result.fail(f"连接失败: {e}")
+
+        self.results.append(result)
+        print(result)
+
+    async def test_websocket_events(self):
+        """Check that connecting with since_event_id=0 replays historical step_added events.
+
+        Reads the greeting, then collects up to 10 further messages (stopping
+        after 1 second of silence) and passes if at least one of them is a
+        'step_added' event. The greeting read is bounded by a timeout so an
+        unresponsive server fails the check instead of hanging the suite.
+        """
+        result = TestResult("WebSocket - 接收历史 step_added 事件")
+        start = datetime.now()
+
+        if not self.test_trace_id:
+            result.fail("跳过(没有可用的 Trace ID)")
+            self.results.append(result)
+            print(result)
+            return
+
+        try:
+            uri = f"{WS_BASE}/api/traces/{self.test_trace_id}/watch?since_event_id=0"
+            async with websockets.connect(uri) as ws:
+                # Consume the 'connected' greeting; bounded so a silent server cannot block forever.
+                await asyncio.wait_for(ws.recv(), timeout=3.0)
+
+                # Following messages should be the replayed history.
+                events_received = []
+                try:
+                    while len(events_received) < 10:  # collect at most 10 messages
+                        message = await asyncio.wait_for(ws.recv(), timeout=1.0)
+                        data = json.loads(message)
+                        events_received.append(data.get("event"))
+                except asyncio.TimeoutError:
+                    pass  # no more messages within the window
+
+                result.duration_ms = int((datetime.now() - start).total_seconds() * 1000)
+
+                step_added_count = events_received.count("step_added")
+                if step_added_count == 0:
+                    result.fail("没有收到 step_added 事件(Trace 可能为空)")
+                else:
+                    other_events = [e for e in events_received if e != "step_added"]
+                    result.success(
+                        f"收到 {step_added_count} 个 step_added" +
+                        (f", {len(other_events)} 个其他事件" if other_events else "")
+                    )
+        except asyncio.TimeoutError:
+            # Only the greeting read can reach here; the replay loop handles its own timeout.
+            result.fail("连接超时(3秒)")
+        except Exception as e:
+            result.fail(f"测试失败: {e}")
+
+        self.results.append(result)
+        print(result)
+
+    async def test_websocket_reconnect(self):
+        """Check resuming a watch stream via since_event_id.
+
+        Connects once to read current_event_id from the greeting, then
+        reconnects with since_event_id set to that value and expects another
+        'connected' greeting. Whether or not a further message arrives within
+        0.5 seconds, the reconnect counts as successful. Every greeting read
+        is bounded by a timeout so an unresponsive server fails the check
+        instead of hanging the suite.
+        """
+        result = TestResult("WebSocket - 断线续传(since_event_id)")
+        start = datetime.now()
+
+        if not self.test_trace_id:
+            result.fail("跳过(没有可用的 Trace ID)")
+            self.results.append(result)
+            print(result)
+            return
+
+        try:
+            uri = f"{WS_BASE}/api/traces/{self.test_trace_id}/watch?since_event_id=0"
+
+            # First connection: only the greeting is needed, to learn the latest event ID.
+            async with websockets.connect(uri) as ws:
+                message = await asyncio.wait_for(ws.recv(), timeout=3.0)
+                last_event_id = json.loads(message).get("current_event_id", 0)
+
+            if last_event_id == 0:
+                result.fail("无法获取 event_id")
+            else:
+                # Second connection: resume from the event ID we just learned.
+                uri2 = f"{WS_BASE}/api/traces/{self.test_trace_id}/watch?since_event_id={last_event_id}"
+                async with websockets.connect(uri2) as ws:
+                    message = await asyncio.wait_for(ws.recv(), timeout=3.0)
+                    data = json.loads(message)
+
+                    result.duration_ms = int((datetime.now() - start).total_seconds() * 1000)
+
+                    if data.get("event") != "connected":
+                        result.fail(f"重连后首条消息不是 'connected': {data.get('event')}")
+                    else:
+                        try:
+                            message = await asyncio.wait_for(ws.recv(), timeout=0.5)
+                            # A follow-up message must at least be valid JSON; its content is not inspected.
+                            json.loads(message)
+                            result.success(f"重连成功,从 event_id={last_event_id} 继续")
+                        except asyncio.TimeoutError:
+                            # Silence here means nothing new happened since last_event_id.
+                            result.success(f"重连成功,无新增事件(event_id={last_event_id})")
+        except asyncio.TimeoutError:
+            # A greeting did not arrive in time on one of the two connections.
+            result.fail("连接超时(3秒)")
+        except Exception as e:
+            result.fail(f"测试失败: {e}")
+
+        self.results.append(result)
+        print(result)
+
+    def print_summary(self):
+        """Print totals, the list of failed checks (if any), and a closing verdict."""
+        rule = "=" * 60
+        print(rule)
+        print("📊 测试摘要")
+        print(rule)
+
+        failures = [r for r in self.results if not r.passed]
+        total = len(self.results)
+        failed = len(failures)
+        passed = total - failed
+
+        print(f"\n总计: {total} 个测试")
+        print(f"✅ 通过: {passed}")
+        if failures:
+            print(f"❌ 失败: {failed}")
+        print()
+
+        if failures:
+            print("失败的测试:")
+            for outcome in failures:
+                print(f"  - {outcome.name}")
+                if outcome.message:
+                    print(f"    {outcome.message}")
+            print()
+
+        if failures:
+            print("⚠️  部分测试失败,请检查上述错误")
+        else:
+            print("🎉 所有测试通过!")
+
+        print()
+
+
+async def main():
+    """Run the full suite and exit with status 0 if everything passed, 1 otherwise."""
+    all_passed = await APITester().run_all_tests()
+    exit_code = 0 if all_passed else 1
+    sys.exit(exit_code)
+
+
+if __name__ == "__main__":
+    # Script entry point: drive the async suite; Ctrl-C is reported and exits with status 1.
+    try:
+        asyncio.run(main())
+    except KeyboardInterrupt:
+        print("\n\n⚠️  测试被中断")
+        sys.exit(1)

+ 0 - 2
requirements.txt

@@ -11,5 +11,3 @@ browser-use>=0.11.0
 fastapi>=0.115.0
 uvicorn[standard]>=0.32.0
 websockets>=13.0
-
-