123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
- import random
- import os
- import logging
- import json
- import threading
- import time
- import traceback
- import ast
- from gevent import monkey
- monkey.patch_all()
- from flask import Flask, request
- from mq_http_sdk.mq_exception import MQExceptionBase
- from mq_http_sdk.mq_producer import *
- from mq_http_sdk.mq_client import *
- from gpt_process import title_generate
- from log import Log
- from config import set_config
- from db_helper import RedisHelper
- from gevent.pywsgi import WSGIServer
- from multiprocessing import cpu_count, Process
- log_ = Log()
- config_ = set_config()
def title_generate_main0():
    """Legacy consumer loop: generate a title per task, then ack the batch.

    Long-polls the ``asr_title_todo`` topic, calls ``title_generate`` for each
    received message, publishes ``{'title', 'videoId'}`` to the
    ``asr_title_done`` topic and finally acknowledges the receipt handles
    collected during the poll.

    A message's handle is queued for acknowledgement when:

    * title generation succeeded (whether or not publishing succeeded);
    * title generation raised any exception other than
      ``ConnectionResetError``;
    * title generation raised ``ConnectionResetError`` and the retry counter
      kept in Redis equals ``config_.RETRY_MAX_COUNT``.

    The function never returns normally.  ``sys.exit(1)`` is called when
    publishing fails with the MQ error type ``"TopicNotExist"``.

    NOTE(review): this variant uses the return value of ``title_generate`` as
    a plain string, whereas ``title_generate_main`` unpacks it as a
    ``(title, files)`` pair -- confirm which contract is current before
    re-enabling this function.
    """
    # Function-scope import: the TopicNotExist branch below calls sys.exit,
    # and the module does not import `sys` explicitly.
    import sys

    # Initialise the MQ client.
    mq_client = MQClient(
        # HTTP endpoint of the MQ service.
        host=config_.MQ_CONFIG['ENDPOINT'],
        # AccessKey ID used for authentication.
        access_id=config_.MQ_CONFIG['ACCESS_KEY'],
        # AccessKey Secret used for authentication.
        access_key=config_.MQ_CONFIG['SECRET_KEY']
    )
    # ID of the instance that owns the topic (empty string when the instance
    # has no namespace, per the original SDK comment).
    instance_id = config_.MQ_CONFIG['INSTANCE_ID']
    # Topic to consume from.
    consumer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_todo']['topic_name']
    # Consumer group ID.
    group_id = config_.MQ_TOPIC_CONFIG['asr_title_todo']['group_id']
    consumer = mq_client.get_consumer(instance_id, consumer_topic_name, group_id)
    # Long-poll duration in seconds (the SDK comment gives 30 as the maximum).
    wait_seconds = 3
    # Maximum messages per poll (the SDK comment gives 16 as the maximum).
    batch = 1
    print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \
           % (10 * "=", 10 * "=", consumer_topic_name, group_id, wait_seconds)))

    while True:
        receipt_handle_list = []
        try:
            # Long-poll for messages.
            start_time0 = time.time()
            recv_msgs = consumer.consume_message(batch, wait_seconds)
            for msg in recv_msgs:
                start_time = time.time()
                receive_info = {
                    'messageId': msg.message_id,
                    'messageBodyMD5': msg.message_body_md5,
                    'messageTag': msg.message_tag,
                    'consumedTimes': msg.consumed_times,
                    'publishTime': msg.publish_time,
                    'body': msg.message_body,
                    'nextConsumeTime': msg.next_consume_time,
                    'receiptHandle': msg.receipt_handle,
                    'properties': msg.properties
                }
                log_.info(receive_info)
                message_body = json.loads(msg.message_body)
                video_id = message_body['videoId']
                video_path = message_body['videoPath']
                try:
                    title = title_generate(video_id=video_id, video_path=video_path)
                    # Drop one pair of wrapping quotes, if present.
                    if title[0] in ['"', "'"] and title[-1] in ['"', "'"]:
                        title = title[1:-1]
                    log_.info({'titleGenerate': {'videoId': video_id, 'title': title}})
                except ConnectionResetError:
                    # Treated as API rate limiting: count the attempt in Redis
                    # (counter expires after two hours).
                    key_name = f"{config_.TITLE_GENERATE_RETRY_KEY_NAME_PREFIX}{video_id}"
                    redis_helper = RedisHelper()
                    redis_helper.setnx_key(key_name=key_name, value=0, expire_time=2*3600)
                    redis_helper.incr_key(key_name=key_name, amount=1, expire_time=2*3600)
                    retry_count = redis_helper.get_data_from_redis(key_name=key_name)
                    log_.error({'errorType': 'ConnectionResetError', 'videoId': video_id, 'retryCount': retry_count})
                    # Give up (ack the message) only once the retry budget is
                    # used; otherwise leave it un-acked so it is delivered again.
                    if retry_count is not None and retry_count == config_.RETRY_MAX_COUNT:
                        receipt_handle_list.append(msg.receipt_handle)
                except Exception:
                    # Any other failure: ack the message so it is not retried.
                    receipt_handle_list.append(msg.receipt_handle)
                    log_.error({'videoId': video_id, 'traceback': traceback.format_exc()})
                else:
                    # 1. Publish the result to the "done" topic.
                    try:
                        msg_content = {
                            'title': title,
                            'videoId': video_id
                        }
                        message_body = json.dumps(msg_content)
                        producer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_done']['topic_name']
                        producer = mq_client.get_producer(instance_id=instance_id, topic_name=producer_topic_name)
                        produce_msg = TopicMessage(message_body=message_body)
                        # Set the message key.
                        produce_msg.set_message_key(f"videoId{video_id}")
                        re_msg = producer.publish_message(produce_msg)
                        log_.info({
                            'publish': {
                                'status': 'success',
                                'messageID': re_msg.message_id,
                                'bodyMD5': re_msg.message_body_md5,
                                'messageContent': msg_content,
                                'executeTime': (time.time() - start_time) * 1000
                            }
                        })
                    except MQExceptionBase as publish_e:
                        log_.error({
                            'errorType': 'publish',
                            'status': 'fail',
                            'videoId': video_id,
                            'exception': publish_e,
                            'traceback': traceback.format_exc()
                        })
                        if publish_e.type == "TopicNotExist":
                            sys.exit(1)
                    # 2. Queue the handle for acknowledgement.
                    receipt_handle_list.append(msg.receipt_handle)

            # Per the original SDK comments: a message that is not acked
            # before msg.next_consume_time is consumed again, and the receipt
            # handle differs on every delivery of the same message.
            consumer.ack_message(receipt_handle_list)
            log_.info({
                'ackMessage': {
                    'status': 'success',
                    'receiptHandleList': receipt_handle_list,
                    'executeTime': (time.time() - start_time0) * 1000
                }
            })
        except Exception as consume_e:
            # NOTE: an earlier revision handled MQExceptionBase separately
            # (skipping the pause for "MessageNotExist" and printing
            # sub_errors); now every failure is logged and followed by a
            # 30-second pause before the next poll.
            log_.error({
                'errorType': 'consume',
                'status': 'fail',
                'exception': consume_e,
                'traceback': traceback.format_exc()
            })
            time.sleep(30)
def title_generate_main():
    """Consume title-generation tasks, acking each one before processing it.

    Long-polls the ``asr_title_todo`` topic.  For every received message the
    function:

    1. acknowledges the message immediately, so a later failure does not
       cause it to be delivered again;
    2. calls ``title_generate`` (expected here to return a
       ``(title, generated_files)`` pair), removes the generated files on a
       best-effort basis and strips one pair of wrapping quotes from the
       title;
    3. publishes ``{'title', 'videoId'}`` to the ``asr_title_done`` topic.

    Messages whose title is ``None`` or empty are skipped after clean-up.
    Publish failures are logged and do not interrupt the loop.  Any other
    failure is logged and followed by a 30-second pause.  The function never
    returns.
    """
    # Initialise the MQ client.
    mq_client = MQClient(
        # HTTP endpoint of the MQ service.
        host=config_.MQ_CONFIG['ENDPOINT'],
        # AccessKey ID used for authentication.
        access_id=config_.MQ_CONFIG['ACCESS_KEY'],
        # AccessKey Secret used for authentication.
        access_key=config_.MQ_CONFIG['SECRET_KEY']
    )
    # ID of the instance that owns the topic (empty string when the instance
    # has no namespace, per the original SDK comment).
    instance_id = config_.MQ_CONFIG['INSTANCE_ID']
    # Topic to consume from.
    consumer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_todo']['topic_name']
    # Consumer group ID.
    group_id = config_.MQ_TOPIC_CONFIG['asr_title_todo']['group_id']
    consumer = mq_client.get_consumer(instance_id, consumer_topic_name, group_id)
    # Long-poll duration in seconds (the SDK comment gives 30 as the maximum).
    wait_seconds = 3
    # Maximum messages per poll (the SDK comment gives 16 as the maximum).
    batch = 1
    print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \
           % (10 * "=", 10 * "=", consumer_topic_name, group_id, wait_seconds)))

    # Characters accepted as an opening / closing wrapper around a title.
    opening_quotes = ('"', "'", "′", "“")
    closing_quotes = ('"', "'", "′", "”")

    while True:
        try:
            # Long-poll for messages.
            start_time0 = time.time()
            recv_msgs = consumer.consume_message(batch, wait_seconds)
            for msg in recv_msgs:
                start_time = time.time()
                receive_info = {
                    'messageId': msg.message_id,
                    'messageBodyMD5': msg.message_body_md5,
                    'messageTag': msg.message_tag,
                    'consumedTimes': msg.consumed_times,
                    'publishTime': msg.publish_time,
                    'body': msg.message_body,
                    'nextConsumeTime': msg.next_consume_time,
                    'receiptHandle': msg.receipt_handle,
                    'properties': msg.properties
                }
                log_.info(receive_info)
                message_body = json.loads(msg.message_body)
                video_id = message_body['videoId']
                video_path = message_body['videoPath']

                # 1. Acknowledge this message only.  A fresh list per message
                #    keeps an already-acked handle from being sent again if
                #    `batch` is ever raised above 1.
                receipt_handle_list = [msg.receipt_handle]
                consumer.ack_message(receipt_handle_list)
                log_.info({
                    'ackMessage': {
                        'status': 'success',
                        'receiptHandleList': receipt_handle_list,
                        'executeTime': (time.time() - start_time0) * 1000
                    }
                })

                # 2. Generate the title.
                title, generate_filepath = title_generate(video_id=video_id, video_path=video_path)
                log_.info({'titleGenerateInitial': {'videoId': video_id, 'title': title}})

                # Best-effort removal of the files produced during generation.
                # `Exception` (not a bare except) so that SystemExit,
                # KeyboardInterrupt and greenlet shutdown still propagate.
                for filepath in generate_filepath.values():
                    try:
                        os.remove(filepath)
                    except Exception:
                        continue

                # Nothing to publish for a missing or empty title; the empty
                # check also protects the indexing below.
                if not title:
                    continue
                if title[0] in opening_quotes and title[-1] in closing_quotes:
                    title = title[1:-1]
                log_.info({'titleGenerate': {'videoId': video_id, 'title': title}})

                # 3. Publish the result to the "done" topic.
                try:
                    msg_content = {
                        'title': title,
                        'videoId': video_id
                    }
                    message_body = json.dumps(msg_content)
                    producer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_done']['topic_name']
                    producer = mq_client.get_producer(instance_id=instance_id, topic_name=producer_topic_name)
                    produce_msg = TopicMessage(message_body=message_body)
                    # Set the message key.
                    produce_msg.set_message_key(f"videoId-{video_id}")
                    re_msg = producer.publish_message(produce_msg)
                    log_.info({
                        'publish': {
                            'status': 'success',
                            'messageID': re_msg.message_id,
                            'bodyMD5': re_msg.message_body_md5,
                            'messageContent': msg_content,
                            'executeTime': (time.time() - start_time) * 1000
                        }
                    })
                except Exception as publish_e:
                    log_.error({
                        'errorType': 'publish',
                        'status': 'fail',
                        'videoId': video_id,
                        'exception': publish_e,
                        'traceback': traceback.format_exc()
                    })
        except Exception as consume_e:
            log_.error({
                'errorType': 'consume',
                'status': 'fail',
                'exception': consume_e,
                'traceback': traceback.format_exc()
            })
            # Pause before polling again after any failure.
            time.sleep(30)
class FlaskApp(Flask):
    """Flask application that launches the title consumer when constructed."""

    def __init__(self, *args, **kwargs):
        """Build the Flask app, then start the background consumer thread."""
        super().__init__(*args, **kwargs)
        self._activate_background_job()

    def _activate_background_job(self):
        """Run ``title_generate_main`` on a separate, non-daemon thread."""
        worker = threading.Thread(target=lambda: title_generate_main())
        worker.start()
# Creating the application also starts the background consumer thread.
app = FlaskApp(__name__)


@app.route('/healthcheck')
def health_check():
    """Health-check endpoint: always responds with the text ``ok!``."""
    return 'ok!'
if __name__ == '__main__':
    # Script entry point: serve the Flask app.  The consumer loop is already
    # running on the thread started when `app` was constructed, so it is not
    # invoked directly here.
    app.run()
|