# -*- coding: utf-8 -*-
# @Time: 2023/12/26
"""Stitch downloaded material clips into a new video and publish it.

The module renders a video to ``output_path`` with moviepy, uploads it through
``Oss``, records the consumed material in MySQL through ``MysqlHelper`` and
finally posts the uploaded video to piaoquantv accounts.
"""
import datetime
import random
import os
import re
import sys
import time
import resource
import requests
import urllib.parse

# The project-local ``common`` package is resolved relative to the working directory.
sys.path.append(os.getcwd())
# NOTE: this rebinds the name ``datetime`` from the module to the class.
from datetime import datetime
from common import Feishu
from common.aliyun_oss_uploading import Oss
from common.common import Common
from common.db import MysqlHelper
from common.material import Material
from moviepy.editor import concatenate_videoclips
from moviepy.video.io.VideoFileClip import VideoFileClip
from moviepy import editor
from moviepy.video.compositing.concatenate import concatenate

output_path = "./video_stitching/video/new_video.mp4"


class VideoStitching():
    """Class-method helpers for building, uploading and publishing stitched videos."""

    @classmethod
    def split_text(cls, text, max_length):
        """Return ``text`` with its spaces removed and a newline inserted at index 11.

        The text is first regrouped into space-separated chunks of roughly
        ``max_length`` characters, the chunks are joined without a separator,
        and a single ``'\\n'`` is then inserted after the 11th character.
        The newline is always inserted, even when the text is shorter.
        """
        words = text.split(' ')
        lines = []
        current_line = ''
        for word in words:
            if len(current_line) + len(word) <= max_length:
                current_line += word + ' '
            else:
                lines.append(current_line.strip())
                current_line = word + ' '
        lines.append(current_line.strip())
        result = ''.join(lines)
        # Break the subtitle into two rows.
        result = result[:11] + '\n' + result[11:]
        return result

    @classmethod
    def srt_to_seconds(cls, srt_time):
        """Convert an SRT timestamp such as ``"00:01:02,500"`` to seconds (float)."""
        hours, minutes, seconds = map(float, srt_time.replace(',', '.').split(':'))
        return hours * 3600 + minutes * 60 + seconds

    @classmethod
    def insert_videoAudio(cls, audio_url, i, channel_type):
        """Insert one ``video_audio`` row per used material video.

        :param audio_url: iterable of rows; indexes 0, 1, 2 are written to the
            ``video_id``, ``account_id`` and ``oss_object_key`` columns.
        :param i: value written to the ``audio`` column.
        :param channel_type: mapped to the numeric ``video_type`` column
            (``douyin`` -> 0, ``kspinjie`` -> 5, ``dypinjie`` -> 6, otherwise 2).
        """
        formatted_time = datetime.now().strftime("%Y-%m-%d")
        video_type = {"douyin": 0, "kspinjie": 5, "dypinjie": 6}.get(channel_type, 2)
        for j in audio_url:
            # NOTE(review): SQL is built by string interpolation; switch to bound
            # parameters if MysqlHelper supports them.
            insert_sql = (
                "INSERT INTO video_audio (audio, video_id, account_id, oss_object_key, time, video_type) "
                f"values ('{i}', '{j[0]}', '{j[1]}', '{j[2]}', '{formatted_time}', {video_type})"
            )
            MysqlHelper.update_values(
                sql=insert_sql,
                env="prod",
                machine="",
            )

    @classmethod
    def insert_video_typeAudio(cls, video, audio_id, channel_type):
        """Insert a single ``video_audio`` row for a voice-over video.

        ``video`` is written to both the ``account_id`` and ``oss_object_key``
        columns; ``video_type`` is 3 when ``channel_type`` is ``"jieri"`` and 1
        otherwise.
        """
        formatted_time = datetime.now().strftime("%Y-%m-%d")
        video_type = 3 if channel_type == "jieri" else 1
        # NOTE(review): SQL is built by string interpolation; switch to bound
        # parameters if MysqlHelper supports them.
        insert_sql = (
            "INSERT INTO video_audio (audio, account_id, oss_object_key, time, video_type) "
            f"values ('{audio_id}', '{video}', '{video}', '{formatted_time}', {video_type})"
        )
        MysqlHelper.update_values(
            sql=insert_sql,
            env="prod",
            machine="",
        )

    @classmethod
    def random_id(cls):
        """Return an id string: current time as ``%Y%m%d%H%M%S`` plus a random 5-digit number."""
        now = datetime.now()
        rand_num = random.randint(10000, 99999)
        return "{}{}".format(now.strftime("%Y%m%d%H%M%S"), rand_num)

    @classmethod
    def get_account_id(cls, channel_type):
        """Return the distinct ``account_id`` rows whose ``oss_object_key`` contains ``channel_type``."""
        sql = f"""select account_id from video_url where oss_object_key LIKE '%{channel_type}%' group by account_id ;"""
        return MysqlHelper.get_values(sql, "prod")

    @classmethod
    def get_audio_list(cls):
        """Return every ``oss_object_key`` row of the ``video_koubo_url`` table."""
        sql = """select oss_object_key from video_koubo_url ;"""
        return MysqlHelper.get_values(sql, "prod")

    @classmethod
    def get_url_list(cls, audio_id, account):
        """Return up to 35 active videos of ``account`` not yet recorded in ``video_audio`` today.

        ``audio_id`` is accepted for interface compatibility but is not used by the query.
        """
        formatted_time = datetime.now().strftime("%Y-%m-%d")
        sql = (
            "SELECT a.video_id,a.account_id,a.oss_object_key FROM video_url a "
            "WHERE NOT EXISTS ( SELECT video_id FROM video_audio b "
            f"WHERE a.oss_object_key = b.oss_object_key AND b.time = '{formatted_time}' ) "
            f"AND a.account_id = {account} and a.`status` = 1 limit 35;"
        )
        return MysqlHelper.get_values(sql, "prod")

    @classmethod
    def get_zizhi_url_list(cls, account):
        """Return up to 35 videos of ``account`` regardless of prior use."""
        sql = f"""SELECT video_id,account_id,oss_object_key FROM video_url where account_id = {account} limit 35;"""
        return MysqlHelper.get_values(sql, "prod")

    @classmethod
    def get_audio_url_list(cls, videos, audio_id):
        """Return the ``video_audio`` rows matching ``audio_id`` and the ``videos`` object key."""
        sql = f"""SELECT audio FROM video_audio where audio = {audio_id} and oss_object_key = '{videos}';"""
        return MysqlHelper.get_values(sql, "prod")

    @classmethod
    def _post_video(cls, oss_object_key, title, login_uid):
        """POST one "send video" request for ``login_uid`` and return the response ``code``."""
        url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
        payload = dict(pageSource='vlog-pages/post/post-video-post', videoPath=oss_object_key, width='720',
                       height='1280', fileExtensions='mp4', viewStatus='1', title=title,
                       careModelStatus='1',
                       token='f04f58d6e664cbc9902660a1e8d20ce6cd7fdb0f', loginUid=login_uid, versionCode='719',
                       machineCode='weixin_openid_o0w175aZ4FJtqVsA1tcozJDJHdDU', appId='wx89e7eb06478361d7',
                       clientTimestamp='1703337579331',
                       machineInfo='{"sdkVersion":"3.2.5","brand":"iPhone","language":"zh_CN","model":"iPhone 12 Pro","platform":"ios","system":"iOS 15.6.1","weChatVersion":"8.0.44","screenHeight":844,"screenWidth":390,"pixelRatio":3,"windowHeight":762,"windowWidth":390,"softVersion":"4.1.719"}',
                       sessionId='1703337560040-27bfe208-a389-f476-db1d-840681e04b32',
                       subSessionId='1703337569952-8f56d53c-b36d-760e-8abe-0b4a027cd5bd', senceType='1089',
                       hotSenceType='1089', id='1050', channel='pq')
        data = urllib.parse.urlencode(payload)
        headers = {
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.44(0x18002c2d) NetType/WIFI Language/zh_CN',
            'Accept-Encoding': 'gzip,compress,br,deflate',
            'Referer': 'https://servicewechat.com/wx89e7eb06478361d7/726/page-frame.html',
            'Content-Type': 'application/x-www-form-urlencoded',
            'Cookie': 'JSESSIONID=A60D96E7A300A25EA05425B069C8B459'
        }
        response = requests.post(url, data=data, headers=headers)
        return response.json()["code"]

    @classmethod
    def insert_piaoquantv(cls, oss_object_key, title, video_type, channel_type):
        """Publish the uploaded video under piaoquantv accounts.

        For ``video_type == "自制--春节"`` the video is posted once for every
        account of a fixed list; otherwise it is posted twice, each time for a
        randomly chosen account of the list selected by ``video_type`` /
        ``channel_type``.

        :return: ``True`` when the last request answered with ``code == 0``.
        """
        # Legacy user ids: "66481136", "66481137", "66481140", "66481141", "66481142"
        code = 1
        if video_type == "自制--春节":
            login_uids = ['15924999', '50322241', '50322258', '57463797', '50322235', '57463790', '50322234',
                          '6605563', '18981907', '50322198', '50322239', '57463838', '14500202']
            for login_uid in login_uids:
                code = cls._post_video(oss_object_key, title, login_uid)
        else:
            if video_type == "口播--美文类":
                login_uids = ["67231152", "67231153", "67231154", "67231155", "67231157"]
            elif channel_type == "douyin":
                login_uids = ["67231113", "67231112", "67231111", "67231110", "67231109"]
            elif channel_type == "dypinjie":
                login_uids = ["68276195", "68276196", "68276197", "68276198", "68276199"]
            elif channel_type == "kspinjie":
                login_uids = ["68276262", "68276263", "68276264", "68276265", "68276267"]
            else:
                login_uids = ["67413406", "67413407", "67413408", "67413409", "67413410"]
            for _ in range(2):
                code = cls._post_video(oss_object_key, title, random.choice(login_uids))
        return code == 0

    @classmethod
    def _limit_memory(cls):
        """Cap the process address space (soft and hard limit) at 6 GB."""
        memory_limit = 6 * 1024 * 1024 * 1024  # 6GB, in bytes
        resource.setrlimit(resource.RLIMIT_AS, (memory_limit, memory_limit))

    @classmethod
    def _material_path(cls, video):
        """Return the local material file path derived from the row's object key (index 2)."""
        filename = video[2].split("/")[-1]
        return f'./video_stitching/video_material/{filename}.mp4'

    @classmethod
    def _load_material(cls, videos, duration_limit):
        """Open material clips in order until their summed duration reaches ``duration_limit``.

        :return: ``(clips, total_duration)``.
        """
        clips = []
        total_duration = 0
        for video in videos:
            clip = VideoFileClip(cls._material_path(video))
            clips.append(clip)
            total_duration += clip.duration
            if total_duration >= duration_limit:
                break
        return clips, total_duration

    @classmethod
    def _discard_material(cls, clips, videos):
        """Close ``clips`` and delete the local material file of every row in ``videos``."""
        for clip in clips:
            clip.close()
        for video in videos:
            os.remove(cls._material_path(video))

    @classmethod
    def _close_clips(cls, clips, composite):
        """Best-effort release of the opened clips and the rendered composite (may be ``None``)."""
        try:
            for clip in clips:
                clip.close()
            if composite is not None:
                composite.close()
        except Exception as e:
            Common.logger("video").warning(f"close clips failed:{e}\n")

    @classmethod
    def concatenate_videos(cls, videos, audio, srt, type_audio):
        """Render a video whose length matches the audio track of ``audio``.

        The audio track of the video at ``audio`` is extracted and used as the
        sound track. For ``type_audio == "口播--美文类"`` the local ``koubo.mp4``
        is looped to the audio length; otherwise the material clips of
        ``videos`` are concatenated and the last one is trimmed so the total
        equals the audio length. A yellow banner plus either the SRT subtitles
        (``srt``) or a fixed caption is composited on top, and the result is
        written to ``output_path``.

        :return: the bare string ``""`` when the material is not longer than
            the audio (clips are closed and material files deleted in that
            case); otherwise a 3-tuple ``(included_videos, composite, clips)``
            whose first element is ``""`` when the output file was not created.
        """
        clips = []
        included_videos = []
        cls._limit_memory()
        # Extract the audio track of the source video.
        Common.logger("video").info(f"开始提取视频的音频{audio}")
        video1 = VideoFileClip(audio)
        mp3 = video1.audio
        Common.logger("video").info("提取视频的音频成功")
        # Audio length in seconds; it defines the length of the output.
        duration_limit = mp3.duration
        if type_audio == "口播--美文类":
            clip = VideoFileClip("./video_stitching/video_material/koubo.mp4")
            # Hand the clip back to the caller so it gets closed with the others.
            clips.append(clip)
            # Number of repetitions needed to cover the audio.
            n_videos = int(duration_limit / clip.duration) + 1
            final_clip = concatenate([clip] * n_videos, method="compose")
            # Cut the looped video to the audio length.
            final_clip = final_clip.set_duration(duration_limit)
        else:
            clips, total_duration = cls._load_material(videos, duration_limit)
            # Not enough material: clean up and skip stitching.
            if total_duration <= duration_limit:
                Common.logger("video").info("时长小于等于目标时长,不做视频拼接")
                cls._discard_material(clips, videos)
                return ""
            Common.logger("video").info("总时长大于目标时长")
            remaining_time = duration_limit
            final_clips = []
            for clip, video in zip(clips, videos):
                included_videos.append(video)
                if remaining_time - clip.duration >= 0:
                    final_clips.append(clip)
                    remaining_time -= clip.duration
                else:
                    # The clip does not fit entirely: trim it and stop.
                    final_clips.append(clip.subclip(0, remaining_time))
                    break
            final_clip = concatenate_videoclips(final_clips)
        final_clip = final_clip.set_audio(mp3)
        # Normalise the resolution.
        final_width = 320
        final_height = 480
        final_clip = final_clip.resize((final_width, final_height))
        # Yellow banner behind the caption area.
        color_clip = editor.ColorClip(size=(final_width, 90),
                                      color=(255, 255, 0)).set_duration(duration_limit)
        final_clip = editor.CompositeVideoClip([final_clip, color_clip.set_position(("center", final_height - 80))])
        Common.logger("video").info(f"字幕内容为:{srt}")
        # Font paths: /System/Library/Fonts/Hiragino Sans GB.ttc locally,
        # /usr/share/fonts/truetype/wqy/wqy-zenhei.ttc on the server.
        font = "/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc"
        if srt is not None:
            Common.logger("video").info("处理字幕文件")
            # Extract (start, end, text) triples from the SRT content.
            pattern = r"(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\n([\s\S]+?(?=\n\d|$))"
            matches = re.findall(pattern, srt)
            Common.logger("video").info(f"字幕{matches}")
            subtitle_clips = []
            for match in matches:
                start = editor.cvsecs(cls.srt_to_seconds(match[0]))
                end = editor.cvsecs(cls.srt_to_seconds(match[1]))
                text = cls.split_text(match[2].strip(), 10)
                Common.logger("video").info(f"字幕:{text}")
                sub = editor.TextClip(text, font=font, fontsize=18, color="black")
                sub = sub.set_duration(end - start).set_start(start)
                sub = sub.set_position(("center", final_height - 60)).set_opacity(0.8)
                subtitle_clips.append(sub)
            Common.logger("video").info("将字幕添加到视频上")
            video_with_subtitles = editor.CompositeVideoClip([final_clip] + subtitle_clips)
        else:
            Common.logger("video").info("添加固定字幕")
            text_clip = editor.TextClip("分享、转发给群友", font=font, fontsize=30, color="black")
            text_clip = text_clip.set_position(("center", final_height - 70))
            text_clip = text_clip.set_duration(duration_limit).set_opacity(0.8)
            # Paste the caption onto the video.
            video_with_subtitles = editor.CompositeVideoClip([final_clip, text_clip])
        # Render the video.
        video_with_subtitles.write_videofile(output_path, fps=35)
        if os.path.isfile(output_path):
            Common.logger("video").info(f"视频生成成功!生成路径为:{output_path}")
            return included_videos, video_with_subtitles, clips
        Common.logger("video").info("视频生成失败,请检查代码和文件路径。")
        return "", video_with_subtitles, clips

    @classmethod
    def get_audio_url(cls, i):
        """Fetch the transcoded video path and title of video ``i`` from the admin backend.

        :return: ``(audio_url, audio_title)`` on success; the bare string
            ``""`` when the backend answers with a non-zero code (a Feishu
            alert is sent) or when the request / response parsing fails.
        """
        cookie = Material.get_houtai_cookie()
        url = f"https://admin.piaoquantv.com/manager/video/detail/{i}"
        payload = {}
        headers = {
            'authority': 'admin.piaoquantv.com',
            'accept': 'application/json, text/plain, */*',
            'accept-language': 'zh-CN,zh;q=0.9',
            'cache-control': 'no-cache',
            'cookie': cookie,
            'pragma': 'no-cache',
            'referer': f'https://admin.piaoquantv.com/cms/post-detail/{i}/detail',
            'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"macOS"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        }
        try:
            # The request and JSON decoding are inside the try block so that
            # network and decoding errors yield "" like the other failure paths.
            response = requests.request("GET", url, headers=headers, data=payload)
            data = response.json()
            code = data["code"]
            if code != 0:
                Common.logger("video").info(
                    f"未登录,请更换cookie,{data}")
                Feishu.bot('recommend', '管理后台', '管理后台cookie失效,请及时更换~')
                return ""
            audio_url = data["content"]["transedVideoPath"]
            audio_title = data["content"]['title']
            print(audio_url)
            return audio_url, audio_title
        except Exception as e:
            Common.logger("video").warning(f"获取音频视频链接失败:{e}\n")
            return ""

    @classmethod
    def _pick_account(cls, channel_type):
        """Return a random stored account id for ``channel_type`` as SQL-ready text, or ``None``."""
        account_ids = cls.get_account_id(channel_type)
        if not account_ids:
            Common.logger("video").info(f"获取用户ID为空:{channel_type}")
            return None
        account = random.choice(account_ids)
        # Rows arrive as 1-tuples; strip the tuple punctuation from their repr.
        account = str(account).replace('(', '').replace(')', '').replace(',', '')
        Common.logger("video").info(f"获取用户ID:{account}")
        return account

    @classmethod
    def video_stitching(cls, video_type, count, channel_type):
        """Build, upload, record and publish one stitched video with an audio track.

        Steps: pick the audio (``Material.get_audio_type``), resolve its URL,
        choose the material rows for ``video_type``, render with
        :meth:`concatenate_videos`, upload to OSS and, when the upload status
        is 200, record the used material, delete local files and publish via
        :meth:`insert_piaoquantv`. Every failure is logged and ends the run;
        nothing is returned.
        """
        Common.logger("video").info("获取音频类型+字幕")
        audio_id, srt, title_list = Material.get_audio_type(video_type, count, channel_type)
        if audio_id is None:
            Common.logger("video").info("获取音频url为空")
            return
        # get_audio_url returns a bare "" on failure, so check before unpacking.
        audio_info = cls.get_audio_url(audio_id)
        if not audio_info or audio_info[0] == "":
            Common.logger("video").info("获取音频地址为空")
            return
        audio, audio_title = audio_info
        Common.logger("video").info(f"获取音频地址:{audio},获取用户id:{audio_id}")
        if video_type == "口播--美文类":
            # Pick one of the stored voice-over videos at random.
            audio_list = cls.get_audio_list()
            if not audio_list:
                Common.logger("video").info(f"口播视频列表为空:{audio_list}")
                return
            videos = random.choice([list(item) for item in audio_list])
        else:
            account = cls._pick_account(channel_type)
            if account is None:
                return
            if video_type == "自制--春节":
                url_list = cls.get_zizhi_url_list(account)
            else:
                # Only material not yet used today.
                url_list = cls.get_url_list(audio_id, account)
            if not url_list:
                Common.logger("video").info(f"未使用视频链接为空:{url_list}")
                return
            videos = [list(item) for item in url_list]
        videos = Oss.get_oss_url(video_type, videos)
        clips = []
        video_with_subtitles = None
        try:
            result = cls.concatenate_videos(videos, str(audio), srt, video_type)
            if not result:
                # Bare "": material too short; concatenate_videos already cleaned up.
                return
            audio_url, video_with_subtitles, clips = result
            # "" marks a missing output file ([] is the normal voice-over result).
            if audio_url == "":
                Common.logger("video").info("视频生成失败")
                return
            video_id = cls.random_id()
            Common.logger("video").info(f"生成视频id为:{video_id}")
            # Upload to OSS.
            upload_result = Oss.stitching_sync_upload_oss(output_path, video_id)
            status = upload_result.get("status")
            oss_object_key = upload_result.get("oss_object_key")
            Common.logger("video").info(f"新拼接视频,oss发送成功,oss地址:{oss_object_key}")
            if status == 200:
                time.sleep(10)
                # Record the material that was used.
                if video_type == "口播--美文类":
                    cls.insert_video_typeAudio(videos, audio_id, channel_type)
                else:
                    cls.insert_videoAudio(audio_url, audio_id, channel_type)
                Common.logger("video").info("发送成功 已使用视频存入数据库完成")
                if os.path.isfile(output_path):
                    os.remove(output_path)
                    Common.logger("video").info(f"文件删除成功{output_path}")
                else:
                    Common.logger("video").info(f"文件不存在{output_path}")
                if video_type == "口播--美文类":
                    os.remove("./video_stitching/video_material/koubo.mp4")
                else:
                    for video in videos:
                        os.remove(cls._material_path(video))
                if cls.insert_piaoquantv(oss_object_key, audio_title, video_type, channel_type):
                    Common.logger("video").info("视频添加到对应用户成功")
        except Exception as e:
            Common.logger("video").warning(f"新拼接视频发送oss失败:{e}\n")
            return
        finally:
            # Release file handles on every path, not only after a successful publish.
            cls._close_clips(clips, video_with_subtitles)

    @classmethod
    def concatenate_pinjie_videos(cls, videos):
        """Concatenate material clips into a video of at least 120 seconds.

        Clips are opened in order until their summed duration reaches 120 s,
        concatenated, resized to 320x480 and written to ``output_path``.

        :return: the bare string ``""`` when the material is not longer than
            120 s (clips are closed and material files deleted in that case);
            otherwise a 3-tuple ``(included_videos, final_video, clips)`` whose
            first element is ``""`` when the output file was not created.
        """
        included_videos = []
        cls._limit_memory()
        # Fixed target length in seconds.
        duration_limit = 120
        clips, total_duration = cls._load_material(videos, duration_limit)
        # Not enough material: clean up and skip stitching.
        if total_duration <= duration_limit:
            Common.logger("video").info("时长小于等于目标时长,不做视频拼接")
            cls._discard_material(clips, videos)
            return ""
        Common.logger("video").info("总时长大于目标时长")
        remaining_time = duration_limit
        final_clips = []
        for clip, video in zip(clips, videos):
            final_clips.append(clip)
            included_videos.append(video)
            if remaining_time - clip.duration >= 0:
                remaining_time -= clip.duration
            else:
                # NOTE(review): unlike concatenate_videos the last clip is kept
                # whole, so the output can exceed 120 s - confirm this is intended.
                break
        final_clip = concatenate_videoclips(final_clips)
        # Normalise the resolution.
        final_width = 320
        final_height = 480
        video_with_subtitles = final_clip.resize((final_width, final_height))
        # Render the video.
        video_with_subtitles.write_videofile(output_path, fps=35)
        if os.path.isfile(output_path):
            Common.logger("video").info(f"视频生成成功!生成路径为:{output_path}")
            return included_videos, video_with_subtitles, clips
        Common.logger("video").info("视频生成失败,请检查代码和文件路径。")
        return "", video_with_subtitles, clips

    @classmethod
    def video_stitching_pinjie(cls, video_type, count, channel_type):
        """Build, upload, record and publish one stitched video without an extra audio track.

        Same pipeline as :meth:`video_stitching` but rendering goes through
        :meth:`concatenate_pinjie_videos` and the title comes from
        ``Material.get_pinjie_title``. ``count`` is accepted for interface
        compatibility but is not used. Every failure is logged and ends the
        run; nothing is returned.
        """
        title_list = Material.get_pinjie_title()
        account = cls._pick_account(channel_type)
        if account is None:
            return
        # Only material not yet used today.
        url_list = cls.get_url_list('', account)
        if not url_list:
            Common.logger("video").info(f"未使用视频链接为空:{url_list}")
            return
        videos = [list(item) for item in url_list]
        videos = Oss.get_oss_url(video_type, videos)
        clips = []
        video_with_subtitles = None
        try:
            result = cls.concatenate_pinjie_videos(videos)
            if not result:
                # Bare "": material too short; the helper already cleaned up.
                return
            audio_url, video_with_subtitles, clips = result
            if len(audio_url) == 0:
                Common.logger("video").info("视频生成失败")
                return
            video_id = cls.random_id()
            Common.logger("video").info(f"生成视频id为:{video_id}")
            # Upload to OSS.
            upload_result = Oss.stitching_sync_upload_oss(output_path, video_id)
            status = upload_result.get("status")
            oss_object_key = upload_result.get("oss_object_key")
            Common.logger("video").info(f"新拼接视频,oss发送成功,oss地址:{oss_object_key}")
            if status == 200:
                time.sleep(10)
                # Record the material that was used.
                cls.insert_videoAudio(audio_url, '', channel_type)
                Common.logger("video").info("发送成功 已使用视频存入数据库完成")
                if os.path.isfile(output_path):
                    os.remove(output_path)
                    Common.logger("video").info(f"文件删除成功{output_path}")
                else:
                    Common.logger("video").info(f"文件不存在{output_path}")
                for video in videos:
                    os.remove(cls._material_path(video))
                if cls.insert_piaoquantv(oss_object_key, title_list, video_type, channel_type):
                    Common.logger("video").info("视频添加到对应用户成功")
        except Exception as e:
            Common.logger("video").warning(f"新拼接视频发送oss失败:{e}\n")
            return
        finally:
            # Release file handles on every path, not only after a successful publish.
            cls._close_clips(clips, video_with_subtitles)


if __name__ == '__main__':
    koubo_count = 1
    video_type = "口播"
    channel_type = "douyin"
    VideoStitching.video_stitching(video_type, koubo_count, channel_type)