run_compression_test.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
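"""
Cache behavior test after history compression.

Lowers the compression threshold so that compression triggers quickly, then
checks how caching behaves once compression has happened.
"""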
  5. import argparse
  6. import os
  7. import sys
  8. import asyncio
  9. from pathlib import Path
  10. # 添加项目根目录到 Python 路径
  11. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  12. from dotenv import load_dotenv
  13. load_dotenv()
  14. from agent.llm.prompts import SimplePrompt
  15. from agent.core.runner import AgentRunner, RunConfig
  16. from agent.trace import FileSystemTraceStore, Trace, Message
  17. from agent.trace.compaction import CompressionConfig
  18. from agent.llm import create_openrouter_llm_call
  19. async def main():
  20. # 路径配置
  21. base_dir = Path(__file__).parent
  22. prompt_path = base_dir / "test.prompt"
  23. output_dir = base_dir / "output"
  24. output_dir.mkdir(exist_ok=True)
  25. print("=" * 60)
  26. print("压缩后缓存功能测试")
  27. print("=" * 60)
  28. print()
  29. # 加载 prompt
  30. print("1. 加载 prompt 配置...")
  31. prompt = SimplePrompt(prompt_path)
  32. # 构建消息
  33. print("2. 构建任务消息...")
  34. messages = prompt.build_messages()
  35. # 创建 Agent Runner with 低压缩阈值
  36. print("3. 创建 Agent Runner...")
  37. print(f" - 模型: {prompt.config.get('model', 'sonnet-4.6')}")
  38. print(f" - 压缩阈值: 10,000 tokens (降低以快速触发)")
  39. store = FileSystemTraceStore(base_path=".trace")
  40. # 创建自定义压缩配置
  41. compression_config = CompressionConfig(
  42. max_tokens=10000, # 降低到10K以快速触发压缩
  43. threshold_ratio=0.5,
  44. keep_recent_messages=10
  45. )
  46. runner = AgentRunner(
  47. trace_store=store,
  48. llm_call=create_openrouter_llm_call(model=f"anthropic/claude-{prompt.config.get('model', 'sonnet-4.6')}"),
  49. skills_dir=None,
  50. debug=True,
  51. compression_config=compression_config # 使用自定义压缩配置
  52. )
  53. print(f"4. 启动新 Agent 模式...")
  54. print()
  55. current_trace_id = None
  56. compression_detected = False
  57. try:
  58. initial_messages = messages
  59. config = RunConfig(
  60. model=f"anthropic/claude-{prompt.config.get('model', 'sonnet-4.6')}",
  61. temperature=float(prompt.config.get('temperature', 0.3)),
  62. max_iterations=100,
  63. name="压缩缓存测试",
  64. )
  65. print("▶️ 开始执行...")
  66. print()
  67. async for item in runner.run(messages=initial_messages, config=config):
  68. # 处理 Trace 对象
  69. if isinstance(item, Trace):
  70. current_trace_id = item.trace_id
  71. if item.status == "running":
  72. print(f"[Trace] 开始: {item.trace_id[:8]}...")
  73. elif item.status == "completed":
  74. print(f"\n[Trace] ✅ 完成")
  75. print(f" - Total messages: {item.total_messages}")
  76. print(f" - Total tokens: {item.total_tokens:,}")
  77. print(f" - Cache creation: {item.total_cache_creation_tokens:,}")
  78. print(f" - Cache read: {item.total_cache_read_tokens:,}")
  79. if item.total_prompt_tokens > 0:
  80. print(f" - Cache hit rate: {item.total_cache_read_tokens / item.total_prompt_tokens * 100:.1f}%")
  81. print(f" - Total cost: ${item.total_cost:.4f}")
  82. elif item.status == "failed":
  83. print(f"\n[Trace] ❌ 失败: {item.error_message}")
  84. # 处理 Message 对象
  85. elif isinstance(item, Message):
  86. if item.role == "assistant":
  87. content = item.content
  88. if isinstance(content, dict):
  89. tool_calls = content.get("tool_calls")
  90. if tool_calls:
  91. print(f"[{item.sequence}] Tool calls: {len(tool_calls)}")
  92. # 检测压缩消息
  93. if item.role == "user" and isinstance(item.content, str):
  94. if "对话历史摘要" in item.content or "自动压缩" in item.content:
  95. if not compression_detected:
  96. compression_detected = True
  97. print(f"\n{'='*60}")
  98. print(f"🔄 检测到压缩发生在 sequence {item.sequence}")
  99. print(f"{'='*60}\n")
  100. except KeyboardInterrupt:
  101. print("\n\n用户中断 (Ctrl+C)")
  102. if current_trace_id:
  103. await runner.stop(current_trace_id)
  104. # 分析缓存情况
  105. if current_trace_id:
  106. print()
  107. print("=" * 60)
  108. print("缓存分析")
  109. print("=" * 60)
  110. trace = await store.get_trace(current_trace_id)
  111. if trace:
  112. print(f"\nTrace ID: {current_trace_id}")
  113. print(f"总消息数: {trace.total_messages}")
  114. print(f"总 tokens: {trace.total_tokens:,}")
  115. print(f"Prompt tokens: {trace.total_prompt_tokens:,}")
  116. print(f"Cache creation: {trace.total_cache_creation_tokens:,} ({trace.total_cache_creation_tokens / trace.total_prompt_tokens * 100:.1f}%)")
  117. print(f"Cache read: {trace.total_cache_read_tokens:,} ({trace.total_cache_read_tokens / trace.total_prompt_tokens * 100:.1f}%)")
  118. print(f"总成本: ${trace.total_cost:.4f}")
  119. if compression_detected:
  120. print(f"\n✅ 压缩已触发")
  121. print(f" - 压缩后缓存机制应该只保留系统prompt缓存")
  122. print(f" - 新的message缓存点会在压缩后重新创建")
  123. else:
  124. print(f"\n⚠️ 未检测到压缩")
  125. print()
  126. print(f"Trace 目录: .trace/{current_trace_id}")
  127. if __name__ == "__main__":
  128. asyncio.run(main())