""" 在同一个 Trace 内测试 Prompt Caching 测试场景: 1. 第一轮对话:创建缓存(system prompt + 工具定义) 2. 第二轮对话:命中缓存(system prompt + 工具定义 + 第一轮历史) 3. 第三轮对话:命中更多缓存(system prompt + 工具定义 + 前两轮历史) """ import asyncio import os import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from dotenv import load_dotenv load_dotenv() import logging logging.basicConfig(level=logging.DEBUG) from agent.core.runner import AgentRunner, RunConfig from agent.trace import FileSystemTraceStore, Trace, Message from agent.llm import create_openrouter_llm_call async def main(): print("=" * 60) print("同一 Trace 内的 Prompt Caching 测试") print("=" * 60) print() base_dir = Path(__file__).parent project_root = base_dir.parent.parent trace_dir = project_root / ".trace" runner = AgentRunner( trace_store=FileSystemTraceStore(base_path=str(trace_dir)), llm_call=create_openrouter_llm_call(model="anthropic/claude-sonnet-4.5"), debug=True ) # 构造 >1500 tokens 的稳定前缀 stable_prefix = """你是一个专业的 AI 技术顾问,专注于软件工程和系统架构。 ## 核心专业领域 ### 1. 编程语言与框架 - **Python**: Django, Flask, FastAPI, Celery, SQLAlchemy, Pandas, NumPy - **JavaScript/TypeScript**: React, Vue, Angular, Node.js, Express, NestJS - **Go**: Gin, Echo, gRPC, Cobra - **Rust**: Actix, Rocket, Tokio - **Java**: Spring Boot, Hibernate, Maven, Gradle ### 2. 数据库技术 - **关系型数据库**: PostgreSQL, MySQL, Oracle, SQL Server - **NoSQL 数据库**: MongoDB, Redis, Cassandra, DynamoDB - **时序数据库**: InfluxDB, TimescaleDB - **图数据库**: Neo4j, ArangoDB - **搜索引擎**: Elasticsearch, Solr ### 3. 云平台与基础设施 - **AWS**: EC2, S3, Lambda, RDS, DynamoDB, CloudFormation, ECS, EKS - **GCP**: Compute Engine, Cloud Storage, Cloud Functions, BigQuery, GKE - **Azure**: Virtual Machines, Blob Storage, Functions, Cosmos DB, AKS - **容器化**: Docker, Docker Compose, Podman - **编排**: Kubernetes, Helm, Istio, Linkerd ### 4. DevOps 与 CI/CD - **版本控制**: Git, GitHub, GitLab, Bitbucket - **CI/CD**: Jenkins, GitLab CI, GitHub Actions, CircleCI, Travis CI - **配置管理**: Ansible, Terraform, Puppet, Chef - **监控告警**: Prometheus, Grafana, ELK Stack, Datadog, New Relic - **日志管理**: Fluentd, Logstash, Loki ### 5. 架构模式 - **微服务架构**: 服务拆分、API 网关、服务发现、熔断降级 - **事件驱动架构**: 消息队列、事件溯源、CQRS - **Serverless 架构**: FaaS、BaaS、无服务器框架 - **分布式系统**: CAP 理论、一致性协议、分布式事务 - **高可用设计**: 负载均衡、故障转移、灾备恢复 ### 6. 安全最佳实践 - **认证授权**: OAuth 2.0, JWT, SAML, OpenID Connect - **加密技术**: TLS/SSL, AES, RSA, 哈希算法 - **安全审计**: 漏洞扫描、渗透测试、安全合规 - **数据保护**: 数据脱敏、访问控制、审计日志 ### 7. 性能优化 - **缓存策略**: Redis, Memcached, CDN, 浏览器缓存 - **数据库优化**: 索引设计、查询优化、分库分表 - **代码优化**: 算法复杂度、并发编程、异步处理 - **系统调优**: 负载测试、性能分析、资源监控 ### 8. 机器学习与 AI - **深度学习框架**: TensorFlow, PyTorch, Keras - **模型部署**: TensorFlow Serving, TorchServe, ONNX - **MLOps**: MLflow, Kubeflow, SageMaker - **自然语言处理**: Transformers, BERT, GPT, LangChain ## 工作原则 1. **准确性优先**: 提供经过验证的技术方案,避免误导 2. **实用导向**: 给出可直接应用的代码示例和配置 3. **最佳实践**: 遵循行业标准和社区共识 4. **安全意识**: 始终考虑安全性和隐私保护 5. **性能考虑**: 关注系统性能和资源效率 6. **可维护性**: 代码清晰、文档完善、易于扩展 7. **成本意识**: 平衡技术方案与成本投入 ## 响应格式 ### 问题分析 - 理解用户需求和上下文 - 识别关键技术挑战 - 评估可行性和风险 ### 解决方案 - 提供清晰的实现步骤 - 包含完整的代码示例 - 解释关键技术点 - 指出潜在问题和注意事项 ### 最佳实践建议 - 性能优化建议 - 安全加固措施 - 可扩展性考虑 - 运维监控方案 ### 替代方案 - 列出其他可行方案 - 对比优缺点 - 给出选择建议 ## 技术栈版本参考 - Python: 3.11+ - Node.js: 20 LTS - PostgreSQL: 15+ - Redis: 7+ - Kubernetes: 1.28+ - Docker: 24+ 这是一个足够长且稳定的 system prompt,用于测试 Anthropic Prompt Caching。 此内容在所有请求中保持完全一致,以确保缓存能够命中。 Version: 3.0 """ * 2 # 重复 2 次,确保 >1500 tokens print(f"System prompt 长度: {len(stable_prefix)} 字符") print(f"预估 tokens: ~{len(stable_prefix) // 4}") print() trace_id = None # 第一轮对话 print("=" * 60) print("第 1 轮对话:创建缓存") print("=" * 60) async for item in runner.run( messages=[{"role": "user", "content": "请用一句话介绍 Python"}], config=RunConfig( system_prompt=stable_prefix, model="anthropic/claude-sonnet-4.5", temperature=0.3, max_iterations=1, enable_prompt_caching=True, name="同一Trace缓存测试" ) ): if isinstance(item, Trace): trace_id = item.trace_id if item.status == "completed": print(f"\n✓ 第 1 轮完成") print(f" Total tokens: {item.total_tokens}") print(f" Cache write: {item.total_cache_creation_tokens}") print(f" Cache read: {item.total_cache_read_tokens}") print(f" Cost: ${item.total_cost:.6f}") elif isinstance(item, Message) and item.role == "assistant": print(f"\n[Response] {item.content.get('text', '')[:100]}...") print(f" Prompt tokens: {item.prompt_tokens}") print(f" Cache write: {item.cache_creation_tokens}") print(f" Cache read: {item.cache_read_tokens}") print("\n等待 2 秒...") await asyncio.sleep(2) # 第二轮对话(续跑同一个 trace) print("\n" + "=" * 60) print("第 2 轮对话:应该命中缓存(system + 第1轮历史)") print("=" * 60) async for item in runner.run( messages=[{"role": "user", "content": "请用一句话介绍 JavaScript"}], config=RunConfig( trace_id=trace_id, # 续跑同一个 trace system_prompt=stable_prefix, model="anthropic/claude-sonnet-4.5", temperature=0.3, max_iterations=1, enable_prompt_caching=True, ) ): if isinstance(item, Trace) and item.status == "completed": print(f"\n✓ 第 2 轮完成") print(f" Total tokens: {item.total_tokens}") print(f" Cache write: {item.total_cache_creation_tokens}") print(f" Cache read: {item.total_cache_read_tokens}") print(f" Cost: ${item.total_cost:.6f}") elif isinstance(item, Message) and item.role == "assistant": print(f"\n[Response] {item.content.get('text', '')[:100]}...") print(f" Prompt tokens: {item.prompt_tokens}") print(f" Cache write: {item.cache_creation_tokens}") print(f" Cache read: {item.cache_read_tokens}") print("\n等待 2 秒...") await asyncio.sleep(2) # 第三轮对话(续跑同一个 trace) print("\n" + "=" * 60) print("第 3 轮对话:应该命中更多缓存(system + 前2轮历史)") print("=" * 60) async for item in runner.run( messages=[{"role": "user", "content": "请用一句话介绍 Go"}], config=RunConfig( trace_id=trace_id, # 续跑同一个 trace system_prompt=stable_prefix, model="anthropic/claude-sonnet-4.5", temperature=0.3, max_iterations=1, enable_prompt_caching=True, ) ): if isinstance(item, Trace) and item.status == "completed": print(f"\n✓ 第 3 轮完成") print(f" Total tokens: {item.total_tokens}") print(f" Cache write: {item.total_cache_creation_tokens}") print(f" Cache read: {item.total_cache_read_tokens}") print(f" Cost: ${item.total_cost:.6f}") elif isinstance(item, Message) and item.role == "assistant": print(f"\n[Response] {item.content.get('text', '')[:100]}...") print(f" Prompt tokens: {item.prompt_tokens}") print(f" Cache write: {item.cache_creation_tokens}") print(f" Cache read: {item.cache_read_tokens}") print("\n" + "=" * 60) print("测试完成") print("=" * 60) print() print("预期结果:") print("- 第 1 轮:cache_write > 0(创建缓存)") print("- 第 2 轮:cache_read > 0(命中 system prompt 缓存)") print("- 第 3 轮:cache_read 更大(命中 system + 历史消息缓存)") print() print(f"Trace ID: {trace_id}") if __name__ == "__main__": asyncio.run(main())