""" 测试重构后的 Goal 模型功能 测试内容: 1. Goal 模型的新字段(evaluation 相关) 2. 序列化和反序列化(to_dict/from_dict) 3. 向后兼容性(加载旧数据) """ import asyncio import sys from pathlib import Path from datetime import datetime # 添加项目根目录到 Python 路径 sys.path.insert(0, str(Path(__file__).parent.parent)) from agent.models.goal import Goal, GoalTree, GoalStats def test_goal_new_fields(): """测试 Goal 模型的新字段""" print("=" * 80) print("测试 1: Goal 模型新字段") print("=" * 80) print() # 创建带有 evaluation 字段的 Goal goal = Goal( id="1", description="实现用户登录功能", type="agent_call", agent_call_mode="evaluation", target_goal_id="3", evaluation_input={ "goal_description": "实现用户登录功能", "actual_result": "已实现登录接口和前端页面", "context": {"files": ["login.py", "login.html"]} }, evaluation_result={ "passed": True, "reason": "功能完整,符合要求", "suggestions": [] }, completed_at=datetime.now() ) print("1. 创建的 Goal 对象:") print(f" ID: {goal.id}") print(f" 描述: {goal.description}") print(f" 类型: {goal.type}") print(f" 模式: {goal.agent_call_mode}") print(f" 目标 Goal ID: {goal.target_goal_id}") print(f" 评估输入: {goal.evaluation_input}") print(f" 评估结果: {goal.evaluation_result}") print(f" 完成时间: {goal.completed_at}") print() print("=" * 80) print("✅ 新字段测试完成") print("=" * 80) def test_goal_serialization(): """测试 Goal 的序列化和反序列化""" print("\n" + "=" * 80) print("测试 2: Goal 序列化和反序列化") print("=" * 80) print() # 创建 Goal original_goal = Goal( id="1", description="测试目标", reason="测试序列化", type="agent_call", agent_call_mode="evaluation", target_goal_id="2", evaluation_input={"actual_result": "测试结果"}, evaluation_result={"passed": True, "reason": "测试通过"}, completed_at=datetime.now() ) print("1. 原始 Goal:") print(f" {original_goal}") print() # 序列化 print("2. 序列化为字典:") goal_dict = original_goal.to_dict() print(f" ID: {goal_dict['id']}") print(f" 描述: {goal_dict['description']}") print(f" target_goal_id: {goal_dict.get('target_goal_id')}") print(f" evaluation_input: {goal_dict.get('evaluation_input')}") print(f" evaluation_result: {goal_dict.get('evaluation_result')}") print(f" completed_at: {goal_dict.get('completed_at')}") print() # 反序列化 print("3. 从字典反序列化:") restored_goal = Goal.from_dict(goal_dict) print(f" ID: {restored_goal.id}") print(f" 描述: {restored_goal.description}") print(f" target_goal_id: {restored_goal.target_goal_id}") print(f" evaluation_input: {restored_goal.evaluation_input}") print(f" evaluation_result: {restored_goal.evaluation_result}") print(f" completed_at: {restored_goal.completed_at}") print() # 验证一致性 print("4. 验证序列化前后一致性:") assert restored_goal.id == original_goal.id assert restored_goal.description == original_goal.description assert restored_goal.target_goal_id == original_goal.target_goal_id assert restored_goal.evaluation_input == original_goal.evaluation_input assert restored_goal.evaluation_result == original_goal.evaluation_result print(" ✅ 所有字段一致") print() print("=" * 80) print("✅ 序列化测试完成") print("=" * 80) def test_backward_compatibility(): """测试向后兼容性(加载旧数据)""" print("\n" + "=" * 80) print("测试 3: 向后兼容性") print("=" * 80) print() # 模拟旧版本的 Goal 数据(没有新字段) old_goal_dict = { "id": "1", "description": "旧版本的目标", "reason": "测试兼容性", "parent_id": None, "type": "normal", "status": "pending", "summary": None, "sub_trace_ids": None, "agent_call_mode": None, "sub_trace_metadata": None, "self_stats": { "message_count": 0, "total_tokens": 0, "total_cost": 0.0, "preview": None }, "cumulative_stats": { "message_count": 0, "total_tokens": 0, "total_cost": 0.0, "preview": None }, "created_at": "2026-02-07T10:00:00" # 注意:没有 target_goal_id, evaluation_input, evaluation_result, completed_at } print("1. 旧版本的 Goal 数据(缺少新字段):") print(f" {old_goal_dict}") print() # 尝试加载旧数据 print("2. 从旧数据加载 Goal:") try: goal = Goal.from_dict(old_goal_dict) print(f" ✅ 成功加载") print(f" ID: {goal.id}") print(f" 描述: {goal.description}") print(f" target_goal_id: {goal.target_goal_id} (应该是 None)") print(f" evaluation_input: {goal.evaluation_input} (应该是 None)") print(f" evaluation_result: {goal.evaluation_result} (应该是 None)") print(f" completed_at: {goal.completed_at} (应该是 None)") print() # 验证新字段为 None assert goal.target_goal_id is None assert goal.evaluation_input is None assert goal.evaluation_result is None assert goal.completed_at is None print(" ✅ 新字段默认值正确(None)") print() except Exception as e: print(f" ❌ 加载失败: {e}") import traceback traceback.print_exc() print("=" * 80) print("✅ 向后兼容性测试完成") print("=" * 80) def test_goal_tree_serialization(): """测试 GoalTree 的序列化""" print("\n" + "=" * 80) print("测试 4: GoalTree 序列化") print("=" * 80) print() # 创建 GoalTree tree = GoalTree(mission="测试任务") # 添加目标 goals = tree.add_goals( ["目标1", "目标2", "目标3"], reasons=["理由1", "理由2", "理由3"] ) # 为第一个目标添加子目标 tree.add_goals( ["子目标1.1", "子目标1.2"], parent_id=goals[0].id ) # 设置一个目标为 evaluation 类型 goals[0].type = "agent_call" goals[0].agent_call_mode = "evaluation" goals[0].target_goal_id = goals[1].id goals[0].evaluation_input = {"actual_result": "测试"} goals[0].evaluation_result = {"passed": True} print("1. 创建的 GoalTree:") print(tree.to_prompt()) print() # 序列化 print("2. 序列化 GoalTree:") tree_dict = tree.to_dict() print(f" Mission: {tree_dict['mission']}") print(f" Goals 数量: {len(tree_dict['goals'])}") print(f" Current ID: {tree_dict['current_id']}") print() # 反序列化 print("3. 从字典恢复 GoalTree:") restored_tree = GoalTree.from_dict(tree_dict) print(f" Mission: {restored_tree.mission}") print(f" Goals 数量: {len(restored_tree.goals)}") print(f" Current ID: {restored_tree.current_id}") print() # 验证 evaluation 字段 print("4. 验证 evaluation 字段:") restored_goal = restored_tree.find(goals[0].id) print(f" target_goal_id: {restored_goal.target_goal_id}") print(f" evaluation_input: {restored_goal.evaluation_input}") print(f" evaluation_result: {restored_goal.evaluation_result}") print() assert restored_goal.target_goal_id == goals[1].id assert restored_goal.evaluation_input == {"actual_result": "测试"} assert restored_goal.evaluation_result == {"passed": True} print(" ✅ evaluation 字段正确") print() print("=" * 80) print("✅ GoalTree 序列化测试完成") print("=" * 80) def test_agent_call_mode_values(): """测试 agent_call_mode 的所有可能值""" print("\n" + "=" * 80) print("测试 5: agent_call_mode 的值") print("=" * 80) print() modes = ["explore", "delegate", "sequential", "evaluation"] print("1. 测试所有 agent_call_mode 值:") for mode in modes: goal = Goal( id=f"goal-{mode}", description=f"测试 {mode} 模式", type="agent_call", agent_call_mode=mode ) print(f" ✅ {mode}: {goal.agent_call_mode}") print() # 序列化和反序列化 print("2. 测试序列化和反序列化:") for mode in modes: goal = Goal( id=f"goal-{mode}", description=f"测试 {mode} 模式", type="agent_call", agent_call_mode=mode ) goal_dict = goal.to_dict() restored_goal = Goal.from_dict(goal_dict) assert restored_goal.agent_call_mode == mode print(f" ✅ {mode}: 序列化前后一致") print() print("=" * 80) print("✅ agent_call_mode 测试完成") print("=" * 80) def main(): """运行所有测试""" print("\n" + "🧪" * 40) print("Goal 模型功能测试") print("🧪" * 40 + "\n") try: test_goal_new_fields() test_goal_serialization() test_backward_compatibility() test_goal_tree_serialization() test_agent_call_mode_values() print("\n" + "=" * 80) print("🎉 所有测试完成!") print("=" * 80) except Exception as e: print(f"\n❌ 测试失败: {e}") import traceback traceback.print_exc() if __name__ == "__main__": main()