| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369 |
- #!/usr/bin/env python3
- """
- 集成测试 6: 信号驱动机制测试
- 测试目标:
- - 验证信号的发送和接收机制
- - 验证 wait=True 模式(同步等待信号)
- - 验证后台任务执行
- - 验证信号轮询机制
- - 验证错误信号传播
- """
- import asyncio
- import sys
- import os
- from pathlib import Path
- # 添加项目根目录到 Python 路径
- project_root = Path(__file__).parent.parent.parent
- sys.path.insert(0, str(project_root))
- from dotenv import load_dotenv
- load_dotenv()
- from agent.llm.prompts import SimplePrompt
- from agent.core.runner import AgentRunner
- from agent.execution import FileSystemTraceStore, Trace, Message
- from agent.llm import create_openrouter_llm_call
- async def main():
- """运行测试"""
- # 路径配置
- base_dir = Path(__file__).parent
- prompt_path = base_dir / "task.prompt"
- output_dir = base_dir / "output"
- print("=" * 80)
- print("集成测试 6: 信号驱动机制测试")
- print("=" * 80)
- print()
- # 1. 加载 prompt
- print("1. 加载任务...")
- prompt = SimplePrompt(prompt_path)
- system_prompt = prompt._messages.get("system", "")
- user_prompt = prompt._messages.get("user", "")
- print(f" ✓ 任务类型: 数据验证模块实现")
- print(f" ✓ 测试重点: 信号机制")
- print(f" ✓ 监控内容: 信号发送、接收、轮询")
- print()
- # 2. 创建 Agent Runner
- print("2. 创建 Agent Runner...")
- print(f" - 模型: Claude Sonnet 4.5")
- print(f" - 信号机制: 已启用")
- print()
- runner = AgentRunner(
- trace_store=FileSystemTraceStore(base_path=".trace"),
- llm_call=create_openrouter_llm_call(model="anthropic/claude-sonnet-4.5"),
- skills_dir=str(project_root / "agent" / "skills"),
- debug=False
- )
- # 验证 SignalBus 已创建
- if hasattr(runner, 'signal_bus'):
- print(" ✅ SignalBus 已创建")
- else:
- print(" ❌ SignalBus 未创建")
- return
- # 3. 运行 Agent
- print()
- print("3. 启动 Agent...")
- print("=" * 80)
- print()
- # 创建输出目录
- output_dir.mkdir(exist_ok=True)
- # 监控变量
- current_trace_id = None
- goal_used = False
- subagent_used = False
- evaluate_used = False
- iteration_count = 0
- tool_calls_count = {}
- evaluation_count = 0
- evaluation_results = []
- # 信号监控
- signals_emitted = []
- signals_received = []
- signal_types = set()
- # 钩子:监控信号发送
- original_emit = runner.signal_bus.emit
- def monitored_emit(signal):
- signals_emitted.append({
- "type": signal.type,
- "trace_id": signal.trace_id,
- "data_keys": list(signal.data.keys())
- })
- signal_types.add(signal.type)
- print(f" [信号发送] {signal.type} (trace: {signal.trace_id[:8]}...)")
- return original_emit(signal)
- runner.signal_bus.emit = monitored_emit
- # 钩子:监控信号接收
- original_check_buffer = runner.signal_bus.check_buffer
- def monitored_check_buffer(trace_id):
- signals = original_check_buffer(trace_id)
- if signals:
- for signal in signals:
- signals_received.append({
- "type": signal.type,
- "trace_id": signal.trace_id
- })
- print(f" [信号接收] {signal.type} (trace: {signal.trace_id[:8]}...)")
- return signals
- runner.signal_bus.check_buffer = monitored_check_buffer
- async for item in runner.run(
- task=user_prompt,
- system_prompt=system_prompt,
- model="anthropic/claude-sonnet-4.5",
- temperature=0.5,
- max_iterations=30,
- ):
- # 处理 Trace 对象
- if isinstance(item, Trace):
- current_trace_id = item.trace_id
- if item.status == "running":
- print(f"[Trace] 开始: {item.trace_id[:8]}...")
- elif item.status == "completed":
- print()
- print("=" * 80)
- print(f"[Trace] 完成")
- print(f" - 总消息数: {item.total_messages}")
- print(f" - 总 Token 数: {item.total_tokens}")
- print(f" - 总成本: ${item.total_cost:.4f}")
- print("=" * 80)
- elif item.status == "failed":
- print()
- print(f"[Trace] 失败: {item.error_message}")
- # 处理 Message 对象
- elif isinstance(item, Message):
- if item.role == "assistant":
- iteration_count += 1
- content = item.content
- if isinstance(content, dict):
- text = content.get("text", "")
- tool_calls = content.get("tool_calls")
- # 显示 Agent 的思考
- if text and not tool_calls:
- print(f"\n[{iteration_count}] Agent 回复:")
- print(f" {text[:200]}{'...' if len(text) > 200 else ''}")
- elif text:
- print(f"\n[{iteration_count}] Agent 思考:")
- print(f" {text[:150]}{'...' if len(text) > 150 else ''}")
- # 显示工具调用
- if tool_calls:
- for tc in tool_calls:
- tool_name = tc.get("function", {}).get("name", "unknown")
- args = tc.get("function", {}).get("arguments", {})
- # 如果 args 是字符串,尝试解析为 JSON
- if isinstance(args, str):
- import json
- try:
- args = json.loads(args)
- except:
- args = {}
- # 统计工具使用
- tool_calls_count[tool_name] = tool_calls_count.get(tool_name, 0) + 1
- # 检测关键工具使用
- if tool_name == "goal":
- goal_used = True
- if isinstance(args, dict):
- if args.get("add"):
- print(f" → goal(add): {args['add'][:80]}...")
- elif args.get("done"):
- print(f" → goal(done): {args['done'][:80]}...")
- elif args.get("focus"):
- print(f" → goal(focus): {args['focus']}")
- elif tool_name == "subagent":
- subagent_used = True
- if isinstance(args, dict):
- mode = args.get("mode", "unknown")
- wait = args.get("wait", True)
- if mode == "evaluate":
- evaluate_used = True
- evaluation_count += 1
- target = args.get("target_goal_id", "?")
- wait_str = f"wait={wait}"
- print(f" → subagent(evaluate, {wait_str}): 评估目标 {target} [评估 #{evaluation_count}]")
- else:
- # 其他工具简化显示
- if tool_name in ["read_file", "write_file", "edit_file"]:
- if isinstance(args, dict):
- file_path = args.get("file_path", "")
- if file_path:
- file_name = Path(file_path).name
- print(f" → {tool_name}: {file_name}")
- elif item.role == "tool":
- # 检查是否是评估结果
- content = item.content
- if isinstance(content, str):
- import json
- try:
- result = json.loads(content)
- if isinstance(result, dict) and "passed" in result:
- passed = result.get("passed", False)
- reason = result.get("reason", "")[:100]
- evaluation_results.append({
- "passed": passed,
- "reason": reason
- })
- status = "✅ 通过" if passed else "❌ 不通过"
- print(f" [评估结果] {status}")
- except:
- pass
- # 4. 测试结果总结
- print()
- print("=" * 80)
- print("测试结果总结")
- print("=" * 80)
- print()
- print("功能使用情况:")
- print(f" - goal 工具: {'✅ 使用' if goal_used else '❌ 未使用'}")
- print(f" - subagent 工具: {'✅ 使用' if subagent_used else '❌ 未使用'}")
- print(f" - evaluate 模式: {'✅ 使用' if evaluate_used else '❌ 未使用'} ({evaluation_count} 次)")
- print()
- print("工具调用统计:")
- for tool_name, count in sorted(tool_calls_count.items(), key=lambda x: x[1], reverse=True):
- print(f" - {tool_name}: {count} 次")
- print()
- # 信号机制测试结果
- print("=" * 80)
- print("信号机制测试结果")
- print("=" * 80)
- print()
- print(f"信号统计:")
- print(f" - 发送信号数: {len(signals_emitted)}")
- print(f" - 接收信号数: {len(signals_received)}")
- print(f" - 信号类型: {', '.join(sorted(signal_types))}")
- print()
- if signals_emitted:
- print("发送的信号:")
- for i, sig in enumerate(signals_emitted, 1):
- print(f" {i}. {sig['type']} (trace: {sig['trace_id'][:8]}...)")
- print()
- if signals_received:
- print("接收的信号:")
- for i, sig in enumerate(signals_received, 1):
- print(f" {i}. {sig['type']} (trace: {sig['trace_id'][:8]}...)")
- print()
- # 检查输出文件
- print("输出文件:")
- validator_file = output_dir / "validator.py"
- report_file = output_dir / "REPORT.md"
- if validator_file.exists():
- size = validator_file.stat().st_size
- print(f" ✅ validator.py ({size} bytes)")
- else:
- print(f" ❌ validator.py (未生成)")
- if report_file.exists():
- size = report_file.stat().st_size
- print(f" ✅ REPORT.md ({size} bytes)")
- else:
- print(f" ❌ REPORT.md (未生成)")
- print()
- # 验证测试目标
- print("=" * 80)
- print("测试目标验证")
- print("=" * 80)
- print()
- success = True
- # 1. 验证 SignalBus 创建
- if hasattr(runner, 'signal_bus'):
- print(f" ✅ SignalBus 已创建")
- else:
- print(f" ❌ SignalBus 未创建")
- success = False
- # 2. 验证信号发送
- if len(signals_emitted) > 0:
- print(f" ✅ 信号已发送 ({len(signals_emitted)} 个)")
- else:
- print(f" ❌ 未发送信号")
- success = False
- # 3. 验证信号接收
- if len(signals_received) > 0:
- print(f" ✅ 信号已接收 ({len(signals_received)} 个)")
- else:
- print(f" ❌ 未接收信号")
- success = False
- # 4. 验证信号类型
- expected_types = {"subagent.start", "subagent.complete"}
- if expected_types.issubset(signal_types):
- print(f" ✅ 包含预期的信号类型")
- else:
- missing = expected_types - signal_types
- print(f" ⚠️ 缺少信号类型: {', '.join(missing)}")
- # 5. 验证 subagent 使用
- if evaluate_used:
- print(f" ✅ 使用了 subagent(evaluate) ({evaluation_count} 次)")
- else:
- print(f" ❌ 未使用 subagent(evaluate)")
- success = False
- # 6. 验证评估结果
- if evaluation_results:
- print(f" ✅ 获得了评估结果 ({len(evaluation_results)} 次)")
- else:
- print(f" ❌ 未获得评估结果")
- # 7. 验证文件生成
- if validator_file.exists():
- print(f" ✅ 生成了代码文件")
- else:
- print(f" ❌ 未生成代码文件")
- success = False
- print()
- if success:
- print("🎉 测试成功!信号驱动机制工作正常。")
- else:
- print("⚠️ 测试未完全通过,请检查实现。")
- print()
- if current_trace_id:
- print(f"详细日志: .trace/{current_trace_id}/")
- print("=" * 80)
- if __name__ == "__main__":
- asyncio.run(main())
|