浏览代码

user group update opt

liqian 2 年之前
父节点
当前提交
33899e15bc
共有 1 个文件被更改,包括 51 次插入1 次删除
  1. 51 1
      user_group_update.py

+ 51 - 1
user_group_update.py

@@ -43,6 +43,48 @@ def to_redis(group, mid_list, class_key_list):
               f"execute time = {(time.time() - start_time) / 60}min")
 
 
+def to_redis2(process_mid_list, mid_group_mapping, ad_mid_group_key_params):
+    log_.info(f"mid count = {len(process_mid_list)}")
+    start_time = time.time()
+    for i in range(len(process_mid_list) // 100 + 1):
+        mid_temp_list = process_mid_list[i * 100:(i + 1) * 100]
+        task_list = []
+        for mid in mid_temp_list:
+            group_list = mid_group_mapping.get(mid)
+            mid_value = {}
+            for group in group_list:
+                for class_key in ad_mid_group_key_params.get(group, []):
+                    mid_value[class_key] = group
+            if len(mid_value) > 0:
+                task_list.append(
+                    gevent.spawn(redis_helper.set_data_to_redis,
+                                 f"{config_.KEY_NAME_PREFIX_MID_GROUP}:{mid}",
+                                 str(mid_value),
+                                 26 * 3600)
+                )
+        gevent.joinall(task_list)
+    log_.info(f"mid count = {len(process_mid_list)}, update redis finished! "
+              f"execute time = {(time.time() - start_time) / 60}min")
+
+
+def get_mid_group_mapping(feature_df, group_list):
+    """获取mid对应的分组列表"""
+    mid_group_mapping = {}
+    mids = []
+    for group in group_list:
+        mid_list = feature_df[group].tolist()
+        mid_list = list(set(mid_list))
+        for mid in mid_list:
+            if mid is None:
+                continue
+            if mid in mids:
+                mid_group_mapping[mid].append(group)
+            else:
+                mid_group_mapping[mid] = [group]
+                mids.append(mid)
+    return mid_group_mapping, mids
+
+
 def update_user_group_to_redis(project, table, dt, app_type_list, features, ad_mid_group_key_params):
     """更新mid对应分组到redis中"""
     # 获取用户分组数据
@@ -50,8 +92,16 @@ def update_user_group_to_redis(project, table, dt, app_type_list, features, ad_m
     feature_df['apptype'] = feature_df['apptype'].astype(int)
     feature_df = feature_df[feature_df['apptype'].isin(app_type_list)]
     # print(len(feature_df))
-    # group_list = features[1:]
+
+    group_list = [group for group in ad_mid_group_key_params]
+    mid_group_mapping, mids = get_mid_group_mapping(feature_df=feature_df, group_list=group_list)
+
     pool = multiprocessing.Pool(processes=len(ad_mid_group_key_params))
+    step = len(mids) / (len(ad_mid_group_key_params) - 1)
+    for i in range(len(ad_mid_group_key_params) + 1):
+        process_mid_list = mids[i*step:(i+1)*step]
+        pool.apply_async(func=to_redis2, args=(process_mid_list, mid_group_mapping, ad_mid_group_key_params))
+
     for group, class_key_list in ad_mid_group_key_params.items():
         mid_list = feature_df[group].tolist()
         mid_list = list(set(mid_list))