import json from content_agent.run_service import RunService from tests.p1_helpers import FakeQueryVariantClient from tests.replay_harness import replay_case def _start_mock_run(tmp_path): # M3: the mock platform's digg counts fall below the douyin heat floor, so the # plain mock source yields only review decisions (relevance caps at 60) and zero # pooled content_assets — leaving nothing for the decision_to_asset checks to # bite on. Drive a real pooled run via the real_id45 replay corpus (2 pooled + # 2 review under the M3 relevance+heat scoring), then re-attach a RunService to # the same runtime root to validate and tamper the produced lineage. runtime_root = tmp_path / "rt" artifacts = replay_case("real_id45", runtime_root=runtime_root) assert artifacts.state["status"] == "success" assert artifacts.files["final_output.json"]["content_assets"] service = RunService( runtime_root=runtime_root, query_variant_client=FakeQueryVariantClient(), ) return service, artifacts.run_id def test_content_asset_requires_decision_to_asset_path(tmp_path): service, run_id = _start_mock_run(tmp_path) paths_path = service.runtime.run_dir(run_id) / "source_path_records.jsonl" paths = [ json.loads(line) for line in paths_path.read_text(encoding="utf-8").splitlines() if line.strip() ] paths = [path for path in paths if path["source_path_type"] != "decision_to_asset"] paths_path.write_text( "".join(json.dumps(path, ensure_ascii=False, separators=(",", ":")) + "\n" for path in paths), encoding="utf-8", ) validation = service.validate_run(run_id) assert validation["status"] == "fail" assert any( finding["check_id"] == "decision_to_asset_missing" for finding in validation["findings"] ) def test_content_asset_must_reference_decision_to_asset_path(tmp_path): service, run_id = _start_mock_run(tmp_path) paths = service.read_jsonl(run_id, "source_path_records.jsonl") final_output_path = service.runtime.run_dir(run_id) / "final_output.json" final_output = json.loads(final_output_path.read_text(encoding="utf-8")) asset = final_output["content_assets"][0] decision_asset_path_ids = { path["source_path_record_id"] for path in paths if path["source_path_type"] == "decision_to_asset" and path["decision_id"] == asset["decision_id"] and path["to_node_id"] == asset["platform_content_id"] } final_output["content_assets"][0]["source_path_record_ids"] = [ path_id for path_id in final_output["content_assets"][0]["source_path_record_ids"] if path_id not in decision_asset_path_ids ] final_output_path.write_text( json.dumps(final_output, ensure_ascii=False, indent=2) + "\n", encoding="utf-8", ) validation = service.validate_run(run_id) assert validation["status"] == "fail" assert any( finding["check_id"] == "decision_to_asset_missing" for finding in validation["findings"] ) def test_content_asset_rejects_broken_decision_to_asset_path(tmp_path): service, run_id = _start_mock_run(tmp_path) paths_path = service.runtime.run_dir(run_id) / "source_path_records.jsonl" paths = [ json.loads(line) for line in paths_path.read_text(encoding="utf-8").splitlines() if line.strip() ] for path in paths: if path["source_path_type"] == "decision_to_asset": path["from_node_id"] = "wrong_decision" break paths_path.write_text( "".join(json.dumps(path, ensure_ascii=False, separators=(",", ":")) + "\n" for path in paths), encoding="utf-8", ) validation = service.validate_run(run_id) assert validation["status"] == "fail" assert any( finding["check_id"] == "decision_to_asset_broken" for finding in validation["findings"] )