validate_rule_pack_config.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. """Validate referential integrity inside douyin_rule_packs.v1.json (V2-M1D).
  2. Mirrors the walk-side checks in walk_strategy_json (which already cover walk FKs).
  3. Here we close the rule-pack side: every decision_action / decision_reason_code /
  4. scoring_rule dimension_key / dispatch rule_pack_id must resolve to an
  5. authoritative catalog entry within the same file.
  6. """
  7. from __future__ import annotations
  8. import argparse
  9. import json
  10. import sys
  11. from pathlib import Path
  12. from typing import Any
  13. ROOT = Path(__file__).resolve().parents[1]
  14. RULE_PACK_PATH = Path("product_documents/规则包/douyin_rule_packs.v1.json")
  15. LEGACY_FIELD_BLOCKLIST = {
  16. "fit_senior_50plus",
  17. "fit_confidence",
  18. "relevance_score",
  19. "platform_heat",
  20. "age_50_plus_level",
  21. }
  22. def _fail(findings: list[dict[str, Any]], check_id: str, message: str) -> None:
  23. findings.append({"level": "fail", "check_id": check_id, "message": message})
  24. def _warn(findings: list[dict[str, Any]], check_id: str, message: str) -> None:
  25. findings.append({"level": "warn", "check_id": check_id, "message": message})
  26. def validate_rule_pack_config(pkg: dict[str, Any]) -> list[dict[str, Any]]:
  27. findings: list[dict[str, Any]] = []
  28. rule_packs = pkg.get("rule_packs", [])
  29. rule_pack_ids = {p.get("rule_pack_id") for p in rule_packs}
  30. actions: set[str] = set()
  31. for entry in pkg.get("decision_action_catalog", []):
  32. actions.update(entry.get("allowed_actions", []))
  33. reason_codes = {r.get("decision_reason_code") for r in pkg.get("decision_reason_codes", [])}
  34. enabled_by_group: dict[tuple[Any, ...], list[str]] = {}
  35. for dispatch in pkg.get("rule_pack_dispatch", []):
  36. if dispatch.get("rule_pack_id") not in rule_pack_ids:
  37. _fail(findings, "dispatch_rule_pack_id",
  38. f"{dispatch.get('dispatch_id')} references unknown rule_pack_id: {dispatch.get('rule_pack_id')}")
  39. if dispatch.get("dispatch_enabled"):
  40. group = (dispatch.get("platform"), dispatch.get("strategy_version"), dispatch.get("runtime_stage"),
  41. dispatch.get("target_entity"), dispatch.get("content_format"))
  42. enabled_by_group.setdefault(group, []).append(dispatch.get("rule_pack_id"))
  43. for group, pack_ids in enabled_by_group.items():
  44. if len(pack_ids) > 1:
  45. _fail(findings, "dispatch_conflict",
  46. f"CONFIG_RULE_PACK_DISPATCH_CONFLICT: multiple enabled dispatches for group {group}: {pack_ids}")
  47. for pack in rule_packs:
  48. pid = pack.get("rule_pack_id")
  49. dimension_keys = {d.get("key") for d in pack.get("scorecard", {}).get("dimensions", [])}
  50. for gate in pack.get("hard_gates", []):
  51. if gate.get("decision_action") not in actions:
  52. _fail(findings, "hard_gate_action", f"{pid}/{gate.get('gate_id')} unknown decision_action: {gate.get('decision_action')}")
  53. if gate.get("decision_reason_code") not in reason_codes:
  54. # decision_reason_codes is a curated subset, not an exhaustive enum -> warn only.
  55. _warn(findings, "hard_gate_reason", f"{pid}/{gate.get('gate_id')} reason_code not in catalog: {gate.get('decision_reason_code')}")
  56. for rule in pack.get("scorecard", {}).get("scoring_rules", []):
  57. if rule.get("dimension_key") not in dimension_keys:
  58. _fail(findings, "scoring_rule_dimension", f"{pid}/{rule.get('scoring_rule_id')} unknown dimension_key: {rule.get('dimension_key')}")
  59. for i, threshold in enumerate(pack.get("thresholds", [])):
  60. if threshold.get("decision_action") not in actions:
  61. _fail(findings, "threshold_action", f"{pid}/threshold[{i}] unknown decision_action: {threshold.get('decision_action')}")
  62. if threshold.get("decision_reason_code") not in reason_codes:
  63. _warn(findings, "threshold_reason", f"{pid}/threshold[{i}] reason_code not in catalog: {threshold.get('decision_reason_code')}")
  64. if _is_v4_pack(pack, pkg):
  65. _check_v4_rule_pack(findings, pack)
  66. return findings
  67. def _is_v4_pack(pack: dict[str, Any], pkg: dict[str, Any]) -> bool:
  68. return (
  69. (pkg.get("strategy_binding") or {}).get("strategy_version") == "V4"
  70. or (pack.get("scorecard") or {}).get("schema_version") == "v4_scorecard.v1"
  71. or pack.get("version") == "4.0.0"
  72. )
  73. def _check_v4_rule_pack(findings: list[dict[str, Any]], pack: dict[str, Any]) -> None:
  74. pid = pack.get("rule_pack_id")
  75. legacy_paths = _legacy_paths(pack, f"rule_pack:{pid}")
  76. if legacy_paths:
  77. _fail(findings, "v4_rule_pack_legacy_field", f"{pid} contains legacy fields: {legacy_paths[:5]}")
  78. scorecard = pack.get("scorecard") or {}
  79. if scorecard.get("schema_version") != "v4_scorecard.v1":
  80. _fail(findings, "v4_scorecard_schema", f"{pid} scorecard.schema_version must be v4_scorecard.v1")
  81. dimensions = [row for row in scorecard.get("dimensions", []) if row.get("runtime_status") == "active"]
  82. keys = [row.get("key") for row in dimensions]
  83. if keys != ["query_relevance", "platform_performance"]:
  84. _fail(findings, "v4_scorecard_dimensions", f"{pid} active dimensions must be query_relevance/platform_performance, got {keys}")
  85. for row in dimensions:
  86. if row.get("max_score") != 50 or row.get("weight_percent") != 50:
  87. _fail(findings, "v4_scorecard_weight", f"{pid}/{row.get('key')} must be max_score=50 weight_percent=50")
  88. thresholds = pack.get("thresholds", [])
  89. expected_reasons = {
  90. "v4_query_and_platform_pass",
  91. "v4_score_review_needed",
  92. "v4_query_or_score_below_threshold",
  93. }
  94. if {row.get("decision_reason_code") for row in thresholds} != expected_reasons:
  95. _fail(findings, "v4_threshold_reasons", f"{pid} thresholds must use V4 reason codes")
  96. pool = [row for row in thresholds if row.get("decision_action") == "ADD_TO_CONTENT_POOL"]
  97. review = [row for row in thresholds if row.get("decision_action") == "KEEP_CONTENT_FOR_REVIEW"]
  98. reject = [row for row in thresholds if row.get("decision_action") == "REJECT_CONTENT"]
  99. if not pool or pool[0].get("min_score") != 70:
  100. _fail(findings, "v4_pool_threshold", f"{pid} pool threshold min_score must be 70")
  101. if not review or review[0].get("min_score") != 55:
  102. _fail(findings, "v4_review_threshold", f"{pid} review threshold min_score must be 55")
  103. if not reject or reject[0].get("max_score", 0) > 55:
  104. _fail(findings, "v4_reject_threshold", f"{pid} reject threshold must stay below 55")
  105. def _legacy_paths(value: Any, prefix: str) -> list[str]:
  106. paths: list[str] = []
  107. if isinstance(value, dict):
  108. for key, child in value.items():
  109. child_path = f"{prefix}.{key}"
  110. if key in LEGACY_FIELD_BLOCKLIST:
  111. paths.append(child_path)
  112. paths.extend(_legacy_paths(child, child_path))
  113. elif isinstance(value, list):
  114. for index, child in enumerate(value):
  115. paths.extend(_legacy_paths(child, f"{prefix}[{index}]"))
  116. elif isinstance(value, str):
  117. if _string_has_legacy_field(value):
  118. paths.append(prefix)
  119. return paths
  120. def _string_has_legacy_field(value: str) -> bool:
  121. normalized = value.replace("[", ".").replace("]", ".").replace("/", ".")
  122. parts = [part.strip() for part in normalized.split(".")]
  123. return any(part in LEGACY_FIELD_BLOCKLIST for part in parts)
  124. def main() -> int:
  125. args = _parse_args()
  126. path = args.config_path if args.config_path.is_absolute() else ROOT / args.config_path
  127. pkg = json.loads(path.read_text(encoding="utf-8"))
  128. findings = validate_rule_pack_config(pkg)
  129. result = {
  130. "status": "fail" if any(f["level"] == "fail" for f in findings) else "pass",
  131. "config_path": str(path.relative_to(ROOT)),
  132. "findings": findings,
  133. }
  134. print(json.dumps(result, ensure_ascii=False, indent=2))
  135. return 1 if result["status"] == "fail" else 0
  136. def _parse_args() -> argparse.Namespace:
  137. parser = argparse.ArgumentParser(description=__doc__)
  138. parser.add_argument("config_path", nargs="?", type=Path, default=RULE_PACK_PATH)
  139. return parser.parse_args()
  140. if __name__ == "__main__":
  141. sys.exit(main())