Browse Source

update title_generate_main retry

liqian 2 years ago
parent
commit
895318ed0a
4 changed files with 165 additions and 17 deletions
  1. 125 1
      app.py
  2. 2 0
      config.py
  3. 23 14
      gpt_process.py
  4. 15 2
      xunfei_asr.py

+ 125 - 1
app.py

@@ -25,7 +25,7 @@ log_ = Log()
 config_ = set_config()
 
 
-def title_generate_main():
+def title_generate_main0():
     # log_.info("debug: title_generate_main")
     # 初始化client
     mq_client = MQClient(
@@ -183,6 +183,130 @@ def title_generate_main():
             continue
 
 
+def title_generate_main():
+    # log_.info("debug: title_generate_main")
+    # 初始化client
+    mq_client = MQClient(
+        # 设置HTTP协议客户端接入点
+        host=config_.MQ_CONFIG['ENDPOINT'],
+        # AccessKey ID,阿里云身份验证标识
+        access_id=config_.MQ_CONFIG['ACCESS_KEY'],
+        # AccessKey Secret,阿里云身份验证密钥
+        access_key=config_.MQ_CONFIG['SECRET_KEY']
+    )
+    # Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
+    # 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入空字符串。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
+    instance_id = config_.MQ_CONFIG['INSTANCE_ID']
+    # 监听消息所属的Topic
+    consumer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_todo']['topic_name']
+    # 您在消息队列RocketMQ版控制台创建的Group ID。
+    group_id = config_.MQ_TOPIC_CONFIG['asr_title_todo']['group_id']
+    consumer = mq_client.get_consumer(instance_id, consumer_topic_name, group_id)
+    # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
+    # 长轮询时间3秒(最多可设置为30秒)。
+    wait_seconds = 3
+    # 一次最多消费1条(最多可设置为16条)。
+    batch = 1
+    print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \
+           % (10 * "=", 10 * "=", consumer_topic_name, group_id, wait_seconds)))
+    while True:
+        receipt_handle_list = []
+        try:
+            # 长轮询消费消息。
+            start_time0 = time.time()
+            recv_msgs = consumer.consume_message(batch, wait_seconds)
+            for msg in recv_msgs:
+                start_time = time.time()
+                receive_info = {
+                    'messageId': msg.message_id,
+                    'messageBodyMD5': msg.message_body_md5,
+                    'messageTag': msg.message_tag,
+                    'consumedTimes': msg.consumed_times,
+                    'publishTime': msg.publish_time,
+                    'body': msg.message_body,
+                    'nextConsumeTime': msg.next_consume_time,
+                    'receiptHandle': msg.receipt_handle,
+                    'properties': msg.properties
+                }
+                log_.info(receive_info)
+                message_body = json.loads(msg.message_body)
+                video_id = message_body['videoId']
+                video_path = message_body['videoPath']
+
+                # 1. 添加消息handle到ack_message列表
+                receipt_handle_list.append(msg.receipt_handle)
+
+                # msg.next_consume_time前若不确认消息消费成功,则消息会被重复消费。
+                # 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
+                consumer.ack_message(receipt_handle_list)
+                log_.info({
+                    'ackMessage': {
+                        'status': 'success',
+                        'receiptHandleList': receipt_handle_list,
+                        'executeTime': (time.time() - start_time0) * 1000
+                    }
+                })
+
+                # 2. 获取标题
+                title, generate_filepath = title_generate(video_id=video_id, video_path=video_path)
+                log_.info({'titleGenerateInitial': {'videoId': video_id, 'title': title}})
+                # 删除相关文件
+                for _, filepath in generate_filepath.items():
+                    try:
+                        os.remove(filepath)
+                    except:
+                        continue
+                if title is None:
+                    continue
+                elif title[0] in ['"', "'"] and title[-1] in ['"', "'"]:
+                    title = title[1:-1]
+                log_.info({'titleGenerate': {'videoId': video_id, 'title': title}})
+
+                # 3. 发送结果至done消息队列
+                try:
+                    msg_content = {
+                        'title': title,
+                        'videoId': video_id
+                    }
+                    message_body = json.dumps(msg_content)
+                    producer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_done']['topic_name']
+                    producer = mq_client.get_producer(instance_id=instance_id, topic_name=producer_topic_name)
+                    produce_msg = TopicMessage(message_body=message_body)
+                    # 设置消息的Key。
+                    produce_msg.set_message_key(f"videoId-{video_id}")
+                    re_msg = producer.publish_message(produce_msg)
+                    log_.info({
+                        'publish': {
+                            'status': 'success',
+                            'messageID': re_msg.message_id,
+                            'bodyMD5': re_msg.message_body_md5,
+                            'messageContent': msg_content,
+                            'executeTime': (time.time() - start_time) * 1000
+                        }
+                    })
+                except Exception as publish_e:
+                    log_.error({
+                        'errorType': 'publish',
+                        'status': 'fail',
+                        'videoId': video_id,
+                        'exception': publish_e,
+                        'traceback': traceback.format_exc()
+                    })
+                    # if publish_e.type == "TopicNotExist":
+                    #     sys.exit(1)
+
+        except Exception as consume_e:
+            log_.error({
+                'errorType': 'consume',
+                'status': 'fail',
+                'exception': consume_e,
+                'traceback': traceback.format_exc()
+            })
+            # log_.error("Consume Message Fail! Exception:%s\n" % e)
+            time.sleep(30)
+            continue
+
+
 class FlaskApp(Flask):
     def __init__(self, *args, **kwargs):
         super(FlaskApp, self).__init__(*args, **kwargs)

+ 2 - 0
config.py

@@ -18,6 +18,8 @@ class BaseConfig(object):
     # GPT_OPENAI_API_KEY = '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'
     GPT_OPENAI_API_KEY = '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'
     GPT_URL = 'http://aigc.piaoquantv.com/aigc-server/aigc/content'
+    RETRY_MAX_COUNT = 3
+
     # 代理地址
     PROXIES = {
         'http': 'http://127.0.0.1:4780',

+ 23 - 14
gpt_process.py

@@ -32,16 +32,23 @@ def request_gpt(prompt):
     }
     response = requests.post(url=config_.GPT_HOST, headers=headers, json=json_data, proxies=proxies)
     """
-    response = requests.post(url=config_.GPT_URL, json={'content': prompt, 'auth': config_.GPT_OPENAI_API_KEY})
-    # print(response.json())
-    # print(response.json()['choices'][0]['message']['content'])
-    # print('\n')
-    # result_content = response.json()['choices'][0]['message']['content']
-    # log_.info(f"response.text: {response.text}")
-    res_data = json.loads(response.text)
-    if res_data['code'] != 0:
-        raise ConnectionResetError
-    result_content = res_data['data']['choices'][0]['message']['content']
+    retry_count = 0
+    result_content = None
+    while retry_count < config_.RETRY_MAX_COUNT:
+        retry_count += 1
+        try:
+            response = requests.post(url=config_.GPT_URL, json={'content': prompt, 'auth': config_.GPT_OPENAI_API_KEY})
+            # print(response.json())
+            # print(response.json()['choices'][0]['message']['content'])
+            # print('\n')
+            # result_content = response.json()['choices'][0]['message']['content']
+            # log_.info(f"response.text: {response.text}")
+            res_data = json.loads(response.text)
+            if res_data['code'] != 0:
+                continue
+            result_content = res_data['data']['choices'][0]['message']['content']
+        except Exception:
+            continue
     return result_content
 
 
@@ -52,17 +59,21 @@ def title_generate(video_id, video_path):
     :param video_path: videoPath
     :return:
     """
+    generate_filepath = dict()
     # 1. 下载视频
     # log_.info(f"debug: title_generate 1")
     video_file_path = download_video(video_path=video_path, video_id=video_id, download_folder='videos')
+    generate_filepath['video_file_path'] = video_file_path
     # log_.info({'videoId': video_id, 'video_file_path': video_file_path})
     # 2. 获取视频中的音频
     # log_.info(f"debug: title_generate 2")
     audio_path = get_wav(video_path=video_file_path)
+    generate_filepath['audio_path'] = audio_path
     # log_.info({'videoId': video_id, 'audio_path': audio_path})
     # 3. asr
     # log_.info(f"debug: title_generate 3")
     dialogue_path, asr_res = call_asr(audio_path=audio_path)
+    generate_filepath['dialogue_path'] = dialogue_path
     log_.info({
         'asrResult': {'videoId': video_id, 'asrRes': asr_res}
     })
@@ -70,10 +81,8 @@ def title_generate(video_id, video_path):
     # log_.info(f"debug: title_generate 4")
     prompt = f"{config_.GPT_PROMPT['title']['prompt2']}{asr_res.strip()}"
     gpt_res = request_gpt(prompt=prompt)
-    # 5. 删除相关文件
-    for file in [video_file_path, audio_path, dialogue_path]:
-        os.remove(file)
-    return gpt_res
+
+    return gpt_res, generate_filepath
     # except ConnectionResetError:
     #     log_.info(video_id)
     # except Exception as e:

+ 15 - 2
xunfei_asr.py

@@ -9,8 +9,10 @@ import urllib
 import os
 from audio_process import get_audio_duration
 from config import set_config
+from log import Log
 
 config_ = set_config()
+log_ = Log()
 
 
 class RequestApi(object):
@@ -42,11 +44,22 @@ class RequestApi(object):
         上传
         :return: orderId
         """
-        # 获取音频文件大小
+        video_id = self.upload_file_path.split('/')[-1].replace('.wav', '')
+        # 获取音频文件大小,不超过500M
         file_len = os.path.getsize(self.upload_file_path)
+        file_size = file_len / 1024 / 1024
+        if file_size > 500:
+            log_.error({'videoId': video_id, 'errorType': 'audioSizeError',
+                        'errorMsg': f'audioSize: {file_size}M, required <= 500M'})
+            return None
         file_name = os.path.basename(self.upload_file_path)
-        # 获取音频时长
+        # 获取音频时长,不超过5h
         duration = get_audio_duration(self.upload_file_path)
+        audio_duration = duration / 1000 / 60
+        if audio_duration > 60:
+            log_.error({'videoId': video_id, 'errorType': 'audioDurationError',
+                        'errorMsg': f'audioSize: {audio_duration}h, required <= 60h'})
+            return None
         # 请求参数拼接
         param_dict = {
             'appId': self.appid,