"""Runtime-file consistency and validation tests for ``RunService``.

Most tests start a mock run, optionally corrupt one of the files written
into the run directory, and then check what ``RunService.validate_run``
reports for it.
"""

import json
from pathlib import Path

from content_agent.errors import ErrorCode
from content_agent.integrations.runtime_files import LocalRuntimeFileStore
from content_agent.run_service import RunService
from content_agent.schemas import RunStartRequest
from tests.p1_helpers import FakeQueryVariantClient, REAL_SOURCE_FIXTURE

# Status values accepted for ``search_query_effect_status`` on rule decisions
# and search clues of a successful mock run.
_SEARCH_QUERY_EFFECT_STATUSES = frozenset(
    {"success", "pending", "failed", "rule_blocked"}
)


class _AllFailurePlatformClient:
    """Platform client stub whose ``search`` always raises ``RuntimeError``."""

    def search(self, search_query: dict) -> list[dict]:
        raise RuntimeError("platform unavailable")


def _build_service(tmp_path: Path) -> RunService:
    """Return a ``RunService`` rooted under *tmp_path* with a fake query-variant client."""
    return RunService(
        runtime_root=tmp_path / "runtime" / "v1",
        query_variant_client=FakeQueryVariantClient(),
    )


def _start_mock_run(tmp_path, **kwargs):
    """Start a run in ``platform_mode="mock"`` and return ``(service, run_id)``.

    ``source`` defaults to ``REAL_SOURCE_FIXTURE`` unless the caller passes one.
    Any other keyword arguments are forwarded to ``RunStartRequest``.
    Asserts that the run finished with status ``"success"``.
    """
    service = _build_service(tmp_path)
    kwargs.setdefault("source", str(REAL_SOURCE_FIXTURE))
    state = service.start_run(RunStartRequest(platform_mode="mock", **kwargs))
    assert state["status"] == "success"
    return service, state["run_id"]


def _load_json(path: Path):
    """Parse the UTF-8 JSON document stored at *path*."""
    return json.loads(path.read_text(encoding="utf-8"))


def _dump_pretty_json(path: Path, payload) -> None:
    """Overwrite *path* with *payload* as indented JSON plus a trailing newline."""
    path.write_text(
        json.dumps(payload, ensure_ascii=False, indent=2) + "\n",
        encoding="utf-8",
    )


def _load_jsonl(path: Path) -> list:
    """Parse every non-blank line of the UTF-8 JSONL file at *path*."""
    return [
        json.loads(line)
        for line in path.read_text(encoding="utf-8").splitlines()
        if line.strip()
    ]


def _dump_jsonl(path: Path, rows) -> None:
    """Overwrite *path* with *rows*, one compact JSON document per line."""
    path.write_text(
        "".join(
            json.dumps(row, ensure_ascii=False, separators=(",", ":")) + "\n"
            for row in rows
        ),
        encoding="utf-8",
    )


def _assert_validation_fails_with(service, run_id: str, check_id: str) -> None:
    """Assert ``validate_run`` fails and reports a finding with *check_id*."""
    validation = service.validate_run(run_id)
    assert validation["status"] == "fail"
    assert any(
        finding["check_id"] == check_id for finding in validation["findings"]
    ), f"expected a {check_id!r} finding"


def test_runtime_files_are_parseable_and_consistent(tmp_path):
    """A successful mock run writes parseable, mutually consistent runtime files."""
    service, run_id = _start_mock_run(tmp_path)
    run_dir = service.runtime.run_dir(run_id)

    for json_file in [
        "source_context.json",
        "pattern_seed_pack.json",
        "final_output.json",
        "strategy_review.json",
    ]:
        data = json.loads((run_dir / json_file).read_text(encoding="utf-8"))
        assert data["schema_version"] == "runtime_record.v1"

    for jsonl_file in [
        "search_queries.jsonl",
        "discovered_content_items.jsonl",
        "content_media_records.jsonl",
        "pattern_recall_evidence.jsonl",
        "rule_decisions.jsonl",
        "walk_actions.jsonl",
        "run_events.jsonl",
        "source_path_records.jsonl",
        "search_clues.jsonl",
    ]:
        # Deliberately strict: blank lines are not skipped here, so every
        # line in the file must be a JSON document.
        for line in (run_dir / jsonl_file).read_text(encoding="utf-8").splitlines():
            json.loads(line)

    items = service.read_jsonl(run_id, "discovered_content_items.jsonl")
    decisions = service.read_jsonl(run_id, "rule_decisions.jsonl")
    paths = service.read_jsonl(run_id, "source_path_records.jsonl")
    final_output = service.read_json(run_id, "final_output.json")
    source_context = service.read_json(run_id, "source_context.json")
    policy_run_id = service.read_json(run_id, "pattern_seed_pack.json")["policy_run_id"]

    # Every discovered item has a rule decision and vice versa.
    content_ids = {item["platform_content_id"] for item in items}
    decision_target_ids = {decision["decision_target_id"] for decision in decisions}
    assert content_ids == decision_target_ids

    media_records = service.read_jsonl(run_id, "content_media_records.jsonl")
    recall_evidence = service.read_jsonl(run_id, "pattern_recall_evidence.jsonl")
    walk_actions = service.read_jsonl(run_id, "walk_actions.jsonl")
    for row in [
        *items,
        *media_records,
        *recall_evidence,
        *decisions,
        *walk_actions,
        *paths,
    ]:
        assert row["policy_run_id"] == policy_run_id
        assert row["record_schema_version"] == "runtime_record.v1"
        assert row["raw_payload"]["run_id"] == run_id

    assert {row["walk_status"] for row in walk_actions} <= {
        "success",
        "pending",
        "failed",
        "skipped",
        "rule_blocked",
    }
    assert all(row["walk_action_id"].startswith("wa_") for row in walk_actions)
    for media_record in media_records:
        assert media_record["platform"] == "douyin"

    assert final_output["policy_run_id"] == policy_run_id
    assert final_output["policy"]["policy_bundle_id"] == "douyin_policy_bundle_v1"
    assert final_output["policy"]["rule_pack_source_ref"]["file"].endswith(
        "douyin_rule_packs.v1.json"
    )
    assert final_output["walk_strategy"]["walk_strategy_version"] == "V1.0"
    assert "policy_run_id" not in source_context

    assert {decision["decision_action"] for decision in decisions} <= {
        "ADD_TO_CONTENT_POOL",
        "KEEP_CONTENT_FOR_REVIEW",
        "REJECT_CONTENT",
    }
    assert {
        decision["search_query_effect_status"] for decision in decisions
    } <= _SEARCH_QUERY_EFFECT_STATUSES
    search_clues = service.read_jsonl(run_id, "search_clues.jsonl")
    assert {
        clue["search_query_effect_status"] for clue in search_clues
    } <= _SEARCH_QUERY_EFFECT_STATUSES

    # Content assets may only reference source-path records that exist.
    path_ids = {path["source_path_record_id"] for path in paths}
    for asset in final_output["content_assets"]:
        assert set(asset["source_path_record_ids"]) <= path_ids

    evidence_pack = source_context["ext_data"]["evidence_pack"]
    assert evidence_pack["pattern_source_system"] == "pg_pattern_v2"
    assert evidence_pack["pattern_execution_id"] == 581
    assert evidence_pack["mining_config_id"] == 2082
    assert evidence_pack["itemset_ids"] == [1608352]

    validation = service.validate_run(run_id)
    assert validation["status"] == "pass"


def test_all_platform_query_failure_writes_failed_query_runtime_records(tmp_path):
    """When every platform search raises, the run fails and records each failure."""
    service = _build_service(tmp_path)
    # Swap in a client whose search always raises, regardless of platform/mode.
    service._platform_client = lambda platform, platform_mode: _AllFailurePlatformClient()

    state = service.start_run(
        RunStartRequest(platform_mode="real", source=str(REAL_SOURCE_FIXTURE))
    )

    assert state["status"] == "failed"
    assert state["error_code"] == ErrorCode.PLATFORM_REQUEST_FAILED.value

    search_queries = service.read_jsonl(state["run_id"], "search_queries.jsonl")
    search_clues = service.read_jsonl(state["run_id"], "search_clues.jsonl")
    run_events = service.read_jsonl(state["run_id"], "run_events.jsonl")

    assert len(search_queries) == len(state["error_detail"]["query_failures"])
    assert {query["search_query_effect_status"] for query in search_queries} == {"failed"}
    assert all(
        query["raw_payload"]["query_failure"]["status"] == "failed"
        for query in search_queries
    )
    assert {clue["search_query_effect_status"] for clue in search_clues} == {"failed"}
    assert {clue["result_count"] for clue in search_clues} == {0}
    assert {clue["query_aggregation_id"] for clue in search_clues} == {
        "platform_query_failure"
    }

    platform_failures = [
        event for event in run_events if event["event_type"] == "platform_query_failed"
    ]
    assert len(platform_failures) == len(search_queries)
    assert {event["error_code"] for event in platform_failures} == {
        ErrorCode.PLATFORM_REQUEST_FAILED.value
    }


def test_runtime_validation_catches_summary_drift(tmp_path):
    """Tampering with ``summary.pooled_content_count`` yields ``summary_mismatch``."""
    service, run_id = _start_mock_run(tmp_path)
    final_output_path = service.runtime.run_dir(run_id) / "final_output.json"
    final_output = _load_json(final_output_path)
    final_output["summary"]["pooled_content_count"] = 99
    _dump_pretty_json(final_output_path, final_output)

    _assert_validation_fails_with(service, run_id, "summary_mismatch")


def test_local_runtime_replaces_pattern_recall_evidence_by_id(tmp_path):
    """Appending a row with an existing ``recall_evidence_id`` replaces the old row."""
    runtime = LocalRuntimeFileStore(tmp_path / "runtime")
    runtime.prepare_run("run_001")

    # Same recall_evidence_id appended twice; only the status differs.
    for recall_status in ("pending", "matched"):
        runtime.append_jsonl(
            "run_001",
            "pattern_recall_evidence.jsonl",
            [
                {
                    "run_id": "run_001",
                    "policy_run_id": "policy_001",
                    "recall_evidence_id": "recall_001",
                    "recall_status": recall_status,
                }
            ],
        )

    rows = runtime.read_jsonl("run_001", "pattern_recall_evidence.jsonl")
    assert len(rows) == 1
    assert rows[0]["recall_status"] == "matched"


def test_runtime_validation_catches_missing_policy_run_id(tmp_path):
    """A rule decision without ``policy_run_id`` yields ``policy_run_id_mismatch``."""
    service, run_id = _start_mock_run(tmp_path)
    decisions_path = service.runtime.run_dir(run_id) / "rule_decisions.jsonl"
    decisions = _load_jsonl(decisions_path)
    decisions[0].pop("policy_run_id")
    _dump_jsonl(decisions_path, decisions)

    _assert_validation_fails_with(service, run_id, "policy_run_id_mismatch")


def test_runtime_validation_catches_missing_record_schema_version(tmp_path):
    """A search query without ``record_schema_version`` is reported as missing."""
    service, run_id = _start_mock_run(tmp_path)
    queries_path = service.runtime.run_dir(run_id) / "search_queries.jsonl"
    queries = _load_jsonl(queries_path)
    queries[0].pop("record_schema_version")
    _dump_jsonl(queries_path, queries)

    _assert_validation_fails_with(service, run_id, "record_schema_version_missing")


def test_runtime_validation_catches_missing_raw_payload(tmp_path):
    """A media record without ``raw_payload`` yields ``raw_payload_missing``."""
    service, run_id = _start_mock_run(tmp_path)
    media_path = service.runtime.run_dir(run_id) / "content_media_records.jsonl"
    media_records = _load_jsonl(media_path)
    media_records[0].pop("raw_payload")
    _dump_jsonl(media_path, media_records)

    _assert_validation_fails_with(service, run_id, "raw_payload_missing")


def test_runtime_validation_catches_forbidden_raw_payload_key(tmp_path):
    """A ``secret`` key inside ``raw_payload`` yields ``raw_payload_forbidden_key``."""
    service, run_id = _start_mock_run(tmp_path)
    media_path = service.runtime.run_dir(run_id) / "content_media_records.jsonl"
    media_records = _load_jsonl(media_path)
    media_records[0]["raw_payload"]["secret"] = "should_not_be_stored"
    _dump_jsonl(media_path, media_records)

    _assert_validation_fails_with(service, run_id, "raw_payload_forbidden_key")


def test_runtime_validation_catches_missing_pattern_recall_evidence(tmp_path):
    """An item without ``pattern_recall_evidence_id`` is reported as missing evidence."""
    service, run_id = _start_mock_run(tmp_path)
    items_path = service.runtime.run_dir(run_id) / "discovered_content_items.jsonl"
    items = _load_jsonl(items_path)
    items[0]["pattern_match_result"].pop("pattern_recall_evidence_id", None)
    _dump_jsonl(items_path, items)

    _assert_validation_fails_with(service, run_id, "pattern_recall_evidence_missing")


def test_runtime_validation_allows_missing_decode_case_ids_in_source_evidence(tmp_path):
    """Removing ``decode_case_ids`` from all source evidence still validates."""
    service, run_id = _start_mock_run(tmp_path)
    run_dir = service.runtime.run_dir(run_id)

    decisions_path = run_dir / "rule_decisions.jsonl"
    decisions = _load_jsonl(decisions_path)
    for decision in decisions:
        decision["source_evidence"].pop("decode_case_ids", None)
    _dump_jsonl(decisions_path, decisions)

    # Strip the same key from final_output.json so both files stay aligned.
    final_output_path = run_dir / "final_output.json"
    final_output = _load_json(final_output_path)
    for section in ["content_assets", "reject_records", "decision_records"]:
        for row in final_output.get(section, []):
            row.get("source_evidence", {}).pop("decode_case_ids", None)
    _dump_pretty_json(final_output_path, final_output)

    validation = service.validate_run(run_id)
    assert validation["status"] == "pass"


def test_runtime_validation_catches_missing_final_decision_record(tmp_path):
    """Dropping the last decision record yields ``final_decision_missing``."""
    service, run_id = _start_mock_run(tmp_path)
    final_output_path = service.runtime.run_dir(run_id) / "final_output.json"
    final_output = _load_json(final_output_path)
    final_output["decision_records"] = final_output["decision_records"][:-1]
    _dump_pretty_json(final_output_path, final_output)

    _assert_validation_fails_with(service, run_id, "final_decision_missing")


def test_runtime_validation_catches_platform_content_id_source_pollution(tmp_path):
    """Reusing the discovered content id as ``source_post_id`` is flagged."""
    service, run_id = _start_mock_run(tmp_path)
    final_output_path = service.runtime.run_dir(run_id) / "final_output.json"
    final_output = _load_json(final_output_path)
    source_evidence = final_output["decision_records"][0]["source_evidence"]
    source_evidence["source_post_id"] = source_evidence["discovered_platform_content_id"]
    _dump_pretty_json(final_output_path, final_output)

    _assert_validation_fails_with(service, run_id, "source_evidence_content_pollution")


def test_runtime_validation_catches_reject_source_path_break(tmp_path):
    """Removing a query-to-content path record yields ``source_path_broken``."""
    service, run_id = _start_mock_run(tmp_path)
    paths_path = service.runtime.run_dir(run_id) / "source_path_records.jsonl"
    # Drop the search_query_to_content edge that points at this content id.
    # NOTE(review): presumably this is the rejected item in the mock data
    # (per the test name) - confirm against the mock platform fixture.
    paths = [
        path
        for path in _load_jsonl(paths_path)
        if not (
            path.get("source_path_type") == "search_query_to_content"
            and path.get("to_node_id") == "7390000000000000099"
        )
    ]
    _dump_jsonl(paths_path, paths)

    _assert_validation_fails_with(service, run_id, "source_path_broken")


def test_real_source_fixture_keeps_upstream_evidence_pack(tmp_path):
    """The real-case source fixture's evidence pack is carried into the run."""
    # Relative path: this test relies on the working directory pytest runs from.
    source_path = Path("tests/fixtures/real_case_source/source_context.json")
    service, run_id = _start_mock_run(tmp_path, source=str(source_path))

    source_context = service.read_json(run_id, "source_context.json")
    evidence_pack = source_context["ext_data"]["evidence_pack"]
    decisions = service.read_jsonl(run_id, "rule_decisions.jsonl")

    assert evidence_pack["pattern_source_system"] == "pg_pattern_v2"
    assert evidence_pack["source_certainty"] == "db_validated"
    assert evidence_pack["validation_status"] == "passed"
    assert evidence_pack["source_post_id"] == "51978710"
    assert evidence_pack["pattern_execution_id"] == 581
    assert evidence_pack["mining_config_id"] == 2082
    assert evidence_pack["itemset_ids"] == [1608352]
    assert evidence_pack["upstream_run_id"] == "f405f129-3341-4f4a-98e6-fd3f73632adb"
    assert evidence_pack["support"] == 0.0045734552921411235
    assert evidence_pack["absolute_support"] == 49
    assert evidence_pack["decode_case_ids"] == []
    assert decisions[0]["source_evidence"]["source_certainty"] == "db_validated"
    # Discovered content must not be one of the upstream matched posts.
    assert (
        decisions[0]["source_evidence"]["discovered_platform_content_id"]
        not in evidence_pack["matched_post_ids"]
    )
    assert service.validate_run(run_id)["status"] == "pass"


def test_demand_content_json_array_source_is_adapted_to_source_context(tmp_path):
    """A JSON array of demand-content rows is accepted as a run source."""
    source_path = tmp_path / "demand_content.json"
    source_path.write_text(
        json.dumps(
            [
                {
                    "id": 123,
                    "merge_leve2": "PG Pattern V2 需求测试",
                    "name": "爱国情感,人物故事",
                    "suggestion": None,
                    "score": 1.0,
                    "dt": "20260604",
                    "ext_data": {
                        "evidence_pack": {
                            "source_kind": "pattern_itemset",
                            "pattern_source_system": "pg_pattern_v2",
                            "case_id_type": "post_id",
                            "source_post_id": "51978710",
                            "pattern_execution_id": 581,
                            "mining_config_id": 2082,
                            "itemset_ids": [1608352],
                            "itemset_items": [{"itemset_id": 1608352}],
                            "category_bindings": [{"category_id": 76006}],
                            "element_bindings": [{"category_id": 76006}],
                            "support": 0.0045734552921411235,
                            "absolute_support": 49,
                            "matched_post_ids": ["51978710"],
                            "video_ids": ["51978710"],
                            "case_ids": ["51978710"],
                            "decode_case_ids": [],
                            "seed_terms": ["爱国情感", "人物故事"],
                            "source_certainty": "db_validated",
                            "validation_status": "passed",
                        }
                    },
                }
            ],
            ensure_ascii=False,
        ),
        encoding="utf-8",
    )

    service, run_id = _start_mock_run(tmp_path, source=str(source_path))

    source_context = service.read_json(run_id, "source_context.json")
    evidence_pack = source_context["ext_data"]["evidence_pack"]
    # The integer row id is expected back as a string demand_content_id.
    assert source_context["demand_content_id"] == "123"
    assert evidence_pack["pattern_source_system"] == "pg_pattern_v2"
    assert evidence_pack["pattern_execution_id"] == 581
    assert evidence_pack["mining_config_id"] == 2082
    assert evidence_pack["itemset_ids"] == [1608352]
    assert service.validate_run(run_id)["status"] == "pass"


def test_old_mysql_source_system_is_rejected(tmp_path):
    """A source whose pattern system is ``mysql_topic_pattern`` fails as invalid."""
    source_path = tmp_path / "source_context.json"
    source_path.write_text(
        json.dumps(
            {
                "run_id": "old_run",
                "demand_content_id": "old",
                "merge_leve2": "历史样例",
                "name": "旧 MySQL 样例",
                "ext_data": {
                    "evidence_pack": {
                        "source_kind": "pattern_itemset",
                        "pattern_source_system": "mysql_topic_pattern",
                        "case_id_type": "post_id",
                        "source_post_id": "51978710",
                        "pattern_execution_id": 581,
                        "mining_config_id": 2082,
                        "itemset_ids": [1608352],
                        "itemset_items": [{"itemset_id": 1608352}],
                        "category_bindings": [{"category_id": 76006}],
                        "support": 0.0045734552921411235,
                        "absolute_support": 49,
                        "matched_post_ids": ["51978710"],
                        "video_ids": ["51978710"],
                        "case_ids": ["51978710"],
                        "decode_case_ids": [],
                        "seed_terms": ["爱国情感", "人物故事"],
                        "source_certainty": "db_validated",
                        "validation_status": "passed",
                    }
                },
            },
            ensure_ascii=False,
        ),
        encoding="utf-8",
    )

    # Built without a query-variant client, unlike the other tests.
    service = RunService(runtime_root=tmp_path / "runtime" / "v1")
    state = service.start_run(
        RunStartRequest(platform_mode="mock", source=str(source_path))
    )

    assert state["status"] == "failed"
    assert state["error_code"] == ErrorCode.INVALID_SOURCE.value
    assert state["errors"] == ["invalid source"]