"""Title-generation worker.

Consumes ``asr_title_todo`` messages from RocketMQ (HTTP SDK), asks
``gpt_process.title_generate`` for a title for each video, publishes the
result to the ``asr_title_done`` topic and acks the source message.  A
minimal Flask app exposing ``/healthcheck`` starts the consumer loop in a
background thread when it is constructed.
"""
import random
import os
import logging
import json
import sys
import threading
import time
import traceback
import ast
from gevent import monkey
monkey.patch_all()
from flask import Flask, request
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
from gpt_process import title_generate
from log import Log
from config import set_config
from db_helper import RedisHelper
from gevent.pywsgi import WSGIServer
from multiprocessing import cpu_count, Process

log_ = Log()
config_ = set_config()

# Quote characters the generated title may come wrapped in.
_QUOTE_CHARS = ('"', "'")


def _strip_wrapping_quotes(title):
    """Return *title* without a leading and trailing quote character.

    Strips only when the string is at least two characters long and both its
    first and last characters are quote characters, so short strings cannot
    raise ``IndexError`` here.
    """
    if len(title) >= 2 and title[0] in _QUOTE_CHARS and title[-1] in _QUOTE_CHARS:
        return title[1:-1]
    return title


def _rate_limit_retries_exhausted(video_id):
    """Count one more rate-limited attempt for *video_id*.

    The counter is kept in Redis under
    ``config_.TITLE_GENERATE_RETRY_KEY_NAME_PREFIX + video_id`` with
    ``expire_time=2*3600``.

    Returns:
        True when the counter has reached ``config_.RETRY_MAX_COUNT`` (the
        message should be acked and given up on), otherwise False (leave it
        unacked so the broker redelivers it).
    """
    key_name = f"{config_.TITLE_GENERATE_RETRY_KEY_NAME_PREFIX}{video_id}"
    redis_helper = RedisHelper()
    redis_helper.setnx_key(key_name=key_name, value=0, expire_time=2 * 3600)
    redis_helper.incr_key(key_name=key_name, amount=1, expire_time=2 * 3600)
    retry_count = redis_helper.get_data_from_redis(key_name=key_name)
    log_.error({'errorType': 'ConnectionResetError', 'videoId': video_id, 'retryCount': retry_count})
    if retry_count is None:
        return False
    # Redis may return the counter as str/bytes; compare numerically, and use
    # >= so a counter that overshoots the limit still stops the retries.
    return int(retry_count) >= int(config_.RETRY_MAX_COUNT)


def _publish_title(producer, video_id, title, start_time):
    """Publish ``{'title': ..., 'videoId': ...}`` to the done topic and log it.

    MQ publish errors are logged and swallowed; the caller still acks the
    source message, so a failed publish is not retried.  A ``TopicNotExist``
    error calls ``sys.exit(1)``.

    Args:
        producer: producer bound to the ``asr_title_done`` topic.
        video_id: id taken from the consumed message.
        title: generated title to publish.
        start_time: ``time.time()`` at which handling of the message began,
            used for the logged ``executeTime`` (milliseconds).
    """
    msg_content = {
        'title': title,
        'videoId': video_id
    }
    try:
        message_body = json.dumps(msg_content)
        produce_msg = TopicMessage(message_body=message_body)
        re_msg = producer.publish_message(produce_msg)
        log_.info({
            'publish': {
                'status': 'success',
                'messageID': re_msg.message_id,
                'bodyMD5': re_msg.message_body_md5,
                'messageContent': msg_content,
                'executeTime': (time.time() - start_time) * 1000
            }
        })
    except MQExceptionBase as publish_e:
        log_.error({
            'errorType': 'publish',
            'status': 'fail',
            'videoId': video_id,
            'exception': publish_e,
            'traceback': traceback.format_exc()
        })
        if publish_e.type == "TopicNotExist":
            # NOTE(review): sys.exit() only raises SystemExit.  Called from the
            # background consumer thread it ends that thread, not the process,
            # so /healthcheck keeps answering 'ok!'.  Confirm this is intended.
            sys.exit(1)


def _handle_message(msg, producer):
    """Generate and publish a title for one consumed message.

    Returns:
        True if the message should be acked, False to leave it unacked so the
        broker redelivers it (rate-limited and retries remain).
    """
    start_time = time.time()
    receive_info = {
        'messageId': msg.message_id,
        'messageBodyMD5': msg.message_body_md5,
        'messageTag': msg.message_tag,
        'consumedTimes': msg.consumed_times,
        'publishTime': msg.publish_time,
        'body': msg.message_body,
        'nextConsumeTime': msg.next_consume_time,
        'receiptHandle': msg.receipt_handle,
        'properties': msg.properties
    }
    log_.info(receive_info)

    try:
        message_body = json.loads(msg.message_body)
        video_id = message_body['videoId']
        video_path = message_body['videoPath']
    except (ValueError, KeyError, TypeError):
        # A malformed message will never parse on redelivery; ack it so it
        # cannot block the queue forever.
        log_.error({'errorType': 'invalidMessage', 'messageId': msg.message_id,
                    'traceback': traceback.format_exc()})
        return True

    try:
        title = _strip_wrapping_quotes(title_generate(video_id=video_id, video_path=video_path))
        if not title:
            raise ValueError(f"empty title generated for video {video_id}")
        log_.info({'titleGenerate': {'videoId': video_id, 'title': title}})
    except ConnectionResetError:
        # API rate limiting: retry via redelivery until the budget is used up.
        return _rate_limit_retries_exhausted(video_id)
    except Exception:
        log_.error({'videoId': video_id, 'traceback': traceback.format_exc()})
        return True

    _publish_title(producer, video_id, title, start_time)
    return True


def title_generate_main():
    """Consume ``asr_title_todo`` messages forever and publish generated titles.

    Each polled message is handed to ``_handle_message``; handles of messages
    that are finished (success, permanent failure, or exhausted rate-limit
    retries) are acked in one call.  Unexpected errors are logged and followed
    by a 30 second pause.  Never returns.
    """
    mq_client = MQClient(
        # HTTP endpoint of the MQ instance.
        host=config_.MQ_CONFIG['ENDPOINT'],
        # AccessKey ID (Alibaba Cloud identity).
        access_id=config_.MQ_CONFIG['ACCESS_KEY'],
        # AccessKey Secret (Alibaba Cloud authentication key).
        access_key=config_.MQ_CONFIG['SECRET_KEY']
    )
    # ID of the instance the topics belong to.  It must be set when the
    # instance has a namespace; otherwise pass an empty string.
    instance_id = config_.MQ_CONFIG['INSTANCE_ID']
    # Topic to listen on, and the consumer Group ID created in the console.
    consumer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_todo']['topic_name']
    group_id = config_.MQ_TOPIC_CONFIG['asr_title_todo']['group_id']
    consumer = mq_client.get_consumer(instance_id, consumer_topic_name, group_id)

    # One producer for the result topic, reused for every message.
    producer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_done']['topic_name']
    producer = mq_client.get_producer(instance_id=instance_id, topic_name=producer_topic_name)

    # Long polling: with no message available the request is held on the
    # server for up to wait_seconds (max 30) and returns early if one arrives.
    wait_seconds = 3
    # Consume at most 1 message per poll (max 16).
    batch = 1
    print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n"
           % (10 * "=", 10 * "=", consumer_topic_name, group_id, wait_seconds)))

    while True:
        receipt_handle_list = []
        try:
            start_time0 = time.time()
            recv_msgs = consumer.consume_message(batch, wait_seconds)
            for msg in recv_msgs:
                if _handle_message(msg, producer):
                    receipt_handle_list.append(msg.receipt_handle)

            # Nothing to confirm (e.g. a rate-limited message left for
            # redelivery); acking an empty list would only produce an error.
            if not receipt_handle_list:
                continue

            # A message not acked before msg.next_consume_time is delivered
            # again.  Receipt handles carry a timestamp, so each delivery of the
            # same message has a different handle.
            consumer.ack_message(receipt_handle_list)
            log_.info({
                'ackMessage': {
                    'status': 'success',
                    'receiptHandleList': receipt_handle_list,
                    'executeTime': (time.time() - start_time0) * 1000
                }
            })
        except Exception as consume_e:
            # An empty topic surfaces as MessageNotExist once the long poll
            # times out.  That is the normal idle state, so poll again at once.
            if isinstance(consume_e, MQExceptionBase) and consume_e.type == "MessageNotExist":
                continue
            log_.error({
                'errorType': 'consume',
                'status': 'fail',
                'exception': consume_e,
                'traceback': traceback.format_exc()
            })
            time.sleep(30)


class FlaskApp(Flask):
    """Flask app that starts ``title_generate_main`` in a background thread when constructed."""

    def __init__(self, *args, **kwargs):
        super(FlaskApp, self).__init__(*args, **kwargs)
        self._activate_background_job()

    def _activate_background_job(self):
        """Start the MQ consumer loop in a new thread."""
        def run_job():
            title_generate_main()

        t1 = threading.Thread(target=run_job)
        t1.start()


app = FlaskApp(__name__)


@app.route('/healthcheck')
def health_check():
    """Liveness endpoint; always answers 'ok!'."""
    log_.info('debug: ok!')
    return 'ok!'


if __name__ == '__main__':
    app.run()