|
@@ -18,8 +18,7 @@ log_ = Log()
|
|
|
|
|
|
RULE_PARAMS = {
|
|
|
'rule_params': {
|
|
|
- 'rule66': {'cal_score_func': 2, 'return_count': 100, 'platform_return_rate': 0.001,
|
|
|
- 'view_type': 'preview'},
|
|
|
+ 'rule66': {'cal_score_func': 2, 'return_count': 100, 'platform_return_rate': 0.001, 'view_type': 'preview'},
|
|
|
},
|
|
|
'data_params': config_.DATA_PARAMS,
|
|
|
'params_list': [
|
|
@@ -186,7 +185,7 @@ def video_rank_h(df, now_date, now_h, rule_key, param, data_key, notify_backend)
|
|
|
:return:
|
|
|
"""
|
|
|
redis_helper = RedisHelper()
|
|
|
- log_.info(f"videos_count = {len(df)}")
|
|
|
+ log_.info(f"一共有多少个视频 = {len(df)}")
|
|
|
|
|
|
|
|
|
df = df.sort_values(by=['score'], ascending=False)
|
|
@@ -199,49 +198,46 @@ def video_rank_h(df, now_date, now_h, rule_key, param, data_key, notify_backend)
|
|
|
day_recall_df = df[df['回流人数'] > return_count]
|
|
|
else:
|
|
|
day_recall_df = df
|
|
|
+ log_.info(f"回流量-过滤后,一共有多少个视频 = {len(day_recall_df)}")
|
|
|
platform_return_rate = param.get('platform_return_rate', 0)
|
|
|
day_recall_df = day_recall_df[day_recall_df['platform_return_rate'] > platform_return_rate]
|
|
|
day_recall_videos = day_recall_df['videoid'].to_list()
|
|
|
- log_.info(f'h_by24h_recall videos count = {len(day_recall_videos)}')
|
|
|
+ log_.info(f"回流率-过滤后,一共有多少个视频 = {len(day_recall_videos)}")
|
|
|
|
|
|
if data_key in ['data7', ]:
|
|
|
filtered_videos = filter_video_status_app(day_recall_videos)
|
|
|
else:
|
|
|
filtered_videos = filter_video_status(day_recall_videos)
|
|
|
- log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
|
|
|
+ log_.info(f"视频状态-过滤后,一共有多少个视频 = {len(filtered_videos)}")
|
|
|
|
|
|
|
|
|
now_dt = datetime.strftime(now_date, '%Y%m%d')
|
|
|
day_video_ids = []
|
|
|
day_recall_result = {}
|
|
|
-
|
|
|
for video_id in filtered_videos:
|
|
|
score = day_recall_df[day_recall_df['videoid'] == video_id]['score']
|
|
|
day_recall_result[int(video_id)] = float(score)
|
|
|
day_video_ids.append(int(video_id))
|
|
|
-
|
|
|
|
|
|
+
|
|
|
h_24h_recall_key_name = \
|
|
|
f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H}{data_key}:{rule_key}:{now_dt}:{now_h}"
|
|
|
- log_.info("h_24h_recall_key_name:redis:{}".format(h_24h_recall_key_name))
|
|
|
+ log_.info("打印非地域24小时redis key:{}".format(h_24h_recall_key_name))
|
|
|
if len(day_recall_result) > 0:
|
|
|
- log_.info(f"count = {len(day_recall_result)}, key = {h_24h_recall_key_name}")
|
|
|
+ log_.info(f"开始写入头部数据:count = {len(day_recall_result)}, key = {h_24h_recall_key_name}")
|
|
|
redis_helper.add_data_with_zset(key_name=h_24h_recall_key_name, data=day_recall_result, expire_time=2 * 3600)
|
|
|
-
|
|
|
-
|
|
|
+ else:
|
|
|
+ log_.info(f"无数据,不写入。")
|
|
|
|
|
|
-
|
|
|
+
|
|
|
+ log_.info('开始处理剩余结果other')
|
|
|
all_videos = df['videoid'].to_list()
|
|
|
- log_.info(f'h_by24h_recall all videos count = {len(all_videos)}')
|
|
|
-
|
|
|
if data_key in ['data7', ]:
|
|
|
all_filtered_videos = filter_video_status_app(all_videos)
|
|
|
else:
|
|
|
all_filtered_videos = filter_video_status(all_videos)
|
|
|
- log_.info(f'all_filtered_videos count = {len(all_filtered_videos)}')
|
|
|
-
|
|
|
other_videos = [video for video in all_filtered_videos if video not in day_video_ids]
|
|
|
- log_.info(f'other_videos count = {len(other_videos)}')
|
|
|
+ log_.info(f'过滤后剩余视频数量 count = {len(other_videos)}')
|
|
|
|
|
|
other_24h_recall_result = {}
|
|
|
json_data = []
|
|
@@ -249,18 +245,20 @@ def video_rank_h(df, now_date, now_h, rule_key, param, data_key, notify_backend)
|
|
|
score = df[df['videoid'] == video_id]['score']
|
|
|
other_24h_recall_result[int(video_id)] = float(score)
|
|
|
json_data.append({'videoId': video_id, 'rovScore': float(score)})
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
other_h_24h_recall_key_name = \
|
|
|
f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H_OTHER}{data_key}:{rule_key}:{now_dt}:{now_h}"
|
|
|
+ log_.info("打印非地域24小时(剩余)redis key:{}".format(other_h_24h_recall_key_name))
|
|
|
if len(other_24h_recall_result) > 0:
|
|
|
- log_.info(f"count = {len(other_24h_recall_result)}")
|
|
|
+ log_.info(f"开始写入尾部数据:count = {len(other_24h_recall_result)}, key = {other_h_24h_recall_key_name}")
|
|
|
redis_helper.add_data_with_zset(key_name=other_h_24h_recall_key_name, data=other_24h_recall_result,
|
|
|
expire_time=2 * 3600)
|
|
|
+ else:
|
|
|
+ log_.info(f"无尾部数据,不写入。")
|
|
|
+
|
|
|
|
|
|
if notify_backend is True:
|
|
|
log_.info('json_data count = {}'.format(len(json_data[:5000])))
|
|
|
-
|
|
|
result = request_post(request_url=config_.NOTIFY_BACKEND_updateFallBackVideoList_URL,
|
|
|
request_data={'videos': json_data[:5000]})
|
|
|
if result is None:
|
|
@@ -322,38 +320,17 @@ def rank_by_h(now_date, now_h, rule_params, project, table):
|
|
|
|
|
|
data_params_item = rule_params.get('data_params')
|
|
|
rule_params_item = rule_params.get('rule_params')
|
|
|
- """
|
|
|
- for param in rule_params.get('params_list'):
|
|
|
- data_key = param.get('data')
|
|
|
- data_param = data_params_item.get(data_key)
|
|
|
- log_.info(f"data_key = {data_key}, data_param = {data_param}")
|
|
|
- df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
|
|
|
- df_merged = reduce(merge_df, df_list)
|
|
|
-
|
|
|
- rule_key = param.get('rule')
|
|
|
- rule_param = rule_params_item.get(rule_key)
|
|
|
- log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
|
|
|
-
|
|
|
- cal_score_func = rule_param.get('cal_score_func', 1)
|
|
|
- if cal_score_func == 2:
|
|
|
- score_df = cal_score2(df=df_merged, param=rule_param)
|
|
|
- else:
|
|
|
- score_df = cal_score1(df=df_merged)
|
|
|
- video_rank_h(df=score_df, now_date=now_date, now_h=now_h,
|
|
|
- rule_key=rule_key, param=rule_param, data_key=data_key)
|
|
|
- """
|
|
|
|
|
|
for param in rule_params.get('params_list'):
|
|
|
score_df_list = []
|
|
|
notify_backend = param.get('notify_backend', False)
|
|
|
data_key = param.get('data')
|
|
|
data_param = data_params_item.get(data_key)
|
|
|
- log_.info(f"data_key = {data_key}, data_param = {data_param}")
|
|
|
rule_key = param.get('rule')
|
|
|
rule_param = rule_params_item.get(rule_key)
|
|
|
- log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
|
|
|
-
|
|
|
merge_func = rule_param.get('merge_func', 1)
|
|
|
+ log_.info("数据采用:{},统计采用{}.".format(data_key, rule_key))
|
|
|
+ log_.info("具体的规则是:{}.".format(rule_param))
|
|
|
|
|
|
if merge_func == 2:
|
|
|
for apptype, weight in data_param.items():
|
|
@@ -417,29 +394,7 @@ def h_rank_bottom(now_date, now_h, rule_params):
|
|
|
if len(final_data) > 0:
|
|
|
redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 3600)
|
|
|
|
|
|
- """
|
|
|
- for app_type, params in rule_params.items():
|
|
|
- log_.info(f"app_type = {app_type}")
|
|
|
- for param in params.get('params_list'):
|
|
|
- data_key = param.get('data')
|
|
|
- rule_key = param.get('rule')
|
|
|
- log_.info(f"data_key = {data_key}, rule_key = {rule_key}")
|
|
|
- for key_prefix in key_prefix_list:
|
|
|
- key_name = f"{key_prefix}{app_type}:{data_key}:{rule_key}:{redis_dt}:{redis_h}"
|
|
|
- initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True)
|
|
|
- if initial_data is None:
|
|
|
- initial_data = []
|
|
|
- final_data = dict()
|
|
|
- for video_id, score in initial_data:
|
|
|
- final_data[video_id] = score
|
|
|
-
|
|
|
- final_key_name = \
|
|
|
- f"{key_prefix}{app_type}:{data_key}:{rule_key}:{datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
|
|
|
- if len(final_data) > 0:
|
|
|
- redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 3600)
|
|
|
-
|
|
|
-
|
|
|
- """
|
|
|
+
|
|
|
|
|
|
def h_timer_check():
|
|
|
try:
|
|
@@ -447,26 +402,26 @@ def h_timer_check():
|
|
|
table = config_.TABLE_24H_APP_TYPE
|
|
|
rule_params = RULE_PARAMS
|
|
|
now_date = datetime.today()
|
|
|
- log_.info(f"now_date: {datetime.strftime(now_date, '%Y%m%d%H')}")
|
|
|
+ log_.info(f"开始执行: {datetime.strftime(now_date, '%Y%m%d%H')}")
|
|
|
now_min = datetime.now().minute
|
|
|
now_h = datetime.now().hour
|
|
|
|
|
|
h_data_count = h_data_check(project=project, table=table, now_date=now_date, now_h=now_h)
|
|
|
if now_h == 23 or now_h < 8:
|
|
|
- log_.info(f'now_h = {now_h} use bottom data!')
|
|
|
+ log_.info("当前时间{}小时,使用bottom的data,开始。".format(now_h))
|
|
|
h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params)
|
|
|
- log_.info(f"24h_data end!")
|
|
|
+ log_.info("当前时间{}小时,使用bottom的data,完成。".format(now_h))
|
|
|
elif h_data_count > 0:
|
|
|
- log_.info(f'h_by24h_data_count = {h_data_count}')
|
|
|
-
|
|
|
+ log_.info('上游数据表查询数据条数 h_data_count = {},开始计算。'.format(h_data_count))
|
|
|
rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params, project=project, table=table)
|
|
|
- log_.info(f"24h_data end!")
|
|
|
+ log_.info("----------正常完成----------")
|
|
|
elif now_min > 40:
|
|
|
- log_.info('h_by24h_recall data is None, use bottom data!')
|
|
|
+ log_.info('当前分钟超过40,预计执行无法完成,使用 bottom data!')
|
|
|
h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params)
|
|
|
- log_.info(f"24h_data end!")
|
|
|
+ log_.info('当前分钟超过40,完成。')
|
|
|
else:
|
|
|
|
|
|
+ log_.info("上游数据未就绪,等待...")
|
|
|
Timer(60, h_timer_check).start()
|
|
|
|
|
|
except Exception as e:
|
|
@@ -481,5 +436,6 @@ def h_timer_check():
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
- log_.info(f"24h_data start...")
|
|
|
+ log_.info("文件alg_recsys_recall_24h_noregion.py:「24小时无地域」 开始执行")
|
|
|
h_timer_check()
|
|
|
+ log_.info("文件alg_recsys_recall_24h_noregion.py:「24小时无地域」 执行完毕")
|