Forráskód Böngészése

update with multiprocessing.Pool

liqian 2 éve
szülő
commit
416b667e50
2 módosított fájl, 38 hozzáadás és 7 törlés
  1. 13 5
      region_rule_rank_h.py
  2. 25 2
      region_rule_rank_h_by24h.py

+ 13 - 5
region_rule_rank_h.py

@@ -4,6 +4,7 @@
 # @Time: 2022/5/5 15:54
 # @Software: PyCharm
 
+import multiprocessing
 import os
 import gevent
 import datetime
@@ -17,7 +18,7 @@ from config import set_config
 from log import Log
 from check_video_limit_distribute import update_limit_video_score
 
-os.environ['NUMEXPR_MAX_THREADS'] = '16'
+# os.environ['NUMEXPR_MAX_THREADS'] = '16'
 
 config_, _ = set_config()
 log_ = Log()
@@ -385,7 +386,7 @@ def process_with_region(region, df_merged, app_type, data_key, rule_key, rule_pa
 
 
 def process_with_app_type(app_type, params, region_code_list, feature_df, now_date, now_h):
-    log_.info(f"app_type = {app_type}")
+    log_.info(f"app_type = {app_type} start...")
     data_params_item = params.get('data_params')
     rule_params_item = params.get('rule_params')
     task_list = []
@@ -407,10 +408,9 @@ def process_with_app_type(app_type, params, region_code_list, feature_df, now_da
             ]
         )
     gevent.joinall(task_list)
+    log_.info(f"app_type = {app_type} end!")
 
 
-
-    #
     # log_.info(f"app_type = {app_type}")
     # task_list = []
     # for data_key, data_param in params['data_params'].items():
@@ -439,6 +439,14 @@ def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list):
     # ]
     # gevent.joinall(t)
 
+    pool = multiprocessing.Pool(processes=len(config_.APP_TYPE))
+    for app_type, params in rule_params.items():
+        pool.apply_async(func=process_with_app_type,
+                         args=(app_type, params, region_code_list, feature_df, now_date, now_h))
+    pool.close()
+    pool.join()
+
+    """
     for app_type, params in rule_params.items():
         log_.info(f"app_type = {app_type} start...")
         data_params_item = params.get('data_params')
@@ -465,7 +473,7 @@ def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list):
                 t.join()
             log_.info(f"param = {param} end!")
         log_.info(f"app_type = {app_type} end!")
-
+    """
 
     # for app_type, params in rule_params.items():
     #     log_.info(f"app_type = {app_type}")

+ 25 - 2
region_rule_rank_h_by24h.py

@@ -5,6 +5,7 @@
 # @Software: PyCharm
 
 import time
+import multiprocessing
 import os
 import gevent
 import datetime
@@ -209,7 +210,7 @@ def process_with_region(region, df_merged, app_type, data_key, rule_key, rule_pa
 
 
 def process_with_app_type(app_type, params, region_code_list, feature_df, now_date, now_h):
-    log_.info(f"app_type = {app_type}")
+    log_.info(f"app_type = {app_type} start...")
     data_params_item = params.get('data_params')
     rule_params_item = params.get('rule_params')
     for param in params.get('params_list'):
@@ -228,6 +229,7 @@ def process_with_app_type(app_type, params, region_code_list, feature_df, now_da
             for region in region_code_list
         ]
         gevent.joinall(task_list)
+    log_.info(f"app_type = {app_type} end!")
 
 
 def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):
@@ -241,6 +243,27 @@ def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):
     # ]
     # gevent.joinall(t)
 
+    """
+    ps = []
+    for app_type, params in rule_params.items():
+        p = multiprocessing.Process(target=process_with_app_type,
+                                    args=(app_type, params, region_code_list, feature_df, now_date, now_h))
+        ps.append(p)
+    for p in ps:
+        p.daemon = True
+        p.start()
+    for p in ps:
+        p.join()
+    """
+
+    pool = multiprocessing.Pool(processes=len(config_.APP_TYPE))
+    for app_type, params in rule_params.items():
+        pool.apply_async(func=process_with_app_type,
+                         args=(app_type, params, region_code_list, feature_df, now_date, now_h))
+    pool.close()
+    pool.join()
+
+    """
     for app_type, params in rule_params.items():
         log_.info(f"app_type = {app_type} start...")
         data_params_item = params.get('data_params')
@@ -273,7 +296,7 @@ def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):
                 t.join()
             log_.info(f"param = {param} end!")
         log_.info(f"app_type = {app_type} end!")
-
+    """
 
     # for app_type, params in rule_params.items():
     #     log_.info(f"app_type = {app_type}")