import random
import os
import logging
import json
import threading
import time
import traceback
import ast
from gevent import monkey
monkey.patch_all()

from flask import Flask, request
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
from gpt_process import title_generate
from log import Log
from config import set_config
from db_helper import RedisHelper
from gevent.pywsgi import WSGIServer
from multiprocessing import cpu_count, Process


log_ = Log()
config_ = set_config()


class FlaskApp(Flask):
    def __init__(self, *args, **kwargs):
        super(FlaskApp, self).__init__(*args, **kwargs)
        self._activate_background_job()

    def _activate_background_job(self):
        def run_job():
            title_generate_main()

        t1 = threading.Thread(target=run_job)
        t1.start()


def title_generate_main():
    # 初始化client
    mq_client = MQClient(
        # 设置HTTP协议客户端接入点
        host=config_.MQ_CONFIG['ENDPOINT'],
        # AccessKey ID,阿里云身份验证标识
        access_id=config_.MQ_CONFIG['ACCESS_KEY'],
        # AccessKey Secret,阿里云身份验证密钥
        access_key=config_.MQ_CONFIG['SECRET_KEY']
    )
    # Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
    # 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入空字符串。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
    instance_id = config_.MQ_CONFIG['INSTANCE_ID']
    # 监听消息所属的Topic
    consumer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_todo']['topic_name']
    # 您在消息队列RocketMQ版控制台创建的Group ID。
    group_id = config_.MQ_TOPIC_CONFIG['asr_title_todo']['group_id']
    consumer = mq_client.get_consumer(instance_id, consumer_topic_name, group_id)
    # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
    # 长轮询时间3秒(最多可设置为30秒)。
    wait_seconds = 3
    # 一次最多消费1条(最多可设置为16条)。
    batch = 1
    print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \
           % (10 * "=", 10 * "=", consumer_topic_name, group_id, wait_seconds)))
    while True:
        receipt_handle_list = []
        try:
            # 长轮询消费消息。
            recv_msgs = consumer.consume_message(batch, wait_seconds)
            for msg in recv_msgs:
                receive_info = {
                    'messageId': msg.message_id,
                    'messageBodyMD5': msg.message_body_md5,
                    'messageTag': msg.message_tag,
                    'consumedTimes': msg.consumed_times,
                    'publishTime': msg.publish_time,
                    'body': msg.message_body,
                    'nextConsumeTime': msg.next_consume_time,
                    'receiptHandle': msg.receipt_handle,
                    'properties': msg.properties
                }
                log_.info(receive_info)
                video_id = msg.message_body['videoId']
                video_path = msg.message_body['videoPath']
                try:
                    title = title_generate(video_id=video_id, video_path=video_path)
                    log_.info({'titleGenerate': {'videoId': video_id, 'title': title}})

                except ConnectionResetError:
                    # API限流
                    # 记录重试次数
                    key_name = f"{config_.TITLE_GENERATE_RETRY_KEY_NAME_PREFIX}{video_id}"
                    redis_helper = RedisHelper()
                    redis_helper.setnx_key(key_name=key_name, value=0, expire_time=2*3600)
                    redis_helper.incr_key(key_name=key_name, amount=1, expire_time=2*3600)
                    # 判断已重试次数
                    retry_count = redis_helper.get_data_from_redis(key_name=key_name)
                    log_.error({'errorType': 'ConnectionResetError', 'videoId': video_id, 'retryCount': retry_count})
                    if retry_count is not None and retry_count == config_.RETRY_MAX_COUNT:
                        # 确认消息消费成功
                        receipt_handle_list.append(msg.receipt_handle)
                        pass

                except Exception:
                    # 确认消息消费成功
                    receipt_handle_list.append(msg.receipt_handle)
                    log_.error({'videoId': video_id, 'traceback': traceback.format_exc()})
                else:
                    # 1. 发送结果至done消息队列
                    try:
                        msg_content = {
                            'title': title,
                            'videoId': video_id
                        }
                        message_body = json.dumps(msg_content)
                        producer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_done']['topic_name']
                        producer = mq_client.get_producer(instance_id=instance_id, topic_name=producer_topic_name)
                        msg = TopicMessage(message_body=message_body)
                        re_msg = producer.publish_message(msg)
                        log_.info({
                            'publish': {
                                'status': 'success',
                                'messageID': re_msg.message_id,
                                'bodyMD5': re_msg.message_body_md5,
                                'messageContent': msg_content
                            }
                        })
                    except MQExceptionBase as publish_e:
                        log_.error({
                            'errorType': 'publish',
                            'status': 'fail',
                            'videoId': video_id,
                            'exception': publish_e,
                            'traceback': traceback.format_exc()
                        })
                        if publish_e.type == "TopicNotExist":
                            sys.exit(1)

                    # 2. 确认消息消费成功
                    receipt_handle_list.append(msg.receipt_handle)

            # msg.next_consume_time前若不确认消息消费成功,则消息会被重复消费。
            # 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
            consumer.ack_message(receipt_handle_list)
            log_.info({
                'ackMessage': {
                    'status': 'success',
                    'receiptHandleList': receipt_handle_list
                }
            })
        except MQExceptionBase as consume_e:
            log_.error({
                'errorType': 'consume',
                'status': 'fail',
                'exception': consume_e,
                'traceback': traceback.format_exc()
            })
            # Topic中没有消息可消费。
            if consume_e.type == "MessageNotExist":
                print(("No new message! RequestId: %s" % consume_e.req_id))
                continue
            # 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
            elif consume_e.sub_errors:
                for sub_error in consume_e.sub_errors:
                    print(("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % \
                           (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])))

            log_.error("Consume Message Fail! Exception:%s\n" % consume_e)
            time.sleep(2)
            continue


if __name__ == '__main__':
    # title_generate_main()
    app = FlaskApp(__name__)
    app.run()