# consumption_work.py
  1. import os
  2. import random
  3. import sys
  4. import time
  5. import uuid
  6. from datetime import datetime
  7. import orjson
  8. from apscheduler.schedulers.blocking import BlockingScheduler
  9. from apscheduler.triggers.interval import IntervalTrigger
  10. from loguru import logger
  11. sys.path.append('/app')
  12. from utils.redis import RedisHelper
  13. from utils.aliyun_log import AliyunLogger
  14. from utils.download_video import DownLoad
  15. from utils.feishu_form import Material
  16. from utils.feishu_utils import Feishu
  17. from utils.piaoquan import PQ
  18. from utils.sql_help import sqlCollect
  19. from utils.tag_video import Tag
  20. from channel.dy import DY
  21. from channel.dy_keyword import DyKeyword
  22. from channel.ks import KS
  23. from channel.ks_keyword import KsKeyword
  24. from channel.ks_xcx import KSXCX
  25. from utils.aliyun_oss import Oss
  26. from utils.ffmpeg import FFmpeg
  27. from utils.google_ai_studio import GoogleAI
  28. from utils.gpt4o_mini_help import GPT4oMini
  29. from utils.tts_help import TTS
  30. CACHE_DIR = '/app/cache/'
  31. # CACHE_DIR = '/Users/z/Downloads/'
  32. class ConsumptionRecommend(object):
  33. @classmethod
  34. def generate_title(cls, video, title):
  35. """
  36. 生成新标题
  37. """
  38. if video['old_title']:
  39. new_title = video['old_title'].strip().replace("\n", "") \
  40. .replace("/", "").replace("\\", "").replace("\r", "") \
  41. .replace(":", "").replace("*", "").replace("?", "") \
  42. .replace("?", "").replace('"', "").replace("<", "") \
  43. .replace(">", "").replace("|", "").replace(" ", "") \
  44. .replace("&NBSP", "").replace(".", "。").replace(" ", "") \
  45. .replace("'", "").replace("#", "").replace("Merge", "")
  46. else:
  47. return '这个视频,分享给我的老友,祝愿您能幸福安康'
  48. if title == "原标题":
  49. if not new_title:
  50. new_title = '这个视频,分享给我的老友,祝愿您能幸福安康'
  51. elif title == "AI标题":
  52. if not new_title:
  53. new_title = '这个视频,分享给我的老友,祝愿您能幸福安康'
  54. else:
  55. new_title = GPT4oMini.get_ai_mini_title(new_title)
  56. else:
  57. titles = title.split('/') if '/' in title else [title]
  58. new_title = random.choice(titles)
  59. return new_title
  60. @classmethod
  61. def insert_pq(cls, task, oss_object_key, title, tags, fs_channel_name, video, voice, fs_mark):
  62. logger.info(f"[+] 开始写入票圈")
  63. n_id = str(task["pd_id"])
  64. code = PQ.insert_piaoquantv(oss_object_key, title, n_id, None)
  65. if not code:
  66. logger.error(f"[机器改造] 写入票圈后台失败")
  67. text = (
  68. f"**通知类型**: 写入票圈后台失败\n"
  69. f"**负责人**: {fs_channel_name}\n"
  70. f"**渠道**: {task['channel']}\n"
  71. f"**视频主页ID**: {task['channel_url']}\n"
  72. f"**视频Video_id**: {video['video_id']}\n"
  73. )
  74. AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'],
  75. "视频下载失败", "3002", f"video_url:{video['video_url']}")
  76. Feishu.finish_bot(text,
  77. "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
  78. "【 机器改造通知 】")
  79. logger.info(f"[机器改造] 写入票圈成功,返回视频id{code}")
  80. log_data = f"user:{task['channel']},,video_id:{video['video_id']},,video_url:{video['video_url']},,ai_title:{title},,voice:{voice}"
  81. AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'], "视频改造成功", "1000", log_data, str(code))
  82. tag_status = Tag.video_tag(code, str(tags))
  83. if tag_status == 0:
  84. logger.info(f"[机器改造] 写入标签成功,后台视频ID为{code}")
  85. current_time = datetime.now()
  86. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  87. sqlCollect.insert_machine_making_data(fs_channel_name, task['task_mark'], task['channel'], task['channel_url'], video['video_id'], n_id, title, code,
  88. formatted_time, title, oss_object_key)
  89. sqlCollect.insert_task(task['task_mark'], task['channel_url'], fs_mark, task['channel']) # 插入数据库
  90. try:
  91. current_time = datetime.now()
  92. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  93. pq_url = f'https://admin.piaoquantv.com/cms/post-detail/{code}/detail' # 站内视频链接
  94. if '历史抖音' == task['channel'] or '历史快手' == task['channel']:
  95. explain = "历史爆款"
  96. else:
  97. explain = "新供给"
  98. values = [
  99. [
  100. fs_channel_name,
  101. task['task_mark'],
  102. task['channel'],
  103. task['channel_url'],
  104. str(video['video_id']),
  105. str(n_id),
  106. video['old_title'],
  107. "AI标题",
  108. title,
  109. str(code),
  110. formatted_time,
  111. video["rule"],
  112. explain,
  113. voice,
  114. tags,
  115. task['keyword_name'],
  116. pq_url
  117. ]
  118. ]
  119. if fs_channel_name == "抖音品类账号":
  120. sheet = "905313"
  121. elif fs_channel_name == "快手品类账号":
  122. sheet = "JHVpNK"
  123. elif fs_channel_name == "抖音关键词搜索":
  124. sheet = "6nclDV"
  125. elif fs_channel_name == "快手关键词搜索":
  126. sheet = "PVd8nj"
  127. elif fs_channel_name == "快手小程序":
  128. sheet = "dEjDt1"
  129. elif fs_channel_name == "top溯源账号":
  130. sheet = "5tPopP"
  131. Feishu.insert_columns("L2KGsz5HzhDfyYtV9IRcLHjtnsg", sheet, "ROWS", 1, 2)
  132. time.sleep(0.5)
  133. Feishu.update_values("L2KGsz5HzhDfyYtV9IRcLHjtnsg", sheet, "A2:Z2", values)
  134. logger.info(f"[处理] 写入飞书成功")
  135. except Exception as e:
  136. logger.error(f"[处理] 写入飞书失败{e}")
  137. pass
  138. return
  139. @classmethod
  140. def get_channle_data(cls, task, fs_channel_name):
  141. channel = task['channel']
  142. if channel == "抖音" or channel == "历史抖音":
  143. return DY.get_dy_list(task, fs_channel_name)
  144. elif channel == "快手" or channel == "历史快手":
  145. return KS.get_ks_list(task, fs_channel_name)
  146. elif channel == "抖音搜索":
  147. return DyKeyword.get_key_word(task, fs_channel_name)
  148. elif channel == "快手搜索":
  149. return KsKeyword.get_key_word(task, fs_channel_name)
  150. elif channel == '快手小程序':
  151. return KSXCX.get_xcx_date(task, fs_channel_name)
  152. @classmethod
  153. def data_handle(cls, task, file_path, fs_channel_name, ai_key, fs_mark):
  154. logger.info(f"[机器改造] 开始获取视频数据")
  155. data_list = cls.get_channle_data(task, fs_channel_name)
  156. if not data_list:
  157. logger.info(f"[+] {fs_channel_name}-{task['channel_url']}获取视频完成,没有符合/改造视频")
  158. AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], "", "无改造视频", "4000")
  159. text = (
  160. f"**通知类型**: 没有改造的视频\n"
  161. f"**负责人**: {fs_channel_name}\n"
  162. f"**渠道**: {task['channel']}\n"
  163. f"**视频主页ID**: {task['channel_url']}\n"
  164. )
  165. Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
  166. "【 机器改造通知 】")
  167. return
  168. for video in data_list:
  169. file_path = file_path + str(uuid.uuid4())
  170. logger.info(f"[机器改造] {video['video_url']}视频开始下载")
  171. video_path = DownLoad.download_video(video["video_url"], file_path, task['channel'], video["video_id"])
  172. logger.info(f"[机器改造] {video['video_url']}视频下载成功")
  173. if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
  174. text = (
  175. f"**通知类型**: 视频下载失败\n"
  176. f"**负责人**: {fs_channel_name}\n"
  177. f"**渠道**: {task['channel']}\n"
  178. f"**视频主页ID**: {task['channel_url']}\n"
  179. f"**视频Video_id**: {video['video_id']}\n"
  180. )
  181. AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'], "视频下载失败", "3002", f"video_url:{video['video_url']}")
  182. Feishu.finish_bot(text,
  183. "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
  184. "【 机器改造通知 】")
  185. continue
  186. logger.info(f"[机器改造] 视频更改分辨率处理")
  187. width, height = FFmpeg.get_w_h_size(video_path)
  188. if width < height: # 判断是否需要修改为竖屏
  189. video_path = FFmpeg.update_video_h_w(video_path, file_path)
  190. if task["crop_tool"]: # 判断是否需要裁剪
  191. video_path = FFmpeg.video_crop(video_path, file_path)
  192. if task["gg_duration"]: # 判断是否需要指定视频时长
  193. video_path = FFmpeg.video_ggduration(video_path, file_path, task["gg_duration"])
  194. video_path = FFmpeg.video_640(video_path, file_path)
  195. logger.info(f"[机器改造] 视频更改分辨率处理成功")
  196. video_text = GoogleAI.run(ai_key, video_path)
  197. if not video_text:
  198. text = (
  199. f"**通知类型**: 获取口播文案失败\n"
  200. f"**负责人**: {fs_channel_name}\n"
  201. f"**渠道**: {task['channel']}\n"
  202. f"**视频主页ID**: {task['channel_url']}\n"
  203. f"**视频Video_id**: {video['video_id']}\n"
  204. )
  205. AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'],
  206. "获取口播文案失败", "3002", f"video_url:{video['video_url']}")
  207. Feishu.finish_bot(text,
  208. "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
  209. "【 机器改造通知 】")
  210. continue
  211. pw_srt_text = GPT4oMini.get_content_understanding_pw(video_text)
  212. voice = task["voice"]
  213. if voice:
  214. if ',' in voice:
  215. voices = voice.split(',')
  216. else:
  217. voices = [voice]
  218. voice = random.choice(voices)
  219. else:
  220. voice = "zhifeng_emo"
  221. pw_url = TTS.get_pw_zm(pw_srt_text, voice)
  222. if not pw_url:
  223. text = (
  224. f"**通知类型**: 获取片尾引导失败\n"
  225. f"**负责人**: {fs_channel_name}\n"
  226. f"**渠道**: {task['channel']}\n"
  227. f"**视频主页ID**: {task['channel_url']}\n"
  228. f"**视频Video_id**: {video['video_id']}\n"
  229. )
  230. AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'],
  231. "获取片尾引导失败", "3002", f"video_url:{video['video_url']}")
  232. Feishu.finish_bot(text,
  233. "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
  234. "【 机器改造通知 】")
  235. continue
  236. pw_srt = TTS.getSrt(pw_url)
  237. if not pw_srt:
  238. text = (
  239. f"**通知类型**: 获取片尾引导失败\n"
  240. f"**负责人**: {fs_channel_name}\n"
  241. f"**渠道**: {task['channel']}\n"
  242. f"**视频主页ID**: {task['channel_url']}\n"
  243. f"**视频Video_id**: {video['video_id']}\n"
  244. )
  245. AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'],
  246. "获取片尾引导失败", "3002", f"video_url:{video['video_url']}")
  247. Feishu.finish_bot(text,
  248. "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
  249. "【 机器改造通知 】")
  250. continue
  251. logger.info(f"[机器改造] 开始下载音频")
  252. pw_mp3_path = TTS.download_mp3(pw_url, file_path)
  253. if not pw_mp3_path:
  254. text = (
  255. f"**通知类型**: 片尾音频下载失败\n"
  256. f"**负责人**: {fs_channel_name}\n"
  257. f"**渠道**: {task['channel']}\n"
  258. f"**视频主页ID**: {task['channel_url']}\n"
  259. f"**视频Video_id**: {video['video_id']}\n"
  260. )
  261. AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'],
  262. "片尾音频下载失败", "3002", f"video_url:{video['video_url']}")
  263. Feishu.finish_bot(text,
  264. "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
  265. "【 机器改造通知 】")
  266. continue
  267. logger.info(f"[机器改造] 片尾下载成功")
  268. jpg_path = FFmpeg.video_png(video_path, file_path) # 生成视频最后一帧jpg
  269. pw_path = FFmpeg.pw_video(jpg_path, file_path, pw_mp3_path, pw_srt) # 生成片尾视频
  270. if not os.path.exists(pw_path) or os.path.getsize(pw_path) == 0:
  271. text = (
  272. f"**通知类型**: 生成片尾视频失败\n"
  273. f"**负责人**: {fs_channel_name}\n"
  274. f"**渠道**: {task['channel']}\n"
  275. f"**视频主页ID**: {task['channel_url']}\n"
  276. f"**视频Video_id**: {video['video_id']}\n"
  277. )
  278. AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'],
  279. "生成片尾视频失败", "3002", f"video_url:{video['video_url']}")
  280. Feishu.finish_bot(text,
  281. "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
  282. "【 机器改造通知 】")
  283. continue
  284. logger.info(f"[机器改造] 合并开始拼接")
  285. video_path = FFmpeg.h_b_video(video_path, pw_path, file_path)
  286. zm = Material.get_pzsrt_data("summary", "500Oe0", task["video_share"])
  287. video_path = FFmpeg.single_video(video_path, file_path, zm)
  288. if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
  289. text = (
  290. f"**通知类型**: 合并拼接失败\n"
  291. f"**负责人**: {fs_channel_name}\n"
  292. f"**渠道**: {task['channel']}\n"
  293. f"**视频主页ID**: {task['channel_url']}\n"
  294. f"**视频Video_id**: {video['video_id']}\n"
  295. )
  296. AliyunLogger.logging(task['channel'], fs_channel_name, task['channel_url'], video['video_id'],
  297. "合并拼接失败", "3002", f"video_url:{video['video_url']}")
  298. Feishu.finish_bot(text,
  299. "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
  300. "【 机器改造通知 】")
  301. continue
  302. logger.info(f"[机器改造] 视频-开始发送oss")
  303. oss_object_key = Oss.stitching_sync_upload_oss(video_path, str(uuid.uuid4())) # 视频发送OSS
  304. logger.info(f"[机器改造] 数据发送oss成功")
  305. oss_object_key = oss_object_key.get("oss_object_key")
  306. base_tags = [task['tag'], f"一级品类_{task['first_category']}",f"来源_{fs_channel_name}"]
  307. tags = ','.join(filter(None, base_tags))
  308. title = cls.generate_title(video, task["ai_title"])
  309. cls.insert_pq(task, oss_object_key,title, tags, fs_channel_name, video, voice, fs_mark)
  310. @classmethod
  311. def run(cls):
  312. uid = str(uuid.uuid4())
  313. file_path = os.path.join(CACHE_DIR, uid)
  314. logger.info(f"[机器改造] 开始获取redis数据")
  315. fs_data = os.getenv("FS_DATA")
  316. # fs_data = '快手关键词搜索,ks-gjc,B65Gs3bCHhIzj0t7KGWcwZD0nGf,91wp7k,AIzaSyBiwivvBKfqDsxvqAKBrCZyk-wFMhfthXg'
  317. try:
  318. fs_data_list = fs_data.split(',')
  319. fs_channel_name = fs_data_list[0]
  320. fs_mark = fs_data_list[1]
  321. fs_id = fs_data_list[2]
  322. fs_sheet = fs_data_list[3]
  323. ai_key = fs_data_list[4]
  324. data = RedisHelper().get_client().rpop(name=f"task:{fs_mark}")
  325. if not data:
  326. logger.info('[机器改造] 无待执行的扫描任务,重新获取飞书表格读取任务')
  327. fs_data = Material.get_carry_data(fs_id, fs_sheet)
  328. RedisHelper().get_client().rpush(f"task:{fs_mark}", *fs_data)
  329. return
  330. task = orjson.loads(data)
  331. totality_count = task['totality_count']
  332. if totality_count:
  333. task_totality_count = sqlCollect.get_mark_count(task["task_mark"])
  334. if int(totality_count) <= int(task_totality_count[0][0]):
  335. AliyunLogger.logging((task["channel"]), fs_channel_name, task["channel_url"], '',
  336. f"{task['task_mark']}标识任务每日指定条数已足够,指定条数{totality_count},实际生成条数{int(task_totality_count[0][0])}",
  337. "1111")
  338. logger.info(
  339. f"[+] {task['task_mark']}标识任务每日指定条数已足够,指定条数{totality_count},实际生成条数{int(task_totality_count[0][0])}")
  340. return
  341. if fs_channel_name in ["快手关键词搜索","抖音关键词搜索"]:
  342. gjc_count = RedisHelper().get_client().get(f"{fs_mark}_count")
  343. RedisHelper().get_client().incrby(f"{fs_mark}_count", 1)
  344. if int(gjc_count.decode()) >= 300:
  345. logger.info(f"[机器改造] {fs_channel_name}上限")
  346. return
  347. cls.data_handle(task, file_path, fs_channel_name, ai_key, fs_mark)
  348. for filename in os.listdir(CACHE_DIR):
  349. # 检查文件名是否包含关键字
  350. if uid in filename:
  351. file_path = os.path.join(CACHE_DIR, filename)
  352. try:
  353. # 删除文件
  354. os.remove(file_path)
  355. logger.info(f"[机器改造] 已删除文件: {file_path}")
  356. except Exception as e:
  357. logger.error(f"[机器改造] 删除文件时出错: {file_path}, 错误: {e}")
  358. return
  359. except Exception as e:
  360. for filename in os.listdir(CACHE_DIR):
  361. # 检查文件名是否包含关键字
  362. if uid in filename:
  363. file_path = os.path.join(CACHE_DIR, filename)
  364. try:
  365. # 删除文件
  366. os.remove(file_path)
  367. logger.info(f"[机器改造] 已删除文件: {file_path}")
  368. except Exception as e:
  369. logger.error(f"[机器改造] 删除文件时出错: {file_path}, 错误: {e}")
  370. return
  371. def run():
  372. scheduler = BlockingScheduler()
  373. try:
  374. logger.info(f"[机器改造] 开始启动")
  375. scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=1)) # 每1分钟启动一次
  376. scheduler.start()
  377. except KeyboardInterrupt:
  378. pass
  379. except Exception as e:
  380. logger.error(f"[机器改造] 启动异常,异常信息:{e}")
  381. pass
  382. finally:
  383. scheduler.shutdown()
  384. if __name__ == '__main__':
  385. run()