import os import sys import uuid from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.interval import IntervalTrigger from loguru import logger from utils.gpt4o_mini_help import GPT4oMini from utils.tts_help import TTS # sys.path.append('/app') from utils.download_video import DownLoad from utils.sql_help import sqlCollect from utils.aliyun_oss import Oss from utils.ffmpeg import FFmpeg # CACHE_DIR = '/app/cache/' # CACHE_DIR = '/Users/z/Downloads/' CACHE_DIR = '/root/long_text_video_transform/path/' class ConsumptionRecommend(object): @classmethod def data_handle(cls, oss_url, title, file_path): video_url = f"https://rescdn.yishihui.com/{oss_url}" logger.info(f"[长文] {video_url}视频开始下载") video_path = DownLoad.download_video(video_url, file_path) if not os.path.exists(video_path) or os.path.getsize(video_path) == 0: """下载失败修改状态""" sqlCollect.update_oss_path_status(99, oss_url) return logger.info(f"[长文] {video_url}视频下载成功") width, height = FFmpeg.get_w_h_size(video_path) if width < height: # 判断是否需要修改为竖屏 logger.info(f"[长文] {video_url}视频修改为竖屏") video_path = FFmpeg.update_video_h_w(video_path, file_path) logger.info(f"[长文] 视频更改分辨率处理") video_path = FFmpeg.video_640(video_path, file_path) logger.info(f"[长文] 视频更改分辨率处理成功") pw_srt_text = GPT4oMini.get_ai_mini_pw(title) pw_url = TTS.get_pw_zm(pw_srt_text, "zhifeng_emo") if not pw_url: """获取片尾引导失败""" sqlCollect.update_oss_path_status(99, oss_url) return pw_srt = TTS.getSrt(pw_url) if not pw_srt: """获取片尾引导失败""" sqlCollect.update_oss_path_status(99, oss_url) return logger.info(f"[长文] 开始下载音频") pw_mp3_path = TTS.download_mp3(pw_url, file_path) if not pw_mp3_path: """片尾音频下载失败""" sqlCollect.update_oss_path_status(99, oss_url) return logger.info(f"[长文] 片尾下载成功") jpg_path = FFmpeg.video_png(video_path, file_path) # 生成视频最后一帧jpg pw_path = FFmpeg.pw_video(jpg_path, file_path, pw_mp3_path, pw_srt) # 生成片尾视频 if not os.path.exists(pw_path) or os.path.getsize(pw_path) == 0: """片尾生成失败""" sqlCollect.update_oss_path_status(99, oss_url) return logger.info(f"[长文] 合并开始拼接") video_path = FFmpeg.h_b_video(video_path, pw_path, file_path) zm = "温馨提示:\n点击下方按钮,传递好运" video_path = FFmpeg.single_video(video_path, file_path, zm) if not os.path.exists(video_path) or os.path.getsize(video_path) == 0: """视频合并失败""" sqlCollect.update_oss_path_status(99, oss_url) return logger.info(f"[长文] 视频-开始发送oss") oss_object_key = Oss.stitching_sync_upload_oss(video_path, str(uuid.uuid4())) # 视频发送OSS logger.info(f"[长文] 数据发送oss成功") oss_object_key = oss_object_key.get("oss_object_key") logger.info(f"[长文] oss新地址写入数据") sqlCollect.add_new_oss_path(oss_object_key,oss_url) logger.info(f"[长文] oss新地址写入成功") logger.info(f"[长文] 修改数据库状态") sqlCollect.update_oss_path_status(2, oss_url) logger.info(f"[长文] 修改数据库状态成功") return @classmethod def run(cls): uid = str(uuid.uuid4()) file_path = os.path.join(CACHE_DIR, uid) logger.info(f"[长文] 查询数据") data = sqlCollect.get_oss_path() title = data[0][0] oss_url = data[0][1] if not oss_url: logger.info(f"[长文] 无待处理的数据") return try: sqlCollect.update_oss_path_status(1, oss_url) cls.data_handle(oss_url, title, file_path) for filename in os.listdir(CACHE_DIR): # 检查文件名是否包含关键字 if uid in filename: file_path = os.path.join(CACHE_DIR, filename) try: # 删除文件 os.remove(file_path) logger.info(f"[长文] 已删除文件: {file_path}") except Exception as e: logger.error(f"[长文] 删除文件时出错: {file_path}, 错误: {e}") return except Exception as e: for filename in os.listdir(CACHE_DIR): # 检查文件名是否包含关键字 if uid in filename: file_path = os.path.join(CACHE_DIR, filename) try: # 删除文件 os.remove(file_path) logger.info(f"[长文] 已删除文件: {file_path}") except Exception as e: logger.error(f"[长文] 删除文件时出错: {file_path}, 错误: {e}") return # # def run(): # scheduler = BlockingScheduler() # try: # logger.info(f"[长文] 开始启动") # scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=1)) # 每1分钟启动一次 # scheduler.start() # except KeyboardInterrupt: # pass # except Exception as e: # logger.error(f"[长文] 启动异常,异常信息:{e}") # pass # finally: # scheduler.shutdown() # # # if __name__ == '__main__': # ConsumptionRecommend.run()