"""
Media processing utilities.

- extract_video_clip: cut a clip out of an already-downloaded YouTube video
- download_youtube_video / parse_srt_to_outline: helpers used by the YouTube
  detail handler
"""

import asyncio
import json
import re
import subprocess
import tempfile
from pathlib import Path
from typing import Dict, List, Optional

from agent.tools import tool, ToolResult

# All downloaded videos and generated clips live in this directory.
VIDEO_DOWNLOAD_DIR = Path(tempfile.gettempdir()) / "youtube_videos"
VIDEO_DOWNLOAD_DIR.mkdir(exist_ok=True)

# Separator between SRT cues: a blank (or whitespace-only) line.
_SRT_BLOCK_SEPARATOR = re.compile(r"\n\s*\n")


# ── Helpers (called from platforms/youtube.py) ──

def download_youtube_video(video_id: str) -> Optional[str]:
    """Download a YouTube video with yt-dlp and return its local file path.

    The video is stored as ``<video_id>.mp4`` inside ``VIDEO_DOWNLOAD_DIR``.
    If that file already exists it is reused and yt-dlp is not invoked.

    Args:
        video_id: YouTube video ID.

    Returns:
        The path of the downloaded file as a string, or ``None`` when the
        download fails for any reason (yt-dlp missing, non-zero exit status,
        timeout after 300 seconds, filesystem error).
    """
    if not video_id:
        return None

    output_path = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4"
    cmd = [
        "yt-dlp",
        "-f", "best[ext=mp4]",
        "-o", str(output_path),
        f"https://www.youtube.com/watch?v={video_id}",
    ]
    try:
        if output_path.exists():
            return str(output_path)
        # errors="replace" keeps undecodable yt-dlp output from turning a
        # successful download into a failure.
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=300,
        )
        if result.returncode == 0 and output_path.exists():
            return str(output_path)
    except (OSError, subprocess.SubprocessError, ValueError):
        # Best effort: callers only need to know the video is unavailable.
        # OSError covers a missing yt-dlp binary and filesystem errors,
        # SubprocessError covers the timeout, ValueError covers arguments
        # the OS rejects (e.g. embedded NUL bytes).
        return None
    return None


def parse_srt_to_outline(srt_content: str) -> List[Dict[str, str]]:
    """Parse SRT subtitles into a timestamped outline.

    Each cue is expected to have an index line, a timestamp line containing
    ``-->`` and at least one text line. Cues that do not match that shape are
    skipped. Both LF and CRLF/CR line endings are accepted.

    Args:
        srt_content: Raw SRT text. May be empty.

    Returns:
        One dict per cue with the keys ``"timestamp"`` (the cue's start time,
        as written in the file) and ``"text"`` (the cue's text lines joined
        with single spaces).
    """
    if not srt_content:
        return []

    # SRT files are frequently saved with Windows line endings; normalise
    # them so that the blank-line cue separator is recognised.
    normalized = srt_content.replace("\r\n", "\n").replace("\r", "\n")

    outline = []
    for block in _SRT_BLOCK_SEPARATOR.split(normalized.strip()):
        lines = block.strip().split("\n")
        if len(lines) < 3:
            continue
        timestamp_line = lines[1]
        if "-->" not in timestamp_line:
            continue
        start_time = timestamp_line.split("-->")[0].strip()
        outline.append({"timestamp": start_time, "text": " ".join(lines[2:])})
    return outline


# ── @tool ──

@tool(groups=["content"])
async def extract_video_clip(
    video_id: str,
    start_time: str,
    end_time: str,
    output_name: Optional[str] = None,
) -> ToolResult:
    """
    Cut the given time range out of an already-downloaded YouTube video.

    The video must first be downloaded via
    content_detail(platform="youtube", index=..., extras={"download_video": true}).

    Args:
        video_id: YouTube video ID
        start_time: Start time, formatted as HH:MM:SS or MM:SS
        end_time: End time, formatted as HH:MM:SS or MM:SS
        output_name: Output file name (optional, generated automatically)
    """
    source_video = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4"
    if not source_video.exists():
        return ToolResult(
            title="视频截取失败",
            output="",
            error="源视频不存在,请先使用 content_detail(platform='youtube', ..., extras={'download_video': true}) 下载",
        )

    if not output_name:
        output_name = (
            f"{video_id}_clip_{start_time.replace(':', '-')}"
            f"_{end_time.replace(':', '-')}.mp4"
        )

    # Keep only the final path component so that an absolute path or "../"
    # in the requested name cannot make ffmpeg (run with -y) overwrite files
    # outside the download directory.
    safe_name = Path(output_name).name
    if safe_name in ("", ".", ".."):
        return ToolResult(
            title="视频截取失败",
            output="",
            error=f"无效的输出文件名: {output_name}",
        )
    output_path = VIDEO_DOWNLOAD_DIR / safe_name

    cmd = [
        "ffmpeg",
        "-i", str(source_video),
        "-ss", start_time,
        "-to", end_time,
        "-c", "copy",
        "-y",
        str(output_path),
    ]
    try:
        # subprocess.run blocks, so it is pushed to a worker thread to keep
        # the event loop responsive.
        result = await asyncio.to_thread(
            subprocess.run,
            cmd,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=60,
        )
    except subprocess.TimeoutExpired:
        return ToolResult(title="视频截取超时", output="", error="ffmpeg 超时(60秒)")
    except OSError as exc:
        # Raised when the ffmpeg binary is missing or cannot be executed.
        return ToolResult(
            title="视频截取失败",
            output="",
            error=f"无法启动 ffmpeg: {exc}",
        )

    if result.returncode == 0 and output_path.exists():
        file_size = output_path.stat().st_size / (1024 * 1024)
        return ToolResult(
            title=f"视频片段: {start_time} - {end_time}",
            output=json.dumps({
                "video_id": video_id,
                "clip_path": str(output_path),
                "start_time": start_time,
                "end_time": end_time,
                "file_size_mb": round(file_size, 2),
            }, ensure_ascii=False, indent=2),
            long_term_memory=f"Extracted clip from {video_id}: {start_time}-{end_time}",
        )

    return ToolResult(title="视频截取失败", output="", error=f"ffmpeg 执行失败: {result.stderr}")