# ffmpeg_test.py

import asyncio
import os
import subprocess
import time
from typing import List

import cv2
from loguru import logger
from mutagen.mp3 import MP3


  9. class FFmpeg():
  10. """
  11. 时间转换
  12. """
  13. @classmethod
  14. def seconds_to_srt_time(cls, seconds):
  15. hours = int(seconds // 3600)
  16. minutes = int((seconds % 3600) // 60)
  17. seconds = seconds % 60
  18. milliseconds = int((seconds - int(seconds)) * 1000)
  19. return f"{hours:02d}:{minutes:02d}:{int(seconds):02d},{milliseconds:03d}"
  20. """
  21. 获取单个视频时长
  22. """
  23. @classmethod
  24. def get_video_duration(cls, video_url):
  25. cap = cv2.VideoCapture(video_url)
  26. if cap.isOpened():
  27. rate = cap.get(5)
  28. frame_num = cap.get(7)
  29. duration = int(frame_num / rate)
  30. return duration
  31. return 0
  32. # """
  33. # 获取视频文件的时长(秒)
  34. # """
  35. # @classmethod
  36. # def get_videos_duration(cls, video_file):
  37. # result = cls.asyncio_run_subprocess(["ffprobe", "-v", "error", "-show_entries", "format=duration",
  38. # "-of", "default=noprint_wrappers=1:nokey=1", video_file], timeout=10)
  39. # return float(result)
  40. """
  41. 获取视频宽高
  42. """
  43. @classmethod
  44. def get_w_h_size(cls, new_video_path):
  45. try:
  46. # 获取视频的原始宽高信息
  47. ffprobe_cmd = cls.asyncio_run_subprocess(["ffprobe", "-v" ,"error" ,"-select_streams" ,"v:0" ,"-show_entries", "stream=width,height" ,"-of" ,"csv=p=0" ,new_video_path],timeout=10)
  48. output_decoded = ffprobe_cmd.strip()
  49. split_output = [value for value in output_decoded.split(',') if value.strip()]
  50. height, width = map(int, split_output)
  51. return width, height
  52. except ValueError as e:
  53. return 1920, 1080
  54. """
  55. 视频裁剪
  56. """
  57. @classmethod
  58. def video_crop(cls, video_path, file_path):
  59. crop_url = file_path + 'crop.mp4'
  60. try:
  61. # 获取视频的原始宽高信息
  62. ffprobe_cmd = cls.asyncio_run_subprocess(
  63. ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of",
  64. "csv=p=0", video_path], timeout=10)
  65. width, height = map(int, ffprobe_cmd.strip().split(','))
  66. # 计算裁剪后的高度
  67. new_height = int(height * 0.8)
  68. # 构建 FFmpeg 命令,裁剪视频高度为原始高度的80%
  69. cls.asyncio_run_subprocess(
  70. [
  71. "ffmpeg",
  72. "-i", video_path,
  73. "-vf", f"crop={width}:{new_height}",
  74. "-c:v", "libx264",
  75. "-c:a", "aac",
  76. "-y",
  77. crop_url
  78. ],timeout=240)
  79. return crop_url
  80. except Exception as e:
  81. return crop_url
  82. """
  83. 视频截断
  84. """
  85. @classmethod
  86. def video_ggduration(cls, video_path, file_path, gg_duration_total):
  87. gg_duration_url = file_path + 'gg_duration.mp4'
  88. # 获取视频时长
  89. try:
  90. total_duration = cls.get_video_duration(video_path)
  91. if total_duration == 0:
  92. return gg_duration_url
  93. duration = int(total_duration) - int(gg_duration_total)
  94. if int(total_duration) < int(gg_duration_total):
  95. return gg_duration_url
  96. cls.asyncio_run_subprocess([
  97. "ffmpeg",
  98. "-i", video_path,
  99. "-c:v", "libx264",
  100. "-c:a", "aac",
  101. "-t", str(duration),
  102. "-y",
  103. gg_duration_url
  104. ], timeout= 360)
  105. return gg_duration_url
  106. except Exception as e:
  107. return gg_duration_url
  108. """
  109. 截取原视频最后一帧
  110. """
  111. @classmethod
  112. def video_png(cls, video_path, file_path):
  113. # 获取视频的原始宽高信息
  114. jpg_url = file_path + 'png.jpg'
  115. try:
  116. cls.asyncio_run_subprocess(
  117. ["ffmpeg", "-sseof", "-1", '-i', video_path, '-frames:v', '1', "-y", jpg_url], timeout=120)
  118. return jpg_url
  119. except Exception as e:
  120. return jpg_url
  121. """
  122. 获取视频音频
  123. """
  124. @classmethod
  125. def get_video_mp3(cls, video_file, video_path_url, pw_random_id):
  126. pw_mp3_path = video_path_url + str(pw_random_id) +'pw_video.mp3'
  127. try:
  128. cls.asyncio_run_subprocess([
  129. 'ffmpeg',
  130. '-i', video_file,
  131. '-q:a', '0',
  132. '-map', 'a',
  133. pw_mp3_path
  134. ], timeout=120)
  135. time.sleep(1)
  136. return pw_mp3_path
  137. except Exception as e:
  138. return pw_mp3_path
  139. """横屏视频改为竖屏"""
  140. @classmethod
  141. def update_video_h_w(cls, video_path, file_path):
  142. video_h_w_path = file_path +'video_h_w_video.mp4'
  143. try:
  144. cls.asyncio_run_subprocess(["ffmpeg" ,"-i" ,video_path ,"-vf" ,"scale=640:ih*640/iw,pad=iw:iw*16/9:(ow-iw)/2:(oh-ih)/2" ,video_h_w_path],timeout=420)
  145. return video_h_w_path
  146. except Exception as e:
  147. return video_h_w_path
  148. """视频转为640像素"""
  149. @classmethod
  150. def video_640(cls, video_path, file_path):
  151. video_url = file_path + 'pixelvideo.mp4'
  152. try:
  153. cls.asyncio_run_subprocess(["ffmpeg" ,"-i" ,video_path ,"-vf" ,"scale=360:640" ,video_url],timeout=420)
  154. return video_url
  155. except Exception as e:
  156. return video_url
  157. """视频拼接到一起"""
  158. @classmethod
  159. def h_b_video(cls, video_path, pw_path, file_path):
  160. video_url = file_path + 'hbvideo.mp4'
  161. try:
  162. cls.asyncio_run_subprocess(["ffmpeg","-i", video_path, "-i", pw_path, "-filter_complex" ,"[0:v]scale=360:640[v1]; [1:v]scale=360:640[v2]; [v1][0:a][v2][1:a]concat=n=2:v=1:a=1[outv][outa]" ,"-map" ,"[outv]" ,"-map" ,"[outa]" ,video_url],timeout=500)
  163. return video_url
  164. except Exception as e:
  165. return video_url
  166. """横屏视频顶部增加字幕"""
  167. @classmethod
  168. def add_video_zm(cls, new_video_path, video_path_url, pw_random_id, new_text):
  169. single_video_srt = video_path_url + str(pw_random_id) +'video_zm.srt'
  170. single_video_txt = video_path_url + str(pw_random_id) +'video_zm.txt'
  171. single_video = video_path_url + str(pw_random_id) +'video_zm.mp4'
  172. try:
  173. duration = cls.get_video_duration(new_video_path)
  174. if duration == 0:
  175. return new_video_path
  176. start_time = cls.seconds_to_srt_time(0)
  177. end_time = cls.seconds_to_srt_time(duration)
  178. # zm = '致敬伟大的教员,为整个民族\n感谢老人家历史向一代伟人'
  179. with open(single_video_txt, 'w') as f:
  180. f.write(f"file '{new_video_path}'\n")
  181. with open(single_video_srt, 'w') as f:
  182. f.write(f"1\n{start_time} --> {end_time}\n{new_text}\n\n")
  183. subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=12,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=225'"
  184. draw = f"{subtitle_cmd}"
  185. cls.asyncio_run_subprocess([
  186. "ffmpeg",
  187. "-f", "concat",
  188. "-safe", "0",
  189. "-i", single_video_txt,
  190. "-c:v", "libx264",
  191. "-c:a", "aac",
  192. "-vf", draw,
  193. "-y",
  194. single_video
  195. ],timeout=500)
  196. # subprocess.run(ffmpeg_cmd)
  197. return single_video
  198. except Exception as e:
  199. return single_video
  200. """获取mp3时长"""
  201. @classmethod
  202. def get_mp3_duration(cls, file_path):
  203. audio = MP3(file_path)
  204. duration = audio.info.length
  205. if duration:
  206. return int(duration)
  207. return 0
  208. """
  209. 生成片尾视频
  210. """
  211. @classmethod
  212. def pw_video(cls, jpg_path, file_path, pw_mp3_path, pw_srt):
  213. # 添加音频到图片
  214. """
  215. jpg_url 图片地址
  216. pw_video 提供的片尾视频
  217. pw_duration 提供的片尾视频时长
  218. new_video_path 视频位置
  219. subtitle_cmd 字幕
  220. pw_url 生成视频地址
  221. :return:
  222. """
  223. pw_srt_path = file_path +'pw_video.srt'
  224. with open(pw_srt_path, 'w') as f:
  225. f.write(pw_srt)
  226. pw_url_path = file_path + 'pw_video.mp4'
  227. try:
  228. pw_duration = cls.get_mp3_duration(pw_mp3_path)
  229. if pw_duration == 0:
  230. return pw_url_path
  231. time.sleep(2)
  232. # 添加字幕 wqy-zenhei Hiragino Sans GB
  233. height = 1080
  234. margin_v = int(height) // 8 # 可根据需要调整字幕和背景之间的距离
  235. subtitle_cmd = f"subtitles={pw_srt_path}:force_style='Fontsize=13,Fontname=wqy-zenhei,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000,Bold=1,MarginV={margin_v}'"
  236. bg_position_offset = (int(360) - 360//8) / 1.75
  237. background_cmd = f"drawbox=y=(ih-{int(360)}/2-{bg_position_offset}):color=yellow@1.0:width=iw:height={int(360)}/4:t=fill"
  238. cls.asyncio_run_subprocess([
  239. 'ffmpeg',
  240. '-loop', '1',
  241. '-i', jpg_path, # 输入的图片文件
  242. '-i', pw_mp3_path, # 输入的音频文件
  243. '-c:v', 'libx264', # 视频编码格式
  244. '-t', str(pw_duration), # 输出视频的持续时间,与音频持续时间相同
  245. '-pix_fmt', 'yuv420p', # 像素格式
  246. '-c:a', 'aac', # 音频编码格式
  247. '-strict', 'experimental', # 使用实验性编码器
  248. '-shortest', # 确保输出视频的长度与音频一致
  249. '-vf', f"{background_cmd},{subtitle_cmd}", # 视频过滤器,设置分辨率和其他过滤器
  250. pw_url_path # 输出的视频文件路径
  251. ], timeout=500)
  252. if os.path.exists(pw_srt_path):
  253. os.remove(pw_srt_path)
  254. return pw_url_path
  255. except Exception as e:
  256. return pw_url_path
  257. """
  258. 单个视频拼接
  259. """
  260. @classmethod
  261. def single_video(cls, video_path, file_path, zm):
  262. single_video_url = file_path + 'single_video.mp4'
  263. single_video_srt = file_path + 'single_video.srt'
  264. # 获取时长
  265. try:
  266. duration = cls.get_video_duration(video_path)
  267. if duration == 0:
  268. return single_video_url
  269. start_time = cls.seconds_to_srt_time(2)
  270. end_time = cls.seconds_to_srt_time(duration)
  271. single_video_txt = file_path + 'single_video.txt'
  272. with open(single_video_txt, 'w') as f:
  273. f.write(f"file '{video_path}'\n")
  274. if zm:
  275. with open(single_video_srt, 'w') as f:
  276. f.write(f"1\n{start_time} --> {end_time}\n<font color=\"red\">\u2764\uFE0F</font>{zm}\n\n")
  277. subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'"
  278. else:
  279. subtitle_cmd = f"force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'"
  280. # 多线程数
  281. num_threads = 5
  282. # 构建 FFmpeg 命令,生成视频
  283. cls.asyncio_run_subprocess([
  284. "ffmpeg",
  285. "-f", "concat",
  286. "-safe", "0",
  287. "-i", f"{single_video_txt}",
  288. "-c:v", "libx264",
  289. "-c:a", "aac",
  290. '-b:v', '260k',
  291. "-b:a", "96k",
  292. "-threads", str(num_threads),
  293. "-vf", subtitle_cmd,
  294. "-y",
  295. single_video_url
  296. ], timeout=400)
  297. if os.path.exists(single_video_srt):
  298. os.remove(single_video_srt)
  299. return single_video_url
  300. except Exception as e:
  301. return single_video_url
  302. @classmethod
  303. def asyncio_run_subprocess(cls, params: List[str], timeout: int = 30) -> str:
  304. async def run_subprocess():
  305. process = await asyncio.create_subprocess_exec(
  306. params[0],
  307. *params[1:],
  308. stdout=asyncio.subprocess.PIPE,
  309. stderr=asyncio.subprocess.PIPE,
  310. )
  311. try:
  312. out, err = await asyncio.wait_for(process.communicate(), timeout=timeout)
  313. if process.returncode != 0:
  314. raise IOError(err)
  315. return out.decode()
  316. except asyncio.TimeoutError:
  317. process.kill()
  318. out, err = await process.communicate()
  319. raise IOError(err)
  320. return asyncio.run(run_subprocess())
  321. if __name__ == '__main__':
  322. FFmpeg.video_png("/Users/z/Downloads/c2ce4ba6-fdb3-4e1b-bdba-dc05cb292abfvideo.mp4", "/Users/z/Downloads/")