|
- import copy
- import json
- import random
- import time
- import multiprocessing
- import traceback
- import hashlib
- from datetime import datetime, timedelta
- import config
- from log import Log
- from config import set_config
- from video_recall import PoolRecall
- from video_rank import video_new_rank2, video_sank_pos_rank, video_new_rank, video_rank, refactor_video_rank, \
- bottom_strategy, video_rank_by_w_h_rate, video_rank_with_old_video, bottom_strategy2, video_new_rank3
- from db_helper import RedisHelper
- import gevent
- from utils import FilterVideos, get_user_has30day_return
- import ast
# Module-level Log instance.
log_ = Log()
# Project configuration object. It is created at import time; function defaults below
# (e.g. ab_code=config_.AB_CODE['initial']) read it when the functions are defined.
config_ = set_config()
def relevant_video_top_recommend(request_id, app_type, mid, uid, head_vid, videos, size):
    """
    Force-insert operator-configured relevant videos at the top of a related-recommendation result.

    The operator configuration for ``head_vid`` is read from Redis as JSON. Each entry is a dict
    with 'recommend_vid', 'order', 'start_time' and 'finish_time'. An entry is placed at the top
    (ascending 'order') when it passes ``FilterVideos`` and ``start_time < now < finish_time``.
    Videos already present in ``videos`` with the same id are dropped.
    Flow-pool videos from ``videos`` (pushFrom == PUSH_FROM['flow_recall']) are put back at
    their original index.

    :param request_id: request_id
    :param app_type: product identifier type-int
    :param mid: mid
    :param uid: uid
    :param head_vid: id of the head video of the related recommendation type-int
    :param videos: current related-recommendation result type-list (of video dicts)
    :param size: number of videos to return type-int
    :return: the merged list truncated to ``size``; ``videos`` unchanged (not truncated)
        when there is nothing to insert
    """
    # Operator-configured relevant videos for the head video.
    key_name = '{}{}'.format(config_.RELEVANT_VIDEOS_WITH_OP_KEY_NAME, head_vid)
    redis_helper = RedisHelper()
    relevant_videos = redis_helper.get_data_from_redis(key_name=key_name)
    if relevant_videos is None:
        # No relevant videos configured for this head video.
        return videos
    relevant_videos = json.loads(relevant_videos)
    # Operator-specified display order.
    relevant_videos_sorted = sorted(relevant_videos, key=lambda x: x['order'])
    # Per-user filtering of the configured ids.
    relevant_video_ids = [int(item['recommend_vid']) for item in relevant_videos_sorted]
    filter_helper = FilterVideos(request_id=request_id, app_type=app_type, video_ids=relevant_video_ids,
                                 mid=mid, uid=uid)
    filtered_ids = filter_helper.filter_videos()
    if filtered_ids is None:
        return videos
    # Set for O(1) membership checks instead of scanning the list for every entry.
    filtered_id_set = set(filtered_ids)
    # Keep only entries that passed the filter and whose effective window contains now.
    now = int(time.time())
    relevant_videos_in_effect = [
        {'videoId': int(item['recommend_vid']), 'pushFrom': config_.PUSH_FROM['relevant_video_op'],
         'abCode': config_.AB_CODE['relevant_video_op']}
        for item in relevant_videos_sorted
        if item['start_time'] < now < item['finish_time'] and int(item['recommend_vid']) in filtered_id_set
    ]
    if not relevant_videos_in_effect:
        return videos
    # Merge with the existing result. Drop videos that are being force-inserted, and remember
    # the original index of every flow-pool video so it can go back into the same slot.
    relevant_ids = {item['videoId'] for item in relevant_videos_in_effect}
    flow_pool_videos = []
    other_videos = []
    for i, item in enumerate(videos):
        if item.get('videoId') in relevant_ids:
            continue
        if item.get('pushFrom', None) == config_.PUSH_FROM['flow_recall']:
            flow_pool_videos.append((i, item))
        else:
            other_videos.append(item)
    rank_result = relevant_videos_in_effect + other_videos
    # Indexes are ascending, so each insert lands at its original position
    # (as long as rank_result is long enough; otherwise insert() appends).
    for i, item in flow_pool_videos:
        rank_result.insert(i, item)
    return rank_result[:size]
def video_position_recommend(request_id, mid, uid, app_type, videos, size=10):
    """
    Pin operator-configured videos to the first two slots of a recommendation list.

    Candidate ids for slot 1 and slot 2 are read from Redis
    (``config.BaseConfig.RECALL_POSITION1_KEY_NAME`` / ``RECALL_POSITION2_KEY_NAME``) and parsed
    with ``ast.literal_eval``. They are filtered for this user by two ``FilterVideos`` jobs run
    concurrently with gevent. The first surviving id of each list is removed from ``videos``
    (via ``positon_duplicate``) and inserted at index 0 / index 1. If both slots resolve to the
    same id, that video is pinned only once, to slot 1.

    :param request_id: request_id
    :param mid: mid
    :param uid: uid
    :param app_type: product identifier type-int
    :param videos: current recommendation result, a list of video dicts with a 'videoId' key
    :param size: maximum number of videos returned (defaults to the previously hard-coded 10)
    :return: a new list of at most ``size`` video dicts; ``videos`` itself is not modified
    """
    redis_helper = RedisHelper()
    raw_pos1 = redis_helper.get_data_from_redis(config.BaseConfig.RECALL_POSITION1_KEY_NAME)
    raw_pos2 = redis_helper.get_data_from_redis(config.BaseConfig.RECALL_POSITION2_KEY_NAME)

    def parse_ids(raw):
        # literal_eval (not eval) parses the stored literal safely. The stored literal may
        # itself be "None", so the parsed value is checked for None as well.
        parsed = None if raw is None else ast.literal_eval(raw)
        return [] if parsed is None else [int(vid) for vid in parsed]

    pos1_vids = parse_ids(raw_pos1)
    pos2_vids = parse_ids(raw_pos2)

    # Filter both candidate lists concurrently.
    filters = [
        FilterVideos(request_id=request_id, app_type=app_type, video_ids=vids, mid=mid, uid=uid)
        for vids in (pos1_vids, pos2_vids)
    ]
    jobs = [gevent.spawn(video_filter.filter_videos) for video_filter in filters]
    gevent.joinall(jobs)
    pos1_vids, pos2_vids = [job.get() for job in jobs]

    # positon_duplicate returns a new list, so the inserts below never touch the caller's list.
    videos = positon_duplicate(pos1_vids, pos2_vids, videos)
    pos1_vid = int(pos1_vids[0]) if pos1_vids else None
    pos2_vid = int(pos2_vids[0]) if pos2_vids else None

    def position_item(vid):
        return {'videoId': vid, 'rovScore': 100,
                'pushFrom': config_.PUSH_FROM['position_insert'], 'abCode': config_.AB_CODE['position_insert']}

    if pos1_vid is not None:
        videos.insert(0, position_item(pos1_vid))
    # Do not pin the same video to both slots; it would appear twice in a row.
    if pos2_vid is not None and pos2_vid != pos1_vid:
        videos.insert(1, position_item(pos2_vid))
    return videos[:size]
def positon_duplicate(pos1_vids, pos2_vids, videos):
    """
    Remove from ``videos`` the videos that are about to be pinned to position 1 / position 2.

    Only the first id of each position list is considered, converted with ``int``. A ``None``
    or empty position list contributes nothing.

    :param pos1_vids: candidate ids for position 1, or None
    :param pos2_vids: candidate ids for position 2, or None
    :param videos: list of video dicts, each with a 'videoId' key
    :return: a new list without the pinned ids, in the original order
    """
    pinned_ids = {
        int(candidates[0])
        for candidates in (pos1_vids, pos2_vids)
        if candidates is not None and len(candidates) > 0
    }
    return [video for video in videos if video['videoId'] not in pinned_ids]
def video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info,
                    expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', data_key='',
                    no_op_flag=False, old_video_index=-1, video_id=None, params=None, rule_key_30day=None,
                    shield_config=None, level_weight=None, flow_pool_abtest_group=None, env_dict=None):
    """
    Home-feed online recommendation: concurrent recall followed by ``video_rank``.

    Recall jobs are spawned with gevent. The position of each job in ``t`` (and therefore in
    ``recall_result_list``) matters, because the ranking stage reads results by index:
      - [0]: ``rov_pool_recall_with_region``
      - [1]: quick flow pool recall (not spawned for LAO_HAO_KAN_VIDEO / ZUI_JING_QI)
      - [2]: regular flow pool recall (not spawned for LAO_HAO_KAN_VIDEO / ZUI_JING_QI)
      - then experiment-specific extra recalls selected by ``ab_code``
    A flow pool result is indexed as ``[k][0]`` (videos) and ``[k][1]`` (recall-process log).

    :param request_id: request_id
    :param mid: mid type-string
    :param uid: uid type-string
    :param size: number of videos requested type-int
    :param top_K: guarantee that the top K videos come from the recall pool type-int
    :param flow_pool_P: probability that the videos after top_K come from the flow pool type-float
    :param app_type: product identifier type-int
    :param algo_type: algorithm type type-string
    :param client_info: user location info {"country": ..., "province": ..., "city": ...}
    :param expire_time: redis expiry time of the last-video record
    :param ab_code: AB experiment code
    :param video_id: id of the head video for related recommendation
    :param params:
    :return: dict with 'recallResult', 'recallTime', 'rankResult', 'rankTime' and
        'flow_pool_recall_process'
    """
    result = {}
    # ####### Recall (gevent greenlets; the multiprocessing version in the string below is disabled)
    start_recall = time.time()
    '''
    cores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=cores)
    pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code)
    _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
    pool_list = [
        # rov召回池
        pool.apply_async(pool_recall.rov_pool_recall, (size,)),
        # 流量池
        pool.apply_async(pool_recall.flow_pool_recall, (size,))
    ]
    recall_result_list = [p.get() for p in pool_list]
    pool.close()
    pool.join()
    '''
    # Overwritten by the gevent results below.
    recall_result_list = []
    pool_recall = PoolRecall(request_id=request_id,
                             app_type=app_type, mid=mid, uid=uid, ab_code=ab_code,
                             client_info=client_info, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config,
                             video_id= video_id, level_weight=level_weight, env_dict=env_dict)
    # Load the experiment config for ab_code.
    # NOTE(review): exp_config is not read anywhere later in this function.
    exp_config = None
    if ab_code == 60058:
        exp_config = pool_recall.get_U2I_config()
    elif ab_code == 60059:
        exp_config = pool_recall.get_w2v_config()
    elif ab_code == 60060:
        exp_config = pool_recall.get_test_config()
    elif ab_code == 60061:
        exp_config = pool_recall.get_simrecall_config()
    elif ab_code == 60062:
        exp_config = pool_recall.get_u2u2i_config()
    elif ab_code == 60063:
        exp_config = pool_recall.get_simrecall_config_new()
    elif ab_code == 60064:
        exp_config = pool_recall.get_video_recall_config()
    # Base recalls. Their order in `t` fixes the indexes read by the ranking stage.
    if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
        # Region ROV recall only; no flow pool recall for these apps.
        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time)]
    elif flow_pool_abtest_group == 'experimental_flow_set_level':
        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
             gevent.spawn(pool_recall.flow_pool_recall_new_with_level,
                          size, flow_pool_id=config_.QUICK_FLOW_POOL_ID, flow_pool_abtest_group=flow_pool_abtest_group),
             gevent.spawn(pool_recall.flow_pool_recall_new_with_level,
                          size, flow_pool_abtest_group=flow_pool_abtest_group)]
    elif flow_pool_abtest_group == 'experimental_flow_set_level_score':
        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
             gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score2,
                          size, flow_pool_id=config_.QUICK_FLOW_POOL_ID, flow_pool_abtest_group=flow_pool_abtest_group),
             gevent.spawn(pool_recall.flow_pool_recall_new_with_level_score2,
                          size, flow_pool_abtest_group=flow_pool_abtest_group)]
    else:
        t = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
             gevent.spawn(pool_recall.flow_pool_recall,
                          size, flow_pool_id=config_.QUICK_FLOW_POOL_ID, flow_pool_abtest_group=flow_pool_abtest_group),
             gevent.spawn(pool_recall.flow_pool_recall,
                          size, flow_pool_abtest_group=flow_pool_abtest_group)]
    # Experiment-specific extra recalls, appended after the base recalls.
    # NOTE(review): for LAO_HAO_KAN_VIDEO / ZUI_JING_QI only one base recall exists, so these land
    # at index 1+, while the data mapping below reads index 3+ — confirm this is intended.
    if ab_code == 60058:
        t.append(gevent.spawn(pool_recall.get_U2I_reall, mid))
        t.append(gevent.spawn(pool_recall.get_play_reall, mid))
    elif ab_code == 60059:
        t.append(gevent.spawn(pool_recall.get_word2vec_item_reall))
    elif ab_code == 60061 or ab_code == 60063:
        t.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
    elif ab_code == 60062:
        t.append(gevent.spawn(pool_recall.get_U2U2I_reall, mid))
    elif ab_code == 60064:
        t.append(gevent.spawn(pool_recall.get_return_video_reall))
    gevent.joinall(t)
    recall_result_list = [i.get() for i in t]
    result['recallResult'] = recall_result_list
    result['recallTime'] = (time.time() - start_recall) * 1000
    # Deep copy: the lists used for ranking below are independent of those stored in
    # result['recallResult'].
    recall_result_list = copy.deepcopy(recall_result_list)
    flow_pool_recall_process = {}
    # ####### Rank
    start_rank = time.time()
    if app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
        if ab_code in [
            config_.AB_CODE['rov_rank_appType_18_19'],
            config_.AB_CODE['rov_rank_appType_19'],
            config_.AB_CODE['top_video_relevant_appType_19']
        ]:
            # NOTE(review): only one base recall is spawned for these apps, so index 1 exists
            # only if an extra recall was appended above — confirm this branch is still reachable.
            data = {
                'rov_pool_recall': recall_result_list[0],
                'flow_pool_recall': recall_result_list[1][0]
            }
        else:
            data = {
                'rov_pool_recall': recall_result_list[0],
                'flow_pool_recall': []
            }
    else:
        # Use the quick flow pool (index 1) when it returned videos; otherwise use the
        # regular flow pool (index 2).
        if recall_result_list[1][0]:
            redis_helper = RedisHelper()
            quick_flow_pool_P = redis_helper.get_data_from_redis(
                key_name=f"{config_.QUICK_FLOWPOOL_DISTRIBUTE_RATE_KEY_NAME_PREFIX}{config_.QUICK_FLOW_POOL_ID}"
            )
            # A truthy value from Redis replaces flow_pool_P; it is passed through float() below.
            if quick_flow_pool_P:
                flow_pool_P = quick_flow_pool_P
            data = {
                'rov_pool_recall': recall_result_list[0],
                'flow_pool_recall': recall_result_list[1][0]
            }
            flow_pool_recall_process = recall_result_list[1][1]
        else:
            data = {
                'rov_pool_recall': recall_result_list[0],
                'flow_pool_recall': recall_result_list[2][0]
            }
            flow_pool_recall_process = recall_result_list[2][1]
    # Experiment recall slots, empty unless filled below.
    # 60058: u2i tag, 60059: word2vec, 60061/60063: sim recall, 60062: u2u2i, 60064: return video.
    data['u2i_recall'] = []
    data['u2i_play_recall'] = []
    data['w2v_recall'] = []
    data['sim_recall'] = []
    data['u2u2i_recall'] = []
    if ab_code == 60058:
        if len(recall_result_list)>=4:
            data['u2i_recall'] = recall_result_list[3]
        if len(recall_result_list)>=5:
            data['u2i_play_recall'] = recall_result_list[4]
    elif ab_code == 60059:
        if len(recall_result_list)>=4:
            data['w2v_recall'] = recall_result_list[3]
    elif (ab_code == 60061 or ab_code == 60063):
        if len(recall_result_list)>=4:
            data['sim_recall'] = recall_result_list[3]
    elif ab_code == 60062:
        if len(recall_result_list)>=4:
            data['u2u2i_recall'] = recall_result_list[3]
    elif ab_code == 60064:
        # 'return_video_recall' has no empty default above; the key exists only on this path.
        if len(recall_result_list)>=4:
            data['return_video_recall'] = recall_result_list[3]
    rank_result, flow_pool_recall_process = video_rank(data=data, size=size, top_K=top_K,
                                                       flow_pool_P=float(flow_pool_P),
                                                       flow_pool_recall_process=flow_pool_recall_process)
    result['rankResult'] = rank_result
    result['rankTime'] = (time.time() - start_rank) * 1000
    # Attach the final ranking to the flow-pool recall-process log returned by video_rank.
    flow_pool_recall_process['rank_result'] = rank_result
    result['flow_pool_recall_process'] = flow_pool_recall_process
    return result
# ab_codes for which video_old_recommend appends a get_sim_hot_item_reall_filter recall.
_OLD_REC_SIM_CODES = frozenset({
    60054, 60066, 60072, 60073, 60074, 60075, 60076, 60077, 60078, 60079,
    60087, 60088, 60089, 60090, 60091,
})
# ab_codes that append get_sim_hot_item_reall_filter followed by get_U2I_reall.
_OLD_REC_SIM_U2I_CODES = frozenset({60056, 60071})
# ab_codes that append get_sim_hot_item_reall_filter followed by get_return_video_reall().
_OLD_REC_SIM_RETURN_CODES = frozenset({60067, 60069})
# ab_codes that append get_sim_hot_item_reall_filter followed by get_return_video_reall('rv2:').
_OLD_REC_SIM_RETURN_RV2_CODES = frozenset({
    60068, 60070, 60080, 60081, 60082, 60083, 60084, 60085, 60086,
    60092, 60093, 60094, 60095, 60096, 60097,
})


def _merge_unique_by_video_id(*recall_lists):
    """Concatenate recall lists in order, keeping only the first video seen per 'videoId'.

    ``None`` or empty lists are skipped.
    """
    merged = []
    seen_ids = set()
    for recall in recall_lists:
        for video in recall or ():
            vid = video.get('videoId')
            if vid not in seen_ids:
                seen_ids.add(vid)
                merged.append(video)
    return merged


def video_old_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info,
                        expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', data_key='',
                        no_op_flag=False, old_video_index=-1, video_id=None, params=None, rule_key_30day=None,
                        shield_config=None, env_dict=None, level_weight=None, flow_pool_abtest_group=None,
                        rank_key_prefix=None, h_data_key=None, h_rule_key=None):
    """
    Home-feed online recommendation: recall, merge experiment recalls, then rank with ``video_new_rank3``.

    :param request_id: request_id
    :param mid: mid type-string
    :param uid: uid type-string
    :param size: number of videos requested type-int
    :param top_K: guarantee that the top K videos come from the recall pool type-int
    :param flow_pool_P: probability that the videos after top_K come from the flow pool type-float
    :param app_type: product identifier type-int
    :param algo_type: algorithm type type-string
    :param client_info: user location info; 'provinceCode' and 'cityCode' are read from it
    :param expire_time: redis expiry time of the last-video record
    :param ab_code: AB experiment code; selects the extra recalls merged into the ROV recall
    :param video_id: id of the head video for related recommendation
    :param params:
    :param env_dict: optional dict. When truthy it is updated in place with 'mid',
        'province_code' and 'city_code'. If ranking produced videos, it also gets
        'recall_list', 'vid_day_fea_list' and 'vid_hour_fea_list'.
    :param rank_key_prefix: passed through to ``video_new_rank3``
    :param h_data_key: passed through to ``PoolRecall``
    :param h_rule_key: passed through to ``PoolRecall``
    :return: ``(result, env_json)``.
        ``result`` has 'recallResult', 'recallTime', 'rankResult', 'flow_num', 'rankTime',
        'flow_pool_recall_process', plus 'rank_num' when ranking returned videos.
        ``env_json`` is ``env_dict`` and may be None.
    """
    result = {}
    # Always bound: returned on every path, even when env_dict is None/empty.
    env_json = env_dict
    is_lhk_or_zjq = app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]

    # ####### Recall
    start_recall = time.time()
    pool_recall = PoolRecall(request_id=request_id,
                             app_type=app_type, mid=mid, uid=uid, ab_code=ab_code,
                             client_info=client_info, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config,
                             video_id=video_id, level_weight=level_weight,
                             h_data_key=h_data_key, h_rule_key=h_rule_key,
                             env_dict=env_dict
                             )
    # NOTE(review): the returned config is not used in this function. The call is kept in case
    # it has side effects; drop it if it is a pure read.
    pool_recall.get_sort_ab_codel_config()

    # Base recalls. Their position in `jobs` fixes the indexes read below:
    # [0] region ROV recall; for other apps also [1] quick flow pool and [2] regular flow pool.
    if is_lhk_or_zjq:
        jobs = [gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time)]
    else:
        if flow_pool_abtest_group == 'experimental_flow_set_level':
            flow_recall = pool_recall.flow_pool_recall_new_with_level
        elif flow_pool_abtest_group == 'experimental_flow_set_level_score':
            flow_recall = pool_recall.flow_pool_recall_new_with_level_score2
        else:
            flow_recall = pool_recall.flow_pool_recall
        jobs = [
            gevent.spawn(pool_recall.rov_pool_recall_with_region, size, expire_time),
            gevent.spawn(flow_recall, size, flow_pool_id=config_.QUICK_FLOW_POOL_ID,
                         flow_pool_abtest_group=flow_pool_abtest_group),
            gevent.spawn(flow_recall, size, flow_pool_abtest_group=flow_pool_abtest_group),
        ]

    # Experiment-specific extra recalls, appended right after the base recalls.
    extra_recall_count = 0
    if ab_code in _OLD_REC_SIM_CODES:
        jobs.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
        extra_recall_count = 1
    elif ab_code in _OLD_REC_SIM_U2I_CODES:
        jobs.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
        jobs.append(gevent.spawn(pool_recall.get_U2I_reall, mid))
        extra_recall_count = 2
    elif ab_code in _OLD_REC_SIM_RETURN_CODES:
        jobs.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
        jobs.append(gevent.spawn(pool_recall.get_return_video_reall))
        extra_recall_count = 2
    elif ab_code in _OLD_REC_SIM_RETURN_RV2_CODES:
        jobs.append(gevent.spawn(pool_recall.get_sim_hot_item_reall_filter))
        jobs.append(gevent.spawn(pool_recall.get_return_video_reall, 'rv2:'))
        extra_recall_count = 2
    gevent.joinall(jobs)
    recall_result_list = [job.get() for job in jobs]
    if not recall_result_list:
        result['recallResult'] = []
        result['rankResult'] = []
        return result, env_json

    # Merge the extra recalls into the ROV recall (index 0), de-duplicated by videoId.
    if extra_recall_count and len(recall_result_list) >= 2:
        first_extra = 1 if is_lhk_or_zjq else 3
        merged = _merge_unique_by_video_id(
            recall_result_list[0],
            *recall_result_list[first_extra:first_extra + extra_recall_count]
        )
        if merged:
            recall_result_list[0] = merged

    result['recallResult'] = recall_result_list
    result['recallTime'] = (time.time() - start_recall) * 1000
    # Rank on a deep copy so the lists stored in result['recallResult'] stay independent.
    recall_result_list = copy.deepcopy(recall_result_list)
    flow_pool_recall_process = {}

    # ####### Rank
    start_rank = time.time()
    if is_lhk_or_zjq:
        if ab_code in [
            config_.AB_CODE['rov_rank_appType_18_19'],
            config_.AB_CODE['rov_rank_appType_19'],
            config_.AB_CODE['top_video_relevant_appType_19']
        ]:
            # NOTE(review): these apps spawn no flow pool recall, so index 1 (if present) is an
            # extra recall here — confirm this branch is still reachable.
            data = {
                'rov_pool_recall': recall_result_list[0],
                'flow_pool_recall': recall_result_list[1][0],
            }
        else:
            data = {'rov_pool_recall': recall_result_list[0], 'flow_pool_recall': []}
    elif recall_result_list[1][0]:
        # The quick flow pool returned videos. A truthy value stored in Redis replaces
        # flow_pool_P; it is passed through float() below.
        quick_flow_pool_P = RedisHelper().get_data_from_redis(
            key_name=f"{config_.QUICK_FLOWPOOL_DISTRIBUTE_RATE_KEY_NAME_PREFIX}{config_.QUICK_FLOW_POOL_ID}"
        )
        if quick_flow_pool_P:
            flow_pool_P = quick_flow_pool_P
        data = {'rov_pool_recall': recall_result_list[0], 'flow_pool_recall': recall_result_list[1][0]}
        flow_pool_recall_process = recall_result_list[1][1].copy()
    else:
        # Fall back to the regular flow pool.
        data = {'rov_pool_recall': recall_result_list[0], 'flow_pool_recall': recall_result_list[2][0]}
        flow_pool_recall_process = recall_result_list[2][1]

    if env_dict:
        # Normalize missing/empty location codes to -1.
        province_code = client_info.get('provinceCode', -1)
        if province_code in (None, ""):
            province_code = -1
        city_code = client_info.get('cityCode', -1)
        if city_code in (None, ""):
            city_code = -1
        env_dict['mid'] = mid
        env_dict['province_code'] = province_code
        env_dict['city_code'] = city_code

    rank_result, flow_num, flow_pool_recall_process = video_new_rank3(
        data=data, size=size, top_K=top_K, flow_pool_P=float(flow_pool_P), rank_key_prefix=rank_key_prefix,
        flow_pool_recall_process=flow_pool_recall_process
    )
    if rank_result:
        result['rank_num'] = len(rank_result)

    if env_dict and rank_result:
        # Attach the ranked ids and the Redis batch lookups of their
        # 'v_ctr:<vid>' / 'v_hour_ctr:<vid>' keys to env_dict.
        rec_recall_list = [rec_item.get("videoId", 0) for rec_item in rank_result]
        redis_obj = RedisHelper()
        video_static_info = redis_obj.get_batch_key(["v_ctr:" + str(vid) for vid in rec_recall_list])
        video_hour_static_info = redis_obj.get_batch_key(["v_hour_ctr:" + str(vid) for vid in rec_recall_list])
        env_dict['recall_list'] = rec_recall_list
        env_dict['vid_day_fea_list'] = video_static_info or []
        env_dict['vid_hour_fea_list'] = video_hour_static_info or []

    result['rankResult'] = rank_result
    result['flow_num'] = flow_num
    result['rankTime'] = (time.time() - start_rank) * 1000
    # Attach the final ranking to the flow-pool recall-process log returned by video_new_rank3.
    flow_pool_recall_process['rank_result'] = rank_result
    result['flow_pool_recall_process'] = flow_pool_recall_process
    return result, env_json
def new_video_recommend(request_id, mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info,
                        expire_time=24*3600, ab_code=config_.AB_CODE['initial'], rule_key='', data_key='',
                        no_op_flag=False, old_video_index=-1, video_id=None, params=None, rule_key_30day=None,
                        shield_config=None):
    """
    Homepage online recommendation: concurrent recall -> de-duplication -> filtering -> ranking.

    :param request_id: request_id
    :param mid: mid type-string
    :param uid: uid type-string
    :param size: number of videos requested type-int
    :param top_K: number of leading slots guaranteed to come from the recall pool type-int
    :param flow_pool_P: probability that the remaining size-top_K slots are flow-pool videos type-float;
        replaced by the redis value under QUICK_FLOWPOOL_DISTRIBUTE_RATE_KEY_NAME_PREFIX + QUICK_FLOW_POOL_ID
        when that value is truthy
    :param app_type: product identifier type-int
    :param algo_type: algorithm type type-string (not read by this function)
    :param client_info: user location info; 'provinceCode' and 'cityCode' are read here
    :param expire_time: redis expiry for last-video records (not read by this function)
    :param ab_code: AB experiment code; 60048/60049 rank with video_new_rank, any other value with
        refactor_video_rank; 60049 also enables the similar-hot-item recall
    :param rule_key: forwarded to PoolRecall
    :param data_key: forwarded to PoolRecall
    :param no_op_flag: forwarded to PoolRecall
    :param old_video_index: not read by this function
    :param video_id: head video id for related recommendation, forwarded to PoolRecall
    :param params: forwarded to PoolRecall
    :param rule_key_30day: forwarded to PoolRecall
    :param shield_config: forwarded to PoolRecall and to the video filter
    :return: dict with 'rankResult' (list of video dicts), 'recallTime' and 'rankTime' (milliseconds)
    """
    result = {'rankResult': []}

    # ---- 1. recall ----
    start_recall = time.time()
    # Use the city code when the city has its own grouped data, otherwise fall back to the province code.
    city_code_list = list(config_.CITY_CODE.values())
    province_code = client_info.get('provinceCode', '-1')
    city_code = client_info.get('cityCode', '-1')
    region_code = city_code if city_code in city_code_list else province_code
    if region_code == '':
        region_code = '-1'

    pool_recall = PoolRecall(request_id=request_id,
                             app_type=app_type, mid=mid, uid=uid, ab_code=ab_code,
                             client_info=client_info, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
                             params=params, rule_key_30day=rule_key_30day, shield_config=shield_config,
                             video_id=video_id)
    # Recall sources shared by every app, run concurrently as greenlets.
    recall_jobs = [
        gevent.spawn(pool_recall.get_region_hour_recall, size, region_code),
        gevent.spawn(pool_recall.get_region_day_recall, size, region_code),
        gevent.spawn(pool_recall.get_selected_recall, size, region_code),
        gevent.spawn(pool_recall.get_no_selected_recall, size, region_code),
    ]
    if app_type not in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
        # Flow-pool recalls (quick flow pool, then the regular flow pool) are skipped for these two apps.
        recall_jobs.append(gevent.spawn(pool_recall.new_flow_pool_recall, size, config_.QUICK_FLOW_POOL_ID))
        recall_jobs.append(gevent.spawn(pool_recall.new_flow_pool_recall, size))
    if ab_code == 60049:
        recall_jobs.append(gevent.spawn(pool_recall.get_sim_hot_item_reall))
    gevent.joinall(recall_jobs)
    recall_outputs = [job.get() for job in recall_jobs]
    result['recallTime'] = (time.time() - start_recall) * 1000
    all_recall_result = [item for output in recall_outputs if output for item in output]

    # ---- 2. de-duplicate ----
    recall_dict = {}       # videoId -> comma-joined pushFrom of every recall source that returned it
    flowFlag_dict = {}     # videoId -> flowPool flag (the last occurrence wins)
    fast_flow_set = set()  # videoIds whose flow pool id equals QUICK_FLOW_POOL_ID
    flow_flow_set = set()  # videoIds from any other flow pool
    # First occurrence of each video, bucketed by recall source.
    region_h_recall = []
    region_day_recall = []
    select_day_recall = []
    no_selected_recall = []
    sim_hot_recall = []    # filled for ab_code 60049, which ranks via video_new_rank and ignores the buckets
    flow_recall = []
    skipped_count = 0
    for per_item in all_recall_result:
        try:
            vid = int(per_item.get("videoId", 0))
            if vid == 0:
                continue
            recall_name = per_item.get("pushFrom", '')
            flow_pool = per_item.get("flowPool", '')
            if flow_pool != '':
                # The part of flowPool before '#' is parsed as the flow pool id.
                flow_pool_id = int(flow_pool.split('#')[0])
                if flow_pool_id == config_.QUICK_FLOW_POOL_ID:
                    fast_flow_set.add(vid)
                else:
                    flow_flow_set.add(vid)
                flowFlag_dict[vid] = flow_pool
            if vid in recall_dict:
                # Already bucketed: only record the additional recall source.
                recall_dict[vid] = recall_dict[vid] + "," + recall_name
                continue
            if recall_name == config_.PUSH_FROM['rov_recall_region_h']:
                region_h_recall.append(per_item)
            elif recall_name == config_.PUSH_FROM['rov_recall_region_24h']:
                region_day_recall.append(per_item)
            elif recall_name == config_.PUSH_FROM['rov_recall_24h']:
                select_day_recall.append(per_item)
            elif recall_name == config_.PUSH_FROM['rov_recall_24h_dup']:
                no_selected_recall.append(per_item)
            elif recall_name == config_.PUSH_FROM['sim_hot_vid_recall']:
                sim_hot_recall.append(per_item)
            elif recall_name == config_.PUSH_FROM['flow_recall']:
                flow_recall.append(per_item)
            recall_dict[vid] = recall_name
        except Exception:
            # Malformed recall item: skip it. Catch Exception only, so GreenletExit / KeyboardInterrupt
            # (BaseException subclasses) still propagate.
            skipped_count += 1
    if skipped_count:
        log_.info({'tag': 'new_video_recommend skipped malformed recall items',
                   'request_id': request_id, 'count': skipped_count})

    # ---- 3. filter (pre-exposure etc.) ----
    filter_ = FilterVideos(request_id=request_id,
                           app_type=app_type, mid=mid, uid=uid, video_ids=list(recall_dict.keys()))
    all_recall_list = filter_.filter_videos_new(region_code=region_code, shield_config=shield_config,
                                                flow_set=flowFlag_dict.keys())

    # ---- 4. rank ----
    start_rank = time.time()
    redis_helper = RedisHelper()
    quick_flow_pool_P = redis_helper.get_data_from_redis(
        key_name=f"{config_.QUICK_FLOWPOOL_DISTRIBUTE_RATE_KEY_NAME_PREFIX}{config_.QUICK_FLOW_POOL_ID}"
    )
    if quick_flow_pool_P:
        flow_pool_P = quick_flow_pool_P
    rank_result = []
    if ab_code in (60048, 60049):
        rank_ids, add_flow_set = video_new_rank(videoIds=all_recall_list, fast_flow_set=fast_flow_set,
                                                flow_set=flow_flow_set, size=size, top_K=top_K,
                                                flow_pool_P=float(flow_pool_P))
        for rank_item in rank_ids:
            rank_id, rank_score = rank_item[0], rank_item[1]
            # Only videos present in add_flow_set (returned by video_new_rank) keep their flowPool flag.
            flow_pool_flag = flowFlag_dict.get(rank_id, '') if rank_id in add_flow_set else ''
            rank_result.append({'videoId': rank_id, 'flowPool': flow_pool_flag,
                                'rovScore': rank_score, 'pushFrom': recall_dict.get(rank_id, ''),
                                'abCode': ab_code})
    else:
        # NOTE(review): this path ranks the de-duplicated buckets directly; the filtered all_recall_list
        # computed above is not applied here. Confirm that refactor_video_rank (or a later stage) filters.
        all_dup_recall_result = (region_h_recall + region_day_recall + select_day_recall
                                 + no_selected_recall + flow_recall)
        rank_result = refactor_video_rank(rov_recall_rank=all_dup_recall_result, fast_flow_set=fast_flow_set,
                                          flow_set=flow_flow_set, size=size, top_K=top_K,
                                          flow_pool_P=float(flow_pool_P))
    result['rankResult'] = rank_result
    result['rankTime'] = (time.time() - start_rank) * 1000
    return result
def ab_test_op(rank_result, ab_code_list, app_type, mid, uid, **kwargs):
    """
    Apply AB-experiment-specific post-processing to an already ranked result.

    Only the 'position_insert' experiment is active. When config_.AB_CODE['position_insert'] is in
    ab_code_list and app_type is listed in config_.AB_TEST['position_insert'], the result is re-ranked by
    position via video_position_recommend and logged. (The former width/height-rate and
    relevant-video top-insert experiments are disabled.)

    :param rank_result: ranked result
    :param ab_code_list: AB experiment codes this request participates in
    :param app_type: product identifier
    :param mid: mid
    :param uid: uid
    :param kwargs: extra parameters (currently not read)
    :return: the (possibly re-ranked) rank_result
    """
    if config_.AB_CODE['position_insert'] in ab_code_list \
            and app_type in config_.AB_TEST.get('position_insert', []):
        rank_result = video_position_recommend(mid, uid, app_type, rank_result)
        # The result goes to the service log only; the former debug print() to stdout was removed.
        log_.info('app_type: {}, mid: {}, uid: {}, rank_by_position_insert_result: {}'.format(
            app_type, mid, uid, rank_result))
    return rank_result
def update_redis_data(result, app_type, mid, top_K, expire_time=24*3600, level_weight=None, flow_pool_abtest_group=None):
    """
    Refresh the redis bookkeeping that depends on the final ranking result.

    Best-effort: any exception is logged and swallowed, and the remaining steps are skipped.

    1. If mid is set (and not 'null'):
       - add every returned videoId to the user's pre-exposure (preview) set, 30-minute TTL;
       - for each recall source, store the last videoId among result[:top_K] that came from that source,
         so the next request can resume recall from that position.
    2. For apps other than LAO_HAO_KAN_VIDEO / ZUI_JING_QI: decrement the local distribute count of the
       flow-pool videos in result. The IN_FLOW_POOL_COUNT switch decides which items count as flow-pool videos.
    3. For apps other than APP: count one distribution for every rate-limited video in result.

    :param result: final ranked result, list of dicts with at least 'videoId' and 'pushFrom'
    :param app_type: product identifier
    :param mid: mid
    :param top_K: only result[:top_K] is used when recording the last video per recall source, type-int
    :param expire_time: TTL of the last-video records
    :param level_weight: flow pool level -> weight mapping, forwarded to the level-based count updaters
    :param flow_pool_abtest_group: flow pool AB group name; selects which local distribute count updater runs
    :return: None
    """
    # (name in config_.PUSH_FROM, name of the config_ attribute holding the last-video key prefix).
    # Attribute names are resolved lazily, only when a matching video exists.
    last_video_key_specs = (
        ('rov_recall_region_h', 'LAST_VIDEO_FROM_REGION_H_PREFIX'),
        ('rov_recall_h_h', 'LAST_VIDEO_FROM_REGION_DUP_H_PREFIX'),
        ('rov_recall_region_24h', 'LAST_VIDEO_FROM_REGION_DUP1_24H_PREFIX'),
        ('rov_recall_24h', 'LAST_VIDEO_FROM_REGION_DUP2_24H_PREFIX'),
        ('rov_recall_24h_dup', 'LAST_VIDEO_FROM_REGION_DUP3_24H_PREFIX'),
        ('rov_recall_h_h_without_dup', 'LAST_VIDEO_FROM_H_PREFIX'),
    )
    try:
        redis_helper = RedisHelper()
        if mid and mid != 'null':
            # Pre-exposure and last-video positioning are only tracked when mid is present.
            preview_key_name = f"{config_.PREVIEW_KEY_PREFIX}{app_type}:{mid}"
            preview_video_ids = [int(item['videoId']) for item in result]
            if preview_video_ids:
                redis_helper.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids),
                                               expire_time=30 * 60)
            top_items = result[:top_K]
            if top_items:
                for push_from_name, prefix_attr in last_video_key_specs:
                    push_from = config_.PUSH_FROM[push_from_name]
                    source_videos = [item['videoId'] for item in top_items if item['pushFrom'] == push_from]
                    if source_videos:
                        last_video_key = f'{getattr(config_, prefix_attr)}{app_type}:{mid}'
                        redis_helper.set_data_to_redis(key_name=last_video_key, value=source_videos[-1],
                                                       expire_time=expire_time)

        if app_type not in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
            # Switch == 1: every item that carries a 'flowPool' key counts as a flow-pool video;
            # otherwise (switch missing or != 1) only items whose pushFrom is the flow recall count.
            # NOTE(review): items with flowPool == '' also pass the switch == 1 test; confirm that is intended.
            switch = redis_helper.get_data_from_redis(key_name=config_.IN_FLOW_POOL_COUNT_SWITCH_KEY_NAME)
            if switch is not None and int(switch) == 1:
                flow_recall_video = [item for item in result if item.get('flowPool', None) is not None]
            else:
                flow_recall_video = [item for item in result
                                     if item['pushFrom'] == config_.PUSH_FROM['flow_recall']]
            if flow_recall_video:
                if flow_pool_abtest_group == 'experimental_flow_set_level':
                    update_local_distribute_count_new_with_level(flow_recall_video, level_weight)
                elif flow_pool_abtest_group == 'experimental_flow_set_level_score':
                    update_local_distribute_count_new_with_level_score(flow_recall_video, level_weight)
                else:
                    update_local_distribute_count(flow_recall_video)

        if app_type == config_.APP_TYPE['APP']:
            # Rate-limited video distributions are not counted for APP.
            return
        limit_video_id_list = redis_helper.get_data_from_set(
            key_name=f"{config_.KEY_NAME_PREFIX_LIMIT_VIDEO_SET}{datetime.today().strftime('%Y%m%d')}"
        )
        if limit_video_id_list is not None:
            # Set for O(1) membership tests instead of scanning a list for every result item.
            limit_video_ids = {int(item) for item in limit_video_id_list}
            for item in result:
                video_id = item['videoId']
                if video_id in limit_video_ids:
                    key_name = f"{config_.KEY_NAME_PREFIX_LIMIT_VIDEO_DISTRIBUTE_COUNT}{video_id}"
                    # Seed the counter with 0 (setnx presumably only writes when absent), then add 1.
                    # Both use a 24h TTL; the seed TTL was previously the typo 24*2600.
                    redis_helper.setnx_key(key_name=key_name, value=0, expire_time=24 * 3600)
                    redis_helper.incr_key(key_name=key_name, amount=1, expire_time=24 * 3600)
    except Exception:
        log_.error("update redis data fail!")
        log_.error(traceback.format_exc())
def update_flow_redis_data(result, app_type, mid, top_K, expire_time=24*3600):
    """
    Refresh redis bookkeeping after a ranking result has been produced (flow-pool variant).

    Unlike update_redis_data, no per-recall-source last-video positions are recorded, and the local
    distribute count is always updated with update_local_distribute_count.

    Best-effort: any exception is logged and swallowed, and the remaining steps are skipped.

    :param result: final ranked result, list of dicts with at least 'videoId' and 'pushFrom'
    :param app_type: product identifier
    :param mid: mid
    :param top_K: not read by this function (kept for interface compatibility)
    :param expire_time: not read by this function (kept for interface compatibility)
    :return: None
    """
    try:
        redis_helper = RedisHelper()
        if mid and mid != 'null':
            # Pre-exposure is only tracked when mid is present; 30-minute TTL.
            preview_key_name = f"{config_.PREVIEW_KEY_PREFIX}{app_type}:{mid}"
            preview_video_ids = [int(item['videoId']) for item in result]
            if preview_video_ids:
                redis_helper.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids),
                                               expire_time=30 * 60)

        if app_type not in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
            # Switch == 1: every item that carries a 'flowPool' key counts as a flow-pool video;
            # otherwise (switch missing or != 1) only items whose pushFrom is the flow recall count.
            switch = redis_helper.get_data_from_redis(key_name=config_.IN_FLOW_POOL_COUNT_SWITCH_KEY_NAME)
            if switch is not None and int(switch) == 1:
                flow_recall_video = [item for item in result if item.get('flowPool', None) is not None]
            else:
                flow_recall_video = [item for item in result
                                     if item['pushFrom'] == config_.PUSH_FROM['flow_recall']]
            if flow_recall_video:
                update_local_distribute_count(flow_recall_video)

        if app_type == config_.APP_TYPE['APP']:
            # Rate-limited video distributions are not counted for APP.
            return
        limit_video_id_list = redis_helper.get_data_from_set(
            key_name=f"{config_.KEY_NAME_PREFIX_LIMIT_VIDEO_SET}{datetime.today().strftime('%Y%m%d')}"
        )
        if limit_video_id_list is not None:
            # Set for O(1) membership tests instead of scanning a list for every result item.
            limit_video_ids = {int(item) for item in limit_video_id_list}
            for item in result:
                video_id = item['videoId']
                if video_id in limit_video_ids:
                    key_name = f"{config_.KEY_NAME_PREFIX_LIMIT_VIDEO_DISTRIBUTE_COUNT}{video_id}"
                    # Seed the counter with 0 (setnx presumably only writes when absent), then add 1.
                    # Both use a 24h TTL; the seed TTL was previously the typo 24*2600.
                    redis_helper.setnx_key(key_name=key_name, value=0, expire_time=24 * 3600)
                    redis_helper.incr_key(key_name=key_name, amount=1, expire_time=24 * 3600)
    except Exception:
        log_.error("update redis data fail!")
        log_.error(traceback.format_exc())
def update_local_distribute_count(videos):
    """
    Decrement each flow-pool video's local distribute count. When the count reaches zero or below, remove
    the video from every app's flow-pool zsets and from the per-video flow-pool info sets.

    Best-effort: on any exception the error is logged and the remaining videos are not processed.

    :param videos: list of dicts; only 'videoId' and 'flowPool' are read, e.g.
        [{'videoId': '', 'flowPool': '', 'distributeCount': '', 'rovScore': '', 'pushFrom': 'flow_pool',
          'abCode': ...}, ...]
    :return: None
    """
    try:
        redis_helper = RedisHelper()
        for video in videos:
            vid = video['videoId']
            pool = video['flowPool']
            count_key = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{vid}:{pool}"
            redis_helper.decr_key(key_name=count_key, amount=1, expire_time=15 * 60)
            remaining = redis_helper.get_data_from_redis(key_name=count_key)
            # No local record, or quota not exhausted yet: nothing to clean up.
            if remaining is None or int(remaining) > 0:
                continue
            redis_helper.del_keys(key_name=count_key)
            removed_any = False
            member = f"{vid}-{pool}"
            for app_type in config_.APP_TYPE.values():
                pool_keys = (
                    f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{app_type}",
                    f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type}:{config_.QUICK_FLOW_POOL_ID}",
                )
                for pool_key in pool_keys:
                    if redis_helper.remove_value_from_zset(key_name=pool_key, value=member) > 0:
                        removed_any = True
                info_keys = (
                    f"{config_.QUICK_FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{app_type}:{config_.QUICK_FLOW_POOL_ID}:{vid}",
                    f"{config_.FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{app_type}:{vid}",
                )
                for info_key in info_keys:
                    redis_helper.remove_value_from_set(key_name=info_key, values=(pool, ))
            if removed_any:
                log_.info({'tag': 'remove video_id from flow_pool', 'video_id': vid, 'flow_pool': pool})
    except Exception:
        log_.error('update_local_distribute_count error...')
        log_.error(traceback.format_exc())
def update_local_distribute_count_new(videos):
    """
    Set-based variant of update_local_distribute_count.

    Decrements each video's local distribute count. When the count reaches zero or below, the
    "<videoId>-<flowPool>" member is removed from every app's flow-pool *sets*
    (FLOWPOOL_KEY_NAME_PREFIX_SET / QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET), and the flow pool is removed from the
    per-video info sets.

    Best-effort: on any exception the error is logged and the remaining videos are not processed.

    :param videos: list of dicts; only 'videoId' and 'flowPool' are read
    :return: None
    """
    try:
        redis_helper = RedisHelper()
        for entry in videos:
            video_id, flow_pool = entry['videoId'], entry['flowPool']
            local_key = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}"
            redis_helper.decr_key(key_name=local_key, amount=1, expire_time=15 * 60)
            count_after = redis_helper.get_data_from_redis(key_name=local_key)
            if count_after is None:
                # No local record for this video/flow pool.
                continue
            if int(count_after) > 0:
                continue
            log_removal = False
            redis_helper.del_keys(key_name=local_key)
            for app in config_.APP_TYPE.values():
                set_keys = [
                    f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET}{app}",
                    f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{app}:{config_.QUICK_FLOW_POOL_ID}",
                ]
                for set_key in set_keys:
                    removed = redis_helper.remove_value_from_set(key_name=set_key,
                                                                 values=(f"{video_id}-{flow_pool}", ))
                    if removed > 0:
                        log_removal = True
                video_info_keys = [
                    f"{config_.QUICK_FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{app}:{config_.QUICK_FLOW_POOL_ID}:{video_id}",
                    f"{config_.FLOWPOOL_VIDEO_INFO_KEY_NAME_PREFIX}{app}:{video_id}",
                ]
                for video_info_key in video_info_keys:
                    redis_helper.remove_value_from_set(key_name=video_info_key, values=(flow_pool, ))
            if log_removal:
                log_.info({'tag': 'remove video_id from flow_pool', 'video_id': video_id, 'flow_pool': flow_pool})
    except Exception:
        log_.error('update_local_distribute_count error...')
        log_.error(traceback.format_exc())
def update_local_distribute_count_new_with_level(videos, level_weight):
    """
    Level-based, set-backed variant of update_local_distribute_count.

    Decrements each video's local distribute count. When the count reaches zero or below, the
    "<videoId>-<flowPool>" member is removed from every app's quick flow-pool set and from every app's
    per-level flow-pool sets (FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL). The per-video info sets are not touched.

    Best-effort: on any exception the error is logged and the remaining videos are not processed.

    :param videos: list of dicts; only 'videoId' and 'flowPool' are read
    :param level_weight: mapping whose keys are used as the flow pool level names;
        None means levels '1'..'6'
    :return: None
    """
    try:
        redis_helper = RedisHelper()
        default_weights = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
        levels = list(default_weights if level_weight is None else level_weight)
        for record in videos:
            vid, pool = record['videoId'], record['flowPool']
            counter_key = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{vid}:{pool}"
            redis_helper.decr_key(key_name=counter_key, amount=1, expire_time=15 * 60)
            left = redis_helper.get_data_from_redis(key_name=counter_key)
            # No local record, or quota not exhausted yet: nothing to clean up.
            if left is None or int(left) > 0:
                continue
            did_remove = False
            redis_helper.del_keys(key_name=counter_key)
            for app_type in config_.APP_TYPE.values():
                target_keys = [f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{app_type}:{config_.QUICK_FLOW_POOL_ID}"]
                target_keys.extend(f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL}{app_type}:{level}"
                                   for level in levels)
                for target_key in target_keys:
                    if redis_helper.remove_value_from_set(key_name=target_key, values=(f"{vid}-{pool}", )) > 0:
                        did_remove = True
            if did_remove:
                log_.info({'tag': 'remove video_id from flow_pool', 'video_id': vid, 'flow_pool': pool})
    except Exception:
        log_.error('update_local_distribute_count error...')
        log_.error(traceback.format_exc())
def update_local_distribute_count_new_with_level_score(videos, level_weight):
    """
    Level-based, zset-backed variant of update_local_distribute_count.

    Decrements each video's local distribute count. When the count reaches zero or below, the
    "<videoId>-<flowPool>" member is removed from every app's quick flow-pool zset
    (QUICK_FLOWPOOL_KEY_NAME_PREFIX) and from every app's per-level score zsets
    (FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE). The per-video info sets are not touched.

    NOTE(review): the sibling _with_level variant removes from the QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET *set*,
    whereas this one uses the QUICK_FLOWPOOL_KEY_NAME_PREFIX *zset*; confirm both match how the pools are written.

    Best-effort: on any exception the error is logged and the remaining videos are not processed.

    :param videos: list of dicts; only 'videoId' and 'flowPool' are read
    :param level_weight: mapping whose keys are used as the flow pool level names;
        None means levels '1'..'6'
    :return: None
    """
    try:
        redis_helper = RedisHelper()
        if level_weight is None:
            level_weight = {'1': 1, '2': 1, '3': 1, '4': 1, '5': 1, '6': 1}
        levels = list(level_weight)
        for video in videos:
            video_id = video['videoId']
            flow_pool = video['flowPool']
            count_key = f"{config_.LOCAL_DISTRIBUTE_COUNT_PREFIX}{video_id}:{flow_pool}"
            redis_helper.decr_key(key_name=count_key, amount=1, expire_time=15 * 60)
            current = redis_helper.get_data_from_redis(key_name=count_key)
            if current is None:
                # No local record for this video/flow pool.
                continue
            if int(current) > 0:
                continue
            any_removed = False
            redis_helper.del_keys(key_name=count_key)
            zset_member = f"{video_id}-{flow_pool}"
            for app_type in config_.APP_TYPE.values():
                zset_keys = [f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type}:{config_.QUICK_FLOW_POOL_ID}"]
                zset_keys += [f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE}{app_type}:{level}"
                              for level in levels]
                for zset_key in zset_keys:
                    if redis_helper.remove_value_from_zset(key_name=zset_key, value=zset_member) > 0:
                        any_removed = True
            if any_removed:
                log_.info({'tag': 'remove video_id from flow_pool', 'video_id': video_id, 'flow_pool': flow_pool})
    except Exception:
        log_.error('update_local_distribute_count error...')
        log_.error(traceback.format_exc())
def get_religion_class_with_mid(mid, religion_class_name):
    """
    Tell whether the user identified by mid belongs to the given religion group.

    Membership is looked up in a redis set whose key is
    "<prefix><last hex char of md5(mid)>:<YYYYMMDD>" with today's date. If no key exists for today,
    yesterday's key is used instead.

    :param mid: mid type-string
    :param religion_class_name: religion type, type-string (catholicism, christianity)
    :return: type-int, 1 if mid is in the group's set, otherwise 0
        (also 0 for an empty mid or a religion class with no configured key prefix)
    """
    today = datetime.today()
    redis_helper = RedisHelper()
    if not mid:
        return 0
    shard_tag = hashlib.md5(mid.encode('utf-8')).hexdigest()[-1:]
    prefix = config_.KEY_NAME_PREFIX_RELIGION_USER.get(religion_class_name, None)
    if prefix is None:
        return 0
    key_name = f"{prefix}{shard_tag}:{today.strftime('%Y%m%d')}"
    if not redis_helper.key_exists(key_name=key_name):
        # Today's key does not exist: fall back to yesterday's set.
        key_name = f"{prefix}{shard_tag}:{(today - timedelta(days=1)).strftime('%Y%m%d')}"
    return 1 if redis_helper.data_exists_with_set(key_name=key_name, value=mid) else 0
- def get_recommend_params(recommend_type, ab_exp_info, ab_info_data, mid, app_type, page_type=0,
- flow_pool_abtest_config=None):
- """
- 根据实验分组给定对应的推荐参数
- :param recommend_type: 首页推荐和相关推荐区分参数(0-首页推荐,1-相关推荐)
- :param ab_exp_info: AB实验组参数
- :param ab_info_data: app实验组参数
- :param mid: mid
- :param app_type: app_type, type-int
- :param page_type: 页面区分参数,默认:0(首页)
- :param flow_pool_abtest_config: 流量池abtest配置
- :return:
- """
- top_K = config_.K
- flow_pool_P = config_.P
- # 不获取人工干预数据标记
- no_op_flag = True
- expire_time = 3600
- old_video_index = -1
- # 获取对应的默认配置
- ab_initial_config = config_.INITIAL_CONFIG.get(app_type, None)
- if ab_initial_config is None:
- ab_initial_config = config_.INITIAL_CONFIG.get('other')
- param = config_.AB_EXP_CODE[ab_initial_config]
- ab_code = param.get('ab_code')
- rule_key = param.get('rule_key')
- data_key = param.get('data_key')
- rule_key_30day = param.get('30day_rule_key')
- shield_config = config_.SHIELD_CONFIG
- rank_key_prefix = 'rank:score1:'
- h_rule_key = param.get('h_rule_key')
- h_data_key = param.get('h_data_key')
- # 默认使用 095 实验的配置
- # ab_code = config_.AB_EXP_CODE['095'].get('ab_code')
- # rule_key = config_.AB_EXP_CODE['095'].get('rule_key')
- # data_key = config_.AB_EXP_CODE['095'].get('data_key')
- # rule_key_30day = None
- # 获取用户近30天是否有回流
- # user_30day_return_result = get_user_has30day_return(mid=mid)
- # 获取实验配置
- if ab_exp_info:
- ab_exp_code_list = []
- config_value_dict = {}
- for _, item in ab_exp_info.items():
- if not item:
- continue
- for ab_item in item:
- ab_exp_code = ab_item.get('abExpCode', None)
- if not ab_exp_code:
- continue
- ab_exp_code_list.append(str(ab_exp_code))
- config_value_dict[str(ab_exp_code)] = ab_item.get('configValue', None)
- # 流量池视频分发概率实验
- if '211' in ab_exp_code_list:
- flow_pool_P = 0.9
- elif '221' in ab_exp_code_list:
- flow_pool_P = 0.7
- elif '299' in ab_exp_code_list:
- flow_pool_P = 0.5
- elif '300' in ab_exp_code_list:
- flow_pool_P = 0.4
- elif '301' in ab_exp_code_list:
- flow_pool_P = 0.6
- elif '339' in ab_exp_code_list:
- flow_pool_P = 0
- # if '136' in ab_exp_code_list:
- # # 无回流 - 消费人群
- # if user_30day_return_result == 0:
- # param = config_.AB_EXP_CODE.get('136')
- # ab_code = param.get('ab_code')
- # expire_time = 3600
- # rule_key = param.get('rule_key')
- # data_key = param.get('data_key')
- # no_op_flag = True
- # elif '137' in ab_exp_code_list:
- # # 有回流 - 分享人群
- # if user_30day_return_result == 1:
- # param = config_.AB_EXP_CODE.get('137')
- # ab_code = param.get('ab_code')
- # expire_time = 3600
- # rule_key = param.get('rule_key')
- # data_key = param.get('data_key')
- # no_op_flag = True
- # elif '161' in ab_exp_code_list:
- # # 无回流 - 消费人群
- # if user_30day_return_result == 0:
- # param = config_.AB_EXP_CODE.get('136')
- # ab_code = param.get('ab_code')
- # expire_time = 3600
- # rule_key = param.get('rule_key')
- # data_key = param.get('data_key')
- # no_op_flag = True
- # # 有回流 - 分享人群
- # else:
- # param = config_.AB_EXP_CODE.get('137')
- # ab_code = param.get('ab_code')
- # expire_time = 3600
- # rule_key = param.get('rule_key')
- # data_key = param.get('data_key')
- # no_op_flag = True
- # elif '162' in ab_exp_code_list:
- # # 有回流
- # if user_30day_return_result == 1:
- # param = config_.AB_EXP_CODE.get('162')
- # ab_code = param.get('ab_code')
- # expire_time = 3600
- # rule_key = param.get('rule_key')
- # data_key = param.get('data_key')
- # no_op_flag = True
- # 老好看视频 宗教人群实验
- # if '228' in ab_exp_code_list:
- # # 天主教
- # religion_param = config_.AB_EXP_CODE['228']
- # religion_class_name = religion_param.get('religion_class_name')
- # religion_class_flag = get_religion_class_with_mid(mid=mid, religion_class_name=religion_class_name)
- # if religion_class_flag == 1:
- # ab_code = religion_param.get('ab_code')
- # rule_key = religion_param.get('rule_key')
- # data_key = religion_param.get('data_key')
- # rule_key_30day = religion_param.get('30day_rule_key')
- # elif '229' in ab_exp_code_list:
- # # 基督教
- # religion_param = config_.AB_EXP_CODE['229']
- # religion_class_name = religion_param.get('religion_class_name')
- # religion_class_flag = get_religion_class_with_mid(mid=mid, religion_class_name=religion_class_name)
- # if religion_class_flag == 1:
- # ab_code = religion_param.get('ab_code')
- # rule_key = religion_param.get('rule_key')
- # data_key = religion_param.get('data_key')
- # rule_key_30day = religion_param.get('30day_rule_key')
-
- for code, param in config_.AB_EXP_CODE.items():
- if code in ab_exp_code_list:
- ab_code = param.get('ab_code')
- rule_key = param.get('rule_key')
- data_key = param.get('data_key')
- rule_key_30day = param.get('30day_rule_key')
- shield_config = param.get('shield_config', config_.SHIELD_CONFIG)
- rank_key_prefix = param.get('rank_key_prefix', 'rank:score1:')
- h_rule_key = param.get('h_rule_key')
- h_data_key = param.get('h_data_key')
- break
- """
- # 推荐条数 10->4 实验
- # if config_.AB_EXP_CODE['rec_size_home'] in ab_exp_code_list:
- # config_value = config_value_dict.get(config_.AB_EXP_CODE['rec_size_home'], None)
- # if config_value:
- # config_value = eval(str(config_value))
- # else:
- # config_value = {}
- # log_.info(f'config_value: {config_value}, type: {type(config_value)}')
- # size = int(config_value.get('size', 4))
- # top_K = int(config_value.get('K', 3))
- # flow_pool_P = float(config_value.get('P', 0.3))
- # else:
- # size = size
- # top_K = config_.K
- # flow_pool_P = config_.P
-
- # 算法实验相对对照组
- # if config_.AB_EXP_CODE['ab_initial'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['ab_initial']
- # expire_time = 24 * 3600
- # rule_key = config_.RULE_KEY['initial']
- # no_op_flag = True
-
- # 小时级更新-规则1 实验
- # elif config_.AB_EXP_CODE['rule_rank1'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['rank_by_h'].get('rule_rank1')
- # expire_time = 3600
- # rule_key = config_.RULE_KEY['rule_rank1']
- # no_op_flag = True
-
- # elif config_.AB_EXP_CODE['rule_rank2'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['rank_by_h'].get('rule_rank2')
- # expire_time = 3600
- # rule_key = config_.RULE_KEY['rule_rank2']
-
- # elif config_.AB_EXP_CODE['rule_rank3'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['rank_by_h'].get('rule_rank3')
- # expire_time = 3600
- # rule_key = config_.RULE_KEY['rule_rank3']
- # no_op_flag = True
-
- # elif config_.AB_EXP_CODE['rule_rank4'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['rank_by_h'].get('rule_rank4')
- # expire_time = 3600
- # rule_key = config_.RULE_KEY['rule_rank4']
-
- # elif config_.AB_EXP_CODE['rule_rank5'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['rank_by_h'].get('rule_rank5')
- # expire_time = 3600
- # rule_key = config_.RULE_KEY['rule_rank5']
-
- # elif config_.AB_EXP_CODE['day_rule_rank1'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['rank_by_day'].get('day_rule_rank1')
- # expire_time = 24 * 3600
- # rule_key = config_.RULE_KEY_DAY['day_rule_rank1']
- # no_op_flag = True
-
- # if config_.AB_EXP_CODE['rule_rank6'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['rank_by_h'].get('rule_rank6')
- # expire_time = 3600
- # rule_key = config_.RULE_KEY['rule_rank6']
- # no_op_flag = True
-
- # elif config_.AB_EXP_CODE['day_rule_rank2'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['rank_by_day'].get('day_rule_rank2')
- # expire_time = 24 * 3600
- # rule_key = config_.RULE_KEY_DAY['day_rule_rank2']
- # no_op_flag = True
-
- # elif config_.AB_EXP_CODE['region_rule_rank1'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank1')
- # expire_time = 3600
- # rule_key = config_.RULE_KEY_REGION['region_rule_rank1']
- # no_op_flag = True
-
- # elif config_.AB_EXP_CODE['24h_rule_rank1'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['rank_by_24h'].get('24h_rule_rank1')
- # expire_time = 3600
- # rule_key = config_.RULE_KEY_24H['24h_rule_rank1']
- # no_op_flag = True
-
- # elif config_.AB_EXP_CODE['24h_rule_rank2'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['rank_by_24h'].get('24h_rule_rank2')
- # expire_time = 3600
- # rule_key = config_.RULE_KEY_24H['24h_rule_rank2']
- # no_op_flag = True
-
- # elif config_.AB_EXP_CODE['region_rule_rank2'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank2')
- # expire_time = 3600
- # rule_key = config_.RULE_KEY_REGION['region_rule_rank2']
- # no_op_flag = True
-
- # if config_.AB_EXP_CODE['region_rule_rank3'] in ab_exp_code_list or\
- # config_.AB_EXP_CODE['region_rule_rank3_appType_19'] in ab_exp_code_list or\
- # config_.AB_EXP_CODE['region_rule_rank3_appType_4'] in ab_exp_code_list or\
- # config_.AB_EXP_CODE['region_rule_rank3_appType_6'] in ab_exp_code_list or\
- # config_.AB_EXP_CODE['region_rule_rank3_appType_18'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank3')
- # expire_time = 3600
- # rule_key = config_.RULE_KEY_REGION['region_rule_rank3'].get('rule_key')
- # data_key = config_.RULE_KEY_REGION['region_rule_rank3'].get('data_key')
- # no_op_flag = True
-
- # if config_.AB_EXP_CODE['region_rule_rank4'] in ab_exp_code_list or\
- if config_.AB_EXP_CODE['region_rule_rank4_appType_19'] in ab_exp_code_list or \
- config_.AB_EXP_CODE['region_rule_rank4_appType_4'] in ab_exp_code_list or\
- config_.AB_EXP_CODE['region_rule_rank4_appType_6'] in ab_exp_code_list or\
- config_.AB_EXP_CODE['region_rule_rank4_appType_18'] in ab_exp_code_list:
- ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4')
- expire_time = 3600
- rule_key = config_.RULE_KEY_REGION['region_rule_rank4'].get('rule_key')
- data_key = config_.RULE_KEY_REGION['region_rule_rank4'].get('data_key')
- no_op_flag = True
-
- # elif config_.AB_EXP_CODE['region_rule_rank4_appType_5_data1'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4')
- # expire_time = 3600
- # rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_5_data1'].get('rule_key')
- # data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_5_data1'].get('data_key')
- # no_op_flag = True
-
- # elif config_.AB_EXP_CODE['region_rule_rank3_appType_5_data2'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank3_appType_5_data2')
- # expire_time = 3600
- # rule_key = config_.RULE_KEY_REGION['region_rule_rank3_appType_5_data2'].get('rule_key')
- # data_key = config_.RULE_KEY_REGION['region_rule_rank3_appType_5_data2'].get('data_key')
- # no_op_flag = True
-
- elif config_.AB_EXP_CODE['region_rule_rank4_appType_5_data3'] in ab_exp_code_list:
- ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_5_data3')
- expire_time = 3600
- rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_5_data3'].get('rule_key')
- data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_5_data3'].get('data_key')
- no_op_flag = True
-
- elif config_.AB_EXP_CODE['region_rule_rank4_appType_5_data4'] in ab_exp_code_list:
- ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_5_data4')
- expire_time = 3600
- rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_5_data4'].get('rule_key')
- data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_5_data4'].get('data_key')
- no_op_flag = True
-
- elif config_.AB_EXP_CODE['region_rule_rank4_appType_0_data2'] in ab_exp_code_list:
- ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_0_data2')
- expire_time = 3600
- rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_0_data2'].get('rule_key')
- data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_0_data2'].get('data_key')
- no_op_flag = True
-
- # elif config_.AB_EXP_CODE['region_rule_rank4_appType_19_data2'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_19_data2')
- # expire_time = 3600
- # rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_19_data2'].get('rule_key')
- # data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_19_data2'].get('data_key')
- # no_op_flag = True
-
- # elif config_.AB_EXP_CODE['region_rule_rank4_appType_19_data3'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_19_data3')
- # expire_time = 3600
- # rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_19_data3'].get('rule_key')
- # data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_19_data3'].get('data_key')
- # no_op_flag = True
-
- elif config_.AB_EXP_CODE['region_rule_rank5_appType_0_data1'] in ab_exp_code_list:
- ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank5_appType_0_data1')
- expire_time = 3600
- rule_key = config_.RULE_KEY_REGION['region_rule_rank5_appType_0_data1'].get('rule_key')
- data_key = config_.RULE_KEY_REGION['region_rule_rank5_appType_0_data1'].get('data_key')
- no_op_flag = True
-
- elif config_.AB_EXP_CODE['region_rule_rank4_appType_4_data2'] in ab_exp_code_list:
- ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_4_data2')
- expire_time = 3600
- rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_4_data2'].get('rule_key')
- data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_4_data2'].get('data_key')
- no_op_flag = True
-
- elif config_.AB_EXP_CODE['region_rule_rank4_appType_4_data3'] in ab_exp_code_list:
- ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_4_data3')
- expire_time = 3600
- rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_4_data3'].get('rule_key')
- data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_4_data3'].get('data_key')
- no_op_flag = True
-
- elif config_.AB_EXP_CODE['region_rule_rank4_appType_6_data2'] in ab_exp_code_list:
- ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_6_data2')
- expire_time = 3600
- rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_6_data2'].get('rule_key')
- data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_6_data2'].get('data_key')
- no_op_flag = True
-
- elif config_.AB_EXP_CODE['region_rule_rank4_appType_6_data3'] in ab_exp_code_list:
- ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_6_data3')
- expire_time = 3600
- rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_6_data3'].get('rule_key')
- data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_6_data3'].get('data_key')
- no_op_flag = True
-
- # elif config_.AB_EXP_CODE['region_rule_rank4_appType_18_data2'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4_appType_18_data2')
- # expire_time = 3600
- # rule_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_18_data2'].get('rule_key')
- # data_key = config_.RULE_KEY_REGION['region_rule_rank4_appType_18_data2'].get('data_key')
- # no_op_flag = True
-
- # elif config_.AB_EXP_CODE['region_rule_rank6_appType_0_data1'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank6_appType_0_data1')
- # expire_time = 3600
- # rule_key = config_.RULE_KEY_REGION['region_rule_rank6_appType_0_data1'].get('rule_key')
- # data_key = config_.RULE_KEY_REGION['region_rule_rank6_appType_0_data1'].get('data_key')
- # no_op_flag = True
-
- else:
- ab_code = config_.AB_CODE['initial']
- expire_time = 24 * 3600
- rule_key = config_.RULE_KEY_REGION['initial'].get('rule_key')
- data_key = config_.RULE_KEY_REGION['initial'].get('data_key')
-
- # # 老好看视频 / 票圈最惊奇 首页/相关推荐逻辑更新实验
- # if config_.AB_EXP_CODE['rov_rank_appType_18_19'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['rov_rank_appType_18_19']
- # expire_time = 3600
- # flow_pool_P = config_.P_18_19
- # no_op_flag = True
- #
- # elif config_.AB_EXP_CODE['rov_rank_appType_19'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['rov_rank_appType_19']
- # expire_time = 3600
- # top_K = 0
- # flow_pool_P = config_.P_18_19
- # no_op_flag = True
- #
- # elif config_.AB_EXP_CODE['top_video_relevant_appType_19'] in ab_exp_code_list and page_type == 2:
- # ab_code = config_.AB_CODE['top_video_relevant_appType_19']
- # expire_time = 3600
- # top_K = 1
- # flow_pool_P = config_.P_18_19
- # no_op_flag = True
- #
- # # 票圈最惊奇完整影视资源实验
- # elif config_.AB_EXP_CODE['whole_movies'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['whole_movies']
- # expire_time = 24 * 3600
- # no_op_flag = True
-
- # 老视频实验
- # if config_.AB_EXP_CODE['old_video'] in ab_exp_code_list:
- # ab_code = config_.AB_CODE['old_video']
- # no_op_flag = True
- # old_video_index = 2
- # else:
- # old_video_index = -1
- """
- # APP实验组
- if ab_info_data:
- ab_info_app = {}
- for page_code, item in json.loads(ab_info_data).items():
- if not item:
- continue
- ab_info_code = item.get('eventId', None)
- if ab_info_code:
- ab_info_app[page_code] = ab_info_code
- # print(f"======{ab_info_app}")
- # 首页推荐
- if recommend_type == 0:
- app_ab_code = ab_info_app.get('10003', None)
- for code, param in config_.APP_AB_CODE['10003'].items():
- if code == app_ab_code:
- ab_code = param.get('ab_code')
- rule_key = param.get('rule_key')
- data_key = param.get('data_key')
- break
- # # 相关推荐
- # elif recommend_type == 1:
- # if config_.APP_AB_CODE['10037'] == ab_info_app.get('10037', None):
- # ab_code = config_.AB_CODE['region_rank_by_h'].get('region_rule_rank4')
- # expire_time = 3600
- # rule_key = 'rule3'
- # data_key = 'data1'
- # no_op_flag = True
- # 流量池分发实验组划分
- # 1. 随机选取流量池id
- flow_pool_id_choice = random.choice(config_.FLOWPOOL_ID_LIST)
- # 2. 判断流量id所属实验配置分组
- flow_pool_abtest_group = 'control_group'
- for key, items in flow_pool_abtest_config.items():
- if int(flow_pool_id_choice) in items:
- flow_pool_abtest_group = key
- # log_.info(f"flow_pool_id_choice: {flow_pool_id_choice}, flow_pool_abtest_group: {flow_pool_abtest_group}")
- return top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time, no_op_flag, old_video_index, rule_key_30day, \
- shield_config, flow_pool_abtest_group, rank_key_prefix, h_data_key, h_rule_key
def video_homepage_recommend(request_id, mid, uid, size, app_type, algo_type,
                             client_info, ab_exp_info, params, ab_info_data, version_audit_status, env_dict,
                             level_weight, flow_pool_abtest_config):
    """
    Homepage online recommendation entry point.

    Special mids and mini-program builds under review are served from the special pool.
    Every other request resolves its experiment parameters via get_recommend_params and is
    routed to video_old_recommend or video_recommend depending on the resulting ab_code;
    the served videos are then written back through update_redis_data.

    :param request_id: request_id
    :param mid: mid type-string
    :param uid: uid type-string
    :param size: number of videos requested type-int
    :param app_type: product identifier type-int
    :param algo_type: algorithm type type-string (forwarded to video_recommend only;
        the video_old_recommend path always receives algo_type='')
    :param client_info: user location info {"country": ..., "province": ..., "city": ...}
    :param ab_exp_info: ab experiment group params [{"expItemId":1, "configValue":{"size":4, "K":3, ...}}, ...]
    :param params: forwarded unchanged to the recommendation pipeline
    :param ab_info_data: app experiment group params, forwarded to get_recommend_params
    :param version_audit_status: mini-program version audit status: 1 - under review, 2 - approved
    :param env_dict: stored as result['fea_info']; overwritten by the fea_info returned by
        video_old_recommend on that path
    :param level_weight: forwarded to the pipeline and to update_redis_data
    :param flow_pool_abtest_config: forwarded to get_recommend_params
    :return: dict that always contains 'fea_info' and 'videos'. On the normal path it also
        contains 'recommendOperation' (raw pipeline result) and the stage timings
        'getRecommendParamsTime', 'getRecommendResultTime' and 'updateRedisDataTime' in
        milliseconds; each video there carries a 1-based 'position'.
    """
    # ab_codes whose traffic is served by video_old_recommend: 60054, 60056 and 60066-60097.
    # Single source of truth instead of a long `ab_code == ... or ...` chain.
    old_pipeline_ab_codes = (60054, 60056) + tuple(range(60066, 60098))

    recommend_result = {}
    param_st = time.time()
    recommend_result['fea_info'] = env_dict

    # Special mids and mini-program builds under review get the special-pool list.
    if mid in get_special_mid_list() or version_audit_status == 1:
        rank_result = special_mid_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size)
        recommend_result['videos'] = rank_result
        return recommend_result

    # Normal mid: resolve experiment parameters (recommend_type=0 -> homepage).
    (top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time,
     no_op_flag, old_video_index, rule_key_30day, shield_config, flow_pool_abtest_group, rank_key_prefix,
     h_data_key, h_rule_key) = get_recommend_params(recommend_type=0, ab_exp_info=ab_exp_info,
                                                     ab_info_data=ab_info_data, mid=mid,
                                                     app_type=app_type,
                                                     flow_pool_abtest_config=flow_pool_abtest_config)
    recommend_result['getRecommendParamsTime'] = (time.time() - param_st) * 1000

    # Recall - rank - fallback
    get_result_st = time.time()
    if ab_code in old_pipeline_ab_codes:
        result, fea_info = video_old_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size,
                                               top_K=top_K, flow_pool_P=flow_pool_P, algo_type='',
                                               client_info=client_info, ab_code=ab_code, expire_time=expire_time,
                                               rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
                                               old_video_index=old_video_index, video_id=None, params=params,
                                               rule_key_30day=rule_key_30day, shield_config=shield_config,
                                               env_dict=env_dict, level_weight=level_weight,
                                               flow_pool_abtest_group=flow_pool_abtest_group,
                                               rank_key_prefix=rank_key_prefix,
                                               h_data_key=h_data_key, h_rule_key=h_rule_key)
        recommend_result['fea_info'] = fea_info
    else:
        result = video_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size, top_K=top_K,
                                 flow_pool_P=flow_pool_P, algo_type=algo_type, client_info=client_info,
                                 ab_code=ab_code, expire_time=expire_time, rule_key=rule_key, data_key=data_key,
                                 no_op_flag=no_op_flag, old_video_index=old_video_index, params=params,
                                 rule_key_30day=rule_key_30day, shield_config=shield_config, level_weight=level_weight,
                                 flow_pool_abtest_group=flow_pool_abtest_group, env_dict=env_dict)
    recommend_result['recommendOperation'] = result

    # Annotate each served video with its 1-based display position.
    rank_result = result.get('rankResult')
    for position, item in enumerate(rank_result, start=1):
        item['position'] = position
    recommend_result['videos'] = rank_result
    recommend_result['getRecommendResultTime'] = (time.time() - get_result_st) * 1000

    # Refresh per-user Redis state with what was served.
    update_redis_st = time.time()
    update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K,
                      level_weight=level_weight, flow_pool_abtest_group=flow_pool_abtest_group)
    recommend_result['updateRedisDataTime'] = (time.time() - update_redis_st) * 1000
    return recommend_result
def video_relevant_recommend(request_id, video_id, mid, uid, size, app_type, ab_exp_info, client_info,
                             page_type, params, ab_info_data, version_audit_status, env_dict,
                             level_weight, flow_pool_abtest_config):
    """
    Related-video recommendation entry point.

    Special mids and mini-program builds under review are served from the special pool.
    Every other request resolves its experiment parameters via get_recommend_params and is
    routed to video_old_recommend or video_recommend depending on the resulting ab_code;
    the served videos are then written back through update_redis_data.

    :param request_id: request_id
    :param video_id: id of the head video the related list is built for
    :param mid: mid type-string
    :param uid: uid type-string
    :param size: number of videos requested type-int
    :param app_type: product identifier type-int
    :param ab_exp_info: ab experiment group params [{"expItemId":1, "configValue":{"size":4, "K":3, ...}}, ...]
    :param client_info: region params
    :param page_type: page type: 1 - detail page, 2 - share page
    :param params: forwarded unchanged to the recommendation pipeline
    :param ab_info_data: app experiment group params, forwarded to get_recommend_params
    :param version_audit_status: mini-program version audit status: 1 - under review, 2 - approved
    :param env_dict: stored as result['fea_info']; overwritten by the fea_info returned by
        video_old_recommend on that path
    :param level_weight: forwarded to the pipeline and to update_redis_data
    :param flow_pool_abtest_config: forwarded to get_recommend_params
    :return: dict that always contains 'fea_info' and 'videos'. On the normal path it also
        contains 'recommendOperation' (raw pipeline result) and the stage timings
        'getRecommendParamsTime', 'getRecommendResultTime' and 'updateRedisDataTime' in
        milliseconds; each video there carries a 1-based 'position'.
    """
    # ab_codes whose traffic is served by video_old_recommend: 60054, 60056 and 60066-60097.
    # Single source of truth instead of a long `ab_code == ... or ...` chain.
    old_pipeline_ab_codes = (60054, 60056) + tuple(range(60066, 60098))

    recommend_result = {}
    param_st = time.time()
    recommend_result['fea_info'] = env_dict

    # Special mids and mini-program builds under review get the special-pool list.
    if mid in get_special_mid_list() or version_audit_status == 1:
        rank_result = special_mid_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size)
        recommend_result['videos'] = rank_result
        return recommend_result

    # Normal mid: resolve experiment parameters (recommend_type=1 -> related recommendation).
    (top_K, flow_pool_P, ab_code, rule_key, data_key, expire_time,
     no_op_flag, old_video_index, rule_key_30day, shield_config, flow_pool_abtest_group, rank_key_prefix,
     h_data_key, h_rule_key) = get_recommend_params(recommend_type=1, ab_exp_info=ab_exp_info,
                                                     ab_info_data=ab_info_data, page_type=page_type,
                                                     mid=mid, app_type=app_type,
                                                     flow_pool_abtest_config=flow_pool_abtest_config)
    recommend_result['getRecommendParamsTime'] = (time.time() - param_st) * 1000

    # Recall - rank - fallback
    get_result_st = time.time()
    if ab_code in old_pipeline_ab_codes:
        result, fea_info = video_old_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size,
                                               top_K=top_K, flow_pool_P=flow_pool_P, algo_type='',
                                               client_info=client_info, ab_code=ab_code, expire_time=expire_time,
                                               rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
                                               old_video_index=old_video_index, video_id=video_id, params=params,
                                               rule_key_30day=rule_key_30day, shield_config=shield_config,
                                               env_dict=env_dict, level_weight=level_weight,
                                               flow_pool_abtest_group=flow_pool_abtest_group,
                                               rank_key_prefix=rank_key_prefix,
                                               h_data_key=h_data_key, h_rule_key=h_rule_key)
        recommend_result['fea_info'] = fea_info
    else:
        result = video_recommend(request_id=request_id, mid=mid, uid=uid, app_type=app_type, size=size, top_K=top_K,
                                 flow_pool_P=flow_pool_P, algo_type='', client_info=client_info, ab_code=ab_code,
                                 expire_time=expire_time, rule_key=rule_key, data_key=data_key, no_op_flag=no_op_flag,
                                 old_video_index=old_video_index, video_id=video_id, params=params,
                                 rule_key_30day=rule_key_30day, shield_config=shield_config, level_weight=level_weight,
                                 flow_pool_abtest_group=flow_pool_abtest_group, env_dict=env_dict)
    recommend_result['recommendOperation'] = result

    # Annotate each served video with its 1-based display position.
    rank_result = result.get('rankResult')
    for position, item in enumerate(rank_result, start=1):
        item['position'] = position
    recommend_result['videos'] = rank_result
    recommend_result['getRecommendResultTime'] = (time.time() - get_result_st) * 1000

    # Refresh per-user Redis state with what was served.
    update_redis_st = time.time()
    update_redis_data(result=rank_result, app_type=app_type, mid=mid, top_K=top_K,
                      level_weight=level_weight, flow_pool_abtest_group=flow_pool_abtest_group)
    recommend_result['updateRedisDataTime'] = (time.time() - update_redis_st) * 1000
    return recommend_result
def special_mid_recommend(request_id, mid, uid, app_type, size,
                          ab_code=config_.AB_CODE['special_mid'],
                          push_from=config_.PUSH_FROM['special_mid'],
                          expire_time=24*3600):
    """
    Serve videos from the 'special' pool, resuming after the video this mid last received.

    The id of the last video served is kept in Redis under
    LAST_VIDEO_FROM_SPECIAL_POOL_PREFIX + "{app_type}:{mid}:{redis_date}". Paging resumes
    right after that video's rank in the special pool zset, so consecutive requests
    advance through the pool.

    :param request_id: request_id
    :param mid: mid; when falsy the resume position is not written back
    :param uid: uid
    :param app_type: product identifier type-int
    :param size: number of videos to return type-int
    :param ab_code: abCode tagged onto every returned video
    :param push_from: pushFrom tagged onto every returned video
    :param expire_time: TTL in seconds of the resume-position key
    :return: list of {'videoId', 'rovScore', 'pushFrom', 'abCode'} dicts, at most `size` long
    """
    redis_helper = RedisHelper()
    pool_recall = PoolRecall(request_id=request_id, app_type=app_type,
                             mid=mid, uid=uid, ab_code=ab_code)
    special_key_name, redis_date = pool_recall.get_pool_redis_key(pool_type='special')

    # Where this mid stopped last time in the special pool.
    last_special_recall_key = f'{config_.LAST_VIDEO_FROM_SPECIAL_POOL_PREFIX}{app_type}:{mid}:{redis_date}'
    last_video_id = redis_helper.get_data_from_redis(last_special_recall_key)
    idx = 0
    if last_video_id:
        last_rank = redis_helper.get_index_with_data(special_key_name, last_video_id)
        # Rank 0 is a valid position (head of the pool) and must advance to 1; the previous
        # `if not idx` check confused it with "not found" and restarted at 0 forever.
        # NOTE(review): assumes the helper returns None for a missing member, as Redis
        # ZRANK does — confirm against db_helper.
        if last_rank is not None:
            idx = last_rank + 1

    recall_result = []
    # Fetch a few times more than needed per round trip.
    get_size = size * 5
    freq = 0
    while len(recall_result) < size:
        freq += 1
        if freq > config_.MAX_FREQ_FROM_ROV_POOL:
            break
        data = redis_helper.get_data_zset_with_index(key_name=special_key_name,
                                                     start=idx, end=idx + get_size - 1,
                                                     with_scores=True)
        if not data:
            break
        # Each entry is (member, score); tag with the source params pushFrom / abCode.
        recall_result.extend({'videoId': int(entry[0]), 'rovScore': entry[1],
                              'pushFrom': push_from, 'abCode': ab_code}
                             for entry in data)
        idx += get_size

    result = recall_result[:size]
    # Remember the last served video so the next request resumes after it (skipped for empty mid).
    if mid and result:
        redis_helper.set_data_to_redis(key_name=last_special_recall_key,
                                       value=result[-1]['videoId'],
                                       expire_time=expire_time)
    return result
def get_special_mid_list():
    """
    Fetch the special mids stored in the Redis set config_.KEY_NAME_SPECIAL_MID.

    :return: whatever RedisHelper.get_data_from_set returns when it is truthy,
        otherwise an empty list
    """
    special_mids = RedisHelper().get_data_from_set(key_name=config_.KEY_NAME_SPECIAL_MID)
    return special_mids or []
if __name__ == '__main__':
    # Manual smoke run of relevant_video_top_recommend on a sample recommendation list.
    videos = [
        {"videoId": 10136461, "rovScore": 99.971, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10239014, "rovScore": 99.97, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 9851154, "rovScore": 99.969, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10104347, "rovScore": 99.968, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10141507, "rovScore": 99.967, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10292817, "flowPool": "2#6#2#1641780979606", "rovScore": 53.926690610816486,
         "pushFrom": "flow_pool", "abCode": 10000},
        {"videoId": 10224932, "flowPool": "2#5#1#1641800279644", "rovScore": 53.47890460059617, "pushFrom": "flow_pool",
         "abCode": 10000},
        {"videoId": 9943255, "rovScore": 99.966, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10282970, "flowPool": "2#5#1#1641784814103", "rovScore": 52.682815076325575,
         "pushFrom": "flow_pool", "abCode": 10000},
        {"videoId": 10282205, "rovScore": 99.965, "pushFrom": "recall_pool", "abCode": 10000}
    ]
    # request_id is a required positional parameter of relevant_video_top_recommend;
    # omitting it made this script raise TypeError.
    res = relevant_video_top_recommend(request_id='manual_test', app_type=4, mid='', uid=1111, head_vid=123,
                                       videos=videos, size=10)
    print(res)
|