import asyncio
import os
import subprocess
import time
from typing import List

import cv2
from loguru import logger
from mutagen.mp3 import MP3


class FFmpeg():
    """Class-method helpers around the ``ffmpeg``/``ffprobe`` command-line tools.

    Every method that builds an output path does so by plain string
    concatenation of a directory prefix, an optional id and a fixed suffix,
    so the directory prefix is expected to end with a path separator.
    """

    @classmethod
    def get_mp3_duration(cls, file_path) -> int:
        """Return the length of an MP3 file in whole seconds (0 if unknown)."""
        audio = MP3(file_path)
        duration = audio.info.length
        if duration:
            return int(duration)
        return 0

    @classmethod
    def seconds_to_srt_time(cls, seconds) -> str:
        """Format a number of seconds as an SRT timestamp ``HH:MM:SS,mmm``."""
        # Work in integer milliseconds so binary float error cannot shave a
        # millisecond off (e.g. 2.3 s must render as ",300", not ",299").
        total_ms = int(round(seconds * 1000))
        hours, remainder = divmod(total_ms, 3600 * 1000)
        minutes, remainder = divmod(remainder, 60 * 1000)
        secs, milliseconds = divmod(remainder, 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"

    @classmethod
    def get_video_duration(cls, video_url) -> int:
        """Return the duration of a video in whole seconds.

        The duration is derived from the frame count and frame rate reported
        by OpenCV. Returns 0 when the video cannot be opened or when either
        value is not a positive number.
        """
        cap = cv2.VideoCapture(video_url)
        try:
            if not cap.isOpened():
                return 0
            rate = cap.get(cv2.CAP_PROP_FPS)
            frame_num = cap.get(cv2.CAP_PROP_FRAME_COUNT)
            # "not x > 0" also rejects NaN, which would break int() below.
            if not rate > 0 or not frame_num > 0:
                return 0
            return int(frame_num / rate)
        finally:
            # Always give the capture handle back, even on early return.
            cap.release()

    @classmethod
    def video_640(cls, video_path, file_path):
        """Rescale a video to 360x640 and return the output path.

        The output is ``file_path + 'pixelvideo.mp4'``. The path is returned
        even when ffmpeg fails (best effort); the failure is logged.
        """
        video_url = file_path + 'pixelvideo.mp4'
        try:
            cls.asyncio_run_subprocess(
                ["ffmpeg", "-i", video_path, "-vf", "scale=360:640", video_url],
                timeout=420)
            logger.info("[+] 视频转为640像素成功")
        except Exception as e:
            logger.error("[-] 视频转为640像素失败: {}", e)
        return video_url

    @classmethod
    def _probe_video_size(cls, video_path) -> List[int]:
        """Run ffprobe on the first video stream and return the parsed integers.

        The values are returned in the order ffprobe prints them for
        ``stream=width,height``. Empty fields (e.g. from a trailing comma)
        are ignored. Raises ValueError if a field is not an integer and
        IOError if ffprobe itself fails.
        """
        raw = cls.asyncio_run_subprocess(
            ["ffprobe", "-v", "error", "-select_streams", "v:0",
             "-show_entries", "stream=width,height", "-of", "csv=p=0",
             video_path],
            timeout=10)
        return [int(value) for value in raw.strip().split(',') if value.strip()]

    @classmethod
    def get_w_h_size(cls, new_video_path):
        """Probe a video's dimensions; fall back to ``(1920, 1080)`` on parse errors.

        Only ValueError (unparsable or unexpected ffprobe output) triggers the
        fallback; an ffprobe failure (IOError) propagates to the caller.
        """
        try:
            # NOTE(review): ffprobe is asked for "width,height" but the first
            # value is bound to `height` here, so the returned pair is swapped
            # relative to the fallback below (presumably ffprobe prints width
            # first - confirm). Kept as-is because callers may rely on it.
            height, width = cls._probe_video_size(new_video_path)
            logger.info("[+] 获取视频宽高成功")
            return width, height
        except ValueError as e:
            logger.warning("[-] 获取视频宽高失败: {}", e)
            return 1920, 1080

    @classmethod
    def video_crop(cls, new_video_path, video_path_url, pw_random_id):
        """Crop a video to 80% of its original height and return the output path.

        The width is kept; no crop offsets are passed, so ffmpeg's default
        crop position applies. Output is
        ``video_path_url + str(pw_random_id) + 'crop.mp4'``.
        """
        crop_url = video_path_url + str(pw_random_id) + 'crop.mp4'
        width, height = cls._probe_video_size(new_video_path)
        new_height = int(height * 0.8)
        # libx264 with 4:2:0 chroma rejects odd dimensions, so round down to even.
        new_height -= new_height % 2
        cls.asyncio_run_subprocess(
            [
                "ffmpeg",
                "-i", new_video_path,
                "-vf", f"crop={width}:{new_height}",
                "-c:v", "libx264",
                "-c:a", "aac",
                "-y",
                crop_url
            ],
            timeout=200)
        logger.info("[+] 视频裁剪成功")
        return crop_url

    @classmethod
    def video_ggduration(cls, new_video_path, video_path_url, pw_random_id, gg_duration_total):
        """Cut ``gg_duration_total`` seconds off the end of a video.

        Returns ``video_path_url + str(pw_random_id) + 'gg_duration.mp4'``.
        When the source duration is unknown, or nothing would remain after
        trimming, ffmpeg is not run and the (not created) path is returned.
        """
        gg_duration_url = video_path_url + str(pw_random_id) + 'gg_duration.mp4'
        total_duration = cls.get_video_duration(new_video_path)
        if total_duration == 0:
            return gg_duration_url
        duration = int(total_duration) - int(gg_duration_total)
        # A zero or negative remainder means there is nothing left to keep.
        if duration <= 0:
            return gg_duration_url
        cls.asyncio_run_subprocess(
            [
                "ffmpeg",
                "-i", new_video_path,
                "-c:v", "libx264",
                "-c:a", "aac",
                "-t", str(duration),
                "-y",
                gg_duration_url
            ],
            timeout=200)
        logger.info("[+] 视频截断成功")
        return gg_duration_url

    @classmethod
    def video_png(cls, new_video_path, video_path_url, pw_random_id):
        """Grab one frame from the last second of a video as an image.

        :param new_video_path: source video path
        :param video_path_url: directory prefix for the output
        :param pw_random_id: id inserted into the output file name
        :return: ``video_path_url + str(pw_random_id) + 'png.jpg'``
        """
        jpg_url = video_path_url + str(pw_random_id) + 'png.jpg'
        # "-sseof -1" seeks to one second before the end of the input.
        cls.asyncio_run_subprocess(
            ["ffmpeg", "-sseof", "-1", '-i', new_video_path,
             '-frames:v', '1', "-y", jpg_url],
            timeout=120)
        logger.info("[+] 截取原视频最后一帧成功")
        return jpg_url

    @classmethod
    def get_video_mp3(cls, video_file, video_path_url, pw_random_id):
        """Extract the audio of a video to an MP3 file and return its path.

        Output is ``video_path_url + str(pw_random_id) + 'pw_video.mp3'``.
        """
        pw_mp3_path = video_path_url + str(pw_random_id) + 'pw_video.mp3'
        cls.asyncio_run_subprocess(
            [
                'ffmpeg',
                '-i', video_file,
                '-q:a', '0',
                '-map', 'a',
                pw_mp3_path
            ],
            timeout=120)
        # Pause kept from the original implementation.
        time.sleep(1)
        logger.info("[+] 获取视频音频成功")
        return pw_mp3_path

    @classmethod
    def update_video_h_w(cls, new_video_path, video_path_url, pw_random_id):
        """Turn a landscape video into a portrait one by scaling and padding.

        The video is scaled to a width of 640 and padded to a 9:16 canvas with
        the picture centred. Output is
        ``video_path_url + str(pw_random_id) + 'video_h_w_video.mp4'``.
        """
        video_h_w_path = video_path_url + str(pw_random_id) + 'video_h_w_video.mp4'
        cls.asyncio_run_subprocess(
            ["ffmpeg", "-i", new_video_path, "-vf",
             "scale=640:ih*640/iw,pad=iw:iw*16/9:(ow-iw)/2:(oh-ih)/2",
             video_h_w_path],
            timeout=360)
        logger.info("[+] 横屏视频改为竖屏成功")
        return video_h_w_path

    @classmethod
    def add_video_zm(cls, new_video_path, video_path_url, pw_random_id, new_text):
        """Burn ``new_text`` into a video as a subtitle shown for its whole length.

        Writes a concat list (``...video_zm.txt``) and a one-cue subtitle file
        (``...video_zm.srt``) next to the output; neither is removed here.
        Returns the new video path (``...video_zm.mp4``), or the unchanged
        ``new_video_path`` when the source duration cannot be determined.
        """
        single_video_srt = video_path_url + str(pw_random_id) + 'video_zm.srt'
        single_video_txt = video_path_url + str(pw_random_id) + 'video_zm.txt'
        single_video = video_path_url + str(pw_random_id) + 'video_zm.mp4'
        duration = cls.get_video_duration(new_video_path)
        if duration == 0:
            return new_video_path
        start_time = cls.seconds_to_srt_time(0)
        end_time = cls.seconds_to_srt_time(duration)
        # Explicit UTF-8 so non-ASCII text does not depend on the platform
        # default encoding.
        with open(single_video_txt, 'w', encoding='utf-8') as f:
            f.write(f"file '{new_video_path}'\n")
        with open(single_video_srt, 'w', encoding='utf-8') as f:
            f.write(f"1\n{start_time} --> {end_time}\n{new_text}\n\n")
        subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=12,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=225'"
        cls.asyncio_run_subprocess(
            [
                "ffmpeg",
                "-f", "concat",
                "-safe", "0",
                "-i", single_video_txt,
                "-c:v", "libx264",
                "-c:a", "aac",
                "-vf", subtitle_cmd,
                "-y",
                single_video
            ],
            timeout=500)
        logger.info("[+] 横屏视频顶部增加字幕成功")
        return single_video

    @classmethod
    def pw_video(cls, jpg_url, video_path_url, pw_srt, pw_random_id, pw_mp3_path):
        """Build an ending clip from a still image, an audio track and subtitles.

        :param jpg_url: still image looped as the picture
        :param video_path_url: directory prefix for generated files
        :param pw_srt: full SRT text to burn in
        :param pw_random_id: id inserted into generated file names
        :param pw_mp3_path: MP3 used as the audio track; its length sets the
            clip length
        :return: ``video_path_url + str(pw_random_id) + 'pw_video.mp4'``; when
            the MP3 length is 0 the clip is not generated but the path is
            still returned
        """
        pw_srt_path = video_path_url + str(pw_random_id) + 'pw_video.srt'
        pw_url_path = video_path_url + str(pw_random_id) + 'pw_video.mp4'
        # Temporary subtitle file; UTF-8 so non-ASCII text is written reliably.
        with open(pw_srt_path, 'w', encoding='utf-8') as f:
            f.write(pw_srt)
        try:
            pw_duration = cls.get_mp3_duration(pw_mp3_path)
            if pw_duration == 0:
                return pw_url_path
            # Pause kept from the original implementation.
            time.sleep(2)
            height = 1080
            margin_v = int(height) // 8  # vertical margin of the subtitle text
            subtitle_cmd = f"subtitles={pw_srt_path}:force_style='Fontsize=13,Fontname=wqy-zenhei,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000,Bold=1,MarginV={margin_v}'"
            # Filled yellow band drawn behind the subtitle text.
            bg_position_offset = (int(360) - 360 // 8) / 1.75
            background_cmd = f"drawbox=y=(ih-{int(360)}/2-{bg_position_offset}):color=yellow@1.0:width=iw:height={int(360)}/4:t=fill"
            cls.asyncio_run_subprocess(
                [
                    'ffmpeg',
                    '-loop', '1',
                    '-i', jpg_url,                # still image input
                    '-i', pw_mp3_path,            # audio input
                    '-c:v', 'libx264',
                    '-t', str(pw_duration),       # clip length = audio length
                    '-pix_fmt', 'yuv420p',
                    '-c:a', 'aac',
                    '-strict', 'experimental',
                    '-shortest',
                    '-vf', f"{background_cmd},{subtitle_cmd}",
                    pw_url_path
                ],
                timeout=300)
            logger.info("[+] 生成片尾视频成功")
            return pw_url_path
        finally:
            # Remove the temporary subtitle file on success and on failure.
            if os.path.exists(pw_srt_path):
                os.remove(pw_srt_path)

    @classmethod
    def h_b_video(cls, video_path, pw_path, file_path):
        """Concatenate two videos (both scaled to 360x640) into one file.

        Output is ``file_path + 'hbvideo.mp4'``. The path is returned even when
        ffmpeg fails (best effort); the failure is logged.
        """
        video_url = file_path + 'hbvideo.mp4'
        try:
            cls.asyncio_run_subprocess(
                ["ffmpeg", "-i", video_path, "-i", pw_path, "-filter_complex",
                 "[0:v]scale=360:640[v1]; [1:v]scale=360:640[v2]; [v1][0:a][v2][1:a]concat=n=2:v=1:a=1[outv][outa]",
                 "-map", "[outv]", "-map", "[outa]", video_url],
                timeout=500)
            logger.info("[+] 视频拼接成功")
        except Exception as e:
            logger.error("[-] 视频拼接失败: {}", e)
        return video_url

    @classmethod
    def single_video(cls, new_video_path, video_path_url, zm):
        """Re-encode a single video, optionally burning in a subtitle line.

        When ``zm`` is non-empty it is shown (prefixed with a heart symbol)
        from second 2 until the end of the video. When ``zm`` is empty the
        video is re-encoded without any video filter.

        Returns ``video_path_url + 'single_video.mp4'``; when the source
        duration cannot be determined nothing is generated but the path is
        still returned.
        """
        single_video_url = video_path_url + 'single_video.mp4'
        single_video_srt = video_path_url + 'single_video.srt'
        duration = cls.get_video_duration(new_video_path)
        if duration == 0:
            return single_video_url
        start_time = cls.seconds_to_srt_time(2)
        end_time = cls.seconds_to_srt_time(duration)
        single_video_txt = video_path_url + 'single_video.txt'
        with open(single_video_txt, 'w', encoding='utf-8') as f:
            f.write(f"file '{new_video_path}'\n")

        num_threads = 5
        command = [
            "ffmpeg",
            "-f", "concat",
            "-safe", "0",
            "-i", f"{single_video_txt}",
            "-c:v", "libx264",
            "-c:a", "aac",
            '-b:v', '260k',
            "-b:a", "96k",
            "-threads", str(num_threads),
        ]
        if zm:
            with open(single_video_srt, 'w', encoding='utf-8') as f:
                f.write(f"1\n{start_time} --> {end_time}\n\u2764\uFE0F{zm}\n\n")
            subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'"
            # Only add a filter when there is a subtitle: "force_style" alone
            # is an option of the subtitles filter, not a filter.
            command += ["-vf", subtitle_cmd]
        command += ["-y", single_video_url]

        try:
            cls.asyncio_run_subprocess(command, timeout=400)
        finally:
            # Remove the temporary subtitle file on success and on failure.
            if os.path.exists(single_video_srt):
                os.remove(single_video_srt)
        return single_video_url

    @classmethod
    def asyncio_run_subprocess(cls, params: List[str], timeout: int = 30) -> str:
        """Run a command to completion and return its decoded stdout.

        :param params: program followed by its arguments
        :param timeout: seconds to wait before the process is killed
        :return: the process's standard output decoded as text
        :raises IOError: if the process exits with a non-zero code or exceeds
            ``timeout``; the message carries the decoded stderr

        Uses ``asyncio.run`` to drive the subprocess.
        """
        async def run_subprocess() -> str:
            process = await asyncio.create_subprocess_exec(
                params[0],
                *params[1:],
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            try:
                out, err = await asyncio.wait_for(process.communicate(), timeout=timeout)
            except asyncio.TimeoutError as exc:
                # Kill the child and collect whatever it wrote to stderr.
                process.kill()
                _, err = await process.communicate()
                raise IOError(
                    f"{params[0]} timed out after {timeout}s: "
                    f"{err.decode(errors='replace')}"
                ) from exc
            if process.returncode != 0:
                raise IOError(err.decode(errors='replace'))
            return out.decode()

        return asyncio.run(run_subprocess())


if __name__ == '__main__':
    new_video_path = '/Users/tzld/Desktop/video_rewriting/path/output1.mp4'
    video_path_url = '/Users/tzld/Desktop/video_rewriting/path/'
    zm = '温馨提示:下方按钮可分享到群'
    FFmpeg.single_video(new_video_path, video_path_url, zm)