| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- """Download a post's source video, extract audio, transcribe via Deepgram.
- Used by platform detail() implementations whose posts ship raw video URLs
- (X, sph, douyin) and don't already supply captions. YouTube has its own
- captions endpoint and bypasses this module.
- Pipeline per video:
- 1. extract_video_url(platform, post) -> source url (page or direct)
- 2. download to <project root>/.cache/content_videos/<platform>/<stem>.mp4
- - X : yt-dlp on the page URL (most robust against rotating video URLs)
- - douyin: httpx + Referer https://www.douyin.com/
- - sph : httpx + Referer https://channels.weixin.qq.com/
- 3. ffmpeg -> 16kHz mono AAC 64kbps m4a (~3% the size of the source mp4)
- 4. POST to Deepgram /v1/listen, model=whisper-large by default
- 5. Strip spaces inserted by Deepgram between consecutive CJK characters
- Returns transcript text on success, None on any failure (silent fallback).
- """
- from __future__ import annotations
- import asyncio
- import logging
- import os
- import re
- import subprocess
- from pathlib import Path
- from typing import Any, Optional
- import httpx
logger = logging.getLogger(__name__)

# Deepgram transcription endpoint and the model used when the caller does not pick one.
DEEPGRAM_URL = "https://api.deepgram.com/v1/listen"
DEEPGRAM_MODEL_DEFAULT = "whisper-large"

# Timeouts, all in seconds. DOWNLOAD_TIMEOUT is shared by the yt-dlp subprocess
# and the httpx client; FFMPEG_TIMEOUT bounds the audio-extraction subprocess.
DEEPGRAM_REQUEST_TIMEOUT = 600.0
DOWNLOAD_TIMEOUT = 300
FFMPEG_TIMEOUT = 600

# Browser-like User-Agent sent on direct httpx downloads.
UA = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
      "(KHTML, like Gecko) Chrome/124.0 Safari/537.36")

# Cache lives at <project root>/.cache/content_videos rather than the system
# %TEMP%: this avoids Windows occasionally purging the files, and avoids 8GB+ of
# videos piling up out of sight in AppData\Local\Temp.
# parents[4]: transcription.py -> content/ -> builtin/ -> tools/ -> agent/ -> project root
_CACHE_ROOT = Path(__file__).resolve().parents[4] / ".cache" / "content_videos"

# Any run of characters outside [A-Za-z0-9._-]; used to sanitize ids into file stems.
_SAFE_RE = re.compile(r"[^A-Za-z0-9._-]+")

# Zero-width lookbehind/lookahead: remove whitespace strictly between CJK chars,
# preserve CJK<->ASCII boundaries (e.g. "Remotion 是工具" stays intact).
_CJK_SPACE_RE = re.compile(r"(?<=[一-鿿])\s+(?=[一-鿿])")

# Referer headers required by some CDNs for ffprobe / yt-dlp / httpx to access video URLs.
# Keyed by platform name; platforms absent from the table get no Referer.
_PLATFORM_REFERERS = {
    "douyin": "https://www.douyin.com/",
    "sph": "https://channels.weixin.qq.com/",
    "xhs": "https://www.xiaohongshu.com/",
    "bili": "https://www.bilibili.com/",
    "weibo": "https://weibo.com/",
}

# Seconds allowed for a single ffprobe duration probe.
_DURATION_PROBE_TIMEOUT = 15
def extract_video_url(platform: str, post: dict[str, Any]) -> Optional[str]:
    """Return the video URL (page or direct) found in a raw post dict, else None.

    Lookup rules by platform:
      * ``"youtube"``: build a watch URL from ``video_id`` (or ``content_id``).
      * ``"x"``: first entry of ``video_url_list``; a dict entry contributes its
        ``video_url`` value, any other entry is returned as-is.
      * anything else: first entry of ``videos``.
    """
    if platform == "youtube":
        video_id = post.get("video_id") or post.get("content_id")
        if not video_id:
            return None
        return f"https://www.youtube.com/watch?v={video_id}"

    if platform == "x":
        entries = post.get("video_url_list") or []
        if not entries:
            return None
        first = entries[0]
        if isinstance(first, dict):
            return first.get("video_url")
        return first

    # Generic: aigc-channel platforms (xhs / gzh / sph / douyin / bili / zhihu /
    # weibo / toutiao / github) all expose video URLs under `videos[0]`.
    entries = post.get("videos") or []
    return entries[0] if entries else None
def _safe_stem(platform: str, post: dict[str, Any]) -> str:
    """Build the ``<platform>_<id>`` file stem used for a post's cached files.

    The id is the first truthy value among ``channel_content_id``, ``video_id``
    and ``content_id`` (falling back to the literal ``"item"``). Every run of
    characters matched by ``_SAFE_RE`` is replaced with ``_`` and the result is
    capped at 60 characters. The platform prefix is used exactly as given.
    """
    identifier: Any = "item"
    for key in ("channel_content_id", "video_id", "content_id"):
        value = post.get(key)
        if value:
            identifier = value
            break
    sanitized = _SAFE_RE.sub("_", str(identifier))
    return f"{platform}_{sanitized[:60]}"
def _yt_dlp_download(url: str, target: Path) -> Optional[Path]:
    """Download ``url`` with the ``yt-dlp`` CLI and return the file it produced.

    Args:
        url: Page or media URL handed to yt-dlp.
        target: Desired output path. A non-empty file already there is reused
            without invoking yt-dlp.

    Returns:
        ``target`` when it exists and is non-empty, otherwise a non-empty sibling
        sharing ``target``'s stem (yt-dlp may pick another extension when it
        merges streams). None if yt-dlp is missing, times out, exits non-zero
        or leaves no usable output.
    """
    if target.exists() and target.stat().st_size > 0:
        return target
    # Format chain: prefer a muxed mp4 (usually hit for YouTube/X/douyin, fastest),
    # fall back to bestvideo+bestaudio merged by ffmpeg (DASH-only platforms such
    # as bili), and finally plain `best` as the last resort.
    cmd = [
        "yt-dlp", "-f", "best[ext=mp4]/bestvideo+bestaudio/best",
        "-o", str(target),
        "--no-playlist", "--quiet", "--no-warnings",
        # "--" ends option parsing, so a URL that happens to start with "-"
        # can never be interpreted as a yt-dlp flag.
        "--", url,
    ]
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=DOWNLOAD_TIMEOUT)
    except (subprocess.TimeoutExpired, FileNotFoundError) as e:
        logger.warning("yt-dlp failed for %s: %s", url, e)
        return None
    if r.returncode != 0:
        logger.warning("yt-dlp non-zero for %s: %s", url, (r.stderr or r.stdout)[:200])
        return None
    if target.exists() and target.stat().st_size > 0:
        return target
    # yt-dlp may have written with a different extension
    for f in target.parent.glob(target.stem + ".*"):
        # Leftover in-progress artifacts from an earlier aborted run are never
        # a finished download; skip them.
        if f.suffix in (".part", ".ytdl"):
            continue
        if f.is_file() and f.stat().st_size > 0:
            return f
    return None
async def _httpx_download(url: str, target: Path, referer: Optional[str] = None) -> Optional[Path]:
    """Stream ``url`` to ``target`` over HTTP and return the path on success.

    Args:
        url: Direct media URL to GET (redirects are followed).
        target: Destination file. A non-empty file already there is reused
            without any network traffic.
        referer: Optional ``Referer`` header value sent with the request.

    Returns:
        ``target`` once a complete, non-empty body has been written; None on a
        non-200 status, an empty body, or any exception (logged at WARNING).
    """
    if target.exists() and target.stat().st_size > 0:
        return target
    headers = {"User-Agent": UA}
    if referer:
        headers["Referer"] = referer
    # Stream into a sibling scratch file and rename only after the transfer has
    # finished. Writing straight to `target` would leave a truncated file behind
    # on timeout/disconnect, which the size check above would then accept as a
    # valid cached download on every later call.
    scratch = target.with_name(target.name + ".download")
    try:
        async with httpx.AsyncClient(
            timeout=DOWNLOAD_TIMEOUT, follow_redirects=True, headers=headers
        ) as client:
            async with client.stream("GET", url) as resp:
                if resp.status_code != 200:
                    logger.warning("download HTTP %s for %s", resp.status_code, url)
                    return None
                with scratch.open("wb") as f:
                    async for chunk in resp.aiter_bytes(chunk_size=64 * 1024):
                        f.write(chunk)
        if scratch.stat().st_size > 0:
            scratch.replace(target)
            return target
        return None
    except Exception as e:
        # Deliberate best-effort boundary: callers treat None as "no video".
        logger.warning("httpx download failed for %s: %s", url, e)
        return None
    finally:
        # No-op after a successful rename; removes partial data otherwise
        # (including when the task is cancelled mid-transfer).
        scratch.unlink(missing_ok=True)
async def _download_video(
    platform: str, post: dict[str, Any], video_url: str, target: Path
) -> Optional[Path]:
    """Dispatch to the right downloader per platform.

    Per-platform strategies:
      x       : yt-dlp on the post's ``link`` (falls back to ``video_url``)
      youtube : yt-dlp on ``video_url`` (the watch URL)
      douyin  : httpx direct download with the platform Referer
      sph     : httpx direct download with the platform Referer
      others  : yt-dlp on the post's ``link`` first when present, then plain
                httpx on ``video_url`` if yt-dlp produced nothing.

    Referer values come from ``_PLATFORM_REFERERS`` so downloads and duration
    probes send the same header; platforms missing from that table send none.

    Returns:
        Path of the downloaded file, or None if every strategy failed.
    """
    referer = _PLATFORM_REFERERS.get(platform)
    if platform == "x":
        page_url = post.get("link") or video_url
        return await asyncio.to_thread(_yt_dlp_download, page_url, target)
    if platform == "youtube":
        return await asyncio.to_thread(_yt_dlp_download, video_url, target)
    if platform in ("douyin", "sph"):
        return await _httpx_download(video_url, target, referer=referer)
    # Generic two-step fallback for any other platform with a `videos` field.
    page_url = post.get("link")
    if page_url:
        result = await asyncio.to_thread(_yt_dlp_download, page_url, target)
        if result:
            return result
        logger.info("yt-dlp didn't handle %s page URL; falling back to httpx", platform)
    # Send the platform Referer here too: the table lists xhs / bili / weibo,
    # which previously reached this fallback without one.
    return await _httpx_download(video_url, target, referer=referer)
def _extract_m4a(video_path: Path, audio_path: Path) -> bool:
    """Extract a 16 kHz mono AAC 64 kbps audio track from a video with ffmpeg.

    Args:
        video_path: Input media file.
        audio_path: Output file; its parent directory is created if needed. A
            non-empty file already there is reused without running ffmpeg.

    Returns:
        True if ``audio_path`` exists and is non-empty afterwards; False if
        ffmpeg is missing, fails, times out or produces no output.
    """
    audio_path.parent.mkdir(parents=True, exist_ok=True)
    if audio_path.exists() and audio_path.stat().st_size > 0:
        return True
    # Encode into a scratch file and rename on success. ffmpeg writes its output
    # incrementally, so a timeout or crash would otherwise leave a partial file
    # at `audio_path` that the check above reports as a finished extraction.
    # The real extension stays last so ffmpeg still infers the container from it.
    scratch = audio_path.with_name(f"{audio_path.stem}.partial{audio_path.suffix}")
    cmd = ["ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
           "-i", str(video_path),
           "-vn", "-ac", "1", "-ar", "16000",
           "-c:a", "aac", "-b:a", "64k",
           str(scratch)]
    try:
        try:
            subprocess.run(cmd, check=True, timeout=FFMPEG_TIMEOUT,
                           capture_output=True)
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError) as e:
            logger.warning("ffmpeg failed for %s: %s", video_path, e)
            return False
        if scratch.exists() and scratch.stat().st_size > 0:
            scratch.replace(audio_path)
            return True
        return False
    finally:
        # No-op after a successful rename; removes partial output otherwise.
        scratch.unlink(missing_ok=True)
async def _transcribe_deepgram(
    audio_path: Path,
    api_key: str,
    model: str = DEEPGRAM_MODEL_DEFAULT,
    language: Optional[str] = None,
) -> Optional[str]:
    """POST an audio file to Deepgram and return the transcript text.

    Args:
        audio_path: Audio file whose bytes are sent as the request body with
            ``Content-Type: audio/mp4``.
        api_key: Deepgram key, sent as ``Authorization: Token <key>``.
        model: Value of the ``model`` query parameter.
        language: Value of the ``language`` query parameter; when omitted,
            ``detect_language=true`` is sent instead.

    Returns:
        The transcript of the first alternative of the first channel, or None
        when the request fails, the status is not 200, the payload does not
        have the expected shape, or the transcript is empty. Failures are
        logged at WARNING.
    """
    params: dict[str, str] = {
        "model": model,
        "smart_format": "true",
        "punctuate": "true",
    }
    if language:
        params["language"] = language
    else:
        params["detect_language"] = "true"
    headers = {
        "Authorization": f"Token {api_key}",
        "Content-Type": "audio/mp4",
    }
    try:
        audio_bytes = audio_path.read_bytes()
        async with httpx.AsyncClient(timeout=DEEPGRAM_REQUEST_TIMEOUT) as client:
            r = await client.post(DEEPGRAM_URL, params=params, headers=headers,
                                  content=audio_bytes)
    except Exception as e:
        # Deliberate best-effort boundary: any I/O or transport error means "no transcript".
        logger.warning("Deepgram request failed for %s: %s", audio_path.name, e)
        return None
    if r.status_code != 200:
        logger.warning("Deepgram HTTP %s: %s", r.status_code, r.text[:200])
        return None
    try:
        data = r.json()
        alt = data["results"]["channels"][0]["alternatives"][0]
        return alt.get("transcript") or None
    except (KeyError, IndexError, TypeError, AttributeError, ValueError) as e:
        # TypeError / AttributeError cover payloads where a node is null or has
        # the wrong type (e.g. `"results": null`), which would otherwise escape
        # the "return None on failure" contract.
        logger.warning("Deepgram response malformed: %s", e)
        return None
def _clean_chinese_spaces(text: str) -> str:
    """Return ``text`` with every whitespace run that sits between two CJK
    characters (as defined by ``_CJK_SPACE_RE``) removed; other spacing is kept.
    """
    return re.sub(_CJK_SPACE_RE, "", text)
def _ffprobe_duration_sync(video_url: str, referer: Optional[str] = None) -> Optional[float]:
    """Ask ``ffprobe`` for the container duration of ``video_url``.

    Runs one blocking ffprobe subprocess that requests only ``format=duration``
    and prints the bare value. When ``referer`` is given it is passed to ffprobe
    as an HTTP ``Referer`` header ahead of the input URL.

    Returns:
        Duration in seconds as a positive float; None if ffprobe is missing,
        exceeds ``_DURATION_PROBE_TIMEOUT``, prints nothing, or prints something
        that is not a positive number.
    """
    header_args = ["-headers", f"Referer: {referer}\r\n"] if referer else []
    argv = [
        "ffprobe", "-v", "error", "-show_entries", "format=duration",
        "-of", "default=nw=1:nk=1",
        *header_args,
        video_url,
    ]
    try:
        proc = subprocess.run(
            argv, capture_output=True, text=True, timeout=_DURATION_PROBE_TIMEOUT
        )
    except (subprocess.TimeoutExpired, FileNotFoundError) as e:
        logger.info("ffprobe duration probe failed for %s: %s", video_url[:80], e)
        return None

    printed = (proc.stdout or "").strip()
    if not printed:
        return None
    try:
        seconds = float(printed)
    except ValueError:
        return None
    if seconds > 0:
        return seconds
    return None
async def probe_video_duration(
    video_url: str, platform: Optional[str] = None
) -> Optional[float]:
    """Probe a video URL's duration without blocking the event loop.

    Delegates to ``_ffprobe_duration_sync`` on a worker thread via
    ``asyncio.to_thread``. When ``platform`` is given, the Referer registered
    for it in ``_PLATFORM_REFERERS`` (if any) is forwarded to the probe. Each
    call spawns one ffprobe subprocess, so callers probing many URLs should
    bound their own parallelism.

    Returns:
        Duration in seconds, or None for an empty URL or a failed probe.
    """
    if not video_url:
        return None
    referer = None
    if platform:
        referer = _PLATFORM_REFERERS.get(platform)
    duration = await asyncio.to_thread(_ffprobe_duration_sync, video_url, referer)
    return duration
async def probe_durations_for_posts(
    platform: str, posts: list, concurrency: int = 8
) -> None:
    """In-place: probe each post's video URL and set post["duration_sec"] if found.

    Args:
        platform: Platform name used to extract the URL and pick the Referer.
        posts: Raw post dicts; non-dict entries and posts without a video URL
            (image-only posts) are skipped. ``None`` is treated as empty.
        concurrency: Maximum number of probes in flight. Values below 1 are
            raised to 1, since a zero-permit semaphore would block forever.

    Failures are silent: a post whose probe fails, or raises, simply does not
    get ``duration_sec`` and the remaining posts are still processed.
    """
    sem = asyncio.Semaphore(max(1, concurrency))

    async def _one(post: dict) -> None:
        try:
            url = extract_video_url(platform, post)
            if not url:
                return
            async with sem:
                d = await probe_video_duration(url, platform=platform)
        except Exception as e:
            # Isolate per-post failures so one malformed post cannot abort the
            # whole batch through asyncio.gather.
            logger.info("duration probe skipped for a %s post: %s", platform, e)
            return
        if d is not None:
            post["duration_sec"] = d

    await asyncio.gather(*(_one(p) for p in (posts or []) if isinstance(p, dict)))
def _get_api_key() -> Optional[str]:
    """Resolve the Deepgram API key from the environment.

    ``DEEPGRAM_KEY`` takes precedence over ``DEEPGRAM_API_KEY``. If neither is
    set to a non-empty value, ``dotenv.load_dotenv()`` is called (when the
    ``dotenv`` package is importable) and both variables are read once more.

    Returns:
        The key, or a falsy value (None or an empty string) when none is
        configured or ``dotenv`` is unavailable.
    """
    def _from_env() -> Optional[str]:
        return os.environ.get("DEEPGRAM_KEY") or os.environ.get("DEEPGRAM_API_KEY")

    found = _from_env()
    if found:
        return found
    try:
        from dotenv import load_dotenv
        load_dotenv()
    except ImportError:
        return None
    return _from_env()
async def transcribe_video_from_post(
    platform: str,
    post: dict[str, Any],
    *,
    model: str = DEEPGRAM_MODEL_DEFAULT,
    language: Optional[str] = None,
) -> Optional[str]:
    """Run the whole pipeline for one post: find the video, download it,
    extract audio, transcribe it, and tidy CJK spacing.

    Args:
        platform: Platform name; selects URL extraction, download strategy and
            the cache sub-directory under ``_CACHE_ROOT``.
        post: Raw post dict.
        model: Deepgram model name forwarded to the transcription request.
        language: Optional language code forwarded to the transcription request.

    Returns:
        The cleaned, stripped transcript, or None as soon as any stage yields
        nothing (no video URL, no API key, download / ffmpeg / Deepgram
        failure). Callers can treat None as "fall back to the post's own text".
    """
    source_url = extract_video_url(platform, post)
    if not source_url:
        return None

    deepgram_key = _get_api_key()
    if not deepgram_key:
        logger.warning("DEEPGRAM_KEY not set; skipping transcription for %s", platform)
        return None

    file_stem = _safe_stem(platform, post)
    cache_dir = _CACHE_ROOT / platform
    cache_dir.mkdir(parents=True, exist_ok=True)
    audio_file = cache_dir / f"{file_stem}.m4a"

    downloaded = await _download_video(
        platform, post, source_url, cache_dir / f"{file_stem}.mp4"
    )
    if not downloaded:
        return None

    # ffmpeg is a blocking subprocess call, so run it off the event loop.
    extracted = await asyncio.to_thread(_extract_m4a, downloaded, audio_file)
    if not extracted:
        return None

    raw_text = await _transcribe_deepgram(
        audio_file, deepgram_key, model=model, language=language
    )
    return _clean_chinese_spaces(raw_text).strip() if raw_text else None
|