| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- from pathlib import Path
- import ffmpeg
- import numpy as np
- class VideoLoader:
- def __init__(self, video_path: Path):
- self.video_path = video_path
- self.get_video_info()
- def get_video_info(self):
- probe = ffmpeg.probe(self.video_path)
- video_info = next(s for s in probe["streams"] if s["codec_type"] == "video")
- width = int(video_info["width"])
- height = int(video_info["height"])
- fps = eval(video_info["r_frame_rate"])
- self.width = width
- self.height = height
- self.fps = fps
- if "nb_frames" in video_info:
- self.total_frames = int(video_info["nb_frames"])
- else:
- # 通过时长计算
- duration = float(video_info.get("duration", probe["format"]["duration"]))
- self.total_frames = int(duration * self.fps)
- original_bitrate = video_info.get("bit_rate", None)
- self.original_bitrate = original_bitrate
- def __len__(self):
- return self.total_frames
- def __iter__(self):
- process_in = (
- ffmpeg.input(self.video_path)
- .output("pipe:", format="rawvideo", pix_fmt="bgr24")
- .global_args("-loglevel", "error")
- .run_async(pipe_stdout=True)
- )
- try:
- while True:
- in_bytes = process_in.stdout.read(self.width * self.height * 3)
- if not in_bytes:
- break
- frame = np.frombuffer(in_bytes, np.uint8).reshape(
- [self.height, self.width, 3]
- )
- yield frame
- finally:
- # 确保进程被清理
- process_in.stdout.close()
- if process_in.stderr:
- process_in.stderr.close()
- process_in.wait()
- if __name__ == "__main__":
- from tqdm import tqdm
- video_path = Path("resources/dog_vs_sam.mp4")
- # 创建 VideoLoader 实例
- loader = VideoLoader(video_path)
- # 显示视频信息
- print(f"视频路径: {video_path}")
- print(f"分辨率: {loader.width}x{loader.height}")
- print(f"帧率: {loader.fps:.2f} fps")
- print(f"总帧数: {loader.total_frames}")
- print(f"时长: {loader.total_frames / loader.fps:.2f} 秒")
- print("-" * 50)
- # 遍历所有帧并显示进度
- frame_count = 0
- for frame in tqdm(loader, total=len(loader), desc="处理视频"):
- frame_count += 1
- # 每隔 30 帧显示一次信息(可选)
- if frame_count % 30 == 0:
- print(
- f"\n第 {frame_count} 帧 - shape: {frame.shape}, dtype: {frame.dtype}, "
- f"min: {frame.min()}, max: {frame.max()}"
- )
- print(f"\n处理完成!共处理 {frame_count} 帧")
- # 测试提前退出(验证资源清理)
- print("\n测试提前退出...")
- loader2 = VideoLoader(video_path)
- for i, frame in enumerate(loader2):
- if i >= 5: # 只读取前 5 帧
- print(f"提前退出,已读取 {i+1} 帧")
- break
- print("资源已正确清理")
|