| 1234567891011121314151617181920212223242526272829303132333435363738 |
- from content_agent.business_modules.walk_engine import run_bounded_walk
- from tests.p6_walk_helpers import FakeWalkPlatformClient, build_initial_walk_context, set_v4_allow_walk
- def _allow_walk(context):
- # M8: 游走只对 allow_walk=true 的内容开 tag 边;放行种子决策以驱动游走循环。
- for decision in context["rule_decisions"]:
- set_v4_allow_walk(decision, True)
- def test_walk_engine_runs_bounded_edges_in_same_run_and_policy(tmp_path):
- context = build_initial_walk_context(tmp_path)
- _allow_walk(context)
- client = FakeWalkPlatformClient()
- result = run_bounded_walk(platform_client=client, **context)
- assert {row["run_id"] for row in result["rule_decisions"]} == {"run_001"}
- assert {row["policy_run_id"] for row in result["rule_decisions"]} == {"policy_run_001"}
- assert len(result["discovered_content_items"]) > len(context["discovered_content_items"])
- decision_ids = [row["decision_id"] for row in result["rule_decisions"]]
- assert len(decision_ids) == len(set(decision_ids))
- assert any(row["edge_id"] == "hashtag_to_query" for row in result["walk_actions"])
- def test_walk_engine_edge_failure_records_failed_action_not_failed_run(tmp_path):
- context = build_initial_walk_context(tmp_path)
- _allow_walk(context)
- client = FakeWalkPlatformClient(fail_tag=True)
- result = run_bounded_walk(platform_client=client, **context)
- failed_actions = [
- row for row in result["walk_actions"]
- if row["edge_id"] == "hashtag_to_query" and row["walk_status"] == "failed"
- ]
- assert failed_actions
- assert result["rule_decisions"]
|