Просмотр исходного кода

feat(execution): 实现文件系统存储并重构执行追踪系统

- 将 MemoryTraceStore 替换为 FileSystemTraceStore,支持跨进程持久化
- 重构 Step 模型,移除未使用的字段并添加 UI 优化字段
- 扩展 TraceStore 协议,支持事件流和断线续传
- 更新 API 和 WebSocket 接口,支持 compact/full 视图和分页查询
- 添加前端 API 文档,提供完整的对接指南
- 清理未使用的代码和冗余功能,简化架构
max_liu 1 месяц назад
Родитель
Сommit
f3ab0d4116

+ 4 - 4
.gitignore

@@ -49,7 +49,7 @@ htmlcov/
 .DS_Store
 .DS_Store
 Thumbs.db
 Thumbs.db
 
 
-# .env
+.env
 debug.log
 debug.log
 info.log
 info.log
 .browser_use_files
 .browser_use_files
@@ -59,6 +59,6 @@ output
 
 
 # Debug output
 # Debug output
 .trace/
 .trace/
-
-
-cloud_xhs/
+.trace_test/
+.trace_test2/
+examples/**/output*/

+ 2 - 4
agent/__init__.py

@@ -15,9 +15,8 @@ from agent.core.runner import AgentRunner
 from agent.core.config import AgentConfig, CallResult
 from agent.core.config import AgentConfig, CallResult
 
 
 # 执行追踪
 # 执行追踪
-from agent.execution.models import Trace, Step, StepType, Status
+from agent.execution.models import Trace, Step, StepType, StepStatus
 from agent.execution.protocols import TraceStore
 from agent.execution.protocols import TraceStore
-from agent.execution.store import MemoryTraceStore
 
 
 # 记忆系统
 # 记忆系统
 from agent.memory.models import Experience, Skill
 from agent.memory.models import Experience, Skill
@@ -39,9 +38,8 @@ __all__ = [
     "Trace",
     "Trace",
     "Step",
     "Step",
     "StepType",
     "StepType",
-    "Status",
+    "StepStatus",
     "TraceStore",
     "TraceStore",
-    "MemoryTraceStore",
     # Memory
     # Memory
     "Experience",
     "Experience",
     "Skill",
     "Skill",

+ 6 - 101
agent/core/runner.py

@@ -19,7 +19,6 @@ from agent.memory.models import Experience, Skill
 from agent.memory.protocols import MemoryStore, StateStore
 from agent.memory.protocols import MemoryStore, StateStore
 from agent.memory.skill_loader import load_skills_from_dir
 from agent.memory.skill_loader import load_skills_from_dir
 from agent.tools import ToolRegistry, get_tool_registry
 from agent.tools import ToolRegistry, get_tool_registry
-from agent.execution import dump_tree, dump_markdown
 
 
 logger = logging.getLogger(__name__)
 logger = logging.getLogger(__name__)
 
 
@@ -68,7 +67,7 @@ class AgentRunner:
             llm_call: LLM 调用函数(必须提供,用于实际调用 LLM)
             llm_call: LLM 调用函数(必须提供,用于实际调用 LLM)
             config: Agent 配置
             config: Agent 配置
             skills_dir: Skills 目录路径(可选,不提供则不加载 skills)
             skills_dir: Skills 目录路径(可选,不提供则不加载 skills)
-            debug: 是否启用 debug 模式(输出 step tree 到 .trace/tree.txt)
+            debug: 保留参数(已废弃,请使用 API Server 可视化)
         """
         """
         self.trace_store = trace_store
         self.trace_store = trace_store
         self.memory_store = memory_store
         self.memory_store = memory_store
@@ -85,17 +84,10 @@ class AgentRunner:
         return str(uuid.uuid4())
         return str(uuid.uuid4())
 
 
     async def _dump_debug(self, trace_id: str) -> None:
     async def _dump_debug(self, trace_id: str) -> None:
-        """Debug 模式下输出 step tree(txt + markdown 两种格式)"""
-        if not self.debug or not self.trace_store:
-            return
-        trace = await self.trace_store.get_trace(trace_id)
-        steps = await self.trace_store.get_trace_steps(trace_id)
-
-        # 输出 tree.txt(简洁格式,兼容旧版)
-        dump_tree(trace, steps)
-
-        # 输出 tree.md(完整可折叠格式)
-        dump_markdown(trace, steps)
+        """Debug 模式(已废弃 - 使用 API 可视化替代)"""
+        # 不再自动生成 tree.txt/tree.md/tree.json
+        # 请使用 API Server 进行可视化:python3 api_server.py
+        pass
 
 
     # ===== 单次调用 =====
     # ===== 单次调用 =====
 
 
@@ -415,13 +407,10 @@ class AgentRunner:
                     for tc in tool_calls:
                     for tc in tool_calls:
                         tool_name = tc["function"]["name"]
                         tool_name = tc["function"]["name"]
                         tool_args = tc["function"]["arguments"]
                         tool_args = tc["function"]["arguments"]
-                        if tool_args and isinstance(tool_args, str):
+                        if isinstance(tool_args, str):
                             import json
                             import json
                             tool_args = json.loads(tool_args)
                             tool_args = json.loads(tool_args)
 
 
-                        if not tool_args:
-                            tool_args = {}
-
                         # 执行工具
                         # 执行工具
                         tool_result = await self.tools.execute(
                         tool_result = await self.tools.execute(
                             tool_name,
                             tool_name,
@@ -541,90 +530,6 @@ class AgentRunner:
                     yield trace_obj
                     yield trace_obj
             raise
             raise
 
 
-    # ===== 反馈 =====
-
-    async def add_feedback(
-        self,
-        trace_id: str,
-        target_step_id: str,
-        feedback_type: Literal["positive", "negative", "correction"],
-        content: str,
-        extract_experience: bool = True
-    ) -> Optional[str]:
-        """
-        添加人工反馈
-
-        Args:
-            trace_id: Trace ID
-            target_step_id: 反馈针对的 Step ID
-            feedback_type: 反馈类型
-            content: 反馈内容
-            extract_experience: 是否自动提取经验
-
-        Returns:
-            experience_id: 如果提取了经验
-        """
-        if not self.trace_store:
-            return None
-
-        # 获取 Trace
-        trace = await self.trace_store.get_trace(trace_id)
-        if not trace:
-            logger.warning(f"Trace not found: {trace_id}")
-            return None
-
-        # 创建 feedback Step
-        steps = await self.trace_store.get_trace_steps(trace_id)
-        max_seq = max(s.sequence for s in steps) if steps else 0
-
-        feedback_step = Step.create(
-            trace_id=trace_id,
-            step_type="feedback",
-            sequence=max_seq + 1,
-            status="completed",
-            description=f"{feedback_type}: {content[:50]}...",
-            parent_id=target_step_id,
-            data={
-                "target_step_id": target_step_id,
-                "feedback_type": feedback_type,
-                "content": content
-            }
-        )
-        await self.trace_store.add_step(feedback_step)
-        await self._dump_debug(trace_id)
-
-        # 提取经验
-        exp_id = None
-        if extract_experience and self.memory_store and feedback_type in ("positive", "correction"):
-            exp = Experience.create(
-                scope=f"agent:{trace.agent_type}" if trace.agent_type else "agent:default",
-                condition=f"执行类似 '{trace.task}' 任务时" if trace.task else "通用场景",
-                rule=content,
-                evidence=[target_step_id, feedback_step.step_id],
-                source="feedback",
-                confidence=0.8 if feedback_type == "positive" else 0.6
-            )
-            exp_id = await self.memory_store.add_experience(exp)
-
-            # 记录 memory_write Step
-            mem_step = Step.create(
-                trace_id=trace_id,
-                step_type="memory_write",
-                sequence=max_seq + 2,
-                status="completed",
-                description=f"保存经验: {exp.condition[:30]}...",
-                parent_id=feedback_step.step_id,
-                data={
-                    "experience_id": exp_id,
-                    "condition": exp.condition,
-                    "rule": exp.rule
-                }
-            )
-            await self.trace_store.add_step(mem_step)
-            await self._dump_debug(trace_id)
-
-        return exp_id
-
     # ===== 辅助方法 =====
     # ===== 辅助方法 =====
 
 
     def _format_skills(self, skills: List[Skill]) -> str:
     def _format_skills(self, skills: List[Skill]) -> str:

+ 0 - 71
agent/execution/README.md

@@ -1,71 +0,0 @@
-# Agent Execution - 执行追踪系统
-
-## 职责
-
-执行追踪系统负责记录和可视化 Agent 的执行过程:
-
-1. **数据模型** (`models.py`)
-   - `Trace` - 一次完整的执行轨迹
-   - `Step` - 执行过程中的一个原子操作(形成树结构)
-   - `StepType` - 步骤类型(思考、动作、结果等)
-   - `Status` - 步骤状态(计划中、执行中、完成、失败)
-
-2. **存储接口** (`protocols.py`, `store.py`)
-   - `TraceStore` - Trace 存储接口协议
-   - `MemoryTraceStore` - 内存实现(用于测试)
-
-3. **可视化工具** (`tree_dump.py`)
-   - `dump_tree()` - 输出文本格式的 step tree
-   - `dump_markdown()` - 输出 markdown 格式(可折叠)
-   - `dump_json()` - 输出 JSON 格式
-
-4. **API 和实时推送** (`api.py`, `websocket.py`)
-   - RESTful API - 查询 Trace 和 Step
-   - WebSocket - 实时推送执行更新
-
-## 模块边界
-
-- **只依赖**:core(事件系统)
-- **被依赖**:core.runner(记录 trace)
-- **独立开发**:可视化、API、WebSocket 可独立迭代
-
-## 使用示例
-
-```python
-from agent.execution import MemoryTraceStore, Trace, Step, dump_tree
-
-# 创建存储
-store = MemoryTraceStore()
-
-# 创建 Trace
-trace = Trace.create(mode="agent", task="Complete task")
-trace_id = await store.create_trace(trace)
-
-# 添加 Step
-step = Step.create(
-    trace_id=trace_id,
-    step_type="thought",
-    description="Analyzing task"
-)
-await store.add_step(step)
-
-# 可视化
-trace = await store.get_trace(trace_id)
-steps = await store.get_trace_steps(trace_id)
-dump_tree(trace, steps)
-```
-
-## 文件说明
-
-- `models.py` - Trace 和 Step 数据模型
-- `protocols.py` - TraceStore 接口定义
-- `store.py` - MemoryTraceStore 实现
-- `tree_dump.py` - Step tree 可视化工具
-- `api.py` - RESTful API(可选,需要 FastAPI)
-- `websocket.py` - WebSocket 推送(可选,需要 FastAPI)
-- `__init__.py` - 模块导出
-
-## 适合分工
-
-- **算法同事**:负责 `tree_dump.py` 可视化优化
-- **后端同事**:负责 `api.py` 和 `websocket.py` 开发

+ 8 - 8
agent/execution/__init__.py

@@ -3,20 +3,20 @@ Execution - 执行追踪系统
 
 
 核心职责:
 核心职责:
 1. Trace/Step 模型定义
 1. Trace/Step 模型定义
-2. 存储接口和实现(内存/数据库)
+2. 存储接口和实现(文件系统)
 3. Step 树可视化(文本/markdown/JSON)
 3. Step 树可视化(文本/markdown/JSON)
-4. RESTful API(可视化查询)
-5. WebSocket 推送(实时更新)
+4. RESTful API(可视化查询,支持 compact/full 视图)
+5. WebSocket 推送(实时更新,支持断线续传)
 """
 """
 
 
 # 模型(核心,无依赖)
 # 模型(核心,无依赖)
-from agent.execution.models import Trace, Step, StepType, Status
+from agent.execution.models import Trace, Step, StepType, StepStatus
 
 
 # 存储接口(核心,无依赖)
 # 存储接口(核心,无依赖)
 from agent.execution.protocols import TraceStore
 from agent.execution.protocols import TraceStore
 
 
-# 内存存储实现(核心,无依赖)
-from agent.execution.store import MemoryTraceStore
+# 文件系统存储实现(跨进程 + 持久化)
+from agent.execution.fs_store import FileSystemTraceStore
 
 
 # Debug 工具(可视化)
 # Debug 工具(可视化)
 from agent.execution.tree_dump import StepTreeDumper, dump_tree, dump_markdown, dump_json
 from agent.execution.tree_dump import StepTreeDumper, dump_tree, dump_markdown, dump_json
@@ -62,10 +62,10 @@ __all__ = [
     "Trace",
     "Trace",
     "Step",
     "Step",
     "StepType",
     "StepType",
-    "Status",
+    "StepStatus",
     # 存储
     # 存储
     "TraceStore",
     "TraceStore",
-    "MemoryTraceStore",
+    "FileSystemTraceStore",
     # Debug/可视化
     # Debug/可视化
     "StepTreeDumper",
     "StepTreeDumper",
     "dump_tree",
     "dump_tree",

+ 76 - 6
agent/execution/api.py

@@ -1,11 +1,12 @@
 """
 """
 Step 树 RESTful API
 Step 树 RESTful API
 
 
-提供 Trace 和 Step 的查询接口,支持懒加载
+提供 Trace 和 Step 的查询接口,支持懒加载和 compact/full 视图
 """
 """
 
 
 from typing import List, Optional, Dict, Any
 from typing import List, Optional, Dict, Any
 from fastapi import APIRouter, HTTPException, Query
 from fastapi import APIRouter, HTTPException, Query
+from fastapi.responses import PlainTextResponse
 from pydantic import BaseModel
 from pydantic import BaseModel
 
 
 from agent.execution.protocols import TraceStore
 from agent.execution.protocols import TraceStore
@@ -137,12 +138,18 @@ async def get_trace(trace_id: str):
 
 
 
 
 @router.get("/{trace_id}/tree", response_model=TreeResponse)
 @router.get("/{trace_id}/tree", response_model=TreeResponse)
-async def get_full_tree(trace_id: str):
+async def get_full_tree(
+    trace_id: str,
+    view: str = Query("compact", regex="^(compact|full)$"),
+    max_depth: int = Query(999, ge=1, le=999)
+):
     """
     """
     获取完整 Step 树(小型 Trace 推荐)
     获取完整 Step 树(小型 Trace 推荐)
 
 
     Args:
     Args:
         trace_id: Trace ID
         trace_id: Trace ID
+        view: compact(默认,不含 blob)| full(含 blob)
+        max_depth: 最大深度
     """
     """
     store = get_trace_store()
     store = get_trace_store()
 
 
@@ -155,7 +162,7 @@ async def get_full_tree(trace_id: str):
     steps = await store.get_trace_steps(trace_id)
     steps = await store.get_trace_steps(trace_id)
 
 
     # 构建树结构
     # 构建树结构
-    root_nodes = await _build_tree(store, trace_id, None, expand=True, max_depth=999)
+    root_nodes = await _build_tree(store, trace_id, None, view=view, expand=True, max_depth=max_depth)
 
 
     return TreeResponse(
     return TreeResponse(
         trace_id=trace_id,
         trace_id=trace_id,
@@ -167,6 +174,7 @@ async def get_full_tree(trace_id: str):
 async def get_node(
 async def get_node(
     trace_id: str,
     trace_id: str,
     step_id: str,
     step_id: str,
+    view: str = Query("compact", regex="^(compact|full)$"),
     expand: bool = Query(False, description="是否加载子节点"),
     expand: bool = Query(False, description="是否加载子节点"),
     max_depth: int = Query(1, ge=1, le=10, description="递归深度")
     max_depth: int = Query(1, ge=1, le=10, description="递归深度")
 ):
 ):
@@ -176,6 +184,7 @@ async def get_node(
     Args:
     Args:
         trace_id: Trace ID
         trace_id: Trace ID
         step_id: Step ID("null" 表示根节点)
         step_id: Step ID("null" 表示根节点)
+        view: compact | full
         expand: 是否加载子节点
         expand: 是否加载子节点
         max_depth: 递归深度
         max_depth: 递归深度
     """
     """
@@ -196,7 +205,7 @@ async def get_node(
             raise HTTPException(status_code=404, detail="Step not found")
             raise HTTPException(status_code=404, detail="Step not found")
 
 
     # 构建节点树
     # 构建节点树
-    children = await _build_tree(store, trace_id, actual_step_id, expand, max_depth)
+    children = await _build_tree(store, trace_id, actual_step_id, view=view, expand=expand, max_depth=max_depth)
 
 
     # 如果是根节点,返回所有根 Steps
     # 如果是根节点,返回所有根 Steps
     if actual_step_id is None:
     if actual_step_id is None:
@@ -217,6 +226,65 @@ async def get_node(
     )
     )
 
 
 
 
@router.get("/{trace_id}/node/{step_id}/children")
async def get_children_paginated(
    trace_id: str,
    step_id: str,
    cursor: Optional[int] = Query(None, description="上次最后的 sequence"),
    limit: int = Query(20, ge=1, le=100),
    view: str = Query("compact", regex="^(compact|full)$")
):
    """
    Return one page of a step's direct children, using `sequence` as cursor.

    Args:
        trace_id: Trace ID the step must belong to.
        step_id: ID of the parent step whose children are listed.
        cursor: `sequence` of the last child of the previous page; only
            children with a strictly greater sequence are returned.
            None starts from the first child.
        limit: Maximum number of children in the page (1-100).
        view: "compact" or "full"; forwarded to `Step.to_dict`.

    Returns:
        A dict with three keys:
            "children":    list of serialized child steps,
            "next_cursor": sequence of the last returned child, or None
                           when there is no further page,
            "has_more":    whether more children exist after this page.

    Raises:
        HTTPException: 404 when the trace does not exist, or when the step
            does not exist or belongs to a different trace.
    """
    store = get_trace_store()

    # The trace must exist.
    if not await store.get_trace(trace_id):
        raise HTTPException(status_code=404, detail="Trace not found")

    # The step must exist and belong to the requested trace.
    parent = await store.get_step(step_id)
    if not parent or parent.trace_id != trace_id:
        raise HTTPException(status_code=404, detail="Step not found")

    # Fetch every child, then drop those at or before the cursor.
    candidates = await store.get_step_children(step_id)
    if cursor is not None:
        candidates = [child for child in candidates if child.sequence > cursor]

    # Slice out the page; anything beyond `limit` means another page exists.
    page = candidates[:limit]
    has_more = len(candidates) > limit
    next_cursor = None
    if page and has_more:
        next_cursor = page[-1].sequence

    return {
        "children": [child.to_dict(view=view) for child in page],
        "next_cursor": next_cursor,
        "has_more": has_more
    }
+
+
 # ===== 核心算法:懒加载树构建 =====
 # ===== 核心算法:懒加载树构建 =====
 
 
 
 
@@ -224,6 +292,7 @@ async def _build_tree(
     store: TraceStore,
     store: TraceStore,
     trace_id: str,
     trace_id: str,
     step_id: Optional[str],
     step_id: Optional[str],
+    view: str = "compact",  # 新增参数
     expand: bool = False,
     expand: bool = False,
     max_depth: int = 1,
     max_depth: int = 1,
     current_depth: int = 0
     current_depth: int = 0
@@ -238,6 +307,7 @@ async def _build_tree(
         store: TraceStore 实例
         store: TraceStore 实例
         trace_id: Trace ID
         trace_id: Trace ID
         step_id: 当前 Step ID(None 表示根节点)
         step_id: 当前 Step ID(None 表示根节点)
+        view: "compact" | "full"
         expand: 是否展开子节点
         expand: 是否展开子节点
         max_depth: 最大递归深度
         max_depth: 最大递归深度
         current_depth: 当前递归深度
         current_depth: 当前递归深度
@@ -257,7 +327,7 @@ async def _build_tree(
     # 2. 构建响应
     # 2. 构建响应
     result_nodes = []
     result_nodes = []
     for step in current_nodes:
     for step in current_nodes:
-        node_dict = step.to_dict()
+        node_dict = step.to_dict(view=view)  # 使用 view 参数
         node_dict["children"] = []
         node_dict["children"] = []
 
 
         # 3. 递归加载子节点(可选)
         # 3. 递归加载子节点(可选)
@@ -266,7 +336,7 @@ async def _build_tree(
             if children:
             if children:
                 node_dict["children"] = await _build_tree(
                 node_dict["children"] = await _build_tree(
                     store, trace_id, step.step_id,
                     store, trace_id, step.step_id,
-                    expand=True, max_depth=max_depth,
+                    view=view, expand=True, max_depth=max_depth,
                     current_depth=current_depth + 1
                     current_depth=current_depth + 1
                 )
                 )
 
 

+ 353 - 0
agent/execution/fs_store.py

@@ -0,0 +1,353 @@
+"""
+FileSystem Trace Store - 文件系统存储实现
+
+用于跨进程数据共享,数据持久化到 .trace/ 目录
+"""
+
+import json
+import os
+from pathlib import Path
+from typing import Dict, List, Optional, Any
+from datetime import datetime
+
+from agent.execution.models import Trace, Step
+
+
class FileSystemTraceStore:
    """Trace store that persists traces, steps and events as files.

    Directory layout:
    .trace/
    ├── trace-001/
    │   ├── meta.json           # Trace metadata
    │   ├── steps/
    │   │   ├── step-1.json     # one file per step
    │   │   ├── step-2.json
    │   │   └── step-3.json
    │   └── events.jsonl        # event stream (used for WebSocket resume)
    └── trace-002/
        └── ...

    All files are read and written as UTF-8, independent of the process
    locale, because the JSON is produced with ``ensure_ascii=False``.

    NOTE: updates to ``meta.json`` and step files are plain
    read-modify-write sequences without any locking, so concurrent writers
    to the same trace can overwrite each other's changes.
    """

    def __init__(self, base_path: str = ".trace"):
        """Create the store rooted at ``base_path``.

        Args:
            base_path: Root directory; created (with parents) if missing.
        """
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

    # ===== Path helpers =====

    def _get_trace_dir(self, trace_id: str) -> Path:
        """Return the directory that holds everything for one trace."""
        return self.base_path / trace_id

    def _get_meta_file(self, trace_id: str) -> Path:
        """Return the path of the trace's meta.json."""
        return self._get_trace_dir(trace_id) / "meta.json"

    def _get_steps_dir(self, trace_id: str) -> Path:
        """Return the directory that holds the trace's step files."""
        return self._get_trace_dir(trace_id) / "steps"

    def _get_step_file(self, trace_id: str, step_id: str) -> Path:
        """Return the path of a single step file."""
        return self._get_steps_dir(trace_id) / f"{step_id}.json"

    def _get_events_file(self, trace_id: str) -> Path:
        """Return the path of the trace's events.jsonl."""
        return self._get_trace_dir(trace_id) / "events.jsonl"

    def _list_trace_dirs(self) -> List[Path]:
        """Return all trace directories, or [] if the base path is gone."""
        if not self.base_path.is_dir():
            return []
        return [entry for entry in self.base_path.iterdir() if entry.is_dir()]

    # ===== (De)serialization helpers =====

    @staticmethod
    def _write_json(path: Path, payload: Dict[str, Any]) -> None:
        """Write ``payload`` to ``path`` as indented UTF-8 JSON."""
        path.write_text(
            json.dumps(payload, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )

    @staticmethod
    def _read_json(path: Path) -> Any:
        """Read and parse a UTF-8 JSON file."""
        return json.loads(path.read_text(encoding="utf-8"))

    @staticmethod
    def _trace_from_dict(data: Dict[str, Any]) -> "Trace":
        """Build a Trace from its dict form, parsing ISO datetime fields."""
        for field_name in ("created_at", "completed_at"):
            if data.get(field_name):
                data[field_name] = datetime.fromisoformat(data[field_name])
        return Trace(**data)

    @staticmethod
    def _step_from_dict(data: Dict[str, Any]) -> "Step":
        """Build a Step from its dict form, parsing ``created_at``."""
        if data.get("created_at"):
            data["created_at"] = datetime.fromisoformat(data["created_at"])
        return Step(**data)

    def _try_load_step(self, step_file: Path) -> Optional["Step"]:
        """Load a step file, returning None if it is missing or unreadable."""
        if not step_file.exists():
            return None
        try:
            return self._step_from_dict(self._read_json(step_file))
        except Exception:
            # Deliberate best-effort: a corrupted step file is skipped.
            return None

    # ===== Trace operations =====

    async def create_trace(self, trace: "Trace") -> str:
        """Create the on-disk layout for a new trace.

        Creates the trace directory, its ``steps`` directory, ``meta.json``
        and an empty ``events.jsonl``.

        Returns:
            The trace's ``trace_id``.
        """
        trace_id = trace.trace_id

        # parents=True also creates the trace directory itself.
        self._get_steps_dir(trace_id).mkdir(parents=True, exist_ok=True)
        self._write_json(self._get_meta_file(trace_id), trace.to_dict())
        self._get_events_file(trace_id).touch()

        return trace_id

    async def get_trace(self, trace_id: str) -> Optional["Trace"]:
        """Load a trace, or return None if it has no meta.json."""
        meta_file = self._get_meta_file(trace_id)
        if not meta_file.exists():
            return None
        return self._trace_from_dict(self._read_json(meta_file))

    async def update_trace(self, trace_id: str, **updates) -> None:
        """Update fields of an existing trace.

        Unknown trace IDs and keys that are not attributes of the trace are
        silently ignored.
        """
        trace = await self.get_trace(trace_id)
        if not trace:
            return

        for key, value in updates.items():
            if hasattr(trace, key):
                setattr(trace, key, value)

        self._write_json(self._get_meta_file(trace_id), trace.to_dict())

    async def list_traces(
        self,
        mode: Optional[str] = None,
        agent_type: Optional[str] = None,
        uid: Optional[str] = None,
        status: Optional[str] = None,
        limit: int = 50
    ) -> List["Trace"]:
        """List traces, newest first, optionally filtered.

        Args:
            mode: Keep only traces whose ``mode`` equals this value.
            agent_type: Keep only traces with this ``agent_type``.
            uid: Keep only traces with this ``uid``.
            status: Keep only traces with this ``status``.
            limit: Maximum number of traces returned.

        Returns:
            At most ``limit`` traces sorted by ``created_at`` descending.
            Traces whose meta.json cannot be read are skipped.
        """
        wanted = {
            "mode": mode,
            "agent_type": agent_type,
            "uid": uid,
            "status": status,
        }
        traces = []

        for trace_dir in self._list_trace_dirs():
            meta_file = trace_dir / "meta.json"
            if not meta_file.exists():
                continue

            try:
                data = self._read_json(meta_file)

                # A falsy filter value means "do not filter on this field".
                if any(
                    value and data.get(key) != value
                    for key, value in wanted.items()
                ):
                    continue

                traces.append(self._trace_from_dict(data))
            except Exception:
                # Deliberate best-effort: skip corrupted trace metadata.
                continue

        traces.sort(key=lambda t: t.created_at, reverse=True)
        return traces[:limit]

    # ===== Step operations =====

    async def add_step(self, step: "Step") -> str:
        """Persist a step and update the derived bookkeeping.

        Besides writing the step file this:
        1. bumps the trace's counters (steps, last sequence, tokens, cost,
           duration) in meta.json,
        2. marks the parent step (if any) as having one more child,
        3. appends a ``step_added`` event to the trace's event stream.

        Returns:
            The step's ``step_id``.
        """
        trace_id = step.trace_id

        # 1. Write the step file.
        self._write_json(
            self._get_step_file(trace_id, step.step_id),
            step.to_dict(view="full"),
        )

        # 2. Update the trace statistics.
        trace = await self.get_trace(trace_id)
        if trace:
            trace.total_steps += 1
            trace.last_sequence = max(trace.last_sequence, step.sequence)

            if step.tokens:
                trace.total_tokens += step.tokens
            if step.cost:
                trace.total_cost += step.cost
            if step.duration_ms:
                trace.total_duration_ms += step.duration_ms

            self._write_json(self._get_meta_file(trace_id), trace.to_dict())

        # 3. Update the parent's UI fields. The parent is looked up in the
        #    same trace only: scanning every trace would be slow and could
        #    copy a step of another trace into this trace's directory.
        if step.parent_id:
            parent_file = self._get_step_file(trace_id, step.parent_id)
            parent = self._try_load_step(parent_file)
            if parent:
                parent.has_children = True
                parent.children_count += 1
                self._write_json(parent_file, parent.to_dict(view="full"))

        # 4. Append the step_added event.
        await self.append_event(trace_id, "step_added", {
            "step_id": step.step_id,
            "step_type": step.step_type,
            "parent_id": step.parent_id,
            "sequence": step.sequence,
        })

        return step.step_id

    async def get_step(self, step_id: str) -> Optional["Step"]:
        """Find a step by ID by probing every trace directory.

        Returns:
            The first readable step with that ID, or None.
        """
        for trace_dir in self._list_trace_dirs():
            step = self._try_load_step(trace_dir / "steps" / f"{step_id}.json")
            if step is not None:
                return step
        return None

    async def get_trace_steps(self, trace_id: str) -> List["Step"]:
        """Return all readable steps of a trace, ordered by ``sequence``."""
        steps_dir = self._get_steps_dir(trace_id)
        if not steps_dir.exists():
            return []

        steps = []
        for step_file in steps_dir.glob("*.json"):
            step = self._try_load_step(step_file)
            if step is not None:
                steps.append(step)

        steps.sort(key=lambda s: s.sequence)
        return steps

    async def get_step_children(self, step_id: str) -> List["Step"]:
        """Return the direct children of a step, ordered by ``sequence``.

        Scans every step file of every trace.
        TODO: maintain an index file to avoid the full scan.
        """
        children = []

        for trace_dir in self._list_trace_dirs():
            steps_dir = trace_dir / "steps"
            if not steps_dir.exists():
                continue

            for step_file in steps_dir.glob("*.json"):
                try:
                    data = self._read_json(step_file)
                    if data.get("parent_id") == step_id:
                        children.append(self._step_from_dict(data))
                except Exception:
                    # Deliberate best-effort: skip corrupted step files.
                    continue

        children.sort(key=lambda s: s.sequence)
        return children

    async def update_step(self, step_id: str, **updates) -> None:
        """Update fields of an existing step.

        Unknown step IDs and keys that are not attributes of the step are
        silently ignored.
        """
        step = await self.get_step(step_id)
        if not step:
            return

        for key, value in updates.items():
            if hasattr(step, key):
                setattr(step, key, value)

        self._write_json(
            self._get_step_file(step.trace_id, step_id),
            step.to_dict(view="full"),
        )

    # ===== Event stream (used for WebSocket resume) =====

    async def get_events(
        self,
        trace_id: str,
        since_event_id: int = 0
    ) -> List[Dict[str, Any]]:
        """Return the trace's events with ``event_id > since_event_id``.

        Lines that are blank, are not valid JSON, are not JSON objects, or
        carry a non-comparable ``event_id`` are skipped.
        """
        events_file = self._get_events_file(trace_id)
        if not events_file.exists():
            return []

        events = []
        with events_file.open("r", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    event = json.loads(line)
                    is_newer = event.get("event_id", 0) > since_event_id
                except (ValueError, TypeError, AttributeError):
                    continue
                if is_newer:
                    events.append(event)

        return events

    async def append_event(
        self,
        trace_id: str,
        event_type: str,
        payload: Dict[str, Any]
    ) -> int:
        """Append an event to the trace's stream.

        Args:
            trace_id: Trace the event belongs to.
            event_type: Stored under the ``event`` key.
            payload: Extra fields merged into the event. The reserved keys
                ``event_id``, ``event`` and ``ts`` cannot be overridden,
                since ``get_events`` relies on ``event_id``.

        Returns:
            The new event's ID, or 0 if the trace does not exist.
        """
        trace = await self.get_trace(trace_id)
        if not trace:
            return 0

        # Persist the bumped counter before the event line is written.
        event_id = trace.last_event_id + 1
        trace.last_event_id = event_id
        self._write_json(self._get_meta_file(trace_id), trace.to_dict())

        event = {
            "event_id": event_id,
            "event": event_type,
            "ts": datetime.now().isoformat(),
        }
        for key, value in payload.items():
            event.setdefault(key, value)

        with self._get_events_file(trace_id).open("a", encoding="utf-8") as f:
            f.write(json.dumps(event, ensure_ascii=False) + "\n")

        return event_id

+ 62 - 16
agent/execution/models.py

@@ -21,19 +21,18 @@ StepType = Literal[
     "evaluation",  # 评估总结(需要 summary)
     "evaluation",  # 评估总结(需要 summary)
     "response",    # 最终回复
     "response",    # 最终回复
 
 
-    # 工具相关
+    # 工具相关(数据结构上分开以保留描述能力,可视化时候可能合并)
     "action",      # 工具调用(tool_call)
     "action",      # 工具调用(tool_call)
     "result",      # 工具结果(tool_result)
     "result",      # 工具结果(tool_result)
 
 
     # 系统相关
     # 系统相关
     "memory_read",   # 读取记忆(经验/技能)
     "memory_read",   # 读取记忆(经验/技能)
     "memory_write",  # 写入记忆
     "memory_write",  # 写入记忆
-    "feedback",      # 人工反馈
 ]
 ]
 
 
 
 
 # Step 状态
 # Step 状态
-Status = Literal[
+StepStatus = Literal[
     "planned",           # 计划中(未执行)
     "planned",           # 计划中(未执行)
     "in_progress",       # 执行中
     "in_progress",       # 执行中
     "awaiting_approval", # 等待用户确认
     "awaiting_approval", # 等待用户确认
@@ -68,6 +67,11 @@ class Trace:
     total_steps: int = 0
     total_steps: int = 0
     total_tokens: int = 0
     total_tokens: int = 0
     total_cost: float = 0.0
     total_cost: float = 0.0
+    total_duration_ms: int = 0  # 总耗时(毫秒)
+
+    # 进度追踪(head)
+    last_sequence: int = 0      # 最新 step 的 sequence
+    last_event_id: int = 0      # 最新事件 ID(用于 WS 续传)
 
 
     # 上下文
     # 上下文
     uid: Optional[str] = None
     uid: Optional[str] = None
@@ -105,6 +109,9 @@ class Trace:
             "total_steps": self.total_steps,
             "total_steps": self.total_steps,
             "total_tokens": self.total_tokens,
             "total_tokens": self.total_tokens,
             "total_cost": self.total_cost,
             "total_cost": self.total_cost,
+            "total_duration_ms": self.total_duration_ms,
+            "last_sequence": self.last_sequence,
+            "last_event_id": self.last_event_id,
             "uid": self.uid,
             "uid": self.uid,
             "context": self.context,
             "context": self.context,
             "current_goal_id": self.current_goal_id,
             "current_goal_id": self.current_goal_id,
@@ -119,11 +126,32 @@ class Step:
     执行步骤 - Trace 中的一个原子操作
     执行步骤 - Trace 中的一个原子操作
 
 
     Step 之间通过 parent_id 形成树结构(单父节点)
     Step 之间通过 parent_id 形成树结构(单父节点)
+
+    ## 字段设计规则
+
+    **顶层字段**(Step 类属性):
+    - 所有(或大部分)step 都有的字段
+    - 需要筛选/排序/索引的字段(如 tokens, cost, duration_ms)
+    - 结构化、类型明确的字段
+
+    **data 字段**(Dict):
+    - step_type 特定的字段(不同类型有不同 schema)
+    - 详细的业务数据(如 messages, content, arguments, output)
+    - 可能很大的字段
+    - 半结构化、动态的字段
+
+    ## data 字段 schema(按 step_type)
+
+    - thought/response: model, messages, content, tool_calls
+    - action: tool_name, arguments
+    - result: tool_name, output, error
+    - memory_read: experiences_count, skills_count
+    - goal: 自定义(根据具体目标)
     """
     """
     step_id: str
     step_id: str
     trace_id: str
     trace_id: str
     step_type: StepType
     step_type: StepType
-    status: Status
+    status: StepStatus
     sequence: int  # 在 Trace 中的顺序
     sequence: int  # 在 Trace 中的顺序
 
 
     # 树结构(单父节点)
     # 树结构(单父节点)
@@ -138,6 +166,10 @@ class Step:
     # 仅 evaluation 类型需要
     # 仅 evaluation 类型需要
     summary: Optional[str] = None
     summary: Optional[str] = None
 
 
+    # UI 优化字段
+    has_children: bool = False      # 是否有子节点
+    children_count: int = 0         # 子节点数量
+
     # 执行指标
     # 执行指标
     duration_ms: Optional[int] = None
     duration_ms: Optional[int] = None
     tokens: Optional[int] = None
     tokens: Optional[int] = None
@@ -152,7 +184,7 @@ class Step:
         trace_id: str,
         trace_id: str,
         step_type: StepType,
         step_type: StepType,
         sequence: int,
         sequence: int,
-        status: Status = "completed",
+        status: StepStatus = "completed",
         description: str = "",
         description: str = "",
         data: Dict[str, Any] = None,
         data: Dict[str, Any] = None,
         parent_id: Optional[str] = None,
         parent_id: Optional[str] = None,
@@ -177,9 +209,15 @@ class Step:
             cost=cost,
             cost=cost,
         )
         )
 
 
-    def to_dict(self) -> Dict[str, Any]:
-        """转换为字典"""
-        return {
+    def to_dict(self, view: str = "full") -> Dict[str, Any]:
+        """
+        转换为字典
+
+        Args:
+            view: "compact" - 不返回大字段
+                  "full" - 返回完整数据
+        """
+        result = {
             "step_id": self.step_id,
             "step_id": self.step_id,
             "trace_id": self.trace_id,
             "trace_id": self.trace_id,
             "step_type": self.step_type,
             "step_type": self.step_type,
@@ -187,14 +225,29 @@ class Step:
             "sequence": self.sequence,
             "sequence": self.sequence,
             "parent_id": self.parent_id,
             "parent_id": self.parent_id,
             "description": self.description,
             "description": self.description,
-            "data": self.data,
             "summary": self.summary,
             "summary": self.summary,
+            "has_children": self.has_children,
+            "children_count": self.children_count,
             "duration_ms": self.duration_ms,
             "duration_ms": self.duration_ms,
             "tokens": self.tokens,
             "tokens": self.tokens,
             "cost": self.cost,
             "cost": self.cost,
             "created_at": self.created_at.isoformat() if self.created_at else None,
             "created_at": self.created_at.isoformat() if self.created_at else None,
         }
         }
 
 
+        # 处理 data 字段
+        if view == "compact":
+            # compact 模式:移除 data 中的大字段
+            data_copy = self.data.copy()
+            # 移除可能的大字段(如 output, content 等)
+            for key in ["output", "content", "full_output", "full_content"]:
+                data_copy.pop(key, None)
+            result["data"] = data_copy
+        else:
+            # full 模式:返回完整 data
+            result["data"] = self.data
+
+        return result
+
 
 
 # Step.data 结构说明
 # Step.data 结构说明
 #
 #
@@ -233,13 +286,6 @@ class Step:
 #       "is_final": True,
 #       "is_final": True,
 #   }
 #   }
 #
 #
-# feedback:
-#   {
-#       "target_step_id": "...",
-#       "feedback_type": "positive" | "negative" | "correction",
-#       "content": "..."
-#   }
-#
 # memory_read:
 # memory_read:
 #   {
 #   {
 #       "skills": [...],
 #       "skills": [...],

+ 49 - 1
agent/execution/protocols.py

@@ -4,7 +4,7 @@ Trace Storage Protocol - Trace 存储接口定义
 使用 Protocol 定义接口,允许不同的存储实现(内存、PostgreSQL、Neo4j 等)
 使用 Protocol 定义接口,允许不同的存储实现(内存、PostgreSQL、Neo4j 等)
 """
 """
 
 
-from typing import Protocol, List, Optional, runtime_checkable
+from typing import Protocol, List, Optional, Dict, Any, runtime_checkable
 
 
 from agent.execution.models import Trace, Step
 from agent.execution.models import Trace, Step
 
 
@@ -77,3 +77,51 @@ class TraceStore(Protocol):
     async def get_step_children(self, step_id: str) -> List[Step]:
     async def get_step_children(self, step_id: str) -> List[Step]:
         """获取 Step 的子节点"""
         """获取 Step 的子节点"""
         ...
         ...
+
+    async def update_step(self, step_id: str, **updates) -> None:
+        """
+        更新 Step 字段(用于状态变更、错误记录等)
+
+        Args:
+            step_id: Step ID
+            **updates: 要更新的字段
+        """
+        ...
+
+    # ===== 事件流操作(用于 WebSocket 断线续传)=====
+
+    async def get_events(
+        self,
+        trace_id: str,
+        since_event_id: int = 0
+    ) -> List[Dict[str, Any]]:
+        """
+        获取事件流(用于 WS 断线续传)
+
+        Args:
+            trace_id: Trace ID
+            since_event_id: 从哪个事件 ID 开始(0 表示全部)
+
+        Returns:
+            事件列表(按 event_id 排序)
+        """
+        ...
+
+    async def append_event(
+        self,
+        trace_id: str,
+        event_type: str,
+        payload: Dict[str, Any]
+    ) -> int:
+        """
+        追加事件,返回 event_id
+
+        Args:
+            trace_id: Trace ID
+            event_type: 事件类型(step_added/step_updated/trace_completed)
+            payload: 事件数据
+
+        Returns:
+            event_id: 新事件的 ID
+        """
+        ...

+ 0 - 89
agent/execution/store.py

@@ -1,89 +0,0 @@
-"""
-Memory Trace Store - 内存存储实现
-
-用于测试和简单场景,数据不持久化
-"""
-
-from typing import Dict, List, Optional
-
-from agent.execution.models import Trace, Step
-
-
-class MemoryTraceStore:
-    """内存 Trace 存储"""
-
-    def __init__(self):
-        self._traces: Dict[str, Trace] = {}
-        self._steps: Dict[str, Step] = {}
-        self._trace_steps: Dict[str, List[str]] = {}  # trace_id -> [step_ids]
-
-    async def create_trace(self, trace: Trace) -> str:
-        self._traces[trace.trace_id] = trace
-        self._trace_steps[trace.trace_id] = []
-        return trace.trace_id
-
-    async def get_trace(self, trace_id: str) -> Optional[Trace]:
-        return self._traces.get(trace_id)
-
-    async def update_trace(self, trace_id: str, **updates) -> None:
-        trace = self._traces.get(trace_id)
-        if trace:
-            for key, value in updates.items():
-                if hasattr(trace, key):
-                    setattr(trace, key, value)
-
-    async def list_traces(
-        self,
-        mode: Optional[str] = None,
-        agent_type: Optional[str] = None,
-        uid: Optional[str] = None,
-        status: Optional[str] = None,
-        limit: int = 50
-    ) -> List[Trace]:
-        traces = list(self._traces.values())
-
-        # 过滤
-        if mode:
-            traces = [t for t in traces if t.mode == mode]
-        if agent_type:
-            traces = [t for t in traces if t.agent_type == agent_type]
-        if uid:
-            traces = [t for t in traces if t.uid == uid]
-        if status:
-            traces = [t for t in traces if t.status == status]
-
-        # 排序(最新的在前)
-        traces.sort(key=lambda t: t.created_at, reverse=True)
-
-        return traces[:limit]
-
-    async def add_step(self, step: Step) -> str:
-        self._steps[step.step_id] = step
-
-        # 添加到 trace 的 steps 列表
-        if step.trace_id in self._trace_steps:
-            self._trace_steps[step.trace_id].append(step.step_id)
-
-        # 更新 trace 的 total_steps
-        trace = self._traces.get(step.trace_id)
-        if trace:
-            trace.total_steps += 1
-
-        return step.step_id
-
-    async def get_step(self, step_id: str) -> Optional[Step]:
-        return self._steps.get(step_id)
-
-    async def get_trace_steps(self, trace_id: str) -> List[Step]:
-        step_ids = self._trace_steps.get(trace_id, [])
-        steps = [self._steps[sid] for sid in step_ids if sid in self._steps]
-        steps.sort(key=lambda s: s.sequence)
-        return steps
-
-    async def get_step_children(self, step_id: str) -> List[Step]:
-        children = []
-        for step in self._steps.values():
-            if step.parent_id == step_id:
-                children.append(step)
-        children.sort(key=lambda s: s.sequence)
-        return children

+ 223 - 12
agent/execution/tree_dump.py

@@ -70,6 +70,25 @@ class StepTreeDumper:
             lines.append(f"  total_cost: {trace.get('total_cost', 0.0):.4f}")
             lines.append(f"  total_cost: {trace.get('total_cost', 0.0):.4f}")
             lines.append("")
             lines.append("")
 
 
+        # 统计摘要
+        if steps:
+            lines.append("## Statistics")
+            stats = self._calculate_statistics(steps)
+            lines.append(f"  Total steps: {stats['total']}")
+            lines.append(f"  By type:")
+            for step_type, count in sorted(stats['by_type'].items()):
+                lines.append(f"    {step_type}: {count}")
+            lines.append(f"  By status:")
+            for status, count in sorted(stats['by_status'].items()):
+                lines.append(f"    {status}: {count}")
+            if stats['total_duration_ms'] > 0:
+                lines.append(f"  Total duration: {stats['total_duration_ms']}ms")
+            if stats['total_tokens'] > 0:
+                lines.append(f"  Total tokens: {stats['total_tokens']}")
+            if stats['total_cost'] > 0:
+                lines.append(f"  Total cost: ${stats['total_cost']:.4f}")
+            lines.append("")
+
         # Step 树
         # Step 树
         if steps:
         if steps:
             lines.append("## Steps")
             lines.append("## Steps")
@@ -87,6 +106,36 @@ class StepTreeDumper:
 
 
         return content
         return content
 
 
+    def _calculate_statistics(self, steps: List[Dict[str, Any]]) -> Dict[str, Any]:
+        """计算统计信息"""
+        stats = {
+            'total': len(steps),
+            'by_type': {},
+            'by_status': {},
+            'total_duration_ms': 0,
+            'total_tokens': 0,
+            'total_cost': 0.0,
+        }
+
+        for step in steps:
+            # 按类型统计
+            step_type = step.get('step_type', 'unknown')
+            stats['by_type'][step_type] = stats['by_type'].get(step_type, 0) + 1
+
+            # 按状态统计
+            status = step.get('status', 'unknown')
+            stats['by_status'][status] = stats['by_status'].get(status, 0) + 1
+
+            # 累计指标
+            if step.get('duration_ms'):
+                stats['total_duration_ms'] += step.get('duration_ms', 0)
+            if step.get('tokens'):
+                stats['total_tokens'] += step.get('tokens', 0)
+            if step.get('cost'):
+                stats['total_cost'] += step.get('cost', 0.0)
+
+        return stats
+
     def _build_tree(self, steps: List[Dict[str, Any]]) -> Dict[str, List[str]]:
     def _build_tree(self, steps: List[Dict[str, Any]]) -> Dict[str, List[str]]:
         """构建父子关系映射"""
         """构建父子关系映射"""
         # parent_id -> [child_ids]
         # parent_id -> [child_ids]
@@ -151,6 +200,7 @@ class StepTreeDumper:
             "planned": "○",
             "planned": "○",
             "failed": "✗",
             "failed": "✗",
             "skipped": "⊘",
             "skipped": "⊘",
+            "awaiting_approval": "⏸",
         }
         }
         icon = status_icons.get(status, "?")
         icon = status_icons.get(status, "?")
 
 
@@ -165,6 +215,16 @@ class StepTreeDumper:
         step_id = step.get("step_id", "")[:8]  # 只显示前 8 位
         step_id = step.get("step_id", "")[:8]  # 只显示前 8 位
         lines.append(f"{child_prefix}id: {step_id}...")
         lines.append(f"{child_prefix}id: {step_id}...")
 
 
+        # 关键字段:sequence, status, parent_id
+        sequence = step.get("sequence")
+        if sequence is not None:
+            lines.append(f"{child_prefix}sequence: {sequence}")
+        lines.append(f"{child_prefix}status: {status}")
+
+        parent_id = step.get("parent_id")
+        if parent_id:
+            lines.append(f"{child_prefix}parent_id: {parent_id[:8]}...")
+
         # 执行指标
         # 执行指标
         if step.get("duration_ms") is not None:
         if step.get("duration_ms") is not None:
             lines.append(f"{child_prefix}duration: {step.get('duration_ms')}ms")
             lines.append(f"{child_prefix}duration: {step.get('duration_ms')}ms")
@@ -181,11 +241,22 @@ class StepTreeDumper:
                 summary = summary[:100] + "..."
                 summary = summary[:100] + "..."
             lines.append(f"{child_prefix}summary: {summary}")
             lines.append(f"{child_prefix}summary: {summary}")
 
 
-        # data 内容(格式化输出)
+        # 错误信息(结构化显示)
+        error = step.get("error")
+        if error:
+            lines.append(f"{child_prefix}error:")
+            lines.append(f"{child_prefix}  code: {error.get('code', 'UNKNOWN')}")
+            error_msg = error.get('message', '')
+            if len(error_msg) > 200:
+                error_msg = error_msg[:200] + "..."
+            lines.append(f"{child_prefix}  message: {error_msg}")
+            lines.append(f"{child_prefix}  retryable: {error.get('retryable', True)}")
+
+        # data 内容(格式化输出,更激进的截断)
         data = step.get("data", {})
         data = step.get("data", {})
         if data:
         if data:
             lines.append(f"{child_prefix}data:")
             lines.append(f"{child_prefix}data:")
-            data_lines = self._format_data(data, child_prefix + "  ")
+            data_lines = self._format_data(data, child_prefix + "  ", max_value_len=150)
             lines.append(data_lines)
             lines.append(data_lines)
 
 
         # 时间
         # 时间
@@ -201,13 +272,18 @@ class StepTreeDumper:
         lines.append("")  # 空行分隔
         lines.append("")  # 空行分隔
         return "\n".join(lines)
         return "\n".join(lines)
 
 
-    def _format_data(self, data: Dict[str, Any], prefix: str, max_value_len: int = 200) -> str:
-        """格式化 data 字典"""
+    def _format_data(self, data: Dict[str, Any], prefix: str, max_value_len: int = 150) -> str:
+        """格式化 data 字典(更激进的截断策略)"""
         lines = []
         lines = []
 
 
         for key, value in data.items():
         for key, value in data.items():
             # 格式化值
             # 格式化值
             if isinstance(value, str):
             if isinstance(value, str):
+                # 检测图片数据
+                if value.startswith("data:image") or (len(value) > 10000 and not "\n" in value[:100]):
+                    lines.append(f"{prefix}{key}: [IMAGE_DATA: {len(value)} chars, truncated]")
+                    continue
+
                 if len(value) > max_value_len:
                 if len(value) > max_value_len:
                     value_str = value[:max_value_len] + f"... ({len(value)} chars)"
                     value_str = value[:max_value_len] + f"... ({len(value)} chars)"
                 else:
                 else:
@@ -215,7 +291,8 @@ class StepTreeDumper:
                 # 处理多行字符串
                 # 处理多行字符串
                 if "\n" in value_str:
                 if "\n" in value_str:
                     first_line = value_str.split("\n")[0]
                     first_line = value_str.split("\n")[0]
-                    value_str = first_line + f"... ({value_str.count(chr(10))+1} lines)"
+                    line_count = value.count("\n") + 1
+                    value_str = first_line + f"... ({line_count} lines)"
             elif isinstance(value, (dict, list)):
             elif isinstance(value, (dict, list)):
                 value_str = json.dumps(value, ensure_ascii=False, indent=2)
                 value_str = json.dumps(value, ensure_ascii=False, indent=2)
                 if len(value_str) > max_value_len:
                 if len(value_str) > max_value_len:
@@ -268,6 +345,31 @@ class StepTreeDumper:
             lines.append(f"- **total_cost**: ${trace.get('total_cost', 0.0):.4f}")
             lines.append(f"- **total_cost**: ${trace.get('total_cost', 0.0):.4f}")
             lines.append("")
             lines.append("")
 
 
+        # 统计摘要
+        if steps:
+            lines.append("## Statistics")
+            lines.append("")
+            stats = self._calculate_statistics(steps)
+            lines.append(f"- **Total steps**: {stats['total']}")
+            lines.append("")
+            lines.append("**By type:**")
+            lines.append("")
+            for step_type, count in sorted(stats['by_type'].items()):
+                lines.append(f"- `{step_type}`: {count}")
+            lines.append("")
+            lines.append("**By status:**")
+            lines.append("")
+            for status, count in sorted(stats['by_status'].items()):
+                lines.append(f"- `{status}`: {count}")
+            lines.append("")
+            if stats['total_duration_ms'] > 0:
+                lines.append(f"- **Total duration**: {stats['total_duration_ms']}ms")
+            if stats['total_tokens'] > 0:
+                lines.append(f"- **Total tokens**: {stats['total_tokens']}")
+            if stats['total_cost'] > 0:
+                lines.append(f"- **Total cost**: ${stats['total_cost']:.4f}")
+            lines.append("")
+
         # Steps
         # Steps
         if steps:
         if steps:
             lines.append("## Steps")
             lines.append("## Steps")
@@ -325,6 +427,7 @@ class StepTreeDumper:
             "planned": "○",
             "planned": "○",
             "failed": "✗",
             "failed": "✗",
             "skipped": "⊘",
             "skipped": "⊘",
+            "awaiting_approval": "⏸",
         }
         }
         icon = status_icons.get(status, "?")
         icon = status_icons.get(status, "?")
 
 
@@ -341,6 +444,17 @@ class StepTreeDumper:
         step_id = step.get("step_id", "")[:16]
         step_id = step.get("step_id", "")[:16]
         lines.append(f"- **id**: `{step_id}...`")
         lines.append(f"- **id**: `{step_id}...`")
 
 
+        # 关键字段
+        sequence = step.get("sequence")
+        if sequence is not None:
+            lines.append(f"- **sequence**: {sequence}")
+        lines.append(f"- **status**: {status}")
+
+        parent_id = step.get("parent_id")
+        if parent_id:
+            lines.append(f"- **parent_id**: `{parent_id[:16]}...`")
+
+        # 执行指标
         if step.get("duration_ms") is not None:
         if step.get("duration_ms") is not None:
             lines.append(f"- **duration**: {step.get('duration_ms')}ms")
             lines.append(f"- **duration**: {step.get('duration_ms')}ms")
         if step.get("tokens") is not None:
         if step.get("tokens") is not None:
@@ -358,17 +472,39 @@ class StepTreeDumper:
 
 
         lines.append("")
         lines.append("")
 
 
+        # 错误信息
+        error = step.get("error")
+        if error:
+            lines.append("<details>")
+            lines.append("<summary><b>❌ Error</b></summary>")
+            lines.append("")
+            lines.append(f"- **code**: `{error.get('code', 'UNKNOWN')}`")
+            lines.append(f"- **retryable**: {error.get('retryable', True)}")
+            lines.append(f"- **message**:")
+            lines.append("```")
+            error_msg = error.get('message', '')
+            if len(error_msg) > 500:
+                error_msg = error_msg[:500] + "..."
+            lines.append(error_msg)
+            lines.append("```")
+            lines.append("")
+            lines.append("</details>")
+            lines.append("")
+
         # Summary
         # Summary
         if step.get("summary"):
         if step.get("summary"):
             lines.append("<details>")
             lines.append("<details>")
             lines.append("<summary><b>📝 Summary</b></summary>")
             lines.append("<summary><b>📝 Summary</b></summary>")
             lines.append("")
             lines.append("")
-            lines.append(f"```\n{step.get('summary')}\n```")
+            summary = step.get('summary', '')
+            if len(summary) > 1000:
+                summary = summary[:1000] + "..."
+            lines.append(f"```\n{summary}\n```")
             lines.append("")
             lines.append("")
             lines.append("</details>")
             lines.append("</details>")
             lines.append("")
             lines.append("")
 
 
-        # Data(完整输出,不截断)
+        # Data(更激进的截断)
         data = step.get("data", {})
         data = step.get("data", {})
         if data:
         if data:
             lines.append(self._render_markdown_data(data))
             lines.append(self._render_markdown_data(data))
@@ -397,7 +533,7 @@ class StepTreeDumper:
         return "\n".join(lines)
         return "\n".join(lines)
 
 
     def _render_data_item(self, key: str, value: Any) -> str:
     def _render_data_item(self, key: str, value: Any) -> str:
-        """渲染单个 data 项"""
+        """渲染单个 data 项(更激进的截断)"""
         # 确定图标
         # 确定图标
         icon_map = {
         icon_map = {
             "messages": "📨",
             "messages": "📨",
@@ -407,6 +543,8 @@ class StepTreeDumper:
             "model": "🎯",
             "model": "🎯",
             "error": "❌",
             "error": "❌",
             "content": "💬",
             "content": "💬",
+            "output": "📤",
+            "arguments": "⚙️",
         }
         }
         icon = icon_map.get(key, "📄")
         icon = icon_map.get(key, "📄")
 
 
@@ -414,6 +552,64 @@ class StepTreeDumper:
         if value is None:
         if value is None:
             return ""
             return ""
 
 
+        # 特殊处理 messages 中的图片引用
+        if key == 'messages' and isinstance(value, list):
+            # 统计图片数量
+            image_count = 0
+            for msg in value:
+                if isinstance(msg, dict):
+                    content = msg.get('content', [])
+                    if isinstance(content, list):
+                        for item in content:
+                            if isinstance(item, dict) and item.get('type') == 'image_url':
+                                url = item.get('image_url', {}).get('url', '')
+                                if url.startswith('blob://'):
+                                    image_count += 1
+
+            if image_count > 0:
+                # 显示图片摘要
+                lines = []
+                lines.append("<details>")
+                lines.append(f"<summary><b>📨 Messages (含 {image_count} 张图片)</b></summary>")
+                lines.append("")
+                lines.append("```json")
+
+                # 渲染消息,图片显示为简化格式
+                simplified_messages = []
+                for msg in value:
+                    if isinstance(msg, dict):
+                        simplified_msg = msg.copy()
+                        content = msg.get('content', [])
+                        if isinstance(content, list):
+                            new_content = []
+                            for item in content:
+                                if isinstance(item, dict) and item.get('type') == 'image_url':
+                                    url = item.get('image_url', {}).get('url', '')
+                                    if url.startswith('blob://'):
+                                        blob_ref = url.replace('blob://', '')
+                                        size = item.get('image_url', {}).get('size', 0)
+                                        size_kb = size / 1024 if size > 0 else 0
+                                        new_content.append({
+                                            'type': 'image_url',
+                                            'image_url': {
+                                                'url': f'[IMAGE: {blob_ref[:8]}... ({size_kb:.1f}KB)]'
+                                            }
+                                        })
+                                    else:
+                                        new_content.append(item)
+                                else:
+                                    new_content.append(item)
+                            simplified_msg['content'] = new_content
+                        simplified_messages.append(simplified_msg)
+                    else:
+                        simplified_messages.append(msg)
+
+                lines.append(json.dumps(simplified_messages, ensure_ascii=False, indent=2))
+                lines.append("```")
+                lines.append("")
+                lines.append("</details>")
+                return "\n".join(lines)
+
         # 判断是否需要折叠(长内容或复杂结构)
         # 判断是否需要折叠(长内容或复杂结构)
         needs_collapse = False
         needs_collapse = False
         if isinstance(value, str):
         if isinstance(value, str):
@@ -428,13 +624,18 @@ class StepTreeDumper:
             lines.append(f"<summary><b>{icon} {key.capitalize()}</b></summary>")
             lines.append(f"<summary><b>{icon} {key.capitalize()}</b></summary>")
             lines.append("")
             lines.append("")
 
 
-            # 格式化内容
+            # 格式化内容(更激进的截断)
             if isinstance(value, str):
             if isinstance(value, str):
                 # 检查是否包含图片 base64
                 # 检查是否包含图片 base64
-                if "data:image" in value or (isinstance(value, str) and len(value) > 10000):
+                if "data:image" in value or (isinstance(value, str) and len(value) > 10000 and not "\n" in value[:100]):
                     lines.append("```")
                     lines.append("```")
                     lines.append(f"[IMAGE DATA: {len(value)} chars, truncated for display]")
                     lines.append(f"[IMAGE DATA: {len(value)} chars, truncated for display]")
-                    lines.append(value[:200] + "...")
+                    lines.append("```")
+                elif len(value) > 2000:
+                    # 超长文本,只显示前500字符
+                    lines.append("```")
+                    lines.append(value[:500])
+                    lines.append(f"... (truncated, total {len(value)} chars)")
                     lines.append("```")
                     lines.append("```")
                 else:
                 else:
                     lines.append("```")
                     lines.append("```")
@@ -443,8 +644,14 @@ class StepTreeDumper:
             elif isinstance(value, (dict, list)):
             elif isinstance(value, (dict, list)):
                 # 递归截断图片 base64
                 # 递归截断图片 base64
                 truncated_value = self._truncate_image_data(value)
                 truncated_value = self._truncate_image_data(value)
+                json_str = json.dumps(truncated_value, ensure_ascii=False, indent=2)
+
+                # 如果 JSON 太长,也截断
+                if len(json_str) > 3000:
+                    json_str = json_str[:3000] + "\n... (truncated)"
+
                 lines.append("```json")
                 lines.append("```json")
-                lines.append(json.dumps(truncated_value, ensure_ascii=False, indent=2))
+                lines.append(json_str)
                 lines.append("```")
                 lines.append("```")
 
 
             lines.append("")
             lines.append("")
@@ -470,6 +677,10 @@ class StepTreeDumper:
                         result[key] = f"<IMAGE_DATA: {data_size_kb:.1f}KB, {header}, preview: {data[:50]}...>"
                         result[key] = f"<IMAGE_DATA: {data_size_kb:.1f}KB, {header}, preview: {data[:50]}...>"
                     else:
                     else:
                         result[key] = value[:max_length] + f"... ({len(value)} chars)"
                         result[key] = value[:max_length] + f"... ({len(value)} chars)"
+                # 检测 blob 引用
+                elif isinstance(value, str) and value.startswith("blob://"):
+                    blob_ref = value.replace("blob://", "")
+                    result[key] = f"<BLOB_REF: {blob_ref[:8]}...>"
                 else:
                 else:
                     result[key] = self._truncate_image_data(value, max_length)
                     result[key] = self._truncate_image_data(value, max_length)
             return result
             return result

+ 60 - 13
agent/execution/websocket.py

@@ -1,11 +1,12 @@
 """
 """
 Step 树 WebSocket 推送
 Step 树 WebSocket 推送
 
 
-实时推送进行中 Trace 的 Step 更新
+实时推送进行中 Trace 的 Step 更新,支持断线续传
 """
 """
 
 
-from typing import Dict, Set
-from fastapi import APIRouter, WebSocket, WebSocketDisconnect
+from typing import Dict, Set, Any
+from datetime import datetime
+from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query
 
 
 from agent.execution.protocols import TraceStore
 from agent.execution.protocols import TraceStore
 
 
@@ -37,12 +38,19 @@ def get_trace_store() -> TraceStore:
 
 
 
 
 @router.websocket("/{trace_id}/watch")
 @router.websocket("/{trace_id}/watch")
-async def watch_trace(websocket: WebSocket, trace_id: str):
+async def watch_trace(
+    websocket: WebSocket,
+    trace_id: str,
+    since_event_id: int = Query(0, description="从哪个事件 ID 开始(0=补发所有历史)")
+):
     """
     """
-    监听 Trace 的 Step 更新
+    监听 Trace 的 Step 更新,支持断线续传
 
 
     Args:
     Args:
         trace_id: Trace ID
         trace_id: Trace ID
+        since_event_id: 从哪个事件 ID 开始
+            - 0: 补发所有历史事件(初次连接)
+            - >0: 补发指定 ID 之后的事件(断线重连)
     """
     """
     await websocket.accept()
     await websocket.accept()
 
 
@@ -63,12 +71,26 @@ async def watch_trace(websocket: WebSocket, trace_id: str):
     _active_connections[trace_id].add(websocket)
     _active_connections[trace_id].add(websocket)
 
 
     try:
     try:
-        # 发送连接成功消息
+        # 发送连接成功消息 + 当前 event_id
         await websocket.send_json({
         await websocket.send_json({
             "event": "connected",
             "event": "connected",
-            "trace_id": trace_id
+            "trace_id": trace_id,
+            "current_event_id": trace.last_event_id
         })
         })
 
 
+        # 补发历史事件(since_event_id=0 表示补发所有历史)
+        if since_event_id >= 0:
+            missed_events = await store.get_events(trace_id, since_event_id)
+            # 限制补发数量(最多 100 条)
+            if len(missed_events) > 100:
+                await websocket.send_json({
+                    "event": "error",
+                    "message": f"Too many missed events ({len(missed_events)}), please reload full tree via REST API"
+                })
+            else:
+                for evt in missed_events:
+                    await websocket.send_json(evt)
+
         # 保持连接(等待客户端断开或接收消息)
         # 保持连接(等待客户端断开或接收消息)
         while True:
         while True:
             try:
             try:
@@ -93,18 +115,26 @@ async def watch_trace(websocket: WebSocket, trace_id: str):
 
 
 async def broadcast_step_added(trace_id: str, step_dict: Dict):
 async def broadcast_step_added(trace_id: str, step_dict: Dict):
     """
     """
-    广播 Step 添加事件
+    广播 Step 添加事件(自动分配 event_id)
 
 
     Args:
     Args:
         trace_id: Trace ID
         trace_id: Trace ID
-        step_dict: Step 字典(from step.to_dict())
+        step_dict: Step 字典(from step.to_dict(view="compact"))
     """
     """
     if trace_id not in _active_connections:
     if trace_id not in _active_connections:
         return
         return
 
 
+    # 从 store 获取最新 event_id(已由 add_step 自动追加)
+    store = get_trace_store()
+    trace = await store.get_trace(trace_id)
+    if not trace:
+        return
+
     message = {
     message = {
         "event": "step_added",
         "event": "step_added",
-        "step": step_dict
+        "event_id": trace.last_event_id,
+        "ts": datetime.now().isoformat(),
+        "step": step_dict  # compact 视图
     }
     }
 
 
     # 发送给所有监听该 Trace 的客户端
     # 发送给所有监听该 Trace 的客户端
@@ -122,20 +152,30 @@ async def broadcast_step_added(trace_id: str, step_dict: Dict):
 
 
 async def broadcast_step_updated(trace_id: str, step_id: str, updates: Dict):
 async def broadcast_step_updated(trace_id: str, step_id: str, updates: Dict):
     """
     """
-    广播 Step 更新事件
+    广播 Step 更新事件(patch 语义)
 
 
     Args:
     Args:
         trace_id: Trace ID
         trace_id: Trace ID
         step_id: Step ID
         step_id: Step ID
-        updates: 更新字段
+        updates: 更新字段(patch 格式)
     """
     """
     if trace_id not in _active_connections:
     if trace_id not in _active_connections:
         return
         return
 
 
+    store = get_trace_store()
+
+    # 追加事件到 store
+    event_id = await store.append_event(trace_id, "step_updated", {
+        "step_id": step_id,
+        "updates": updates
+    })
+
     message = {
     message = {
         "event": "step_updated",
         "event": "step_updated",
+        "event_id": event_id,
+        "ts": datetime.now().isoformat(),
         "step_id": step_id,
         "step_id": step_id,
-        "updates": updates
+        "patch": updates  # JSON Patch 风格:{"status": "completed", "duration_ms": 123}
     }
     }
 
 
     disconnected = []
     disconnected = []
@@ -160,8 +200,15 @@ async def broadcast_trace_completed(trace_id: str, total_steps: int):
     if trace_id not in _active_connections:
     if trace_id not in _active_connections:
         return
         return
 
 
+    store = get_trace_store()
+    event_id = await store.append_event(trace_id, "trace_completed", {
+        "total_steps": total_steps
+    })
+
     message = {
     message = {
         "event": "trace_completed",
         "event": "trace_completed",
+        "event_id": event_id,
+        "ts": datetime.now().isoformat(),
         "trace_id": trace_id,
         "trace_id": trace_id,
         "total_steps": total_steps
         "total_steps": total_steps
     }
     }

+ 0 - 2
agent/memory/stores.py

@@ -2,8 +2,6 @@
 Memory Implementation - 内存存储实现
 Memory Implementation - 内存存储实现
 
 
 用于测试和简单场景,数据不持久化
 用于测试和简单场景,数据不持久化
-
-MemoryTraceStore 已移动到 agent.execution.store
 """
 """
 
 
 from typing import Dict, List, Optional, Any
 from typing import Dict, List, Optional, Any

+ 5 - 5
api_server.py

@@ -9,9 +9,9 @@ from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.middleware.cors import CORSMiddleware
 import uvicorn
 import uvicorn
 
 
-from agent.trace import MemoryTraceStore
-from agent.trace.api import router as api_router, set_trace_store as set_api_trace_store
-from agent.trace.websocket import router as ws_router, set_trace_store as set_ws_trace_store
+from agent.execution import FileSystemTraceStore
+from agent.execution.api import router as api_router, set_trace_store as set_api_trace_store
+from agent.execution.websocket import router as ws_router, set_trace_store as set_ws_trace_store
 
 
 
 
 # ===== 日志配置 =====
 # ===== 日志配置 =====
@@ -43,8 +43,8 @@ app.add_middleware(
 
 
 # ===== 初始化存储 =====
 # ===== 初始化存储 =====
 
 
-# 使用内存存储(后续可替换为 PostgreSQL
-trace_store = MemoryTraceStore()
+# 使用文件系统存储(支持跨进程和持久化)
+trace_store = FileSystemTraceStore(base_path=".trace")
 
 
 # 注入到 step_tree 模块
 # 注入到 step_tree 模块
 set_api_trace_store(trace_store)
 set_api_trace_store(trace_store)

+ 2 - 1
docs/README.md

@@ -464,6 +464,7 @@ class SkillLoader(Protocol):
 
 
 **实现策略**:
 **实现策略**:
 - Trace/Step: 文件系统(JSON)
 - Trace/Step: 文件系统(JSON)
+  - `FileSystemTraceStore` - 文件持久化(支持跨进程)
 - Experience: PostgreSQL + pgvector
 - Experience: PostgreSQL + pgvector
 - Skill: 文件系统(Markdown)
 - Skill: 文件系统(Markdown)
 
 
@@ -482,7 +483,7 @@ agent/
 ├── execution/             # 执行追踪
 ├── execution/             # 执行追踪
 │   ├── models.py          # Trace, Step
 │   ├── models.py          # Trace, Step
 │   ├── protocols.py       # TraceStore
 │   ├── protocols.py       # TraceStore
-│   ├── store.py           # MemoryTraceStore
+│   ├── fs_store.py        # FileSystemTraceStore
 │   ├── tree_dump.py       # 可视化
 │   ├── tree_dump.py       # 可视化
 │   ├── api.py             # RESTful API
 │   ├── api.py             # RESTful API
 │   └── websocket.py       # WebSocket
 │   └── websocket.py       # WebSocket

+ 118 - 0
docs/decisions.md

@@ -518,3 +518,121 @@ Step 工具等核心功能如何让 Agent 知道?
 3. **可扩展性**:通过 Protocol 定义接口,便于后期切换实现
 3. **可扩展性**:通过 Protocol 定义接口,便于后期切换实现
 4. **安全性**:敏感数据占位符、域名匹配等机制保护隐私
 4. **安全性**:敏感数据占位符、域名匹配等机制保护隐私
 5. **可观测性**:内建统计、完整追踪,便于监控和调试
 5. **可观测性**:内建统计、完整追踪,便于监控和调试
+
+---
+
+## 10. 删除未使用的结构化错误功能
+
+**日期**: 2026-02-03
+
+### 问题
+在 execution trace v2.0 开发中引入了 `ErrorCode`、`StepError` 和 feedback 机制,但代码审查发现这些功能完全未被使用。
+
+### 决策
+**选择:删除未使用的代码**
+
+**理由**:
+1. **YAGNI 原则**:不应该维护未使用的功能(You Aren't Gonna Need It)
+2. **减少复杂度**:
+   - ErrorCode 枚举难以穷举,且 Python 无法强制约束
+   - StepError 与 ToolResult.error 存在设计重复
+   - feedback 机制缺少使用场景和配套接口
+3. **可恢复性**:需要时可以从 git 历史恢复
+4. **向后兼容**:这些功能从未被使用,删除不影响现有代码
+
+**影响**:
+- ✅ 代码更简洁
+- ✅ 减少维护负担
+- ✅ 不影响现有功能
+- ⚠️ 未来需要结构化错误时需要重新设计
+
+---
+
+---
+
+## 11. Step 关联统一使用 parent_id
+
+**日期**: 2026-02-03
+
+### 问题
+execution trace v2.0 设计中引入了多个跨节点关联字段(`tool_call_id`、`paired_action_id`、`span_id`),但实际分析后发现这些字段存在冗余。
+
+### 现状分析
+
+**字段使用情况**:
+- `tool_call_id` - Step 对象中未使用,只在 messages 中使用
+- `paired_action_id` - 完全未使用
+- `span_id` - 完全未使用
+
+**关联需求**:
+1. **Action/Result 配对** - 通过 parent_id 已经建立(result.parent_id = action.step_id)
+2. **与 messages 对应** - messages 中包含完整对话历史(含 tool_call_id)
+3. **重试追踪** - 同一个 action 下的多个 result,通过 parent_id 即可
+
+### 设计决策
+
+**保留**:
+- ✅ `parent_id` - 唯一的树结构关联字段
+
+**删除**:
+- ❌ `tool_call_id` - messages 中已包含,Step 不需要重复
+- ❌ `paired_action_id` - 与 parent_id 冗余
+- ❌ `span_id` - 分布式追踪功能,当前用不到
+
+---
+
+## 12. 删除 Blob 存储系统
+
+**日期**: 2026-02-03
+
+### 问题
+execution trace v2.0 引入了 Blob 存储系统用于处理大输出和图片,但实际分析后发现该系统过度设计且与 Agent 现有架构冗余。
+
+### 架构分析
+
+**Agent 的文件处理方式**:
+1. **用户输入**:提供文件路径(不是 base64)
+2. **工具处理**:内置工具直接读写文件系统
+3. **LLM 调用**:Runner 在调用 LLM 时才将文件路径转换为 base64
+4. **Trace 存储**:Step 中存储的是 messages(已包含 base64)
+
+**Blob 系统的问题**:
+1. **冗余提取**:从 messages 中提取 base64 再存储,而 messages 已经在 Step.data 中
+2. **功能重叠**:Agent 内置工具已经提供文件读写能力
+3. **过度设计**:引入 content-addressed storage、deduplication 等复杂功能,但 Agent 场景不需要
+4. **未被使用**:output_preview/blob_ref 字段从未在实际代码中使用
+
+### 设计决策
+
+**删除内容**:
+- ❌ `agent/execution/blob_store.py` 整个文件
+- ❌ `BlobStore` 协议及所有实现
+- ❌ `extract_images_from_messages()` 方法
+- ❌ `restore_images_in_messages()` 方法
+- ❌ `store_large_output()` 方法
+- ❌ Step 字段:`output_preview`、`blob_ref`
+
+**保留方案**:
+- ✅ Step 中的 `data` 字段直接存储 messages(包含 base64)
+- ✅ Agent 内置工具处理文件操作
+- ✅ 用户通过文件路径引用文件(不是 base64)
+
+### 理由
+
+1. **YAGNI 原则**:
+   - 功能从未被使用
+   - 未来需要时可以重新设计
+
+2. **架构更简洁**:
+   - 不需要额外的 blob 存储层
+   - 文件处理统一走工具系统
+
+3. **符合 Agent 场景**:
+   - Agent 运行在本地,直接访问文件系统
+   - 不需要像云服务那样做 blob 存储和 deduplication
+
+4. **数据完整性**:
+   - messages 中的 base64 已经足够
+   - 不需要拆分成 preview + ref
+
+---

+ 1 - 1
docs/project-structure.md

@@ -22,7 +22,7 @@ agent/
 ├── execution/                  # 执行追踪
 ├── execution/                  # 执行追踪
 │   ├── models.py               # Trace, Step
 │   ├── models.py               # Trace, Step
 │   ├── protocols.py            # TraceStore
 │   ├── protocols.py            # TraceStore
-│   ├── store.py                # MemoryTraceStore
+│   ├── fs_store.py             # FileSystemTraceStore
 │   └── tree_dump.py            # 可视化
 │   └── tree_dump.py            # 可视化
 ├── memory/                     # 记忆系统
 ├── memory/                     # 记忆系统
 │   ├── models.py               # Experience, Skill
 │   ├── models.py               # Experience, Skill

+ 78 - 0
docs/ref/Claude Code/agent-prompt-agent-creation-architect.md

@@ -0,0 +1,78 @@
+<!--
+name: 'Agent Prompt: Agent creation architect'
+description: System prompt for creating custom AI agents with detailed specifications
+ccVersion: 2.0.77
+variables:
+  - TASK_TOOL_NAME
+-->
+You are an elite AI agent architect specializing in crafting high-performance agent configurations. Your expertise lies in translating user requirements into precisely-tuned agent specifications that maximize effectiveness and reliability.
+
+**Important Context**: You may have access to project-specific instructions from CLAUDE.md files and other context that may include coding standards, project structure, and custom requirements. Consider this context when creating agents to ensure they align with the project's established patterns and practices.
+
+When a user describes what they want an agent to do, you will:
+
+1. **Extract Core Intent**: Identify the fundamental purpose, key responsibilities, and success criteria for the agent. Look for both explicit requirements and implicit needs. Consider any project-specific context from CLAUDE.md files. For agents that are meant to review code, you should assume that the user is asking to review recently written code and not the whole codebase, unless the user has explicitly instructed you otherwise.
+
+2. **Design Expert Persona**: Create a compelling expert identity that embodies deep domain knowledge relevant to the task. The persona should inspire confidence and guide the agent's decision-making approach.
+
+3. **Architect Comprehensive Instructions**: Develop a system prompt that:
+   - Establishes clear behavioral boundaries and operational parameters
+   - Provides specific methodologies and best practices for task execution
+   - Anticipates edge cases and provides guidance for handling them
+   - Incorporates any specific requirements or preferences mentioned by the user
+   - Defines output format expectations when relevant
+   - Aligns with project-specific coding standards and patterns from CLAUDE.md
+
+4. **Optimize for Performance**: Include:
+   - Decision-making frameworks appropriate to the domain
+   - Quality control mechanisms and self-verification steps
+   - Efficient workflow patterns
+   - Clear escalation or fallback strategies
+
+5. **Create Identifier**: Design a concise, descriptive identifier that:
+   - Uses lowercase letters, numbers, and hyphens only
+   - Is typically 2-4 words joined by hyphens
+   - Clearly indicates the agent's primary function
+   - Is memorable and easy to type
+   - Avoids generic terms like "helper" or "assistant"
+
+6. **Example agent descriptions**:
+  - in the 'whenToUse' field of the JSON object, you should include examples of when this agent should be used.
+  - examples should be of the form:
+    - <example>
+      Context: The user is creating a test-runner agent that should be called after a logical chunk of code is written.
+      user: "Please write a function that checks if a number is prime"
+      assistant: "Here is the relevant function: "
+      <function call omitted for brevity only for this example>
+      <commentary>
+      Since a significant piece of code was written, use the ${TASK_TOOL_NAME} tool to launch the test-runner agent to run the tests.
+      </commentary>
+      assistant: "Now let me use the test-runner agent to run the tests"
+    </example>
+    - <example>
+      Context: User is creating an agent to respond to the word "hello" with a friendly joke.
+      user: "Hello"
+      assistant: "I'm going to use the ${TASK_TOOL_NAME} tool to launch the greeting-responder agent to respond with a friendly joke"
+      <commentary>
+      Since the user is greeting, use the greeting-responder agent to respond with a friendly joke. 
+      </commentary>
+    </example>
+  - If the user mentioned or implied that the agent should be used proactively, you should include examples of this.
+- NOTE: Ensure that in the examples, you are making the assistant use the Agent tool and not simply respond directly to the task.
+
+Your output must be a valid JSON object with exactly these fields:
+{
+  "identifier": "A unique, descriptive identifier using lowercase letters, numbers, and hyphens (e.g., 'test-runner', 'api-docs-writer', 'code-formatter')",
+  "whenToUse": "A precise, actionable description starting with 'Use this agent when...' that clearly defines the triggering conditions and use cases. Ensure you include examples as described above.",
+  "systemPrompt": "The complete system prompt that will govern the agent's behavior, written in second person ('You are...', 'You will...') and structured for maximum clarity and effectiveness"
+}
+
+Key principles for your system prompts:
+- Be specific rather than generic - avoid vague instructions
+- Include concrete examples when they would clarify behavior
+- Balance comprehensiveness with clarity - every instruction should add value
+- Ensure the agent has enough context to handle variations of the core task
+- Make the agent proactive in seeking clarification when needed
+- Build in quality assurance and self-correction mechanisms
+
+Remember: The agents you create should be autonomous experts capable of handling their designated tasks with minimal additional guidance. Your system prompts are their complete operational manual.

+ 12 - 0
docs/ref/Claude Code/agent-prompt-bash-command-description-writer.md

@@ -0,0 +1,12 @@
+Clear, concise description of what this command does in active voice. Never use words like "complex" or "risk" in the description - just describe what it does.
+
+For simple commands (git, npm, standard CLI tools), keep it brief (5-10 words):
+
+ls → "List files in current directory"
+git status → "Show working tree status"
+npm install → "Install package dependencies"
+For commands that are harder to parse at a glance (piped commands, obscure flags, etc.), add enough context to clarify what it does:
+
+find . -name "*.tmp" -exec rm {} \; → "Find and delete all .tmp files recursively"
+git reset --hard origin/main → "Discard all local changes and match remote main"
+curl -s url | jq '.data[]' → "Fetch JSON from URL and extract data array elements"

+ 9 - 0
docs/ref/Claude Code/system-prompt-doing-tasks.md

@@ -0,0 +1,9 @@
+Doing tasks
+The user will primarily request you perform software engineering tasks. This includes solving bugs, adding new functionality, refactoring code, explaining code, and more. For these tasks the following steps are recommended: ${"- NEVER propose changes to code you haven't read. If a user asks about or wants you to modify a file, read it first. Understand existing code before suggesting modifications."}${TOOL_USAGE_HINTS_ARRAY.length>0? ${TOOL_USAGE_HINTS_ARRAY.join( )}:""}
+
+Be careful not to introduce security vulnerabilities such as command injection, XSS, SQL injection, and other OWASP top 10 vulnerabilities. If you notice that you wrote insecure code, immediately fix it.
+Avoid over-engineering. Only make changes that are directly requested or clearly necessary. Keep solutions simple and focused.
+Don't add features, refactor code, or make "improvements" beyond what was asked. A bug fix doesn't need surrounding code cleaned up. A simple feature doesn't need extra configurability. Don't add docstrings, comments, or type annotations to code you didn't change. Only add comments where the logic isn't self-evident.
+Don't add error handling, fallbacks, or validation for scenarios that can't happen. Trust internal code and framework guarantees. Only validate at system boundaries (user input, external APIs). Don't use feature flags or backwards-compatibility shims when you can just change the code.
+Don't create helpers, utilities, or abstractions for one-time operations. Don't design for hypothetical future requirements. The right amount of complexity is the minimum needed for the current task—three similar lines of code is better than a premature abstraction.
+Avoid backwards-compatibility hacks like renaming unused `_vars`, re-exporting types, adding `// removed` comments for removed code, etc. If something is unused, delete it completely.

+ 6 - 0
docs/ref/Claude Code/system-prompt-tool-usage-policy.md

@@ -0,0 +1,6 @@
+Tool usage policy${WEBFETCH_ENABLED_SECTION}${MCP_TOOLS_SECTION}
+You can call multiple tools in a single response. If you intend to call multiple tools and there are no dependencies between them, make all independent tool calls in parallel. Maximize use of parallel tool calls where possible to increase efficiency. However, if some tool calls depend on previous calls to inform dependent values, do NOT call these tools in parallel and instead call them sequentially. For instance, if one operation must complete before another starts, run these operations sequentially instead. Never use placeholders or guess missing parameters in tool calls.
+If the user specifies that they want you to run tools "in parallel", you MUST send a single message with multiple tool use content blocks. For example, if you need to launch multiple agents in parallel, send a single message with multiple ${TASK_TOOL_NAME} tool calls.
+Use specialized tools instead of bash commands when possible, as this provides a better user experience. For file operations, use dedicated tools: ${READ_TOOL_NAME} for reading files instead of cat/head/tail, ${EDIT_TOOL_NAME} for editing instead of sed/awk, and ${WRITE_TOOL_NAME} for creating files instead of cat with heredoc or echo redirection. Reserve bash tools exclusively for actual system commands and terminal operations that require shell execution. NEVER use bash echo or other command-line tools to communicate thoughts, explanations, or instructions to the user. Output all communication directly in your response text instead.
+${VERY IMPORTANT: When exploring the codebase to gather context or to answer a question that is not a needle query for a specific file/class/function, it is CRITICAL that you use the ${TASK_TOOL_NAME} tool with subagent_type=${EXPLORE_AGENT.agentType} instead of running search commands directly.}
+user: Where are errors from the client handled? assistant: [Uses the ${TASK_TOOL_NAME} tool with subagent_type=${EXPLORE_AGENT.agentType} to find the files that handle client errors instead of using ${GLOB_TOOL_NAME} or ${GREP_TOOL_NAME} directly] user: What is the codebase structure? assistant: [Uses the ${TASK_TOOL_NAME} tool with subagent_type=${EXPLORE_AGENT.agentType}]

+ 16 - 0
docs/ref/Claude Code/system-prompt-tool-use-summary-generation.md

@@ -0,0 +1,16 @@
+You summarize what was accomplished by a coding assistant. Given the tools executed and their results, provide a brief summary.
+
+Rules:
+
+Use past tense (e.g., "Read package.json", "Fixed type error in utils.ts")
+Be specific about what was done
+Keep under 8 words
+Do not include phrases like "I did" or "The assistant" - just describe what happened
+Focus on the user-visible outcome, not implementation details
+Examples:
+
+"Searched codebase for authentication code"
+"Read and analyzed Message.tsx component"
+"Fixed null pointer exception in data processor"
+"Created new user registration endpoint"
+"Ran tests and fixed 3 failing assertions"

+ 0 - 0
docs/ref/Claude Code/tool-description-bash.md


+ 26 - 1
docs/step-tree.md

@@ -74,6 +74,10 @@ class Step:
     # 仅 evaluation 类型需要
     # 仅 evaluation 类型需要
     summary: Optional[str] = None
     summary: Optional[str] = None
 
 
+    # UI 优化字段
+    has_children: bool = False            # 是否有子节点
+    children_count: int = 0               # 子节点数量
+
     # 执行指标
     # 执行指标
     duration_ms: Optional[int] = None
     duration_ms: Optional[int] = None
     cost: Optional[float] = None
     cost: Optional[float] = None
@@ -87,6 +91,27 @@ class Step:
 - `parent_id` 是单个值(树结构),不是列表(DAG)
 - `parent_id` 是单个值(树结构),不是列表(DAG)
 - `summary` 仅在 `evaluation` 类型节点填充,不是每个节点都需要
 - `summary` 仅在 `evaluation` 类型节点填充,不是每个节点都需要
 - `planned` 状态的 step 相当于 TODO item
 - `planned` 状态的 step 相当于 TODO item
+- `has_children` 和 `children_count` 用于前端 UI 优化(判断可展开、显示统计)
+
+**字段设计规则**:
+
+**顶层字段**(Step 类属性):
+- 所有(或大部分)step 都有的字段
+- 需要筛选/排序/索引的字段(如 tokens, cost, duration_ms)
+- 结构化、类型明确的字段
+
+**data 字段**(Dict):
+- step_type 特定的字段(不同类型有不同 schema)
+- 详细的业务数据(如 messages, content, arguments, output)
+- 可能很大的字段
+- 半结构化、动态的字段
+
+**data 字段 schema(按 step_type)**:
+- `thought` / `response`: model, messages, content, tool_calls
+- `action`: tool_name, arguments
+- `result`: tool_name, output, error
+- `memory_read`: experiences_count, skills_count
+- `goal`: 自定义(根据具体目标)
 
 
 ---
 ---
 
 
@@ -498,7 +523,7 @@ code .trace/tree.md
 - Step 模型:`agent/execution/models.py:Step`(已实现)
 - Step 模型:`agent/execution/models.py:Step`(已实现)
 - Trace 模型:`agent/execution/models.py:Trace`(已实现)
 - Trace 模型:`agent/execution/models.py:Trace`(已实现)
 - 存储接口:`agent/execution/protocols.py:TraceStore`(已实现)
 - 存储接口:`agent/execution/protocols.py:TraceStore`(已实现)
-- 内存存储:`agent/execution/store.py:MemoryTraceStore`(已实现)
+- 文件存储:`agent/execution/fs_store.py:FileSystemTraceStore`(已实现)
 - Debug 工具:`agent/execution/tree_dump.py`(已实现)
 - Debug 工具:`agent/execution/tree_dump.py`(已实现)
 - **Core Skill**:`agent/skills/core.md`(已实现)
 - **Core Skill**:`agent/skills/core.md`(已实现)
 - step 工具:`agent/tools/builtin/step.py`(待实现)
 - step 工具:`agent/tools/builtin/step.py`(待实现)

+ 8 - 8
docs/trace-api.md

@@ -102,14 +102,14 @@ class MyCustomStore:
     async def get_step_children(self, step_id: str) -> List[Step]: ...
     async def get_step_children(self, step_id: str) -> List[Step]: ...
 ```
 ```
 
 
-### MemoryTraceStore
+### FileSystemTraceStore
 
 
-内存存储实现(用于开发和测试
+文件系统存储实现(支持跨进程和持久化)
 
 
 ```python
 ```python
-from agent.trace import MemoryTraceStore
+from agent.execution import FileSystemTraceStore
 
 
-store = MemoryTraceStore()
+store = FileSystemTraceStore(base_path=".trace")
 
 
 # 使用方法
 # 使用方法
 trace_id = await store.create_trace(trace)
 trace_id = await store.create_trace(trace)
@@ -244,10 +244,10 @@ ws.onmessage = (e) => {
 
 
 ```python
 ```python
 from agent import AgentRunner
 from agent import AgentRunner
-from agent.trace import MemoryTraceStore
+from agent.execution import FileSystemTraceStore
 
 
 # 初始化
 # 初始化
-store = MemoryTraceStore()
+store = FileSystemTraceStore(base_path=".trace")
 runner = AgentRunner(trace_store=store, llm_call=my_llm_fn)
 runner = AgentRunner(trace_store=store, llm_call=my_llm_fn)
 
 
 # 执行 Agent(自动记录 Trace)
 # 执行 Agent(自动记录 Trace)
@@ -341,8 +341,8 @@ class PostgreSQLTraceStore:
 
 
 ```python
 ```python
 # ✅ 推荐导入
 # ✅ 推荐导入
-from agent.trace import Trace, Step, StepType, Status
-from agent.trace import TraceStore, MemoryTraceStore
+from agent.execution import Trace, Step, StepType, StepStatus
+from agent.execution import TraceStore, FileSystemTraceStore
 
 
 # ✅ 顶层导入(等价)
 # ✅ 顶层导入(等价)
 from agent import Trace, Step, TraceStore
 from agent import Trace, Step, TraceStore

+ 20 - 6
examples/feature_extract/run.py

@@ -17,7 +17,11 @@ load_dotenv()
 
 
 from agent.llm.prompts import SimplePrompt
 from agent.llm.prompts import SimplePrompt
 from agent.core.runner import AgentRunner
 from agent.core.runner import AgentRunner
-from agent.execution import MemoryTraceStore, Trace, Step
+from agent.execution import (
+    FileSystemTraceStore,
+    Trace,
+    Step,
+)
 from agent.llm import create_openrouter_llm_call
 from agent.llm import create_openrouter_llm_call
 
 
 
 
@@ -80,7 +84,7 @@ async def main():
     print(f"   - 模型: Claude Sonnet 4.5 (via OpenRouter)")
     print(f"   - 模型: Claude Sonnet 4.5 (via OpenRouter)")
 
 
     runner = AgentRunner(
     runner = AgentRunner(
-        trace_store=MemoryTraceStore(),
+        trace_store=FileSystemTraceStore(base_path=".trace"),
         llm_call=create_openrouter_llm_call(model="anthropic/claude-sonnet-4.5"),
         llm_call=create_openrouter_llm_call(model="anthropic/claude-sonnet-4.5"),
         skills_dir=skills_dir,  # 可选:加载额外的用户自定义 skills(内置 skills 会自动加载)
         skills_dir=skills_dir,  # 可选:加载额外的用户自定义 skills(内置 skills 会自动加载)
         debug=True  # 启用 debug,输出到 .trace/
         debug=True  # 启用 debug,输出到 .trace/
@@ -93,6 +97,7 @@ async def main():
     print()
     print()
 
 
     final_response = ""
     final_response = ""
+    current_trace_id = None  # 保存 trace_id 用于后续测试
 
 
     async for item in runner.run(
     async for item in runner.run(
         task="[图片和特征描述已包含在 messages 中]",  # 占位符
         task="[图片和特征描述已包含在 messages 中]",  # 占位符
@@ -105,6 +110,7 @@ async def main():
     ):
     ):
         # 处理 Trace 对象(整体状态变化)
         # 处理 Trace 对象(整体状态变化)
         if isinstance(item, Trace):
         if isinstance(item, Trace):
+            current_trace_id = item.trace_id  # 保存 trace_id
             if item.status == "in_progress":
             if item.status == "in_progress":
                 print(f"[Trace] 开始: {item.trace_id[:8]}")
                 print(f"[Trace] 开始: {item.trace_id[:8]}")
             elif item.status == "completed":
             elif item.status == "completed":
@@ -156,10 +162,18 @@ async def main():
     print(f"✓ 结果已保存到: {output_file}")
     print(f"✓ 结果已保存到: {output_file}")
     print()
     print()
 
 
-    # 8. 提示查看 debug 文件
-    print("Debug 文件:")
-    print(f"  - 完整可折叠: {Path.cwd() / '.trace' / 'tree.md'}")
-    print(f"  - 简洁文本: {Path.cwd() / '.trace' / 'tree.txt'}")
+    # 提示使用 API 可视化
+    print("=" * 60)
+    print("可视化 Step Tree:")
+    print("=" * 60)
+    print("1. 启动 API Server:")
+    print("   python3 api_server.py")
+    print()
+    print("2. 浏览器访问:")
+    print("   http://localhost:8000/api/traces")
+    print()
+    print(f"3. Trace ID: {current_trace_id}")
+    print("=" * 60)
 
 
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":

+ 659 - 0
frontend/API.md

@@ -0,0 +1,659 @@
+# Agent Execution API - 前端对接文档
+
+> 版本:v2.0
+> 更新日期:2026-02-03
+
+---
+
+## 📋 概览
+
+本 API 提供 Agent 执行过程的实时可视化能力,包括:
+- **REST API** - 查询历史数据、获取完整 Step 树
+- **WebSocket** - 实时推送 Step 更新(支持断线续传)
+
+**核心概念**:
+- **Trace** - 一次完整的任务执行(如一次 Agent 运行)
+- **Step** - 执行过程中的一个原子操作,形成树结构
+- **Event** - Step 的变更事件(用于 WebSocket 推送和断线续传)
+
+可以运行 `python3 examples/feature_extract/run.py` 来生成数据。
+
+---
+
+## 🌐 REST API
+
+### 基础信息
+
+- **Base URL**: `http://localhost:8000`
+- **Content-Type**: `application/json`
+
+---
+
+### 1. 列出 Traces
+
+获取 Trace 列表(支持过滤)
+
+```http
+GET /api/traces?status=running&limit=20
+```
+
+**查询参数**:
+| 参数 | 类型 | 必填 | 说明 |
+|------|------|------|------|
+| `status` | string | 否 | 过滤状态:`running` / `completed` / `failed` |
+| `mode` | string | 否 | 过滤模式:`call` / `agent` |
+| `limit` | int | 否 | 返回数量(默认 50,最大 100)|
+
+**响应示例**:
+```json
+{
+  "traces": [
+    {
+      "trace_id": "abc123",
+      "mode": "agent",
+      "task": "分析代码库结构",
+      "status": "running",
+      "total_steps": 15,
+      "total_tokens": 5000,
+      "total_cost": 0.05,
+      "created_at": "2026-02-03T15:30:00"
+    }
+  ],
+  "total": 1
+}
+```
+
+---
+
+### 2. 获取 Trace 元数据
+
+```http
+GET /api/traces/{trace_id}
+```
+
+**响应示例**:
+```json
+{
+  "trace_id": "abc123",
+  "mode": "agent",
+  "task": "分析代码库结构",
+  "status": "running",
+  "total_steps": 15,
+  "total_tokens": 5000,
+  "total_cost": 0.05,
+  "total_duration_ms": 12345,
+  "last_sequence": 15,
+  "last_event_id": 15,
+  "created_at": "2026-02-03T15:30:00",
+  "completed_at": null
+}
+```
+
+---
+
+### 3. 获取完整 Step 树 ⭐ 重要
+
+获取 Trace 的完整 Step 树(适合小型 Trace,<100 个 Step)
+
+```http
+GET /api/traces/{trace_id}/tree?view=compact&max_depth=10
+```
+
+**查询参数**:
+| 参数 | 类型 | 默认值 | 说明 |
+|------|------|--------|------|
+| `view` | string | `compact` | `compact` - 精简视图 / `full` - 完整视图 |
+| `max_depth` | int | 无限 | 最大树深度 |
+
+**响应示例**:
+```json
+{
+  "trace_id": "abc123",
+  "root_steps": [
+    {
+      "step_id": "step-001",
+      "step_type": "thought",
+      "status": "completed",
+      "sequence": 1,
+      "parent_id": null,
+      "description": "分析项目结构...",
+      "has_children": true,
+      "children_count": 2,
+      "duration_ms": 1234,
+      "tokens": 500,
+      "cost": 0.005,
+      "created_at": "2026-02-03T15:30:01",
+      "data": {
+        "content": "让我先看看项目的目录结构...",
+        "model": "claude-sonnet-4.5"
+      },
+      "children": [
+        {
+          "step_id": "step-002",
+          "step_type": "action",
+          "status": "completed",
+          "parent_id": "step-001",
+          "description": "glob_files(**/*.py)",
+          "data": {
+            "tool_name": "glob_files",
+            "arguments": {"pattern": "**/*.py"}
+          },
+          "children": [
+            {
+              "step_id": "step-003",
+              "step_type": "result",
+              "status": "completed",
+              "parent_id": "step-002",
+              "data": {
+                "tool_name": "glob_files",
+                "output": ["src/main.py", "src/utils.py"]
+              }
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}
+```
+
+**注意**:
+- `children` 字段包含嵌套的子节点(递归结构)
+- `compact` 视图:`data` 中的大字段(如 `messages`)会被省略
+- `full` 视图:返回所有字段(数据量可能很大)
+
+---
+
+### 4. 懒加载单个节点
+
+适用于大型 Trace(>100 Step),按需加载子树
+
+```http
+GET /api/traces/{trace_id}/node/{step_id}?expand=true&max_depth=2
+```
+
+**查询参数**:
+| 参数 | 类型 | 默认值 | 说明 |
+|------|------|--------|------|
+| `expand` | bool | `false` | 是否展开子节点 |
+| `max_depth` | int | 1 | 展开深度 |
+| `view` | string | `compact` | 视图类型 |
+
+**响应**:与 `/tree` 格式相同,但只返回指定节点及其子树。
+
+---
+
+## ⚡ WebSocket API
+
+### 连接
+
+```javascript
+const ws = new WebSocket(
+  'ws://localhost:8000/api/traces/{trace_id}/watch?since_event_id=0'
+)
+```
+
+**查询参数**:
+| 参数 | 类型 | 默认值 | 说明 |
+|------|------|--------|------|
+| `since_event_id` | int | `0` | 从哪个事件 ID 开始<br>`0` = 补发所有历史<br>`>0` = 只补发指定 ID 之后的 |
+
+---
+
+### 事件类型
+
+#### 1. connected(连接成功)
+
+```json
+{
+  "event": "connected",
+  "trace_id": "abc123",
+  "current_event_id": 15
+}
+```
+
+**说明**:连接建立后的第一条消息,包含当前最新的 event_id
+
+**前端处理**:
+```javascript
+if (data.event === 'connected') {
+  // 保存 event_id 用于断线重连
+  localStorage.setItem('last_event_id', data.current_event_id)
+}
+```
+
+---
+
+#### 2. step_added(新增 Step)⭐ 最常用
+
+```json
+{
+  "event": "step_added",
+  "event_id": 16,
+  "ts": "2026-02-03T15:30:10.123456",
+  "step": {
+    "step_id": "step-016",
+    "step_type": "action",
+    "status": "completed",
+    "sequence": 16,
+    "parent_id": "step-001",
+    "description": "read_file(config.yaml)",
+    "has_children": false,
+    "children_count": 0,
+    "duration_ms": 50,
+    "data": {
+      "tool_name": "read_file",
+      "arguments": {"file_path": "config.yaml"}
+    }
+  }
+}
+```
+
+**前端处理**:
+```javascript
+if (data.event === 'step_added') {
+  // 添加到树结构
+  addStepToTree(data.step)
+
+  // 更新 event_id
+  localStorage.setItem('last_event_id', data.event_id)
+}
+```
+
+---
+
+#### 3. step_updated(Step 更新)
+
+```json
+{
+  "event": "step_updated",
+  "event_id": 17,
+  "ts": "2026-02-03T15:30:15.123456",
+  "step_id": "step-016",
+  "patch": {
+    "status": "completed",
+    "duration_ms": 1234
+  }
+}
+```
+
+**说明**:`patch` 是增量更新(只包含变化的字段)
+
+**前端处理**:
+```javascript
+if (data.event === 'step_updated') {
+  const step = findStepById(data.step_id)
+  Object.assign(step, data.patch)
+  updateUI()
+}
+```
+
+---
+
+#### 4. trace_completed(任务完成)
+
+```json
+{
+  "event": "trace_completed",
+  "event_id": 18,
+  "ts": "2026-02-03T15:35:00.123456",
+  "trace_id": "abc123",
+  "total_steps": 18
+}
+```
+
+**前端处理**:
+```javascript
+if (data.event === 'trace_completed') {
+  console.log('Task completed!')
+  ws.close()
+}
+```
+
+---
+
+#### 5. error(错误)
+
+```json
+{
+  "event": "error",
+  "message": "Too many missed events (150), please reload full tree via REST API"
+}
+```
+
+**说明**:
+- 如果断线期间产生超过 100 条事件,会收到此错误
+- 此时应该用 REST API 重新加载完整树
+
+---
+
+### 断线续传
+
+**场景**:网络断开后重新连接,不丢失中间的更新
+
+**实现方式**:
+
+```javascript
+let lastEventId = 0
+
+// 初次连接
+const ws = new WebSocket(
+  `ws://localhost:8000/api/traces/abc123/watch?since_event_id=0`
+)
+
+ws.onmessage = (event) => {
+  const data = JSON.parse(event.data)
+
+  // 保存最新 event_id
+  if (data.event_id) {
+    lastEventId = data.event_id
+    localStorage.setItem('trace_abc123_event_id', lastEventId)
+  }
+}
+
+ws.onclose = () => {
+  // 3 秒后重连
+  setTimeout(() => {
+    // 从上次的 event_id 继续
+    const ws2 = new WebSocket(
+      `ws://localhost:8000/api/traces/abc123/watch?since_event_id=${lastEventId}`
+    )
+    // 服务器会补发 lastEventId 之后的所有事件
+  }, 3000)
+}
+```
+
+**注意**:
+- 事件消息(`step_added`、`step_updated`、`trace_completed`)都有 `event_id` 和 `ts` 字段;`error` 等控制消息没有,因此保存前要先判断 `event_id` 是否存在
+- 重连时传入 `since_event_id`,服务器自动补发缺失的事件(最多 100 条)
+- 超过 100 条会返回错误,需要用 REST API 重新加载
+
+---
+
+### 心跳检测
+
+保持连接活跃,检测僵尸连接
+
+```javascript
+// 每 30 秒发送心跳
+const heartbeat = setInterval(() => {
+  if (ws.readyState === WebSocket.OPEN) {
+    ws.send('ping')
+  }
+}, 30000)
+
+ws.onmessage = (event) => {
+  const data = JSON.parse(event.data)
+  if (data.event === 'pong') {
+    console.log('Connection alive')
+  }
+}
+```
+
+---
+
+## 📊 数据模型
+
+### Trace
+
+| 字段 | 类型 | 说明 |
+|------|------|------|
+| `trace_id` | string | 唯一 ID |
+| `mode` | string | `call` - 单次调用 / `agent` - Agent 模式 |
+| `task` | string | 任务描述(Agent 模式)|
+| `status` | string | `running` / `completed` / `failed` |
+| `total_steps` | int | Step 总数 |
+| `total_tokens` | int | Token 总消耗 |
+| `total_cost` | float | 成本总和 |
+| `total_duration_ms` | int | 总耗时(毫秒)|
+| `last_sequence` | int | 最新 Step 的 sequence |
+| `last_event_id` | int | 最新事件 ID |
+| `created_at` | string | 创建时间(ISO 8601)|
+| `completed_at` | string \| null | 完成时间 |
+
+---
+
+### Step
+
+#### 顶层字段(所有 Step 共有)
+
+| 字段 | 类型 | 说明 |
+|------|------|------|
+| `step_id` | string | 唯一 ID |
+| `trace_id` | string | 所属 Trace ID |
+| `step_type` | string | 步骤类型(见下表)|
+| `status` | string | 状态(见下表)|
+| `sequence` | int | 在 Trace 中的顺序(递增)|
+| `parent_id` | string \| null | 父节点 ID |
+| `description` | string | 简短描述 |
+| `summary` | string \| null | 总结(仅 `evaluation` 类型)|
+| `has_children` | bool | 是否有子节点 |
+| `children_count` | int | 子节点数量 |
+| `duration_ms` | int \| null | 耗时(毫秒)|
+| `tokens` | int \| null | Token 消耗 |
+| `cost` | float \| null | 成本 |
+| `created_at` | string | 创建时间 |
+| `data` | object | 类型相关的详细数据 |
+
+#### step_type(步骤类型)
+
+| 类型 | 说明 | 来源 |
+|------|------|------|
+| `goal` | 目标/计划 | LLM |
+| `thought` | 思考/分析 | LLM |
+| `evaluation` | 评估总结 | LLM |
+| `response` | 最终回复 | LLM |
+| `action` | 工具调用 | System |
+| `result` | 工具结果 | System |
+| `memory_read` | 读取记忆 | System |
+| `memory_write` | 写入记忆 | System |
+
+#### status(步骤状态)
+
+| 状态 | 说明 |
+|------|------|
+| `planned` | 计划中(未执行)|
+| `in_progress` | 执行中 |
+| `awaiting_approval` | 等待用户确认 |
+| `completed` | 已完成 |
+| `failed` | 失败 |
+| `skipped` | 跳过 |
+
+#### data 字段(按 step_type)
+
+**thought / response**:
+```json
+{
+  "model": "claude-sonnet-4.5",
+  "content": "让我先分析...",
+  "messages": [...],  // full 视图才有
+  "tool_calls": [...]  // 如果有工具调用
+}
+```
+
+**action**:
+```json
+{
+  "tool_name": "read_file",
+  "arguments": {
+    "file_path": "config.yaml"
+  }
+}
+```
+
+**result**:
+```json
+{
+  "tool_name": "read_file",
+  "output": "file content...",
+  "error": null
+}
+```
+
+**memory_read**:
+```json
+{
+  "experiences_count": 5,
+  "skills_count": 3
+}
+```
+
+---
+
+## 🎯 推荐的实现方案
+
+### 方案 1:纯 WebSocket(简单场景)
+
+适用于:实时监控进行中的任务,Step 数量 < 100
+
+```javascript
+// 只用 WebSocket,自动获取历史
+const ws = new WebSocket(
+  'ws://localhost:8000/api/traces/abc123/watch?since_event_id=0'
+)
+
+ws.onmessage = (event) => {
+  const data = JSON.parse(event.data)
+
+  if (data.event === 'step_added') {
+    // 历史 + 新增的 Step 都会收到
+    addStepToTree(data.step)
+  }
+}
+```
+
+**优点**:代码简单
+**缺点**:历史事件超过 100 条时会失败(服务器返回 `error`);一个 Step 可能产生多条事件(如 `step_added`、`step_updated`),所以实际可支持的 Step 数可能不足 100
+
+---
+
+### 方案 2:REST + WebSocket(生产推荐)
+
+适用于:查看历史任务,或 Step 数量 > 100
+
+```javascript
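+// 1. 先记录当前事件位置,再用 REST API 获取完整树
+//    (先取 last_event_id 再取树,两次请求之间产生的事件会被补发而不会丢失)
+const trace = await fetch(`/api/traces/${traceId}`).then(r => r.json())
+const response = await fetch(
+  `/api/traces/${traceId}/tree?view=compact`
+)
+const treeData = await response.json()
+renderTree(treeData.root_steps)
+
+// 2. 连接 WebSocket,只监听树加载之后的增量更新
+//    不要传 since_event_id=0:那会重放全部历史事件,与已渲染的树重复,
+//    并且历史事件超过 100 条时服务器会直接返回 error
+const ws = new WebSocket(
+  `ws://localhost:8000/api/traces/${traceId}/watch?since_event_id=${trace.last_event_id}`
+)
+
+ws.onmessage = (event) => {
+  const data = JSON.parse(event.data)
+  if (data.event === 'step_added') {
+    // 只处理新增的;两次请求之间的事件可能已包含在树中,addStepToTree 应按 step_id 去重
+    addStepToTree(data.step)
+  }
+}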
+```
+
+**优点**:可靠,支持大型 Trace
+**缺点**:略复杂
+
+---
+
+## 🐛 错误处理
+
+### HTTP 错误码
+
+| 状态码 | 说明 |
+|--------|------|
+| 200 | 成功 |
+| 404 | Trace/Step 不存在 |
+| 400 | 参数错误 |
+| 500 | 服务器错误 |
+
+### WebSocket 错误
+
+```javascript
+ws.onerror = (error) => {
+  console.error('WebSocket error:', error)
+  // 重连
+}
+
+ws.onclose = (event) => {
+  console.log('Connection closed:', event.code, event.reason)
+  // 自动重连
+}
+```
+
+---
+
+## 💡 最佳实践
+
+### 1. 保存 event_id 用于断线重连
+
+```javascript
+ws.onmessage = (event) => {
+  const data = JSON.parse(event.data)
+  if (data.event_id) {
+    localStorage.setItem(
+      `trace_${traceId}_event_id`,
+      data.event_id
+    )
+  }
+}
+```
+
+### 2. 实现自动重连
+
+```javascript
+function connectWebSocket(traceId) {
+  const lastEventId = localStorage.getItem(`trace_${traceId}_event_id`) || 0
+  const ws = new WebSocket(
+    `ws://localhost:8000/api/traces/${traceId}/watch?since_event_id=${lastEventId}`
+  )
+
+  ws.onclose = () => {
+    setTimeout(() => connectWebSocket(traceId), 3000)
+  }
+
+  return ws
+}
+```
+
+### 3. 使用 compact 视图减少流量
+
+```javascript
+// ✅ 推荐
+const response = await fetch(`/api/traces/${id}/tree?view=compact`)
+
+// ❌ 避免(数据量可能很大)
+const response = await fetch(`/api/traces/${id}/tree?view=full`)
+```
+
+### 4. 按需懒加载(大型 Trace)
+
+```javascript
+// 初次只加载第一层
+const tree = await fetch(
+  `/api/traces/${id}/tree?max_depth=1`
+).then(r => r.json())
+
+// 用户点击展开时,懒加载子树
+async function onExpand(stepId) {
+  const node = await fetch(
+    `/api/traces/${id}/node/${stepId}?expand=true&max_depth=1`
+  ).then(r => r.json())
+
+  appendChildren(stepId, node.children)
+}
+```
+
+---
+
+## 🔗 相关文档
+
+- [Step 树结构详解](./step-tree.md)
+- [API 接口规范](./trace-api.md)
+- [架构设计文档](./README.md)
+
+---
+
+## 📞 问题反馈
+
+如有问题请提 Issue:https://github.com/anthropics/agent/issues

+ 387 - 0
frontend/test_api.py

@@ -0,0 +1,387 @@
+#!/usr/bin/env python3
+"""
+Agent Execution API 自动化测试
+
+测试 REST API 和 WebSocket 接口是否正常工作
+
+运行方式:
+    python3 frontend/test_api.py
+
+要求:
+    - API Server 已启动(python3 api_server.py)
+    - 有至少一个 Trace 数据(运行 examples/feature_extract/run.py)
+"""
+
+import asyncio
+import json
+import sys
+from typing import Optional
+from datetime import datetime
+
+try:
+    import httpx
+    import websockets
+except ImportError:
+    print("❌ 缺少依赖,请安装:")
+    print("   pip install httpx websockets")
+    sys.exit(1)
+
+
+# Configuration: base URLs of the API server under test (REST and WebSocket)
+API_BASE = "http://localhost:8000"
+WS_BASE = "ws://localhost:8000"
+
+
+class TestResult:
+    """测试结果"""
+    def __init__(self, name: str):
+        self.name = name
+        self.passed = False
+        self.message = ""
+        self.duration_ms = 0
+
+    def success(self, message: str = ""):
+        self.passed = True
+        self.message = message
+
+    def fail(self, message: str):
+        self.passed = False
+        self.message = message
+
+    def __str__(self):
+        icon = "✅" if self.passed else "❌"
+        return f"{icon} {self.name} ({self.duration_ms}ms)" + (f"\n   {self.message}" if self.message else "")
+
+
+class APITester:
+    """API 测试器"""
+
+    def __init__(self):
+        self.results = []
+        self.test_trace_id: Optional[str] = None
+
+    async def run_all_tests(self):
+        """运行所有测试"""
+        print("🧪 Agent Execution API 自动化测试")
+        print("=" * 60)
+        print()
+
+        # 检查 API Server
+        if not await self.check_server():
+            print("❌ API Server 未启动,请先运行:")
+            print("   python3 api_server.py")
+            return False
+
+        print("✅ API Server 已启动\n")
+
+        # REST API 测试
+        print("📡 测试 REST API")
+        print("-" * 60)
+        await self.test_list_traces()
+        await self.test_get_trace()
+        await self.test_get_tree()
+        print()
+
+        # WebSocket 测试
+        print("⚡ 测试 WebSocket")
+        print("-" * 60)
+        await self.test_websocket_connect()
+        await self.test_websocket_events()
+        await self.test_websocket_reconnect()
+        print()
+
+        # 汇总结果
+        self.print_summary()
+
+        return all(r.passed for r in self.results)
+
+    async def check_server(self) -> bool:
+        """检查 API Server 是否运行"""
+        try:
+            async with httpx.AsyncClient() as client:
+                response = await client.get(f"{API_BASE}/api/traces", timeout=2.0)
+                return response.status_code in [200, 404]
+        except Exception:
+            return False
+
+    async def test_list_traces(self):
+        """测试:列出 Traces"""
+        result = TestResult("GET /api/traces - 列出 Traces")
+        start = datetime.now()
+
+        try:
+            async with httpx.AsyncClient() as client:
+                response = await client.get(f"{API_BASE}/api/traces?limit=10")
+                result.duration_ms = int((datetime.now() - start).total_seconds() * 1000)
+
+                if response.status_code != 200:
+                    result.fail(f"HTTP {response.status_code}")
+                else:
+                    data = response.json()
+                    if "traces" not in data:
+                        result.fail("响应缺少 'traces' 字段")
+                    else:
+                        trace_count = len(data["traces"])
+                        if trace_count == 0:
+                            result.fail("没有找到 Trace(请先运行 example 生成数据)")
+                        else:
+                            # 保存第一个 Trace ID 用于后续测试
+                            self.test_trace_id = data["traces"][0]["trace_id"]
+                            result.success(f"找到 {trace_count} 个 Trace")
+        except Exception as e:
+            result.fail(f"请求失败: {e}")
+
+        self.results.append(result)
+        print(result)
+
+    async def test_get_trace(self):
+        """测试:获取单个 Trace"""
+        result = TestResult("GET /api/traces/{id} - 获取 Trace 元数据")
+        start = datetime.now()
+
+        if not self.test_trace_id:
+            result.fail("跳过(没有可用的 Trace ID)")
+            self.results.append(result)
+            print(result)
+            return
+
+        try:
+            async with httpx.AsyncClient() as client:
+                response = await client.get(f"{API_BASE}/api/traces/{self.test_trace_id}")
+                result.duration_ms = int((datetime.now() - start).total_seconds() * 1000)
+
+                if response.status_code != 200:
+                    result.fail(f"HTTP {response.status_code}")
+                else:
+                    data = response.json()
+                    required_fields = ["trace_id", "mode", "status", "total_steps"]
+                    missing = [f for f in required_fields if f not in data]
+                    if missing:
+                        result.fail(f"响应缺少字段: {missing}")
+                    else:
+                        result.success(f"Status: {data['status']}, Steps: {data['total_steps']}")
+        except Exception as e:
+            result.fail(f"请求失败: {e}")
+
+        self.results.append(result)
+        print(result)
+
+    async def test_get_tree(self):
+        """测试:获取完整 Step 树"""
+        result = TestResult("GET /api/traces/{id}/tree - 获取 Step 树")
+        start = datetime.now()
+
+        if not self.test_trace_id:
+            result.fail("跳过(没有可用的 Trace ID)")
+            self.results.append(result)
+            print(result)
+            return
+
+        try:
+            async with httpx.AsyncClient() as client:
+                response = await client.get(
+                    f"{API_BASE}/api/traces/{self.test_trace_id}/tree?view=compact"
+                )
+                result.duration_ms = int((datetime.now() - start).total_seconds() * 1000)
+
+                if response.status_code != 200:
+                    result.fail(f"HTTP {response.status_code}")
+                else:
+                    data = response.json()
+                    if "root_steps" not in data:
+                        result.fail("响应缺少 'root_steps' 字段")
+                    else:
+                        root_count = len(data["root_steps"])
+                        # 统计总 step 数(递归)
+                        def count_steps(nodes):
+                            count = len(nodes)
+                            for node in nodes:
+                                if "children" in node:
+                                    count += count_steps(node["children"])
+                            return count
+
+                        total_steps = count_steps(data["root_steps"])
+                        result.success(f"根节点: {root_count}, 总 Steps: {total_steps}")
+        except Exception as e:
+            result.fail(f"请求失败: {e}")
+
+        self.results.append(result)
+        print(result)
+
+    async def test_websocket_connect(self):
+        """测试:WebSocket 连接"""
+        result = TestResult("WebSocket - 连接和接收 connected 事件")
+        start = datetime.now()
+
+        if not self.test_trace_id:
+            result.fail("跳过(没有可用的 Trace ID)")
+            self.results.append(result)
+            print(result)
+            return
+
+        try:
+            uri = f"{WS_BASE}/api/traces/{self.test_trace_id}/watch?since_event_id=0"
+            async with websockets.connect(uri) as ws:
+                # 接收第一条消息(connected)
+                message = await asyncio.wait_for(ws.recv(), timeout=3.0)
+                result.duration_ms = int((datetime.now() - start).total_seconds() * 1000)
+
+                data = json.loads(message)
+                if data.get("event") != "connected":
+                    result.fail(f"首条消息不是 'connected',而是: {data.get('event')}")
+                elif "current_event_id" not in data:
+                    result.fail("connected 消息缺少 'current_event_id' 字段")
+                else:
+                    event_id = data["current_event_id"]
+                    result.success(f"Event ID: {event_id}")
+        except asyncio.TimeoutError:
+            result.fail("连接超时(3秒)")
+        except Exception as e:
+            result.fail(f"连接失败: {e}")
+
+        self.results.append(result)
+        print(result)
+
+    async def test_websocket_events(self):
+        """测试:WebSocket 接收历史事件"""
+        result = TestResult("WebSocket - 接收历史 step_added 事件")
+        start = datetime.now()
+
+        if not self.test_trace_id:
+            result.fail("跳过(没有可用的 Trace ID)")
+            self.results.append(result)
+            print(result)
+            return
+
+        try:
+            uri = f"{WS_BASE}/api/traces/{self.test_trace_id}/watch?since_event_id=0"
+            async with websockets.connect(uri) as ws:
+                # 接收 connected
+                await ws.recv()
+
+                # 接收后续消息(应该是历史 step_added 事件)
+                events_received = []
+                try:
+                    while len(events_received) < 10:  # 最多接收 10 条
+                        message = await asyncio.wait_for(ws.recv(), timeout=1.0)
+                        data = json.loads(message)
+                        events_received.append(data.get("event"))
+                except asyncio.TimeoutError:
+                    pass  # 没有更多消息了
+
+                result.duration_ms = int((datetime.now() - start).total_seconds() * 1000)
+
+                step_added_count = events_received.count("step_added")
+                if step_added_count == 0:
+                    result.fail("没有收到 step_added 事件(Trace 可能为空)")
+                else:
+                    other_events = [e for e in events_received if e != "step_added"]
+                    result.success(
+                        f"收到 {step_added_count} 个 step_added" +
+                        (f", {len(other_events)} 个其他事件" if other_events else "")
+                    )
+        except Exception as e:
+            result.fail(f"测试失败: {e}")
+
+        self.results.append(result)
+        print(result)
+
+    async def test_websocket_reconnect(self):
+        """测试:WebSocket 断线续传"""
+        result = TestResult("WebSocket - 断线续传(since_event_id)")
+        start = datetime.now()
+
+        if not self.test_trace_id:
+            result.fail("跳过(没有可用的 Trace ID)")
+            self.results.append(result)
+            print(result)
+            return
+
+        try:
+            uri = f"{WS_BASE}/api/traces/{self.test_trace_id}/watch?since_event_id=0"
+
+            # 第一次连接,获取 event_id
+            last_event_id = 0
+            async with websockets.connect(uri) as ws:
+                message = await ws.recv()
+                data = json.loads(message)
+                last_event_id = data.get("current_event_id", 0)
+
+            if last_event_id == 0:
+                result.fail("无法获取 event_id")
+                self.results.append(result)
+                print(result)
+                return
+
+            # 第二次连接,使用 since_event_id
+            uri2 = f"{WS_BASE}/api/traces/{self.test_trace_id}/watch?since_event_id={last_event_id}"
+            async with websockets.connect(uri2) as ws:
+                message = await ws.recv()
+                data = json.loads(message)
+
+                result.duration_ms = int((datetime.now() - start).total_seconds() * 1000)
+
+                if data.get("event") != "connected":
+                    result.fail(f"重连后首条消息不是 'connected': {data.get('event')}")
+                else:
+                    # 检查是否不再接收历史事件
+                    try:
+                        message = await asyncio.wait_for(ws.recv(), timeout=0.5)
+                        data2 = json.loads(message)
+                        # 如果收到消息,说明补发了历史(可能是新增的)
+                        result.success(f"重连成功,从 event_id={last_event_id} 继续")
+                    except asyncio.TimeoutError:
+                        # 没有收到消息,说明没有新增的事件(正常)
+                        result.success(f"重连成功,无新增事件(event_id={last_event_id})")
+        except Exception as e:
+            result.fail(f"测试失败: {e}")
+
+        self.results.append(result)
+        print(result)
+
+    def print_summary(self):
+        """打印测试摘要"""
+        print("=" * 60)
+        print("📊 测试摘要")
+        print("=" * 60)
+
+        passed = sum(1 for r in self.results if r.passed)
+        total = len(self.results)
+        failed = total - passed
+
+        print(f"\n总计: {total} 个测试")
+        print(f"✅ 通过: {passed}")
+        if failed > 0:
+            print(f"❌ 失败: {failed}")
+        print()
+
+        if failed > 0:
+            print("失败的测试:")
+            for r in self.results:
+                if not r.passed:
+                    print(f"  - {r.name}")
+                    if r.message:
+                        print(f"    {r.message}")
+            print()
+
+        if passed == total:
+            print("🎉 所有测试通过!")
+        else:
+            print("⚠️  部分测试失败,请检查上述错误")
+
+        print()
+
+
+async def main():
+    """主函数"""
+    tester = APITester()
+    success = await tester.run_all_tests()
+    sys.exit(0 if success else 1)
+
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(main())
+    except KeyboardInterrupt:
+        # Ctrl-C: report the interruption and exit with a non-zero status
+        print("\n\n⚠️  测试被中断")
+        sys.exit(1)

+ 0 - 2
requirements.txt

@@ -11,5 +11,3 @@ browser-use>=0.11.0
 fastapi>=0.115.0
 fastapi>=0.115.0
 uvicorn[standard]>=0.32.0
 uvicorn[standard]>=0.32.0
 websockets>=13.0
 websockets>=13.0
-
-