# 信号驱动机制测试文档 ## 测试用例:integration_test_6 ### 位置 `examples/integration_test_6/` ### 文件结构 ``` integration_test_6/ ├── README.md # 测试说明 ├── task.prompt # Agent 任务描述 ├── run.py # 测试运行脚本 └── output/ # 输出目录 ``` ## 测试目标 全面验证新实现的信号驱动 Sub-Agent 通讯机制。 ## 测试覆盖 ### 1. 基础设施 - [x] SignalBus 实例创建 - [x] signal_bus 传递到工具 context - [x] 信号发送接口(emit) - [x] 信号接收接口(check_buffer) ### 2. 信号发送 - [x] subagent.start 信号 - [x] subagent.complete 信号 - [x] 信号数据完整性(trace_id, parent_trace_id, result) ### 3. 信号接收 - [x] 主循环信号检查 - [x] _handle_signal 方法调用 - [x] 信号处理逻辑 ### 4. 后台任务 - [x] asyncio.create_task 启动 - [x] _run_subagent_background 执行 - [x] 后台任务完成后发送信号 ### 5. 等待机制 - [x] _wait_for_completion 轮询 - [x] 信号匹配(trace_id) - [x] 结果返回 ### 6. 错误处理 - [x] 错误信号发送(subagent.error) - [x] 异常传播 - [x] 超时保护(5分钟) ## 运行方式 ```bash cd examples/integration_test_6 python run.py ``` ## 监控功能 测试脚本实现了信号监控钩子: ```python # 监控信号发送 original_emit = runner.signal_bus.emit def monitored_emit(signal): print(f"[信号发送] {signal.type}") return original_emit(signal) runner.signal_bus.emit = monitored_emit # 监控信号接收 original_check_buffer = runner.signal_bus.check_buffer def monitored_check_buffer(trace_id): signals = original_check_buffer(trace_id) if signals: print(f"[信号接收] {len(signals)} 个信号") return signals runner.signal_bus.check_buffer = monitored_check_buffer ``` ## 预期输出 ### 正常流程 ``` [Trace] 开始: 12345678... [1] Agent 思考: 我将规划任务... → goal(add): 实现验证函数... [2] Agent 思考: 开始实现... → write_file: validator.py [3] Agent 思考: 使用 subagent 评估... → subagent(evaluate, wait=True): 评估目标 2 [评估 #1] [信号发送] subagent.start (trace: 12345678...) [信号接收] subagent.complete (trace: 87654321...) [评估结果] ✅ 通过 [Trace] 完成 - 总消息数: 15 - 总 Token 数: 50000 ``` ### 信号统计 ``` 信号统计: - 发送信号数: 4 - 接收信号数: 4 - 信号类型: subagent.complete, subagent.start 发送的信号: 1. subagent.start (trace: 12345678...) 2. subagent.complete (trace: 12345678...) 3. subagent.start (trace: 23456789...) 4. subagent.complete (trace: 23456789...) ``` ## 成功标准 所有以下条件必须满足: 1. ✅ SignalBus 已创建 2. ✅ 发送了信号(≥ 2 个) 3. ✅ 接收了信号(≥ 2 个) 4. ✅ 包含 subagent.start 和 subagent.complete 5. ✅ 使用了 subagent(evaluate) 6. ✅ 获得了评估结果 7. ✅ 生成了代码文件 ## 测试场景设计 ### 任务描述 实现一个数据验证模块,包含: - `validate_email()`: 邮箱验证 - `validate_phone()`: 手机号验证 - `validate_age()`: 年龄验证 ### 为什么选择这个场景? 1. **简单明确**: 任务清晰,容易实现 2. **需要评估**: 验证函数需要质量检查 3. **触发信号**: 每次 subagent 调用都会触发信号 4. **可重复**: 如果评估不通过,会重新评估 ### 预期 Agent 行为 1. 使用 goal 工具规划任务(3-4 个 goal) 2. 实现 validator.py 3. 使用 subagent(evaluate) 评估实现 4. 如果不通过,修复并重新评估 5. 生成测试报告 ## 调试信息 如果测试失败,检查: 1. **SignalBus 未创建** - 检查 runner.py 的 __init__ 方法 - 确认 `self.signal_bus = SignalBus()` 已添加 2. **信号未发送** - 检查 manager.py 的 _run_subagent_background - 确认 `self.signal_bus.emit()` 被调用 3. **信号未接收** - 检查 runner.py 的主循环 - 确认 `self.signal_bus.check_buffer()` 被调用 4. **评估未使用** - 检查 task.prompt 是否明确要求评估 - 检查 Agent 是否理解评估要求 ## 扩展测试 ### 测试 wait=False 模式 创建 integration_test_7 测试异步模式: ```python # 在 task.prompt 中明确要求使用 wait=False result = await subagent( mode="delegate", task="分析数据", wait=False # 异步模式 ) # result = {"subagent_id": "...", "status": "running"} ``` ### 测试错误信号 创建一个会失败的任务,验证错误信号: ```python # 故意触发错误 result = await subagent( mode="evaluate", target_goal_id="999", # 不存在的 goal evaluation_input={} ) # 应该收到 subagent.error 信号 ``` ### 测试超时 创建一个长时间运行的任务,验证超时保护: ```python # 设置较短的超时时间 manager._wait_for_completion(..., timeout=5.0) # 应该在 5 秒后抛出 TimeoutError ``` ## 总结 这个测试用例全面验证了信号驱动机制的核心功能: - ✅ 信号的发送和接收 - ✅ 后台任务执行 - ✅ 信号轮询机制 - ✅ wait=True 同步模式 通过实时监控信号活动,可以清楚地看到信号机制的工作流程。