ai_tag_task.py 11 KB


  1. import os
  2. import shutil
  3. import json
  4. import datetime
  5. import time
  6. import traceback
  7. import requests
  8. from threading import Timer
  9. from utils import data_check, get_feature_data, asr_validity_discrimination
  10. from whisper_asr import get_whisper_asr
  11. from gpt_tag import request_gpt
  12. from config import set_config
  13. from log import Log
  14. from ReadXlsxFile import getVideoInfoInXlxs
  15. from result_save import insert_content
  16. from result_save import insert_content
  17. config_ = set_config()
  18. log_ = Log()
  19. features = ['videoid', 'title', 'video_path']
  20. def get_video_ai_tags(video_id, video_info):
  21. try:
  22. st_time = time.time()
  23. log_message = {
  24. 'videoId': int(video_id),
  25. }
  26. title = video_info.get('title')
  27. log_message['videoPath'] = video_info.get('video_path')
  28. log_message['title'] = video_info.get('title')
  29. # 1. 获取asr结果
  30. asr_res_initial = video_info.get('asr_res', '')
  31. log_message['asrRes'] = asr_res_initial
  32. # 2. 判断asr识别的文本是否有效
  33. validity = asr_validity_discrimination(text=asr_res_initial)
  34. log_message['asrValidity'] = validity
  35. if validity is True:
  36. # 3. 对asr结果进行清洗
  37. asr_res = asr_res_initial.replace('\n', '')
  38. for stop_word in config_.STOP_WORDS:
  39. asr_res = asr_res.replace(stop_word, '')
  40. # token限制: 字数 <= 2500
  41. asr_res = asr_res[-2500:]
  42. # 4. gpt产出结果
  43. # 4.1 gpt产出summary, keywords,
  44. prompt1 = f"{config_.GPT_PROMPT['tags']['prompt6']}{asr_res.strip()}"
  45. log_message['gptPromptSummaryKeywords'] = prompt1
  46. gpt_res1 = request_gpt(prompt=prompt1)
  47. log_message['gptResSummaryKeywords'] = gpt_res1
  48. if gpt_res1 is not None:
  49. # 4.2 获取summary, keywords, title进行分类
  50. try:
  51. gpt_res1_json = json.loads(gpt_res1)
  52. summary = gpt_res1_json['summary']
  53. keywords = gpt_res1_json['keywords']
  54. log_message['summary'] = summary
  55. log_message['keywords'] = str(keywords)
  56. # TODO 三个 prompt 拆分成三个请求
  57. prompt2_param = f"标题:{title}\n概况:{summary}\n关键词:{keywords}"
  58. prompt2 = f"{config_.GPT_PROMPT['tags']['prompt8']}{prompt2_param}"
  59. log_message['gptPrompt2'] = prompt2
  60. gpt_res2 = request_gpt(prompt=prompt2)
  61. log_message['gptRes2'] = gpt_res2
  62. prompt3 = f"{config_.GPT_PROMPT['tags']['prompt9']}{prompt2_param}"
  63. log_message['gptPrompt3'] = prompt3
  64. gpt_res3 = request_gpt(prompt=prompt3)
  65. log_message['gptRes3'] = gpt_res3
  66. prompt4 = f"{config_.GPT_PROMPT['tags']['prompt10']}{prompt2_param}"
  67. log_message['gptPrompt4'] = prompt4
  68. gpt_res4 = request_gpt(prompt=prompt4)
  69. log_message['gptRes4'] = gpt_res4
  70. # 5. 解析gpt产出结果
  71. parseRes = praseGptRes(gpt_res2, gpt_res3, gpt_res4)
  72. parseRes['video_id'] = video_id
  73. log_message.update(parseRes)
  74. # 6. 保存结果
  75. insert_content(parseRes)
  76. except:
  77. log_.error(traceback.format_exc())
  78. pass
  79. else:
  80. pass
  81. log_message['executeTime'] = (time.time() - st_time) * 1000
  82. log_.info(log_message)
  83. except Exception as e:
  84. log_.error(e)
  85. log_.error(traceback.format_exc())
  86. def praseGptRes(gpt_res2, gpt_res3, gpt_res4):
  87. result = {}
  88. if gpt_res2 is not None:
  89. try:
  90. res2 = json.loads(gpt_res2)
  91. result['key_words'] = res2['key_words']
  92. result['search_keys'] = res2['search_keys']
  93. result['extra_keys'] = res2['extra_keys']
  94. except:
  95. pass
  96. if gpt_res3 is not None:
  97. try:
  98. res3 = json.loads(gpt_res3)
  99. result['tone'] = res3['tone']
  100. result['target_audience'] = res3['target_audience']
  101. result['target_age'] = res3['target_age']
  102. except:
  103. pass
  104. if gpt_res4 is not None:
  105. try:
  106. res4 = json.loads(gpt_res4)
  107. result['category'] = res4['category']
  108. result['target_gender'] = res4['target_gender']
  109. result['address'] = res4['address']
  110. result['theme'] = res4['theme']
  111. except:
  112. pass
  113. return result
  114. def process(video_id, video_info, download_folder):
  115. if video_info.get(video_id, None) is None:
  116. shutil.rmtree(os.path.join(download_folder, video_id))
  117. else:
  118. video_folder = os.path.join(download_folder, video_id)
  119. for filename in os.listdir(video_folder):
  120. video_type = filename.split('.')[-1]
  121. if video_type in ['mp4', 'm3u8']:
  122. video_file = os.path.join(video_folder, filename)
  123. get_video_ai_tags(
  124. video_id=video_id, video_file=video_file, video_info=video_info.get(video_id))
  125. # 将处理过的视频进行删除
  126. shutil.rmtree(os.path.join(download_folder, video_id))
  127. else:
  128. shutil.rmtree(os.path.join(download_folder, video_id))
  129. def ai_tags(project, table, dt):
  130. # 获取特征数据
  131. feature_df = get_feature_data(
  132. project=project, table=table, dt=dt, features=features)
  133. video_id_list = feature_df['videoid'].to_list()
  134. video_info = {}
  135. for video_id in video_id_list:
  136. title = feature_df[feature_df['videoid']
  137. == video_id]['title'].values[0]
  138. video_path = feature_df[feature_df['videoid']
  139. == video_id]['video_path'].values[0]
  140. if title is None:
  141. continue
  142. title = title.strip()
  143. if len(title) > 0:
  144. video_info[video_id] = {'title': title, 'video_path': video_path}
  145. # print(video_id, title)
  146. print(len(video_info))
  147. # 获取已下载视频
  148. download_folder = 'videos'
  149. retry = 0
  150. while retry < 3:
  151. video_folder_list = os.listdir(download_folder)
  152. if len(video_folder_list) < 2:
  153. retry += 1
  154. time.sleep(60)
  155. continue
  156. # pool = multiprocessing.Pool(processes=5)
  157. # for video_id in video_folder_list:
  158. # if video_id not in video_id_list:
  159. # continue
  160. # pool.apply_async(
  161. # func=process,
  162. # args=(video_id, video_info, download_folder)
  163. # )
  164. # pool.close()
  165. # pool.join()
  166. for video_id in video_folder_list:
  167. if video_id not in video_id_list:
  168. continue
  169. if video_info.get(video_id, None) is None:
  170. shutil.rmtree(os.path.join(download_folder, video_id))
  171. else:
  172. video_folder = os.path.join(download_folder, video_id)
  173. for filename in os.listdir(video_folder):
  174. video_type = filename.split('.')[-1]
  175. if video_type in ['mp4', 'm3u8']:
  176. video_file = os.path.join(video_folder, filename)
  177. get_video_ai_tags(
  178. video_id=video_id, video_file=video_file, video_info=video_info.get(video_id))
  179. # 将处理过的视频进行删除
  180. shutil.rmtree(os.path.join(download_folder, video_id))
  181. else:
  182. shutil.rmtree(os.path.join(download_folder, video_id))
  183. def get_asr_res(video_id):
  184. # URL of the API endpoint
  185. url = 'http://61.48.133.26:5999/video_to_text'
  186. # Headers for the request
  187. headers = {
  188. 'Content-Type': 'application/json'
  189. }
  190. # Data to be sent in the request
  191. data = {
  192. "video_id": f"{video_id}"
  193. }
  194. # Making the POST request
  195. response = requests.post(url, headers=headers, json=data)
  196. # Checking if the request was successful
  197. if response.status_code == 200:
  198. # Extracting the 'text' field from the JSON response
  199. result_text = response.json().get('text', '无内容')
  200. return result_text
  201. else:
  202. return '无内容'
  203. def ai_tags_new(project, table, dt):
  204. # 获取特征数据
  205. feature_df = get_feature_data(
  206. project=project, table=table, dt=dt, features=features)
  207. video_id_list = feature_df['videoid'].to_list()
  208. video_info = {}
  209. for video_id in video_id_list:
  210. title = feature_df[feature_df['videoid']
  211. == video_id]['title'].values[0]
  212. video_path = feature_df[feature_df['videoid']
  213. == video_id]['video_path'].values[0]
  214. if title is None:
  215. continue
  216. title = title.strip()
  217. if len(title) < 1:
  218. continue
  219. # 获取asr结果
  220. asr_res = get_asr_res(video_id)
  221. video_info[video_id] = {'title': title,
  222. 'video_path': video_path, 'asr_res': asr_res}
  223. get_video_ai_tags(video_id=video_id,
  224. video_info=video_info.get(video_id))
  225. def timer_check():
  226. try:
  227. project = config_.DAILY_VIDEO['project']
  228. table = config_.DAILY_VIDEO['table']
  229. now_date = datetime.datetime.today()
  230. print(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
  231. dt = datetime.datetime.strftime(
  232. now_date-datetime.timedelta(days=1), '%Y%m%d')
  233. # 查看数据是否已准备好
  234. data_count = data_check(project=project, table=table, dt=dt)
  235. if data_count > 0:
  236. print(f'videos count = {data_count}')
  237. asr_folder = 'asr_res'
  238. if not os.path.exists(asr_folder):
  239. # 1分钟后重新检查
  240. Timer(60, timer_check).start()
  241. else:
  242. # 数据准备好,进行aiTag
  243. ai_tags_new(project=project, table=table, dt=dt)
  244. print(f"videos ai tag finished!")
  245. else:
  246. # 数据没准备好,1分钟后重新检查
  247. Timer(60, timer_check).start()
  248. except Exception as e:
  249. print(
  250. f"视频ai打标签失败, exception: {e}, traceback: {traceback.format_exc()}")
  251. if __name__ == '__main__':
  252. # timer_check()
  253. feature_df = getVideoInfoInXlxs('aigc-test/past_videos.xlsx')
  254. video_id_list = feature_df['videoid'].to_list()
  255. video_info = {}
  256. for video_id in video_id_list:
  257. title = feature_df[feature_df['videoid']
  258. == video_id]['title'].values[0]
  259. video_path = feature_df[feature_df['videoid']
  260. == video_id]['video_path'].values[0]
  261. if title is None:
  262. continue
  263. title = title.strip()
  264. if len(title) < 1:
  265. continue
  266. # 获取asr结果
  267. asr_res = get_asr_res(video_id)
  268. video_info[video_id] = {'title': title,
  269. 'video_path': video_path, 'asr_res': asr_res}
  270. get_video_ai_tags(video_id=video_id,
  271. video_info=video_info.get(video_id))