|
@@ -55,8 +55,10 @@ def title_generate_main():
|
|
|
receipt_handle_list = []
|
|
|
try:
|
|
|
# 长轮询消费消息。
|
|
|
+ start_time0 = time.time()
|
|
|
recv_msgs = consumer.consume_message(batch, wait_seconds)
|
|
|
for msg in recv_msgs:
|
|
|
+ start_time = time.time()
|
|
|
receive_info = {
|
|
|
'messageId': msg.message_id,
|
|
|
'messageBodyMD5': msg.message_body_md5,
|
|
@@ -92,12 +94,12 @@ def title_generate_main():
|
|
|
retry_count = redis_helper.get_data_from_redis(key_name=key_name)
|
|
|
log_.error({'errorType': 'ConnectionResetError', 'videoId': video_id, 'retryCount': retry_count})
|
|
|
if retry_count is not None and retry_count == config_.RETRY_MAX_COUNT:
|
|
|
- # 确认消息消费成功
|
|
|
+ # 添加消息handle到ack_message列表
|
|
|
receipt_handle_list.append(msg.receipt_handle)
|
|
|
pass
|
|
|
|
|
|
except Exception:
|
|
|
- # 确认消息消费成功
|
|
|
+ # 添加消息handle到ack_message列表
|
|
|
receipt_handle_list.append(msg.receipt_handle)
|
|
|
log_.error({'videoId': video_id, 'traceback': traceback.format_exc()})
|
|
|
else:
|
|
@@ -117,7 +119,8 @@ def title_generate_main():
|
|
|
'status': 'success',
|
|
|
'messageID': re_msg.message_id,
|
|
|
'bodyMD5': re_msg.message_body_md5,
|
|
|
- 'messageContent': msg_content
|
|
|
+ 'messageContent': msg_content,
|
|
|
+ 'executeTime': (time.time() - start_time) * 1000
|
|
|
}
|
|
|
})
|
|
|
except MQExceptionBase as publish_e:
|
|
@@ -131,7 +134,7 @@ def title_generate_main():
|
|
|
if publish_e.type == "TopicNotExist":
|
|
|
sys.exit(1)
|
|
|
|
|
|
- # 2. 确认消息消费成功
|
|
|
+ # 2. 添加消息handle到ack_message列表
|
|
|
receipt_handle_list.append(msg.receipt_handle)
|
|
|
|
|
|
# msg.next_consume_time前若不确认消息消费成功,则消息会被重复消费。
|
|
@@ -141,7 +144,8 @@ def title_generate_main():
|
|
|
log_.info({
|
|
|
'ackMessage': {
|
|
|
'status': 'success',
|
|
|
- 'receiptHandleList': receipt_handle_list
|
|
|
+ 'receiptHandleList': receipt_handle_list,
|
|
|
+ 'executeTime': (time.time() - start_time0) * 1000
|
|
|
}
|
|
|
})
|
|
|
# except MQExceptionBase as consume_e:
|