# validate_content_agent_db.py

  1. from __future__ import annotations
  2. import argparse
  3. import json
  4. import sys
  5. from pathlib import Path
  6. from typing import Any
  7. EXPECTED_TABLES = [
  8. "content_agent_runs",
  9. "content_agent_source_contexts",
  10. "content_agent_pattern_seed_packs",
  11. "content_agent_queries",
  12. "content_agent_discovered_content_items",
  13. "content_agent_content_media_records",
  14. "content_agent_rule_decisions",
  15. "content_agent_walk_actions",
  16. "content_agent_source_path_records",
  17. "content_agent_search_clues",
  18. "content_agent_run_events",
  19. "content_agent_final_outputs",
  20. "content_agent_publish_jobs",
  21. "content_agent_author_assets",
  22. "content_agent_author_asset_roles",
  23. "content_agent_pattern_recall_evidence",
  24. "content_agent_strategy_reviews",
  25. "content_agent_policy_runs",
  26. ]
  27. REQUIRED_PRIVILEGES = ("SELECT", "INSERT", "UPDATE")
  28. COMMON_COLUMNS = {"id", "schema_version", "run_id", "created_at"}
  29. POLICY_RUN_TABLES = set(EXPECTED_TABLES) - {
  30. "content_agent_runs",
  31. "content_agent_source_contexts",
  32. "content_agent_author_assets",
  33. "content_agent_author_asset_roles",
  34. }
  35. REQUIRED_COLUMNS_BY_TABLE = {
  36. table: set(COMMON_COLUMNS) | ({"policy_run_id"} if table in POLICY_RUN_TABLES else set())
  37. for table in EXPECTED_TABLES
  38. }
  39. REQUIRED_COLUMNS_BY_TABLE["content_agent_policy_runs"].update(
  40. {
  41. "policy_bundle_id",
  42. "rule_pack_id",
  43. "strategy_id",
  44. "strategy_version",
  45. "rule_pack_version",
  46. "policy_bundle_hash",
  47. "strategy_source_ref",
  48. "rule_pack_source_ref",
  49. "evidence_bundle_schema_version",
  50. "runtime_record_schema_version",
  51. }
  52. )
  53. REQUIRED_COLUMNS_BY_TABLE["content_agent_content_media_records"].add("platform")
  54. REQUIRED_COLUMNS_BY_TABLE["content_agent_author_assets"].update(
  55. {
  56. "author_asset_id",
  57. "platform",
  58. "platform_author_id",
  59. "asset_status",
  60. "source_type",
  61. "validation_status",
  62. "eligible_as_source",
  63. }
  64. )
  65. REQUIRED_COLUMNS_BY_TABLE["content_agent_author_assets"].discard("run_id")
  66. REQUIRED_COLUMNS_BY_TABLE["content_agent_author_asset_roles"].update(
  67. {
  68. "author_asset_id",
  69. "role",
  70. "role_status",
  71. "assigned_by",
  72. }
  73. )
  74. REQUIRED_COLUMNS_BY_TABLE["content_agent_author_asset_roles"].discard("run_id")
  75. REQUIRED_COLUMNS_BY_TABLE["content_agent_walk_actions"].update(
  76. {
  77. "walk_action_id",
  78. "edge_id",
  79. "walk_action",
  80. "walk_status",
  81. "raw_payload",
  82. }
  83. )
  84. for table in [
  85. "content_agent_queries",
  86. "content_agent_discovered_content_items",
  87. "content_agent_content_media_records",
  88. "content_agent_rule_decisions",
  89. "content_agent_walk_actions",
  90. "content_agent_source_path_records",
  91. "content_agent_search_clues",
  92. "content_agent_run_events",
  93. "content_agent_strategy_reviews",
  94. ]:
  95. REQUIRED_COLUMNS_BY_TABLE[table].add("raw_payload")
  96. REQUIRED_UNIQUE_INDEXES_BY_TABLE = {
  97. "content_agent_runs": [{"run_id"}],
  98. "content_agent_source_contexts": [{"run_id"}],
  99. "content_agent_pattern_seed_packs": [{"run_id", "policy_run_id"}],
  100. "content_agent_queries": [{"run_id", "policy_run_id", "search_query_id"}],
  101. "content_agent_discovered_content_items": [
  102. {"run_id", "policy_run_id", "content_discovery_id"},
  103. {"run_id", "policy_run_id", "platform", "platform_content_id"},
  104. ],
  105. "content_agent_content_media_records": [
  106. {"run_id", "policy_run_id", "platform", "platform_content_id"}
  107. ],
  108. "content_agent_rule_decisions": [
  109. {"run_id", "policy_run_id", "decision_id"},
  110. {"run_id", "policy_run_id", "decision_target_type", "decision_target_id"},
  111. ],
  112. "content_agent_walk_actions": [
  113. {"run_id", "policy_run_id", "walk_action_id"},
  114. ],
  115. "content_agent_source_path_records": [
  116. {"run_id", "policy_run_id", "source_path_record_id"}
  117. ],
  118. "content_agent_search_clues": [
  119. {"run_id", "policy_run_id", "clue_id"},
  120. {"run_id", "policy_run_id", "search_query_id"},
  121. ],
  122. "content_agent_run_events": [{"run_id", "policy_run_id", "event_id"}],
  123. "content_agent_final_outputs": [{"run_id", "policy_run_id", "output_version"}],
  124. "content_agent_publish_jobs": [{"run_id", "policy_run_id", "publish_job_id"}],
  125. "content_agent_author_assets": [
  126. {"author_asset_id"},
  127. {"platform", "platform_author_id"},
  128. ],
  129. "content_agent_author_asset_roles": [{"author_asset_id", "role"}],
  130. "content_agent_pattern_recall_evidence": [
  131. {"run_id", "policy_run_id", "recall_evidence_id"}
  132. ],
  133. "content_agent_strategy_reviews": [{"run_id", "policy_run_id", "review_id"}],
  134. "content_agent_policy_runs": [{"run_id", "policy_run_id"}],
  135. }
  136. EXPECTED_UNIQUE_INDEX_COUNT = sum(
  137. len(required_indexes) for required_indexes in REQUIRED_UNIQUE_INDEXES_BY_TABLE.values()
  138. )
  139. def main() -> int:
  140. args = _parse_args()
  141. try:
  142. import pymysql
  143. except ModuleNotFoundError:
  144. print(
  145. "Missing pymysql. Run with: uv run --with pymysql python scripts/validate_content_agent_db.py",
  146. file=sys.stderr,
  147. )
  148. return 2
  149. env = _load_env_file(args.env_file)
  150. raw_cfg = {
  151. "host": args.host or _env_value(env, "CONTENT_SUPPLY_DB_HOST"),
  152. "port": args.port or _env_value(env, "CONTENT_SUPPLY_DB_PORT"),
  153. "user": args.user or _env_value(env, "CONTENT_SUPPLY_DB_USER"),
  154. "password": args.password or _env_value(env, "CONTENT_SUPPLY_DB_PASSWORD"),
  155. "database": args.database or _env_value(env, "CONTENT_SUPPLY_DB_NAME"),
  156. }
  157. missing_keys = [
  158. key
  159. for key, value in {
  160. "CONTENT_SUPPLY_DB_HOST": raw_cfg["host"],
  161. "CONTENT_SUPPLY_DB_PORT": raw_cfg["port"],
  162. "CONTENT_SUPPLY_DB_USER": raw_cfg["user"],
  163. "CONTENT_SUPPLY_DB_PASSWORD": raw_cfg["password"],
  164. "CONTENT_SUPPLY_DB_NAME": raw_cfg["database"],
  165. }.items()
  166. if not value
  167. ]
  168. if missing_keys:
  169. print(
  170. f"Missing required CFA db env keys: {', '.join(missing_keys)}",
  171. file=sys.stderr,
  172. )
  173. return 2
  174. cfg = {
  175. "host": raw_cfg["host"],
  176. "port": int(raw_cfg["port"]),
  177. "user": raw_cfg["user"],
  178. "password": raw_cfg["password"],
  179. "database": raw_cfg["database"],
  180. "charset": "utf8mb4",
  181. "cursorclass": pymysql.cursors.DictCursor,
  182. "connect_timeout": args.timeout,
  183. "read_timeout": args.timeout,
  184. "write_timeout": args.timeout,
  185. }
  186. result: dict[str, Any] = {
  187. "connect": "pending",
  188. "host": cfg["host"],
  189. "database": cfg["database"],
  190. "expected_table_count": len(EXPECTED_TABLES),
  191. "expected_unique_index_count": EXPECTED_UNIQUE_INDEX_COUNT,
  192. }
  193. with pymysql.connect(**cfg) as conn:
  194. with conn.cursor() as cur:
  195. cur.execute(
  196. "SELECT DATABASE() AS db_name, CURRENT_USER() AS current_user_name, "
  197. "@@version AS mysql_version"
  198. )
  199. meta = cur.fetchone()
  200. result.update(
  201. {
  202. "connect": "ok",
  203. "current_user": meta["current_user_name"],
  204. "mysql_version": meta["mysql_version"],
  205. }
  206. )
  207. cur.execute("SHOW GRANTS FOR CURRENT_USER()")
  208. grants = [next(iter(row.values())) for row in cur.fetchall()]
  209. result["grants"] = grants
  210. result["has_runtime_privileges"] = _has_required_privileges(grants, cfg["database"])
  211. result["has_create_privilege"] = _has_create_privilege(grants, cfg["database"])
  212. cur.execute("SHOW TABLES LIKE 'content_agent_%'")
  213. found_tables = sorted(next(iter(row.values())) for row in cur.fetchall())
  214. missing_tables = sorted(set(EXPECTED_TABLES) - set(found_tables))
  215. extra_tables = sorted(set(found_tables) - set(EXPECTED_TABLES))
  216. result.update(
  217. {
  218. "found_table_count": len(found_tables),
  219. "found_tables": found_tables,
  220. "missing_tables": missing_tables,
  221. "extra_content_agent_tables": extra_tables,
  222. }
  223. )
  224. cur.execute(
  225. "SELECT COUNT(*) AS cnt "
  226. "FROM `content-deconstruction-supply`.demand_content "
  227. "WHERE JSON_UNQUOTE(JSON_EXTRACT(ext_data, '$.run_label')) LIKE %s",
  228. (args.run_label_like,),
  229. )
  230. result["demand_batch_count"] = cur.fetchone()["cnt"]
  231. column_status = {}
  232. index_status = {}
  233. for table in found_tables:
  234. cur.execute(f"SHOW COLUMNS FROM `{table}`")
  235. columns = {row["Field"] for row in cur.fetchall()}
  236. required_columns = REQUIRED_COLUMNS_BY_TABLE.get(table, set())
  237. column_status[table] = {
  238. "has_id": "id" in columns,
  239. "has_run_id": "run_id" in columns,
  240. "has_created_at": "created_at" in columns,
  241. "has_schema_version": "schema_version" in columns,
  242. "has_policy_run_id": "policy_run_id" in columns,
  243. "missing_required_columns": sorted(required_columns - columns),
  244. }
  245. cur.execute(f"SHOW INDEX FROM `{table}`")
  246. unique_indexes: dict[str, set[str]] = {}
  247. for row in cur.fetchall():
  248. if row["Non_unique"] == 0:
  249. unique_indexes.setdefault(row["Key_name"], set()).add(row["Column_name"])
  250. required_unique_indexes = REQUIRED_UNIQUE_INDEXES_BY_TABLE.get(table, [])
  251. missing_required_unique_indexes = [
  252. sorted(required_unique)
  253. for required_unique in required_unique_indexes
  254. if not any(
  255. index_columns == required_unique
  256. for index_columns in unique_indexes.values()
  257. )
  258. ]
  259. index_status[table] = {
  260. "has_required_unique_index": not missing_required_unique_indexes,
  261. "has_required_unique_indexes": not missing_required_unique_indexes,
  262. "required_unique_indexes": [
  263. sorted(required_unique) for required_unique in required_unique_indexes
  264. ],
  265. "missing_required_unique_indexes": missing_required_unique_indexes,
  266. "unique_indexes": {
  267. name: sorted(columns) for name, columns in sorted(unique_indexes.items())
  268. },
  269. }
  270. result["column_status"] = column_status
  271. result["index_status"] = index_status
  272. schema_ready = _schema_ready(result.get("column_status", {}), result.get("index_status", {}))
  273. result["schema_ready"] = schema_ready
  274. print(json.dumps(result, ensure_ascii=False, indent=2, default=str))
  275. if args.allow_missing_tables:
  276. return 0
  277. if result["missing_tables"] or not result["has_runtime_privileges"] or not schema_ready:
  278. return 1
  279. return 0
  280. def _schema_ready(
  281. column_status: dict[str, dict[str, Any]],
  282. index_status: dict[str, dict[str, Any]],
  283. ) -> bool:
  284. for table in EXPECTED_TABLES:
  285. status = column_status.get(table)
  286. if not status or status.get("missing_required_columns"):
  287. return False
  288. index = index_status.get(table)
  289. if not index or not index.get("has_required_unique_indexes"):
  290. return False
  291. return True
  292. def _parse_args() -> argparse.Namespace:
  293. parser = argparse.ArgumentParser(description="Validate Content Agent cloud MySQL schema readiness.")
  294. parser.add_argument("--env-file", default=".env", help="Primary env file. Defaults to project .env.")
  295. parser.add_argument("--host", default=None)
  296. parser.add_argument("--port", default=None)
  297. parser.add_argument("--database", default=None)
  298. parser.add_argument("--user", default=None)
  299. parser.add_argument("--password", default=None)
  300. parser.add_argument("--timeout", type=int, default=8)
  301. parser.add_argument("--run-label-like", default="cfa_mysql_100_20260607_batch%")
  302. parser.add_argument(
  303. "--allow-missing-tables",
  304. action="store_true",
  305. help="Return success even when content_agent_* tables have not been created yet.",
  306. )
  307. return parser.parse_args()
  308. def _load_env_file(path_value: str | None) -> dict[str, str]:
  309. if not path_value:
  310. return {}
  311. path = Path(path_value)
  312. if not path.exists():
  313. return {}
  314. result: dict[str, str] = {}
  315. for line in path.read_text(encoding="utf-8").splitlines():
  316. stripped = line.strip()
  317. if not stripped or stripped.startswith("#") or "=" not in stripped:
  318. continue
  319. key, value = stripped.split("=", 1)
  320. result[key.strip()] = value.strip().strip('"').strip("'")
  321. return result
  322. def _env_value(env: dict[str, str], *keys: str, default: str | None = None) -> str | None:
  323. for key in keys:
  324. value = env.get(key)
  325. if value:
  326. return value
  327. return default
  328. def _has_required_privileges(grants: list[str], database: str) -> bool:
  329. scoped_grants = _content_agent_scoped_grants(grants, database)
  330. normalized = " ".join(scoped_grants).upper()
  331. return all(privilege in normalized for privilege in REQUIRED_PRIVILEGES) or "ALL PRIVILEGES" in normalized
  332. def _has_create_privilege(grants: list[str], database: str) -> bool:
  333. scoped_grants = _database_or_global_grants(grants, database)
  334. normalized = " ".join(scoped_grants).upper()
  335. return "CREATE" in normalized or "ALL PRIVILEGES" in normalized
  336. def _content_agent_scoped_grants(grants: list[str], database: str) -> list[str]:
  337. scoped: list[str] = []
  338. db_scope = f"ON `{database}`.*"
  339. for grant in grants:
  340. if "ON *.*" in grant or db_scope in grant:
  341. scoped.append(grant)
  342. continue
  343. if f"ON `{database}`.`content_agent_" in grant:
  344. scoped.append(grant)
  345. return scoped
  346. def _database_or_global_grants(grants: list[str], database: str) -> list[str]:
  347. db_scope = f"ON `{database}`.*"
  348. return [grant for grant in grants if "ON *.*" in grant or db_scope in grant]
  349. if __name__ == "__main__":
  350. raise SystemExit(main())