from content_agent.business_modules.walk_engine import run_bounded_walk
from tests.p6_walk_helpers import FakeWalkPlatformClient, build_initial_walk_context


def _pool_decisions(context) -> None:
    """Mark every rule decision in ``context`` as a pooled, successful query.

    Mutates ``context["rule_decisions"]`` in place: each decision gets
    ``decision_action = "ADD_TO_CONTENT_POOL"`` and
    ``search_query_effect_status = "success"``.
    """
    # M3: mock judgment scores 60 (review/pending); the query_next_page edge needs a
    # pooled success query, so promote the seed decision to pool to exercise the loop.
    for decision in context["rule_decisions"]:
        decision["decision_action"] = "ADD_TO_CONTENT_POOL"
        decision["search_query_effect_status"] = "success"


def test_walk_engine_runs_bounded_edges_in_same_run_and_policy(tmp_path) -> None:
    """A bounded walk stays in one run/policy run and grows the discovered set.

    Checks that all resulting rule decisions share ``run_001`` /
    ``policy_run_001``, that more content items exist after the walk than
    before, that decision IDs are unique, and that at least one
    ``query_next_page`` walk action was recorded.
    """
    context = build_initial_walk_context(tmp_path)
    _pool_decisions(context)
    client = FakeWalkPlatformClient()

    # Snapshot the seed count before the walk so the growth assertion stays
    # meaningful even if the engine appends to the context list in place.
    # NOTE(review): whether run_bounded_walk mutates its inputs is not visible here.
    initial_item_count = len(context["discovered_content_items"])

    result = run_bounded_walk(platform_client=client, **context)

    assert {row["run_id"] for row in result["rule_decisions"]} == {"run_001"}
    assert {row["policy_run_id"] for row in result["rule_decisions"]} == {"policy_run_001"}
    assert len(result["discovered_content_items"]) > initial_item_count

    decision_ids = [row["decision_id"] for row in result["rule_decisions"]]
    assert len(decision_ids) == len(set(decision_ids))
    assert any(row["edge_id"] == "query_next_page" for row in result["walk_actions"])


def test_walk_engine_edge_failure_records_failed_action_not_failed_run(tmp_path) -> None:
    """A failing ``query_next_page`` edge yields a failed action, not an empty run.

    The fake client is built with ``fail_next_page=True`` (presumably making the
    next-page call fail — confirm against ``FakeWalkPlatformClient``). The walk
    must still return rule decisions and must record at least one
    ``query_next_page`` action whose ``walk_status`` is ``"failed"``.
    """
    context = build_initial_walk_context(tmp_path)
    _pool_decisions(context)
    client = FakeWalkPlatformClient(fail_next_page=True)

    result = run_bounded_walk(platform_client=client, **context)

    failed_actions = [
        row
        for row in result["walk_actions"]
        if row["edge_id"] == "query_next_page" and row["walk_status"] == "failed"
    ]
    assert failed_actions
    assert result["rule_decisions"]