liqian 2 年之前
父节点
当前提交
b6350c3bfa
共有 3 个文件被更改,包括 86 次插入36 次删除
  1. 40 15
      region_rule_rank_h.py
  2. 38 16
      region_rule_rank_h_by24h.py
  3. 8 5
      videos_filter.py

+ 40 - 15
region_rule_rank_h.py

@@ -314,23 +314,48 @@ def process_with_region(region, df_merged, app_type, data_key, rule_key, rule_pa
                region=region, app_type=app_type, data_key=data_key)
 
 
+def process_with_app_type(app_type, params, region_code_list, feature_df, now_date, now_h):
+    log_.info(f"app_type = {app_type}")
+    task_list = []
+    for data_key, data_param in params['data_params'].items():
+        log_.info(f"data_key = {data_key}, data_param = {data_param}")
+        df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
+        df_merged = reduce(merge_df, df_list)
+        for rule_key, rule_param in params['rule_params'].items():
+            log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
+            task_list.extend(
+                [
+                    gevent.spawn(process_with_region, region, df_merged, app_type, data_key, rule_key, rule_param,
+                                 now_date, now_h)
+                    for region in region_code_list
+                ]
+            )
+    gevent.joinall(task_list)
+
+
 def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list):
     # 获取特征数据
     feature_df = get_feature_data(project=project, table=table, now_date=now_date)
     feature_df['apptype'] = feature_df['apptype'].astype(int)
-    for app_type, params in rule_params.items():
-        log_.info(f"app_type = {app_type}")
-        for data_key, data_param in params['data_params'].items():
-            log_.info(f"data_key = {data_key}, data_param = {data_param}")
-            df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
-            df_merged = reduce(merge_df, df_list)
-            for rule_key, rule_param in params['rule_params'].items():
-                log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
-                task_list = [
-                    gevent.spawn(process_with_region, region, df_merged, app_type, data_key, rule_key, rule_param, now_date, now_h)
-                    for region in region_code_list
-                ]
-                gevent.joinall(task_list)
+    t = [
+        gevent.spawn(process_with_app_type, app_type, params, region_code_list, feature_df, now_date, now_h)
+        for app_type, params in rule_params.items()
+    ]
+    gevent.joinall(t)
+
+    # for app_type, params in rule_params.items():
+    #     log_.info(f"app_type = {app_type}")
+    #     for data_key, data_param in params['data_params'].items():
+    #         log_.info(f"data_key = {data_key}, data_param = {data_param}")
+    #         df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
+    #         df_merged = reduce(merge_df, df_list)
+    #         for rule_key, rule_param in params['rule_params'].items():
+    #             log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
+    #             task_list = [
+    #                 gevent.spawn(process_with_region, region, df_merged, app_type, data_key, rule_key, rule_param, now_date, now_h)
+    #                 for region in region_code_list
+    #             ]
+    #             gevent.joinall(task_list)
 
     # rank
     # for key, value in rule_params.items():
@@ -413,9 +438,9 @@ def h_timer_check():
     project = config_.PROJECT_REGION_APP_TYPE
     table = config_.TABLE_REGION_APP_TYPE
     region_code_list = [code for region, code in region_code.items()]
-    now_date = datetime.datetime.today()
+    now_date = datetime.datetime.today() - datetime.timedelta(hours=1)
     log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
-    now_h = datetime.datetime.now().hour
+    now_h = datetime.datetime.now().hour - 1
     now_min = datetime.datetime.now().minute
     if now_h == 0:
         h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list)

+ 38 - 16
region_rule_rank_h_by24h.py

@@ -199,25 +199,47 @@ def process_with_region(region, df_merged, app_type, data_key, rule_key, rule_pa
                app_type=app_type, data_key=data_key)
 
 
+def process_with_app_type(app_type, params, region_code_list, feature_df, now_date, now_h):
+    log_.info(f"app_type = {app_type}")
+    for data_key, data_param in params['data_params'].items():
+        log_.info(f"data_key = {data_key}, data_param = {data_param}")
+        df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
+        df_merged = reduce(merge_df, df_list)
+        for rule_key, rule_param in params['rule_params'].items():
+            log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
+            task_list = [
+                gevent.spawn(process_with_region, region, df_merged, app_type, data_key, rule_key, rule_param,
+                             now_date, now_h)
+                for region in region_code_list
+            ]
+            gevent.joinall(task_list)
+
+
 def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):
     # 获取特征数据
     feature_df = get_feature_data(project=project, table=table, now_date=now_date)
     feature_df['apptype'] = feature_df['apptype'].astype(int)
     # rank
-    for app_type, params in rule_params.items():
-        log_.info(f"app_type = {app_type}")
-        for data_key, data_param in params['data_params'].items():
-            log_.info(f"data_key = {data_key}, data_param = {data_param}")
-            df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
-            df_merged = reduce(merge_df, df_list)
-            for rule_key, rule_param in params['rule_params'].items():
-                log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
-                task_list = [
-                    gevent.spawn(process_with_region, region, df_merged, app_type, data_key, rule_key, rule_param,
-                                 now_date, now_h)
-                    for region in region_code_list
-                ]
-                gevent.joinall(task_list)
+    t = [
+        gevent.spawn(process_with_app_type, app_type, params, region_code_list, feature_df, now_date, now_h)
+        for app_type, params in rule_params.items()
+    ]
+    gevent.joinall(t)
+
+    # for app_type, params in rule_params.items():
+    #     log_.info(f"app_type = {app_type}")
+    #     for data_key, data_param in params['data_params'].items():
+    #         log_.info(f"data_key = {data_key}, data_param = {data_param}")
+    #         df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
+    #         df_merged = reduce(merge_df, df_list)
+    #         for rule_key, rule_param in params['rule_params'].items():
+    #             log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
+    #             task_list = [
+    #                 gevent.spawn(process_with_region, region, df_merged, app_type, data_key, rule_key, rule_param,
+    #                              now_date, now_h)
+    #                 for region in region_code_list
+    #             ]
+    #             gevent.joinall(task_list)
 
 
     # for key, value in rule_params.items():
@@ -325,8 +347,8 @@ def h_timer_check():
     project = config_.PROJECT_REGION_24H_APP_TYPE
     table = config_.TABLE_REGION_24H_APP_TYPE
     region_code_list = [code for region, code in region_code.items() if code != '-1']
-    now_date = datetime.datetime.today()
-    now_h = datetime.datetime.now().hour
+    now_date = datetime.datetime.today() - datetime.timedelta(hours=1)
+    now_h = datetime.datetime.now().hour - 1
     now_min = datetime.datetime.now().minute
     log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
     # 查看当天更新的数据是否已准备好

+ 8 - 5
videos_filter.py

@@ -633,17 +633,20 @@ def filter_region_videos():
     # 获取当前所在小时
     now_h = datetime.now().hour
     log_.info(f'now_date = {now_date}, now_h = {now_h}.')
+    task_list = []
     for app_type, params in rule_params.items():
         log_.info(f"app_type = {app_type}")
         for data_key, data_param in params['data_params'].items():
             log_.info(f"data_key = {data_key}, data_param = {data_param}")
             for rule_key, rule_param in params['rule_params'].items():
                 log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
-                task_list = [
-                    gevent.spawn(filter_process_with_region, app_type, data_key, rule_key, region, now_date, now_h)
-                    for region in region_code_list
-                ]
-                gevent.joinall(task_list)
+                task_list.extend(
+                    [
+                        gevent.spawn(filter_process_with_region, app_type, data_key, rule_key, region, now_date, now_h)
+                        for region in region_code_list
+                    ]
+                )
+    gevent.joinall(task_list)
     log_.info("region_h videos filter end!")