validate_rule_pack_config.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
"""Validate referential integrity inside douyin_rule_packs.v1.json (V2-M1D).
Mirrors the walk-side checks in walk_strategy_json (which already cover walk FKs).
Here we close the rule-pack side: every decision_action / decision_reason_code /
scoring_rule dimension_key / dispatch rule_pack_id must resolve to an
authoritative catalog entry within the same file.
"""
  7. from __future__ import annotations
  8. import argparse
  9. import json
  10. import sys
  11. from pathlib import Path
  12. from typing import Any
  13. ROOT = Path(__file__).resolve().parents[1]
  14. RULE_PACK_PATH = Path("product_documents/规则包/douyin_rule_packs.v1.json")
  15. def _fail(findings: list[dict[str, Any]], check_id: str, message: str) -> None:
  16. findings.append({"level": "fail", "check_id": check_id, "message": message})
  17. def _warn(findings: list[dict[str, Any]], check_id: str, message: str) -> None:
  18. findings.append({"level": "warn", "check_id": check_id, "message": message})
  19. def validate_rule_pack_config(pkg: dict[str, Any]) -> list[dict[str, Any]]:
  20. findings: list[dict[str, Any]] = []
  21. rule_packs = pkg.get("rule_packs", [])
  22. rule_pack_ids = {p.get("rule_pack_id") for p in rule_packs}
  23. actions: set[str] = set()
  24. for entry in pkg.get("decision_action_catalog", []):
  25. actions.update(entry.get("allowed_actions", []))
  26. reason_codes = {r.get("decision_reason_code") for r in pkg.get("decision_reason_codes", [])}
  27. enabled_by_group: dict[tuple[Any, ...], list[str]] = {}
  28. for dispatch in pkg.get("rule_pack_dispatch", []):
  29. if dispatch.get("rule_pack_id") not in rule_pack_ids:
  30. _fail(findings, "dispatch_rule_pack_id",
  31. f"{dispatch.get('dispatch_id')} references unknown rule_pack_id: {dispatch.get('rule_pack_id')}")
  32. if dispatch.get("dispatch_enabled"):
  33. group = (dispatch.get("platform"), dispatch.get("strategy_version"), dispatch.get("runtime_stage"),
  34. dispatch.get("target_entity"), dispatch.get("content_format"))
  35. enabled_by_group.setdefault(group, []).append(dispatch.get("rule_pack_id"))
  36. for group, pack_ids in enabled_by_group.items():
  37. if len(pack_ids) > 1:
  38. _fail(findings, "dispatch_conflict",
  39. f"CONFIG_RULE_PACK_DISPATCH_CONFLICT: multiple enabled dispatches for group {group}: {pack_ids}")
  40. for pack in rule_packs:
  41. pid = pack.get("rule_pack_id")
  42. dimension_keys = {d.get("key") for d in pack.get("scorecard", {}).get("dimensions", [])}
  43. for gate in pack.get("hard_gates", []):
  44. if gate.get("decision_action") not in actions:
  45. _fail(findings, "hard_gate_action", f"{pid}/{gate.get('gate_id')} unknown decision_action: {gate.get('decision_action')}")
  46. if gate.get("decision_reason_code") not in reason_codes:
  47. # decision_reason_codes is a curated subset, not an exhaustive enum -> warn only.
  48. _warn(findings, "hard_gate_reason", f"{pid}/{gate.get('gate_id')} reason_code not in catalog: {gate.get('decision_reason_code')}")
  49. for rule in pack.get("scorecard", {}).get("scoring_rules", []):
  50. if rule.get("dimension_key") not in dimension_keys:
  51. _fail(findings, "scoring_rule_dimension", f"{pid}/{rule.get('scoring_rule_id')} unknown dimension_key: {rule.get('dimension_key')}")
  52. for i, threshold in enumerate(pack.get("thresholds", [])):
  53. if threshold.get("decision_action") not in actions:
  54. _fail(findings, "threshold_action", f"{pid}/threshold[{i}] unknown decision_action: {threshold.get('decision_action')}")
  55. if threshold.get("decision_reason_code") not in reason_codes:
  56. _warn(findings, "threshold_reason", f"{pid}/threshold[{i}] reason_code not in catalog: {threshold.get('decision_reason_code')}")
  57. return findings
  58. def main() -> int:
  59. args = _parse_args()
  60. path = args.config_path if args.config_path.is_absolute() else ROOT / args.config_path
  61. pkg = json.loads(path.read_text(encoding="utf-8"))
  62. findings = validate_rule_pack_config(pkg)
  63. result = {
  64. "status": "fail" if any(f["level"] == "fail" for f in findings) else "pass",
  65. "config_path": str(path.relative_to(ROOT)),
  66. "findings": findings,
  67. }
  68. print(json.dumps(result, ensure_ascii=False, indent=2))
  69. return 1 if result["status"] == "fail" else 0
  70. def _parse_args() -> argparse.Namespace:
  71. parser = argparse.ArgumentParser(description=__doc__)
  72. parser.add_argument("config_path", nargs="?", type=Path, default=RULE_PACK_PATH)
  73. return parser.parse_args()
  74. if __name__ == "__main__":
  75. sys.exit(main())