#!/usr/bin/env python3
"""
Integration test 6: signal-driven mechanism test.

Goals:
- Verify that signals are emitted and received
- Verify wait=True mode (synchronously waiting for a signal)
- Verify background task execution
- Verify the signal polling mechanism
- Verify error-signal propagation

The script exits with status 0 when every hard check passes and 1 otherwise.
"""

import asyncio
import json
import os
import sys
from collections import Counter
from pathlib import Path

# Add the project root to the Python path so the `agent` imports below resolve.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from dotenv import load_dotenv

load_dotenv()

from agent.llm.prompts import SimplePrompt
from agent.core.runner import AgentRunner
from agent.execution import FileSystemTraceStore, Trace, Message
from agent.llm import create_openrouter_llm_call


def _short_id(trace_id, length=8):
    """Return the first *length* characters of *trace_id* for compact log lines.

    ``str()`` is applied first so a missing (``None``) id is printed instead of
    raising ``TypeError``.
    """
    return str(trace_id)[:length]


def _parse_tool_args(raw):
    """Decode tool-call arguments that may arrive as a JSON string.

    Non-string values are returned unchanged; a string that is not valid JSON
    yields ``{}`` so callers can keep using ``isinstance(args, dict)`` checks.
    """
    if not isinstance(raw, str):
        return raw
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return {}


def _parse_evaluation_result(content):
    """Extract an evaluation verdict from a tool message's content.

    Returns a dict ``{"passed": ..., "reason": <str, max 100 chars>}`` when
    *content* is a JSON object string containing a ``"passed"`` key, and
    ``None`` for anything else (non-string content, invalid JSON, other shapes).
    """
    if not isinstance(content, str):
        return None
    try:
        result = json.loads(content)
    except json.JSONDecodeError:
        return None
    if not isinstance(result, dict) or "passed" not in result:
        return None
    # `or ""` + str() guard against a null or non-string reason field.
    reason = str(result.get("reason") or "")[:100]
    return {"passed": result.get("passed", False), "reason": reason}


def _install_signal_monitors(signal_bus, emitted, received, seen_types):
    """Wrap ``signal_bus.emit`` and ``signal_bus.check_buffer`` to record traffic.

    Every emitted signal is appended to *emitted* (type, trace_id, data keys) and
    its type added to *seen_types*. Every signal returned by ``check_buffer`` is
    appended to *received*. Both wrappers delegate to the original method and
    return its result unchanged.
    """
    original_emit = signal_bus.emit
    original_check_buffer = signal_bus.check_buffer

    def monitored_emit(signal):
        emitted.append({
            "type": signal.type,
            "trace_id": signal.trace_id,
            "data_keys": list((signal.data or {}).keys()),
        })
        seen_types.add(signal.type)
        print(f" [信号发送] {signal.type} (trace: {_short_id(signal.trace_id)}...)")
        return original_emit(signal)

    # NOTE(review): assumes check_buffer is synchronous and returns an iterable of
    # signals (or a falsy value); an async check_buffer would need an async wrapper.
    def monitored_check_buffer(trace_id):
        signals = original_check_buffer(trace_id)
        for signal in signals or ():
            received.append({"type": signal.type, "trace_id": signal.trace_id})
            print(f" [信号接收] {signal.type} (trace: {_short_id(signal.trace_id)}...)")
        return signals

    signal_bus.emit = monitored_emit
    signal_bus.check_buffer = monitored_check_buffer


async def main():
    """Run the signal-mechanism integration test and print a report.

    Returns:
        True when every hard check passed; False otherwise, including when the
        runner exposes no ``signal_bus``.
    """
    # Path configuration
    base_dir = Path(__file__).parent
    prompt_path = base_dir / "task.prompt"
    output_dir = base_dir / "output"
    model = "anthropic/claude-sonnet-4.5"

    print("=" * 80)
    print("集成测试 6: 信号驱动机制测试")
    print("=" * 80)
    print()

    # 1. Load the prompt
    print("1. 加载任务...")
    prompt = SimplePrompt(prompt_path)
    # NOTE(review): reads SimplePrompt's private `_messages` mapping; prefer a
    # public accessor if one exists.
    system_prompt = prompt._messages.get("system", "")
    user_prompt = prompt._messages.get("user", "")
    print(" ✓ 任务类型: 数据验证模块实现")
    print(" ✓ 测试重点: 信号机制")
    print(" ✓ 监控内容: 信号发送、接收、轮询")
    print()

    # 2. Create the Agent Runner
    print("2. 创建 Agent Runner...")
    print(" - 模型: Claude Sonnet 4.5")
    print(" - 信号机制: 已启用")
    print()

    runner = AgentRunner(
        trace_store=FileSystemTraceStore(base_path=".trace"),
        llm_call=create_openrouter_llm_call(model=model),
        skills_dir=str(project_root / "agent" / "skills"),
        debug=False,
    )

    # Everything below depends on the SignalBus; abort (as a failure) without it.
    if hasattr(runner, "signal_bus"):
        print(" ✅ SignalBus 已创建")
    else:
        print(" ❌ SignalBus 未创建")
        return False

    # 3. Run the agent
    print()
    print("3. 启动 Agent...")
    print("=" * 80)
    print()

    output_dir.mkdir(exist_ok=True)

    # Monitoring state
    current_trace_id = None
    goal_used = False
    subagent_used = False
    evaluate_used = False
    iteration_count = 0
    tool_calls_count = Counter()
    evaluation_count = 0
    evaluation_results = []

    # Signal monitoring state, filled in by the wrappers installed below.
    signals_emitted = []
    signals_received = []
    signal_types = set()
    _install_signal_monitors(runner.signal_bus, signals_emitted, signals_received, signal_types)

    async for item in runner.run(
        task=user_prompt,
        system_prompt=system_prompt,
        model=model,
        temperature=0.5,
        max_iterations=30,
    ):
        # Trace objects report lifecycle status of the whole run.
        if isinstance(item, Trace):
            current_trace_id = item.trace_id
            if item.status == "running":
                print(f"[Trace] 开始: {_short_id(item.trace_id)}...")
            elif item.status == "completed":
                print()
                print("=" * 80)
                print("[Trace] 完成")
                print(f" - 总消息数: {item.total_messages}")
                print(f" - 总 Token 数: {item.total_tokens}")
                print(f" - 总成本: ${item.total_cost:.4f}")
                print("=" * 80)
            elif item.status == "failed":
                print()
                print(f"[Trace] 失败: {item.error_message}")

        elif isinstance(item, Message):
            if item.role == "assistant":
                iteration_count += 1
                content = item.content
                if not isinstance(content, dict):
                    continue
                text = content.get("text", "")
                tool_calls = content.get("tool_calls")

                # Text without tool calls is a reply; text alongside tool calls is reasoning.
                if text and not tool_calls:
                    print(f"\n[{iteration_count}] Agent 回复:")
                    print(f" {text[:200]}{'...' if len(text) > 200 else ''}")
                elif text:
                    print(f"\n[{iteration_count}] Agent 思考:")
                    print(f" {text[:150]}{'...' if len(text) > 150 else ''}")

                for tc in tool_calls or []:
                    # `or {}` also covers an explicit null "function" field.
                    function = tc.get("function") or {}
                    tool_name = function.get("name", "unknown")
                    args = _parse_tool_args(function.get("arguments", {}))
                    tool_calls_count[tool_name] += 1

                    # Track the tools this test cares about.
                    if tool_name == "goal":
                        goal_used = True
                        if isinstance(args, dict):
                            if args.get("add"):
                                print(f" → goal(add): {str(args['add'])[:80]}...")
                            elif args.get("done"):
                                print(f" → goal(done): {str(args['done'])[:80]}...")
                            elif args.get("focus"):
                                print(f" → goal(focus): {args['focus']}")
                    elif tool_name == "subagent":
                        subagent_used = True
                        if isinstance(args, dict) and args.get("mode", "unknown") == "evaluate":
                            evaluate_used = True
                            evaluation_count += 1
                            target = args.get("target_goal_id", "?")
                            wait = args.get("wait", True)
                            print(
                                f" → subagent(evaluate, wait={wait}): "
                                f"评估目标 {target} [评估 #{evaluation_count}]"
                            )
                    elif tool_name in ("read_file", "write_file", "edit_file"):
                        # Other file tools: show only the file name.
                        if isinstance(args, dict):
                            file_path = args.get("file_path", "")
                            if file_path:
                                print(f" → {tool_name}: {Path(file_path).name}")

            elif item.role == "tool":
                # Tool messages carrying a "passed" verdict are evaluation results.
                evaluation = _parse_evaluation_result(item.content)
                if evaluation is not None:
                    evaluation_results.append(evaluation)
                    status = "✅ 通过" if evaluation["passed"] else "❌ 不通过"
                    print(f" [评估结果] {status}")

    # 4. Summarize test results
    print()
    print("=" * 80)
    print("测试结果总结")
    print("=" * 80)
    print()

    print("功能使用情况:")
    print(f" - goal 工具: {'✅ 使用' if goal_used else '❌ 未使用'}")
    print(f" - subagent 工具: {'✅ 使用' if subagent_used else '❌ 未使用'}")
    print(f" - evaluate 模式: {'✅ 使用' if evaluate_used else '❌ 未使用'} ({evaluation_count} 次)")
    print()

    print("工具调用统计:")
    for tool_name, count in tool_calls_count.most_common():
        print(f" - {tool_name}: {count} 次")
    print()

    # Signal-mechanism results
    print("=" * 80)
    print("信号机制测试结果")
    print("=" * 80)
    print()

    print("信号统计:")
    print(f" - 发送信号数: {len(signals_emitted)}")
    print(f" - 接收信号数: {len(signals_received)}")
    print(f" - 信号类型: {', '.join(sorted(signal_types))}")
    print()

    if signals_emitted:
        print("发送的信号:")
        for i, sig in enumerate(signals_emitted, 1):
            print(f" {i}. {sig['type']} (trace: {_short_id(sig['trace_id'])}...)")
        print()

    if signals_received:
        print("接收的信号:")
        for i, sig in enumerate(signals_received, 1):
            print(f" {i}. {sig['type']} (trace: {_short_id(sig['trace_id'])}...)")
        print()

    # Check the generated output files
    print("输出文件:")
    validator_file = output_dir / "validator.py"
    report_file = output_dir / "REPORT.md"
    for path in (validator_file, report_file):
        if path.exists():
            print(f" ✅ {path.name} ({path.stat().st_size} bytes)")
        else:
            print(f" ❌ {path.name} (未生成)")
    print()

    # Verify the test goals
    print("=" * 80)
    print("测试目标验证")
    print("=" * 80)
    print()

    success = True

    # 1. SignalBus was created
    if hasattr(runner, "signal_bus"):
        print(" ✅ SignalBus 已创建")
    else:
        print(" ❌ SignalBus 未创建")
        success = False

    # 2. Signals were emitted
    if signals_emitted:
        print(f" ✅ 信号已发送 ({len(signals_emitted)} 个)")
    else:
        print(" ❌ 未发送信号")
        success = False

    # 3. Signals were received
    if signals_received:
        print(f" ✅ 信号已接收 ({len(signals_received)} 个)")
    else:
        print(" ❌ 未接收信号")
        success = False

    # 4. Expected signal types (soft check: only warns)
    expected_types = {"subagent.start", "subagent.complete"}
    if expected_types.issubset(signal_types):
        print(" ✅ 包含预期的信号类型")
    else:
        missing = expected_types - signal_types
        # Sorted so the report is deterministic across runs.
        print(f" ⚠️ 缺少信号类型: {', '.join(sorted(missing))}")

    # 5. subagent(evaluate) was used
    if evaluate_used:
        print(f" ✅ 使用了 subagent(evaluate) ({evaluation_count} 次)")
    else:
        print(" ❌ 未使用 subagent(evaluate)")
        success = False

    # 6. Evaluation results were obtained (informational: does not fail the test)
    if evaluation_results:
        print(f" ✅ 获得了评估结果 ({len(evaluation_results)} 次)")
    else:
        print(" ❌ 未获得评估结果")

    # 7. The code file was generated
    if validator_file.exists():
        print(" ✅ 生成了代码文件")
    else:
        print(" ❌ 未生成代码文件")
        success = False

    print()
    if success:
        print("🎉 测试成功!信号驱动机制工作正常。")
    else:
        print("⚠️ 测试未完全通过,请检查实现。")
    print()

    if current_trace_id:
        print(f"详细日志: .trace/{current_trace_id}/")
    print("=" * 80)
    return success


if __name__ == "__main__":
    # Propagate the verdict as the process exit status so CI can detect failures.
    sys.exit(0 if asyncio.run(main()) else 1)