| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- import json
- from content_agent.run_service import RunService
- from tests.p1_helpers import FakeQueryVariantClient
- from tests.replay_harness import replay_case
- def _start_mock_run(tmp_path):
- # M3: the mock platform's digg counts fall below the douyin heat floor, so the
- # plain mock source yields only review decisions (relevance caps at 60) and zero
- # pooled content_assets — leaving nothing for the decision_to_asset checks to
- # bite on. Drive a real pooled run via the real_id45 replay corpus (2 pooled +
- # 2 review under the M3 relevance+heat scoring), then re-attach a RunService to
- # the same runtime root to validate and tamper the produced lineage.
- runtime_root = tmp_path / "rt"
- artifacts = replay_case("real_id45", runtime_root=runtime_root)
- assert artifacts.state["status"] == "success"
- assert artifacts.files["final_output.json"]["content_assets"]
- service = RunService(
- runtime_root=runtime_root,
- query_variant_client=FakeQueryVariantClient(),
- )
- return service, artifacts.run_id
- def test_content_asset_requires_decision_to_asset_path(tmp_path):
- service, run_id = _start_mock_run(tmp_path)
- paths_path = service.runtime.run_dir(run_id) / "source_path_records.jsonl"
- paths = [
- json.loads(line)
- for line in paths_path.read_text(encoding="utf-8").splitlines()
- if line.strip()
- ]
- paths = [path for path in paths if path["source_path_type"] != "decision_to_asset"]
- paths_path.write_text(
- "".join(json.dumps(path, ensure_ascii=False, separators=(",", ":")) + "\n" for path in paths),
- encoding="utf-8",
- )
- validation = service.validate_run(run_id)
- assert validation["status"] == "fail"
- assert any(
- finding["check_id"] == "decision_to_asset_missing"
- for finding in validation["findings"]
- )
- def test_content_asset_must_reference_decision_to_asset_path(tmp_path):
- service, run_id = _start_mock_run(tmp_path)
- paths = service.read_jsonl(run_id, "source_path_records.jsonl")
- final_output_path = service.runtime.run_dir(run_id) / "final_output.json"
- final_output = json.loads(final_output_path.read_text(encoding="utf-8"))
- asset = final_output["content_assets"][0]
- decision_asset_path_ids = {
- path["source_path_record_id"]
- for path in paths
- if path["source_path_type"] == "decision_to_asset"
- and path["decision_id"] == asset["decision_id"]
- and path["to_node_id"] == asset["platform_content_id"]
- }
- final_output["content_assets"][0]["source_path_record_ids"] = [
- path_id
- for path_id in final_output["content_assets"][0]["source_path_record_ids"]
- if path_id not in decision_asset_path_ids
- ]
- final_output_path.write_text(
- json.dumps(final_output, ensure_ascii=False, indent=2) + "\n",
- encoding="utf-8",
- )
- validation = service.validate_run(run_id)
- assert validation["status"] == "fail"
- assert any(
- finding["check_id"] == "decision_to_asset_missing"
- for finding in validation["findings"]
- )
- def test_content_asset_rejects_broken_decision_to_asset_path(tmp_path):
- service, run_id = _start_mock_run(tmp_path)
- paths_path = service.runtime.run_dir(run_id) / "source_path_records.jsonl"
- paths = [
- json.loads(line)
- for line in paths_path.read_text(encoding="utf-8").splitlines()
- if line.strip()
- ]
- for path in paths:
- if path["source_path_type"] == "decision_to_asset":
- path["from_node_id"] = "wrong_decision"
- break
- paths_path.write_text(
- "".join(json.dumps(path, ensure_ascii=False, separators=(",", ":")) + "\n" for path in paths),
- encoding="utf-8",
- )
- validation = service.validate_run(run_id)
- assert validation["status"] == "fail"
- assert any(
- finding["check_id"] == "decision_to_asset_broken"
- for finding in validation["findings"]
- )
|