|
|
@@ -0,0 +1,595 @@
|
|
|
+"""
|
|
|
+爬虫服务工具模块
|
|
|
+
|
|
|
+提供 YouTube、X (Twitter) 和微信/通用链接的搜索和详情查询功能。
|
|
|
+"""
|
|
|
+
|
|
|
+import asyncio
|
|
|
+import base64
|
|
|
+import io
|
|
|
+import json
|
|
|
+import math
|
|
|
+import os
|
|
|
+import subprocess
|
|
|
+import tempfile
|
|
|
+from pathlib import Path
|
|
|
+from typing import Optional, List, Dict, Any
|
|
|
+
|
|
|
+import httpx
|
|
|
+from PIL import Image, ImageDraw, ImageFont
|
|
|
+
|
|
|
+from agent.tools import tool, ToolResult
|
|
|
+
|
|
|
+
|
|
|
+# API 配置
|
|
|
+CRAWLER_BASE_URL = "http://crawler.aiddit.com/crawler"
|
|
|
+AIGC_BASE_URL = "http://aigc-channel.aiddit.com/aigc/channel"
|
|
|
+DEFAULT_TIMEOUT = 60.0
|
|
|
+
|
|
|
+# 拼接图配置
|
|
|
+THUMB_WIDTH = 250
|
|
|
+THUMB_HEIGHT = 250
|
|
|
+TEXT_HEIGHT = 80
|
|
|
+GRID_COLS = 5
|
|
|
+PADDING = 12
|
|
|
+BG_COLOR = (255, 255, 255)
|
|
|
+TEXT_COLOR = (30, 30, 30)
|
|
|
+INDEX_COLOR = (220, 60, 60)
|
|
|
+
|
|
|
+# 视频处理相关配置
|
|
|
+VIDEO_DOWNLOAD_DIR = Path(tempfile.gettempdir()) / "youtube_videos"
|
|
|
+VIDEO_DOWNLOAD_DIR.mkdir(exist_ok=True)
|
|
|
+
|
|
|
+
|
|
|
+# ── 辅助函数 ──
|
|
|
+
|
|
|
+def _truncate_text(text: str, max_len: int = 14) -> str:
|
|
|
+ """截断文本,超出部分用省略号"""
|
|
|
+ return text[:max_len] + "..." if len(text) > max_len else text
|
|
|
+
|
|
|
+
|
|
|
async def _download_image(client: httpx.AsyncClient, url: str) -> Optional[Image.Image]:
    """Fetch one image over HTTP and return it as an RGB PIL image; None on any failure."""
    try:
        response = await client.get(url, timeout=15.0)
        response.raise_for_status()
        raw = io.BytesIO(response.content)
        return Image.open(raw).convert("RGB")
    except Exception:
        # Best effort: one missing thumbnail must not abort the whole collage.
        return None
|
|
|
+
|
|
|
+
|
|
|
async def _build_video_collage(videos: List[Dict[str, Any]]) -> Optional[str]:
    """
    Stitch video thumbnails, index badges and titles into a grid image.

    Args:
        videos: list of video/tweet dicts; a thumbnail URL is looked up under
            "thumbnails"[0]["url"], "thumbnail" or "cover_url" (first match wins).

    Returns:
        The collage as a base64-encoded PNG string, or None when the input is
        empty or no thumbnail could be resolved/downloaded.
    """
    if not videos:
        return None

    # Resolve one thumbnail URL per video; videos without any usable
    # thumbnail field are skipped entirely.
    items = []
    for idx, video in enumerate(videos):
        thumbnail = None
        if "thumbnails" in video and isinstance(video["thumbnails"], list) and video["thumbnails"]:
            thumbnail = video["thumbnails"][0].get("url")
        elif "thumbnail" in video:
            thumbnail = video.get("thumbnail")
        elif "cover_url" in video:
            thumbnail = video.get("cover_url")

        title = video.get("title", "") or video.get("text", "")
        if thumbnail:
            items.append({"url": thumbnail, "title": title, "index": idx + 1})
    if not items:
        return None

    # Download all thumbnails concurrently; failures come back as None.
    async with httpx.AsyncClient() as client:
        tasks = [_download_image(client, item["url"]) for item in items]
        downloaded = await asyncio.gather(*tasks)

    valid = [(item, img) for item, img in zip(items, downloaded) if img is not None]
    if not valid:
        return None

    # Grid geometry is based on the number of successfully downloaded images.
    cols = min(GRID_COLS, len(valid))
    rows = math.ceil(len(valid) / cols)
    cell_w = THUMB_WIDTH + PADDING
    cell_h = THUMB_HEIGHT + TEXT_HEIGHT + PADDING
    canvas_w = cols * cell_w + PADDING
    canvas_h = rows * cell_h + PADDING

    canvas = Image.new("RGB", (canvas_w, canvas_h), BG_COLOR)
    draw = ImageDraw.Draw(canvas)

    # Try to find a CJK-capable font (Windows / macOS / Linux candidates);
    # fall back to PIL's built-in bitmap font.
    font_title = None
    font_index = None
    font_candidates = [
        "msyh.ttc", "simhei.ttf", "simsun.ttc",
        "/System/Library/Fonts/PingFang.ttc",
        "/usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf",
        "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
        "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
    ]
    for font_path in font_candidates:
        try:
            font_title = ImageFont.truetype(font_path, 16)
            font_index = ImageFont.truetype(font_path, 32)
            break
        except Exception:
            continue
    if not font_title:
        font_title = ImageFont.load_default()
        font_index = font_title

    for cell_i, (item, img) in enumerate(valid):
        idx = item["index"]
        # FIX: cells are positioned by the item's slot among the successfully
        # downloaded images (cell_i), not by its original search index.  The
        # canvas is sized from len(valid), so positioning by the original
        # index left blank cells and silently clipped items drawn past the
        # last row whenever some thumbnails failed to download.  The badge
        # still shows the original 1-based search index so captions/summaries
        # remain consistent with the numbers the model sees.
        col = cell_i % cols
        row = cell_i // cols
        x = PADDING + col * cell_w
        y = PADDING + row * cell_h

        # Scale the thumbnail to fit the cell while preserving aspect ratio,
        # then center it within the cell.
        scale = min(THUMB_WIDTH / img.width, THUMB_HEIGHT / img.height)
        new_w = int(img.width * scale)
        new_h = int(img.height * scale)
        thumb = img.resize((new_w, new_h), Image.LANCZOS)
        offset_x = x + (THUMB_WIDTH - new_w) // 2
        offset_y = y + (THUMB_HEIGHT - new_h) // 2
        canvas.paste(thumb, (offset_x, offset_y))

        # Red index badge in the thumbnail's top-left corner.
        index_text = str(idx)
        idx_x = offset_x
        idx_y = offset_y + 4
        box_size = 52
        draw.rectangle([idx_x, idx_y, idx_x + box_size, idx_y + box_size], fill=INDEX_COLOR)
        bbox = draw.textbbox((0, 0), index_text, font=font_index)
        tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
        text_x = idx_x + (box_size - tw) // 2
        text_y = idx_y + (box_size - th) // 2
        draw.text((text_x, text_y), index_text, fill=(255, 255, 255), font=font_index)

        # Character-wise word wrap of the title under the thumbnail (CJK text
        # has no spaces to break on, so wrapping is per character).
        title = item["title"] or ""
        if title:
            lines = []
            current_line = ""
            for ch in title:
                test_line = current_line + ch
                bbox_line = draw.textbbox((0, 0), test_line, font=font_title)
                if bbox_line[2] - bbox_line[0] > THUMB_WIDTH:
                    if current_line:
                        lines.append(current_line)
                    current_line = ch
                else:
                    current_line = test_line
            if current_line:
                lines.append(current_line)
            for line_i, line in enumerate(lines):
                draw.text((x, y + THUMB_HEIGHT + 6 + line_i * 22), line, fill=TEXT_COLOR, font=font_title)

    buf = io.BytesIO()
    canvas.save(buf, format="PNG")
    return base64.b64encode(buf.getvalue()).decode("utf-8")
|
|
|
+
|
|
|
+
|
|
|
+def _parse_srt_to_outline(srt_content: str) -> List[Dict[str, str]]:
|
|
|
+ """解析 SRT 字幕,生成带时间戳的大纲"""
|
|
|
+ if not srt_content:
|
|
|
+ return []
|
|
|
+
|
|
|
+ outline = []
|
|
|
+ blocks = srt_content.strip().split('\n\n')
|
|
|
+ for block in blocks:
|
|
|
+ lines = block.strip().split('\n')
|
|
|
+ if len(lines) >= 3:
|
|
|
+ timestamp_line = lines[1]
|
|
|
+ if '-->' in timestamp_line:
|
|
|
+ start_time = timestamp_line.split('-->')[0].strip()
|
|
|
+ text = ' '.join(lines[2:])
|
|
|
+ outline.append({'timestamp': start_time, 'text': text})
|
|
|
+ return outline
|
|
|
+
|
|
|
+
|
|
|
def _download_youtube_video(video_id: str) -> Optional[str]:
    """Download a YouTube video with yt-dlp; return the local mp4 path, or None on failure."""
    try:
        target = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4"
        # Reuse a previously downloaded copy when one exists.
        if target.exists():
            return str(target)

        command = [
            'yt-dlp',
            '-f', 'best[ext=mp4]',
            '-o', str(target),
            f'https://www.youtube.com/watch?v={video_id}'
        ]
        completed = subprocess.run(command, capture_output=True, text=True, timeout=300)

        if completed.returncode == 0 and target.exists():
            return str(target)
        return None
    except Exception:
        # Best effort: missing yt-dlp binary, timeout, etc. all map to None;
        # the caller reports the failure to the user.
        return None
|
|
|
+
|
|
|
+
|
|
|
+# ── YouTube 工具 ──
|
|
|
+
|
|
|
@tool()
async def youtube_search(keyword: str) -> ToolResult:
    """
    Search YouTube videos by keyword.

    Args:
        keyword: the search term.

    Returns:
        Search results with video title, ID and channel information, plus an
        optional thumbnail collage attached as an image.
    """
    try:
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
            response = await client.post(
                f"{CRAWLER_BASE_URL}/youtube/keyword",
                json={"keyword": keyword}
            )
            response.raise_for_status()
            data = response.json()

        # Guard clause: non-zero code means the crawler reported a failure.
        if data.get("code") != 0:
            return ToolResult(
                title="YouTube 搜索失败",
                output="",
                error=f"搜索失败: {data.get('msg', '未知错误')}"
            )

        result_data = data.get("data", {})
        videos = result_data.get("data", []) if isinstance(result_data, dict) else []

        # Attach a thumbnail collage when one can be built.
        images = []
        collage_b64 = await _build_video_collage(videos)
        if collage_b64:
            images.append({
                "type": "base64",
                "media_type": "image/png",
                "data": collage_b64
            })

        # One summary line per video, capped at the first 20 results.
        summary_list = [
            f"{idx}. {video.get('title', '')} - {video.get('author', '')} (ID: {video.get('video_id', '')})"
            for idx, video in enumerate(videos[:20], 1)
        ]

        output_data = {
            "keyword": keyword,
            "total": len(videos),
            "summary": summary_list,
            "data": videos
        }

        return ToolResult(
            title=f"YouTube 搜索: {keyword}",
            output=json.dumps(output_data, ensure_ascii=False, indent=2),
            long_term_memory=f"Searched YouTube for '{keyword}', found {len(videos)} videos",
            images=images
        )

    except Exception as e:
        return ToolResult(
            title="YouTube 搜索异常",
            output="",
            error=str(e)
        )
|
|
|
+
|
|
|
+
|
|
|
@tool()
async def youtube_detail(
    content_id: str,
    include_captions: bool = True,
    download_video: bool = False
) -> ToolResult:
    """
    Fetch YouTube video details (optionally with captions; optionally download
    the video and build a timestamped outline).

    Args:
        content_id: video ID.
        include_captions: whether to include subtitles, default True.
        download_video: whether to download the video and build a timestamped
            outline, default False.  After downloading, extract_video_clip can
            be used to cut segments for viewing.

    Returns:
        ToolResult whose output is the detailed video info JSON, including
        captions, a video outline and the local file path when requested.
    """
    try:
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
            detail_response = await client.post(
                f"{CRAWLER_BASE_URL}/youtube/detail",
                json={"content_id": content_id}
            )
            detail_response.raise_for_status()
            detail_data = detail_response.json()

            if detail_data.get("code") != 0:
                return ToolResult(
                    title="获取详情失败",
                    output="",
                    error=f"获取详情失败: {detail_data.get('msg', '未知错误')}"
                )

            # The crawler wraps the payload twice: data -> data -> video dict.
            result_data = detail_data.get("data", {})
            video_info = result_data.get("data", {}) if isinstance(result_data, dict) else {}

            # Fetch captions; also required for download_video, which uses
            # them to build the timestamped outline.
            captions_text = None
            if include_captions or download_video:
                try:
                    captions_response = await client.post(
                        f"{CRAWLER_BASE_URL}/youtube/captions",
                        json={"content_id": content_id}
                    )
                    captions_response.raise_for_status()
                    captions_data = captions_response.json()

                    if captions_data.get("code") == 0:
                        captions_result = captions_data.get("data", {})
                        if isinstance(captions_result, dict):
                            inner_data = captions_result.get("data", {})
                            if isinstance(inner_data, dict):
                                captions_text = inner_data.get("content")
                except Exception:
                    # Captions are optional: any failure degrades to
                    # captions_text = None rather than failing the whole call.
                    pass

            # Download the video and build an outline from the captions.
            video_path = None
            video_outline = None
            if download_video:
                # yt-dlp blocks; run it off the event loop.
                video_path = await asyncio.to_thread(_download_youtube_video, content_id)
                if captions_text:
                    # NOTE(review): assumes the captions endpoint returns
                    # SRT-formatted content — confirm against the crawler API.
                    video_outline = _parse_srt_to_outline(captions_text)

            # Merge everything into a single JSON payload.
            output_data = {
                "video_id": content_id,
                "title": video_info.get("title", ""),
                "channel": video_info.get("channel_account_name", ""),
                "description": video_info.get("body_text", ""),
                "like_count": video_info.get("like_count"),
                "comment_count": video_info.get("comment_count"),
                "publish_timestamp": video_info.get("publish_timestamp"),
                "content_link": video_info.get("content_link", ""),
                "captions": captions_text,
                "full_data": video_info
            }

            if download_video:
                output_data["video_path"] = video_path
                output_data["video_outline"] = video_outline
                if not video_path:
                    output_data["download_error"] = "视频下载失败,请检查 yt-dlp 是否可用"

            memory = f"Retrieved YouTube video details for {content_id}"
            if captions_text:
                memory += " with captions"
            if video_path:
                memory += f", downloaded to {video_path}"

            return ToolResult(
                title=f"YouTube 视频详情: {content_id}",
                output=json.dumps(output_data, ensure_ascii=False, indent=2),
                long_term_memory=memory
            )

    except Exception as e:
        return ToolResult(
            title="YouTube 详情查询异常",
            output="",
            error=str(e)
        )
|
|
|
+
|
|
|
+
|
|
|
+# ── X (Twitter) 工具 ──
|
|
|
+
|
|
|
@tool()
async def x_search(keyword: str) -> ToolResult:
    """
    Search X (Twitter) content.  Results are already structured, so no detail
    page needs to be visited afterwards.

    Args:
        keyword: the search term.

    Returns:
        Search results with tweet text, author and engagement data, plus an
        optional collage of tweet images.
    """
    try:
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
            response = await client.post(
                # FIX: use the shared CRAWLER_BASE_URL constant instead of a
                # hard-coded endpoint, consistent with youtube_search /
                # youtube_detail.  Resolves to the identical URL.
                f"{CRAWLER_BASE_URL}/x/keyword",
                json={"keyword": keyword}
            )
            response.raise_for_status()
            data = response.json()

            if data.get("code") == 0:
                result_data = data.get("data", {})
                tweets = result_data.get("data", []) if isinstance(result_data, dict) else []

                # Build a collage: adapt each tweet's first image to the
                # "thumbnails" shape expected by _build_video_collage.
                images = []
                tweets_with_images = []
                for tweet in tweets:
                    image_list = tweet.get("image_url_list", [])
                    if image_list:
                        tweet["thumbnails"] = [{"url": image_list[0].get("image_url")}]
                        tweets_with_images.append(tweet)

                collage_b64 = await _build_video_collage(tweets_with_images if tweets_with_images else tweets)
                if collage_b64:
                    images.append({
                        "type": "base64",
                        "media_type": "image/png",
                        "data": collage_b64
                    })

                # One summary line per tweet, capped at the first 20 results;
                # tweet text is truncated to 100 characters.
                summary_list = []
                for idx, tweet in enumerate(tweets[:20], 1):
                    text = tweet.get("body_text", "")[:100]
                    author = tweet.get("channel_account_name", "")
                    summary_list.append(f"{idx}. @{author}: {text}")

                output_data = {
                    "keyword": keyword,
                    "total": len(tweets),
                    "summary": summary_list,
                    "data": tweets
                }

                return ToolResult(
                    title=f"X 搜索: {keyword}",
                    output=json.dumps(output_data, ensure_ascii=False, indent=2),
                    long_term_memory=f"Searched X (Twitter) for '{keyword}', found {len(tweets)} tweets",
                    images=images
                )
            else:
                return ToolResult(
                    title="X 搜索失败",
                    output="",
                    error=f"搜索失败: {data.get('msg', '未知错误')}"
                )

    except Exception as e:
        return ToolResult(
            title="X 搜索异常",
            output="",
            error=str(e)
        )
|
|
|
+
|
|
|
+
|
|
|
+# ── 内容导入工具 ──
|
|
|
+
|
|
|
@tool()
async def import_content(plan_name: str, content_data: List[Dict[str, Any]]) -> ToolResult:
    """
    Import long-form content (WeChat official accounts, Xiaohongshu, Douyin
    and other generic links).

    Args:
        plan_name: name of the plan.
        content_data: list of content items, each carrying fields such as
            channel, content_link and title.

    Returns:
        Import result, including the plan_id.
    """
    try:
        payload = {"plan_name": plan_name, "data": content_data}
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
            response = await client.post(
                f"{AIGC_BASE_URL}/weixin/auto_insert",
                json=payload
            )
            response.raise_for_status()
            data = response.json()

        # Guard clause: non-zero code means the backend rejected the import.
        if data.get("code") != 0:
            return ToolResult(
                title="导入失败",
                output="",
                error=f"导入失败: {data.get('msg', '未知错误')}"
            )

        result_data = data.get("data", {})
        return ToolResult(
            title=f"内容导入: {plan_name}",
            output=json.dumps(result_data, ensure_ascii=False, indent=2),
            long_term_memory=f"Imported {len(content_data)} items to plan '{plan_name}'"
        )

    except Exception as e:
        return ToolResult(
            title="内容导入异常",
            output="",
            error=str(e)
        )
|
|
|
+
|
|
|
+
|
|
|
+# ── 视频截取工具 ──
|
|
|
+
|
|
|
@tool()
async def extract_video_clip(
    video_id: str,
    start_time: str,
    end_time: str,
    output_name: Optional[str] = None
) -> ToolResult:
    """
    Cut a time range out of an already-downloaded YouTube video.

    Args:
        video_id: YouTube video ID (must first be downloaded via
            youtube_detail(download_video=True)).
        start_time: clip start, format HH:MM:SS or MM:SS.
        end_time: clip end, format HH:MM:SS or MM:SS.
        output_name: output file name (optional).

    Returns:
        Path of the extracted clip.

    Example:
        extract_video_clip("dQw4w9WgXcQ", "00:00:10", "00:00:30")
    """
    try:
        source_video = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4"
        if not source_video.exists():
            return ToolResult(
                title="视频截取失败",
                output="",
                error="源视频不存在,请先使用 youtube_detail(download_video=True) 下载视频"
            )

        # Default clip name encodes the source ID and the time range
        # (':' replaced by '-' to keep the filename portable).
        if not output_name:
            output_name = f"{video_id}_clip_{start_time.replace(':', '-')}_{end_time.replace(':', '-')}.mp4"

        output_path = VIDEO_DOWNLOAD_DIR / output_name

        # -c copy remuxes without re-encoding, so the cut is fast.
        # NOTE(review): with stream copy, cuts can only land on keyframes, so
        # the actual clip boundaries may be slightly off the requested times —
        # confirm this tolerance is acceptable for the use case.
        cmd = [
            'ffmpeg',
            '-i', str(source_video),
            '-ss', start_time,
            '-to', end_time,
            '-c', 'copy',
            '-y',
            str(output_path)
        ]

        # ffmpeg blocks; run it off the event loop with a 60 s cap.
        result = await asyncio.to_thread(
            subprocess.run, cmd, capture_output=True, text=True, timeout=60
        )

        if result.returncode == 0 and output_path.exists():
            file_size = output_path.stat().st_size / (1024 * 1024)

            output_data = {
                "video_id": video_id,
                "clip_path": str(output_path),
                "start_time": start_time,
                "end_time": end_time,
                "file_size_mb": round(file_size, 2)
            }

            return ToolResult(
                title=f"视频片段截取成功: {start_time} - {end_time}",
                output=json.dumps(output_data, ensure_ascii=False, indent=2),
                long_term_memory=f"Extracted video clip from {video_id}: {start_time} to {end_time}"
            )
        else:
            return ToolResult(
                title="视频截取失败",
                output="",
                error=f"ffmpeg 执行失败: {result.stderr}"
            )

    except subprocess.TimeoutExpired:
        # Raised by subprocess.run and propagated through asyncio.to_thread.
        return ToolResult(
            title="视频截取超时",
            output="",
            error="视频截取超时(60秒)"
        )
    except Exception as e:
        return ToolResult(
            title="视频截取异常",
            output="",
            error=str(e)
        )
|