test_runtime_files.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548
  1. import json
  2. from pathlib import Path
  3. from content_agent.errors import ErrorCode
  4. from content_agent.integrations.runtime_files import LocalRuntimeFileStore
  5. from content_agent.run_service import RunService
  6. from content_agent.schemas import RunStartRequest
  7. from tests.p1_helpers import FakeQueryVariantClient, REAL_SOURCE_FIXTURE
  8. def _start_mock_run(tmp_path, **kwargs):
  9. service = RunService(
  10. runtime_root=tmp_path / "runtime" / "v1",
  11. query_variant_client=FakeQueryVariantClient(),
  12. )
  13. kwargs.setdefault("source", str(REAL_SOURCE_FIXTURE))
  14. state = service.start_run(RunStartRequest(platform_mode="mock", **kwargs))
  15. assert state["status"] == "success"
  16. return service, state["run_id"]
  17. def test_runtime_files_are_parseable_and_consistent(tmp_path):
  18. service, run_id = _start_mock_run(tmp_path)
  19. run_dir = service.runtime.run_dir(run_id)
  20. for json_file in [
  21. "source_context.json",
  22. "pattern_seed_pack.json",
  23. "final_output.json",
  24. "strategy_review.json",
  25. ]:
  26. data = json.loads((run_dir / json_file).read_text(encoding="utf-8"))
  27. assert data["schema_version"] == "runtime_record.v1"
  28. for jsonl_file in [
  29. "search_queries.jsonl",
  30. "discovered_content_items.jsonl",
  31. "content_media_records.jsonl",
  32. "pattern_recall_evidence.jsonl",
  33. "rule_decisions.jsonl",
  34. "walk_actions.jsonl",
  35. "run_events.jsonl",
  36. "source_path_records.jsonl",
  37. "search_clues.jsonl",
  38. ]:
  39. for line in (run_dir / jsonl_file).read_text(encoding="utf-8").splitlines():
  40. json.loads(line)
  41. items = service.read_jsonl(run_id, "discovered_content_items.jsonl")
  42. decisions = service.read_jsonl(run_id, "rule_decisions.jsonl")
  43. paths = service.read_jsonl(run_id, "source_path_records.jsonl")
  44. final_output = service.read_json(run_id, "final_output.json")
  45. source_context = service.read_json(run_id, "source_context.json")
  46. policy_run_id = service.read_json(run_id, "pattern_seed_pack.json")["policy_run_id"]
  47. content_ids = {item["platform_content_id"] for item in items}
  48. decision_target_ids = {decision["decision_target_id"] for decision in decisions}
  49. assert content_ids == decision_target_ids
  50. media_records = service.read_jsonl(run_id, "content_media_records.jsonl")
  51. recall_evidence = service.read_jsonl(run_id, "pattern_recall_evidence.jsonl")
  52. walk_actions = service.read_jsonl(run_id, "walk_actions.jsonl")
  53. for row in [*items, *media_records, *recall_evidence, *decisions, *walk_actions, *paths]:
  54. assert row["policy_run_id"] == policy_run_id
  55. assert row["record_schema_version"] == "runtime_record.v1"
  56. assert row["raw_payload"]["run_id"] == run_id
  57. assert {row["walk_status"] for row in walk_actions} <= {
  58. "success",
  59. "pending",
  60. "failed",
  61. "skipped",
  62. "rule_blocked",
  63. }
  64. assert all(row["walk_action_id"].startswith("wa_") for row in walk_actions)
  65. for media_record in media_records:
  66. assert media_record["platform"] == "douyin"
  67. assert final_output["policy_run_id"] == policy_run_id
  68. assert final_output["policy"]["policy_bundle_id"] == "douyin_policy_bundle_v1"
  69. assert final_output["policy"]["rule_pack_source_ref"]["file"].endswith(
  70. "douyin_rule_packs.v1.json"
  71. )
  72. assert final_output["walk_strategy"]["walk_strategy_version"] == "V1.0"
  73. assert "policy_run_id" not in source_context
  74. assert {decision["decision_action"] for decision in decisions} <= {
  75. "ADD_TO_CONTENT_POOL",
  76. "KEEP_CONTENT_FOR_REVIEW",
  77. "REJECT_CONTENT",
  78. }
  79. assert {decision["search_query_effect_status"] for decision in decisions} <= {
  80. "success",
  81. "pending",
  82. "failed",
  83. "rule_blocked",
  84. }
  85. search_clues = service.read_jsonl(run_id, "search_clues.jsonl")
  86. assert {clue["search_query_effect_status"] for clue in search_clues} <= {
  87. "success",
  88. "pending",
  89. "failed",
  90. "rule_blocked",
  91. }
  92. path_ids = {path["source_path_record_id"] for path in paths}
  93. for asset in final_output["content_assets"]:
  94. assert set(asset["source_path_record_ids"]) <= path_ids
  95. evidence_pack = source_context["ext_data"]["evidence_pack"]
  96. assert evidence_pack["pattern_source_system"] == "pg_pattern_v2"
  97. assert evidence_pack["pattern_execution_id"] == 581
  98. assert evidence_pack["mining_config_id"] == 2082
  99. assert evidence_pack["itemset_ids"] == [1608352]
  100. validation = service.validate_run(run_id)
  101. assert validation["status"] == "pass"
  102. def test_all_platform_query_failure_writes_failed_query_runtime_records(tmp_path):
  103. service = RunService(
  104. runtime_root=tmp_path / "runtime" / "v1",
  105. query_variant_client=FakeQueryVariantClient(),
  106. )
  107. service._platform_client = lambda platform, platform_mode: _AllFailurePlatformClient()
  108. state = service.start_run(
  109. RunStartRequest(platform_mode="real", source=str(REAL_SOURCE_FIXTURE))
  110. )
  111. assert state["status"] == "failed"
  112. assert state["error_code"] == ErrorCode.PLATFORM_REQUEST_FAILED.value
  113. search_queries = service.read_jsonl(state["run_id"], "search_queries.jsonl")
  114. search_clues = service.read_jsonl(state["run_id"], "search_clues.jsonl")
  115. run_events = service.read_jsonl(state["run_id"], "run_events.jsonl")
  116. assert len(search_queries) == len(state["error_detail"]["query_failures"])
  117. assert {query["search_query_effect_status"] for query in search_queries} == {"failed"}
  118. assert all(query["raw_payload"]["query_failure"]["status"] == "failed" for query in search_queries)
  119. assert {clue["search_query_effect_status"] for clue in search_clues} == {"failed"}
  120. assert {clue["result_count"] for clue in search_clues} == {0}
  121. assert {clue["query_aggregation_id"] for clue in search_clues} == {"platform_query_failure"}
  122. platform_failures = [
  123. event for event in run_events if event["event_type"] == "platform_query_failed"
  124. ]
  125. assert len(platform_failures) == len(search_queries)
  126. assert {event["error_code"] for event in platform_failures} == {
  127. ErrorCode.PLATFORM_REQUEST_FAILED.value
  128. }
  129. def test_runtime_validation_catches_summary_drift(tmp_path):
  130. service, run_id = _start_mock_run(tmp_path)
  131. final_output_path = service.runtime.run_dir(run_id) / "final_output.json"
  132. final_output = json.loads(final_output_path.read_text(encoding="utf-8"))
  133. final_output["summary"]["pooled_content_count"] = 99
  134. final_output_path.write_text(
  135. json.dumps(final_output, ensure_ascii=False, indent=2) + "\n",
  136. encoding="utf-8",
  137. )
  138. validation = service.validate_run(run_id)
  139. assert validation["status"] == "fail"
  140. assert any(finding["check_id"] == "summary_mismatch" for finding in validation["findings"])
  141. def test_local_runtime_replaces_pattern_recall_evidence_by_id(tmp_path):
  142. runtime = LocalRuntimeFileStore(tmp_path / "runtime")
  143. runtime.prepare_run("run_001")
  144. runtime.append_jsonl(
  145. "run_001",
  146. "pattern_recall_evidence.jsonl",
  147. [
  148. {
  149. "run_id": "run_001",
  150. "policy_run_id": "policy_001",
  151. "recall_evidence_id": "recall_001",
  152. "recall_status": "pending",
  153. }
  154. ],
  155. )
  156. runtime.append_jsonl(
  157. "run_001",
  158. "pattern_recall_evidence.jsonl",
  159. [
  160. {
  161. "run_id": "run_001",
  162. "policy_run_id": "policy_001",
  163. "recall_evidence_id": "recall_001",
  164. "recall_status": "matched",
  165. }
  166. ],
  167. )
  168. rows = runtime.read_jsonl("run_001", "pattern_recall_evidence.jsonl")
  169. assert len(rows) == 1
  170. assert rows[0]["recall_status"] == "matched"
  171. def test_runtime_validation_catches_missing_policy_run_id(tmp_path):
  172. service, run_id = _start_mock_run(tmp_path)
  173. decisions_path = service.runtime.run_dir(run_id) / "rule_decisions.jsonl"
  174. decisions = [
  175. json.loads(line)
  176. for line in decisions_path.read_text(encoding="utf-8").splitlines()
  177. if line.strip()
  178. ]
  179. decisions[0].pop("policy_run_id")
  180. decisions_path.write_text(
  181. "".join(
  182. json.dumps(decision, ensure_ascii=False, separators=(",", ":")) + "\n"
  183. for decision in decisions
  184. ),
  185. encoding="utf-8",
  186. )
  187. validation = service.validate_run(run_id)
  188. assert validation["status"] == "fail"
  189. assert any(finding["check_id"] == "policy_run_id_mismatch" for finding in validation["findings"])
  190. def test_runtime_validation_catches_missing_record_schema_version(tmp_path):
  191. service, run_id = _start_mock_run(tmp_path)
  192. queries_path = service.runtime.run_dir(run_id) / "search_queries.jsonl"
  193. queries = [
  194. json.loads(line)
  195. for line in queries_path.read_text(encoding="utf-8").splitlines()
  196. if line.strip()
  197. ]
  198. queries[0].pop("record_schema_version")
  199. queries_path.write_text(
  200. "".join(
  201. json.dumps(query, ensure_ascii=False, separators=(",", ":")) + "\n"
  202. for query in queries
  203. ),
  204. encoding="utf-8",
  205. )
  206. validation = service.validate_run(run_id)
  207. assert validation["status"] == "fail"
  208. assert any(
  209. finding["check_id"] == "record_schema_version_missing"
  210. for finding in validation["findings"]
  211. )
  212. class _AllFailurePlatformClient:
  213. def search(self, search_query: dict) -> list[dict]:
  214. raise RuntimeError("platform unavailable")
  215. def test_runtime_validation_catches_missing_raw_payload(tmp_path):
  216. service, run_id = _start_mock_run(tmp_path)
  217. media_path = service.runtime.run_dir(run_id) / "content_media_records.jsonl"
  218. media_records = [
  219. json.loads(line)
  220. for line in media_path.read_text(encoding="utf-8").splitlines()
  221. if line.strip()
  222. ]
  223. media_records[0].pop("raw_payload")
  224. media_path.write_text(
  225. "".join(
  226. json.dumps(media_record, ensure_ascii=False, separators=(",", ":")) + "\n"
  227. for media_record in media_records
  228. ),
  229. encoding="utf-8",
  230. )
  231. validation = service.validate_run(run_id)
  232. assert validation["status"] == "fail"
  233. assert any(finding["check_id"] == "raw_payload_missing" for finding in validation["findings"])
  234. def test_runtime_validation_catches_forbidden_raw_payload_key(tmp_path):
  235. service, run_id = _start_mock_run(tmp_path)
  236. media_path = service.runtime.run_dir(run_id) / "content_media_records.jsonl"
  237. media_records = [
  238. json.loads(line)
  239. for line in media_path.read_text(encoding="utf-8").splitlines()
  240. if line.strip()
  241. ]
  242. media_records[0]["raw_payload"]["secret"] = "should_not_be_stored"
  243. media_path.write_text(
  244. "".join(
  245. json.dumps(media_record, ensure_ascii=False, separators=(",", ":")) + "\n"
  246. for media_record in media_records
  247. ),
  248. encoding="utf-8",
  249. )
  250. validation = service.validate_run(run_id)
  251. assert validation["status"] == "fail"
  252. assert any(
  253. finding["check_id"] == "raw_payload_forbidden_key"
  254. for finding in validation["findings"]
  255. )
  256. def test_runtime_validation_catches_missing_pattern_recall_evidence(tmp_path):
  257. service, run_id = _start_mock_run(tmp_path)
  258. items_path = service.runtime.run_dir(run_id) / "discovered_content_items.jsonl"
  259. items = [
  260. json.loads(line)
  261. for line in items_path.read_text(encoding="utf-8").splitlines()
  262. if line.strip()
  263. ]
  264. items[0]["pattern_match_result"].pop("pattern_recall_evidence_id", None)
  265. items_path.write_text(
  266. "".join(
  267. json.dumps(item, ensure_ascii=False, separators=(",", ":")) + "\n"
  268. for item in items
  269. ),
  270. encoding="utf-8",
  271. )
  272. validation = service.validate_run(run_id)
  273. assert validation["status"] == "fail"
  274. assert any(
  275. finding["check_id"] == "pattern_recall_evidence_missing"
  276. for finding in validation["findings"]
  277. )
  278. def test_runtime_validation_allows_missing_decode_case_ids_in_source_evidence(tmp_path):
  279. service, run_id = _start_mock_run(tmp_path)
  280. run_dir = service.runtime.run_dir(run_id)
  281. decisions_path = run_dir / "rule_decisions.jsonl"
  282. decisions = [
  283. json.loads(line)
  284. for line in decisions_path.read_text(encoding="utf-8").splitlines()
  285. if line.strip()
  286. ]
  287. for decision in decisions:
  288. decision["source_evidence"].pop("decode_case_ids", None)
  289. decisions_path.write_text(
  290. "".join(
  291. json.dumps(decision, ensure_ascii=False, separators=(",", ":")) + "\n"
  292. for decision in decisions
  293. ),
  294. encoding="utf-8",
  295. )
  296. final_output_path = run_dir / "final_output.json"
  297. final_output = json.loads(final_output_path.read_text(encoding="utf-8"))
  298. for section in ["content_assets", "reject_records", "decision_records"]:
  299. for row in final_output.get(section, []):
  300. row.get("source_evidence", {}).pop("decode_case_ids", None)
  301. final_output_path.write_text(
  302. json.dumps(final_output, ensure_ascii=False, indent=2) + "\n",
  303. encoding="utf-8",
  304. )
  305. validation = service.validate_run(run_id)
  306. assert validation["status"] == "pass"
  307. def test_runtime_validation_catches_missing_final_decision_record(tmp_path):
  308. service, run_id = _start_mock_run(tmp_path)
  309. final_output_path = service.runtime.run_dir(run_id) / "final_output.json"
  310. final_output = json.loads(final_output_path.read_text(encoding="utf-8"))
  311. final_output["decision_records"] = final_output["decision_records"][:-1]
  312. final_output_path.write_text(
  313. json.dumps(final_output, ensure_ascii=False, indent=2) + "\n",
  314. encoding="utf-8",
  315. )
  316. validation = service.validate_run(run_id)
  317. assert validation["status"] == "fail"
  318. assert any(finding["check_id"] == "final_decision_missing" for finding in validation["findings"])
  319. def test_runtime_validation_catches_platform_content_id_source_pollution(tmp_path):
  320. service, run_id = _start_mock_run(tmp_path)
  321. final_output_path = service.runtime.run_dir(run_id) / "final_output.json"
  322. final_output = json.loads(final_output_path.read_text(encoding="utf-8"))
  323. source_evidence = final_output["decision_records"][0]["source_evidence"]
  324. source_evidence["source_post_id"] = source_evidence["discovered_platform_content_id"]
  325. final_output_path.write_text(
  326. json.dumps(final_output, ensure_ascii=False, indent=2) + "\n",
  327. encoding="utf-8",
  328. )
  329. validation = service.validate_run(run_id)
  330. assert validation["status"] == "fail"
  331. assert any(
  332. finding["check_id"] == "source_evidence_content_pollution"
  333. for finding in validation["findings"]
  334. )
  335. def test_runtime_validation_catches_reject_source_path_break(tmp_path):
  336. service, run_id = _start_mock_run(tmp_path)
  337. paths_path = service.runtime.run_dir(run_id) / "source_path_records.jsonl"
  338. paths = [
  339. json.loads(line)
  340. for line in paths_path.read_text(encoding="utf-8").splitlines()
  341. if line.strip()
  342. ]
  343. paths = [
  344. path
  345. for path in paths
  346. if not (
  347. path.get("source_path_type") == "search_query_to_content"
  348. and path.get("to_node_id") == "7390000000000000099"
  349. )
  350. ]
  351. paths_path.write_text(
  352. "".join(json.dumps(path, ensure_ascii=False, separators=(",", ":")) + "\n" for path in paths),
  353. encoding="utf-8",
  354. )
  355. validation = service.validate_run(run_id)
  356. assert validation["status"] == "fail"
  357. assert any(finding["check_id"] == "source_path_broken" for finding in validation["findings"])
  358. def test_real_source_fixture_keeps_upstream_evidence_pack(tmp_path):
  359. source_path = Path("tests/fixtures/real_case_source/source_context.json")
  360. service, run_id = _start_mock_run(tmp_path, source=str(source_path))
  361. source_context = service.read_json(run_id, "source_context.json")
  362. evidence_pack = source_context["ext_data"]["evidence_pack"]
  363. decisions = service.read_jsonl(run_id, "rule_decisions.jsonl")
  364. assert evidence_pack["pattern_source_system"] == "pg_pattern_v2"
  365. assert evidence_pack["source_certainty"] == "db_validated"
  366. assert evidence_pack["validation_status"] == "passed"
  367. assert evidence_pack["source_post_id"] == "51978710"
  368. assert evidence_pack["pattern_execution_id"] == 581
  369. assert evidence_pack["mining_config_id"] == 2082
  370. assert evidence_pack["itemset_ids"] == [1608352]
  371. assert evidence_pack["upstream_run_id"] == "f405f129-3341-4f4a-98e6-fd3f73632adb"
  372. assert evidence_pack["support"] == 0.0045734552921411235
  373. assert evidence_pack["absolute_support"] == 49
  374. assert evidence_pack["decode_case_ids"] == []
  375. assert decisions[0]["source_evidence"]["source_certainty"] == "db_validated"
  376. assert (
  377. decisions[0]["source_evidence"]["discovered_platform_content_id"]
  378. not in evidence_pack["matched_post_ids"]
  379. )
  380. assert service.validate_run(run_id)["status"] == "pass"
  381. def test_demand_content_json_array_source_is_adapted_to_source_context(tmp_path):
  382. source_path = tmp_path / "demand_content.json"
  383. source_path.write_text(
  384. json.dumps(
  385. [
  386. {
  387. "id": 123,
  388. "merge_leve2": "PG Pattern V2 需求测试",
  389. "name": "爱国情感,人物故事",
  390. "suggestion": None,
  391. "score": 1.0,
  392. "dt": "20260604",
  393. "ext_data": {
  394. "evidence_pack": {
  395. "source_kind": "pattern_itemset",
  396. "pattern_source_system": "pg_pattern_v2",
  397. "case_id_type": "post_id",
  398. "source_post_id": "51978710",
  399. "pattern_execution_id": 581,
  400. "mining_config_id": 2082,
  401. "itemset_ids": [1608352],
  402. "itemset_items": [{"itemset_id": 1608352}],
  403. "category_bindings": [{"category_id": 76006}],
  404. "element_bindings": [{"category_id": 76006}],
  405. "support": 0.0045734552921411235,
  406. "absolute_support": 49,
  407. "matched_post_ids": ["51978710"],
  408. "video_ids": ["51978710"],
  409. "case_ids": ["51978710"],
  410. "decode_case_ids": [],
  411. "seed_terms": ["爱国情感", "人物故事"],
  412. "source_certainty": "db_validated",
  413. "validation_status": "passed",
  414. }
  415. },
  416. }
  417. ],
  418. ensure_ascii=False,
  419. ),
  420. encoding="utf-8",
  421. )
  422. service, run_id = _start_mock_run(tmp_path, source=str(source_path))
  423. source_context = service.read_json(run_id, "source_context.json")
  424. evidence_pack = source_context["ext_data"]["evidence_pack"]
  425. assert source_context["demand_content_id"] == "123"
  426. assert evidence_pack["pattern_source_system"] == "pg_pattern_v2"
  427. assert evidence_pack["pattern_execution_id"] == 581
  428. assert evidence_pack["mining_config_id"] == 2082
  429. assert evidence_pack["itemset_ids"] == [1608352]
  430. assert service.validate_run(run_id)["status"] == "pass"
  431. def test_old_mysql_source_system_is_rejected(tmp_path):
  432. source_path = tmp_path / "source_context.json"
  433. source_path.write_text(
  434. json.dumps(
  435. {
  436. "run_id": "old_run",
  437. "demand_content_id": "old",
  438. "merge_leve2": "历史样例",
  439. "name": "旧 MySQL 样例",
  440. "ext_data": {
  441. "evidence_pack": {
  442. "source_kind": "pattern_itemset",
  443. "pattern_source_system": "mysql_topic_pattern",
  444. "case_id_type": "post_id",
  445. "source_post_id": "51978710",
  446. "pattern_execution_id": 581,
  447. "mining_config_id": 2082,
  448. "itemset_ids": [1608352],
  449. "itemset_items": [{"itemset_id": 1608352}],
  450. "category_bindings": [{"category_id": 76006}],
  451. "support": 0.0045734552921411235,
  452. "absolute_support": 49,
  453. "matched_post_ids": ["51978710"],
  454. "video_ids": ["51978710"],
  455. "case_ids": ["51978710"],
  456. "decode_case_ids": [],
  457. "seed_terms": ["爱国情感", "人物故事"],
  458. "source_certainty": "db_validated",
  459. "validation_status": "passed",
  460. }
  461. },
  462. },
  463. ensure_ascii=False,
  464. ),
  465. encoding="utf-8",
  466. )
  467. service = RunService(runtime_root=tmp_path / "runtime" / "v1")
  468. state = service.start_run(RunStartRequest(platform_mode="mock", source=str(source_path)))
  469. assert state["status"] == "failed"
  470. assert state["error_code"] == ErrorCode.INVALID_SOURCE.value
  471. assert state["errors"] == ["invalid source"]