123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- import random
- import os
- import logging
- import json
- import time
- import traceback
- import ast
- from gevent import monkey
- monkey.patch_all()
- from flask import Flask, request
- from mq_http_sdk.mq_exception import MQExceptionBase
- from mq_http_sdk.mq_producer import *
- from mq_http_sdk.mq_client import *
- from gpt_process import title_generate
- from log import Log
- from config import set_config
- from db_helper import RedisHelper
- from gevent.pywsgi import WSGIServer
- from multiprocessing import cpu_count, Process
- # from werkzeug.middleware.profiler import ProfilerMiddleware
- # from geventwebsocket.handler import WebSocketHandler
- app = Flask(__name__)
- log_ = Log()
- config_ = set_config()
- def title_generate_main():
- # 初始化client
- mq_client = MQClient(
- # 设置HTTP协议客户端接入点
- host=config_.MQ_CONFIG['ENDPOINT'],
- # AccessKey ID,阿里云身份验证标识
- access_id=config_.MQ_CONFIG['ACCESS_KEY'],
- # AccessKey Secret,阿里云身份验证密钥
- access_key=config_.MQ_CONFIG['SECRET_KEY']
- )
- # Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
- # 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入空字符串。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
- instance_id = config_.MQ_CONFIG['INSTANCE_ID']
- # 监听消息所属的Topic
- todo_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_todo']['topic_name']
- # 您在消息队列RocketMQ版控制台创建的Group ID。
- group_id = config_.MQ_TOPIC_CONFIG['asr_title_todo']['group_id']
- consumer = mq_client.get_consumer(instance_id, todo_topic_name, group_id)
- # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
- # 长轮询时间3秒(最多可设置为30秒)。
- wait_seconds = 3
- # 一次最多消费1条(最多可设置为16条)。
- batch = 1
- print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \
- % (10 * "=", 10 * "=", todo_topic_name, group_id, wait_seconds)))
- while True:
- receipt_handle_list = []
- try:
- # 长轮询消费消息。
- recv_msgs = consumer.consume_message(batch, wait_seconds)
- for msg in recv_msgs:
- print(("Receive, MessageId: %s\nMessageBodyMD5: %s \
- \nMessageTag: %s\nConsumedTimes: %s \
- \nPublishTime: %s\nBody: %s \
- \nNextConsumeTime: %s \
- \nReceiptHandle: %s \
- \nProperties: %s\n" % \
- (msg.message_id, msg.message_body_md5,
- msg.message_tag, msg.consumed_times,
- msg.publish_time, msg.message_body,
- msg.next_consume_time, msg.receipt_handle, msg.properties)))
- video_id = msg.message_body['videoId']
- video_path = msg.message_body['videoPath']
- try:
- title = title_generate(video_id=video_id, video_path=video_path)
- except ConnectionResetError:
- # API限流
- log_.info(video_id)
- # 记录重试次数
- key_name = f"{config_.TITLE_GENERATE_RETRY_KEY_NAME_PREFIX}{video_id}"
- redis_helper = RedisHelper()
- redis_helper.setnx_key(key_name=key_name, value=0, expire_time=2*3600)
- redis_helper.incr_key(key_name=key_name, amount=1, expire_time=2*3600)
- # 判断已重试次数
- retry_count = redis_helper.get_data_from_redis(key_name=key_name)
- if retry_count is not None and retry_count == config_.RETRY_MAX_COUNT:
- # 确认消息消费成功
- receipt_handle_list.append(msg.receipt_handle)
- pass
- else:
- pass
- except Exception:
- # 确认消息消费成功
- receipt_handle_list.append(msg.receipt_handle)
- log_.info(traceback.format_exc())
- else:
- # 1. 发送结果至done消息队列
- print(title)
- # 2. 确认消息消费成功
- receipt_handle_list.append(msg.receipt_handle)
- except MQExceptionBase as e:
- # Topic中没有消息可消费。
- if e.type == "MessageNotExist":
- print(("No new message! RequestId: %s" % e.req_id))
- continue
- print(("Consume Message Fail! Exception:%s\n" % e))
- time.sleep(2)
- continue
- # msg.next_consume_time前若不确认消息消费成功,则消息会被重复消费。
- # 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
- try:
- consumer.ack_message(receipt_handle_list)
- print(("Ak %s Message Succeed.\n\n" % len(receipt_handle_list)))
- except MQExceptionBase as e:
- print(("\nAk Message Fail! Exception:%s" % e))
- # 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
- if e.sub_errors:
- for sub_error in e.sub_errors:
- print(("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % \
- (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])))
- if __name__ == '__main__':
- title_generate_main()
|