# test_database_runtime.py (30 KB)
  1. import json
  2. import re
  3. from pathlib import Path
  4. from content_agent.integrations.database_runtime import (
  5. ContentSupplyDbConfig,
  6. DatabaseRuntimeStore,
  7. )
  8. class FakeCursor:
  9. def __init__(self, connection):
  10. self.connection = connection
  11. self._one = None
  12. self._all = []
  13. def __enter__(self):
  14. return self
  15. def __exit__(self, *_args):
  16. return None
  17. def execute(self, sql, params=None):
  18. self.connection.statements.append((sql, list(params or [])))
  19. if sql.startswith("SELECT COUNT(*)"):
  20. self._one = {"cnt": 0}
  21. self._all = []
  22. elif sql.startswith("SELECT"):
  23. self._one = self.connection.select_one_result
  24. self._all = self.connection.select_all_result
  25. def fetchone(self):
  26. return self._one
  27. def fetchall(self):
  28. return self._all
  29. class FakeConnection:
  30. def __init__(self):
  31. self.statements = []
  32. self.commit_count = 0
  33. self.select_one_result = None
  34. self.select_all_result = []
  35. def __enter__(self):
  36. return self
  37. def __exit__(self, *_args):
  38. return None
  39. def cursor(self):
  40. return FakeCursor(self)
  41. def commit(self):
  42. self.commit_count += 1
  43. def test_content_supply_db_config_reads_env_file(tmp_path):
  44. env_file = tmp_path / ".env"
  45. env_file.write_text(
  46. "\n".join(
  47. [
  48. "CONTENT_SUPPLY_DB_HOST=127.0.0.1",
  49. "CONTENT_SUPPLY_DB_PORT=3307",
  50. "CONTENT_SUPPLY_DB_NAME=content-deconstruction-supply",
  51. "CONTENT_SUPPLY_DB_USER=content_rw",
  52. "CONTENT_SUPPLY_DB_" + "PASS" + "WORD=dummy_password",
  53. ]
  54. ),
  55. encoding="utf-8",
  56. )
  57. config = ContentSupplyDbConfig.from_env(env_file=env_file)
  58. assert config.host == "127.0.0.1"
  59. assert config.port == 3307
  60. assert config.database == "content-deconstruction-supply"
  61. assert config.user == "content_rw"
  62. def test_content_supply_db_config_requires_all_project_db_keys(tmp_path):
  63. env_file = tmp_path / ".env"
  64. env_file.write_text(
  65. "\n".join(
  66. [
  67. "CONTENT_SUPPLY_DB_HOST=127.0.0.1",
  68. "CONTENT_SUPPLY_DB_PORT=3307",
  69. "CONTENT_SUPPLY_DB_NAME=content-deconstruction-supply",
  70. "CONTENT_SUPPLY_DB_USER=content_rw",
  71. ]
  72. ),
  73. encoding="utf-8",
  74. )
  75. try:
  76. ContentSupplyDbConfig.from_env(env_file=env_file)
  77. except ValueError as exc:
  78. assert "CONTENT_SUPPLY_DB_PASSWORD" in str(exc)
  79. else:
  80. raise AssertionError("expected missing db env key to fail")
  81. def test_database_runtime_writes_source_context_with_db_schema_version():
  82. connection = FakeConnection()
  83. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  84. store.write_json(
  85. "run_001",
  86. "source_context.json",
  87. {
  88. "schema_version": "runtime_record.v1",
  89. "run_id": "run_001",
  90. "demand_content_id": "123",
  91. "ext_data": {
  92. "evidence_pack": {
  93. "pattern_source_system": "pg_pattern_v2",
  94. "source_kind": "pattern_itemset",
  95. "source_post_id": "post_001",
  96. "pattern_execution_id": 581,
  97. "mining_config_id": 2081,
  98. }
  99. },
  100. },
  101. )
  102. sql, params = connection.statements[-1]
  103. values = _insert_values(sql, params)
  104. assert "INSERT INTO `content_agent_source_contexts`" in sql
  105. assert values["schema_version"] == "content_agent.v1"
  106. assert values["run_id"] == "run_001"
  107. assert values["demand_content_id"] == 123
  108. assert json.loads(values["evidence_pack"])["source_post_id"] == "post_001"
  109. assert json.loads(values["source_context"])["schema_version"] == "runtime_record.v1"
  110. def test_database_runtime_derives_itemset_ids_from_seed_pack_itemsets():
  111. connection = FakeConnection()
  112. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  113. store.write_json(
  114. "run_001",
  115. "pattern_seed_pack.json",
  116. {
  117. "schema_version": "runtime_record.v1",
  118. "run_id": "run_001",
  119. "policy_run_id": "policy_run_001",
  120. "source_post_id": "post_001",
  121. "pattern_execution_id": 581,
  122. "itemsets": [{"itemset_id": 1608352}, {"itemset_id": 1608352}],
  123. "seed_terms": ["爱国情感", "人物故事"],
  124. },
  125. )
  126. sql, params = connection.statements[-1]
  127. values = _insert_values(sql, params)
  128. assert "INSERT INTO `content_agent_pattern_seed_packs`" in sql
  129. assert values["schema_version"] == "content_agent.v1"
  130. assert json.loads(values["itemset_ids"]) == [1608352]
  131. assert json.loads(values["pattern_seed_pack"])["schema_version"] == "runtime_record.v1"
  132. def test_database_runtime_appends_jsonl_with_raw_payload():
  133. connection = FakeConnection()
  134. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  135. store.append_jsonl(
  136. "run_001",
  137. "search_queries.jsonl",
  138. [
  139. {
  140. "record_schema_version": "runtime_record.v1",
  141. "run_id": "run_001",
  142. "policy_run_id": "policy_run_001",
  143. "search_query_id": "q_001",
  144. "search_query": "对比分析",
  145. "search_query_generation_method": "item_single",
  146. "pattern_seed_ref": {
  147. "source_field": "seed_terms",
  148. "source_index": 0,
  149. "seed_term": "对比分析",
  150. },
  151. "raw_payload": {"run_id": "run_001", "search_query_id": "q_001"},
  152. }
  153. ],
  154. )
  155. sql, params = connection.statements[-1]
  156. values = _insert_values(sql, params)
  157. assert "INSERT INTO `content_agent_queries`" in sql
  158. assert values["schema_version"] == "content_agent.v1"
  159. assert json.loads(values["pattern_seed_ref"])["seed_term"] == "对比分析"
  160. assert json.loads(values["raw_payload"])["search_query_id"] == "q_001"
  161. def test_database_runtime_upserts_failed_search_query_status():
  162. connection = FakeConnection()
  163. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  164. store.append_jsonl(
  165. "run_001",
  166. "search_queries.jsonl",
  167. [
  168. {
  169. "record_schema_version": "runtime_record.v1",
  170. "run_id": "run_001",
  171. "policy_run_id": "policy_run_001",
  172. "search_query_id": "q_001",
  173. "search_query": "接口失败",
  174. "search_query_generation_method": "item_single",
  175. "search_query_effect_status": "failed",
  176. "raw_payload": {
  177. "run_id": "run_001",
  178. "policy_run_id": "policy_run_001",
  179. "search_query_id": "q_001",
  180. "search_query_effect_status": "failed",
  181. "query_failure": {
  182. "status": "failed",
  183. "error_code": "PLATFORM_REQUEST_FAILED",
  184. },
  185. },
  186. }
  187. ],
  188. )
  189. sql, params = connection.statements[-1]
  190. values = _insert_values(sql, params)
  191. assert "INSERT INTO `content_agent_queries`" in sql
  192. assert "ON DUPLICATE KEY UPDATE" in sql
  193. assert values["search_query_effect_status"] == "failed"
  194. payload = json.loads(values["raw_payload"])
  195. assert payload["query_failure"]["error_code"] == "PLATFORM_REQUEST_FAILED"
  196. def test_database_runtime_preserves_llm_variant_payload_fields():
  197. connection = FakeConnection()
  198. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  199. store.append_jsonl(
  200. "run_001",
  201. "search_queries.jsonl",
  202. [
  203. {
  204. "record_schema_version": "runtime_record.v1",
  205. "run_id": "run_001",
  206. "policy_run_id": "policy_run_001",
  207. "search_query_id": "q_002",
  208. "search_query": "人物叙事素材",
  209. "search_query_generation_method": "llm_variant",
  210. "pattern_seed_ref": {
  211. "source_field": "seed_terms",
  212. "source_index": 0,
  213. "seed_term": "人物故事",
  214. },
  215. "llm_variant_of": "q_001",
  216. "raw_payload": {
  217. "run_id": "run_001",
  218. "policy_run_id": "policy_run_001",
  219. "search_query_id": "q_002",
  220. "search_query_generation_method": "llm_variant",
  221. "llm_variant_of": "q_001",
  222. "llm_input_evidence": {"seed_term": "人物故事"},
  223. "llm_prompt_version": "fake-query-prompt-v1",
  224. "llm_generation_model": "fake-query-model",
  225. },
  226. }
  227. ],
  228. )
  229. sql, params = connection.statements[-1]
  230. values = _insert_values(sql, params)
  231. assert "INSERT INTO `content_agent_queries`" in sql
  232. assert "llm_variant_of" not in values
  233. payload = json.loads(values["raw_payload"])
  234. assert payload["llm_variant_of"] == "q_001"
  235. assert payload["llm_input_evidence"]["seed_term"] == "人物故事"
  236. assert payload["llm_prompt_version"] == "fake-query-prompt-v1"
  237. assert payload["llm_generation_model"] == "fake-query-model"
  238. def test_database_runtime_upserts_pattern_recall_evidence():
  239. connection = FakeConnection()
  240. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  241. store.append_jsonl(
  242. "run_001",
  243. "pattern_recall_evidence.jsonl",
  244. [
  245. {
  246. "record_schema_version": "runtime_record.v1",
  247. "run_id": "run_001",
  248. "policy_run_id": "policy_run_001",
  249. "recall_evidence_id": "recall_001",
  250. "content_discovery_id": "content_001",
  251. "platform": "douyin",
  252. "platform_content_id": "7390000000000000000",
  253. "decode_status": "success",
  254. "decode_task_id": "decode_task_001",
  255. "recall_status": "matched",
  256. "matched_terms": ["爱国情感"],
  257. "matched_category_paths": [
  258. "/理念/观念/个人观念/情感认同/国家民族认同/爱国情感"
  259. ],
  260. "match_paths_request": {"source_type": "实质"},
  261. "match_paths_response": {"data": []},
  262. "evidence_summary": {
  263. "primary_matched_category_path": (
  264. "/理念/观念/个人观念/情感认同/国家民族认同/爱国情感"
  265. )
  266. },
  267. "raw_payload": {
  268. "platform": "douyin",
  269. "primary_matched_category_path": (
  270. "/理念/观念/个人观念/情感认同/国家民族认同/爱国情感"
  271. ),
  272. },
  273. }
  274. ],
  275. )
  276. sql, params = connection.statements[-1]
  277. values = _insert_values(sql, params)
  278. assert "INSERT INTO `content_agent_pattern_recall_evidence`" in sql
  279. assert "ON DUPLICATE KEY UPDATE" in sql
  280. assert values["schema_version"] == "content_agent.v1"
  281. assert values["recall_evidence_id"] == "recall_001"
  282. assert "platform" not in values
  283. assert "primary_matched_category_path" not in values
  284. assert json.loads(values["matched_terms"]) == ["爱国情感"]
  285. assert json.loads(values["raw_payload"])["platform"] == "douyin"
  286. def test_database_runtime_preserves_p5_rule_decision_fields():
  287. connection = FakeConnection()
  288. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  289. store.append_jsonl(
  290. "run_001",
  291. "rule_decisions.jsonl",
  292. [
  293. {
  294. "record_schema_version": "runtime_record.v1",
  295. "run_id": "run_001",
  296. "policy_run_id": "policy_run_001",
  297. "decision_id": "d_001",
  298. "policy_bundle_id": "policy_bundle_v1",
  299. "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
  300. "rule_pack_version": "1.0.0",
  301. "strategy_version": "V1",
  302. "decision_target_type": "content",
  303. "decision_target_id": "content_001",
  304. "decision_action": "REJECT_CONTENT",
  305. "decision_reason_code": "pattern_recall_failed",
  306. "search_query_effect_status": "rule_blocked",
  307. "score": None,
  308. "triggered_blocking_rules": ["gate_pattern_recall_failed"],
  309. "scorecard": {"total_score": None, "score_missing": True},
  310. "decision_replay_data": {
  311. "policy_bundle_hash": "hash_001",
  312. "dispatch_id": "dispatch_content",
  313. "effect_mapping_id": "map_hard_gate_reject_rule_blocked",
  314. },
  315. "raw_payload": {
  316. "decision_id": "d_001",
  317. "search_query_effect_status": "rule_blocked",
  318. "decision_replay_data": {
  319. "policy_bundle_hash": "hash_001",
  320. "dispatch_id": "dispatch_content",
  321. },
  322. },
  323. }
  324. ],
  325. )
  326. sql, params = connection.statements[-1]
  327. values = _insert_values(sql, params)
  328. assert "INSERT INTO `content_agent_rule_decisions`" in sql
  329. assert values["search_query_effect_status"] == "rule_blocked"
  330. assert json.loads(values["triggered_blocking_rules"]) == ["gate_pattern_recall_failed"]
  331. assert json.loads(values["scorecard"])["score_missing"] is True
  332. assert json.loads(values["decision_replay_data"])["dispatch_id"] == "dispatch_content"
  333. assert json.loads(values["raw_payload"])["decision_replay_data"]["policy_bundle_hash"] == "hash_001"
  334. def test_database_runtime_preserves_p5_search_clue_aggregation_in_raw_payload():
  335. connection = FakeConnection()
  336. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  337. store.append_jsonl(
  338. "run_001",
  339. "search_clues.jsonl",
  340. [
  341. {
  342. "record_schema_version": "runtime_record.v1",
  343. "run_id": "run_001",
  344. "policy_run_id": "policy_run_001",
  345. "clue_id": "clue_001",
  346. "search_query_id": "q_001",
  347. "search_query": "爱国情感",
  348. "discovery_start_source": "pattern_seed",
  349. "previous_discovery_step": "search_query_generated",
  350. "result_count": 1,
  351. "pooled_content_count": 0,
  352. "review_content_count": 0,
  353. "pending_content_count": 0,
  354. "rejected_content_count": 1,
  355. "search_query_effect_status": "rule_blocked",
  356. "query_aggregation_id": "agg_query_rule_blocked",
  357. "walk_next_step": "stop_search_query",
  358. "raw_payload": {
  359. "clue_id": "clue_001",
  360. "query_aggregation_id": "agg_query_rule_blocked",
  361. "effect_status_counts": {"rule_blocked": 1},
  362. },
  363. }
  364. ],
  365. )
  366. sql, params = connection.statements[-1]
  367. values = _insert_values(sql, params)
  368. assert "INSERT INTO `content_agent_search_clues`" in sql
  369. assert "query_aggregation_id" not in values
  370. assert values["search_query_effect_status"] == "rule_blocked"
  371. assert json.loads(values["raw_payload"])["query_aggregation_id"] == "agg_query_rule_blocked"
  372. def test_database_runtime_writes_failed_search_clue_and_platform_query_failed_event():
  373. connection = FakeConnection()
  374. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  375. query_failure = {
  376. "search_query_id": "q_001",
  377. "search_query": "接口失败",
  378. "search_query_generation_method": "item_single",
  379. "status": "failed",
  380. "error_code": "PLATFORM_REQUEST_FAILED",
  381. "message": "platform query failed",
  382. }
  383. store.append_jsonl(
  384. "run_001",
  385. "search_clues.jsonl",
  386. [
  387. {
  388. "record_schema_version": "runtime_record.v1",
  389. "run_id": "run_001",
  390. "policy_run_id": "policy_run_001",
  391. "clue_id": "clue_001",
  392. "search_query_id": "q_001",
  393. "search_query": "接口失败",
  394. "discovery_start_source": "pattern_itemset",
  395. "previous_discovery_step": "pattern_search_query",
  396. "result_count": 0,
  397. "pooled_content_count": 0,
  398. "review_content_count": 0,
  399. "pending_content_count": 0,
  400. "rejected_content_count": 0,
  401. "search_query_effect_status": "failed",
  402. "query_aggregation_id": "platform_query_failure",
  403. "walk_next_step": "stop_search_query",
  404. "raw_payload": {
  405. "clue_id": "clue_001",
  406. "query_failure": query_failure,
  407. },
  408. }
  409. ],
  410. )
  411. store.append_jsonl(
  412. "run_001",
  413. "run_events.jsonl",
  414. [
  415. {
  416. "record_schema_version": "runtime_record.v1",
  417. "run_id": "run_001",
  418. "policy_run_id": "policy_run_001",
  419. "event_id": "evt_001",
  420. "event_type": "platform_query_failed",
  421. "status": "failed",
  422. "input_ref": "search_queries.jsonl:q_001",
  423. "output_ref": "search_clues.jsonl",
  424. "error_code": "PLATFORM_REQUEST_FAILED",
  425. "message": "platform query failed",
  426. "raw_payload": {
  427. "event_id": "evt_001",
  428. "query_failure": query_failure,
  429. },
  430. }
  431. ],
  432. )
  433. clue_sql, clue_params = connection.statements[-2]
  434. clue_values = _insert_values(clue_sql, clue_params)
  435. assert "INSERT INTO `content_agent_search_clues`" in clue_sql
  436. assert "ON DUPLICATE KEY UPDATE" in clue_sql
  437. assert clue_values["search_query_effect_status"] == "failed"
  438. assert clue_values["walk_next_step"] == "stop_search_query"
  439. assert json.loads(clue_values["raw_payload"])["query_failure"]["search_query_id"] == "q_001"
  440. event_sql, event_params = connection.statements[-1]
  441. event_values = _insert_values(event_sql, event_params)
  442. assert "INSERT INTO `content_agent_run_events`" in event_sql
  443. assert "ON DUPLICATE KEY UPDATE" in event_sql
  444. assert event_values["event_type"] == "platform_query_failed"
  445. assert event_values["status"] == "failed"
  446. assert event_values["error_code"] == "PLATFORM_REQUEST_FAILED"
  447. def test_database_runtime_writes_publish_jobs_db_only_records():
  448. connection = FakeConnection()
  449. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  450. store.write_publish_jobs(
  451. "run_001",
  452. "policy_run_001",
  453. [
  454. {
  455. "publish_job_id": "publish_job_001",
  456. "platform_content_id": "7390000000000000000",
  457. "job_status": "created",
  458. "trigger_mode": "manual_review",
  459. "request_payload": {
  460. "decision_id": "decision_001",
  461. "source_path_record_ids": ["path_001"],
  462. },
  463. "response_payload": {},
  464. }
  465. ],
  466. )
  467. sql, params = connection.statements[-1]
  468. values = _insert_values(sql, params)
  469. assert "INSERT INTO `content_agent_publish_jobs`" in sql
  470. assert "ON DUPLICATE KEY UPDATE" in sql
  471. assert values["schema_version"] == "content_agent.v1"
  472. assert values["run_id"] == "run_001"
  473. assert values["policy_run_id"] == "policy_run_001"
  474. assert values["publish_job_id"] == "publish_job_001"
  475. assert values["job_status"] == "created"
  476. assert values["trigger_mode"] == "manual_review"
  477. assert json.loads(values["request_payload"])["decision_id"] == "decision_001"
  478. def test_database_runtime_writes_author_assets():
  479. connection = FakeConnection()
  480. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  481. store.write_author_assets(
  482. [
  483. {
  484. "author_asset_id": "author_asset_001",
  485. "platform": "douyin",
  486. "platform_author_id": "author_001",
  487. "author_display_name": "作者一号",
  488. "asset_status": "active",
  489. "source_type": "runtime_author_work",
  490. "validation_status": "validated",
  491. "eligible_as_source": 1,
  492. "content_tags": ["人物故事"],
  493. "source_run_id": "run_001",
  494. "source_policy_run_id": "policy_run_001",
  495. "profile_snapshot": {"sample_count": 9},
  496. "evidence_refs": {"decision_ids": ["d_001"]},
  497. "raw_payload": {"author_asset_id": "author_asset_001"},
  498. }
  499. ]
  500. )
  501. sql, params = connection.statements[-1]
  502. values = _insert_values(sql, params)
  503. assert "INSERT INTO `content_agent_author_assets`" in sql
  504. assert "ON DUPLICATE KEY UPDATE" in sql
  505. assert values["schema_version"] == "content_agent.v1"
  506. assert values["author_asset_id"] == "author_asset_001"
  507. assert values["platform_author_id"] == "author_001"
  508. assert values["eligible_as_source"] == 1
  509. assert json.loads(values["content_tags"]) == ["人物故事"]
  510. assert json.loads(values["profile_snapshot"])["sample_count"] == 9
  511. assert json.loads(values["evidence_refs"])["decision_ids"] == ["d_001"]
  512. def test_database_runtime_writes_author_asset_roles():
  513. connection = FakeConnection()
  514. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  515. store.write_author_asset_roles(
  516. [
  517. {
  518. "author_asset_id": "author_asset_001",
  519. "role": "source_seed",
  520. "role_status": "active",
  521. "role_reason_code": "p7_author_asset_eligible",
  522. "assigned_by": "system",
  523. "source_run_id": "run_001",
  524. "raw_payload": {"role": "source_seed"},
  525. }
  526. ]
  527. )
  528. sql, params = connection.statements[-1]
  529. values = _insert_values(sql, params)
  530. assert "INSERT INTO `content_agent_author_asset_roles`" in sql
  531. assert "ON DUPLICATE KEY UPDATE" in sql
  532. assert values["schema_version"] == "content_agent.v1"
  533. assert values["author_asset_id"] == "author_asset_001"
  534. assert values["role"] == "source_seed"
  535. assert values["assigned_by"] == "system"
  536. assert json.loads(values["raw_payload"])["role"] == "source_seed"
  537. def test_database_runtime_writes_search_clue_assets():
  538. connection = FakeConnection()
  539. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  540. store.write_search_clue_assets(
  541. [
  542. {
  543. "search_clue_asset_id": "search_clue_asset_001",
  544. "platform": "douyin",
  545. "clue_type": "search_query",
  546. "normalized_clue_text": "银发旅行",
  547. "display_clue_text": "银发旅行",
  548. "promotion_status": "promoted",
  549. "reusable_priority": 1,
  550. "can_seed_next_run": 1,
  551. "first_seen_run_id": "run_001",
  552. "first_seen_policy_run_id": "policy_run_001",
  553. "summary_metrics": {"pooled_content_count": 1},
  554. "raw_payload": {"promotion_reason": "success_search_clue"},
  555. }
  556. ]
  557. )
  558. sql, params = connection.statements[-1]
  559. values = _insert_values(sql, params)
  560. assert "INSERT INTO `content_agent_search_clue_assets`" in sql
  561. assert "ON DUPLICATE KEY UPDATE" in sql
  562. assert values["schema_version"] == "content_agent.v1"
  563. assert values["search_clue_asset_id"] == "search_clue_asset_001"
  564. assert values["can_seed_next_run"] == 1
  565. assert json.loads(values["summary_metrics"])["pooled_content_count"] == 1
  566. def test_database_runtime_writes_search_clue_asset_evidence():
  567. connection = FakeConnection()
  568. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  569. store.write_search_clue_asset_evidence(
  570. [
  571. {
  572. "evidence_id": "search_clue_evidence_001",
  573. "search_clue_asset_id": "search_clue_asset_001",
  574. "run_id": "run_001",
  575. "policy_run_id": "policy_run_001",
  576. "clue_id": "clue_001",
  577. "search_query_id": "q_001",
  578. "pooled_content_count": 1,
  579. "source_path_record_ids": ["path_001"],
  580. "decision_ids": ["decision_001"],
  581. "performance_feedback_refs": [],
  582. "raw_payload": {"clue_id": "clue_001"},
  583. }
  584. ]
  585. )
  586. sql, params = connection.statements[-1]
  587. values = _insert_values(sql, params)
  588. assert "INSERT INTO `content_agent_search_clue_asset_evidence`" in sql
  589. assert "ON DUPLICATE KEY UPDATE" in sql
  590. assert values["schema_version"] == "content_agent.v1"
  591. assert values["run_id"] == "run_001"
  592. assert values["clue_id"] == "clue_001"
  593. assert json.loads(values["source_path_record_ids"]) == ["path_001"]
  594. assert json.loads(values["decision_ids"]) == ["decision_001"]
  595. def test_database_runtime_update_final_output_upserts_validation_status():
  596. connection = FakeConnection()
  597. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  598. store.update_json(
  599. "run_001",
  600. "final_output.json",
  601. {
  602. "schema_version": "runtime_record.v1",
  603. "run_id": "run_001",
  604. "policy_run_id": "policy_run_001",
  605. "summary": {"run_path_complete": True},
  606. "validation_status": "pass",
  607. },
  608. )
  609. sql, params = connection.statements[-1]
  610. values = _insert_values(sql, params)
  611. assert "INSERT INTO `content_agent_final_outputs`" in sql
  612. assert "ON DUPLICATE KEY UPDATE" in sql
  613. assert values["validation_status"] == "pass"
  614. assert json.loads(values["summary"])["run_path_complete"] is True
  615. def test_database_runtime_reads_performance_feedback_payloads():
  616. connection = FakeConnection()
  617. connection.select_all_result = [
  618. {
  619. "raw_payload": json.dumps(
  620. {
  621. "run_id": "run_001",
  622. "policy_run_id": "policy_run_001",
  623. "feedback_id": "feedback_001",
  624. "completion_rate": 0.8,
  625. }
  626. )
  627. }
  628. ]
  629. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  630. rows = store.read_performance_feedback("run_001", "policy_run_001")
  631. sql, params = connection.statements[-1]
  632. assert "FROM `content_agent_performance_feedback`" in sql
  633. assert params == ["run_001", "policy_run_001"]
  634. assert rows[0]["feedback_id"] == "feedback_001"
  635. assert rows[0]["raw_payload"]["completion_rate"] == 0.8
  636. def test_database_runtime_read_jsonl_reconstructs_runtime_payload():
  637. connection = FakeConnection()
  638. connection.select_all_result = [
  639. {
  640. "raw_payload": json.dumps(
  641. {
  642. "record_schema_version": "runtime_record.v1",
  643. "run_id": "run_001",
  644. "policy_run_id": "policy_run_001",
  645. "search_query_id": "q_001",
  646. }
  647. )
  648. }
  649. ]
  650. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  651. rows = store.read_jsonl("run_001", "search_queries.jsonl")
  652. assert rows[0]["search_query_id"] == "q_001"
  653. assert rows[0]["raw_payload"]["search_query_id"] == "q_001"
  654. def test_database_runtime_rejects_forbidden_raw_payload_keys_in_lists():
  655. connection = FakeConnection()
  656. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  657. try:
  658. store.append_jsonl(
  659. "run_001",
  660. "search_queries.jsonl",
  661. [
  662. {
  663. "record_schema_version": "runtime_record.v1",
  664. "run_id": "run_001",
  665. "policy_run_id": "policy_run_001",
  666. "search_query_id": "q_001",
  667. "search_query": "对比分析",
  668. "raw_payload": {"items": [{"dsn": "should_not_be_stored"}]},
  669. }
  670. ],
  671. )
  672. except ValueError as exc:
  673. assert "forbidden key" in str(exc)
  674. else:
  675. raise AssertionError("expected forbidden raw_payload key to be rejected")
  676. def test_database_runtime_update_run_record_ignores_empty_sanitized_updates():
  677. connection = FakeConnection()
  678. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  679. store.update_run_record("run_001", {"unknown_field": "ignored", "status": None})
  680. assert connection.statements == []
  681. def test_database_runtime_update_run_record_persists_platform_failure_detail():
  682. connection = FakeConnection()
  683. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  684. store.update_run_record(
  685. "run_001",
  686. {
  687. "status": "failed",
  688. "error_code": "PLATFORM_REQUEST_FAILED",
  689. "error_detail": {
  690. "query_failures": [
  691. {
  692. "search_query_id": "q_001",
  693. "status": "failed",
  694. "error_code": "PLATFORM_REQUEST_FAILED",
  695. }
  696. ]
  697. },
  698. },
  699. )
  700. sql, params = connection.statements[-1]
  701. assert "UPDATE `content_agent_runs` SET" in sql
  702. assert "`error_detail` = %s" in sql
  703. assert json.loads(params[2])["query_failures"][0]["search_query_id"] == "q_001"
  704. assert params[-1] == "run_001"
  705. def test_business_modules_do_not_import_or_name_database_tables():
  706. root = Path("content_agent/business_modules")
  707. text = "\n".join(path.read_text(encoding="utf-8") for path in root.rglob("*.py"))
  708. assert not re.search(
  709. r"pymysql|sqlalchemy|psycopg|sqlite3|SELECT |INSERT |UPDATE |DELETE |SHOW |CREATE |ALTER |content_agent_",
  710. text,
  711. )
  712. def _config():
  713. return ContentSupplyDbConfig(
  714. host="127.0.0.1",
  715. port=3306,
  716. user="content_rw",
  717. password="dummy_password",
  718. database="content-deconstruction-supply",
  719. )
  720. def _insert_values(sql, params):
  721. match = re.search(r"\((.*?)\) VALUES", sql)
  722. assert match, sql
  723. columns = [part.strip().strip("`") for part in match.group(1).split(",")]
  724. return dict(zip(columns, params))