Bladeren bron

add rule rank by 48h data

liqian 2 jaren geleden
bovenliggende
commit
1fd7f33f35
8 gewijzigde bestanden met toevoegingen van 500 en 35 verwijderingen
  1. 12 3
      check_video_limit_distribute.py
  2. 49 0
      config.py
  3. 4 3
      redis_data_monitor.py
  4. 115 23
      region_rule_rank_h.py
  5. 6 2
      region_rule_rank_h_task.sh
  6. 1 1
      rule_rank_h_by_24h.py
  7. 307 0
      rule_rank_h_by_48h.py
  8. 6 3
      videos_filter.py

+ 12 - 3
check_video_limit_distribute.py

@@ -123,6 +123,14 @@ def process_with_region(app_type, data_key, rule_key, region, stop_distribute_vi
             config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H,  # 不区分地域相对24h列表2
             config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H,  # 大列表
         ]
+    elif rule_key == 'rule5':
+        key_prefix_list = [
+            config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H,  # 地域分组小时级列表
+            config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H,  # 地域分组相对24h列表
+            config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H,  # 不区分地域相对48h列表
+            config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H,  # 不区分地域相对48h列表2
+            config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H,  # 大列表
+        ]
     else:
         key_prefix_list = [
             config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H,  # 地域分组小时级列表
@@ -184,7 +192,7 @@ def process_with_region(app_type, data_key, rule_key, region, stop_distribute_vi
               f"videos check end!")
 
 
-def check_region_videos():
+def check_region_videos(rule_params):
     """检查限流视频分发数"""
     # 获取当前日期
     now_date = datetime.datetime.today()
@@ -201,7 +209,7 @@ def check_region_videos():
 
     # 对已超分发的视频进行移除
     region_code_list = [code for region, code in config_.REGION_CODE.items()]
-    rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
+    # rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
 
     for app_type, params in rule_params.items():
         log_.info(f"app_type = {app_type}")
@@ -236,4 +244,5 @@ def check_region_videos():
 
 
 if __name__ == '__main__':
-    check_region_videos()
+    check_region_videos(rule_params=config_.RULE_PARAMS_REGION_APP_TYPE)
+    check_region_videos(rule_params=config_.RULE_PARAMS_REGION_APP_TYPE_48H)

+ 49 - 0
config.py

@@ -157,6 +157,26 @@ class BaseConfig(object):
     }
 
     # ##### 区分appType数据
+    # 小时级更新过去48h数据 loghubods.video_data_each_hour_dataset_48h_total_apptype
+    PROJECT_48H_APP_TYPE = 'loghubods'
+    TABLE_48H_APP_TYPE = 'video_data_each_hour_dataset_48h_total_apptype'
+
+    # 小时级更新过去48h数据规则参数
+    RULE_PARAMS_48H_APP_TYPE = {
+        APP_TYPE['VLOG']: {
+            'rule_params': {
+                'rule1': {'cal_score_func': 2, 'return_count': 100, 'platform_return_rate': 0.001,
+                          'view_type': 'preview'},
+            },
+            'data_params': {
+                'data1': [APP_TYPE['VLOG'], ],
+            },
+            'params_list': [
+                {'data': 'data1', 'rule': 'rule1'},
+            ],
+        },
+    }
+
     # 小时级更新过去24h数据 loghubods.video_data_each_hour_dataset_24h_total_apptype
     PROJECT_24H_APP_TYPE = 'loghubods'
     TABLE_24H_APP_TYPE = 'video_data_each_hour_dataset_24h_total_apptype'
@@ -521,6 +541,22 @@ class BaseConfig(object):
         },
     }
 
+    # 不区分地域数据使用相对48h数据
+    RULE_PARAMS_REGION_APP_TYPE_48H = {
+        APP_TYPE['VLOG']: {
+            'rule_params': {
+                'rule5': {'view_type': 'video-show-region', 'platform_return_rate': 0.001,
+                          'region_24h_rule_key': 'rule2', '48h_rule_key': 'rule1'},
+            },
+            'data_params': {
+                'data1': [APP_TYPE['VLOG'], ],
+            },
+            'params_list': [
+                {'data': 'data1', 'rule': 'rule5'},
+            ],
+        },
+    }
+
     # 老视频更新使用数据
     OLD_VIDEOS_PROJECT = 'loghubods'
     OLD_VIDEOS_TABLE = 'xcx_test_video'
@@ -555,6 +591,13 @@ class BaseConfig(object):
     # 完整格式:com.weiqu.video.recall.hot.item.score.dup.day.pre.{rule_key}.{date}
     RECALL_KEY_NAME_PREFIX_DUP_DAY_PRE = 'com.weiqu.video.recall.hot.item.score.dup.day.pre.'
 
+    # 小程序小时级48h数据更新结果存放 redis key前缀,
+    # 完整格式:recall:item:score:apptype:48h:{appType}:{data_key}:{rule_key}:{date}:{h}
+    RECALL_KEY_NAME_PREFIX_BY_48H = 'recall:item:score:apptype:48h:'
+    # 小程序小时级48h数据 筛选后的剩余数据 更新结果存放 redis key前缀,
+    # 完整格式:recall:item:score:apptype:48h:other:{appType}:{data_key}:{rule_key}:{date}:{h}
+    RECALL_KEY_NAME_PREFIX_BY_48H_OTHER = 'recall:item:score:apptype:48h:other:'
+
     # 小程序小时级24h数据更新结果存放 redis key前缀,
     # 完整格式:recall:item:score:apptype:24h:{appType}:{data_key}:{rule_key}:{date}:{h}
     RECALL_KEY_NAME_PREFIX_BY_24H = 'recall:item:score:apptype:24h:'
@@ -585,6 +628,12 @@ class BaseConfig(object):
     # 小程序小时级24h数据 筛选后的剩余数据 更新结果 与 小程序24h更新结果/小程序地域分组24h更新结果/小程序地域分组小时级更新结果 去重后 存放 redis key前缀,
     # 完整格式:recall:item:score:apptype:region:dup3:24h:{region}:{appType}:{data_key}:{rule_key}:{date}:{h}
     RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H = 'recall:item:score:apptype:region:dup3:24h:'
+    # 小程序48h更新结果与 小程序地域分组24h更新结果/小程序地域分组小时级更新结果 去重后 存放 redis key前缀,
+    # 完整格式:recall:item:score:apptype:region:dup2:48h:{region}:{appType}:{data_key}:{rule_key}:{date}:{h}
+    RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H = 'recall:item:score:apptype:region:dup2:48h:'
+    # 小程序小时级48h数据 筛选后的剩余数据 更新结果 与 小程序48h更新结果/小程序地域分组24h更新结果/小程序地域分组小时级更新结果 去重后 存放 redis key前缀,
+    # 完整格式:recall:item:score:apptype:region:dup3:48h:{region}:{appType}:{data_key}:{rule_key}:{date}:{h}
+    RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H = 'recall:item:score:apptype:region:dup3:48h:'
     # 小程序离线ROV模型结果与 小程序天级更新结果/小程序地域分组天级更新结果/小程序地域分组小时级更新结果 去重后 存放 redis key前缀,
     # 完整格式:recall:item:score:apptype:region:dup4:rov:{region}:{appType}:{data_key}:{rule_key}:{date}:{h}
     RECALL_KEY_NAME_PREFIX_DUP_REGION_H = 'recall:item:score:apptype:region:dup4:rov:'

+ 4 - 3
redis_data_monitor.py

@@ -29,10 +29,10 @@ def rov_data_monitor(now_date, now_h):
             )
 
 
-def region_data_monitor(now_date, now_h):
+def region_data_monitor(now_date, now_h, rule_params):
     """地域分组数据"""
     # 地域分组小时级列表
-    rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
+    # rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
     key_prefix_dict = {
         '地域分组小时级数据': config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H,
         '地域分组相对24h去重后数据': config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H,
@@ -138,7 +138,8 @@ def get_redis_data_keys(now_date, now_h):
 
 def monitor(now_date, now_h):
     rov_data_monitor(now_date=now_date, now_h=now_h)
-    region_data_monitor(now_date=now_date, now_h=now_h)
+    region_data_monitor(now_date=now_date, now_h=now_h, rule_params=config_.RULE_PARAMS_REGION_APP_TYPE)
+    region_data_monitor(now_date=now_date, now_h=now_h, rule_params=config_.RULE_PARAMS_REGION_APP_TYPE_48H)
     special_videos_monitor(now_date=now_date, now_h=now_h)
     # whole_movies_monitor(now_date=now_date, now_h=now_h)
 

+ 115 - 23
region_rule_rank_h.py

@@ -6,6 +6,8 @@
 
 import multiprocessing
 import os
+import sys
+
 import gevent
 import datetime
 import pandas as pd
@@ -135,7 +137,7 @@ def cal_score(df, param):
     return df
 
 
-def video_rank(df, now_date, now_h, rule_key, param, region, app_type, data_key):
+def video_rank(df, now_date, now_h, rule_key, param, region, app_type, data_key, rule_rank_h_flag):
     """
     获取符合进入召回源条件的视频,与每日更新的rov模型结果视频列表进行合并
     :param df:
@@ -191,14 +193,98 @@ def video_rank(df, now_date, now_h, rule_key, param, region, app_type, data_key)
 
     region_24h_rule_key = param.get('region_24h_rule_key', 'rule1')
     by_24h_rule_key = param.get('24h_rule_key', None)
+    by_48h_rule_key = param.get('48h_rule_key', None)
     # 与其他召回视频池去重,存入对应的redis
     dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key,
-                 region_24h_rule_key=region_24h_rule_key, by_24h_rule_key=by_24h_rule_key,
-                 region=region, app_type=app_type, data_key=data_key)
+                 region_24h_rule_key=region_24h_rule_key, by_24h_rule_key=by_24h_rule_key, by_48h_rule_key=by_48h_rule_key,
+                 region=region, app_type=app_type, data_key=data_key, rule_rank_h_flag=rule_rank_h_flag)
+
+
+def dup_data(h_video_ids, initial_key_name, dup_key_name, region):
+    redis_helper = RedisHelper()
+    if redis_helper.key_exists(key_name=initial_key_name):
+        initial_data = redis_helper.get_all_data_from_zset(key_name=initial_key_name, with_scores=True)
+        # 屏蔽视频过滤
+        initial_video_ids = [int(video_id) for video_id, _ in initial_data]
+        shield_key_name_list = config_.SHIELD_CONFIG.get(region, None)
+        if shield_key_name_list is not None:
+            initial_video_ids = filter_shield_video(video_ids=initial_video_ids, shield_key_name_list=shield_key_name_list)
+
+        dup_data = {}
+        for video_id, score in initial_data:
+            if int(video_id) not in h_video_ids and int(video_id) in initial_video_ids:
+                dup_data[int(video_id)] = score
+                h_video_ids.append(int(video_id))
+
+        if len(dup_data) > 0:
+            redis_helper.add_data_with_zset(key_name=dup_key_name, data=dup_data, expire_time=23 * 3600)
+            # 限流视频score调整
+            update_limit_video_score(initial_videos=dup_data, key_name=dup_key_name)
+    return h_video_ids
 
 
-def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, by_24h_rule_key, region, app_type, data_key):
+def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, by_24h_rule_key, by_48h_rule_key, region, app_type, data_key, rule_rank_h_flag):
     """将地域分组小时级数据与其他召回视频池去重,存入对应的redis"""
+    # ##### 去重更新地域分组小时级24h列表,并另存为redis中
+    region_24h_key_name = \
+        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}:{app_type}:{data_key}:{region_24h_rule_key}:" \
+        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+    region_24h_dup_key_name = \
+        f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}:{app_type}:{data_key}:{rule_key}:" \
+        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+    h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=region_24h_key_name,
+                           dup_key_name=region_24h_dup_key_name, region=region)
+
+    if rule_rank_h_flag == '48h':
+
+        # ##### 去重小程序相对48h更新结果,并另存为redis中
+        h_48h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H}{app_type}:{data_key}:{by_24h_rule_key}:" \
+                         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+        h_48h_dup_key_name = \
+            f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H}{region}:{app_type}:{data_key}:{rule_key}:" \
+            f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+        h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_48h_key_name,
+                               dup_key_name=h_48h_dup_key_name, region=region)
+
+        # ##### 去重小程序相对48h 筛选后剩余数据 更新结果,并另存为redis中
+        if by_48h_rule_key == 'rule1':
+            other_h_48h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H_OTHER}{app_type}:{data_key}:" \
+                                   f"{by_24h_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+            other_h_48h_dup_key_name = \
+                f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H}{region}:{app_type}:{data_key}:{rule_key}:" \
+                f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+            h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=other_h_48h_key_name,
+                                   dup_key_name=other_h_48h_dup_key_name, region=region)
+
+    else:
+        # ##### 去重小程序相对24h更新结果,并另存为redis中
+        h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H}{app_type}:{data_key}:{by_24h_rule_key}:" \
+                         f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+        h_24h_dup_key_name = \
+            f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}:{app_type}:{data_key}:{rule_key}:" \
+            f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+        h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_24h_key_name,
+                               dup_key_name=h_24h_dup_key_name, region=region)
+
+        # ##### 去重小程序相对24h 筛选后剩余数据 更新结果,并另存为redis中
+        if by_24h_rule_key == 'rule3':
+            other_h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H_OTHER}{app_type}:{data_key}:" \
+                                   f"{by_24h_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+            other_h_24h_dup_key_name = \
+                f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}:{app_type}:{data_key}:{rule_key}:" \
+                f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+            h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=other_h_24h_key_name,
+                                   dup_key_name=other_h_24h_dup_key_name, region=region)
+
+    # ##### 去重小程序模型更新结果,并另存为redis中
+    model_key_name = get_rov_redis_key(now_date=now_date)
+    model_data_dup_key_name = \
+        f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H}{region}:{app_type}:{data_key}:{rule_key}:" \
+        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+    h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=model_key_name,
+                           dup_key_name=model_data_dup_key_name, region=region)
+
+    """
     redis_helper = RedisHelper()
     # # ##### 去重更新地域分组天级列表,并另存为redis中
     # region_day_key_name = \
@@ -220,10 +306,7 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, by
     #     if len(region_day_dup) > 0:
     #         redis_helper.add_data_with_zset(key_name=region_day_dup_key_name, data=region_day_dup, expire_time=23 * 3600)
 
-    # ##### 去重更新地域分组小时级24h列表,并另存为redis中
-    region_24h_key_name = \
-        f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}:{app_type}:{data_key}:{region_24h_rule_key}:" \
-        f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
+
     if redis_helper.key_exists(key_name=region_24h_key_name):
         region_24h_data = redis_helper.get_all_data_from_zset(key_name=region_24h_key_name, with_scores=True)
         # log_.info(f'region 24h data count = {len(region_24h_data)}')
@@ -269,6 +352,7 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, by
     #     if len(day_dup) > 0:
     #         redis_helper.add_data_with_zset(key_name=day_dup_key_name, data=day_dup, expire_time=23 * 3600)
 
+
     # ##### 去重小程序相对24h更新结果,并另存为redis中
     day_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H}{app_type}:{data_key}:{by_24h_rule_key}:" \
                    f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
@@ -353,6 +437,7 @@ def dup_to_redis(h_video_ids, now_date, now_h, rule_key, region_24h_rule_key, by
         redis_helper.add_data_with_zset(key_name=model_data_dup_key_name, data=model_data_dup, expire_time=23 * 3600)
         # 限流视频score调整
         update_limit_video_score(initial_videos=model_data_dup, key_name=model_data_dup_key_name)
+    """
 
 
 def merge_df(df_left, df_right):
@@ -373,19 +458,19 @@ def merge_df(df_left, df_right):
     return df_merged[feature_list]
 
 
-def process_with_region(region, df_merged, app_type, data_key, rule_key, rule_param, now_date, now_h):
+def process_with_region(region, df_merged, app_type, data_key, rule_key, rule_param, now_date, now_h, rule_rank_h_flag):
     log_.info(f"region = {region} start...")
     # 计算score
     region_df = df_merged[df_merged['code'] == region]
     log_.info(f'region = {region}, region_df count = {len(region_df)}')
     score_df = cal_score(df=region_df, param=rule_param)
     video_rank(df=score_df, now_date=now_date, now_h=now_h, rule_key=rule_key, param=rule_param,
-               region=region, app_type=app_type, data_key=data_key)
+               region=region, app_type=app_type, data_key=data_key, rule_rank_h_flag=rule_rank_h_flag)
     log_.info(f"region = {region} end!")
 
 
 
-def process_with_app_type(app_type, params, region_code_list, feature_df, now_date, now_h):
+def process_with_app_type(app_type, params, region_code_list, feature_df, now_date, now_h, rule_rank_h_flag):
     log_.info(f"app_type = {app_type} start...")
     data_params_item = params.get('data_params')
     rule_params_item = params.get('rule_params')
@@ -403,7 +488,7 @@ def process_with_app_type(app_type, params, region_code_list, feature_df, now_da
         task_list.extend(
             [
                 gevent.spawn(process_with_region, region, df_merged, app_type, data_key, rule_key, rule_param,
-                             now_date, now_h)
+                             now_date, now_h, rule_rank_h_flag)
                 for region in region_code_list
             ]
         )
@@ -429,7 +514,7 @@ def process_with_app_type(app_type, params, region_code_list, feature_df, now_da
     # gevent.joinall(task_list)
 
 
-def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list):
+def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list, rule_rank_h_flag):
     # 获取特征数据
     feature_df = get_feature_data(project=project, table=table, now_date=now_date)
     feature_df['apptype'] = feature_df['apptype'].astype(int)
@@ -442,7 +527,7 @@ def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list):
     pool = multiprocessing.Pool(processes=len(config_.APP_TYPE))
     for app_type, params in rule_params.items():
         pool.apply_async(func=process_with_app_type,
-                         args=(app_type, params, region_code_list, feature_df, now_date, now_h))
+                         args=(app_type, params, region_code_list, feature_df, now_date, now_h, rule_rank_h_flag))
     pool.close()
     pool.join()
 
@@ -512,7 +597,7 @@ def rank_by_h(project, table, now_date, now_h, rule_params, region_code_list):
     #                   )
 
 
-def h_rank_bottom(now_date, now_h, rule_params, region_code_list):
+def h_rank_bottom(now_date, now_h, rule_params, region_code_list, rule_rank_h_flag):
     """未按时更新数据,用上一小时结果作为当前小时的数据"""
     # 获取rov模型结果
     redis_helper = RedisHelper()
@@ -534,6 +619,8 @@ def h_rank_bottom(now_date, now_h, rule_params, region_code_list):
             rule_param = rule_params_item.get(rule_key)
             log_.info(f"data_key = {data_key}, rule_key = {rule_key}, rule_param = {rule_param}")
             region_24h_rule_key = rule_param.get('region_24h_rule_key', 'rule1')
+            by_24h_rule_key = rule_param.get('24h_rule_key', None)
+            by_48h_rule_key = rule_param.get('48h_rule_key', None)
             for region in region_code_list:
                 log_.info(f"region = {region}")
                 key_name = f"{key_prefix}{region}:{app_type}:{data_key}:{rule_key}:{redis_dt}:{redis_h}"
@@ -554,12 +641,11 @@ def h_rank_bottom(now_date, now_h, rule_params, region_code_list):
                 # redis_helper.del_keys(
                 #     key_name=f"{config_.REGION_H_VIDEO_FILER}{region}.{app_type}.{data_key}.{rule_key}")
 
-                region_24h_rule_key = param.get('region_24h_rule_key', 'rule1')
-                by_24h_rule_key = param.get('24h_rule_key', None)
                 # 与其他召回视频池去重,存入对应的redis
                 dup_to_redis(h_video_ids=h_video_ids, now_date=now_date, now_h=now_h, rule_key=rule_key,
                              region_24h_rule_key=region_24h_rule_key, region=region,
-                             app_type=app_type, data_key=data_key, by_24h_rule_key=by_24h_rule_key)
+                             app_type=app_type, data_key=data_key, by_24h_rule_key=by_24h_rule_key,
+                             by_48h_rule_key=by_48h_rule_key, rule_rank_h_flag=rule_rank_h_flag)
 
 
         # for data_key, data_param in params['data_params'].items():
@@ -592,16 +678,21 @@ def h_rank_bottom(now_date, now_h, rule_params, region_code_list):
 
 
 def h_timer_check():
-    rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
+    rule_rank_h_flag = sys.argv[1]
+    if rule_rank_h_flag == '48h':
+        rule_params = config_.RULE_PARAMS_REGION_APP_TYPE_48H
+    else:
+        rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
     project = config_.PROJECT_REGION_APP_TYPE
     table = config_.TABLE_REGION_APP_TYPE
     region_code_list = [code for region, code in region_code.items()]
     now_date = datetime.datetime.today()
-    log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
+    log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}, rule_rank_h_flag: {rule_rank_h_flag}")
     now_h = datetime.datetime.now().hour
     now_min = datetime.datetime.now().minute
     if now_h == 0:
-        h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list)
+        h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list,
+                      rule_rank_h_flag=rule_rank_h_flag)
         return
     # 查看当前小时更新的数据是否已准备好
     h_data_count = h_data_check(project=project, table=table, now_date=now_date)
@@ -609,11 +700,12 @@ def h_timer_check():
         log_.info(f'region_h_data_count = {h_data_count}')
         # 数据准备好,进行更新
         rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params,
-                  project=project, table=table, region_code_list=region_code_list)
+                  project=project, table=table, region_code_list=region_code_list, rule_rank_h_flag=rule_rank_h_flag)
         log_.info(f"region_h_data end!")
     elif now_min > 50:
         log_.info('h_recall data is None, use bottom data!')
-        h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list)
+        h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params, region_code_list=region_code_list,
+                      rule_rank_h_flag=rule_rank_h_flag)
         log_.info(f"region_h_data end!")
     else:
         # 数据没准备好,1分钟后重新检查

+ 6 - 2
region_rule_rank_h_task.sh

@@ -3,9 +3,13 @@ echo $ROV_OFFLINE_ENV
 if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
     cd /data2/rov-offline && /root/anaconda3/bin/python /data2/rov-offline/rule_rank_h_by_24h.py &&
      /root/anaconda3/bin/python /data2/rov-offline/region_rule_rank_h_by24h.py &&
-      /root/anaconda3/bin/python /data2/rov-offline/region_rule_rank_h.py
+      /root/anaconda3/bin/python /data2/rov-offline/region_rule_rank_h.py '24h' &&
+     /root/anaconda3/bin/python /data2/rov-offline/region_rule_rank_h_by48h.py &&
+      /root/anaconda3/bin/python /data2/rov-offline/region_rule_rank_h.py '48h'
 elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
     cd /data/rov-offline && /root/anaconda3/bin/python /data/rov-offline/rule_rank_h_by_24h.py &&
      /root/anaconda3/bin/python /data/rov-offline/region_rule_rank_h_by24h.py &&
-      /root/anaconda3/bin/python /data/rov-offline/region_rule_rank_h.py
+      /root/anaconda3/bin/python /data/rov-offline/region_rule_rank_h.py '24h' &&
+     /root/anaconda3/bin/python /data/rov-offline/region_rule_rank_h_by48h.py &&
+      /root/anaconda3/bin/python /data/rov-offline/region_rule_rank_h.py '48h'
 fi

+ 1 - 1
rule_rank_h_by_24h.py

@@ -287,7 +287,7 @@ def h_rank_bottom(now_date, now_h, rule_params):
     else:
         redis_dt = datetime.strftime(now_date, '%Y%m%d')
         redis_h = now_h - 1
-    key_prefix_list = [config_.RECALL_KEY_NAME_PREFIX_BY_24H, config_.RECALL_KEY_NAME_PREFIX_DUP_24H]
+    key_prefix_list = [config_.RECALL_KEY_NAME_PREFIX_BY_24H, config_.RECALL_KEY_NAME_PREFIX_BY_24H_OTHER]
     for app_type, params in rule_params.items():
         log_.info(f"app_type = {app_type}")
         for param in params.get('params_list'):

+ 307 - 0
rule_rank_h_by_48h.py

@@ -0,0 +1,307 @@
+# -*- coding: utf-8 -*-
+# @ModuleName: rule_rank_h_by_48h
+# @Author: Liqian
+# @Time: 2022/8/8 15:42
+# @Software: PyCharm
+
+import pandas as pd
+import math
+from functools import reduce
+from odps import ODPS
+from threading import Timer
+from datetime import datetime, timedelta
+from get_data import get_data_from_odps
+from db_helper import RedisHelper
+from utils import filter_video_status, check_table_partition_exits
+from config import set_config
+from log import Log
+
+config_, _ = set_config()
+log_ = Log()
+
+features = [
+    'apptype',
+    'videoid',
+    'preview人数',  # 过去48h预曝光人数
+    'view人数',  # 过去48h曝光人数
+    'play人数',  # 过去48h播放人数
+    'share人数',  # 过去48h分享人数
+    '回流人数',  # 过去48h分享,过去48h回流人数
+    'preview次数',  # 过去48h预曝光次数
+    'view次数',  # 过去48h曝光次数
+    'play次数',  # 过去48h播放次数
+    'share次数',  # 过去48h分享次数
+    'platform_return',
+    'platform_preview',
+    'platform_preview_total',
+    'platform_show',
+    'platform_show_total',
+    'platform_view',
+    'platform_view_total',
+]
+
+
+def h_data_check(project, table, now_date, now_h):
+    """检查数据是否准备好"""
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+
+    try:
+        # 23点开始到8点之前(不含8点),全部用22点生成那个列表
+        if now_h == 23:
+            dt = datetime.strftime(now_date - timedelta(hours=1), '%Y%m%d%H')
+        elif now_h < 8:
+            dt = f"{datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')}22"
+        else:
+            dt = datetime.strftime(now_date, '%Y%m%d%H')
+        check_res = check_table_partition_exits(date=dt, project=project, table=table)
+        if check_res:
+            sql = f'select * from {project}.{table} where dt = {dt}'
+            with odps.execute_sql(sql=sql).open_reader() as reader:
+                data_count = reader.count
+        else:
+            data_count = 0
+    except Exception as e:
+        data_count = 0
+    return data_count
+
+
+def get_feature_data(now_date, now_h, project, table):
+    """获取特征数据"""
+    # 23点开始到8点之前(不含8点),全部用22点生成那个列表
+    if now_h == 23:
+        dt = datetime.strftime(now_date - timedelta(hours=1), '%Y%m%d%H')
+    elif now_h < 8:
+        dt = f"{datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')}22"
+    else:
+        dt = datetime.strftime(now_date, '%Y%m%d%H')
+    log_.info({'feature_dt': dt})
+    # dt = '20220425'
+    records = get_data_from_odps(date=dt, project=project, table=table)
+    feature_data = []
+    for record in records:
+        item = {}
+        for feature_name in features:
+            item[feature_name] = record[feature_name]
+        feature_data.append(item)
+    feature_df = pd.DataFrame(feature_data)
+    return feature_df
+
+
+def cal_score1(df):
+    # score1计算公式: score = 回流人数/(view人数+10000)
+    df = df.fillna(0)
+    df['score'] = df['回流人数'] / (df['view人数'] + 1000)
+    df = df.sort_values(by=['score'], ascending=False)
+    return df
+
+
+def cal_score2(df, param):
+    # score2计算公式: score = share次数/(view+1000)+0.01*return/(share次数+100)
+    df = df.fillna(0)
+    if param.get('view_type', None) == 'video-show':
+        df['share_rate'] = df['share次数'] / (df['platform_show'] + 1000)
+    elif param.get('view_type', None) == 'preview':
+        df['share_rate'] = df['share次数'] / (df['preview人数'] + 1000)
+    else:
+        df['share_rate'] = df['share次数'] / (df['view人数'] + 1000)
+    df['back_rate'] = df['回流人数'] / (df['share次数'] + 100)
+    df['score'] = df['share_rate'] + 0.01 * df['back_rate']
+    df['platform_return_rate'] = df['platform_return'] / df['回流人数']
+    df = df.sort_values(by=['score'], ascending=False)
+    return df
+
+
+def video_rank_h(df, now_date, now_h, rule_key, param, app_type, data_key):
+    """
+    获取符合进入召回源条件的视频,与每日更新的rov模型结果视频列表进行合并
+    :param df:
+    :param now_date:
+    :param now_h:
+    :param rule_key: 天级规则数据进入条件
+    :param param: 天级规则数据进入条件参数
+    :param app_type:
+    :param data_key: 使用数据标识
+    :return:
+    """
+    redis_helper = RedisHelper()
+    log_.info(f"app_type = {app_type}, videos_count = {len(df)}")
+
+    # videoid重复时,保留分值高
+    df = df.sort_values(by=['score'], ascending=False)
+    df = df.drop_duplicates(subset=['videoid'], keep='first')
+    df['videoid'] = df['videoid'].astype(int)
+
+    # 获取符合进入召回源条件的视频
+    return_count = param.get('return_count')
+    if return_count:
+        day_recall_df = df[df['回流人数'] > return_count]
+    else:
+        day_recall_df = df
+    platform_return_rate = param.get('platform_return_rate', 0)
+    day_recall_df = day_recall_df[day_recall_df['platform_return_rate'] > platform_return_rate]
+
+    day_recall_videos = day_recall_df['videoid'].to_list()
+    log_.info(f'h_by48h_recall videos count = {len(day_recall_videos)}')
+
+    # 视频状态过滤
+    filtered_videos = filter_video_status(day_recall_videos)
+    log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
+
+    # 写入对应的redis
+    now_dt = datetime.strftime(now_date, '%Y%m%d')
+    day_video_ids = []
+    day_recall_result = {}
+    for video_id in filtered_videos:
+        score = day_recall_df[day_recall_df['videoid'] == video_id]['score']
+        day_recall_result[int(video_id)] = float(score)
+        day_video_ids.append(int(video_id))
+    h_48h_recall_key_name = \
+        f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H}{app_type}:{data_key}:{rule_key}:{now_dt}:{now_h}"
+    if len(day_recall_result) > 0:
+        log_.info(f"count = {len(day_recall_result)}")
+        redis_helper.add_data_with_zset(key_name=h_48h_recall_key_name, data=day_recall_result, expire_time=2 * 3600)
+
+    if rule_key == 'rule1':
+        # 去重筛选结果,保留剩余数据并写入Redis
+        all_videos = df['videoid'].to_list()
+        log_.info(f'h_by48h_recall all videos count = {len(all_videos)}')
+        # 视频状态过滤
+        all_filtered_videos = filter_video_status(all_videos)
+        log_.info(f'all_filtered_videos count = {len(all_filtered_videos)}')
+        # 与筛选结果去重
+        other_videos = [video for video in all_filtered_videos if video not in day_video_ids]
+        log_.info(f'other_videos count = {len(other_videos)}')
+        # 写入对应的redis
+        other_48h_recall_result = {}
+        for video_id in other_videos:
+            score = df[df['videoid'] == video_id]['score']
+            other_48h_recall_result[int(video_id)] = float(score)
+        other_h_48h_recall_key_name = \
+            f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H_OTHER}{app_type}:{data_key}:{rule_key}:{now_dt}:{now_h}"
+        if len(other_48h_recall_result) > 0:
+            log_.info(f"count = {len(other_48h_recall_result)}")
+            redis_helper.add_data_with_zset(key_name=other_h_48h_recall_key_name, data=other_48h_recall_result,
+                                            expire_time=2 * 3600)
+
+
+def merge_df(df_left, df_right):
+    """
+    df按照videoid 合并,对应特征求和
+    :param df_left:
+    :param df_right:
+    :return:
+    """
+    df_merged = pd.merge(df_left, df_right, on=['videoid'], how='outer', suffixes=['_x', '_y'])
+    df_merged.fillna(0, inplace=True)
+    feature_list = ['videoid']
+    for feature in features:
+        if feature in ['apptype', 'videoid']:
+            continue
+        df_merged[feature] = df_merged[f'{feature}_x'] + df_merged[f'{feature}_y']
+        feature_list.append(feature)
+    return df_merged[feature_list]
+
+
def rank_by_h(now_date, now_h, rule_params, project, table):
    """
    Pull this hour's 48h feature data and rank videos for every configured app type.

    :param now_date: datetime of the current run
    :param now_h: current hour (int)
    :param rule_params: per-app-type config dict with 'data_params', 'rule_params' and 'params_list'
    :param project: project holding the feature table
    :param table: feature table name
    """
    # 获取特征数据
    feature_df = get_feature_data(now_date=now_date, now_h=now_h, project=project, table=table)
    feature_df['apptype'] = feature_df['apptype'].astype(int)
    # rank per app type / (data_key, rule_key) pair
    for app_type, params in rule_params.items():
        log_.info(f"app_type = {app_type}")
        data_params_item = params.get('data_params')
        rule_params_item = params.get('rule_params')
        for param in params.get('params_list'):
            data_key = param.get('data')
            data_param = data_params_item.get(data_key)
            log_.info(f"data_key = {data_key}, data_param = {data_param}")
            # Merge (sum) the per-apptype feature slices configured for this data key.
            sub_dfs = [feature_df[feature_df['apptype'] == apptype] for apptype in data_param]
            df_merged = reduce(merge_df, sub_dfs)

            rule_key = param.get('rule')
            rule_param = rule_params_item.get(rule_key)
            log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
            # 计算score: cal_score_func == 2 selects cal_score2, anything else falls back to cal_score1.
            if rule_param.get('cal_score_func', 1) == 2:
                score_df = cal_score2(df=df_merged, param=rule_param)
            else:
                score_df = cal_score1(df=df_merged)
            video_rank_h(df=score_df, now_date=now_date, now_h=now_h, rule_key=rule_key, param=rule_param,
                         app_type=app_type, data_key=data_key)
+
+
def h_rank_bottom(now_date, now_h, rule_params):
    """Data was not updated on time: reuse the previous hour's recall result as this hour's data.

    :param now_date: datetime of the current run
    :param now_h: current hour (int)
    :param rule_params: per-app-type config dict with a 'params_list' of (data, rule) pairs
    """
    redis_helper = RedisHelper()
    # Fallback source is the previous hour; crossing midnight rolls back one day.
    if now_h == 0:
        fallback_dt = datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')
        fallback_h = 23
    else:
        fallback_dt = datetime.strftime(now_date, '%Y%m%d')
        fallback_h = now_h - 1
    prefixes = [config_.RECALL_KEY_NAME_PREFIX_BY_48H, config_.RECALL_KEY_NAME_PREFIX_BY_48H_OTHER]
    now_dt = datetime.strftime(now_date, '%Y%m%d')
    for app_type, params in rule_params.items():
        log_.info(f"app_type = {app_type}")
        for param in params.get('params_list'):
            data_key = param.get('data')
            rule_key = param.get('rule')
            log_.info(f"data_key = {data_key}, rule_key = {rule_key}")
            for prefix in prefixes:
                src_key_name = f"{prefix}{app_type}:{data_key}:{rule_key}:{fallback_dt}:{fallback_h}"
                src_data = redis_helper.get_all_data_from_zset(key_name=src_key_name, with_scores=True) or []
                final_data = {video_id: score for video_id, score in src_data}
                # 存入对应的redis (same key layout, stamped with the current date/hour)
                final_key_name = f"{prefix}{app_type}:{data_key}:{rule_key}:{now_dt}:{now_h}"
                if len(final_data) > 0:
                    redis_helper.add_data_with_zset(key_name=final_key_name, data=final_data, expire_time=2 * 3600)
+
+
def h_timer_check():
    """Hourly driver: rank when this hour's 48h data is ready, fall back to last
    hour's result after minute 50, otherwise re-check again in one minute."""
    project = config_.PROJECT_48H_APP_TYPE
    table = config_.TABLE_48H_APP_TYPE
    rule_params = config_.RULE_PARAMS_48H_APP_TYPE
    now_date = datetime.today()
    log_.info(f"now_date: {datetime.strftime(now_date, '%Y%m%d%H')}")
    now = datetime.now()
    now_min = now.minute
    now_h = now.hour
    # 查看当前天级更新的数据是否已准备好
    h_data_count = h_data_check(project=project, table=table, now_date=now_date, now_h=now_h)
    if h_data_count > 0:
        # 数据准备好,进行更新
        log_.info(f'h_by48h_data_count = {h_data_count}')
        rank_by_h(now_date=now_date, now_h=now_h, rule_params=rule_params, project=project, table=table)
        return
    if now_min > 50:
        # Past minute 50 with no data: give up for this hour and reuse last hour's result.
        log_.info('h_by48h_recall data is None!')
        h_rank_bottom(now_date=now_date, now_h=now_h, rule_params=rule_params)
        return
    # 数据没准备好,1分钟后重新检查
    Timer(60, h_timer_check).start()
+
+
if __name__ == '__main__':
    # Script entry point: start the hourly check/update loop.
    h_timer_check()

+ 6 - 3
videos_filter.py

@@ -578,6 +578,8 @@ def filter_process_with_region(app_type, data_key, rule_key, region, now_date, n
         # config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_DAY_H,
         config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H,
         config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H,
+        config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H,
+        config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H,
         config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H
     ]
     for i, key_prefix in enumerate(key_prefix_list):
@@ -623,10 +625,10 @@ def filter_process_with_region(app_type, data_key, rule_key, region, now_date, n
               f"videos filter end!")
 
 
-def filter_region_videos():
+def filter_region_videos(rule_params):
     """过滤地域分组规则视频"""
     region_code_list = [code for region, code in region_code.items()]
-    rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
+    # rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
     log_.info("region_h videos filter start ...")
     redis_helper = RedisHelper()
     # 获取当前日期
@@ -905,7 +907,8 @@ def main():
         # 过滤老视频数据
         # filter_old_videos()
         # 过滤地域分组小时级视频
-        filter_region_videos()
+        filter_region_videos(rule_params=config_.RULE_PARAMS_REGION_APP_TYPE)
+        filter_region_videos(rule_params=config_.RULE_PARAMS_REGION_APP_TYPE_48H)
         # 过滤地域分组天级视频
         # filter_region_videos_by_day()
         # 过滤小时级更新24h视频