123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495 |
- # -*- coding: utf-8 -*-
- # @Time: 2023/12/26
- import datetime
- import random
- import os
- import re
- import sys
- import time
- import resource
- import requests
- import urllib.parse
- sys.path.append(os.getcwd())
- from datetime import datetime
- from common import Feishu
- from common.aliyun_oss_uploading import Oss
- from common.common import Common
- from common.db import MysqlHelper
- from common.material import Material
- from moviepy.editor import concatenate_videoclips
- from moviepy.video.io.VideoFileClip import VideoFileClip
- from moviepy import editor
- from moviepy.video.compositing.concatenate import concatenate
- output_path = "./video_stitching/video/new_video.mp4"
- class VideoStitching():
- @classmethod
- def split_text(cls, text, max_length):
- words = text.split(' ')
- lines = []
- current_line = ''
- for word in words:
- if len(current_line) + len(word) <= max_length:
- current_line += word + ' '
- else:
- lines.append(current_line.strip())
- current_line = word + ' '
- lines.append(current_line.strip())
- result = ''.join(lines)
- result = result[:11] + '\n' + result[11:] # 在第10个字后面增加换行
- return result
- @classmethod
- def srt_to_seconds(cls,srt_time):
- hours, minutes, seconds = map(float, srt_time.replace(',', '.').split(':'))
- return hours * 3600 + minutes * 60 + seconds
- @classmethod
- def insert_videoAudio(cls, audio_url, i, channel_type):
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d")
- if channel_type == "douyin":
- video_type = 0
- elif channel_type == "kschunjie":
- video_type = 4
- else:
- video_type = 2
- for j in audio_url:
- insert_sql = f"""INSERT INTO video_audio (audio, video_id, account_id, oss_object_key, time, video_type) values ('{i}', '{j[0]}', '{j[1]}', '{j[2]}', '{formatted_time}', {video_type})"""
- MysqlHelper.update_values(
- sql=insert_sql,
- env="prod",
- machine="",
- )
- @classmethod
- def insert_video_typeAudio(cls, video, audio_id, channel_type):
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d")
- if channel_type == "jieri":
- video_type = 3
- else:
- video_type = 1
- insert_sql = f"""INSERT INTO video_audio (audio, account_id, oss_object_key, time, video_type) values ('{audio_id}', '{video}', '{video}', '{formatted_time}', {video_type})"""
- MysqlHelper.update_values(
- sql=insert_sql,
- env="prod",
- machine="",
- )
- # 随机生成id
- @classmethod
- def random_id(cls):
- now = datetime.now()
- rand_num = random.randint(10000, 99999)
- id = "{}{}".format(now.strftime("%Y%m%d%H%M%S"), rand_num)
- return id
- @classmethod
- def get_account_id(cls, channel_type):
- account_id = f"""select account_id from video_url where oss_object_key LIKE '%{channel_type}%' group by account_id ;"""
- account_id = MysqlHelper.get_values(account_id, "prod")
- return account_id
- @classmethod
- def get_audio_list(cls):
- audio_list = f"""select oss_object_key from video_koubo_url ;"""
- audio_list = MysqlHelper.get_values(audio_list, "prod")
- return audio_list
- @classmethod
- def get_url_list(cls, audio_id, account):
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d")
- url_list = f"""SELECT a.video_id,a.account_id,a.oss_object_key FROM video_url a WHERE NOT EXISTS (
- SELECT video_id
- FROM video_audio b
- WHERE a.oss_object_key = b.oss_object_key AND b.time = '{formatted_time}'
- ) AND a.account_id = {account} and a.`status` = 1 limit 35;"""
- url_list = MysqlHelper.get_values(url_list, "prod")
- return url_list
- @classmethod
- def get_zizhi_url_list(cls, account):
- url_list = f"""SELECT video_id,account_id,oss_object_key FROM video_url where account_id = {account} limit 35;"""
- url_list = MysqlHelper.get_values(url_list, "prod")
- return url_list
- @classmethod
- def get_audio_url_list(cls, videos, audio_id):
- url_list = f"""SELECT audio FROM video_audio where audio = {audio_id} and oss_object_key = '{videos}';"""
- url_list = MysqlHelper.get_values(url_list, "prod")
- return url_list
- # 新生成视频上传到对应账号下
- @classmethod
- def insert_piaoquantv(cls, oss_object_key, title_list, video_type, channel_type):
- #list = ["66481136", "66481137", "66481140", "66481141", "66481142"] 老用户id
- if video_type == "自制--春节":
- title = title_list
- list = ['15924999', '50322241', '50322258', '57463797', '50322235', '57463790', '50322234', '6605563', '18981907', '50322198', '50322239', '57463838', '14500202']
- code = 1
- for i in range(len(list)):
- # title_list = [item for item in title_list if item is not None]
- # title = random.choice(title_list)
- # title = title[0].strip("[]'")
- url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
- payload = dict(pageSource='vlog-pages/post/post-video-post', videoPath=oss_object_key, width='720',
- height='1280', fileExtensions='mp4', viewStatus='1', title=title, careModelStatus='1',
- token='f04f58d6e664cbc9902660a1e8d20ce6cd7fdb0f', loginUid=list[i], versionCode='719',
- machineCode='weixin_openid_o0w175aZ4FJtqVsA1tcozJDJHdDU', appId='wx89e7eb06478361d7',
- clientTimestamp='1703337579331',
- machineInfo='{"sdkVersion":"3.2.5","brand":"iPhone","language":"zh_CN","model":"iPhone 12 Pro<iPhone13,3>","platform":"ios","system":"iOS 15.6.1","weChatVersion":"8.0.44","screenHeight":844,"screenWidth":390,"pixelRatio":3,"windowHeight":762,"windowWidth":390,"softVersion":"4.1.719"}',
- sessionId='1703337560040-27bfe208-a389-f476-db1d-840681e04b32',
- subSessionId='1703337569952-8f56d53c-b36d-760e-8abe-0b4a027cd5bd', senceType='1089',
- hotSenceType='1089', id='1050', channel='pq')
- payload['videoPath'] = oss_object_key
- payload['title'] = title
- data = urllib.parse.urlencode(payload)
- headers = {
- 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.44(0x18002c2d) NetType/WIFI Language/zh_CN',
- 'Accept-Encoding': 'gzip,compress,br,deflate',
- 'Referer': 'https://servicewechat.com/wx89e7eb06478361d7/726/page-frame.html',
- 'Content-Type': 'application/x-www-form-urlencoded',
- 'Cookie': 'JSESSIONID=A60D96E7A300A25EA05425B069C8B459'
- }
- response = requests.post(url, data=data, headers=headers)
- data = response.json()
- code = data["code"]
- if code == 0:
- return True
- else:
- return False
- else:
- if video_type == "口播--美文类":
- list = ["67231152", "67231153", "67231154", "67231155", "67231157"]
- elif channel_type == "douyin":
- list = ["67231113", "67231112", "67231111", "67231110", "67231109"]
- else:
- list = ["67413406", "67413407", "67413408", "67413409", "67413410"]
- code = 1
- for i in range(2):
- item = random.choice(list)
- title_list = [item for item in title_list if item is not None]
- title = random.choice(title_list)
- # title = title[0].strip("[]'")
- url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
- payload = dict(pageSource='vlog-pages/post/post-video-post', videoPath=oss_object_key, width='720',
- height='1280', fileExtensions='mp4', viewStatus='1', title=title, careModelStatus='1',
- token='f04f58d6e664cbc9902660a1e8d20ce6cd7fdb0f', loginUid=item, versionCode='719',
- machineCode='weixin_openid_o0w175aZ4FJtqVsA1tcozJDJHdDU', appId='wx89e7eb06478361d7',
- clientTimestamp='1703337579331',
- machineInfo='{"sdkVersion":"3.2.5","brand":"iPhone","language":"zh_CN","model":"iPhone 12 Pro<iPhone13,3>","platform":"ios","system":"iOS 15.6.1","weChatVersion":"8.0.44","screenHeight":844,"screenWidth":390,"pixelRatio":3,"windowHeight":762,"windowWidth":390,"softVersion":"4.1.719"}',
- sessionId='1703337560040-27bfe208-a389-f476-db1d-840681e04b32',
- subSessionId='1703337569952-8f56d53c-b36d-760e-8abe-0b4a027cd5bd', senceType='1089',
- hotSenceType='1089', id='1050', channel='pq')
- payload['videoPath'] = oss_object_key
- payload['title'] = title
- data = urllib.parse.urlencode(payload)
- headers = {
- 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.44(0x18002c2d) NetType/WIFI Language/zh_CN',
- 'Accept-Encoding': 'gzip,compress,br,deflate',
- 'Referer': 'https://servicewechat.com/wx89e7eb06478361d7/726/page-frame.html',
- 'Content-Type': 'application/x-www-form-urlencoded',
- 'Cookie': 'JSESSIONID=A60D96E7A300A25EA05425B069C8B459'
- }
- response = requests.post(url, data=data, headers=headers)
- data = response.json()
- code = data["code"]
- if code == 0:
- return True
- else:
- return False
- # 视频拼接
- @classmethod
- def concatenate_videos(cls, videos, audio, srt, type_audio):
- clips = []
- total_duration = 0
- included_videos = []
- # 设置最大可使用的内存限制(单位:字节)
- memory_limit = 6 * 1024 * 1024 * 1024 # 6GB
- resource.setrlimit(resource.RLIMIT_AS, (memory_limit, memory_limit))
- # 提取视频的音频
- Common.logger("video").info(f"开始提取视频的音频{audio}")
- video1 = VideoFileClip(audio)
- mp3 = video1.audio
- Common.logger("video").info(f"提取视频的音频成功")
- # 获取音频时长(以秒为单位)
- duration_limit = mp3.duration
- if type_audio == "口播--美文类":
- # 读取视频文件,获取视频时长
- clip = f"./video_stitching/video_material/koubo.mp4"
- clip = VideoFileClip(clip)
- video_duration = clip.duration
- # 计算需要循环播放的视频个数
- n_videos = int(duration_limit / video_duration) + 1
- # 循环生成视频剪辑
- video_clips = [clip] * n_videos
- # 连接视频剪辑
- final_clip = concatenate(video_clips, method="compose")
- # 设置最终剪辑的时长与音频时长一致
- final_clip = final_clip.set_duration(duration_limit)
- else:
- # 遍历每个视频并计算总时长
- for i, video in enumerate(videos):
- filename = video[2].split("/")[-1]
- clip = VideoFileClip(f'./video_stitching/video_material/{filename}.mp4')
- clips.append(clip)
- total_duration += clip.duration
- if total_duration >= duration_limit:
- break
- # 如果总时长小于等于目标时长,则不做视频拼接
- if total_duration <= duration_limit:
- Common.logger("video").info(f"时长小于等于目标时长,不做视频拼接")
- # 关闭视频文件
- for clip in clips:
- clip.close()
- for video in videos:
- filename = video[2].split("/")[-1]
- os.remove(f'./video_stitching/video_material/{filename}.mp4')
- return ""
- else:
- Common.logger("video").info(f"总时长大于目标时长")
- remaining_time = duration_limit
- final_clips = []
- for clip, video in zip(clips, videos):
- if remaining_time - clip.duration >= 0:
- final_clips.append(clip)
- included_videos.append(video)
- remaining_time -= clip.duration
- else:
- # 如果剩余时间不足以加入下一个视频,则截断当前视频并返回已包含的URL
- final_clips.append(clip.subclip(0, remaining_time))
- included_videos.append(video)
- break
- final_clip = concatenate_videoclips(final_clips)
- final_clip = final_clip.set_audio(mp3)
- # 统一设置视频分辨率
- final_width = 320
- final_height = 480
- final_clip = final_clip.resize((final_width, final_height))
- color_clip = editor.ColorClip(size=(final_width, 90),
- color=(255, 255, 0)).set_duration(duration_limit)
- final_clip = editor.CompositeVideoClip([final_clip, color_clip.set_position(("center", final_height - 80))])
- Common.logger("video").info(f"字幕内容为:{srt}")
- if srt != None:
- Common.logger("video").info(f"处理字幕文件")
- # 使用正则表达式提取时间码和字幕内容
- pattern = r"(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\n([\s\S]+?(?=\n\d|$))"
- matches = re.findall(pattern, srt)
- Common.logger("video").info(f"字幕{matches}")
- # 获取字幕
- subtitle_clips = []
- for match in matches:
- start = editor.cvsecs(cls.srt_to_seconds(match[0]))
- end = editor.cvsecs(cls.srt_to_seconds(match[1]))
- text = match[2].strip()
- text = cls.split_text(text, 10)
- # 设置背景色
- # color_clip = editor.ColorClip(size=(final_width, 90),
- # color=(255, 255, 0)).set_duration(end - start).set_start(start)
- # final_clip = editor.CompositeVideoClip(
- # [final_clip, color_clip.set_position(("center", final_height - 80))])
- # /System/Library/Fonts/Hiragino Sans GB.ttc 本地字体
- # /usr/share/fonts/truetype/wqy/wqy-zenhei.ttc 服务器地址
- Common.logger("video").info(f"字幕:{text}")
- sub = editor.TextClip(text, font="/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc",
- fontsize=18, color="black").set_duration(end - start).set_start(
- start).set_position(
- ("center", final_height - 60)).set_opacity(0.8)
- subtitle_clips.append(sub)
- Common.logger("video").info(f"将字幕添加到视频上")
- # 将字幕添加到视频上
- video_with_subtitles = editor.CompositeVideoClip([final_clip] + subtitle_clips)
- else:
- Common.logger("video").info(f"添加固定字幕")
- text_clip = (
- editor.TextClip("分享、转发给群友", font="/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc",
- fontsize=30, color="black").
- set_position(("center", final_height - 70)).
- set_duration(duration_limit).
- set_opacity(0.8)
- )
- # 把 `文本剪贴板` 贴在视频上
- video_with_subtitles = editor.CompositeVideoClip([final_clip, text_clip])
- # 生成视频
- video_with_subtitles.write_videofile(output_path, fps=35)
- if os.path.isfile(output_path):
- Common.logger("video").info("视频生成成功!生成路径为:", output_path)
- return included_videos, video_with_subtitles, clips
- else:
- Common.logger("video").info("视频生成失败,请检查代码和文件路径。")
- return "", video_with_subtitles, clips
- @classmethod
- def get_audio_url(cls, i):
- cookie = Material.get_houtai_cookie()
- url = f"https://admin.piaoquantv.com/manager/video/detail/{i}"
- payload = {}
- headers = {
- 'authority': 'admin.piaoquantv.com',
- 'accept': 'application/json, text/plain, */*',
- 'accept-language': 'zh-CN,zh;q=0.9',
- 'cache-control': 'no-cache',
- 'cookie': cookie,
- 'pragma': 'no-cache',
- 'referer': f'https://admin.piaoquantv.com/cms/post-detail/{i}/detail',
- 'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
- 'sec-ch-ua-mobile': '?0',
- 'sec-ch-ua-platform': '"macOS"',
- 'sec-fetch-dest': 'empty',
- 'sec-fetch-mode': 'cors',
- 'sec-fetch-site': 'same-origin',
- 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
- }
- response = requests.request("GET", url, headers=headers, data=payload)
- data = response.json()
- try:
- code = data["code"]
- if code != 0:
- Common.logger("video").info(
- f"未登录,请更换cookie,{data}")
- Feishu.bot('recommend', '管理后台', '管理后台cookie失效,请及时更换~')
- return ""
- audio_url = data["content"]["transedVideoPath"]
- print(audio_url)
- return audio_url
- except Exception as e:
- Common.logger("video").warning(f"获取音频视频链接失败:{e}\n")
- return ""
- @classmethod
- def video_stitching(cls, video_type, count, channel_type):
- Common.logger("video").info(f"获取音频类型+字幕")
- # 获取音频类型+字幕
- audio_id, srt, title_list = Material.get_audio_type(video_type, count, channel_type)
- # 获取音频url
- if audio_id == None:
- Common.logger("video").info(f"获取音频url为空")
- return
- audio = cls.get_audio_url(audio_id)
- if audio == "":
- Common.logger("video").info(f"获取音频地址为空")
- return
- Common.logger("video").info(f"获取音频地址:{audio},获取用户id:{audio_id}")
- if video_type == "口播--美文类":
- # 获取已入库的口播视频
- audio_list = cls.get_audio_list()
- videos = [list(item) for item in audio_list]
- videos = random.choice(videos) #随机选择视频
- # 判断该视频+音频是否生成过视频
- # video = VideoStitching.get_audio_url_list(videos, audio_id)
- # if video:
- # Common.logger("video").info(f"该视频+音频已经生成过视频,不做视频拼接")
- # return
- elif video_type == "自制--春节":
- # 获取已入库的用户id
- account_id = cls.get_account_id(channel_type)
- account = random.choice(account_id)
- account = str(account).replace('(', '').replace(')', '').replace(',', '')
- Common.logger("video").info(f"获取用户ID:{account}")
- # 获取视频链接
- url_list = cls.get_zizhi_url_list(account)
- if url_list == None:
- Common.logger("video").info(f"未使用视频链接为空:{url_list}")
- return
- videos = [list(item) for item in url_list]
- else:
- # 获取已入库的用户id
- account_id = cls.get_account_id(channel_type)
- account = random.choice(account_id)
- account = str(account).replace('(', '').replace(')', '').replace(',', '')
- Common.logger("video").info(f"获取用户ID:{account}")
- # 获取 未使用的视频链接
- url_list = cls.get_url_list(audio_id, account)
- if url_list == None:
- Common.logger("video").info(f"未使用视频链接为空:{url_list}")
- return
- videos = [list(item) for item in url_list]
- videos = Oss.get_oss_url(video_type, videos)
- # 视频截取
- try:
- audio_url, video_with_subtitles, clips = cls.concatenate_videos(videos, str(audio), srt, video_type)
- if len(audio_url) == 0:
- Common.logger("video").info(f"视频生成失败")
- # 随机生成视频id
- id = cls.random_id()
- Common.logger("video").info(f"生成视频id为:{id}")
- # 上传 oss
- oss_object_key = Oss.stitching_sync_upload_oss(output_path, id)
- status = oss_object_key.get("status")
- # 获取 oss 视频地址
- oss_object_key = oss_object_key.get("oss_object_key")
- Common.logger("video").info(f"新拼接视频,oss发送成功,oss地址:{oss_object_key}")
- if status == 200:
- time.sleep(10)
- # 发送成功 已使用视频存入数据库
- if video_type == "口播--美文类":
- cls.insert_video_typeAudio(videos, audio_id, channel_type)
- else:
- cls.insert_videoAudio(audio_url, audio_id, channel_type)
- Common.logger("video").info(f"发送成功 已使用视频存入数据库完成")
- if os.path.isfile(output_path):
- os.remove(output_path)
- Common.logger("video").info(f"文件删除成功{output_path}")
- else:
- Common.logger("video").info(f"文件不存在{output_path}")
- if video_type == "口播--美文类":
- os.remove("./video_stitching/video_material/koubo.mp4")
- else:
- for video in videos:
- filename = video[2].split("/")[-1]
- os.remove(f'./video_stitching/video_material/{filename}.mp4')
- piaoquantv = cls.insert_piaoquantv(oss_object_key, title_list, video_type, channel_type)
- if piaoquantv:
- Common.logger("video").info(f"视频添加到对应用户成功")
- # 关闭视频文件
- for clip in clips:
- clip.close()
- # 释放视频对象
- video_with_subtitles.close()
- except Exception as e:
- Common.logger("video").warning(f"新拼接视频发送oss失败:{e}\n")
- return
- if __name__ == '__main__':
- koubo_count = 1
- video_type = "口播"
- channel_type = "douyin"
- VideoStitching.video_stitching(video_type, koubo_count, channel_type)
|