import os import sys import time import uuid from datetime import datetime import orjson from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.interval import IntervalTrigger from loguru import logger sys.path.append('/app') from utils.redis import RedisHelper from utils.aliyun_log import AliyunLogger from utils.aliyun_oss import Oss from utils.download_video import DownLoad from utils.feishu_utils import Feishu from utils.ffmpeg import FFmpeg from utils.google_ai_studio import GoogleAI from utils.gpt4o_mini_help import GPT4oMini from utils.piaoquan import PQ from utils.sql_help import sqlCollect from utils.tag_video import Tag from utils.tts_help import TTS CACHE_DIR = '/app/cache/' # CACHE_DIR = '/Users/z/Downloads/' class ConsumptionRecommend(object): @classmethod def insert_pq(cls, data, oss_object_key, title, cover): logger.info(f"[内容分析] 开始写入票圈") code = PQ.insert_piaoquantv(oss_object_key, title, '50322062', cover) if not code: logger.error(f"[内容分析] 写入票圈后台失败") text = ( f"**渠道**: {data['channel']}\n" f"**内容**: {data}\n" f"**失败信息**: 写入票圈后台失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", f"【 内容理解-{data['channel']}失败通知 】") return logger.info(f"[内容分析] 写入票圈成功,返回视频id{code}") tag_status = Tag.video_tag(code, "lev-供给,rol-机器,#str-搬运改造内容理解引导语实验_60") Tag.video_tag(data["videoid"], "lev-供给,rol-机器,#str-搬运改造内容理解引导语base_61") if tag_status == 0: logger.info(f"[内容分析] 写入标签成功,后台视频ID为{code}") try: current_time = datetime.now() formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S") sqlCollect.insert_machine_making_data(data["channel"], data["channel"], data["channel"], data["videoid"], data["videoid"], "50322062", title, code, formatted_time, title, oss_object_key) pq_url = f'https://admin.piaoquantv.com/cms/post-detail/{code}/detail' # 站内视频链接 values = [ [ data["videoid"], code, data["channel"], data["dt"], formatted_time, pq_url ] ] Feishu.insert_columns("R4dLsce8Jhz9oCtDMr9ccpFHnbI", '1Ycd37', "ROWS", 1, 2) time.sleep(0.5) Feishu.update_values("R4dLsce8Jhz9oCtDMr9ccpFHnbI", '1Ycd37', "A2:Z2", values) logger.info(f"[内容分析] 写入飞书成功") return except Exception as e: logger.error(f"[内容分析] 写入飞书失败{e}") return @classmethod def data_handle(cls, data, file_path): video_id = data["videoid"] AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "扫描到一条视频", "2001", str(data)) AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "符合规则等待改造", "2004", str(data)) logger.info(f"[内容分析] 获取{video_id}的视频链接") video_path, cover_path, old_title = PQ.get_pq_oss_path(video_id) if not video_path: AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "没有获取到视频链接", "3001", str(data)) text = ( f"**渠道**: {data['channel']}\n" f"**内容**: {data}\n" f"**失败信息**: 没有获取到视频链接\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", f"【 内容理解-{data['channel']}失败通知 】") return video_url = f"http://rescdn.yishihui.com/{video_path}" video_path = DownLoad.download_video(video_url, file_path, '', video_id) if not os.path.exists(video_path) or os.path.getsize(video_path) == 0: logger.error(f"[内容分析] {video_url}下载失败") AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "视频下载失败等待重新处理", "3002", str(data)) text = ( f"**渠道**: {data['channel']}\n" f"**内容**: {data}\n" f"**失败信息**: 视频下载失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", f"【 内容理解-{data['channel']}失败通知 】") return logger.info(f"[内容分析] {video_url}视频下载成功") logger.info(f"[内容分析] {video_url}开始处理标题") video_path = FFmpeg.video_640(video_path, file_path) logger.info(f"[内容分析] 视频更改分辨率处理成功") logger.info(f"[内容分析] 片尾引导-开始获取视频口播内容") video_text = GoogleAI.run("AIzaSyCor0q5w37Dy6fGxloLlCT7KqyEFU3PWP8", video_path) if not video_text: AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "片尾引导,获取口播文案失败", "3003", str(data)) text = ( f"**渠道**: {data['channel']}\n" f"**内容**: {data}\n" f"**失败信息**: 获取口播文案失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", f"【 内容理解-{data['channel']}失败通知 】") return logger.info(f"[内容分析] 片尾引导-开始获取AI片尾") pw_srt_text = GPT4oMini.get_content_understanding_pw(video_text) pw_url = TTS.get_pw_zm(pw_srt_text, 'zhifeng_emo') if not pw_url: logger.error(f"[内容分析] 片尾引导-片尾获取失败") data["transform_rule"] = "仅改造" AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "片尾引导,片尾获取失败", "3003", str(data)) text = ( f"**渠道**: {data['channel']}\n" f"**内容**: {data}\n" f"**失败信息**: 片尾获取失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", f"【 内容理解-{data['channel']}失败通知 】") return logger.info(f"[内容分析] 片尾引导-片尾获取成功") pw_srt = TTS.getSrt(pw_url) if not pw_srt: AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "片尾引导,片尾音频获取失败", "3003", str(data)) text = ( f"**渠道**: {data['channel']}\n" f"**内容**: {data}\n" f"**失败信息**: 片尾音频获取失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", f"【 内容理解-{data['channel']}失败通知 】") return pw_mp3_path = TTS.download_mp3(pw_url, file_path) if not pw_mp3_path: AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "片尾引导,片尾音频下载失败", "3003", str(data)) text = ( f"**渠道**: {data['channel']}\n" f"**内容**: {data}\n" f"**失败信息**: 片尾音频下载失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", f"【 内容理解-{data['channel']}失败通知 】") return logger.info(f"[内容分析] 片尾引导-片尾音频下载成功") logger.info(f"[内容分析] 片尾引导-片尾获取最后一帧成功") jpg_path = FFmpeg.video_png(video_path, file_path) # 生成视频最后一帧jpg pw_path = FFmpeg.pw_video(jpg_path, file_path, pw_mp3_path, pw_srt) # 生成片尾视频 if not os.path.exists(pw_path) or os.path.getsize(pw_path) == 0: logger.error(f"[内容分析] 片尾引导-片尾拼接失败") AliyunLogger.logging(data["type"], "片尾引导", "", data["video_url"], "片尾引导,片尾拼接失败", "3003", str(data)) text = ( f"**渠道**: {data['channel']}\n" f"**内容**: {data}\n" f"**失败信息**: 片尾拼接失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", f"【 内容理解-{data['channel']}失败通知 】") return logger.info(f"[内容分析] 片尾引导-合并开始拼接") video_path = FFmpeg.h_b_video(video_path, pw_path, file_path) if not os.path.exists(video_path) or os.path.getsize(video_path) == 0: logger.error(f"[内容分析] 片尾引导-添加片尾失败") text = ( f"**渠道**: {data['channel']}\n" f"**内容**: {data}\n" f"**失败信息**: 添加片尾失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", f"【 内容理解-{data['channel']}失败通知 】") return logger.info(f"[内容分析] 片尾引导-开始发送oss") oss_object_key = Oss.stitching_sync_upload_oss(video_path, str(uuid.uuid4())) # 视频发送OSS status = oss_object_key.get("status") if status != 200: logger.error(f"[内容分析] 片尾引导-发送oss失败") AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "片尾引导,发送oss失败", "3003", str(data)) text = ( f"**渠道**: {data['channel']}\n" f"**内容**: {data}\n" f"**失败信息**: 发送oss失败\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd", f"【 内容理解-{data['channel']}失败通知 】") return logger.info(f"[内容分析] 片尾引导-发送oss成功") oss_object_key = oss_object_key.get("oss_object_key") cls.insert_pq(data, oss_object_key, old_title, cover_path) return @classmethod def run(cls): logger.info(f"[内容分析] 开始获取redis数据") data = RedisHelper().get_client().rpop(name = "task:carry_redis_by_nrfx") if not data: logger.info('[内容分析] 无待执行的扫描任务') return data = orjson.loads(data) uid = str(uuid.uuid4()) file_path = os.path.join(CACHE_DIR, uid) try: cls.data_handle(data, file_path) for filename in os.listdir(CACHE_DIR): # 检查文件名是否包含关键字 if uid in filename: file_path = os.path.join(CACHE_DIR, filename) try: # 删除文件 os.remove(file_path) logger.info(f"已删除文件: {file_path}") except Exception as e: logger.error(f"删除文件时出错: {file_path}, 错误: {e}") return except Exception as e: for filename in os.listdir(CACHE_DIR): # 检查文件名是否包含关键字 if uid in filename: file_path = os.path.join(CACHE_DIR, filename) try: # 删除文件 os.remove(file_path) logger.info(f"已删除文件: {file_path}") except Exception as e: logger.error(f"删除文件时出错: {file_path}, 错误: {e}") return def run(): scheduler = BlockingScheduler() try: logger.info(f"[内容分析] 开始启动") scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=5)) # 每5分钟启动一次 scheduler.start() except KeyboardInterrupt: pass except Exception as e: logger.error(f"[内容分析] 启动异常,异常信息:{e}") pass finally: scheduler.shutdown() if __name__ == '__main__': run()