| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548 |
- import json
- from pathlib import Path
- from content_agent.errors import ErrorCode
- from content_agent.integrations.runtime_files import LocalRuntimeFileStore
- from content_agent.run_service import RunService
- from content_agent.schemas import RunStartRequest
- from tests.p1_helpers import FakeQueryVariantClient, REAL_SOURCE_FIXTURE
def _start_mock_run(tmp_path, **kwargs):
    """Start a mock-platform run under ``tmp_path`` and return ``(service, run_id)``.

    Extra keyword arguments are forwarded to ``RunStartRequest``. When the
    caller gives no ``source``, ``REAL_SOURCE_FIXTURE`` is used. The helper
    asserts that the run reports status ``"success"``.
    """
    runtime_root = tmp_path / "runtime" / "v1"
    service = RunService(
        runtime_root=runtime_root,
        query_variant_client=FakeQueryVariantClient(),
    )
    if "source" not in kwargs:
        kwargs["source"] = str(REAL_SOURCE_FIXTURE)
    request = RunStartRequest(platform_mode="mock", **kwargs)
    run_state = service.start_run(request)
    assert run_state["status"] == "success"
    return service, run_state["run_id"]
def test_runtime_files_are_parseable_and_consistent(tmp_path):
    """A successful mock run leaves parseable, mutually consistent runtime files."""
    service, run_id = _start_mock_run(tmp_path)
    run_dir = service.runtime.run_dir(run_id)

    # Each JSON document must declare the runtime record schema version.
    json_documents = (
        "source_context.json",
        "pattern_seed_pack.json",
        "final_output.json",
        "strategy_review.json",
    )
    for document_name in json_documents:
        document_text = (run_dir / document_name).read_text(encoding="utf-8")
        document = json.loads(document_text)
        assert document["schema_version"] == "runtime_record.v1"

    # Each line of each JSONL file must be valid JSON on its own.
    jsonl_documents = (
        "search_queries.jsonl",
        "discovered_content_items.jsonl",
        "content_media_records.jsonl",
        "pattern_recall_evidence.jsonl",
        "rule_decisions.jsonl",
        "walk_actions.jsonl",
        "run_events.jsonl",
        "source_path_records.jsonl",
        "search_clues.jsonl",
    )
    for document_name in jsonl_documents:
        document_text = (run_dir / document_name).read_text(encoding="utf-8")
        for raw_line in document_text.splitlines():
            json.loads(raw_line)

    items = service.read_jsonl(run_id, "discovered_content_items.jsonl")
    decisions = service.read_jsonl(run_id, "rule_decisions.jsonl")
    paths = service.read_jsonl(run_id, "source_path_records.jsonl")
    final_output = service.read_json(run_id, "final_output.json")
    source_context = service.read_json(run_id, "source_context.json")
    seed_pack = service.read_json(run_id, "pattern_seed_pack.json")
    policy_run_id = seed_pack["policy_run_id"]

    # Discovered content ids and decision target ids must be the same set.
    discovered_ids = {item["platform_content_id"] for item in items}
    decided_ids = {decision["decision_target_id"] for decision in decisions}
    assert discovered_ids == decided_ids

    media_records = service.read_jsonl(run_id, "content_media_records.jsonl")
    recall_evidence = service.read_jsonl(run_id, "pattern_recall_evidence.jsonl")
    walk_actions = service.read_jsonl(run_id, "walk_actions.jsonl")

    # Bookkeeping fields are checked on every row of these record files.
    record_groups = (
        items,
        media_records,
        recall_evidence,
        decisions,
        walk_actions,
        paths,
    )
    for group in record_groups:
        for record in group:
            assert record["policy_run_id"] == policy_run_id
            assert record["record_schema_version"] == "runtime_record.v1"
            assert record["raw_payload"]["run_id"] == run_id

    allowed_walk_statuses = {
        "success",
        "pending",
        "failed",
        "skipped",
        "rule_blocked",
    }
    seen_walk_statuses = {action["walk_status"] for action in walk_actions}
    assert seen_walk_statuses.issubset(allowed_walk_statuses)
    for action in walk_actions:
        assert action["walk_action_id"].startswith("wa_")
    assert all(media["platform"] == "douyin" for media in media_records)

    assert final_output["policy_run_id"] == policy_run_id
    policy = final_output["policy"]
    assert policy["policy_bundle_id"] == "douyin_policy_bundle_v1"
    assert policy["rule_pack_source_ref"]["file"].endswith(
        "douyin_rule_packs.v1.json"
    )
    assert final_output["walk_strategy"]["walk_strategy_version"] == "V1.0"
    assert "policy_run_id" not in source_context

    allowed_actions = {
        "ADD_TO_CONTENT_POOL",
        "KEEP_CONTENT_FOR_REVIEW",
        "REJECT_CONTENT",
    }
    seen_actions = {decision["decision_action"] for decision in decisions}
    assert seen_actions.issubset(allowed_actions)

    seen_decision_effects = {
        decision["search_query_effect_status"] for decision in decisions
    }
    assert seen_decision_effects.issubset(
        {"success", "pending", "failed", "rule_blocked"}
    )

    search_clues = service.read_jsonl(run_id, "search_clues.jsonl")
    seen_clue_effects = {clue["search_query_effect_status"] for clue in search_clues}
    assert seen_clue_effects.issubset(
        {"success", "pending", "failed", "rule_blocked"}
    )

    # Every path id referenced by a content asset must exist in the path records.
    known_path_ids = {path["source_path_record_id"] for path in paths}
    for asset in final_output["content_assets"]:
        assert set(asset["source_path_record_ids"]).issubset(known_path_ids)

    evidence_pack = source_context["ext_data"]["evidence_pack"]
    assert evidence_pack["pattern_source_system"] == "pg_pattern_v2"
    assert evidence_pack["pattern_execution_id"] == 581
    assert evidence_pack["mining_config_id"] == 2082
    assert evidence_pack["itemset_ids"] == [1608352]

    report = service.validate_run(run_id)
    assert report["status"] == "pass"
def test_all_platform_query_failure_writes_failed_query_runtime_records(tmp_path):
    """When every platform search raises, the run fails and records each failed query."""

    def failing_client_factory(platform, platform_mode):
        # Whatever platform is requested, hand back the always-failing stub.
        return _AllFailurePlatformClient()

    service = RunService(
        runtime_root=tmp_path / "runtime" / "v1",
        query_variant_client=FakeQueryVariantClient(),
    )
    service._platform_client = failing_client_factory

    request = RunStartRequest(platform_mode="real", source=str(REAL_SOURCE_FIXTURE))
    state = service.start_run(request)
    assert state["status"] == "failed"
    assert state["error_code"] == ErrorCode.PLATFORM_REQUEST_FAILED.value

    run_id = state["run_id"]
    search_queries = service.read_jsonl(run_id, "search_queries.jsonl")
    search_clues = service.read_jsonl(run_id, "search_clues.jsonl")
    run_events = service.read_jsonl(run_id, "run_events.jsonl")

    # One recorded query per reported failure, each marked as failed.
    assert len(search_queries) == len(state["error_detail"]["query_failures"])
    query_effects = {query["search_query_effect_status"] for query in search_queries}
    assert query_effects == {"failed"}
    for query in search_queries:
        assert query["raw_payload"]["query_failure"]["status"] == "failed"

    clue_effects = {clue["search_query_effect_status"] for clue in search_clues}
    assert clue_effects == {"failed"}
    clue_result_counts = {clue["result_count"] for clue in search_clues}
    assert clue_result_counts == {0}
    clue_aggregations = {clue["query_aggregation_id"] for clue in search_clues}
    assert clue_aggregations == {"platform_query_failure"}

    failure_events = []
    for event in run_events:
        if event["event_type"] == "platform_query_failed":
            failure_events.append(event)
    assert len(failure_events) == len(search_queries)
    failure_codes = {event["error_code"] for event in failure_events}
    assert failure_codes == {ErrorCode.PLATFORM_REQUEST_FAILED.value}
def test_runtime_validation_catches_summary_drift(tmp_path):
    """Validation reports ``summary_mismatch`` after the pooled count is tampered with."""
    service, run_id = _start_mock_run(tmp_path)
    target = service.runtime.run_dir(run_id) / "final_output.json"

    payload = json.loads(target.read_text(encoding="utf-8"))
    payload["summary"]["pooled_content_count"] = 99
    rewritten = json.dumps(payload, ensure_ascii=False, indent=2) + "\n"
    target.write_text(rewritten, encoding="utf-8")

    report = service.validate_run(run_id)
    assert report["status"] == "fail"
    findings = report["findings"]
    assert any(entry["check_id"] == "summary_mismatch" for entry in findings)
def test_local_runtime_replaces_pattern_recall_evidence_by_id(tmp_path):
    """Appending a recall evidence row with an existing id leaves one row with the new status."""

    def evidence_row(status):
        # Both appended rows share the same ids and differ only in recall_status.
        return {
            "run_id": "run_001",
            "policy_run_id": "policy_001",
            "recall_evidence_id": "recall_001",
            "recall_status": status,
        }

    runtime = LocalRuntimeFileStore(tmp_path / "runtime")
    runtime.prepare_run("run_001")
    for status in ("pending", "matched"):
        runtime.append_jsonl(
            "run_001",
            "pattern_recall_evidence.jsonl",
            [evidence_row(status)],
        )

    stored = runtime.read_jsonl("run_001", "pattern_recall_evidence.jsonl")
    assert len(stored) == 1
    assert stored[0]["recall_status"] == "matched"
def test_runtime_validation_catches_missing_policy_run_id(tmp_path):
    """Validation reports ``policy_run_id_mismatch`` when a decision row loses its policy_run_id."""
    service, run_id = _start_mock_run(tmp_path)
    target = service.runtime.run_dir(run_id) / "rule_decisions.jsonl"

    raw_lines = target.read_text(encoding="utf-8").splitlines()
    rows = [json.loads(raw) for raw in raw_lines if raw.strip()]
    del rows[0]["policy_run_id"]

    # Write the rows back as compact JSON, one per newline-terminated line.
    encoded = [
        json.dumps(row, ensure_ascii=False, separators=(",", ":")) for row in rows
    ]
    target.write_text("".join(f"{text}\n" for text in encoded), encoding="utf-8")

    report = service.validate_run(run_id)
    assert report["status"] == "fail"
    findings = report["findings"]
    assert any(entry["check_id"] == "policy_run_id_mismatch" for entry in findings)
def test_runtime_validation_catches_missing_record_schema_version(tmp_path):
    """Validation reports ``record_schema_version_missing`` when a query row loses that field."""
    service, run_id = _start_mock_run(tmp_path)
    target = service.runtime.run_dir(run_id) / "search_queries.jsonl"

    raw_lines = target.read_text(encoding="utf-8").splitlines()
    rows = [json.loads(raw) for raw in raw_lines if raw.strip()]
    del rows[0]["record_schema_version"]

    # Write the rows back as compact JSON, one per newline-terminated line.
    encoded = [
        json.dumps(row, ensure_ascii=False, separators=(",", ":")) for row in rows
    ]
    target.write_text("".join(f"{text}\n" for text in encoded), encoding="utf-8")

    report = service.validate_run(run_id)
    assert report["status"] == "fail"
    findings = report["findings"]
    assert any(
        entry["check_id"] == "record_schema_version_missing" for entry in findings
    )
- class _AllFailurePlatformClient:
- def search(self, search_query: dict) -> list[dict]:
- raise RuntimeError("platform unavailable")
def test_runtime_validation_catches_missing_raw_payload(tmp_path):
    """Validation reports ``raw_payload_missing`` when a media record loses its raw_payload."""
    service, run_id = _start_mock_run(tmp_path)
    target = service.runtime.run_dir(run_id) / "content_media_records.jsonl"

    raw_lines = target.read_text(encoding="utf-8").splitlines()
    rows = [json.loads(raw) for raw in raw_lines if raw.strip()]
    del rows[0]["raw_payload"]

    # Write the rows back as compact JSON, one per newline-terminated line.
    encoded = [
        json.dumps(row, ensure_ascii=False, separators=(",", ":")) for row in rows
    ]
    target.write_text("".join(f"{text}\n" for text in encoded), encoding="utf-8")

    report = service.validate_run(run_id)
    assert report["status"] == "fail"
    findings = report["findings"]
    assert any(entry["check_id"] == "raw_payload_missing" for entry in findings)
def test_runtime_validation_catches_forbidden_raw_payload_key(tmp_path):
    """Validation reports ``raw_payload_forbidden_key`` when raw_payload gains a ``secret`` key."""
    service, run_id = _start_mock_run(tmp_path)
    target = service.runtime.run_dir(run_id) / "content_media_records.jsonl"

    raw_lines = target.read_text(encoding="utf-8").splitlines()
    rows = [json.loads(raw) for raw in raw_lines if raw.strip()]
    first_payload = rows[0]["raw_payload"]
    first_payload["secret"] = "should_not_be_stored"

    # Write the rows back as compact JSON, one per newline-terminated line.
    encoded = [
        json.dumps(row, ensure_ascii=False, separators=(",", ":")) for row in rows
    ]
    target.write_text("".join(f"{text}\n" for text in encoded), encoding="utf-8")

    report = service.validate_run(run_id)
    assert report["status"] == "fail"
    findings = report["findings"]
    assert any(
        entry["check_id"] == "raw_payload_forbidden_key" for entry in findings
    )
def test_runtime_validation_catches_missing_pattern_recall_evidence(tmp_path):
    """Validation reports ``pattern_recall_evidence_missing`` when an item drops its evidence id."""
    service, run_id = _start_mock_run(tmp_path)
    target = service.runtime.run_dir(run_id) / "discovered_content_items.jsonl"

    raw_lines = target.read_text(encoding="utf-8").splitlines()
    rows = [json.loads(raw) for raw in raw_lines if raw.strip()]
    match_result = rows[0]["pattern_match_result"]
    match_result.pop("pattern_recall_evidence_id", None)

    # Write the rows back as compact JSON, one per newline-terminated line.
    encoded = [
        json.dumps(row, ensure_ascii=False, separators=(",", ":")) for row in rows
    ]
    target.write_text("".join(f"{text}\n" for text in encoded), encoding="utf-8")

    report = service.validate_run(run_id)
    assert report["status"] == "fail"
    findings = report["findings"]
    assert any(
        entry["check_id"] == "pattern_recall_evidence_missing" for entry in findings
    )
def test_runtime_validation_allows_missing_decode_case_ids_in_source_evidence(tmp_path):
    """Validation still passes once ``decode_case_ids`` is stripped from source_evidence."""
    service, run_id = _start_mock_run(tmp_path)
    run_dir = service.runtime.run_dir(run_id)

    # Strip the key from every rule decision row.
    decisions_file = run_dir / "rule_decisions.jsonl"
    raw_lines = decisions_file.read_text(encoding="utf-8").splitlines()
    rows = [json.loads(raw) for raw in raw_lines if raw.strip()]
    for row in rows:
        row["source_evidence"].pop("decode_case_ids", None)
    encoded = [
        json.dumps(row, ensure_ascii=False, separators=(",", ":")) for row in rows
    ]
    decisions_file.write_text(
        "".join(f"{text}\n" for text in encoded),
        encoding="utf-8",
    )

    # Strip the same key from the matching sections of the final output.
    output_file = run_dir / "final_output.json"
    output = json.loads(output_file.read_text(encoding="utf-8"))
    for section_name in ("content_assets", "reject_records", "decision_records"):
        for entry in output.get(section_name, []):
            evidence = entry.get("source_evidence", {})
            evidence.pop("decode_case_ids", None)
    rewritten = json.dumps(output, ensure_ascii=False, indent=2) + "\n"
    output_file.write_text(rewritten, encoding="utf-8")

    report = service.validate_run(run_id)
    assert report["status"] == "pass"
def test_runtime_validation_catches_missing_final_decision_record(tmp_path):
    """Validation reports ``final_decision_missing`` when the last decision record is dropped."""
    service, run_id = _start_mock_run(tmp_path)
    target = service.runtime.run_dir(run_id) / "final_output.json"

    payload = json.loads(target.read_text(encoding="utf-8"))
    existing_records = payload["decision_records"]
    payload["decision_records"] = existing_records[:-1]
    rewritten = json.dumps(payload, ensure_ascii=False, indent=2) + "\n"
    target.write_text(rewritten, encoding="utf-8")

    report = service.validate_run(run_id)
    assert report["status"] == "fail"
    findings = report["findings"]
    assert any(entry["check_id"] == "final_decision_missing" for entry in findings)
def test_runtime_validation_catches_platform_content_id_source_pollution(tmp_path):
    """Validation reports ``source_evidence_content_pollution`` when source_post_id copies the discovered id."""
    service, run_id = _start_mock_run(tmp_path)
    target = service.runtime.run_dir(run_id) / "final_output.json"

    payload = json.loads(target.read_text(encoding="utf-8"))
    evidence = payload["decision_records"][0]["source_evidence"]
    # Overwrite the upstream post id with the id of the discovered content.
    evidence["source_post_id"] = evidence["discovered_platform_content_id"]
    rewritten = json.dumps(payload, ensure_ascii=False, indent=2) + "\n"
    target.write_text(rewritten, encoding="utf-8")

    report = service.validate_run(run_id)
    assert report["status"] == "fail"
    findings = report["findings"]
    assert any(
        entry["check_id"] == "source_evidence_content_pollution"
        for entry in findings
    )
def test_runtime_validation_catches_reject_source_path_break(tmp_path):
    """Validation reports ``source_path_broken`` after a query-to-content path row is removed."""
    service, run_id = _start_mock_run(tmp_path)
    target = service.runtime.run_dir(run_id) / "source_path_records.jsonl"

    raw_lines = target.read_text(encoding="utf-8").splitlines()
    rows = [json.loads(raw) for raw in raw_lines if raw.strip()]

    # Keep every row except search_query_to_content rows that point at this node id.
    surviving = [
        row
        for row in rows
        if row.get("source_path_type") != "search_query_to_content"
        or row.get("to_node_id") != "7390000000000000099"
    ]
    encoded = [
        json.dumps(row, ensure_ascii=False, separators=(",", ":"))
        for row in surviving
    ]
    target.write_text("".join(f"{text}\n" for text in encoded), encoding="utf-8")

    report = service.validate_run(run_id)
    assert report["status"] == "fail"
    findings = report["findings"]
    assert any(entry["check_id"] == "source_path_broken" for entry in findings)
def test_real_source_fixture_keeps_upstream_evidence_pack(tmp_path):
    """The real-case fixture's evidence pack values are present in the run's source context."""
    fixture = Path("tests/fixtures/real_case_source/source_context.json")
    service, run_id = _start_mock_run(tmp_path, source=str(fixture))

    source_context = service.read_json(run_id, "source_context.json")
    evidence_pack = source_context["ext_data"]["evidence_pack"]
    decisions = service.read_jsonl(run_id, "rule_decisions.jsonl")

    # Expected evidence pack values, checked in this order.
    expected_fields = {
        "pattern_source_system": "pg_pattern_v2",
        "source_certainty": "db_validated",
        "validation_status": "passed",
        "source_post_id": "51978710",
        "pattern_execution_id": 581,
        "mining_config_id": 2082,
        "itemset_ids": [1608352],
        "upstream_run_id": "f405f129-3341-4f4a-98e6-fd3f73632adb",
        "support": 0.0045734552921411235,
        "absolute_support": 49,
        "decode_case_ids": [],
    }
    for field, expected in expected_fields.items():
        assert evidence_pack[field] == expected

    first_evidence = decisions[0]["source_evidence"]
    assert first_evidence["source_certainty"] == "db_validated"
    # The discovered content id must not be one of the upstream matched posts.
    discovered_id = first_evidence["discovered_platform_content_id"]
    assert discovered_id not in evidence_pack["matched_post_ids"]

    report = service.validate_run(run_id)
    assert report["status"] == "pass"
def test_demand_content_json_array_source_is_adapted_to_source_context(tmp_path):
    """A JSON-array source holding one demand-content row produces a valid source context."""
    evidence_input = {
        "source_kind": "pattern_itemset",
        "pattern_source_system": "pg_pattern_v2",
        "case_id_type": "post_id",
        "source_post_id": "51978710",
        "pattern_execution_id": 581,
        "mining_config_id": 2082,
        "itemset_ids": [1608352],
        "itemset_items": [{"itemset_id": 1608352}],
        "category_bindings": [{"category_id": 76006}],
        "element_bindings": [{"category_id": 76006}],
        "support": 0.0045734552921411235,
        "absolute_support": 49,
        "matched_post_ids": ["51978710"],
        "video_ids": ["51978710"],
        "case_ids": ["51978710"],
        "decode_case_ids": [],
        "seed_terms": ["爱国情感", "人物故事"],
        "source_certainty": "db_validated",
        "validation_status": "passed",
    }
    demand_row = {
        "id": 123,
        "merge_leve2": "PG Pattern V2 需求测试",
        "name": "爱国情感,人物故事",
        "suggestion": None,
        "score": 1.0,
        "dt": "20260604",
        "ext_data": {"evidence_pack": evidence_input},
    }
    source_path = tmp_path / "demand_content.json"
    serialized = json.dumps([demand_row], ensure_ascii=False)
    source_path.write_text(serialized, encoding="utf-8")

    service, run_id = _start_mock_run(tmp_path, source=str(source_path))
    source_context = service.read_json(run_id, "source_context.json")
    evidence_pack = source_context["ext_data"]["evidence_pack"]

    # The integer row id is expected back as a string demand_content_id.
    assert source_context["demand_content_id"] == "123"
    assert evidence_pack["pattern_source_system"] == "pg_pattern_v2"
    assert evidence_pack["pattern_execution_id"] == 581
    assert evidence_pack["mining_config_id"] == 2082
    assert evidence_pack["itemset_ids"] == [1608352]

    report = service.validate_run(run_id)
    assert report["status"] == "pass"
def test_old_mysql_source_system_is_rejected(tmp_path):
    """A source whose pattern_source_system is ``mysql_topic_pattern`` fails as an invalid source."""
    evidence_input = {
        "source_kind": "pattern_itemset",
        "pattern_source_system": "mysql_topic_pattern",
        "case_id_type": "post_id",
        "source_post_id": "51978710",
        "pattern_execution_id": 581,
        "mining_config_id": 2082,
        "itemset_ids": [1608352],
        "itemset_items": [{"itemset_id": 1608352}],
        "category_bindings": [{"category_id": 76006}],
        "support": 0.0045734552921411235,
        "absolute_support": 49,
        "matched_post_ids": ["51978710"],
        "video_ids": ["51978710"],
        "case_ids": ["51978710"],
        "decode_case_ids": [],
        "seed_terms": ["爱国情感", "人物故事"],
        "source_certainty": "db_validated",
        "validation_status": "passed",
    }
    legacy_source = {
        "run_id": "old_run",
        "demand_content_id": "old",
        "merge_leve2": "历史样例",
        "name": "旧 MySQL 样例",
        "ext_data": {"evidence_pack": evidence_input},
    }
    source_path = tmp_path / "source_context.json"
    serialized = json.dumps(legacy_source, ensure_ascii=False)
    source_path.write_text(serialized, encoding="utf-8")

    service = RunService(runtime_root=tmp_path / "runtime" / "v1")
    request = RunStartRequest(platform_mode="mock", source=str(source_path))
    state = service.start_run(request)

    assert state["status"] == "failed"
    assert state["error_code"] == ErrorCode.INVALID_SOURCE.value
    assert state["errors"] == ["invalid source"]
|