12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042 |
- import configparser
- import glob
- import os
- import random
- import re
- import subprocess
- import sys
- import time
- import urllib.parse
- import json
- import requests
- from datetime import datetime, timedelta
- from urllib.parse import urlencode
- sys.path.append(os.getcwd())
- from common.db import MysqlHelper
- from common.material import Material
- from common import Common, Oss, Feishu
- from common.srt import SRT
# Module-level configuration: parsed once at import time from ./config.ini
# (path is relative to the current working directory).
- config = configparser.ConfigParser()
- config.read('./config.ini') # Replace with the path to your configuration file
- class AgcVidoe():
# Fetch video links that have not been used yet (follow-script variant)
@classmethod
def get_url_gs_list(cls, user_list, mark, limit_count):
    """Pick a random account and return its videos that are not yet deposited.

    Up to five random picks from ``user_list`` are tried.  For each pick,
    ``agc_video_url`` is LEFT JOINed against ``agc_video_deposit`` and only
    rows without a matching deposit row are kept.

    Args:
        user_list: Sequence of account ids; one is chosen at random per try.
        mark: Value compared against ``agc_video_url.mark``.
        limit_count: SQL ``LIMIT``.  ``1`` selects single-video mode (only
            deposits dated today exclude a video); any other value excludes
            deposits from the last three days and requires at least 30 rows.

    Returns:
        The rows returned by ``MysqlHelper.get_values`` or ``None`` when no
        attempt produced a usable result (or ``user_list`` is empty).

    NOTE(review): ``user`` and ``mark`` are interpolated straight into the
    SQL text.  The signature of ``MysqlHelper.get_values`` is not visible
    here, so the query could not be parameterised in this block.
    """
    # random.choice raises IndexError on an empty sequence; "no accounts"
    # simply means there is nothing to return.
    if not user_list:
        return None

    # The date window does not change between retries, so compute it once.
    now = datetime.now()
    formatted_current_time = now.strftime("%Y-%m-%d")
    formatted_three_days_ago = (now - timedelta(days=3)).strftime("%Y-%m-%d")
    single_mode = limit_count == 1

    for _ in range(5):
        user = random.choice(user_list)
        Common.logger("gs_video").info(f"account_id 为{user}")
        if single_mode:
            sql = f"""SELECT a.video_id, a.account_id, a.oss_object_key
                FROM agc_video_url a
                LEFT JOIN agc_video_deposit b
                ON a.oss_object_key = b.oss_object_key
                AND b.time = '{formatted_current_time}'
                WHERE b.video_id IS NULL
                AND a.account_id = '{user}'
                AND a.status = 1
                AND a.mark = '{mark}'
                LIMIT {limit_count};"""
        else:
            sql = f"""SELECT a.video_id, a.account_id, a.oss_object_key
                FROM agc_video_url a
                LEFT JOIN agc_video_deposit b
                ON a.oss_object_key = b.oss_object_key
                AND b.time >= '{formatted_three_days_ago}'
                AND b.time <= '{formatted_current_time}'
                WHERE b.video_id IS NULL
                AND a.account_id = '{user}'
                AND a.status = 1
                AND a.mark = '{mark}'
                LIMIT {limit_count};"""
        Common.logger("gs_video").info(f"{mark}sql{sql} ")
        rows = MysqlHelper.get_values(sql, "prod")
        Common.logger("gs_video").info(f"{mark}查询数据{rows} ")
        if not rows:
            continue
        # Multi-video mode only accepts an account with at least 30 clips.
        if single_mode or len(rows) >= 30:
            return rows
    return None
# Fetch video links that have not been used yet
@classmethod
def get_url_list(cls, user_list, mark, limit_count):
    """Pick a random account and return its videos that are not yet deposited.

    Up to five random picks from ``user_list`` are tried.  Each pick is
    turned into text and stripped of ``(``, ``)`` and ``,`` before being
    placed, unquoted, into the SQL statement.

    Args:
        user_list: Sequence of account entries; one is chosen at random per
            try.  Presumably DB rows such as ``(123,)`` — verify against
            the caller.
        mark: Value compared against ``agc_video_url.mark``.
        limit_count: SQL ``LIMIT``.  ``1`` selects single-video mode (only
            deposits dated today exclude a video); any other value excludes
            deposits from the last three days and requires at least 30 rows.

    Returns:
        The rows returned by ``MysqlHelper.get_values`` or ``None`` when no
        attempt produced a usable result (or ``user_list`` is empty).

    NOTE(review): ``user`` and ``mark`` are interpolated straight into the
    SQL text.  The signature of ``MysqlHelper.get_values`` is not visible
    here, so the query could not be parameterised in this block.
    """
    # random.choice raises IndexError on an empty sequence; "no accounts"
    # simply means there is nothing to return.
    if not user_list:
        return None

    # The date window does not change between retries, so compute it once.
    now = datetime.now()
    formatted_current_time = now.strftime("%Y-%m-%d")
    formatted_three_days_ago = (now - timedelta(days=3)).strftime("%Y-%m-%d")
    single_mode = limit_count == 1

    for _ in range(5):
        user = str(random.choice(user_list))
        user = user.replace('(', '').replace(')', '').replace(',', '')
        if single_mode:
            sql = f"""SELECT a.video_id, a.account_id, a.oss_object_key
                FROM agc_video_url a
                LEFT JOIN agc_video_deposit b
                ON a.oss_object_key = b.oss_object_key
                AND b.time = '{formatted_current_time}'
                WHERE b.video_id IS NULL
                AND a.account_id = {user}
                AND a.status = 1
                AND a.mark = '{mark}'
                LIMIT {limit_count};"""
        else:
            sql = f"""SELECT a.video_id, a.account_id, a.oss_object_key
                FROM agc_video_url a
                LEFT JOIN agc_video_deposit b
                ON a.oss_object_key = b.oss_object_key
                AND b.time >= '{formatted_three_days_ago}'
                AND b.time <= '{formatted_current_time}'
                WHERE b.video_id IS NULL
                AND a.account_id = {user}
                AND a.status = 1
                AND a.mark = '{mark}'
                LIMIT {limit_count};"""
        rows = MysqlHelper.get_values(sql, "prod")
        if not rows:
            continue
        # Multi-video mode only accepts an account with at least 30 clips.
        if single_mode or len(rows) >= 30:
            return rows
    return None
# Generate a random id
@classmethod
def random_id(cls):
    """Return a 19-digit id: ``YYYYmmddHHMMSS`` timestamp plus a 5-digit random number."""
    stamp = datetime.now().strftime("%Y%m%d%H%M%S")
    suffix = random.randint(10000, 99999)
    return f"{stamp}{suffix}"
# Fetch the account ids that already have stored videos
@classmethod
def get_user_id(cls, channel_type, mark):
    """Return the distinct ``account_id`` rows of ``agc_video_url`` for a mark.

    Only rows whose ``oss_object_key`` contains ``channel_type`` are
    considered.  The result is whatever ``MysqlHelper.get_values`` returns
    for the "prod" environment.
    """
    query = f"""select account_id from agc_video_url where mark = '{mark}' and oss_object_key LIKE '%{channel_type}%' group by account_id ;"""
    return MysqlHelper.get_values(query, "prod")
# Count what has already been deposited today
@classmethod
def get_link_count(cls, mark, platform):
    """Return today's number of distinct (audio, account_id) deposits.

    Rows of ``agc_video_deposit`` are filtered by today's date,
    ``platform`` and ``mark``.

    Returns:
        The count as ``int``; ``0`` when the query yields nothing.
    """
    formatted_time = datetime.now().strftime("%Y-%m-%d")
    sql = f"""SELECT COUNT(*) AS total_count FROM ( SELECT audio, account_id FROM agc_video_deposit WHERE time = '{formatted_time}' AND platform = '{platform}' and mark = '{mark}' GROUP BY audio, account_id) AS subquery;"""
    value = MysqlHelper.get_values(sql, "prod")
    # Unwrap the nested row containers (e.g. ((5,),)) instead of stripping
    # punctuation from their repr, which crashed with int("") on an empty result.
    while isinstance(value, (list, tuple)):
        if not value:
            return 0
        value = value[0]
    return 0 if value is None else int(value)
# Count what the follow-script task has already deposited today
@classmethod
def get_link_gs_count(cls, mark):
    """Return today's number of distinct (audio, account_id) deposits.

    Rows of ``agc_video_deposit`` are filtered by today's date and by
    ``mark LIKE '%<mark>%'`` (substring match).

    Returns:
        The count as ``int``; ``0`` when the query yields nothing.
    """
    formatted_time = datetime.now().strftime("%Y-%m-%d")
    sql = f"""SELECT COUNT(*) AS total_count FROM ( SELECT audio, account_id FROM agc_video_deposit WHERE time = '{formatted_time}' and mark LIKE '%{mark}%' GROUP BY audio, account_id) AS subquery;"""
    value = MysqlHelper.get_values(sql, "prod")
    # Unwrap the nested row containers (e.g. ((5,),)) instead of stripping
    # punctuation from their repr, which crashed with int("") on an empty result.
    while isinstance(value, (list, tuple)):
        if not value:
            return 0
        value = value[0]
    return 0 if value is None else int(value)
# Count what the follow-script task has already deposited today (off-site)
@classmethod
def get_link_zw_count(cls, mark, platform):
    """Return today's number of distinct (audio, account_id) deposits for a mark.

    Rows of ``agc_video_deposit`` are filtered by today's date and ``mark``.

    Args:
        mark: Value compared against ``agc_video_deposit.mark``.
        platform: Accepted for interface compatibility; the query does not
            use it.

    Returns:
        The count as ``int``; ``0`` when the query yields nothing.
    """
    formatted_time = datetime.now().strftime("%Y-%m-%d")
    sql = f"""SELECT COUNT(*) AS total_count FROM ( SELECT audio, account_id FROM agc_video_deposit WHERE time = '{formatted_time}' and mark = '{mark}' GROUP BY audio, account_id) AS subquery;"""
    value = MysqlHelper.get_values(sql, "prod")
    # Unwrap the nested row containers (e.g. ((5,),)) instead of stripping
    # punctuation from their repr, which crashed with int("") on an empty result.
    while isinstance(value, (list, tuple)):
        if not value:
            return 0
        value = value[0]
    return 0 if value is None else int(value)
# Count what the follow-script task has already deposited today (on-site)
@classmethod
def get_link_zn_count(cls, mark, platform):
    """Return today's number of distinct (audio, account_id) deposits.

    Rows of ``agc_video_deposit`` are filtered by today's date,
    ``platform`` and ``mark``.

    Returns:
        The count as ``int``; ``0`` when the query yields nothing.
    """
    formatted_time = datetime.now().strftime("%Y-%m-%d")
    sql = f"""SELECT COUNT(*) AS total_count FROM ( SELECT audio, account_id FROM agc_video_deposit WHERE time = '{formatted_time}' AND platform = '{platform}' and mark = '{mark}' GROUP BY audio, account_id) AS subquery;"""
    value = MysqlHelper.get_values(sql, "prod")
    # Unwrap the nested row containers (e.g. ((5,),)) instead of stripping
    # punctuation from their repr, which crashed with int("") on an empty result.
    while isinstance(value, (list, tuple)):
        if not value:
            return 0
        value = value[0]
    return 0 if value is None else int(value)
@classmethod
def create_subtitle_file(cls, srt, s_path):
    """Write the subtitle text ``srt`` to the file ``s_path`` (overwriting it).

    The file is written as UTF-8 explicitly: the subtitles contain
    non-ASCII text, and the platform default encoding is not guaranteed
    to be able to represent it.
    """
    with open(s_path, 'w', encoding='utf-8') as f:
        f.write(srt)
@classmethod
def convert_srt_to_ass(cls, s_path, a_path):
    """Run FFmpeg with ``s_path`` as input and ``a_path`` as output (SRT to ASS).

    The exit status of the ffmpeg process is not inspected.
    """
    command = ["ffmpeg", "-i", s_path, a_path]
    subprocess.run(command)
@classmethod
def get_cover(cls, uid):
    """Return the cover URL to use for video ``uid`` from the admin multi-cover list.

    The list is requested for a "2h" range.  With exactly one entry that
    entry's ``coverUrl`` is returned.  Otherwise the entry with the highest
    positive ``shareWeight`` wins; an entry whose weight equals the current
    best and whose ``createUser`` is "用户" replaces the current choice.

    Returns:
        The chosen cover URL, or ``""`` when nothing was selected (including
        a response without usable ``content``).

    NOTE(review): the session cookie is hard-coded in this method; it
    should come from configuration rather than source.
    """
    time.sleep(1)
    url = "https://admin.piaoquantv.com/manager/video/multiCover/listV2"
    payload = json.dumps({
        "videoId": uid,
        "range": "2h"
    })
    headers = {
        'accept': 'application/json',
        'accept-language': 'zh-CN,zh;q=0.9',
        'cache-control': 'no-cache',
        'content-type': 'application/json',
        'cookie': 'SESSION=YjU3MzgwNTMtM2QyYi00YjljLWI3YWUtZTBjNWYwMGQzYWNl',
        'origin': 'https://admin.piaoquantv.com',
        'pragma': 'no-cache',
        'priority': 'u=1, i',
        'sec-ch-ua': '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'
    }
    # Without a timeout a stalled connection would block the worker forever.
    response = requests.request("POST", url, headers=headers, data=payload, timeout=30)
    data = response.json()
    # A missing or null "content" is treated as "no candidates".
    content = data.get("content") or []
    if len(content) == 1:
        return content[0]["coverUrl"]
    max_share_count = 0
    selected_cover_url = ""
    for item in content:
        share_count = item.get("shareWeight")
        if share_count is not None and share_count > max_share_count:
            max_share_count = share_count
            selected_cover_url = item["coverUrl"]
        elif share_count == max_share_count and item.get("createUser") == "用户":
            selected_cover_url = item["coverUrl"]
    return selected_cover_url
@classmethod
def get_title(cls, uid):
    """Return the title to use for video ``uid`` from the admin multi-title list.

    The list is requested for a "4h" range.  With exactly one entry that
    entry's ``title`` is returned.  Otherwise the entry with the highest
    positive ``shareWeight`` wins; an entry whose weight equals the current
    best and whose ``createUser`` is "用户" replaces the current choice.

    Returns:
        The chosen title, or ``""`` when nothing was selected (including a
        response without usable ``content``).

    NOTE(review): the session cookie is hard-coded in this method; it
    should come from configuration rather than source.
    """
    url = "https://admin.piaoquantv.com/manager/video/multiTitleV2/listV2"
    payload = json.dumps({
        "videoId": uid,
        "range": "4h"
    })
    headers = {
        'accept': 'application/json',
        'accept-language': 'zh-CN,zh;q=0.9',
        'cache-control': 'no-cache',
        'content-type': 'application/json',
        'cookie': 'SESSION=YjU3MzgwNTMtM2QyYi00YjljLWI3YWUtZTBjNWYwMGQzYWNl',
        'origin': 'https://admin.piaoquantv.com',
        'pragma': 'no-cache',
        'priority': 'u=1, i',
        'sec-ch-ua': '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'
    }
    # Without a timeout a stalled connection would block the worker forever.
    response = requests.request("POST", url, headers=headers, data=payload, timeout=30)
    data = response.json()
    # A missing or null "content" is treated as "no candidates".
    content = data.get("content") or []
    if len(content) == 1:
        return content[0]["title"]
    max_share_count = 0
    selected_title = ""
    for item in content:
        share_count = item.get("shareWeight")
        if share_count is not None and share_count > max_share_count:
            max_share_count = share_count
            selected_title = item["title"]
        elif share_count == max_share_count and item.get("createUser") == "用户":
            selected_title = item["title"]
    return selected_title
# Publish the newly generated video under one of the target accounts
@classmethod
def insert_piaoquantv(cls, oss_object_key, audio_title, pq_ids_list, cover, uid):
    """Send the generated video to the piaoquantv "crawler/video/send" endpoint.

    Args:
        oss_object_key: Sent as ``videoPath``.
        audio_title: Candidate titles separated by ``/``; one is chosen at
            random.  When empty or ``None`` the title comes from
            ``get_title(uid)``.
        pq_ids_list: Account ids; one is chosen at random and sent as both
            ``loginUid`` and ``uid``.
        cover: When empty or ``None`` the cover comes from
            ``get_cover(uid)``; otherwise an empty ``coverImgPath`` is sent.
            NOTE(review): a non-empty ``cover`` is never forwarded — one
            caller passes a value named ``cover_status``, so this may be a
            flag rather than a URL; confirm before changing.
        uid: Source video id used for the title/cover lookups.

    Returns:
        Always ``True``; the HTTP response is not inspected.

    NOTE(review): tokens, session ids and cookies are hard-coded here and
    should come from configuration.
    """
    if audio_title is None or audio_title == '':
        title = cls.get_title(uid)
    else:
        # str.split returns a one-element list when there is no '/'.
        title = random.choice(audio_title.split('/'))
    cover_url = ''
    if cover is None or cover == '':
        cover_url = cls.get_cover(uid)
    pq_id_list = random.choice(pq_ids_list)
    url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
    headers = {
        'User-Agent': 'PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0',
        'cookie': 'JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78',
        'referer': 'http://appspeed.piaoquantv.com',
        'token': '524a8bc871dbb0f4d4717895083172ab37c02d2f',
        'accept-language': 'zh-CN,zh-Hans;q=0.9',
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    payload = {
        'coverImgPath': cover_url,
        'deviceToken': '9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408',
        'fileExtensions': 'MP4',
        'loginUid': pq_id_list,
        'networkType': 'Wi-Fi',
        'platform': 'iOS',
        'requestId': 'fb972cbd4f390afcfd3da1869cd7d001',
        'sessionId': '362290597725ce1fa870d7be4f46dcc2',
        'subSessionId': '362290597725ce1fa870d7be4f46dcc2',
        'title': title,
        'token': '524a8bc871dbb0f4d4717895083172ab37c02d2f',
        'uid': pq_id_list,
        'versionCode': '486',
        'versionName': '3.4.12',
        'videoFromScene': '1',
        'videoPath': oss_object_key,
        'viewStatus': '1'
    }
    encoded_payload = urlencode(payload)
    # Without a timeout a stalled connection would block the worker forever.
    requests.request("POST", url, headers=headers, data=encoded_payload, timeout=30)
    return True
# Fetch the video link
@classmethod
def get_audio_url(cls, uid, mark, mark_name):
    """Return ``content.transedVideoPath`` of video ``uid`` from the admin backend.

    The admin cookie is obtained from ``Material.get_houtai_cookie()``.
    When the response ``code`` is not 0 a Feishu notification is sent
    asking for the cookie to be replaced.

    Returns:
        The video path, or ``""`` on any failure (non-zero ``code``,
        request error, invalid JSON, missing keys).
    """
    cookie = Material.get_houtai_cookie()
    url = f"https://admin.piaoquantv.com/manager/video/detail/{uid}"
    headers = {
        'authority': 'admin.piaoquantv.com',
        'accept': 'application/json, text/plain, */*',
        'accept-language': 'zh-CN,zh;q=0.9',
        'cache-control': 'no-cache',
        'cookie': cookie,
        'pragma': 'no-cache',
        'referer': f'https://admin.piaoquantv.com/cms/post-detail/{uid}/detail',
        'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"macOS"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    }
    try:
        # The request and the JSON decoding live inside the try block so
        # that network/decoding errors also end in the "" fallback.
        response = requests.request("GET", url, headers=headers, data={}, timeout=30)
        data = response.json()
        code = data["code"]
        if code != 0:
            Common.logger("video").info(
                f"未登录,请更换cookie,{data}")
            Feishu.bot('recommend', '管理后台', '管理后台cookie失效,请及时更换~', mark, mark_name)
            return ""
        return data["content"]["transedVideoPath"]
    except Exception as e:
        Common.logger("video").warning(f"获取音频视频链接失败:{e}\n")
        return ""
# Get the media duration
@classmethod
def get_audio_duration(cls, video_url):
    """Return the container duration of ``video_url`` in seconds, via ffprobe.

    Raises:
        subprocess.CalledProcessError: ffprobe exited with a non-zero status.
        ValueError: ffprobe printed something that is not a number.
    """
    probe_args = ["ffprobe"]
    probe_args += ["-i", video_url]
    probe_args += ["-show_entries", "format=duration"]
    probe_args += ["-v", "quiet"]
    probe_args += ["-of", "csv=p=0"]
    raw = subprocess.check_output(probe_args)
    text = raw.decode("utf-8").strip()
    return float(text)
# Get the duration of a video file (seconds)
@classmethod
def get_video_duration(cls, video_file):
    """Return the container duration of ``video_file`` in seconds, via ffprobe.

    The ffprobe exit status is not checked; if it prints nothing usable the
    ``float`` conversion raises ``ValueError``.
    """
    probe_args = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_file,
    ]
    completed = subprocess.run(probe_args, capture_output=True, text=True)
    return float(completed.stdout)
@classmethod
def clear_mp4_files(cls, folder_path):
    """Delete every ``*.mp4`` file directly inside ``folder_path``.

    Nothing is printed when the folder holds no ``.mp4`` files.
    """
    targets = glob.glob(os.path.join(folder_path, '*.mp4'))
    if targets:
        for target in targets:
            os.remove(target)
        print(f"文件夹 '{folder_path}' 中的所有 .mp4 文件已清空。")
# Work out which clips have to be stitched together
@classmethod
def concat_videos_with_subtitles(cls, videos, audio_duration, platform, mark):
    """Select (or repeat) clips so that they cover ``audio_duration`` seconds.

    Args:
        videos: List of clips.  For platform "baokuai" each element is the
            file path itself; otherwise element ``[3]`` of each entry is
            the file path.
        audio_duration: Target duration in seconds.
        platform: "koubo", "zhannei" and "baokuai" repeat the clip list
            when it is too short; any other platform trims it.
        mark: Only used in the log message.

    Returns:
        * repeat platforms: ``videos`` unchanged when already longer than
          the audio, otherwise ``videos`` repeated
          ``int(audio / total) + 2`` times;
        * other platforms: the leading clips up to and including the first
          one that exceeds the remaining duration;
        * ``""`` when the clips are too short to trim from, or when the
          total clip duration is zero (nothing to repeat).
    """
    # Probe every clip exactly once; the durations are reused when trimming.
    if platform == "baokuai":
        durations = [cls.get_video_duration(video_file) for video_file in videos]
    else:
        durations = [cls.get_video_duration(video_file[3]) for video_file in videos]
    total_video_duration = sum(durations)

    if platform in ("koubo", "zhannei", "baokuai"):
        # Clips already longer than the audio: use them as they are.
        if total_video_duration > audio_duration:
            return videos
        # Guard the division below: zero-length material cannot be repeated
        # into anything useful, so report "nothing to stitch".
        if total_video_duration <= 0:
            return ""
        # Audio/video ratio, plus a safety margin, gives the repeat count.
        videos_needed = int(audio_duration / total_video_duration) + 2
        return videos * videos_needed

    # Not enough material to trim from: no stitching.
    if total_video_duration < audio_duration:
        Common.logger("video").info(f"{mark}的{platform}渠道时长小于等于目标时长,不做视频拼接")
        return ""

    # Keep clips until the running total covers the audio; the clip that
    # crosses the boundary is still included.
    trimmed_video_list = []
    remaining_duration = audio_duration
    for video_file, video_duration in zip(videos, durations):
        trimmed_video_list.append(video_file)
        if video_duration > remaining_duration:
            break
        remaining_duration -= video_duration
    return trimmed_video_list
# Record the used video links in the deposit table
@classmethod
def insert_videoAudio(cls, video_files, uid, platform, mark):
    """Insert one ``agc_video_deposit`` row per entry of ``video_files``.

    Elements ``[0]``, ``[1]`` and ``[2]`` of each entry are stored as
    ``video_id``, ``account_id`` and ``oss_object_key``; ``uid`` is stored
    as ``audio`` and today's date as ``time``.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    for row in video_files:
        statement = f"""INSERT INTO agc_video_deposit (audio, video_id, account_id, oss_object_key, time, platform, mark) values ('{uid}', '{row[0]}', '{row[1]}', '{row[2]}', '{today}', '{platform}', '{mark}')"""
        MysqlHelper.update_values(sql=statement, env="prod", machine="")
# Create the working directories when they do not exist
@classmethod
def create_folders(cls, mark):
    """Ensure the per-mark working directories exist and build fresh file paths.

    Directories (under ``config['PATHS']['VIDEO_PATH']``):
    ``<mark>/``, ``<mark>/srt/`` and ``<mark>/oss/``.

    Returns:
        Tuple ``(s_path, v_path, video_path_url, v_oss_path)``: subtitle
        file path, stitched video path, the ``<mark>/`` directory, and the
        final output video path.  File names embed a new ``random_id()``.
    """
    oss_id = cls.random_id()
    video_path_url = config['PATHS']['VIDEO_PATH'] + mark + "/"
    # srt directory
    s_path_url = video_path_url + "srt/"
    # oss directory
    v_path_url = video_path_url + "oss/"
    # exist_ok avoids the check-then-create race when several workers
    # start at the same time.
    for directory in (video_path_url, s_path_url, v_path_url):
        os.makedirs(directory, exist_ok=True)
    # subtitle file path
    s_path = f"{s_path_url}{mark}{oss_id}.srt"
    # stitched / final video paths
    v_path = f"{v_path_url}{mark}{oss_id}.mp4"
    v_oss_path = f"{v_path_url}{mark}{oss_id}oss.mp4"
    return s_path, v_path, video_path_url, v_oss_path
# Convert seconds to an SRT timestamp
@classmethod
def seconds_to_srt_time(cls, seconds):
    """Format ``seconds`` (int or float) as an SRT timestamp ``HH:MM:SS,mmm``.

    The value is converted to whole milliseconds first (rounded), so float
    representation error cannot drop a millisecond: 1.001 becomes
    ``00:00:01,001`` rather than ``00:00:01,000``.
    """
    total_ms = int(round(seconds * 1000))
    total_seconds, milliseconds = divmod(total_ms, 1000)
    total_minutes, secs = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"
# Stitch the clips and mux in the narration audio
@classmethod
def concatenate_videos(cls, videos, audio_duration, audio_video, platform, s_path, v_path, mark, v_oss_path):
    """Build the final video at ``v_oss_path`` with FFmpeg.

    Steps:
      1. ``concat_videos_with_subtitles`` picks the clips; ``""`` from it is
         passed straight back to the caller.
      2. If ``s_path`` does not exist, a single-cue SRT file covering the
         whole audio is written there.
      3. "koubo"/"zhannei": the clips are listed in a text file (path from
         ``bk_text_folders``) and joined with the concat demuxer while the
         audio, yellow banner and subtitles are applied in one FFmpeg run.
         Other platforms: the clips are first scaled and concatenated into
         ``v_path`` with ``-filter_complex``; a second run adds audio,
         banner and subtitles.

    Args:
        videos: Candidate clips; element ``[3]`` of each entry is the local
            file path.
        audio_duration: Audio length in seconds; output is cut to
            ``int(audio_duration)``.
        audio_video: Audio source passed to FFmpeg as second input.
        platform: Channel name, selects the stitching strategy.
        s_path: Subtitle file path.
        v_path: Intermediate stitched video (non-koubo/zhannei only).
        mark: Task mark, used for logging and ``bk_text_folders``.
        v_oss_path: Final output path.

    Returns:
        The list of clips used, or ``""`` when nothing could be selected.
        FFmpeg failures are only printed; callers check for the existence
        of ``v_oss_path``.
    """
    video_files = cls.concat_videos_with_subtitles(videos, audio_duration, platform, mark)
    Common.logger("video").info(f"{mark}的{platform}视频文件:{video_files}")
    Common.logger("video").info(f"{mark}的{platform}渠道待生成视频为:{video_files}")
    if video_files == "":
        return ""
    print(f"{mark}的{platform}:开始拼接视频喽~~~")
    Common.logger("video").info(f"{mark}的{platform}:开始拼接视频喽~~~")

    if not os.path.exists(s_path):
        # No subtitles supplied: write one cue spanning the whole audio.
        start_time = cls.seconds_to_srt_time(0)
        end_time = cls.seconds_to_srt_time(audio_duration)
        # Explicit UTF-8: the cue text is non-ASCII.
        with open(s_path, 'w', encoding='utf-8') as f:
            f.write(f"1\n{start_time} --> {end_time}\n分享、转发给群友\n")
    # The subtitle filter is the same whether the file was supplied or generated.
    subtitle_cmd = f"subtitles={s_path}:force_style='Fontsize=12,Fontname=wqy-zenhei,Bold=1,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000'"
    # Background banner behind the subtitles
    background_cmd = "drawbox=y=ih-65:color=yellow@1.0:width=iw:height=0:t=fill"
    # FFmpeg thread count
    num_threads = 4

    if platform == "koubo" or platform == "zhannei":
        text_ptah = cls.bk_text_folders(mark)
        with open(text_ptah, 'w', encoding='utf-8') as f:
            for clip in video_files:
                f.write(f"file '{clip[3]}'\n")
        ffmpeg_cmd_oss = [
            "ffmpeg",
            "-f", "concat",
            "-safe", "0",
            "-i", f"{text_ptah}",  # clip list file
            "-i", audio_video,  # audio source
            "-c:v", "libx264",
            "-c:a", "aac",
            "-threads", str(num_threads),
            "-vf", f"scale=320x480,{background_cmd},{subtitle_cmd}",  # banner + subtitles
            "-t", str(int(audio_duration)),  # match the audio length
            "-map", "0:v:0",  # video stream of the first input
            "-map", "1:a:0",  # audio stream of the second input
            "-y",  # overwrite the output file
            v_oss_path
        ]
        try:
            # check=True is what makes CalledProcessError reachable at all.
            subprocess.run(ffmpeg_cmd_oss, check=True)
            print("视频处理完成!")
        except subprocess.CalledProcessError as e:
            print(f"视频处理失败:{e}")
        finally:
            # The list file is temporary; remove it on failure as well.
            if os.path.isfile(text_ptah):
                os.remove(text_ptah)
    else:
        ffmpeg_cmd = ["ffmpeg"]
        scale_filters = []
        concat_inputs = []
        for index, clip in enumerate(video_files):
            Common.logger("video").info(f"{mark}的{platform}视频:{clip[3]}")
            # Pass each path as its own argv entry so that paths containing
            # whitespace are not split apart.
            ffmpeg_cmd.extend(["-i", str(clip[3])])
            # Normalise size and sample aspect ratio of every clip
            scale_filters.append(f"[{index}:v]scale=320x480,setsar=1[v{index}];")
            # Feed the scaled video + original audio of every clip to concat
            concat_inputs.append(f"[v{index}][{index}:a]")
        filter_complex = (
            "".join(scale_filters)
            + "".join(concat_inputs)
            + f"concat=n={len(video_files)}:v=1:a=1[v][a]"
        )
        ffmpeg_cmd.extend(["-filter_complex", filter_complex,
                           "-map", "[v]", "-map", "[a]", v_path])
        # Second pass: add audio, banner and subtitles to the stitched file
        ffmpeg_cmd_oss = [
            "ffmpeg",
            "-i", v_path,  # stitched video
            "-i", audio_video,  # audio source
            "-c:v", "libx264",  # re-encode video
            "-c:a", "aac",  # encode audio as AAC
            "-threads", str(num_threads),
            "-vf", f"{background_cmd},{subtitle_cmd}",  # banner + subtitles
            "-t", str(int(audio_duration)),  # match the audio length
            "-map", "0:v:0",  # video stream of the first input
            "-map", "1:a:0",  # audio stream of the second input
            "-y",  # overwrite the output file
            v_oss_path
        ]
        try:
            subprocess.run(ffmpeg_cmd, check=True)
            if os.path.isfile(v_path):
                subprocess.run(ffmpeg_cmd_oss, check=True)
                print("视频处理完成!")
        except subprocess.CalledProcessError as e:
            print(f"视频处理失败:{e}")
    print(f"{mark}的{platform}:视频拼接成功啦~~~")
    Common.logger("video").info(f"{mark}的{platform}:视频拼接成功啦~~~")
    return video_files
# video_stitching(ex_list): regular stitching task.
# NOTE(review): this copy of the file has lost its indentation (every line is
# prefixed with "- "), so the nesting of the statements below cannot be
# recovered with certainty. Code lines are left untouched; only comments change.
# Inputs read from ex_list: "pq_id" (comma-separated), "mark_name", "mark",
# "feishu_id", "video_call" (comma-separated parts, each "<link>--<count>").
# result[0] supplies the link and yhmw_all_count (halved into yhmw_count);
# result[1] supplies the link and kb_count used for the "koubo" platform.
# Platforms are visited in the order kuaishou, douyin, koubo.
# Return values seen below: `mark` when the koubo quota is reached, '' or ""
# on the other visible return statements.
# NOTE(review): one logger call in the srt_new branch uses the "gs_video"
# channel while the rest of this method logs to "video" - confirm intent.
- # Regular task
- @classmethod
- def video_stitching(cls, ex_list):
- pq_ids = ex_list["pq_id"]
- pq_ids_list = pq_ids.split(',')
- mark_name = ex_list['mark_name']
- mark = ex_list["mark"]
- feishu_id = ex_list["feishu_id"]
- video_call = ex_list["video_call"]
- parts = video_call.split(',')
- result = []
- for part in parts:
- sub_parts = part.split('--')
- result.append(sub_parts)
- link = result[0][0]
- yhmw_all_count = result[0][1]
- if int(yhmw_all_count) == 0:
- yhmw_count = 0
- else:
- yhmw_count = int(int(yhmw_all_count)/2)
- # Create the working directories if missing (original note also says "if they exist, delete the files"; create_folders as shown in this file only creates them)
- s_path, v_path, video_path_url, v_oss_path = cls.create_folders(mark)
- kb_count = int(result[1][1])
- channel = ['kuaishou', 'douyin', 'koubo']
- try:
- for platform in channel:
- limit_count = 35
- count = cls.get_link_count(mark, platform)
- if platform == "douyin" and count >= yhmw_count:
- continue
- elif platform == "kuaishou" and count >= yhmw_count:
- continue
- elif platform == "koubo":
- link = result[1][0]
- limit_count = 1
- if count >= kb_count or kb_count == 0:
- Feishu.bot('recommend', 'AGC完成通知', '今日常规自制视频拼接任务完成啦~', mark, mark_name)
- return mark
- # Fetch audio type + subtitles + title
- uid, srt, video_list, cover_status, audio_title = Material.get_all_data(feishu_id, link, mark)
- # Fetch the account ids that are already stored
- user_id = cls.get_user_id(platform, mark)
- # Fetch video links that have not been used yet
- url_list = cls.get_url_list(user_id, mark, limit_count)
- if url_list == None:
- Common.logger("video").info(f"未使用视频链接为空:{url_list}")
- return ''
- videos = [list(item) for item in url_list]
- # Download the videos
- videos = Oss.get_oss_url(videos, video_path_url)
- if srt:
- # Create the temporary subtitle file
- cls.create_subtitle_file(srt, s_path)
- Common.logger("video").info(f"S{mark}的{platform}渠道RT 文件目录创建成功")
- else:
- srt_new = SRT.getSrt(int(uid))
- if srt_new:
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
- values = [[mark, str(uid), srt_new , formatted_time]]
- Feishu.insert_columns("IbVVsKCpbhxhSJtwYOUc8S1jnWb", "jd9qD9", "ROWS", 1, 2)
- random_wait_time = random.uniform(0.5, 2.5)
- time.sleep(random_wait_time)
- Feishu.update_values("IbVVsKCpbhxhSJtwYOUc8S1jnWb", "jd9qD9", "A2:Z2", values)
- # Create the temporary subtitle file
- cls.create_subtitle_file(srt_new, s_path)
- Common.logger("gs_video").info(f"S{mark}的{platform}渠道RT 文件目录创建成功")
- # Fetch the audio
- audio_video = cls.get_audio_url(uid, mark, mark_name)
- Common.logger("video").info(f"{mark}的{platform}渠道获取需要拼接的音频成功")
- # Get the audio duration in seconds
- audio_duration = cls.get_audio_duration(audio_video)
- Common.logger("video").info(f"{mark}的{platform}渠道获取需要拼接的音频秒数为:{audio_duration}")
- video_files = cls.concatenate_videos(videos, audio_duration, audio_video, platform, s_path, v_path, mark, v_oss_path)
- if video_files == "":
- Common.logger("video").info(f"{mark}的{platform}渠道使用拼接视频为空")
- return ""
- if os.path.isfile(v_oss_path):
- Common.logger("video").info(f"{mark}的{platform}渠道新视频生成成功")
- else:
- Common.logger("video").info(f"{mark}的{platform}渠道新视频生成失败")
- return ""
- # Generate a random OSS id for the video
- oss_id = cls.random_id()
- # Get the duration of the newly generated video (must be within 3 seconds of the audio)
- v_path_duration = cls.get_audio_duration(v_oss_path)
- if v_path_duration > audio_duration+3 or v_path_duration < audio_duration-3:
- print(f"{mark}的{platform}渠道最终生成视频秒数错误,生成了:{v_path_duration}秒,实际秒数{audio_duration}")
- Common.logger("video").info(f"{mark}的{platform}渠道最终生成视频秒数错误,生成了:{v_path_duration}秒,实际秒数{audio_duration}")
- return ""
- # Upload to OSS
- Common.logger("video").info(f"{mark}的{platform}渠道上传到 OSS 生成视频id为:{oss_id}")
- oss_object_key = Oss.stitching_sync_upload_oss(v_oss_path, oss_id)
- status = oss_object_key.get("status")
- if status == 200:
- # Get the OSS address of the video
- oss_object_key = oss_object_key.get("oss_object_key")
- Common.logger("video").info(f"{mark}的{platform}渠道拼接视频发送成功,OSS 地址:{oss_object_key}")
- time.sleep(10)
- # Record the used videos in the database
- Common.logger("video").info(f"{mark}的{platform}渠道开始已使用视频存入数据库")
- cls.insert_videoAudio(video_files, uid, platform, mark)
- Common.logger("video").info(f"{mark}的{platform}渠道完成已使用视频存入数据库")
- Common.logger("video").info(f"{mark}的{platform}渠道开始视频添加到对应用户")
- piaoquantv = cls.insert_piaoquantv(oss_object_key, audio_title, pq_ids_list, cover_status, uid)
- if piaoquantv:
- Common.logger("video").info(f"{mark}的{platform}渠道视频添加到对应用户成功")
- if os.path.isfile(v_oss_path):
- os.remove(v_oss_path)
- if os.path.isfile(v_path):
- os.remove(v_path)
- if os.path.isfile(s_path):
- os.remove(s_path)
- # Remove all mp4 files from the working directory
- cls.clear_mp4_files(video_path_url)
- return ''
- except Exception as e:
- Common.logger("video").warning(f"{mark}的视频拼接失败:{e}\n")
- return ''
# video_gs_stitching(ex_list): follow-script stitching task.
# NOTE(review): this copy of the file has lost its indentation (every line is
# prefixed with "- "), so the nesting of the statements below cannot be
# recovered with certainty. Code lines are left untouched; only comments change.
# Inputs read from ex_list: "pq_id" (comma-separated), "mark_name", "mark",
# "feishu_id", "video_call" (comma-separated parts, each "<link>--<count>"),
# "zd_count" and "platform_list".
# When platform_list is non-empty one entry is picked at random; element [1]
# is compared with "快手"/"抖音" and element [0] is passed to
# Material.get_user_id. Otherwise the platform is 'zhannei'.
# NOTE(review): get_link_zw_count, as defined in this file, does not use its
# platform argument, so the "zhannei" literal passed to it has no effect.
# NOTE(review): the visible return statements yield video_call, None (bare
# return), "" and '' - callers presumably only test truthiness; verify.
- # Follow-script task
- @classmethod
- def video_gs_stitching(cls, ex_list):
- pq_ids = ex_list["pq_id"]
- pq_ids_list = pq_ids.split(',') # account ids
- mark_name = ex_list['mark_name'] # person in charge
- mark = ex_list["mark"] # mark / identifier
- feishu_id = ex_list["feishu_id"] # Feishu document id
- video_call = ex_list["video_call"]
- parts = video_call.split(',')
- result = []
- for part in parts:
- sub_parts = part.split('--')
- result.append(sub_parts)
- link = result[0][0] # script link
- count = result[0][1] # number of videos to generate
- zd_count = ex_list["zd_count"] # total number of videos to generate
- # # Total count
- # result = re.match(r'([^0-9]+)', mark).group()
- # all_count = cls.get_link_gs_count(result)
- # if all_count >= int(zd_count):
- # Feishu.bot('recommend', 'AGC完成通知', '今日脚本跟随视频拼接任务完成啦~', mark.split("-")[0], mark_name)
- # return mark
- # Fetch audio type + subtitles + title
- uid, srt, video_list, cover, audio_title = Material.get_all_data(feishu_id, link, mark)
- platform_list = ex_list["platform_list"] # channels
- # Create the working directories if missing (original note also says "if they exist, delete the files"; create_folders as shown in this file only creates them)
- s_path, v_path, video_path_url, v_oss_path = cls.create_folders(mark)
- platform = ''
- if platform_list:
- platform_name_list = random.choice(platform_list)
- platform_name = platform_name_list[1]
- platform_url = platform_name_list[0]
- if platform_name == "快手":
- platform = 'kuaishou'
- elif platform_name == "抖音":
- platform = 'douyin'
- zw_count = cls.get_link_zw_count(mark, "zhannei")
- if zw_count >= int(count):
- return video_call
- # Fetch all video material ids
- video_list = Material.get_user_id(feishu_id, platform_url)
- limit_count = 35
- else:
- platform = 'zhannei'
- zw_count = cls.get_link_zn_count(mark, platform)
- if zw_count >= int(count):
- return video_call
- limit_count = 1
- Common.logger("gs_video").info(f"{mark}的{platform} 开始查询 {video_list}")
- url_list = cls.get_url_gs_list(video_list, mark, limit_count)
- if url_list == None:
- Common.logger("gs_video").info(f"{mark}的{platform} 渠道 视频画面不足无法拼接")
- return
- videos = [list(item) for item in url_list]
- try:
- # Download the videos
- videos = Oss.get_oss_url(videos, video_path_url)
- if srt:
- # Create the temporary subtitle file
- cls.create_subtitle_file(srt, s_path)
- Common.logger("gs_video").info(f"S{mark}的{platform}渠道RT 文件目录创建成功")
- else:
- srt_new = SRT.getSrt(int(uid))
- if srt_new:
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
- values = [[mark, str(uid), srt_new, formatted_time]]
- Feishu.insert_columns("IbVVsKCpbhxhSJtwYOUc8S1jnWb", "jd9qD9", "ROWS", 1, 2)
- random_wait_time = random.uniform(0.5, 2.5)
- time.sleep(random_wait_time)
- Feishu.update_values("IbVVsKCpbhxhSJtwYOUc8S1jnWb", "jd9qD9", "A2:Z2", values)
- # Create the temporary subtitle file
- cls.create_subtitle_file(srt_new, s_path)
- Common.logger("gs_video").info(f"S{mark}的{platform}渠道RT 文件目录创建成功")
- # Fetch the audio
- audio_video = cls.get_audio_url(uid, mark, mark_name)
- Common.logger("gs_video").info(f"{mark}的{platform}渠道获取需要拼接的音频成功")
- # Get the audio duration in seconds
- audio_duration = cls.get_audio_duration(audio_video)
- Common.logger("gs_video").info(f"{mark}的{platform}渠道获取需要拼接的音频秒数为:{audio_duration}")
- video_files = cls.concatenate_videos(videos, audio_duration, audio_video, platform, s_path, v_path, mark, v_oss_path)
- if video_files == "":
- Common.logger("gs_video").info(f"{mark}的{platform}渠道使用拼接视频为空")
- return ""
- if os.path.isfile(v_oss_path):
- Common.logger("gs_video").info(f"{mark}的{platform}渠道新视频生成成功")
- else:
- Common.logger("gs_video").info(f"{mark}的{platform}渠道新视频生成失败")
- return ""
- # Generate a random OSS id for the video
- oss_id = cls.random_id()
- # Get the duration of the newly generated video (must be within 3 seconds of the audio)
- v_path_duration = cls.get_audio_duration(v_oss_path)
- if v_path_duration > audio_duration+3 or v_path_duration < audio_duration-3:
- print(f"{mark}的{platform}渠道最终生成视频秒数错误,生成了:{v_path_duration}秒,实际秒数{audio_duration}")
- Common.logger("gs_video").info(f"{mark}的{platform}渠道最终生成视频秒数错误,生成了:{v_path_duration}秒,实际秒数{audio_duration}")
- return ""
- # Upload to OSS
- Common.logger("gs_video").info(f"{mark}的{platform}渠道上传到 OSS 生成视频id为:{oss_id}")
- oss_object_key = Oss.stitching_sync_upload_oss(v_oss_path, oss_id)
- status = oss_object_key.get("status")
- if status == 200:
- # Get the OSS address of the video
- oss_object_key = oss_object_key.get("oss_object_key")
- Common.logger("gs_video").info(f"{mark}的{platform}渠道拼接视频发送成功,OSS 地址:{oss_object_key}")
- time.sleep(10)
- # Record the used videos in the database
- Common.logger("gs_video").info(f"{mark}的{platform}渠道开始已使用视频存入数据库")
- cls.insert_videoAudio(video_files, uid, platform, mark)
- Common.logger("gs_video").info(f"{mark}的{platform}渠道完成已使用视频存入数据库")
- Common.logger("gs_video").info(f"{mark}的{platform}渠道开始视频添加到对应用户")
- piaoquantv = cls.insert_piaoquantv(oss_object_key, audio_title, pq_ids_list, cover, uid)
- if piaoquantv:
- Common.logger("gs_video").info(f"{mark}的{platform}渠道视频添加到对应用户成功")
- if os.path.isfile(v_oss_path):
- os.remove(v_oss_path)
- if os.path.isfile(v_path):
- os.remove(v_path)
- if os.path.isfile(s_path):
- os.remove(s_path)
- # Remove the downloaded source clips (element [3] of each entry is the local path)
- for file_path in videos:
- os.remove(file_path[3])
- print(f"已删除文件:{file_path[3]}")
- return ''
- except Exception as e:
- Common.logger("gs_video").warning(f"{mark}的视频拼接失败:{e}\n")
- return ''
- # 爆款跟随任务
@classmethod
def video_bk_stitching(cls, ex_list):
    """Run the "baokuai" stitching task for every row of the configured sheet.

    For each row returned by ``Material.get_allbk_data`` this writes a
    subtitle file, fetches the audio and one randomly chosen source video,
    stitches them with ``bk_concatenate_videos``, checks that the output
    duration is within 3 seconds of the audio duration, uploads the result
    with ``Oss.stitching_sync_upload_oss`` and, when the upload status is
    200, publishes it with ``insert_piaoquantv``.

    A row that fails at any step is logged and skipped; it never aborts the
    batch. Temporary files are removed after every row, including rows that
    bail out early, so one row's subtitle or output file cannot be picked up
    by the next row.

    Args:
        ex_list: Mapping with the keys ``pq_id`` (comma-separated ids),
            ``mark_name``, ``mark``, ``feishu_id`` and ``video_call``.

    Returns:
        ``mark``, unchanged, whether or not any row succeeded.
    """
    pq_ids_list = ex_list["pq_id"].split(',')  # account ids
    mark_name = ex_list['mark_name']  # owner
    mark = ex_list["mark"]  # task tag
    feishu_id = ex_list["feishu_id"]  # Feishu document id
    video_call = ex_list["video_call"]  # script sheet
    platform = 'baokuai'
    list_data = Material.get_allbk_data(feishu_id, video_call, mark)
    if not list_data:
        Feishu.bot('recommend', 'AGC脚本通知', '今日没有爆款视频拼接任务', mark.split("-")[0], mark_name)
        return mark
    # Per the original note: creates the working directory when missing and
    # clears its files when it already exists.
    s_path, v_path, video_path_url, v_oss_path = cls.create_folders(mark)
    for data in list_data:
        download_video = None
        pause_before_next = False
        try:
            uid = data['uid']  # audio id
            srt = data['text']  # subtitle text
            cover = data['cover']
            audio_title = data['title']
            # Convert first: the cell may hold a bare numeric id, and
            # `',' in <int>` raises TypeError. A value without a comma still
            # yields a one-element list.
            videos = str(data['video']).split(',')
            if srt:
                # Write the temporary subtitle file.
                cls.create_subtitle_file(srt, s_path)
                Common.logger("bk_video").info(f"S{mark} 文件目录创建成功")
            else:
                srt_new = SRT.getSrt(int(uid))
                if srt_new:
                    formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    values = [[mark, str(uid), srt_new, formatted_time]]
                    # Insert a blank row, wait briefly, then fill it with the
                    # generated subtitle.
                    Feishu.insert_columns("IbVVsKCpbhxhSJtwYOUc8S1jnWb", "jd9qD9", "ROWS", 1, 2)
                    time.sleep(random.uniform(0.5, 2.5))
                    Feishu.update_values("IbVVsKCpbhxhSJtwYOUc8S1jnWb", "jd9qD9", "A2:Z2", values)
                    # Write the temporary subtitle file.
                    cls.create_subtitle_file(srt_new, s_path)
                    Common.logger("bk_video").info(f"S{mark}的{platform}渠道RT 文件目录创建成功")
            # Fetch the audio and measure its duration.
            audio_video = cls.get_audio_url(uid, mark, mark_name)
            Common.logger("bk_video").info(f"{mark}获取需要拼接的音频成功")
            audio_duration = cls.get_audio_duration(audio_video)
            video = random.choice(videos)
            video_url = cls.get_zn_video(video, mark, mark_name)
            # get_zn_video returns None when the backend rejects the request;
            # skip the download instead of handing None to the downloader.
            if video_url:
                download_video = Oss.get_bk_url(video_url, video_path_url, video)
            if not download_video:
                Common.logger("bk_video").info(f"{mark}的视频下载视频")
                continue
            video_files = cls.bk_concatenate_videos(download_video, audio_duration, audio_video, platform, s_path, v_path, mark, v_oss_path)
            if video_files == "":
                Common.logger("bk_video").info(f"{mark}的{platform}渠道使用拼接视频为空")
                continue
            if not os.path.isfile(v_oss_path):
                Common.logger("bk_video").info(f"{mark}的{platform}渠道新视频生成失败")
                continue
            Common.logger("bk_video").info(f"{mark}的{platform}渠道新视频生成成功")
            # Random id used for the upload.
            oss_id = cls.random_id()
            # Reject the output when its duration is more than 3 s off the audio.
            v_path_duration = cls.get_audio_duration(v_oss_path)
            if v_path_duration > audio_duration + 3 or v_path_duration < audio_duration - 3:
                print(f"{mark}最终生成视频秒数错误,生成了:{v_path_duration}秒,实际秒数{audio_duration}")
                Common.logger("bk_video").info(
                    f"{mark}最终生成视频秒数错误,生成了:{v_path_duration}秒,实际秒数{audio_duration}")
                continue
            # Upload the stitched video.
            Common.logger("bk_video").info(f"{mark}上传到 OSS 生成视频id为:{oss_id}")
            upload_result = Oss.stitching_sync_upload_oss(v_oss_path, oss_id)
            if upload_result.get("status") == 200:
                oss_object_key = upload_result.get("oss_object_key")
                Common.logger("bk_video").info(f"{mark}拼接视频发送成功,OSS 地址:{oss_object_key}")
                time.sleep(10)
                Common.logger("bk_video").info(f"{mark}开始视频添加到对应用户")
                piaoquantv = cls.insert_piaoquantv(oss_object_key, audio_title, pq_ids_list, cover, uid)
                if piaoquantv:
                    Common.logger("bk_video").info(f"{mark}视频添加到对应用户成功")
            pause_before_next = True
        except Exception as e:
            Common.logger("bk_video").warning(f"{mark}的视频拼接失败:{e}\n")
        finally:
            # Best-effort cleanup for every row. A cleanup failure is logged
            # and must not abort the batch.
            try:
                for temp_path in (v_oss_path, v_path, s_path):
                    if os.path.isfile(temp_path):
                        os.remove(temp_path)
                # NOTE(review): download_video is assumed to be a list of
                # file paths; a single path string is tolerated so it is
                # never iterated character by character.
                if isinstance(download_video, str):
                    downloaded = [download_video]
                else:
                    downloaded = download_video or []
                for file_path in downloaded:
                    if os.path.isfile(file_path):
                        os.remove(file_path)
                        print(f"已删除文件:{file_path}")
            except OSError as e:
                Common.logger("bk_video").warning(f"{mark}的视频拼接失败:{e}\n")
        if pause_before_next:
            # Random pause between rows after an upload attempt.
            time.sleep(random.randint(10, 80))
    Feishu.bot('recommend', 'AGC完成通知', f'今日脚本爆款视频拼接任务完成,共{len(list_data)}条', mark.split("-")[0], mark_name)
    return mark
@classmethod
def get_zn_video(cls, video, mark, mark_name):
    """Look up the transcoded video URL for *video* in the admin backend.

    Sends an authenticated GET to the backend's video-detail endpoint using
    the cookie from ``Material.get_houtai_cookie``.

    Args:
        video: Video id interpolated into the detail URL.
        mark: Task tag; the part before the first ``-`` is passed to
            ``Feishu.bot`` when an alert is sent.
        mark_name: Owner name passed to ``Feishu.bot``.

    Returns:
        The ``content.transedVideoPath`` value of the response, or ``None``
        when the response ``code`` is non-zero. In that case the response is
        logged and a cookie-expired alert is sent through ``Feishu.bot``.

    Raises:
        requests.RequestException: On network failure or timeout.
        KeyError: If the response lacks the expected keys.
    """
    cookie = Material.get_houtai_cookie()
    url = f"https://admin.piaoquantv.com/manager/video/detail/{video}"
    headers = {
        'authority': 'admin.piaoquantv.com',
        'accept': 'application/json, text/plain, */*',
        'accept-language': 'zh-CN,zh;q=0.9',
        'cache-control': 'no-cache',
        'cookie': cookie,
        'pragma': 'no-cache',
        'referer': f'https://admin.piaoquantv.com/cms/post-detail/{video}/detail',
        'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"macOS"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    }
    # A timeout keeps one unresponsive backend call from stalling the batch.
    response = requests.get(url, headers=headers, timeout=30)
    data = response.json()
    if data["code"] != 0:
        # split() returns the whole string when there is no "-", so the
        # alert target is always defined.
        mark1 = mark.split("-")[0]
        Common.logger("video").info(
            f"未登录,请更换cookie,{data}")
        Feishu.bot('recommend', '管理后台', '管理后台cookie失效,请及时更换~', mark1, mark_name)
        return None
    return data["content"]["transedVideoPath"]
- # text文件没有则创建目录
@classmethod
def bk_text_folders(cls, mark):
    """Return a new file path under ``<VIDEO_PATH><mark>/text/``.

    The directory is created when missing. The file itself is not created;
    ``bk_concatenate_videos`` writes its ffmpeg concat list to this path.

    Args:
        mark: Task tag, used both as the sub-directory name and as the file
            name prefix.

    Returns:
        ``<VIDEO_PATH><mark>/text/<mark><random id>.text`` as a string.
    """
    text_dir = config['PATHS']['VIDEO_PATH'] + mark + "/text/"
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(text_dir, exist_ok=True)
    # The random id keeps each call's list file distinct.
    return f"{text_dir}{mark}{cls.random_id()}.text"
- # 爆款视频拼接
@classmethod
def bk_concatenate_videos(cls, videos, audio_duration, audio_video, platform, s_path, v_path, mark, v_oss_path):
    """Stitch source clips and an audio track into one subtitled video.

    The clip list comes from ``concat_videos_with_subtitles`` and is fed to
    ffmpeg's concat demuxer through a temporary list file. The output is
    scaled to 320x480, gets a yellow ``drawbox`` band and burned-in
    subtitles, is cut to ``int(audio_duration)`` seconds and is written to
    *v_oss_path*.

    Args:
        videos: Source clips, passed through to
            ``concat_videos_with_subtitles``.
        audio_duration: Audio length in seconds.
        audio_video: Audio input handed to ffmpeg as the second ``-i``.
        platform: Channel name, used for clip selection and logging.
        s_path: Subtitle file path. When the file does not exist, a single
            default caption spanning the whole audio is written there.
        v_path: Unused here; kept for interface compatibility.
        mark: Task tag, used for logging and for the list file location.
        v_oss_path: Output video path (overwritten if present).

    Returns:
        ``""`` when there are no clips to stitch, otherwise the clip list.
        The clip list is returned even if ffmpeg exits with an error (the
        failure is logged), so callers must still verify *v_oss_path*.
    """
    video_files = cls.concat_videos_with_subtitles(videos, audio_duration, platform, mark)
    Common.logger("video").info(f"{mark}的{platform}视频文件:{video_files}")
    # Bail out before creating any list file so nothing is left behind.
    if video_files == "":
        return ""
    print(f"{mark}的{platform}:开始拼接视频喽~~~")
    Common.logger("video").info(f"{mark}的{platform}:开始拼接视频喽~~~")
    if not os.path.exists(s_path):
        # No subtitle supplied: write one default caption covering the
        # whole audio. UTF-8 is explicit because the caption is non-ASCII.
        start_time = cls.seconds_to_srt_time(0)
        end_time = cls.seconds_to_srt_time(audio_duration)
        with open(s_path, 'w', encoding='utf-8') as f:
            f.write(f"1\n{start_time} --> {end_time}\n分享、转发给群友\n")
    subtitle_cmd = f"subtitles={s_path}:force_style='Fontsize=12,Fontname=wqy-zenhei,Bold=1,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000'"
    # Background band drawn before the subtitles are burned in.
    background_cmd = "drawbox=y=ih-65:color=yellow@1.0:width=iw:height=0:t=fill"
    # ffmpeg worker threads.
    num_threads = 4
    list_path = cls.bk_text_folders(mark)
    try:
        with open(list_path, 'w', encoding='utf-8') as f:
            f.writelines(f"file '{file}'\n" for file in video_files)
        ffmpeg_cmd_oss = [
            "ffmpeg",
            "-f", "concat",
            "-safe", "0",
            "-i", f"{list_path}",  # clip list file
            "-i", audio_video,  # audio input
            "-c:v", "libx264",
            "-c:a", "aac",
            "-threads", str(num_threads),
            "-vf", f"scale=320x480,{background_cmd},{subtitle_cmd}",  # band + subtitles
            "-t", str(int(audio_duration)),  # cut to the audio length
            "-map", "0:v:0",  # video from the first input
            "-map", "1:a:0",  # audio from the second input
            "-y",  # overwrite the output file
            v_oss_path
        ]
        # check=True makes a non-zero ffmpeg exit raise, so failures are
        # reported instead of being announced as success.
        subprocess.run(ffmpeg_cmd_oss, check=True)
    except subprocess.CalledProcessError as e:
        print(f"视频处理失败:{e}")
        Common.logger("video").warning(f"{mark}:视频处理失败:{e}")
        return video_files
    finally:
        # The list file is only needed while ffmpeg runs.
        if os.path.isfile(list_path):
            os.remove(list_path)
    print("视频处理完成!")
    print(f"{mark}:视频拼接成功啦~~~")
    Common.logger("video").info(f"{mark}:视频拼接成功啦~~~")
    return video_files
|