compaction.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. """
  2. Context 压缩 — 两级压缩策略
  3. Level 1: GoalTree 过滤(确定性,零成本)
  4. - 跳过 completed/abandoned goals 的消息(信息已在 GoalTree summary 中)
  5. - 始终保留:system prompt、第一条 user message、当前 focus goal 的消息
  6. Level 2: LLM 总结(仅在 Level 1 后仍超限时触发)
  7. - 在消息列表末尾追加压缩 prompt → 主模型回复 → summary 存为新消息
  8. - summary 的 parent_sequence 跳过被压缩的范围
  9. 压缩不修改存储:原始消息永远保留在 messages/,通过 parent_sequence 树结构实现跳过。
  10. """
  11. from dataclasses import dataclass
  12. from typing import List, Dict, Any, Optional, Set
  13. from .goal_models import GoalTree
  14. from .models import Message
  15. # ===== 配置 =====
  16. @dataclass
  17. class CompressionConfig:
  18. """压缩配置"""
  19. max_tokens: int = 100000 # 最大 token 数
  20. threshold_ratio: float = 0.8 # 触发 Level 2 的阈值比例(80%)
  21. keep_recent_messages: int = 10 # Level 1 中始终保留最近 N 条消息
  22. # ===== Level 1: GoalTree 过滤 =====
  23. def filter_by_goal_status(
  24. messages: List[Message],
  25. goal_tree: Optional[GoalTree],
  26. ) -> List[Message]:
  27. """
  28. Level 1 过滤:跳过 completed/abandoned goals 的消息
  29. 始终保留:
  30. - goal_id 为 None 的消息(system prompt、初始 user message)
  31. - 当前 focus goal 及其祖先链上的消息
  32. - in_progress 和 pending goals 的消息
  33. 跳过:
  34. - completed 且不在焦点路径上的 goals 的消息
  35. - abandoned goals 的消息
  36. Args:
  37. messages: 主路径上的有序消息列表
  38. goal_tree: GoalTree 实例
  39. Returns:
  40. 过滤后的消息列表
  41. """
  42. if not goal_tree or not goal_tree.goals:
  43. return messages
  44. # 构建焦点路径(当前焦点 + 父链 + 直接子节点)
  45. focus_path = _get_focus_path(goal_tree)
  46. # 构建需要跳过的 goal IDs
  47. skip_goal_ids: Set[str] = set()
  48. for goal in goal_tree.goals:
  49. if goal.id in focus_path:
  50. continue # 焦点路径上的 goal 始终保留
  51. if goal.status in ("completed", "abandoned"):
  52. skip_goal_ids.add(goal.id)
  53. # 过滤消息
  54. result = []
  55. for msg in messages:
  56. if msg.goal_id is None:
  57. result.append(msg) # 无 goal 的消息始终保留
  58. elif msg.goal_id not in skip_goal_ids:
  59. result.append(msg) # 不在跳过列表中的消息保留
  60. return result
  61. def _get_focus_path(goal_tree: GoalTree) -> Set[str]:
  62. """获取焦点路径上的所有 goal IDs(焦点 + 父链 + 直接子节点)"""
  63. focus_ids: Set[str] = set()
  64. if not goal_tree.current_id:
  65. return focus_ids
  66. # 焦点自身
  67. focus_ids.add(goal_tree.current_id)
  68. # 父链
  69. goal = goal_tree.find(goal_tree.current_id)
  70. while goal and goal.parent_id:
  71. focus_ids.add(goal.parent_id)
  72. goal = goal_tree.find(goal.parent_id)
  73. # 直接子节点
  74. children = goal_tree.get_children(goal_tree.current_id)
  75. for child in children:
  76. focus_ids.add(child.id)
  77. return focus_ids
  78. # ===== Token 估算 =====
  79. def estimate_tokens(messages: List[Dict[str, Any]]) -> int:
  80. """
  81. 估算消息列表的 token 数量
  82. 简单估算:字符数 / 4。实际使用时应该用 tiktoken 或 API 返回的 token 数。
  83. """
  84. total_chars = 0
  85. for msg in messages:
  86. content = msg.get("content", "")
  87. if isinstance(content, str):
  88. total_chars += len(content)
  89. elif isinstance(content, list):
  90. for part in content:
  91. if isinstance(part, dict) and part.get("type") == "text":
  92. total_chars += len(part.get("text", ""))
  93. # tool_calls
  94. tool_calls = msg.get("tool_calls")
  95. if tool_calls and isinstance(tool_calls, list):
  96. for tc in tool_calls:
  97. if isinstance(tc, dict):
  98. func = tc.get("function", {})
  99. total_chars += len(func.get("name", ""))
  100. args = func.get("arguments", "")
  101. if isinstance(args, str):
  102. total_chars += len(args)
  103. return total_chars // 4
  104. def estimate_tokens_from_messages(messages: List[Message]) -> int:
  105. """从 Message 对象列表估算 token 数"""
  106. return estimate_tokens([msg.to_llm_dict() for msg in messages])
  107. def needs_level2_compression(
  108. token_count: int,
  109. config: CompressionConfig,
  110. ) -> bool:
  111. """判断是否需要触发 Level 2 压缩"""
  112. return token_count > config.max_tokens * config.threshold_ratio
  113. # ===== Level 2: 压缩 Prompt =====
  114. COMPRESSION_PROMPT = """请对以上对话历史进行压缩总结。
  115. 要求:
  116. 1. 保留关键决策、结论和产出(如创建的文件、修改的代码、得出的分析结论)
  117. 2. 保留重要的上下文(如用户的要求、约束条件、之前的讨论结果)
  118. 3. 省略中间探索过程、重复的工具调用细节
  119. 4. 使用结构化格式(标题 + 要点)
  120. 5. 控制在 2000 字以内
  121. 当前 GoalTree 状态(完整版,含 summary):
  122. {goal_tree_prompt}
  123. """
  124. REFLECT_PROMPT = """请回顾以上整个执行过程,提取有价值的经验教训。
  125. 关注以下方面:
  126. 1. **人工干预**:如果有用户中途修改了指令或纠正了方向,说明之前的决策哪里有问题
  127. 2. **弯路**:哪些尝试是不必要的,有没有更直接的方法
  128. 3. **好的决策**:哪些判断和选择是正确的,值得记住
  129. 4. **工具使用**:哪些工具用法是高效的,哪些可以改进
  130. 请以简洁的规则列表形式输出,每条规则格式为:
  131. - 当遇到 [条件] 时,应该 [动作](原因:[简短说明])
  132. """
  133. def build_compression_prompt(goal_tree: Optional[GoalTree]) -> str:
  134. """构建 Level 2 压缩 prompt"""
  135. goal_prompt = ""
  136. if goal_tree:
  137. goal_prompt = goal_tree.to_prompt(include_summary=True)
  138. return COMPRESSION_PROMPT.format(goal_tree_prompt=goal_prompt)
  139. def build_reflect_prompt() -> str:
  140. """构建反思 prompt"""
  141. return REFLECT_PROMPT