|
@@ -94,13 +94,14 @@ def cal_score(df):
|
|
return df
|
|
return df
|
|
|
|
|
|
|
|
|
|
-def video_rank(df, now_date, now_h, return_count):
|
|
|
|
|
|
+def video_rank(df, now_date, now_h, rule_key, param):
|
|
"""
|
|
"""
|
|
获取符合进入召回源条件的视频,与每日更新的rov模型结果视频列表进行合并
|
|
获取符合进入召回源条件的视频,与每日更新的rov模型结果视频列表进行合并
|
|
:param df:
|
|
:param df:
|
|
:param now_date:
|
|
:param now_date:
|
|
:param now_h:
|
|
:param now_h:
|
|
- :param return_count: 小时级数据回流限制数
|
|
|
|
|
|
+ :param rule_key: 小时级数据进入条件
|
|
|
|
+ :param param: 小时级数据进入条件参数
|
|
:return:
|
|
:return:
|
|
"""
|
|
"""
|
|
# 获取rov模型结果
|
|
# 获取rov模型结果
|
|
@@ -110,7 +111,9 @@ def video_rank(df, now_date, now_h, return_count):
|
|
log_.info(f'initial data count = {len(initial_data)}')
|
|
log_.info(f'initial data count = {len(initial_data)}')
|
|
|
|
|
|
# 获取符合进入召回源条件的视频,进入条件:小时级回流>=20 && score>=0.005
|
|
# 获取符合进入召回源条件的视频,进入条件:小时级回流>=20 && score>=0.005
|
|
- h_recall_df = df[(df['lastonehour_return'] >= return_count) & (df['score'] >= 0.005)]
|
|
|
|
|
|
+ return_count = param.get('return_count')
|
|
|
|
+ score_value = param.get('score_rule')
|
|
|
|
+ h_recall_df = df[(df['lastonehour_return'] >= return_count) & (df['score'] >= score_value)]
|
|
h_recall_videos = h_recall_df['videoid'].to_list()
|
|
h_recall_videos = h_recall_df['videoid'].to_list()
|
|
log_.info(f'h_recall videos count = {len(h_recall_videos)}')
|
|
log_.info(f'h_recall videos count = {len(h_recall_videos)}')
|
|
# 写入对应的redis
|
|
# 写入对应的redis
|
|
@@ -121,11 +124,11 @@ def video_rank(df, now_date, now_h, return_count):
|
|
h_recall_result[int(video_id)] = float(score)
|
|
h_recall_result[int(video_id)] = float(score)
|
|
h_video_ids.append(int(video_id))
|
|
h_video_ids.append(int(video_id))
|
|
h_recall_key_name = \
|
|
h_recall_key_name = \
|
|
- f"{config_.RECALL_KEY_NAME_PREFIX_BY_H}{return_count}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
|
|
|
|
+ f"{config_.RECALL_KEY_NAME_PREFIX_BY_H}{rule_key}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
if len(h_recall_result) > 0:
|
|
if len(h_recall_result) > 0:
|
|
redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=23 * 3600)
|
|
redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=23 * 3600)
|
|
# 清空线上过滤应用列表
|
|
# 清空线上过滤应用列表
|
|
- redis_helper.del_keys(key_name=f"{config_.H_VIDEO_FILER}{return_count}")
|
|
|
|
|
|
+ redis_helper.del_keys(key_name=f"{config_.H_VIDEO_FILER}{rule_key}")
|
|
|
|
|
|
# 去重更新rov模型结果,并另存为redis中
|
|
# 去重更新rov模型结果,并另存为redis中
|
|
initial_data_dup = {}
|
|
initial_data_dup = {}
|
|
@@ -134,7 +137,7 @@ def video_rank(df, now_date, now_h, return_count):
|
|
initial_data_dup[int(video_id)] = score
|
|
initial_data_dup[int(video_id)] = score
|
|
log_.info(f"initial data dup count = {len(initial_data_dup)}")
|
|
log_.info(f"initial data dup count = {len(initial_data_dup)}")
|
|
initial_key_name = \
|
|
initial_key_name = \
|
|
- f"{config_.RECALL_KEY_NAME_PREFIX_DUP_H}{return_count}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
|
|
|
|
+ f"{config_.RECALL_KEY_NAME_PREFIX_DUP_H}{rule_key}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
if len(initial_data_dup) > 0:
|
|
if len(initial_data_dup) > 0:
|
|
redis_helper.add_data_with_zset(key_name=initial_key_name, data=initial_data_dup, expire_time=23 * 3600)
|
|
redis_helper.add_data_with_zset(key_name=initial_key_name, data=initial_data_dup, expire_time=23 * 3600)
|
|
|
|
|
|
@@ -156,23 +159,23 @@ def video_rank(df, now_date, now_h, return_count):
|
|
# redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=24 * 3600)
|
|
# redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=24 * 3600)
|
|
|
|
|
|
|
|
|
|
-def rank_by_h(now_date, now_h, return_count_list):
|
|
|
|
|
|
+def rank_by_h(now_date, now_h, rule_params):
|
|
# 获取特征数据
|
|
# 获取特征数据
|
|
feature_df = get_feature_data(now_date=now_date)
|
|
feature_df = get_feature_data(now_date=now_date)
|
|
# 计算score
|
|
# 计算score
|
|
score_df = cal_score(df=feature_df)
|
|
score_df = cal_score(df=feature_df)
|
|
# rank
|
|
# rank
|
|
- for cnt in return_count_list:
|
|
|
|
- log_.info(f"return_count = {cnt}")
|
|
|
|
- video_rank(df=score_df, now_date=now_date, now_h=now_h, return_count=cnt)
|
|
|
|
|
|
+ for key, value in rule_params.items():
|
|
|
|
+ log_.info(f"rule = {key}, param = {value}")
|
|
|
|
+ video_rank(df=score_df, now_date=now_date, now_h=now_h, rule_key=key, param=value)
|
|
# to-csv
|
|
# to-csv
|
|
score_filename = f"score_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.csv"
|
|
score_filename = f"score_{datetime.datetime.strftime(now_date, '%Y%m%d%H')}.csv"
|
|
score_df.to_csv(f'./data/{score_filename}')
|
|
score_df.to_csv(f'./data/{score_filename}')
|
|
|
|
|
|
|
|
|
|
-def h_rank_bottom(now_date, now_h, return_count):
|
|
|
|
|
|
+def h_rank_bottom(now_date, now_h, rule_key):
|
|
"""未按时更新数据,用上一小时结果作为当前小时的数据"""
|
|
"""未按时更新数据,用上一小时结果作为当前小时的数据"""
|
|
- log_.info(f"return_count = {return_count}")
|
|
|
|
|
|
+ log_.info(f"rule_key = {rule_key}")
|
|
# 获取rov模型结果
|
|
# 获取rov模型结果
|
|
redis_helper = RedisHelper()
|
|
redis_helper = RedisHelper()
|
|
if now_h == 0:
|
|
if now_h == 0:
|
|
@@ -183,40 +186,44 @@ def h_rank_bottom(now_date, now_h, return_count):
|
|
redis_h = now_h - 1
|
|
redis_h = now_h - 1
|
|
key_prefix_list = [config_.RECALL_KEY_NAME_PREFIX_BY_H, config_.RECALL_KEY_NAME_PREFIX_DUP_H]
|
|
key_prefix_list = [config_.RECALL_KEY_NAME_PREFIX_BY_H, config_.RECALL_KEY_NAME_PREFIX_DUP_H]
|
|
for key_prefix in key_prefix_list:
|
|
for key_prefix in key_prefix_list:
|
|
- key_name = f"{key_prefix}{return_count}.{redis_dt}.{redis_h}"
|
|
|
|
|
|
+ key_name = f"{key_prefix}{rule_key}.{redis_dt}.{redis_h}"
|
|
initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
|
|
initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
|
|
final_data = dict()
|
|
final_data = dict()
|
|
for video_id, score in initial_data:
|
|
for video_id, score in initial_data:
|
|
final_data[video_id] = score
|
|
final_data[video_id] = score
|
|
# 存入对应的redis
|
|
# 存入对应的redis
|
|
final_key_name = \
|
|
final_key_name = \
|
|
- f"{key_prefix}{return_count}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
|
|
|
|
+ f"{key_prefix}{rule_key}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
if len(final_data) > 0:
|
|
if len(final_data) > 0:
|
|
redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600)
|
|
redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600)
|
|
# 清空线上过滤应用列表
|
|
# 清空线上过滤应用列表
|
|
- redis_helper.del_keys(key_name=f"{config_.H_VIDEO_FILER}{return_count}")
|
|
|
|
|
|
+ redis_helper.del_keys(key_name=f"{config_.H_VIDEO_FILER}{rule_key}")
|
|
|
|
|
|
|
|
|
|
def h_timer_check():
|
|
def h_timer_check():
|
|
- return_count_list = [20, 10]
|
|
|
|
|
|
+ rule_params = {
|
|
|
|
+ 'rule1': {'return_count': 20, 'score_rule': 0.005},
|
|
|
|
+ 'rule2': {'return_count': 20, 'score_rule': 0.001}
|
|
|
|
+ }
|
|
|
|
+ # return_count_list = [20, 10]
|
|
now_date = datetime.datetime.today()
|
|
now_date = datetime.datetime.today()
|
|
log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
|
|
log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
|
|
now_h = datetime.datetime.now().hour
|
|
now_h = datetime.datetime.now().hour
|
|
now_min = datetime.datetime.now().minute
|
|
now_min = datetime.datetime.now().minute
|
|
if now_h == 0:
|
|
if now_h == 0:
|
|
- for cnt in return_count_list:
|
|
|
|
- h_rank_bottom(now_date=now_date, now_h=now_h, return_count=cnt)
|
|
|
|
|
|
+ for key, _ in rule_params.items():
|
|
|
|
+ h_rank_bottom(now_date=now_date, now_h=now_h, rule_key=key)
|
|
return
|
|
return
|
|
# 查看当前小时更新的数据是否已准备好
|
|
# 查看当前小时更新的数据是否已准备好
|
|
h_data_count = h_data_check(project=project, table=table, now_date=now_date)
|
|
h_data_count = h_data_check(project=project, table=table, now_date=now_date)
|
|
if h_data_count > 0:
|
|
if h_data_count > 0:
|
|
log_.info(f'h_data_count = {h_data_count}')
|
|
log_.info(f'h_data_count = {h_data_count}')
|
|
# 数据准备好,进行更新
|
|
# 数据准备好,进行更新
|
|
- rank_by_h(now_date=now_date, now_h=now_h, return_count_list=return_count_list)
|
|
|
|
|
|
+ rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params)
|
|
elif now_min > 50:
|
|
elif now_min > 50:
|
|
log_.info('h_recall data is None, use bottom data!')
|
|
log_.info('h_recall data is None, use bottom data!')
|
|
- for cnt in return_count_list:
|
|
|
|
- h_rank_bottom(now_date=now_date, now_h=now_h, return_count=cnt)
|
|
|
|
|
|
+ for key, _ in rule_params.items():
|
|
|
|
+ h_rank_bottom(now_date=now_date, now_h=now_h, rule_key=key)
|
|
else:
|
|
else:
|
|
# 数据没准备好,1分钟后重新检查
|
|
# 数据没准备好,1分钟后重新检查
|
|
Timer(60, h_timer_check).start()
|
|
Timer(60, h_timer_check).start()
|