"""Unit tests for ``content_agent.integrations.database_runtime`` (test_database_runtime.py)."""

  1. import json
  2. import re
  3. from pathlib import Path
  4. from content_agent.integrations.database_runtime import (
  5. ContentSupplyDbConfig,
  6. DatabaseRuntimeStore,
  7. )
  8. class FakeCursor:
  9. def __init__(self, connection):
  10. self.connection = connection
  11. self._one = None
  12. self._all = []
  13. def __enter__(self):
  14. return self
  15. def __exit__(self, *_args):
  16. return None
  17. def execute(self, sql, params=None):
  18. self.connection.statements.append((sql, list(params or [])))
  19. if sql.startswith("SELECT COUNT(*)"):
  20. self._one = {"cnt": 0}
  21. self._all = []
  22. elif sql.startswith("SELECT"):
  23. self._one = self.connection.select_one_result
  24. self._all = self.connection.select_all_result
  25. def fetchone(self):
  26. return self._one
  27. def fetchall(self):
  28. return self._all
  29. class FakeConnection:
  30. def __init__(self):
  31. self.statements = []
  32. self.commit_count = 0
  33. self.select_one_result = None
  34. self.select_all_result = []
  35. def __enter__(self):
  36. return self
  37. def __exit__(self, *_args):
  38. return None
  39. def cursor(self):
  40. return FakeCursor(self)
  41. def commit(self):
  42. self.commit_count += 1
  43. def test_content_supply_db_config_reads_env_file(tmp_path):
  44. env_file = tmp_path / ".env"
  45. env_file.write_text(
  46. "\n".join(
  47. [
  48. "CONTENT_SUPPLY_DB_HOST=127.0.0.1",
  49. "CONTENT_SUPPLY_DB_PORT=3307",
  50. "CONTENT_SUPPLY_DB_NAME=content-deconstruction-supply",
  51. "CONTENT_SUPPLY_DB_USER=content_rw",
  52. "CONTENT_SUPPLY_DB_" + "PASS" + "WORD=dummy_password",
  53. ]
  54. ),
  55. encoding="utf-8",
  56. )
  57. config = ContentSupplyDbConfig.from_env(env_file=env_file)
  58. assert config.host == "127.0.0.1"
  59. assert config.port == 3307
  60. assert config.database == "content-deconstruction-supply"
  61. assert config.user == "content_rw"
  62. def test_content_supply_db_config_requires_all_project_db_keys(tmp_path):
  63. env_file = tmp_path / ".env"
  64. env_file.write_text(
  65. "\n".join(
  66. [
  67. "CONTENT_SUPPLY_DB_HOST=127.0.0.1",
  68. "CONTENT_SUPPLY_DB_PORT=3307",
  69. "CONTENT_SUPPLY_DB_NAME=content-deconstruction-supply",
  70. "CONTENT_SUPPLY_DB_USER=content_rw",
  71. ]
  72. ),
  73. encoding="utf-8",
  74. )
  75. try:
  76. ContentSupplyDbConfig.from_env(env_file=env_file)
  77. except ValueError as exc:
  78. assert "CONTENT_SUPPLY_DB_PASSWORD" in str(exc)
  79. else:
  80. raise AssertionError("expected missing db env key to fail")
  81. def test_database_runtime_writes_source_context_with_db_schema_version():
  82. connection = FakeConnection()
  83. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  84. store.write_json(
  85. "run_001",
  86. "source_context.json",
  87. {
  88. "schema_version": "runtime_record.v1",
  89. "run_id": "run_001",
  90. "demand_content_id": "123",
  91. "ext_data": {
  92. "evidence_pack": {
  93. "pattern_source_system": "pg_pattern_v2",
  94. "source_kind": "pattern_itemset",
  95. "source_post_id": "post_001",
  96. "pattern_execution_id": 581,
  97. "mining_config_id": 2081,
  98. }
  99. },
  100. },
  101. )
  102. sql, params = connection.statements[-1]
  103. values = _insert_values(sql, params)
  104. assert "INSERT INTO `content_agent_source_contexts`" in sql
  105. assert values["schema_version"] == "content_agent.v1"
  106. assert values["run_id"] == "run_001"
  107. assert values["demand_content_id"] == 123
  108. assert json.loads(values["evidence_pack"])["source_post_id"] == "post_001"
  109. assert json.loads(values["source_context"])["schema_version"] == "runtime_record.v1"
  110. def test_database_runtime_derives_itemset_ids_from_seed_pack_itemsets():
  111. connection = FakeConnection()
  112. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  113. store.write_json(
  114. "run_001",
  115. "pattern_seed_pack.json",
  116. {
  117. "schema_version": "runtime_record.v1",
  118. "run_id": "run_001",
  119. "policy_run_id": "policy_run_001",
  120. "source_post_id": "post_001",
  121. "pattern_execution_id": 581,
  122. "itemsets": [{"itemset_id": 1608352}, {"itemset_id": 1608352}],
  123. "seed_terms": ["爱国情感", "人物故事"],
  124. },
  125. )
  126. sql, params = connection.statements[-1]
  127. values = _insert_values(sql, params)
  128. assert "INSERT INTO `content_agent_pattern_seed_packs`" in sql
  129. assert values["schema_version"] == "content_agent.v1"
  130. assert json.loads(values["itemset_ids"]) == [1608352]
  131. assert json.loads(values["pattern_seed_pack"])["schema_version"] == "runtime_record.v1"
  132. def test_database_runtime_appends_jsonl_with_raw_payload():
  133. connection = FakeConnection()
  134. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  135. store.append_jsonl(
  136. "run_001",
  137. "search_queries.jsonl",
  138. [
  139. {
  140. "record_schema_version": "runtime_record.v1",
  141. "run_id": "run_001",
  142. "policy_run_id": "policy_run_001",
  143. "search_query_id": "q_001",
  144. "search_query": "对比分析",
  145. "search_query_generation_method": "item_single",
  146. "pattern_seed_ref": {
  147. "source_field": "seed_terms",
  148. "source_index": 0,
  149. "seed_term": "对比分析",
  150. },
  151. "raw_payload": {"run_id": "run_001", "search_query_id": "q_001"},
  152. }
  153. ],
  154. )
  155. sql, params = connection.statements[-1]
  156. values = _insert_values(sql, params)
  157. assert "INSERT INTO `content_agent_queries`" in sql
  158. assert values["schema_version"] == "content_agent.v1"
  159. assert json.loads(values["pattern_seed_ref"])["seed_term"] == "对比分析"
  160. assert json.loads(values["raw_payload"])["search_query_id"] == "q_001"
  161. def test_database_runtime_upserts_failed_search_query_status():
  162. connection = FakeConnection()
  163. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  164. store.append_jsonl(
  165. "run_001",
  166. "search_queries.jsonl",
  167. [
  168. {
  169. "record_schema_version": "runtime_record.v1",
  170. "run_id": "run_001",
  171. "policy_run_id": "policy_run_001",
  172. "search_query_id": "q_001",
  173. "search_query": "接口失败",
  174. "search_query_generation_method": "item_single",
  175. "search_query_effect_status": "failed",
  176. "raw_payload": {
  177. "run_id": "run_001",
  178. "policy_run_id": "policy_run_001",
  179. "search_query_id": "q_001",
  180. "search_query_effect_status": "failed",
  181. "query_failure": {
  182. "status": "failed",
  183. "error_code": "PLATFORM_REQUEST_FAILED",
  184. },
  185. },
  186. }
  187. ],
  188. )
  189. sql, params = connection.statements[-1]
  190. values = _insert_values(sql, params)
  191. assert "INSERT INTO `content_agent_queries`" in sql
  192. assert "ON DUPLICATE KEY UPDATE" in sql
  193. assert values["search_query_effect_status"] == "failed"
  194. payload = json.loads(values["raw_payload"])
  195. assert payload["query_failure"]["error_code"] == "PLATFORM_REQUEST_FAILED"
  196. def test_database_runtime_preserves_llm_variant_payload_fields():
  197. connection = FakeConnection()
  198. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  199. store.append_jsonl(
  200. "run_001",
  201. "search_queries.jsonl",
  202. [
  203. {
  204. "record_schema_version": "runtime_record.v1",
  205. "run_id": "run_001",
  206. "policy_run_id": "policy_run_001",
  207. "search_query_id": "q_002",
  208. "search_query": "人物叙事素材",
  209. "search_query_generation_method": "llm_variant",
  210. "pattern_seed_ref": {
  211. "source_field": "seed_terms",
  212. "source_index": 0,
  213. "seed_term": "人物故事",
  214. },
  215. "llm_variant_of": "q_001",
  216. "raw_payload": {
  217. "run_id": "run_001",
  218. "policy_run_id": "policy_run_001",
  219. "search_query_id": "q_002",
  220. "search_query_generation_method": "llm_variant",
  221. "llm_variant_of": "q_001",
  222. "llm_input_evidence": {"seed_term": "人物故事"},
  223. "llm_prompt_version": "fake-query-prompt-v1",
  224. "llm_generation_model": "fake-query-model",
  225. },
  226. }
  227. ],
  228. )
  229. sql, params = connection.statements[-1]
  230. values = _insert_values(sql, params)
  231. assert "INSERT INTO `content_agent_queries`" in sql
  232. assert "llm_variant_of" not in values
  233. payload = json.loads(values["raw_payload"])
  234. assert payload["llm_variant_of"] == "q_001"
  235. assert payload["llm_input_evidence"]["seed_term"] == "人物故事"
  236. assert payload["llm_prompt_version"] == "fake-query-prompt-v1"
  237. assert payload["llm_generation_model"] == "fake-query-model"
  238. def test_database_runtime_upserts_pattern_recall_evidence():
  239. connection = FakeConnection()
  240. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  241. # V3 形状: Gemini 直读判定证据(recall_status=judged + evidence_summary 判定字段);
  242. # decode 时代 7 列已随 V2 数据归档删除,列过滤必须把混入的旧键挡在 DB 外。
  243. store.append_jsonl(
  244. "run_001",
  245. "pattern_recall_evidence.jsonl",
  246. [
  247. {
  248. "record_schema_version": "runtime_record.v1",
  249. "run_id": "run_001",
  250. "policy_run_id": "policy_run_001",
  251. "recall_evidence_id": "recall_001",
  252. "content_discovery_id": "content_001",
  253. "platform": "douyin",
  254. "platform_content_id": "7390000000000000000",
  255. "recall_status": "judged",
  256. "decode_status": "success",
  257. "matched_terms": ["爱国情感"],
  258. "evidence_summary": {
  259. "judge_status": "ok",
  260. "fit_senior_50plus": True,
  261. "relevance_score": 0.85,
  262. },
  263. "raw_payload": {
  264. "platform": "douyin",
  265. "judge_reason": "贴题且适合 50+",
  266. },
  267. }
  268. ],
  269. )
  270. sql, params = connection.statements[-1]
  271. values = _insert_values(sql, params)
  272. assert "INSERT INTO `content_agent_pattern_recall_evidence`" in sql
  273. assert "ON DUPLICATE KEY UPDATE" in sql
  274. assert values["schema_version"] == "content_agent.v1"
  275. assert values["recall_evidence_id"] == "recall_001"
  276. assert values["recall_status"] == "judged"
  277. assert "platform" not in values
  278. assert "decode_status" not in values
  279. assert "matched_terms" not in values
  280. assert json.loads(values["evidence_summary"])["fit_senior_50plus"] is True
  281. assert json.loads(values["raw_payload"])["platform"] == "douyin"
  282. def test_database_runtime_preserves_p5_rule_decision_fields():
  283. connection = FakeConnection()
  284. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  285. store.append_jsonl(
  286. "run_001",
  287. "rule_decisions.jsonl",
  288. [
  289. {
  290. "record_schema_version": "runtime_record.v1",
  291. "run_id": "run_001",
  292. "policy_run_id": "policy_run_001",
  293. "decision_id": "d_001",
  294. "policy_bundle_id": "policy_bundle_v1",
  295. "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
  296. "rule_pack_version": "1.0.0",
  297. "strategy_version": "V1",
  298. "decision_target_type": "content",
  299. "decision_target_id": "content_001",
  300. "decision_action": "REJECT_CONTENT",
  301. "decision_reason_code": "pattern_recall_failed",
  302. "search_query_effect_status": "rule_blocked",
  303. "score": None,
  304. "triggered_blocking_rules": ["gate_pattern_recall_failed"],
  305. "scorecard": {"total_score": None, "score_missing": True},
  306. "decision_replay_data": {
  307. "policy_bundle_hash": "hash_001",
  308. "dispatch_id": "dispatch_content",
  309. "effect_mapping_id": "map_hard_gate_reject_rule_blocked",
  310. },
  311. "raw_payload": {
  312. "decision_id": "d_001",
  313. "search_query_effect_status": "rule_blocked",
  314. "decision_replay_data": {
  315. "policy_bundle_hash": "hash_001",
  316. "dispatch_id": "dispatch_content",
  317. },
  318. },
  319. }
  320. ],
  321. )
  322. sql, params = connection.statements[-1]
  323. values = _insert_values(sql, params)
  324. assert "INSERT INTO `content_agent_rule_decisions`" in sql
  325. assert values["search_query_effect_status"] == "rule_blocked"
  326. assert json.loads(values["triggered_blocking_rules"]) == ["gate_pattern_recall_failed"]
  327. assert json.loads(values["scorecard"])["score_missing"] is True
  328. assert json.loads(values["decision_replay_data"])["dispatch_id"] == "dispatch_content"
  329. assert json.loads(values["raw_payload"])["decision_replay_data"]["policy_bundle_hash"] == "hash_001"
  330. def test_database_runtime_preserves_v4_score_and_walk_json_contract():
  331. connection = FakeConnection()
  332. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  333. store.append_jsonl(
  334. "run_001",
  335. "rule_decisions.jsonl",
  336. [
  337. {
  338. "record_schema_version": "runtime_record.v1",
  339. "run_id": "run_001",
  340. "policy_run_id": "policy_run_001",
  341. "decision_id": "d_v4_001",
  342. "policy_bundle_id": "policy_bundle_v4",
  343. "rule_pack_id": "douyin_content_discovery_rule_pack_v4",
  344. "rule_pack_version": "4.0.0",
  345. "strategy_version": "V4",
  346. "decision_target_type": "content",
  347. "decision_target_id": "content_001",
  348. "decision_action": "ADD_TO_CONTENT_POOL",
  349. "decision_reason_code": "v4_query_and_platform_pass",
  350. "search_query_effect_status": "success",
  351. "score": 80,
  352. "scorecard": {
  353. "schema_version": "v4_scorecard.v1",
  354. "query_relevance_score": 82,
  355. "platform_performance_score": 78,
  356. "missing_observable_fields": ["view_count"],
  357. },
  358. "decision_replay_data": {
  359. "policy_bundle_hash": "hash_v4",
  360. "rule_pack_id": "douyin_content_discovery_rule_pack_v4",
  361. "rule_pack_version": "4.0.0",
  362. "dispatch_id": "dispatch_content_v4",
  363. "strategy_version": "V4",
  364. "allow_walk": True,
  365. "walk_gate_snapshot": {
  366. "query_relevance_score": 82,
  367. "platform_performance_score": 78,
  368. "score": 80,
  369. },
  370. },
  371. "raw_payload": {
  372. "decision_id": "d_v4_001",
  373. "v4_contract": {
  374. "query_relevance_score": 82,
  375. "platform_performance_score": 78,
  376. },
  377. },
  378. }
  379. ],
  380. )
  381. sql, params = connection.statements[-1]
  382. values = _insert_values(sql, params)
  383. assert "INSERT INTO `content_agent_rule_decisions`" in sql
  384. scorecard = json.loads(values["scorecard"])
  385. replay_data = json.loads(values["decision_replay_data"])
  386. assert scorecard["schema_version"] == "v4_scorecard.v1"
  387. assert scorecard["query_relevance_score"] == 82
  388. assert scorecard["platform_performance_score"] == 78
  389. assert scorecard["missing_observable_fields"] == ["view_count"]
  390. assert replay_data["allow_walk"] is True
  391. assert replay_data["walk_gate_snapshot"]["score"] == 80
  392. def test_database_runtime_preserves_p5_search_clue_aggregation_in_raw_payload():
  393. connection = FakeConnection()
  394. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  395. store.append_jsonl(
  396. "run_001",
  397. "search_clues.jsonl",
  398. [
  399. {
  400. "record_schema_version": "runtime_record.v1",
  401. "run_id": "run_001",
  402. "policy_run_id": "policy_run_001",
  403. "clue_id": "clue_001",
  404. "search_query_id": "q_001",
  405. "search_query": "爱国情感",
  406. "discovery_start_source": "pattern_seed",
  407. "previous_discovery_step": "search_query_generated",
  408. "result_count": 1,
  409. "pooled_content_count": 0,
  410. "review_content_count": 0,
  411. "pending_content_count": 0,
  412. "rejected_content_count": 1,
  413. "search_query_effect_status": "rule_blocked",
  414. "query_aggregation_id": "agg_query_rule_blocked",
  415. "walk_next_step": "stop_search_query",
  416. "raw_payload": {
  417. "clue_id": "clue_001",
  418. "query_aggregation_id": "agg_query_rule_blocked",
  419. "effect_status_counts": {"rule_blocked": 1},
  420. },
  421. }
  422. ],
  423. )
  424. sql, params = connection.statements[-1]
  425. values = _insert_values(sql, params)
  426. assert "INSERT INTO `content_agent_search_clues`" in sql
  427. assert "query_aggregation_id" not in values
  428. assert values["search_query_effect_status"] == "rule_blocked"
  429. assert json.loads(values["raw_payload"])["query_aggregation_id"] == "agg_query_rule_blocked"
  430. def test_database_runtime_writes_failed_search_clue_and_platform_query_failed_event():
  431. connection = FakeConnection()
  432. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  433. query_failure = {
  434. "search_query_id": "q_001",
  435. "search_query": "接口失败",
  436. "search_query_generation_method": "item_single",
  437. "status": "failed",
  438. "error_code": "PLATFORM_REQUEST_FAILED",
  439. "message": "platform query failed",
  440. }
  441. store.append_jsonl(
  442. "run_001",
  443. "search_clues.jsonl",
  444. [
  445. {
  446. "record_schema_version": "runtime_record.v1",
  447. "run_id": "run_001",
  448. "policy_run_id": "policy_run_001",
  449. "clue_id": "clue_001",
  450. "search_query_id": "q_001",
  451. "search_query": "接口失败",
  452. "discovery_start_source": "pattern_itemset",
  453. "previous_discovery_step": "pattern_search_query",
  454. "result_count": 0,
  455. "pooled_content_count": 0,
  456. "review_content_count": 0,
  457. "pending_content_count": 0,
  458. "rejected_content_count": 0,
  459. "search_query_effect_status": "failed",
  460. "query_aggregation_id": "platform_query_failure",
  461. "walk_next_step": "stop_search_query",
  462. "raw_payload": {
  463. "clue_id": "clue_001",
  464. "query_failure": query_failure,
  465. },
  466. }
  467. ],
  468. )
  469. store.append_jsonl(
  470. "run_001",
  471. "run_events.jsonl",
  472. [
  473. {
  474. "record_schema_version": "runtime_record.v1",
  475. "run_id": "run_001",
  476. "policy_run_id": "policy_run_001",
  477. "event_id": "evt_001",
  478. "event_type": "platform_query_failed",
  479. "status": "failed",
  480. "input_ref": "search_queries.jsonl:q_001",
  481. "output_ref": "search_clues.jsonl",
  482. "error_code": "PLATFORM_REQUEST_FAILED",
  483. "message": "platform query failed",
  484. "raw_payload": {
  485. "event_id": "evt_001",
  486. "query_failure": query_failure,
  487. },
  488. }
  489. ],
  490. )
  491. clue_sql, clue_params = connection.statements[-2]
  492. clue_values = _insert_values(clue_sql, clue_params)
  493. assert "INSERT INTO `content_agent_search_clues`" in clue_sql
  494. assert "ON DUPLICATE KEY UPDATE" in clue_sql
  495. assert clue_values["search_query_effect_status"] == "failed"
  496. assert clue_values["walk_next_step"] == "stop_search_query"
  497. assert json.loads(clue_values["raw_payload"])["query_failure"]["search_query_id"] == "q_001"
  498. event_sql, event_params = connection.statements[-1]
  499. event_values = _insert_values(event_sql, event_params)
  500. assert "INSERT INTO `content_agent_run_events`" in event_sql
  501. assert "ON DUPLICATE KEY UPDATE" in event_sql
  502. assert event_values["event_type"] == "platform_query_failed"
  503. assert event_values["status"] == "failed"
  504. assert event_values["error_code"] == "PLATFORM_REQUEST_FAILED"
  505. def test_database_runtime_writes_publish_jobs_db_only_records():
  506. connection = FakeConnection()
  507. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  508. store.write_publish_jobs(
  509. "run_001",
  510. "policy_run_001",
  511. [
  512. {
  513. "publish_job_id": "publish_job_001",
  514. "platform_content_id": "7390000000000000000",
  515. "job_status": "created",
  516. "trigger_mode": "manual_review",
  517. "request_payload": {
  518. "decision_id": "decision_001",
  519. "source_path_record_ids": ["path_001"],
  520. },
  521. "response_payload": {},
  522. }
  523. ],
  524. )
  525. sql, params = connection.statements[-1]
  526. values = _insert_values(sql, params)
  527. assert "INSERT INTO `content_agent_publish_jobs`" in sql
  528. assert "ON DUPLICATE KEY UPDATE" in sql
  529. assert values["schema_version"] == "content_agent.v1"
  530. assert values["run_id"] == "run_001"
  531. assert values["policy_run_id"] == "policy_run_001"
  532. assert values["publish_job_id"] == "publish_job_001"
  533. assert values["job_status"] == "created"
  534. assert values["trigger_mode"] == "manual_review"
  535. assert json.loads(values["request_payload"])["decision_id"] == "decision_001"
  536. def test_database_runtime_writes_author_assets():
  537. connection = FakeConnection()
  538. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  539. store.write_author_assets(
  540. [
  541. {
  542. "author_asset_id": "author_asset_001",
  543. "platform": "douyin",
  544. "platform_author_id": "author_001",
  545. "author_display_name": "作者一号",
  546. "asset_status": "active",
  547. "source_type": "runtime_author_work",
  548. "validation_status": "validated",
  549. "eligible_as_source": 1,
  550. "elderly_ratio": 0.72,
  551. "elderly_tgi": 138,
  552. "content_tags": ["人物故事"],
  553. "source_run_id": "run_001",
  554. "source_policy_run_id": "policy_run_001",
  555. "profile_snapshot": {"sample_count": 9},
  556. "evidence_refs": {"decision_ids": ["d_001"]},
  557. "raw_payload": {"author_asset_id": "author_asset_001"},
  558. }
  559. ]
  560. )
  561. sql, params = connection.statements[-1]
  562. values = _insert_values(sql, params)
  563. assert "INSERT INTO `content_agent_author_assets`" in sql
  564. assert "ON DUPLICATE KEY UPDATE" in sql
  565. assert values["schema_version"] == "content_agent.v1"
  566. assert values["author_asset_id"] == "author_asset_001"
  567. assert values["platform_author_id"] == "author_001"
  568. assert values["eligible_as_source"] == 1
  569. assert values["elderly_ratio"] == 0.72
  570. assert values["elderly_tgi"] == 138
  571. assert json.loads(values["content_tags"]) == ["人物故事"]
  572. assert json.loads(values["profile_snapshot"])["sample_count"] == 9
  573. assert json.loads(values["evidence_refs"])["decision_ids"] == ["d_001"]
  574. def test_database_runtime_writes_author_asset_roles():
  575. connection = FakeConnection()
  576. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  577. store.write_author_asset_roles(
  578. [
  579. {
  580. "author_asset_id": "author_asset_001",
  581. "role": "source_seed",
  582. "role_status": "active",
  583. "role_reason_code": "p7_author_asset_eligible",
  584. "assigned_by": "system",
  585. "source_run_id": "run_001",
  586. "raw_payload": {"role": "source_seed"},
  587. }
  588. ]
  589. )
  590. sql, params = connection.statements[-1]
  591. values = _insert_values(sql, params)
  592. assert "INSERT INTO `content_agent_author_asset_roles`" in sql
  593. assert "ON DUPLICATE KEY UPDATE" in sql
  594. assert values["schema_version"] == "content_agent.v1"
  595. assert values["author_asset_id"] == "author_asset_001"
  596. assert values["role"] == "source_seed"
  597. assert values["assigned_by"] == "system"
  598. assert json.loads(values["raw_payload"])["role"] == "source_seed"
  599. def test_database_runtime_writes_search_clue_assets():
  600. connection = FakeConnection()
  601. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  602. store.write_search_clue_assets(
  603. [
  604. {
  605. "search_clue_asset_id": "search_clue_asset_001",
  606. "platform": "douyin",
  607. "clue_type": "search_query",
  608. "normalized_clue_text": "银发旅行",
  609. "display_clue_text": "银发旅行",
  610. "promotion_status": "promoted",
  611. "reusable_priority": 1,
  612. "can_seed_next_run": 1,
  613. "first_seen_run_id": "run_001",
  614. "first_seen_policy_run_id": "policy_run_001",
  615. "summary_metrics": {"pooled_content_count": 1},
  616. "raw_payload": {"promotion_reason": "success_search_clue"},
  617. }
  618. ]
  619. )
  620. sql, params = connection.statements[-1]
  621. values = _insert_values(sql, params)
  622. assert "INSERT INTO `content_agent_search_clue_assets`" in sql
  623. assert "ON DUPLICATE KEY UPDATE" in sql
  624. assert values["schema_version"] == "content_agent.v1"
  625. assert values["search_clue_asset_id"] == "search_clue_asset_001"
  626. assert values["can_seed_next_run"] == 1
  627. assert json.loads(values["summary_metrics"])["pooled_content_count"] == 1
  628. def test_database_runtime_writes_search_clue_asset_evidence():
  629. connection = FakeConnection()
  630. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  631. store.write_search_clue_asset_evidence(
  632. [
  633. {
  634. "evidence_id": "search_clue_evidence_001",
  635. "search_clue_asset_id": "search_clue_asset_001",
  636. "run_id": "run_001",
  637. "policy_run_id": "policy_run_001",
  638. "clue_id": "clue_001",
  639. "search_query_id": "q_001",
  640. "pooled_content_count": 1,
  641. "source_path_record_ids": ["path_001"],
  642. "decision_ids": ["decision_001"],
  643. "performance_feedback_refs": [],
  644. "raw_payload": {"clue_id": "clue_001"},
  645. }
  646. ]
  647. )
  648. sql, params = connection.statements[-1]
  649. values = _insert_values(sql, params)
  650. assert "INSERT INTO `content_agent_search_clue_asset_evidence`" in sql
  651. assert "ON DUPLICATE KEY UPDATE" in sql
  652. assert values["schema_version"] == "content_agent.v1"
  653. assert values["run_id"] == "run_001"
  654. assert values["clue_id"] == "clue_001"
  655. assert json.loads(values["source_path_record_ids"]) == ["path_001"]
  656. assert json.loads(values["decision_ids"]) == ["decision_001"]
  657. def test_database_runtime_update_final_output_upserts_validation_status():
  658. connection = FakeConnection()
  659. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  660. store.update_json(
  661. "run_001",
  662. "final_output.json",
  663. {
  664. "schema_version": "runtime_record.v1",
  665. "run_id": "run_001",
  666. "policy_run_id": "policy_run_001",
  667. "summary": {"run_path_complete": True},
  668. "validation_status": "pass",
  669. },
  670. )
  671. sql, params = connection.statements[-1]
  672. values = _insert_values(sql, params)
  673. assert "INSERT INTO `content_agent_final_outputs`" in sql
  674. assert "ON DUPLICATE KEY UPDATE" in sql
  675. assert values["validation_status"] == "pass"
  676. assert json.loads(values["summary"])["run_path_complete"] is True
  677. def test_database_runtime_reads_performance_feedback_payloads():
  678. connection = FakeConnection()
  679. connection.select_all_result = [
  680. {
  681. "raw_payload": json.dumps(
  682. {
  683. "run_id": "run_001",
  684. "policy_run_id": "policy_run_001",
  685. "feedback_id": "feedback_001",
  686. "completion_rate": 0.8,
  687. }
  688. )
  689. }
  690. ]
  691. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  692. rows = store.read_performance_feedback("run_001", "policy_run_001")
  693. sql, params = connection.statements[-1]
  694. assert "FROM `content_agent_performance_feedback`" in sql
  695. assert params == ["run_001", "policy_run_001"]
  696. assert rows[0]["feedback_id"] == "feedback_001"
  697. assert rows[0]["raw_payload"]["completion_rate"] == 0.8
  698. def test_database_runtime_read_jsonl_reconstructs_runtime_payload():
  699. connection = FakeConnection()
  700. connection.select_all_result = [
  701. {
  702. "raw_payload": json.dumps(
  703. {
  704. "record_schema_version": "runtime_record.v1",
  705. "run_id": "run_001",
  706. "policy_run_id": "policy_run_001",
  707. "search_query_id": "q_001",
  708. }
  709. )
  710. }
  711. ]
  712. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  713. rows = store.read_jsonl("run_001", "search_queries.jsonl")
  714. assert rows[0]["search_query_id"] == "q_001"
  715. assert rows[0]["raw_payload"]["search_query_id"] == "q_001"
  716. def test_database_runtime_rejects_forbidden_raw_payload_keys_in_lists():
  717. connection = FakeConnection()
  718. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  719. try:
  720. store.append_jsonl(
  721. "run_001",
  722. "search_queries.jsonl",
  723. [
  724. {
  725. "record_schema_version": "runtime_record.v1",
  726. "run_id": "run_001",
  727. "policy_run_id": "policy_run_001",
  728. "search_query_id": "q_001",
  729. "search_query": "对比分析",
  730. "raw_payload": {"items": [{"dsn": "should_not_be_stored"}]},
  731. }
  732. ],
  733. )
  734. except ValueError as exc:
  735. assert "forbidden key" in str(exc)
  736. else:
  737. raise AssertionError("expected forbidden raw_payload key to be rejected")
  738. def test_database_runtime_update_run_record_ignores_empty_sanitized_updates():
  739. connection = FakeConnection()
  740. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  741. store.update_run_record("run_001", {"unknown_field": "ignored", "status": None})
  742. assert connection.statements == []
  743. def test_database_runtime_update_run_record_persists_platform_failure_detail():
  744. connection = FakeConnection()
  745. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  746. store.update_run_record(
  747. "run_001",
  748. {
  749. "status": "failed",
  750. "error_code": "PLATFORM_REQUEST_FAILED",
  751. "error_detail": {
  752. "query_failures": [
  753. {
  754. "search_query_id": "q_001",
  755. "status": "failed",
  756. "error_code": "PLATFORM_REQUEST_FAILED",
  757. }
  758. ]
  759. },
  760. },
  761. )
  762. sql, params = connection.statements[-1]
  763. assert "UPDATE `content_agent_runs` SET" in sql
  764. assert "`error_detail` = %s" in sql
  765. assert json.loads(params[2])["query_failures"][0]["search_query_id"] == "q_001"
  766. assert params[-1] == "run_001"
  767. def test_business_modules_do_not_import_or_name_database_tables():
  768. root = Path("content_agent/business_modules")
  769. text = "\n".join(path.read_text(encoding="utf-8") for path in root.rglob("*.py"))
  770. assert not re.search(
  771. r"pymysql|sqlalchemy|psycopg|sqlite3|SELECT |INSERT |UPDATE |DELETE |SHOW |CREATE |ALTER |content_agent_",
  772. text,
  773. )
  774. def _config():
  775. return ContentSupplyDbConfig(
  776. host="127.0.0.1",
  777. port=3306,
  778. user="content_rw",
  779. password="dummy_password",
  780. database="content-deconstruction-supply",
  781. )
  782. def _insert_values(sql, params):
  783. match = re.search(r"\((.*?)\) VALUES", sql)
  784. assert match, sql
  785. columns = [part.strip().strip("`") for part in match.group(1).split(",")]
  786. return dict(zip(columns, params))