# videos_filter.py
  1. import time
  2. import json
  3. import traceback
  4. import ast
  5. from datetime import date, timedelta, datetime
  6. from utils import filter_video_status, send_msg_to_feishu
  7. from db_helper import RedisHelper
  8. from config import set_config
  9. from log import Log
# Module-level setup, executed once at import time.
# set_config() returns a pair: the configuration object used by every filter
# below, and a second value bound to `env` (presumably the environment name;
# it is not referenced anywhere else in the visible code).
config_, env = set_config()
# Shared logger instance used by all functions in this module.
log_ = Log()
  12. def filter_position_videos():
  13. """按位置排序视频过滤"""
  14. log_.info("position videos filter start...")
  15. position_key_list = [config_.RECALL_POSITION1_KEY_NAME, config_.RECALL_POSITION2_KEY_NAME]
  16. redis_helper = RedisHelper()
  17. for key_name in position_key_list:
  18. position = key_name.split('.')[-1]
  19. log_.info("position = {}".format(position))
  20. # 获取数据
  21. position_videos = redis_helper.get_data_from_redis(key_name=key_name)
  22. if position_videos is None:
  23. log_.info('position {} videos is None!'.format(position))
  24. continue
  25. else:
  26. # 过滤
  27. position_video_ids = [int(video_id) for video_id in ast.literal_eval(position_videos)]
  28. filter_video_ids = filter_video_status(video_ids=position_video_ids)
  29. # 重新写入redis
  30. redis_helper.set_data_to_redis(key_name=key_name,
  31. value=str(filter_video_ids),
  32. expire_time=30 * 3600)
  33. log_.info('position {} videos filter end!'.format(position))
  34. log_.info("position videos filter end!")
  35. def filter_relevant_videos():
  36. """运营强插相关推荐视频过滤"""
  37. log_.info("relevant videos with op filter filter start...")
  38. # 读取需要过滤的头部视频id
  39. redis_helper = RedisHelper()
  40. head_videos = redis_helper.get_data_from_set(key_name=config_.RELEVANT_TOP_VIDEOS_KEY_NAME)
  41. if head_videos is None or len(head_videos) == 0:
  42. log_.info("relevant videos with op filter end! head_videos = {}".format(head_videos))
  43. return
  44. # 过滤
  45. remove_head_vids = []
  46. for head_vid in head_videos:
  47. key_name = '{}{}'.format(config_.RELEVANT_VIDEOS_WITH_OP_KEY_NAME, head_vid)
  48. # 头部视频 对应的key不存在时,将head_vid移除对应redis
  49. if not redis_helper.key_exists(key_name=key_name):
  50. remove_head_vids.append(head_vid)
  51. log_.info('head_vid = {} relevant redis key not exist!'.format(head_vid))
  52. continue
  53. # 获取头部视频对应的相关视频
  54. relevant_videos = redis_helper.get_data_from_redis(key_name=key_name)
  55. # 该视频没有指定的相关性视频,将head_vid移除对应redis
  56. if relevant_videos is None:
  57. remove_head_vids.append(head_vid)
  58. log_.info('head_vid = {} not have relevant videos!'.format(head_vid))
  59. continue
  60. # 过滤
  61. relevant_videos = json.loads(relevant_videos)
  62. relevant_video_ids = [int(item['recommend_vid']) for item in relevant_videos]
  63. filtered_videos = filter_video_status(video_ids=relevant_video_ids)
  64. # 保留可推荐 且生效中 的视频
  65. relevant_videos_new = [
  66. item for item in relevant_videos
  67. if int(item['recommend_vid']) in filtered_videos and int(item['finish_time']) > int(time.time())
  68. ]
  69. # 过滤后没有符合的视频,将head_vid移除对应redis,删除对应的相关推荐的key
  70. if len(relevant_videos_new) == 0:
  71. remove_head_vids.append(head_vid)
  72. redis_helper.del_keys(key_name=key_name)
  73. log_.info('head_vid = {} filtered finished! new relevant videos count = {}'.format(
  74. head_vid, len(relevant_videos_new)))
  75. continue
  76. # 重新写入redis
  77. # 以最晚结束的视频的结束时间 - 当前时间 + 5s 作为key的过期时间
  78. finish_time_list = [item['finish_time'] for item in relevant_videos_new]
  79. expire_time = max(finish_time_list) - int(time.time()) + 5
  80. if expire_time <= 0:
  81. log_.info('head_vid = {} expire_time <= 0!'.format(head_vid))
  82. continue
  83. # 存入redis
  84. redis_helper.set_data_to_redis(key_name=key_name,
  85. value=json.dumps(relevant_videos_new),
  86. expire_time=expire_time)
  87. log_.info('head_vid = {} filtered finished! new relevant videos count = {}'.format(
  88. head_vid, len(relevant_videos_new)))
  89. # 将需要移除的头部视频id进行移除
  90. if len(remove_head_vids) == 0:
  91. log_.info('head videos remove finished! remove_head_vids = {}'.format(remove_head_vids))
  92. log_.info("relevant videos with op filter end!")
  93. return
  94. redis_helper.remove_value_from_set(key_name=config_.RELEVANT_TOP_VIDEOS_KEY_NAME, values=tuple(remove_head_vids))
  95. log_.info('head videos remove finished! remove_head_vids = {}'.format(remove_head_vids))
  96. log_.info("relevant videos with op filter end!")
  97. def filter_rov_pool(app_type=None):
  98. """ROV召回池视频过滤"""
  99. log_.info("rov recall pool filter start ...")
  100. # 拼接redis-key
  101. if app_type is None:
  102. key_name, _ = get_pool_redis_key(pool_type='rov')
  103. else:
  104. log_.info("appType = {}".format(app_type))
  105. key_name, _ = get_pool_redis_key(pool_type='rov', app_type=app_type)
  106. # 获取视频
  107. redis_helper = RedisHelper()
  108. data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
  109. if data is None:
  110. log_.info("data is None")
  111. log_.info("rov recall pool filter end!")
  112. return
  113. # 过滤
  114. video_ids = [int(video_id) for video_id in data]
  115. filtered_result = filter_video_status(video_ids=video_ids)
  116. # 求差集,获取需要过滤掉的视频,并从redis中移除
  117. filter_videos = set(video_ids) - set(filtered_result)
  118. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  119. len(filtered_result),
  120. len(filter_videos)))
  121. if len(filter_videos) == 0:
  122. log_.info("rov recall pool filter end!")
  123. return
  124. redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
  125. log_.info("rov recall pool filter end!")
  126. def filter_flow_pool():
  127. """流量池视频过滤"""
  128. log_.info("flow pool filter start ...")
  129. for _, app_type in config_.APP_TYPE.items():
  130. log_.info('app_type {} videos filter start...'.format(app_type))
  131. # 拼接redis-key
  132. key_name = get_pool_redis_key(pool_type='flow', app_type=app_type)
  133. # 获取视频
  134. redis_helper = RedisHelper()
  135. data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
  136. if data is None:
  137. log_.info("data is None")
  138. log_.info("app_type {} videos filter end!".format(app_type))
  139. continue
  140. # videoId与flowPool做mapping
  141. video_ids = []
  142. mapping = {}
  143. for video in data:
  144. video_id, flow_pool = video.split('-')
  145. video_id = int(video_id)
  146. if video_id not in video_ids:
  147. video_ids.append(video_id)
  148. mapping[video_id] = [flow_pool]
  149. else:
  150. mapping[video_id].append(flow_pool)
  151. # 过滤
  152. if len(video_ids) == 0:
  153. log_.info("data size = {}, video_ids size = {}, data = {}".format(len(data), len(video_ids), data))
  154. log_.info("app_type {} videos filter end!".format(app_type))
  155. continue
  156. filtered_result = filter_video_status(video_ids=video_ids)
  157. # 求差集,获取需要过滤掉的视频,并从redis中移除
  158. filter_videos = set(video_ids) - set(filtered_result)
  159. log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
  160. len(data), len(video_ids), len(filtered_result), len(filter_videos)))
  161. # 移除
  162. if len(filter_videos) == 0:
  163. log_.info("app_type {} videos filter end!".format(app_type))
  164. continue
  165. remove_videos = ['{}-{}'.format(video_id, flow_pool)
  166. for video_id in filter_videos
  167. for flow_pool in mapping[video_id]]
  168. redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos)
  169. log_.info("app_type {} videos filter end!".format(app_type))
  170. log_.info("flow pool filter end!")
  171. def filter_bottom():
  172. """兜底视频过滤"""
  173. log_.info("bottom videos filter start ...")
  174. # 获取视频
  175. redis_helper = RedisHelper()
  176. data = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
  177. if data is None:
  178. log_.info("data is None")
  179. log_.info("bottom videos filter end!")
  180. return
  181. # 过滤
  182. video_ids = [int(video_id) for video_id in data]
  183. filtered_result = filter_video_status(video_ids=video_ids)
  184. # 求差集,获取需要过滤掉的视频,并从redis中移除
  185. filter_videos = set(video_ids) - set(filtered_result)
  186. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  187. len(filtered_result),
  188. len(filter_videos)))
  189. if len(filter_videos) == 0:
  190. log_.info("bottom videos filter end!")
  191. return
  192. redis_helper.remove_value_from_zset(key_name=config_.BOTTOM_KEY_NAME, value=list(filter_videos))
  193. log_.info("bottom videos filter end!")
  194. def filter_rov_updated():
  195. """修改过ROV的视频过滤"""
  196. log_.info("update rov videos filter start ...")
  197. # 获取视频
  198. redis_helper = RedisHelper()
  199. data = redis_helper.get_data_zset_with_index(key_name=config_.UPDATE_ROV_KEY_NAME, start=0, end=-1)
  200. if data is None:
  201. log_.info("data is None")
  202. log_.info("update rov videos filter end!")
  203. return
  204. # 过滤
  205. video_ids = [int(video_id) for video_id in data]
  206. filtered_result = filter_video_status(video_ids=video_ids)
  207. # 求差集,获取需要过滤掉的视频,并从redis中移除
  208. filter_videos = set(video_ids) - set(filtered_result)
  209. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  210. len(filtered_result),
  211. len(filter_videos)))
  212. if len(filter_videos) == 0:
  213. log_.info("update rov videos filter end!")
  214. return
  215. redis_helper.remove_value_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME, value=list(filter_videos))
  216. log_.info("update rov videos filter end!")
  217. def get_pool_redis_key(pool_type, app_type=None):
  218. """
  219. 拼接key
  220. :param pool_type: type-string {'rov': rov召回池, 'flow': 流量池}
  221. :param app_type: 产品标识
  222. :return: key_name
  223. """
  224. redis_helper = RedisHelper()
  225. if pool_type == 'rov':
  226. # appType = 6
  227. if app_type == config_.APP_TYPE['SHORT_VIDEO']:
  228. # 获取当前所在小时
  229. redis_date = datetime.now().hour
  230. # 判断热度列表是否更新,未更新则使用前一小时的热度列表
  231. key_name = '{}{}.{}'.format(config_.RECALL_KEY_NAME_PREFIX_APP_TYPE, app_type, redis_date)
  232. if redis_helper.key_exists(key_name):
  233. return key_name, redis_date
  234. else:
  235. if redis_date == 0:
  236. redis_date = 23
  237. else:
  238. redis_date = redis_date - 1
  239. key_name = '{}{}.{}'.format(config_.RECALL_KEY_NAME_PREFIX_APP_TYPE, app_type, redis_date)
  240. return key_name, redis_date
  241. # 其他
  242. else:
  243. # 判断热度列表是否更新,未更新则使用前一天的热度列表
  244. key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
  245. if redis_helper.key_exists(key_name):
  246. redis_date = date.today().strftime('%Y%m%d')
  247. else:
  248. redis_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
  249. key_name = config_.RECALL_KEY_NAME_PREFIX + redis_date
  250. return key_name, redis_date
  251. elif pool_type == 'flow':
  252. # 流量池
  253. return config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
  254. else:
  255. log_.error('pool type error')
  256. return None, None
  257. def main():
  258. try:
  259. # ROV召回池视频过滤
  260. filter_rov_pool()
  261. # appType = 6,ROV召回池视频过滤
  262. filter_rov_pool(app_type=config_.APP_TYPE['SHORT_VIDEO'])
  263. # 流量池视频过滤
  264. filter_flow_pool()
  265. # 兜底视频过滤
  266. filter_bottom()
  267. # 修改过ROV的视频过滤
  268. filter_rov_updated()
  269. # 运营强插相关推荐视频过滤
  270. filter_relevant_videos()
  271. # 按位置排序视频过滤
  272. filter_position_videos()
  273. except Exception as e:
  274. log_.error(traceback.format_exc())
  275. send_msg_to_feishu('{} - 过滤失败 \n {}'.format(config_.ENV_TEXT, traceback.format_exc()))
  276. return
# Script entry point: run all filters when this file is executed directly.
if __name__ == '__main__':
    main()