| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- from __future__ import annotations
- import ast
- import json
- import re
- from dataclasses import dataclass
- from pathlib import Path
- from typing import Any
- ROOT = Path(__file__).resolve().parents[1]
- SQL_PATH = ROOT / "sql/content_agent_schema.sql"
- RUNTIME_FILES_PATH = ROOT / "content_agent/integrations/runtime_files.py"
- REGISTRY_PATH = ROOT / "tech_documents/数据库字段总览/content_agent_schema_registry.json"
- REPORT_JSON = ROOT / "tech_documents/数据库字段总览/content_agent_schema_registry_count_report.json"
- REPORT_MD = ROOT / "tech_documents/数据库字段总览/content_agent_schema_registry_count_report.md"
- EXPECTED_BUSINESS_MODULES = [
- "source_seed",
- "search_intent",
- "platform_access",
- "content_discovery",
- "rule_judgment",
- "walk_strategy",
- "policy_version",
- "result_source_lookup",
- "run_record",
- "learning_review",
- ]
- @dataclass(frozen=True)
- class ColumnDef:
- name: str
- column_type: str
- data_type: str
- nullable: bool
- default: str | None
- extra: str | None
- ordinal_position: int
- @property
- def is_json(self) -> bool:
- return self.data_type.upper() == "JSON"
- @dataclass(frozen=True)
- class IndexDef:
- name: str
- columns: list[str]
- unique: bool
- @dataclass(frozen=True)
- class TableDef:
- name: str
- columns: list[ColumnDef]
- unique_indexes: list[IndexDef]
- secondary_indexes: list[IndexDef]
- def main() -> None:
- sql_tables = parse_sql_schema(SQL_PATH)
- runtime_files = parse_runtime_filenames(RUNTIME_FILES_PATH)
- registry = _load_registry()
- report = build_report(sql_tables, runtime_files, registry)
- REPORT_JSON.parent.mkdir(parents=True, exist_ok=True)
- REPORT_JSON.write_text(
- json.dumps(report, ensure_ascii=False, indent=2) + "\n",
- encoding="utf-8",
- )
- REPORT_MD.write_text(_render_markdown(report), encoding="utf-8")
- print(json.dumps(report["summary"], ensure_ascii=False))
- def parse_sql_schema(path: Path = SQL_PATH) -> dict[str, TableDef]:
- text = path.read_text(encoding="utf-8")
- tables: dict[str, TableDef] = {}
- pattern = re.compile(
- r"CREATE TABLE IF NOT EXISTS\s+`?(content_agent_[A-Za-z0-9_]+)`?\s*\((.*?)\)\s+ENGINE=",
- re.S,
- )
- for table_name, body in pattern.findall(text):
- columns: list[ColumnDef] = []
- unique_indexes: list[IndexDef] = []
- secondary_indexes: list[IndexDef] = []
- for line in _split_table_body(body):
- parsed_column = _parse_column_line(line, len(columns) + 1)
- if parsed_column:
- columns.append(parsed_column)
- continue
- parsed_index = _parse_index_line(line)
- if parsed_index:
- if parsed_index.unique:
- unique_indexes.append(parsed_index)
- else:
- secondary_indexes.append(parsed_index)
- tables[table_name] = TableDef(
- name=table_name,
- columns=columns,
- unique_indexes=unique_indexes,
- secondary_indexes=secondary_indexes,
- )
- return tables
- def parse_runtime_filenames(path: Path = RUNTIME_FILES_PATH) -> list[str]:
- tree = ast.parse(path.read_text(encoding="utf-8"))
- for node in tree.body:
- if isinstance(node, ast.Assign):
- targets = [target.id for target in node.targets if isinstance(target, ast.Name)]
- if "RUNTIME_FILENAMES" in targets:
- value = ast.literal_eval(node.value)
- if isinstance(value, list):
- return [str(item) for item in value]
- raise ValueError("RUNTIME_FILENAMES not found")
- def build_report(
- sql_tables: dict[str, TableDef],
- runtime_files: list[str],
- registry: dict[str, Any] | None,
- ) -> dict[str, Any]:
- table_summaries = {
- name: {
- "column_count": len(table.columns),
- "json_column_count": sum(1 for column in table.columns if column.is_json),
- "unique_index_count": len(table.unique_indexes),
- "secondary_index_count": len(table.secondary_indexes),
- "json_columns": [column.name for column in table.columns if column.is_json],
- }
- for name, table in sql_tables.items()
- }
- summary = {
- "table_count": len(sql_tables),
- "sql_column_count": sum(len(table.columns) for table in sql_tables.values()),
- "json_column_count": sum(
- 1 for table in sql_tables.values() for column in table.columns if column.is_json
- ),
- "unique_index_count": sum(len(table.unique_indexes) for table in sql_tables.values()),
- "secondary_index_count": sum(
- len(table.secondary_indexes) for table in sql_tables.values()
- ),
- "business_module_count": len(EXPECTED_BUSINESS_MODULES),
- "runtime_file_count": len(runtime_files),
- }
- registry_summary: dict[str, Any] = {}
- if registry:
- registry_summary = {
- "table_count": len(registry.get("tables", {})),
- "business_module_count": len(registry.get("business_modules", {})),
- "runtime_file_count": len(registry.get("runtime_files", {})),
- "field_reference_count": sum(
- len(table.get("columns", [])) for table in registry.get("tables", {}).values()
- ),
- }
- return {
- "summary": summary,
- "source_files": {
- "sql": str(SQL_PATH.relative_to(ROOT)),
- "runtime_files": str(RUNTIME_FILES_PATH.relative_to(ROOT)),
- "registry": str(REGISTRY_PATH.relative_to(ROOT)),
- },
- "tables": table_summaries,
- "runtime_files": runtime_files,
- "business_modules": EXPECTED_BUSINESS_MODULES,
- "registry_summary": registry_summary,
- }
- def _load_registry() -> dict[str, Any] | None:
- if not REGISTRY_PATH.exists():
- return None
- return json.loads(REGISTRY_PATH.read_text(encoding="utf-8"))
- def _split_table_body(body: str) -> list[str]:
- lines: list[str] = []
- current: list[str] = []
- depth = 0
- for char in body:
- if char == "(":
- depth += 1
- elif char == ")":
- depth -= 1
- if char == "," and depth == 0:
- line = "".join(current).strip()
- if line:
- lines.append(line)
- current = []
- else:
- current.append(char)
- line = "".join(current).strip()
- if line:
- lines.append(line)
- return [line.replace("\n", " ").strip() for line in lines]
- def _parse_column_line(line: str, ordinal_position: int) -> ColumnDef | None:
- if line.startswith(("PRIMARY KEY", "UNIQUE KEY", "KEY", "CONSTRAINT")):
- return None
- match = re.match(r"`?([A-Za-z_][A-Za-z0-9_]*)`?\s+(.+)$", line)
- if not match:
- return None
- name, rest = match.groups()
- tokens = rest.split()
- column_type_tokens: list[str] = []
- for token in tokens:
- if token.upper() in {"NOT", "NULL", "DEFAULT", "AUTO_INCREMENT", "ON", "COMMENT"}:
- break
- column_type_tokens.append(token)
- column_type = " ".join(column_type_tokens)
- data_type = column_type.split("(", 1)[0].upper()
- nullable = "NOT NULL" not in rest.upper()
- default_match = re.search(
- r"\bDEFAULT\s+((?:'[^']*')|(?:[A-Za-z0-9_().+-]+))",
- rest,
- flags=re.I,
- )
- default = default_match.group(1) if default_match else None
- extras: list[str] = []
- if "AUTO_INCREMENT" in rest.upper():
- extras.append("AUTO_INCREMENT")
- if "ON UPDATE" in rest.upper():
- extras.append(rest[rest.upper().index("ON UPDATE") :])
- return ColumnDef(
- name=name,
- column_type=column_type,
- data_type=data_type,
- nullable=nullable,
- default=default,
- extra=" ".join(extras) or None,
- ordinal_position=ordinal_position,
- )
- def _parse_index_line(line: str) -> IndexDef | None:
- unique = line.startswith("UNIQUE KEY")
- if not unique and not line.startswith("KEY"):
- return None
- match = re.match(r"(?:UNIQUE\s+)?KEY\s+`?([A-Za-z0-9_]+)`?\s*\((.+)\)", line)
- if not match:
- return None
- name, columns_text = match.groups()
- columns = [
- re.sub(r"\(\d+\)$", "", part.strip().strip("`"))
- for part in columns_text.split(",")
- ]
- return IndexDef(name=name, columns=columns, unique=unique)
- def _render_markdown(report: dict[str, Any]) -> str:
- summary = report["summary"]
- lines = [
- "# Schema Registry 计数报告",
- "",
- f"- 数据库表数量:`{summary['table_count']}`",
- f"- SQL 字段数量:`{summary['sql_column_count']}`",
- f"- JSON 字段数量:`{summary['json_column_count']}`",
- f"- 唯一键数量:`{summary['unique_index_count']}`",
- f"- 二级索引数量:`{summary['secondary_index_count']}`",
- f"- 业务模块数量:`{summary['business_module_count']}`",
- f"- Runtime 文件数量:`{summary['runtime_file_count']}`",
- "",
- "## 按表计数",
- "",
- "| 表 | 字段 | JSON 字段 | 唯一键 | 二级索引 |",
- "|---|---:|---:|---:|---:|",
- ]
- for table_name, table in report["tables"].items():
- lines.append(
- "| {name} | {columns} | {json_columns} | {unique_indexes} | {secondary_indexes} |".format(
- name=f"`{table_name}`",
- columns=table["column_count"],
- json_columns=table["json_column_count"],
- unique_indexes=table["unique_index_count"],
- secondary_indexes=table["secondary_index_count"],
- )
- )
- lines.extend(["", "## Runtime 文件", ""])
- for filename in report["runtime_files"]:
- lines.append(f"- `{filename}`")
- lines.append("")
- return "\n".join(lines)
- if __name__ == "__main__":
- main()
|