|
@@ -68,6 +68,17 @@ features = [
|
|
|
]
|
|
|
|
|
|
|
|
|
+def get_rov_redis_key(now_date):
|
|
|
+ """获取rov模型结果存放key"""
|
|
|
+ redis_helper = RedisHelper()
|
|
|
+ now_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
|
|
|
+ key_name = f'{config_.RECALL_KEY_NAME_PREFIX}{now_dt}'
|
|
|
+ if not redis_helper.key_exists(key_name=key_name):
|
|
|
+ pre_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
|
|
|
+ key_name = f'{config_.RECALL_KEY_NAME_PREFIX}{pre_dt}'
|
|
|
+ return key_name
|
|
|
+
|
|
|
+
|
|
|
def data_check(project, table, now_date):
|
|
|
"""检查数据是否准备好"""
|
|
|
odps = ODPS(
|
|
@@ -157,11 +168,14 @@ def video_rank(df, now_date, now_h, rule_key, param, region):
|
|
|
log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
|
|
|
|
|
|
|
|
|
+ h_video_ids = []
|
|
|
day_recall_result = {}
|
|
|
for video_id in filtered_videos:
|
|
|
score = h_recall_df[h_recall_df['videoid'] == video_id]['score']
|
|
|
|
|
|
day_recall_result[int(video_id)] = float(score)
|
|
|
+ h_video_ids.append(int(video_id))
|
|
|
+
|
|
|
day_recall_key_name = \
|
|
|
f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}.{rule_key}." \
|
|
|
f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
@@ -169,6 +183,8 @@ def video_rank(df, now_date, now_h, rule_key, param, region):
|
|
|
redis_helper.add_data_with_zset(key_name=day_recall_key_name, data=day_recall_result, expire_time=23 * 3600)
|
|
|
|
|
|
redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{region}.{rule_key}")
|
|
|
+
|
|
|
+ dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, region=region)
|
|
|
|
|
|
|
|
|
def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):
|
|
@@ -195,6 +211,79 @@ def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):
|
|
|
"score_df": score_df[['videoid', 'score']]})
|
|
|
|
|
|
|
|
|
+def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region):
|
|
|
+ """将地域分组小时级数据与其他召回视频池去重,存入对应的redis"""
|
|
|
+ redis_helper = RedisHelper()
|
|
|
+
|
|
|
+
|
|
|
+ day_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_DAY}rule2.{datetime.datetime.strftime(now_date, '%Y%m%d')}"
|
|
|
+ if redis_helper.key_exists(key_name=day_key_name):
|
|
|
+ day_data = redis_helper.get_data_zset_with_index(
|
|
|
+ key_name=day_key_name, start=0, end=-1, with_scores=True)
|
|
|
+ log_.info(f'day data count = {len(day_data)}')
|
|
|
+ day_dup = {}
|
|
|
+ for video_id, score in day_data:
|
|
|
+ if int(video_id) not in h_video_ids:
|
|
|
+ day_dup[int(video_id)] = score
|
|
|
+ h_video_ids.append(int(video_id))
|
|
|
+ log_.info(f"day data dup count = {len(day_dup)}")
|
|
|
+ day_dup_key_name = \
|
|
|
+ f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_DAY_24H}{region}.{rule_key}." \
|
|
|
+ f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
|
+ if len(day_dup) > 0:
|
|
|
+ redis_helper.add_data_with_zset(key_name=day_dup_key_name, data=day_dup, expire_time=23 * 3600)
|
|
|
+
|
|
|
+
|
|
|
+ model_key_name = get_rov_redis_key(now_date=now_date)
|
|
|
+ model_data = redis_helper.get_data_zset_with_index(key_name=model_key_name, start=0, end=-1, with_scores=True)
|
|
|
+ log_.info(f'model data count = {len(model_data)}')
|
|
|
+ model_data_dup = {}
|
|
|
+ for video_id, score in model_data:
|
|
|
+ if int(video_id) not in h_video_ids:
|
|
|
+ model_data_dup[int(video_id)] = score
|
|
|
+ h_video_ids.append(int(video_id))
|
|
|
+ log_.info(f"model data dup count = {len(model_data_dup)}")
|
|
|
+ model_data_dup_key_name = \
|
|
|
+ f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_24H}{region}.{rule_key}." \
|
|
|
+ f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
|
+ if len(model_data_dup) > 0:
|
|
|
+ redis_helper.add_data_with_zset(key_name=model_data_dup_key_name, data=model_data_dup, expire_time=23 * 3600)
|
|
|
+
|
|
|
+
|
|
|
+def h_rank_bottom(now_date, now_h, rule_key, region_code_list):
|
|
|
+ """未按时更新数据,用上一小时结果作为当前小时的数据"""
|
|
|
+ log_.info(f"rule_key = {rule_key}")
|
|
|
+
|
|
|
+ redis_helper = RedisHelper()
|
|
|
+ if now_h == 0:
|
|
|
+ redis_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
|
|
|
+ redis_h = 23
|
|
|
+ else:
|
|
|
+ redis_dt = datetime.datetime.strftime(now_date, '%Y%m%d')
|
|
|
+ redis_h = now_h - 1
|
|
|
+
|
|
|
+
|
|
|
+ key_prefix = config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H
|
|
|
+ for region in region_code_list:
|
|
|
+ log_.info(f"region = {region}")
|
|
|
+ key_name = f"{key_prefix}{region}.{rule_key}.{redis_dt}.{redis_h}"
|
|
|
+ initial_data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1, with_scores=True)
|
|
|
+ final_data = dict()
|
|
|
+ h_video_ids = []
|
|
|
+ for video_id, score in initial_data:
|
|
|
+ final_data[video_id] = score
|
|
|
+ h_video_ids.append(int(video_id))
|
|
|
+
|
|
|
+ final_key_name = \
|
|
|
+ f"{key_prefix}{region}.{rule_key}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
|
|
|
+ if len(final_data) > 0:
|
|
|
+ redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600)
|
|
|
+
|
|
|
+ redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{region}.{rule_key}")
|
|
|
+
|
|
|
+ dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, region=region)
|
|
|
+
|
|
|
+
|
|
|
def h_timer_check():
|
|
|
rule_params = config_.RULE_PARAMS_REGION_24H
|
|
|
project = config_.PROJECT_REGION_24H
|