video_rank.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. import random
  2. import numpy
  3. from log import Log
  4. from config import set_config
  5. from video_recall import PoolRecall
  6. from db_helper import RedisHelper
  7. from utils import FilterVideos, send_msg_to_feishu
  8. log_ = Log()  # module-level logger instance; bottom_strategy() reports fallbacks through log_.info
  9. config_ = set_config()  # module-level configuration object; the functions below read K, P, PUSH_FROM, etc. from it
  10. def video_rank(data, size):
  11. """
  12. 视频分发排序
  13. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  14. :param size: 请求数
  15. :return: rank_result
  16. """
  17. if not data['rov_pool_recall'] and not data['flow_pool_recall']:
  18. return None
  19. # 将各路召回的视频按照score从大到小排序
  20. # ROV召回池
  21. rov_recall_rank = sorted(data['rov_pool_recall'], key=lambda k: (k.get('rovScore'), 0), reverse=True)
  22. # 流量池
  23. flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: (k.get('rovScore'), 0), reverse=True)
  24. # 对各路召回的视频进行去重
  25. rov_recall_rank, flow_recall_rank = remove_duplicate(rov_recall=rov_recall_rank, flow_recall=flow_recall_rank)
  26. # log_.info('remove_duplicate finished! rov_recall_rank = {}, flow_recall_rank = {}'.format(
  27. # rov_recall_rank, flow_recall_rank))
  28. # 从ROV召回池中获取top k
  29. if len(rov_recall_rank) > 0:
  30. rank_result = rov_recall_rank[:config_.K]
  31. rov_recall_rank = rov_recall_rank[config_.K:]
  32. else:
  33. rank_result = flow_recall_rank[:config_.K]
  34. flow_recall_rank = flow_recall_rank[config_.K:]
  35. # 按概率 p 及score排序获取 size - k 个视频
  36. i = 0
  37. while i < size - config_.K:
  38. # 随机生成[0, 1)浮点数
  39. rand = random.random()
  40. # log_.info('rand: {}'.format(rand))
  41. if rand < config_.P:
  42. if flow_recall_rank:
  43. rank_result.append(flow_recall_rank[0])
  44. flow_recall_rank.remove(flow_recall_rank[0])
  45. else:
  46. rank_result.extend(rov_recall_rank[:size - config_.K - i])
  47. return rank_result
  48. else:
  49. if rov_recall_rank:
  50. rank_result.append(rov_recall_rank[0])
  51. rov_recall_rank.remove(rov_recall_rank[0])
  52. else:
  53. rank_result.extend(flow_recall_rank[:size - config_.K - i])
  54. return rank_result
  55. i += 1
  56. return rank_result
  57. def remove_duplicate(rov_recall, flow_recall):
  58. """
  59. 对多路召回的视频去重
  60. 去重原则:
  61. 如果视频在ROV召回池topK,则保留ROV召回池,否则保留流量池
  62. :param rov_recall: ROV召回池-已排序
  63. :param flow_recall: 流量池-已排序
  64. :return:
  65. """
  66. flow_recall_result = []
  67. rov_recall_remove = []
  68. flow_recall_video_ids = [item['videoId'] for item in flow_recall]
  69. # rov_recall topK
  70. for item in rov_recall[:config_.K]:
  71. if item['videoId'] in flow_recall_video_ids:
  72. flow_recall_video_ids.remove(item['videoId'])
  73. # other
  74. for item in rov_recall[config_.K:]:
  75. if item['videoId'] in flow_recall_video_ids:
  76. rov_recall_remove.append(item)
  77. # rov recall remove
  78. for item in rov_recall_remove:
  79. rov_recall.remove(item)
  80. # flow recall remove
  81. for item in flow_recall:
  82. if item['videoId'] in flow_recall_video_ids:
  83. flow_recall_result.append(item)
  84. return rov_recall, flow_recall_result
  85. def bottom_strategy(size, app_type, ab_code):
  86. """
  87. 兜底策略: 从ROV召回池中获取top1000,进行状态过滤后的视频
  88. :param size: 需要获取的视频数
  89. :param app_type: 产品标识 type-int
  90. :param ab_code: abCode
  91. :return:
  92. """
  93. pool_recall = PoolRecall(app_type=app_type, ab_code=ab_code)
  94. key_name, _ = pool_recall.get_pool_redis_key(pool_type='rov')
  95. redis_helper = RedisHelper()
  96. data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=1000)
  97. if not data:
  98. log_.info('{} —— ROV推荐进入了二次兜底, data = {}'.format(config_.ENV_TEXT, data))
  99. send_msg_to_feishu('{} —— ROV推荐进入了二次兜底,请查看是否有数据更新失败问题。'.format(config_.ENV_TEXT))
  100. # 二次兜底
  101. bottom_data = bottom_strategy_last(size=size, app_type=app_type, ab_code=ab_code)
  102. return bottom_data
  103. # 视频状态过滤采用离线定时过滤方案
  104. # 状态过滤
  105. # filter_videos = FilterVideos(app_type=app_type, video_ids=data)
  106. # filtered_data = filter_videos.filter_video_status(video_ids=data)
  107. if len(data) > size:
  108. random_data = numpy.random.choice(data, size, False)
  109. else:
  110. random_data = data
  111. bottom_data = [{'videoId': int(item), 'pushFrom': config_.PUSH_FROM['bottom'], 'abCode': ab_code}
  112. for item in random_data]
  113. return bottom_data
  114. def bottom_strategy_last(size, app_type, ab_code):
  115. """
  116. 兜底策略: 从兜底视频中随机获取视频,进行状态过滤后的视频
  117. :param size: 需要获取的视频数
  118. :param app_type: 产品标识 type-int
  119. :param ab_code: abCode
  120. :return:
  121. """
  122. redis_helper = RedisHelper()
  123. bottom_data = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
  124. random_data = numpy.random.choice(bottom_data, size * 30, False)
  125. # 视频状态过滤采用离线定时过滤方案
  126. # 状态过滤
  127. # filter_videos = FilterVideos(app_type=app_type, video_ids=random_data)
  128. # filtered_data = filter_videos.filter_video_status(video_ids=random_data)
  129. bottom_data = [{'videoId': int(video_id), 'pushFrom': config_.PUSH_FROM['bottom_last'], 'abCode': ab_code}
  130. for video_id in random_data[:size]]
  131. return bottom_data
  132. def video_rank_by_w_h_rate(videos):
  133. """
  134. 视频宽高比实验(每组的前两个视频调整为横屏视频),根据视频宽高比信息对视频进行重排
  135. :param videos:
  136. :return:
  137. """
  138. redis_helper = RedisHelper()
  139. # ##### 判断前两个视频是否是置顶视频 或者 流量池视频
  140. top_2_push_from_flag = [False, False]
  141. for i, video in enumerate(videos[:2]):
  142. if video['pushFrom'] in [config_.PUSH_FROM['top'], config_.PUSH_FROM['flow_recall']]:
  143. top_2_push_from_flag[i] = True
  144. if top_2_push_from_flag[0] and top_2_push_from_flag[1]:
  145. return videos
  146. # ##### 判断前两个视频是否为横屏
  147. top_2_w_h_rate_flag = [False, False]
  148. for i, video in enumerate(videos[:2]):
  149. if video['pushFrom'] in [config_.PUSH_FROM['top'], config_.PUSH_FROM['flow_recall']]:
  150. # 视频来源为置顶 或 流量池时,不做判断
  151. top_2_w_h_rate_flag[i] = True
  152. elif video['pushFrom'] in [config_.PUSH_FROM['rov_recall'], config_.PUSH_FROM['bottom']]:
  153. # 视频来源为 rov召回池 或 一层兜底时,判断是否是横屏
  154. w_h_rate = redis_helper.get_score_with_value(
  155. key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'], value=video['videoId'])
  156. if w_h_rate is not None:
  157. top_2_w_h_rate_flag[i] = True
  158. elif video['pushFrom'] == config_.PUSH_FROM['bottom_last']:
  159. # 视频来源为 二层兜底时,判断是否是横屏
  160. w_h_rate = redis_helper.get_score_with_value(
  161. key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['bottom_last'], value=video['videoId'])
  162. if w_h_rate is not None:
  163. top_2_w_h_rate_flag[i] = True
  164. if top_2_w_h_rate_flag[0] and top_2_w_h_rate_flag[1]:
  165. return videos
  166. # ##### 前两个视频中有不符合前面两者条件的,对视频进行位置调整
  167. # 记录横屏视频位置
  168. horizontal_video_index = []
  169. # 记录流量池视频位置
  170. flow_video_index = []
  171. # 记录置顶视频位置
  172. top_video_index = []
  173. for i, video in enumerate(videos):
  174. # 视频来源为置顶
  175. if video['pushFrom'] == config_.PUSH_FROM['top']:
  176. top_video_index.append(i)
  177. # 视频来源为流量池
  178. elif video['pushFrom'] == config_.PUSH_FROM['flow_recall']:
  179. flow_video_index.append(i)
  180. # 视频来源为rov召回池 或 一层兜底
  181. elif video['pushFrom'] in [config_.PUSH_FROM['rov_recall'], config_.PUSH_FROM['bottom']]:
  182. w_h_rate = redis_helper.get_score_with_value(
  183. key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'], value=video['videoId'])
  184. if w_h_rate is not None:
  185. horizontal_video_index.append(i)
  186. else:
  187. continue
  188. # 视频来源为 二层兜底
  189. elif video['pushFrom'] == config_.PUSH_FROM['bottom_last']:
  190. w_h_rate = redis_helper.get_score_with_value(
  191. key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['bottom_last'], value=video['videoId'])
  192. if w_h_rate is not None:
  193. horizontal_video_index.append(i)
  194. else:
  195. continue
  196. # 重新排序
  197. top2_index = []
  198. for i in range(2):
  199. if i in top_video_index:
  200. top2_index.append(i)
  201. elif i in flow_video_index:
  202. top2_index.append(i)
  203. flow_video_index.remove(i)
  204. elif i in horizontal_video_index:
  205. top2_index.append(i)
  206. horizontal_video_index.remove(i)
  207. elif len(horizontal_video_index) > 0:
  208. # 调整横屏视频到第一位
  209. top2_index.append(horizontal_video_index[0])
  210. # 从横屏位置记录中移除
  211. horizontal_video_index.pop(0)
  212. elif i == 0:
  213. return videos
  214. # 重排
  215. flow_result = [videos[i] for i in flow_video_index]
  216. other_result = [videos[i] for i in range(len(videos)) if i not in top2_index and i not in flow_video_index]
  217. top2_result = []
  218. for i, j in enumerate(top2_index):
  219. item = videos[j]
  220. if i != j:
  221. # 修改abCode
  222. item['abCode'] = config_.AB_CODE['w_h_rate']
  223. top2_result.append(item)
  224. new_rank_result = top2_result
  225. for i in range(len(top2_index), len(videos)):
  226. if i in flow_video_index:
  227. new_rank_result.append(flow_result[0])
  228. flow_result.pop(0)
  229. else:
  230. new_rank_result.append(other_result[0])
  231. other_result.pop(0)
  232. return new_rank_result
  233. if __name__ == '__main__':
  234. d_test = [{'videoId': 10028734, 'rovScore': 99.977, 'pushFrom': 'recall_pool', 'abCode': 10000},
  235. {'videoId': 1919925, 'rovScore': 99.974, 'pushFrom': 'recall_pool', 'abCode': 10000},
  236. {'videoId': 9968118, 'rovScore': 99.972, 'pushFrom': 'recall_pool', 'abCode': 10000},
  237. {'videoId': 9934863, 'rovScore': 99.971, 'pushFrom': 'recall_pool', 'abCode': 10000},
  238. {'videoId': 10219869, 'flowPool': '1#1#1#1640830818883', 'rovScore': 82.21929728934731, 'pushFrom': 'flow_pool', 'abCode': 10000},
  239. {'videoId': 10212814, 'flowPool': '1#1#1#1640759014984', 'rovScore': 81.26694187726412, 'pushFrom': 'flow_pool', 'abCode': 10000},
  240. {'videoId': 10219437, 'flowPool': '1#1#1#1640827620520', 'rovScore': 81.21634156641908, 'pushFrom': 'flow_pool', 'abCode': 10000},
  241. {'videoId': 1994050, 'rovScore': 99.97, 'pushFrom': 'recall_pool', 'abCode': 10000},
  242. {'videoId': 9894474, 'rovScore': 99.969, 'pushFrom': 'recall_pool', 'abCode': 10000},
  243. {'videoId': 10028081, 'rovScore': 99.966, 'pushFrom': 'recall_pool', 'abCode': 10000}]
  244. res = video_rank_by_w_h_rate(videos=d_test)
  245. for tmp in res:
  246. print(tmp)