videos_filter.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. import time
  2. import json
  3. import traceback
  4. from datetime import date, timedelta, datetime
  5. from utils import filter_video_status, send_msg_to_feishu
  6. from db_helper import RedisHelper
  7. from config import set_config
  8. from log import Log
# set_config() yields a (config, env) pair; the config object supplies every
# Redis key name and the APP_TYPE table used by the functions in this module.
config_, env = set_config()
# Single logger instance shared by all filter routines in this module.
log_ = Log()
  11. def filter_relevant_videos():
  12. """运营强插相关推荐视频过滤"""
  13. log_.info("relevant videos with op filter start...")
  14. # 读取需要过滤的头部视频id
  15. redis_helper = RedisHelper()
  16. head_videos = redis_helper.get_data_from_set(key_name=config_.RELEVANT_TOP_VIDEOS_KEY_NAME)
  17. if head_videos is None or len(head_videos) == 0:
  18. log_.info("relevant videos with op filter end! head_videos = {}".format(head_videos))
  19. return
  20. # 过滤
  21. remove_head_vids = []
  22. for head_vid in head_videos:
  23. key_name = '{}{}'.format(config_.RELEVANT_VIDEOS_WITH_OP_KEY_NAME, head_vid)
  24. # 头部视频 对应的key不存在时,将head_vid移除对应redis
  25. if not redis_helper.key_exists(key_name=key_name):
  26. remove_head_vids.append(head_vid)
  27. log_.info('head_vid = {} relevant redis key not exist!'.format(head_vid))
  28. continue
  29. # 获取头部视频对应的相关视频
  30. relevant_videos = redis_helper.get_data_from_redis(key_name=key_name)
  31. # 该视频没有指定的相关性视频,将head_vid移除对应redis
  32. if relevant_videos is None:
  33. remove_head_vids.append(head_vid)
  34. log_.info('head_vid = {} not have relevant videos!'.format(head_vid))
  35. continue
  36. # 过滤
  37. relevant_videos = json.loads(relevant_videos)
  38. relevant_video_ids = [int(item['recommend_vid']) for item in relevant_videos]
  39. filtered_videos = filter_video_status(video_ids=relevant_video_ids)
  40. # 保留可推荐 且生效中 的视频
  41. relevant_videos_new = [
  42. item for item in relevant_videos
  43. if int(item['recommend_vid']) in filtered_videos and int(item['finish_time']) <= int(time.time())
  44. ]
  45. # 过滤后没有符合的视频,将head_vid移除对应redis,删除对应的相关推荐的key
  46. if len(relevant_videos_new) == 0:
  47. remove_head_vids.append(head_vid)
  48. redis_helper.del_keys(key_name=key_name)
  49. log_.info('head_vid = {} filtered finished! new relevant videos count = {}'.format(
  50. head_vid, len(relevant_videos_new)))
  51. continue
  52. # 重新写入redis
  53. # 以最晚结束的视频的结束时间 - 当前时间 + 5s 作为key的过期时间
  54. finish_time_list = [item['finish_time'] for item in relevant_videos_new]
  55. expire_time = max(finish_time_list) - int(time.time()) + 5
  56. if expire_time <= 0:
  57. log_.info('head_vid = {} expire_time <= 0!'.format(head_vid))
  58. continue
  59. # 存入redis
  60. redis_helper.set_data_to_redis(key_name=key_name,
  61. value=json.dumps(relevant_videos_new),
  62. expire_time=expire_time)
  63. log_.info('head_vid = {} filtered finished! new relevant videos count = {}'.format(
  64. head_vid, len(relevant_videos_new)))
  65. # 将需要移除的头部视频id进行移除
  66. redis_helper.remove_value_from_set(key_name=config_.RELEVANT_TOP_VIDEOS_KEY_NAME, values=tuple(remove_head_vids))
  67. log_.info('head videos remove finished! remove_head_vids = {}'.format(remove_head_vids))
  68. log_.info("relevant videos with op filter end!")
  69. def filter_rov_pool(app_type=None):
  70. """ROV召回池视频过滤"""
  71. log_.info("rov recall pool filter start ...")
  72. # 拼接redis-key
  73. if app_type is None:
  74. key_name, _ = get_pool_redis_key(pool_type='rov')
  75. else:
  76. log_.info("appType = {}".format(app_type))
  77. key_name, _ = get_pool_redis_key(pool_type='rov', app_type=app_type)
  78. # 获取视频
  79. redis_helper = RedisHelper()
  80. data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
  81. if data is None:
  82. log_.info("data is None")
  83. log_.info("rov recall pool filter end!")
  84. return
  85. # 过滤
  86. video_ids = [int(video_id) for video_id in data]
  87. filtered_result = filter_video_status(video_ids=video_ids)
  88. # 求差集,获取需要过滤掉的视频,并从redis中移除
  89. filter_videos = set(video_ids) - set(filtered_result)
  90. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  91. len(filtered_result),
  92. len(filter_videos)))
  93. if len(filter_videos) == 0:
  94. log_.info("rov recall pool filter end!")
  95. return
  96. redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
  97. log_.info("rov recall pool filter end!")
  98. def filter_flow_pool():
  99. """流量池视频过滤"""
  100. log_.info("flow pool filter start ...")
  101. for _, app_type in config_.APP_TYPE.items():
  102. log_.info('app_type {} videos filter start...'.format(app_type))
  103. # 拼接redis-key
  104. key_name = get_pool_redis_key(pool_type='flow', app_type=app_type)
  105. # 获取视频
  106. redis_helper = RedisHelper()
  107. data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
  108. if data is None:
  109. log_.info("data is None")
  110. log_.info("app_type {} videos filter end!".format(app_type))
  111. continue
  112. # videoId与flowPool做mapping
  113. video_ids = []
  114. mapping = {}
  115. for video in data:
  116. video_id, flow_pool = video.split('-')
  117. video_id = int(video_id)
  118. if video_id not in video_ids:
  119. video_ids.append(video_id)
  120. mapping[video_id] = [flow_pool]
  121. else:
  122. mapping[video_id].append(flow_pool)
  123. # 过滤
  124. if len(video_ids) == 0:
  125. log_.info("data size = {}, video_ids size = {}, data = {}".format(len(data), len(video_ids), data))
  126. log_.info("app_type {} videos filter end!".format(app_type))
  127. continue
  128. filtered_result = filter_video_status(video_ids=video_ids)
  129. # 求差集,获取需要过滤掉的视频,并从redis中移除
  130. filter_videos = set(video_ids) - set(filtered_result)
  131. log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
  132. len(data), len(video_ids), len(filtered_result), len(filter_videos)))
  133. # 移除
  134. if len(filter_videos) == 0:
  135. log_.info("app_type {} videos filter end!".format(app_type))
  136. continue
  137. remove_videos = ['{}-{}'.format(video_id, flow_pool)
  138. for video_id in filter_videos
  139. for flow_pool in mapping[video_id]]
  140. redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos)
  141. log_.info("app_type {} videos filter end!".format(app_type))
  142. log_.info("flow pool filter end!")
  143. def filter_bottom():
  144. """兜底视频过滤"""
  145. log_.info("bottom videos filter start ...")
  146. # 获取视频
  147. redis_helper = RedisHelper()
  148. data = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
  149. if data is None:
  150. log_.info("data is None")
  151. log_.info("bottom videos filter end!")
  152. return
  153. # 过滤
  154. video_ids = [int(video_id) for video_id in data]
  155. filtered_result = filter_video_status(video_ids=video_ids)
  156. # 求差集,获取需要过滤掉的视频,并从redis中移除
  157. filter_videos = set(video_ids) - set(filtered_result)
  158. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  159. len(filtered_result),
  160. len(filter_videos)))
  161. if len(filter_videos) == 0:
  162. log_.info("bottom videos filter end!")
  163. return
  164. redis_helper.remove_value_from_zset(key_name=config_.BOTTOM_KEY_NAME, value=list(filter_videos))
  165. log_.info("bottom videos filter end!")
  166. def filter_rov_updated():
  167. """修改过ROV的视频过滤"""
  168. log_.info("update rov videos filter start ...")
  169. # 获取视频
  170. redis_helper = RedisHelper()
  171. data = redis_helper.get_data_zset_with_index(key_name=config_.UPDATE_ROV_KEY_NAME, start=0, end=-1)
  172. if data is None:
  173. log_.info("data is None")
  174. log_.info("update rov videos filter end!")
  175. return
  176. # 过滤
  177. video_ids = [int(video_id) for video_id in data]
  178. filtered_result = filter_video_status(video_ids=video_ids)
  179. # 求差集,获取需要过滤掉的视频,并从redis中移除
  180. filter_videos = set(video_ids) - set(filtered_result)
  181. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  182. len(filtered_result),
  183. len(filter_videos)))
  184. if len(filter_videos) == 0:
  185. log_.info("update rov videos filter end!")
  186. return
  187. redis_helper.remove_value_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME, value=list(filter_videos))
  188. log_.info("update rov videos filter end!")
  189. def get_pool_redis_key(pool_type, app_type=None):
  190. """
  191. 拼接key
  192. :param pool_type: type-string {'rov': rov召回池, 'flow': 流量池}
  193. :param app_type: 产品标识
  194. :return: key_name
  195. """
  196. redis_helper = RedisHelper()
  197. if pool_type == 'rov':
  198. # appType = 6
  199. if app_type == config_.APP_TYPE['SHORT_VIDEO']:
  200. # 获取当前所在小时
  201. redis_date = datetime.now().hour
  202. # 判断热度列表是否更新,未更新则使用前一小时的热度列表
  203. key_name = '{}{}.{}'.format(config_.RECALL_KEY_NAME_PREFIX_APP_TYPE, app_type, redis_date)
  204. if redis_helper.key_exists(key_name):
  205. return key_name, redis_date
  206. else:
  207. if redis_date == 0:
  208. redis_date = 23
  209. else:
  210. redis_date = redis_date - 1
  211. key_name = '{}{}.{}'.format(config_.RECALL_KEY_NAME_PREFIX_APP_TYPE, app_type, redis_date)
  212. return key_name, redis_date
  213. # 其他
  214. else:
  215. # 判断热度列表是否更新,未更新则使用前一天的热度列表
  216. key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
  217. if redis_helper.key_exists(key_name):
  218. redis_date = date.today().strftime('%Y%m%d')
  219. else:
  220. redis_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
  221. key_name = config_.RECALL_KEY_NAME_PREFIX + redis_date
  222. return key_name, redis_date
  223. elif pool_type == 'flow':
  224. # 流量池
  225. return config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
  226. else:
  227. log_.error('pool type error')
  228. return None, None
  229. def main():
  230. try:
  231. # ROV召回池视频过滤
  232. filter_rov_pool()
  233. # appType = 6,ROV召回池视频过滤
  234. filter_rov_pool(app_type=config_.APP_TYPE['SHORT_VIDEO'])
  235. # 流量池视频过滤
  236. filter_flow_pool()
  237. # 兜底视频过滤
  238. filter_bottom()
  239. # 修改过ROV的视频过滤
  240. filter_rov_updated()
  241. # 运营强插相关推荐视频过滤
  242. filter_relevant_videos()
  243. except Exception as e:
  244. log_.error(traceback.format_exc())
  245. send_msg_to_feishu('{} - 过滤失败 \n {}'.format(config_.ENV_TEXT, traceback.format_exc()))
  246. return
# Run the filters only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()