| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585 |
- from __future__ import annotations
- import argparse
- import json
- import sys
- from pathlib import Path
- from typing import Any
# Every table the validator expects to find; order is also the report order.
EXPECTED_TABLES = [
    # Run bookkeeping and inputs.
    "content_agent_runs",
    "content_agent_source_contexts",
    "content_agent_pattern_seed_packs",
    "content_agent_queries",
    # Records produced while a run walks content.
    "content_agent_discovered_content_items",
    "content_agent_content_media_records",
    "content_agent_rule_decisions",
    "content_agent_walk_actions",
    "content_agent_source_path_records",
    "content_agent_search_clues",
    "content_agent_run_events",
    # Outputs.
    "content_agent_final_outputs",
    "content_agent_publish_jobs",
    # Author assets.
    "content_agent_author_assets",
    "content_agent_author_asset_roles",
    # Evidence, reviews and feedback.
    "content_agent_pattern_recall_evidence",
    "content_agent_strategy_reviews",
    "content_agent_performance_feedback",
    # Search clue assets.
    "content_agent_search_clue_assets",
    "content_agent_search_clue_asset_evidence",
    # Policy runs.
    "content_agent_policy_runs",
]

# Privileges checked by _has_required_privileges.
REQUIRED_PRIVILEGES = ("SELECT", "INSERT", "UPDATE")

# Columns required on every table unless a table is exempted below.
COMMON_COLUMNS = {"id", "schema_version", "run_id", "created_at"}

# Tables that are not required to carry a ``policy_run_id`` column.
_TABLES_WITHOUT_POLICY_RUN_ID = frozenset(
    {
        "content_agent_runs",
        "content_agent_source_contexts",
        "content_agent_author_assets",
        "content_agent_author_asset_roles",
        "content_agent_search_clue_assets",
    }
)
POLICY_RUN_TABLES = {
    name for name in EXPECTED_TABLES if name not in _TABLES_WITHOUT_POLICY_RUN_ID
}

# Tables that are not required to carry a ``run_id`` column.
_TABLES_WITHOUT_RUN_ID = frozenset(
    {
        "content_agent_author_assets",
        "content_agent_author_asset_roles",
        "content_agent_search_clue_assets",
    }
)

# Columns required on top of the common ones, per table. Tables that are not
# listed here only need the common (and, where applicable, policy-run) columns.
_TABLE_SPECIFIC_COLUMNS = {
    "content_agent_runs": {"status", "error_code", "error_message", "error_detail"},
    "content_agent_queries": {
        "search_query_id",
        "search_query",
        "search_query_effect_status",
        "raw_payload",
    },
    "content_agent_discovered_content_items": {"raw_payload"},
    "content_agent_content_media_records": {"platform", "raw_payload"},
    "content_agent_rule_decisions": {"raw_payload"},
    "content_agent_walk_actions": {
        "walk_action_id",
        "edge_id",
        "walk_action",
        "walk_status",
        "raw_payload",
    },
    "content_agent_source_path_records": {"raw_payload"},
    "content_agent_search_clues": {
        "search_query_id",
        "search_query_effect_status",
        "raw_payload",
    },
    "content_agent_run_events": {
        "event_type",
        "status",
        "error_code",
        "message",
        "raw_payload",
    },
    "content_agent_author_assets": {
        "author_asset_id",
        "platform",
        "platform_author_id",
        "asset_status",
        "source_type",
        "validation_status",
        "eligible_as_source",
    },
    "content_agent_author_asset_roles": {
        "author_asset_id",
        "role",
        "role_status",
        "assigned_by",
    },
    "content_agent_strategy_reviews": {"raw_payload"},
    "content_agent_performance_feedback": {
        "feedback_id",
        "platform",
        "platform_content_id",
        "feedback_source",
        "feedback_status",
        "raw_payload",
    },
    "content_agent_search_clue_assets": {
        "search_clue_asset_id",
        "platform",
        "clue_type",
        "normalized_clue_text",
        "display_clue_text",
        "promotion_status",
        "can_seed_next_run",
        "first_seen_run_id",
        "first_seen_policy_run_id",
        "summary_metrics",
        "raw_payload",
    },
    "content_agent_search_clue_asset_evidence": {
        "evidence_id",
        "search_clue_asset_id",
        "clue_id",
        "search_query_id",
        "source_path_record_ids",
        "decision_ids",
        "performance_feedback_refs",
        "raw_payload",
    },
    "content_agent_policy_runs": {
        "policy_bundle_id",
        "rule_pack_id",
        "strategy_id",
        "strategy_version",
        "rule_pack_version",
        "policy_bundle_hash",
        "strategy_source_ref",
        "rule_pack_source_ref",
        "evidence_bundle_schema_version",
        "runtime_record_schema_version",
    },
}

# Final requirement per table: common columns (minus ``run_id`` where exempt),
# plus ``policy_run_id`` for policy-run tables, plus the table-specific extras.
# Every value is a fresh set, independent of the lookup tables above.
REQUIRED_COLUMNS_BY_TABLE = {
    name: (
        (COMMON_COLUMNS - ({"run_id"} if name in _TABLES_WITHOUT_RUN_ID else set()))
        | ({"policy_run_id"} if name in POLICY_RUN_TABLES else set())
        | _TABLE_SPECIFIC_COLUMNS.get(name, set())
    )
    for name in EXPECTED_TABLES
}
def _run_scoped(*columns: str) -> set[str]:
    """Return a unique-key column set made of run_id, policy_run_id and *columns*."""
    return {"run_id", "policy_run_id", *columns}


# For each table, the column sets that must each be covered by a unique index
# with exactly those columns (column order inside the index is not checked).
REQUIRED_UNIQUE_INDEXES_BY_TABLE = {
    "content_agent_runs": [{"run_id"}],
    "content_agent_source_contexts": [{"run_id"}],
    "content_agent_pattern_seed_packs": [_run_scoped()],
    "content_agent_queries": [_run_scoped("search_query_id")],
    "content_agent_discovered_content_items": [
        _run_scoped("content_discovery_id"),
        _run_scoped("platform", "platform_content_id"),
    ],
    "content_agent_content_media_records": [
        _run_scoped("platform", "platform_content_id"),
    ],
    "content_agent_rule_decisions": [
        _run_scoped("decision_id"),
        _run_scoped("decision_target_type", "decision_target_id"),
    ],
    "content_agent_walk_actions": [_run_scoped("walk_action_id")],
    "content_agent_source_path_records": [_run_scoped("source_path_record_id")],
    "content_agent_search_clues": [
        _run_scoped("clue_id"),
        _run_scoped("search_query_id"),
    ],
    "content_agent_run_events": [_run_scoped("event_id")],
    "content_agent_final_outputs": [_run_scoped("output_version")],
    "content_agent_publish_jobs": [_run_scoped("publish_job_id")],
    "content_agent_author_assets": [
        {"author_asset_id"},
        {"platform", "platform_author_id"},
    ],
    "content_agent_author_asset_roles": [{"author_asset_id", "role"}],
    "content_agent_pattern_recall_evidence": [_run_scoped("recall_evidence_id")],
    "content_agent_strategy_reviews": [_run_scoped("review_id")],
    "content_agent_performance_feedback": [_run_scoped("feedback_id")],
    "content_agent_search_clue_assets": [
        {"search_clue_asset_id"},
        {"platform", "clue_type", "normalized_clue_text"},
    ],
    "content_agent_search_clue_asset_evidence": [
        {"evidence_id"},
        _run_scoped("clue_id"),
    ],
    "content_agent_policy_runs": [_run_scoped()],
}

# Total number of required unique indexes across all tables.
EXPECTED_UNIQUE_INDEX_COUNT = sum(map(len, REQUIRED_UNIQUE_INDEXES_BY_TABLE.values()))
def main() -> int:
    """Validate the Content Agent MySQL schema and print a JSON report.

    Connection settings are taken from the command line first and fall back
    to the ``CONTENT_SUPPLY_DB_*`` keys of the env file. The report (grants,
    tables found, per-table column and unique-index status, optional run
    audits) is printed to stdout as JSON.

    Returns:
        0 when every check passes, or unconditionally after a completed
        inspection when ``--allow-missing-tables`` is set; 1 when a check
        fails or the database reports an error; 2 when pymysql is not
        installed or the connection settings are missing or invalid.
    """
    args = _parse_args()
    try:
        import pymysql
    except ModuleNotFoundError:
        print(
            "Missing pymysql. Run with: uv run --with pymysql python scripts/validate_content_agent_db.py",
            file=sys.stderr,
        )
        return 2

    env = _load_env_file(args.env_file)
    raw_cfg = {
        "host": args.host or _env_value(env, "CONTENT_SUPPLY_DB_HOST"),
        "port": args.port or _env_value(env, "CONTENT_SUPPLY_DB_PORT"),
        "user": args.user or _env_value(env, "CONTENT_SUPPLY_DB_USER"),
        "password": args.password or _env_value(env, "CONTENT_SUPPLY_DB_PASSWORD"),
        "database": args.database or _env_value(env, "CONTENT_SUPPLY_DB_NAME"),
    }
    missing_keys = [
        key
        for key, value in {
            "CONTENT_SUPPLY_DB_HOST": raw_cfg["host"],
            "CONTENT_SUPPLY_DB_PORT": raw_cfg["port"],
            "CONTENT_SUPPLY_DB_USER": raw_cfg["user"],
            "CONTENT_SUPPLY_DB_PASSWORD": raw_cfg["password"],
            "CONTENT_SUPPLY_DB_NAME": raw_cfg["database"],
        }.items()
        if not value
    ]
    if missing_keys:
        print(
            f"Missing required CFA db env keys: {', '.join(missing_keys)}",
            file=sys.stderr,
        )
        return 2

    # The port arrives as text (CLI or env file); reject garbage as a
    # configuration error instead of crashing with a ValueError traceback.
    try:
        port = int(raw_cfg["port"])
    except (TypeError, ValueError):
        print(
            f"Invalid CONTENT_SUPPLY_DB_PORT: {raw_cfg['port']!r}",
            file=sys.stderr,
        )
        return 2

    cfg = {
        "host": raw_cfg["host"],
        "port": port,
        "user": raw_cfg["user"],
        "password": raw_cfg["password"],
        "database": raw_cfg["database"],
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
        "connect_timeout": args.timeout,
        "read_timeout": args.timeout,
        "write_timeout": args.timeout,
    }
    result: dict[str, Any] = {
        "connect": "pending",
        "host": cfg["host"],
        "database": cfg["database"],
        "expected_table_count": len(EXPECTED_TABLES),
        "expected_unique_index_count": EXPECTED_UNIQUE_INDEX_COUNT,
    }

    try:
        with pymysql.connect(**cfg) as conn, conn.cursor() as cur:
            cur.execute(
                "SELECT DATABASE() AS db_name, CURRENT_USER() AS current_user_name, "
                "@@version AS mysql_version"
            )
            meta = cur.fetchone()
            result.update(
                {
                    "connect": "ok",
                    "current_user": meta["current_user_name"],
                    "mysql_version": meta["mysql_version"],
                }
            )

            # Each SHOW GRANTS row has a single column whose name varies, so
            # take the first value of every row.
            cur.execute("SHOW GRANTS FOR CURRENT_USER()")
            grants = [next(iter(row.values())) for row in cur.fetchall()]
            result["grants"] = grants
            result["has_runtime_privileges"] = _has_required_privileges(grants, cfg["database"])
            result["has_create_privilege"] = _has_create_privilege(grants, cfg["database"])

            cur.execute("SHOW TABLES LIKE 'content_agent_%'")
            found_tables = sorted(next(iter(row.values())) for row in cur.fetchall())
            missing_tables = sorted(set(EXPECTED_TABLES) - set(found_tables))
            extra_tables = sorted(set(found_tables) - set(EXPECTED_TABLES))
            result.update(
                {
                    "found_table_count": len(found_tables),
                    "found_tables": found_tables,
                    "missing_tables": missing_tables,
                    "extra_content_agent_tables": extra_tables,
                }
            )

            # Count demand rows whose ext_data.run_label matches the pattern.
            # The schema name here is fixed and independent of cfg["database"].
            cur.execute(
                "SELECT COUNT(*) AS cnt "
                "FROM `content-deconstruction-supply`.demand_content "
                "WHERE JSON_UNQUOTE(JSON_EXTRACT(ext_data, '$.run_label')) LIKE %s",
                (args.run_label_like,),
            )
            result["demand_batch_count"] = cur.fetchone()["cnt"]

            column_status = {}
            index_status = {}
            for table in found_tables:
                cur.execute(f"SHOW COLUMNS FROM `{table}`")
                columns = {row["Field"] for row in cur.fetchall()}
                required_columns = REQUIRED_COLUMNS_BY_TABLE.get(table, set())
                column_status[table] = {
                    "has_id": "id" in columns,
                    "has_run_id": "run_id" in columns,
                    "has_created_at": "created_at" in columns,
                    "has_schema_version": "schema_version" in columns,
                    "has_policy_run_id": "policy_run_id" in columns,
                    "missing_required_columns": sorted(required_columns - columns),
                }

                # Group the columns of every unique index by index name.
                cur.execute(f"SHOW INDEX FROM `{table}`")
                unique_indexes: dict[str, set[str]] = {}
                for row in cur.fetchall():
                    if row["Non_unique"] == 0:
                        unique_indexes.setdefault(row["Key_name"], set()).add(row["Column_name"])

                # A requirement is met only by an index with exactly the
                # required column set.
                required_unique_indexes = REQUIRED_UNIQUE_INDEXES_BY_TABLE.get(table, [])
                missing_required_unique_indexes = [
                    sorted(required_unique)
                    for required_unique in required_unique_indexes
                    if not any(
                        index_columns == required_unique
                        for index_columns in unique_indexes.values()
                    )
                ]
                index_status[table] = {
                    # Both keys carry the same value; presumably the singular
                    # one is kept for older report consumers - confirm.
                    "has_required_unique_index": not missing_required_unique_indexes,
                    "has_required_unique_indexes": not missing_required_unique_indexes,
                    "required_unique_indexes": [
                        sorted(required_unique) for required_unique in required_unique_indexes
                    ],
                    "missing_required_unique_indexes": missing_required_unique_indexes,
                    "unique_indexes": {
                        name: sorted(index_columns)
                        for name, index_columns in sorted(unique_indexes.items())
                    },
                }
            result["column_status"] = column_status
            result["index_status"] = index_status

            if args.audit_run_id:
                result["audit_runs"] = _audit_query_failures(cur, args.audit_run_id)
    except pymysql.MySQLError as exc:
        # Report whatever was collected so far together with the error; a
        # failure before the first query means the connection never opened.
        if result["connect"] == "pending":
            result["connect"] = "failed"
        result["error"] = str(exc)
        print(json.dumps(result, ensure_ascii=False, indent=2, default=str))
        return 1

    schema_ready = _schema_ready(result.get("column_status", {}), result.get("index_status", {}))
    result["schema_ready"] = schema_ready
    print(json.dumps(result, ensure_ascii=False, indent=2, default=str))

    if args.allow_missing_tables:
        return 0
    audit_ready = all(
        audit.get("status") in {"pass", "skipped"}
        for audit in result.get("audit_runs", [])
    )
    if (
        result["missing_tables"]
        or not result["has_runtime_privileges"]
        or not schema_ready
        or not audit_ready
    ):
        return 1
    return 0
def _schema_ready(
    column_status: dict[str, dict[str, Any]],
    index_status: dict[str, dict[str, Any]],
) -> bool:
    """Return True when every expected table passed its column and index checks.

    A table passes when it has a column-status entry with no missing required
    columns and an index-status entry whose ``has_required_unique_indexes``
    flag is truthy. A table without either entry fails.
    """

    def _table_ok(name: str) -> bool:
        columns_report = column_status.get(name)
        if not columns_report or columns_report.get("missing_required_columns"):
            return False
        indexes_report = index_status.get(name)
        return bool(indexes_report) and bool(indexes_report.get("has_required_unique_indexes"))

    return all(_table_ok(name) for name in EXPECTED_TABLES)
- def _parse_args() -> argparse.Namespace:
- parser = argparse.ArgumentParser(description="Validate Content Agent cloud MySQL schema readiness.")
- parser.add_argument("--env-file", default=".env", help="Primary env file. Defaults to project .env.")
- parser.add_argument("--host", default=None)
- parser.add_argument("--port", default=None)
- parser.add_argument("--database", default=None)
- parser.add_argument("--user", default=None)
- parser.add_argument("--password", default=None)
- parser.add_argument("--timeout", type=int, default=8)
- parser.add_argument("--run-label-like", default="cfa_mysql_100_20260607_batch%")
- parser.add_argument(
- "--audit-run-id",
- action="append",
- default=[],
- help="Optional exact run_id to audit query failure closure. May be passed more than once.",
- )
- parser.add_argument(
- "--allow-missing-tables",
- action="store_true",
- help="Return success even when content_agent_* tables have not been created yet.",
- )
- return parser.parse_args()
def _audit_query_failures(cur: Any, run_ids: list[str]) -> list[dict[str, Any]]:
    """Audit that every recorded query failure of a run is closed out.

    For each run id the ``query_failures`` list stored in the run's
    ``error_detail`` is cross-checked against three tables: the query row and
    the search clue row must both have effect status ``failed`` (the query row
    must also carry a ``query_failure`` object in its raw payload), and a
    ``platform_query_failed`` event with status ``failed`` must exist.

    Args:
        cur: Open cursor returning rows as dicts.
        run_ids: Exact run ids to audit.

    Returns:
        One dict per run id with ``status`` ``"fail"``, ``"skipped"`` (no
        query failures recorded) or ``"pass"``, plus the findings.
    """
    audits: list[dict[str, Any]] = []
    for run_id in run_ids:
        cur.execute(
            "SELECT `run_id`, `error_code`, `error_detail` "
            "FROM `content_agent_runs` WHERE `run_id` = %s",
            (run_id,),
        )
        run_row = cur.fetchone()
        if not run_row:
            audits.append({"run_id": run_id, "status": "fail", "findings": ["run_not_found"]})
            continue

        # error_detail may decode to something other than an object (list,
        # string, number); treat that like a missing query_failures list
        # instead of crashing on .get().
        error_detail = _decode_json(run_row.get("error_detail"))
        query_failures = (
            error_detail.get("query_failures") if isinstance(error_detail, dict) else None
        )
        if not isinstance(query_failures, list) or not query_failures:
            audits.append({"run_id": run_id, "status": "skipped", "reason": "no_query_failures"})
            continue

        failure_ids = [
            str(failure.get("search_query_id"))
            for failure in query_failures
            if isinstance(failure, dict) and failure.get("search_query_id")
        ]
        findings: list[str] = []
        if not failure_ids:
            findings.append("query_failures_missing_search_query_id")

        # 1) The query rows themselves. Keys are normalized to str because
        # failure_ids are strings regardless of the column type.
        query_rows = _fetch_rows_for_ids(
            cur,
            "content_agent_queries",
            run_id,
            failure_ids,
            "search_query_id, search_query_effect_status, raw_payload",
        )
        query_by_id = {str(row["search_query_id"]): row for row in query_rows}
        for query_id in failure_ids:
            row = query_by_id.get(query_id)
            if not row:
                findings.append(f"query_missing:{query_id}")
                continue
            if row.get("search_query_effect_status") != "failed":
                findings.append(f"query_not_failed:{query_id}")
            raw_payload = _decode_json(row.get("raw_payload"))
            if not isinstance(raw_payload, dict) or not isinstance(
                raw_payload.get("query_failure"), dict
            ):
                findings.append(f"query_failure_payload_missing:{query_id}")

        # 2) The matching search clue rows.
        clue_rows = _fetch_rows_for_ids(
            cur,
            "content_agent_search_clues",
            run_id,
            failure_ids,
            "search_query_id, search_query_effect_status, raw_payload",
        )
        clue_by_id = {str(row["search_query_id"]): row for row in clue_rows}
        for query_id in failure_ids:
            row = clue_by_id.get(query_id)
            if not row:
                findings.append(f"search_clue_missing:{query_id}")
                continue
            if row.get("search_query_effect_status") != "failed":
                findings.append(f"search_clue_not_failed:{query_id}")

        # 3) The platform_query_failed events, looked up by input_ref.
        event_refs = [f"search_queries.jsonl:{query_id}" for query_id in failure_ids]
        event_rows = _fetch_rows_for_values(
            cur,
            "content_agent_run_events",
            run_id,
            event_refs,
            "input_ref",
            "input_ref, event_type, status",
            extra_where="AND event_type = 'platform_query_failed'",
        )
        event_by_ref = {row["input_ref"]: row for row in event_rows}
        for event_ref in event_refs:
            row = event_by_ref.get(event_ref)
            if not row:
                findings.append(f"platform_query_failed_event_missing:{event_ref}")
                continue
            if row.get("status") != "failed":
                findings.append(f"platform_query_failed_event_status:{event_ref}")

        audits.append(
            {
                "run_id": run_id,
                "query_failure_count": len(failure_ids),
                "status": "fail" if findings else "pass",
                "findings": findings,
            }
        )
    return audits
def _fetch_rows_for_ids(
    cur: Any,
    table: str,
    run_id: str,
    ids: list[str],
    columns: str,
) -> list[dict[str, Any]]:
    """Fetch rows of *table* for *run_id* whose ``search_query_id`` is in *ids*."""
    return _fetch_rows_for_values(
        cur,
        table,
        run_id,
        values=ids,
        column="search_query_id",
        columns=columns,
    )
- def _fetch_rows_for_values(
- cur: Any,
- table: str,
- run_id: str,
- values: list[str],
- column: str,
- columns: str,
- *,
- extra_where: str = "",
- ) -> list[dict[str, Any]]:
- if not values:
- return []
- placeholders = ", ".join(["%s"] * len(values))
- cur.execute(
- f"SELECT {columns} FROM `{table}` "
- f"WHERE `run_id` = %s AND `{column}` IN ({placeholders}) {extra_where}",
- [run_id, *values],
- )
- return list(cur.fetchall())
- def _decode_json(value: Any) -> Any:
- if value is None or isinstance(value, (dict, list)):
- return value
- return json.loads(value)
- def _load_env_file(path_value: str | None) -> dict[str, str]:
- if not path_value:
- return {}
- path = Path(path_value)
- if not path.exists():
- return {}
- result: dict[str, str] = {}
- for line in path.read_text(encoding="utf-8").splitlines():
- stripped = line.strip()
- if not stripped or stripped.startswith("#") or "=" not in stripped:
- continue
- key, value = stripped.split("=", 1)
- result[key.strip()] = value.strip().strip('"').strip("'")
- return result
- def _env_value(env: dict[str, str], *keys: str, default: str | None = None) -> str | None:
- for key in keys:
- value = env.get(key)
- if value:
- return value
- return default
- def _has_required_privileges(grants: list[str], database: str) -> bool:
- scoped_grants = _content_agent_scoped_grants(grants, database)
- normalized = " ".join(scoped_grants).upper()
- return all(privilege in normalized for privilege in REQUIRED_PRIVILEGES) or "ALL PRIVILEGES" in normalized
- def _has_create_privilege(grants: list[str], database: str) -> bool:
- scoped_grants = _database_or_global_grants(grants, database)
- normalized = " ".join(scoped_grants).upper()
- return "CREATE" in normalized or "ALL PRIVILEGES" in normalized
- def _content_agent_scoped_grants(grants: list[str], database: str) -> list[str]:
- scoped: list[str] = []
- db_scope = f"ON `{database}`.*"
- for grant in grants:
- if "ON *.*" in grant or db_scope in grant:
- scoped.append(grant)
- continue
- if f"ON `{database}`.`content_agent_" in grant:
- scoped.append(grant)
- return scoped
- def _database_or_global_grants(grants: list[str], database: str) -> list[str]:
- db_scope = f"ON `{database}`.*"
- return [grant for grant in grants if "ON *.*" in grant or db_scope in grant]
# Script entry point: the return value of main() becomes the process exit code.
if __name__ == "__main__":
    sys.exit(main())
|