Browse Source

Merge branch 'ad-recommend-20221018' into test

liqian 2 years ago
parent
commit
468ed6df88
2 changed files with 27 additions and 18 deletions
  1. 3 3
      db_helper.py
  2. 24 15
      user_group_update.py

+ 3 - 3
db_helper.py

@@ -346,6 +346,6 @@ if __name__ == '__main__':
     # res = redis_helper.get_score_with_value(key, 90797)
     # print(res)
     # redis_helper.remove_value_from_set(key_name=config_.RELEVANT_TOP_VIDEOS_KEY_NAME, values=(8633849,))
-    data = redis_helper.get_all_data_from_zset(key_name="com.weiqu.video.recall.hot.item.score.20220622", with_scores=True)
-    print(data[:10])
-    print(len(data))
+    con = redis_helper.connect()
+    res = redis_helper.key_exists(key_name='eeew')
+    print(res)

+ 24 - 15
user_group_update.py

@@ -1,6 +1,8 @@
 import datetime
 import multiprocessing
+import time
 import traceback
+import gevent
 from threading import Timer
 from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
 from config import set_config
@@ -21,6 +23,23 @@ features = [
 ]
 
 
+def to_redis(group, mid_list):
+    log_.info(f"group = {group} update redis start ...")
+    start_time = time.time()
+    log_.info(f"mid count = {len(mid_list)}")
+    for i in range(len(mid_list) // 100 + 1):
+        # log_.info(f"i = {i}")
+        mid_temp_list = mid_list[i * 100:(i + 1) * 100]
+        task_list = [
+            gevent.spawn(redis_helper.set_data_to_redis,
+                         f"{config_.KEY_NAME_PREFIX_MID_GROUP}{mid}", group, 26 * 3600)
+            for mid in mid_temp_list
+        ]
+        gevent.joinall(task_list)
+    log_.info(f"group = {group}, mid count = {len(mid_list)}, update redis finished! "
+              f"execute time = {(time.time() - start_time) / 60}min")
+
+
 def update_user_group_to_redis(project, table, dt, app_type):
     """更新mid对应分组到redis中"""
     # 获取用户分组数据
@@ -28,24 +47,14 @@ def update_user_group_to_redis(project, table, dt, app_type):
     feature_df['apptype'] = feature_df['apptype'].astype(int)
     feature_df = feature_df[feature_df['apptype'] == app_type]
     group_list = features[1:]
+    pool = multiprocessing.Pool(processes=len(group_list))
     for group in group_list:
-        log_.info(f"group = {group} update redis start ...")
         mid_list = feature_df[group].tolist()
         mid_list = list(set(mid_list))
         mid_list = [mid for mid in mid_list if mid is not None]
-        log_.info(f"mid count = {len(mid_list)}")
-        # pool = multiprocessing.Pool(processes=2)
-        for mid in mid_list:
-            # print(mid)
-            key_name = f"{config_.KEY_NAME_PREFIX_MID_GROUP}{mid}"
-            redis_helper.set_data_to_redis(key_name=key_name, value=group, expire_time=25 * 3600)
-        #     pool.apply_async(
-        #         func=redis_helper.set_data_to_redis,
-        #         args=(key_name, group, 25 * 3600)
-        #     )
-        # pool.close()
-        # pool.join()
-        log_.info(f"group = {group}, mid count = {len(mid_list)}, update redis finished!")
+        pool.apply_async(func=to_redis, args=(group, mid_list))
+    pool.close()
+    pool.join()
 
 
 def timer_check():
@@ -53,7 +62,7 @@ def timer_check():
         app_type = config_.APP_TYPE['VLOG']
         project = config_.ad_model_data['user_group'].get('project')
         table = config_.ad_model_data['user_group'].get('table')
-        now_date = datetime.datetime.today() - datetime.timedelta(days=1)
+        now_date = datetime.datetime.today()
         dt = datetime.datetime.strftime(now_date, '%Y%m%d')
         log_.info(f"now_date: {dt}")
         now_min = datetime.datetime.now().minute