Просмотр исходного кода

refactor: context management (without test)

Talegorithm 1 месяц назад
Родитель
Сommit
115a5d7a42

+ 97 - 228
agent/core/runner.py

@@ -3,20 +3,21 @@ Agent Runner - Agent 执行引擎
 
 核心职责:
 1. 执行 Agent 任务(循环调用 LLM + 工具)
-2. 记录执行图(Trace + Steps
+2. 记录执行轨迹(Trace + Messages + GoalTree
 3. 检索和注入记忆(Experience + Skill)
-4. 管理执行计划(Goal Tree)
+4. 管理执行计划(GoalTree)
 5. 收集反馈,提取经验
 """
 
 import logging
-from dataclasses import field
 from datetime import datetime
 from typing import AsyncIterator, Optional, Dict, Any, List, Callable, Literal, Union
 
 from agent.core.config import AgentConfig, CallResult
-from agent.execution import Trace, Step, TraceStore
-from agent.goal import GoalTree, goal_tool, compress_messages_for_goal
+from agent.execution.models import Trace, Message
+from agent.execution.protocols import TraceStore
+from agent.goal.models import GoalTree
+from agent.goal.tool import goal_tool
 from agent.memory.models import Experience, Skill
 from agent.memory.protocols import MemoryStore, StateStore
 from agent.memory.skill_loader import load_skills_from_dir
@@ -89,12 +90,6 @@ class AgentRunner:
         import uuid
         return str(uuid.uuid4())
 
-    async def _dump_debug(self, trace_id: str) -> None:
-        """Debug 模式(已废弃 - 使用 API 可视化替代)"""
-        # 不再自动生成 tree.txt/tree.md/tree.json
-        # 请使用 API Server 进行可视化:python3 api_server.py
-        pass
-
     # ===== 单次调用 =====
 
     async def call(
@@ -124,7 +119,7 @@ class AgentRunner:
             raise ValueError("llm_call function not provided")
 
         trace_id = None
-        step_id = None
+        message_id = None
 
         # 创建 Trace
         if trace and self.trace_store:
@@ -136,10 +131,8 @@ class AgentRunner:
             trace_id = await self.trace_store.create_trace(trace_obj)
 
         # 准备工具 Schema
-        # 合并内置工具 + 用户指定工具
         tool_names = BUILTIN_TOOLS.copy()
         if tools:
-            # 添加用户指定的工具(去重)
             for tool in tools:
                 if tool not in tool_names:
                     tool_names.append(tool)
@@ -154,41 +147,31 @@ class AgentRunner:
             **kwargs
         )
 
-        # 记录 Step
+        # 记录 Message(单次调用模式不使用 GoalTree)
         if trace and self.trace_store and trace_id:
-            step = Step.create(
+            msg = Message.create(
                 trace_id=trace_id,
-                step_type="thought",
-                sequence=0,
-                status="completed",
-                description=f"LLM 调用 ({model})",
-                data={
-                    "messages": messages,
-                    "response": result.get("content", ""),
-                    "model": model,
-                    "tools": tool_schemas,  # 记录传给模型的 tools schema
-                    "tool_calls": result.get("tool_calls"),
-                },
+                role="assistant",
+                sequence=1,
+                goal_id="0",  # 单次调用没有 goal,使用占位符
+                content={"text": result.get("content", ""), "tool_calls": result.get("tool_calls")},
                 tokens=result.get("prompt_tokens", 0) + result.get("completion_tokens", 0),
                 cost=result.get("cost", 0),
             )
-            step_id = await self.trace_store.add_step(step)
-            await self._dump_debug(trace_id)
+            message_id = await self.trace_store.add_message(msg)
 
             # 完成 Trace
             await self.trace_store.update_trace(
                 trace_id,
                 status="completed",
                 completed_at=datetime.now(),
-                total_tokens=result.get("prompt_tokens", 0) + result.get("completion_tokens", 0),
-                total_cost=result.get("cost", 0)
             )
 
         return CallResult(
             reply=result.get("content", ""),
             tool_calls=result.get("tool_calls"),
             trace_id=trace_id,
-            step_id=step_id,
+            step_id=message_id,  # 兼容字段名
             tokens={
                 "prompt": result.get("prompt_tokens", 0),
                 "completion": result.get("completion_tokens", 0),
@@ -211,7 +194,7 @@ class AgentRunner:
         enable_memory: Optional[bool] = None,
         auto_execute_tools: Optional[bool] = None,
         **kwargs
-    ) -> AsyncIterator[Union[Trace, Step]]:
+    ) -> AsyncIterator[Union[Trace, Message]]:
         """
         Agent 模式执行
 
@@ -229,7 +212,7 @@ class AgentRunner:
             **kwargs: 其他参数
 
         Yields:
-            Union[Trace, Step]: Trace 对象(状态变化)或 Step 对象(执行过程)
+            Union[Trace, Message]: Trace 对象(状态变化)或 Message 对象(执行过程)
         """
         if not self.llm_call:
             raise ValueError("llm_call function not provided")
@@ -242,19 +225,25 @@ class AgentRunner:
 
         # 创建 Trace
         trace_id = self._generate_id()
-        trace_obj = None
+        trace_obj = Trace(
+            trace_id=trace_id,
+            mode="agent",
+            task=task,
+            agent_type=agent_type,
+            uid=uid,
+            context={"model": model, **kwargs},
+            status="running"
+        )
+
         if self.trace_store:
-            trace_obj = Trace(
-                trace_id=trace_id,
-                mode="agent",
-                task=task,
-                agent_type=agent_type,
-                uid=uid,
-                context={"model": model, **kwargs}
-            )
             await self.trace_store.create_trace(trace_obj)
-            # 返回 Trace(表示开始)
-            yield trace_obj
+
+            # 初始化 GoalTree
+            goal_tree = self.goal_tree or GoalTree(mission=task)
+            await self.trace_store.update_goal_tree(trace_id, goal_tree)
+
+        # 返回 Trace(表示开始)
+        yield trace_obj
 
         try:
             # 加载记忆(Experience 和 Skill)
@@ -265,28 +254,9 @@ class AgentRunner:
                 scope = f"agent:{agent_type}"
                 experiences = await self.memory_store.search_experiences(scope, task)
                 experiences_text = self._format_experiences(experiences)
-
-                # 记录 memory_read Step
-                if self.trace_store:
-                    mem_step = Step.create(
-                        trace_id=trace_id,
-                        step_type="memory_read",
-                        sequence=0,
-                        status="completed",
-                        description=f"加载 {len(experiences)} 条经验",
-                        data={
-                            "experiences_count": len(experiences),
-                            "experiences": [e.to_dict() for e in experiences],
-                        }
-                    )
-                    await self.trace_store.add_step(mem_step)
-                    await self._dump_debug(trace_id)
-                    # 返回 Step(表示记忆加载完成)
-                    yield mem_step
+                logger.info(f"加载 {len(experiences)} 条经验")
 
             # 加载 Skills(内置 + 用户自定义)
-            # load_skills_from_dir() 会自动加载 agent/skills/ 中的内置 skills
-            # 如果提供了 skills_dir,会额外加载用户自定义的 skills
             skills = load_skills_from_dir(self.skills_dir)
             if skills:
                 skills_text = self._format_skills(skills)
@@ -312,14 +282,18 @@ class AgentRunner:
             # 添加任务描述
             messages.append({"role": "user", "content": task})
 
-            # 初始化 GoalTree
-            goal_tree = self.goal_tree or GoalTree(mission=task)
+            # 获取 GoalTree
+            goal_tree = None
+            if self.trace_store:
+                goal_tree = await self.trace_store.get_goal_tree(trace_id)
+
+                # 设置 goal_tree 到 goal 工具(供 LLM 调用)
+                from agent.tools.builtin.goal import set_goal_tree
+                set_goal_tree(goal_tree)
 
             # 准备工具 Schema
-            # 合并内置工具 + 用户指定工具
             tool_names = BUILTIN_TOOLS.copy()
             if tools:
-                # 添加用户指定的工具(去重)
                 for tool in tools:
                     if tool not in tool_names:
                         tool_names.append(tool)
@@ -327,17 +301,14 @@ class AgentRunner:
             tool_schemas = self.tools.get_schemas(tool_names)
 
             # 执行循环
-            current_goal_id = None  # 当前焦点 goal
             sequence = 1
-            total_tokens = 0
-            total_cost = 0.0
 
             for iteration in range(max_iterations):
                 # 注入当前计划到 messages(如果有 goals)
                 llm_messages = list(messages)
-                if goal_tree.goals:
+                if goal_tree and goal_tree.goals:
                     plan_text = f"\n## Current Plan\n\n{goal_tree.to_prompt()}"
-                    # 作为最后一条 system 消息注入
+                    # 在最后一条 system 消息之后注入
                     llm_messages.append({"role": "system", "content": plan_text})
 
                 # 调用 LLM
@@ -353,76 +324,34 @@ class AgentRunner:
                 step_tokens = result.get("prompt_tokens", 0) + result.get("completion_tokens", 0)
                 step_cost = result.get("cost", 0)
 
-                total_tokens += step_tokens
-                total_cost += step_cost
+                # 获取当前 goal_id
+                current_goal_id = goal_tree.current_id if goal_tree else "0"
+
+                # 记录 assistant Message
+                assistant_msg = Message.create(
+                    trace_id=trace_id,
+                    role="assistant",
+                    sequence=sequence,
+                    goal_id=current_goal_id,
+                    content={"text": response_content, "tool_calls": tool_calls},
+                    tokens=step_tokens,
+                    cost=step_cost,
+                )
 
-                # 记录 LLM 调用 Step
-                llm_step_id = self._generate_id()
-                llm_step = None
                 if self.trace_store:
-                    # 推断 step_type
-                    step_type = "thought"
-                    if tool_calls:
-                        step_type = "thought"  # 有工具调用的思考
-                    elif not tool_calls and iteration > 0:
-                        step_type = "response"  # 无工具调用,可能是最终回复
-
-                    llm_step = Step(
-                        step_id=llm_step_id,
-                        trace_id=trace_id,
-                        step_type=step_type,
-                        status="completed",
-                        sequence=sequence,
-                        parent_id=current_goal_id,
-                        description=response_content[:100] + "..." if len(response_content) > 100 else response_content,
-                        data={
-                            "messages": messages,  # 记录完整的 messages(包含 system prompt)
-                            "content": response_content,
-                            "model": model,
-                            "tools": tool_schemas,  # 记录传给模型的 tools schema
-                            "tool_calls": tool_calls,
-                        },
-                        tokens=step_tokens,
-                        cost=step_cost,
-                    )
-                    await self.trace_store.add_step(llm_step)
-                    await self._dump_debug(trace_id)
-                    # 返回 Step(LLM 思考完成)
-                    yield llm_step
+                    await self.trace_store.add_message(assistant_msg)
+                    # WebSocket 广播由 add_message 内部的 append_event 触发
 
+                yield assistant_msg
                 sequence += 1
 
                 # 处理工具调用
                 if tool_calls and auto_execute_tools:
-                    # 检查是否需要用户确认
-                    if self.tools.check_confirmation_required(tool_calls):
-                        # 创建等待确认的 Step
-                        await_step = Step.create(
-                            trace_id=trace_id,
-                            step_type="action",
-                            status="awaiting_approval",
-                            sequence=sequence,
-                            parent_id=llm_step_id,
-                            description="等待用户确认工具调用",
-                            data={
-                                "tool_calls": tool_calls,
-                                "confirmation_flags": self.tools.get_confirmation_flags(tool_calls),
-                                "editable_params": self.tools.get_editable_params_map(tool_calls)
-                            }
-                        )
-                        if self.trace_store:
-                            await self.trace_store.add_step(await_step)
-                            await self._dump_debug(trace_id)
-                        yield await_step
-                        # TODO: 等待用户确认
-                        break
-
-                    # 执行工具
+                    # 添加 assistant 消息到对话历史
                     messages.append({
                         "role": "assistant",
                         "content": response_content,
                         "tool_calls": tool_calls,
-                        "goal_id": goal_tree.current_id,
                     })
 
                     for tc in tool_calls:
@@ -432,123 +361,70 @@ class AgentRunner:
                             import json
                             tool_args = json.loads(tool_args)
 
-                        # 拦截 goal 工具调用
+                        # 拦截 goal 工具调用(需要保存更新后的 GoalTree)
                         if tool_name == "goal":
-                            prev_goal_id = goal_tree.current_id
-                            prev_goal = goal_tree.get_current()
-                            tool_result = goal_tool(tree=goal_tree, **tool_args)
-
-                            # 如果 done/abandon 触发了压缩
-                            if prev_goal_id and prev_goal:
-                                if prev_goal.status in ("completed", "abandoned") and prev_goal.summary:
-                                    messages = compress_messages_for_goal(
-                                        messages, prev_goal_id, prev_goal.summary
-                                    )
-                        else:
-                            # 执行普通工具
+                            # 执行 goal 工具
                             tool_result = await self.tools.execute(
                                 tool_name,
                                 tool_args,
                                 uid=uid or ""
                             )
 
-                        # 记录 action Step
-                        action_step_id = self._generate_id()
-                        action_step = None
-                        if self.trace_store:
-                            action_step = Step(
-                                step_id=action_step_id,
-                                trace_id=trace_id,
-                                step_type="action",
-                                status="completed",
-                                sequence=sequence,
-                                parent_id=llm_step_id,
-                                description=f"{tool_name}({', '.join(f'{k}={v}' for k, v in list(tool_args.items())[:2])})",
-                                data={
-                                    "tool_name": tool_name,
-                                    "arguments": tool_args,
-                                }
+                            # 保存更新后的 GoalTree
+                            if self.trace_store and goal_tree:
+                                await self.trace_store.update_goal_tree(trace_id, goal_tree)
+
+                                # TODO: 广播 goal 更新事件
+                        else:
+                            # 执行普通工具
+                            tool_result = await self.tools.execute(
+                                tool_name,
+                                tool_args,
+                                uid=uid or ""
                             )
-                            await self.trace_store.add_step(action_step)
-                            await self._dump_debug(trace_id)
-                            # 返回 Step(工具调用)
-                            yield action_step
 
-                        sequence += 1
+                        # 记录 tool Message
+                        tool_msg = Message.create(
+                            trace_id=trace_id,
+                            role="tool",
+                            sequence=sequence,
+                            goal_id=current_goal_id,
+                            tool_call_id=tc["id"],
+                            content={"tool_name": tool_name, "result": tool_result},
+                        )
 
-                        # 记录 result Step
-                        result_step_id = self._generate_id()
-                        result_step = None
                         if self.trace_store:
-                            result_step = Step(
-                                step_id=result_step_id,
-                                trace_id=trace_id,
-                                step_type="result",
-                                status="completed",
-                                sequence=sequence,
-                                parent_id=action_step_id,
-                                description=str(tool_result)[:100] if tool_result else "",
-                                data={
-                                    "tool_name": tool_name,
-                                    "output": tool_result,
-                                }
-                            )
-                            await self.trace_store.add_step(result_step)
-                            await self._dump_debug(trace_id)
-                            # 返回 Step(工具结果)
-                            yield result_step
+                            await self.trace_store.add_message(tool_msg)
 
+                        yield tool_msg
                         sequence += 1
 
-                        # 添加到消息(Gemini 需要 name 字段!)
+                        # 添加到消息历史
                         messages.append({
                             "role": "tool",
                             "tool_call_id": tc["id"],
                             "name": tool_name,
-                            "content": tool_result,
-                            "goal_id": goal_tree.current_id,
+                            "content": str(tool_result),
                         })
 
                     continue  # 继续循环
 
                 # 无工具调用,任务完成
-                # 记录 response Step
-                response_step_id = self._generate_id()
-                response_step = None
-                if self.trace_store:
-                    response_step = Step(
-                        step_id=response_step_id,
-                        trace_id=trace_id,
-                        step_type="response",
-                        status="completed",
-                        sequence=sequence,
-                        parent_id=current_goal_id,
-                        description=response_content[:100] + "..." if len(response_content) > 100 else response_content,
-                        data={
-                            "content": response_content,
-                            "is_final": True
-                        }
-                    )
-                    await self.trace_store.add_step(response_step)
-                    await self._dump_debug(trace_id)
-                    # 返回 Step(最终回复)
-                    yield response_step
-
                 break
 
             # 完成 Trace
             if self.trace_store:
-                await self.trace_store.update_trace(
-                    trace_id,
-                    status="completed",
-                    completed_at=datetime.now(),
-                    total_tokens=total_tokens,
-                    total_cost=total_cost
-                )
-                # 重新获取更新后的 Trace 并返回
                 trace_obj = await self.trace_store.get_trace(trace_id)
                 if trace_obj:
-                    yield trace_obj
+                    await self.trace_store.update_trace(
+                        trace_id,
+                        status="completed",
+                        completed_at=datetime.now(),
+                    )
+                    # 重新获取更新后的 Trace 并返回
+                    trace_obj = await self.trace_store.get_trace(trace_id)
+                    if trace_obj:
+                        yield trace_obj
 
         except Exception as e:
             logger.error(f"Agent run failed: {e}")
@@ -559,7 +435,6 @@ class AgentRunner:
                     status="failed",
                     completed_at=datetime.now()
                 )
-                # 重新获取更新后的 Trace 并返回
                 trace_obj = await self.trace_store.get_trace(trace_id)
                 if trace_obj:
                     yield trace_obj
@@ -578,9 +453,3 @@ class AgentRunner:
         if not experiences:
             return ""
         return "\n".join(f"- {e.to_prompt_text()}" for e in experiences)
-
-    def _format_skills(self, skills: List[Skill]) -> str:
-        """格式化 Skills 为 Prompt 文本"""
-        if not skills:
-            return ""
-        return "\n\n".join(s.to_prompt_text() for s in skills)

+ 22 - 26
agent/execution/__init__.py

@@ -2,15 +2,18 @@
 Execution - 执行追踪系统
 
 核心职责:
-1. Trace/Step 模型定义
+1. Trace/Message 模型定义(新架构)
 2. 存储接口和实现(文件系统)
-3. Step 树可视化(文本/markdown/JSON
-4. RESTful API(可视化查询,支持 compact/full 视图
+3. GoalTree 集成(计划管理
+4. RESTful API(可视化查询)
 5. WebSocket 推送(实时更新,支持断线续传)
 """
 
 # 模型(核心,无依赖)
-from agent.execution.models import Trace, Step, StepType, StepStatus
+from agent.execution.models import Trace, Message
+
+# 向后兼容:保留 Step 导出(已废弃)
+from agent.execution.models import Step, StepType, StepStatus
 
 # 存储接口(核心,无依赖)
 from agent.execution.protocols import TraceStore
@@ -18,9 +21,6 @@ from agent.execution.protocols import TraceStore
 # 文件系统存储实现(跨进程 + 持久化)
 from agent.execution.fs_store import FileSystemTraceStore
 
-# Debug 工具(可视化)
-from agent.execution.tree_dump import StepTreeDumper, dump_tree, dump_markdown, dump_json
-
 
 # API 路由(可选,需要 FastAPI)
 def _get_api_router():
@@ -39,36 +39,32 @@ def _get_ws_router():
 def _get_broadcast_functions():
     """延迟导入 WebSocket 广播函数"""
     from agent.execution.websocket import (
-        broadcast_step_added,
-        broadcast_step_updated,
+        broadcast_goal_added,
+        broadcast_goal_updated,
+        broadcast_branch_started,
+        broadcast_branch_completed,
+        broadcast_explore_completed,
+        broadcast_trace_completed,
+    )
+    return (
+        broadcast_goal_added,
+        broadcast_goal_updated,
+        broadcast_branch_started,
+        broadcast_branch_completed,
+        broadcast_explore_completed,
         broadcast_trace_completed,
     )
-    return broadcast_step_added, broadcast_step_updated, broadcast_trace_completed
-
-
-# 便捷属性(仅在访问时导入)
-@property
-def api_router():
-    return _get_api_router()
-
-
-@property
-def ws_router():
-    return _get_ws_router()
 
 
 __all__ = [
     # 模型
     "Trace",
+    "Message",
+    # 向后兼容(已废弃)
     "Step",
     "StepType",
     "StepStatus",
     # 存储
     "TraceStore",
     "FileSystemTraceStore",
-    # Debug/可视化
-    "StepTreeDumper",
-    "dump_tree",
-    "dump_markdown",
-    "dump_json",
 ]

+ 63 - 218
agent/execution/api.py

@@ -1,12 +1,11 @@
 """
-Step 树 RESTful API
+Trace RESTful API
 
-提供 Trace 和 Step 的查询接口,支持懒加载和 compact/full 视图
+提供 Trace、GoalTree、Message、Branch 的查询接口
 """
 
 from typing import List, Optional, Dict, Any
 from fastapi import APIRouter, HTTPException, Query
-from fastapi.responses import PlainTextResponse
 from pydantic import BaseModel
 
 from agent.execution.protocols import TraceStore
@@ -23,49 +22,22 @@ class TraceListResponse(BaseModel):
     traces: List[Dict[str, Any]]
 
 
-class TraceResponse(BaseModel):
-    """Trace 元数据响应"""
-    trace_id: str
-    mode: str
-    task: Optional[str] = None
-    agent_type: Optional[str] = None
-    status: str
-    total_steps: int
-    total_tokens: int
-    total_cost: float
-    created_at: str
-    completed_at: Optional[str] = None
-
-
-class StepNode(BaseModel):
-    """Step 节点(递归结构)"""
-    step_id: str
-    step_type: str
-    status: str
-    description: str
-    sequence: int
-    parent_id: Optional[str] = None
-    data: Optional[Dict[str, Any]] = None
-    summary: Optional[str] = None
-    duration_ms: Optional[int] = None
-    tokens: Optional[int] = None
-    cost: Optional[float] = None
-    created_at: str
-    children: List["StepNode"] = []
-
-
-class TreeResponse(BaseModel):
-    """完整树响应"""
-    trace_id: str
-    root_steps: List[StepNode]
-
-
-class NodeResponse(BaseModel):
-    """节点响应"""
-    step_id: Optional[str]
-    step_type: Optional[str]
-    description: Optional[str]
-    children: List[StepNode]
+class TraceDetailResponse(BaseModel):
+    """Trace 详情响应(包含 GoalTree 和分支元数据)"""
+    trace: Dict[str, Any]
+    goal_tree: Optional[Dict[str, Any]] = None
+    branches: Dict[str, Dict[str, Any]] = {}
+
+
+class MessagesResponse(BaseModel):
+    """Messages 响应"""
+    messages: List[Dict[str, Any]]
+
+
+class BranchDetailResponse(BaseModel):
+    """分支详情响应(包含分支的 GoalTree)"""
+    branch: Dict[str, Any]
+    goal_tree: Optional[Dict[str, Any]] = None
 
 
 # ===== 全局 TraceStore(由 api_server.py 注入)=====
@@ -121,72 +93,49 @@ async def list_traces(
     )
 
 
-@router.get("/{trace_id}", response_model=TraceResponse)
+@router.get("/{trace_id}", response_model=TraceDetailResponse)
 async def get_trace(trace_id: str):
     """
-    获取 Trace 元数据
-
-    Args:
-        trace_id: Trace ID
-    """
-    store = get_trace_store()
-    trace = await store.get_trace(trace_id)
-    if not trace:
-        raise HTTPException(status_code=404, detail="Trace not found")
-
-    return TraceResponse(**trace.to_dict())
+    获取 Trace 详情
 
-
-@router.get("/{trace_id}/tree", response_model=TreeResponse)
-async def get_full_tree(
-    trace_id: str,
-    view: str = Query("compact", regex="^(compact|full)$"),
-    max_depth: int = Query(999, ge=1, le=999)
-):
-    """
-    获取完整 Step 树(小型 Trace 推荐)
+    返回 Trace 元数据、主线 GoalTree、分支元数据(不含分支内 GoalTree)
 
     Args:
         trace_id: Trace ID
-        view: compact(默认,不含 blob)| full(含 blob)
-        max_depth: 最大深度
     """
     store = get_trace_store()
 
-    # 验证 Trace 存在
+    # 获取 Trace
     trace = await store.get_trace(trace_id)
     if not trace:
         raise HTTPException(status_code=404, detail="Trace not found")
 
-    # 获取所有 Steps
-    steps = await store.get_trace_steps(trace_id)
+    # 获取 GoalTree
+    goal_tree = await store.get_goal_tree(trace_id)
 
-    # 构建树结构
-    root_nodes = await _build_tree(store, trace_id, None, view=view, expand=True, max_depth=max_depth)
+    # 获取所有分支元数据
+    branches = await store.list_branches(trace_id)
 
-    return TreeResponse(
-        trace_id=trace_id,
-        root_steps=root_nodes
+    return TraceDetailResponse(
+        trace=trace.to_dict(),
+        goal_tree=goal_tree.to_dict() if goal_tree else None,
+        branches={b_id: b.to_dict() for b_id, b in branches.items()}
     )
 
 
-@router.get("/{trace_id}/node/{step_id}", response_model=NodeResponse)
-async def get_node(
+@router.get("/{trace_id}/messages", response_model=MessagesResponse)
+async def get_messages(
     trace_id: str,
-    step_id: str,
-    view: str = Query("compact", regex="^(compact|full)$"),
-    expand: bool = Query(False, description="是否加载子节点"),
-    max_depth: int = Query(1, ge=1, le=10, description="递归深度")
+    goal_id: Optional[str] = Query(None, description="过滤指定 Goal 的消息"),
+    branch_id: Optional[str] = Query(None, description="过滤指定分支的消息")
 ):
     """
-    懒加载节点 + 子节点(大型 Trace 推荐)
+    获取 Messages
 
     Args:
         trace_id: Trace ID
-        step_id: Step ID("null" 表示根节点)
-        view: compact | full
-        expand: 是否加载子节点
-        max_depth: 递归深度
+        goal_id: 可选,过滤指定 Goal 的消息
+        branch_id: 可选,过滤指定分支的消息
     """
     store = get_trace_store()
 
@@ -195,151 +144,47 @@ async def get_node(
     if not trace:
         raise HTTPException(status_code=404, detail="Trace not found")
 
-    # step_id = "null" 表示根节点
-    actual_step_id = None if step_id == "null" else step_id
-
-    # 验证 Step 存在(非根节点)
-    if actual_step_id:
-        step = await store.get_step(actual_step_id)
-        if not step or step.trace_id != trace_id:
-            raise HTTPException(status_code=404, detail="Step not found")
-
-    # 构建节点树
-    children = await _build_tree(store, trace_id, actual_step_id, view=view, expand=expand, max_depth=max_depth)
-
-    # 如果是根节点,返回所有根 Steps
-    if actual_step_id is None:
-        return NodeResponse(
-            step_id=None,
-            step_type=None,
-            description=None,
-            children=children
-        )
-
-    # 否则返回当前节点 + 子节点
-    step = await store.get_step(actual_step_id)
-    return NodeResponse(
-        step_id=step.step_id,
-        step_type=step.step_type,
-        description=step.description,
-        children=children
+    # 获取 Messages
+    if goal_id:
+        messages = await store.get_messages_by_goal(trace_id, goal_id, branch_id)
+    else:
+        messages = await store.get_trace_messages(trace_id, branch_id)
+
+    return MessagesResponse(
+        messages=[m.to_dict() for m in messages]
     )
 
 
-@router.get("/{trace_id}/node/{step_id}/children")
-async def get_children_paginated(
+@router.get("/{trace_id}/branches/{branch_id}", response_model=BranchDetailResponse)
+async def get_branch_detail(
     trace_id: str,
-    step_id: str,
-    cursor: Optional[int] = Query(None, description="上次最后的 sequence"),
-    limit: int = Query(20, ge=1, le=100),
-    view: str = Query("compact", regex="^(compact|full)$")
+    branch_id: str
 ):
     """
-    分页获取子节点(基于 sequence 游标)
+    获取分支详情
+
+    返回分支元数据和分支的 GoalTree(按需加载)
 
     Args:
         trace_id: Trace ID
-        step_id: Step ID
-        cursor: 上次最后的 sequence(None 表示从头开始)
-        limit: 每页数量
-        view: compact | full
-
-    Returns:
-        {
-            "children": [...],
-            "next_cursor": 123,  # 下一页游标(None 表示没有更多)
-            "has_more": true
-        }
+        branch_id: 分支 ID
     """
     store = get_trace_store()
 
-    # 验证 trace 存在
+    # 验证 Trace 存在
     trace = await store.get_trace(trace_id)
     if not trace:
         raise HTTPException(status_code=404, detail="Trace not found")
 
-    # 验证 step 存在
-    step = await store.get_step(step_id)
-    if not step or step.trace_id != trace_id:
-        raise HTTPException(status_code=404, detail="Step not found")
-
-    # 获取所有子节点
-    children = await store.get_step_children(step_id)
-
-    # 过滤 cursor 之后的节点
-    if cursor is not None:
-        children = [s for s in children if s.sequence > cursor]
-
-    # 分页
-    has_more = len(children) > limit
-    page = children[:limit]
-    next_cursor = page[-1].sequence if page and has_more else None
-
-    # 序列化
-    children_dicts = [s.to_dict(view=view) for s in page]
-
-    return {
-        "children": children_dicts,
-        "next_cursor": next_cursor,
-        "has_more": has_more
-    }
-
-
-# ===== 核心算法:懒加载树构建 =====
+    # 获取分支元数据
+    branch = await store.get_branch(trace_id, branch_id)
+    if not branch:
+        raise HTTPException(status_code=404, detail="Branch not found")
 
+    # 获取分支的 GoalTree
+    goal_tree = await store.get_branch_goal_tree(trace_id, branch_id)
 
-async def _build_tree(
-    store: TraceStore,
-    trace_id: str,
-    step_id: Optional[str],
-    view: str = "compact",  # 新增参数
-    expand: bool = False,
-    max_depth: int = 1,
-    current_depth: int = 0
-) -> List[StepNode]:
-    """
-    懒加载核心逻辑(简洁版本)
-
-    没有"批次计算"、没有"同层完整性检查"
-    只有简单的递归遍历
-
-    Args:
-        store: TraceStore 实例
-        trace_id: Trace ID
-        step_id: 当前 Step ID(None 表示根节点)
-        view: "compact" | "full"
-        expand: 是否展开子节点
-        max_depth: 最大递归深度
-        current_depth: 当前递归深度
-
-    Returns:
-        List[StepNode]: 节点列表
-    """
-    # 1. 获取当前层节点
-    if step_id is None:
-        # 根节点:获取所有 parent_id=None 的 Steps
-        steps = await store.get_trace_steps(trace_id)
-        current_nodes = [s for s in steps if s.parent_id is None]
-    else:
-        # 非根节点:获取子节点
-        current_nodes = await store.get_step_children(step_id)
-
-    # 2. 构建响应
-    result_nodes = []
-    for step in current_nodes:
-        node_dict = step.to_dict(view=view)  # 使用 view 参数
-        node_dict["children"] = []
-
-        # 3. 递归加载子节点(可选)
-        if expand and current_depth < max_depth:
-            children = await store.get_step_children(step.step_id)
-            if children:
-                node_dict["children"] = await _build_tree(
-                    store, trace_id, step.step_id,
-                    view=view, expand=True, max_depth=max_depth,
-                    current_depth=current_depth + 1
-                )
-
-        result_nodes.append(StepNode(**node_dict))
-
-    return result_nodes
+    return BranchDetailResponse(
+        branch=branch.to_dict(),
+        goal_tree=goal_tree.to_dict() if goal_tree else None
+    )

+ 396 - 126
agent/execution/fs_store.py

@@ -2,6 +2,22 @@
 FileSystem Trace Store - 文件系统存储实现
 
 用于跨进程数据共享,数据持久化到 .trace/ 目录
+
+目录结构:
+.trace/{trace_id}/
+├── meta.json           # Trace 元数据
+├── goal.json           # 主线 GoalTree(扁平 JSON,通过 parent_id 构建层级)
+├── messages/           # 主线 Messages(每条独立文件)
+│   ├── {message_id}.json
+│   └── ...
+├── branches/           # 分支数据(独立存储)
+│   ├── A/
+│   │   ├── meta.json   # BranchContext 元数据
+│   │   ├── goal.json   # 分支 A 的 GoalTree
+│   │   └── messages/   # 分支 A 的 Messages
+│   └── B/
+│       └── ...
+└── events.jsonl        # 事件流(WebSocket 续传)
 """
 
 import json
@@ -10,24 +26,12 @@ from pathlib import Path
 from typing import Dict, List, Optional, Any
 from datetime import datetime
 
-from agent.execution.models import Trace, Step
+from agent.execution.models import Trace, Message
+from agent.goal.models import GoalTree, Goal, BranchContext, GoalStats
 
 
 class FileSystemTraceStore:
-    """文件系统 Trace 存储
-
-    目录结构:
-    .trace/
-    ├── trace-001/
-    │   ├── meta.json           # Trace 元数据
-    │   ├── steps/
-    │   │   ├── step-1.json     # 每个 step 独立文件
-    │   │   ├── step-2.json
-    │   │   └── step-3.json
-    │   └── events.jsonl        # 事件流(WebSocket 续传)
-    └── trace-002/
-        └── ...
-    """
+    """文件系统 Trace 存储"""
 
     def __init__(self, base_path: str = ".trace"):
         self.base_path = Path(base_path)
@@ -41,13 +45,37 @@ class FileSystemTraceStore:
         """获取 meta.json 文件路径"""
         return self._get_trace_dir(trace_id) / "meta.json"
 
-    def _get_steps_dir(self, trace_id: str) -> Path:
-        """获取 steps 目录"""
-        return self._get_trace_dir(trace_id) / "steps"
+    def _get_goal_file(self, trace_id: str) -> Path:
+        """获取 goal.json 文件路径"""
+        return self._get_trace_dir(trace_id) / "goal.json"
+
+    def _get_messages_dir(self, trace_id: str) -> Path:
+        """获取 messages 目录"""
+        return self._get_trace_dir(trace_id) / "messages"
+
+    def _get_message_file(self, trace_id: str, message_id: str) -> Path:
+        """获取 message 文件路径"""
+        return self._get_messages_dir(trace_id) / f"{message_id}.json"
+
+    def _get_branches_dir(self, trace_id: str) -> Path:
+        """获取 branches 目录"""
+        return self._get_trace_dir(trace_id) / "branches"
+
+    def _get_branch_dir(self, trace_id: str, branch_id: str) -> Path:
+        """获取分支目录"""
+        return self._get_branches_dir(trace_id) / branch_id
+
+    def _get_branch_meta_file(self, trace_id: str, branch_id: str) -> Path:
+        """获取分支 meta.json 文件路径"""
+        return self._get_branch_dir(trace_id, branch_id) / "meta.json"
+
+    def _get_branch_goal_file(self, trace_id: str, branch_id: str) -> Path:
+        """获取分支 goal.json 文件路径"""
+        return self._get_branch_dir(trace_id, branch_id) / "goal.json"
 
-    def _get_step_file(self, trace_id: str, step_id: str) -> Path:
-        """获取 step 文件路径"""
-        return self._get_steps_dir(trace_id) / f"{step_id}.json"
+    def _get_branch_messages_dir(self, trace_id: str, branch_id: str) -> Path:
+        """获取分支 messages 目录"""
+        return self._get_branch_dir(trace_id, branch_id) / "messages"
 
     def _get_events_file(self, trace_id: str) -> Path:
         """获取 events.jsonl 文件路径"""
@@ -60,9 +88,13 @@ class FileSystemTraceStore:
         trace_dir = self._get_trace_dir(trace.trace_id)
         trace_dir.mkdir(exist_ok=True)
 
-        # 创建 steps 目录
-        steps_dir = self._get_steps_dir(trace.trace_id)
-        steps_dir.mkdir(exist_ok=True)
+        # 创建 messages 目录
+        messages_dir = self._get_messages_dir(trace.trace_id)
+        messages_dir.mkdir(exist_ok=True)
+
+        # 创建 branches 目录
+        branches_dir = self._get_branches_dir(trace.trace_id)
+        branches_dir.mkdir(exist_ok=True)
 
         # 写入 meta.json
         meta_file = self._get_meta_file(trace.trace_id)
@@ -116,7 +148,6 @@ class FileSystemTraceStore:
         """列出 Traces"""
         traces = []
 
-        # 遍历所有 trace 目录
         if not self.base_path.exists():
             return []
 
@@ -149,7 +180,6 @@ class FileSystemTraceStore:
 
                 traces.append(Trace(**data))
             except Exception:
-                # 跳过损坏的文件
                 continue
 
         # 排序(最新的在前)
@@ -157,140 +187,380 @@ class FileSystemTraceStore:
 
         return traces[:limit]
 
-    # ===== Step 操作 =====
+    # ===== GoalTree 操作 =====
+
+    async def get_goal_tree(self, trace_id: str) -> Optional[GoalTree]:
+        """获取 GoalTree"""
+        goal_file = self._get_goal_file(trace_id)
+        if not goal_file.exists():
+            return None
+
+        try:
+            data = json.loads(goal_file.read_text())
+            return GoalTree.from_dict(data)
+        except Exception:
+            return None
+
+    async def update_goal_tree(self, trace_id: str, tree: GoalTree) -> None:
+        """更新完整 GoalTree"""
+        goal_file = self._get_goal_file(trace_id)
+        goal_file.write_text(json.dumps(tree.to_dict(), indent=2, ensure_ascii=False))
+
+    async def add_goal(self, trace_id: str, goal: Goal) -> None:
+        """添加 Goal 到 GoalTree"""
+        tree = await self.get_goal_tree(trace_id)
+        if not tree:
+            return
+
+        tree.goals.append(goal)
+        await self.update_goal_tree(trace_id, tree)
+
+    async def update_goal(self, trace_id: str, goal_id: str, **updates) -> None:
+        """更新 Goal 字段"""
+        tree = await self.get_goal_tree(trace_id)
+        if not tree:
+            return
+
+        goal = tree.find(goal_id)
+        if not goal:
+            return
+
+        # 更新字段
+        for key, value in updates.items():
+            if hasattr(goal, key):
+                # 特殊处理 stats 字段(可能是 dict)
+                if key in ["self_stats", "cumulative_stats"] and isinstance(value, dict):
+                    value = GoalStats.from_dict(value)
+                setattr(goal, key, value)
+
+        await self.update_goal_tree(trace_id, tree)
+
+    # ===== Branch 操作 =====
+
+    async def create_branch(self, trace_id: str, branch: BranchContext) -> None:
+        """创建分支上下文"""
+        branch_dir = self._get_branch_dir(trace_id, branch.id)
+        branch_dir.mkdir(parents=True, exist_ok=True)
+
+        # 创建分支 messages 目录
+        messages_dir = self._get_branch_messages_dir(trace_id, branch.id)
+        messages_dir.mkdir(exist_ok=True)
+
+        # 写入 meta.json
+        meta_file = self._get_branch_meta_file(trace_id, branch.id)
+        meta_file.write_text(json.dumps(branch.to_dict(), indent=2, ensure_ascii=False))
+
+    async def get_branch(self, trace_id: str, branch_id: str) -> Optional[BranchContext]:
+        """获取分支元数据"""
+        meta_file = self._get_branch_meta_file(trace_id, branch_id)
+        if not meta_file.exists():
+            return None
+
+        try:
+            data = json.loads(meta_file.read_text())
+            return BranchContext.from_dict(data)
+        except Exception:
+            return None
+
+    async def get_branch_goal_tree(self, trace_id: str, branch_id: str) -> Optional[GoalTree]:
+        """获取分支的 GoalTree"""
+        goal_file = self._get_branch_goal_file(trace_id, branch_id)
+        if not goal_file.exists():
+            return None
+
+        try:
+            data = json.loads(goal_file.read_text())
+            return GoalTree.from_dict(data)
+        except Exception:
+            return None
+
+    async def update_branch_goal_tree(self, trace_id: str, branch_id: str, tree: GoalTree) -> None:
+        """更新分支的 GoalTree"""
+        goal_file = self._get_branch_goal_file(trace_id, branch_id)
+        goal_file.write_text(json.dumps(tree.to_dict(), indent=2, ensure_ascii=False))
+
+    async def update_branch(self, trace_id: str, branch_id: str, **updates) -> None:
+        """更新分支元数据"""
+        branch = await self.get_branch(trace_id, branch_id)
+        if not branch:
+            return
+
+        # 更新字段
+        for key, value in updates.items():
+            if hasattr(branch, key):
+                # 特殊处理 stats 字段
+                if key == "cumulative_stats" and isinstance(value, dict):
+                    value = GoalStats.from_dict(value)
+                setattr(branch, key, value)
+
+        # 写回文件
+        meta_file = self._get_branch_meta_file(trace_id, branch_id)
+        meta_file.write_text(json.dumps(branch.to_dict(), indent=2, ensure_ascii=False))
+
+    async def list_branches(self, trace_id: str) -> Dict[str, BranchContext]:
+        """列出所有分支元数据"""
+        branches_dir = self._get_branches_dir(trace_id)
+        if not branches_dir.exists():
+            return {}
+
+        branches = {}
+        for branch_dir in branches_dir.iterdir():
+            if not branch_dir.is_dir():
+                continue
+
+            meta_file = branch_dir / "meta.json"
+            if not meta_file.exists():
+                continue
+
+            try:
+                data = json.loads(meta_file.read_text())
+                branch = BranchContext.from_dict(data)
+                branches[branch.id] = branch
+            except Exception:
+                continue
+
+        return branches
+
+    # ===== Message 操作 =====
 
-    async def add_step(self, step: Step) -> str:
-        """添加 Step"""
-        trace_id = step.trace_id
+    async def add_message(self, message: Message) -> str:
+        """
+        添加 Message
 
-        # 1. 写入 step 文件
-        step_file = self._get_step_file(trace_id, step.step_id)
-        step_file.write_text(json.dumps(step.to_dict(view="full"), indent=2, ensure_ascii=False))
+        自动更新关联 Goal 的 stats(self_stats 和祖先的 cumulative_stats)
+        """
+        trace_id = message.trace_id
+        branch_id = message.branch_id
 
-        # 2. 更新 trace 的统计信息
+        # 1. 写入 message 文件
+        if branch_id:
+            # 分支消息
+            messages_dir = self._get_branch_messages_dir(trace_id, branch_id)
+        else:
+            # 主线消息
+            messages_dir = self._get_messages_dir(trace_id)
+
+        message_file = messages_dir / f"{message.message_id}.json"
+        message_file.write_text(json.dumps(message.to_dict(), indent=2, ensure_ascii=False))
+
+        # 2. 更新 trace 统计
         trace = await self.get_trace(trace_id)
         if trace:
-            trace.total_steps += 1
-            trace.last_sequence = max(trace.last_sequence, step.sequence)
-
-            # 累加 tokens 和 cost
-            if step.tokens:
-                trace.total_tokens += step.tokens
-            if step.cost:
-                trace.total_cost += step.cost
-            if step.duration_ms:
-                trace.total_duration_ms += step.duration_ms
-
-            # 写回 meta.json
-            meta_file = self._get_meta_file(trace_id)
-            meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False))
-
-        # 3. 更新父节点的 UI 字段
-        if step.parent_id:
-            parent = await self.get_step(step.parent_id)
-            if parent:
-                parent.has_children = True
-                parent.children_count += 1
-
-                # 写回父节点文件
-                parent_file = self._get_step_file(trace_id, step.parent_id)
-                parent_file.write_text(json.dumps(parent.to_dict(view="full"), indent=2, ensure_ascii=False))
-
-        # 4. 追加 step_added 事件(包含完整 compact 视图,用于断线续传)
-        await self.append_event(trace_id, "step_added", {
-            "step": step.to_dict(view="compact")
+            trace.total_messages += 1
+            trace.last_sequence = max(trace.last_sequence, message.sequence)
+
+            if message.tokens:
+                trace.total_tokens += message.tokens
+            if message.cost:
+                trace.total_cost += message.cost
+            if message.duration_ms:
+                trace.total_duration_ms += message.duration_ms
+
+            # 更新 Trace(不要传递 trace_id,它已经在方法参数中)
+            await self.update_trace(
+                trace_id,
+                total_messages=trace.total_messages,
+                last_sequence=trace.last_sequence,
+                total_tokens=trace.total_tokens,
+                total_cost=trace.total_cost,
+                total_duration_ms=trace.total_duration_ms
+            )
+
+        # 3. 更新 Goal stats
+        await self._update_goal_stats(trace_id, message)
+
+        # 4. 追加 message_added 事件
+        affected_goals = await self._get_affected_goals(trace_id, message)
+        await self.append_event(trace_id, "message_added", {
+            "message": message.to_dict(),
+            "affected_goals": affected_goals
         })
 
-        return step.step_id
+        return message.message_id
+
+    async def _update_goal_stats(self, trace_id: str, message: Message) -> None:
+        """更新 Goal 的 self_stats 和祖先的 cumulative_stats"""
+        # 确定使用主线还是分支的 GoalTree
+        if message.branch_id:
+            tree = await self.get_branch_goal_tree(trace_id, message.branch_id)
+        else:
+            tree = await self.get_goal_tree(trace_id)
+
+        if not tree:
+            return
+
+        # 找到关联的 Goal
+        goal = tree.find(message.goal_id)
+        if not goal:
+            return
+
+        # 更新自身 self_stats
+        goal.self_stats.message_count += 1
+        if message.tokens:
+            goal.self_stats.total_tokens += message.tokens
+        if message.cost:
+            goal.self_stats.total_cost += message.cost
+        # TODO: 更新 preview(工具调用摘要)
+
+        # 更新自身 cumulative_stats
+        goal.cumulative_stats.message_count += 1
+        if message.tokens:
+            goal.cumulative_stats.total_tokens += message.tokens
+        if message.cost:
+            goal.cumulative_stats.total_cost += message.cost
+
+        # 沿祖先链向上更新 cumulative_stats
+        current_goal = goal
+        while current_goal.parent_id:
+            parent = tree.find(current_goal.parent_id)
+            if not parent:
+                break
+
+            parent.cumulative_stats.message_count += 1
+            if message.tokens:
+                parent.cumulative_stats.total_tokens += message.tokens
+            if message.cost:
+                parent.cumulative_stats.total_cost += message.cost
+
+            current_goal = parent
+
+        # 保存更新后的 tree
+        if message.branch_id:
+            await self.update_branch_goal_tree(trace_id, message.branch_id, tree)
+        else:
+            await self.update_goal_tree(trace_id, tree)
+
+    async def _get_affected_goals(self, trace_id: str, message: Message) -> List[Dict[str, Any]]:
+        """获取受影响的 Goals(自身 + 所有祖先)"""
+        if message.branch_id:
+            tree = await self.get_branch_goal_tree(trace_id, message.branch_id)
+        else:
+            tree = await self.get_goal_tree(trace_id)
+
+        if not tree:
+            return []
+
+        goal = tree.find(message.goal_id)
+        if not goal:
+            return []
+
+        affected = []
+
+        # 添加自身(包含 self_stats 和 cumulative_stats)
+        affected.append({
+            "goal_id": goal.id,
+            "self_stats": goal.self_stats.to_dict(),
+            "cumulative_stats": goal.cumulative_stats.to_dict()
+        })
+
+        # 添加所有祖先(仅 cumulative_stats)
+        current_goal = goal
+        while current_goal.parent_id:
+            parent = tree.find(current_goal.parent_id)
+            if not parent:
+                break
+
+            affected.append({
+                "goal_id": parent.id,
+                "cumulative_stats": parent.cumulative_stats.to_dict()
+            })
+
+            current_goal = parent
+
+        return affected
 
-    async def get_step(self, step_id: str) -> Optional[Step]:
-        """获取 Step(扫描所有 trace)"""
+    async def get_message(self, message_id: str) -> Optional[Message]:
+        """获取 Message(扫描所有 trace)"""
         for trace_dir in self.base_path.iterdir():
             if not trace_dir.is_dir():
                 continue
 
-            step_file = trace_dir / "steps" / f"{step_id}.json"
-            if step_file.exists():
+            # 检查主线 messages
+            message_file = trace_dir / "messages" / f"{message_id}.json"
+            if message_file.exists():
                 try:
-                    data = json.loads(step_file.read_text())
-
-                    # 解析 datetime
+                    data = json.loads(message_file.read_text())
                     if data.get("created_at"):
                         data["created_at"] = datetime.fromisoformat(data["created_at"])
-
-                    return Step(**data)
+                    return Message(**data)
                 except Exception:
-                    continue
+                    pass
+
+            # 检查分支 messages
+            branches_dir = trace_dir / "branches"
+            if branches_dir.exists():
+                for branch_dir in branches_dir.iterdir():
+                    if not branch_dir.is_dir():
+                        continue
+                    message_file = branch_dir / "messages" / f"{message_id}.json"
+                    if message_file.exists():
+                        try:
+                            data = json.loads(message_file.read_text())
+                            if data.get("created_at"):
+                                data["created_at"] = datetime.fromisoformat(data["created_at"])
+                            return Message(**data)
+                        except Exception:
+                            pass
 
         return None
 
-    async def get_trace_steps(self, trace_id: str) -> List[Step]:
-        """获取 Trace 的所有 Steps"""
-        steps_dir = self._get_steps_dir(trace_id)
-        if not steps_dir.exists():
+    async def get_trace_messages(
+        self,
+        trace_id: str,
+        branch_id: Optional[str] = None
+    ) -> List[Message]:
+        """获取 Trace 的所有 Messages"""
+        if branch_id:
+            messages_dir = self._get_branch_messages_dir(trace_id, branch_id)
+        else:
+            messages_dir = self._get_messages_dir(trace_id)
+
+        if not messages_dir.exists():
             return []
 
-        steps = []
-        for step_file in steps_dir.glob("*.json"):
+        messages = []
+        for message_file in messages_dir.glob("*.json"):
             try:
-                data = json.loads(step_file.read_text())
-
-                # 解析 datetime
+                data = json.loads(message_file.read_text())
                 if data.get("created_at"):
                     data["created_at"] = datetime.fromisoformat(data["created_at"])
-
-                steps.append(Step(**data))
+                messages.append(Message(**data))
             except Exception:
-                # 跳过损坏的文件
                 continue
 
         # 按 sequence 排序
-        steps.sort(key=lambda s: s.sequence)
-        return steps
-
-    async def get_step_children(self, step_id: str) -> List[Step]:
-        """获取 Step 的子节点"""
-        # 需要扫描所有 trace 的所有 steps
-        # TODO: 可以优化为维护索引文件
-        children = []
-
-        for trace_dir in self.base_path.iterdir():
-            if not trace_dir.is_dir():
-                continue
+        messages.sort(key=lambda m: m.sequence)
+        return messages
 
-            steps_dir = trace_dir / "steps"
-            if not steps_dir.exists():
-                continue
-
-            for step_file in steps_dir.glob("*.json"):
-                try:
-                    data = json.loads(step_file.read_text())
-                    if data.get("parent_id") == step_id:
-                        # 解析 datetime
-                        if data.get("created_at"):
-                            data["created_at"] = datetime.fromisoformat(data["created_at"])
-                        children.append(Step(**data))
-                except Exception:
-                    continue
-
-        # 按 sequence 排序
-        children.sort(key=lambda s: s.sequence)
-        return children
-
-    async def update_step(self, step_id: str, **updates) -> None:
-        """更新 Step 字段"""
-        step = await self.get_step(step_id)
-        if not step:
+    async def get_messages_by_goal(
+        self,
+        trace_id: str,
+        goal_id: str,
+        branch_id: Optional[str] = None
+    ) -> List[Message]:
+        """获取指定 Goal 关联的所有 Messages"""
+        all_messages = await self.get_trace_messages(trace_id, branch_id)
+        return [m for m in all_messages if m.goal_id == goal_id]
+
+    async def update_message(self, message_id: str, **updates) -> None:
+        """更新 Message 字段"""
+        message = await self.get_message(message_id)
+        if not message:
             return
 
         # 更新字段
         for key, value in updates.items():
-            if hasattr(step, key):
-                setattr(step, key, value)
+            if hasattr(message, key):
+                setattr(message, key, value)
 
-        # 写回文件
-        step_file = self._get_step_file(step.trace_id, step_id)
-        step_file.write_text(json.dumps(step.to_dict(view="full"), indent=2, ensure_ascii=False))
+        # 确定文件路径
+        if message.branch_id:
+            messages_dir = self._get_branch_messages_dir(message.trace_id, message.branch_id)
+        else:
+            messages_dir = self._get_messages_dir(message.trace_id)
+
+        message_file = messages_dir / f"{message_id}.json"
+        message_file.write_text(json.dumps(message.to_dict(), indent=2, ensure_ascii=False))
 
     # ===== 事件流操作(用于 WebSocket 断线续传)=====
 

+ 151 - 132
agent/execution/models.py

@@ -1,8 +1,8 @@
 """
-Trace 和 Step 数据模型
+Trace 和 Message 数据模型
 
 Trace: 一次完整的 LLM 交互(单次调用或 Agent 任务)
-Step: Trace 中的一个原子操作,形成树结构
+Message: Trace 中的 LLM 消息,对应 LLM API 格式
 """
 
 from dataclasses import dataclass, field
@@ -11,44 +11,13 @@ from typing import Dict, Any, List, Optional, Literal
 import uuid
 
 
-# Step 类型
-StepType = Literal[
-    # 计划相关
-    "goal",        # 目标/计划项(可以有子 steps)
-
-    # LLM 输出
-    "thought",     # 思考/分析(中间过程)
-    "evaluation",  # 评估总结(需要 summary)
-    "response",    # 最终回复
-
-    # 工具相关(数据结构上分开以保留描述能力,可视化时候可能合并)
-    "action",      # 工具调用(tool_call)
-    "result",      # 工具结果(tool_result)
-
-    # 系统相关
-    "memory_read",   # 读取记忆(经验/技能)
-    "memory_write",  # 写入记忆
-]
-
-
-# Step 状态
-StepStatus = Literal[
-    "planned",           # 计划中(未执行)
-    "in_progress",       # 执行中
-    "awaiting_approval", # 等待用户确认
-    "completed",         # 已完成
-    "failed",            # 失败
-    "skipped",           # 跳过
-]
-
-
 @dataclass
 class Trace:
     """
     执行轨迹 - 一次完整的 LLM 交互
 
-    单次调用: mode="call", 只有 1 个 Step
-    Agent 模式: mode="agent", 多个 Steps 形成树结构
+    单次调用: mode="call"
+    Agent 模式: mode="agent"
     """
     trace_id: str
     mode: Literal["call", "agent"]
@@ -64,20 +33,20 @@ class Trace:
     status: Literal["running", "completed", "failed"] = "running"
 
     # 统计
-    total_steps: int = 0
+    total_messages: int = 0      # 消息总数(改名自 total_steps)
     total_tokens: int = 0
     total_cost: float = 0.0
     total_duration_ms: int = 0  # 总耗时(毫秒)
 
     # 进度追踪(head)
-    last_sequence: int = 0      # 最新 step 的 sequence
+    last_sequence: int = 0      # 最新 message 的 sequence
     last_event_id: int = 0      # 最新事件 ID(用于 WS 续传)
 
     # 上下文
     uid: Optional[str] = None
     context: Dict[str, Any] = field(default_factory=dict)
 
-    # 当前焦点 goal(用于 step 工具)
+    # 当前焦点 goal
     current_goal_id: Optional[str] = None
 
     # 时间
@@ -106,7 +75,7 @@ class Trace:
             "task": self.task,
             "agent_type": self.agent_type,
             "status": self.status,
-            "total_steps": self.total_steps,
+            "total_messages": self.total_messages,
             "total_tokens": self.total_tokens,
             "total_cost": self.total_cost,
             "total_duration_ms": self.total_duration_ms,
@@ -121,61 +90,167 @@ class Trace:
 
 
 @dataclass
-class Step:
+class Message:
     """
-    执行步骤 - Trace 中的一个原子操作
+    执行消息 - Trace 中的 LLM 消息
+
+    对应 LLM API 消息格式(assistant/tool),通过 goal_id 和 branch_id 关联 Goal。
+
+    description 字段自动生成规则:
+    - assistant: 优先取 content,若无 content 则生成 "tool call: XX, XX"
+    - tool: 使用 tool name
+    """
+    message_id: str
+    trace_id: str
+    role: Literal["assistant", "tool"]   # 和 LLM API 一致
+    sequence: int                        # 全局顺序
+    goal_id: str                         # 关联的 Goal 内部 ID
+    description: str = ""                # 消息描述(系统自动生成)
+    branch_id: Optional[str] = None      # 所属分支(null=主线, "A"/"B"=分支)
+    tool_call_id: Optional[str] = None   # tool 消息关联对应的 tool_call
+    content: Any = None                  # 消息内容(和 LLM API 格式一致)
+
+    # 元数据
+    tokens: Optional[int] = None
+    cost: Optional[float] = None
+    duration_ms: Optional[int] = None
+    created_at: datetime = field(default_factory=datetime.now)
+
+    @classmethod
+    def create(
+        cls,
+        trace_id: str,
+        role: Literal["assistant", "tool"],
+        sequence: int,
+        goal_id: str,
+        content: Any = None,
+        branch_id: Optional[str] = None,
+        tool_call_id: Optional[str] = None,
+        tokens: Optional[int] = None,
+        cost: Optional[float] = None,
+        duration_ms: Optional[int] = None,
+    ) -> "Message":
+        """创建新的 Message,自动生成 description"""
+        description = cls._generate_description(role, content)
+
+        return cls(
+            message_id=str(uuid.uuid4()),
+            trace_id=trace_id,
+            role=role,
+            sequence=sequence,
+            goal_id=goal_id,
+            content=content,
+            description=description,
+            branch_id=branch_id,
+            tool_call_id=tool_call_id,
+            tokens=tokens,
+            cost=cost,
+            duration_ms=duration_ms,
+        )
+
+    @staticmethod
+    def _generate_description(role: str, content: Any) -> str:
+        """
+        自动生成 description
+
+        - assistant: 优先取 content,若无 content 则生成 "tool call: XX, XX"
+        - tool: 使用 tool name
+        """
+        if role == "assistant":
+            # assistant 消息:content 是字典,可能包含 text 和 tool_calls
+            if isinstance(content, dict):
+                # 优先返回文本内容
+                if content.get("text"):
+                    text = content["text"]
+                    # 截断过长的文本
+                    return text[:200] + "..." if len(text) > 200 else text
+
+                # 如果没有文本,检查 tool_calls
+                if content.get("tool_calls"):
+                    tool_calls = content["tool_calls"]
+                    if isinstance(tool_calls, list):
+                        tool_names = []
+                        for tc in tool_calls:
+                            if isinstance(tc, dict) and tc.get("function", {}).get("name"):
+                                tool_names.append(tc["function"]["name"])
+                        if tool_names:
+                            return f"tool call: {', '.join(tool_names)}"
+
+            # 如果 content 是字符串
+            if isinstance(content, str):
+                return content[:200] + "..." if len(content) > 200 else content
+
+            return "assistant message"
+
+        elif role == "tool":
+            # tool 消息:从 content 中提取 tool name
+            if isinstance(content, dict):
+                if content.get("tool_name"):
+                    return content["tool_name"]
+
+            # 如果是字符串,尝试解析
+            if isinstance(content, str):
+                return content[:100] + "..." if len(content) > 100 else content
+
+            return "tool result"
+
+        return ""
+
+    def to_dict(self) -> Dict[str, Any]:
+        """转换为字典"""
+        return {
+            "message_id": self.message_id,
+            "trace_id": self.trace_id,
+            "branch_id": self.branch_id,
+            "role": self.role,
+            "sequence": self.sequence,
+            "goal_id": self.goal_id,
+            "tool_call_id": self.tool_call_id,
+            "content": self.content,
+            "description": self.description,
+            "tokens": self.tokens,
+            "cost": self.cost,
+            "duration_ms": self.duration_ms,
+            "created_at": self.created_at.isoformat() if self.created_at else None,
+        }
+
 
-    Step 之间通过 parent_id 形成树结构(单父节点)
+# ===== 已弃用:Step 模型(保留用于向后兼容)=====
 
-    ## 字段设计规则
+# Step 类型
+StepType = Literal[
+    "goal", "thought", "evaluation", "response",
+    "action", "result", "memory_read", "memory_write",
+]
 
-    **顶层字段**(Step 类属性):
-    - 所有(或大部分)step 都有的字段
-    - 需要筛选/排序/索引的字段(如 tokens, cost, duration_ms)
-    - 结构化、类型明确的字段
+# Step 状态
+StepStatus = Literal[
+    "planned", "in_progress", "awaiting_approval",
+    "completed", "failed", "skipped",
+]
 
-    **data 字段**(Dict):
-    - step_type 特定的字段(不同类型有不同 schema)
-    - 详细的业务数据(如 messages, content, arguments, output)
-    - 可能很大的字段
-    - 半结构化、动态的字段
 
-    ## data 字段 schema(按 step_type)
+@dataclass
+class Step:
+    """
+    [已弃用] 执行步骤 - 使用 Message 模型替代
 
-    - thought/response: model, messages, content, tool_calls
-    - action: tool_name, arguments
-    - result: tool_name, output, error
-    - memory_read: experiences_count, skills_count
-    - goal: 自定义(根据具体目标)
+    保留用于向后兼容
     """
     step_id: str
     trace_id: str
     step_type: StepType
     status: StepStatus
-    sequence: int  # 在 Trace 中的顺序
-
-    # 树结构(单父节点)
+    sequence: int
     parent_id: Optional[str] = None
-
-    # 内容
-    description: str = ""  # 所有节点都有,系统自动提取
-
-    # 类型相关数据
+    description: str = ""
     data: Dict[str, Any] = field(default_factory=dict)
-
-    # 仅 evaluation 类型需要
     summary: Optional[str] = None
-
-    # UI 优化字段
-    has_children: bool = False      # 是否有子节点
-    children_count: int = 0         # 子节点数量
-
-    # 执行指标
+    has_children: bool = False
+    children_count: int = 0
     duration_ms: Optional[int] = None
     tokens: Optional[int] = None
     cost: Optional[float] = None
-
-    # 时间
     created_at: datetime = field(default_factory=datetime.now)
 
     @classmethod
@@ -236,67 +311,11 @@ class Step:
 
         # 处理 data 字段
         if view == "compact":
-            # compact 模式:移除 data 中的大字段
             data_copy = self.data.copy()
-            # 移除可能的大字段(如 output, content 等)
             for key in ["output", "content", "full_output", "full_content"]:
                 data_copy.pop(key, None)
             result["data"] = data_copy
         else:
-            # full 模式:返回完整 data
             result["data"] = self.data
 
         return result
-
-
-# Step.data 结构说明
-#
-# goal:
-#   {
-#       "description": "探索代码库",
-#   }
-#
-# thought:
-#   {
-#       "content": "需要先了解项目结构...",
-#   }
-#
-# action:
-#   {
-#       "tool_name": "glob_files",
-#       "arguments": {"pattern": "**/*.py"},
-#   }
-#
-# result:
-#   {
-#       "tool_name": "glob_files",
-#       "output": ["src/main.py", ...],
-#       "title": "找到 15 个文件",
-#   }
-#
-# evaluation:
-#   {
-#       "content": "分析完成...",
-#   }
-#   # summary 字段存储简短总结
-#
-# response:
-#   {
-#       "content": "任务已完成...",
-#       "is_final": True,
-#   }
-#
-# memory_read:
-#   {
-#       "skills": [...],
-#       "experiences": [...],
-#       "skills_count": 3,
-#       "experiences_count": 5
-#   }
-#
-# memory_write:
-#   {
-#       "experience_id": "...",
-#       "condition": "...",
-#       "rule": "..."
-#   }

+ 165 - 17
agent/execution/protocols.py

@@ -6,12 +6,13 @@ Trace Storage Protocol - Trace 存储接口定义
 
 from typing import Protocol, List, Optional, Dict, Any, runtime_checkable
 
-from agent.execution.models import Trace, Step
+from agent.execution.models import Trace, Message
+from agent.goal.models import GoalTree, Goal, BranchContext
 
 
 @runtime_checkable
 class TraceStore(Protocol):
-    """Trace + Step 存储接口"""
+    """Trace + Message + GoalTree + Branch 存储接口"""
 
     # ===== Trace 操作 =====
 
@@ -52,38 +53,185 @@ class TraceStore(Protocol):
         """列出 Traces"""
         ...
 
-    # ===== Step 操作 =====
+    # ===== GoalTree 操作 =====
 
-    async def add_step(self, step: Step) -> str:
+    async def get_goal_tree(self, trace_id: str) -> Optional[GoalTree]:
         """
-        添加 Step
+        获取 GoalTree
 
         Args:
-            step: Step 对象
+            trace_id: Trace ID
 
         Returns:
-            step_id
+            GoalTree 对象,如果不存在返回 None
+        """
+        ...
+
+    async def update_goal_tree(self, trace_id: str, tree: GoalTree) -> None:
+        """
+        更新完整 GoalTree
+
+        Args:
+            trace_id: Trace ID
+            tree: GoalTree 对象
+        """
+        ...
+
+    async def add_goal(self, trace_id: str, goal: Goal) -> None:
+        """
+        添加 Goal 到 GoalTree
+
+        Args:
+            trace_id: Trace ID
+            goal: Goal 对象
+        """
+        ...
+
+    async def update_goal(self, trace_id: str, goal_id: str, **updates) -> None:
+        """
+        更新 Goal 字段
+
+        Args:
+            trace_id: Trace ID
+            goal_id: Goal ID
+            **updates: 要更新的字段(如 status, summary, self_stats, cumulative_stats)
+        """
+        ...
+
+    # ===== Branch 操作 =====
+
+    async def create_branch(self, trace_id: str, branch: BranchContext) -> None:
+        """
+        创建分支上下文
+
+        Args:
+            trace_id: Trace ID
+            branch: BranchContext 对象
         """
         ...
 
-    async def get_step(self, step_id: str) -> Optional[Step]:
-        """获取 Step"""
+    async def get_branch(self, trace_id: str, branch_id: str) -> Optional[BranchContext]:
+        """
+        获取分支元数据
+
+        Args:
+            trace_id: Trace ID
+            branch_id: 分支 ID
+
+        Returns:
+            BranchContext 对象(不含分支内 GoalTree)
+        """
         ...
 
-    async def get_trace_steps(self, trace_id: str) -> List[Step]:
-        """获取 Trace 的所有 Steps(按 sequence 排序)"""
+    async def get_branch_goal_tree(self, trace_id: str, branch_id: str) -> Optional[GoalTree]:
+        """
+        获取分支的 GoalTree
+
+        Args:
+            trace_id: Trace ID
+            branch_id: 分支 ID
+
+        Returns:
+            分支的 GoalTree 对象
+        """
         ...
 
-    async def get_step_children(self, step_id: str) -> List[Step]:
-        """获取 Step 的子节点"""
+    async def update_branch_goal_tree(self, trace_id: str, branch_id: str, tree: GoalTree) -> None:
+        """
+        更新分支的 GoalTree
+
+        Args:
+            trace_id: Trace ID
+            branch_id: 分支 ID
+            tree: GoalTree 对象
+        """
+        ...
+
+    async def update_branch(self, trace_id: str, branch_id: str, **updates) -> None:
+        """
+        更新分支元数据
+
+        Args:
+            trace_id: Trace ID
+            branch_id: 分支 ID
+            **updates: 要更新的字段(如 status, summary, cumulative_stats)
+        """
+        ...
+
+    async def list_branches(self, trace_id: str) -> Dict[str, BranchContext]:
+        """
+        列出所有分支元数据
+
+        Args:
+            trace_id: Trace ID
+
+        Returns:
+            Dict[branch_id, BranchContext]
+        """
+        ...
+
+    # ===== Message 操作 =====
+
+    async def add_message(self, message: Message) -> str:
+        """
+        添加 Message
+
+        自动更新关联 Goal 的 stats(self_stats 和祖先的 cumulative_stats)
+
+        Args:
+            message: Message 对象
+
+        Returns:
+            message_id
+        """
+        ...
+
+    async def get_message(self, message_id: str) -> Optional[Message]:
+        """获取 Message"""
+        ...
+
+    async def get_trace_messages(
+        self,
+        trace_id: str,
+        branch_id: Optional[str] = None
+    ) -> List[Message]:
+        """
+        获取 Trace 的所有 Messages(按 sequence 排序)
+
+        Args:
+            trace_id: Trace ID
+            branch_id: 可选,过滤指定分支的消息
+
+        Returns:
+            Message 列表
+        """
+        ...
+
+    async def get_messages_by_goal(
+        self,
+        trace_id: str,
+        goal_id: str,
+        branch_id: Optional[str] = None
+    ) -> List[Message]:
+        """
+        获取指定 Goal 关联的所有 Messages
+
+        Args:
+            trace_id: Trace ID
+            goal_id: Goal ID
+            branch_id: 可选,指定分支
+
+        Returns:
+            Message 列表
+        """
         ...
 
-    async def update_step(self, step_id: str, **updates) -> None:
+    async def update_message(self, message_id: str, **updates) -> None:
         """
-        更新 Step 字段(用于状态变更、错误记录等)
+        更新 Message 字段(用于状态变更、错误记录等)
 
         Args:
-            step_id: Step ID
+            message_id: Message ID
             **updates: 要更新的字段
         """
         ...
@@ -118,7 +266,7 @@ class TraceStore(Protocol):
 
         Args:
             trace_id: Trace ID
-            event_type: 事件类型(step_added/step_updated/trace_completed)
+            event_type: 事件类型
             payload: 事件数据
 
         Returns:

+ 204 - 54
agent/execution/websocket.py

@@ -1,7 +1,7 @@
 """
-Step 树 WebSocket 推送
+Trace WebSocket 推送
 
-实时推送进行中 Trace 的 Step 更新,支持断线续传
+实时推送进行中 Trace 的更新,支持断线续传
 """
 
 from typing import Dict, Set, Any
@@ -44,7 +44,18 @@ async def watch_trace(
     since_event_id: int = Query(0, description="从哪个事件 ID 开始(0=补发所有历史)")
 ):
     """
-    监听 Trace 的 Step 更新,支持断线续传
+    监听 Trace 的更新,支持断线续传
+
+    事件类型:
+    - connected: 连接成功,返回 goal_tree 和 branches
+    - goal_added: 新增 Goal
+    - goal_updated: Goal 状态变化(含级联完成)
+    - message_added: 新 Message(含 affected_goals)
+    - branch_started: 分支开始探索
+    - branch_goal_added: 分支内新增 Goal
+    - branch_completed: 分支完成
+    - explore_completed: 所有分支完成
+    - trace_completed: 执行完成
 
     Args:
         trace_id: Trace ID
@@ -71,11 +82,17 @@ async def watch_trace(
     _active_connections[trace_id].add(websocket)
 
     try:
-        # 发送连接成功消息 + 当前 event_id
+        # 获取 GoalTree 和分支元数据
+        goal_tree = await store.get_goal_tree(trace_id)
+        branches = await store.list_branches(trace_id)
+
+        # 发送连接成功消息 + 完整状态
         await websocket.send_json({
             "event": "connected",
             "trace_id": trace_id,
-            "current_event_id": trace.last_event_id
+            "current_event_id": trace.last_event_id,
+            "goal_tree": goal_tree.to_dict() if goal_tree else None,
+            "branches": {b_id: b.to_dict() for b_id, b in branches.items()}
         })
 
         # 补发历史事件(since_event_id=0 表示补发所有历史)
@@ -85,7 +102,7 @@ async def watch_trace(
             if len(missed_events) > 100:
                 await websocket.send_json({
                     "event": "error",
-                    "message": f"Too many missed events ({len(missed_events)}), please reload full tree via REST API"
+                    "message": f"Too many missed events ({len(missed_events)}), please reload via REST API"
                 })
             else:
                 for evt in missed_events:
@@ -96,7 +113,6 @@ async def watch_trace(
             try:
                 # 接收客户端消息(心跳检测)
                 data = await websocket.receive_text()
-                # 可以处理客户端请求(如请求完整状态)
                 if data == "ping":
                     await websocket.send_json({"event": "pong"})
             except WebSocketDisconnect:
@@ -110,99 +126,215 @@ async def watch_trace(
                 del _active_connections[trace_id]
 
 
-# ===== 广播函数(由 AgentRunner 调用)=====
+# ===== 广播函数(由 AgentRunner 或 TraceStore 调用)=====
 
 
-async def broadcast_step_added(trace_id: str, step_dict: Dict):
+async def broadcast_goal_added(trace_id: str, goal_dict: Dict[str, Any]):
     """
-    广播 Step 添加事件(自动分配 event_id)
+    广播 Goal 添加事件
 
     Args:
         trace_id: Trace ID
-        step_dict: Step 字典(from step.to_dict(view="compact")
+        goal_dict: Goal 字典(完整数据,含 stats
     """
     if trace_id not in _active_connections:
         return
 
-    # 从 store 获取最新 event_id(已由 add_step 自动追加)
     store = get_trace_store()
-    trace = await store.get_trace(trace_id)
-    if not trace:
+    event_id = await store.append_event(trace_id, "goal_added", {
+        "goal": goal_dict
+    })
+
+    message = {
+        "event": "goal_added",
+        "event_id": event_id,
+        "ts": datetime.now().isoformat(),
+        "goal": goal_dict
+    }
+
+    await _broadcast_to_trace(trace_id, message)
+
+
+async def broadcast_goal_updated(
+    trace_id: str,
+    goal_id: str,
+    updates: Dict[str, Any],
+    affected_goals: list[Dict[str, Any]] = None
+):
+    """
+    广播 Goal 更新事件(patch 语义)
+
+    Args:
+        trace_id: Trace ID
+        goal_id: Goal ID
+        updates: 更新字段(patch 格式)
+        affected_goals: 受影响的 Goals(含级联完成的父节点)
+    """
+    if trace_id not in _active_connections:
         return
 
+    store = get_trace_store()
+    event_id = await store.append_event(trace_id, "goal_updated", {
+        "goal_id": goal_id,
+        "updates": updates,
+        "affected_goals": affected_goals or []
+    })
+
     message = {
-        "event": "step_added",
-        "event_id": trace.last_event_id,
+        "event": "goal_updated",
+        "event_id": event_id,
         "ts": datetime.now().isoformat(),
-        "step": step_dict  # compact 视图
+        "goal_id": goal_id,
+        "patch": updates,
+        "affected_goals": affected_goals or []
     }
 
-    # 发送给所有监听该 Trace 的客户端
-    disconnected = []
-    for websocket in _active_connections[trace_id]:
-        try:
-            await websocket.send_json(message)
-        except Exception:
-            disconnected.append(websocket)
+    await _broadcast_to_trace(trace_id, message)
 
-    # 清理断开的连接
-    for ws in disconnected:
-        _active_connections[trace_id].discard(ws)
 
+async def broadcast_branch_started(trace_id: str, branch_dict: Dict[str, Any]):
+    """
+    广播分支开始事件
+
+    Args:
+        trace_id: Trace ID
+        branch_dict: BranchContext 字典
+    """
+    if trace_id not in _active_connections:
+        return
+
+    store = get_trace_store()
+    event_id = await store.append_event(trace_id, "branch_started", {
+        "branch": branch_dict
+    })
+
+    message = {
+        "event": "branch_started",
+        "event_id": event_id,
+        "ts": datetime.now().isoformat(),
+        "branch": branch_dict
+    }
+
+    await _broadcast_to_trace(trace_id, message)
 
-async def broadcast_step_updated(trace_id: str, step_id: str, updates: Dict):
+
+async def broadcast_branch_goal_added(
+    trace_id: str,
+    branch_id: str,
+    goal_dict: Dict[str, Any]
+):
     """
-    广播 Step 更新事件(patch 语义)
+    广播分支内新增 Goal
 
     Args:
         trace_id: Trace ID
-        step_id: Step ID
-        updates: 更新字段(patch 格式)
+        branch_id: 分支 ID
+        goal_dict: Goal 字典
     """
     if trace_id not in _active_connections:
         return
 
     store = get_trace_store()
+    event_id = await store.append_event(trace_id, "branch_goal_added", {
+        "branch_id": branch_id,
+        "goal": goal_dict
+    })
+
+    message = {
+        "event": "branch_goal_added",
+        "event_id": event_id,
+        "ts": datetime.now().isoformat(),
+        "branch_id": branch_id,
+        "goal": goal_dict
+    }
+
+    await _broadcast_to_trace(trace_id, message)
 
-    # 追加事件到 store
-    event_id = await store.append_event(trace_id, "step_updated", {
-        "step_id": step_id,
-        "updates": updates
+
+async def broadcast_branch_completed(
+    trace_id: str,
+    branch_id: str,
+    summary: str,
+    cumulative_stats: Dict[str, Any]
+):
+    """
+    广播分支完成事件
+
+    Args:
+        trace_id: Trace ID
+        branch_id: 分支 ID
+        summary: 分支总结
+        cumulative_stats: 分支累计统计
+    """
+    if trace_id not in _active_connections:
+        return
+
+    store = get_trace_store()
+    event_id = await store.append_event(trace_id, "branch_completed", {
+        "branch_id": branch_id,
+        "summary": summary,
+        "cumulative_stats": cumulative_stats
     })
 
     message = {
-        "event": "step_updated",
+        "event": "branch_completed",
         "event_id": event_id,
         "ts": datetime.now().isoformat(),
-        "step_id": step_id,
-        "patch": updates  # JSON Patch 风格:{"status": "completed", "duration_ms": 123}
+        "branch_id": branch_id,
+        "summary": summary,
+        "cumulative_stats": cumulative_stats
     }
 
-    disconnected = []
-    for websocket in _active_connections[trace_id]:
-        try:
-            await websocket.send_json(message)
-        except Exception:
-            disconnected.append(websocket)
+    await _broadcast_to_trace(trace_id, message)
 
-    for ws in disconnected:
-        _active_connections[trace_id].discard(ws)
 
+async def broadcast_explore_completed(
+    trace_id: str,
+    explore_start_id: str,
+    merge_summary: str
+):
+    """
+    广播探索完成事件
+
+    Args:
+        trace_id: Trace ID
+        explore_start_id: explore_start Goal ID
+        merge_summary: 汇总结果
+    """
+    if trace_id not in _active_connections:
+        return
+
+    store = get_trace_store()
+    event_id = await store.append_event(trace_id, "explore_completed", {
+        "explore_start_id": explore_start_id,
+        "merge_summary": merge_summary
+    })
+
+    message = {
+        "event": "explore_completed",
+        "event_id": event_id,
+        "ts": datetime.now().isoformat(),
+        "explore_start_id": explore_start_id,
+        "merge_summary": merge_summary
+    }
+
+    await _broadcast_to_trace(trace_id, message)
 
-async def broadcast_trace_completed(trace_id: str, total_steps: int):
+
+async def broadcast_trace_completed(trace_id: str, total_messages: int):
     """
     广播 Trace 完成事件
 
     Args:
         trace_id: Trace ID
-        total_steps: 总 Step 数
+        total_messages: 总 Message
     """
     if trace_id not in _active_connections:
         return
 
     store = get_trace_store()
     event_id = await store.append_event(trace_id, "trace_completed", {
-        "total_steps": total_steps
+        "total_messages": total_messages
     })
 
     message = {
@@ -210,9 +342,30 @@ async def broadcast_trace_completed(trace_id: str, total_steps: int):
         "event_id": event_id,
         "ts": datetime.now().isoformat(),
         "trace_id": trace_id,
-        "total_steps": total_steps
+        "total_messages": total_messages
     }
 
+    await _broadcast_to_trace(trace_id, message)
+
+    # 完成后清理所有连接
+    if trace_id in _active_connections:
+        del _active_connections[trace_id]
+
+
+# ===== 内部辅助函数 =====
+
+
+async def _broadcast_to_trace(trace_id: str, message: Dict[str, Any]):
+    """
+    向指定 Trace 的所有连接广播消息
+
+    Args:
+        trace_id: Trace ID
+        message: 消息内容
+    """
+    if trace_id not in _active_connections:
+        return
+
     disconnected = []
     for websocket in _active_connections[trace_id]:
         try:
@@ -220,9 +373,6 @@ async def broadcast_trace_completed(trace_id: str, total_steps: int):
         except Exception:
             disconnected.append(websocket)
 
+    # 清理断开的连接
     for ws in disconnected:
         _active_connections[trace_id].discard(ws)
-
-    # 完成后清理所有连接
-    if trace_id in _active_connections:
-        del _active_connections[trace_id]

+ 13 - 12
agent/goal/__init__.py

@@ -4,26 +4,27 @@ Goal 模块 - 执行计划管理
 提供 Goal 和 GoalTree 数据模型,以及 goal 工具。
 """
 
-from agent.goal.models import Goal, GoalTree, GoalStatus
-from agent.goal.tool import goal_tool, create_goal_tool_schema
-from agent.goal.compaction import (
-    compress_messages_for_goal,
-    compress_all_completed,
-    get_messages_for_goal,
-    should_compress,
+from agent.goal.models import (
+    Goal,
+    GoalTree,
+    GoalStatus,
+    GoalType,
+    GoalStats,
+    BranchContext,
+    BranchStatus,
 )
+from agent.goal.tool import goal_tool, create_goal_tool_schema
 
 __all__ = [
     # Models
     "Goal",
     "GoalTree",
     "GoalStatus",
+    "GoalType",
+    "GoalStats",
+    "BranchContext",
+    "BranchStatus",
     # Tool
     "goal_tool",
     "create_goal_tool_schema",
-    # Compaction
-    "compress_messages_for_goal",
-    "compress_all_completed",
-    "get_messages_for_goal",
-    "should_compress",
 ]

+ 220 - 46
agent/goal/models.py

@@ -3,6 +3,8 @@ Goal 数据模型
 
 Goal: 执行计划中的目标节点
 GoalTree: 目标树,管理整个执行计划
+GoalStats: 目标统计信息
+BranchContext: 分支执行上下文
 """
 
 from dataclasses import dataclass, field
@@ -14,19 +16,67 @@ import json
 # Goal 状态
 GoalStatus = Literal["pending", "in_progress", "completed", "abandoned"]
 
+# Goal 类型
+GoalType = Literal["normal", "explore_start", "explore_merge"]
+
+# Branch 状态
+BranchStatus = Literal["exploring", "completed", "abandoned"]
+
+
+@dataclass
+class GoalStats:
+    """目标统计信息"""
+    message_count: int = 0               # 消息数量
+    total_tokens: int = 0                # Token 总数
+    total_cost: float = 0.0              # 总成本
+    preview: Optional[str] = None        # 工具调用摘要,如 "read_file → edit_file → bash"
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "message_count": self.message_count,
+            "total_tokens": self.total_tokens,
+            "total_cost": self.total_cost,
+            "preview": self.preview,
+        }
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "GoalStats":
+        return cls(
+            message_count=data.get("message_count", 0),
+            total_tokens=data.get("total_tokens", 0),
+            total_cost=data.get("total_cost", 0.0),
+            preview=data.get("preview"),
+        )
+
 
 @dataclass
 class Goal:
     """
     执行目标
 
-    通过 children 形成层级结构。
+    使用扁平列表 + parent_id 构建层级结构。
     """
-    id: str                                  # 自动生成: "1", "1.1", "2"
+    id: str                                  # 内部唯一 ID,纯自增("1", "2", "3"...)
     description: str                         # 目标描述
+    reason: str = ""                         # 创建理由(为什么做)
+    parent_id: Optional[str] = None          # 父 Goal ID(层级关系)
+    branch_id: Optional[str] = None          # 所属分支 ID(分支关系,null=主线)
+    type: GoalType = "normal"                # Goal 类型
     status: GoalStatus = "pending"           # 状态
     summary: Optional[str] = None            # 完成/放弃时的总结
-    children: List["Goal"] = field(default_factory=list)
+
+    # explore_start 特有
+    branch_ids: Optional[List[str]] = None   # 关联的分支 ID 列表
+
+    # explore_merge 特有
+    explore_start_id: Optional[str] = None   # 关联的 explore_start Goal
+    merge_summary: Optional[str] = None      # 各分支汇总结果
+    selected_branch: Optional[str] = None    # 选中的分支(可选)
+
+    # 统计(后端维护,用于可视化边的数据)
+    self_stats: GoalStats = field(default_factory=GoalStats)          # 自身统计(仅直接关联的 messages)
+    cumulative_stats: GoalStats = field(default_factory=GoalStats)    # 累计统计(自身 + 所有后代)
+
     created_at: datetime = field(default_factory=datetime.now)
 
     def to_dict(self) -> Dict[str, Any]:
@@ -34,26 +84,95 @@ class Goal:
         return {
             "id": self.id,
             "description": self.description,
+            "reason": self.reason,
+            "parent_id": self.parent_id,
+            "branch_id": self.branch_id,
+            "type": self.type,
             "status": self.status,
             "summary": self.summary,
-            "children": [c.to_dict() for c in self.children],
+            "branch_ids": self.branch_ids,
+            "explore_start_id": self.explore_start_id,
+            "merge_summary": self.merge_summary,
+            "selected_branch": self.selected_branch,
+            "self_stats": self.self_stats.to_dict(),
+            "cumulative_stats": self.cumulative_stats.to_dict(),
             "created_at": self.created_at.isoformat() if self.created_at else None,
         }
 
     @classmethod
     def from_dict(cls, data: Dict[str, Any]) -> "Goal":
         """从字典创建"""
-        children = [cls.from_dict(c) for c in data.get("children", [])]
         created_at = data.get("created_at")
         if isinstance(created_at, str):
             created_at = datetime.fromisoformat(created_at)
 
+        self_stats = data.get("self_stats", {})
+        if isinstance(self_stats, dict):
+            self_stats = GoalStats.from_dict(self_stats)
+
+        cumulative_stats = data.get("cumulative_stats", {})
+        if isinstance(cumulative_stats, dict):
+            cumulative_stats = GoalStats.from_dict(cumulative_stats)
+
         return cls(
             id=data["id"],
             description=data["description"],
+            reason=data.get("reason", ""),
+            parent_id=data.get("parent_id"),
+            branch_id=data.get("branch_id"),
+            type=data.get("type", "normal"),
             status=data.get("status", "pending"),
             summary=data.get("summary"),
-            children=children,
+            branch_ids=data.get("branch_ids"),
+            explore_start_id=data.get("explore_start_id"),
+            merge_summary=data.get("merge_summary"),
+            selected_branch=data.get("selected_branch"),
+            self_stats=self_stats,
+            cumulative_stats=cumulative_stats,
+            created_at=created_at or datetime.now(),
+        )
+
+
+@dataclass
+class BranchContext:
+    """分支执行上下文(独立的 sub-agent 环境)"""
+    id: str                          # 分支 ID,如 "A", "B"
+    explore_start_id: str            # 关联的 explore_start Goal ID
+    description: str                 # 探索方向描述
+    status: BranchStatus             # exploring | completed | abandoned
+
+    summary: Optional[str] = None           # 完成时的总结
+    cumulative_stats: GoalStats = field(default_factory=GoalStats)  # 累计统计
+    created_at: datetime = field(default_factory=datetime.now)
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "id": self.id,
+            "explore_start_id": self.explore_start_id,
+            "description": self.description,
+            "status": self.status,
+            "summary": self.summary,
+            "cumulative_stats": self.cumulative_stats.to_dict(),
+            "created_at": self.created_at.isoformat() if self.created_at else None,
+        }
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "BranchContext":
+        created_at = data.get("created_at")
+        if isinstance(created_at, str):
+            created_at = datetime.fromisoformat(created_at)
+
+        cumulative_stats = data.get("cumulative_stats", {})
+        if isinstance(cumulative_stats, dict):
+            cumulative_stats = GoalStats.from_dict(cumulative_stats)
+
+        return cls(
+            id=data["id"],
+            explore_start_id=data["explore_start_id"],
+            description=data["description"],
+            status=data.get("status", "exploring"),
+            summary=data.get("summary"),
+            cumulative_stats=cumulative_stats,
             created_at=created_at or datetime.now(),
         )
 
@@ -62,35 +181,32 @@ class Goal:
 class GoalTree:
     """
     目标树 - 管理整个执行计划
+
+    使用扁平列表 + parent_id 构建层级结构
     """
     mission: str                             # 总任务描述
-    goals: List[Goal] = field(default_factory=list)
+    goals: List[Goal] = field(default_factory=list)  # 扁平列表(通过 parent_id 构建层级)
     current_id: Optional[str] = None         # 当前焦点 goal ID
+    _next_id: int = 1                        # 内部 ID 计数器(私有字段)
     created_at: datetime = field(default_factory=datetime.now)
 
     def find(self, goal_id: str) -> Optional[Goal]:
         """按 ID 查找 Goal"""
-        def search(goals: List[Goal]) -> Optional[Goal]:
-            for goal in goals:
-                if goal.id == goal_id:
-                    return goal
-                found = search(goal.children)
-                if found:
-                    return found
-            return None
-        return search(self.goals)
+        for goal in self.goals:
+            if goal.id == goal_id:
+                return goal
+        return None
 
     def find_parent(self, goal_id: str) -> Optional[Goal]:
         """查找指定 Goal 的父节点"""
-        def search(goals: List[Goal], parent: Optional[Goal] = None) -> Optional[Goal]:
-            for goal in goals:
-                if goal.id == goal_id:
-                    return parent
-                found = search(goal.children, goal)
-                if found is not None:
-                    return found
+        goal = self.find(goal_id)
+        if not goal or not goal.parent_id:
             return None
-        return search(self.goals, None)
+        return self.find(goal.parent_id)
+
+    def get_children(self, parent_id: Optional[str]) -> List[Goal]:
+        """获取指定父节点的所有子节点"""
+        return [g for g in self.goals if g.parent_id == parent_id]
 
     def get_current(self) -> Optional[Goal]:
         """获取当前焦点 Goal"""
@@ -98,35 +214,64 @@ class GoalTree:
             return self.find(self.current_id)
         return None
 
-    def _generate_id(self, parent_id: Optional[str], sibling_count: int) -> str:
-        """生成新的 Goal ID"""
-        new_index = sibling_count + 1
-        if parent_id:
-            return f"{parent_id}.{new_index}"
-        return str(new_index)
-
-    def add_goals(self, descriptions: List[str], parent_id: Optional[str] = None) -> List[Goal]:
+    def _generate_id(self) -> str:
+        """生成新的 Goal ID(纯自增)"""
+        new_id = str(self._next_id)
+        self._next_id += 1
+        return new_id
+
+    def _generate_display_id(self, goal: Goal) -> str:
+        """生成显示序号(1, 2, 2.1, 2.2...)"""
+        if not goal.parent_id:
+            # 顶层目标:找到在同级中的序号
+            siblings = [g for g in self.goals if g.parent_id is None and g.status != "abandoned"]
+            try:
+                index = [g.id for g in siblings].index(goal.id) + 1
+                return str(index)
+            except ValueError:
+                return "?"
+        else:
+            # 子目标:父序号 + "." + 在同级中的序号
+            parent = self.find(goal.parent_id)
+            if not parent:
+                return "?"
+            parent_display = self._generate_display_id(parent)
+            siblings = [g for g in self.goals if g.parent_id == goal.parent_id and g.status != "abandoned"]
+            try:
+                index = [g.id for g in siblings].index(goal.id) + 1
+                return f"{parent_display}.{index}"
+            except ValueError:
+                return f"{parent_display}.?"
+
+    def add_goals(
+        self,
+        descriptions: List[str],
+        reasons: Optional[List[str]] = None,
+        parent_id: Optional[str] = None
+    ) -> List[Goal]:
         """
         添加目标
 
         如果 parent_id 为 None,添加到顶层
         如果 parent_id 有值,添加为该 goal 的子目标
         """
-        # 确定添加位置
         if parent_id:
             parent = self.find(parent_id)
             if not parent:
                 raise ValueError(f"Parent goal not found: {parent_id}")
-            target_list = parent.children
-        else:
-            target_list = self.goals
 
         # 创建新目标
         new_goals = []
-        for desc in descriptions:
-            goal_id = self._generate_id(parent_id, len(target_list))
-            goal = Goal(id=goal_id, description=desc.strip())
-            target_list.append(goal)
+        for i, desc in enumerate(descriptions):
+            goal_id = self._generate_id()
+            reason = reasons[i] if reasons and i < len(reasons) else ""
+            goal = Goal(
+                id=goal_id,
+                description=desc.strip(),
+                reason=reason,
+                parent_id=parent_id
+            )
+            self.goals.append(goal)
             new_goals.append(goal)
 
         return new_goals
@@ -157,6 +302,18 @@ class GoalTree:
         if self.current_id == goal_id:
             self.current_id = None
 
+        # 检查是否所有兄弟都完成了,如果是则自动完成父节点
+        if goal.parent_id:
+            siblings = self.get_children(goal.parent_id)
+            all_completed = all(g.status == "completed" for g in siblings)
+            if all_completed:
+                parent = self.find(goal.parent_id)
+                if parent and parent.status != "completed":
+                    # 自动级联完成父节点
+                    parent.status = "completed"
+                    if not parent.summary:
+                        parent.summary = "所有子目标已完成"
+
         return goal
 
     def abandon(self, goal_id: str, reason: str) -> Goal:
@@ -174,20 +331,29 @@ class GoalTree:
 
         return goal
 
-    def to_prompt(self) -> str:
-        """格式化为 Prompt 注入文本"""
+    def to_prompt(self, include_abandoned: bool = False) -> str:
+        """
+        格式化为 Prompt 注入文本
+
+        过滤掉 abandoned 目标,重新生成连续显示序号
+        """
         lines = []
         lines.append(f"**Mission**: {self.mission}")
 
         if self.current_id:
             current = self.find(self.current_id)
             if current:
-                lines.append(f"**Current**: {self.current_id} {current.description}")
+                display_id = self._generate_display_id(current)
+                lines.append(f"**Current**: {display_id} {current.description}")
 
         lines.append("")
         lines.append("**Progress**:")
 
         def format_goal(goal: Goal, indent: int = 0) -> List[str]:
+            # 跳过废弃的目标(除非明确要求包含)
+            if goal.status == "abandoned" and not include_abandoned:
+                return []
+
             prefix = "    " * indent
 
             # 状态图标
@@ -200,22 +366,28 @@ class GoalTree:
             else:
                 icon = "[ ]"
 
+            # 生成显示序号
+            display_id = self._generate_display_id(goal)
+
             # 当前焦点标记
             current_mark = " ← current" if goal.id == self.current_id else ""
 
-            result = [f"{prefix}{icon} {goal.id}. {goal.description}{current_mark}"]
+            result = [f"{prefix}{icon} {display_id}. {goal.description}{current_mark}"]
 
             # 显示 summary(如果有)
             if goal.summary:
                 result.append(f"{prefix}    → {goal.summary}")
 
             # 递归处理子目标
-            for child in goal.children:
+            children = self.get_children(goal.id)
+            for child in children:
                 result.extend(format_goal(child, indent + 1))
 
             return result
 
-        for goal in self.goals:
+        # 处理所有顶层目标
+        top_goals = self.get_children(None)
+        for goal in top_goals:
             lines.extend(format_goal(goal))
 
         return "\n".join(lines)
@@ -226,6 +398,7 @@ class GoalTree:
             "mission": self.mission,
             "goals": [g.to_dict() for g in self.goals],
             "current_id": self.current_id,
+            "_next_id": self._next_id,
             "created_at": self.created_at.isoformat() if self.created_at else None,
         }
 
@@ -241,6 +414,7 @@ class GoalTree:
             mission=data["mission"],
             goals=goals,
             current_id=data.get("current_id"),
+            _next_id=data.get("_next_id", 1),
             created_at=created_at or datetime.now(),
         )
 

+ 41 - 10
agent/goal/tool.py

@@ -25,7 +25,7 @@ def goal_tool(
         add: 添加目标(逗号分隔多个)。添加到当前 focus 的 goal 下作为子目标。
         done: 完成当前目标,值为 summary
         abandon: 放弃当前目标,值为原因
-        focus: 切换焦点到指定 id
+        focus: 切换焦点到指定内部 id
 
     Returns:
         更新后的计划状态文本
@@ -37,19 +37,44 @@ def goal_tool(
         if not tree.current_id:
             return "错误:没有当前目标可以放弃"
         goal = tree.abandon(tree.current_id, abandon)
-        changes.append(f"已放弃: {goal.id}. {goal.description}")
+        display_id = tree._generate_display_id(goal)
+        changes.append(f"已放弃: {display_id}. {goal.description}")
 
     # 2. 处理 done
     if done is not None:
         if not tree.current_id:
             return "错误:没有当前目标可以完成"
         goal = tree.complete(tree.current_id, done)
-        changes.append(f"已完成: {goal.id}. {goal.description}")
+        display_id = tree._generate_display_id(goal)
+        changes.append(f"已完成: {display_id}. {goal.description}")
+
+        # 检查是否有级联完成的父目标
+        if goal.parent_id:
+            parent = tree.find(goal.parent_id)
+            if parent and parent.status == "completed":
+                parent_display_id = tree._generate_display_id(parent)
+                changes.append(f"自动完成: {parent_display_id}. {parent.description}(所有子目标已完成)")
 
     # 3. 处理 focus(在 add 之前,这样 add 可以添加到新焦点下)
     if focus is not None:
-        goal = tree.focus(focus)
-        changes.append(f"切换焦点: {goal.id}. {goal.description}")
+        # focus 参数可以是内部 ID 或显示 ID
+        # 先尝试作为内部 ID 查找
+        goal = tree.find(focus)
+
+        # 如果找不到,尝试根据显示 ID 查找
+        if not goal:
+            # 通过遍历所有 goal 查找匹配的显示 ID
+            for g in tree.goals:
+                if tree._generate_display_id(g) == focus:
+                    goal = g
+                    break
+
+        if not goal:
+            return f"错误:找不到目标 {focus}"
+
+        tree.focus(goal.id)
+        display_id = tree._generate_display_id(goal)
+        changes.append(f"切换焦点: {display_id}. {goal.description}")
 
     # 4. 处理 add
     if add is not None:
@@ -57,16 +82,19 @@ def goal_tool(
         if descriptions:
             # 添加到当前焦点下(如果有焦点),否则添加到顶层
             parent_id = tree.current_id
-            new_goals = tree.add_goals(descriptions, parent_id)
+            new_goals = tree.add_goals(descriptions, parent_id=parent_id)
+
             if parent_id:
-                changes.append(f"在 {parent_id} 下添加 {len(new_goals)} 个子目标")
+                parent_display_id = tree._generate_display_id(tree.find(parent_id))
+                changes.append(f"在 {parent_display_id} 下添加 {len(new_goals)} 个子目标")
             else:
                 changes.append(f"添加 {len(new_goals)} 个顶层目标")
 
             # 如果没有焦点且添加了目标,自动 focus 到第一个新目标
             if not tree.current_id and new_goals:
                 tree.focus(new_goals[0].id)
-                changes.append(f"自动切换焦点: {new_goals[0].id}")
+                display_id = tree._generate_display_id(new_goals[0])
+                changes.append(f"自动切换焦点: {display_id}")
 
     # 返回当前状态
     result = []
@@ -90,13 +118,16 @@ def create_goal_tool_schema() -> dict:
 - add: 添加目标(逗号分隔多个)。添加到当前 focus 的 goal 下作为子目标。
 - done: 完成当前目标,值为 summary
 - abandon: 放弃当前目标,值为原因(会触发 context 压缩)
-- focus: 切换焦点到指定 id
+- focus: 切换焦点到指定 id(可以是内部 ID 或显示 ID)
 
 示例:
 - goal(add="分析代码, 实现功能, 测试") - 添加顶层目标
 - goal(focus="2", add="设计接口, 实现代码") - 切换到目标2,并添加子目标
 - goal(done="发现用户模型在 models/user.py") - 完成当前目标
 - goal(abandon="方案A需要Redis,环境没有", add="实现方案B") - 放弃当前并添加新目标
+
+注意:内部 ID 是纯自增数字("1", "2", "3"),显示 ID 是带层级的("1", "2.1", "2.2")。
+focus 参数可以使用任意格式的 ID。
 """,
         "parameters": {
             "type": "object",
@@ -115,7 +146,7 @@ def create_goal_tool_schema() -> dict:
                 },
                 "focus": {
                     "type": "string",
-                    "description": "切换焦点到指定 goal id"
+                    "description": "切换焦点到指定 goal id(可以是内部 ID 或显示 ID)"
                 }
             },
             "required": []

+ 2 - 0
agent/tools/builtin/__init__.py

@@ -14,6 +14,7 @@ from agent.tools.builtin.glob import glob_files
 from agent.tools.builtin.grep import grep_content
 from agent.tools.builtin.bash import bash_command
 from agent.tools.builtin.skill import skill, list_skills
+from agent.tools.builtin.goal import goal
 from agent.tools.builtin.search import search_posts, get_search_suggestions
 from agent.tools.builtin.sandbox import (sandbox_create_environment, sandbox_run_shell,
                                          sandbox_rebuild_with_ports,sandbox_destroy_environment)
@@ -27,6 +28,7 @@ __all__ = [
     "bash_command",
     "skill",
     "list_skills",
+    "goal",
     "search_posts",
     "get_search_suggestions",
     "sandbox_create_environment",

+ 64 - 0
agent/tools/builtin/goal.py

@@ -0,0 +1,64 @@
+"""
+Goal 工具 - 执行计划管理
+
+提供 LLM 可调用的 goal 工具,用于管理执行计划(GoalTree)。
+"""
+
+from typing import Optional
+
+
+# 全局 GoalTree 引用(由 AgentRunner 注入)
+_current_goal_tree = None
+
+
+def set_goal_tree(tree):
+    """设置当前 GoalTree(由 AgentRunner 调用)"""
+    global _current_goal_tree
+    _current_goal_tree = tree
+
+
+def get_goal_tree():
+    """获取当前 GoalTree"""
+    return _current_goal_tree
+
+
+def goal(
+    add: Optional[str] = None,
+    done: Optional[str] = None,
+    abandon: Optional[str] = None,
+    focus: Optional[str] = None,
+) -> str:
+    """
+    管理执行计划。
+
+    参数:
+        add: 添加目标(逗号分隔多个)。添加到当前 focus 的 goal 下作为子目标。
+        done: 完成当前目标,值为 summary
+        abandon: 放弃当前目标,值为原因(会触发 context 压缩)
+        focus: 切换焦点到指定 id(可以是内部 ID 或显示 ID)
+
+    示例:
+        goal(add="分析代码, 实现功能, 测试") - 添加顶层目标
+        goal(focus="2", add="设计接口, 实现代码") - 切换到目标2,并添加子目标
+        goal(done="发现用户模型在 models/user.py") - 完成当前目标
+        goal(abandon="方案A需要Redis,环境没有", add="实现方案B") - 放弃当前并添加新目标
+
+    注意: 内部 ID 是纯自增数字("1", "2", "3"),显示 ID 是带层级的("1", "2.1", "2.2")。
+    focus 参数可以使用任意格式的 ID。
+
+    返回:
+        str: 更新后的计划状态文本
+    """
+    from agent.goal.tool import goal_tool
+
+    tree = get_goal_tree()
+    if tree is None:
+        return "错误:GoalTree 未初始化"
+
+    return goal_tool(
+        tree=tree,
+        add=add,
+        done=done,
+        abandon=abandon,
+        focus=focus
+    )

+ 5 - 0
docs/context-management.md

@@ -256,6 +256,7 @@ class Message:
     goal_id: str                         # 关联的 Goal 内部 ID
     tool_call_id: Optional[str]          # tool 消息关联对应的 tool_call
     content: Any                         # 消息内容(和 LLM API 格式一致)
+    description: str                     # 消息描述(系统自动生成)
 
     # 元数据
     tokens: Optional[int] = None
@@ -263,6 +264,10 @@ class Message:
     created_at: datetime
 ```
 
+**description 字段**(系统自动生成):
+- `assistant` 消息:优先取 content 中的 text,若无 text 则生成 "tool call: XX, XX"
+- `tool` 消息:使用 tool name
+
 **实现**:`agent/execution/models.py:Message`
 
 **Message 类型说明**:

+ 2 - 2
docs/tools.md

@@ -84,7 +84,7 @@ async def search_notes(
     uid: str = ""
 ) -> str:
     """
-    搜索用户的笔记
+    搜索笔记
 
     Args:
         query: 搜索关键词
@@ -93,7 +93,7 @@ async def search_notes(
     Returns:
         JSON 格式的搜索结果
     """
-    # 自动从 docstring 提取 function description 和 parameter descriptions
+    # 支持自动从 docstring 提取 function description 和 parameter descriptions
     ...
 ```
 

+ 4 - 0
docs/trace-api.md

@@ -70,6 +70,10 @@ tool_msg = Message.create(
 )
 ```
 
+**description 字段**(系统自动生成):
+- `assistant` 消息:优先取 content 中的 text,若无 text 则生成 "tool call: XX, XX"
+- `tool` 消息:使用 tool name
+
 **实现**:`agent/execution/models.py:Message`
 
 ---

+ 29 - 26
examples/feature_extract/run.py

@@ -20,7 +20,7 @@ from agent.core.runner import AgentRunner
 from agent.execution import (
     FileSystemTraceStore,
     Trace,
-    Step,
+    Message,
 )
 from agent.llm import create_openrouter_llm_call
 
@@ -111,39 +111,42 @@ async def main():
         # 处理 Trace 对象(整体状态变化)
         if isinstance(item, Trace):
             current_trace_id = item.trace_id  # 保存 trace_id
-            if item.status == "in_progress":
+            if item.status == "running":
                 print(f"[Trace] 开始: {item.trace_id[:8]}")
             elif item.status == "completed":
                 print(f"[Trace] 完成")
+                print(f"  - Total messages: {item.total_messages}")
                 print(f"  - Total tokens: {item.total_tokens}")
                 print(f"  - Total cost: ${item.total_cost:.4f}")
             elif item.status == "failed":
                 print(f"[Trace] 失败")
 
-        # 处理 Step 对象(执行过程)
-        elif isinstance(item, Step):
-            if item.step_type == "memory_read":
-                exp_count = item.data.get('experiences_count', 0)
-                if exp_count > 0:
-                    print(f"[Memory] 加载 {exp_count} 条经验")
-
-            elif item.step_type == "thought":
-                if item.status == "completed":
-                    content = item.data.get('content', '')
-                    if content:
-                        print(f"[Thought] {content[:100]}...")
-
-            elif item.step_type == "action":
-                tool_name = item.data.get('tool_name', '')
-                print(f"[Tool] 执行 {tool_name}")
-
-            elif item.step_type == "result":
-                tool_name = item.data.get('tool_name', '')
-                print(f"[Tool] {tool_name} 完成")
-
-            elif item.step_type == "response":
-                final_response = item.data.get('content', '')
-                print(f"[Response] Agent 完成")
+        # 处理 Message 对象(执行过程)
+        elif isinstance(item, Message):
+            if item.role == "assistant":
+                content = item.content
+                if isinstance(content, dict):
+                    text = content.get("text", "")
+                    tool_calls = content.get("tool_calls")
+
+                    if text and not tool_calls:
+                        # 纯文本回复(最终响应)
+                        final_response = text
+                        print(f"[Response] Agent 完成")
+                    elif text:
+                        print(f"[Assistant] {text[:100]}...")
+
+                    if tool_calls:
+                        for tc in tool_calls:
+                            tool_name = tc.get("function", {}).get("name", "unknown")
+                            print(f"[Tool Call] {tool_name}")
+
+            elif item.role == "tool":
+                content = item.content
+                if isinstance(content, dict):
+                    tool_name = content.get("tool_name", "unknown")
+                    print(f"[Tool Result] {tool_name}")
+                print(f"  {item.description[:80]}...")
 
     # 6. 输出结果
     print()

+ 276 - 0
examples/test_plan.py

@@ -0,0 +1,276 @@
+"""
+测试新的 Plan 系统
+
+测试 GoalTree、Message、TraceStore 的基本功能
+"""
+
+import asyncio
+import sys
+from pathlib import Path
+
+# 添加项目根目录到 Python 路径
+sys.path.insert(0, str(Path(__file__).parent.parent.parent))
+
+from agent.goal.models import GoalTree, Goal, GoalStats
+from agent.execution.models import Trace, Message
+from agent.execution.fs_store import FileSystemTraceStore
+from agent.goal.tool import goal_tool
+
+
+async def test_basic_plan():
+    """测试基本的计划功能"""
+    print("=" * 60)
+    print("测试 1: 基本计划功能")
+    print("=" * 60)
+    print()
+
+    # 1. 创建 GoalTree
+    tree = GoalTree(mission="实现用户认证功能")
+    print("1. 创建 GoalTree")
+    print(f"   Mission: {tree.mission}")
+    print()
+
+    # 2. 添加顶层目标
+    print("2. 添加顶层目标")
+    result = goal_tool(tree, add="分析代码, 实现功能, 测试")
+    print(result)
+    print()
+
+    # 3. Focus 到目标 2
+    print("3. Focus 到目标 2")
+    result = goal_tool(tree, focus="2")
+    print(result)
+    print()
+
+    # 4. 添加子目标
+    print("4. 在目标 2 下添加子目标")
+    result = goal_tool(tree, add="设计接口, 实现代码, 单元测试")
+    print(result)
+    print()
+
+    # 5. Focus 到子目标并完成
+    print("5. Focus 到 2.1 并完成")
+    result = goal_tool(tree, focus="3")  # 内部 ID 是 3("2.1" 的内部 ID)
+    print(result)
+    print()
+
+    # 通过显示 ID 完成
+    print("6. 完成目标(使用内部 ID)")
+    result = goal_tool(tree, done="API 接口设计完成,定义了登录和注册端点")
+    print(result)
+    print()
+
+    # 7. 测试 abandon
+    print("7. Focus 到 4(2.2)并放弃")
+    result = goal_tool(tree, focus="4")
+    print(result)
+    print()
+
+    result = goal_tool(tree, abandon="发现需求变更,需要改用 OAuth")
+    print(result)
+    print()
+
+    # 8. 添加新方案
+    print("8. 添加新的实现方案")
+    result = goal_tool(tree, add="实现 OAuth 认证")
+    print(result)
+    print()
+
+    # 9. 查看最终状态
+    print("9. 查看完整计划(包含废弃目标)")
+    print(tree.to_prompt(include_abandoned=True))
+    print()
+
+    # 10. 查看过滤后的计划(默认不显示废弃目标)
+    print("10. 查看过滤后的计划(默认)")
+    print(tree.to_prompt())
+    print()
+
+
+async def test_trace_store():
+    """测试 TraceStore 的 GoalTree 和 Message 存储"""
+    print("=" * 60)
+    print("测试 2: TraceStore 存储功能")
+    print("=" * 60)
+    print()
+
+    # 创建 TraceStore
+    store = FileSystemTraceStore(base_path=".trace_test")
+
+    # 1. 创建 Trace
+    print("1. 创建 Trace")
+    trace = Trace.create(
+        mode="agent",
+        task="测试任务"
+    )
+    await store.create_trace(trace)
+    print(f"   Trace ID: {trace.trace_id[:8]}...")
+    print()
+
+    # 2. 创建并保存 GoalTree
+    print("2. 创建并保存 GoalTree")
+    tree = GoalTree(mission="测试任务")
+    tree.add_goals(["分析", "实现", "测试"])
+    await store.update_goal_tree(trace.trace_id, tree)
+    print(f"   添加了 {len(tree.goals)} 个目标")
+    print()
+
+    # 3. 添加 Messages
+    print("3. 添加 Messages")
+
+    # Focus 到第一个目标
+    tree.focus("1")
+    await store.update_goal_tree(trace.trace_id, tree)
+
+    # 添加 assistant message
+    msg1 = Message.create(
+        trace_id=trace.trace_id,
+        role="assistant",
+        sequence=1,
+        goal_id="1",
+        content={"text": "开始分析代码", "tool_calls": [
+            {
+                "id": "call_1",
+                "function": {
+                    "name": "read_file",
+                    "arguments": '{"path": "src/main.py"}'
+                }
+            }
+        ]},
+        tokens=100,
+        cost=0.002
+    )
+    await store.add_message(msg1)
+    print(f"   Message 1: {msg1.description}")
+
+    # 添加 tool message
+    msg2 = Message.create(
+        trace_id=trace.trace_id,
+        role="tool",
+        sequence=2,
+        goal_id="1",
+        tool_call_id="call_1",
+        content={"tool_name": "read_file", "result": "文件内容..."},
+        tokens=50,
+        cost=0.001
+    )
+    await store.add_message(msg2)
+    print(f"   Message 2: {msg2.description}")
+    print()
+
+    # 4. 查看更新后的 GoalTree(stats 应该自动更新)
+    print("4. 查看更新后的 GoalTree(含 stats)")
+    updated_tree = await store.get_goal_tree(trace.trace_id)
+    goal1 = updated_tree.find("1")
+    print(f"   Goal 1 stats:")
+    print(f"     - message_count: {goal1.self_stats.message_count}")
+    print(f"     - total_tokens: {goal1.self_stats.total_tokens}")
+    print(f"     - total_cost: ${goal1.self_stats.total_cost:.4f}")
+    print()
+
+    # 5. 添加子目标和 Message
+    print("5. 添加子目标和 Message")
+    updated_tree.add_goals(["读取配置", "解析代码"], parent_id="1")
+    updated_tree.focus("4")  # Focus 到第一个子目标
+    await store.update_goal_tree(trace.trace_id, updated_tree)
+
+    msg3 = Message.create(
+        trace_id=trace.trace_id,
+        role="assistant",
+        sequence=3,
+        goal_id="4",
+        content={"text": "读取配置文件"},
+        tokens=80,
+        cost=0.0015
+    )
+    await store.add_message(msg3)
+    print(f"   添加子目标 Message: {msg3.description}")
+    print()
+
+    # 6. 查看累计 stats(父节点应该包含子节点的统计)
+    print("6. 查看累计 stats")
+    updated_tree = await store.get_goal_tree(trace.trace_id)
+    goal1 = updated_tree.find("1")
+    print(f"   Goal 1 cumulative stats:")
+    print(f"     - message_count: {goal1.cumulative_stats.message_count}")
+    print(f"     - total_tokens: {goal1.cumulative_stats.total_tokens}")
+    print(f"     - total_cost: ${goal1.cumulative_stats.total_cost:.4f}")
+    print()
+
+    # 7. 查看 Messages
+    print("7. 查询 Messages")
+    all_messages = await store.get_trace_messages(trace.trace_id)
+    print(f"   总共 {len(all_messages)} 条 Messages")
+
+    goal1_messages = await store.get_messages_by_goal(trace.trace_id, "1")
+    print(f"   Goal 1 的 Messages: {len(goal1_messages)} 条")
+    print()
+
+    # 8. 显示完整 GoalTree
+    print("8. 完整 GoalTree")
+    print(updated_tree.to_prompt())
+    print()
+
+    # 9. 测试级联完成
+    print("9. 测试级联完成")
+    updated_tree.focus("4")
+    updated_tree.complete("4", "配置读取完成")
+    updated_tree.focus("5")
+    updated_tree.complete("5", "代码解析完成")
+
+    # 检查父节点是否自动完成
+    goal1 = updated_tree.find("1")
+    print(f"   Goal 1 status: {goal1.status}")
+    print(f"   Goal 1 summary: {goal1.summary}")
+    print()
+
+    print("✅ TraceStore 测试完成!")
+    print(f"   数据保存在: .trace_test/{trace.trace_id[:8]}...")
+    print()
+
+
+async def test_display_ids():
+    """测试显示 ID 的生成"""
+    print("=" * 60)
+    print("测试 3: 显示 ID 生成")
+    print("=" * 60)
+    print()
+
+    tree = GoalTree(mission="测试显示 ID")
+
+    # 添加多层嵌套目标
+    tree.add_goals(["A", "B", "C"])
+    tree.focus("2")
+    tree.add_goals(["B1", "B2"])
+    tree.focus("4")
+    tree.add_goals(["B1-1", "B1-2"])
+
+    print("完整结构:")
+    print(tree.to_prompt())
+    print()
+
+    # 测试 abandon 后的重新编号
+    print("放弃 B1-1 后:")
+    tree.focus("6")
+    tree.abandon("6", "测试废弃")
+    print(tree.to_prompt())
+    print()
+
+    print("包含废弃目标的完整视图:")
+    print(tree.to_prompt(include_abandoned=True))
+    print()
+
+
+async def main():
+    """运行所有测试"""
+    await test_basic_plan()
+    await test_trace_store()
+    await test_display_ids()
+
+    print("=" * 60)
+    print("所有测试完成!")
+    print("=" * 60)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())