consumption_work_studio.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. import os
  2. import sys
  3. import time
  4. import uuid
  5. from datetime import datetime
  6. import orjson
  7. from apscheduler.schedulers.blocking import BlockingScheduler
  8. from apscheduler.triggers.interval import IntervalTrigger
  9. from loguru import logger
  10. sys.path.append('/app')
  11. from utils.redis import RedisHelper
  12. from utils.aliyun_log import AliyunLogger
  13. from utils.aliyun_oss import Oss
  14. from utils.download_video import DownLoad
  15. from utils.feishu_utils import Feishu
  16. from utils.ffmpeg import FFmpeg
  17. from utils.google_ai_studio import GoogleAI
  18. from utils.gpt4o_mini_help import GPT4oMini
  19. from utils.piaoquan import PQ
  20. from utils.sql_help import sqlCollect
  21. from utils.tag_video import Tag
  22. from utils.tts_help import TTS
  23. CACHE_DIR = '/app/cache/'
  24. # CACHE_DIR = '/Users/z/Downloads/'
  25. class ConsumptionRecommend(object):
  26. @classmethod
  27. def insert_pq(cls, data, oss_object_key, title, cover):
  28. logger.info(f"[内容分析] 开始写入票圈")
  29. code = PQ.insert_piaoquantv(oss_object_key, title, '50322062', cover)
  30. if not code:
  31. logger.error(f"[内容分析] 写入票圈后台失败")
  32. text = (
  33. f"**渠道**: {data['channel']}\n"
  34. f"**内容**: {data}\n"
  35. f"**失败信息**: 写入票圈后台失败\n"
  36. )
  37. Feishu.finish_bot(text,
  38. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  39. f"【 内容理解-{data['channel']}失败通知 】")
  40. return
  41. logger.info(f"[内容分析] 写入票圈成功,返回视频id{code}")
  42. tag_status = Tag.video_tag(code, "lev-供给,rol-机器,#str-搬运改造内容理解引导语实验_60")
  43. Tag.video_tag(data["videoid"], "lev-供给,rol-机器,#str-搬运改造内容理解引导语base_61")
  44. if tag_status == 0:
  45. logger.info(f"[内容分析] 写入标签成功,后台视频ID为{code}")
  46. try:
  47. current_time = datetime.now()
  48. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  49. sqlCollect.insert_machine_making_data(data["channel"], data["channel"], data["channel"],
  50. data["videoid"], data["videoid"], "50322062",
  51. title,
  52. code,
  53. formatted_time, title, oss_object_key)
  54. pq_url = f'https://admin.piaoquantv.com/cms/post-detail/{code}/detail' # 站内视频链接
  55. values = [
  56. [
  57. data["videoid"],
  58. code,
  59. data["channel"],
  60. data["dt"],
  61. formatted_time,
  62. pq_url
  63. ]
  64. ]
  65. Feishu.insert_columns("R4dLsce8Jhz9oCtDMr9ccpFHnbI", '1Ycd37', "ROWS", 1, 2)
  66. time.sleep(0.5)
  67. Feishu.update_values("R4dLsce8Jhz9oCtDMr9ccpFHnbI", '1Ycd37', "A2:Z2", values)
  68. logger.info(f"[内容分析] 写入飞书成功")
  69. return
  70. except Exception as e:
  71. logger.error(f"[内容分析] 写入飞书失败{e}")
  72. return
  73. @classmethod
  74. def data_handle(cls, data, file_path):
  75. video_id = data["videoid"]
  76. AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "扫描到一条视频", "2001", str(data))
  77. AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "符合规则等待改造", "2004", str(data))
  78. logger.info(f"[内容分析] 获取{video_id}的视频链接")
  79. video_path, cover_path, old_title = PQ.get_pq_oss_path(video_id)
  80. if not video_path:
  81. AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "没有获取到视频链接", "3001",
  82. str(data))
  83. text = (
  84. f"**渠道**: {data['channel']}\n"
  85. f"**内容**: {data}\n"
  86. f"**失败信息**: 没有获取到视频链接\n"
  87. )
  88. Feishu.finish_bot(text,
  89. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  90. f"【 内容理解-{data['channel']}失败通知 】")
  91. return
  92. video_url = f"http://rescdn.yishihui.com/{video_path}"
  93. video_path = DownLoad.download_video(video_url, file_path, '', video_id)
  94. if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
  95. logger.error(f"[内容分析] {video_url}下载失败")
  96. AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "视频下载失败等待重新处理",
  97. "3002",
  98. str(data))
  99. text = (
  100. f"**渠道**: {data['channel']}\n"
  101. f"**内容**: {data}\n"
  102. f"**失败信息**: 视频下载失败\n"
  103. )
  104. Feishu.finish_bot(text,
  105. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  106. f"【 内容理解-{data['channel']}失败通知 】")
  107. return
  108. logger.info(f"[内容分析] {video_url}视频下载成功")
  109. logger.info(f"[内容分析] {video_url}开始处理标题")
  110. video_path = FFmpeg.video_640(video_path, file_path)
  111. logger.info(f"[内容分析] 视频更改分辨率处理成功")
  112. logger.info(f"[内容分析] 片尾引导-开始获取视频口播内容")
  113. video_text = GoogleAI.run("AIzaSyCor0q5w37Dy6fGxloLlCT7KqyEFU3PWP8", video_path)
  114. if not video_text:
  115. AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "片尾引导,获取口播文案失败",
  116. "3003",
  117. str(data))
  118. text = (
  119. f"**渠道**: {data['channel']}\n"
  120. f"**内容**: {data}\n"
  121. f"**失败信息**: 获取口播文案失败\n"
  122. )
  123. Feishu.finish_bot(text,
  124. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  125. f"【 内容理解-{data['channel']}失败通知 】")
  126. return
  127. logger.info(f"[内容分析] 片尾引导-开始获取AI片尾")
  128. pw_srt_text = GPT4oMini.get_content_understanding_pw(video_text)
  129. pw_url = TTS.get_pw_zm(pw_srt_text, 'zhifeng_emo')
  130. if not pw_url:
  131. logger.error(f"[内容分析] 片尾引导-片尾获取失败")
  132. data["transform_rule"] = "仅改造"
  133. AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "片尾引导,片尾获取失败",
  134. "3003",
  135. str(data))
  136. text = (
  137. f"**渠道**: {data['channel']}\n"
  138. f"**内容**: {data}\n"
  139. f"**失败信息**: 片尾获取失败\n"
  140. )
  141. Feishu.finish_bot(text,
  142. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  143. f"【 内容理解-{data['channel']}失败通知 】")
  144. return
  145. logger.info(f"[内容分析] 片尾引导-片尾获取成功")
  146. pw_srt = TTS.getSrt(pw_url)
  147. if not pw_srt:
  148. AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "片尾引导,片尾音频获取失败",
  149. "3003",
  150. str(data))
  151. text = (
  152. f"**渠道**: {data['channel']}\n"
  153. f"**内容**: {data}\n"
  154. f"**失败信息**: 片尾音频获取失败\n"
  155. )
  156. Feishu.finish_bot(text,
  157. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  158. f"【 内容理解-{data['channel']}失败通知 】")
  159. return
  160. pw_mp3_path = TTS.download_mp3(pw_url, file_path)
  161. if not pw_mp3_path:
  162. AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "片尾引导,片尾音频下载失败",
  163. "3003",
  164. str(data))
  165. text = (
  166. f"**渠道**: {data['channel']}\n"
  167. f"**内容**: {data}\n"
  168. f"**失败信息**: 片尾音频下载失败\n"
  169. )
  170. Feishu.finish_bot(text,
  171. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  172. f"【 内容理解-{data['channel']}失败通知 】")
  173. return
  174. logger.info(f"[内容分析] 片尾引导-片尾音频下载成功")
  175. logger.info(f"[内容分析] 片尾引导-片尾获取最后一帧成功")
  176. jpg_path = FFmpeg.video_png(video_path, file_path) # 生成视频最后一帧jpg
  177. pw_path = FFmpeg.pw_video(jpg_path, file_path, pw_mp3_path, pw_srt) # 生成片尾视频
  178. if not os.path.exists(pw_path) or os.path.getsize(pw_path) == 0:
  179. logger.error(f"[内容分析] 片尾引导-片尾拼接失败")
  180. AliyunLogger.logging(data["type"], "片尾引导", "", data["video_url"],
  181. "片尾引导,片尾拼接失败", "3003", str(data))
  182. text = (
  183. f"**渠道**: {data['channel']}\n"
  184. f"**内容**: {data}\n"
  185. f"**失败信息**: 片尾拼接失败\n"
  186. )
  187. Feishu.finish_bot(text,
  188. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  189. f"【 内容理解-{data['channel']}失败通知 】")
  190. return
  191. logger.info(f"[内容分析] 片尾引导-合并开始拼接")
  192. video_path = FFmpeg.h_b_video(video_path, pw_path, file_path)
  193. if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
  194. logger.error(f"[内容分析] 片尾引导-添加片尾失败")
  195. text = (
  196. f"**渠道**: {data['channel']}\n"
  197. f"**内容**: {data}\n"
  198. f"**失败信息**: 添加片尾失败\n"
  199. )
  200. Feishu.finish_bot(text,
  201. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  202. f"【 内容理解-{data['channel']}失败通知 】")
  203. return
  204. logger.info(f"[内容分析] 片尾引导-开始发送oss")
  205. oss_object_key = Oss.stitching_sync_upload_oss(video_path, str(uuid.uuid4())) # 视频发送OSS
  206. status = oss_object_key.get("status")
  207. if status != 200:
  208. logger.error(f"[内容分析] 片尾引导-发送oss失败")
  209. AliyunLogger.logging(data["type"], "片尾引导", data["channel"], video_id, "片尾引导,发送oss失败",
  210. "3003",
  211. str(data))
  212. text = (
  213. f"**渠道**: {data['channel']}\n"
  214. f"**内容**: {data}\n"
  215. f"**失败信息**: 发送oss失败\n"
  216. )
  217. Feishu.finish_bot(text,
  218. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  219. f"【 内容理解-{data['channel']}失败通知 】")
  220. return
  221. logger.info(f"[内容分析] 片尾引导-发送oss成功")
  222. oss_object_key = oss_object_key.get("oss_object_key")
  223. cls.insert_pq(data, oss_object_key, old_title, cover_path)
  224. return
  225. @classmethod
  226. def run(cls):
  227. logger.info(f"[内容分析] 开始获取redis数据")
  228. data = RedisHelper().get_client().rpop(name = "task:carry_redis_by_nrfx")
  229. if not data:
  230. logger.info('[内容分析] 无待执行的扫描任务')
  231. return
  232. data = orjson.loads(data)
  233. uid = str(uuid.uuid4())
  234. file_path = os.path.join(CACHE_DIR, uid)
  235. try:
  236. cls.data_handle(data, file_path)
  237. for filename in os.listdir(CACHE_DIR):
  238. # 检查文件名是否包含关键字
  239. if uid in filename:
  240. file_path = os.path.join(CACHE_DIR, filename)
  241. try:
  242. # 删除文件
  243. os.remove(file_path)
  244. logger.info(f"已删除文件: {file_path}")
  245. except Exception as e:
  246. logger.error(f"删除文件时出错: {file_path}, 错误: {e}")
  247. return
  248. except Exception as e:
  249. for filename in os.listdir(CACHE_DIR):
  250. # 检查文件名是否包含关键字
  251. if uid in filename:
  252. file_path = os.path.join(CACHE_DIR, filename)
  253. try:
  254. # 删除文件
  255. os.remove(file_path)
  256. logger.info(f"已删除文件: {file_path}")
  257. except Exception as e:
  258. logger.error(f"删除文件时出错: {file_path}, 错误: {e}")
  259. return
  260. def run():
  261. scheduler = BlockingScheduler()
  262. try:
  263. logger.info(f"[内容分析] 开始启动")
  264. scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=5)) # 每5分钟启动一次
  265. scheduler.start()
  266. except KeyboardInterrupt:
  267. pass
  268. except Exception as e:
  269. logger.error(f"[内容分析] 启动异常,异常信息:{e}")
  270. pass
  271. finally:
  272. scheduler.shutdown()
  273. if __name__ == '__main__':
  274. run()