import configparser
import glob
import os
import random
import re
import subprocess
import sys
import time
import urllib.parse
import json
import requests
from datetime import datetime, timedelta
from urllib.parse import urlencode
from common.sql_help import sqlHelp

sys.path.append(os.getcwd())
from common.db import MysqlHelper
from common.material import Material
from common import Common, Oss, Feishu, PQ
from common.srt import SRT

config = configparser.ConfigParser()
config.read('./config.ini')  # Replace with the path to your configuration file


class AGC():
    """Stitch source videos onto an audio track with ffmpeg and upload the result.

    All methods are classmethods; the class only serves as a namespace.
    """

    # Feishu sheet id used for the per-owner production record, keyed by
    # ``mark_name``.
    _RECORD_SHEETS = {
        "穆新艺": '50b8a1',
        "信欣": 'UyVK7y',
        "范军": 'uP3zbf',
        "鲁涛": 'iDTHt4',
        "余海涛": 'R1jIeT',
        "罗情": 'iuxfAt',
    }

    # drawbox filter applied before the subtitles in both stitching paths.
    _BACKGROUND_FILTER = "drawbox=y=ih-65:color=yellow@1.0:width=iw:height=0:t=fill"

    # Value passed to ffmpeg's ``-threads`` option.
    _FFMPEG_THREADS = 4

    @classmethod
    def clear_mp4_files(cls, folder_path):
        """Delete every ``*.mp4`` file located directly in *folder_path*.

        Does nothing (and prints nothing) when the folder has no mp4 files.
        """
        mp4_files = glob.glob(os.path.join(folder_path, '*.mp4'))
        if not mp4_files:
            return
        for mp4_file in mp4_files:
            os.remove(mp4_file)
        print(f"文件夹 '{folder_path}' 中的所有 .mp4 文件已清空。")

    @classmethod
    def _remove_files(cls, *paths):
        """Remove each of *paths* that exists as a regular file."""
        for path in paths:
            if os.path.isfile(path):
                os.remove(path)

    @classmethod
    def _subtitle_filter(cls, s_path, audio_duration):
        """Return the ffmpeg ``subtitles`` filter string for *s_path*.

        When *s_path* does not exist yet, a single-cue SRT file spanning
        ``0 .. audio_duration`` with a default caption is written there first.
        """
        if not os.path.exists(s_path):
            start_time = cls.seconds_to_srt_time(0)
            end_time = cls.seconds_to_srt_time(audio_duration)
            # Explicit UTF-8: the caption is Chinese text and must not depend
            # on the process locale.
            with open(s_path, 'w', encoding='utf-8') as f:
                f.write(f"1\n{start_time} --> {end_time}\n分享、转发给群友\n")
        return f"subtitles={s_path}:force_style='Fontsize=12,Fontname=wqy-zenhei,Bold=1,Outline=0,PrimaryColour=&H000000,SecondaryColour=&H000000'"

    @classmethod
    def zw_concatenate_videos(cls, videos, audio_duration, audio_video, platform, s_path, v_path, mark, v_oss_path):
        """Stitch off-site videos, then mux them with the audio and subtitles.

        Two ffmpeg passes are run: the first scales every selected input to
        320x480 and concatenates them into *v_path*; the second overlays the
        background box and subtitles, replaces the audio with *audio_video*
        and writes *v_oss_path*.

        Args:
            videos: candidate video records; index 3 of each record is used as
                the ffmpeg input path.
            audio_duration: target duration in seconds.
            audio_video: audio input passed to ffmpeg.
            platform: channel name, forwarded to the selection step and logs.
            s_path: SRT file path (created with a default caption if missing).
            v_path: output path of the intermediate concatenated video.
            mark: task identifier used in log messages.
            v_oss_path: output path of the final video.

        Returns:
            The list of records that were selected for stitching, or ``""``
            when nothing was selected. Callers are expected to check that
            *v_oss_path* exists to know whether ffmpeg succeeded.
        """
        video_files = cls.concat_videos_with_subtitles(videos, audio_duration, platform, mark)
        Common.logger("video").info(f"{mark}的{platform}视频文件:{video_files}")
        if not video_files:
            return ""
        print(f"{mark}的{platform}:开始拼接视频喽~~~")
        Common.logger("video").info(f"{mark}的{platform}:开始拼接视频喽~~~")
        subtitle_cmd = cls._subtitle_filter(s_path, audio_duration)

        # Build the argument list directly instead of joining and re-splitting
        # a string, so that input paths containing whitespace stay intact.
        ffmpeg_cmd = ["ffmpeg"]
        scale_filters = []
        concat_inputs = []
        for index, video in enumerate(video_files):
            Common.logger("video").info(f"{mark}的{platform}视频:{video[3]}")
            ffmpeg_cmd.extend(["-i", str(video[3])])
            # Normalize every input to the same frame size and sample aspect
            # ratio; the concat filter requires matching streams.
            scale_filters.append(f"[{index}:v]scale=320x480,setsar=1[v{index}];")
            concat_inputs.append(f"[v{index}][{index}:a]")
        filter_complex = (
            f"{''.join(scale_filters)}{''.join(concat_inputs)}"
            f"concat=n={len(video_files)}:v=1:a=1[v][a]"
        )
        ffmpeg_cmd.extend([
            "-filter_complex", filter_complex,
            "-map", "[v]",
            "-map", "[a]",
            "-y",  # overwrite a leftover intermediate file instead of prompting
            v_path,
        ])

        ffmpeg_cmd_oss = [
            "ffmpeg",
            "-i", v_path,  # concatenated video
            "-i", audio_video,  # audio track
            "-c:v", "libx264",  # re-encode video as H.264
            "-c:a", "aac",  # encode audio as AAC
            "-threads", str(cls._FFMPEG_THREADS),
            "-vf", f"{cls._BACKGROUND_FILTER},{subtitle_cmd}",  # background box + subtitles
            "-t", str(int(audio_duration)),  # cut to the audio length
            "-map", "0:v:0",  # video from the first input
            "-map", "1:a:0",  # audio from the second input
            "-y",  # overwrite the output file
            v_oss_path
        ]
        try:
            # check=True makes a non-zero ffmpeg exit raise, so the handler
            # below is actually reachable.
            subprocess.run(ffmpeg_cmd, check=True)
            if os.path.isfile(v_path):
                subprocess.run(ffmpeg_cmd_oss, check=True)
            print("视频处理完成!")
        except subprocess.CalledProcessError as e:
            print(f"视频处理失败:{e}")
            Common.logger("video").warning(f"{mark}的{platform}:视频处理失败:{e}")
            return video_files
        print(f"{mark}的{platform}:视频拼接成功啦~~~")
        Common.logger("video").info(f"{mark}的{platform}:视频拼接成功啦~~~")
        return video_files

    @classmethod
    def seconds_to_srt_time(cls, seconds):
        """Format a duration in seconds as an SRT timestamp ``HH:MM:SS,mmm``.

        The value is rounded to the nearest millisecond (integer arithmetic
        avoids losing a millisecond to float truncation); negative input is
        clamped to zero.
        """
        total_ms = max(0, int(round(seconds * 1000)))
        hours, remainder = divmod(total_ms, 3600 * 1000)
        minutes, remainder = divmod(remainder, 60 * 1000)
        whole_seconds, milliseconds = divmod(remainder, 1000)
        return f"{hours:02d}:{minutes:02d}:{whole_seconds:02d},{milliseconds:03d}"

    @classmethod
    def get_video_duration(cls, video_file):
        """Return the container duration of *video_file* in seconds via ffprobe.

        Raises:
            ValueError: if ffprobe printed nothing parseable as a float
                (for example because the file could not be read).
        """
        result = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of",
             "default=noprint_wrappers=1:nokey=1", video_file],
            capture_output=True, text=True)
        return float(result.stdout)

    @classmethod
    def concat_videos_with_subtitles(cls, videos, audio_duration, platform, mark):
        """Select which videos to stitch so they cover *audio_duration*.

        For the "爆款" and "跟随" platforms each element of *videos* is probed
        directly; if the material is shorter than the audio the list is
        repeated enough times to cover it. For every other platform index 3 of
        each element is probed, and the list is cut after the first video that
        crosses the target duration.

        Returns:
            The selected list, or ``""`` when stitching should be skipped
            (material too short, or zero total duration).
        """
        loops_material = platform == "爆款" or platform == "跟随"
        # Probe each file exactly once; ffprobe is an external process.
        if loops_material:
            durations = [cls.get_video_duration(video_file) for video_file in videos]
        else:
            durations = [cls.get_video_duration(video_file[3]) for video_file in videos]
        total_video_duration = sum(durations)

        if loops_material:
            if total_video_duration > audio_duration:
                return videos
            if total_video_duration <= 0:
                # Nothing playable: avoid dividing by zero below.
                Common.logger("video").info(f"{mark}的{platform}渠道视频总时长为0,不做视频拼接")
                return ""
            # Whole repetitions needed to cover the audio, plus two spare
            # copies as margin (the final ffmpeg pass trims with -t).
            videos_needed = int(audio_duration / total_video_duration) + 2
            return videos * videos_needed

        if total_video_duration < audio_duration:
            Common.logger("video").info(f"{mark}的{platform}渠道时长小于等于目标时长,不做视频拼接")
            return ""
        trimmed_video_list = []
        remaining_duration = audio_duration
        for video_file, video_duration in zip(videos, durations):
            trimmed_video_list.append(video_file)
            if video_duration > remaining_duration:
                # This video crosses the target length; it is the last one.
                break
            remaining_duration -= video_duration
        return trimmed_video_list

    @classmethod
    def bk_text_folders(cls, mark):
        """Ensure ``<VIDEO_PATH><mark>/text/`` exists and return a fresh list-file path in it."""
        oss_id = cls.random_id()
        v_text_url = config['PATHS']['VIDEO_PATH'] + mark + "/text/"
        os.makedirs(v_text_url, exist_ok=True)
        return v_text_url + mark + f"{str(oss_id)}.text"

    @classmethod
    def zn_concatenate_videos(cls, videos, audio_duration, audio_video, platform, s_path, mark, v_oss_path):
        """Stitch on-site videos with ffmpeg's concat demuxer in a single pass.

        The selected files are written to a temporary list file, which ffmpeg
        reads with ``-f concat``; the output is scaled to 320x480, gets the
        background box and subtitles, uses *audio_video* as its audio and is
        written to *v_oss_path*. The list file is always removed afterwards.

        Returns:
            *v_oss_path*, or ``""`` when no videos were selected. Callers are
            expected to check that *v_oss_path* exists to know whether ffmpeg
            succeeded.
        """
        video_files = cls.concat_videos_with_subtitles(videos, audio_duration, platform, mark)
        Common.logger("video").info(f"{mark}的{platform}视频文件:{video_files}")
        if not video_files:
            return ""
        # Create the list file only once we know there is something to stitch,
        # so the early return above cannot leave an orphan file behind.
        text_path = cls.bk_text_folders(mark)
        with open(text_path, 'w', encoding='utf-8') as f:
            for file in video_files:
                f.write(f"file '{file}'\n")
        print(f"{mark}的{platform}:开始拼接视频喽~~~")
        Common.logger("video").info(f"{mark}的{platform}:开始拼接视频喽~~~")
        subtitle_cmd = cls._subtitle_filter(s_path, audio_duration)

        ffmpeg_cmd_oss = [
            "ffmpeg",
            "-f", "concat",
            "-safe", "0",
            "-i", f"{text_path}",  # list of video files
            "-i", audio_video,  # audio track
            "-c:v", "libx264",
            "-c:a", "aac",
            "-threads", str(cls._FFMPEG_THREADS),
            "-vf", f"scale=320x480,{cls._BACKGROUND_FILTER},{subtitle_cmd}",  # scale + background box + subtitles
            "-t", str(int(audio_duration)),  # cut to the audio length
            "-map", "0:v:0",  # video from the first input
            "-map", "1:a:0",  # audio from the second input
            "-y",  # overwrite the output file
            v_oss_path
        ]
        try:
            # check=True makes a non-zero ffmpeg exit raise, so the handler
            # below is actually reachable.
            subprocess.run(ffmpeg_cmd_oss, check=True)
            print("视频处理完成!")
        except subprocess.CalledProcessError as e:
            print(f"视频处理失败:{e}")
            Common.logger("video").warning(f"{mark}:视频处理失败:{e}")
            return v_oss_path
        finally:
            cls._remove_files(text_path)
        print(f"{mark}:视频拼接成功啦~~~")
        Common.logger("video").info(f"{mark}:视频拼接成功啦~~~")
        return v_oss_path

    @classmethod
    def get_audio_duration(cls, video_url):
        """Return the duration in seconds that ffprobe reports for *video_url*.

        Raises:
            subprocess.CalledProcessError: if ffprobe exits with an error.
            ValueError: if ffprobe's output is not a number.
        """
        ffprobe_cmd = [
            "ffprobe",
            "-i", video_url,
            "-show_entries", "format=duration",
            "-v", "quiet",
            "-of", "csv=p=0"
        ]
        output = subprocess.check_output(ffprobe_cmd).decode("utf-8").strip()
        return float(output)

    @classmethod
    def create_subtitle_file(cls, srt, s_path):
        """Write the subtitle text *srt* to *s_path* as UTF-8, replacing any existing file."""
        with open(s_path, 'w', encoding='utf-8') as f:
            f.write(srt)

    @classmethod
    def random_id(cls):
        """Return an id made of the current ``YYYYmmddHHMMSS`` timestamp plus five random digits."""
        now = datetime.now()
        rand_num = random.randint(10000, 99999)
        oss_id = "{}{}".format(now.strftime("%Y%m%d%H%M%S"), rand_num)
        return oss_id

    @classmethod
    def create_folders(cls, mark):
        """Create the working directories for *mark* and return fresh file paths.

        Returns:
            A tuple ``(s_path, v_path, video_path_url, v_oss_path)``: the SRT
            file path, the intermediate video path, the base working directory
            and the final video path. The three file paths share one random id.
        """
        oss_id = cls.random_id()
        video_path_url = config['PATHS']['VIDEO_PATH'] + mark + "/"
        s_path_url = config['PATHS']['VIDEO_PATH'] + mark + "/srt/"
        v_path_url = config['PATHS']['VIDEO_PATH'] + mark + "/oss/"
        for directory in (video_path_url, s_path_url, v_path_url):
            os.makedirs(directory, exist_ok=True)
        s_path = s_path_url + mark + f"{str(oss_id)}.srt"
        v_path = v_path_url + mark + f"{str(oss_id)}.mp4"
        v_oss_path = v_path_url + mark + f"{str(oss_id)}oss.mp4"
        return s_path, v_path, video_path_url, v_oss_path

    @classmethod
    def get_unique_uid_data(cls, data, count):
        """Pick up to *count* items of *data* with pairwise distinct ``uid``.

        Items are de-duplicated by ``uid`` first (the last item per uid wins).
        If *count* exceeds the number of distinct items, all of them are
        returned in their de-duplicated order; otherwise *count* items are
        drawn at random without replacement.
        """
        unique_data = list({item['uid']: item for item in data}.values())
        if count > len(unique_data):
            return unique_data
        if count <= 0:
            return []
        return random.sample(unique_data, count)

    @classmethod
    def video(cls, data, platform):
        """Run one stitching task: build, upload and record a video per audio item.

        Args:
            data: task configuration; the keys read here are ``mark_name``,
                ``pq_id`` (comma-separated ids), ``mark``, ``feishu_id``,
                ``video_call`` and, except for "爆款", ``video_count``.
            platform: one of "爆款", "常规" or "跟随".

        Returns:
            ``mark`` when the task has nothing to do, otherwise ``None`` after
            a completion notice has been sent through ``Feishu.bot``.

        Raises:
            ValueError: if *platform* is not one of the supported values.
        """
        if platform not in ("爆款", "常规", "跟随"):
            raise ValueError(f"unsupported platform: {platform!r}")

        mark_name = data['mark_name']  # owner of the task
        pq_ids_list = data["pq_id"].split(',')  # account ids
        mark = data["mark"]  # task identifier
        feishu_id = data["feishu_id"]  # Feishu document id
        video_call = data["video_call"]  # script sheet
        follow_videos = None
        if platform == "爆款":
            list_data = Material.get_allbk_data(feishu_id, video_call)
            if len(list_data) == 0:
                Feishu.bot('recommend', 'AGC完成通知', f'爆款任务数为0,不做拼接', mark, mark_name)
                return mark
        else:
            video_count = data["video_count"]
            if int(video_count) == 0:
                Feishu.bot('recommend', 'AGC完成通知', f'{platform}任务数为{video_count},不做拼接', mark, mark_name)
                return mark
            # The second return value is only consumed by the "跟随" platform,
            # where it is the pool of video ids to choose from.
            data_list, follow_videos = Material.get_all_data(feishu_id, video_call, mark)
            list_data = cls.get_unique_uid_data(data_list, int(video_count))

        s_path, v_path, video_path_url, v_oss_path = cls.create_folders(mark)
        count = 0
        for d_list in list_data:
            try:
                # The same three paths are reused for every item. Drop whatever
                # a previously failed item left behind so it cannot be mistaken
                # for this item's subtitles or output.
                cls._remove_files(s_path, v_path, v_oss_path)

                uid = d_list['uid']  # audio id
                srt = d_list['text']  # subtitle text
                cover = d_list['cover']
                audio_title = d_list['title']
                if srt:
                    cls.create_subtitle_file(srt, s_path)
                    Common.logger("bk_video").info(f"S{mark} 文件目录创建成功")
                else:
                    srt_new = SRT.getSrt(int(uid))
                    if srt_new:
                        formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        values = [[mark, str(uid), srt_new, formatted_time]]
                        random_wait_time = random.uniform(0.5, 2.5)
                        time.sleep(random_wait_time)
                        Feishu.insert_columns("IbVVsKCpbhxhSJtwYOUc8S1jnWb", "jd9qD9", "ROWS", 1, 2)
                        time.sleep(random_wait_time)
                        Feishu.update_values("IbVVsKCpbhxhSJtwYOUc8S1jnWb", "jd9qD9", "A2:Z2", values)
                        cls.create_subtitle_file(srt_new, s_path)
                        Common.logger("video").info(f"S{mark}的{platform}渠道RT 文件目录创建成功")

                audio_video = PQ.get_audio_url(uid)
                Common.logger("video").info(f"{mark}的{platform}渠道获音频成功")
                audio_duration = cls.get_audio_duration(audio_video)
                Common.logger("video").info(f"{mark}的{platform}渠道获取需要拼接的音频秒数为:{audio_duration}")

                if platform != "常规":
                    if platform == "爆款":
                        # Comma-separated ids; a value without a comma yields
                        # a one-element list.
                        candidate_ids = str(d_list['video']).split(',')
                    else:
                        candidate_ids = follow_videos
                    video_id = random.choice(candidate_ids)
                    video_url = PQ.get_audio_url(video_id)
                    download_video = Oss.download_url(video_url, video_path_url, str(video_id))
                    if not download_video:
                        # Nothing to stitch; do not fall through to the upload.
                        Common.logger("video").info(f"{mark}的{platform}渠道视频下载失败")
                        continue
                    video_files = cls.zn_concatenate_videos(download_video, audio_duration, audio_video, platform,
                                                            s_path, mark, v_oss_path)
                    if os.path.isfile(v_oss_path):
                        Common.logger("video").info(f"{mark}的{platform}渠道新视频生成成功")
                    else:
                        Common.logger("video").info(f"{mark}的{platform}渠道新视频生成失败")
                        continue
                else:
                    # Choose the source channel by comparing half the task
                    # size with the number of videos produced so far.
                    chnnel_count = int(len(list_data) / 2)
                    channel = "kuaishou" if chnnel_count >= count else "douyin"
                    user_id = sqlHelp.get_user_id(channel, mark)
                    url_list, user = sqlHelp.get_url_list(user_id, mark, "35")
                    videos = [list(item) for item in url_list]
                    videos = Oss.get_oss_url(videos, video_path_url)
                    video_files = cls.zw_concatenate_videos(videos, audio_duration, audio_video, platform, s_path,
                                                            v_path, mark, v_oss_path)
                    if video_files == "":
                        Common.logger("video").info(f"{mark}的{platform}渠道使用拼接视频为空")
                        continue
                    if os.path.isfile(v_oss_path):
                        Common.logger("video").info(f"{mark}的{platform}渠道新视频生成成功")
                    else:
                        Common.logger("video").info(f"{mark}的{platform}渠道新视频生成失败")
                        continue

                oss_id = cls.random_id()
                Common.logger("video").info(f"{mark}的{platform}渠道上传到 OSS 生成视频id为:{oss_id}")
                upload_result = Oss.stitching_sync_upload_oss(v_oss_path, oss_id)
                if upload_result.get("status") != 200:
                    continue
                oss_object_key = upload_result.get("oss_object_key")
                Common.logger("video").info(f"{mark}的{platform}渠道拼接视频发送成功,OSS 地址:{oss_object_key}")
                time.sleep(10)
                if platform == "常规":
                    # Record the source videos consumed by this item.
                    Common.logger("video").info(f"{mark}的{platform}渠道开始已使用视频存入数据库")
                    sqlHelp.insert_videoAudio(video_files, uid, platform, mark)
                    Common.logger("video").info(f"{mark}的{platform}渠道完成已使用视频存入数据库")
                Common.logger("video").info(f"{mark}的{platform}渠道开始视频添加到对应用户")
                new_video_id, title = PQ.insert_piaoquantv(oss_object_key, audio_title, pq_ids_list, cover, uid)
                if not new_video_id:
                    continue
                Common.logger("video").info(f"{mark}的{platform}渠道视频添加到对应用户成功")
                cls._remove_files(v_oss_path, v_path, s_path)
                cls.clear_mp4_files(video_path_url)
                count += 1

                formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                if platform == "常规":
                    used_videos = ",".join(j[2] for j in video_files)
                    user_id = user
                else:
                    user_id = video_id
                    used_videos = ''
                values = [[mark, str(uid), str(user_id), used_videos, title, new_video_id, formatted_time]]
                sheet = cls._RECORD_SHEETS.get(mark_name)
                if sheet is None:
                    # The video is already published; only the bookkeeping
                    # row is skipped for an owner without a record sheet.
                    Common.logger("bk_video").warning(f"{mark}的负责人{mark_name}没有对应的飞书表格,跳过记录\n")
                    continue
                Feishu.insert_columns("LAn9so7E0hxRYht2UMEcK5wpnMj", sheet, "ROWS", 1, 2)
                time.sleep(random.uniform(0.5, 2.5))
                Feishu.update_values("LAn9so7E0hxRYht2UMEcK5wpnMj", sheet, "A2:Z2", values)
            except Exception as e:
                # One failing item must not abort the remaining ones.
                Common.logger("bk_video").warning(f"{mark}的视频拼接失败:{e}\n")
                continue

        # Notify with the part of mark before the first "-" (the whole mark
        # when it contains none).
        name = mark.split("-")[0]
        Feishu.bot('recommend', 'AGC完成通知', f'今日{platform}任务拼接任务完成,共{count}条', name, mark_name)