transcription.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
"""Download a post's source video, extract audio, and transcribe it via Deepgram.

Used by platform detail() implementations whose posts ship raw video URLs and
don't already supply captions (X, sph, douyin; other platforms exposing a
``videos`` field go through a generic yt-dlp -> httpx path). YouTube watch
URLs are handled as well (yt-dlp) -- NOTE(review): the YouTube platform may
prefer its own captions endpoint instead; confirm which path it uses.
The module also provides ffprobe-based duration probing
(``probe_video_duration`` / ``probe_durations_for_posts``) that does not
download the video.

Pipeline per video (``transcribe_video_from_post``):
  1. extract_video_url(platform, post) -> source URL (page or direct)
  2. download to <project root>/.cache/content_videos/<platform>/<stem>.mp4
     - x      : yt-dlp on the post page URL (robust against rotating video URLs)
     - youtube: yt-dlp on the watch URL
     - douyin : httpx + Referer https://www.douyin.com/
     - sph    : httpx + Referer https://channels.weixin.qq.com/
     - others : yt-dlp on the page URL, falling back to httpx on videos[0]
  3. ffmpeg -> 16 kHz mono AAC 64 kbps .m4a next to the video
  4. POST the audio to Deepgram /v1/listen (model=whisper-large by default)
  5. strip whitespace Deepgram inserts between consecutive CJK characters

Returns the transcript text on success, None on failure (silent fallback).
Downloaded videos and extracted audio stay on disk and are reused by later
calls for the same post.
"""
  16. from __future__ import annotations
  17. import asyncio
  18. import logging
  19. import os
  20. import re
  21. import subprocess
  22. from pathlib import Path
  23. from typing import Any, Optional
  24. import httpx
  25. logger = logging.getLogger(__name__)
  26. DEEPGRAM_URL = "https://api.deepgram.com/v1/listen"
  27. DEEPGRAM_MODEL_DEFAULT = "whisper-large"
  28. DEEPGRAM_REQUEST_TIMEOUT = 600.0
  29. DOWNLOAD_TIMEOUT = 300
  30. FFMPEG_TIMEOUT = 600
  31. UA = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
  32. "(KHTML, like Gecko) Chrome/124.0 Safari/537.36")
  33. # 项目根目录 / .cache / content_videos —— 不再用系统 %TEMP%,避免被 Windows 偶发清理
  34. # 也避免 8GB+ 视频堆在 AppData\Local\Temp 看不见。
  35. # parents[4]: transcription.py → content/ → builtin/ → tools/ → agent/ → project root
  36. _CACHE_ROOT = Path(__file__).resolve().parents[4] / ".cache" / "content_videos"
  37. _SAFE_RE = re.compile(r"[^A-Za-z0-9._-]+")
  38. # Zero-width lookbehind/lookahead: remove whitespace strictly between CJK chars,
  39. # preserve CJK<->ASCII boundaries (e.g. "Remotion 是工具" stays intact).
  40. _CJK_SPACE_RE = re.compile(r"(?<=[一-鿿])\s+(?=[一-鿿])")
  41. # Referer headers required by some CDNs for ffprobe / yt-dlp / httpx to access video URLs.
  42. _PLATFORM_REFERERS = {
  43. "douyin": "https://www.douyin.com/",
  44. "sph": "https://channels.weixin.qq.com/",
  45. "xhs": "https://www.xiaohongshu.com/",
  46. "bili": "https://www.bilibili.com/",
  47. "weibo": "https://weibo.com/",
  48. }
  49. _DURATION_PROBE_TIMEOUT = 15
  50. def extract_video_url(platform: str, post: dict[str, Any]) -> Optional[str]:
  51. """Pluck a video URL (page or direct) out of a platform's raw post dict."""
  52. if platform == "x":
  53. vlist = post.get("video_url_list") or []
  54. if vlist:
  55. head = vlist[0]
  56. return head.get("video_url") if isinstance(head, dict) else head
  57. return None
  58. if platform == "youtube":
  59. vid = post.get("video_id") or post.get("content_id")
  60. return f"https://www.youtube.com/watch?v={vid}" if vid else None
  61. # Generic: aigc-channel platforms (xhs / gzh / sph / douyin / bili / zhihu /
  62. # weibo / toutiao / github) all expose video URLs under `videos[0]`.
  63. videos = post.get("videos") or []
  64. if videos:
  65. return videos[0]
  66. return None
  67. def _safe_stem(platform: str, post: dict[str, Any]) -> str:
  68. raw_id = (
  69. post.get("channel_content_id")
  70. or post.get("video_id")
  71. or post.get("content_id")
  72. or "item"
  73. )
  74. return f"{platform}_{_SAFE_RE.sub('_', str(raw_id))[:60]}"
  75. def _yt_dlp_download(url: str, target: Path) -> Optional[Path]:
  76. if target.exists() and target.stat().st_size > 0:
  77. return target
  78. # Format chain: 优先 muxed mp4(YouTube/X/douyin 通常命中,最快),
  79. # fallback 到 bestvideo+bestaudio + ffmpeg merge(bili 等 DASH-only 平台),
  80. # 最后兜底 best。
  81. cmd = ["yt-dlp", "-f", "best[ext=mp4]/bestvideo+bestaudio/best",
  82. "-o", str(target),
  83. "--no-playlist", "--quiet", "--no-warnings", url]
  84. try:
  85. r = subprocess.run(cmd, capture_output=True, text=True, timeout=DOWNLOAD_TIMEOUT)
  86. except (subprocess.TimeoutExpired, FileNotFoundError) as e:
  87. logger.warning("yt-dlp failed for %s: %s", url, e)
  88. return None
  89. if r.returncode != 0:
  90. logger.warning("yt-dlp non-zero for %s: %s", url, (r.stderr or r.stdout)[:200])
  91. return None
  92. if target.exists() and target.stat().st_size > 0:
  93. return target
  94. # yt-dlp may have written with a different extension
  95. for f in target.parent.glob(target.stem + ".*"):
  96. if f.is_file() and f.stat().st_size > 0:
  97. return f
  98. return None
  99. async def _httpx_download(url: str, target: Path, referer: Optional[str] = None) -> Optional[Path]:
  100. if target.exists() and target.stat().st_size > 0:
  101. return target
  102. headers = {"User-Agent": UA}
  103. if referer:
  104. headers["Referer"] = referer
  105. try:
  106. async with httpx.AsyncClient(
  107. timeout=DOWNLOAD_TIMEOUT, follow_redirects=True, headers=headers
  108. ) as client:
  109. async with client.stream("GET", url) as resp:
  110. if resp.status_code != 200:
  111. logger.warning("download HTTP %s for %s", resp.status_code, url)
  112. return None
  113. with target.open("wb") as f:
  114. async for chunk in resp.aiter_bytes(chunk_size=64 * 1024):
  115. f.write(chunk)
  116. except Exception as e:
  117. logger.warning("httpx download failed for %s: %s", url, e)
  118. return None
  119. return target if target.exists() and target.stat().st_size > 0 else None
  120. async def _download_video(
  121. platform: str, post: dict[str, Any], video_url: str, target: Path
  122. ) -> Optional[Path]:
  123. """Dispatch to the right downloader per platform.
  124. Per-platform strategies:
  125. x : yt-dlp on the tweet page URL (video URLs are signed/rotating)
  126. douyin : httpx direct with douyin.com Referer (video URL is a play API)
  127. sph : httpx direct with channels.weixin.qq.com Referer (stodownload link)
  128. youtube: yt-dlp on the watch URL
  129. For everything else (xhs / bili / weibo / zhihu / gzh / toutiao / github / ...):
  130. try yt-dlp on the post's page URL first (yt-dlp supports 1000+ sites including
  131. most aigc-channel platforms via cookies-free extractors), and fall back to
  132. plain httpx on `videos[0]` if yt-dlp can't handle it.
  133. """
  134. if platform == "x":
  135. page_url = post.get("link") or video_url
  136. return await asyncio.to_thread(_yt_dlp_download, page_url, target)
  137. if platform == "douyin":
  138. return await _httpx_download(video_url, target, referer="https://www.douyin.com/")
  139. if platform == "sph":
  140. return await _httpx_download(video_url, target, referer="https://channels.weixin.qq.com/")
  141. if platform == "youtube":
  142. return await asyncio.to_thread(_yt_dlp_download, video_url, target)
  143. # Generic two-step fallback for any other platform with a `videos` field.
  144. page_url = post.get("link")
  145. if page_url:
  146. result = await asyncio.to_thread(_yt_dlp_download, page_url, target)
  147. if result:
  148. return result
  149. logger.info("yt-dlp didn't handle %s page URL; falling back to httpx", platform)
  150. return await _httpx_download(video_url, target)
  151. def _extract_m4a(video_path: Path, audio_path: Path) -> bool:
  152. """ffmpeg: video -> 16kHz mono AAC 64kbps m4a. Returns True if file written."""
  153. audio_path.parent.mkdir(parents=True, exist_ok=True)
  154. if audio_path.exists() and audio_path.stat().st_size > 0:
  155. return True
  156. cmd = ["ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
  157. "-i", str(video_path),
  158. "-vn", "-ac", "1", "-ar", "16000",
  159. "-c:a", "aac", "-b:a", "64k",
  160. str(audio_path)]
  161. try:
  162. subprocess.run(cmd, check=True, timeout=FFMPEG_TIMEOUT,
  163. capture_output=True)
  164. except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError) as e:
  165. logger.warning("ffmpeg failed for %s: %s", video_path, e)
  166. return False
  167. return audio_path.exists() and audio_path.stat().st_size > 0
  168. async def _transcribe_deepgram(
  169. audio_path: Path,
  170. api_key: str,
  171. model: str = DEEPGRAM_MODEL_DEFAULT,
  172. language: Optional[str] = None,
  173. ) -> Optional[str]:
  174. params: dict[str, str] = {
  175. "model": model,
  176. "smart_format": "true",
  177. "punctuate": "true",
  178. }
  179. if language:
  180. params["language"] = language
  181. else:
  182. params["detect_language"] = "true"
  183. headers = {
  184. "Authorization": f"Token {api_key}",
  185. "Content-Type": "audio/mp4",
  186. }
  187. try:
  188. audio_bytes = audio_path.read_bytes()
  189. async with httpx.AsyncClient(timeout=DEEPGRAM_REQUEST_TIMEOUT) as client:
  190. r = await client.post(DEEPGRAM_URL, params=params, headers=headers,
  191. content=audio_bytes)
  192. except Exception as e:
  193. logger.warning("Deepgram request failed for %s: %s", audio_path.name, e)
  194. return None
  195. if r.status_code != 200:
  196. logger.warning("Deepgram HTTP %s: %s", r.status_code, r.text[:200])
  197. return None
  198. try:
  199. data = r.json()
  200. alt = data["results"]["channels"][0]["alternatives"][0]
  201. return alt.get("transcript") or None
  202. except (KeyError, IndexError, ValueError) as e:
  203. logger.warning("Deepgram response malformed: %s", e)
  204. return None
  205. def _clean_chinese_spaces(text: str) -> str:
  206. """Drop whitespace strictly between two CJK characters."""
  207. return _CJK_SPACE_RE.sub("", text)
  208. def _ffprobe_duration_sync(video_url: str, referer: Optional[str] = None) -> Optional[float]:
  209. """Read mp4 moov box over HTTP Range; returns duration (seconds) or None.
  210. Does NOT download the video stream — typically pulls only a few KB even for
  211. multi-GB files. Designed to be called from search() to enrich posts with
  212. duration before scoring, without paying the cost of a full download.
  213. """
  214. cmd = ["ffprobe", "-v", "error", "-show_entries", "format=duration",
  215. "-of", "default=nw=1:nk=1"]
  216. if referer:
  217. cmd += ["-headers", f"Referer: {referer}\r\n"]
  218. cmd += [video_url]
  219. try:
  220. r = subprocess.run(cmd, capture_output=True, text=True, timeout=_DURATION_PROBE_TIMEOUT)
  221. except (subprocess.TimeoutExpired, FileNotFoundError) as e:
  222. logger.info("ffprobe duration probe failed for %s: %s", video_url[:80], e)
  223. return None
  224. out = (r.stdout or "").strip()
  225. if not out:
  226. return None
  227. try:
  228. d = float(out)
  229. except ValueError:
  230. return None
  231. return d if d > 0 else None
  232. async def probe_video_duration(
  233. video_url: str, platform: Optional[str] = None
  234. ) -> Optional[float]:
  235. """Async wrapper. Probes mp4 duration via HTTP Range; returns seconds or None.
  236. Pass `platform` to auto-inject the right Referer header (douyin / sph / xhs / bili
  237. require it). Safe to call concurrently — uses asyncio.to_thread so subprocesses
  238. don't block the event loop. Each call is one ffprobe subprocess; cap parallelism
  239. at the call site if probing many URLs.
  240. """
  241. if not video_url:
  242. return None
  243. referer = _PLATFORM_REFERERS.get(platform) if platform else None
  244. return await asyncio.to_thread(_ffprobe_duration_sync, video_url, referer)
  245. async def probe_durations_for_posts(
  246. platform: str, posts: list, concurrency: int = 8
  247. ) -> None:
  248. """In-place: probe each post's video URL and set post["duration_sec"] if found.
  249. Skips posts with no video URL (image-only posts). Probes happen concurrently
  250. bounded by `concurrency` to avoid spawning a flood of ffprobe subprocesses.
  251. Failures are silent (post just won't have duration_sec — evaluator handles).
  252. """
  253. sem = asyncio.Semaphore(concurrency)
  254. async def _one(post: dict) -> None:
  255. url = extract_video_url(platform, post)
  256. if not url:
  257. return
  258. async with sem:
  259. d = await probe_video_duration(url, platform=platform)
  260. if d is not None:
  261. post["duration_sec"] = d
  262. await asyncio.gather(*[_one(p) for p in posts if isinstance(p, dict)])
  263. def _get_api_key() -> Optional[str]:
  264. key = os.environ.get("DEEPGRAM_KEY") or os.environ.get("DEEPGRAM_API_KEY")
  265. if key:
  266. return key
  267. try:
  268. from dotenv import load_dotenv
  269. load_dotenv()
  270. except ImportError:
  271. return None
  272. return os.environ.get("DEEPGRAM_KEY") or os.environ.get("DEEPGRAM_API_KEY")
  273. async def transcribe_video_from_post(
  274. platform: str,
  275. post: dict[str, Any],
  276. *,
  277. model: str = DEEPGRAM_MODEL_DEFAULT,
  278. language: Optional[str] = None,
  279. ) -> Optional[str]:
  280. """End-to-end: locate video, download, extract m4a, STT, clean spaces.
  281. Returns transcript text or None if any step fails (logged at WARNING level).
  282. Caller can safely ignore None and fall back to whatever body text it has.
  283. """
  284. url = extract_video_url(platform, post)
  285. if not url:
  286. return None
  287. api_key = _get_api_key()
  288. if not api_key:
  289. logger.warning("DEEPGRAM_KEY not set; skipping transcription for %s", platform)
  290. return None
  291. stem = _safe_stem(platform, post)
  292. work_dir = _CACHE_ROOT / platform
  293. work_dir.mkdir(parents=True, exist_ok=True)
  294. video_path = work_dir / f"{stem}.mp4"
  295. audio_path = work_dir / f"{stem}.m4a"
  296. video = await _download_video(platform, post, url, video_path)
  297. if not video:
  298. return None
  299. if not await asyncio.to_thread(_extract_m4a, video, audio_path):
  300. return None
  301. transcript = await _transcribe_deepgram(audio_path, api_key, model=model, language=language)
  302. if not transcript:
  303. return None
  304. return _clean_chinese_spaces(transcript).strip()