| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- """V3-M5: 并发判定的确定性验收——并发结果必须与串行逐条一致。
- 固定 run_id(R9)+ 确定性 fake 下,max_workers=1(串行)与 4(并发)产出的核心产物
- 剔除时间戳键后逐条相等;Jittered fake 强制乱序完成以暴露 offset 错位;
- 配额截断按 offset 预判(与完成序无关);analyze 意外异常经 _safe_analyze 兜底不炸 run。
- """
- from __future__ import annotations
- from typing import Any
- from content_agent.business_modules.content_discovery import pattern_recall
- from content_agent.business_modules.content_discovery.pattern_recall import recall_decision
- from content_agent.integrations.gemini_quota import QuotaCappedGeminiVideoClient
- from content_agent.integrations.runtime_files import LocalRuntimeFileStore
- from content_agent.run_service import RunService
- from tests.gemini_helpers import (
- FakeGeminiVideoClient,
- JitteredFakeGeminiVideoClient,
- fake_gemini_pool,
- )
- from tests.replay_harness import replay_case
- _CORE_FILES = [
- "discovered_content_items.jsonl",
- "pattern_recall_evidence.jsonl",
- "rule_decisions.jsonl",
- "walk_actions.jsonl",
- "source_path_records.jsonl",
- ]
- _TIMESTAMP_KEYS = {"created_at", "updated_at", "started_at", "ended_at", "duration_ms"}
- def _scrub(value: Any) -> Any:
- """递归剔除时间戳键(两次 run 的 datetime.now 必然不同,不属确定性合同)。"""
- if isinstance(value, dict):
- return {k: _scrub(v) for k, v in value.items() if k not in _TIMESTAMP_KEYS}
- if isinstance(value, list):
- return [_scrub(item) for item in value]
- return value
- def test_serial_vs_concurrent_replay_identical(tmp_path, monkeypatch):
- run_id = "v1_run_m5fixed00001"
- artifacts = {}
- for label, workers in [("serial", 1), ("concurrent", 4)]:
- monkeypatch.setattr(recall_decision, "_resolve_max_workers", lambda workers=workers: workers)
- artifacts[label] = replay_case(
- "real_id45",
- runtime_root=tmp_path / label,
- gemini_video_client=FakeGeminiVideoClient(),
- run_id=run_id,
- )
- assert artifacts["serial"].run_id == artifacts["concurrent"].run_id == run_id
- for filename in _CORE_FILES:
- serial = _scrub(artifacts["serial"].files[filename])
- concurrent = _scrub(artifacts["concurrent"].files[filename])
- assert serial == concurrent, f"{filename} diverged between serial and concurrent"
- def _synthetic_recall_inputs(count: int) -> tuple[list, list, list]:
- items = [{"platform_content_id": f"content_{i:03d}", "platform": "douyin"} for i in range(count)]
- media = [{"platform_content_id": f"content_{i:03d}"} for i in range(count)]
- bundles = [{"content": {"platform_content_id": f"content_{i:03d}"}} for i in range(count)]
- return items, media, bundles
- def test_jittered_completion_preserves_offset_order(tmp_path):
- # 每条内容的预置结果不同;乱序完成后若 offset 错位,判定会张冠李戴。
- items, media, bundles = _synthetic_recall_inputs(8)
- results = {
- item["platform_content_id"]: {**fake_gemini_pool(), "relevance_score": round(0.1 * (i + 1), 2)}
- for i, item in enumerate(items)
- }
- runtime = LocalRuntimeFileStore(tmp_path / "rt")
- runtime.prepare_run("run_001")
- recalled = pattern_recall.run(
- "run_001", "policy_run_001", items, media, bundles, {}, runtime,
- JitteredFakeGeminiVideoClient(result_by_content_id=results),
- )
- for i, updated in enumerate(recalled["discovered_content_items"]):
- expected = results[updated["platform_content_id"]]["relevance_score"]
- assert updated["pattern_match_result"]["relevance_score"] == expected
- assert updated["pattern_match_result"]["pattern_recall_evidence_id"] == f"recall_{i + 1:03d}"
- def test_quota_cap_deterministic_truncation(tmp_path, monkeypatch):
- items, media, bundles = _synthetic_recall_inputs(5)
- statuses = {}
- for label, workers in [("serial", 1), ("concurrent", 4)]:
- monkeypatch.setattr(recall_decision, "_resolve_max_workers", lambda workers=workers: workers)
- runtime = LocalRuntimeFileStore(tmp_path / label)
- runtime.prepare_run("run_001")
- client = QuotaCappedGeminiVideoClient(FakeGeminiVideoClient(), cap=2)
- recalled = pattern_recall.run(
- "run_001", "policy_run_001", items, media, bundles, {}, runtime, client,
- )
- statuses[label] = [
- (row["pattern_match_result"]["judge_status"], row["pattern_match_result"]["reason"])
- for row in recalled["discovered_content_items"]
- ]
- assert client.used == 2
- # 截断边界按 offset 预判:前 2 条真判、后 3 条配额拒,串/并行完全一致。
- assert statuses["serial"] == statuses["concurrent"]
- assert [status for status, _ in statuses["serial"]] == ["ok", "ok", "failed", "failed", "failed"]
- assert all(reason == "gemini_quota_exhausted" for _, reason in statuses["serial"][2:])
- def test_quota_exhaustion_is_observable(tmp_path, monkeypatch):
- import content_agent.run_service as run_service_module
- monkeypatch.setattr(run_service_module, "_gemini_calls_cap", lambda: 2)
- artifacts = replay_case(
- "sph_caihong",
- runtime_root=tmp_path / "rt",
- gemini_video_client=FakeGeminiVideoClient(),
- )
- assert artifacts.state["status"] == "success"
- quota_rows = [
- row for row in artifacts.files["pattern_recall_evidence.jsonl"]
- if row["evidence_summary"]["reason"] == "gemini_quota_exhausted"
- ]
- assert quota_rows
- assert all(row["evidence_summary"]["judge_status"] == "failed" for row in quota_rows)
- quota_events = [
- row for row in artifacts.files["run_events.jsonl"]
- if row["event_type"] == "gemini_quota_exhausted"
- ]
- assert len(quota_events) == 1
- assert quota_events[0]["raw_payload"]["cap"] == 2
- assert quota_events[0]["raw_payload"]["used"] == 2
- def test_analyze_exception_does_not_break_run(tmp_path):
- class RaisingGeminiVideoClient(FakeGeminiVideoClient):
- def analyze(self, content, media, source_context):
- raise RuntimeError("boom")
- artifacts = replay_case(
- "real_id45",
- runtime_root=tmp_path / "rt",
- gemini_video_client=RaisingGeminiVideoClient(),
- )
- # _safe_analyze 兜底:意外异常转 failed 判定,run 本身不崩(与串行 analyze 自吞语义一致)。
- assert artifacts.state["status"] == "success"
- assert artifacts.files["pattern_recall_evidence.jsonl"]
- assert all(
- row["evidence_summary"]["judge_status"] == "failed"
- and row["evidence_summary"]["reason"].startswith("analyze_raised")
- for row in artifacts.files["pattern_recall_evidence.jsonl"]
- )
|