| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- """Gemini 视频判定 client (V3-M2B).
- 实现 interfaces.GeminiVideoClient.analyze:取视频(video_fetch)→ 多模态投给
- Gemini(OpenRouter image_url data URL)→ 解析结构化判定 4 字段。复用 query_variant
- 的 OpenRouter httpx 骨架,不引入新 SDK。任何失败一律返回 fail 结构,不抛、不卡 run。
- """
- from __future__ import annotations
- import json
- import os
- from pathlib import Path
- from typing import Any, Callable, Mapping
- import httpx
- from content_agent.integrations import video_fetch
- DEFAULT_OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
- DEFAULT_VIDEO_MODEL = "google/gemini-3-flash-preview"
- DEFAULT_VIDEO_TIMEOUT_SECONDS = 90.0
- # 原片留档目录(2026-06-12 拍板:全量存,含未过审;相对仓库根,服务器同款)。
- DEFAULT_RAW_VIDEO_DIR = "data"
- _SYSTEM_PROMPT = "你是面向中国中老年内容池的视频审核助手。只输出一个 JSON 对象,不要任何解释或 markdown。"
- _USER_PROMPT = (
- "判断这条视频,严格按以下 JSON 结构输出(只输出 JSON):\n"
- '{{"fit_senior_50plus": true/false, "fit_confidence": 0~1 的小数, '
- '"relevance_score": 0~1 的小数, "reason": "中文简述理由"}}\n'
- "字段含义:\n"
- "- fit_senior_50plus: 内容是否适合中国 50 岁以上老年人观看(健康/安全/无误导/无低俗)。\n"
- "- fit_confidence: 你对上面判断的置信度。\n"
- "- relevance_score: 视频内容与需求关键词【{seed_terms}】的相关程度。\n"
- "- reason: 一句话理由。"
- )
- def _fail(reason: str) -> dict[str, Any]:
- return {
- "fit_senior_50plus": False,
- "fit_confidence": 0.0,
- "relevance_score": 0.0,
- "reason": reason,
- "status": "failed",
- }
- def _clamp01(value: Any) -> float:
- try:
- number = float(value)
- except (TypeError, ValueError):
- return 0.0
- return max(0.0, min(1.0, number))
def _parse_bool(value: Any) -> bool:
    """Interpret a model-supplied boolean, accepting string forms such as "false".

    ``bool("false")`` is True, so strings are mapped explicitly. Unrecognised
    strings raise ValueError so the response is reported as invalid instead of
    silently approving the video. Non-string values use normal truthiness.
    """
    if isinstance(value, str):
        lowered = value.strip().lower()
        if lowered in ("true", "yes", "1", "是"):
            return True
        if lowered in ("false", "no", "0", "否", ""):
            return False
        raise ValueError(f"unrecognised boolean value: {value!r}")
    return bool(value)


def _parse(payload: dict[str, Any]) -> dict[str, Any]:
    """Extract the four verdict fields from an OpenRouter chat-completions payload.

    The first choice's message content is read as JSON, after optionally stripping
    a Markdown code fence and its language tag (e.g. "json" or "JSON").

    Raises:
        KeyError / IndexError / TypeError: the payload or the verdict object lacks
            the expected structure (e.g. a missing "fit_senior_50plus").
        ValueError: the content is not valid JSON (json.JSONDecodeError), or
            "fit_senior_50plus" is an unrecognised string.
    """
    content = payload["choices"][0]["message"]["content"]
    text = str(content).strip()
    if text.startswith("```"):
        # Keep the body between the first pair of fences, dropping a language tag.
        text = text.split("```", 2)[1]
        if text[:4].lower() == "json":
            text = text[4:]
    data = json.loads(text)
    return {
        "fit_senior_50plus": _parse_bool(data["fit_senior_50plus"]),
        "fit_confidence": _clamp01(data.get("fit_confidence")),
        "relevance_score": _clamp01(data.get("relevance_score")),
        "reason": str(data.get("reason") or ""),
    }
- def _seed_terms(source_context: dict[str, Any]) -> str:
- evidence = source_context.get("ext_data", {}).get("evidence_pack", {})
- terms = evidence.get("seed_terms") or []
- return "、".join(str(t) for t in terms) or "(未指定)"
class GeminiVideoClient:
    """Judge a video with Gemini through OpenRouter's chat-completions endpoint.

    ``analyze`` fetches/compresses the video with ``fetch_fn``, sends the resulting
    URL plus a judging prompt to ``{base_url}/chat/completions`` via ``http_post``,
    and parses the structured verdict. Every failure path returns a ``_fail(...)``
    dict instead of raising, so one bad video cannot abort a run.
    """

    def __init__(
        self,
        *,
        api_key: str,
        model: str = DEFAULT_VIDEO_MODEL,
        base_url: str = DEFAULT_OPENROUTER_BASE_URL,
        timeout_seconds: float = DEFAULT_VIDEO_TIMEOUT_SECONDS,
        fetch_fn: Callable[..., str] = video_fetch.fetch_and_compress,
        http_post: Callable[..., Any] = httpx.post,
        raw_video_save_dir: str | None = None,
    ) -> None:
        """Store the client configuration.

        Args:
            api_key: OpenRouter API key, sent as a Bearer token.
            model: Model id placed in the request body.
            base_url: API root; a trailing "/" is stripped.
            timeout_seconds: Passed as ``timeout`` to ``http_post``.
            fetch_fn: Called as ``fetch_fn(play_url, platform, **kwargs)``; its
                return value is sent to the model as the image_url URL.
            http_post: Injectable POST callable (``httpx.post`` by default).
            raw_video_save_dir: Root directory for archiving raw videos; a falsy
                value disables archiving.
        """
        self.api_key = api_key
        self.model = model
        self.base_url = base_url.rstrip("/")
        self.timeout_seconds = timeout_seconds
        self.fetch_fn = fetch_fn
        self.http_post = http_post
        self.raw_video_save_dir = raw_video_save_dir

    @classmethod
    def from_env(
        cls, env: Mapping[str, str] | None = None
    ) -> "GeminiVideoClient | MissingGeminiVideoClient":
        """Build a client from environment variables (``os.environ`` by default).

        Reads OPENROUTER_API_KEY (or the alternative spelling OPEN_ROUTER_API_KEY),
        CONTENT_AGENT_VIDEO_LLM_MODEL, OPENROUTER_BASE_URL and
        CONTENT_AGENT_VIDEO_LLM_TIMEOUT_SECONDS; unset or empty values fall back to
        the module defaults. Raw-video archiving is always enabled here.

        Returns:
            A configured client, or a ``MissingGeminiVideoClient`` (whose ``analyze``
            always fails) when no API key is available.

        Raises:
            ValueError: the timeout variable is set but is not a valid float.
        """
        source = os.environ if env is None else env
        api_key = source.get("OPENROUTER_API_KEY") or source.get("OPEN_ROUTER_API_KEY")
        if not api_key:
            return MissingGeminiVideoClient("gemini video config missing: OPENROUTER_API_KEY")
        return cls(
            api_key=api_key,
            model=source.get("CONTENT_AGENT_VIDEO_LLM_MODEL") or DEFAULT_VIDEO_MODEL,
            base_url=source.get("OPENROUTER_BASE_URL") or DEFAULT_OPENROUTER_BASE_URL,
            timeout_seconds=float(
                source.get("CONTENT_AGENT_VIDEO_LLM_TIMEOUT_SECONDS") or DEFAULT_VIDEO_TIMEOUT_SECONDS
            ),
            raw_video_save_dir=DEFAULT_RAW_VIDEO_DIR,
        )

    @staticmethod
    def _safe_path_component(value: Any) -> str | None:
        """Return ``str(value)`` if it is usable as one path segment, else None.

        Falsy values, "." / "..", and anything containing a path separator or NUL
        are rejected, so crawled identifiers cannot escape the archive directory.
        """
        if not value:
            return None
        text = str(value)
        if text in (".", "..") or any(sep in text for sep in ("/", "\\", "\x00")):
            return None
        return text

    def _raw_save_path(self, content: dict[str, Any]) -> str | None:
        """Return the raw-video archive path ``{dir}/{run_id}/{platform_content_id}.mp4``.

        Returns None (do not archive) when archiving is disabled, or when either
        identity field is missing or not a safe single path segment.
        """
        if not self.raw_video_save_dir:
            return None
        run_id = self._safe_path_component(content.get("run_id"))
        platform_content_id = self._safe_path_component(content.get("platform_content_id"))
        if run_id is None or platform_content_id is None:
            return None
        return str(Path(self.raw_video_save_dir) / run_id / f"{platform_content_id}.mp4")

    def analyze(
        self,
        content: dict[str, Any],
        media: dict[str, Any],
        source_context: dict[str, Any],
    ) -> dict[str, Any]:
        """Judge one video and return the verdict dict; never raises.

        Args:
            content: Content record; ``platform`` (default "douyin") is passed to
                ``fetch_fn``; ``run_id`` / ``platform_content_id`` pick the archive path.
            media: Media record; ``play_url`` is the video to fetch.
            source_context: Run context supplying the seed terms for the prompt.

        Returns:
            On success, the four verdict fields parsed from the model reply. On
            failure, a ``_fail`` dict whose ``reason`` names the failing stage:
            no_play_url, video_fetch_failed, prompt_build_failed,
            gemini_http_error, gemini_response_invalid or gemini_call_failed.
        """
        play_url = media.get("play_url")
        if not play_url:
            return _fail("no_play_url")

        fetch_kwargs: dict[str, Any] = {}
        save_path = self._raw_save_path(content)
        if save_path:
            fetch_kwargs["save_raw_to"] = save_path
        try:
            data_url = self.fetch_fn(play_url, content.get("platform", "douyin"), **fetch_kwargs)
        except Exception as exc:
            return _fail(f"video_fetch_failed: {type(exc).__name__}")
        if not data_url:
            # An empty payload would only surface later as a confusing upstream error.
            return _fail("video_fetch_failed: empty_result")

        try:
            prompt = _USER_PROMPT.format(seed_terms=_seed_terms(source_context))
        except (AttributeError, TypeError) as exc:
            # Malformed source_context (e.g. a null nested dict) must not escape.
            return _fail(f"prompt_build_failed: {type(exc).__name__}")

        messages = [
            {"role": "system", "content": _SYSTEM_PROMPT},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {"type": "image_url", "image_url": {"url": data_url}},
                ],
            },
        ]
        try:
            response = self.http_post(
                f"{self.base_url}/chat/completions",
                headers={"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"},
                json={"model": self.model, "messages": messages},
                timeout=self.timeout_seconds,
            )
            response.raise_for_status()
            return _parse(response.json())
        except httpx.HTTPError as exc:
            return _fail(f"gemini_http_error: {type(exc).__name__}")
        except (KeyError, IndexError, TypeError, ValueError) as exc:
            return _fail(f"gemini_response_invalid: {type(exc).__name__}")
        except Exception as exc:
            # Anything else (e.g. httpx.InvalidURL, which is not an HTTPError, from a
            # bad base URL) still honours the never-raise contract, mirroring the
            # catch-all around fetch_fn above.
            return _fail(f"gemini_call_failed: {type(exc).__name__}")
class MissingGeminiVideoClient:
    """Stand-in client used when the Gemini video client cannot be configured.

    It exposes the same ``analyze`` signature as the real client but ignores its
    inputs and performs no work: every call returns the failure verdict built
    from the stored reason.
    """

    def __init__(self, reason: str) -> None:
        # Explanation echoed back as the "reason" of every verdict.
        self.reason = reason

    def analyze(
        self,
        content: dict[str, Any],
        media: dict[str, Any],
        source_context: dict[str, Any],
    ) -> dict[str, Any]:
        """Disregard the inputs and report the configuration failure."""
        failure_reason = self.reason
        return _fail(failure_reason)
|