consumption_work.py 29 KB


  1. import json
  2. import os
  3. import random
  4. import re
  5. import sys
  6. import time
  7. import uuid
  8. from datetime import datetime
  9. import orjson
  10. from apscheduler.schedulers.blocking import BlockingScheduler
  11. from apscheduler.triggers.interval import IntervalTrigger
  12. from loguru import logger
  13. sys.path.append('/app')
  14. from utils.redis import RedisHelper
  15. from utils.aliyun_log import AliyunLogger
  16. from utils.aliyun_oss import Oss
  17. from utils.download_video import DownLoad
  18. from utils.dy_ks_get_url import Dy_KS
  19. from utils.feishu_form import Material
  20. from utils.feishu_utils import Feishu
  21. from utils.ffmpeg import FFmpeg
  22. from utils.gpt4o_mini_help import GPT4oMini
  23. from utils.piaoquan import PQ
  24. from utils.sql_help import sqlCollect
  25. from utils.tag_video import Tag
  26. from utils.tts_help import TTS
  27. from utils.google_ai_studio import GoogleAI
  28. CACHE_DIR = '/app/cache/'
  29. # CACHE_DIR = '/Users/z/Downloads/'
  30. class ConsumptionRecommend(object):
  31. @classmethod
  32. def insert_pq(cls, data, oss_object_key, title, tags, tag_transport_channel, channel_mark, task_mark):
  33. logger.info(f"[+] 开始写入票圈")
  34. n_ids = str(data["pq_ids"])
  35. if ',' in n_ids:
  36. n_id_list = n_ids.split(',')
  37. else:
  38. n_id_list = [n_ids]
  39. pq_list = []
  40. for n_id in n_id_list:
  41. code = PQ.insert_piaoquantv(oss_object_key, title, n_id, None)
  42. if not code:
  43. logger.error(f"[+] 写入票圈后台失败")
  44. AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
  45. "改造失败,写入票圈后台失败", "3001", str(data))
  46. text = (
  47. f"**负责人**: {data['name']}\n"
  48. f"**内容**: {data}\n"
  49. f"**失败信息**: 视频写入票圈后台失败,视频ID{code}\n"
  50. )
  51. Feishu.finish_bot(text,
  52. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  53. "【 搬运&改造效率工具失败通知 】")
  54. continue
  55. pq_list.append(code)
  56. logger.info(f"[+] 写入票圈成功,返回视频id{code}")
  57. tag_status = Tag.video_tag(code, str(tags))
  58. if tag_status == 0:
  59. logger.info(f"[+] 写入标签成功,后台视频ID为{code}")
  60. try:
  61. current_time = datetime.now()
  62. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  63. sqlCollect.insert_machine_making_data(data["name"], task_mark, tag_transport_channel,
  64. data["video_url"], data["video_url"], data["pq_ids"],
  65. data["title_category"],
  66. code,
  67. formatted_time, data["title_category"], oss_object_key)
  68. pq_url = f'https://admin.piaoquantv.com/cms/post-detail/{code}/detail' # 站内视频链接
  69. values = [
  70. [
  71. str(code),
  72. str(n_id),
  73. formatted_time,
  74. channel_mark,
  75. data["name"],
  76. data["pq_ids"],
  77. data["pq_label"],
  78. data["activate_data"],
  79. data["video_url"],
  80. data["title_category"],
  81. tag_transport_channel,
  82. data["tag_transport_scene"],
  83. data["tag_transport_keyword"],
  84. data["tag"],
  85. data["transform_rule"],
  86. data["video_share"],
  87. data["trailer_share"],
  88. data["trailer_share_audio"],
  89. data["video_clipping"],
  90. data["video_clipping_time"],
  91. data["title_transform"],
  92. pq_url
  93. ]
  94. ]
  95. name_to_sheet = {
  96. "范军": "276ffc",
  97. "鲁涛": "QqrKRY",
  98. "余海涛": "dTzUlI",
  99. "罗情": "8JPv9g",
  100. "刘诗雨": "HqwG0o",
  101. "王媛": "vtWvle",
  102. "周仙琴": "MWUqWt",
  103. "王雪珂": "xN1KrU",
  104. "信欣": "PtoeGT",
  105. "邓锋": "dgV2Af",
  106. "王知微":"QDyCg6",
  107. "刘兆恒": "vtRMGX",
  108. "张博": "LEZMRr",
  109. "子涵": "tCw56r"
  110. }
  111. name = re.sub(r"\s+", "", data.get("name", ""))
  112. sheet = name_to_sheet.get(name)
  113. Feishu.insert_columns("R4dLsce8Jhz9oCtDMr9ccpFHnbI", sheet, "ROWS", 1, 2)
  114. time.sleep(0.5)
  115. Feishu.update_values("R4dLsce8Jhz9oCtDMr9ccpFHnbI", sheet, "A2:Z2", values)
  116. logger.info(f"[处理] 写入飞书成功")
  117. except Exception as e:
  118. logger.error(f"[处理] 写入飞书失败{e}")
  119. pass
  120. AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
  121. "改造成功", "1000", str(data), str(pq_list))
  122. return
  123. @classmethod
  124. def data_handle(cls, data, file_path, redis_name,studio_key):
  125. url, original_title, video_id, tag_transport_channel = Dy_KS.get_video_url(data, "效率工具")
  126. if url == "重新处理" or not url:
  127. RedisHelper().get_client().rpush(redis_name, json.dumps(data))
  128. text = (
  129. f"**负责人**: {data['name']}\n"
  130. f"**内容**: {data}\n"
  131. f"**失败信息**: 没有获取到视频链接,等待重新处理\n"
  132. )
  133. Feishu.finish_bot(text,
  134. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  135. f"【 搬运&改造效率工具失败通知 】")
  136. return
  137. elif url == "作品不存在" or url == "链接不是抖/快" or url == "note":
  138. if url == "note":
  139. url = "图文"
  140. text = (
  141. f"**负责人**: {data['name']}\n"
  142. f"**内容**: {data}\n"
  143. f"**失败信息**: {url},不做处理\n"
  144. )
  145. Feishu.finish_bot(text,
  146. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  147. f"【 搬运&改造效率工具失败通知 】")
  148. return
  149. AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "扫描到一条视频",
  150. "2001", str(data))
  151. AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "符合规则等待改造",
  152. "2004", str(data))
  153. logger.info(f"[处理] {url}开始下载视频")
  154. video_path = DownLoad.download_video(url, file_path, tag_transport_channel, video_id)
  155. if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
  156. RedisHelper().get_client().rpush(redis_name, json.dumps(data))
  157. logger.error(f"[处理] {url}下载失败")
  158. AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
  159. "视频下载失败等待重新处理", "3002", str(data))
  160. text = (
  161. f"**负责人**: {data['name']}\n"
  162. f"**内容**: {data}\n"
  163. f"**失败信息**: 视频下载失败等待重新处理,视频链接{url}\n"
  164. )
  165. Feishu.finish_bot(text,
  166. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  167. "【 搬运&改造效率工具失败通知 】")
  168. return
  169. logger.info(f"[处理] {url}视频下载成功")
  170. if data["title_category"] == "AI标题" or str(data["trailer_share"]) == "AI标题":
  171. title = GPT4oMini.get_ai_mini_title(
  172. original_title if data["title_category"] == "AI标题" else data["title_category"])
  173. else:
  174. title = original_title if data["title_category"] == "原标题" else data["title_category"]
  175. if tag_transport_channel == "抖音":
  176. if "复制打开抖音" in data['video_url']:
  177. channel_mark = "APP"
  178. else:
  179. channel_mark = "PC"
  180. else:
  181. if "https://www.kuaishou.com/f" in data['video_url']:
  182. channel_mark = "PC"
  183. else:
  184. channel_mark = "APP"
  185. if data["transform_rule"] == '否' or data["transform_rule"] == "是":
  186. logger.info(f"[处理] 数据开始发送oss")
  187. oss_object_key = Oss.stitching_sync_upload_oss(video_path, str(uuid.uuid4())) # 视频发送OSS
  188. oss_object_key = oss_object_key.get("oss_object_key")
  189. tags = ','.join(filter(None, [
  190. data['pq_label'],
  191. channel_mark,
  192. tag_transport_channel,
  193. data['tag_transport_scene'],
  194. data['tag_transport_keyword'],
  195. "搬运工具",
  196. data['tag']
  197. ]))
  198. cls.insert_pq(data, oss_object_key, title, tags, tag_transport_channel, channel_mark,
  199. "搬运工具")
  200. if data["transform_rule"] == "仅改造" or data["transform_rule"] == "是":
  201. try:
  202. width, height = FFmpeg.get_w_h_size(video_path)
  203. if width < height: # 判断是否需要修改为竖屏
  204. video_path = FFmpeg.update_video_h_w(video_path, file_path)
  205. logger.info(f"[处理] 视频更改分辨率处理")
  206. video_path = FFmpeg.video_640(video_path, file_path)
  207. if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
  208. RedisHelper().get_client().rpush(redis_name, json.dumps(data))
  209. logger.error(f"[处理] 视频更改分辨率失败")
  210. AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
  211. "改造失败,片尾拼接失败", "3001", str(data))
  212. text = (
  213. f"**负责人**: {data['name']}\n"
  214. f"**内容**: {data}\n"
  215. f"**失败信息**: 视频更改分辨率失败\n"
  216. )
  217. Feishu.finish_bot(text,
  218. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  219. "【 搬运&改造效率工具失败通知 】")
  220. return
  221. logger.info(f"[处理] 视频更改分辨率处理成功")
  222. if data["video_clipping"]: # 判断是否需要裁剪
  223. video_path = FFmpeg.video_crop(video_path, file_path)
  224. if data["video_clipping_time"]: # 判断是否需要指定视频时长
  225. video_path = FFmpeg.video_ggduration(video_path, file_path, data["video_clipping_time"])
  226. if data['trailer_share'] == "内容分析":
  227. video_text = GoogleAI.run(studio_key, video_path)
  228. if not video_text:
  229. logger.error(f"[处理] 视频内容分析获取内容信息失败")
  230. data["transform_rule"] = "仅改造"
  231. RedisHelper().get_client().rpush(redis_name, json.dumps(data))
  232. AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
  233. "改造失败,视频内容分析获取内容信息失败", "3001", str(data))
  234. text = (
  235. f"**负责人**: {data['name']}\n"
  236. f"**内容**: {data}\n"
  237. f"**失败信息**: 视频内容分析获取内容信息失败\n"
  238. )
  239. Feishu.finish_bot(text,
  240. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  241. "【 搬运&改造效率工具失败通知 】")
  242. return
  243. logger.info(f"[处理] 片尾引导-开始获取AI片尾")
  244. pw_srt_text = GPT4oMini.get_content_understanding_pw(video_text)
  245. else:
  246. prompt = Material.get_propmt_data(str(data['trailer_share']))
  247. pw_srt_text = GPT4oMini.get_ai_mini_pw(title, prompt)
  248. voice = data['trailer_share_audio']
  249. if voice:
  250. if ',' in voice:
  251. voices = voice.split(',')
  252. else:
  253. voices = [voice]
  254. voice = random.choice(voices)
  255. else:
  256. voice = "zhifeng_emo"
  257. pw_url = TTS.get_pw_zm(pw_srt_text, voice)
  258. if not pw_url:
  259. logger.error(f"[处理] 数据片尾获取失败")
  260. data["transform_rule"] = "仅改造"
  261. RedisHelper().get_client().rpush(redis_name, json.dumps(data))
  262. AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
  263. "改造失败,片尾获取失败", "3001", str(data))
  264. text = (
  265. f"**负责人**: {data['name']}\n"
  266. f"**内容**: {data}\n"
  267. f"**失败信息**: 获取片尾失败\n"
  268. )
  269. Feishu.finish_bot(text,
  270. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  271. "【 搬运&改造效率工具失败通知 】")
  272. return
  273. logger.info(f"[处理] 数据片尾获取成功")
  274. pw_srt = TTS.getSrt(pw_url)
  275. if not pw_srt:
  276. data["transform_rule"] = "仅改造"
  277. RedisHelper().get_client().rpush(redis_name, json.dumps(data))
  278. logger.error(f"[处理] 数据片尾音频srt获取失败")
  279. AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
  280. "改造失败,片尾音频下载失败", "3001", str(data))
  281. text = (
  282. f"**负责人**: {data['name']}\n"
  283. f"**内容**: {data}\n"
  284. f"**失败信息**: 片尾音频下载失败\n"
  285. )
  286. Feishu.finish_bot(text,
  287. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  288. "【 搬运&改造效率工具失败通知 】")
  289. return
  290. pw_mp3_path = TTS.download_mp3(pw_url, file_path)
  291. if not pw_mp3_path:
  292. data["transform_rule"] = "仅改造"
  293. RedisHelper().get_client().rpush(redis_name, json.dumps(data))
  294. logger.error(f"[处理] 数据片尾音频下载失败")
  295. AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
  296. "改造失败,片尾音频下载失败", "3001", str(data))
  297. text = (
  298. f"**负责人**: {data['name']}\n"
  299. f"**内容**: {data}\n"
  300. f"**失败信息**: 片尾音频下载失败\n"
  301. )
  302. Feishu.finish_bot(text,
  303. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  304. "【 搬运&改造效率工具失败通知 】")
  305. return
  306. logger.info(f"[处理] 数据片尾音频下载成功")
  307. if str(data['trailer_share_video']) and str(data['trailer_share_video'] ) != "None":
  308. rg_pw = str(data["trailer_share_video"])
  309. if ',' in rg_pw:
  310. rg_pw_list = rg_pw.split(',')
  311. else:
  312. rg_pw_list = [rg_pw]
  313. rg_pw_url_list = PQ.get_pq_oss(rg_pw_list)
  314. if not rg_pw_url_list:
  315. AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
  316. "无法获取站内视频链接", "3001", str(data))
  317. text = (
  318. f"**负责人**: {data['name']}\n"
  319. f"**内容**: {data}\n"
  320. f"**失败信息**: 无法获取站内视频链接\n"
  321. )
  322. Feishu.finish_bot(text,
  323. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  324. "【 搬运&改造效率工具失败通知 】")
  325. return
  326. pw_url_duration = FFmpeg.get_http_duration([pw_url])
  327. pw_videos_duration = FFmpeg.get_http_duration(rg_pw_url_list)
  328. if pw_videos_duration < pw_url_duration:
  329. jpg_path = FFmpeg.video_png(video_path, file_path) # 生成视频最后一帧jpg
  330. if not os.path.exists(jpg_path) or os.path.getsize(jpg_path) == 0:
  331. data["transform_rule"] = "仅改造"
  332. RedisHelper().get_client().rpush(redis_name, json.dumps(data))
  333. logger.error(f"[处理] 数据片尾获取最后一帧失败")
  334. AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
  335. "改造失败,获取最后一帧失败", "3001", str(data))
  336. text = (
  337. f"**负责人**: {data['name']}\n"
  338. f"**内容**: {data}\n"
  339. f"**失败信息**: 获取视频最后一帧失败\n"
  340. )
  341. Feishu.finish_bot(text,
  342. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  343. "【 搬运&改造效率工具失败通知 】")
  344. return
  345. logger.info(f"[处理] 数据片尾获取最后一帧成功")
  346. else:
  347. rg_pw_url = DownLoad.download_pq_video(file_path, rg_pw_url_list)
  348. rg_pw_list = FFmpeg.concatenate_videos(rg_pw_url, file_path)
  349. if not os.path.exists(rg_pw_list) or os.path.getsize(rg_pw_list) == 0:
  350. data["transform_rule"] = "仅改造"
  351. RedisHelper().get_client().rpush(redis_name, json.dumps(data))
  352. logger.error(f"[处理] 数据片尾拼接失败")
  353. AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
  354. "改造失败,片尾拼接失败", "3001", str(data))
  355. text = (
  356. f"**负责人**: {data['name']}\n"
  357. f"**内容**: {data}\n"
  358. f"**失败信息**: 片尾拼接失败\n"
  359. )
  360. Feishu.finish_bot(text,
  361. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  362. "【 搬运&改造效率工具失败通知 】")
  363. return
  364. jpg_path = FFmpeg.video_640(rg_pw_list, file_path)
  365. logger.info(f"[处理] 生成人工片尾成功")
  366. else:
  367. jpg_path = FFmpeg.video_png(video_path, file_path) # 生成视频最后一帧jpg
  368. if not os.path.exists(jpg_path) or os.path.getsize(jpg_path) == 0:
  369. data["transform_rule"] = "仅改造"
  370. RedisHelper().get_client().rpush(redis_name, json.dumps(data))
  371. logger.error(f"[处理] 数据片尾获取最后一帧失败")
  372. AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
  373. "改造失败,获取最后一帧失败", "3001", str(data))
  374. text = (
  375. f"**负责人**: {data['name']}\n"
  376. f"**内容**: {data}\n"
  377. f"**失败信息**: 获取视频最后一帧失败\n"
  378. )
  379. Feishu.finish_bot(text,
  380. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  381. "【 搬运&改造效率工具失败通知 】")
  382. return
  383. logger.info(f"[处理] 数据片尾获取最后一帧成功")
  384. pw_path = FFmpeg.pw_video(jpg_path, file_path, pw_mp3_path, pw_srt) # 生成片尾视频
  385. if not os.path.exists(pw_path) or os.path.getsize(pw_path) == 0:
  386. data["transform_rule"] = "仅改造"
  387. RedisHelper().get_client().rpush(redis_name, json.dumps(data))
  388. logger.error(f"[处理] 数据片尾拼接失败")
  389. AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
  390. "改造失败,片尾拼接失败", "3001", str(data))
  391. text = (
  392. f"**负责人**: {data['name']}\n"
  393. f"**内容**: {data}\n"
  394. f"**失败信息**: 片尾拼接失败\n"
  395. )
  396. Feishu.finish_bot(text,
  397. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  398. "【 搬运&改造效率工具失败通知 】")
  399. return
  400. logger.info(f"[处理] 数据合并开始拼接")
  401. trailer_share_bgm = data['trailer_share_bgm']
  402. # trailer_share_bgm = '48594759'
  403. if trailer_share_bgm and trailer_share_bgm != "None":
  404. try:
  405. logger.info(f"[处理] 获取bgm")
  406. rg_bgm_list = PQ.get_pq_oss([trailer_share_bgm])
  407. rg_bgm_url = DownLoad.download_pq_video(file_path, rg_bgm_list)
  408. bgm_mp3_path = FFmpeg.get_pw_video_mp3(file_path, rg_bgm_url[0])
  409. pw_path = FFmpeg.video_add_bgm(pw_path, bgm_mp3_path, file_path)
  410. logger.info(f"[处理] 片尾bgm添加成功")
  411. except Exception as e:
  412. logger.error(f"[处理] 片尾bgm添加失败")
  413. video_path = FFmpeg.h_b_video(video_path, pw_path, file_path)
  414. video_path = FFmpeg.single_video(video_path, file_path, data["video_share"])
  415. if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
  416. data["transform_rule"] = "仅改造"
  417. RedisHelper().get_client().rpush(redis_name, json.dumps(data))
  418. logger.error(f"[处理] 数据添加片中字幕失败")
  419. AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
  420. "改造失败,添加片中字幕失败", "3001", str(data))
  421. text = (
  422. f"**负责人**: {data['name']}\n"
  423. f"**内容**: {data}\n"
  424. f"**失败信息**: 视频片中增加字幕失败\n"
  425. )
  426. Feishu.finish_bot(text,
  427. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  428. "【 搬运&改造效率工具失败通知 】")
  429. return
  430. logger.info(f"[处理] 数据添加片中字幕成功")
  431. logger.info(f"[处理] 数据开始发送oss")
  432. oss_object_key = Oss.stitching_sync_upload_oss(video_path, str(uuid.uuid4())) # 视频发送OSS
  433. logger.info(f"[处理] 数据发送oss成功")
  434. oss_object_key = oss_object_key.get("oss_object_key")
  435. tags = ','.join(filter(None, [
  436. data['pq_label'],
  437. channel_mark,
  438. tag_transport_channel,
  439. data['tag_transport_scene'],
  440. data['tag_transport_keyword'],
  441. "搬运改造",
  442. data['tag']
  443. ]))
  444. cls.insert_pq(data, oss_object_key, title, tags, tag_transport_channel, channel_mark,
  445. "搬运改造")
  446. return
  447. except Exception as e:
  448. data["transform_rule"] = "仅改造"
  449. RedisHelper().get_client().rpush(redis_name, json.dumps(data))
  450. logger.error(f"[+] 视频改造失败{e}")
  451. text = (
  452. f"**负责人**: {data['name']}\n"
  453. f"**内容**: {data}\n"
  454. f"**失败信息**: 视频改造失败{e}\n"
  455. )
  456. Feishu.finish_bot(text,
  457. "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
  458. "【 搬运&改造效率工具失败通知 】")
  459. return
  460. @classmethod
  461. def run(cls):
  462. uid = str(uuid.uuid4())
  463. file_path = os.path.join(CACHE_DIR, uid)
  464. logger.info(f"[处理] 开始获取redis数据")
  465. fs_data = os.getenv("FS_DATA")
  466. # fs_data = '周仙琴,2WIcBU,task:carry_data_redis_zxq'
  467. fs_data_list = fs_data.split(',')
  468. redis_name = fs_data_list[2]
  469. studio_key = fs_data_list[3]
  470. data = RedisHelper().get_client().rpop(name=redis_name)
  471. if not data:
  472. logger.info('[处理] 无待执行的扫描任务')
  473. return
  474. data = orjson.loads(data)
  475. try:
  476. cls.data_handle(data, file_path, redis_name,studio_key)
  477. for filename in os.listdir(CACHE_DIR):
  478. # 检查文件名是否包含关键字
  479. if uid in filename:
  480. file_path = os.path.join(CACHE_DIR, filename)
  481. try:
  482. # 删除文件
  483. os.remove(file_path)
  484. logger.info(f"已删除文件: {file_path}")
  485. except Exception as e:
  486. logger.error(f"删除文件时出错: {file_path}, 错误: {e}")
  487. return
  488. except Exception as e:
  489. RedisHelper().get_client().rpush(redis_name, json.dumps(data))
  490. for filename in os.listdir(CACHE_DIR):
  491. # 检查文件名是否包含关键字
  492. if uid in filename:
  493. file_path = os.path.join(CACHE_DIR, filename)
  494. try:
  495. # 删除文件
  496. os.remove(file_path)
  497. logger.info(f"已删除文件: {file_path}")
  498. except Exception as e:
  499. logger.error(f"删除文件时出错: {file_path}, 错误: {e}")
  500. return
  501. def run():
  502. scheduler = BlockingScheduler()
  503. try:
  504. logger.info(f"[处理] 开始启动")
  505. scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=1)) # 每1分钟启动一次
  506. scheduler.start()
  507. except KeyboardInterrupt:
  508. pass
  509. except Exception as e:
  510. logger.error(f"[处理] 启动异常,异常信息:{e}")
  511. pass
  512. finally:
  513. scheduler.shutdown()
  514. if __name__ == '__main__':
  515. run()