123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385 |
- import configparser
- import os
- import random
- import subprocess
- import sys
- import time
- import urllib.parse
- import requests
- from datetime import datetime
- sys.path.append(os.getcwd())
- from common.db import MysqlHelper
- from common.material import Material
- from common import Common, Oss, Feishu
- config = configparser.ConfigParser()
- config.read('./config.ini') # 替换为您的配置文件路径
- video_path = config['PATHS']['VIDEO_PATH']
- srt_path = config['PATHS']['SRT_PATH']
- oss_path = config['PATHS']['OSS_PATH']
- txt_path = config['PATHS']['TXT_PATH']
- class AgcVidoe():
- # 获取未使用的视频链接
- @classmethod
- def get_url_list(cls, user, mark, limit_count):
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d")
- url_list = f"""SELECT a.video_id,a.account_id,a.oss_object_key FROM agc_video_url a WHERE NOT EXISTS (
- SELECT video_id
- FROM agc_video_deposit b
- WHERE a.oss_object_key = b.oss_object_key AND b.time = '{formatted_time}'
- ) AND a.account_id = {user} and a.`status` = 1 and a.mark = '{mark}' limit {limit_count};"""
- url_list = MysqlHelper.get_values(url_list, "prod")
- return url_list
- # 随机生成id
- @classmethod
- def random_id(cls):
- now = datetime.now()
- rand_num = random.randint(10000, 99999)
- oss_id = "{}{}".format(now.strftime("%Y%m%d%H%M%S"), rand_num)
- return oss_id
- # 获取已入库的用户id
- @classmethod
- def get_user_id(cls, channel_type, mark):
- account_id = f"""select account_id from agc_video_url where mark = '{mark}' and oss_object_key LIKE '%{channel_type}%' group by account_id ;"""
- account_id = MysqlHelper.get_values(account_id, "prod")
- return account_id
- # 获取已入库数量
- @classmethod
- def get_link_count(cls, mark, platform):
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d")
- count = f"""SELECT COUNT(*) AS total_count FROM ( SELECT audio, account_id FROM agc_video_deposit WHERE time = '{formatted_time}' AND platform = '{platform}' and mark = '{mark}' GROUP BY audio, account_id) AS subquery;"""
- count = MysqlHelper.get_values(count, "prod")
- if count == None:
- count = 0
- count = str(count).replace('(', '').replace(')', '').replace(',', '')
- return int(count)
- @classmethod
- def create_subtitle_file(cls, srt, s_path):
- # 创建临时字幕文件
- with open(s_path, 'w') as f:
- f.write(srt)
- @classmethod
- def convert_srt_to_ass(cls, s_path, a_path):
- # 使用 FFmpeg 将 SRT 转换为 ASS
- subprocess.run(["ffmpeg", "-i", s_path, a_path])
- # 新生成视频上传到对应账号下
- @classmethod
- def insert_piaoquantv(cls, oss_object_key, title_list, pq_ids_list):
- for pq_ids in pq_ids_list:
- title_list = [item for item in title_list if item is not None]
- title = random.choice(title_list)
- url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
- payload = dict(pageSource='vlog-pages/post/post-video-post', videoPath=oss_object_key, width='720',
- height='1280', fileExtensions='mp4', viewStatus='1', title=title,
- careModelStatus='1',
- token='f04f58d6e664cbc9902660a1e8d20ce6cd7fdb0f', loginUid=pq_ids,
- versionCode='719',
- machineCode='weixin_openid_o0w175aZ4FJtqVsA1tcozJDJHdDU', appId='wx89e7eb06478361d7',
- clientTimestamp='1703337579331',
- machineInfo='{"sdkVersion":"3.2.5","brand":"iPhone","language":"zh_CN","model":"iPhone 12 Pro<iPhone13,3>","platform":"ios","system":"iOS 15.6.1","weChatVersion":"8.0.44","screenHeight":844,"screenWidth":390,"pixelRatio":3,"windowHeight":762,"windowWidth":390,"softVersion":"4.1.719"}',
- sessionId='1703337560040-27bfe208-a389-f476-db1d-840681e04b32',
- subSessionId='1703337569952-8f56d53c-b36d-760e-8abe-0b4a027cd5bd', senceType='1089',
- hotSenceType='1089', id='1050', channel='pq')
- payload['videoPath'] = oss_object_key
- payload['title'] = title
- data = urllib.parse.urlencode(payload)
- headers = {
- 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.44(0x18002c2d) NetType/WIFI Language/zh_CN',
- 'Accept-Encoding': 'gzip,compress,br,deflate',
- 'Referer': 'https://servicewechat.com/wx89e7eb06478361d7/726/page-frame.html',
- 'Content-Type': 'application/x-www-form-urlencoded',
- 'Cookie': 'JSESSIONID=A60D96E7A300A25EA05425B069C8B459'
- }
- response = requests.post(url, data=data, headers=headers)
- data = response.json()
- code = data["code"]
- if code == 0:
- return True
- else:
- return False
- # 获取视频链接
- @classmethod
- def get_audio_url(cls, uid, mark):
- cookie = Material.get_houtai_cookie()
- url = f"https://admin.piaoquantv.com/manager/video/detail/{uid}"
- payload = {}
- headers = {
- 'authority': 'admin.piaoquantv.com',
- 'accept': 'application/json, text/plain, */*',
- 'accept-language': 'zh-CN,zh;q=0.9',
- 'cache-control': 'no-cache',
- 'cookie': cookie,
- 'pragma': 'no-cache',
- 'referer': f'https://admin.piaoquantv.com/cms/post-detail/{uid}/detail',
- 'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
- 'sec-ch-ua-mobile': '?0',
- 'sec-ch-ua-platform': '"macOS"',
- 'sec-fetch-dest': 'empty',
- 'sec-fetch-mode': 'cors',
- 'sec-fetch-site': 'same-origin',
- 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
- }
- response = requests.request("GET", url, headers=headers, data=payload)
- data = response.json()
- try:
- code = data["code"]
- if code != 0:
- Common.logger("video").info(
- f"未登录,请更换cookie,{data}")
- Feishu.bot('recommend', '管理后台', '管理后台cookie失效,请及时更换~', mark)
- return ""
- audio_url = data["content"]["transedVideoPath"]
- return audio_url
- except Exception as e:
- Common.logger("video").warning(f"获取音频视频链接失败:{e}\n")
- return ""
- # 获取视频时长
- @classmethod
- def get_audio_duration(cls, video_url):
- ffprobe_cmd = [
- "ffprobe",
- "-i", video_url,
- "-show_entries", "format=duration",
- "-v", "quiet",
- "-of", "csv=p=0"
- ]
- output = subprocess.check_output(ffprobe_cmd).decode("utf-8").strip()
- return float(output)
- # 获取视频文件的时长(秒)
- @classmethod
- def get_video_duration(cls, video_file):
- result = subprocess.run(
- ["ffprobe", "-v", "error", "-show_entries", "format=duration",
- "-of", "default=noprint_wrappers=1:nokey=1", video_file],
- capture_output=True, text=True)
- return float(result.stdout)
- # 计算需要拼接的视频
- @classmethod
- def concat_videos_with_subtitles(cls, videos, audio_duration, platform):
- # 计算视频文件列表总时长
- total_video_duration = sum(cls.get_video_duration(video_file[3]) for video_file in videos)
- if platform == "koubo":
- # 视频时长大于音频时长
- if total_video_duration > audio_duration:
- return videos
- # 计算音频秒数与视频秒数的比率,然后加一得到需要的视频数量
- video_audio_ratio = audio_duration / total_video_duration
- videos_needed = int(video_audio_ratio) + 2
- trimmed_video_list = videos * videos_needed
- return trimmed_video_list
- else:
- # 如果视频总时长小于音频时长,则不做拼接
- if total_video_duration < audio_duration:
- Common.logger("video").info(f"时长小于等于目标时长,不做视频拼接")
- return ""
- # 如果视频总时长大于音频时长,则截断视频
- trimmed_video_list = []
- remaining_duration = audio_duration
- for video_file in videos:
- video_duration = cls.get_video_duration(video_file[3])
- if video_duration <= remaining_duration:
- # 如果视频时长小于或等于剩余时长,则将整个视频添加到列表中
- trimmed_video_list.append(video_file)
- remaining_duration -= video_duration
- else:
- trimmed_video_list.append(video_file)
- break
- return trimmed_video_list
- # 已使用视频链接存表
- @classmethod
- def insert_videoAudio(cls, video_files, uid, platform, mark):
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d")
- for j in video_files:
- insert_sql = f"""INSERT INTO agc_video_deposit (audio, video_id, account_id, oss_object_key, time, platform, mark) values ('{uid}', '{j[0]}', '{j[1]}', '{j[2]}', '{formatted_time}', {platform}, {mark})"""
- MysqlHelper.update_values(
- sql=insert_sql,
- env="prod",
- machine="",
- )
- # 视频拼接
- @classmethod
- def concatenate_videos(cls, videos, audio_duration, audio_video, platform, s_path, v_path, mark, t_path):
- video_files = cls.concat_videos_with_subtitles(videos, audio_duration, platform)
- if video_files == "":
- return ""
- print(f"{mark}的{platform}:开始拼接视频喽~~~")
- Common.logger("video").info(f"{mark}的{platform}:开始拼接视频喽~~~")
- # 将使用视频写入文件中
- with open(f'{t_path}', 'w') as f:
- for video_file in video_files:
- f.write(f"file '{video_file[3]}'\n")
- # 字幕参数
- if os.path.exists(s_path):
- # subtitle_cmd = f"subtitles={s_path}:force_style='Fontsize=11,Fontname=Hiragino Sans GB,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000'"
- subtitle_cmd = f"subtitles={s_path}:force_style='Fontsize=11,Fontname=wqy-zenhei,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000'"
- else:
- subtitle_cmd = "drawtext=text='分享、转发给群友':fontsize=50:fontcolor=black:x=(w-text_w)/2:y=h-text_h-50"
- # 背景色参数
- background_cmd = "drawbox=y=ih-158:color=yellow@1.0:width=iw:height=0:t=fill"
- # 分辨率参数
- resolution_cmd = "-s", "480x854"
- # 多线程数
- num_threads = 4
- # 构建 FFmpeg 命令
- ffmpeg_cmd = [
- "ffmpeg",
- "-f", "concat",
- "-safe", "0",
- "-i", f"{t_path}", # 视频文件列表
- "-i", audio_video, # 音频文件
- "-c:v", "libx264",
- "-c:a", "aac",
- "-threads", str(num_threads),
- "-vf", f"{background_cmd},{subtitle_cmd}", # 添加背景色和字幕
- "-b:v", "9997K",
- "-shortest", # 保持与音频时长一致
- "-map", "0:v:0", # 映射第一个输入的视频流
- "-map", "1:a:0", # 映射第二个输入的音频流
- *resolution_cmd, # 添加分辨率参数
- "-y", # 覆盖输出文件
- v_path
- ]
- # 执行 FFmpeg 命令
- subprocess.run(ffmpeg_cmd)
- print(f"{mark}的{platform}:视频拼接成功啦~~~")
- Common.logger("video").info(f"{mark}的{platform}:视频拼接成功啦~~~")
- return video_files
- @classmethod
- def video_stitching(cls, ex_list):
- try:
- pq_ids = ex_list["pq_id"]
- pq_ids_list = pq_ids.split(',')
- mark = ex_list["mark"]
- feishu_id = ex_list["feishu_id"]
- video_call = ex_list["video_call"]
- parts = video_call.split(',')
- result = []
- for part in parts:
- sub_parts = part.split('--')
- result.append(sub_parts)
- link = result[0][0]
- yhmw_all_count = result[0][1]
- if int(yhmw_all_count) == 0:
- yhmw_count = 0
- else:
- yhmw_count = int(int(yhmw_all_count)/2)
- # kb_link = result[1][0]
- kb_count = int(result[1][1])
- channel = ['douyin', 'kuaishou', 'koubo']
- for platform in channel:
- limit_count = 35
- count = cls.get_link_count(mark, platform)
- if platform == "douyin" and count >= yhmw_count:
- continue
- elif platform == "kuaishou" and count >= yhmw_count:
- continue
- elif platform == "koubo":
- link = result[1][0]
- limit_count = 1
- if kb_count >= count or kb_count == 0:
- Feishu.bot('recommend', 'AGC完成通知', '今日视频拼接完成', mark)
- return mark
- # 获取音频类型+字幕+标题
- uid, srt, title_list = Material.get_all_data(feishu_id, link, mark)
- # 获取已入库的用户id
- user_id = cls.get_user_id(platform, mark)
- user = random.choice(user_id)
- user = str(user).replace('(', '').replace(')', '').replace(',', '')
- Common.logger("video").info(f"{mark}的{platform}渠道获取的用户ID:{user}")
- # 获取 未使用的视频链接
- url_list = cls.get_url_list(user, mark, limit_count)
- if url_list == None:
- Common.logger("video").info(f"未使用视频链接为空:{url_list}")
- return ''
- videos = [list(item) for item in url_list]
- # 下载视频
- videos = Oss.get_oss_url(videos, video_path)
- # srt 文件地址
- s_path = srt_path + mark + ".srt"
- # ass 文件地址
- t_path = txt_path + mark + ".txt"
- # 最终生成视频地址
- v_path = oss_path + mark + ".mp4"
- if srt:
- # 创建临时字幕文件
- cls.create_subtitle_file(srt, s_path)
- Common.logger("video").info(f"SRT 文件目录创建成功")
- # 获取音频
- audio_video = cls.get_audio_url(uid, mark)
- Common.logger("video").info(f"获取需要拼接的音频成功")
- # 获取音频秒数
- audio_duration = cls.get_audio_duration(audio_video)
- Common.logger("video").info(f"获取需要拼接的音频秒数为:{audio_duration}")
- video_files = cls.concatenate_videos(videos, audio_duration, audio_video, platform, s_path, v_path, mark, t_path)
- if video_files == "":
- Common.logger("video").info(f"使用拼接视频为空")
- return ""
- # 随机生成视频oss_id
- oss_id = cls.random_id()
- Common.logger("video").info(f"上传到 OSS 生成视频id为:{oss_id}")
- # 上传 oss
- oss_object_key = Oss.stitching_sync_upload_oss(v_path, oss_id)
- status = oss_object_key.get("status")
- if status == 200:
- # 获取 oss 视频地址
- oss_object_key = oss_object_key.get("oss_object_key")
- Common.logger("video").info(f"拼接视频发送成功,OSS 地址:{oss_object_key}")
- time.sleep(10)
- # 已使用视频存入数据库
- Common.logger("video").info(f"开始已使用视频存入数据库")
- cls.insert_videoAudio(video_files, uid, platform, mark)
- Common.logger("video").info(f"完成已使用视频存入数据库")
- Common.logger("video").info(f"开始视频添加到对应用户")
- piaoquantv = cls.insert_piaoquantv(oss_object_key, title_list, pq_ids_list)
- if piaoquantv:
- Common.logger("video").info(f"视频添加到对应用户成功")
- if os.path.isfile(s_path):
- os.remove(s_path)
- os.remove(t_path)
- else:
- Common.logger("video").info(f"文件不存在{s_path}")
- if os.path.isfile(v_path):
- os.remove(v_path)
- else:
- Common.logger("video").info(f"文件不存在{v_path}")
- for video in videos:
- filename = video[2].split("/")[-1]
- os.remove(f'{video_path}{filename}.mp4')
- Common.logger("video").info(f"{mark}的临时文件删除成功")
- return ''
- except Exception as e:
- Common.logger("video").warning(f"拼接视频失败了:{e}\n")
- return ''
|