test_walk_engine_loop.py 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. from content_agent.business_modules.walk_engine import run_bounded_walk
  2. from tests.p6_walk_helpers import FakeWalkPlatformClient, build_initial_walk_context, set_v4_allow_walk
  3. def _allow_walk(context):
  4. # M8: 游走只对 allow_walk=true 的内容开 tag 边;放行种子决策以驱动游走循环。
  5. for decision in context["rule_decisions"]:
  6. set_v4_allow_walk(decision, True)
  7. def test_walk_engine_runs_bounded_edges_in_same_run_and_policy(tmp_path):
  8. context = build_initial_walk_context(tmp_path)
  9. _allow_walk(context)
  10. client = FakeWalkPlatformClient()
  11. result = run_bounded_walk(platform_client=client, **context)
  12. assert {row["run_id"] for row in result["rule_decisions"]} == {"run_001"}
  13. assert {row["policy_run_id"] for row in result["rule_decisions"]} == {"policy_run_001"}
  14. assert len(result["discovered_content_items"]) > len(context["discovered_content_items"])
  15. decision_ids = [row["decision_id"] for row in result["rule_decisions"]]
  16. assert len(decision_ids) == len(set(decision_ids))
  17. assert any(row["edge_id"] == "hashtag_to_query" for row in result["walk_actions"])
  18. def test_walk_engine_edge_failure_records_failed_action_not_failed_run(tmp_path):
  19. context = build_initial_walk_context(tmp_path)
  20. _allow_walk(context)
  21. client = FakeWalkPlatformClient(fail_tag=True)
  22. result = run_bounded_walk(platform_client=client, **context)
  23. failed_actions = [
  24. row for row in result["walk_actions"]
  25. if row["edge_id"] == "hashtag_to_query" and row["walk_status"] == "failed"
  26. ]
  27. assert failed_actions
  28. assert result["rule_decisions"]