@@ -4,6 +4,9 @@
 # @Time: 2022/5/5 15:54
 # @Software: PyCharm
 
+import time
+import multiprocessing
+import os
 import gevent
 import datetime
 import pandas as pd
@@ -15,6 +18,8 @@ from utils import RedisHelper, get_data_from_odps, filter_video_status, check_ta
 from config import set_config
 from log import Log
 
+# os.environ['NUMEXPR_MAX_THREADS'] = '16'
+
 config_, _ = set_config()
 log_ = Log()
 
@@ -205,7 +210,7 @@ def process_with_region(region, df_merged, app_type, data_key, rule_key, rule_pa
 
 
 def process_with_app_type(app_type, params, region_code_list, feature_df, now_date, now_h):
-    log_.info(f"app_type = {app_type}")
+    log_.info(f"app_type = {app_type} start...")
     data_params_item = params.get('data_params')
     rule_params_item = params.get('rule_params')
     for param in params.get('params_list'):
@@ -224,6 +229,7 @@ def process_with_app_type(app_type, params, region_code_list, feature_df, now_da
             for region in region_code_list
         ]
         gevent.joinall(task_list)
+    log_.info(f"app_type = {app_type} end!")
 
 
 def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):
@@ -237,6 +243,27 @@ def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):
     # ]
     # gevent.joinall(t)
 
+    """
+    ps = []
+    for app_type, params in rule_params.items():
+        p = multiprocessing.Process(target=process_with_app_type,
+                                    args=(app_type, params, region_code_list, feature_df, now_date, now_h))
+        ps.append(p)
+    for p in ps:
+        p.daemon = True
+        p.start()
+    for p in ps:
+        p.join()
+    """
+
+    pool = multiprocessing.Pool(processes=len(config_.APP_TYPE))
+    for app_type, params in rule_params.items():
+        pool.apply_async(func=process_with_app_type,
+                         args=(app_type, params, region_code_list, feature_df, now_date, now_h))
+    pool.close()
+    pool.join()
+
+    """
     for app_type, params in rule_params.items():
         log_.info(f"app_type = {app_type} start...")
         data_params_item = params.get('data_params')
@@ -251,6 +278,13 @@ def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):
             log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
             df_list = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
             df_merged = reduce(merge_df, df_list)
+
+            # thread_pool = ThreadPoolExecutor(max_workers=5)
+            # for region in region_code_list:
+            #     t = thread_pool.submit(process_with_region,
+            #                            (region, df_merged, app_type, data_key, rule_key, rule_param, now_date, now_h))
+            # thread_pool.shutdown(wait=True)
+
             task_list = []
             for region in region_code_list:
                 t = Thread(target=process_with_region,
@@ -262,7 +296,7 @@ def rank_by_24h(project, table, now_date, now_h, rule_params, region_code_list):
                 t.join()
             log_.info(f"param = {param} end!")
         log_.info(f"app_type = {app_type} end!")
-
+    """
 
     # for app_type, params in rule_params.items():
     #     log_.info(f"app_type = {app_type}")
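
The patch above replaces the per-app_type Thread/gevent loop in rank_by_24h with a multiprocessing.Pool. Below is a minimal standalone sketch of that Pool pattern; process_one_app_type and the rule_params literal are illustrative stand-ins for process_with_app_type and the real app_type -> params mapping, not code from the repository.

    import multiprocessing


    def process_one_app_type(app_type, params):
        # stand-in for process_with_app_type(app_type, params, region_code_list, feature_df, now_date, now_h)
        print(f"app_type = {app_type}, params = {params}")


    if __name__ == '__main__':
        # hypothetical app_type -> params mapping
        rule_params = {0: {'params_list': []}, 4: {'params_list': []}}
        # one worker process per app_type, mirroring Pool(processes=len(config_.APP_TYPE)) in the patch
        pool = multiprocessing.Pool(processes=len(rule_params))
        for app_type, params in rule_params.items():
            # schedule each app_type on its own worker process without blocking the parent
            pool.apply_async(func=process_one_app_type, args=(app_type, params))
        pool.close()  # stop accepting new tasks
        pool.join()   # wait for all submitted tasks to finish

One caveat with this pattern: apply_async does not re-raise worker exceptions in the parent unless the returned AsyncResult is kept and .get() is called on it, so a failure inside process_with_app_type would not stop rank_by_24h or surface in its return path.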