123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- import os
- import sys
- import time
- import uuid
- from datetime import datetime
- import orjson
- from apscheduler.schedulers.blocking import BlockingScheduler
- from apscheduler.triggers.interval import IntervalTrigger
- from loguru import logger
- sys.path.append('/app')
- from utils.redis import RedisHelper
- from utils.aliyun_log import AliyunLogger
- from utils.aliyun_oss import Oss
- from utils.download_video import DownLoad
- from utils.feishu_utils import Feishu
- from utils.ffmpeg import FFmpeg
- from utils.google_ai_studio import GoogleAI
- from utils.gpt4o_mini_help import GPT4oMini
- from utils.piaoquan import PQ
- from utils.sql_help import sqlCollect
- from utils.tag_video import Tag
- from utils.tts_help import TTS
- CACHE_DIR = '/tmp/'
- # CACHE_DIR = '/Users/z/Downloads/'
- class ConsumptionRecommend(object):
- @classmethod
- def insert_pq(cls, data, oss_object_key, title, cover):
- logger.info(f"[内容分析] 开始写入票圈")
- code = PQ.insert_piaoquantv(oss_object_key, title, '50322062', cover)
- if not code:
- logger.error(f"[内容分析] 写入票圈后台失败")
- text = (
- f"**渠道**: {data['channel']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 写入票圈后台失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- f"【 内容理解-{data['channel']}失败通知 】")
- return
- logger.info(f"[内容分析] 写入票圈成功,返回视频id{code}")
- tag_status = Tag.video_tag(code, "lev-供给,rol-机器,#str-搬运改造内容理解引导语实验_60")
- Tag.video_tag(data["videoid"], "lev-供给,rol-机器,#str-搬运改造内容理解引导语base_61")
- if tag_status == 0:
- logger.info(f"[内容分析] 写入标签成功,后台视频ID为{code}")
- try:
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
- sqlCollect.insert_machine_making_data(data["channel"], data["channel"], data["channel"],
- data["videoid"], data["videoid"], "50322062",
- title,
- code,
- formatted_time, title, oss_object_key)
- pq_url = f'https://admin.piaoquantv.com/cms/post-detail/{code}/detail' # 站内视频链接
- values = [
- [
- data["videoid"],
- code,
- data["channel"],
- data["dt"],
- formatted_time,
- pq_url
- ]
- ]
- Feishu.insert_columns("R4dLsce8Jhz9oCtDMr9ccpFHnbI", '1Ycd37', "ROWS", 1, 2)
- time.sleep(0.5)
- Feishu.update_values("R4dLsce8Jhz9oCtDMr9ccpFHnbI", '1Ycd37', "A2:Z2", values)
- logger.info(f"[内容分析] 写入飞书成功")
- return
- except Exception as e:
- logger.error(f"[内容分析] 写入飞书失败{e}")
- return
- @classmethod
- def data_handle(cls, data, file_path):
- video_id = data["videoid"]
- AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "扫描到一条视频", "2001", str(data))
- AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "符合规则等待改造", "2004", str(data))
- logger.info(f"[内容分析] 获取{video_id}的视频链接")
- video_path, cover_path, old_title = PQ.get_pq_oss_path(video_id)
- if not video_path:
- AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "没有获取到视频链接", "3001",
- str(data))
- text = (
- f"**渠道**: {data['channel']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 没有获取到视频链接\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- f"【 内容理解-{data['channel']}失败通知 】")
- return
- video_url = f"http://rescdn.yishihui.com/{video_path}"
- video_path = DownLoad.download_video(video_url, file_path, '', video_id)
- if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
- logger.error(f"[内容分析] {video_url}下载失败")
- AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "视频下载失败等待重新处理",
- "3002",
- str(data))
- text = (
- f"**渠道**: {data['channel']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 视频下载失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- f"【 内容理解-{data['channel']}失败通知 】")
- return
- logger.info(f"[内容分析] {video_url}视频下载成功")
- logger.info(f"[内容分析] {video_url}开始处理标题")
- video_path = FFmpeg.video_640(video_path, file_path)
- logger.info(f"[内容分析] 视频更改分辨率处理成功")
- logger.info(f"[内容分析] 片尾引导-开始获取视频口播内容")
- video_text = GoogleAI.run("AIzaSyCor0q5w37Dy6fGxloLlCT7KqyEFU3PWP8", video_path)
- if not video_text:
- AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "片尾引导,获取口播文案失败",
- "3003",
- str(data))
- text = (
- f"**渠道**: {data['channel']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 获取口播文案失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- f"【 内容理解-{data['channel']}失败通知 】")
- return
- logger.info(f"[内容分析] 片尾引导-开始获取AI片尾")
- pw_srt_text = GPT4oMini.get_content_understanding_pw(video_text)
- pw_url = TTS.get_pw_zm(pw_srt_text, 'zhifeng_emo')
- if not pw_url:
- logger.error(f"[内容分析] 片尾引导-片尾获取失败")
- data["transform_rule"] = "仅改造"
- AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "片尾引导,片尾获取失败",
- "3003",
- str(data))
- text = (
- f"**渠道**: {data['channel']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 片尾获取失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- f"【 内容理解-{data['channel']}失败通知 】")
- return
- logger.info(f"[内容分析] 片尾引导-片尾获取成功")
- pw_srt = TTS.getSrt(pw_url)
- if not pw_srt:
- AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "片尾引导,片尾音频获取失败",
- "3003",
- str(data))
- text = (
- f"**渠道**: {data['channel']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 片尾音频获取失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- f"【 内容理解-{data['channel']}失败通知 】")
- return
- pw_mp3_path = TTS.download_mp3(pw_url, file_path)
- if not pw_mp3_path:
- AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "片尾引导,片尾音频下载失败",
- "3003",
- str(data))
- text = (
- f"**渠道**: {data['channel']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 片尾音频下载失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- f"【 内容理解-{data['channel']}失败通知 】")
- return
- logger.info(f"[内容分析] 片尾引导-片尾音频下载成功")
- logger.info(f"[内容分析] 片尾引导-片尾获取最后一帧成功")
- jpg_path = FFmpeg.video_png(video_path, file_path) # 生成视频最后一帧jpg
- pw_path = FFmpeg.pw_video(jpg_path, file_path, pw_mp3_path, pw_srt) # 生成片尾视频
- if not os.path.exists(pw_path) or os.path.getsize(pw_path) == 0:
- logger.error(f"[内容分析] 片尾引导-片尾拼接失败")
- AliyunLogger.logging(data["type"], "片尾引导", "", data["video_url"],
- "片尾引导,片尾拼接失败", "3003", str(data))
- text = (
- f"**渠道**: {data['channel']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 片尾拼接失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- f"【 内容理解-{data['channel']}失败通知 】")
- return
- logger.info(f"[内容分析] 片尾引导-合并开始拼接")
- video_path = FFmpeg.h_b_video(video_path, pw_path, file_path)
- if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
- logger.error(f"[内容分析] 片尾引导-添加片尾失败")
- text = (
- f"**渠道**: {data['channel']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 添加片尾失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- f"【 内容理解-{data['channel']}失败通知 】")
- return
- logger.info(f"[内容分析] 片尾引导-开始发送oss")
- oss_object_key = Oss.stitching_sync_upload_oss(video_path, str(uuid.uuid4())) # 视频发送OSS
- status = oss_object_key.get("status")
- if status != 200:
- logger.error(f"[内容分析] 片尾引导-发送oss失败")
- AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "片尾引导,发送oss失败",
- "3003",
- str(data))
- text = (
- f"**渠道**: {data['channel']}\n"
- f"**内容**: {data}\n"
- f"**失败信息**: 发送oss失败\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
- f"【 内容理解-{data['channel']}失败通知 】")
- return
- logger.info(f"[内容分析] 片尾引导-发送oss成功")
- oss_object_key = oss_object_key.get("oss_object_key")
- cls.insert_pq(data, oss_object_key, old_title, cover_path)
- return
- @classmethod
- def run(cls):
- logger.info(f"[内容分析] 开始获取redis数据")
- data = RedisHelper().get_client().rpop(name = "task:carry_redis_by_nrfx")
- if not data:
- logger.info('[内容分析] 无待执行的扫描任务')
- return
- data = orjson.loads(data)
- uid = str(uuid.uuid4())
- file_path = os.path.join(CACHE_DIR, uid)
- try:
- cls.data_handle(data, file_path)
- for filename in os.listdir(CACHE_DIR):
- # 检查文件名是否包含关键字
- if uid in filename:
- file_path = os.path.join(CACHE_DIR, filename)
- try:
- # 删除文件
- os.remove(file_path)
- logger.info(f"已删除文件: {file_path}")
- except Exception as e:
- logger.error(f"删除文件时出错: {file_path}, 错误: {e}")
- return
- except Exception as e:
- for filename in os.listdir(CACHE_DIR):
- # 检查文件名是否包含关键字
- if uid in filename:
- file_path = os.path.join(CACHE_DIR, filename)
- try:
- # 删除文件
- os.remove(file_path)
- logger.info(f"已删除文件: {file_path}")
- except Exception as e:
- logger.error(f"删除文件时出错: {file_path}, 错误: {e}")
- return
- def run():
- scheduler = BlockingScheduler()
- try:
- logger.info(f"[内容分析] 开始启动")
- scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=5)) # 每5分钟启动一次
- scheduler.start()
- except KeyboardInterrupt:
- pass
- except Exception as e:
- logger.error(f"[内容分析] 启动异常,异常信息:{e}")
- pass
- finally:
- scheduler.shutdown()
- if __name__ == '__main__':
- run()
|