test_walk_graph_config.py 3.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. """V3-M4A: WalkGraphStore 读三个游走配置 JSON + 校验 + 拍板值解包。"""
  2. from __future__ import annotations
  3. import pytest
  4. from content_agent.integrations.walk_graph_json import (
  5. WalkGraphStore,
  6. _validate_graph,
  7. edge_permission,
  8. edge_supported,
  9. low_budget,
  10. )
  11. def test_graph_has_8_nodes_and_10_edges():
  12. graph = WalkGraphStore().load_graph()
  13. assert len(graph["nodes"]) == 8
  14. assert len(graph["edges"]) == 10
  15. assert {edge["gate"] for edge in graph["edges"]} == {"none", "decision_gated", "keep_only"}
  16. def test_policy_unwraps_pinned_values():
  17. policy = WalkGraphStore().load_policy()
  18. # 2026-06-11 拍板:60 / 1 / deny / halve_min_1(max(1, budget//2))。
  19. assert policy["global"]["max_total_actions_per_run"] == 60
  20. assert policy["global"]["max_reseed_rounds"] == 1
  21. assert policy["edge_permissions"]["KEEP_CONTENT_FOR_REVIEW"]["video_to_hashtag"] == "deny"
  22. assert policy["budget_tiers"]["low_budget"] == "halve_min_1"
  23. assert low_budget(3) == 1
  24. assert low_budget(10) == 5
  25. def test_policy_pins_gemini_cap_and_workers():
  26. # 2026-06-12 拍板(M5):单 run Gemini 调用上限 200、判定并发度 4。
  27. policy_global = WalkGraphStore().load_policy()["global"]
  28. assert policy_global["gemini_calls_per_run_cap"] == 200
  29. assert policy_global["gemini_max_workers"] == 4
  30. def test_policy_edge_budgets_match_decided_values():
  31. # 基线=v1 实际硬限;R7 放宽拍板(2026-06-12):tag 1→3、作者 2→3(真跑顶格实证)。
  32. budgets = WalkGraphStore().load_policy()["edge_budgets_by_id"]
  33. assert budgets["query_next_page"]["max_total_actions"] == 3
  34. assert budgets["author_to_works"]["max_total_actions"] == 3
  35. assert budgets["author_to_works"]["max_works_per_author"] == 3
  36. assert budgets["hashtag_to_query"]["max_total_actions"] == 3
  37. def test_shipinhao_author_edges_blocked_and_terminal_edges_supported():
  38. store = WalkGraphStore()
  39. profile = store.load_profile("shipinhao")
  40. assert not edge_supported(profile, "author_to_works")
  41. assert not edge_supported(profile, "author_work_to_content")
  42. assert edge_supported(profile, "hashtag_to_query")
  43. # 终端边不在 profile(平台无关控制边)→ 恒 supported。
  44. assert edge_supported(profile, "decision_to_asset")
  45. assert all(
  46. edge_supported(store.load_profile("douyin"), edge_id)
  47. for edge_id in ["query_next_page", "author_to_works", "video_to_hashtag"]
  48. )
  49. def test_edge_permission_defaults_to_deny():
  50. policy = WalkGraphStore().load_policy()
  51. assert edge_permission(policy, "ADD_TO_CONTENT_POOL", "author_to_works") == "allow"
  52. assert edge_permission(policy, "KEEP_CONTENT_FOR_REVIEW", "author_to_works") == "allow_low_budget"
  53. assert edge_permission(policy, "REJECT_CONTENT", "author_to_works") == "deny"
  54. assert edge_permission(policy, None, "author_to_works") == "deny"
  55. assert edge_permission(policy, "ADD_TO_CONTENT_POOL", "unknown_edge") == "deny"
  56. def test_unknown_graph_edge_fails_validation():
  57. graph = WalkGraphStore().load_graph()
  58. tampered = {**graph, "edges": [*graph["edges"], {**graph["edges"][0], "edge_id": "made_up_edge"}]}
  59. findings = _validate_graph(tampered, {edge["edge_id"] for edge in graph["edges"]})
  60. assert any(finding["check_id"] == "edge_not_in_catalog" for finding in findings)
  61. with pytest.raises(ValueError):
  62. store = WalkGraphStore()
  63. bad_policy = {**store.load_policy(), "edge_budgets": [{"edge_id": "made_up_edge"}]}
  64. from content_agent.integrations.walk_graph_json import _raise_on_fail, _validate_policy
  65. _raise_on_fail(_validate_policy(bad_policy, {e["edge_id"] for e in graph["edges"]}), "walk policy")