validate_v4_config_contract.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. """Validate V4 M0 config contract without switching V3 production rule packs."""
  2. from __future__ import annotations
  3. import json
  4. import sys
  5. from pathlib import Path
  6. from typing import Any
  7. ROOT = Path(__file__).resolve().parents[1]
  8. DATA_DIR = ROOT / "tech_documents/数据接口与来源"
  9. RULE_PACK_PATH = ROOT / "product_documents/规则包/douyin_rule_packs.v1.json"
  10. WALK_STRATEGY_PATH = ROOT / "product_documents/抖音游走策略/douyin_walk_strategy.v1.json"
  11. LEGACY_FIELD_BLOCKLIST = {
  12. "fit_senior_50plus",
  13. "fit_confidence",
  14. "relevance_score",
  15. "platform_heat",
  16. "age_50_plus_level",
  17. }
  18. ENDPOINT_STATUSES = {
  19. "verified",
  20. "verified_unstable",
  21. "blocked",
  22. "source_only",
  23. "missing",
  24. }
  25. M2_PLATFORM_PROFILES = {"douyin", "kuaishou", "shipinhao"}
  26. PROFILE_EDGE_STATUSES = {"supported", "blocked"}
  27. PROFILE_ENDPOINT_STATUSES = {"verified", "verified_unstable", "blocked", "source_only", "missing"}
  28. OBSERVABLE_FIELDS = {
  29. "statistics.digg_count",
  30. "statistics.comment_count",
  31. "statistics.share_count",
  32. "statistics.collect_count",
  33. "statistics.play_count",
  34. }
  35. MISSING_OBSERVABLE_TYPES = {"natural_platform_missing", "runtime_missing"}
  36. M4_WALK_GATE_EDGES = {"hashtag_to_query", "author_to_works"}
  37. M4_WALK_GATE_RAW_FIELDS = {"decision_id", "allow_walk", "allow_walk_reason", "walk_gate_snapshot"}
  38. def main() -> int:
  39. findings = validate_v4_config_contract(ROOT)
  40. payload = {"status": "fail" if findings else "pass", "findings": findings}
  41. print(json.dumps(payload, ensure_ascii=False, indent=2))
  42. return 1 if findings else 0
  43. def validate_v4_config_contract(root: Path = ROOT) -> list[dict[str, str]]:
  44. data_dir = root / "tech_documents/数据接口与来源"
  45. findings: list[dict[str, str]] = []
  46. walk_graph = _load_json(data_dir / "walk_graph.json", findings)
  47. if walk_graph:
  48. _check_no_legacy_fields(findings, walk_graph, "walk_graph.json")
  49. _check_value(findings, "walk_graph_schema", walk_graph.get("schema_version"), "walk_graph.v2")
  50. _check_count(findings, "walk_graph_nodes", "walk_graph.nodes", walk_graph.get("nodes"), 8)
  51. _check_count(findings, "walk_graph_edges", "walk_graph.edges", walk_graph.get("edges"), 9)
  52. walk_policy = _load_json(data_dir / "walk_policy.json", findings)
  53. if walk_policy:
  54. _check_no_legacy_fields(findings, walk_policy, "walk_policy.json")
  55. _check_value(findings, "walk_policy_schema", walk_policy.get("schema_version"), "walk_policy.v1")
  56. for key in ["global", "edge_budgets", "dedup", "edge_permissions", "v4_walk_gate"]:
  57. if key not in walk_policy:
  58. _fail(findings, "walk_policy_missing_key", f"walk_policy.json missing {key}")
  59. _check_v4_walk_gate_config(findings, walk_policy.get("v4_walk_gate"), "walk_policy.v4_walk_gate")
  60. if walk_graph:
  61. graph_edges = {
  62. edge.get("edge_id")
  63. for edge in walk_graph.get("edges", [])
  64. if isinstance(edge, dict)
  65. }
  66. missing = sorted(M4_WALK_GATE_EDGES - graph_edges)
  67. if missing:
  68. _fail(findings, "v4_walk_gate_graph_edges_missing", f"walk_graph missing M4 gate edges: {missing}")
  69. endpoint_registry = _load_json(data_dir / "crawler_endpoints.registry.json", findings)
  70. if endpoint_registry:
  71. _check_no_legacy_fields(findings, endpoint_registry, "crawler_endpoints.registry.json")
  72. _check_value(
  73. findings,
  74. "crawler_endpoints_schema",
  75. endpoint_registry.get("registry_version"),
  76. "crawler_endpoints.v1",
  77. )
  78. endpoints = endpoint_registry.get("endpoints")
  79. if not isinstance(endpoints, list):
  80. _fail(findings, "crawler_endpoints_invalid", "endpoints must be a list")
  81. else:
  82. _check_count(findings, "crawler_endpoints_count", "endpoints", endpoints, 26)
  83. for endpoint in endpoints:
  84. if not isinstance(endpoint, dict):
  85. _fail(findings, "crawler_endpoint_invalid", "endpoint row must be object")
  86. continue
  87. for key in [
  88. "platform",
  89. "source_id",
  90. "status",
  91. "table_or_endpoint",
  92. "input_fields",
  93. "output_fields",
  94. ]:
  95. if key not in endpoint:
  96. _fail(
  97. findings,
  98. "crawler_endpoint_missing_key",
  99. f"{endpoint.get('source_id')} missing {key}",
  100. )
  101. status = endpoint.get("status")
  102. if not isinstance(status, (str, list)):
  103. _fail(
  104. findings,
  105. "crawler_endpoint_status_invalid",
  106. f"{endpoint.get('source_id')} status must be string or list",
  107. )
  108. continue
  109. statuses = status if isinstance(status, list) else [status]
  110. invalid_statuses = [item for item in statuses if item not in ENDPOINT_STATUSES]
  111. if invalid_statuses:
  112. _fail(
  113. findings,
  114. "crawler_endpoint_status_unknown",
  115. f"{endpoint.get('source_id')} unknown status: {invalid_statuses}",
  116. )
  117. field_map = _load_json(data_dir / "跨平台字段映射.json", findings)
  118. if field_map:
  119. _check_no_legacy_fields(findings, field_map, "跨平台字段映射.json")
  120. _check_value(
  121. findings,
  122. "field_map_schema",
  123. field_map.get("schema_version"),
  124. "cross_platform_field_map.v1",
  125. )
  126. if not isinstance(field_map.get("mappings"), dict):
  127. _fail(findings, "field_map_mappings_invalid", "mappings must be an object")
  128. for profile_path in sorted((data_dir / "platform_profiles").glob("*.json")):
  129. profile = _load_json(profile_path, findings)
  130. if not profile:
  131. continue
  132. _check_no_legacy_fields(findings, profile, profile_path.name)
  133. _check_value(
  134. findings,
  135. "platform_profile_schema",
  136. profile.get("schema_version"),
  137. "platform_profile.v1",
  138. label=profile_path.name,
  139. )
  140. for key in ["platform", "status", "runtime", "endpoints", "edges"]:
  141. if key not in profile:
  142. _fail(findings, "platform_profile_missing_key", f"{profile_path.name} missing {key}")
  143. if profile.get("platform") in M2_PLATFORM_PROFILES:
  144. _check_m2_platform_profile(findings, profile, profile_path.name)
  145. rule_pack_pkg = _load_json(RULE_PACK_PATH, findings)
  146. if rule_pack_pkg:
  147. _check_v4_rule_pack_contract(findings, rule_pack_pkg)
  148. walk_strategy = _load_json(WALK_STRATEGY_PATH, findings)
  149. if walk_strategy:
  150. _check_no_legacy_fields(findings, walk_strategy, "douyin_walk_strategy.v1.json")
  151. _check_v4_walk_strategy_contract(findings, walk_strategy)
  152. return findings
  153. def assert_no_v4_legacy_fields(value: Any, label: str = "v4_contract") -> list[str]:
  154. paths: list[str] = []
  155. _collect_legacy_paths(value, label, paths)
  156. return paths
  157. def _check_no_legacy_fields(
  158. findings: list[dict[str, str]],
  159. value: Any,
  160. label: str,
  161. ) -> None:
  162. paths = assert_no_v4_legacy_fields(value, label)
  163. if paths:
  164. _fail(
  165. findings,
  166. "v4_legacy_field_present",
  167. f"{label} contains legacy fields: {', '.join(paths[:5])}",
  168. )
  169. def _collect_legacy_paths(value: Any, prefix: str, paths: list[str]) -> None:
  170. if isinstance(value, dict):
  171. for key, child in value.items():
  172. child_path = f"{prefix}.{key}"
  173. if key in LEGACY_FIELD_BLOCKLIST:
  174. paths.append(child_path)
  175. _collect_legacy_paths(child, child_path, paths)
  176. elif isinstance(value, list):
  177. for index, child in enumerate(value):
  178. _collect_legacy_paths(child, f"{prefix}[{index}]", paths)
  179. elif isinstance(value, str):
  180. if _string_has_legacy_field(value):
  181. paths.append(prefix)
  182. def _string_has_legacy_field(value: str) -> bool:
  183. normalized = value.replace("[", ".").replace("]", ".").replace("/", ".")
  184. parts = [part.strip() for part in normalized.split(".")]
  185. return any(part in LEGACY_FIELD_BLOCKLIST for part in parts)
  186. def _load_json(path: Path, findings: list[dict[str, str]]) -> dict[str, Any] | None:
  187. try:
  188. return json.loads(path.read_text(encoding="utf-8"))
  189. except FileNotFoundError:
  190. _fail(findings, "file_missing", f"missing file: {path.relative_to(ROOT)}")
  191. except json.JSONDecodeError as exc:
  192. _fail(findings, "json_parse_failed", f"{path.relative_to(ROOT)} cannot parse: {exc}")
  193. return None
  194. def _check_value(
  195. findings: list[dict[str, str]],
  196. check_id: str,
  197. actual: Any,
  198. expected: Any,
  199. *,
  200. label: str = "",
  201. ) -> None:
  202. if actual != expected:
  203. target = f"{label} " if label else ""
  204. _fail(findings, check_id, f"{target}expected {expected}, got {actual}")
  205. def _check_count(
  206. findings: list[dict[str, str]],
  207. check_id: str,
  208. label: str,
  209. value: Any,
  210. expected: int,
  211. ) -> None:
  212. if not isinstance(value, list) or len(value) != expected:
  213. actual = len(value) if isinstance(value, list) else None
  214. _fail(findings, check_id, f"{label} expected {expected}, got {actual}")
  215. def _check_m2_platform_profile(
  216. findings: list[dict[str, str]],
  217. profile: dict[str, Any],
  218. label: str,
  219. ) -> None:
  220. platform = str(profile.get("platform") or "")
  221. edges = profile.get("edges")
  222. if not isinstance(edges, dict):
  223. _fail(findings, "platform_profile_edges_invalid", f"{label} edges must be an object")
  224. else:
  225. for edge_id, edge in edges.items():
  226. if not isinstance(edge, dict):
  227. _fail(findings, "platform_profile_edge_invalid", f"{label}.{edge_id} must be object")
  228. continue
  229. status = edge.get("status")
  230. if status not in PROFILE_EDGE_STATUSES:
  231. _fail(
  232. findings,
  233. "platform_profile_edge_status_unknown",
  234. f"{label}.{edge_id} status must be supported/blocked, got {status}",
  235. )
  236. endpoints = profile.get("endpoints")
  237. if not isinstance(endpoints, dict):
  238. _fail(findings, "platform_profile_endpoints_invalid", f"{label} endpoints must be an object")
  239. else:
  240. for endpoint_id, endpoint in endpoints.items():
  241. if not isinstance(endpoint, dict):
  242. _fail(
  243. findings,
  244. "platform_profile_endpoint_invalid",
  245. f"{label}.{endpoint_id} must be object",
  246. )
  247. continue
  248. status = endpoint.get("status")
  249. if status is not None and status not in PROFILE_ENDPOINT_STATUSES:
  250. _fail(
  251. findings,
  252. "platform_profile_endpoint_status_unknown",
  253. f"{label}.{endpoint_id} endpoint status must be stable enum, got {status}",
  254. )
  255. _check_observable_contract(findings, profile, label)
  256. _check_m2_platform_specifics(findings, profile, platform, label)
  257. def _check_observable_contract(
  258. findings: list[dict[str, str]],
  259. profile: dict[str, Any],
  260. label: str,
  261. ) -> None:
  262. observable_fields = profile.get("observable_fields")
  263. missing_fields = profile.get("missing_observable_fields")
  264. if not isinstance(observable_fields, list) or not observable_fields:
  265. _fail(findings, "observable_fields_invalid", f"{label} observable_fields must be non-empty list")
  266. else:
  267. for item in observable_fields:
  268. if not isinstance(item, dict):
  269. _fail(findings, "observable_field_invalid", f"{label} observable field must be object")
  270. continue
  271. field = item.get("field")
  272. if field not in OBSERVABLE_FIELDS:
  273. _fail(
  274. findings,
  275. "observable_field_unknown",
  276. f"{label} observable field unknown: {field}",
  277. )
  278. if item.get("availability") != "supported":
  279. _fail(
  280. findings,
  281. "observable_field_availability_invalid",
  282. f"{label} {field} availability must be supported",
  283. )
  284. if not isinstance(missing_fields, list):
  285. _fail(
  286. findings,
  287. "missing_observable_fields_invalid",
  288. f"{label} missing_observable_fields must be a list",
  289. )
  290. return
  291. seen_observable = {
  292. item.get("field")
  293. for item in observable_fields
  294. if isinstance(item, dict)
  295. } if isinstance(observable_fields, list) else set()
  296. seen_missing = set()
  297. for item in missing_fields:
  298. if not isinstance(item, dict):
  299. _fail(findings, "missing_observable_field_invalid", f"{label} missing field must be object")
  300. continue
  301. field = item.get("field")
  302. seen_missing.add(field)
  303. if field not in OBSERVABLE_FIELDS:
  304. _fail(findings, "missing_observable_field_unknown", f"{label} missing field unknown: {field}")
  305. missing_type = item.get("missing_type")
  306. if missing_type not in MISSING_OBSERVABLE_TYPES:
  307. _fail(
  308. findings,
  309. "missing_observable_type_unknown",
  310. f"{label} {field} missing_type must be natural_platform_missing/runtime_missing",
  311. )
  312. overlap = sorted(seen_observable & seen_missing)
  313. if overlap:
  314. _fail(
  315. findings,
  316. "observable_field_conflict",
  317. f"{label} fields cannot be both observable and missing: {overlap}",
  318. )
  319. uncovered = sorted(OBSERVABLE_FIELDS - seen_observable - seen_missing)
  320. if uncovered:
  321. _fail(
  322. findings,
  323. "observable_field_uncovered",
  324. f"{label} observable contract missing fields: {uncovered}",
  325. )
  326. def _check_m2_platform_specifics(
  327. findings: list[dict[str, str]],
  328. profile: dict[str, Any],
  329. platform: str,
  330. label: str,
  331. ) -> None:
  332. edges = profile.get("edges") if isinstance(profile.get("edges"), dict) else {}
  333. endpoints = profile.get("endpoints") if isinstance(profile.get("endpoints"), dict) else {}
  334. if platform == "kuaishou":
  335. _check_nested_status(findings, label, edges, "author_to_works", "blocked")
  336. _check_nested_status(findings, label, edges, "author_work_to_content", "blocked")
  337. if platform == "shipinhao":
  338. _check_nested_status(findings, label, endpoints, "account_info", "blocked")
  339. _check_nested_status(findings, label, edges, "author_to_works", "blocked")
  340. _check_nested_status(findings, label, edges, "author_work_to_content", "blocked")
  341. def _check_v4_walk_gate_config(
  342. findings: list[dict[str, str]],
  343. gate: Any,
  344. label: str,
  345. ) -> None:
  346. if not isinstance(gate, dict):
  347. _fail(findings, "v4_walk_gate_invalid", f"{label} must be an object")
  348. return
  349. if gate.get("requires_allow_walk") is not True:
  350. _fail(findings, "v4_walk_gate_requires_allow_walk", f"{label}.requires_allow_walk must be true")
  351. if gate.get("source_field") != "rule_decisions.jsonl[].decision_replay_data.allow_walk":
  352. _fail(findings, "v4_walk_gate_source_field", f"{label}.source_field is invalid")
  353. if gate.get("deny_reason_code") != "v4_allow_walk_denied":
  354. _fail(findings, "v4_walk_gate_deny_reason", f"{label}.deny_reason_code is invalid")
  355. if set(gate.get("applies_to_edges") or []) != M4_WALK_GATE_EDGES:
  356. _fail(findings, "v4_walk_gate_edges", f"{label}.applies_to_edges must cover M4 expansion edges")
  357. raw_fields = set(gate.get("raw_payload_fields") or [])
  358. if not M4_WALK_GATE_RAW_FIELDS <= raw_fields:
  359. _fail(findings, "v4_walk_gate_raw_fields", f"{label}.raw_payload_fields missing required fields")
  360. def _check_v4_walk_strategy_contract(
  361. findings: list[dict[str, str]],
  362. strategy: dict[str, Any],
  363. ) -> None:
  364. rows = strategy.get("v4_walk_gate")
  365. if not isinstance(rows, list) or not rows:
  366. _fail(findings, "v4_walk_strategy_gate_missing", "douyin_walk_strategy.v1.json missing v4_walk_gate")
  367. return
  368. by_id = {row.get("gate_id"): row for row in rows if isinstance(row, dict)}
  369. gate = by_id.get("allow_walk_required")
  370. if not gate:
  371. _fail(findings, "v4_walk_strategy_gate_missing", "allow_walk_required gate missing")
  372. return
  373. _check_v4_walk_gate_config(findings, gate, "douyin_walk_strategy.v4_walk_gate.allow_walk_required")
  374. def _check_nested_status(
  375. findings: list[dict[str, str]],
  376. label: str,
  377. section: dict[str, Any],
  378. key: str,
  379. expected: str,
  380. ) -> None:
  381. value = section.get(key)
  382. actual = value.get("status") if isinstance(value, dict) else None
  383. if actual != expected:
  384. _fail(
  385. findings,
  386. "platform_profile_status_mismatch",
  387. f"{label}.{key} expected status {expected}, got {actual}",
  388. )
  389. def _check_v4_rule_pack_contract(
  390. findings: list[dict[str, str]],
  391. pkg: dict[str, Any],
  392. ) -> None:
  393. strategy_version = (pkg.get("strategy_binding") or {}).get("strategy_version")
  394. for dispatch in pkg.get("rule_pack_dispatch", []):
  395. if dispatch.get("dispatch_enabled") and dispatch.get("strategy_version") == "V4":
  396. if dispatch.get("rule_pack_version") != "4.0.0":
  397. _fail(
  398. findings,
  399. "v4_rule_pack_version_invalid",
  400. f"{dispatch.get('dispatch_id')} V4 dispatch must use rule_pack_version 4.0.0",
  401. )
  402. for pack in pkg.get("rule_packs", []):
  403. scorecard = pack.get("scorecard") or {}
  404. is_v4 = (
  405. strategy_version == "V4"
  406. or pack.get("version") == "4.0.0"
  407. or scorecard.get("schema_version") == "v4_scorecard.v1"
  408. )
  409. if not is_v4:
  410. continue
  411. _check_no_legacy_fields(findings, pack, f"rule_pack:{pack.get('rule_pack_id')}")
  412. if scorecard.get("schema_version") != "v4_scorecard.v1":
  413. _fail(
  414. findings,
  415. "v4_scorecard_schema_invalid",
  416. f"{pack.get('rule_pack_id')} scorecard.schema_version must be v4_scorecard.v1",
  417. )
  418. dimensions = [row for row in scorecard.get("dimensions", []) if row.get("runtime_status") == "active"]
  419. keys = [row.get("key") for row in dimensions]
  420. if keys != ["query_relevance", "platform_performance"]:
  421. _fail(
  422. findings,
  423. "v4_scorecard_dimensions_invalid",
  424. f"{pack.get('rule_pack_id')} active dimensions must be query_relevance/platform_performance",
  425. )
  426. required_fields = set((pack.get("input_contract") or {}).get("required_fields") or [])
  427. for field in [
  428. "pattern_match_result.query_relevance_score",
  429. "content_engagement_metrics.platform_performance.platform_performance_score",
  430. "content_engagement_metrics.platform_performance.missing_observable_fields",
  431. ]:
  432. if field not in required_fields:
  433. _fail(
  434. findings,
  435. "v4_rule_pack_required_field_missing",
  436. f"{pack.get('rule_pack_id')} missing required field {field}",
  437. )
  438. def _fail(findings: list[dict[str, str]], check_id: str, message: str) -> None:
  439. findings.append({"level": "fail", "check_id": check_id, "message": message})
  440. if __name__ == "__main__":
  441. sys.exit(main())