liqian 3 سال پیش
والد
کامیت
524c238e0f
6فایلهای تغییر یافته به همراه97 افزوده شده و 30 حذف شده
  1. 3 1
      app.py
  2. 2 0
      config.py
  3. 35 16
      recommend.py
  4. 1 1
      utils.py
  5. 34 7
      video_rank.py
  6. 22 5
      video_recall.py

+ 3 - 1
app.py

@@ -33,13 +33,15 @@ def homepage_recommend():
             size = 10
         if category_id in config_.CATEGORY['recommend']:
             # 推荐
-            videos = video_recommend()
+            videos = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type, algo_type=algo_type)
             result = {'code': 200, 'message': 'success', 'data': {'videos': videos}}
+            log_.info('result: {}'.format(result))
             return json.dumps(result)
         elif category_id in config_.CATEGORY['other']:
             # 其他类别
             videos = get_category_videos()
             result = {'code': 200, 'message': 'success', 'data': {'videos': videos}}
+            log_.info('result: {}'.format(result))
             return json.dumps(result)
         else:
             log_.error('categoryId error, categoryId = {}'.format(category_id))

+ 2 - 0
config.py

@@ -1,4 +1,6 @@
 class BaseConfig(object):
+    # abCode
+    AB_CODE = 10000
     # category id mapping
     CATEGORY = {
         'recommend': [0],  # 推荐

+ 35 - 16
recommend.py

@@ -1,12 +1,11 @@
-import random
 import time
 import multiprocessing
 
-from datetime import date, timedelta
 from log import Log
-from db_helper import RedisHelper
 from config import set_config
-from utils import FilterVideos, get_videos_remain_view_count
+from video_recall import PoolRecall
+from video_rank import video_rank, bottom_strategy
+from db_helper import RedisHelper
 
 log_ = Log()
 config_ = set_config()
@@ -22,11 +21,14 @@ def video_recommend(mid, uid, size, app_type, algo_type):
     :param algo_type: 算法类型  type-string
     :return:
     """
-    # 多进程召回
+    ab_code = config_.AB_CODE
+    # ####### 多进程召回
     start_recall = time.time()
+    log_.info('====== recall')
     cores = multiprocessing.cpu_count()
     pool = multiprocessing.Pool(processes=cores)
-    pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid)
+    pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code)
+    _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
     pool_list = [
         # rov召回池
         pool.apply_async(pool_recall.rov_pool_recall, size),
@@ -40,20 +42,37 @@ def video_recommend(mid, uid, size, app_type, algo_type):
     log_.info('mid: {}, uid: {}, recall: {}, execute time = {}ms'.format(
         mid, uid, recall_result_list, (end_recall - start_recall) * 1000))
 
-    # 排序
+    # ####### 排序
+    start_rank = time.time()
+    log_.info('====== rank')
     data = {
         'rov_pool_recall': recall_result_list[0],
         'flow_pool_recall': recall_result_list[1]
     }
+    rank_result = video_rank(data=data, size=size)
+    end_rank = time.time()
+    log_.info('mid: {}, uid: {}, rank_result: {}, execute time = {}ms'.format(
+        mid, uid, rank_result, (end_rank - start_rank) * 1000))
 
+    if not rank_result:
+        # 兜底策略
+        log_.info('====== bottom strategy')
+        start_bottom = time.time()
+        rank_result = bottom_strategy(size=size, app_type=app_type, ab_code=ab_code)
+        end_bottom = time.time()
+        log_.info('mid: {}, uid: {}, bottom strategy result: {}, execute time = {}ms'.format(
+            mid, uid, rank_result, (end_bottom - start_bottom) * 1000))
 
-    # 1. 从ROV召回池中获取 size 个视频, 过滤
-
-    # 2. 从流量池中获取 size-K 个视频,过滤,剩余可分发数 > 0
-
-    # 3. 排序,topK 召回池视频,size-K 按概率 P 从流量池中获取视频
-    pass
-
-
-
+    # ####### redis数据刷新
+    log_.info('====== update redis')
+    # 预曝光数据同步刷新到Redis, 过期时间为0.5h
+    redis_helper = RedisHelper()
+    preview_key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(app_type, mid)
+    preview_video_ids = [item['videoId'] for item in rank_result]
+    redis_helper.set_data_to_redis(key_name=preview_key_name, value=preview_video_ids, expire_time=0.5*3600)
+    # 将此次获取的ROV召回池末位视频id同步刷新到Redis中,方便下次快速定位到召回位置,过期时间为1天
+    rov_recall_video = [item['videoId'] for item in rank_result if item['pushFrom'] == 'recall_pool']
+    if rov_recall_video:
+        redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[-1])
 
+    return rank_result

+ 1 - 1
utils.py

@@ -43,7 +43,7 @@ def get_videos_remain_view_count(app_type, videos):
         log_.info('获取视频在流量池中的剩余可分发数失败')
         return []
 
-    data = [(item['videoId'], item['flowPool'], item['viewCount']) for item in result['data'] if item['viewCount'] > 0]
+    data = [(item['videoId'], item['flowPool'], item['viewCount']) for item in result['data']]
     return data
 
 

+ 34 - 7
video_rank.py

@@ -1,7 +1,11 @@
 import random
+import numpy
 
 from log import Log
 from config import set_config
+from video_recall import PoolRecall
+from db_helper import RedisHelper
+from utils import FilterVideos
 
 log_ = Log()
 config_ = set_config()
@@ -14,6 +18,8 @@ def video_rank(data, size):
     :param size: 请求数
     :return: rank_result
     """
+    if not data['rov_pool_recall'] and not data['flow_pool_recall']:
+        return None
     # 将各路召回的视频按照score从大到小排序
     # ROV召回池
     rov_recall_rank = sorted(data['rov_pool_recall'], key=lambda k: (k.get('rovScore'), 0), reverse=True)
@@ -21,19 +27,15 @@ def video_rank(data, size):
     flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: (k.get('rovScore'), 0), reverse=True)
     # 对各路召回的视频进行去重
     rov_recall_rank, flow_recall_rank = remove_duplicate(rov_recall=rov_recall_rank, flow_recall=flow_recall_rank)
+
     # 从ROV召回池中获取top k
     if len(rov_recall_rank) > 0:
         rank_result = rov_recall_rank[:config_.K]
         rov_recall_rank = rov_recall_rank[config_.K:]
-    elif len(flow_recall_rank) > 0:
+    else:
         rank_result = flow_recall_rank[:config_.K]
         flow_recall_rank = flow_recall_rank[config_.K:]
-    else:
-        # 兜底策略
-        return None
-    if not rov_recall_rank and not flow_recall_rank:
-        # 兜底策略
-        return None
+
     # 按概率 p 及score排序获取 size - k 个视频
     i = 0
     while i < size - config_.K:
@@ -81,3 +83,28 @@ def remove_duplicate(rov_recall, flow_recall):
         if item[0] in flow_recall_remove:
             flow_recall.remove(item)
     return rov_recall, flow_recall
+
+
+def bottom_strategy(size, app_type, ab_code, mid='', uid=''):
+    """
+    兜底策略: 从ROV召回池中获取top1000,进行状态过滤后的视频
+    :param size: 需要获取的视频数
+    :param app_type: 产品标识 type-int
+    :param ab_code: abCode
+    :param mid:
+    :param uid:
+    :return:
+    """
+    pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code)
+    key_name = pool_recall.get_pool_redis_key(pool_type='rov')
+    redis_helper = RedisHelper()
+    data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=1000)
+    if not data:
+        log_.info('bottom strategy no data!')
+        return []
+    # 状态过滤
+    filter_videos = FilterVideos(app_type=app_type, mid=mid, uid=uid, video_ids=data)
+    filtered_data = filter_videos.filter_video_status(video_ids=data)
+    random_data = numpy.random.choice(filtered_data, size, False)
+    bottom_data = [{'videoId': item, 'pushFrom': 'bottom_strategy', 'abCode': ab_code} for item in random_data]
+    return bottom_data

+ 22 - 5
video_recall.py

@@ -12,16 +12,18 @@ config_ = set_config()
 
 class PoolRecall(object):
     """召回"""
-    def __init__(self, app_type, mid, uid):
+    def __init__(self, app_type, mid, uid, ab_code):
         """
         初始化
         :param app_type: 产品标识 type-int
         :param mid: mid type-string
         :param uid: uid type-string
+        :param ab_code: ab_code type-int
         """
         self.app_type = app_type
         self.mid = mid
         self.uid = uid
+        self.ab_code = ab_code
         self.redis_helper = RedisHelper()
 
     def rov_pool_recall(self, size=10):
@@ -62,7 +64,10 @@ class PoolRecall(object):
             filter_ = FilterVideos(app_type=self.app_type, mid=self.mid, uid=self.uid, video_ids=video_ids)
             filtered_result = filter_.filter_videos()
             if filtered_result:
-                temp_result = [{'videoId': item, 'rovScore': video_score[item]} for item in filtered_result]
+                # 添加视频源参数 pushFrom, abCode
+                temp_result = [{'videoId': item, 'rovScore': video_score[item],
+                                'pushFrom': 'recall_pool', 'abCode': self.ab_code}
+                               for item in filtered_result]
                 rov_pool_recall_result.extend(temp_result)
             else:
                 # 将此次获取的末位视频id同步刷新到Redis中,方便下次快速定位到召回位置,过期时间为1天
@@ -117,8 +122,10 @@ class PoolRecall(object):
                 for item in check_result:
                     if item[0] not in flow_pool_recall_videos:
                         # 取其中一个 flow_pool 作为召回结果
+                        # 添加视频源参数 pushFrom, abCode
                         flow_pool_recall_result.append(
-                            {'videoId': item[0], 'flowPool': item[1], 'rovScore': video_score[item[0]]}
+                            {'videoId': item[0], 'flowPool': item[1], 'rovScore': video_score[item[0]],
+                             'pushFrom': 'flow_pool', 'abCode': self.ab_code}
                         )
                         flow_pool_recall_videos.append(item[0])
                 et_check = time.time()
@@ -139,9 +146,19 @@ class PoolRecall(object):
         for video_id in video_ids:
             for flow_pool in flow_pool_mapping[video_id]:
                 videos.append({'videoId': video_id, 'flowPool': flow_pool})
-        check_result = get_videos_remain_view_count(app_type=self.app_type, videos=videos)
-        if not check_result:
+        view_count_result = get_videos_remain_view_count(app_type=self.app_type, videos=videos)
+        if not view_count_result:
             return None
+        check_result = []
+        for item in view_count_result:
+            if item[2] > 0:
+                # viewCount > 0
+                check_result.append(item)
+            else:
+                # viewCount <= 0
+                # 从流量召回池移除
+                value = '{}-{}'.format(item[0], item[1])
+                self.redis_helper.remove_value_from_zset(key_name=config_.FLOW_POOL_KEY_NAME, value=value)
         return check_result
 
     def get_pool_redis_key(self, pool_type):