asr_task.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. import os
  2. import shutil
  3. import json
  4. import datetime
  5. import sys
  6. import time
  7. import traceback
  8. import requests
  9. import multiprocessing
  10. from threading import Timer
  11. from whisper_asr import get_whisper_asr
  12. from gpt_tag import request_gpt
  13. from config import set_config
  14. from audio_process import get_wav
  15. from log import Log
  16. from utils import data_check, get_feature_data
# Project configuration object; timer_check() reads DAILY_VIDEO from it.
config_ = set_config()
# Project logger instance (not referenced by the code visible in this file).
log_ = Log()
# Column names requested from the feature table by asr_process().
features = ['videoid', 'title', 'video_path']
  20. def get_asr(video_id, download_folder, asr_folder):
  21. video_folder = os.path.join(download_folder, video_id)
  22. for filename in os.listdir(video_folder):
  23. video_type = filename.split('.')[-1]
  24. if video_type in ['mp4', 'm3u8']:
  25. video_file = os.path.join(video_folder, filename)
  26. audio_path = get_wav(video_file)
  27. # 1. asr识别
  28. asr_res_initial = get_whisper_asr(audio=audio_path)
  29. print(video_id, asr_res_initial)
  30. # 2. 识别结果写入文件
  31. asr_path = os.path.join(asr_folder, f"{video_id}.txt")
  32. with open(asr_path, 'w', encoding='utf-8') as wf:
  33. wf.write(asr_res_initial)
  34. # 将处理过的视频进行删除
  35. shutil.rmtree(os.path.join(download_folder, video_id))
  36. break
  37. def asr_process(project, table, dt, cuda_id):
  38. # 获取特征数据
  39. feature_df = get_feature_data(
  40. project=project, table=table, dt=dt, features=features)
  41. video_id_list = feature_df['videoid'].to_list()
  42. video_info = {}
  43. for video_id in video_id_list:
  44. title = feature_df[feature_df['videoid']
  45. == video_id]['title'].values[0]
  46. if title is None:
  47. continue
  48. title = title.strip()
  49. if len(title) > 0:
  50. video_info[video_id] = {'title': title}
  51. # 获取已下载视频,做asr识别
  52. download_folder = 'videos'
  53. download_folder = f'{download_folder}_{cuda_id}'
  54. asr_folder = 'asr_res'
  55. if not os.path.exists(asr_folder):
  56. os.makedirs(asr_folder)
  57. retry = 0
  58. while retry < 3:
  59. video_folder_list = os.listdir(download_folder)
  60. if len(video_folder_list) < 1:
  61. retry += 1
  62. time.sleep(60)
  63. continue
  64. retry = 0
  65. for video_id in video_folder_list:
  66. if video_id not in video_id_list:
  67. continue
  68. if video_info.get(video_id, None) is None:
  69. try:
  70. shutil.rmtree(os.path.join(download_folder, video_id))
  71. except:
  72. continue
  73. else:
  74. video_folder = os.path.join(download_folder, video_id)
  75. for filename in os.listdir(video_folder):
  76. video_type = filename.split('.')[-1]
  77. if video_type in ['mp4', 'm3u8']:
  78. video_file = os.path.join(video_folder, filename)
  79. # 1. asr识别
  80. asr_res_initial = get_whisper_asr(audio=video_file)
  81. print(video_id, asr_res_initial)
  82. # 2. 识别结果写入文件
  83. asr_path = os.path.join(asr_folder, f"{video_id}.txt")
  84. with open(asr_path, 'w', encoding='utf-8') as wf:
  85. wf.write(asr_res_initial)
  86. # 将处理过的视频进行删除
  87. shutil.rmtree(os.path.join(download_folder, video_id))
  88. break
  89. pool = multiprocessing.Pool(processes=2)
  90. for video_id in video_folder_list:
  91. if video_id not in video_id_list:
  92. continue
  93. if video_info.get(video_id, None) is None:
  94. try:
  95. shutil.rmtree(os.path.join(download_folder, video_id))
  96. except:
  97. continue
  98. else:
  99. pool.apply_async(
  100. func=get_asr,
  101. args=(video_id, download_folder, asr_folder)
  102. )
  103. pool.close()
  104. pool.join()
  105. def timer_check():
  106. try:
  107. cuda_id = sys.argv[1]
  108. project = config_.DAILY_VIDEO['project']
  109. table = config_.DAILY_VIDEO['table']
  110. now_date = datetime.datetime.today()
  111. print(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
  112. dt = datetime.datetime.strftime(
  113. now_date-datetime.timedelta(days=1), '%Y%m%d')
  114. # 查看数据是否已准备好
  115. data_count = data_check(project=project, table=table, dt=dt)
  116. if data_count > 0:
  117. print(f'videos count = {data_count}')
  118. download_folder = 'videos'
  119. download_folder = f'{download_folder}_{cuda_id}'
  120. if not os.path.exists(download_folder):
  121. # 视频未下载好,1分钟后重新检查
  122. Timer(60, timer_check).start()
  123. else:
  124. # 数据准备好,进行asr
  125. asr_process(project=project, table=table,
  126. dt=dt, cuda_id=cuda_id)
  127. print(f"videos asr finished!")
  128. else:
  129. # 数据没准备好,1分钟后重新检查
  130. Timer(60, timer_check).start()
  131. except Exception as e:
  132. print(
  133. f"视频asr识别失败, exception: {e}, traceback: {traceback.format_exc()}")
  134. if __name__ == '__main__':
  135. # timer_check()
  136. cuda_id = sys.argv[1]
  137. download_folder = 'videos'
  138. download_folder = f'{download_folder}_{cuda_id}'
  139. if not os.path.exists(download_folder):
  140. print(f"download_folder: {download_folder} not exists!")
  141. exit(0)
  142. # 遍历download_folder下所有的子文件夹名,即video_id list
  143. video_folder_list = os.listdir(download_folder)
  144. if len(video_folder_list) < 1:
  145. print(f"video_folder_list is empty!")
  146. exit(0)
  147. asr_folder = 'asr_res'
  148. if not os.path.exists(asr_folder):
  149. os.makedirs(asr_folder)
  150. pool = multiprocessing.Pool(processes=2)
  151. for video_id in video_folder_list:
  152. pool.apply_async(
  153. func=get_asr,
  154. args=(video_id, download_folder, asr_folder)
  155. )
  156. pool.close()
  157. pool.join()
  158. print(f"videos asr finished!")