recommend.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. import json
  2. import time
  3. import multiprocessing
  4. import traceback
  5. from datetime import datetime
  6. import config
  7. from log import Log
  8. from config import set_config
  9. from video_recall import PoolRecall
  10. from video_rank import video_rank, bottom_strategy, video_rank_by_w_h_rate
  11. from db_helper import RedisHelper
  12. import gevent
  13. from utils import FilterVideos
  14. import ast
# Module-wide logger instance shared by every function in this file.
log_ = Log()
# Configuration object returned by set_config(); read below for Redis key names,
# PUSH_FROM / AB_CODE tables and A/B-test switches.
config_ = set_config()
  17. def relevant_video_top_recommend(app_type, mid, uid, head_vid, videos, size):
  18. """
  19. 相关推荐强插 运营给定置顶相关性视频
  20. :param app_type: 产品标识 type-int
  21. :param mid: mid
  22. :param uid: uid
  23. :param head_vid: 相关推荐头部视频id type-int
  24. :param videos: 当前相关推荐结果 type-list
  25. :param size: 返回视频个数 type-int
  26. :return: rank_result
  27. """
  28. # 获取头部视频对应的相关性视频
  29. key_name = '{}{}'.format(config_.RELEVANT_VIDEOS_WITH_OP_KEY_NAME, head_vid)
  30. redis_helper = RedisHelper()
  31. relevant_videos = redis_helper.get_data_from_redis(key_name=key_name)
  32. if relevant_videos is None:
  33. # 该视频没有指定的相关性视频
  34. return videos
  35. relevant_videos = json.loads(relevant_videos)
  36. # 按照指定顺序排序
  37. relevant_videos_sorted = sorted(relevant_videos, key=lambda x: x['order'], reverse=False)
  38. # 过滤
  39. relevant_video_ids = [int(item['recommend_vid']) for item in relevant_videos_sorted]
  40. filter_helper = FilterVideos(app_type=app_type, video_ids=relevant_video_ids, mid=mid, uid=uid)
  41. filtered_ids = filter_helper.filter_videos()
  42. if filtered_ids is None:
  43. return videos
  44. # 获取生效中的视频
  45. now = int(time.time())
  46. relevant_videos_in_effect = [
  47. {'videoId': int(item['recommend_vid']), 'pushFrom': config_.PUSH_FROM['relevant_video_op'],
  48. 'abCode': config_.AB_CODE['relevant_video_op']}
  49. for item in relevant_videos_sorted
  50. if item['start_time'] < now < item['finish_time'] and int(item['recommend_vid']) in filtered_ids
  51. ]
  52. if len(relevant_videos_in_effect) == 0:
  53. return videos
  54. # 与现有排序结果 进行合并重排
  55. # 获取现有排序结果中流量池视频 及其位置
  56. relevant_ids = [item['videoId'] for item in relevant_videos_in_effect]
  57. flow_pool_videos = []
  58. other_videos = []
  59. for i, item in enumerate(videos):
  60. if item.get('pushFrom', None) == config_.PUSH_FROM['flow_recall'] and item.get('videoId') not in relevant_ids:
  61. flow_pool_videos.append((i, item))
  62. elif item.get('videoId') not in relevant_ids:
  63. other_videos.append(item)
  64. else:
  65. continue
  66. # 重排,保持流量池视频位置不变
  67. rank_result = relevant_videos_in_effect + other_videos
  68. for i, item in flow_pool_videos:
  69. rank_result.insert(i, item)
  70. return rank_result[:size]
  71. def video_position_recommend(mid, uid, app_type, videos):
  72. # videos = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type,
  73. # algo_type=algo_type, client_info=client_info)
  74. redis_helper = RedisHelper()
  75. pos1_vids = redis_helper.get_data_from_redis(config.BaseConfig.RECALL_POSITION1_KEY_NAME)
  76. pos2_vids = redis_helper.get_data_from_redis(config.BaseConfig.RECALL_POSITION2_KEY_NAME)
  77. if pos1_vids is not None:
  78. pos1_vids = ast.literal_eval(pos1_vids)
  79. if pos2_vids is not None:
  80. pos2_vids = ast.literal_eval(pos2_vids)
  81. pos1_vids = [] if pos1_vids is None else pos1_vids
  82. pos2_vids = [] if pos2_vids is None else pos2_vids
  83. pos1_vids = [int(i) for i in pos1_vids]
  84. pos2_vids = [int(i) for i in pos2_vids]
  85. filter_1 = FilterVideos(app_type=app_type, video_ids=pos1_vids, mid=mid, uid=uid)
  86. filter_2 = FilterVideos(app_type=app_type, video_ids=pos2_vids, mid=mid, uid=uid)
  87. t = [gevent.spawn(filter_1.filter_videos), gevent.spawn(filter_2.filter_videos)]
  88. gevent.joinall(t)
  89. filted_list = [i.get() for i in t]
  90. pos1_vids = filted_list[0]
  91. pos2_vids = filted_list[1]
  92. videos = positon_duplicate(pos1_vids, pos2_vids, videos)
  93. if pos1_vids is not None and len(pos1_vids) >0 :
  94. videos.insert(0, {'videoId': int(pos1_vids[0]), 'rovScore': 100,
  95. 'pushFrom': config_.PUSH_FROM['position_insert'], 'abCode': config_.AB_CODE['position_insert']})
  96. if pos2_vids is not None and len(pos2_vids) >0 :
  97. videos.insert(1, {'videoId': int(pos2_vids[0]), 'rovScore': 100,
  98. 'pushFrom': config_.PUSH_FROM['position_insert'], 'abCode': config_.AB_CODE['position_insert']})
  99. return videos[:10]
  100. def positon_duplicate(pos1_vids, pos2_vids, videos):
  101. s = set()
  102. if pos1_vids is not None and len(pos1_vids) >0:
  103. s.add(int(pos1_vids[0]))
  104. if pos2_vids is not None and len(pos2_vids) >0:
  105. s.add(int(pos2_vids[0]))
  106. l = []
  107. for item in videos:
  108. if item['videoId'] in s:
  109. continue
  110. else:
  111. l.append(item)
  112. return l
  113. def video_recommend(mid, uid, size, app_type, algo_type, client_info, expire_time=24*3600,
  114. ab_code=config_.AB_CODE['initial']):
  115. """
  116. 首页线上推荐逻辑
  117. :param mid: mid type-string
  118. :param uid: uid type-string
  119. :param size: 请求视频数量 type-int
  120. :param app_type: 产品标识 type-int
  121. :param algo_type: 算法类型 type-string
  122. :param client_info: 用户位置信息 {"country": "国家", "province": "省份", "city": "城市"}
  123. :param expire_time: 末位视频记录redis过期时间
  124. :param ab_code: AB实验code
  125. :return:
  126. """
  127. # ####### 多进程召回
  128. start_recall = time.time()
  129. # log_.info('====== recall')
  130. '''
  131. cores = multiprocessing.cpu_count()
  132. pool = multiprocessing.Pool(processes=cores)
  133. pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code)
  134. _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
  135. pool_list = [
  136. # rov召回池
  137. pool.apply_async(pool_recall.rov_pool_recall, (size,)),
  138. # 流量池
  139. pool.apply_async(pool_recall.flow_pool_recall, (size,))
  140. ]
  141. recall_result_list = [p.get() for p in pool_list]
  142. pool.close()
  143. pool.join()
  144. '''
  145. recall_result_list = []
  146. pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code,
  147. client_info=client_info)
  148. _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
  149. t = [gevent.spawn(pool_recall.rov_pool_recall, size, expire_time), gevent.spawn(pool_recall.flow_pool_recall, size)]
  150. gevent.joinall(t)
  151. recall_result_list = [i.get() for i in t]
  152. end_recall = time.time()
  153. log_.info('mid: {}, uid: {}, recall: {}, execute time = {}ms'.format(
  154. mid, uid, recall_result_list, (end_recall - start_recall) * 1000))
  155. # ####### 排序
  156. start_rank = time.time()
  157. # log_.info('====== rank')
  158. data = {
  159. 'rov_pool_recall': recall_result_list[0],
  160. 'flow_pool_recall': recall_result_list[1]
  161. }
  162. rank_result = video_rank(data=data, size=size)
  163. end_rank = time.time()
  164. log_.info('mid: {}, uid: {}, rank_result: {}, execute time = {}ms'.format(
  165. mid, uid, rank_result, (end_rank - start_rank) * 1000))
  166. if not rank_result:
  167. # 兜底策略
  168. # log_.info('====== bottom strategy')
  169. start_bottom = time.time()
  170. rank_result = bottom_strategy(size=size, app_type=app_type, ab_code=ab_code)
  171. end_bottom = time.time()
  172. log_.info('mid: {}, uid: {}, bottom strategy result: {}, execute time = {}ms'.format(
  173. mid, uid, rank_result, (end_bottom - start_bottom) * 1000))
  174. return rank_result, last_rov_recall_key
  175. def ab_test_op(rank_result, ab_code_list, app_type, mid, uid, **kwargs):
  176. """
  177. 对排序后的结果 按照AB实验进行对应的分组操作
  178. :param rank_result: 排序后的结果
  179. :param ab_code_list: 此次请求参与的 ab实验组
  180. :param app_type: 产品标识
  181. :param mid: mid
  182. :param uid: uid
  183. :param kwargs: 其他参数
  184. :return:
  185. """
  186. # ####### 视频宽高比AB实验
  187. # 对内容精选进行 视频宽高比分发实验
  188. if config_.AB_CODE['w_h_rate'] in ab_code_list and app_type in config_.AB_TEST.get('w_h_rate', []):
  189. rank_result = video_rank_by_w_h_rate(videos=rank_result)
  190. log_.info('app_type: {}, mid: {}, uid: {}, rank_by_w_h_rate_result: {}'.format(
  191. app_type, mid, uid, rank_result))
  192. # 按position位置排序
  193. if config_.AB_CODE['position_insert'] in ab_code_list and app_type in config_.AB_TEST.get('position_insert', []):
  194. rank_result = video_position_recommend(mid, uid, app_type, rank_result)
  195. print('===========================')
  196. print(rank_result)
  197. log_.info('app_type: {}, mid: {}, uid: {}, rank_by_position_insert_result: {}'.format(
  198. app_type, mid, uid, rank_result))
  199. # 相关推荐强插
  200. if config_.AB_CODE['relevant_video_op'] in ab_code_list \
  201. and app_type in config_.AB_TEST.get('relevant_video_op', []):
  202. head_vid = kwargs['head_vid']
  203. size = kwargs['size']
  204. rank_result = relevant_video_top_recommend(
  205. app_type=app_type, mid=mid, uid=uid, head_vid=head_vid, videos=rank_result, size=size
  206. )
  207. log_.info('app_type: {}, mid: {}, uid: {}, head_vid: {}, rank_by_relevant_video_op_result: {}'.format(
  208. app_type, mid, uid, head_vid, rank_result))
  209. return rank_result
  210. def update_redis_data(result, app_type, mid, last_rov_recall_key, expire_time=24*3600):
  211. """
  212. 根据最终的排序结果更新相关redis数据
  213. :param result: 排序结果
  214. :param app_type: 产品标识
  215. :param mid: mid
  216. :param last_rov_recall_key: 用户上一次在rov召回池对应的位置 redis key
  217. :param expire_time: 末位视频记录redis过期时间
  218. :return: None
  219. """
  220. # ####### redis数据刷新
  221. try:
  222. # log_.info('====== update redis')
  223. # 预曝光数据同步刷新到Redis, 过期时间为0.5h
  224. redis_helper = RedisHelper()
  225. preview_key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(app_type, mid)
  226. preview_video_ids = [int(item['videoId']) for item in result]
  227. if preview_video_ids:
  228. # log_.error('key_name = {} \n values = {}'.format(preview_key_name, tuple(preview_video_ids)))
  229. redis_helper.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids), expire_time=30 * 60)
  230. log_.info('preview redis update success!')
  231. # 将此次获取的ROV召回池config_.K末位视频id同步刷新到Redis中,方便下次快速定位到召回位置,过期时间为1天
  232. rov_recall_video = [item['videoId'] for item in result[:config_.K]
  233. if item['pushFrom'] == config_.PUSH_FROM['rov_recall']]
  234. if len(rov_recall_video) > 0:
  235. if app_type == config_.APP_TYPE['APP']:
  236. key_name = config_.UPDATE_ROV_KEY_NAME_APP
  237. else:
  238. key_name = config_.UPDATE_ROV_KEY_NAME
  239. if not redis_helper.get_score_with_value(key_name=key_name, value=rov_recall_video[-1]):
  240. redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[-1],
  241. expire_time=expire_time)
  242. log_.info('last video redis update success!')
  243. # 将此次分发的流量池视频,对 本地分发数-1 进行记录
  244. flow_recall_video = [item for item in result if item['pushFrom'] == config_.PUSH_FROM['flow_recall']]
  245. if flow_recall_video:
  246. update_local_distribute_count(flow_recall_video)
  247. log_.info('update local distribute count success!')
  248. except Exception as e:
  249. log_.error("update redis data fail!")
  250. log_.error(traceback.format_exc())
  251. def update_local_distribute_count(videos):
  252. """
  253. 更新本地分发数
  254. :param videos: 视频列表 type-list [{'videoId':'', 'flowPool':'', 'distributeCount': '',
  255. 'rovScore': '', 'pushFrom': 'flow_pool', 'abCode': self.ab_code}, ....]
  256. :return:
  257. """
  258. try:
  259. redis_helper = RedisHelper()
  260. for item in videos:
  261. key_name = '{}{}.{}'.format(config_.LOCAL_DISTRIBUTE_COUNT_PREFIX, item['videoId'], item['flowPool'])
  262. # 本地记录的分发数 - 1
  263. redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
  264. # if redis_helper.key_exists(key_name=key_name):
  265. # # 该视频本地有记录,本地记录的分发数 - 1
  266. # redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
  267. # else:
  268. # # 该视频本地无记录,接口获取的分发数 - 1
  269. # redis_helper.incr_key(key_name=key_name, amount=int(item['distributeCount']) - 1, expire_time=5 * 60)
  270. except Exception as e:
  271. log_.error('update_local_distribute_count error...')
  272. log_.error(traceback.format_exc())
  273. def video_homepage_recommend(mid, uid, size, app_type, algo_type, client_info):
  274. """
  275. 首页线上推荐逻辑
  276. :param mid: mid type-string
  277. :param uid: uid type-string
  278. :param size: 请求视频数量 type-int
  279. :param app_type: 产品标识 type-int
  280. :param algo_type: 算法类型 type-string
  281. :param client_info: 用户位置信息 {"country": "国家", "province": "省份", "city": "城市"}
  282. :return:
  283. """
  284. # 对 vlog 切换10%的流量做实验
  285. # 对mid进行哈希
  286. print(hash(mid))
  287. print(abs(hash(mid)) % 10)
  288. if app_type in config_.AB_TEST['rank_by_h'] and abs(hash(mid)) % 10 in [0, 1, 7, 8, 4, ]:
  289. print('in')
  290. # 简单召回 - 排序 - 兜底
  291. rank_result, last_rov_recall_key = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type,
  292. algo_type=algo_type, client_info=client_info,
  293. expire_time=3600,
  294. ab_code=config_.AB_CODE['rank_by_h'])
  295. # ab-test
  296. result = ab_test_op(rank_result=rank_result,
  297. ab_code_list=[config_.AB_CODE['position_insert']],
  298. app_type=app_type, mid=mid, uid=uid)
  299. # redis数据刷新
  300. update_redis_data(result=result, app_type=app_type, mid=mid, last_rov_recall_key=last_rov_recall_key,
  301. expire_time=3600)
  302. else:
  303. # 简单召回 - 排序 - 兜底
  304. rank_result, last_rov_recall_key = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type,
  305. algo_type=algo_type, client_info=client_info)
  306. # ab-test
  307. result = ab_test_op(rank_result=rank_result,
  308. ab_code_list=[config_.AB_CODE['position_insert']],
  309. app_type=app_type, mid=mid, uid=uid)
  310. # redis数据刷新
  311. update_redis_data(result=result, app_type=app_type, mid=mid, last_rov_recall_key=last_rov_recall_key)
  312. return result
  313. def video_relevant_recommend(video_id, mid, uid, size, app_type):
  314. """
  315. 相关推荐逻辑
  316. :param video_id: 相关推荐的头部视频id
  317. :param mid: mid type-string
  318. :param uid: uid type-string
  319. :param size: 请求视频数量 type-int
  320. :param app_type: 产品标识 type-int
  321. :return: videos type-list
  322. """
  323. # videos = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type, algo_type='', client_info=None)
  324. # 简单召回 - 排序 - 兜底
  325. rank_result, last_rov_recall_key = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type,
  326. algo_type='', client_info=None)
  327. # ab-test
  328. result = ab_test_op(rank_result=rank_result,
  329. ab_code_list=[
  330. config_.AB_CODE['position_insert'],
  331. config_.AB_CODE['relevant_video_op']
  332. ],
  333. app_type=app_type, mid=mid, uid=uid, head_vid=video_id, size=size)
  334. # redis数据刷新
  335. update_redis_data(result=result, app_type=app_type, mid=mid, last_rov_recall_key=last_rov_recall_key)
  336. return result
# Manual smoke run of relevant_video_top_recommend with a hand-written ranking.
# The function reads from Redis and runs the video filter, so the output
# depends on those backends.
if __name__ == '__main__':
    # Sample ranking mixing "recall_pool" and "flow_pool" entries.
    videos = [
        {"videoId": 10136461, "rovScore": 99.971, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10239014, "rovScore": 99.97, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 9851154, "rovScore": 99.969, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10104347, "rovScore": 99.968, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10141507, "rovScore": 99.967, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10292817, "flowPool": "2#6#2#1641780979606", "rovScore": 53.926690610816486,
         "pushFrom": "flow_pool", "abCode": 10000},
        {"videoId": 10224932, "flowPool": "2#5#1#1641800279644", "rovScore": 53.47890460059617, "pushFrom": "flow_pool",
         "abCode": 10000},
        {"videoId": 9943255, "rovScore": 99.966, "pushFrom": "recall_pool", "abCode": 10000},
        {"videoId": 10282970, "flowPool": "2#5#1#1641784814103", "rovScore": 52.682815076325575,
         "pushFrom": "flow_pool", "abCode": 10000},
        {"videoId": 10282205, "rovScore": 99.965, "pushFrom": "recall_pool", "abCode": 10000}
    ]
    # Head video 123, at most 10 results.
    res = relevant_video_top_recommend(app_type=4, mid='', uid=1111, head_vid=123, videos=videos, size=10)
    print(res)