"""
Context compression — a two-level compression strategy.

Level 1: goal-completion compression (deterministic, zero LLM cost)
  - For completed/abandoned goals: keep goal-tool messages, remove non-goal
    tool messages
  - Three modes: none / on_complete / on_overflow

Level 2: LLM summarization (triggered only when still over the limit after
Level 1)
  - Compresses through a side-branch, multi-turn agent mode
  - After compression the history is rebuilt as:
    system prompt + first user message + summary

Compression never modifies storage: the original messages are always kept in
messages/; everything here is a purely in-memory operation.
"""

import copy
import json
import logging
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Set

from .goal_models import GoalTree
from .models import Message
from agent.core.prompts import (
    REFLECT_PROMPT,
    build_compression_eval_prompt,
)

logger = logging.getLogger(__name__)


# ===== Model context windows (tokens) =====

MODEL_CONTEXT_WINDOWS: Dict[str, int] = {
    # Anthropic Claude
    "claude-sonnet-4": 200_000,
    "claude-opus-4": 200_000,
    "claude-3-5-sonnet": 200_000,
    "claude-3-5-haiku": 200_000,
    "claude-3-opus": 200_000,
    "claude-3-sonnet": 200_000,
    "claude-3-haiku": 200_000,
    # OpenAI
    "gpt-4o": 128_000,
    "gpt-4o-mini": 128_000,
    "gpt-4-turbo": 128_000,
    "gpt-4": 8_192,
    "o1": 200_000,
    "o3-mini": 200_000,
    # Google Gemini
    "gemini-2.5-pro": 1_000_000,
    "gemini-2.5-flash": 1_000_000,
    "gemini-2.0-flash": 1_000_000,
    "gemini-1.5-pro": 2_000_000,
    "gemini-1.5-flash": 1_000_000,
    # DeepSeek
    "deepseek-chat": 64_000,
    "deepseek-r1": 64_000,
}

DEFAULT_CONTEXT_WINDOW = 200_000


def get_context_window(model: str) -> int:
    """
    Return the context-window size (in tokens) for a model name.

    Supports names with a provider prefix (e.g. "anthropic/claude-sonnet-4.5")
    and names with a version suffix (e.g. "claude-3-5-sonnet-20241022").

    Args:
        model: Model name; an empty or None value yields the default window.

    Returns:
        The window from MODEL_CONTEXT_WINDOWS, or DEFAULT_CONTEXT_WINDOW when
        no entry matches.
    """
    # Strip the provider prefix and normalise case.
    name = (model or "").split("/")[-1].lower()

    # Exact match.
    if name in MODEL_CONTEXT_WINDOWS:
        return MODEL_CONTEXT_WINDOWS[name]

    # Prefix match (handles version suffixes). Pick the LONGEST matching key
    # so the result does not depend on the table's insertion order
    # (e.g. "gpt-4-turbo-..." must resolve to "gpt-4-turbo", not "gpt-4").
    candidates = [key for key in MODEL_CONTEXT_WINDOWS if name.startswith(key)]
    if candidates:
        return MODEL_CONTEXT_WINDOWS[max(candidates, key=len)]

    return DEFAULT_CONTEXT_WINDOW


# ===== Configuration =====

@dataclass
class CompressionConfig:
    """Compression settings."""

    # Token limit; 0 means "auto" (context_window * threshold_ratio).
    max_tokens: int = 0
    # Compression threshold expressed as a fraction of the context window.
    threshold_ratio: float = 0.5
    # Level 1: always keep the most recent N messages.
    # NOTE(review): not read by any code in this file — confirm the consumer.
    keep_recent_messages: int = 10
    # Maximum message count (exceeding it triggers compression, 0 = disabled).
    # NOTE(review): not read by any code in this file — confirm the consumer.
    max_messages: int = 50

    def get_max_tokens(self, model: str) -> int:
        """Return the effective token limit (computed from the model when max_tokens is 0)."""
        if self.max_tokens > 0:
            return self.max_tokens
        window = get_context_window(model)
        return int(window * self.threshold_ratio)


# ===== Level 1: goal-completion compression =====

def _parse_tool_call_args(raw: Any) -> Dict[str, Any]:
    """Decode a tool call's ``arguments`` value into a dict; anything unusable becomes {}."""
    if isinstance(raw, str):
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            return {}
    else:
        parsed = raw
    # json.loads may legitimately return a list/str/number/None; only a dict
    # supports the ``.get("done")`` lookup performed by the caller.
    return parsed if isinstance(parsed, dict) else {}


def compress_completed_goals(
    messages: List[Message],
    goal_tree: Optional[GoalTree],
) -> List[Message]:
    """
    Level 1 compression: drop non-goal tool messages of completed/abandoned goals.

    For every completed/abandoned goal:
    - Keep: every assistant message that calls the goal tool, and its tool results
    - Remove: every assistant message that does not call the goal tool
      (including plain-text assistant messages), and its tool results
    - Replace: the tool result content of goal(done=...) calls with
      "具体执行过程已清理"
    - Messages whose goal_id is not a completed/abandoned goal (including
      goal_id None: system prompt, initial user message) are always kept
    - Messages of pending / in_progress goals are unaffected

    Pure in-memory operation: original Message objects are not mutated
    (replaced tool results are shallow copies) and nothing is persisted.

    Args:
        messages: Ordered list of messages on the main path (Message objects).
        goal_tree: GoalTree instance, or None.

    Returns:
        The compressed message list; the input list itself is returned when
        there is nothing to compress.
    """
    if not goal_tree or not goal_tree.goals:
        return messages

    # Collect the IDs of completed/abandoned goals.
    completed_ids: Set[str] = {
        g.id for g in goal_tree.goals
        if g.status in ("completed", "abandoned")
    }
    if not completed_ids:
        return messages

    # Pass 1: scan assistant messages and classify their tool_call ids.
    remove_seqs: Set[int] = set()    # sequence numbers of assistant messages to drop
    remove_tc_ids: Set[str] = set()  # tool_call_ids whose tool results are dropped
    done_tc_ids: Set[str] = set()    # tool_call_ids of goal(done=...) (result replaced)

    for msg in messages:
        if msg.goal_id not in completed_ids:
            continue
        if msg.role != "assistant":
            continue

        content = msg.content
        tc_list = []
        if isinstance(content, dict):
            tc_list = content.get("tool_calls", [])

        if not tc_list:
            # Plain-text assistant message (no tool calls): remove it.
            remove_seqs.add(msg.sequence)
            continue

        # Does this message contain a goal tool call?
        has_goal_call = False
        for tc in tc_list:
            func = tc.get("function") or {}
            if func.get("name", "") != "goal":
                continue
            has_goal_call = True
            # Is it a done call?
            args = _parse_tool_call_args(func.get("arguments", "{}"))
            if args.get("done") is not None:
                tc_id = tc.get("id")
                if tc_id:
                    done_tc_ids.add(tc_id)

        if not has_goal_call:
            # No goal tool call: drop the assistant message and all its tool results.
            remove_seqs.add(msg.sequence)
            for tc in tc_list:
                tc_id = tc.get("id")
                if tc_id:
                    remove_tc_ids.add(tc_id)

    # Nothing to compress.
    if not remove_seqs and not done_tc_ids:
        return messages

    # Pass 2: build the result.
    result: List[Message] = []
    for msg in messages:
        # Skip assistant messages marked for removal.
        if msg.sequence in remove_seqs:
            continue
        # Skip tool results marked for removal.
        if msg.role == "tool" and msg.tool_call_id in remove_tc_ids:
            continue
        # Replace the content of a done call's tool result (on a copy).
        if msg.role == "tool" and msg.tool_call_id in done_tc_ids:
            modified = copy.copy(msg)
            modified.content = {"tool_name": "goal", "result": "具体执行过程已清理"}
            result.append(modified)
            continue
        result.append(msg)

    return result


# ===== Token estimation =====

def estimate_tokens(messages: List[Dict[str, Any]]) -> int:
    """
    Estimate the token count of a list of LLM-format message dicts.

    CJK and ASCII characters use different coefficients:
    - ASCII/Latin characters: ~4 characters ≈ 1 token
    - CJK characters: ~1 character ≈ 1.5 tokens (BPE tokenizer behaviour)

    Counted parts: string content, text/image parts of list content, and the
    function name plus string arguments of each tool call.
    """
    total_tokens = 0
    for msg in messages:
        content = msg.get("content", "")
        if isinstance(content, str):
            total_tokens += _estimate_text_tokens(content)
        elif isinstance(content, list):
            for part in content:
                if isinstance(part, dict):
                    if part.get("type") == "text":
                        total_tokens += _estimate_text_tokens(part.get("text", ""))
                    elif part.get("type") in ("image_url", "image"):
                        total_tokens += _estimate_image_tokens(part)

        # tool_calls
        tool_calls = msg.get("tool_calls")
        if tool_calls and isinstance(tool_calls, list):
            for tc in tool_calls:
                if isinstance(tc, dict):
                    # "function"/"name" may be present but None; treat as empty.
                    func = tc.get("function") or {}
                    total_tokens += len(func.get("name") or "") // 4
                    args = func.get("arguments", "")
                    if isinstance(args, str):
                        total_tokens += _estimate_text_tokens(args)
    return total_tokens


def _estimate_text_tokens(text: str) -> int:
    """
    Estimate the token count of a text, distinguishing CJK from other characters.

    CJK characters are counted as 1.5 tokens each; every 4 other characters
    are counted as 1 token.
    """
    if not text:
        return 0

    cjk_chars = 0
    other_chars = 0
    for ch in text:
        if _is_cjk(ch):
            cjk_chars += 1
        else:
            other_chars += 1

    # CJK: 1 char ≈ 1.5 tokens; ASCII: 4 chars ≈ 1 token
    return int(cjk_chars * 1.5) + other_chars // 4


def _estimate_image_tokens(block: Dict[str, Any]) -> int:
    """
    Estimate the token cost of an image content block.

    Anthropic's formula: tokens = (width * height) / 750.
    Real dimensions from ``_image_meta`` are preferred; otherwise the size is
    roughly estimated from the base64 payload. The result is never below
    1600 tokens.
    """
    MIN_IMAGE_TOKENS = 1600

    # Prefer the real dimensions from _image_meta.
    meta = block.get("_image_meta")
    if meta and meta.get("width") and meta.get("height"):
        tokens = (meta["width"] * meta["height"]) // 750
        return max(MIN_IMAGE_TOKENS, tokens)

    # Fallback: rough estimate from the base64 data length.
    b64_data = ""
    if block.get("type") == "image":
        source = block.get("source") or {}
        if source.get("type") == "base64":
            b64_data = source.get("data", "")
    elif block.get("type") == "image_url":
        url_obj = block.get("image_url", {})
        url = url_obj.get("url", "") if isinstance(url_obj, dict) else str(url_obj)
        if url.startswith("data:"):
            _, _, b64_data = url.partition(",")

    if b64_data:
        # A base64 string is about 4/3 the size of the raw bytes.
        raw_bytes = len(b64_data) * 3 // 4
        # Rough guess: JPEG compression ~10:1, pixels ≈ raw_bytes * 10 / 3 (RGB).
        estimated_pixels = raw_bytes * 10 // 3
        estimated_tokens = estimated_pixels // 750
        return max(MIN_IMAGE_TOKENS, estimated_tokens)

    return MIN_IMAGE_TOKENS


def _is_cjk(ch: str) -> bool:
    """Return True if the character falls in one of the CJK code-point ranges below."""
    cp = ord(ch)
    return (
        0x2E80 <= cp <= 0x9FFF        # CJK radicals, strokes, compatibility + basic ideographs
        or 0xF900 <= cp <= 0xFAFF     # CJK compatibility ideographs
        or 0xFE30 <= cp <= 0xFE4F     # CJK compatibility forms
        or 0x20000 <= cp <= 0x2FA1F   # CJK extensions B-F + compatibility supplement
        or 0x3000 <= cp <= 0x303F     # CJK punctuation (already inside the first range)
        or 0xFF00 <= cp <= 0xFFEF     # full-width characters
    )


def estimate_tokens_from_messages(messages: List[Message]) -> int:
    """Estimate the token count of a list of Message objects (via ``to_llm_dict()``)."""
    return estimate_tokens([msg.to_llm_dict() for msg in messages])


def needs_level2_compression(
    token_count: int,
    config: CompressionConfig,
    model: str = "",
) -> bool:
    """
    Decide whether Level 2 compression must be triggered.

    Args:
        token_count: Estimated token count of the current history.
        config: Compression settings.
        model: Model name used to derive the limit when ``config.max_tokens``
            is 0; may be empty.

    Returns:
        True when ``token_count`` exceeds the effective limit.
    """
    # Always resolve through get_max_tokens: with max_tokens == 0 ("auto") and
    # no model name, using the raw 0 as the limit would flag every non-empty
    # history. An empty model falls back to DEFAULT_CONTEXT_WINDOW instead.
    limit = config.get_max_tokens(model or "")
    return token_count > limit


# ===== Level 2: compression prompts =====

# Note: these prompts have moved to agent.core.prompts; REFLECT_PROMPT and the
# compression-eval prompt builder are now imported from prompts.py.

def build_compression_prompt(goal_tree: Optional[GoalTree]) -> str:
    """Build the Level 2 compression prompt, embedding the goal tree when one is given."""
    goal_prompt = ""
    if goal_tree:
        goal_prompt = goal_tree.to_prompt(include_summary=True)

    return build_compression_eval_prompt(
        goal_tree_prompt=goal_prompt,
    )


def build_reflect_prompt() -> str:
    """Build the reflection prompt."""
    return REFLECT_PROMPT