"""ASR helper: upload an audio file to the XFASR HTTP API, poll for the
transcription result, and cache the recognised text next to the audio file."""
import ast
import base64
import hashlib
import hmac
import json
import os
import time
import urllib
import urllib.parse

import requests

from audio_process import get_audio_duration
from config import set_config
from log import Log
# Module-wide singletons shared by everything below:
# ``config_`` supplies the API host, endpoint paths and credentials,
# ``log_`` is the project logger used for structured error records.
config_ = set_config()
log_ = Log()
class RequestApi(object):
    """Client for the XFASR speech-transcription HTTP API.

    Host and endpoint paths are read from the module-level ``config_``.
    Typical flow: ``upload()`` -> ``get_result(order_id)`` ->
    ``parse_lattice(result)``.

    The timestamp and the signature derived from it are computed once in
    ``__init__`` and reused for every request made through this instance.
    """

    # Local limits checked before anything is sent to the server.
    MAX_FILE_SIZE_MB = 500
    MAX_DURATION_HOURS = 5

    # Order status values as interpreted by get_result(): keep polling while
    # the order reports 3, treat 4 as "result available".
    STATUS_PROCESSING = 3
    STATUS_FINISHED = 4

    # Seconds to wait between two consecutive status queries.
    POLL_INTERVAL = 5

    def __init__(self, appid, secret_key, upload_file_path):
        """Store the credentials and pre-compute the request signature.

        :param appid: application id issued by the ASR service
        :param secret_key: secret key paired with ``appid``
        :param upload_file_path: path of the audio file to transcribe
        """
        self.appid = appid
        self.secret_key = secret_key
        self.upload_file_path = upload_file_path
        self.ts = str(int(time.time()))
        self.signa = self.get_signa()

    def get_signa(self):
        """Build the request signature.

        Formula: ``Base64(HmacSHA1(key=secret_key, msg=MD5_hex(appid + ts)))``.

        :return: the signature as a ``str``
        """
        md5_hex = hashlib.md5((self.appid + self.ts).encode('utf-8')).hexdigest()
        mac = hmac.new(self.secret_key.encode('utf-8'),
                       md5_hex.encode('utf-8'),
                       hashlib.sha1)
        return base64.b64encode(mac.digest()).decode('utf-8')

    def upload(self):
        """Validate the audio file and upload it.

        The file is rejected locally (an error is logged and ``None`` is
        returned) when it is larger than ``MAX_FILE_SIZE_MB`` or longer than
        ``MAX_DURATION_HOURS``.

        :return: the ``orderId`` assigned by the server, or ``None`` when the
            file was rejected locally
        :raises KeyError: if the server response carries no ``orderId``
            (the response is logged before the exception propagates)
        """
        video_id = self.upload_file_path.split('/')[-1].replace('.wav', '')
        file_name = os.path.basename(self.upload_file_path)

        # File size must not exceed the limit (in MiB).
        file_len = os.path.getsize(self.upload_file_path)
        file_size = file_len / 1024 / 1024
        if file_size > self.MAX_FILE_SIZE_MB:
            log_.error({'videoId': video_id, 'errorType': 'audioSizeError',
                        'errorMsg': f'audioSize: {file_size}M, '
                                    f'required <= {self.MAX_FILE_SIZE_MB}M'})
            return None

        # Duration must not exceed the limit. The division below implies that
        # get_audio_duration() reports milliseconds -- TODO confirm.
        duration = get_audio_duration(self.upload_file_path)
        audio_duration = duration / 1000 / 60 / 60
        if audio_duration > self.MAX_DURATION_HOURS:
            # The label used to read "audioSize" (copy-paste from the size
            # check above); it now names the quantity actually reported.
            log_.error({'videoId': video_id, 'errorType': 'audioDurationError',
                        'errorMsg': f'audioDuration: {audio_duration}h, '
                                    f'required <= {self.MAX_DURATION_HOURS}h'})
            return None

        # Query-string parameters of the upload request.
        param_dict = {
            'appId': self.appid,
            'signa': self.signa,
            'ts': self.ts,
            'fileSize': file_len,
            'fileName': file_name,
            'duration': str(duration),
            'roleType': 1
        }

        # Read the raw audio bytes; the context manager guarantees the file
        # handle is closed. At most ``file_len`` bytes are read so the payload
        # matches the advertised ``fileSize``.
        with open(self.upload_file_path, 'rb') as audio_file:
            data = audio_file.read(file_len)

        response = requests.post(
            url=config_.XFASR_HOST + config_.XF_API['upload'] + "?" + urllib.parse.urlencode(param_dict),
            headers={"Content-type": "application/json"},
            data=data
        )
        result = json.loads(response.text)
        try:
            return result['content']['orderId']
        except (KeyError, TypeError):
            # Record what the server actually answered; a bare KeyError alone
            # gives no hint about why the upload was refused.
            log_.error({'videoId': video_id, 'errorType': 'uploadError',
                        'errorMsg': f'unexpected upload response: {result}'})
            raise

    def get_result(self, order_id):
        """Poll the query endpoint until the order leaves the processing state.

        The query endpoint is rate limited; a callback-based flow is the
        recommended alternative to polling.

        :param order_id: order id returned by ``upload()``
        :return: the decoded response dict once the order is finished, or
            ``None`` (after logging an error) if the order ends in any other
            state
        """
        param_dict = {
            'appId': self.appid,
            'signa': self.signa,
            'ts': self.ts,
            'orderId': order_id,
            'resultType': 'transfer'
        }
        # The URL does not change between polls, so build it once.
        url = config_.XFASR_HOST + config_.XF_API['get_result'] + "?" + urllib.parse.urlencode(param_dict)
        headers = {"Content-type": "application/json"}

        while True:
            response = requests.post(url=url, headers=headers)
            result = json.loads(response.text)
            status = result['content']['orderInfo']['status']
            if status == self.STATUS_FINISHED:
                return result
            if status != self.STATUS_PROCESSING:
                # Any other state ends the polling; report it instead of
                # returning None without a trace.
                log_.error({'orderId': order_id, 'errorType': 'asrStatusError',
                            'errorMsg': f'unexpected order status: {status}'})
                return None
            time.sleep(self.POLL_INTERVAL)

    @staticmethod
    def _decode_payload(text):
        """Decode a string-encoded structure embedded in the API response.

        JSON is tried first so that ``true`` / ``false`` / ``null`` are
        handled; ``ast.literal_eval`` is kept as a fallback for payloads that
        are Python literals but not strict JSON.
        """
        try:
            return json.loads(text)
        except ValueError:
            return ast.literal_eval(text)

    def parse_lattice(self, result):
        """Flatten a transcription result into plain text.

        For every entry of ``lattice`` the first candidate word of each word
        slot is concatenated; each lattice entry becomes one output line.

        :param result: response dict as returned by ``get_result()``
        :return: recognised text, one ``'\\n'``-terminated line per lattice entry
        """
        order_result = self._decode_payload(result['content']['orderResult'])
        lines = []
        for entry in order_result['lattice']:
            best = self._decode_payload(entry['json_1best'])
            words = [
                slot['cw'][0]['w']
                for segment in best['st']['rt']
                for slot in segment['ws']
            ]
            lines.append(''.join(words) + '\n')
        return ''.join(lines)
def call_asr(audio_path):
    """Transcribe an audio file, reusing a cached transcript when one exists.

    The transcript lives next to the audio file, with the audio file's
    extension replaced by ``.txt``. If that file already exists its content is
    returned and the ASR service is not called; otherwise the audio is
    uploaded, the result is polled, flattened to text and written to the
    cache file. The cache file is read and written as UTF-8.

    :param audio_path: path of the audio file to transcribe
    :return: tuple ``(dialogue_path, asr_res)`` -- transcript file path and
        transcript text
    :raises RuntimeError: if the upload was rejected or the transcription did
        not finish successfully
    """
    # Swap only the final extension. The former str.replace('.wav', '.txt')
    # rewrote every '.wav' occurrence in the path and, for inputs not ending
    # in '.wav', produced the audio path itself.
    dialogue_path = os.path.splitext(audio_path)[0] + '.txt'

    # Already transcribed: return the cached text instead of calling the API.
    if os.path.exists(dialogue_path):
        with open(dialogue_path, 'r', encoding='utf-8') as rf:
            return dialogue_path, rf.read()

    api = RequestApi(appid=config_.XFASR_CONFIG['appid'],
                     secret_key=config_.XFASR_CONFIG['secret_key'],
                     upload_file_path=audio_path)
    order_id = api.upload()
    if order_id is None:
        # upload() has already logged why the file was rejected.
        raise RuntimeError(f'ASR upload rejected for {audio_path}')

    result = api.get_result(order_id)
    if result is None:
        raise RuntimeError(f'ASR order {order_id} did not finish successfully')

    asr_res = api.parse_lattice(result)
    with open(dialogue_path, 'w', encoding='utf-8') as f:
        f.write(asr_res)
    return dialogue_path, asr_res
if __name__ == '__main__':
    # Manual smoke run: transcribe one sample recording.
    call_asr(audio_path='videos/1275943.wav')
|