Jelajahi Sumber

add rule_rank_day_by_30day data

liqian 2 tahun lalu
induk
melakukan
3d319068be

+ 12 - 0
check_video_limit_distribute.py

@@ -230,6 +230,18 @@ def check_region_videos(rule_params):
         ]
         gevent.joinall(task_list)
 
+    # 将已超分发视频 移除 天级更新30天列表
+    day30_key_prefix = config_.RECALL_KEY_NAME_PREFIX_30DAY
+    for param in rule_params.get('params_list'):
+        data_key = param.get('data')
+        rule_key = param.get('rule')
+        log_.info(f"data_key = {data_key}, rule_key = {rule_key}")
+        key_name = f"{day30_key_prefix}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}"
+        if not redis_helper.key_exists(key_name=key_name):
+            redis_date = now_date - datetime.timedelta(days=1)
+            key_name = f"{day30_key_prefix}:{data_key}:{rule_key}:{datetime.datetime.strftime(redis_date, '%Y%m%d')}"
+        redis_helper.remove_value_from_zset(key_name=key_name, value=stop_distribute_video_id_list)
+
     # 将已超分发视频 移除 原始大列表
     # key_name = f"{config_.RECALL_KEY_NAME_PREFIX}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
     # if not redis_helper.key_exists(key_name=key_name):

+ 1 - 1
config.py

@@ -171,7 +171,7 @@ class BaseConfig(object):
     # 天级更新过去30天数据规则参数
     RULE_PARAMS_30DAY_APP_TYPE = {
         'rule_params': {
-            'rule1': {'return_count': 100, 'platform_return_rate': 0.001, 'view_type': 'preview'},
+            'rule1': {'top': 500, 'view_type': 'preview'},
         },
         'data_params': DATA_PARAMS,
         'params_list': [

+ 3 - 0
region_rule_rank_h.py

@@ -195,8 +195,10 @@ def video_rank(df, now_date, now_h, rule_key, param, region, data_key, rule_rank
     if by_30day_rule_key is not None:
         # 与相对30天列表去重
         h_video_ids = get_day_30day_videos(now_date=now_date, data_key=data_key, rule_key=by_30day_rule_key)
+        log_.info(f"h_video_ids count = {len(h_video_ids)}")
         if h_video_ids is not None:
             filtered_videos = [video_id for video_id in filtered_videos if int(video_id) not in h_video_ids]
+            log_.info(f"filtered_videos count = {len(filtered_videos)}")
 
     h_recall_result = {}
     for video_id in filtered_videos:
@@ -208,6 +210,7 @@ def video_rank(df, now_date, now_h, rule_key, param, region, data_key, rule_rank
         f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{data_key}:{rule_key}:" \
         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
     if len(h_recall_result) > 0:
+        log_.info(f"h_recall_result count = {len(h_recall_result)}")
         redis_helper.add_data_with_zset(key_name=h_recall_key_name, data=h_recall_result, expire_time=23 * 3600)
         # 限流视频score调整
         update_limit_video_score(initial_videos=h_recall_result, key_name=h_recall_key_name)

+ 10 - 12
rule_rank_day_by_30day.py

@@ -110,26 +110,24 @@ def video_rank_h(df, now_date, rule_key, param, data_key):
     df = df.drop_duplicates(subset=['videoid'], keep='first')
     df['videoid'] = df['videoid'].astype(int)
 
-    # 获取符合进入召回源条件的视频
-    return_count = param.get('return_count')
-    if return_count:
-        day_recall_df = df[df['回流人数'] > return_count]
-    else:
-        day_recall_df = df
-    platform_return_rate = param.get('platform_return_rate', 0)
-    day_recall_df = day_recall_df[day_recall_df['platform_return_rate'] > platform_return_rate]
-    day_recall_videos = day_recall_df['videoid'].to_list()
+    day_recall_videos = df['videoid'].to_list()
     log_.info(f'day_by30day_recall videos count = {len(day_recall_videos)}')
 
     # 视频状态过滤
     filtered_videos = filter_video_status(day_recall_videos)
     log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
 
+    # 获取top视频
+    top = param.get('top')
+    day_recall_df = df[df['videoid'].isin(filtered_videos)]
+    day_recall_df = day_recall_df.sort_values(by=['score'], ascending=False)
+    day_recall_df = day_recall_df[:top]
+
     # 写入对应的redis
     now_dt = datetime.strftime(now_date, '%Y%m%d')
     day_video_ids = []
     day_recall_result = {}
-    for video_id in filtered_videos:
+    for video_id in day_recall_df['videoid'].to_list():
         score = day_recall_df[day_recall_df['videoid'] == video_id]['score']
         day_recall_result[int(video_id)] = float(score)
         day_video_ids.append(int(video_id))
@@ -243,7 +241,7 @@ def timer_check():
     table = config_.TABLE_30DAY_APP_TYPE
     rule_params = config_.RULE_PARAMS_30DAY_APP_TYPE
     now_date = datetime.today()
-    log_.info(f"now_date: {datetime.strftime(now_date, '%Y%m%d%H')}")
+    log_.info(f"now_date: {datetime.strftime(now_date, '%Y%m%d')}")
     now_h = datetime.now().hour
     # 查看当前天级更新的数据是否已准备好
     data_count = data_check(project=project, table=table, now_date=now_date)
@@ -251,7 +249,7 @@ def timer_check():
         log_.info(f'day_by30day_data_count = {data_count}')
         # 数据准备好,进行更新
         rank(now_date=now_date, rule_params=rule_params, project=project, table=table)
-    elif now_h > 22:
+    elif now_h > 2:
         log_.info('day_by30day_recall data is None!')
         rank_bottom(now_date=now_date, rule_params=rule_params)
     else:

+ 7 - 0
rule_rank_day_by_30day_task.sh

@@ -0,0 +1,7 @@
+source /etc/profile  # presumably where ROV_OFFLINE_ENV is exported - TODO confirm
+echo $ROV_OFFLINE_ENV  # print the selected environment
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then  # 'test': run the copy under /data2/rov-offline
+    cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/rule_rank_day_by_30day.py
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then  # 'pro': run the copy under /data/rov-offline
+    cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/rule_rank_day_by_30day.py
+fi  # any other value: nothing is run

+ 42 - 1
videos_filter.py

@@ -885,6 +885,45 @@ def filter_whole_movies():
     log_.info("whole movies filter end!")
 
 
+def filter_day_30day():
+    """Remove videos rejected by filter_video_status from each daily-updated 30-day zset (mini-program data)."""
+    log_.info("day_by_30day pool filter start ...")
+    # Today's date formatted as YYYYMMDD; it is the last segment of every key built below
+    now_date = date.today().strftime('%Y%m%d')
+    rule_params = config_.RULE_PARAMS_30DAY_APP_TYPE
+    params_list = rule_params.get('params_list')
+    redis_helper = RedisHelper()
+    log_.info(f'now_date = {now_date}.')
+    for param in params_list:
+        data_key = param.get('data')
+        rule_key = param.get('rule')
+        log_.info(f"param = {param} videos filter start... ")
+        # Key of the list to filter. NOTE(review): no ':' is inserted after the prefix - assumes the config prefix already ends with ':'; confirm
+        key_prefix = config_.RECALL_KEY_NAME_PREFIX_30DAY
+        key_name = f"{key_prefix}{data_key}:{rule_key}:{now_date}"
+        log_.info(f"key_name: {key_name}")
+        # Fetch the zset contents; a None result means there is nothing to filter for this param
+        data = redis_helper.get_all_data_from_zset(key_name=key_name)
+        if data is None:
+            log_.info("data is None")
+            log_.info("filter end!")
+            continue
+        # Status filter: the returned ids are the ones kept (see the set difference below)
+        video_ids = [int(video_id) for video_id in data]
+        filtered_result = filter_video_status(video_ids=video_ids)
+        # Set difference = ids the filter rejected; these are the ones removed from redis
+        filter_videos = set(video_ids) - set(filtered_result)
+        log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
+                                                                                      len(filtered_result),
+                                                                                      len(filter_videos)))
+        if len(filter_videos) == 0:
+            log_.info("filter end!")
+            continue
+        redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
+
+    log_.info("day_by_30day pool filter end!")
+
+
 def main():
     try:
         # ROV召回池视频过滤
@@ -900,7 +939,7 @@ def main():
         # 流量池视频过滤
         filter_flow_pool()
         # 兜底视频过滤
-        filter_bottom()
+        # filter_bottom()
         # 修改过ROV的视频过滤
         # filter_rov_updated()
         # filter_rov_updated_app()
@@ -927,6 +966,8 @@ def main():
         # filter_region_videos_24h()
         # 过滤完整电影数据
         # filter_whole_movies()
+        # 过滤小程序天级更新30天数据
+        filter_day_30day()
     except Exception as e:
         log_.error(traceback.format_exc())
         send_msg_to_feishu(