Просмотр исходного кода

test(v3-m5d): 并发一致性验收 + R9 固定 run_id 注入

- 新建 test_concurrency_consistency.py 5 例:①串行(workers=1)vs 并发(4)固定 run_id
  下 5 类核心 jsonl 递归剔时戳键后逐条相等(含 wa_id/decision_id/recall_evidence_id);
  ②JitteredFake(sha1(content_id) 定 0-9ms sleep)强制乱序完成,验 offset 归位无错位;
  ③cap=2 确定性截断(前2真判/后3 quota,串并行边界同);④配额命中 evidence+run_event
  双通道可观测;⑤analyze 抛错经 _safe_analyze 兜底 run 仍 success
- R9 兑现:RunStartRequest 加可选 run_id / start_run request.run_id or 随机 / replay_case 透传
- FakeGeminiVideoClient.calls 加 threading.Lock(并发下消 flaky)+ 新增 Jittered 变体
- 现有测试零改动(并发不改结果);基线 321→326 passed,一致性测试 3 连跑稳定

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Sam Lee 2 дней назад
Родитель
Сommit
8fc18b1cf3
4 измененных файлов с 180 добавлено и 4 удалено
  1. 4 0
      content_agent/schemas.py
  2. 23 3
      tests/gemini_helpers.py
  3. 2 1
      tests/replay_harness.py
  4. 151 0
      tests/test_concurrency_consistency.py

+ 4 - 0
content_agent/schemas.py

@@ -10,6 +10,10 @@ from content_agent.constants import DEFAULT_STRATEGY_VERSION
 class RunStartRequest(BaseModel):
     model_config = ConfigDict(extra="forbid")
 
+    run_id: str | None = Field(
+        default=None,
+        description="Optional fixed run_id (V3-M5: deterministic replay for concurrency-consistency tests).",
+    )
     source: str | None = Field(
         default=None,
         description="Optional source_context.json or DemandAgent demand_content.json path.",

+ 23 - 3
tests/gemini_helpers.py

@@ -7,6 +7,9 @@ fit_senior_50plus / fit_confidence / relevance_score / reason (+ status on fail)
 from __future__ import annotations
 
 import copy
+import hashlib
+import threading
+import time
 from typing import Any
 
 
@@ -38,6 +41,7 @@ class FakeGeminiVideoClient:
         self.result_by_content_id = result_by_content_id or {}
         self.default_result = default_result or fake_gemini_pool()
         self.calls: list[dict[str, Any]] = []
+        self._lock = threading.Lock()  # M5: analyze 会被并发调用,calls 记录需加锁
 
     def analyze(
         self,
@@ -45,9 +49,25 @@ class FakeGeminiVideoClient:
         media: dict[str, Any],
         source_context: dict[str, Any],
     ) -> dict[str, Any]:
-        self.calls.append(
-            {"content": copy.deepcopy(content), "media": copy.deepcopy(media)}
-        )
+        with self._lock:
+            self.calls.append(
+                {"content": copy.deepcopy(content), "media": copy.deepcopy(media)}
+            )
         content_id = str(content.get("platform_content_id", ""))
         result = self.result_by_content_id.get(content_id, self.default_result)
         return copy.deepcopy(result)
+
+
+class JitteredFakeGeminiVideoClient(FakeGeminiVideoClient):
+    """按 content_id 哈希定 0-9ms sleep(确定性),强制完成顺序≠提交顺序,
+    暴露并发回收未按 offset 归位的 bug;返回值仍由 content_id 决定。"""
+
+    def analyze(
+        self,
+        content: dict[str, Any],
+        media: dict[str, Any],
+        source_context: dict[str, Any],
+    ) -> dict[str, Any]:
+        digest = hashlib.sha1(str(content.get("platform_content_id", "")).encode("utf-8")).hexdigest()
+        time.sleep((int(digest[:4], 16) % 10) / 1000.0)
+        return super().analyze(content, media, source_context)

+ 2 - 1
tests/replay_harness.py

@@ -67,6 +67,7 @@ def replay_case(
     cases_dir: Path | str = CASES_DIR,
     config_overrides: dict[str, Any] | None = None,
     gemini_video_client: GeminiVideoClient | None = None,
+    run_id: str | None = None,
 ) -> RunArtifacts:
     corpus = load_corpus(case_id, cases_dir)
     source_context = corpus["source_context.json"]
@@ -89,7 +90,7 @@ def replay_case(
         service.policy_store = config_overrides["policy_store"]
     service._platform_client = lambda platform, platform_mode: CorpusPlatformClient(discovered)
 
-    state = service.start_run(RunStartRequest(platform_mode="mock", source=str(source_path)))
+    state = service.start_run(RunStartRequest(platform_mode="mock", source=str(source_path), run_id=run_id))
 
     files: dict[str, Any] = {}
     for filename in RUNTIME_FILENAMES:

+ 151 - 0
tests/test_concurrency_consistency.py

@@ -0,0 +1,151 @@
+"""V3-M5: 并发判定的确定性验收——并发结果必须与串行逐条一致。
+
+固定 run_id(R9)+ 确定性 fake 下,max_workers=1(串行)与 4(并发)产出的核心产物
+剔除时间戳键后逐条相等;Jittered fake 强制乱序完成以暴露 offset 错位;
+配额截断按 offset 预判(与完成序无关);analyze 意外异常经 _safe_analyze 兜底不炸 run。
+"""
+
+from __future__ import annotations
+
+from typing import Any
+
+from content_agent.business_modules.content_discovery import pattern_recall
+from content_agent.business_modules.content_discovery.pattern_recall import recall_decision
+from content_agent.integrations.gemini_quota import QuotaCappedGeminiVideoClient
+from content_agent.integrations.runtime_files import LocalRuntimeFileStore
+from content_agent.run_service import RunService
+from tests.gemini_helpers import (
+    FakeGeminiVideoClient,
+    JitteredFakeGeminiVideoClient,
+    fake_gemini_pool,
+)
+from tests.replay_harness import replay_case
+
+_CORE_FILES = [
+    "discovered_content_items.jsonl",
+    "pattern_recall_evidence.jsonl",
+    "rule_decisions.jsonl",
+    "walk_actions.jsonl",
+    "source_path_records.jsonl",
+]
+_TIMESTAMP_KEYS = {"created_at", "updated_at", "started_at", "ended_at", "duration_ms"}
+
+
+def _scrub(value: Any) -> Any:
+    """递归剔除时间戳键(两次 run 的 datetime.now 必然不同,不属确定性合同)。"""
+    if isinstance(value, dict):
+        return {k: _scrub(v) for k, v in value.items() if k not in _TIMESTAMP_KEYS}
+    if isinstance(value, list):
+        return [_scrub(item) for item in value]
+    return value
+
+
+def test_serial_vs_concurrent_replay_identical(tmp_path, monkeypatch):
+    run_id = "v1_run_m5fixed00001"
+    artifacts = {}
+    for label, workers in [("serial", 1), ("concurrent", 4)]:
+        monkeypatch.setattr(recall_decision, "_resolve_max_workers", lambda workers=workers: workers)
+        artifacts[label] = replay_case(
+            "real_id45",
+            runtime_root=tmp_path / label,
+            gemini_video_client=FakeGeminiVideoClient(),
+            run_id=run_id,
+        )
+    assert artifacts["serial"].run_id == artifacts["concurrent"].run_id == run_id
+    for filename in _CORE_FILES:
+        serial = _scrub(artifacts["serial"].files[filename])
+        concurrent = _scrub(artifacts["concurrent"].files[filename])
+        assert serial == concurrent, f"{filename} diverged between serial and concurrent"
+
+
+def _synthetic_recall_inputs(count: int) -> tuple[list, list, list]:
+    items = [{"platform_content_id": f"content_{i:03d}", "platform": "douyin"} for i in range(count)]
+    media = [{"platform_content_id": f"content_{i:03d}"} for i in range(count)]
+    bundles = [{"content": {"platform_content_id": f"content_{i:03d}"}} for i in range(count)]
+    return items, media, bundles
+
+
+def test_jittered_completion_preserves_offset_order(tmp_path):
+    # 每条内容的预置结果不同;乱序完成后若 offset 错位,判定会张冠李戴。
+    items, media, bundles = _synthetic_recall_inputs(8)
+    results = {
+        item["platform_content_id"]: {**fake_gemini_pool(), "relevance_score": round(0.1 * (i + 1), 2)}
+        for i, item in enumerate(items)
+    }
+    runtime = LocalRuntimeFileStore(tmp_path / "rt")
+    runtime.prepare_run("run_001")
+    recalled = pattern_recall.run(
+        "run_001", "policy_run_001", items, media, bundles, {}, runtime,
+        JitteredFakeGeminiVideoClient(result_by_content_id=results),
+    )
+    for i, updated in enumerate(recalled["discovered_content_items"]):
+        expected = results[updated["platform_content_id"]]["relevance_score"]
+        assert updated["pattern_match_result"]["relevance_score"] == expected
+        assert updated["pattern_match_result"]["pattern_recall_evidence_id"] == f"recall_{i + 1:03d}"
+
+
+def test_quota_cap_deterministic_truncation(tmp_path, monkeypatch):
+    items, media, bundles = _synthetic_recall_inputs(5)
+    statuses = {}
+    for label, workers in [("serial", 1), ("concurrent", 4)]:
+        monkeypatch.setattr(recall_decision, "_resolve_max_workers", lambda workers=workers: workers)
+        runtime = LocalRuntimeFileStore(tmp_path / label)
+        runtime.prepare_run("run_001")
+        client = QuotaCappedGeminiVideoClient(FakeGeminiVideoClient(), cap=2)
+        recalled = pattern_recall.run(
+            "run_001", "policy_run_001", items, media, bundles, {}, runtime, client,
+        )
+        statuses[label] = [
+            (row["pattern_match_result"]["judge_status"], row["pattern_match_result"]["reason"])
+            for row in recalled["discovered_content_items"]
+        ]
+        assert client.used == 2
+    # 截断边界按 offset 预判:前 2 条真判、后 3 条配额拒,串/并行完全一致。
+    assert statuses["serial"] == statuses["concurrent"]
+    assert [status for status, _ in statuses["serial"]] == ["ok", "ok", "failed", "failed", "failed"]
+    assert all(reason == "gemini_quota_exhausted" for _, reason in statuses["serial"][2:])
+
+
+def test_quota_exhaustion_is_observable(tmp_path, monkeypatch):
+    import content_agent.run_service as run_service_module
+
+    monkeypatch.setattr(run_service_module, "_gemini_calls_cap", lambda: 2)
+    artifacts = replay_case(
+        "sph_caihong",
+        runtime_root=tmp_path / "rt",
+        gemini_video_client=FakeGeminiVideoClient(),
+    )
+    assert artifacts.state["status"] == "success"
+    quota_rows = [
+        row for row in artifacts.files["pattern_recall_evidence.jsonl"]
+        if row["evidence_summary"]["reason"] == "gemini_quota_exhausted"
+    ]
+    assert quota_rows
+    assert all(row["evidence_summary"]["judge_status"] == "failed" for row in quota_rows)
+    quota_events = [
+        row for row in artifacts.files["run_events.jsonl"]
+        if row["event_type"] == "gemini_quota_exhausted"
+    ]
+    assert len(quota_events) == 1
+    assert quota_events[0]["raw_payload"]["cap"] == 2
+    assert quota_events[0]["raw_payload"]["used"] == 2
+
+
+def test_analyze_exception_does_not_break_run(tmp_path):
+    class RaisingGeminiVideoClient(FakeGeminiVideoClient):
+        def analyze(self, content, media, source_context):
+            raise RuntimeError("boom")
+
+    artifacts = replay_case(
+        "real_id45",
+        runtime_root=tmp_path / "rt",
+        gemini_video_client=RaisingGeminiVideoClient(),
+    )
+    # _safe_analyze 兜底:意外异常转 failed 判定,run 本身不崩(与串行 analyze 自吞语义一致)。
+    assert artifacts.state["status"] == "success"
+    assert artifacts.files["pattern_recall_evidence.jsonl"]
+    assert all(
+        row["evidence_summary"]["judge_status"] == "failed"
+        and row["evidence_summary"]["reason"].startswith("analyze_raised")
+        for row in artifacts.files["pattern_recall_evidence.jsonl"]
+    )