Browse Source

Merge branch 'sim_recall_new' into test

liqian 2 years ago
parent
commit
967b9b3a5e
6 changed files with 842 additions and 24 deletions
  1. 27 1
      config.py
  2. 34 11
      db_helper.py
  3. 272 6
      recommend.py
  4. 215 5
      utils.py
  5. 157 0
      video_rank.py
  6. 137 1
      video_recall.py

+ 27 - 1
config.py

@@ -17,6 +17,13 @@ class BaseConfig(object):
         'PIAO_QUAN_VIDEO_PLUS': 21,  # 票圈视频+
         'JOURNEY': 22,  # 票圈足迹
         'BLESSING_YEAR': 3,  # 票圈福年
+        'H5': 12,  # H5
+    }
+    # 白名单(影视,宗教)过滤Redis
+    REDIS_INFO_FILTER = {
+        'host': 'r-bp1258kbkv8dj81dwj.redis.rds.aliyuncs.com',
+        'port': 6379,
+        'password': 'Wqsd@2019',
     }
 
     REGION_CODE = {
@@ -136,6 +143,9 @@ class BaseConfig(object):
             'abtest_228': 60044,
             'abtest_229': 60045,
             'abtest_262': 60046,
+            'abtest_316': 60047,
+            'abtest_319': 60048,
+            'abtest_320': 60049
         },  # 地域分组小时级规则实验
 
         'rank_by_24h': {
@@ -312,6 +322,18 @@ class BaseConfig(object):
             'ab_code': AB_CODE['region_rank_by_h'].get('abtest_262'),
             'shield_config': SHIELD_CONFIG2,
         },  # vlog:data-vlog, rule-rule4-2
+        '316': {
+            'data_key': 'data10', 'rule_key': 'rule19',
+            'ab_code': AB_CODE['region_rank_by_h'].get('abtest_316')
+        },
+        '319': {
+            'data_key': 'data10', 'rule_key': 'rule19',
+            'ab_code': AB_CODE['region_rank_by_h'].get('abtest_319')
+        },
+        '320': {
+            'data_key': 'data10', 'rule_key': 'rule19',
+            'ab_code': AB_CODE['region_rank_by_h'].get('abtest_320')
+        }
     }
 
     # APP ab实验配置
@@ -386,6 +408,10 @@ class BaseConfig(object):
         'talk_videos': 'talk_videos',  # 影视解说
         'special_mid': 'special_mid_videos',  # 特殊mid指定视频
         'rov_recall_30day': 'recall_pool_30day',  # 天级更新相对30天列表
+        'sim_hot_vid_recall': 'sim_hot_vid_recall',  # 相似视频召回
+        'fast_flow_recall': 'fast_flow_recall', #快速流量池召回
+        'normal_flow_recall': 'normal_flow_recall',  # 普通流量池召回
+
     }
 
     # category id mapping
@@ -1260,7 +1286,7 @@ class ProductionConfig(BaseConfig):
 def set_config():
     # 获取环境变量 ROV_SERVER_ENV
     env = os.environ.get('ROV_SERVER_ENV')
-    # env = 'dev'
+    env = 'pro'
     if env is None:
         # log_.error('ENV ERROR: is None!')
         return

+ 34 - 11
db_helper.py

@@ -11,15 +11,16 @@ config_ = set_config()
 log_ = Log()
 
 conn_redis = None
+conn_filter_redis = None
 
 
 class RedisHelper(object):
-    def __init__(self, params=None):
+    def __init__(self, params=None, redis_info=config_.REDIS_INFO):
         """
         初始化redis连接信息
         redis_info: redis连接信息, 格式:dict, {'host': '', 'port': '', 'password': ''}
         """
-        redis_info = config_.REDIS_INFO
+        self.redis_info = redis_info
         self.host = redis_info['host']
         self.port = redis_info['port']
         self.password = redis_info['password']
@@ -30,15 +31,26 @@ class RedisHelper(object):
         连接redis
         :return: conn
         """
-        global conn_redis
-        if conn_redis is None:
-            pool = redis.ConnectionPool(host=self.host,
-                                        port=self.port,
-                                        password=self.password,
-                                        decode_responses=True)
-            conn = redis.Redis(connection_pool=pool)
-            conn_redis = conn
-        return conn_redis
+        if self.redis_info == config_.REDIS_INFO_FILTER:
+            global conn_filter_redis
+            if conn_filter_redis is None:
+                pool = redis.ConnectionPool(host=self.host,
+                                            port=self.port,
+                                            password=self.password,
+                                            decode_responses=True)
+                conn = redis.Redis(connection_pool=pool)
+                conn_filter_redis = conn
+            return conn_filter_redis
+        else:
+            global conn_redis
+            if conn_redis is None:
+                pool = redis.ConnectionPool(host=self.host,
+                                            port=self.port,
+                                            password=self.password,
+                                            decode_responses=True)
+                conn = redis.Redis(connection_pool=pool)
+                conn_redis = conn
+            return conn_redis
 
     def key_exists(self, key_name):
         """
@@ -418,6 +430,17 @@ class RedisHelper(object):
         #         'executeTime': (time.time() - start_time) * 1000
         #     })
 
+    def get_batch_key(self, name_list):
+        conn = self.connect()
+        res = conn.mget(name_list)
+        return res
+
+    def mget(self, keys):
+        st_time = time.time()
+        conn = self.connect()
+        data = conn.mget(keys=keys)
+        #print(f"mget time: {(time.time() - st_time) * 1000}")
+        return data
 
 #hologres_info = config_.HOLOGRES_INFO
 #conn = psycopg2.connect(**hologres_info)

+ 272 - 6
recommend.py

@@ -10,7 +10,7 @@ import config
 from log import Log
 from config import set_config
 from video_recall import PoolRecall
-from video_rank import video_rank, bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2
+from video_rank import video_new_rank,video_rank,refactor_video_rank, bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2
 from db_helper import RedisHelper
 import gevent
 from utils import FilterVideos, get_user_has30day_return
@@ -180,7 +180,7 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     pool_recall = PoolRecall(request_id=request_id,
                              app_type=app_type, mid=mid, uid=uid, ab_code=ab_code,
                              client_info=client_info, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
-                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config)
+                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config, video_id= video_id)
     # _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
     # # 小时级实验
     # if ab_code in [code for _, code in config_.AB_CODE['rank_by_h'].items()]:
@@ -281,6 +281,9 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
                 'rov_pool_recall': recall_result_list[0],
                 'flow_pool_recall': recall_result_list[2]
             }
+    #if ab_code=="ab_new_test":
+    #    rank_result = video_new_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P))
+    #else:
     rank_result = video_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P))
 
     # 老视频实验
@@ -298,6 +301,7 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     #     'rank_result': rank_result,
     #     'executeTime': (time.time() - start_rank) * 1000
     # })
+
     result['rankResult'] = rank_result
     result['rankTime'] = (time.time() - start_rank) * 1000
 
@@ -335,6 +339,179 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     return result
     # return rank_result, last_rov_recall_key
 
+def new_video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info,
+                    expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', data_key='',
+                    no_op_flag=False, old_video_index=-1, video_id=None, params=None, rule_key_30day=None,
+                    shield_config=None):
+    """
+    Recommendation flow: concurrent multi-route recall, de-duplication by videoId,
+    filtering, then ranking.
+    :param request_id: request_id
+    :param mid: mid type-string
+    :param uid: uid type-string
+    :param size: number of videos requested type-int
+    :param top_K: guarantees the top K videos come from the recall pool type-int
+    :param flow_pool_P: probability that the size-top_K videos are flow-pool videos type-float;
+                        replaced by the quick-flow-pool rate read from Redis when that value is truthy
+    :param app_type: product identifier type-int
+    :param algo_type: algorithm type type-string (not read in this function body)
+    :param client_info: user location info; 'provinceCode' and 'cityCode' are read from it
+    :param expire_time: Redis expiry for the last-video record (not read in this function body)
+    :param ab_code: AB experiment code; selects the extra recall route and the ranking function
+    :param rule_key, data_key, no_op_flag, params, rule_key_30day: forwarded to PoolRecall
+    :param old_video_index: not read in this function body
+    :param video_id: head video id for relevant recommendation, forwarded to PoolRecall
+    :param shield_config: forwarded to PoolRecall and to FilterVideos.filter_videos_new
+    :return: dict with 'rankResult' and 'recallTime', plus 'rankTime' when ranking ran
+    """
+    # 1. recall
+    result = {}
+    result['rankResult'] =  []
+    # Concurrent recall via gevent; start the recall timer
+    start_recall = time.time()
+
+    # Derive region_code from the city or province code
+    city_code_list = [code for _, code in config_.CITY_CODE.items()]
+    # Read provinceCode, defaulting to '-1'
+    province_code = client_info.get('provinceCode', '-1')
+    # Read cityCode, defaulting to '-1'
+    city_code = client_info.get('cityCode', '-1')
+
+    if city_code in city_code_list:
+        # The city is listed in CITY_CODE: use the city code as region_code
+        region_code = city_code
+    else:
+        region_code = province_code
+    if region_code == '':
+        region_code = '-1'
+    
+    # An empty region_code has been normalised to '-1' at this point
+
+    # Every recall route below is spawned from this single PoolRecall instance
+    pool_recall = PoolRecall(request_id=request_id,
+                             app_type=app_type, mid=mid, uid=uid, ab_code=ab_code,
+                             client_info=client_info, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
+                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config, video_id= video_id)
+    if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
+        t = [gevent.spawn(pool_recall.get_region_hour_recall, size, region_code),
+             gevent.spawn(pool_recall.get_region_day_recall, size, region_code),
+             gevent.spawn(pool_recall.get_selected_recall, size, region_code),
+             gevent.spawn(pool_recall.get_no_selected_recall, size, region_code)
+             ]
+    else:
+        t = [
+             gevent.spawn(pool_recall.get_region_hour_recall, size, region_code),
+             gevent.spawn(pool_recall.get_region_day_recall, size, region_code),
+             gevent.spawn(pool_recall.get_selected_recall, size, region_code),
+             gevent.spawn(pool_recall.get_no_selected_recall, size, region_code),
+             gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID),
+             gevent.spawn(pool_recall.flow_pool_recall, size)]
+
+        if ab_code ==60049:  # the similar-hot-video route is only added inside this else branch
+            t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall))
+    gevent.joinall(t)
+    # Collect the return value of every recall greenlet
+    all_recall_result_list = [i.get() for i in t]
+    all_recall_result = []
+    # Record the elapsed recall time in milliseconds
+    result['recallTime'] = (time.time() - start_recall) * 1000
+
+    if not all_recall_result_list or len(all_recall_result_list)==0:
+        return result  # early exit: only 'rankResult' (empty) and 'recallTime' are set
+    for recall_item in all_recall_result_list:
+        if not recall_item or len(recall_item)==0:
+            continue
+        for per_item in recall_item:
+            all_recall_result.append(per_item)
+
+    # all_recall_result is now the flattened list of items from every non-empty route
+    # 2. de-duplicate by videoId while remembering every route that returned the video
+    recall_dict = {}
+    fast_flow_set = set('')
+    flow_flow_set = set('')
+    region_h_recall = []
+    region_day_recall = []
+    select_day_recall = []
+    no_selected_recall  = []
+    sim_hot_recall = []  # filled below but not read again in this function
+    flow_recall = []
+    flowFlag_dict = {}
+    for per_item in all_recall_result:
+        # Items whose videoId is missing or 0 are skipped
+        vId = int(per_item.get("videoId",0))
+        if vId==0:
+            continue
+        recall_name = per_item.get("pushFrom",'')
+        flow_pool = per_item.get("flowPool", '')
+        if flow_pool != '':
+            flow_pool_id = int(flow_pool.split('#')[0])  # the text before the first '#' is read as the pool id
+            if flow_pool_id == config_.QUICK_FLOW_POOL_ID:
+                fast_flow_set.add(vId)
+            else:
+                flow_flow_set.add(vId)
+            flowFlag_dict[vId] = flow_pool
+
+        # First occurrence only: bucket the item by the route named in pushFrom
+        if vId not in recall_dict:
+            if recall_name == config_.PUSH_FROM['rov_recall_region_h']:
+                region_h_recall.append(per_item)
+            elif recall_name == config_.PUSH_FROM['rov_recall_region_24h']:
+                region_day_recall.append(per_item)
+            elif recall_name == config_.PUSH_FROM['rov_recall_24h']:
+                select_day_recall.append(per_item)
+            elif recall_name == config_.PUSH_FROM['rov_recall_24h_dup']:
+                no_selected_recall.append(per_item)
+            elif recall_name == config_.PUSH_FROM['sim_hot_vid_recall']:
+                sim_hot_recall.append(per_item)
+            elif recall_name == config_.PUSH_FROM['flow_recall']:
+                flow_recall.append(per_item)
+        if vId not in recall_dict:
+            recall_dict[vId] = recall_name
+        else:
+            recall_name = recall_dict[vId] + "," + recall_name
+            recall_dict[vId] = recall_name
+    # recall_dict maps videoId -> comma-joined pushFrom names of every route that returned it
+    # 3. filter videos, starting with the pre-exposure filter
+    filter_ = FilterVideos(request_id=request_id,
+                           app_type=app_type, mid=mid, uid=uid, video_ids=list(recall_dict.keys()))
+
+    # The candidate ids handed to the filter are the de-duplicated keys of recall_dict
+    # a) exposure filter
+    # (an unfiltered variant, list(recall_dict.keys()), was used here during development)
+    all_recall_list = filter_.filter_videos_new(pool_type='rov', region_code=region_code, shield_config=shield_config)
+    # NOTE(review): all_recall_list looks like it can be None when nothing survives filtering -- confirm
+    # 4. rank; the older ranking emits flow-pool videos by probability
+    start_rank = time.time()
+    # A quick-flow-pool rate stored in Redis, when truthy, replaces flow_pool_P
+    redis_helper = RedisHelper()
+    quick_flow_pool_P = redis_helper.get_data_from_redis(
+        key_name=f"{config_.QUICK_FLOWPOOL_DISTRIBUTE_RATE_KEY_NAME_PREFIX}{config_.QUICK_FLOW_POOL_ID}"
+    )
+    if quick_flow_pool_P:
+        flow_pool_P = quick_flow_pool_P
+
+    rank_result= []
+    if ab_code==60048 or ab_code==60049:
+        rank_ids, add_flow_set = video_new_rank(videoIds=all_recall_list,fast_flow_set=fast_flow_set, flow_set=flow_flow_set,size=size, top_K=top_K, flow_pool_P=float(flow_pool_P))
+        # Each rank item is indexed as [0] = video id, [1] = score
+        for rank_item in rank_ids:
+            rank_id = rank_item[0]
+            rank_score = rank_item[1]
+            pushFrom = recall_dict.get(rank_id, '')
+            # flowPool stays '' unless the id is in add_flow_set
+            flowPoolFlag = ''
+            if rank_id in add_flow_set:
+                flowPoolFlag = flowFlag_dict.get(rank_id,'') 
+            rank_result.append({'videoId': rank_id, 'flowPool': flowPoolFlag,
+                     'rovScore': rank_score, 'pushFrom': pushFrom,
+                     'abCode': ab_code})
+        # NOTE(review): the else branch below ranks the de-duplicated recall items directly;
+        # all_recall_list (the filtered ids) is not passed to refactor_video_rank -- confirm this is intended
+    else:
+        all_dup_recall_result = region_h_recall+region_day_recall+select_day_recall+no_selected_recall+flow_recall
+        rank_result = refactor_video_rank(rov_recall_rank=all_dup_recall_result,fast_flow_set=fast_flow_set, flow_set=flow_flow_set, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P))
+
+    result['rankResult'] = rank_result
+    result['rankTime'] = (time.time() - start_rank) * 1000
+    return result
+    # return rank_result, last_rov_recall_key
+
 
 def ab_test_op(rank_result, ab_code_list, app_type, mid, uid, **kwargs):
     """
@@ -498,6 +675,66 @@ def update_redis_data(result, app_type, mid, top_K, expire_time=24*3600):
         log_.error(traceback.format_exc())
 
 
+def update_flow_redis_data(result, app_type, mid, top_K, expire_time=24*3600):
+    """
+    根据最终的排序结果更新相关redis数据
+    :param result: 排序结果
+    :param app_type: 产品标识
+    :param mid: mid
+    :param top_K: 保证topK为召回池视频 type-int
+    :param expire_time: 末位视频记录redis过期时间
+    :return: None
+    """
+    # ####### redis数据刷新
+    try:
+        redis_helper = RedisHelper()
+        # log_.info('====== update redis')
+        if mid and mid != 'null':
+            # mid为空时,不做预曝光和定位数据更新
+            # 预曝光数据同步刷新到Redis, 过期时间为0.5h
+            preview_key_name = f"{config_.PREVIEW_KEY_PREFIX}{app_type}:{mid}"
+            preview_video_ids = [int(item['videoId']) for item in result]
+            if preview_video_ids:
+                # log_.error('key_name = {} \n values = {}'.format(preview_key_name, tuple(preview_video_ids)))
+                redis_helper.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids), expire_time=30 * 60)
+                # log_.info('preview redis update success!')
+
+        # 将此次分发的流量池视频,对 本地分发数-1 进行记录
+        if app_type not in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
+            # 获取本地分发数-1策略开关
+            switch = redis_helper.get_data_from_redis(key_name=config_.IN_FLOW_POOL_COUNT_SWITCH_KEY_NAME)
+            if switch is not None:
+                if int(switch) == 1:
+                    flow_recall_video = [item for item in result if item.get('flowPool', None) is not None]
+                else:
+                    flow_recall_video = [item for item in result if
+                                         item['pushFrom'] == config_.PUSH_FROM['flow_recall']]
+            else:
+                flow_recall_video = [item for item in result if item['pushFrom'] == config_.PUSH_FROM['flow_recall']]
+            if flow_recall_video:
+                update_local_distribute_count(flow_recall_video)
+                # log_.info('update local distribute count success!')
+
+        # 限流视频分发数记录
+        if app_type == config_.APP_TYPE['APP']:
+            # APP 不计入
+            return
+        limit_video_id_list = redis_helper.get_data_from_set(
+            key_name=f"{config_.KEY_NAME_PREFIX_LIMIT_VIDEO_SET}{datetime.today().strftime('%Y%m%d')}"
+        )
+        if limit_video_id_list is not None:
+            limit_video_id_list = [int(item) for item in limit_video_id_list]
+            for item in result:
+                video_id = item['videoId']
+                if video_id in limit_video_id_list:
+                    key_name = f"{config_.KEY_NAME_PREFIX_LIMIT_VIDEO_DISTRIBUTE_COUNT}{video_id}"
+                    redis_helper.setnx_key(key_name=key_name, value=0, expire_time=24*2600)
+                    redis_helper.incr_key(key_name=key_name, amount=1, expire_time=24*3600)
+
+    except Exception as e:
+        log_.error("update redis data fail!")
+        log_.error(traceback.format_exc())
+
 def update_local_distribute_count(videos):
     """
     更新本地分发数
@@ -1087,7 +1324,18 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
 
     # 简单召回 - 排序 - 兜底
     get_result_st = time.time()
-    result = video_recommend(request_id=request_id,
+    print("ab_code:", ab_code)
+    if ab_code == 60047 or ab_code == 60048 or ab_code == 60049:
+        result = new_video_recommend(request_id=request_id,
+                             mid=mid, uid=uid, app_type=app_type,
+                             size=size, top_K=top_K, flow_pool_P=flow_pool_P,
+                             algo_type=algo_type, client_info=client_info,
+                             ab_code=ab_code, expire_time=expire_time,
+                             rule_key=rule_key, data_key=data_key,
+                             no_op_flag=no_op_flag, old_video_index=old_video_index,
+                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config)
+    else:
+        result = video_recommend(request_id=request_id,
                              mid=mid, uid=uid, app_type=app_type,
                              size=size, top_K=top_K, flow_pool_P=flow_pool_P,
                              algo_type=algo_type, client_info=client_info,
@@ -1115,7 +1363,10 @@ def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
     #                     app_type=app_type, mid=mid, uid=uid)
     # redis数据刷新
     update_redis_st = time.time()
-    update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+    if ab_code == 60047 or  ab_code == 60048 or  ab_code == 60049:
+        update_flow_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+    else:
+        update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
     #     'request_id': request_id,
@@ -1176,7 +1427,19 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
 
     # 简单召回 - 排序 - 兜底
     get_result_st = time.time()
-    result = video_recommend(request_id=request_id,
+    #print("ab_code:", ab_code)
+    if ab_code == 60047 or ab_code == 60048 or ab_code == 60049:
+        result = new_video_recommend(request_id=request_id,
+                                 mid=mid, uid=uid, app_type=app_type,
+                                 size=size, top_K=top_K, flow_pool_P=flow_pool_P,
+                                 algo_type='', client_info=client_info,
+                                 ab_code=ab_code, expire_time=expire_time,
+                                 rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
+                                 old_video_index=old_video_index, video_id=video_id,
+                                 params=params, rule_key_30day=rule_key_30day, shield_config=shield_config)
+        # log_.info({
+    else:
+        result = video_recommend(request_id=request_id,
                              mid=mid, uid=uid, app_type=app_type,
                              size=size, top_K=top_K, flow_pool_P=flow_pool_P,
                              algo_type='', client_info=client_info,
@@ -1205,7 +1468,10 @@ def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_
 
     # redis数据刷新
     update_redis_st = time.time()
-    update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+    if ab_code == 60047 or ab_code == 60048 or  ab_code == 60049:
+        update_flow_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
+    else:
+        update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K)
     # log_.info({
     #     'logTimestamp': int(time.time() * 1000),
     #     'request_id': request_id,

+ 215 - 5
utils.py

@@ -3,6 +3,9 @@ import traceback
 import requests
 import json
 import time
+import gevent
+import pandas as pd
+import random
 
 from datetime import datetime
 # from db_helper import HologresHelper, RedisHelper, MysqlHelper
@@ -370,6 +373,7 @@ class FilterVideos(object):
         :param video_ids: 需过滤的视频列表 type-list
         :return: filtered_videos  过滤后的列表  type-list
         """
+        pre_time = time.time()
         if not self.mid or self.mid == 'null':
             # mid为空时,不做预曝光过滤
             return video_ids
@@ -377,7 +381,9 @@ class FilterVideos(object):
         redis_helper = RedisHelper()
         # key拼接
         key_name = f"{config_.PREVIEW_KEY_PREFIX}{self.app_type}:{self.mid}"
+        #print("key_name:", key_name)
         pe_videos_list = redis_helper.get_data_from_set(key_name)
+        #print("pe_videos_list:", pe_videos_list)
         # log_.info('****app_type = {}, mid = {}, uid = {}, pe_videos_list = {}'.format(
         #     self.app_type, self.mid, self.uid, pe_videos_list))
         # log_.info('****app_type = {}, mid = {}, uid = {}, video_ids = {}'.format(
@@ -385,7 +391,9 @@ class FilterVideos(object):
         if not pe_videos_list:
             return video_ids
         pe_videos = [int(video) for video in pe_videos_list]
+        #print("pe_videos:", len(pe_videos))
         filtered_videos = [video_id for video_id in video_ids if video_id not in pe_videos]
+        #print(f"pre res: {filtered_videos}\nexecute_time: {(time.time() - pre_time) * 1000}")
         return filtered_videos
 
     # def filter_video_status(self, video_ids):
@@ -439,9 +447,49 @@ class FilterVideos(object):
                         "uid": self.uid,
                         "types": list(types),
                         "videoIds": video_ids}
+        print(request_data)
         # 调用http接口
         result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
 
+        print("result:", result)
+        if result is None:
+            print("result is None")
+            # log_.info('过滤失败,types: {}'.format(types))
+            return []
+
+        if result['code'] != 0:
+            # log_.info('过滤失败,types: {}'.format(types))
+            return []
+
+        filtered_videos = result['data']
+        return filtered_videos
+
+    def filter_video_viewed_new(self, video_ids):
+        """
+        调用后端接口过滤用户已观看视频
+        :param video_ids: 视频id列表 type-list
+        :param types: 过滤参数 type-tuple, 默认(1, )
+        1-已观看 2-视频状态 3-是否进入老年人社区 4-话题状态 5-推荐状态 6-白名单过滤 7-涉政视频过滤
+        :return: filtered_videos
+        """
+        # 获取对应端的过滤参数types
+        st_time = time.time()
+        types = config_.FILTER_VIEWED_TYPES_CONFIG.get(self.app_type, None)
+        #print(types)
+        if types is None:
+            types = config_.FILTER_VIEWED_TYPES_CONFIG.get('other')
+        if 6 in types:
+            types = list(types)
+            types.remove(6)
+        #print(types)
+        request_data = {"appType": self.app_type,
+                        "mid": self.mid,
+                        "uid": self.uid,
+                        "types": list(types),
+                        "videoIds": video_ids}
+        # 调用http接口
+        result = request_post(request_url=config_.VIDEO_FILTER_URL, request_data=request_data, timeout=(0.1, 1))
+        #print(f"view res: {result}\nexecute_time: {(time.time() - st_time) * 1000}")
         if result is None:
             # log_.info('过滤失败,types: {}'.format(types))
             return []
@@ -460,6 +508,7 @@ class FilterVideos(object):
         :param shield_key_name_list: 过滤视频 redis-key
         :return: filtered_videos  过滤后的列表  type-list
         """
+        print("filter_shield_video:", len(filter_shield_video))
         if len(video_ids) == 0:
             return video_ids
         # 根据Redis缓存中的数据过滤
@@ -474,12 +523,171 @@ class FilterVideos(object):
             #     continue
             # shield_videos = [int(video) for video in shield_videos_list]
             # video_ids = [int(video_id) for video_id in video_ids if int(video_id) not in shield_videos]
+        print("video_ids:", len(video_ids))
         return video_ids
 
+    def new_filter_video(self):
+        """视频过滤"""
+        # 1. 预曝光过滤
+        st_pre = time.time()
+        #print("new_filter video_ids:", self.video_ids)
+        filtered_pre_result = self.filter_video_previewed(self.video_ids)
+        if not filtered_pre_result:
+            return None
+        # log_.info({
+        #      'logTimestamp': int(time.time() * 1000),
+        #      'request_id': self.request_id,
+        #      'app_type': self.app_type,
+        #      'mid': self.mid,
+        #      'uid': self.uid,
+        #      'operation': 'preview_filter',
+        #      'request_videos': self.video_ids,
+        #      'preview_filter_result': filtered_pre_result,
+        #      'executeTime': (time.time() - st_pre) * 1000
+        #  })
+        #2. 视频已曝光过滤
+        st_viewed = time.time()
+        #print("---filtered viewed---")
+        #print("filtered_pre_result:",filtered_pre_result)
+        filtered_viewed_result = self.filter_video_viewed(video_ids=filtered_pre_result)
+        if not filtered_viewed_result:
+            return None
+        return filtered_viewed_result
+
+    def new_flow_video(self, vid_list, flow_vids_set, region_code, shield_config):
+        flow_video_list = []
+        normal_video_list = []
+        for v_id in vid_list:
+            if v_id in flow_vids_set:
+                flow_video_list.append(v_id)
+            else:
+                normal_video_list.append(v_id)
+        shield_key_name_list = shield_config.get(region_code, None)
+        if shield_key_name_list is not None:
+            filtered_shield_video_ids = self.filter_shield_video(
+                video_ids=flow_video_list, shield_key_name_list=shield_key_name_list
+            )
+            return normal_video_list, filtered_shield_video_ids
+        else:
+            return normal_video_list, flow_video_list
+
+    def filter_movie_religion_video(self, video_ids):
+        """过滤白名单视频(影视,宗教)"""
+        # 影视 + 宗教: rov.filter.movie.{videoId}
+        # 宗教: rov.filter.religion.{videoId}
+        st_time = time.time()
+        if self.app_type not in [config_.APP_TYPE['WAN_NENG_VIDEO'],
+                                 config_.APP_TYPE['LAO_HAO_KAN_VIDEO'],
+                                 config_.APP_TYPE['ZUI_JING_QI'],
+                                 config_.APP_TYPE['H5']]:
+            # 过滤 影视 + 宗教
+            keys = [f"rov.filter.movie.{video_id}" for video_id in video_ids]
+        elif self.app_type in [config_.APP_TYPE['WAN_NENG_VIDEO'],
+                               config_.APP_TYPE['ZUI_JING_QI'],
+                               config_.APP_TYPE['H5']]:
+            # 过滤 影视 + 宗教
+            keys = [f"rov.filter.religion.{video_id}" for video_id in video_ids]
+        else:
+            #print(f"m_r res: {video_ids}\nexecute_time: {(time.time() - st_time) * 1000}")
+            return video_ids
+        redis_helper = RedisHelper(redis_info=config_.REDIS_INFO_FILTER)
+        filter_videos = []
+        for i in range(len(keys)//1000 + 1):
+            video_ids_temp = video_ids[i*1000:(i+1)*1000]
+            if len(video_ids_temp) == 0:
+                break
+            mget_res = redis_helper.mget(keys=keys[i*1000:(i+1)*1000])
+            filter_videos.extend([int(data) for data in mget_res if data is not None])
+        if len(filter_videos) > 0:
+            filtered_videos = set(video_ids) - set(filter_videos)
+            #print(f"m_r res: {list(filtered_videos)}\nexecute_time: {(time.time() - st_time) * 1000}")
+            return list(filtered_videos)
+        else:
+            #print(f"m_r res: {video_ids}\nexecute_time: {(time.time() - st_time) * 1000}")
+            return video_ids
+
+    def filter_videos_new(self, pool_type='rov', region_code=None, shield_config=None):
+        """视频过滤"""
+        # 预曝光过滤
+        st_pre = time.time()
+        #print("self.video_ids:", len(self.video_ids))
+        filtered_pre_result = self.filter_video_previewed(self.video_ids)
+        if not filtered_pre_result:
+            return None
+        #print("filtered_pre_result:", len(filtered_pre_result))
+        #print(filtered_pre_result)
+        # 视频已曝光过滤/白名单过滤
+        st_viewed = time.time()
+        t = [
+            gevent.spawn(self.filter_video_viewed_new, filtered_pre_result),
+            gevent.spawn(self.filter_movie_religion_video, filtered_pre_result)]
+        gevent.joinall(t)
+        filtered_result_list = [i.get() for i in t]
+        #print("filtered_result_list1:",filtered_result_list[0])
+        #print("filtered_result_list2:",filtered_result_list[1])
+        filtered_viewed_set = set('')
+        for i in filtered_result_list[0]:
+            filtered_viewed_set.add(int(i))
+        filter_video_set =set('')
+        for j in filtered_result_list[1]:
+            filter_video_set.add(int(j))
+        filtered_viewed_result = list(filtered_viewed_set & filter_video_set)
+        #print(f"view&m_r res: {filtered_viewed_result}\nexecute_time: {(time.time() - st_viewed) * 1000}")
+        #print("filtered:",len(filtered_viewed_result))
+        if not filtered_viewed_result:
+            return None
+        filtered_viewed_videos = [int(video_id) for video_id in filtered_viewed_result]
+        #print("result:", filtered_viewed_videos)
+        if pool_type != 'flow':
+            return  filtered_viewed_videos
+        else:
+            # 流量池视频需过滤屏蔽视频
+            if region_code is None or shield_config is None:
+                return filtered_viewed_videos
+            else:
+                shield_key_name_list = shield_config.get(region_code, None)
+                if shield_key_name_list is not None:
+                    filtered_shield_video_ids = self.filter_shield_video(
+                       video_ids=filtered_viewed_videos, shield_key_name_list=shield_key_name_list
+                    )
+                    # log_.info({
+                    #     'logTimestamp': int(time.time() * 1000),
+                    #     'pool_type': pool_type,
+                    #     'request_id': self.request_id,
+                    #     'app_type': self.app_type,
+                    #     'mid': self.mid,
+                    #     'uid': self.uid,
+                    #     'operation': 'shield_filter',
+                    #     'request_videos': filtered_viewed_videos,
+                    #     'shield_filter_result': filtered_shield_video_ids,
+                    #     'executeTime': (time.time() - st_viewed) * 1000
+                    #  })
+                    return filtered_shield_video_ids
+                else:
+                    return filtered_viewed_videos
+
 
 if __name__ == '__main__':
-    # filter_ = FilterVideos(app_type=1, mid='22', uid='www', video_ids=[1, 2, 3, 55])
-    # filter_.filter_videos()
+    user = [
+        ('weixin_openid_o0w175fDc8pNnywrYN49E341tKfI', ''),
+        ('weixin_openid_o0w175YwC3hStzcR5DAQdbgzdMeI', ''),
+        ('weixin_openid_o0w175ftZDl6VJVDx9la3WVPh7mU', '15900461'),
+        ('weixin_openid_o0w175SPqpCVRcp7x1XvnX4qpIvI', '19659040'),
+        ('weixin_openid_o0w175cOnguapyWIrDrHkOWl4oFQ', '31210128'),
+        ('weixin_openid_o0w175UXYId-o71e1Q3SOheYNteQ', '33099722'),
+        ('weixin_openid_o0w175QQ5b42AtOe50bchrFgcttA', ''),
+        ('weixin_openid_o0w175bgaPlfLsp3YLDKWqLWtXX8', '35371534'),
+        ('weixin_openid_o0w175eRpvbmV6nOhM1VTyyLICWA', '30488803'),
+        ('weixin_openid_o0w175TZYvG47pQkOjyJFoxQuqsw', '')
+    ]
+    video_df = pd.read_csv('./data/videoids.csv')
+    videoid_list = video_df['videoid'].tolist()
+    for mid, uid in user:
+        video_ids = random.sample(videoid_list, 1000)
+        start_time = time.time()
+        filter_ = FilterVideos(request_id=f'{mid} - {uid}', app_type=0, mid=mid, uid=uid, video_ids=video_ids)
+        res = filter_.filter_videos_new()
+        print(f"res: {res}\nexecute_time: {(time.time() - start_time) * 1000}")
     # filter_.filter_video_status(video_ids=[1, 3, 5])
 
     # videos = [{'videoId': 9034659, 'flowPool': '3#11#3#1637824188547'}, {'videoId': 9035052, 'flowPool': '3#11#3#1637824172827'}]
@@ -491,9 +699,11 @@ if __name__ == '__main__':
 
     # update_video_w_h_rate(video_id=113, key_name='')
 
-    mid = "weixin_openid_obHDW5c4g3aULfCWh-68LcUSxCB"
+    # mid = "weixin_openid_obHDW5c4g3aULfCWh-68LcUSxCB"
     # request_url = f"{config_.GET_USER_30DayReturnCnt_URL}{mid}"
     # res = request_get(request_url=request_url, timeout=100)
-    res = get_user_has30day_return(mid=mid)
-    print(res, type(res))
+    # res = get_user_has30day_return(mid=mid)
+    # print(res, type(res))
+
+
 

+ 157 - 0
video_rank.py

@@ -1,3 +1,4 @@
+import json
 import random
 import numpy
 
@@ -150,6 +151,162 @@ def video_rank(data, size, top_K, flow_pool_P):
     return rank_result[:size]
 
 
+def video_new_rank(videoIds, fast_flow_set, flow_set, size, top_K, flow_pool_P):
+    """
+        视频分发排序
+        :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
+        :param size: 请求数
+        :param top_K: 保证topK为召回池视频 type-int
+        :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
+        :return: rank_result
+        """
+    add_flow_set = set('')
+    if not videoIds or len(videoIds)==0:
+        return [], add_flow_set
+
+    redisObj = RedisHelper()
+    vidKeys = []
+    for vid in videoIds:
+        vidKeys.append("k_p:"+str(vid))
+    #print("vidKeys:", vidKeys)
+    video_scores = redisObj.get_batch_key(vidKeys)
+    #print(video_scores)
+    video_items = []
+    for i in range(len(video_scores)):
+        try:
+            #print(video_scores[i])
+            if video_scores[i] is None:
+                video_items.append((videoIds[i], 0.0))
+            else:
+                video_score_str = json.loads(video_scores[i])
+                #print("video_score_str:",video_score_str)
+                video_items.append((videoIds[i], video_score_str[0]))
+        except Exception:
+            video_items.append((videoIds[i], 0.0))
+    sort_items = sorted(video_items, key=lambda k: k[1], reverse=True)
+    #print("sort_items:", sort_items)
+    rov_recall_rank = sort_items
+    fast_flow_recall_rank = []
+    flow_recall_rank = []
+    for item in sort_items:
+        if item[0] in fast_flow_set:
+            fast_flow_recall_rank.append(item)
+        elif item[0] in flow_set:
+            flow_recall_rank.append(item)
+    # all flow result
+    all_flow_recall_rank = fast_flow_recall_rank+flow_recall_rank
+    rank_result = []
+    rank_set = set('')
+
+    # 从ROV召回池中获取top k
+    if len(rov_recall_rank) > 0:
+        rank_result.extend(rov_recall_rank[:top_K])
+        rov_recall_rank = rov_recall_rank[top_K:]
+    else:
+        rank_result.extend(all_flow_recall_rank[:top_K])
+        all_flow_recall_rank = all_flow_recall_rank[top_K:]
+
+    for rank_item in rank_result:
+        rank_set.add(rank_item[0])
+    #print("rank_result:", rank_result)
+    # 按概率 p 及score排序获取 size - k 个视频, 第4个位置按概率取流量池
+    i = 0
+    left_quato = size - top_K
+    j = 0
+    jj = 0
+    while i < left_quato and (j<len(all_flow_recall_rank) or jj<len(rov_recall_rank)):
+        # 随机生成[0, 1)浮点数
+        rand = random.random()
+        # log_.info('rand: {}'.format(rand))
+        if rand < flow_pool_P:
+            for flow_item in all_flow_recall_rank:
+                j+=1
+                if flow_item[0] in rank_set:
+                    continue
+                else:
+                    rank_result.append(flow_item)
+                    rank_set.add(flow_item[0])
+                    add_flow_set.add(flow_item[0])
+                i += 1
+                if i>= left_quato:
+                    break
+                
+        else:
+            for recall_item in rov_recall_rank:
+                jj+=1
+                if recall_item[0] in rank_set:
+                    continue
+                else:
+                    rank_result.append(recall_item)
+                    rank_set.add(recall_item[0])
+                i += 1
+                if i>= left_quato:
+                    break
+    #print("rank_result:", rank_result)
+    #print("add_flow_set:", add_flow_set)
+    return rank_result[:size], add_flow_set
+
+
+def refactor_video_rank(rov_recall_rank, fast_flow_set, flow_set, size, top_K, flow_pool_P):
+    """
+    视频分发排序
+    :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
+    :param size: 请求数
+    :param top_K: 保证topK为召回池视频 type-int
+    :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
+    :return: rank_result
+    """
+    if not rov_recall_rank or len(rov_recall_rank) == 0:
+        return []
+    fast_flow_recall_rank = []
+    flow_recall_rank = []
+    for item in rov_recall_rank:
+        vid = item.get('videoId', 0)
+        #print(item)
+        if vid in fast_flow_set:
+            fast_flow_recall_rank.append(item)
+        elif vid in flow_set:
+            flow_recall_rank.append(item)
+    # all flow result
+    all_flow_recall_rank = fast_flow_recall_rank + flow_recall_rank
+    rank_result = []
+    rank_set = set('')
+    # 从ROV召回池中获取top k
+    if len(rov_recall_rank) > 0:
+        rank_result.extend(rov_recall_rank[:top_K])
+        rov_recall_rank = rov_recall_rank[top_K:]
+    else:
+        rank_result.extend(all_flow_recall_rank[:top_K])
+        all_flow_recall_rank = all_flow_recall_rank[top_K:]
+    #已存放了多少VID
+    for rank_item in rank_result:
+        rank_set.add(rank_item.get('videoId', 0))
+
+    # 按概率 p 及score排序获取 size - k 个视频, 第4个位置按概率取流量池
+    i = 0
+    while i < size - top_K:
+        # 随机生成[0, 1)浮点数
+        rand = random.random()
+        # log_.info('rand: {}'.format(rand))
+        if rand < flow_pool_P:
+            for flow_item in all_flow_recall_rank:
+                flow_vid  = flow_item.get('videoId', 0)
+                if flow_vid in rank_set:
+                    continue
+                else:
+                    rank_result.append(flow_item)
+                    rank_set.add(flow_vid)
+        else:
+            for recall_item in rov_recall_rank:
+                flow_vid = recall_item.get('videoId', 0)
+                if flow_vid in rank_set:
+                    continue
+                else:
+                    rank_result.append(recall_item)
+                    rank_set.add(flow_vid)
+        i += 1
+    return rank_result[:size]
+
 def remove_duplicate(rov_recall, flow_recall, top_K):
     """
     对多路召回的视频去重

+ 137 - 1
video_recall.py

@@ -8,6 +8,7 @@ from db_helper import RedisHelper
 from config import set_config
 from utils import FilterVideos, get_videos_remain_view_count, get_videos_local_distribute_count, send_msg_to_feishu
 import gevent
+import  json
 
 log_ = Log()
 config_ = set_config()
@@ -16,7 +17,7 @@ config_ = set_config()
 class PoolRecall(object):
     """召回"""
     def __init__(self, request_id, app_type, client_info=None, mid='', uid='', ab_code='',
-                 rule_key='', data_key='', no_op_flag=False, params=None, rule_key_30day=None, shield_config=None):
+                 rule_key='', data_key='', no_op_flag=False, params=None, rule_key_30day=None, shield_config=None, video_id=None):
         """
         初始化
         :param request_id: request_id
@@ -31,6 +32,7 @@ class PoolRecall(object):
         self.app_type = app_type
         self.mid = mid
         self.uid = uid
+        self.video_id = video_id
         self.ab_code = ab_code
         self.client_info = client_info
         self.rule_key = rule_key
@@ -1256,6 +1258,7 @@ class PoolRecall(object):
             ]
         else:
             t = [
+                #add recall video
                 # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size),
                  # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'region_24h'),
                  # gevent.spawn(self.rov_pool_recall_with_region_by_h, province_code, size, 'day_24h'),
@@ -1263,6 +1266,7 @@ class PoolRecall(object):
                  gevent.spawn(self.recall_region_dup_24h, region_code, size, 'region_24h', expire_time),
                  gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup2', expire_time),
                  gevent.spawn(self.recall_region_dup_24h, region_code, size, '24h_dup3', expire_time)
+                 #
             ]
 
         gevent.joinall(t)
@@ -2109,3 +2113,135 @@ class PoolRecall(object):
         #     'executeTime': (time.time() - start_time) * 1000
         # })
         return pool_recall_result[:size]
+
+    #linfan
+    def get_sim_hot_item_reall(self):
+        recall_key = "sim_hot_"+str(self.video_id)
+        #print("recall_key:", recall_key)
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+        #print(data)
+        recall_result = []
+        if data is not  None:
+            json_result =json.loads(data)
+            #print("json_result:", json_result)
+            for per_item in json_result:
+                recall_result.append(
+                    {'videoId': per_item[0], 'flowPool': '',
+                     'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['sim_hot_vid_recall'],
+                     'abCode': self.ab_code}
+                )
+        return recall_result[:100]
+
+    # get region_hour_recall
+    def get_region_hour_recall(self, size=4, region_code='-1'):
+        pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H
+        recall_key = f"{pool_key_prefix}{region_code}:{self.data_key}:{self.rule_key}"
+        #print("recall_key:", recall_key)
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+        #print(data)
+        recall_result = []
+        if data is not None:
+            json_result = json.loads(data)
+            #print("json_result:", json_result)
+            for per_item in json_result:
+                recall_result.append(
+                    {'videoId': per_item[0], 'flowPool': '',
+                     'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['rov_recall_region_h'],
+                     'abCode': self.ab_code}
+                )
+        return recall_result[:100]
+
+    # get region_day_recall
+    def get_region_day_recall(self, size=4,region_code='-1'):
+        """召回池召回视频"""
+        pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H
+        recall_key = f"{pool_key_prefix}{region_code}:{self.data_key}:{self.rule_key}"
+        #print("recall_key:", recall_key)
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+        #print(data)
+        recall_result = []
+        if data is not None:
+            json_result = json.loads(data)
+            #print("json_result:", json_result)
+            for per_item in json_result:
+                recall_result.append(
+                    {'videoId': per_item[0], 'flowPool': '',
+                     'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['rov_recall_region_24h'],
+                     'abCode': self.ab_code}
+                )
+        return recall_result[:100]
+
+
+    def get_selected_recall(self, size=4, region_code='-1'):
+        """召回池召回视频"""
+        pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H
+        recall_key = f"{pool_key_prefix}{region_code}:{self.data_key}:{self.rule_key}"
+        #print("recall_key:", recall_key)
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+        #print(data)
+        recall_result = []
+        if data is not None:
+            json_result = json.loads(data)
+            #print("json_result:", json_result)
+            for per_item in json_result:
+                recall_result.append(
+                    {'videoId': per_item[0], 'flowPool': '',
+                     'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['rov_recall_24h'],
+                     'abCode': self.ab_code}
+                )
+        #print("recall_result:", recall_result)
+        return recall_result[:100]
+
+    def get_no_selected_recall(self, size=4, region_code='-1'):
+        """未选择召回池召回视频"""
+        pool_key_prefix = config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H
+        recall_key = f"{pool_key_prefix}{region_code}:{self.data_key}:{self.rule_key}"
+        #print("recall_key:", recall_key)
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+        #print(data)
+        recall_result = []
+        if data is not None:
+            json_result = json.loads(data)
+            #print("json_result:", json_result)
+            for per_item in json_result:
+                recall_result.append(
+                    {'videoId': per_item[0], 'flowPool': '',
+                     'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['rov_recall_24h_dup'],
+                     'abCode': self.ab_code}
+                )
+        return recall_result[:100]
+
+    def get_fast_flow_pool_recall(self, size=4):
+        """快速流量池召回视频"""
+        recall_key = f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}:{config_.QUICK_FLOW_POOL_ID}"
+
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+        recall_result = []
+        if data is not None:
+            json_result = json.loads(data)
+            #print("json_result:", json_result)
+            for per_item in json_result:
+                recall_result.append(
+                    {'videoId': per_item[0], 'flowPool': '',
+                     'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['fast_flow_recall'],
+                     'abCode': self.ab_code}
+                )
+        return  recall_result
+
+    def get_flow_pool_recall(self, size=4):
+        """流量池召回视频"""
+        recall_key = f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{self.app_type}"
+        #print("recall_key:", recall_key)
+        data = self.redis_helper.get_data_from_redis(key_name=recall_key)
+        #print(data)
+        recall_result = []
+        if data is not None:
+            json_result = json.loads(data)
+            #print("json_result:", json_result)
+            for per_item in json_result:
+                recall_result.append(
+                    {'videoId': per_item[0], 'flowPool': '',
+                     'rovScore': per_item[1], 'pushFrom': config_.PUSH_FROM['normal_flow_recall'],
+                     'abCode': self.ab_code}
+                )
+        return  recall_result