liqian vor 2 Jahren
Ursprung
Commit
b51daa0f45
5 geänderte Dateien mit 156 neuen und 71 gelöschten Zeilen
  1. 91 39
      app.py
  2. 32 0
      config.py
  3. 4 2
      gpt_process.py
  4. 1 0
      log.py
  5. 28 30
      log_conf.py

+ 91 - 39
app.py

@@ -2,6 +2,7 @@ import random
 import os
 import logging
 import json
+import threading
 import time
 import traceback
 import ast
@@ -18,15 +19,30 @@ from config import set_config
 from db_helper import RedisHelper
 from gevent.pywsgi import WSGIServer
 from multiprocessing import cpu_count, Process
-# from werkzeug.middleware.profiler import ProfilerMiddleware
-# from geventwebsocket.handler import WebSocketHandler
 
-app = Flask(__name__)
+
 log_ = Log()
 config_ = set_config()
 
 
+class FlaskApp(Flask):
+    def __init__(self, *args, **kwargs):
+        super(FlaskApp, self).__init__(*args, **kwargs)
+        self._activate_background_job()
+
+    def _activate_background_job(self):
+        def run_job():
+            title_generate_main()
+
+        t1 = threading.Thread(target=run_job)
+        t1.start()
+
+
 def title_generate_main():
+    log_.error({
+        'errorType': '1111',
+        'status': 'fail',
+    })
     # 初始化client
     mq_client = MQClient(
         # 设置HTTP协议客户端接入点
@@ -57,23 +73,26 @@ def title_generate_main():
             # 长轮询消费消息。
             recv_msgs = consumer.consume_message(batch, wait_seconds)
             for msg in recv_msgs:
-                print(("Receive, MessageId: %s\nMessageBodyMD5: %s \
-                                  \nMessageTag: %s\nConsumedTimes: %s \
-                                  \nPublishTime: %s\nBody: %s \
-                                  \nNextConsumeTime: %s \
-                                  \nReceiptHandle: %s \
-                                  \nProperties: %s\n" % \
-                       (msg.message_id, msg.message_body_md5,
-                        msg.message_tag, msg.consumed_times,
-                        msg.publish_time, msg.message_body,
-                        msg.next_consume_time, msg.receipt_handle, msg.properties)))
+                receive_info = {
+                    'messageId': msg.message_id,
+                    'messageBodyMD5': msg.message_body_md5,
+                    'messageTag': msg.message_tag,
+                    'consumedTimes': msg.consumed_times,
+                    'publishTime': msg.publish_time,
+                    'body': msg.message_body,
+                    'nextConsumeTime': msg.next_consume_time,
+                    'receiptHandle': msg.receipt_handle,
+                    'properties': msg.properties
+                }
+                log_.info(receive_info)
                 video_id = msg.message_body['videoId']
                 video_path = msg.message_body['videoPath']
                 try:
                     title = title_generate(video_id=video_id, video_path=video_path)
+                    log_.info({'titleGenerate': {'videoId': video_id, 'title': title}})
+
                 except ConnectionResetError:
                     # API限流
-                    log_.info(video_id)
                     # 记录重试次数
                     key_name = f"{config_.TITLE_GENERATE_RETRY_KEY_NAME_PREFIX}{video_id}"
                     redis_helper = RedisHelper()
@@ -81,49 +100,82 @@ def title_generate_main():
                     redis_helper.incr_key(key_name=key_name, amount=1, expire_time=2*3600)
                     # 判断已重试次数
                     retry_count = redis_helper.get_data_from_redis(key_name=key_name)
+                    log_.error({'errorType': 'ConnectionResetError', 'videoId': video_id, 'retryCount': retry_count})
                     if retry_count is not None and retry_count == config_.RETRY_MAX_COUNT:
                         # 确认消息消费成功
                         receipt_handle_list.append(msg.receipt_handle)
                         pass
-                    else:
-                        pass
 
                 except Exception:
                     # 确认消息消费成功
                     receipt_handle_list.append(msg.receipt_handle)
-                    log_.info(traceback.format_exc())
+                    log_.error({'videoId': video_id, 'traceback': traceback.format_exc()})
                 else:
                     # 1. 发送结果至done消息队列
-                    producer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_done']['topic_name']
-                    producer = mq_client.get_producer(instance_id=instance_id, topic_name=producer_topic_name)
-                    ????
-                    print(title)
+                    try:
+                        msg_content = {
+                            'title': title,
+                            'videoId': video_id
+                        }
+                        message_body = json.dumps(msg_content)
+                        producer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_done']['topic_name']
+                        producer = mq_client.get_producer(instance_id=instance_id, topic_name=producer_topic_name)
+                        msg = TopicMessage(message_body=message_body)
+                        re_msg = producer.publish_message(msg)
+                        log_.info({
+                            'publish': {
+                                'status': 'success',
+                                'messageID': re_msg.message_id,
+                                'bodyMD5': re_msg.message_body_md5,
+                                'messageContent': msg_content
+                            }
+                        })
+                    except MQExceptionBase as publish_e:
+                        log_.error({
+                            'errorType': 'publish',
+                            'status': 'fail',
+                            'videoId': video_id,
+                            'exception': publish_e,
+                            'traceback': traceback.format_exc()
+                        })
+                        if publish_e.type == "TopicNotExist":
+                            sys.exit(1)
+
                     # 2. 确认消息消费成功
                     receipt_handle_list.append(msg.receipt_handle)
 
-        except MQExceptionBase as e:
+            # msg.next_consume_time前若不确认消息消费成功,则消息会被重复消费。
+            # 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
+            consumer.ack_message(receipt_handle_list)
+            log_.info({
+                'ackMessage': {
+                    'status': 'success',
+                    'receiptHandleList': receipt_handle_list
+                }
+            })
+        except MQExceptionBase as consume_e:
+            log_.error({
+                'errorType': '1111',
+                'status': 'fail',
+                'exception': consume_e,
+                'traceback': traceback.format_exc()
+            })
             # Topic中没有消息可消费。
-            if e.type == "MessageNotExist":
-                print(("No new message! RequestId: %s" % e.req_id))
+            if consume_e.type == "MessageNotExist":
+                print(("No new message! RequestId: %s" % consume_e.req_id))
                 continue
-
-            print(("Consume Message Fail! Exception:%s\n" % e))
-            time.sleep(2)
-            continue
-
-        # msg.next_consume_time前若不确认消息消费成功,则消息会被重复消费。
-        # 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
-        try:
-            consumer.ack_message(receipt_handle_list)
-            print(("Ak %s Message Succeed.\n\n" % len(receipt_handle_list)))
-        except MQExceptionBase as e:
-            print(("\nAk Message Fail! Exception:%s" % e))
             # 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
-            if e.sub_errors:
-                for sub_error in e.sub_errors:
+            elif consume_e.sub_errors:
+                for sub_error in consume_e.sub_errors:
                     print(("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % \
                            (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])))
 
+            log_.error("Consume Message Fail! Exception:%s\n" % consume_e)
+            time.sleep(2)
+            continue
+
 
 if __name__ == '__main__':
-    title_generate_main()
+    # title_generate_main()
+    app = FlaskApp(__name__)
+    app.run()

+ 32 - 0
config.py

@@ -85,6 +85,14 @@ class DevelopmentConfig(BaseConfig):
     # 报警内容 环境区分
     ENV_TEXT = "开发环境"
 
+    # 日志服务配置
+    ALIYUN_LOG = {
+        'ENDPOINT': 'cn-hangzhou-intranet.log.aliyuncs.com',
+        'ACCESSID': 'LTAIWYUujJAm7CbH',
+        'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
+        'PROJECT': 'aigc-server-test',
+    }
+
     # redis地址
     REDIS_INFO = {
         'host': 'r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
@@ -111,6 +119,14 @@ class TestConfig(BaseConfig):
     # 报警内容 环境区分
     ENV_TEXT = "测试环境"
 
+    # 日志服务配置
+    ALIYUN_LOG = {
+        'ENDPOINT': 'cn-hangzhou-intranet.log.aliyuncs.com',
+        'ACCESSID': 'LTAIWYUujJAm7CbH',
+        'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
+        'PROJECT': 'aigc-server-test',
+    }
+
     # redis地址
     REDIS_INFO = {
         'host': 'r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com',
@@ -137,6 +153,14 @@ class PreProductionConfig(BaseConfig):
     # 报警内容 环境区分
     ENV_TEXT = "预发布环境"
 
+    # 日志服务配置
+    ALIYUN_LOG = {
+        'ENDPOINT': 'cn-hangzhou-intranet.log.aliyuncs.com',
+        'ACCESSID': 'LTAIWYUujJAm7CbH',
+        'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
+        'PROJECT': 'aigc-server',
+    }
+
     # redis地址
     REDIS_INFO = {
         'host': 'r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',
@@ -163,6 +187,14 @@ class ProductionConfig(BaseConfig):
     # 报警内容 环境区分
     ENV_TEXT = "生产环境"
 
+    # 日志服务配置
+    ALIYUN_LOG = {
+        'ENDPOINT': 'cn-hangzhou-intranet.log.aliyuncs.com',
+        'ACCESSID': 'LTAIWYUujJAm7CbH',
+        'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
+        'PROJECT': 'aigc-server',
+    }
+
     # redis地址
     REDIS_INFO = {
         'host': 'r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',

+ 4 - 2
gpt_process.py

@@ -27,7 +27,7 @@ def request_gpt(prompt):
             },
         ],
     }
-    raise ConnectionResetError
+    # raise ConnectionResetError
     response = requests.post(url=config_.GPT_HOST, headers=headers, json=json_data, proxies=proxies)
     # print(response.json())
     # print(response.json()['choices'][0]['message']['content'])
@@ -51,7 +51,9 @@ def title_generate(video_id, video_path):
     log_.info(f"audio_path = {audio_path}")
     # 3. asr
     dialogue_path, asr_res = call_asr(audio_path=audio_path)
-    log_.info(f"dialogue_path = {dialogue_path}, asr_res = {asr_res}")
+    log_.info({
+        'asrResult': {'videoId': video_id, 'asrRes': asr_res}
+    })
     # 4. gpt产出结果
     prompt = f"{config_.GPT_PROMPT['title']['prompt2']}{asr_res.strip()}"
     gpt_res = request_gpt(prompt=prompt)

+ 1 - 0
log.py

@@ -36,5 +36,6 @@ class Log(object):
         # return
 
     def error(self, message):
+        print(111)
         self.__console('error', message)
         # return

+ 28 - 30
log_conf.py

@@ -31,32 +31,32 @@ conf = {
             'level': 'DEBUG',
             'formatter': 'simpleFormatter',
         },
-        # 'slsHandler': {
-        #     '()': 'aliyun.log.QueuedLogHandler',
-        #     'level': 'INFO',
-        #     'formatter': 'rawFormatter',
-        #     # custom args:
-        #     'end_point': config_.ALIYUN_LOG.get('ENDPOINT', ''),
-        #     'access_key_id': config_.ALIYUN_LOG.get('ACCESSID', ''),
-        #     'access_key': config_.ALIYUN_LOG.get('ACCESSKEY', ''),
-        #     'project': config_.ALIYUN_LOG.get('PROJECT', ''),
-        #     'log_store': "info",
-        #     'extract_kv': True,
-        #     'extract_json': True
-        # },
-        # 'errorHandler': {
-        #     '()': 'aliyun.log.QueuedLogHandler',
-        #     'level': 'ERROR',
-        #     'formatter': 'rawFormatter',
-        #     # custom args:
-        #     'end_point': config_.ALIYUN_LOG.get('ENDPOINT', ''),
-        #     'access_key_id': config_.ALIYUN_LOG.get('ACCESSID', ''),
-        #     'access_key': config_.ALIYUN_LOG.get('ACCESSKEY', ''),
-        #     'project': config_.ALIYUN_LOG.get('PROJECT', ''),
-        #     'log_store': "error",
-        #     'extract_kv': True,
-        #     'extract_json': True
-        # },
+        'slsHandler': {
+            '()': 'aliyun.log.QueuedLogHandler',
+            'level': 'INFO',
+            'formatter': 'rawFormatter',
+            # custom args:
+            'end_point': config_.ALIYUN_LOG.get('ENDPOINT', ''),
+            'access_key_id': config_.ALIYUN_LOG.get('ACCESSID', ''),
+            'access_key': config_.ALIYUN_LOG.get('ACCESSKEY', ''),
+            'project': config_.ALIYUN_LOG.get('PROJECT', ''),
+            'log_store': "info",
+            'extract_kv': True,
+            'extract_json': True
+        },
+        'errorHandler': {
+            '()': 'aliyun.log.QueuedLogHandler',
+            'level': 'ERROR',
+            'formatter': 'rawFormatter',
+            # custom args:
+            'end_point': config_.ALIYUN_LOG.get('ENDPOINT', ''),
+            'access_key_id': config_.ALIYUN_LOG.get('ACCESSID', ''),
+            'access_key': config_.ALIYUN_LOG.get('ACCESSKEY', ''),
+            'project': config_.ALIYUN_LOG.get('PROJECT', ''),
+            'log_store': "error",
+            'extract_kv': True,
+            'extract_json': True
+        },
         'fileHandler': {
             '()': 'logging.FileHandler',
             'level': 'INFO',
@@ -72,14 +72,12 @@ conf = {
             'level': 'DEBUG'
         },
         'sls': {
-            # 'handlers': ['consoleHandler', 'slsHandler'],
-            'handlers': ['consoleHandler', 'fileHandler'],
+            'handlers': ['consoleHandler', 'slsHandler'],
             'level': 'INFO',
             'propagate': False
         },
         'error': {
-            # 'handlers': ['consoleHandler', 'errorHandler'],
-            'handlers': ['consoleHandler', 'fileHandler'],
+            'handlers': ['consoleHandler', 'errorHandler'],
             'level': 'ERROR',
             'propagate': False
         }