gemini_video.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. """Gemini 视频判定 client (V3-M2B).
  2. 实现 interfaces.GeminiVideoClient.analyze:取视频(video_fetch)→ 多模态投给
  3. Gemini(OpenRouter image_url data URL)→ 解析结构化判定 4 字段。复用 query_variant
  4. 的 OpenRouter httpx 骨架,不引入新 SDK。任何失败一律返回 fail 结构,不抛、不卡 run。
  5. """
  6. from __future__ import annotations
  7. import json
  8. import os
  9. from pathlib import Path
  10. from typing import Any, Callable, Mapping
  11. import httpx
  12. from content_agent.integrations import video_fetch
  13. DEFAULT_OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
  14. DEFAULT_VIDEO_MODEL = "google/gemini-3-flash-preview"
  15. DEFAULT_VIDEO_TIMEOUT_SECONDS = 90.0
  16. # 原片留档目录(2026-06-12 拍板:全量存,含未过审;相对仓库根,服务器同款)。
  17. DEFAULT_RAW_VIDEO_DIR = "data"
  18. _SYSTEM_PROMPT = "你是面向中国中老年内容池的视频审核助手。只输出一个 JSON 对象,不要任何解释或 markdown。"
  19. _USER_PROMPT = (
  20. "判断这条视频,严格按以下 JSON 结构输出(只输出 JSON):\n"
  21. '{{"fit_senior_50plus": true/false, "fit_confidence": 0~1 的小数, '
  22. '"relevance_score": 0~1 的小数, "reason": "中文简述理由"}}\n'
  23. "字段含义:\n"
  24. "- fit_senior_50plus: 内容是否适合中国 50 岁以上老年人观看(健康/安全/无误导/无低俗)。\n"
  25. "- fit_confidence: 你对上面判断的置信度。\n"
  26. "- relevance_score: 视频内容与需求关键词【{seed_terms}】的相关程度。\n"
  27. "- reason: 一句话理由。"
  28. )
  29. def _fail(reason: str) -> dict[str, Any]:
  30. return {
  31. "fit_senior_50plus": False,
  32. "fit_confidence": 0.0,
  33. "relevance_score": 0.0,
  34. "reason": reason,
  35. "status": "failed",
  36. }
  37. def _clamp01(value: Any) -> float:
  38. try:
  39. number = float(value)
  40. except (TypeError, ValueError):
  41. return 0.0
  42. return max(0.0, min(1.0, number))
  43. def _parse(payload: dict[str, Any]) -> dict[str, Any]:
  44. content = payload["choices"][0]["message"]["content"]
  45. text = str(content).strip()
  46. if text.startswith("```"):
  47. text = text.split("```", 2)[1]
  48. if text.startswith("json"):
  49. text = text[4:]
  50. data = json.loads(text)
  51. return {
  52. "fit_senior_50plus": bool(data["fit_senior_50plus"]),
  53. "fit_confidence": _clamp01(data.get("fit_confidence")),
  54. "relevance_score": _clamp01(data.get("relevance_score")),
  55. "reason": str(data.get("reason") or ""),
  56. }
  57. def _seed_terms(source_context: dict[str, Any]) -> str:
  58. evidence = source_context.get("ext_data", {}).get("evidence_pack", {})
  59. terms = evidence.get("seed_terms") or []
  60. return "、".join(str(t) for t in terms) or "(未指定)"
  61. class GeminiVideoClient:
  62. def __init__(
  63. self,
  64. *,
  65. api_key: str,
  66. model: str = DEFAULT_VIDEO_MODEL,
  67. base_url: str = DEFAULT_OPENROUTER_BASE_URL,
  68. timeout_seconds: float = DEFAULT_VIDEO_TIMEOUT_SECONDS,
  69. fetch_fn: Callable[..., str] = video_fetch.fetch_and_compress,
  70. http_post: Callable[..., Any] = httpx.post,
  71. raw_video_save_dir: str | None = None,
  72. ) -> None:
  73. self.api_key = api_key
  74. self.model = model
  75. self.base_url = base_url.rstrip("/")
  76. self.timeout_seconds = timeout_seconds
  77. self.fetch_fn = fetch_fn
  78. self.http_post = http_post
  79. self.raw_video_save_dir = raw_video_save_dir
  80. @classmethod
  81. def from_env(cls, env: Mapping[str, str] | None = None) -> "GeminiVideoClient":
  82. source = os.environ if env is None else env
  83. api_key = source.get("OPENROUTER_API_KEY") or source.get("OPEN_ROUTER_API_KEY")
  84. if not api_key:
  85. return MissingGeminiVideoClient("gemini video config missing: OPENROUTER_API_KEY")
  86. return cls(
  87. api_key=api_key,
  88. model=source.get("CONTENT_AGENT_VIDEO_LLM_MODEL") or DEFAULT_VIDEO_MODEL,
  89. base_url=source.get("OPENROUTER_BASE_URL") or DEFAULT_OPENROUTER_BASE_URL,
  90. timeout_seconds=float(source.get("CONTENT_AGENT_VIDEO_LLM_TIMEOUT_SECONDS") or DEFAULT_VIDEO_TIMEOUT_SECONDS),
  91. raw_video_save_dir=DEFAULT_RAW_VIDEO_DIR,
  92. )
  93. def _raw_save_path(self, content: dict[str, Any]) -> str | None:
  94. """原片留档路径 data/{run_id}/{platform_content_id}.mp4;身份字段缺失则不存。"""
  95. if not self.raw_video_save_dir:
  96. return None
  97. run_id = content.get("run_id")
  98. platform_content_id = content.get("platform_content_id")
  99. if not run_id or not platform_content_id:
  100. return None
  101. return str(Path(self.raw_video_save_dir) / str(run_id) / f"{platform_content_id}.mp4")
  102. def analyze(
  103. self,
  104. content: dict[str, Any],
  105. media: dict[str, Any],
  106. source_context: dict[str, Any],
  107. ) -> dict[str, Any]:
  108. play_url = media.get("play_url")
  109. if not play_url:
  110. return _fail("no_play_url")
  111. fetch_kwargs: dict[str, Any] = {}
  112. save_path = self._raw_save_path(content)
  113. if save_path:
  114. fetch_kwargs["save_raw_to"] = save_path
  115. try:
  116. data_url = self.fetch_fn(play_url, content.get("platform", "douyin"), **fetch_kwargs)
  117. except Exception as exc:
  118. return _fail(f"video_fetch_failed: {type(exc).__name__}")
  119. messages = [
  120. {"role": "system", "content": _SYSTEM_PROMPT},
  121. {
  122. "role": "user",
  123. "content": [
  124. {"type": "text", "text": _USER_PROMPT.format(seed_terms=_seed_terms(source_context))},
  125. {"type": "image_url", "image_url": {"url": data_url}},
  126. ],
  127. },
  128. ]
  129. try:
  130. response = self.http_post(
  131. f"{self.base_url}/chat/completions",
  132. headers={"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"},
  133. json={"model": self.model, "messages": messages},
  134. timeout=self.timeout_seconds,
  135. )
  136. response.raise_for_status()
  137. return _parse(response.json())
  138. except httpx.HTTPError as exc:
  139. return _fail(f"gemini_http_error: {type(exc).__name__}")
  140. except (KeyError, IndexError, TypeError, ValueError) as exc:
  141. return _fail(f"gemini_response_invalid: {type(exc).__name__}")
  142. class MissingGeminiVideoClient:
  143. def __init__(self, reason: str) -> None:
  144. self.reason = reason
  145. def analyze(
  146. self,
  147. content: dict[str, Any],
  148. media: dict[str, Any],
  149. source_context: dict[str, Any],
  150. ) -> dict[str, Any]:
  151. return _fail(self.reason)