validate_content_agent_db.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585
  1. from __future__ import annotations
  2. import argparse
  3. import json
  4. import sys
  5. from pathlib import Path
  6. from typing import Any
  # Every content_agent_* table the runtime expects to find in the target schema.
  EXPECTED_TABLES = [
      "content_agent_runs",
      "content_agent_source_contexts",
      "content_agent_pattern_seed_packs",
      "content_agent_queries",
      "content_agent_discovered_content_items",
      "content_agent_content_media_records",
      "content_agent_rule_decisions",
      "content_agent_walk_actions",
      "content_agent_source_path_records",
      "content_agent_search_clues",
      "content_agent_run_events",
      "content_agent_final_outputs",
      "content_agent_publish_jobs",
      "content_agent_author_assets",
      "content_agent_author_asset_roles",
      "content_agent_pattern_recall_evidence",
      "content_agent_strategy_reviews",
      "content_agent_performance_feedback",
      "content_agent_search_clue_assets",
      "content_agent_search_clue_asset_evidence",
      "content_agent_policy_runs",
  ]
  # Privileges the runtime account must hold on the content_agent_* tables.
  REQUIRED_PRIVILEGES = ("SELECT", "INSERT", "UPDATE")
  # Columns every table carries unless it is listed in _RUN_INDEPENDENT_TABLES.
  COMMON_COLUMNS = {"id", "schema_version", "run_id", "created_at"}

  # Tables whose rows are not keyed by a run, so "run_id" is not required there.
  _RUN_INDEPENDENT_TABLES = frozenset(
      {
          "content_agent_author_assets",
          "content_agent_author_asset_roles",
          "content_agent_search_clue_assets",
      }
  )
  # Tables that must additionally carry "policy_run_id".
  POLICY_RUN_TABLES = set(EXPECTED_TABLES) - (
      _RUN_INDEPENDENT_TABLES | {"content_agent_runs", "content_agent_source_contexts"}
  )
  # Tables that must keep the original record in a "raw_payload" column.
  _RAW_PAYLOAD_TABLES = frozenset(
      {
          "content_agent_queries",
          "content_agent_discovered_content_items",
          "content_agent_content_media_records",
          "content_agent_rule_decisions",
          "content_agent_walk_actions",
          "content_agent_source_path_records",
          "content_agent_search_clues",
          "content_agent_run_events",
          "content_agent_strategy_reviews",
      }
  )
  # Table-specific columns layered on top of the shared requirements above.
  _EXTRA_COLUMNS_BY_TABLE: dict[str, frozenset[str]] = {
      "content_agent_runs": frozenset(
          {"status", "error_code", "error_message", "error_detail"}
      ),
      "content_agent_queries": frozenset(
          {"search_query_id", "search_query", "search_query_effect_status"}
      ),
      "content_agent_content_media_records": frozenset({"platform"}),
      "content_agent_walk_actions": frozenset(
          {"walk_action_id", "edge_id", "walk_action", "walk_status", "raw_payload"}
      ),
      "content_agent_search_clues": frozenset(
          {"search_query_id", "search_query_effect_status"}
      ),
      "content_agent_run_events": frozenset(
          {"event_type", "status", "error_code", "message"}
      ),
      "content_agent_author_assets": frozenset(
          {
              "author_asset_id",
              "platform",
              "platform_author_id",
              "asset_status",
              "source_type",
              "validation_status",
              "eligible_as_source",
          }
      ),
      "content_agent_author_asset_roles": frozenset(
          {"author_asset_id", "role", "role_status", "assigned_by"}
      ),
      "content_agent_performance_feedback": frozenset(
          {
              "feedback_id",
              "platform",
              "platform_content_id",
              "feedback_source",
              "feedback_status",
              "raw_payload",
          }
      ),
      "content_agent_search_clue_assets": frozenset(
          {
              "search_clue_asset_id",
              "platform",
              "clue_type",
              "normalized_clue_text",
              "display_clue_text",
              "promotion_status",
              "can_seed_next_run",
              "first_seen_run_id",
              "first_seen_policy_run_id",
              "summary_metrics",
              "raw_payload",
          }
      ),
      "content_agent_search_clue_asset_evidence": frozenset(
          {
              "evidence_id",
              "search_clue_asset_id",
              "clue_id",
              "search_query_id",
              "source_path_record_ids",
              "decision_ids",
              "performance_feedback_refs",
              "raw_payload",
          }
      ),
      "content_agent_policy_runs": frozenset(
          {
              "policy_bundle_id",
              "rule_pack_id",
              "strategy_id",
              "strategy_version",
              "rule_pack_version",
              "policy_bundle_hash",
              "strategy_source_ref",
              "rule_pack_source_ref",
              "evidence_bundle_schema_version",
              "runtime_record_schema_version",
          }
      ),
  }
  # Full required column set per table (a fresh, mutable set for each table),
  # in EXPECTED_TABLES order.
  REQUIRED_COLUMNS_BY_TABLE = {
      table_name: (
          (COMMON_COLUMNS - {"run_id"} if table_name in _RUN_INDEPENDENT_TABLES else set(COMMON_COLUMNS))
          | ({"policy_run_id"} if table_name in POLICY_RUN_TABLES else set())
          | ({"raw_payload"} if table_name in _RAW_PAYLOAD_TABLES else set())
          | _EXTRA_COLUMNS_BY_TABLE.get(table_name, frozenset())
      )
      for table_name in EXPECTED_TABLES
  }
  # Column pair shared by most run-scoped unique keys.
  _RUN_SCOPE = ("run_id", "policy_run_id")
  # For each table, the column sets that must each match some UNIQUE index exactly.
  REQUIRED_UNIQUE_INDEXES_BY_TABLE = {
      "content_agent_runs": [{"run_id"}],
      "content_agent_source_contexts": [{"run_id"}],
      "content_agent_pattern_seed_packs": [{*_RUN_SCOPE}],
      "content_agent_queries": [{*_RUN_SCOPE, "search_query_id"}],
      "content_agent_discovered_content_items": [
          {*_RUN_SCOPE, "content_discovery_id"},
          {*_RUN_SCOPE, "platform", "platform_content_id"},
      ],
      "content_agent_content_media_records": [
          {*_RUN_SCOPE, "platform", "platform_content_id"},
      ],
      "content_agent_rule_decisions": [
          {*_RUN_SCOPE, "decision_id"},
          {*_RUN_SCOPE, "decision_target_type", "decision_target_id"},
      ],
      "content_agent_walk_actions": [{*_RUN_SCOPE, "walk_action_id"}],
      "content_agent_source_path_records": [{*_RUN_SCOPE, "source_path_record_id"}],
      "content_agent_search_clues": [
          {*_RUN_SCOPE, "clue_id"},
          {*_RUN_SCOPE, "search_query_id"},
      ],
      "content_agent_run_events": [{*_RUN_SCOPE, "event_id"}],
      "content_agent_final_outputs": [{*_RUN_SCOPE, "output_version"}],
      "content_agent_publish_jobs": [{*_RUN_SCOPE, "publish_job_id"}],
      "content_agent_author_assets": [
          {"author_asset_id"},
          {"platform", "platform_author_id"},
      ],
      "content_agent_author_asset_roles": [{"author_asset_id", "role"}],
      "content_agent_pattern_recall_evidence": [{*_RUN_SCOPE, "recall_evidence_id"}],
      "content_agent_strategy_reviews": [{*_RUN_SCOPE, "review_id"}],
      "content_agent_performance_feedback": [{*_RUN_SCOPE, "feedback_id"}],
      "content_agent_search_clue_assets": [
          {"search_clue_asset_id"},
          {"platform", "clue_type", "normalized_clue_text"},
      ],
      "content_agent_search_clue_asset_evidence": [
          {"evidence_id"},
          {*_RUN_SCOPE, "clue_id"},
      ],
      "content_agent_policy_runs": [{*_RUN_SCOPE}],
  }
  # Total number of required unique indexes across all tables (reported in the JSON output).
  EXPECTED_UNIQUE_INDEX_COUNT = sum(map(len, REQUIRED_UNIQUE_INDEXES_BY_TABLE.values()))
  def main() -> int:
      """Validate Content Agent MySQL schema readiness and print a JSON report.

      Connection settings come from CLI flags first and fall back to the
      ``CONTENT_SUPPLY_DB_*`` keys of ``--env-file``. The report covers the
      connection, grants, the ``content_agent_*`` tables with their required
      columns and unique indexes, a demand batch count, and optional per-run
      query-failure audits.

      Returns:
          0 when everything is ready (or unconditionally after a report is
          produced when ``--allow-missing-tables`` is given); 1 when a check
          fails or a database error occurs; 2 when pymysql is missing or the
          connection configuration is incomplete or invalid.
      """
      args = _parse_args()
      try:
          import pymysql
      except ModuleNotFoundError:
          print(
              "Missing pymysql. Run with: uv run --with pymysql python scripts/validate_content_agent_db.py",
              file=sys.stderr,
          )
          return 2
      env = _load_env_file(args.env_file)
      raw_cfg = {
          "host": args.host or _env_value(env, "CONTENT_SUPPLY_DB_HOST"),
          "port": args.port or _env_value(env, "CONTENT_SUPPLY_DB_PORT"),
          "user": args.user or _env_value(env, "CONTENT_SUPPLY_DB_USER"),
          "password": args.password or _env_value(env, "CONTENT_SUPPLY_DB_PASSWORD"),
          "database": args.database or _env_value(env, "CONTENT_SUPPLY_DB_NAME"),
      }
      missing_keys = [
          key
          for key, value in {
              "CONTENT_SUPPLY_DB_HOST": raw_cfg["host"],
              "CONTENT_SUPPLY_DB_PORT": raw_cfg["port"],
              "CONTENT_SUPPLY_DB_USER": raw_cfg["user"],
              "CONTENT_SUPPLY_DB_PASSWORD": raw_cfg["password"],
              "CONTENT_SUPPLY_DB_NAME": raw_cfg["database"],
          }.items()
          if not value
      ]
      if missing_keys:
          print(
              f"Missing required CFA db env keys: {', '.join(missing_keys)}",
              file=sys.stderr,
          )
          return 2
      try:
          port = int(raw_cfg["port"])
      except ValueError:
          # Report a bad port as a configuration error instead of a traceback.
          print(
              f"Invalid CONTENT_SUPPLY_DB_PORT value: {raw_cfg['port']!r}",
              file=sys.stderr,
          )
          return 2
      cfg = {
          "host": raw_cfg["host"],
          "port": port,
          "user": raw_cfg["user"],
          "password": raw_cfg["password"],
          "database": raw_cfg["database"],
          "charset": "utf8mb4",
          "cursorclass": pymysql.cursors.DictCursor,
          "connect_timeout": args.timeout,
          "read_timeout": args.timeout,
          "write_timeout": args.timeout,
      }
      result: dict[str, Any] = {
          "connect": "pending",
          "host": cfg["host"],
          "database": cfg["database"],
          "expected_table_count": len(EXPECTED_TABLES),
          "expected_unique_index_count": EXPECTED_UNIQUE_INDEX_COUNT,
      }
      try:
          with pymysql.connect(**cfg) as conn:
              with conn.cursor() as cur:
                  cur.execute(
                      "SELECT DATABASE() AS db_name, CURRENT_USER() AS current_user_name, "
                      "@@version AS mysql_version"
                  )
                  meta = cur.fetchone()
                  result.update(
                      {
                          "connect": "ok",
                          "current_user": meta["current_user_name"],
                          "mysql_version": meta["mysql_version"],
                      }
                  )
                  cur.execute("SHOW GRANTS FOR CURRENT_USER()")
                  grants = [next(iter(row.values())) for row in cur.fetchall()]
                  result["grants"] = grants
                  result["has_runtime_privileges"] = _has_required_privileges(grants, cfg["database"])
                  result["has_create_privilege"] = _has_create_privilege(grants, cfg["database"])
                  cur.execute("SHOW TABLES LIKE 'content_agent_%'")
                  # In LIKE, "_" matches any single character, so re-check the literal prefix.
                  found_tables = sorted(
                      name
                      for name in (next(iter(row.values())) for row in cur.fetchall())
                      if name.startswith("content_agent_")
                  )
                  missing_tables = sorted(set(EXPECTED_TABLES) - set(found_tables))
                  extra_tables = sorted(set(found_tables) - set(EXPECTED_TABLES))
                  result.update(
                      {
                          "found_table_count": len(found_tables),
                          "found_tables": found_tables,
                          "missing_tables": missing_tables,
                          "extra_content_agent_tables": extra_tables,
                      }
                  )
                  try:
                      cur.execute(
                          "SELECT COUNT(*) AS cnt "
                          "FROM `content-deconstruction-supply`.demand_content "
                          "WHERE JSON_UNQUOTE(JSON_EXTRACT(ext_data, '$.run_label')) LIKE %s",
                          (args.run_label_like,),
                      )
                      result["demand_batch_count"] = cur.fetchone()["cnt"]
                  except pymysql.MySQLError as exc:
                      # This cross-database count is informational only; do not let a
                      # missing table or grant there hide the schema report.
                      result["demand_batch_count"] = None
                      result["demand_batch_count_error"] = f"{type(exc).__name__}: {exc}"
                  column_status: dict[str, dict[str, Any]] = {}
                  index_status: dict[str, dict[str, Any]] = {}
                  for table in found_tables:
                      column_status[table], index_status[table] = _inspect_table(cur, table)
                  result["column_status"] = column_status
                  result["index_status"] = index_status
                  if args.audit_run_id:
                      result["audit_runs"] = _audit_query_failures(cur, args.audit_run_id)
      except pymysql.MySQLError as exc:
          # Emit the partial report in the usual JSON shape rather than a bare traceback.
          if result["connect"] == "pending":
              result["connect"] = "error"
          result["error"] = f"{type(exc).__name__}: {exc}"
          print(json.dumps(result, ensure_ascii=False, indent=2, default=str))
          return 1
      schema_ready = _schema_ready(result.get("column_status", {}), result.get("index_status", {}))
      result["schema_ready"] = schema_ready
      print(json.dumps(result, ensure_ascii=False, indent=2, default=str))
      # NOTE: this flag short-circuits every check below (privileges, schema,
      # audits), not only the missing-table check its --help text mentions.
      if args.allow_missing_tables:
          return 0
      audit_ready = all(
          audit.get("status") in {"pass", "skipped"}
          for audit in result.get("audit_runs", [])
      )
      if result["missing_tables"] or not result["has_runtime_privileges"] or not schema_ready or not audit_ready:
          return 1
      return 0


  def _inspect_table(cur: Any, table: str) -> tuple[dict[str, Any], dict[str, Any]]:
      """Return ``(column_status, index_status)`` for one existing ``content_agent_*`` table.

      Columns are read with ``SHOW COLUMNS`` and compared with
      REQUIRED_COLUMNS_BY_TABLE. Unique indexes (``Non_unique == 0``, which
      includes the primary key) are read with ``SHOW INDEX`` and each required
      column set must equal the column set of some unique index exactly.
      """
      cur.execute(f"SHOW COLUMNS FROM `{table}`")
      columns = {row["Field"] for row in cur.fetchall()}
      required_columns = REQUIRED_COLUMNS_BY_TABLE.get(table, set())
      column_status = {
          "has_id": "id" in columns,
          "has_run_id": "run_id" in columns,
          "has_created_at": "created_at" in columns,
          "has_schema_version": "schema_version" in columns,
          "has_policy_run_id": "policy_run_id" in columns,
          "missing_required_columns": sorted(required_columns - columns),
      }
      cur.execute(f"SHOW INDEX FROM `{table}`")
      unique_indexes: dict[str, set[str]] = {}
      for row in cur.fetchall():
          if row["Non_unique"] == 0:
              unique_indexes.setdefault(row["Key_name"], set()).add(row["Column_name"])
      required_unique_indexes = REQUIRED_UNIQUE_INDEXES_BY_TABLE.get(table, [])
      missing_required_unique_indexes = [
          sorted(required_unique)
          for required_unique in required_unique_indexes
          if not any(
              index_columns == required_unique
              for index_columns in unique_indexes.values()
          )
      ]
      index_status = {
          "has_required_unique_index": not missing_required_unique_indexes,
          "has_required_unique_indexes": not missing_required_unique_indexes,
          "required_unique_indexes": [
              sorted(required_unique) for required_unique in required_unique_indexes
          ],
          "missing_required_unique_indexes": missing_required_unique_indexes,
          "unique_indexes": {
              name: sorted(index_columns) for name, index_columns in sorted(unique_indexes.items())
          },
      }
      return column_status, index_status
  def _schema_ready(
      column_status: dict[str, dict[str, Any]],
      index_status: dict[str, dict[str, Any]],
  ) -> bool:
      """Tell whether every table in EXPECTED_TABLES is fully provisioned.

      A table counts as ready only when it has a non-empty entry in both maps,
      its column entry lists no ``missing_required_columns``, and its index
      entry reports ``has_required_unique_indexes`` as true.
      """
      def _table_ready(table: str) -> bool:
          columns = column_status.get(table)
          indexes = index_status.get(table)
          return (
              bool(columns)
              and not columns.get("missing_required_columns")
              and bool(indexes)
              and bool(indexes.get("has_required_unique_indexes"))
          )

      return all(_table_ready(table) for table in EXPECTED_TABLES)
  def _parse_args() -> argparse.Namespace:
      """Define the command-line interface and parse ``sys.argv``.

      The connection flags default to ``None`` so that values from the env file
      can fill them in later; ``--audit-run-id`` may be repeated.
      """
      parser = argparse.ArgumentParser(description="Validate Content Agent cloud MySQL schema readiness.")
      parser.add_argument("--env-file", default=".env", help="Primary env file. Defaults to project .env.")
      # Plain string connection overrides, registered in their --help order.
      for connection_flag in ("--host", "--port", "--database", "--user", "--password"):
          parser.add_argument(connection_flag, default=None)
      parser.add_argument("--timeout", type=int, default=8)
      parser.add_argument("--run-label-like", default="cfa_mysql_100_20260607_batch%")
      parser.add_argument(
          "--audit-run-id",
          action="append",
          default=[],
          help="Optional exact run_id to audit query failure closure. May be passed more than once.",
      )
      parser.add_argument(
          "--allow-missing-tables",
          action="store_true",
          help="Return success even when content_agent_* tables have not been created yet.",
      )
      namespace = parser.parse_args()
      return namespace
  def _audit_query_failures(cur: Any, run_ids: list[str]) -> list[dict[str, Any]]:
      """Audit that each run's recorded query failures were persisted consistently.

      For every ``run_id`` the ``error_detail.query_failures`` list is read from
      ``content_agent_runs``. Each failure's ``search_query_id`` must then have:

      * a ``content_agent_queries`` row with ``search_query_effect_status ==
        "failed"`` whose ``raw_payload`` contains a ``query_failure`` object;
      * a ``content_agent_search_clues`` row with status ``"failed"``;
      * a ``platform_query_failed`` run event with status ``"failed"`` whose
        ``input_ref`` is ``search_queries.jsonl:<search_query_id>``.

      Args:
          cur: Open cursor that returns rows as dicts.
          run_ids: Exact run ids to audit.

      Returns:
          One dict per run id with ``status`` ``"pass"``, ``"fail"`` or
          ``"skipped"`` plus ``findings`` (or a skip ``reason``).
      """
      audits: list[dict[str, Any]] = []
      for run_id in run_ids:
          cur.execute(
              "SELECT `run_id`, `error_code`, `error_detail` "
              "FROM `content_agent_runs` WHERE `run_id` = %s",
              (run_id,),
          )
          run_row = cur.fetchone()
          if not run_row:
              audits.append({"run_id": run_id, "status": "fail", "findings": ["run_not_found"]})
              continue
          error_detail = _load_json_object(run_row.get("error_detail"))
          if error_detail is None:
              # Malformed or non-object error_detail previously crashed the whole report.
              audits.append(
                  {"run_id": run_id, "status": "fail", "findings": ["error_detail_not_json_object"]}
              )
              continue
          query_failures = error_detail.get("query_failures")
          if not isinstance(query_failures, list) or not query_failures:
              audits.append({"run_id": run_id, "status": "skipped", "reason": "no_query_failures"})
              continue
          failure_ids = [
              str(failure.get("search_query_id"))
              for failure in query_failures
              if isinstance(failure, dict) and failure.get("search_query_id")
          ]
          findings: list[str] = []
          if not failure_ids:
              findings.append("query_failures_missing_search_query_id")
          query_rows = _fetch_rows_for_ids(
              cur,
              "content_agent_queries",
              run_id,
              failure_ids,
              "search_query_id, search_query_effect_status, raw_payload",
          )
          # Key by str() so integer id columns still match the stringified failure ids.
          query_by_id = {str(row["search_query_id"]): row for row in query_rows}
          for query_id in failure_ids:
              row = query_by_id.get(query_id)
              if not row:
                  findings.append(f"query_missing:{query_id}")
                  continue
              if row.get("search_query_effect_status") != "failed":
                  findings.append(f"query_not_failed:{query_id}")
              raw_payload = _load_json_object(row.get("raw_payload")) or {}
              if not isinstance(raw_payload.get("query_failure"), dict):
                  findings.append(f"query_failure_payload_missing:{query_id}")
          clue_rows = _fetch_rows_for_ids(
              cur,
              "content_agent_search_clues",
              run_id,
              failure_ids,
              "search_query_id, search_query_effect_status, raw_payload",
          )
          clue_by_id = {str(row["search_query_id"]): row for row in clue_rows}
          for query_id in failure_ids:
              row = clue_by_id.get(query_id)
              if not row:
                  findings.append(f"search_clue_missing:{query_id}")
                  continue
              if row.get("search_query_effect_status") != "failed":
                  findings.append(f"search_clue_not_failed:{query_id}")
          event_refs = [f"search_queries.jsonl:{query_id}" for query_id in failure_ids]
          event_rows = _fetch_rows_for_values(
              cur,
              "content_agent_run_events",
              run_id,
              event_refs,
              "input_ref",
              "input_ref, event_type, status",
              extra_where="AND event_type = 'platform_query_failed'",
          )
          event_by_ref = {str(row["input_ref"]): row for row in event_rows}
          for event_ref in event_refs:
              row = event_by_ref.get(event_ref)
              if not row:
                  findings.append(f"platform_query_failed_event_missing:{event_ref}")
                  continue
              if row.get("status") != "failed":
                  findings.append(f"platform_query_failed_event_status:{event_ref}")
          audits.append(
              {
                  "run_id": run_id,
                  "query_failure_count": len(failure_ids),
                  "status": "fail" if findings else "pass",
                  "findings": findings,
              }
          )
      return audits


  def _load_json_object(value: Any) -> dict[str, Any] | None:
      """Decode ``value`` with _decode_json and accept only JSON objects.

      Returns ``{}`` for NULL or otherwise falsy decoded values (matching the
      previous ``... or {}`` fallback). Returns ``None`` when the value is not
      valid JSON, or decodes to a non-empty non-object such as a list or string.
      """
      try:
          decoded = _decode_json(value)
      except (TypeError, ValueError):  # json.JSONDecodeError subclasses ValueError
          return None
      if not decoded:
          return {}
      return decoded if isinstance(decoded, dict) else None
  def _fetch_rows_for_ids(
      cur: Any,
      table: str,
      run_id: str,
      ids: list[str],
      columns: str,
  ) -> list[dict[str, Any]]:
      """Fetch ``columns`` from ``table`` for one run, filtered on ``search_query_id``.

      Convenience wrapper around _fetch_rows_for_values with the lookup column
      fixed to ``search_query_id``.
      """
      lookup_column = "search_query_id"
      return _fetch_rows_for_values(cur, table, run_id, ids, lookup_column, columns)
  def _fetch_rows_for_values(
      cur: Any,
      table: str,
      run_id: str,
      values: list[str],
      column: str,
      columns: str,
      *,
      extra_where: str = "",
  ) -> list[dict[str, Any]]:
      """Select ``columns`` from ``table`` where ``run_id`` matches and ``column`` is in ``values``.

      ``table``, ``column``, ``columns`` and ``extra_where`` are spliced into the
      SQL text, so callers must pass trusted identifiers and fragments only;
      ``run_id`` and ``values`` are bound as query parameters. An empty
      ``values`` list returns ``[]`` without executing anything.
      """
      if values:
          in_list = ", ".join("%s" for _ in values)
          sql = (
              f"SELECT {columns} FROM `{table}` "
              f"WHERE `run_id` = %s AND `{column}` IN ({in_list}) {extra_where}"
          )
          cur.execute(sql, [run_id, *values])
          return list(cur.fetchall())
      return []
  def _decode_json(value: Any) -> Any:
      """Decode a JSON string/bytes value; pass ``None`` and already-decoded dicts/lists through unchanged.

      Raises whatever ``json.loads`` raises for malformed or non-text input.
      """
      already_decoded = value is None or isinstance(value, (dict, list))
      return value if already_decoded else json.loads(value)
  499. def _load_env_file(path_value: str | None) -> dict[str, str]:
  500. if not path_value:
  501. return {}
  502. path = Path(path_value)
  503. if not path.exists():
  504. return {}
  505. result: dict[str, str] = {}
  506. for line in path.read_text(encoding="utf-8").splitlines():
  507. stripped = line.strip()
  508. if not stripped or stripped.startswith("#") or "=" not in stripped:
  509. continue
  510. key, value = stripped.split("=", 1)
  511. result[key.strip()] = value.strip().strip('"').strip("'")
  512. return result
  513. def _env_value(env: dict[str, str], *keys: str, default: str | None = None) -> str | None:
  514. for key in keys:
  515. value = env.get(key)
  516. if value:
  517. return value
  518. return default
  # Privilege names MySQL uses for "every privilege at this level".
  _ALL_PRIVILEGE_NAMES = frozenset({"ALL", "ALL PRIVILEGES"})


  def _has_required_privileges(grants: list[str], database: str) -> bool:
      """Return True if grants in content-agent scope cover REQUIRED_PRIVILEGES.

      Privileges are unioned across global (``*.*``), database-level and
      ``content_agent_*`` table-level grants for ``database``. Only each
      grant's privilege list is inspected, so database or user names that
      contain words like ``SELECT`` or ``UPDATE`` do not count as privileges.
      """
      granted = _privileges_in(_content_agent_scoped_grants(grants, database))
      if granted & _ALL_PRIVILEGE_NAMES:
          return True
      return all(privilege in granted for privilege in REQUIRED_PRIVILEGES)


  def _has_create_privilege(grants: list[str], database: str) -> bool:
      """Return True if a global or ``database``-level grant includes the plain CREATE privilege.

      ``CREATE TEMPORARY TABLES``, ``CREATE VIEW``, ``CREATE ROUTINE`` and similar
      are distinct MySQL privileges and are not treated as ``CREATE``.
      """
      granted = _privileges_in(_database_or_global_grants(grants, database))
      return "CREATE" in granted or bool(granted & _ALL_PRIVILEGE_NAMES)


  def _content_agent_scoped_grants(grants: list[str], database: str) -> list[str]:
      """Return, in input order, the grants that apply to ``content_agent_*`` tables of ``database``.

      That is: global ``*.*`` grants, grants on every table of ``database``,
      and table-level grants on ``database``.``content_agent_*`` tables.
      """
      table_prefix = f"`{database}`.`content_agent_"
      scoped: list[str] = []
      for grant in grants:
          target = _split_grant(grant)[1]
          if _is_global_or_database_target(target, database) or target.startswith(table_prefix):
              scoped.append(grant)
      return scoped


  def _database_or_global_grants(grants: list[str], database: str) -> list[str]:
      """Return, in input order, the grants on ``*.*`` or on every table of ``database``."""
      return [
          grant
          for grant in grants
          if _is_global_or_database_target(_split_grant(grant)[1], database)
      ]


  def _is_global_or_database_target(target: str, database: str) -> bool:
      """Return True if a normalized grant target is ``*.*`` or the whole ``database``."""
      return target in {"*.*", f"`{database}`.*"}


  def _privileges_in(grants: list[str]) -> set[str]:
      """Return the union of privilege names granted by ``grants``."""
      privileges: set[str] = set()
      for grant in grants:
          privileges |= _split_grant(grant)[0]
      return privileges


  def _split_grant(grant: str) -> tuple[set[str], str]:
      """Split one ``SHOW GRANTS`` line into ``(privilege names, ON target)``.

      Privilege names are upper-cased, whitespace-normalized and stripped of
      column lists (``SELECT (`a`, `b`)`` -> ``SELECT``). The target is the
      text between ``ON`` and ``TO`` with MySQL's ``\\_`` / ``\\%`` identifier
      escapes undone, because SHOW GRANTS prints databases such as
      ``my\\_db`` that way. Lines that are not ``GRANT <privs> ON <target> ...``
      (role grants, REVOKE lines) yield ``(set(), "")``.
      """
      if not grant.startswith("GRANT "):
          return set(), ""
      privileges_text, sep, rest = grant[len("GRANT "):].partition(" ON ")
      if not sep:
          return set(), ""
      target = rest.partition(" TO ")[0].strip()
      target = target.replace("\\_", "_").replace("\\%", "%")
      # Drop parenthesized column lists so their commas do not split privileges.
      depth = 0
      kept: list[str] = []
      for char in privileges_text:
          if char == "(":
              depth += 1
          elif char == ")":
              depth = max(depth - 1, 0)
          elif depth == 0:
              kept.append(char)
      privileges = {
          " ".join(part.split()).upper()
          for part in "".join(kept).split(",")
          if part.strip()
      }
      return privileges, target
  if __name__ == "__main__":
      # Propagate main()'s return value as the process exit status.
      sys.exit(main())