import random
import os
import logging
import json
import threading
import time
import traceback
import ast
from gevent import monkey
monkey.patch_all()

from flask import Flask, request
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
from gpt_process import title_generate
from log import Log
from config import set_config
from db_helper import RedisHelper
from gevent.pywsgi import WSGIServer
from multiprocessing import cpu_count, Process


log_ = Log()
config_ = set_config()


def title_generate_main0():
    # log_.info("debug: title_generate_main")
    # 初始化client
    mq_client = MQClient(
        # 设置HTTP协议客户端接入点
        host=config_.MQ_CONFIG['ENDPOINT'],
        # AccessKey ID,阿里云身份验证标识
        access_id=config_.MQ_CONFIG['ACCESS_KEY'],
        # AccessKey Secret,阿里云身份验证密钥
        access_key=config_.MQ_CONFIG['SECRET_KEY']
    )
    # Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
    # 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入空字符串。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
    instance_id = config_.MQ_CONFIG['INSTANCE_ID']
    # 监听消息所属的Topic
    consumer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_todo']['topic_name']
    # 您在消息队列RocketMQ版控制台创建的Group ID。
    group_id = config_.MQ_TOPIC_CONFIG['asr_title_todo']['group_id']
    consumer = mq_client.get_consumer(instance_id, consumer_topic_name, group_id)
    # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
    # 长轮询时间3秒(最多可设置为30秒)。
    wait_seconds = 3
    # 一次最多消费1条(最多可设置为16条)。
    batch = 1
    print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \
           % (10 * "=", 10 * "=", consumer_topic_name, group_id, wait_seconds)))
    while True:
        receipt_handle_list = []
        try:
            # 长轮询消费消息。
            start_time0 = time.time()
            recv_msgs = consumer.consume_message(batch, wait_seconds)
            for msg in recv_msgs:
                start_time = time.time()
                receive_info = {
                    'messageId': msg.message_id,
                    'messageBodyMD5': msg.message_body_md5,
                    'messageTag': msg.message_tag,
                    'consumedTimes': msg.consumed_times,
                    'publishTime': msg.publish_time,
                    'body': msg.message_body,
                    'nextConsumeTime': msg.next_consume_time,
                    'receiptHandle': msg.receipt_handle,
                    'properties': msg.properties
                }
                log_.info(receive_info)
                # log_.info(f"debug: {receive_info}")
                message_body = json.loads(msg.message_body)
                video_id = message_body['videoId']
                video_path = message_body['videoPath']
                try:
                    # ppp = {'videoId': video_id, 'videoPath': video_path}
                    # log_.info(f"debug: {ppp}")
                    title = title_generate(video_id=video_id, video_path=video_path)
                    if title[0] in ['"', "'"] and title[-1] in ['"', "'"]:
                        title = title[1:-1]
                    log_.info({'titleGenerate': {'videoId': video_id, 'title': title}})

                except ConnectionResetError:
                    # API限流
                    # 记录重试次数
                    key_name = f"{config_.TITLE_GENERATE_RETRY_KEY_NAME_PREFIX}{video_id}"
                    redis_helper = RedisHelper()
                    redis_helper.setnx_key(key_name=key_name, value=0, expire_time=2*3600)
                    redis_helper.incr_key(key_name=key_name, amount=1, expire_time=2*3600)
                    # 判断已重试次数
                    retry_count = redis_helper.get_data_from_redis(key_name=key_name)
                    log_.error({'errorType': 'ConnectionResetError', 'videoId': video_id, 'retryCount': retry_count})
                    if retry_count is not None and retry_count == config_.RETRY_MAX_COUNT:
                        # 添加消息handle到ack_message列表
                        receipt_handle_list.append(msg.receipt_handle)
                        pass

                except Exception:
                    # 添加消息handle到ack_message列表
                    receipt_handle_list.append(msg.receipt_handle)
                    log_.error({'videoId': video_id, 'traceback': traceback.format_exc()})
                else:
                    # 1. 发送结果至done消息队列
                    try:
                        msg_content = {
                            'title': title,
                            'videoId': video_id
                        }
                        message_body = json.dumps(msg_content)
                        producer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_done']['topic_name']
                        producer = mq_client.get_producer(instance_id=instance_id, topic_name=producer_topic_name)
                        produce_msg = TopicMessage(message_body=message_body)
                        # 设置消息的Key。
                        produce_msg.set_message_key(f"videoId{video_id}")
                        re_msg = producer.publish_message(produce_msg)
                        log_.info({
                            'publish': {
                                'status': 'success',
                                'messageID': re_msg.message_id,
                                'bodyMD5': re_msg.message_body_md5,
                                'messageContent': msg_content,
                                'executeTime': (time.time() - start_time) * 1000
                            }
                        })
                    except MQExceptionBase as publish_e:
                        log_.error({
                            'errorType': 'publish',
                            'status': 'fail',
                            'videoId': video_id,
                            'exception': publish_e,
                            'traceback': traceback.format_exc()
                        })
                        if publish_e.type == "TopicNotExist":
                            sys.exit(1)

                    # 2. 添加消息handle到ack_message列表
                    receipt_handle_list.append(msg.receipt_handle)

            # msg.next_consume_time前若不确认消息消费成功,则消息会被重复消费。
            # 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
            # log_.info(f"receipt_handle_list: {receipt_handle_list}")
            consumer.ack_message(receipt_handle_list)
            log_.info({
                'ackMessage': {
                    'status': 'success',
                    'receiptHandleList': receipt_handle_list,
                    'executeTime': (time.time() - start_time0) * 1000
                }
            })
        # except MQExceptionBase as consume_e:
        #     log_.error({
        #         'errorType': 'consume',
        #         'status': 'fail',
        #         'exception': consume_e,
        #         'traceback': traceback.format_exc()
        #     })
        #     # Topic中没有消息可消费。
        #     if consume_e.type == "MessageNotExist":
        #         print(("No new message! RequestId: %s" % consume_e.req_id))
        #         continue
        #     # 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
        #     elif consume_e.sub_errors:
        #         for sub_error in consume_e.sub_errors:
        #             print(("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % \
        #                    (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])))
        #
        #     log_.error("Consume Message Fail! Exception:%s\n" % consume_e)
        #     time.sleep(30)
        #     continue

        except Exception as consume_e:
            log_.error({
                'errorType': 'consume',
                'status': 'fail',
                'exception': consume_e,
                'traceback': traceback.format_exc()
            })
            # log_.error("Consume Message Fail! Exception:%s\n" % e)
            time.sleep(30)
            continue


def title_generate_main():
    # log_.info("debug: title_generate_main")
    # 初始化client
    mq_client = MQClient(
        # 设置HTTP协议客户端接入点
        host=config_.MQ_CONFIG['ENDPOINT'],
        # AccessKey ID,阿里云身份验证标识
        access_id=config_.MQ_CONFIG['ACCESS_KEY'],
        # AccessKey Secret,阿里云身份验证密钥
        access_key=config_.MQ_CONFIG['SECRET_KEY']
    )
    # Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
    # 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入空字符串。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
    instance_id = config_.MQ_CONFIG['INSTANCE_ID']
    # 监听消息所属的Topic
    consumer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_todo']['topic_name']
    # 您在消息队列RocketMQ版控制台创建的Group ID。
    group_id = config_.MQ_TOPIC_CONFIG['asr_title_todo']['group_id']
    consumer = mq_client.get_consumer(instance_id, consumer_topic_name, group_id)
    # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
    # 长轮询时间3秒(最多可设置为30秒)。
    wait_seconds = 3
    # 一次最多消费1条(最多可设置为16条)。
    batch = 1
    print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \
           % (10 * "=", 10 * "=", consumer_topic_name, group_id, wait_seconds)))
    while True:
        receipt_handle_list = []
        try:
            # 长轮询消费消息。
            start_time0 = time.time()
            recv_msgs = consumer.consume_message(batch, wait_seconds)
            for msg in recv_msgs:
                start_time = time.time()
                receive_info = {
                    'messageId': msg.message_id,
                    'messageBodyMD5': msg.message_body_md5,
                    'messageTag': msg.message_tag,
                    'consumedTimes': msg.consumed_times,
                    'publishTime': msg.publish_time,
                    'body': msg.message_body,
                    'nextConsumeTime': msg.next_consume_time,
                    'receiptHandle': msg.receipt_handle,
                    'properties': msg.properties
                }
                log_.info(receive_info)
                message_body = json.loads(msg.message_body)
                video_id = message_body['videoId']
                video_path = message_body['videoPath']

                # 1. 添加消息handle到ack_message列表
                receipt_handle_list.append(msg.receipt_handle)

                # msg.next_consume_time前若不确认消息消费成功,则消息会被重复消费。
                # 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
                consumer.ack_message(receipt_handle_list)
                log_.info({
                    'ackMessage': {
                        'status': 'success',
                        'receiptHandleList': receipt_handle_list,
                        'executeTime': (time.time() - start_time0) * 1000
                    }
                })

                # 2. 获取标题
                title, generate_filepath = title_generate(video_id=video_id, video_path=video_path)
                log_.info({'titleGenerateInitial': {'videoId': video_id, 'title': title}})
                # 删除相关文件
                for _, filepath in generate_filepath.items():
                    try:
                        os.remove(filepath)
                    except:
                        continue
                if title is None:
                    continue
                elif title[0] in ['"', "'", "′", "“"] and title[-1] in ['"', "'", "′", "”"]:
                    title = title[1:-1]
                log_.info({'titleGenerate': {'videoId': video_id, 'title': title}})

                # 3. 发送结果至done消息队列
                try:
                    msg_content = {
                        'title': title,
                        'videoId': video_id
                    }
                    message_body = json.dumps(msg_content)
                    producer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_done']['topic_name']
                    producer = mq_client.get_producer(instance_id=instance_id, topic_name=producer_topic_name)
                    produce_msg = TopicMessage(message_body=message_body)
                    # 设置消息的Key。
                    produce_msg.set_message_key(f"videoId-{video_id}")
                    re_msg = producer.publish_message(produce_msg)
                    log_.info({
                        'publish': {
                            'status': 'success',
                            'messageID': re_msg.message_id,
                            'bodyMD5': re_msg.message_body_md5,
                            'messageContent': msg_content,
                            'executeTime': (time.time() - start_time) * 1000
                        }
                    })
                except Exception as publish_e:
                    log_.error({
                        'errorType': 'publish',
                        'status': 'fail',
                        'videoId': video_id,
                        'exception': publish_e,
                        'traceback': traceback.format_exc()
                    })
                    # if publish_e.type == "TopicNotExist":
                    #     sys.exit(1)

        except Exception as consume_e:
            log_.error({
                'errorType': 'consume',
                'status': 'fail',
                'exception': consume_e,
                'traceback': traceback.format_exc()
            })
            # log_.error("Consume Message Fail! Exception:%s\n" % e)
            time.sleep(30)
            continue


class FlaskApp(Flask):
    def __init__(self, *args, **kwargs):
        super(FlaskApp, self).__init__(*args, **kwargs)
        self._activate_background_job()

    def _activate_background_job(self):
        # log_.info("debug: _activate_background_job")
        def run_job():
            # log_.info("debug: run_job")
            title_generate_main()

        t1 = threading.Thread(target=run_job)
        t1.start()


app = FlaskApp(__name__)


@app.route('/healthcheck')
def health_check():
    # log_.info('debug: ok!')
    return 'ok!'


if __name__ == '__main__':
    # title_generate_main()
    app.run()