浏览代码

Merge branch 'user-group-update-opt-20221201' into user-group-update-opt-20221206

liqian 2 年之前
父节点
当前提交
68e1d899b1
共有 2 个文件被更改,包括 57 次插入8 次删除
  1. 1 1
      config.py
  2. 56 7
      user_group_update.py

+ 1 - 1
config.py

@@ -780,7 +780,7 @@ class BaseConfig(object):
     KEY_NAME_PREFIX_AD_GROUP = 'ad:users:group:predict:share:rate:'
     # 视频有广告时的分享率预测结果存放 redis key 前缀,完整格式:ad:video:predict:share:rate:{video_data_key}:{date}
     KEY_NAME_PREFIX_AD_VIDEO = 'ad:video:predict:share:rate:'
-    # 用户分组结果存放 redis key 前缀,完整格式:mid:group:{class_key}:{mid}
+    # 用户分组结果存放 redis key 前缀,完整格式:mid:group:{mid}
     KEY_NAME_PREFIX_MID_GROUP = 'mid:group:'
     # 广告推荐阈值结果存放 redis key 前缀,完整格式:ad:threshold:{abtestId}:{abtestConfigTag}:{group}
     KEY_NAME_PREFIX_AD_THRESHOLD = 'ad:threshold:'

+ 56 - 7
user_group_update.py

@@ -43,6 +43,47 @@ def to_redis(group, mid_list, class_key_list):
               f"execute time = {(time.time() - start_time) / 60}min")
 
 
def to_redis2(process_mid_list, mid_group_mapping, ad_mid_group_key_params):
    """Write the class_key -> group mapping of each mid to redis.

    For every mid, the groups it belongs to are expanded through
    ``ad_mid_group_key_params`` into a ``{class_key: group}`` dict, and that
    dict is stored (as its ``str()`` form) under the key
    ``KEY_NAME_PREFIX_MID_GROUP + mid``. Mids that resolve to no class key
    are not written.

    Args:
        process_mid_list: Sequence of mids handled by this call.
        mid_group_mapping: Dict mapping a mid to the list of groups it is in.
            A mid that is missing from the mapping is skipped.
        ad_mid_group_key_params: Dict mapping a group to its class keys.

    Returns:
        None. The effect is the redis writes and two log lines.
    """
    # Number of redis writes spawned concurrently before joining them.
    batch_size = 100
    # Third argument of set_data_to_redis; presumably the key expiry in
    # seconds (28 hours) -- confirm against redis_helper.
    expire_seconds = 28 * 3600

    log_.info(f"mid count = {len(process_mid_list)}")
    start_time = time.time()
    # Stepping by batch_size avoids the empty trailing batch that
    # `len // 100 + 1` produced when the count was a multiple of 100.
    for offset in range(0, len(process_mid_list), batch_size):
        task_list = []
        for mid in process_mid_list[offset:offset + batch_size]:
            mid_value = {}
            # `or []` guards against a mid absent from the mapping, which
            # would otherwise raise TypeError when iterating None.
            for group in mid_group_mapping.get(mid) or []:
                for class_key in ad_mid_group_key_params.get(group, []):
                    # If several groups share a class key, the last one wins.
                    mid_value[class_key] = group
            if not mid_value:
                continue
            task_list.append(
                gevent.spawn(
                    redis_helper.set_data_to_redis,
                    f"{config_.KEY_NAME_PREFIX_MID_GROUP}{mid}",
                    str(mid_value),
                    expire_seconds,
                )
            )
        # Wait for the whole batch before spawning the next one.
        gevent.joinall(task_list)
    log_.info(f"mid count = {len(process_mid_list)}, update redis finished! "
              f"execute time = {(time.time() - start_time) / 60}min")
+
+
def get_mid_group_mapping(feature_df, group_list):
    """Build the mapping from each mid to the list of groups it appears in.

    Args:
        feature_df: Table-like object where ``feature_df[group]`` returns a
            column exposing ``.tolist()`` (presumably a pandas DataFrame
            whose group columns hold mids -- confirm against the caller).
        group_list: Group (column) names to scan, in order.

    Returns:
        A tuple ``(mid_group_mapping, mids)``. ``mid_group_mapping`` maps
        every non-None mid to the groups whose column contains it, in
        ``group_list`` order. ``mids`` lists the distinct mids in first-seen
        order.
    """
    mid_group_mapping = {}
    for group in group_list:
        # dict.fromkeys de-duplicates the column like set() did, but keeps a
        # deterministic first-seen order.
        for mid in dict.fromkeys(feature_df[group].tolist()):
            if mid is None:
                continue
            # Dict membership is O(1); the previous `mid in mids` list scan
            # made the whole function quadratic in the number of mids.
            mid_group_mapping.setdefault(mid, []).append(group)
    # Dicts preserve insertion order, so the keys are the first-seen mids.
    return mid_group_mapping, list(mid_group_mapping)
+
+
 def update_user_group_to_redis(project, table, dt, app_type_list, features, ad_mid_group_key_params):
     """更新mid对应分组到redis中"""
     # 获取用户分组数据
@@ -50,14 +91,22 @@ def update_user_group_to_redis(project, table, dt, app_type_list, features, ad_m
     feature_df['apptype'] = feature_df['apptype'].astype(int)
     feature_df = feature_df[feature_df['apptype'].isin(app_type_list)]
     # print(len(feature_df))
-    # group_list = features[1:]
+
+    group_list = [group for group in ad_mid_group_key_params]
+    mid_group_mapping, mids = get_mid_group_mapping(feature_df=feature_df, group_list=group_list)
+
     pool = multiprocessing.Pool(processes=len(ad_mid_group_key_params))
-    for group, class_key_list in ad_mid_group_key_params.items():
-        mid_list = feature_df[group].tolist()
-        mid_list = list(set(mid_list))
-        mid_list = [mid for mid in mid_list if mid is not None]
-        # class_key_list = ad_mid_group_key_params.get(group)
-        pool.apply_async(func=to_redis, args=(group, mid_list, class_key_list))
+    step = len(mids) // (len(ad_mid_group_key_params) - 1)
+    for i in range(len(ad_mid_group_key_params) + 1):
+        process_mid_list = mids[i*step:(i+1)*step]
+        pool.apply_async(func=to_redis2, args=(process_mid_list, mid_group_mapping, ad_mid_group_key_params))
+
+    # for group, class_key_list in ad_mid_group_key_params.items():
+    #     mid_list = feature_df[group].tolist()
+    #     mid_list = list(set(mid_list))
+    #     mid_list = [mid for mid in mid_list if mid is not None]
+    #     # class_key_list = ad_mid_group_key_params.get(group)
+    #     pool.apply_async(func=to_redis, args=(group, mid_list, class_key_list))
     pool.close()
     pool.join()