瀏覽代碼

add flowpool abtest: experimental_flow_set_level_score

liqian 1 年之前
父節點
當前提交
dd2a1113ed
共有 4 個文件被更改,包括 291 次插入1 次删除
  1. 2 1
      app.py
  2. 2 0
      config.py
  3. 69 0
      recommend.py
  4. 218 0
      video_recall.py

+ 2 - 1
app.py

@@ -30,7 +30,8 @@ log_ = Log()
 config_ = set_config()
 level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
 flow_pool_abtest_config = {'control_group': [7, 8, 9, 10, 11, 12, 13, 14, 15, 16],
-                           'experimental_flow_set_level': []}
+                           'experimental_flow_set_level': [],
+                           'experimental_flow_set_level_score': []}
 
 
 def update_flow_pool_config():

+ 2 - 0
config.py

@@ -758,6 +758,8 @@ class BaseConfig(object):
     FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL = 'flow:pool:level:item:'
     # 流量池各层分发概率权重存放 redis key,完整格式 flow:pool:level:
     FLOWPOOL_LEVEL_WEIGHT_KEY_NAME = 'flow:pool:level:recommend:weight'
+    # 流量池视频分层存放(按分数排序) redis key前缀,完整格式 flow:pool:level:item:score:{appType}:{level}
+    FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE = 'flow:pool:level:item:score:'
     # 快速曝光流量池数据存放 redis key前缀,完整格式 flow:pool:quick:item:{appType}:{flowPool_id}
     QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET = 'flow:pool:quick:item:'
     # 快速曝光流量池分发概率 redis key前缀,完整格式 flow:pool:quick:distribute:rate:{flowPool_id}

+ 69 - 0
recommend.py

@@ -236,6 +236,12 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
                           size, flow_pool_id=config_.QUICK_FLOW_POOL_ID, flow_pool_abtest_group=flow_pool_abtest_group),
              gevent.spawn(pool_recall.flow_pool_recall_new_with_level,
                           size, flow_pool_abtest_group=flow_pool_abtest_group)]
+    elif flow_pool_abtest_group == 'experimental_flow_set_level_score':
+        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
+             gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score,
+                          size, flow_pool_id=config_.QUICK_FLOW_POOL_ID, flow_pool_abtest_group=flow_pool_abtest_group),
+             gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score,
+                          size, flow_pool_abtest_group=flow_pool_abtest_group)]
     else:
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
              gevent.spawn(pool_recall.flow_pool_recall,
@@ -475,6 +481,12 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
                           size, flow_pool_id=config_.QUICK_FLOW_POOL_ID, flow_pool_abtest_group=flow_pool_abtest_group),
              gevent.spawn(pool_recall.flow_pool_recall_new_with_level,
                           size, flow_pool_abtest_group=flow_pool_abtest_group)]
+    elif flow_pool_abtest_group == 'experimental_flow_set_level_score':
+        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
+             gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score,
+                          size, flow_pool_id=config_.QUICK_FLOW_POOL_ID, flow_pool_abtest_group=flow_pool_abtest_group),
+             gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score,
+                          size, flow_pool_abtest_group=flow_pool_abtest_group)]
     else:
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
              gevent.spawn(pool_recall.flow_pool_recall,
@@ -1037,6 +1049,8 @@ def update_redis_data(result, app_type, mid, top_K, expire_time=24*3600, level_w
             if flow_recall_video:
                 if flow_pool_abtest_group == 'experimental_flow_set_level':
                     update_local_distribute_count_new_with_level(flow_recall_video, level_weight)
+                elif flow_pool_abtest_group == 'experimental_flow_set_level_score':
+                    update_local_distribute_count_new_with_level_score(flow_recall_video, level_weight)
                 else:
                     update_local_distribute_count(flow_recall_video)
                 # update_local_distribute_count_new(flow_recall_video)
@@ -1288,6 +1302,61 @@ def update_local_distribute_count_new_with_level(videos, level_weight):
         log_.error(traceback.format_exc())
 
 
+def update_local_distribute_count_new_with_level_score(videos, level_weight):
+    """
+    更新本地分发数
+    :param videos: 视频列表 type-list [{'videoId':'', 'flowPool':'', 'distributeCount': '',
+                                    'rovScore': '', 'pushFrom': 'flow_pool', 'abCode': self.ab_code}, ....]
+    :return:
+    """
+    try:
+        redis_helper = RedisHelper()
+        # level_weight = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_LEVEL_WEIGHT_KEY_NAME)
+        # level_list = [level for level in json.loads(level_weight)]
+        if level_weight is None:
+            level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
+        level_list = [level for level in level_weight]
+
+        for item in videos:
+            video_id, flow_pool = item['videoId'], item['flowPool']
+            key_name = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}"
+            # 本地记录的分发数 - 1
+            redis_helper.decr_key(key_name=key_name, amount=1, expire_time=15 * 60)
+            # 对该视频做分发数检查
+            cur_count = redis_helper.get_data_from_redis(key_name=key_name)
+            # 无记录
+            if cur_count is None:
+                continue
+            # 本地分发数 cur_count <= 0,从所有的流量召回池移除,删除本地分发记录key
+            if int(cur_count) <= 0:
+                add_remove_log = False
+                redis_helper.del_keys(key_name=key_name)
+                for app_name in config_.APP_TYPE:
+                    app_type = config_.APP_TYPE.get(app_name)
+                    flow_pool_key_list = [
+                        f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type}:{config_.QUICK_FLOW_POOL_ID}"
+                    ]
+                    for level in level_list:
+                        flow_pool_key_list.append(f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE}{app_type}:{level}")
+                    for key in flow_pool_key_list:
+                        remove_res = redis_helper.remove_value_from_zset(key_name=key, value=f"{video_id}-{flow_pool}")
+                        if remove_res > 0:
+                            add_remove_log = True
+                if add_remove_log is True:
+                    log_.info({'tag': 'remove video_id from flow_pool', 'video_id': video_id, 'flow_pool': flow_pool})
+
+            # if redis_helper.key_exists(key_name=key_name):
+            #     # 该视频本地有记录,本地记录的分发数 - 1
+            #     redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
+            # else:
+            #     # 该视频本地无记录,接口获取的分发数 - 1
+            #     redis_helper.incr_key(key_name=key_name, amount=int(item['distributeCount']) - 1, expire_time=5 * 60)
+
+    except Exception as e:
+        log_.error('update_local_distribute_count error...')
+        log_.error(traceback.format_exc())
+
+
 def get_religion_class_with_mid(mid, religion_class_name):
     """
     判断用户是否属于对应的宗教类型

+ 218 - 0
video_recall.py

@@ -793,6 +793,128 @@ class PoolRecall(object):
 
         return flow_pool_recall_result[:size]
 
+    def flow_pool_recall_new_with_level_score(self, size=10, flow_pool_id=None, flow_pool_abtest_group=None):
+        """从流量池中获取视频"""
+        # 获取存在城市分组数据的城市编码列表
+        city_code_list = [code for _, code in config_.CITY_CODE.items()]
+        # 获取provinceCode
+        province_code = self.client_info.get('provinceCode', '-1')
+        # 获取cityCode
+        city_code = self.client_info.get('cityCode', '-1')
+
+        if city_code in city_code_list:
+            # 分城市数据存在时,获取城市分组数据
+            region_code = city_code
+        else:
+            region_code = province_code
+        if region_code == '':
+            region_code = '-1'
+
+        flow_pool_key, level = self.get_pool_redis_key('flow_set_level_score', flow_pool_id=flow_pool_id)
+        if flow_pool_key is None:
+            return []
+        # print(flow_pool_key)
+        flow_pool_recall_result = []
+        flow_pool_recall_videos = []
+        # 每次获取的视频数
+        get_size = size * 5
+        # 记录获取频次
+        freq = 0
+        idx = 0
+        while len(flow_pool_recall_result) < size:
+            freq += 1
+            if freq > config_.MAX_FREQ_FROM_FLOW_POOL:
+                break
+            # 获取数据
+            # st_get = time.time()
+            data = self.redis_helper.get_data_zset_with_index(key_name=flow_pool_key,
+                                                              start=idx, end=idx + get_size - 1,
+                                                              with_scores=True)
+            # et_get = time.time()
+            # log_.info('get data from flow pool redis: freq = {}, data = {}, execute time = {}ms'.format(
+            #     freq, data, (et_get - st_get) * 1000))
+            if not data:
+                # log_.info('流量池中的视频已取完')
+                break
+            # 将video_id 与 flow_pool, score做mapping整理
+            video_ids = []
+            video_mapping = {}
+            video_score = {}
+            for value in data:
+                try:
+                    video_id, flow_pool = value[0].split('-')
+                except Exception as e:
+                    log_.error({
+                        'request_id': self.request_id,
+                        'app_type': self.app_type,
+                        'flow_pool_value': value
+                    })
+                    continue
+                video_id = int(video_id)
+                video_score[value[0]] = value[1]
+                if video_id not in video_ids:
+                    video_ids.append(video_id)
+                if video_id not in video_mapping:
+                    video_mapping[video_id] = [flow_pool]
+                else:
+                    video_mapping[video_id].append(flow_pool)
+            # 过滤
+            filter_ = FilterVideos(request_id=self.request_id,
+                                   app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+            ge = gevent.spawn(filter_.filter_videos, pool_type='flow',
+                              region_code=region_code, shield_config=self.shield_config)
+            ge.join()
+            filtered_result = ge.get()
+            # 检查可分发数
+            if filtered_result:
+                # st_check = time.time()
+                ge = gevent.spawn(self.check_video_counts_new_with_level_score,
+                                  video_ids=filtered_result, flow_pool_mapping=video_mapping)
+                ge.join()
+                check_result = ge.get()
+                # log_.info({
+                #     'logTimestamp': int(time.time() * 1000),
+                #     'request_id': self.request_id,
+                #     'app_type': self.app_type,
+                #     'mid': self.mid,
+                #     'uid': self.uid,
+                #     'operation': 'check_video_counts',
+                #     'executeTime': (time.time() - st_check) * 1000
+                # })
+
+                for item in check_result:
+                    video_id = int(item[0])
+                    flow_pool = item[1]
+                    if video_id not in flow_pool_recall_videos:
+                        # 取其中一个 flow_pool 作为召回结果
+                        # 添加视频源参数 pushFrom, abCode
+                        flow_pool_recall_result.append(
+                            {'videoId': video_id, 'flowPool': flow_pool, 'level': level,
+                             'rovScore': video_score[f"{video_id}-{flow_pool}"], 'pushFrom': config_.PUSH_FROM['flow_recall'],
+                             'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group}
+                        )
+
+                        flow_pool_recall_videos.append(video_id)
+                # et_check = time.time()
+                # log_.info('check result: result = {}, execute time = {}ms'.format(
+                #     check_result, (et_check - st_check) * 1000))
+
+                # # 判断错误标记, True为错误
+                # if error_flag:
+                #     # 结束流量池召回
+                #     break
+
+            idx += get_size
+
+        # log_.info({
+        #     'logTimestamp': int(time.time() * 1000),
+        #     'request_id': self.request_id,
+        #     'operation': 'flow_pool_recall',
+        #     'executeTime': (time.time() - start_time) * 1000
+        # })
+
+        return flow_pool_recall_result[:size]
+
     def check_video_counts(self, video_ids, flow_pool_mapping):
         """
         检查视频剩余可分发数
@@ -930,6 +1052,57 @@ class PoolRecall(object):
                         log_.info({'tag': 'remove video_id from flow_pool', 'video_id': video_id, 'flow_pool': flow_pool})
         return check_result
 
+    def check_video_counts_new_with_level_score(self, video_ids, flow_pool_mapping):
+        """
+        检查视频剩余可分发数
+        :param video_ids: 视频id type-list
+        :param flow_pool_mapping: 视频id-流量池标记mapping, type-dict
+        :return: check_result, error_flag
+        """
+        # flow_pool_key = self.get_pool_redis_key('flow')
+        # videos = []
+        # level_weight = self.redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_LEVEL_WEIGHT_KEY_NAME)
+        # level_list = [level for level in json.loads(level_weight)]
+        if self.level_weight is None:
+            level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
+        else:
+            level_weight = self.level_weight
+        level_list = [level for level in level_weight]
+
+        check_result = []
+        for video_id in video_ids:
+            video_id = int(video_id)
+            for flow_pool in flow_pool_mapping.get(video_id, []):
+                # 判断是否有本地分发记录
+                cur_count = get_videos_local_distribute_count(video_id=video_id, flow_pool=flow_pool)
+                # 无记录
+                if cur_count is None:
+                    # videos.append({'videoId': video_id, 'flowPool': flow_pool})
+                    continue
+                # 本地分发数 cur_count > 0
+                elif cur_count > 0:
+                    check_result.append((video_id, flow_pool))
+                # 本地分发数 cur_count <= 0,从所有的流量召回池移除,删除本地分发记录key
+                else:
+                    add_remove_log = False
+                    remain_count_key = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}"
+                    self.redis_helper.del_keys(remain_count_key)
+                    value = '{}-{}'.format(video_id, flow_pool)
+                    for item in config_.APP_TYPE:
+                        for level in level_list:
+                            flow_pool_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE}{config_.APP_TYPE.get(item)}:{level}"
+                            remove_res = self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
+                            if remove_res > 0:
+                                add_remove_log = True
+                        quick_flow_pool_key = f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{config_.APP_TYPE.get(item)}" \
+                                              f":{config_.QUICK_FLOW_POOL_ID}"
+                        remove_res = self.redis_helper.remove_value_from_zset(key_name=quick_flow_pool_key, value=value)
+                        if remove_res > 0:
+                            add_remove_log = True
+                    if add_remove_log is True:
+                        log_.info({'tag': 'remove video_id from flow_pool', 'video_id': video_id, 'flow_pool': flow_pool})
+        return check_result
+
     """
         # 本次视频都有本地记录
         if len(videos) == 0:
@@ -1083,6 +1256,51 @@ class PoolRecall(object):
                         continue
                 return None, None
 
+        elif pool_type == 'flow_set_level_score':
+            if flow_pool_id == config_.QUICK_FLOW_POOL_ID:
+                return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}", None
+            else:
+                # 1. 获取流量池各层级分发概率权重
+                if self.level_weight is None:
+                    level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
+                else:
+                    level_weight = self.level_weight
+                # print(level_weight)
+                # 2. 判断各层级是否有视频需分发
+                available_level = []
+                for level, weight in level_weight.items():
+                    level_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE}{self.app_type}:{level}"
+                    if self.redis_helper.key_exists(key_name=level_key):
+                        available_level.append((level, level_key, weight))
+                if len(available_level) == 0:
+                    return None, None
+                # 3. 根据可分发层级权重设置分发概率
+                available_level = sorted(available_level, key=lambda x: x[2], reverse=False)
+                weight_sum = sum([int(item[2]) for item in available_level])
+                level_p_mapping = {}
+                level_p_low = 0
+                weight_temp = 0
+                for item in available_level:
+                    level, level_key, weight = item[0], item[1], item[2]
+                    level_p_up = (weight_temp + weight)/weight_sum
+                    level_p_mapping[level] = {
+                        'key': level_key,
+                        'level_p': [round(level_p_low, 2), round(level_p_up, 2)]
+                    }
+                    level_p_low = round(level_p_up, 2)
+                    weight_temp += weight
+                # log_.info(f"level_p_mapping: {level_p_mapping}")
+                # 4. 随机生成[0,1)之间数,返回相应概率区间的key
+                random_p = random.random()
+                for level, level_info in level_p_mapping.items():
+                    level_p = level_info['level_p']
+                    if level_p[0] <= random_p < level_p[1]:
+                        # log_.info(f"random_p: {random_p}, level_p: {level_p}, level: {level}")
+                        return level_info['key'], level
+                    else:
+                        continue
+                return None, None
+
         elif pool_type == 'special':
             key_name_prefix = config_.KEY_NAME_PREFIX_SPECIAL_VIDEOS
             # 判断列表是否更新,未更新则使用前一天的列表