import random import os import logging import json import time import traceback import ast from gevent import monkey monkey.patch_all() from flask import Flask, request from mq_http_sdk.mq_exception import MQExceptionBase from mq_http_sdk.mq_producer import * from mq_http_sdk.mq_client import * from gpt_process import title_generate from log import Log from config import set_config from db_helper import RedisHelper from gevent.pywsgi import WSGIServer from multiprocessing import cpu_count, Process # from werkzeug.middleware.profiler import ProfilerMiddleware # from geventwebsocket.handler import WebSocketHandler app = Flask(__name__) log_ = Log() config_ = set_config() def title_generate_main(): # 初始化client mq_client = MQClient( # 设置HTTP协议客户端接入点 host=config_.MQ_CONFIG['ENDPOINT'], # AccessKey ID,阿里云身份验证标识 access_id=config_.MQ_CONFIG['ACCESS_KEY'], # AccessKey Secret,阿里云身份验证密钥 access_key=config_.MQ_CONFIG['SECRET_KEY'] ) # Topic所属的实例ID,在消息队列RocketMQ版控制台创建。 # 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入空字符串。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。 instance_id = config_.MQ_CONFIG['INSTANCE_ID'] # 监听消息所属的Topic consumer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_todo']['topic_name'] # 您在消息队列RocketMQ版控制台创建的Group ID。 group_id = config_.MQ_TOPIC_CONFIG['asr_title_todo']['group_id'] consumer = mq_client.get_consumer(instance_id, consumer_topic_name, group_id) # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。 # 长轮询时间3秒(最多可设置为30秒)。 wait_seconds = 3 # 一次最多消费1条(最多可设置为16条)。 batch = 1 print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \ % (10 * "=", 10 * "=", consumer_topic_name, group_id, wait_seconds))) while True: receipt_handle_list = [] try: # 长轮询消费消息。 recv_msgs = consumer.consume_message(batch, wait_seconds) for msg in recv_msgs: print(("Receive, MessageId: %s\nMessageBodyMD5: %s \ \nMessageTag: %s\nConsumedTimes: %s \ \nPublishTime: %s\nBody: %s \ \nNextConsumeTime: %s \ \nReceiptHandle: %s \ \nProperties: %s\n" % \ (msg.message_id, msg.message_body_md5, msg.message_tag, msg.consumed_times, msg.publish_time, msg.message_body, msg.next_consume_time, msg.receipt_handle, msg.properties))) video_id = msg.message_body['videoId'] video_path = msg.message_body['videoPath'] try: title = title_generate(video_id=video_id, video_path=video_path) except ConnectionResetError: # API限流 log_.info(video_id) # 记录重试次数 key_name = f"{config_.TITLE_GENERATE_RETRY_KEY_NAME_PREFIX}{video_id}" redis_helper = RedisHelper() redis_helper.setnx_key(key_name=key_name, value=0, expire_time=2*3600) redis_helper.incr_key(key_name=key_name, amount=1, expire_time=2*3600) # 判断已重试次数 retry_count = redis_helper.get_data_from_redis(key_name=key_name) if retry_count is not None and retry_count == config_.RETRY_MAX_COUNT: # 确认消息消费成功 receipt_handle_list.append(msg.receipt_handle) pass else: pass except Exception: # 确认消息消费成功 receipt_handle_list.append(msg.receipt_handle) log_.info(traceback.format_exc()) else: # 1. 发送结果至done消息队列 producer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_done']['topic_name'] producer = mq_client.get_producer(instance_id=instance_id, topic_name=producer_topic_name) ???? print(title) # 2. 确认消息消费成功 receipt_handle_list.append(msg.receipt_handle) except MQExceptionBase as e: # Topic中没有消息可消费。 if e.type == "MessageNotExist": print(("No new message! RequestId: %s" % e.req_id)) continue print(("Consume Message Fail! Exception:%s\n" % e)) time.sleep(2) continue # msg.next_consume_time前若不确认消息消费成功,则消息会被重复消费。 # 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。 try: consumer.ack_message(receipt_handle_list) print(("Ak %s Message Succeed.\n\n" % len(receipt_handle_list))) except MQExceptionBase as e: print(("\nAk Message Fail! Exception:%s" % e)) # 某些消息的句柄可能超时,会导致消息消费状态确认不成功。 if e.sub_errors: for sub_error in e.sub_errors: print(("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % \ (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"]))) if __name__ == '__main__': title_generate_main()