فهرست منبع

add new flowpool abtest

liqian 1 سال پیش
والد
کامیت
20ae90e508
2فایلهای تغییر یافته به همراه111 افزوده شده و 4 حذف شده
  1. 4 4
      recommend.py
  2. 107 0
      video_recall.py

+ 4 - 4
recommend.py

@@ -239,9 +239,9 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
                           size, flow_pool_abtest_group=flow_pool_abtest_group)]
     elif flow_pool_abtest_group == 'experimental_flow_set_level_score':
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
-             gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score,
+             gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score2,
                           size, flow_pool_id=config_.QUICK_FLOW_POOL_ID, flow_pool_abtest_group=flow_pool_abtest_group),
-             gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score,
+             gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score2,
                           size, flow_pool_abtest_group=flow_pool_abtest_group)]
     else:
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
@@ -512,9 +512,9 @@ def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type
                           size, flow_pool_abtest_group=flow_pool_abtest_group)]
     elif flow_pool_abtest_group == 'experimental_flow_set_level_score':
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
-             gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score,
+             gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score2,
                           size, flow_pool_id=config_.QUICK_FLOW_POOL_ID, flow_pool_abtest_group=flow_pool_abtest_group),
-             gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score,
+             gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score2,
                           size, flow_pool_abtest_group=flow_pool_abtest_group)]
     else:
         t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),

+ 107 - 0
video_recall.py

@@ -973,6 +973,113 @@ class PoolRecall(object):
 
         return flow_pool_recall_result[:size], flow_pool_recall_process
 
+    def flow_pool_recall_new_with_level_score2(self, size=10, flow_pool_id=None, flow_pool_abtest_group=None):
+        """从流量池中获取视频"""
+        # add_flow_pool_recall_log
+        flow_pool_recall_process = {}
+        # 获取存在城市分组数据的城市编码列表
+        city_code_list = [code for _, code in config_.CITY_CODE.items()]
+        # 获取provinceCode
+        province_code = self.client_info.get('provinceCode', '-1')
+        # 获取cityCode
+        city_code = self.client_info.get('cityCode', '-1')
+
+        if city_code in city_code_list:
+            # 分城市数据存在时,获取城市分组数据
+            region_code = city_code
+        else:
+            region_code = province_code
+        if region_code == '':
+            region_code = '-1'
+
+        flow_pool_key, level = self.get_pool_redis_key('flow_set_level_score', flow_pool_id=flow_pool_id)
+
+        # add_flow_pool_recall_log
+        flow_pool_recall_process['flow_pool_key'] = flow_pool_key
+        flow_pool_recall_process['level'] = level
+
+        if flow_pool_key is None:
+            return [], flow_pool_recall_process
+        # print(flow_pool_key)
+        flow_pool_recall_result = []
+        flow_pool_recall_videos = []
+        # 每次获取的视频数
+        get_size = size * 5
+        # 获取数据
+        idx = 0
+        data = self.redis_helper.get_data_zset_with_index(key_name=flow_pool_key,
+                                                          start=idx, end=idx + get_size * 5 - 1,
+                                                          with_scores=True)
+        flow_pool_recall_process['initial_data'] = data
+        if not data:
+            return [], flow_pool_recall_process
+
+        # 将video_id 与 flow_pool, score做mapping整理
+        video_ids = []
+        video_mapping = {}
+        video_score = {}
+        for value in data:
+            try:
+                video_id, flow_pool = value[0].split('-')
+            except Exception as e:
+                log_.error({
+                    'request_id': self.request_id,
+                    'app_type': self.app_type,
+                    'flow_pool_value': value
+                })
+                continue
+            video_id = int(video_id)
+            video_score[value[0]] = value[1]
+            if video_id not in video_ids:
+                video_ids.append(video_id)
+            if video_id not in video_mapping:
+                video_mapping[video_id] = [flow_pool]
+            else:
+                video_mapping[video_id].append(flow_pool)
+
+        # 检查可分发数
+        ge = gevent.spawn(self.check_video_counts_new_with_level_score,
+                          video_ids=video_ids, flow_pool_mapping=video_mapping)
+        ge.join()
+        check_result = ge.get()
+        # add_flow_pool_recall_log
+        flow_pool_recall_process['check_counts_data'] = check_result
+
+        check_result_mapping = {}
+        check_result_items = []
+        if check_result:
+            # 获取score top20 视频进入过滤
+            for item in check_result:
+                video_id = int(item[0])
+                flow_pool = item[1]
+                score = video_score[f"{video_id}-{flow_pool}"]
+                if video_id not in flow_pool_recall_videos:
+                    check_result_mapping[video_id] = [flow_pool, score]
+                    check_result_items.append([video_id, flow_pool, score])
+            check_result_items = sorted(check_result_items, key=lambda x: x[2], reverse=True)
+            to_filter_videos = [item[0] for item in check_result_items[:get_size]]
+            # 过滤
+            filter_ = FilterVideos(request_id=self.request_id,
+                                   app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=to_filter_videos)
+            ge = gevent.spawn(filter_.filter_videos, pool_type='flow',
+                              region_code=region_code, shield_config=self.shield_config)
+            ge.join()
+            filtered_result = ge.get()
+            # add_flow_pool_recall_log
+            flow_pool_recall_process['filtered_data'] = filtered_result
+
+            for item in filtered_result:
+                video_id = int(item)
+                # 添加视频源参数 pushFrom, abCode
+                flow_pool_recall_result.append(
+                    {'videoId': video_id, 'flowPool': check_result_mapping[video_id][0], 'level': level,
+                     'rovScore': check_result_mapping[video_id][1],
+                     'pushFrom': config_.PUSH_FROM['flow_recall'],
+                     'abCode': self.ab_code, 'flow_pool_abtest_group': flow_pool_abtest_group}
+                )
+
+        return flow_pool_recall_result[:size], flow_pool_recall_process
+
     def check_video_counts(self, video_ids, flow_pool_mapping):
         """
         检查视频剩余可分发数