| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382 |
- from __future__ import annotations
- import argparse
- import json
- import sys
- from pathlib import Path
- from typing import Any
# Tables the validator expects to find in the target database.
EXPECTED_TABLES = [
    "content_agent_runs",
    "content_agent_source_contexts",
    "content_agent_pattern_seed_packs",
    "content_agent_queries",
    "content_agent_discovered_content_items",
    "content_agent_content_media_records",
    "content_agent_rule_decisions",
    "content_agent_walk_actions",
    "content_agent_source_path_records",
    "content_agent_search_clues",
    "content_agent_run_events",
    "content_agent_final_outputs",
    "content_agent_publish_jobs",
    "content_agent_author_assets",
    "content_agent_author_asset_roles",
    "content_agent_pattern_recall_evidence",
    "content_agent_strategy_reviews",
    "content_agent_policy_runs",
]

# Privileges that must be granted for the runtime-privilege check to pass.
REQUIRED_PRIVILEGES = ("SELECT", "INSERT", "UPDATE")

# Baseline columns required on each table (individual exemptions are applied below).
COMMON_COLUMNS = {"id", "schema_version", "run_id", "created_at"}

# Tables for which a policy_run_id column is NOT required.
_TABLES_WITHOUT_POLICY_RUN_ID = (
    "content_agent_runs",
    "content_agent_source_contexts",
    "content_agent_author_assets",
    "content_agent_author_asset_roles",
)
POLICY_RUN_TABLES = {
    name for name in EXPECTED_TABLES if name not in _TABLES_WITHOUT_POLICY_RUN_ID
}

# Start every table from the common columns, plus policy_run_id where it applies.
REQUIRED_COLUMNS_BY_TABLE = {
    name: COMMON_COLUMNS | ({"policy_run_id"} if name in POLICY_RUN_TABLES else set())
    for name in EXPECTED_TABLES
}

# Additional columns required on specific tables.
_TABLE_SPECIFIC_COLUMNS = {
    "content_agent_policy_runs": {
        "policy_bundle_id",
        "rule_pack_id",
        "strategy_id",
        "strategy_version",
        "rule_pack_version",
        "policy_bundle_hash",
        "strategy_source_ref",
        "rule_pack_source_ref",
        "evidence_bundle_schema_version",
        "runtime_record_schema_version",
    },
    "content_agent_content_media_records": {"platform"},
    "content_agent_author_assets": {
        "author_asset_id",
        "platform",
        "platform_author_id",
        "asset_status",
        "source_type",
        "validation_status",
        "eligible_as_source",
    },
    "content_agent_author_asset_roles": {
        "author_asset_id",
        "role",
        "role_status",
        "assigned_by",
    },
    "content_agent_walk_actions": {
        "walk_action_id",
        "edge_id",
        "walk_action",
        "walk_status",
    },
}
for table, _specific_columns in _TABLE_SPECIFIC_COLUMNS.items():
    REQUIRED_COLUMNS_BY_TABLE[table] |= _specific_columns

# run_id is not required on the author asset tables.
for table in ("content_agent_author_assets", "content_agent_author_asset_roles"):
    REQUIRED_COLUMNS_BY_TABLE[table].discard("run_id")

# Tables that must additionally expose a raw_payload column.
_RAW_PAYLOAD_TABLES = (
    "content_agent_queries",
    "content_agent_discovered_content_items",
    "content_agent_content_media_records",
    "content_agent_rule_decisions",
    "content_agent_walk_actions",
    "content_agent_source_path_records",
    "content_agent_search_clues",
    "content_agent_run_events",
    "content_agent_strategy_reviews",
)
for table in _RAW_PAYLOAD_TABLES:
    REQUIRED_COLUMNS_BY_TABLE[table].add("raw_payload")
def _policy_scoped(*columns: str) -> set[str]:
    """Return a fresh column set made of run_id, policy_run_id and *columns*."""
    return {"run_id", "policy_run_id", *columns}


# For each table, the column sets that must each be covered by a unique index.
# Column order inside an index is irrelevant here: sets are compared.
REQUIRED_UNIQUE_INDEXES_BY_TABLE = {
    "content_agent_runs": [{"run_id"}],
    "content_agent_source_contexts": [{"run_id"}],
    "content_agent_pattern_seed_packs": [_policy_scoped()],
    "content_agent_queries": [_policy_scoped("search_query_id")],
    "content_agent_discovered_content_items": [
        _policy_scoped("content_discovery_id"),
        _policy_scoped("platform", "platform_content_id"),
    ],
    "content_agent_content_media_records": [
        _policy_scoped("platform", "platform_content_id"),
    ],
    "content_agent_rule_decisions": [
        _policy_scoped("decision_id"),
        _policy_scoped("decision_target_type", "decision_target_id"),
    ],
    "content_agent_walk_actions": [_policy_scoped("walk_action_id")],
    "content_agent_source_path_records": [_policy_scoped("source_path_record_id")],
    "content_agent_search_clues": [
        _policy_scoped("clue_id"),
        _policy_scoped("search_query_id"),
    ],
    "content_agent_run_events": [_policy_scoped("event_id")],
    "content_agent_final_outputs": [_policy_scoped("output_version")],
    "content_agent_publish_jobs": [_policy_scoped("publish_job_id")],
    "content_agent_author_assets": [
        {"author_asset_id"},
        {"platform", "platform_author_id"},
    ],
    "content_agent_author_asset_roles": [{"author_asset_id", "role"}],
    "content_agent_pattern_recall_evidence": [_policy_scoped("recall_evidence_id")],
    "content_agent_strategy_reviews": [_policy_scoped("review_id")],
    "content_agent_policy_runs": [_policy_scoped()],
}

# Total number of required unique indexes across all tables.
EXPECTED_UNIQUE_INDEX_COUNT = sum(map(len, REQUIRED_UNIQUE_INDEXES_BY_TABLE.values()))
def _inspect_table(cur: Any, table: str) -> tuple[dict[str, Any], dict[str, Any]]:
    """Return the column report and the unique-index report for one table.

    Args:
        cur: Open cursor whose rows are addressed by column name
            (``SHOW COLUMNS`` / ``SHOW INDEX`` results).
        table: Table name; it is interpolated into the statements inside backticks.

    Returns:
        A ``(column_status, index_status)`` pair in the shape stored in the report.
    """
    cur.execute(f"SHOW COLUMNS FROM `{table}`")
    columns = {row["Field"] for row in cur.fetchall()}
    required_columns = REQUIRED_COLUMNS_BY_TABLE.get(table, set())
    column_status = {
        "has_id": "id" in columns,
        "has_run_id": "run_id" in columns,
        "has_created_at": "created_at" in columns,
        "has_schema_version": "schema_version" in columns,
        "has_policy_run_id": "policy_run_id" in columns,
        "missing_required_columns": sorted(required_columns - columns),
    }

    cur.execute(f"SHOW INDEX FROM `{table}`")
    # Group the columns of every unique index by index name.
    unique_indexes: dict[str, set[str]] = {}
    for row in cur.fetchall():
        if row["Non_unique"] != 0:
            continue
        column_name = row["Column_name"]
        if column_name is None:
            # Functional key parts have no column name. A placeholder keeps the
            # set str-only (so sorting below cannot fail) and can never equal a
            # required column set.
            column_name = "<expression>"
        unique_indexes.setdefault(row["Key_name"], set()).add(column_name)

    required_unique_indexes = REQUIRED_UNIQUE_INDEXES_BY_TABLE.get(table, [])
    # A requirement is met only by an index over exactly that column set.
    missing_required_unique_indexes = [
        sorted(required_unique)
        for required_unique in required_unique_indexes
        if not any(
            index_columns == required_unique for index_columns in unique_indexes.values()
        )
    ]
    index_status = {
        # Both spellings are emitted; _schema_ready reads the plural key.
        "has_required_unique_index": not missing_required_unique_indexes,
        "has_required_unique_indexes": not missing_required_unique_indexes,
        "required_unique_indexes": [
            sorted(required_unique) for required_unique in required_unique_indexes
        ],
        "missing_required_unique_indexes": missing_required_unique_indexes,
        "unique_indexes": {
            name: sorted(index_columns)
            for name, index_columns in sorted(unique_indexes.items())
        },
    }
    return column_status, index_status


def main() -> int:
    """Validate the Content Agent MySQL schema and print a JSON report to stdout.

    Connection settings come from the command line first and fall back to the
    ``CONTENT_SUPPLY_DB_*`` keys of the env file. The report covers the server
    identity, grants, the ``content_agent_*`` tables found, a demand-batch row
    count and per-table column / unique-index status.

    Returns:
        0 when validation passes or ``--allow-missing-tables`` is set;
        1 when tables, runtime privileges or schema requirements are missing,
        or when the database reports an error (the partial report is still
        printed); 2 when pymysql is not installed or the connection settings
        are missing or invalid.
    """
    args = _parse_args()
    try:
        import pymysql
    except ModuleNotFoundError:
        print(
            "Missing pymysql. Run with: uv run --with pymysql python scripts/validate_content_agent_db.py",
            file=sys.stderr,
        )
        return 2

    env = _load_env_file(args.env_file)
    # Explicit command-line values take precedence over the env file.
    raw_cfg = {
        "host": args.host or _env_value(env, "CONTENT_SUPPLY_DB_HOST"),
        "port": args.port or _env_value(env, "CONTENT_SUPPLY_DB_PORT"),
        "user": args.user or _env_value(env, "CONTENT_SUPPLY_DB_USER"),
        "password": args.password or _env_value(env, "CONTENT_SUPPLY_DB_PASSWORD"),
        "database": args.database or _env_value(env, "CONTENT_SUPPLY_DB_NAME"),
    }
    env_key_by_field = {
        "host": "CONTENT_SUPPLY_DB_HOST",
        "port": "CONTENT_SUPPLY_DB_PORT",
        "user": "CONTENT_SUPPLY_DB_USER",
        "password": "CONTENT_SUPPLY_DB_PASSWORD",
        "database": "CONTENT_SUPPLY_DB_NAME",
    }
    missing_keys = [
        env_key for field, env_key in env_key_by_field.items() if not raw_cfg[field]
    ]
    if missing_keys:
        print(
            f"Missing required CFA db env keys: {', '.join(missing_keys)}",
            file=sys.stderr,
        )
        return 2

    # A non-numeric port is a configuration error, reported like the others.
    try:
        port = int(raw_cfg["port"])
    except (TypeError, ValueError):
        print(
            f"Invalid CONTENT_SUPPLY_DB_PORT value: {raw_cfg['port']!r}",
            file=sys.stderr,
        )
        return 2

    cfg = {
        "host": raw_cfg["host"],
        "port": port,
        "user": raw_cfg["user"],
        "password": raw_cfg["password"],
        "database": raw_cfg["database"],
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
        "connect_timeout": args.timeout,
        "read_timeout": args.timeout,
        "write_timeout": args.timeout,
    }
    result: dict[str, Any] = {
        "connect": "pending",
        "host": cfg["host"],
        "database": cfg["database"],
        "expected_table_count": len(EXPECTED_TABLES),
        "expected_unique_index_count": EXPECTED_UNIQUE_INDEX_COUNT,
    }

    try:
        with pymysql.connect(**cfg) as conn, conn.cursor() as cur:
            cur.execute(
                "SELECT DATABASE() AS db_name, CURRENT_USER() AS current_user_name, "
                "@@version AS mysql_version"
            )
            meta = cur.fetchone()
            result.update(
                {
                    "connect": "ok",
                    "current_user": meta["current_user_name"],
                    "mysql_version": meta["mysql_version"],
                }
            )

            # Each SHOW GRANTS / SHOW TABLES row has a single column whose name
            # varies, so take the first value of every row.
            cur.execute("SHOW GRANTS FOR CURRENT_USER()")
            grants = [next(iter(row.values())) for row in cur.fetchall()]
            result["grants"] = grants
            result["has_runtime_privileges"] = _has_required_privileges(grants, cfg["database"])
            result["has_create_privilege"] = _has_create_privilege(grants, cfg["database"])

            cur.execute("SHOW TABLES LIKE 'content_agent_%'")
            found_tables = sorted(next(iter(row.values())) for row in cur.fetchall())
            result.update(
                {
                    "found_table_count": len(found_tables),
                    "found_tables": found_tables,
                    "missing_tables": sorted(set(EXPECTED_TABLES) - set(found_tables)),
                    "extra_content_agent_tables": sorted(
                        set(found_tables) - set(EXPECTED_TABLES)
                    ),
                }
            )

            # NOTE(review): this query names its database explicitly instead of
            # using the configured one - confirm that is intended.
            cur.execute(
                "SELECT COUNT(*) AS cnt "
                "FROM `content-deconstruction-supply`.demand_content "
                "WHERE JSON_UNQUOTE(JSON_EXTRACT(ext_data, '$.run_label')) LIKE %s",
                (args.run_label_like,),
            )
            result["demand_batch_count"] = cur.fetchone()["cnt"]

            column_status: dict[str, dict[str, Any]] = {}
            index_status: dict[str, dict[str, Any]] = {}
            for table in found_tables:
                column_status[table], index_status[table] = _inspect_table(cur, table)
            result["column_status"] = column_status
            result["index_status"] = index_status
    except pymysql.MySQLError as exc:
        # Still emit whatever was gathered so the failure is diagnosable from
        # the report; "connect" stays "ok" if the error came from a later query.
        if result["connect"] != "ok":
            result["connect"] = "failed"
        result["error"] = str(exc)
        print(json.dumps(result, ensure_ascii=False, indent=2, default=str))
        print(f"Content Agent db validation failed: {exc}", file=sys.stderr)
        return 1

    schema_ready = _schema_ready(result["column_status"], result["index_status"])
    result["schema_ready"] = schema_ready
    print(json.dumps(result, ensure_ascii=False, indent=2, default=str))

    # NOTE(review): the flag skips every check below, including the privilege
    # and schema checks - confirm that is intended.
    if args.allow_missing_tables:
        return 0
    if result["missing_tables"] or not result["has_runtime_privileges"] or not schema_ready:
        return 1
    return 0
def _schema_ready(
    column_status: dict[str, dict[str, Any]],
    index_status: dict[str, dict[str, Any]],
) -> bool:
    """Tell whether every expected table passed both the column and index checks.

    Args:
        column_status: Per-table column report keyed by table name.
        index_status: Per-table unique-index report keyed by table name.

    Returns:
        True only if each table in ``EXPECTED_TABLES`` has a column entry with
        no missing required columns and an index entry whose
        ``has_required_unique_indexes`` flag is truthy.
    """

    def table_ok(table: str) -> bool:
        columns = column_status.get(table)
        if not columns or columns.get("missing_required_columns"):
            return False
        indexes = index_status.get(table)
        return bool(indexes) and bool(indexes.get("has_required_unique_indexes"))

    return all(table_ok(table) for table in EXPECTED_TABLES)
- def _parse_args() -> argparse.Namespace:
- parser = argparse.ArgumentParser(description="Validate Content Agent cloud MySQL schema readiness.")
- parser.add_argument("--env-file", default=".env", help="Primary env file. Defaults to project .env.")
- parser.add_argument("--host", default=None)
- parser.add_argument("--port", default=None)
- parser.add_argument("--database", default=None)
- parser.add_argument("--user", default=None)
- parser.add_argument("--password", default=None)
- parser.add_argument("--timeout", type=int, default=8)
- parser.add_argument("--run-label-like", default="cfa_mysql_100_20260607_batch%")
- parser.add_argument(
- "--allow-missing-tables",
- action="store_true",
- help="Return success even when content_agent_* tables have not been created yet.",
- )
- return parser.parse_args()
- def _load_env_file(path_value: str | None) -> dict[str, str]:
- if not path_value:
- return {}
- path = Path(path_value)
- if not path.exists():
- return {}
- result: dict[str, str] = {}
- for line in path.read_text(encoding="utf-8").splitlines():
- stripped = line.strip()
- if not stripped or stripped.startswith("#") or "=" not in stripped:
- continue
- key, value = stripped.split("=", 1)
- result[key.strip()] = value.strip().strip('"').strip("'")
- return result
- def _env_value(env: dict[str, str], *keys: str, default: str | None = None) -> str | None:
- for key in keys:
- value = env.get(key)
- if value:
- return value
- return default
- def _has_required_privileges(grants: list[str], database: str) -> bool:
- scoped_grants = _content_agent_scoped_grants(grants, database)
- normalized = " ".join(scoped_grants).upper()
- return all(privilege in normalized for privilege in REQUIRED_PRIVILEGES) or "ALL PRIVILEGES" in normalized
- def _has_create_privilege(grants: list[str], database: str) -> bool:
- scoped_grants = _database_or_global_grants(grants, database)
- normalized = " ".join(scoped_grants).upper()
- return "CREATE" in normalized or "ALL PRIVILEGES" in normalized
- def _content_agent_scoped_grants(grants: list[str], database: str) -> list[str]:
- scoped: list[str] = []
- db_scope = f"ON `{database}`.*"
- for grant in grants:
- if "ON *.*" in grant or db_scope in grant:
- scoped.append(grant)
- continue
- if f"ON `{database}`.`content_agent_" in grant:
- scoped.append(grant)
- return scoped
- def _database_or_global_grants(grants: list[str], database: str) -> list[str]:
- db_scope = f"ON `{database}`.*"
- return [grant for grant in grants if "ON *.*" in grant or db_scope in grant]
# Script entry point: the return value of main() becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|