"""Tests for ContentSupplyDbConfig and DatabaseRuntimeStore.

All database access goes through the in-memory ``FakeConnection`` /
``FakeCursor`` pair defined here, which only records the statements the store
executes so that the tests can inspect the generated SQL and parameters.
"""
import json
import re
from pathlib import Path

from content_agent.integrations.database_runtime import (
    ContentSupplyDbConfig,
    DatabaseRuntimeStore,
)


class FakeCursor:
    """Cursor stand-in that records statements on its parent connection."""

    def __init__(self, connection):
        self.connection = connection
        self._one = None
        self._all = []

    def __enter__(self):
        return self

    def __exit__(self, *_args):
        return None

    def execute(self, sql, params=None):
        """Record ``(sql, params)`` and stage canned results for SELECTs."""
        self.connection.statements.append((sql, list(params or [])))
        if sql.startswith("SELECT COUNT(*)"):
            # Count queries always report zero rows.
            self._one = {"cnt": 0}
            self._all = []
        elif sql.startswith("SELECT"):
            # Other SELECTs return whatever the test configured on the
            # connection.
            self._one = self.connection.select_one_result
            self._all = self.connection.select_all_result

    def fetchone(self):
        return self._one

    def fetchall(self):
        return self._all


class FakeConnection:
    """Connection stand-in collecting executed statements and commit count."""

    def __init__(self):
        self.statements = []
        self.commit_count = 0
        self.select_one_result = None
        self.select_all_result = []

    def __enter__(self):
        return self

    def __exit__(self, *_args):
        return None

    def cursor(self):
        return FakeCursor(self)

    def commit(self):
        self.commit_count += 1


def test_content_supply_db_config_reads_env_file(tmp_path):
    """Host, port, database and user are loaded from the given env file."""
    env_file = tmp_path / ".env"
    env_file.write_text(
        "\n".join(
            [
                "CONTENT_SUPPLY_DB_HOST=127.0.0.1",
                "CONTENT_SUPPLY_DB_PORT=3307",
                "CONTENT_SUPPLY_DB_NAME=content-deconstruction-supply",
                "CONTENT_SUPPLY_DB_USER=content_rw",
                "CONTENT_SUPPLY_DB_" + "PASS" + "WORD=dummy_password",
            ]
        ),
        encoding="utf-8",
    )

    config = ContentSupplyDbConfig.from_env(env_file=env_file)

    assert config.host == "127.0.0.1"
    assert config.port == 3307
    assert config.database == "content-deconstruction-supply"
    assert config.user == "content_rw"


def test_content_supply_db_config_requires_all_project_db_keys(tmp_path):
    """A missing password key raises ValueError naming the missing key."""
    env_file = tmp_path / ".env"
    env_file.write_text(
        "\n".join(
            [
                "CONTENT_SUPPLY_DB_HOST=127.0.0.1",
                "CONTENT_SUPPLY_DB_PORT=3307",
                "CONTENT_SUPPLY_DB_NAME=content-deconstruction-supply",
                "CONTENT_SUPPLY_DB_USER=content_rw",
            ]
        ),
        encoding="utf-8",
    )

    try:
        ContentSupplyDbConfig.from_env(env_file=env_file)
    except ValueError as exc:
        assert "CONTENT_SUPPLY_DB_PASSWORD" in str(exc)
    else:
        raise AssertionError("expected missing db env key to fail")


def test_database_runtime_writes_source_context_with_db_schema_version():
    """source_context.json is inserted with the DB schema version."""
    connection, store = _make_store()

    store.write_json(
        "run_001",
        "source_context.json",
        {
            "schema_version": "runtime_record.v1",
            "run_id": "run_001",
            "demand_content_id": "123",
            "ext_data": {
                "evidence_pack": {
                    "pattern_source_system": "pg_pattern_v2",
                    "source_kind": "pattern_itemset",
                    "source_post_id": "post_001",
                    "pattern_execution_id": 581,
                    "mining_config_id": 2081,
                }
            },
        },
    )

    sql, values = _last_insert(connection)
    assert "INSERT INTO `content_agent_source_contexts`" in sql
    assert values["schema_version"] == "content_agent.v1"
    assert values["run_id"] == "run_001"
    assert values["demand_content_id"] == 123
    assert json.loads(values["evidence_pack"])["source_post_id"] == "post_001"
    assert json.loads(values["source_context"])["schema_version"] == "runtime_record.v1"


def test_database_runtime_derives_itemset_ids_from_seed_pack_itemsets():
    """Duplicate itemset ids in the seed pack collapse to one stored id."""
    connection, store = _make_store()

    store.write_json(
        "run_001",
        "pattern_seed_pack.json",
        {
            "schema_version": "runtime_record.v1",
            "run_id": "run_001",
            "policy_run_id": "policy_run_001",
            "source_post_id": "post_001",
            "pattern_execution_id": 581,
            "itemsets": [{"itemset_id": 1608352}, {"itemset_id": 1608352}],
            "seed_terms": ["爱国情感", "人物故事"],
        },
    )

    sql, values = _last_insert(connection)
    assert "INSERT INTO `content_agent_pattern_seed_packs`" in sql
    assert values["schema_version"] == "content_agent.v1"
    assert json.loads(values["itemset_ids"]) == [1608352]
    assert json.loads(values["pattern_seed_pack"])["schema_version"] == "runtime_record.v1"


def test_database_runtime_appends_jsonl_with_raw_payload():
    """A search query row keeps its seed reference and raw payload as JSON."""
    connection, store = _make_store()

    store.append_jsonl(
        "run_001",
        "search_queries.jsonl",
        [
            {
                "record_schema_version": "runtime_record.v1",
                "run_id": "run_001",
                "policy_run_id": "policy_run_001",
                "search_query_id": "q_001",
                "search_query": "对比分析",
                "search_query_generation_method": "item_single",
                "pattern_seed_ref": {
                    "source_field": "seed_terms",
                    "source_index": 0,
                    "seed_term": "对比分析",
                },
                "raw_payload": {"run_id": "run_001", "search_query_id": "q_001"},
            }
        ],
    )

    sql, values = _last_insert(connection)
    assert "INSERT INTO `content_agent_queries`" in sql
    assert values["schema_version"] == "content_agent.v1"
    assert json.loads(values["pattern_seed_ref"])["seed_term"] == "对比分析"
    assert json.loads(values["raw_payload"])["search_query_id"] == "q_001"


def test_database_runtime_upserts_failed_search_query_status():
    """A failed search query is upserted with its status and failure detail."""
    connection, store = _make_store()

    store.append_jsonl(
        "run_001",
        "search_queries.jsonl",
        [
            {
                "record_schema_version": "runtime_record.v1",
                "run_id": "run_001",
                "policy_run_id": "policy_run_001",
                "search_query_id": "q_001",
                "search_query": "接口失败",
                "search_query_generation_method": "item_single",
                "search_query_effect_status": "failed",
                "raw_payload": {
                    "run_id": "run_001",
                    "policy_run_id": "policy_run_001",
                    "search_query_id": "q_001",
                    "search_query_effect_status": "failed",
                    "query_failure": {
                        "status": "failed",
                        "error_code": "PLATFORM_REQUEST_FAILED",
                    },
                },
            }
        ],
    )

    sql, values = _last_insert(connection)
    assert "INSERT INTO `content_agent_queries`" in sql
    assert "ON DUPLICATE KEY UPDATE" in sql
    assert values["search_query_effect_status"] == "failed"
    payload = json.loads(values["raw_payload"])
    assert payload["query_failure"]["error_code"] == "PLATFORM_REQUEST_FAILED"


def test_database_runtime_preserves_llm_variant_payload_fields():
    """LLM-variant fields are not columns but survive inside raw_payload."""
    connection, store = _make_store()

    store.append_jsonl(
        "run_001",
        "search_queries.jsonl",
        [
            {
                "record_schema_version": "runtime_record.v1",
                "run_id": "run_001",
                "policy_run_id": "policy_run_001",
                "search_query_id": "q_002",
                "search_query": "人物叙事素材",
                "search_query_generation_method": "llm_variant",
                "pattern_seed_ref": {
                    "source_field": "seed_terms",
                    "source_index": 0,
                    "seed_term": "人物故事",
                },
                "llm_variant_of": "q_001",
                "raw_payload": {
                    "run_id": "run_001",
                    "policy_run_id": "policy_run_001",
                    "search_query_id": "q_002",
                    "search_query_generation_method": "llm_variant",
                    "llm_variant_of": "q_001",
                    "llm_input_evidence": {"seed_term": "人物故事"},
                    "llm_prompt_version": "fake-query-prompt-v1",
                    "llm_generation_model": "fake-query-model",
                },
            }
        ],
    )

    sql, values = _last_insert(connection)
    assert "INSERT INTO `content_agent_queries`" in sql
    assert "llm_variant_of" not in values
    payload = json.loads(values["raw_payload"])
    assert payload["llm_variant_of"] == "q_001"
    assert payload["llm_input_evidence"]["seed_term"] == "人物故事"
    assert payload["llm_prompt_version"] == "fake-query-prompt-v1"
    assert payload["llm_generation_model"] == "fake-query-model"


def test_database_runtime_upserts_pattern_recall_evidence():
    """Recall evidence is upserted and legacy keys are filtered from columns."""
    connection, store = _make_store()

    # V3 shape: evidence judged directly by Gemini (recall_status=judged plus
    # the judgment fields in evidence_summary). The seven decode-era columns
    # were dropped when the V2 data was archived, so column filtering must keep
    # any stray legacy keys out of the DB.
    store.append_jsonl(
        "run_001",
        "pattern_recall_evidence.jsonl",
        [
            {
                "record_schema_version": "runtime_record.v1",
                "run_id": "run_001",
                "policy_run_id": "policy_run_001",
                "recall_evidence_id": "recall_001",
                "content_discovery_id": "content_001",
                "platform": "douyin",
                "platform_content_id": "7390000000000000000",
                "recall_status": "judged",
                "decode_status": "success",
                "matched_terms": ["爱国情感"],
                "evidence_summary": {
                    "judge_status": "ok",
                    "fit_senior_50plus": True,
                    "relevance_score": 0.85,
                },
                "raw_payload": {
                    "platform": "douyin",
                    "judge_reason": "贴题且适合 50+",
                },
            }
        ],
    )

    sql, values = _last_insert(connection)
    assert "INSERT INTO `content_agent_pattern_recall_evidence`" in sql
    assert "ON DUPLICATE KEY UPDATE" in sql
    assert values["schema_version"] == "content_agent.v1"
    assert values["recall_evidence_id"] == "recall_001"
    assert values["recall_status"] == "judged"
    assert "platform" not in values
    assert "decode_status" not in values
    assert "matched_terms" not in values
    assert json.loads(values["evidence_summary"])["fit_senior_50plus"] is True
    assert json.loads(values["raw_payload"])["platform"] == "douyin"


def test_database_runtime_preserves_p5_rule_decision_fields():
    """Rule decision JSON fields are stored as JSON columns."""
    connection, store = _make_store()

    store.append_jsonl(
        "run_001",
        "rule_decisions.jsonl",
        [
            {
                "record_schema_version": "runtime_record.v1",
                "run_id": "run_001",
                "policy_run_id": "policy_run_001",
                "decision_id": "d_001",
                "policy_bundle_id": "policy_bundle_v1",
                "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
                "rule_pack_version": "1.0.0",
                "strategy_version": "V1",
                "decision_target_type": "content",
                "decision_target_id": "content_001",
                "decision_action": "REJECT_CONTENT",
                "decision_reason_code": "pattern_recall_failed",
                "search_query_effect_status": "rule_blocked",
                "score": None,
                "triggered_blocking_rules": ["gate_pattern_recall_failed"],
                "scorecard": {"total_score": None, "score_missing": True},
                "decision_replay_data": {
                    "policy_bundle_hash": "hash_001",
                    "dispatch_id": "dispatch_content",
                    "effect_mapping_id": "map_hard_gate_reject_rule_blocked",
                },
                "raw_payload": {
                    "decision_id": "d_001",
                    "search_query_effect_status": "rule_blocked",
                    "decision_replay_data": {
                        "policy_bundle_hash": "hash_001",
                        "dispatch_id": "dispatch_content",
                    },
                },
            }
        ],
    )

    sql, values = _last_insert(connection)
    assert "INSERT INTO `content_agent_rule_decisions`" in sql
    assert values["search_query_effect_status"] == "rule_blocked"
    assert json.loads(values["triggered_blocking_rules"]) == ["gate_pattern_recall_failed"]
    assert json.loads(values["scorecard"])["score_missing"] is True
    assert json.loads(values["decision_replay_data"])["dispatch_id"] == "dispatch_content"
    raw_payload = json.loads(values["raw_payload"])
    assert raw_payload["decision_replay_data"]["policy_bundle_hash"] == "hash_001"


def test_database_runtime_preserves_p5_search_clue_aggregation_in_raw_payload():
    """query_aggregation_id is not a column but is kept inside raw_payload."""
    connection, store = _make_store()

    store.append_jsonl(
        "run_001",
        "search_clues.jsonl",
        [
            {
                "record_schema_version": "runtime_record.v1",
                "run_id": "run_001",
                "policy_run_id": "policy_run_001",
                "clue_id": "clue_001",
                "search_query_id": "q_001",
                "search_query": "爱国情感",
                "discovery_start_source": "pattern_seed",
                "previous_discovery_step": "search_query_generated",
                "result_count": 1,
                "pooled_content_count": 0,
                "review_content_count": 0,
                "pending_content_count": 0,
                "rejected_content_count": 1,
                "search_query_effect_status": "rule_blocked",
                "query_aggregation_id": "agg_query_rule_blocked",
                "walk_next_step": "stop_search_query",
                "raw_payload": {
                    "clue_id": "clue_001",
                    "query_aggregation_id": "agg_query_rule_blocked",
                    "effect_status_counts": {"rule_blocked": 1},
                },
            }
        ],
    )

    sql, values = _last_insert(connection)
    assert "INSERT INTO `content_agent_search_clues`" in sql
    assert "query_aggregation_id" not in values
    assert values["search_query_effect_status"] == "rule_blocked"
    assert json.loads(values["raw_payload"])["query_aggregation_id"] == "agg_query_rule_blocked"


def test_database_runtime_writes_failed_search_clue_and_platform_query_failed_event():
    """A failed clue and its run event are both upserted, in that order."""
    connection, store = _make_store()
    query_failure = {
        "search_query_id": "q_001",
        "search_query": "接口失败",
        "search_query_generation_method": "item_single",
        "status": "failed",
        "error_code": "PLATFORM_REQUEST_FAILED",
        "message": "platform query failed",
    }

    store.append_jsonl(
        "run_001",
        "search_clues.jsonl",
        [
            {
                "record_schema_version": "runtime_record.v1",
                "run_id": "run_001",
                "policy_run_id": "policy_run_001",
                "clue_id": "clue_001",
                "search_query_id": "q_001",
                "search_query": "接口失败",
                "discovery_start_source": "pattern_itemset",
                "previous_discovery_step": "pattern_search_query",
                "result_count": 0,
                "pooled_content_count": 0,
                "review_content_count": 0,
                "pending_content_count": 0,
                "rejected_content_count": 0,
                "search_query_effect_status": "failed",
                "query_aggregation_id": "platform_query_failure",
                "walk_next_step": "stop_search_query",
                "raw_payload": {
                    "clue_id": "clue_001",
                    "query_failure": query_failure,
                },
            }
        ],
    )
    store.append_jsonl(
        "run_001",
        "run_events.jsonl",
        [
            {
                "record_schema_version": "runtime_record.v1",
                "run_id": "run_001",
                "policy_run_id": "policy_run_001",
                "event_id": "evt_001",
                "event_type": "platform_query_failed",
                "status": "failed",
                "input_ref": "search_queries.jsonl:q_001",
                "output_ref": "search_clues.jsonl",
                "error_code": "PLATFORM_REQUEST_FAILED",
                "message": "platform query failed",
                "raw_payload": {
                    "event_id": "evt_001",
                    "query_failure": query_failure,
                },
            }
        ],
    )

    # The clue insert is the second-to-last statement, the event the last.
    clue_sql, clue_values = _last_insert(connection, index=-2)
    assert "INSERT INTO `content_agent_search_clues`" in clue_sql
    assert "ON DUPLICATE KEY UPDATE" in clue_sql
    assert clue_values["search_query_effect_status"] == "failed"
    assert clue_values["walk_next_step"] == "stop_search_query"
    clue_payload = json.loads(clue_values["raw_payload"])
    assert clue_payload["query_failure"]["search_query_id"] == "q_001"

    event_sql, event_values = _last_insert(connection)
    assert "INSERT INTO `content_agent_run_events`" in event_sql
    assert "ON DUPLICATE KEY UPDATE" in event_sql
    assert event_values["event_type"] == "platform_query_failed"
    assert event_values["status"] == "failed"
    assert event_values["error_code"] == "PLATFORM_REQUEST_FAILED"


def test_database_runtime_writes_publish_jobs_db_only_records():
    """Publish jobs are upserted with run ids and a JSON request payload."""
    connection, store = _make_store()

    store.write_publish_jobs(
        "run_001",
        "policy_run_001",
        [
            {
                "publish_job_id": "publish_job_001",
                "platform_content_id": "7390000000000000000",
                "job_status": "created",
                "trigger_mode": "manual_review",
                "request_payload": {
                    "decision_id": "decision_001",
                    "source_path_record_ids": ["path_001"],
                },
                "response_payload": {},
            }
        ],
    )

    sql, values = _last_insert(connection)
    assert "INSERT INTO `content_agent_publish_jobs`" in sql
    assert "ON DUPLICATE KEY UPDATE" in sql
    assert values["schema_version"] == "content_agent.v1"
    assert values["run_id"] == "run_001"
    assert values["policy_run_id"] == "policy_run_001"
    assert values["publish_job_id"] == "publish_job_001"
    assert values["job_status"] == "created"
    assert values["trigger_mode"] == "manual_review"
    assert json.loads(values["request_payload"])["decision_id"] == "decision_001"


def test_database_runtime_writes_author_assets():
    """Author assets are upserted with JSON-encoded structured fields."""
    connection, store = _make_store()

    store.write_author_assets(
        [
            {
                "author_asset_id": "author_asset_001",
                "platform": "douyin",
                "platform_author_id": "author_001",
                "author_display_name": "作者一号",
                "asset_status": "active",
                "source_type": "runtime_author_work",
                "validation_status": "validated",
                "eligible_as_source": 1,
                "content_tags": ["人物故事"],
                "source_run_id": "run_001",
                "source_policy_run_id": "policy_run_001",
                "profile_snapshot": {"sample_count": 9},
                "evidence_refs": {"decision_ids": ["d_001"]},
                "raw_payload": {"author_asset_id": "author_asset_001"},
            }
        ]
    )

    sql, values = _last_insert(connection)
    assert "INSERT INTO `content_agent_author_assets`" in sql
    assert "ON DUPLICATE KEY UPDATE" in sql
    assert values["schema_version"] == "content_agent.v1"
    assert values["author_asset_id"] == "author_asset_001"
    assert values["platform_author_id"] == "author_001"
    assert values["eligible_as_source"] == 1
    assert json.loads(values["content_tags"]) == ["人物故事"]
    assert json.loads(values["profile_snapshot"])["sample_count"] == 9
    assert json.loads(values["evidence_refs"])["decision_ids"] == ["d_001"]


def test_database_runtime_writes_author_asset_roles():
    """Author asset roles are upserted with their raw payload."""
    connection, store = _make_store()

    store.write_author_asset_roles(
        [
            {
                "author_asset_id": "author_asset_001",
                "role": "source_seed",
                "role_status": "active",
                "role_reason_code": "p7_author_asset_eligible",
                "assigned_by": "system",
                "source_run_id": "run_001",
                "raw_payload": {"role": "source_seed"},
            }
        ]
    )

    sql, values = _last_insert(connection)
    assert "INSERT INTO `content_agent_author_asset_roles`" in sql
    assert "ON DUPLICATE KEY UPDATE" in sql
    assert values["schema_version"] == "content_agent.v1"
    assert values["author_asset_id"] == "author_asset_001"
    assert values["role"] == "source_seed"
    assert values["assigned_by"] == "system"
    assert json.loads(values["raw_payload"])["role"] == "source_seed"


def test_database_runtime_writes_search_clue_assets():
    """Search clue assets are upserted with JSON summary metrics."""
    connection, store = _make_store()

    store.write_search_clue_assets(
        [
            {
                "search_clue_asset_id": "search_clue_asset_001",
                "platform": "douyin",
                "clue_type": "search_query",
                "normalized_clue_text": "银发旅行",
                "display_clue_text": "银发旅行",
                "promotion_status": "promoted",
                "reusable_priority": 1,
                "can_seed_next_run": 1,
                "first_seen_run_id": "run_001",
                "first_seen_policy_run_id": "policy_run_001",
                "summary_metrics": {"pooled_content_count": 1},
                "raw_payload": {"promotion_reason": "success_search_clue"},
            }
        ]
    )

    sql, values = _last_insert(connection)
    assert "INSERT INTO `content_agent_search_clue_assets`" in sql
    assert "ON DUPLICATE KEY UPDATE" in sql
    assert values["schema_version"] == "content_agent.v1"
    assert values["search_clue_asset_id"] == "search_clue_asset_001"
    assert values["can_seed_next_run"] == 1
    assert json.loads(values["summary_metrics"])["pooled_content_count"] == 1


def test_database_runtime_writes_search_clue_asset_evidence():
    """Search clue asset evidence is upserted with JSON id lists."""
    connection, store = _make_store()

    store.write_search_clue_asset_evidence(
        [
            {
                "evidence_id": "search_clue_evidence_001",
                "search_clue_asset_id": "search_clue_asset_001",
                "run_id": "run_001",
                "policy_run_id": "policy_run_001",
                "clue_id": "clue_001",
                "search_query_id": "q_001",
                "pooled_content_count": 1,
                "source_path_record_ids": ["path_001"],
                "decision_ids": ["decision_001"],
                "performance_feedback_refs": [],
                "raw_payload": {"clue_id": "clue_001"},
            }
        ]
    )

    sql, values = _last_insert(connection)
    assert "INSERT INTO `content_agent_search_clue_asset_evidence`" in sql
    assert "ON DUPLICATE KEY UPDATE" in sql
    assert values["schema_version"] == "content_agent.v1"
    assert values["run_id"] == "run_001"
    assert values["clue_id"] == "clue_001"
    assert json.loads(values["source_path_record_ids"]) == ["path_001"]
    assert json.loads(values["decision_ids"]) == ["decision_001"]


def test_database_runtime_update_final_output_upserts_validation_status():
    """update_json on final_output.json upserts status and summary."""
    connection, store = _make_store()

    store.update_json(
        "run_001",
        "final_output.json",
        {
            "schema_version": "runtime_record.v1",
            "run_id": "run_001",
            "policy_run_id": "policy_run_001",
            "summary": {"run_path_complete": True},
            "validation_status": "pass",
        },
    )

    sql, values = _last_insert(connection)
    assert "INSERT INTO `content_agent_final_outputs`" in sql
    assert "ON DUPLICATE KEY UPDATE" in sql
    assert values["validation_status"] == "pass"
    assert json.loads(values["summary"])["run_path_complete"] is True


def test_database_runtime_reads_performance_feedback_payloads():
    """Feedback rows are decoded from raw_payload and keyed by run ids."""
    connection = FakeConnection()
    connection.select_all_result = [
        {
            "raw_payload": json.dumps(
                {
                    "run_id": "run_001",
                    "policy_run_id": "policy_run_001",
                    "feedback_id": "feedback_001",
                    "completion_rate": 0.8,
                }
            )
        }
    ]
    _, store = _make_store(connection)

    rows = store.read_performance_feedback("run_001", "policy_run_001")

    sql, params = connection.statements[-1]
    assert "FROM `content_agent_performance_feedback`" in sql
    assert params == ["run_001", "policy_run_001"]
    assert rows[0]["feedback_id"] == "feedback_001"
    assert rows[0]["raw_payload"]["completion_rate"] == 0.8


def test_database_runtime_read_jsonl_reconstructs_runtime_payload():
    """read_jsonl exposes payload fields both top-level and in raw_payload."""
    connection = FakeConnection()
    connection.select_all_result = [
        {
            "raw_payload": json.dumps(
                {
                    "record_schema_version": "runtime_record.v1",
                    "run_id": "run_001",
                    "policy_run_id": "policy_run_001",
                    "search_query_id": "q_001",
                }
            )
        }
    ]
    _, store = _make_store(connection)

    rows = store.read_jsonl("run_001", "search_queries.jsonl")

    assert rows[0]["search_query_id"] == "q_001"
    assert rows[0]["raw_payload"]["search_query_id"] == "q_001"


def test_database_runtime_rejects_forbidden_raw_payload_keys_in_lists():
    """A forbidden key nested in a list inside raw_payload is rejected."""
    _, store = _make_store()

    try:
        store.append_jsonl(
            "run_001",
            "search_queries.jsonl",
            [
                {
                    "record_schema_version": "runtime_record.v1",
                    "run_id": "run_001",
                    "policy_run_id": "policy_run_001",
                    "search_query_id": "q_001",
                    "search_query": "对比分析",
                    "raw_payload": {"items": [{"dsn": "should_not_be_stored"}]},
                }
            ],
        )
    except ValueError as exc:
        assert "forbidden key" in str(exc)
    else:
        raise AssertionError("expected forbidden raw_payload key to be rejected")


def test_database_runtime_update_run_record_ignores_empty_sanitized_updates():
    """No statement is executed when no update field survives sanitizing."""
    connection, store = _make_store()

    store.update_run_record("run_001", {"unknown_field": "ignored", "status": None})

    assert connection.statements == []


def test_database_runtime_update_run_record_persists_platform_failure_detail():
    """error_detail is written as JSON and run_id is the final parameter."""
    connection, store = _make_store()

    store.update_run_record(
        "run_001",
        {
            "status": "failed",
            "error_code": "PLATFORM_REQUEST_FAILED",
            "error_detail": {
                "query_failures": [
                    {
                        "search_query_id": "q_001",
                        "status": "failed",
                        "error_code": "PLATFORM_REQUEST_FAILED",
                    }
                ]
            },
        },
    )

    sql, params = connection.statements[-1]
    assert "UPDATE `content_agent_runs` SET" in sql
    assert "`error_detail` = %s" in sql
    assert json.loads(params[2])["query_failures"][0]["search_query_id"] == "q_001"
    assert params[-1] == "run_001"


def test_business_modules_do_not_import_or_name_database_tables():
    """Business modules must not mention DB drivers, SQL verbs or tables."""
    root = Path("content_agent/business_modules")
    sources = sorted(root.rglob("*.py"))
    # The path is relative to the current working directory. Without this
    # check a wrong working directory yields no files and the guard below
    # would pass without scanning anything.
    assert sources, f"no Python sources found under {root}"
    text = "\n".join(path.read_text(encoding="utf-8") for path in sources)
    assert not re.search(
        r"pymysql|sqlalchemy|psycopg|sqlite3|SELECT |INSERT |UPDATE |DELETE |SHOW |CREATE |ALTER |content_agent_",
        text,
    )


def _config():
    """Return a ContentSupplyDbConfig populated with dummy local values."""
    return ContentSupplyDbConfig(
        host="127.0.0.1",
        port=3306,
        user="content_rw",
        password="dummy_password",
        database="content-deconstruction-supply",
    )


def _make_store(connection=None):
    """Return ``(connection, store)`` with the store bound to a fake connection.

    Pass a pre-configured ``FakeConnection`` to control SELECT results;
    otherwise a fresh one is created.
    """
    if connection is None:
        connection = FakeConnection()
    store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
    return connection, store


def _last_insert(connection, index=-1):
    """Return ``(sql, column -> value)`` for a recorded INSERT statement.

    ``index`` selects the entry in ``connection.statements``; it defaults to
    the most recent one.
    """
    sql, params = connection.statements[index]
    return sql, _insert_values(sql, params)


def _insert_values(sql, params):
    """Map the column names of an INSERT statement onto its parameters.

    The column list is the first parenthesised group followed by ``VALUES``.
    DOTALL and ``\\s+`` let the list and the keyword span several lines.
    """
    match = re.search(r"\((.*?)\)\s+VALUES", sql, re.DOTALL)
    assert match, sql
    columns = [part.strip().strip("`") for part in match.group(1).split(",")]
    return dict(zip(columns, params))