# recommend.py
  1. import time
  2. import multiprocessing
  3. import traceback
  4. from datetime import datetime
  5. import config
  6. from log import Log
  7. from config import set_config
  8. from video_recall import PoolRecall
  9. from video_rank import video_rank, bottom_strategy, video_rank_by_w_h_rate
  10. from db_helper import RedisHelper
  11. import gevent
  12. from utils import FilterVideos
  13. import ast
# Module-wide logger instance used by every function in this file.
log_ = Log()
# Configuration object returned by set_config(); the functions below read
# AB codes, push-from tags and Redis key names/prefixes from it.
config_ = set_config()
  16. def video_position_recommend(mid, uid, app_type, videos):
  17. # videos = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type,
  18. # algo_type=algo_type, client_info=client_info)
  19. redis_helper = RedisHelper()
  20. pos1_vids = redis_helper.get_data_from_redis(config.BaseConfig.RECALL_POSITION1_KEY_NAME)
  21. pos2_vids = redis_helper.get_data_from_redis(config.BaseConfig.RECALL_POSITION2_KEY_NAME)
  22. if pos1_vids is not None:
  23. pos1_vids = ast.literal_eval(pos1_vids)
  24. if pos2_vids is not None:
  25. pos2_vids = ast.literal_eval(pos2_vids)
  26. pos1_vids = [] if pos1_vids is None else pos1_vids
  27. pos2_vids = [] if pos2_vids is None else pos2_vids
  28. pos1_vids = [int(i) for i in pos1_vids]
  29. pos2_vids = [int(i) for i in pos2_vids]
  30. filter_1 = FilterVideos(app_type=app_type, video_ids=pos1_vids, mid=mid, uid=uid)
  31. filter_2 = FilterVideos(app_type=app_type, video_ids=pos2_vids, mid=mid, uid=uid)
  32. t = [gevent.spawn(filter_1.filter_videos), gevent.spawn(filter_2.filter_videos)]
  33. gevent.joinall(t)
  34. filted_list = [i.get() for i in t]
  35. pos1_vids = filted_list[0]
  36. pos2_vids = filted_list[1]
  37. videos = positon_duplicate(pos1_vids, pos2_vids, videos)
  38. if pos1_vids is not None and len(pos1_vids) >0 :
  39. videos.insert(0, {'videoId': int(pos1_vids[0]), 'rovScore': 100,
  40. 'pushFrom': config_.PUSH_FROM['position_insert'], 'abCode': config_.AB_CODE['position_insert']})
  41. if pos2_vids is not None and len(pos2_vids) >0 :
  42. videos.insert(1, {'videoId': int(pos2_vids[0]), 'rovScore': 100,
  43. 'pushFrom': config_.PUSH_FROM['position_insert'], 'abCode': config_.AB_CODE['position_insert']})
  44. return videos[:10]
  45. def positon_duplicate(pos1_vids, pos2_vids, videos):
  46. s = set()
  47. if pos1_vids is not None and len(pos1_vids) >0:
  48. s.add(int(pos1_vids[0]))
  49. if pos2_vids is not None and len(pos2_vids) >0:
  50. s.add(int(pos2_vids[0]))
  51. l = []
  52. for i in range(len(videos)):
  53. if videos[i]['videoId'] in s:
  54. l.append(i)
  55. for i in l:
  56. videos.pop(i)
  57. return videos
  58. def video_recommend(mid, uid, size, app_type, algo_type, client_info):
  59. """
  60. 首页线上推荐逻辑
  61. :param mid: mid type-string
  62. :param uid: uid type-string
  63. :param size: 请求视频数量 type-int
  64. :param app_type: 产品标识 type-int
  65. :param algo_type: 算法类型 type-string
  66. :param client_info: 用户位置信息 {"country": "国家", "province": "省份", "city": "城市"}
  67. :return:
  68. """
  69. ab_code = config_.AB_CODE['initial']
  70. # ####### 多进程召回
  71. start_recall = time.time()
  72. # log_.info('====== recall')
  73. '''
  74. cores = multiprocessing.cpu_count()
  75. pool = multiprocessing.Pool(processes=cores)
  76. pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code)
  77. _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
  78. pool_list = [
  79. # rov召回池
  80. pool.apply_async(pool_recall.rov_pool_recall, (size,)),
  81. # 流量池
  82. pool.apply_async(pool_recall.flow_pool_recall, (size,))
  83. ]
  84. recall_result_list = [p.get() for p in pool_list]
  85. pool.close()
  86. pool.join()
  87. '''
  88. recall_result_list = []
  89. pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code, client_info=client_info)
  90. _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
  91. t = [gevent.spawn(pool_recall.rov_pool_recall, size), gevent.spawn(pool_recall.flow_pool_recall, size)]
  92. gevent.joinall(t)
  93. recall_result_list = [i.get() for i in t]
  94. end_recall = time.time()
  95. log_.info('mid: {}, uid: {}, recall: {}, execute time = {}ms'.format(
  96. mid, uid, recall_result_list, (end_recall - start_recall) * 1000))
  97. # ####### 排序
  98. start_rank = time.time()
  99. # log_.info('====== rank')
  100. data = {
  101. 'rov_pool_recall': recall_result_list[0],
  102. 'flow_pool_recall': recall_result_list[1]
  103. }
  104. rank_result = video_rank(data=data, size=size)
  105. end_rank = time.time()
  106. log_.info('mid: {}, uid: {}, rank_result: {}, execute time = {}ms'.format(
  107. mid, uid, rank_result, (end_rank - start_rank) * 1000))
  108. if not rank_result:
  109. # 兜底策略
  110. # log_.info('====== bottom strategy')
  111. start_bottom = time.time()
  112. rank_result = bottom_strategy(size=size, app_type=app_type, ab_code=ab_code)
  113. end_bottom = time.time()
  114. log_.info('mid: {}, uid: {}, bottom strategy result: {}, execute time = {}ms'.format(
  115. mid, uid, rank_result, (end_bottom - start_bottom) * 1000))
  116. return rank_result, last_rov_recall_key
  117. def ab_test_op(rank_result, ab_code_list, app_type, mid, uid, **kwargs):
  118. """
  119. 对排序后的结果 按照AB实验进行对应的分组操作
  120. :param rank_result: 排序后的结果
  121. :param ab_code_list: 此次请求参与的 ab实验组
  122. :param app_type: 产品标识
  123. :param mid: mid
  124. :param uid: uid
  125. :param kwargs: 其他参数
  126. :return:
  127. """
  128. # ####### 视频宽高比AB实验
  129. # 对内容精选进行 视频宽高比分发实验
  130. if config_.AB_CODE['w_h_rate'] in ab_code_list and app_type in config_.AB_TEST.get('w_h_rate', []):
  131. rank_result = video_rank_by_w_h_rate(videos=rank_result)
  132. log_.info('app_type: {}, mid: {}, uid: {}, rank_by_w_h_rate_result: {}'.format(
  133. app_type, mid, uid, rank_result))
  134. # 按position位置排序
  135. if config_.AB_CODE['position_insert'] in ab_code_list and app_type in config_.AB_TEST.get('position_insert', []):
  136. rank_result = video_position_recommend(mid, uid, app_type, rank_result)
  137. print('===========================')
  138. print(rank_result)
  139. log_.info('app_type: {}, mid: {}, uid: {}, rank_by_position_insert_result: {}'.format(
  140. app_type, mid, uid, rank_result))
  141. return rank_result
  142. def update_redis_data(result, app_type, mid, last_rov_recall_key):
  143. """
  144. 根据最终的排序结果更新相关redis数据
  145. :param result: 排序结果
  146. :param app_type: 产品标识
  147. :param mid: mid
  148. :param last_rov_recall_key: 用户上一次在rov召回池对应的位置 redis key
  149. :return: None
  150. """
  151. # ####### redis数据刷新
  152. try:
  153. # log_.info('====== update redis')
  154. # 预曝光数据同步刷新到Redis, 过期时间为0.5h
  155. redis_helper = RedisHelper()
  156. preview_key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(app_type, mid)
  157. preview_video_ids = [int(item['videoId']) for item in result]
  158. if preview_video_ids:
  159. # log_.error('key_name = {} \n values = {}'.format(preview_key_name, tuple(preview_video_ids)))
  160. redis_helper.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids), expire_time=30 * 60)
  161. log_.info('preview redis update success!')
  162. # 将此次获取的ROV召回池config_.K末位视频id同步刷新到Redis中,方便下次快速定位到召回位置,过期时间为1天
  163. rov_recall_video = [item['videoId'] for item in result[:config_.K]
  164. if item['pushFrom'] == config_.PUSH_FROM['rov_recall']]
  165. if len(rov_recall_video) > 0:
  166. if not redis_helper.get_score_with_value(key_name=config_.UPDATE_ROV_KEY_NAME, value=rov_recall_video[-1]):
  167. redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[-1])
  168. log_.info('last video redis update success!')
  169. # 将此次分发的流量池视频,对 本地分发数-1 进行记录
  170. flow_recall_video = [item for item in result if item['pushFrom'] == config_.PUSH_FROM['flow_recall']]
  171. if flow_recall_video:
  172. update_local_distribute_count(flow_recall_video)
  173. log_.info('update local distribute count success!')
  174. except Exception as e:
  175. log_.error("update redis data fail!")
  176. log_.error(traceback.format_exc())
  177. def update_local_distribute_count(videos):
  178. """
  179. 更新本地分发数
  180. :param videos: 视频列表 type-list [{'videoId':'', 'flowPool':'', 'distributeCount': '',
  181. 'rovScore': '', 'pushFrom': 'flow_pool', 'abCode': self.ab_code}, ....]
  182. :return:
  183. """
  184. try:
  185. redis_helper = RedisHelper()
  186. for item in videos:
  187. key_name = '{}{}.{}'.format(config_.LOCAL_DISTRIBUTE_COUNT_PREFIX, item['videoId'], item['flowPool'])
  188. # 本地记录的分发数 - 1
  189. redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
  190. # if redis_helper.key_exists(key_name=key_name):
  191. # # 该视频本地有记录,本地记录的分发数 - 1
  192. # redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
  193. # else:
  194. # # 该视频本地无记录,接口获取的分发数 - 1
  195. # redis_helper.incr_key(key_name=key_name, amount=int(item['distributeCount']) - 1, expire_time=5 * 60)
  196. except Exception as e:
  197. log_.error('update_local_distribute_count error...')
  198. log_.error(traceback.format_exc())
  199. def video_homepage_recommend(mid, uid, size, app_type, algo_type, client_info):
  200. """
  201. 首页线上推荐逻辑
  202. :param mid: mid type-string
  203. :param uid: uid type-string
  204. :param size: 请求视频数量 type-int
  205. :param app_type: 产品标识 type-int
  206. :param algo_type: 算法类型 type-string
  207. :param client_info: 用户位置信息 {"country": "国家", "province": "省份", "city": "城市"}
  208. :return:
  209. """
  210. # 简单召回 - 排序 - 兜底
  211. rank_result, last_rov_recall_key = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type,
  212. algo_type=algo_type, client_info=client_info)
  213. # ab-test
  214. result = ab_test_op(rank_result=rank_result,
  215. ab_code_list=[config_.AB_CODE['w_h_rate'], config_.AB_CODE['position_insert']],
  216. app_type=app_type, mid=mid, uid=uid)
  217. # redis数据刷新
  218. update_redis_data(result=result, app_type=app_type, mid=mid, last_rov_recall_key=last_rov_recall_key)
  219. return result
  220. def video_relevant_recommend(video_id, mid, uid, size, app_type):
  221. """
  222. 相关推荐逻辑
  223. :param video_id: 相关推荐的头部视频id
  224. :param mid: mid type-string
  225. :param uid: uid type-string
  226. :param size: 请求视频数量 type-int
  227. :param app_type: 产品标识 type-int
  228. :return: videos type-list
  229. """
  230. # videos = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type, algo_type='', client_info=None)
  231. # 简单召回 - 排序 - 兜底
  232. rank_result, last_rov_recall_key = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type,
  233. algo_type='', client_info=None)
  234. # ab-test
  235. result = ab_test_op(rank_result=rank_result,
  236. ab_code_list=[config_.AB_CODE['w_h_rate'], config_.AB_CODE['position_insert']],
  237. app_type=app_type, mid=mid, uid=uid, video_id=video_id)
  238. # redis数据刷新
  239. update_redis_data(result=result, app_type=app_type, mid=mid, last_rov_recall_key=last_rov_recall_key)
  240. return result
  241. if __name__ == '__main__':
  242. videos = [{'videoId': '12345', 'flowPool': '133#442#2', 'distributeCount': 10}]
  243. update_local_distribute_count(videos)