"""Flask service that consumes "title to-do" messages from RocketMQ.

A background thread started by :class:`FlaskApp` long-polls the
``asr_title_todo`` topic, generates a title for each video through
``title_generate`` and publishes the result to the ``asr_title_done`` topic.
The HTTP side only exposes ``/healthcheck``.
"""
import random
import os
import logging
import json
import sys
import threading
import time
import traceback
import ast
from gevent import monkey
monkey.patch_all()
from flask import Flask, request
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
from gpt_process import title_generate
from log import Log
from config import set_config
from db_helper import RedisHelper
from gevent.pywsgi import WSGIServer
from multiprocessing import cpu_count, Process

log_ = Log()
config_ = set_config()

# Expiry passed to the Redis helper for the per-video retry counter.
_RETRY_KEY_EXPIRE_SECONDS = 2 * 3600


class FlaskApp(Flask):
    """Flask application that starts the MQ consumer when it is constructed."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._activate_background_job()

    def _activate_background_job(self):
        """Run ``title_generate_main`` in a separate thread."""
        log_.info("debug: _activate_background_job")

        def run_job():
            log_.info("debug: run_job")
            title_generate_main()

        t1 = threading.Thread(target=run_job)
        t1.start()


app = FlaskApp(__name__)


@app.route('/healthcheck')
def health_check():
    """Liveness probe endpoint; always answers ``'ok!'``."""
    log_.info('debug: ok!')
    return 'ok!'


def _parse_message_body(raw_body):
    """Return the consumed message payload as a ``dict``.

    The payload is indexed by ``'videoId'`` / ``'videoPath'`` by the caller,
    so a textual body has to be decoded first. JSON is tried first; a Python
    literal (``str(dict)``) is accepted as a fallback.
    NOTE(review): the wire format of the producer is not visible in this
    file - confirm which of the two encodings is actually used.

    :param raw_body: ``msg.message_body`` as delivered by the SDK.
    :return: the decoded mapping (returned unchanged if already a dict).
    :raises ValueError, SyntaxError, TypeError: if the body cannot be decoded
        into a dict.
    """
    if isinstance(raw_body, dict):
        return raw_body
    if isinstance(raw_body, (bytes, bytearray)):
        raw_body = raw_body.decode('utf-8')
    try:
        payload = json.loads(raw_body)
    except ValueError:
        # literal_eval only evaluates literals, so it is safe on message data.
        payload = ast.literal_eval(raw_body)
    if not isinstance(payload, dict):
        raise ValueError(f"message body is not a mapping: {type(payload).__name__}")
    return payload


def _retry_limit_reached(retry_count, max_count):
    """Tell whether the stored retry counter has hit the configured maximum.

    The counter is compared numerically with ``>=`` so that a value returned
    as text/bytes, or a counter that moved past the limit, still terminates
    the retries. If either value is not numeric the original strict equality
    is used.

    :param retry_count: value read back from Redis, may be ``None``.
    :param max_count: ``config_.RETRY_MAX_COUNT``.
    :return: ``True`` when the message should no longer be retried.
    """
    if retry_count is None:
        return False
    try:
        return int(retry_count) >= int(max_count)
    except (TypeError, ValueError):
        return retry_count == max_count


def _publish_title(mq_client, instance_id, video_id, title):
    """Publish the generated title to the ``asr_title_done`` topic.

    Publish failures are logged and swallowed, except ``TopicNotExist`` which
    calls ``sys.exit(1)`` (raising ``SystemExit`` in the calling thread).

    :param mq_client: the shared ``MQClient``.
    :param instance_id: RocketMQ instance id the topic belongs to.
    :param video_id: id of the processed video.
    :param title: generated title.
    """
    try:
        msg_content = {
            'title': title,
            'videoId': video_id
        }
        message_body = json.dumps(msg_content)
        producer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_done']['topic_name']
        producer = mq_client.get_producer(instance_id=instance_id, topic_name=producer_topic_name)
        # Deliberately not named ``msg``: the consumed message must stay
        # reachable so that its receipt handle can be acknowledged afterwards.
        done_msg = TopicMessage(message_body=message_body)
        re_msg = producer.publish_message(done_msg)
        log_.info({
            'publish': {
                'status': 'success',
                'messageID': re_msg.message_id,
                'bodyMD5': re_msg.message_body_md5,
                'messageContent': msg_content
            }
        })
    except MQExceptionBase as publish_e:
        log_.error({
            'errorType': 'publish',
            'status': 'fail',
            'videoId': video_id,
            'exception': publish_e,
            'traceback': traceback.format_exc()
        })
        if publish_e.type == "TopicNotExist":
            sys.exit(1)


def _handle_message(msg, mq_client, instance_id):
    """Process one consumed message.

    :param msg: message object returned by ``consumer.consume_message``.
    :param mq_client: the shared ``MQClient`` (used to publish the result).
    :param instance_id: RocketMQ instance id.
    :return: ``True`` if the message must be acknowledged, ``False`` if it
        should be left unacknowledged so the broker delivers it again.
    """
    receive_info = {
        'messageId': msg.message_id,
        'messageBodyMD5': msg.message_body_md5,
        'messageTag': msg.message_tag,
        'consumedTimes': msg.consumed_times,
        'publishTime': msg.publish_time,
        'body': msg.message_body,
        'nextConsumeTime': msg.next_consume_time,
        'receiptHandle': msg.receipt_handle,
        'properties': msg.properties
    }
    log_.info(receive_info)
    log_.info(f"debug: {receive_info}")

    try:
        payload = _parse_message_body(msg.message_body)
        video_id = payload['videoId']
        video_path = payload['videoPath']
    except (ValueError, SyntaxError, TypeError, KeyError):
        # A malformed message can never succeed; acknowledge it so it is not
        # redelivered forever (same policy as other non-retryable failures).
        log_.error({
            'errorType': 'messageBody',
            'messageId': msg.message_id,
            'traceback': traceback.format_exc()
        })
        return True

    try:
        ppp = {'videoId': video_id, 'videoPath': video_path}
        log_.info(f"debug: {ppp}")
        title = title_generate(video_id=video_id, video_path=video_path)
        log_.info({'titleGenerate': {'videoId': video_id, 'title': title}})
    except ConnectionResetError:
        # Treated as API rate limiting: count the attempt in Redis and only
        # acknowledge (give up) once the retry limit has been reached.
        key_name = f"{config_.TITLE_GENERATE_RETRY_KEY_NAME_PREFIX}{video_id}"
        redis_helper = RedisHelper()
        redis_helper.setnx_key(key_name=key_name, value=0, expire_time=_RETRY_KEY_EXPIRE_SECONDS)
        redis_helper.incr_key(key_name=key_name, amount=1, expire_time=_RETRY_KEY_EXPIRE_SECONDS)
        retry_count = redis_helper.get_data_from_redis(key_name=key_name)
        log_.error({'errorType': 'ConnectionResetError', 'videoId': video_id, 'retryCount': retry_count})
        return _retry_limit_reached(retry_count, config_.RETRY_MAX_COUNT)
    except Exception:
        # Any other failure is not retried: log it and acknowledge.
        log_.error({'videoId': video_id, 'traceback': traceback.format_exc()})
        return True

    # 1. Send the result to the "done" queue.
    _publish_title(mq_client, instance_id, video_id, title)
    # 2. Acknowledge the consumed message (also when publishing failed,
    #    as the original flow did).
    return True


def title_generate_main():
    """Consume ``asr_title_todo`` messages forever and generate titles.

    Each loop iteration long-polls the topic, handles every received message
    and acknowledges the ones that are finished. Messages left
    unacknowledged are delivered again by the broker after
    ``next_consume_time``. SDK errors are logged and followed by a 2 second
    pause, except ``MessageNotExist`` (empty topic), which polls again
    immediately.
    """
    log_.info("debug: title_generate_main")
    # Initialise the client.
    mq_client = MQClient(
        # HTTP endpoint of the RocketMQ instance.
        host=config_.MQ_CONFIG['ENDPOINT'],
        # AccessKey ID (Alibaba Cloud identity).
        access_id=config_.MQ_CONFIG['ACCESS_KEY'],
        # AccessKey Secret (Alibaba Cloud credential).
        access_key=config_.MQ_CONFIG['SECRET_KEY']
    )
    # Instance the topic belongs to. Must be the instance id when the
    # instance has a namespace, otherwise an empty string.
    instance_id = config_.MQ_CONFIG['INSTANCE_ID']
    # Topic to listen on.
    consumer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_todo']['topic_name']
    # Group ID created in the RocketMQ console.
    group_id = config_.MQ_TOPIC_CONFIG['asr_title_todo']['group_id']

    consumer = mq_client.get_consumer(instance_id, consumer_topic_name, group_id)
    # Long polling: with no message available the request is held by the
    # server for up to ``wait_seconds`` (3 here, maximum 30).
    wait_seconds = 3
    # Consume at most one message per request (maximum 16).
    batch = 1
    print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n"
           % (10 * "=", 10 * "=", consumer_topic_name, group_id, wait_seconds)))

    while True:
        receipt_handle_list = []
        try:
            # Long-poll for messages.
            recv_msgs = consumer.consume_message(batch, wait_seconds)
            for msg in recv_msgs:
                if _handle_message(msg, mq_client, instance_id):
                    receipt_handle_list.append(msg.receipt_handle)

            # A message that is not acknowledged before its next_consume_time
            # is consumed again. Receipt handles carry a timestamp, so the
            # same message yields a different handle on every delivery.
            if receipt_handle_list:
                consumer.ack_message(receipt_handle_list)
                log_.info({
                    'ackMessage': {
                        'status': 'success',
                        'receiptHandleList': receipt_handle_list
                    }
                })
        except MQExceptionBase as consume_e:
            log_.error({
                'errorType': 'consume',
                'status': 'fail',
                'exception': consume_e,
                'traceback': traceback.format_exc()
            })
            # No message available in the topic.
            if consume_e.type == "MessageNotExist":
                print(("No new message! RequestId: %s" % consume_e.req_id))
                continue
            # Some handles may have timed out, so their ack did not succeed.
            elif consume_e.sub_errors:
                for sub_error in consume_e.sub_errors:
                    print(("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" %
                           (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])))
            log_.error("Consume Message Fail! Exception:%s\n" % consume_e)
            time.sleep(2)
            continue


if __name__ == '__main__':
    # The consumer loop is started by FlaskApp.__init__, not called directly.
    app.run()