"""Validate V4 M0 config contract without switching V3 production rule packs.""" from __future__ import annotations import json import sys from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] DATA_DIR = ROOT / "tech_documents/数据接口与来源" RULE_PACK_PATH = ROOT / "product_documents/规则包/douyin_rule_packs.v1.json" WALK_STRATEGY_PATH = ROOT / "product_documents/抖音游走策略/douyin_walk_strategy.v1.json" LEGACY_FIELD_BLOCKLIST = { "fit_senior_50plus", "fit_confidence", "relevance_score", "platform_heat", "age_50_plus_level", } ENDPOINT_STATUSES = { "verified", "verified_unstable", "blocked", "source_only", "missing", } M2_PLATFORM_PROFILES = {"douyin", "kuaishou", "shipinhao"} PROFILE_EDGE_STATUSES = {"supported", "blocked"} PROFILE_ENDPOINT_STATUSES = {"verified", "verified_unstable", "blocked", "source_only", "missing"} OBSERVABLE_FIELDS = { "statistics.digg_count", "statistics.comment_count", "statistics.share_count", "statistics.collect_count", "statistics.play_count", } MISSING_OBSERVABLE_TYPES = {"natural_platform_missing", "runtime_missing"} M4_WALK_GATE_EDGES = {"hashtag_to_query", "author_to_works"} M4_WALK_GATE_RAW_FIELDS = {"decision_id", "allow_walk", "allow_walk_reason", "walk_gate_snapshot"} def main() -> int: findings = validate_v4_config_contract(ROOT) payload = {"status": "fail" if findings else "pass", "findings": findings} print(json.dumps(payload, ensure_ascii=False, indent=2)) return 1 if findings else 0 def validate_v4_config_contract(root: Path = ROOT) -> list[dict[str, str]]: data_dir = root / "tech_documents/数据接口与来源" findings: list[dict[str, str]] = [] walk_graph = _load_json(data_dir / "walk_graph.json", findings) if walk_graph: _check_no_legacy_fields(findings, walk_graph, "walk_graph.json") _check_value(findings, "walk_graph_schema", walk_graph.get("schema_version"), "walk_graph.v2") _check_count(findings, "walk_graph_nodes", "walk_graph.nodes", walk_graph.get("nodes"), 8) _check_count(findings, "walk_graph_edges", 
"walk_graph.edges", walk_graph.get("edges"), 9) walk_policy = _load_json(data_dir / "walk_policy.json", findings) if walk_policy: _check_no_legacy_fields(findings, walk_policy, "walk_policy.json") _check_value(findings, "walk_policy_schema", walk_policy.get("schema_version"), "walk_policy.v1") for key in ["global", "edge_budgets", "dedup", "edge_permissions", "v4_walk_gate"]: if key not in walk_policy: _fail(findings, "walk_policy_missing_key", f"walk_policy.json missing {key}") _check_v4_walk_gate_config(findings, walk_policy.get("v4_walk_gate"), "walk_policy.v4_walk_gate") if walk_graph: graph_edges = { edge.get("edge_id") for edge in walk_graph.get("edges", []) if isinstance(edge, dict) } missing = sorted(M4_WALK_GATE_EDGES - graph_edges) if missing: _fail(findings, "v4_walk_gate_graph_edges_missing", f"walk_graph missing M4 gate edges: {missing}") endpoint_registry = _load_json(data_dir / "crawler_endpoints.registry.json", findings) if endpoint_registry: _check_no_legacy_fields(findings, endpoint_registry, "crawler_endpoints.registry.json") _check_value( findings, "crawler_endpoints_schema", endpoint_registry.get("registry_version"), "crawler_endpoints.v1", ) endpoints = endpoint_registry.get("endpoints") if not isinstance(endpoints, list): _fail(findings, "crawler_endpoints_invalid", "endpoints must be a list") else: _check_count(findings, "crawler_endpoints_count", "endpoints", endpoints, 26) for endpoint in endpoints: if not isinstance(endpoint, dict): _fail(findings, "crawler_endpoint_invalid", "endpoint row must be object") continue for key in [ "platform", "source_id", "status", "table_or_endpoint", "input_fields", "output_fields", ]: if key not in endpoint: _fail( findings, "crawler_endpoint_missing_key", f"{endpoint.get('source_id')} missing {key}", ) status = endpoint.get("status") if not isinstance(status, (str, list)): _fail( findings, "crawler_endpoint_status_invalid", f"{endpoint.get('source_id')} status must be string or list", ) continue statuses 
= status if isinstance(status, list) else [status] invalid_statuses = [item for item in statuses if item not in ENDPOINT_STATUSES] if invalid_statuses: _fail( findings, "crawler_endpoint_status_unknown", f"{endpoint.get('source_id')} unknown status: {invalid_statuses}", ) field_map = _load_json(data_dir / "跨平台字段映射.json", findings) if field_map: _check_no_legacy_fields(findings, field_map, "跨平台字段映射.json") _check_value( findings, "field_map_schema", field_map.get("schema_version"), "cross_platform_field_map.v1", ) if not isinstance(field_map.get("mappings"), dict): _fail(findings, "field_map_mappings_invalid", "mappings must be an object") for profile_path in sorted((data_dir / "platform_profiles").glob("*.json")): profile = _load_json(profile_path, findings) if not profile: continue _check_no_legacy_fields(findings, profile, profile_path.name) _check_value( findings, "platform_profile_schema", profile.get("schema_version"), "platform_profile.v1", label=profile_path.name, ) for key in ["platform", "status", "runtime", "endpoints", "edges"]: if key not in profile: _fail(findings, "platform_profile_missing_key", f"{profile_path.name} missing {key}") if profile.get("platform") in M2_PLATFORM_PROFILES: _check_m2_platform_profile(findings, profile, profile_path.name) rule_pack_pkg = _load_json(RULE_PACK_PATH, findings) if rule_pack_pkg: _check_v4_rule_pack_contract(findings, rule_pack_pkg) walk_strategy = _load_json(WALK_STRATEGY_PATH, findings) if walk_strategy: _check_no_legacy_fields(findings, walk_strategy, "douyin_walk_strategy.v1.json") _check_v4_walk_strategy_contract(findings, walk_strategy) return findings def assert_no_v4_legacy_fields(value: Any, label: str = "v4_contract") -> list[str]: paths: list[str] = [] _collect_legacy_paths(value, label, paths) return paths def _check_no_legacy_fields( findings: list[dict[str, str]], value: Any, label: str, ) -> None: paths = assert_no_v4_legacy_fields(value, label) if paths: _fail( findings, "v4_legacy_field_present", 
f"{label} contains legacy fields: {', '.join(paths[:5])}", ) def _collect_legacy_paths(value: Any, prefix: str, paths: list[str]) -> None: if isinstance(value, dict): for key, child in value.items(): child_path = f"{prefix}.{key}" if key in LEGACY_FIELD_BLOCKLIST: paths.append(child_path) _collect_legacy_paths(child, child_path, paths) elif isinstance(value, list): for index, child in enumerate(value): _collect_legacy_paths(child, f"{prefix}[{index}]", paths) elif isinstance(value, str): if _string_has_legacy_field(value): paths.append(prefix) def _string_has_legacy_field(value: str) -> bool: normalized = value.replace("[", ".").replace("]", ".").replace("/", ".") parts = [part.strip() for part in normalized.split(".")] return any(part in LEGACY_FIELD_BLOCKLIST for part in parts) def _load_json(path: Path, findings: list[dict[str, str]]) -> dict[str, Any] | None: try: return json.loads(path.read_text(encoding="utf-8")) except FileNotFoundError: _fail(findings, "file_missing", f"missing file: {path.relative_to(ROOT)}") except json.JSONDecodeError as exc: _fail(findings, "json_parse_failed", f"{path.relative_to(ROOT)} cannot parse: {exc}") return None def _check_value( findings: list[dict[str, str]], check_id: str, actual: Any, expected: Any, *, label: str = "", ) -> None: if actual != expected: target = f"{label} " if label else "" _fail(findings, check_id, f"{target}expected {expected}, got {actual}") def _check_count( findings: list[dict[str, str]], check_id: str, label: str, value: Any, expected: int, ) -> None: if not isinstance(value, list) or len(value) != expected: actual = len(value) if isinstance(value, list) else None _fail(findings, check_id, f"{label} expected {expected}, got {actual}") def _check_m2_platform_profile( findings: list[dict[str, str]], profile: dict[str, Any], label: str, ) -> None: platform = str(profile.get("platform") or "") edges = profile.get("edges") if not isinstance(edges, dict): _fail(findings, "platform_profile_edges_invalid", 
f"{label} edges must be an object") else: for edge_id, edge in edges.items(): if not isinstance(edge, dict): _fail(findings, "platform_profile_edge_invalid", f"{label}.{edge_id} must be object") continue status = edge.get("status") if status not in PROFILE_EDGE_STATUSES: _fail( findings, "platform_profile_edge_status_unknown", f"{label}.{edge_id} status must be supported/blocked, got {status}", ) endpoints = profile.get("endpoints") if not isinstance(endpoints, dict): _fail(findings, "platform_profile_endpoints_invalid", f"{label} endpoints must be an object") else: for endpoint_id, endpoint in endpoints.items(): if not isinstance(endpoint, dict): _fail( findings, "platform_profile_endpoint_invalid", f"{label}.{endpoint_id} must be object", ) continue status = endpoint.get("status") if status is not None and status not in PROFILE_ENDPOINT_STATUSES: _fail( findings, "platform_profile_endpoint_status_unknown", f"{label}.{endpoint_id} endpoint status must be stable enum, got {status}", ) _check_observable_contract(findings, profile, label) _check_m2_platform_specifics(findings, profile, platform, label) def _check_observable_contract( findings: list[dict[str, str]], profile: dict[str, Any], label: str, ) -> None: observable_fields = profile.get("observable_fields") missing_fields = profile.get("missing_observable_fields") if not isinstance(observable_fields, list) or not observable_fields: _fail(findings, "observable_fields_invalid", f"{label} observable_fields must be non-empty list") else: for item in observable_fields: if not isinstance(item, dict): _fail(findings, "observable_field_invalid", f"{label} observable field must be object") continue field = item.get("field") if field not in OBSERVABLE_FIELDS: _fail( findings, "observable_field_unknown", f"{label} observable field unknown: {field}", ) if item.get("availability") != "supported": _fail( findings, "observable_field_availability_invalid", f"{label} {field} availability must be supported", ) if not 
isinstance(missing_fields, list): _fail( findings, "missing_observable_fields_invalid", f"{label} missing_observable_fields must be a list", ) return seen_observable = { item.get("field") for item in observable_fields if isinstance(item, dict) } if isinstance(observable_fields, list) else set() seen_missing = set() for item in missing_fields: if not isinstance(item, dict): _fail(findings, "missing_observable_field_invalid", f"{label} missing field must be object") continue field = item.get("field") seen_missing.add(field) if field not in OBSERVABLE_FIELDS: _fail(findings, "missing_observable_field_unknown", f"{label} missing field unknown: {field}") missing_type = item.get("missing_type") if missing_type not in MISSING_OBSERVABLE_TYPES: _fail( findings, "missing_observable_type_unknown", f"{label} {field} missing_type must be natural_platform_missing/runtime_missing", ) overlap = sorted(seen_observable & seen_missing) if overlap: _fail( findings, "observable_field_conflict", f"{label} fields cannot be both observable and missing: {overlap}", ) uncovered = sorted(OBSERVABLE_FIELDS - seen_observable - seen_missing) if uncovered: _fail( findings, "observable_field_uncovered", f"{label} observable contract missing fields: {uncovered}", ) def _check_m2_platform_specifics( findings: list[dict[str, str]], profile: dict[str, Any], platform: str, label: str, ) -> None: edges = profile.get("edges") if isinstance(profile.get("edges"), dict) else {} endpoints = profile.get("endpoints") if isinstance(profile.get("endpoints"), dict) else {} if platform == "kuaishou": _check_nested_status(findings, label, edges, "author_to_works", "blocked") _check_nested_status(findings, label, edges, "author_work_to_content", "blocked") if platform == "shipinhao": _check_nested_status(findings, label, endpoints, "account_info", "blocked") _check_nested_status(findings, label, edges, "author_to_works", "blocked") _check_nested_status(findings, label, edges, "author_work_to_content", "blocked") 
def _check_v4_walk_gate_config(
    findings: list[dict[str, str]],
    gate: Any,
    label: str,
) -> None:
    """Validate one ``v4_walk_gate`` config object and record a finding per violation.

    Args:
        findings: List that receives the findings.
        gate: Parsed gate value; anything other than a dict is reported.
        label: Prefix used in the finding messages.
    """
    if not isinstance(gate, dict):
        _fail(findings, "v4_walk_gate_invalid", f"{label} must be an object")
        return
    if gate.get("requires_allow_walk") is not True:
        _fail(findings, "v4_walk_gate_requires_allow_walk", f"{label}.requires_allow_walk must be true")
    if gate.get("source_field") != "rule_decisions.jsonl[].decision_replay_data.allow_walk":
        _fail(findings, "v4_walk_gate_source_field", f"{label}.source_field is invalid")
    if gate.get("deny_reason_code") != "v4_allow_walk_denied":
        _fail(findings, "v4_walk_gate_deny_reason", f"{label}.deny_reason_code is invalid")

    # _as_set yields None for values that cannot form a set (e.g. a number or
    # a list of objects); None never equals the expected set, so it is reported.
    if _as_set(gate.get("applies_to_edges")) != M4_WALK_GATE_EDGES:
        _fail(findings, "v4_walk_gate_edges", f"{label}.applies_to_edges must cover M4 expansion edges")
    raw_fields = _as_set(gate.get("raw_payload_fields"))
    if raw_fields is None or not M4_WALK_GATE_RAW_FIELDS <= raw_fields:
        _fail(findings, "v4_walk_gate_raw_fields", f"{label}.raw_payload_fields missing required fields")


def _check_v4_walk_strategy_contract(
    findings: list[dict[str, str]],
    strategy: dict[str, Any],
) -> None:
    """Require an ``allow_walk_required`` row in ``strategy["v4_walk_gate"]`` and validate it."""
    rows = strategy.get("v4_walk_gate")
    if not isinstance(rows, list) or not rows:
        _fail(findings, "v4_walk_strategy_gate_missing", "douyin_walk_strategy.v1.json missing v4_walk_gate")
        return
    gate = None
    for row in rows:
        # Comparing by equality (rather than using gate_id as a dict key)
        # tolerates unhashable gate_id values; the last matching row wins.
        if isinstance(row, dict) and row.get("gate_id") == "allow_walk_required":
            gate = row
    if not gate:
        _fail(findings, "v4_walk_strategy_gate_missing", "allow_walk_required gate missing")
        return
    _check_v4_walk_gate_config(findings, gate, "douyin_walk_strategy.v4_walk_gate.allow_walk_required")


def _check_nested_status(
    findings: list[dict[str, str]],
    label: str,
    section: dict[str, Any],
    key: str,
    expected: str,
) -> None:
    """Record a finding unless ``section[key]["status"]`` equals ``expected``.

    A missing key or a non-dict entry is treated as status ``None``.
    """
    value = section.get(key)
    actual = value.get("status") if isinstance(value, dict) else None
    if actual != expected:
        _fail(
            findings,
            "platform_profile_status_mismatch",
            f"{label}.{key} expected status {expected}, got {actual}",
        )


def _as_set(value: Any) -> set[Any] | None:
    """Return ``set(value or [])``, or ``None`` when the value cannot form a set."""
    try:
        return set(value or [])
    except TypeError:
        # Non-iterable value or unhashable entries in a malformed config.
        return None


def _check_v4_rule_pack_contract(
    findings: list[dict[str, str]],
    pkg: dict[str, Any],
) -> None:
    """Validate the V4 parts of a rule-pack package and record findings.

    Enabled V4 dispatch rows must use rule_pack_version ``4.0.0``. A pack is
    treated as V4 when the package binding says ``V4``, the pack version is
    ``4.0.0``, or its scorecard schema is ``v4_scorecard.v1``. V4 packs must
    be free of legacy fields, use the V4 scorecard schema, have exactly the
    active dimensions query_relevance/platform_performance (in that order),
    and list the required input fields.

    Malformed rows (non-objects) are reported as findings rather than raising.
    """
    binding = pkg.get("strategy_binding")
    strategy_version = binding.get("strategy_version") if isinstance(binding, dict) else None

    dispatch_rows = _object_rows(
        findings, pkg.get("rule_pack_dispatch"), "v4_rule_pack_dispatch_invalid", "rule_pack_dispatch"
    )
    for dispatch in dispatch_rows:
        if dispatch.get("dispatch_enabled") and dispatch.get("strategy_version") == "V4":
            if dispatch.get("rule_pack_version") != "4.0.0":
                _fail(
                    findings,
                    "v4_rule_pack_version_invalid",
                    f"{dispatch.get('dispatch_id')} V4 dispatch must use rule_pack_version 4.0.0",
                )

    for pack in _object_rows(findings, pkg.get("rule_packs"), "v4_rule_pack_invalid", "rule_packs"):
        scorecard = pack.get("scorecard")
        if not isinstance(scorecard, dict):
            # Treated as an empty scorecard; a V4 pack then fails the schema check below.
            scorecard = {}
        is_v4 = (
            strategy_version == "V4"
            or pack.get("version") == "4.0.0"
            or scorecard.get("schema_version") == "v4_scorecard.v1"
        )
        if not is_v4:
            continue
        _check_no_legacy_fields(findings, pack, f"rule_pack:{pack.get('rule_pack_id')}")
        if scorecard.get("schema_version") != "v4_scorecard.v1":
            _fail(
                findings,
                "v4_scorecard_schema_invalid",
                f"{pack.get('rule_pack_id')} scorecard.schema_version must be v4_scorecard.v1",
            )
        dimension_rows = _object_rows(
            findings,
            scorecard.get("dimensions"),
            "v4_scorecard_dimension_invalid",
            f"{pack.get('rule_pack_id')} scorecard.dimensions",
        )
        keys = [row.get("key") for row in dimension_rows if row.get("runtime_status") == "active"]
        if keys != ["query_relevance", "platform_performance"]:
            _fail(
                findings,
                "v4_scorecard_dimensions_invalid",
                f"{pack.get('rule_pack_id')} active dimensions must be query_relevance/platform_performance",
            )
        input_contract = pack.get("input_contract")
        required_fields = _string_members(
            input_contract.get("required_fields") if isinstance(input_contract, dict) else None
        )
        for field in [
            "pattern_match_result.query_relevance_score",
            "content_engagement_metrics.platform_performance.platform_performance_score",
            "content_engagement_metrics.platform_performance.missing_observable_fields",
        ]:
            if field not in required_fields:
                _fail(
                    findings,
                    "v4_rule_pack_required_field_missing",
                    f"{pack.get('rule_pack_id')} missing required field {field}",
                )


def _object_rows(
    findings: list[dict[str, str]],
    value: Any,
    check_id: str,
    label: str,
) -> list[dict[str, Any]]:
    """Return the dict rows of ``value``, reporting anything malformed under ``check_id``.

    Falsy values (missing key, null, empty container) yield no rows and no finding.
    """
    if not value:
        return []
    if not isinstance(value, list):
        _fail(findings, check_id, f"{label} must be a list")
        return []
    rows: list[dict[str, Any]] = []
    for index, row in enumerate(value):
        if isinstance(row, dict):
            rows.append(row)
        else:
            _fail(findings, check_id, f"{label}[{index}] must be object")
    return rows


def _string_members(value: Any) -> set[str]:
    """Return the string members of an iterable JSON value, or an empty set."""
    if isinstance(value, (list, tuple, set, dict, str)):
        return {item for item in value if isinstance(item, str)}
    return set()


def _fail(findings: list[dict[str, str]], check_id: str, message: str) -> None:
    """Append a ``fail``-level finding with the given check id and message."""
    findings.append({"level": "fail", "check_id": check_id, "message": message})


if __name__ == "__main__":
    sys.exit(main())