from __future__ import annotations import argparse import json import sys from pathlib import Path from typing import Any EXPECTED_TABLES = [ "content_agent_runs", "content_agent_source_contexts", "content_agent_pattern_seed_packs", "content_agent_queries", "content_agent_discovered_content_items", "content_agent_content_media_records", "content_agent_rule_decisions", "content_agent_walk_actions", "content_agent_source_path_records", "content_agent_search_clues", "content_agent_run_events", "content_agent_final_outputs", "content_agent_publish_jobs", "content_agent_author_assets", "content_agent_author_asset_roles", "content_agent_pattern_recall_evidence", "content_agent_strategy_reviews", "content_agent_performance_feedback", "content_agent_search_clue_assets", "content_agent_search_clue_asset_evidence", "content_agent_policy_runs", ] REQUIRED_PRIVILEGES = ("SELECT", "INSERT", "UPDATE") COMMON_COLUMNS = {"id", "schema_version", "run_id", "created_at"} POLICY_RUN_TABLES = set(EXPECTED_TABLES) - { "content_agent_runs", "content_agent_source_contexts", "content_agent_author_assets", "content_agent_author_asset_roles", "content_agent_search_clue_assets", } REQUIRED_COLUMNS_BY_TABLE = { table: set(COMMON_COLUMNS) | ({"policy_run_id"} if table in POLICY_RUN_TABLES else set()) for table in EXPECTED_TABLES } REQUIRED_COLUMNS_BY_TABLE["content_agent_policy_runs"].update( { "policy_bundle_id", "rule_pack_id", "strategy_id", "strategy_version", "rule_pack_version", "policy_bundle_hash", "strategy_source_ref", "rule_pack_source_ref", "evidence_bundle_schema_version", "runtime_record_schema_version", } ) REQUIRED_COLUMNS_BY_TABLE["content_agent_content_media_records"].add("platform") REQUIRED_COLUMNS_BY_TABLE["content_agent_author_assets"].update( { "author_asset_id", "platform", "platform_author_id", "asset_status", "source_type", "validation_status", "eligible_as_source", } ) REQUIRED_COLUMNS_BY_TABLE["content_agent_author_assets"].discard("run_id") 
# Table-specific additions on top of the common / policy-run baseline columns,
# grouped per table.
REQUIRED_COLUMNS_BY_TABLE["content_agent_runs"] |= {
    "status",
    "error_code",
    "error_message",
    "error_detail",
}
REQUIRED_COLUMNS_BY_TABLE["content_agent_queries"] |= {
    "search_query_id",
    "search_query",
    "search_query_effect_status",
}
REQUIRED_COLUMNS_BY_TABLE["content_agent_walk_actions"] |= {
    "walk_action_id",
    "edge_id",
    "walk_action",
    "walk_status",
    "raw_payload",
}
REQUIRED_COLUMNS_BY_TABLE["content_agent_search_clues"] |= {
    "search_query_id",
    "search_query_effect_status",
}
REQUIRED_COLUMNS_BY_TABLE["content_agent_run_events"] |= {
    "event_type",
    "status",
    "error_code",
    "message",
}

# Author-asset roles are not required to be run-scoped: ``run_id`` is removed.
REQUIRED_COLUMNS_BY_TABLE["content_agent_author_asset_roles"] |= {
    "author_asset_id",
    "role",
    "role_status",
    "assigned_by",
}
REQUIRED_COLUMNS_BY_TABLE["content_agent_author_asset_roles"] -= {"run_id"}

REQUIRED_COLUMNS_BY_TABLE["content_agent_performance_feedback"] |= {
    "feedback_id",
    "platform",
    "platform_content_id",
    "feedback_source",
    "feedback_status",
    "raw_payload",
}

# Search-clue assets carry first_seen_* columns instead of a required ``run_id``.
REQUIRED_COLUMNS_BY_TABLE["content_agent_search_clue_assets"] |= {
    "search_clue_asset_id",
    "platform",
    "clue_type",
    "normalized_clue_text",
    "display_clue_text",
    "promotion_status",
    "can_seed_next_run",
    "first_seen_run_id",
    "first_seen_policy_run_id",
    "summary_metrics",
    "raw_payload",
}
REQUIRED_COLUMNS_BY_TABLE["content_agent_search_clue_assets"] -= {"run_id"}

REQUIRED_COLUMNS_BY_TABLE["content_agent_search_clue_asset_evidence"] |= {
    "evidence_id",
    "search_clue_asset_id",
    "clue_id",
    "search_query_id",
    "source_path_record_ids",
    "decision_ids",
    "performance_feedback_refs",
    "raw_payload",
}

# Tables that additionally require a ``raw_payload`` column.
for table in (
    "content_agent_queries",
    "content_agent_discovered_content_items",
    "content_agent_content_media_records",
    "content_agent_rule_decisions",
    "content_agent_walk_actions",
    "content_agent_source_path_records",
    "content_agent_search_clues",
    "content_agent_run_events",
    "content_agent_strategy_reviews",
):
    REQUIRED_COLUMNS_BY_TABLE[table] |= {"raw_payload"}
REQUIRED_UNIQUE_INDEXES_BY_TABLE = { "content_agent_runs": [{"run_id"}], "content_agent_source_contexts": [{"run_id"}], "content_agent_pattern_seed_packs": [{"run_id", "policy_run_id"}], "content_agent_queries": [{"run_id", "policy_run_id", "search_query_id"}], "content_agent_discovered_content_items": [ {"run_id", "policy_run_id", "content_discovery_id"}, {"run_id", "policy_run_id", "platform", "platform_content_id"}, ], "content_agent_content_media_records": [ {"run_id", "policy_run_id", "platform", "platform_content_id"} ], "content_agent_rule_decisions": [ {"run_id", "policy_run_id", "decision_id"}, {"run_id", "policy_run_id", "decision_target_type", "decision_target_id"}, ], "content_agent_walk_actions": [ {"run_id", "policy_run_id", "walk_action_id"}, ], "content_agent_source_path_records": [ {"run_id", "policy_run_id", "source_path_record_id"} ], "content_agent_search_clues": [ {"run_id", "policy_run_id", "clue_id"}, {"run_id", "policy_run_id", "search_query_id"}, ], "content_agent_run_events": [{"run_id", "policy_run_id", "event_id"}], "content_agent_final_outputs": [{"run_id", "policy_run_id", "output_version"}], "content_agent_publish_jobs": [{"run_id", "policy_run_id", "publish_job_id"}], "content_agent_author_assets": [ {"author_asset_id"}, {"platform", "platform_author_id"}, ], "content_agent_author_asset_roles": [{"author_asset_id", "role"}], "content_agent_pattern_recall_evidence": [ {"run_id", "policy_run_id", "recall_evidence_id"} ], "content_agent_strategy_reviews": [{"run_id", "policy_run_id", "review_id"}], "content_agent_performance_feedback": [{"run_id", "policy_run_id", "feedback_id"}], "content_agent_search_clue_assets": [ {"search_clue_asset_id"}, {"platform", "clue_type", "normalized_clue_text"}, ], "content_agent_search_clue_asset_evidence": [ {"evidence_id"}, {"run_id", "policy_run_id", "clue_id"}, ], "content_agent_policy_runs": [{"run_id", "policy_run_id"}], } EXPECTED_UNIQUE_INDEX_COUNT = sum( len(required_indexes) for 
required_indexes in REQUIRED_UNIQUE_INDEXES_BY_TABLE.values() ) def main() -> int: args = _parse_args() try: import pymysql except ModuleNotFoundError: print( "Missing pymysql. Run with: uv run --with pymysql python scripts/validate_content_agent_db.py", file=sys.stderr, ) return 2 env = _load_env_file(args.env_file) raw_cfg = { "host": args.host or _env_value(env, "CONTENT_SUPPLY_DB_HOST"), "port": args.port or _env_value(env, "CONTENT_SUPPLY_DB_PORT"), "user": args.user or _env_value(env, "CONTENT_SUPPLY_DB_USER"), "password": args.password or _env_value(env, "CONTENT_SUPPLY_DB_PASSWORD"), "database": args.database or _env_value(env, "CONTENT_SUPPLY_DB_NAME"), } missing_keys = [ key for key, value in { "CONTENT_SUPPLY_DB_HOST": raw_cfg["host"], "CONTENT_SUPPLY_DB_PORT": raw_cfg["port"], "CONTENT_SUPPLY_DB_USER": raw_cfg["user"], "CONTENT_SUPPLY_DB_PASSWORD": raw_cfg["password"], "CONTENT_SUPPLY_DB_NAME": raw_cfg["database"], }.items() if not value ] if missing_keys: print( f"Missing required CFA db env keys: {', '.join(missing_keys)}", file=sys.stderr, ) return 2 cfg = { "host": raw_cfg["host"], "port": int(raw_cfg["port"]), "user": raw_cfg["user"], "password": raw_cfg["password"], "database": raw_cfg["database"], "charset": "utf8mb4", "cursorclass": pymysql.cursors.DictCursor, "connect_timeout": args.timeout, "read_timeout": args.timeout, "write_timeout": args.timeout, } result: dict[str, Any] = { "connect": "pending", "host": cfg["host"], "database": cfg["database"], "expected_table_count": len(EXPECTED_TABLES), "expected_unique_index_count": EXPECTED_UNIQUE_INDEX_COUNT, } with pymysql.connect(**cfg) as conn: with conn.cursor() as cur: cur.execute( "SELECT DATABASE() AS db_name, CURRENT_USER() AS current_user_name, " "@@version AS mysql_version" ) meta = cur.fetchone() result.update( { "connect": "ok", "current_user": meta["current_user_name"], "mysql_version": meta["mysql_version"], } ) cur.execute("SHOW GRANTS FOR CURRENT_USER()") grants = 
[next(iter(row.values())) for row in cur.fetchall()] result["grants"] = grants result["has_runtime_privileges"] = _has_required_privileges(grants, cfg["database"]) result["has_create_privilege"] = _has_create_privilege(grants, cfg["database"]) cur.execute("SHOW TABLES LIKE 'content_agent_%'") found_tables = sorted(next(iter(row.values())) for row in cur.fetchall()) missing_tables = sorted(set(EXPECTED_TABLES) - set(found_tables)) extra_tables = sorted(set(found_tables) - set(EXPECTED_TABLES)) result.update( { "found_table_count": len(found_tables), "found_tables": found_tables, "missing_tables": missing_tables, "extra_content_agent_tables": extra_tables, } ) cur.execute( "SELECT COUNT(*) AS cnt " "FROM `content-deconstruction-supply`.demand_content " "WHERE JSON_UNQUOTE(JSON_EXTRACT(ext_data, '$.run_label')) LIKE %s", (args.run_label_like,), ) result["demand_batch_count"] = cur.fetchone()["cnt"] column_status = {} index_status = {} for table in found_tables: cur.execute(f"SHOW COLUMNS FROM `{table}`") columns = {row["Field"] for row in cur.fetchall()} required_columns = REQUIRED_COLUMNS_BY_TABLE.get(table, set()) column_status[table] = { "has_id": "id" in columns, "has_run_id": "run_id" in columns, "has_created_at": "created_at" in columns, "has_schema_version": "schema_version" in columns, "has_policy_run_id": "policy_run_id" in columns, "missing_required_columns": sorted(required_columns - columns), } cur.execute(f"SHOW INDEX FROM `{table}`") unique_indexes: dict[str, set[str]] = {} for row in cur.fetchall(): if row["Non_unique"] == 0: unique_indexes.setdefault(row["Key_name"], set()).add(row["Column_name"]) required_unique_indexes = REQUIRED_UNIQUE_INDEXES_BY_TABLE.get(table, []) missing_required_unique_indexes = [ sorted(required_unique) for required_unique in required_unique_indexes if not any( index_columns == required_unique for index_columns in unique_indexes.values() ) ] index_status[table] = { "has_required_unique_index": not 
missing_required_unique_indexes, "has_required_unique_indexes": not missing_required_unique_indexes, "required_unique_indexes": [ sorted(required_unique) for required_unique in required_unique_indexes ], "missing_required_unique_indexes": missing_required_unique_indexes, "unique_indexes": { name: sorted(columns) for name, columns in sorted(unique_indexes.items()) }, } result["column_status"] = column_status result["index_status"] = index_status if args.audit_run_id: result["audit_runs"] = _audit_query_failures(cur, args.audit_run_id) schema_ready = _schema_ready(result.get("column_status", {}), result.get("index_status", {})) result["schema_ready"] = schema_ready print(json.dumps(result, ensure_ascii=False, indent=2, default=str)) if args.allow_missing_tables: return 0 audit_ready = all( audit.get("status") in {"pass", "skipped"} for audit in result.get("audit_runs", []) ) if result["missing_tables"] or not result["has_runtime_privileges"] or not schema_ready or not audit_ready: return 1 return 0 def _schema_ready( column_status: dict[str, dict[str, Any]], index_status: dict[str, dict[str, Any]], ) -> bool: for table in EXPECTED_TABLES: status = column_status.get(table) if not status or status.get("missing_required_columns"): return False index = index_status.get(table) if not index or not index.get("has_required_unique_indexes"): return False return True def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Validate Content Agent cloud MySQL schema readiness.") parser.add_argument("--env-file", default=".env", help="Primary env file. 
Defaults to project .env.") parser.add_argument("--host", default=None) parser.add_argument("--port", default=None) parser.add_argument("--database", default=None) parser.add_argument("--user", default=None) parser.add_argument("--password", default=None) parser.add_argument("--timeout", type=int, default=8) parser.add_argument("--run-label-like", default="cfa_mysql_100_20260607_batch%") parser.add_argument( "--audit-run-id", action="append", default=[], help="Optional exact run_id to audit query failure closure. May be passed more than once.", ) parser.add_argument( "--allow-missing-tables", action="store_true", help="Return success even when content_agent_* tables have not been created yet.", ) return parser.parse_args() def _audit_query_failures(cur: Any, run_ids: list[str]) -> list[dict[str, Any]]: audits: list[dict[str, Any]] = [] for run_id in run_ids: cur.execute( "SELECT `run_id`, `error_code`, `error_detail` " "FROM `content_agent_runs` WHERE `run_id` = %s", (run_id,), ) run_row = cur.fetchone() if not run_row: audits.append({"run_id": run_id, "status": "fail", "findings": ["run_not_found"]}) continue error_detail = _decode_json(run_row.get("error_detail")) or {} query_failures = error_detail.get("query_failures") if not isinstance(query_failures, list) or not query_failures: audits.append({"run_id": run_id, "status": "skipped", "reason": "no_query_failures"}) continue failure_ids = [ str(failure.get("search_query_id")) for failure in query_failures if isinstance(failure, dict) and failure.get("search_query_id") ] findings: list[str] = [] if not failure_ids: findings.append("query_failures_missing_search_query_id") query_rows = _fetch_rows_for_ids( cur, "content_agent_queries", run_id, failure_ids, "search_query_id, search_query_effect_status, raw_payload", ) query_by_id = {row["search_query_id"]: row for row in query_rows} for query_id in failure_ids: row = query_by_id.get(query_id) if not row: findings.append(f"query_missing:{query_id}") continue if 
row.get("search_query_effect_status") != "failed": findings.append(f"query_not_failed:{query_id}") raw_payload = _decode_json(row.get("raw_payload")) or {} if not isinstance(raw_payload.get("query_failure"), dict): findings.append(f"query_failure_payload_missing:{query_id}") clue_rows = _fetch_rows_for_ids( cur, "content_agent_search_clues", run_id, failure_ids, "search_query_id, search_query_effect_status, raw_payload", ) clue_by_id = {row["search_query_id"]: row for row in clue_rows} for query_id in failure_ids: row = clue_by_id.get(query_id) if not row: findings.append(f"search_clue_missing:{query_id}") continue if row.get("search_query_effect_status") != "failed": findings.append(f"search_clue_not_failed:{query_id}") event_refs = [f"search_queries.jsonl:{query_id}" for query_id in failure_ids] event_rows = _fetch_rows_for_values( cur, "content_agent_run_events", run_id, event_refs, "input_ref", "input_ref, event_type, status", extra_where="AND event_type = 'platform_query_failed'", ) event_by_ref = {row["input_ref"]: row for row in event_rows} for event_ref in event_refs: row = event_by_ref.get(event_ref) if not row: findings.append(f"platform_query_failed_event_missing:{event_ref}") continue if row.get("status") != "failed": findings.append(f"platform_query_failed_event_status:{event_ref}") audits.append( { "run_id": run_id, "query_failure_count": len(failure_ids), "status": "fail" if findings else "pass", "findings": findings, } ) return audits def _fetch_rows_for_ids( cur: Any, table: str, run_id: str, ids: list[str], columns: str, ) -> list[dict[str, Any]]: return _fetch_rows_for_values(cur, table, run_id, ids, "search_query_id", columns) def _fetch_rows_for_values( cur: Any, table: str, run_id: str, values: list[str], column: str, columns: str, *, extra_where: str = "", ) -> list[dict[str, Any]]: if not values: return [] placeholders = ", ".join(["%s"] * len(values)) cur.execute( f"SELECT {columns} FROM `{table}` " f"WHERE `run_id` = %s AND `{column}` IN 
({placeholders}) {extra_where}", [run_id, *values], ) return list(cur.fetchall()) def _decode_json(value: Any) -> Any: if value is None or isinstance(value, (dict, list)): return value return json.loads(value) def _load_env_file(path_value: str | None) -> dict[str, str]: if not path_value: return {} path = Path(path_value) if not path.exists(): return {} result: dict[str, str] = {} for line in path.read_text(encoding="utf-8").splitlines(): stripped = line.strip() if not stripped or stripped.startswith("#") or "=" not in stripped: continue key, value = stripped.split("=", 1) result[key.strip()] = value.strip().strip('"').strip("'") return result def _env_value(env: dict[str, str], *keys: str, default: str | None = None) -> str | None: for key in keys: value = env.get(key) if value: return value return default def _has_required_privileges(grants: list[str], database: str) -> bool: scoped_grants = _content_agent_scoped_grants(grants, database) normalized = " ".join(scoped_grants).upper() return all(privilege in normalized for privilege in REQUIRED_PRIVILEGES) or "ALL PRIVILEGES" in normalized def _has_create_privilege(grants: list[str], database: str) -> bool: scoped_grants = _database_or_global_grants(grants, database) normalized = " ".join(scoped_grants).upper() return "CREATE" in normalized or "ALL PRIVILEGES" in normalized def _content_agent_scoped_grants(grants: list[str], database: str) -> list[str]: scoped: list[str] = [] db_scope = f"ON `{database}`.*" for grant in grants: if "ON *.*" in grant or db_scope in grant: scoped.append(grant) continue if f"ON `{database}`.`content_agent_" in grant: scoped.append(grant) return scoped def _database_or_global_grants(grants: list[str], database: str) -> list[str]: db_scope = f"ON `{database}`.*" return [grant for grant in grants if "ON *.*" in grant or db_scope in grant] if __name__ == "__main__": raise SystemExit(main())