import asyncio import json import os import time import traceback from typing import List import cv2 import requests from loguru import logger from mutagen.mp3 import MP3 class FFmpeg(): """ 时间转换 """ @classmethod def seconds_to_srt_time(cls, seconds): hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) seconds = seconds % 60 milliseconds = int((seconds - int(seconds)) * 1000) return f"{hours:02d}:{minutes:02d}:{int(seconds):02d},{milliseconds:03d}" """ 获取单个视频时长 """ @classmethod def get_video_duration(cls, video_url): cap = cv2.VideoCapture(video_url) if cap.isOpened(): rate = cap.get(5) frame_num = cap.get(7) duration = int(frame_num / rate) return duration return 0 # """ # 获取视频文件的时长(秒) # """ # @classmethod # def get_videos_duration(cls, video_file): # result = cls.asyncio_run_subprocess(["ffprobe", "-v", "error", "-show_entries", "format=duration", # "-of", "default=noprint_wrappers=1:nokey=1", video_file], timeout=10) # return float(result) """ 获取视频宽高 """ @classmethod def get_w_h_size(cls, new_video_path): try: # 获取视频的原始宽高信息 ffprobe_cmd = cls.asyncio_run_subprocess(["ffprobe", "-v" ,"error" ,"-select_streams" ,"v:0" ,"-show_entries", "stream=width,height" ,"-of" ,"csv=p=0" ,new_video_path],timeout=10) output_decoded = ffprobe_cmd.strip() split_output = [value for value in output_decoded.split(',') if value.strip()] height, width = map(int, split_output) return width, height except ValueError as e: return 1920, 1080 """ 视频裁剪 """ @classmethod def video_crop(cls, video_path, file_path): crop_url = file_path + 'crop.mp4' try: # 获取视频的原始宽高信息 ffprobe_cmd = cls.asyncio_run_subprocess( ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of", "csv=p=0", video_path], timeout=10) width, height = map(int, ffprobe_cmd.strip().split(',')) # 计算裁剪后的高度 new_height = int(height * 0.8) # 构建 FFmpeg 命令,裁剪视频高度为原始高度的80% cls.asyncio_run_subprocess( [ "ffmpeg", "-i", video_path, "-vf", f"crop={width}:{new_height}", "-c:v", "libx264", "-c:a", "aac", "-y", crop_url ],timeout=240) return crop_url except Exception as e: return crop_url """ 视频截断 """ @classmethod def video_ggduration(cls, video_path, file_path, gg_duration_total): gg_duration_url = file_path + 'gg_duration.mp4' # 获取视频时长 try: total_duration = cls.get_video_duration(video_path) logger.info(f"[+]视频总时长{total_duration}") if total_duration == 0: return video_path duration = int(total_duration) - int(gg_duration_total) if int(total_duration) < int(gg_duration_total): return video_path cls.asyncio_run_subprocess([ "ffmpeg", "-i", video_path, "-c:v", "libx264", "-c:a", "aac", "-t", str(duration), "-y", gg_duration_url ], timeout= 360) return gg_duration_url except Exception as e: logger.error(f"修改视频时长发生异常:{e}\n{traceback.format_exc()}") return gg_duration_url """ 截取原视频最后一帧 """ @classmethod def video_png(cls, video_path, file_path): # 获取视频的原始宽高信息 jpg_url = file_path + 'png.jpg' try: cls.asyncio_run_subprocess( ["ffmpeg", "-sseof", "-1", '-i', video_path, '-frames:v', '1', "-y", jpg_url], timeout=120) return jpg_url except Exception as e: logger.error(f"[处理] 数据片尾获取最后一帧失败,{e}") return jpg_url """ 获取视频音频 """ @classmethod def get_video_mp3(cls, video_file, video_path_url, pw_random_id): pw_mp3_path = video_path_url + str(pw_random_id) +'pw_video.mp3' try: cls.asyncio_run_subprocess([ 'ffmpeg', '-i', video_file, '-q:a', '0', '-map', 'a', pw_mp3_path ], timeout=120) time.sleep(1) return pw_mp3_path except Exception as e: return pw_mp3_path @classmethod def get_pw_video_mp3(cls, video_file, video_path_url): bgm_pw_path_mp3 = video_file + 'bgm_pw.mp3' try: cls.asyncio_run_subprocess([ 'ffmpeg', '-i', video_path_url, # 输入的视频文件路径 '-q:a', '0', # 设置音频质量为最佳 '-map', 'a', '-y', bgm_pw_path_mp3 ], timeout=120) time.sleep(1) return bgm_pw_path_mp3 except Exception as e: return bgm_pw_path_mp3 """ 片尾增加bgm """ @classmethod def video_add_bgm(cls, video_file, bgm_path, video_path_url): bgm_pw_path = video_path_url + 'bgm_pw.mp4' pw_duration = cls.get_video_duration(video_file) try: pw_path_txt = video_path_url + 'bgm_pw_video.txt' with open(pw_path_txt, 'w') as f: f.write(f"file '{video_file}'\n") cls.asyncio_run_subprocess([ "ffmpeg", "-f", "concat", "-safe", "0", "-i", f"{pw_path_txt}", # 视频序列输入的文本文件 "-i", bgm_path, # 音频文件 "-c:v", "libx264", # 视频编码格式 "-t", str(pw_duration), # 输出视频的持续时间 '-c:a', 'aac', # 音频编码格式 '-b:v', '260k', # 视频比特率 '-b:a', '96k', # 音频比特率 '-threads', '2', # 线程数 # '-vf', f'{background_cmd},{subtitle_cmd}', # 视频过滤器(背景和字幕) "-filter_complex", "[1:a]volume=0.6[a1];[0:a][a1]amerge=inputs=2[aout]", # 混合音频流 "-map", "0:v:0", # 映射视频流来自第一个输入文件(视频) "-map", "[aout]", # 映射混合后的音频流 '-y', # 强制覆盖输出文件 bgm_pw_path # 输出文件路径 ], timeout=500) time.sleep(1) return bgm_pw_path except Exception as e: return bgm_pw_path """横屏视频改为竖屏""" @classmethod def update_video_h_w(cls, video_path, file_path): video_h_w_path = file_path +'video_h_w_video.mp4' try: cls.asyncio_run_subprocess(["ffmpeg" ,"-i" ,video_path ,"-vf" ,"scale=640:ih*640/iw,pad=iw:iw*16/9:(ow-iw)/2:(oh-ih)/2" ,video_h_w_path],timeout=420) return video_h_w_path except Exception as e: return video_h_w_path """视频转为640像素""" @classmethod def video_640(cls, video_path, file_path): video_url = file_path + 'pixelvideo.mp4' try: cls.asyncio_run_subprocess(["ffmpeg" ,"-i" ,video_path ,"-vf" ,"scale=360:640" ,video_url],timeout=420) return video_url except Exception as e: return video_url @classmethod def concatenate_videos(cls, videos_paths, file_path): video_url = file_path + 'rg_pw.mp4' list_filename = file_path + 'rg_pw.txt' with open(list_filename, "w") as f: for video_path in videos_paths: f.write(f"file '{video_path}'\n") try: cls.asyncio_run_subprocess( ["ffmpeg", "-f", "concat", "-safe", "0", "-i", list_filename, "-c", "copy", video_url], timeout=420) logger.info(f"[+] 视频转为640像素成功") return video_url except Exception as e: return video_url """视频拼接到一起""" @classmethod def h_b_video(cls, video_path, pw_path, file_path): video_url = file_path + 'hbvideo.mp4' try: cls.asyncio_run_subprocess(["ffmpeg","-i", video_path, "-i", pw_path, "-filter_complex" ,"[0:v]scale=360:640[v1]; [1:v]scale=360:640[v2]; [v1][0:a][v2][1:a]concat=n=2:v=1:a=1[outv][outa]" ,"-map" ,"[outv]" ,"-map" ,"[outa]" ,video_url],timeout=500) return video_url except Exception as e: return video_url """横屏视频顶部增加字幕""" @classmethod def add_video_zm(cls, new_video_path, video_path_url, pw_random_id, new_text): single_video_srt = video_path_url + str(pw_random_id) +'video_zm.srt' single_video_txt = video_path_url + str(pw_random_id) +'video_zm.txt' single_video = video_path_url + str(pw_random_id) +'video_zm.mp4' try: duration = cls.get_video_duration(new_video_path) if duration == 0: return new_video_path start_time = cls.seconds_to_srt_time(0) end_time = cls.seconds_to_srt_time(duration) # zm = '致敬伟大的教员,为整个民族\n感谢老人家历史向一代伟人' with open(single_video_txt, 'w') as f: f.write(f"file '{new_video_path}'\n") with open(single_video_srt, 'w') as f: f.write(f"1\n{start_time} --> {end_time}\n{new_text}\n\n") subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=12,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=225'" draw = f"{subtitle_cmd}" cls.asyncio_run_subprocess([ "ffmpeg", "-f", "concat", "-safe", "0", "-i", single_video_txt, "-c:v", "libx264", "-c:a", "aac", "-vf", draw, "-y", single_video ],timeout=500) # subprocess.run(ffmpeg_cmd) return single_video except Exception as e: return single_video """获取mp3时长""" @classmethod def get_mp3_duration(cls, file_path): audio = MP3(file_path) duration = audio.info.length if duration: return int(duration) return 0 """ 生成片尾视频 """ @classmethod def pw_video(cls, jpg_path, file_path, pw_mp3_path, pw_srt): # 添加音频到图片 """ jpg_url 图片地址 pw_video 提供的片尾视频 pw_duration 提供的片尾视频时长 new_video_path 视频位置 subtitle_cmd 字幕 pw_url 生成视频地址 :return: """ pw_srt_path = file_path +'pw_video.srt' with open(pw_srt_path, 'w') as f: f.write(pw_srt) pw_url_path = file_path + 'pw_video.mp4' try: pw_duration = cls.get_mp3_duration(pw_mp3_path) if pw_duration == 0: return pw_url_path time.sleep(2) # 添加字幕 wqy-zenhei Hiragino Sans GB height = 1080 margin_v = int(height) // 8 # 可根据需要调整字幕和背景之间的距离 subtitle_cmd = f"subtitles={pw_srt_path}:force_style='Fontsize=13,Fontname=wqy-zenhei,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000,Bold=1,MarginV={margin_v}'" bg_position_offset = (int(360) - 360//8) / 1.75 background_cmd = f"drawbox=y=(ih-{int(360)}/2-{bg_position_offset}):color=yellow@1.0:width=iw:height={int(360)}/4:t=fill" if "mp4" in jpg_path: pw_path_txt = file_path + 'pw_path_video.txt' with open(pw_path_txt, 'w') as f: f.write(f"file '{jpg_path}'\n") cls.asyncio_run_subprocess([ "ffmpeg", "-f", "concat", "-safe", "0", "-i", f"{pw_path_txt}", # 视频序列输入的文本文件 "-i", pw_mp3_path, # 音频文件 "-c:v", "libx264", # 视频编码格式 "-t", str(pw_duration), # 输出视频的持续时间 "-c:a", "aac", # 音频编码格式 "-b:v", "260k", # 视频比特率 "-b:a", "96k", # 音频比特率 "-threads", "2", # 线程数 "-vf", f"{background_cmd},{subtitle_cmd}", # 视频过滤器(背景和字幕) "-map", "0:v:0", # 映射视频流来自第一个输入文件(视频) "-map", "1:a:0", # 映射音频流来自第二个输入文件(音频) "-y", # 强制覆盖输出文件 pw_url_path # 输出文件路径 ], timeout=500) else: cls.asyncio_run_subprocess([ 'ffmpeg', '-loop', '1', '-i', jpg_path, # 输入的图片文件 '-i', pw_mp3_path, # 输入的音频文件 '-c:v', 'libx264', # 视频编码格式 '-t', str(pw_duration), # 输出视频的持续时间,与音频持续时间相同 '-pix_fmt', 'yuv420p', # 像素格式 '-c:a', 'aac', # 音频编码格式 '-strict', 'experimental', # 使用实验性编码器 '-shortest', # 确保输出视频的长度与音频一致 '-vf', f"{background_cmd},{subtitle_cmd}", # 视频过滤器,设置分辨率和其他过滤器 pw_url_path # 输出的视频文件路径 ], timeout=500) if os.path.exists(pw_srt_path): os.remove(pw_srt_path) return pw_url_path except Exception as e: return pw_url_path """ 单个视频拼接 """ @classmethod def single_video(cls, video_path, file_path, zm): single_video_url = file_path + 'single_video.mp4' single_video_srt = file_path + 'single_video.srt' # 获取时长 try: duration = cls.get_video_duration(video_path) if duration == 0: return single_video_url start_time = cls.seconds_to_srt_time(2) end_time = cls.seconds_to_srt_time(duration) single_video_txt = file_path + 'single_video.txt' with open(single_video_txt, 'w') as f: f.write(f"file '{video_path}'\n") if zm: with open(single_video_srt, 'w') as f: f.write(f"1\n{start_time} --> {end_time}\n\u2764\uFE0F{zm}\n\n") subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'" else: subtitle_cmd = f"force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'" # 多线程数 num_threads = 5 # 构建 FFmpeg 命令,生成视频 cls.asyncio_run_subprocess([ "ffmpeg", "-f", "concat", "-safe", "0", "-i", f"{single_video_txt}", "-c:v", "libx264", "-c:a", "aac", '-b:v', '260k', "-b:a", "96k", "-threads", str(num_threads), "-vf", subtitle_cmd, "-y", single_video_url ], timeout=400) if os.path.exists(single_video_srt): os.remove(single_video_srt) return single_video_url except Exception as e: return single_video_url @classmethod def asyncio_run_subprocess(cls, params: List[str], timeout: int = 30) -> str: async def run_subprocess(): process = await asyncio.create_subprocess_exec( params[0], *params[1:], stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: out, err = await asyncio.wait_for(process.communicate(), timeout=timeout) if process.returncode != 0: raise IOError(err) return out.decode() except asyncio.TimeoutError: process.kill() out, err = await process.communicate() raise IOError(err) return asyncio.run(run_subprocess()) @classmethod def get_http_duration(cls, videos_path): total_duration = 0 for video_path in videos_path: url = "http://101.37.24.17:5555/api/v1/ffmpeg/get_meta" payload = json.dumps({ "url": video_path, "referer": "" }) headers = { 'Authorization': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiNGNhMTI4ZGYtYWMzMy00NWQ2LTg3MmEtMDAzOTk4MGVhM2ViIiwibmFtZSI6Inp5IiwiZXhwIjoyMDUwOTI3MjExfQ.k_rvuESjA62RgPDiLniVgJyLJn3Q8C1Y_AGq3CPRuKI', 'Content-Type': 'application/json' } try: response = requests.request("POST", url, headers=headers, data=payload, timeout=30) response = response.json() duration = response['data']['streams'][0]['duration'] total_duration += int(float(duration)) except Exception as e: print(f"Error processing {video_path}: {e}") return total_duration if __name__ == '__main__': file_path = '/Users/z/Downloads/478de0b6-4e52-44a5-a5d4-967b2cf8ce49' jpg_path = '/Users/z/Downloads/478de0b6-4e52-44a5-a5d4-967b2cf8ce49rg_pixelvideo.mp4' mp3_path='/Users/z/Downloads/478de0b6-4e52-44a5-a5d4-967b2cf8ce49pw_video.mp3' pw_srt = """1 00:00:00,000 --> 00:00:02,842 这个视频揭示了中国近代历史上 2 00:00:02,842 --> 00:00:05,685 一个鲜为人知却又极为重要的故 3 00:00:05,685 --> 00:00:05,888 事 4 00:00:05,888 --> 00:00:07,106 真是让人震惊 5 00:00:07,106 --> 00:00:07,715 看完后 6 00:00:07,715 --> 00:00:10,354 我不禁对历史有了更深的思考 7 00:00:10,354 --> 00:00:12,588 让我们一起重温这段历史 8 00:00:12,588 --> 00:00:14,212 提醒自己珍惜当下 9 00:00:14,212 --> 00:00:17,055 我相信很多朋友也会对这个话题 10 00:00:17,055 --> 00:00:17,664 感兴趣 11 00:00:17,664 --> 00:00:20,506 请把这个视频分享到你们的群聊 12 00:00:20,506 --> 00:00:20,709 中 13 00:00:20,709 --> 00:00:22,740 让更多人了解这段历史 14 00:00:22,820 --> 00:00:23,824 共鸣与反思 15 00:00:23,824 --> 00:00:25,430 是我们共同的责任 16 00:00:25,430 --> 00:00:28,242 也许我们能从中汲取更多的智慧 17 00:00:28,242 --> 00:00:28,844 与力量 18 00:00:28,844 --> 00:00:29,848 快动动手指 19 00:00:29,848 --> 00:00:32,659 让我们一起分享这段重要的历史 20 00:00:32,659 --> 00:00:32,860 吧""" FFmpeg.pw_video(jpg_path, file_path, mp3_path, pw_srt)