123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- import configparser
- import glob
- import os
- import random
- import re
- import subprocess
- import sys
- import time
- import urllib.parse
- import json
- import requests
- from datetime import datetime, timedelta
- from urllib.parse import urlencode
- class FFmpeg():
- """
- 获取单个视频时长
- """
- @classmethod
- def get_video_duration(cls, video_url):
- ffprobe_cmd = [
- "ffprobe",
- "-i", video_url,
- "-show_entries", "format=duration",
- "-v", "quiet",
- "-of", "csv=p=0"
- ]
- output = subprocess.check_output(ffprobe_cmd).decode("utf-8").strip()
- return float(output)
- """
- 获取视频文件的时长(秒)
- """
- @classmethod
- def get_videos_duration(cls, video_file):
- result = subprocess.run(
- ["ffprobe", "-v", "error", "-show_entries", "format=duration",
- "-of", "default=noprint_wrappers=1:nokey=1", video_file],
- capture_output=True, text=True)
- return float(result.stdout)
- """
- 视频裁剪
- """
- @classmethod
- def video_tailor(cls, video_url):
- output_video_path = ''
- # 获取视频的原始宽高信息
- ffprobe_cmd = f"ffprobe -v error -select_streams v:0 -show_entries stream=width,height -of csv=p=0 {video_url}"
- ffprobe_process = subprocess.Popen(ffprobe_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
- output, _ = ffprobe_process.communicate()
- width, height = map(int, output.decode().strip().split(','))
- # 计算裁剪后的高度
- new_height = int(height * 0.7)
- # 构建 FFmpeg 命令,裁剪视频高度为原始高度的70%,并将宽度缩放为320x480
- ffmpeg_cmd = [
- "ffmpeg",
- "-i", video_url,
- "-vf", f"crop={width}:{new_height},scale=320:480",
- "-c:v", "libx264",
- "-c:a", "aac",
- "-y",
- output_video_path
- ]
- # 执行 FFmpeg 命令
- subprocess.run(ffmpeg_cmd)
- return output_video_path
- """
- 截取原视频最后一帧
- """
- @classmethod
- def video_png(cls,video_url):
- """
- png 生成图片位置
- :param video_url: 视频地址
- :return:
- """
- png = ''
- # 获取视频时长
- total_duration = cls.get_video_duration(video_url)
- time_offset = total_duration - 1 # 提取倒数第一秒的帧
- # 获取视频最后一秒,生成.jpg
- subprocess.run(
- ['ffmpeg', '-ss', str(time_offset), '-i', video_url, '-t', str(total_duration), '-vf', 'fps=1', png])
- return png
- """
- 生成片尾视频
- """
- @classmethod
- def new_pw_video(cls):
- # 添加音频到图片
- """
- png_path 图片地址
- pw_video 提供的片尾视频
- pw_duration 提供的片尾视频时长
- background_cmd 视频位置
- subtitle_cmd 字幕
- pw_path 生成视频地址
- :return:
- """
- ffmpeg_cmd = ['ffmpeg', '-loop', '1', '-i', png_path, '-i', pw_video, '-c:v', 'libx264', '-t',
- str(pw_duration), '-pix_fmt', 'yuv420p', '-c:a', 'aac', '-strict', 'experimental', '-shortest',
- '-vf', f"scale=320x480,{background_cmd},{subtitle_cmd}", pw_path]
- subprocess.run(ffmpeg_cmd)
- return pw_path
- """
- 设置统一格式拼接视频
- """
- @classmethod
- def concatenate_videos(cls, video_files, concatenated_video_path):
- # 拼接视频
- VIDEO_COUNTER = 0
- FF_INPUT = ""
- FF_SCALE = ""
- FF_FILTER = ""
- ffmpeg_cmd = ["ffmpeg"]
- for videos in video_files:
- # 添加输入文件
- FF_INPUT += f" -i {videos}"
- # 为每个视频文件统一长宽,并设置SAR(采样宽高比)
- FF_SCALE += f"[{VIDEO_COUNTER}:v]scale=320x480,setsar=1[v{VIDEO_COUNTER}];"
- # 为每个视频文件创建一个输入流,并添加到-filter_complex参数中
- FF_FILTER += f"[v{VIDEO_COUNTER}][{VIDEO_COUNTER}:a]"
- # 增加视频计数器
- VIDEO_COUNTER += 1
- # 构建最终的FFmpeg命令
- ffmpeg_cmd.extend(FF_INPUT.split())
- ffmpeg_cmd.extend(["-filter_complex", f"{FF_SCALE}{FF_FILTER}concat=n={VIDEO_COUNTER}:v=1:a=1[v][a]",
- "-map", "[v]", "-map", "[a]", concatenated_video_path])
- subprocess.run(ffmpeg_cmd)
- return concatenated_video_path
- """
- 单个视频拼接
- """
- @classmethod
- def single_video(cls):
- text_ptah = cls.bk_text_folders(mark)
- video_files = cls.concat_videos_with_subtitles(videos, audio_duration, platform, mark)
- with open(text_ptah, 'w') as f:
- for file in video_files:
- f.write(f"file '{file}'\n")
- if video_files == "":
- return ""
- if os.path.exists(s_path):
- # subtitle_cmd = f"subtitles={s_path}:force_style='Fontsize=11,Fontname=Hiragino Sans GB,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000'"
- subtitle_cmd = f"subtitles={s_path}:force_style='Fontsize=12,Fontname=wqy-zenhei,Bold=1,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000'"
- else:
- # subtitle_cmd = "drawtext=text='分享、转发给群友':fontsize=28:fontcolor=black:x=(w-text_w)/2:y=h-text_h-15"
- subtitle_cmd = "drawtext=text='分享、转发给群友':x=(w-text_w)/2:y=h-text_h-15:fontsize=28:fontcolor=black:fontfile=/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc"
- # 背景色参数
- background_cmd = "drawbox=y=ih-65:color=yellow@1.0:width=iw:height=0:t=fill"
- # 多线程数
- num_threads = 4
- # 构建 FFmpeg 命令,生成视频
- ffmpeg_cmd_oss = [
- "ffmpeg",
- "-f", "concat",
- "-safe", "0",
- "-i", f"{text_ptah}", # 视频文件列表
- "-i", audio_video, # 音频文件
- "-c:v", "libx264",
- "-c:a", "aac",
- "-threads", str(num_threads),
- "-vf", f"scale=320x480,{background_cmd},{subtitle_cmd}", # 添加背景色和字幕
- "-t", str(int(audio_duration)), # 保持与音频时长一致
- "-map", "0:v:0", # 映射第一个输入的视频流
- "-map", "1:a:0", # 映射第二个输入的音频流
- "-y", # 覆盖输出文件
- v_oss_path
- ]
- try:
- subprocess.run(ffmpeg_cmd_oss)
- print("视频处理完成!")
- if os.path.isfile(text_ptah):
- os.remove(text_ptah)
- except subprocess.CalledProcessError as e:
- print(f"视频处理失败:{e}")
- print(f"{mark}:视频拼接成功啦~~~")
- return video_files
- # 计算需要拼接的视频
- @classmethod
- def concat_videos_with_subtitles(cls, videos, audio_duration, platform, mark):
- # 计算视频文件列表总时长
- if platform == "baokuai":
- total_video_duration = sum(cls.get_video_duration(video_file) for video_file in videos)
- else:
- total_video_duration = sum(cls.get_video_duration(video_file[3]) for video_file in videos)
- if platform == "koubo" or platform == "zhannei" or platform == "baokuai":
- # 视频时长大于音频时长
- if total_video_duration > audio_duration:
- return videos
- # 计算音频秒数与视频秒数的比率,然后加一得到需要的视频数量
- video_audio_ratio = audio_duration / total_video_duration
- videos_needed = int(video_audio_ratio) + 2
- trimmed_video_list = videos * videos_needed
- return trimmed_video_list
- else:
- # 如果视频总时长小于音频时长,则不做拼接
- if total_video_duration < audio_duration:
- return ""
- # 如果视频总时长大于音频时长,则截断视频
- trimmed_video_list = []
- remaining_duration = audio_duration
- for video_file in videos:
- video_duration = cls.get_video_duration(video_file[3])
- if video_duration <= remaining_duration:
- # 如果视频时长小于或等于剩余时长,则将整个视频添加到列表中
- trimmed_video_list.append(video_file)
- remaining_duration -= video_duration
- else:
- trimmed_video_list.append(video_file)
- break
- return trimmed_video_list
|