gemini_quota.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. """单 run Gemini 调用配额闸(V3-M5C)。
  2. 实现 GeminiVideoClient Protocol,包装真/假 client 在 run_service 单点注入——
  3. 对 recall/walk_engine/graph 全部 analyze 透明,零签名改。每 run new 一个实例,
  4. 计数跨"初始 recall + walk 内两次 recall"累计。截断的正常路径是 recall 提交前
  5. 按 offset 预判(remaining_quota/consume);analyze 内超额返回 _fail 仅作 backstop。
  6. """
  7. from __future__ import annotations
  8. import threading
  9. from typing import Any
  10. from content_agent.integrations.gemini_video import _fail
  11. from content_agent.interfaces import GeminiVideoClient
  12. _UNLIMITED = 1_000_000_000
  13. class QuotaCappedGeminiVideoClient:
  14. def __init__(self, inner: GeminiVideoClient, cap: int | None) -> None:
  15. self.inner = inner
  16. self.cap = cap
  17. self.used = 0
  18. self._lock = threading.Lock()
  19. def remaining_quota(self) -> int:
  20. with self._lock:
  21. return (self.cap - self.used) if self.cap is not None else _UNLIMITED
  22. def consume(self, count: int) -> None:
  23. with self._lock:
  24. self.used += count
  25. def analyze(
  26. self,
  27. content: dict[str, Any],
  28. media: dict[str, Any],
  29. source_context: dict[str, Any],
  30. ) -> dict[str, Any]:
  31. if self.cap is not None and self.remaining_quota() < 0:
  32. return _fail("gemini_quota_exhausted")
  33. return self.inner.analyze(content, media, source_context)