"""视频获取链 (V3-M2A). 从 play_url 下载视频(带平台下载头)→ imageio-ffmpeg 压到 ~4MB 低清 → base64 data URL,供 GeminiVideoClient 投喂(OpenRouter image_url)。 真实下载/压缩只在 M7 live smoke 跑;单测全 mock。 2026-06-12 拍板:下载成功的原片全量落盘 data/(过没过审都存,play_url 有时效留不住)。 """ from __future__ import annotations import base64 import subprocess from pathlib import Path from typing import Any import httpx import imageio_ffmpeg # platform_profiles 里写的是 "iOS UA"/"PC UA" 占位,这里映射成真实串 + Referer。 _PLATFORM_DOWNLOAD_HEADERS = { "douyin": { "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Mobile/15E148", "Referer": "https://www.douyin.com/", }, "shipinhao": { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36", "Referer": "https://channels.weixin.qq.com/", }, } # 已拍板压缩档:360p / 1fps / 低清,实测 ~4MB(memory/video-multimodal-analysis)。 _FFMPEG_ARGS = ["-vf", "scale=360:-2,fps=1", "-crf", "33", "-c:a", "aac", "-b:a", "32k", "-ac", "1"] MAX_INLINE_BYTES = 30 * 1024 * 1024 # OpenRouter inline base64 平台硬上限 COMPRESS_TIMEOUT_SECONDS = 120.0 # 实测 64MB/720p 压缩 ~8s,120s 足够余量 class VideoFetchError(RuntimeError): """下载/压缩/超限失败,由 GeminiVideoClient 捕获转 fail。""" def _download_headers(platform: str, override: dict[str, str] | None) -> dict[str, str]: if override is not None: return override return _PLATFORM_DOWNLOAD_HEADERS.get(platform, {}) def _save_raw(save_path: str, raw: bytes) -> None: # 原片留档是 best-effort:磁盘问题绝不影响判定链路。 try: path = Path(save_path) path.parent.mkdir(parents=True, exist_ok=True) path.write_bytes(raw) except OSError: pass def _compress(raw: bytes, ffmpeg_exe: str) -> bytes: # 超时保护:坏视频会让 ffmpeg 卡死,进而挂住一个判定并发线程(实测正常压缩 ~8s)。 try: proc = subprocess.run( [ffmpeg_exe, "-i", "pipe:0", *_FFMPEG_ARGS, "-f", "mp4", "-movflags", "frag_keyframe+empty_moov", "pipe:1"], input=raw, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=COMPRESS_TIMEOUT_SECONDS, ) except subprocess.TimeoutExpired as exc: raise VideoFetchError("ffmpeg compression timeout") from exc if proc.returncode != 0 or not proc.stdout: raise VideoFetchError("ffmpeg compression failed") return proc.stdout def fetch_and_compress( play_url: str, platform: str, *, headers: dict[str, str] | None = None, http_client: Any | None = None, ffmpeg_exe: str | None = None, timeout_seconds: float = 90.0, save_raw_to: str | None = None, ) -> str: if not play_url: raise VideoFetchError("missing play_url") client = http_client or httpx try: response = client.get( play_url, headers=_download_headers(platform, headers), follow_redirects=True, timeout=timeout_seconds, ) response.raise_for_status() raw = response.content except httpx.HTTPError as exc: raise VideoFetchError(f"download failed: {type(exc).__name__}") from exc if not raw: raise VideoFetchError("empty download") if save_raw_to: _save_raw(save_raw_to, raw) compressed = _compress(raw, ffmpeg_exe or imageio_ffmpeg.get_ffmpeg_exe()) if len(compressed) > MAX_INLINE_BYTES: raise VideoFetchError(f"compressed video oversize: {len(compressed)} bytes") return f"data:video/mp4;base64,{base64.b64encode(compressed).decode('ascii')}"