"""Daily job: transcribe downloaded videos, ask GPT for tags, and post the tags to the backend."""
import datetime
import json
import os
import shutil
import time
import traceback
from threading import Timer

import requests

from utils import data_check, get_feature_data, asr_validity_discrimination
from whisper_asr import get_whisper_asr
from gpt_tag import request_gpt
from config import set_config
from log import Log

config_ = set_config()
log_ = Log()
features = ['videoid', 'title', 'video_path']

# Token limit: only the last 2500 characters of the cleaned transcript are sent to GPT.
_ASR_MAX_CHARS = 2500
# Tags at or below this confidence are discarded.
_MIN_TAG_CONFIDENCE = 0.5
# Upper bound for the backend call so a stalled connection cannot hang the whole job.
_BACKEND_TIMEOUT_SECONDS = 60
# Folder polling: give up after this many idle passes, waiting this long between them.
_MAX_IDLE_PASSES = 3
_IDLE_WAIT_SECONDS = 60


def _clean_asr_text(text):
    """Strip newlines and configured stop words from *text*, then keep its trailing part.

    Returns at most ``_ASR_MAX_CHARS`` characters, taken from the end of the cleaned text.
    """
    cleaned = text.replace('\n', '')
    for stop_word in config_.STOP_WORDS:
        cleaned = cleaned.replace(stop_word, '')
    return cleaned[-_ASR_MAX_CHARS:]


def _select_confident_tags(gpt_response):
    """Parse the GPT tagging response and return the accepted tag names.

    The response is expected to be a JSON array of objects carrying ``confidence`` and
    ``category``. An item is accepted when its confidence is above ``_MIN_TAG_CONFIDENCE``
    and its category is listed in ``config_.TAGS_NEW``. If the response is malformed, the
    tags collected before the failure are still returned and the problem is logged.
    """
    selected = []
    try:
        for item in json.loads(gpt_response):
            if item['confidence'] > _MIN_TAG_CONFIDENCE and item['category'] in config_.TAGS_NEW:
                selected.append(f"AI标签-{item['category']}")
    except (ValueError, KeyError, TypeError):
        # Malformed GPT output is expected now and then; keep the partial result, but
        # leave a trace instead of swallowing the error silently.
        log_.error({'msg': 'malformed gpt tag response', 'traceback': traceback.format_exc()})
    return selected


def _classify_and_submit(video_id, title, gpt_res1, log_message):
    """Turn the summary/keywords response into tags and send them to the backend.

    *gpt_res1* is the raw GPT response that must decode to a JSON object with ``summary``
    and ``keywords``. Intermediate results are recorded in *log_message* (mutated in place).
    Exceptions are not handled here; the caller decides how to report them.
    """
    gpt_res1_json = json.loads(gpt_res1)
    summary = gpt_res1_json['summary']
    keywords = gpt_res1_json['keywords']
    log_message['summary'] = summary
    log_message['keywords'] = keywords

    # Second GPT round: classify using title + summary + keywords.
    prompt2_param = f"标题:{title}\n概况:{summary}\n关键词:{keywords}"
    prompt2 = f"{config_.GPT_PROMPT['tags']['prompt7']}{prompt2_param}"
    log_message['gptPromptTag'] = prompt2
    gpt_res2 = request_gpt(prompt=prompt2)
    log_message['gptResTag'] = gpt_res2
    if gpt_res2 is None:
        return

    confidence_up = ','.join(_select_confident_tags(gpt_res2))
    log_message['AITags'] = confidence_up
    if not confidence_up:
        return

    # 5. Hand the result over to the backend.
    response = requests.post(
        url=config_.ADD_VIDEO_AI_TAGS_URL,
        json={'videoId': int(video_id), 'tagNames': confidence_up},
        timeout=_BACKEND_TIMEOUT_SECONDS,
    )
    res_data = json.loads(response.text)
    if res_data['code'] != 0:
        log_.error({'videoId': video_id, 'msg': 'add video ai tags fail!'})


def get_video_ai_tags(video_id, video_file, video_info):
    """Produce AI tags for one video and post them to the backend.

    Steps: transcribe *video_file* with ``get_whisper_asr``, check the transcript with
    ``asr_validity_discrimination``, clean it, ask GPT for summary/keywords, ask GPT for
    tags, then post the accepted tags. A single log record describing every step is
    emitted at the end.

    Args:
        video_id: Video identifier; must be convertible with ``int()``.
        video_file: Path of the media file handed to the ASR step.
        video_info: Mapping read for the ``title`` and ``video_path`` keys.

    Returns:
        None. The function is best-effort: every failure is logged, none is raised.
    """
    try:
        log_message = {
            'videoId': int(video_id),
        }
        title = video_info.get('title')
        log_message['videoPath'] = video_info.get('video_path')
        log_message['title'] = title

        # 1. ASR
        asr_res_initial = get_whisper_asr(video=video_file)
        log_message['asrRes'] = asr_res_initial

        # 2. Decide whether the recognised text is usable.
        validity = asr_validity_discrimination(text=asr_res_initial)
        log_message['asrValidity'] = validity

        if validity is True:
            # 3. Clean the transcript and enforce the length limit.
            asr_res = _clean_asr_text(asr_res_initial)

            # 4.1 GPT produces summary and keywords.
            prompt1 = f"{config_.GPT_PROMPT['tags']['prompt6']}{asr_res.strip()}"
            log_message['gptPromptSummaryKeywords'] = prompt1
            gpt_res1 = request_gpt(prompt=prompt1)
            log_message['gptResSummaryKeywords'] = gpt_res1

            if gpt_res1 is not None:
                # 4.2 Classify and submit. A failure here must not prevent the step log
                # below from being written, so it is reported and then dropped.
                try:
                    _classify_and_submit(video_id, title, gpt_res1, log_message)
                except Exception:
                    log_.error({
                        'videoId': video_id,
                        'msg': 'gpt tagging step failed',
                        'traceback': traceback.format_exc(),
                    })

        log_.info(log_message)
    except Exception as e:
        log_.error(e)
        log_.error(traceback.format_exc())


def ai_tags(project, table, dt):
    """Tag every downloaded video that appears in the feature table for *dt*.

    Loads ``videoid``/``title``/``video_path`` through ``get_feature_data``, then polls the
    ``videos`` folder. Each sub-folder named after a known video id is processed with
    ``get_video_ai_tags`` (when the video has a non-empty title) and removed afterwards.
    Polling stops after ``_MAX_IDLE_PASSES`` passes in which nothing could be handled.

    Args:
        project: Forwarded to ``get_feature_data``.
        table: Forwarded to ``get_feature_data``.
        dt: Partition date string forwarded to ``get_feature_data``.

    Raises:
        OSError: If the download folder cannot be listed or a folder cannot be removed.
    """
    # Load the feature data.
    feature_df = get_feature_data(project=project, table=table, dt=dt, features=features)
    video_id_list = feature_df['videoid'].to_list()
    title_list = feature_df['title'].to_list()
    video_path_list = feature_df['video_path'].to_list()

    video_info = {}
    seen_ids = set()
    for video_id, title, video_path in zip(video_id_list, title_list, video_path_list):
        # Only the first row of a duplicated id is considered.
        if video_id in seen_ids:
            continue
        seen_ids.add(video_id)
        # Missing titles (None, NaN, ...) cannot be stripped; skip them.
        if not isinstance(title, str):
            continue
        title = title.strip()
        if title:
            video_info[video_id] = {'title': title, 'video_path': video_path}
    print(len(video_info))

    # Walk the already-downloaded videos.
    download_folder = 'videos'
    # NOTE(review): folder names are strings, so this lookup presumably relies on the
    # feature table storing video ids as strings too; confirm against get_feature_data.
    known_ids = seen_ids
    retry = 0
    while retry < _MAX_IDLE_PASSES:
        video_folder_list = os.listdir(download_folder)
        handled = 0
        # NOTE(review): fewer than 2 entries is treated as "nothing to do yet"; the reason
        # for the threshold of 2 is not visible in this file.
        if len(video_folder_list) >= 2:
            for video_id in video_folder_list:
                if video_id not in known_ids:
                    continue
                video_folder = os.path.join(download_folder, video_id)
                info = video_info.get(video_id)
                if info is not None:
                    for filename in os.listdir(video_folder):
                        if filename.split('.')[-1] in ('mp4', 'm3u8'):
                            get_video_ai_tags(
                                video_id=video_id,
                                video_file=os.path.join(video_folder, filename),
                                video_info=info,
                            )
                # Delete the handled video. The folder still holds the media file(s), so
                # os.rmdir would fail with "directory not empty"; remove the whole tree.
                shutil.rmtree(video_folder)
                handled += 1
        if handled == 0:
            # Nothing usable in this pass: wait for more downloads instead of spinning.
            retry += 1
            time.sleep(_IDLE_WAIT_SECONDS)


def timer_check():
    """Run the tagging job for yesterday's partition once its data is available.

    Reads project/table from ``config_.DAILY_VIDEO`` and asks ``data_check`` for the row
    count of yesterday's partition. With data present, ``ai_tags`` runs immediately;
    otherwise the check is rescheduled in 60 seconds. Any exception is printed, not raised.
    """
    try:
        project = config_.DAILY_VIDEO['project']
        table = config_.DAILY_VIDEO['table']
        now_date = datetime.datetime.today()
        print(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
        dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
        # Check whether the data is ready.
        data_count = data_check(project=project, table=table, dt=dt)
        if data_count > 0:
            print(f'videos count = {data_count}')
            # Data is ready: run tagging over the downloaded videos.
            ai_tags(project=project, table=table, dt=dt)
            print("videos ai tag finished!")
        else:
            # Data is not ready yet: check again in one minute.
            Timer(60, timer_check).start()
    except Exception as e:
        print(f"视频ai打标签失败, exception: {e}, traceback: {traceback.format_exc()}")


if __name__ == '__main__':
    timer_check()