videos_filter.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. import time
  2. import json
  3. import traceback
  4. import ast
  5. from datetime import date, timedelta, datetime
  6. from utils import filter_video_status, send_msg_to_feishu, filter_video_status_app
  7. from db_helper import RedisHelper
  8. from config import set_config
  9. from log import Log
# Module-wide setup: load the environment-specific configuration and create the
# shared logger. config_ supplies the redis key names, app types and Feishu robot
# settings used by the filters below; env is not used anywhere in the code visible here.
config_, env = set_config()
log_ = Log()
  12. def filter_position_videos():
  13. """按位置排序视频过滤"""
  14. log_.info("position videos filter start...")
  15. position_key_list = [config_.RECALL_POSITION1_KEY_NAME, config_.RECALL_POSITION2_KEY_NAME]
  16. redis_helper = RedisHelper()
  17. for key_name in position_key_list:
  18. position = key_name.split('.')[-1]
  19. log_.info("position = {}".format(position))
  20. # 获取数据
  21. position_videos = redis_helper.get_data_from_redis(key_name=key_name)
  22. if position_videos is None:
  23. log_.info('position {} videos is None!'.format(position))
  24. continue
  25. else:
  26. # 过滤
  27. position_video_ids = [int(video_id) for video_id in ast.literal_eval(position_videos)]
  28. filter_video_ids = filter_video_status(video_ids=position_video_ids)
  29. # 重新写入redis
  30. redis_helper.set_data_to_redis(key_name=key_name,
  31. value=str(filter_video_ids),
  32. expire_time=30 * 3600)
  33. log_.info('position {} videos filter end!'.format(position))
  34. log_.info("position videos filter end!")
  35. def filter_relevant_videos():
  36. """运营强插相关推荐视频过滤"""
  37. log_.info("relevant videos with op filter filter start...")
  38. # 读取需要过滤的头部视频id
  39. redis_helper = RedisHelper()
  40. head_videos = redis_helper.get_data_from_set(key_name=config_.RELEVANT_TOP_VIDEOS_KEY_NAME)
  41. if head_videos is None or len(head_videos) == 0:
  42. log_.info("relevant videos with op filter end! head_videos = {}".format(head_videos))
  43. return
  44. # 过滤
  45. remove_head_vids = []
  46. for head_vid in head_videos:
  47. key_name = '{}{}'.format(config_.RELEVANT_VIDEOS_WITH_OP_KEY_NAME, head_vid)
  48. # 头部视频 对应的key不存在时,将head_vid移除对应redis
  49. if not redis_helper.key_exists(key_name=key_name):
  50. remove_head_vids.append(head_vid)
  51. log_.info('head_vid = {} relevant redis key not exist!'.format(head_vid))
  52. continue
  53. # 获取头部视频对应的相关视频
  54. relevant_videos = redis_helper.get_data_from_redis(key_name=key_name)
  55. # 该视频没有指定的相关性视频,将head_vid移除对应redis
  56. if relevant_videos is None:
  57. remove_head_vids.append(head_vid)
  58. log_.info('head_vid = {} not have relevant videos!'.format(head_vid))
  59. continue
  60. # 过滤
  61. relevant_videos = json.loads(relevant_videos)
  62. relevant_video_ids = [int(item['recommend_vid']) for item in relevant_videos]
  63. filtered_videos = filter_video_status(video_ids=relevant_video_ids)
  64. # 保留可推荐 且生效中 的视频
  65. relevant_videos_new = [
  66. item for item in relevant_videos
  67. if int(item['recommend_vid']) in filtered_videos and int(item['finish_time']) > int(time.time())
  68. ]
  69. # 过滤后没有符合的视频,将head_vid移除对应redis,删除对应的相关推荐的key
  70. if len(relevant_videos_new) == 0:
  71. remove_head_vids.append(head_vid)
  72. redis_helper.del_keys(key_name=key_name)
  73. log_.info('head_vid = {} filtered finished! new relevant videos count = {}'.format(
  74. head_vid, len(relevant_videos_new)))
  75. continue
  76. # 重新写入redis
  77. # 以最晚结束的视频的结束时间 - 当前时间 + 5s 作为key的过期时间
  78. finish_time_list = [item['finish_time'] for item in relevant_videos_new]
  79. expire_time = max(finish_time_list) - int(time.time()) + 5
  80. if expire_time <= 0:
  81. log_.info('head_vid = {} expire_time <= 0!'.format(head_vid))
  82. continue
  83. # 存入redis
  84. redis_helper.set_data_to_redis(key_name=key_name,
  85. value=json.dumps(relevant_videos_new),
  86. expire_time=expire_time)
  87. log_.info('head_vid = {} filtered finished! new relevant videos count = {}'.format(
  88. head_vid, len(relevant_videos_new)))
  89. # 将需要移除的头部视频id进行移除
  90. if len(remove_head_vids) == 0:
  91. log_.info('head videos remove finished! remove_head_vids = {}'.format(remove_head_vids))
  92. log_.info("relevant videos with op filter end!")
  93. return
  94. redis_helper.remove_value_from_set(key_name=config_.RELEVANT_TOP_VIDEOS_KEY_NAME, values=tuple(remove_head_vids))
  95. log_.info('head videos remove finished! remove_head_vids = {}'.format(remove_head_vids))
  96. log_.info("relevant videos with op filter end!")
  97. def filter_rov_pool(app_type=None):
  98. """ROV召回池视频过滤"""
  99. log_.info("rov recall pool filter start ...")
  100. # 拼接redis-key
  101. if app_type is None:
  102. key_name, _ = get_pool_redis_key(pool_type='rov')
  103. else:
  104. log_.info("appType = {}".format(app_type))
  105. key_name, _ = get_pool_redis_key(pool_type='rov', app_type=app_type)
  106. # 获取视频
  107. redis_helper = RedisHelper()
  108. data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
  109. if data is None:
  110. log_.info("data is None")
  111. log_.info("rov recall pool filter end!")
  112. return
  113. # 过滤
  114. video_ids = [int(video_id) for video_id in data]
  115. if app_type == config_.APP_TYPE['APP']:
  116. filtered_result = filter_video_status_app(video_ids=video_ids)
  117. else:
  118. filtered_result = filter_video_status(video_ids=video_ids)
  119. # 求差集,获取需要过滤掉的视频,并从redis中移除
  120. filter_videos = set(video_ids) - set(filtered_result)
  121. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  122. len(filtered_result),
  123. len(filter_videos)))
  124. if len(filter_videos) == 0:
  125. log_.info("rov recall pool filter end!")
  126. return
  127. redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
  128. log_.info("rov recall pool filter end!")
  129. def filter_flow_pool():
  130. """流量池视频过滤"""
  131. log_.info("flow pool filter start ...")
  132. for _, app_type in config_.APP_TYPE.items():
  133. log_.info('app_type {} videos filter start...'.format(app_type))
  134. # 拼接redis-key
  135. key_name = get_pool_redis_key(pool_type='flow', app_type=app_type)
  136. # 获取视频
  137. redis_helper = RedisHelper()
  138. data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
  139. if data is None:
  140. log_.info("data is None")
  141. log_.info("app_type {} videos filter end!".format(app_type))
  142. continue
  143. # videoId与flowPool做mapping
  144. video_ids = []
  145. mapping = {}
  146. for video in data:
  147. video_id, flow_pool = video.split('-')
  148. video_id = int(video_id)
  149. if video_id not in video_ids:
  150. video_ids.append(video_id)
  151. mapping[video_id] = [flow_pool]
  152. else:
  153. mapping[video_id].append(flow_pool)
  154. # 过滤
  155. if len(video_ids) == 0:
  156. log_.info("data size = {}, video_ids size = {}, data = {}".format(len(data), len(video_ids), data))
  157. log_.info("app_type {} videos filter end!".format(app_type))
  158. continue
  159. if app_type == config_.APP_TYPE['APP']:
  160. filtered_result = filter_video_status_app(video_ids=video_ids)
  161. else:
  162. filtered_result = filter_video_status(video_ids=video_ids)
  163. # 求差集,获取需要过滤掉的视频,并从redis中移除
  164. filter_videos = set(video_ids) - set(filtered_result)
  165. log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
  166. len(data), len(video_ids), len(filtered_result), len(filter_videos)))
  167. # 移除
  168. if len(filter_videos) == 0:
  169. log_.info("app_type {} videos filter end!".format(app_type))
  170. continue
  171. remove_videos = ['{}-{}'.format(video_id, flow_pool)
  172. for video_id in filter_videos
  173. for flow_pool in mapping[video_id]]
  174. redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos)
  175. log_.info("app_type {} videos filter end!".format(app_type))
  176. log_.info("flow pool filter end!")
  177. def filter_bottom():
  178. """兜底视频过滤"""
  179. log_.info("bottom videos filter start ...")
  180. # 获取视频
  181. redis_helper = RedisHelper()
  182. data = redis_helper.get_data_zset_with_index(key_name=config_.BOTTOM_KEY_NAME, start=0, end=-1)
  183. if data is None:
  184. log_.info("data is None")
  185. log_.info("bottom videos filter end!")
  186. return
  187. # 过滤
  188. video_ids = [int(video_id) for video_id in data]
  189. filtered_result = filter_video_status(video_ids=video_ids)
  190. # 求差集,获取需要过滤掉的视频,并从redis中移除
  191. filter_videos = set(video_ids) - set(filtered_result)
  192. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  193. len(filtered_result),
  194. len(filter_videos)))
  195. if len(filter_videos) == 0:
  196. log_.info("bottom videos filter end!")
  197. return
  198. redis_helper.remove_value_from_zset(key_name=config_.BOTTOM_KEY_NAME, value=list(filter_videos))
  199. log_.info("bottom videos filter end!")
  200. def filter_rov_updated():
  201. """修改过ROV的视频过滤"""
  202. log_.info("update rov videos filter start ...")
  203. # 获取视频
  204. redis_helper = RedisHelper()
  205. data = redis_helper.get_data_zset_with_index(key_name=config_.UPDATE_ROV_KEY_NAME, start=0, end=-1)
  206. if data is None:
  207. log_.info("data is None")
  208. log_.info("update rov videos filter end!")
  209. return
  210. # 过滤
  211. video_ids = [int(video_id) for video_id in data]
  212. filtered_result = filter_video_status(video_ids=video_ids)
  213. # 求差集,获取需要过滤掉的视频,并从redis中移除
  214. filter_videos = set(video_ids) - set(filtered_result)
  215. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  216. len(filtered_result),
  217. len(filter_videos)))
  218. if len(filter_videos) == 0:
  219. log_.info("update rov videos filter end!")
  220. return
  221. redis_helper.remove_value_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME, value=list(filter_videos))
  222. log_.info("update rov videos filter end!")
  223. def filter_rov_updated_app():
  224. """修改过ROV的视频过滤-app推荐状态过滤"""
  225. log_.info("update rov videos app filter start ...")
  226. # 获取视频
  227. redis_helper = RedisHelper()
  228. data = redis_helper.get_data_zset_with_index(key_name=config_.UPDATE_ROV_KEY_NAME_APP, start=0, end=-1)
  229. if data is None:
  230. log_.info("data is None")
  231. log_.info("update rov videos app filter end!")
  232. return
  233. # 过滤
  234. video_ids = [int(video_id) for video_id in data]
  235. filtered_result = filter_video_status_app(video_ids=video_ids)
  236. # 求差集,获取需要过滤掉的视频,并从redis中移除
  237. filter_videos = set(video_ids) - set(filtered_result)
  238. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  239. len(filtered_result),
  240. len(filter_videos)))
  241. if len(filter_videos) == 0:
  242. log_.info("update rov videos app filter end!")
  243. return
  244. redis_helper.remove_value_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME_APP, value=list(filter_videos))
  245. log_.info("update rov videos app filter end!")
  246. def get_pool_redis_key(pool_type, app_type=None):
  247. """
  248. 拼接key
  249. :param pool_type: type-string {'rov': rov召回池, 'flow': 流量池}
  250. :param app_type: 产品标识
  251. :return: key_name
  252. """
  253. redis_helper = RedisHelper()
  254. if pool_type == 'rov':
  255. # appType = 6
  256. if app_type == config_.APP_TYPE['SHORT_VIDEO']:
  257. # 获取当前所在小时
  258. redis_date = datetime.now().hour
  259. # 判断热度列表是否更新,未更新则使用前一小时的热度列表
  260. key_name = '{}{}.{}'.format(config_.RECALL_KEY_NAME_PREFIX_APP_TYPE, app_type, redis_date)
  261. if redis_helper.key_exists(key_name):
  262. return key_name, redis_date
  263. else:
  264. if redis_date == 0:
  265. redis_date = 23
  266. else:
  267. redis_date = redis_date - 1
  268. key_name = '{}{}.{}'.format(config_.RECALL_KEY_NAME_PREFIX_APP_TYPE, app_type, redis_date)
  269. return key_name, redis_date
  270. else:
  271. # appType = 13 票圈视频app
  272. if app_type == config_.APP_TYPE['APP']:
  273. key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_APP
  274. # appType: [18, 19]
  275. elif app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
  276. key_name_prefix = f'{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}.'
  277. # 其他
  278. else:
  279. key_name_prefix = config_.RECALL_KEY_NAME_PREFIX
  280. # 判断热度列表是否更新,未更新则使用前一天的热度列表
  281. key_name = key_name_prefix + time.strftime('%Y%m%d')
  282. if redis_helper.key_exists(key_name):
  283. redis_date = date.today().strftime('%Y%m%d')
  284. else:
  285. redis_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
  286. key_name = key_name_prefix + redis_date
  287. return key_name, redis_date
  288. elif pool_type == 'flow':
  289. # 流量池
  290. return config_.FLOWPOOL_KEY_NAME_PREFIX + str(app_type)
  291. else:
  292. log_.error('pool type error')
  293. return None, None
  294. def filter_app_pool():
  295. """过滤票圈视频APP小时级数据"""
  296. log_.info("app pool filter start ...")
  297. redis_helper = RedisHelper()
  298. # 获取当前日期
  299. now_date = date.today().strftime('%Y%m%d')
  300. # 获取当前所在小时
  301. now_h = datetime.now().hour
  302. log_.info(f'now_date = {now_date}, now_h = {now_h}.')
  303. if now_h < 7:
  304. redis_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
  305. redis_h = 21
  306. elif now_h > 21:
  307. redis_date = now_date
  308. redis_h = 21
  309. else:
  310. if now_h % 2 == 0:
  311. redis_date = now_date
  312. redis_h = now_h - 1
  313. else:
  314. redis_date = now_date
  315. redis_h = now_h
  316. log_.info(f'redis_date = {redis_date}, redis_h = {redis_h}.')
  317. # 拼接key
  318. key_name = f'{config_.APP_FINAL_RECALL_KEY_NAME_PREFIX}{redis_date}.{redis_h}'
  319. # 获取视频
  320. data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
  321. if data is None:
  322. log_.info("data is None")
  323. log_.info("app pool filter end!")
  324. return
  325. # 过滤
  326. video_ids = [int(video_id) for video_id in data]
  327. filtered_result = filter_video_status_app(video_ids=video_ids)
  328. # 求差集,获取需要过滤掉的视频,并从redis中移除
  329. filter_videos = set(video_ids) - set(filtered_result)
  330. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  331. len(filtered_result),
  332. len(filter_videos)))
  333. if len(filter_videos) == 0:
  334. log_.info("app pool filter end!")
  335. return
  336. redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
  337. log_.info("app pool filter end!")
  338. def filter_rov_h():
  339. """过滤小程序小时级数据"""
  340. return_count_list = [20, 10]
  341. log_.info("rov_h pool filter start ...")
  342. redis_helper = RedisHelper()
  343. # 获取当前日期
  344. now_date = date.today().strftime('%Y%m%d')
  345. # 获取当前所在小时
  346. now_h = datetime.now().hour
  347. log_.info(f'now_date = {now_date}, now_h = {now_h}.')
  348. for cnt in return_count_list:
  349. log_.info(f"return_count = {cnt}")
  350. # 需过滤两个视频列表
  351. key_prefix_list = [config_.RECALL_KEY_NAME_PREFIX_BY_H, config_.RECALL_KEY_NAME_PREFIX_DUP_H]
  352. for i, key_prefix in enumerate(key_prefix_list):
  353. # 拼接key
  354. key_name = f"{key_prefix}{cnt}.{now_date}.{now_h}"
  355. log_.info(f"key_name: {key_name}")
  356. # 获取视频
  357. data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
  358. if data is None:
  359. log_.info("data is None")
  360. log_.info("filter end!")
  361. continue
  362. # 过滤
  363. video_ids = [int(video_id) for video_id in data]
  364. filtered_result = filter_video_status(video_ids=video_ids)
  365. # 求差集,获取需要过滤掉的视频,并从redis中移除
  366. filter_videos = set(video_ids) - set(filtered_result)
  367. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  368. len(filtered_result),
  369. len(filter_videos)))
  370. if len(filter_videos) == 0:
  371. log_.info("filter end!")
  372. continue
  373. redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
  374. if i == 0:
  375. # 将小时级的数据需要过滤的视频加入到线上过滤应用列表中
  376. redis_helper.add_data_with_set(key_name=f"{config_.H_VIDEO_FILER}{cnt}",
  377. values=filter_videos, expire_time=2*3600)
  378. log_.info("rov_h pool filter end!")
  379. def main():
  380. try:
  381. # ROV召回池视频过滤
  382. filter_rov_pool()
  383. # appType = 6,ROV召回池视频过滤
  384. filter_rov_pool(app_type=config_.APP_TYPE['SHORT_VIDEO'])
  385. # appType = 13,票圈视频APP视频过滤
  386. filter_rov_pool(app_type=config_.APP_TYPE['APP'])
  387. # appType = 18, ROV召回池视频过滤
  388. filter_rov_pool(app_type=config_.APP_TYPE['LAO_HAO_KAN_VIDEO'])
  389. # appType = 19, ROV召回池视频过滤
  390. filter_rov_pool(app_type=config_.APP_TYPE['ZUI_JING_QI'])
  391. # 流量池视频过滤
  392. filter_flow_pool()
  393. # 兜底视频过滤
  394. filter_bottom()
  395. # 修改过ROV的视频过滤
  396. filter_rov_updated()
  397. filter_rov_updated_app()
  398. # 运营强插相关推荐视频过滤
  399. filter_relevant_videos()
  400. # 按位置排序视频过滤
  401. filter_position_videos()
  402. # 过滤票圈视频APP小时级数据
  403. filter_app_pool()
  404. # 过滤小程序小时级数据
  405. filter_rov_h()
  406. except Exception as e:
  407. log_.error(traceback.format_exc())
  408. send_msg_to_feishu(
  409. webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
  410. key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
  411. msg_text='{} - 过滤失败 \n {}'.format(config_.ENV_TEXT, traceback.format_exc())
  412. )
  413. return
  414. if __name__ == '__main__':
  415. main()