# ai_tag_task.py
  1. import os
  2. import json
  3. import datetime
  4. import time
  5. import traceback
  6. import requests
  7. from threading import Timer
  8. from utils import data_check, get_feature_data, asr_validity_discrimination
  9. from whisper_asr import get_whisper_asr
  10. from gpt_tag import request_gpt
  11. from config import set_config
  12. from log import Log
  13. config_ = set_config()
  14. log_ = Log()
  15. features = ['videoid', 'title', 'video_path']
  16. def get_video_ai_tags(video_id, video_file, video_info):
  17. try:
  18. st_time = time.time()
  19. log_message = {
  20. 'videoId': int(video_id),
  21. }
  22. title = video_info.get('title')
  23. log_message['videoPath'] = video_info.get('video_path')
  24. log_message['title'] = video_info.get('title')
  25. # 1. asr
  26. asr_res_initial = get_whisper_asr(video=video_file)
  27. log_message['asrRes'] = asr_res_initial
  28. # 2. 判断asr识别的文本是否有效
  29. validity = asr_validity_discrimination(text=asr_res_initial)
  30. log_message['asrValidity'] = validity
  31. if validity is True:
  32. # 3. 对asr结果进行清洗
  33. asr_res = asr_res_initial.replace('\n', '')
  34. for stop_word in config_.STOP_WORDS:
  35. asr_res = asr_res.replace(stop_word, '')
  36. # token限制: 字数 <= 2500
  37. asr_res = asr_res[-2500:]
  38. # 4. gpt产出结果
  39. # 4.1 gpt产出summary, keywords,
  40. prompt1 = f"{config_.GPT_PROMPT['tags']['prompt6']}{asr_res.strip()}"
  41. log_message['gptPromptSummaryKeywords'] = prompt1
  42. gpt_res1 = request_gpt(prompt=prompt1)
  43. log_message['gptResSummaryKeywords'] = gpt_res1
  44. if gpt_res1 is not None:
  45. # 4.2 获取summary, keywords, title进行分类
  46. try:
  47. gpt_res1_json = json.loads(gpt_res1)
  48. summary = gpt_res1_json['summary']
  49. keywords = gpt_res1_json['keywords']
  50. log_message['summary'] = summary
  51. log_message['keywords'] = keywords
  52. prompt2_param = f"标题:{title}\n概况:{summary}\n关键词:{keywords}"
  53. prompt2 = f"{config_.GPT_PROMPT['tags']['prompt7']}{prompt2_param}"
  54. log_message['gptPromptTag'] = prompt2
  55. gpt_res2 = request_gpt(prompt=prompt2)
  56. log_message['gptResTag'] = gpt_res2
  57. if gpt_res2 is not None:
  58. confidence_up_list = []
  59. try:
  60. for item in json.loads(gpt_res2):
  61. if item['confidence'] > 0.5 and item['category'] in config_.TAGS_NEW:
  62. confidence_up_list.append(f"AI标签-{item['category']}")
  63. except:
  64. pass
  65. confidence_up = ','.join(confidence_up_list)
  66. log_message['AITags'] = confidence_up
  67. # 5. 调用后端接口,结果传给后端
  68. if len(confidence_up) > 0:
  69. response = requests.post(url=config_.ADD_VIDEO_AI_TAGS_URL,
  70. json={'videoId': int(video_id), 'tagNames': confidence_up})
  71. res_data = json.loads(response.text)
  72. if res_data['code'] != 0:
  73. log_.error({'videoId': video_id, 'msg': 'add video ai tags fail!'})
  74. except:
  75. pass
  76. else:
  77. pass
  78. log_message['executeTime'] = (time.time() - st_time) * 1000
  79. log_.info(log_message)
  80. except Exception as e:
  81. log_.error(e)
  82. log_.error(traceback.format_exc())
  83. def ai_tags(project, table, dt):
  84. # 获取特征数据
  85. feature_df = get_feature_data(project=project, table=table, dt=dt, features=features)
  86. video_id_list = feature_df['videoid'].to_list()
  87. video_info = {}
  88. for video_id in video_id_list:
  89. title = feature_df[feature_df['videoid'] == video_id]['title'].values[0]
  90. video_path = feature_df[feature_df['videoid'] == video_id]['video_path'].values[0]
  91. if title is None:
  92. continue
  93. title = title.strip()
  94. if len(title) > 0:
  95. video_info[video_id] = {'title': title, 'video_path': video_path}
  96. # print(video_id, title)
  97. print(len(video_info))
  98. # 获取已下载视频
  99. download_folder = 'videos'
  100. retry = 0
  101. while retry < 3:
  102. video_folder_list = os.listdir(download_folder)
  103. if len(video_folder_list) < 2:
  104. retry += 1
  105. time.sleep(60)
  106. continue
  107. for video_id in video_folder_list:
  108. if video_id not in video_id_list:
  109. continue
  110. if video_info.get(video_id, None) is None:
  111. os.removedirs(os.path.join(download_folder, video_id))
  112. else:
  113. video_folder = os.path.join(download_folder, video_id)
  114. for filename in os.listdir(video_folder):
  115. video_type = filename.split('.')[-1]
  116. if video_type in ['mp4', 'm3u8']:
  117. video_file = os.path.join(video_folder, filename)
  118. get_video_ai_tags(video_id=video_id, video_file=video_file, video_info=video_info.get(video_id))
  119. # 将处理过的视频进行删除
  120. os.removedirs(os.path.join(download_folder, video_id))
  121. else:
  122. os.removedirs(os.path.join(download_folder, video_id))
  123. def timer_check():
  124. try:
  125. project = config_.DAILY_VIDEO['project']
  126. table = config_.DAILY_VIDEO['table']
  127. now_date = datetime.datetime.today()
  128. print(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
  129. dt = datetime.datetime.strftime(now_date-datetime.timedelta(days=1), '%Y%m%d')
  130. # 查看数据是否已准备好
  131. data_count = data_check(project=project, table=table, dt=dt)
  132. if data_count > 0:
  133. print(f'videos count = {data_count}')
  134. # 数据准备好,进行视频下载
  135. ai_tags(project=project, table=table, dt=dt)
  136. print(f"videos ai tag finished!")
  137. else:
  138. # 数据没准备好,1分钟后重新检查
  139. Timer(60, timer_check).start()
  140. except Exception as e:
  141. print(f"视频ai打标签失败, exception: {e}, traceback: {traceback.format_exc()}")
  142. if __name__ == '__main__':
  143. timer_check()