video_rank.py 70 KB


  1. import copy
  2. import json
  3. import random
  4. import numpy
  5. from typing import Dict
  6. from typing import Set
  7. from log import Log
  8. from config import set_config
  9. from video_recall import PoolRecall
  10. from db_helper import RedisHelper
  11. from utils import FilterVideos, send_msg_to_feishu
  12. from rank_service import get_featurs, get_tf_serving_sores
  13. from parameter_update import param_update_rule
# Module-level logger; the functions below use it via log_.info(...).
log_ = Log()
# Module-level configuration object; the functions below read attributes such
# as PUSH_FROM, AB_CODE and the Redis key names from it.
config_ = set_config()
  16. def video_rank(data, size, top_K, flow_pool_P, flow_pool_recall_process=None):
  17. """
  18. 视频分发排序
  19. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  20. :param size: 请求数
  21. :param top_K: 保证topK为召回池视频 type-int
  22. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  23. :return: rank_result
  24. """
  25. # add_flow_pool_recall_log
  26. if flow_pool_recall_process is None:
  27. flow_pool_recall_process = {}
  28. if not data['rov_pool_recall'] and not data['flow_pool_recall']:
  29. # add_flow_pool_recall_log
  30. return [], flow_pool_recall_process
  31. # return []
  32. # 将各路召回的视频按照score从大到小排序
  33. # 最惊奇相关推荐相似视频
  34. # relevant_recall = [item for item in data['rov_pool_recall']
  35. # if item.get('pushFrom') == config_.PUSH_FROM['top_video_relevant_appType_19']]
  36. # relevant_recall_rank = sorted(relevant_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  37. # 最惊奇完整影视视频
  38. # whole_movies_recall = [item for item in data['rov_pool_recall']
  39. # if item.get('pushFrom') == config_.PUSH_FROM['whole_movies']]
  40. # whole_movies_recall_rank = sorted(whole_movies_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  41. # 最惊奇影视解说视频
  42. # talk_videos_recall = [item for item in data['rov_pool_recall']
  43. # if item.get('pushFrom') == config_.PUSH_FROM['talk_videos']]
  44. # talk_videos_recall_rank = sorted(talk_videos_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  45. # 小时级更新数据
  46. # h_recall = [item for item in data['rov_pool_recall'] if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_h']]
  47. # h_recall_rank = sorted(h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  48. # 相对30天天级规则更新数据
  49. day_30_recall = [item for item in data['rov_pool_recall']
  50. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_30day']]
  51. day_30_recall_rank = sorted(day_30_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  52. # 地域分组小时级规则更新数据
  53. region_h_recall = [item for item in data['rov_pool_recall']
  54. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_h']]
  55. region_h_recall_rank = sorted(region_h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  56. # 不分地域小时级规则更新数据
  57. rule_h_recall = [item for item in data['rov_pool_recall']
  58. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_h_h']]
  59. rule_h_recall_rank = sorted(rule_h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  60. # 地域分组小时级更新24h规则更新数据
  61. region_24h_recall = [item for item in data['rov_pool_recall']
  62. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_24h']]
  63. region_24h_recall_rank = sorted(region_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  64. # 地域分组天级规则更新数据
  65. # region_day_recall = [item for item in data['rov_pool_recall']
  66. # if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_day']]
  67. # region_day_recall_rank = sorted(region_day_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  68. # 相对24h规则更新数据
  69. rule_24h_recall = [item for item in data['rov_pool_recall']
  70. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h']]
  71. rule_24h_recall_rank = sorted(rule_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  72. # 相对24h规则筛选后剩余更新数据
  73. rule_24h_dup_recall = [item for item in data['rov_pool_recall']
  74. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h_dup']]
  75. rule_24h_dup_recall_rank = sorted(rule_24h_dup_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  76. # 相对48h规则更新数据
  77. rule_48h_recall = [item for item in data['rov_pool_recall']
  78. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_48h']]
  79. rule_48h_recall_rank = sorted(rule_48h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  80. # 相对48h规则筛选后剩余更新数据
  81. rule_48h_dup_recall = [item for item in data['rov_pool_recall']
  82. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_48h_dup']]
  83. rule_48h_dup_recall_rank = sorted(rule_48h_dup_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  84. # 天级规则更新数据
  85. # day_recall = [item for item in data['rov_pool_recall'] if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_day']]
  86. # day_recall_rank = sorted(day_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  87. # ROV召回池
  88. # rov_initial_recall = [
  89. # item for item in data['rov_pool_recall']
  90. # if item.get('pushFrom') not in
  91. # [config_.PUSH_FROM['top_video_relevant_appType_19'],
  92. # config_.PUSH_FROM['rov_recall_h'],
  93. # config_.PUSH_FROM['rov_recall_region_h'],
  94. # config_.PUSH_FROM['rov_recall_region_24h'],
  95. # config_.PUSH_FROM['rov_recall_region_day'],
  96. # config_.PUSH_FROM['rov_recall_24h'],
  97. # config_.PUSH_FROM['rov_recall_24h_dup'],
  98. # config_.PUSH_FROM['rov_recall_48h'],
  99. # config_.PUSH_FROM['rov_recall_48h_dup'],
  100. # config_.PUSH_FROM['rov_recall_day'],
  101. # config_.PUSH_FROM['whole_movies'],
  102. # config_.PUSH_FROM['talk_videos']]
  103. # ]
  104. # rov_initial_recall_rank = sorted(rov_initial_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  105. # rov_recall_rank = whole_movies_recall_rank + talk_videos_recall_rank + h_recall_rank + \
  106. # day_30_recall_rank + region_h_recall_rank + region_24h_recall_rank + \
  107. # region_day_recall_rank + rule_24h_recall_rank + rule_24h_dup_recall_rank + \
  108. # rule_48h_recall_rank + rule_48h_dup_recall_rank + \
  109. # day_recall_rank + rov_initial_recall_rank
  110. rov_recall_rank = day_30_recall_rank + \
  111. region_h_recall_rank + rule_h_recall_rank + region_24h_recall_rank + \
  112. rule_24h_recall_rank + rule_24h_dup_recall_rank + \
  113. rule_48h_recall_rank + rule_48h_dup_recall_rank
  114. # 流量池
  115. flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  116. # 对各路召回的视频进行去重
  117. rov_recall_rank, flow_recall_rank = remove_duplicate(rov_recall=rov_recall_rank, flow_recall=flow_recall_rank,
  118. top_K=top_K)
  119. # log_.info('remove_duplicate finished! rov_recall_rank = {}, flow_recall_rank = {}'.format(
  120. # rov_recall_rank, flow_recall_rank))
  121. # rank_result = relevant_recall_rank
  122. rank_result = []
  123. # add_flow_pool_recall_log
  124. flow_pool_recall_process['recall_duplicate_res'] = {'rov_recall_rank': rov_recall_rank,
  125. 'flow_recall_rank': copy.deepcopy(flow_recall_rank)}
  126. # 从ROV召回池中获取top k
  127. if len(rov_recall_rank) > 0:
  128. rank_result.extend(rov_recall_rank[:top_K])
  129. rov_recall_rank = rov_recall_rank[top_K:]
  130. else:
  131. rank_result.extend(flow_recall_rank[:top_K])
  132. flow_recall_rank = flow_recall_rank[top_K:]
  133. # 按概率 p 及score排序获取 size - k 个视频
  134. i = 0
  135. while i < size - top_K:
  136. # 随机生成[0, 1)浮点数
  137. rand = random.random()
  138. # add_flow_pool_recall_log
  139. flow_pool_recall_process['flow_pool_P'] = flow_pool_P
  140. flow_pool_recall_process[f'{i}_rand'] = rand
  141. # log_.info('rand: {}'.format(rand))
  142. if rand < flow_pool_P:
  143. if flow_recall_rank:
  144. rank_result.append(flow_recall_rank[0])
  145. flow_recall_rank.remove(flow_recall_rank[0])
  146. else:
  147. rank_result.extend(rov_recall_rank[:size - top_K - i])
  148. return rank_result[:size], flow_pool_recall_process
  149. else:
  150. if rov_recall_rank:
  151. rank_result.append(rov_recall_rank[0])
  152. rov_recall_rank.remove(rov_recall_rank[0])
  153. else:
  154. rank_result.extend(flow_recall_rank[:size - top_K - i])
  155. return rank_result[:size], flow_pool_recall_process
  156. i += 1
  157. return rank_result[:size], flow_pool_recall_process
  158. def video_new_rank(videoIds, fast_flow_set, flow_set, size, top_K, flow_pool_P):
  159. """
  160. 视频分发排序
  161. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  162. :param size: 请求数
  163. :param top_K: 保证topK为召回池视频 type-int
  164. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  165. :return: rank_result
  166. """
  167. add_flow_set = set('')
  168. if not videoIds or len(videoIds)==0:
  169. return [], add_flow_set
  170. redisObj = RedisHelper()
  171. vidKeys = []
  172. for vid in videoIds:
  173. vidKeys.append("k_p:"+str(vid))
  174. #print("vidKeys:", vidKeys)
  175. video_scores = redisObj.get_batch_key(vidKeys)
  176. #print(video_scores)
  177. video_items = []
  178. for i in range(len(video_scores)):
  179. try:
  180. #print(video_scores[i])
  181. if video_scores[i] is None:
  182. video_items.append((videoIds[i], 0.0))
  183. else:
  184. video_score_str = json.loads(video_scores[i])
  185. #print("video_score_str:",video_score_str)
  186. video_items.append((videoIds[i], video_score_str[0]))
  187. except Exception:
  188. video_items.append((videoIds[i], 0.0))
  189. sort_items = sorted(video_items, key=lambda k: k[1], reverse=True)
  190. #print("sort_items:", sort_items)
  191. rov_recall_rank = sort_items
  192. fast_flow_recall_rank = []
  193. flow_recall_rank = []
  194. for item in sort_items:
  195. if item[0] in fast_flow_set:
  196. fast_flow_recall_rank.append(item)
  197. elif item[0] in flow_set:
  198. flow_recall_rank.append(item)
  199. # all flow result
  200. all_flow_recall_rank = fast_flow_recall_rank+flow_recall_rank
  201. rank_result = []
  202. rank_set = set('')
  203. # 从ROV召回池中获取top k
  204. if len(rov_recall_rank) > 0:
  205. rank_result.extend(rov_recall_rank[:top_K])
  206. rov_recall_rank = rov_recall_rank[top_K:]
  207. else:
  208. rank_result.extend(all_flow_recall_rank[:top_K])
  209. all_flow_recall_rank = all_flow_recall_rank[top_K:]
  210. for rank_item in rank_result:
  211. rank_set.add(rank_item[0])
  212. #print("rank_result:", rank_result)
  213. # 按概率 p 及score排序获取 size - k 个视频, 第4个位置按概率取流量池
  214. i = 0
  215. left_quato = size - top_K
  216. j = 0
  217. jj = 0
  218. while i < left_quato and (j<len(all_flow_recall_rank) or jj<len(rov_recall_rank)):
  219. # 随机生成[0, 1)浮点数
  220. rand = random.random()
  221. # log_.info('rand: {}'.format(rand))
  222. if rand < flow_pool_P:
  223. for flow_item in all_flow_recall_rank:
  224. j+=1
  225. if flow_item[0] in rank_set:
  226. continue
  227. else:
  228. rank_result.append(flow_item)
  229. rank_set.add(flow_item[0])
  230. add_flow_set.add(flow_item[0])
  231. i += 1
  232. if i>= left_quato:
  233. break
  234. else:
  235. for recall_item in rov_recall_rank:
  236. jj+=1
  237. if recall_item[0] in rank_set:
  238. continue
  239. else:
  240. rank_result.append(recall_item)
  241. rank_set.add(recall_item[0])
  242. i += 1
  243. if i>= left_quato:
  244. break
  245. #print("rank_result:", rank_result)
  246. #print("add_flow_set:", add_flow_set)
  247. return rank_result[:size], add_flow_set
  248. def refactor_video_rank(rov_recall_rank, fast_flow_set, flow_set, size, top_K, flow_pool_P):
  249. """
  250. 视频分发排序
  251. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  252. :param size: 请求数
  253. :param top_K: 保证topK为召回池视频 type-int
  254. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  255. :return: rank_result
  256. """
  257. if not rov_recall_rank or len(rov_recall_rank) == 0:
  258. return []
  259. fast_flow_recall_rank = []
  260. flow_recall_rank = []
  261. for item in rov_recall_rank:
  262. vid = item.get('videoId', 0)
  263. #print(item)
  264. if vid in fast_flow_set:
  265. fast_flow_recall_rank.append(item)
  266. elif vid in flow_set:
  267. flow_recall_rank.append(item)
  268. # all flow result
  269. all_flow_recall_rank = fast_flow_recall_rank + flow_recall_rank
  270. rank_result = []
  271. rank_set = set('')
  272. # 从ROV召回池中获取top k
  273. if len(rov_recall_rank) > 0:
  274. rank_result.extend(rov_recall_rank[:top_K])
  275. rov_recall_rank = rov_recall_rank[top_K:]
  276. else:
  277. rank_result.extend(all_flow_recall_rank[:top_K])
  278. all_flow_recall_rank = all_flow_recall_rank[top_K:]
  279. #已存放了多少VID
  280. for rank_item in rank_result:
  281. rank_set.add(rank_item.get('videoId', 0))
  282. # 按概率 p 及score排序获取 size - k 个视频, 第4个位置按概率取流量池
  283. i = 0
  284. while i < size - top_K:
  285. # 随机生成[0, 1)浮点数
  286. rand = random.random()
  287. # log_.info('rand: {}'.format(rand))
  288. if rand < flow_pool_P:
  289. for flow_item in all_flow_recall_rank:
  290. flow_vid = flow_item.get('videoId', 0)
  291. if flow_vid in rank_set:
  292. continue
  293. else:
  294. rank_result.append(flow_item)
  295. rank_set.add(flow_vid)
  296. else:
  297. for recall_item in rov_recall_rank:
  298. flow_vid = recall_item.get('videoId', 0)
  299. if flow_vid in rank_set:
  300. continue
  301. else:
  302. rank_result.append(recall_item)
  303. rank_set.add(flow_vid)
  304. i += 1
  305. return rank_result[:size]
  306. def remove_duplicate(rov_recall, flow_recall, top_K):
  307. """
  308. 对多路召回的视频去重
  309. 去重原则:
  310. 如果视频在ROV召回池topK,则保留ROV召回池,否则保留流量池
  311. :param rov_recall: ROV召回池-已排序
  312. :param flow_recall: 流量池-已排序
  313. :param top_K: 保证topK为召回池视频 type-int
  314. :return:
  315. """
  316. flow_recall_result = []
  317. rov_recall_remove = []
  318. flow_recall_video_ids = [item['videoId'] for item in flow_recall]
  319. # rov_recall topK
  320. for item in rov_recall[:top_K]:
  321. if item['videoId'] in flow_recall_video_ids:
  322. flow_recall_video_ids.remove(item['videoId'])
  323. # other
  324. for item in rov_recall[top_K:]:
  325. if item['videoId'] in flow_recall_video_ids:
  326. rov_recall_remove.append(item)
  327. # rov recall remove
  328. for item in rov_recall_remove:
  329. rov_recall.remove(item)
  330. # flow recall remove
  331. for item in flow_recall:
  332. if item['videoId'] in flow_recall_video_ids:
  333. flow_recall_result.append(item)
  334. return rov_recall, flow_recall_result
  335. def bottom_strategy(request_id, size, app_type, ab_code, params):
  336. """
  337. 兜底策略: 从ROV召回池中获取top1000,进行状态过滤后的视频
  338. :param request_id: request_id
  339. :param size: 需要获取的视频数
  340. :param app_type: 产品标识 type-int
  341. :param ab_code: abCode
  342. :param params:
  343. :return:
  344. """
  345. pool_recall = PoolRecall(request_id=request_id, app_type=app_type, ab_code=ab_code)
  346. key_name, _ = pool_recall.get_pool_redis_key(pool_type='rov')
  347. redis_helper = RedisHelper(params=params)
  348. data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=1000)
  349. if not data:
  350. log_.info('{} —— ROV推荐进入了二次兜底, data = {}'.format(config_.ENV_TEXT, data))
  351. send_msg_to_feishu('{} —— ROV推荐进入了二次兜底,请查看是否有数据更新失败问题。'.format(config_.ENV_TEXT))
  352. # 二次兜底
  353. bottom_data = bottom_strategy_last(size=size, app_type=app_type, ab_code=ab_code, params=params)
  354. return bottom_data
  355. # 视频状态过滤采用离线定时过滤方案
  356. # 状态过滤
  357. # filter_videos = FilterVideos(app_type=app_type, video_ids=data)
  358. # filtered_data = filter_videos.filter_video_status(video_ids=data)
  359. if len(data) > size:
  360. random_data = numpy.random.choice(data, size, False)
  361. else:
  362. random_data = data
  363. bottom_data = [{'videoId': int(item), 'pushFrom': config_.PUSH_FROM['bottom'], 'abCode': ab_code}
  364. for item in random_data]
  365. return bottom_data
  366. def bottom_strategy_last(size, app_type, ab_code, params):
  367. """
  368. 兜底策略: 从兜底视频中随机获取视频,进行状态过滤后的视频
  369. :param size: 需要获取的视频数
  370. :param app_type: 产品标识 type-int
  371. :param ab_code: abCode
  372. :param params:
  373. :return:
  374. """
  375. redis_helper = RedisHelper(params=params)
  376. bottom_data = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
  377. random_data = numpy.random.choice(bottom_data, size * 30, False)
  378. # 视频状态过滤采用离线定时过滤方案
  379. # 状态过滤
  380. # filter_videos = FilterVideos(app_type=app_type, video_ids=random_data)
  381. # filtered_data = filter_videos.filter_video_status(video_ids=random_data)
  382. bottom_data = [{'videoId': int(video_id), 'pushFrom': config_.PUSH_FROM['bottom_last'], 'abCode': ab_code}
  383. for video_id in random_data[:size]]
  384. return bottom_data
  385. def bottom_strategy2(size, app_type, mid, uid, ab_code, client_info, params):
  386. """
  387. 兜底策略: 从兜底视频中随机获取视频,进行过滤后的视频
  388. :param size: 需要获取的视频数
  389. :param app_type: 产品标识 type-int
  390. :param mid: mid
  391. :param uid: uid
  392. :param ab_code: abCode
  393. :param client_info: 地域信息
  394. :param params:
  395. :return:
  396. """
  397. # 获取存在城市分组数据的城市编码列表
  398. city_code_list = [code for _, code in config_.CITY_CODE.items()]
  399. # 获取provinceCode
  400. province_code = client_info.get('provinceCode', '-1')
  401. # 获取cityCode
  402. city_code = client_info.get('cityCode', '-1')
  403. if city_code in city_code_list:
  404. # 分城市数据存在时,获取城市分组数据
  405. region_code = city_code
  406. else:
  407. region_code = province_code
  408. if region_code == '':
  409. region_code = '-1'
  410. redis_helper = RedisHelper(params=params)
  411. bottom_data = redis_helper.get_data_from_set(key_name=config_.BOTTOM2_KEY_NAME)
  412. bottom_result = []
  413. if bottom_data is None:
  414. return bottom_result
  415. if len(bottom_data) > 0:
  416. try:
  417. random_data = numpy.random.choice(bottom_data, size * 5, False)
  418. except Exception as e:
  419. random_data = bottom_data
  420. video_ids = [int(item) for item in random_data]
  421. # 过滤
  422. filter_ = FilterVideos(request_id=params.request_id, app_type=app_type, mid=mid, uid=uid, video_ids=video_ids)
  423. filtered_data = filter_.filter_videos(pool_type='flow', region_code=region_code)
  424. if filtered_data:
  425. bottom_result = [{'videoId': int(video_id), 'pushFrom': config_.PUSH_FROM['bottom2'], 'abCode': ab_code}
  426. for video_id in filtered_data[:size]]
  427. return bottom_result
  428. def video_rank_by_w_h_rate(videos):
  429. """
  430. 视频宽高比实验(每组的前两个视频调整为横屏视频),根据视频宽高比信息对视频进行重排
  431. :param videos:
  432. :return:
  433. """
  434. redis_helper = RedisHelper()
  435. # ##### 判断前两个视频是否是置顶视频 或者 流量池视频
  436. top_2_push_from_flag = [False, False]
  437. for i, video in enumerate(videos[:2]):
  438. if video['pushFrom'] in [config_.PUSH_FROM['top'], config_.PUSH_FROM['flow_recall']]:
  439. top_2_push_from_flag[i] = True
  440. if top_2_push_from_flag[0] and top_2_push_from_flag[1]:
  441. return videos
  442. # ##### 判断前两个视频是否为横屏
  443. top_2_w_h_rate_flag = [False, False]
  444. for i, video in enumerate(videos[:2]):
  445. if video['pushFrom'] in [config_.PUSH_FROM['top'], config_.PUSH_FROM['flow_recall']]:
  446. # 视频来源为置顶 或 流量池时,不做判断
  447. top_2_w_h_rate_flag[i] = True
  448. elif video['pushFrom'] in [config_.PUSH_FROM['rov_recall'], config_.PUSH_FROM['bottom']]:
  449. # 视频来源为 rov召回池 或 一层兜底时,判断是否是横屏
  450. w_h_rate = redis_helper.get_score_with_value(
  451. key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'], value=video['videoId'])
  452. if w_h_rate is not None:
  453. top_2_w_h_rate_flag[i] = True
  454. elif video['pushFrom'] == config_.PUSH_FROM['bottom_last']:
  455. # 视频来源为 二层兜底时,判断是否是横屏
  456. w_h_rate = redis_helper.get_score_with_value(
  457. key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['bottom_last'], value=video['videoId'])
  458. if w_h_rate is not None:
  459. top_2_w_h_rate_flag[i] = True
  460. if top_2_w_h_rate_flag[0] and top_2_w_h_rate_flag[1]:
  461. return videos
  462. # ##### 前两个视频中有不符合前面两者条件的,对视频进行位置调整
  463. # 记录横屏视频位置
  464. horizontal_video_index = []
  465. # 记录流量池视频位置
  466. flow_video_index = []
  467. # 记录置顶视频位置
  468. top_video_index = []
  469. for i, video in enumerate(videos):
  470. # 视频来源为置顶
  471. if video['pushFrom'] == config_.PUSH_FROM['top']:
  472. top_video_index.append(i)
  473. # 视频来源为流量池
  474. elif video['pushFrom'] == config_.PUSH_FROM['flow_recall']:
  475. flow_video_index.append(i)
  476. # 视频来源为rov召回池 或 一层兜底
  477. elif video['pushFrom'] in [config_.PUSH_FROM['rov_recall'], config_.PUSH_FROM['bottom']]:
  478. w_h_rate = redis_helper.get_score_with_value(
  479. key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'], value=video['videoId'])
  480. if w_h_rate is not None:
  481. horizontal_video_index.append(i)
  482. else:
  483. continue
  484. # 视频来源为 二层兜底
  485. elif video['pushFrom'] == config_.PUSH_FROM['bottom_last']:
  486. w_h_rate = redis_helper.get_score_with_value(
  487. key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['bottom_last'], value=video['videoId'])
  488. if w_h_rate is not None:
  489. horizontal_video_index.append(i)
  490. else:
  491. continue
  492. # 重新排序
  493. top2_index = []
  494. for i in range(2):
  495. if i in top_video_index:
  496. top2_index.append(i)
  497. elif i in flow_video_index:
  498. top2_index.append(i)
  499. flow_video_index.remove(i)
  500. elif i in horizontal_video_index:
  501. top2_index.append(i)
  502. horizontal_video_index.remove(i)
  503. elif len(horizontal_video_index) > 0:
  504. # 调整横屏视频到第一位
  505. top2_index.append(horizontal_video_index[0])
  506. # 从横屏位置记录中移除
  507. horizontal_video_index.pop(0)
  508. elif i == 0:
  509. return videos
  510. # 重排
  511. flow_result = [videos[i] for i in flow_video_index]
  512. other_result = [videos[i] for i in range(len(videos)) if i not in top2_index and i not in flow_video_index]
  513. top2_result = []
  514. for i, j in enumerate(top2_index):
  515. item = videos[j]
  516. if i != j:
  517. # 修改abCode
  518. item['abCode'] = config_.AB_CODE['w_h_rate']
  519. top2_result.append(item)
  520. new_rank_result = top2_result
  521. for i in range(len(top2_index), len(videos)):
  522. if i in flow_video_index:
  523. new_rank_result.append(flow_result[0])
  524. flow_result.pop(0)
  525. else:
  526. new_rank_result.append(other_result[0])
  527. other_result.pop(0)
  528. return new_rank_result
  529. def video_rank_with_old_video(rank_result, old_video_recall, size, top_K, old_video_index=2):
  530. """
  531. 视频分发排序 - 包含老视频, 老视频插入固定位置
  532. :param rank_result: 排序后的结果
  533. :param size: 请求数
  534. :param old_video_index: 老视频插入的位置索引,默认为2
  535. :return: new_rank_result
  536. """
  537. if not old_video_recall:
  538. return rank_result
  539. if not rank_result:
  540. return old_video_recall[:size]
  541. # 视频去重
  542. rank_video_ids = [item['videoId'] for item in rank_result]
  543. old_video_remove = []
  544. for old_video in old_video_recall:
  545. if old_video['videoId'] in rank_video_ids:
  546. old_video_remove.append(old_video)
  547. for item in old_video_remove:
  548. old_video_recall.remove(item)
  549. if not old_video_recall:
  550. return rank_result
  551. # 插入老视频
  552. # 随机获取一个视频
  553. ind = random.randint(0, len(old_video_recall) - 1)
  554. old_video = old_video_recall[ind]
  555. # 插入
  556. if len(rank_result) < top_K:
  557. new_rank_result = rank_result + [old_video]
  558. else:
  559. new_rank_result = rank_result[:old_video_index] + [old_video] + rank_result[old_video_index:]
  560. if len(new_rank_result) > size:
  561. # 判断后两位视频来源
  562. push_from_1 = new_rank_result[-1]['pushFrom']
  563. push_from_2 = new_rank_result[-2]['pushFrom']
  564. if push_from_2 == config_.PUSH_FROM['rov_recall'] and push_from_1 == config_.PUSH_FROM['flow_recall']:
  565. new_rank_result = new_rank_result[:-2] + new_rank_result[-1:]
  566. return new_rank_result[:size]
  567. def video_new_rank2(data, size, top_K, flow_pool_P, ab_code, mid, exp_config=None, env_dict=None):
  568. """
  569. 视频分发排序
  570. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  571. :param size: 请求数
  572. :param top_K: 保证topK为召回池视频 type-int
  573. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  574. :return: rank_result
  575. """
  576. if not data['rov_pool_recall'] and not data['flow_pool_recall']:
  577. return [], 0
  578. #全量的是vlog,票圈精选, 334,60057,
  579. # 60054: simrecall,
  580. pre_str = "k_p2:"
  581. rov_recall_rank = data['rov_pool_recall']
  582. #print(rov_recall_rank)
  583. #call rank service
  584. #flag_call_service = 0
  585. sort_index = 0
  586. if exp_config and "sort_flag" in exp_config:
  587. sort_index = exp_config["sort_flag"]
  588. #print("sort_index:", sort_index)
  589. redisObj = RedisHelper()
  590. vidKeys = []
  591. rec_recall_item_list = []
  592. rec_recall_vid_list = []
  593. day_vidKeys = []
  594. hour_vidKeys = []
  595. pre_day_str = "v_ctr:"
  596. pre_hour_str = "v_hour_ctr:"
  597. for recall_item in data['rov_pool_recall']:
  598. try:
  599. vid = int(recall_item.get("videoId", 0))
  600. rec_recall_vid_list.append(vid)
  601. rec_recall_item_list.append(recall_item)
  602. vidKeys.append(pre_str + str(vid))
  603. day_vidKeys.append(pre_day_str+str(vid))
  604. hour_vidKeys.append(pre_hour_str+str(vid))
  605. except:
  606. continue
  607. video_scores = redisObj.get_batch_key(vidKeys)
  608. #print("video_scores:", video_scores)
  609. if (ab_code == 60066 or ab_code == 60069 or ab_code == 60070 or ab_code == 60071) and len(rec_recall_vid_list)>0:
  610. video_static_info = redisObj.get_batch_key(day_vidKeys)
  611. video_hour_static_info = redisObj.get_batch_key(hour_vidKeys)
  612. #print("env_dict:", env_dict)
  613. feature_dict = get_featurs(mid, data, size, top_K, flow_pool_P, rec_recall_vid_list,env_dict, video_static_info, video_hour_static_info)
  614. score_result = get_tf_serving_sores(feature_dict)
  615. #print("score_result:", score_result)
  616. if video_scores and len(video_scores)>0 and rec_recall_item_list and score_result and len(score_result) > 0\
  617. and len(score_result) == len(rec_recall_item_list) and len(video_scores)== len(score_result):
  618. for i in range(len(score_result)):
  619. try:
  620. if video_scores[i] is None and len(score_result[i])>0:
  621. return_score = 0.000000001
  622. # sore_index :10 = model score
  623. if sort_index == 10:
  624. total_score = score_result[i][0]
  625. else:
  626. total_score = return_score * score_result[i][0]
  627. rec_recall_item_list[i]['sort_score'] = total_score
  628. rec_recall_item_list[i]['base_rov_score'] = 0.0
  629. rec_recall_item_list[i]['share_score'] = return_score
  630. rec_recall_item_list[i]['model_score'] = score_result[i][0]
  631. else:
  632. video_score_str = json.loads(video_scores[i])
  633. # sore_index :10 = model score
  634. return_score = 0.000000001
  635. if sort_index == 10:
  636. total_score = score_result[i][0]
  637. else:
  638. if len(video_score_str)>= sort_index and len(video_score_str)>0:
  639. return_score = video_score_str[sort_index]
  640. total_score = return_score * score_result[i][0]
  641. #print("total_score:", total_score, " model score :", score_result[i][0], "return_score:",
  642. # return_score)
  643. rec_recall_item_list[i]['sort_score'] = total_score
  644. rec_recall_item_list[i]['base_rov_score'] = video_score_str[0]
  645. rec_recall_item_list[i]['share_score'] = return_score
  646. rec_recall_item_list[i]['model_score'] = score_result[i][0]
  647. except Exception as e:
  648. #print('exception: {}:', e)
  649. return_score = 0.000000001
  650. if sort_index == 10:
  651. total_score = 0.00000001
  652. else:
  653. total_score = return_score * 0.00000001
  654. rec_recall_item_list[i]['sort_score'] = total_score
  655. rec_recall_item_list[i]['base_rov_score'] = 0
  656. rec_recall_item_list[i]['share_score'] = return_score
  657. rec_recall_item_list[i]['model_score'] = 0.00000001
  658. rec_recall_item_list[i]['flag_call_service'] = 1
  659. rov_recall_rank = sorted(rec_recall_item_list, key=lambda k: k.get('sort_score', 0), reverse=True)
  660. else:
  661. rov_recall_rank = sup_rank(video_scores, rec_recall_item_list)
  662. else:
  663. if video_scores and len(rec_recall_item_list) > 0 and len(video_scores)>0:
  664. for i in range(len(video_scores)):
  665. try:
  666. if video_scores[i] is None:
  667. rec_recall_item_list[i]['sort_score'] = 0.0
  668. else:
  669. video_score_str = json.loads(video_scores[i])
  670. # print("video_score_str:", video_score_str)
  671. rec_recall_item_list[i]['sort_score'] = video_score_str[0]
  672. except Exception:
  673. rec_recall_item_list[i]['sort_score'] = 0.0
  674. rov_recall_rank = sorted(rec_recall_item_list, key=lambda k: k.get('sort_score', 0), reverse=True)
  675. #print(rov_recall_rank)
  676. flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  677. rov_recall_rank, flow_recall_rank = remove_duplicate(rov_recall=rov_recall_rank, flow_recall=flow_recall_rank,
  678. top_K=top_K)
  679. rank_result = []
  680. rank_set = set('')
  681. # 从ROV召回池中获取top k
  682. if len(rov_recall_rank) > 0:
  683. rank_result.extend(rov_recall_rank[:top_K])
  684. rov_recall_rank = rov_recall_rank[top_K:]
  685. else:
  686. rank_result.extend(flow_recall_rank[:top_K])
  687. flow_recall_rank = flow_recall_rank[top_K:]
  688. # 按概率 p 及score排序获取 size - k 个视频
  689. flow_num = 0
  690. flowConfig = 0
  691. # 本段代码控制流量池,通过实验传参,现不动
  692. if flowConfig == 1 and len(rov_recall_rank) > 0:
  693. for recall_item in rank_result:
  694. flow_recall_name = recall_item.get("flowPool", '')
  695. flow_num = flow_num + 1
  696. all_recall_rank = rov_recall_rank + flow_recall_rank
  697. if flow_num > 0:
  698. rank_result.extend(all_recall_rank[:size - top_K])
  699. return rank_result, flow_num
  700. else:
  701. i = 0
  702. while i < size - top_K:
  703. # 随机生成[0, 1)浮点数
  704. rand = random.random()
  705. # log_.info('rand: {}'.format(rand))
  706. if rand < flow_pool_P:
  707. if flow_recall_rank:
  708. rank_result.append(flow_recall_rank[0])
  709. flow_recall_rank.remove(flow_recall_rank[0])
  710. else:
  711. rank_result.extend(rov_recall_rank[:size - top_K - i])
  712. return rank_result[:size], flow_num
  713. else:
  714. if rov_recall_rank:
  715. rank_result.append(rov_recall_rank[0])
  716. rov_recall_rank.remove(rov_recall_rank[0])
  717. else:
  718. rank_result.extend(flow_recall_rank[:size - top_K - i])
  719. return rank_result[:size], flow_num
  720. i += 1
  721. else:
  722. i = 0
  723. while i < size - top_K:
  724. # 随机生成[0, 1)浮点数
  725. rand = random.random()
  726. # log_.info('rand: {}'.format(rand))
  727. if rand < flow_pool_P:
  728. if flow_recall_rank:
  729. rank_result.append(flow_recall_rank[0])
  730. flow_recall_rank.remove(flow_recall_rank[0])
  731. else:
  732. rank_result.extend(rov_recall_rank[:size - top_K - i])
  733. return rank_result[:size], flow_num
  734. else:
  735. if rov_recall_rank:
  736. rank_result.append(rov_recall_rank[0])
  737. rov_recall_rank.remove(rov_recall_rank[0])
  738. else:
  739. rank_result.extend(flow_recall_rank[:size - top_K - i])
  740. return rank_result[:size], flow_num
  741. i += 1
  742. return rank_result[:size], flow_num
  743. def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1:', flow_pool_recall_process=None):
  744. """
  745. 视频分发排序
  746. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  747. :param size: 请求数
  748. :param top_K: 保证topK为召回池视频 type-int
  749. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  750. :param rank_key_prefix:
  751. :return: rank_result
  752. """
  753. redis_helper = RedisHelper()
  754. if flow_pool_recall_process is None:
  755. flow_pool_recall_process = {}
  756. if not data['rov_pool_recall'] and not data['flow_pool_recall']:
  757. return [], 0, flow_pool_recall_process
  758. rov_recall_rank = data['rov_pool_recall']
  759. vid_keys = []
  760. rec_recall_item_list = []
  761. rec_recall_vid_list = []
  762. for recall_item in data['rov_pool_recall']:
  763. try:
  764. vid = int(recall_item.get("videoId", 0))
  765. rec_recall_vid_list.append(vid)
  766. rec_recall_item_list.append(recall_item)
  767. vid_keys.append(f"{rank_key_prefix}{vid}")
  768. except:
  769. continue
  770. video_scores = redis_helper.get_batch_key(vid_keys)
  771. if video_scores and len(rec_recall_item_list) > 0 and len(rec_recall_item_list) == len(video_scores):
  772. for i in range(len(video_scores)):
  773. try:
  774. if video_scores[i] is None:
  775. rec_recall_item_list[i]['sort_score'] = 0.0
  776. else:
  777. rec_recall_item_list[i]['sort_score'] = float(video_scores[i])
  778. except Exception:
  779. rec_recall_item_list[i]['sort_score'] = 0.0
  780. rov_recall_rank = sorted(rec_recall_item_list, key=lambda k: k.get('sort_score', 0), reverse=True)
  781. flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  782. rov_recall_rank, flow_recall_rank = remove_duplicate(
  783. rov_recall=rov_recall_rank, flow_recall=flow_recall_rank, top_K=top_K
  784. )
  785. rank_result = []
  786. flow_pool_recall_process['recall_duplicate_res'] = {'rov_recall_rank': rov_recall_rank,
  787. 'flow_recall_rank': copy.deepcopy(flow_recall_rank)}
  788. # 从ROV召回池中获取top k
  789. if len(rov_recall_rank) > 0:
  790. rank_result.extend(rov_recall_rank[:top_K])
  791. rov_recall_rank = rov_recall_rank[top_K:]
  792. else:
  793. rank_result.extend(flow_recall_rank[:top_K])
  794. flow_recall_rank = flow_recall_rank[top_K:]
  795. # 按概率 p 及score排序获取 size - k 个视频
  796. flow_num = 0
  797. i = 0
  798. while i < size - top_K:
  799. # 随机生成[0, 1)浮点数
  800. rand = random.random()
  801. flow_pool_recall_process['flow_pool_P'] = flow_pool_P
  802. flow_pool_recall_process[f'{i}_rand'] = rand
  803. if rand < flow_pool_P:
  804. if flow_recall_rank:
  805. rank_result.append(flow_recall_rank[0])
  806. flow_recall_rank.remove(flow_recall_rank[0])
  807. else:
  808. rank_result.extend(rov_recall_rank[:size - top_K - i])
  809. return rank_result[:size], flow_num, flow_pool_recall_process
  810. else:
  811. if rov_recall_rank:
  812. rank_result.append(rov_recall_rank[0])
  813. rov_recall_rank.remove(rov_recall_rank[0])
  814. else:
  815. rank_result.extend(flow_recall_rank[:size - top_K - i])
  816. return rank_result[:size], flow_num, flow_pool_recall_process
  817. i += 1
  818. return rank_result[:size], flow_num, flow_pool_recall_process
  819. def video_new_rank3_4density(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1:',
  820. flow_pool_recall_process=None,
  821. app_type=None, rule_key_str = None):
  822. """
  823. 视频分发排序
  824. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  825. :param size: 请求数
  826. :param top_K: 保证topK为召回池视频 type-int
  827. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  828. :param rank_key_prefix:
  829. :return: rank_result
  830. """
  831. # 1 读取多样性密度控制规则 todo 没有判断非字典数据,有报错风险,先用try兜底。
  832. redis_helper = RedisHelper()
  833. density_rules = {}
  834. rules_all = param_update_rule(redis_helper, rule_key_str)
  835. try:
  836. if len(rules_all) != 0:
  837. for k, v in rules_all.items():
  838. if str(app_type) in v.keys():
  839. app_type_rule = v[str(app_type)]
  840. if "density" in app_type_rule.keys():
  841. density_rules[k] = app_type_rule["density"]
  842. except Exception as e:
  843. log_.error("something is wrong in parsing density_rules:{}".format(e))
  844. density_rules = {}
  845. if flow_pool_recall_process is None:
  846. flow_pool_recall_process = {}
  847. if not data['rov_pool_recall'] and not data['flow_pool_recall']:
  848. return [], 0, flow_pool_recall_process
  849. rov_recall_rank = data['rov_pool_recall']
  850. vid_keys = []
  851. rec_recall_item_list = []
  852. rec_recall_vid_list = []
  853. for recall_item in data['rov_pool_recall']:
  854. try:
  855. vid = int(recall_item.get("videoId", 0))
  856. rec_recall_vid_list.append(vid)
  857. rec_recall_item_list.append(recall_item)
  858. vid_keys.append(f"{rank_key_prefix}{vid}")
  859. except:
  860. continue
  861. video_scores = redis_helper.get_batch_key(vid_keys)
  862. if video_scores and len(rec_recall_item_list) > 0 and len(rec_recall_item_list) == len(video_scores):
  863. for i in range(len(video_scores)):
  864. try:
  865. if video_scores[i] is None:
  866. rec_recall_item_list[i]['sort_score'] = 0.0
  867. else:
  868. rec_recall_item_list[i]['sort_score'] = float(video_scores[i])
  869. except Exception:
  870. rec_recall_item_list[i]['sort_score'] = 0.0
  871. rov_recall_rank = sorted(rec_recall_item_list, key=lambda k: k.get('sort_score', 0), reverse=True)
  872. flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  873. rov_recall_rank, flow_recall_rank = remove_duplicate(
  874. rov_recall=rov_recall_rank, flow_recall=flow_recall_rank, top_K=top_K
  875. )
  876. # 2 多样性需求,给video添加tag 无规则时 不需要取tag
  877. if len(density_rules) != 0:
  878. video_ids = []
  879. video_ids.extend([v["videoId"] for v in rov_recall_rank])
  880. video_ids.extend([v["videoId"] for v in flow_recall_rank])
  881. video_ids = list(set(video_ids))
  882. video_tag_dict = get_video_tags(redis_helper, video_ids)
  883. for v in rov_recall_rank:
  884. v["tags"] = video_tag_dict.get(v["videoId"], [])
  885. for v in flow_recall_rank:
  886. v["tags"] = video_tag_dict.get(v["videoId"], [])
  887. rank_result = []
  888. flow_pool_recall_process['recall_duplicate_res'] = {'rov_recall_rank': rov_recall_rank,
  889. 'flow_recall_rank': copy.deepcopy(flow_recall_rank)}
  890. # 从ROV召回池中获取top k
  891. if len(rov_recall_rank) > 0:
  892. rank_result.extend(rov_recall_rank[:top_K])
  893. rov_recall_rank = rov_recall_rank[top_K:]
  894. else:
  895. rank_result.extend(flow_recall_rank[:top_K])
  896. flow_recall_rank = flow_recall_rank[top_K:]
  897. # 按概率 p 及score排序获取 size - k 个视频
  898. flow_num = 0
  899. i = 0
  900. # print("zb-flow_pool_recall_process:" + str(flow_pool_recall_process))
  901. # print("zb-density_rules:" + str(density_rules))
  902. # print("zb-2:" + str([i for i in rov_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]]))
  903. # print("zb-3:" + str([i for i in flow_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]]))
  904. while i < size - top_K:
  905. # 随机生成[0, 1)浮点数
  906. rand = random.random()
  907. flow_pool_recall_process['flow_pool_P'] = flow_pool_P
  908. flow_pool_recall_process[f'{i}_rand'] = rand
  909. if rand < flow_pool_P:
  910. if flow_recall_rank:
  911. rank_result.append(flow_recall_rank[0])
  912. flow_recall_rank.remove(flow_recall_rank[0])
  913. else:
  914. rank_result.extend(rov_recall_rank[:size - top_K - i])
  915. # todo zhangbo rank
  916. rov_recall_rank_new = [i for i in rov_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]]
  917. flow_recall_rank_new = [i for i in flow_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]]
  918. result = merge_density_control(
  919. rank_result[:size],
  920. rov_recall_rank_new,
  921. flow_recall_rank_new,
  922. density_rules
  923. )
  924. return result[:size], flow_num, flow_pool_recall_process
  925. else:
  926. if rov_recall_rank:
  927. rank_result.append(rov_recall_rank[0])
  928. rov_recall_rank.remove(rov_recall_rank[0])
  929. else:
  930. rank_result.extend(flow_recall_rank[:size - top_K - i])
  931. # todo zhangbo rank
  932. rov_recall_rank_new = [i for i in rov_recall_rank if
  933. i["videoId"] not in [j["videoId"] for j in rank_result[:size]]]
  934. flow_recall_rank_new = [i for i in flow_recall_rank if
  935. i["videoId"] not in [j["videoId"] for j in rank_result[:size]]]
  936. result = merge_density_control(
  937. rank_result[:size],
  938. rov_recall_rank_new,
  939. flow_recall_rank_new,
  940. density_rules
  941. )
  942. return result[:size], flow_num, flow_pool_recall_process
  943. i += 1
  944. # todo zhangbo rank
  945. rov_recall_rank_new = [i for i in rov_recall_rank if i["videoId"] not in [j["videoId"] for j in rank_result[:size]]]
  946. flow_recall_rank_new = [i for i in flow_recall_rank if
  947. i["videoId"] not in [j["videoId"] for j in rank_result[:size]]]
  948. result = merge_density_control(
  949. rank_result[:size],
  950. rov_recall_rank_new,
  951. flow_recall_rank_new,
  952. density_rules
  953. )
  954. return result[:size], flow_num, flow_pool_recall_process
  955. # 排序服务兜底
  956. def sup_rank(video_scores, recall_list):
  957. if video_scores and len(recall_list) > 0:
  958. for i in range(len(video_scores)):
  959. try:
  960. if video_scores[i] is None:
  961. recall_list[i]['sort_score'] = 0.0
  962. else:
  963. video_score_str = json.loads(video_scores[i])
  964. recall_list[i]['flag_call_service'] = 0
  965. recall_list[i]['sort_score'] = video_score_str[0]
  966. except Exception:
  967. recall_list[i]['sort_score'] = 0.0
  968. rov_recall_rank = sorted(recall_list, key=lambda k: k.get('sort_score', 0), reverse=True)
  969. #print("rov_recall_rank:", rov_recall_rank)
  970. else:
  971. rov_recall_rank = recall_list
  972. return rov_recall_rank
  973. def video_sanke_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None):
  974. """
  975. 视频分发排序
  976. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  977. :param size: 请求数
  978. :param top_K: 保证topK为召回池视频 type-int
  979. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  980. :return: rank_result
  981. """
  982. if not data['rov_pool_recall'] and not data['flow_pool_recall'] \
  983. and len(data['u2i_recall'])==0 and len(data['w2v_recall'])==0 \
  984. and len(data['sim_recall']) == 0 and len(data['u2u2i_recall']) == 0 :
  985. return [], 0
  986. # 地域分组小时级规则更新数据
  987. recall_dict = {}
  988. region_h_recall = [item for item in data['rov_pool_recall']
  989. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_h']]
  990. region_h_recall_rank = sorted(region_h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  991. recall_dict['rov_recall_region_h'] = region_h_recall_rank
  992. # 地域分组小时级更新24h规则更新数据
  993. region_24h_recall = [item for item in data['rov_pool_recall']
  994. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_24h']]
  995. region_24h_recall_rank = sorted(region_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  996. recall_dict['rov_recall_region_24h'] = region_24h_recall_rank
  997. # 相对24h规则更新数据
  998. rule_24h_recall = [item for item in data['rov_pool_recall']
  999. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h']]
  1000. rule_24h_recall_rank = sorted(rule_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  1001. recall_dict['rov_recall_24h'] = rule_24h_recall_rank
  1002. # 相对24h规则筛选后剩余更新数据
  1003. rule_24h_dup_recall = [item for item in data['rov_pool_recall']
  1004. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h_dup']]
  1005. rule_24h_dup_recall_rank = sorted(rule_24h_dup_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  1006. recall_dict['rov_recall_24h_dup'] = rule_24h_dup_recall_rank
  1007. hot_recall = []
  1008. w2v_recall =[]
  1009. sim_recall = []
  1010. u2u2i_recall = []
  1011. if ab_Code==60058:
  1012. if len(data['u2i_recall'])>0:
  1013. hot_recall = sorted(data['u2i_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  1014. recall_dict['u2i_recall'] = hot_recall
  1015. elif ab_Code==60059:
  1016. if len(data['w2v_recall'])>0:
  1017. recall_dict['w2v_recall'] = data['w2v_recall']
  1018. else:
  1019. recall_dict['w2v_recall'] = w2v_recall
  1020. elif ab_Code==60061 or ab_Code==60063:
  1021. if len(data['sim_recall'])>0:
  1022. recall_dict['sim_recall'] = data['sim_recall']
  1023. else:
  1024. recall_dict['sim_recall'] = sim_recall
  1025. elif ab_Code==60062:
  1026. if len(data['u2u2i_recall'])>0:
  1027. recall_dict['u2u2i_recall'] = data['u2u2i_recall']
  1028. else:
  1029. recall_dict['u2u2i_recall'] = u2u2i_recall
  1030. recall_list = [('rov_recall_region_h',1, 1),('rov_recall_region_h',0.5, 1),('rov_recall_region_24h',1,1),
  1031. ('u2i_recall',0.5,1), ('w2v_recall',0.5,1),('rov_recall_24h',1,1), ('rov_recall_24h_dup',0.5,1)]
  1032. if exp_config and exp_config['recall_list']:
  1033. recall_list = exp_config['recall_list']
  1034. #print("recall_config:", recall_list)
  1035. rov_recall_rank = []
  1036. select_ids = set('')
  1037. for i in range(3):
  1038. if len(rov_recall_rank)>8:
  1039. break
  1040. for per_recall_item in recall_list:
  1041. per_recall_name = per_recall_item[0]
  1042. per_recall_freq = per_recall_item[1]
  1043. per_limt_num = per_recall_item[2]
  1044. rand_num = random.random()
  1045. #print(recall_dict[per_recall_name])
  1046. if rand_num<per_recall_freq and per_recall_name in recall_dict:
  1047. per_recall = recall_dict[per_recall_name]
  1048. #print("per_recall_item:", per_recall_item)
  1049. cur_recall_num = 0
  1050. for recall_item in per_recall:
  1051. vid = recall_item['videoId']
  1052. if vid in select_ids:
  1053. continue
  1054. rov_recall_rank.append(recall_item)
  1055. select_ids.add(vid)
  1056. cur_recall_num+=1
  1057. if cur_recall_num>=per_limt_num:
  1058. break
  1059. # print("rov_recall_rank:")
  1060. # print(rov_recall_rank)
  1061. #rov_recall_rank = region_h_recall_rank + region_24h_recall_rank + \
  1062. # rule_24h_recall_rank + rule_24h_dup_recall_rank
  1063. # 流量池
  1064. flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  1065. # 对各路召回的视频进行去重
  1066. rov_recall_rank, flow_recall_rank = remove_duplicate(rov_recall=rov_recall_rank, flow_recall=flow_recall_rank,
  1067. top_K=top_K)
  1068. # log_.info('remove_duplicate finished! rov_recall_rank = {}, flow_recall_rank = {}'.format(
  1069. # rov_recall_rank, flow_recall_rank))
  1070. # rank_result = relevant_recall_rank
  1071. rank_result = []
  1072. # 从ROV召回池中获取top k
  1073. if len(rov_recall_rank) > 0:
  1074. rank_result.extend(rov_recall_rank[:top_K])
  1075. rov_recall_rank = rov_recall_rank[top_K:]
  1076. else:
  1077. rank_result.extend(flow_recall_rank[:top_K])
  1078. flow_recall_rank = flow_recall_rank[top_K:]
  1079. flow_num = 0
  1080. flowConfig =0
  1081. if exp_config and exp_config['flowConfig']:
  1082. flowConfig = exp_config['flowConfig']
  1083. if flowConfig == 1 and len(rov_recall_rank) > 0:
  1084. rank_result.extend(rov_recall_rank[:top_K])
  1085. for recall_item in rank_result:
  1086. flow_recall_name = recall_item.get("flowPool", '')
  1087. if flow_recall_name is not None and flow_recall_name.find("#") > -1:
  1088. flow_num = flow_num + 1
  1089. all_recall_rank = rov_recall_rank + flow_recall_rank
  1090. if flow_num > 0:
  1091. rank_result.extend(all_recall_rank[:size - top_K])
  1092. return rank_result[:size], flow_num
  1093. else:
  1094. # 按概率 p 及score排序获取 size - k 个视频
  1095. i = 0
  1096. while i < size - top_K:
  1097. # 随机生成[0, 1)浮点数
  1098. rand = random.random()
  1099. # log_.info('rand: {}'.format(rand))
  1100. if rand < flow_pool_P:
  1101. if flow_recall_rank:
  1102. rank_result.append(flow_recall_rank[0])
  1103. flow_recall_rank.remove(flow_recall_rank[0])
  1104. else:
  1105. rank_result.extend(rov_recall_rank[:size - top_K - i])
  1106. return rank_result[:size], flow_num
  1107. else:
  1108. if rov_recall_rank:
  1109. rank_result.append(rov_recall_rank[0])
  1110. rov_recall_rank.remove(rov_recall_rank[0])
  1111. else:
  1112. rank_result.extend(flow_recall_rank[:size - top_K - i])
  1113. return rank_result[:size], flow_num
  1114. i += 1
  1115. else:
  1116. # 按概率 p 及score排序获取 size - k 个视频
  1117. i = 0
  1118. while i < size - top_K:
  1119. # 随机生成[0, 1)浮点数
  1120. rand = random.random()
  1121. # log_.info('rand: {}'.format(rand))
  1122. if rand < flow_pool_P:
  1123. if flow_recall_rank:
  1124. rank_result.append(flow_recall_rank[0])
  1125. flow_recall_rank.remove(flow_recall_rank[0])
  1126. else:
  1127. rank_result.extend(rov_recall_rank[:size - top_K - i])
  1128. return rank_result[:size], flow_num
  1129. else:
  1130. if rov_recall_rank:
  1131. rank_result.append(rov_recall_rank[0])
  1132. rov_recall_rank.remove(rov_recall_rank[0])
  1133. else:
  1134. rank_result.extend(flow_recall_rank[:size - top_K - i])
  1135. return rank_result[:size],flow_num
  1136. i += 1
  1137. return rank_result[:size], flow_num
  1138. def video_sank_pos_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None):
  1139. """
  1140. 视频分发排序
  1141. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  1142. :param size: 请求数
  1143. :param top_K: 保证topK为召回池视频 type-int
  1144. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  1145. :return: rank_result
  1146. """
  1147. if not data['rov_pool_recall'] and not data['flow_pool_recall'] \
  1148. and len(data['u2i_recall'])==0 and len(data['w2v_recall'])==0 \
  1149. and len(data['sim_recall']) == 0 and len(data['u2u2i_recall']) == 0 :
  1150. return [], 0
  1151. # 地域分组小时级规则更新数据
  1152. recall_dict = {}
  1153. region_h_recall = [item for item in data['rov_pool_recall']
  1154. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_h']]
  1155. region_h_recall_rank = sorted(region_h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  1156. recall_dict['rov_recall_region_h'] = region_h_recall_rank
  1157. # 地域分组小时级更新24h规则更新数据
  1158. region_24h_recall = [item for item in data['rov_pool_recall']
  1159. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_24h']]
  1160. region_24h_recall_rank = sorted(region_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  1161. recall_dict['rov_recall_region_24h'] = region_24h_recall_rank
  1162. # 相对24h规则更新数据
  1163. rule_24h_recall = [item for item in data['rov_pool_recall']
  1164. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h']]
  1165. rule_24h_recall_rank = sorted(rule_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  1166. recall_dict['rov_recall_24h'] = rule_24h_recall_rank
  1167. # 相对24h规则筛选后剩余更新数据
  1168. rule_24h_dup_recall = [item for item in data['rov_pool_recall']
  1169. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h_dup']]
  1170. rule_24h_dup_recall_rank = sorted(rule_24h_dup_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  1171. recall_dict['rov_recall_24h_dup'] = rule_24h_dup_recall_rank
  1172. u2i_recall = []
  1173. u2i_play_recall = []
  1174. w2v_recall =[]
  1175. sim_recall = []
  1176. u2u2i_recall = []
  1177. return_video_recall = []
  1178. #print("")
  1179. if ab_Code==60058:
  1180. if len(data['u2i_recall'])>0:
  1181. recall_dict['u2i_recall'] = data['u2i_recall']
  1182. else:
  1183. recall_dict['u2i_recall'] = u2i_recall
  1184. if len(data['u2i_play_recall']) > 0:
  1185. recall_dict['u2i_play_recall'] = data['u2i_play_recall']
  1186. else:
  1187. recall_dict['u2i_play_recall'] = u2i_play_recall
  1188. elif ab_Code==60059:
  1189. if len(data['w2v_recall'])>0:
  1190. recall_dict['w2v_recall'] = data['w2v_recall']
  1191. else:
  1192. recall_dict['w2v_recall'] = w2v_recall
  1193. elif ab_Code==60061 or ab_Code==60063:
  1194. if len(data['sim_recall'])>0:
  1195. recall_dict['sim_recall'] = data['sim_recall']
  1196. else:
  1197. recall_dict['sim_recall'] = sim_recall
  1198. elif ab_Code==60062:
  1199. if len(data['u2u2i_recall'])>0:
  1200. recall_dict['u2u2i_recall'] = data['u2u2i_recall']
  1201. else:
  1202. recall_dict['u2u2i_recall'] = u2u2i_recall
  1203. elif ab_Code==60064:
  1204. if len(data['return_video_recall'])>0:
  1205. recall_dict['return_video_recall'] = data['return_video_recall']
  1206. else:
  1207. recall_dict['return_video_recall'] = return_video_recall
  1208. recall_pos1 = [('rov_recall_region_h',0, 0.98),('rov_recall_24h',0.98, 1),('rov_recall_region_24h',0,1),
  1209. ('rov_recall_24h',0,1), ('rov_recall_24h_dup',0,1)]
  1210. recall_pos2 = [('rov_recall_region_h',0,0.98),('rov_recall_24h',0.98,1),('rov_recall_region_24h',0,1),
  1211. ('rov_recall_24h',0,1),('rov_recall_24h_dup',0,1)]
  1212. recall_pos3 = [('rov_recall_region_h', 0,0.98), ('rov_recall_24h', 0.98,1), ('rov_recall_region_24h', 0,1),
  1213. ('rov_recall_24h', 0,1), ('rov_recall_24h_dup', 0,1)]
  1214. recall_pos4 = [('rov_recall_region_h', 0,0.98), ('rov_recall_24h', 0,0.02), ('rov_recall_region_24h', 0,1),
  1215. ('rov_recall_24h', 0,1), ('rov_recall_24h_dup', 0,1)]
  1216. if exp_config and 'recall_pos1' in exp_config \
  1217. and 'recall_pos2' in exp_config \
  1218. and 'recall_pos3' in exp_config \
  1219. and 'recall_pos4' in exp_config :
  1220. recall_pos1 = exp_config['recall_pos1']
  1221. recall_pos2 = exp_config['recall_pos2']
  1222. recall_pos3 = exp_config['recall_pos3']
  1223. recall_pos4 = exp_config['recall_pos4']
  1224. #print("recall_config:", recall_pos1)
  1225. rov_recall_rank = []
  1226. recall_list = []
  1227. recall_list.append(recall_pos1)
  1228. recall_list.append(recall_pos2)
  1229. recall_list.append(recall_pos3)
  1230. recall_list.append(recall_pos4)
  1231. select_ids = set('')
  1232. recall_num_limit_dict = {}
  1233. if exp_config and 'recall_num_limit' in exp_config:
  1234. recall_num_limit_dict = exp_config['recall_num_limit']
  1235. exp_recall_dict = {}
  1236. #index_pos = 0
  1237. for j in range(3):
  1238. if len(rov_recall_rank)>12:
  1239. break
  1240. # choose pos
  1241. for recall_pos_config in recall_list:
  1242. rand_num = random.random()
  1243. index_pos = 0
  1244. # choose pos recall
  1245. for per_recall_item in recall_pos_config:
  1246. if index_pos == 1:
  1247. break
  1248. if len(per_recall_item)<3:
  1249. continue
  1250. per_recall_name = per_recall_item[0]
  1251. per_recall_min = float(per_recall_item[1])
  1252. per_recall_max = float(per_recall_item[2])
  1253. per_recall_num = exp_recall_dict.get(per_recall_name, 0)
  1254. per_recall_total_num = recall_num_limit_dict.get(per_recall_name, 0)
  1255. # recall set total num
  1256. if len(recall_num_limit_dict)>0 and per_recall_total_num>0 and per_recall_num>= per_recall_total_num:
  1257. continue
  1258. if rand_num >= per_recall_min and rand_num < per_recall_max and per_recall_name in recall_dict:
  1259. per_recall = recall_dict[per_recall_name]
  1260. for recall_item in per_recall:
  1261. vid = recall_item['videoId']
  1262. if vid in select_ids:
  1263. continue
  1264. recall_item['rand'] = rand_num
  1265. rov_recall_rank.append(recall_item)
  1266. select_ids.add(vid)
  1267. if per_recall_name in exp_recall_dict:
  1268. exp_recall_dict[per_recall_name] +=1
  1269. else:
  1270. exp_recall_dict[per_recall_name] = 1
  1271. index_pos = 1
  1272. break
  1273. #print("rov_recall_rank:", rov_recall_rank)
  1274. if len(rov_recall_rank)<4:
  1275. rov_doudi_rank = region_h_recall_rank + sim_recall + u2i_recall + u2u2i_recall + w2v_recall +return_video_recall+u2i_play_recall+ region_24h_recall_rank + rule_24h_recall_rank + rule_24h_dup_recall_rank
  1276. for recall_item in rov_doudi_rank:
  1277. vid = recall_item['videoId']
  1278. if vid in select_ids:
  1279. continue
  1280. rov_recall_rank.append(recall_item)
  1281. select_ids.add(vid)
  1282. if len(rov_recall_rank)>12:
  1283. break
  1284. # print("rov_recall_rank:")
  1285. #print(rov_recall_rank)
  1286. # 流量池
  1287. flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  1288. # 对各路召回的视频进行去重
  1289. rov_recall_rank, flow_recall_rank = remove_duplicate(rov_recall=rov_recall_rank, flow_recall=flow_recall_rank,
  1290. top_K=top_K)
  1291. # log_.info('remove_duplicate finished! rov_recall_rank = {}, flow_recall_rank = {}'.format(
  1292. # rov_recall_rank, flow_recall_rank))
  1293. # rank_result = relevant_recall_rank
  1294. rank_result = []
  1295. # 从ROV召回池中获取top k
  1296. if len(rov_recall_rank) > 0:
  1297. rank_result.extend(rov_recall_rank[:top_K])
  1298. rov_recall_rank = rov_recall_rank[top_K:]
  1299. else:
  1300. rank_result.extend(flow_recall_rank[:top_K])
  1301. flow_recall_rank = flow_recall_rank[top_K:]
  1302. flow_num = 0
  1303. flowConfig =0
  1304. if exp_config and exp_config['flowConfig']:
  1305. flowConfig = exp_config['flowConfig']
  1306. if flowConfig == 1 and len(rov_recall_rank) > 0:
  1307. rank_result.extend(rov_recall_rank[:top_K])
  1308. for recall_item in rank_result:
  1309. flow_recall_name = recall_item.get("flowPool", '')
  1310. if flow_recall_name is not None and flow_recall_name.find("#") > -1:
  1311. flow_num = flow_num + 1
  1312. all_recall_rank = rov_recall_rank + flow_recall_rank
  1313. if flow_num > 0:
  1314. rank_result.extend(all_recall_rank[:size - top_K])
  1315. return rank_result[:size], flow_num
  1316. else:
  1317. # 按概率 p 及score排序获取 size - k 个视频
  1318. i = 0
  1319. while i < size - top_K:
  1320. # 随机生成[0, 1)浮点数
  1321. rand = random.random()
  1322. # log_.info('rand: {}'.format(rand))
  1323. if rand < flow_pool_P:
  1324. if flow_recall_rank:
  1325. rank_result.append(flow_recall_rank[0])
  1326. flow_recall_rank.remove(flow_recall_rank[0])
  1327. else:
  1328. rank_result.extend(rov_recall_rank[:size - top_K - i])
  1329. return rank_result[:size], flow_num
  1330. else:
  1331. if rov_recall_rank:
  1332. rank_result.append(rov_recall_rank[0])
  1333. rov_recall_rank.remove(rov_recall_rank[0])
  1334. else:
  1335. rank_result.extend(flow_recall_rank[:size - top_K - i])
  1336. return rank_result[:size], flow_num
  1337. i += 1
  1338. else:
  1339. # 按概率 p 及score排序获取 size - k 个视频
  1340. i = 0
  1341. while i < size - top_K:
  1342. # 随机生成[0, 1)浮点数
  1343. rand = random.random()
  1344. # log_.info('rand: {}'.format(rand))
  1345. if rand < flow_pool_P:
  1346. if flow_recall_rank:
  1347. rank_result.append(flow_recall_rank[0])
  1348. flow_recall_rank.remove(flow_recall_rank[0])
  1349. else:
  1350. rank_result.extend(rov_recall_rank[:size - top_K - i])
  1351. return rank_result[:size], flow_num
  1352. else:
  1353. if rov_recall_rank:
  1354. rank_result.append(rov_recall_rank[0])
  1355. rov_recall_rank.remove(rov_recall_rank[0])
  1356. else:
  1357. rank_result.extend(flow_recall_rank[:size - top_K - i])
  1358. return rank_result[:size],flow_num
  1359. i += 1
  1360. return rank_result[:size], flow_num
  1361. def merge_density_control(
  1362. data: list, rov: list, flow: list, rule: dict
  1363. ) -> list:
  1364. if len(rule) == 0:
  1365. return data
  1366. # 1 判断是否满足规则
  1367. status_cur: Dict[str, int] = {}
  1368. for d in data:
  1369. if "tags" in d.keys() and len(d["tags"]) != 0:
  1370. for t in d["tags"]:
  1371. if t in rule.keys():
  1372. status_cur[t] = 1 + status_cur[t] if t in status_cur.keys() else 1
  1373. if len(status_cur) == 0:
  1374. return data
  1375. status_cur_illegal: Dict[str, int] = {}
  1376. for k, v in status_cur.items():
  1377. if k in rule.keys() and rule[k] < v:
  1378. status_cur_illegal[k] = v - rule[k]
  1379. if len(status_cur_illegal) == 0:
  1380. return data
  1381. # 2 反向遍历,直到status_cur_illegal满足,记录要替换的index和召回池标记。
  1382. indexes = []
  1383. pushes = []
  1384. var1 = len(data)
  1385. for i in range(var1-1, -1, -1):
  1386. d = data[i]
  1387. tags = d["tags"] if "tags" in d.keys() else []
  1388. inters = set(tags) & set(status_cur_illegal.keys())
  1389. if len(inters) == 0:
  1390. continue
  1391. indexes.append(i)
  1392. pushes.append(d["flowPool"] if "flowPool" in d.keys() and len(d["flowPool"]) != 0 else "")
  1393. for inter in inters:
  1394. status_cur_illegal[inter] = status_cur_illegal[inter] - 1
  1395. if status_cur_illegal[inter] == 0:
  1396. status_cur_illegal.pop(inter)
  1397. status_cur[inter] = status_cur[inter] - 1
  1398. if status_cur[inter] == 0:
  1399. status_cur.pop(inter)
  1400. # 3 反向遍历index,再正向遍历增补列表,取可替换的video
  1401. for index, push in zip(reversed(indexes), reversed(pushes)):
  1402. if len(push) > 0:
  1403. # 5 如果是flow的video 取不到 不做替换
  1404. candidate = flow
  1405. else:
  1406. # 5 如果是rov的video 取不到 不做替换
  1407. candidate = rov
  1408. for i, d in enumerate(candidate):
  1409. judge_rule_set = judge_rule(rule=rule, status=status_cur)
  1410. tags = set(d["tags"] if "tags" in d.keys() else [])
  1411. if len(judge_rule_set & tags) != 0:
  1412. continue
  1413. # 开始插入
  1414. tmp = copy.deepcopy(data[index])
  1415. d["density_up"] = i
  1416. data[index] = d
  1417. tmp["density_down"] = index
  1418. candidate[i] = tmp
  1419. # 更新状态
  1420. for tag in tags:
  1421. status_cur[tag] = status_cur[tag] + 1 if tag in status_cur.keys() else 1
  1422. break
  1423. if len(push) > 0:
  1424. flow = candidate
  1425. else:
  1426. rov = candidate
  1427. return data
  1428. def judge_rule(rule: dict, status: dict) -> Set:
  1429. result = set()
  1430. for k, v in status.items():
  1431. if k in rule.keys() and rule[k] <= v:
  1432. result.add(k)
  1433. return result
  1434. def get_video_tags(redis_helper, video_ids) -> dict:
  1435. REDIS_PREFIX = "alg_recsys_video_tags_"
  1436. redis_keys = [REDIS_PREFIX + str(i) for i in video_ids]
  1437. video_tags = redis_helper.get_batch_key(redis_keys)
  1438. video_tag_dict = {}
  1439. if video_tags is not None:
  1440. for i, tags_str in enumerate(video_tags):
  1441. tags = []
  1442. if tags_str is not None and len(tags_str) != 0:
  1443. tags = str(tags_str).split(",")
  1444. video_tag_dict[video_ids[i]] = tags
  1445. return video_tag_dict
  1446. if __name__ == '__main__':
  1447. data: list = [
  1448. {'videoId': 1, 'flowPool': '', 'tags': ['下午好','元旦','祝福']},
  1449. {'videoId': 2, 'flowPool': ''},
  1450. # {'videoId': 2, 'flowPool': '', 'tags': ['下午好','祝福']},
  1451. {'videoId': 3, 'flowPool': '', 'tags': ['早上好']},
  1452. {'videoId': 4, 'flowPool': 'flow', 'tags': ['下午好','元旦','祝福']},
  1453. ]
  1454. rov = [
  1455. {'videoId': 10, 'flowPool': '', 'tags': ['下午好']},
  1456. {'videoId': 11, 'flowPool': '', 'tags': ['祝福']},
  1457. {'videoId': 12, 'flowPool': '', 'tags': ['下午好','元旦','祝福']},
  1458. {'videoId': 13, 'flowPool': '', 'tags': ['元旦', "下午好"]},
  1459. # {'videoId': 14, 'flowPool': '', 'tags': []},
  1460. # {'videoId': 15, 'flowPool': '', 'tags': []}
  1461. ]
  1462. flow = [
  1463. {'videoId': 20, 'flowPool': 'flow', 'tags': ['下午好']},
  1464. {'videoId': 21, 'flowPool': 'flow', 'tags': ['下午好']},
  1465. {'videoId': 22, 'flowPool': 'flow', 'tags': []},
  1466. {'videoId': 23, 'flowPool': 'flow', 'tags': []}
  1467. ]
  1468. rule = {
  1469. '祝福': 1,
  1470. "下午好": 1
  1471. }
  1472. result = merge_density_control(data, rov, flow, rule)
  1473. print(len(result))
  1474. print(result)
  1475. # d_test = [{'videoId': 10028734, 'rovScore': 99.977, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1476. # {'videoId': 1919925, 'rovScore': 99.974, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1477. # {'videoId': 9968118, 'rovScore': 99.972, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1478. # {'videoId': 9934863, 'rovScore': 99.971, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1479. # {'videoId': 10219869, 'flowPool': '1#1#1#1640830818883', 'rovScore': 82.21929728934731, 'pushFrom': 'flow_pool', 'abCode': 10000},
  1480. # {'videoId': 10212814, 'flowPool': '1#1#1#1640759014984', 'rovScore': 81.26694187726412, 'pushFrom': 'flow_pool', 'abCode': 10000},
  1481. # {'videoId': 10219437, 'flowPool': '1#1#1#1640827620520', 'rovScore': 81.21634156641908, 'pushFrom': 'flow_pool', 'abCode': 10000},
  1482. # {'videoId': 1994050, 'rovScore': 99.97, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1483. # {'videoId': 9894474, 'rovScore': 99.969, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1484. # {'videoId': 10028081, 'rovScore': 99.966, 'pushFrom': 'recall_pool', 'abCode': 10000}]
  1485. # res = video_rank_by_w_h_rate(videos=d_test)
  1486. # for tmp in res:
  1487. # print(tmp)
  1488. pass