|
@@ -6,7 +6,7 @@ from threading import Timer
|
|
|
from datetime import datetime, timedelta
|
|
|
from get_data import get_data_from_odps
|
|
|
from db_helper import RedisHelper
|
|
|
-from utils import filter_video_status, check_table_partition_exits, filter_video_status_app
|
|
|
+from utils import filter_video_status, check_table_partition_exits, filter_video_status_app, request_post
|
|
|
from config import set_config
|
|
|
from log import Log
|
|
|
|
|
@@ -159,7 +159,7 @@ def cal_score(df, param):
|
|
|
return df
|
|
|
|
|
|
|
|
|
-def video_rank_h(df, now_date, now_h, rule_key, param, data_key):
|
|
|
+def video_rank_h(df, now_date, now_h, rule_key, param, data_key, notify_backend):
|
|
|
"""
|
|
|
获取符合进入召回源条件的视频,与每日更新的rov模型结果视频列表进行合并
|
|
|
:param df:
|
|
@@ -168,6 +168,7 @@ def video_rank_h(df, now_date, now_h, rule_key, param, data_key):
|
|
|
:param rule_key: 天级规则数据进入条件
|
|
|
:param param: 天级规则数据进入条件参数
|
|
|
:param data_key: 使用数据标识
|
|
|
+ :param notify_backend: 是否同步给后端标识
|
|
|
:return:
|
|
|
"""
|
|
|
redis_helper = RedisHelper()
|
|
@@ -200,12 +201,13 @@ def video_rank_h(df, now_date, now_h, rule_key, param, data_key):
|
|
|
now_dt = datetime.strftime(now_date, '%Y%m%d')
|
|
|
day_video_ids = []
|
|
|
day_recall_result = {}
|
|
|
+ json_data = []
|
|
|
for video_id in filtered_videos:
|
|
|
score = day_recall_df[day_recall_df['videoid'] == video_id]['score']
|
|
|
day_recall_result[int(video_id)] = float(score)
|
|
|
day_video_ids.append(int(video_id))
|
|
|
- # h_24h_recall_key_name = \
|
|
|
- # f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H}{app_type}:{data_key}:{rule_key}:{now_dt}:{now_h}"
|
|
|
+ json_data.append({'videoId': int(video_id), 'rovScore': float(score)})  # cast as done for day_recall_result above: `score` is a pandas selection, not a plain scalar
|
|
|
+
|
|
|
h_24h_recall_key_name = \
|
|
|
f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H}{data_key}:{rule_key}:{now_dt}:{now_h}"
|
|
|
|
|
@@ -215,7 +217,16 @@ def video_rank_h(df, now_date, now_h, rule_key, param, data_key):
|
|
|
# 清空线上过滤应用列表
|
|
|
# redis_helper.del_keys(key_name=f"{config_.H_VIDEO_FILER_24H}{app_type}.{data_key}.{rule_key}")
|
|
|
|
|
|
- # if rule_key in ['rule3', 'rule4']:
|
|
|
+ # 通知后端更新兜底视频数据
|
|
|
+ if notify_backend is True:
|
|
|
+ log_.info('json_data count = {}'.format(len(json_data)))
|
|
|
+ result = request_post(request_url=config_.NOTIFY_BACKEND_updateFallBackVideoList_URL,
|
|
|
+ request_data={'videos': json_data})
|
|
|
+ if result['code'] == 0:
|
|
|
+ log_.info('notify backend updateFallBackVideoList success!')
|
|
|
+ else:
|
|
|
+ log_.error('notify backend updateFallBackVideoList fail!')
|
|
|
+
|
|
|
# 去重筛选结果,保留剩余数据并写入Redis
|
|
|
all_videos = df['videoid'].to_list()
|
|
|
log_.info(f'h_by24h_recall all videos count = {len(all_videos)}')
|
|
@@ -317,6 +328,7 @@ def rank_by_h(now_date, now_h, rule_params, project, table):
|
|
|
|
|
|
for param in rule_params.get('params_list'):
|
|
|
score_df_list = []
|
|
|
+ notify_backend = param.get('notify_backend', False)
|
|
|
data_key = param.get('data')
|
|
|
data_param = data_params_item.get(data_key)
|
|
|
log_.info(f"data_key = {data_key}, data_param = {data_param}")
|
|
@@ -338,14 +350,16 @@ def rank_by_h(now_date, now_h, rule_params, project, table):
|
|
|
# 更新平台回流比
|
|
|
df_merged['platform_return_rate'] = df_merged['platform_return'] / df_merged['回流人数']
|
|
|
video_rank_h(df=df_merged, now_date=now_date, now_h=now_h,
|
|
|
- rule_key=rule_key, param=rule_param, data_key=data_key)
|
|
|
+ rule_key=rule_key, param=rule_param, data_key=data_key,
|
|
|
+ notify_backend=notify_backend)
|
|
|
|
|
|
else:
|
|
|
df_list = [feature_df[feature_df['apptype'] == apptype] for apptype, _ in data_param.items()]
|
|
|
df_merged = reduce(merge_df, df_list)
|
|
|
score_df = cal_score(df=df_merged, param=rule_param)
|
|
|
video_rank_h(df=score_df, now_date=now_date, now_h=now_h,
|
|
|
- rule_key=rule_key, param=rule_param, data_key=data_key)
|
|
|
+ rule_key=rule_key, param=rule_param, data_key=data_key,
|
|
|
+ notify_backend=notify_backend)
|
|
|
|
|
|
# # to-csv
|
|
|
# score_filename = f"score_by24h_{key}_{datetime.strftime(now_date, '%Y%m%d%H')}.csv"
|
|
@@ -426,7 +440,7 @@ def h_timer_check():
|
|
|
log_.info(f'h_by24h_data_count = {h_data_count}')
|
|
|
# 数据准备好,进行更新
|
|
|
rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params, project=project, table=table)
|
|
|
- elif now_min > 50:
|
|
|
+ elif now_min > 45:
|
|
|
log_.info('h_by24h_recall data is None!')
|
|
|
h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params)
|
|
|
else:
|