youtube.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. """
  2. YouTube 平台实现
  3. 后端:crawler.aiddit.com/crawler/youtube
  4. """
  5. import json
  6. from typing import Any, Dict, List, Optional
  7. import httpx
  8. from agent.tools.models import ToolResult
  9. from agent.tools.utils.image import build_image_grid, encode_base64, load_images
  10. from agent.tools.builtin.content.registry import (
  11. PlatformDef, ParamSpec, register_platform,
  12. )
  13. CRAWLER_BASE_URL = "http://crawler.aiddit.com/crawler"
  14. DEFAULT_TIMEOUT = 60.0
# ── Search ──
  16. async def search(
  17. platform_id: str,
  18. keyword: str,
  19. max_count: int = 20,
  20. cursor: str = "",
  21. extras: Optional[Dict[str, Any]] = None,
  22. ) -> ToolResult:
  23. try:
  24. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  25. response = await client.post(
  26. f"{CRAWLER_BASE_URL}/youtube/keyword",
  27. json={"keyword": keyword},
  28. )
  29. response.raise_for_status()
  30. data = response.json()
  31. if data.get("code") != 0:
  32. return ToolResult(title="YouTube 搜索失败", output="", error=data.get("msg", "未知错误"))
  33. result_data = data.get("data", {})
  34. videos = result_data.get("data", []) if isinstance(result_data, dict) else []
  35. # 动态导入评价模块
  36. try:
  37. from examples.process_pipeline.script.evaluate_source_quality import SourceQualityEvaluator
  38. evaluator = SourceQualityEvaluator()
  39. except ImportError:
  40. evaluator = None
  41. # 概览
  42. summary_list = []
  43. for idx, video in enumerate(videos[:max_count], 1):
  44. score_info = {}
  45. if evaluator:
  46. try:
  47. eval_res = evaluator.evaluate_post(video)
  48. score_info = {
  49. "quality_score": eval_res["total_score"],
  50. "quality_grade": eval_res["grade"]
  51. }
  52. video["_quality_score"] = eval_res["total_score"]
  53. video["_quality_grade"] = eval_res["grade"]
  54. except Exception:
  55. pass
  56. summary_item = {
  57. "index": idx,
  58. "title": video.get("title", ""),
  59. "author": video.get("author", ""),
  60. "video_id": video.get("video_id", ""),
  61. }
  62. summary_item.update(score_info)
  63. summary_list.append(summary_item)
  64. # 拼图
  65. images = []
  66. collage_obj = await _build_video_collage(videos[:max_count])
  67. if collage_obj:
  68. images.append(collage_obj)
  69. return ToolResult(
  70. title=f"YouTube: {keyword}",
  71. output=json.dumps({"data": summary_list}, ensure_ascii=False, indent=2),
  72. long_term_memory=f"Searched YouTube for '{keyword}', {len(videos)} results.",
  73. images=images,
  74. metadata={"posts": videos[:max_count]},
  75. )
  76. except Exception as e:
  77. return ToolResult(title="YouTube 搜索异常", output="", error=str(e))
# ── Detail ──
  79. async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None) -> ToolResult:
  80. """
  81. YouTube 详情:需要额外 HTTP 调用获取字幕/下载等。
  82. post 来自搜索缓存,extras 支持 include_captions / download_video。
  83. """
  84. extras = extras or {}
  85. content_id = post.get("video_id", "")
  86. include_captions = extras.get("include_captions", True)
  87. download_video = extras.get("download_video", False)
  88. try:
  89. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  90. resp = await client.post(
  91. f"{CRAWLER_BASE_URL}/youtube/detail",
  92. json={"content_id": content_id},
  93. )
  94. resp.raise_for_status()
  95. detail_data = resp.json()
  96. if detail_data.get("code") != 0:
  97. return ToolResult(title="详情获取失败", output="", error=detail_data.get("msg", "未知错误"))
  98. result_data = detail_data.get("data", {})
  99. video_info = result_data.get("data", {}) if isinstance(result_data, dict) else {}
  100. # 字幕
  101. captions_text = None
  102. if include_captions or download_video:
  103. try:
  104. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  105. cap_resp = await client.post(
  106. f"{CRAWLER_BASE_URL}/youtube/captions",
  107. json={"content_id": content_id},
  108. )
  109. cap_resp.raise_for_status()
  110. cap_data = cap_resp.json()
  111. if cap_data.get("code") == 0:
  112. inner = cap_data.get("data", {})
  113. if isinstance(inner, dict):
  114. inner2 = inner.get("data", {})
  115. if isinstance(inner2, dict):
  116. captions_text = inner2.get("content")
  117. except Exception:
  118. pass
  119. # 下载
  120. video_path = None
  121. video_outline = None
  122. if download_video:
  123. import asyncio
  124. from agent.tools.builtin.content.media import download_youtube_video, parse_srt_to_outline
  125. video_path = await asyncio.to_thread(download_youtube_video, content_id)
  126. if captions_text:
  127. video_outline = parse_srt_to_outline(captions_text)
  128. output_data = {
  129. "video_id": content_id,
  130. "title": video_info.get("title", ""),
  131. "channel": video_info.get("channel_account_name", ""),
  132. "description": video_info.get("body_text", ""),
  133. "like_count": video_info.get("like_count"),
  134. "comment_count": video_info.get("comment_count"),
  135. "content_link": video_info.get("content_link", ""),
  136. "captions": captions_text,
  137. }
  138. if download_video:
  139. output_data["video_path"] = video_path
  140. output_data["video_outline"] = video_outline
  141. output_json = json.dumps(output_data, ensure_ascii=False, indent=2)
  142. output_text = (
  143. output_json
  144. + "\n\n---\n请基于以上内容,从信息完整度、内容质量和实用价值三个角度,给出一句简短的内容评价。"
  145. )
  146. return ToolResult(
  147. title=f"YouTube 详情: {video_info.get('title', content_id)}",
  148. output=output_text,
  149. long_term_memory=f"YouTube detail for {content_id}" + (" with captions" if captions_text else ""),
  150. )
  151. except Exception as e:
  152. return ToolResult(title="YouTube 详情异常", output="", error=str(e))
# ── Collage ──
  154. async def _build_video_collage(videos: List[Dict[str, Any]]) -> Optional[str]:
  155. urls, titles = [], []
  156. for video in videos:
  157. thumb = None
  158. if "thumbnails" in video and isinstance(video["thumbnails"], list) and video["thumbnails"]:
  159. thumb = video["thumbnails"][0].get("url")
  160. elif "thumbnail" in video:
  161. thumb = video.get("thumbnail")
  162. elif "cover_url" in video:
  163. thumb = video.get("cover_url")
  164. if thumb:
  165. urls.append(thumb)
  166. base_title = video.get("title", "")
  167. score = video.get("_quality_score")
  168. if score is not None:
  169. title_with_score = f"[{score}分] {base_title}"
  170. else:
  171. title_with_score = base_title
  172. titles.append(title_with_score)
  173. if not urls:
  174. return None
  175. loaded = await load_images(urls)
  176. valid_images, valid_labels = [], []
  177. for (_, img), title in zip(loaded, titles):
  178. if img is not None:
  179. valid_images.append(img)
  180. valid_labels.append(title)
  181. if not valid_images:
  182. return None
  183. grid = build_image_grid(images=valid_images, labels=valid_labels)
  184. import io
  185. buf = io.BytesIO()
  186. grid.save(buf, format="PNG")
  187. img_bytes = buf.getvalue()
  188. try:
  189. from agent.tools.builtin.file.image_cdn import _upload_bytes_to_oss
  190. import hashlib
  191. md5_hash = hashlib.md5(img_bytes).hexdigest()[:12]
  192. filename = f"youtube_collage_{md5_hash}.png"
  193. cdn_url = await _upload_bytes_to_oss(img_bytes, filename)
  194. return {"type": "url", "url": cdn_url}
  195. except Exception as e:
  196. import logging
  197. logging.getLogger(__name__).warning("Failed to upload youtube collage to CDN: %s", e)
  198. b64, _ = encode_base64(grid, format="PNG")
  199. return {"type": "base64", "media_type": "image/png", "data": b64}
# ── Registration ──
  201. _YOUTUBE = PlatformDef(
  202. id="youtube",
  203. name="YouTube",
  204. aliases=["yt", "油管"],
  205. detail_extras={
  206. "include_captions": ParamSpec(note="是否获取字幕,默认 True"),
  207. "download_video": ParamSpec(note="是否下载视频到本地,默认 False"),
  208. },
  209. )
  210. _YOUTUBE.search_impl = search
  211. _YOUTUBE.detail_impl = detail
  212. register_platform(_YOUTUBE)