import json import os import random import re import sys import time import uuid from datetime import datetime import orjson from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.interval import IntervalTrigger from loguru import logger sys.path.append('/app') from utils.redis import RedisHelper from utils.aliyun_log import AliyunLogger from utils.aliyun_oss import Oss from utils.download_video import DownLoad from utils.dy_ks_get_url import Dy_KS from utils.feishu_form import Material from utils.feishu_utils import Feishu from utils.ffmpeg import FFmpeg from utils.gpt4o_mini_help import GPT4oMini from utils.piaoquan import PQ from utils.sql_help import sqlCollect from utils.tag_video import Tag from utils.tts_help import TTS from utils.google_ai_studio import GoogleAI CACHE_DIR = '/app/cache/' # CACHE_DIR = '/Users/z/Downloads/' class ConsumptionRecommend(object): @classmethod def insert_pq(cls, data, oss_object_key, title, tags, tag_transport_channel, channel_mark, task_mark, sub_crawler_src_code): logger.info(f"[+] 开始写入票圈") n_ids = str(data["pq_ids"]) if ',' in n_ids: n_id_list = n_ids.split(',') else: n_id_list = [n_ids] pq_list = [] for n_id in n_id_list: code = PQ.insert_piaoquantv(oss_object_key, title, n_id, None, sub_crawler_src_code) if not code: logger.error(f"[+] 写入票圈后台失败") AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "改造失败,写入票圈后台失败", "3001", str(data)) text = ( f"**负责人**: {data['name']}\n" f"**内容**: {data}\n" f"**失败信息**: 视频写入票圈后台失败,视频ID{code}\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", "【 搬运&改造效率工具失败通知 】") continue pq_list.append(code) logger.info(f"[+] 写入票圈成功,返回视频id{code}") tag_status = Tag.video_tag(code, str(tags)) if tag_status == 0: logger.info(f"[+] 写入标签成功,后台视频ID为{code}") try: current_time = datetime.now() formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S") sqlCollect.insert_machine_making_data(data["name"], task_mark, tag_transport_channel, data["video_url"], data["video_url"], data["pq_ids"], data["title_category"], code, formatted_time, data["title_category"], oss_object_key) pq_url = f'https://admin.piaoquantv.com/cms/post-detail/{code}/detail' # 站内视频链接 values = [ [ str(code), str(n_id), formatted_time, channel_mark, data["name"], data["pq_ids"], data["pq_label"], data["activate_data"], data["video_url"], data["title_category"], tag_transport_channel, data["tag_transport_scene"], data["tag_transport_keyword"], data["tag"], data["transform_rule"], data["video_share"], data["trailer_share"], data["trailer_share_audio"], data["video_clipping"], data["video_clipping_time"], data["title_transform"], pq_url ] ] name_to_sheet = { "范军": "276ffc", "鲁涛": "QqrKRY", "余海涛": "dTzUlI", "罗情": "8JPv9g", "刘诗雨": "HqwG0o", "王媛": "vtWvle", "周仙琴": "MWUqWt", "王雪珂": "xN1KrU", "信欣": "PtoeGT", "邓锋": "dgV2Af", "王知微":"QDyCg6", "刘兆恒": "vtRMGX", "张博": "LEZMRr", "子涵": "tCw56r" } name = re.sub(r"\s+", "", data.get("name", "")) sheet = name_to_sheet.get(name) Feishu.insert_columns("R4dLsce8Jhz9oCtDMr9ccpFHnbI", sheet, "ROWS", 1, 2) time.sleep(0.5) Feishu.update_values("R4dLsce8Jhz9oCtDMr9ccpFHnbI", sheet, "A2:Z2", values) logger.info(f"[处理] 写入飞书成功") except Exception as e: logger.error(f"[处理] 写入飞书失败{e}") pass AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "改造成功", "1000", str(data), str(pq_list)) return @classmethod def data_handle(cls, data, file_path, redis_name,studio_key): url, original_title, video_id, tag_transport_channel = Dy_KS.get_video_url(data, "效率工具") if url == "重新处理" or not url: RedisHelper().get_client().rpush(redis_name, json.dumps(data)) text = ( f"**负责人**: {data['name']}\n" f"**内容**: {data}\n" f"**失败信息**: 没有获取到视频链接,等待重新处理\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", f"【 搬运&改造效率工具失败通知 】") return elif url == "作品不存在" or url == "链接不是抖/快" or url == "note": if url == "note": url = "图文" text = ( f"**负责人**: {data['name']}\n" f"**内容**: {data}\n" f"**失败信息**: {url},不做处理\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", f"【 搬运&改造效率工具失败通知 】") return AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "扫描到一条视频", "2001", str(data)) AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "符合规则等待改造", "2004", str(data)) logger.info(f"[处理] {url}开始下载视频") video_path = DownLoad.download_video(url, file_path, tag_transport_channel, video_id) if not os.path.exists(video_path) or os.path.getsize(video_path) == 0: RedisHelper().get_client().rpush(redis_name, json.dumps(data)) logger.error(f"[处理] {url}下载失败") AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "视频下载失败等待重新处理", "3002", str(data)) text = ( f"**负责人**: {data['name']}\n" f"**内容**: {data}\n" f"**失败信息**: 视频下载失败等待重新处理,视频链接{url}\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", "【 搬运&改造效率工具失败通知 】") return logger.info(f"[处理] {url}视频下载成功") if data["title_category"] == "AI标题" or str(data["trailer_share"]) == "AI标题": title = GPT4oMini.get_ai_mini_title( original_title if data["title_category"] == "AI标题" else data["title_category"]) else: title = original_title if data["title_category"] == "原标题" else data["title_category"] if tag_transport_channel == "抖音": if "复制打开抖音" in data['video_url']: channel_mark = "APP" else: channel_mark = "PC" else: if "https://www.kuaishou.com/f" in data['video_url']: channel_mark = "PC" else: channel_mark = "APP" if data["transform_rule"] == '否' or data["transform_rule"] == "是": logger.info(f"[处理] 数据开始发送oss") oss_object_key = Oss.stitching_sync_upload_oss(video_path, str(uuid.uuid4())) # 视频发送OSS oss_object_key = oss_object_key.get("oss_object_key") tags = ','.join(filter(None, [ data['pq_label'], channel_mark, tag_transport_channel, data['tag_transport_scene'], data['tag_transport_keyword'], "搬运工具", data['tag'] ])) cls.insert_pq(data, oss_object_key, title, tags, tag_transport_channel, channel_mark, "搬运工具", "MANUAL_TRANSPORT_TOOL_ORIGIN") if data["transform_rule"] == "仅改造" or data["transform_rule"] == "是": try: width, height = FFmpeg.get_w_h_size(video_path) if width < height: # 判断是否需要修改为竖屏 video_path = FFmpeg.update_video_h_w(video_path, file_path) logger.info(f"[处理] 视频更改分辨率处理") video_path = FFmpeg.video_640(video_path, file_path) if not os.path.exists(video_path) or os.path.getsize(video_path) == 0: RedisHelper().get_client().rpush(redis_name, json.dumps(data)) logger.error(f"[处理] 视频更改分辨率失败") AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "改造失败,片尾拼接失败", "3001", str(data)) text = ( f"**负责人**: {data['name']}\n" f"**内容**: {data}\n" f"**失败信息**: 视频更改分辨率失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", "【 搬运&改造效率工具失败通知 】") return logger.info(f"[处理] 视频更改分辨率处理成功") if data["video_clipping"]: # 判断是否需要裁剪 video_path = FFmpeg.video_crop(video_path, file_path) if data["video_clipping_time"]: # 判断是否需要指定视频时长 video_path = FFmpeg.video_ggduration(video_path, file_path, data["video_clipping_time"]) if data['trailer_share'] == "内容分析": video_text = GoogleAI.run(studio_key, video_path) if not video_text: logger.error(f"[处理] 视频内容分析获取内容信息失败") data["transform_rule"] = "仅改造" RedisHelper().get_client().rpush(redis_name, json.dumps(data)) AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "改造失败,视频内容分析获取内容信息失败", "3001", str(data)) text = ( f"**负责人**: {data['name']}\n" f"**内容**: {data}\n" f"**失败信息**: 视频内容分析获取内容信息失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", "【 搬运&改造效率工具失败通知 】") return logger.info(f"[处理] 片尾引导-开始获取AI片尾") pw_srt_text = GPT4oMini.get_content_understanding_pw(video_text) else: prompt = Material.get_propmt_data(str(data['trailer_share'])) pw_srt_text = GPT4oMini.get_ai_mini_pw(title, prompt) voice = data['trailer_share_audio'] if voice: if ',' in voice: voices = voice.split(',') else: voices = [voice] voice = random.choice(voices) else: voice = "zhifeng_emo" pw_url = TTS.get_pw_zm(pw_srt_text, voice) if not pw_url: logger.error(f"[处理] 数据片尾获取失败") data["transform_rule"] = "仅改造" RedisHelper().get_client().rpush(redis_name, json.dumps(data)) AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "改造失败,片尾获取失败", "3001", str(data)) text = ( f"**负责人**: {data['name']}\n" f"**内容**: {data}\n" f"**失败信息**: 获取片尾失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", "【 搬运&改造效率工具失败通知 】") return logger.info(f"[处理] 数据片尾获取成功") pw_srt = TTS.getSrt(pw_url) if not pw_srt: data["transform_rule"] = "仅改造" RedisHelper().get_client().rpush(redis_name, json.dumps(data)) logger.error(f"[处理] 数据片尾音频srt获取失败") AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "改造失败,片尾音频下载失败", "3001", str(data)) text = ( f"**负责人**: {data['name']}\n" f"**内容**: {data}\n" f"**失败信息**: 片尾音频下载失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", "【 搬运&改造效率工具失败通知 】") return pw_mp3_path = TTS.download_mp3(pw_url, file_path) if not pw_mp3_path: data["transform_rule"] = "仅改造" RedisHelper().get_client().rpush(redis_name, json.dumps(data)) logger.error(f"[处理] 数据片尾音频下载失败") AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "改造失败,片尾音频下载失败", "3001", str(data)) text = ( f"**负责人**: {data['name']}\n" f"**内容**: {data}\n" f"**失败信息**: 片尾音频下载失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", "【 搬运&改造效率工具失败通知 】") return logger.info(f"[处理] 数据片尾音频下载成功") if str(data['trailer_share_video']) and str(data['trailer_share_video'] ) != "None": rg_pw = str(data["trailer_share_video"]) if ',' in rg_pw: rg_pw_list = rg_pw.split(',') else: rg_pw_list = [rg_pw] rg_pw_url_list = PQ.get_pq_oss(rg_pw_list) if not rg_pw_url_list: AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "无法获取站内视频链接", "3001", str(data)) text = ( f"**负责人**: {data['name']}\n" f"**内容**: {data}\n" f"**失败信息**: 无法获取站内视频链接\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", "【 搬运&改造效率工具失败通知 】") return pw_url_duration = FFmpeg.get_http_duration([pw_url]) pw_videos_duration = FFmpeg.get_http_duration(rg_pw_url_list) if pw_videos_duration < pw_url_duration: jpg_path = FFmpeg.video_png(video_path, file_path) # 生成视频最后一帧jpg if not os.path.exists(jpg_path) or os.path.getsize(jpg_path) == 0: data["transform_rule"] = "仅改造" RedisHelper().get_client().rpush(redis_name, json.dumps(data)) logger.error(f"[处理] 数据片尾获取最后一帧失败") AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "改造失败,获取最后一帧失败", "3001", str(data)) text = ( f"**负责人**: {data['name']}\n" f"**内容**: {data}\n" f"**失败信息**: 获取视频最后一帧失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", "【 搬运&改造效率工具失败通知 】") return logger.info(f"[处理] 数据片尾获取最后一帧成功") else: rg_pw_url = DownLoad.download_pq_video(file_path, rg_pw_url_list) rg_pw_list = FFmpeg.concatenate_videos(rg_pw_url, file_path) if not os.path.exists(rg_pw_list) or os.path.getsize(rg_pw_list) == 0: data["transform_rule"] = "仅改造" RedisHelper().get_client().rpush(redis_name, json.dumps(data)) logger.error(f"[处理] 数据片尾拼接失败") AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "改造失败,片尾拼接失败", "3001", str(data)) text = ( f"**负责人**: {data['name']}\n" f"**内容**: {data}\n" f"**失败信息**: 片尾拼接失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", "【 搬运&改造效率工具失败通知 】") return jpg_path = FFmpeg.video_640(rg_pw_list, file_path) logger.info(f"[处理] 生成人工片尾成功") else: jpg_path = FFmpeg.video_png(video_path, file_path) # 生成视频最后一帧jpg if not os.path.exists(jpg_path) or os.path.getsize(jpg_path) == 0: data["transform_rule"] = "仅改造" RedisHelper().get_client().rpush(redis_name, json.dumps(data)) logger.error(f"[处理] 数据片尾获取最后一帧失败") AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "改造失败,获取最后一帧失败", "3001", str(data)) text = ( f"**负责人**: {data['name']}\n" f"**内容**: {data}\n" f"**失败信息**: 获取视频最后一帧失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", "【 搬运&改造效率工具失败通知 】") return logger.info(f"[处理] 数据片尾获取最后一帧成功") pw_path = FFmpeg.pw_video(jpg_path, file_path, pw_mp3_path, pw_srt) # 生成片尾视频 if not os.path.exists(pw_path) or os.path.getsize(pw_path) == 0: data["transform_rule"] = "仅改造" RedisHelper().get_client().rpush(redis_name, json.dumps(data)) logger.error(f"[处理] 数据片尾拼接失败") AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "改造失败,片尾拼接失败", "3001", str(data)) text = ( f"**负责人**: {data['name']}\n" f"**内容**: {data}\n" f"**失败信息**: 片尾拼接失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", "【 搬运&改造效率工具失败通知 】") return logger.info(f"[处理] 数据合并开始拼接") trailer_share_bgm = data['trailer_share_bgm'] # trailer_share_bgm = '48594759' if trailer_share_bgm and trailer_share_bgm != "None": try: logger.info(f"[处理] 获取bgm") rg_bgm_list = PQ.get_pq_oss([trailer_share_bgm]) rg_bgm_url = DownLoad.download_pq_video(file_path, rg_bgm_list) bgm_mp3_path = FFmpeg.get_pw_video_mp3(file_path, rg_bgm_url[0]) pw_path = FFmpeg.video_add_bgm(pw_path, bgm_mp3_path, file_path) logger.info(f"[处理] 片尾bgm添加成功") except Exception as e: logger.error(f"[处理] 片尾bgm添加失败") video_path = FFmpeg.h_b_video(video_path, pw_path, file_path) video_path = FFmpeg.single_video(video_path, file_path, data["video_share"]) if not os.path.exists(video_path) or os.path.getsize(video_path) == 0: data["transform_rule"] = "仅改造" RedisHelper().get_client().rpush(redis_name, json.dumps(data)) logger.error(f"[处理] 数据添加片中字幕失败") AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "改造失败,添加片中字幕失败", "3001", str(data)) text = ( f"**负责人**: {data['name']}\n" f"**内容**: {data}\n" f"**失败信息**: 视频片中增加字幕失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", "【 搬运&改造效率工具失败通知 】") return logger.info(f"[处理] 数据添加片中字幕成功") logger.info(f"[处理] 数据开始发送oss") oss_object_key = Oss.stitching_sync_upload_oss(video_path, str(uuid.uuid4())) # 视频发送OSS logger.info(f"[处理] 数据发送oss成功") oss_object_key = oss_object_key.get("oss_object_key") tags = ','.join(filter(None, [ data['pq_label'], channel_mark, tag_transport_channel, data['tag_transport_scene'], data['tag_transport_keyword'], "搬运改造", data['tag'] ])) cls.insert_pq(data, oss_object_key, title, tags, tag_transport_channel, channel_mark, "搬运改造", "MANUAL_TRANSPORT_TOOL_TRANSFORM") return except Exception as e: data["transform_rule"] = "仅改造" RedisHelper().get_client().rpush(redis_name, json.dumps(data)) logger.error(f"[+] 视频改造失败{e}") text = ( f"**负责人**: {data['name']}\n" f"**内容**: {data}\n" f"**失败信息**: 视频改造失败{e}\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", "【 搬运&改造效率工具失败通知 】") return @classmethod def run(cls): uid = str(uuid.uuid4()) file_path = os.path.join(CACHE_DIR, uid) logger.info(f"[处理] 开始获取redis数据") fs_data = os.getenv("FS_DATA") # fs_data = '周仙琴,2WIcBU,task:carry_data_redis_zxq' fs_data_list = fs_data.split(',') logger.info(f"[+] fs_data_list=={fs_data_list}") redis_name = fs_data_list[2] studio_key = fs_data_list[3] logger.info(f"[+] redis_name == {redis_name} studio_key == {studio_key}") data = RedisHelper().get_client().lpop(name=redis_name) logger.info(f"[+] 本次处理的数据=={data}") if not data: logger.info('[处理] 无待执行的扫描任务') return data = orjson.loads(data) try: cls.data_handle(data, file_path, redis_name,studio_key) for filename in os.listdir(CACHE_DIR): # 检查文件名是否包含关键字 if uid in filename: file_path = os.path.join(CACHE_DIR, filename) try: # 删除文件 os.remove(file_path) logger.info(f"已删除文件: {file_path}") except Exception as e: logger.error(f"删除文件时出错: {file_path}, 错误: {e}") return except Exception as e: RedisHelper().get_client().rpush(redis_name, json.dumps(data)) for filename in os.listdir(CACHE_DIR): # 检查文件名是否包含关键字 if uid in filename: file_path = os.path.join(CACHE_DIR, filename) try: # 删除文件 os.remove(file_path) logger.info(f"已删除文件: {file_path}") except Exception as e: logger.error(f"删除文件时出错: {file_path}, 错误: {e}") return def run(): scheduler = BlockingScheduler() try: logger.info(f"[处理] 开始启动") scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=1)) # 每1分钟启动一次 scheduler.start() except KeyboardInterrupt: pass except Exception as e: logger.error(f"[处理] 启动异常,异常信息:{e}") pass finally: scheduler.shutdown() if __name__ == '__main__': run()