liqian committed 2 years ago
commit 526da19af3
1 changed file with 30 additions and 17 deletions

user_group_update.py (+30 −17)

@@ -1,5 +1,6 @@
 import datetime
 import multiprocessing
+import time
 import traceback
 import gevent
 from threading import Timer
@@ -22,13 +23,21 @@ features = [
 ]
 
 
-def to_redis(group, mid_temp_list):
-    task_list = [
-        gevent.spawn(redis_helper.set_data_to_redis,
-                     f"{config_.KEY_NAME_PREFIX_MID_GROUP}{mid}", group, 25 * 3600)
-        for mid in mid_temp_list
-    ]
-    gevent.joinall(task_list)
+def to_redis(group, mid_list):
+    log_.info(f"group = {group} update redis start ...")
+    start_time = time.time()
+    log_.info(f"mid count = {len(mid_list)}")
+    for i in range(len(mid_list) // 100 + 1):
+        # log_.info(f"i = {i}")
+        mid_temp_list = mid_list[i * 100:(i + 1) * 100]
+        task_list = [
+            gevent.spawn(redis_helper.set_data_to_redis,
+                         f"{config_.KEY_NAME_PREFIX_MID_GROUP}{mid}", group, 25 * 3600)
+            for mid in mid_temp_list
+        ]
+        gevent.joinall(task_list)
+    log_.info(f"group = {group}, mid count = {len(mid_list)}, update redis finished! "
+              f"execute time = {(time.time() - start_time) / 60}min")
 
 
 def update_user_group_to_redis(project, table, dt, app_type):
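The rewritten to_redis now owns the batching: it walks the group's mid list in slices of 100 and writes each slice concurrently with gevent.spawn / gevent.joinall, timing the whole group. Below is a minimal, self-contained sketch of that pattern; the project's redis_helper.set_data_to_redis, config_, and log_ helpers are replaced by a hypothetical write_key stand-in, and gevent monkey-patching is assumed so the blocking calls actually yield.

# Minimal sketch of the batched gevent write pattern in the new to_redis
# (write_key is a hypothetical stand-in for redis_helper.set_data_to_redis).
from gevent import monkey
monkey.patch_all()  # assumed: lets blocking I/O cooperate with gevent

import time
import gevent


def write_key(key, value, expire_seconds):
    time.sleep(0.01)  # stand-in for the real Redis SET with expiry


def to_redis(group, mid_list, batch_size=100):
    start_time = time.time()
    # Write the mids in fixed-size batches; each batch runs concurrently.
    for i in range(len(mid_list) // batch_size + 1):
        batch = mid_list[i * batch_size:(i + 1) * batch_size]
        tasks = [
            gevent.spawn(write_key, f"mid:group:{mid}", group, 25 * 3600)
            for mid in batch
        ]
        gevent.joinall(tasks)
    print(f"group = {group}, mid count = {len(mid_list)}, "
          f"execute time = {(time.time() - start_time) / 60:.2f}min")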
@@ -38,19 +47,22 @@ def update_user_group_to_redis(project, table, dt, app_type):
     feature_df['apptype'] = feature_df['apptype'].astype(int)
     feature_df = feature_df[feature_df['apptype'] == app_type]
     group_list = features[1:]
+    pool = multiprocessing.Pool(processes=len(group_list))
     for group in group_list:
-        log_.info(f"group = {group} update redis start ...")
+        # log_.info(f"group = {group} update redis start ...")
+        # start_time = time.time()
         mid_list = feature_df[group].tolist()
         mid_list = list(set(mid_list))
         mid_list = [mid for mid in mid_list if mid is not None]
-        log_.info(f"mid count = {len(mid_list)}")
-        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
-        for i in range(len(mid_list)//100+1):
-            log_.info(f"i = {i}")
-            mid_temp_list = mid_list[i*100:(i+1)*100]
-            pool.apply_async(func=to_redis, args=(group, mid_temp_list))
-        pool.close()
-        pool.join()
+        # log_.info(f"mid count = {len(mid_list)}")
+        pool.apply_async(func=to_redis, args=(group, mid_list))
+
+        # for i in range(len(mid_list)//100+1):
+        #     log_.info(f"i = {i}")
+        #     mid_temp_list = mid_list[i*100:(i+1)*100]
+        #     pool.apply_async(func=to_redis, args=(group, mid_temp_list))
+    pool.close()
+    pool.join()
 
         # for mid in mid_list:
         #     # print(mid)
@@ -62,7 +74,8 @@ def update_user_group_to_redis(project, table, dt, app_type):
         #     )
         # pool.close()
         # pool.join()
-        log_.info(f"group = {group}, mid count = {len(mid_list)}, update redis finished!")
+        # log_.info(f"group = {group}, mid count = {len(mid_list)}, update redis finished! "
+        #           f"execute time = {(time.time()-start_time)/60}min")
 
 
 def timer_check():
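At the call site, the per-batch multiprocessing pool is replaced by a single pool with one worker per user group: each group's deduplicated mid list is dispatched to to_redis via apply_async, and close()/join() outside the loop wait for all groups to finish. A rough sketch of that dispatch follows; update_groups, the stand-in to_redis, and the toy DataFrame are illustrative names, not from the original file.

import multiprocessing

import pandas as pd  # only for the toy usage example below


def to_redis(group, mid_list):
    # Stand-in for the real worker, which batches gevent writes to Redis.
    print(f"group = {group}, writing {len(mid_list)} mids")


def update_groups(feature_df, group_list):
    # One process per group; each worker handles that group's whole mid list.
    pool = multiprocessing.Pool(processes=len(group_list))
    for group in group_list:
        mid_list = [mid for mid in set(feature_df[group].tolist()) if mid is not None]
        pool.apply_async(func=to_redis, args=(group, mid_list))
    pool.close()
    pool.join()


if __name__ == "__main__":
    df = pd.DataFrame({"group_a": ["m1", "m2", None], "group_b": ["m3", "m3", "m4"]})
    update_groups(df, ["group_a", "group_b"])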