123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554 |
- import asyncio
- import json
- import os
- import time
- import traceback
- from typing import List
- import cv2
- import requests
- from loguru import logger
- from mutagen.mp3 import MP3
- class FFmpeg():
- """
- 时间转换
- """
- @classmethod
- def seconds_to_srt_time(cls, seconds):
- hours = int(seconds // 3600)
- minutes = int((seconds % 3600) // 60)
- seconds = seconds % 60
- milliseconds = int((seconds - int(seconds)) * 1000)
- return f"{hours:02d}:{minutes:02d}:{int(seconds):02d},{milliseconds:03d}"
- """
- 获取单个视频时长
- """
- @classmethod
- def get_video_duration(cls, video_url):
- cap = cv2.VideoCapture(video_url)
- if cap.isOpened():
- rate = cap.get(5)
- frame_num = cap.get(7)
- duration = int(frame_num / rate)
- return duration
- return 0
- # """
- # 获取视频文件的时长(秒)
- # """
- # @classmethod
- # def get_videos_duration(cls, video_file):
- # result = cls.asyncio_run_subprocess(["ffprobe", "-v", "error", "-show_entries", "format=duration",
- # "-of", "default=noprint_wrappers=1:nokey=1", video_file], timeout=10)
- # return float(result)
- """
- 获取视频宽高
- """
- @classmethod
- def get_w_h_size(cls, new_video_path):
- try:
- # 获取视频的原始宽高信息
- ffprobe_cmd = cls.asyncio_run_subprocess(["ffprobe", "-v" ,"error" ,"-select_streams" ,"v:0" ,"-show_entries", "stream=width,height" ,"-of" ,"csv=p=0" ,new_video_path],timeout=10)
- output_decoded = ffprobe_cmd.strip()
- split_output = [value for value in output_decoded.split(',') if value.strip()]
- height, width = map(int, split_output)
- return width, height
- except ValueError as e:
- return 1920, 1080
- """
- 视频裁剪
- """
- @classmethod
- def video_crop(cls, video_path, file_path):
- crop_url = file_path + 'crop.mp4'
- try:
- # 获取视频的原始宽高信息
- ffprobe_cmd = cls.asyncio_run_subprocess(
- ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of",
- "csv=p=0", video_path], timeout=10)
- width, height = map(int, ffprobe_cmd.strip().split(','))
- # 计算裁剪后的高度
- new_height = int(height * 0.8)
- # 构建 FFmpeg 命令,裁剪视频高度为原始高度的80%
- cls.asyncio_run_subprocess(
- [
- "ffmpeg",
- "-i", video_path,
- "-vf", f"crop={width}:{new_height}",
- "-c:v", "libx264",
- "-c:a", "aac",
- "-y",
- crop_url
- ],timeout=240)
- return crop_url
- except Exception as e:
- return crop_url
- """
- 视频截断
- """
- @classmethod
- def video_ggduration(cls, video_path, file_path, gg_duration_total):
- gg_duration_url = file_path + 'gg_duration.mp4'
- # 获取视频时长
- try:
- total_duration = cls.get_video_duration(video_path)
- logger.info(f"[+]视频总时长{total_duration}")
- if total_duration == 0:
- return video_path
- duration = int(total_duration) - int(gg_duration_total)
- if int(total_duration) < int(gg_duration_total):
- return video_path
- cls.asyncio_run_subprocess([
- "ffmpeg",
- "-i", video_path,
- "-c:v", "libx264",
- "-c:a", "aac",
- "-t", str(duration),
- "-y",
- gg_duration_url
- ], timeout= 360)
- return gg_duration_url
- except Exception as e:
- logger.error(f"修改视频时长发生异常:{e}\n{traceback.format_exc()}")
- return gg_duration_url
- """
- 截取原视频最后一帧
- """
- @classmethod
- def video_png(cls, video_path, file_path):
- # 获取视频的原始宽高信息
- jpg_url = file_path + 'png.jpg'
- try:
- cls.asyncio_run_subprocess(
- ["ffmpeg", "-sseof", "-1", '-i', video_path, '-frames:v', '1', "-y", jpg_url], timeout=120)
- return jpg_url
- except Exception as e:
- logger.error(f"[处理] 数据片尾获取最后一帧失败,{e}")
- return jpg_url
- """
- 获取视频音频
- """
- @classmethod
- def get_video_mp3(cls, video_file, video_path_url, pw_random_id):
- pw_mp3_path = video_path_url + str(pw_random_id) +'pw_video.mp3'
- try:
- cls.asyncio_run_subprocess([
- 'ffmpeg',
- '-i', video_file,
- '-q:a', '0',
- '-map', 'a',
- pw_mp3_path
- ], timeout=120)
- time.sleep(1)
- return pw_mp3_path
- except Exception as e:
- return pw_mp3_path
- @classmethod
- def get_pw_video_mp3(cls, video_file, video_path_url):
- bgm_pw_path_mp3 = video_file + 'bgm_pw.mp3'
- try:
- cls.asyncio_run_subprocess([
- 'ffmpeg',
- '-i', video_path_url, # 输入的视频文件路径
- '-q:a', '0', # 设置音频质量为最佳
- '-map', 'a',
- '-y',
- bgm_pw_path_mp3
- ], timeout=120)
- time.sleep(1)
- return bgm_pw_path_mp3
- except Exception as e:
- return bgm_pw_path_mp3
- """
- 片尾增加bgm
- """
- @classmethod
- def video_add_bgm(cls, video_file, bgm_path, video_path_url):
- bgm_pw_path = video_path_url + 'bgm_pw.mp4'
- pw_duration = cls.get_video_duration(video_file)
- try:
- pw_path_txt = video_path_url + 'bgm_pw_video.txt'
- with open(pw_path_txt, 'w') as f:
- f.write(f"file '{video_file}'\n")
- cls.asyncio_run_subprocess([
- "ffmpeg",
- "-f", "concat",
- "-safe", "0",
- "-i", f"{pw_path_txt}", # 视频序列输入的文本文件
- "-i", bgm_path, # 音频文件
- "-c:v", "libx264", # 视频编码格式
- "-t", str(pw_duration), # 输出视频的持续时间
- '-c:a', 'aac', # 音频编码格式
- '-b:v', '260k', # 视频比特率
- '-b:a', '96k', # 音频比特率
- '-threads', '2', # 线程数
- # '-vf', f'{background_cmd},{subtitle_cmd}', # 视频过滤器(背景和字幕)
- "-filter_complex", "[1:a]volume=0.6[a1];[0:a][a1]amerge=inputs=2[aout]", # 混合音频流
- "-map", "0:v:0", # 映射视频流来自第一个输入文件(视频)
- "-map", "[aout]", # 映射混合后的音频流
- '-y', # 强制覆盖输出文件
- bgm_pw_path # 输出文件路径
- ], timeout=500)
- time.sleep(1)
- return bgm_pw_path
- except Exception as e:
- return bgm_pw_path
- """横屏视频改为竖屏"""
- @classmethod
- def update_video_h_w(cls, video_path, file_path):
- video_h_w_path = file_path +'video_h_w_video.mp4'
- try:
- cls.asyncio_run_subprocess(["ffmpeg" ,"-i" ,video_path ,"-vf" ,"scale=640:ih*640/iw,pad=iw:iw*16/9:(ow-iw)/2:(oh-ih)/2" ,video_h_w_path],timeout=420)
- return video_h_w_path
- except Exception as e:
- return video_h_w_path
- """视频转为640像素"""
- @classmethod
- def video_640(cls, video_path, file_path):
- video_url = file_path + 'pixelvideo.mp4'
- try:
- cls.asyncio_run_subprocess(["ffmpeg" ,"-i" ,video_path ,"-vf" ,"scale=360:640" ,video_url],timeout=420)
- return video_url
- except Exception as e:
- return video_url
- @classmethod
- def concatenate_videos(cls, videos_paths, file_path):
- video_url = file_path + 'rg_pw.mp4'
- list_filename = file_path + 'rg_pw.txt'
- with open(list_filename, "w") as f:
- for video_path in videos_paths:
- f.write(f"file '{video_path}'\n")
- try:
- cls.asyncio_run_subprocess(
- ["ffmpeg", "-f", "concat", "-safe", "0", "-i", list_filename, "-c", "copy", video_url], timeout=420)
- logger.info(f"[+] 视频转为640像素成功")
- return video_url
- except Exception as e:
- return video_url
- """视频拼接到一起"""
- @classmethod
- def h_b_video(cls, video_path, pw_path, file_path):
- video_url = file_path + 'hbvideo.mp4'
- try:
- cls.asyncio_run_subprocess(["ffmpeg","-i", video_path, "-i", pw_path, "-filter_complex" ,"[0:v]scale=360:640[v1]; [1:v]scale=360:640[v2]; [v1][0:a][v2][1:a]concat=n=2:v=1:a=1[outv][outa]" ,"-map" ,"[outv]" ,"-map" ,"[outa]" ,video_url],timeout=500)
- return video_url
- except Exception as e:
- return video_url
- """横屏视频顶部增加字幕"""
- @classmethod
- def add_video_zm(cls, new_video_path, video_path_url, pw_random_id, new_text):
- single_video_srt = video_path_url + str(pw_random_id) +'video_zm.srt'
- single_video_txt = video_path_url + str(pw_random_id) +'video_zm.txt'
- single_video = video_path_url + str(pw_random_id) +'video_zm.mp4'
- try:
- duration = cls.get_video_duration(new_video_path)
- if duration == 0:
- return new_video_path
- start_time = cls.seconds_to_srt_time(0)
- end_time = cls.seconds_to_srt_time(duration)
- # zm = '致敬伟大的教员,为整个民族\n感谢老人家历史向一代伟人'
- with open(single_video_txt, 'w') as f:
- f.write(f"file '{new_video_path}'\n")
- with open(single_video_srt, 'w') as f:
- f.write(f"1\n{start_time} --> {end_time}\n{new_text}\n\n")
- subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=12,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=225'"
- draw = f"{subtitle_cmd}"
- cls.asyncio_run_subprocess([
- "ffmpeg",
- "-f", "concat",
- "-safe", "0",
- "-i", single_video_txt,
- "-c:v", "libx264",
- "-c:a", "aac",
- "-vf", draw,
- "-y",
- single_video
- ],timeout=500)
- # subprocess.run(ffmpeg_cmd)
- return single_video
- except Exception as e:
- return single_video
- """获取mp3时长"""
- @classmethod
- def get_mp3_duration(cls, file_path):
- audio = MP3(file_path)
- duration = audio.info.length
- if duration:
- return int(duration)
- return 0
- """
- 生成片尾视频
- """
- @classmethod
- def pw_video(cls, jpg_path, file_path, pw_mp3_path, pw_srt):
- # 添加音频到图片
- """
- jpg_url 图片地址
- pw_video 提供的片尾视频
- pw_duration 提供的片尾视频时长
- new_video_path 视频位置
- subtitle_cmd 字幕
- pw_url 生成视频地址
- :return:
- """
- pw_srt_path = file_path +'pw_video.srt'
- with open(pw_srt_path, 'w') as f:
- f.write(pw_srt)
- pw_url_path = file_path + 'pw_video.mp4'
- try:
- pw_duration = cls.get_mp3_duration(pw_mp3_path)
- if pw_duration == 0:
- return pw_url_path
- time.sleep(2)
- # 添加字幕 wqy-zenhei Hiragino Sans GB
- height = 1080
- margin_v = int(height) // 8 # 可根据需要调整字幕和背景之间的距离
- subtitle_cmd = f"subtitles={pw_srt_path}:force_style='Fontsize=13,Fontname=wqy-zenhei,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000,Bold=1,MarginV={margin_v}'"
- bg_position_offset = (int(360) - 360//8) / 1.75
- background_cmd = f"drawbox=y=(ih-{int(360)}/2-{bg_position_offset}):color=yellow@1.0:width=iw:height={int(360)}/4:t=fill"
- if "mp4" in jpg_path:
- pw_path_txt = file_path + 'pw_path_video.txt'
- with open(pw_path_txt, 'w') as f:
- f.write(f"file '{jpg_path}'\n")
- cls.asyncio_run_subprocess([
- "ffmpeg",
- "-f", "concat",
- "-safe", "0",
- "-i", f"{pw_path_txt}", # 视频序列输入的文本文件
- "-i", pw_mp3_path, # 音频文件
- "-c:v", "libx264", # 视频编码格式
- "-t", str(pw_duration), # 输出视频的持续时间
- "-c:a", "aac", # 音频编码格式
- "-b:v", "260k", # 视频比特率
- "-b:a", "96k", # 音频比特率
- "-threads", "2", # 线程数
- "-vf", f"{background_cmd},{subtitle_cmd}", # 视频过滤器(背景和字幕)
- "-map", "0:v:0", # 映射视频流来自第一个输入文件(视频)
- "-map", "1:a:0", # 映射音频流来自第二个输入文件(音频)
- "-y", # 强制覆盖输出文件
- pw_url_path # 输出文件路径
- ], timeout=500)
- else:
- cls.asyncio_run_subprocess([
- 'ffmpeg',
- '-loop', '1',
- '-i', jpg_path, # 输入的图片文件
- '-i', pw_mp3_path, # 输入的音频文件
- '-c:v', 'libx264', # 视频编码格式
- '-t', str(pw_duration), # 输出视频的持续时间,与音频持续时间相同
- '-pix_fmt', 'yuv420p', # 像素格式
- '-c:a', 'aac', # 音频编码格式
- '-strict', 'experimental', # 使用实验性编码器
- '-shortest', # 确保输出视频的长度与音频一致
- '-vf', f"{background_cmd},{subtitle_cmd}", # 视频过滤器,设置分辨率和其他过滤器
- pw_url_path # 输出的视频文件路径
- ], timeout=500)
- if os.path.exists(pw_srt_path):
- os.remove(pw_srt_path)
- return pw_url_path
- except Exception as e:
- return pw_url_path
- """
- 单个视频拼接
- """
- @classmethod
- def single_video(cls, video_path, file_path, zm):
- single_video_url = file_path + 'single_video.mp4'
- single_video_srt = file_path + 'single_video.srt'
- # 获取时长
- try:
- duration = cls.get_video_duration(video_path)
- if duration == 0:
- return single_video_url
- start_time = cls.seconds_to_srt_time(2)
- end_time = cls.seconds_to_srt_time(duration)
- single_video_txt = file_path + 'single_video.txt'
- with open(single_video_txt, 'w') as f:
- f.write(f"file '{video_path}'\n")
- if zm:
- with open(single_video_srt, 'w') as f:
- f.write(f"1\n{start_time} --> {end_time}\n<font color=\"red\">\u2764\uFE0F</font>{zm}\n\n")
- subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'"
- else:
- subtitle_cmd = f"force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'"
- # 多线程数
- num_threads = 5
- # 构建 FFmpeg 命令,生成视频
- cls.asyncio_run_subprocess([
- "ffmpeg",
- "-f", "concat",
- "-safe", "0",
- "-i", f"{single_video_txt}",
- "-c:v", "libx264",
- "-c:a", "aac",
- '-b:v', '260k',
- "-b:a", "96k",
- "-threads", str(num_threads),
- "-vf", subtitle_cmd,
- "-y",
- single_video_url
- ], timeout=400)
- if os.path.exists(single_video_srt):
- os.remove(single_video_srt)
- return single_video_url
- except Exception as e:
- return single_video_url
- @classmethod
- def asyncio_run_subprocess(cls, params: List[str], timeout: int = 30) -> str:
- async def run_subprocess():
- process = await asyncio.create_subprocess_exec(
- params[0],
- *params[1:],
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- try:
- out, err = await asyncio.wait_for(process.communicate(), timeout=timeout)
- if process.returncode != 0:
- raise IOError(err)
- return out.decode()
- except asyncio.TimeoutError:
- process.kill()
- out, err = await process.communicate()
- raise IOError(err)
- return asyncio.run(run_subprocess())
- @classmethod
- def get_http_duration(cls, videos_path):
- total_duration = 0
- for video_path in videos_path:
- url = "http://101.37.24.17:5555/api/v1/ffmpeg/get_meta"
- payload = json.dumps({
- "url": video_path,
- "referer": ""
- })
- headers = {
- 'Authorization': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiNGNhMTI4ZGYtYWMzMy00NWQ2LTg3MmEtMDAzOTk4MGVhM2ViIiwibmFtZSI6Inp5IiwiZXhwIjoyMDUwOTI3MjExfQ.k_rvuESjA62RgPDiLniVgJyLJn3Q8C1Y_AGq3CPRuKI',
- 'Content-Type': 'application/json'
- }
- try:
- response = requests.request("POST", url, headers=headers, data=payload, timeout=30)
- response = response.json()
- duration = response['data']['streams'][0]['duration']
- total_duration += int(float(duration))
- except Exception as e:
- print(f"Error processing {video_path}: {e}")
- return total_duration
- if __name__ == '__main__':
- file_path = '/Users/z/Downloads/478de0b6-4e52-44a5-a5d4-967b2cf8ce49'
- jpg_path = '/Users/z/Downloads/478de0b6-4e52-44a5-a5d4-967b2cf8ce49rg_pixelvideo.mp4'
- mp3_path='/Users/z/Downloads/478de0b6-4e52-44a5-a5d4-967b2cf8ce49pw_video.mp3'
- pw_srt = """1
- 00:00:00,000 --> 00:00:02,842
- 这个视频揭示了中国近代历史上
- 2
- 00:00:02,842 --> 00:00:05,685
- 一个鲜为人知却又极为重要的故
- 3
- 00:00:05,685 --> 00:00:05,888
- 事
- 4
- 00:00:05,888 --> 00:00:07,106
- 真是让人震惊
- 5
- 00:00:07,106 --> 00:00:07,715
- 看完后
- 6
- 00:00:07,715 --> 00:00:10,354
- 我不禁对历史有了更深的思考
- 7
- 00:00:10,354 --> 00:00:12,588
- 让我们一起重温这段历史
- 8
- 00:00:12,588 --> 00:00:14,212
- 提醒自己珍惜当下
- 9
- 00:00:14,212 --> 00:00:17,055
- 我相信很多朋友也会对这个话题
- 10
- 00:00:17,055 --> 00:00:17,664
- 感兴趣
- 11
- 00:00:17,664 --> 00:00:20,506
- 请把这个视频分享到你们的群聊
- 12
- 00:00:20,506 --> 00:00:20,709
- 中
- 13
- 00:00:20,709 --> 00:00:22,740
- 让更多人了解这段历史
- 14
- 00:00:22,820 --> 00:00:23,824
- 共鸣与反思
- 15
- 00:00:23,824 --> 00:00:25,430
- 是我们共同的责任
- 16
- 00:00:25,430 --> 00:00:28,242
- 也许我们能从中汲取更多的智慧
- 17
- 00:00:28,242 --> 00:00:28,844
- 与力量
- 18
- 00:00:28,844 --> 00:00:29,848
- 快动动手指
- 19
- 00:00:29,848 --> 00:00:32,659
- 让我们一起分享这段重要的历史
- 20
- 00:00:32,659 --> 00:00:32,860
- 吧"""
- FFmpeg.pw_video(jpg_path, file_path, mp3_path, pw_srt)
|