liqian 1 năm trước cách đây
mục cha
commit
664b74f3b4

+ 6 - 0
config.py

@@ -861,6 +861,12 @@ class BaseConfig(object):
         'value': 1
     }
 
+    # 各召回池数据更新状态
+    RULE_24H_DATA_STATUS = 'rule:24h:data:status'
+    REGION_24H_DATA_STATUS = 'region:24h:data:status'
+    RULE_H_DATA_STATUS = 'rule:h:data:status'
+
+
     # ##################################### 广告模型配置 #####################################
 
     # 广告模型数据

+ 18 - 0
recommend_region_data_status_update.py

@@ -0,0 +1,18 @@
+from config import set_config
+from log import Log
+from db_helper import RedisHelper
+
+config_, _ = set_config()
+log_ = Log()
+
+redis_helper = RedisHelper()
+redis_helper.set_data_to_redis(
+    key_name=config_.RULE_24H_DATA_STATUS, value='0', expire_time=2 * 3600
+)
+redis_helper.set_data_to_redis(
+    key_name=config_.REGION_24H_DATA_STATUS, value='0', expire_time=2 * 3600
+)
+redis_helper.set_data_to_redis(
+    key_name=config_.RULE_H_DATA_STATUS, value='0', expire_time=2 * 3600
+)
+log_.info(f"recommend data status update to initial '0' finished!")

+ 106 - 10
region_rule_rank_h.py

@@ -672,11 +672,13 @@ def video_rank(df, now_date, now_h, rule_key, param, region, data_key, rule_rank
     by_48h_rule_key = param.get('48h_rule_key', None)
     dup_remove = param.get('dup_remove', True)
     # 与其他召回视频池去重,存入对应的redis
-    dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, h_rule_key=h_rule_key,
-                 region_24h_rule_key=region_24h_rule_key, by_24h_rule_key=by_24h_rule_key,
-                 by_48h_rule_key=by_48h_rule_key, region=region, data_key=data_key,
-                 rule_rank_h_flag=rule_rank_h_flag, political_filter=political_filter,
-                 shield_config=shield_config, dup_remove=dup_remove)
+    dup_to_redis_with_timecheck(
+        h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, h_rule_key=h_rule_key,
+        region_24h_rule_key=region_24h_rule_key, by_24h_rule_key=by_24h_rule_key,
+        by_48h_rule_key=by_48h_rule_key, region=region, data_key=data_key,
+        rule_rank_h_flag=rule_rank_h_flag, political_filter=political_filter,
+        shield_config=shield_config, dup_remove=dup_remove
+    )
 
 
 def dup_data(h_video_ids, initial_key_name, dup_key_name, region, political_filter, shield_config, dup_remove):
@@ -793,6 +795,98 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, h_rule_key, region_24h_
     #                        dup_key_name=model_data_dup_key_name, region=region)
 
 
+def dup_to_redis_with_timecheck(h_video_ids, now_date, now_h, rule_key, h_rule_key, region_24h_rule_key,
+                                by_24h_rule_key, by_48h_rule_key, region, data_key, rule_rank_h_flag,
+                                political_filter, shield_config, dup_remove):
+    """将地域分组小时级数据与其他召回视频池去重,存入对应的redis"""
+    # 获取并判断其他数据表更新状态
+    redis_helper = RedisHelper()
+    rule_24h_status = redis_helper.get_data_from_redis(key_name=config_.RULE_24H_DATA_STATUS)
+    region_24h_status = redis_helper.get_data_from_redis(key_name=config_.REGION_24H_DATA_STATUS)
+    rule_h_status = redis_helper.get_data_from_redis(key_name=config_.RULE_H_DATA_STATUS)
+    if rule_24h_status == '1' and region_24h_status == '1' and rule_h_status == '1':
+        # ##### 去重更新不区分地域小时级列表,并另存为redis中
+        if h_rule_key is not None:
+            h_key_name = \
+                f"{config_.RECALL_KEY_NAME_PREFIX_BY_H_H}{data_key}:{h_rule_key}:" \
+                f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+            h_dup_key_name = \
+                f"{config_.RECALL_KEY_NAME_PREFIX_DUP_H_H}{region}:{data_key}:{rule_key}:" \
+                f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+            h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_key_name,
+                                   dup_key_name=h_dup_key_name, region=region, political_filter=political_filter,
+                                   shield_config=shield_config, dup_remove=dup_remove)
+
+        # ##### 去重更新地域分组小时级24h列表,并另存为redis中
+        region_24h_key_name = \
+            f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}:{data_key}:{region_24h_rule_key}:" \
+            f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+        region_24h_dup_key_name = \
+            f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}:{data_key}:{rule_key}:" \
+            f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+        h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=region_24h_key_name,
+                               dup_key_name=region_24h_dup_key_name, region=region, political_filter=political_filter,
+                               shield_config=shield_config, dup_remove=dup_remove)
+
+        if rule_rank_h_flag == '48h':
+
+            # ##### 去重小程序相对48h更新结果,并另存为redis中
+            h_48h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H}{data_key}:{by_48h_rule_key}:" \
+                             f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+            h_48h_dup_key_name = \
+                f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H}{region}:{data_key}:{rule_key}:" \
+                f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+            h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_48h_key_name,
+                                   dup_key_name=h_48h_dup_key_name, region=region, political_filter=political_filter,
+                                   shield_config=shield_config, dup_remove=dup_remove)
+
+            # ##### 去重小程序相对48h 筛选后剩余数据 更新结果,并另存为redis中
+            if by_48h_rule_key == 'rule1':
+                other_h_48h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H_OTHER}{data_key}:" \
+                                       f"{by_48h_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+                other_h_48h_dup_key_name = \
+                    f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H}{region}:{data_key}:{rule_key}:" \
+                    f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+                h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=other_h_48h_key_name,
+                                       dup_key_name=other_h_48h_dup_key_name, region=region,
+                                       political_filter=political_filter, shield_config=shield_config,
+                                       dup_remove=dup_remove)
+
+        else:
+            # ##### 去重小程序相对24h更新结果,并另存为redis中
+            h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H}{data_key}:{by_24h_rule_key}:" \
+                             f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+            h_24h_dup_key_name = \
+                f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}:{data_key}:{rule_key}:" \
+                f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+            h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_24h_key_name,
+                                   dup_key_name=h_24h_dup_key_name, region=region, political_filter=political_filter,
+                                   shield_config=shield_config, dup_remove=dup_remove)
+
+            # ##### 去重小程序相对24h 筛选后剩余数据 更新结果,并另存为redis中
+            # if by_24h_rule_key in ['rule3', 'rule4']:
+            other_h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H_OTHER}{data_key}:" \
+                                   f"{by_24h_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+            other_h_24h_dup_key_name = \
+                f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}:{data_key}:{rule_key}:" \
+                f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+            h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=other_h_24h_key_name,
+                                   dup_key_name=other_h_24h_dup_key_name, region=region,
+                                   political_filter=political_filter,
+                                   shield_config=shield_config, dup_remove=dup_remove)
+    else:
+        # 数据没准备好,1分钟后重新检查
+        Timer(
+            60,
+            dup_to_redis_with_timecheck,
+            args=[h_video_ids, now_date, now_h, rule_key, h_rule_key, region_24h_rule_key,
+                  by_24h_rule_key, by_48h_rule_key, region, data_key, rule_rank_h_flag,
+                  political_filter, shield_config, dup_remove]
+        ).start()
+
+
+
+
 def merge_df(df_left, df_right):
     """
     df按照videoid, code 合并,对应特征求和
@@ -1118,11 +1212,13 @@ def h_bottom_process(param, rule_params_item, region_code_list, key_prefix, redi
         if len(final_data) > 0:
             redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 24 * 3600)
         # 与其他召回视频池去重,存入对应的redis
-        dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, h_rule_key=h_rule_key,
-                     region_24h_rule_key=region_24h_rule_key, region=region,
-                     data_key=data_key, by_24h_rule_key=by_24h_rule_key,
-                     by_48h_rule_key=by_48h_rule_key, rule_rank_h_flag=rule_rank_h_flag,
-                     political_filter=political_filter, shield_config=shield_config, dup_remove=dup_remove)
+        dup_to_redis_with_timecheck(
+            h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key, h_rule_key=h_rule_key,
+            region_24h_rule_key=region_24h_rule_key, region=region,
+            data_key=data_key, by_24h_rule_key=by_24h_rule_key,
+            by_48h_rule_key=by_48h_rule_key, rule_rank_h_flag=rule_rank_h_flag,
+            political_filter=political_filter, shield_config=shield_config, dup_remove=dup_remove
+        )
     # 特殊城市视频数据准备
     for region, city_list in config_.REGION_CITY_MAPPING.items():
         t = [

+ 5 - 0
region_rule_rank_h_by24h.py

@@ -502,6 +502,11 @@ def h_timer_check():
         else:
             # 数据没准备好,1分钟后重新检查
             Timer(60, h_timer_check).start()
+        redis_helper = RedisHelper()
+        redis_helper.set_data_to_redis(
+            key_name=config_.REGION_24H_DATA_STATUS, value='1', expire_time=2 * 3600
+        )
+        log_.info(f"region_24h_data status update to '1' finished!")
 
     except Exception as e:
         log_.error(f"地域分组24h数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")

+ 16 - 12
region_rule_rank_h_task.sh

@@ -2,28 +2,32 @@ source /etc/profile
 echo $ROV_OFFLINE_ENV
 if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
     cd /data2/rov-offline
+    /root/anaconda3/bin/python /data2/rov-offline/recommend_region_data_status_update.py
+    echo "recommend data status update to initial '0' finished!"
     nohup /root/anaconda3/bin/python /data2/rov-offline/rule_rank_h_by_24h.py &
     nohup /root/anaconda3/bin/python /data2/rov-offline/region_rule_rank_h_by24h.py &
     nohup /root/anaconda3/bin/python /data2/rov-offline/rule_rank_h_new.py &
-    while ps aux | grep "rule_rank_h_by_24h.py" | grep -v grep > /dev/null || ps aux | grep "region_rule_rank_h_by24h.py" | grep -v grep > /dev/null || ps aux | grep "rule_rank_h_new.py" | grep -v grep > /dev/null;
-    do
-      sleep 30
-    done
-    echo "24h, region_24h, h data update task finished!"
-    /root/anaconda3/bin/python /data2/rov-offline/region_rule_rank_h.py '24h'
+#    while ps aux | grep "rule_rank_h_by_24h.py" | grep -v grep > /dev/null || ps aux | grep "region_rule_rank_h_by24h.py" | grep -v grep > /dev/null || ps aux | grep "rule_rank_h_new.py" | grep -v grep > /dev/null;
+#    do
+#      sleep 30
+#    done
+#    echo "24h, region_24h, h data update task finished!"
+    nohup /root/anaconda3/bin/python /data2/rov-offline/region_rule_rank_h.py '24h' &
 #      /root/anaconda3/bin/python /data2/rov-offline/region_rule_rank_h_new.py
 #      /root/anaconda3/bin/python /data2/rov-offline/laohaokan_recommend_update.py
 elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
     cd /data/rov-offline
+    /root/anaconda3/bin/python /data/rov-offline/recommend_region_data_status_update.py
+    echo "recommend data status update to initial '0' finished!"
     nohup /root/anaconda3/bin/python /data/rov-offline/rule_rank_h_by_24h.py &
     nohup /root/anaconda3/bin/python /data/rov-offline/region_rule_rank_h_by24h.py &
     nohup /root/anaconda3/bin/python /data/rov-offline/rule_rank_h_new.py &
-    while ps aux | grep "rule_rank_h_by_24h.py" | grep -v grep > /dev/null || ps aux | grep "region_rule_rank_h_by24h.py" | grep -v grep > /dev/null || ps aux | grep "rule_rank_h_new.py" | grep -v grep > /dev/null;
-    do
-      sleep 30
-    done
-    echo "24h, region_24h, h data update task finished!"
-    /root/anaconda3/bin/python /data/rov-offline/region_rule_rank_h.py '24h'
+#    while ps aux | grep "rule_rank_h_by_24h.py" | grep -v grep > /dev/null || ps aux | grep "region_rule_rank_h_by24h.py" | grep -v grep > /dev/null || ps aux | grep "rule_rank_h_new.py" | grep -v grep > /dev/null;
+#    do
+#      sleep 30
+#    done
+#    echo "24h, region_24h, h data update task finished!"
+    nohup /root/anaconda3/bin/python /data/rov-offline/region_rule_rank_h.py '24h' &
 #      /root/anaconda3/bin/python /data/rov-offline/region_rule_rank_h_new.py
 #      /root/anaconda3/bin/python /data/rov-offline/laohaokan_recommend_update.py
 fi

+ 5 - 0
rule_rank_h_by_24h.py

@@ -504,6 +504,11 @@ def h_timer_check():
         else:
             # 数据没准备好,1分钟后重新检查
             Timer(60, h_timer_check).start()
+        redis_helper = RedisHelper()
+        redis_helper.set_data_to_redis(
+            key_name=config_.RULE_24H_DATA_STATUS, value='1', expire_time=2 * 3600
+        )
+        log_.info(f"rule_24h_data status update to '1' finished!")
 
     except Exception as e:
         log_.error(f"不区分地域24h数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")

+ 5 - 0
rule_rank_h_new.py

@@ -283,6 +283,11 @@ def h_timer_check():
         else:
             # 数据没准备好,1分钟后重新检查
             Timer(60, h_timer_check).start()
+        redis_helper = RedisHelper()
+        redis_helper.set_data_to_redis(
+            key_name=config_.RULE_H_DATA_STATUS, value='1', expire_time=2 * 3600
+        )
+        log_.info(f"rule_h_data status update to '1' finished!")
 
     except Exception as e:
         log_.error(f"不区分地域小时级数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")