import concurrent.futures
import configparser
import os
import random
import re
import sys
import threading
import time
from datetime import datetime

# The project packages are looked up relative to the working directory, so the
# path tweak has to run before ANY of them is imported.
sys.path.append(os.getcwd())

from common.tts_help import TTS
from common import Material, Feishu, Common, Oss
from common.ffmpeg import FFmpeg
from common.gpt4o_help import GPT4o
from data_channel.douyin import DY
from data_channel.kuaishou import KS
from data_channel.kuaishouchuangzuozhe import KsFeedVideo
from data_channel.piaoquan import PQ
from common.sql_help import sqlCollect
from data_channel.shipinhao import SPH

config = configparser.ConfigParser()
config.read('./config.ini')


class getVideo:
    """Fetch, re-edit and republish videos for the tasks listed in a Feishu sheet."""

    # Title used whenever the source video has no usable title.
    DEFAULT_TITLE = '⭕分享给大家一个视频!值得细品!'
    # Subject line shared by the Feishu bot notifications.
    NOTICE_TITLE = '机器自动改造消息通知'
    # Title overlays are wrapped every this many characters.
    TITLE_WRAP_WIDTH = 12
    # Spreadsheet that receives one result row per published video.
    RESULT_SPREADSHEET = "ILb4sa0LahddRktnRipcu2vQnLb"
    # Operator name -> sheet id inside RESULT_SPREADSHEET.
    RESULT_SHEETS = {
        "王雪珂": "vfhHwj",
        "王雪珂-1": "61kvW7",
        "鲁涛": "FhewlS",
        "范军": "B6dCfS",
        "余海涛": "mfBrNT",
        "罗情": "2J3PwN",
        "王玉婷": "bBHFwC",
        "刘诗雨": "fBdxIQ",
        "信欣": "lPe1eT",
        "快手创作者版品类推荐流": "k7l7nQ",
        "抖音品类账号": "Bsg5UR",
    }

    @classmethod
    def create_folders(cls, mark, task_mark):
        """Create (if needed) and return the working directory for a task.

        The directory is ``<VIDEO_PATH><mark>/<task_mark>/`` where
        ``VIDEO_PATH`` comes from the ``PATHS`` section of ``config.ini``.
        Both arguments are concatenated as strings.
        """
        video_path_url = config['PATHS']['VIDEO_PATH'] + mark + "/" + task_mark + "/"
        # exist_ok avoids the check-then-create race of the exists()/makedirs() pair.
        os.makedirs(video_path_url, exist_ok=True)
        return video_path_url

    @classmethod
    def random_id(cls):
        """Return a string id: current timestamp (YYYYmmddHHMMSS) + 5 random digits."""
        now = datetime.now()
        rand_num = random.randint(10000, 99999)
        return "{}{}".format(now.strftime("%Y%m%d%H%M%S"), rand_num)

    @classmethod
    def remove_files(cls, video_path_url):
        """Delete everything below ``video_path_url``; the directory itself is kept.

        Does nothing when the path is missing or is not a directory.
        """
        if not os.path.isdir(video_path_url):
            return
        # Walk bottom-up so every sub-directory is already empty when it is
        # removed; a top-down walk makes os.rmdir fail on non-empty directories.
        for root, dir_names, file_names in os.walk(video_path_url, topdown=False):
            for file_name in file_names:
                os.remove(os.path.join(root, file_name))
            for dir_name in dir_names:
                os.rmdir(os.path.join(root, dir_name))

    @classmethod
    def _sanitize_title(cls, raw_title):
        """Strip characters that are unwanted in a title; returns "" for empty input."""
        if not raw_title:
            return ""
        # NOTE(review): the chain is reproduced verbatim, including entries that
        # look duplicated — confirm whether they were meant to be distinct
        # (e.g. full-width) characters.
        return raw_title.strip().replace("\n", "") \
            .replace("/", "").replace("\\", "").replace("\r", "") \
            .replace(":", "").replace("*", "").replace("?", "") \
            .replace("?", "").replace('"', "").replace("<", "") \
            .replace(">", "").replace("|", "").replace(" ", "") \
            .replace("&NBSP", "").replace(".", "。").replace(" ", "") \
            .replace("'", "").replace("#", "").replace("Merge", "")

    @classmethod
    def _pick_title(cls, title_mode, cleaned_title):
        """Choose the published title according to the task's ``title`` setting.

        ``原标题`` keeps the cleaned source title, ``AI标题`` asks GPT4o to
        rewrite it, and anything else is treated as a ``/``-separated list of
        candidate titles from which one is picked at random.
        """
        if title_mode == "原标题":
            return cleaned_title or cls.DEFAULT_TITLE
        if title_mode == "AI标题":
            if not cleaned_title:
                return cls.DEFAULT_TITLE
            return GPT4o.get_ai_title(cleaned_title)
        return random.choice(title_mode.split('/'))

    @classmethod
    def video_task(cls, data):
        """Run every task configured for one operator and return ``data["mark"]``.

        ``data`` must provide the keys ``mark``, ``name``, ``feishu_id``,
        ``feishu_sheet`` and ``cookie_sheet``. The task rows are read with
        ``Material.get_task_data``; for each row the source videos are
        fetched, optionally cropped / trimmed / titled / given an ending,
        uploaded to OSS, published through ``PQ.insert_piaoquantv`` and
        recorded in the database and the result spreadsheet. Progress and
        failures are reported through ``Feishu.bot`` and ``Common.logger``.
        """
        mark = data["mark"]
        name = data["name"]
        feishu_id = data["feishu_id"]
        feishu_sheet = data["feishu_sheet"]
        cookie_sheet = data["cookie_sheet"]
        pz_sheet = '500Oe0'
        pw_sheet = 'DgX7vC'
        task_data = Material.get_task_data(feishu_id, feishu_sheet)
        if not task_data:
            Feishu.bot(mark, cls.NOTICE_TITLE, '今日任务为空,请关注', name)
            return mark
        # Serialises the insert-row + write-row pair on the result sheet.
        lock = threading.Lock()

        def process_task(task):
            """Process one task row: every source url, every video of each url."""
            task_mark = task["task_mark"]  # task identifier
            channel_id = str(task["channel_id"])
            channel_urls = str(task["channel_url"])
            piaoquan_id = str(task["piaoquan_id"])
            number = task["number"]  # requested number of videos
            title = task["title"]
            video_share = task["video_share"]
            video_ending = task["video_ending"]
            crop_total = task["crop_total"]
            gg_duration_total = task["gg_duration_total"]
            video_path_url = cls.create_folders(mark, str(task_mark))  # working directory
            zm = Material.get_pzsrt_data("summary", pz_sheet, video_share)  # mid-roll subtitle
            if zm == '':
                Feishu.bot(mark, cls.NOTICE_TITLE, f'{task_mark}任务下片中标示填写错误,请关注!!!!', name)
            channel_url = channel_urls.split(',')
            for url in channel_url:
                Common.logger("log").info(f"{name}的{task_mark}下的用户:{channel_url}开始获取视频")
                if channel_id == "抖音":
                    data_list = DY.get_dy_url(task_mark, url, number, mark, feishu_id, cookie_sheet, channel_id, name)
                elif channel_id == "票圈":
                    data_list = PQ.get_pq_url(task_mark, url, number, mark)
                elif channel_id == "视频号":
                    data_list = SPH.get_sph_url(task_mark, url, number, mark)
                elif channel_id == "快手":
                    data_list = KS.get_ks_url(task_mark, url, number, mark, feishu_id, cookie_sheet, channel_id, name)
                elif channel_id == "快手创作者版":
                    data_list = KsFeedVideo.get_data(number)
                else:
                    # Without this branch data_list would be unbound (or stale
                    # from the previous url) for an unrecognised channel.
                    Common.logger("warning").warning(f"{name}的{task_mark}下的渠道{channel_id}不支持,跳过{url}")
                    continue
                if not data_list:
                    Common.logger("log").info(f"{name}的{task_mark}下的视频ID{url} 已经改造过了")
                    if channel_id == "快手创作者版":
                        Feishu.bot("快手创作者版", cls.NOTICE_TITLE, '本轮没有获取到改造的视频链接', "王雪珂")
                    else:
                        Feishu.bot(mark, cls.NOTICE_TITLE, f'{task_mark}任务下的用户ID{url},没有已经改造的视频了', name)
                    cls.remove_files(video_path_url)
                    continue
                Common.logger("log").info(f"{name}的{task_mark}下的ID{url} 获取视频完成,共{len(data_list)}条")
                try:
                    for video in data_list:
                        new_title = cls._pick_title(title, cls._sanitize_title(video['old_title']))
                        v_id = video["video_id"]
                        cover = video["cover"]
                        video_url = video["video_url"]
                        rule = video['rule']
                        old_title = video['old_title'] or cls.DEFAULT_TITLE
                        time.sleep(1)
                        pw_random_id = cls.random_id()
                        # Download the source video.
                        if channel_id == "票圈":
                            new_video_path = PQ.download_video(video_url, video_path_url, v_id)
                        else:
                            new_video_path = Oss.download_video_oss(video_url, video_path_url, v_id)
                        if not new_video_path or not os.path.isfile(new_video_path):
                            Common.logger("log").info(f"{name}的{task_mark}下的视频{url},{new_video_path}视频下载失败")
                            cls.remove_files(video_path_url)
                            continue
                        Common.logger("log").info(f"{name}的{task_mark}下的视频{url},{new_video_path}视频下载成功")
                        if crop_total and crop_total != 'None':  # cropping requested
                            new_video_path = FFmpeg.video_crop(new_video_path, video_path_url, pw_random_id)
                        if gg_duration_total and gg_duration_total != 'None':  # fixed duration requested
                            new_video_path = FFmpeg.video_ggduration(new_video_path, video_path_url, pw_random_id,
                                                                     gg_duration_total)
                        width, height = FFmpeg.get_w_h_size(new_video_path)
                        if width < 1920:
                            new_video_path = FFmpeg.update_video_h_w(new_video_path, video_path_url, pw_random_id)
                            time.sleep(1)
                        if os.path.isfile(new_video_path) and new_title:
                            new_title_re = re.sub(r'[^\w\s\u4e00-\u9fff,。!?]', '', new_title)
                            # NOTE(review): the overlay is only burnt in for titles longer
                            # than one wrap width — confirm that short titles are meant
                            # to be left without an overlay.
                            if len(new_title_re) > cls.TITLE_WRAP_WIDTH:
                                new_text = '\n'.join(
                                    new_title_re[i:i + cls.TITLE_WRAP_WIDTH]
                                    for i in range(0, len(new_title_re), cls.TITLE_WRAP_WIDTH))
                                new_video_path = FFmpeg.add_video_zm(new_video_path, video_path_url, pw_random_id,
                                                                     new_text)
                        if video_ending and video_ending != 'None':
                            if video_ending == "AI片尾引导":
                                # AI-generated ending: text from GPT4o, audio and srt from TTS.
                                pw_srt_text = GPT4o.get_ai_pw(old_title)
                                if not pw_srt_text:
                                    Common.logger("log").info(f"{name}的{task_mark}下的视频{url},获取AI片尾失败")
                                    continue
                                pw_url = TTS.get_pw_zm(pw_srt_text)
                                if not pw_url:
                                    Feishu.bot(mark, 'TTS获取失败提示', '无法获取到片尾音频,及时更换token', "张勇")
                                    Common.logger("log").info(f"{name}的{task_mark}下的视频{url},获取AI片尾失败")
                                    continue
                                Common.logger("log").info(f"{name}的{task_mark}下的视频{url},获取AI片尾音频成功")
                                pw_mp3_path = TTS.download_mp3(pw_url, video_path_url, pw_random_id)
                                pw_url_sec = FFmpeg.get_video_duration(pw_mp3_path)  # ending length in seconds
                                Common.logger("log").info(f"{name}的{task_mark}下的视频{url},获取AI片尾秒数成功{pw_url_sec}")
                                pw_srt = TTS.getSrt(pw_url)
                                Common.logger("log").info(f"{name}的{task_mark}下的视频{url},获取AI片尾srt成功")
                            else:
                                # Pre-made ending: pick one of the comma-separated markers.
                                ending = random.choice(video_ending.split(','))
                                pw_list = Material.get_pwsrt_data("summary", pw_sheet, ending)
                                if not pw_list:
                                    Feishu.bot(mark, cls.NOTICE_TITLE, f'{task_mark}任务下片尾标示错误,请关注!!!!', name)
                                    # Without ending material the steps below cannot run
                                    # (pw_mp3_path / pw_srt would be unbound), so skip the video.
                                    cls.remove_files(video_path_url)
                                    continue
                                pw_id = pw_list["pw_id"]
                                pw_srt = pw_list["pw_srt"]
                                pw_url = PQ.get_pw_url(pw_id)
                                pw_mp3_path = FFmpeg.get_video_mp3(pw_url, video_path_url, pw_random_id)
                            # Grab the last frame of the video (up to 3 attempts).
                            for attempt in range(3):
                                jpg_path = FFmpeg.video_png(new_video_path, video_path_url, pw_random_id)
                                if os.path.isfile(jpg_path):
                                    Common.logger("log").info(f"{name}的{task_mark}下的视频{url},生成视频最后一帧成功")
                                    break
                                time.sleep(1)
                            if not os.path.isfile(jpg_path):
                                Feishu.bot(mark, cls.NOTICE_TITLE,
                                           f'{task_mark}任务用户{url}下的视频{v_id},获取视频最后一帧失败,请关注', name)
                                cls.remove_files(video_path_url)
                                continue
                            # Render the ending clip from frame + audio + srt (up to 3 attempts).
                            for attempt in range(3):
                                Common.logger("log").info(f"{name}的{task_mark}下的视频{url},获取mp3成功")
                                pw_path = FFmpeg.pw_video(jpg_path, video_path_url, pw_mp3_path, pw_srt,
                                                          pw_random_id, pw_mp3_path)
                                if os.path.isfile(pw_path):
                                    Common.logger("log").info(f"{task_mark}下的视频{url},生成片尾视频成功")
                                    break
                                time.sleep(1)
                            if not os.path.isfile(pw_path):
                                Feishu.bot(mark, cls.NOTICE_TITLE,
                                           f'{task_mark}任务用户{url}下的视频{v_id},生成片尾视频失败,请关注', name)
                                cls.remove_files(video_path_url)
                                continue
                            pw_video_list = [new_video_path, pw_path]
                            Common.logger("log").info(f"{task_mark}下的视频{url},视频与片尾开始拼接")
                            video_path = FFmpeg.concatenate_videos(pw_video_list, video_path_url)
                            Common.logger("log").info(f"{name}的{task_mark}下的视频{url},视频与片尾拼接成功")
                            time.sleep(1)
                            if video_share and video_share != 'None':
                                new_video_path = FFmpeg.single_video(video_path, video_path_url, zm)
                            else:
                                new_video_path = video_path
                        elif video_share and video_share != 'None':
                            new_video_path = FFmpeg.single_video(new_video_path, video_path_url, zm)
                        time.sleep(1)
                        oss_id = cls.random_id()
                        Common.logger("log").info(f"{name}的{task_mark}下的视频{url},开始发送oss")
                        upload_result = Oss.stitching_sync_upload_oss(new_video_path, oss_id)  # upload video
                        Common.logger("log").info(f"{name}的{task_mark}下的视频{url},发送oss成功{upload_result}")
                        if upload_result.get("status") != 200:
                            cls.remove_files(video_path_url)
                            Common.logger("warning").info(f"{name}的{task_mark}下的{url}视频{v_id} 视频发送OSS失败 ")
                            continue
                        oss_object_key = upload_result.get("oss_object_key")
                        time.sleep(1)
                        # The cover is optional: publish without one when it cannot
                        # be downloaded or uploaded.
                        jpg = None
                        cover_path = PQ.download_video_jpg(cover, video_path_url, v_id)
                        if os.path.isfile(cover_path):
                            cover_result = Oss.stitching_fm_upload_oss(cover_path, oss_id)
                            if cover_result.get("status") == 200:
                                jpg = cover_result.get("oss_object_key")
                        code = PQ.insert_piaoquantv(oss_object_key, new_title, jpg, piaoquan_id)
                        if code:
                            Common.logger("log").info(f"{name}的{task_mark}下的视频ID{v_id}发送成功")
                            sqlCollect.insert_task(task_mark, v_id, mark, channel_id)  # record in the database
                            formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                            sqlCollect.insert_machine_making_data(name, task_mark, channel_id, url, v_id,
                                                                  piaoquan_id, new_title, code, formatted_time,
                                                                  old_title)
                            # The title-mode column is only filled for the two named modes.
                            title_mode = title if title in ("原标题", "AI标题") else ""
                            values = [[name, task_mark, channel_id, url, str(v_id), piaoquan_id, old_title,
                                       title_mode, new_title, str(code), formatted_time, str(rule)]]
                            sheet = cls.RESULT_SHEETS.get(name)
                            if sheet is None:
                                # An unknown operator must not abort the remaining videos
                                # after this one has already been published.
                                Common.logger("warning").warning(f"{name}没有配置结果表格,跳过视频{v_id}的表格写入")
                            else:
                                with lock:
                                    Feishu.insert_columns(cls.RESULT_SPREADSHEET, sheet, "ROWS", 1, 2)
                                    time.sleep(0.5)
                                    Feishu.update_values(cls.RESULT_SPREADSHEET, sheet, "A2:Z2", values)
                        else:
                            Common.logger("warning").info(f"{name}的{task_mark}下的{url}视频{v_id} 视频发送票圈失败 {code}")
                        # NOTE(review): the working directory is emptied after every
                        # publish attempt, successful or not — confirm this matches the
                        # original intent.
                        cls.remove_files(video_path_url)
                    # NOTE(review): sent once per source url after all of its videos
                    # were handled — confirm it was not meant to be per video.
                    Feishu.bot(mark, cls.NOTICE_TITLE, f'{task_mark}任务改造完成,请关注', name)
                except Exception as e:
                    # Boundary handler: one failing video aborts the rest of this url
                    # but must not take down the other urls / tasks.
                    cls.remove_files(video_path_url)
                    Common.logger("warning").warning(f"{name}的{task_mark}任务处理失败:{e}\n")

        batch_size = 1
        with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
            # Submit the tasks batch by batch and wait for each batch to finish
            # before starting the next one.
            for start in range(0, len(task_data), batch_size):
                current_batch = task_data[start:start + batch_size]
                futures = {executor.submit(process_task, task): task for task in current_batch}
                for future in concurrent.futures.as_completed(futures):
                    task = futures[future]
                    try:
                        future.result()
                        print(f"Task {task['task_mark']} 完成")
                    except Exception as exc:
                        Common.logger("warning").warning(f"{name}的{task['task_mark']}任务处理失败:{exc}\n")
                        print(f"Task {task['task_mark']} 异常信息: {exc}")
        Feishu.bot(mark, cls.NOTICE_TITLE, '你的任务全部完成,请关注!!!!!', name)
        return mark