import os
import shutil
import json
import datetime
import sys
import time
import traceback
import requests
import multiprocessing
from threading import Timer
from whisper_asr import get_whisper_asr
from gpt_tag import request_gpt
from config import set_config
from audio_process import get_wav
from log import Log

config_ = set_config()
log_ = Log()
features = ['videoid', 'title', 'video_path']


def get_asr(video_id, download_folder, asr_folder):
    """Transcribe the first video file found for ``video_id``.

    Scans ``<download_folder>/<video_id>`` for the first file whose extension
    is ``mp4`` or ``m3u8`` (compared case-insensitively), passes it to
    ``get_wav``, feeds the returned path to ``get_whisper_asr`` and writes the
    recognition result to ``<asr_folder>/<video_id>.txt`` (UTF-8). Only the
    first matching file is processed.

    Args:
        video_id: Name of the sub-folder of ``download_folder`` holding the
            downloaded video; also used as the output file stem.
        download_folder: Directory containing one sub-folder per video.
        asr_folder: Existing directory that receives the transcript file.

    Returns:
        None. If the video folder is missing, is not a directory, or holds no
        matching file, nothing is written.
    """
    video_folder = os.path.join(download_folder, video_id)
    # Stray files in download_folder (e.g. .DS_Store) are not video folders;
    # skip them instead of letting os.listdir raise inside a pool worker.
    if not os.path.isdir(video_folder):
        print(f"video_folder: {video_folder} is not a directory, skipped!")
        return

    for filename in os.listdir(video_folder):
        # Lower-case the suffix so that e.g. "clip.MP4" is recognised too.
        video_type = filename.rsplit('.', 1)[-1].lower()
        if video_type not in ('mp4', 'm3u8'):
            continue
        video_file = os.path.join(video_folder, filename)
        # NOTE(review): get_wav presumably extracts an audio file from the
        # video and returns its path -- confirm in audio_process.
        audio_path = get_wav(video_file)
        # 1. Run ASR recognition
        asr_res_initial = get_whisper_asr(audio=audio_path)
        print(video_id, asr_res_initial)
        # 2. Write the recognition result to a file
        asr_path = os.path.join(asr_folder, f"{video_id}.txt")
        with open(asr_path, 'w', encoding='utf-8') as wf:
            wf.write(asr_res_initial)
        # Deleting the processed video is currently disabled:
        # shutil.rmtree(os.path.join(download_folder, video_id))
        break


# def asr_process(project, table, dt, cuda_id):
#     # Fetch the feature data
#     feature_df = get_feature_data(
#         project=project, table=table, dt=dt, features=features)
#     video_id_list = feature_df['videoid'].to_list()
#     video_info = {}
#     for video_id in video_id_list:
#         title = feature_df[feature_df['videoid']
#                            == video_id]['title'].values[0]
#         if title is None:
#             continue
#         title = title.strip()
#         if len(title) > 0:
#             video_info[video_id] = {'title': title}
#     # Take the downloaded videos and run ASR on them
#     download_folder = 'videos'
#     download_folder = f'{download_folder}_{cuda_id}'
#     asr_folder = 'asr_res'
#     if not os.path.exists(asr_folder):
#         os.makedirs(asr_folder)
#     retry = 0
#     while retry < 3:
#         video_folder_list = os.listdir(download_folder)
#         if len(video_folder_list) < 1:
#             retry += 1
#             time.sleep(60)
#             continue
#         retry = 0
#         # for video_id in video_folder_list:
#         #     if video_id not in video_id_list:
#         #         continue
#         #     if video_info.get(video_id, None) is None:
#         #         try:
#         #             shutil.rmtree(os.path.join(download_folder, video_id))
#         #         except:
#         #             continue
#         #     else:
#         #         video_folder = os.path.join(download_folder, video_id)
#         #         for filename in os.listdir(video_folder):
#         #             video_type = filename.split('.')[-1]
#         #             if video_type in ['mp4', 'm3u8']:
#         #                 video_file = os.path.join(video_folder, filename)
#         #                 # 1. Run ASR recognition
#         #                 asr_res_initial = get_whisper_asr(video=video_file)
#         #                 print(video_id, asr_res_initial)
#         #                 # 2. Write the recognition result to a file
#         #                 asr_path = os.path.join(asr_folder, f"{video_id}.txt")
#         #                 with open(asr_path, 'w', encoding='utf-8') as wf:
#         #                     wf.write(asr_res_initial)
#         #                 # Delete the processed video
#         #                 shutil.rmtree(os.path.join(download_folder, video_id))
#         #                 break
#         pool = multiprocessing.Pool(processes=2)
#         for video_id in video_folder_list:
#             if video_id not in video_id_list:
#                 continue
#             if video_info.get(video_id, None) is None:
#                 try:
#                     shutil.rmtree(os.path.join(download_folder, video_id))
#                 except:
#                     continue
#             else:
#                 pool.apply_async(
#                     func=get_asr,
#                     args=(video_id, download_folder, asr_folder)
#                 )
#         pool.close()
#         pool.join()


# def timer_check():
#     try:
#         cuda_id = sys.argv[1]
#         project = config_.DAILY_VIDEO['project']
#         table = config_.DAILY_VIDEO['table']
#         now_date = datetime.datetime.today()
#         print(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
#         dt = datetime.datetime.strftime(
#             now_date-datetime.timedelta(days=1), '%Y%m%d')
#         # Check whether the data is ready
#         data_count = data_check(project=project, table=table, dt=dt)
#         if data_count > 0:
#             print(f'videos count = {data_count}')
#             download_folder = 'videos'
#             download_folder = f'{download_folder}_{cuda_id}'
#             if not os.path.exists(download_folder):
#                 # Videos are not downloaded yet; check again in 1 minute
#                 Timer(60, timer_check).start()
#             else:
#                 # Data is ready; run ASR
#                 asr_process(project=project, table=table,
#                             dt=dt, cuda_id=cuda_id)
#                 print(f"videos asr finished!")
#         else:
#             # Data is not ready; check again in 1 minute
#             Timer(60, timer_check).start()
#     except Exception as e:
#         print(
#             f"视频asr识别失败, exception: {e}, traceback: {traceback.format_exc()}")


if __name__ == '__main__':
    # Script entry point: run ASR over every video folder under
    # ``videos_<cuda_id>`` using a pool of 5 worker processes, writing the
    # transcripts into ``asr_res``.
    #
    # Usage: python <script> <cuda_id>

    # timer_check()
    if len(sys.argv) < 2:
        # Fail with a readable message instead of a bare IndexError.
        print(f"usage: {sys.argv[0]} <cuda_id>")
        sys.exit(1)
    cuda_id = sys.argv[1]

    download_folder = f'videos_{cuda_id}'
    if not os.path.exists(download_folder):
        print(f"download_folder: {download_folder} not exists!")
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to be available in every interpreter setup.
        sys.exit(0)

    # Every sub-folder name under download_folder is treated as a video_id.
    video_folder_list = os.listdir(download_folder)
    if not video_folder_list:
        print("video_folder_list is empty!")
        sys.exit(0)

    asr_folder = 'asr_res'
    os.makedirs(asr_folder, exist_ok=True)

    pool = multiprocessing.Pool(processes=5)
    # Keep each AsyncResult: apply_async swallows worker exceptions unless
    # the result is fetched, which previously hid every ASR failure.
    pending = [
        (
            video_id,
            pool.apply_async(
                func=get_asr,
                args=(video_id, download_folder, asr_folder)
            ),
        )
        for video_id in video_folder_list
    ]
    pool.close()
    pool.join()

    for video_id, async_result in pending:
        try:
            # All tasks are finished after join(); get() only re-raises
            # whatever exception the worker hit.
            async_result.get()
        except Exception as e:
            print(
                f"video_id: {video_id} asr failed, exception: {e}, "
                f"traceback: {traceback.format_exc()}")

    print("videos asr finished!")