liqian 2 years ago
parent
commit
d05c88b6c5
3 changed files with 59 additions and 15 deletions
  1. 2 2
      config.py
  2. 53 13
      user_group_update.py
  3. 4 0
      utils.py

+ 2 - 2
config.py

@@ -1108,8 +1108,8 @@ class ProductionConfig(BaseConfig):
 
 def set_config():
     # 获取环境变量 ROV_OFFLINE_ENV
-    env = os.environ.get('ROV_OFFLINE_ENV')
-    # env = 'dev'
+    # env = os.environ.get('ROV_OFFLINE_ENV')
+    env = 'dev'
     if env is None:
         # log_.error('ENV ERROR: is None!')
         return

+ 53 - 13
user_group_update.py

@@ -4,7 +4,10 @@ import multiprocessing
 import time
 import traceback
 import gevent
+import asyncio
 from threading import Timer
+from concurrent.futures import ThreadPoolExecutor
+
 from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
 from config import set_config
 from log import Log
@@ -23,6 +26,9 @@ redis_helper = RedisHelper()
 #     'return0share2_nmids'
 # ]
 
# Shared accumulators filled in by mapping_process() workers.
# mid -> list of group names the mid belongs to.
mid_group_mapping_global = {}
# All distinct mids seen so far, in insertion order (later sliced by the caller).
mids_global = []
 
 def to_redis(group, mid_list, class_key_list):
     log_.info(f"group = {group} update redis start ...")
@@ -66,22 +72,53 @@ def to_redis2(process_mid_list, mid_group_mapping, ad_mid_group_key_params):
               f"execute time = {(time.time() - start_time) / 60}min")
 
 
-def get_mid_group_mapping(feature_df, group_list):
def mapping_process(group, mid_list):
    """Record that every mid in *mid_list* belongs to *group*.

    Appends into the module-level accumulators mid_group_mapping_global /
    mids_global.  Intended to run as a thread-pool worker over disjoint
    slices of one group's deduplicated mid list, so the same mid is not
    handled concurrently by two workers.

    :param group: group name (one column of the feature dataframe)
    :param mid_list: slice of deduplicated mids; None entries are skipped
    """
    global mids_global, mid_group_mapping_global
    for mid in mid_list:
        if mid is None:
            continue
        # Dict lookup is O(1); the original `mid in mids_global` list scan
        # made this loop quadratic over millions of mids — that, not the
        # lack of threads, was the real bottleneck.
        groups = mid_group_mapping_global.get(mid)
        if groups is None:
            mid_group_mapping_global[mid] = [group]
            mids_global.append(mid)
        else:
            groups.append(group)
+
+
async def get_mid_group_mapping(feature_df, group_list):
    """Populate the global mid -> group-list mapping for every group.

    For each column named in *group_list*, deduplicates the column's mids
    and fans disjoint slices out to mapping_process() on a thread pool.
    Results accumulate in mid_group_mapping_global / mids_global; nothing
    is returned (the caller reads the globals).

    :param feature_df: pandas DataFrame with one column per group
    :param group_list: column/group names to process, one group at a time
    """
    workers = 10
    loop = asyncio.get_running_loop()
    # Context manager guarantees the pool is shut down; the original
    # created a new executor per group and never released any of them.
    with ThreadPoolExecutor(max_workers=workers) as executor:
        for group in group_list:
            start_time = time.time()
            mid_list = list(set(feature_df[group].tolist()))
            log_.info(f"group = {group}, mid_list_count = {len(mid_list)}")
            # Ceiling division so the chunk size is never 0: the original
            # `len(mid_list) // (10 - 1)` gave step = 0 whenever a group
            # had fewer than 9 distinct mids, silently dropping them all.
            step = max(1, -(-len(mid_list) // workers))
            # NOTE(review): mapping_process is pure-Python CPU work, so
            # threads only overlap under the GIL if pandas/C code releases
            # it — confirm the pool actually helps before keeping it.
            tasks = [
                loop.run_in_executor(executor, mapping_process, group, mid_list[i:i + step])
                for i in range(0, len(mid_list), step)
            ]
            # asyncio.wait() raises ValueError on an empty task set
            # (possible when a group column is entirely empty).
            if tasks:
                await asyncio.wait(tasks)
            log_.info(f"group = {group} mid mapping finished! "
                      f"execute time = {(time.time() - start_time) / 60}min")
 
 
 def update_user_group_to_redis(project, table, dt, app_type_list, features, ad_mid_group_key_params):
@@ -93,7 +130,10 @@ def update_user_group_to_redis(project, table, dt, app_type_list, features, ad_m
     # print(len(feature_df))
 
     group_list = [group for group in ad_mid_group_key_params]
-    mid_group_mapping, mids = get_mid_group_mapping(feature_df=feature_df, group_list=group_list)
+    # mid_group_mapping, mids = get_mid_group_mapping(feature_df=feature_df, group_list=group_list)
+    asyncio.run(get_mid_group_mapping(feature_df=feature_df, group_list=group_list))
+    global mid_group_mapping_global, mids_global
+    mid_group_mapping, mids = mid_group_mapping_global, mids_global
 
     pool = multiprocessing.Pool(processes=len(ad_mid_group_key_params))
     step = len(mids) // (len(ad_mid_group_key_params) - 1)

+ 4 - 0
utils.py

def get_feature_data(project, table, features, dt, limit=1000):
    """Fetch feature records from ODPS and return them as a DataFrame.

    :param project: ODPS project name
    :param table: ODPS table name
    :param features: names of the fields to copy out of each record
    :param dt: partition date, passed through to get_data_from_odps
    :param limit: stop once this many records have been collected
        (None = no cap).  The default of 1000 preserves the current
        behaviour, but NOTE(review): this cap was added by the same
        commit that hardcoded env='dev' and looks like a debug leftover
        — confirm it is intentional before relying on it in production.
    :return: pandas DataFrame with one column per requested feature
    """
    records = get_data_from_odps(date=dt, project=project, table=table)
    feature_data = []
    for i, record in enumerate(records):
        # `> limit` (not >=) keeps parity with the original hand-rolled
        # counter: up to limit + 1 records are collected.
        if limit is not None and i > limit:
            break
        feature_data.append({name: record[name] for name in features})
    return pd.DataFrame(feature_data)