123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421 |
- import os
- import random
- import sys
- import time
- import uuid
- from datetime import datetime
- import orjson
- from apscheduler.schedulers.blocking import BlockingScheduler
- from apscheduler.triggers.interval import IntervalTrigger
- from loguru import logger
- sys.path.append('/app')
- from utils.redis import RedisHelper
- from utils.aliyun_log import AliyunLogger
- from utils.download_video import DownLoad
- from utils.feishu_form import Material
- from utils.feishu_utils import Feishu
- from utils.piaoquan import PQ
- from utils.sql_help import sqlCollect
- from utils.tag_video import Tag
- from channel.dy import DY
- from channel.dy_keyword import DyKeyword
- from channel.ks import KS
- from channel.ks_keyword import KsKeyword
- from channel.ks_xcx import KSXCX
- from utils.aliyun_oss import Oss
- from utils.ffmpeg import FFmpeg
- from utils.google_ai_studio import GoogleAI
- from utils.gpt4o_mini_help import GPT4oMini
- from utils.tts_help import TTS
- CACHE_DIR = '/app/cache/'
- # CACHE_DIR = '/Users/z/Downloads/'
- class ConsumptionRecommend(object):
- @classmethod
- def generate_title(cls, video, title):
- """
- 生成新标题
- """
- if video['old_title']:
- new_title = video['old_title'].strip().replace("\n", "") \
- .replace("/", "").replace("\\", "").replace("\r", "") \
- .replace(":", "").replace("*", "").replace("?", "") \
- .replace("?", "").replace('"', "").replace("<", "") \
- .replace(">", "").replace("|", "").replace(" ", "") \
- .replace("&NBSP", "").replace(".", "。").replace(" ", "") \
- .replace("'", "").replace("#", "").replace("Merge", "")
- else:
- return '这个视频,分享给我的老友,祝愿您能幸福安康'
- if title == "原标题":
- if not new_title:
- new_title = '这个视频,分享给我的老友,祝愿您能幸福安康'
- elif title == "AI标题":
- if not new_title:
- new_title = '这个视频,分享给我的老友,祝愿您能幸福安康'
- else:
- new_title = GPT4oMini.get_ai_mini_title(new_title)
- else:
- titles = title.split('/') if '/' in title else [title]
- new_title = random.choice(titles)
- return new_title
- @classmethod
- def insert_pq(cls, task, oss_object_key, title, tags, fs_channel_name, video, voice, fs_mark):
- logger.info(f"[+] 开始写入票圈")
- n_id = str(task["pd_id"])
- code = PQ.insert_piaoquantv(oss_object_key, title, n_id, None)
- if not code:
- logger.error(f"[机器改造] 写入票圈后台失败")
- text = (
- f"**通知类型**: 写入票圈后台失败\n"
- f"**负责人**: {fs_channel_name}\n"
- f"**渠道**: {task['channel']}\n"
- f"**视频主页ID**: {task['channel_url']}\n"
- f"**视频Video_id**: {video['video_id']}\n"
- )
- AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'],
- "视频下载失败", "3002", f"video_url:{video['video_url']}")
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
- "【 机器改造通知 】")
- logger.info(f"[机器改造] 写入票圈成功,返回视频id{code}")
- log_data = f"user:{task['channel']},,video_id:{video['video_id']},,video_url:{video['video_url']},,ai_title:{title},,voice:{voice}"
- AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'], "视频改造成功", "1000", log_data, str(code))
- tag_status = Tag.video_tag(code, str(tags))
- if tag_status == 0:
- logger.info(f"[机器改造] 写入标签成功,后台视频ID为{code}")
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
- sqlCollect.insert_machine_making_data(fs_channel_name, task['task_mark'], task['channel'], task['channel_url'], video['video_id'], n_id, title, code,
- formatted_time, title, oss_object_key)
- sqlCollect.insert_task(task['task_mark'], task['channel_url'], fs_mark, task['channel']) # 插入数据库
- try:
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
- pq_url = f'https://admin.piaoquantv.com/cms/post-detail/{code}/detail' # 站内视频链接
- if '历史抖音' == task['channel'] or '历史快手' == task['channel']:
- explain = "历史爆款"
- else:
- explain = "新供给"
- values = [
- [
- fs_channel_name,
- task['task_mark'],
- task['channel'],
- task['channel_url'],
- str(video['video_id']),
- str(n_id),
- video['old_title'],
- "AI标题",
- title,
- str(code),
- formatted_time,
- video["rule"],
- explain,
- voice,
- tags,
- task['keyword_name'],
- pq_url
- ]
- ]
- if fs_channel_name == "抖音品类账号":
- sheet = "905313"
- elif fs_channel_name == "快手品类账号":
- sheet = "JHVpNK"
- elif fs_channel_name == "抖音关键词搜索":
- sheet = "6nclDV"
- elif fs_channel_name == "快手关键词搜索":
- sheet = "PVd8nj"
- elif fs_channel_name == "快手小程序":
- sheet = "dEjDt1"
- elif fs_channel_name == "top溯源账号":
- sheet = "5tPopP"
- Feishu.insert_columns("L2KGsz5HzhDfyYtV9IRcLHjtnsg", sheet, "ROWS", 1, 2)
- time.sleep(0.5)
- Feishu.update_values("L2KGsz5HzhDfyYtV9IRcLHjtnsg", sheet, "A2:Z2", values)
- logger.info(f"[处理] 写入飞书成功")
- except Exception as e:
- logger.error(f"[处理] 写入飞书失败{e}")
- pass
- return
- @classmethod
- def get_channle_data(cls, task, fs_channel_name):
- channel = task['channel']
- if channel == "抖音" or channel == "历史抖音":
- return DY.get_dy_list(task, fs_channel_name)
- elif channel == "快手" or channel == "历史快手":
- return KS.get_ks_list(task, fs_channel_name)
- elif channel == "抖音搜索":
- return DyKeyword.get_key_word(task, fs_channel_name)
- elif channel == "快手搜索":
- return KsKeyword.get_key_word(task, fs_channel_name)
- elif channel == '快手小程序':
- return KSXCX.get_xcx_date(task, fs_channel_name)
- @classmethod
- def data_handle(cls, task, file_path, fs_channel_name, ai_key, fs_mark):
- logger.info(f"[机器改造] 开始获取视频数据")
- data_list = cls.get_channle_data(task, fs_channel_name)
- if not data_list:
- logger.info(f"[+] {fs_channel_name}-{task['channel_url']}获取视频完成,没有符合/改造视频")
- AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], "", "无改造视频", "4000")
- text = (
- f"**通知类型**: 没有改造的视频\n"
- f"**负责人**: {fs_channel_name}\n"
- f"**渠道**: {task['channel']}\n"
- f"**视频主页ID**: {task['channel_url']}\n"
- )
- Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
- "【 机器改造通知 】")
- return
- for video in data_list:
- file_path = file_path + str(uuid.uuid4())
- logger.info(f"[机器改造] {video['video_url']}视频开始下载")
- video_path = DownLoad.download_video(video["video_url"], file_path, task['channel'], video["video_id"])
- logger.info(f"[机器改造] {video['video_url']}视频下载成功")
- if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
- text = (
- f"**通知类型**: 视频下载失败\n"
- f"**负责人**: {fs_channel_name}\n"
- f"**渠道**: {task['channel']}\n"
- f"**视频主页ID**: {task['channel_url']}\n"
- f"**视频Video_id**: {video['video_id']}\n"
- )
- AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'], "视频下载失败", "3002", f"video_url:{video['video_url']}")
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
- "【 机器改造通知 】")
- continue
- logger.info(f"[机器改造] 视频更改分辨率处理")
- width, height = FFmpeg.get_w_h_size(video_path)
- if width < height: # 判断是否需要修改为竖屏
- video_path = FFmpeg.update_video_h_w(video_path, file_path)
- if task["crop_tool"]: # 判断是否需要裁剪
- video_path = FFmpeg.video_crop(video_path, file_path)
- if task["gg_duration"]: # 判断是否需要指定视频时长
- video_path = FFmpeg.video_ggduration(video_path, file_path, task["gg_duration"])
- video_path = FFmpeg.video_640(video_path, file_path)
- logger.info(f"[机器改造] 视频更改分辨率处理成功")
- video_text = GoogleAI.run(ai_key, video_path)
- if not video_text:
- text = (
- f"**通知类型**: 获取口播文案失败\n"
- f"**负责人**: {fs_channel_name}\n"
- f"**渠道**: {task['channel']}\n"
- f"**视频主页ID**: {task['channel_url']}\n"
- f"**视频Video_id**: {video['video_id']}\n"
- )
- AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'],
- "获取口播文案失败", "3002", f"video_url:{video['video_url']}")
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
- "【 机器改造通知 】")
- continue
- pw_srt_text = GPT4oMini.get_content_understanding_pw(video_text)
- voice = task["voice"]
- if voice:
- if ',' in voice:
- voices = voice.split(',')
- else:
- voices = [voice]
- voice = random.choice(voices)
- else:
- voice = "zhifeng_emo"
- pw_url = TTS.get_pw_zm(pw_srt_text, voice)
- if not pw_url:
- text = (
- f"**通知类型**: 获取片尾引导失败\n"
- f"**负责人**: {fs_channel_name}\n"
- f"**渠道**: {task['channel']}\n"
- f"**视频主页ID**: {task['channel_url']}\n"
- f"**视频Video_id**: {video['video_id']}\n"
- )
- AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'],
- "获取片尾引导失败", "3002", f"video_url:{video['video_url']}")
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
- "【 机器改造通知 】")
- continue
- pw_srt = TTS.getSrt(pw_url)
- if not pw_srt:
- text = (
- f"**通知类型**: 获取片尾引导失败\n"
- f"**负责人**: {fs_channel_name}\n"
- f"**渠道**: {task['channel']}\n"
- f"**视频主页ID**: {task['channel_url']}\n"
- f"**视频Video_id**: {video['video_id']}\n"
- )
- AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'],
- "获取片尾引导失败", "3002", f"video_url:{video['video_url']}")
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
- "【 机器改造通知 】")
- continue
- logger.info(f"[机器改造] 开始下载音频")
- pw_mp3_path = TTS.download_mp3(pw_url, file_path)
- if not pw_mp3_path:
- text = (
- f"**通知类型**: 片尾音频下载失败\n"
- f"**负责人**: {fs_channel_name}\n"
- f"**渠道**: {task['channel']}\n"
- f"**视频主页ID**: {task['channel_url']}\n"
- f"**视频Video_id**: {video['video_id']}\n"
- )
- AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'],
- "片尾音频下载失败", "3002", f"video_url:{video['video_url']}")
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
- "【 机器改造通知 】")
- continue
- logger.info(f"[机器改造] 片尾下载成功")
- jpg_path = FFmpeg.video_png(video_path, file_path) # 生成视频最后一帧jpg
- pw_path = FFmpeg.pw_video(jpg_path, file_path, pw_mp3_path, pw_srt) # 生成片尾视频
- if not os.path.exists(pw_path) or os.path.getsize(pw_path) == 0:
- text = (
- f"**通知类型**: 生成片尾视频失败\n"
- f"**负责人**: {fs_channel_name}\n"
- f"**渠道**: {task['channel']}\n"
- f"**视频主页ID**: {task['channel_url']}\n"
- f"**视频Video_id**: {video['video_id']}\n"
- )
- AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'],
- "生成片尾视频失败", "3002", f"video_url:{video['video_url']}")
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
- "【 机器改造通知 】")
- continue
- logger.info(f"[机器改造] 合并开始拼接")
- video_path = FFmpeg.h_b_video(video_path, pw_path, file_path)
- zm = Material.get_pzsrt_data("summary", "500Oe0", task["video_share"])
- video_path = FFmpeg.single_video(video_path, file_path, zm)
- if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
- text = (
- f"**通知类型**: 合并拼接失败\n"
- f"**负责人**: {fs_channel_name}\n"
- f"**渠道**: {task['channel']}\n"
- f"**视频主页ID**: {task['channel_url']}\n"
- f"**视频Video_id**: {video['video_id']}\n"
- )
- AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'],
- "合并拼接失败", "3002", f"video_url:{video['video_url']}")
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
- "【 机器改造通知 】")
- continue
- logger.info(f"[机器改造] 视频-开始发送oss")
- oss_object_key = Oss.stitching_sync_upload_oss(video_path, str(uuid.uuid4())) # 视频发送OSS
- logger.info(f"[机器改造] 数据发送oss成功")
- oss_object_key = oss_object_key.get("oss_object_key")
- base_tags = [task['tag'], f"一级品类_{task['first_category']}",f"来源_{fs_channel_name}"]
- tags = ','.join(filter(None, base_tags))
- title = cls.generate_title(video, task["ai_title"])
- cls.insert_pq(task, oss_object_key,title, tags, fs_channel_name, video, voice, fs_mark)
- @classmethod
- def run(cls):
- uid = str(uuid.uuid4())
- file_path = os.path.join(CACHE_DIR, uid)
- logger.info(f"[机器改造] 开始获取redis数据")
- fs_data = os.getenv("FS_DATA")
- # fs_data = '快手关键词搜索,ks-gjc,B65Gs3bCHhIzj0t7KGWcwZD0nGf,91wp7k,AIzaSyBiwivvBKfqDsxvqAKBrCZyk-wFMhfthXg'
- try:
- fs_data_list = fs_data.split(',')
- fs_channel_name = fs_data_list[0]
- fs_mark = fs_data_list[1]
- fs_id = fs_data_list[2]
- fs_sheet = fs_data_list[3]
- ai_key = fs_data_list[4]
- data = RedisHelper().get_client().rpop(name=f"task:{fs_mark}")
- if not data:
- logger.info('[机器改造] 无待执行的扫描任务,重新获取飞书表格读取任务')
- fs_data = Material.get_carry_data(fs_id, fs_sheet)
- RedisHelper().get_client().rpush(f"task:{fs_mark}", *fs_data)
- return
- task = orjson.loads(data)
- totality_count = task['totality_count']
- if totality_count:
- task_totality_count = sqlCollect.get_mark_count(task["task_mark"])
- if int(totality_count) <= int(task_totality_count[0][0]):
- AliyunLogger.logging((task["channel"]), fs_channel_name, task["channel_url"], '',
- f"{task['task_mark']}标识任务每日指定条数已足够,指定条数{totality_count},实际生成条数{int(task_totality_count[0][0])}",
- "1111")
- logger.info(
- f"[+] {task['task_mark']}标识任务每日指定条数已足够,指定条数{totality_count},实际生成条数{int(task_totality_count[0][0])}")
- return
- if fs_channel_name in ["快手关键词搜索","抖音关键词搜索"]:
- gjc_count = RedisHelper().get_client().get(f"{fs_mark}_count")
- RedisHelper().get_client().incrby(f"{fs_mark}_count", 1)
- if int(gjc_count.decode()) >= 300:
- logger.info(f"[机器改造] {fs_channel_name}上限")
- return
- cls.data_handle(task, file_path, fs_channel_name, ai_key, fs_mark)
- for filename in os.listdir(CACHE_DIR):
- # 检查文件名是否包含关键字
- if uid in filename:
- file_path = os.path.join(CACHE_DIR, filename)
- try:
- # 删除文件
- os.remove(file_path)
- logger.info(f"[机器改造] 已删除文件: {file_path}")
- except Exception as e:
- logger.error(f"[机器改造] 删除文件时出错: {file_path}, 错误: {e}")
- return
- except Exception as e:
- for filename in os.listdir(CACHE_DIR):
- # 检查文件名是否包含关键字
- if uid in filename:
- file_path = os.path.join(CACHE_DIR, filename)
- try:
- # 删除文件
- os.remove(file_path)
- logger.info(f"[机器改造] 已删除文件: {file_path}")
- except Exception as e:
- logger.error(f"[机器改造] 删除文件时出错: {file_path}, 错误: {e}")
- return
- def run():
- scheduler = BlockingScheduler()
- try:
- logger.info(f"[机器改造] 开始启动")
- scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=1)) # 每1分钟启动一次
- scheduler.start()
- except KeyboardInterrupt:
- pass
- except Exception as e:
- logger.error(f"[机器改造] 启动异常,异常信息:{e}")
- pass
- finally:
- scheduler.shutdown()
- if __name__ == '__main__':
- run()
|