liqian 3 年 前
コミット
47f1daa059
3 ファイル変更166 行追加78 行削除
  1. 6 2
      config.py
  2. 35 13
      recommend.py
  3. 125 63
      utils.py

+ 6 - 2
config.py

@@ -12,8 +12,12 @@ class BaseConfig(object):
     RECALL_KEY_NAME_PREFIX = 'com.weiqu.video.recall.hot.item.score.'
     # 流量池redis key
     FLOW_POOL_KEY_NAME = 'com.weiqu.video.flowpool.hot.item.score'
-    # 首页推荐预曝光列表redis key 前缀,完整key格式:PSEUDO_EXPOSURE_KEY_PREFIX.{appType}.{mid}
-    PSEUDO_EXPOSURE_KEY_PREFIX = 'com.weiqu.video.hot.recommend.pseudo.exposure.'
+    # 首页推荐预曝光列表redis key 前缀,完整key格式:com.weiqu.video.hot.recommend.previewed.{appType}.{mid}
+    PREVIEW_KEY_PREFIX = 'com.weiqu.video.hot.recommend.previewed.'
+    # 用户上一次在rov召回池对应的位置 redis key前缀,完整key格式:com.weiqu.video.rov.pool.last.{mid}.{date}
+    LAST_VIDEO_FROM_ROV_POOL_PREFIX = 'com.weiqu.video.rov.pool.last.'
+    # 从ROV召回池获取视频的最大频次,限制每次请求的获取次数
+    MAX_FREQ_FROM_ROV_POOL = 3
 
 
 class DevelopmentConfig(BaseConfig):

+ 35 - 13
recommend.py

@@ -1,24 +1,17 @@
+import random
+import time
+import multiprocessing
+
+from datetime import date, timedelta
 from log import Log
 from db_helper import RedisHelper
 from config import set_config
+from utils import FilterVideos, get_videos_remain_view_count
 
 log_ = Log()
 config_ = set_config()
 
 
-def rov_pool_recall(mid, uid, app_type, size):
-    """
-    从ROV召回池中获取视频
-    :param mid: mid type-string
-    :param uid: uid type-string
-    :param app_type: 产品标识 type-int
-    :param size: 获取数量 type-int
-    :return: rov_pool_recall_result
-    """
-
-
-
-
 def video_recommend(mid, uid, size, app_type, algo_type):
     """
     首页线上推荐逻辑
@@ -29,9 +22,38 @@ def video_recommend(mid, uid, size, app_type, algo_type):
     :param algo_type: 算法类型  type-string
     :return:
     """
+    # 多进程召回
+    start_recall = time.time()
+    cores = multiprocessing.cpu_count()
+    pool = multiprocessing.Pool(processes=cores)
+    pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid)
+    pool_list = [
+        # rov召回池
+        pool.apply_async(pool_recall.rov_pool_recall, size),
+        # 流量池
+        pool.apply_async(pool_recall.flow_pool_recall, size)
+    ]
+    recall_result_list = [p.get() for p in pool_list]
+    pool.close()
+    pool.join()
+    end_recall = time.time()
+    log_.info('mid: {}, uid: {}, recall: {}, execute time = {}ms'.format(
+        mid, uid, recall_result_list, (end_recall - start_recall) * 1000))
+
+    # 排序
+    data = {
+        'rov_pool_recall': recall_result_list[0],
+        'flow_pool_recall': recall_result_list[1]
+    }
+
+
     # 1. 从ROV召回池中获取 size 个视频, 过滤
 
     # 2. 从流量池中获取 size-K 个视频,过滤,剩余可分发数 > 0
 
     # 3. 排序,topK 召回池视频,size-K 按概率 P 从流量池中获取视频
+    pass
+
+
+
 

+ 125 - 63
utils.py

@@ -23,73 +23,135 @@ def request_post(request_url, request_data):
         return res_data
 
 
-def filter_by_pseudo_exposure(app_type, mid, video_ids):
+def get_videos_remain_view_count(app_type, videos):
     """
-    伪曝光过滤
+    获取视频在流量池中的剩余可分发数
     :param app_type: 产品标识 type-int
-    :param mid: 设备id type-string
-    :param video_ids: 需过滤的视频列表 type-list
-    :return: filtered_videos  过滤后的列表  type-list
+    :param videos: 视频信息 (视频id, 流量池标记) type-list,[{'videoId': video_id, 'flowPool': flow_pool}, ...]
+    :return: data type-list,[(video_id, flow_pool, view_count), ...]
     """
-    # 根据Redis缓存中的数据过滤
-    redis_helper = RedisHelper()
-    # key拼接
-    key_name = '{}.{}.{}'.format(config_.PSEUDO_EXPOSURE_KEY_PREFIX, app_type, mid)
-    pe_videos_list = redis_helper.get_data_from_set(key_name)
-    if not pe_videos_list:
-        return video_ids
-    pe_videos = [eval(video) for video in pe_videos_list]
-    filtered_videos = [video_id for video_id in video_ids if video_id not in pe_videos]
-    return filtered_videos
-
-
-def filter_video_status(video_ids):
-    """
-    对视频状态进行过滤
-    :param video_ids: 视频id列表 type-list
-    :return: filtered_videos
-    """
-    st = time.time()
-    sql = "SELECT  video_id" \
-          "FROM    {}" \
-          "WHERE   audit_status = 5" \
-          "AND     applet_rec_status IN (1, 6)" \
-          "-- AND     open_status = 1" \
-          "-- AND     payment_status = 0" \
-          "-- AND     encryption_status != 5" \
-          "-- AND     transcoding_status = 3" \
-          "AND     video_id IN {};".format(config_.VIDEO_STATUS, tuple(video_ids))
-    hologres_helper = HologresHelper()
-    data = hologres_helper.get_data(sql=sql)
-    filtered_videos = [temp[0] for temp in data]
-    et = time.time()
-    log_.info('filter video status finished! filtered_videos = {}, execute time = {}ms'.format(
-        filtered_videos, (et - st)*1000))
-
-
-def filter_video_viewed(app_type, mid, uid, video_ids, types=(1,)):
-    """
-    调用后端接口过滤用户已观看视频
-    :param app_type: 产品标识 type-int
-    :param mid: mid type-string
-    :param uid: uid type-string
-    :param video_ids: 视频id列表 type-list
-    :param types: 过滤参数 type-tuple, 默认(1, )  1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态
-    :return: filtered_videos
-    """
-    # 调用http接口
-    request_data = {"appType": app_type,
-                    "mid": mid,
-                    "uid": uid,
-                    "types": list(types),
-                    "videoIds": video_ids}
-    result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data)
+    if not videos:
+        return []
+
+    request_data = {'appType': app_type, 'videos': videos}
+    result = request_post(request_url=config_.GET_REMAIN_VIEW_COUNT_URL, request_data=request_data)
+
+    if result is None:
+        return []
+
     if result['code'] != 0:
-        log_.info('过滤失败,types: {}'.format(types))
-        return video_ids
-    filtered_videos = result['data']
-    return filtered_videos
+        log_.info('获取视频在流量池中的剩余可分发数失败')
+        return []
+
+    data = [(item['videoId'], item['flowPool'], item['viewCount']) for item in result['data'] if item['viewCount'] > 0]
+    return data
+
+
+class FilterVideos(object):
+    """视频过滤"""
+    def __init__(self, app_type, mid, uid, video_ids):
+        """
+        初始化
+        :param app_type: 产品标识 type-int
+        :param mid: mid type-string
+        :param uid: uid type-string
+        :param video_ids: 需过滤的视频列表 type-list
+        """
+        self.app_type = app_type
+        self.mid = mid
+        self.uid = uid
+        self.video_ids = video_ids
+
+    def filter_videos(self):
+        """视频过滤"""
+        # 预曝光过滤
+        st_pre = time.time()
+        filtered_pre_result = self.filter_video_previewed(self.video_ids)
+        et_pre = time.time()
+        log_.info('filter by previewed: app_type = {}, result = {}, execute time = {}ms'.format(
+            self.app_type, filtered_pre_result, (et_pre - st_pre) * 1000))
+        if not filtered_pre_result:
+            return None
+
+        # 视频状态过滤
+        st_status = time.time()
+        filtered_status_result = self.filter_video_status(video_ids=filtered_pre_result)
+        et_status = time.time()
+        log_.info('filter by video status: result = {}, execute time = {}ms'.format(
+            filtered_status_result, (et_status - st_status) * 1000))
+        if not filtered_status_result:
+            return None
+
+        # 视频已曝光过滤
+        st_viewed = time.time()
+        filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_status_result)
+        et_viewed = time.time()
+        log_.info('filter by viewed: app_type = {}, mid = {}, uid = {}, result = {}, execute time = {}ms'.format(
+            self.app_type, self.mid, self.uid, filtered_viewed_result, (et_viewed - st_viewed) * 1000))
+        if not filtered_viewed_result:
+            return None
+        else:
+            return filtered_viewed_result
+
+    def filter_video_previewed(self, video_ids):
+        """
+        预曝光过滤
+        :param video_ids: 需过滤的视频列表 type-list
+        :return: filtered_videos  过滤后的列表  type-list
+        """
+        # 根据Redis缓存中的数据过滤
+        redis_helper = RedisHelper()
+        # key拼接
+        key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(self.app_type, self.mid)
+        pe_videos_list = redis_helper.get_data_from_set(key_name)
+        if not pe_videos_list:
+            return video_ids
+        pe_videos = [eval(video) for video in pe_videos_list]
+        filtered_videos = [video_id for video_id in video_ids if video_id not in pe_videos]
+        return filtered_videos
+
+    def filter_video_status(self, video_ids):
+        """
+        对视频状态进行过滤
+        :param video_ids: 视频id列表 type-list
+        :return: filtered_videos
+        """
+        sql = "SELECT  video_id" \
+              "FROM    {}" \
+              "WHERE   audit_status = 5" \
+              "AND     applet_rec_status IN (1, 6)" \
+              "-- AND     open_status = 1" \
+              "-- AND     payment_status = 0" \
+              "-- AND     encryption_status != 5" \
+              "-- AND     transcoding_status = 3" \
+              "AND     video_id IN {};".format(config_.VIDEO_STATUS, tuple(video_ids))
+        hologres_helper = HologresHelper()
+        data = hologres_helper.get_data(sql=sql)
+        filtered_videos = [temp[0] for temp in data]
+        return filtered_videos
+
+    def filter_video_viewed(self, video_ids, types=(1,)):
+        """
+        调用后端接口过滤用户已观看视频
+        :param video_ids: 视频id列表 type-list
+        :param types: 过滤参数 type-tuple, 默认(1, )  1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态
+        :return: filtered_videos
+        """
+        # 调用http接口
+        request_data = {"appType": self.app_type,
+                        "mid": self.mid,
+                        "uid": self.uid,
+                        "types": list(types),
+                        "videoIds": video_ids}
+        result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data)
+        if result['code'] != 0:
+            log_.info('过滤失败,types: {}'.format(types))
+            return video_ids
+        filtered_videos = result['data']
+        return filtered_videos
 
 
 if __name__ == '__main__':
-    filter_video_status([1, 2, 3, 5978661])
+    filter_ = FilterVideos(app_type=1, mid='22', uid='www', video_ids=[1, 2, 3, 55])
+    filter_.filter_videos()
+    filter_.filter_video_status(video_ids=[1, 3, 5])