
update with Thread

liqian 2 years ago
parent
commit
42a5ce4526
2 changed files with 90 additions and 30 deletions
  1. region_rule_rank_h.py (+50 −20)
  2. region_rule_rank_h_by24h.py (+40 −10)
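
In short: the commit drops the gevent-based per-app-type fan-out in rank_by_h / rank_by_24h in favour of per-region threading.Thread workers, comments out the per-step count logging, and brackets each app_type, param, and region pass with start... / end! log markers.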

region_rule_rank_h.py (+50 −20)

@@ -10,7 +10,7 @@ import pandas as pd
 import math
 from functools import reduce
 from odps import ODPS
-from threading import Timer
+from threading import Timer, Thread
 from utils import MysqlHelper, RedisHelper, get_data_from_odps, filter_video_status, filter_shield_video, check_table_partition_exits
 from config import set_config
 from log import Log
@@ -155,17 +155,17 @@ def video_rank(df, now_date, now_h, rule_key, param, region, app_type, data_key)
     h_recall_df = h_recall_df.drop_duplicates(subset=['videoid'], keep='first')
     h_recall_df['videoid'] = h_recall_df['videoid'].astype(int)
     h_recall_videos = h_recall_df['videoid'].to_list()
-    log_.info(f'h_recall videos count = {len(h_recall_videos)}')
+    # log_.info(f'h_recall videos count = {len(h_recall_videos)}')
 
     # Filter by video status
     filtered_videos = filter_video_status(h_recall_videos)
-    log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
+    # log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
 
     # Filter shielded videos
     shield_key_name_list = config_.SHIELD_CONFIG.get(region, None)
     if shield_key_name_list is not None:
         filtered_videos = filter_shield_video(video_ids=filtered_videos, shield_key_name_list=shield_key_name_list)
-        log_.info(f"shield filtered_videos count = {len(filtered_videos)}")
+        # log_.info(f"shield filtered_videos count = {len(filtered_videos)}")
 
     # Write to the corresponding redis
     h_video_ids = []
@@ -222,21 +222,21 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, by
         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
     if redis_helper.key_exists(key_name=region_24h_key_name):
         region_24h_data = redis_helper.get_all_data_from_zset(key_name=region_24h_key_name, with_scores=True)
-        log_.info(f'region 24h data count = {len(region_24h_data)}')
+        # log_.info(f'region 24h data count = {len(region_24h_data)}')
 
         # Filter shielded videos
         region_24h_video_ids = [int(video_id) for video_id, _ in region_24h_data]
         shield_key_name_list = config_.SHIELD_CONFIG.get(region, None)
         if shield_key_name_list is not None:
             region_24h_video_ids = filter_shield_video(video_ids=region_24h_video_ids, shield_key_name_list=shield_key_name_list)
-            log_.info(f"shield filtered_videos count = {len(region_24h_video_ids)}")
+            # log_.info(f"shield filtered_videos count = {len(region_24h_video_ids)}")
 
         region_24h_dup = {}
         for video_id, score in region_24h_data:
             if int(video_id) not in h_video_ids and int(video_id) in region_24h_video_ids:
                 region_24h_dup[int(video_id)] = score
                 h_video_ids.append(int(video_id))
-        log_.info(f"region 24h data dup count = {len(region_24h_dup)}")
+        # log_.info(f"region 24h data dup count = {len(region_24h_dup)}")
         region_24h_dup_key_name = \
             f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}.{app_type}.{data_key}.{rule_key}." \
             f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
@@ -270,21 +270,21 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, by
                    f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
     if redis_helper.key_exists(key_name=day_key_name):
         day_data = redis_helper.get_all_data_from_zset(key_name=day_key_name, with_scores=True)
-        log_.info(f'24h data count = {len(day_data)}')
+        # log_.info(f'24h data count = {len(day_data)}')
 
         # Filter shielded videos
         day_video_ids = [int(video_id) for video_id, _ in day_data]
         shield_key_name_list = config_.SHIELD_CONFIG.get(region, None)
         if shield_key_name_list is not None:
             day_video_ids = filter_shield_video(video_ids=day_video_ids, shield_key_name_list=shield_key_name_list)
-            log_.info(f"shield filtered_videos count = {len(day_video_ids)}")
+            # log_.info(f"shield filtered_videos count = {len(day_video_ids)}")
 
         day_dup = {}
         for video_id, score in day_data:
             if int(video_id) not in h_video_ids and int(video_id) in day_video_ids:
                 day_dup[int(video_id)] = score
                 h_video_ids.append(int(video_id))
-        log_.info(f"24h data dup count = {len(day_dup)}")
+        # log_.info(f"24h data dup count = {len(day_dup)}")
         day_dup_key_name = \
             f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}.{app_type}.{data_key}.{rule_key}." \
             f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
@@ -327,21 +327,21 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, by
     # ##### Dedupe against the mini-program model update results and store them separately in redis
     model_key_name = get_rov_redis_key(now_date=now_date)
     model_data = redis_helper.get_all_data_from_zset(key_name=model_key_name, with_scores=True)
-    log_.info(f'model data count = {len(model_data)}')
+    # log_.info(f'model data count = {len(model_data)}')
 
     # Filter shielded videos
     model_video_ids = [int(video_id) for video_id, _ in model_data]
     shield_key_name_list = config_.SHIELD_CONFIG.get(region, None)
     if shield_key_name_list is not None:
         model_video_ids = filter_shield_video(video_ids=model_video_ids, shield_key_name_list=shield_key_name_list)
-        log_.info(f"shield filtered_videos count = {len(model_video_ids)}")
+        # log_.info(f"shield filtered_videos count = {len(model_video_ids)}")
 
     model_data_dup = {}
     for video_id, score in model_data:
         if int(video_id) not in h_video_ids and int(video_id) in model_video_ids:
             model_data_dup[int(video_id)] = score
             h_video_ids.append(int(video_id))
-    log_.info(f"model data dup count = {len(model_data_dup)}")
+    # log_.info(f"model data dup count = {len(model_data_dup)}")
     model_data_dup_key_name = \
         f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H}{region}.{app_type}.{data_key}.{rule_key}." \
         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
@@ -370,13 +370,15 @@ def merge_df(df_left, df_right):
 
 
 def process_with_region(region, df_merged, app_type, data_key, rule_key, rule_param, now_date, now_h):
-    log_.info(f"region = {region}")
+    log_.info(f"region = {region} start...")
     # Compute score
     region_df = df_merged[df_merged['code'] == region]
-    log_.info(f'region_df count = {len(region_df)}')
+    log_.info(f'region = {region}, region_df count = {len(region_df)}')
     score_df = cal_score(df=region_df, param=rule_param)
     video_rank(df=score_df, now_date=now_date, now_h=now_h, rule_key=rule_key, param=rule_param,
                region=region, app_type=app_type, data_key=data_key)
+    log_.info(f"region = {region} end!")
+
 
 
 def process_with_app_type(app_type, params, region_code_list, feature_df, now_date, now_h):
@@ -428,11 +430,37 @@ def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list):
     # Fetch feature data
     feature_df = get_feature_data(project=project, table=table, now_date=now_date)
     feature_df['apptype'] = feature_df['apptype'].astype(int)
-    t = [
-        gevent.spawn(process_with_app_type, app_type, params, region_code_list, feature_df, now_date, now_h)
-        for app_type, params in rule_params.items()
-    ]
-    gevent.joinall(t)
+    # t = [
+    #     gevent.spawn(process_with_app_type, app_type, params, region_code_list, feature_df, now_date, now_h)
+    #     for app_type, params in rule_params.items()
+    # ]
+    # gevent.joinall(t)
+
+    for app_type, params in rule_params.items():
+        log_.info(f"app_type = {app_type} start...")
+        data_params_item = params.get('data_params')
+        rule_params_item = params.get('rule_params')
+        for param in params.get('params_list'):
+            log_.info(f"param = {param} start...")
+            data_key = param.get('data')
+            data_param = data_params_item.get(data_key)
+            log_.info(f"data_key = {data_key}, data_param = {data_param}")
+            df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
+            df_merged = reduce(merge_df, df_list)
+            rule_key = param.get('rule')
+            rule_param = rule_params_item.get(rule_key)
+            log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
+            task_list = []
+            for region in region_code_list:
+                t = Thread(target=process_with_region,
+                           args=(region, df_merged, app_type, data_key, rule_key, rule_param, now_date, now_h)
+                           )
+                t.start()
+                task_list.append(t)
+            for t in task_list:
+                t.join()
+            log_.info(f"param = {param} end!")
+        log_.info(f"app_type = {app_type} end!")
 
 
     # for app_type, params in rule_params.items():
@@ -549,6 +577,7 @@ def h_rank_bottom(now_date, now_h, rule_params, region_code_list):
 
 
 def h_timer_check():
+    log_.info(f"region_h_data start...")
     rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
     project = config_.PROJECT_REGION_APP_TYPE
     table = config_.TABLE_REGION_APP_TYPE
@@ -573,6 +602,7 @@ def h_timer_check():
     else:
         # Data not ready; re-check in 1 minute
         Timer(60, h_timer_check).start()
+    log_.info(f"region_h_data end!")
 
 
 if __name__ == '__main__':
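
The core change in rank_by_h (mirrored in rank_by_24h below) swaps the gevent.spawn fan-out over app types for plain threading: one Thread per region, started eagerly and joined before the next param iteration. A minimal runnable sketch of that fan-out/join shape follows; process_with_region_stub, fan_out_regions, and the region list are illustrative stand-ins, not helpers from this repo.

from threading import Thread

def process_with_region_stub(region, df_merged):
    # Stand-in for process_with_region: the real worker scores the
    # region's rows and writes the ranked video ids to redis.
    print(f"region = {region} end!")

def fan_out_regions(region_code_list, df_merged):
    # One thread per region, as in the new rank_by_h loop.
    task_list = []
    for region in region_code_list:
        t = Thread(target=process_with_region_stub, args=(region, df_merged))
        t.start()
        task_list.append(t)
    for t in task_list:
        # join() blocks until every region finishes, keeping the
        # enclosing param loop sequential, like gevent.joinall did.
        t.join()

if __name__ == '__main__':
    fan_out_regions(['CN', 'US'], df_merged=None)

Since the per-region work is dominated by redis and ODPS I/O, OS threads can overlap it despite the GIL, which presumably makes Thread an acceptable drop-in for the gevent greenlets here.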

region_rule_rank_h_by24h.py (+40 −10)

@@ -10,7 +10,7 @@ import pandas as pd
 import math
 from functools import reduce
 from odps import ODPS
-from threading import Timer
+from threading import Timer, Thread
 from utils import RedisHelper, get_data_from_odps, filter_video_status, check_table_partition_exits
 from config import set_config
 from log import Log
@@ -147,11 +147,11 @@ def video_rank(df, now_date, now_h, rule_key, param, region, app_type, data_key)
     h_recall_df = h_recall_df.drop_duplicates(subset=['videoid'], keep='first')
     h_recall_df['videoid'] = h_recall_df['videoid'].astype(int)
     h_recall_videos = h_recall_df['videoid'].to_list()
-    log_.info(f'day_recall videos count = {len(h_recall_videos)}')
+    # log_.info(f'day_recall videos count = {len(h_recall_videos)}')
 
     # Filter by video status
     filtered_videos = filter_video_status(h_recall_videos)
-    log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
+    # log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
 
     # Write to the corresponding redis
     h_video_ids = []
@@ -193,14 +193,15 @@ def merge_df(df_left, df_right):
 
 
 def process_with_region(region, df_merged, app_type, data_key, rule_key, rule_param, now_date, now_h):
-    log_.info(f"region = {region}")
+    log_.info(f"region = {region} start...")
     # Compute score
     region_df = df_merged[df_merged['code'] == region]
-    log_.info(f'region_df count = {len(region_df)}')
+    log_.info(f'region = {region}, region_df count = {len(region_df)}')
     score_df = cal_score(df=region_df, param=rule_param)
     video_rank(df=score_df, now_date=now_date, now_h=now_h,
                rule_key=rule_key, param=rule_param, region=region,
                app_type=app_type, data_key=data_key)
+    log_.info(f"region = {region} end!")
 
 
 def process_with_app_type(app_type, params, region_code_list, feature_df, now_date, now_h):
@@ -230,11 +231,38 @@ def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):
     feature_df = get_feature_data(project=project, table=table, now_date=now_date)
     feature_df['apptype'] = feature_df['apptype'].astype(int)
     # rank
-    t = [
-        gevent.spawn(process_with_app_type, app_type, params, region_code_list, feature_df, now_date, now_h)
-        for app_type, params in rule_params.items()
-    ]
-    gevent.joinall(t)
+    # t = [
+    #     gevent.spawn(process_with_app_type, app_type, params, region_code_list, feature_df, now_date, now_h)
+    #     for app_type, params in rule_params.items()
+    # ]
+    # gevent.joinall(t)
+
+    for app_type, params in rule_params.items():
+        log_.info(f"app_type = {app_type} start...")
+        data_params_item = params.get('data_params')
+        rule_params_item = params.get('rule_params')
+        for param in params.get('params_list'):
+            log_.info(f"param = {param} start...")
+            data_key = param.get('data')
+            data_param = data_params_item.get(data_key)
+            log_.info(f"data_key = {data_key}, data_param = {data_param}")
+            rule_key = param.get('rule')
+            rule_param = rule_params_item.get(rule_key)
+            log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
+            df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
+            df_merged = reduce(merge_df, df_list)
+            task_list = []
+            for region in region_code_list:
+                t = Thread(target=process_with_region,
+                           args=(region, df_merged, app_type, data_key, rule_key, rule_param, now_date, now_h)
+                           )
+                t.start()
+                task_list.append(t)
+            for t in task_list:
+                t.join()
+            log_.info(f"param = {param} end!")
+        log_.info(f"app_type = {app_type} end!")
+
 
     # for app_type, params in rule_params.items():
     #     log_.info(f"app_type = {app_type}")
@@ -353,6 +381,7 @@ def h_rank_bottom(now_date, now_h, rule_params, region_code_list):
 
 
 def h_timer_check():
+    log_.info(f"region_24h_data start...")
     rule_params = config_.RULE_PARAMS_REGION_24H_APP_TYPE
     project = config_.PROJECT_REGION_24H_APP_TYPE
     table = config_.TABLE_REGION_24H_APP_TYPE
@@ -375,6 +404,7 @@ def h_timer_check():
     else:
         # Data not ready; re-check in 1 minute
         Timer(60, h_timer_check).start()
+    log_.info(f"region_24h_data end!")
 
 
 if __name__ == '__main__':
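
Both files keep the same scheduling shape in h_timer_check: when the ODPS partition is not ready, a one-shot threading.Timer re-arms the check 60 seconds later. A minimal sketch of that retry loop, assuming stub data_ready and run_rank functions in place of check_table_partition_exits and the ranking calls:

from threading import Timer

def data_ready():
    # Stand-in for check_table_partition_exits; always ready here.
    return True

def run_rank():
    print("partition ready, ranking...")

def h_timer_check_sketch():
    # Mirrors h_timer_check: re-arm a 60s one-shot Timer until the
    # upstream data lands, then run the ranking once.
    if not data_ready():
        Timer(60, h_timer_check_sketch).start()
        return
    run_rank()

if __name__ == '__main__':
    h_timer_check_sketch()

Note that in the diff the new "... end!" log line sits after the if/else, so it fires immediately even on the not-ready branch; Timer only schedules the retry, it does not block.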