| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236 |
- from __future__ import annotations
- import json
- from pathlib import Path
- from typing import Any
- from count_schema_registry import (
- EXPECTED_BUSINESS_MODULES,
- REGISTRY_PATH,
- parse_runtime_filenames,
- parse_sql_schema,
- )
# Directory one level above the folder that contains this script; relative
# paths read from the registry are resolved against it.
ROOT: Path = Path(__file__).resolve().parents[1]

# Exact totals that both the parsed inputs and the registry's
# ``coverage_targets`` section must report.  Key order determines the order
# in which count findings are emitted.
EXPECTED_COUNTS: dict[str, int] = {
    "table_count": 21,
    "sql_column_count": 362,
    "json_column_count": 63,
    "unique_index_count": 27,
    "secondary_index_count": 38,
    "business_module_count": 10,
    "runtime_file_count": 13,
}

# Payload kinds that ``raw_payload_policy.payload_kinds`` must declare.
RAW_PAYLOAD_TERMS: set[str] = {
    "raw_payload",
    "platform_raw_payload",
    "source_evidence",
    "final_output",
}

# Legacy runtime file names that must be listed under
# ``migration_aliases.legacy_runtime_files``.
LEGACY_RUNTIME_NAMES: set[str] = {
    "queries.jsonl",
    "candidate_pool.jsonl",
    "media_assets.jsonl",
    "source_edges.jsonl",
    "trace_events.jsonl",
}
def main() -> None:
    """Validate the registry file and print a JSON report.

    Loads the registry JSON from ``REGISTRY_PATH``, parses the SQL schema and
    the runtime file names, and runs every ``_check_*`` validator against a
    shared findings list.  The report printed to stdout has a ``status`` of
    ``"fail"`` when any finding exists and ``"pass"`` otherwise, plus the
    ``findings`` list itself.

    Raises:
        SystemExit: With exit code 1 when at least one finding was recorded.
    """
    registry = json.loads(REGISTRY_PATH.read_text(encoding="utf-8"))
    sql_tables = parse_sql_schema()
    runtime_files = parse_runtime_filenames()

    # Every validator appends to this one list; call order fixes report order.
    findings: list[str] = []
    _check_top_level_counts(registry, sql_tables, runtime_files, findings)
    _check_business_modules(registry, findings)
    _check_tables(registry, sql_tables, findings)
    _check_runtime_files(registry, runtime_files, findings)
    _check_field_index(registry, findings)
    _check_payload_policy(registry, findings)
    _check_migration_aliases(registry, findings)
    _check_source_documents(registry, findings)

    status = "fail" if findings else "pass"
    report = json.dumps(
        {"status": status, "findings": findings},
        ensure_ascii=False,
        indent=2,
    )
    print(report)
    if findings:
        raise SystemExit(1)
def _check_top_level_counts(
    registry: dict[str, Any],
    sql_tables: dict[str, Any],
    runtime_files: list[str],
    findings: list[str],
) -> None:
    """Compare measured totals and declared coverage targets to ``EXPECTED_COUNTS``.

    Args:
        registry: Parsed registry document; its ``business_modules`` and
            ``coverage_targets`` sections are read.
        sql_tables: Mapping of table name to a parsed table object exposing
            ``columns``, ``unique_indexes`` and ``secondary_indexes``; each
            column exposes ``is_json``.
        runtime_files: Runtime file names; only the length is used.
        findings: List that receives one message per mismatch.
    """
    column_total = 0
    json_total = 0
    unique_total = 0
    secondary_total = 0
    # One pass over the parsed tables accumulates every schema-derived total.
    for table in sql_tables.values():
        column_total += len(table.columns)
        json_total += sum(1 for column in table.columns if column.is_json)
        unique_total += len(table.unique_indexes)
        secondary_total += len(table.secondary_indexes)

    actual = {
        "table_count": len(sql_tables),
        "sql_column_count": column_total,
        "json_column_count": json_total,
        "unique_index_count": unique_total,
        "secondary_index_count": secondary_total,
        "business_module_count": len(registry.get("business_modules", {})),
        "runtime_file_count": len(runtime_files),
    }
    targets = registry.get("coverage_targets", {})

    for key, expected in EXPECTED_COUNTS.items():
        # The measured value and the registry's own declared target are
        # checked independently, so a single key can yield two findings.
        if actual.get(key) != expected:
            findings.append(f"{key} actual {actual.get(key)} != expected {expected}")
        if targets.get(key) != expected:
            findings.append(f"coverage_targets.{key} {targets.get(key)} != expected {expected}")
def _check_business_modules(registry: dict[str, Any], findings: list[str]) -> None:
    """Check the registry's ``business_modules`` section.

    Reports module ids that are expected but absent, ids that are present but
    not expected (both relative to ``EXPECTED_BUSINESS_MODULES``), and for
    every declared module a missing or empty ``business_label`` or
    ``writer_boundary`` and absent ``tables`` / ``runtime_files`` keys.

    Args:
        registry: Parsed registry document.
        findings: List that receives one message per problem.
    """
    modules = registry.get("business_modules", {})
    expected_ids = set(EXPECTED_BUSINESS_MODULES)
    declared_ids = set(modules)

    missing = sorted(expected_ids - declared_ids)
    extra = sorted(declared_ids - expected_ids)
    if missing:
        findings.append(f"business_modules missing: {missing}")
    if extra:
        findings.append(f"business_modules extra: {extra}")

    for module_id, module in modules.items():
        # Pair each required key with the message emitted when it is falsy.
        required = (
            ("business_label", f"{module_id} missing business_label"),
            ("writer_boundary", f"{module_id} missing writer_boundary"),
        )
        for key, message in required:
            if not module.get(key):
                findings.append(message)
        # Only key presence matters here; the values may be empty.
        if not ("tables" in module and "runtime_files" in module):
            findings.append(f"{module_id} missing tables/runtime_files")
- def _check_tables(
- registry: dict[str, Any],
- sql_tables: dict[str, Any],
- findings: list[str],
- ) -> None:
- registry_tables = registry.get("tables", {})
- missing_tables = sorted(set(sql_tables) - set(registry_tables))
- extra_tables = sorted(set(registry_tables) - set(sql_tables))
- if missing_tables:
- findings.append(f"tables missing: {missing_tables}")
- if extra_tables:
- findings.append(f"tables extra: {extra_tables}")
- for table_name, sql_table in sql_tables.items():
- registry_table = registry_tables.get(table_name)
- if not registry_table:
- continue
- for key in ["owner_module", "writer_boundary", "reader_boundary"]:
- if not registry_table.get(key):
- findings.append(f"{table_name} missing table-level {key}")
- registry_columns = registry_table.get("columns", [])
- if len(registry_columns) != len(sql_table.columns):
- findings.append(
- f"{table_name} column count {len(registry_columns)} != {len(sql_table.columns)}"
- )
- by_name = {column.get("field_name"): column for column in registry_columns}
- for sql_column in sql_table.columns:
- registry_column = by_name.get(sql_column.name)
- if not registry_column:
- findings.append(f"{table_name}.{sql_column.name} missing from registry")
- continue
- if registry_column.get("db_type") != sql_column.column_type:
- findings.append(
- f"{table_name}.{sql_column.name} db_type {registry_column.get('db_type')} != {sql_column.column_type}"
- )
- if registry_column.get("nullable") != sql_column.nullable:
- findings.append(f"{table_name}.{sql_column.name} nullable mismatch")
- if registry_column.get("default") != sql_column.default:
- findings.append(f"{table_name}.{sql_column.name} default mismatch")
- if registry_column.get("storage_kind") not in {"db_column", "json_column"}:
- findings.append(f"{table_name}.{sql_column.name} invalid storage_kind")
- if sql_column.is_json and registry_column.get("storage_kind") != "json_column":
- findings.append(f"{table_name}.{sql_column.name} should be json_column")
- for key in [
- "business_label",
- "meaning",
- "owner_module",
- "writer_boundary",
- "reader_boundary",
- "field_role",
- "promotion_status",
- ]:
- if not registry_column.get(key):
- findings.append(f"{table_name}.{sql_column.name} missing {key}")
- if len(registry_table.get("unique_indexes", [])) != len(sql_table.unique_indexes):
- findings.append(f"{table_name} unique index count mismatch")
- if len(registry_table.get("secondary_indexes", [])) != len(sql_table.secondary_indexes):
- findings.append(f"{table_name} secondary index count mismatch")
- def _check_runtime_files(
- registry: dict[str, Any],
- runtime_files: list[str],
- findings: list[str],
- ) -> None:
- registry_runtime_files = registry.get("runtime_files", {})
- missing = sorted(set(runtime_files) - set(registry_runtime_files))
- extra = sorted(set(registry_runtime_files) - set(runtime_files))
- if missing:
- findings.append(f"runtime_files missing: {missing}")
- if extra:
- findings.append(f"runtime_files extra: {extra}")
- for filename, spec in registry_runtime_files.items():
- if not spec.get("owner_module"):
- findings.append(f"{filename} missing owner_module")
- if not spec.get("writer_boundary"):
- findings.append(f"{filename} missing writer_boundary")
- if not spec.get("reader_boundary"):
- findings.append(f"{filename} missing reader_boundary")
- if not spec.get("mapped_table"):
- findings.append(f"{filename} missing mapped_table")
- def _check_field_index(registry: dict[str, Any], findings: list[str]) -> None:
- valid_refs = {
- f"{table_name}.{column['field_name']}"
- for table_name, table in registry.get("tables", {}).items()
- for column in table.get("columns", [])
- }
- for field_name, refs in registry.get("field_index", {}).items():
- if not isinstance(refs, list) or not refs:
- findings.append(f"field_index.{field_name} must be non-empty list")
- continue
- for ref in refs:
- if ref not in valid_refs:
- findings.append(f"field_index.{field_name} invalid ref {ref}")
def _check_payload_policy(registry: dict[str, Any], findings: list[str]) -> None:
    """Check the registry's ``raw_payload_policy`` section.

    Reports payload kinds from ``RAW_PAYLOAD_TERMS`` that are not declared
    under ``payload_kinds``, and each of the required sensitive key names
    that is absent from ``forbidden_keys``.

    Args:
        registry: Parsed registry document.
        findings: List that receives one message per problem.
    """
    policy = registry.get("raw_payload_policy", {})

    declared_kinds = set(policy.get("payload_kinds", {}))
    missing_terms = sorted(term for term in RAW_PAYLOAD_TERMS if term not in declared_kinds)
    if missing_terms:
        findings.append(f"raw_payload_policy missing payload kinds: {missing_terms}")

    forbidden = set(policy.get("forbidden_keys", []))
    required_forbidden = ("password", "token", "api_key", "secret", "dsn")
    findings.extend(
        f"raw_payload_policy.forbidden_keys missing {key}"
        for key in required_forbidden
        if key not in forbidden
    )
def _check_migration_aliases(registry: dict[str, Any], findings: list[str]) -> None:
    """Check the registry's ``migration_aliases`` section.

    Reports legacy runtime file names from ``LEGACY_RUNTIME_NAMES`` that are
    not keys of ``migration_aliases.legacy_runtime_files``, and legacy names
    that occur in the serialized registry but nowhere in the serialized
    ``migration_aliases`` section.

    Findings are emitted in sorted name order so the report is identical
    between runs; iterating the set directly would follow hash order.

    Args:
        registry: Parsed registry document.
        findings: List that receives one message per problem.
    """
    aliases = registry.get("migration_aliases", {})
    legacy_runtime_names = set((aliases.get("legacy_runtime_files") or {}).keys())
    missing = sorted(LEGACY_RUNTIME_NAMES - legacy_runtime_names)
    if missing:
        findings.append(f"migration_aliases missing legacy runtime names: {missing}")

    # Both serializations are loop-invariant, so build them once.
    registry_text = json.dumps(registry, ensure_ascii=False)
    aliases_text = json.dumps(aliases, ensure_ascii=False)
    # NOTE(review): this is a substring test on the serialized JSON, and it
    # only fires when a name is absent from migration_aliases entirely.  A
    # name that appears both inside migration_aliases and elsewhere in the
    # registry is not reported -- confirm whether that is intended.
    for legacy_name in sorted(LEGACY_RUNTIME_NAMES):
        if legacy_name in registry_text and legacy_name not in aliases_text:
            findings.append(f"{legacy_name} appears outside migration_aliases")
def _check_source_documents(registry: dict[str, Any], findings: list[str]) -> None:
    """Check that paths referenced by the registry exist under ``ROOT``.

    Every entry of ``source_documents`` is resolved against ``ROOT`` and
    reported when it does not exist.  ``database.sql_source`` is checked the
    same way, but only when it is set to a truthy value.

    Args:
        registry: Parsed registry document.
        findings: List that receives one message per missing path.
    """
    absent_docs = [
        rel_path
        for rel_path in registry.get("source_documents", [])
        if not (ROOT / rel_path).exists()
    ]
    findings.extend(f"source_documents missing path: {rel_path}" for rel_path in absent_docs)

    sql_source = registry.get("database", {}).get("sql_source")
    if not sql_source:
        return
    if not (ROOT / sql_source).exists():
        findings.append(f"database.sql_source missing path: {sql_source}")
# Run the validation only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    main()
|