test_database_runtime.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594
  1. import json
  2. import re
  3. from pathlib import Path
  4. from content_agent.integrations.database_runtime import (
  5. ContentSupplyDbConfig,
  6. DatabaseRuntimeStore,
  7. )
  8. class FakeCursor:
  9. def __init__(self, connection):
  10. self.connection = connection
  11. self._one = None
  12. self._all = []
  13. def __enter__(self):
  14. return self
  15. def __exit__(self, *_args):
  16. return None
  17. def execute(self, sql, params=None):
  18. self.connection.statements.append((sql, list(params or [])))
  19. if sql.startswith("SELECT COUNT(*)"):
  20. self._one = {"cnt": 0}
  21. self._all = []
  22. elif sql.startswith("SELECT"):
  23. self._one = self.connection.select_one_result
  24. self._all = self.connection.select_all_result
  25. def fetchone(self):
  26. return self._one
  27. def fetchall(self):
  28. return self._all
  29. class FakeConnection:
  30. def __init__(self):
  31. self.statements = []
  32. self.commit_count = 0
  33. self.select_one_result = None
  34. self.select_all_result = []
  35. def __enter__(self):
  36. return self
  37. def __exit__(self, *_args):
  38. return None
  39. def cursor(self):
  40. return FakeCursor(self)
  41. def commit(self):
  42. self.commit_count += 1
  43. def test_content_supply_db_config_reads_env_file(tmp_path):
  44. env_file = tmp_path / ".env"
  45. env_file.write_text(
  46. "\n".join(
  47. [
  48. "CONTENT_SUPPLY_DB_HOST=127.0.0.1",
  49. "CONTENT_SUPPLY_DB_PORT=3307",
  50. "CONTENT_SUPPLY_DB_NAME=content-deconstruction-supply",
  51. "CONTENT_SUPPLY_DB_USER=content_rw",
  52. "CONTENT_SUPPLY_DB_" + "PASS" + "WORD=dummy_password",
  53. ]
  54. ),
  55. encoding="utf-8",
  56. )
  57. config = ContentSupplyDbConfig.from_env(env_file=env_file)
  58. assert config.host == "127.0.0.1"
  59. assert config.port == 3307
  60. assert config.database == "content-deconstruction-supply"
  61. assert config.user == "content_rw"
  62. def test_content_supply_db_config_requires_all_project_db_keys(tmp_path):
  63. env_file = tmp_path / ".env"
  64. env_file.write_text(
  65. "\n".join(
  66. [
  67. "CONTENT_SUPPLY_DB_HOST=127.0.0.1",
  68. "CONTENT_SUPPLY_DB_PORT=3307",
  69. "CONTENT_SUPPLY_DB_NAME=content-deconstruction-supply",
  70. "CONTENT_SUPPLY_DB_USER=content_rw",
  71. ]
  72. ),
  73. encoding="utf-8",
  74. )
  75. try:
  76. ContentSupplyDbConfig.from_env(env_file=env_file)
  77. except ValueError as exc:
  78. assert "CONTENT_SUPPLY_DB_PASSWORD" in str(exc)
  79. else:
  80. raise AssertionError("expected missing db env key to fail")
  81. def test_database_runtime_writes_source_context_with_db_schema_version():
  82. connection = FakeConnection()
  83. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  84. store.write_json(
  85. "run_001",
  86. "source_context.json",
  87. {
  88. "schema_version": "runtime_record.v1",
  89. "run_id": "run_001",
  90. "demand_content_id": "123",
  91. "ext_data": {
  92. "evidence_pack": {
  93. "pattern_source_system": "pg_pattern_v2",
  94. "source_kind": "pattern_itemset",
  95. "source_post_id": "post_001",
  96. "pattern_execution_id": 581,
  97. "mining_config_id": 2081,
  98. }
  99. },
  100. },
  101. )
  102. sql, params = connection.statements[-1]
  103. values = _insert_values(sql, params)
  104. assert "INSERT INTO `content_agent_source_contexts`" in sql
  105. assert values["schema_version"] == "content_agent.v1"
  106. assert values["run_id"] == "run_001"
  107. assert values["demand_content_id"] == 123
  108. assert json.loads(values["evidence_pack"])["source_post_id"] == "post_001"
  109. assert json.loads(values["source_context"])["schema_version"] == "runtime_record.v1"
  110. def test_database_runtime_derives_itemset_ids_from_seed_pack_itemsets():
  111. connection = FakeConnection()
  112. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  113. store.write_json(
  114. "run_001",
  115. "pattern_seed_pack.json",
  116. {
  117. "schema_version": "runtime_record.v1",
  118. "run_id": "run_001",
  119. "policy_run_id": "policy_run_001",
  120. "source_post_id": "post_001",
  121. "pattern_execution_id": 581,
  122. "itemsets": [{"itemset_id": 1608352}, {"itemset_id": 1608352}],
  123. "seed_terms": ["爱国情感", "人物故事"],
  124. },
  125. )
  126. sql, params = connection.statements[-1]
  127. values = _insert_values(sql, params)
  128. assert "INSERT INTO `content_agent_pattern_seed_packs`" in sql
  129. assert values["schema_version"] == "content_agent.v1"
  130. assert json.loads(values["itemset_ids"]) == [1608352]
  131. assert json.loads(values["pattern_seed_pack"])["schema_version"] == "runtime_record.v1"
  132. def test_database_runtime_appends_jsonl_with_raw_payload():
  133. connection = FakeConnection()
  134. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  135. store.append_jsonl(
  136. "run_001",
  137. "search_queries.jsonl",
  138. [
  139. {
  140. "record_schema_version": "runtime_record.v1",
  141. "run_id": "run_001",
  142. "policy_run_id": "policy_run_001",
  143. "search_query_id": "q_001",
  144. "search_query": "对比分析",
  145. "search_query_generation_method": "item_single",
  146. "pattern_seed_ref": {
  147. "source_field": "seed_terms",
  148. "source_index": 0,
  149. "seed_term": "对比分析",
  150. },
  151. "raw_payload": {"run_id": "run_001", "search_query_id": "q_001"},
  152. }
  153. ],
  154. )
  155. sql, params = connection.statements[-1]
  156. values = _insert_values(sql, params)
  157. assert "INSERT INTO `content_agent_queries`" in sql
  158. assert values["schema_version"] == "content_agent.v1"
  159. assert json.loads(values["pattern_seed_ref"])["seed_term"] == "对比分析"
  160. assert json.loads(values["raw_payload"])["search_query_id"] == "q_001"
  161. def test_database_runtime_preserves_llm_variant_payload_fields():
  162. connection = FakeConnection()
  163. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  164. store.append_jsonl(
  165. "run_001",
  166. "search_queries.jsonl",
  167. [
  168. {
  169. "record_schema_version": "runtime_record.v1",
  170. "run_id": "run_001",
  171. "policy_run_id": "policy_run_001",
  172. "search_query_id": "q_002",
  173. "search_query": "人物叙事素材",
  174. "search_query_generation_method": "llm_variant",
  175. "pattern_seed_ref": {
  176. "source_field": "seed_terms",
  177. "source_index": 0,
  178. "seed_term": "人物故事",
  179. },
  180. "llm_variant_of": "q_001",
  181. "raw_payload": {
  182. "run_id": "run_001",
  183. "policy_run_id": "policy_run_001",
  184. "search_query_id": "q_002",
  185. "search_query_generation_method": "llm_variant",
  186. "llm_variant_of": "q_001",
  187. "llm_input_evidence": {"seed_term": "人物故事"},
  188. "llm_prompt_version": "fake-query-prompt-v1",
  189. "llm_generation_model": "fake-query-model",
  190. },
  191. }
  192. ],
  193. )
  194. sql, params = connection.statements[-1]
  195. values = _insert_values(sql, params)
  196. assert "INSERT INTO `content_agent_queries`" in sql
  197. assert "llm_variant_of" not in values
  198. payload = json.loads(values["raw_payload"])
  199. assert payload["llm_variant_of"] == "q_001"
  200. assert payload["llm_input_evidence"]["seed_term"] == "人物故事"
  201. assert payload["llm_prompt_version"] == "fake-query-prompt-v1"
  202. assert payload["llm_generation_model"] == "fake-query-model"
  203. def test_database_runtime_upserts_pattern_recall_evidence():
  204. connection = FakeConnection()
  205. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  206. store.append_jsonl(
  207. "run_001",
  208. "pattern_recall_evidence.jsonl",
  209. [
  210. {
  211. "record_schema_version": "runtime_record.v1",
  212. "run_id": "run_001",
  213. "policy_run_id": "policy_run_001",
  214. "recall_evidence_id": "recall_001",
  215. "content_discovery_id": "content_001",
  216. "platform": "douyin",
  217. "platform_content_id": "7390000000000000000",
  218. "decode_status": "success",
  219. "decode_task_id": "decode_task_001",
  220. "recall_status": "matched",
  221. "matched_terms": ["爱国情感"],
  222. "matched_category_paths": [
  223. "/理念/观念/个人观念/情感认同/国家民族认同/爱国情感"
  224. ],
  225. "match_paths_request": {"source_type": "实质"},
  226. "match_paths_response": {"data": []},
  227. "evidence_summary": {
  228. "primary_matched_category_path": (
  229. "/理念/观念/个人观念/情感认同/国家民族认同/爱国情感"
  230. )
  231. },
  232. "raw_payload": {
  233. "platform": "douyin",
  234. "primary_matched_category_path": (
  235. "/理念/观念/个人观念/情感认同/国家民族认同/爱国情感"
  236. ),
  237. },
  238. }
  239. ],
  240. )
  241. sql, params = connection.statements[-1]
  242. values = _insert_values(sql, params)
  243. assert "INSERT INTO `content_agent_pattern_recall_evidence`" in sql
  244. assert "ON DUPLICATE KEY UPDATE" in sql
  245. assert values["schema_version"] == "content_agent.v1"
  246. assert values["recall_evidence_id"] == "recall_001"
  247. assert "platform" not in values
  248. assert "primary_matched_category_path" not in values
  249. assert json.loads(values["matched_terms"]) == ["爱国情感"]
  250. assert json.loads(values["raw_payload"])["platform"] == "douyin"
  251. def test_database_runtime_preserves_p5_rule_decision_fields():
  252. connection = FakeConnection()
  253. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  254. store.append_jsonl(
  255. "run_001",
  256. "rule_decisions.jsonl",
  257. [
  258. {
  259. "record_schema_version": "runtime_record.v1",
  260. "run_id": "run_001",
  261. "policy_run_id": "policy_run_001",
  262. "decision_id": "d_001",
  263. "policy_bundle_id": "policy_bundle_v1",
  264. "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
  265. "rule_pack_version": "1.0.0",
  266. "strategy_version": "V1",
  267. "decision_target_type": "content",
  268. "decision_target_id": "content_001",
  269. "decision_action": "REJECT_CONTENT",
  270. "decision_reason_code": "pattern_recall_failed",
  271. "search_query_effect_status": "rule_blocked",
  272. "score": None,
  273. "triggered_blocking_rules": ["gate_pattern_recall_failed"],
  274. "scorecard": {"total_score": None, "score_missing": True},
  275. "decision_replay_data": {
  276. "policy_bundle_hash": "hash_001",
  277. "dispatch_id": "dispatch_content",
  278. "effect_mapping_id": "map_hard_gate_reject_rule_blocked",
  279. },
  280. "raw_payload": {
  281. "decision_id": "d_001",
  282. "search_query_effect_status": "rule_blocked",
  283. "decision_replay_data": {
  284. "policy_bundle_hash": "hash_001",
  285. "dispatch_id": "dispatch_content",
  286. },
  287. },
  288. }
  289. ],
  290. )
  291. sql, params = connection.statements[-1]
  292. values = _insert_values(sql, params)
  293. assert "INSERT INTO `content_agent_rule_decisions`" in sql
  294. assert values["search_query_effect_status"] == "rule_blocked"
  295. assert json.loads(values["triggered_blocking_rules"]) == ["gate_pattern_recall_failed"]
  296. assert json.loads(values["scorecard"])["score_missing"] is True
  297. assert json.loads(values["decision_replay_data"])["dispatch_id"] == "dispatch_content"
  298. assert json.loads(values["raw_payload"])["decision_replay_data"]["policy_bundle_hash"] == "hash_001"
  299. def test_database_runtime_preserves_p5_search_clue_aggregation_in_raw_payload():
  300. connection = FakeConnection()
  301. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  302. store.append_jsonl(
  303. "run_001",
  304. "search_clues.jsonl",
  305. [
  306. {
  307. "record_schema_version": "runtime_record.v1",
  308. "run_id": "run_001",
  309. "policy_run_id": "policy_run_001",
  310. "clue_id": "clue_001",
  311. "search_query_id": "q_001",
  312. "search_query": "爱国情感",
  313. "discovery_start_source": "pattern_seed",
  314. "previous_discovery_step": "search_query_generated",
  315. "result_count": 1,
  316. "pooled_content_count": 0,
  317. "review_content_count": 0,
  318. "pending_content_count": 0,
  319. "rejected_content_count": 1,
  320. "search_query_effect_status": "rule_blocked",
  321. "query_aggregation_id": "agg_query_rule_blocked",
  322. "walk_next_step": "stop_search_query",
  323. "raw_payload": {
  324. "clue_id": "clue_001",
  325. "query_aggregation_id": "agg_query_rule_blocked",
  326. "effect_status_counts": {"rule_blocked": 1},
  327. },
  328. }
  329. ],
  330. )
  331. sql, params = connection.statements[-1]
  332. values = _insert_values(sql, params)
  333. assert "INSERT INTO `content_agent_search_clues`" in sql
  334. assert "query_aggregation_id" not in values
  335. assert values["search_query_effect_status"] == "rule_blocked"
  336. assert json.loads(values["raw_payload"])["query_aggregation_id"] == "agg_query_rule_blocked"
  337. def test_database_runtime_writes_publish_jobs_db_only_records():
  338. connection = FakeConnection()
  339. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  340. store.write_publish_jobs(
  341. "run_001",
  342. "policy_run_001",
  343. [
  344. {
  345. "publish_job_id": "publish_job_001",
  346. "platform_content_id": "7390000000000000000",
  347. "job_status": "created",
  348. "trigger_mode": "manual_review",
  349. "request_payload": {
  350. "decision_id": "decision_001",
  351. "source_path_record_ids": ["path_001"],
  352. },
  353. "response_payload": {},
  354. }
  355. ],
  356. )
  357. sql, params = connection.statements[-1]
  358. values = _insert_values(sql, params)
  359. assert "INSERT INTO `content_agent_publish_jobs`" in sql
  360. assert "ON DUPLICATE KEY UPDATE" in sql
  361. assert values["schema_version"] == "content_agent.v1"
  362. assert values["run_id"] == "run_001"
  363. assert values["policy_run_id"] == "policy_run_001"
  364. assert values["publish_job_id"] == "publish_job_001"
  365. assert values["job_status"] == "created"
  366. assert values["trigger_mode"] == "manual_review"
  367. assert json.loads(values["request_payload"])["decision_id"] == "decision_001"
  368. def test_database_runtime_writes_author_assets():
  369. connection = FakeConnection()
  370. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  371. store.write_author_assets(
  372. [
  373. {
  374. "author_asset_id": "author_asset_001",
  375. "platform": "douyin",
  376. "platform_author_id": "author_001",
  377. "author_display_name": "作者一号",
  378. "asset_status": "active",
  379. "source_type": "runtime_author_work",
  380. "validation_status": "validated",
  381. "eligible_as_source": 1,
  382. "content_tags": ["人物故事"],
  383. "source_run_id": "run_001",
  384. "source_policy_run_id": "policy_run_001",
  385. "profile_snapshot": {"sample_count": 9},
  386. "evidence_refs": {"decision_ids": ["d_001"]},
  387. "raw_payload": {"author_asset_id": "author_asset_001"},
  388. }
  389. ]
  390. )
  391. sql, params = connection.statements[-1]
  392. values = _insert_values(sql, params)
  393. assert "INSERT INTO `content_agent_author_assets`" in sql
  394. assert "ON DUPLICATE KEY UPDATE" in sql
  395. assert values["schema_version"] == "content_agent.v1"
  396. assert values["author_asset_id"] == "author_asset_001"
  397. assert values["platform_author_id"] == "author_001"
  398. assert values["eligible_as_source"] == 1
  399. assert json.loads(values["content_tags"]) == ["人物故事"]
  400. assert json.loads(values["profile_snapshot"])["sample_count"] == 9
  401. assert json.loads(values["evidence_refs"])["decision_ids"] == ["d_001"]
  402. def test_database_runtime_writes_author_asset_roles():
  403. connection = FakeConnection()
  404. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  405. store.write_author_asset_roles(
  406. [
  407. {
  408. "author_asset_id": "author_asset_001",
  409. "role": "source_seed",
  410. "role_status": "active",
  411. "role_reason_code": "p7_author_asset_eligible",
  412. "assigned_by": "system",
  413. "source_run_id": "run_001",
  414. "raw_payload": {"role": "source_seed"},
  415. }
  416. ]
  417. )
  418. sql, params = connection.statements[-1]
  419. values = _insert_values(sql, params)
  420. assert "INSERT INTO `content_agent_author_asset_roles`" in sql
  421. assert "ON DUPLICATE KEY UPDATE" in sql
  422. assert values["schema_version"] == "content_agent.v1"
  423. assert values["author_asset_id"] == "author_asset_001"
  424. assert values["role"] == "source_seed"
  425. assert values["assigned_by"] == "system"
  426. assert json.loads(values["raw_payload"])["role"] == "source_seed"
  427. def test_database_runtime_update_final_output_upserts_validation_status():
  428. connection = FakeConnection()
  429. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  430. store.update_json(
  431. "run_001",
  432. "final_output.json",
  433. {
  434. "schema_version": "runtime_record.v1",
  435. "run_id": "run_001",
  436. "policy_run_id": "policy_run_001",
  437. "summary": {"run_path_complete": True},
  438. "validation_status": "pass",
  439. },
  440. )
  441. sql, params = connection.statements[-1]
  442. values = _insert_values(sql, params)
  443. assert "INSERT INTO `content_agent_final_outputs`" in sql
  444. assert "ON DUPLICATE KEY UPDATE" in sql
  445. assert values["validation_status"] == "pass"
  446. assert json.loads(values["summary"])["run_path_complete"] is True
  447. def test_database_runtime_read_jsonl_reconstructs_runtime_payload():
  448. connection = FakeConnection()
  449. connection.select_all_result = [
  450. {
  451. "raw_payload": json.dumps(
  452. {
  453. "record_schema_version": "runtime_record.v1",
  454. "run_id": "run_001",
  455. "policy_run_id": "policy_run_001",
  456. "search_query_id": "q_001",
  457. }
  458. )
  459. }
  460. ]
  461. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  462. rows = store.read_jsonl("run_001", "search_queries.jsonl")
  463. assert rows[0]["search_query_id"] == "q_001"
  464. assert rows[0]["raw_payload"]["search_query_id"] == "q_001"
  465. def test_database_runtime_rejects_forbidden_raw_payload_keys_in_lists():
  466. connection = FakeConnection()
  467. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  468. try:
  469. store.append_jsonl(
  470. "run_001",
  471. "search_queries.jsonl",
  472. [
  473. {
  474. "record_schema_version": "runtime_record.v1",
  475. "run_id": "run_001",
  476. "policy_run_id": "policy_run_001",
  477. "search_query_id": "q_001",
  478. "search_query": "对比分析",
  479. "raw_payload": {"items": [{"dsn": "should_not_be_stored"}]},
  480. }
  481. ],
  482. )
  483. except ValueError as exc:
  484. assert "forbidden key" in str(exc)
  485. else:
  486. raise AssertionError("expected forbidden raw_payload key to be rejected")
  487. def test_database_runtime_update_run_record_ignores_empty_sanitized_updates():
  488. connection = FakeConnection()
  489. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  490. store.update_run_record("run_001", {"unknown_field": "ignored", "status": None})
  491. assert connection.statements == []
  492. def test_business_modules_do_not_import_or_name_database_tables():
  493. root = Path("content_agent/business_modules")
  494. text = "\n".join(path.read_text(encoding="utf-8") for path in root.rglob("*.py"))
  495. assert not re.search(
  496. r"pymysql|sqlalchemy|psycopg|sqlite3|SELECT |INSERT |UPDATE |DELETE |SHOW |CREATE |ALTER |content_agent_",
  497. text,
  498. )
  499. def _config():
  500. return ContentSupplyDbConfig(
  501. host="127.0.0.1",
  502. port=3306,
  503. user="content_rw",
  504. password="dummy_password",
  505. database="content-deconstruction-supply",
  506. )
  507. def _insert_values(sql, params):
  508. match = re.search(r"\((.*?)\) VALUES", sql)
  509. assert match, sql
  510. columns = [part.strip().strip("`") for part in match.group(1).split(",")]
  511. return dict(zip(columns, params))