| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- """Validate referential integrity inside douyin_rule_packs.v1.json (V2-M1D).
- Mirrors the walk-side checks in walk_strategy_json (which already cover walk FKs).
- Here we close the rule-pack side: every decision_action / decision_reason_code /
- scoring_rule dimension_key / dispatch rule_pack_id must resolve to an
- authoritative catalog entry within the same file.
- """
- from __future__ import annotations
- import argparse
- import json
- import sys
- from pathlib import Path
- from typing import Any
# Absolute, symlink-resolved location of this script.
_THIS_FILE = Path(__file__).resolve()
# Directory one level above the folder that contains this script; relative
# config paths given on the command line are joined onto it.
ROOT = _THIS_FILE.parents[1]
# Default rule-pack file, expressed relative to ROOT.
RULE_PACK_PATH = Path("product_documents/规则包/douyin_rule_packs.v1.json")
- def _fail(findings: list[dict[str, Any]], check_id: str, message: str) -> None:
- findings.append({"level": "fail", "check_id": check_id, "message": message})
- def _warn(findings: list[dict[str, Any]], check_id: str, message: str) -> None:
- findings.append({"level": "warn", "check_id": check_id, "message": message})
def validate_rule_pack_config(pkg: dict[str, Any]) -> list[dict[str, Any]]:
    """Check that every reference inside a rule-pack package resolves locally.

    The following checks are performed:

    * each ``rule_pack_dispatch`` entry must name a ``rule_pack_id`` present in
      ``rule_packs`` (fail);
    * at most one *enabled* dispatch may exist per
      (platform, strategy_version, runtime_stage, target_entity,
      content_format) group (fail);
    * every hard gate and threshold ``decision_action`` must appear in some
      ``allowed_actions`` list of ``decision_action_catalog`` (fail);
    * every hard gate and threshold ``decision_reason_code`` should appear in
      ``decision_reason_codes`` (warn only);
    * every scoring rule ``dimension_key`` must match a dimension ``key`` of
      the same pack's scorecard (fail).

    Args:
        pkg: Parsed rule-pack JSON document.

    Returns:
        The findings recorded through ``_fail`` / ``_warn``, in the order they
        were detected. An empty list means nothing was flagged.
    """
    findings: list[dict[str, Any]] = []
    rule_packs = pkg.get("rule_packs", [])

    # Build the lookup sets. A catalog entry that lacks its key contributes
    # None via .get(); None is removed from every set so that a reference
    # which is itself missing (and therefore also None) can never "resolve"
    # against such an entry and slip through unreported.
    rule_pack_ids = {p.get("rule_pack_id") for p in rule_packs}
    rule_pack_ids.discard(None)

    actions: set[str] = set()
    for entry in pkg.get("decision_action_catalog", []):
        actions.update(entry.get("allowed_actions", []))
    actions.discard(None)

    reason_codes = {r.get("decision_reason_code") for r in pkg.get("decision_reason_codes", [])}
    reason_codes.discard(None)

    # Dispatch side: foreign-key check plus collection of enabled dispatches
    # per routing group for the conflict check below.
    enabled_by_group: dict[tuple[Any, ...], list[str]] = {}
    for dispatch in pkg.get("rule_pack_dispatch", []):
        if dispatch.get("rule_pack_id") not in rule_pack_ids:
            _fail(findings, "dispatch_rule_pack_id",
                  f"{dispatch.get('dispatch_id')} references unknown rule_pack_id: {dispatch.get('rule_pack_id')}")
        if dispatch.get("dispatch_enabled"):
            group = (dispatch.get("platform"), dispatch.get("strategy_version"), dispatch.get("runtime_stage"),
                     dispatch.get("target_entity"), dispatch.get("content_format"))
            enabled_by_group.setdefault(group, []).append(dispatch.get("rule_pack_id"))

    for group, pack_ids in enabled_by_group.items():
        if len(pack_ids) > 1:
            _fail(findings, "dispatch_conflict",
                  f"CONFIG_RULE_PACK_DISPATCH_CONFLICT: multiple enabled dispatches for group {group}: {pack_ids}")

    # Pack side: gates, scoring rules and thresholds of each rule pack.
    for pack in rule_packs:
        pid = pack.get("rule_pack_id")
        scorecard = pack.get("scorecard", {})
        dimension_keys = {d.get("key") for d in scorecard.get("dimensions", [])}
        dimension_keys.discard(None)

        for gate in pack.get("hard_gates", []):
            if gate.get("decision_action") not in actions:
                _fail(findings, "hard_gate_action",
                      f"{pid}/{gate.get('gate_id')} unknown decision_action: {gate.get('decision_action')}")
            if gate.get("decision_reason_code") not in reason_codes:
                # decision_reason_codes is a curated subset, not an exhaustive enum -> warn only.
                _warn(findings, "hard_gate_reason",
                      f"{pid}/{gate.get('gate_id')} reason_code not in catalog: {gate.get('decision_reason_code')}")

        for rule in scorecard.get("scoring_rules", []):
            if rule.get("dimension_key") not in dimension_keys:
                _fail(findings, "scoring_rule_dimension",
                      f"{pid}/{rule.get('scoring_rule_id')} unknown dimension_key: {rule.get('dimension_key')}")

        for i, threshold in enumerate(pack.get("thresholds", [])):
            if threshold.get("decision_action") not in actions:
                _fail(findings, "threshold_action",
                      f"{pid}/threshold[{i}] unknown decision_action: {threshold.get('decision_action')}")
            if threshold.get("decision_reason_code") not in reason_codes:
                # Same curated-subset reasoning as for hard gates -> warn only.
                _warn(findings, "threshold_reason",
                      f"{pid}/threshold[{i}] reason_code not in catalog: {threshold.get('decision_reason_code')}")

    return findings
def main() -> int:
    """Validate the rule-pack file named on the command line and print a JSON report.

    A relative ``config_path`` is resolved against ``ROOT``; an absolute one is
    used as given. The report (status, config path, findings) is written to
    stdout as indented JSON.

    Returns:
        ``1`` if any finding has level ``"fail"``, otherwise ``0``.

    Raises:
        OSError: If the config file cannot be read.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    args = _parse_args()
    path = args.config_path if args.config_path.is_absolute() else ROOT / args.config_path
    pkg = json.loads(path.read_text(encoding="utf-8"))
    findings = validate_rule_pack_config(pkg)

    # Report the path relative to ROOT when possible. relative_to() raises
    # ValueError for an absolute path outside ROOT, which previously crashed
    # the tool after validation had already succeeded; fall back to the full
    # path in that case.
    try:
        reported_path = str(path.relative_to(ROOT))
    except ValueError:
        reported_path = str(path)

    result = {
        "status": "fail" if any(f["level"] == "fail" for f in findings) else "pass",
        "config_path": reported_path,
        "findings": findings,
    }
    print(json.dumps(result, ensure_ascii=False, indent=2))
    return 1 if result["status"] == "fail" else 0
def _parse_args() -> argparse.Namespace:
    """Parse the process command line.

    Accepts a single optional positional ``config_path`` (converted to
    ``Path``), which defaults to ``RULE_PACK_PATH`` when omitted. The module
    docstring is used as the help description.

    Returns:
        The parsed namespace, exposing ``config_path``.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    positional_options = {"nargs": "?", "type": Path, "default": RULE_PACK_PATH}
    cli.add_argument("config_path", **positional_options)
    namespace = cli.parse_args()
    return namespace
- if __name__ == "__main__":
- sys.exit(main())
|