"""Count tables, columns and indexes declared in the content-agent SQL schema.

The script parses ``CREATE TABLE IF NOT EXISTS content_agent_*`` statements
from ``SQL_PATH``, reads the ``RUNTIME_FILENAMES`` literal from
``RUNTIME_FILES_PATH``, optionally loads the JSON registry at
``REGISTRY_PATH``, and writes a JSON report plus a Markdown report next to the
registry.
"""

from __future__ import annotations

import ast
import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any

ROOT = Path(__file__).resolve().parents[1]
SQL_PATH = ROOT / "sql/content_agent_schema.sql"
RUNTIME_FILES_PATH = ROOT / "content_agent/integrations/runtime_files.py"
REGISTRY_PATH = ROOT / "tech_documents/数据库字段总览/content_agent_schema_registry.json"
REPORT_JSON = ROOT / "tech_documents/数据库字段总览/content_agent_schema_registry_count_report.json"
REPORT_MD = ROOT / "tech_documents/数据库字段总览/content_agent_schema_registry_count_report.md"

EXPECTED_BUSINESS_MODULES = [
    "source_seed",
    "search_intent",
    "platform_access",
    "content_discovery",
    "rule_judgment",
    "walk_strategy",
    "policy_version",
    "result_source_lookup",
    "run_record",
    "learning_review",
]

# One ``CREATE TABLE IF NOT EXISTS content_agent_*`` statement: group 1 is the
# table name, group 2 is everything between the outer parentheses.
_CREATE_TABLE_RE = re.compile(
    r"CREATE TABLE IF NOT EXISTS\s+`?(content_agent_[A-Za-z0-9_]+)`?\s*\((.*?)\)\s+ENGINE=",
    re.S,
)

# Table-body entries that are keys/constraints rather than column definitions.
# The trailing ``\b`` keeps unquoted column names such as ``KEYWORD`` eligible.
_NON_COLUMN_RE = re.compile(
    r"(?:PRIMARY\s+KEY|UNIQUE\s+(?:KEY|INDEX)|FOREIGN\s+KEY|FULLTEXT|SPATIAL"
    r"|KEY|INDEX|CONSTRAINT|CHECK)\b",
    re.I,
)

_COLUMN_LINE_RE = re.compile(r"`?([A-Za-z_][A-Za-z0-9_]*)`?\s+(.+)$")

# ``COMMENT '<text>'`` clause, allowing backslash escapes and doubled quotes.
_COMMENT_CLAUSE_RE = re.compile(
    r"\bCOMMENT\s+(?:'(?:[^'\\]|\\.|'')*'|\"(?:[^\"\\]|\\.|\"\")*\")",
    re.I,
)

_DEFAULT_RE = re.compile(
    r"\bDEFAULT\s+((?:'[^']*')|(?:[A-Za-z0-9_().+-]+))",
    re.I,
)

_INDEX_LINE_RE = re.compile(
    r"(UNIQUE\s+)?(?:KEY|INDEX)\s+`?([A-Za-z0-9_]+)`?\s*\((.+)\)",
    re.I,
)

# Keywords that end the column type and start the column attributes.
_TYPE_TERMINATORS = frozenset(
    {"NOT", "NULL", "DEFAULT", "AUTO_INCREMENT", "ON", "COMMENT"}
)

_QUOTE_CHARS = frozenset({"'", '"', "`"})


@dataclass(frozen=True)
class ColumnDef:
    """One column parsed from a ``CREATE TABLE`` body.

    Attributes:
        name: Column name without backticks.
        column_type: Type text as written, e.g. ``varchar(512)``.
        data_type: Upper-cased ``column_type`` with any ``(...)`` suffix removed.
        nullable: ``False`` when the definition contains ``NOT NULL``.
        default: Raw text following ``DEFAULT`` (quotes kept), or ``None``.
        extra: ``AUTO_INCREMENT`` and/or the ``ON UPDATE ...`` clause, or ``None``.
        ordinal_position: 1-based position among the table's parsed columns.
    """

    name: str
    column_type: str
    data_type: str
    nullable: bool
    default: str | None
    extra: str | None
    ordinal_position: int

    @property
    def is_json(self) -> bool:
        """Return ``True`` when the column's data type is exactly ``JSON``."""
        return self.data_type.upper() == "JSON"


@dataclass(frozen=True)
class IndexDef:
    """One ``KEY`` / ``UNIQUE KEY`` entry parsed from a ``CREATE TABLE`` body."""

    name: str
    columns: list[str]
    unique: bool


@dataclass(frozen=True)
class TableDef:
    """A parsed table: its columns plus unique and non-unique indexes."""

    name: str
    columns: list[ColumnDef]
    unique_indexes: list[IndexDef]
    secondary_indexes: list[IndexDef]


def main() -> None:
    """Build the count report, write the JSON and Markdown files, print the summary."""
    sql_tables = parse_sql_schema(SQL_PATH)
    runtime_files = parse_runtime_filenames(RUNTIME_FILES_PATH)
    registry = _load_registry()
    report = build_report(sql_tables, runtime_files, registry)
    REPORT_JSON.parent.mkdir(parents=True, exist_ok=True)
    REPORT_JSON.write_text(
        json.dumps(report, ensure_ascii=False, indent=2) + "\n",
        encoding="utf-8",
    )
    REPORT_MD.write_text(_render_markdown(report), encoding="utf-8")
    print(json.dumps(report["summary"], ensure_ascii=False))


def parse_sql_schema(path: Path = SQL_PATH) -> dict[str, TableDef]:
    """Parse every ``content_agent_*`` table declared in the SQL file at *path*.

    Args:
        path: UTF-8 SQL file containing ``CREATE TABLE IF NOT EXISTS`` statements
            terminated by ``) ENGINE=``.

    Returns:
        Mapping of table name to its parsed definition, in file order. A table
        declared more than once keeps the last definition.
    """
    text = path.read_text(encoding="utf-8")
    tables: dict[str, TableDef] = {}
    for table_name, body in _CREATE_TABLE_RE.findall(text):
        columns: list[ColumnDef] = []
        unique_indexes: list[IndexDef] = []
        secondary_indexes: list[IndexDef] = []
        for line in _split_table_body(body):
            parsed_column = _parse_column_line(line, len(columns) + 1)
            if parsed_column:
                columns.append(parsed_column)
                continue
            parsed_index = _parse_index_line(line)
            if parsed_index is None:
                # PRIMARY KEY, constraints and unsupported key kinds are not counted.
                continue
            if parsed_index.unique:
                unique_indexes.append(parsed_index)
            else:
                secondary_indexes.append(parsed_index)
        tables[table_name] = TableDef(
            name=table_name,
            columns=columns,
            unique_indexes=unique_indexes,
            secondary_indexes=secondary_indexes,
        )
    return tables


def parse_runtime_filenames(path: Path = RUNTIME_FILES_PATH) -> list[str]:
    """Read the module-level ``RUNTIME_FILENAMES`` literal from the file at *path*.

    The file is parsed with ``ast`` and never executed. Both plain and annotated
    assignments are recognised, and the value may be a list or a tuple literal.

    Raises:
        ValueError: If no top-level ``RUNTIME_FILENAMES`` list/tuple literal is
            found (``ast.literal_eval`` also raises ``ValueError`` for a
            non-literal value).
    """
    tree = ast.parse(path.read_text(encoding="utf-8"))
    for node in tree.body:
        if isinstance(node, ast.Assign):
            names = {
                target.id for target in node.targets if isinstance(target, ast.Name)
            }
            value_node = node.value
        elif isinstance(node, ast.AnnAssign) and isinstance(node.target, ast.Name):
            names = {node.target.id}
            value_node = node.value  # None for a bare annotation
        else:
            continue
        if "RUNTIME_FILENAMES" not in names or value_node is None:
            continue
        value = ast.literal_eval(value_node)
        if isinstance(value, (list, tuple)):
            return [str(item) for item in value]
    raise ValueError("RUNTIME_FILENAMES not found")


def build_report(
    sql_tables: dict[str, TableDef],
    runtime_files: list[str],
    registry: dict[str, Any] | None,
) -> dict[str, Any]:
    """Assemble the count report as a JSON-serialisable dict.

    Args:
        sql_tables: Tables as returned by :func:`parse_sql_schema`.
        runtime_files: File names as returned by :func:`parse_runtime_filenames`.
        registry: Parsed registry JSON, or ``None``/empty when unavailable; in
            that case ``registry_summary`` is an empty dict.

    Returns:
        Dict with ``summary``, ``source_files``, ``tables``, ``runtime_files``,
        ``business_modules`` and ``registry_summary`` keys.
    """
    table_summaries = {
        name: {
            "column_count": len(table.columns),
            "json_column_count": sum(1 for column in table.columns if column.is_json),
            "unique_index_count": len(table.unique_indexes),
            "secondary_index_count": len(table.secondary_indexes),
            "json_columns": [column.name for column in table.columns if column.is_json],
        }
        for name, table in sql_tables.items()
    }
    all_tables = list(sql_tables.values())
    summary = {
        "table_count": len(sql_tables),
        "sql_column_count": sum(len(table.columns) for table in all_tables),
        "json_column_count": sum(
            1 for table in all_tables for column in table.columns if column.is_json
        ),
        "unique_index_count": sum(len(table.unique_indexes) for table in all_tables),
        "secondary_index_count": sum(
            len(table.secondary_indexes) for table in all_tables
        ),
        "business_module_count": len(EXPECTED_BUSINESS_MODULES),
        "runtime_file_count": len(runtime_files),
    }
    registry_summary: dict[str, Any] = {}
    if registry:
        registry_summary = {
            "table_count": len(registry.get("tables", {})),
            "business_module_count": len(registry.get("business_modules", {})),
            "runtime_file_count": len(registry.get("runtime_files", {})),
            "field_reference_count": sum(
                len(table.get("columns", []))
                for table in registry.get("tables", {}).values()
            ),
        }
    return {
        "summary": summary,
        "source_files": {
            "sql": str(SQL_PATH.relative_to(ROOT)),
            "runtime_files": str(RUNTIME_FILES_PATH.relative_to(ROOT)),
            "registry": str(REGISTRY_PATH.relative_to(ROOT)),
        },
        "tables": table_summaries,
        "runtime_files": runtime_files,
        "business_modules": EXPECTED_BUSINESS_MODULES,
        "registry_summary": registry_summary,
    }


def _load_registry() -> dict[str, Any] | None:
    """Return the parsed registry JSON, or ``None`` when the file does not exist."""
    if not REGISTRY_PATH.exists():
        return None
    return json.loads(REGISTRY_PATH.read_text(encoding="utf-8"))


def _split_table_body(body: str) -> list[str]:
    """Split a ``CREATE TABLE`` body into its top-level comma-separated entries.

    Commas and parentheses inside quoted text (``'...'``, ``"..."`` or
    backticks) are ignored, so a ``COMMENT 'a, b (c'`` clause neither splits
    an entry nor disturbs the parenthesis depth. Newlines inside an entry are
    replaced by spaces and empty entries are dropped.
    """
    entries: list[str] = []
    current: list[str] = []
    depth = 0
    quote: str | None = None
    escaped = False
    for char in body:
        if quote is not None:
            current.append(char)
            if escaped:
                escaped = False
            elif char == "\\" and quote != "`":
                # Backslash escapes apply to string literals, not identifiers.
                escaped = True
            elif char == quote:
                # A doubled quote ('') closes and immediately reopens the
                # literal, which leaves the split points unchanged.
                quote = None
            continue
        if char in _QUOTE_CHARS:
            quote = char
            current.append(char)
            continue
        if char == "(":
            depth += 1
        elif char == ")":
            depth = max(depth - 1, 0)
        if char == "," and depth == 0:
            entry = "".join(current).strip()
            if entry:
                entries.append(entry)
            current = []
        else:
            current.append(char)
    entry = "".join(current).strip()
    if entry:
        entries.append(entry)
    return [entry.replace("\n", " ").strip() for entry in entries]


def _parse_column_line(line: str, ordinal_position: int) -> ColumnDef | None:
    """Parse one table-body entry as a column definition.

    Args:
        line: A single entry produced by :func:`_split_table_body`.
        ordinal_position: 1-based position to record on the resulting column.

    Returns:
        The parsed column, or ``None`` when the entry is a key/constraint or
        does not look like ``name type ...``.
    """
    if _NON_COLUMN_RE.match(line):
        return None
    match = _COLUMN_LINE_RE.match(line)
    if not match:
        return None
    name, rest = match.groups()
    # Drop the COMMENT clause first so free-form comment text can neither be
    # mistaken for NOT NULL / DEFAULT / ON UPDATE nor leak into ``extra``.
    definition = _COMMENT_CLAUSE_RE.sub("", rest).strip()

    type_tokens: list[str] = []
    for token in definition.split():
        if token.upper() in _TYPE_TERMINATORS:
            break
        type_tokens.append(token)
    column_type = " ".join(type_tokens)
    data_type = column_type.split("(", 1)[0].upper()

    nullable = re.search(r"\bNOT\s+NULL\b", definition, flags=re.I) is None
    default_match = _DEFAULT_RE.search(definition)
    default = default_match.group(1) if default_match else None

    extras: list[str] = []
    if re.search(r"\bAUTO_INCREMENT\b", definition, flags=re.I):
        extras.append("AUTO_INCREMENT")
    on_update_match = re.search(r"\bON\s+UPDATE\b.*$", definition, flags=re.I)
    if on_update_match:
        extras.append(on_update_match.group(0).strip())

    return ColumnDef(
        name=name,
        column_type=column_type,
        data_type=data_type,
        nullable=nullable,
        default=default,
        extra=" ".join(extras) or None,
        ordinal_position=ordinal_position,
    )


def _index_column_name(part: str) -> str:
    """Return the bare column name from one index part such as `` `url`(191) DESC``."""
    text = re.sub(r"\s+(?:ASC|DESC)$", "", part.strip(), flags=re.I)
    # Remove the prefix length before the backticks; the other order leaves a
    # trailing backtick behind for `` `url`(191) ``.
    text = re.sub(r"\s*\(\d+\)$", "", text)
    return text.strip("`")


def _parse_index_line(line: str) -> IndexDef | None:
    """Parse a ``KEY`` / ``INDEX`` / ``UNIQUE KEY`` entry; return ``None`` otherwise.

    ``PRIMARY KEY`` and other key kinds do not match and are therefore not
    reported as indexes.
    """
    match = _INDEX_LINE_RE.match(line)
    if not match:
        return None
    unique_prefix, name, columns_text = match.groups()
    columns = [_index_column_name(part) for part in columns_text.split(",")]
    return IndexDef(name=name, columns=columns, unique=unique_prefix is not None)


def _render_markdown(report: dict[str, Any]) -> str:
    """Render *report* (as built by :func:`build_report`) as a Markdown document."""
    summary = report["summary"]
    lines = [
        "# Schema Registry 计数报告",
        "",
        f"- 数据库表数量:`{summary['table_count']}`",
        f"- SQL 字段数量:`{summary['sql_column_count']}`",
        f"- JSON 字段数量:`{summary['json_column_count']}`",
        f"- 唯一键数量:`{summary['unique_index_count']}`",
        f"- 二级索引数量:`{summary['secondary_index_count']}`",
        f"- 业务模块数量:`{summary['business_module_count']}`",
        f"- Runtime 文件数量:`{summary['runtime_file_count']}`",
        "",
        "## 按表计数",
        "",
        "| 表 | 字段 | JSON 字段 | 唯一键 | 二级索引 |",
        "|---|---:|---:|---:|---:|",
    ]
    for table_name, table in report["tables"].items():
        lines.append(
            "| {name} | {columns} | {json_columns} | {unique_indexes} | {secondary_indexes} |".format(
                name=f"`{table_name}`",
                columns=table["column_count"],
                json_columns=table["json_column_count"],
                unique_indexes=table["unique_index_count"],
                secondary_indexes=table["secondary_index_count"],
            )
        )
    lines.extend(["", "## Runtime 文件", ""])
    lines.extend(f"- `{filename}`" for filename in report["runtime_files"])
    lines.append("")
    return "\n".join(lines)


if __name__ == "__main__":
    main()