"""Client for the XFYun long-form speech-recognition (ASR) HTTP API.

Uploads a ``.wav`` file, polls for the transcription order to finish, and
caches the resulting transcript next to the audio file as ``.txt``.
"""
import ast
import base64
import hashlib
import hmac
import json
import os
import time
import urllib
import urllib.parse  # ``import urllib`` alone does not guarantee the submodule is loaded

import requests

from audio_process import get_audio_duration
from config import set_config
from log import Log

config_ = set_config()
log_ = Log()

# Upload limits checked locally before any request is sent.
_MAX_AUDIO_SIZE_MB = 500
_MAX_AUDIO_DURATION_HOURS = 5

# Order status values as used by the polling loop: 3 keeps polling, 4 is success.
_STATUS_PROCESSING = 3
_STATUS_DONE = 4

# Seconds to wait between two result queries.
_POLL_INTERVAL_SECONDS = 5


def _decode_embedded_json(text):
    """Decode a JSON document that the API embeds as a string.

    ``json.loads`` is tried first because the payload is JSON (it may contain
    ``true``/``false``/``null``, which ``ast.literal_eval`` rejects).  The
    previous ``ast.literal_eval`` behaviour is kept as a fallback so inputs
    that used to parse keep parsing.
    """
    try:
        return json.loads(text)
    except ValueError:
        return ast.literal_eval(text)


class RequestApi(object):
    """One upload/poll session against the ASR service for a single file."""

    def __init__(self, appid, secret_key, upload_file_path):
        """Store credentials and pre-compute the request signature.

        :param appid: application id issued by the service
        :param secret_key: secret key used to sign requests
        :param upload_file_path: path of the audio file to transcribe
        """
        self.appid = appid
        self.secret_key = secret_key
        self.upload_file_path = upload_file_path
        # The timestamp is part of the signature, so both are fixed together.
        self.ts = str(int(time.time()))
        self.signa = self.get_signa()

    def get_signa(self):
        """Build the request signature.

        Formula: ``base64(HmacSHA1(key=secret_key, msg=MD5hex(appid + ts)))``.

        :return: signature as a ``str``
        """
        md5_hex = hashlib.md5((self.appid + self.ts).encode('utf-8')).hexdigest()
        mac = hmac.new(
            self.secret_key.encode('utf-8'),
            md5_hex.encode('utf-8'),
            hashlib.sha1,
        ).digest()
        return base64.b64encode(mac).decode('utf-8')

    def upload(self):
        """Upload the audio file and create a transcription order.

        The file is rejected locally (error logged, ``None`` returned) when it
        is larger than 500 MB or longer than 5 hours.

        :return: the ``orderId`` from the response, or ``None`` when the file
            fails the local size/duration checks
        :raises KeyError: if the response lacks ``content.orderId``
        """
        video_id = self.upload_file_path.split('/')[-1].replace('.wav', '')

        # Size check: must not exceed 500M.
        file_len = os.path.getsize(self.upload_file_path)
        file_size = file_len / 1024 / 1024
        if file_size > _MAX_AUDIO_SIZE_MB:
            log_.error({'videoId': video_id,
                        'errorType': 'audioSizeError',
                        'errorMsg': f'audioSize: {file_size}M, required <= 500M'})
            return None
        file_name = os.path.basename(self.upload_file_path)

        # Duration check: must not exceed 5h.
        # NOTE(review): the /1000/60/60 conversion assumes get_audio_duration
        # returns milliseconds -- confirm against audio_process.
        duration = get_audio_duration(self.upload_file_path)
        audio_duration = duration / 1000 / 60 / 60
        if audio_duration > _MAX_AUDIO_DURATION_HOURS:
            log_.error({'videoId': video_id,
                        'errorType': 'audioDurationError',
                        'errorMsg': f'audioDuration: {audio_duration}h, required <= 5h'})
            return None

        # Query-string parameters.
        param_dict = {
            'appId': self.appid,
            'signa': self.signa,
            'ts': self.ts,
            'fileSize': file_len,
            'fileName': file_name,
            'duration': str(duration),
            'roleType': 1
        }

        # Read exactly the number of bytes announced in ``fileSize``; the
        # context manager closes the handle even if the read fails.
        with open(self.upload_file_path, 'rb') as audio_file:
            data = audio_file.read(file_len)

        response = requests.post(
            url=config_.XFASR_HOST + config_.XF_API['upload'] + "?" + urllib.parse.urlencode(param_dict),
            headers={"Content-type": "application/json"},
            data=data
        )
        result = json.loads(response.text)
        return result['content']['orderId']

    def get_result(self, order_id):
        """Poll the service until the order leaves the "processing" state.

        The query endpoint is rate limited (a callback is the recommended
        alternative), hence the pause between requests.

        :param order_id: order id returned by :meth:`upload`
        :return: the decoded response when the order status is 4 (done);
            ``None`` when the order ends in any other status
        """
        param_dict = {
            'appId': self.appid,
            'signa': self.signa,
            'ts': self.ts,
            'orderId': order_id,
            'resultType': 'transfer'
        }
        url = (config_.XFASR_HOST + config_.XF_API['get_result'] + "?"
               + urllib.parse.urlencode(param_dict))

        status = _STATUS_PROCESSING
        while status == _STATUS_PROCESSING:
            response = requests.post(url=url, headers={"Content-type": "application/json"})
            result = json.loads(response.text)
            status = result['content']['orderInfo']['status']
            if status == _STATUS_DONE:
                return result
            time.sleep(_POLL_INTERVAL_SECONDS)
        # Neither processing nor done: no transcript is available.
        return None

    def parse_lattice(self, result):
        """Extract the plain-text transcript from a finished order.

        For every entry of ``lattice`` the first candidate word of each word
        slot is concatenated; each entry becomes one line of output.

        :param result: decoded response as returned by :meth:`get_result`
        :return: transcript text, one ``'\\n'``-terminated line per lattice entry
        """
        order_result = _decode_embedded_json(result['content']['orderResult'])
        lines = []
        for entry in order_result['lattice']:
            best = _decode_embedded_json(entry['json_1best'])
            words = [
                slot['cw'][0]['w']
                for segment in best['st']['rt']
                for slot in segment['ws']
            ]
            lines.append(''.join(words) + '\n')
        return ''.join(lines)


def call_asr(audio_path):
    """Transcribe an audio file, reusing a cached transcript when present.

    The transcript is stored at ``audio_path`` with ``.wav`` replaced by
    ``.txt``.  If that file already exists it is read back and no request is
    made.  Nothing is written when recognition fails.

    :param audio_path: path of the ``.wav`` file
    :return: tuple ``(dialogue_path, asr_res)``
    :raises RuntimeError: if the upload is rejected or the order does not
        complete successfully
    """
    dialogue_path = audio_path.replace('.wav', '.txt')

    # Already recognised: read the cached text instead of calling the API again.
    if os.path.exists(dialogue_path):
        # Explicit UTF-8 so the transcript does not depend on the platform
        # default encoding.
        with open(dialogue_path, 'r', encoding='utf-8') as rf:
            return dialogue_path, rf.read()

    api = RequestApi(appid=config_.XFASR_CONFIG['appid'],
                     secret_key=config_.XFASR_CONFIG['secret_key'],
                     upload_file_path=audio_path)
    order_id = api.upload()
    if order_id is None:
        # upload() has already logged the reason; do not poll with a missing id.
        raise RuntimeError(f'ASR upload rejected for {audio_path}')

    result = api.get_result(order_id)
    if result is None:
        raise RuntimeError(f'ASR order {order_id} did not complete for {audio_path}')

    asr_res = api.parse_lattice(result)
    with open(dialogue_path, 'w', encoding='utf-8') as f:
        f.write(asr_res)
    return dialogue_path, asr_res


if __name__ == '__main__':
    audio_path = 'videos/1275943.wav'
    call_asr(audio_path=audio_path)