test_database_runtime.py 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998
  1. import json
  2. import re
  3. from pathlib import Path
  4. from content_agent.integrations.database_runtime import (
  5. ContentSupplyDbConfig,
  6. DatabaseRuntimeStore,
  7. )
  8. class FakeCursor:
  9. def __init__(self, connection):
  10. self.connection = connection
  11. self._one = None
  12. self._all = []
  13. def __enter__(self):
  14. return self
  15. def __exit__(self, *_args):
  16. return None
  17. def execute(self, sql, params=None):
  18. self.connection.statements.append((sql, list(params or [])))
  19. if sql.startswith("SELECT COUNT(*)"):
  20. self._one = {"cnt": 0}
  21. self._all = []
  22. elif sql.startswith("SELECT"):
  23. self._one = self.connection.select_one_result
  24. self._all = self.connection.select_all_result
  25. def fetchone(self):
  26. return self._one
  27. def fetchall(self):
  28. return self._all
  29. class FakeConnection:
  30. def __init__(self):
  31. self.statements = []
  32. self.commit_count = 0
  33. self.select_one_result = None
  34. self.select_all_result = []
  35. def __enter__(self):
  36. return self
  37. def __exit__(self, *_args):
  38. return None
  39. def cursor(self):
  40. return FakeCursor(self)
  41. def commit(self):
  42. self.commit_count += 1
  43. def test_content_supply_db_config_reads_env_file(tmp_path):
  44. env_file = tmp_path / ".env"
  45. env_file.write_text(
  46. "\n".join(
  47. [
  48. "CONTENT_SUPPLY_DB_HOST=127.0.0.1",
  49. "CONTENT_SUPPLY_DB_PORT=3307",
  50. "CONTENT_SUPPLY_DB_NAME=content-deconstruction-supply",
  51. "CONTENT_SUPPLY_DB_USER=content_rw",
  52. "CONTENT_SUPPLY_DB_" + "PASS" + "WORD=dummy_password",
  53. ]
  54. ),
  55. encoding="utf-8",
  56. )
  57. config = ContentSupplyDbConfig.from_env(env_file=env_file)
  58. assert config.host == "127.0.0.1"
  59. assert config.port == 3307
  60. assert config.database == "content-deconstruction-supply"
  61. assert config.user == "content_rw"
  62. def test_content_supply_db_config_requires_all_project_db_keys(tmp_path):
  63. env_file = tmp_path / ".env"
  64. env_file.write_text(
  65. "\n".join(
  66. [
  67. "CONTENT_SUPPLY_DB_HOST=127.0.0.1",
  68. "CONTENT_SUPPLY_DB_PORT=3307",
  69. "CONTENT_SUPPLY_DB_NAME=content-deconstruction-supply",
  70. "CONTENT_SUPPLY_DB_USER=content_rw",
  71. ]
  72. ),
  73. encoding="utf-8",
  74. )
  75. try:
  76. ContentSupplyDbConfig.from_env(env_file=env_file)
  77. except ValueError as exc:
  78. assert "CONTENT_SUPPLY_DB_PASSWORD" in str(exc)
  79. else:
  80. raise AssertionError("expected missing db env key to fail")
  81. def test_database_runtime_writes_source_context_with_db_schema_version():
  82. connection = FakeConnection()
  83. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  84. store.write_json(
  85. "run_001",
  86. "source_context.json",
  87. {
  88. "schema_version": "runtime_record.v1",
  89. "run_id": "run_001",
  90. "demand_content_id": "123",
  91. "ext_data": {
  92. "evidence_pack": {
  93. "pattern_source_system": "pg_pattern_v2",
  94. "source_kind": "pattern_itemset",
  95. "source_post_id": "post_001",
  96. "pattern_execution_id": 581,
  97. "mining_config_id": 2081,
  98. }
  99. },
  100. },
  101. )
  102. sql, params = connection.statements[-1]
  103. values = _insert_values(sql, params)
  104. assert "INSERT INTO `content_agent_source_contexts`" in sql
  105. assert values["schema_version"] == "content_agent.v1"
  106. assert values["run_id"] == "run_001"
  107. assert values["demand_content_id"] == 123
  108. assert json.loads(values["evidence_pack"])["source_post_id"] == "post_001"
  109. assert json.loads(values["source_context"])["schema_version"] == "runtime_record.v1"
  110. def test_database_runtime_derives_itemset_ids_from_seed_pack_itemsets():
  111. connection = FakeConnection()
  112. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  113. store.write_json(
  114. "run_001",
  115. "pattern_seed_pack.json",
  116. {
  117. "schema_version": "runtime_record.v1",
  118. "run_id": "run_001",
  119. "policy_run_id": "policy_run_001",
  120. "source_post_id": "post_001",
  121. "pattern_execution_id": 581,
  122. "itemsets": [{"itemset_id": 1608352}, {"itemset_id": 1608352}],
  123. "seed_terms": ["爱国情感", "人物故事"],
  124. },
  125. )
  126. sql, params = connection.statements[-1]
  127. values = _insert_values(sql, params)
  128. assert "INSERT INTO `content_agent_pattern_seed_packs`" in sql
  129. assert values["schema_version"] == "content_agent.v1"
  130. assert json.loads(values["itemset_ids"]) == [1608352]
  131. assert json.loads(values["pattern_seed_pack"])["schema_version"] == "runtime_record.v1"
  132. def test_database_runtime_appends_jsonl_with_raw_payload():
  133. connection = FakeConnection()
  134. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  135. store.append_jsonl(
  136. "run_001",
  137. "search_queries.jsonl",
  138. [
  139. {
  140. "record_schema_version": "runtime_record.v1",
  141. "run_id": "run_001",
  142. "policy_run_id": "policy_run_001",
  143. "search_query_id": "q_001",
  144. "search_query": "对比分析",
  145. "search_query_generation_method": "item_single",
  146. "pattern_seed_ref": {
  147. "source_field": "seed_terms",
  148. "source_index": 0,
  149. "seed_term": "对比分析",
  150. },
  151. "raw_payload": {"run_id": "run_001", "search_query_id": "q_001"},
  152. }
  153. ],
  154. )
  155. sql, params = connection.statements[-1]
  156. values = _insert_values(sql, params)
  157. assert "INSERT INTO `content_agent_queries`" in sql
  158. assert values["schema_version"] == "content_agent.v1"
  159. assert json.loads(values["pattern_seed_ref"])["seed_term"] == "对比分析"
  160. assert json.loads(values["raw_payload"])["search_query_id"] == "q_001"
  161. def test_database_runtime_upserts_content_media_records():
  162. connection = FakeConnection()
  163. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  164. store.append_jsonl(
  165. "run_001",
  166. "content_media_records.jsonl",
  167. [
  168. {
  169. "record_schema_version": "runtime_record.v1",
  170. "run_id": "run_001",
  171. "policy_run_id": "policy_run_001",
  172. "platform": "douyin",
  173. "platform_content_id": "content_001",
  174. "content_media_status": "oss_uploaded",
  175. "content_metadata_source": "douyin_keyword_search",
  176. "play_url": "https://source.example/video.mp4",
  177. "local_path": None,
  178. "oss_url": "https://res.example/video.mp4",
  179. "raw_payload": {
  180. "run_id": "run_001",
  181. "platform_content_id": "content_001",
  182. "oss_object_key": "crawler/video/content_001.mp4",
  183. },
  184. "created_at": "2026-06-15T00:00:00+00:00",
  185. }
  186. ],
  187. )
  188. sql, _ = connection.statements[-1]
  189. assert "INSERT INTO `content_agent_content_media_records`" in sql
  190. assert "ON DUPLICATE KEY UPDATE" in sql
  191. assert "`oss_url` = VALUES(`oss_url`)" in sql
  192. def test_database_runtime_upserts_failed_search_query_status():
  193. connection = FakeConnection()
  194. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  195. store.append_jsonl(
  196. "run_001",
  197. "search_queries.jsonl",
  198. [
  199. {
  200. "record_schema_version": "runtime_record.v1",
  201. "run_id": "run_001",
  202. "policy_run_id": "policy_run_001",
  203. "search_query_id": "q_001",
  204. "search_query": "接口失败",
  205. "search_query_generation_method": "item_single",
  206. "search_query_effect_status": "failed",
  207. "raw_payload": {
  208. "run_id": "run_001",
  209. "policy_run_id": "policy_run_001",
  210. "search_query_id": "q_001",
  211. "search_query_effect_status": "failed",
  212. "query_failure": {
  213. "status": "failed",
  214. "error_code": "PLATFORM_REQUEST_FAILED",
  215. },
  216. },
  217. }
  218. ],
  219. )
  220. sql, params = connection.statements[-1]
  221. values = _insert_values(sql, params)
  222. assert "INSERT INTO `content_agent_queries`" in sql
  223. assert "ON DUPLICATE KEY UPDATE" in sql
  224. assert values["search_query_effect_status"] == "failed"
  225. payload = json.loads(values["raw_payload"])
  226. assert payload["query_failure"]["error_code"] == "PLATFORM_REQUEST_FAILED"
  227. def test_database_runtime_preserves_llm_variant_payload_fields():
  228. connection = FakeConnection()
  229. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  230. store.append_jsonl(
  231. "run_001",
  232. "search_queries.jsonl",
  233. [
  234. {
  235. "record_schema_version": "runtime_record.v1",
  236. "run_id": "run_001",
  237. "policy_run_id": "policy_run_001",
  238. "search_query_id": "q_002",
  239. "search_query": "人物叙事素材",
  240. "search_query_generation_method": "llm_variant",
  241. "pattern_seed_ref": {
  242. "source_field": "seed_terms",
  243. "source_index": 0,
  244. "seed_term": "人物故事",
  245. },
  246. "llm_variant_of": "q_001",
  247. "raw_payload": {
  248. "run_id": "run_001",
  249. "policy_run_id": "policy_run_001",
  250. "search_query_id": "q_002",
  251. "search_query_generation_method": "llm_variant",
  252. "llm_variant_of": "q_001",
  253. "llm_input_evidence": {"seed_term": "人物故事"},
  254. "llm_prompt_version": "fake-query-prompt-v1",
  255. "llm_generation_model": "fake-query-model",
  256. },
  257. }
  258. ],
  259. )
  260. sql, params = connection.statements[-1]
  261. values = _insert_values(sql, params)
  262. assert "INSERT INTO `content_agent_queries`" in sql
  263. assert "llm_variant_of" not in values
  264. payload = json.loads(values["raw_payload"])
  265. assert payload["llm_variant_of"] == "q_001"
  266. assert payload["llm_input_evidence"]["seed_term"] == "人物故事"
  267. assert payload["llm_prompt_version"] == "fake-query-prompt-v1"
  268. assert payload["llm_generation_model"] == "fake-query-model"
  269. def test_database_runtime_upserts_pattern_recall_evidence():
  270. connection = FakeConnection()
  271. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  272. # V3 形状: Gemini 直读判定证据(recall_status=judged + evidence_summary 判定字段);
  273. # decode 时代 7 列已随 V2 数据归档删除,列过滤必须把混入的旧键挡在 DB 外。
  274. store.append_jsonl(
  275. "run_001",
  276. "pattern_recall_evidence.jsonl",
  277. [
  278. {
  279. "record_schema_version": "runtime_record.v1",
  280. "run_id": "run_001",
  281. "policy_run_id": "policy_run_001",
  282. "recall_evidence_id": "recall_001",
  283. "content_discovery_id": "content_001",
  284. "platform": "douyin",
  285. "platform_content_id": "7390000000000000000",
  286. "recall_status": "judged",
  287. "decode_status": "success",
  288. "matched_terms": ["爱国情感"],
  289. "evidence_summary": {
  290. "judge_status": "ok",
  291. "fit_senior_50plus": True,
  292. "relevance_score": 0.85,
  293. },
  294. "raw_payload": {
  295. "platform": "douyin",
  296. "judge_reason": "贴题且适合 50+",
  297. },
  298. }
  299. ],
  300. )
  301. sql, params = connection.statements[-1]
  302. values = _insert_values(sql, params)
  303. assert "INSERT INTO `content_agent_pattern_recall_evidence`" in sql
  304. assert "ON DUPLICATE KEY UPDATE" in sql
  305. assert values["schema_version"] == "content_agent.v1"
  306. assert values["recall_evidence_id"] == "recall_001"
  307. assert values["recall_status"] == "judged"
  308. assert "platform" not in values
  309. assert "decode_status" not in values
  310. assert "matched_terms" not in values
  311. assert json.loads(values["evidence_summary"])["fit_senior_50plus"] is True
  312. assert json.loads(values["raw_payload"])["platform"] == "douyin"
  313. def test_database_runtime_preserves_p5_rule_decision_fields():
  314. connection = FakeConnection()
  315. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  316. store.append_jsonl(
  317. "run_001",
  318. "rule_decisions.jsonl",
  319. [
  320. {
  321. "record_schema_version": "runtime_record.v1",
  322. "run_id": "run_001",
  323. "policy_run_id": "policy_run_001",
  324. "decision_id": "d_001",
  325. "policy_bundle_id": "policy_bundle_v1",
  326. "rule_pack_id": "douyin_content_discovery_rule_pack_v1",
  327. "rule_pack_version": "1.0.0",
  328. "strategy_version": "V1",
  329. "decision_target_type": "content",
  330. "decision_target_id": "content_001",
  331. "decision_action": "REJECT_CONTENT",
  332. "decision_reason_code": "pattern_recall_failed",
  333. "search_query_effect_status": "rule_blocked",
  334. "score": None,
  335. "triggered_blocking_rules": ["gate_pattern_recall_failed"],
  336. "scorecard": {"total_score": None, "score_missing": True},
  337. "decision_replay_data": {
  338. "policy_bundle_hash": "hash_001",
  339. "dispatch_id": "dispatch_content",
  340. "effect_mapping_id": "map_hard_gate_reject_rule_blocked",
  341. },
  342. "raw_payload": {
  343. "decision_id": "d_001",
  344. "search_query_effect_status": "rule_blocked",
  345. "decision_replay_data": {
  346. "policy_bundle_hash": "hash_001",
  347. "dispatch_id": "dispatch_content",
  348. },
  349. },
  350. }
  351. ],
  352. )
  353. sql, params = connection.statements[-1]
  354. values = _insert_values(sql, params)
  355. assert "INSERT INTO `content_agent_rule_decisions`" in sql
  356. assert values["search_query_effect_status"] == "rule_blocked"
  357. assert json.loads(values["triggered_blocking_rules"]) == ["gate_pattern_recall_failed"]
  358. assert json.loads(values["scorecard"])["score_missing"] is True
  359. assert json.loads(values["decision_replay_data"])["dispatch_id"] == "dispatch_content"
  360. assert json.loads(values["raw_payload"])["decision_replay_data"]["policy_bundle_hash"] == "hash_001"
  361. def test_database_runtime_preserves_v4_score_and_walk_json_contract():
  362. connection = FakeConnection()
  363. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  364. store.append_jsonl(
  365. "run_001",
  366. "rule_decisions.jsonl",
  367. [
  368. {
  369. "record_schema_version": "runtime_record.v1",
  370. "run_id": "run_001",
  371. "policy_run_id": "policy_run_001",
  372. "decision_id": "d_v4_001",
  373. "policy_bundle_id": "policy_bundle_v4",
  374. "rule_pack_id": "douyin_content_discovery_rule_pack_v4",
  375. "rule_pack_version": "4.0.0",
  376. "strategy_version": "V4",
  377. "decision_target_type": "content",
  378. "decision_target_id": "content_001",
  379. "decision_action": "ADD_TO_CONTENT_POOL",
  380. "decision_reason_code": "v4_query_and_platform_pass",
  381. "search_query_effect_status": "success",
  382. "score": 80,
  383. "scorecard": {
  384. "schema_version": "v4_scorecard.v1",
  385. "query_relevance_score": 82,
  386. "platform_performance_score": 78,
  387. "missing_observable_fields": ["view_count"],
  388. },
  389. "decision_replay_data": {
  390. "policy_bundle_hash": "hash_v4",
  391. "rule_pack_id": "douyin_content_discovery_rule_pack_v4",
  392. "rule_pack_version": "4.0.0",
  393. "dispatch_id": "dispatch_content_v4",
  394. "strategy_version": "V4",
  395. "allow_walk": True,
  396. "walk_gate_snapshot": {
  397. "query_relevance_score": 82,
  398. "platform_performance_score": 78,
  399. "score": 80,
  400. },
  401. },
  402. "raw_payload": {
  403. "decision_id": "d_v4_001",
  404. "v4_contract": {
  405. "query_relevance_score": 82,
  406. "platform_performance_score": 78,
  407. },
  408. },
  409. }
  410. ],
  411. )
  412. sql, params = connection.statements[-1]
  413. values = _insert_values(sql, params)
  414. assert "INSERT INTO `content_agent_rule_decisions`" in sql
  415. scorecard = json.loads(values["scorecard"])
  416. replay_data = json.loads(values["decision_replay_data"])
  417. assert scorecard["schema_version"] == "v4_scorecard.v1"
  418. assert scorecard["query_relevance_score"] == 82
  419. assert scorecard["platform_performance_score"] == 78
  420. assert scorecard["missing_observable_fields"] == ["view_count"]
  421. assert replay_data["allow_walk"] is True
  422. assert replay_data["walk_gate_snapshot"]["score"] == 80
  423. def test_database_runtime_preserves_p5_search_clue_aggregation_in_raw_payload():
  424. connection = FakeConnection()
  425. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  426. store.append_jsonl(
  427. "run_001",
  428. "search_clues.jsonl",
  429. [
  430. {
  431. "record_schema_version": "runtime_record.v1",
  432. "run_id": "run_001",
  433. "policy_run_id": "policy_run_001",
  434. "clue_id": "clue_001",
  435. "search_query_id": "q_001",
  436. "search_query": "爱国情感",
  437. "discovery_start_source": "pattern_seed",
  438. "previous_discovery_step": "search_query_generated",
  439. "result_count": 1,
  440. "pooled_content_count": 0,
  441. "review_content_count": 0,
  442. "pending_content_count": 0,
  443. "rejected_content_count": 1,
  444. "search_query_effect_status": "rule_blocked",
  445. "query_aggregation_id": "agg_query_rule_blocked",
  446. "walk_next_step": "stop_search_query",
  447. "raw_payload": {
  448. "clue_id": "clue_001",
  449. "query_aggregation_id": "agg_query_rule_blocked",
  450. "effect_status_counts": {"rule_blocked": 1},
  451. },
  452. }
  453. ],
  454. )
  455. sql, params = connection.statements[-1]
  456. values = _insert_values(sql, params)
  457. assert "INSERT INTO `content_agent_search_clues`" in sql
  458. assert "query_aggregation_id" not in values
  459. assert values["search_query_effect_status"] == "rule_blocked"
  460. assert json.loads(values["raw_payload"])["query_aggregation_id"] == "agg_query_rule_blocked"
  461. def test_database_runtime_writes_failed_search_clue_and_platform_query_failed_event():
  462. connection = FakeConnection()
  463. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  464. query_failure = {
  465. "search_query_id": "q_001",
  466. "search_query": "接口失败",
  467. "search_query_generation_method": "item_single",
  468. "status": "failed",
  469. "error_code": "PLATFORM_REQUEST_FAILED",
  470. "message": "platform query failed",
  471. }
  472. store.append_jsonl(
  473. "run_001",
  474. "search_clues.jsonl",
  475. [
  476. {
  477. "record_schema_version": "runtime_record.v1",
  478. "run_id": "run_001",
  479. "policy_run_id": "policy_run_001",
  480. "clue_id": "clue_001",
  481. "search_query_id": "q_001",
  482. "search_query": "接口失败",
  483. "discovery_start_source": "pattern_itemset",
  484. "previous_discovery_step": "pattern_search_query",
  485. "result_count": 0,
  486. "pooled_content_count": 0,
  487. "review_content_count": 0,
  488. "pending_content_count": 0,
  489. "rejected_content_count": 0,
  490. "search_query_effect_status": "failed",
  491. "query_aggregation_id": "platform_query_failure",
  492. "walk_next_step": "stop_search_query",
  493. "raw_payload": {
  494. "clue_id": "clue_001",
  495. "query_failure": query_failure,
  496. },
  497. }
  498. ],
  499. )
  500. store.append_jsonl(
  501. "run_001",
  502. "run_events.jsonl",
  503. [
  504. {
  505. "record_schema_version": "runtime_record.v1",
  506. "run_id": "run_001",
  507. "policy_run_id": "policy_run_001",
  508. "event_id": "evt_001",
  509. "event_type": "platform_query_failed",
  510. "status": "failed",
  511. "input_ref": "search_queries.jsonl:q_001",
  512. "output_ref": "search_clues.jsonl",
  513. "error_code": "PLATFORM_REQUEST_FAILED",
  514. "message": "platform query failed",
  515. "raw_payload": {
  516. "event_id": "evt_001",
  517. "query_failure": query_failure,
  518. },
  519. }
  520. ],
  521. )
  522. clue_sql, clue_params = connection.statements[-2]
  523. clue_values = _insert_values(clue_sql, clue_params)
  524. assert "INSERT INTO `content_agent_search_clues`" in clue_sql
  525. assert "ON DUPLICATE KEY UPDATE" in clue_sql
  526. assert clue_values["search_query_effect_status"] == "failed"
  527. assert clue_values["walk_next_step"] == "stop_search_query"
  528. assert json.loads(clue_values["raw_payload"])["query_failure"]["search_query_id"] == "q_001"
  529. event_sql, event_params = connection.statements[-1]
  530. event_values = _insert_values(event_sql, event_params)
  531. assert "INSERT INTO `content_agent_run_events`" in event_sql
  532. assert "ON DUPLICATE KEY UPDATE" in event_sql
  533. assert event_values["event_type"] == "platform_query_failed"
  534. assert event_values["status"] == "failed"
  535. assert event_values["error_code"] == "PLATFORM_REQUEST_FAILED"
  536. def test_database_runtime_writes_publish_jobs_db_only_records():
  537. connection = FakeConnection()
  538. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  539. store.write_publish_jobs(
  540. "run_001",
  541. "policy_run_001",
  542. [
  543. {
  544. "publish_job_id": "publish_job_001",
  545. "platform_content_id": "7390000000000000000",
  546. "job_status": "created",
  547. "trigger_mode": "manual_review",
  548. "request_payload": {
  549. "decision_id": "decision_001",
  550. "source_path_record_ids": ["path_001"],
  551. },
  552. "response_payload": {},
  553. }
  554. ],
  555. )
  556. sql, params = connection.statements[-1]
  557. values = _insert_values(sql, params)
  558. assert "INSERT INTO `content_agent_publish_jobs`" in sql
  559. assert "ON DUPLICATE KEY UPDATE" in sql
  560. assert values["schema_version"] == "content_agent.v1"
  561. assert values["run_id"] == "run_001"
  562. assert values["policy_run_id"] == "policy_run_001"
  563. assert values["publish_job_id"] == "publish_job_001"
  564. assert values["job_status"] == "created"
  565. assert values["trigger_mode"] == "manual_review"
  566. assert json.loads(values["request_payload"])["decision_id"] == "decision_001"
  567. def test_database_runtime_writes_author_assets():
  568. connection = FakeConnection()
  569. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  570. store.write_author_assets(
  571. [
  572. {
  573. "author_asset_id": "author_asset_001",
  574. "platform": "douyin",
  575. "platform_author_id": "author_001",
  576. "author_display_name": "作者一号",
  577. "asset_status": "active",
  578. "source_type": "runtime_author_work",
  579. "validation_status": "validated",
  580. "eligible_as_source": 1,
  581. "elderly_ratio": 0.72,
  582. "elderly_tgi": 138,
  583. "content_tags": ["人物故事"],
  584. "source_run_id": "run_001",
  585. "source_policy_run_id": "policy_run_001",
  586. "profile_snapshot": {"sample_count": 9},
  587. "evidence_refs": {"decision_ids": ["d_001"]},
  588. "raw_payload": {"author_asset_id": "author_asset_001"},
  589. }
  590. ]
  591. )
  592. sql, params = connection.statements[-1]
  593. values = _insert_values(sql, params)
  594. assert "INSERT INTO `content_agent_author_assets`" in sql
  595. assert "ON DUPLICATE KEY UPDATE" in sql
  596. assert values["schema_version"] == "content_agent.v1"
  597. assert values["author_asset_id"] == "author_asset_001"
  598. assert values["platform_author_id"] == "author_001"
  599. assert values["eligible_as_source"] == 1
  600. assert values["elderly_ratio"] == 0.72
  601. assert values["elderly_tgi"] == 138
  602. assert json.loads(values["content_tags"]) == ["人物故事"]
  603. assert json.loads(values["profile_snapshot"])["sample_count"] == 9
  604. assert json.loads(values["evidence_refs"])["decision_ids"] == ["d_001"]
  605. def test_database_runtime_writes_author_asset_roles():
  606. connection = FakeConnection()
  607. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  608. store.write_author_asset_roles(
  609. [
  610. {
  611. "author_asset_id": "author_asset_001",
  612. "role": "source_seed",
  613. "role_status": "active",
  614. "role_reason_code": "p7_author_asset_eligible",
  615. "assigned_by": "system",
  616. "source_run_id": "run_001",
  617. "raw_payload": {"role": "source_seed"},
  618. }
  619. ]
  620. )
  621. sql, params = connection.statements[-1]
  622. values = _insert_values(sql, params)
  623. assert "INSERT INTO `content_agent_author_asset_roles`" in sql
  624. assert "ON DUPLICATE KEY UPDATE" in sql
  625. assert values["schema_version"] == "content_agent.v1"
  626. assert values["author_asset_id"] == "author_asset_001"
  627. assert values["role"] == "source_seed"
  628. assert values["assigned_by"] == "system"
  629. assert json.loads(values["raw_payload"])["role"] == "source_seed"
  630. def test_database_runtime_writes_search_clue_assets():
  631. connection = FakeConnection()
  632. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  633. store.write_search_clue_assets(
  634. [
  635. {
  636. "search_clue_asset_id": "search_clue_asset_001",
  637. "platform": "douyin",
  638. "clue_type": "search_query",
  639. "normalized_clue_text": "银发旅行",
  640. "display_clue_text": "银发旅行",
  641. "promotion_status": "promoted",
  642. "reusable_priority": 1,
  643. "can_seed_next_run": 1,
  644. "first_seen_run_id": "run_001",
  645. "first_seen_policy_run_id": "policy_run_001",
  646. "summary_metrics": {"pooled_content_count": 1},
  647. "raw_payload": {"promotion_reason": "success_search_clue"},
  648. }
  649. ]
  650. )
  651. sql, params = connection.statements[-1]
  652. values = _insert_values(sql, params)
  653. assert "INSERT INTO `content_agent_search_clue_assets`" in sql
  654. assert "ON DUPLICATE KEY UPDATE" in sql
  655. assert values["schema_version"] == "content_agent.v1"
  656. assert values["search_clue_asset_id"] == "search_clue_asset_001"
  657. assert values["can_seed_next_run"] == 1
  658. assert json.loads(values["summary_metrics"])["pooled_content_count"] == 1
  659. def test_database_runtime_writes_search_clue_asset_evidence():
  660. connection = FakeConnection()
  661. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  662. store.write_search_clue_asset_evidence(
  663. [
  664. {
  665. "evidence_id": "search_clue_evidence_001",
  666. "search_clue_asset_id": "search_clue_asset_001",
  667. "run_id": "run_001",
  668. "policy_run_id": "policy_run_001",
  669. "clue_id": "clue_001",
  670. "search_query_id": "q_001",
  671. "pooled_content_count": 1,
  672. "source_path_record_ids": ["path_001"],
  673. "decision_ids": ["decision_001"],
  674. "performance_feedback_refs": [],
  675. "raw_payload": {"clue_id": "clue_001"},
  676. }
  677. ]
  678. )
  679. sql, params = connection.statements[-1]
  680. values = _insert_values(sql, params)
  681. assert "INSERT INTO `content_agent_search_clue_asset_evidence`" in sql
  682. assert "ON DUPLICATE KEY UPDATE" in sql
  683. assert values["schema_version"] == "content_agent.v1"
  684. assert values["run_id"] == "run_001"
  685. assert values["clue_id"] == "clue_001"
  686. assert json.loads(values["source_path_record_ids"]) == ["path_001"]
  687. assert json.loads(values["decision_ids"]) == ["decision_001"]
  688. def test_database_runtime_update_final_output_upserts_validation_status():
  689. connection = FakeConnection()
  690. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  691. store.update_json(
  692. "run_001",
  693. "final_output.json",
  694. {
  695. "schema_version": "runtime_record.v1",
  696. "run_id": "run_001",
  697. "policy_run_id": "policy_run_001",
  698. "summary": {"run_path_complete": True},
  699. "validation_status": "pass",
  700. },
  701. )
  702. sql, params = connection.statements[-1]
  703. values = _insert_values(sql, params)
  704. assert "INSERT INTO `content_agent_final_outputs`" in sql
  705. assert "ON DUPLICATE KEY UPDATE" in sql
  706. assert values["validation_status"] == "pass"
  707. assert json.loads(values["summary"])["run_path_complete"] is True
  708. def test_database_runtime_preserves_m5_explanation_payloads():
  709. connection = FakeConnection()
  710. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  711. explanation = {
  712. "schema_version": "v4_decision_explanation.v1",
  713. "query_relevance_score": 80,
  714. "platform_performance_score": 70,
  715. "score": 75,
  716. "allow_walk": True,
  717. "walk_gate_snapshot": {"score": 75},
  718. }
  719. v4_summary = {
  720. "schema_version": "v4_strategy_review_summary.v1",
  721. "score_buckets": {"pool": 1},
  722. "allow_walk_distribution": {"allowed": 1, "denied": 0, "missing": 0},
  723. "walk_gate_review": {"v4_gate_distribution": {"allowed": 1}},
  724. }
  725. store.update_json(
  726. "run_001",
  727. "final_output.json",
  728. {
  729. "schema_version": "runtime_record.v1",
  730. "run_id": "run_001",
  731. "policy_run_id": "policy_run_001",
  732. "summary": {},
  733. "validation_status": "pass",
  734. "decision_records": [
  735. {
  736. "decision_id": "decision_001",
  737. "v4_explanation": explanation,
  738. }
  739. ],
  740. },
  741. )
  742. store.update_json(
  743. "run_001",
  744. "strategy_review.json",
  745. {
  746. "schema_version": "runtime_record.v1",
  747. "run_id": "run_001",
  748. "policy_run_id": "policy_run_001",
  749. "review_id": "review_001",
  750. "review_status": "generated",
  751. "summary": {},
  752. "v4_summary": v4_summary,
  753. "effective_search_queries": [],
  754. "weak_search_queries": [],
  755. "top_reject_reasons": [],
  756. "productive_paths": [],
  757. "suggestions": [],
  758. "raw_payload": {"v4_summary": v4_summary},
  759. },
  760. )
  761. final_values = _insert_values(*connection.statements[-2])
  762. final_payload = json.loads(final_values["final_output"])
  763. assert final_payload["decision_records"][0]["v4_explanation"] == explanation
  764. review_values = _insert_values(*connection.statements[-1])
  765. review_payload = json.loads(review_values["raw_payload"])
  766. assert review_payload["v4_summary"] == v4_summary
  767. def test_database_runtime_reads_performance_feedback_payloads():
  768. connection = FakeConnection()
  769. connection.select_all_result = [
  770. {
  771. "raw_payload": json.dumps(
  772. {
  773. "run_id": "run_001",
  774. "policy_run_id": "policy_run_001",
  775. "feedback_id": "feedback_001",
  776. "completion_rate": 0.8,
  777. }
  778. )
  779. }
  780. ]
  781. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  782. rows = store.read_performance_feedback("run_001", "policy_run_001")
  783. sql, params = connection.statements[-1]
  784. assert "FROM `content_agent_performance_feedback`" in sql
  785. assert params == ["run_001", "policy_run_001"]
  786. assert rows[0]["feedback_id"] == "feedback_001"
  787. assert rows[0]["raw_payload"]["completion_rate"] == 0.8
  788. def test_database_runtime_read_jsonl_reconstructs_runtime_payload():
  789. connection = FakeConnection()
  790. connection.select_all_result = [
  791. {
  792. "raw_payload": json.dumps(
  793. {
  794. "record_schema_version": "runtime_record.v1",
  795. "run_id": "run_001",
  796. "policy_run_id": "policy_run_001",
  797. "search_query_id": "q_001",
  798. }
  799. )
  800. }
  801. ]
  802. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  803. rows = store.read_jsonl("run_001", "search_queries.jsonl")
  804. assert rows[0]["search_query_id"] == "q_001"
  805. assert rows[0]["raw_payload"]["search_query_id"] == "q_001"
  806. def test_database_runtime_rejects_forbidden_raw_payload_keys_in_lists():
  807. connection = FakeConnection()
  808. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  809. try:
  810. store.append_jsonl(
  811. "run_001",
  812. "search_queries.jsonl",
  813. [
  814. {
  815. "record_schema_version": "runtime_record.v1",
  816. "run_id": "run_001",
  817. "policy_run_id": "policy_run_001",
  818. "search_query_id": "q_001",
  819. "search_query": "对比分析",
  820. "raw_payload": {"items": [{"dsn": "should_not_be_stored"}]},
  821. }
  822. ],
  823. )
  824. except ValueError as exc:
  825. assert "forbidden key" in str(exc)
  826. else:
  827. raise AssertionError("expected forbidden raw_payload key to be rejected")
  828. def test_database_runtime_update_run_record_ignores_empty_sanitized_updates():
  829. connection = FakeConnection()
  830. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  831. store.update_run_record("run_001", {"unknown_field": "ignored", "status": None})
  832. assert connection.statements == []
  833. def test_database_runtime_update_run_record_persists_platform_failure_detail():
  834. connection = FakeConnection()
  835. store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
  836. store.update_run_record(
  837. "run_001",
  838. {
  839. "status": "failed",
  840. "error_code": "PLATFORM_REQUEST_FAILED",
  841. "error_detail": {
  842. "query_failures": [
  843. {
  844. "search_query_id": "q_001",
  845. "status": "failed",
  846. "error_code": "PLATFORM_REQUEST_FAILED",
  847. }
  848. ]
  849. },
  850. },
  851. )
  852. sql, params = connection.statements[-1]
  853. assert "UPDATE `content_agent_runs` SET" in sql
  854. assert "`error_detail` = %s" in sql
  855. assert json.loads(params[2])["query_failures"][0]["search_query_id"] == "q_001"
  856. assert params[-1] == "run_001"
  857. def test_business_modules_do_not_import_or_name_database_tables():
  858. root = Path("content_agent/business_modules")
  859. text = "\n".join(path.read_text(encoding="utf-8") for path in root.rglob("*.py"))
  860. assert not re.search(
  861. r"pymysql|sqlalchemy|psycopg|sqlite3|SELECT |INSERT |UPDATE |DELETE |SHOW |CREATE |ALTER |content_agent_",
  862. text,
  863. )
  864. def _config():
  865. return ContentSupplyDbConfig(
  866. host="127.0.0.1",
  867. port=3306,
  868. user="content_rw",
  869. password="dummy_password",
  870. database="content-deconstruction-supply",
  871. )
  872. def _insert_values(sql, params):
  873. match = re.search(r"\((.*?)\) VALUES", sql)
  874. assert match, sql
  875. columns = [part.strip().strip("`") for part in match.group(1).split(",")]
  876. return dict(zip(columns, params))