ffmpeg.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. import asyncio
  2. import os
  3. import subprocess
  4. import time
  5. from typing import List
  6. import cv2
  7. from loguru import logger
  8. from mutagen.mp3 import MP3
  9. class FFmpeg():
  10. """
  11. 时间转换
  12. """
  13. @classmethod
  14. def seconds_to_srt_time(cls, seconds):
  15. hours = int(seconds // 3600)
  16. minutes = int((seconds % 3600) // 60)
  17. seconds = seconds % 60
  18. milliseconds = int((seconds - int(seconds)) * 1000)
  19. return f"{hours:02d}:{minutes:02d}:{int(seconds):02d},{milliseconds:03d}"
  20. """
  21. 获取单个视频时长
  22. """
  23. @classmethod
  24. def get_video_duration(cls, video_url):
  25. cap = cv2.VideoCapture(video_url)
  26. if cap.isOpened():
  27. rate = cap.get(5)
  28. frame_num = cap.get(7)
  29. duration = int(frame_num / rate)
  30. return duration
  31. return 0
  32. # """
  33. # 获取视频文件的时长(秒)
  34. # """
  35. # @classmethod
  36. # def get_videos_duration(cls, video_file):
  37. # result = cls.asyncio_run_subprocess(["ffprobe", "-v", "error", "-show_entries", "format=duration",
  38. # "-of", "default=noprint_wrappers=1:nokey=1", video_file], timeout=10)
  39. # return float(result)
  40. """
  41. 获取视频宽高
  42. """
  43. @classmethod
  44. def get_w_h_size(cls, new_video_path):
  45. try:
  46. # 获取视频的原始宽高信息
  47. ffprobe_cmd = cls.asyncio_run_subprocess(["ffprobe", "-v" ,"error" ,"-select_streams" ,"v:0" ,"-show_entries", "stream=width,height" ,"-of" ,"csv=p=0" ,new_video_path],timeout=10)
  48. output_decoded = ffprobe_cmd.strip()
  49. split_output = [value for value in output_decoded.split(',') if value.strip()]
  50. height, width = map(int, split_output)
  51. return width, height
  52. except ValueError as e:
  53. return 1920, 1080
  54. """
  55. 视频裁剪
  56. """
  57. @classmethod
  58. def video_crop(cls, video_path, file_path):
  59. crop_url = file_path + 'crop.mp4'
  60. # 获取视频的原始宽高信息
  61. ffprobe_cmd = cls.asyncio_run_subprocess(
  62. ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of",
  63. "csv=p=0", video_path], timeout=10)
  64. width, height = map(int, ffprobe_cmd.strip().split(','))
  65. # 计算裁剪后的高度
  66. new_height = int(height * 0.8)
  67. # 构建 FFmpeg 命令,裁剪视频高度为原始高度的80%
  68. cls.asyncio_run_subprocess(
  69. [
  70. "ffmpeg",
  71. "-i", video_path,
  72. "-vf", f"crop={width}:{new_height}",
  73. "-c:v", "libx264",
  74. "-c:a", "aac",
  75. "-y",
  76. crop_url
  77. ],timeout=240)
  78. return crop_url
  79. """
  80. 视频截断
  81. """
  82. @classmethod
  83. def video_ggduration(cls, video_path, file_path, gg_duration_total):
  84. gg_duration_url = file_path + 'gg_duration.mp4'
  85. # 获取视频时长
  86. total_duration = cls.get_video_duration(video_path)
  87. if total_duration == 0:
  88. return gg_duration_url
  89. duration = int(total_duration) - int(gg_duration_total)
  90. if int(total_duration) < int(gg_duration_total):
  91. return gg_duration_url
  92. cls.asyncio_run_subprocess([
  93. "ffmpeg",
  94. "-i", video_path,
  95. "-c:v", "libx264",
  96. "-c:a", "aac",
  97. "-t", str(duration),
  98. "-y",
  99. gg_duration_url
  100. ], timeout= 360)
  101. return gg_duration_url
  102. """
  103. 截取原视频最后一帧
  104. """
  105. @classmethod
  106. def video_png(cls, video_path, file_path):
  107. """
  108. """
  109. # 获取视频的原始宽高信息
  110. jpg_url = file_path + 'png.jpg'
  111. # 获取视频时长
  112. total_duration = cls.get_video_duration(video_path)
  113. if total_duration == 0:
  114. return jpg_url
  115. time_offset = total_duration - 1 # 提取倒数第一秒的帧
  116. # 获取视频最后一秒,生成.jpg
  117. cls.asyncio_run_subprocess(
  118. ['ffmpeg', '-ss', str(time_offset), '-i', video_path, '-t', str(total_duration), '-vf', 'fps=1,scale=360:640', "-y", jpg_url], timeout=120)
  119. return jpg_url
  120. """
  121. 获取视频音频
  122. """
  123. @classmethod
  124. def get_video_mp3(cls, video_file, video_path_url, pw_random_id):
  125. pw_mp3_path = video_path_url + str(pw_random_id) +'pw_video.mp3'
  126. cls.asyncio_run_subprocess([
  127. 'ffmpeg',
  128. '-i', video_file,
  129. '-q:a', '0',
  130. '-map', 'a',
  131. pw_mp3_path
  132. ], timeout=120)
  133. time.sleep(1)
  134. return pw_mp3_path
  135. """横屏视频改为竖屏"""
  136. @classmethod
  137. def update_video_h_w(cls, video_path, file_path):
  138. video_h_w_path = file_path +'video_h_w_video.mp4'
  139. cls.asyncio_run_subprocess(["ffmpeg" ,"-i" ,video_path ,"-vf" ,"scale=640:ih*640/iw,pad=iw:iw*16/9:(ow-iw)/2:(oh-ih)/2" ,video_h_w_path],timeout=420)
  140. return video_h_w_path
  141. """视频转为640像素"""
  142. @classmethod
  143. def video_640(cls, video_path, file_path):
  144. video_url = file_path + 'pixelvideo.mp4'
  145. cls.asyncio_run_subprocess(["ffmpeg" ,"-i" ,video_path ,"-vf" ,"scale=360:640" ,video_url],timeout=420)
  146. return video_url
  147. """视频拼接到一起"""
  148. @classmethod
  149. def h_b_video(cls, video_path, pw_path, file_path):
  150. video_url = file_path + 'hbvideo.mp4'
  151. cls.asyncio_run_subprocess(["ffmpeg","-i", video_path, "-i", pw_path, "-filter_complex" ,"[0:v]scale=360:640[v1]; [1:v]scale=360:640[v2]; [v1][0:a][v2][1:a]concat=n=2:v=1:a=1[outv][outa]" ,"-map" ,"[outv]" ,"-map" ,"[outa]" ,video_url],timeout=500)
  152. return video_url
  153. """横屏视频顶部增加字幕"""
  154. @classmethod
  155. def add_video_zm(cls, new_video_path, video_path_url, pw_random_id, new_text):
  156. single_video_srt = video_path_url + str(pw_random_id) +'video_zm.srt'
  157. single_video_txt = video_path_url + str(pw_random_id) +'video_zm.txt'
  158. single_video = video_path_url + str(pw_random_id) +'video_zm.mp4'
  159. duration = cls.get_video_duration(new_video_path)
  160. if duration == 0:
  161. return new_video_path
  162. start_time = cls.seconds_to_srt_time(0)
  163. end_time = cls.seconds_to_srt_time(duration)
  164. # zm = '致敬伟大的教员,为整个民族\n感谢老人家历史向一代伟人'
  165. with open(single_video_txt, 'w') as f:
  166. f.write(f"file '{new_video_path}'\n")
  167. with open(single_video_srt, 'w') as f:
  168. f.write(f"1\n{start_time} --> {end_time}\n{new_text}\n\n")
  169. subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=12,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=225'"
  170. draw = f"{subtitle_cmd}"
  171. cls.asyncio_run_subprocess([
  172. "ffmpeg",
  173. "-f", "concat",
  174. "-safe", "0",
  175. "-i", single_video_txt,
  176. "-c:v", "libx264",
  177. "-c:a", "aac",
  178. "-vf", draw,
  179. "-y",
  180. single_video
  181. ],timeout=500)
  182. # subprocess.run(ffmpeg_cmd)
  183. return single_video
  184. """获取mp3时长"""
  185. @classmethod
  186. def get_mp3_duration(cls, file_path):
  187. audio = MP3(file_path)
  188. duration = audio.info.length
  189. if duration:
  190. return int(duration)
  191. return 0
  192. """
  193. 生成片尾视频
  194. """
  195. @classmethod
  196. def pw_video(cls, jpg_path, file_path, pw_mp3_path, pw_srt):
  197. # 添加音频到图片
  198. """
  199. jpg_url 图片地址
  200. pw_video 提供的片尾视频
  201. pw_duration 提供的片尾视频时长
  202. new_video_path 视频位置
  203. subtitle_cmd 字幕
  204. pw_url 生成视频地址
  205. :return:
  206. """
  207. pw_srt_path = file_path +'pw_video.srt'
  208. with open(pw_srt_path, 'w') as f:
  209. f.write(pw_srt)
  210. pw_url_path = file_path + 'pw_video.mp4'
  211. pw_duration = cls.get_mp3_duration(pw_mp3_path)
  212. if pw_duration == 0:
  213. return pw_url_path
  214. time.sleep(2)
  215. # 添加字幕 wqy-zenhei Hiragino Sans GB
  216. height = 1080
  217. margin_v = int(height) // 8 # 可根据需要调整字幕和背景之间的距离
  218. subtitle_cmd = f"subtitles={pw_srt_path}:force_style='Fontsize=13,Fontname=wqy-zenhei,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000,Bold=1,MarginV={margin_v}'"
  219. bg_position_offset = (int(360) - 360//8) / 1.75
  220. background_cmd = f"drawbox=y=(ih-{int(360)}/2-{bg_position_offset}):color=yellow@1.0:width=iw:height={int(360)}/4:t=fill"
  221. cls.asyncio_run_subprocess([
  222. 'ffmpeg',
  223. '-loop', '1',
  224. '-i', jpg_path, # 输入的图片文件
  225. '-i', pw_mp3_path, # 输入的音频文件
  226. '-c:v', 'libx264', # 视频编码格式
  227. '-t', str(pw_duration), # 输出视频的持续时间,与音频持续时间相同
  228. '-pix_fmt', 'yuv420p', # 像素格式
  229. '-c:a', 'aac', # 音频编码格式
  230. '-strict', 'experimental', # 使用实验性编码器
  231. '-shortest', # 确保输出视频的长度与音频一致
  232. '-vf', f"{background_cmd},{subtitle_cmd}", # 视频过滤器,设置分辨率和其他过滤器
  233. pw_url_path # 输出的视频文件路径
  234. ], timeout=500)
  235. if os.path.exists(pw_srt_path):
  236. os.remove(pw_srt_path)
  237. return pw_url_path
  238. """
  239. 单个视频拼接
  240. """
  241. @classmethod
  242. def single_video(cls, video_path, file_path, zm):
  243. single_video_url = file_path + 'single_video.mp4'
  244. single_video_srt = file_path + 'single_video.srt'
  245. # 获取时长
  246. duration = cls.get_video_duration(video_path)
  247. if duration == 0:
  248. return single_video_url
  249. start_time = cls.seconds_to_srt_time(2)
  250. end_time = cls.seconds_to_srt_time(duration)
  251. single_video_txt = file_path + 'single_video.txt'
  252. with open(single_video_txt, 'w') as f:
  253. f.write(f"file '{video_path}'\n")
  254. if zm:
  255. with open(single_video_srt, 'w') as f:
  256. f.write(f"1\n{start_time} --> {end_time}\n<font color=\"red\">\u2764\uFE0F</font>{zm}\n\n")
  257. subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'"
  258. else:
  259. subtitle_cmd = f"force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'"
  260. # 多线程数
  261. num_threads = 5
  262. # 构建 FFmpeg 命令,生成视频
  263. cls.asyncio_run_subprocess([
  264. "ffmpeg",
  265. "-f", "concat",
  266. "-safe", "0",
  267. "-i", f"{single_video_txt}",
  268. "-c:v", "libx264",
  269. "-c:a", "aac",
  270. '-b:v', '260k',
  271. "-b:a", "96k",
  272. "-threads", str(num_threads),
  273. "-vf", subtitle_cmd,
  274. "-y",
  275. single_video_url
  276. ], timeout=400)
  277. if os.path.exists(single_video_srt):
  278. os.remove(single_video_srt)
  279. return single_video_url
  280. @classmethod
  281. def asyncio_run_subprocess(cls, params: List[str], timeout: int = 30) -> str:
  282. async def run_subprocess():
  283. process = await asyncio.create_subprocess_exec(
  284. params[0],
  285. *params[1:],
  286. stdout=asyncio.subprocess.PIPE,
  287. stderr=asyncio.subprocess.PIPE,
  288. )
  289. try:
  290. out, err = await asyncio.wait_for(process.communicate(), timeout=timeout)
  291. if process.returncode != 0:
  292. raise IOError(err)
  293. return out.decode()
  294. except asyncio.TimeoutError:
  295. process.kill()
  296. out, err = await process.communicate()
  297. raise IOError(err)
  298. return asyncio.run(run_subprocess())
  299. if __name__ == '__main__':
  300. FFmpeg.get_w_h_size("/Users/z/Downloads/c2ce4ba6-fdb3-4e1b-bdba-dc05cb292abfvideo.mp4")