Browse Source

update modify

linfan 1 year ago
parent
commit
04f5e92b25
2 changed files with 206 additions and 1 deletions
  1. 110 1
      recommend.py
  2. 96 0
      video_rank.py

+ 110 - 1
recommend.py

@@ -10,7 +10,7 @@ import config
 from log import Log
 from config import set_config
 from video_recall import PoolRecall
-from video_rank import video_new_rank,video_rank,refactor_video_rank, bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2
+from video_rank import video_new_rank2,video_new_rank,video_rank,refactor_video_rank, bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2
 from db_helper import RedisHelper
 import gevent
 from utils import FilterVideos, get_user_has30day_return
@@ -336,6 +336,115 @@ def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, al
     #
     # result['rankResult'] = rank_result
 
+    return result
+    # return rank_result, last_rov_recall_key
+
+def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info,
+                    expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', data_key='',
+                    no_op_flag=False, old_video_index=-1, video_id=None, params=None, rule_key_30day=None,
+                    shield_config=None):
+    """
+    Home-page online recommendation flow: recall candidates concurrently, then rank them.
+
+    :param request_id: request_id
+    :param mid: mid type-string
+    :param uid: uid type-string
+    :param size: number of videos requested type-int
+    :param top_K: the first top_K results are guaranteed to come from the recall pool type-int
+    :param flow_pool_P: probability that each of the remaining size-top_K videos is a flow-pool video type-float
+    :param app_type: product identifier type-int
+    :param algo_type: algorithm type type-string (not read by this function)
+    :param client_info: user location info {"country": ..., "province": ..., "city": ...}
+    :param expire_time: redis expiry (seconds) of the last-position video record
+    :param ab_code: AB experiment code
+    :param rule_key: forwarded unchanged to PoolRecall
+    :param data_key: forwarded unchanged to PoolRecall
+    :param no_op_flag: forwarded unchanged to PoolRecall
+    :param old_video_index: not read by this function
+    :param video_id: head video id for related recommendation
+    :param params: forwarded unchanged to PoolRecall
+    :param rule_key_30day: forwarded unchanged to PoolRecall
+    :param shield_config: forwarded unchanged to PoolRecall
+    :return: dict with keys 'recallResult' (list of per-source recall results),
+             'recallTime' (ms), 'rankResult' and 'rankTime' (ms)
+    """
+    result = {}
+    rov_only_app_types = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]
+    rov_only_app = app_type in rov_only_app_types
+
+    # ####### Recall: every recall source runs concurrently as a gevent greenlet
+    start_recall = time.time()
+    pool_recall = PoolRecall(request_id=request_id,
+                             app_type=app_type, mid=mid, uid=uid, ab_code=ab_code,
+                             client_info=client_info, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
+                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config,
+                             video_id=video_id)
+
+    # Index 0 is always the ROV pool; indexes 1 and 2 (quick flow pool, regular flow pool)
+    # only exist for app types that also recall from the flow pools.
+    tasks = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time)]
+    if not rov_only_app:
+        tasks.append(gevent.spawn(pool_recall.flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID))
+        tasks.append(gevent.spawn(pool_recall.flow_pool_recall, size))
+
+    gevent.joinall(tasks)
+    # Greenlet.get() re-raises any exception raised inside the greenlet.
+    recall_result_list = [task.get() for task in tasks]
+
+    result['recallResult'] = recall_result_list
+    result['recallTime'] = (time.time() - start_recall) * 1000
+
+    # ####### Rank
+    start_rank = time.time()
+    rov_pool_recall = recall_result_list[0]
+    if rov_only_app:
+        flow_ab_codes = [
+            config_.AB_CODE['rov_rank_appType_18_19'],
+            config_.AB_CODE['rov_rank_appType_19'],
+            config_.AB_CODE['top_video_relevant_appType_19']
+        ]
+        # Only the ROV recall is spawned for these app types, so a second recall result
+        # normally does not exist; use an empty flow pool instead of raising IndexError.
+        if ab_code in flow_ab_codes and len(recall_result_list) > 1:
+            flow_pool_recall = recall_result_list[1]
+        else:
+            flow_pool_recall = []
+    elif recall_result_list[1]:
+        # The quick flow pool returned videos: prefer it, and let the distribution rate
+        # stored in redis (when present) override the caller-supplied probability.
+        redis_helper = RedisHelper()
+        quick_flow_pool_P = redis_helper.get_data_from_redis(
+            key_name=f"{config_.QUICK_FLOWPOOL_DISTRIBUTE_RATE_KEY_NAME_PREFIX}{config_.QUICK_FLOW_POOL_ID}"
+        )
+        if quick_flow_pool_P:
+            flow_pool_P = quick_flow_pool_P
+        flow_pool_recall = recall_result_list[1]
+    else:
+        # Quick flow pool is empty: fall back to the regular flow pool.
+        flow_pool_recall = recall_result_list[2]
+
+    data = {
+        'rov_pool_recall': rov_pool_recall,
+        'flow_pool_recall': flow_pool_recall
+    }
+    # NOTE(review): rank_result was previously never assigned, so this function always
+    # failed with NameError. video_rank is assumed to accept these keyword arguments
+    # and to return the ranked list - confirm against video_rank.py.
+    rank_result = video_rank(data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P))
+
+    result['rankResult'] = rank_result
+    result['rankTime'] = (time.time() - start_rank) * 1000
+
     return result
     # return rank_result, last_rov_recall_key
 

+ 96 - 0
video_rank.py

@@ -592,6 +592,102 @@ def video_rank_with_old_video(rank_result, old_video_recall, size, top_K, old_vi
     return new_rank_result[:size]
 
 
+def _collect_video_ids(data):
+    """Flatten `data` into an ordered list of unique video ids."""
+    if isinstance(data, dict):
+        # Recall dict: concatenate every recall list in insertion order.
+        candidates = [item for recall in data.values() for item in (recall or [])]
+    else:
+        candidates = list(data)
+
+    video_ids = []
+    seen = set()
+    for item in candidates:
+        # Recall items are presumably dicts carrying a 'videoId' key; bare ids pass through.
+        vid = item.get('videoId') if isinstance(item, dict) else item
+        if vid is None or vid in seen:
+            continue
+        seen.add(vid)
+        video_ids.append(vid)
+    return video_ids
+
+
+def _append_unranked(candidates, quota, rank_result, rank_set, added_ids=None):
+    """Append candidates not yet in rank_set to rank_result, at most `quota` of them; return the number added."""
+    added = 0
+    for item in candidates:
+        if added >= quota:
+            break
+        if item[0] in rank_set:
+            continue
+        rank_result.append(item)
+        rank_set.add(item[0])
+        if added_ids is not None:
+            added_ids.add(item[0])
+        added += 1
+    return added
+
+
+def video_new_rank2(data, fast_flow_set, flow_set, size, top_K, flow_pool_P):
+    """
+        Rank recalled videos by the score stored in redis under "k_p:<video id>".
+
+        The first top_K positions are the highest-scored videos. The remaining
+        size - top_K positions are filled from one pool first (the flow pool with
+        probability flow_pool_P, otherwise the full score-ordered list) and then,
+        if that pool runs out, from the other one.
+
+        :param data: recalled videos. NOTE(review): assumed to be either the recall dict
+                     {'rov_pool_recall': [], 'flow_pool_recall': []} whose items carry a
+                     'videoId' key, or a plain iterable of video ids - confirm with callers.
+        :param fast_flow_set: ids treated as quick-flow-pool videos (must use the same id type as data)
+        :param flow_set: ids treated as regular-flow-pool videos (must use the same id type as data)
+        :param size: number of videos requested
+        :param top_K: the first top_K results come from the score-ordered recall list type-int
+        :param flow_pool_P: probability that the flow pool is used first for the remaining slots type-float
+        :return: (rank_result, add_flow_set) - rank_result is a list of (video_id, score)
+                 tuples of at most `size` items; add_flow_set holds the ids that were
+                 appended from the flow pool while filling the remaining slots
+        """
+    add_flow_set = set()
+    if not data:
+        return [], add_flow_set
+
+    # The original body read an undefined name here; derive the ids from `data` instead.
+    video_ids = _collect_video_ids(data)
+    if not video_ids:
+        # Nothing to score, so skip the redis round trip.
+        return [], add_flow_set
+
+    redis_helper = RedisHelper()
+    video_scores = redis_helper.get_batch_key(["k_p:" + str(vid) for vid in video_ids])
+
+    video_items = []
+    for vid, raw_score in zip(video_ids, video_scores):
+        score = 0.0
+        if raw_score is not None:
+            try:
+                # The stored value is JSON; its first element is used as the score.
+                score = json.loads(raw_score)[0]
+            except (ValueError, TypeError, IndexError, KeyError):
+                # Best effort: a malformed score entry counts as 0.0.
+                score = 0.0
+        video_items.append((vid, score))
+
+    sort_items = sorted(video_items, key=lambda item: item[1], reverse=True)
+
+    # Flow candidates keep score order, quick-flow videos ahead of regular-flow ones.
+    fast_flow_recall_rank = [item for item in sort_items if item[0] in fast_flow_set]
+    flow_recall_rank = [item for item in sort_items
+                        if item[0] not in fast_flow_set and item[0] in flow_set]
+    all_flow_recall_rank = fast_flow_recall_rank + flow_recall_rank
+
+    # Top K comes from the full score-ordered list. The flow lists are subsets of it,
+    # so there is nothing else to fall back to when it is empty.
+    rank_result = list(sort_items[:top_K])
+    rov_recall_rank = sort_items[top_K:]
+    rank_set = {item[0] for item in rank_result}
+
+    # Fill the remaining size - top_K slots. A single draw decides which pool goes first;
+    # the other pool is only used when the first cannot fill the quota. Re-drawing in a
+    # loop could spin forever when the drawn pool was empty and flow_pool_P was 0 or 1.
+    left_quota = size - top_K
+    if left_quota > 0 and (all_flow_recall_rank or rov_recall_rank):
+        pools = [(all_flow_recall_rank, add_flow_set), (rov_recall_rank, None)]
+        # random.random() returns a float in [0, 1)
+        if not random.random() < flow_pool_P:
+            pools.reverse()
+        filled = 0
+        for candidates, added_ids in pools:
+            filled += _append_unranked(candidates, left_quota - filled, rank_result, rank_set, added_ids)
+            if filled >= left_quota:
+                break
+
+    return rank_result[:size], add_flow_set
+
+
 if __name__ == '__main__':
     d_test = [{'videoId': 10028734, 'rovScore': 99.977, 'pushFrom': 'recall_pool', 'abCode': 10000},
               {'videoId': 1919925, 'rovScore': 99.974, 'pushFrom': 'recall_pool', 'abCode': 10000},