# media.py
"""
Media processing utilities.

- extract_video_clip: cut a clip out of an already-downloaded YouTube video
- download_youtube_video / parse_srt_to_outline: helper functions used by the
  YouTube detail handler
"""
import asyncio
import json
import subprocess
import tempfile
from pathlib import Path
from typing import Dict, List, Optional

from agent.tools import tool, ToolResult
  13. VIDEO_DOWNLOAD_DIR = Path(tempfile.gettempdir()) / "youtube_videos"
  14. VIDEO_DOWNLOAD_DIR.mkdir(exist_ok=True)
# ── Helper functions (called from platforms/youtube.py) ──
  16. def download_youtube_video(video_id: str) -> Optional[str]:
  17. """使用 yt-dlp 下载 YouTube 视频,返回文件路径"""
  18. try:
  19. output_path = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4"
  20. if output_path.exists():
  21. return str(output_path)
  22. cmd = [
  23. "yt-dlp",
  24. "-f", "best[ext=mp4]",
  25. "-o", str(output_path),
  26. f"https://www.youtube.com/watch?v={video_id}",
  27. ]
  28. result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
  29. if result.returncode == 0 and output_path.exists():
  30. return str(output_path)
  31. return None
  32. except Exception:
  33. return None
  34. def parse_srt_to_outline(srt_content: str) -> List[Dict[str, str]]:
  35. """解析 SRT 字幕,生成带时间戳的大纲"""
  36. if not srt_content:
  37. return []
  38. outline = []
  39. blocks = srt_content.strip().split("\n\n")
  40. for block in blocks:
  41. lines = block.strip().split("\n")
  42. if len(lines) >= 3:
  43. timestamp_line = lines[1]
  44. if "-->" in timestamp_line:
  45. start_time = timestamp_line.split("-->")[0].strip()
  46. text = " ".join(lines[2:])
  47. outline.append({"timestamp": start_time, "text": text})
  48. return outline
# ── Agent tools (@tool) ──
  50. @tool(groups=["content"])
  51. async def extract_video_clip(
  52. video_id: str,
  53. start_time: str,
  54. end_time: str,
  55. output_name: Optional[str] = None,
  56. ) -> ToolResult:
  57. """
  58. 从已下载的 YouTube 视频中截取指定时间段的片段。
  59. 必须先通过 content_detail(platform="youtube", index=..., extras={"download_video": true})
  60. 下载视频后才能使用。
  61. Args:
  62. video_id: YouTube 视频 ID
  63. start_time: 开始时间,格式 HH:MM:SS 或 MM:SS
  64. end_time: 结束时间,格式 HH:MM:SS 或 MM:SS
  65. output_name: 输出文件名(可选,自动生成)
  66. """
  67. source_video = VIDEO_DOWNLOAD_DIR / f"{video_id}.mp4"
  68. if not source_video.exists():
  69. return ToolResult(
  70. title="视频截取失败",
  71. output="",
  72. error="源视频不存在,请先使用 content_detail(platform='youtube', ..., extras={'download_video': true}) 下载",
  73. )
  74. if not output_name:
  75. output_name = f"{video_id}_clip_{start_time.replace(':', '-')}_{end_time.replace(':', '-')}.mp4"
  76. output_path = VIDEO_DOWNLOAD_DIR / output_name
  77. cmd = ["ffmpeg", "-i", str(source_video), "-ss", start_time, "-to", end_time, "-c", "copy", "-y", str(output_path)]
  78. try:
  79. result = await asyncio.to_thread(subprocess.run, cmd, capture_output=True, text=True, timeout=60)
  80. except subprocess.TimeoutExpired:
  81. return ToolResult(title="视频截取超时", output="", error="ffmpeg 超时(60秒)")
  82. if result.returncode == 0 and output_path.exists():
  83. file_size = output_path.stat().st_size / (1024 * 1024)
  84. return ToolResult(
  85. title=f"视频片段: {start_time} - {end_time}",
  86. output=json.dumps({
  87. "video_id": video_id,
  88. "clip_path": str(output_path),
  89. "start_time": start_time,
  90. "end_time": end_time,
  91. "file_size_mb": round(file_size, 2),
  92. }, ensure_ascii=False, indent=2),
  93. long_term_memory=f"Extracted clip from {video_id}: {start_time}-{end_time}",
  94. )
  95. return ToolResult(title="视频截取失败", output="", error=f"ffmpeg 执行失败: {result.stderr}")