|
- import json
- import os
- import random
- import re
- import sys
- import time
- import uuid
- from datetime import datetime
- import orjson
- from apscheduler.schedulers.blocking import BlockingScheduler
- from apscheduler.triggers.interval import IntervalTrigger
- from loguru import logger
- sys.path.append('/app')
- from utils.redis import RedisHelper
- from utils.aliyun_log import AliyunLogger
- from utils.aliyun_oss import Oss
- from utils.download_video import DownLoad
- from utils.dy_ks_get_url import Dy_KS
- from utils.feishu_form import Material
- from utils.feishu_utils import Feishu
- from utils.ffmpeg import FFmpeg
- from utils.gpt4o_mini_help import GPT4oMini
- from utils.piaoquan import PQ
- from utils.sql_help import sqlCollect
- from utils.tag_video import Tag
- from utils.tts_help import TTS
- from utils.google_ai_studio import GoogleAI
- CACHE_DIR = '/app/cache/'
- # CACHE_DIR = '/Users/z/Downloads/'
- class ConsumptionRecommend(object):
- @classmethod
- def insert_pq(cls, data, oss_object_key, title, tags, tag_transport_channel, channel_mark, task_mark):
- logger.info(f"[+] 开始写入票圈")
- n_ids = str(data["pq_ids"])
- if ',' in n_ids:
- n_id_list = n_ids.split(',')
- else:
- n_id_list = [n_ids]
- pq_list = []
- for n_id in n_id_list:
- code = PQ.insert_piaoquantv(oss_object_key, title, n_id, None)
- if not code:
- logger.error(f"[+] 写入票圈后台失败")
- AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
- "改造失败,写入票圈后台失败", "3001", str(data))
- text = (
- f"**负责人**: {data['name']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 视频写入票圈后台失败,视频ID{code}\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- "【 搬运&改造效率工具失败通知 】")
- continue
- pq_list.append(code)
- logger.info(f"[+] 写入票圈成功,返回视频id{code}")
- tag_status = Tag.video_tag(code, str(tags))
- if tag_status == 0:
- logger.info(f"[+] 写入标签成功,后台视频ID为{code}")
- try:
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
- sqlCollect.insert_machine_making_data(data["name"], task_mark, tag_transport_channel,
- data["video_url"], data["video_url"], data["pq_ids"],
- data["title_category"],
- code,
- formatted_time, data["title_category"], oss_object_key)
- pq_url = f'https://admin.piaoquantv.com/cms/post-detail/{code}/detail' # 站内视频链接
- values = [
- [
- str(code),
- str(n_id),
- formatted_time,
- channel_mark,
- data["name"],
- data["pq_ids"],
- data["pq_label"],
- data["activate_data"],
- data["video_url"],
- data["title_category"],
- tag_transport_channel,
- data["tag_transport_scene"],
- data["tag_transport_keyword"],
- data["tag"],
- data["transform_rule"],
- data["video_share"],
- data["trailer_share"],
- data["trailer_share_audio"],
- data["video_clipping"],
- data["video_clipping_time"],
- data["title_transform"],
- pq_url
- ]
- ]
- name_to_sheet = {
- "范军": "276ffc",
- "鲁涛": "QqrKRY",
- "余海涛": "dTzUlI",
- "罗情": "8JPv9g",
- "刘诗雨": "HqwG0o",
- "王媛": "vtWvle",
- "周仙琴": "MWUqWt",
- "王雪珂": "xN1KrU",
- "信欣": "PtoeGT",
- "邓锋": "dgV2Af",
- "王知微":"QDyCg6",
- "刘兆恒": "vtRMGX",
- "张博": "LEZMRr",
- "子涵": "tCw56r"
- }
- name = re.sub(r"\s+", "", data.get("name", ""))
- sheet = name_to_sheet.get(name)
- Feishu.insert_columns("R4dLsce8Jhz9oCtDMr9ccpFHnbI", sheet, "ROWS", 1, 2)
- time.sleep(0.5)
- Feishu.update_values("R4dLsce8Jhz9oCtDMr9ccpFHnbI", sheet, "A2:Z2", values)
- logger.info(f"[处理] 写入飞书成功")
- except Exception as e:
- logger.error(f"[处理] 写入飞书失败{e}")
- pass
- AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
- "改造成功", "1000", str(data), str(pq_list))
- return
- @classmethod
- def data_handle(cls, data, file_path, redis_name,studio_key):
- url, original_title, video_id, tag_transport_channel = Dy_KS.get_video_url(data, "效率工具")
- if url == "重新处理" or not url:
- RedisHelper().get_client().rpush(redis_name, json.dumps(data))
- text = (
- f"**负责人**: {data['name']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 没有获取到视频链接,等待重新处理\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- f"【 搬运&改造效率工具失败通知 】")
- return
- elif url == "作品不存在" or url == "链接不是抖/快" or url == "note":
- if url == "note":
- url = "图文"
- text = (
- f"**负责人**: {data['name']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: {url},不做处理\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- f"【 搬运&改造效率工具失败通知 】")
- return
- AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "扫描到一条视频",
- "2001", str(data))
- AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "符合规则等待改造",
- "2004", str(data))
- logger.info(f"[处理] {url}开始下载视频")
- video_path = DownLoad.download_video(url, file_path, tag_transport_channel, video_id)
- if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
- RedisHelper().get_client().rpush(redis_name, json.dumps(data))
- logger.error(f"[处理] {url}下载失败")
- AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
- "视频下载失败等待重新处理", "3002", str(data))
- text = (
- f"**负责人**: {data['name']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 视频下载失败等待重新处理,视频链接{url}\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- "【 搬运&改造效率工具失败通知 】")
- return
- logger.info(f"[处理] {url}视频下载成功")
- if data["title_category"] == "AI标题" or str(data["trailer_share"]) == "AI标题":
- title = GPT4oMini.get_ai_mini_title(
- original_title if data["title_category"] == "AI标题" else data["title_category"])
- else:
- title = original_title if data["title_category"] == "原标题" else data["title_category"]
- if tag_transport_channel == "抖音":
- if "复制打开抖音" in data['video_url']:
- channel_mark = "APP"
- else:
- channel_mark = "PC"
- else:
- if "https://www.kuaishou.com/f" in data['video_url']:
- channel_mark = "PC"
- else:
- channel_mark = "APP"
- if data["transform_rule"] == '否' or data["transform_rule"] == "是":
- logger.info(f"[处理] 数据开始发送oss")
- oss_object_key = Oss.stitching_sync_upload_oss(video_path, str(uuid.uuid4())) # 视频发送OSS
- oss_object_key = oss_object_key.get("oss_object_key")
- tags = ','.join(filter(None, [
- data['pq_label'],
- channel_mark,
- tag_transport_channel,
- data['tag_transport_scene'],
- data['tag_transport_keyword'],
- "搬运工具",
- data['tag']
- ]))
- cls.insert_pq(data, oss_object_key, title, tags, tag_transport_channel, channel_mark,
- "搬运工具")
- if data["transform_rule"] == "仅改造" or data["transform_rule"] == "是":
- try:
- width, height = FFmpeg.get_w_h_size(video_path)
- if width < height: # 判断是否需要修改为竖屏
- video_path = FFmpeg.update_video_h_w(video_path, file_path)
- logger.info(f"[处理] 视频更改分辨率处理")
- video_path = FFmpeg.video_640(video_path, file_path)
- if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
- RedisHelper().get_client().rpush(redis_name, json.dumps(data))
- logger.error(f"[处理] 视频更改分辨率失败")
- AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
- "改造失败,片尾拼接失败", "3001", str(data))
- text = (
- f"**负责人**: {data['name']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 视频更改分辨率失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- "【 搬运&改造效率工具失败通知 】")
- return
- logger.info(f"[处理] 视频更改分辨率处理成功")
- if data["video_clipping"]: # 判断是否需要裁剪
- video_path = FFmpeg.video_crop(video_path, file_path)
- if data["video_clipping_time"]: # 判断是否需要指定视频时长
- video_path = FFmpeg.video_ggduration(video_path, file_path, data["video_clipping_time"])
- if data['trailer_share'] == "内容分析":
- video_text = GoogleAI.run(studio_key, video_path)
- if not video_text:
- logger.error(f"[处理] 视频内容分析获取内容信息失败")
- data["transform_rule"] = "仅改造"
- RedisHelper().get_client().rpush(redis_name, json.dumps(data))
- AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
- "改造失败,视频内容分析获取内容信息失败", "3001", str(data))
- text = (
- f"**负责人**: {data['name']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 视频内容分析获取内容信息失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- "【 搬运&改造效率工具失败通知 】")
- return
- logger.info(f"[处理] 片尾引导-开始获取AI片尾")
- pw_srt_text = GPT4oMini.get_content_understanding_pw(video_text)
- else:
- prompt = Material.get_propmt_data(str(data['trailer_share']))
- pw_srt_text = GPT4oMini.get_ai_mini_pw(title, prompt)
- voice = data['trailer_share_audio']
- if voice:
- if ',' in voice:
- voices = voice.split(',')
- else:
- voices = [voice]
- voice = random.choice(voices)
- else:
- voice = "zhifeng_emo"
- pw_url = TTS.get_pw_zm(pw_srt_text, voice)
- if not pw_url:
- logger.error(f"[处理] 数据片尾获取失败")
- data["transform_rule"] = "仅改造"
- RedisHelper().get_client().rpush(redis_name, json.dumps(data))
- AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
- "改造失败,片尾获取失败", "3001", str(data))
- text = (
- f"**负责人**: {data['name']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 获取片尾失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- "【 搬运&改造效率工具失败通知 】")
- return
- logger.info(f"[处理] 数据片尾获取成功")
- pw_srt = TTS.getSrt(pw_url)
- if not pw_srt:
- data["transform_rule"] = "仅改造"
- RedisHelper().get_client().rpush(redis_name, json.dumps(data))
- logger.error(f"[处理] 数据片尾音频srt获取失败")
- AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
- "改造失败,片尾音频下载失败", "3001", str(data))
- text = (
- f"**负责人**: {data['name']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 片尾音频下载失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- "【 搬运&改造效率工具失败通知 】")
- return
- pw_mp3_path = TTS.download_mp3(pw_url, file_path)
- if not pw_mp3_path:
- data["transform_rule"] = "仅改造"
- RedisHelper().get_client().rpush(redis_name, json.dumps(data))
- logger.error(f"[处理] 数据片尾音频下载失败")
- AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
- "改造失败,片尾音频下载失败", "3001", str(data))
- text = (
- f"**负责人**: {data['name']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 片尾音频下载失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- "【 搬运&改造效率工具失败通知 】")
- return
- logger.info(f"[处理] 数据片尾音频下载成功")
- if str(data['trailer_share_video']) and str(data['trailer_share_video'] ) != "None":
- rg_pw = str(data["trailer_share_video"])
- if ',' in rg_pw:
- rg_pw_list = rg_pw.split(',')
- else:
- rg_pw_list = [rg_pw]
- rg_pw_url_list = PQ.get_pq_oss(rg_pw_list)
- if not rg_pw_url_list:
- AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
- "无法获取站内视频链接", "3001", str(data))
- text = (
- f"**负责人**: {data['name']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 无法获取站内视频链接\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- "【 搬运&改造效率工具失败通知 】")
- return
- pw_url_duration = FFmpeg.get_http_duration([pw_url])
- pw_videos_duration = FFmpeg.get_http_duration(rg_pw_url_list)
- if pw_videos_duration < pw_url_duration:
- jpg_path = FFmpeg.video_png(video_path, file_path) # 生成视频最后一帧jpg
- if not os.path.exists(jpg_path) or os.path.getsize(jpg_path) == 0:
- data["transform_rule"] = "仅改造"
- RedisHelper().get_client().rpush(redis_name, json.dumps(data))
- logger.error(f"[处理] 数据片尾获取最后一帧失败")
- AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
- "改造失败,获取最后一帧失败", "3001", str(data))
- text = (
- f"**负责人**: {data['name']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 获取视频最后一帧失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- "【 搬运&改造效率工具失败通知 】")
- return
- logger.info(f"[处理] 数据片尾获取最后一帧成功")
- else:
- rg_pw_url = DownLoad.download_pq_video(file_path, rg_pw_url_list)
- rg_pw_list = FFmpeg.concatenate_videos(rg_pw_url, file_path)
- if not os.path.exists(rg_pw_list) or os.path.getsize(rg_pw_list) == 0:
- data["transform_rule"] = "仅改造"
- RedisHelper().get_client().rpush(redis_name, json.dumps(data))
- logger.error(f"[处理] 数据片尾拼接失败")
- AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
- "改造失败,片尾拼接失败", "3001", str(data))
- text = (
- f"**负责人**: {data['name']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 片尾拼接失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- "【 搬运&改造效率工具失败通知 】")
- return
- jpg_path = FFmpeg.video_640(rg_pw_list, file_path)
- logger.info(f"[处理] 生成人工片尾成功")
- else:
- jpg_path = FFmpeg.video_png(video_path, file_path) # 生成视频最后一帧jpg
- if not os.path.exists(jpg_path) or os.path.getsize(jpg_path) == 0:
- data["transform_rule"] = "仅改造"
- RedisHelper().get_client().rpush(redis_name, json.dumps(data))
- logger.error(f"[处理] 数据片尾获取最后一帧失败")
- AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
- "改造失败,获取最后一帧失败", "3001", str(data))
- text = (
- f"**负责人**: {data['name']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 获取视频最后一帧失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- "【 搬运&改造效率工具失败通知 】")
- return
- logger.info(f"[处理] 数据片尾获取最后一帧成功")
- pw_path = FFmpeg.pw_video(jpg_path, file_path, pw_mp3_path, pw_srt) # 生成片尾视频
- if not os.path.exists(pw_path) or os.path.getsize(pw_path) == 0:
- data["transform_rule"] = "仅改造"
- RedisHelper().get_client().rpush(redis_name, json.dumps(data))
- logger.error(f"[处理] 数据片尾拼接失败")
- AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
- "改造失败,片尾拼接失败", "3001", str(data))
- text = (
- f"**负责人**: {data['name']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 片尾拼接失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- "【 搬运&改造效率工具失败通知 】")
- return
- logger.info(f"[处理] 数据合并开始拼接")
- trailer_share_bgm = data['trailer_share_bgm']
- # trailer_share_bgm = '48594759'
- if trailer_share_bgm and trailer_share_bgm != "None":
- try:
- logger.info(f"[处理] 获取bgm")
- rg_bgm_list = PQ.get_pq_oss([trailer_share_bgm])
- rg_bgm_url = DownLoad.download_pq_video(file_path, rg_bgm_list)
- bgm_mp3_path = FFmpeg.get_pw_video_mp3(file_path, rg_bgm_url[0])
- pw_path = FFmpeg.video_add_bgm(pw_path, bgm_mp3_path, file_path)
- logger.info(f"[处理] 片尾bgm添加成功")
- except Exception as e:
- logger.error(f"[处理] 片尾bgm添加失败")
- video_path = FFmpeg.h_b_video(video_path, pw_path, file_path)
- video_path = FFmpeg.single_video(video_path, file_path, data["video_share"])
- if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
- data["transform_rule"] = "仅改造"
- RedisHelper().get_client().rpush(redis_name, json.dumps(data))
- logger.error(f"[处理] 数据添加片中字幕失败")
- AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
- "改造失败,添加片中字幕失败", "3001", str(data))
- text = (
- f"**负责人**: {data['name']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 视频片中增加字幕失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- "【 搬运&改造效率工具失败通知 】")
- return
- logger.info(f"[处理] 数据添加片中字幕成功")
- logger.info(f"[处理] 数据开始发送oss")
- oss_object_key = Oss.stitching_sync_upload_oss(video_path, str(uuid.uuid4())) # 视频发送OSS
- logger.info(f"[处理] 数据发送oss成功")
- oss_object_key = oss_object_key.get("oss_object_key")
- tags = ','.join(filter(None, [
- data['pq_label'],
- channel_mark,
- tag_transport_channel,
- data['tag_transport_scene'],
- data['tag_transport_keyword'],
- "搬运改造",
- data['tag']
- ]))
- cls.insert_pq(data, oss_object_key, title, tags, tag_transport_channel, channel_mark,
- "搬运改造")
- return
- except Exception as e:
- data["transform_rule"] = "仅改造"
- RedisHelper().get_client().rpush(redis_name, json.dumps(data))
- logger.error(f"[+] 视频改造失败{e}")
- text = (
- f"**负责人**: {data['name']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 视频改造失败{e}\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- "【 搬运&改造效率工具失败通知 】")
- return
- @classmethod
- def run(cls):
- uid = str(uuid.uuid4())
- file_path = os.path.join(CACHE_DIR, uid)
- logger.info(f"[处理] 开始获取redis数据")
- fs_data = os.getenv("FS_DATA")
- # fs_data = '周仙琴,2WIcBU,task:carry_data_redis_zxq'
- fs_data_list = fs_data.split(',')
- redis_name = fs_data_list[2]
- studio_key = fs_data_list[3]
- data = RedisHelper().get_client().rpop(name=redis_name)
- if not data:
- logger.info('[处理] 无待执行的扫描任务')
- return
- data = orjson.loads(data)
- try:
- cls.data_handle(data, file_path, redis_name,studio_key)
- for filename in os.listdir(CACHE_DIR):
- # 检查文件名是否包含关键字
- if uid in filename:
- file_path = os.path.join(CACHE_DIR, filename)
- try:
- # 删除文件
- os.remove(file_path)
- logger.info(f"已删除文件: {file_path}")
- except Exception as e:
- logger.error(f"删除文件时出错: {file_path}, 错误: {e}")
- return
- except Exception as e:
- RedisHelper().get_client().rpush(redis_name, json.dumps(data))
- for filename in os.listdir(CACHE_DIR):
- # 检查文件名是否包含关键字
- if uid in filename:
- file_path = os.path.join(CACHE_DIR, filename)
- try:
- # 删除文件
- os.remove(file_path)
- logger.info(f"已删除文件: {file_path}")
- except Exception as e:
- logger.error(f"删除文件时出错: {file_path}, 错误: {e}")
- return
- def run():
- scheduler = BlockingScheduler()
- try:
- logger.info(f"[处理] 开始启动")
- scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=1)) # 每1分钟启动一次
- scheduler.start()
- except KeyboardInterrupt:
- pass
- except Exception as e:
- logger.error(f"[处理] 启动异常,异常信息:{e}")
- pass
- finally:
- scheduler.shutdown()
- if __name__ == '__main__':
- run()
|