import os import random import sys import time import uuid from datetime import datetime import orjson from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.interval import IntervalTrigger from loguru import logger sys.path.append('/app') from utils.redis import RedisHelper from utils.aliyun_log import AliyunLogger from utils.download_video import DownLoad from utils.feishu_form import Material from utils.feishu_utils import Feishu from utils.piaoquan import PQ from utils.sql_help import sqlCollect from utils.tag_video import Tag from channel.dy import DY from channel.dy_keyword import DyKeyword from channel.ks import KS from channel.ks_keyword import KsKeyword from channel.ks_xcx import KSXCX from utils.aliyun_oss import Oss from utils.ffmpeg import FFmpeg from utils.google_ai_studio import GoogleAI from utils.gpt4o_mini_help import GPT4oMini from utils.tts_help import TTS CACHE_DIR = '/app/cache/' # CACHE_DIR = '/Users/z/Downloads/' class ConsumptionRecommend(object): @classmethod def generate_title(cls, video, title): """ 生成新标题 """ if video['old_title']: new_title = video['old_title'].strip().replace("\n", "") \ .replace("/", "").replace("\\", "").replace("\r", "") \ .replace(":", "").replace("*", "").replace("?", "") \ .replace("?", "").replace('"', "").replace("<", "") \ .replace(">", "").replace("|", "").replace(" ", "") \ .replace("&NBSP", "").replace(".", "。").replace(" ", "") \ .replace("'", "").replace("#", "").replace("Merge", "") else: return '这个视频,分享给我的老友,祝愿您能幸福安康' if title == "原标题": if not new_title: new_title = '这个视频,分享给我的老友,祝愿您能幸福安康' elif title == "AI标题": if not new_title: new_title = '这个视频,分享给我的老友,祝愿您能幸福安康' else: new_title = GPT4oMini.get_ai_mini_title(new_title) else: titles = title.split('/') if '/' in title else [title] new_title = random.choice(titles) return new_title @classmethod def insert_pq(cls, task, oss_object_key, title, tags, fs_channel_name, video, voice, fs_mark): logger.info(f"[+] 开始写入票圈") n_id = str(task["pd_id"]) code = PQ.insert_piaoquantv(oss_object_key, title, n_id, None) if not code: logger.error(f"[机器改造] 写入票圈后台失败") text = ( f"**通知类型**: 写入票圈后台失败\n" f"**负责人**: {fs_channel_name}\n" f"**渠道**: {task['channel']}\n" f"**视频主页ID**: {task['channel_url']}\n" f"**视频Video_id**: {video['video_id']}\n" ) AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'], "视频下载失败", "3002", f"video_url:{video['video_url']}") Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703", "【 机器改造通知 】") logger.info(f"[机器改造] 写入票圈成功,返回视频id{code}") log_data = f"user:{task['channel']},,video_id:{video['video_id']},,video_url:{video['video_url']},,ai_title:{title},,voice:{voice}" AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'], "视频改造成功", "1000", log_data, str(code)) tag_status = Tag.video_tag(code, str(tags)) if tag_status == 0: logger.info(f"[机器改造] 写入标签成功,后台视频ID为{code}") current_time = datetime.now() formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S") sqlCollect.insert_machine_making_data(fs_channel_name, task['task_mark'], task['channel'], task['channel_url'], video['video_id'], n_id, title, code, formatted_time, title, oss_object_key) sqlCollect.insert_task(task['task_mark'], task['channel_url'], fs_mark, task['channel']) # 插入数据库 try: current_time = datetime.now() formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S") pq_url = f'https://admin.piaoquantv.com/cms/post-detail/{code}/detail' # 站内视频链接 if '历史抖音' == task['channel'] or '历史快手' == task['channel']: explain = "历史爆款" else: explain = "新供给" values = [ [ fs_channel_name, task['task_mark'], task['channel'], task['channel_url'], str(video['video_id']), str(n_id), video['old_title'], "AI标题", title, str(code), formatted_time, video["rule"], explain, voice, tags, task['keyword_name'], pq_url ] ] if fs_channel_name == "抖音品类账号": sheet = "905313" elif fs_channel_name == "快手品类账号": sheet = "JHVpNK" elif fs_channel_name == "抖音关键词搜索": sheet = "6nclDV" elif fs_channel_name == "快手关键词搜索": sheet = "PVd8nj" elif fs_channel_name == "快手小程序": sheet = "dEjDt1" elif fs_channel_name == "top溯源账号": sheet = "5tPopP" Feishu.insert_columns("L2KGsz5HzhDfyYtV9IRcLHjtnsg", sheet, "ROWS", 1, 2) time.sleep(0.5) Feishu.update_values("L2KGsz5HzhDfyYtV9IRcLHjtnsg", sheet, "A2:Z2", values) logger.info(f"[处理] 写入飞书成功") except Exception as e: logger.error(f"[处理] 写入飞书失败{e}") pass return @classmethod def get_channle_data(cls, task, fs_channel_name): channel = task['channel'] if channel == "抖音" or channel == "历史抖音": return DY.get_dy_list(task, fs_channel_name) elif channel == "快手" or channel == "历史快手": return KS.get_ks_list(task, fs_channel_name) elif channel == "抖音搜索": return DyKeyword.get_key_word(task, fs_channel_name) elif channel == "快手搜索": return KsKeyword.get_key_word(task, fs_channel_name) elif channel == '快手小程序': return KSXCX.get_xcx_date(task, fs_channel_name) @classmethod def data_handle(cls, task, file_path, fs_channel_name, ai_key, fs_mark): logger.info(f"[机器改造] 开始获取视频数据") data_list = cls.get_channle_data(task, fs_channel_name) if not data_list: logger.info(f"[+] {fs_channel_name}-{task['channel_url']}获取视频完成,没有符合/改造视频") AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], "", "无改造视频", "4000") text = ( f"**通知类型**: 没有改造的视频\n" f"**负责人**: {fs_channel_name}\n" f"**渠道**: {task['channel']}\n" f"**视频主页ID**: {task['channel_url']}\n" ) Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703", "【 机器改造通知 】") return for video in data_list: file_path = file_path + str(uuid.uuid4()) logger.info(f"[机器改造] {video['video_url']}视频开始下载") video_path = DownLoad.download_video(video["video_url"], file_path, task['channel'], video["video_id"]) logger.info(f"[机器改造] {video['video_url']}视频下载成功") if not os.path.exists(video_path) or os.path.getsize(video_path) == 0: text = ( f"**通知类型**: 视频下载失败\n" f"**负责人**: {fs_channel_name}\n" f"**渠道**: {task['channel']}\n" f"**视频主页ID**: {task['channel_url']}\n" f"**视频Video_id**: {video['video_id']}\n" ) AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'], "视频下载失败", "3002", f"video_url:{video['video_url']}") Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703", "【 机器改造通知 】") continue logger.info(f"[机器改造] 视频更改分辨率处理") width, height = FFmpeg.get_w_h_size(video_path) if width < height: # 判断是否需要修改为竖屏 video_path = FFmpeg.update_video_h_w(video_path, file_path) if task["crop_tool"]: # 判断是否需要裁剪 video_path = FFmpeg.video_crop(video_path, file_path) if task["gg_duration"]: # 判断是否需要指定视频时长 video_path = FFmpeg.video_ggduration(video_path, file_path, task["gg_duration"]) video_path = FFmpeg.video_640(video_path, file_path) logger.info(f"[机器改造] 视频更改分辨率处理成功") video_text = GoogleAI.run(ai_key, video_path) if not video_text: text = ( f"**通知类型**: 获取口播文案失败\n" f"**负责人**: {fs_channel_name}\n" f"**渠道**: {task['channel']}\n" f"**视频主页ID**: {task['channel_url']}\n" f"**视频Video_id**: {video['video_id']}\n" ) AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'], "获取口播文案失败", "3002", f"video_url:{video['video_url']}") Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703", "【 机器改造通知 】") continue pw_srt_text = GPT4oMini.get_content_understanding_pw(video_text) voice = task["voice"] if voice: if ',' in voice: voices = voice.split(',') else: voices = [voice] voice = random.choice(voices) else: voice = "zhifeng_emo" pw_url = TTS.get_pw_zm(pw_srt_text, voice) if not pw_url: text = ( f"**通知类型**: 获取片尾引导失败\n" f"**负责人**: {fs_channel_name}\n" f"**渠道**: {task['channel']}\n" f"**视频主页ID**: {task['channel_url']}\n" f"**视频Video_id**: {video['video_id']}\n" ) AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'], "获取片尾引导失败", "3002", f"video_url:{video['video_url']}") Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703", "【 机器改造通知 】") continue pw_srt = TTS.getSrt(pw_url) if not pw_srt: text = ( f"**通知类型**: 获取片尾引导失败\n" f"**负责人**: {fs_channel_name}\n" f"**渠道**: {task['channel']}\n" f"**视频主页ID**: {task['channel_url']}\n" f"**视频Video_id**: {video['video_id']}\n" ) AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'], "获取片尾引导失败", "3002", f"video_url:{video['video_url']}") Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703", "【 机器改造通知 】") continue logger.info(f"[机器改造] 开始下载音频") pw_mp3_path = TTS.download_mp3(pw_url, file_path) if not pw_mp3_path: text = ( f"**通知类型**: 片尾音频下载失败\n" f"**负责人**: {fs_channel_name}\n" f"**渠道**: {task['channel']}\n" f"**视频主页ID**: {task['channel_url']}\n" f"**视频Video_id**: {video['video_id']}\n" ) AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'], "片尾音频下载失败", "3002", f"video_url:{video['video_url']}") Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703", "【 机器改造通知 】") continue logger.info(f"[机器改造] 片尾下载成功") jpg_path = FFmpeg.video_png(video_path, file_path) # 生成视频最后一帧jpg pw_path = FFmpeg.pw_video(jpg_path, file_path, pw_mp3_path, pw_srt) # 生成片尾视频 if not os.path.exists(pw_path) or os.path.getsize(pw_path) == 0: text = ( f"**通知类型**: 生成片尾视频失败\n" f"**负责人**: {fs_channel_name}\n" f"**渠道**: {task['channel']}\n" f"**视频主页ID**: {task['channel_url']}\n" f"**视频Video_id**: {video['video_id']}\n" ) AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'], "生成片尾视频失败", "3002", f"video_url:{video['video_url']}") Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703", "【 机器改造通知 】") continue logger.info(f"[机器改造] 合并开始拼接") video_path = FFmpeg.h_b_video(video_path, pw_path, file_path) zm = Material.get_pzsrt_data("summary", "500Oe0", task["video_share"]) video_path = FFmpeg.single_video(video_path, file_path, zm) if not os.path.exists(video_path) or os.path.getsize(video_path) == 0: text = ( f"**通知类型**: 合并拼接失败\n" f"**负责人**: {fs_channel_name}\n" f"**渠道**: {task['channel']}\n" f"**视频主页ID**: {task['channel_url']}\n" f"**视频Video_id**: {video['video_id']}\n" ) AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'], "合并拼接失败", "3002", f"video_url:{video['video_url']}") Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703", "【 机器改造通知 】") continue logger.info(f"[机器改造] 视频-开始发送oss") oss_object_key = Oss.stitching_sync_upload_oss(video_path, str(uuid.uuid4())) # 视频发送OSS logger.info(f"[机器改造] 数据发送oss成功") oss_object_key = oss_object_key.get("oss_object_key") base_tags = [task['tag'], f"一级品类_{task['first_category']}",f"来源_{fs_channel_name}"] tags = ','.join(filter(None, base_tags)) title = cls.generate_title(video, task["ai_title"]) cls.insert_pq(task, oss_object_key,title, tags, fs_channel_name, video, voice, fs_mark) @classmethod def run(cls): uid = str(uuid.uuid4()) file_path = os.path.join(CACHE_DIR, uid) logger.info(f"[机器改造] 开始获取redis数据") fs_data = os.getenv("FS_DATA") # fs_data = '快手关键词搜索,ks-gjc,B65Gs3bCHhIzj0t7KGWcwZD0nGf,91wp7k,AIzaSyBiwivvBKfqDsxvqAKBrCZyk-wFMhfthXg' try: fs_data_list = fs_data.split(',') fs_channel_name = fs_data_list[0] fs_mark = fs_data_list[1] fs_id = fs_data_list[2] fs_sheet = fs_data_list[3] ai_key = fs_data_list[4] data = RedisHelper().get_client().rpop(name=f"task:{fs_mark}") if not data: logger.info('[机器改造] 无待执行的扫描任务,重新获取飞书表格读取任务') fs_data = Material.get_carry_data(fs_id, fs_sheet) RedisHelper().get_client().rpush(f"task:{fs_mark}", *fs_data) return task = orjson.loads(data) totality_count = task['totality_count'] if totality_count: task_totality_count = sqlCollect.get_mark_count(task["task_mark"]) if int(totality_count) <= int(task_totality_count[0][0]): AliyunLogger.logging((task["channel"]), fs_channel_name, task["channel_url"], '', f"{task['task_mark']}标识任务每日指定条数已足够,指定条数{totality_count},实际生成条数{int(task_totality_count[0][0])}", "1111") logger.info( f"[+] {task['task_mark']}标识任务每日指定条数已足够,指定条数{totality_count},实际生成条数{int(task_totality_count[0][0])}") return if fs_channel_name in ["快手关键词搜索","抖音关键词搜索"]: gjc_count = RedisHelper().get_client().get(f"{fs_mark}_count") RedisHelper().get_client().incrby(f"{fs_mark}_count", 1) if int(gjc_count.decode()) >= 300: logger.info(f"[机器改造] {fs_channel_name}上限") return cls.data_handle(task, file_path, fs_channel_name, ai_key, fs_mark) for filename in os.listdir(CACHE_DIR): # 检查文件名是否包含关键字 if uid in filename: file_path = os.path.join(CACHE_DIR, filename) try: # 删除文件 os.remove(file_path) logger.info(f"[机器改造] 已删除文件: {file_path}") except Exception as e: logger.error(f"[机器改造] 删除文件时出错: {file_path}, 错误: {e}") return except Exception as e: for filename in os.listdir(CACHE_DIR): # 检查文件名是否包含关键字 if uid in filename: file_path = os.path.join(CACHE_DIR, filename) try: # 删除文件 os.remove(file_path) logger.info(f"[机器改造] 已删除文件: {file_path}") except Exception as e: logger.error(f"[机器改造] 删除文件时出错: {file_path}, 错误: {e}") return def run(): scheduler = BlockingScheduler() try: logger.info(f"[机器改造] 开始启动") scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=1)) # 每1分钟启动一次 scheduler.start() except KeyboardInterrupt: pass except Exception as e: logger.error(f"[机器改造] 启动异常,异常信息:{e}") pass finally: scheduler.shutdown() if __name__ == '__main__': run()