- """
- YouTube 平台实现
- 后端:crawler.aiddit.com/crawler/youtube
- """
- import json
- from typing import Any, Dict, List, Optional
- import httpx
- from agent.tools.models import ToolResult
- from agent.tools.utils.image import build_image_grid, encode_base64, load_images
- from agent.tools.builtin.content.registry import (
- PlatformDef, ParamSpec, register_platform,
- )
- CRAWLER_BASE_URL = "http://crawler.aiddit.com/crawler"
- DEFAULT_TIMEOUT = 60.0
# ── Search ──
async def search(
    platform_id: str,
    keyword: str,
    max_count: int = 20,
    cursor: str = "",
    extras: Optional[Dict[str, Any]] = None,
) -> ToolResult:
    """Search YouTube by keyword via the crawler backend.

    Args:
        platform_id: Platform identifier (part of the shared search interface;
            not forwarded to the backend call).
        keyword: Search keyword sent to the crawler.
        max_count: Maximum number of results to summarize and collage.
            This is a local slice; the backend request itself is unbounded.
        cursor: Pagination cursor — currently unused by this backend.
        extras: Extra options — currently unused.

    Returns:
        ToolResult with a JSON overview in ``output``, an optional thumbnail
        collage in ``images``, and the raw posts cached in ``metadata["posts"]``.
        On failure, a ToolResult with ``error`` set.
    """
    try:
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
            response = await client.post(
                f"{CRAWLER_BASE_URL}/youtube/keyword",
                json={"keyword": keyword},
            )
            response.raise_for_status()
            data = response.json()

        if data.get("code") != 0:
            return ToolResult(title="YouTube 搜索失败", output="", error=data.get("msg", "未知错误"))

        # Payload is nested: {"data": {"data": [videos...]}}.
        result_data = data.get("data", {})
        videos = result_data.get("data", []) if isinstance(result_data, dict) else []

        # Optional quality scoring; search still works if the module is absent.
        try:
            from examples.process_pipeline.script.evaluate_source_quality import SourceQualityEvaluator
            evaluator = SourceQualityEvaluator()
        except ImportError:
            evaluator = None

        # Build a compact per-video overview.
        summary_list = []
        for idx, video in enumerate(videos[:max_count], 1):
            score_info = {}
            if evaluator:
                try:
                    eval_res = evaluator.evaluate_post(video)
                    score_info = {
                        "quality_score": eval_res["total_score"],
                        "quality_grade": eval_res["grade"],
                    }
                    # Stash scores on the video so the collage can label thumbnails.
                    video["_quality_score"] = eval_res["total_score"]
                    video["_quality_grade"] = eval_res["grade"]
                except Exception:
                    # Scoring is best-effort: a single bad post must not break search.
                    pass

            summary_item = {
                "index": idx,
                "title": video.get("title", ""),
                "author": video.get("author", ""),
                "video_id": video.get("video_id", ""),
            }
            summary_item.update(score_info)
            summary_list.append(summary_item)

        # Thumbnail collage (best-effort; may be None).
        images = []
        collage_obj = await _build_video_collage(videos[:max_count])
        if collage_obj:
            images.append(collage_obj)

        return ToolResult(
            title=f"YouTube: {keyword}",
            output=json.dumps({"data": summary_list}, ensure_ascii=False, indent=2),
            long_term_memory=f"Searched YouTube for '{keyword}', {len(videos)} results.",
            images=images,
            metadata={"posts": videos[:max_count]},
        )
    except Exception as e:
        # Top-level boundary: surface any unexpected failure as a tool error.
        return ToolResult(title="YouTube 搜索异常", output="", error=str(e))
# ── Detail ──
async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None) -> ToolResult:
    """Fetch full detail for one YouTube video.

    Needs extra HTTP calls for captions and (optionally) a local download.

    Args:
        post: Cached search result; ``post["video_id"]`` identifies the video.
        extras: Options — ``include_captions`` (default True) and
            ``download_video`` (default False).

    Returns:
        ToolResult whose ``output`` is a JSON detail blob followed by a short
        evaluation prompt. On failure, a ToolResult with ``error`` set.
    """
    extras = extras or {}
    content_id = post.get("video_id", "")
    include_captions = extras.get("include_captions", True)
    download_video = extras.get("download_video", False)
    try:
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
            resp = await client.post(
                f"{CRAWLER_BASE_URL}/youtube/detail",
                json={"content_id": content_id},
            )
            resp.raise_for_status()
            detail_data = resp.json()

        if detail_data.get("code") != 0:
            return ToolResult(title="详情获取失败", output="", error=detail_data.get("msg", "未知错误"))

        # Payload is nested: {"data": {"data": {video fields...}}}.
        result_data = detail_data.get("data", {})
        video_info = result_data.get("data", {}) if isinstance(result_data, dict) else {}

        # Captions — also fetched when downloading, to build the outline below.
        captions_text = None
        if include_captions or download_video:
            try:
                async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
                    cap_resp = await client.post(
                        f"{CRAWLER_BASE_URL}/youtube/captions",
                        json={"content_id": content_id},
                    )
                    cap_resp.raise_for_status()
                    cap_data = cap_resp.json()
                # Captions payload is doubly nested: data -> data -> content.
                if cap_data.get("code") == 0:
                    inner = cap_data.get("data", {})
                    if isinstance(inner, dict):
                        inner2 = inner.get("data", {})
                        if isinstance(inner2, dict):
                            captions_text = inner2.get("content")
            except Exception:
                # Captions are optional; any failure simply leaves them out.
                pass

        # Optional local download + outline derived from the SRT captions.
        video_path = None
        video_outline = None
        if download_video:
            import asyncio
            from agent.tools.builtin.content.media import download_youtube_video, parse_srt_to_outline
            # The download is blocking; run it off the event loop.
            video_path = await asyncio.to_thread(download_youtube_video, content_id)
            if captions_text:
                video_outline = parse_srt_to_outline(captions_text)

        output_data = {
            "video_id": content_id,
            "title": video_info.get("title", ""),
            "channel": video_info.get("channel_account_name", ""),
            "description": video_info.get("body_text", ""),
            "like_count": video_info.get("like_count"),
            "comment_count": video_info.get("comment_count"),
            "content_link": video_info.get("content_link", ""),
            "captions": captions_text,
        }
        if download_video:
            output_data["video_path"] = video_path
            output_data["video_outline"] = video_outline

        output_json = json.dumps(output_data, ensure_ascii=False, indent=2)
        # Append an instruction asking the model for a one-line content review.
        output_text = (
            output_json
            + "\n\n---\n请基于以上内容,从信息完整度、内容质量和实用价值三个角度,给出一句简短的内容评价。"
        )
        return ToolResult(
            title=f"YouTube 详情: {video_info.get('title', content_id)}",
            output=output_text,
            long_term_memory=f"YouTube detail for {content_id}" + (" with captions" if captions_text else ""),
        )
    except Exception as e:
        # Top-level boundary: surface any unexpected failure as a tool error.
        return ToolResult(title="YouTube 详情异常", output="", error=str(e))
# ── Collage ──
async def _build_video_collage(videos: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Build a labeled thumbnail-grid image for *videos*.

    Tries ``thumbnails[0].url``, then ``thumbnail``, then ``cover_url`` for
    each video; titles are prefixed with ``[<score>分]`` when a quality score
    was stashed on the video by ``search``.

    Returns:
        An image descriptor dict — ``{"type": "url", "url": ...}`` when the
        PNG was uploaded to the CDN, or ``{"type": "base64", ...}`` as a
        fallback — or ``None`` when no thumbnails could be found/loaded.
        (The return annotation was previously ``Optional[str]``, which did
        not match either return path; corrected here.)
    """
    urls, titles = [], []
    for video in videos:
        # Pick the first available thumbnail field variant.
        thumb = None
        if "thumbnails" in video and isinstance(video["thumbnails"], list) and video["thumbnails"]:
            thumb = video["thumbnails"][0].get("url")
        elif "thumbnail" in video:
            thumb = video.get("thumbnail")
        elif "cover_url" in video:
            thumb = video.get("cover_url")
        if thumb:
            urls.append(thumb)
            base_title = video.get("title", "")
            score = video.get("_quality_score")
            if score is not None:
                title_with_score = f"[{score}分] {base_title}"
            else:
                title_with_score = base_title
            titles.append(title_with_score)

    if not urls:
        return None

    # Keep only thumbnails that actually loaded, with their matching labels.
    loaded = await load_images(urls)
    valid_images, valid_labels = [], []
    for (_, img), title in zip(loaded, titles):
        if img is not None:
            valid_images.append(img)
            valid_labels.append(title)
    if not valid_images:
        return None

    grid = build_image_grid(images=valid_images, labels=valid_labels)
    import io
    buf = io.BytesIO()
    grid.save(buf, format="PNG")
    img_bytes = buf.getvalue()

    # Prefer a CDN URL (content-addressed filename); fall back to inline base64.
    try:
        from agent.tools.builtin.file.image_cdn import _upload_bytes_to_oss
        import hashlib

        md5_hash = hashlib.md5(img_bytes).hexdigest()[:12]
        filename = f"youtube_collage_{md5_hash}.png"
        cdn_url = await _upload_bytes_to_oss(img_bytes, filename)
        return {"type": "url", "url": cdn_url}
    except Exception as e:
        import logging
        logging.getLogger(__name__).warning("Failed to upload youtube collage to CDN: %s", e)
        b64, _ = encode_base64(grid, format="PNG")
        return {"type": "base64", "media_type": "image/png", "data": b64}
# ── Registration ──
# Declare the YouTube platform and wire its search/detail implementations
# into the shared content-platform registry at import time.
_YOUTUBE = PlatformDef(
    id="youtube",
    name="YouTube",
    aliases=["yt", "油管"],
    detail_extras={
        "include_captions": ParamSpec(note="是否获取字幕,默认 True"),
        "download_video": ParamSpec(note="是否下载视频到本地,默认 False"),
    },
)
_YOUTUBE.search_impl = search
_YOUTUBE.detail_impl = detail
register_platform(_YOUTUBE)