test_database_runtime.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828
  1. import json
  2. import re
  3. from pathlib import Path
  4. from content_agent.integrations.database_runtime import (
  5. ContentSupplyDbConfig,
  6. DatabaseRuntimeStore,
  7. )
  8. class FakeCursor:
  9. def __init__(self, connection):
  10. self.connection = connection
  11. self._one = None
  12. self._all = []
  13. def __enter__(self):
  14. return self
  15. def __exit__(self, *_args):
  16. return None
  17. def execute(self, sql, params=None):
  18. self.connection.statements.append((sql, list(params or [])))
  19. if sql.startswith("SELECT COUNT(*)"):
  20. self._one = {"cnt": 0}
  21. self._all = []
  22. elif sql.startswith("SELECT"):
  23. self._one = self.connection.select_one_result
  24. self._all = self.connection.select_all_result
  25. def fetchone(self):
  26. return self._one
  27. def fetchall(self):
  28. return self._all
  29. class FakeConnection:
  30. def __init__(self):
  31. self.statements = []
  32. self.commit_count = 0
  33. self.select_one_result = None
  34. self.select_all_result = []
  35. def __enter__(self):
  36. return self
  37. def __exit__(self, *_args):
  38. return None
  39. def cursor(self):
  40. return FakeCursor(self)
  41. def commit(self):
  42. self.commit_count += 1
  43. def test_content_supply_db_config_reads_env_file(tmp_path):
  44. env_file = tmp_path / ".env"
  45. env_file.write_text(
  46. "\n".join(
  47. [
  48. "CONTENT_SUPPLY_DB_HOST=127.0.0.1",
  49. "CONTENT_SUPPLY_DB_PORT=3307",
  50. "CONTENT_SUPPLY_DB_NAME=content-deconstruction-supply",
  51. "CONTENT_SUPPLY_DB_USER=content_rw",
  52. "CONTENT_SUPPLY_DB_" + "PASS" + "WORD=dummy_password",
  53. ]
  54. ),
  55. encoding="utf-8",
  56. )
  57. config = ContentSupplyDbConfig.from_env(env_file=env_file)
  58. assert config.host == "127.0.0.1"
  59. assert config.port == 3307
  60. assert config.database == "content-deconstruction-supply"
  61. assert config.user == "content_rw"
  62. def test_content_supply_db_config_requires_all_project_db_keys(tmp_path):
  63. env_file = tmp_path / ".env"
  64. env_file.write_text(
  65. "\n".join(
  66. [
  67. "CONTENT_SUPPLY_DB_HOST=127.0.0.1",
  68. "CONTENT_SUPPLY_DB_PORT=3307",
  69. "CONTENT_SUPPLY_DB_NAME=content-deconstruction-supply",
  70. "CONTENT_SUPPLY_DB_USER=content_rw",
  71. ]
  72. ),
  73. encoding="utf-8",
  74. )
  75. try:
  76. ContentSupplyDbConfig.from_env(env_file=env_file)
  77. except ValueError as exc:
  78. assert "CONTENT_SUPPLY_DB_PASSWORD" in str(exc)
  79. else:
  80. raise AssertionError("expected missing db env key to fail")
  81. def test_database_runtime_writes_source_context_with_db_schema_version():
  82. connection = FakeConnection()
  83. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  84. store.write_json(
  85. "run_001",
  86. "source_context.json",
  87. {
  88. "schema_version": "runtime_record.v1",
  89. "run_id": "run_001",
  90. "demand_content_id": "123",
  91. "ext_data": {
  92. "evidence_pack": {
  93. "pattern_source_system": "pg_pattern_v2",
  94. "source_kind": "pattern_itemset",
  95. "source_post_id": "post_001",
  96. "pattern_execution_id": 581,
  97. "mining_config_id": 2081,
  98. }
  99. },
  100. },
  101. )
  102. sql, params = connection.statements[-1]
  103. values = _insert_values(sql, params)
  104. assert "INSERT INTO `content_agent_source_contexts`" in sql
  105. assert values["schema_version"] == "content_agent.v1"
  106. assert values["run_id"] == "run_001"
  107. assert values["demand_content_id"] == 123
  108. assert json.loads(values["evidence_pack"])["source_post_id"] == "post_001"
  109. assert json.loads(values["source_context"])["schema_version"] == "runtime_record.v1"
  110. def test_database_runtime_derives_itemset_ids_from_seed_pack_itemsets():
  111. connection = FakeConnection()
  112. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  113. store.write_json(
  114. "run_001",
  115. "pattern_seed_pack.json",
  116. {
  117. "schema_version": "runtime_record.v1",
  118. "run_id": "run_001",
  119. "policy_run_id": "policy_run_001",
  120. "source_post_id": "post_001",
  121. "pattern_execution_id": 581,
  122. "itemsets": [{"itemset_id": 1608352}, {"itemset_id": 1608352}],
  123. "seed_terms": ["爱国情感", "人物故事"],
  124. },
  125. )
  126. sql, params = connection.statements[-1]
  127. values = _insert_values(sql, params)
  128. assert "INSERT INTO `content_agent_pattern_seed_packs`" in sql
  129. assert values["schema_version"] == "content_agent.v1"
  130. assert json.loads(values["itemset_ids"]) == [1608352]
  131. assert json.loads(values["pattern_seed_pack"])["schema_version"] == "runtime_record.v1"
  132. def test_database_runtime_appends_jsonl_with_raw_payload():
  133. connection = FakeConnection()
  134. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  135. store.append_jsonl(
  136. "run_001",
  137. "search_queries.jsonl",
  138. [
  139. {
  140. "record_schema_version": "runtime_record.v1",
  141. "run_id": "run_001",
  142. "policy_run_id": "policy_run_001",
  143. "search_query_id": "q_001",
  144. "search_query": "对比分析",
  145. "search_query_generation_method": "item_single",
  146. "pattern_seed_ref": {
  147. "source_field": "seed_terms",
  148. "source_index": 0,
  149. "seed_term": "对比分析",
  150. },
  151. "raw_payload": {"run_id": "run_001", "search_query_id": "q_001"},
  152. }
  153. ],
  154. )
  155. sql, params = connection.statements[-1]
  156. values = _insert_values(sql, params)
  157. assert "INSERT INTO `content_agent_queries`" in sql
  158. assert values["schema_version"] == "content_agent.v1"
  159. assert json.loads(values["pattern_seed_ref"])["seed_term"] == "对比分析"
  160. assert json.loads(values["raw_payload"])["search_query_id"] == "q_001"
  161. def test_database_runtime_upserts_failed_search_query_status():
  162. connection = FakeConnection()
  163. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  164. store.append_jsonl(
  165. "run_001",
  166. "search_queries.jsonl",
  167. [
  168. {
  169. "record_schema_version": "runtime_record.v1",
  170. "run_id": "run_001",
  171. "policy_run_id": "policy_run_001",
  172. "search_query_id": "q_001",
  173. "search_query": "接口失败",
  174. "search_query_generation_method": "item_single",
  175. "search_query_effect_status": "failed",
  176. "raw_payload": {
  177. "run_id": "run_001",
  178. "policy_run_id": "policy_run_001",
  179. "search_query_id": "q_001",
  180. "search_query_effect_status": "failed",
  181. "query_failure": {
  182. "status": "failed",
  183. "error_code": "PLATFORM_REQUEST_FAILED",
  184. },
  185. },
  186. }
  187. ],
  188. )
  189. sql, params = connection.statements[-1]
  190. values = _insert_values(sql, params)
  191. assert "INSERT INTO `content_agent_queries`" in sql
  192. assert "ON DUPLICATE KEY UPDATE" in sql
  193. assert values["search_query_effect_status"] == "failed"
  194. payload = json.loads(values["raw_payload"])
  195. assert payload["query_failure"]["error_code"] == "PLATFORM_REQUEST_FAILED"
  196. def test_database_runtime_preserves_llm_variant_payload_fields():
  197. connection = FakeConnection()
  198. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  199. store.append_jsonl(
  200. "run_001",
  201. "search_queries.jsonl",
  202. [
  203. {
  204. "record_schema_version": "runtime_record.v1",
  205. "run_id": "run_001",
  206. "policy_run_id": "policy_run_001",
  207. "search_query_id": "q_002",
  208. "search_query": "人物叙事素材",
  209. "search_query_generation_method": "llm_variant",
  210. "pattern_seed_ref": {
  211. "source_field": "seed_terms",
  212. "source_index": 0,
  213. "seed_term": "人物故事",
  214. },
  215. "llm_variant_of": "q_001",
  216. "raw_payload": {
  217. "run_id": "run_001",
  218. "policy_run_id": "policy_run_001",
  219. "search_query_id": "q_002",
  220. "search_query_generation_method": "llm_variant",
  221. "llm_variant_of": "q_001",
  222. "llm_input_evidence": {"seed_term": "人物故事"},
  223. "llm_prompt_version": "fake-query-prompt-v1",
  224. "llm_generation_model": "fake-query-model",
  225. },
  226. }
  227. ],
  228. )
  229. sql, params = connection.statements[-1]
  230. values = _insert_values(sql, params)
  231. assert "INSERT INTO `content_agent_queries`" in sql
  232. assert "llm_variant_of" not in values
  233. payload = json.loads(values["raw_payload"])
  234. assert payload["llm_variant_of"] == "q_001"
  235. assert payload["llm_input_evidence"]["seed_term"] == "人物故事"
  236. assert payload["llm_prompt_version"] == "fake-query-prompt-v1"
  237. assert payload["llm_generation_model"] == "fake-query-model"
  238. def test_database_runtime_upserts_pattern_recall_evidence():
  239. connection = FakeConnection()
  240. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  241. # V3 形状: Gemini 直读判定证据(recall_status=judged + evidence_summary 判定字段);
  242. # decode 时代 7 列已随 V2 数据归档删除,列过滤必须把混入的旧键挡在 DB 外。
  243. store.append_jsonl(
  244. "run_001",
  245. "pattern_recall_evidence.jsonl",
  246. [
  247. {
  248. "record_schema_version": "runtime_record.v1",
  249. "run_id": "run_001",
  250. "policy_run_id": "policy_run_001",
  251. "recall_evidence_id": "recall_001",
  252. "content_discovery_id": "content_001",
  253. "platform": "douyin",
  254. "platform_content_id": "7390000000000000000",
  255. "recall_status": "judged",
  256. "decode_status": "success",
  257. "matched_terms": ["爱国情感"],
  258. "evidence_summary": {
  259. "judge_status": "ok",
  260. "fit_senior_50plus": True,
  261. "relevance_score": 0.85,
  262. },
  263. "raw_payload": {
  264. "platform": "douyin",
  265. "judge_reason": "贴题且适合 50+",
  266. },
  267. }
  268. ],
  269. )
  270. sql, params = connection.statements[-1]
  271. values = _insert_values(sql, params)
  272. assert "INSERT INTO `content_agent_pattern_recall_evidence`" in sql
  273. assert "ON DUPLICATE KEY UPDATE" in sql
  274. assert values["schema_version"] == "content_agent.v1"
  275. assert values["recall_evidence_id"] == "recall_001"
  276. assert values["recall_status"] == "judged"
  277. assert "platform" not in values
  278. assert "decode_status" not in values
  279. assert "matched_terms" not in values
  280. assert json.loads(values["evidence_summary"])["fit_senior_50plus"] is True
  281. assert json.loads(values["raw_payload"])["platform"] == "douyin"
  282. def test_database_runtime_preserves_p5_rule_decision_fields():
  283. connection = FakeConnection()
  284. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  285. store.append_jsonl(
  286. "run_001",
  287. "rule_decisions.jsonl",
  288. [
  289. {
  290. "record_schema_version": "runtime_record.v1",
  291. "run_id": "run_001",
  292. "policy_run_id": "policy_run_001",
  293. "decision_id": "d_001",
  294. "policy_bundle_id": "policy_bundle_v1",
  295. "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
  296. "rule_pack_version": "1.0.0",
  297. "strategy_version": "V1",
  298. "decision_target_type": "content",
  299. "decision_target_id": "content_001",
  300. "decision_action": "REJECT_CONTENT",
  301. "decision_reason_code": "pattern_recall_failed",
  302. "search_query_effect_status": "rule_blocked",
  303. "score": None,
  304. "triggered_blocking_rules": ["gate_pattern_recall_failed"],
  305. "scorecard": {"total_score": None, "score_missing": True},
  306. "decision_replay_data": {
  307. "policy_bundle_hash": "hash_001",
  308. "dispatch_id": "dispatch_content",
  309. "effect_mapping_id": "map_hard_gate_reject_rule_blocked",
  310. },
  311. "raw_payload": {
  312. "decision_id": "d_001",
  313. "search_query_effect_status": "rule_blocked",
  314. "decision_replay_data": {
  315. "policy_bundle_hash": "hash_001",
  316. "dispatch_id": "dispatch_content",
  317. },
  318. },
  319. }
  320. ],
  321. )
  322. sql, params = connection.statements[-1]
  323. values = _insert_values(sql, params)
  324. assert "INSERT INTO `content_agent_rule_decisions`" in sql
  325. assert values["search_query_effect_status"] == "rule_blocked"
  326. assert json.loads(values["triggered_blocking_rules"]) == ["gate_pattern_recall_failed"]
  327. assert json.loads(values["scorecard"])["score_missing"] is True
  328. assert json.loads(values["decision_replay_data"])["dispatch_id"] == "dispatch_content"
  329. assert json.loads(values["raw_payload"])["decision_replay_data"]["policy_bundle_hash"] == "hash_001"
  330. def test_database_runtime_preserves_p5_search_clue_aggregation_in_raw_payload():
  331. connection = FakeConnection()
  332. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  333. store.append_jsonl(
  334. "run_001",
  335. "search_clues.jsonl",
  336. [
  337. {
  338. "record_schema_version": "runtime_record.v1",
  339. "run_id": "run_001",
  340. "policy_run_id": "policy_run_001",
  341. "clue_id": "clue_001",
  342. "search_query_id": "q_001",
  343. "search_query": "爱国情感",
  344. "discovery_start_source": "pattern_seed",
  345. "previous_discovery_step": "search_query_generated",
  346. "result_count": 1,
  347. "pooled_content_count": 0,
  348. "review_content_count": 0,
  349. "pending_content_count": 0,
  350. "rejected_content_count": 1,
  351. "search_query_effect_status": "rule_blocked",
  352. "query_aggregation_id": "agg_query_rule_blocked",
  353. "walk_next_step": "stop_search_query",
  354. "raw_payload": {
  355. "clue_id": "clue_001",
  356. "query_aggregation_id": "agg_query_rule_blocked",
  357. "effect_status_counts": {"rule_blocked": 1},
  358. },
  359. }
  360. ],
  361. )
  362. sql, params = connection.statements[-1]
  363. values = _insert_values(sql, params)
  364. assert "INSERT INTO `content_agent_search_clues`" in sql
  365. assert "query_aggregation_id" not in values
  366. assert values["search_query_effect_status"] == "rule_blocked"
  367. assert json.loads(values["raw_payload"])["query_aggregation_id"] == "agg_query_rule_blocked"
  368. def test_database_runtime_writes_failed_search_clue_and_platform_query_failed_event():
  369. connection = FakeConnection()
  370. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  371. query_failure = {
  372. "search_query_id": "q_001",
  373. "search_query": "接口失败",
  374. "search_query_generation_method": "item_single",
  375. "status": "failed",
  376. "error_code": "PLATFORM_REQUEST_FAILED",
  377. "message": "platform query failed",
  378. }
  379. store.append_jsonl(
  380. "run_001",
  381. "search_clues.jsonl",
  382. [
  383. {
  384. "record_schema_version": "runtime_record.v1",
  385. "run_id": "run_001",
  386. "policy_run_id": "policy_run_001",
  387. "clue_id": "clue_001",
  388. "search_query_id": "q_001",
  389. "search_query": "接口失败",
  390. "discovery_start_source": "pattern_itemset",
  391. "previous_discovery_step": "pattern_search_query",
  392. "result_count": 0,
  393. "pooled_content_count": 0,
  394. "review_content_count": 0,
  395. "pending_content_count": 0,
  396. "rejected_content_count": 0,
  397. "search_query_effect_status": "failed",
  398. "query_aggregation_id": "platform_query_failure",
  399. "walk_next_step": "stop_search_query",
  400. "raw_payload": {
  401. "clue_id": "clue_001",
  402. "query_failure": query_failure,
  403. },
  404. }
  405. ],
  406. )
  407. store.append_jsonl(
  408. "run_001",
  409. "run_events.jsonl",
  410. [
  411. {
  412. "record_schema_version": "runtime_record.v1",
  413. "run_id": "run_001",
  414. "policy_run_id": "policy_run_001",
  415. "event_id": "evt_001",
  416. "event_type": "platform_query_failed",
  417. "status": "failed",
  418. "input_ref": "search_queries.jsonl:q_001",
  419. "output_ref": "search_clues.jsonl",
  420. "error_code": "PLATFORM_REQUEST_FAILED",
  421. "message": "platform query failed",
  422. "raw_payload": {
  423. "event_id": "evt_001",
  424. "query_failure": query_failure,
  425. },
  426. }
  427. ],
  428. )
  429. clue_sql, clue_params = connection.statements[-2]
  430. clue_values = _insert_values(clue_sql, clue_params)
  431. assert "INSERT INTO `content_agent_search_clues`" in clue_sql
  432. assert "ON DUPLICATE KEY UPDATE" in clue_sql
  433. assert clue_values["search_query_effect_status"] == "failed"
  434. assert clue_values["walk_next_step"] == "stop_search_query"
  435. assert json.loads(clue_values["raw_payload"])["query_failure"]["search_query_id"] == "q_001"
  436. event_sql, event_params = connection.statements[-1]
  437. event_values = _insert_values(event_sql, event_params)
  438. assert "INSERT INTO `content_agent_run_events`" in event_sql
  439. assert "ON DUPLICATE KEY UPDATE" in event_sql
  440. assert event_values["event_type"] == "platform_query_failed"
  441. assert event_values["status"] == "failed"
  442. assert event_values["error_code"] == "PLATFORM_REQUEST_FAILED"
  443. def test_database_runtime_writes_publish_jobs_db_only_records():
  444. connection = FakeConnection()
  445. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  446. store.write_publish_jobs(
  447. "run_001",
  448. "policy_run_001",
  449. [
  450. {
  451. "publish_job_id": "publish_job_001",
  452. "platform_content_id": "7390000000000000000",
  453. "job_status": "created",
  454. "trigger_mode": "manual_review",
  455. "request_payload": {
  456. "decision_id": "decision_001",
  457. "source_path_record_ids": ["path_001"],
  458. },
  459. "response_payload": {},
  460. }
  461. ],
  462. )
  463. sql, params = connection.statements[-1]
  464. values = _insert_values(sql, params)
  465. assert "INSERT INTO `content_agent_publish_jobs`" in sql
  466. assert "ON DUPLICATE KEY UPDATE" in sql
  467. assert values["schema_version"] == "content_agent.v1"
  468. assert values["run_id"] == "run_001"
  469. assert values["policy_run_id"] == "policy_run_001"
  470. assert values["publish_job_id"] == "publish_job_001"
  471. assert values["job_status"] == "created"
  472. assert values["trigger_mode"] == "manual_review"
  473. assert json.loads(values["request_payload"])["decision_id"] == "decision_001"
  474. def test_database_runtime_writes_author_assets():
  475. connection = FakeConnection()
  476. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  477. store.write_author_assets(
  478. [
  479. {
  480. "author_asset_id": "author_asset_001",
  481. "platform": "douyin",
  482. "platform_author_id": "author_001",
  483. "author_display_name": "作者一号",
  484. "asset_status": "active",
  485. "source_type": "runtime_author_work",
  486. "validation_status": "validated",
  487. "eligible_as_source": 1,
  488. "content_tags": ["人物故事"],
  489. "source_run_id": "run_001",
  490. "source_policy_run_id": "policy_run_001",
  491. "profile_snapshot": {"sample_count": 9},
  492. "evidence_refs": {"decision_ids": ["d_001"]},
  493. "raw_payload": {"author_asset_id": "author_asset_001"},
  494. }
  495. ]
  496. )
  497. sql, params = connection.statements[-1]
  498. values = _insert_values(sql, params)
  499. assert "INSERT INTO `content_agent_author_assets`" in sql
  500. assert "ON DUPLICATE KEY UPDATE" in sql
  501. assert values["schema_version"] == "content_agent.v1"
  502. assert values["author_asset_id"] == "author_asset_001"
  503. assert values["platform_author_id"] == "author_001"
  504. assert values["eligible_as_source"] == 1
  505. assert json.loads(values["content_tags"]) == ["人物故事"]
  506. assert json.loads(values["profile_snapshot"])["sample_count"] == 9
  507. assert json.loads(values["evidence_refs"])["decision_ids"] == ["d_001"]
  508. def test_database_runtime_writes_author_asset_roles():
  509. connection = FakeConnection()
  510. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  511. store.write_author_asset_roles(
  512. [
  513. {
  514. "author_asset_id": "author_asset_001",
  515. "role": "source_seed",
  516. "role_status": "active",
  517. "role_reason_code": "p7_author_asset_eligible",
  518. "assigned_by": "system",
  519. "source_run_id": "run_001",
  520. "raw_payload": {"role": "source_seed"},
  521. }
  522. ]
  523. )
  524. sql, params = connection.statements[-1]
  525. values = _insert_values(sql, params)
  526. assert "INSERT INTO `content_agent_author_asset_roles`" in sql
  527. assert "ON DUPLICATE KEY UPDATE" in sql
  528. assert values["schema_version"] == "content_agent.v1"
  529. assert values["author_asset_id"] == "author_asset_001"
  530. assert values["role"] == "source_seed"
  531. assert values["assigned_by"] == "system"
  532. assert json.loads(values["raw_payload"])["role"] == "source_seed"
  533. def test_database_runtime_writes_search_clue_assets():
  534. connection = FakeConnection()
  535. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  536. store.write_search_clue_assets(
  537. [
  538. {
  539. "search_clue_asset_id": "search_clue_asset_001",
  540. "platform": "douyin",
  541. "clue_type": "search_query",
  542. "normalized_clue_text": "银发旅行",
  543. "display_clue_text": "银发旅行",
  544. "promotion_status": "promoted",
  545. "reusable_priority": 1,
  546. "can_seed_next_run": 1,
  547. "first_seen_run_id": "run_001",
  548. "first_seen_policy_run_id": "policy_run_001",
  549. "summary_metrics": {"pooled_content_count": 1},
  550. "raw_payload": {"promotion_reason": "success_search_clue"},
  551. }
  552. ]
  553. )
  554. sql, params = connection.statements[-1]
  555. values = _insert_values(sql, params)
  556. assert "INSERT INTO `content_agent_search_clue_assets`" in sql
  557. assert "ON DUPLICATE KEY UPDATE" in sql
  558. assert values["schema_version"] == "content_agent.v1"
  559. assert values["search_clue_asset_id"] == "search_clue_asset_001"
  560. assert values["can_seed_next_run"] == 1
  561. assert json.loads(values["summary_metrics"])["pooled_content_count"] == 1
  562. def test_database_runtime_writes_search_clue_asset_evidence():
  563. connection = FakeConnection()
  564. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  565. store.write_search_clue_asset_evidence(
  566. [
  567. {
  568. "evidence_id": "search_clue_evidence_001",
  569. "search_clue_asset_id": "search_clue_asset_001",
  570. "run_id": "run_001",
  571. "policy_run_id": "policy_run_001",
  572. "clue_id": "clue_001",
  573. "search_query_id": "q_001",
  574. "pooled_content_count": 1,
  575. "source_path_record_ids": ["path_001"],
  576. "decision_ids": ["decision_001"],
  577. "performance_feedback_refs": [],
  578. "raw_payload": {"clue_id": "clue_001"},
  579. }
  580. ]
  581. )
  582. sql, params = connection.statements[-1]
  583. values = _insert_values(sql, params)
  584. assert "INSERT INTO `content_agent_search_clue_asset_evidence`" in sql
  585. assert "ON DUPLICATE KEY UPDATE" in sql
  586. assert values["schema_version"] == "content_agent.v1"
  587. assert values["run_id"] == "run_001"
  588. assert values["clue_id"] == "clue_001"
  589. assert json.loads(values["source_path_record_ids"]) == ["path_001"]
  590. assert json.loads(values["decision_ids"]) == ["decision_001"]
  591. def test_database_runtime_update_final_output_upserts_validation_status():
  592. connection = FakeConnection()
  593. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  594. store.update_json(
  595. "run_001",
  596. "final_output.json",
  597. {
  598. "schema_version": "runtime_record.v1",
  599. "run_id": "run_001",
  600. "policy_run_id": "policy_run_001",
  601. "summary": {"run_path_complete": True},
  602. "validation_status": "pass",
  603. },
  604. )
  605. sql, params = connection.statements[-1]
  606. values = _insert_values(sql, params)
  607. assert "INSERT INTO `content_agent_final_outputs`" in sql
  608. assert "ON DUPLICATE KEY UPDATE" in sql
  609. assert values["validation_status"] == "pass"
  610. assert json.loads(values["summary"])["run_path_complete"] is True
  611. def test_database_runtime_reads_performance_feedback_payloads():
  612. connection = FakeConnection()
  613. connection.select_all_result = [
  614. {
  615. "raw_payload": json.dumps(
  616. {
  617. "run_id": "run_001",
  618. "policy_run_id": "policy_run_001",
  619. "feedback_id": "feedback_001",
  620. "completion_rate": 0.8,
  621. }
  622. )
  623. }
  624. ]
  625. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  626. rows = store.read_performance_feedback("run_001", "policy_run_001")
  627. sql, params = connection.statements[-1]
  628. assert "FROM `content_agent_performance_feedback`" in sql
  629. assert params == ["run_001", "policy_run_001"]
  630. assert rows[0]["feedback_id"] == "feedback_001"
  631. assert rows[0]["raw_payload"]["completion_rate"] == 0.8
  632. def test_database_runtime_read_jsonl_reconstructs_runtime_payload():
  633. connection = FakeConnection()
  634. connection.select_all_result = [
  635. {
  636. "raw_payload": json.dumps(
  637. {
  638. "record_schema_version": "runtime_record.v1",
  639. "run_id": "run_001",
  640. "policy_run_id": "policy_run_001",
  641. "search_query_id": "q_001",
  642. }
  643. )
  644. }
  645. ]
  646. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  647. rows = store.read_jsonl("run_001", "search_queries.jsonl")
  648. assert rows[0]["search_query_id"] == "q_001"
  649. assert rows[0]["raw_payload"]["search_query_id"] == "q_001"
  650. def test_database_runtime_rejects_forbidden_raw_payload_keys_in_lists():
  651. connection = FakeConnection()
  652. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  653. try:
  654. store.append_jsonl(
  655. "run_001",
  656. "search_queries.jsonl",
  657. [
  658. {
  659. "record_schema_version": "runtime_record.v1",
  660. "run_id": "run_001",
  661. "policy_run_id": "policy_run_001",
  662. "search_query_id": "q_001",
  663. "search_query": "对比分析",
  664. "raw_payload": {"items": [{"dsn": "should_not_be_stored"}]},
  665. }
  666. ],
  667. )
  668. except ValueError as exc:
  669. assert "forbidden key" in str(exc)
  670. else:
  671. raise AssertionError("expected forbidden raw_payload key to be rejected")
  672. def test_database_runtime_update_run_record_ignores_empty_sanitized_updates():
  673. connection = FakeConnection()
  674. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  675. store.update_run_record("run_001", {"unknown_field": "ignored", "status": None})
  676. assert connection.statements == []
  677. def test_database_runtime_update_run_record_persists_platform_failure_detail():
  678. connection = FakeConnection()
  679. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  680. store.update_run_record(
  681. "run_001",
  682. {
  683. "status": "failed",
  684. "error_code": "PLATFORM_REQUEST_FAILED",
  685. "error_detail": {
  686. "query_failures": [
  687. {
  688. "search_query_id": "q_001",
  689. "status": "failed",
  690. "error_code": "PLATFORM_REQUEST_FAILED",
  691. }
  692. ]
  693. },
  694. },
  695. )
  696. sql, params = connection.statements[-1]
  697. assert "UPDATE `content_agent_runs` SET" in sql
  698. assert "`error_detail` = %s" in sql
  699. assert json.loads(params[2])["query_failures"][0]["search_query_id"] == "q_001"
  700. assert params[-1] == "run_001"
  701. def test_business_modules_do_not_import_or_name_database_tables():
  702. root = Path("content_agent/business_modules")
  703. text = "\n".join(path.read_text(encoding="utf-8") for path in root.rglob("*.py"))
  704. assert not re.search(
  705. r"pymysql|sqlalchemy|psycopg|sqlite3|SELECT |INSERT |UPDATE |DELETE |SHOW |CREATE |ALTER |content_agent_",
  706. text,
  707. )
  708. def _config():
  709. return ContentSupplyDbConfig(
  710. host="127.0.0.1",
  711. port=3306,
  712. user="content_rw",
  713. password="dummy_password",
  714. database="content-deconstruction-supply",
  715. )
  716. def _insert_values(sql, params):
  717. match = re.search(r"\((.*?)\) VALUES", sql)
  718. assert match, sql
  719. columns = [part.strip().strip("`") for part in match.group(1).split(",")]
  720. return dict(zip(columns, params))