test_p7_lineage_validation.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. import json
  2. from content_agent.run_service import RunService
  3. from tests.p1_helpers import FakeQueryVariantClient
  4. from tests.replay_harness import replay_case
  5. def _start_mock_run(tmp_path):
  6. # M3: the mock platform's digg counts fall below the douyin heat floor, so the
  7. # plain mock source yields only review decisions (relevance caps at 60) and zero
  8. # pooled content_assets — leaving nothing for the decision_to_asset checks to
  9. # bite on. Drive a real pooled run via the real_id45 replay corpus (2 pooled +
  10. # 2 review under the M3 relevance+heat scoring), then re-attach a RunService to
  11. # the same runtime root to validate and tamper the produced lineage.
  12. runtime_root = tmp_path / "rt"
  13. artifacts = replay_case("real_id45", runtime_root=runtime_root)
  14. assert artifacts.state["status"] == "success"
  15. assert artifacts.files["final_output.json"]["content_assets"]
  16. service = RunService(
  17. runtime_root=runtime_root,
  18. query_variant_client=FakeQueryVariantClient(),
  19. )
  20. return service, artifacts.run_id
  21. def test_content_asset_requires_decision_to_asset_path(tmp_path):
  22. service, run_id = _start_mock_run(tmp_path)
  23. paths_path = service.runtime.run_dir(run_id) / "source_path_records.jsonl"
  24. paths = [
  25. json.loads(line)
  26. for line in paths_path.read_text(encoding="utf-8").splitlines()
  27. if line.strip()
  28. ]
  29. paths = [path for path in paths if path["source_path_type"] != "decision_to_asset"]
  30. paths_path.write_text(
  31. "".join(json.dumps(path, ensure_ascii=False, separators=(",", ":")) + "\n" for path in paths),
  32. encoding="utf-8",
  33. )
  34. validation = service.validate_run(run_id)
  35. assert validation["status"] == "fail"
  36. assert any(
  37. finding["check_id"] == "decision_to_asset_missing"
  38. for finding in validation["findings"]
  39. )
  40. def test_content_asset_must_reference_decision_to_asset_path(tmp_path):
  41. service, run_id = _start_mock_run(tmp_path)
  42. paths = service.read_jsonl(run_id, "source_path_records.jsonl")
  43. final_output_path = service.runtime.run_dir(run_id) / "final_output.json"
  44. final_output = json.loads(final_output_path.read_text(encoding="utf-8"))
  45. asset = final_output["content_assets"][0]
  46. decision_asset_path_ids = {
  47. path["source_path_record_id"]
  48. for path in paths
  49. if path["source_path_type"] == "decision_to_asset"
  50. and path["decision_id"] == asset["decision_id"]
  51. and path["to_node_id"] == asset["platform_content_id"]
  52. }
  53. final_output["content_assets"][0]["source_path_record_ids"] = [
  54. path_id
  55. for path_id in final_output["content_assets"][0]["source_path_record_ids"]
  56. if path_id not in decision_asset_path_ids
  57. ]
  58. final_output_path.write_text(
  59. json.dumps(final_output, ensure_ascii=False, indent=2) + "\n",
  60. encoding="utf-8",
  61. )
  62. validation = service.validate_run(run_id)
  63. assert validation["status"] == "fail"
  64. assert any(
  65. finding["check_id"] == "decision_to_asset_missing"
  66. for finding in validation["findings"]
  67. )
  68. def test_content_asset_rejects_broken_decision_to_asset_path(tmp_path):
  69. service, run_id = _start_mock_run(tmp_path)
  70. paths_path = service.runtime.run_dir(run_id) / "source_path_records.jsonl"
  71. paths = [
  72. json.loads(line)
  73. for line in paths_path.read_text(encoding="utf-8").splitlines()
  74. if line.strip()
  75. ]
  76. for path in paths:
  77. if path["source_path_type"] == "decision_to_asset":
  78. path["from_node_id"] = "wrong_decision"
  79. break
  80. paths_path.write_text(
  81. "".join(json.dumps(path, ensure_ascii=False, separators=(",", ":")) + "\n" for path in paths),
  82. encoding="utf-8",
  83. )
  84. validation = service.validate_run(run_id)
  85. assert validation["status"] == "fail"
  86. assert any(
  87. finding["check_id"] == "decision_to_asset_broken"
  88. for finding in validation["findings"]
  89. )