|
@@ -197,6 +197,21 @@ def merge_df(df_left, df_right):
|
|
return df_merged[feature_list]
|
|
return df_merged[feature_list]
|
|
|
|
|
|
|
|
|
|
|
|
+def merge_df_with_score(df_left, df_right):
|
|
|
|
+ """
|
|
|
|
+ df 按照videoid合并,平台回流人数、回流人数、分数 分别求和
|
|
|
|
+ :param df_left:
|
|
|
|
+ :param df_right:
|
|
|
|
+ :return:
|
|
|
|
+ """
|
|
|
|
+ df_merged = pd.merge(df_left, df_right, on=['videoid', 'code'], how='outer', suffixes=['_x', '_y'])
|
|
|
|
+ df_merged.fillna(0, inplace=True)
|
|
|
|
+ feature_list = ['videoid', 'code', 'lastday_return', 'platform_return', 'score']
|
|
|
|
+ for feature in feature_list[2:]:
|
|
|
|
+ df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
|
|
|
|
+ return df_merged[feature_list]
|
|
|
|
+
|
|
|
|
+
|
|
def process_with_region(region, df_merged, data_key, rule_key, rule_param, now_date, now_h):
|
|
def process_with_region(region, df_merged, data_key, rule_key, rule_param, now_date, now_h):
|
|
log_.info(f"region = {region} start...")
|
|
log_.info(f"region = {region} start...")
|
|
# 计算score
|
|
# 计算score
|
|
@@ -208,6 +223,15 @@ def process_with_region(region, df_merged, data_key, rule_key, rule_param, now_d
|
|
log_.info(f"region = {region} end!")
|
|
log_.info(f"region = {region} end!")
|
|
|
|
|
|
|
|
|
|
|
|
+def process_with_region2(region, df_merged, data_key, rule_key, rule_param, now_date, now_h):
|
|
|
|
+ log_.info(f"region = {region} start...")
|
|
|
|
+ region_score_df = df_merged[df_merged['code'] == region]
|
|
|
|
+ log_.info(f'region = {region}, region_score_df count = {len(region_score_df)}')
|
|
|
|
+ video_rank(df=region_score_df, now_date=now_date, now_h=now_h, region=region,
|
|
|
|
+ rule_key=rule_key, param=rule_param, data_key=data_key)
|
|
|
|
+ log_.info(f"region = {region} end!")
|
|
|
|
+
|
|
|
|
+
|
|
def process_with_app_type(app_type, params, region_code_list, feature_df, now_date, now_h):
|
|
def process_with_app_type(app_type, params, region_code_list, feature_df, now_date, now_h):
|
|
log_.info(f"app_type = {app_type} start...")
|
|
log_.info(f"app_type = {app_type} start...")
|
|
data_params_item = params.get('data_params')
|
|
data_params_item = params.get('data_params')
|
|
@@ -233,22 +257,38 @@ def process_with_app_type(app_type, params, region_code_list, feature_df, now_da
|
|
|
|
|
|
def process_with_param(param, data_params_item, rule_params_item, region_code_list, feature_df, now_date, now_h):
|
|
def process_with_param(param, data_params_item, rule_params_item, region_code_list, feature_df, now_date, now_h):
|
|
log_.info(f"param = {param} start...")
|
|
log_.info(f"param = {param} start...")
|
|
-
|
|
|
|
data_key = param.get('data')
|
|
data_key = param.get('data')
|
|
data_param = data_params_item.get(data_key)
|
|
data_param = data_params_item.get(data_key)
|
|
log_.info(f"data_key = {data_key}, data_param = {data_param}")
|
|
log_.info(f"data_key = {data_key}, data_param = {data_param}")
|
|
- df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
|
|
|
|
- df_merged = reduce(merge_df, df_list)
|
|
|
|
-
|
|
|
|
rule_key = param.get('rule')
|
|
rule_key = param.get('rule')
|
|
rule_param = rule_params_item.get(rule_key)
|
|
rule_param = rule_params_item.get(rule_key)
|
|
log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
|
|
log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
|
|
- task_list = [
|
|
|
|
- gevent.spawn(process_with_region, region, df_merged, data_key, rule_key, rule_param, now_date, now_h)
|
|
|
|
- for region in region_code_list
|
|
|
|
- ]
|
|
|
|
- gevent.joinall(task_list)
|
|
|
|
|
|
+ merge_func = rule_param.get('merge_func', None)
|
|
|
|
+ if merge_func == 2:
|
|
|
|
+ score_df_list = []
|
|
|
|
+ for apptype, weight in data_param.items():
|
|
|
|
+ df = feature_df[feature_df['apptype'] == apptype]
|
|
|
|
+ # 计算score
|
|
|
|
+ score_df = cal_score(df=df, param=rule_param)
|
|
|
|
+ score_df['score'] = score_df['score'] * weight
|
|
|
|
+ score_df_list.append(score_df)
|
|
|
|
+ # 分数合并
|
|
|
|
+ df_merged = reduce(merge_df_with_score, score_df_list)
|
|
|
|
+ # 更新平台回流比
|
|
|
|
+ df_merged['platform_return_rate'] = df_merged['platform_return'] / df_merged['lastday_return']
|
|
|
|
+ task_list = [
|
|
|
|
+ gevent.spawn(process_with_region2, region, df_merged, data_key, rule_key, rule_param, now_date, now_h)
|
|
|
|
+ for region in region_code_list
|
|
|
|
+ ]
|
|
|
|
+ else:
|
|
|
|
+ df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
|
|
|
|
+ df_merged = reduce(merge_df, df_list)
|
|
|
|
+ task_list = [
|
|
|
|
+ gevent.spawn(process_with_region, region, df_merged, data_key, rule_key, rule_param, now_date, now_h)
|
|
|
|
+ for region in region_code_list
|
|
|
|
+ ]
|
|
|
|
|
|
|
|
+ gevent.joinall(task_list)
|
|
log_.info(f"param = {param} end!")
|
|
log_.info(f"param = {param} end!")
|
|
|
|
|
|
|
|
|
|
@@ -261,7 +301,7 @@ def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):
|
|
rule_params_item = rule_params.get('rule_params')
|
|
rule_params_item = rule_params.get('rule_params')
|
|
params_list = rule_params.get('params_list')
|
|
params_list = rule_params.get('params_list')
|
|
pool = multiprocessing.Pool(processes=len(params_list))
|
|
pool = multiprocessing.Pool(processes=len(params_list))
|
|
- for param in params_list:
|
|
|
|
|
|
+ for param in params_list[1:]:
|
|
pool.apply_async(
|
|
pool.apply_async(
|
|
func=process_with_param,
|
|
func=process_with_param,
|
|
args=(param, data_params_item, rule_params_item, region_code_list, feature_df, now_date, now_h)
|
|
args=(param, data_params_item, rule_params_item, region_code_list, feature_df, now_date, now_h)
|