# video_rank.py
  1. import json
  2. import random
  3. import numpy
  4. from log import Log
  5. from config import set_config
  6. from video_recall import PoolRecall
  7. from db_helper import RedisHelper
  8. from utils import FilterVideos, send_msg_to_feishu
  9. from rank_service import get_featurs, get_tf_serving_sores
  # Module-wide logger instance used by the functions in this file.
  log_ = Log()
  # Project configuration object; the functions below read PUSH_FROM, AB_CODE,
  # ENV_TEXT and the Redis key names from it.
  config_ = set_config()
  def video_rank(data, size, top_K, flow_pool_P, flow_pool_recall_process=None):
      """
      Merge the ROV recall pool and the flow pool into one ranked list.

      The first ``top_K`` slots come from the ROV pool (or from the flow pool when the
      ROV pool is empty after de-duplication). Each remaining slot is decided by a random
      draw: below ``flow_pool_P`` the next flow-pool video is taken, otherwise the next
      ROV video. As soon as the drawn pool is empty, the rest of the list is filled from
      the other pool.

      :param data: recalled videos, type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
      :param size: number of videos requested
      :param top_K: number of leading slots reserved for the ROV pool, type-int
      :param flow_pool_P: probability that a slot after top_K is served by the flow pool, type-float
      :param flow_pool_recall_process: optional dict that collects debug information about the
          ranking; a new dict is created when None is passed and ranking takes place
      :return: (rank_result, flow_pool_recall_process); when both pools are empty the second
          element is returned exactly as it was passed in (possibly None)
      """
      if not data['rov_pool_recall'] and not data['flow_pool_recall']:
          return [], flow_pool_recall_process

      def by_score(item):
          return item.get('rovScore', 0)

      # ROV recall sources currently in use, in priority order. Videos whose pushFrom is not
      # one of these are dropped. Each source is sorted by rovScore (descending) on its own
      # and the groups are concatenated in this order.
      rov_sources = (
          'rov_recall_30day',
          'rov_recall_region_h',
          'rov_recall_region_24h',
          'rov_recall_24h',
          'rov_recall_24h_dup',
          'rov_recall_48h',
          'rov_recall_48h_dup',
      )
      rov_recall_rank = []
      for source in rov_sources:
          push_from = config_.PUSH_FROM[source]
          rov_recall_rank.extend(sorted(
              (item for item in data['rov_pool_recall'] if item.get('pushFrom') == push_from),
              key=by_score, reverse=True))

      # Flow pool, sorted by score.
      flow_recall_rank = sorted(data['flow_pool_recall'], key=by_score, reverse=True)

      # De-duplicate across the two pools.
      rov_recall_rank, flow_recall_rank = remove_duplicate(
          rov_recall=rov_recall_rank, flow_recall=flow_recall_rank, top_K=top_K)

      if flow_pool_recall_process is None:
          flow_pool_recall_process = {}
      # Store copies: the working lists are consumed below, and the debug record must keep
      # showing the state right after de-duplication.
      flow_pool_recall_process['recall_duplicate_res'] = {
          'rov_recall_rank': list(rov_recall_rank),
          'flow_recall_rank': list(flow_recall_rank),
      }

      # Take the top K from the ROV pool, or from the flow pool if the ROV pool is empty.
      rank_result = []
      if rov_recall_rank:
          rank_result.extend(rov_recall_rank[:top_K])
          rov_recall_rank = rov_recall_rank[top_K:]
      else:
          rank_result.extend(flow_recall_rank[:top_K])
          flow_recall_rank = flow_recall_rank[top_K:]

      # Fill the remaining size - top_K slots by probability and score order.
      remaining = size - top_K
      for i in range(remaining):
          rand = random.random()  # float in [0, 1)
          flow_pool_recall_process['flow_pool_P'] = flow_pool_P
          flow_pool_recall_process[f'{i}_rand'] = rand
          if rand < flow_pool_P:
              primary, fallback = flow_recall_rank, rov_recall_rank
          else:
              primary, fallback = rov_recall_rank, flow_recall_rank
          if not primary:
              # The drawn pool is exhausted: fill everything that is left from the other pool.
              rank_result.extend(fallback[:remaining - i])
              break
          rank_result.append(primary.pop(0))
      return rank_result[:size], flow_pool_recall_process
  def video_new_rank(videoIds, fast_flow_set, flow_set, size, top_K, flow_pool_P):
      """
      Rank candidate videos by their cached score and mix in flow-pool videos.

      Scores are read from Redis under the keys ``k_p:<videoId>``; each stored value is parsed
      as JSON and its first element is used as the score. Missing or unparsable entries score
      0.0. The first ``top_K`` slots are the best-scored candidates. For the remaining
      ``size - top_K`` slots one random draw decides which pool is consumed first: the flow
      pool (fast-flow videos ahead of ordinary flow videos) when the draw is below
      ``flow_pool_P``, otherwise the remaining score-ordered candidates. If the preferred
      pool cannot fill the quota, the other pool supplies the rest.

      NOTE(review): the original loop kept drawing until both pools had been scanned, which
      never terminated for flow_pool_P <= 0 or >= 1 when the favoured pool ran short. For
      0 < flow_pool_P < 1 the outcome was already fully decided by the first draw, so one
      draw gives the same result distribution.

      :param videoIds: candidate video ids
      :param fast_flow_set: ids that belong to the fast flow pool
      :param flow_set: ids that belong to the flow pool
      :param size: number of videos requested
      :param top_K: number of leading slots filled purely by score, type-int
      :param flow_pool_P: probability that the flow pool is preferred for the remaining slots, type-float
      :return: (rank_result, add_flow_set); rank_result is a list of (videoId, score) tuples,
          add_flow_set holds the ids that were added from the flow pool
      """
      add_flow_set = set()
      if not videoIds:
          return [], add_flow_set

      redisObj = RedisHelper()
      vid_keys = ["k_p:" + str(vid) for vid in videoIds]
      video_scores = redisObj.get_batch_key(vid_keys)

      video_items = []
      # zip() pairs every returned score with its id and never indexes past either sequence.
      for vid, raw_score in zip(videoIds, video_scores):
          score = 0.0
          if raw_score is not None:
              try:
                  score = json.loads(raw_score)[0]
              except (ValueError, TypeError, IndexError, KeyError):
                  score = 0.0
          video_items.append((vid, score))
      sort_items = sorted(video_items, key=lambda pair: pair[1], reverse=True)

      fast_flow_recall_rank = [item for item in sort_items if item[0] in fast_flow_set]
      flow_recall_rank = [item for item in sort_items
                          if item[0] not in fast_flow_set and item[0] in flow_set]
      # All flow candidates, fast flow first.
      all_flow_recall_rank = fast_flow_recall_rank + flow_recall_rank

      # Top K straight from the score ranking.
      rank_result = list(sort_items[:top_K])
      rov_recall_rank = sort_items[top_K:]
      rank_set = {item[0] for item in rank_result}

      left_quota = size - top_K
      if left_quota > 0 and (all_flow_recall_rank or rov_recall_rank):
          flow_pool = (all_flow_recall_rank, True)
          rov_pool = (rov_recall_rank, False)
          if random.random() < flow_pool_P:
              pools = (flow_pool, rov_pool)
          else:
              pools = (rov_pool, flow_pool)
          picked = 0
          for pool, from_flow in pools:
              for item in pool:
                  if picked >= left_quota:
                      break
                  vid = item[0]
                  if vid in rank_set:
                      continue
                  rank_result.append(item)
                  rank_set.add(vid)
                  if from_flow:
                      add_flow_set.add(vid)
                  picked += 1
      return rank_result[:size], add_flow_set
  def refactor_video_rank(rov_recall_rank, fast_flow_set, flow_set, size, top_K, flow_pool_P):
      """
      Mix flow-pool videos into an already ranked recall list.

      The first ``top_K`` entries of ``rov_recall_rank`` are kept as they are. For the
      remaining ``size - top_K`` slots one random draw decides which pool is consumed first:
      the flow pool (fast-flow videos ahead of ordinary flow videos, each in recall order)
      when the draw is below ``flow_pool_P``, otherwise the rest of the recall list. Videos
      already placed are skipped. If the preferred pool cannot fill the list, the other pool
      supplies the rest, so the result is only shorter than ``size`` when there are not
      enough distinct candidates.

      :param rov_recall_rank: ranked recall videos, list of dicts with a 'videoId' key
      :param fast_flow_set: ids that belong to the fast flow pool
      :param flow_set: ids that belong to the flow pool
      :param size: number of videos requested
      :param top_K: number of leading slots taken straight from the recall list, type-int
      :param flow_pool_P: probability that the flow pool is preferred for the remaining slots, type-float
      :return: rank_result, at most ``size`` videos
      """
      if not rov_recall_rank:
          return []

      fast_flow_recall_rank = []
      flow_recall_rank = []
      for item in rov_recall_rank:
          vid = item.get('videoId', 0)
          if vid in fast_flow_set:
              fast_flow_recall_rank.append(item)
          elif vid in flow_set:
              flow_recall_rank.append(item)
      # All flow candidates, fast flow first.
      all_flow_recall_rank = fast_flow_recall_rank + flow_recall_rank

      # Top K from the recall list; remember which ids are already placed.
      rank_result = list(rov_recall_rank[:top_K])
      remaining_recall = rov_recall_rank[top_K:]
      rank_set = {item.get('videoId', 0) for item in rank_result}

      if size - top_K > 0:
          if random.random() < flow_pool_P:
              pools = (all_flow_recall_rank, remaining_recall)
          else:
              pools = (remaining_recall, all_flow_recall_rank)
          for pool in pools:
              for item in pool:
                  if len(rank_result) >= size:
                      break
                  vid = item.get('videoId', 0)
                  if vid in rank_set:
                      continue
                  rank_result.append(item)
                  rank_set.add(vid)
      return rank_result[:size]
  def remove_duplicate(rov_recall, flow_recall, top_K):
      """
      De-duplicate videos that were recalled by both the ROV pool and the flow pool.

      Rule: a video that sits in the top K of the ROV pool stays in the ROV pool and is
      dropped from the flow pool; any other video present in both pools stays in the flow
      pool and is dropped from the ROV pool.

      ``rov_recall`` is updated in place and returned (as before); ``flow_recall`` is left
      untouched and a new list is returned for it.

      :param rov_recall: ROV recall pool, already sorted, list of dicts with a 'videoId' key
      :param flow_recall: flow pool, already sorted, list of dicts with a 'videoId' key
      :param top_K: number of leading ROV videos that win over the flow pool, type-int
      :return: (rov_recall, flow_recall_result)
      """
      top_ids = {item['videoId'] for item in rov_recall[:top_K]}
      # Ids that remain owned by the flow pool. Set difference removes every occurrence,
      # so a video id repeated in the flow pool cannot slip through.
      flow_ids = {item['videoId'] for item in flow_recall} - top_ids

      # Outside the top K the flow pool wins.
      rov_recall[top_K:] = [item for item in rov_recall[top_K:] if item['videoId'] not in flow_ids]
      flow_recall_result = [item for item in flow_recall if item['videoId'] in flow_ids]
      return rov_recall, flow_recall_result
  def bottom_strategy(request_id, size, app_type, ab_code, params):
      """
      Fallback strategy: serve random videos from the head of the ROV recall pool.

      Reads index range 0..1000 of the ROV pool zset. When nothing comes back, the event is
      logged, a Feishu alert is sent and the second-level fallback ``bottom_strategy_last``
      is used instead. No status filtering happens here (the original note says video status
      filtering is done by an offline scheduled job).

      :param request_id: request_id
      :param size: number of videos to return
      :param app_type: product identifier, type-int
      :param ab_code: abCode
      :param params: passed through to RedisHelper
      :return: list of dicts with 'videoId', 'pushFrom' and 'abCode'
      """
      recall = PoolRecall(request_id=request_id, app_type=app_type, ab_code=ab_code)
      rov_key, _ = recall.get_pool_redis_key(pool_type='rov')
      candidates = RedisHelper(params=params).get_data_zset_with_index(key_name=rov_key, start=0, end=1000)

      if not candidates:
          log_.info('{} —— ROV推荐进入了二次兜底, data = {}'.format(config_.ENV_TEXT, candidates))
          send_msg_to_feishu('{} —— ROV推荐进入了二次兜底,请查看是否有数据更新失败问题。'.format(config_.ENV_TEXT))
          # Second-level fallback.
          return bottom_strategy_last(size=size, app_type=app_type, ab_code=ab_code, params=params)

      # Sample without replacement only when there are more candidates than requested.
      if len(candidates) > size:
          chosen = numpy.random.choice(candidates, size, False)
      else:
          chosen = candidates

      push_from = config_.PUSH_FROM['bottom']
      return [{'videoId': int(video_id), 'pushFrom': push_from, 'abCode': ab_code}
              for video_id in chosen]
  def bottom_strategy_last(size, app_type, ab_code, params):
      """
      Last-resort fallback: serve random videos from the fallback video zset.

      Reads the whole zset stored under ``config_.BOTTOM_KEY_NAME`` and draws a random sample
      without replacement. The sample size is capped at the number of available videos,
      because ``numpy.random.choice(..., replace=False)`` raises ValueError when asked for
      more items than exist, and this function is the final fallback. An empty or missing
      pool yields an empty list. No status filtering happens here (the original note says
      video status filtering is done by an offline scheduled job).

      :param size: number of videos to return
      :param app_type: product identifier, type-int (currently unused)
      :param ab_code: abCode
      :param params: passed through to RedisHelper
      :return: list of at most ``size`` dicts with 'videoId', 'pushFrom' and 'abCode'
      """
      redis_helper = RedisHelper(params=params)
      bottom_data = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
      if not bottom_data:
          return []

      # The 30x over-sampling is kept from the original (headroom for a filtering step that
      # is currently disabled), but never more than the pool actually holds.
      sample_size = min(size * 30, len(bottom_data))
      random_data = numpy.random.choice(bottom_data, sample_size, False)

      push_from = config_.PUSH_FROM['bottom_last']
      return [{'videoId': int(video_id), 'pushFrom': push_from, 'abCode': ab_code}
              for video_id in random_data[:size]]
  def bottom_strategy2(size, app_type, mid, uid, ab_code, client_info, params):
      """
      Fallback strategy: random videos from the second fallback set, filtered for the user.

      The region used for filtering is the client's city code when that city is listed in
      ``config_.CITY_CODE``, otherwise the province code; an empty code becomes '-1'.
      Up to ``size * 5`` videos are sampled (all of them if sampling fails), passed through
      ``FilterVideos.filter_videos`` and the first ``size`` survivors are returned.

      :param size: number of videos to return
      :param app_type: product identifier, type-int
      :param mid: mid
      :param uid: uid
      :param ab_code: abCode
      :param client_info: region information, dict with optional 'provinceCode' / 'cityCode'
      :param params: request parameters; ``params.request_id`` is read and the object is
          passed through to RedisHelper
      :return: list of dicts with 'videoId', 'pushFrom' and 'abCode' (possibly empty)
      """
      # City codes for which city-level grouped data exists.
      known_city_codes = list(config_.CITY_CODE.values())
      province_code = client_info.get('provinceCode', '-1')
      city_code = client_info.get('cityCode', '-1')
      # Prefer the city grouping when it exists, otherwise fall back to the province.
      region_code = city_code if city_code in known_city_codes else province_code
      if region_code == '':
          region_code = '-1'

      candidates = RedisHelper(params=params).get_data_from_set(key_name=config_.BOTTOM2_KEY_NAME)
      if candidates is None or len(candidates) == 0:
          return []

      try:
          sampled = numpy.random.choice(candidates, size * 5, False)
      except Exception:
          # Sampling failed (e.g. fewer candidates than requested): use all of them.
          sampled = candidates
      video_ids = [int(item) for item in sampled]

      # Filter the candidates for this user and region.
      video_filter = FilterVideos(request_id=params.request_id, app_type=app_type, mid=mid, uid=uid,
                                  video_ids=video_ids)
      filtered = video_filter.filter_videos(pool_type='flow', region_code=region_code)
      if not filtered:
          return []

      push_from = config_.PUSH_FROM['bottom2']
      return [{'videoId': int(video_id), 'pushFrom': push_from, 'abCode': ab_code}
              for video_id in filtered[:size]]
  def video_rank_by_w_h_rate(videos):
      """
      Width/height-ratio experiment: try to put horizontal videos in the first two slots.

      Pinned ('top') and flow-pool videos keep their positions. A video from the ROV pool or
      one of the fallback pools counts as horizontal when ``get_score_with_value`` finds it
      in the matching ``W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME`` zset (presumably the list of
      videos whose width/height ratio is above 1; confirm against the job that writes it).
      If the first two slots are already pinned, flow or horizontal, the input list is
      returned unchanged. Otherwise the earliest horizontal videos are moved into the free
      leading slots and the abCode of every moved video is set to
      ``config_.AB_CODE['w_h_rate']`` (the video dicts are modified in place).

      Each video is looked up in Redis at most once per call.

      :param videos: ranked videos, list of dicts with 'pushFrom', 'videoId' and 'abCode'
      :return: the input list when nothing has to move, otherwise a new re-ranked list
      """
      redis_helper = RedisHelper()
      top_source = config_.PUSH_FROM['top']
      flow_source = config_.PUSH_FROM['flow_recall']
      horizontal_cache = {}

      def is_horizontal(index):
          """Return True when the video at ``index`` is listed as horizontal (cached)."""
          if index in horizontal_cache:
              return horizontal_cache[index]
          video = videos[index]
          source = video['pushFrom']
          if source in (config_.PUSH_FROM['rov_recall'], config_.PUSH_FROM['bottom']):
              # ROV pool or first-level fallback.
              pool = 'rov_recall'
          elif source == config_.PUSH_FROM['bottom_last']:
              # Second-level fallback.
              pool = 'bottom_last'
          else:
              pool = None
          found = False
          if pool is not None:
              w_h_rate = redis_helper.get_score_with_value(
                  key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME[pool], value=video['videoId'])
              found = w_h_rate is not None
          horizontal_cache[index] = found
          return found

      def keeps_slot(index):
          """Pinned and flow-pool videos are never moved; horizontal ones are already fine."""
          return videos[index]['pushFrom'] in (top_source, flow_source) or is_horizontal(index)

      # Nothing to do when both leading slots are already acceptable.
      if len(videos) >= 2 and keeps_slot(0) and keeps_slot(1):
          return videos

      # Record the positions of pinned, flow-pool and horizontal videos.
      horizontal_video_index = []
      flow_video_index = []
      top_video_index = []
      for i, video in enumerate(videos):
          source = video['pushFrom']
          if source == top_source:
              top_video_index.append(i)
          elif source == flow_source:
              flow_video_index.append(i)
          elif is_horizontal(i):
              horizontal_video_index.append(i)

      # Decide which original positions occupy the first two slots.
      top2_index = []
      for i in range(2):
          if i in top_video_index:
              top2_index.append(i)
          elif i in flow_video_index:
              top2_index.append(i)
              flow_video_index.remove(i)
          elif i in horizontal_video_index:
              top2_index.append(i)
              horizontal_video_index.remove(i)
          elif horizontal_video_index:
              # Pull the earliest remaining horizontal video forward.
              top2_index.append(horizontal_video_index.pop(0))
          elif i == 0:
              # No horizontal video is available for the first slot: keep the ranking.
              return videos

      # Rebuild the list: flow-pool videos keep their positions, the rest keep their order.
      flow_result = [videos[i] for i in flow_video_index]
      other_result = [videos[i] for i in range(len(videos))
                      if i not in top2_index and i not in flow_video_index]
      new_rank_result = []
      for slot, source_index in enumerate(top2_index):
          item = videos[source_index]
          if slot != source_index:
              # The video was moved by this experiment: mark it with the experiment abCode.
              item['abCode'] = config_.AB_CODE['w_h_rate']
          new_rank_result.append(item)
      for i in range(len(top2_index), len(videos)):
          if i in flow_video_index:
              new_rank_result.append(flow_result.pop(0))
          else:
              new_rank_result.append(other_result.pop(0))
      return new_rank_result
  def video_rank_with_old_video(rank_result, old_video_recall, size, top_K, old_video_index=2):
      """
      Insert one randomly chosen old video into a ranked result at a fixed position.

      Old videos that are already part of ``rank_result`` are ignored. When ``rank_result``
      holds fewer than ``top_K`` videos the old video is appended, otherwise it is inserted
      at ``old_video_index``. If the list then exceeds ``size`` and its last two entries are
      an ROV-recall video followed by a flow-pool video, the ROV-recall one is dropped so
      that the flow-pool video survives the final cut to ``size``.

      Neither input list is modified.

      :param rank_result: ranked videos, list of dicts with 'videoId' and 'pushFrom'
      :param old_video_recall: recalled old videos, list of dicts with a 'videoId' key
      :param size: number of videos requested
      :param top_K: the old video is appended rather than inserted while rank_result holds
          fewer than this many videos, type-int
      :param old_video_index: insertion index for the old video, default 2
      :return: new_rank_result; ``rank_result`` itself is returned (not cut to ``size``) when
          there is no old video to insert
      """
      if not old_video_recall:
          return rank_result
      if not rank_result:
          return old_video_recall[:size]

      # Drop old videos that are already ranked, without touching the caller's list.
      ranked_ids = {item['videoId'] for item in rank_result}
      candidates = [video for video in old_video_recall if video['videoId'] not in ranked_ids]
      if not candidates:
          return rank_result

      # Pick one old video at random and insert it.
      old_video = random.choice(candidates)
      if len(rank_result) < top_K:
          new_rank_result = rank_result + [old_video]
      else:
          new_rank_result = rank_result[:old_video_index] + [old_video] + rank_result[old_video_index:]

      if len(new_rank_result) > size:
          # Look at the sources of the last two videos.
          push_from_1 = new_rank_result[-1]['pushFrom']
          push_from_2 = new_rank_result[-2]['pushFrom']
          if push_from_2 == config_.PUSH_FROM['rov_recall'] and push_from_1 == config_.PUSH_FROM['flow_recall']:
              new_rank_result = new_rank_result[:-2] + new_rank_result[-1:]
      return new_rank_result[:size]
  556. def video_new_rank2(data, size, top_K, flow_pool_P, ab_code, mid, exp_config=None, env_dict=None):
  557. """
  558. 视频分发排序
  559. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  560. :param size: 请求数
  561. :param top_K: 保证topK为召回池视频 type-int
  562. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  563. :return: rank_result
  564. """
  565. if not data['rov_pool_recall'] and not data['flow_pool_recall']:
  566. return [], 0
  567. #全量的是vlog,票圈精选, 334,60057,
  568. # 60054: simrecall,
  569. pre_str = "k_p2:"
  570. rov_recall_rank = data['rov_pool_recall']
  571. #print(rov_recall_rank)
  572. #call rank service
  573. #flag_call_service = 0
  574. sort_index = 0
  575. if exp_config and "sort_flag" in exp_config:
  576. sort_index = exp_config["sort_flag"]
  577. #print("sort_index:", sort_index)
  578. redisObj = RedisHelper()
  579. vidKeys = []
  580. rec_recall_item_list = []
  581. rec_recall_vid_list = []
  582. day_vidKeys = []
  583. hour_vidKeys = []
  584. pre_day_str = "v_ctr:"
  585. pre_hour_str = "v_hour_ctr:"
  586. for recall_item in data['rov_pool_recall']:
  587. try:
  588. vid = int(recall_item.get("videoId", 0))
  589. rec_recall_vid_list.append(vid)
  590. rec_recall_item_list.append(recall_item)
  591. vidKeys.append(pre_str + str(vid))
  592. day_vidKeys.append(pre_day_str+str(vid))
  593. hour_vidKeys.append(pre_hour_str+str(vid))
  594. except:
  595. continue
  596. video_scores = redisObj.get_batch_key(vidKeys)
  597. #print("video_scores:", video_scores)
  598. if (ab_code == 60066 or ab_code == 60069 or ab_code == 60070 or ab_code == 60071) and len(rec_recall_vid_list)>0:
  599. video_static_info = redisObj.get_batch_key(day_vidKeys)
  600. video_hour_static_info = redisObj.get_batch_key(hour_vidKeys)
  601. #print("env_dict:", env_dict)
  602. feature_dict = get_featurs(mid, data, size, top_K, flow_pool_P, rec_recall_vid_list,env_dict, video_static_info, video_hour_static_info)
  603. score_result = get_tf_serving_sores(feature_dict)
  604. #print("score_result:", score_result)
  605. if video_scores and len(video_scores)>0 and rec_recall_item_list and score_result and len(score_result) > 0\
  606. and len(score_result) == len(rec_recall_item_list) and len(video_scores)== len(score_result):
  607. for i in range(len(score_result)):
  608. try:
  609. if video_scores[i] is None and len(score_result[i])>0:
  610. return_score = 0.000000001
  611. # sore_index :10 = model score
  612. if sort_index == 10:
  613. total_score = score_result[i][0]
  614. else:
  615. total_score = return_score * score_result[i][0]
  616. rec_recall_item_list[i]['sort_score'] = total_score
  617. rec_recall_item_list[i]['base_rov_score'] = 0.0
  618. rec_recall_item_list[i]['share_score'] = return_score
  619. rec_recall_item_list[i]['model_score'] = score_result[i][0]
  620. else:
  621. video_score_str = json.loads(video_scores[i])
  622. # sore_index :10 = model score
  623. return_score = 0.000000001
  624. if sort_index == 10:
  625. total_score = score_result[i][0]
  626. else:
  627. if len(video_score_str)>= sort_index and len(video_score_str)>0:
  628. return_score = video_score_str[sort_index]
  629. total_score = return_score * score_result[i][0]
  630. #print("total_score:", total_score, " model score :", score_result[i][0], "return_score:",
  631. # return_score)
  632. rec_recall_item_list[i]['sort_score'] = total_score
  633. rec_recall_item_list[i]['base_rov_score'] = video_score_str[0]
  634. rec_recall_item_list[i]['share_score'] = return_score
  635. rec_recall_item_list[i]['model_score'] = score_result[i][0]
  636. except Exception as e:
  637. #print('exception: {}:', e)
  638. return_score = 0.000000001
  639. if sort_index == 10:
  640. total_score = 0.00000001
  641. else:
  642. total_score = return_score * 0.00000001
  643. rec_recall_item_list[i]['sort_score'] = total_score
  644. rec_recall_item_list[i]['base_rov_score'] = 0
  645. rec_recall_item_list[i]['share_score'] = return_score
  646. rec_recall_item_list[i]['model_score'] = 0.00000001
  647. rec_recall_item_list[i]['flag_call_service'] = 1
  648. rov_recall_rank = sorted(rec_recall_item_list, key=lambda k: k.get('sort_score', 0), reverse=True)
  649. else:
  650. rov_recall_rank = sup_rank(video_scores, rec_recall_item_list)
  651. else:
  652. if video_scores and len(rec_recall_item_list) > 0 and len(video_scores)>0:
  653. for i in range(len(video_scores)):
  654. try:
  655. if video_scores[i] is None:
  656. rec_recall_item_list[i]['sort_score'] = 0.0
  657. else:
  658. video_score_str = json.loads(video_scores[i])
  659. # print("video_score_str:", video_score_str)
  660. rec_recall_item_list[i]['sort_score'] = video_score_str[0]
  661. except Exception:
  662. rec_recall_item_list[i]['sort_score'] = 0.0
  663. rov_recall_rank = sorted(rec_recall_item_list, key=lambda k: k.get('sort_score', 0), reverse=True)
  664. #print(rov_recall_rank)
  665. flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  666. rov_recall_rank, flow_recall_rank = remove_duplicate(rov_recall=rov_recall_rank, flow_recall=flow_recall_rank,
  667. top_K=top_K)
  668. rank_result = []
  669. rank_set = set('')
  670. # 从ROV召回池中获取top k
  671. if len(rov_recall_rank) > 0:
  672. rank_result.extend(rov_recall_rank[:top_K])
  673. rov_recall_rank = rov_recall_rank[top_K:]
  674. else:
  675. rank_result.extend(flow_recall_rank[:top_K])
  676. flow_recall_rank = flow_recall_rank[top_K:]
  677. # 按概率 p 及score排序获取 size - k 个视频
  678. flow_num = 0
  679. flowConfig = 0
  680. # 本段代码控制流量池,通过实验传参,现不动
  681. if flowConfig == 1 and len(rov_recall_rank) > 0:
  682. for recall_item in rank_result:
  683. flow_recall_name = recall_item.get("flowPool", '')
  684. flow_num = flow_num + 1
  685. all_recall_rank = rov_recall_rank + flow_recall_rank
  686. if flow_num > 0:
  687. rank_result.extend(all_recall_rank[:size - top_K])
  688. return rank_result, flow_num
  689. else:
  690. i = 0
  691. while i < size - top_K:
  692. # 随机生成[0, 1)浮点数
  693. rand = random.random()
  694. # log_.info('rand: {}'.format(rand))
  695. if rand < flow_pool_P:
  696. if flow_recall_rank:
  697. rank_result.append(flow_recall_rank[0])
  698. flow_recall_rank.remove(flow_recall_rank[0])
  699. else:
  700. rank_result.extend(rov_recall_rank[:size - top_K - i])
  701. return rank_result[:size], flow_num
  702. else:
  703. if rov_recall_rank:
  704. rank_result.append(rov_recall_rank[0])
  705. rov_recall_rank.remove(rov_recall_rank[0])
  706. else:
  707. rank_result.extend(flow_recall_rank[:size - top_K - i])
  708. return rank_result[:size], flow_num
  709. i += 1
  710. else:
  711. i = 0
  712. while i < size - top_K:
  713. # 随机生成[0, 1)浮点数
  714. rand = random.random()
  715. # log_.info('rand: {}'.format(rand))
  716. if rand < flow_pool_P:
  717. if flow_recall_rank:
  718. rank_result.append(flow_recall_rank[0])
  719. flow_recall_rank.remove(flow_recall_rank[0])
  720. else:
  721. rank_result.extend(rov_recall_rank[:size - top_K - i])
  722. return rank_result[:size], flow_num
  723. else:
  724. if rov_recall_rank:
  725. rank_result.append(rov_recall_rank[0])
  726. rov_recall_rank.remove(rov_recall_rank[0])
  727. else:
  728. rank_result.extend(flow_recall_rank[:size - top_K - i])
  729. return rank_result[:size], flow_num
  730. i += 1
  731. return rank_result[:size], flow_num
  732. def video_new_rank3(data, size, top_K, flow_pool_P, rank_key_prefix='rank:score1:', flow_pool_recall_process=None):
  733. """
  734. 视频分发排序
  735. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  736. :param size: 请求数
  737. :param top_K: 保证topK为召回池视频 type-int
  738. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  739. :param rank_key_prefix:
  740. :return: rank_result
  741. """
  742. redis_helper = RedisHelper()
  743. if not data['rov_pool_recall'] and not data['flow_pool_recall']:
  744. return [], 0, flow_pool_recall_process
  745. rov_recall_rank = data['rov_pool_recall']
  746. vid_keys = []
  747. rec_recall_item_list = []
  748. rec_recall_vid_list = []
  749. for recall_item in data['rov_pool_recall']:
  750. try:
  751. vid = int(recall_item.get("videoId", 0))
  752. rec_recall_vid_list.append(vid)
  753. rec_recall_item_list.append(recall_item)
  754. vid_keys.append(f"{rank_key_prefix}{vid}")
  755. except:
  756. continue
  757. video_scores = redis_helper.get_batch_key(vid_keys)
  758. if video_scores and len(rec_recall_item_list) > 0 and len(rec_recall_item_list) == len(video_scores):
  759. for i in range(len(video_scores)):
  760. try:
  761. if video_scores[i] is None:
  762. rec_recall_item_list[i]['sort_score'] = 0.0
  763. else:
  764. rec_recall_item_list[i]['sort_score'] = float(video_scores[i])
  765. except Exception:
  766. rec_recall_item_list[i]['sort_score'] = 0.0
  767. rov_recall_rank = sorted(rec_recall_item_list, key=lambda k: k.get('sort_score', 0), reverse=True)
  768. flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  769. rov_recall_rank, flow_recall_rank = remove_duplicate(
  770. rov_recall=rov_recall_rank, flow_recall=flow_recall_rank, top_K=top_K
  771. )
  772. rank_result = []
  773. # add_flow_pool_recall_log
  774. if flow_pool_recall_process is None:
  775. flow_pool_recall_process = {}
  776. flow_pool_recall_process['recall_duplicate_res'] = {'rov_recall_rank': rov_recall_rank, 'flow_recall_rank': flow_recall_rank}
  777. # 从ROV召回池中获取top k
  778. if len(rov_recall_rank) > 0:
  779. rank_result.extend(rov_recall_rank[:top_K])
  780. rov_recall_rank = rov_recall_rank[top_K:]
  781. else:
  782. rank_result.extend(flow_recall_rank[:top_K])
  783. flow_recall_rank = flow_recall_rank[top_K:]
  784. # 按概率 p 及score排序获取 size - k 个视频
  785. flow_num = 0
  786. i = 0
  787. while i < size - top_K:
  788. # 随机生成[0, 1)浮点数
  789. rand = random.random()
  790. # add_flow_pool_recall_log
  791. flow_pool_recall_process['flow_pool_P'] = flow_pool_P
  792. flow_pool_recall_process[f'{i}_rand'] = rand
  793. # log_.info('rand: {}'.format(rand))
  794. if rand < flow_pool_P:
  795. if flow_recall_rank:
  796. rank_result.append(flow_recall_rank[0])
  797. flow_recall_rank.remove(flow_recall_rank[0])
  798. else:
  799. rank_result.extend(rov_recall_rank[:size - top_K - i])
  800. return rank_result[:size], flow_num, flow_pool_recall_process
  801. else:
  802. if rov_recall_rank:
  803. rank_result.append(rov_recall_rank[0])
  804. rov_recall_rank.remove(rov_recall_rank[0])
  805. else:
  806. rank_result.extend(flow_recall_rank[:size - top_K - i])
  807. return rank_result[:size], flow_num, flow_pool_recall_process
  808. i += 1
  809. return rank_result[:size], flow_num, flow_pool_recall_process
  810. # 排序服务兜底
  811. def sup_rank(video_scores, recall_list):
  812. if video_scores and len(recall_list) > 0:
  813. for i in range(len(video_scores)):
  814. try:
  815. if video_scores[i] is None:
  816. recall_list[i]['sort_score'] = 0.0
  817. else:
  818. video_score_str = json.loads(video_scores[i])
  819. recall_list[i]['flag_call_service'] = 0
  820. recall_list[i]['sort_score'] = video_score_str[0]
  821. except Exception:
  822. recall_list[i]['sort_score'] = 0.0
  823. rov_recall_rank = sorted(recall_list, key=lambda k: k.get('sort_score', 0), reverse=True)
  824. #print("rov_recall_rank:", rov_recall_rank)
  825. else:
  826. rov_recall_rank = recall_list
  827. return rov_recall_rank
  828. def video_sanke_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None):
  829. """
  830. 视频分发排序
  831. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  832. :param size: 请求数
  833. :param top_K: 保证topK为召回池视频 type-int
  834. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  835. :return: rank_result
  836. """
  837. if not data['rov_pool_recall'] and not data['flow_pool_recall'] \
  838. and len(data['u2i_recall'])==0 and len(data['w2v_recall'])==0 \
  839. and len(data['sim_recall']) == 0 and len(data['u2u2i_recall']) == 0 :
  840. return [], 0
  841. # 地域分组小时级规则更新数据
  842. recall_dict = {}
  843. region_h_recall = [item for item in data['rov_pool_recall']
  844. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_h']]
  845. region_h_recall_rank = sorted(region_h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  846. recall_dict['rov_recall_region_h'] = region_h_recall_rank
  847. # 地域分组小时级更新24h规则更新数据
  848. region_24h_recall = [item for item in data['rov_pool_recall']
  849. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_24h']]
  850. region_24h_recall_rank = sorted(region_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  851. recall_dict['rov_recall_region_24h'] = region_24h_recall_rank
  852. # 相对24h规则更新数据
  853. rule_24h_recall = [item for item in data['rov_pool_recall']
  854. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h']]
  855. rule_24h_recall_rank = sorted(rule_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  856. recall_dict['rov_recall_24h'] = rule_24h_recall_rank
  857. # 相对24h规则筛选后剩余更新数据
  858. rule_24h_dup_recall = [item for item in data['rov_pool_recall']
  859. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h_dup']]
  860. rule_24h_dup_recall_rank = sorted(rule_24h_dup_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  861. recall_dict['rov_recall_24h_dup'] = rule_24h_dup_recall_rank
  862. hot_recall = []
  863. w2v_recall =[]
  864. sim_recall = []
  865. u2u2i_recall = []
  866. if ab_Code==60058:
  867. if len(data['u2i_recall'])>0:
  868. hot_recall = sorted(data['u2i_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  869. recall_dict['u2i_recall'] = hot_recall
  870. elif ab_Code==60059:
  871. if len(data['w2v_recall'])>0:
  872. recall_dict['w2v_recall'] = data['w2v_recall']
  873. else:
  874. recall_dict['w2v_recall'] = w2v_recall
  875. elif ab_Code==60061 or ab_Code==60063:
  876. if len(data['sim_recall'])>0:
  877. recall_dict['sim_recall'] = data['sim_recall']
  878. else:
  879. recall_dict['sim_recall'] = sim_recall
  880. elif ab_Code==60062:
  881. if len(data['u2u2i_recall'])>0:
  882. recall_dict['u2u2i_recall'] = data['u2u2i_recall']
  883. else:
  884. recall_dict['u2u2i_recall'] = u2u2i_recall
  885. recall_list = [('rov_recall_region_h',1, 1),('rov_recall_region_h',0.5, 1),('rov_recall_region_24h',1,1),
  886. ('u2i_recall',0.5,1), ('w2v_recall',0.5,1),('rov_recall_24h',1,1), ('rov_recall_24h_dup',0.5,1)]
  887. if exp_config and exp_config['recall_list']:
  888. recall_list = exp_config['recall_list']
  889. #print("recall_config:", recall_list)
  890. rov_recall_rank = []
  891. select_ids = set('')
  892. for i in range(3):
  893. if len(rov_recall_rank)>8:
  894. break
  895. for per_recall_item in recall_list:
  896. per_recall_name = per_recall_item[0]
  897. per_recall_freq = per_recall_item[1]
  898. per_limt_num = per_recall_item[2]
  899. rand_num = random.random()
  900. #print(recall_dict[per_recall_name])
  901. if rand_num<per_recall_freq and per_recall_name in recall_dict:
  902. per_recall = recall_dict[per_recall_name]
  903. #print("per_recall_item:", per_recall_item)
  904. cur_recall_num = 0
  905. for recall_item in per_recall:
  906. vid = recall_item['videoId']
  907. if vid in select_ids:
  908. continue
  909. rov_recall_rank.append(recall_item)
  910. select_ids.add(vid)
  911. cur_recall_num+=1
  912. if cur_recall_num>=per_limt_num:
  913. break
  914. # print("rov_recall_rank:")
  915. # print(rov_recall_rank)
  916. #rov_recall_rank = region_h_recall_rank + region_24h_recall_rank + \
  917. # rule_24h_recall_rank + rule_24h_dup_recall_rank
  918. # 流量池
  919. flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  920. # 对各路召回的视频进行去重
  921. rov_recall_rank, flow_recall_rank = remove_duplicate(rov_recall=rov_recall_rank, flow_recall=flow_recall_rank,
  922. top_K=top_K)
  923. # log_.info('remove_duplicate finished! rov_recall_rank = {}, flow_recall_rank = {}'.format(
  924. # rov_recall_rank, flow_recall_rank))
  925. # rank_result = relevant_recall_rank
  926. rank_result = []
  927. # 从ROV召回池中获取top k
  928. if len(rov_recall_rank) > 0:
  929. rank_result.extend(rov_recall_rank[:top_K])
  930. rov_recall_rank = rov_recall_rank[top_K:]
  931. else:
  932. rank_result.extend(flow_recall_rank[:top_K])
  933. flow_recall_rank = flow_recall_rank[top_K:]
  934. flow_num = 0
  935. flowConfig =0
  936. if exp_config and exp_config['flowConfig']:
  937. flowConfig = exp_config['flowConfig']
  938. if flowConfig == 1 and len(rov_recall_rank) > 0:
  939. rank_result.extend(rov_recall_rank[:top_K])
  940. for recall_item in rank_result:
  941. flow_recall_name = recall_item.get("flowPool", '')
  942. if flow_recall_name is not None and flow_recall_name.find("#") > -1:
  943. flow_num = flow_num + 1
  944. all_recall_rank = rov_recall_rank + flow_recall_rank
  945. if flow_num > 0:
  946. rank_result.extend(all_recall_rank[:size - top_K])
  947. return rank_result[:size], flow_num
  948. else:
  949. # 按概率 p 及score排序获取 size - k 个视频
  950. i = 0
  951. while i < size - top_K:
  952. # 随机生成[0, 1)浮点数
  953. rand = random.random()
  954. # log_.info('rand: {}'.format(rand))
  955. if rand < flow_pool_P:
  956. if flow_recall_rank:
  957. rank_result.append(flow_recall_rank[0])
  958. flow_recall_rank.remove(flow_recall_rank[0])
  959. else:
  960. rank_result.extend(rov_recall_rank[:size - top_K - i])
  961. return rank_result[:size], flow_num
  962. else:
  963. if rov_recall_rank:
  964. rank_result.append(rov_recall_rank[0])
  965. rov_recall_rank.remove(rov_recall_rank[0])
  966. else:
  967. rank_result.extend(flow_recall_rank[:size - top_K - i])
  968. return rank_result[:size], flow_num
  969. i += 1
  970. else:
  971. # 按概率 p 及score排序获取 size - k 个视频
  972. i = 0
  973. while i < size - top_K:
  974. # 随机生成[0, 1)浮点数
  975. rand = random.random()
  976. # log_.info('rand: {}'.format(rand))
  977. if rand < flow_pool_P:
  978. if flow_recall_rank:
  979. rank_result.append(flow_recall_rank[0])
  980. flow_recall_rank.remove(flow_recall_rank[0])
  981. else:
  982. rank_result.extend(rov_recall_rank[:size - top_K - i])
  983. return rank_result[:size], flow_num
  984. else:
  985. if rov_recall_rank:
  986. rank_result.append(rov_recall_rank[0])
  987. rov_recall_rank.remove(rov_recall_rank[0])
  988. else:
  989. rank_result.extend(flow_recall_rank[:size - top_K - i])
  990. return rank_result[:size],flow_num
  991. i += 1
  992. return rank_result[:size], flow_num
  993. def video_sank_pos_rank(data, size, top_K, flow_pool_P, ab_Code='', exp_config=None):
  994. """
  995. 视频分发排序
  996. :param data: 各路召回的视频 type-dict {'rov_pool_recall': [], 'flow_pool_recall': []}
  997. :param size: 请求数
  998. :param top_K: 保证topK为召回池视频 type-int
  999. :param flow_pool_P: size-top_K视频为流量池视频的概率 type-float
  1000. :return: rank_result
  1001. """
  1002. if not data['rov_pool_recall'] and not data['flow_pool_recall'] \
  1003. and len(data['u2i_recall'])==0 and len(data['w2v_recall'])==0 \
  1004. and len(data['sim_recall']) == 0 and len(data['u2u2i_recall']) == 0 :
  1005. return [], 0
  1006. # 地域分组小时级规则更新数据
  1007. recall_dict = {}
  1008. region_h_recall = [item for item in data['rov_pool_recall']
  1009. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_h']]
  1010. region_h_recall_rank = sorted(region_h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  1011. recall_dict['rov_recall_region_h'] = region_h_recall_rank
  1012. # 地域分组小时级更新24h规则更新数据
  1013. region_24h_recall = [item for item in data['rov_pool_recall']
  1014. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_region_24h']]
  1015. region_24h_recall_rank = sorted(region_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  1016. recall_dict['rov_recall_region_24h'] = region_24h_recall_rank
  1017. # 相对24h规则更新数据
  1018. rule_24h_recall = [item for item in data['rov_pool_recall']
  1019. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h']]
  1020. rule_24h_recall_rank = sorted(rule_24h_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  1021. recall_dict['rov_recall_24h'] = rule_24h_recall_rank
  1022. # 相对24h规则筛选后剩余更新数据
  1023. rule_24h_dup_recall = [item for item in data['rov_pool_recall']
  1024. if item.get('pushFrom') == config_.PUSH_FROM['rov_recall_24h_dup']]
  1025. rule_24h_dup_recall_rank = sorted(rule_24h_dup_recall, key=lambda k: k.get('rovScore', 0), reverse=True)
  1026. recall_dict['rov_recall_24h_dup'] = rule_24h_dup_recall_rank
  1027. u2i_recall = []
  1028. u2i_play_recall = []
  1029. w2v_recall =[]
  1030. sim_recall = []
  1031. u2u2i_recall = []
  1032. return_video_recall = []
  1033. #print("")
  1034. if ab_Code==60058:
  1035. if len(data['u2i_recall'])>0:
  1036. recall_dict['u2i_recall'] = data['u2i_recall']
  1037. else:
  1038. recall_dict['u2i_recall'] = u2i_recall
  1039. if len(data['u2i_play_recall']) > 0:
  1040. recall_dict['u2i_play_recall'] = data['u2i_play_recall']
  1041. else:
  1042. recall_dict['u2i_play_recall'] = u2i_play_recall
  1043. elif ab_Code==60059:
  1044. if len(data['w2v_recall'])>0:
  1045. recall_dict['w2v_recall'] = data['w2v_recall']
  1046. else:
  1047. recall_dict['w2v_recall'] = w2v_recall
  1048. elif ab_Code==60061 or ab_Code==60063:
  1049. if len(data['sim_recall'])>0:
  1050. recall_dict['sim_recall'] = data['sim_recall']
  1051. else:
  1052. recall_dict['sim_recall'] = sim_recall
  1053. elif ab_Code==60062:
  1054. if len(data['u2u2i_recall'])>0:
  1055. recall_dict['u2u2i_recall'] = data['u2u2i_recall']
  1056. else:
  1057. recall_dict['u2u2i_recall'] = u2u2i_recall
  1058. elif ab_Code==60064:
  1059. if len(data['return_video_recall'])>0:
  1060. recall_dict['return_video_recall'] = data['return_video_recall']
  1061. else:
  1062. recall_dict['return_video_recall'] = return_video_recall
  1063. recall_pos1 = [('rov_recall_region_h',0, 0.98),('rov_recall_24h',0.98, 1),('rov_recall_region_24h',0,1),
  1064. ('rov_recall_24h',0,1), ('rov_recall_24h_dup',0,1)]
  1065. recall_pos2 = [('rov_recall_region_h',0,0.98),('rov_recall_24h',0.98,1),('rov_recall_region_24h',0,1),
  1066. ('rov_recall_24h',0,1),('rov_recall_24h_dup',0,1)]
  1067. recall_pos3 = [('rov_recall_region_h', 0,0.98), ('rov_recall_24h', 0.98,1), ('rov_recall_region_24h', 0,1),
  1068. ('rov_recall_24h', 0,1), ('rov_recall_24h_dup', 0,1)]
  1069. recall_pos4 = [('rov_recall_region_h', 0,0.98), ('rov_recall_24h', 0,0.02), ('rov_recall_region_24h', 0,1),
  1070. ('rov_recall_24h', 0,1), ('rov_recall_24h_dup', 0,1)]
  1071. if exp_config and 'recall_pos1' in exp_config \
  1072. and 'recall_pos2' in exp_config \
  1073. and 'recall_pos3' in exp_config \
  1074. and 'recall_pos4' in exp_config :
  1075. recall_pos1 = exp_config['recall_pos1']
  1076. recall_pos2 = exp_config['recall_pos2']
  1077. recall_pos3 = exp_config['recall_pos3']
  1078. recall_pos4 = exp_config['recall_pos4']
  1079. #print("recall_config:", recall_pos1)
  1080. rov_recall_rank = []
  1081. recall_list = []
  1082. recall_list.append(recall_pos1)
  1083. recall_list.append(recall_pos2)
  1084. recall_list.append(recall_pos3)
  1085. recall_list.append(recall_pos4)
  1086. select_ids = set('')
  1087. recall_num_limit_dict = {}
  1088. if exp_config and 'recall_num_limit' in exp_config:
  1089. recall_num_limit_dict = exp_config['recall_num_limit']
  1090. exp_recall_dict = {}
  1091. #index_pos = 0
  1092. for j in range(3):
  1093. if len(rov_recall_rank)>12:
  1094. break
  1095. # choose pos
  1096. for recall_pos_config in recall_list:
  1097. rand_num = random.random()
  1098. index_pos = 0
  1099. # choose pos recall
  1100. for per_recall_item in recall_pos_config:
  1101. if index_pos == 1:
  1102. break
  1103. if len(per_recall_item)<3:
  1104. continue
  1105. per_recall_name = per_recall_item[0]
  1106. per_recall_min = float(per_recall_item[1])
  1107. per_recall_max = float(per_recall_item[2])
  1108. per_recall_num = exp_recall_dict.get(per_recall_name, 0)
  1109. per_recall_total_num = recall_num_limit_dict.get(per_recall_name, 0)
  1110. # recall set total num
  1111. if len(recall_num_limit_dict)>0 and per_recall_total_num>0 and per_recall_num>= per_recall_total_num:
  1112. continue
  1113. if rand_num >= per_recall_min and rand_num < per_recall_max and per_recall_name in recall_dict:
  1114. per_recall = recall_dict[per_recall_name]
  1115. for recall_item in per_recall:
  1116. vid = recall_item['videoId']
  1117. if vid in select_ids:
  1118. continue
  1119. recall_item['rand'] = rand_num
  1120. rov_recall_rank.append(recall_item)
  1121. select_ids.add(vid)
  1122. if per_recall_name in exp_recall_dict:
  1123. exp_recall_dict[per_recall_name] +=1
  1124. else:
  1125. exp_recall_dict[per_recall_name] = 1
  1126. index_pos = 1
  1127. break
  1128. #print("rov_recall_rank:", rov_recall_rank)
  1129. if len(rov_recall_rank)<4:
  1130. rov_doudi_rank = region_h_recall_rank + sim_recall + u2i_recall + u2u2i_recall + w2v_recall +return_video_recall+u2i_play_recall+ region_24h_recall_rank + rule_24h_recall_rank + rule_24h_dup_recall_rank
  1131. for recall_item in rov_doudi_rank:
  1132. vid = recall_item['videoId']
  1133. if vid in select_ids:
  1134. continue
  1135. rov_recall_rank.append(recall_item)
  1136. select_ids.add(vid)
  1137. if len(rov_recall_rank)>12:
  1138. break
  1139. # print("rov_recall_rank:")
  1140. #print(rov_recall_rank)
  1141. # 流量池
  1142. flow_recall_rank = sorted(data['flow_pool_recall'], key=lambda k: k.get('rovScore', 0), reverse=True)
  1143. # 对各路召回的视频进行去重
  1144. rov_recall_rank, flow_recall_rank = remove_duplicate(rov_recall=rov_recall_rank, flow_recall=flow_recall_rank,
  1145. top_K=top_K)
  1146. # log_.info('remove_duplicate finished! rov_recall_rank = {}, flow_recall_rank = {}'.format(
  1147. # rov_recall_rank, flow_recall_rank))
  1148. # rank_result = relevant_recall_rank
  1149. rank_result = []
  1150. # 从ROV召回池中获取top k
  1151. if len(rov_recall_rank) > 0:
  1152. rank_result.extend(rov_recall_rank[:top_K])
  1153. rov_recall_rank = rov_recall_rank[top_K:]
  1154. else:
  1155. rank_result.extend(flow_recall_rank[:top_K])
  1156. flow_recall_rank = flow_recall_rank[top_K:]
  1157. flow_num = 0
  1158. flowConfig =0
  1159. if exp_config and exp_config['flowConfig']:
  1160. flowConfig = exp_config['flowConfig']
  1161. if flowConfig == 1 and len(rov_recall_rank) > 0:
  1162. rank_result.extend(rov_recall_rank[:top_K])
  1163. for recall_item in rank_result:
  1164. flow_recall_name = recall_item.get("flowPool", '')
  1165. if flow_recall_name is not None and flow_recall_name.find("#") > -1:
  1166. flow_num = flow_num + 1
  1167. all_recall_rank = rov_recall_rank + flow_recall_rank
  1168. if flow_num > 0:
  1169. rank_result.extend(all_recall_rank[:size - top_K])
  1170. return rank_result[:size], flow_num
  1171. else:
  1172. # 按概率 p 及score排序获取 size - k 个视频
  1173. i = 0
  1174. while i < size - top_K:
  1175. # 随机生成[0, 1)浮点数
  1176. rand = random.random()
  1177. # log_.info('rand: {}'.format(rand))
  1178. if rand < flow_pool_P:
  1179. if flow_recall_rank:
  1180. rank_result.append(flow_recall_rank[0])
  1181. flow_recall_rank.remove(flow_recall_rank[0])
  1182. else:
  1183. rank_result.extend(rov_recall_rank[:size - top_K - i])
  1184. return rank_result[:size], flow_num
  1185. else:
  1186. if rov_recall_rank:
  1187. rank_result.append(rov_recall_rank[0])
  1188. rov_recall_rank.remove(rov_recall_rank[0])
  1189. else:
  1190. rank_result.extend(flow_recall_rank[:size - top_K - i])
  1191. return rank_result[:size], flow_num
  1192. i += 1
  1193. else:
  1194. # 按概率 p 及score排序获取 size - k 个视频
  1195. i = 0
  1196. while i < size - top_K:
  1197. # 随机生成[0, 1)浮点数
  1198. rand = random.random()
  1199. # log_.info('rand: {}'.format(rand))
  1200. if rand < flow_pool_P:
  1201. if flow_recall_rank:
  1202. rank_result.append(flow_recall_rank[0])
  1203. flow_recall_rank.remove(flow_recall_rank[0])
  1204. else:
  1205. rank_result.extend(rov_recall_rank[:size - top_K - i])
  1206. return rank_result[:size], flow_num
  1207. else:
  1208. if rov_recall_rank:
  1209. rank_result.append(rov_recall_rank[0])
  1210. rov_recall_rank.remove(rov_recall_rank[0])
  1211. else:
  1212. rank_result.extend(flow_recall_rank[:size - top_K - i])
  1213. return rank_result[:size],flow_num
  1214. i += 1
  1215. return rank_result[:size], flow_num
  1216. if __name__ == '__main__':
  1217. d_test = [{'videoId': 10028734, 'rovScore': 99.977, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1218. {'videoId': 1919925, 'rovScore': 99.974, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1219. {'videoId': 9968118, 'rovScore': 99.972, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1220. {'videoId': 9934863, 'rovScore': 99.971, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1221. {'videoId': 10219869, 'flowPool': '1#1#1#1640830818883', 'rovScore': 82.21929728934731, 'pushFrom': 'flow_pool', 'abCode': 10000},
  1222. {'videoId': 10212814, 'flowPool': '1#1#1#1640759014984', 'rovScore': 81.26694187726412, 'pushFrom': 'flow_pool', 'abCode': 10000},
  1223. {'videoId': 10219437, 'flowPool': '1#1#1#1640827620520', 'rovScore': 81.21634156641908, 'pushFrom': 'flow_pool', 'abCode': 10000},
  1224. {'videoId': 1994050, 'rovScore': 99.97, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1225. {'videoId': 9894474, 'rovScore': 99.969, 'pushFrom': 'recall_pool', 'abCode': 10000},
  1226. {'videoId': 10028081, 'rovScore': 99.966, 'pushFrom': 'recall_pool', 'abCode': 10000}]
  1227. res = video_rank_by_w_h_rate(videos=d_test)
  1228. for tmp in res:
  1229. print(tmp)