youtube.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. """
  2. YouTube 平台实现
  3. 后端:crawler.aiddit.com/crawler/youtube
  4. """
  5. import json
  6. import re
  7. import time
  8. from typing import Any, Dict, List, Optional
  9. import httpx
  10. from agent.tools.models import ToolResult
  11. from agent.tools.utils.image import build_image_grid, encode_base64, load_images
  12. from agent.tools.builtin.content.registry import (
  13. PlatformDef, ParamSpec, register_platform,
  14. )
  15. CRAWLER_BASE_URL = "http://crawler.aiddit.com/crawler"
  16. DEFAULT_TIMEOUT = 60.0
  17. # ── 字段 normalization:YouTube 后端字段名跟 evaluator/其他平台不一致 ──
  18. #
  19. # evaluator 期待的字段 | YouTube 后端返回的字段
  20. # ------------------------+---------------------------
  21. # channel_content_id | video_id
  22. # body_text | description_snippet
  23. # like_count (int) | view_count ("130,461 views")
  24. # publish_timestamp (ms) | published_time ("6 months ago")
  25. # link | url
  26. # duration_sec (float) | duration ("6:15" or "1:23:45")
  27. # images (list[str]) | thumbnails (list[dict])
  28. # content_type=="video" | (缺失)
  29. # videos | (缺失)
  30. #
  31. # 不做 normalization 的话 evaluator 会走 article 路径 + 8 个字段全找不到,
  32. # 视频拿 15 分 F。
  33. def _parse_duration(s: Any) -> Optional[float]:
  34. """Parse 'MM:SS' or 'HH:MM:SS' to seconds (float)."""
  35. if not isinstance(s, str):
  36. return None
  37. parts = s.strip().split(":")
  38. try:
  39. nums = [int(p) for p in parts]
  40. except ValueError:
  41. return None
  42. if len(nums) == 2:
  43. return float(nums[0] * 60 + nums[1])
  44. if len(nums) == 3:
  45. return float(nums[0] * 3600 + nums[1] * 60 + nums[2])
  46. return None
  47. def _parse_view_count(s: Any) -> Optional[int]:
  48. """Parse '130,461 views' (or '1.2M views') to int."""
  49. if not isinstance(s, str):
  50. return None
  51. s = s.strip()
  52. # "1.2M views" / "3.5K views"
  53. m = re.match(r"([\d.]+)\s*([KMBkmb])\b", s)
  54. if m:
  55. try:
  56. num = float(m.group(1))
  57. except ValueError:
  58. return None
  59. mult = {"K": 1_000, "M": 1_000_000, "B": 1_000_000_000}[m.group(2).upper()]
  60. return int(num * mult)
  61. # "130,461 views"
  62. m = re.search(r"([\d,]+)", s)
  63. if m:
  64. try:
  65. return int(m.group(1).replace(",", ""))
  66. except ValueError:
  67. return None
  68. return None
  69. _RELATIVE_TIME_RE = re.compile(
  70. r"(\d+)\s+(minute|hour|day|week|month|year)s?\s+ago", re.IGNORECASE
  71. )
  72. _SECONDS_PER = {
  73. "minute": 60, "hour": 3600, "day": 86400,
  74. "week": 86400 * 7, "month": 86400 * 30, "year": 86400 * 365,
  75. }
  76. def _parse_relative_time(s: Any) -> Optional[int]:
  77. """Parse '6 months ago' -> UTC milliseconds timestamp."""
  78. if not isinstance(s, str):
  79. return None
  80. m = _RELATIVE_TIME_RE.search(s.lower())
  81. if not m:
  82. return None
  83. n = int(m.group(1))
  84. delta = n * _SECONDS_PER.get(m.group(2).lower(), 0)
  85. if not delta:
  86. return None
  87. return int((time.time() - delta) * 1000)
  88. def _normalize_youtube_post(post: Dict[str, Any]) -> None:
  89. """In-place: rewrite YouTube post fields onto the schema evaluator/transcription expect.
  90. Idempotent — only fills missing fields, never overwrites existing values.
  91. """
  92. if not isinstance(post, dict):
  93. return
  94. if post.get("video_id") and not post.get("channel_content_id"):
  95. post["channel_content_id"] = post["video_id"]
  96. if post.get("description_snippet") and not post.get("body_text"):
  97. post["body_text"] = post["description_snippet"]
  98. if post.get("view_count") and not isinstance(post.get("like_count"), (int, float)):
  99. n = _parse_view_count(post["view_count"])
  100. if n is not None:
  101. post["like_count"] = n
  102. if post.get("published_time") and not post.get("publish_timestamp"):
  103. ts = _parse_relative_time(post["published_time"])
  104. if ts:
  105. post["publish_timestamp"] = ts
  106. if post.get("url") and not post.get("link"):
  107. post["link"] = post["url"]
  108. if post.get("duration") and not isinstance(post.get("duration_sec"), (int, float)):
  109. sec = _parse_duration(post["duration"])
  110. if sec:
  111. post["duration_sec"] = sec
  112. if post.get("thumbnails") and not post.get("images"):
  113. imgs = []
  114. for t in post["thumbnails"]:
  115. if isinstance(t, dict) and t.get("url"):
  116. imgs.append(t["url"])
  117. if imgs:
  118. post["images"] = imgs
  119. if not post.get("content_type"):
  120. post["content_type"] = "video"
  121. if not post.get("videos"):
  122. # transcription.extract_video_url for "youtube" uses video_id directly,
  123. # so this `videos` field is just for evaluator.is_video detection.
  124. url = post.get("url")
  125. if url:
  126. post["videos"] = [url]
  127. # ── 搜索 ──
  128. async def search(
  129. platform_id: str,
  130. keyword: str,
  131. max_count: int = 20,
  132. cursor: str = "",
  133. extras: Optional[Dict[str, Any]] = None,
  134. ) -> ToolResult:
  135. try:
  136. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  137. response = await client.post(
  138. f"{CRAWLER_BASE_URL}/youtube/keyword",
  139. json={"keyword": keyword},
  140. )
  141. response.raise_for_status()
  142. data = response.json()
  143. if data.get("code") != 0:
  144. return ToolResult(title="YouTube 搜索失败", output="", error=data.get("msg", "未知错误"))
  145. result_data = data.get("data", {})
  146. videos = result_data.get("data", []) if isinstance(result_data, dict) else []
  147. # YouTube 字段名跟其他平台不一致,先 normalize 让 evaluator 能正确评分
  148. # (并且让 duration_sec / publish_timestamp 等被解析出来,复用 video-mode 评分)
  149. for v in videos:
  150. _normalize_youtube_post(v)
  151. # 动态导入评价模块
  152. try:
  153. from examples.process_pipeline.script.evaluate_source_quality import SourceQualityEvaluator
  154. evaluator = SourceQualityEvaluator()
  155. except ImportError:
  156. evaluator = None
  157. # 概览
  158. summary_list = []
  159. for idx, video in enumerate(videos[:max_count], 1):
  160. score_info = {}
  161. if evaluator:
  162. try:
  163. eval_res = evaluator.evaluate_post(video)
  164. score_info = {
  165. "quality_score": eval_res["total_score"],
  166. "quality_grade": eval_res["grade"]
  167. }
  168. video["_quality_score"] = eval_res["total_score"]
  169. video["_quality_grade"] = eval_res["grade"]
  170. except Exception:
  171. pass
  172. summary_item = {
  173. "index": idx,
  174. "title": video.get("title", ""),
  175. "author": video.get("author", ""),
  176. "video_id": video.get("video_id", ""),
  177. }
  178. summary_item.update(score_info)
  179. summary_list.append(summary_item)
  180. # 拼图
  181. images = []
  182. collage_obj = await _build_video_collage(videos[:max_count])
  183. if collage_obj:
  184. images.append(collage_obj)
  185. return ToolResult(
  186. title=f"YouTube: {keyword}",
  187. output=json.dumps({"data": summary_list}, ensure_ascii=False, indent=2),
  188. long_term_memory=f"Searched YouTube for '{keyword}', {len(videos)} results.",
  189. images=images,
  190. metadata={"posts": videos[:max_count]},
  191. )
  192. except Exception as e:
  193. return ToolResult(title="YouTube 搜索异常", output="", error=str(e))
  194. # ── 详情 ──
  195. async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None) -> ToolResult:
  196. """
  197. YouTube 详情:需要额外 HTTP 调用获取字幕/下载等。
  198. post 来自搜索缓存,extras 支持 include_captions / download_video。
  199. Graceful degrade: 三条数据通路(/youtube/detail 增强元数据、/youtube/captions 官方字幕、
  200. Deepgram 自研转写)独立进行,任何一条失败都不影响其他。特别是 Deepgram 走的是
  201. yt-dlp 下载 watch URL → ffmpeg → Deepgram API,跟 crawler.aiddit.com 后端无关,
  202. 后端宕机时仍应自动跑 transcript。
  203. """
  204. extras = extras or {}
  205. content_id = post.get("video_id") or post.get("channel_content_id", "")
  206. include_captions = extras.get("include_captions", True)
  207. download_video = extras.get("download_video", False)
  208. include_transcript = extras.get("include_transcript", True)
  209. # ── 1) /youtube/detail:拿增强元数据(标题/描述/点赞等)。失败时用 search post 兜底 ──
  210. video_info: Dict[str, Any] = {}
  211. detail_error: Optional[str] = None
  212. try:
  213. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  214. resp = await client.post(
  215. f"{CRAWLER_BASE_URL}/youtube/detail",
  216. json={"content_id": content_id},
  217. )
  218. resp.raise_for_status()
  219. detail_data = resp.json()
  220. if detail_data.get("code") == 0:
  221. result_data = detail_data.get("data", {})
  222. video_info = result_data.get("data", {}) if isinstance(result_data, dict) else {}
  223. else:
  224. detail_error = detail_data.get("msg") or "未知错误"
  225. except Exception as e:
  226. detail_error = str(e)
  227. # ── 2) /youtube/captions:官方字幕(也走 crawler 后端,同样可能挂) ──
  228. captions_text: Optional[str] = None
  229. if include_captions or download_video:
  230. try:
  231. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  232. cap_resp = await client.post(
  233. f"{CRAWLER_BASE_URL}/youtube/captions",
  234. json={"content_id": content_id},
  235. )
  236. cap_resp.raise_for_status()
  237. cap_data = cap_resp.json()
  238. if cap_data.get("code") == 0:
  239. inner = cap_data.get("data", {})
  240. if isinstance(inner, dict):
  241. inner2 = inner.get("data", {})
  242. if isinstance(inner2, dict):
  243. captions_text = inner2.get("content")
  244. except Exception:
  245. pass
  246. # ── 3) 视频文件下载(用户显式 extras.download_video=True 时才跑) ──
  247. video_path = None
  248. video_outline = None
  249. if download_video:
  250. import asyncio
  251. try:
  252. from agent.tools.builtin.content.media import download_youtube_video, parse_srt_to_outline
  253. video_path = await asyncio.to_thread(download_youtube_video, content_id)
  254. if captions_text:
  255. video_outline = parse_srt_to_outline(captions_text)
  256. except Exception as e:
  257. import logging
  258. logging.getLogger(__name__).warning("youtube download_video failed: %s", e)
  259. # ── 4) Deepgram 转写:独立于 1)/2),走 yt-dlp+Deepgram,不依赖 crawler 后端 ──
  260. #
  261. # 三态语义(跟 extract_sources / aigc_channel.detail 对齐):
  262. # 字段缺失 → 没尝试过,跑 Deepgram
  263. # 字段 = "" → 尝试过但失败,跳过(保护 Deepgram 额度)
  264. # 字段 = text → 已成功,复用
  265. transcript_text: Optional[str] = post.get("video_transcript") or None
  266. field_present = "video_transcript" in post
  267. transcribe_error: Optional[str] = None
  268. if not field_present and include_transcript:
  269. from agent.tools.builtin.content.transcription import transcribe_video_from_post
  270. if not post.get("video_id"):
  271. post["video_id"] = content_id
  272. try:
  273. transcript_text = await transcribe_video_from_post("youtube", post)
  274. except Exception as e:
  275. import logging
  276. logging.getLogger(__name__).warning("youtube transcribe failed: %s", e)
  277. transcript_text = None
  278. transcribe_error = f"{type(e).__name__}: {e}"
  279. # 三态写回:成功 = text;失败/None = "" 作为"已尝试"标记
  280. final_value = transcript_text or ""
  281. post["video_transcript"] = final_value
  282. if not final_value:
  283. post["_transcribe_error"] = (
  284. transcribe_error
  285. or "transcribe returned None (yt-dlp/Deepgram 任一步失败,见 logger.warning)"
  286. )
  287. # cache writeback 失败的 "" 也写,下次 cache hit 短路
  288. import os as _os
  289. from agent.tools.builtin.content import cache as _cache
  290. trace_id = extras.get("__trace_id__") or _os.getenv("TRACE_ID")
  291. if trace_id and content_id:
  292. _cache.update_post_field(trace_id, "youtube", content_id, "video_transcript", final_value)
  293. # ── 5) 组装输出:detail 接口的字段优先,缺失时用 search post 兜底 ──
  294. output_data = {
  295. "video_id": content_id,
  296. "title": video_info.get("title") or post.get("title", ""),
  297. "channel": video_info.get("channel_account_name") or post.get("author", ""),
  298. "description": (
  299. video_info.get("body_text")
  300. or post.get("body_text")
  301. or post.get("description_snippet", "")
  302. ),
  303. "like_count": (
  304. video_info.get("like_count")
  305. if video_info.get("like_count") is not None
  306. else post.get("like_count")
  307. ),
  308. "comment_count": video_info.get("comment_count"),
  309. "content_link": video_info.get("content_link") or post.get("link", ""),
  310. "captions": captions_text, # YouTube 官方字幕(可能为空)
  311. # Deepgram 转写:读 post 字段,三态语义自然透出("" = 已尝试失败)
  312. "video_transcript": post.get("video_transcript", ""),
  313. }
  314. if detail_error:
  315. # 显式标记 graceful degrade 状态,让上层知道这次走的是 fallback
  316. output_data["_detail_backend_error"] = detail_error
  317. if post.get("_transcribe_error"):
  318. # Deepgram 这一路失败原因透到 output,方便 agent/用户判断要不要重试
  319. output_data["_transcribe_error"] = post["_transcribe_error"]
  320. if download_video:
  321. output_data["video_path"] = video_path
  322. output_data["video_outline"] = video_outline
  323. output_text = json.dumps(output_data, ensure_ascii=False, indent=2)
  324. memory_parts = []
  325. if captions_text:
  326. memory_parts.append("captions")
  327. if transcript_text and transcript_text != captions_text:
  328. memory_parts.append("transcript")
  329. if detail_error:
  330. memory_parts.append(f"degraded(detail backend down)")
  331. memory_extra = f" with {'+'.join(memory_parts)}" if memory_parts else ""
  332. title = video_info.get("title") or post.get("title") or content_id
  333. return ToolResult(
  334. title=f"YouTube 详情: {title}",
  335. output=output_text,
  336. long_term_memory=f"YouTube detail for {content_id}{memory_extra}",
  337. )
  338. # ── 拼图 ──
  339. async def _build_video_collage(videos: List[Dict[str, Any]]) -> Optional[str]:
  340. urls, titles = [], []
  341. for video in videos:
  342. thumb = None
  343. if "thumbnails" in video and isinstance(video["thumbnails"], list) and video["thumbnails"]:
  344. thumb = video["thumbnails"][0].get("url")
  345. elif "thumbnail" in video:
  346. thumb = video.get("thumbnail")
  347. elif "cover_url" in video:
  348. thumb = video.get("cover_url")
  349. if thumb:
  350. urls.append(thumb)
  351. base_title = video.get("title", "")
  352. score = video.get("_quality_score")
  353. if score is not None:
  354. title_with_score = f"[{score}分] {base_title}"
  355. else:
  356. title_with_score = base_title
  357. titles.append(title_with_score)
  358. if not urls:
  359. return None
  360. loaded = await load_images(urls)
  361. valid_images, valid_labels = [], []
  362. for (_, img), title in zip(loaded, titles):
  363. if img is not None:
  364. valid_images.append(img)
  365. valid_labels.append(title)
  366. if not valid_images:
  367. return None
  368. grid = build_image_grid(images=valid_images, labels=valid_labels)
  369. import io
  370. buf = io.BytesIO()
  371. grid.save(buf, format="PNG")
  372. img_bytes = buf.getvalue()
  373. try:
  374. from agent.tools.builtin.file.image_cdn import _upload_bytes_to_oss
  375. import hashlib
  376. md5_hash = hashlib.md5(img_bytes).hexdigest()[:12]
  377. filename = f"youtube_collage_{md5_hash}.png"
  378. cdn_url = await _upload_bytes_to_oss(img_bytes, filename)
  379. return {"type": "url", "url": cdn_url}
  380. except Exception as e:
  381. import logging
  382. logging.getLogger(__name__).warning("Failed to upload youtube collage to CDN: %s", e)
  383. b64, _ = encode_base64(grid, format="PNG")
  384. return {"type": "base64", "media_type": "image/png", "data": b64}
  385. # ── 注册 ──
  386. _YOUTUBE = PlatformDef(
  387. id="youtube",
  388. name="YouTube",
  389. aliases=["yt", "油管"],
  390. detail_extras={
  391. "include_captions": ParamSpec(note="是否获取字幕,默认 True"),
  392. "download_video": ParamSpec(note="是否下载视频到本地,默认 False"),
  393. },
  394. )
  395. _YOUTUBE.search_impl = search
  396. _YOUTUBE.detail_impl = detail
  397. register_platform(_YOUTUBE)