|
@@ -267,14 +267,14 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, by
|
|
|
dup_key_name=h_24h_dup_key_name, region=region)
|
|
|
|
|
|
|
|
|
- if by_24h_rule_key == 'rule3':
|
|
|
- other_h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H_OTHER}{data_key}:" \
|
|
|
- f"{by_24h_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
|
|
|
- other_h_24h_dup_key_name = \
|
|
|
- f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}:{data_key}:{rule_key}:" \
|
|
|
- f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
|
|
|
- h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=other_h_24h_key_name,
|
|
|
- dup_key_name=other_h_24h_dup_key_name, region=region)
|
|
|
+
|
|
|
+ other_h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H_OTHER}{data_key}:" \
|
|
|
+ f"{by_24h_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
|
|
|
+ other_h_24h_dup_key_name = \
|
|
|
+ f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}:{data_key}:{rule_key}:" \
|
|
|
+ f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
|
|
|
+ h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=other_h_24h_key_name,
|
|
|
+ dup_key_name=other_h_24h_dup_key_name, region=region)
|
|
|
|
|
|
|
|
|
model_key_name = get_rov_redis_key(now_date=now_date)
|
|
@@ -458,6 +458,21 @@ def merge_df(df_left, df_right):
|
|
|
return df_merged[feature_list]
|
|
|
|
|
|
|
|
|
def merge_df_with_score(df_left, df_right):
    """
    Outer-join two DataFrames on [videoid, code] and sum their metrics.

    Rows present on only one side are kept; every missing value produced by
    the outer join is replaced with 0 before the per-side values are added.

    :param df_left: DataFrame with columns videoid, code, lastonehour_return,
        platform_return and score
    :param df_right: DataFrame with the same columns as df_left
    :return: DataFrame with exactly the columns
        [videoid, code, lastonehour_return, platform_return, score], where the
        last three are the sums of the left-side and right-side values
    """
    key_columns = ['videoid', 'code']
    summed_columns = ['lastonehour_return', 'platform_return', 'score']

    # The overlapping metric columns come out of the join as <name>_x (left)
    # and <name>_y (right).
    joined = pd.merge(df_left, df_right, on=key_columns, how='outer', suffixes=('_x', '_y'))
    joined = joined.fillna(0)

    totals = {
        column: joined[f'{column}_x'] + joined[f'{column}_y']
        for column in summed_columns
    }
    joined = joined.assign(**totals)
    return joined[key_columns + summed_columns]
|
|
|
+
|
|
|
+
|
|
|
def process_with_region(region, df_merged, data_key, rule_key, rule_param, now_date, now_h, rule_rank_h_flag):
|
|
|
log_.info(f"region = {region} start...")
|
|
|
|
|
@@ -469,6 +484,14 @@ def process_with_region(region, df_merged, data_key, rule_key, rule_param, now_d
|
|
|
log_.info(f"region = {region} end!")
|
|
|
|
|
|
|
|
|
def process_with_region2(region, df_merged, data_key, rule_key, rule_param, now_date, now_h, rule_rank_h_flag):
    """
    Select the rows of df_merged belonging to one region and pass them to video_rank.

    :param region: region code; rows are kept when their 'code' column equals it
    :param df_merged: DataFrame that has a 'code' column
        (NOTE(review): presumably it already carries scores - confirm with the caller)
    :param data_key: forwarded to video_rank as data_key
    :param rule_key: forwarded to video_rank as rule_key
    :param rule_param: forwarded to video_rank as param
    :param now_date: forwarded to video_rank as now_date
    :param now_h: forwarded to video_rank as now_h
    :param rule_rank_h_flag: forwarded to video_rank as rule_rank_h_flag
    :return: None
    """
    log_.info(f"region = {region} start...")

    in_region = df_merged['code'] == region
    rows_for_region = df_merged.loc[in_region]
    row_count = len(rows_for_region)
    log_.info(f'region = {region}, region_score_df count = {row_count}')

    video_rank(
        df=rows_for_region,
        now_date=now_date,
        now_h=now_h,
        region=region,
        rule_key=rule_key,
        param=rule_param,
        data_key=data_key,
        rule_rank_h_flag=rule_rank_h_flag,
    )

    log_.info(f"region = {region} end!")
|
|
|
+
|
|
|
|
|
|
def process_with_app_type(app_type, params, region_code_list, feature_df, now_date, now_h, rule_rank_h_flag):
|
|
|
log_.info(f"app_type = {app_type} start...")
|
|
@@ -520,19 +543,38 @@ def process_with_param(param, data_params_item, rule_params_item, region_code_li
|
|
|
data_key = param.get('data')
|
|
|
data_param = data_params_item.get(data_key)
|
|
|
log_.info(f"data_key = {data_key}, data_param = {data_param}")
|
|
|
- df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
|
|
|
- df_merged = reduce(merge_df, df_list)
|
|
|
-
|
|
|
rule_key = param.get('rule')
|
|
|
rule_param = rule_params_item.get(rule_key)
|
|
|
log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
|
|
|
- task_list = [
|
|
|
- gevent.spawn(process_with_region,
|
|
|
- region, df_merged, data_key, rule_key, rule_param, now_date, now_h, rule_rank_h_flag)
|
|
|
- for region in region_code_list
|
|
|
- ]
|
|
|
- gevent.joinall(task_list)
|
|
|
+ merge_func = rule_param.get('merge_func', None)
|
|
|
+
|
|
|
+ if merge_func == 2:
|
|
|
+ score_df_list = []
|
|
|
+ for apptype, weight in data_param.items():
|
|
|
+ df = feature_df[feature_df['apptype'] == apptype]
|
|
|
+
|
|
|
+ score_df = cal_score(df=df, param=rule_param)
|
|
|
+ score_df['score'] = score_df['score'] * weight
|
|
|
+ score_df_list.append(score_df)
|
|
|
+
|
|
|
+ df_merged = reduce(merge_df_with_score, score_df_list)
|
|
|
+
|
|
|
+ df_merged['platform_return_rate'] = df_merged['platform_return'] / df_merged['lastonehour_return']
|
|
|
+ task_list = [
|
|
|
+ gevent.spawn(process_with_region2,
|
|
|
+ region, df_merged, data_key, rule_key, rule_param, now_date, now_h, rule_rank_h_flag)
|
|
|
+ for region in region_code_list
|
|
|
+ ]
|
|
|
+ else:
|
|
|
+ df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
|
|
|
+ df_merged = reduce(merge_df, df_list)
|
|
|
+ task_list = [
|
|
|
+ gevent.spawn(process_with_region,
|
|
|
+ region, df_merged, data_key, rule_key, rule_param, now_date, now_h, rule_rank_h_flag)
|
|
|
+ for region in region_code_list
|
|
|
+ ]
|
|
|
|
|
|
+ gevent.joinall(task_list)
|
|
|
log_.info(f"param = {param} end!")
|
|
|
|
|
|
|