| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- from fastapi.testclient import TestClient
- from content_agent import api
- from content_agent.integrations.mock_platform import MockPlatformClient
- from content_agent.run_service import RunService
- from tests.p1_helpers import FakeDemandSource, FakeQueryVariantClient
- def test_api_runs_and_queries_mock_chain(tmp_path, monkeypatch):
- monkeypatch.setattr(
- api,
- "service",
- RunService(
- runtime_root=tmp_path / "runtime" / "v1",
- demand_source=FakeDemandSource(),
- query_variant_client=FakeQueryVariantClient(),
- ),
- )
- client = TestClient(api.app)
- response = client.post("/runs", json={"platform": "douyin", "platform_mode": "mock"})
- assert response.status_code == 200
- payload = response.json()
- run_id = payload["run_id"]
- assert payload["platform_mode"] == "mock"
- assert payload["policy_run_id"].startswith("policy_run_")
- assert payload["policy_bundle_id"] == "douyin_policy_bundle_v1"
- assert payload["strategy_version"] == "V1"
- for path in [
- f"/runs/{run_id}",
- f"/runs/{run_id}/discovered-content-items",
- f"/runs/{run_id}/rule-decisions",
- f"/runs/{run_id}/source-path-records",
- f"/runs/{run_id}/final-output",
- f"/runs/{run_id}/strategy-review",
- f"/runs/{run_id}/validation",
- ]:
- get_response = client.get(path)
- assert get_response.status_code == 200, path
- review = client.get(f"/runs/{run_id}/strategy-review").json()["data"]
- # M3 受控变化: 画像门槛退役,改 Gemini 相关性 + 平台热度打分;mock 链热度不足,
- # 三条内容落复看带(原 1 进池)。
- assert review["summary"]["pooled_content_count"] == 0
- assert review["summary"]["review_content_count"] == 3
- assert review["suggestions"]
- validation = client.get(f"/runs/{run_id}/validation").json()
- assert validation["status"] == "pass"
- summary = client.get(f"/runs/{run_id}").json()
- assert summary["validation_status"] == "pass"
- run_list = client.get("/runs").json()
- assert run_list["total"] == 1
- assert run_list["items"][0]["run_id"] == run_id
- assert run_list["data_origin"] == "runtime_export"
- dashboard = client.get(f"/runs/{run_id}/dashboard").json()
- assert dashboard["run_id"] == run_id
- assert dashboard["data_origin"] == "runtime_export"
- assert dashboard["counts"]["queries"] >= 1
- assert dashboard["runtime_files"]
- assert dashboard["business_summary"]["query_count"] >= 1
- assert {stage["stage_id"] for stage in dashboard["stage_conclusions"]} >= {
- "source",
- "query",
- "platform",
- "judge",
- "walk",
- "asset",
- "learning",
- }
- query_stage = next(stage for stage in dashboard["stage_conclusions"] if stage["stage_id"] == "query")
- assert "生成成功" in query_stage["metric"]
- assert "llm_variant" not in query_stage["metric"]
- source_stage = next(stage for stage in dashboard["stage_conclusions"] if stage["stage_id"] == "source")
- assert source_stage["detail"] == "需求池 ID:1"
- assert isinstance(dashboard["rule_application_summary"], list)
- assert "nodes" in dashboard["walk_graph"]
- assert dashboard["technical_refs"]["runtime_files_url"].endswith("/runtime-files")
- queries = client.get(f"/runs/{run_id}/queries").json()
- assert queries["total"] >= 1
- assert queries["items"][0]["search_query_id"]
- content_items = client.get(f"/runs/{run_id}/content-items").json()
- assert content_items["total"] >= 1
- assert "rule_decision" in content_items["items"][0]
- timeline = client.get(f"/runs/{run_id}/timeline").json()
- assert timeline["total"] >= 1
- assert any(item["source"] == "run_events.jsonl" for item in timeline["items"])
- runtime_files = client.get(f"/runs/{run_id}/runtime-files").json()
- filenames = {item["filename"] for item in runtime_files["files"]}
- assert "search_queries.jsonl" in filenames
- runtime_file = client.get(f"/runs/{run_id}/runtime-files/search_queries.jsonl").json()
- assert runtime_file["records"]
- assert runtime_file["data_origin"] == "runtime_export"
- bad_runtime_file = client.get(f"/runs/{run_id}/runtime-files/not_allowed.jsonl")
- assert bad_runtime_file.status_code == 400
- assert bad_runtime_file.json()["detail"]["error_code"] == "INVALID_REQUEST"
- missing_dashboard = client.get("/runs/not-a-run/dashboard")
- assert missing_dashboard.status_code == 404
- assert missing_dashboard.json()["detail"]["error_code"] == "RUN_NOT_FOUND"
- def test_api_defaults_to_real_platform_mode_but_can_select_mock(tmp_path, monkeypatch):
- selected_modes = []
- def fake_platform_client(self, platform, platform_mode):
- selected_modes.append((platform, platform_mode))
- return MockPlatformClient()
- monkeypatch.setattr(RunService, "_platform_client", fake_platform_client)
- monkeypatch.setattr(
- api,
- "service",
- RunService(
- runtime_root=tmp_path / "runtime" / "v1",
- demand_source=FakeDemandSource(),
- query_variant_client=FakeQueryVariantClient(),
- ),
- )
- client = TestClient(api.app)
- default_response = client.post("/runs", json={})
- mock_response = client.post("/runs", json={"platform_mode": "mock"})
- assert default_response.status_code == 200
- assert default_response.json()["platform_mode"] == "real"
- assert mock_response.status_code == 200
- assert mock_response.json()["platform_mode"] == "mock"
- assert selected_modes == [("douyin", "real"), ("douyin", "mock")]
- def test_api_rejects_non_douyin_real_platform(tmp_path, monkeypatch):
- monkeypatch.setattr(
- api,
- "service",
- RunService(
- runtime_root=tmp_path / "runtime" / "v1",
- demand_source=FakeDemandSource(),
- query_variant_client=FakeQueryVariantClient(),
- ),
- )
- client = TestClient(api.app)
- response = client.post(
- "/runs", json={"platform": "other_platform", "platform_mode": "real"}
- )
- assert response.status_code == 400
- assert response.json()["detail"]["error_code"] == "INVALID_REQUEST"
- def test_api_returns_partial_success_as_successful_response(tmp_path, monkeypatch):
- service = RunService(
- runtime_root=tmp_path / "runtime" / "v1",
- demand_source=FakeDemandSource(),
- query_variant_client=FakeQueryVariantClient(),
- )
- service._platform_client = lambda platform, platform_mode: _PartialFailurePlatformClient()
- monkeypatch.setattr(api, "service", service)
- client = TestClient(api.app)
- response = client.post("/runs", json={"platform": "douyin", "platform_mode": "real"})
- assert response.status_code == 200
- payload = response.json()
- assert payload["status"] == "partial_success"
- summary = client.get(f"/runs/{payload['run_id']}").json()
- assert summary["status"] == "partial_success"
- def test_api_rejects_legacy_run_identifier_field(tmp_path, monkeypatch):
- monkeypatch.setattr(api, "service", RunService(runtime_root=tmp_path / "runtime" / "v1"))
- client = TestClient(api.app)
- legacy_run_key = "tr" + "ace_id"
- response = client.post(
- "/runs",
- json={"platform": "douyin", "platform_mode": "mock", legacy_run_key: "legacy_run_001"},
- )
- assert response.status_code == 422
- class _PartialFailurePlatformClient:
- def __init__(self) -> None:
- self.mock = MockPlatformClient()
- def search(self, search_query: dict) -> list[dict]:
- if search_query["search_query_id"] == "q_002":
- raise RuntimeError("temporary platform failure")
- return self.mock.search(search_query)
- def test_api_timeline_includes_summary(tmp_path, monkeypatch):
- monkeypatch.setattr(
- api,
- "service",
- RunService(
- runtime_root=tmp_path / "runtime" / "v1",
- demand_source=FakeDemandSource(),
- query_variant_client=FakeQueryVariantClient(),
- ),
- )
- client = TestClient(api.app)
- run_id = client.post("/runs", json={"platform": "douyin", "platform_mode": "mock"}).json()["run_id"]
- timeline = client.get(f"/runs/{run_id}/timeline").json()
- summary = timeline["summary"]
- assert set(summary) == {
- "total_duration_ms",
- "stage_duration_ms",
- "query_failure_count",
- "platform_rate_limited_count",
- "decode_status_counts",
- "error_counts",
- "walk_status_counts",
- }
- assert summary["stage_duration_ms"]
- assert "stalled" not in timeline and "is_blocked" not in timeline
- def test_api_config_readonly_endpoints():
- client = TestClient(api.app)
- rule_packs = client.get("/config/rule-packs").json()
- assert rule_packs["source_file"].endswith("douyin_rule_packs.v1.json")
- # M4 砍包受控变化:4 future dispatch/binding 已删,仅剩 content。
- assert len(rule_packs["data"]["rule_pack_dispatch"]) == 1
- assert len(rule_packs["data"]["effect_status_mapping"]) == 5
- walk = client.get("/config/walk-strategy").json()
- assert len(walk["data"]["walk_rule_pack_binding"]) == 1
- assert len(walk["data"]["walk_edge_catalog"]) == 10
- prompts = client.get("/config/query-prompts").json()
- assert "douyin/V1" in prompts["data"]["profiles"]
- policy = client.get("/config/walk-policy").json()
- assert policy["source_file"].endswith("walk_policy.json")
- assert "edge_permissions" in policy["data"]
- assert "edge_budgets" in policy["data"]
- def test_api_dashboard_rule_summary_effect_status_not_none(tmp_path, monkeypatch):
- monkeypatch.setattr(
- api,
- "service",
- RunService(
- runtime_root=tmp_path / "runtime" / "v1",
- demand_source=FakeDemandSource(),
- query_variant_client=FakeQueryVariantClient(),
- ),
- )
- client = TestClient(api.app)
- run_id = client.post("/runs", json={"platform": "douyin", "platform_mode": "mock"}).json()["run_id"]
- dashboard = client.get(f"/runs/{run_id}/dashboard").json()
- summaries = dashboard["rule_application_summary"]
- assert summaries
- # 修复前 content_effect_status 恒为 None(decision 记录无该字段)。
- assert all(row["content_effect_status"] is not None for row in summaries)
- # walk_graph edge 带归属/执行分离字段。
- walk_edges = [e for e in dashboard["walk_graph"]["edges"] if e.get("rule_pack")]
- assert any("rule_pack_executed" in e for e in walk_edges)
|