videos_filter.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. import time
  2. import traceback
  3. from datetime import date, timedelta, datetime
  4. from utils import filter_video_status, send_msg_to_feishu
  5. from db_helper import RedisHelper
  6. from config import set_config
  7. from log import Log
# Project configuration object and environment name, as returned by set_config().
config_, env = set_config()
# Module-wide logger shared by every filter function below.
log_ = Log()
  10. def filter_rov_pool(app_type=None):
  11. """ROV召回池视频过滤"""
  12. log_.info("rov recall pool filter start ...")
  13. # 拼接redis-key
  14. if app_type is None:
  15. key_name, _ = get_pool_redis_key(pool_type='rov')
  16. else:
  17. log_.info("appType = {}".format(app_type))
  18. key_name, _ = get_pool_redis_key(pool_type='rov', app_type=app_type)
  19. # 获取视频
  20. redis_helper = RedisHelper()
  21. data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
  22. if data is None:
  23. log_.info("data is None")
  24. log_.info("rov recall pool filter end!")
  25. return
  26. # 过滤
  27. video_ids = [int(video_id) for video_id in data]
  28. filtered_result = filter_video_status(video_ids=video_ids)
  29. # 求差集,获取需要过滤掉的视频,并从redis中移除
  30. filter_videos = set(video_ids) - set(filtered_result)
  31. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  32. len(filtered_result),
  33. len(filter_videos)))
  34. if len(filter_videos) == 0:
  35. log_.info("rov recall pool filter end!")
  36. return
  37. redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
  38. log_.info("rov recall pool filter end!")
  39. def filter_flow_pool():
  40. """流量池视频过滤"""
  41. log_.info("flow pool filter start ...")
  42. for _, app_type in config_.APP_TYPE.items():
  43. log_.info('app_type {} videos filter start...'.format(app_type))
  44. # 拼接redis-key
  45. key_name = get_pool_redis_key(pool_type='flow', app_type=app_type)
  46. # 获取视频
  47. redis_helper = RedisHelper()
  48. data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
  49. if data is None:
  50. log_.info("data is None")
  51. log_.info("app_type {} videos filter end!".format(app_type))
  52. continue
  53. # videoId与flowPool做mapping
  54. video_ids = []
  55. mapping = {}
  56. for video in data:
  57. video_id, flow_pool = video.split('-')
  58. video_id = int(video_id)
  59. if video_id not in video_ids:
  60. video_ids.append(video_id)
  61. mapping[video_id] = [flow_pool]
  62. else:
  63. mapping[video_id].append(flow_pool)
  64. # 过滤
  65. if len(video_ids) == 0:
  66. log_.info("data size = {}, video_ids size = {}, data = {}".format(len(data), len(video_ids), data))
  67. log_.info("app_type {} videos filter end!".format(app_type))
  68. continue
  69. filtered_result = filter_video_status(video_ids=video_ids)
  70. # 求差集,获取需要过滤掉的视频,并从redis中移除
  71. filter_videos = set(video_ids) - set(filtered_result)
  72. log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
  73. len(data), len(video_ids), len(filtered_result), len(filter_videos)))
  74. # 移除
  75. if len(filter_videos) == 0:
  76. log_.info("app_type {} videos filter end!".format(app_type))
  77. continue
  78. remove_videos = ['{}-{}'.format(video_id, flow_pool)
  79. for video_id in filter_videos
  80. for flow_pool in mapping[video_id]]
  81. redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos)
  82. log_.info("app_type {} videos filter end!".format(app_type))
  83. log_.info("flow pool filter end!")
  84. def filter_bottom():
  85. """兜底视频过滤"""
  86. log_.info("bottom videos filter start ...")
  87. # 获取视频
  88. redis_helper = RedisHelper()
  89. data = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
  90. if data is None:
  91. log_.info("data is None")
  92. log_.info("bottom videos filter end!")
  93. return
  94. # 过滤
  95. video_ids = [int(video_id) for video_id in data]
  96. filtered_result = filter_video_status(video_ids=video_ids)
  97. # 求差集,获取需要过滤掉的视频,并从redis中移除
  98. filter_videos = set(video_ids) - set(filtered_result)
  99. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  100. len(filtered_result),
  101. len(filter_videos)))
  102. if len(filter_videos) == 0:
  103. log_.info("bottom videos filter end!")
  104. return
  105. redis_helper.remove_value_from_zset(key_name=config_.BOTTOM_KEY_NAME, value=list(filter_videos))
  106. log_.info("bottom videos filter end!")
  107. def filter_rov_updated():
  108. """修改过ROV的视频过滤"""
  109. log_.info("update rov videos filter start ...")
  110. # 获取视频
  111. redis_helper = RedisHelper()
  112. data = redis_helper.get_data_zset_with_index(key_name=config_.UPDATE_ROV_KEY_NAME, start=0, end=-1)
  113. if data is None:
  114. log_.info("data is None")
  115. log_.info("update rov videos filter end!")
  116. return
  117. # 过滤
  118. video_ids = [int(video_id) for video_id in data]
  119. filtered_result = filter_video_status(video_ids=video_ids)
  120. # 求差集,获取需要过滤掉的视频,并从redis中移除
  121. filter_videos = set(video_ids) - set(filtered_result)
  122. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  123. len(filtered_result),
  124. len(filter_videos)))
  125. if len(filter_videos) == 0:
  126. log_.info("update rov videos filter end!")
  127. return
  128. redis_helper.remove_value_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME, value=list(filter_videos))
  129. log_.info("update rov videos filter end!")
  130. def get_pool_redis_key(pool_type, app_type=None):
  131. """
  132. 拼接key
  133. :param pool_type: type-string {'rov': rov召回池, 'flow': 流量池}
  134. :param app_type: 产品标识
  135. :return: key_name
  136. """
  137. redis_helper = RedisHelper()
  138. if pool_type == 'rov':
  139. # appType = 6
  140. if app_type == config_.APP_TYPE['SHORT_VIDEO']:
  141. # 获取当前所在小时
  142. redis_date = datetime.now().hour
  143. # 判断热度列表是否更新,未更新则使用前一小时的热度列表
  144. key_name = '{}{}.{}'.format(config_.RECALL_KEY_NAME_PREFIX_APP_TYPE, app_type, redis_date)
  145. if redis_helper.key_exists(key_name):
  146. return key_name, redis_date
  147. else:
  148. if redis_date == 0:
  149. redis_date_ = 23
  150. else:
  151. redis_date_ = redis_date - 1
  152. key_name = '{}{}.{}'.format(config_.RECALL_KEY_NAME_PREFIX_APP_TYPE, app_type, redis_date_)
  153. return key_name, redis_date_
  154. # 其他
  155. else:
  156. # 判断热度列表是否更新,未更新则使用前一天的热度列表
  157. key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
  158. if redis_helper.key_exists(key_name):
  159. redis_date = date.today().strftime('%Y%m%d')
  160. else:
  161. redis_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
  162. key_name = config_.RECALL_KEY_NAME_PREFIX + redis_date
  163. return key_name, redis_date
  164. elif pool_type == 'flow':
  165. # 流量池
  166. return config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
  167. else:
  168. log_.error('pool type error')
  169. return None, None
  170. def main():
  171. try:
  172. # ROV召回池视频过滤
  173. filter_rov_pool()
  174. # appType = 6,ROV召回池视频过滤
  175. filter_rov_pool(app_type=config_.APP_TYPE['SHORT_VIDEO'])
  176. # 流量池视频过滤
  177. filter_flow_pool()
  178. # 兜底视频过滤
  179. filter_bottom()
  180. # 修改过ROV的视频过滤
  181. filter_rov_updated()
  182. except Exception as e:
  183. log_.error(traceback.format_exc())
  184. send_msg_to_feishu('{} - 过滤失败 \n {}'.format(config_.ENV_TEXT, traceback.format_exc()))
  185. return
# Run all filters only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()