validate_schema_registry.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. from __future__ import annotations
  2. import json
  3. from pathlib import Path
  4. from typing import Any
  5. from count_schema_registry import (
  6. EXPECTED_BUSINESS_MODULES,
  7. REGISTRY_PATH,
  8. parse_runtime_filenames,
  9. parse_sql_schema,
  10. )
# Directory one level above the directory containing this file (parents[1]).
# Relative paths read from the registry are joined onto it before checking
# that the referenced files exist.
ROOT = Path(__file__).resolve().parents[1]

# Exact totals that both the measured values (parsed SQL schema, parsed runtime
# file list, registry business modules) and the registry's own
# ``coverage_targets`` section must equal.
EXPECTED_COUNTS = {
    "table_count": 21,
    "sql_column_count": 362,
    "json_column_count": 63,
    "unique_index_count": 27,
    "secondary_index_count": 38,
    "business_module_count": 10,
    "runtime_file_count": 13,
}

# Payload kinds that ``raw_payload_policy.payload_kinds`` must declare.
RAW_PAYLOAD_TERMS = {
    "raw_payload",
    "platform_raw_payload",
    "source_evidence",
    "final_output",
}

# Legacy runtime file names that must be listed under
# ``migration_aliases.legacy_runtime_files``.
LEGACY_RUNTIME_NAMES = {
    "queries.jsonl",
    "candidate_pool.jsonl",
    "media_assets.jsonl",
    "source_edges.jsonl",
    "trace_events.jsonl",
}
  34. def main() -> None:
  35. findings: list[str] = []
  36. registry = json.loads(REGISTRY_PATH.read_text(encoding="utf-8"))
  37. sql_tables = parse_sql_schema()
  38. runtime_files = parse_runtime_filenames()
  39. _check_top_level_counts(registry, sql_tables, runtime_files, findings)
  40. _check_business_modules(registry, findings)
  41. _check_tables(registry, sql_tables, findings)
  42. _check_runtime_files(registry, runtime_files, findings)
  43. _check_field_index(registry, findings)
  44. _check_payload_policy(registry, findings)
  45. _check_migration_aliases(registry, findings)
  46. _check_source_documents(registry, findings)
  47. payload = {"status": "fail" if findings else "pass", "findings": findings}
  48. print(json.dumps(payload, ensure_ascii=False, indent=2))
  49. if findings:
  50. raise SystemExit(1)
  51. def _check_top_level_counts(
  52. registry: dict[str, Any],
  53. sql_tables: dict[str, Any],
  54. runtime_files: list[str],
  55. findings: list[str],
  56. ) -> None:
  57. actual = {
  58. "table_count": len(sql_tables),
  59. "sql_column_count": sum(len(table.columns) for table in sql_tables.values()),
  60. "json_column_count": sum(
  61. 1 for table in sql_tables.values() for column in table.columns if column.is_json
  62. ),
  63. "unique_index_count": sum(len(table.unique_indexes) for table in sql_tables.values()),
  64. "secondary_index_count": sum(
  65. len(table.secondary_indexes) for table in sql_tables.values()
  66. ),
  67. "business_module_count": len(registry.get("business_modules", {})),
  68. "runtime_file_count": len(runtime_files),
  69. }
  70. targets = registry.get("coverage_targets", {})
  71. for key, expected in EXPECTED_COUNTS.items():
  72. if actual.get(key) != expected:
  73. findings.append(f"{key} actual {actual.get(key)} != expected {expected}")
  74. if targets.get(key) != expected:
  75. findings.append(f"coverage_targets.{key} {targets.get(key)} != expected {expected}")
  76. def _check_business_modules(registry: dict[str, Any], findings: list[str]) -> None:
  77. modules = registry.get("business_modules", {})
  78. missing = sorted(set(EXPECTED_BUSINESS_MODULES) - set(modules))
  79. extra = sorted(set(modules) - set(EXPECTED_BUSINESS_MODULES))
  80. if missing:
  81. findings.append(f"business_modules missing: {missing}")
  82. if extra:
  83. findings.append(f"business_modules extra: {extra}")
  84. for module_id, module in modules.items():
  85. if not module.get("business_label"):
  86. findings.append(f"{module_id} missing business_label")
  87. if not module.get("writer_boundary"):
  88. findings.append(f"{module_id} missing writer_boundary")
  89. if "tables" not in module or "runtime_files" not in module:
  90. findings.append(f"{module_id} missing tables/runtime_files")
  91. def _check_tables(
  92. registry: dict[str, Any],
  93. sql_tables: dict[str, Any],
  94. findings: list[str],
  95. ) -> None:
  96. registry_tables = registry.get("tables", {})
  97. missing_tables = sorted(set(sql_tables) - set(registry_tables))
  98. extra_tables = sorted(set(registry_tables) - set(sql_tables))
  99. if missing_tables:
  100. findings.append(f"tables missing: {missing_tables}")
  101. if extra_tables:
  102. findings.append(f"tables extra: {extra_tables}")
  103. for table_name, sql_table in sql_tables.items():
  104. registry_table = registry_tables.get(table_name)
  105. if not registry_table:
  106. continue
  107. for key in ["owner_module", "writer_boundary", "reader_boundary"]:
  108. if not registry_table.get(key):
  109. findings.append(f"{table_name} missing table-level {key}")
  110. registry_columns = registry_table.get("columns", [])
  111. if len(registry_columns) != len(sql_table.columns):
  112. findings.append(
  113. f"{table_name} column count {len(registry_columns)} != {len(sql_table.columns)}"
  114. )
  115. by_name = {column.get("field_name"): column for column in registry_columns}
  116. for sql_column in sql_table.columns:
  117. registry_column = by_name.get(sql_column.name)
  118. if not registry_column:
  119. findings.append(f"{table_name}.{sql_column.name} missing from registry")
  120. continue
  121. if registry_column.get("db_type") != sql_column.column_type:
  122. findings.append(
  123. f"{table_name}.{sql_column.name} db_type {registry_column.get('db_type')} != {sql_column.column_type}"
  124. )
  125. if registry_column.get("nullable") != sql_column.nullable:
  126. findings.append(f"{table_name}.{sql_column.name} nullable mismatch")
  127. if registry_column.get("default") != sql_column.default:
  128. findings.append(f"{table_name}.{sql_column.name} default mismatch")
  129. if registry_column.get("storage_kind") not in {"db_column", "json_column"}:
  130. findings.append(f"{table_name}.{sql_column.name} invalid storage_kind")
  131. if sql_column.is_json and registry_column.get("storage_kind") != "json_column":
  132. findings.append(f"{table_name}.{sql_column.name} should be json_column")
  133. for key in [
  134. "business_label",
  135. "meaning",
  136. "owner_module",
  137. "writer_boundary",
  138. "reader_boundary",
  139. "field_role",
  140. "promotion_status",
  141. ]:
  142. if not registry_column.get(key):
  143. findings.append(f"{table_name}.{sql_column.name} missing {key}")
  144. if len(registry_table.get("unique_indexes", [])) != len(sql_table.unique_indexes):
  145. findings.append(f"{table_name} unique index count mismatch")
  146. if len(registry_table.get("secondary_indexes", [])) != len(sql_table.secondary_indexes):
  147. findings.append(f"{table_name} secondary index count mismatch")
  148. def _check_runtime_files(
  149. registry: dict[str, Any],
  150. runtime_files: list[str],
  151. findings: list[str],
  152. ) -> None:
  153. registry_runtime_files = registry.get("runtime_files", {})
  154. missing = sorted(set(runtime_files) - set(registry_runtime_files))
  155. extra = sorted(set(registry_runtime_files) - set(runtime_files))
  156. if missing:
  157. findings.append(f"runtime_files missing: {missing}")
  158. if extra:
  159. findings.append(f"runtime_files extra: {extra}")
  160. for filename, spec in registry_runtime_files.items():
  161. if not spec.get("owner_module"):
  162. findings.append(f"{filename} missing owner_module")
  163. if not spec.get("writer_boundary"):
  164. findings.append(f"{filename} missing writer_boundary")
  165. if not spec.get("reader_boundary"):
  166. findings.append(f"{filename} missing reader_boundary")
  167. if not spec.get("mapped_table"):
  168. findings.append(f"{filename} missing mapped_table")
  169. def _check_field_index(registry: dict[str, Any], findings: list[str]) -> None:
  170. valid_refs = {
  171. f"{table_name}.{column['field_name']}"
  172. for table_name, table in registry.get("tables", {}).items()
  173. for column in table.get("columns", [])
  174. }
  175. for field_name, refs in registry.get("field_index", {}).items():
  176. if not isinstance(refs, list) or not refs:
  177. findings.append(f"field_index.{field_name} must be non-empty list")
  178. continue
  179. for ref in refs:
  180. if ref not in valid_refs:
  181. findings.append(f"field_index.{field_name} invalid ref {ref}")
  182. def _check_payload_policy(registry: dict[str, Any], findings: list[str]) -> None:
  183. policy = registry.get("raw_payload_policy", {})
  184. payload_kinds = set(policy.get("payload_kinds", {}))
  185. missing_terms = sorted(RAW_PAYLOAD_TERMS - payload_kinds)
  186. if missing_terms:
  187. findings.append(f"raw_payload_policy missing payload kinds: {missing_terms}")
  188. forbidden = set(policy.get("forbidden_keys", []))
  189. for key in ["password", "token", "api_key", "secret", "dsn"]:
  190. if key not in forbidden:
  191. findings.append(f"raw_payload_policy.forbidden_keys missing {key}")
  192. def _check_migration_aliases(registry: dict[str, Any], findings: list[str]) -> None:
  193. aliases = registry.get("migration_aliases", {})
  194. legacy_runtime_names = set((aliases.get("legacy_runtime_files") or {}).keys())
  195. missing = sorted(LEGACY_RUNTIME_NAMES - legacy_runtime_names)
  196. if missing:
  197. findings.append(f"migration_aliases missing legacy runtime names: {missing}")
  198. registry_text = json.dumps(registry, ensure_ascii=False)
  199. for legacy_name in LEGACY_RUNTIME_NAMES:
  200. if legacy_name in registry_text and legacy_name not in json.dumps(aliases, ensure_ascii=False):
  201. findings.append(f"{legacy_name} appears outside migration_aliases")
  202. def _check_source_documents(registry: dict[str, Any], findings: list[str]) -> None:
  203. for rel_path in registry.get("source_documents", []):
  204. if not (ROOT / rel_path).exists():
  205. findings.append(f"source_documents missing path: {rel_path}")
  206. sql_source = registry.get("database", {}).get("sql_source")
  207. if sql_source and not (ROOT / sql_source).exists():
  208. findings.append(f"database.sql_source missing path: {sql_source}")
# Run the validation only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()