elksmmx 1 неделя назад
Родитель
Сommit
3a5ced184d

+ 117 - 0
agent/core/runner.py

@@ -64,6 +64,7 @@ class RunConfig:
     enable_memory: bool = True
     auto_execute_tools: bool = True
     name: Optional[str] = None                 # 显示名称(空则由 utility_llm 自动生成)
+    enable_prompt_caching: bool = True         # 启用 Anthropic Prompt Caching(仅 Claude 模型有效)
 
     # --- Trace 控制 ---
     trace_id: Optional[str] = None             # None = 新建
@@ -680,6 +681,13 @@ class AgentRunner:
                 if context_injection:
                     llm_messages.append({"role": "system", "content": context_injection})
 
+            # 应用 Prompt Caching(不修改原始 history,只在发送给 LLM 时添加缓存标记)
+            llm_messages = self._add_cache_control(
+                llm_messages,
+                config.model,
+                config.enable_prompt_caching
+            )
+
             # 调用 LLM
             result = await self.llm_call(
                 messages=llm_messages,
@@ -695,6 +703,8 @@ class AgentRunner:
             prompt_tokens = result.get("prompt_tokens", 0)
             completion_tokens = result.get("completion_tokens", 0)
             step_cost = result.get("cost", 0)
+            cache_creation_tokens = result.get("cache_creation_tokens")
+            cache_read_tokens = result.get("cache_read_tokens")
 
             # 按需自动创建 root goal
             if goal_tree and not goal_tree.goals and tool_calls:
@@ -729,6 +739,8 @@ class AgentRunner:
                 content={"text": response_content, "tool_calls": tool_calls},
                 prompt_tokens=prompt_tokens,
                 completion_tokens=completion_tokens,
+                cache_creation_tokens=cache_creation_tokens,
+                cache_read_tokens=cache_read_tokens,
                 finish_reason=finish_reason,
                 cost=step_cost,
             )
@@ -923,6 +935,13 @@ class AgentRunner:
             reflect_prompt = build_reflect_prompt()
             reflect_messages = list(history) + [{"role": "user", "content": reflect_prompt}]
 
+            # 应用 Prompt Caching
+            reflect_messages = self._add_cache_control(
+                reflect_messages,
+                config.model,
+                config.enable_prompt_caching
+            )
+
             reflect_result = await self.llm_call(
                 messages=reflect_messages,
                 model=config.model,
@@ -948,6 +967,13 @@ class AgentRunner:
         compress_prompt = build_compression_prompt(goal_tree)
         compress_messages = list(history) + [{"role": "user", "content": compress_prompt}]
 
+        # 应用 Prompt Caching
+        compress_messages = self._add_cache_control(
+            compress_messages,
+            config.model,
+            config.enable_prompt_caching
+        )
+
         compress_result = await self.llm_call(
             messages=compress_messages,
             model=config.model,
@@ -1288,6 +1314,97 @@ class AgentRunner:
 
     # ===== 辅助方法 =====
 
+    def _add_cache_control(
+        self,
+        messages: List[Dict],
+        model: str,
+        enable: bool
+    ) -> List[Dict]:
+        """
+        为支持的模型添加 Prompt Caching 标记
+
+        策略:
+        1. system message 添加缓存(如果存在且足够长)
+        2. 倒数第 3-5 条 user/assistant 消息添加缓存点
+
+        Args:
+            messages: 原始消息列表
+            model: 模型名称
+            enable: 是否启用缓存
+
+        Returns:
+            添加了 cache_control 的消息列表(深拷贝)
+        """
+        if not enable:
+            return messages
+
+        # 只对 Claude 模型启用
+        if "claude" not in model.lower():
+            return messages
+
+        # 深拷贝避免修改原始数据
+        import copy
+        messages = copy.deepcopy(messages)
+
+        # 策略 1: 为 system message 添加缓存
+        for msg in messages:
+            if msg.get("role") == "system":
+                content = msg.get("content", "")
+                # 只有足够长的 system prompt 才值得缓存(>1024 tokens 约 4000 字符)
+                if isinstance(content, str) and len(content) > 1000:
+                    # Anthropic API 格式:在 content 的最后一个 block 添加 cache_control
+                    # 如果 content 是 string,需要转换为 list 格式
+                    msg["content"] = [
+                        {
+                            "type": "text",
+                            "text": content,
+                            "cache_control": {"type": "ephemeral"}
+                        }
+                    ]
+                    logger.debug(f"[Cache] 为 system message 添加缓存标记 (len={len(content)})")
+                break
+
+        # 策略 2: 为倒数第 3-5 条消息添加缓存点
+        # 这样可以缓存大部分历史对话,只有最新的几条消息是新的
+        cache_positions = []
+        user_assistant_msgs = [
+            (i, msg) for i, msg in enumerate(messages)
+            if msg.get("role") in ("user", "assistant")
+        ]
+
+        if len(user_assistant_msgs) >= 5:
+            # 在倒数第 5 条添加缓存点
+            cache_positions.append(user_assistant_msgs[-5][0])
+        elif len(user_assistant_msgs) >= 3:
+            # 在倒数第 3 条添加缓存点
+            cache_positions.append(user_assistant_msgs[-3][0])
+
+        for idx in cache_positions:
+            msg = messages[idx]
+            content = msg.get("content", "")
+
+            # 处理 string content
+            if isinstance(content, str):
+                msg["content"] = [
+                    {
+                        "type": "text",
+                        "text": content,
+                        "cache_control": {"type": "ephemeral"}
+                    }
+                ]
+                logger.debug(f"[Cache] 为 message[{idx}] ({msg.get('role')}) 添加缓存标记")
+
+            # 处理 list content(多模态消息)
+            elif isinstance(content, list) and len(content) > 0:
+                # 在最后一个 text block 添加 cache_control
+                for i in range(len(content) - 1, -1, -1):
+                    if isinstance(content[i], dict) and content[i].get("type") == "text":
+                        content[i]["cache_control"] = {"type": "ephemeral"}
+                        logger.debug(f"[Cache] 为 message[{idx}] ({msg.get('role')}) 的 content[{i}] 添加缓存标记")
+                        break
+
+        return messages
+
     def _get_tool_schemas(self, tools: Optional[List[str]]) -> List[Dict]:
         """
         获取工具 Schema

+ 20 - 2
agent/llm/openrouter.py

@@ -60,11 +60,20 @@ def _parse_openrouter_usage(usage: Dict[str, Any], model: str) -> TokenUsage:
     # OpenRouter 通常返回 OpenAI 格式,但可能包含额外字段
     if provider == "anthropic":
         # Claude 模型可能有缓存字段
+        # OpenRouter 使用 prompt_tokens_details 嵌套结构
+        prompt_details = usage.get("prompt_tokens_details", {})
+
+        # 调试:打印原始 usage
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug(f"[OpenRouter] Raw usage: {usage}")
+            logger.debug(f"[OpenRouter] prompt_tokens_details: {prompt_details}")
+
         return TokenUsage(
             input_tokens=usage.get("prompt_tokens") or usage.get("input_tokens", 0),
             output_tokens=usage.get("completion_tokens") or usage.get("output_tokens", 0),
-            cache_creation_tokens=usage.get("cache_creation_input_tokens", 0),
-            cache_read_tokens=usage.get("cache_read_input_tokens", 0),
+            # OpenRouter 格式:prompt_tokens_details.cached_tokens / cache_write_tokens
+            cache_read_tokens=prompt_details.get("cached_tokens", 0),
+            cache_creation_tokens=prompt_details.get("cache_write_tokens", 0),
         )
     elif provider == "deepseek":
         # DeepSeek 可能有 reasoning_tokens
@@ -180,6 +189,15 @@ async def openrouter_llm_call(
     if "max_tokens" in kwargs:
         payload["max_tokens"] = kwargs["max_tokens"]
 
+    # 对于 Anthropic 模型,锁定 provider 以确保缓存生效
+    if "anthropic" in model.lower() or "claude" in model.lower():
+        payload["provider"] = {
+            "only": ["Anthropic"],
+            "allow_fallbacks": False,
+            "require_parameters": True
+        }
+        logger.debug("[OpenRouter] Locked provider to Anthropic for caching support")
+
     # OpenRouter 特定参数
     headers = {
         "Authorization": f"Bearer {api_key}",

+ 3 - 3
agent/trace/models.py

@@ -405,11 +405,11 @@ class Message:
         # 只添加非空的可选字段
         if self.abandoned_at:
             result["abandoned_at"] = self.abandoned_at.isoformat()
-        if self.reasoning_tokens:
+        if self.reasoning_tokens is not None:
             result["reasoning_tokens"] = self.reasoning_tokens
-        if self.cache_creation_tokens:
+        if self.cache_creation_tokens is not None:
             result["cache_creation_tokens"] = self.cache_creation_tokens
-        if self.cache_read_tokens:
+        if self.cache_read_tokens is not None:
             result["cache_read_tokens"] = self.cache_read_tokens
         return result
 

+ 128 - 0
examples/test_cache/run.py

@@ -0,0 +1,128 @@
+"""
+测试 Prompt Caching 功能
+"""
+
+import asyncio
+import os
+import sys
+from pathlib import Path
+
+# 添加项目根目录到 Python 路径
+sys.path.insert(0, str(Path(__file__).parent.parent.parent))
+
+from dotenv import load_dotenv
+load_dotenv()
+
+import logging
+# 开启 DEBUG 日志查看缓存标记
+logging.basicConfig(level=logging.DEBUG)
+
+from agent.core.runner import AgentRunner, RunConfig
+from agent.trace import FileSystemTraceStore, Trace, Message
+from agent.llm import create_openrouter_llm_call
+
+async def main():
+    print("=" * 60)
+    print("测试 Prompt Caching 功能")
+    print("=" * 60)
+    print()
+
+    # 路径配置
+    base_dir = Path(__file__).parent
+    project_root = base_dir.parent.parent
+    trace_dir = project_root / ".trace"
+
+    # 创建 Runner
+    runner = AgentRunner(
+        trace_store=FileSystemTraceStore(base_path=str(trace_dir)),
+        llm_call=create_openrouter_llm_call(model="anthropic/claude-sonnet-4.5"),
+        debug=True
+    )
+
+    # 准备测试消息(足够长的 system prompt)
+    system_prompt = """你是一个专业的 AI 助手。
+
+## 核心能力
+- 代码分析和生成
+- 问题解决和调试
+- 技术文档编写
+- 架构设计建议
+
+## 工作原则
+1. 准确性优先:确保提供的信息和代码是正确的
+2. 清晰表达:用简洁明了的语言解释复杂概念
+3. 实用导向:提供可直接使用的解决方案
+4. 持续学习:根据反馈不断改进
+
+## 技术栈
+- Python, JavaScript, TypeScript
+- React, Vue, Node.js
+- Docker, Kubernetes
+- PostgreSQL, MongoDB, Redis
+- AWS, GCP, Azure
+
+这是一个足够长的 system prompt,用于测试 Anthropic Prompt Caching 功能。
+缓存需要至少 1024 tokens 才能生效,所以我们需要让这个 prompt 足够长。
+""" * 3  # 重复 3 次确保足够长
+
+    messages = [
+        {"role": "user", "content": "请简单介绍一下 Python 的特点,用 3 句话概括"}
+    ]
+
+    print("第一次调用(创建缓存)...")
+    print("-" * 60)
+
+    trace_id = None
+    iteration = 0
+
+    async for item in runner.run(
+        messages=messages,
+        config=RunConfig(
+            system_prompt=system_prompt,
+            model="anthropic/claude-sonnet-4.5",
+            temperature=0.3,
+            max_iterations=3,
+            enable_prompt_caching=True,  # 启用缓存
+            name="缓存测试"
+        )
+    ):
+        if isinstance(item, Trace):
+            trace_id = item.trace_id
+            if item.status == "completed":
+                print(f"\n✓ Trace 完成")
+                print(f"  Total tokens: {item.total_tokens}")
+                print(f"  Total cost: ${item.total_cost:.6f}")
+
+        elif isinstance(item, Message):
+            if item.role == "assistant":
+                iteration += 1
+                print(f"\n[Iteration {iteration}]")
+                print(f"  Prompt tokens: {item.prompt_tokens}")
+                print(f"  Completion tokens: {item.completion_tokens}")
+                print(f"  Cache creation: {item.cache_creation_tokens}")
+                print(f"  Cache read: {item.cache_read_tokens}")
+                print(f"  Cost: ${item.cost:.6f}")
+
+                content = item.content
+                if isinstance(content, dict):
+                    text = content.get("text", "")
+                    if text:
+                        preview = text[:100] + "..." if len(text) > 100 else text
+                        print(f"  Response: {preview}")
+
+    print()
+    print("=" * 60)
+    print("测试完成")
+    print("=" * 60)
+    print()
+
+    if trace_id:
+        print("验证要点:")
+        print("1. 第一次调用应该有 cache_creation_tokens > 0")
+        print("2. 后续调用应该有 cache_read_tokens > 0")
+        print("3. cache_read_tokens 的成本应该是正常 input tokens 的 10%")
+        print()
+        print(f"Trace ID: {trace_id}")
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 138 - 0
examples/test_cache/run_multi.py

@@ -0,0 +1,138 @@
+"""
+测试多轮对话的 Prompt Caching
+"""
+
+import asyncio
+import os
+import sys
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).parent.parent.parent))
+
+from dotenv import load_dotenv
+load_dotenv()
+
+from agent.core.runner import AgentRunner, RunConfig
+from agent.trace import FileSystemTraceStore, Trace, Message
+from agent.llm import create_openrouter_llm_call
+
+async def main():
+    print("=" * 60)
+    print("测试多轮对话 Prompt Caching")
+    print("=" * 60)
+    print()
+
+    base_dir = Path(__file__).parent
+    project_root = base_dir.parent.parent
+    trace_dir = project_root / ".trace"
+
+    runner = AgentRunner(
+        trace_store=FileSystemTraceStore(base_path=str(trace_dir)),
+        llm_call=create_openrouter_llm_call(model="anthropic/claude-sonnet-4.5"),
+        debug=True
+    )
+
+    # 超长 system prompt 确保 >1024 tokens
+    system_prompt = """你是一个专业的 AI 助手,专注于帮助用户解决技术问题。
+
+## 核心能力
+- 代码分析和生成
+- 问题解决和调试
+- 技术文档编写
+- 架构设计建议
+- 性能优化建议
+- 安全审计
+
+## 工作原则
+1. 准确性优先:确保提供的信息和代码是正确的
+2. 清晰表达:用简洁明了的语言解释复杂概念
+3. 实用导向:提供可直接使用的解决方案
+4. 持续学习:根据反馈不断改进
+5. 安全意识:始终考虑安全性和最佳实践
+6. 性能考虑:提供高效的解决方案
+
+## 技术栈
+- 编程语言:Python, JavaScript, TypeScript, Go, Rust, Java
+- 前端框架:React, Vue, Angular, Svelte
+- 后端框架:Node.js, Django, Flask, FastAPI, Spring Boot
+- 数据库:PostgreSQL, MongoDB, Redis, MySQL, Elasticsearch
+- 云平台:AWS, GCP, Azure
+- DevOps:Docker, Kubernetes, CI/CD, Terraform
+- 机器学习:TensorFlow, PyTorch, scikit-learn
+
+## 响应格式
+- 提供清晰的步骤说明
+- 包含代码示例
+- 解释关键概念
+- 指出潜在问题
+- 给出最佳实践建议
+
+这是一个足够长的 system prompt,用于测试 Anthropic Prompt Caching 功能。
+缓存需要至少 1024 tokens 才能生效,所以我们需要让这个 prompt 足够长。
+""" * 5  # 重复 5 次确保足够长
+
+    messages = [
+        {"role": "user", "content": "请用一句话介绍 Python"}
+    ]
+
+    print("开始多轮对话测试...")
+    print("-" * 60)
+
+    trace_id = None
+    iteration = 0
+
+    async for item in runner.run(
+        messages=messages,
+        config=RunConfig(
+            system_prompt=system_prompt,
+            model="anthropic/claude-sonnet-4.5",
+            temperature=0.3,
+            max_iterations=5,  # 多轮对话
+            enable_prompt_caching=True,
+            name="多轮缓存测试"
+        )
+    ):
+        if isinstance(item, Trace):
+            trace_id = item.trace_id
+            if item.status == "completed":
+                print(f"\n✓ Trace 完成")
+                print(f"  Total messages: {item.total_messages}")
+                print(f"  Total tokens: {item.total_tokens}")
+                print(f"  Total cache creation: {item.total_cache_creation_tokens}")
+                print(f"  Total cache read: {item.total_cache_read_tokens}")
+                print(f"  Total cost: ${item.total_cost:.6f}")
+
+        elif isinstance(item, Message):
+            if item.role == "assistant":
+                iteration += 1
+                print(f"\n[Iteration {iteration}]")
+                print(f"  Prompt tokens: {item.prompt_tokens}")
+                print(f"  Completion tokens: {item.completion_tokens}")
+                print(f"  Cache creation: {item.cache_creation_tokens}")
+                print(f"  Cache read: {item.cache_read_tokens}")
+                print(f"  Cost: ${item.cost:.6f}")
+
+                content = item.content
+                if isinstance(content, dict):
+                    text = content.get("text", "")
+                    tool_calls = content.get("tool_calls")
+                    if text and not tool_calls:
+                        preview = text[:80] + "..." if len(text) > 80 else text
+                        print(f"  Response: {preview}")
+                    if tool_calls:
+                        print(f"  Tool calls: {len(tool_calls)}")
+
+    print()
+    print("=" * 60)
+    print("测试完成")
+    print("=" * 60)
+    print()
+
+    if trace_id:
+        print("分析:")
+        print("- 第 1 次调用:应该有 cache_creation_tokens > 0(创建缓存)")
+        print("- 第 2+ 次调用:应该有 cache_read_tokens > 0(命中缓存)")
+        print(f"\nTrace ID: {trace_id}")
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 259 - 0
examples/test_cache/run_same_trace.py

@@ -0,0 +1,259 @@
+"""
+在同一个 Trace 内测试 Prompt Caching
+
+测试场景:
+1. 第一轮对话:创建缓存(system prompt + 工具定义)
+2. 第二轮对话:命中缓存(system prompt + 工具定义 + 第一轮历史)
+3. 第三轮对话:命中更多缓存(system prompt + 工具定义 + 前两轮历史)
+"""
+
+import asyncio
+import os
+import sys
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).parent.parent.parent))
+
+from dotenv import load_dotenv
+load_dotenv()
+
+import logging
+logging.basicConfig(level=logging.DEBUG)
+
+from agent.core.runner import AgentRunner, RunConfig
+from agent.trace import FileSystemTraceStore, Trace, Message
+from agent.llm import create_openrouter_llm_call
+
+async def main():
+    print("=" * 60)
+    print("同一 Trace 内的 Prompt Caching 测试")
+    print("=" * 60)
+    print()
+
+    base_dir = Path(__file__).parent
+    project_root = base_dir.parent.parent
+    trace_dir = project_root / ".trace"
+
+    runner = AgentRunner(
+        trace_store=FileSystemTraceStore(base_path=str(trace_dir)),
+        llm_call=create_openrouter_llm_call(model="anthropic/claude-sonnet-4.5"),
+        debug=True
+    )
+
+    # 构造 >1500 tokens 的稳定前缀
+    stable_prefix = """你是一个专业的 AI 技术顾问,专注于软件工程和系统架构。
+
+## 核心专业领域
+
+### 1. 编程语言与框架
+- **Python**: Django, Flask, FastAPI, Celery, SQLAlchemy, Pandas, NumPy
+- **JavaScript/TypeScript**: React, Vue, Angular, Node.js, Express, NestJS
+- **Go**: Gin, Echo, gRPC, Cobra
+- **Rust**: Actix, Rocket, Tokio
+- **Java**: Spring Boot, Hibernate, Maven, Gradle
+
+### 2. 数据库技术
+- **关系型数据库**: PostgreSQL, MySQL, Oracle, SQL Server
+- **NoSQL 数据库**: MongoDB, Redis, Cassandra, DynamoDB
+- **时序数据库**: InfluxDB, TimescaleDB
+- **图数据库**: Neo4j, ArangoDB
+- **搜索引擎**: Elasticsearch, Solr
+
+### 3. 云平台与基础设施
+- **AWS**: EC2, S3, Lambda, RDS, DynamoDB, CloudFormation, ECS, EKS
+- **GCP**: Compute Engine, Cloud Storage, Cloud Functions, BigQuery, GKE
+- **Azure**: Virtual Machines, Blob Storage, Functions, Cosmos DB, AKS
+- **容器化**: Docker, Docker Compose, Podman
+- **编排**: Kubernetes, Helm, Istio, Linkerd
+
+### 4. DevOps 与 CI/CD
+- **版本控制**: Git, GitHub, GitLab, Bitbucket
+- **CI/CD**: Jenkins, GitLab CI, GitHub Actions, CircleCI, Travis CI
+- **配置管理**: Ansible, Terraform, Puppet, Chef
+- **监控告警**: Prometheus, Grafana, ELK Stack, Datadog, New Relic
+- **日志管理**: Fluentd, Logstash, Loki
+
+### 5. 架构模式
+- **微服务架构**: 服务拆分、API 网关、服务发现、熔断降级
+- **事件驱动架构**: 消息队列、事件溯源、CQRS
+- **Serverless 架构**: FaaS、BaaS、无服务器框架
+- **分布式系统**: CAP 理论、一致性协议、分布式事务
+- **高可用设计**: 负载均衡、故障转移、灾备恢复
+
+### 6. 安全最佳实践
+- **认证授权**: OAuth 2.0, JWT, SAML, OpenID Connect
+- **加密技术**: TLS/SSL, AES, RSA, 哈希算法
+- **安全审计**: 漏洞扫描、渗透测试、安全合规
+- **数据保护**: 数据脱敏、访问控制、审计日志
+
+### 7. 性能优化
+- **缓存策略**: Redis, Memcached, CDN, 浏览器缓存
+- **数据库优化**: 索引设计、查询优化、分库分表
+- **代码优化**: 算法复杂度、并发编程、异步处理
+- **系统调优**: 负载测试、性能分析、资源监控
+
+### 8. 机器学习与 AI
+- **深度学习框架**: TensorFlow, PyTorch, Keras
+- **模型部署**: TensorFlow Serving, TorchServe, ONNX
+- **MLOps**: MLflow, Kubeflow, SageMaker
+- **自然语言处理**: Transformers, BERT, GPT, LangChain
+
+## 工作原则
+
+1. **准确性优先**: 提供经过验证的技术方案,避免误导
+2. **实用导向**: 给出可直接应用的代码示例和配置
+3. **最佳实践**: 遵循行业标准和社区共识
+4. **安全意识**: 始终考虑安全性和隐私保护
+5. **性能考虑**: 关注系统性能和资源效率
+6. **可维护性**: 代码清晰、文档完善、易于扩展
+7. **成本意识**: 平衡技术方案与成本投入
+
+## 响应格式
+
+### 问题分析
+- 理解用户需求和上下文
+- 识别关键技术挑战
+- 评估可行性和风险
+
+### 解决方案
+- 提供清晰的实现步骤
+- 包含完整的代码示例
+- 解释关键技术点
+- 指出潜在问题和注意事项
+
+### 最佳实践建议
+- 性能优化建议
+- 安全加固措施
+- 可扩展性考虑
+- 运维监控方案
+
+### 替代方案
+- 列出其他可行方案
+- 对比优缺点
+- 给出选择建议
+
+## 技术栈版本参考
+
+- Python: 3.11+
+- Node.js: 20 LTS
+- PostgreSQL: 15+
+- Redis: 7+
+- Kubernetes: 1.28+
+- Docker: 24+
+
+这是一个足够长且稳定的 system prompt,用于测试 Anthropic Prompt Caching。
+此内容在所有请求中保持完全一致,以确保缓存能够命中。
+Version: 3.0
+""" * 2  # 重复 2 次,确保 >1500 tokens
+
+    print(f"System prompt 长度: {len(stable_prefix)} 字符")
+    print(f"预估 tokens: ~{len(stable_prefix) // 4}")
+    print()
+
+    trace_id = None
+
+    # 第一轮对话
+    print("=" * 60)
+    print("第 1 轮对话:创建缓存")
+    print("=" * 60)
+
+    async for item in runner.run(
+        messages=[{"role": "user", "content": "请用一句话介绍 Python"}],
+        config=RunConfig(
+            system_prompt=stable_prefix,
+            model="anthropic/claude-sonnet-4.5",
+            temperature=0.3,
+            max_iterations=1,
+            enable_prompt_caching=True,
+            name="同一Trace缓存测试"
+        )
+    ):
+        if isinstance(item, Trace):
+            trace_id = item.trace_id
+            if item.status == "completed":
+                print(f"\n✓ 第 1 轮完成")
+                print(f"  Total tokens: {item.total_tokens}")
+                print(f"  Cache write: {item.total_cache_creation_tokens}")
+                print(f"  Cache read: {item.total_cache_read_tokens}")
+                print(f"  Cost: ${item.total_cost:.6f}")
+        elif isinstance(item, Message) and item.role == "assistant":
+            print(f"\n[Response] {item.content.get('text', '')[:100]}...")
+            print(f"  Prompt tokens: {item.prompt_tokens}")
+            print(f"  Cache write: {item.cache_creation_tokens}")
+            print(f"  Cache read: {item.cache_read_tokens}")
+
+    print("\n等待 2 秒...")
+    await asyncio.sleep(2)
+
+    # 第二轮对话(续跑同一个 trace)
+    print("\n" + "=" * 60)
+    print("第 2 轮对话:应该命中缓存(system + 第1轮历史)")
+    print("=" * 60)
+
+    async for item in runner.run(
+        messages=[{"role": "user", "content": "请用一句话介绍 JavaScript"}],
+        config=RunConfig(
+            trace_id=trace_id,  # 续跑同一个 trace
+            system_prompt=stable_prefix,
+            model="anthropic/claude-sonnet-4.5",
+            temperature=0.3,
+            max_iterations=1,
+            enable_prompt_caching=True,
+        )
+    ):
+        if isinstance(item, Trace) and item.status == "completed":
+            print(f"\n✓ 第 2 轮完成")
+            print(f"  Total tokens: {item.total_tokens}")
+            print(f"  Cache write: {item.total_cache_creation_tokens}")
+            print(f"  Cache read: {item.total_cache_read_tokens}")
+            print(f"  Cost: ${item.total_cost:.6f}")
+        elif isinstance(item, Message) and item.role == "assistant":
+            print(f"\n[Response] {item.content.get('text', '')[:100]}...")
+            print(f"  Prompt tokens: {item.prompt_tokens}")
+            print(f"  Cache write: {item.cache_creation_tokens}")
+            print(f"  Cache read: {item.cache_read_tokens}")
+
+    print("\n等待 2 秒...")
+    await asyncio.sleep(2)
+
+    # 第三轮对话(续跑同一个 trace)
+    print("\n" + "=" * 60)
+    print("第 3 轮对话:应该命中更多缓存(system + 前2轮历史)")
+    print("=" * 60)
+
+    async for item in runner.run(
+        messages=[{"role": "user", "content": "请用一句话介绍 Go"}],
+        config=RunConfig(
+            trace_id=trace_id,  # 续跑同一个 trace
+            system_prompt=stable_prefix,
+            model="anthropic/claude-sonnet-4.5",
+            temperature=0.3,
+            max_iterations=1,
+            enable_prompt_caching=True,
+        )
+    ):
+        if isinstance(item, Trace) and item.status == "completed":
+            print(f"\n✓ 第 3 轮完成")
+            print(f"  Total tokens: {item.total_tokens}")
+            print(f"  Cache write: {item.total_cache_creation_tokens}")
+            print(f"  Cache read: {item.total_cache_read_tokens}")
+            print(f"  Cost: ${item.total_cost:.6f}")
+        elif isinstance(item, Message) and item.role == "assistant":
+            print(f"\n[Response] {item.content.get('text', '')[:100]}...")
+            print(f"  Prompt tokens: {item.prompt_tokens}")
+            print(f"  Cache write: {item.cache_creation_tokens}")
+            print(f"  Cache read: {item.cache_read_tokens}")
+
+    print("\n" + "=" * 60)
+    print("测试完成")
+    print("=" * 60)
+    print()
+    print("预期结果:")
+    print("- 第 1 轮:cache_write > 0(创建缓存)")
+    print("- 第 2 轮:cache_read > 0(命中 system prompt 缓存)")
+    print("- 第 3 轮:cache_read 更大(命中 system + 历史消息缓存)")
+    print()
+    print(f"Trace ID: {trace_id}")
+
+if __name__ == "__main__":
+    asyncio.run(main())

+ 244 - 0
examples/test_cache/run_strict.py

@@ -0,0 +1,244 @@
+"""
+严格的 Prompt Caching 验证测试
+
+按照 OpenRouter + Anthropic 的规范:
+1. 使用 prompt_tokens_details.cached_tokens / cache_write_tokens
+2. 锁定 provider 为 Anthropic
+3. 使用 >1500 tokens 的稳定前缀
+4. 在 5 分钟内多次请求
+"""
+
+import asyncio
+import os
+import sys
+from pathlib import Path
+import time
+
+sys.path.insert(0, str(Path(__file__).parent.parent.parent))
+
+from dotenv import load_dotenv
+load_dotenv()
+
+import logging
+logging.basicConfig(level=logging.DEBUG)
+
+from agent.core.runner import AgentRunner, RunConfig
+from agent.trace import FileSystemTraceStore, Trace, Message
+from agent.llm import create_openrouter_llm_call
+
+async def main():
+    print("=" * 60)
+    print("严格的 Prompt Caching 验证测试")
+    print("=" * 60)
+    print()
+
+    base_dir = Path(__file__).parent
+    project_root = base_dir.parent.parent
+    trace_dir = project_root / ".trace"
+
+    runner = AgentRunner(
+        trace_store=FileSystemTraceStore(base_path=str(trace_dir)),
+        llm_call=create_openrouter_llm_call(model="anthropic/claude-sonnet-4.5"),
+        debug=True
+    )
+
+    # 构造 >1500 tokens 的稳定前缀(约 6000 字符)
+    # 这段内容在所有请求中完全不变
+    stable_prefix = """你是一个专业的 AI 技术顾问,专注于软件工程和系统架构。
+
+## 核心专业领域
+
+### 1. 编程语言与框架
+- **Python**: Django, Flask, FastAPI, Celery, SQLAlchemy, Pandas, NumPy
+- **JavaScript/TypeScript**: React, Vue, Angular, Node.js, Express, NestJS
+- **Go**: Gin, Echo, gRPC, Cobra
+- **Rust**: Actix, Rocket, Tokio
+- **Java**: Spring Boot, Hibernate, Maven, Gradle
+
+### 2. 数据库技术
+- **关系型数据库**: PostgreSQL, MySQL, Oracle, SQL Server
+- **NoSQL 数据库**: MongoDB, Redis, Cassandra, DynamoDB
+- **时序数据库**: InfluxDB, TimescaleDB
+- **图数据库**: Neo4j, ArangoDB
+- **搜索引擎**: Elasticsearch, Solr
+
+### 3. 云平台与基础设施
+- **AWS**: EC2, S3, Lambda, RDS, DynamoDB, CloudFormation, ECS, EKS
+- **GCP**: Compute Engine, Cloud Storage, Cloud Functions, BigQuery, GKE
+- **Azure**: Virtual Machines, Blob Storage, Functions, Cosmos DB, AKS
+- **容器化**: Docker, Docker Compose, Podman
+- **编排**: Kubernetes, Helm, Istio, Linkerd
+
+### 4. DevOps 与 CI/CD
+- **版本控制**: Git, GitHub, GitLab, Bitbucket
+- **CI/CD**: Jenkins, GitLab CI, GitHub Actions, CircleCI, Travis CI
+- **配置管理**: Ansible, Terraform, Puppet, Chef
+- **监控告警**: Prometheus, Grafana, ELK Stack, Datadog, New Relic
+- **日志管理**: Fluentd, Logstash, Loki
+
+### 5. 架构模式
+- **微服务架构**: 服务拆分、API 网关、服务发现、熔断降级
+- **事件驱动架构**: 消息队列、事件溯源、CQRS
+- **Serverless 架构**: FaaS、BaaS、无服务器框架
+- **分布式系统**: CAP 理论、一致性协议、分布式事务
+- **高可用设计**: 负载均衡、故障转移、灾备恢复
+
+### 6. 安全最佳实践
+- **认证授权**: OAuth 2.0, JWT, SAML, OpenID Connect
+- **加密技术**: TLS/SSL, AES, RSA, 哈希算法
+- **安全审计**: 漏洞扫描、渗透测试、安全合规
+- **数据保护**: 数据脱敏、访问控制、审计日志
+
+### 7. 性能优化
+- **缓存策略**: Redis, Memcached, CDN, 浏览器缓存
+- **数据库优化**: 索引设计、查询优化、分库分表
+- **代码优化**: 算法复杂度、并发编程、异步处理
+- **系统调优**: 负载测试、性能分析、资源监控
+
+### 8. 机器学习与 AI
+- **深度学习框架**: TensorFlow, PyTorch, Keras
+- **模型部署**: TensorFlow Serving, TorchServe, ONNX
+- **MLOps**: MLflow, Kubeflow, SageMaker
+- **自然语言处理**: Transformers, BERT, GPT, LangChain
+
+## 工作原则
+
+1. **准确性优先**: 提供经过验证的技术方案,避免误导
+2. **实用导向**: 给出可直接应用的代码示例和配置
+3. **最佳实践**: 遵循行业标准和社区共识
+4. **安全意识**: 始终考虑安全性和隐私保护
+5. **性能考虑**: 关注系统性能和资源效率
+6. **可维护性**: 代码清晰、文档完善、易于扩展
+7. **成本意识**: 平衡技术方案与成本投入
+
+## 响应格式
+
+### 问题分析
+- 理解用户需求和上下文
+- 识别关键技术挑战
+- 评估可行性和风险
+
+### 解决方案
+- 提供清晰的实现步骤
+- 包含完整的代码示例
+- 解释关键技术点
+- 指出潜在问题和注意事项
+
+### 最佳实践建议
+- 性能优化建议
+- 安全加固措施
+- 可扩展性考虑
+- 运维监控方案
+
+### 替代方案
+- 列出其他可行方案
+- 对比优缺点
+- 给出选择建议
+
+## 技术栈版本参考
+
+- Python: 3.11+
+- Node.js: 20 LTS
+- PostgreSQL: 15+
+- Redis: 7+
+- Kubernetes: 1.28+
+- Docker: 24+
+
+这是一个足够长且稳定的 system prompt,用于测试 Anthropic Prompt Caching。
+此内容在所有请求中保持完全一致,以确保缓存能够命中。
+Version: 2.0
+""" * 2  # 重复 2 次,确保 >1500 tokens
+
+    print(f"System prompt 长度: {len(stable_prefix)} 字符")
+    print(f"预估 tokens: ~{len(stable_prefix) // 4}")
+    print()
+
+    # 第一次请求:创建缓存
+    print("=" * 60)
+    print("第 1 次请求:创建缓存")
+    print("=" * 60)
+
+    messages1 = [
+        {"role": "user", "content": "请用一句话介绍 Python"}
+    ]
+
+    trace_id_1 = None
+    async for item in runner.run(
+        messages=messages1,
+        config=RunConfig(
+            system_prompt=stable_prefix,
+            model="anthropic/claude-sonnet-4.5",
+            temperature=0.3,
+            max_iterations=1,
+            enable_prompt_caching=True,
+            name="缓存测试-第1次"
+        )
+    ):
+        if isinstance(item, Trace):
+            trace_id_1 = item.trace_id
+            if item.status == "completed":
+                print(f"\n✓ 第 1 次完成")
+                print(f"  Total tokens: {item.total_tokens}")
+                print(f"  Cache write: {item.total_cache_creation_tokens}")
+                print(f"  Cache read: {item.total_cache_read_tokens}")
+                print(f"  Cost: ${item.total_cost:.6f}")
+
+        elif isinstance(item, Message) and item.role == "assistant":
+            print(f"\n[Response]")
+            print(f"  Prompt tokens: {item.prompt_tokens}")
+            print(f"  Cache write: {item.cache_creation_tokens}")
+            print(f"  Cache read: {item.cache_read_tokens}")
+
+    # 等待 2 秒,确保缓存已生效
+    print("\n等待 2 秒...")
+    await asyncio.sleep(2)
+
+    # 第二次请求:应该命中缓存
+    print("\n" + "=" * 60)
+    print("第 2 次请求:应该命中缓存")
+    print("=" * 60)
+
+    messages2 = [
+        {"role": "user", "content": "请用一句话介绍 JavaScript"}
+    ]
+
+    trace_id_2 = None
+    async for item in runner.run(
+        messages=messages2,
+        config=RunConfig(
+            system_prompt=stable_prefix,  # 完全相同的 system prompt
+            model="anthropic/claude-sonnet-4.5",
+            temperature=0.3,
+            max_iterations=1,
+            enable_prompt_caching=True,
+            name="缓存测试-第2次"
+        )
+    ):
+        if isinstance(item, Trace):
+            trace_id_2 = item.trace_id
+            if item.status == "completed":
+                print(f"\n✓ 第 2 次完成")
+                print(f"  Total tokens: {item.total_tokens}")
+                print(f"  Cache write: {item.total_cache_creation_tokens}")
+                print(f"  Cache read: {item.total_cache_read_tokens}")
+                print(f"  Cost: ${item.total_cost:.6f}")
+
+        elif isinstance(item, Message) and item.role == "assistant":
+            print(f"\n[Response]")
+            print(f"  Prompt tokens: {item.prompt_tokens}")
+            print(f"  Cache write: {item.cache_creation_tokens}")
+            print(f"  Cache read: {item.cache_read_tokens}")
+
+    print("\n" + "=" * 60)
+    print("测试完成")
+    print("=" * 60)
+    print()
+    print("预期结果:")
+    print("- 第 1 次:cache_write_tokens > 0(创建缓存)")
+    print("- 第 2 次:cached_tokens > 0(命中缓存)")
+    print()
+    print(f"Trace 1: {trace_id_1}")
+    print(f"Trace 2: {trace_id_2}")
+
+if __name__ == "__main__":
+    asyncio.run(main())