"""
YouTube platform implementation.

Backend: crawler.aiddit.com/crawler/youtube
"""

import asyncio
import hashlib
import io
import json
import logging
from typing import Any, Dict, List, Optional

import httpx

from agent.tools.models import ToolResult
from agent.tools.utils.image import build_image_grid, encode_base64, load_images
from agent.tools.builtin.content.registry import (
    PlatformDef,
    ParamSpec,
    register_platform,
)

logger = logging.getLogger(__name__)

CRAWLER_BASE_URL = "http://crawler.aiddit.com/crawler"
DEFAULT_TIMEOUT = 60.0


# ── Search ──

def _load_quality_evaluator() -> Optional[Any]:
    """Return a SourceQualityEvaluator instance, or None if the module is not importable."""
    try:
        # Imported lazily: the evaluator lives in an optional example package.
        from examples.process_pipeline.script.evaluate_source_quality import SourceQualityEvaluator
    except ImportError:
        return None
    return SourceQualityEvaluator()


async def search(
    platform_id: str,
    keyword: str,
    max_count: int = 20,
    cursor: str = "",
    extras: Optional[Dict[str, Any]] = None,
) -> ToolResult:
    """
    Search YouTube by keyword through the crawler backend.

    Args:
        platform_id: Accepted for interface compatibility; not used here.
        keyword: Search keyword sent to the backend.
        max_count: Maximum number of videos kept from the backend response
            (the truncation happens client-side).
        cursor: Accepted for interface compatibility; not used here.
        extras: Accepted for interface compatibility; not used here.

    Returns:
        A ToolResult whose output is a JSON summary (index/title/author/video_id
        plus optional quality score), with an optional thumbnail collage in
        ``images`` and the raw truncated video dicts in ``metadata["posts"]``.
        Any failure is reported through the ``error`` field, never raised.
    """
    try:
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
            response = await client.post(
                f"{CRAWLER_BASE_URL}/youtube/keyword",
                json={"keyword": keyword},
            )
            response.raise_for_status()
            data = response.json()

        if data.get("code") != 0:
            return ToolResult(title="YouTube 搜索失败", output="", error=data.get("msg", "未知错误"))

        result_data = data.get("data", {})
        videos = result_data.get("data", []) if isinstance(result_data, dict) else []
        # Guard against a null / non-list payload so slicing below cannot fail.
        if not isinstance(videos, list):
            videos = []
        shown = videos[:max_count]

        evaluator = _load_quality_evaluator()

        # Overview
        summary_list = []
        for idx, video in enumerate(shown, 1):
            score_info: Dict[str, Any] = {}
            if evaluator:
                try:
                    eval_res = evaluator.evaluate_post(video)
                    score_info = {
                        "quality_score": eval_res["total_score"],
                        "quality_grade": eval_res["grade"],
                    }
                    # Stash the scores on the video so the collage labels and
                    # the cached posts can reuse them.
                    video["_quality_score"] = eval_res["total_score"]
                    video["_quality_grade"] = eval_res["grade"]
                except Exception as exc:
                    # Scoring is best-effort: keep the result, just without a score.
                    logger.debug("Quality evaluation failed for a YouTube result: %s", exc)

            summary_item = {
                "index": idx,
                "title": video.get("title", ""),
                "author": video.get("author", ""),
                "video_id": video.get("video_id", ""),
            }
            summary_item.update(score_info)
            summary_list.append(summary_item)

        # Collage (decorative: a failure here must not discard the text results)
        images = []
        try:
            collage_obj = await _build_video_collage(shown)
        except Exception as exc:
            logger.warning("Failed to build youtube collage: %s", exc)
            collage_obj = None
        if collage_obj:
            images.append(collage_obj)

        return ToolResult(
            title=f"YouTube: {keyword}",
            output=json.dumps({"data": summary_list}, ensure_ascii=False, indent=2),
            long_term_memory=f"Searched YouTube for '{keyword}', {len(videos)} results.",
            images=images,
            metadata={"posts": shown},
        )
    except Exception as e:
        return ToolResult(title="YouTube 搜索异常", output="", error=str(e))


# ── Detail ──

async def _fetch_captions(client: httpx.AsyncClient, content_id: str) -> Optional[str]:
    """Fetch caption text for a video; return None when unavailable or on any error."""
    try:
        cap_resp = await client.post(
            f"{CRAWLER_BASE_URL}/youtube/captions",
            json={"content_id": content_id},
        )
        cap_resp.raise_for_status()
        cap_data = cap_resp.json()
    except Exception as exc:
        # Captions are optional; the detail call still succeeds without them.
        logger.warning("Failed to fetch youtube captions for %s: %s", content_id, exc)
        return None

    if not isinstance(cap_data, dict) or cap_data.get("code") != 0:
        return None
    inner = cap_data.get("data", {})
    if not isinstance(inner, dict):
        return None
    inner2 = inner.get("data", {})
    if not isinstance(inner2, dict):
        return None
    return inner2.get("content")


async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None) -> ToolResult:
    """
    YouTube detail: issues extra HTTP calls to fetch video info and captions,
    and optionally downloads the video.

    Args:
        post: Video dict (per the original note, taken from the search cache);
            only its ``video_id`` is read.
        extras: Optional flags. ``include_captions`` (default True) fetches
            captions; ``download_video`` (default False) downloads the video
            and, when captions exist, builds an outline from them. Captions are
            also fetched whenever ``download_video`` is set.

    Returns:
        A ToolResult whose output is the JSON detail followed by a fixed
        evaluation prompt. Any failure is reported through the ``error`` field,
        never raised.
    """
    extras = extras or {}
    content_id = post.get("video_id", "")
    include_captions = extras.get("include_captions", True)
    download_video = extras.get("download_video", False)

    if not content_id:
        # Without an id the backend request cannot succeed; fail fast and clearly.
        return ToolResult(title="详情获取失败", output="", error="缺少 video_id")

    try:
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
            resp = await client.post(
                f"{CRAWLER_BASE_URL}/youtube/detail",
                json={"content_id": content_id},
            )
            resp.raise_for_status()
            detail_data = resp.json()

            if detail_data.get("code") != 0:
                return ToolResult(title="详情获取失败", output="", error=detail_data.get("msg", "未知错误"))

            result_data = detail_data.get("data", {})
            video_info = result_data.get("data", {}) if isinstance(result_data, dict) else {}
            # Guard against a null / non-dict payload so the lookups below cannot fail.
            if not isinstance(video_info, dict):
                video_info = {}

            # Captions
            captions_text = None
            if include_captions or download_video:
                captions_text = await _fetch_captions(client, content_id)

        # Download
        video_path = None
        video_outline = None
        if download_video:
            # Imported lazily, only when a download is actually requested.
            from agent.tools.builtin.content.media import download_youtube_video, parse_srt_to_outline

            # Run the download in a worker thread so the event loop is not blocked.
            video_path = await asyncio.to_thread(download_youtube_video, content_id)
            if captions_text:
                video_outline = parse_srt_to_outline(captions_text)

        output_data = {
            "video_id": content_id,
            "title": video_info.get("title", ""),
            "channel": video_info.get("channel_account_name", ""),
            "description": video_info.get("body_text", ""),
            "like_count": video_info.get("like_count"),
            "comment_count": video_info.get("comment_count"),
            "content_link": video_info.get("content_link", ""),
            "captions": captions_text,
        }
        if download_video:
            output_data["video_path"] = video_path
            output_data["video_outline"] = video_outline

        output_json = json.dumps(output_data, ensure_ascii=False, indent=2)
        output_text = (
            output_json
            + "\n\n---\n请基于以上内容,从信息完整度、内容质量和实用价值三个角度,给出一句简短的内容评价。"
        )

        return ToolResult(
            title=f"YouTube 详情: {video_info.get('title', content_id)}",
            output=output_text,
            long_term_memory=f"YouTube detail for {content_id}" + (" with captions" if captions_text else ""),
        )
    except Exception as e:
        return ToolResult(title="YouTube 详情异常", output="", error=str(e))


# ── Collage ──

def _pick_thumbnail(video: Dict[str, Any]) -> Optional[str]:
    """Return the first usable thumbnail URL of a video dict, or None."""
    thumbnails = video.get("thumbnails")
    if isinstance(thumbnails, list) and thumbnails and isinstance(thumbnails[0], dict):
        url = thumbnails[0].get("url")
        if url:
            return url
    # Fall back to the flat keys when the list form is absent or empty.
    return video.get("thumbnail") or video.get("cover_url") or None


async def _build_video_collage(videos: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """
    Build a labelled thumbnail grid for the given videos.

    Returns:
        ``{"type": "url", "url": ...}`` when the CDN upload succeeds,
        ``{"type": "base64", "media_type": "image/png", "data": ...}`` as a
        fallback, or None when no thumbnail could be loaded.
    """
    urls, titles = [], []
    for video in videos:
        thumb = _pick_thumbnail(video)
        if not thumb:
            continue
        # Append URL and label together so the two lists stay index-aligned.
        urls.append(thumb)
        base_title = video.get("title", "")
        score = video.get("_quality_score")
        titles.append(f"[{score}分] {base_title}" if score is not None else base_title)

    if not urls:
        return None

    # Assumes load_images yields (url, image) pairs in the same order as
    # `urls`, with image None on failure — TODO confirm against the helper.
    loaded = await load_images(urls)
    valid_images, valid_labels = [], []
    for (_, img), title in zip(loaded, titles):
        if img is not None:
            valid_images.append(img)
            valid_labels.append(title)

    if not valid_images:
        return None

    grid = build_image_grid(images=valid_images, labels=valid_labels)

    buf = io.BytesIO()
    grid.save(buf, format="PNG")
    img_bytes = buf.getvalue()

    try:
        # Imported lazily inside the try so a missing CDN module also falls
        # back to the inline base64 payload.
        from agent.tools.builtin.file.image_cdn import _upload_bytes_to_oss

        # Content hash keeps the filename stable for identical collages.
        md5_hash = hashlib.md5(img_bytes).hexdigest()[:12]
        filename = f"youtube_collage_{md5_hash}.png"
        cdn_url = await _upload_bytes_to_oss(img_bytes, filename)
        return {"type": "url", "url": cdn_url}
    except Exception as e:
        logger.warning("Failed to upload youtube collage to CDN: %s", e)
        b64, _ = encode_base64(grid, format="PNG")
        return {"type": "base64", "media_type": "image/png", "data": b64}


# ── Registration ──

_YOUTUBE = PlatformDef(
    id="youtube",
    name="YouTube",
    aliases=["yt", "油管"],
    detail_extras={
        "include_captions": ParamSpec(note="是否获取字幕,默认 True"),
        "download_video": ParamSpec(note="是否下载视频到本地,默认 False"),
    },
)
_YOUTUBE.search_impl = search
_YOUTUBE.detail_impl = detail

register_platform(_YOUTUBE)