|
@@ -0,0 +1,228 @@
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+# @ModuleName: rule_rank_h_18_19
|
|
|
+# @Author: Liqian
|
|
|
+# @Time: 2022/4/21 下午4:31
|
|
|
+# @Software: PyCharm
|
|
|
+
|
|
|
+import datetime
|
|
|
+import pandas as pd
|
|
|
+import math
|
|
|
+from odps import ODPS
|
|
|
+from threading import Timer
|
|
|
+from get_data import get_data_from_odps
|
|
|
+from db_helper import RedisHelper
|
|
|
+from config import set_config
|
|
|
+from log import Log
|
|
|
+
|
|
|
+config_, _ = set_config()
|
|
|
+log_ = Log()
|
|
|
+
|
|
|
+features = [
|
|
|
+ 'videoid',
|
|
|
+ 'lastonehour_view', # 过去1小时曝光
|
|
|
+ 'lastonehour_play', # 过去1小时播放
|
|
|
+ 'lastonehour_share', # 过去1小时分享
|
|
|
+ 'lastonehour_return', # 过去1小时分享,过去1小时回流
|
|
|
+]
|
|
|
+
|
|
|
+
|
|
|
+def h_data_check(project, table, now_date):
|
|
|
+ """检查数据是否准备好"""
|
|
|
+ odps = ODPS(
|
|
|
+ access_id=config_.ODPS_CONFIG['ACCESSID'],
|
|
|
+ secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
|
|
|
+ project=project,
|
|
|
+ endpoint=config_.ODPS_CONFIG['ENDPOINT'],
|
|
|
+ connect_timeout=3000,
|
|
|
+ read_timeout=500000,
|
|
|
+ pool_maxsize=1000,
|
|
|
+ pool_connections=1000
|
|
|
+ )
|
|
|
+
|
|
|
+ try:
|
|
|
+ dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
|
|
|
+ sql = f'select * from {project}.{table} where dt = {dt}'
|
|
|
+ with odps.execute_sql(sql=sql).open_reader() as reader:
|
|
|
+ data_count = reader.count
|
|
|
+ except Exception as e:
|
|
|
+ data_count = 0
|
|
|
+ return data_count
|
|
|
+
|
|
|
+
|
|
|
+def get_feature_data(now_date, project, table):
|
|
|
+ """获取特征数据"""
|
|
|
+ dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
|
|
|
+ # dt = '2022041310'
|
|
|
+ records = get_data_from_odps(date=dt, project=project, table=table)
|
|
|
+ feature_data = []
|
|
|
+ for record in records:
|
|
|
+ item = {}
|
|
|
+ for feature_name in features:
|
|
|
+ item[feature_name] = record[feature_name]
|
|
|
+ feature_data.append(item)
|
|
|
+ feature_df = pd.DataFrame(feature_data)
|
|
|
+ return feature_df
|
|
|
+
|
|
|
+
|
|
|
+def cal_score(df):
|
|
|
+ """
|
|
|
+ 计算score
|
|
|
+ :param df: 特征数据
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ # score计算公式: sharerate*backrate*logback*ctr
|
|
|
+ # sharerate = lastonehour_share/(lastonehour_play+1000)
|
|
|
+ # backrate = lastonehour_return/(lastonehour_share+10)
|
|
|
+ # ctr = lastonehour_play/(lastonehour_view+1000), 对ctr限最大值:K2 = 0.6 if ctr > 0.6 else ctr
|
|
|
+ # score = sharerate * backrate * LOG(lastonehour_return+1) * K2
|
|
|
+
|
|
|
+ df = df.fillna(0)
|
|
|
+ df['share_rate'] = df['lastonehour_share'] / (df['lastonehour_play'] + 1000)
|
|
|
+ df['back_rate'] = df['lastonehour_return'] / (df['lastonehour_share'] + 10)
|
|
|
+ df['log_back'] = (df['lastonehour_return'] + 1).apply(math.log)
|
|
|
+ df['ctr'] = df['lastonehour_play'] / (df['lastonehour_view'] + 1000)
|
|
|
+ df['K2'] = df['ctr'].apply(lambda x: 0.6 if x > 0.6 else x)
|
|
|
+ df['score'] = df['share_rate'] * df['back_rate'] * df['log_back'] * df['K2']
|
|
|
+ df = df.sort_values(by=['score'], ascending=False)
|
|
|
+ return df
|
|
|
+
|
|
|
+
|
|
|
+def video_rank(df, now_date, now_h, return_count):
|
|
|
+ """
|
|
|
+ 根据回流数量,对视频进行二次排序
|
|
|
+ :param df:
|
|
|
+ :param now_date:
|
|
|
+ :param now_h:
|
|
|
+ :param return_count: 小时级数据回流限制数
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ log_.info(f'df length = {len(df)}')
|
|
|
+ # 获取符合进入召回源条件的视频,进入条件:小时级回流>=20 && score>=0.005
|
|
|
+ h_recall_df = df[(df['lastonehour_return'] >= return_count) & (df['score'] >= 0.005)]
|
|
|
+ h_recall_videos = h_recall_df['videoid'].to_list()
|
|
|
+ log_.info(f'h_recall videos count = {len(h_recall_videos)}')
|
|
|
+ # 不符合进入召回源条件的视频
|
|
|
+ df = df.append(h_recall_df)
|
|
|
+ h_else_df = df.drop_duplicates(['videoid'], keep=False)
|
|
|
+ h_else_df = h_else_df.sort_values(by=['score'], ascending=False)
|
|
|
+ h_else_videos = h_else_df['videoid'].to_list
|
|
|
+ # 合并,给定分数
|
|
|
+ final_videos = h_recall_videos + h_else_videos
|
|
|
+ for i, video_id in enumerate(final_videos):
|
|
|
+
|
|
|
+
|
|
|
+ # 写入对应的redis
|
|
|
+ h_video_ids =[]
|
|
|
+ h_recall_result = {}
|
|
|
+ for video_id in h_recall_videos:
|
|
|
+ score = h_recall_df[h_recall_df['videoid'] == video_id]['score']
|
|
|
+ h_recall_result[int(video_id)] = float(score)
|
|
|
+ h_video_ids.append(int(video_id))
|
|
|
+ h_recall_key_name = \
|
|
|
+ f"{config_.RECALL_KEY_NAME_PREFIX_BY_H}{return_count}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
|
+ if len(h_recall_result) > 0:
|
|
|
+ redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=23 * 3600)
|
|
|
+ # 清空线上过滤应用列表
|
|
|
+ redis_helper.del_keys(key_name=f"{config_.H_VIDEO_FILER}{return_count}")
|
|
|
+
|
|
|
+ # 去重更新rov模型结果,并另存为redis中
|
|
|
+ initial_data_dup = {}
|
|
|
+ for video_id, score in initial_data:
|
|
|
+ if int(video_id) not in h_video_ids:
|
|
|
+ initial_data_dup[int(video_id)] = score
|
|
|
+ log_.info(f"initial data dup count = {len(initial_data_dup)}")
|
|
|
+ initial_key_name = \
|
|
|
+ f"{config_.RECALL_KEY_NAME_PREFIX_DUP_H}{return_count}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
|
+ if len(initial_data_dup) > 0:
|
|
|
+ redis_helper.add_data_with_zset(key_name=initial_key_name, data=initial_data_dup, expire_time=23 * 3600)
|
|
|
+
|
|
|
+
|
|
|
+ # # 去重合并
|
|
|
+ # final_videos = [int(item) for item in h_recall_videos]
|
|
|
+ # temp_videos = [int(video_id) for video_id, _ in initial_data if int(video_id) not in final_videos]
|
|
|
+ # final_videos = final_videos + temp_videos
|
|
|
+ # log_.info(f'final videos count = {len(final_videos)}')
|
|
|
+ #
|
|
|
+ # # 重新给定score
|
|
|
+ # final_data = {}
|
|
|
+ # for i, video_id in enumerate(final_videos):
|
|
|
+ # score = 100 - i * config_.ROV_SCORE_D
|
|
|
+ # final_data[video_id] = score
|
|
|
+ #
|
|
|
+ # # 存入对应的redis
|
|
|
+ # final_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_H}{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
|
+ # redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=24 * 3600)
|
|
|
+
|
|
|
+
|
|
|
+def rank_by_h(now_date, now_h, return_count_list):
|
|
|
+ # 获取特征数据
|
|
|
+ feature_df = get_feature_data(now_date=now_date)
|
|
|
+ # 计算score
|
|
|
+ score_df = cal_score(df=feature_df)
|
|
|
+ # rank
|
|
|
+ for cnt in return_count_list:
|
|
|
+ log_.info(f"return_count = {cnt}")
|
|
|
+ video_rank(df=score_df, now_date=now_date, now_h=now_h, return_count=cnt)
|
|
|
+ # to-csv
|
|
|
+ score_filename = f"score_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.csv"
|
|
|
+ score_df.to_csv(f'./data/{score_filename}')
|
|
|
+
|
|
|
+
|
|
|
+def h_rank_bottom(now_date, now_h, return_count):
|
|
|
+ """未按时更新数据,用上一小时结果作为当前小时的数据"""
|
|
|
+ log_.info(f"return_count = {return_count}")
|
|
|
+ # 获取rov模型结果
|
|
|
+ redis_helper = RedisHelper()
|
|
|
+ if now_h == 0:
|
|
|
+ redis_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
|
|
|
+ redis_h = 23
|
|
|
+ else:
|
|
|
+ redis_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
|
|
|
+ redis_h = now_h - 1
|
|
|
+ key_prefix_list = [config_.RECALL_KEY_NAME_PREFIX_BY_H, config_.RECALL_KEY_NAME_PREFIX_DUP_H]
|
|
|
+ for key_prefix in key_prefix_list:
|
|
|
+ key_name = f"{key_prefix}{return_count}.{redis_dt}.{redis_h}"
|
|
|
+ initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
|
|
|
+ final_data = dict()
|
|
|
+ for video_id, score in initial_data:
|
|
|
+ final_data[video_id] = score
|
|
|
+ # 存入对应的redis
|
|
|
+ final_key_name = \
|
|
|
+ f"{key_prefix}{return_count}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
|
+ if len(final_data) > 0:
|
|
|
+ redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600)
|
|
|
+ # 清空线上过滤应用列表
|
|
|
+ redis_helper.del_keys(key_name=f"{config_.H_VIDEO_FILER}{return_count}")
|
|
|
+
|
|
|
+
|
|
|
+def h_timer_check():
|
|
|
+ return_count_list = [20, 10]
|
|
|
+ now_date = datetime.datetime.today()
|
|
|
+ log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
|
|
|
+ now_h = datetime.datetime.now().hour
|
|
|
+ now_min = datetime.datetime.now().minute
|
|
|
+ if now_h == 0:
|
|
|
+ for cnt in return_count_list:
|
|
|
+ h_rank_bottom(now_date=now_date, now_h=now_h, return_count=cnt)
|
|
|
+ return
|
|
|
+ # 查看当前小时更新的数据是否已准备好
|
|
|
+ h_data_count = h_data_check(project=project, table=table, now_date=now_date)
|
|
|
+ if h_data_count > 0:
|
|
|
+ log_.info(f'h_data_count = {h_data_count}')
|
|
|
+ # 数据准备好,进行更新
|
|
|
+ rank_by_h(now_date=now_date, now_h=now_h, return_count_list=return_count_list)
|
|
|
+ elif now_min > 50:
|
|
|
+ log_.info('h_recall data is None, use bottom data!')
|
|
|
+ for cnt in return_count_list:
|
|
|
+ h_rank_bottom(now_date=now_date, now_h=now_h, return_count=cnt)
|
|
|
+ else:
|
|
|
+ # 数据没准备好,1分钟后重新检查
|
|
|
+ Timer(60, h_timer_check).start()
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ # df1 = get_feature_data()
|
|
|
+ # res = cal_score(df=df1)
|
|
|
+ # video_rank(df=res, now_date=datetime.datetime.today())
|
|
|
+ # rank_by_h()
|
|
|
+ h_timer_check()
|