|  | @@ -1,4 +1,5 @@
 | 
	
		
			
				|  |  |  import pandas as pd
 | 
	
		
			
				|  |  | +import multiprocessing
 | 
	
		
			
				|  |  |  import math
 | 
	
		
			
				|  |  |  import traceback
 | 
	
		
			
				|  |  |  from functools import reduce
 | 
	
	
		
			
				|  | @@ -303,6 +304,43 @@ def merge_df_with_score(df_left, df_right):
 | 
	
		
			
				|  |  |      return df_merged[feature_list]
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +def process_with_param(param, data_params_item, rule_params_item, feature_df, now_date, now_h):
 | 
	
		
			
				|  |  | +    log_.info(f"param = {param} start...")
 | 
	
		
			
				|  |  | +    score_df_list = []
 | 
	
		
			
				|  |  | +    notify_backend = param.get('notify_backend', False)
 | 
	
		
			
				|  |  | +    data_key = param.get('data')
 | 
	
		
			
				|  |  | +    data_param = data_params_item.get(data_key)
 | 
	
		
			
				|  |  | +    log_.info(f"data_key = {data_key}, data_param = {data_param}")
 | 
	
		
			
				|  |  | +    rule_key = param.get('rule')
 | 
	
		
			
				|  |  | +    rule_param = rule_params_item.get(rule_key)
 | 
	
		
			
				|  |  | +    log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
 | 
	
		
			
				|  |  | +    # cal_score_func = rule_param.get('cal_score_func', 1)
 | 
	
		
			
				|  |  | +    merge_func = rule_param.get('merge_func', 1)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    if merge_func == 2:
 | 
	
		
			
				|  |  | +        for apptype, weight in data_param.items():
 | 
	
		
			
				|  |  | +            df = feature_df[feature_df['apptype'] == apptype]
 | 
	
		
			
				|  |  | +            # 计算score
 | 
	
		
			
				|  |  | +            score_df = cal_score(df=df, param=rule_param)
 | 
	
		
			
				|  |  | +            score_df['score'] = score_df['score'] * weight
 | 
	
		
			
				|  |  | +            score_df_list.append(score_df)
 | 
	
		
			
				|  |  | +        # 分数合并
 | 
	
		
			
				|  |  | +        df_merged = reduce(merge_df_with_score, score_df_list)
 | 
	
		
			
				|  |  | +        # 更新平台回流比
 | 
	
		
			
				|  |  | +        df_merged['platform_return_rate'] = df_merged['platform_return'] / df_merged['回流人数']
 | 
	
		
			
				|  |  | +        video_rank_h(df=df_merged, now_date=now_date, now_h=now_h,
 | 
	
		
			
				|  |  | +                     rule_key=rule_key, param=rule_param, data_key=data_key,
 | 
	
		
			
				|  |  | +                     notify_backend=notify_backend)
 | 
	
		
			
				|  |  | +    else:
 | 
	
		
			
				|  |  | +        df_list = [feature_df[feature_df['apptype'] == apptype] for apptype, _ in data_param.items()]
 | 
	
		
			
				|  |  | +        df_merged = reduce(merge_df, df_list)
 | 
	
		
			
				|  |  | +        score_df = cal_score(df=df_merged, param=rule_param)
 | 
	
		
			
				|  |  | +        video_rank_h(df=score_df, now_date=now_date, now_h=now_h,
 | 
	
		
			
				|  |  | +                     rule_key=rule_key, param=rule_param, data_key=data_key,
 | 
	
		
			
				|  |  | +                     notify_backend=notify_backend)
 | 
	
		
			
				|  |  | +    log_.info(f"param = {param} end!")
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  def rank_by_h(now_date, now_h, rule_params, project, table):
 | 
	
		
			
				|  |  |      # 获取特征数据
 | 
	
		
			
				|  |  |      feature_df = get_feature_data(now_date=now_date, now_h=now_h, project=project, table=table)
 | 
	
	
		
			
				|  | @@ -330,40 +368,49 @@ def rank_by_h(now_date, now_h, rule_params, project, table):
 | 
	
		
			
				|  |  |          video_rank_h(df=score_df, now_date=now_date, now_h=now_h,
 | 
	
		
			
				|  |  |                       rule_key=rule_key, param=rule_param, data_key=data_key)
 | 
	
		
			
				|  |  |      """
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    for param in rule_params.get('params_list'):
 | 
	
		
			
				|  |  | -        score_df_list = []
 | 
	
		
			
				|  |  | -        notify_backend = param.get('notify_backend', False)
 | 
	
		
			
				|  |  | -        data_key = param.get('data')
 | 
	
		
			
				|  |  | -        data_param = data_params_item.get(data_key)
 | 
	
		
			
				|  |  | -        log_.info(f"data_key = {data_key}, data_param = {data_param}")
 | 
	
		
			
				|  |  | -        rule_key = param.get('rule')
 | 
	
		
			
				|  |  | -        rule_param = rule_params_item.get(rule_key)
 | 
	
		
			
				|  |  | -        log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
 | 
	
		
			
				|  |  | -        # cal_score_func = rule_param.get('cal_score_func', 1)
 | 
	
		
			
				|  |  | -        merge_func = rule_param.get('merge_func', 1)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        if merge_func == 2:
 | 
	
		
			
				|  |  | -            for apptype, weight in data_param.items():
 | 
	
		
			
				|  |  | -                df = feature_df[feature_df['apptype'] == apptype]
 | 
	
		
			
				|  |  | -                # 计算score
 | 
	
		
			
				|  |  | -                score_df = cal_score(df=df, param=rule_param)
 | 
	
		
			
				|  |  | -                score_df['score'] = score_df['score'] * weight
 | 
	
		
			
				|  |  | -                score_df_list.append(score_df)
 | 
	
		
			
				|  |  | -            # 分数合并
 | 
	
		
			
				|  |  | -            df_merged = reduce(merge_df_with_score, score_df_list)
 | 
	
		
			
				|  |  | -            # 更新平台回流比
 | 
	
		
			
				|  |  | -            df_merged['platform_return_rate'] = df_merged['platform_return'] / df_merged['回流人数']
 | 
	
		
			
				|  |  | -            video_rank_h(df=df_merged, now_date=now_date, now_h=now_h,
 | 
	
		
			
				|  |  | -                         rule_key=rule_key, param=rule_param, data_key=data_key,
 | 
	
		
			
				|  |  | -                         notify_backend=notify_backend)
 | 
	
		
			
				|  |  | -        else:
 | 
	
		
			
				|  |  | -            df_list = [feature_df[feature_df['apptype'] == apptype] for apptype, _ in data_param.items()]
 | 
	
		
			
				|  |  | -            df_merged = reduce(merge_df, df_list)
 | 
	
		
			
				|  |  | -            score_df = cal_score(df=df_merged, param=rule_param)
 | 
	
		
			
				|  |  | -            video_rank_h(df=score_df, now_date=now_date, now_h=now_h,
 | 
	
		
			
				|  |  | -                         rule_key=rule_key, param=rule_param, data_key=data_key,
 | 
	
		
			
				|  |  | -                         notify_backend=notify_backend)
 | 
	
		
			
				|  |  | +    params_list = rule_params.get('params_list')
 | 
	
		
			
				|  |  | +    pool = multiprocessing.Pool(processes=len(params_list))
 | 
	
		
			
				|  |  | +    for param in params_list:
 | 
	
		
			
				|  |  | +        pool.apply_async(
 | 
	
		
			
				|  |  | +            func=process_with_param,
 | 
	
		
			
				|  |  | +            args=(param, data_params_item, rule_params_item, feature_df, now_date, now_h)
 | 
	
		
			
				|  |  | +        )
 | 
	
		
			
				|  |  | +    pool.close()
 | 
	
		
			
				|  |  | +    pool.join()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    # for param in rule_params.get('params_list'):
 | 
	
		
			
				|  |  | +    #     score_df_list = []
 | 
	
		
			
				|  |  | +    #     notify_backend = param.get('notify_backend', False)
 | 
	
		
			
				|  |  | +    #     data_key = param.get('data')
 | 
	
		
			
				|  |  | +    #     data_param = data_params_item.get(data_key)
 | 
	
		
			
				|  |  | +    #     log_.info(f"data_key = {data_key}, data_param = {data_param}")
 | 
	
		
			
				|  |  | +    #     rule_key = param.get('rule')
 | 
	
		
			
				|  |  | +    #     rule_param = rule_params_item.get(rule_key)
 | 
	
		
			
				|  |  | +    #     log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
 | 
	
		
			
				|  |  | +    #     # cal_score_func = rule_param.get('cal_score_func', 1)
 | 
	
		
			
				|  |  | +    #     merge_func = rule_param.get('merge_func', 1)
 | 
	
		
			
				|  |  | +    #
 | 
	
		
			
				|  |  | +    #     if merge_func == 2:
 | 
	
		
			
				|  |  | +    #         for apptype, weight in data_param.items():
 | 
	
		
			
				|  |  | +    #             df = feature_df[feature_df['apptype'] == apptype]
 | 
	
		
			
				|  |  | +    #             # 计算score
 | 
	
		
			
				|  |  | +    #             score_df = cal_score(df=df, param=rule_param)
 | 
	
		
			
				|  |  | +    #             score_df['score'] = score_df['score'] * weight
 | 
	
		
			
				|  |  | +    #             score_df_list.append(score_df)
 | 
	
		
			
				|  |  | +    #         # 分数合并
 | 
	
		
			
				|  |  | +    #         df_merged = reduce(merge_df_with_score, score_df_list)
 | 
	
		
			
				|  |  | +    #         # 更新平台回流比
 | 
	
		
			
				|  |  | +    #         df_merged['platform_return_rate'] = df_merged['platform_return'] / df_merged['回流人数']
 | 
	
		
			
				|  |  | +    #         video_rank_h(df=df_merged, now_date=now_date, now_h=now_h,
 | 
	
		
			
				|  |  | +    #                      rule_key=rule_key, param=rule_param, data_key=data_key,
 | 
	
		
			
				|  |  | +    #                      notify_backend=notify_backend)
 | 
	
		
			
				|  |  | +    #     else:
 | 
	
		
			
				|  |  | +    #         df_list = [feature_df[feature_df['apptype'] == apptype] for apptype, _ in data_param.items()]
 | 
	
		
			
				|  |  | +    #         df_merged = reduce(merge_df, df_list)
 | 
	
		
			
				|  |  | +    #         score_df = cal_score(df=df_merged, param=rule_param)
 | 
	
		
			
				|  |  | +    #         video_rank_h(df=score_df, now_date=now_date, now_h=now_h,
 | 
	
		
			
				|  |  | +    #                      rule_key=rule_key, param=rule_param, data_key=data_key,
 | 
	
		
			
				|  |  | +    #                      notify_backend=notify_backend)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      #     # to-csv
 | 
	
		
			
				|  |  |      #     score_filename = f"score_by24h_{key}_{datetime.strftime(now_date, '%Y%m%d%H')}.csv"
 |