video_rank.py 59 KB


  1. import json
  2. import random
  3. import numpy
  4. from log import Log
  5. from config import set_config
  6. from video_recall import PoolRecall
  7. from db_helper import RedisHelper
  8. from utils import FilterVideos, send_msg_to_feishu
  9. from rank_service import get_featurs, get_tf_serving_sores
# Module-level logger instance used by the functions in this file.
log_ = Log()
# Module-level configuration object; the functions in this file read values
# such as PUSH_FROM codes and Redis key names from it.
config_ = set_config()
  12. def video_rank(data, size, top_K, flow_pool_P, flow_pool_recall_process=None):
  13. """
  14. 视频分发排序
  15. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  16. :param size: 请求数
  17. :param top_K: 保证topK为召回池视频 type-int
  18. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  19. :return: rank_result
  20. """
  21. # add_flow_pool_recall_log
  22. if flow_pool_recall_process is None:
  23. flow_pool_recall_process = {}
  24. if not data['rov_pool_recall'] and not data['flow_pool_recall']:
  25. # add_flow_pool_recall_log
  26. return [], flow_pool_recall_process
  27. # return []
  28. # 将各路召回的视频按照score从大到小排序
  29. # 最惊奇相关推荐相似视频
  30. # relevant_recall = [item for item in data['rov_pool_recall']
  31. # if item.get('pushFrom') == config_.PUSH_FROM['top_video_relevant_appType_19']]
  32. # relevant_recall_rank = sorted(relevant_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  33. # 最惊奇完整影视视频
  34. # whole_movies_recall = [item for item in data['rov_pool_recall']
  35. # if item.get('pushFrom') == config_.PUSH_FROM['whole_movies']]
  36. # whole_movies_recall_rank = sorted(whole_movies_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  37. # 最惊奇影视解说视频
  38. # talk_videos_recall = [item for item in data['rov_pool_recall']
  39. # if item.get('pushFrom') == config_.PUSH_FROM['talk_videos']]
  40. # talk_videos_recall_rank = sorted(talk_videos_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  41. # 小时级更新数据
  42. # h_recall = [item for item in data['rov_pool_recall'] if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_h']]
  43. # h_recall_rank = sorted(h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  44. # 相对30天天级规则更新数据
  45. day_30_recall = [item for item in data['rov_pool_recall']
  46. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_30day']]
  47. day_30_recall_rank = sorted(day_30_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  48. # 地域分组小时级规则更新数据
  49. region_h_recall = [item for item in data['rov_pool_recall']
  50. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_h']]
  51. region_h_recall_rank = sorted(region_h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  52. # 地域分组小时级更新24h规则更新数据
  53. region_24h_recall = [item for item in data['rov_pool_recall']
  54. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_24h']]
  55. region_24h_recall_rank = sorted(region_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  56. # 地域分组天级规则更新数据
  57. # region_day_recall = [item for item in data['rov_pool_recall']
  58. # if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_day']]
  59. # region_day_recall_rank = sorted(region_day_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  60. # 相对24h规则更新数据
  61. rule_24h_recall = [item for item in data['rov_pool_recall']
  62. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h']]
  63. rule_24h_recall_rank = sorted(rule_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  64. # 相对24h规则筛选后剩余更新数据
  65. rule_24h_dup_recall = [item for item in data['rov_pool_recall']
  66. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h_dup']]
  67. rule_24h_dup_recall_rank = sorted(rule_24h_dup_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  68. # 相对48h规则更新数据
  69. rule_48h_recall = [item for item in data['rov_pool_recall']
  70. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_48h']]
  71. rule_48h_recall_rank = sorted(rule_48h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  72. # 相对48h规则筛选后剩余更新数据
  73. rule_48h_dup_recall = [item for item in data['rov_pool_recall']
  74. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_48h_dup']]
  75. rule_48h_dup_recall_rank = sorted(rule_48h_dup_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  76. # 天级规则更新数据
  77. # day_recall = [item for item in data['rov_pool_recall'] if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_day']]
  78. # day_recall_rank = sorted(day_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  79. # ROV召回池
  80. # rov_initial_recall = [
  81. # item for item in data['rov_pool_recall']
  82. # if item.get('pushFrom') not in
  83. # [config_.PUSH_FROM['top_video_relevant_appType_19'],
  84. # config_.PUSH_FROM['rov_recall_h'],
  85. # config_.PUSH_FROM['rov_recall_region_h'],
  86. # config_.PUSH_FROM['rov_recall_region_24h'],
  87. # config_.PUSH_FROM['rov_recall_region_day'],
  88. # config_.PUSH_FROM['rov_recall_24h'],
  89. # config_.PUSH_FROM['rov_recall_24h_dup'],
  90. # config_.PUSH_FROM['rov_recall_48h'],
  91. # config_.PUSH_FROM['rov_recall_48h_dup'],
  92. # config_.PUSH_FROM['rov_recall_day'],
  93. # config_.PUSH_FROM['whole_movies'],
  94. # config_.PUSH_FROM['talk_videos']]
  95. # ]
  96. # rov_initial_recall_rank = sorted(rov_initial_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  97. # rov_recall_rank = whole_movies_recall_rank + talk_videos_recall_rank + h_recall_rank + \
  98. # day_30_recall_rank + region_h_recall_rank + region_24h_recall_rank + \
  99. # region_day_recall_rank + rule_24h_recall_rank + rule_24h_dup_recall_rank + \
  100. # rule_48h_recall_rank + rule_48h_dup_recall_rank + \
  101. # day_recall_rank + rov_initial_recall_rank
  102. rov_recall_rank = day_30_recall_rank + \
  103. region_h_recall_rank + region_24h_recall_rank + \
  104. rule_24h_recall_rank + rule_24h_dup_recall_rank + \
  105. rule_48h_recall_rank + rule_48h_dup_recall_rank
  106. # 流量池
  107. flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  108. # 对各路召回的视频进行去重
  109. rov_recall_rank, flow_recall_rank = remove_duplicate(rov_recall=rov_recall_rank, flow_recall=flow_recall_rank,
  110. top_K=top_K)
  111. # log_.info('remove_duplicate finished! rov_recall_rank = {}, flow_recall_rank = {}'.format(
  112. # rov_recall_rank, flow_recall_rank))
  113. # rank_result = relevant_recall_rank
  114. rank_result = []
  115. # add_flow_pool_recall_log
  116. flow_pool_recall_process['recall_duplicate_res'] = {'rov_recall_rank': rov_recall_rank,
  117. 'flow_recall_rank': flow_recall_rank}
  118. # 从ROV召回池中获取top k
  119. if len(rov_recall_rank) > 0:
  120. rank_result.extend(rov_recall_rank[:top_K])
  121. rov_recall_rank = rov_recall_rank[top_K:]
  122. else:
  123. rank_result.extend(flow_recall_rank[:top_K])
  124. flow_recall_rank = flow_recall_rank[top_K:]
  125. # 按概率 p 及score排序获取 size - k 个视频
  126. i = 0
  127. while i < size - top_K:
  128. # 随机生成[0, 1)浮点数
  129. rand = random.random()
  130. # add_flow_pool_recall_log
  131. flow_pool_recall_process['flow_pool_P'] = flow_pool_P
  132. flow_pool_recall_process[f'{i}_rand'] = rand
  133. # log_.info('rand: {}'.format(rand))
  134. if rand < flow_pool_P:
  135. if flow_recall_rank:
  136. rank_result.append(flow_recall_rank[0])
  137. flow_recall_rank.remove(flow_recall_rank[0])
  138. else:
  139. rank_result.extend(rov_recall_rank[:size - top_K - i])
  140. return rank_result[:size], flow_pool_recall_process
  141. else:
  142. if rov_recall_rank:
  143. rank_result.append(rov_recall_rank[0])
  144. rov_recall_rank.remove(rov_recall_rank[0])
  145. else:
  146. rank_result.extend(flow_recall_rank[:size - top_K - i])
  147. return rank_result[:size], flow_pool_recall_process
  148. i += 1
  149. return rank_result[:size], flow_pool_recall_process
  150. def video_new_rank(videoIds, fast_flow_set, flow_set, size, top_K, flow_pool_P):
  151. """
  152. 视频分发排序
  153. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  154. :param size: 请求数
  155. :param top_K: 保证topK为召回池视频 type-int
  156. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  157. :return: rank_result
  158. """
  159. add_flow_set = set('')
  160. if not videoIds or len(videoIds)==0:
  161. return [], add_flow_set
  162. redisObj = RedisHelper()
  163. vidKeys = []
  164. for vid in videoIds:
  165. vidKeys.append("k_p:"+str(vid))
  166. #print("vidKeys:", vidKeys)
  167. video_scores = redisObj.get_batch_key(vidKeys)
  168. #print(video_scores)
  169. video_items = []
  170. for i in range(len(video_scores)):
  171. try:
  172. #print(video_scores[i])
  173. if video_scores[i] is None:
  174. video_items.append((videoIds[i], 0.0))
  175. else:
  176. video_score_str = json.loads(video_scores[i])
  177. #print("video_score_str:",video_score_str)
  178. video_items.append((videoIds[i], video_score_str[0]))
  179. except Exception:
  180. video_items.append((videoIds[i], 0.0))
  181. sort_items = sorted(video_items, key=lambda k: k[1], reverse=True)
  182. #print("sort_items:", sort_items)
  183. rov_recall_rank = sort_items
  184. fast_flow_recall_rank = []
  185. flow_recall_rank = []
  186. for item in sort_items:
  187. if item[0] in fast_flow_set:
  188. fast_flow_recall_rank.append(item)
  189. elif item[0] in flow_set:
  190. flow_recall_rank.append(item)
  191. # all flow result
  192. all_flow_recall_rank = fast_flow_recall_rank+flow_recall_rank
  193. rank_result = []
  194. rank_set = set('')
  195. # 从ROV召回池中获取top k
  196. if len(rov_recall_rank) > 0:
  197. rank_result.extend(rov_recall_rank[:top_K])
  198. rov_recall_rank = rov_recall_rank[top_K:]
  199. else:
  200. rank_result.extend(all_flow_recall_rank[:top_K])
  201. all_flow_recall_rank = all_flow_recall_rank[top_K:]
  202. for rank_item in rank_result:
  203. rank_set.add(rank_item[0])
  204. #print("rank_result:", rank_result)
  205. # 按概率 p 及score排序获取 size - k 个视频, 第4个位置按概率取流量池
  206. i = 0
  207. left_quato = size - top_K
  208. j = 0
  209. jj = 0
  210. while i < left_quato and (j<len(all_flow_recall_rank) or jj<len(rov_recall_rank)):
  211. # 随机生成[0, 1)浮点数
  212. rand = random.random()
  213. # log_.info('rand: {}'.format(rand))
  214. if rand < flow_pool_P:
  215. for flow_item in all_flow_recall_rank:
  216. j+=1
  217. if flow_item[0] in rank_set:
  218. continue
  219. else:
  220. rank_result.append(flow_item)
  221. rank_set.add(flow_item[0])
  222. add_flow_set.add(flow_item[0])
  223. i += 1
  224. if i>= left_quato:
  225. break
  226. else:
  227. for recall_item in rov_recall_rank:
  228. jj+=1
  229. if recall_item[0] in rank_set:
  230. continue
  231. else:
  232. rank_result.append(recall_item)
  233. rank_set.add(recall_item[0])
  234. i += 1
  235. if i>= left_quato:
  236. break
  237. #print("rank_result:", rank_result)
  238. #print("add_flow_set:", add_flow_set)
  239. return rank_result[:size], add_flow_set
  240. def refactor_video_rank(rov_recall_rank, fast_flow_set, flow_set, size, top_K, flow_pool_P):
  241. """
  242. 视频分发排序
  243. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  244. :param size: 请求数
  245. :param top_K: 保证topK为召回池视频 type-int
  246. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  247. :return: rank_result
  248. """
  249. if not rov_recall_rank or len(rov_recall_rank) == 0:
  250. return []
  251. fast_flow_recall_rank = []
  252. flow_recall_rank = []
  253. for item in rov_recall_rank:
  254. vid = item.get('videoId', 0)
  255. #print(item)
  256. if vid in fast_flow_set:
  257. fast_flow_recall_rank.append(item)
  258. elif vid in flow_set:
  259. flow_recall_rank.append(item)
  260. # all flow result
  261. all_flow_recall_rank = fast_flow_recall_rank + flow_recall_rank
  262. rank_result = []
  263. rank_set = set('')
  264. # 从ROV召回池中获取top k
  265. if len(rov_recall_rank) > 0:
  266. rank_result.extend(rov_recall_rank[:top_K])
  267. rov_recall_rank = rov_recall_rank[top_K:]
  268. else:
  269. rank_result.extend(all_flow_recall_rank[:top_K])
  270. all_flow_recall_rank = all_flow_recall_rank[top_K:]
  271. #已存放了多少VID
  272. for rank_item in rank_result:
  273. rank_set.add(rank_item.get('videoId', 0))
  274. # 按概率 p 及score排序获取 size - k 个视频, 第4个位置按概率取流量池
  275. i = 0
  276. while i < size - top_K:
  277. # 随机生成[0, 1)浮点数
  278. rand = random.random()
  279. # log_.info('rand: {}'.format(rand))
  280. if rand < flow_pool_P:
  281. for flow_item in all_flow_recall_rank:
  282. flow_vid = flow_item.get('videoId', 0)
  283. if flow_vid in rank_set:
  284. continue
  285. else:
  286. rank_result.append(flow_item)
  287. rank_set.add(flow_vid)
  288. else:
  289. for recall_item in rov_recall_rank:
  290. flow_vid = recall_item.get('videoId', 0)
  291. if flow_vid in rank_set:
  292. continue
  293. else:
  294. rank_result.append(recall_item)
  295. rank_set.add(flow_vid)
  296. i += 1
  297. return rank_result[:size]
  298. def remove_duplicate(rov_recall, flow_recall, top_K):
  299. """
  300. 对多路召回的视频去重
  301. 去重原则:
  302. 如果视频在ROV召回池topK,则保留ROV召回池,否则保留流量池
  303. :param rov_recall: ROV召回池-已排序
  304. :param flow_recall: 流量池-已排序
  305. :param top_K: 保证topK为召回池视频 type-int
  306. :return:
  307. """
  308. flow_recall_result = []
  309. rov_recall_remove = []
  310. flow_recall_video_ids = [item['videoId'] for item in flow_recall]
  311. # rov_recall topK
  312. for item in rov_recall[:top_K]:
  313. if item['videoId'] in flow_recall_video_ids:
  314. flow_recall_video_ids.remove(item['videoId'])
  315. # other
  316. for item in rov_recall[top_K:]:
  317. if item['videoId'] in flow_recall_video_ids:
  318. rov_recall_remove.append(item)
  319. # rov recall remove
  320. for item in rov_recall_remove:
  321. rov_recall.remove(item)
  322. # flow recall remove
  323. for item in flow_recall:
  324. if item['videoId'] in flow_recall_video_ids:
  325. flow_recall_result.append(item)
  326. return rov_recall, flow_recall_result
  327. def bottom_strategy(request_id, size, app_type, ab_code, params):
  328. """
  329. 兜底策略: 从ROV召回池中获取top1000,进行状态过滤后的视频
  330. :param request_id: request_id
  331. :param size: 需要获取的视频数
  332. :param app_type: 产品标识 type-int
  333. :param ab_code: abCode
  334. :param params:
  335. :return:
  336. """
  337. pool_recall = PoolRecall(request_id=request_id, app_type=app_type, ab_code=ab_code)
  338. key_name, _ = pool_recall.get_pool_redis_key(pool_type='rov')
  339. redis_helper = RedisHelper(params=params)
  340. data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=1000)
  341. if not data:
  342. log_.info('{} —— ROV推荐进入了二次兜底, data = {}'.format(config_.ENV_TEXT, data))
  343. send_msg_to_feishu('{} —— ROV推荐进入了二次兜底,请查看是否有数据更新失败问题。'.format(config_.ENV_TEXT))
  344. # 二次兜底
  345. bottom_data = bottom_strategy_last(size=size, app_type=app_type, ab_code=ab_code, params=params)
  346. return bottom_data
  347. # 视频状态过滤采用离线定时过滤方案
  348. # 状态过滤
  349. # filter_videos = FilterVideos(app_type=app_type, video_ids=data)
  350. # filtered_data = filter_videos.filter_video_status(video_ids=data)
  351. if len(data) > size:
  352. random_data = numpy.random.choice(data, size, False)
  353. else:
  354. random_data = data
  355. bottom_data = [{'videoId': int(item), 'pushFrom': config_.PUSH_FROM['bottom'], 'abCode': ab_code}
  356. for item in random_data]
  357. return bottom_data
  358. def bottom_strategy_last(size, app_type, ab_code, params):
  359. """
  360. 兜底策略: 从兜底视频中随机获取视频,进行状态过滤后的视频
  361. :param size: 需要获取的视频数
  362. :param app_type: 产品标识 type-int
  363. :param ab_code: abCode
  364. :param params:
  365. :return:
  366. """
  367. redis_helper = RedisHelper(params=params)
  368. bottom_data = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
  369. random_data = numpy.random.choice(bottom_data, size * 30, False)
  370. # 视频状态过滤采用离线定时过滤方案
  371. # 状态过滤
  372. # filter_videos = FilterVideos(app_type=app_type, video_ids=random_data)
  373. # filtered_data = filter_videos.filter_video_status(video_ids=random_data)
  374. bottom_data = [{'videoId': int(video_id), 'pushFrom': config_.PUSH_FROM['bottom_last'], 'abCode': ab_code}
  375. for video_id in random_data[:size]]
  376. return bottom_data
  377. def bottom_strategy2(size, app_type, mid, uid, ab_code, client_info, params):
  378. """
  379. 兜底策略: 从兜底视频中随机获取视频,进行过滤后的视频
  380. :param size: 需要获取的视频数
  381. :param app_type: 产品标识 type-int
  382. :param mid: mid
  383. :param uid: uid
  384. :param ab_code: abCode
  385. :param client_info: 地域信息
  386. :param params:
  387. :return:
  388. """
  389. # 获取存在城市分组数据的城市编码列表
  390. city_code_list = [code for _, code in config_.CITY_CODE.items()]
  391. # 获取provinceCode
  392. province_code = client_info.get('provinceCode', '-1')
  393. # 获取cityCode
  394. city_code = client_info.get('cityCode', '-1')
  395. if city_code in city_code_list:
  396. # 分城市数据存在时,获取城市分组数据
  397. region_code = city_code
  398. else:
  399. region_code = province_code
  400. if region_code == '':
  401. region_code = '-1'
  402. redis_helper = RedisHelper(params=params)
  403. bottom_data = redis_helper.get_data_from_set(key_name=config_.BOTTOM2_KEY_NAME)
  404. bottom_result = []
  405. if bottom_data is None:
  406. return bottom_result
  407. if len(bottom_data) > 0:
  408. try:
  409. random_data = numpy.random.choice(bottom_data, size * 5, False)
  410. except Exception as e:
  411. random_data = bottom_data
  412. video_ids = [int(item) for item in random_data]
  413. # 过滤
  414. filter_ = FilterVideos(request_id=params.request_id, app_type=app_type, mid=mid, uid=uid, video_ids=video_ids)
  415. filtered_data = filter_.filter_videos(pool_type='flow', region_code=region_code)
  416. if filtered_data:
  417. bottom_result = [{'videoId': int(video_id), 'pushFrom': config_.PUSH_FROM['bottom2'], 'abCode': ab_code}
  418. for video_id in filtered_data[:size]]
  419. return bottom_result
def video_rank_by_w_h_rate(videos):
    """
    Aspect-ratio experiment: re-rank the videos so that the first two slots of
    a group hold landscape videos where possible.

    Pinned ('top') and flow-pool videos are never moved. A video counts as
    landscape when ``get_score_with_value`` returns a non-None value for it in
    the matching ``W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME`` Redis key. Videos whose
    position changes while filling the first two slots get their 'abCode' set to
    ``config_.AB_CODE['w_h_rate']``.

    :param videos: ranked list of video dicts carrying 'pushFrom' and 'videoId'
    :return: the original list when no adjustment is needed or possible,
        otherwise a new re-ranked list
    """
    redis_helper = RedisHelper()
    # ##### Check whether the first two videos are pinned videos or flow-pool videos
    top_2_push_from_flag = [False, False]
    for i, video in enumerate(videos[:2]):
        if video['pushFrom'] in [config_.PUSH_FROM['top'], config_.PUSH_FROM['flow_recall']]:
            top_2_push_from_flag[i] = True
    if top_2_push_from_flag[0] and top_2_push_from_flag[1]:
        # Both leading slots are fixed: nothing may be moved.
        return videos
    # ##### Check whether the first two videos are landscape
    top_2_w_h_rate_flag = [False, False]
    for i, video in enumerate(videos[:2]):
        if video['pushFrom'] in [config_.PUSH_FROM['top'], config_.PUSH_FROM['flow_recall']]:
            # Pinned or flow-pool video: not checked, treated as acceptable
            top_2_w_h_rate_flag[i] = True
        elif video['pushFrom'] in [config_.PUSH_FROM['rov_recall'], config_.PUSH_FROM['bottom']]:
            # ROV recall pool or first-level fallback: check whether it is landscape
            w_h_rate = redis_helper.get_score_with_value(
                key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'], value=video['videoId'])
            if w_h_rate is not None:
                top_2_w_h_rate_flag[i] = True
        elif video['pushFrom'] == config_.PUSH_FROM['bottom_last']:
            # Second-level fallback: check whether it is landscape
            w_h_rate = redis_helper.get_score_with_value(
                key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['bottom_last'], value=video['videoId'])
            if w_h_rate is not None:
                top_2_w_h_rate_flag[i] = True
    if top_2_w_h_rate_flag[0] and top_2_w_h_rate_flag[1]:
        # Both leading slots already satisfy the experiment.
        return videos
    # ##### At least one of the first two videos fails both checks: adjust positions
    # Positions of landscape videos
    horizontal_video_index = []
    # Positions of flow-pool videos
    flow_video_index = []
    # Positions of pinned videos
    top_video_index = []
    for i, video in enumerate(videos):
        # Pinned video
        if video['pushFrom'] == config_.PUSH_FROM['top']:
            top_video_index.append(i)
        # Flow-pool video
        elif video['pushFrom'] == config_.PUSH_FROM['flow_recall']:
            flow_video_index.append(i)
        # ROV recall pool or first-level fallback
        elif video['pushFrom'] in [config_.PUSH_FROM['rov_recall'], config_.PUSH_FROM['bottom']]:
            w_h_rate = redis_helper.get_score_with_value(
                key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'], value=video['videoId'])
            if w_h_rate is not None:
                horizontal_video_index.append(i)
            else:
                continue
        # Second-level fallback
        elif video['pushFrom'] == config_.PUSH_FROM['bottom_last']:
            w_h_rate = redis_helper.get_score_with_value(
                key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['bottom_last'], value=video['videoId'])
            if w_h_rate is not None:
                horizontal_video_index.append(i)
            else:
                continue
    # Decide which original positions end up in the first two slots
    top2_index = []
    for i in range(2):
        if i in top_video_index:
            # Pinned video stays where it is
            top2_index.append(i)
        elif i in flow_video_index:
            # Flow-pool video stays where it is; it is no longer a movable flow slot
            top2_index.append(i)
            flow_video_index.remove(i)
        elif i in horizontal_video_index:
            # Already landscape: keep it and take it out of the candidates
            top2_index.append(i)
            horizontal_video_index.remove(i)
        elif len(horizontal_video_index) > 0:
            # Move the first remaining landscape video into this slot
            top2_index.append(horizontal_video_index[0])
            # Remove it from the landscape position record
            horizontal_video_index.pop(0)
        elif i == 0:
            # No landscape video is available even for the first slot: give up
            return videos
    # Re-rank
    flow_result = [videos[i] for i in flow_video_index]
    other_result = [videos[i] for i in range(len(videos)) if i not in top2_index and i not in flow_video_index]
    top2_result = []
    for i, j in enumerate(top2_index):
        item = videos[j]
        if i != j:
            # The video changed position: mark it with the experiment abCode
            item['abCode'] = config_.AB_CODE['w_h_rate']
        top2_result.append(item)
    new_rank_result = top2_result
    # Remaining flow-pool videos keep their original positions; every other
    # position is filled with the remaining videos in their original order.
    for i in range(len(top2_index), len(videos)):
        if i in flow_video_index:
            new_rank_result.append(flow_result[0])
            flow_result.pop(0)
        else:
            new_rank_result.append(other_result[0])
            other_result.pop(0)
    return new_rank_result
  521. def video_rank_with_old_video(rank_result, old_video_recall, size, top_K, old_video_index=2):
  522. """
  523. 视频分发排序 - 包含老视频, 老视频插入固定位置
  524. :param rank_result: 排序后的结果
  525. :param size: 请求数
  526. :param old_video_index: 老视频插入的位置索引,默认为2
  527. :return: new_rank_result
  528. """
  529. if not old_video_recall:
  530. return rank_result
  531. if not rank_result:
  532. return old_video_recall[:size]
  533. # 视频去重
  534. rank_video_ids = [item['videoId'] for item in rank_result]
  535. old_video_remove = []
  536. for old_video in old_video_recall:
  537. if old_video['videoId'] in rank_video_ids:
  538. old_video_remove.append(old_video)
  539. for item in old_video_remove:
  540. old_video_recall.remove(item)
  541. if not old_video_recall:
  542. return rank_result
  543. # 插入老视频
  544. # 随机获取一个视频
  545. ind = random.randint(0, len(old_video_recall) - 1)
  546. old_video = old_video_recall[ind]
  547. # 插入
  548. if len(rank_result) < top_K:
  549. new_rank_result = rank_result + [old_video]
  550. else:
  551. new_rank_result = rank_result[:old_video_index] + [old_video] + rank_result[old_video_index:]
  552. if len(new_rank_result) > size:
  553. # 判断后两位视频来源
  554. push_from_1 = new_rank_result[-1]['pushFrom']
  555. push_from_2 = new_rank_result[-2]['pushFrom']
  556. if push_from_2 == config_.PUSH_FROM['rov_recall'] and push_from_1 == config_.PUSH_FROM['flow_recall']:
  557. new_rank_result = new_rank_result[:-2] + new_rank_result[-1:]
  558. return new_rank_result[:size]
  559. def video_new_rank2(data, size, top_K, flow_pool_P, ab_code, mid, exp_config=None, env_dict=None):
  560. """
  561. 视频分发排序
  562. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  563. :param size: 请求数
  564. :param top_K: 保证topK为召回池视频 type-int
  565. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  566. :return: rank_result
  567. """
  568. if not data['rov_pool_recall'] and not data['flow_pool_recall']:
  569. return [], 0
  570. #全量的是vlog,票圈精选, 334,60057,
  571. # 60054: simrecall,
  572. pre_str = "k_p2:"
  573. rov_recall_rank = data['rov_pool_recall']
  574. #print(rov_recall_rank)
  575. #call rank service
  576. #flag_call_service = 0
  577. sort_index = 0
  578. if exp_config and "sort_flag" in exp_config:
  579. sort_index = exp_config["sort_flag"]
  580. #print("sort_index:", sort_index)
  581. redisObj = RedisHelper()
  582. vidKeys = []
  583. rec_recall_item_list = []
  584. rec_recall_vid_list = []
  585. day_vidKeys = []
  586. hour_vidKeys = []
  587. pre_day_str = "v_ctr:"
  588. pre_hour_str = "v_hour_ctr:"
  589. for recall_item in data['rov_pool_recall']:
  590. try:
  591. vid = int(recall_item.get("videoId", 0))
  592. rec_recall_vid_list.append(vid)
  593. rec_recall_item_list.append(recall_item)
  594. vidKeys.append(pre_str + str(vid))
  595. day_vidKeys.append(pre_day_str+str(vid))
  596. hour_vidKeys.append(pre_hour_str+str(vid))
  597. except:
  598. continue
  599. video_scores = redisObj.get_batch_key(vidKeys)
  600. #print("video_scores:", video_scores)
  601. if (ab_code == 60066 or ab_code == 60069 or ab_code == 60070 or ab_code == 60071) and len(rec_recall_vid_list)>0:
  602. video_static_info = redisObj.get_batch_key(day_vidKeys)
  603. video_hour_static_info = redisObj.get_batch_key(hour_vidKeys)
  604. #print("env_dict:", env_dict)
  605. feature_dict = get_featurs(mid, data, size, top_K, flow_pool_P, rec_recall_vid_list,env_dict, video_static_info, video_hour_static_info)
  606. score_result = get_tf_serving_sores(feature_dict)
  607. #print("score_result:", score_result)
  608. if video_scores and len(video_scores)>0 and rec_recall_item_list and score_result and len(score_result) > 0\
  609. and len(score_result) == len(rec_recall_item_list) and len(video_scores)== len(score_result):
  610. for i in range(len(score_result)):
  611. try:
  612. if video_scores[i] is None and len(score_result[i])>0:
  613. return_score = 0.000000001
  614. # sore_index :10 = model score
  615. if sort_index == 10:
  616. total_score = score_result[i][0]
  617. else:
  618. total_score = return_score * score_result[i][0]
  619. rec_recall_item_list[i]['sort_score'] = total_score
  620. rec_recall_item_list[i]['base_rov_score'] = 0.0
  621. rec_recall_item_list[i]['share_score'] = return_score
  622. rec_recall_item_list[i]['model_score'] = score_result[i][0]
  623. else:
  624. video_score_str = json.loads(video_scores[i])
  625. # sore_index :10 = model score
  626. return_score = 0.000000001
  627. if sort_index == 10:
  628. total_score = score_result[i][0]
  629. else:
  630. if len(video_score_str)>= sort_index and len(video_score_str)>0:
  631. return_score = video_score_str[sort_index]
  632. total_score = return_score * score_result[i][0]
  633. #print("total_score:", total_score, " model score :", score_result[i][0], "return_score:",
  634. # return_score)
  635. rec_recall_item_list[i]['sort_score'] = total_score
  636. rec_recall_item_list[i]['base_rov_score'] = video_score_str[0]
  637. rec_recall_item_list[i]['share_score'] = return_score
  638. rec_recall_item_list[i]['model_score'] = score_result[i][0]
  639. except Exception as e:
  640. #print('exception: {}:', e)
  641. return_score = 0.000000001
  642. if sort_index == 10:
  643. total_score = 0.00000001
  644. else:
  645. total_score = return_score * 0.00000001
  646. rec_recall_item_list[i]['sort_score'] = total_score
  647. rec_recall_item_list[i]['base_rov_score'] = 0
  648. rec_recall_item_list[i]['share_score'] = return_score
  649. rec_recall_item_list[i]['model_score'] = 0.00000001
  650. rec_recall_item_list[i]['flag_call_service'] = 1
  651. rov_recall_rank = sorted(rec_recall_item_list, key=lambda k: k.get('sort_score', 0), reverse=True)
  652. else:
  653. rov_recall_rank = sup_rank(video_scores, rec_recall_item_list)
  654. else:
  655. if video_scores and len(rec_recall_item_list) > 0 and len(video_scores)>0:
  656. for i in range(len(video_scores)):
  657. try:
  658. if video_scores[i] is None:
  659. rec_recall_item_list[i]['sort_score'] = 0.0
  660. else:
  661. video_score_str = json.loads(video_scores[i])
  662. # print("video_score_str:", video_score_str)
  663. rec_recall_item_list[i]['sort_score'] = video_score_str[0]
  664. except Exception:
  665. rec_recall_item_list[i]['sort_score'] = 0.0
  666. rov_recall_rank = sorted(rec_recall_item_list, key=lambda k: k.get('sort_score', 0), reverse=True)
  667. #print(rov_recall_rank)
  668. flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  669. rov_recall_rank, flow_recall_rank = remove_duplicate(rov_recall=rov_recall_rank, flow_recall=flow_recall_rank,
  670. top_K=top_K)
  671. rank_result = []
  672. rank_set = set('')
  673. # 从ROV召回池中获取top k
  674. if len(rov_recall_rank) > 0:
  675. rank_result.extend(rov_recall_rank[:top_K])
  676. rov_recall_rank = rov_recall_rank[top_K:]
  677. else:
  678. rank_result.extend(flow_recall_rank[:top_K])
  679. flow_recall_rank = flow_recall_rank[top_K:]
  680. # 按概率 p 及score排序获取 size - k 个视频
  681. flow_num = 0
  682. flowConfig = 0
  683. # 本段代码控制流量池,通过实验传参,现不动
  684. if flowConfig == 1 and len(rov_recall_rank) > 0:
  685. for recall_item in rank_result:
  686. flow_recall_name = recall_item.get("flowPool", '')
  687. flow_num = flow_num + 1
  688. all_recall_rank = rov_recall_rank + flow_recall_rank
  689. if flow_num > 0:
  690. rank_result.extend(all_recall_rank[:size - top_K])
  691. return rank_result, flow_num
  692. else:
  693. i = 0
  694. while i < size - top_K:
  695. # 随机生成[0, 1)浮点数
  696. rand = random.random()
  697. # log_.info('rand: {}'.format(rand))
  698. if rand < flow_pool_P:
  699. if flow_recall_rank:
  700. rank_result.append(flow_recall_rank[0])
  701. flow_recall_rank.remove(flow_recall_rank[0])
  702. else:
  703. rank_result.extend(rov_recall_rank[:size - top_K - i])
  704. return rank_result[:size], flow_num
  705. else:
  706. if rov_recall_rank:
  707. rank_result.append(rov_recall_rank[0])
  708. rov_recall_rank.remove(rov_recall_rank[0])
  709. else:
  710. rank_result.extend(flow_recall_rank[:size - top_K - i])
  711. return rank_result[:size], flow_num
  712. i += 1
  713. else:
  714. i = 0
  715. while i < size - top_K:
  716. # 随机生成[0, 1)浮点数
  717. rand = random.random()
  718. # log_.info('rand: {}'.format(rand))
  719. if rand < flow_pool_P:
  720. if flow_recall_rank:
  721. rank_result.append(flow_recall_rank[0])
  722. flow_recall_rank.remove(flow_recall_rank[0])
  723. else:
  724. rank_result.extend(rov_recall_rank[:size - top_K - i])
  725. return rank_result[:size], flow_num
  726. else:
  727. if rov_recall_rank:
  728. rank_result.append(rov_recall_rank[0])
  729. rov_recall_rank.remove(rov_recall_rank[0])
  730. else:
  731. rank_result.extend(flow_recall_rank[:size - top_K - i])
  732. return rank_result[:size], flow_num
  733. i += 1
  734. return rank_result[:size], flow_num
  735. def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1:', flow_pool_recall_process=None):
  736. """
  737. 视频分发排序
  738. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  739. :param size: 请求数
  740. :param top_K: 保证topK为召回池视频 type-int
  741. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  742. :param rank_key_prefix:
  743. :return: rank_result
  744. """
  745. redis_helper = RedisHelper()
  746. # add_flow_pool_recall_log
  747. if flow_pool_recall_process is None:
  748. flow_pool_recall_process = {}
  749. if not data['rov_pool_recall'] and not data['flow_pool_recall']:
  750. # add_flow_pool_recall_log
  751. return [], 0, flow_pool_recall_process
  752. # return [], 0
  753. rov_recall_rank = data['rov_pool_recall']
  754. vid_keys = []
  755. rec_recall_item_list = []
  756. rec_recall_vid_list = []
  757. for recall_item in data['rov_pool_recall']:
  758. try:
  759. vid = int(recall_item.get("videoId", 0))
  760. rec_recall_vid_list.append(vid)
  761. rec_recall_item_list.append(recall_item)
  762. vid_keys.append(f"{rank_key_prefix}{vid}")
  763. except:
  764. continue
  765. video_scores = redis_helper.get_batch_key(vid_keys)
  766. if video_scores and len(rec_recall_item_list) > 0 and len(rec_recall_item_list) == len(video_scores):
  767. for i in range(len(video_scores)):
  768. try:
  769. if video_scores[i] is None:
  770. rec_recall_item_list[i]['sort_score'] = 0.0
  771. else:
  772. rec_recall_item_list[i]['sort_score'] = float(video_scores[i])
  773. except Exception:
  774. rec_recall_item_list[i]['sort_score'] = 0.0
  775. rov_recall_rank = sorted(rec_recall_item_list, key=lambda k: k.get('sort_score', 0), reverse=True)
  776. flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  777. rov_recall_rank, flow_recall_rank = remove_duplicate(
  778. rov_recall=rov_recall_rank, flow_recall=flow_recall_rank, top_K=top_K
  779. )
  780. rank_result = []
  781. # add_flow_pool_recall_log
  782. flow_pool_recall_process['recall_duplicate_res'] = {'rov_recall_rank': rov_recall_rank, 'flow_recall_rank': flow_recall_rank}
  783. # 从ROV召回池中获取top k
  784. if len(rov_recall_rank) > 0:
  785. rank_result.extend(rov_recall_rank[:top_K])
  786. rov_recall_rank = rov_recall_rank[top_K:]
  787. else:
  788. rank_result.extend(flow_recall_rank[:top_K])
  789. flow_recall_rank = flow_recall_rank[top_K:]
  790. # 按概率 p 及score排序获取 size - k 个视频
  791. flow_num = 0
  792. i = 0
  793. while i < size - top_K:
  794. # 随机生成[0, 1)浮点数
  795. rand = random.random()
  796. # add_flow_pool_recall_log
  797. flow_pool_recall_process['flow_pool_P'] = flow_pool_P
  798. flow_pool_recall_process[f'{i}_rand'] = rand
  799. # log_.info('rand: {}'.format(rand))
  800. if rand < flow_pool_P:
  801. if flow_recall_rank:
  802. rank_result.append(flow_recall_rank[0])
  803. flow_recall_rank.remove(flow_recall_rank[0])
  804. else:
  805. rank_result.extend(rov_recall_rank[:size - top_K - i])
  806. return rank_result[:size], flow_num, flow_pool_recall_process
  807. else:
  808. if rov_recall_rank:
  809. rank_result.append(rov_recall_rank[0])
  810. rov_recall_rank.remove(rov_recall_rank[0])
  811. else:
  812. rank_result.extend(flow_recall_rank[:size - top_K - i])
  813. return rank_result[:size], flow_num, flow_pool_recall_process
  814. i += 1
  815. return rank_result[:size], flow_num, flow_pool_recall_process
  816. # 排序服务兜底
  817. def sup_rank(video_scores, recall_list):
  818. if video_scores and len(recall_list) > 0:
  819. for i in range(len(video_scores)):
  820. try:
  821. if video_scores[i] is None:
  822. recall_list[i]['sort_score'] = 0.0
  823. else:
  824. video_score_str = json.loads(video_scores[i])
  825. recall_list[i]['flag_call_service'] = 0
  826. recall_list[i]['sort_score'] = video_score_str[0]
  827. except Exception:
  828. recall_list[i]['sort_score'] = 0.0
  829. rov_recall_rank = sorted(recall_list, key=lambda k: k.get('sort_score', 0), reverse=True)
  830. #print("rov_recall_rank:", rov_recall_rank)
  831. else:
  832. rov_recall_rank = recall_list
  833. return rov_recall_rank
  834. def video_sanke_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None):
  835. """
  836. 视频分发排序
  837. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  838. :param size: 请求数
  839. :param top_K: 保证topK为召回池视频 type-int
  840. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  841. :return: rank_result
  842. """
  843. if not data['rov_pool_recall'] and not data['flow_pool_recall'] \
  844. and len(data['u2i_recall'])==0 and len(data['w2v_recall'])==0 \
  845. and len(data['sim_recall']) == 0 and len(data['u2u2i_recall']) == 0 :
  846. return [], 0
  847. # 地域分组小时级规则更新数据
  848. recall_dict = {}
  849. region_h_recall = [item for item in data['rov_pool_recall']
  850. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_h']]
  851. region_h_recall_rank = sorted(region_h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  852. recall_dict['rov_recall_region_h'] = region_h_recall_rank
  853. # 地域分组小时级更新24h规则更新数据
  854. region_24h_recall = [item for item in data['rov_pool_recall']
  855. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_24h']]
  856. region_24h_recall_rank = sorted(region_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  857. recall_dict['rov_recall_region_24h'] = region_24h_recall_rank
  858. # 相对24h规则更新数据
  859. rule_24h_recall = [item for item in data['rov_pool_recall']
  860. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h']]
  861. rule_24h_recall_rank = sorted(rule_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  862. recall_dict['rov_recall_24h'] = rule_24h_recall_rank
  863. # 相对24h规则筛选后剩余更新数据
  864. rule_24h_dup_recall = [item for item in data['rov_pool_recall']
  865. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h_dup']]
  866. rule_24h_dup_recall_rank = sorted(rule_24h_dup_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  867. recall_dict['rov_recall_24h_dup'] = rule_24h_dup_recall_rank
  868. hot_recall = []
  869. w2v_recall =[]
  870. sim_recall = []
  871. u2u2i_recall = []
  872. if ab_Code==60058:
  873. if len(data['u2i_recall'])>0:
  874. hot_recall = sorted(data['u2i_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  875. recall_dict['u2i_recall'] = hot_recall
  876. elif ab_Code==60059:
  877. if len(data['w2v_recall'])>0:
  878. recall_dict['w2v_recall'] = data['w2v_recall']
  879. else:
  880. recall_dict['w2v_recall'] = w2v_recall
  881. elif ab_Code==60061 or ab_Code==60063:
  882. if len(data['sim_recall'])>0:
  883. recall_dict['sim_recall'] = data['sim_recall']
  884. else:
  885. recall_dict['sim_recall'] = sim_recall
  886. elif ab_Code==60062:
  887. if len(data['u2u2i_recall'])>0:
  888. recall_dict['u2u2i_recall'] = data['u2u2i_recall']
  889. else:
  890. recall_dict['u2u2i_recall'] = u2u2i_recall
  891. recall_list = [('rov_recall_region_h',1, 1),('rov_recall_region_h',0.5, 1),('rov_recall_region_24h',1,1),
  892. ('u2i_recall',0.5,1), ('w2v_recall',0.5,1),('rov_recall_24h',1,1), ('rov_recall_24h_dup',0.5,1)]
  893. if exp_config and exp_config['recall_list']:
  894. recall_list = exp_config['recall_list']
  895. #print("recall_config:", recall_list)
  896. rov_recall_rank = []
  897. select_ids = set('')
  898. for i in range(3):
  899. if len(rov_recall_rank)>8:
  900. break
  901. for per_recall_item in recall_list:
  902. per_recall_name = per_recall_item[0]
  903. per_recall_freq = per_recall_item[1]
  904. per_limt_num = per_recall_item[2]
  905. rand_num = random.random()
  906. #print(recall_dict[per_recall_name])
  907. if rand_num<per_recall_freq and per_recall_name in recall_dict:
  908. per_recall = recall_dict[per_recall_name]
  909. #print("per_recall_item:", per_recall_item)
  910. cur_recall_num = 0
  911. for recall_item in per_recall:
  912. vid = recall_item['videoId']
  913. if vid in select_ids:
  914. continue
  915. rov_recall_rank.append(recall_item)
  916. select_ids.add(vid)
  917. cur_recall_num+=1
  918. if cur_recall_num>=per_limt_num:
  919. break
  920. # print("rov_recall_rank:")
  921. # print(rov_recall_rank)
  922. #rov_recall_rank = region_h_recall_rank + region_24h_recall_rank + \
  923. # rule_24h_recall_rank + rule_24h_dup_recall_rank
  924. # 流量池
  925. flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  926. # 对各路召回的视频进行去重
  927. rov_recall_rank, flow_recall_rank = remove_duplicate(rov_recall=rov_recall_rank, flow_recall=flow_recall_rank,
  928. top_K=top_K)
  929. # log_.info('remove_duplicate finished! rov_recall_rank = {}, flow_recall_rank = {}'.format(
  930. # rov_recall_rank, flow_recall_rank))
  931. # rank_result = relevant_recall_rank
  932. rank_result = []
  933. # 从ROV召回池中获取top k
  934. if len(rov_recall_rank) > 0:
  935. rank_result.extend(rov_recall_rank[:top_K])
  936. rov_recall_rank = rov_recall_rank[top_K:]
  937. else:
  938. rank_result.extend(flow_recall_rank[:top_K])
  939. flow_recall_rank = flow_recall_rank[top_K:]
  940. flow_num = 0
  941. flowConfig =0
  942. if exp_config and exp_config['flowConfig']:
  943. flowConfig = exp_config['flowConfig']
  944. if flowConfig == 1 and len(rov_recall_rank) > 0:
  945. rank_result.extend(rov_recall_rank[:top_K])
  946. for recall_item in rank_result:
  947. flow_recall_name = recall_item.get("flowPool", '')
  948. if flow_recall_name is not None and flow_recall_name.find("#") > -1:
  949. flow_num = flow_num + 1
  950. all_recall_rank = rov_recall_rank + flow_recall_rank
  951. if flow_num > 0:
  952. rank_result.extend(all_recall_rank[:size - top_K])
  953. return rank_result[:size], flow_num
  954. else:
  955. # 按概率 p 及score排序获取 size - k 个视频
  956. i = 0
  957. while i < size - top_K:
  958. # 随机生成[0, 1)浮点数
  959. rand = random.random()
  960. # log_.info('rand: {}'.format(rand))
  961. if rand < flow_pool_P:
  962. if flow_recall_rank:
  963. rank_result.append(flow_recall_rank[0])
  964. flow_recall_rank.remove(flow_recall_rank[0])
  965. else:
  966. rank_result.extend(rov_recall_rank[:size - top_K - i])
  967. return rank_result[:size], flow_num
  968. else:
  969. if rov_recall_rank:
  970. rank_result.append(rov_recall_rank[0])
  971. rov_recall_rank.remove(rov_recall_rank[0])
  972. else:
  973. rank_result.extend(flow_recall_rank[:size - top_K - i])
  974. return rank_result[:size], flow_num
  975. i += 1
  976. else:
  977. # 按概率 p 及score排序获取 size - k 个视频
  978. i = 0
  979. while i < size - top_K:
  980. # 随机生成[0, 1)浮点数
  981. rand = random.random()
  982. # log_.info('rand: {}'.format(rand))
  983. if rand < flow_pool_P:
  984. if flow_recall_rank:
  985. rank_result.append(flow_recall_rank[0])
  986. flow_recall_rank.remove(flow_recall_rank[0])
  987. else:
  988. rank_result.extend(rov_recall_rank[:size - top_K - i])
  989. return rank_result[:size], flow_num
  990. else:
  991. if rov_recall_rank:
  992. rank_result.append(rov_recall_rank[0])
  993. rov_recall_rank.remove(rov_recall_rank[0])
  994. else:
  995. rank_result.extend(flow_recall_rank[:size - top_K - i])
  996. return rank_result[:size],flow_num
  997. i += 1
  998. return rank_result[:size], flow_num
  999. def video_sank_pos_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None):
  1000. """
  1001. 视频分发排序
  1002. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  1003. :param size: 请求数
  1004. :param top_K: 保证topK为召回池视频 type-int
  1005. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  1006. :return: rank_result
  1007. """
  1008. if not data['rov_pool_recall'] and not data['flow_pool_recall'] \
  1009. and len(data['u2i_recall'])==0 and len(data['w2v_recall'])==0 \
  1010. and len(data['sim_recall']) == 0 and len(data['u2u2i_recall']) == 0 :
  1011. return [], 0
  1012. # 地域分组小时级规则更新数据
  1013. recall_dict = {}
  1014. region_h_recall = [item for item in data['rov_pool_recall']
  1015. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_h']]
  1016. region_h_recall_rank = sorted(region_h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  1017. recall_dict['rov_recall_region_h'] = region_h_recall_rank
  1018. # 地域分组小时级更新24h规则更新数据
  1019. region_24h_recall = [item for item in data['rov_pool_recall']
  1020. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_24h']]
  1021. region_24h_recall_rank = sorted(region_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  1022. recall_dict['rov_recall_region_24h'] = region_24h_recall_rank
  1023. # 相对24h规则更新数据
  1024. rule_24h_recall = [item for item in data['rov_pool_recall']
  1025. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h']]
  1026. rule_24h_recall_rank = sorted(rule_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  1027. recall_dict['rov_recall_24h'] = rule_24h_recall_rank
  1028. # 相对24h规则筛选后剩余更新数据
  1029. rule_24h_dup_recall = [item for item in data['rov_pool_recall']
  1030. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h_dup']]
  1031. rule_24h_dup_recall_rank = sorted(rule_24h_dup_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  1032. recall_dict['rov_recall_24h_dup'] = rule_24h_dup_recall_rank
  1033. u2i_recall = []
  1034. u2i_play_recall = []
  1035. w2v_recall =[]
  1036. sim_recall = []
  1037. u2u2i_recall = []
  1038. return_video_recall = []
  1039. #print("")
  1040. if ab_Code==60058:
  1041. if len(data['u2i_recall'])>0:
  1042. recall_dict['u2i_recall'] = data['u2i_recall']
  1043. else:
  1044. recall_dict['u2i_recall'] = u2i_recall
  1045. if len(data['u2i_play_recall']) > 0:
  1046. recall_dict['u2i_play_recall'] = data['u2i_play_recall']
  1047. else:
  1048. recall_dict['u2i_play_recall'] = u2i_play_recall
  1049. elif ab_Code==60059:
  1050. if len(data['w2v_recall'])>0:
  1051. recall_dict['w2v_recall'] = data['w2v_recall']
  1052. else:
  1053. recall_dict['w2v_recall'] = w2v_recall
  1054. elif ab_Code==60061 or ab_Code==60063:
  1055. if len(data['sim_recall'])>0:
  1056. recall_dict['sim_recall'] = data['sim_recall']
  1057. else:
  1058. recall_dict['sim_recall'] = sim_recall
  1059. elif ab_Code==60062:
  1060. if len(data['u2u2i_recall'])>0:
  1061. recall_dict['u2u2i_recall'] = data['u2u2i_recall']
  1062. else:
  1063. recall_dict['u2u2i_recall'] = u2u2i_recall
  1064. elif ab_Code==60064:
  1065. if len(data['return_video_recall'])>0:
  1066. recall_dict['return_video_recall'] = data['return_video_recall']
  1067. else:
  1068. recall_dict['return_video_recall'] = return_video_recall
  1069. recall_pos1 = [('rov_recall_region_h',0, 0.98),('rov_recall_24h',0.98, 1),('rov_recall_region_24h',0,1),
  1070. ('rov_recall_24h',0,1), ('rov_recall_24h_dup',0,1)]
  1071. recall_pos2 = [('rov_recall_region_h',0,0.98),('rov_recall_24h',0.98,1),('rov_recall_region_24h',0,1),
  1072. ('rov_recall_24h',0,1),('rov_recall_24h_dup',0,1)]
  1073. recall_pos3 = [('rov_recall_region_h', 0,0.98), ('rov_recall_24h', 0.98,1), ('rov_recall_region_24h', 0,1),
  1074. ('rov_recall_24h', 0,1), ('rov_recall_24h_dup', 0,1)]
  1075. recall_pos4 = [('rov_recall_region_h', 0,0.98), ('rov_recall_24h', 0,0.02), ('rov_recall_region_24h', 0,1),
  1076. ('rov_recall_24h', 0,1), ('rov_recall_24h_dup', 0,1)]
  1077. if exp_config and 'recall_pos1' in exp_config \
  1078. and 'recall_pos2' in exp_config \
  1079. and 'recall_pos3' in exp_config \
  1080. and 'recall_pos4' in exp_config :
  1081. recall_pos1 = exp_config['recall_pos1']
  1082. recall_pos2 = exp_config['recall_pos2']
  1083. recall_pos3 = exp_config['recall_pos3']
  1084. recall_pos4 = exp_config['recall_pos4']
  1085. #print("recall_config:", recall_pos1)
  1086. rov_recall_rank = []
  1087. recall_list = []
  1088. recall_list.append(recall_pos1)
  1089. recall_list.append(recall_pos2)
  1090. recall_list.append(recall_pos3)
  1091. recall_list.append(recall_pos4)
  1092. select_ids = set('')
  1093. recall_num_limit_dict = {}
  1094. if exp_config and 'recall_num_limit' in exp_config:
  1095. recall_num_limit_dict = exp_config['recall_num_limit']
  1096. exp_recall_dict = {}
  1097. #index_pos = 0
  1098. for j in range(3):
  1099. if len(rov_recall_rank)>12:
  1100. break
  1101. # choose pos
  1102. for recall_pos_config in recall_list:
  1103. rand_num = random.random()
  1104. index_pos = 0
  1105. # choose pos recall
  1106. for per_recall_item in recall_pos_config:
  1107. if index_pos == 1:
  1108. break
  1109. if len(per_recall_item)<3:
  1110. continue
  1111. per_recall_name = per_recall_item[0]
  1112. per_recall_min = float(per_recall_item[1])
  1113. per_recall_max = float(per_recall_item[2])
  1114. per_recall_num = exp_recall_dict.get(per_recall_name, 0)
  1115. per_recall_total_num = recall_num_limit_dict.get(per_recall_name, 0)
  1116. # recall set total num
  1117. if len(recall_num_limit_dict)>0 and per_recall_total_num>0 and per_recall_num>= per_recall_total_num:
  1118. continue
  1119. if rand_num >= per_recall_min and rand_num < per_recall_max and per_recall_name in recall_dict:
  1120. per_recall = recall_dict[per_recall_name]
  1121. for recall_item in per_recall:
  1122. vid = recall_item['videoId']
  1123. if vid in select_ids:
  1124. continue
  1125. recall_item['rand'] = rand_num
  1126. rov_recall_rank.append(recall_item)
  1127. select_ids.add(vid)
  1128. if per_recall_name in exp_recall_dict:
  1129. exp_recall_dict[per_recall_name] +=1
  1130. else:
  1131. exp_recall_dict[per_recall_name] = 1
  1132. index_pos = 1
  1133. break
  1134. #print("rov_recall_rank:", rov_recall_rank)
  1135. if len(rov_recall_rank)<4:
  1136. rov_doudi_rank = region_h_recall_rank + sim_recall + u2i_recall + u2u2i_recall + w2v_recall +return_video_recall+u2i_play_recall+ region_24h_recall_rank + rule_24h_recall_rank + rule_24h_dup_recall_rank
  1137. for recall_item in rov_doudi_rank:
  1138. vid = recall_item['videoId']
  1139. if vid in select_ids:
  1140. continue
  1141. rov_recall_rank.append(recall_item)
  1142. select_ids.add(vid)
  1143. if len(rov_recall_rank)>12:
  1144. break
  1145. # print("rov_recall_rank:")
  1146. #print(rov_recall_rank)
  1147. # 流量池
  1148. flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  1149. # 对各路召回的视频进行去重
  1150. rov_recall_rank, flow_recall_rank = remove_duplicate(rov_recall=rov_recall_rank, flow_recall=flow_recall_rank,
  1151. top_K=top_K)
  1152. # log_.info('remove_duplicate finished! rov_recall_rank = {}, flow_recall_rank = {}'.format(
  1153. # rov_recall_rank, flow_recall_rank))
  1154. # rank_result = relevant_recall_rank
  1155. rank_result = []
  1156. # 从ROV召回池中获取top k
  1157. if len(rov_recall_rank) > 0:
  1158. rank_result.extend(rov_recall_rank[:top_K])
  1159. rov_recall_rank = rov_recall_rank[top_K:]
  1160. else:
  1161. rank_result.extend(flow_recall_rank[:top_K])
  1162. flow_recall_rank = flow_recall_rank[top_K:]
  1163. flow_num = 0
  1164. flowConfig =0
  1165. if exp_config and exp_config['flowConfig']:
  1166. flowConfig = exp_config['flowConfig']
  1167. if flowConfig == 1 and len(rov_recall_rank) > 0:
  1168. rank_result.extend(rov_recall_rank[:top_K])
  1169. for recall_item in rank_result:
  1170. flow_recall_name = recall_item.get("flowPool", '')
  1171. if flow_recall_name is not None and flow_recall_name.find("#") > -1:
  1172. flow_num = flow_num + 1
  1173. all_recall_rank = rov_recall_rank + flow_recall_rank
  1174. if flow_num > 0:
  1175. rank_result.extend(all_recall_rank[:size - top_K])
  1176. return rank_result[:size], flow_num
  1177. else:
  1178. # 按概率 p 及score排序获取 size - k 个视频
  1179. i = 0
  1180. while i < size - top_K:
  1181. # 随机生成[0, 1)浮点数
  1182. rand = random.random()
  1183. # log_.info('rand: {}'.format(rand))
  1184. if rand < flow_pool_P:
  1185. if flow_recall_rank:
  1186. rank_result.append(flow_recall_rank[0])
  1187. flow_recall_rank.remove(flow_recall_rank[0])
  1188. else:
  1189. rank_result.extend(rov_recall_rank[:size - top_K - i])
  1190. return rank_result[:size], flow_num
  1191. else:
  1192. if rov_recall_rank:
  1193. rank_result.append(rov_recall_rank[0])
  1194. rov_recall_rank.remove(rov_recall_rank[0])
  1195. else:
  1196. rank_result.extend(flow_recall_rank[:size - top_K - i])
  1197. return rank_result[:size], flow_num
  1198. i += 1
  1199. else:
  1200. # 按概率 p 及score排序获取 size - k 个视频
  1201. i = 0
  1202. while i < size - top_K:
  1203. # 随机生成[0, 1)浮点数
  1204. rand = random.random()
  1205. # log_.info('rand: {}'.format(rand))
  1206. if rand < flow_pool_P:
  1207. if flow_recall_rank:
  1208. rank_result.append(flow_recall_rank[0])
  1209. flow_recall_rank.remove(flow_recall_rank[0])
  1210. else:
  1211. rank_result.extend(rov_recall_rank[:size - top_K - i])
  1212. return rank_result[:size], flow_num
  1213. else:
  1214. if rov_recall_rank:
  1215. rank_result.append(rov_recall_rank[0])
  1216. rov_recall_rank.remove(rov_recall_rank[0])
  1217. else:
  1218. rank_result.extend(flow_recall_rank[:size - top_K - i])
  1219. return rank_result[:size],flow_num
  1220. i += 1
  1221. return rank_result[:size], flow_num
  1222. if __name__ == '__main__':
  1223. d_test = [{'videoId': 10028734, 'rovScore': 99.977, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1224. {'videoId': 1919925, 'rovScore': 99.974, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1225. {'videoId': 9968118, 'rovScore': 99.972, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1226. {'videoId': 9934863, 'rovScore': 99.971, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1227. {'videoId': 10219869, 'flowPool': '1#1#1#1640830818883', 'rovScore': 82.21929728934731, 'pushFrom': 'flow_pool', 'abCode': 10000},
  1228. {'videoId': 10212814, 'flowPool': '1#1#1#1640759014984', 'rovScore': 81.26694187726412, 'pushFrom': 'flow_pool', 'abCode': 10000},
  1229. {'videoId': 10219437, 'flowPool': '1#1#1#1640827620520', 'rovScore': 81.21634156641908, 'pushFrom': 'flow_pool', 'abCode': 10000},
  1230. {'videoId': 1994050, 'rovScore': 99.97, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1231. {'videoId': 9894474, 'rovScore': 99.969, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1232. {'videoId': 10028081, 'rovScore': 99.966, 'pushFrom': 'recall_pool', 'abCode': 10000}]
  1233. res = video_rank_by_w_h_rate(videos=d_test)
  1234. for tmp in res:
  1235. print(tmp)