123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461 |
- import json
- import time
- import multiprocessing
- import traceback
- import hashlib
- from datetime import datetime
- import config
- from log import Log
- from config import set_config
- from video_recall import PoolRecall
- from video_rank import video_rank, bottom_strategy, video_rank_by_w_h_rate
- from db_helper import RedisHelper
- import gevent
- from utils import FilterVideos
- import ast
# Module-level singletons shared by every function in this file:
# ``log_`` is the project logger and ``config_`` is the configuration object
# returned by ``set_config()``.
log_ = Log()
config_ = set_config()
def relevant_video_top_recommend(app_type, mid, uid, head_vid, videos, size):
    """
    Pin operator-configured relevant videos to the top of a related-recommendation result.

    The configuration for ``head_vid`` is read from Redis as a JSON list; each entry is
    read through the keys ``order``, ``recommend_vid``, ``start_time`` and ``finish_time``.
    Entries are sorted by ``order``, passed through ``FilterVideos`` and kept only while
    ``start_time < now < finish_time`` (``now`` is ``int(time.time())``, so the bounds are
    presumably epoch seconds -- TODO confirm against the writer of this key).

    The surviving pinned videos are placed first, followed by the remaining non-flow-pool
    videos; flow-pool videos are then re-inserted at the index they had in ``videos``.

    :param app_type: product identifier, type-int
    :param mid: mid
    :param uid: uid
    :param head_vid: id of the head video of the related recommendation, type-int
    :param videos: current related-recommendation result, type-list
    :param size: number of videos to return, type-int
    :return: rank_result, truncated to ``size``. ``videos`` is returned unchanged (not
        truncated) when nothing is configured, when the filter returns None, or when no
        configured video is currently in effect.
    """
    # Fetch the relevant videos the operators configured for this head video.
    key_name = '{}{}'.format(config_.RELEVANT_VIDEOS_WITH_OP_KEY_NAME, head_vid)
    redis_helper = RedisHelper()
    relevant_videos = redis_helper.get_data_from_redis(key_name=key_name)
    if relevant_videos is None:
        # No relevant videos were configured for this head video.
        return videos
    relevant_videos = json.loads(relevant_videos)

    # Respect the operator-specified order.
    relevant_videos_sorted = sorted(relevant_videos, key=lambda x: x['order'])

    # Filter the candidates.
    relevant_video_ids = [int(item['recommend_vid']) for item in relevant_videos_sorted]
    filter_helper = FilterVideos(app_type=app_type, video_ids=relevant_video_ids, mid=mid, uid=uid)
    filtered_ids = filter_helper.filter_videos()
    if filtered_ids is None:
        return videos
    # A set keeps the membership test below O(1) per candidate.
    allowed_ids = set(filtered_ids)

    # Keep only the videos whose configuration window is currently open.
    now = int(time.time())
    relevant_videos_in_effect = [
        {
            'videoId': int(item['recommend_vid']),
            'pushFrom': config_.PUSH_FROM['relevant_video_op'],
            'abCode': config_.AB_CODE['relevant_video_op'],
        }
        for item in relevant_videos_sorted
        if item['start_time'] < now < item['finish_time'] and int(item['recommend_vid']) in allowed_ids
    ]
    if not relevant_videos_in_effect:
        return videos

    # Merge with the existing ranking: remember flow-pool videos together with their
    # original index, and drop every video that is about to be pinned.
    pinned_ids = {item['videoId'] for item in relevant_videos_in_effect}
    flow_pool_videos = []
    other_videos = []
    for index, item in enumerate(videos):
        if item.get('videoId') in pinned_ids:
            continue
        if item.get('pushFrom', None) == config_.PUSH_FROM['flow_recall']:
            flow_pool_videos.append((index, item))
        else:
            other_videos.append(item)

    # Re-rank while keeping flow-pool videos at their original positions.
    rank_result = relevant_videos_in_effect + other_videos
    for index, item in flow_pool_videos:
        rank_result.insert(index, item)
    return rank_result[:size]
def video_position_recommend(mid, uid, app_type, videos, size=10):
    """
    Insert position-pinned videos into slots 1 and 2 of a ranked result.

    Two candidate id lists are read from Redis (``RECALL_POSITION1_KEY_NAME`` and
    ``RECALL_POSITION2_KEY_NAME``), parsed with ``ast.literal_eval``, and filtered
    concurrently with ``FilterVideos``. The first surviving id of the first list is
    inserted at index 0 and the first surviving id of the second list at index 1.
    Videos already in ``videos`` with one of those ids are removed first.

    The second slot never repeats the video chosen for the first slot: when both lists
    start with the same id, the next distinct candidate of the second list is used
    (or the slot is left alone if there is none).

    NOTE(review): the keys are read from ``config.BaseConfig`` whereas the rest of this
    module uses the ``config_`` instance -- confirm this is intentional.

    :param mid: mid
    :param uid: uid
    :param app_type: product identifier
    :param videos: ranked result to insert into, type-list of dicts with a 'videoId' key
    :param size: maximum number of videos returned; defaults to 10, the previously
        hard-coded limit
    :return: new list with the pinned videos inserted, truncated to ``size``
    """
    redis_helper = RedisHelper()
    pos1_raw = redis_helper.get_data_from_redis(config.BaseConfig.RECALL_POSITION1_KEY_NAME)
    pos2_raw = redis_helper.get_data_from_redis(config.BaseConfig.RECALL_POSITION2_KEY_NAME)
    # A missing key is treated as "no candidates".
    pos1_candidates = [] if pos1_raw is None else [int(i) for i in ast.literal_eval(pos1_raw)]
    pos2_candidates = [] if pos2_raw is None else [int(i) for i in ast.literal_eval(pos2_raw)]

    # Run both filters concurrently.
    filter_1 = FilterVideos(app_type=app_type, video_ids=pos1_candidates, mid=mid, uid=uid)
    filter_2 = FilterVideos(app_type=app_type, video_ids=pos2_candidates, mid=mid, uid=uid)
    jobs = [gevent.spawn(filter_1.filter_videos), gevent.spawn(filter_2.filter_videos)]
    gevent.joinall(jobs)
    pos1_vids, pos2_vids = [job.get() for job in jobs]

    # Choose the video for each slot; the filters may return None or an empty list.
    pos1_vid = None
    if pos1_vids is not None and len(pos1_vids) > 0:
        pos1_vid = int(pos1_vids[0])
    pos2_vid = None
    if pos2_vids is not None:
        # Skip candidates equal to the slot-1 pick so one video is never shown twice.
        pos2_vid = next((int(vid) for vid in pos2_vids if int(vid) != pos1_vid), None)

    # Remove the chosen videos from the existing ranking before pinning them.
    videos = positon_duplicate(
        [] if pos1_vid is None else [pos1_vid],
        [] if pos2_vid is None else [pos2_vid],
        videos,
    )
    if pos1_vid is not None:
        videos.insert(0, {'videoId': pos1_vid, 'rovScore': 100,
                          'pushFrom': config_.PUSH_FROM['position_insert'],
                          'abCode': config_.AB_CODE['position_insert']})
    if pos2_vid is not None:
        videos.insert(1, {'videoId': pos2_vid, 'rovScore': 100,
                          'pushFrom': config_.PUSH_FROM['position_insert'],
                          'abCode': config_.AB_CODE['position_insert']})
    return videos[:size]
def positon_duplicate(pos1_vids, pos2_vids, videos):
    """
    Remove from ``videos`` the videos that are about to be pinned at slots 1 and 2.

    Only the first element of each id list is considered; it is converted with ``int``
    before being compared with each item's ``'videoId'``.

    :param pos1_vids: candidate ids for slot 1; may be None or empty
    :param pos2_vids: candidate ids for slot 2; may be None or empty
    :param videos: list of dicts, each with a 'videoId' key
    :return: new list holding the items of ``videos`` whose 'videoId' is not pinned,
        in their original order; ``videos`` itself is not modified
    """
    pinned_ids = {
        int(vids[0])
        for vids in (pos1_vids, pos2_vids)
        if vids is not None and len(vids) > 0
    }
    return [item for item in videos if item['videoId'] not in pinned_ids]
def video_recommend(mid, uid, size, top_K, flow_pool_P, app_type, algo_type, client_info, expire_time=24*3600,
                    ab_code=config_.AB_CODE['initial']):
    """
    Online recommendation pipeline: recall, rank, then fall back to the bottom strategy.

    :param mid: mid type-string
    :param uid: uid type-string
    :param size: number of videos requested type-int
    :param top_K: guarantee that the top K videos come from the recall pool type-int
    :param flow_pool_P: probability that a video in positions size-top_K comes from the
        flow pool type-float
    :param app_type: product identifier type-int
    :param algo_type: algorithm type type-string (not used inside this function)
    :param client_info: user location info {"country": "...", "province": "...", "city": "..."}
    :param expire_time: value forwarded to ``rov_pool_recall`` (documented upstream as the
        Redis expiry of the last-video record)
    :param ab_code: AB experiment code; the default is read from ``config_`` once, when
        the function is defined
    :return: tuple ``(rank_result, last_rov_recall_key)``
    """
    # ####### recall: the ROV pool and the flow pool are queried concurrently
    start_recall = time.time()
    pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code,
                             client_info=client_info)
    _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
    jobs = [
        gevent.spawn(pool_recall.rov_pool_recall, size, expire_time),
        gevent.spawn(pool_recall.flow_pool_recall, size),
    ]
    gevent.joinall(jobs)
    recall_result_list = [job.get() for job in jobs]
    end_recall = time.time()
    log_.info('mid: {}, uid: {}, recall: {}, execute time = {}ms'.format(
        mid, uid, recall_result_list, (end_recall - start_recall) * 1000))

    # ####### rank
    start_rank = time.time()
    data = {
        'rov_pool_recall': recall_result_list[0],
        'flow_pool_recall': recall_result_list[1]
    }
    rank_result = video_rank(data=data, size=size, top_K=top_K, flow_pool_P=flow_pool_P)
    end_rank = time.time()
    log_.info('mid: {}, uid: {}, rank_result: {}, execute time = {}ms'.format(
        mid, uid, rank_result, (end_rank - start_rank) * 1000))

    if not rank_result:
        # Bottom strategy: ranking produced nothing, so use the fallback list.
        start_bottom = time.time()
        rank_result = bottom_strategy(size=size, app_type=app_type, ab_code=ab_code)
        end_bottom = time.time()
        log_.info('mid: {}, uid: {}, bottom strategy result: {}, execute time = {}ms'.format(
            mid, uid, rank_result, (end_bottom - start_bottom) * 1000))

    return rank_result, last_rov_recall_key
def ab_test_op(rank_result, ab_code_list, app_type, mid, uid, **kwargs):
    """
    Apply the AB-experiment specific post-processing to a ranked result.

    Only the ``position_insert`` experiment is active. The width/height-ratio and
    relevant-video experiments are kept below as commented-out code, so ``kwargs`` is
    currently accepted but not read.

    :param rank_result: ranked result
    :param ab_code_list: AB experiment codes this request takes part in
    :param app_type: product identifier
    :param mid: mid
    :param uid: uid
    :param kwargs: other parameters (the disabled relevant-video experiment reads
        ``head_vid`` and ``size``)
    :return: the possibly re-ordered rank_result
    """
    # ####### width/height-ratio AB experiment (disabled)
    # if config_.AB_CODE['w_h_rate'] in ab_code_list and app_type in config_.AB_TEST.get('w_h_rate', []):
    #     rank_result = video_rank_by_w_h_rate(videos=rank_result)
    #     log_.info('app_type: {}, mid: {}, uid: {}, rank_by_w_h_rate_result: {}'.format(
    #         app_type, mid, uid, rank_result))

    # Position-based insertion.
    if config_.AB_CODE['position_insert'] in ab_code_list and app_type in config_.AB_TEST.get('position_insert', []):
        rank_result = video_position_recommend(mid, uid, app_type, rank_result)
        log_.info('app_type: {}, mid: {}, uid: {}, rank_by_position_insert_result: {}'.format(
            app_type, mid, uid, rank_result))

    # Forced insertion of operator-configured relevant videos (disabled)
    # if config_.AB_CODE['relevant_video_op'] in ab_code_list \
    #         and app_type in config_.AB_TEST.get('relevant_video_op', []):
    #     head_vid = kwargs['head_vid']
    #     size = kwargs['size']
    #     rank_result = relevant_video_top_recommend(
    #         app_type=app_type, mid=mid, uid=uid, head_vid=head_vid, videos=rank_result, size=size
    #     )
    #     log_.info('app_type: {}, mid: {}, uid: {}, head_vid: {}, rank_by_relevant_video_op_result: {}'.format(
    #         app_type, mid, uid, head_vid, rank_result))

    return rank_result
def update_redis_data(result, app_type, mid, last_rov_recall_key, top_K, expire_time=24*3600):
    """
    Refresh the Redis bookkeeping that depends on the final ranked result.

    Three best-effort updates are made; any exception is logged and swallowed so that a
    Redis problem never fails the request:

    1. the returned video ids are added to the pre-exposure set of ``app_type``/``mid``
       with a 30 minute expiry;
    2. the last ROV-recall video among the first ``top_K`` results is stored under
       ``last_rov_recall_key`` unless ``get_score_with_value`` returns a truthy value
       for it in the ROV update key;
    3. flow-pool videos are passed to ``update_local_distribute_count``.

    :param result: ranked result, type-list of dicts with 'videoId' and (usually) 'pushFrom'
    :param app_type: product identifier
    :param mid: mid
    :param last_rov_recall_key: Redis key recording the user's last position in the ROV
        recall pool
    :param top_K: guarantee that the top K videos come from the recall pool type-int
    :param expire_time: Redis expiry for the last-video record
    :return: None
    """
    try:
        redis_helper = RedisHelper()

        # Sync the pre-exposure data to Redis; it expires after 0.5h.
        preview_key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(app_type, mid)
        preview_video_ids = [int(item['videoId']) for item in result]
        if preview_video_ids:
            redis_helper.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids),
                                           expire_time=30 * 60)
            log_.info('preview redis update success!')

        # Record the id of the last ROV-recall video within the top K so the next request
        # can quickly locate its recall position.
        # ``.get`` is used because an item without 'pushFrom' must not abort the
        # remaining updates.
        rov_recall_video = [item['videoId'] for item in result[:top_K]
                            if item.get('pushFrom') == config_.PUSH_FROM['rov_recall']]
        if rov_recall_video:
            if app_type == config_.APP_TYPE['APP']:
                key_name = config_.UPDATE_ROV_KEY_NAME_APP
            else:
                key_name = config_.UPDATE_ROV_KEY_NAME
            last_video_id = rov_recall_video[-1]
            if not redis_helper.get_score_with_value(key_name=key_name, value=last_video_id):
                redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=last_video_id,
                                               expire_time=expire_time)
                log_.info('last video redis update success!')

        # Decrement the locally recorded distribute count of the flow-pool videos that
        # were just handed out.
        flow_recall_video = [item for item in result
                             if item.get('pushFrom') == config_.PUSH_FROM['flow_recall']]
        if flow_recall_video:
            update_local_distribute_count(flow_recall_video)
            log_.info('update local distribute count success!')
    except Exception:
        # Deliberate best-effort: log and carry on.
        log_.error("update redis data fail!")
        log_.error(traceback.format_exc())
def update_local_distribute_count(videos):
    """
    Decrement the locally recorded distribute count of each given flow-pool video.

    For every video the Redis key ``LOCAL_DISTRIBUTE_COUNT_PREFIX + videoId + '.' + flowPool``
    is decremented by 1 with a 5 minute expiry. Exceptions are logged and swallowed; an
    error on one video stops the loop, so later videos are not updated.

    :param videos: video list type-list [{'videoId':'', 'flowPool':'', 'distributeCount': '',
                   'rovScore': '', 'pushFrom': 'flow_pool', 'abCode': self.ab_code}, ....]
    :return: None
    """
    try:
        redis_helper = RedisHelper()
        for item in videos:
            key_name = '{}{}.{}'.format(config_.LOCAL_DISTRIBUTE_COUNT_PREFIX, item['videoId'], item['flowPool'])
            # Locally recorded distribute count - 1
            redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
    except Exception:
        # Deliberate best-effort: log and carry on.
        log_.error('update_local_distribute_count error...')
        log_.error(traceback.format_exc())
def video_homepage_recommend(mid, uid, size, app_type, algo_type, client_info, ab_exp_info):
    """
    Online recommendation logic for the home page.

    Runs recall/rank/fallback (``video_recommend``), applies the ``position_insert`` AB
    experiment and refreshes the Redis bookkeeping.

    * For the APP product (``config_.APP_TYPE['APP']``) the configured ``K``/``P`` are
      used with a 12 hour expiry and ``ab_exp_info`` is ignored.
    * For every other product the expiry is 24 hours and a ``rec_size`` entry in
      ``ab_exp_info`` may override ``size``, ``K`` and ``P`` (defaults 4, 3 and 0.3 for
      keys missing from its ``configValue``). An entry without ``configValue`` is
      ignored rather than raising.

    An earlier experiment that routed part of the vlog traffic by the md5 of ``mid``
    (AB code 'rank_by_h') is no longer active and its code was removed.

    :param mid: mid type-string
    :param uid: uid type-string
    :param size: number of videos requested type-int
    :param app_type: product identifier type-int
    :param algo_type: algorithm type type-string
    :param client_info: user location info {"country": "...", "province": "...", "city": "..."}
    :param ab_exp_info: AB experiment group parameters
        [{"expItemId":1, "configValue":{"size":4, "K":3, ...}}, ...] or None
    :return: final list of recommended videos
    """
    top_K = config_.K
    flow_pool_P = config_.P

    if app_type == config_.APP_TYPE['APP']:
        # Piaoquan video APP: fixed parameters, shorter expiry.
        expire_time = 12 * 3600
    else:
        expire_time = 24 * 3600
        if ab_exp_info is not None:
            rec_size_id = config_.EXP_ITEM_ID['rec_size']
            # Only the first matching experiment entry is honoured.
            exp_item = next((item for item in ab_exp_info if item.get('expItemId') == rec_size_id), None)
            config_value = None if exp_item is None else exp_item.get('configValue')
            if config_value is not None:
                size = config_value.get('size', 4)
                top_K = config_value.get('K', 3)
                flow_pool_P = config_value.get('P', 0.3)

    # Simple recall - rank - bottom strategy
    rank_result, last_rov_recall_key = video_recommend(mid=mid, uid=uid, app_type=app_type,
                                                       size=size, top_K=top_K, flow_pool_P=flow_pool_P,
                                                       algo_type=algo_type, client_info=client_info,
                                                       expire_time=expire_time)
    # ab-test
    result = ab_test_op(rank_result=rank_result,
                        ab_code_list=[config_.AB_CODE['position_insert']],
                        app_type=app_type, mid=mid, uid=uid)
    # Refresh Redis data
    update_redis_data(result=result, app_type=app_type, mid=mid, last_rov_recall_key=last_rov_recall_key,
                      top_K=top_K, expire_time=expire_time)
    return result
def video_relevant_recommend(video_id, mid, uid, size, app_type, ab_exp_info):
    """
    Related-recommendation logic.

    Runs recall/rank/fallback (``video_recommend`` with an empty ``algo_type`` and no
    ``client_info``), applies the AB experiments and refreshes the Redis bookkeeping.
    A ``rec_size`` entry in ``ab_exp_info`` may override ``size``, ``K`` and ``P``
    (defaults 4, 3 and 0.3 for keys missing from its ``configValue``); an entry without
    ``configValue`` is ignored rather than raising.

    :param video_id: id of the head video of the related recommendation; forwarded to
        ``ab_test_op`` as ``head_vid``
    :param mid: mid type-string
    :param uid: uid type-string
    :param size: number of videos requested type-int
    :param app_type: product identifier type-int
    :param ab_exp_info: AB experiment group parameters
        [{"expItemId":1, "configValue":{"size":4, "K":3, ...}}, ...] or None
    :return: videos type-list
    """
    top_K = config_.K
    flow_pool_P = config_.P
    if ab_exp_info is not None:
        rec_size_id = config_.EXP_ITEM_ID['rec_size']
        # Only the first matching experiment entry is honoured.
        exp_item = next((item for item in ab_exp_info if item.get('expItemId') == rec_size_id), None)
        config_value = None if exp_item is None else exp_item.get('configValue')
        if config_value is not None:
            size = config_value.get('size', 4)
            top_K = config_value.get('K', 3)
            flow_pool_P = config_value.get('P', 0.3)

    # Simple recall - rank - bottom strategy
    rank_result, last_rov_recall_key = video_recommend(mid=mid, uid=uid, app_type=app_type,
                                                       size=size, top_K=top_K, flow_pool_P=flow_pool_P,
                                                       algo_type='', client_info=None)
    # ab-test
    result = ab_test_op(rank_result=rank_result,
                        ab_code_list=[config_.AB_CODE['position_insert'], config_.AB_CODE['relevant_video_op']],
                        app_type=app_type, mid=mid, uid=uid, head_vid=video_id, size=size)
    # Refresh Redis data
    update_redis_data(result=result, app_type=app_type, mid=mid, last_rov_recall_key=last_rov_recall_key,
                      top_K=top_K)
    return result
if __name__ == '__main__':
    # Manual smoke test: run the relevant-video pinning against a fixed ranking that
    # mixes recall-pool and flow-pool videos. Requires the Redis/config environment.
    videos = [
        {"videoId": 10136461, "rovScore": 99.971, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10239014, "rovScore": 99.97, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 9851154, "rovScore": 99.969, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10104347, "rovScore": 99.968, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10141507, "rovScore": 99.967, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10292817, "flowPool": "2#6#2#1641780979606", "rovScore": 53.926690610816486,
         "pushFrom": "flow_pool", "abCode": 10000},
        {"videoId": 10224932, "flowPool": "2#5#1#1641800279644", "rovScore": 53.47890460059617, "pushFrom": "flow_pool",
         "abCode": 10000},
        {"videoId": 9943255, "rovScore": 99.966, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10282970, "flowPool": "2#5#1#1641784814103", "rovScore": 52.682815076325575,
         "pushFrom": "flow_pool", "abCode": 10000},
        {"videoId": 10282205, "rovScore": 99.965, "pushFrom": "recall_pool", "abCode": 10000}
    ]
    res = relevant_video_top_recommend(app_type=4, mid='', uid=1111, head_vid=123, videos=videos, size=10)
    print(res)
|