videos_filter.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. import time
  2. import traceback
  3. from datetime import date, timedelta
  4. from utils import filter_video_status, send_msg_to_feishu
  5. from db_helper import RedisHelper
  6. from config import set_config
  7. from log import Log
# set_config() yields the config object read throughout this file (redis key
# names, APP_TYPE, ENV_TEXT) together with an `env` value; `env` is not used
# by any code visible in this file.
config_, env = set_config()
# Module-wide logger used by every filter routine below.
log_ = Log()
  10. def filter_rov_pool():
  11. """ROV召回池视频过滤"""
  12. log_.info("rov recall pool filter start ...")
  13. # 拼接redis-key
  14. key_name, _ = get_pool_redis_key(pool_type='rov')
  15. # 获取视频
  16. redis_helper = RedisHelper()
  17. data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
  18. if data is None:
  19. log_.info("data is None")
  20. log_.info("rov recall pool filter end!")
  21. return
  22. # 过滤
  23. video_ids = [int(video_id) for video_id in data]
  24. filtered_result = filter_video_status(video_ids=video_ids)
  25. # 求差集,获取需要过滤掉的视频,并从redis中移除
  26. filter_videos = set(video_ids) - set(filtered_result)
  27. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  28. len(filtered_result),
  29. len(filter_videos)))
  30. if len(filter_videos) == 0:
  31. log_.info("rov recall pool filter end!")
  32. return
  33. redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
  34. log_.info("rov recall pool filter end!")
  35. def filter_flow_pool():
  36. """流量池视频过滤"""
  37. log_.info("flow pool filter start ...")
  38. for _, app_type in config_.APP_TYPE.items():
  39. log_.info('app_type {} videos filter start...'.format(app_type))
  40. # 拼接redis-key
  41. key_name = get_pool_redis_key(pool_type='flow', app_type=app_type)
  42. # 获取视频
  43. redis_helper = RedisHelper()
  44. data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
  45. if data is None:
  46. log_.info("data is None")
  47. log_.info("app_type {} videos filter end!".format(app_type))
  48. continue
  49. # videoId与flowPool做mapping
  50. video_ids = []
  51. mapping = {}
  52. for video in data:
  53. video_id, flow_pool = video.split('-')
  54. video_id = int(video_id)
  55. if video_id not in video_ids:
  56. video_ids.append(video_id)
  57. mapping[video_id] = [flow_pool]
  58. else:
  59. mapping[video_id].append(flow_pool)
  60. # 过滤
  61. if len(video_ids) == 0:
  62. log_.info("data size = {}, video_ids size = {}, data = {}".format(len(data), len(video_ids), data))
  63. log_.info("app_type {} videos filter end!".format(app_type))
  64. continue
  65. filtered_result = filter_video_status(video_ids=video_ids)
  66. # 求差集,获取需要过滤掉的视频,并从redis中移除
  67. filter_videos = set(video_ids) - set(filtered_result)
  68. log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
  69. len(data), len(video_ids), len(filtered_result), len(filter_videos)))
  70. # 移除
  71. if len(filter_videos) == 0:
  72. log_.info("app_type {} videos filter end!".format(app_type))
  73. continue
  74. remove_videos = ['{}-{}'.format(video_id, flow_pool)
  75. for video_id in filter_videos
  76. for flow_pool in mapping[video_id]]
  77. redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos)
  78. log_.info("app_type {} videos filter end!".format(app_type))
  79. log_.info("flow pool filter end!")
  80. def filter_bottom():
  81. """兜底视频过滤"""
  82. log_.info("bottom videos filter start ...")
  83. # 获取视频
  84. redis_helper = RedisHelper()
  85. data = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
  86. if data is None:
  87. log_.info("data is None")
  88. log_.info("bottom videos filter end!")
  89. return
  90. # 过滤
  91. video_ids = [int(video_id) for video_id in data]
  92. filtered_result = filter_video_status(video_ids=video_ids)
  93. # 求差集,获取需要过滤掉的视频,并从redis中移除
  94. filter_videos = set(video_ids) - set(filtered_result)
  95. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  96. len(filtered_result),
  97. len(filter_videos)))
  98. if len(filter_videos) == 0:
  99. log_.info("bottom videos filter end!")
  100. return
  101. redis_helper.remove_value_from_zset(key_name=config_.BOTTOM_KEY_NAME, value=list(filter_videos))
  102. log_.info("bottom videos filter end!")
  103. def filter_rov_updated():
  104. """修改过ROV的视频过滤"""
  105. log_.info("update rov videos filter start ...")
  106. # 获取视频
  107. redis_helper = RedisHelper()
  108. data = redis_helper.get_data_zset_with_index(key_name=config_.UPDATE_ROV_KEY_NAME, start=0, end=-1)
  109. if data is None:
  110. log_.info("data is None")
  111. log_.info("update rov videos filter end!")
  112. return
  113. # 过滤
  114. video_ids = [int(video_id) for video_id in data]
  115. filtered_result = filter_video_status(video_ids=video_ids)
  116. # 求差集,获取需要过滤掉的视频,并从redis中移除
  117. filter_videos = set(video_ids) - set(filtered_result)
  118. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  119. len(filtered_result),
  120. len(filter_videos)))
  121. if len(filter_videos) == 0:
  122. log_.info("update rov videos filter end!")
  123. return
  124. redis_helper.remove_value_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME, value=list(filter_videos))
  125. log_.info("update rov videos filter end!")
  126. def get_pool_redis_key(pool_type, app_type=None):
  127. """
  128. 拼接key
  129. :param pool_type: type-string {'rov': rov召回池, 'flow': 流量池}
  130. :param app_type: 产品标识
  131. :return: key_name
  132. """
  133. redis_helper = RedisHelper()
  134. if pool_type == 'rov':
  135. # 判断热度列表是否更新,未更新则使用前一天的热度列表
  136. key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
  137. if redis_helper.key_exists(key_name):
  138. redis_date = date.today().strftime('%Y%m%d')
  139. else:
  140. redis_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
  141. key_name = config_.RECALL_KEY_NAME_PREFIX + redis_date
  142. return key_name, redis_date
  143. elif pool_type == 'flow':
  144. # 流量池
  145. return config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
  146. else:
  147. log_.error('pool type error')
  148. return None, None
  149. def main():
  150. try:
  151. # ROV召回池视频过滤
  152. filter_rov_pool()
  153. # 流量池视频过滤
  154. filter_flow_pool()
  155. # 兜底视频过滤
  156. filter_bottom()
  157. # 修改过ROV的视频过滤
  158. filter_rov_updated()
  159. except Exception as e:
  160. log_.error(traceback.format_exc())
  161. send_msg_to_feishu('{} - 过滤失败 \n {}'.format(config_.ENV_TEXT, traceback.format_exc()))
  162. return
# Run all filters only when this file is executed as a script.
if __name__ == '__main__':
    main()