""" Goal 数据模型 Goal: 执行计划中的目标节点 GoalTree: 目标树,管理整个执行计划 GoalStats: 目标统计信息 """ from dataclasses import dataclass, field from datetime import datetime from typing import Dict, Any, List, Optional, Literal import json # Goal 状态 GoalStatus = Literal["pending", "in_progress", "completed", "abandoned"] # Goal 类型 GoalType = Literal["normal", "agent_call"] @dataclass class GoalStats: """目标统计信息""" message_count: int = 0 # 消息数量 total_tokens: int = 0 # Token 总数 total_cost: float = 0.0 # 总成本 preview: Optional[str] = None # 工具调用摘要,如 "read_file → edit_file → bash" def to_dict(self) -> Dict[str, Any]: return { "message_count": self.message_count, "total_tokens": self.total_tokens, "total_cost": self.total_cost, "preview": self.preview, } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "GoalStats": return cls( message_count=data.get("message_count", 0), total_tokens=data.get("total_tokens", 0), total_cost=data.get("total_cost", 0.0), preview=data.get("preview"), ) @dataclass class Goal: """ 执行目标 使用扁平列表 + parent_id 构建层级结构。 agent_call 类型用于标记启动了 Sub-Trace 的 Goal。 """ id: str # 内部唯一 ID,纯自增("1", "2", "3"...) description: str # 目标描述 reason: str = "" # 创建理由(为什么做) parent_id: Optional[str] = None # 父 Goal ID(层级关系) type: GoalType = "normal" # Goal 类型 status: GoalStatus = "pending" # 状态 summary: Optional[str] = None # 完成/放弃时的总结 # agent_call 特有 sub_trace_ids: Optional[List[Dict[str, str]]] = None # 启动的 Sub-Trace 信息 [{"trace_id": "...", "mission": "..."}] agent_call_mode: Optional[str] = None # "explore" | "delegate" | "sequential" sub_trace_metadata: Optional[Dict[str, Dict[str, Any]]] = None # Sub-Trace 元数据 # 统计(后端维护,用于可视化边的数据) self_stats: GoalStats = field(default_factory=GoalStats) # 自身统计(仅直接关联的 messages) cumulative_stats: GoalStats = field(default_factory=GoalStats) # 累计统计(自身 + 所有后代) # 相关知识(自动检索注入) knowledge: Optional[List[Dict[str, Any]]] = None # 相关知识列表 created_at: datetime = field(default_factory=datetime.now) def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { "id": self.id, "description": self.description, "reason": self.reason, "parent_id": self.parent_id, "type": self.type, "status": self.status, "summary": self.summary, "sub_trace_ids": self.sub_trace_ids, "agent_call_mode": self.agent_call_mode, "sub_trace_metadata": self.sub_trace_metadata, "self_stats": self.self_stats.to_dict(), "cumulative_stats": self.cumulative_stats.to_dict(), "knowledge": self.knowledge, "created_at": self.created_at.isoformat() if self.created_at else None, } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "Goal": """从字典创建""" created_at = data.get("created_at") if isinstance(created_at, str): created_at = datetime.fromisoformat(created_at) self_stats = data.get("self_stats", {}) if isinstance(self_stats, dict): self_stats = GoalStats.from_dict(self_stats) cumulative_stats = data.get("cumulative_stats", {}) if isinstance(cumulative_stats, dict): cumulative_stats = GoalStats.from_dict(cumulative_stats) return cls( id=data["id"], description=data["description"], reason=data.get("reason", ""), parent_id=data.get("parent_id"), type=data.get("type", "normal"), status=data.get("status", "pending"), summary=data.get("summary"), sub_trace_ids=data.get("sub_trace_ids"), agent_call_mode=data.get("agent_call_mode"), sub_trace_metadata=data.get("sub_trace_metadata"), self_stats=self_stats, cumulative_stats=cumulative_stats, knowledge=data.get("knowledge"), created_at=created_at or datetime.now(), ) @dataclass class GoalTree: """ 目标树 - 管理整个执行计划 使用扁平列表 + parent_id 构建层级结构 """ mission: str # 总任务描述 goals: List[Goal] = field(default_factory=list) # 扁平列表(通过 parent_id 构建层级) current_id: Optional[str] = None # 当前焦点 goal ID _next_id: int = 1 # 内部 ID 计数器(私有字段) created_at: datetime = field(default_factory=datetime.now) def find(self, goal_id: str) -> Optional[Goal]: """按 ID 查找 Goal""" for goal in self.goals: if goal.id == goal_id: return goal return None def find_by_display_id(self, display_id: str) -> Optional[Goal]: """按显示 ID 查找 Goal(如 "1", "2.1", "2.2")""" for goal in self.goals: if self._generate_display_id(goal) == display_id: return goal return None def find_parent(self, goal_id: str) -> Optional[Goal]: """查找指定 Goal 的父节点""" goal = self.find(goal_id) if not goal or not goal.parent_id: return None return self.find(goal.parent_id) def get_children(self, parent_id: Optional[str]) -> List[Goal]: """获取指定父节点的所有子节点""" return [g for g in self.goals if g.parent_id == parent_id] def get_current(self) -> Optional[Goal]: """获取当前焦点 Goal""" if self.current_id: return self.find(self.current_id) return None def _generate_id(self) -> str: """生成新的 Goal ID(纯自增)""" new_id = str(self._next_id) self._next_id += 1 return new_id def _generate_display_id(self, goal: Goal) -> str: """生成显示序号(1, 2, 2.1, 2.2...)""" if not goal.parent_id: # 顶层目标:找到在同级中的序号 siblings = [g for g in self.goals if g.parent_id is None and g.status != "abandoned"] try: index = [g.id for g in siblings].index(goal.id) + 1 return str(index) except ValueError: return "?" else: # 子目标:父序号 + "." + 在同级中的序号 parent = self.find(goal.parent_id) if not parent: return "?" parent_display = self._generate_display_id(parent) siblings = [g for g in self.goals if g.parent_id == goal.parent_id and g.status != "abandoned"] try: index = [g.id for g in siblings].index(goal.id) + 1 return f"{parent_display}.{index}" except ValueError: return f"{parent_display}.?" def add_goals( self, descriptions: List[str], reasons: Optional[List[str]] = None, parent_id: Optional[str] = None ) -> List[Goal]: """ 添加目标 如果 parent_id 为 None,添加到顶层 如果 parent_id 有值,添加为该 goal 的子目标 """ if parent_id: parent = self.find(parent_id) if not parent: raise ValueError(f"Parent goal not found: {parent_id}") # 创建新目标 new_goals = [] for i, desc in enumerate(descriptions): goal_id = self._generate_id() reason = reasons[i] if reasons and i < len(reasons) else "" goal = Goal( id=goal_id, description=desc.strip(), reason=reason, parent_id=parent_id ) self.goals.append(goal) new_goals.append(goal) return new_goals def add_goals_after( self, target_id: str, descriptions: List[str], reasons: Optional[List[str]] = None ) -> List[Goal]: """ 在指定 Goal 后面添加兄弟节点 新创建的 goals 与 target 有相同的 parent_id, 并插入到 goals 列表中 target 的后面。 """ target = self.find(target_id) if not target: raise ValueError(f"Target goal not found: {target_id}") # 创建新 goals(parent_id 与 target 相同) new_goals = [] for i, desc in enumerate(descriptions): goal_id = self._generate_id() reason = reasons[i] if reasons and i < len(reasons) else "" goal = Goal( id=goal_id, description=desc.strip(), reason=reason, parent_id=target.parent_id # 同层级 ) new_goals.append(goal) # 插入到 target 后面(调整 goals 列表顺序) target_index = self.goals.index(target) for i, goal in enumerate(new_goals): self.goals.insert(target_index + 1 + i, goal) return new_goals def focus(self, goal_id: str) -> Goal: """切换焦点到指定 Goal,并将其状态设为 in_progress""" goal = self.find(goal_id) if not goal: raise ValueError(f"Goal not found: {goal_id}") # 更新状态 if goal.status == "pending": goal.status = "in_progress" self.current_id = goal_id return goal def complete(self, goal_id: str, summary: str, clear_focus: bool = True) -> Goal: """ 完成指定 Goal Args: goal_id: 要完成的目标 ID summary: 完成总结 clear_focus: 如果完成的是当前焦点,是否清除焦点(默认 True) """ goal = self.find(goal_id) if not goal: raise ValueError(f"Goal not found: {goal_id}") goal.status = "completed" goal.summary = summary # 如果完成的是当前焦点,根据参数决定是否清除焦点 if clear_focus and self.current_id == goal_id: self.current_id = None # 检查是否所有兄弟都完成了,如果是则自动完成父节点 if goal.parent_id: siblings = self.get_children(goal.parent_id) all_completed = all(g.status == "completed" for g in siblings) if all_completed: parent = self.find(goal.parent_id) if parent and parent.status != "completed": # 自动级联完成父节点 parent.status = "completed" if not parent.summary: parent.summary = "所有子目标已完成" return goal def abandon(self, goal_id: str, reason: str) -> Goal: """放弃指定 Goal""" goal = self.find(goal_id) if not goal: raise ValueError(f"Goal not found: {goal_id}") goal.status = "abandoned" goal.summary = reason # 如果放弃的是当前焦点,清除焦点 if self.current_id == goal_id: self.current_id = None return goal def to_prompt(self, include_abandoned: bool = False, include_summary: bool = False) -> str: """ 格式化为 Prompt 注入文本 Args: include_abandoned: 是否包含已废弃的目标 include_summary: 是否显示 completed/abandoned goals 的 summary 详情 False(默认)= 精简视图,用于日常周期性注入 True = 完整视图(含 summary),用于压缩时提供上下文 展示策略: - 过滤掉 abandoned 目标(除非明确要求) - 完整展示所有顶层目标 - 完整展示当前 focus 目标的父链及其所有子孙 - 其他分支的子目标折叠显示(只显示数量和状态) - include_summary=True 时不折叠,全部展开并显示 summary """ lines = [] lines.append(f"**Mission**: {self.mission}") if self.current_id: current = self.find(self.current_id) if current: display_id = self._generate_display_id(current) lines.append(f"**Current**: {display_id} {current.description}") lines.append("") lines.append("**Progress**:") # 获取当前焦点的祖先链(从根到当前节点的路径) current_path = set() if self.current_id: goal = self.find(self.current_id) while goal: current_path.add(goal.id) if goal.parent_id: goal = self.find(goal.parent_id) else: break def format_goal(goal: Goal, indent: int = 0) -> List[str]: # 跳过废弃的目标(除非明确要求包含) if goal.status == "abandoned" and not include_abandoned: return [] prefix = " " * indent # 状态图标 if goal.status == "completed": icon = "[✓]" elif goal.status == "in_progress": icon = "[→]" elif goal.status == "abandoned": icon = "[✗]" else: icon = "[ ]" # 生成显示序号 display_id = self._generate_display_id(goal) # 当前焦点标记 current_mark = " ← current" if goal.id == self.current_id else "" result = [f"{prefix}{icon} {display_id}. {goal.description}{current_mark}"] # 显示 summary:include_summary=True 时全部显示,否则只在焦点路径上显示 if goal.summary and (include_summary or goal.id in current_path): result.append(f"{prefix} → {goal.summary}") # 显示相关知识:仅在当前焦点 goal 显示 if goal.id == self.current_id and goal.knowledge: result.append(f"{prefix} 📚 相关知识 ({len(goal.knowledge)} 条):") for idx, k in enumerate(goal.knowledge[:3], 1): k_id = k.get('id', 'N/A') # 将多行内容压缩为单行摘要 k_content = k.get('content', '').replace('\n', ' ').strip()[:80] result.append(f"{prefix} {idx}. [{k_id}] {k_content}...") # 递归处理子目标 children = self.get_children(goal.id) # include_summary 模式下不折叠,全部展开 if include_summary: for child in children: result.extend(format_goal(child, indent + 1)) return result # 判断是否需要折叠 # 如果当前 goal 或其子孙在焦点路径上,完整展示 should_expand = goal.id in current_path or any( child.id in current_path for child in self._get_all_descendants(goal.id) ) if should_expand or not children: # 完整展示子目标 for child in children: result.extend(format_goal(child, indent + 1)) else: # 折叠显示:只显示子目标的统计 non_abandoned = [c for c in children if c.status != "abandoned"] if non_abandoned: completed = sum(1 for c in non_abandoned if c.status == "completed") in_progress = sum(1 for c in non_abandoned if c.status == "in_progress") pending = sum(1 for c in non_abandoned if c.status == "pending") status_parts = [] if completed > 0: status_parts.append(f"{completed} completed") if in_progress > 0: status_parts.append(f"{in_progress} in progress") if pending > 0: status_parts.append(f"{pending} pending") status_str = ", ".join(status_parts) result.append(f"{prefix} ({len(non_abandoned)} subtasks: {status_str})") return result # 处理所有顶层目标 top_goals = self.get_children(None) for goal in top_goals: lines.extend(format_goal(goal)) return "\n".join(lines) def _get_all_descendants(self, goal_id: str) -> List[Goal]: """获取指定 Goal 的所有子孙节点""" descendants = [] children = self.get_children(goal_id) for child in children: descendants.append(child) descendants.extend(self._get_all_descendants(child.id)) return descendants def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { "mission": self.mission, "goals": [g.to_dict() for g in self.goals], "current_id": self.current_id, "_next_id": self._next_id, "created_at": self.created_at.isoformat() if self.created_at else None, } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "GoalTree": """从字典创建""" goals = [Goal.from_dict(g) for g in data.get("goals", [])] created_at = data.get("created_at") if isinstance(created_at, str): created_at = datetime.fromisoformat(created_at) return cls( mission=data["mission"], goals=goals, current_id=data.get("current_id"), _next_id=data.get("_next_id", 1), created_at=created_at or datetime.now(), ) def rebuild_for_rewind(self, cutoff_time: datetime) -> "GoalTree": """ 为 Rewind 重建干净的 GoalTree 以截断点消息的 created_at 为界: - 保留 created_at <= cutoff_time 的所有 goals(无论状态) - 丢弃 cutoff_time 之后创建的 goals - 将被保留的 in_progress goals 重置为 pending - 清空 current_id,让 Agent 重新选择焦点 Args: cutoff_time: 截断点消息的创建时间 Returns: 新的干净 GoalTree """ surviving_goals = [] for goal in self.goals: if goal.created_at <= cutoff_time: surviving_goals.append(goal) # 清理 parent_id 引用:如果 parent 不在存活列表中,设为 None surviving_ids = {g.id for g in surviving_goals} for goal in surviving_goals: if goal.parent_id and goal.parent_id not in surviving_ids: goal.parent_id = None # 将 in_progress 重置为 pending(回溯后需要重新执行) if goal.status == "in_progress": goal.status = "pending" new_tree = GoalTree( mission=self.mission, goals=surviving_goals, current_id=None, _next_id=self._next_id, created_at=self.created_at, ) return new_tree def save(self, path: str) -> None: """保存到 JSON 文件""" with open(path, "w", encoding="utf-8") as f: json.dump(self.to_dict(), f, ensure_ascii=False, indent=2) @classmethod def load(cls, path: str) -> "GoalTree": """从 JSON 文件加载""" with open(path, "r", encoding="utf-8") as f: data = json.load(f) return cls.from_dict(data)