ffmpeg.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. import asyncio
  2. import os
  3. import subprocess
  4. import time
  5. from typing import List
  6. import cv2
  7. from loguru import logger
  8. from mutagen.mp3 import MP3
  9. class FFmpeg():
  10. """获取mp3时长"""
  11. @classmethod
  12. def get_mp3_duration(cls, file_path):
  13. audio = MP3(file_path)
  14. duration = audio.info.length
  15. if duration:
  16. return int(duration)
  17. return 0
  18. """
  19. 时间转换
  20. """
  21. @classmethod
  22. def seconds_to_srt_time(cls, seconds):
  23. hours = int(seconds // 3600)
  24. minutes = int((seconds % 3600) // 60)
  25. seconds = seconds % 60
  26. milliseconds = int((seconds - int(seconds)) * 1000)
  27. return f"{hours:02d}:{minutes:02d}:{int(seconds):02d},{milliseconds:03d}"
  28. """
  29. 获取单个视频时长
  30. """
  31. @classmethod
  32. def get_video_duration(cls, video_url):
  33. cap = cv2.VideoCapture(video_url)
  34. if cap.isOpened():
  35. rate = cap.get(5)
  36. frame_num = cap.get(7)
  37. duration = int(frame_num / rate)
  38. return duration
  39. return 0
  40. """视频转为640像素"""
  41. @classmethod
  42. def video_640(cls, video_path, file_path):
  43. video_url = file_path + 'pixelvideo.mp4'
  44. try:
  45. cls.asyncio_run_subprocess(["ffmpeg", "-i", video_path, "-vf", "scale=360:640", video_url], timeout=420)
  46. logger.info(f"[+] 视频转为640像素成功")
  47. return video_url
  48. except Exception as e:
  49. return video_url
  50. """
  51. 获取视频宽高
  52. """
  53. @classmethod
  54. def get_w_h_size(cls, new_video_path):
  55. try:
  56. # 获取视频的原始宽高信息
  57. ffprobe_cmd = cls.asyncio_run_subprocess(
  58. ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of",
  59. "csv=p=0", new_video_path], timeout=10)
  60. output_decoded = ffprobe_cmd.strip()
  61. split_output = [value for value in output_decoded.split(',') if value.strip()]
  62. height, width = map(int, split_output)
  63. logger.info(f"[+] 获取视频宽高成功")
  64. return width, height
  65. except ValueError as e:
  66. return 1920, 1080
  67. """
  68. 视频裁剪
  69. """
  70. @classmethod
  71. def video_crop(cls, new_video_path, video_path_url, pw_random_id):
  72. crop_url = video_path_url + str(pw_random_id) + 'crop.mp4'
  73. # 获取视频的原始宽高信息
  74. ffprobe_cmd = cls.asyncio_run_subprocess(
  75. ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of",
  76. "csv=p=0", new_video_path], timeout=10)
  77. width, height = map(int, ffprobe_cmd.strip().split(','))
  78. # 计算裁剪后的高度
  79. new_height = int(height * 0.8)
  80. # 构建 FFmpeg 命令,裁剪视频高度为原始高度的80%
  81. cls.asyncio_run_subprocess(
  82. [
  83. "ffmpeg",
  84. "-i", new_video_path,
  85. "-vf", f"crop={width}:{new_height}",
  86. "-c:v", "libx264",
  87. "-c:a", "aac",
  88. "-y",
  89. crop_url
  90. ], timeout=200)
  91. logger.info(f"[+] 视频裁剪成功")
  92. return crop_url
  93. """
  94. 视频截断
  95. """
  96. @classmethod
  97. def video_ggduration(cls, new_video_path, video_path_url, pw_random_id, gg_duration_total):
  98. gg_duration_url = video_path_url + str(pw_random_id) + 'gg_duration.mp4'
  99. # 获取视频时长
  100. total_duration = cls.get_video_duration(new_video_path)
  101. if total_duration == 0:
  102. return gg_duration_url
  103. duration = int(total_duration) - int(gg_duration_total)
  104. if int(total_duration) < int(gg_duration_total):
  105. return gg_duration_url
  106. cls.asyncio_run_subprocess([
  107. "ffmpeg",
  108. "-i", new_video_path,
  109. "-c:v", "libx264",
  110. "-c:a", "aac",
  111. "-t", str(duration),
  112. "-y",
  113. gg_duration_url
  114. ], timeout=200)
  115. logger.info(f"[+] 视频截断成功")
  116. return gg_duration_url
  117. """
  118. 截取原视频最后一帧
  119. """
  120. @classmethod
  121. def video_png(cls, new_video_path, video_path_url, pw_random_id):
  122. """
  123. jpg_url 生成图片位置
  124. :param new_video_path: 视频地址
  125. :return:
  126. """
  127. # 获取视频的原始宽高信息
  128. jpg_url = video_path_url + str(pw_random_id) + 'png.jpg'
  129. # 获取视频时长
  130. cls.asyncio_run_subprocess(
  131. ["ffmpeg", "-sseof", "-1", '-i', new_video_path, '-frames:v', '1', "-y", jpg_url], timeout=120)
  132. logger.info(f"[+] 截取原视频最后一帧成功")
  133. return jpg_url
  134. """
  135. 获取视频音频
  136. """
  137. @classmethod
  138. def get_video_mp3(cls, video_file, video_path_url, pw_random_id):
  139. pw_mp3_path = video_path_url + str(pw_random_id) +'pw_video.mp3'
  140. cls.asyncio_run_subprocess([
  141. 'ffmpeg',
  142. '-i', video_file,
  143. '-q:a', '0',
  144. '-map', 'a',
  145. pw_mp3_path
  146. ], timeout=120)
  147. time.sleep(1)
  148. logger.info(f"[+] 获取视频音频成功")
  149. return pw_mp3_path
  150. """横屏视频改为竖屏"""
  151. @classmethod
  152. def update_video_h_w(cls, new_video_path, video_path_url, pw_random_id):
  153. video_h_w_path = video_path_url + str(pw_random_id) +'video_h_w_video.mp4'
  154. cls.asyncio_run_subprocess(
  155. ["ffmpeg", "-i", new_video_path, "-vf", "scale=640:ih*640/iw,pad=iw:iw*16/9:(ow-iw)/2:(oh-ih)/2",
  156. video_h_w_path], timeout=360)
  157. logger.info(f"[+] 横屏视频改为竖屏成功")
  158. return video_h_w_path
  159. """横屏视频顶部增加字幕"""
  160. @classmethod
  161. def add_video_zm(cls, new_video_path, video_path_url, pw_random_id, new_text):
  162. single_video_srt = video_path_url + str(pw_random_id) +'video_zm.srt'
  163. single_video_txt = video_path_url + str(pw_random_id) +'video_zm.txt'
  164. single_video = video_path_url + str(pw_random_id) +'video_zm.mp4'
  165. duration = cls.get_video_duration(new_video_path)
  166. if duration == 0:
  167. return new_video_path
  168. start_time = cls.seconds_to_srt_time(0)
  169. end_time = cls.seconds_to_srt_time(duration)
  170. # zm = '致敬伟大的教员,为整个民族\n感谢老人家历史向一代伟人'
  171. with open(single_video_txt, 'w') as f:
  172. f.write(f"file '{new_video_path}'\n")
  173. with open(single_video_srt, 'w') as f:
  174. f.write(f"1\n{start_time} --> {end_time}\n{new_text}\n\n")
  175. subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=12,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=225'"
  176. draw = f"{subtitle_cmd}"
  177. cls.asyncio_run_subprocess([
  178. "ffmpeg",
  179. "-f", "concat",
  180. "-safe", "0",
  181. "-i", single_video_txt,
  182. "-c:v", "libx264",
  183. "-c:a", "aac",
  184. "-vf", draw,
  185. "-y",
  186. single_video
  187. ], timeout=500)
  188. logger.info(f"[+] 横屏视频顶部增加字幕成功")
  189. return single_video
  190. """
  191. 生成片尾视频
  192. """
  193. @classmethod
  194. def pw_video(cls, jpg_url, video_path_url, pw_srt, pw_random_id, pw_mp3_path):
  195. # 添加音频到图片
  196. """
  197. jpg_url 图片地址
  198. pw_video 提供的片尾视频
  199. pw_duration 提供的片尾视频时长
  200. new_video_path 视频位置
  201. subtitle_cmd 字幕
  202. pw_url 生成视频地址
  203. :return:
  204. """
  205. pw_srt_path = video_path_url + str(pw_random_id) +'pw_video.srt'
  206. # 创建临时字幕文件
  207. with open(pw_srt_path, 'w') as f:
  208. f.write(pw_srt)
  209. # 片尾位置
  210. pw_url_path = video_path_url + str(pw_random_id) + 'pw_video.mp4'
  211. # 获取视频时长
  212. pw_duration = cls.get_mp3_duration(pw_mp3_path)
  213. if pw_duration == 0:
  214. return pw_url_path
  215. time.sleep(2)
  216. # 添加字幕 wqy-zenhei Hiragino Sans GB
  217. height = 1080
  218. margin_v = int(height) // 8 # 可根据需要调整字幕和背景之间的距离
  219. subtitle_cmd = f"subtitles={pw_srt_path}:force_style='Fontsize=13,Fontname=wqy-zenhei,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000,Bold=1,MarginV={margin_v}'"
  220. bg_position_offset = (int(360) - 360//8) / 1.75
  221. background_cmd = f"drawbox=y=(ih-{int(360)}/2-{bg_position_offset}):color=yellow@1.0:width=iw:height={int(360)}/4:t=fill"
  222. cls.asyncio_run_subprocess([
  223. 'ffmpeg',
  224. '-loop', '1',
  225. '-i', jpg_url, # 输入的图片文件
  226. '-i', pw_mp3_path, # 输入的音频文件
  227. '-c:v', 'libx264', # 视频编码格式
  228. '-t', str(pw_duration), # 输出视频的持续时间,与音频持续时间相同
  229. '-pix_fmt', 'yuv420p', # 像素格式
  230. '-c:a', 'aac', # 音频编码格式
  231. '-strict', 'experimental', # 使用实验性编码器
  232. '-shortest', # 确保输出视频的长度与音频一致
  233. '-vf', f"{background_cmd},{subtitle_cmd}", # 视频过滤器,设置分辨率和其他过滤器
  234. pw_url_path # 输出的视频文件路径
  235. ], timeout=300)
  236. if os.path.exists(pw_srt_path):
  237. os.remove(pw_srt_path)
  238. logger.info(f"[+] 生成片尾视频成功")
  239. return pw_url_path
  240. """视频拼接到一起"""
  241. @classmethod
  242. def h_b_video(cls, video_path, pw_path, file_path):
  243. video_url = file_path + 'hbvideo.mp4'
  244. try:
  245. cls.asyncio_run_subprocess(["ffmpeg", "-i", video_path, "-i", pw_path, "-filter_complex",
  246. "[0:v]scale=360:640[v1]; [1:v]scale=360:640[v2]; [v1][0:a][v2][1:a]concat=n=2:v=1:a=1[outv][outa]",
  247. "-map", "[outv]", "-map", "[outa]", video_url], timeout=500)
  248. logger.info(f"[+] 视频拼接成功")
  249. return video_url
  250. except Exception as e:
  251. return video_url
  252. """
  253. 单个视频拼接
  254. """
  255. @classmethod
  256. def single_video(cls, new_video_path, video_path_url, zm):
  257. single_video_url = video_path_url + 'single_video.mp4'
  258. single_video_srt = video_path_url + 'single_video.srt'
  259. # 获取时长
  260. duration = cls.get_video_duration(new_video_path)
  261. if duration == 0:
  262. return single_video_url
  263. start_time = cls.seconds_to_srt_time(2)
  264. end_time = cls.seconds_to_srt_time(duration)
  265. single_video_txt = video_path_url + 'single_video.txt'
  266. with open(single_video_txt, 'w') as f:
  267. f.write(f"file '{new_video_path}'\n")
  268. if zm:
  269. with open(single_video_srt, 'w') as f:
  270. f.write(f"1\n{start_time} --> {end_time}\n<font color=\"red\">\u2764\uFE0F</font>{zm}\n\n")
  271. subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'"
  272. else:
  273. subtitle_cmd = f"force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'"
  274. # 多线程数
  275. num_threads = 5
  276. # 构建 FFmpeg 命令,生成视频
  277. cls.asyncio_run_subprocess([
  278. "ffmpeg",
  279. "-f", "concat",
  280. "-safe", "0",
  281. "-i", f"{single_video_txt}",
  282. "-c:v", "libx264",
  283. "-c:a", "aac",
  284. '-b:v', '260k',
  285. "-b:a", "96k",
  286. "-threads", str(num_threads),
  287. "-vf", subtitle_cmd,
  288. "-y",
  289. single_video_url
  290. ], timeout=400)
  291. if os.path.exists(single_video_srt):
  292. os.remove(single_video_srt)
  293. return single_video_url
  294. @classmethod
  295. def asyncio_run_subprocess(cls, params: List[str], timeout: int = 30) -> str:
  296. async def run_subprocess():
  297. process = await asyncio.create_subprocess_exec(
  298. params[0],
  299. *params[1:],
  300. stdout=asyncio.subprocess.PIPE,
  301. stderr=asyncio.subprocess.PIPE,
  302. )
  303. try:
  304. out, err = await asyncio.wait_for(process.communicate(), timeout=timeout)
  305. if process.returncode != 0:
  306. raise IOError(err)
  307. return out.decode()
  308. except asyncio.TimeoutError:
  309. process.kill()
  310. out, err = await process.communicate()
  311. raise IOError(err)
  312. return asyncio.run(run_subprocess())
  313. if __name__ == '__main__':
  314. new_video_path = '/Users/tzld/Desktop/video_rewriting/path/output1.mp4'
  315. video_path_url = '/Users/tzld/Desktop/video_rewriting/path/'
  316. zm = '温馨提示:下方按钮可分享到群'
  317. FFmpeg.single_video(new_video_path, video_path_url, zm)