Bladeren bron

Merge branch 'abtest-139-2022090815' into pre-master

liqian 2 jaren geleden
bovenliggende
commit
eebf027103
6 gewijzigde bestanden met toevoegingen van 340 en 106 verwijderingen
  1. 28 10
      config.py
  2. 74 13
      manager_op.py
  3. 9 7
      recommend.py
  4. 2 0
      user2new.py
  5. 49 38
      video_rank.py
  6. 178 38
      video_recall.py

+ 28 - 10
config.py

@@ -127,6 +127,7 @@ class BaseConfig(object):
             'abtest_121': 60022,
             'abtest_122': 60023,
             'abtest_130': 60024,
+            'abtest_139': 60025,
         },  # 地域分组小时级规则实验
 
         'rank_by_24h': {
@@ -212,6 +213,10 @@ class BaseConfig(object):
             'data_key': 'data1', 'rule_key': 'rule4',
             'ab_code': AB_CODE['region_rank_by_h'].get('abtest_130')
         },  # 095实验基础上去除大列表,进行兜底策略优化
+        '139': {
+            'data_key': 'data1', 'rule_key': 'rule9', '30day_rule_key': 'rule1',
+            'ab_code': AB_CODE['region_rank_by_h'].get('abtest_139')
+        }
     }
 
     # 小程序小时级列表key不同实验标识
@@ -286,6 +291,7 @@ class BaseConfig(object):
         'whole_movies': 'whole_movies',  # 完整影视
         'talk_videos': 'talk_videos',  # 影视解说
         'special_mid': 'special_mid_videos',  # 特殊mid指定视频
+        'rov_recall_30day': 'recall_pool_30day',  # 天级更新相对30天列表
     }
 
     # category id mapping
@@ -327,9 +333,14 @@ class BaseConfig(object):
     # # 记录 mid-小时级key 中数据所属(date,h),完整格式:com.weiqu.video.h.region.24h.record.mid.{appType}.{mid}
     # H_WITH_MID_RECORD_KEY_NAME_PREFIX_REGION_24H = 'com.weiqu.video.h.region.24h.record.mid.'
 
-    # 用户上一次在 地域分组小时级 更新列表中对应的位置 redis key前缀,完整key格式:recall:pool:last:item:region:h:{appType}:{mid}
+    # 用户上一次在 相对30天 天级更新列表中对应的位置 redis key前缀,完整key格式:recall:last:item:30day:{appType}:{mid}
+    LAST_VIDEO_FROM_30DAY_PREFIX = 'recall:last:item:30day:'
+    # 记录 mid-上一次在 地域分组小时级 更新列表中对应的位置key 中数据所属(date,h),完整格式:recall:last:record:30day:{appType}:{mid}
+    RECORD_KEY_NAME_PREFIX_LAST_VIDEO_30DAY = 'recall:last:record:30day:'
+
+    # 用户上一次在 地域分组小时级 更新列表中对应的位置 redis key前缀,完整key格式:recall:last:item:region:h:{appType}:{mid}
     LAST_VIDEO_FROM_REGION_H_PREFIX = 'recall:last:item:region:h:'
-    # 记录 mid-上一次在 地域分组小时级 更新列表中对应的位置key 中数据所属(date,h),完整格式:recall:pool:last:record:region:h:{appType}:{mid}
+    # 记录 mid-上一次在 地域分组小时级 更新列表中对应的位置key 中数据所属(date,h),完整格式:recall:last:record:region:h:{appType}:{mid}
     RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_H = 'recall:last:record:region:h:'
 
     # 用户上一次在dup1 分地域24h更新列表中对应的位置 redis key前缀,完整key格式:recall:last:item:region:dup1:24h:{appType}:{mid}
@@ -378,6 +389,10 @@ class BaseConfig(object):
     # # 小时级视频状态不符合推荐要求的列表 redis key,完整格式:com.weiqu.video.filter.apptype.h.item.24h.{appType}.{data_key}.{rule_key}
     # H_VIDEO_FILER_24H = 'com.weiqu.video.filter.apptype.h.item.24h.'
 
+    # 小程序相对30天数据天级更新结果存放 redis key前缀,
+    # 完整格式:recall:item:score:30day:{data_key}:{rule_key}:{date}
+    RECALL_KEY_NAME_PREFIX_30DAY = 'recall:item:score:30day:'
+
     # 小程序地域分组小时级更新结果存放 redis key前缀,
     # 完整格式:recall:item:score:region:h:{region}:{data_key}:{rule_key}:{date}:{h}
     RECALL_KEY_NAME_PREFIX_REGION_BY_H = 'recall:item:score:region:h:'
@@ -542,22 +557,25 @@ class BaseConfig(object):
         0: {
             "dataListDesc": "全部", "dataListCode": 0,
             "keyPrefixList": [
-                {"dataListCode": 1, "keyPrefix": RECALL_KEY_NAME_PREFIX_REGION_BY_H},
-                {"dataListCode": 2, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H},
-                {"dataListCode": 3, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H},
-                {"dataListCode": 4, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H},
+                {"dataListCode": 1, "keyPrefix": RECALL_KEY_NAME_PREFIX_30DAY},
+                {"dataListCode": 2, "keyPrefix": RECALL_KEY_NAME_PREFIX_REGION_BY_H},
+                {"dataListCode": 3, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H},
+                {"dataListCode": 4, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H},
+                {"dataListCode": 5, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H},
                 # {"dataListCode": 5, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H},
                 # {"dataListCode": 6, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H},
                 # {"dataListCode": 7, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP_REGION_H},
             ]
         },
-        1: {"dataListDesc": "地域小时级", "dataListCode": 1, "keyPrefix": RECALL_KEY_NAME_PREFIX_REGION_BY_H},
-        2: {"dataListDesc": "地域相对24小时级", "dataListCode": 2, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H},
-        3: {"dataListDesc": "非地域相对24小时级", "dataListCode": 3, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H},
-        4: {"dataListDesc": "非地域相对24小时级列表2", "dataListCode": 4, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H},
+        1: {"dataListDesc": "相对30天数据列表", "dataListCode": 1, "keyPrefix": RECALL_KEY_NAME_PREFIX_30DAY},
+        2: {"dataListDesc": "地域小时级", "dataListCode": 2, "keyPrefix": RECALL_KEY_NAME_PREFIX_REGION_BY_H},
+        3: {"dataListDesc": "地域相对24小时级", "dataListCode": 3, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H},
+        4: {"dataListDesc": "非地域相对24小时级", "dataListCode": 4, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H},
+        5: {"dataListDesc": "非地域相对24小时级列表2", "dataListCode": 5, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H},
         # 5: {"dataListDesc": "非地域相对48小时级", "dataListCode": 5, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H},
         # 6: {"dataListDesc": "非地域相对48小时级列表2", "dataListCode": 6, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H},
         # 7: {"dataListDesc": "大列表", "dataListCode": 7, "keyPrefix": RECALL_KEY_NAME_PREFIX_DUP_REGION_H},
+
     }
 
     # VIDEO_DATA_LIST_AB_EXP_CODE_MAPPING = {

+ 74 - 13
manager_op.py

@@ -25,20 +25,29 @@ def get_video_list(ab_exp_code, search_time, data_list_type, region_code, page_n
         # data_count = 0
         result = {'code': -1, 'message': 'request data error!'}
         return result
-    search_time = ':'.join(search_time.split('.'))
+    search_time_seg = search_time.split('.')
+    if search_time_seg[1][0] == '0':
+        search_time_seg[1] = search_time_seg[1][1]
+    search_time = ':'.join(search_time_seg)
     data = []
     key_params = config_.AB_EXP_CODE.get(ab_exp_code)
     # key_params = config_.VIDEO_DATA_LIST_AB_EXP_CODE_MAPPING.get(ab_exp_code)
     # app_type = key_params.get('app_type')
     data_key = key_params.get('data_key')
     rule_key = key_params.get('rule_key')
+    rule_key_30day = key_params.get('30day_rule_key')
     if data_list_type == 0:
         # 视频数据表类型 选择为全部
         key_prefix_list = config_.VIDEO_DATA_LIST_MAPPING.get(data_list_type).get('keyPrefixList')
+        if ab_exp_code not in ['139']:
+            key_prefix_list = key_prefix_list[1:]
         for item in key_prefix_list:
             data_list_code = item.get('dataListCode')
             key_prefix = item.get('keyPrefix')
-            key_name = f"{key_prefix}{region_code}:{data_key}:{rule_key}:{search_time}"
+            if data_list_code == 1:
+                key_name = f"{key_prefix}{data_key}:{rule_key_30day}:{search_time_seg[0]}"
+            else:
+                key_name = f"{key_prefix}{region_code}:{data_key}:{rule_key}:{search_time}"
             key_data = redis_helper.get_all_data_from_zset(key_name=key_name, desc=True, with_scores=True)
             if key_data is None:
                 continue
@@ -56,8 +65,15 @@ def get_video_list(ab_exp_code, search_time, data_list_type, region_code, page_n
             data.extend(videos)
     else:
         key_prefix = config_.VIDEO_DATA_LIST_MAPPING.get(data_list_type).get('keyPrefix')
-        key_name = f"{key_prefix}{region_code}:{data_key}:{rule_key}:{search_time}"
-        key_data = redis_helper.get_all_data_from_zset(key_name=key_name, desc=True, with_scores=True)
+        if data_list_type == 1:
+            if ab_exp_code not in ['139']:
+                key_data = None
+            else:
+                key_name = f"{key_prefix}{data_key}:{rule_key_30day}:{search_time_seg[0]}"
+                key_data = redis_helper.get_all_data_from_zset(key_name=key_name, desc=True, with_scores=True)
+        else:
+            key_name = f"{key_prefix}{region_code}:{data_key}:{rule_key}:{search_time}"
+            key_data = redis_helper.get_all_data_from_zset(key_name=key_name, desc=True, with_scores=True)
         if key_data is None:
             videos = []
         else:
@@ -105,22 +121,28 @@ def search_video(ab_exp_code, search_time, data_list_type, region_code, video_id
         # data_count = 0
         result = {'code': -1, 'message': 'request data error!'}
         return result
-    search_time = ':'.join(search_time.split('.'))
+    search_time_seg = search_time.split('.')
+    if search_time_seg[1][0] == '0':
+        search_time_seg[1] = search_time_seg[1][1]
+    search_time = ':'.join(search_time_seg)
     data = []
     key_params = config_.AB_EXP_CODE.get(ab_exp_code)
     # key_params = config_.VIDEO_DATA_LIST_AB_EXP_CODE_MAPPING.get(ab_exp_code)
     # app_type = key_params.get('app_type')
     data_key = key_params.get('data_key')
     rule_key = key_params.get('rule_key')
+    rule_key_30day = key_params.get('30day_rule_key')
     if data_list_type == 0 and region_code == '0':
         # 视频数据表类型 和 地域 选择都为全部
         key_prefix_list = config_.VIDEO_DATA_LIST_MAPPING.get(data_list_type).get('keyPrefixList')
+        if ab_exp_code not in ['139']:
+            key_prefix_list = key_prefix_list[1:]
         code_list = [code for _, code in config_.REGION_CODE.items()]
         for item in key_prefix_list:
             data_list_code = item.get('dataListCode')
             key_prefix = item.get('keyPrefix')
-            for code in code_list:
-                key_name = f"{key_prefix}{code}:{data_key}:{rule_key}:{search_time}"
+            if data_list_code == 1:
+                key_name = f"{key_prefix}{data_key}:{rule_key_30day}:{search_time_seg[0]}"
                 rank = redis_helper.get_rank_with_value(key_name=key_name, value=int(video_id), desc=True)
                 score = redis_helper.get_score_with_value(key_name=key_name, value=int(video_id))
                 if rank is None or score is None:
@@ -129,21 +151,43 @@ def search_video(ab_exp_code, search_time, data_list_type, region_code, video_id
                     'abExpCode': ab_exp_code,
                     'searchTime': search_time,
                     'dataListType': data_list_code,
-                    'regionCode': code,
+                    'regionCode': '0',
                     'videoId': int(video_id),
                     'score': float(score),
                     'rank': int(rank) + 1,
                 }
                 data.append(videos)
+            else:
+                for code in code_list:
+                    key_name = f"{key_prefix}{code}:{data_key}:{rule_key}:{search_time}"
+                    rank = redis_helper.get_rank_with_value(key_name=key_name, value=int(video_id), desc=True)
+                    score = redis_helper.get_score_with_value(key_name=key_name, value=int(video_id))
+                    if rank is None or score is None:
+                        continue
+                    videos = {
+                        'abExpCode': ab_exp_code,
+                        'searchTime': search_time,
+                        'dataListType': data_list_code,
+                        'regionCode': code,
+                        'videoId': int(video_id),
+                        'score': float(score),
+                        'rank': int(rank) + 1,
+                    }
+                    data.append(videos)
         data.sort(key=lambda x: (x['dataListType'], x['regionCode']), reverse=False)
 
     elif data_list_type == 0:
         # 视频数据表类型 选择为全部
         key_prefix_list = config_.VIDEO_DATA_LIST_MAPPING.get(data_list_type).get('keyPrefixList')
+        if ab_exp_code not in ['139']:
+            key_prefix_list = key_prefix_list[1:]
         for item in key_prefix_list:
             data_list_code = item.get('dataListCode')
             key_prefix = item.get('keyPrefix')
-            key_name = f"{key_prefix}{region_code}:{data_key}:{rule_key}:{search_time}"
+            if data_list_code == 1:
+                key_name = f"{key_prefix}{data_key}:{rule_key_30day}:{search_time_seg[0]}"
+            else:
+                key_name = f"{key_prefix}{region_code}:{data_key}:{rule_key}:{search_time}"
             rank = redis_helper.get_rank_with_value(key_name=key_name, value=int(video_id), desc=True)
             score = redis_helper.get_score_with_value(key_name=key_name, value=int(video_id))
             if rank is None or score is None:
@@ -165,7 +209,12 @@ def search_video(ab_exp_code, search_time, data_list_type, region_code, video_id
         code_list = [code for _, code in config_.REGION_CODE.items()]
         key_prefix = config_.VIDEO_DATA_LIST_MAPPING.get(data_list_type).get('keyPrefix')
         for code in code_list:
-            key_name = f"{key_prefix}{code}:{data_key}:{rule_key}:{search_time}"
+            if data_list_type == 1:
+                if ab_exp_code not in ['139']:
+                    continue
+                key_name = f"{key_prefix}{data_key}:{rule_key_30day}:{search_time_seg[0]}"
+            else:
+                key_name = f"{key_prefix}{code}:{data_key}:{rule_key}:{search_time}"
             rank = redis_helper.get_rank_with_value(key_name=key_name, value=int(video_id), desc=True)
             score = redis_helper.get_score_with_value(key_name=key_name, value=int(video_id))
             if rank is None or score is None:
@@ -184,9 +233,17 @@ def search_video(ab_exp_code, search_time, data_list_type, region_code, video_id
 
     else:
         key_prefix = config_.VIDEO_DATA_LIST_MAPPING.get(data_list_type).get('keyPrefix')
-        key_name = f"{key_prefix}{region_code}:{data_key}:{rule_key}:{search_time}"
-        rank = redis_helper.get_rank_with_value(key_name=key_name, value=int(video_id), desc=True)
-        score = redis_helper.get_score_with_value(key_name=key_name, value=int(video_id))
+        if data_list_type == 1:
+            if ab_exp_code not in ['139']:
+                rank, score = None, None
+            else:
+                key_name = f"{key_prefix}{data_key}:{rule_key_30day}:{search_time_seg[0]}"
+                rank = redis_helper.get_rank_with_value(key_name=key_name, value=int(video_id), desc=True)
+                score = redis_helper.get_score_with_value(key_name=key_name, value=int(video_id))
+        else:
+            key_name = f"{key_prefix}{region_code}:{data_key}:{rule_key}:{search_time}"
+            rank = redis_helper.get_rank_with_value(key_name=key_name, value=int(video_id), desc=True)
+            score = redis_helper.get_score_with_value(key_name=key_name, value=int(video_id))
         if rank is not None and score is not None:
             videos = {
                 'abExpCode': ab_exp_code,
@@ -206,3 +263,7 @@ def search_video(ab_exp_code, search_time, data_list_type, region_code, video_id
         "data": data[(page_num - 1) * page_size:page_num * page_size]
     }
     return result
+
+
+if __name__ == '__main__':
+    get_video_list('139', '20220909.01', 0, '110000')

+ 9 - 7
recommend.py

@@ -138,7 +138,7 @@ def positon_duplicate(pos1_vids, pos2_vids, videos):
 
 def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info,
                     expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', data_key='',
-                    no_op_flag=False, old_video_index=-1, video_id=None, params=None):
+                    no_op_flag=False, old_video_index=-1, video_id=None, params=None, rule_key_30day=None):
     """
     首页线上推荐逻辑
     :param request_id: request_id
@@ -179,7 +179,7 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     pool_recall = PoolRecall(request_id=request_id,
                              app_type=app_type, mid=mid, uid=uid, ab_code=ab_code,
                              client_info=client_info, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
-                             params=params)
+                             params=params, rule_key_30day=rule_key_30day)
     # _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
     # # 小时级实验
     # if ab_code in [code for _, code in config_.AB_CODE['rank_by_h'].items()]:
@@ -542,6 +542,7 @@ def get_recommend_params(recommend_type, ab_exp_info, ab_info_data, page_type=0)
     expire_time = 3600
     rule_key = config_.AB_EXP_CODE['095'].get('rule_key')
     data_key = config_.AB_EXP_CODE['095'].get('data_key')
+    rule_key_30day = None
     no_op_flag = True
 
     # 获取实验配置
@@ -564,6 +565,7 @@ def get_recommend_params(recommend_type, ab_exp_info, ab_info_data, page_type=0)
                     expire_time = 3600
                     rule_key = param.get('rule_key')
                     data_key = param.get('data_key')
+                    rule_key_30day = param.get('30day_rule_key')
                     no_op_flag = True
                     break
 
@@ -857,7 +859,7 @@ def get_recommend_params(recommend_type, ab_exp_info, ab_info_data, page_type=0)
     #             data_key = 'data1'
     #             no_op_flag = True
 
-    return top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, no_op_flag, old_video_index
+    return top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, no_op_flag, old_video_index, rule_key_30day
 
 
 def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
@@ -923,7 +925,7 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
         return recommend_result
 
     # 普通mid推荐处理
-    top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, no_op_flag, old_video_index = \
+    top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, no_op_flag, old_video_index, rule_key_30day = \
         get_recommend_params(recommend_type=0, ab_exp_info=ab_exp_info, ab_info_data=ab_info_data)
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
@@ -945,7 +947,7 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
                              ab_code=ab_code, expire_time=expire_time,
                              rule_key=rule_key, data_key=data_key,
                              no_op_flag=no_op_flag, old_video_index=old_video_index,
-                             params=params)
+                             params=params, rule_key_30day=rule_key_30day)
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
     #     'request_id': request_id,
@@ -1010,7 +1012,7 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
         # return rank_result
 
     # 普通mid推荐处理
-    top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, no_op_flag, old_video_index = \
+    top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, no_op_flag, old_video_index, rule_key_30day = \
         get_recommend_params(recommend_type=1, ab_exp_info=ab_exp_info, ab_info_data=ab_info_data, page_type=page_type)
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
@@ -1032,7 +1034,7 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
                              ab_code=ab_code, expire_time=expire_time,
                              rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
                              old_video_index=old_video_index, video_id=video_id,
-                             params=params)
+                             params=params, rule_key_30day=rule_key_30day)
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
     #     'request_id': request_id,

+ 2 - 0
user2new.py

@@ -47,6 +47,8 @@ def user2new(app_type, mid, uid):
             config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_DUP1_24H,
             config_.LAST_VIDEO_FROM_REGION_H_PREFIX,
             config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_REGION_H,
+            config_.LAST_VIDEO_FROM_30DAY_PREFIX,
+            config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_30DAY,
         ]
         for h_key_prefix in h_key_prefix_list:
             h_key_name = f"{h_key_prefix}{app_type_item}:{mid}"

+ 49 - 38
video_rank.py

@@ -24,21 +24,27 @@ def video_rank(data, size, top_K, flow_pool_P):
         return None
     # 将各路召回的视频按照score从大到小排序
     # 最惊奇相关推荐相似视频
-    relevant_recall = [item for item in data['rov_pool_recall']
-                       if item.get('pushFrom') == config_.PUSH_FROM['top_video_relevant_appType_19']]
-    relevant_recall_rank = sorted(relevant_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    # relevant_recall = [item for item in data['rov_pool_recall']
+    #                    if item.get('pushFrom') == config_.PUSH_FROM['top_video_relevant_appType_19']]
+    # relevant_recall_rank = sorted(relevant_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
     # 最惊奇完整影视视频
-    whole_movies_recall = [item for item in data['rov_pool_recall']
-                           if item.get('pushFrom') == config_.PUSH_FROM['whole_movies']]
-    whole_movies_recall_rank = sorted(whole_movies_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    # whole_movies_recall = [item for item in data['rov_pool_recall']
+    #                        if item.get('pushFrom') == config_.PUSH_FROM['whole_movies']]
+    # whole_movies_recall_rank = sorted(whole_movies_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
     # 最惊奇影视解说视频
-    talk_videos_recall = [item for item in data['rov_pool_recall']
-                           if item.get('pushFrom') == config_.PUSH_FROM['talk_videos']]
-    talk_videos_recall_rank = sorted(talk_videos_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    # talk_videos_recall = [item for item in data['rov_pool_recall']
+    #                        if item.get('pushFrom') == config_.PUSH_FROM['talk_videos']]
+    # talk_videos_recall_rank = sorted(talk_videos_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
 
     # 小时级更新数据
-    h_recall = [item for item in data['rov_pool_recall'] if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_h']]
-    h_recall_rank = sorted(h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    # h_recall = [item for item in data['rov_pool_recall'] if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_h']]
+    # h_recall_rank = sorted(h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+
+    # 相对30天天级规则更新数据
+    day_30_recall = [item for item in data['rov_pool_recall']
+                       if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_30day']]
+    day_30_recall_rank = sorted(day_30_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+
     # 地域分组小时级规则更新数据
     region_h_recall = [item for item in data['rov_pool_recall']
                          if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_h']]
@@ -49,9 +55,9 @@ def video_rank(data, size, top_K, flow_pool_P):
     region_24h_recall_rank = sorted(region_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
 
     # 地域分组天级规则更新数据
-    region_day_recall = [item for item in data['rov_pool_recall']
-                         if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_day']]
-    region_day_recall_rank = sorted(region_day_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    # region_day_recall = [item for item in data['rov_pool_recall']
+    #                      if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_day']]
+    # region_day_recall_rank = sorted(region_day_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
 
     # 相对24h规则更新数据
     rule_24h_recall = [item for item in data['rov_pool_recall']
@@ -72,31 +78,35 @@ def video_rank(data, size, top_K, flow_pool_P):
     rule_48h_dup_recall_rank = sorted(rule_48h_dup_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
 
     # 天级规则更新数据
-    day_recall = [item for item in data['rov_pool_recall'] if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_day']]
-    day_recall_rank = sorted(day_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    # day_recall = [item for item in data['rov_pool_recall'] if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_day']]
+    # day_recall_rank = sorted(day_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
     # ROV召回池
-    rov_initial_recall = [
-        item for item in data['rov_pool_recall']
-        if item.get('pushFrom') not in
-           [config_.PUSH_FROM['top_video_relevant_appType_19'],
-            config_.PUSH_FROM['rov_recall_h'],
-            config_.PUSH_FROM['rov_recall_region_h'],
-            config_.PUSH_FROM['rov_recall_region_24h'],
-            config_.PUSH_FROM['rov_recall_region_day'],
-            config_.PUSH_FROM['rov_recall_24h'],
-            config_.PUSH_FROM['rov_recall_24h_dup'],
-            config_.PUSH_FROM['rov_recall_48h'],
-            config_.PUSH_FROM['rov_recall_48h_dup'],
-            config_.PUSH_FROM['rov_recall_day'],
-            config_.PUSH_FROM['whole_movies'],
-            config_.PUSH_FROM['talk_videos']]
-    ]
-    rov_initial_recall_rank = sorted(rov_initial_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
-    rov_recall_rank = whole_movies_recall_rank + talk_videos_recall_rank + h_recall_rank + \
-                      region_h_recall_rank + region_24h_recall_rank + region_day_recall_rank + \
+    # rov_initial_recall = [
+    #     item for item in data['rov_pool_recall']
+    #     if item.get('pushFrom') not in
+    #        [config_.PUSH_FROM['top_video_relevant_appType_19'],
+    #         config_.PUSH_FROM['rov_recall_h'],
+    #         config_.PUSH_FROM['rov_recall_region_h'],
+    #         config_.PUSH_FROM['rov_recall_region_24h'],
+    #         config_.PUSH_FROM['rov_recall_region_day'],
+    #         config_.PUSH_FROM['rov_recall_24h'],
+    #         config_.PUSH_FROM['rov_recall_24h_dup'],
+    #         config_.PUSH_FROM['rov_recall_48h'],
+    #         config_.PUSH_FROM['rov_recall_48h_dup'],
+    #         config_.PUSH_FROM['rov_recall_day'],
+    #         config_.PUSH_FROM['whole_movies'],
+    #         config_.PUSH_FROM['talk_videos']]
+    # ]
+    # rov_initial_recall_rank = sorted(rov_initial_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
+    # rov_recall_rank = whole_movies_recall_rank + talk_videos_recall_rank + h_recall_rank + \
+    #                   day_30_recall_rank + region_h_recall_rank + region_24h_recall_rank + \
+    #                   region_day_recall_rank + rule_24h_recall_rank + rule_24h_dup_recall_rank + \
+    #                   rule_48h_recall_rank + rule_48h_dup_recall_rank + \
+    #                   day_recall_rank + rov_initial_recall_rank
+    rov_recall_rank = day_30_recall_rank + \
+                      region_h_recall_rank + region_24h_recall_rank + \
                       rule_24h_recall_rank + rule_24h_dup_recall_rank + \
-                      rule_48h_recall_rank + rule_48h_dup_recall_rank + \
-                      day_recall_rank + rov_initial_recall_rank
+                      rule_48h_recall_rank + rule_48h_dup_recall_rank
     # 流量池
     flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
     # 对各路召回的视频进行去重
@@ -105,7 +115,8 @@ def video_rank(data, size, top_K, flow_pool_P):
     # log_.info('remove_duplicate finished! rov_recall_rank = {}, flow_recall_rank = {}'.format(
     #     rov_recall_rank, flow_recall_rank))
 
-    rank_result = relevant_recall_rank
+    # rank_result = relevant_recall_rank
+    rank_result = []
 
     # 从ROV召回池中获取top k
     if len(rov_recall_rank) > 0:

+ 178 - 38
video_recall.py

@@ -16,7 +16,7 @@ config_ = set_config()
 class PoolRecall(object):
     """召回"""
     def __init__(self, request_id, app_type, client_info=None, mid='', uid='', ab_code='',
-                 rule_key='', data_key='', no_op_flag=False, params=None):
+                 rule_key='', data_key='', no_op_flag=False, params=None, rule_key_30day=None):
         """
         初始化
         :param request_id: request_id
@@ -36,6 +36,7 @@ class PoolRecall(object):
         self.rule_key = rule_key
         self.data_key = data_key
         self.no_op_flag = no_op_flag
+        self.rule_key_30day = rule_key_30day
         self.redis_helper = RedisHelper(params=params)
 
     def copy_redis_zset_data(self, from_key_name, to_key_name):
@@ -1222,43 +1223,39 @@ class PoolRecall(object):
         if region_code == '':
             region_code = '-1'
 
-        # if self.ab_code in [config_.AB_CODE['region_rank_by_h'].get('region_rule_rank5_appType_0_data1'),
-        #                     config_.AB_CODE['region_rank_by_h'].get('abtest_082'),
-        #                     config_.AB_CODE['region_rank_by_h'].get('abtest_112')]:
-
-        if region_code == '-1':
-            t = [
-                # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
-                gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
-                gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
-            ]
+        if self.ab_code == config_.AB_CODE['region_rank_by_h'].get('abtest_139'):
+            if region_code == '-1':
+                t = [
+                    gevent.spawn(self.recall_update_by_day, size, '30day'),
+                    gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
+                    gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
+                ]
+            else:
+                t = [
+                    gevent.spawn(self.recall_update_by_day, size, '30day'),
+                    gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_h', expire_time),
+                    gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
+                    gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
+                    gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
+                ]
         else:
-            t = [
-                # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size),
-                 # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'region_24h'),
-                 # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
-                 gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_h', expire_time),
-                 gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
-                 gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
-                 gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
-            ]
-        # else:
-        #     if province_code == '-1':
-        #         # t = [gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h')]
-        #         t = [gevent.spawn(self.recall_region_dup_24h, province_code, size, '24h_dup2', expire_time)]
-        #
-        #     else:
-        #         t = [
-        #             # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size),
-        #              # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'region_24h'),
-        #              # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
-        #              gevent.spawn(self.recall_region_dup_24h, province_code, size, 'region_h', expire_time),
-        #              gevent.spawn(self.recall_region_dup_24h, province_code, size, 'region_24h', expire_time),
-        #              gevent.spawn(self.recall_region_dup_24h, province_code, size, '24h_dup2', expire_time),
-        #
-        #              # gevent.spawn(self.region_dup_recall, province_code, 1, size, expire_time),
-        #              # gevent.spawn(self.region_dup_recall, province_code, 2, size, expire_time)
-        #              ]
+            if region_code == '-1':
+                t = [
+                    # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
+                    gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
+                    gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
+                ]
+            else:
+                t = [
+                    # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size),
+                     # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'region_24h'),
+                     # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
+                     gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_h', expire_time),
+                     gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
+                     gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
+                     gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
+                ]
+
         gevent.joinall(t)
         region_recall_result_list = [i.get() for i in t]
         # 将已获取到的视频按顺序去重合并
@@ -1648,7 +1645,6 @@ class PoolRecall(object):
 
         return recall_result[:size]
 
-
     def update_last_video_record(self, record_key, pool_key_prefix, province_code):
         # 判断当前小时的小时级列表是否更新
         now_date = datetime.today()
@@ -1868,3 +1864,147 @@ class PoolRecall(object):
         #     'executeTime': (time.time() - start_time) * 1000
         # })
         return pool_recall_result[:size]
+
+    def update_last_video_record_by_day(self, record_key, pool_key_prefix, expire_time):
+        # 判断当前日期的小时级列表是否更新
+        now_date = datetime.today()
+        now_dt = datetime.strftime(now_date, '%Y%m%d')
+        now_pool_recall_key = f"{pool_key_prefix}{self.data_key}:{self.rule_key_30day}:{now_dt}"
+        if self.redis_helper.key_exists(key_name=now_pool_recall_key):
+            value = {'date': now_dt}
+            self.redis_helper.set_data_to_redis(key_name=record_key, value=str(value), expire_time=expire_time)
+        else:
+            redis_dt = datetime.strftime(now_date - timedelta(days=1), '%Y%m%d')
+            now_pool_recall_key = f"{pool_key_prefix}{self.data_key}:{self.rule_key_30day}:{redis_dt}"
+            value = {'date': redis_dt}
+            self.redis_helper.set_data_to_redis(key_name=record_key, value=str(value), expire_time=expire_time)
+        return now_pool_recall_key
+
+    def get_last_recommend_video_idx_by_day(self, record_key_prefix, pool_key_prefix, last_video_key_prefix, expire_time):
+        # 判断mid对应上一次视频位置 时间记录
+        record_key = f"{record_key_prefix}{self.app_type}:{self.mid}"
+        last_video_key = f'{last_video_key_prefix}{self.app_type}:{self.mid}'
+
+        if not self.redis_helper.key_exists(key_name=record_key):
+            # ###### 记录key不存在
+            self.redis_helper.del_keys(key_name=last_video_key)
+            idx = 0
+            pool_recall_key = self.update_last_video_record_by_day(record_key=record_key,
+                                                                   pool_key_prefix=pool_key_prefix,
+                                                                   expire_time=expire_time)
+        else:
+            # ###### 记录key存在,判断date
+            now_date = datetime.today()
+            # 获取记录的date
+            record = self.redis_helper.get_data_from_redis(key_name=record_key)
+            record_dt = eval(record).get('date')
+            now_dt = datetime.strftime(now_date, '%Y%m%d')
+            if record_dt == now_dt:
+                # 已获取当前日期数据
+                pool_recall_key = f"{pool_key_prefix}{self.data_key}:{self.rule_key_30day}:{now_dt}"
+                idx = self.get_video_idx(pool_recall_key=pool_recall_key, last_video_key=last_video_key)
+            elif record_dt == datetime.strftime((now_date - timedelta(days=1)), '%Y%m%d'):
+                # 记录的dt - 当前dt = 1,判断当前h数据是否已更新
+                now_pool_recall_key = f"{pool_key_prefix}{self.data_key}:{self.rule_key_30day}:{now_dt}"
+                if self.redis_helper.key_exists(key_name=now_pool_recall_key):
+                    new_record = {'date': now_dt}
+                    self.redis_helper.set_data_to_redis(key_name=record_key,
+                                                        value=str(new_record),
+                                                        expire_time=expire_time)
+                    idx = 0
+                    self.redis_helper.del_keys(key_name=last_video_key)
+                    pool_recall_key = now_pool_recall_key
+                else:
+                    pool_recall_key = f"{pool_key_prefix}{self.data_key}:{self.rule_key_30day}:{record_dt}"
+                    idx = self.get_video_idx(pool_recall_key=pool_recall_key, last_video_key=last_video_key)
+            else:
+                idx = 0
+                self.redis_helper.del_keys(key_name=last_video_key)
+                pool_recall_key = self.update_last_video_record_by_day(record_key=record_key,
+                                                                       pool_key_prefix=pool_key_prefix,
+                                                                       expire_time=expire_time)
+
+        return pool_recall_key, idx
+
+    def recall_update_by_day(self, size=4, key_flag='', expire_time=24*3600):
+        """
+        从天级更新列表中获取视频
+        :param size: 获取视频个数
+        :param key_flag: 视频表标记
+        :param expire_time: 末位视频记录redis过期时间
+        :return:
+        """
+        if key_flag == '30day':
+            # 相对30天计算列表的筛选结果
+            # 视频列表
+            pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_30DAY
+            # mid对应上一次视频位置 时间记录
+            record_key_prefix = config_.RECORD_KEY_NAME_PREFIX_LAST_VIDEO_30DAY
+            # mid对应上一次视频记录
+            last_video_key_prefix = config_.LAST_VIDEO_FROM_30DAY_PREFIX
+            push_from = config_.PUSH_FROM['rov_recall_30day']
+        else:
+            return []
+        # 获取相关redis key, 用户上一次在rov召回池对应的位置
+        pool_key, idx = self.get_last_recommend_video_idx_by_day(record_key_prefix=record_key_prefix,
+                                                                 pool_key_prefix=pool_key_prefix,
+                                                                 last_video_key_prefix=last_video_key_prefix,
+                                                                 expire_time=expire_time)
+        if not pool_key:
+            return []
+        recall_data = []
+        pool_recall_result = []
+        # 每次获取的视频数
+        get_size = size * 5
+        # 记录获取频次
+        freq = 0
+        while len(pool_recall_result) < size:
+            freq += 1
+            if freq > config_.MAX_FREQ_FROM_ROV_POOL:
+                break
+            # 获取数据
+            data = self.redis_helper.get_data_zset_with_index(key_name=pool_key,
+                                                              start=idx, end=idx + get_size - 1,
+                                                              with_scores=True)
+            if not data:
+                break
+            recall_data.extend(data)
+            # 获取视频id,并转换类型为int,并存储为key-value{videoId: score}
+            video_ids = []
+            video_score = {}
+            for value in data:
+                video_id = int(value[0])
+                video_ids.append(video_id)
+                video_score[video_id] = value[1]
+            # 过滤
+            filter_ = FilterVideos(request_id=self.request_id,
+                                   app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+            ge = gevent.spawn(filter_.filter_videos)
+            ge.join()
+            filtered_result = ge.get()
+
+            if filtered_result:
+                # 添加视频源参数 pushFrom, abCode
+                temp_result = [{'videoId': int(item), 'rovScore': video_score[int(item)],
+                                'pushFrom': push_from, 'abCode': self.ab_code}
+                               for item in filtered_result if video_score.get(int(item)) is not None]
+                pool_recall_result.extend(temp_result)
+
+            idx += get_size
+
+        pool_recall_result.sort(key=lambda x: x.get('rovScore', 0), reverse=True)
+
+        if len(recall_data) > 0 and len(pool_recall_result) == 0 and self.mid:
+            # 召回数据不为空 & 过滤后结果为空 & mid不为空时,更新召回获取的末位视频id记录到定位的key中
+            last_video_key = f'{last_video_key_prefix}{self.app_type}:{self.mid}'
+            self.redis_helper.set_data_to_redis(key_name=last_video_key, value=recall_data[-1][0],
+                                                expire_time=expire_time)
+
+        # log_.info({
+        #     'logTimestamp': int(time.time() * 1000),
+        #     'request_id': self.request_id,
+        #     'operation': push_from,
+        #     'pool_recall_result': pool_recall_result,
+        #     'executeTime': (time.time() - start_time) * 1000
+        # })
+        return pool_recall_result[:size]