123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- import random
- import os
- import logging
- import json
- import time
- import traceback
- import ast
- from gevent import monkey
- monkey.patch_all()
- from flask import Flask, request
- from mq_http_sdk.mq_exception import MQExceptionBase
- from mq_http_sdk.mq_producer import *
- from mq_http_sdk.mq_client import *
- from gpt_process import title_generate
- from log import Log
- from config import set_config
- from db_helper import RedisHelper
- from gevent.pywsgi import WSGIServer
- from multiprocessing import cpu_count, Process
# Module-level singletons created once at import time.
# Flask application object.
app = Flask(__name__)
# Project logger; the consumer loop writes diagnostics through log_.info().
log_ = Log()
# Project configuration object; MQ settings and retry limits are read from it.
config_ = set_config()
def _parse_message_body(raw_body):
    """Return the MQ message payload as a dict.

    The payload may already be a mapping, or it may arrive as text. Text is
    tried as JSON first and then as a Python literal (e.g. a ``str(dict)``
    produced with single quotes).

    Raises:
        ValueError / SyntaxError / TypeError: if the body cannot be decoded
            into a dict.
    """
    if isinstance(raw_body, dict):
        return raw_body
    if isinstance(raw_body, (bytes, bytearray)):
        raw_body = raw_body.decode('utf-8')
    try:
        payload = json.loads(raw_body)
    except ValueError:
        # Not valid JSON; fall back to a safe literal parse (never eval()).
        payload = ast.literal_eval(raw_body)
    if not isinstance(payload, dict):
        raise ValueError("message body is not a mapping: %r" % (raw_body,))
    return payload


def _retry_limit_reached(video_id):
    """Increment the Redis retry counter for *video_id*.

    Returns True once the counter has reached ``config_.RETRY_MAX_COUNT``,
    meaning the message should be acked instead of redelivered again.
    """
    key_name = f"{config_.TITLE_GENERATE_RETRY_KEY_NAME_PREFIX}{video_id}"
    redis_helper = RedisHelper()
    # Create the counter (2h TTL) if it does not exist yet, then bump it.
    redis_helper.setnx_key(key_name=key_name, value=0, expire_time=2*3600)
    redis_helper.incr_key(key_name=key_name, amount=1, expire_time=2*3600)

    retry_count = redis_helper.get_data_from_redis(key_name=key_name)
    if retry_count is None:
        return False
    try:
        # Redis values may come back as str/bytes, so compare as integers.
        # ">=" also covers the counter overshooting the limit.
        return int(retry_count) >= int(config_.RETRY_MAX_COUNT)
    except (TypeError, ValueError):
        # Unknown value type: keep the plain equality check as a fallback.
        return retry_count == config_.RETRY_MAX_COUNT


def _process_message(msg):
    """Generate a title for one consumed message.

    Returns:
        bool: True when the message should be acknowledged, False when it
        should be left un-acked so that MQ redelivers it.
    """
    try:
        payload = _parse_message_body(msg.message_body)
        video_id = payload['videoId']
        video_path = payload['videoPath']
    except Exception:
        # A malformed message can never succeed on redelivery: log and ack it,
        # the same way other non-retryable failures are handled below.
        log_.info(traceback.format_exc())
        return True

    try:
        title = title_generate(video_id=video_id, video_path=video_path)
    except ConnectionResetError:
        # Retryable failure: leave the message un-acked until the retry
        # budget tracked in Redis is used up.
        log_.info(video_id)
        return _retry_limit_reached(video_id)
    except Exception:
        # Non-retryable failure: log it and ack so the message is dropped.
        log_.info(traceback.format_exc())
        return True

    print(title)
    return True


def title_generate_main():
    """Consume the ``asr_title_todo`` topic forever and generate video titles.

    Each loop iteration long-polls MQ for up to ``batch`` messages, processes
    every message, and then acknowledges the ones that are finished (either
    successfully or with a non-retryable error). Messages that failed with a
    ``ConnectionResetError`` are left un-acked so MQ redelivers them, until
    the retry limit is reached. This function never returns.
    """
    mq_client = MQClient(
        host=config_.MQ_CONFIG['ENDPOINT'],
        access_id=config_.MQ_CONFIG['ACCESS_KEY'],
        access_key=config_.MQ_CONFIG['SECRET_KEY'],
    )

    instance_id = config_.MQ_CONFIG['INSTANCE_ID']
    todo_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_todo']['topic_name']
    group_id = config_.MQ_TOPIC_CONFIG['asr_title_todo']['group_id']
    consumer = mq_client.get_consumer(instance_id, todo_topic_name, group_id)

    # Long-polling wait time (seconds) and number of messages per poll.
    wait_seconds = 3
    batch = 1
    print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n"
           % (10 * "=", 10 * "=", todo_topic_name, group_id, wait_seconds)))

    while True:
        try:
            recv_msgs = consumer.consume_message(batch, wait_seconds)
        except MQExceptionBase as e:
            if e.type == "MessageNotExist":
                print(("No new message! RequestId: %s" % e.req_id))
                continue
            print(("Consume Message Fail! Exception:%s\n" % e))
            time.sleep(2)
            continue

        receipt_handle_list = []
        for msg in recv_msgs:
            print(("Receive, MessageId: %s\nMessageBodyMD5: %s "
                   "\nMessageTag: %s\nConsumedTimes: %s "
                   "\nPublishTime: %s\nBody: %s "
                   "\nNextConsumeTime: %s "
                   "\nReceiptHandle: %s "
                   "\nProperties: %s\n" %
                   (msg.message_id, msg.message_body_md5,
                    msg.message_tag, msg.consumed_times,
                    msg.publish_time, msg.message_body,
                    msg.next_consume_time, msg.receipt_handle, msg.properties)))
            if _process_message(msg):
                receipt_handle_list.append(msg.receipt_handle)

        if not receipt_handle_list:
            # Nothing to acknowledge (all messages are awaiting redelivery).
            continue

        try:
            consumer.ack_message(receipt_handle_list)
            print(("Ak %s Message Succeed.\n\n" % len(receipt_handle_list)))
        except MQExceptionBase as e:
            print(("\nAk Message Fail! Exception:%s" % e))
            # Not every MQ exception type necessarily carries sub_errors.
            for sub_error in getattr(e, 'sub_errors', None) or []:
                print(("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" %
                       (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])))
# Script entry point: start the consumer loop only when run directly,
# not when this module is imported.
if __name__ == "__main__":
    title_generate_main()
|