| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594 |
- import json
- import re
- from pathlib import Path
- from content_agent.integrations.database_runtime import (
- ContentSupplyDbConfig,
- DatabaseRuntimeStore,
- )
class FakeCursor:
    """Minimal DB-API style cursor double backed by a ``FakeConnection``.

    Every executed statement is recorded on the owning connection. Result
    sets are canned: ``SELECT COUNT(*)`` statements report a count of zero,
    any other ``SELECT`` serves the rows configured on the connection, and
    every other statement leaves the cursor without results.
    """

    def __init__(self, connection):
        """Bind the cursor to *connection*; no results are available yet."""
        self.connection = connection
        self._one = None
        self._all = []

    def __enter__(self):
        """Return the cursor itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, *_args):
        """Do nothing on exit; exceptions are never suppressed."""
        return None

    def execute(self, sql, params=None):
        """Record ``(sql, params)`` on the connection and stage canned results.

        Args:
            sql: Statement text; the result set is chosen from its prefix.
            params: Optional bind parameters, stored as a list copy.
        """
        self.connection.statements.append((sql, list(params or [])))
        if sql.startswith("SELECT COUNT(*)"):
            self._one = {"cnt": 0}
            self._all = []
        elif sql.startswith("SELECT"):
            self._one = self.connection.select_one_result
            self._all = self.connection.select_all_result
        else:
            # Non-SELECT statements produce no rows. Reset explicitly so rows
            # staged by an earlier SELECT on this cursor cannot leak into a
            # later fetch.
            self._one = None
            self._all = []

    def fetchone(self):
        """Return the single row staged by the most recent ``execute``."""
        return self._one

    def fetchall(self):
        """Return the row list staged by the most recent ``execute``."""
        return self._all
class FakeConnection:
    """In-memory connection double used in place of a real database connection.

    Cursors created by :meth:`cursor` append every executed statement to
    ``statements`` and serve ``select_one_result`` / ``select_all_result``
    for plain ``SELECT`` statements.
    """

    def __init__(self):
        # Canned rows handed out by cursors for non-COUNT SELECT statements.
        self.select_all_result = []
        self.select_one_result = None
        # Bookkeeping that the tests inspect afterwards.
        self.commit_count = 0
        self.statements = []

    def __enter__(self):
        """Support ``with connection:`` by yielding the connection itself."""
        return self

    def __exit__(self, *_args):
        """Leave the ``with`` block without suppressing any exception."""
        return None

    def cursor(self):
        """Create a fresh ``FakeCursor`` bound to this connection."""
        return FakeCursor(self)

    def commit(self):
        """Count the commit instead of persisting anything."""
        self.commit_count = self.commit_count + 1
def test_content_supply_db_config_reads_env_file(tmp_path):
    """``ContentSupplyDbConfig.from_env`` picks up host, port, name and user from a dotenv file."""
    # The final entry is assembled from fragments exactly as in the original
    # fixture -- presumably to keep secret scanners quiet; confirm before
    # collapsing it into one literal.
    env_lines = [
        "CONTENT_SUPPLY_DB_HOST=127.0.0.1",
        "CONTENT_SUPPLY_DB_PORT=3307",
        "CONTENT_SUPPLY_DB_NAME=content-deconstruction-supply",
        "CONTENT_SUPPLY_DB_USER=content_rw",
        "CONTENT_SUPPLY_DB_" + "PASS" + "WORD=dummy_password",
    ]
    dotenv_path = tmp_path / ".env"
    dotenv_path.write_text("\n".join(env_lines), encoding="utf-8")

    loaded = ContentSupplyDbConfig.from_env(env_file=dotenv_path)

    assert loaded.host == "127.0.0.1"
    # The port comes back as an integer, not as the raw text from the file.
    assert loaded.port == 3307
    assert loaded.database == "content-deconstruction-supply"
    assert loaded.user == "content_rw"
def test_content_supply_db_config_requires_all_project_db_keys(tmp_path):
    """A dotenv file without the password key must make ``from_env`` raise ``ValueError``."""
    incomplete_env = "\n".join(
        (
            "CONTENT_SUPPLY_DB_HOST=127.0.0.1",
            "CONTENT_SUPPLY_DB_PORT=3307",
            "CONTENT_SUPPLY_DB_NAME=content-deconstruction-supply",
            "CONTENT_SUPPLY_DB_USER=content_rw",
        )
    )
    dotenv_path = tmp_path / ".env"
    dotenv_path.write_text(incomplete_env, encoding="utf-8")

    raised = None
    try:
        ContentSupplyDbConfig.from_env(env_file=dotenv_path)
    except ValueError as exc:
        raised = exc

    if raised is None:
        raise AssertionError("expected missing db env key to fail")
    # The error message has to name the key that is missing.
    assert "CONTENT_SUPPLY_DB_PASSWORD" in str(raised)
def test_database_runtime_writes_source_context_with_db_schema_version():
    """Writing ``source_context.json`` inserts a row stamped with the DB schema version.

    The column ``schema_version`` carries ``content_agent.v1`` while the
    serialized ``source_context`` JSON keeps the runtime record version.
    """
    evidence_pack = {
        "pattern_source_system": "pg_pattern_v2",
        "source_kind": "pattern_itemset",
        "source_post_id": "post_001",
        "pattern_execution_id": 581,
        "mining_config_id": 2081,
    }
    source_context = {
        "schema_version": "runtime_record.v1",
        "run_id": "run_001",
        "demand_content_id": "123",
        "ext_data": {"evidence_pack": evidence_pack},
    }
    conn = FakeConnection()
    runtime_store = DatabaseRuntimeStore(_config(), connection_factory=lambda: conn)

    runtime_store.write_json("run_001", "source_context.json", source_context)

    last_sql, last_params = conn.statements[-1]
    row = _insert_values(last_sql, last_params)
    assert "INSERT INTO `content_agent_source_contexts`" in last_sql
    assert row["schema_version"] == "content_agent.v1"
    assert row["run_id"] == "run_001"
    # The string id from the payload is stored as an integer column value.
    assert row["demand_content_id"] == 123
    stored_evidence = json.loads(row["evidence_pack"])
    assert stored_evidence["source_post_id"] == "post_001"
    stored_context = json.loads(row["source_context"])
    assert stored_context["schema_version"] == "runtime_record.v1"
def test_database_runtime_derives_itemset_ids_from_seed_pack_itemsets():
    """``itemset_ids`` is derived from the seed pack's ``itemsets`` with duplicates collapsed."""
    seed_pack = {
        "schema_version": "runtime_record.v1",
        "run_id": "run_001",
        "policy_run_id": "policy_run_001",
        "source_post_id": "post_001",
        "pattern_execution_id": 581,
        # The same itemset id appears twice on purpose.
        "itemsets": [{"itemset_id": 1608352}, {"itemset_id": 1608352}],
        "seed_terms": ["爱国情感", "人物故事"],
    }
    conn = FakeConnection()
    runtime_store = DatabaseRuntimeStore(_config(), connection_factory=lambda: conn)

    runtime_store.write_json("run_001", "pattern_seed_pack.json", seed_pack)

    last_sql, last_params = conn.statements[-1]
    row = _insert_values(last_sql, last_params)
    assert "INSERT INTO `content_agent_pattern_seed_packs`" in last_sql
    assert row["schema_version"] == "content_agent.v1"
    assert json.loads(row["itemset_ids"]) == [1608352]
    stored_pack = json.loads(row["pattern_seed_pack"])
    assert stored_pack["schema_version"] == "runtime_record.v1"
def test_database_runtime_appends_jsonl_with_raw_payload():
    """Appending a search query row stores structured columns plus the raw payload as JSON."""
    seed_ref = {
        "source_field": "seed_terms",
        "source_index": 0,
        "seed_term": "对比分析",
    }
    query_record = {
        "record_schema_version": "runtime_record.v1",
        "run_id": "run_001",
        "policy_run_id": "policy_run_001",
        "search_query_id": "q_001",
        "search_query": "对比分析",
        "search_query_generation_method": "item_single",
        "pattern_seed_ref": seed_ref,
        "raw_payload": {"run_id": "run_001", "search_query_id": "q_001"},
    }
    conn = FakeConnection()
    runtime_store = DatabaseRuntimeStore(_config(), connection_factory=lambda: conn)

    runtime_store.append_jsonl("run_001", "search_queries.jsonl", [query_record])

    last_sql, last_params = conn.statements[-1]
    row = _insert_values(last_sql, last_params)
    assert "INSERT INTO `content_agent_queries`" in last_sql
    assert row["schema_version"] == "content_agent.v1"
    stored_seed_ref = json.loads(row["pattern_seed_ref"])
    assert stored_seed_ref["seed_term"] == "对比分析"
    stored_payload = json.loads(row["raw_payload"])
    assert stored_payload["search_query_id"] == "q_001"
def test_database_runtime_preserves_llm_variant_payload_fields():
    """LLM-variant metadata has no column of its own but survives inside ``raw_payload``."""
    raw_payload = {
        "run_id": "run_001",
        "policy_run_id": "policy_run_001",
        "search_query_id": "q_002",
        "search_query_generation_method": "llm_variant",
        "llm_variant_of": "q_001",
        "llm_input_evidence": {"seed_term": "人物故事"},
        "llm_prompt_version": "fake-query-prompt-v1",
        "llm_generation_model": "fake-query-model",
    }
    query_record = {
        "record_schema_version": "runtime_record.v1",
        "run_id": "run_001",
        "policy_run_id": "policy_run_001",
        "search_query_id": "q_002",
        "search_query": "人物叙事素材",
        "search_query_generation_method": "llm_variant",
        "pattern_seed_ref": {
            "source_field": "seed_terms",
            "source_index": 0,
            "seed_term": "人物故事",
        },
        "llm_variant_of": "q_001",
        "raw_payload": raw_payload,
    }
    conn = FakeConnection()
    runtime_store = DatabaseRuntimeStore(_config(), connection_factory=lambda: conn)

    runtime_store.append_jsonl("run_001", "search_queries.jsonl", [query_record])

    last_sql, last_params = conn.statements[-1]
    row = _insert_values(last_sql, last_params)
    assert "INSERT INTO `content_agent_queries`" in last_sql
    # The top-level field must not surface as a column of the INSERT.
    assert "llm_variant_of" not in row
    stored_payload = json.loads(row["raw_payload"])
    assert stored_payload["llm_variant_of"] == "q_001"
    assert stored_payload["llm_input_evidence"]["seed_term"] == "人物故事"
    assert stored_payload["llm_prompt_version"] == "fake-query-prompt-v1"
    assert stored_payload["llm_generation_model"] == "fake-query-model"
def test_database_runtime_upserts_pattern_recall_evidence():
    """Pattern recall evidence is upserted; payload-only fields stay out of the column list."""
    category_path = "/理念/观念/个人观念/情感认同/国家民族认同/爱国情感"
    evidence_record = {
        "record_schema_version": "runtime_record.v1",
        "run_id": "run_001",
        "policy_run_id": "policy_run_001",
        "recall_evidence_id": "recall_001",
        "content_discovery_id": "content_001",
        "platform": "douyin",
        "platform_content_id": "7390000000000000000",
        "decode_status": "success",
        "decode_task_id": "decode_task_001",
        "recall_status": "matched",
        "matched_terms": ["爱国情感"],
        "matched_category_paths": [category_path],
        "match_paths_request": {"source_type": "实质"},
        "match_paths_response": {"data": []},
        "evidence_summary": {"primary_matched_category_path": category_path},
        "raw_payload": {
            "platform": "douyin",
            "primary_matched_category_path": category_path,
        },
    }
    conn = FakeConnection()
    runtime_store = DatabaseRuntimeStore(_config(), connection_factory=lambda: conn)

    runtime_store.append_jsonl(
        "run_001", "pattern_recall_evidence.jsonl", [evidence_record]
    )

    last_sql, last_params = conn.statements[-1]
    row = _insert_values(last_sql, last_params)
    assert "INSERT INTO `content_agent_pattern_recall_evidence`" in last_sql
    assert "ON DUPLICATE KEY UPDATE" in last_sql
    assert row["schema_version"] == "content_agent.v1"
    assert row["recall_evidence_id"] == "recall_001"
    # Neither of these fields may appear as a column of the INSERT.
    assert "platform" not in row
    assert "primary_matched_category_path" not in row
    assert json.loads(row["matched_terms"]) == ["爱国情感"]
    stored_payload = json.loads(row["raw_payload"])
    assert stored_payload["platform"] == "douyin"
def test_database_runtime_preserves_p5_rule_decision_fields():
    """Rule decision rows keep the effect status, blocking rules, scorecard and replay data."""
    replay_data = {
        "policy_bundle_hash": "hash_001",
        "dispatch_id": "dispatch_content",
        "effect_mapping_id": "map_hard_gate_reject_rule_blocked",
    }
    raw_payload = {
        "decision_id": "d_001",
        "search_query_effect_status": "rule_blocked",
        "decision_replay_data": {
            "policy_bundle_hash": "hash_001",
            "dispatch_id": "dispatch_content",
        },
    }
    decision_record = {
        "record_schema_version": "runtime_record.v1",
        "run_id": "run_001",
        "policy_run_id": "policy_run_001",
        "decision_id": "d_001",
        "policy_bundle_id": "policy_bundle_v1",
        "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
        "rule_pack_version": "1.0.0",
        "strategy_version": "V1",
        "decision_target_type": "content",
        "decision_target_id": "content_001",
        "decision_action": "REJECT_CONTENT",
        "decision_reason_code": "pattern_recall_failed",
        "search_query_effect_status": "rule_blocked",
        "score": None,
        "triggered_blocking_rules": ["gate_pattern_recall_failed"],
        "scorecard": {"total_score": None, "score_missing": True},
        "decision_replay_data": replay_data,
        "raw_payload": raw_payload,
    }
    conn = FakeConnection()
    runtime_store = DatabaseRuntimeStore(_config(), connection_factory=lambda: conn)

    runtime_store.append_jsonl("run_001", "rule_decisions.jsonl", [decision_record])

    last_sql, last_params = conn.statements[-1]
    row = _insert_values(last_sql, last_params)
    assert "INSERT INTO `content_agent_rule_decisions`" in last_sql
    assert row["search_query_effect_status"] == "rule_blocked"
    assert json.loads(row["triggered_blocking_rules"]) == ["gate_pattern_recall_failed"]
    assert json.loads(row["scorecard"])["score_missing"] is True
    stored_replay = json.loads(row["decision_replay_data"])
    assert stored_replay["dispatch_id"] == "dispatch_content"
    stored_payload = json.loads(row["raw_payload"])
    assert stored_payload["decision_replay_data"]["policy_bundle_hash"] == "hash_001"
def test_database_runtime_preserves_p5_search_clue_aggregation_in_raw_payload():
    """``query_aggregation_id`` is kept only inside ``raw_payload`` of a search clue row."""
    clue_record = {
        "record_schema_version": "runtime_record.v1",
        "run_id": "run_001",
        "policy_run_id": "policy_run_001",
        "clue_id": "clue_001",
        "search_query_id": "q_001",
        "search_query": "爱国情感",
        "discovery_start_source": "pattern_seed",
        "previous_discovery_step": "search_query_generated",
        "result_count": 1,
        "pooled_content_count": 0,
        "review_content_count": 0,
        "pending_content_count": 0,
        "rejected_content_count": 1,
        "search_query_effect_status": "rule_blocked",
        "query_aggregation_id": "agg_query_rule_blocked",
        "walk_next_step": "stop_search_query",
        "raw_payload": {
            "clue_id": "clue_001",
            "query_aggregation_id": "agg_query_rule_blocked",
            "effect_status_counts": {"rule_blocked": 1},
        },
    }
    conn = FakeConnection()
    runtime_store = DatabaseRuntimeStore(_config(), connection_factory=lambda: conn)

    runtime_store.append_jsonl("run_001", "search_clues.jsonl", [clue_record])

    last_sql, last_params = conn.statements[-1]
    row = _insert_values(last_sql, last_params)
    assert "INSERT INTO `content_agent_search_clues`" in last_sql
    # The aggregation id must not surface as a column of the INSERT.
    assert "query_aggregation_id" not in row
    assert row["search_query_effect_status"] == "rule_blocked"
    stored_payload = json.loads(row["raw_payload"])
    assert stored_payload["query_aggregation_id"] == "agg_query_rule_blocked"
def test_database_runtime_writes_publish_jobs_db_only_records():
    """``write_publish_jobs`` upserts a job row tagged with the run and policy run ids."""
    publish_job = {
        "publish_job_id": "publish_job_001",
        "platform_content_id": "7390000000000000000",
        "job_status": "created",
        "trigger_mode": "manual_review",
        "request_payload": {
            "decision_id": "decision_001",
            "source_path_record_ids": ["path_001"],
        },
        "response_payload": {},
    }
    conn = FakeConnection()
    runtime_store = DatabaseRuntimeStore(_config(), connection_factory=lambda: conn)

    runtime_store.write_publish_jobs("run_001", "policy_run_001", [publish_job])

    last_sql, last_params = conn.statements[-1]
    row = _insert_values(last_sql, last_params)
    assert "INSERT INTO `content_agent_publish_jobs`" in last_sql
    assert "ON DUPLICATE KEY UPDATE" in last_sql
    assert row["schema_version"] == "content_agent.v1"
    # The run identifiers are passed as arguments, not inside the job dict.
    assert row["run_id"] == "run_001"
    assert row["policy_run_id"] == "policy_run_001"
    assert row["publish_job_id"] == "publish_job_001"
    assert row["job_status"] == "created"
    assert row["trigger_mode"] == "manual_review"
    stored_request = json.loads(row["request_payload"])
    assert stored_request["decision_id"] == "decision_001"
def test_database_runtime_writes_author_assets():
    """``write_author_assets`` upserts an asset row and serializes its nested fields as JSON."""
    author_asset = {
        "author_asset_id": "author_asset_001",
        "platform": "douyin",
        "platform_author_id": "author_001",
        "author_display_name": "作者一号",
        "asset_status": "active",
        "source_type": "runtime_author_work",
        "validation_status": "validated",
        "eligible_as_source": 1,
        "content_tags": ["人物故事"],
        "source_run_id": "run_001",
        "source_policy_run_id": "policy_run_001",
        "profile_snapshot": {"sample_count": 9},
        "evidence_refs": {"decision_ids": ["d_001"]},
        "raw_payload": {"author_asset_id": "author_asset_001"},
    }
    conn = FakeConnection()
    runtime_store = DatabaseRuntimeStore(_config(), connection_factory=lambda: conn)

    runtime_store.write_author_assets([author_asset])

    last_sql, last_params = conn.statements[-1]
    row = _insert_values(last_sql, last_params)
    assert "INSERT INTO `content_agent_author_assets`" in last_sql
    assert "ON DUPLICATE KEY UPDATE" in last_sql
    assert row["schema_version"] == "content_agent.v1"
    assert row["author_asset_id"] == "author_asset_001"
    assert row["platform_author_id"] == "author_001"
    assert row["eligible_as_source"] == 1
    assert json.loads(row["content_tags"]) == ["人物故事"]
    stored_profile = json.loads(row["profile_snapshot"])
    assert stored_profile["sample_count"] == 9
    stored_refs = json.loads(row["evidence_refs"])
    assert stored_refs["decision_ids"] == ["d_001"]
def test_database_runtime_writes_author_asset_roles():
    """``write_author_asset_roles`` upserts a role row for an author asset."""
    role_record = {
        "author_asset_id": "author_asset_001",
        "role": "source_seed",
        "role_status": "active",
        "role_reason_code": "p7_author_asset_eligible",
        "assigned_by": "system",
        "source_run_id": "run_001",
        "raw_payload": {"role": "source_seed"},
    }
    conn = FakeConnection()
    runtime_store = DatabaseRuntimeStore(_config(), connection_factory=lambda: conn)

    runtime_store.write_author_asset_roles([role_record])

    last_sql, last_params = conn.statements[-1]
    row = _insert_values(last_sql, last_params)
    assert "INSERT INTO `content_agent_author_asset_roles`" in last_sql
    assert "ON DUPLICATE KEY UPDATE" in last_sql
    assert row["schema_version"] == "content_agent.v1"
    assert row["author_asset_id"] == "author_asset_001"
    assert row["role"] == "source_seed"
    assert row["assigned_by"] == "system"
    stored_payload = json.loads(row["raw_payload"])
    assert stored_payload["role"] == "source_seed"
def test_database_runtime_update_final_output_upserts_validation_status():
    """Updating ``final_output.json`` upserts the row including ``validation_status``."""
    final_output = {
        "schema_version": "runtime_record.v1",
        "run_id": "run_001",
        "policy_run_id": "policy_run_001",
        "summary": {"run_path_complete": True},
        "validation_status": "pass",
    }
    conn = FakeConnection()
    runtime_store = DatabaseRuntimeStore(_config(), connection_factory=lambda: conn)

    runtime_store.update_json("run_001", "final_output.json", final_output)

    last_sql, last_params = conn.statements[-1]
    row = _insert_values(last_sql, last_params)
    assert "INSERT INTO `content_agent_final_outputs`" in last_sql
    assert "ON DUPLICATE KEY UPDATE" in last_sql
    assert row["validation_status"] == "pass"
    stored_summary = json.loads(row["summary"])
    assert stored_summary["run_path_complete"] is True
def test_database_runtime_read_jsonl_reconstructs_runtime_payload():
    """``read_jsonl`` rebuilds a record from the JSON stored in the ``raw_payload`` column.

    The returned row exposes the payload's fields at the top level and also
    keeps the decoded payload under ``raw_payload``.
    """
    stored_payload = {
        "record_schema_version": "runtime_record.v1",
        "run_id": "run_001",
        "policy_run_id": "policy_run_001",
        "search_query_id": "q_001",
    }
    conn = FakeConnection()
    # The fake cursor serves this list for the store's SELECT statement.
    conn.select_all_result = [{"raw_payload": json.dumps(stored_payload)}]
    runtime_store = DatabaseRuntimeStore(_config(), connection_factory=lambda: conn)

    records = runtime_store.read_jsonl("run_001", "search_queries.jsonl")

    first_record = records[0]
    assert first_record["search_query_id"] == "q_001"
    assert first_record["raw_payload"]["search_query_id"] == "q_001"
def test_database_runtime_rejects_forbidden_raw_payload_keys_in_lists():
    """A forbidden key nested inside a list within ``raw_payload`` must raise ``ValueError``."""
    query_record = {
        "record_schema_version": "runtime_record.v1",
        "run_id": "run_001",
        "policy_run_id": "policy_run_001",
        "search_query_id": "q_001",
        "search_query": "对比分析",
        # The offending key sits one level down, inside a list element.
        "raw_payload": {"items": [{"dsn": "should_not_be_stored"}]},
    }
    conn = FakeConnection()
    runtime_store = DatabaseRuntimeStore(_config(), connection_factory=lambda: conn)

    raised = None
    try:
        runtime_store.append_jsonl("run_001", "search_queries.jsonl", [query_record])
    except ValueError as exc:
        raised = exc

    if raised is None:
        raise AssertionError("expected forbidden raw_payload key to be rejected")
    assert "forbidden key" in str(raised)
def test_database_runtime_update_run_record_ignores_empty_sanitized_updates():
    """``update_run_record`` issues no SQL when the given updates leave nothing to write."""
    conn = FakeConnection()
    runtime_store = DatabaseRuntimeStore(_config(), connection_factory=lambda: conn)
    # One key the test labels as unknown, and one value that is None.
    updates = {"unknown_field": "ignored", "status": None}

    runtime_store.update_run_record("run_001", updates)

    assert conn.statements == []
def test_business_modules_do_not_import_or_name_database_tables():
    """Guard that business modules contain no DB drivers, SQL keywords or table names.

    Every ``*.py`` file under ``content_agent/business_modules`` (resolved
    against the current working directory) is scanned for driver names, SQL
    verbs and the ``content_agent_`` table prefix.

    Raises:
        AssertionError: If no source file is found, or if any file matches.
    """
    root = Path("content_agent/business_modules")
    sources = sorted(root.rglob("*.py"))
    # An unexpected working directory or a moved package yields no files; the
    # scan would then run over an empty string and pass without checking
    # anything, so an empty file list is a failure.
    assert sources, f"no Python files found under {root}; check the working directory"

    forbidden = re.compile(
        r"pymysql|sqlalchemy|psycopg|sqlite3|SELECT |INSERT |UPDATE |DELETE |SHOW |CREATE |ALTER |content_agent_"
    )
    # No alternative contains a newline, so scanning file by file finds the
    # same matches as scanning the newline-joined text, and lets the failure
    # name the offending files.
    offenders = [
        str(path)
        for path in sources
        if forbidden.search(path.read_text(encoding="utf-8"))
    ]
    assert not offenders, f"database access leaked into business modules: {offenders}"
def _config():
    """Build the ``ContentSupplyDbConfig`` shared by the store tests (dummy local values)."""
    settings = {
        "host": "127.0.0.1",
        "port": 3306,
        "user": "content_rw",
        "password": "dummy_password",
        "database": "content-deconstruction-supply",
    }
    return ContentSupplyDbConfig(**settings)
def _insert_values(sql, params):
    """Map the column names of an ``INSERT`` statement to its bound parameters.

    Args:
        sql: Statement text containing ``(<columns>) VALUES``; column names
            may be wrapped in backticks and the list may span several lines.
        params: Bound parameters in column order.

    Returns:
        dict: ``{column_name: parameter}`` pairs, paired positionally. Surplus
        entries on either side are dropped, as ``zip`` does.

    Raises:
        AssertionError: If *sql* has no ``(...) VALUES`` column list.
    """
    # DOTALL lets the column list span several lines; without it "." stops at
    # the first newline and a formatted INSERT would not match.
    match = re.search(r"\((.*?)\) VALUES", sql, re.DOTALL)
    assert match, f"no INSERT column list found in: {sql}"
    columns = [part.strip().strip("`") for part in match.group(1).split(",")]
    return dict(zip(columns, params))
|