| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- """V3-M4A: WalkGraphStore 读三个游走配置 JSON + 校验 + 拍板值解包。"""
- from __future__ import annotations
- import pytest
- from content_agent.integrations.walk_graph_json import (
- WalkGraphStore,
- _validate_graph,
- edge_permission,
- edge_supported,
- low_budget,
- )
- def test_graph_has_8_nodes_and_10_edges():
- graph = WalkGraphStore().load_graph()
- assert len(graph["nodes"]) == 8
- assert len(graph["edges"]) == 10
- assert {edge["gate"] for edge in graph["edges"]} == {"none", "decision_gated", "keep_only"}
- def test_policy_unwraps_pinned_values():
- policy = WalkGraphStore().load_policy()
- # 2026-06-11 拍板:60 / 1 / deny / halve_min_1(max(1, budget//2))。
- assert policy["global"]["max_total_actions_per_run"] == 60
- assert policy["global"]["max_reseed_rounds"] == 1
- assert policy["edge_permissions"]["KEEP_CONTENT_FOR_REVIEW"]["video_to_hashtag"] == "deny"
- assert policy["budget_tiers"]["low_budget"] == "halve_min_1"
- assert low_budget(3) == 1
- assert low_budget(10) == 5
- def test_policy_pins_gemini_cap_and_workers():
- # 2026-06-12 拍板(M5):单 run Gemini 调用上限 200、判定并发度 4。
- policy_global = WalkGraphStore().load_policy()["global"]
- assert policy_global["gemini_calls_per_run_cap"] == 200
- assert policy_global["gemini_max_workers"] == 4
- def test_policy_edge_budgets_match_decided_values():
- # 基线=v1 实际硬限;R7 放宽拍板(2026-06-12):tag 1→3、作者 2→3(真跑顶格实证)。
- budgets = WalkGraphStore().load_policy()["edge_budgets_by_id"]
- assert budgets["query_next_page"]["max_total_actions"] == 3
- assert budgets["author_to_works"]["max_total_actions"] == 3
- assert budgets["author_to_works"]["max_works_per_author"] == 3
- assert budgets["hashtag_to_query"]["max_total_actions"] == 3
- def test_shipinhao_author_edges_blocked_and_terminal_edges_supported():
- store = WalkGraphStore()
- profile = store.load_profile("shipinhao")
- assert not edge_supported(profile, "author_to_works")
- assert not edge_supported(profile, "author_work_to_content")
- assert edge_supported(profile, "hashtag_to_query")
- # 终端边不在 profile(平台无关控制边)→ 恒 supported。
- assert edge_supported(profile, "decision_to_asset")
- assert all(
- edge_supported(store.load_profile("douyin"), edge_id)
- for edge_id in ["query_next_page", "author_to_works", "video_to_hashtag"]
- )
- def test_edge_permission_defaults_to_deny():
- policy = WalkGraphStore().load_policy()
- assert edge_permission(policy, "ADD_TO_CONTENT_POOL", "author_to_works") == "allow"
- assert edge_permission(policy, "KEEP_CONTENT_FOR_REVIEW", "author_to_works") == "allow_low_budget"
- assert edge_permission(policy, "REJECT_CONTENT", "author_to_works") == "deny"
- assert edge_permission(policy, None, "author_to_works") == "deny"
- assert edge_permission(policy, "ADD_TO_CONTENT_POOL", "unknown_edge") == "deny"
- def test_unknown_graph_edge_fails_validation():
- graph = WalkGraphStore().load_graph()
- tampered = {**graph, "edges": [*graph["edges"], {**graph["edges"][0], "edge_id": "made_up_edge"}]}
- findings = _validate_graph(tampered, {edge["edge_id"] for edge in graph["edges"]})
- assert any(finding["check_id"] == "edge_not_in_catalog" for finding in findings)
- with pytest.raises(ValueError):
- store = WalkGraphStore()
- bad_policy = {**store.load_policy(), "edge_budgets": [{"edge_id": "made_up_edge"}]}
- from content_agent.integrations.walk_graph_json import _raise_on_fail, _validate_policy
- _raise_on_fail(_validate_policy(bad_policy, {e["edge_id"] for e in graph["edges"]}), "walk policy")
|