import pandas as pd
from odps import ODPS
from datetime import datetime, timedelta
from threading import Timer
from utils import get_data_from_odps, filter_video_status
from db_helper import RedisHelper
from config import set_config
from log import Log

config_, _ = set_config()
log_ = Log()

features = [
    'videoid',
    'preview人数',  # 过去1天预曝光人数
    'view人数',  # 过去1天曝光人数
    'play人数',  # 过去1天播放人数
    'share人数',  # 过去1天分享人数
    '回流人数',  # 过去1天分享,过去1天回流人数
    'preview次数',  # 过去1天预曝光次数
    'view次数',  # 过去1天曝光次数
    'play次数',  # 过去1天播放次数
    'share次数',  # 过去1天分享次数
    'platform_return',
    'platform_preview',
    'platform_preview_total',
    'platform_show',
    'platform_show_total',
    'platform_view',
    'platform_view_total',
]


def get_rov_redis_key(now_date):
    # 获取rov模型结果存放key
    redis_helper = RedisHelper()
    now_dt = datetime.strftime(now_date, '%Y%m%d')
    key_name = f'{config_.RECALL_KEY_NAME_PREFIX}{now_dt}'
    key_dt = now_dt
    if not redis_helper.key_exists(key_name=key_name):
        pre_dt = datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')
        key_name = f'{config_.RECALL_KEY_NAME_PREFIX}{pre_dt}'
        key_dt = pre_dt
    return key_name, key_dt


def day_data_check(project, table, now_date):
    """检查数据是否准备好"""
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )

    try:
        dt = datetime.strftime((now_date - timedelta(days=1)), '%Y%m%d')
        sql = f'select * from {project}.{table} where dt = {dt}'
        with odps.execute_sql(sql=sql).open_reader() as reader:
            data_count = reader.count
    except Exception as e:
        data_count = 0
    return data_count


def get_feature_data(now_date, project, table):
    """获取特征数据"""
    dt = datetime.strftime((now_date - timedelta(days=1)), '%Y%m%d')
    # dt = '20220425'
    records = get_data_from_odps(date=dt, project=project, table=table)
    feature_data = []
    for record in records:
        item = {}
        for feature_name in features:
            item[feature_name] = record[feature_name]
        feature_data.append(item)
    feature_df = pd.DataFrame(feature_data)
    return feature_df


def cal_score1(df):
    # score1计算公式: score = 回流人数/(view人数+10000)
    df = df.fillna(0)
    df['score'] = df['回流人数'] / (df['view人数'] + 1000)
    df = df.sort_values(by=['score'], ascending=False)
    return df


def cal_score2(df):
    # score2计算公式: score = share次数/(view+1000)+0.01*return/(share次数+100)
    df = df.fillna(0)
    df['share_rate'] = df['share次数'] / (df['view人数'] + 1000)
    df['back_rate'] = df['回流人数'] / (df['share次数'] + 100)
    df['score'] = df['share_rate'] + 0.01 * df['back_rate']
    df['platform_return_rate'] = df['platform_return'] / df['回流人数']
    df = df.sort_values(by=['score'], ascending=False)
    return df


def video_rank_day(df, now_date, rule_key, param):
    """
    获取符合进入召回源条件的视频,与每日更新的rov模型结果视频列表进行合并
    :param df:
    :param now_date:
    :param rule_key: 天级规则数据进入条件
    :param param: 天级规则数据进入条件参数
    :return:
    """
    # 获取rov模型结果
    redis_helper = RedisHelper()
    key_name, key_dt = get_rov_redis_key(now_date=now_date)
    initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
    log_.info(f'initial data count = {len(initial_data)}, key_dt = {key_dt}')

    # 获取符合进入召回源条件的视频
    return_count = param.get('return_count')
    if return_count:
        day_recall_df = df[df['回流人数'] > return_count]
    else:
        day_recall_df = df
    platform_return_rate = param.get('platform_return_rate', 0)
    day_recall_df = day_recall_df[day_recall_df['platform_return_rate'] >= platform_return_rate]

    # videoid重复时,保留分值高
    day_recall_df = day_recall_df.sort_values(by=['score'], ascending=False)
    day_recall_df = day_recall_df.drop_duplicates(subset=['videoid'], keep='first')
    day_recall_df['videoid'] = day_recall_df['videoid'].astype(int)
    day_recall_videos = day_recall_df['videoid'].to_list()
    log_.info(f'day_recall videos count = {len(day_recall_videos)}')

    # 视频状态过滤
    filtered_videos = filter_video_status(day_recall_videos)
    log_.info('filtered_videos count = {}'.format(len(filtered_videos)))

    # 写入对应的redis
    day_video_ids =[]
    day_recall_result = {}
    for video_id in filtered_videos:
        score = day_recall_df[day_recall_df['videoid'] == video_id]['score']
        day_recall_result[int(video_id)] = float(score)
        day_video_ids.append(int(video_id))
    day_recall_key_name = \
        f"{config_.RECALL_KEY_NAME_PREFIX_BY_DAY}{rule_key}.{datetime.strftime(now_date, '%Y%m%d')}"
    if len(day_recall_result) > 0:
        redis_helper.add_data_with_zset(key_name=day_recall_key_name, data=day_recall_result, expire_time=7 * 24 * 3600)

    # 去重更新rov模型结果,并另存为redis中
    initial_data_dup = {}
    for video_id, score in initial_data:
        if int(video_id) not in day_video_ids:
            initial_data_dup[int(video_id)] = score
    log_.info(f"initial data dup count = {len(initial_data_dup)}")

    now_dt = datetime.strftime(now_date, '%Y%m%d')
    if key_dt == now_dt:
        initial_key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_NOW
    else:
        initial_key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_PRE
    initial_key_name = f"{initial_key_name_prefix}{rule_key}.{now_dt}"
    if len(initial_data_dup) > 0:
        redis_helper.add_data_with_zset(key_name=initial_key_name, data=initial_data_dup, expire_time=7 * 24 * 3600)


def rank_by_day(now_date, rule_params, project, table):
    # 获取特征数据
    feature_df = get_feature_data(now_date=now_date, project=project, table=table)
    # rank
    for key, value in rule_params.items():
        log_.info(f"rule = {key}, param = {value}")
        # 计算score
        cal_score_func = value.get('cal_score_func', 1)
        if cal_score_func == 2:
            score_df = cal_score2(df=feature_df)
        else:
            score_df = cal_score1(df=feature_df)
        video_rank_day(df=score_df, now_date=now_date, rule_key=key, param=value)
        # to-csv
        score_filename = f"score_{key}_{datetime.strftime(now_date, '%Y%m%d')}.csv"
        score_df.to_csv(f'./data/{score_filename}')
        # to-logs
        log_.info({"date": datetime.strftime(now_date, '%Y%m%d%H'),
                   "redis_key_prefix": config_.RECALL_KEY_NAME_PREFIX_BY_DAY,
                   "rule_key": key,
                   "score_df": score_df[['videoid', 'score']]})


def day_rank_bottom(now_date, rule_key):
    """未按时更新数据,用模型召回数据作为当前的数据"""
    log_.info(f"rule_key = {rule_key}")
    now_dt = datetime.strftime(now_date, '%Y%m%d')
    redis_helper = RedisHelper()
    key_name, key_dt = get_rov_redis_key(now_date=now_date)
    initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
    final_data = dict()
    for video_id, score in initial_data:
        final_data[video_id] = score
    if key_dt == now_dt:
        key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_NOW
    else:
        key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_PRE
    key_name = f"{key_name_prefix}{rule_key}.{now_dt}"
    if len(final_data) > 0:
        redis_helper.add_data_with_zset(key_name=key_name, data=final_data, expire_time=7 * 24 * 3600)


def day_timer_check():
    project = config_.PROJECT_DAY
    table = config_.TABLE_DAY
    rule_params = config_.RULE_PARAMS_DAY
    now_date = datetime.today()
    log_.info(f"now_date: {datetime.strftime(now_date, '%Y%m%d')}")
    now_min = datetime.now().minute
    # 查看当前天级更新的数据是否已准备好
    h_data_count = day_data_check(project=project, table=table, now_date=now_date)
    if h_data_count > 0:
        log_.info(f'h_data_count = {h_data_count}')
        # 数据准备好,进行更新
        rank_by_day(now_date=now_date, rule_params=rule_params, project=project, table=table)
    elif now_min > 50:
        log_.info('day_recall data is None!')
        for key, _ in rule_params.items():
            day_rank_bottom(now_date=now_date, rule_key=key)
    else:
        # 数据没准备好,1分钟后重新检查
        Timer(60, day_timer_check).start()


if __name__ == '__main__':
    day_timer_check()