| 
					
				 | 
			
			
				@@ -137,7 +137,7 @@ def cal_score(df, param): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     return df 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-def video_rank(df, now_date, now_h, rule_key, param, region, app_type, data_key, rule_rank_h_flag): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def video_rank(df, now_date, now_h, rule_key, param, region, data_key, rule_rank_h_flag): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     """ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     获取符合进入召回源条件的视频,与每日更新的rov模型结果视频列表进行合并 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     :param df: 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -182,7 +182,7 @@ def video_rank(df, now_date, now_h, rule_key, param, region, app_type, data_key, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         h_recall_result[int(video_id)] = float(score) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         h_video_ids.append(int(video_id)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     h_recall_key_name = \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{app_type}:{data_key}:{rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{data_key}:{rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     if len(h_recall_result) > 0: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=23 * 3600) 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -197,7 +197,7 @@ def video_rank(df, now_date, now_h, rule_key, param, region, app_type, data_key, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     # 与其他召回视频池去重,存入对应的redis 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                  region_24h_rule_key=region_24h_rule_key, by_24h_rule_key=by_24h_rule_key, by_48h_rule_key=by_48h_rule_key, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                 region=region, app_type=app_type, data_key=data_key, rule_rank_h_flag=rule_rank_h_flag) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                 region=region, data_key=data_key, rule_rank_h_flag=rule_rank_h_flag) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 def dup_data(h_video_ids, initial_key_name, dup_key_name, region): 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -223,14 +223,14 @@ def dup_data(h_video_ids, initial_key_name, dup_key_name, region): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     return h_video_ids 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, by_24h_rule_key, by_48h_rule_key, region, app_type, data_key, rule_rank_h_flag): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, by_24h_rule_key, by_48h_rule_key, region, data_key, rule_rank_h_flag): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     """将地域分组小时级数据与其他召回视频池去重,存入对应的redis""" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     # ##### 去重更新地域分组小时级24h列表,并另存为redis中 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     region_24h_key_name = \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}:{app_type}:{data_key}:{region_24h_rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}:{data_key}:{region_24h_rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     region_24h_dup_key_name = \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}:{app_type}:{data_key}:{rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}:{data_key}:{rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=region_24h_key_name, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                            dup_key_name=region_24h_dup_key_name, region=region) 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -238,40 +238,40 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, by 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     if rule_rank_h_flag == '48h': 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         # ##### 去重小程序相对48h更新结果,并另存为redis中 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        h_48h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H}{app_type}:{data_key}:{by_48h_rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        h_48h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H}{data_key}:{by_48h_rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                          f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         h_48h_dup_key_name = \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H}{region}:{app_type}:{data_key}:{rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H}{region}:{data_key}:{rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_48h_key_name, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                dup_key_name=h_48h_dup_key_name, region=region) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         # ##### 去重小程序相对48h 筛选后剩余数据 更新结果,并另存为redis中 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         if by_48h_rule_key == 'rule1': 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            other_h_48h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H_OTHER}{app_type}:{data_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            other_h_48h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H_OTHER}{data_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                    f"{by_48h_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             other_h_48h_dup_key_name = \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H}{region}:{app_type}:{data_key}:{rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H}{region}:{data_key}:{rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=other_h_48h_key_name, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                    dup_key_name=other_h_48h_dup_key_name, region=region) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     else: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         # ##### 去重小程序相对24h更新结果,并另存为redis中 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H}{app_type}:{data_key}:{by_24h_rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H}{data_key}:{by_24h_rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                          f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         h_24h_dup_key_name = \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}:{app_type}:{data_key}:{rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}:{data_key}:{rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_24h_key_name, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                dup_key_name=h_24h_dup_key_name, region=region) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         # ##### 去重小程序相对24h 筛选后剩余数据 更新结果,并另存为redis中 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         if by_24h_rule_key == 'rule3': 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            other_h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H_OTHER}{app_type}:{data_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            other_h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H_OTHER}{data_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                    f"{by_24h_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             other_h_24h_dup_key_name = \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}:{app_type}:{data_key}:{rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}:{data_key}:{rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=other_h_24h_key_name, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                    dup_key_name=other_h_24h_dup_key_name, region=region) 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -279,7 +279,7 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, by 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     # ##### 去重小程序模型更新结果,并另存为redis中 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     model_key_name = get_rov_redis_key(now_date=now_date) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     model_data_dup_key_name = \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H}{region}:{app_type}:{data_key}:{rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H}{region}:{data_key}:{rule_key}:" \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=model_key_name, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                            dup_key_name=model_data_dup_key_name, region=region) 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -458,14 +458,14 @@ def merge_df(df_left, df_right): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     return df_merged[feature_list] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-def process_with_region(region, df_merged, app_type, data_key, rule_key, rule_param, now_date, now_h, rule_rank_h_flag): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def process_with_region(region, df_merged, data_key, rule_key, rule_param, now_date, now_h, rule_rank_h_flag): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     log_.info(f"region = {region} start...") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     # 计算score 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     region_df = df_merged[df_merged['code'] == region] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     log_.info(f'region = {region}, region_df count = {len(region_df)}') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     score_df = cal_score(df=region_df, param=rule_param) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     video_rank(df=score_df, now_date=now_date, now_h=now_h, rule_key=rule_key, param=rule_param, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-               region=region, app_type=app_type, data_key=data_key, rule_rank_h_flag=rule_rank_h_flag) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+               region=region, data_key=data_key, rule_rank_h_flag=rule_rank_h_flag) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     log_.info(f"region = {region} end!") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -514,23 +514,54 @@ def process_with_app_type(app_type, params, region_code_list, feature_df, now_da 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     # gevent.joinall(task_list) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+def process_with_param(param, data_params_item, rule_params_item, region_code_list, feature_df, now_date, now_h, rule_rank_h_flag): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    log_.info(f"param = {param} start...") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    data_key = param.get('data') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    data_param = data_params_item.get(data_key) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    log_.info(f"data_key = {data_key}, data_param = {data_param}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    df_merged = reduce(merge_df, df_list) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    rule_key = param.get('rule') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    rule_param = rule_params_item.get(rule_key) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    task_list = [ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        gevent.spawn(process_with_region, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                     region, df_merged, data_key, rule_key, rule_param, now_date, now_h, rule_rank_h_flag) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        for region in region_code_list 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    ] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    gevent.joinall(task_list) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    log_.info(f"param = {param} end!") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list, rule_rank_h_flag): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     # 获取特征数据 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     feature_df = get_feature_data(project=project, table=table, now_date=now_date) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     feature_df['apptype'] = feature_df['apptype'].astype(int) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    # t = [ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    #     gevent.spawn(process_with_app_type, app_type, params, region_code_list, feature_df, now_date, now_h) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    #     for app_type, params in rule_params.items() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    # ] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    # gevent.joinall(t) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    pool = multiprocessing.Pool(processes=len(config_.APP_TYPE)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    for app_type, params in rule_params.items(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        pool.apply_async(func=process_with_app_type, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                         args=(app_type, params, region_code_list, feature_df, now_date, now_h, rule_rank_h_flag)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    data_params_item = rule_params.get('data_params') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    rule_params_item = rule_params.get('rule_params') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    params_list = rule_params.get('params_list') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    pool = multiprocessing.Pool(processes=len(params_list)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    for param in params_list: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        pool.apply_async( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            func=process_with_param, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            args=(param, data_params_item, rule_params_item, region_code_list, feature_df, now_date, now_h, rule_rank_h_flag) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     pool.close() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     pool.join() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # pool = multiprocessing.Pool(processes=len(config_.APP_TYPE)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # for app_type, params in rule_params.items(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    #     pool.apply_async(func=process_with_app_type, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    #                      args=(app_type, params, region_code_list, feature_df, now_date, now_h, rule_rank_h_flag)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # pool.close() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # pool.join() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     """ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     for app_type, params in rule_params.items(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         log_.info(f"app_type = {app_type} start...") 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -610,71 +641,36 @@ def h_rank_bottom(now_date, now_h, rule_params, region_code_list, rule_rank_h_fl 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     # 以上一小时的地域分组数据作为当前小时的数据 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     key_prefix = config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    for app_type, params in rule_params.items(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        log_.info(f"app_type = {app_type}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        rule_params_item = params.get('rule_params') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        for param in params.get('params_list'): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            data_key = param.get('data') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            rule_key = param.get('rule') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            rule_param = rule_params_item.get(rule_key) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            log_.info(f"data_key = {data_key}, rule_key = {rule_key}, rule_param = {rule_param}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            region_24h_rule_key = rule_param.get('region_24h_rule_key', 'rule1') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            by_24h_rule_key = rule_param.get('24h_rule_key', None) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            by_48h_rule_key = rule_param.get('48h_rule_key', None) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            for region in region_code_list: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                log_.info(f"region = {region}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                key_name = f"{key_prefix}{region}:{app_type}:{data_key}:{rule_key}:{redis_dt}:{redis_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                if initial_data is None: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    initial_data = [] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                final_data = dict() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                h_video_ids = [] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                for video_id, score in initial_data: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    final_data[video_id] = score 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    h_video_ids.append(int(video_id)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                # 存入对应的redis 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                final_key_name = \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    f"{key_prefix}{region}:{app_type}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                if len(final_data) > 0: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                # 清空线上过滤应用列表 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                # redis_helper.del_keys( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                #     key_name=f"{config_.REGION_H_VIDEO_FILER}{region}.{app_type}.{data_key}.{rule_key}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                # 与其他召回视频池去重,存入对应的redis 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                             region_24h_rule_key=region_24h_rule_key, region=region, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                             app_type=app_type, data_key=data_key, by_24h_rule_key=by_24h_rule_key, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                             by_48h_rule_key=by_48h_rule_key, rule_rank_h_flag=rule_rank_h_flag) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        # for data_key, data_param in params['data_params'].items(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #     log_.info(f"data_key = {data_key}, data_param = {data_param}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #     for rule_key, rule_param in params['rule_params'].items(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #         log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #         region_24h_rule_key = rule_param.get('region_24h_rule_key', 'rule1') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #         for region in region_code_list: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #             log_.info(f"region = {region}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #             key_name = f"{key_prefix}{region}.{app_type}.{data_key}.{rule_key}.{redis_dt}.{redis_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #             initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #             if initial_data is None: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #                 initial_data = [] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #             final_data = dict() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #             h_video_ids = [] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #             for video_id, score in initial_data: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #                 final_data[video_id] = score 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #                 h_video_ids.append(int(video_id)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #             # 存入对应的redis 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #             final_key_name = \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #                 f"{key_prefix}{region}.{app_type}.{data_key}.{rule_key}.{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #             if len(final_data) > 0: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #                 redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #             # 清空线上过滤应用列表 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #             redis_helper.del_keys(key_name=f"{config_.REGION_H_VIDEO_FILER}{region}.{app_type}.{data_key}.{rule_key}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #             # 与其他召回视频池去重,存入对应的redis 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #             dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #                          region_24h_rule_key=region_24h_rule_key, region=region, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        #                          app_type=app_type, data_key=data_key) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    rule_params_item = rule_params.get('rule_params') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    for param in rule_params.get('params_list'): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        data_key = param.get('data') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        rule_key = param.get('rule') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        rule_param = rule_params_item.get(rule_key) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        log_.info(f"data_key = {data_key}, rule_key = {rule_key}, rule_param = {rule_param}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        region_24h_rule_key = rule_param.get('region_24h_rule_key', 'rule1') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        by_24h_rule_key = rule_param.get('24h_rule_key', None) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        by_48h_rule_key = rule_param.get('48h_rule_key', None) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        for region in region_code_list: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            log_.info(f"region = {region}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            key_name = f"{key_prefix}{region}:{data_key}:{rule_key}:{redis_dt}:{redis_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            initial_data = redis_helper.get_all_data_from_zset(key_name=key_name, with_scores=True) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if initial_data is None: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                initial_data = [] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            final_data = dict() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            h_video_ids = [] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            for video_id, score in initial_data: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                final_data[video_id] = score 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                h_video_ids.append(int(video_id)) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            # 存入对应的redis 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            final_key_name = \ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                f"{key_prefix}{region}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if len(final_data) > 0: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=23 * 3600) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            # 与其他召回视频池去重,存入对应的redis 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                         region_24h_rule_key=region_24h_rule_key, region=region, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                         data_key=data_key, by_24h_rule_key=by_24h_rule_key, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                         by_48h_rule_key=by_48h_rule_key, rule_rank_h_flag=rule_rank_h_flag) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 def h_timer_check(): 
			 |