"""Validate the schema registry JSON against the SQL schema and runtime files.

The script loads the registry from ``REGISTRY_PATH``, parses the SQL schema
and runtime filenames through helpers imported from ``count_schema_registry``,
runs a series of consistency checks, prints a JSON report to stdout and exits
with status 1 when any check produced a finding.
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

from count_schema_registry import (
    EXPECTED_BUSINESS_MODULES,
    REGISTRY_PATH,
    parse_runtime_filenames,
    parse_sql_schema,
)

# Base directory used to resolve the relative paths listed in the registry.
ROOT = Path(__file__).resolve().parents[1]

# Pinned totals; both the parsed sources and the registry's
# ``coverage_targets`` section must agree with these numbers.
EXPECTED_COUNTS = {
    "table_count": 21,
    "sql_column_count": 362,
    "json_column_count": 63,
    "unique_index_count": 27,
    "secondary_index_count": 38,
    "business_module_count": 10,
    "runtime_file_count": 13,
}

# Payload kinds that ``raw_payload_policy.payload_kinds`` must declare.
RAW_PAYLOAD_TERMS = {
    "raw_payload",
    "platform_raw_payload",
    "source_evidence",
    "final_output",
}

# Runtime filenames that must be listed under
# ``migration_aliases.legacy_runtime_files``.
LEGACY_RUNTIME_NAMES = {
    "queries.jsonl",
    "candidate_pool.jsonl",
    "media_assets.jsonl",
    "source_edges.jsonl",
    "trace_events.jsonl",
}


def main() -> None:
    """Run every registry check and print a JSON pass/fail report.

    Raises:
        SystemExit: with exit code 1 when at least one finding was recorded.
    """
    findings: list[str] = []
    registry = json.loads(REGISTRY_PATH.read_text(encoding="utf-8"))
    sql_tables = parse_sql_schema()
    runtime_files = parse_runtime_filenames()

    _check_top_level_counts(registry, sql_tables, runtime_files, findings)
    _check_business_modules(registry, findings)
    _check_tables(registry, sql_tables, findings)
    _check_runtime_files(registry, runtime_files, findings)
    _check_field_index(registry, findings)
    _check_payload_policy(registry, findings)
    _check_migration_aliases(registry, findings)
    _check_source_documents(registry, findings)

    payload = {"status": "fail" if findings else "pass", "findings": findings}
    print(json.dumps(payload, ensure_ascii=False, indent=2))
    if findings:
        raise SystemExit(1)


def _check_top_level_counts(
    registry: dict[str, Any],
    sql_tables: dict[str, Any],
    runtime_files: list[str],
    findings: list[str],
) -> None:
    """Compare measured totals and ``coverage_targets`` with EXPECTED_COUNTS.

    For every key in ``EXPECTED_COUNTS`` two independent comparisons are made:
    the value computed from the parsed inputs, and the value declared in the
    registry's ``coverage_targets`` section. Each mismatch is appended to
    ``findings``.
    """
    actual = {
        "table_count": len(sql_tables),
        "sql_column_count": sum(len(table.columns) for table in sql_tables.values()),
        "json_column_count": sum(
            1
            for table in sql_tables.values()
            for column in table.columns
            if column.is_json
        ),
        "unique_index_count": sum(
            len(table.unique_indexes) for table in sql_tables.values()
        ),
        "secondary_index_count": sum(
            len(table.secondary_indexes) for table in sql_tables.values()
        ),
        "business_module_count": len(registry.get("business_modules", {})),
        "runtime_file_count": len(runtime_files),
    }
    targets = registry.get("coverage_targets", {})
    for key, expected in EXPECTED_COUNTS.items():
        if actual.get(key) != expected:
            findings.append(f"{key} actual {actual.get(key)} != expected {expected}")
        if targets.get(key) != expected:
            findings.append(
                f"coverage_targets.{key} {targets.get(key)} != expected {expected}"
            )


def _check_business_modules(registry: dict[str, Any], findings: list[str]) -> None:
    """Check the set of business modules and each module's required keys.

    Module ids are compared with ``EXPECTED_BUSINESS_MODULES`` in both
    directions; every registered module must have a truthy ``business_label``
    and ``writer_boundary`` and must contain the ``tables`` and
    ``runtime_files`` keys (their values may be empty).
    """
    modules = registry.get("business_modules", {})
    missing = sorted(set(EXPECTED_BUSINESS_MODULES) - set(modules))
    extra = sorted(set(modules) - set(EXPECTED_BUSINESS_MODULES))
    if missing:
        findings.append(f"business_modules missing: {missing}")
    if extra:
        findings.append(f"business_modules extra: {extra}")
    for module_id, module in modules.items():
        if not module.get("business_label"):
            findings.append(f"{module_id} missing business_label")
        if not module.get("writer_boundary"):
            findings.append(f"{module_id} missing writer_boundary")
        if "tables" not in module or "runtime_files" not in module:
            findings.append(f"{module_id} missing tables/runtime_files")


def _check_tables(
    registry: dict[str, Any],
    sql_tables: dict[str, Any],
    findings: list[str],
) -> None:
    """Check registry tables, columns and index counts against the SQL schema.

    Table names are compared in both directions. For each SQL table that the
    registry also describes, the table-level ownership keys, the column count,
    every column's type/nullability/default/storage kind, the per-column
    descriptive keys and the unique/secondary index counts are verified.
    """
    registry_tables = registry.get("tables", {})
    missing_tables = sorted(set(sql_tables) - set(registry_tables))
    extra_tables = sorted(set(registry_tables) - set(sql_tables))
    if missing_tables:
        findings.append(f"tables missing: {missing_tables}")
    if extra_tables:
        findings.append(f"tables extra: {extra_tables}")

    for table_name, sql_table in sql_tables.items():
        registry_table = registry_tables.get(table_name)
        if not registry_table:
            # Already reported through "tables missing" above.
            continue
        for key in ["owner_module", "writer_boundary", "reader_boundary"]:
            if not registry_table.get(key):
                findings.append(f"{table_name} missing table-level {key}")

        registry_columns = registry_table.get("columns", [])
        if len(registry_columns) != len(sql_table.columns):
            findings.append(
                f"{table_name} column count {len(registry_columns)} != {len(sql_table.columns)}"
            )

        by_name = {column.get("field_name"): column for column in registry_columns}
        for sql_column in sql_table.columns:
            registry_column = by_name.get(sql_column.name)
            if not registry_column:
                findings.append(f"{table_name}.{sql_column.name} missing from registry")
                continue
            if registry_column.get("db_type") != sql_column.column_type:
                findings.append(
                    f"{table_name}.{sql_column.name} db_type {registry_column.get('db_type')} != {sql_column.column_type}"
                )
            if registry_column.get("nullable") != sql_column.nullable:
                findings.append(f"{table_name}.{sql_column.name} nullable mismatch")
            if registry_column.get("default") != sql_column.default:
                findings.append(f"{table_name}.{sql_column.name} default mismatch")
            if registry_column.get("storage_kind") not in {"db_column", "json_column"}:
                findings.append(f"{table_name}.{sql_column.name} invalid storage_kind")
            if sql_column.is_json and registry_column.get("storage_kind") != "json_column":
                findings.append(f"{table_name}.{sql_column.name} should be json_column")
            for key in [
                "business_label",
                "meaning",
                "owner_module",
                "writer_boundary",
                "reader_boundary",
                "field_role",
                "promotion_status",
            ]:
                if not registry_column.get(key):
                    findings.append(f"{table_name}.{sql_column.name} missing {key}")

        # Only the number of indexes is compared, not their definitions.
        if len(registry_table.get("unique_indexes", [])) != len(sql_table.unique_indexes):
            findings.append(f"{table_name} unique index count mismatch")
        if len(registry_table.get("secondary_indexes", [])) != len(
            sql_table.secondary_indexes
        ):
            findings.append(f"{table_name} secondary index count mismatch")


def _check_runtime_files(
    registry: dict[str, Any],
    runtime_files: list[str],
    findings: list[str],
) -> None:
    """Check registry runtime files against the parsed runtime filenames.

    Filenames are compared in both directions, and every registered file must
    have truthy ``owner_module``, ``writer_boundary``, ``reader_boundary`` and
    ``mapped_table`` values.
    """
    registry_runtime_files = registry.get("runtime_files", {})
    missing = sorted(set(runtime_files) - set(registry_runtime_files))
    extra = sorted(set(registry_runtime_files) - set(runtime_files))
    if missing:
        findings.append(f"runtime_files missing: {missing}")
    if extra:
        findings.append(f"runtime_files extra: {extra}")
    for filename, spec in registry_runtime_files.items():
        if not spec.get("owner_module"):
            findings.append(f"{filename} missing owner_module")
        if not spec.get("writer_boundary"):
            findings.append(f"{filename} missing writer_boundary")
        if not spec.get("reader_boundary"):
            findings.append(f"{filename} missing reader_boundary")
        if not spec.get("mapped_table"):
            findings.append(f"{filename} missing mapped_table")


def _check_field_index(registry: dict[str, Any], findings: list[str]) -> None:
    """Check that every ``field_index`` entry lists existing table columns.

    Each entry must be a non-empty list whose items are ``"<table>.<field>"``
    references to columns declared under the registry's ``tables`` section.
    """
    # Columns without a "field_name" key are skipped rather than indexed
    # directly, so a malformed registry yields findings instead of a KeyError
    # traceback. This matches _check_tables, which reads the key with .get().
    valid_refs = {
        f"{table_name}.{column['field_name']}"
        for table_name, table in registry.get("tables", {}).items()
        for column in table.get("columns", [])
        if "field_name" in column
    }
    for field_name, refs in registry.get("field_index", {}).items():
        if not isinstance(refs, list) or not refs:
            findings.append(f"field_index.{field_name} must be non-empty list")
            continue
        for ref in refs:
            if ref not in valid_refs:
                findings.append(f"field_index.{field_name} invalid ref {ref}")


def _check_payload_policy(registry: dict[str, Any], findings: list[str]) -> None:
    """Check ``raw_payload_policy`` for required payload kinds and forbidden keys.

    All ``RAW_PAYLOAD_TERMS`` must appear in ``payload_kinds``, and
    ``forbidden_keys`` must contain each of the credential-style key names
    listed below.
    """
    policy = registry.get("raw_payload_policy", {})
    payload_kinds = set(policy.get("payload_kinds", {}))
    missing_terms = sorted(RAW_PAYLOAD_TERMS - payload_kinds)
    if missing_terms:
        findings.append(f"raw_payload_policy missing payload kinds: {missing_terms}")
    forbidden = set(policy.get("forbidden_keys", []))
    for key in ["password", "token", "api_key", "secret", "dsn"]:
        if key not in forbidden:
            findings.append(f"raw_payload_policy.forbidden_keys missing {key}")


def _check_migration_aliases(registry: dict[str, Any], findings: list[str]) -> None:
    """Check that legacy runtime filenames are declared as migration aliases.

    Every name in ``LEGACY_RUNTIME_NAMES`` must be a key of
    ``migration_aliases.legacy_runtime_files``. In addition, a legacy name
    that occurs somewhere in the serialized registry but nowhere in the
    serialized ``migration_aliases`` section is reported.
    """
    aliases = registry.get("migration_aliases", {})
    legacy_runtime_names = set((aliases.get("legacy_runtime_files") or {}).keys())
    missing = sorted(LEGACY_RUNTIME_NAMES - legacy_runtime_names)
    if missing:
        findings.append(f"migration_aliases missing legacy runtime names: {missing}")

    # Both serializations are loop-invariant, so they are built once.
    registry_text = json.dumps(registry, ensure_ascii=False)
    aliases_text = json.dumps(aliases, ensure_ascii=False)
    # NOTE(review): this is a plain substring test and it cannot fire when a
    # legacy name appears both inside migration_aliases and elsewhere in the
    # registry. Confirm whether a stricter "outside the alias section" check
    # is intended before tightening it.
    # Iterating in sorted order keeps the order of findings stable between
    # runs; iterating the set directly depends on string hash randomization.
    for legacy_name in sorted(LEGACY_RUNTIME_NAMES):
        if legacy_name in registry_text and legacy_name not in aliases_text:
            findings.append(f"{legacy_name} appears outside migration_aliases")


def _check_source_documents(registry: dict[str, Any], findings: list[str]) -> None:
    """Check that paths referenced by the registry exist under ``ROOT``.

    Covers every entry of ``source_documents`` and, when present, the
    ``database.sql_source`` path.
    """
    for rel_path in registry.get("source_documents", []):
        if not (ROOT / rel_path).exists():
            findings.append(f"source_documents missing path: {rel_path}")
    sql_source = registry.get("database", {}).get("sql_source")
    if sql_source and not (ROOT / sql_source).exists():
        findings.append(f"database.sql_source missing path: {sql_source}")


if __name__ == "__main__":
    main()