gemini_helpers.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. """Deterministic Gemini video-judgment fakes (V3-M0A, schema backfilled in M2).
  2. Factories return the real M2 structured-output schema:
  3. fit_senior_50plus / fit_confidence / relevance_score / reason (+ status on fail).
  4. """
  5. from __future__ import annotations
  6. import copy
  7. import hashlib
  8. import threading
  9. import time
  10. from typing import Any
  11. def fake_gemini_pool() -> dict[str, Any]:
  12. return {"fit_senior_50plus": True, "fit_confidence": 0.9, "relevance_score": 0.85, "reason": "pool stub"}
  13. def fake_gemini_review() -> dict[str, Any]:
  14. return {"fit_senior_50plus": True, "fit_confidence": 0.8, "relevance_score": 0.45, "reason": "review stub"}
  15. def fake_gemini_fail(reason: str = "gemini_timeout") -> dict[str, Any]:
  16. return {
  17. "fit_senior_50plus": False,
  18. "fit_confidence": 0.0,
  19. "relevance_score": 0.0,
  20. "reason": reason,
  21. "status": "failed",
  22. }
  23. class FakeGeminiVideoClient:
  24. def __init__(
  25. self,
  26. *,
  27. result_by_content_id: dict[str, dict[str, Any]] | None = None,
  28. default_result: dict[str, Any] | None = None,
  29. ) -> None:
  30. self.result_by_content_id = result_by_content_id or {}
  31. self.default_result = default_result or fake_gemini_pool()
  32. self.calls: list[dict[str, Any]] = []
  33. self._lock = threading.Lock() # M5: analyze 会被并发调用,calls 记录需加锁
  34. def analyze(
  35. self,
  36. content: dict[str, Any],
  37. media: dict[str, Any],
  38. source_context: dict[str, Any],
  39. ) -> dict[str, Any]:
  40. with self._lock:
  41. self.calls.append(
  42. {"content": copy.deepcopy(content), "media": copy.deepcopy(media)}
  43. )
  44. content_id = str(content.get("platform_content_id", ""))
  45. result = self.result_by_content_id.get(content_id, self.default_result)
  46. return copy.deepcopy(result)
  47. class JitteredFakeGeminiVideoClient(FakeGeminiVideoClient):
  48. """按 content_id 哈希定 0-9ms sleep(确定性),强制完成顺序≠提交顺序,
  49. 暴露并发回收未按 offset 归位的 bug;返回值仍由 content_id 决定。"""
  50. def analyze(
  51. self,
  52. content: dict[str, Any],
  53. media: dict[str, Any],
  54. source_context: dict[str, Any],
  55. ) -> dict[str, Any]:
  56. digest = hashlib.sha1(str(content.get("platform_content_id", "")).encode("utf-8")).hexdigest()
  57. time.sleep((int(digest[:4], 16) % 10) / 1000.0)
  58. return super().analyze(content, media, source_context)