import configparser import os import random import subprocess import sys import time import shutil from datetime import datetime from common.sql_help import sqlHelp sys.path.append(os.getcwd()) from common.material import Material from common import Common, Oss, Feishu, PQ from common.srt import SRT config = configparser.ConfigParser() config.read('./config.ini') # 替换为您的配置文件路径 class AGC(): """清除文件下所有mp4文件""" @classmethod def clear_mp4_files(cls, mark): video_path_url = config['PATHS']['VIDEO_PATH'] + mark + "/" # 获取文件夹中所有扩展名为 '.mp4' 的文件路径列表 if os.path.exists(video_path_url): # 列出目录中的所有文件和文件夹 for filename in os.listdir(video_path_url): file_path = os.path.join(video_path_url, filename) try: # 如果是文件,则删除 if os.path.isfile(file_path) or os.path.islink(file_path): os.unlink(file_path) # 如果是文件夹,则删除文件夹及其内容 elif os.path.isdir(file_path): shutil.rmtree(file_path) except Exception as e: print(f"Failed to delete {file_path}. Reason: {e}") print(f"{mark}的文件已清空。") """ 站外视频拼接 """ @classmethod def zw_concatenate_videos(cls, videos, audio_duration, audio_video, platform, s_path, v_path, mark, v_oss_path): Common.logger("video").info(f"{mark}的{platform}渠道获取待拼接视频") video_files = cls.concat_videos_with_subtitles(videos, audio_duration, platform, mark) Common.logger("video").info(f"{mark}的{platform}视频文件:{video_files}") if video_files == "": return "" print(f"{mark}的{platform}:开始拼接视频喽~~~") Common.logger("video").info(f"{mark}的{platform}:开始拼接视频喽~~~") if os.path.exists(s_path): # subtitle_cmd = f"subtitles={s_path}:force_style='Fontsize=11,Fontname=Hiragino Sans GB,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000'" subtitle_cmd = f"subtitles={s_path}:force_style='Fontsize=12,Fontname=wqy-zenhei,Bold=1,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000'" else: start_time = cls.seconds_to_srt_time(0) end_time = cls.seconds_to_srt_time(audio_duration) with open(s_path, 'w') as f: f.write(f"1\n{start_time} --> {end_time}\n分享、转发给群友\n") # subtitle_cmd = "drawtext=text='分享、转发给群友':fontsize=28:fontcolor=black:x=(w-text_w)/2:y=h-text_h-15" subtitle_cmd = f"subtitles={s_path}:force_style='Fontsize=12,Fontname=wqy-zenhei,Bold=1,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000'" # 背景色参数 background_cmd = "drawbox=y=ih-65:color=yellow@1.0:width=iw:height=0:t=fill" VIDEO_COUNTER = 0 FF_INPUT = "" FF_SCALE = "" FF_FILTER = "" ffmpeg_cmd = ["ffmpeg"] for videos in video_files: Common.logger("video").info(f"{mark}的{platform}视频:{videos[3]}") # 添加输入文件 FF_INPUT += f" -i {videos[3]}" # 为每个视频文件统一长宽,并设置SAR(采样宽高比) FF_SCALE += f"[{VIDEO_COUNTER}:v]scale=320x480,setsar=1[v{VIDEO_COUNTER}];" # 为每个视频文件创建一个输入流,并添加到-filter_complex参数中 FF_FILTER += f"[v{VIDEO_COUNTER}][{VIDEO_COUNTER}:a]" # 增加视频计数器 VIDEO_COUNTER += 1 # 构建最终的FFmpeg命令 ffmpeg_cmd.extend(FF_INPUT.split()) ffmpeg_cmd.extend(["-filter_complex", f"{FF_SCALE}{FF_FILTER}concat=n={VIDEO_COUNTER}:v=1:a=1[v][a]", "-map", "[v]", "-map", "[a]", v_path]) # 多线程数 num_threads = 4 # 构建 FFmpeg 命令,生成视频 ffmpeg_cmd_oss = [ "ffmpeg", "-i", v_path, # 视频文件列表 "-i", audio_video, # 音频文件 "-c:v", "libx264", # 复制视频流 "-c:a", "aac", # 编码音频流为AAC "-threads", str(num_threads), "-vf", f"{background_cmd},{subtitle_cmd}", # 添加背景色和字幕 "-t", str(int(audio_duration)), # 保持与音频时长一致 "-map", "0:v:0", # 映射第一个输入的视频流 "-map", "1:a:0", # 映射第二个输入的音频流 "-y", # 覆盖输出文件 v_oss_path ] try: timeout_seconds = 25 * 60 subprocess.run(ffmpeg_cmd, timeout=timeout_seconds) if os.path.isfile(v_path): subprocess.run(ffmpeg_cmd_oss, timeout=timeout_seconds) print("视频处理完成!") except subprocess.TimeoutExpired: print("视频处理超时,处理失败!") return "" except subprocess.CalledProcessError as e: print(f"视频处理失败:{e}") return "" print(f"{mark}的{platform}:视频拼接成功啦~~~") Common.logger("video").info(f"{mark}的{platform}:视频拼接成功啦~~~") return video_files """视频秒数转换""" @classmethod def seconds_to_srt_time(cls, seconds): hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) seconds = seconds % 60 milliseconds = int((seconds - int(seconds)) * 1000) return f"{hours:02d}:{minutes:02d}:{int(seconds):02d},{milliseconds:03d}" """ 获取视频文件的时长(秒) """ @classmethod def get_video_duration(cls, video_file): result = subprocess.run( ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", video_file], capture_output=True, text=True) try: return float(result.stdout.strip()) except ValueError: print(f"{video_file}视频无法从输出中解析时长:'{result.stdout.strip()}'") return 0 """计算需要拼接的视频""" @classmethod def concat_videos_with_subtitles(cls, videos, audio_duration, platform, mark): # 计算视频文件列表总时长 if platform == "爆款" or platform == "跟随": total_video_duration = sum(cls.get_video_duration(video_file) for video_file in videos) else: total_video_duration = sum(cls.get_video_duration(video_file[3]) for video_file in videos) Common.logger("video").info(f"{mark}的{platform}渠道总时长{total_video_duration}") if platform == "爆款" or platform == "跟随": # 视频时长大于音频时长 if total_video_duration > audio_duration: return videos # 计算音频秒数与视频秒数的比率,然后加一得到需要的视频数量 video_audio_ratio = audio_duration / total_video_duration videos_needed = int(video_audio_ratio) + 2 trimmed_video_list = videos * videos_needed return trimmed_video_list else: # 如果视频总时长小于音频时长,则不做拼接 if total_video_duration < audio_duration: Common.logger("video").info(f"{mark}的{platform}渠道时长小于等于目标时长,不做视频拼接") return "" # 如果视频总时长大于音频时长,则截断视频 trimmed_video_list = [] remaining_duration = audio_duration for video_file in videos: video_duration = cls.get_video_duration(video_file[3]) if video_duration <= remaining_duration: # 如果视频时长小于或等于剩余时长,则将整个视频添加到列表中 trimmed_video_list.append(video_file) remaining_duration -= video_duration else: trimmed_video_list.append(video_file) break return trimmed_video_list """ text文件没有则创建目录 """ @classmethod def bk_text_folders(cls, mark): oss_id = cls.random_id() v_text_url = config['PATHS']['VIDEO_PATH'] + mark + "/text/" if not os.path.exists(v_text_url): os.makedirs(v_text_url) # srt 文件地址 text_path = v_text_url + mark + f"{str(oss_id)}.text" return text_path """ 站内视频拼接 """ @classmethod def zn_concatenate_videos(cls, videos, audio_duration, audio_video, platform, s_path, mark, v_oss_path): text_ptah = cls.bk_text_folders(mark) video_files = cls.concat_videos_with_subtitles(videos, audio_duration, platform, mark) if video_files == "": return "" with open(text_ptah, 'w') as f: for file in video_files: f.write(f"file '{file}'\n") Common.logger("video").info(f"{mark}的{platform}视频文件:{video_files}") print(f"{mark}的{platform}:开始拼接视频喽~~~") Common.logger("video").info(f"{mark}的{platform}:开始拼接视频喽~~~") if os.path.exists(s_path): # subtitle_cmd = f"subtitles={s_path}:force_style='Fontsize=11,Fontname=Hiragino Sans GB,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000'" subtitle_cmd = f"subtitles={s_path}:force_style='Fontsize=12,Fontname=wqy-zenhei,Bold=1,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000'" else: start_time = cls.seconds_to_srt_time(0) end_time = cls.seconds_to_srt_time(audio_duration) with open(s_path, 'w') as f: f.write(f"1\n{start_time} --> {end_time}\n分享、转发给群友\n") # subtitle_cmd = "drawtext=text='分享、转发给群友':fontsize=28:fontcolor=black:x=(w-text_w)/2:y=h-text_h-15" subtitle_cmd = f"subtitles={s_path}:force_style='Fontsize=12,Fontname=wqy-zenhei,Bold=1,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000'" # 背景色参数 background_cmd = "drawbox=y=ih-65:color=yellow@1.0:width=iw:height=0:t=fill" # 多线程数 num_threads = 4 # 构建 FFmpeg 命令,生成视频 ffmpeg_cmd_oss = [ "ffmpeg", "-f", "concat", "-safe", "0", "-i", f"{text_ptah}", # 视频文件列表 "-i", audio_video, # 音频文件 "-c:v", "libx264", "-c:a", "aac", "-threads", str(num_threads), "-vf", f"scale=320x480,{background_cmd},{subtitle_cmd}", # 添加背景色和字幕 "-t", str(int(audio_duration)), # 保持与音频时长一致 "-map", "0:v:0", # 映射第一个输入的视频流 "-map", "1:a:0", # 映射第二个输入的音频流 "-y", # 覆盖输出文件 v_oss_path ] try: subprocess.run(ffmpeg_cmd_oss) print("视频处理完成!") if os.path.isfile(text_ptah): os.remove(text_ptah) except subprocess.CalledProcessError as e: print(f"视频处理失败:{e}") print(f"{mark}:视频拼接成功啦~~~") Common.logger("video").info(f"{mark}:视频拼接成功啦~~~") return v_oss_path """ 获取视频时长 """ @classmethod def get_audio_duration(cls, video_url): ffprobe_cmd = [ "ffprobe", "-i", video_url, "-show_entries", "format=duration", "-v", "quiet", "-of", "csv=p=0" ] output = subprocess.check_output(ffprobe_cmd).decode("utf-8").strip() return float(output) """ 创建临时字幕 """ @classmethod def create_subtitle_file(cls, srt, s_path): with open(s_path, 'w') as f: f.write(srt) """ 随机生成id """ @classmethod def random_id(cls): now = datetime.now() rand_num = random.randint(10000, 99999) oss_id = "{}{}".format(now.strftime("%Y%m%d%H%M%S"), rand_num) return oss_id """ 文件没有则创建目录 """ @classmethod def create_folders(cls, mark): oss_id = cls.random_id() video_path_url = config['PATHS']['VIDEO_PATH'] + mark + "/" # srt 目录 s_path_url = config['PATHS']['VIDEO_PATH'] + mark + "/srt/" # oss 目录 v_path_url = config['PATHS']['VIDEO_PATH'] + mark + "/oss/" if not os.path.exists(video_path_url): os.makedirs(video_path_url) if not os.path.exists(s_path_url): os.makedirs(s_path_url) if not os.path.exists(v_path_url): os.makedirs(v_path_url) # srt 文件地址 s_path = s_path_url + mark + f"{str(oss_id)}.srt" # 最终生成视频地址 v_path = v_path_url + mark + f"{str(oss_id)}.mp4" v_oss_path = v_path_url + mark + f"{str(oss_id)}oss.mp4" return s_path, v_path, video_path_url, v_oss_path """ 获取未使用的数据 """ @classmethod def get_unique_uid_data(cls, data, count): unique_data_dict = {item['uid']: item for item in data} unique_data = list(unique_data_dict.values()) if count >= len(unique_data): return unique_data else: selected_items = [] selected_uids = set() while len(selected_items) < count: # 随机选择一个元素 item = random.choice(unique_data) uid = item['uid'] if uid not in selected_uids: # 如果该uid还未被选择过,则将该元素添加到选中项列表中,并记录已选择的uid selected_items.append(item) selected_uids.add(uid) return selected_items """ 任务处理 """ @classmethod def video(cls, data, platform): mark_name = data['mark_name'] # 负责人 mark = data["mark"] # 标示 if "-" in mark: name = mark.split("-")[0] else: name = mark if platform == "爆款": pq_ids = data["pq_id"] pq_ids_list = pq_ids.split(',') # 账号ID feishu_id = data["feishu_id"] # 飞书文档ID video_call = data["video_call"] # 脚本sheet list_data = Material.get_allbk_data(feishu_id, video_call) if len(list_data) == 0: Feishu.bot('recommend', 'AGC完成通知', f'{platform}任务数为0,不做拼接', name, mark_name) return elif platform == "常规": pq_ids = data["pq_id"]# 账号ID pq_ids_list = pq_ids.split(',') feishu_id = data["feishu_id"] # 飞书文档ID video_call = data["video_call"] # 脚本sheet video_count = data["video_count"] if int(video_count) == 0: Feishu.bot('recommend', 'AGC完成通知', f'{platform}任务数为{video_count},不做拼接', name, mark_name) return mark data_list, videos_mark = Material.get_all_data(feishu_id, video_call, mark) list_data = cls.get_unique_uid_data(data_list, int(video_count)) elif platform == "跟随": pq_ids = data["pq_id"] pq_ids_list = pq_ids.split(',') # 账号ID feishu_id = data["feishu_id"] # 飞书文档ID video_call = data["video_call"] video_count = data["video_count"] if int(video_count) == 0: Feishu.bot('recommend', 'AGC完成通知', f'{platform}任务数为{video_count},不做拼接', name, mark_name) return mark data_list, videos = Material.get_all_data(feishu_id, video_call, mark) list_data = cls.get_unique_uid_data(data_list, int(video_count)) count = 0 pj_count = 0 error_count = 0 total_count = 0 while True: # 清空所有文件 cls.clear_mp4_files(mark) time.sleep(10) s_path, v_path, video_path_url, v_oss_path = cls.create_folders(mark) if pj_count == 10: Feishu.bot('recommend', 'AGC拼接画面不足通知', f'{platform}任务视频画面不足,请及时补充!!', name, mark_name) return if count == len(list_data): break try: d_list = list_data[count] uid = d_list['uid'] # 音频id cover = d_list['cover'] audio_title = d_list['title'] srt_new = SRT.getSrt(int(uid)) Common.logger("video").info(f"S{mark}的{platform}渠道音频ID{uid}") if error_count == 5: Common.logger("video").info(f"{platform}渠道音频id为{uid},任务处理异常5次,该任务跳过,请检查格式,{name}") # Feishu.bot('recommend', 'AGC异常通知', f'{platform}渠道音频id为{uid},任务处理异常5次,该任务跳过,请检查格式!', name, mark_name) count += 1 if platform == "常规": time.sleep(300) continue if srt_new: # 创建临时字幕文件 cls.create_subtitle_file(srt_new, s_path) Common.logger("video").info(f"S{mark}的{platform}渠道SRT 文件目录创建成功") try: # 获取音频 audio_video = PQ.get_audio_url(uid) Common.logger("video").info(f"{mark}的{platform}渠道获音频成功{audio_video}") audio_duration = cls.get_audio_duration(audio_video) Common.logger("video").info(f"{mark}的{platform}渠道获音频秒数{audio_duration}") if int(audio_duration) >= 300: print(f"{mark}的{platform}渠道获取需要拼接的音频秒数为:{audio_duration},超过300秒不做拼接") count += 1 continue Common.logger("video").info(f"{mark}的{platform}渠道获取需要拼接的音频秒数为:{audio_duration}") except Exception as e: count += 1 if platform == "常规": time.sleep(150) # 清空所有mp4数据 cls.clear_mp4_files(mark) Common.logger("video1").info(f"{platform}任务下,{uid}音频格式填写错误该任务跳过,请关注,{name},异常信息:{e}") # Feishu.bot('recommend', 'AGC异常通知', f'{platform}任务下,{uid}音频格式填写错误该任务跳过,请关注', name, mark_name) continue if platform != "常规": if platform == "爆款": videos = str(d_list['video']) if ',' in videos: videos = str(videos).split(',') else: videos = [str(videos)] video_id = random.choice(videos) video_url = PQ.get_audio_url(video_id) time.sleep(10) Common.logger("video").info(f"{mark}的{platform}渠道获取视频链接成功") download_video = Oss.download_url(video_url, video_path_url, str(video_id)) if download_video: video_files = cls.zn_concatenate_videos(download_video, audio_duration, audio_video, platform, s_path, mark, v_oss_path) if os.path.isfile(v_oss_path): Common.logger("video").info(f"{mark}的{platform}渠道新视频生成成功") else: Common.logger("video").info(f"{mark}的{platform}渠道新视频生成失败") continue else: # chnnel_count = int(len(list_data)/2) channels = ["douyin", "kuaishou"] channel = random.choice(channels) user_id = sqlHelp.get_user_id(channel, mark) url_list, user = sqlHelp.get_url_list(user_id, mark, "50") videos = [list(item) for item in url_list] Common.logger("video").info(f"{mark}的{platform}渠道下载视频") videos = Oss.get_oss_url(videos, video_path_url) time.sleep(10) Common.logger("video").info(f"{mark}的{platform}渠道下载视频成功") video_files = cls.zw_concatenate_videos(videos, audio_duration, audio_video, platform, s_path, v_path, mark, v_oss_path) if video_files == "": pj_count += 1 Common.logger("video").info(f"{mark}的{platform}渠道使用拼接视频为空") continue if os.path.isfile(v_oss_path): Common.logger("video").info(f"{mark}的{platform}渠道新视频生成成功") else: Common.logger("video").info(f"{mark}的{platform}渠道新视频生成失败") continue pj_count = 0 error_count = 0 # 随机生成视频oss_id oss_id = cls.random_id() Common.logger("video").info(f"{mark}的{platform}渠道上传到 OSS 生成视频id为:{oss_id}") oss_object_key = Oss.stitching_sync_upload_oss(v_oss_path, oss_id) status = oss_object_key.get("status") if status == 200: # 获取 oss 视频地址 oss_object_key = oss_object_key.get("oss_object_key") Common.logger("video").info(f"{mark}的{platform}渠道拼接视频发送成功,OSS 地址:{oss_object_key}") time.sleep(10) if platform == "常规": # 已使用视频存入数据库 Common.logger("video").info(f"{mark}的{platform}渠道开始已使用视频存入数据库") sqlHelp.insert_videoAudio(video_files, uid, platform, mark) Common.logger("video").info(f"{mark}的{platform}渠道完成已使用视频存入数据库") Common.logger("video").info(f"{mark}的{platform}渠道开始视频添加到对应用户") new_video_id, title = PQ.insert_piaoquantv(oss_object_key, audio_title, pq_ids_list, cover, uid) if new_video_id: Common.logger("video").info(f"{mark}的{platform}渠道视频添加到对应用户成功") count += 1 total_count += 1 if mark_name == "穆新艺": sheet = '50b8a1' elif mark_name == "信欣": sheet = 'UyVK7y' elif mark_name == "范军": sheet = 'uP3zbf' elif mark_name == "鲁涛": sheet = 'iDTHt4' elif mark_name == "余海涛": sheet = 'R1jIeT' elif mark_name == "罗情": sheet = 'iuxfAt' current_time = datetime.now() formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S") if platform == "常规": third_chars = [j[2] for j in video_files] data = ",".join(third_chars) user_id = user else: user_id = video_id data = '' values = [[mark, str(uid), str(user_id), data, title, new_video_id, formatted_time]] Feishu.insert_columns("LAn9so7E0hxRYht2UMEcK5wpnMj", sheet, "ROWS", 1, 2) random_wait_time = random.uniform(0.5, 2.5) time.sleep(random_wait_time) Feishu.update_values("LAn9so7E0hxRYht2UMEcK5wpnMj", sheet, "A2:Z2", values) if srt_new: current_time = datetime.now() formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S") values = [[mark, str(uid), srt_new, formatted_time]] random_wait_time = random.uniform(0.5, 2.5) time.sleep(random_wait_time) Feishu.insert_columns("IbVVsKCpbhxhSJtwYOUc8S1jnWb", "jd9qD9", "ROWS", 1, 2) time.sleep(random_wait_time) Feishu.update_values("IbVVsKCpbhxhSJtwYOUc8S1jnWb", "jd9qD9", "A2:Z2", values) if platform == "常规": time.sleep(300) except Exception as e: error_count += 1 Common.logger("video").warning(f"{mark}的视频拼接失败:{e}\n") # 清空所有mp4数据 cls.clear_mp4_files(mark) continue Feishu.bot('recommend', 'AGC完成通知', f'今日{platform}任务拼接任务完成,共{total_count}条', name, mark_name)