"""MySQL-backed runtime store for content-agent run artifacts.

Each runtime "file" name (for example ``search_queries.jsonl``) maps to a
database table through ``RUNTIME_FILE_TABLES``.  ``DatabaseRuntimeStore``
writes and reads those artifacts through PyMySQL and returns virtual paths of
the form ``database_runtime/<run_id>/<filename>``.
"""

from __future__ import annotations

import json
import os
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable

import pymysql

from content_agent.constants import DB_SCHEMA_VERSION

# Zero-argument callable returning a connection object usable as a context
# manager (``ContentSupplyDbConfig.connect`` is the default implementation).
ConnectionFactory = Callable[[], Any]

# Keys (compared lower-cased) that must never appear anywhere inside a
# ``raw_payload`` value that is about to be persisted.
FORBIDDEN_RAW_PAYLOAD_KEYS = {
    "password",
    "token",
    "access_token",
    "refresh_token",
    "api_key",
    "apikey",
    "secret",
    "dsn",
    "authorization",
    "cookie",
    "session",
    "credential",
}

# Runtime file name -> table that stores its records.
RUNTIME_FILE_TABLES = {
    "source_context.json": "content_agent_source_contexts",
    "pattern_seed_pack.json": "content_agent_pattern_seed_packs",
    "search_queries.jsonl": "content_agent_queries",
    "discovered_content_items.jsonl": "content_agent_discovered_content_items",
    "content_media_records.jsonl": "content_agent_content_media_records",
    "pattern_recall_evidence.jsonl": "content_agent_pattern_recall_evidence",
    "rule_decisions.jsonl": "content_agent_rule_decisions",
    "walk_actions.jsonl": "content_agent_walk_actions",
    "run_events.jsonl": "content_agent_run_events",
    "source_path_records.jsonl": "content_agent_source_path_records",
    "search_clues.jsonl": "content_agent_search_clues",
    "final_output.json": "content_agent_final_outputs",
    "strategy_review.json": "content_agent_strategy_reviews",
}

# Table -> columns whose values are serialized with ``json.dumps`` on write.
JSON_COLUMNS_BY_TABLE = {
    "content_agent_source_contexts": {
        "evidence_pack",
        "source_context",
        "raw_demand_content",
    },
    "content_agent_pattern_seed_packs": {
        "itemset_ids",
        "seed_terms",
        "pattern_seed_pack",
    },
    "content_agent_queries": {"pattern_seed_ref", "raw_payload"},
    "content_agent_discovered_content_items": {
        "statistics",
        "tags",
        "text_extra",
        "source_evidence",
        "pattern_match_result",
        "platform_raw_payload",
        "raw_payload",
    },
    "content_agent_content_media_records": {"raw_payload"},
    "content_agent_pattern_recall_evidence": {
        "evidence_summary",
        "raw_payload",
    },
    "content_agent_rule_decisions": {
        "triggered_blocking_rules",
        "scorecard",
        "source_evidence",
        "decision_replay_data",
        "raw_payload",
    },
    "content_agent_walk_actions": {"raw_payload"},
    "content_agent_source_path_records": {"raw_payload"},
    "content_agent_search_clues": {"raw_payload"},
    "content_agent_run_events": {"raw_payload"},
    "content_agent_final_outputs": {"summary", "final_output"},
    "content_agent_publish_jobs": {"request_payload", "response_payload"},
    "content_agent_author_assets": {
        "content_tags",
        "profile_snapshot",
        "evidence_refs",
        "raw_payload",
    },
    "content_agent_author_asset_roles": {"raw_payload"},
    "content_agent_search_clue_assets": {"summary_metrics", "raw_payload"},
    "content_agent_search_clue_asset_evidence": {
        "source_path_record_ids",
        "decision_ids",
        "performance_feedback_refs",
        "raw_payload",
    },
    "content_agent_strategy_reviews": {
        "summary",
        "effective_search_queries",
        "weak_search_queries",
        "top_reject_reasons",
        "productive_paths",
        "suggestions",
        "raw_payload",
    },
    "content_agent_performance_feedback": {"raw_payload"},
    "content_agent_runs": {"source_ref", "error_detail"},
    "content_agent_policy_runs": {
        "strategy_source_ref",
        "rule_pack_source_ref",
        "metrics",
        "decision_summary",
        "raw_payload",
    },
}

# Columns (in any table) whose string values are parsed as ISO-8601 datetimes.
# NOTE(review): other ``*_at`` / ``metric_window_*`` columns listed in
# TABLE_COLUMNS are not converted -- confirm whether that is intentional.
DATETIME_COLUMNS = {"started_at", "completed_at", "created_at", "updated_at"}

# JSON runtime file -> column that ``read_json`` returns as the payload.
JSON_FILE_PAYLOAD_COLUMNS = {
    "source_context.json": "source_context",
    "pattern_seed_pack.json": "pattern_seed_pack",
    "final_output.json": "final_output",
    "strategy_review.json": "raw_payload",
}

# JSONL runtime files written as upserts, with the key columns that are
# excluded from the ``ON DUPLICATE KEY UPDATE`` assignment list.
JSONL_UPSERT_KEYS = {
    "search_queries.jsonl": ("run_id", "policy_run_id", "search_query_id"),
    "pattern_recall_evidence.jsonl": ("run_id", "policy_run_id", "recall_evidence_id"),
    "search_clues.jsonl": ("run_id", "policy_run_id", "clue_id"),
    "run_events.jsonl": ("run_id", "policy_run_id", "event_id"),
}

# JSONL runtime files accepted by ``_record_for_jsonl``.
_SCHEMA_STAMPED_JSONL_FILES = frozenset(
    {
        "search_queries.jsonl",
        "discovered_content_items.jsonl",
        "content_media_records.jsonl",
        "pattern_recall_evidence.jsonl",
        "rule_decisions.jsonl",
        "walk_actions.jsonl",
        "run_events.jsonl",
        "source_path_records.jsonl",
        "search_clues.jsonl",
    }
)

# Environment keys consumed by ``ContentSupplyDbConfig.from_env``; all required.
_DB_ENV_KEYS = (
    "CONTENT_SUPPLY_DB_HOST",
    "CONTENT_SUPPLY_DB_PORT",
    "CONTENT_SUPPLY_DB_NAME",
    "CONTENT_SUPPLY_DB_USER",
    "CONTENT_SUPPLY_DB_PASSWORD",
)


@dataclass(frozen=True)
class ContentSupplyDbConfig:
    """Connection settings for the content-supply MySQL database."""

    host: str
    port: int
    user: str
    # Excluded from the generated repr so the secret is not echoed when the
    # config object is logged or shown in a traceback.
    password: str = field(repr=False)
    database: str
    # Seconds; used for the connect, read and write timeouts alike.
    timeout: int = 8

    @classmethod
    def from_env(
        cls,
        env_file: str | Path | None = ".env",
    ) -> "ContentSupplyDbConfig":
        """Build a config from an env file overlaid with process environment.

        Values from ``env_file`` are loaded first; any non-empty
        ``CONTENT_SUPPLY_DB_*`` variable in ``os.environ`` overrides them.

        Args:
            env_file: Path of a ``KEY=VALUE`` file, or a falsy value to skip
                file loading. A missing file is treated as empty.

        Raises:
            ValueError: If a required key is missing/empty, or the port is not
                an integer.
        """
        env: dict[str, str] = _load_env_file(env_file)
        env.update(
            {key: os.environ[key] for key in _DB_ENV_KEYS if os.environ.get(key)}
        )
        missing_keys = [key for key in _DB_ENV_KEYS if not env.get(key)]
        if missing_keys:
            raise ValueError(f"Missing required CFA db env keys: {', '.join(missing_keys)}")
        try:
            port = int(env["CONTENT_SUPPLY_DB_PORT"])
        except ValueError as exc:
            raise ValueError(
                "CONTENT_SUPPLY_DB_PORT must be an integer, "
                f"got {env['CONTENT_SUPPLY_DB_PORT']!r}"
            ) from exc
        return cls(
            host=env["CONTENT_SUPPLY_DB_HOST"],
            port=port,
            user=env["CONTENT_SUPPLY_DB_USER"],
            password=env["CONTENT_SUPPLY_DB_PASSWORD"],
            database=env["CONTENT_SUPPLY_DB_NAME"],
        )

    def connect(self) -> Any:
        """Open a new PyMySQL connection that yields dict rows."""
        return pymysql.connect(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=self.database,
            charset="utf8mb4",
            cursorclass=pymysql.cursors.DictCursor,
            connect_timeout=self.timeout,
            read_timeout=self.timeout,
            write_timeout=self.timeout,
        )


class DatabaseRuntimeStore:
    """Runtime store that persists run artifacts in database tables.

    Every operation obtains a fresh connection from the connection factory and
    uses it as a context manager.
    NOTE(review): with PyMySQL 1.x leaving the ``with`` block closes the
    connection; confirm custom factories return objects that behave alike.
    """

    def __init__(
        self,
        config: ContentSupplyDbConfig,
        connection_factory: ConnectionFactory | None = None,
    ) -> None:
        """Store the config; fall back to ``config.connect`` as the factory."""
        self.config = config
        self._connection_factory = connection_factory or config.connect

    def prepare_run(self, run_id: str) -> Path:
        """Return the virtual run directory for a run that has no rows yet.

        Raises:
            FileExistsError: If any runtime table already holds rows for
                ``run_id``.
        """
        if self._run_has_runtime_records(run_id):
            raise FileExistsError(f"run already exists in database runtime tables: {run_id}")
        return self.run_dir(run_id)

    def run_dir(self, run_id: str) -> Path:
        """Return the virtual directory ``database_runtime/<run_id>``."""
        return Path("database_runtime") / run_id

    def write_json(self, run_id: str, filename: str, data: dict[str, Any]) -> Path:
        """Insert the record derived from a JSON runtime file.

        Raises:
            ValueError: If ``filename`` is unsupported or ``data["run_id"]``
                differs from ``run_id``.
        """
        table, record = _record_for_json(filename, data)
        if record["run_id"] != run_id:
            raise ValueError(f"{filename} run_id does not match runtime run_id")
        self._insert(table, record)
        return self.run_dir(run_id) / filename

    def update_json(self, run_id: str, filename: str, data: dict[str, Any]) -> Path:
        """Write a new version of a JSON runtime file.

        ``final_output.json`` is upserted on
        ``(run_id, policy_run_id, output_version)``; every other file is
        inserted as a new row.

        Raises:
            ValueError: If ``filename`` is unsupported or ``data["run_id"]``
                differs from ``run_id``.
        """
        table, record = _record_for_json(filename, data)
        if record["run_id"] != run_id:
            raise ValueError(f"{filename} run_id does not match runtime run_id")
        if filename == "final_output.json":
            self._upsert(
                table,
                record,
                key_columns=("run_id", "policy_run_id", "output_version"),
            )
        else:
            self._insert(table, record)
        return self.run_dir(run_id) / filename

    def append_jsonl(self, run_id: str, filename: str, rows: list[dict[str, Any]]) -> Path:
        """Write ``rows`` of a JSONL runtime file in one batch.

        Files listed in ``JSONL_UPSERT_KEYS`` are upserted; the others are
        plain inserts.

        Raises:
            ValueError: If ``filename`` is unsupported or a row's ``run_id``
                differs from ``run_id``.
        """
        table = _table_for_runtime_file(filename)
        key_columns = JSONL_UPSERT_KEYS.get(filename)
        # The whole batch shares one connection and a single commit, avoiding
        # the N network round trips of opening a connection and committing per
        # row.
        statements: list[tuple[str, list[Any]]] = []
        for row in rows:
            if row.get("run_id") != run_id:
                raise ValueError(f"{filename} row run_id does not match runtime run_id")
            record = _record_for_jsonl(filename, row)
            statements.append(self._row_sql(table, record, key_columns=key_columns))
        self._execute_statements(statements)
        return self.run_dir(run_id) / filename

    def read_json(self, run_id: str, filename: str) -> dict[str, Any]:
        """Return the latest stored payload (highest ``id``) of a JSON file.

        Raises:
            ValueError: If ``filename`` is not a supported JSON runtime file.
            FileNotFoundError: If no row exists for ``run_id``.
        """
        table = _table_for_runtime_file(filename)
        payload_column = JSON_FILE_PAYLOAD_COLUMNS.get(filename)
        if not payload_column:
            raise ValueError(f"{filename} is not a JSON runtime file")
        row = self._fetch_one(
            f"SELECT `{payload_column}` FROM `{table}` "
            "WHERE `run_id` = %s ORDER BY `id` DESC LIMIT 1",
            (run_id,),
        )
        if not row:
            raise FileNotFoundError(f"{filename} not found for run: {run_id}")
        payload = _decode_json_payload(row[payload_column]) or {}
        if payload_column == "raw_payload":
            return _runtime_payload(payload)
        return payload

    def read_jsonl(self, run_id: str, filename: str) -> list[dict[str, Any]]:
        """Return the decoded ``raw_payload`` of every row, ordered by ``id``.

        Raises:
            ValueError: If ``filename`` is not a supported runtime file.
        """
        table = _table_for_runtime_file(filename)
        rows = self._fetch_all(
            f"SELECT `raw_payload` FROM `{table}` WHERE `run_id` = %s ORDER BY `id`",
            (run_id,),
        )
        return [_runtime_payload(_decode_json_payload(row["raw_payload"]) or {}) for row in rows]

    def read_performance_feedback(
        self,
        run_id: str,
        policy_run_id: str,
    ) -> list[dict[str, Any]]:
        """Return decoded performance-feedback payloads for one policy run."""
        rows = self._fetch_all(
            "SELECT `raw_payload` FROM `content_agent_performance_feedback` "
            "WHERE `run_id` = %s AND `policy_run_id` = %s ORDER BY `id`",
            (run_id, policy_run_id),
        )
        return [_runtime_payload(_decode_json_payload(row["raw_payload"]) or {}) for row in rows]

    def file_status(self, run_id: str) -> dict[str, bool]:
        """Map every known runtime file to whether rows exist for ``run_id``."""
        return {
            filename: self._runtime_file_exists(run_id, filename)
            for filename in RUNTIME_FILE_TABLES
        }

    def create_run_record(self, record: dict[str, Any]) -> None:
        """Insert a row into ``content_agent_runs``."""
        self._insert("content_agent_runs", _run_record(record))

    def update_run_record(self, run_id: str, updates: dict[str, Any]) -> None:
        """Update known, non-``None`` columns of the run row for ``run_id``.

        Does nothing when ``updates`` is empty or contains no usable column.
        """
        if not updates:
            return
        record = _sanitize_record("content_agent_runs", updates)
        if not record:
            return
        assignments = ", ".join(f"`{column}` = %s" for column in record)
        params = [
            _db_value("content_agent_runs", column, value)
            for column, value in record.items()
        ]
        params.append(run_id)
        self._execute_statements(
            [
                (
                    f"UPDATE `content_agent_runs` SET {assignments} WHERE `run_id` = %s",
                    params,
                )
            ]
        )

    def record_policy_run(self, record: dict[str, Any]) -> None:
        """Insert a row into ``content_agent_policy_runs``."""
        self._insert("content_agent_policy_runs", _policy_run_record(record))

    def append_run_event_records(
        self,
        run_id: str,
        policy_run_id: str,
        rows: list[dict[str, Any]],
    ) -> None:
        """Append run events, forcing ``run_id`` and defaulting ``policy_run_id``."""
        prepared_rows = [
            {**row, "run_id": run_id, "policy_run_id": row.get("policy_run_id", policy_run_id)}
            for row in rows
        ]
        self.append_jsonl(run_id, "run_events.jsonl", prepared_rows)

    def write_publish_jobs(
        self,
        run_id: str,
        policy_run_id: str,
        rows: list[dict[str, Any]],
    ) -> None:
        """Upsert publish jobs keyed by ``(run_id, policy_run_id, publish_job_id)``.

        ``run_id`` is forced on every row; ``policy_run_id`` is used only when
        the row does not carry its own. The rows are written as one batch.
        """
        records = [
            _with_db_schema(
                {
                    **row,
                    "run_id": run_id,
                    "policy_run_id": row.get("policy_run_id", policy_run_id),
                }
            )
            for row in rows
        ]
        self._upsert_many(
            "content_agent_publish_jobs",
            records,
            key_columns=("run_id", "policy_run_id", "publish_job_id"),
        )

    def write_author_assets(self, rows: list[dict[str, Any]]) -> None:
        """Upsert author assets keyed by ``author_asset_id`` as one batch."""
        self._upsert_many(
            "content_agent_author_assets",
            [_with_db_schema(row) for row in rows],
            key_columns=("author_asset_id",),
        )

    def write_author_asset_roles(self, rows: list[dict[str, Any]]) -> None:
        """Upsert author roles keyed by ``(author_asset_id, role)`` as one batch."""
        self._upsert_many(
            "content_agent_author_asset_roles",
            [_with_db_schema(row) for row in rows],
            key_columns=("author_asset_id", "role"),
        )

    def write_search_clue_assets(self, rows: list[dict[str, Any]]) -> None:
        """Upsert search-clue assets keyed by ``search_clue_asset_id`` as one batch."""
        self._upsert_many(
            "content_agent_search_clue_assets",
            [_with_db_schema(row) for row in rows],
            key_columns=("search_clue_asset_id",),
        )

    def write_search_clue_asset_evidence(self, rows: list[dict[str, Any]]) -> None:
        """Upsert clue evidence keyed by ``(run_id, policy_run_id, clue_id)`` as one batch."""
        self._upsert_many(
            "content_agent_search_clue_asset_evidence",
            [_with_db_schema(row) for row in rows],
            key_columns=("run_id", "policy_run_id", "clue_id"),
        )

    def _row_sql(
        self,
        table: str,
        record: dict[str, Any],
        key_columns: tuple[str, ...] | None = None,
    ) -> tuple[str, list[Any]]:
        """Render a parameterized INSERT (optionally an upsert) for ``record``.

        Only columns known for ``table`` with non-``None`` values are written.
        When ``key_columns`` is given, all other written columns are refreshed
        through ``ON DUPLICATE KEY UPDATE``.

        Returns:
            ``(sql, values)`` ready for ``cursor.execute``.
        """
        sanitized = _sanitize_record(table, record)
        columns = list(sanitized)
        placeholders = ", ".join(["%s"] * len(columns))
        column_sql = ", ".join(f"`{column}`" for column in columns)
        values = [_db_value(table, column, sanitized[column]) for column in columns]
        sql = f"INSERT INTO `{table}` ({column_sql}) VALUES ({placeholders})"
        if key_columns:
            assignments = ", ".join(
                f"`{column}` = VALUES(`{column}`)"
                for column in columns
                if column not in key_columns
            )
            # With nothing but key columns there is nothing to update, so the
            # statement stays a plain INSERT.
            if assignments:
                sql += f" ON DUPLICATE KEY UPDATE {assignments}"
        return sql, values

    def _execute_statements(self, statements: list[tuple[str, list[Any]]]) -> None:
        """Run ``statements`` on one connection and commit once at the end.

        No connection is opened when ``statements`` is empty.
        """
        if not statements:
            return
        with self._connection_factory() as conn:
            with conn.cursor() as cur:
                for sql, values in statements:
                    cur.execute(sql, values)
            conn.commit()

    def _upsert_many(
        self,
        table: str,
        records: list[dict[str, Any]],
        key_columns: tuple[str, ...],
    ) -> None:
        """Upsert ``records`` into ``table`` as one batch.

        Every statement is rendered (and therefore validated) before a
        connection is opened.
        """
        self._execute_statements(
            [self._row_sql(table, record, key_columns=key_columns) for record in records]
        )

    def _insert(self, table: str, record: dict[str, Any]) -> None:
        """Insert a single record into ``table`` and commit."""
        self._execute_statements([self._row_sql(table, record)])

    def _upsert(
        self,
        table: str,
        record: dict[str, Any],
        key_columns: tuple[str, ...],
    ) -> None:
        """Upsert a single record into ``table`` and commit."""
        self._execute_statements([self._row_sql(table, record, key_columns=key_columns)])

    def _fetch_one(self, sql: str, params: tuple[Any, ...]) -> dict[str, Any] | None:
        """Execute ``sql`` and return the first row, or ``None``."""
        with self._connection_factory() as conn:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                return cur.fetchone()

    def _fetch_all(self, sql: str, params: tuple[Any, ...]) -> list[dict[str, Any]]:
        """Execute ``sql`` and return all rows as a list."""
        with self._connection_factory() as conn:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                return list(cur.fetchall())

    def _table_has_run_rows(self, table: str, run_id: str) -> bool:
        """Return whether ``table`` holds at least one row for ``run_id``."""
        row = self._fetch_one(
            f"SELECT COUNT(*) AS cnt FROM `{table}` WHERE `run_id` = %s",
            (run_id,),
        )
        return bool(row and row.get("cnt", 0))

    def _run_has_runtime_records(self, run_id: str) -> bool:
        """Return whether the run tables or any runtime-file table hold ``run_id``."""
        tables = [
            "content_agent_runs",
            "content_agent_policy_runs",
            *RUNTIME_FILE_TABLES.values(),
        ]
        # ``any`` stops querying at the first table that has rows.
        return any(self._table_has_run_rows(table, run_id) for table in tables)

    def _runtime_file_exists(self, run_id: str, filename: str) -> bool:
        """Return whether the table behind ``filename`` holds rows for ``run_id``."""
        return self._table_has_run_rows(_table_for_runtime_file(filename), run_id)


def _record_for_json(filename: str, data: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Map a JSON runtime file payload to ``(table, record)``.

    Raises:
        ValueError: If ``filename`` is not a supported JSON runtime file.
        KeyError: If a mandatory field (such as ``run_id``) is absent.
    """
    table = _table_for_runtime_file(filename)
    if filename == "source_context.json":
        # ``ext_data`` may be present but null, so do not rely on the
        # ``dict.get`` default alone.
        ext_data = data.get("ext_data") or {}
        evidence_pack = ext_data.get("evidence_pack") or {}
        return table, {
            "schema_version": DB_SCHEMA_VERSION,
            "run_id": data["run_id"],
            "demand_content_id": _int_or_none(data.get("demand_content_id")),
            "pattern_source_system": evidence_pack.get("pattern_source_system"),
            "source_kind": evidence_pack.get("source_kind"),
            "source_post_id": evidence_pack.get("source_post_id"),
            "pattern_execution_id": evidence_pack.get("pattern_execution_id"),
            "mining_config_id": evidence_pack.get("mining_config_id"),
            "evidence_pack": evidence_pack,
            "source_context": data,
            "raw_demand_content": data.get("raw_demand_content"),
        }
    if filename == "pattern_seed_pack.json":
        return table, {
            "schema_version": DB_SCHEMA_VERSION,
            "run_id": data["run_id"],
            "policy_run_id": data["policy_run_id"],
            "source_post_id": data.get("source_post_id"),
            "pattern_execution_id": data.get("pattern_execution_id"),
            "itemset_ids": _itemset_ids_from_seed_pack(data),
            "seed_terms": data.get("seed_terms"),
            "pattern_seed_pack": data,
        }
    if filename == "final_output.json":
        return table, {
            "schema_version": DB_SCHEMA_VERSION,
            "run_id": data["run_id"],
            "policy_run_id": data["policy_run_id"],
            "output_version": data.get("output_version", "v1"),
            "summary": data.get("summary"),
            "final_output": data,
            "validation_status": data.get("validation_status"),
        }
    if filename == "strategy_review.json":
        return table, {
            "schema_version": DB_SCHEMA_VERSION,
            "run_id": data["run_id"],
            "policy_run_id": data["policy_run_id"],
            "review_id": data["review_id"],
            "review_status": data.get("review_status", "generated"),
            "summary": data.get("summary"),
            "effective_search_queries": data.get("effective_search_queries"),
            "weak_search_queries": data.get("weak_search_queries"),
            "top_reject_reasons": data.get("top_reject_reasons"),
            "productive_paths": data.get("productive_paths"),
            "suggestions": data.get("suggestions"),
            # Falls back to the whole document when no explicit raw payload
            # is supplied.
            "raw_payload": data.get("raw_payload", data),
        }
    raise ValueError(f"unsupported JSON runtime file: {filename}")


def _record_for_jsonl(filename: str, row: dict[str, Any]) -> dict[str, Any]:
    """Return ``row`` stamped with the DB schema version.

    Raises:
        ValueError: If ``filename`` is not a supported JSONL runtime file.
    """
    if filename not in _SCHEMA_STAMPED_JSONL_FILES:
        raise ValueError(f"unsupported JSONL runtime file: {filename}")
    return _with_db_schema(row)


def _run_record(record: dict[str, Any]) -> dict[str, Any]:
    """Prepare a ``content_agent_runs`` record for writing."""
    return _with_db_schema(record)


def _policy_run_record(record: dict[str, Any]) -> dict[str, Any]:
    """Prepare a ``content_agent_policy_runs`` record for writing."""
    return _with_db_schema(record)


def _with_db_schema(record: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``record`` with ``schema_version`` set to the DB version."""
    return {**record, "schema_version": DB_SCHEMA_VERSION}


def _sanitize_record(table: str, record: dict[str, Any]) -> dict[str, Any]:
    """Keep only known columns with non-``None`` values and vet ``raw_payload``.

    Raises:
        ValueError: If ``raw_payload`` (a dict or a list) contains a forbidden
            key at any depth.
    """
    allowed = _allowed_columns(table)
    result = {
        column: value
        for column, value in record.items()
        if column in allowed and value is not None
    }
    raw_payload = result.get("raw_payload")
    # Lists are checked as well: the checker walks them, and a list payload
    # must not be a way around the forbidden-key guard.
    if isinstance(raw_payload, (dict, list)):
        _assert_no_forbidden_raw_payload_keys(raw_payload)
    return result


def _allowed_columns(table: str) -> set[str]:
    """Return the writable column names of ``table`` (``KeyError`` if unknown)."""
    return TABLE_COLUMNS[table]


# Table -> columns this module is allowed to write; anything else in a record
# is silently dropped by ``_sanitize_record``.
TABLE_COLUMNS = {
    "content_agent_source_contexts": {
        "schema_version",
        "run_id",
        "demand_content_id",
        "pattern_source_system",
        "source_kind",
        "source_post_id",
        "pattern_execution_id",
        "mining_config_id",
        "evidence_pack",
        "source_context",
        "raw_demand_content",
        "created_at",
    },
    "content_agent_pattern_seed_packs": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "source_post_id",
        "pattern_execution_id",
        "itemset_ids",
        "seed_terms",
        "pattern_seed_pack",
        "created_at",
    },
    "content_agent_queries": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "search_query_id",
        "search_query",
        "search_query_generation_method",
        "discovery_start_source",
        "previous_discovery_step",
        "search_query_effect_status",
        "pattern_seed_ref",
        "raw_payload",
        "created_at",
    },
    "content_agent_discovered_content_items": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "content_discovery_id",
        "search_query_id",
        "platform",
        "platform_content_id",
        "platform_content_format",
        "platform_content_url",
        "description",
        "platform_author_id",
        "author_display_name",
        "discovery_start_source",
        "previous_discovery_step",
        "statistics",
        "tags",
        "text_extra",
        "source_evidence",
        "pattern_match_result",
        "platform_raw_payload",
        "raw_payload",
        "created_at",
    },
    "content_agent_content_media_records": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "platform",
        "platform_content_id",
        "content_media_status",
        "content_metadata_source",
        "play_url",
        "local_path",
        "oss_url",
        "raw_payload",
        "created_at",
    },
    "content_agent_pattern_recall_evidence": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "recall_evidence_id",
        "content_discovery_id",
        "platform_content_id",
        "recall_status",
        "evidence_summary",
        "raw_payload",
        "created_at",
    },
    "content_agent_rule_decisions": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "decision_id",
        "policy_bundle_id",
        "rule_pack_id",
        "rule_pack_version",
        "strategy_version",
        "decision_target_type",
        "decision_target_id",
        "decision_action",
        "decision_reason_code",
        "search_query_effect_status",
        "score",
        "triggered_blocking_rules",
        "scorecard",
        "source_evidence",
        "decision_replay_data",
        "raw_payload",
        "created_at",
    },
    "content_agent_walk_actions": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "walk_action_id",
        "edge_id",
        "edge_type",
        "from_node_type",
        "from_node_id",
        "to_node_type",
        "to_node_id",
        "walk_action",
        "walk_status",
        "budget_tier",
        "depth",
        "page_cursor",
        "next_cursor",
        "decision_id",
        "rule_pack_id",
        "rule_pack_version",
        "reason_code",
        "source_path_record_id",
        "raw_payload",
        "created_at",
        "updated_at",
    },
    "content_agent_source_path_records": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "source_path_record_id",
        "source_path_type",
        "from_node_type",
        "from_node_id",
        "to_node_type",
        "to_node_id",
        "decision_id",
        "rule_pack_id",
        "discovery_start_source",
        "previous_discovery_step",
        "origin_path_id",
        "source_evidence_ref",
        "raw_payload",
        "created_at",
    },
    "content_agent_search_clues": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "clue_id",
        "search_query_id",
        "search_query",
        "discovery_start_source",
        "previous_discovery_step",
        "result_count",
        "pooled_content_count",
        "review_content_count",
        "pending_content_count",
        "rejected_content_count",
        "search_query_effect_status",
        "walk_next_step",
        "raw_payload",
        "created_at",
    },
    "content_agent_run_events": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "event_id",
        "event_type",
        "status",
        "input_ref",
        "output_ref",
        "error_code",
        "message",
        "raw_payload",
        "created_at",
    },
    "content_agent_final_outputs": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "output_version",
        "summary",
        "final_output",
        "validation_status",
        "created_at",
        "updated_at",
    },
    "content_agent_publish_jobs": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "publish_job_id",
        "platform_content_id",
        "job_status",
        "trigger_mode",
        "crawler_plan_id",
        "produce_plan_id",
        "publish_plan_id",
        "request_payload",
        "response_payload",
        "error_code",
        "error_message",
        "created_at",
        "updated_at",
    },
    "content_agent_author_assets": {
        "schema_version",
        "author_asset_id",
        "platform",
        "platform_author_id",
        "author_display_name",
        "author_profile_url",
        "asset_status",
        "source_type",
        "validation_status",
        "eligible_as_source",
        "elderly_ratio",
        "elderly_tgi",
        "content_tags",
        "source_run_id",
        "source_policy_run_id",
        "last_profile_fetch_at",
        "last_works_fetch_at",
        "last_validated_at",
        "profile_snapshot",
        "evidence_refs",
        "raw_payload",
        "created_at",
        "updated_at",
    },
    "content_agent_author_asset_roles": {
        "schema_version",
        "author_asset_id",
        "role",
        "role_status",
        "role_reason_code",
        "assigned_by",
        "source_run_id",
        "raw_payload",
        "created_at",
        "updated_at",
    },
    "content_agent_strategy_reviews": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "review_id",
        "review_status",
        "summary",
        "effective_search_queries",
        "weak_search_queries",
        "top_reject_reasons",
        "productive_paths",
        "suggestions",
        "raw_payload",
        "created_at",
    },
    "content_agent_performance_feedback": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "feedback_id",
        "platform",
        "platform_content_id",
        "content_asset_id",
        "feedback_source",
        "feedback_status",
        "metric_window_start",
        "metric_window_end",
        "completion_rate",
        "share_rate",
        "average_watch_seconds",
        "total_watch_seconds",
        "impression_count",
        "play_count",
        "like_count",
        "comment_count",
        "share_count",
        "collect_count",
        "raw_payload",
        "created_at",
        "updated_at",
    },
    "content_agent_search_clue_assets": {
        "schema_version",
        "search_clue_asset_id",
        "platform",
        "clue_type",
        "normalized_clue_text",
        "display_clue_text",
        "promotion_status",
        "reusable_priority",
        "can_seed_next_run",
        "first_seen_run_id",
        "first_seen_policy_run_id",
        "last_validated_at",
        "summary_metrics",
        "raw_payload",
        "created_at",
        "updated_at",
    },
    "content_agent_search_clue_asset_evidence": {
        "schema_version",
        "evidence_id",
        "search_clue_asset_id",
        "run_id",
        "policy_run_id",
        "clue_id",
        "search_query_id",
        "pooled_content_count",
        "review_content_count",
        "failed_content_count",
        "source_path_record_ids",
        "decision_ids",
        "performance_feedback_refs",
        "raw_payload",
        "created_at",
    },
    "content_agent_runs": {
        "schema_version",
        "run_id",
        "demand_content_id",
        "run_label",
        "platform",
        "platform_mode",
        "strategy_version",
        "status",
        "current_step",
        "validation_status",
        "source_ref",
        "error_code",
        "error_message",
        "error_detail",
        "started_at",
        "completed_at",
        "created_at",
        "updated_at",
    },
    "content_agent_policy_runs": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "experiment_name",
        "run_role",
        "policy_bundle_id",
        "rule_pack_id",
        "strategy_id",
        "strategy_version",
        "rule_pack_version",
        "walk_strategy_version",
        "policy_bundle_hash",
        "strategy_source_ref",
        "rule_pack_source_ref",
        "evidence_bundle_schema_version",
        "runtime_record_schema_version",
        "status",
        "metrics",
        "decision_summary",
        "raw_payload",
        "created_at",
    },
}


def _table_for_runtime_file(filename: str) -> str:
    """Return the table backing ``filename``.

    Raises:
        ValueError: If ``filename`` is not a known runtime file.
    """
    try:
        return RUNTIME_FILE_TABLES[filename]
    except KeyError as exc:
        raise ValueError(f"unsupported runtime file: {filename}") from exc


def _db_value(table: str, column: str, value: Any) -> Any:
    """Convert ``value`` to the form bound to the SQL parameter.

    JSON columns are dumped to text, datetime columns are parsed, everything
    else passes through unchanged.
    """
    if column in JSON_COLUMNS_BY_TABLE.get(table, set()):
        return _json_dump(value)
    if column in DATETIME_COLUMNS:
        return _datetime_value(value)
    return value


def _json_dump(value: Any) -> str | None:
    """Serialize ``value`` as compact, non-ASCII-escaped JSON (``None`` stays ``None``)."""
    if value is None:
        return None
    return json.dumps(value, ensure_ascii=False, separators=(",", ":"))


def _decode_json_payload(value: Any) -> Any:
    """Decode a JSON column value; dicts, lists and ``None`` pass through."""
    if value is None:
        return None
    if isinstance(value, (dict, list)):
        return value
    return json.loads(value)


def _runtime_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Ensure the payload exposes a ``raw_payload`` key.

    When the key is missing, a copy of the payload itself is stored under it.
    """
    if "raw_payload" in payload:
        return payload
    return {**payload, "raw_payload": dict(payload)}


def _datetime_value(value: Any) -> Any:
    """Parse ISO-8601 strings into naive UTC datetimes.

    ``None``, ``datetime`` instances, non-strings and unparseable strings are
    returned unchanged. A trailing ``Z`` is accepted as UTC. Strings without
    an offset are parsed as-is and stay naive.
    """
    if value is None or isinstance(value, datetime):
        return value
    if not isinstance(value, str):
        return value
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return value
    if parsed.tzinfo:
        # Normalize to UTC, then drop the offset to obtain a naive datetime.
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


def _itemset_ids_from_seed_pack(data: dict[str, Any]) -> list[Any]:
    """Return the seed pack's itemset ids.

    Uses ``itemset_ids`` when it is non-empty; otherwise collects
    ``itemset_id`` from the dict entries of ``itemsets``, de-duplicated in
    first-seen order.
    """
    if data.get("itemset_ids"):
        return list(data["itemset_ids"])
    itemsets = data.get("itemsets") or []
    ids = [
        itemset.get("itemset_id")
        for itemset in itemsets
        if isinstance(itemset, dict) and itemset.get("itemset_id") is not None
    ]
    return list(dict.fromkeys(ids))


def _int_or_none(value: Any) -> int | None:
    """Convert ``value`` to ``int``; ``None`` and ``""`` become ``None``."""
    if value in (None, ""):
        return None
    return int(value)


def _assert_no_forbidden_raw_payload_keys(payload: Any) -> None:
    """Recursively reject payloads that contain a forbidden key.

    Walks dicts and lists; other values are ignored. Keys are compared
    lower-cased against ``FORBIDDEN_RAW_PAYLOAD_KEYS``.

    Raises:
        ValueError: On the first forbidden key found.
    """
    if isinstance(payload, list):
        for item in payload:
            _assert_no_forbidden_raw_payload_keys(item)
        return
    if not isinstance(payload, dict):
        return
    for key, value in payload.items():
        lowered = str(key).lower()
        if lowered in FORBIDDEN_RAW_PAYLOAD_KEYS:
            raise ValueError(f"raw_payload contains forbidden key: {key}")
        _assert_no_forbidden_raw_payload_keys(value)


def _load_env_file(path_value: str | Path | None) -> dict[str, str]:
    """Parse a simple ``KEY=VALUE`` env file into a dict.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped.
    Surrounding whitespace and surrounding quote characters are stripped from
    values. A falsy or non-existent path yields an empty dict.
    """
    if not path_value:
        return {}
    path = Path(path_value)
    if not path.exists():
        return {}
    result: dict[str, str] = {}
    for line in path.read_text(encoding="utf-8").splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#") or "=" not in stripped:
            continue
        key, value = stripped.split("=", 1)
        result[key.strip()] = value.strip().strip('"').strip("'")
    return result