video_utils.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. from pathlib import Path
  2. import ffmpeg
  3. import numpy as np
  4. class VideoLoader:
  5. def __init__(self, video_path: Path):
  6. self.video_path = video_path
  7. self.get_video_info()
  8. def get_video_info(self):
  9. probe = ffmpeg.probe(self.video_path)
  10. video_info = next(s for s in probe["streams"] if s["codec_type"] == "video")
  11. width = int(video_info["width"])
  12. height = int(video_info["height"])
  13. fps = eval(video_info["r_frame_rate"])
  14. self.width = width
  15. self.height = height
  16. self.fps = fps
  17. if "nb_frames" in video_info:
  18. self.total_frames = int(video_info["nb_frames"])
  19. else:
  20. # 通过时长计算
  21. duration = float(video_info.get("duration", probe["format"]["duration"]))
  22. self.total_frames = int(duration * self.fps)
  23. original_bitrate = video_info.get("bit_rate", None)
  24. self.original_bitrate = original_bitrate
  25. def __len__(self):
  26. return self.total_frames
  27. def __iter__(self):
  28. process_in = (
  29. ffmpeg.input(self.video_path)
  30. .output("pipe:", format="rawvideo", pix_fmt="bgr24")
  31. .global_args("-loglevel", "error")
  32. .run_async(pipe_stdout=True)
  33. )
  34. try:
  35. while True:
  36. in_bytes = process_in.stdout.read(self.width * self.height * 3)
  37. if not in_bytes:
  38. break
  39. frame = np.frombuffer(in_bytes, np.uint8).reshape(
  40. [self.height, self.width, 3]
  41. )
  42. yield frame
  43. finally:
  44. # 确保进程被清理
  45. process_in.stdout.close()
  46. if process_in.stderr:
  47. process_in.stderr.close()
  48. process_in.wait()
  49. if __name__ == "__main__":
  50. from tqdm import tqdm
  51. video_path = Path("resources/dog_vs_sam.mp4")
  52. # 创建 VideoLoader 实例
  53. loader = VideoLoader(video_path)
  54. # 显示视频信息
  55. print(f"视频路径: {video_path}")
  56. print(f"分辨率: {loader.width}x{loader.height}")
  57. print(f"帧率: {loader.fps:.2f} fps")
  58. print(f"总帧数: {loader.total_frames}")
  59. print(f"时长: {loader.total_frames / loader.fps:.2f} 秒")
  60. print("-" * 50)
  61. # 遍历所有帧并显示进度
  62. frame_count = 0
  63. for frame in tqdm(loader, total=len(loader), desc="处理视频"):
  64. frame_count += 1
  65. # 每隔 30 帧显示一次信息(可选)
  66. if frame_count % 30 == 0:
  67. print(
  68. f"\n第 {frame_count} 帧 - shape: {frame.shape}, dtype: {frame.dtype}, "
  69. f"min: {frame.min()}, max: {frame.max()}"
  70. )
  71. print(f"\n处理完成!共处理 {frame_count} 帧")
  72. # 测试提前退出(验证资源清理)
  73. print("\n测试提前退出...")
  74. loader2 = VideoLoader(video_path)
  75. for i, frame in enumerate(loader2):
  76. if i >= 5: # 只读取前 5 帧
  77. print(f"提前退出,已读取 {i+1} 帧")
  78. break
  79. print("资源已正确清理")