| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031 |
- from __future__ import annotations
- import json
- import os
- from dataclasses import dataclass
- from datetime import datetime, timezone
- from pathlib import Path
- from typing import Any, Callable
- import pymysql
- from content_agent.constants import DB_SCHEMA_VERSION
# Zero-argument callable that returns a connection object. DatabaseRuntimeStore
# uses the result as a context manager (``with factory() as conn``) and falls
# back to ``ContentSupplyDbConfig.connect`` when no factory is injected.
ConnectionFactory = Callable[[], Any]

# Credential-like keys that must not appear at any nesting level inside a
# ``raw_payload`` value. Keys are compared after ``str(key).lower()``, so
# matching is case-insensitive but exact (``"api_key"`` matches, ``"my_api_key"``
# does not).
FORBIDDEN_RAW_PAYLOAD_KEYS = {
    "password",
    "token",
    "access_token",
    "refresh_token",
    "api_key",
    "apikey",
    "secret",
    "dsn",
    "authorization",
    "cookie",
    "session",
    "credential",
}
# Logical runtime "file" name -> table that stores it. ``.json`` names are
# whole documents (see ``_record_for_json``); ``.jsonl`` names are row streams
# (see ``_record_for_jsonl``). The dict's iteration order determines the key
# order of ``DatabaseRuntimeStore.file_status`` and the order in which tables
# are probed when checking whether a run already exists.
RUNTIME_FILE_TABLES = {
    "source_context.json": "content_agent_source_contexts",
    "pattern_seed_pack.json": "content_agent_pattern_seed_packs",
    "search_queries.jsonl": "content_agent_queries",
    "discovered_content_items.jsonl": "content_agent_discovered_content_items",
    "content_media_records.jsonl": "content_agent_content_media_records",
    "pattern_recall_evidence.jsonl": "content_agent_pattern_recall_evidence",
    "rule_decisions.jsonl": "content_agent_rule_decisions",
    "walk_actions.jsonl": "content_agent_walk_actions",
    "run_events.jsonl": "content_agent_run_events",
    "source_path_records.jsonl": "content_agent_source_path_records",
    "search_clues.jsonl": "content_agent_search_clues",
    "final_output.json": "content_agent_final_outputs",
    "strategy_review.json": "content_agent_strategy_reviews",
}
# Per-table columns whose values are serialized with ``json.dumps`` (via
# ``_json_dump``) before binding. A column not listed for its table is bound
# as-is, so a dict/list value there is handed to the driver unconverted.
JSON_COLUMNS_BY_TABLE = {
    "content_agent_source_contexts": {"evidence_pack", "source_context", "raw_demand_content"},
    "content_agent_pattern_seed_packs": {"itemset_ids", "seed_terms", "pattern_seed_pack"},
    "content_agent_queries": {"pattern_seed_ref", "raw_payload"},
    "content_agent_discovered_content_items": {
        "statistics",
        "tags",
        "text_extra",
        "source_evidence",
        "pattern_match_result",
        "platform_raw_payload",
        "raw_payload",
    },
    "content_agent_content_media_records": {"raw_payload"},
    "content_agent_pattern_recall_evidence": {
        "evidence_summary",
        "raw_payload",
    },
    "content_agent_rule_decisions": {
        "triggered_blocking_rules",
        "scorecard",
        "source_evidence",
        "decision_replay_data",
        "raw_payload",
    },
    "content_agent_walk_actions": {"raw_payload"},
    "content_agent_source_path_records": {"raw_payload"},
    "content_agent_search_clues": {"raw_payload"},
    "content_agent_run_events": {"raw_payload"},
    "content_agent_final_outputs": {"summary", "final_output"},
    "content_agent_publish_jobs": {"request_payload", "response_payload"},
    "content_agent_author_assets": {
        "content_tags",
        "profile_snapshot",
        "evidence_refs",
        "raw_payload",
    },
    "content_agent_author_asset_roles": {"raw_payload"},
    "content_agent_search_clue_assets": {"summary_metrics", "raw_payload"},
    "content_agent_search_clue_asset_evidence": {
        "source_path_record_ids",
        "decision_ids",
        "performance_feedback_refs",
        "raw_payload",
    },
    "content_agent_strategy_reviews": {
        "summary",
        "effective_search_queries",
        "weak_search_queries",
        "top_reject_reasons",
        "productive_paths",
        "suggestions",
        "raw_payload",
    },
    "content_agent_performance_feedback": {"raw_payload"},
    "content_agent_runs": {"source_ref", "error_detail"},
    "content_agent_policy_runs": {
        "strategy_source_ref",
        "rule_pack_source_ref",
        "metrics",
        "decision_summary",
        "raw_payload",
    },
}
# Column names (in any table) whose values go through ``_datetime_value``:
# ISO-8601 strings are parsed, and timezone-aware values are converted to
# naive UTC datetimes before binding.
# NOTE(review): other timestamp-looking columns in TABLE_COLUMNS
# (``last_validated_at``, ``last_profile_fetch_at``, ``last_works_fetch_at``,
# ``metric_window_start``/``metric_window_end``) are not listed, so their
# values are bound unconverted - confirm this matches the real column types.
DATETIME_COLUMNS = {"started_at", "completed_at", "created_at", "updated_at"}

# Whole-document JSON runtime files -> the column ``read_json`` selects and
# decodes when reading the document back.
JSON_FILE_PAYLOAD_COLUMNS = {
    "source_context.json": "source_context",
    "pattern_seed_pack.json": "pattern_seed_pack",
    "final_output.json": "final_output",
    "strategy_review.json": "raw_payload",
}

# ``.jsonl`` files written as INSERT ... ON DUPLICATE KEY UPDATE; the tuple is
# passed as ``key_columns`` to ``_row_sql`` and is excluded from the UPDATE
# list (presumably mirroring a unique index on the table - TODO confirm).
# Other ``.jsonl`` files are written as plain INSERTs.
JSONL_UPSERT_KEYS = {
    "search_queries.jsonl": ("run_id", "policy_run_id", "search_query_id"),
    "pattern_recall_evidence.jsonl": ("run_id", "policy_run_id", "recall_evidence_id"),
    "search_clues.jsonl": ("run_id", "policy_run_id", "clue_id"),
    "run_events.jsonl": ("run_id", "policy_run_id", "event_id"),
}
@dataclass(frozen=True)
class ContentSupplyDbConfig:
    """Connection settings for the content-supply MySQL database.

    Attributes:
        host, port, user, password, database: arguments for ``pymysql.connect``.
        timeout: seconds used for the connect, read and write timeouts alike.
    """

    host: str
    port: int
    user: str
    password: str
    database: str
    timeout: int = 8

    # Environment keys that must all resolve to a non-empty value. Kept as a
    # single tuple so the override list and the "required" list cannot drift
    # apart. Deliberately unannotated, so dataclass does not make it a field.
    _REQUIRED_ENV_KEYS = (
        "CONTENT_SUPPLY_DB_HOST",
        "CONTENT_SUPPLY_DB_PORT",
        "CONTENT_SUPPLY_DB_NAME",
        "CONTENT_SUPPLY_DB_USER",
        "CONTENT_SUPPLY_DB_PASSWORD",
    )

    @classmethod
    def from_env(
        cls,
        env_file: str | Path | None = ".env",
    ) -> "ContentSupplyDbConfig":
        """Build a config from ``env_file`` overlaid with the process environment.

        Values are read from ``env_file`` first (missing file -> no values);
        any non-empty value in ``os.environ`` for a required key replaces it.

        Raises:
            ValueError: if a required key is missing or empty, or if
                ``CONTENT_SUPPLY_DB_PORT`` is not an integer.
        """
        env: dict[str, str] = _load_env_file(env_file)
        env.update({
            key: os.environ[key]
            for key in cls._REQUIRED_ENV_KEYS
            if os.environ.get(key)
        })
        missing_keys = [key for key in cls._REQUIRED_ENV_KEYS if not env.get(key)]
        if missing_keys:
            raise ValueError(f"Missing required CFA db env keys: {', '.join(missing_keys)}")
        raw_port = env["CONTENT_SUPPLY_DB_PORT"]
        try:
            port = int(raw_port)
        except ValueError as exc:
            # Name the offending key instead of surfacing int()'s generic message.
            raise ValueError(
                f"CONTENT_SUPPLY_DB_PORT must be an integer, got {raw_port!r}"
            ) from exc
        return cls(
            host=env["CONTENT_SUPPLY_DB_HOST"],
            port=port,
            user=env["CONTENT_SUPPLY_DB_USER"],
            password=env["CONTENT_SUPPLY_DB_PASSWORD"],
            database=env["CONTENT_SUPPLY_DB_NAME"],
        )

    def connect(self) -> Any:
        """Open a new pymysql connection that returns rows as dicts (utf8mb4)."""
        return pymysql.connect(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=self.database,
            charset="utf8mb4",
            cursorclass=pymysql.cursors.DictCursor,
            connect_timeout=self.timeout,
            read_timeout=self.timeout,
            write_timeout=self.timeout,
        )
class DatabaseRuntimeStore:
    """Runtime store that keeps run artifacts in MySQL tables.

    Method names and ``Path`` return values follow a file-oriented API: each
    runtime "file" name maps to a table through ``RUNTIME_FILE_TABLES``, and
    the returned paths (under ``database_runtime/<run_id>``) are labels only;
    nothing is written to the local filesystem.

    Every public write opens one connection, executes all of its statements
    on it and commits once.
    """

    def __init__(
        self,
        config: ContentSupplyDbConfig,
        connection_factory: ConnectionFactory | None = None,
    ) -> None:
        """Keep ``config``; a given ``connection_factory`` replaces ``config.connect``."""
        self.config = config
        self._connection_factory = connection_factory or config.connect

    def prepare_run(self, run_id: str) -> Path:
        """Return the run's label path, refusing to reuse an existing run id.

        Raises:
            FileExistsError: if the runs table, the policy-runs table or any
                runtime table already holds a row for ``run_id``.
        """
        if self._run_has_runtime_records(run_id):
            raise FileExistsError(f"run already exists in database runtime tables: {run_id}")
        return self.run_dir(run_id)

    def run_dir(self, run_id: str) -> Path:
        """Return the synthetic directory label for ``run_id`` (never created on disk)."""
        return Path("database_runtime") / run_id

    def write_json(self, run_id: str, filename: str, data: dict[str, Any]) -> Path:
        """Insert a whole-document runtime JSON file as a new row.

        Raises:
            ValueError: if ``filename`` is not a supported JSON runtime file or
                ``data["run_id"]`` differs from ``run_id``.
        """
        table, record = _record_for_json(filename, data)
        if record["run_id"] != run_id:
            raise ValueError(f"{filename} run_id does not match runtime run_id")
        self._insert(table, record)
        return self.run_dir(run_id) / filename

    def update_json(self, run_id: str, filename: str, data: dict[str, Any]) -> Path:
        """Persist a new version of a whole-document runtime JSON file.

        ``final_output.json`` is upserted on ``(run_id, policy_run_id,
        output_version)``. Any other file gets a new row; ``read_json`` returns
        the row with the highest ``id``.

        Raises:
            ValueError: same conditions as ``write_json``.
        """
        table, record = _record_for_json(filename, data)
        if record["run_id"] != run_id:
            raise ValueError(f"{filename} run_id does not match runtime run_id")
        if filename == "final_output.json":
            self._upsert(
                table,
                record,
                key_columns=("run_id", "policy_run_id", "output_version"),
            )
        else:
            self._insert(table, record)
        return self.run_dir(run_id) / filename

    def append_jsonl(self, run_id: str, filename: str, rows: list[dict[str, Any]]) -> Path:
        """Write ``rows`` of a ``.jsonl`` runtime file in one transaction.

        Files listed in ``JSONL_UPSERT_KEYS`` are upserted on those keys; the
        others are plain INSERTs. Every row is validated before any SQL runs.

        Raises:
            ValueError: if ``filename`` is unknown, or if a row's ``run_id``
                differs from ``run_id`` or the file is not a JSONL file.
        """
        table = _table_for_runtime_file(filename)
        key_columns = JSONL_UPSERT_KEYS.get(filename)
        # Build every statement first so a bad row aborts before anything is
        # written, then send the whole batch over one connection with a single
        # commit instead of a connection + commit round trip per row.
        statements: list[tuple[str, list[Any]]] = []
        for row in rows:
            if row.get("run_id") != run_id:
                raise ValueError(f"{filename} row run_id does not match runtime run_id")
            record = _record_for_jsonl(filename, row)
            statements.append(self._row_sql(table, record, key_columns=key_columns))
        self._execute_statements(statements)
        return self.run_dir(run_id) / filename

    def read_json(self, run_id: str, filename: str) -> dict[str, Any]:
        """Return the newest stored document of a whole-document runtime file.

        When the file's payload column is ``raw_payload`` the decoded value is
        passed through ``_runtime_payload``.

        Raises:
            ValueError: if ``filename`` is unknown or is not a JSON document file.
            FileNotFoundError: if no row exists for ``run_id``.
        """
        table = _table_for_runtime_file(filename)
        payload_column = JSON_FILE_PAYLOAD_COLUMNS.get(filename)
        if not payload_column:
            raise ValueError(f"{filename} is not a JSON runtime file")
        row = self._fetch_one(
            f"SELECT `{payload_column}` FROM `{table}` WHERE `run_id` = %s ORDER BY `id` DESC LIMIT 1",
            (run_id,),
        )
        if not row:
            raise FileNotFoundError(f"{filename} not found for run: {run_id}")
        payload = _decode_json_payload(row[payload_column]) or {}
        if payload_column == "raw_payload":
            return _runtime_payload(payload)
        return payload

    def read_jsonl(self, run_id: str, filename: str) -> list[dict[str, Any]]:
        """Return every stored row of a runtime file for ``run_id`` in insertion (``id``) order."""
        table = _table_for_runtime_file(filename)
        rows = self._fetch_all(
            f"SELECT `raw_payload` FROM `{table}` WHERE `run_id` = %s ORDER BY `id`",
            (run_id,),
        )
        return [_runtime_payload(_decode_json_payload(row["raw_payload"]) or {}) for row in rows]

    def read_performance_feedback(
        self,
        run_id: str,
        policy_run_id: str,
    ) -> list[dict[str, Any]]:
        """Return the performance-feedback payloads of one policy run, ordered by ``id``."""
        rows = self._fetch_all(
            "SELECT `raw_payload` FROM `content_agent_performance_feedback` "
            "WHERE `run_id` = %s AND `policy_run_id` = %s ORDER BY `id`",
            (run_id, policy_run_id),
        )
        return [_runtime_payload(_decode_json_payload(row["raw_payload"]) or {}) for row in rows]

    def file_status(self, run_id: str) -> dict[str, bool]:
        """Map each runtime file name to whether its table has rows for ``run_id``."""
        # One connection for all existence checks rather than one per table.
        with self._connection_factory() as conn:
            with conn.cursor() as cur:
                return {
                    filename: self._run_row_exists(cur, table, run_id)
                    for filename, table in RUNTIME_FILE_TABLES.items()
                }

    def create_run_record(self, record: dict[str, Any]) -> None:
        """Insert a ``content_agent_runs`` row."""
        self._insert("content_agent_runs", _run_record(record))

    def update_run_record(self, run_id: str, updates: dict[str, Any]) -> None:
        """Apply ``updates`` to the ``content_agent_runs`` row(s) of ``run_id``.

        Unknown columns and ``None`` values are dropped first; nothing is sent
        to the database when no column remains.
        """
        if not updates:
            return
        record = _sanitize_record("content_agent_runs", updates)
        if not record:
            return
        assignments = ", ".join(f"`{column}` = %s" for column in record)
        params = [
            _db_value("content_agent_runs", column, value)
            for column, value in record.items()
        ]
        params.append(run_id)
        self._execute_statements(
            [(f"UPDATE `content_agent_runs` SET {assignments} WHERE `run_id` = %s", params)]
        )

    def record_policy_run(self, record: dict[str, Any]) -> None:
        """Insert a ``content_agent_policy_runs`` row."""
        self._insert("content_agent_policy_runs", _policy_run_record(record))

    def append_run_event_records(
        self,
        run_id: str,
        policy_run_id: str,
        rows: list[dict[str, Any]],
    ) -> None:
        """Append run events, forcing ``run_id`` and defaulting ``policy_run_id``."""
        prepared_rows = [
            {**row, "run_id": run_id, "policy_run_id": row.get("policy_run_id", policy_run_id)}
            for row in rows
        ]
        self.append_jsonl(run_id, "run_events.jsonl", prepared_rows)

    def write_publish_jobs(
        self,
        run_id: str,
        policy_run_id: str,
        rows: list[dict[str, Any]],
    ) -> None:
        """Upsert publish jobs in one transaction, keyed on run/policy run/job id."""
        records = [
            {
                **row,
                "run_id": run_id,
                "policy_run_id": row.get("policy_run_id", policy_run_id),
            }
            for row in rows
        ]
        self._upsert_rows(
            "content_agent_publish_jobs",
            records,
            key_columns=("run_id", "policy_run_id", "publish_job_id"),
        )

    def write_author_assets(self, rows: list[dict[str, Any]]) -> None:
        """Upsert author assets in one transaction, keyed on ``author_asset_id``."""
        self._upsert_rows(
            "content_agent_author_assets",
            rows,
            key_columns=("author_asset_id",),
        )

    def write_author_asset_roles(self, rows: list[dict[str, Any]]) -> None:
        """Upsert author asset roles in one transaction, keyed on ``(author_asset_id, role)``."""
        self._upsert_rows(
            "content_agent_author_asset_roles",
            rows,
            key_columns=("author_asset_id", "role"),
        )

    def write_search_clue_assets(self, rows: list[dict[str, Any]]) -> None:
        """Upsert search clue assets in one transaction, keyed on ``search_clue_asset_id``."""
        self._upsert_rows(
            "content_agent_search_clue_assets",
            rows,
            key_columns=("search_clue_asset_id",),
        )

    def write_search_clue_asset_evidence(self, rows: list[dict[str, Any]]) -> None:
        """Upsert search clue asset evidence in one transaction, keyed on run/policy run/clue id."""
        self._upsert_rows(
            "content_agent_search_clue_asset_evidence",
            rows,
            key_columns=("run_id", "policy_run_id", "clue_id"),
        )

    def _row_sql(
        self,
        table: str,
        record: dict[str, Any],
        key_columns: tuple[str, ...] | None = None,
    ) -> tuple[str, list[Any]]:
        """Build a parameterized INSERT (or upsert) statement for ``record``.

        The record is sanitized (whitelisted, non-``None`` columns only) and
        every value is converted with ``_db_value``. When ``key_columns`` is
        given, all non-key columns are overwritten on a duplicate key. Column
        names come from the ``TABLE_COLUMNS`` whitelist; only values are bound.
        """
        sanitized = _sanitize_record(table, record)
        columns = list(sanitized)
        placeholders = ", ".join(["%s"] * len(columns))
        column_sql = ", ".join(f"`{column}`" for column in columns)
        values = [
            _db_value(table, column, sanitized[column])
            for column in columns
        ]
        sql = f"INSERT INTO `{table}` ({column_sql}) VALUES ({placeholders})"
        if key_columns:
            update_columns = [column for column in columns if column not in key_columns]
            # NOTE: VALUES(col) in ON DUPLICATE KEY UPDATE is deprecated since
            # MySQL 8.0.20 (row aliases replace it) but is still accepted.
            assignments = ", ".join(f"`{column}` = VALUES(`{column}`)" for column in update_columns)
            if assignments:
                sql += f" ON DUPLICATE KEY UPDATE {assignments}"
        return sql, values

    def _execute_statements(self, statements: list[tuple[str, list[Any]]]) -> None:
        """Run ``statements`` in order on one connection and commit once.

        No connection is opened when ``statements`` is empty. If a statement
        raises, ``commit`` is never reached, so with a non-autocommit
        connection nothing from the batch is committed.
        """
        if not statements:
            return
        with self._connection_factory() as conn:
            with conn.cursor() as cur:
                for sql, values in statements:
                    cur.execute(sql, values)
            conn.commit()

    def _upsert_rows(
        self,
        table: str,
        rows: list[dict[str, Any]],
        key_columns: tuple[str, ...],
    ) -> None:
        """Upsert ``rows`` (each stamped with ``schema_version``) in one transaction."""
        self._execute_statements([
            self._row_sql(table, _with_db_schema(row), key_columns=key_columns)
            for row in rows
        ])

    def _insert(self, table: str, record: dict[str, Any]) -> None:
        """Insert one record and commit."""
        self._execute_statements([self._row_sql(table, record)])

    def _upsert(
        self,
        table: str,
        record: dict[str, Any],
        key_columns: tuple[str, ...],
    ) -> None:
        """Upsert one record on ``key_columns`` and commit."""
        self._execute_statements([self._row_sql(table, record, key_columns=key_columns)])

    def _fetch_one(self, sql: str, params: tuple[Any, ...]) -> dict[str, Any] | None:
        """Run a read query and return its first row, or ``None``."""
        with self._connection_factory() as conn:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                return cur.fetchone()

    def _fetch_all(self, sql: str, params: tuple[Any, ...]) -> list[dict[str, Any]]:
        """Run a read query and return all rows as a list."""
        with self._connection_factory() as conn:
            with conn.cursor() as cur:
                cur.execute(sql, params)
                return list(cur.fetchall())

    def _run_has_runtime_records(self, run_id: str) -> bool:
        """Return True as soon as any run/policy-run/runtime table has a row for ``run_id``."""
        tables = [
            "content_agent_runs",
            "content_agent_policy_runs",
            *RUNTIME_FILE_TABLES.values(),
        ]
        # Share one connection across the probes; any() stops at the first hit.
        with self._connection_factory() as conn:
            with conn.cursor() as cur:
                return any(self._run_row_exists(cur, table, run_id) for table in tables)

    def _runtime_file_exists(self, run_id: str, filename: str) -> bool:
        """Return whether the table behind ``filename`` has rows for ``run_id``."""
        table = _table_for_runtime_file(filename)
        with self._connection_factory() as conn:
            with conn.cursor() as cur:
                return self._run_row_exists(cur, table, run_id)

    @staticmethod
    def _run_row_exists(cur: Any, table: str, run_id: str) -> bool:
        """Count ``table`` rows for ``run_id`` on an open cursor; True when non-zero."""
        cur.execute(
            f"SELECT COUNT(*) AS cnt FROM `{table}` WHERE `run_id` = %s",
            (run_id,),
        )
        row = cur.fetchone()
        return bool(row and row.get("cnt", 0))
def _record_for_json(filename: str, data: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Build ``(table, row)`` for a whole-document runtime JSON file.

    Supported files are ``source_context.json``, ``pattern_seed_pack.json``,
    ``final_output.json`` and ``strategy_review.json``. Each row carries
    ``schema_version``, a few extracted columns, and the document itself in a
    JSON column. For ``strategy_review.json``, ``raw_payload`` holds
    ``data["raw_payload"]`` when present, otherwise the whole document.

    Raises:
        ValueError: for any other file name.
        KeyError: if a required id is missing from ``data`` (``run_id``
            always; ``policy_run_id`` except for ``source_context.json``;
            ``review_id`` for ``strategy_review.json``).
    """
    table = _table_for_runtime_file(filename)
    if filename == "source_context.json":
        # ``ext_data`` may be present but null; ``data.get("ext_data", {})``
        # would then return None and fail on the next ``.get``.
        ext_data = data.get("ext_data") or {}
        evidence_pack = ext_data.get("evidence_pack") or {}
        return table, {
            "schema_version": DB_SCHEMA_VERSION,
            "run_id": data["run_id"],
            "demand_content_id": _int_or_none(data.get("demand_content_id")),
            "pattern_source_system": evidence_pack.get("pattern_source_system"),
            "source_kind": evidence_pack.get("source_kind"),
            "source_post_id": evidence_pack.get("source_post_id"),
            "pattern_execution_id": evidence_pack.get("pattern_execution_id"),
            "mining_config_id": evidence_pack.get("mining_config_id"),
            "evidence_pack": evidence_pack,
            "source_context": data,
            "raw_demand_content": data.get("raw_demand_content"),
        }
    if filename == "pattern_seed_pack.json":
        return table, {
            "schema_version": DB_SCHEMA_VERSION,
            "run_id": data["run_id"],
            "policy_run_id": data["policy_run_id"],
            "source_post_id": data.get("source_post_id"),
            "pattern_execution_id": data.get("pattern_execution_id"),
            "itemset_ids": _itemset_ids_from_seed_pack(data),
            "seed_terms": data.get("seed_terms"),
            "pattern_seed_pack": data,
        }
    if filename == "final_output.json":
        return table, {
            "schema_version": DB_SCHEMA_VERSION,
            "run_id": data["run_id"],
            "policy_run_id": data["policy_run_id"],
            "output_version": data.get("output_version", "v1"),
            "summary": data.get("summary"),
            "final_output": data,
            "validation_status": data.get("validation_status"),
        }
    if filename == "strategy_review.json":
        return table, {
            "schema_version": DB_SCHEMA_VERSION,
            "run_id": data["run_id"],
            "policy_run_id": data["policy_run_id"],
            "review_id": data["review_id"],
            "review_status": data.get("review_status", "generated"),
            "summary": data.get("summary"),
            "effective_search_queries": data.get("effective_search_queries"),
            "weak_search_queries": data.get("weak_search_queries"),
            "top_reject_reasons": data.get("top_reject_reasons"),
            "productive_paths": data.get("productive_paths"),
            "suggestions": data.get("suggestions"),
            "raw_payload": data.get("raw_payload", data),
        }
    raise ValueError(f"unsupported JSON runtime file: {filename}")
- def _record_for_jsonl(filename: str, row: dict[str, Any]) -> dict[str, Any]:
- if filename == "search_queries.jsonl":
- return _with_db_schema(row)
- if filename == "discovered_content_items.jsonl":
- return _with_db_schema(row)
- if filename == "content_media_records.jsonl":
- return _with_db_schema(row)
- if filename == "pattern_recall_evidence.jsonl":
- return _with_db_schema(row)
- if filename == "rule_decisions.jsonl":
- return _with_db_schema(row)
- if filename == "walk_actions.jsonl":
- return _with_db_schema(row)
- if filename == "run_events.jsonl":
- return _with_db_schema(row)
- if filename == "source_path_records.jsonl":
- return _with_db_schema(row)
- if filename == "search_clues.jsonl":
- return _with_db_schema(row)
- raise ValueError(f"unsupported JSONL runtime file: {filename}")
def _run_record(record: dict[str, Any]) -> dict[str, Any]:
    """Return a ``content_agent_runs`` row: a copy of ``record`` stamped with ``schema_version``."""
    return _with_db_schema(record)


def _policy_run_record(record: dict[str, Any]) -> dict[str, Any]:
    """Return a ``content_agent_policy_runs`` row: a copy of ``record`` stamped with ``schema_version``."""
    return _with_db_schema(record)


def _with_db_schema(record: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of ``record`` whose ``schema_version`` is ``DB_SCHEMA_VERSION``.

    An existing ``schema_version`` is overwritten; ``record`` itself is left untouched.
    """
    stamped = dict(record)
    stamped["schema_version"] = DB_SCHEMA_VERSION
    return stamped
def _sanitize_record(table: str, record: dict[str, Any]) -> dict[str, Any]:
    """Keep only the whitelisted, non-``None`` columns of ``record`` for ``table``.

    If the result has a ``raw_payload``, it is scanned recursively, through
    both dicts and lists, for keys listed in ``FORBIDDEN_RAW_PAYLOAD_KEYS``.

    Raises:
        KeyError: if ``table`` has no entry in ``TABLE_COLUMNS``.
        ValueError: if ``raw_payload`` contains a forbidden key.
    """
    # Look the whitelist up once instead of once per column.
    allowed = _allowed_columns(table)
    result = {
        column: value
        for column, value in record.items()
        if column in allowed and value is not None
    }
    # Scan unconditionally: a list of dicts can carry credentials just like a
    # dict, and the checker already returns immediately for scalars / None.
    _assert_no_forbidden_raw_payload_keys(result.get("raw_payload"))
    return result
def _allowed_columns(table: str) -> set[str]:
    """Return the writable-column whitelist of ``table`` (``KeyError`` for unknown tables)."""
    whitelist = TABLE_COLUMNS[table]
    return whitelist
# Per-table whitelist of writable columns. ``_sanitize_record`` drops every
# record key that is not listed for its table, which also keeps arbitrary
# keys from ever being interpolated into SQL as column names. Every table
# written through DatabaseRuntimeStore needs an entry here: ``_allowed_columns``
# indexes this dict with ``[]`` and so raises KeyError for unknown tables.
TABLE_COLUMNS = {
    "content_agent_source_contexts": {
        "schema_version",
        "run_id",
        "demand_content_id",
        "pattern_source_system",
        "source_kind",
        "source_post_id",
        "pattern_execution_id",
        "mining_config_id",
        "evidence_pack",
        "source_context",
        "raw_demand_content",
        "created_at",
    },
    "content_agent_pattern_seed_packs": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "source_post_id",
        "pattern_execution_id",
        "itemset_ids",
        "seed_terms",
        "pattern_seed_pack",
        "created_at",
    },
    "content_agent_queries": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "search_query_id",
        "search_query",
        "search_query_generation_method",
        "discovery_start_source",
        "previous_discovery_step",
        "search_query_effect_status",
        "pattern_seed_ref",
        "raw_payload",
        "created_at",
    },
    "content_agent_discovered_content_items": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "content_discovery_id",
        "search_query_id",
        "platform",
        "platform_content_id",
        "platform_content_format",
        "platform_content_url",
        "description",
        "platform_author_id",
        "author_display_name",
        "discovery_start_source",
        "previous_discovery_step",
        "statistics",
        "tags",
        "text_extra",
        "source_evidence",
        "pattern_match_result",
        "platform_raw_payload",
        "raw_payload",
        "created_at",
    },
    "content_agent_content_media_records": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "platform",
        "platform_content_id",
        "content_media_status",
        "content_metadata_source",
        "play_url",
        "local_path",
        "oss_url",
        "raw_payload",
        "created_at",
    },
    "content_agent_pattern_recall_evidence": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "recall_evidence_id",
        "content_discovery_id",
        "platform_content_id",
        "recall_status",
        "evidence_summary",
        "raw_payload",
        "created_at",
    },
    "content_agent_rule_decisions": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "decision_id",
        "policy_bundle_id",
        "rule_pack_id",
        "rule_pack_version",
        "strategy_version",
        "decision_target_type",
        "decision_target_id",
        "decision_action",
        "decision_reason_code",
        "search_query_effect_status",
        "score",
        "triggered_blocking_rules",
        "scorecard",
        "source_evidence",
        "decision_replay_data",
        "raw_payload",
        "created_at",
    },
    "content_agent_walk_actions": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "walk_action_id",
        "edge_id",
        "edge_type",
        "from_node_type",
        "from_node_id",
        "to_node_type",
        "to_node_id",
        "walk_action",
        "walk_status",
        "budget_tier",
        "depth",
        "page_cursor",
        "next_cursor",
        "decision_id",
        "rule_pack_id",
        "rule_pack_version",
        "reason_code",
        "source_path_record_id",
        "raw_payload",
        "created_at",
        "updated_at",
    },
    "content_agent_source_path_records": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "source_path_record_id",
        "source_path_type",
        "from_node_type",
        "from_node_id",
        "to_node_type",
        "to_node_id",
        "decision_id",
        "rule_pack_id",
        "discovery_start_source",
        "previous_discovery_step",
        "origin_path_id",
        "source_evidence_ref",
        "raw_payload",
        "created_at",
    },
    "content_agent_search_clues": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "clue_id",
        "search_query_id",
        "search_query",
        "discovery_start_source",
        "previous_discovery_step",
        "result_count",
        "pooled_content_count",
        "review_content_count",
        "pending_content_count",
        "rejected_content_count",
        "search_query_effect_status",
        "walk_next_step",
        "raw_payload",
        "created_at",
    },
    "content_agent_run_events": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "event_id",
        "event_type",
        "status",
        "input_ref",
        "output_ref",
        "error_code",
        "message",
        "raw_payload",
        "created_at",
    },
    "content_agent_final_outputs": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "output_version",
        "summary",
        "final_output",
        "validation_status",
        "created_at",
        "updated_at",
    },
    "content_agent_publish_jobs": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "publish_job_id",
        "platform_content_id",
        "job_status",
        "trigger_mode",
        "crawler_plan_id",
        "produce_plan_id",
        "publish_plan_id",
        "request_payload",
        "response_payload",
        "error_code",
        "error_message",
        "created_at",
        "updated_at",
    },
    "content_agent_author_assets": {
        "schema_version",
        "author_asset_id",
        "platform",
        "platform_author_id",
        "author_display_name",
        "author_profile_url",
        "asset_status",
        "source_type",
        "validation_status",
        "eligible_as_source",
        "elderly_ratio",
        "elderly_tgi",
        "content_tags",
        "source_run_id",
        "source_policy_run_id",
        "last_profile_fetch_at",
        "last_works_fetch_at",
        "last_validated_at",
        "profile_snapshot",
        "evidence_refs",
        "raw_payload",
        "created_at",
        "updated_at",
    },
    "content_agent_author_asset_roles": {
        "schema_version",
        "author_asset_id",
        "role",
        "role_status",
        "role_reason_code",
        "assigned_by",
        "source_run_id",
        "raw_payload",
        "created_at",
        "updated_at",
    },
    "content_agent_strategy_reviews": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "review_id",
        "review_status",
        "summary",
        "effective_search_queries",
        "weak_search_queries",
        "top_reject_reasons",
        "productive_paths",
        "suggestions",
        "raw_payload",
        "created_at",
    },
    "content_agent_performance_feedback": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "feedback_id",
        "platform",
        "platform_content_id",
        "content_asset_id",
        "feedback_source",
        "feedback_status",
        "metric_window_start",
        "metric_window_end",
        "completion_rate",
        "share_rate",
        "average_watch_seconds",
        "total_watch_seconds",
        "impression_count",
        "play_count",
        "like_count",
        "comment_count",
        "share_count",
        "collect_count",
        "raw_payload",
        "created_at",
        "updated_at",
    },
    "content_agent_search_clue_assets": {
        "schema_version",
        "search_clue_asset_id",
        "platform",
        "clue_type",
        "normalized_clue_text",
        "display_clue_text",
        "promotion_status",
        "reusable_priority",
        "can_seed_next_run",
        "first_seen_run_id",
        "first_seen_policy_run_id",
        "last_validated_at",
        "summary_metrics",
        "raw_payload",
        "created_at",
        "updated_at",
    },
    "content_agent_search_clue_asset_evidence": {
        "schema_version",
        "evidence_id",
        "search_clue_asset_id",
        "run_id",
        "policy_run_id",
        "clue_id",
        "search_query_id",
        "pooled_content_count",
        "review_content_count",
        "failed_content_count",
        "source_path_record_ids",
        "decision_ids",
        "performance_feedback_refs",
        "raw_payload",
        "created_at",
    },
    "content_agent_runs": {
        "schema_version",
        "run_id",
        "demand_content_id",
        "run_label",
        "platform",
        "platform_mode",
        "strategy_version",
        "status",
        "current_step",
        "validation_status",
        "source_ref",
        "error_code",
        "error_message",
        "error_detail",
        "started_at",
        "completed_at",
        "created_at",
        "updated_at",
    },
    "content_agent_policy_runs": {
        "schema_version",
        "run_id",
        "policy_run_id",
        "experiment_name",
        "run_role",
        "policy_bundle_id",
        "rule_pack_id",
        "strategy_id",
        "strategy_version",
        "rule_pack_version",
        "walk_strategy_version",
        "policy_bundle_hash",
        "strategy_source_ref",
        "rule_pack_source_ref",
        "evidence_bundle_schema_version",
        "runtime_record_schema_version",
        "status",
        "metrics",
        "decision_summary",
        "raw_payload",
        "created_at",
    },
}
def _table_for_runtime_file(filename: str) -> str:
    """Return the table that stores runtime file ``filename``.

    Raises:
        ValueError: if ``filename`` is not in ``RUNTIME_FILE_TABLES`` (the
            original KeyError is chained as the cause).
    """
    try:
        table_name = RUNTIME_FILE_TABLES[filename]
    except KeyError as missing:
        raise ValueError(f"unsupported runtime file: {filename}") from missing
    return table_name
def _db_value(table: str, column: str, value: Any) -> Any:
    """Convert ``value`` into what should be bound for ``table.column``.

    JSON columns of ``table`` are JSON-encoded. Otherwise, columns named in
    ``DATETIME_COLUMNS`` (in any table) go through ``_datetime_value``.
    Everything else is returned unchanged.
    """
    table_json_columns = JSON_COLUMNS_BY_TABLE.get(table, set())
    if column in table_json_columns:
        return _json_dump(value)
    if column not in DATETIME_COLUMNS:
        return value
    return _datetime_value(value)
- def _json_dump(value: Any) -> str | None:
- if value is None:
- return None
- return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
- def _decode_json_payload(value: Any) -> Any:
- if value is None:
- return None
- if isinstance(value, (dict, list)):
- return value
- return json.loads(value)
- def _runtime_payload(payload: dict[str, Any]) -> dict[str, Any]:
- if "raw_payload" in payload:
- return payload
- return {**payload, "raw_payload": dict(payload)}
- def _datetime_value(value: Any) -> Any:
- if value is None or isinstance(value, datetime):
- return value
- if isinstance(value, str):
- try:
- parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
- except ValueError:
- return value
- if parsed.tzinfo:
- parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
- return parsed
- return value
- def _itemset_ids_from_seed_pack(data: dict[str, Any]) -> list[Any]:
- if data.get("itemset_ids"):
- return list(data["itemset_ids"])
- itemsets = data.get("itemsets") or []
- ids = [
- itemset.get("itemset_id")
- for itemset in itemsets
- if isinstance(itemset, dict) and itemset.get("itemset_id") is not None
- ]
- return list(dict.fromkeys(ids))
- def _int_or_none(value: Any) -> int | None:
- if value in (None, ""):
- return None
- return int(value)
- def _assert_no_forbidden_raw_payload_keys(payload: Any) -> None:
- if isinstance(payload, list):
- for item in payload:
- _assert_no_forbidden_raw_payload_keys(item)
- return
- if not isinstance(payload, dict):
- return
- for key, value in payload.items():
- lowered = str(key).lower()
- if lowered in FORBIDDEN_RAW_PAYLOAD_KEYS:
- raise ValueError(f"raw_payload contains forbidden key: {key}")
- _assert_no_forbidden_raw_payload_keys(value)
- def _load_env_file(path_value: str | Path | None) -> dict[str, str]:
- if not path_value:
- return {}
- path = Path(path_value)
- if not path.exists():
- return {}
- result: dict[str, str] = {}
- for line in path.read_text(encoding="utf-8").splitlines():
- stripped = line.strip()
- if not stripped or stripped.startswith("#") or "=" not in stripped:
- continue
- key, value = stripped.split("=", 1)
- result[key.strip()] = value.strip().strip('"').strip("'")
- return result
|