"""Validate referential integrity inside douyin_rule_packs.v1.json (V2-M1D). Mirrors the walk-side checks in walk_strategy_json (which already cover walk FKs). Here we close the rule-pack side: every decision_action / decision_reason_code / scoring_rule dimension_key / dispatch rule_pack_id must resolve to an authoritative catalog entry within the same file. """ from __future__ import annotations import argparse import json import sys from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] RULE_PACK_PATH = Path("product_documents/规则包/douyin_rule_packs.v1.json") def _fail(findings: list[dict[str, Any]], check_id: str, message: str) -> None: findings.append({"level": "fail", "check_id": check_id, "message": message}) def _warn(findings: list[dict[str, Any]], check_id: str, message: str) -> None: findings.append({"level": "warn", "check_id": check_id, "message": message}) def validate_rule_pack_config(pkg: dict[str, Any]) -> list[dict[str, Any]]: findings: list[dict[str, Any]] = [] rule_packs = pkg.get("rule_packs", []) rule_pack_ids = {p.get("rule_pack_id") for p in rule_packs} actions: set[str] = set() for entry in pkg.get("decision_action_catalog", []): actions.update(entry.get("allowed_actions", [])) reason_codes = {r.get("decision_reason_code") for r in pkg.get("decision_reason_codes", [])} enabled_by_group: dict[tuple[Any, ...], list[str]] = {} for dispatch in pkg.get("rule_pack_dispatch", []): if dispatch.get("rule_pack_id") not in rule_pack_ids: _fail(findings, "dispatch_rule_pack_id", f"{dispatch.get('dispatch_id')} references unknown rule_pack_id: {dispatch.get('rule_pack_id')}") if dispatch.get("dispatch_enabled"): group = (dispatch.get("platform"), dispatch.get("strategy_version"), dispatch.get("runtime_stage"), dispatch.get("target_entity"), dispatch.get("content_format")) enabled_by_group.setdefault(group, []).append(dispatch.get("rule_pack_id")) for group, pack_ids in enabled_by_group.items(): if len(pack_ids) > 1: _fail(findings, "dispatch_conflict", f"CONFIG_RULE_PACK_DISPATCH_CONFLICT: multiple enabled dispatches for group {group}: {pack_ids}") for pack in rule_packs: pid = pack.get("rule_pack_id") dimension_keys = {d.get("key") for d in pack.get("scorecard", {}).get("dimensions", [])} for gate in pack.get("hard_gates", []): if gate.get("decision_action") not in actions: _fail(findings, "hard_gate_action", f"{pid}/{gate.get('gate_id')} unknown decision_action: {gate.get('decision_action')}") if gate.get("decision_reason_code") not in reason_codes: # decision_reason_codes is a curated subset, not an exhaustive enum -> warn only. _warn(findings, "hard_gate_reason", f"{pid}/{gate.get('gate_id')} reason_code not in catalog: {gate.get('decision_reason_code')}") for rule in pack.get("scorecard", {}).get("scoring_rules", []): if rule.get("dimension_key") not in dimension_keys: _fail(findings, "scoring_rule_dimension", f"{pid}/{rule.get('scoring_rule_id')} unknown dimension_key: {rule.get('dimension_key')}") for i, threshold in enumerate(pack.get("thresholds", [])): if threshold.get("decision_action") not in actions: _fail(findings, "threshold_action", f"{pid}/threshold[{i}] unknown decision_action: {threshold.get('decision_action')}") if threshold.get("decision_reason_code") not in reason_codes: _warn(findings, "threshold_reason", f"{pid}/threshold[{i}] reason_code not in catalog: {threshold.get('decision_reason_code')}") return findings def main() -> int: args = _parse_args() path = args.config_path if args.config_path.is_absolute() else ROOT / args.config_path pkg = json.loads(path.read_text(encoding="utf-8")) findings = validate_rule_pack_config(pkg) result = { "status": "fail" if any(f["level"] == "fail" for f in findings) else "pass", "config_path": str(path.relative_to(ROOT)), "findings": findings, } print(json.dumps(result, ensure_ascii=False, indent=2)) return 1 if result["status"] == "fail" else 0 def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("config_path", nargs="?", type=Path, default=RULE_PACK_PATH) return parser.parse_args() if __name__ == "__main__": sys.exit(main())