import configparser import os import random import re import sys import threading import time from datetime import datetime import concurrent.futures from common.tts_help import TTS from common import Material, Feishu, Common, Oss from common.ffmpeg import FFmpeg from common.gpt4o_help import GPT4o from data_channel.douyin import DY from data_channel.kuaishou import KS from data_channel.kuaishouchuangzuozhe import KsFeedVideo from data_channel.piaoquan import PQ from common.sql_help import sqlCollect from data_channel.shipinhao import SPH # 读取配置文件 from data_channel.shipinhaodandian import SPHDD config = configparser.ConfigParser() config.read('./config.ini') class VideoProcessor: """ 视频处理类,包含创建文件夹、生成随机ID、删除文件和处理视频任务等方法。 """ @classmethod def create_folders(cls, mark, task_mark): """ 根据标示和任务标示创建目录 """ video_path_url = config['PATHS']['VIDEO_PATH'] + mark + "/" + task_mark + "/" if not os.path.exists(video_path_url): os.makedirs(video_path_url) return video_path_url @classmethod def random_id(cls): """ 随机生成ID """ now = datetime.now() rand_num = random.randint(10000, 99999) return f"{now.strftime('%Y%m%d%H%M%S')}{rand_num}" @classmethod def remove_files(cls, video_path_url): """ 删除指定目录下的所有文件和子目录 """ if os.path.exists(video_path_url) and os.path.isdir(video_path_url): for root, dirs, files in os.walk(video_path_url): for file in files: file_path = os.path.join(root, file) os.remove(file_path) for dir in dirs: dir_path = os.path.join(root, dir) os.rmdir(dir_path) @classmethod def process_task(cls, task, mark, name, feishu_id, cookie_sheet): """ 处理单个任务 """ task_mark = task["task_mark"] channel_id = str(task["channel_id"]) channel_urls = str(task["channel_url"]) piaoquan_id = str(task["piaoquan_id"]) number = task["number"] title = task["title"] video_share = task["video_share"] video_ending = task["video_ending"] crop_total = task["crop_total"] gg_duration_total = task["gg_duration_total"] video_path_url = cls.create_folders(mark, str(task_mark)) zm = Material.get_pzsrt_data("summary", "500Oe0", video_share) if not zm: Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务下片中标示填写错误,请关注!!!!', name) return if ',' in channel_urls: channel_url_list = channel_urls.split(',') else: channel_url_list = [channel_urls] for url in channel_url_list: Common.logger("log").info(f"{name}的{task_mark}下的用户:{channel_url_list}开始获取视频") data_list = cls.get_data_list(channel_id, task_mark, url, number, mark, feishu_id, cookie_sheet, name) if not data_list: Common.logger("log").info(f"{name}的{task_mark}下的视频ID{url} 已经改造过了") Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务下的用户ID{url},没有已经改造的视频了', name) cls.remove_files(video_path_url) continue Common.logger("log").info(f"{name}的{task_mark}下的ID{url} 获取视频完成,共{len(data_list)}条") for video in data_list: try: new_title = cls.generate_title(video, title) v_id = video["video_id"] cover = video["cover"] video_url = video["video_url"] old_title = video['old_title'] rule = video['rule'] if not old_title: old_title = '⭕分享给大家一个视频!值得细品!' Common.logger("title").info(f"{name}的{task_mark}下的视频{url},标题为空,使用兜底标题生成片尾") time.sleep(1) pw_random_id = cls.random_id() new_video_path = cls.download_and_process_video(channel_id, video_url, video_path_url, v_id, crop_total, gg_duration_total, pw_random_id, new_title) if not os.path.isfile(new_video_path): Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务用户{url}下的视频{v_id},视频下载失败,请关注', name) cls.remove_files(video_path_url) continue if new_video_path: if video_ending and video_ending != 'None': new_video_path = cls.handle_video_ending(new_video_path, video_ending, old_title, pw_random_id, video_path_url, mark, task_mark, url, name, video_share, zm) else: if video_share and video_share != 'None': new_video_path = FFmpeg.single_video(new_video_path, video_path_url, zm) if not os.path.isfile(new_video_path): Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务用户{url}下的视频{v_id},视频改造失败,请关注', name) cls.remove_files(video_path_url) continue # 上传视频和封面,并更新数据库 values, code = cls.upload_video_and_thumbnail(new_video_path, cover, v_id, new_title, task_mark, name, piaoquan_id, video_path_url, mark, channel_id, url, old_title, title, rule) # 更新已使用的视频号状态 if name == "视频号单视频": sphdd_status = sqlCollect.update_shp_dd_vid(v_id) if sphdd_status == 1: Common.logger("log").info(f"{name}的{task_mark}下的ID{url} 视频修改已使用,状态已修改") pq_url = f'https://admin.piaoquantv.com/cms/post-detail/{code}/detail' # 站内视频链接 from_user_name = video['from_user_name'] # 来源用户 from_group_name = video['from_group_name'] # 来源群组 text = ( f"**站内视频链接**: {pq_url}\n" f"**来源用户**: {from_user_name}\n" f"**来源群组**: {from_group_name}\n" f"**原视频链接**: {video['video_url']}\n" f"**原视频封面**: {video['cover']}\n" f"**原视频标题**: {video['old_title']}\n" ) Feishu.finish_bot(text) if values: if name == "王雪珂": sheet = "vfhHwj" elif name == "抖音品类账号-1": sheet = "61kvW7" elif name == "鲁涛": sheet = "FhewlS" elif name == "范军": sheet = "B6dCfS" elif name == "余海涛": sheet = "mfBrNT" elif name == "罗情": sheet = "2J3PwN" elif name == "王玉婷": sheet = "bBHFwC" elif name == "刘诗雨": sheet = "fBdxIQ" elif name == "信欣": sheet = "lPe1eT" elif name == "快手创作者版品类推荐流": sheet = "k7l7nQ" elif name == "抖音品类账号": sheet = "Bsg5UR" elif name == "视频号品类账号": sheet = "b0uLWw" elif name == "视频号单视频": sheet = "ptgCXW" Feishu.insert_columns("ILb4sa0LahddRktnRipcu2vQnLb", sheet, "ROWS", 1, 2) time.sleep(0.5) Feishu.update_values("ILb4sa0LahddRktnRipcu2vQnLb", sheet, "A2:Z2", values) except Exception as e: Common.logger("error").warning(f"{name}的{task_mark}任务处理失败:{e}") cls.remove_files(video_path_url) Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务改造完成,请关注', name) @classmethod def get_data_list(cls, channel_id, task_mark, url, number, mark, feishu_id, cookie_sheet, name): """ 根据渠道ID获取数据列表 """ if channel_id == "抖音": return DY.get_dy_url(task_mark, url, number, mark, feishu_id, cookie_sheet, channel_id, name) elif channel_id == "票圈": return PQ.get_pq_url(task_mark, url, number, mark) elif channel_id == "视频号": return SPH.get_sph_url(task_mark, url, number, mark) elif channel_id == "快手": return KS.get_ks_url(task_mark, url, number, mark, feishu_id, cookie_sheet, channel_id, name) elif channel_id == "快手创作者版": return KsFeedVideo.get_data() elif channel_id == "视频号单视频": return SPHDD.get_sphdd_data(url) @classmethod def generate_title(cls, video, title): """ 生成新标题 """ if video['old_title']: new_title = video['old_title'].strip().replace("\n", "") \ .replace("/", "").replace("\\", "").replace("\r", "") \ .replace(":", "").replace("*", "").replace("?", "") \ .replace("?", "").replace('"', "").replace("<", "") \ .replace(">", "").replace("|", "").replace(" ", "") \ .replace("&NBSP", "").replace(".", "。").replace(" ", "") \ .replace("'", "").replace("#", "").replace("Merge", "") else: return '⭕分享给大家一个视频!值得细品!' if title == "原标题": if not new_title: new_title = '⭕分享给大家一个视频!值得细品!' elif title == "AI标题": if not new_title: new_title = '⭕分享给大家一个视频!值得细品!' else: new_title = GPT4o.get_ai_title(new_title) else: titles = title.split('/') if '/' in title else [title] new_title = random.choice(titles) return new_title @classmethod def download_and_process_video(cls, channel_id, video_url, video_path_url, v_id, crop_total, gg_duration_total, pw_random_id, new_title): """ 下载并处理视频 """ if channel_id in ["票圈", "快手创作者版", "视频号单视频"]: new_video_path = PQ.download_video(video_url, video_path_url, v_id) Common.logger("log").info(f"{channel_id}视频下载成功: {new_video_path}") else: Common.logger("log").info(f"视频准备下载") new_video_path = Oss.download_video_oss(video_url, video_path_url, v_id) Common.logger("log").info(f"视频下载成功: {new_video_path}") if os.path.isfile(new_video_path): if crop_total and crop_total != 'None': # 判断是否需要裁剪 new_video_path = FFmpeg.video_crop(new_video_path, video_path_url, pw_random_id) if gg_duration_total and gg_duration_total != 'None': # 判断是否需要指定视频时长 new_video_path = FFmpeg.video_ggduration(new_video_path, video_path_url, pw_random_id, gg_duration_total) width, height = FFmpeg.get_w_h_size(new_video_path) if width < height: # 判断是否需要修改为竖屏 new_video_path = FFmpeg.update_video_h_w(new_video_path, video_path_url, pw_random_id) new_title_re = re.sub(r'[^\w\s\u4e00-\u9fff,。!?]', '', new_title) if len(new_title_re) > 12: new_text = '\n'.join( [new_title_re[i:i + 12] for i in range(0, len(new_title_re), 12)]) new_video_path = FFmpeg.add_video_zm(new_video_path, video_path_url, pw_random_id, new_text) return new_video_path else: Common.logger("log").info(f"视频下载失败: {new_video_path}") cls.remove_files(video_path_url) return new_video_path @classmethod def handle_video_ending(cls, new_video_path, video_ending, old_title, pw_random_id, video_path_url, mark, task_mark, url, name, video_share, zm): """ 处理视频片尾 """ if video_ending == "AI片尾引导": pw_srt_text = GPT4o.get_ai_pw(old_title) if pw_srt_text: pw_url = TTS.get_pw_zm(pw_srt_text) if pw_url: pw_mp3_path = TTS.download_mp3(pw_url, video_path_url, pw_random_id) # pw_url_sec = FFmpeg.get_video_duration(pw_mp3_path) pw_srt = TTS.getSrt(pw_url) Common.logger("log").info(f"{name}的{task_mark}下的视频{url},获取AI片尾srt成功") else: Feishu.bot(mark, 'TTS获取失败提示', f'无法获取到片尾音频,及时更换token', "张勇") Common.logger("log").info(f"{name}的{task_mark}下的视频{url},获取AI片尾失败") return None else: Common.logger("log").info(f"{name}的{task_mark}下的视频{url},获取AI片尾失败") return None else: if ',' in video_ending: video_ending_list = video_ending.split(',') else: video_ending_list = [video_ending] ending = random.choice(video_ending_list) pw_list = Material.get_pwsrt_data("summary", "DgX7vC", ending) # 获取srt if pw_list: pw_id = pw_list["pw_id"] pw_srt = pw_list["pw_srt"] pw_url = PQ.get_pw_url(pw_id) pw_mp3_path = FFmpeg.get_video_mp3(pw_url, video_path_url, pw_random_id) else: Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务下片尾标示错误,请关注!!!!', name) for attempt in range(3): jpg_path = FFmpeg.video_png(new_video_path, video_path_url, pw_random_id) # 生成视频最后一帧jpg if os.path.isfile(jpg_path): Common.logger("log").info(f"{name}的{task_mark}下的视频{url},生成视频最后一帧成功") break time.sleep(1) for attempt in range(3): Common.logger("log").info(f"{name}的{task_mark}下的视频{url},获取mp3成功") pw_path = FFmpeg.pw_video(jpg_path, video_path_url, pw_mp3_path, pw_srt, pw_random_id, pw_mp3_path) # 生成片尾视频 if os.path.isfile(pw_path): Common.logger("log").info(f"{task_mark}下的视频{url},生成片尾视频成功") break time.sleep(1) pw_video_list = [new_video_path, pw_path] Common.logger("log").info(f"{task_mark}下的视频{url},视频与片尾开始拼接") video_path = FFmpeg.concatenate_videos(pw_video_list, video_path_url) # 视频与片尾拼接到一起 Common.logger("log").info(f"{name}的{task_mark}下的视频{url},视频与片尾拼接成功") time.sleep(1) if video_share and video_share != 'None': new_video_path = FFmpeg.single_video(video_path, video_path_url, zm) else: new_video_path = video_path return new_video_path @classmethod def upload_video_and_thumbnail(cls, new_video_path, cover, v_id, new_title, task_mark, name, piaoquan_id, video_path_url, mark, channel_id, url, old_title, title, rule): """ 上传视频和封面到OSS,并更新数据库 """ try: oss_id = cls.random_id() Common.logger("log").info(f"{name}的{task_mark},开始发送oss") oss_object_key = Oss.stitching_sync_upload_oss(new_video_path, oss_id) # 视频发送OSS Common.logger("log").info(f"{name}的{task_mark},发送oss成功{oss_object_key}") status = oss_object_key.get("status") if status == 200: oss_object_key = oss_object_key.get("oss_object_key") time.sleep(1) jpg_path = PQ.download_video_jpg(cover, video_path_url, v_id) # 下载视频封面 if os.path.isfile(jpg_path): oss_jpg_key = Oss.stitching_fm_upload_oss(jpg_path, oss_id) # 封面发送OSS status = oss_jpg_key.get("status") if status == 200: jpg = oss_jpg_key.get("oss_object_key") else: jpg = None else: jpg = None code = PQ.insert_piaoquantv(oss_object_key, new_title, jpg, piaoquan_id) Common.logger("log").info(f"{name}的{task_mark}下的视频ID{v_id}发送成功") sqlCollect.insert_task(task_mark, v_id, mark, channel_id) # 插入数据库 current_time = datetime.now() formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S") sqlCollect.insert_machine_making_data(name, task_mark, channel_id, url, v_id, piaoquan_id, new_title, code, formatted_time, old_title, oss_object_key) values = [ [ name, task_mark, channel_id, url, str(v_id), piaoquan_id, old_title, title if title in ["原标题", "AI标题"] else "", new_title, str(code), formatted_time, str(rule) ] ] return values, code except Exception as e: cls.remove_files(video_path_url) Common.logger("error").warning(f"{name}的{task_mark}上传视频和封面到OSS,并更新数据库失败:{e}\n") return @classmethod def main(cls, data): """ 主函数,初始化任务并使用线程池处理任务。 """ mark = data["mark"] name = data["name"] feishu_id = data["feishu_id"] feishu_sheet = data["feishu_sheet"] cookie_sheet = data["cookie_sheet"] task_data = Material.get_task_data(feishu_id, feishu_sheet) for task in task_data: try: VideoProcessor.process_task(task, mark, name, feishu_id, cookie_sheet) except Exception as e: Common.logger("error").error(f"任务处理失败: {e}") continue # if __name__ == "__main__": # main()