| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826 |
- from __future__ import annotations
- import json
- from collections import Counter
- from typing import Any
- from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION, RUNTIME_SCHEMA_VERSION
- from content_agent.findings import fail as _fail
- from content_agent.integrations.runtime_files import RUNTIME_FILENAMES
- from content_agent.interfaces import RuntimeFileStore
# Runtime files parsed as one JSON document each; see _load_files.
JSON_FILES = {
    "source_context.json",
    "pattern_seed_pack.json",
    "final_output.json",
    "strategy_review.json",
}

# Every other runtime filename (parsed line by line in _load_files).
JSONL_FILES = set(RUNTIME_FILENAMES).difference(JSON_FILES)

# Files whose rows must carry the policy_run_id declared by
# pattern_seed_pack.json; see _check_policy_run_ids.
POLICY_RUN_FILES = {
    "pattern_seed_pack.json",
    "search_queries.jsonl",
    "discovered_content_items.jsonl",
    "content_media_records.jsonl",
    "pattern_recall_evidence.jsonl",
    "rule_decisions.jsonl",
    "walk_actions.jsonl",
    "run_events.jsonl",
    "source_path_records.jsonl",
    "search_clues.jsonl",
    "final_output.json",
    "strategy_review.json",
}

# Files whose rows must hold a non-empty dict under "raw_payload";
# see _check_raw_payloads.
RAW_PAYLOAD_FILES = {
    "search_queries.jsonl",
    "discovered_content_items.jsonl",
    "content_media_records.jsonl",
    "pattern_recall_evidence.jsonl",
    "rule_decisions.jsonl",
    "walk_actions.jsonl",
    "run_events.jsonl",
    "source_path_records.jsonl",
    "search_clues.jsonl",
    "strategy_review.json",
}

# Key names (compared lower-cased) that may not appear at any depth of a
# raw_payload; see _find_forbidden_payload_keys.
FORBIDDEN_PAYLOAD_KEYS = {
    "password",
    "token",
    "access_token",
    "refresh_token",
    "api_key",
    "apikey",
    "secret",
    "dsn",
    "authorization",
    "cookie",
    "session",
    "credential",
}

# Accepted values of rule_decisions.decision_action.
DECISION_ACTIONS = {
    "ADD_TO_CONTENT_POOL",
    "KEEP_CONTENT_FOR_REVIEW",
    "REJECT_CONTENT",
}

# Accepted values of search_query_effect_status (decisions and search clues).
EFFECT_STATUSES = {"success", "pending", "failed", "rule_blocked"}

# Accepted values of walk_actions.walk_status.
WALK_STATUSES = {"success", "pending", "failed", "skipped", "rule_blocked"}

# Fields that must be truthy inside each decision's decision_replay_data.
DECISION_REPLAY_REQUIRED_FIELDS = {
    "policy_bundle_hash",
    "rule_pack_id",
    "rule_pack_version",
    "dispatch_id",
    "strategy_version",
}

# Keys that must be present in every source_evidence mapping;
# see _check_one_source_evidence.
SOURCE_EVIDENCE_FIELDS = {
    "source_kind",
    "pattern_source_system",
    "case_id_type",
    "source_post_id",
    "pattern_execution_id",
    "mining_config_id",
    "itemset_ids",
    "itemset_items",
    "support",
    "absolute_support",
    "matched_post_ids",
    "video_ids",
    "case_ids",
    "seed_terms",
    "discovery_start_source",
    "previous_discovery_step",
    "origin_path_id",
    "run_id",
    "policy_run_id",
    "discovered_platform_content_id",
    "source_certainty",
    "validation_status",
}
def validate_run(run_id: str, runtime: RuntimeFileStore) -> dict[str, Any]:
    """Load one run's runtime files and run every consistency check on them.

    If loading already produced a "fail" finding (missing or unparsable
    file), the remaining checks are skipped and the load findings are
    returned as-is.

    Returns:
        The dict built by ``_result``: ``run_id``, ``status`` and ``findings``.
    """
    findings: list[dict[str, Any]] = []
    data = _load_files(run_id, runtime, findings)

    load_failed = any(entry["level"] == "fail" for entry in findings)
    if not load_failed:
        _check_run_ids(run_id, data, findings)
        # The remaining checks share one signature and run in this order.
        for check in (
            _check_schema_versions,
            _check_policy_run_ids,
            _check_raw_payloads,
            _check_unique_ids,
            _check_references,
            _check_pattern_recall_evidence,
            _check_source_evidence,
            _check_source_paths,
            _check_summary,
            _check_completeness,
        ):
            check(data, findings)

    return _result(run_id, findings)
def compute_final_output_completeness(
    final_output: dict[str, Any],
    decisions: list[dict[str, Any]],
    source_path_records: list[dict[str, Any]],
) -> dict[str, Any]:
    """Check that final_output is fully backed by decisions and source paths.

    Args:
        final_output: Parsed final output document.
        decisions: Rule decision rows; only ``decision_id`` is read.
        source_path_records: Source path rows; ``source_path_record_id``,
            ``source_path_type``, ``decision_id`` and ``to_node_id`` are read.

    Returns:
        A dict with ``validation_status`` ("pass"/"fail"), the booleans
        ``run_path_complete`` and ``trace_complete`` (both true only when no
        finding was produced) and ``findings_summary`` (list of messages).

    Sections or id lists that are present but null are treated as empty, so a
    malformed document yields findings instead of raising ``TypeError``.
    """

    def section_rows(section: str) -> list[dict[str, Any]]:
        # `.get(key, [])` would return None for an explicit JSON null.
        return final_output.get(section) or []

    def id_set(record: dict[str, Any], field: str) -> set[Any]:
        return set(record.get(field) or [])

    findings: list[str] = []
    decision_ids = {decision.get("decision_id") for decision in decisions}
    path_ids = {path.get("source_path_record_id") for path in source_path_records}
    # decision_to_asset paths indexed by (decision id, target content id).
    decision_asset_paths = {
        (path.get("decision_id"), path.get("to_node_id")): path
        for path in source_path_records
        if path.get("source_path_type") == "decision_to_asset"
    }

    for asset in section_rows("content_assets"):
        decision_id = asset.get("decision_id")
        content_id = asset.get("platform_content_id")
        asset_path_ids = id_set(asset, "source_path_record_ids")
        if decision_id not in decision_ids:
            findings.append(f"content_asset missing decision: {content_id}")
        # Only emptiness matters here, so no sorting of the difference.
        if asset_path_ids - path_ids:
            findings.append(f"content_asset missing paths: {content_id}")
        decision_asset = decision_asset_paths.get((decision_id, content_id))
        if not decision_asset:
            findings.append(f"content_asset missing decision_to_asset: {content_id}")
        elif decision_asset.get("source_path_record_id") not in asset_path_ids:
            findings.append(f"content_asset omits decision_to_asset: {content_id}")

    for record in section_rows("review_records"):
        if record.get("decision_id") not in decision_ids:
            findings.append(f"review_record missing decision: {record.get('platform_content_id')}")
        if id_set(record, "source_path_record_ids") - path_ids:
            findings.append(f"review_record missing paths: {record.get('platform_content_id')}")

    for record in section_rows("reject_records"):
        if record.get("decision_id") not in decision_ids:
            findings.append(f"reject_record missing decision: {record.get('decision_target_id')}")

    final_decision_ids = {
        record.get("decision_id") for record in section_rows("decision_records")
    }
    if decision_ids - final_decision_ids:
        findings.append("decision_records incomplete")

    for author_asset in section_rows("author_assets"):
        if id_set(author_asset, "decision_ids") - decision_ids:
            findings.append(f"author_asset missing decisions: {author_asset.get('author_asset_id')}")
        if id_set(author_asset, "source_path_record_ids") - path_ids:
            findings.append(f"author_asset missing paths: {author_asset.get('author_asset_id')}")
        evidence_refs = author_asset.get("evidence_refs") or {}
        if evidence_refs.get("decision_ids") != author_asset.get("decision_ids"):
            findings.append(f"author_asset evidence incomplete: {author_asset.get('author_asset_id')}")

    required_sections = [
        "content_assets",
        "author_assets",
        "review_records",
        "decision_records",
        "search_clues",
        "reject_records",
        "summary",
    ]
    findings.extend(
        f"final_output missing section: {section}"
        for section in required_sections
        if section not in final_output
    )

    complete = not findings
    return {
        "validation_status": "pass" if complete else "fail",
        "run_path_complete": complete,
        "trace_complete": complete,
        "findings_summary": findings,
    }
def _load_files(
    run_id: str,
    runtime: RuntimeFileStore,
    findings: list[dict[str, Any]],
) -> dict[str, Any]:
    """Read and parse every runtime file of a run.

    Files named in ``JSON_FILES`` are parsed as one JSON document; all others
    are parsed as JSON Lines, skipping blank lines. A missing file produces a
    ``file_missing`` finding and an unreadable one ``json_parse_failed``; such
    files are left out of the returned mapping.

    Returns:
        Mapping of filename to parsed content (a document, or a list of rows).
    """
    run_dir = runtime.run_dir(run_id)
    data: dict[str, Any] = {}
    for filename in RUNTIME_FILENAMES:
        path = run_dir / filename
        if not path.exists():
            _fail(findings, "file_missing", f"missing runtime file: {filename}")
            continue
        try:
            text = path.read_text(encoding="utf-8")
            if filename in JSON_FILES:
                data[filename] = json.loads(text)
            else:
                # Split on "\n" only: str.splitlines() also breaks on
                # U+2028/U+2029/U+0085, which may appear unescaped inside a
                # JSON string and would cut a valid record in half.
                data[filename] = [
                    json.loads(line) for line in text.split("\n") if line.strip()
                ]
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            # Bytes that are not valid UTF-8 are reported like a parse
            # failure instead of aborting the whole validation.
            _fail(findings, "json_parse_failed", f"{filename} cannot parse: {exc}")
    return data
def _check_run_ids(run_id: str, data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
    """Report every dict row (or document) whose ``run_id`` is not *run_id*."""
    for filename, content in data.items():
        # A single JSON document is handled as a one-row file.
        candidates = content if isinstance(content, list) else [content]
        for candidate in candidates:
            if not isinstance(candidate, dict):
                continue
            if candidate.get("run_id") == run_id:
                continue
            _fail(findings, "run_id_mismatch", f"{filename} has mismatched run_id")
def _check_schema_versions(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
    """Check schema versions of JSON documents and JSONL rows.

    Dict documents from ``JSON_FILES`` must carry ``schema_version`` equal to
    ``RUNTIME_SCHEMA_VERSION``. Each row of every other list-valued file must
    carry ``record_schema_version`` equal to ``RUNTIME_RECORD_SCHEMA_VERSION``.
    A row that is not a JSON object cannot carry a version, so it is reported
    with the same finding instead of raising ``AttributeError``.
    """
    for filename, value in data.items():
        if filename in JSON_FILES:
            if isinstance(value, dict) and value.get("schema_version") != RUNTIME_SCHEMA_VERSION:
                _fail(findings, "schema_version_missing", f"{filename} has bad schema_version")
            continue
        rows = value if isinstance(value, list) else []
        for row in rows:
            if (
                not isinstance(row, dict)
                or row.get("record_schema_version") != RUNTIME_RECORD_SCHEMA_VERSION
            ):
                _fail(
                    findings,
                    "record_schema_version_missing",
                    f"{filename} has bad record_schema_version",
                )
def _check_policy_run_ids(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
    """Check that policy-scoped files share pattern_seed_pack's policy_run_id.

    ``source_context.json`` must not contain ``policy_run_id`` at all. If
    ``pattern_seed_pack.json`` has no truthy ``policy_run_id`` that is reported
    and the per-file comparison is skipped. Otherwise every dict row of each
    file in ``POLICY_RUN_FILES`` must carry the same value.
    """
    if "policy_run_id" in data.get("source_context.json", {}):
        _fail(findings, "policy_run_id_unexpected", "source_context.json must stay run-level")
    policy_run_id = data.get("pattern_seed_pack.json", {}).get("policy_run_id")
    if not policy_run_id:
        _fail(findings, "policy_run_id_missing", "pattern_seed_pack.json missing policy_run_id")
        return
    # Iterate in sorted order: raw set order of strings varies between
    # processes, which would make the order of findings unstable.
    for filename in sorted(POLICY_RUN_FILES):
        value = data.get(filename)
        rows = value if isinstance(value, list) else [value]
        for row in rows:
            if isinstance(row, dict) and row.get("policy_run_id") != policy_run_id:
                _fail(
                    findings,
                    "policy_run_id_mismatch",
                    f"{filename} has mismatched policy_run_id",
                )
def _check_raw_payloads(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
    """Check ``raw_payload`` on every dict row of the ``RAW_PAYLOAD_FILES``.

    Each row must hold a non-empty dict under ``raw_payload``, and that dict
    must not contain any forbidden key at any depth.
    """
    # Sorted so the order of findings does not depend on set iteration order.
    for filename in sorted(RAW_PAYLOAD_FILES):
        value = data.get(filename)
        rows = value if isinstance(value, list) else [value]
        for row in rows:
            if not isinstance(row, dict):
                continue
            raw_payload = row.get("raw_payload")
            if not isinstance(raw_payload, dict) or not raw_payload:
                _fail(findings, "raw_payload_missing", f"{filename} missing raw_payload")
                continue
            forbidden_paths = _find_forbidden_payload_keys(raw_payload)
            if forbidden_paths:
                _fail(
                    findings,
                    "raw_payload_forbidden_key",
                    f"{filename} raw_payload contains forbidden keys: {forbidden_paths}",
                )
def _find_forbidden_payload_keys(value: Any, prefix: str = "raw_payload") -> list[str]:
    """Return the paths of all forbidden keys found anywhere inside *value*.

    Dicts and lists are walked recursively; anything else yields no paths.
    Dict keys extend the path as ``prefix.key``, list items as
    ``prefix[index]``. A key is forbidden when its lower-cased string form is
    in ``FORBIDDEN_PAYLOAD_KEYS``.
    """
    hits: list[str] = []
    if isinstance(value, dict):
        for name, nested in value.items():
            nested_prefix = f"{prefix}.{name}"
            if str(name).lower() in FORBIDDEN_PAYLOAD_KEYS:
                hits.append(nested_prefix)
            # Keep descending even below a forbidden key.
            hits.extend(_find_forbidden_payload_keys(nested, nested_prefix))
    elif isinstance(value, list):
        for position, nested in enumerate(value):
            hits.extend(_find_forbidden_payload_keys(nested, f"{prefix}[{position}]"))
    return hits
def _check_unique_ids(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
    """Report truthy identifier values that occur more than once in a file."""
    unique_fields = (
        ("search_queries.jsonl", "search_query_id"),
        ("discovered_content_items.jsonl", "content_discovery_id"),
        ("discovered_content_items.jsonl", "platform_content_id"),
        ("pattern_recall_evidence.jsonl", "recall_evidence_id"),
        ("rule_decisions.jsonl", "decision_id"),
        ("rule_decisions.jsonl", "decision_target_id"),
        ("walk_actions.jsonl", "walk_action_id"),
        ("source_path_records.jsonl", "source_path_record_id"),
    )
    for filename, field in unique_fields:
        tally = Counter(row.get(field) for row in data.get(filename, []))
        # Falsy values (missing or empty ids) are not counted as duplicates.
        duplicates = [value for value, seen in tally.items() if value and seen > 1]
        if duplicates:
            _fail(findings, "duplicate_id", f"{filename}.{field} duplicates: {duplicates}")
def _collect_reference_ids(
    data: dict[str, Any],
    filename: str,
    field: str,
    findings: list[dict[str, Any]],
) -> set[Any]:
    """Collect *field* from every row of *filename*; report rows lacking it."""
    ids: set[Any] = set()
    for row in data.get(filename, []):
        if field in row:
            ids.add(row[field])
        else:
            # Direct indexing would raise KeyError and abort the validation.
            _fail(findings, "missing_id", f"{filename} row missing {field}")
    return ids


def _check_references(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
    """Check cross-file references and enumerated values of the runtime rows.

    Covers: content items -> search queries; media, recall evidence and
    decisions -> content items; decision action, effect status and replay
    fields; search clue status and aggregation id; walk action required fields,
    status and decision reference; source path decision and walk action
    references; and every decision/path id referenced from final_output.
    Rows that lack their own identifier are reported as ``missing_id``.
    """
    search_query_ids = _collect_reference_ids(
        data, "search_queries.jsonl", "search_query_id", findings
    )
    items = data.get("discovered_content_items.jsonl", [])
    content_ids = _collect_reference_ids(
        data, "discovered_content_items.jsonl", "platform_content_id", findings
    )
    decisions = data.get("rule_decisions.jsonl", [])
    decision_ids = _collect_reference_ids(data, "rule_decisions.jsonl", "decision_id", findings)
    walk_action_ids = _collect_reference_ids(
        data, "walk_actions.jsonl", "walk_action_id", findings
    )
    path_ids = _collect_reference_ids(
        data, "source_path_records.jsonl", "source_path_record_id", findings
    )

    for item in items:
        if item.get("search_query_id") not in search_query_ids:
            _fail(
                findings,
                "missing_search_query_ref",
                f"content item has unknown search_query_id: {item.get('search_query_id')}",
            )

    for media in data.get("content_media_records.jsonl", []):
        if not media.get("platform"):
            _fail(findings, "platform_missing", "content_media_records.jsonl missing platform")
        if media.get("platform_content_id") not in content_ids:
            _fail(
                findings,
                "missing_content_ref",
                f"media has unknown platform_content_id: {media.get('platform_content_id')}",
            )

    for evidence in data.get("pattern_recall_evidence.jsonl", []):
        if evidence.get("platform_content_id") not in content_ids:
            _fail(
                findings,
                "missing_content_ref",
                f"recall evidence has unknown platform_content_id: {evidence.get('platform_content_id')}",
            )

    for decision in decisions:
        if decision.get("decision_target_id") not in content_ids:
            _fail(
                findings,
                "missing_content_ref",
                f"decision has unknown decision_target_id: {decision.get('decision_target_id')}",
            )
        if decision.get("decision_action") not in DECISION_ACTIONS:
            _fail(
                findings,
                "bad_decision_action",
                f"unsupported decision_action: {decision.get('decision_action')}",
            )
        if decision.get("search_query_effect_status") not in EFFECT_STATUSES:
            _fail(
                findings,
                "bad_effect_status",
                f"unsupported decision effect status: {decision.get('search_query_effect_status')}",
            )
        replay_data = decision.get("decision_replay_data") or {}
        # Sorted so the reported field list is stable across processes.
        missing_replay_fields = [
            field
            for field in sorted(DECISION_REPLAY_REQUIRED_FIELDS)
            if not replay_data.get(field)
        ]
        if missing_replay_fields:
            _fail(
                findings,
                "decision_replay_incomplete",
                f"decision {decision.get('decision_id')} missing replay fields: {missing_replay_fields}",
            )

    for clue in data.get("search_clues.jsonl", []):
        if clue.get("search_query_effect_status") not in EFFECT_STATUSES:
            _fail(
                findings,
                "bad_effect_status",
                f"unsupported query effect status: {clue.get('search_query_effect_status')}",
            )
        if not clue.get("query_aggregation_id"):
            _fail(
                findings,
                "missing_query_aggregation_id",
                f"search clue missing query_aggregation_id: {clue.get('clue_id')}",
            )

    for action in data.get("walk_actions.jsonl", []):
        missing_action_fields = [
            field
            for field in ["walk_action_id", "edge_id", "walk_action", "walk_status"]
            if not action.get(field)
        ]
        if missing_action_fields:
            _fail(
                findings,
                "walk_action_incomplete",
                f"walk action missing fields: {missing_action_fields}",
            )
        if action.get("walk_status") not in WALK_STATUSES:
            _fail(
                findings,
                "bad_walk_status",
                f"unsupported walk_status: {action.get('walk_status')}",
            )
        # decision_id is optional on a walk action; only a set value is checked.
        decision_id = action.get("decision_id")
        if decision_id and decision_id not in decision_ids:
            _fail(findings, "missing_decision_ref", f"walk action has unknown decision_id: {decision_id}")

    for path in data.get("source_path_records.jsonl", []):
        decision_id = path.get("decision_id")
        if decision_id and decision_id not in decision_ids:
            _fail(findings, "missing_decision_ref", f"path has unknown decision_id: {decision_id}")
        # The walk action id is read from raw_payload first, then from the row.
        walk_action_id = (path.get("raw_payload") or {}).get("walk_action_id") or path.get("walk_action_id")
        if walk_action_id and walk_action_id not in walk_action_ids:
            _fail(
                findings,
                "missing_walk_action_ref",
                f"path has unknown walk_action_id: {walk_action_id}",
            )

    final_output = data.get("final_output.json", {})
    for asset in final_output.get("content_assets", []):
        if asset.get("decision_id") not in decision_ids:
            _fail(
                findings,
                "missing_decision_ref",
                f"asset has unknown decision_id: {asset.get('decision_id')}",
            )
        for path_id in asset.get("source_path_record_ids", []):
            if path_id not in path_ids:
                _fail(findings, "missing_path_ref", f"asset has unknown path_id: {path_id}")

    for record in final_output.get("review_records", []):
        if record.get("decision_id") not in decision_ids:
            _fail(
                findings,
                "missing_decision_ref",
                f"review_records has unknown decision_id: {record.get('decision_id')}",
            )
        for path_id in record.get("source_path_record_ids", []):
            if path_id not in path_ids:
                _fail(
                    findings,
                    "missing_path_ref",
                    f"review_records has unknown path_id: {path_id}",
                )

    for section in ["reject_records", "decision_records"]:
        for row in final_output.get(section, []):
            if row.get("decision_id") not in decision_ids:
                _fail(
                    findings,
                    "missing_decision_ref",
                    f"{section} has unknown decision_id: {row.get('decision_id')}",
                )

    for author_asset in final_output.get("author_assets", []):
        for decision_id in author_asset.get("decision_ids", []):
            if decision_id not in decision_ids:
                _fail(
                    findings,
                    "missing_decision_ref",
                    f"author_assets has unknown decision_id: {decision_id}",
                )
        for path_id in author_asset.get("source_path_record_ids", []):
            if path_id not in path_ids:
                _fail(
                    findings,
                    "missing_path_ref",
                    f"author_assets has unknown path_id: {path_id}",
                )
        evidence_refs = author_asset.get("evidence_refs") or {}
        if evidence_refs.get("decision_ids") != author_asset.get("decision_ids"):
            _fail(
                findings,
                "author_asset_evidence_incomplete",
                f"author asset evidence refs do not match decisions: {author_asset.get('author_asset_id')}",
            )
def _check_pattern_recall_evidence(
    data: dict[str, Any],
    findings: list[dict[str, Any]],
) -> None:
    """Check that every judged content item resolves to a recall evidence row."""
    evidence_by_id = {
        row.get("recall_evidence_id"): row
        for row in data.get("pattern_recall_evidence.jsonl", [])
    }
    for item in data.get("discovered_content_items.jsonl", []):
        match_result = item.get("pattern_match_result") or {}
        # V3(M3): the bridge key pattern_recall has been retired; "was this
        # item judged" (judge_status present) is the criterion instead. Every
        # item judged by Gemini must resolve to a real evidence row, otherwise
        # the lineage is considered broken.
        if match_result.get("judge_status"):
            evidence_id = match_result.get("pattern_recall_evidence_id")
            if not evidence_by_id.get(evidence_id):
                _fail(
                    findings,
                    "pattern_recall_evidence_missing",
                    f"matched item cannot find recall evidence: {evidence_id}",
                )
    # V3(M2): judging is now a direct Gemini read; there are no decode
    # strong-evidence terms or category-tree paths any more, so
    # matched_terms/matched_category_paths no longer apply and are not enforced.
def _check_source_evidence(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
    """Validate ``source_evidence`` on decisions and final_output records.

    Each evidence mapping is compared against the evidence pack found under
    ``source_context.json -> ext_data -> evidence_pack``. Afterwards the
    coverage of rule decisions by ``decision_records`` is checked.
    """
    evidence_pack = (
        data.get("source_context.json", {}).get("ext_data", {}).get("evidence_pack", {})
    )

    for decision in data.get("rule_decisions.jsonl", []):
        _check_one_source_evidence(
            findings,
            decision.get("source_evidence") or {},
            evidence_pack,
            f"decision {decision.get('decision_id')}",
        )

    final_output = data.get("final_output.json", {})
    # (section, label prefix, field shown in the label), in checking order.
    section_specs = (
        ("content_assets", "asset", "platform_content_id"),
        ("reject_records", "reject", "decision_target_id"),
        ("review_records", "review", "platform_content_id"),
        ("decision_records", "decision record", "decision_id"),
    )
    for section, label_prefix, label_field in section_specs:
        for record in final_output.get(section, []):
            _check_one_source_evidence(
                findings,
                record.get("source_evidence") or {},
                evidence_pack,
                f"{label_prefix} {record.get(label_field)}",
            )

    _check_final_decision_coverage(data, findings)
def _check_one_source_evidence(
    findings: list[dict[str, Any]],
    source_evidence: dict[str, Any],
    evidence_pack: dict[str, Any],
    label: str,
) -> None:
    """Validate one ``source_evidence`` mapping against the evidence pack.

    Reports missing required keys, fields whose value differs from the
    evidence pack, and a discovered content id that also appears as
    ``source_post_id`` or inside ``matched_post_ids``. *label* prefixes each
    finding message.
    """
    absent = sorted(name for name in SOURCE_EVIDENCE_FIELDS if name not in source_evidence)
    if absent:
        _fail(findings, "source_evidence_missing_fields", f"{label} missing {absent}")

    # V3(M4): category-tree category/element binding was retired together
    # with the decode pipeline and is no longer a lineage gate.
    compared_fields = (
        # scalar fields
        "pattern_execution_id",
        "source_post_id",
        "pattern_source_system",
        "case_id_type",
        "mining_config_id",
        "support",
        "absolute_support",
        "source_certainty",
        "validation_status",
        # list fields
        "itemset_ids",
        "itemset_items",
        "matched_post_ids",
        "video_ids",
        "case_ids",
        "seed_terms",
    )
    for name in compared_fields:
        if source_evidence.get(name) != evidence_pack.get(name):
            _fail(findings, "source_evidence_mismatch", f"{label} mismatched {name}")

    # decode_case_ids is compared only when the evidence carries it.
    if "decode_case_ids" in source_evidence:
        if source_evidence.get("decode_case_ids") != evidence_pack.get("decode_case_ids"):
            _fail(findings, "source_evidence_mismatch", f"{label} mismatched decode_case_ids")

    discovered_id = source_evidence.get("discovered_platform_content_id")
    if not discovered_id:
        return
    if discovered_id == source_evidence.get("source_post_id"):
        _fail(findings, "source_evidence_content_pollution", f"{label} rewrites source_post_id")
    if discovered_id in (source_evidence.get("matched_post_ids") or []):
        _fail(
            findings,
            "source_evidence_content_pollution",
            f"{label} rewrites matched_post_ids",
        )
def _check_final_decision_coverage(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
    """Report rule decisions that have no entry in final_output decision_records."""
    # Rows without a decision_id key are skipped here rather than raising
    # KeyError; they have no id that could be looked up.
    decision_ids = {
        decision["decision_id"]
        for decision in data.get("rule_decisions.jsonl", [])
        if "decision_id" in decision
    }
    final_decision_ids = {
        record.get("decision_id")
        for record in data.get("final_output.json", {}).get("decision_records", [])
    }
    missing = sorted(decision_ids - final_decision_ids)
    if missing:
        _fail(findings, "final_decision_missing", f"final_output decision_records missing {missing}")
def _check_source_paths(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
    """Check that decisions, assets and review records have a full source path.

    A path is followed backwards: content <- ``search_query_to_content`` <-
    ``pattern_to_search_query``, which must start at the evidence pack's
    ``pattern_execution_id``. Content assets must additionally list those
    paths in ``source_path_record_ids`` and own a well-formed
    ``decision_to_asset`` path. Path rows lacking the key they are indexed by
    are left out of the index, so the gap shows up as a finding instead of a
    ``KeyError``.
    """
    evidence_pack = data.get("source_context.json", {}).get("ext_data", {}).get("evidence_pack", {})
    pattern_execution_id = evidence_pack.get("pattern_execution_id")
    paths = data.get("source_path_records.jsonl", [])
    path_by_id = {
        path["source_path_record_id"]: path
        for path in paths
        if "source_path_record_id" in path
    }
    # Both indexes are keyed by the node the path leads to.
    pattern_query_paths = {
        path["to_node_id"]: path
        for path in paths
        if path.get("source_path_type") == "pattern_to_search_query" and "to_node_id" in path
    }
    query_content_paths = {
        path["to_node_id"]: path
        for path in paths
        if path.get("source_path_type") == "search_query_to_content" and "to_node_id" in path
    }
    decision_asset_paths = {
        (path.get("decision_id"), path.get("to_node_id")): path
        for path in paths
        if path.get("source_path_type") == "decision_to_asset"
    }

    for decision in data.get("rule_decisions.jsonl", []):
        _check_content_source_path(
            findings,
            label=f"decision {decision.get('decision_id')}",
            platform_content_id=decision.get("decision_target_id"),
            pattern_execution_id=pattern_execution_id,
            pattern_query_paths=pattern_query_paths,
            query_content_paths=query_content_paths,
        )

    for asset in data.get("final_output.json", {}).get("content_assets", []):
        platform_content_id = asset.get("platform_content_id")
        path_ids = set(asset.get("source_path_record_ids", []))
        query_content = query_content_paths.get(platform_content_id)
        if not query_content or query_content.get("source_path_record_id") not in path_ids:
            _fail(
                findings,
                "source_path_broken",
                f"asset lacks search_query_to_content path: {platform_content_id}",
            )
            continue
        pattern_query = pattern_query_paths.get(query_content.get("from_node_id"))
        if not pattern_query or pattern_query.get("source_path_record_id") not in path_ids:
            _fail(
                findings,
                "source_path_broken",
                f"asset lacks pattern_to_search_query path: {platform_content_id}",
            )
            continue
        if pattern_query.get("from_node_id") != pattern_execution_id:
            _fail(
                findings,
                "source_path_broken",
                f"asset path starts from wrong pattern: {platform_content_id}",
            )
        for path_id in path_ids:
            if path_id not in path_by_id:
                _fail(findings, "source_path_broken", f"asset path missing: {path_id}")
        decision_asset = decision_asset_paths.get(
            (asset.get("decision_id"), platform_content_id)
        )
        if not decision_asset:
            _fail(
                findings,
                "decision_to_asset_missing",
                f"asset lacks decision_to_asset path: {platform_content_id}",
            )
            continue
        if decision_asset.get("source_path_record_id") not in path_ids:
            _fail(
                findings,
                "decision_to_asset_missing",
                f"asset source paths omit decision_to_asset: {platform_content_id}",
            )
        if decision_asset.get("from_node_type") != "RuleDecision":
            _fail(
                findings,
                "decision_to_asset_broken",
                f"asset decision_to_asset starts from wrong node: {platform_content_id}",
            )
        if decision_asset.get("from_node_id") != asset.get("decision_id"):
            _fail(
                findings,
                "decision_to_asset_broken",
                f"asset decision_to_asset has wrong decision id: {platform_content_id}",
            )
        if decision_asset.get("to_node_type") != "ContentAsset":
            _fail(
                findings,
                "decision_to_asset_broken",
                f"asset decision_to_asset ends at wrong node: {platform_content_id}",
            )

    for record in data.get("final_output.json", {}).get("review_records", []):
        platform_content_id = record.get("platform_content_id")
        path_ids = set(record.get("source_path_record_ids", []))
        query_content = query_content_paths.get(platform_content_id)
        if not query_content or query_content.get("source_path_record_id") not in path_ids:
            _fail(
                findings,
                "source_path_broken",
                f"review record lacks search_query_to_content path: {platform_content_id}",
            )
            continue
        pattern_query = pattern_query_paths.get(query_content.get("from_node_id"))
        if not pattern_query or pattern_query.get("source_path_record_id") not in path_ids:
            _fail(
                findings,
                "source_path_broken",
                f"review record lacks pattern_to_search_query path: {platform_content_id}",
            )
def _check_content_source_path(
    findings: list[dict[str, Any]],
    label: str,
    platform_content_id: Any,
    pattern_execution_id: Any,
    pattern_query_paths: dict[str, dict[str, Any]],
    query_content_paths: dict[str, dict[str, Any]],
) -> None:
    """Report the first broken link on the path from a pattern to a content id.

    Looks up the ``search_query_to_content`` path ending at
    *platform_content_id*, then the ``pattern_to_search_query`` path ending at
    that path's origin, and finally compares its origin with
    *pattern_execution_id*. At most one finding is produced.
    """
    query_content = query_content_paths.get(platform_content_id)
    if not query_content:
        problem = f"{label} lacks search_query_to_content path: {platform_content_id}"
    else:
        pattern_query = pattern_query_paths.get(query_content.get("from_node_id"))
        if not pattern_query:
            problem = f"{label} lacks pattern_to_search_query path: {platform_content_id}"
        elif pattern_query.get("from_node_id") != pattern_execution_id:
            problem = f"{label} path starts from wrong pattern: {platform_content_id}"
        else:
            # The chain is intact; nothing to report.
            return
    _fail(findings, "source_path_broken", problem)
def _check_summary(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
    """Compare summary counters and search clue counters with rule decisions.

    ``final_output.summary`` must state exactly the number of decisions per
    action (and ``pending_content_count`` of 0). The per-action totals summed
    over all search clues must be at least the number of decisions with that
    action. A clue counter that is missing or null counts as 0.
    """
    decisions = data.get("rule_decisions.jsonl", [])
    action_counts = Counter(decision.get("decision_action") for decision in decisions)
    summary = data.get("final_output.json", {}).get("summary", {})
    expected = {
        "pooled_content_count": action_counts["ADD_TO_CONTENT_POOL"],
        "review_content_count": action_counts["KEEP_CONTENT_FOR_REVIEW"],
        "pending_content_count": 0,
        "rejected_content_count": action_counts["REJECT_CONTENT"],
    }
    for field, value in expected.items():
        if summary.get(field) != value:
            _fail(
                findings,
                "summary_mismatch",
                f"summary.{field} expected {value}, got {summary.get(field)}",
            )

    clue_counts: Counter[str] = Counter()
    for clue in data.get("search_clues.jsonl", []):
        # `or 0` keeps an explicit JSON null from raising TypeError on `+=`.
        clue_counts["ADD_TO_CONTENT_POOL"] += clue.get("pooled_content_count") or 0
        clue_counts["KEEP_CONTENT_FOR_REVIEW"] += clue.get("review_content_count") or 0
        clue_counts["REJECT_CONTENT"] += clue.get("rejected_content_count") or 0
    for action, count in action_counts.items():
        if clue_counts[action] < count:
            _fail(
                findings,
                "search_clue_mismatch",
                f"search_clues {action} expected at least {count}, got {clue_counts[action]}",
            )
def _check_completeness(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
    """Compare the stored completeness flags with freshly recomputed ones.

    The values are recomputed with ``compute_final_output_completeness`` and
    compared against ``summary.run_path_complete``, ``summary.trace_complete``
    and the top-level ``validation_status`` of final_output.
    """
    final_output = data.get("final_output.json", {})
    recomputed = compute_final_output_completeness(
        final_output,
        data.get("rule_decisions.jsonl", []),
        data.get("source_path_records.jsonl", []),
    )
    summary = final_output.get("summary", {})

    for flag in ("run_path_complete", "trace_complete"):
        stored = summary.get(flag)
        if stored != recomputed[flag]:
            _fail(
                findings,
                "completeness_mismatch",
                f"summary.{flag} expected {recomputed[flag]}, got {stored}",
            )

    stored_status = final_output.get("validation_status")
    if stored_status != recomputed["validation_status"]:
        _fail(
            findings,
            "completeness_mismatch",
            f"validation_status expected {recomputed['validation_status']}, got {stored_status}",
        )
- def _result(run_id: str, findings: list[dict[str, Any]]) -> dict[str, Any]:
- return {
- "run_id": run_id,
- "status": "fail" if any(finding["level"] == "fail" for finding in findings) else "pass",
- "findings": findings,
- }
|