| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- """Deterministic Gemini video-judgment fakes (V3-M0A, schema backfilled in M2).
- Factories return the real M2 structured-output schema:
- fit_senior_50plus / fit_confidence / relevance_score / reason (+ status on fail).
- """
- from __future__ import annotations
- import copy
- import hashlib
- import threading
- import time
- from typing import Any
- def fake_gemini_pool() -> dict[str, Any]:
- return {"fit_senior_50plus": True, "fit_confidence": 0.9, "relevance_score": 0.85, "reason": "pool stub"}
- def fake_gemini_review() -> dict[str, Any]:
- return {"fit_senior_50plus": True, "fit_confidence": 0.8, "relevance_score": 0.45, "reason": "review stub"}
- def fake_gemini_fail(reason: str = "gemini_timeout") -> dict[str, Any]:
- return {
- "fit_senior_50plus": False,
- "fit_confidence": 0.0,
- "relevance_score": 0.0,
- "reason": reason,
- "status": "failed",
- }
- class FakeGeminiVideoClient:
- def __init__(
- self,
- *,
- result_by_content_id: dict[str, dict[str, Any]] | None = None,
- default_result: dict[str, Any] | None = None,
- ) -> None:
- self.result_by_content_id = result_by_content_id or {}
- self.default_result = default_result or fake_gemini_pool()
- self.calls: list[dict[str, Any]] = []
- self._lock = threading.Lock() # M5: analyze 会被并发调用,calls 记录需加锁
- def analyze(
- self,
- content: dict[str, Any],
- media: dict[str, Any],
- source_context: dict[str, Any],
- ) -> dict[str, Any]:
- with self._lock:
- self.calls.append(
- {"content": copy.deepcopy(content), "media": copy.deepcopy(media)}
- )
- content_id = str(content.get("platform_content_id", ""))
- result = self.result_by_content_id.get(content_id, self.default_result)
- return copy.deepcopy(result)
- class JitteredFakeGeminiVideoClient(FakeGeminiVideoClient):
- """按 content_id 哈希定 0-9ms sleep(确定性),强制完成顺序≠提交顺序,
- 暴露并发回收未按 offset 归位的 bug;返回值仍由 content_id 决定。"""
- def analyze(
- self,
- content: dict[str, Any],
- media: dict[str, Any],
- source_context: dict[str, Any],
- ) -> dict[str, Any]:
- digest = hashlib.sha1(str(content.get("platform_content_id", "")).encode("utf-8")).hexdigest()
- time.sleep((int(digest[:4], 16) % 10) / 1000.0)
- return super().analyze(content, media, source_context)
|