# recommend.py (17 KB)


  1. import json
  2. import time
  3. import multiprocessing
  4. import traceback
  5. from datetime import datetime
  6. import config
  7. from log import Log
  8. from config import set_config
  9. from video_recall import PoolRecall
  10. from video_rank import video_rank, bottom_strategy, video_rank_by_w_h_rate
  11. from db_helper import RedisHelper
  12. import gevent
  13. from utils import FilterVideos
  14. import ast
  15. log_ = Log()
  16. config_ = set_config()
  17. def relevant_video_top_recommend(app_type, mid, uid, head_vid, videos, size):
  18. """
  19. 相关推荐强插 运营给定置顶相关性视频
  20. :param app_type: 产品标识 type-int
  21. :param mid: mid
  22. :param uid: uid
  23. :param head_vid: 相关推荐头部视频id type-int
  24. :param videos: 当前相关推荐结果 type-list
  25. :param size: 返回视频个数 type-int
  26. :return: rank_result
  27. """
  28. # 获取头部视频对应的相关性视频
  29. key_name = '{}{}'.format(config_.RELEVANT_VIDEOS_WITH_OP_KEY_NAME, head_vid)
  30. redis_helper = RedisHelper()
  31. relevant_videos = redis_helper.get_data_from_redis(key_name=key_name)
  32. if relevant_videos is None:
  33. # 该视频没有指定的相关性视频
  34. return videos
  35. relevant_videos = json.loads(relevant_videos)
  36. # 按照指定顺序排序
  37. relevant_videos_sorted = sorted(relevant_videos, key=lambda x: x['order'], reverse=False)
  38. # 过滤
  39. relevant_video_ids = [int(item['recommend_vid']) for item in relevant_videos_sorted]
  40. filter_helper = FilterVideos(app_type=app_type, video_ids=relevant_video_ids, mid=mid, uid=uid)
  41. filtered_ids = filter_helper.filter_videos()
  42. if filtered_ids is None:
  43. return videos
  44. # 获取生效中的视频
  45. now = int(time.time())
  46. relevant_videos_in_effect = [
  47. {'videoId': int(item['recommend_vid']), 'pushFrom': config_.PUSH_FROM['relevant_video_op'],
  48. 'abCode': config_.AB_CODE['relevant_video_op']}
  49. for item in relevant_videos_sorted
  50. if item['start_time'] < now < item['finish_time'] and int(item['recommend_vid']) in filtered_ids
  51. ]
  52. if len(relevant_videos_in_effect) == 0:
  53. return videos
  54. # 与现有排序结果 进行合并重排
  55. # 获取现有排序结果中流量池视频 及其位置
  56. relevant_ids = [item['videoId'] for item in relevant_videos_in_effect]
  57. flow_pool_videos = []
  58. other_videos = []
  59. for i, item in enumerate(videos):
  60. if item.get('pushFrom', None) == config_.PUSH_FROM['flow_recall'] and item.get('videoId') not in relevant_ids:
  61. flow_pool_videos.append((i, item))
  62. elif item.get('videoId') not in relevant_ids:
  63. other_videos.append(item)
  64. else:
  65. continue
  66. # 重排,保持流量池视频位置不变
  67. rank_result = relevant_videos_in_effect + other_videos
  68. for i, item in flow_pool_videos:
  69. rank_result.insert(i, item)
  70. return rank_result[:size]
  71. def video_position_recommend(mid, uid, app_type, videos):
  72. # videos = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type,
  73. # algo_type=algo_type, client_info=client_info)
  74. redis_helper = RedisHelper()
  75. pos1_vids = redis_helper.get_data_from_redis(config.BaseConfig.RECALL_POSITION1_KEY_NAME)
  76. pos2_vids = redis_helper.get_data_from_redis(config.BaseConfig.RECALL_POSITION2_KEY_NAME)
  77. if pos1_vids is not None:
  78. pos1_vids = ast.literal_eval(pos1_vids)
  79. if pos2_vids is not None:
  80. pos2_vids = ast.literal_eval(pos2_vids)
  81. pos1_vids = [] if pos1_vids is None else pos1_vids
  82. pos2_vids = [] if pos2_vids is None else pos2_vids
  83. pos1_vids = [int(i) for i in pos1_vids]
  84. pos2_vids = [int(i) for i in pos2_vids]
  85. filter_1 = FilterVideos(app_type=app_type, video_ids=pos1_vids, mid=mid, uid=uid)
  86. filter_2 = FilterVideos(app_type=app_type, video_ids=pos2_vids, mid=mid, uid=uid)
  87. t = [gevent.spawn(filter_1.filter_videos), gevent.spawn(filter_2.filter_videos)]
  88. gevent.joinall(t)
  89. filted_list = [i.get() for i in t]
  90. pos1_vids = filted_list[0]
  91. pos2_vids = filted_list[1]
  92. videos = positon_duplicate(pos1_vids, pos2_vids, videos)
  93. if pos1_vids is not None and len(pos1_vids) >0 :
  94. videos.insert(0, {'videoId': int(pos1_vids[0]), 'rovScore': 100,
  95. 'pushFrom': config_.PUSH_FROM['position_insert'], 'abCode': config_.AB_CODE['position_insert']})
  96. if pos2_vids is not None and len(pos2_vids) >0 :
  97. videos.insert(1, {'videoId': int(pos2_vids[0]), 'rovScore': 100,
  98. 'pushFrom': config_.PUSH_FROM['position_insert'], 'abCode': config_.AB_CODE['position_insert']})
  99. return videos[:10]
  100. def positon_duplicate(pos1_vids, pos2_vids, videos):
  101. s = set()
  102. if pos1_vids is not None and len(pos1_vids) >0:
  103. s.add(int(pos1_vids[0]))
  104. if pos2_vids is not None and len(pos2_vids) >0:
  105. s.add(int(pos2_vids[0]))
  106. l = []
  107. for item in videos:
  108. if item['videoId'] in s:
  109. continue
  110. else:
  111. l.append(item)
  112. return l
  113. def video_recommend(mid, uid, size, app_type, algo_type, client_info, expire_time=24*3600,
  114. ab_code=config_.AB_CODE['initial']):
  115. """
  116. 首页线上推荐逻辑
  117. :param mid: mid type-string
  118. :param uid: uid type-string
  119. :param size: 请求视频数量 type-int
  120. :param app_type: 产品标识 type-int
  121. :param algo_type: 算法类型 type-string
  122. :param client_info: 用户位置信息 {"country": "国家", "province": "省份", "city": "城市"}
  123. :param expire_time: 末位视频记录redis过期时间
  124. :param ab_code: AB实验code
  125. :return:
  126. """
  127. # ####### 多进程召回
  128. start_recall = time.time()
  129. # log_.info('====== recall')
  130. '''
  131. cores = multiprocessing.cpu_count()
  132. pool = multiprocessing.Pool(processes=cores)
  133. pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code)
  134. _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
  135. pool_list = [
  136. # rov召回池
  137. pool.apply_async(pool_recall.rov_pool_recall, (size,)),
  138. # 流量池
  139. pool.apply_async(pool_recall.flow_pool_recall, (size,))
  140. ]
  141. recall_result_list = [p.get() for p in pool_list]
  142. pool.close()
  143. pool.join()
  144. '''
  145. recall_result_list = []
  146. pool_recall = PoolRecall(app_type=app_type, mid=mid, uid=uid, ab_code=ab_code,
  147. client_info=client_info)
  148. _, last_rov_recall_key, _ = pool_recall.get_video_last_idx()
  149. t = [gevent.spawn(pool_recall.rov_pool_recall, size, expire_time), gevent.spawn(pool_recall.flow_pool_recall, size)]
  150. gevent.joinall(t)
  151. recall_result_list = [i.get() for i in t]
  152. end_recall = time.time()
  153. log_.info('mid: {}, uid: {}, recall: {}, execute time = {}ms'.format(
  154. mid, uid, recall_result_list, (end_recall - start_recall) * 1000))
  155. # ####### 排序
  156. start_rank = time.time()
  157. # log_.info('====== rank')
  158. data = {
  159. 'rov_pool_recall': recall_result_list[0],
  160. 'flow_pool_recall': recall_result_list[1]
  161. }
  162. rank_result = video_rank(data=data, size=size)
  163. end_rank = time.time()
  164. log_.info('mid: {}, uid: {}, rank_result: {}, execute time = {}ms'.format(
  165. mid, uid, rank_result, (end_rank - start_rank) * 1000))
  166. if not rank_result:
  167. # 兜底策略
  168. # log_.info('====== bottom strategy')
  169. start_bottom = time.time()
  170. rank_result = bottom_strategy(size=size, app_type=app_type, ab_code=ab_code)
  171. end_bottom = time.time()
  172. log_.info('mid: {}, uid: {}, bottom strategy result: {}, execute time = {}ms'.format(
  173. mid, uid, rank_result, (end_bottom - start_bottom) * 1000))
  174. return rank_result, last_rov_recall_key
  175. def ab_test_op(rank_result, ab_code_list, app_type, mid, uid, **kwargs):
  176. """
  177. 对排序后的结果 按照AB实验进行对应的分组操作
  178. :param rank_result: 排序后的结果
  179. :param ab_code_list: 此次请求参与的 ab实验组
  180. :param app_type: 产品标识
  181. :param mid: mid
  182. :param uid: uid
  183. :param kwargs: 其他参数
  184. :return:
  185. """
  186. # ####### 视频宽高比AB实验
  187. # 对内容精选进行 视频宽高比分发实验
  188. if config_.AB_CODE['w_h_rate'] in ab_code_list and app_type in config_.AB_TEST.get('w_h_rate', []):
  189. rank_result = video_rank_by_w_h_rate(videos=rank_result)
  190. log_.info('app_type: {}, mid: {}, uid: {}, rank_by_w_h_rate_result: {}'.format(
  191. app_type, mid, uid, rank_result))
  192. # 按position位置排序
  193. if config_.AB_CODE['position_insert'] in ab_code_list and app_type in config_.AB_TEST.get('position_insert', []):
  194. rank_result = video_position_recommend(mid, uid, app_type, rank_result)
  195. print('===========================')
  196. print(rank_result)
  197. log_.info('app_type: {}, mid: {}, uid: {}, rank_by_position_insert_result: {}'.format(
  198. app_type, mid, uid, rank_result))
  199. # 相关推荐强插
  200. if config_.AB_CODE['relevant_video_op'] in ab_code_list \
  201. and app_type in config_.AB_TEST.get('relevant_video_op', []):
  202. head_vid = kwargs['head_vid']
  203. size = kwargs['size']
  204. rank_result = relevant_video_top_recommend(
  205. app_type=app_type, mid=mid, uid=uid, head_vid=head_vid, videos=rank_result, size=size
  206. )
  207. log_.info('app_type: {}, mid: {}, uid: {}, head_vid: {}, rank_by_relevant_video_op_result: {}'.format(
  208. app_type, mid, uid, head_vid, rank_result))
  209. return rank_result
  210. def update_redis_data(result, app_type, mid, last_rov_recall_key, expire_time=24*3600):
  211. """
  212. 根据最终的排序结果更新相关redis数据
  213. :param result: 排序结果
  214. :param app_type: 产品标识
  215. :param mid: mid
  216. :param last_rov_recall_key: 用户上一次在rov召回池对应的位置 redis key
  217. :param expire_time: 末位视频记录redis过期时间
  218. :return: None
  219. """
  220. # ####### redis数据刷新
  221. try:
  222. # log_.info('====== update redis')
  223. # 预曝光数据同步刷新到Redis, 过期时间为0.5h
  224. redis_helper = RedisHelper()
  225. preview_key_name = config_.PREVIEW_KEY_PREFIX + '{}.{}'.format(app_type, mid)
  226. preview_video_ids = [int(item['videoId']) for item in result]
  227. if preview_video_ids:
  228. # log_.error('key_name = {} \n values = {}'.format(preview_key_name, tuple(preview_video_ids)))
  229. redis_helper.add_data_with_set(key_name=preview_key_name, values=tuple(preview_video_ids), expire_time=30 * 60)
  230. log_.info('preview redis update success!')
  231. # 将此次获取的ROV召回池config_.K末位视频id同步刷新到Redis中,方便下次快速定位到召回位置,过期时间为1天
  232. rov_recall_video = [item['videoId'] for item in result[:config_.K]
  233. if item['pushFrom'] == config_.PUSH_FROM['rov_recall']]
  234. if len(rov_recall_video) > 0:
  235. if app_type == config_.APP_TYPE['APP']:
  236. key_name = config_.UPDATE_ROV_KEY_NAME_APP
  237. else:
  238. key_name = config_.UPDATE_ROV_KEY_NAME
  239. if not redis_helper.get_score_with_value(key_name=key_name, value=rov_recall_video[-1]):
  240. redis_helper.set_data_to_redis(key_name=last_rov_recall_key, value=rov_recall_video[-1],
  241. expire_time=expire_time)
  242. log_.info('last video redis update success!')
  243. # 将此次分发的流量池视频,对 本地分发数-1 进行记录
  244. flow_recall_video = [item for item in result if item['pushFrom'] == config_.PUSH_FROM['flow_recall']]
  245. if flow_recall_video:
  246. update_local_distribute_count(flow_recall_video)
  247. log_.info('update local distribute count success!')
  248. except Exception as e:
  249. log_.error("update redis data fail!")
  250. log_.error(traceback.format_exc())
  251. def update_local_distribute_count(videos):
  252. """
  253. 更新本地分发数
  254. :param videos: 视频列表 type-list [{'videoId':'', 'flowPool':'', 'distributeCount': '',
  255. 'rovScore': '', 'pushFrom': 'flow_pool', 'abCode': self.ab_code}, ....]
  256. :return:
  257. """
  258. try:
  259. redis_helper = RedisHelper()
  260. for item in videos:
  261. key_name = '{}{}.{}'.format(config_.LOCAL_DISTRIBUTE_COUNT_PREFIX, item['videoId'], item['flowPool'])
  262. # 本地记录的分发数 - 1
  263. redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
  264. # if redis_helper.key_exists(key_name=key_name):
  265. # # 该视频本地有记录,本地记录的分发数 - 1
  266. # redis_helper.decr_key(key_name=key_name, amount=1, expire_time=5 * 60)
  267. # else:
  268. # # 该视频本地无记录,接口获取的分发数 - 1
  269. # redis_helper.incr_key(key_name=key_name, amount=int(item['distributeCount']) - 1, expire_time=5 * 60)
  270. except Exception as e:
  271. log_.error('update_local_distribute_count error...')
  272. log_.error(traceback.format_exc())
  273. def video_homepage_recommend(mid, uid, size, app_type, algo_type, client_info):
  274. """
  275. 首页线上推荐逻辑
  276. :param mid: mid type-string
  277. :param uid: uid type-string
  278. :param size: 请求视频数量 type-int
  279. :param app_type: 产品标识 type-int
  280. :param algo_type: 算法类型 type-string
  281. :param client_info: 用户位置信息 {"country": "国家", "province": "省份", "city": "城市"}
  282. :return:
  283. """
  284. # 对 vlog 切换10%的流量做实验
  285. # 对mid进行哈希
  286. print(hash(mid))
  287. print(abs(hash(mid)) % 10)
  288. if app_type in config_.AB_TEST['rank_by_h'] and abs(hash(mid)) % 10 in [0, 1, 7, 8, 4, ]:
  289. print('in')
  290. # 简单召回 - 排序 - 兜底
  291. rank_result, last_rov_recall_key = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type,
  292. algo_type=algo_type, client_info=client_info,
  293. expire_time=3600,
  294. ab_code=config_.AB_CODE['rank_by_h'])
  295. # ab-test
  296. result = ab_test_op(rank_result=rank_result,
  297. ab_code_list=[config_.AB_CODE['position_insert']],
  298. app_type=app_type, mid=mid, uid=uid)
  299. # redis数据刷新
  300. update_redis_data(result=result, app_type=app_type, mid=mid, last_rov_recall_key=last_rov_recall_key,
  301. expire_time=3600)
  302. else:
  303. # 简单召回 - 排序 - 兜底
  304. rank_result, last_rov_recall_key = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type,
  305. algo_type=algo_type, client_info=client_info)
  306. # ab-test
  307. result = ab_test_op(rank_result=rank_result,
  308. ab_code_list=[config_.AB_CODE['position_insert']],
  309. app_type=app_type, mid=mid, uid=uid)
  310. # redis数据刷新
  311. update_redis_data(result=result, app_type=app_type, mid=mid, last_rov_recall_key=last_rov_recall_key)
  312. return result
  313. def video_relevant_recommend(video_id, mid, uid, size, app_type):
  314. """
  315. 相关推荐逻辑
  316. :param video_id: 相关推荐的头部视频id
  317. :param mid: mid type-string
  318. :param uid: uid type-string
  319. :param size: 请求视频数量 type-int
  320. :param app_type: 产品标识 type-int
  321. :return: videos type-list
  322. """
  323. # videos = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type, algo_type='', client_info=None)
  324. # 简单召回 - 排序 - 兜底
  325. rank_result, last_rov_recall_key = video_recommend(mid=mid, uid=uid, size=size, app_type=app_type,
  326. algo_type='', client_info=None)
  327. # ab-test
  328. result = ab_test_op(rank_result=rank_result,
  329. ab_code_list=[
  330. config_.AB_CODE['position_insert'],
  331. config_.AB_CODE['relevant_video_op']
  332. ],
  333. app_type=app_type, mid=mid, uid=uid, head_vid=video_id, size=size)
  334. # redis数据刷新
  335. update_redis_data(result=result, app_type=app_type, mid=mid, last_rov_recall_key=last_rov_recall_key)
  336. return result
  337. if __name__ == '__main__':
  338. videos = [
  339. {"videoId": 10136461, "rovScore": 99.971, "pushFrom": "recall_pool", "abCode": 10000},
  340. {"videoId": 10239014, "rovScore": 99.97, "pushFrom": "recall_pool", "abCode": 10000},
  341. {"videoId": 9851154, "rovScore": 99.969, "pushFrom": "recall_pool", "abCode": 10000},
  342. {"videoId": 10104347, "rovScore": 99.968, "pushFrom": "recall_pool", "abCode": 10000},
  343. {"videoId": 10141507, "rovScore": 99.967, "pushFrom": "recall_pool", "abCode": 10000},
  344. {"videoId": 10292817, "flowPool": "2#6#2#1641780979606", "rovScore": 53.926690610816486,
  345. "pushFrom": "flow_pool", "abCode": 10000},
  346. {"videoId": 10224932, "flowPool": "2#5#1#1641800279644", "rovScore": 53.47890460059617, "pushFrom": "flow_pool",
  347. "abCode": 10000},
  348. {"videoId": 9943255, "rovScore": 99.966, "pushFrom": "recall_pool", "abCode": 10000},
  349. {"videoId": 10282970, "flowPool": "2#5#1#1641784814103", "rovScore": 52.682815076325575,
  350. "pushFrom": "flow_pool", "abCode": 10000},
  351. {"videoId": 10282205, "rovScore": 99.965, "pushFrom": "recall_pool", "abCode": 10000}
  352. ]
  353. res = relevant_video_top_recommend(app_type=4, mid='', uid=1111, head_vid=123, videos=videos, size=10)
  354. print(res)