| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939 |
- from __future__ import annotations
- import json
- import os
- from dataclasses import dataclass
- from datetime import datetime, timezone
- from pathlib import Path
- from typing import Any, Callable
- import pymysql
- from content_agent.constants import DB_SCHEMA_VERSION
# Zero-argument callable that returns a database connection.
# DatabaseRuntimeStore uses the returned object as a context manager and calls
# ``cursor()`` and ``commit()`` on it.
ConnectionFactory = Callable[[], Any]
# Key names that must never appear anywhere inside a ``raw_payload`` value.
# ``_assert_no_forbidden_raw_payload_keys`` lower-cases each key before the
# lookup, so every entry here has to be lower case.
FORBIDDEN_RAW_PAYLOAD_KEYS = {
    "password",
    "token",
    "access_token",
    "refresh_token",
    "api_key",
    "apikey",
    "secret",
    "dsn",
    "authorization",
    "cookie",
    "session",
    "credential",
}
# Runtime filename -> table that stores its rows.
# ``_table_for_runtime_file`` rejects any filename that is not a key here.
RUNTIME_FILE_TABLES = {
    "source_context.json": "content_agent_source_contexts",
    "pattern_seed_pack.json": "content_agent_pattern_seed_packs",
    "search_queries.jsonl": "content_agent_queries",
    "discovered_content_items.jsonl": "content_agent_discovered_content_items",
    "content_media_records.jsonl": "content_agent_content_media_records",
    "pattern_recall_evidence.jsonl": "content_agent_pattern_recall_evidence",
    "rule_decisions.jsonl": "content_agent_rule_decisions",
    "walk_actions.jsonl": "content_agent_walk_actions",
    "run_events.jsonl": "content_agent_run_events",
    "source_path_records.jsonl": "content_agent_source_path_records",
    "search_clues.jsonl": "content_agent_search_clues",
    "final_output.json": "content_agent_final_outputs",
    "strategy_review.json": "content_agent_strategy_reviews",
}
# Table -> columns whose values ``_db_value`` serializes with ``_json_dump``
# before they are bound as SQL parameters. Columns not listed for a table are
# bound as-is (or converted by the datetime rule, see ``DATETIME_COLUMNS``).
JSON_COLUMNS_BY_TABLE = {
    "content_agent_source_contexts": {"evidence_pack", "source_context", "raw_demand_content"},
    "content_agent_pattern_seed_packs": {"itemset_ids", "seed_terms", "pattern_seed_pack"},
    "content_agent_queries": {"pattern_seed_ref", "raw_payload"},
    "content_agent_discovered_content_items": {
        "statistics",
        "tags",
        "text_extra",
        "source_evidence",
        "pattern_match_result",
        "content_audience_profile",
        "platform_raw_payload",
        "raw_payload",
    },
    "content_agent_content_media_records": {"raw_payload"},
    "content_agent_pattern_recall_evidence": {
        "matched_terms",
        "matched_category_paths",
        "decode_elements",
        "match_paths_request",
        "match_paths_response",
        "evidence_summary",
        "raw_payload",
    },
    "content_agent_rule_decisions": {
        "triggered_blocking_rules",
        "scorecard",
        "source_evidence",
        "decision_replay_data",
        "raw_payload",
    },
    "content_agent_walk_actions": {"raw_payload"},
    "content_agent_source_path_records": {"raw_payload"},
    "content_agent_search_clues": {"raw_payload"},
    "content_agent_run_events": {"raw_payload"},
    "content_agent_final_outputs": {"summary", "final_output"},
    "content_agent_publish_jobs": {"request_payload", "response_payload"},
    "content_agent_author_assets": {
        "content_tags",
        "profile_snapshot",
        "evidence_refs",
        "raw_payload",
    },
    "content_agent_author_asset_roles": {"raw_payload"},
    "content_agent_strategy_reviews": {
        "summary",
        "effective_search_queries",
        "weak_search_queries",
        "top_reject_reasons",
        "productive_paths",
        "suggestions",
        "raw_payload",
    },
    "content_agent_runs": {"source_ref", "error_detail"},
    "content_agent_policy_runs": {
        "strategy_source_ref",
        "rule_pack_source_ref",
        "metrics",
        "decision_summary",
        "raw_payload",
    },
}
# Column names (in any table) whose values ``_db_value`` passes through
# ``_datetime_value`` before binding.
DATETIME_COLUMNS = {"started_at", "completed_at", "created_at", "updated_at"}
# Single-document runtime file -> column that ``DatabaseRuntimeStore.read_json``
# selects to rebuild the document. Filenames missing here are rejected by
# ``read_json`` as "not a JSON runtime file".
JSON_FILE_PAYLOAD_COLUMNS = {
    "source_context.json": "source_context",
    "pattern_seed_pack.json": "pattern_seed_pack",
    "final_output.json": "final_output",
    "strategy_review.json": "raw_payload",
}
@dataclass(frozen=True)
class ContentSupplyDbConfig:
    """Immutable connection settings for the content-supply MySQL database.

    Instances are normally built with :meth:`from_env`; :meth:`connect` opens a
    new PyMySQL connection from the stored settings.
    """

    host: str
    port: int
    user: str
    password: str
    database: str
    # Seconds; used for the connect, read and write timeouts alike.
    timeout: int = 8

    def __repr__(self) -> str:
        """Return a debug representation that never contains the password.

        The dataclass-generated ``__repr__`` would print the password verbatim,
        which leaks the credential into logs and tracebacks.
        """
        return (
            f"{type(self).__name__}(host={self.host!r}, port={self.port!r}, "
            f"user={self.user!r}, password='***', "
            f"database={self.database!r}, timeout={self.timeout!r})"
        )

    @classmethod
    def from_env(
        cls,
        env_file: str | Path | None = ".env",
    ) -> "ContentSupplyDbConfig":
        """Build a config from an env file overlaid with the process environment.

        Args:
            env_file: Path of a ``KEY=VALUE`` file read via ``_load_env_file``;
                a falsy value or a missing file contributes nothing.

        Returns:
            A populated ``ContentSupplyDbConfig``.

        Raises:
            ValueError: If a required key is missing or empty after merging, or
                if the port value is not an integer.
        """
        required_keys = (
            "CONTENT_SUPPLY_DB_HOST",
            "CONTENT_SUPPLY_DB_PORT",
            "CONTENT_SUPPLY_DB_NAME",
            "CONTENT_SUPPLY_DB_USER",
            "CONTENT_SUPPLY_DB_PASSWORD",
        )
        env: dict[str, str] = _load_env_file(env_file)
        # Non-empty process environment values take precedence over the file.
        env.update(
            {key: os.environ[key] for key in required_keys if os.environ.get(key)}
        )
        missing_keys = [key for key in required_keys if not env.get(key)]
        if missing_keys:
            raise ValueError(f"Missing required CFA db env keys: {', '.join(missing_keys)}")

        raw_port = env["CONTENT_SUPPLY_DB_PORT"]
        try:
            port = int(raw_port)
        except ValueError as exc:
            # Name the offending key instead of surfacing a bare int() error.
            raise ValueError(
                f"CONTENT_SUPPLY_DB_PORT must be an integer, got {raw_port!r}"
            ) from exc

        return cls(
            host=env["CONTENT_SUPPLY_DB_HOST"],
            port=port,
            user=env["CONTENT_SUPPLY_DB_USER"],
            password=env["CONTENT_SUPPLY_DB_PASSWORD"],
            database=env["CONTENT_SUPPLY_DB_NAME"],
        )

    def connect(self) -> Any:
        """Open and return a new PyMySQL connection that yields dict rows."""
        return pymysql.connect(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=self.database,
            charset="utf8mb4",
            cursorclass=pymysql.cursors.DictCursor,
            connect_timeout=self.timeout,
            read_timeout=self.timeout,
            write_timeout=self.timeout,
        )
class DatabaseRuntimeStore:
    """Persist and read run artifacts in MySQL tables, addressed by filename.

    Runtime filenames are mapped to tables through ``RUNTIME_FILE_TABLES``.
    The ``Path`` values returned by the write methods are virtual
    ``database_runtime/<run_id>/<filename>`` locations; this class performs no
    filesystem I/O. Every statement runs on a fresh connection obtained from
    the connection factory.
    """

    def __init__(
        self,
        config: ContentSupplyDbConfig,
        connection_factory: ConnectionFactory | None = None,
    ) -> None:
        """Store the config and pick the connection factory.

        Args:
            config: Database settings; ``config.connect`` is the default factory.
            connection_factory: Optional override returning a connection object.
        """
        self.config = config
        self._connection_factory = connection_factory or config.connect

    def prepare_run(self, run_id: str) -> Path:
        """Return the virtual run directory, refusing an already-used run id.

        Raises:
            FileExistsError: If any runtime table already holds rows for
                ``run_id``.
        """
        if self._run_has_runtime_records(run_id):
            raise FileExistsError(f"run already exists in database runtime tables: {run_id}")
        return self.run_dir(run_id)

    def run_dir(self, run_id: str) -> Path:
        """Return the virtual directory ``database_runtime/<run_id>``."""
        return Path("database_runtime") / run_id

    def write_json(self, run_id: str, filename: str, data: dict[str, Any]) -> Path:
        """Insert a single-document runtime file as a new row.

        Raises:
            ValueError: If ``filename`` is unsupported or ``data`` belongs to a
                different run.
        """
        table, record = self._json_record(run_id, filename, data)
        self._insert(table, record)
        return self.run_dir(run_id) / filename

    def update_json(self, run_id: str, filename: str, data: dict[str, Any]) -> Path:
        """Store a newer version of a single-document runtime file.

        ``final_output.json`` is upserted on (run_id, policy_run_id,
        output_version); every other file is inserted as an additional row, and
        ``read_json`` returns the row with the highest ``id``.

        Raises:
            ValueError: If ``filename`` is unsupported or ``data`` belongs to a
                different run.
        """
        table, record = self._json_record(run_id, filename, data)
        if filename == "final_output.json":
            self._upsert(
                table,
                record,
                key_columns=("run_id", "policy_run_id", "output_version"),
            )
        else:
            self._insert(table, record)
        return self.run_dir(run_id) / filename

    def append_jsonl(self, run_id: str, filename: str, rows: list[dict[str, Any]]) -> Path:
        """Write each row of a line-oriented runtime file to its table.

        All rows are validated and converted before the first statement is
        executed, so a bad row can no longer leave the rows before it already
        written. ``pattern_recall_evidence.jsonl`` rows are upserted on
        (run_id, policy_run_id, recall_evidence_id); all other files are
        inserted.

        Raises:
            ValueError: If ``filename`` is unsupported or any row belongs to a
                different run.
        """
        table = _table_for_runtime_file(filename)
        records: list[dict[str, Any]] = []
        for row in rows:
            if row.get("run_id") != run_id:
                raise ValueError(f"{filename} row run_id does not match runtime run_id")
            records.append(_record_for_jsonl(filename, row))

        upsert_rows = filename == "pattern_recall_evidence.jsonl"
        for record in records:
            if upsert_rows:
                self._upsert(
                    table,
                    record,
                    key_columns=("run_id", "policy_run_id", "recall_evidence_id"),
                )
            else:
                self._insert(table, record)
        return self.run_dir(run_id) / filename

    def read_json(self, run_id: str, filename: str) -> dict[str, Any]:
        """Return the most recently stored document for ``filename``.

        Raises:
            ValueError: If ``filename`` is unsupported or is not a
                single-document runtime file.
            FileNotFoundError: If the run has no row for ``filename``.
        """
        table = _table_for_runtime_file(filename)
        payload_column = JSON_FILE_PAYLOAD_COLUMNS.get(filename)
        if not payload_column:
            raise ValueError(f"{filename} is not a JSON runtime file")
        row = self._fetch_one(
            f"SELECT `{payload_column}` FROM `{table}` WHERE `run_id` = %s ORDER BY `id` DESC LIMIT 1",
            (run_id,),
        )
        if not row:
            raise FileNotFoundError(f"{filename} not found for run: {run_id}")
        payload = _decode_json_payload(row[payload_column]) or {}
        if payload_column == "raw_payload":
            return _runtime_payload(payload)
        return payload

    def read_jsonl(self, run_id: str, filename: str) -> list[dict[str, Any]]:
        """Return the decoded ``raw_payload`` of every row, ordered by ``id``."""
        table = _table_for_runtime_file(filename)
        rows = self._fetch_all(
            f"SELECT `raw_payload` FROM `{table}` WHERE `run_id` = %s ORDER BY `id`",
            (run_id,),
        )
        return [_runtime_payload(_decode_json_payload(row["raw_payload"]) or {}) for row in rows]

    def file_status(self, run_id: str) -> dict[str, bool]:
        """Return, per known runtime filename, whether the run has rows for it."""
        return {
            filename: self._runtime_file_exists(run_id, filename)
            for filename in RUNTIME_FILE_TABLES
        }

    def create_run_record(self, record: dict[str, Any]) -> None:
        """Insert a row into ``content_agent_runs``."""
        self._insert("content_agent_runs", _run_record(record))

    def update_run_record(self, run_id: str, updates: dict[str, Any]) -> None:
        """Update columns of the ``content_agent_runs`` row for ``run_id``.

        Unknown columns and ``None`` values are dropped by ``_sanitize_record``;
        nothing is executed when no column remains.
        """
        if not updates:
            return
        record = _sanitize_record("content_agent_runs", updates)
        if not record:
            return
        assignments = ", ".join(f"`{column}` = %s" for column in record)
        params = [
            _db_value("content_agent_runs", column, value)
            for column, value in record.items()
        ]
        params.append(run_id)
        self._execute_write(
            f"UPDATE `content_agent_runs` SET {assignments} WHERE `run_id` = %s",
            params,
        )

    def record_policy_run(self, record: dict[str, Any]) -> None:
        """Insert a row into ``content_agent_policy_runs``."""
        self._insert("content_agent_policy_runs", _policy_run_record(record))

    def append_run_event_records(
        self,
        run_id: str,
        policy_run_id: str,
        rows: list[dict[str, Any]],
    ) -> None:
        """Append run events, stamping each row with the run identifiers.

        A row keeps its own ``policy_run_id`` when it carries a non-empty one;
        a missing, ``None`` or empty value falls back to ``policy_run_id``.
        """
        prepared_rows = [
            {
                **row,
                "run_id": run_id,
                "policy_run_id": row.get("policy_run_id") or policy_run_id,
            }
            for row in rows
        ]
        self.append_jsonl(run_id, "run_events.jsonl", prepared_rows)

    def write_publish_jobs(
        self,
        run_id: str,
        policy_run_id: str,
        rows: list[dict[str, Any]],
    ) -> None:
        """Upsert publish jobs keyed on (run_id, policy_run_id, publish_job_id).

        A missing, ``None`` or empty row-level ``policy_run_id`` falls back to
        the ``policy_run_id`` argument.
        """
        for row in rows:
            record = {
                **row,
                "run_id": run_id,
                "policy_run_id": row.get("policy_run_id") or policy_run_id,
            }
            self._upsert(
                "content_agent_publish_jobs",
                _with_db_schema(record),
                key_columns=("run_id", "policy_run_id", "publish_job_id"),
            )

    def write_author_assets(self, rows: list[dict[str, Any]]) -> None:
        """Upsert author assets keyed on ``author_asset_id``."""
        for row in rows:
            self._upsert(
                "content_agent_author_assets",
                _with_db_schema(row),
                key_columns=("author_asset_id",),
            )

    def write_author_asset_roles(self, rows: list[dict[str, Any]]) -> None:
        """Upsert author asset roles keyed on (author_asset_id, role)."""
        for row in rows:
            self._upsert(
                "content_agent_author_asset_roles",
                _with_db_schema(row),
                key_columns=("author_asset_id", "role"),
            )

    def _json_record(
        self,
        run_id: str,
        filename: str,
        data: dict[str, Any],
    ) -> tuple[str, dict[str, Any]]:
        """Build the (table, record) pair for a JSON file and check its run id."""
        table, record = _record_for_json(filename, data)
        if record["run_id"] != run_id:
            raise ValueError(f"{filename} run_id does not match runtime run_id")
        return table, record

    def _execute_write(self, sql: str, values: list[Any]) -> None:
        """Run one write statement on a fresh connection and commit it."""
        with self._connection_factory() as conn:
            with conn.cursor() as cur:
                cur.execute(sql, values)
            conn.commit()

    def _insert(self, table: str, record: dict[str, Any]) -> None:
        """Insert the sanitized ``record`` into ``table``."""
        sanitized = _sanitize_record(table, record)
        columns = list(sanitized)
        placeholders = ", ".join(["%s"] * len(columns))
        column_sql = ", ".join(f"`{column}`" for column in columns)
        values = [_db_value(table, column, sanitized[column]) for column in columns]
        self._execute_write(
            f"INSERT INTO `{table}` ({column_sql}) VALUES ({placeholders})",
            values,
        )

    def _upsert(
        self,
        table: str,
        record: dict[str, Any],
        key_columns: tuple[str, ...],
    ) -> None:
        """Insert ``record``; on a duplicate key overwrite the non-key columns.

        Conflict detection is left to the table's unique indexes;
        ``key_columns`` only selects which columns are excluded from the
        ``ON DUPLICATE KEY UPDATE`` clause. When every column is a key column
        the statement degrades to a plain ``INSERT``.
        """
        sanitized = _sanitize_record(table, record)
        columns = list(sanitized)
        placeholders = ", ".join(["%s"] * len(columns))
        column_sql = ", ".join(f"`{column}`" for column in columns)
        assignments = ", ".join(
            f"`{column}` = VALUES(`{column}`)"
            for column in columns
            if column not in key_columns
        )
        values = [_db_value(table, column, sanitized[column]) for column in columns]
        sql = f"INSERT INTO `{table}` ({column_sql}) VALUES ({placeholders})"
        if assignments:
            sql += f" ON DUPLICATE KEY UPDATE {assignments}"
        self._execute_write(sql, values)

    def _fetch_one(self, sql: str, params: tuple[Any, ...]) -> dict[str, Any] | None:
        """Run a query on a fresh connection and return its first row, if any."""
        with self._connection_factory() as conn:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                return cur.fetchone()

    def _fetch_all(self, sql: str, params: tuple[Any, ...]) -> list[dict[str, Any]]:
        """Run a query on a fresh connection and return all rows as a list."""
        with self._connection_factory() as conn:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                return list(cur.fetchall())

    def _table_has_run(self, table: str, run_id: str) -> bool:
        """Return whether ``table`` holds at least one row for ``run_id``."""
        row = self._fetch_one(
            f"SELECT COUNT(*) AS cnt FROM `{table}` WHERE `run_id` = %s",
            (run_id,),
        )
        return bool(row and row.get("cnt", 0))

    def _run_has_runtime_records(self, run_id: str) -> bool:
        """Return whether any run, policy-run or runtime-file table has the run."""
        tables = [
            "content_agent_runs",
            "content_agent_policy_runs",
            *RUNTIME_FILE_TABLES.values(),
        ]
        # any() stops at the first table that reports a row.
        return any(self._table_has_run(table, run_id) for table in tables)

    def _runtime_file_exists(self, run_id: str, filename: str) -> bool:
        """Return whether the table behind ``filename`` has rows for the run."""
        return self._table_has_run(_table_for_runtime_file(filename), run_id)
def _record_for_json(filename: str, data: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Map a single-document runtime file onto its table and column record.

    Args:
        filename: One of ``source_context.json``, ``pattern_seed_pack.json``,
            ``final_output.json`` or ``strategy_review.json``.
        data: The document to store; it must contain ``run_id`` and, for every
            file except ``source_context.json``, ``policy_run_id``
            (``strategy_review.json`` additionally requires ``review_id``).

    Returns:
        ``(table, record)`` where ``record`` maps column names to values.

    Raises:
        ValueError: If ``filename`` is not a known runtime file or is not one
            of the four JSON documents handled here.
        KeyError: If a required key is missing from ``data``.
    """
    table = _table_for_runtime_file(filename)
    if filename == "source_context.json":
        # ``ext_data`` may be absent *or* explicitly null; both mean that no
        # evidence pack is available. (A plain ``.get("ext_data", {})`` would
        # crash on the null case.)
        ext_data = data.get("ext_data") or {}
        evidence_pack = ext_data.get("evidence_pack") or {}
        return table, {
            "schema_version": DB_SCHEMA_VERSION,
            "run_id": data["run_id"],
            "demand_content_id": _int_or_none(data.get("demand_content_id")),
            "pattern_source_system": evidence_pack.get("pattern_source_system"),
            "source_kind": evidence_pack.get("source_kind"),
            "source_post_id": evidence_pack.get("source_post_id"),
            "pattern_execution_id": evidence_pack.get("pattern_execution_id"),
            "mining_config_id": evidence_pack.get("mining_config_id"),
            "evidence_pack": evidence_pack,
            "source_context": data,
            "raw_demand_content": data.get("raw_demand_content"),
        }
    if filename == "pattern_seed_pack.json":
        return table, {
            "schema_version": DB_SCHEMA_VERSION,
            "run_id": data["run_id"],
            "policy_run_id": data["policy_run_id"],
            "source_post_id": data.get("source_post_id"),
            "pattern_execution_id": data.get("pattern_execution_id"),
            "itemset_ids": _itemset_ids_from_seed_pack(data),
            "seed_terms": data.get("seed_terms"),
            "pattern_seed_pack": data,
        }
    if filename == "final_output.json":
        return table, {
            "schema_version": DB_SCHEMA_VERSION,
            "run_id": data["run_id"],
            "policy_run_id": data["policy_run_id"],
            "output_version": data.get("output_version", "v1"),
            "summary": data.get("summary"),
            "final_output": data,
            "validation_status": data.get("validation_status"),
        }
    if filename == "strategy_review.json":
        return table, {
            "schema_version": DB_SCHEMA_VERSION,
            "run_id": data["run_id"],
            "policy_run_id": data["policy_run_id"],
            "review_id": data["review_id"],
            "review_status": data.get("review_status", "generated"),
            "summary": data.get("summary"),
            "effective_search_queries": data.get("effective_search_queries"),
            "weak_search_queries": data.get("weak_search_queries"),
            "top_reject_reasons": data.get("top_reject_reasons"),
            "productive_paths": data.get("productive_paths"),
            "suggestions": data.get("suggestions"),
            # Without an explicit raw_payload the whole document is stored.
            "raw_payload": data.get("raw_payload", data),
        }
    raise ValueError(f"unsupported JSON runtime file: {filename}")
def _record_for_jsonl(filename: str, row: dict[str, Any]) -> dict[str, Any]:
    """Turn one row of a line-oriented runtime file into a table record.

    Every supported file is handled the same way: the row is copied and
    stamped with the current schema version.

    Raises:
        ValueError: If ``filename`` is not one of the supported JSONL files.
    """
    supported_files = (
        "search_queries.jsonl",
        "discovered_content_items.jsonl",
        "content_media_records.jsonl",
        "pattern_recall_evidence.jsonl",
        "rule_decisions.jsonl",
        "walk_actions.jsonl",
        "run_events.jsonl",
        "source_path_records.jsonl",
        "search_clues.jsonl",
    )
    if filename not in supported_files:
        raise ValueError(f"unsupported JSONL runtime file: {filename}")
    return _with_db_schema(row)
def _run_record(record: dict[str, Any]) -> dict[str, Any]:
    """Return the ``content_agent_runs`` record stamped with the schema version."""
    stamped_record = _with_db_schema(record)
    return stamped_record
def _policy_run_record(record: dict[str, Any]) -> dict[str, Any]:
    """Return the ``content_agent_policy_runs`` record stamped with the schema version."""
    stamped_record = _with_db_schema(record)
    return stamped_record
def _with_db_schema(record: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of ``record`` whose ``schema_version`` is the current one.

    The input mapping is left untouched; any ``schema_version`` it already
    carries is overwritten in the copy.
    """
    stamped = dict(record)
    stamped["schema_version"] = DB_SCHEMA_VERSION
    return stamped
def _sanitize_record(table: str, record: dict[str, Any]) -> dict[str, Any]:
    """Reduce ``record`` to the writable columns of ``table``.

    Keys that are not allowed columns of ``table`` and keys whose value is
    ``None`` are dropped. A surviving ``raw_payload`` is then scanned for
    forbidden (credential-like) keys, whether it is a dict or a list.

    Args:
        table: Target table name; must be a key of ``TABLE_COLUMNS``.
        record: Candidate column values.

    Returns:
        A new dict holding only the columns that may be written.

    Raises:
        KeyError: If ``table`` is unknown.
        ValueError: If ``raw_payload`` contains a forbidden key at any depth.
    """
    # Look the whitelist up once instead of once per record item.
    allowed = _allowed_columns(table)
    result = {
        column: value
        for column, value in record.items()
        if column in allowed and value is not None
    }
    # The checker walks dicts and lists and ignores everything else, so it can
    # be called unconditionally; restricting it to dicts let list payloads
    # bypass the scan.
    _assert_no_forbidden_raw_payload_keys(result.get("raw_payload"))
    return result
def _allowed_columns(table: str) -> set[str]:
    """Return the set of writable column names for ``table``.

    Raises:
        KeyError: If ``table`` has no entry in ``TABLE_COLUMNS``.
    """
    columns = TABLE_COLUMNS[table]
    return columns
# Table -> whitelist of columns this module may write.
# ``_sanitize_record`` silently drops every record key that is not listed for
# the target table, so a column has to be added here before it can be stored.
TABLE_COLUMNS = {
    "content_agent_source_contexts": {
        "schema_version",
        "run_id",
        "demand_content_id",
        "pattern_source_system",
        "source_kind",
        "source_post_id",
        "pattern_execution_id",
        "mining_config_id",
        "evidence_pack",
        "source_context",
        "raw_demand_content",
        "created_at",
    },
    "content_agent_pattern_seed_packs": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "source_post_id",
        "pattern_execution_id",
        "itemset_ids",
        "seed_terms",
        "pattern_seed_pack",
        "created_at",
    },
    "content_agent_queries": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "search_query_id",
        "search_query",
        "search_query_generation_method",
        "discovery_start_source",
        "previous_discovery_step",
        "search_query_effect_status",
        "pattern_seed_ref",
        "raw_payload",
        "created_at",
    },
    "content_agent_discovered_content_items": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "content_discovery_id",
        "search_query_id",
        "platform",
        "platform_content_id",
        "platform_content_format",
        "platform_content_url",
        "description",
        "platform_author_id",
        "author_display_name",
        "discovery_start_source",
        "previous_discovery_step",
        "statistics",
        "tags",
        "text_extra",
        "source_evidence",
        "pattern_match_result",
        "content_audience_profile",
        "platform_raw_payload",
        "raw_payload",
        "created_at",
    },
    "content_agent_content_media_records": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "platform",
        "platform_content_id",
        "content_media_status",
        "content_metadata_source",
        "play_url",
        "local_path",
        "oss_url",
        "raw_payload",
        "created_at",
    },
    "content_agent_pattern_recall_evidence": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "recall_evidence_id",
        "content_discovery_id",
        "platform_content_id",
        "decode_status",
        "decode_task_id",
        "recall_status",
        "matched_terms",
        "matched_category_paths",
        "decode_elements",
        "match_paths_request",
        "match_paths_response",
        "evidence_summary",
        "raw_payload",
        "created_at",
    },
    "content_agent_rule_decisions": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "decision_id",
        "policy_bundle_id",
        "rule_pack_id",
        "rule_pack_version",
        "strategy_version",
        "decision_target_type",
        "decision_target_id",
        "decision_action",
        "decision_reason_code",
        "search_query_effect_status",
        "score",
        "age_50_plus_level",
        "triggered_blocking_rules",
        "scorecard",
        "source_evidence",
        "decision_replay_data",
        "raw_payload",
        "created_at",
    },
    "content_agent_walk_actions": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "walk_action_id",
        "edge_id",
        "edge_type",
        "from_node_type",
        "from_node_id",
        "to_node_type",
        "to_node_id",
        "walk_action",
        "walk_status",
        "budget_tier",
        "depth",
        "page_cursor",
        "next_cursor",
        "decision_id",
        "rule_pack_id",
        "rule_pack_version",
        "reason_code",
        "source_path_record_id",
        "raw_payload",
        "created_at",
        "updated_at",
    },
    "content_agent_source_path_records": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "source_path_record_id",
        "source_path_type",
        "from_node_type",
        "from_node_id",
        "to_node_type",
        "to_node_id",
        "decision_id",
        "rule_pack_id",
        "discovery_start_source",
        "previous_discovery_step",
        "origin_path_id",
        "source_evidence_ref",
        "raw_payload",
        "created_at",
    },
    "content_agent_search_clues": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "clue_id",
        "search_query_id",
        "search_query",
        "discovery_start_source",
        "previous_discovery_step",
        "result_count",
        "pooled_content_count",
        "review_content_count",
        "pending_content_count",
        "rejected_content_count",
        "search_query_effect_status",
        "walk_next_step",
        "raw_payload",
        "created_at",
    },
    "content_agent_run_events": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "event_id",
        "event_type",
        "status",
        "input_ref",
        "output_ref",
        "error_code",
        "message",
        "raw_payload",
        "created_at",
    },
    "content_agent_final_outputs": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "output_version",
        "summary",
        "final_output",
        "validation_status",
        "created_at",
        "updated_at",
    },
    "content_agent_publish_jobs": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "publish_job_id",
        "platform_content_id",
        "job_status",
        "trigger_mode",
        "crawler_plan_id",
        "produce_plan_id",
        "publish_plan_id",
        "request_payload",
        "response_payload",
        "error_code",
        "error_message",
        "created_at",
        "updated_at",
    },
    "content_agent_author_assets": {
        "schema_version",
        "author_asset_id",
        "platform",
        "platform_author_id",
        "author_display_name",
        "author_profile_url",
        "asset_status",
        "source_type",
        "validation_status",
        "eligible_as_source",
        "elderly_ratio",
        "elderly_tgi",
        "content_tags",
        "source_run_id",
        "source_policy_run_id",
        "last_profile_fetch_at",
        "last_works_fetch_at",
        "last_validated_at",
        "profile_snapshot",
        "evidence_refs",
        "raw_payload",
        "created_at",
        "updated_at",
    },
    "content_agent_author_asset_roles": {
        "schema_version",
        "author_asset_id",
        "role",
        "role_status",
        "role_reason_code",
        "assigned_by",
        "source_run_id",
        "raw_payload",
        "created_at",
        "updated_at",
    },
    "content_agent_strategy_reviews": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "review_id",
        "review_status",
        "summary",
        "effective_search_queries",
        "weak_search_queries",
        "top_reject_reasons",
        "productive_paths",
        "suggestions",
        "raw_payload",
        "created_at",
    },
    "content_agent_runs": {
        "schema_version",
        "run_id",
        "demand_content_id",
        "run_label",
        "platform",
        "platform_mode",
        "strategy_version",
        "status",
        "current_step",
        "validation_status",
        "source_ref",
        "error_code",
        "error_message",
        "error_detail",
        "started_at",
        "completed_at",
        "created_at",
        "updated_at",
    },
    "content_agent_policy_runs": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "experiment_name",
        "run_role",
        "policy_bundle_id",
        "rule_pack_id",
        "strategy_id",
        "strategy_version",
        "rule_pack_version",
        "walk_strategy_version",
        "policy_bundle_hash",
        "strategy_source_ref",
        "rule_pack_source_ref",
        "evidence_bundle_schema_version",
        "runtime_record_schema_version",
        "status",
        "metrics",
        "decision_summary",
        "raw_payload",
        "created_at",
    },
}
def _table_for_runtime_file(filename: str) -> str:
    """Return the table that stores rows of the runtime file ``filename``.

    Raises:
        ValueError: If ``filename`` is not a key of ``RUNTIME_FILE_TABLES``;
            the original ``KeyError`` is chained as the cause.
    """
    try:
        table_name = RUNTIME_FILE_TABLES[filename]
    except KeyError as missing:
        raise ValueError(f"unsupported runtime file: {filename}") from missing
    return table_name
def _db_value(table: str, column: str, value: Any) -> Any:
    """Convert ``value`` into the form bound as the SQL parameter for ``column``.

    JSON columns of ``table`` are serialized to text, datetime columns are
    normalized through ``_datetime_value``, and everything else is returned
    unchanged. The JSON rule is checked first.
    """
    json_columns = JSON_COLUMNS_BY_TABLE.get(table, set())
    if column in json_columns:
        converted = _json_dump(value)
    elif column in DATETIME_COLUMNS:
        converted = _datetime_value(value)
    else:
        converted = value
    return converted
def _json_dump(value: Any) -> str | None:
    """Serialize ``value`` to compact JSON text, passing ``None`` through.

    Non-ASCII characters are written as-is rather than escaped, and no
    whitespace is emitted after separators.
    """
    if value is not None:
        compact_separators = (",", ":")
        return json.dumps(value, ensure_ascii=False, separators=compact_separators)
    return None
def _decode_json_payload(value: Any) -> Any:
    """Return the decoded form of a JSON column value.

    ``None`` and already-decoded dicts or lists are returned unchanged;
    anything else is parsed with ``json.loads``.
    """
    already_decoded = value is None or isinstance(value, (dict, list))
    return value if already_decoded else json.loads(value)
def _runtime_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Ensure a decoded payload exposes a ``raw_payload`` entry.

    A payload that already has the key is returned as the same object.
    Otherwise a new dict is returned that contains the payload's items plus
    ``raw_payload`` set to a shallow copy of the payload.
    """
    if "raw_payload" not in payload:
        wrapped = dict(payload)
        wrapped["raw_payload"] = dict(payload)
        return wrapped
    return payload
def _datetime_value(value: Any) -> Any:
    """Normalize a datetime column value before it is bound.

    ISO-8601 strings (a trailing ``Z`` is accepted) are parsed into
    ``datetime`` objects; timezone-aware results are converted to UTC and made
    naive. Strings that do not parse, and all non-string values (including
    ``None`` and ``datetime`` instances), are returned unchanged.
    """
    if not isinstance(value, str):
        return value
    iso_text = value.replace("Z", "+00:00")
    try:
        moment = datetime.fromisoformat(iso_text)
    except ValueError:
        # Not an ISO timestamp: hand the original text back untouched.
        return value
    if not moment.tzinfo:
        return moment
    return moment.astimezone(timezone.utc).replace(tzinfo=None)
def _itemset_ids_from_seed_pack(data: dict[str, Any]) -> list[Any]:
    """Return the itemset ids referenced by a pattern seed pack.

    A non-empty ``itemset_ids`` entry wins and is returned as a list.
    Otherwise the ids are collected from the dict entries of ``itemsets``,
    skipping ``None`` ids and dropping duplicates while keeping first-seen
    order.
    """
    explicit_ids = data.get("itemset_ids")
    if explicit_ids:
        return list(data["itemset_ids"])

    first_seen: dict[Any, None] = {}
    for entry in data.get("itemsets") or []:
        if not isinstance(entry, dict):
            continue
        itemset_id = entry.get("itemset_id")
        if itemset_id is None:
            continue
        # Dict keys keep insertion order, which gives ordered de-duplication.
        first_seen.setdefault(itemset_id, None)
    return list(first_seen)
def _int_or_none(value: Any) -> int | None:
    """Convert ``value`` with ``int()``, mapping ``None`` and ``""`` to ``None``.

    Raises:
        ValueError: If ``value`` is a string that ``int()`` cannot parse.
    """
    is_blank = value in (None, "")
    return None if is_blank else int(value)
def _assert_no_forbidden_raw_payload_keys(payload: Any) -> None:
    """Reject payloads that contain a forbidden key at any nesting depth.

    Dicts and lists are walked recursively; any other value is ignored. Keys
    are compared case-insensitively against ``FORBIDDEN_RAW_PAYLOAD_KEYS``.

    Raises:
        ValueError: On the first forbidden key encountered.
    """
    if isinstance(payload, dict):
        for key, nested in payload.items():
            if str(key).lower() in FORBIDDEN_RAW_PAYLOAD_KEYS:
                raise ValueError(f"raw_payload contains forbidden key: {key}")
            _assert_no_forbidden_raw_payload_keys(nested)
    elif isinstance(payload, list):
        for element in payload:
            _assert_no_forbidden_raw_payload_keys(element)
def _load_env_file(path_value: str | Path | None) -> dict[str, str]:
    """Parse a simple ``KEY=VALUE`` env file into a dict.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Keys and values are whitespace-trimmed, and a value wrapped in one
    matching pair of single or double quotes has that pair removed.

    Args:
        path_value: Location of the env file; falsy means "no file".

    Returns:
        The parsed mapping, or an empty dict when ``path_value`` is falsy or
        does not point to a regular file.
    """
    if not path_value:
        return {}
    path = Path(path_value)
    # is_file() also rejects directories, which exists() would let through and
    # read_text() would then fail on.
    if not path.is_file():
        return {}
    result: dict[str, str] = {}
    for line in path.read_text(encoding="utf-8").splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#") or "=" not in stripped:
            continue
        key, value = stripped.split("=", 1)
        value = value.strip()
        # Remove only one *matching* pair of surrounding quotes. Stripping
        # quote characters unconditionally would corrupt values that merely
        # start or end with a quote (for example a password ending in ').
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
            value = value[1:-1]
        result[key.strip()] = value
    return result
|