liqian 2 年之前
父节点
当前提交
e8ddb0d92b
共有 1 个文件被更改,包括 47 次插入1 次删除
  1. 47 1
      user_group_update.py

+ 47 - 1
user_group_update.py

@@ -84,6 +84,52 @@ def mapping_process(group, mid_list):
             mids_global.append(mid)
 
 
+def mapping_process2(mid_list, group_list, group_mid_list):
+    global mid_group_mapping_global
+    for mid in mid_list:
+        if mid is None:
+            continue
+        mid_group = [group for group in group_list if mid in group_mid_list.get(group, [])]
+        mid_group_mapping_global[mid] = mid_group
+
+
+async def get_mid_group_mapping2(feature_df, group_list):
+    """获取mid对应的分组列表"""
+    start_time = time.time()
+    group_mid_list = {}
+    mid_list_all = []
+    for group in group_list:
+        start_time = time.time()
+        mid_list = feature_df[group].tolist()
+        mid_list = list(set(mid_list))
+        group_mid_list[group] = mid_list
+        mid_list_all.extend(mid_list)
+        log_.info(f"group = {group}, mid_list_count = {len(mid_list)}")
+
+    mid_list_all = list(set(mid_list_all))
+    global mids_global, mid_group_mapping_global
+    mids_global = mid_list_all
+    step = 10000
+    loop = asyncio.get_running_loop()
+    executor = ThreadPoolExecutor(max_workers=20)
+    tasks = []
+    for i in range(len(mid_list_all) // step + 1):
+        log_.info(f"i = {i}")
+        process_mid_list = mid_list_all[i * step:(i + 1) * step]
+        tasks.append(loop.run_in_executor(executor, mapping_process2, process_mid_list, group_list, group_mid_list))
+    await asyncio.wait(tasks)
+    # mid_group_mapping = {}
+    # for mid in mid_list_all:
+    #     if mid is None:
+    #         continue
+    #     mid_group = [group for group in group_list if mid in group_mid_list.get(group, [])]
+    #     mid_group_mapping[mid] = mid_group
+    # return mid_group_mapping, mid_list_all
+    log_.info(f"group mid mapping finished! "
+              f"mid_count = {len(mids_global)}, mid_group_mapping_count = {len(mid_group_mapping_global)}, "
+              f"execute time = {(time.time() - start_time) / 60}min")
+
+
 async def get_mid_group_mapping(feature_df, group_list):
     """获取mid对应的分组列表"""
     for group in group_list:
@@ -136,7 +182,7 @@ def update_user_group_to_redis(project, table, dt, app_type_list, features, ad_m
 
     group_list = [group for group in ad_mid_group_key_params]
     # mid_group_mapping, mids = get_mid_group_mapping(feature_df=feature_df, group_list=group_list)
-    asyncio.run(get_mid_group_mapping(feature_df=feature_df, group_list=group_list))
+    asyncio.run(get_mid_group_mapping2(feature_df=feature_df, group_list=group_list))
     global mid_group_mapping_global, mids_global
     mid_group_mapping, mids = mid_group_mapping_global, mids_global