| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496 |
- """Validate V4 M0 config contract without switching V3 production rule packs."""
- from __future__ import annotations
- import json
- import sys
- from pathlib import Path
- from typing import Any
# Parent of the directory that contains this file; all config paths hang off it.
ROOT = Path(__file__).resolve().parents[1]
DATA_DIR = ROOT / "tech_documents/数据接口与来源"
RULE_PACK_PATH = ROOT / "product_documents/规则包/douyin_rule_packs.v1.json"
WALK_STRATEGY_PATH = ROOT / "product_documents/抖音游走策略/douyin_walk_strategy.v1.json"

# Field names that must not appear in V4 configs, either as object keys or as
# segments of dotted/bracketed/slashed string references.
LEGACY_FIELD_BLOCKLIST = {
    "fit_senior_50plus",
    "fit_confidence",
    "relevance_score",
    "platform_heat",
    "age_50_plus_level",
}

# Status values accepted on rows of crawler_endpoints.registry.json.
ENDPOINT_STATUSES = {
    "verified",
    "verified_unstable",
    "blocked",
    "source_only",
    "missing",
}

# Platforms whose profiles receive the additional M2 profile checks.
M2_PLATFORM_PROFILES = {"douyin", "kuaishou", "shipinhao"}
PROFILE_EDGE_STATUSES = {"supported", "blocked"}
# Same vocabulary as ENDPOINT_STATUSES, kept as an independent set object.
PROFILE_ENDPOINT_STATUSES = set(ENDPOINT_STATUSES)

# Every one of these must be listed by a profile as observable or as missing.
OBSERVABLE_FIELDS = {
    "statistics.digg_count",
    "statistics.comment_count",
    "statistics.share_count",
    "statistics.collect_count",
    "statistics.play_count",
}
MISSING_OBSERVABLE_TYPES = {"natural_platform_missing", "runtime_missing"}

# Edges and raw payload fields required by the v4_walk_gate configuration.
M4_WALK_GATE_EDGES = {"query_next_page", "hashtag_to_query", "author_to_works"}
M4_WALK_GATE_RAW_FIELDS = {"decision_id", "allow_walk", "allow_walk_reason", "walk_gate_snapshot"}
def main() -> int:
    """Validate the contract under ``ROOT`` and print a JSON report to stdout.

    Returns:
        ``0`` when no findings were recorded, ``1`` otherwise.
    """
    findings = validate_v4_config_contract(ROOT)
    failed = bool(findings)
    report = {
        "status": "fail" if failed else "pass",
        "findings": findings,
    }
    # ensure_ascii=False keeps non-ASCII file names readable in the report.
    print(json.dumps(report, ensure_ascii=False, indent=2))
    return int(failed)
def validate_v4_config_contract(root: Path = ROOT) -> list[dict[str, str]]:
    """Validate the V4 config contract files and return every finding.

    Checks, in order: ``walk_graph.json``, ``walk_policy.json``,
    ``crawler_endpoints.registry.json``, ``跨平台字段映射.json`` and each
    ``platform_profiles/*.json`` under ``root / "tech_documents/数据接口与来源"``,
    followed by the rule pack package and the walk strategy.

    Args:
        root: Repository root used to locate the data-interface config directory.

    Returns:
        One dict per violation (``level``, ``check_id``, ``message``). An empty
        list means the contract holds.
    """
    data_dir = root / "tech_documents/数据接口与来源"
    findings: list[dict[str, str]] = []

    # Loaded documents are compared against None rather than tested for
    # truthiness: an empty JSON object must fail the checks below instead of
    # silently skipping them.
    walk_graph = _load_json_object(data_dir / "walk_graph.json", findings)
    if walk_graph is not None:
        _check_no_legacy_fields(findings, walk_graph, "walk_graph.json")
        _check_value(findings, "walk_graph_schema", walk_graph.get("schema_version"), "walk_graph.v2")
        _check_count(findings, "walk_graph_nodes", "walk_graph.nodes", walk_graph.get("nodes"), 8)
        _check_count(findings, "walk_graph_edges", "walk_graph.edges", walk_graph.get("edges"), 10)

    walk_policy = _load_json_object(data_dir / "walk_policy.json", findings)
    if walk_policy is not None:
        _check_no_legacy_fields(findings, walk_policy, "walk_policy.json")
        _check_value(findings, "walk_policy_schema", walk_policy.get("schema_version"), "walk_policy.v1")
        for key in ("global", "edge_budgets", "dedup", "edge_permissions", "v4_walk_gate"):
            if key not in walk_policy:
                _fail(findings, "walk_policy_missing_key", f"walk_policy.json missing {key}")
        _check_v4_walk_gate_config(findings, walk_policy.get("v4_walk_gate"), "walk_policy.v4_walk_gate")
        if walk_graph is not None:
            # A null / non-list "edges" was already reported by _check_count;
            # here it is simply treated as "no edges" instead of crashing.
            raw_edges = walk_graph.get("edges")
            graph_edges = {
                edge.get("edge_id")
                for edge in (raw_edges if isinstance(raw_edges, list) else [])
                if isinstance(edge, dict) and isinstance(edge.get("edge_id"), str)
            }
            missing = sorted(M4_WALK_GATE_EDGES - graph_edges)
            if missing:
                _fail(findings, "v4_walk_gate_graph_edges_missing", f"walk_graph missing M4 gate edges: {missing}")

    endpoint_registry = _load_json_object(data_dir / "crawler_endpoints.registry.json", findings)
    if endpoint_registry is not None:
        _check_no_legacy_fields(findings, endpoint_registry, "crawler_endpoints.registry.json")
        _check_value(
            findings,
            "crawler_endpoints_schema",
            endpoint_registry.get("registry_version"),
            "crawler_endpoints.v1",
        )
        endpoints = endpoint_registry.get("endpoints")
        if isinstance(endpoints, list):
            _check_count(findings, "crawler_endpoints_count", "endpoints", endpoints, 26)
            for endpoint in endpoints:
                _check_crawler_endpoint(findings, endpoint)
        else:
            _fail(findings, "crawler_endpoints_invalid", "endpoints must be a list")

    field_map = _load_json_object(data_dir / "跨平台字段映射.json", findings)
    if field_map is not None:
        _check_no_legacy_fields(findings, field_map, "跨平台字段映射.json")
        _check_value(
            findings,
            "field_map_schema",
            field_map.get("schema_version"),
            "cross_platform_field_map.v1",
        )
        if not isinstance(field_map.get("mappings"), dict):
            _fail(findings, "field_map_mappings_invalid", "mappings must be an object")

    for profile_path in sorted((data_dir / "platform_profiles").glob("*.json")):
        profile = _load_json_object(profile_path, findings)
        if profile is None:
            continue
        _check_no_legacy_fields(findings, profile, profile_path.name)
        _check_value(
            findings,
            "platform_profile_schema",
            profile.get("schema_version"),
            "platform_profile.v1",
            label=profile_path.name,
        )
        for key in ("platform", "status", "runtime", "endpoints", "edges"):
            if key not in profile:
                _fail(findings, "platform_profile_missing_key", f"{profile_path.name} missing {key}")
        platform = profile.get("platform")
        # isinstance guard: an unhashable platform value must not crash the set lookup.
        if isinstance(platform, str) and platform in M2_PLATFORM_PROFILES:
            _check_m2_platform_profile(findings, profile, profile_path.name)

    # NOTE(review): the rule pack and walk strategy are read from the
    # module-level paths (anchored at ROOT), not from *root* - confirm that
    # this is intended before changing it.
    rule_pack_pkg = _load_json_object(RULE_PACK_PATH, findings)
    if rule_pack_pkg is not None:
        _check_v4_rule_pack_contract(findings, rule_pack_pkg)

    walk_strategy = _load_json_object(WALK_STRATEGY_PATH, findings)
    if walk_strategy is not None:
        _check_no_legacy_fields(findings, walk_strategy, "douyin_walk_strategy.v1.json")
        _check_v4_walk_strategy_contract(findings, walk_strategy)

    return findings


def _load_json_object(path: Path, findings: list[dict[str, str]]) -> dict[str, Any] | None:
    """Load *path* via ``_load_json`` and accept it only when the root is a JSON object."""
    payload = _load_json(path, findings)
    if payload is None:
        # Missing / unparsable files have already been reported by _load_json.
        return None
    if not isinstance(payload, dict):
        _fail(findings, "json_root_invalid", f"{path.name} root must be a JSON object")
        return None
    return payload


def _check_crawler_endpoint(findings: list[dict[str, str]], endpoint: Any) -> None:
    """Check one row of the crawler endpoint registry for required keys and a known status."""
    if not isinstance(endpoint, dict):
        _fail(findings, "crawler_endpoint_invalid", "endpoint row must be object")
        return
    source_id = endpoint.get("source_id")
    for key in (
        "platform",
        "source_id",
        "status",
        "table_or_endpoint",
        "input_fields",
        "output_fields",
    ):
        if key not in endpoint:
            _fail(findings, "crawler_endpoint_missing_key", f"{source_id} missing {key}")
    status = endpoint.get("status")
    if not isinstance(status, (str, list)):
        _fail(findings, "crawler_endpoint_status_invalid", f"{source_id} status must be string or list")
        return
    statuses = status if isinstance(status, list) else [status]
    # Non-string items are always invalid; testing the type first also keeps
    # unhashable items (lists/objects) away from the set membership test.
    invalid_statuses = [
        item for item in statuses if not isinstance(item, str) or item not in ENDPOINT_STATUSES
    ]
    if invalid_statuses:
        _fail(findings, "crawler_endpoint_status_unknown", f"{source_id} unknown status: {invalid_statuses}")
def assert_no_v4_legacy_fields(value: Any, label: str = "v4_contract") -> list[str]:
    """Return the paths inside *value* that reference a blocklisted legacy field.

    Despite the name, nothing is asserted or raised here: the caller receives
    the offending paths (each prefixed with *label*) and an empty list means
    the value is clean.
    """
    legacy_paths: list[str] = []
    _collect_legacy_paths(value, label, legacy_paths)
    return legacy_paths
def _check_no_legacy_fields(
    findings: list[dict[str, str]],
    value: Any,
    label: str,
) -> None:
    """Record one finding when *value* references any legacy field.

    The message lists at most the first five offending paths.
    """
    offending = assert_no_v4_legacy_fields(value, label)
    if not offending:
        return
    preview = ", ".join(offending[:5])
    _fail(findings, "v4_legacy_field_present", f"{label} contains legacy fields: {preview}")
def _collect_legacy_paths(value: Any, prefix: str, paths: list[str]) -> None:
    """Walk *value* depth-first and append the path of every legacy reference to *paths*.

    Object keys are matched against the blocklist directly; string values are
    matched segment-wise via ``_string_has_legacy_field``. Values of any other
    type are ignored.
    """
    if isinstance(value, str):
        if _string_has_legacy_field(value):
            paths.append(prefix)
        return

    if isinstance(value, list):
        for position, element in enumerate(value):
            _collect_legacy_paths(element, f"{prefix}[{position}]", paths)
        return

    if not isinstance(value, dict):
        return

    for name, nested in value.items():
        nested_path = f"{prefix}.{name}"
        if name in LEGACY_FIELD_BLOCKLIST:
            paths.append(nested_path)
        # Descend even when the key itself matched, so nested hits are reported too.
        _collect_legacy_paths(nested, nested_path, paths)
def _string_has_legacy_field(value: str) -> bool:
    """Return True when any path segment of *value* is a blocklisted field name.

    ``[``, ``]`` and ``/`` are treated like ``.`` separators, and each segment
    is stripped of surrounding whitespace before the comparison.
    """
    dotted = value.translate(str.maketrans("[]/", "..."))
    for segment in dotted.split("."):
        if segment.strip() in LEGACY_FIELD_BLOCKLIST:
            return True
    return False
def _load_json(path: Path, findings: list[dict[str, str]]) -> dict[str, Any] | None:
    """Read *path* as UTF-8 JSON and return its root object.

    Args:
        path: File to load.
        findings: Collector that receives a finding when loading fails.

    Returns:
        The parsed JSON object, or ``None`` when the file is missing, cannot be
        decoded/parsed, or its root is not a JSON object. Every ``None`` result
        is accompanied by a finding.
    """
    # Messages show the path relative to ROOT when possible. relative_to()
    # raises ValueError for paths outside ROOT (e.g. a caller-supplied root),
    # which must not escape from the error reporting itself.
    try:
        shown = path.relative_to(ROOT)
    except ValueError:
        shown = path

    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        _fail(findings, "file_missing", f"missing file: {shown}")
        return None
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # Invalid UTF-8 is reported like any other unparsable content.
        _fail(findings, "json_parse_failed", f"{shown} cannot parse: {exc}")
        return None

    # Callers use dict methods on the result, so any other root type is a
    # contract violation rather than something to hand back.
    if not isinstance(payload, dict):
        _fail(findings, "json_root_invalid", f"{shown} root must be a JSON object")
        return None
    return payload
def _check_value(
    findings: list[dict[str, str]],
    check_id: str,
    actual: Any,
    expected: Any,
    *,
    label: str = "",
) -> None:
    """Record a finding under *check_id* unless *actual* equals *expected*.

    A non-empty *label* is placed in front of the message, separated by a space.
    """
    if actual == expected:
        return
    if label:
        prefix = f"{label} "
    else:
        prefix = ""
    _fail(findings, check_id, f"{prefix}expected {expected}, got {actual}")
def _check_count(
    findings: list[dict[str, str]],
    check_id: str,
    label: str,
    value: Any,
    expected: int,
) -> None:
    """Record a finding unless *value* is a list holding exactly *expected* items.

    For non-list values the reported actual count is ``None``.
    """
    size = len(value) if isinstance(value, list) else None
    if size == expected:
        return
    _fail(findings, check_id, f"{label} expected {expected}, got {size}")
def _check_m2_platform_profile(
    findings: list[dict[str, str]],
    profile: dict[str, Any],
    label: str,
) -> None:
    """Run the M2 checks on one platform profile.

    Validates the status of every edge and endpoint entry, then delegates to
    the observable-field contract check and the platform-specific checks.

    Args:
        findings: Collector that receives one finding per violation.
        profile: Parsed platform profile document.
        label: Name used to prefix messages (the caller passes the file name).
    """
    platform = str(profile.get("platform") or "")

    edges = profile.get("edges")
    if not isinstance(edges, dict):
        _fail(findings, "platform_profile_edges_invalid", f"{label} edges must be an object")
    else:
        for edge_id, edge in edges.items():
            if not isinstance(edge, dict):
                _fail(findings, "platform_profile_edge_invalid", f"{label}.{edge_id} must be object")
                continue
            status = edge.get("status")
            # Type test first: a list/object status is unhashable and would
            # otherwise raise TypeError in the set membership test.
            if not isinstance(status, str) or status not in PROFILE_EDGE_STATUSES:
                _fail(
                    findings,
                    "platform_profile_edge_status_unknown",
                    f"{label}.{edge_id} status must be supported/blocked, got {status}",
                )

    endpoints = profile.get("endpoints")
    if not isinstance(endpoints, dict):
        _fail(findings, "platform_profile_endpoints_invalid", f"{label} endpoints must be an object")
    else:
        for endpoint_id, endpoint in endpoints.items():
            if not isinstance(endpoint, dict):
                _fail(
                    findings,
                    "platform_profile_endpoint_invalid",
                    f"{label}.{endpoint_id} must be object",
                )
                continue
            status = endpoint.get("status")
            if status is None:
                # An endpoint without a status is accepted here.
                continue
            if not isinstance(status, str) or status not in PROFILE_ENDPOINT_STATUSES:
                _fail(
                    findings,
                    "platform_profile_endpoint_status_unknown",
                    f"{label}.{endpoint_id} endpoint status must be stable enum, got {status}",
                )

    _check_observable_contract(findings, profile, label)
    _check_m2_platform_specifics(findings, profile, platform, label)
def _check_observable_contract(
    findings: list[dict[str, str]],
    profile: dict[str, Any],
    label: str,
) -> None:
    """Check that a profile classifies every observable field exactly once.

    ``observable_fields`` must be a non-empty list of objects whose ``field``
    is known and whose ``availability`` is ``"supported"``.
    ``missing_observable_fields`` must be a list of objects with a known
    ``field`` and a known ``missing_type``. No field may appear in both lists,
    and every entry of ``OBSERVABLE_FIELDS`` must appear in one of them.

    Args:
        findings: Collector that receives one finding per violation.
        profile: Parsed platform profile document.
        label: Name used to prefix messages.
    """
    observable_fields = profile.get("observable_fields")
    missing_fields = profile.get("missing_observable_fields")

    # Only string field names are tracked for the overlap/coverage checks:
    # None or unhashable values are reported as unknown below, and keeping
    # them out of the sets avoids TypeError from set.add() and from sorting a
    # mix of None and str.
    seen_observable: set[str] = set()
    if not isinstance(observable_fields, list) or not observable_fields:
        _fail(findings, "observable_fields_invalid", f"{label} observable_fields must be non-empty list")
    else:
        for item in observable_fields:
            if not isinstance(item, dict):
                _fail(findings, "observable_field_invalid", f"{label} observable field must be object")
                continue
            field = item.get("field")
            if isinstance(field, str):
                seen_observable.add(field)
            if not isinstance(field, str) or field not in OBSERVABLE_FIELDS:
                _fail(
                    findings,
                    "observable_field_unknown",
                    f"{label} observable field unknown: {field}",
                )
            if item.get("availability") != "supported":
                _fail(
                    findings,
                    "observable_field_availability_invalid",
                    f"{label} {field} availability must be supported",
                )

    if not isinstance(missing_fields, list):
        _fail(
            findings,
            "missing_observable_fields_invalid",
            f"{label} missing_observable_fields must be a list",
        )
        # Without a usable missing list the overlap/coverage checks are skipped.
        return

    seen_missing: set[str] = set()
    for item in missing_fields:
        if not isinstance(item, dict):
            _fail(findings, "missing_observable_field_invalid", f"{label} missing field must be object")
            continue
        field = item.get("field")
        if isinstance(field, str):
            seen_missing.add(field)
        if not isinstance(field, str) or field not in OBSERVABLE_FIELDS:
            _fail(findings, "missing_observable_field_unknown", f"{label} missing field unknown: {field}")
        missing_type = item.get("missing_type")
        if not isinstance(missing_type, str) or missing_type not in MISSING_OBSERVABLE_TYPES:
            _fail(
                findings,
                "missing_observable_type_unknown",
                f"{label} {field} missing_type must be natural_platform_missing/runtime_missing",
            )

    overlap = sorted(seen_observable & seen_missing)
    if overlap:
        _fail(
            findings,
            "observable_field_conflict",
            f"{label} fields cannot be both observable and missing: {overlap}",
        )

    uncovered = sorted(OBSERVABLE_FIELDS - seen_observable - seen_missing)
    if uncovered:
        _fail(
            findings,
            "observable_field_uncovered",
            f"{label} observable contract missing fields: {uncovered}",
        )
def _check_m2_platform_specifics(
    findings: list[dict[str, str]],
    profile: dict[str, Any],
    platform: str,
    label: str,
) -> None:
    """Check the entries that must be ``blocked`` for kuaishou and shipinhao.

    Other platforms have no specific requirements here. A non-object ``edges``
    or ``endpoints`` section is treated as empty, so each required entry is
    then reported as a status mismatch.
    """
    raw_edges = profile.get("edges")
    raw_endpoints = profile.get("endpoints")
    edges = raw_edges if isinstance(raw_edges, dict) else {}
    endpoints = raw_endpoints if isinstance(raw_endpoints, dict) else {}

    # platform -> ordered (section, key) pairs whose status must be "blocked".
    must_be_blocked = {
        "kuaishou": [
            (edges, "query_next_page"),
            (edges, "author_to_works"),
            (edges, "author_work_to_content"),
        ],
        "shipinhao": [
            (endpoints, "account_info"),
            (edges, "author_to_works"),
            (edges, "author_work_to_content"),
        ],
    }
    for section, key in must_be_blocked.get(platform, []):
        _check_nested_status(findings, label, section, key, "blocked")
def _check_v4_walk_gate_config(
    findings: list[dict[str, str]],
    gate: Any,
    label: str,
) -> None:
    """Validate one ``v4_walk_gate`` configuration object.

    Requires ``requires_allow_walk`` to be exactly ``True``, fixed values for
    ``source_field`` and ``deny_reason_code``, ``applies_to_edges`` equal to
    ``M4_WALK_GATE_EDGES`` and ``raw_payload_fields`` containing at least
    ``M4_WALK_GATE_RAW_FIELDS``.

    Args:
        findings: Collector that receives one finding per violation.
        gate: The gate configuration; anything but a dict is reported and skipped.
        label: Dotted location used to prefix messages.
    """
    if not isinstance(gate, dict):
        _fail(findings, "v4_walk_gate_invalid", f"{label} must be an object")
        return

    if gate.get("requires_allow_walk") is not True:
        _fail(findings, "v4_walk_gate_requires_allow_walk", f"{label}.requires_allow_walk must be true")
    if gate.get("source_field") != "rule_decisions.jsonl[].decision_replay_data.allow_walk":
        _fail(findings, "v4_walk_gate_source_field", f"{label}.source_field is invalid")
    if gate.get("deny_reason_code") != "v4_allow_walk_denied":
        _fail(findings, "v4_walk_gate_deny_reason", f"{label}.deny_reason_code is invalid")

    # Both collections are validated as lists of strings before being turned
    # into sets: set() on a non-iterable or on unhashable items raises
    # TypeError, and set() on an object would silently use its keys.
    applies_to = gate.get("applies_to_edges") or []
    edges_match = (
        isinstance(applies_to, list)
        and all(isinstance(edge, str) for edge in applies_to)
        and set(applies_to) == M4_WALK_GATE_EDGES
    )
    if not edges_match:
        _fail(findings, "v4_walk_gate_edges", f"{label}.applies_to_edges must cover M4 expansion edges")

    raw_payload = gate.get("raw_payload_fields") or []
    if isinstance(raw_payload, list):
        raw_fields = {name for name in raw_payload if isinstance(name, str)}
    else:
        raw_fields = set()
    if not M4_WALK_GATE_RAW_FIELDS <= raw_fields:
        _fail(findings, "v4_walk_gate_raw_fields", f"{label}.raw_payload_fields missing required fields")
def _check_v4_walk_strategy_contract(
    findings: list[dict[str, str]],
    strategy: dict[str, Any],
) -> None:
    """Check that the walk strategy defines a valid ``allow_walk_required`` gate.

    ``v4_walk_gate`` must be a non-empty list; the object row whose ``gate_id``
    is ``allow_walk_required`` is then validated as a gate configuration.
    """
    rows = strategy.get("v4_walk_gate")
    if not (isinstance(rows, list) and rows):
        _fail(findings, "v4_walk_strategy_gate_missing", "douyin_walk_strategy.v1.json missing v4_walk_gate")
        return

    # Index object rows by gate_id; when ids repeat, the last row wins.
    gates_by_id = {}
    for row in rows:
        if isinstance(row, dict):
            gates_by_id[row.get("gate_id")] = row

    required_gate = gates_by_id.get("allow_walk_required")
    if not required_gate:
        _fail(findings, "v4_walk_strategy_gate_missing", "allow_walk_required gate missing")
        return

    _check_v4_walk_gate_config(
        findings,
        required_gate,
        "douyin_walk_strategy.v4_walk_gate.allow_walk_required",
    )
def _check_nested_status(
    findings: list[dict[str, str]],
    label: str,
    section: dict[str, Any],
    key: str,
    expected: str,
) -> None:
    """Record a finding unless ``section[key]["status"]`` equals *expected*.

    A missing or non-object entry is reported with an actual status of ``None``.
    """
    entry = section.get(key)
    actual = None
    if isinstance(entry, dict):
        actual = entry.get("status")
    if actual == expected:
        return
    _fail(
        findings,
        "platform_profile_status_mismatch",
        f"{label}.{key} expected status {expected}, got {actual}",
    )
def _check_v4_rule_pack_contract(
    findings: list[dict[str, str]],
    pkg: dict[str, Any],
) -> None:
    """Validate the V4 parts of a rule pack package.

    Enabled V4 dispatch rows must use ``rule_pack_version`` ``4.0.0``. A rule
    pack is treated as V4 when the package is bound to strategy ``V4``, the
    pack version is ``4.0.0`` or its scorecard schema is ``v4_scorecard.v1``.
    V4 packs must be free of legacy fields, use the V4 scorecard schema, have
    exactly the active dimensions ``query_relevance`` and
    ``platform_performance`` (in that order) and list the required input
    fields. Non-V4 packs are skipped.

    Malformed structure (non-list collections, non-object rows) is reported as
    a finding instead of raising.

    Args:
        findings: Collector that receives one finding per violation.
        pkg: Parsed rule pack package document.
    """
    binding = pkg.get("strategy_binding")
    strategy_version = binding.get("strategy_version") if isinstance(binding, dict) else None

    dispatches = _rule_pack_rows(
        findings, pkg.get("rule_pack_dispatch"), "v4_rule_pack_dispatch_invalid", "rule_pack_dispatch"
    )
    for dispatch in dispatches:
        is_enabled_v4 = dispatch.get("dispatch_enabled") and dispatch.get("strategy_version") == "V4"
        if is_enabled_v4 and dispatch.get("rule_pack_version") != "4.0.0":
            _fail(
                findings,
                "v4_rule_pack_version_invalid",
                f"{dispatch.get('dispatch_id')} V4 dispatch must use rule_pack_version 4.0.0",
            )

    for pack in _rule_pack_rows(findings, pkg.get("rule_packs"), "v4_rule_pack_invalid", "rule_packs"):
        scorecard = pack.get("scorecard")
        if not isinstance(scorecard, dict):
            # A missing or malformed scorecard behaves like an empty one; for
            # V4 packs the schema check below then reports it.
            scorecard = {}
        is_v4 = (
            strategy_version == "V4"
            or pack.get("version") == "4.0.0"
            or scorecard.get("schema_version") == "v4_scorecard.v1"
        )
        if not is_v4:
            continue

        pack_id = pack.get("rule_pack_id")
        _check_no_legacy_fields(findings, pack, f"rule_pack:{pack_id}")
        if scorecard.get("schema_version") != "v4_scorecard.v1":
            _fail(
                findings,
                "v4_scorecard_schema_invalid",
                f"{pack_id} scorecard.schema_version must be v4_scorecard.v1",
            )

        dimensions = scorecard.get("dimensions")
        active_keys = [
            row.get("key")
            for row in (dimensions if isinstance(dimensions, list) else [])
            if isinstance(row, dict) and row.get("runtime_status") == "active"
        ]
        # Order matters: the comparison is against an ordered list.
        if active_keys != ["query_relevance", "platform_performance"]:
            _fail(
                findings,
                "v4_scorecard_dimensions_invalid",
                f"{pack_id} active dimensions must be query_relevance/platform_performance",
            )

        contract = pack.get("input_contract")
        declared = contract.get("required_fields") if isinstance(contract, dict) else None
        # Keep only string entries so unhashable items cannot break set construction.
        required_fields = (
            {name for name in declared if isinstance(name, str)} if isinstance(declared, list) else set()
        )
        for field in (
            "pattern_match_result.query_relevance_score",
            "content_engagement_metrics.platform_performance.platform_performance_score",
            "content_engagement_metrics.platform_performance.missing_observable_fields",
        ):
            if field not in required_fields:
                _fail(
                    findings,
                    "v4_rule_pack_required_field_missing",
                    f"{pack_id} missing required field {field}",
                )


def _rule_pack_rows(
    findings: list[dict[str, str]],
    value: Any,
    check_id: str,
    label: str,
) -> list[dict[str, Any]]:
    """Return the object rows of *value*, recording a finding for malformed data.

    ``None`` (key absent or JSON null) yields an empty list without a finding.
    """
    if value is None:
        return []
    if not isinstance(value, list):
        _fail(findings, check_id, f"{label} must be a list")
        return []
    rows: list[dict[str, Any]] = []
    for index, row in enumerate(value):
        if isinstance(row, dict):
            rows.append(row)
        else:
            _fail(findings, check_id, f"{label}[{index}] must be an object")
    return rows
- def _fail(findings: list[dict[str, str]], check_id: str, message: str) -> None:
- findings.append({"level": "fail", "check_id": check_id, "message": message})
if __name__ == "__main__":
    # Use the validation result as the process exit status (0 = pass, 1 = fail).
    exit_code = main()
    sys.exit(exit_code)
|