"""API-level tests for ``content_agent.api`` driven through FastAPI's TestClient."""

from fastapi.testclient import TestClient

from content_agent import api
from content_agent.integrations.mock_platform import MockPlatformClient
from content_agent.run_service import RunService
from tests.p1_helpers import FakeDemandSource, FakeQueryVariantClient


def _fake_backed_service(tmp_path):
    """Return a RunService rooted under ``tmp_path`` that uses the fake collaborators."""
    return RunService(
        runtime_root=tmp_path / "runtime" / "v1",
        demand_source=FakeDemandSource(),
        query_variant_client=FakeQueryVariantClient(),
    )


def _client_with_fake_service(tmp_path, monkeypatch):
    """Swap ``api.service`` for a fake-backed RunService and return a TestClient."""
    monkeypatch.setattr(api, "service", _fake_backed_service(tmp_path))
    return TestClient(api.app)


def _start_mock_run(client):
    """POST a douyin run in mock platform mode and return its ``run_id``."""
    created = client.post("/runs", json={"platform": "douyin", "platform_mode": "mock"})
    return created.json()["run_id"]


def test_api_runs_and_queries_mock_chain(tmp_path, monkeypatch):
    """Create one mock run and walk every read endpoint that exposes it."""
    client = _client_with_fake_service(tmp_path, monkeypatch)

    response = client.post("/runs", json={"platform": "douyin", "platform_mode": "mock"})
    assert response.status_code == 200

    payload = response.json()
    run_id = payload["run_id"]
    assert payload["platform_mode"] == "mock"
    assert payload["policy_run_id"].startswith("policy_run_")
    assert payload["policy_bundle_id"] == "douyin_policy_bundle_v1"
    assert payload["strategy_version"] == "V1"

    # Every per-run resource must answer 200; the path doubles as the failure message.
    run_paths = [
        f"/runs/{run_id}",
        f"/runs/{run_id}/discovered-content-items",
        f"/runs/{run_id}/rule-decisions",
        f"/runs/{run_id}/source-path-records",
        f"/runs/{run_id}/final-output",
        f"/runs/{run_id}/strategy-review",
        f"/runs/{run_id}/validation",
    ]
    for path in run_paths:
        get_response = client.get(path)
        assert get_response.status_code == 200, path

    review = client.get(f"/runs/{run_id}/strategy-review").json()["data"]
    # M3 controlled change: the profile threshold was retired in favour of
    # Gemini relevance + platform popularity scoring; the mock chain does not
    # have enough popularity, so all three content items land in the review
    # band (previously one of them was pooled).
    review_summary = review["summary"]
    assert review_summary["pooled_content_count"] == 0
    assert review_summary["review_content_count"] == 3
    assert review["suggestions"]

    validation = client.get(f"/runs/{run_id}/validation").json()
    assert validation["status"] == "pass"

    summary = client.get(f"/runs/{run_id}").json()
    assert summary["validation_status"] == "pass"

    run_list = client.get("/runs").json()
    assert run_list["total"] == 1
    assert run_list["items"][0]["run_id"] == run_id
    assert run_list["data_origin"] == "runtime_export"

    dashboard = client.get(f"/runs/{run_id}/dashboard").json()
    assert dashboard["run_id"] == run_id
    assert dashboard["data_origin"] == "runtime_export"
    assert dashboard["counts"]["queries"] >= 1
    assert dashboard["runtime_files"]
    assert dashboard["business_summary"]["query_count"] >= 1

    expected_stage_ids = {
        "source",
        "query",
        "platform",
        "judge",
        "walk",
        "asset",
        "learning",
    }
    assert {stage["stage_id"] for stage in dashboard["stage_conclusions"]} >= expected_stage_ids

    def stage_by_id(stage_id):
        # First stage conclusion carrying the requested id.
        return next(
            stage for stage in dashboard["stage_conclusions"] if stage["stage_id"] == stage_id
        )

    query_stage = stage_by_id("query")
    assert "生成成功" in query_stage["metric"]
    assert "llm_variant" not in query_stage["metric"]

    source_stage = stage_by_id("source")
    assert source_stage["detail"] == "需求池 ID:1"

    assert isinstance(dashboard["rule_application_summary"], list)
    assert "nodes" in dashboard["walk_graph"]
    assert dashboard["technical_refs"]["runtime_files_url"].endswith("/runtime-files")

    queries = client.get(f"/runs/{run_id}/queries").json()
    assert queries["total"] >= 1
    assert queries["items"][0]["search_query_id"]

    content_items = client.get(f"/runs/{run_id}/content-items").json()
    assert content_items["total"] >= 1
    assert "rule_decision" in content_items["items"][0]

    timeline = client.get(f"/runs/{run_id}/timeline").json()
    assert timeline["total"] >= 1
    assert any(item["source"] == "run_events.jsonl" for item in timeline["items"])

    runtime_files = client.get(f"/runs/{run_id}/runtime-files").json()
    filenames = {item["filename"] for item in runtime_files["files"]}
    assert "search_queries.jsonl" in filenames

    runtime_file = client.get(f"/runs/{run_id}/runtime-files/search_queries.jsonl").json()
    assert runtime_file["records"]
    assert runtime_file["data_origin"] == "runtime_export"

    # A filename the API does not accept is rejected as an invalid request.
    bad_runtime_file = client.get(f"/runs/{run_id}/runtime-files/not_allowed.jsonl")
    assert bad_runtime_file.status_code == 400
    assert bad_runtime_file.json()["detail"]["error_code"] == "INVALID_REQUEST"

    # An unknown run id yields a 404 with a dedicated error code.
    missing_dashboard = client.get("/runs/not-a-run/dashboard")
    assert missing_dashboard.status_code == 404
    assert missing_dashboard.json()["detail"]["error_code"] == "RUN_NOT_FOUND"


def test_api_defaults_to_real_platform_mode_but_can_select_mock(tmp_path, monkeypatch):
    """An empty request body selects ("douyin", "real"); "mock" can be requested explicitly."""
    selected_modes = []

    def fake_platform_client(self, platform, platform_mode):
        # Record what the service asked for, then hand back the mock client.
        selected_modes.append((platform, platform_mode))
        return MockPlatformClient()

    monkeypatch.setattr(RunService, "_platform_client", fake_platform_client)
    client = _client_with_fake_service(tmp_path, monkeypatch)

    default_response = client.post("/runs", json={})
    mock_response = client.post("/runs", json={"platform_mode": "mock"})

    assert default_response.status_code == 200
    assert default_response.json()["platform_mode"] == "real"
    assert mock_response.status_code == 200
    assert mock_response.json()["platform_mode"] == "mock"
    assert selected_modes == [("douyin", "real"), ("douyin", "mock")]


def test_api_rejects_non_douyin_real_platform(tmp_path, monkeypatch):
    """Real mode on a platform other than douyin is answered with 400 INVALID_REQUEST."""
    client = _client_with_fake_service(tmp_path, monkeypatch)

    response = client.post(
        "/runs",
        json={"platform": "other_platform", "platform_mode": "real"},
    )

    assert response.status_code == 400
    assert response.json()["detail"]["error_code"] == "INVALID_REQUEST"


def test_api_returns_partial_success_as_successful_response(tmp_path, monkeypatch):
    """A run whose platform client fails for one query still returns HTTP 200."""
    service = _fake_backed_service(tmp_path)

    def partially_failing_platform_client(platform, platform_mode):
        return _PartialFailurePlatformClient()

    # Instance-level override: the replacement is a plain function, so no ``self``.
    service._platform_client = partially_failing_platform_client
    monkeypatch.setattr(api, "service", service)
    client = TestClient(api.app)

    response = client.post("/runs", json={"platform": "douyin", "platform_mode": "real"})

    assert response.status_code == 200
    payload = response.json()
    assert payload["status"] == "partial_success"

    summary = client.get(f"/runs/{payload['run_id']}").json()
    assert summary["status"] == "partial_success"


def test_api_rejects_legacy_run_identifier_field(tmp_path, monkeypatch):
    """A request body that carries the legacy run identifier field is rejected with 422."""
    monkeypatch.setattr(api, "service", RunService(runtime_root=tmp_path / "runtime" / "v1"))
    client = TestClient(api.app)

    # NOTE(review): the key is assembled from two pieces, presumably so the
    # retired field name never appears verbatim in the source -- confirm.
    legacy_run_key = "tr" + "ace_id"
    request_body = {
        "platform": "douyin",
        "platform_mode": "mock",
        legacy_run_key: "legacy_run_001",
    }

    response = client.post("/runs", json=request_body)

    assert response.status_code == 422


class _PartialFailurePlatformClient:
    """Platform client that fails for query ``q_002`` and delegates all others to the mock."""

    def __init__(self) -> None:
        self.mock = MockPlatformClient()

    def search(self, search_query: dict) -> list[dict]:
        """Raise ``RuntimeError`` for ``q_002``; otherwise return the mock client's results."""
        if search_query["search_query_id"] == "q_002":
            raise RuntimeError("temporary platform failure")
        return self.mock.search(search_query)


def test_api_timeline_includes_summary(tmp_path, monkeypatch):
    """The timeline payload carries a ``summary`` object with exactly the expected keys."""
    client = _client_with_fake_service(tmp_path, monkeypatch)
    run_id = _start_mock_run(client)

    timeline = client.get(f"/runs/{run_id}/timeline").json()
    summary = timeline["summary"]

    expected_summary_keys = {
        "total_duration_ms",
        "stage_duration_ms",
        "query_failure_count",
        "platform_rate_limited_count",
        "decode_status_counts",
        "error_counts",
        "walk_status_counts",
    }
    assert set(summary) == expected_summary_keys
    assert summary["stage_duration_ms"]
    assert "stalled" not in timeline and "is_blocked" not in timeline


def test_api_config_readonly_endpoints():
    """The read-only ``/config/*`` endpoints expose the expected configuration data."""
    client = TestClient(api.app)

    rule_packs = client.get("/config/rule-packs").json()
    assert rule_packs["source_file"].endswith("douyin_rule_packs.v1.json")
    # M4 pack-trimming controlled change: the 4 future dispatch/binding
    # entries were removed, leaving only the content one.
    assert len(rule_packs["data"]["rule_pack_dispatch"]) == 1
    assert len(rule_packs["data"]["effect_status_mapping"]) == 5

    walk = client.get("/config/walk-strategy").json()
    assert len(walk["data"]["walk_rule_pack_binding"]) == 1
    assert len(walk["data"]["walk_edge_catalog"]) == 10

    prompts = client.get("/config/query-prompts").json()
    assert "douyin/V1" in prompts["data"]["profiles"]

    policy = client.get("/config/walk-policy").json()
    assert policy["source_file"].endswith("walk_policy.json")
    assert "edge_permissions" in policy["data"]
    assert "edge_budgets" in policy["data"]


def test_api_dashboard_rule_summary_effect_status_not_none(tmp_path, monkeypatch):
    """Dashboard rule summaries always carry a non-None ``content_effect_status``."""
    client = _client_with_fake_service(tmp_path, monkeypatch)
    run_id = _start_mock_run(client)

    dashboard = client.get(f"/runs/{run_id}/dashboard").json()
    summaries = dashboard["rule_application_summary"]
    assert summaries
    # Before the fix, content_effect_status was always None because the
    # decision records did not carry that field.
    assert all(row["content_effect_status"] is not None for row in summaries)

    # walk_graph edges carry separate attribution and execution fields.
    walk_edges = [edge for edge in dashboard["walk_graph"]["edges"] if edge.get("rule_pack")]
    assert any("rule_pack_executed" in edge for edge in walk_edges)