| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- """Validate referential integrity inside douyin_rule_packs.v1.json (V2-M1D).
- Mirrors the walk-side checks in walk_strategy_json (which already cover walk FKs).
- Here we close the rule-pack side: every decision_action / decision_reason_code /
- scoring_rule dimension_key / dispatch rule_pack_id must resolve to an
- authoritative catalog entry within the same file.
- """
- from __future__ import annotations
- import argparse
- import json
- import sys
- from pathlib import Path
- from typing import Any
# Directory two levels above this file (the parent of the folder that
# contains it); relative config paths given on the command line are resolved
# against it. Presumably the repository root -- confirm against the layout.
ROOT = Path(__file__).resolve().parents[1]

# Default rule-pack document, expressed relative to ROOT.
RULE_PACK_PATH = Path("product_documents") / "规则包" / "douyin_rule_packs.v1.json"

# Field names that must not appear anywhere inside a V4 rule pack, neither as
# a dict key nor as a segment of a string value.
LEGACY_FIELD_BLOCKLIST = {
    "age_50_plus_level",
    "fit_confidence",
    "fit_senior_50plus",
    "platform_heat",
    "relevance_score",
}
- def _fail(findings: list[dict[str, Any]], check_id: str, message: str) -> None:
- findings.append({"level": "fail", "check_id": check_id, "message": message})
- def _warn(findings: list[dict[str, Any]], check_id: str, message: str) -> None:
- findings.append({"level": "warn", "check_id": check_id, "message": message})
def validate_rule_pack_config(pkg: dict[str, Any]) -> list[dict[str, Any]]:
    """Check referential integrity of a parsed rule-pack configuration.

    Checks performed, in the order their findings are emitted:

    * every ``rule_pack_dispatch`` entry names a ``rule_pack_id`` that exists
      in ``rule_packs`` (fail);
    * at most one *enabled* dispatch exists per (platform, strategy_version,
      runtime_stage, target_entity, content_format) group (fail);
    * per pack: hard-gate and threshold ``decision_action`` values appear in
      the ``allowed_actions`` of some ``decision_action_catalog`` entry
      (fail), and their ``decision_reason_code`` values appear in
      ``decision_reason_codes`` (warn only);
    * per pack: every scoring rule's ``dimension_key`` matches a dimension
      ``key`` of that pack's scorecard (fail);
    * packs recognised by ``_is_v4_pack`` also go through
      ``_check_v4_rule_pack``.

    A key whose value is JSON ``null`` is treated exactly like a missing key,
    matching the ``or {}`` guards the V4 helpers already use, so a null
    section yields findings (or nothing) instead of an AttributeError or
    TypeError.

    Args:
        pkg: The parsed JSON document.

    Returns:
        Finding dicts (``level``, ``check_id``, ``message``) in detection
        order; empty when nothing was flagged.
    """
    findings: list[dict[str, Any]] = []

    # ``.get(key) or default`` instead of ``.get(key, default)``: the latter
    # still returns None when the key is present with a null value.
    rule_packs = pkg.get("rule_packs") or []
    rule_pack_ids = {pack.get("rule_pack_id") for pack in rule_packs}

    actions: set[str] = set()
    for entry in pkg.get("decision_action_catalog") or []:
        actions.update(entry.get("allowed_actions") or [])

    reason_codes = {
        row.get("decision_reason_code")
        for row in pkg.get("decision_reason_codes") or []
    }

    # Dispatch side: foreign keys plus "one enabled dispatch per group".
    enabled_by_group: dict[tuple[Any, ...], list[str]] = {}
    for dispatch in pkg.get("rule_pack_dispatch") or []:
        target_pack_id = dispatch.get("rule_pack_id")
        if target_pack_id not in rule_pack_ids:
            _fail(findings, "dispatch_rule_pack_id",
                  f"{dispatch.get('dispatch_id')} references unknown rule_pack_id: {target_pack_id}")
        if dispatch.get("dispatch_enabled"):
            group = (
                dispatch.get("platform"),
                dispatch.get("strategy_version"),
                dispatch.get("runtime_stage"),
                dispatch.get("target_entity"),
                dispatch.get("content_format"),
            )
            enabled_by_group.setdefault(group, []).append(target_pack_id)

    for group, pack_ids in enabled_by_group.items():
        if len(pack_ids) > 1:
            _fail(findings, "dispatch_conflict",
                  f"CONFIG_RULE_PACK_DISPATCH_CONFLICT: multiple enabled dispatches for group {group}: {pack_ids}")

    # Rule-pack side: every reference must resolve inside the same document.
    for pack in rule_packs:
        pid = pack.get("rule_pack_id")
        scorecard = pack.get("scorecard") or {}
        dimension_keys = {dim.get("key") for dim in scorecard.get("dimensions") or []}

        for gate in pack.get("hard_gates") or []:
            if gate.get("decision_action") not in actions:
                _fail(findings, "hard_gate_action", f"{pid}/{gate.get('gate_id')} unknown decision_action: {gate.get('decision_action')}")
            if gate.get("decision_reason_code") not in reason_codes:
                # decision_reason_codes is a curated subset, not an exhaustive enum -> warn only.
                _warn(findings, "hard_gate_reason", f"{pid}/{gate.get('gate_id')} reason_code not in catalog: {gate.get('decision_reason_code')}")

        for rule in scorecard.get("scoring_rules") or []:
            if rule.get("dimension_key") not in dimension_keys:
                _fail(findings, "scoring_rule_dimension", f"{pid}/{rule.get('scoring_rule_id')} unknown dimension_key: {rule.get('dimension_key')}")

        for i, threshold in enumerate(pack.get("thresholds") or []):
            if threshold.get("decision_action") not in actions:
                _fail(findings, "threshold_action", f"{pid}/threshold[{i}] unknown decision_action: {threshold.get('decision_action')}")
            if threshold.get("decision_reason_code") not in reason_codes:
                # Same curated-subset rationale as for hard gates -> warn only.
                _warn(findings, "threshold_reason", f"{pid}/threshold[{i}] reason_code not in catalog: {threshold.get('decision_reason_code')}")

        if _is_v4_pack(pack, pkg):
            _check_v4_rule_pack(findings, pack)

    return findings
- def _is_v4_pack(pack: dict[str, Any], pkg: dict[str, Any]) -> bool:
- return (
- (pkg.get("strategy_binding") or {}).get("strategy_version") == "V4"
- or (pack.get("scorecard") or {}).get("schema_version") == "v4_scorecard.v1"
- or pack.get("version") == "4.0.0"
- )
def _check_v4_rule_pack(findings: list[dict[str, Any]], pack: dict[str, Any]) -> None:
    """Apply the V4-only constraints to *pack*, appending fails to *findings*.

    Constraints, in the order their findings are emitted:

    * no blocklisted legacy field anywhere in the pack (at most the first
      five offending paths are quoted in the message);
    * ``scorecard.schema_version`` equals ``v4_scorecard.v1``;
    * the dimensions whose ``runtime_status`` is ``active`` are exactly
      ``query_relevance`` then ``platform_performance``, each with
      ``max_score`` 50 and ``weight_percent`` 50;
    * the set of threshold reason codes equals the three V4 codes;
    * the first threshold for each of the pool / review / reject actions
      exists and carries the expected score bound.

    Args:
        findings: Accumulator list, mutated in place.
        pack: One entry of the document's ``rule_packs`` list.
    """
    pid = pack.get("rule_pack_id")

    offending = _legacy_paths(pack, f"rule_pack:{pid}")
    if offending:
        _fail(findings, "v4_rule_pack_legacy_field", f"{pid} contains legacy fields: {offending[:5]}")

    scorecard = pack.get("scorecard") or {}
    if scorecard.get("schema_version") != "v4_scorecard.v1":
        _fail(findings, "v4_scorecard_schema", f"{pid} scorecard.schema_version must be v4_scorecard.v1")

    # Only dimensions flagged active take part in the V4 shape check.
    active_rows = []
    active_keys = []
    for dim in scorecard.get("dimensions", []):
        if dim.get("runtime_status") == "active":
            active_rows.append(dim)
            active_keys.append(dim.get("key"))

    # List comparison on purpose: both membership and order must match.
    if active_keys != ["query_relevance", "platform_performance"]:
        _fail(findings, "v4_scorecard_dimensions", f"{pid} active dimensions must be query_relevance/platform_performance, got {active_keys}")
    for dim in active_rows:
        if not (dim.get("max_score") == 50 and dim.get("weight_percent") == 50):
            _fail(findings, "v4_scorecard_weight", f"{pid}/{dim.get('key')} must be max_score=50 weight_percent=50")

    thresholds = pack.get("thresholds", [])
    seen_reasons = {entry.get("decision_reason_code") for entry in thresholds}
    if seen_reasons != {
        "v4_query_and_platform_pass",
        "v4_score_review_needed",
        "v4_query_or_score_below_threshold",
    }:
        _fail(findings, "v4_threshold_reasons", f"{pid} thresholds must use V4 reason codes")

    def first_for(action: str) -> Any:
        """Return the first threshold using *action*, or None if absent."""
        return next(
            (entry for entry in thresholds if entry.get("decision_action") == action),
            None,
        )

    pool = first_for("ADD_TO_CONTENT_POOL")
    review = first_for("KEEP_CONTENT_FOR_REVIEW")
    reject = first_for("REJECT_CONTENT")

    if pool is None or pool.get("min_score") != 70:
        _fail(findings, "v4_pool_threshold", f"{pid} pool threshold min_score must be 70")
    if review is None or review.get("min_score") != 55:
        _fail(findings, "v4_review_threshold", f"{pid} review threshold min_score must be 55")
    # A reject threshold without max_score counts as 0 and therefore passes.
    if reject is None or reject.get("max_score", 0) > 55:
        _fail(findings, "v4_reject_threshold", f"{pid} reject threshold must stay below 55")
- def _legacy_paths(value: Any, prefix: str) -> list[str]:
- paths: list[str] = []
- if isinstance(value, dict):
- for key, child in value.items():
- child_path = f"{prefix}.{key}"
- if key in LEGACY_FIELD_BLOCKLIST:
- paths.append(child_path)
- paths.extend(_legacy_paths(child, child_path))
- elif isinstance(value, list):
- for index, child in enumerate(value):
- paths.extend(_legacy_paths(child, f"{prefix}[{index}]"))
- elif isinstance(value, str):
- if _string_has_legacy_field(value):
- paths.append(prefix)
- return paths
def _string_has_legacy_field(value: str) -> bool:
    """Tell whether *value* names a blocklisted field as a path segment.

    ``[``, ``]`` and ``/`` are treated as equivalent to ``.``. The string is
    split on ``.`` and each whitespace-stripped segment is compared against
    ``LEGACY_FIELD_BLOCKLIST``. Only whole segments match, so a legacy name
    embedded in a longer word is not flagged.

    Args:
        value: String value taken from the rule pack.

    Returns:
        True if any segment is a blocklisted field name.
    """
    dotted = value.translate(str.maketrans("[]/", "..."))
    for segment in dotted.split("."):
        if segment.strip() in LEGACY_FIELD_BLOCKLIST:
            return True
    return False
def main() -> int:
    """Validate the rule-pack file named on the command line and report.

    A relative ``config_path`` is resolved against ``ROOT`` (not the current
    working directory); an absolute one is used as given. The result is
    printed to stdout as an indented JSON object with ``status``,
    ``config_path`` and ``findings``.

    Returns:
        1 when at least one finding has level ``fail``, otherwise 0.

    Raises:
        OSError: If the file cannot be read.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    args = _parse_args()
    path = args.config_path if args.config_path.is_absolute() else ROOT / args.config_path
    pkg = json.loads(path.read_text(encoding="utf-8"))
    findings = validate_rule_pack_config(pkg)

    try:
        shown_path = str(path.relative_to(ROOT))
    except ValueError:
        # relative_to() raises for an absolute path outside ROOT; report the
        # full path rather than crashing after validation already ran.
        shown_path = str(path)

    result = {
        "status": "fail" if any(f["level"] == "fail" for f in findings) else "pass",
        "config_path": shown_path,
        "findings": findings,
    }
    print(json.dumps(result, ensure_ascii=False, indent=2))
    return 1 if result["status"] == "fail" else 0
def _parse_args() -> argparse.Namespace:
    """Parse the command line.

    The only argument is the optional positional ``config_path``, converted
    to a ``Path`` and defaulting to ``RULE_PACK_PATH``. The module docstring
    serves as the ``--help`` description.

    Returns:
        Namespace with a ``config_path`` attribute.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("config_path", type=Path, nargs="?", default=RULE_PACK_PATH)
    namespace = cli.parse_args()
    return namespace
# Script entry point: run the validator and use its return value as the
# process exit status (1 on any fail-level finding, else 0).
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)
|