liqian 2 년 전
부모
커밋
2b59d835f9
1개의 변경된 파일22개의 추가작업 그리고 5개의 파일을 삭제
  1. 22 5
      user_group_update.py

+ 22 - 5
user_group_update.py

@@ -1,6 +1,7 @@
 import datetime
 import multiprocessing
 import traceback
+import gevent
 from threading import Timer
 from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
 from config import set_config
@@ -21,6 +22,15 @@ features = [
 ]
 
 
+def to_redis(group, mid_temp_list):
+    task_list = [
+        gevent.spawn(redis_helper.set_data_to_redis,
+                     f"{config_.KEY_NAME_PREFIX_MID_GROUP}{mid}", group, 25 * 3600)
+        for mid in mid_temp_list
+    ]
+    gevent.joinall(task_list)
+
+
 def update_user_group_to_redis(project, table, dt, app_type):
     """更新mid对应分组到redis中"""
     # 获取用户分组数据
@@ -34,11 +44,18 @@ def update_user_group_to_redis(project, table, dt, app_type):
         mid_list = list(set(mid_list))
         mid_list = [mid for mid in mid_list if mid is not None]
         log_.info(f"mid count = {len(mid_list)}")
-        # pool = multiprocessing.Pool(processes=2)
-        for mid in mid_list:
-            # print(mid)
-            key_name = f"{config_.KEY_NAME_PREFIX_MID_GROUP}{mid}"
-            redis_helper.set_data_to_redis(key_name=key_name, value=group, expire_time=25 * 3600)
+        pool = multiprocessing.Pool(processes=2)
+        for i in range(len(mid_list)//100+1):
+            log_.info(f"i = {i}")
+            mid_temp_list = mid_list[i*100:(i+1)*100]
+            pool.apply_async(func=to_redis, args=(group, mid_temp_list))
+        pool.close()
+        pool.join()
+
+        # for mid in mid_list:
+        #     # print(mid)
+        #     key_name = f"{config_.KEY_NAME_PREFIX_MID_GROUP}{mid}"
+        #     redis_helper.set_data_to_redis(key_name=key_name, value=group, expire_time=25 * 3600)
         #     pool.apply_async(
         #         func=redis_helper.set_data_to_redis,
         #         args=(key_name, group, 25 * 3600)