"""
Media processing tools
- extract_video_clip: cut a clip out of an already-downloaded YouTube video
- download_youtube_video / parse_srt_to_outline: helper functions used by the YouTube detail flow
"""
import asyncio
import json
import re
import subprocess
import tempfile
from pathlib import Path
from typing import Dict, List, Optional

from agent.tools import tool, ToolResult
# Directory under the system temp dir where downloaded videos and the clips
# cut from them are stored. It is created at import time so the functions in
# this module can assume it exists.
VIDEO_DOWNLOAD_DIR = Path(tempfile.gettempdir()) / "youtube_videos"
VIDEO_DOWNLOAD_DIR.mkdir(exist_ok=True)
# ── Helper functions (called by platforms/youtube.py) ──
def download_youtube_video(video_id: str) -> Optional[str]:
    """Download a YouTube video with yt-dlp and return the local file path.

    The file is stored as ``<video_id>.mp4`` inside ``VIDEO_DOWNLOAD_DIR``.
    If that file already exists it is reused and yt-dlp is not invoked.

    Args:
        video_id: YouTube video ID, used both in the watch URL and as the
            output file name.

    Returns:
        The path of the downloaded (or previously downloaded) file as a
        string, or ``None`` when yt-dlp exits with a non-zero status, the
        output file is missing afterwards, or any exception is raised
        (including the 300-second timeout).
    """
    # Deliberately best-effort: every failure is reported to the caller as None.
    try:
        output_path = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4"
        if output_path.exists():
            return str(output_path)

        cmd = [
            "yt-dlp",
            "-f", "best[ext=mp4]",
            "-o", str(output_path),
            f"https://www.youtube.com/watch?v={video_id}",
        ]
        # errors="replace": the captured output is never inspected, so an
        # undecodable byte in it must not turn a successful download into a
        # UnicodeDecodeError (which the handler below would report as None).
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=300,
        )
        if result.returncode == 0 and output_path.exists():
            return str(output_path)
        return None
    except Exception:
        return None
def parse_srt_to_outline(srt_content: str) -> List[Dict[str, str]]:
    """Parse SRT subtitle text into an outline of timestamped entries.

    Each cue block is expected to have at least three lines: an index line,
    a ``start --> end`` timing line, and one or more text lines. Blocks that
    do not match this shape are skipped.

    Args:
        srt_content: Raw SRT text. LF, CRLF and lone-CR line endings are all
            accepted.

    Returns:
        A list of ``{"timestamp": <cue start time>, "text": <cue text>}``
        dicts in file order, where multi-line cue text is joined with single
        spaces. Returns an empty list for empty input.
    """
    if not srt_content:
        return []

    # Normalise line endings first: with CRLF input a plain "\n\n" split never
    # matches, and the whole file would collapse into a single bogus cue.
    normalized = srt_content.replace("\r\n", "\n").replace("\r", "\n")

    outline: List[Dict[str, str]] = []
    # Cues are separated by blank lines; also accept separator lines that
    # contain only spaces or tabs.
    for block in re.split(r"\n[ \t]*\n", normalized.strip()):
        lines = block.strip().split("\n")
        if len(lines) < 3:
            continue
        timestamp_line = lines[1]
        if "-->" not in timestamp_line:
            continue
        start_time = timestamp_line.split("-->")[0].strip()
        outline.append({"timestamp": start_time, "text": " ".join(lines[2:])})
    return outline
# ── @tool ──
@tool(groups=["content"])
async def extract_video_clip(
    video_id: str,
    start_time: str,
    end_time: str,
    output_name: Optional[str] = None,
) -> ToolResult:
    """
    Cut a clip covering the given time range out of an already-downloaded YouTube video.

    The video must first be downloaded via
    content_detail(platform="youtube", index=..., extras={"download_video": true})
    before this tool can be used.

    Args:
        video_id: YouTube video ID
        start_time: Start time, in HH:MM:SS or MM:SS format
        end_time: End time, in HH:MM:SS or MM:SS format
        output_name: Output file name (optional; generated automatically when omitted)
    """
    # NOTE(review): the docstring keeps its original summary + "Args:" shape in
    # case the @tool decorator reads it at runtime — confirm before restructuring.
    source_video = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4"
    if not source_video.exists():
        return ToolResult(
            title="视频截取失败",
            output="",
            error="源视频不存在,请先使用 content_detail(platform='youtube', ..., extras={'download_video': true}) 下载",
        )

    if not output_name:
        # Colons are replaced so the time stamps are safe inside a file name.
        output_name = f"{video_id}_clip_{start_time.replace(':', '-')}_{end_time.replace(':', '-')}.mp4"
    # NOTE(review): output_name is joined as-is, so a value containing path
    # separators can resolve outside VIDEO_DOWNLOAD_DIR — confirm whether
    # callers rely on that before restricting it.
    output_path = VIDEO_DOWNLOAD_DIR / output_name

    # "-c copy" copies the streams without re-encoding; "-y" overwrites an existing output file.
    cmd = [
        "ffmpeg",
        "-i", str(source_video),
        "-ss", start_time,
        "-to", end_time,
        "-c", "copy",
        "-y",
        str(output_path),
    ]

    try:
        # subprocess.run blocks, so run it in a worker thread to keep the event loop free.
        # errors="replace" keeps undecodable bytes in ffmpeg's output from raising.
        result = await asyncio.to_thread(
            subprocess.run,
            cmd,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=60,
        )
    except subprocess.TimeoutExpired:
        return ToolResult(title="视频截取超时", output="", error="ffmpeg 超时(60秒)")
    except OSError as exc:
        # Raised when the ffmpeg executable is missing or cannot be started;
        # report it as a tool error like every other failure path here.
        return ToolResult(title="视频截取失败", output="", error=f"ffmpeg 无法启动: {exc}")

    if result.returncode == 0 and output_path.exists():
        file_size = output_path.stat().st_size / (1024 * 1024)  # bytes -> MiB
        return ToolResult(
            title=f"视频片段: {start_time} - {end_time}",
            output=json.dumps({
                "video_id": video_id,
                "clip_path": str(output_path),
                "start_time": start_time,
                "end_time": end_time,
                "file_size_mb": round(file_size, 2),
            }, ensure_ascii=False, indent=2),
            long_term_memory=f"Extracted clip from {video_id}: {start_time}-{end_time}",
        )

    return ToolResult(title="视频截取失败", output="", error=f"ffmpeg 执行失败: {result.stderr}")