ffmpeg.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. import asyncio
  2. import os
  3. import subprocess
  4. import time
  5. from typing import List
  6. import cv2
  7. from loguru import logger
  8. from mutagen.mp3 import MP3
  9. class FFmpeg():
  10. """获取mp3时长"""
  11. @classmethod
  12. def get_mp3_duration(cls, file_path):
  13. audio = MP3(file_path)
  14. duration = audio.info.length
  15. if duration:
  16. return int(duration)
  17. return 0
  18. """
  19. 时间转换
  20. """
  21. @classmethod
  22. def seconds_to_srt_time(cls, seconds):
  23. hours = int(seconds // 3600)
  24. minutes = int((seconds % 3600) // 60)
  25. seconds = seconds % 60
  26. milliseconds = int((seconds - int(seconds)) * 1000)
  27. return f"{hours:02d}:{minutes:02d}:{int(seconds):02d},{milliseconds:03d}"
  28. """
  29. 获取单个视频时长
  30. """
  31. @classmethod
  32. def get_video_duration(cls, video_url):
  33. cap = cv2.VideoCapture(video_url)
  34. if cap.isOpened():
  35. rate = cap.get(5)
  36. frame_num = cap.get(7)
  37. duration = int(frame_num / rate)
  38. return duration
  39. return 0
  40. """视频转为640像素"""
  41. @classmethod
  42. def video_640(cls, video_path, file_path):
  43. video_url = file_path + 'pixelvideo.mp4'
  44. try:
  45. cls.asyncio_run_subprocess(["ffmpeg", "-i", video_path, "-vf", "scale=360:640", video_url], timeout=420)
  46. return video_url
  47. except Exception as e:
  48. return video_url
  49. """
  50. 获取视频宽高
  51. """
  52. @classmethod
  53. def get_w_h_size(cls, new_video_path):
  54. try:
  55. # 获取视频的原始宽高信息
  56. ffprobe_cmd = cls.asyncio_run_subprocess(
  57. ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of",
  58. "csv=p=0", new_video_path], timeout=10)
  59. output_decoded = ffprobe_cmd.strip()
  60. split_output = [value for value in output_decoded.split(',') if value.strip()]
  61. height, width = map(int, split_output)
  62. return width, height
  63. except ValueError as e:
  64. return 1920, 1080
  65. """
  66. 视频裁剪
  67. """
  68. @classmethod
  69. def video_crop(cls, new_video_path, video_path_url, pw_random_id):
  70. crop_url = video_path_url + str(pw_random_id) + 'crop.mp4'
  71. # 获取视频的原始宽高信息
  72. ffprobe_cmd = cls.asyncio_run_subprocess(
  73. ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of",
  74. "csv=p=0", new_video_path], timeout=10)
  75. width, height = map(int, ffprobe_cmd.strip().split(','))
  76. # 计算裁剪后的高度
  77. new_height = int(height * 0.8)
  78. # 构建 FFmpeg 命令,裁剪视频高度为原始高度的80%
  79. cls.asyncio_run_subprocess(
  80. [
  81. "ffmpeg",
  82. "-i", new_video_path,
  83. "-vf", f"crop={width}:{new_height}",
  84. "-c:v", "libx264",
  85. "-c:a", "aac",
  86. "-y",
  87. crop_url
  88. ], timeout=240)
  89. return crop_url
  90. """
  91. 视频截断
  92. """
  93. @classmethod
  94. def video_ggduration(cls, new_video_path, video_path_url, pw_random_id, gg_duration_total):
  95. gg_duration_url = video_path_url + str(pw_random_id) + 'gg_duration.mp4'
  96. # 获取视频时长
  97. total_duration = cls.get_video_duration(new_video_path)
  98. if total_duration == 0:
  99. return gg_duration_url
  100. duration = int(total_duration) - int(gg_duration_total)
  101. if int(total_duration) < int(gg_duration_total):
  102. return gg_duration_url
  103. cls.asyncio_run_subprocess([
  104. "ffmpeg",
  105. "-i", new_video_path,
  106. "-c:v", "libx264",
  107. "-c:a", "aac",
  108. "-t", str(duration),
  109. "-y",
  110. gg_duration_url
  111. ], timeout=360)
  112. return gg_duration_url
  113. """
  114. 截取原视频最后一帧
  115. """
  116. @classmethod
  117. def video_png(cls, new_video_path, video_path_url, pw_random_id):
  118. """
  119. jpg_url 生成图片位置
  120. :param new_video_path: 视频地址
  121. :return:
  122. """
  123. # 获取视频的原始宽高信息
  124. jpg_url = video_path_url + str(pw_random_id) + 'png.jpg'
  125. # 获取视频时长
  126. cls.asyncio_run_subprocess(
  127. ["ffmpeg", "-sseof", "-1", '-i', new_video_path, '-frames:v', '1', "-y", jpg_url], timeout=120)
  128. return jpg_url
  129. """
  130. 获取视频音频
  131. """
  132. @classmethod
  133. def get_video_mp3(cls, video_file, video_path_url, pw_random_id):
  134. pw_mp3_path = video_path_url + str(pw_random_id) +'pw_video.mp3'
  135. cls.asyncio_run_subprocess([
  136. 'ffmpeg',
  137. '-i', video_file,
  138. '-q:a', '0',
  139. '-map', 'a',
  140. pw_mp3_path
  141. ], timeout=120)
  142. time.sleep(1)
  143. return pw_mp3_path
  144. """横屏视频改为竖屏"""
  145. @classmethod
  146. def update_video_h_w(cls, new_video_path, video_path_url, pw_random_id):
  147. video_h_w_path = video_path_url + str(pw_random_id) +'video_h_w_video.mp4'
  148. cls.asyncio_run_subprocess(
  149. ["ffmpeg", "-i", new_video_path, "-vf", "scale=640:ih*640/iw,pad=iw:iw*16/9:(ow-iw)/2:(oh-ih)/2",
  150. video_h_w_path], timeout=420)
  151. return video_h_w_path
  152. """横屏视频顶部增加字幕"""
  153. @classmethod
  154. def add_video_zm(cls, new_video_path, video_path_url, pw_random_id, new_text):
  155. single_video_srt = video_path_url + str(pw_random_id) +'video_zm.srt'
  156. single_video_txt = video_path_url + str(pw_random_id) +'video_zm.txt'
  157. single_video = video_path_url + str(pw_random_id) +'video_zm.mp4'
  158. duration = cls.get_video_duration(new_video_path)
  159. if duration == 0:
  160. return new_video_path
  161. start_time = cls.seconds_to_srt_time(0)
  162. end_time = cls.seconds_to_srt_time(duration)
  163. # zm = '致敬伟大的教员,为整个民族\n感谢老人家历史向一代伟人'
  164. with open(single_video_txt, 'w') as f:
  165. f.write(f"file '{new_video_path}'\n")
  166. with open(single_video_srt, 'w') as f:
  167. f.write(f"1\n{start_time} --> {end_time}\n{new_text}\n\n")
  168. subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=12,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=225'"
  169. draw = f"{subtitle_cmd}"
  170. cls.asyncio_run_subprocess([
  171. "ffmpeg",
  172. "-f", "concat",
  173. "-safe", "0",
  174. "-i", single_video_txt,
  175. "-c:v", "libx264",
  176. "-c:a", "aac",
  177. "-vf", draw,
  178. "-y",
  179. single_video
  180. ], timeout=500)
  181. return single_video
  182. """
  183. 生成片尾视频
  184. """
  185. @classmethod
  186. def pw_video(cls, jpg_url, video_path_url, pw_srt, pw_random_id, pw_mp3_path):
  187. # 添加音频到图片
  188. """
  189. jpg_url 图片地址
  190. pw_video 提供的片尾视频
  191. pw_duration 提供的片尾视频时长
  192. new_video_path 视频位置
  193. subtitle_cmd 字幕
  194. pw_url 生成视频地址
  195. :return:
  196. """
  197. pw_srt_path = video_path_url + str(pw_random_id) +'pw_video.srt'
  198. # 创建临时字幕文件
  199. with open(pw_srt_path, 'w') as f:
  200. f.write(pw_srt)
  201. # 片尾位置
  202. pw_url_path = video_path_url + str(pw_random_id) + 'pw_video.mp4'
  203. # 获取视频时长
  204. pw_duration = cls.get_mp3_duration(pw_mp3_path)
  205. if pw_duration == 0:
  206. return pw_url_path
  207. time.sleep(2)
  208. # 添加字幕 wqy-zenhei Hiragino Sans GB
  209. height = 1080
  210. margin_v = int(height) // 8 # 可根据需要调整字幕和背景之间的距离
  211. subtitle_cmd = f"subtitles={pw_srt_path}:force_style='Fontsize=13,Fontname=wqy-zenhei,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000,Bold=1,MarginV={margin_v}'"
  212. bg_position_offset = (int(360) - 360//8) / 1.75
  213. background_cmd = f"drawbox=y=(ih-{int(360)}/2-{bg_position_offset}):color=yellow@1.0:width=iw:height={int(360)}/4:t=fill"
  214. cls.asyncio_run_subprocess([
  215. 'ffmpeg',
  216. '-loop', '1',
  217. '-i', jpg_url, # 输入的图片文件
  218. '-i', pw_mp3_path, # 输入的音频文件
  219. '-c:v', 'libx264', # 视频编码格式
  220. '-t', str(pw_duration), # 输出视频的持续时间,与音频持续时间相同
  221. '-pix_fmt', 'yuv420p', # 像素格式
  222. '-c:a', 'aac', # 音频编码格式
  223. '-strict', 'experimental', # 使用实验性编码器
  224. '-shortest', # 确保输出视频的长度与音频一致
  225. '-vf', f"{background_cmd},{subtitle_cmd}", # 视频过滤器,设置分辨率和其他过滤器
  226. pw_url_path # 输出的视频文件路径
  227. ], timeout=500)
  228. if os.path.exists(pw_srt_path):
  229. os.remove(pw_srt_path)
  230. return pw_url_path
  231. """视频拼接到一起"""
  232. @classmethod
  233. def h_b_video(cls, video_path, pw_path, file_path):
  234. video_url = file_path + 'hbvideo.mp4'
  235. try:
  236. cls.asyncio_run_subprocess(["ffmpeg", "-i", video_path, "-i", pw_path, "-filter_complex",
  237. "[0:v]scale=360:640[v1]; [1:v]scale=360:640[v2]; [v1][0:a][v2][1:a]concat=n=2:v=1:a=1[outv][outa]",
  238. "-map", "[outv]", "-map", "[outa]", video_url], timeout=500)
  239. return video_url
  240. except Exception as e:
  241. return video_url
  242. """
  243. 单个视频拼接
  244. """
  245. @classmethod
  246. def single_video(cls, new_video_path, video_path_url, zm):
  247. single_video_url = video_path_url + 'single_video.mp4'
  248. single_video_srt = video_path_url + 'single_video.srt'
  249. # 获取时长
  250. duration = cls.get_video_duration(new_video_path)
  251. if duration == 0:
  252. return single_video_url
  253. start_time = cls.seconds_to_srt_time(2)
  254. end_time = cls.seconds_to_srt_time(duration)
  255. single_video_txt = video_path_url + 'single_video.txt'
  256. with open(single_video_txt, 'w') as f:
  257. f.write(f"file '{new_video_path}'\n")
  258. if zm:
  259. with open(single_video_srt, 'w') as f:
  260. f.write(f"1\n{start_time} --> {end_time}\n<font color=\"red\">\u2764\uFE0F</font>{zm}\n\n")
  261. subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'"
  262. else:
  263. subtitle_cmd = f"force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'"
  264. # 多线程数
  265. num_threads = 5
  266. # 构建 FFmpeg 命令,生成视频
  267. cls.asyncio_run_subprocess([
  268. "ffmpeg",
  269. "-f", "concat",
  270. "-safe", "0",
  271. "-i", f"{single_video_txt}",
  272. "-c:v", "libx264",
  273. "-c:a", "aac",
  274. '-b:v', '260k',
  275. "-b:a", "96k",
  276. "-threads", str(num_threads),
  277. "-vf", subtitle_cmd,
  278. "-y",
  279. single_video_url
  280. ], timeout=400)
  281. if os.path.exists(single_video_srt):
  282. os.remove(single_video_srt)
  283. return single_video_url
  284. @classmethod
  285. def asyncio_run_subprocess(cls, params: List[str], timeout: int = 30) -> str:
  286. async def run_subprocess():
  287. process = await asyncio.create_subprocess_exec(
  288. params[0],
  289. *params[1:],
  290. stdout=asyncio.subprocess.PIPE,
  291. stderr=asyncio.subprocess.PIPE,
  292. )
  293. try:
  294. out, err = await asyncio.wait_for(process.communicate(), timeout=timeout)
  295. if process.returncode != 0:
  296. raise IOError(err)
  297. return out.decode()
  298. except asyncio.TimeoutError:
  299. process.kill()
  300. out, err = await process.communicate()
  301. raise IOError(err)
  302. return asyncio.run(run_subprocess())
  303. if __name__ == '__main__':
  304. new_video_path = '/Users/tzld/Desktop/video_rewriting/path/output1.mp4'
  305. video_path_url = '/Users/tzld/Desktop/video_rewriting/path/'
  306. zm = '温馨提示:下方按钮可分享到群'
  307. FFmpeg.single_video(new_video_path, video_path_url, zm)