liqian 1 year ago
parent
commit
de694d738f
10 changed files with 752 additions and 0 deletions
  1. .gitignore (+4, −0)
  2. audio_process.py (+24, −0)
  3. config.py (+90, −0)
  4. feishu.py (+94, −0)
  5. gpt_tag.py (+42, −0)
  6. log.py (+40, −0)
  7. log_conf.py (+85, −0)
  8. main_process.py (+143, −0)
  9. utils.py (+76, −0)
  10. xunfei_asr.py (+154, −0)

+ 4 - 0
.gitignore

@@ -58,3 +58,7 @@ docs/_build/
 # PyBuilder
 target/
 
+videos/
+logs/
+.idea/
+

+ 24 - 0
audio_process.py

@@ -0,0 +1,24 @@
+from moviepy.editor import AudioFileClip, VideoFileClip
+from config import set_config
+
+config_ = set_config()
+
+
+def get_wav(video_path):
+    """提取音频"""
+    # 音频采样率设置为:8K
+    video = VideoFileClip(video_path, audio_fps=8000)
+    # Extract the audio from the video
+    audio = video.audio
+    # Save the extracted audio to a file
+    audio_path = video_path.replace('.mp4', '.wav')
+    audio.write_audiofile(audio_path)
+    video.close()
+    return audio_path
+
+
+def get_audio_duration(audio_file_path):
+    """获取音频时长,单位:ms"""
+    audio_clip = AudioFileClip(audio_file_path)
+    audio_length = audio_clip.duration
+    audio_clip.close()
+    return int(audio_length * 1000)
+
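
A minimal usage sketch for the two helpers above (the videos/001.mp4 path is a placeholder):

    from audio_process import get_wav, get_audio_duration

    wav_path = get_wav('videos/001.mp4')       # writes videos/001.wav next to the video
    print(get_audio_duration(wav_path), 'ms')  # duration in milliseconds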

+ 90 - 0
config.py

@@ -0,0 +1,90 @@
+import os
+
+
+class BaseConfig(object):
+    # iFLYTEK ASR configuration
+    XFASR_HOST = 'https://raasr.xfyun.cn/v2/api'
+    XF_API = {
+        'upload': '/upload',
+        'get_result': '/getResult'
+    }
+    XFASR_CONFIG = {
+        'appid': 'ac4ec700',
+        'secret_key': 'f822c63011275bcd26fa286fbb01768a'
+    }
+
+    # GPT configuration
+    GPT_HOST = 'https://api.openai.com/v1/chat/completions'
+    GPT_OPENAI_API_KEY = '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'
+
+    # Proxy settings
+    PROXIES = {
+        'http': 'http://127.0.0.1:4780',
+        'https': 'http://127.0.0.1:4780'
+    }
+
+    # Feishu app credentials
+    FEISHU_TOKEN = {
+        'app_id': 'cli_a3667697a57b500e',
+        'app_secret': '5eMszgeNt21U56XnPjCykgmTfZUEEMnp'
+    }
+
+    # stop_words
+    STOP_WORDS = ['啊', '嗯', '呗', '呢', '呐', '呀', '唉', '哎', '额', '呃', '哦', '呵']
+
+    # video tags
+    TAGS = ['舞蹈', '美食', '时尚', '旅行', '音乐', '运动', '影视', '搞笑', '科技', '综艺',
+            '游戏', '情感', '健康', '人文', '社会', '热点', '财富', '生活']
+    TAGS_NEW = ['资讯', '国际', '政治', '科技', '军事', '健康', '社会', '民生政策', '人文', '人物', '旅行', '生活',
+                '美食', '综艺', '搞笑', '民俗文化', '安全', '休闲娱乐', '艺术', '情感']
+    # GPT prompt
+    GPT_PROMPT = {
+        'tags': {
+            'prompt1': f"""
+请对如下文本进行分类。类别为其中的一个:【{' '.join(TAGS)}】。
+以json格式返回,key为category与confidence,分别代表类别与分类置信度。给出top 3的分类结果。
+-----------------------------
+""",
+            'prompt2': f"""
+请对如下文本进行:
+1. 分类,类别为其中的一个:【{' '.join(TAGS)}】。如果无法有效分类,请返回“其他”。
+2. 用20个字以内对文本内容进行概括。
+3. 为文本取一个易于分享、吸引人的标题。
+4. 列举三个关键词。
+以json格式返回,key为category, confidence, summary, title, keywords。分别代表类别,分类置信度,概要,标题,关键词。
+-----------------------------
+""",
+            'prompt3': f"""
+请对如下文本进行分类,不允许返回"其他"。
+以json格式返回,key为category与confidence,分别代表类别与分类置信度。给出top 3的分类结果。
+-----------------------------
+""",
+            'prompt4': f"""
+请对如下文本进行分类。类别为其中的一个:【{' '.join(TAGS_NEW)}】。
+以json格式返回,key为category与confidence,分别代表类别与分类置信度。给出top 3的分类结果。
+-----------------------------
+""",
+        },
+        'title': {
+            'prompt1': f"""
+请对如下文本进行: 为文本取一个易于分享、吸引人的标题。要求在30个字以内。
+-----------------------------
+""",
+            'prompt2': f"""
+我想让你充当爆款标题生成器,我会给你提供一段视频的讲解文本,你生成一个更吸引眼球的标题。
+标题不要超过35个字。
+标题要突出惊奇感,让老年人看到就想转发,语义要保持中立,不要向负面倾斜,不要涉及政治敏感话题。
+如果原讲解文本中有地名,不要改变讲解文本中的地名。
+如果原讲解文本是关于唱歌但是没有说明是哪首歌,你起的标题也不要说歌名。
+如果原讲解文本中没有明确指出男女性别,你起的标题也不要说具体的性别。
+如果原讲解文本中没有明确指出是谁干了这件事,你起的标题也不要说是谁干的。
+如果原讲解文本中没有提及老年人,你起的标题也不要说老年人。
+我的原讲解文本是:
+"""
+        }
+    }
+
+
+def set_config():
+    return BaseConfig()
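
The prompt strings above are plain prefixes: callers append the cleaned transcript to them. A sketch of how a tagging prompt is assembled (mirroring main_process.py; the transcript string is a placeholder):

    from config import set_config

    config_ = set_config()
    asr_text = '这里是一段清洗后的ASR文本'  # placeholder transcript
    prompt = f"{config_.GPT_PROMPT['tags']['prompt4']}{asr_text.strip()}"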

+ 94 - 0
feishu.py

@@ -0,0 +1,94 @@
+import json
+from utils import request_post, request_get
+from config import set_config
+
+config_ = set_config()
+
+
+class FeiShuHelper(object):
+    @staticmethod
+    def get_tenant_access_token():
+        """获取自建应用的tenant_access_token"""
+        url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
+        headers = {"Content-Type": "application/json; charset=utf-8"}
+        request_data = config_.FEISHU_TOKEN
+        data = request_post(request_url=url, headers=headers, request_data=request_data)
+        if data is not None:
+            tenant_access_token = data.get('tenant_access_token')
+            return tenant_access_token
+
+    def get_data(self, spreadsheet_token, sheet_id):
+        """读取电子表格数据"""
+        tenant_access_token = self.get_tenant_access_token()
+        url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{spreadsheet_token}/values_batch_get"
+        headers = {
+            "Content-Type": "application/json; charset=utf-8",
+            "Authorization": f"Bearer {tenant_access_token}"
+        }
+        params = {
+            'ranges': sheet_id,
+        }
+        data = request_get(request_url=url, headers=headers, params=params)
+        values = []
+        if data is not None:
+            try:
+                values = data['data']['valueRanges'][0].get('values')
+            except (KeyError, IndexError, TypeError):
+                values = []
+        return values
+
+    def data_to_feishu_sheet(self, sheet_token, sheet_id, data, start_row, start_column, end_column):
+        """Prepend rows to the sheet, 10 rows per request."""
+        tenant_access_token = self.get_tenant_access_token()
+        url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{sheet_token}/values_prepend"
+        headers = {
+            "Content-Type": "application/json",
+            "Authorization": f"Bearer {tenant_access_token}"
+        }
+        for i in range(len(data) // 10 + 1):
+            values = data[i * 10:(i + 1) * 10]
+            # Skip the trailing empty batch when len(data) is a multiple of 10
+            if not values:
+                break
+            start_index = start_row + i * 10
+            end_index = start_index + len(values) - 1
+            post_data = {
+                "valueRange": {
+                    "range": f"{sheet_id}!{start_column}{start_index}:{end_column}{end_index}",
+                    "values": values
+                }
+            }
+            r2 = request_post(request_url=url, headers=headers, request_data=post_data)
+            # print(r2["msg"])
+            print(r2)
+
+    def update_values(self, sheet_token, sheet_id, data, start_row, start_column, end_column):
+        """写入数据"""
+        tenant_access_token = self.get_tenant_access_token()
+        url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{sheet_token}/values_append"
+        headers = {
+            "Content-Type": "application/json",
+            "Authorization": f"Bearer {tenant_access_token}"
+        }
+        for i in range(len(data) // 10 + 1):
+            values = data[i * 10:(i + 1) * 10]
+            # Skip the trailing empty batch when len(data) is a multiple of 10
+            if not values:
+                break
+            start_index = start_row + i * 10
+            end_index = start_index + len(values) - 1
+            post_data = {
+                "valueRange": {
+                    "range": f"{sheet_id}!{start_column}{start_index}:{end_column}{end_index}",
+                    "values": values
+                }
+            }
+            r2 = request_post(request_url=url, headers=headers, request_data=post_data)
+            # print(r2["msg"])
+            print(r2)
+
+
+if __name__ == '__main__':
+    FeiShuHelper().data_to_feishu_sheet(sheet_token='DkiUsqwJ6hmBxstBYyEcNE4ante',
+                                        sheet_id='08d4cc',
+                                        data=[['1', 2, 3, 4]],
+                                        start_row=1,
+                                        start_column='A',
+                                        end_column='D')
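
A sketch of the read path (the spreadsheet token and sheet id below are placeholders):

    from feishu import FeiShuHelper

    helper = FeiShuHelper()
    rows = helper.get_data(spreadsheet_token='shtXXXXXXXX', sheet_id='hRjMrL')
    for row in rows:
        print(row)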

+ 42 - 0
gpt_tag.py

@@ -0,0 +1,42 @@
+import time
+import requests
+import traceback
+from config import set_config
+from log import Log
+
+config_ = set_config()
+log_ = Log()
+
+
+def get_tag(prompt):
+    """Call the GPT chat completions API with `prompt`; return the reply text, or None if all retries fail."""
+    retry = 1
+    while retry < 3:
+        try:
+            log_.info(f"retry = {retry}")
+            headers = {
+                'Content-Type': 'application/json',
+                # 'Authorization': f'Bearer {os.environ["OPENAI_API_KEY"]}',
+                'Authorization': f'Bearer {config_.GPT_OPENAI_API_KEY}',
+            }
+            proxies = config_.PROXIES
+            json_data = {
+                'model': 'gpt-3.5-turbo',
+                'messages': [
+                    {
+                        'role': 'user',
+                        'content': f'{prompt}',
+                    },
+                ],
+            }
+            response = requests.post(url=config_.GPT_HOST, headers=headers, json=json_data, proxies=proxies)
+            result_content = response.json()['choices'][0]['message']['content']
+            log_.info(result_content)
+            return result_content
+        except Exception as e:
+            log_.error(e)
+            log_.error(traceback.format_exc())
+            retry += 1
+            time.sleep(30)
+    # All retries failed
+    return None
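
A usage sketch for get_tag. The tagging prompts ask the model to reply in JSON, but the reply is not guaranteed to parse, so the json.loads call is guarded (the transcript string is a placeholder):

    import json
    from config import set_config
    from gpt_tag import get_tag

    config_ = set_config()
    prompt = f"{config_.GPT_PROMPT['tags']['prompt4']}这里是一段视频讲解文本"
    raw = get_tag(prompt=prompt)
    try:
        parsed = json.loads(raw)  # expected keys: category, confidence
    except (TypeError, json.JSONDecodeError):
        parsed = None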

+ 40 - 0
log.py

@@ -0,0 +1,40 @@
+import logging
+import logging.config
+
+from log_conf import conf
+
+
+class Log(object):
+    def __init__(self):
+        # Load the dict-based logging configuration
+        logging.config.dictConfig(conf)
+
+    def __console(self, level, message):
+        if level == 'info':
+            logger = logging.getLogger('sls')
+            logger.info(message)
+        elif level == 'debug':
+            logger = logging.getLogger('root')
+            logger.debug(message)
+        elif level == 'warning':
+            logger = logging.getLogger('root')
+            logger.warning(message)
+        elif level == 'error':
+            logger = logging.getLogger('error')
+            logger.error(message)
+
+    def debug(self, message):
+        self.__console('debug', message)
+
+    def info(self, message):
+        self.__console('info', message)
+
+    def warning(self, message):
+        self.__console('warning', message)
+
+    def error(self, message):
+        self.__console('error', message)
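
Usage is two lines; per log_conf.py, debug and warning go to the console only, while info and error also land in the daily file under logs/:

    from log import Log

    log_ = Log()
    log_.info('pipeline started')
    log_.error('something went wrong')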

+ 85 - 0
log_conf.py

@@ -0,0 +1,85 @@
+# Logging configuration
+# Note: re-enabling the commented-out Aliyun SLS handlers below additionally requires `import aliyun`.
+import os
+import time
+from config import set_config
+config_ = set_config()
+
+# Local log storage path
+log_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "logs")
+if not os.path.exists(log_path):
+    os.makedirs(log_path)
+# Log file name: one file per day
+log_name = os.path.join(log_path, '{}.log'.format(time.strftime('%Y%m%d')))
+
+conf = {
+    'version': 1,
+    'formatters': {
+        'rawFormatter': {
+            'class': 'logging.Formatter',
+            'format': '%(message)s'
+        },
+        'simpleFormatter': {
+            'class': 'logging.Formatter',
+            'format': '%(asctime)s %(levelname)s: %(message)s'
+        }
+    },
+    'handlers': {
+        'consoleHandler': {
+            '()': 'logging.StreamHandler',
+            'level': 'DEBUG',
+            'formatter': 'simpleFormatter',
+        },
+        # 'slsHandler': {
+        #     '()': 'aliyun.log.QueuedLogHandler',
+        #     'level': 'INFO',
+        #     'formatter': 'rawFormatter',
+        #     # custom args:
+        #     'end_point': config_.ALIYUN_LOG.get('ENDPOINT', ''),
+        #     'access_key_id': config_.ALIYUN_LOG.get('ACCESSID', ''),
+        #     'access_key': config_.ALIYUN_LOG.get('ACCESSKEY', ''),
+        #     'project': config_.ALIYUN_LOG.get('PROJECT', ''),
+        #     'log_store': "info",
+        #     'extract_kv': True,
+        #     'extract_json': True
+        # },
+        # 'errorHandler': {
+        #     '()': 'aliyun.log.QueuedLogHandler',
+        #     'level': 'ERROR',
+        #     'formatter': 'rawFormatter',
+        #     # custom args:
+        #     'end_point': config_.ALIYUN_LOG.get('ENDPOINT', ''),
+        #     'access_key_id': config_.ALIYUN_LOG.get('ACCESSID', ''),
+        #     'access_key': config_.ALIYUN_LOG.get('ACCESSKEY', ''),
+        #     'project': config_.ALIYUN_LOG.get('PROJECT', ''),
+        #     'log_store': "error",
+        #     'extract_kv': True,
+        #     'extract_json': True
+        # },
+        'fileHandler': {
+            '()': 'logging.FileHandler',
+            'level': 'INFO',
+            'formatter': 'simpleFormatter',
+            'filename': log_name,
+            'mode': 'a',
+            'encoding': 'utf-8'
+        }
+    },
+    'loggers': {
+        'root': {
+            'handlers': ['consoleHandler', ],
+            'level': 'DEBUG'
+        },
+        'sls': {
+            'handlers': ['consoleHandler', 'fileHandler'],
+            'level': 'INFO',
+            'propagate': False
+        },
+        'error': {
+            'handlers': ['consoleHandler', 'fileHandler'],
+            'level': 'ERROR',
+            'propagate': False
+        }
+    }
+}

+ 143 - 0
main_process.py

@@ -0,0 +1,143 @@
+import traceback
+
+from feishu import FeiShuHelper
+from audio_process import get_wav
+from xunfei_asr import call_asr
+from utils import download_video, asr_validity_discrimination
+from gpt_tag import get_tag
+from config import set_config
+from log import Log
+config_ = set_config()
+log_ = Log()
+
+
+def main(sheet_info_config):
+    video_spreadsheet_token = sheet_info_config['video_spreadsheet_token']
+    video_sheet_id = sheet_info_config['video_sheet_id']
+    read_start_row = sheet_info_config['read_start_row']
+    res_spreadsheet_token = sheet_info_config['res_spreadsheet_token']
+    res_sheet_id = sheet_info_config['res_sheet_id']
+    write_start_row = sheet_info_config['write_start_row']
+    write_start_col = sheet_info_config['write_start_col']
+    write_end_col = sheet_info_config['write_end_col']
+
+    # 1. Read the Feishu sheet to get video URLs and videoIds
+    feishu_helper = FeiShuHelper()
+    data = feishu_helper.get_data(spreadsheet_token=video_spreadsheet_token, sheet_id=video_sheet_id)
+    videos = []
+    for item in data[read_start_row:]:
+        print(item)
+        try:
+            videos.append(
+                {
+                    'videoId': item[0],
+                    'title': item[1],
+                    'videoPath': item[3][0]['text'],
+                }
+            )
+        except (IndexError, KeyError, TypeError):
+            continue
+    log_.info(f"videos count: {len(videos)}")
+
+    result = []
+    for i, video in enumerate(videos):
+        try:
+            log_.info(f"i = {i}, video = {video}")
+            # 2. Download the video
+            video_id = video['videoId']
+            video_path = video['videoPath']
+            video_file = download_video(video_path=video_path, video_id=video_id, download_folder='videos')
+            log_.info(f"video_file = {video_file}")
+
+            # 3. Extract the audio from the video
+            audio_path = get_wav(video_path=video_file)
+            log_.info(f"audio_path = {audio_path}")
+
+            # 4. Run ASR on the audio
+            dialogue_path, asr_res_initial = call_asr(audio_path=audio_path)
+            log_.info(f"asr_res_initial = {asr_res_initial}")
+
+            # 5. Check whether the ASR transcript is valid
+            validity = asr_validity_discrimination(text=asr_res_initial)
+            log_.info(f"validity = {validity}")
+
+            # 6. Clean the ASR transcript
+            asr_res = asr_res_initial.strip().replace('\n', '')
+            for stop_word in config_.STOP_WORDS:
+                asr_res = asr_res.replace(stop_word, '')
+            # Token limit: keep at most the last 2,500 characters
+            asr_res = asr_res[-2500:]
+
+            # 7. Generate tags with GPT
+            prompt = f"{config_.GPT_PROMPT['tags']['prompt4']}{asr_res.strip()}"
+            gpt_res = get_tag(prompt=prompt)
+            log_.info(f"gpt_res = {gpt_res}")
+
+            # 8. Write the result to the Feishu sheet
+            result = [[video_id, video_path, video['title'], str(validity), asr_res_initial, gpt_res, prompt]]
+            log_.info(f"result = {result}")
+            if len(result) > 0:
+                feishu_helper.data_to_feishu_sheet(
+                    sheet_token=res_spreadsheet_token,
+                    sheet_id=res_sheet_id,
+                    data=result,
+                    start_row=write_start_row,
+                    start_column=write_start_col,
+                    end_column=write_end_col
+                )
+                log_.info(f"write to feishu success!")
+                write_start_row += 1
+        except Exception as e:
+            log_.error(e)
+            log_.error(traceback.format_exc())
+            continue
+
+
+
+if __name__ == '__main__':
+    sheet_info = {
+        '历史视频top5000回流倒叙排列': {
+            'video_spreadsheet_token': 'L4ywsRaV2hFLv1t4Athcdw71nde',
+            'video_sheet_id': 'hRjMrL',
+            'read_start_row': 2,
+            'res_spreadsheet_token': 'DkiUsqwJ6hmBxstBYyEcNE4ante',
+            'res_sheet_id': '7Fua00',
+            'write_start_row': 3,
+            'write_start_col': 'A',
+            'write_end_col': 'I'
+        }
+    }
+
+    for sheet_tag, sheet_item in sheet_info.items():
+        print(sheet_tag)
+        main(sheet_info_config=sheet_item)
+

+ 76 - 0
utils.py

@@ -0,0 +1,76 @@
+import requests
+import os
+import json
+import traceback
+from log import Log
+from config import set_config
+log_ = Log()
+config_ = set_config()
+
+
+def request_post(request_url, headers, request_data):
+    """
+    post 请求 HTTP接口
+    :param request_url: 接口URL
+    :param headers: 请求头
+    :param request_data: 请求参数
+    :return: res_data json格式
+    """
+    try:
+        response = requests.post(url=request_url, json=request_data, headers=headers)
+        if response.status_code == 200:
+            res_data = json.loads(response.text)
+            return res_data
+        else:
+            return None
+    except Exception as e:
+        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
+        return None
+
+
+def request_get(request_url, headers, params=None):
+    """
+    get 请求 HTTP接口
+    :param request_url: 接口URL
+    :param headers: 请求头
+    :param params: 请求参数
+    :return: res_data json格式
+    """
+    try:
+        response = requests.get(url=request_url, headers=headers, params=params)
+        if response.status_code == 200:
+            res_data = json.loads(response.text)
+            return res_data
+        else:
+            return None
+    except Exception as e:
+        log_.error('url: {}, exception: {}, traceback: {}'.format(request_url, e, traceback.format_exc()))
+        return None
+
+
+def download_video(video_path, video_id, download_folder, ftype='mp4'):
+    """下载视频"""
+    if not os.path.exists(download_folder):
+        os.makedirs(download_folder)
+    filename = f"{download_folder}/{video_id}.{ftype}"
+    # Skip the download if the file already exists
+    if os.path.exists(filename):
+        return filename
+    response = requests.get(video_path, stream=True)
+    if response.status_code == 200:
+        with open(filename, "wb") as video_file:
+            for chunk in response.iter_content(chunk_size=8192):
+                video_file.write(chunk)
+        return filename
+    return None
+
+
+def asr_validity_discrimination(text):
+    """Check whether an ASR transcript looks valid, based on its distinct-character ratio."""
+    if not text:
+        return False
+    words = set(text)
+    words_rate = len(words) / len(text)
+    if words_rate < 0.1:
+        return False
+    return True
+
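
A quick check of asr_validity_discrimination: it rejects transcripts whose distinct-character ratio falls below 0.1, which catches ASR output stuck on a few repeated characters:

    from utils import asr_validity_discrimination

    asr_validity_discrimination('啊' * 50)   # False: 1 distinct character out of 50
    asr_validity_discrimination('今天天气不错,适合出门散步。')  # True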

+ 154 - 0
xunfei_asr.py

@@ -0,0 +1,154 @@
+import ast
+import base64
+import hashlib
+import hmac
+import json
+import time
+import requests
+import urllib
+import os
+from audio_process import get_audio_duration
+from config import set_config
+from log import Log
+
+config_ = set_config()
+log_ = Log()
+
+
+class RequestApi(object):
+    def __init__(self, appid, secret_key, upload_file_path):
+        self.appid = appid
+        self.secret_key = secret_key
+        self.upload_file_path = upload_file_path
+        self.ts = str(int(time.time()))
+        self.signa = self.get_signa()
+
+    def get_signa(self):
+        """
+        Generate the request signature (signa)
+        :return: signa
+        """
+        # signa formula: HmacSHA1(MD5(appid + ts), secret_key)
+        m2 = hashlib.md5()
+        m2.update((self.appid + self.ts).encode('utf-8'))
+        md5 = m2.hexdigest()
+        md5 = bytes(md5, encoding='utf-8')
+        # HMAC-SHA1 with secret_key as the key and the md5 hex digest as the message
+        signa = hmac.new(self.secret_key.encode('utf-8'), md5, hashlib.sha1).digest()
+        signa = base64.b64encode(signa)
+        signa = str(signa, 'utf-8')
+        return signa
+
+    def upload(self):
+        """
+        Upload the audio file for transcription
+        :return: orderId
+        """
+        video_id = self.upload_file_path.split('/')[-1].replace('.wav', '')
+        # Get the audio file size; must not exceed 500 MB
+        file_len = os.path.getsize(self.upload_file_path)
+        file_size = file_len / 1024 / 1024
+        if file_size > 500:
+            log_.error({'videoId': video_id, 'errorType': 'audioSizeError',
+                        'errorMsg': f'audioSize: {file_size}M, required <= 500M'})
+            return None
+        file_name = os.path.basename(self.upload_file_path)
+        # Get the audio duration; must not exceed 5 hours
+        duration = get_audio_duration(self.upload_file_path)
+        audio_duration = duration / 1000 / 60 / 60
+        if audio_duration > 5:
+            log_.error({'videoId': video_id, 'errorType': 'audioDurationError',
+                        'errorMsg': f'audioDuration: {audio_duration}h, required <= 5h'})
+            return None
+        # Assemble the request parameters
+        param_dict = {
+            'appId': self.appid,
+            'signa': self.signa,
+            'ts': self.ts,
+            'fileSize': file_len,
+            'fileName': file_name,
+            'duration': str(duration),
+            'roleType': 1
+        }
+        # print("upload参数:", param_dict)
+        # 以二进制方式读取音频文件内容
+        data = open(self.upload_file_path, 'rb').read(file_len)
+        # Call the upload API
+        response = requests.post(
+            url=config_.XFASR_HOST + config_.XF_API['upload'] + "?" + urllib.parse.urlencode(param_dict),
+            headers={"Content-type": "application/json"},
+            data=data
+        )
+        # print("upload_url:", response.request.url)
+        result = json.loads(response.text)
+        # print("upload resp:", result)
+        return result['content']['orderId']
+
+    def get_result(self, order_id):
+        """
+        Poll for the transcription result
+        :param order_id:
+        :return: result
+        """
+        param_dict = {
+            'appId': self.appid,
+            'signa': self.signa,
+            'ts': self.ts,
+            'orderId': order_id,
+            'resultType': 'transfer'
+        }
+        status = 3
+        # The query API is rate-limited; the vendor recommends fetching results via callback instead of polling
+        while status == 3:
+            response = requests.post(
+                url=config_.XFASR_HOST + config_.XF_API['get_result'] + "?" + urllib.parse.urlencode(param_dict),
+                headers={"Content-type": "application/json"}
+            )
+            # print("get_result_url:",response.request.url)
+            result = json.loads(response.text)
+            status = result['content']['orderInfo']['status']
+            if status == 4:
+                return result
+            time.sleep(5)
+
+    def parse_lattice(self, result):
+        """Concatenate the recognized words from the lattice result into plain text."""
+        content = result['content']['orderResult']
+        content = ast.literal_eval(content)
+        contents = content['lattice']
+        asr_ret = ''
+        for js in contents:
+            json_1best = js['json_1best']
+            json_1best = ast.literal_eval(json_1best)
+            json_1best_contents = json_1best['st']['rt']
+            words = []
+            for rt_item in json_1best_contents:
+                for ws_item in rt_item['ws']:
+                    words.append(ws_item['cw'][0]['w'])
+            asr_ret += ''.join(words) + '\n'
+        return asr_ret
+
+
+def call_asr(audio_path):
+    """ASR"""
+    dialogue_path = audio_path.replace('.wav', '.txt')
+    # 视频已识别,则不重复调用,直接读取文件中的内容
+    if os.path.exists(dialogue_path):
+        with open(dialogue_path, 'r', encoding='utf-8') as rf:
+            asr_res = ''.join(rf.readlines())
+    else:
+        api = RequestApi(appid=config_.XFASR_CONFIG['appid'],
+                         secret_key=config_.XFASR_CONFIG['secret_key'],
+                         upload_file_path=audio_path)
+        order_id = api.upload()
+        result = api.get_result(order_id)
+        asr_res = api.parse_lattice(result)
+        with open(dialogue_path, 'w', encoding='utf-8') as f:
+            f.write(asr_res)
+    return dialogue_path, asr_res
+
+
+if __name__ == '__main__':
+    audio_path = 'videos/001.wav'
+    call_asr(audio_path=audio_path)
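
For reference, a standalone sketch of the signature scheme used in get_signa, HmacSHA1(MD5(appid + ts), secret_key) with the result base64-encoded (credentials are placeholders):

    import base64
    import hashlib
    import hmac
    import time

    appid, secret_key = 'my_appid', 'my_secret'  # placeholders
    ts = str(int(time.time()))
    md5 = hashlib.md5((appid + ts).encode('utf-8')).hexdigest().encode('utf-8')
    signa = base64.b64encode(
        hmac.new(secret_key.encode('utf-8'), md5, hashlib.sha1).digest()
    ).decode('utf-8')
    print(signa)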