import json
import time
import orjson
import requests
from loguru import logger
from protos import task_pb2, task_pb2_grpc
class FFmpeg:
    """Run ffmpeg/ffprobe jobs inside a remote container via a TaskRunner gRPC stub.

    Commands are sent as ``RunCommandRequest`` messages; helper files (concat
    lists, SRT subtitles) are uploaded with ``PutFileRequest``.  Output paths
    are built by appending a fixed suffix to a caller-supplied prefix.

    The rendering methods are best-effort: a failure is logged and the
    intended output path is still returned, so callers must not assume the
    file exists.

    NOTE(review): filter arguments are wrapped in literal double quotes, as
    the existing commands did; presumably the runner joins the argument list
    into a shell command line -- confirm before changing the quoting.
    """

    # Rate-control flags shared by every h264_nvenc encode in this class.
    _NVENC_QUALITY = ("-cq", "28", "-preset", "slow")
    # Size reported when the ffprobe output cannot be parsed.
    _FALLBACK_SIZE = (1920, 1080)
    # Remote metadata service used by get_http_duration.
    _META_URL = "http://101.37.24.17:5555/api/v1/ffmpeg/get_meta"
    # SECURITY(review): credential hard-coded in source; it should be moved to
    # configuration / a secret store and rotated.
    _META_TOKEN = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiNGNhMTI4ZGYtYWMzMy00NWQ2LTg3MmEtMDAzOTk4MGVhM2ViIiwibmFtZSI6Inp5IiwiZXhwIjoyMDUwOTI3MjExfQ.k_rvuESjA62RgPDiLniVgJyLJn3Q8C1Y_AGq3CPRuKI'

    def __init__(self, container_id: str, stub: "task_pb2_grpc.TaskRunnerStub"):
        """Bind the helper to one container.

        Args:
            container_id: Id placed in every request sent through *stub*.
            stub: TaskRunner stub exposing ``RunCommand`` and ``PutFile``.
        """
        self.container_id = container_id
        self.stub = stub

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _run(self, command):
        """Run *command* in the container, drain the response stream and return the joined ``msg`` text."""
        request = task_pb2.RunCommandRequest(id=self.container_id, command=list(command))
        return "".join(response.msg for response in self.stub.RunCommand(request=request))

    def _put_file(self, path, text):
        """Upload *text* (encoded with the default UTF-8 codec) to *path* in the container."""
        request = task_pb2.PutFileRequest(id=self.container_id, payload=text.encode(), path=path)
        self.stub.PutFile(request=request)

    @staticmethod
    def _vf_args(filter_expr):
        """Return the ``-vf`` argument pair for *filter_expr*, or ``[]`` when there is no filter."""
        if not filter_expr:
            return []
        return ["-vf", f"\"{filter_expr}\""]

    @staticmethod
    def _parse_dimensions(output):
        """Parse ffprobe ``csv=p=0`` output for ``stream=width,height`` into ``(width, height)``.

        Only the first non-empty line is used.

        Raises:
            ValueError: if no line holds exactly two integers.
        """
        for line in output.splitlines():
            fields = [field for field in line.split(',') if field.strip()]
            if fields:
                # ffprobe prints the width first, then the height.
                width, height = map(int, fields)
                return width, height
        raise ValueError(f"no dimensions in ffprobe output: {output!r}")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def seconds_to_srt_time(self, seconds):
        """Format a number of seconds as an SRT timestamp ``HH:MM:SS,mmm``.

        Works on whole milliseconds so that values such as ``1.001`` are not
        truncated to ``,000`` by floating-point error.
        """
        total_ms = int(round(seconds * 1000))
        hours, remainder = divmod(total_ms, 3600000)
        minutes, remainder = divmod(remainder, 60000)
        secs, millis = divmod(remainder, 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"

    def get_w_h_size(self, new_video_path):
        """Return ``(width, height)`` of the first video stream of *new_video_path*.

        Falls back to ``(1920, 1080)`` when the ffprobe output cannot be
        parsed.  Errors raised by the stub itself are not caught.
        """
        try:
            output = self._run([
                "ffprobe", "-v", "error", "-select_streams", "v:0",
                "-show_entries", "stream=width,height", "-of", "csv=p=0",
                new_video_path,
            ])
            return self._parse_dimensions(output)
        except ValueError:
            logger.warning("could not read the size of {}; using the fallback size", new_video_path)
            return self._FALLBACK_SIZE

    def video_crop(self, video_path, file_path):
        """Crop *video_path* to 80% of its height and return ``file_path + 'crop.mp4'``."""
        crop_url = file_path + 'crop.mp4'
        try:
            self._run([
                "ffmpeg", "-i", video_path,
                *self._vf_args("crop=in_w:in_h*0.8"),
                "-c:v", "h264_nvenc", "-c:a", "aac",
                "-y", *self._NVENC_QUALITY,
                crop_url,
            ])
        except Exception:
            logger.exception("video_crop failed for {}", video_path)
        return crop_url

    def video_ggduration(self, video_path, file_path, gg_duration_total):
        """Re-encode *video_path* without its last *gg_duration_total* seconds.

        Nothing is rendered when the duration is 0 or shorter than
        *gg_duration_total*.  Returns ``file_path + 'gg_duration.mp4'`` in
        every case.
        """
        gg_duration_url = file_path + 'gg_duration.mp4'
        try:
            total_duration = self.get_duration(video_path)
            if total_duration == 0:
                return gg_duration_url
            keep_seconds = int(total_duration) - int(gg_duration_total)
            if keep_seconds < 0:
                return gg_duration_url
            self._run([
                "ffmpeg", "-i", video_path,
                "-c:v", "h264_nvenc", "-c:a", "aac",
                "-t", str(keep_seconds),
                "-y", *self._NVENC_QUALITY,
                gg_duration_url,
            ])
        except Exception:
            logger.exception("video_ggduration failed for {}", video_path)
        return gg_duration_url

    def video_png(self, video_path, file_path):
        """Grab one frame from the final second of *video_path* into ``file_path + 'png.jpg'``."""
        jpg_url = file_path + 'png.jpg'
        try:
            self._run(["ffmpeg", "-sseof", "-1", '-i', video_path, '-frames:v', '1', "-y", jpg_url])
        except Exception:
            logger.exception("video_png failed for {}", video_path)
        return jpg_url

    def get_video_mp3(self, video_file, video_path_url, pw_random_id):
        """Extract the audio of *video_file* to ``video_path_url + str(pw_random_id) + 'pw_video.mp3'``."""
        pw_mp3_path = video_path_url + str(pw_random_id) + 'pw_video.mp3'
        try:
            self._run(['ffmpeg', '-i', video_file, '-q:a', '0', '-map', 'a', pw_mp3_path])
            # Short pause kept from the original implementation before the file is used.
            time.sleep(1)
        except Exception:
            logger.exception("get_video_mp3 failed for {}", video_file)
        return pw_mp3_path

    def get_pw_video_mp3(self, video_file, video_path_url):
        """Extract the audio of *video_path_url* to ``video_file + 'bgm_pw.mp3'``.

        Note the roles: *video_path_url* is the ffmpeg input, while
        *video_file* is only used as the output path prefix.
        """
        bgm_pw_path_mp3 = video_file + 'bgm_pw.mp3'
        try:
            self._run(['ffmpeg', '-i', video_path_url, '-q:a', '0', '-map', 'a', bgm_pw_path_mp3])
        except Exception:
            logger.exception("get_pw_video_mp3 failed for {}", video_path_url)
        return bgm_pw_path_mp3

    def video_add_bgm(self, video_file, bgm_path, video_path_url):
        """Mix *bgm_path* (at 60% volume) into the audio of *video_file*.

        The output, ``video_path_url + 'bgm_pw.mp4'``, is limited to the
        duration of *video_file*.  A failure while probing that duration
        propagates to the caller; later failures are logged.
        """
        bgm_pw_path = video_path_url + 'bgm_pw.mp4'
        pw_duration = self.get_duration(video_file)
        try:
            pw_path_txt = video_path_url + 'bgm_pw_video.txt'
            self._put_file(pw_path_txt, f"file '{video_file}'\n")
            self._run([
                "ffmpeg",
                "-f", "concat",
                "-safe", "0",
                "-i", pw_path_txt,            # concat list holding the video
                "-i", bgm_path,               # background music
                "-c:v", "h264_nvenc",
                '-c:a', 'aac',
                "-t", str(pw_duration),       # cap the output at the video length
                '-b:v', '260k',
                '-b:a', '96k',
                '-threads', '2',
                # Lower the music volume, then merge it with the original audio.
                "-filter_complex", "\"[1:a]volume=0.6[a1];[0:a][a1]amerge=inputs=2[aout]\"",
                "-map", "\"0:v:0\"",
                "-map", "\"[aout]\"",
                '-y',
                *self._NVENC_QUALITY,
                bgm_pw_path,
            ])
        except Exception:
            logger.exception("video_add_bgm failed for {}", video_file)
        return bgm_pw_path

    def update_video_h_w(self, video_path, file_path):
        """Turn a landscape video into portrait: scale to 640 wide and pad to 9:16.

        Returns ``file_path + 'video_h_w_video.mp4'``.
        """
        video_h_w_path = file_path + 'video_h_w_video.mp4'
        try:
            self._run([
                "ffmpeg", "-i", video_path,
                *self._vf_args("scale=640:ih*640/iw,pad=iw:iw*16/9:(ow-iw)/2:(oh-ih)/2"),
                video_h_w_path,
            ])
        except Exception:
            logger.exception("update_video_h_w failed for {}", video_path)
        return video_h_w_path

    def video_640(self, video_path, file_path):
        """Scale *video_path* to 360x640 and return ``file_path + 'pixelvideo.mp4'``."""
        video_url = file_path + 'pixelvideo.mp4'
        try:
            self._run(["ffmpeg", "-i", video_path, *self._vf_args("scale=360:640"), video_url])
        except Exception:
            logger.exception("video_640 failed for {}", video_path)
        return video_url

    def concatenate_videos(self, videos_paths, file_path):
        """Concatenate *videos_paths* with stream copy into ``file_path + 'rg_pw.mp4'``.

        The concat list is uploaded first; a failure of that upload
        propagates to the caller, while a failed ffmpeg run is only logged.
        """
        video_url = file_path + 'rg_pw.mp4'
        list_filename = file_path + 'rg_pw.txt'
        self._put_file(list_filename, "".join(f"file '{video_path}'\n" for video_path in videos_paths))
        try:
            self._run(["ffmpeg", "-f", "concat", "-safe", "0", "-i", list_filename, "-c", "copy", video_url])
            logger.info("[+] 视频拼接成功")
        except Exception:
            logger.exception("concatenate_videos failed for {}", list_filename)
        return video_url

    def h_b_video(self, video_path, pw_path, file_path):
        """Join *video_path* and *pw_path* end to end, both scaled to 360x640.

        Returns ``file_path + 'hbvideo.mp4'``.
        """
        video_url = file_path + 'hbvideo.mp4'
        try:
            self._run([
                "ffmpeg", "-i", video_path, "-i", pw_path,
                "-filter_complex",
                "\"[0:v]scale=360:640[v1]; [1:v]scale=360:640[v2]; [v1][0:a][v2][1:a]concat=n=2:v=1:a=1[outv][outa]\"",
                "-map", "[outv]", "-map", "[outa]",
                video_url,
            ])
        except Exception:
            logger.exception("h_b_video failed for {} + {}", video_path, pw_path)
        return video_url

    def add_video_zm(self, new_video_path, video_path_url, pw_random_id, new_text):
        """Burn *new_text* into *new_video_path* as a subtitle shown for the whole clip.

        Returns ``video_path_url + str(pw_random_id) + 'video_zm.mp4'``, or
        *new_video_path* unchanged when the probed duration is 0.
        """
        prefix = video_path_url + str(pw_random_id)
        single_video_srt = prefix + 'video_zm.srt'
        single_video_txt = prefix + 'video_zm.txt'
        single_video = prefix + 'video_zm.mp4'
        try:
            duration = self.get_duration(new_video_path)
            if duration == 0:
                return new_video_path
            start_time = self.seconds_to_srt_time(0)
            end_time = self.seconds_to_srt_time(duration)
            self._put_file(single_video_txt, f"file '{new_video_path}'\n")
            self._put_file(single_video_srt, f"1\n{start_time} --> {end_time}\n{new_text}\n\n")
            subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=12,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=225'"
            self._run([
                "ffmpeg",
                "-f", "concat",
                "-safe", "0",
                "-i", single_video_txt,
                "-c:v", "h264_nvenc",
                "-c:a", "aac",
                *self._vf_args(subtitle_cmd),
                "-y",
                *self._NVENC_QUALITY,
                single_video,
            ])
        except Exception:
            logger.exception("add_video_zm failed for {}", new_video_path)
        return single_video

    def get_duration(self, file_path):
        """Return the container duration of *file_path* in whole seconds (truncated).

        The value is read from ``format.duration`` of ffprobe's JSON output.
        Errors from the stub, from JSON decoding or from a missing key are
        not caught here.
        """
        output = self._run([
            'ffprobe', '-show_format', '-show_streams', '-of', 'json',
            '-v', 'quiet', '-hide_banner', file_path,
        ])
        return int(float(orjson.loads(output)['format']['duration']))

    def pw_video(self, jpg_path, file_path, pw_mp3_path, pw_srt):
        """Render the outro clip: a visual plus narration audio with burned-in subtitles.

        Args:
            jpg_path: Visual source.  When the path contains ``"mp4"`` it is
                fed through a concat list as a video; otherwise it is looped
                as a still image.
            file_path: Prefix for every generated file.
            pw_mp3_path: Narration audio; its duration sets the output length.
            pw_srt: SRT subtitle text, uploaded to ``file_path + 'pw_video.srt'``.

        Returns:
            ``file_path + 'pw_video.mp4'``.  The subtitle upload happens
            before error handling starts, so its failure propagates.
        """
        pw_srt_path = file_path + 'pw_video.srt'
        self._put_file(pw_srt_path, pw_srt)
        pw_url_path = file_path + 'pw_video.mp4'
        try:
            pw_duration = self.get_duration(pw_mp3_path)
            if pw_duration == 0:
                return pw_url_path
            time.sleep(2)
            # Subtitle placement: bottom margin is 1/8 of a 1080 px reference height.
            margin_v = 1080 // 8
            subtitle_cmd = f"subtitles={pw_srt_path}:force_style='Fontsize=13,Fontname=wqy-zenhei,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000,Bold=1,MarginV={margin_v}'"
            # Yellow band drawn behind the subtitles, sized from a 360 px reference.
            band_base = 360
            bg_position_offset = (band_base - band_base // 8) / 1.75
            background_cmd = f"drawbox=y=(ih-{band_base}/2-{bg_position_offset}):color=yellow@1.0:width=iw:height={band_base}/4:t=fill"
            filter_args = self._vf_args(f"{background_cmd},{subtitle_cmd}")
            if "mp4" in jpg_path:
                pw_path_txt = file_path + 'pw_path_video.txt'
                self._put_file(pw_path_txt, f"file '{jpg_path}'\n")
                command = [
                    "ffmpeg",
                    "-f", "concat",
                    "-safe", "0",
                    "-i", pw_path_txt,          # concat list holding the video
                    "-i", pw_mp3_path,          # narration audio
                    "-c:v", "h264_nvenc",
                    "-t", str(pw_duration),     # output length = audio length
                    "-c:a", "aac",
                    "-b:v", "260k",
                    "-b:a", "96k",
                    "-threads", "2",
                    *filter_args,
                    "-map", "0:v:0",            # video from the first input
                    "-map", "1:a:0",            # audio from the second input
                    "-y",
                    *self._NVENC_QUALITY,
                    pw_url_path,
                ]
            else:
                command = [
                    'ffmpeg',
                    '-loop', '1',
                    '-i', jpg_path,             # still image, looped
                    '-i', pw_mp3_path,          # narration audio
                    '-c:v', 'h264_nvenc',
                    '-t', str(pw_duration),     # output length = audio length
                    '-pix_fmt', 'yuv420p',
                    '-c:a', 'aac',
                    '-strict', 'experimental',
                    '-shortest',
                    *filter_args,
                    *self._NVENC_QUALITY,
                    pw_url_path,
                ]
            self._run(command)
        except Exception:
            logger.exception("pw_video failed for {}", jpg_path)
        return pw_url_path

    def single_video(self, video_path, file_path, zm):
        """Re-encode *video_path*, optionally burning in the caption *zm*.

        When *zm* is non-empty it is shown (prefixed with a heart emoji) from
        second 2 until the end of the clip.  When it is empty no video filter
        is passed at all: a bare ``force_style=...`` is not an ffmpeg filter
        and would make the whole command fail.

        Returns ``file_path + 'single_video.mp4'``.
        """
        single_video_url = file_path + 'single_video.mp4'
        single_video_srt = file_path + 'single_video.srt'
        try:
            duration = self.get_duration(video_path)
            if duration == 0:
                return single_video_url
            start_time = self.seconds_to_srt_time(2)
            end_time = self.seconds_to_srt_time(duration)
            single_video_txt = file_path + 'single_video.txt'
            self._put_file(single_video_txt, f"file '{video_path}'\n")
            subtitle_cmd = None
            if zm:
                self._put_file(single_video_srt, f"1\n{start_time} --> {end_time}\n\u2764\uFE0F{zm}\n\n")
                subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'"
            num_threads = 5
            self._run([
                "ffmpeg",
                "-f", "concat",
                "-safe", "0",
                "-i", single_video_txt,
                "-c:v", "h264_nvenc",
                "-c:a", "aac",
                '-b:v', '260k',
                "-b:a", "96k",
                "-threads", str(num_threads),
                *self._vf_args(subtitle_cmd),
                "-y",
                *self._NVENC_QUALITY,
                single_video_url,
            ])
        except Exception:
            logger.exception("single_video failed for {}", video_path)
        return single_video_url

    def get_http_duration(self, videos_path):
        """Sum the durations (whole seconds) of remote videos via the metadata service.

        Each URL in *videos_path* is looked up with a POST request; the
        duration of the first reported stream is truncated and added.  A URL
        whose lookup fails is logged and contributes nothing to the total.
        """
        headers = {
            'Authorization': self._META_TOKEN,
            'Content-Type': 'application/json'
        }
        total_duration = 0
        for video_path in videos_path:
            payload = json.dumps({
                "url": video_path,
                "referer": ""
            })
            try:
                response = requests.post(self._META_URL, headers=headers, data=payload, timeout=30)
                duration = response.json()['data']['streams'][0]['duration']
                total_duration += int(float(duration))
            except Exception as e:
                logger.warning("Error processing {}: {}", video_path, e)
        return total_duration
if __name__ == '__main__':
    # Manual smoke test for FFmpeg.pw_video using local sample files.
    # Output-path prefix: pw_video appends suffixes such as 'pw_video.srt' and
    # 'pw_video.mp4' directly to this string.
    file_path = '/Users/z/Downloads/478de0b6-4e52-44a5-a5d4-967b2cf8ce49'
    # Visual source; the path contains "mp4", so pw_video would take its
    # concat-list (video) branch rather than the still-image branch.
    jpg_path = '/Users/z/Downloads/478de0b6-4e52-44a5-a5d4-967b2cf8ce49rg_pixelvideo.mp4'
    # Narration audio whose duration determines the length of the output.
    mp3_path='/Users/z/Downloads/478de0b6-4e52-44a5-a5d4-967b2cf8ce49pw_video.mp3'
    # Sample SRT subtitle text (20 cues) passed as the pw_srt argument.
    pw_srt = """1
00:00:00,000 --> 00:00:02,842
这个视频揭示了中国近代历史上
2
00:00:02,842 --> 00:00:05,685
一个鲜为人知却又极为重要的故
3
00:00:05,685 --> 00:00:05,888
事
4
00:00:05,888 --> 00:00:07,106
真是让人震惊
5
00:00:07,106 --> 00:00:07,715
看完后
6
00:00:07,715 --> 00:00:10,354
我不禁对历史有了更深的思考
7
00:00:10,354 --> 00:00:12,588
让我们一起重温这段历史
8
00:00:12,588 --> 00:00:14,212
提醒自己珍惜当下
9
00:00:14,212 --> 00:00:17,055
我相信很多朋友也会对这个话题
10
00:00:17,055 --> 00:00:17,664
感兴趣
11
00:00:17,664 --> 00:00:20,506
请把这个视频分享到你们的群聊
12
00:00:20,506 --> 00:00:20,709
中
13
00:00:20,709 --> 00:00:22,740
让更多人了解这段历史
14
00:00:22,820 --> 00:00:23,824
共鸣与反思
15
00:00:23,824 --> 00:00:25,430
是我们共同的责任
16
00:00:25,430 --> 00:00:28,242
也许我们能从中汲取更多的智慧
17
00:00:28,242 --> 00:00:28,844
与力量
18
00:00:28,844 --> 00:00:29,848
快动动手指
19
00:00:29,848 --> 00:00:32,659
让我们一起分享这段重要的历史
20
00:00:32,659 --> 00:00:32,860
吧"""
    # NOTE(review): pw_video is declared as an instance method
    # (self, jpg_path, file_path, pw_mp3_path, pw_srt). Calling it on the class
    # with four arguments binds jpg_path to `self` and raises TypeError for the
    # missing `pw_srt`. It needs an FFmpeg(container_id, stub) instance --
    # TODO: construct one here once a TaskRunner stub is available to this script.
    FFmpeg.pw_video(jpg_path, file_path, mp3_path, pw_srt)