Browse Source

add check local distribute count

liqian 3 years ago
parent
commit
992a399321
5 changed files with 97 additions and 11 deletions
  1. 11 2
      config.py
  2. 6 4
      db_helper.py
  3. 47 0
      recommend.py
  4. 21 0
      utils.py
  5. 12 5
      video_recall.py

+ 11 - 2
config.py

@@ -17,16 +17,25 @@ class BaseConfig(object):
     K = 3
     # 从流量池获取视频的概率设置
     P = 0.3
+
     # ROV召回池redis key前缀,完整格式:com.weiqu.video.recall.hot.item.score.{date}
     RECALL_KEY_NAME_PREFIX = 'com.weiqu.video.recall.hot.item.score.'
+
     # appType = 6, ROV召回池redis key前缀,完整格式:com.weiqu.video.recall.hot.apptype.h.item.score.{appType}.{h}
     RECALL_KEY_NAME_PREFIX_APP_TYPE = 'com.weiqu.video.recall.hot.apptype.h.item.score.'
+
     # 流量池redis key前缀,完整格式 com.weiqu.video.flowpool.hot.item.score.{appType}
     FLOW_POOL_KEY_NAME_PREFIX = 'com.weiqu.video.flowpool.hot.item.score.'
+
     # 首页推荐预曝光列表redis key 前缀,完整key格式:com.weiqu.video.hot.recommend.previewed.{appType}.{mid}
     PREVIEW_KEY_PREFIX = 'com.weiqu.video.hot.recommend.previewed.'
+
     # 用户上一次在rov召回池对应的位置 redis key前缀,完整key格式:com.weiqu.video.rov.pool.last.{appType}.{mid}.{date}
     LAST_VIDEO_FROM_ROV_POOL_PREFIX = 'com.weiqu.video.rov.pool.last.'
+
+    # 本地记录视频的可分发数,控制分发,完整key格式:com.weiqu.video.flowpool.local.distribute.count.{h}
+    LOCAL_DISTRIBUTE_COUNT_PREFIX = 'com.weiqu.video.flowpool.local.distribute.count.'
+
     # 从ROV召回池获取视频的最大频次,限制每次请求的获取次数
     MAX_FREQ_FROM_ROV_POOL = 3
 
@@ -90,7 +99,7 @@ class TestConfig(BaseConfig):
 
 
 class PreProductionConfig(BaseConfig):
-    """测试环境配置"""
+    """预发布环境配置"""
     # 线上环境redis地址
     REDIS_INFO = {
         'host': 'r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',
@@ -119,7 +128,7 @@ class PreProductionConfig(BaseConfig):
 
 
 class ProductionConfig(BaseConfig):
-    """测试环境配置"""
+    """生产环境配置"""
     # 线上环境redis地址
     REDIS_INFO = {
         'host': 'r-bp1fogs2mflr1ybfot.redis.rds.aliyuncs.com',

+ 6 - 4
db_helper.py

@@ -83,7 +83,7 @@ class RedisHelper(object):
         conn = self.connect()
         conn.zadd(key_name, data)
         # 设置过期时间
-        conn.expire(key_name, expire_time)
+        conn.expire(key_name, int(expire_time))
 
     def get_data_zset_with_index(self, key_name, start, end, desc=True, with_scores=False):
         """
@@ -112,6 +112,8 @@ class RedisHelper(object):
         :return: score value对应的score
         """
         conn = self.connect()
+        if not conn.exists(key_name):
+            return None
         return conn.zscore(key_name, value)
 
     def update_score_with_value(self, key_name, value, score, expire_time=7*24*3600):
@@ -220,6 +222,6 @@ class HologresHelper(object):
 
 if __name__ == '__main__':
     redis_helper = RedisHelper()
-    key = 'com.weiqu.video.hot.recommend.item.score.20210901'
-    res = redis_helper.get_score_with_value(key, 90797)
-    print(res)
+    key = 'com.weiqu.video.flowpool.local.distribute.count.11.5'
+    redis_helper.add_data_with_zset(key, {'12345-133#442#2': 8.0})
+

+ 47 - 0
recommend.py

@@ -1,6 +1,8 @@
 import time
 import multiprocessing
+import traceback
 
+from datetime import datetime
 from log import Log
 from config import set_config
 from video_recall import PoolRecall
@@ -72,10 +74,55 @@ def video_recommend(mid, uid, size, app_type, algo_type):
     if preview_video_ids:
         redis_helper.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids), expire_time=30*60)
         log_.info('preview redis update success!')
+
     # 将此次获取的ROV召回池末位视频id同步刷新到Redis中,方便下次快速定位到召回位置,过期时间为1天
     rov_recall_video = [item['videoId'] for item in rank_result if item['pushFrom'] == 'recall_pool']
     if rov_recall_video:
         redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[-1])
         log_.info('last video redis update success!')
 
+    # 将此次分发的流量池视频,对 本地分发数-1 进行记录
+    flow_recall_video = [item for item in rank_result if item['pushFrom'] == 'flow_pool']
+    if flow_recall_video:
+        update_local_distribute_count(flow_recall_video)
+        log_.info('update local distribute count success!')
+
     return rank_result
+
+
+def update_local_distribute_count(videos):
+    """
+    更新本地分发数
+    :param videos: 视频列表 type-list [{'videoId':'', 'flowPool':'', 'distributeCount': '',
+                                    'rovScore': '', 'pushFrom': 'flow_pool', 'abCode': self.ab_code}, ....]
+    :return:
+    """
+    try:
+        redis_h = datetime.now().hour
+        if datetime.now().minute >= 30:
+            redis_h += 0.5
+        key_name = config_.LOCAL_DISTRIBUTE_COUNT_PREFIX + str(redis_h)
+        print(key_name)
+        redis_helper = RedisHelper()
+        update_data = {}
+        for item in videos:
+            video = '{}-{}'.format(item['videoId'], item['flowPool'])
+            current_count = redis_helper.get_score_with_value(key_name=key_name, value=video)
+            if current_count is not None:
+                # 该视频本地有记录,本地记录的分发数 - 1
+                new_count = current_count - 1
+            else:
+                # 该视频本地无记录,接口获取的分发数 - 1
+                new_count = int(item['distributeCount']) - 1
+            update_data[video] = new_count
+        log_.info('now update video local distribute count: {}, key: {}'.format(update_data, key_name))
+        # 更新redis中的数据
+        redis_helper.add_data_with_zset(key_name=key_name, data=update_data, expire_time=0.5*3600)
+
+    except Exception as e:
+        log_.error(traceback.format_exc())
+
+
+if __name__ == '__main__':
+    videos = [{'videoId': '12345', 'flowPool': '133#442#2', 'distributeCount': 10}]
+    update_local_distribute_count(videos)

+ 21 - 0
utils.py

@@ -2,6 +2,7 @@ import requests
 import json
 import time
 
+from datetime import datetime
 from db_helper import HologresHelper, RedisHelper
 from config import set_config
 from log import Log
@@ -54,6 +55,26 @@ def get_videos_remain_view_count(app_type, videos):
     return data
 
 
+def get_videos_local_distribute_count(video_id, flow_pool):
+    """
+    获取流量池视频本地分发数
+    :param video_id: video_id
+    :param flow_pool: 流量池标记
+    :return: current_count 本地记录的分发数
+    """
+    redis_h = datetime.now().hour
+    if datetime.now().minute >= 30:
+        redis_h += 0.5
+    key_name = config_.LOCAL_DISTRIBUTE_COUNT_PREFIX + str(redis_h)
+    redis_helper = RedisHelper()
+    video = '{}-{}'.format(video_id, flow_pool)
+    current_count = redis_helper.get_score_with_value(key_name=key_name, value=video)
+    if current_count is not None:
+        return current_count
+    else:
+        return None
+
+
 class FilterVideos(object):
     """视频过滤"""
     def __init__(self, app_type, video_ids, mid='', uid=''):

+ 12 - 5
video_recall.py

@@ -4,7 +4,7 @@ from datetime import date, timedelta, datetime
 from log import Log
 from db_helper import RedisHelper
 from config import set_config
-from utils import FilterVideos, get_videos_remain_view_count
+from utils import FilterVideos, get_videos_remain_view_count, get_videos_local_distribute_count
 
 log_ = Log()
 config_ = set_config()
@@ -125,8 +125,8 @@ class PoolRecall(object):
                         # 取其中一个 flow_pool 作为召回结果
                         # 添加视频源参数 pushFrom, abCode
                         flow_pool_recall_result.append(
-                            {'videoId': item[0], 'flowPool': item[1], 'rovScore': video_score[item[0]],
-                             'pushFrom': 'flow_pool', 'abCode': self.ab_code}
+                            {'videoId': item[0], 'flowPool': item[1], 'distributeCount': item[2],
+                             'rovScore': video_score[item[0]], 'pushFrom': 'flow_pool', 'abCode': self.ab_code}
                         )
                         flow_pool_recall_videos.append(item[0])
                 et_check = time.time()
@@ -155,8 +155,15 @@ class PoolRecall(object):
         check_result = []
         for item in view_count_result:
             if item[2] > 0:
-                # viewCount > 0
-                check_result.append(item)
+                # viewCount > 0,判断本地分发数
+                cur_count = get_videos_local_distribute_count(video_id=item[0], flow_pool=item[1])
+                # 无记录 或 cur_count > 0
+                if cur_count is None or cur_count > 0:
+                    check_result.append(item)
+                # cur_count <= 0,从流量召回池移除
+                else:
+                    value = '{}-{}'.format(item[0], item[1])
+                    self.redis_helper.remove_value_from_zset(key_name=flow_pool_key, value=value)
             else:
                 # viewCount <= 0
                 # 从流量召回池移除