Browse Source

update flowpool recall

liqian 1 year ago
parent
commit
2a65ad5970
4 changed files with 278 additions and 37 deletions
  1. 4 0
      config.py
  2. 13 4
      db_helper.py
  3. 95 33
      recommend.py
  4. 166 0
      video_recall.py

+ 4 - 0
config.py

@@ -702,6 +702,10 @@ class BaseConfig(object):
     FLOWPOOL_KEY_NAME_PREFIX = 'flow:pool:item:score:'
     # 快速曝光流量池数据存放 redis key前缀,完整格式 flow:pool:quick:item:score:{appType}:{flowPool_id}
     QUICK_FLOWPOOL_KEY_NAME_PREFIX = 'flow:pool:quick:item:score:'
+    # 流量池离线模型结果存放 redis key前缀,完整格式 flow:pool:item:{appType}
+    FLOWPOOL_KEY_NAME_PREFIX_SET = 'flow:pool:item:'
+    # 快速曝光流量池数据存放 redis key前缀,完整格式 flow:pool:quick:item:{appType}:{flowPool_id}
+    QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET = 'flow:pool:quick:item:'
     # 快速曝光流量池分发概率 redis key前缀,完整格式 flow:pool:quick:distribute:rate:{flowPool_id}
     QUICK_FLOWPOOL_DISTRIBUTE_RATE_KEY_NAME_PREFIX = 'flow:pool:quick:distribute:rate:'
 

+ 13 - 4
db_helper.py

@@ -362,7 +362,7 @@ class RedisHelper(object):
         """
         # start_time = time.time()
         conn = self.connect()
-        conn.srem(key_name, *values)
+        res = conn.srem(key_name, *values)
         # if self.params is not None:
         #     log_.info({
         #         'logTimestamp': int(time.time() * 1000),
@@ -370,6 +370,7 @@ class RedisHelper(object):
         #         'operation': 'remove_value_from_set',
         #         'executeTime': (time.time() - start_time) * 1000
         #     })
+        return res
 
     def decr_key(self, key_name, amount=1, expire_time=30*60):
         """
@@ -529,7 +530,15 @@ if __name__ == '__main__':
     # redis_helper.add_data_with_set(key_name=key_name, values=values, expire_time=30 * 60)
     # res = redis_helper.get_data_from_set(key_name=key_name)
 
-    res = redis_helper.remove_value_from_zset(
-        key_name="recall:item:score:region:dup3:24h:110000:data1:rule4:20230315:14",
-        value=111111)
+    key_name = f"previewed:videos:5:aan7"
+    res = redis_helper.get_data_with_count_from_set(
+        key_name=key_name,
+        count=20)
+    print(res)
+    res1 = redis_helper.remove_value_from_set(key_name=key_name, values=tuple({'2881413'}))
+    print(res1)
+    res = redis_helper.get_data_with_count_from_set(
+        key_name=key_name,
+        count=20)
     print(res)
+

+ 95 - 33
recommend.py

@@ -229,8 +229,8 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
             t.append(gevent.spawn(pool_recall.get_return_video_reall))
     else:
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
-             gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID),
-             gevent.spawn(pool_recall.flow_pool_recall, size)]
+             gevent.spawn(pool_recall.flow_pool_recall_new, size, config_.QUICK_FLOW_POOL_ID),
+             gevent.spawn(pool_recall.flow_pool_recall_new, size)]
         if ab_code==60058:
             t.append(gevent.spawn(pool_recall.get_U2I_reall, mid))
             t.append(gevent.spawn(pool_recall.get_play_reall, mid))
@@ -458,8 +458,8 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
         #     t.append(gevent.spawn(pool_recall.get_return_video_reall, 'rv2:'))
     else:
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
-             gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID),
-             gevent.spawn(pool_recall.flow_pool_recall, size)]
+             gevent.spawn(pool_recall.flow_pool_recall_new, size, config_.QUICK_FLOW_POOL_ID),
+             gevent.spawn(pool_recall.flow_pool_recall_new, size)]
     if ab_code == 60054 or ab_code == 60066 or ab_code == 60072 or ab_code == 60073:
         t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
     elif ab_code == 60056 or ab_code == 60071:
@@ -1008,7 +1008,8 @@ def update_redis_data(result, app_type, mid, top_K, expire_time=24*3600):
             else:
                 flow_recall_video = [item for item in result if item['pushFrom'] == config_.PUSH_FROM['flow_recall']]
             if flow_recall_video:
-                update_local_distribute_count(flow_recall_video)
+                # update_local_distribute_count(flow_recall_video)
+                update_local_distribute_count_new(flow_recall_video)
                 # log_.info('update local distribute count success!')
 
         # 限流视频分发数记录
@@ -1146,6 +1147,61 @@ def update_local_distribute_count(videos):
         log_.error(traceback.format_exc())
 
 
+def update_local_distribute_count_new(videos):
+    """
+    Decrement the local distribute count of every video and purge exhausted ones from the flow pool sets.
+    :param videos: video list, type-list [{'videoId':'', 'flowPool':'', 'distributeCount': '',
+                                    'rovScore': '', 'pushFrom': 'flow_pool', 'abCode': self.ab_code}, ....]
+    :return: None; exceptions are caught and logged (the message still reads 'update_local_distribute_count error...')
+    """
+    try:
+        redis_helper = RedisHelper()
+        for item in videos:
+            video_id, flow_pool = item['videoId'], item['flowPool']
+            key_name = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}"
+            # decrement the locally recorded distribute count by 1
+            redis_helper.decr_key(key_name=key_name, amount=1, expire_time=15 * 60)
+            # re-read the counter to check this video's remaining distribute count
+            cur_count = redis_helper.get_data_from_redis(key_name=key_name)
+            # no record
+            if cur_count is None:
+                continue
+            # local count cur_count <= 0: remove the video from every flow recall pool and delete the local count key
+            if int(cur_count) <= 0:
+                add_remove_log = False
+                redis_helper.del_keys(key_name=key_name)
+                for app_name in config_.APP_TYPE:
+                    app_type = config_.APP_TYPE.get(app_name)
+                    flow_pool_key_list = [
+                        f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET}{app_type}",
+                        f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{app_type}:{config_.QUICK_FLOW_POOL_ID}"
+                    ]
+                    for key in flow_pool_key_list:
+                        remove_res = redis_helper.remove_value_from_set(key_name=key,
+                                                                        values=(f"{video_id}-{flow_pool}", ))
+                        if remove_res > 0:
+                            add_remove_log = True
+                    video_flow_pool_key_list = [
+                        f"{config_.QUICK_FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{app_type}:{config_.QUICK_FLOW_POOL_ID}:{video_id}",
+                        f"{config_.FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{app_type}:{video_id}"
+                    ]
+                    for key in video_flow_pool_key_list:
+                        redis_helper.remove_value_from_set(key_name=key, values=(flow_pool, ))
+                if add_remove_log is True:
+                    log_.info({'tag': 'remove video_id from flow_pool', 'video_id': video_id, 'flow_pool': flow_pool})
+
+            # if redis_helper.key_exists(key_name=key_name):
+            #     # the video has a local record: locally recorded distribute count - 1
+            #     redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
+            # else:
+            #     # the video has no local record: distribute count fetched from the API - 1
+            #     redis_helper.incr_key(key_name=key_name, amount=int(item['distributeCount']) - 1, expire_time=5 * 60)
+
+    except Exception as e:
+        log_.error('update_local_distribute_count error...')
+        log_.error(traceback.format_exc())
+
+
 def get_religion_class_with_mid(mid, religion_class_name):
     """
     判断用户是否属于对应的宗教类型
@@ -1686,19 +1742,19 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
     get_result_st = time.time()
     #print("ab_code:", ab_code)
     #new pipeline
-    if ab_code == 60047 or ab_code == 60048 or ab_code == 60049:
-        result = new_video_recommend(request_id=request_id,
-                             mid=mid, uid=uid, app_type=app_type,
-                             size=size, top_K=top_K, flow_pool_P=flow_pool_P,
-                             algo_type=algo_type, client_info=client_info,
-                             ab_code=ab_code, expire_time=expire_time,
-                             rule_key=rule_key, data_key=data_key,
-                             no_op_flag=no_op_flag, old_video_index=old_video_index,
-                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config)
+    # if ab_code == 60047 or ab_code == 60048 or ab_code == 60049:
+    #     result = new_video_recommend(request_id=request_id,
+    #                          mid=mid, uid=uid, app_type=app_type,
+    #                          size=size, top_K=top_K, flow_pool_P=flow_pool_P,
+    #                          algo_type=algo_type, client_info=client_info,
+    #                          ab_code=ab_code, expire_time=expire_time,
+    #                          rule_key=rule_key, data_key=data_key,
+    #                          no_op_flag=no_op_flag, old_video_index=old_video_index,
+    #                          params=params, rule_key_30day=rule_key_30day, shield_config=shield_config)
     # simrecal: 60054 +融合, 全量
     # return video, return video2
     # old video: 60056, test2
-    elif ab_code == 60054 or ab_code == 60056 or ab_code == 60067 or ab_code == 60068 or ab_code == 60066 \
+    if ab_code == 60054 or ab_code == 60056 or ab_code == 60067 or ab_code == 60068 or ab_code == 60066 \
             or ab_code == 60069 or ab_code == 60070 or ab_code == 60071 or ab_code == 60072 or ab_code == 60073:
         result, fea_info = video_old_recommend(request_id=request_id,
                                      mid=mid, uid=uid, app_type=app_type,
@@ -1740,10 +1796,13 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
     #                     app_type=app_type, mid=mid, uid=uid)
     # redis数据刷新
     update_redis_st = time.time()
-    if ab_code == 60047 or  ab_code == 60048 or  ab_code == 60049:
-        update_flow_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
-    else:
-        update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+    # if ab_code == 60047 or  ab_code == 60048 or  ab_code == 60049:
+    #     update_flow_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+    # else:
+    #     update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+
+    update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
     #     'request_id': request_id,
@@ -1805,16 +1864,16 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
     # 简单召回 - 排序 - 兜底
     get_result_st = time.time()
     #print("ab_code:", ab_code)
-    if ab_code == 60047 or ab_code == 60048 or ab_code == 60049:
-        result = new_video_recommend(request_id=request_id,
-                                 mid=mid, uid=uid, app_type=app_type,
-                                 size=size, top_K=top_K, flow_pool_P=flow_pool_P,
-                                 algo_type='', client_info=client_info,
-                                 ab_code=ab_code, expire_time=expire_time,
-                                 rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
-                                 old_video_index=old_video_index, video_id=video_id,
-                                 params=params, rule_key_30day=rule_key_30day, shield_config=shield_config)
-    elif ab_code == 60054 or ab_code == 60056 or ab_code == 60067 or ab_code == 60068 or ab_code == 60066\
+    # if ab_code == 60047 or ab_code == 60048 or ab_code == 60049:
+    #     result = new_video_recommend(request_id=request_id,
+    #                              mid=mid, uid=uid, app_type=app_type,
+    #                              size=size, top_K=top_K, flow_pool_P=flow_pool_P,
+    #                              algo_type='', client_info=client_info,
+    #                              ab_code=ab_code, expire_time=expire_time,
+    #                              rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
+    #                              old_video_index=old_video_index, video_id=video_id,
+    #                              params=params, rule_key_30day=rule_key_30day, shield_config=shield_config)
+    if ab_code == 60054 or ab_code == 60056 or ab_code == 60067 or ab_code == 60068 or ab_code == 60066 \
             or ab_code == 60069 or ab_code == 60070 or ab_code == 60071 or ab_code == 60072 or ab_code == 60073:
         result, fea_info = video_old_recommend(request_id=request_id,
                                  mid=mid, uid=uid, app_type=app_type,
@@ -1857,10 +1916,13 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
 
     # redis数据刷新
     update_redis_st = time.time()
-    if ab_code == 60047 or ab_code == 60048 or  ab_code == 60049:
-         update_flow_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
-    else:
-         update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+    # if ab_code == 60047 or ab_code == 60048 or  ab_code == 60049:
+    #      update_flow_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+    # else:
+    #      update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+
+    update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
     #     'request_id': request_id,

+ 166 - 0
video_recall.py

@@ -555,6 +555,123 @@ class PoolRecall(object):
 
         return flow_pool_recall_result[:size]
 
+    def flow_pool_recall_new(self, size=10, flow_pool_id=None):
+        """Recall videos from the flow pool redis set (key type 'flow_set'); returns at most `size` result dicts."""
+        start_time = time.time()
+        # list of city codes that have city-level grouped data
+        city_code_list = [code for _, code in config_.CITY_CODE.items()]
+        # get provinceCode
+        province_code = self.client_info.get('provinceCode', '-1')
+        # get cityCode
+        city_code = self.client_info.get('cityCode', '-1')
+
+        if city_code in city_code_list:
+            # city-level data exists: use the city code as the region
+            region_code = city_code
+        else:
+            region_code = province_code
+        if region_code == '':
+            region_code = '-1'
+
+        flow_pool_key = self.get_pool_redis_key('flow_set', flow_pool_id=flow_pool_id)
+        # print(flow_pool_key)
+        flow_pool_recall_result = []
+        flow_pool_recall_videos = []
+        # number of videos fetched per round
+        get_size = size * 5
+        # count of fetch rounds (freq); NOTE(review): idx is only incremented below and never read in this method
+        freq = 0
+        idx = 0
+        while len(flow_pool_recall_result) < size:
+            freq += 1
+            if freq > config_.MAX_FREQ_FROM_FLOW_POOL:
+                break
+            # fetch data
+            # st_get = time.time()
+            data = self.redis_helper.get_data_with_count_from_set(key_name=flow_pool_key, count=get_size)
+            # et_get = time.time()
+            # log_.info('get data from flow pool redis: freq = {}, data = {}, execute time = {}ms'.format(
+            #     freq, data, (et_get - st_get) * 1000))
+            if not data:
+                # log_.info('all videos in the flow pool have been taken')
+                break
+            # map each video_id to its flow_pool tags; NOTE(review): video_score is created but never used here
+            video_ids = []
+            video_mapping = {}
+            video_score = {}
+            for value in data:
+                try:
+                    video_id, flow_pool = value.split('-')
+                except Exception as e:
+                    log_.error({
+                        'request_id': self.request_id,
+                        'app_type': self.app_type,
+                        'flow_pool_value': value
+                    })
+                    continue
+                video_id = int(video_id)
+                if video_id not in video_ids:
+                    video_ids.append(video_id)
+                if video_id not in video_mapping:
+                    video_mapping[video_id] = [flow_pool]
+                else:
+                    video_mapping[video_id].append(flow_pool)
+            # filter
+            filter_ = FilterVideos(request_id=self.request_id,
+                                   app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
+            ge = gevent.spawn(filter_.filter_videos, pool_type='flow',
+                              region_code=region_code, shield_config=self.shield_config)
+            ge.join()
+            filtered_result = ge.get()
+            # check the remaining distributable count
+            if filtered_result:
+                st_check = time.time()
+                ge = gevent.spawn(self.check_video_counts_new, video_ids=filtered_result, flow_pool_mapping=video_mapping)
+                ge.join()
+                check_result = ge.get()
+                # log_.info({
+                #     'logTimestamp': int(time.time() * 1000),
+                #     'request_id': self.request_id,
+                #     'app_type': self.app_type,
+                #     'mid': self.mid,
+                #     'uid': self.uid,
+                #     'operation': 'check_video_counts',
+                #     'executeTime': (time.time() - st_check) * 1000
+                # })
+
+                for item in check_result:
+                    video_id = int(item[0])
+                    flow_pool = item[1]
+                    if video_id not in flow_pool_recall_videos:
+                        # keep only the first flow_pool seen for a video as its recall result
+                        # attach the video source parameters pushFrom, abCode
+                        flow_pool_recall_result.append(
+                            {'videoId': video_id, 'flowPool': flow_pool,
+                             'rovScore': random.uniform(0, 100), 'pushFrom': config_.PUSH_FROM['flow_recall'],
+                             'abCode': self.ab_code}
+                        )
+
+                        flow_pool_recall_videos.append(video_id)
+                # et_check = time.time()
+                # log_.info('check result: result = {}, execute time = {}ms'.format(
+                #     check_result, (et_check - st_check) * 1000))
+
+                # # check the error flag, True means error
+                # if error_flag:
+                #     # stop the flow pool recall
+                #     break
+
+            idx += get_size
+
+        # log_.info({
+        #     'logTimestamp': int(time.time() * 1000),
+        #     'request_id': self.request_id,
+        #     'operation': 'flow_pool_recall',
+        #     'executeTime': (time.time() - start_time) * 1000
+        # })
+
+        return flow_pool_recall_result[:size]
+
     def check_video_counts(self, video_ids, flow_pool_mapping):
         """
         检查视频剩余可分发数
@@ -597,6 +714,49 @@ class PoolRecall(object):
                         log_.info({'tag': 'remove video_id from flow_pool', 'video_id': video_id, 'flow_pool': flow_pool})
         return check_result
 
+    def check_video_counts_new(self, video_ids, flow_pool_mapping):
+        """
+        Check the remaining distributable count of each video.
+        :param video_ids: video ids, type-list
+        :param flow_pool_mapping: mapping of video id -> list of flow pool tags, type-dict
+        :return: check_result, a list of (video_id, flow_pool) tuples whose local distribute count is > 0
+        """
+        # flow_pool_key = self.get_pool_redis_key('flow')
+        # videos = []
+        check_result = []
+        for video_id in video_ids:
+            video_id = int(video_id)
+            for flow_pool in flow_pool_mapping.get(video_id, []):
+                # look up the local distribute record
+                cur_count = get_videos_local_distribute_count(video_id=video_id, flow_pool=flow_pool)
+                # no record: the pair is skipped
+                if cur_count is None:
+                    # videos.append({'videoId': video_id, 'flowPool': flow_pool})
+                    continue
+                # local distribute count cur_count > 0
+                elif cur_count > 0:
+                    check_result.append((video_id, flow_pool))
+                # local count cur_count <= 0: remove from every flow recall pool and delete the local count key
+                else:
+                    add_remove_log = False
+                    remain_count_key = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}"
+                    self.redis_helper.del_keys(remain_count_key)
+                    value = '{}-{}'.format(video_id, flow_pool)
+                    for item in config_.APP_TYPE:
+                        flow_pool_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET}{config_.APP_TYPE.get(item)}"
+                        remove_res = self.redis_helper.remove_value_from_set(key_name=flow_pool_key, values=(value, ))
+                        if remove_res > 0:
+                            add_remove_log = True
+                        quick_flow_pool_key = f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{config_.APP_TYPE.get(item)}" \
+                                              f":{config_.QUICK_FLOW_POOL_ID}"
+                        remove_res = self.redis_helper.remove_value_from_set(key_name=quick_flow_pool_key,
+                                                                             values=(value, ))
+                        if remove_res > 0:
+                            add_remove_log = True
+                    if add_remove_log is True:
+                        log_.info({'tag': 'remove video_id from flow_pool', 'video_id': video_id, 'flow_pool': flow_pool})
+        return check_result
+
     """
         # 本次视频都有本地记录
         if len(videos) == 0:
@@ -694,6 +854,12 @@ class PoolRecall(object):
             else:
                 return f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}"
 
+        elif pool_type == 'flow_set':
+            if flow_pool_id == config_.QUICK_FLOW_POOL_ID:
+                return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}"
+            else:
+                return f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET}{self.app_type}"
+
         elif pool_type == 'special':
             key_name_prefix = config_.KEY_NAME_PREFIX_SPECIAL_VIDEOS
             # 判断列表是否更新,未更新则使用前一天的列表