test_concurrency_consistency.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. """V3-M5: 并发判定的确定性验收——并发结果必须与串行逐条一致。
  2. 固定 run_id(R9)+ 确定性 fake 下,max_workers=1(串行)与 4(并发)产出的核心产物
  3. 剔除时间戳键后逐条相等;Jittered fake 强制乱序完成以暴露 offset 错位;
  4. 配额截断按 offset 预判(与完成序无关);analyze 意外异常经 _safe_analyze 兜底不炸 run。
  5. """
  6. from __future__ import annotations
  7. from typing import Any
  8. from content_agent.business_modules.content_discovery import pattern_recall
  9. from content_agent.business_modules.content_discovery.pattern_recall import recall_decision
  10. from content_agent.integrations.gemini_quota import QuotaCappedGeminiVideoClient
  11. from content_agent.integrations.runtime_files import LocalRuntimeFileStore
  12. from content_agent.run_service import RunService
  13. from tests.gemini_helpers import (
  14. FakeGeminiVideoClient,
  15. JitteredFakeGeminiVideoClient,
  16. fake_gemini_pool,
  17. )
  18. from tests.replay_harness import replay_case
  19. _CORE_FILES = [
  20. "discovered_content_items.jsonl",
  21. "pattern_recall_evidence.jsonl",
  22. "rule_decisions.jsonl",
  23. "walk_actions.jsonl",
  24. "source_path_records.jsonl",
  25. ]
  26. _TIMESTAMP_KEYS = {"created_at", "updated_at", "started_at", "ended_at", "duration_ms"}
  27. def _scrub(value: Any) -> Any:
  28. """递归剔除时间戳键(两次 run 的 datetime.now 必然不同,不属确定性合同)。"""
  29. if isinstance(value, dict):
  30. return {k: _scrub(v) for k, v in value.items() if k not in _TIMESTAMP_KEYS}
  31. if isinstance(value, list):
  32. return [_scrub(item) for item in value]
  33. return value
  34. def test_serial_vs_concurrent_replay_identical(tmp_path, monkeypatch):
  35. run_id = "v1_run_m5fixed00001"
  36. artifacts = {}
  37. for label, workers in [("serial", 1), ("concurrent", 4)]:
  38. monkeypatch.setattr(recall_decision, "_resolve_max_workers", lambda workers=workers: workers)
  39. artifacts[label] = replay_case(
  40. "real_id45",
  41. runtime_root=tmp_path / label,
  42. gemini_video_client=FakeGeminiVideoClient(),
  43. run_id=run_id,
  44. )
  45. assert artifacts["serial"].run_id == artifacts["concurrent"].run_id == run_id
  46. for filename in _CORE_FILES:
  47. serial = _scrub(artifacts["serial"].files[filename])
  48. concurrent = _scrub(artifacts["concurrent"].files[filename])
  49. assert serial == concurrent, f"{filename} diverged between serial and concurrent"
  50. def _synthetic_recall_inputs(count: int) -> tuple[list, list, list]:
  51. items = [{"platform_content_id": f"content_{i:03d}", "platform": "douyin"} for i in range(count)]
  52. media = [{"platform_content_id": f"content_{i:03d}"} for i in range(count)]
  53. bundles = [{"content": {"platform_content_id": f"content_{i:03d}"}} for i in range(count)]
  54. return items, media, bundles
  55. def test_jittered_completion_preserves_offset_order(tmp_path):
  56. # 每条内容的预置结果不同;乱序完成后若 offset 错位,判定会张冠李戴。
  57. items, media, bundles = _synthetic_recall_inputs(8)
  58. results = {
  59. item["platform_content_id"]: {**fake_gemini_pool(), "relevance_score": round(0.1 * (i + 1), 2)}
  60. for i, item in enumerate(items)
  61. }
  62. runtime = LocalRuntimeFileStore(tmp_path / "rt")
  63. runtime.prepare_run("run_001")
  64. recalled = pattern_recall.run(
  65. "run_001", "policy_run_001", items, media, bundles, {}, runtime,
  66. JitteredFakeGeminiVideoClient(result_by_content_id=results),
  67. )
  68. for i, updated in enumerate(recalled["discovered_content_items"]):
  69. expected = results[updated["platform_content_id"]]["relevance_score"]
  70. assert updated["pattern_match_result"]["relevance_score"] == expected
  71. assert updated["pattern_match_result"]["pattern_recall_evidence_id"] == f"recall_{i + 1:03d}"
  72. def test_quota_cap_deterministic_truncation(tmp_path, monkeypatch):
  73. items, media, bundles = _synthetic_recall_inputs(5)
  74. statuses = {}
  75. for label, workers in [("serial", 1), ("concurrent", 4)]:
  76. monkeypatch.setattr(recall_decision, "_resolve_max_workers", lambda workers=workers: workers)
  77. runtime = LocalRuntimeFileStore(tmp_path / label)
  78. runtime.prepare_run("run_001")
  79. client = QuotaCappedGeminiVideoClient(FakeGeminiVideoClient(), cap=2)
  80. recalled = pattern_recall.run(
  81. "run_001", "policy_run_001", items, media, bundles, {}, runtime, client,
  82. )
  83. statuses[label] = [
  84. (row["pattern_match_result"]["judge_status"], row["pattern_match_result"]["reason"])
  85. for row in recalled["discovered_content_items"]
  86. ]
  87. assert client.used == 2
  88. # 截断边界按 offset 预判:前 2 条真判、后 3 条配额拒,串/并行完全一致。
  89. assert statuses["serial"] == statuses["concurrent"]
  90. assert [status for status, _ in statuses["serial"]] == ["ok", "ok", "failed", "failed", "failed"]
  91. assert all(reason == "gemini_quota_exhausted" for _, reason in statuses["serial"][2:])
  92. def test_quota_exhaustion_is_observable(tmp_path, monkeypatch):
  93. import content_agent.run_service as run_service_module
  94. monkeypatch.setattr(run_service_module, "_gemini_calls_cap", lambda: 2)
  95. artifacts = replay_case(
  96. "sph_caihong",
  97. runtime_root=tmp_path / "rt",
  98. gemini_video_client=FakeGeminiVideoClient(),
  99. )
  100. assert artifacts.state["status"] == "success"
  101. quota_rows = [
  102. row for row in artifacts.files["pattern_recall_evidence.jsonl"]
  103. if row["evidence_summary"]["reason"] == "gemini_quota_exhausted"
  104. ]
  105. assert quota_rows
  106. assert all(row["evidence_summary"]["judge_status"] == "failed" for row in quota_rows)
  107. quota_events = [
  108. row for row in artifacts.files["run_events.jsonl"]
  109. if row["event_type"] == "gemini_quota_exhausted"
  110. ]
  111. assert len(quota_events) == 1
  112. assert quota_events[0]["raw_payload"]["cap"] == 2
  113. assert quota_events[0]["raw_payload"]["used"] == 2
  114. def test_analyze_exception_does_not_break_run(tmp_path):
  115. class RaisingGeminiVideoClient(FakeGeminiVideoClient):
  116. def analyze(self, content, media, source_context):
  117. raise RuntimeError("boom")
  118. artifacts = replay_case(
  119. "real_id45",
  120. runtime_root=tmp_path / "rt",
  121. gemini_video_client=RaisingGeminiVideoClient(),
  122. )
  123. # _safe_analyze 兜底:意外异常转 failed 判定,run 本身不崩(与串行 analyze 自吞语义一致)。
  124. assert artifacts.state["status"] == "success"
  125. assert artifacts.files["pattern_recall_evidence.jsonl"]
  126. assert all(
  127. row["evidence_summary"]["judge_status"] == "failed"
  128. and row["evidence_summary"]["reason"].startswith("analyze_raised")
  129. for row in artifacts.files["pattern_recall_evidence.jsonl"]
  130. )