123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336 |
- import random
- import os
- import logging
- import json
- import threading
- import time
- import traceback
- import ast
- from gevent import monkey
- monkey.patch_all()
- from flask import Flask, request
- from mq_http_sdk.mq_exception import MQExceptionBase
- from mq_http_sdk.mq_producer import *
- from mq_http_sdk.mq_client import *
- from gpt_process import title_generate
- from log import Log
- from config import set_config
- from db_helper import RedisHelper
- from gevent.pywsgi import WSGIServer
- from multiprocessing import cpu_count, Process
# Module-level singletons shared by the consumer loops defined below:
# the project logger and the loaded configuration object.
log_ = Log()
config_ = set_config()
def title_generate_main0():
    """Legacy consume loop: generate a title per message, publish it, then ack.

    Polls the ``asr_title_todo`` topic (one message per poll, 3 s long-poll
    wait). For each message it calls ``title_generate``, strips one pair of
    wrapping ASCII quotes from the result and publishes
    ``{'title', 'videoId'}`` to the ``asr_title_done`` topic.

    Ack policy, as implemented here:
      * title generated (whether or not the publish succeeds) -> acked;
      * ``ConnectionResetError`` from title generation -> a per-video retry
        counter in Redis (2 h expiry) is incremented, and the message is
        acked only when the counter equals ``config_.RETRY_MAX_COUNT``;
        otherwise it is not acked;
      * any other exception from title generation -> logged and acked.

    Any exception escaping a poll cycle (including ones raised by
    ``consume_message`` or ``ack_message``) is logged and followed by a 30 s
    sleep. The loop never returns; it calls ``sys.exit(1)`` if publishing
    fails because the done topic does not exist.

    NOTE(review): this function treats the return value of ``title_generate``
    as a string, whereas ``title_generate_main`` unpacks a two-item result
    from the same call. Confirm which contract is current before reviving
    this loop.
    """
    # Imported explicitly: the module never imports ``sys`` itself, so the
    # ``sys.exit`` below previously depended on a wildcard import leaking it.
    import sys

    mq_client = MQClient(
        host=config_.MQ_CONFIG['ENDPOINT'],
        access_id=config_.MQ_CONFIG['ACCESS_KEY'],
        access_key=config_.MQ_CONFIG['SECRET_KEY'],
    )
    instance_id = config_.MQ_CONFIG['INSTANCE_ID']
    todo_topic = config_.MQ_TOPIC_CONFIG['asr_title_todo']
    consumer_topic_name = todo_topic['topic_name']
    group_id = todo_topic['group_id']
    consumer = mq_client.get_consumer(instance_id, consumer_topic_name, group_id)

    wait_seconds = 3  # long-poll wait passed to consume_message
    batch = 1  # messages requested per poll
    print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \
           % (10 * "=", 10 * "=", consumer_topic_name, group_id, wait_seconds)))

    while True:
        # Handles collected during this poll cycle; acked together afterwards.
        receipt_handle_list = []
        try:
            cycle_started = time.time()
            recv_msgs = consumer.consume_message(batch, wait_seconds)
            for msg in recv_msgs:
                msg_started = time.time()
                log_.info({
                    'messageId': msg.message_id,
                    'messageBodyMD5': msg.message_body_md5,
                    'messageTag': msg.message_tag,
                    'consumedTimes': msg.consumed_times,
                    'publishTime': msg.publish_time,
                    'body': msg.message_body,
                    'nextConsumeTime': msg.next_consume_time,
                    'receiptHandle': msg.receipt_handle,
                    'properties': msg.properties
                })

                payload = json.loads(msg.message_body)
                video_id = payload['videoId']
                video_path = payload['videoPath']
                try:
                    title = title_generate(video_id=video_id, video_path=video_path)
                    # Drop one pair of wrapping quotes, if present.
                    if title[0] in ['"', "'"] and title[-1] in ['"', "'"]:
                        title = title[1:-1]
                    log_.info({'titleGenerate': {'videoId': video_id, 'title': title}})
                except ConnectionResetError:
                    # Count this failure per video; the key expires after 2 hours.
                    key_name = f"{config_.TITLE_GENERATE_RETRY_KEY_NAME_PREFIX}{video_id}"
                    redis_helper = RedisHelper()
                    redis_helper.setnx_key(key_name=key_name, value=0, expire_time=2*3600)
                    redis_helper.incr_key(key_name=key_name, amount=1, expire_time=2*3600)

                    retry_count = redis_helper.get_data_from_redis(key_name=key_name)
                    log_.error({'errorType': 'ConnectionResetError', 'videoId': video_id, 'retryCount': retry_count})
                    # NOTE(review): strict equality against the configured
                    # maximum; confirm the type returned by RedisHelper
                    # matches the type of RETRY_MAX_COUNT.
                    if retry_count is not None and retry_count == config_.RETRY_MAX_COUNT:
                        # Retries used up: ack the message.
                        receipt_handle_list.append(msg.receipt_handle)
                except Exception:
                    # Any other failure is logged and the message is acked.
                    receipt_handle_list.append(msg.receipt_handle)
                    log_.error({'videoId': video_id, 'traceback': traceback.format_exc()})
                else:
                    # Title generated: publish it to the done topic.
                    try:
                        msg_content = {
                            'title': title,
                            'videoId': video_id
                        }
                        done_body = json.dumps(msg_content)
                        producer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_done']['topic_name']
                        producer = mq_client.get_producer(instance_id=instance_id, topic_name=producer_topic_name)
                        produce_msg = TopicMessage(message_body=done_body)
                        produce_msg.set_message_key(f"videoId{video_id}")
                        re_msg = producer.publish_message(produce_msg)
                        log_.info({
                            'publish': {
                                'status': 'success',
                                'messageID': re_msg.message_id,
                                'bodyMD5': re_msg.message_body_md5,
                                'messageContent': msg_content,
                                'executeTime': (time.time() - msg_started) * 1000
                            }
                        })
                    except MQExceptionBase as publish_e:
                        log_.error({
                            'errorType': 'publish',
                            'status': 'fail',
                            'videoId': video_id,
                            'exception': publish_e,
                            'traceback': traceback.format_exc()
                        })
                        if publish_e.type == "TopicNotExist":
                            sys.exit(1)
                    # Acked whether or not the publish succeeded.
                    receipt_handle_list.append(msg.receipt_handle)

            consumer.ack_message(receipt_handle_list)
            log_.info({
                'ackMessage': {
                    'status': 'success',
                    'receiptHandleList': receipt_handle_list,
                    'executeTime': (time.time() - cycle_started) * 1000
                }
            })
        except Exception as consume_e:
            log_.error({
                'errorType': 'consume',
                'status': 'fail',
                'exception': consume_e,
                'traceback': traceback.format_exc()
            })
            # Back off before polling again.
            time.sleep(30)
def title_generate_main():
    """Consume title requests forever: ack first, generate, then publish.

    Polls the ``asr_title_todo`` topic (one message per poll, 3 s long-poll
    wait). Each message is acked immediately after its body is parsed, i.e.
    *before* the title is generated, so a failure later in the cycle does not
    leave the message unacked.

    For each message:
      1. ``title_generate`` is called and is expected to return
         ``(title, generate_filepath)``; every path in the values of
         ``generate_filepath`` is deleted on a best-effort basis.
      2. If the title is ``None`` or empty, nothing is published.
      3. Otherwise one pair of wrapping quote characters is stripped and
         ``{'title', 'videoId'}`` is published to ``asr_title_done``.
         Publish failures are logged and do not stop the loop.

    Any other exception in a poll cycle is logged and followed by a 30 s
    sleep. The function never returns.
    """
    mq_client = MQClient(
        host=config_.MQ_CONFIG['ENDPOINT'],
        access_id=config_.MQ_CONFIG['ACCESS_KEY'],
        access_key=config_.MQ_CONFIG['SECRET_KEY'],
    )
    instance_id = config_.MQ_CONFIG['INSTANCE_ID']
    todo_topic = config_.MQ_TOPIC_CONFIG['asr_title_todo']
    consumer_topic_name = todo_topic['topic_name']
    group_id = todo_topic['group_id']
    consumer = mq_client.get_consumer(instance_id, consumer_topic_name, group_id)

    wait_seconds = 3  # long-poll wait passed to consume_message
    batch = 1  # messages requested per poll
    print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \
           % (10 * "=", 10 * "=", consumer_topic_name, group_id, wait_seconds)))

    # Characters accepted as an opening / closing wrapper around a title.
    opening_quotes = ('"', "'", "′", "“")
    closing_quotes = ('"', "'", "′", "”")

    while True:
        try:
            cycle_started = time.time()
            recv_msgs = consumer.consume_message(batch, wait_seconds)
            for msg in recv_msgs:
                msg_started = time.time()
                log_.info({
                    'messageId': msg.message_id,
                    'messageBodyMD5': msg.message_body_md5,
                    'messageTag': msg.message_tag,
                    'consumedTimes': msg.consumed_times,
                    'publishTime': msg.publish_time,
                    'body': msg.message_body,
                    'nextConsumeTime': msg.next_consume_time,
                    'receiptHandle': msg.receipt_handle,
                    'properties': msg.properties
                })
                payload = json.loads(msg.message_body)
                video_id = payload['videoId']
                video_path = payload['videoPath']

                # Ack before doing the work. Only this message's handle is
                # sent, so raising ``batch`` would not re-ack earlier handles.
                receipt_handle_list = [msg.receipt_handle]
                consumer.ack_message(receipt_handle_list)
                log_.info({
                    'ackMessage': {
                        'status': 'success',
                        'receiptHandleList': receipt_handle_list,
                        'executeTime': (time.time() - cycle_started) * 1000
                    }
                })

                title, generate_filepath = title_generate(video_id=video_id, video_path=video_path)
                log_.info({'titleGenerateInitial': {'videoId': video_id, 'title': title}})

                # Best-effort cleanup of the generated files. A failed delete
                # must not block publishing, but it is logged rather than
                # silently swallowed, and SystemExit/KeyboardInterrupt are no
                # longer caught here.
                for filepath in (generate_filepath or {}).values():
                    try:
                        os.remove(filepath)
                    except Exception:
                        log_.error({
                            'errorType': 'removeFile',
                            'videoId': video_id,
                            'filepath': filepath,
                            'traceback': traceback.format_exc()
                        })

                # None or "" means there is nothing to publish; checking for
                # empty also keeps the indexing below from raising IndexError.
                if not title:
                    continue
                if title[0] in opening_quotes and title[-1] in closing_quotes:
                    title = title[1:-1]
                log_.info({'titleGenerate': {'videoId': video_id, 'title': title}})

                try:
                    msg_content = {
                        'title': title,
                        'videoId': video_id
                    }
                    done_body = json.dumps(msg_content)
                    producer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_done']['topic_name']
                    producer = mq_client.get_producer(instance_id=instance_id, topic_name=producer_topic_name)
                    produce_msg = TopicMessage(message_body=done_body)
                    produce_msg.set_message_key(f"videoId-{video_id}")
                    re_msg = producer.publish_message(produce_msg)
                    log_.info({
                        'publish': {
                            'status': 'success',
                            'messageID': re_msg.message_id,
                            'bodyMD5': re_msg.message_body_md5,
                            'messageContent': msg_content,
                            'executeTime': (time.time() - msg_started) * 1000
                        }
                    })
                except Exception as publish_e:
                    log_.error({
                        'errorType': 'publish',
                        'status': 'fail',
                        'videoId': video_id,
                        'exception': publish_e,
                        'traceback': traceback.format_exc()
                    })
        except Exception as consume_e:
            log_.error({
                'errorType': 'consume',
                'status': 'fail',
                'exception': consume_e,
                'traceback': traceback.format_exc()
            })
            # Back off before polling again.
            time.sleep(30)
class FlaskApp(Flask):
    """Flask subclass that starts the title consumer when it is constructed."""

    def __init__(self, *args, **kwargs):
        """Initialise the Flask app, then launch the background consumer."""
        super().__init__(*args, **kwargs)
        self._activate_background_job()

    def _activate_background_job(self):
        """Run ``title_generate_main`` on a newly started thread."""
        consumer_thread = threading.Thread(target=lambda: title_generate_main())
        consumer_thread.start()
# Constructing the app also starts the background consumer thread
# (see FlaskApp.__init__).
app = FlaskApp(__name__)


@app.route('/healthcheck')
def health_check():
    """Health-check endpoint: always answers with a fixed body."""
    return 'ok!'


if __name__ == '__main__':
    # Run with Flask's own ``run()`` and its default arguments.
    app.run()
|