videos_filter.py 56 KB


  1. import multiprocessing
  2. import gevent
  3. import os
  4. import time
  5. import json
  6. import traceback
  7. import ast
  8. import pandas as pd
  9. from datetime import date, timedelta, datetime
  10. from region_rule_rank_h import region_code
  11. from utils import filter_video_status, send_msg_to_feishu, filter_video_status_app
  12. from db_helper import RedisHelper, MysqlHelper
  13. from config import set_config
  14. from log import Log
# Module-level objects shared by the filter functions below.
# set_config() yields a 2-tuple; presumably (config object, environment name) — confirm.
config_, env = set_config()
# Shared logger used by every filter function in this module.
log_ = Log()
# Module-wide Redis helper. Functions that do not create their own helper
# (e.g. filter_flow_pool_new_with_level) use this instance; most other
# functions construct a local RedisHelper() instead.
redis_helper = RedisHelper()
  18. def filter_position_videos():
  19. """按位置排序视频过滤"""
  20. log_.info("position videos filter start...")
  21. position_key_list = [config_.RECALL_POSITION1_KEY_NAME, config_.RECALL_POSITION2_KEY_NAME]
  22. redis_helper = RedisHelper()
  23. for key_name in position_key_list:
  24. position = key_name.split('.')[-1]
  25. log_.info("position = {}".format(position))
  26. # 获取数据
  27. position_videos = redis_helper.get_data_from_redis(key_name=key_name)
  28. if position_videos is None:
  29. log_.info('position {} videos is None!'.format(position))
  30. continue
  31. else:
  32. # 过滤
  33. position_video_ids = [int(video_id) for video_id in ast.literal_eval(position_videos)]
  34. filter_video_ids = filter_video_status(video_ids=position_video_ids)
  35. # 重新写入redis
  36. redis_helper.set_data_to_redis(key_name=key_name,
  37. value=str(filter_video_ids),
  38. expire_time=30 * 3600)
  39. log_.info('position {} videos filter end!'.format(position))
  40. log_.info("position videos filter end!")
  41. def filter_relevant_videos():
  42. """运营强插相关推荐视频过滤"""
  43. log_.info("relevant videos with op filter filter start...")
  44. # 读取需要过滤的头部视频id
  45. redis_helper = RedisHelper()
  46. head_videos = redis_helper.get_data_from_set(key_name=config_.RELEVANT_TOP_VIDEOS_KEY_NAME)
  47. if head_videos is None or len(head_videos) == 0:
  48. log_.info("relevant videos with op filter end! head_videos = {}".format(head_videos))
  49. return
  50. # 过滤
  51. remove_head_vids = []
  52. for head_vid in head_videos:
  53. key_name = '{}{}'.format(config_.RELEVANT_VIDEOS_WITH_OP_KEY_NAME, head_vid)
  54. # 头部视频 对应的key不存在时,将head_vid移除对应redis
  55. if not redis_helper.key_exists(key_name=key_name):
  56. remove_head_vids.append(head_vid)
  57. log_.info('head_vid = {} relevant redis key not exist!'.format(head_vid))
  58. continue
  59. # 获取头部视频对应的相关视频
  60. relevant_videos = redis_helper.get_data_from_redis(key_name=key_name)
  61. # 该视频没有指定的相关性视频,将head_vid移除对应redis
  62. if relevant_videos is None:
  63. remove_head_vids.append(head_vid)
  64. log_.info('head_vid = {} not have relevant videos!'.format(head_vid))
  65. continue
  66. # 过滤
  67. relevant_videos = json.loads(relevant_videos)
  68. relevant_video_ids = [int(item['recommend_vid']) for item in relevant_videos]
  69. filtered_videos = filter_video_status(video_ids=relevant_video_ids)
  70. # 保留可推荐 且生效中 的视频
  71. relevant_videos_new = [
  72. item for item in relevant_videos
  73. if int(item['recommend_vid']) in filtered_videos and int(item['finish_time']) > int(time.time())
  74. ]
  75. # 过滤后没有符合的视频,将head_vid移除对应redis,删除对应的相关推荐的key
  76. if len(relevant_videos_new) == 0:
  77. remove_head_vids.append(head_vid)
  78. redis_helper.del_keys(key_name=key_name)
  79. log_.info('head_vid = {} filtered finished! new relevant videos count = {}'.format(
  80. head_vid, len(relevant_videos_new)))
  81. continue
  82. # 重新写入redis
  83. # 以最晚结束的视频的结束时间 - 当前时间 + 5s 作为key的过期时间
  84. finish_time_list = [item['finish_time'] for item in relevant_videos_new]
  85. expire_time = max(finish_time_list) - int(time.time()) + 5
  86. if expire_time <= 0:
  87. log_.info('head_vid = {} expire_time <= 0!'.format(head_vid))
  88. continue
  89. # 存入redis
  90. redis_helper.set_data_to_redis(key_name=key_name,
  91. value=json.dumps(relevant_videos_new),
  92. expire_time=expire_time)
  93. log_.info('head_vid = {} filtered finished! new relevant videos count = {}'.format(
  94. head_vid, len(relevant_videos_new)))
  95. # 将需要移除的头部视频id进行移除
  96. if len(remove_head_vids) == 0:
  97. log_.info('head videos remove finished! remove_head_vids = {}'.format(remove_head_vids))
  98. log_.info("relevant videos with op filter end!")
  99. return
  100. redis_helper.remove_value_from_set(key_name=config_.RELEVANT_TOP_VIDEOS_KEY_NAME, values=tuple(remove_head_vids))
  101. log_.info('head videos remove finished! remove_head_vids = {}'.format(remove_head_vids))
  102. log_.info("relevant videos with op filter end!")
  103. def filter_rov_pool(app_type=None):
  104. """ROV召回池视频过滤"""
  105. log_.info("rov recall pool filter start ...")
  106. # 拼接redis-key
  107. if app_type is None:
  108. key_name, _ = get_pool_redis_key(pool_type='rov')
  109. else:
  110. log_.info("appType = {}".format(app_type))
  111. key_name, _ = get_pool_redis_key(pool_type='rov', app_type=app_type)
  112. # 获取视频
  113. redis_helper = RedisHelper()
  114. data = redis_helper.get_all_data_from_zset(key_name=key_name)
  115. if data is None:
  116. log_.info("data is None")
  117. log_.info("rov recall pool filter end!")
  118. return
  119. # 过滤
  120. video_ids = [int(video_id) for video_id in data]
  121. if app_type == config_.APP_TYPE['APP']:
  122. filtered_result = filter_video_status_app(video_ids=video_ids)
  123. else:
  124. filtered_result = filter_video_status(video_ids=video_ids)
  125. # 求差集,获取需要过滤掉的视频,并从redis中移除
  126. filter_videos = set(video_ids) - set(filtered_result)
  127. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  128. len(filtered_result),
  129. len(filter_videos)))
  130. log_.info({'key_name': key_name, 'filter_videos': filter_videos})
  131. if len(filter_videos) == 0:
  132. log_.info("rov recall pool filter end!")
  133. return
  134. redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
  135. log_.info("rov recall pool filter end!")
  136. def filter_flow_pool():
  137. """流量池视频过滤"""
  138. log_.info("flow pool filter start ...")
  139. app_type_list = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]
  140. for _, app_type in config_.APP_TYPE.items():
  141. log_.info('app_type {} videos filter start...'.format(app_type))
  142. if app_type in app_type_list:
  143. filter_flow_pool_18_19(app_type=app_type)
  144. else:
  145. for flow_pool_id in [None, config_.QUICK_FLOW_POOL_ID]:
  146. log_.info(f"flow_pool_id = {flow_pool_id}")
  147. # 拼接redis-key
  148. key_name = get_pool_redis_key(pool_type='flow', app_type=app_type, flow_pool_id=flow_pool_id)
  149. # 获取视频
  150. redis_helper = RedisHelper()
  151. data = redis_helper.get_all_data_from_zset(key_name=key_name)
  152. if data is None:
  153. log_.info(f"flow_pool_id = {flow_pool_id}, data is None")
  154. log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!")
  155. continue
  156. # videoId与flowPool做mapping
  157. video_ids = []
  158. mapping = {}
  159. for video in data:
  160. video_id, flow_pool = video.split('-')
  161. video_id = int(video_id)
  162. if video_id not in video_ids:
  163. video_ids.append(video_id)
  164. mapping[video_id] = [flow_pool]
  165. else:
  166. mapping[video_id].append(flow_pool)
  167. # 过滤
  168. if len(video_ids) == 0:
  169. log_.info(f"data size = {len(data)}, video_ids size = {len(video_ids)}, data = {data}")
  170. log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!")
  171. continue
  172. if app_type == config_.APP_TYPE['APP']:
  173. filtered_result = filter_video_status_app(video_ids=video_ids)
  174. else:
  175. filtered_result = filter_video_status(video_ids=video_ids)
  176. # 求差集,获取需要过滤掉的视频,并从redis中移除
  177. filter_videos = set(video_ids) - set(filtered_result)
  178. log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
  179. len(data), len(video_ids), len(filtered_result), len(filter_videos)))
  180. # 移除
  181. if len(filter_videos) == 0:
  182. log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!")
  183. continue
  184. remove_videos = ['{}-{}'.format(video_id, flow_pool)
  185. for video_id in filter_videos
  186. for flow_pool in mapping[video_id]]
  187. redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos)
  188. log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!")
  189. log_.info(f"app_type = {app_type} videos filter end!")
  190. log_.info("flow pool filter end!")
  191. def filter_flow_pool_new():
  192. """流量池视频过滤"""
  193. log_.info("flow pool filter start ...")
  194. app_type_list = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]
  195. for _, app_type in config_.APP_TYPE.items():
  196. log_.info('app_type {} videos filter start...'.format(app_type))
  197. if app_type in app_type_list:
  198. filter_flow_pool_18_19(app_type=app_type)
  199. else:
  200. for flow_pool_id in [None, config_.QUICK_FLOW_POOL_ID]:
  201. log_.info(f"flow_pool_id = {flow_pool_id}")
  202. # 拼接redis-key
  203. key_name = get_pool_redis_key(pool_type='flow_set', app_type=app_type, flow_pool_id=flow_pool_id)
  204. # 获取视频
  205. redis_helper = RedisHelper()
  206. data = redis_helper.get_data_from_set(key_name=key_name)
  207. if data is None:
  208. log_.info(f"flow_pool_id = {flow_pool_id}, data is None")
  209. log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!")
  210. continue
  211. # videoId与flowPool做mapping
  212. video_ids = []
  213. mapping = {}
  214. for video in data:
  215. video_id, flow_pool = video.split('-')
  216. video_id = int(video_id)
  217. if video_id not in video_ids:
  218. video_ids.append(video_id)
  219. mapping[video_id] = [flow_pool]
  220. else:
  221. mapping[video_id].append(flow_pool)
  222. # 过滤
  223. if len(video_ids) == 0:
  224. log_.info(f"data size = {len(data)}, video_ids size = {len(video_ids)}, data = {data}")
  225. log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!")
  226. continue
  227. if app_type == config_.APP_TYPE['APP']:
  228. filtered_result = filter_video_status_app(video_ids=video_ids)
  229. else:
  230. filtered_result = filter_video_status(video_ids=video_ids)
  231. # 求差集,获取需要过滤掉的视频,并从redis中移除
  232. filter_videos = set(video_ids) - set(filtered_result)
  233. log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
  234. len(data), len(video_ids), len(filtered_result), len(filter_videos)))
  235. # 移除
  236. if len(filter_videos) == 0:
  237. log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!")
  238. continue
  239. remove_videos = ['{}-{}'.format(video_id, flow_pool)
  240. for video_id in filter_videos
  241. for flow_pool in mapping[video_id]]
  242. redis_helper.remove_value_from_set(key_name=key_name, values=tuple(remove_videos))
  243. log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!")
  244. log_.info(f"app_type = {app_type} videos filter end!")
  245. log_.info("flow pool filter end!")
  246. def filter_flow_pool_process(app_type, flow_pool_id, key_name):
  247. # 获取视频
  248. redis_helper = RedisHelper()
  249. data = redis_helper.get_data_from_set(key_name=key_name)
  250. if data is None:
  251. log_.info(f"flow_pool_id = {flow_pool_id}, data is None")
  252. log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!")
  253. return
  254. # videoId与flowPool做mapping
  255. video_ids = []
  256. mapping = {}
  257. for video in data:
  258. video_id, flow_pool = video.split('-')
  259. video_id = int(video_id)
  260. if video_id not in video_ids:
  261. video_ids.append(video_id)
  262. mapping[video_id] = [flow_pool]
  263. else:
  264. mapping[video_id].append(flow_pool)
  265. # 过滤
  266. if len(video_ids) == 0:
  267. log_.info(f"data size = {len(data)}, video_ids size = {len(video_ids)}, data = {data}")
  268. log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!")
  269. return
  270. if app_type == config_.APP_TYPE['APP']:
  271. filtered_result = filter_video_status_app(video_ids=video_ids)
  272. else:
  273. filtered_result = filter_video_status(video_ids=video_ids)
  274. # 求差集,获取需要过滤掉的视频,并从redis中移除
  275. filter_videos = set(video_ids) - set(filtered_result)
  276. log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
  277. len(data), len(video_ids), len(filtered_result), len(filter_videos)))
  278. # 移除
  279. if len(filter_videos) == 0:
  280. log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!")
  281. return
  282. remove_videos = ['{}-{}'.format(video_id, flow_pool)
  283. for video_id in filter_videos
  284. for flow_pool in mapping[video_id]]
  285. redis_helper.remove_value_from_set(key_name=key_name, values=tuple(remove_videos))
  286. log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!")
  287. def filter_flow_pool_new_with_level():
  288. """流量池视频过滤"""
  289. log_.info("flow pool filter start ...")
  290. level_weight = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_LEVEL_WEIGHT_KEY_NAME)
  291. level_list = [level for level in json.loads(level_weight)]
  292. log_.info(f"level_list: {level_list}")
  293. app_type_list = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]
  294. for _, app_type in config_.APP_TYPE.items():
  295. log_.info('app_type {} videos filter start...'.format(app_type))
  296. if app_type in app_type_list:
  297. filter_flow_pool_18_19(app_type=app_type)
  298. else:
  299. for flow_pool_id in [None, config_.QUICK_FLOW_POOL_ID]:
  300. log_.info(f"flow_pool_id = {flow_pool_id}")
  301. # 拼接redis-key
  302. if flow_pool_id == config_.QUICK_FLOW_POOL_ID:
  303. key_name = get_pool_redis_key(pool_type='flow_set', app_type=app_type, flow_pool_id=flow_pool_id)
  304. filter_flow_pool_process(app_type, flow_pool_id, key_name)
  305. else:
  306. for level in level_list:
  307. key_name = get_pool_redis_key(pool_type='flow_set', app_type=app_type,
  308. flow_pool_id=flow_pool_id, level=level)
  309. filter_flow_pool_process(app_type, flow_pool_id, key_name)
  310. log_.info(f"app_type = {app_type}, level = {level} videos filter end!")
  311. log_.info(f"app_type = {app_type} videos filter end!")
  312. log_.info("flow pool filter end!")
  313. def filter_flow_pool_level_score_process(app_type, flow_pool_id, key_name):
  314. # 获取视频
  315. redis_helper = RedisHelper()
  316. data = redis_helper.get_all_data_from_zset(key_name=key_name)
  317. if data is None:
  318. log_.info(f"flow_pool_id = {flow_pool_id}, data is None")
  319. log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!")
  320. return
  321. # videoId与flowPool做mapping
  322. video_ids = []
  323. mapping = {}
  324. for video in data:
  325. video_id, flow_pool = video.split('-')
  326. video_id = int(video_id)
  327. if video_id not in video_ids:
  328. video_ids.append(video_id)
  329. mapping[video_id] = [flow_pool]
  330. else:
  331. mapping[video_id].append(flow_pool)
  332. # 过滤
  333. if len(video_ids) == 0:
  334. log_.info(f"data size = {len(data)}, video_ids size = {len(video_ids)}, data = {data}")
  335. log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id}, videos filter end!")
  336. return
  337. if app_type == config_.APP_TYPE['APP']:
  338. filtered_result = filter_video_status_app(video_ids=video_ids)
  339. else:
  340. filtered_result = filter_video_status(video_ids=video_ids)
  341. # 求差集,获取需要过滤掉的视频,并从redis中移除
  342. filter_videos = set(video_ids) - set(filtered_result)
  343. log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
  344. len(data), len(video_ids), len(filtered_result), len(filter_videos)))
  345. # 移除
  346. if len(filter_videos) == 0:
  347. log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!")
  348. return
  349. remove_videos = ['{}-{}'.format(video_id, flow_pool)
  350. for video_id in filter_videos
  351. for flow_pool in mapping[video_id]]
  352. redis_helper.remove_value_from_zset(key_name=key_name, value=remove_videos)
  353. log_.info(f"app_type = {app_type}, flow_pool_id = {flow_pool_id} videos filter end!")
  354. def filter_flow_pool_new_with_level_score():
  355. """流量池视频过滤"""
  356. log_.info("flow pool filter start ...")
  357. level_weight = redis_helper.get_data_from_redis(key_name=config_.FLOWPOOL_LEVEL_WEIGHT_KEY_NAME)
  358. level_list = [level for level in json.loads(level_weight)]
  359. log_.info(f"level_list: {level_list}")
  360. app_type_list = [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]
  361. for _, app_type in config_.APP_TYPE.items():
  362. log_.info('app_type {} videos filter start...'.format(app_type))
  363. if app_type in app_type_list:
  364. filter_flow_pool_18_19(app_type=app_type)
  365. else:
  366. for flow_pool_id in [None, config_.QUICK_FLOW_POOL_ID]:
  367. log_.info(f"flow_pool_id = {flow_pool_id}")
  368. # 拼接redis-key
  369. if flow_pool_id == config_.QUICK_FLOW_POOL_ID:
  370. key_name = get_pool_redis_key(pool_type='flow_level_score', app_type=app_type, flow_pool_id=flow_pool_id)
  371. filter_flow_pool_level_score_process(app_type, flow_pool_id, key_name)
  372. else:
  373. for level in level_list:
  374. key_name = get_pool_redis_key(pool_type='flow_level_score', app_type=app_type,
  375. flow_pool_id=flow_pool_id, level=level)
  376. filter_flow_pool_level_score_process(app_type, flow_pool_id, key_name)
  377. log_.info(f"app_type = {app_type}, level = {level} videos filter end!")
  378. log_.info(f"app_type = {app_type} videos filter end!")
  379. log_.info("flow pool filter end!")
  380. def filter_flow_pool_18_19(app_type):
  381. """流量池视频过滤"""
  382. log_.info('app_type {} videos filter start...'.format(app_type))
  383. # 拼接redis-key
  384. key_name = get_pool_redis_key(pool_type='flow', app_type=app_type)
  385. # 获取视频
  386. redis_helper = RedisHelper()
  387. data = redis_helper.get_all_data_from_zset(key_name=key_name)
  388. if data is None:
  389. log_.info("data is None")
  390. log_.info("app_type {} videos filter end!".format(app_type))
  391. return
  392. video_ids = [int(video_id) for video_id in data]
  393. # 过滤
  394. if len(video_ids) == 0:
  395. log_.info("data size = {}, video_ids size = {}, data = {}".format(len(data), len(video_ids), data))
  396. log_.info("app_type {} videos filter end!".format(app_type))
  397. return
  398. filtered_result = filter_video_status(video_ids=video_ids)
  399. # 求差集,获取需要过滤掉的视频,并从redis中移除
  400. filter_videos = set(video_ids) - set(filtered_result)
  401. log_.info("data size = {}, video_ids size = {}, filtered size = {}, filter sizer = {}".format(
  402. len(data), len(video_ids), len(filtered_result), len(filter_videos)))
  403. log_.info({'key_name': key_name, 'filter_videos': filter_videos})
  404. # 移除
  405. if len(filter_videos) == 0:
  406. log_.info("app_type {} videos filter end!".format(app_type))
  407. return
  408. redis_helper.remove_value_from_zset(key_name=key_name, value=filter_videos)
  409. log_.info("app_type {} videos filter end!".format(app_type))
  410. log_.info("flow pool filter end!")
  411. def filter_bottom():
  412. """兜底视频过滤"""
  413. log_.info("bottom videos filter start ...")
  414. # 获取视频
  415. redis_helper = RedisHelper()
  416. data = redis_helper.get_all_data_from_zset(key_name=config_.BOTTOM_KEY_NAME)
  417. if data is None:
  418. log_.info("data is None")
  419. log_.info("bottom videos filter end!")
  420. return
  421. # 过滤
  422. video_ids = [int(video_id) for video_id in data]
  423. filtered_result = filter_video_status(video_ids=video_ids)
  424. # 求差集,获取需要过滤掉的视频,并从redis中移除
  425. filter_videos = set(video_ids) - set(filtered_result)
  426. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  427. len(filtered_result),
  428. len(filter_videos)))
  429. if len(filter_videos) == 0:
  430. log_.info("bottom videos filter end!")
  431. return
  432. redis_helper.remove_value_from_zset(key_name=config_.BOTTOM_KEY_NAME, value=list(filter_videos))
  433. log_.info("bottom videos filter end!")
  434. def filter_rov_updated():
  435. """修改过ROV的视频过滤"""
  436. log_.info("update rov videos filter start ...")
  437. # 获取视频
  438. redis_helper = RedisHelper()
  439. data = redis_helper.get_all_data_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME)
  440. if data is None:
  441. log_.info("data is None")
  442. log_.info("update rov videos filter end!")
  443. return
  444. # 过滤
  445. video_ids = [int(video_id) for video_id in data]
  446. filtered_result = filter_video_status(video_ids=video_ids)
  447. # 求差集,获取需要过滤掉的视频,并从redis中移除
  448. filter_videos = set(video_ids) - set(filtered_result)
  449. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  450. len(filtered_result),
  451. len(filter_videos)))
  452. log_.info({'key_name': config_.UPDATE_ROV_KEY_NAME, 'filter_videos': filter_videos})
  453. if len(filter_videos) == 0:
  454. log_.info("update rov videos filter end!")
  455. return
  456. redis_helper.remove_value_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME, value=list(filter_videos))
  457. log_.info("update rov videos filter end!")
  458. def filter_rov_updated_app():
  459. """修改过ROV的视频过滤-app推荐状态过滤"""
  460. log_.info("update rov videos app filter start ...")
  461. # 获取视频
  462. redis_helper = RedisHelper()
  463. data = redis_helper.get_all_data_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME_APP)
  464. if data is None:
  465. log_.info("data is None")
  466. log_.info("update rov videos app filter end!")
  467. return
  468. # 过滤
  469. video_ids = [int(video_id) for video_id in data]
  470. filtered_result = filter_video_status_app(video_ids=video_ids)
  471. # 求差集,获取需要过滤掉的视频,并从redis中移除
  472. filter_videos = set(video_ids) - set(filtered_result)
  473. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  474. len(filtered_result),
  475. len(filter_videos)))
  476. if len(filter_videos) == 0:
  477. log_.info("update rov videos app filter end!")
  478. return
  479. redis_helper.remove_value_from_zset(key_name=config_.UPDATE_ROV_KEY_NAME_APP, value=list(filter_videos))
  480. log_.info("update rov videos app filter end!")
  481. def get_pool_redis_key(pool_type, app_type=None, flow_pool_id=None, level=None):
  482. """
  483. 拼接key
  484. :param pool_type: type-string {'rov': rov召回池, 'flow': 流量池}
  485. :param app_type: 产品标识
  486. :param flow_pool_id: 流量池ID
  487. :return: key_name
  488. """
  489. redis_helper = RedisHelper()
  490. if pool_type == 'rov':
  491. # appType = 6
  492. if app_type == config_.APP_TYPE['SHORT_VIDEO']:
  493. # 获取当前所在小时
  494. redis_date = datetime.now().hour
  495. # 判断热度列表是否更新,未更新则使用前一小时的热度列表
  496. key_name = f'{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}:{redis_date}'
  497. if redis_helper.key_exists(key_name):
  498. return key_name, redis_date
  499. else:
  500. if redis_date == 0:
  501. redis_date = 23
  502. else:
  503. redis_date = redis_date - 1
  504. key_name = f'{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}:{redis_date}'
  505. return key_name, redis_date
  506. # appType: [18, 19]
  507. elif app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
  508. key_name_prefix = f'{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}:'
  509. now_date = datetime.today().strftime('%Y%m%d')
  510. now_h = datetime.now().hour
  511. key_name = f"{key_name_prefix}{now_date}:{now_h}"
  512. if redis_helper.key_exists(key_name):
  513. return key_name, now_h
  514. else:
  515. if now_h == 0:
  516. redis_h = 23
  517. redis_date = (datetime.today() - timedelta(days=1)).strftime('%Y%m%d')
  518. else:
  519. redis_h = now_h - 1
  520. redis_date = now_date
  521. key_name = f"{key_name_prefix}{redis_date}:{redis_h}"
  522. return key_name, redis_h
  523. else:
  524. # appType = 13 票圈视频app
  525. if app_type == config_.APP_TYPE['APP']:
  526. key_name_prefix = config_.RECALL_KEY_NAME_PREFIX_APP
  527. # # appType: [18, 19]
  528. # elif app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
  529. # key_name_prefix = f'{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}.'
  530. # 其他
  531. else:
  532. key_name_prefix = config_.RECALL_KEY_NAME_PREFIX
  533. # 判断热度列表是否更新,未更新则使用前一天的热度列表
  534. key_name = f"{key_name_prefix}{time.strftime('%Y%m%d')}"
  535. if redis_helper.key_exists(key_name):
  536. redis_date = date.today().strftime('%Y%m%d')
  537. else:
  538. redis_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
  539. key_name = f"{key_name_prefix}{redis_date}"
  540. return key_name, redis_date
  541. elif pool_type == 'flow':
  542. # 流量池
  543. if flow_pool_id == config_.QUICK_FLOW_POOL_ID:
  544. return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type}:{flow_pool_id}"
  545. else:
  546. return f"{config_.FLOWPOOL_KEY_NAME_PREFIX}{app_type}"
  547. elif pool_type == 'flow_set':
  548. if flow_pool_id == config_.QUICK_FLOW_POOL_ID:
  549. return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX_SET}{app_type}:{flow_pool_id}"
  550. else:
  551. if level is None:
  552. return f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET}{app_type}"
  553. else:
  554. return f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL}{app_type}:{level}"
  555. elif pool_type == 'flow_level_score':
  556. if flow_pool_id == config_.QUICK_FLOW_POOL_ID:
  557. return f"{config_.QUICK_FLOWPOOL_KEY_NAME_PREFIX}{app_type}:{flow_pool_id}"
  558. else:
  559. return f"{config_.FLOWPOOL_KEY_NAME_PREFIX_SET_LEVEL_SCORE}{app_type}:{level}"
  560. else:
  561. log_.error('pool type error')
  562. return None, None
  563. def filter_app_pool():
  564. """过滤票圈视频APP小时级数据"""
  565. log_.info("app pool filter start ...")
  566. redis_helper = RedisHelper()
  567. # 获取当前日期
  568. now_date = date.today().strftime('%Y%m%d')
  569. # 获取当前所在小时
  570. now_h = datetime.now().hour
  571. log_.info(f'now_date = {now_date}, now_h = {now_h}.')
  572. if now_h < 7:
  573. redis_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
  574. redis_h = 21
  575. elif now_h > 21:
  576. redis_date = now_date
  577. redis_h = 21
  578. else:
  579. if now_h % 2 == 0:
  580. redis_date = now_date
  581. redis_h = now_h - 1
  582. else:
  583. redis_date = now_date
  584. redis_h = now_h
  585. log_.info(f'redis_date = {redis_date}, redis_h = {redis_h}.')
  586. # 拼接key
  587. key_name = f'{config_.APP_FINAL_RECALL_KEY_NAME_PREFIX}{redis_date}.{redis_h}'
  588. # 获取视频
  589. data = redis_helper.get_all_data_from_zset(key_name=key_name)
  590. if data is None:
  591. log_.info("data is None")
  592. log_.info("app pool filter end!")
  593. return
  594. # 过滤
  595. video_ids = [int(video_id) for video_id in data]
  596. filtered_result = filter_video_status_app(video_ids=video_ids)
  597. # 求差集,获取需要过滤掉的视频,并从redis中移除
  598. filter_videos = set(video_ids) - set(filtered_result)
  599. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  600. len(filtered_result),
  601. len(filter_videos)))
  602. log_.info({'key_name': key_name, 'filter_videos': filter_videos})
  603. if len(filter_videos) == 0:
  604. log_.info("app pool filter end!")
  605. return
  606. redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
  607. log_.info("app pool filter end!")
  608. def filter_rov_h():
  609. """过滤小程序小时级数据"""
  610. rule_params = config_.RULE_PARAMS
  611. log_.info("rov_h pool filter start ...")
  612. redis_helper = RedisHelper()
  613. # 获取当前日期
  614. now_date = date.today().strftime('%Y%m%d')
  615. # 获取当前所在小时
  616. now_h = datetime.now().hour
  617. log_.info(f'now_date = {now_date}, now_h = {now_h}.')
  618. for key, value in rule_params.items():
  619. log_.info(f"rule = {key}, param = {value}")
  620. # 需过滤两个视频列表
  621. key_prefix_list = [
  622. config_.RECALL_KEY_NAME_PREFIX_BY_H,
  623. config_.RECALL_KEY_NAME_PREFIX_DUP_24H_H,
  624. config_.RECALL_KEY_NAME_PREFIX_DUP_H
  625. ]
  626. for i, key_prefix in enumerate(key_prefix_list):
  627. # 拼接key
  628. key_name = f"{key_prefix}{key}.{now_date}.{now_h}"
  629. log_.info(f"key_name: {key_name}")
  630. # 获取视频
  631. data = redis_helper.get_all_data_from_zset(key_name=key_name)
  632. if data is None:
  633. log_.info("data is None")
  634. log_.info("filter end!")
  635. continue
  636. # 过滤
  637. video_ids = [int(video_id) for video_id in data]
  638. filtered_result = filter_video_status(video_ids=video_ids)
  639. # 求差集,获取需要过滤掉的视频,并从redis中移除
  640. filter_videos = set(video_ids) - set(filtered_result)
  641. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  642. len(filtered_result),
  643. len(filter_videos)))
  644. log_.info({'key_name': key_name, 'filter_videos': filter_videos})
  645. if len(filter_videos) == 0:
  646. log_.info("filter end!")
  647. continue
  648. redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
  649. if i == 0:
  650. # 将小时级的数据需要过滤的视频加入到线上过滤应用列表中
  651. redis_helper.add_data_with_set(key_name=f"{config_.H_VIDEO_FILER}{key}",
  652. values=filter_videos, expire_time=2*3600)
  653. log_.info("rov_h pool filter end!")
  654. def filter_rov_day():
  655. """过滤小程序天级数据"""
  656. rule_params = config_.RULE_PARAMS_DAY
  657. log_.info("rov_day pool filter start ...")
  658. redis_helper = RedisHelper()
  659. # 获取当前日期
  660. now_date = date.today().strftime('%Y%m%d')
  661. log_.info(f'now_date = {now_date}.')
  662. for key, value in rule_params.items():
  663. log_.info(f"rule = {key}, param = {value}")
  664. # 需过滤三个视频列表
  665. key_prefix_list = [
  666. config_.RECALL_KEY_NAME_PREFIX_BY_DAY,
  667. config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_PRE,
  668. config_.RECALL_KEY_NAME_PREFIX_DUP_DAY_NOW
  669. ]
  670. for i, key_prefix in enumerate(key_prefix_list):
  671. # 拼接key
  672. key_name = f"{key_prefix}{key}.{now_date}"
  673. log_.info(f"key_name: {key_name}")
  674. # 获取视频
  675. data = redis_helper.get_data_zset_with_index(key_name=key_name, start=0, end=-1)
  676. if data is None:
  677. log_.info("data is None")
  678. log_.info("filter end!")
  679. continue
  680. # 过滤
  681. video_ids = [int(video_id) for video_id in data]
  682. filtered_result = filter_video_status(video_ids=video_ids)
  683. # 求差集,获取需要过滤掉的视频,并从redis中移除
  684. filter_videos = set(video_ids) - set(filtered_result)
  685. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  686. len(filtered_result),
  687. len(filter_videos)))
  688. log_.info({'key_name': key_name, 'filter_videos': filter_videos})
  689. if len(filter_videos) == 0:
  690. log_.info("filter end!")
  691. continue
  692. redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
  693. log_.info("rov_day pool filter end!")
  694. def filter_old_videos():
  695. """过滤老视频数据"""
  696. log_.info("old videos filter start ...")
  697. redis_helper = RedisHelper()
  698. # 获取当前日期
  699. now_date = date.today().strftime('%Y%m%d')
  700. log_.info(f'now_date = {now_date}.')
  701. # 拼接key
  702. key_name = f'{config_.RECALL_KEY_NAME_PREFIX_OLD_VIDEOS}{now_date}'
  703. # 获取视频
  704. data = redis_helper.get_data_from_set(key_name=key_name)
  705. if data is None:
  706. log_.info("data is None")
  707. log_.info("old videos filter end!")
  708. return
  709. # 过滤
  710. video_ids = [int(video_id) for video_id in data]
  711. filtered_result = filter_video_status(video_ids=video_ids)
  712. # 求差集,获取需要过滤掉的视频,并从redis中移除
  713. filter_videos = set(video_ids) - set(filtered_result)
  714. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  715. len(filtered_result),
  716. len(filter_videos)))
  717. if len(filter_videos) == 0:
  718. log_.info("old videos filter end!")
  719. return
  720. redis_helper.remove_value_from_set(key_name=key_name, values=filter_videos)
  721. log_.info("old videos filter end!")
  722. def filter_process_with_region(data_key, rule_key, region, now_date, now_h):
  723. log_.info(f"data_key = {data_key}, rule_key = {rule_key}, region = {region}")
  724. # 需过滤视频列表
  725. key_prefix_list = [
  726. config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H,
  727. config_.RECALL_KEY_NAME_PREFIX_DUP_H_H,
  728. config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H,
  729. # config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_DAY_H,
  730. # config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_DAY_H,
  731. config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H,
  732. config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H,
  733. config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H,
  734. config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H,
  735. # config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H
  736. ]
  737. for i, key_prefix in enumerate(key_prefix_list):
  738. # 拼接key
  739. key_name = f"{key_prefix}{region}:{data_key}:{rule_key}:{now_date}:{now_h}"
  740. # 获取视频
  741. data = redis_helper.get_all_data_from_zset(key_name=key_name)
  742. if data is None:
  743. log_.info("data is None")
  744. log_.info("filter end!")
  745. continue
  746. # 过滤
  747. video_ids = [int(video_id) for video_id in data]
  748. if data_key in ['data7', ]:
  749. filtered_result = filter_video_status_app(video_ids=video_ids)
  750. else:
  751. filtered_result = filter_video_status(video_ids=video_ids)
  752. # 求差集,获取需要过滤掉的视频,并从redis中移除
  753. filter_videos = set(video_ids) - set(filtered_result)
  754. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  755. len(filtered_result),
  756. len(filter_videos)))
  757. log_.info({'key_name': key_name, 'filter_videos': filter_videos})
  758. if len(filter_videos) == 0:
  759. log_.info("filter end!")
  760. continue
  761. redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
  762. # if i == 0:
  763. # # 将小时级的数据需要过滤的视频加入到线上过滤应用列表中
  764. # redis_helper.add_data_with_set(key_name=f"{config_.REGION_H_VIDEO_FILER}"
  765. # f"{region}.{app_type}.{data_key}.{rule_key}",
  766. # values=filter_videos, expire_time=2 * 3600)
  767. # elif i == 1:
  768. # # 将地域分组24h的数据需要过滤的视频加入到线上过滤应用列表中
  769. # redis_helper.add_data_with_set(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}"
  770. # f"{region}.{app_type}.{data_key}.{rule_key}",
  771. # values=filter_videos, expire_time=2 * 3600)
  772. # elif i == 2:
  773. # # 将相对24h的数据需要过滤的视频加入到线上过滤应用列表中
  774. # redis_helper.add_data_with_set(key_name=f"{config_.H_VIDEO_FILER_24H}"
  775. # f"{region}.{app_type}.{data_key}.{rule_key}",
  776. # values=filter_videos, expire_time=2 * 3600)
  777. log_.info(f"data_key = {data_key}, rule_key = {rule_key}, region = {region} videos filter end!")
  778. def filter_process_with_param(param, region_code_list, now_date, now_h):
  779. data_key = param.get('data')
  780. rule_key = param.get('rule')
  781. log_.info(f"param = {param} videos filter start... ")
  782. task_list = [
  783. gevent.spawn(filter_process_with_region, data_key, rule_key, region, now_date, now_h)
  784. for region in region_code_list
  785. ]
  786. gevent.joinall(task_list)
  787. log_.info(f"param = {param} videos filter end!")
  788. def filter_region_videos(rule_params):
  789. """过滤地域分组规则视频"""
  790. region_code_list = [code for region, code in region_code.items()] + \
  791. [code for city, code in config_.CITY_CODE.items()]
  792. log_.info("region_h videos filter start ...")
  793. # 获取当前日期
  794. now_date = date.today().strftime('%Y%m%d')
  795. # 获取当前所在小时
  796. now_h = datetime.now().hour
  797. log_.info(f'now_date = {now_date}, now_h = {now_h}.')
  798. params_list = rule_params.get('params_list')
  799. pool = multiprocessing.Pool(processes=len(params_list))
  800. for param in params_list:
  801. pool.apply_async(
  802. func=filter_process_with_param,
  803. args=(param, region_code_list, now_date, now_h)
  804. )
  805. pool.close()
  806. pool.join()
  807. # task_list = []
  808. # for param in rule_params.get('params_list'):
  809. # data_key = param.get('data')
  810. # rule_key = param.get('rule')
  811. # log_.info(f"data_key = {data_key}, rule_key = {rule_key}")
  812. # task_list.extend(
  813. # [
  814. # gevent.spawn(filter_process_with_region, data_key, rule_key, region, now_date, now_h)
  815. # for region in region_code_list
  816. # ]
  817. # )
  818. # gevent.joinall(task_list)
  819. log_.info("region_h videos filter end!")
  820. def filter_region_videos_by_day():
  821. """过滤地域分组天级规则视频"""
  822. region_code_list = [code for region, code in region_code.items()]
  823. rule_params = config_.RULE_PARAMS_REGION_DAY
  824. log_.info("region_day videos filter start ...")
  825. redis_helper = RedisHelper()
  826. # 获取当前日期
  827. now_date = date.today().strftime('%Y%m%d')
  828. log_.info(f'now_date = {now_date}.')
  829. for region in region_code_list:
  830. log_.info(f"region = {region}")
  831. for key, value in rule_params.items():
  832. log_.info(f"rule = {key}, param = {value}")
  833. # 需过滤视频列表
  834. key_prefix_list = [
  835. config_.RECALL_KEY_NAME_PREFIX_REGION_BY_DAY
  836. ]
  837. for i, key_prefix in enumerate(key_prefix_list):
  838. # 拼接key
  839. key_name = f"{key_prefix}{region}.{key}.{now_date}"
  840. log_.info(f"key_name: {key_name}")
  841. # 获取视频
  842. data = redis_helper.get_all_data_from_zset(key_name=key_name)
  843. if data is None:
  844. log_.info("data is None")
  845. log_.info("filter end!")
  846. continue
  847. # 过滤
  848. video_ids = [int(video_id) for video_id in data]
  849. filtered_result = filter_video_status(video_ids=video_ids)
  850. # 求差集,获取需要过滤掉的视频,并从redis中移除
  851. filter_videos = set(video_ids) - set(filtered_result)
  852. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  853. len(filtered_result),
  854. len(filter_videos)))
  855. log_.info({'key_name': key_name, 'filter_videos': filter_videos})
  856. if len(filter_videos) == 0:
  857. log_.info("filter end!")
  858. continue
  859. redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
  860. log_.info(f"region = {region} videos filter end!")
  861. log_.info("region_day videos filter end!")
  862. def filter_rov_h_24h():
  863. """过滤小程序小时级更新24h数据"""
  864. rule_params = config_.RULE_PARAMS_24H
  865. log_.info("rov_h_by24h pool filter start ...")
  866. redis_helper = RedisHelper()
  867. # 获取当前日期
  868. now_date = date.today().strftime('%Y%m%d')
  869. # 获取当前所在小时
  870. now_h = datetime.now().hour
  871. log_.info(f'now_date = {now_date}, now_h = {now_h}.')
  872. for key, value in rule_params.items():
  873. log_.info(f"rule = {key}, param = {value}")
  874. # 需过滤两个视频列表
  875. key_prefix_list = [config_.RECALL_KEY_NAME_PREFIX_BY_24H, config_.RECALL_KEY_NAME_PREFIX_DUP_24H]
  876. for i, key_prefix in enumerate(key_prefix_list):
  877. # 拼接key
  878. key_name = f"{key_prefix}{key}.{now_date}.{now_h}"
  879. log_.info(f"key_name: {key_name}")
  880. # 获取视频
  881. data = redis_helper.get_all_data_from_zset(key_name=key_name)
  882. if data is None:
  883. log_.info("data is None")
  884. log_.info("filter end!")
  885. continue
  886. # 过滤
  887. video_ids = [int(video_id) for video_id in data]
  888. filtered_result = filter_video_status(video_ids=video_ids)
  889. # 求差集,获取需要过滤掉的视频,并从redis中移除
  890. filter_videos = set(video_ids) - set(filtered_result)
  891. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  892. len(filtered_result),
  893. len(filter_videos)))
  894. log_.info({'key_name': key_name, 'filter_videos': filter_videos})
  895. if len(filter_videos) == 0:
  896. log_.info("filter end!")
  897. continue
  898. redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
  899. if i == 0:
  900. # 将小时级的数据需要过滤的视频加入到线上过滤应用列表中
  901. redis_helper.add_data_with_set(key_name=f"{config_.H_VIDEO_FILER_24H}{key}",
  902. values=filter_videos, expire_time=2*3600)
  903. log_.info("rov_h_by24h pool filter end!")
  904. def filter_region_videos_24h():
  905. """过滤地域分组24h规则视频"""
  906. region_code_list = [code for region, code in region_code.items()]
  907. rule_params = config_.RULE_PARAMS_REGION_24H
  908. log_.info("region_24h videos filter start ...")
  909. redis_helper = RedisHelper()
  910. # 获取当前日期
  911. now_date = date.today().strftime('%Y%m%d')
  912. # 获取当前所在小时
  913. now_h = datetime.now().hour
  914. log_.info(f'now_date = {now_date}, now_h = {now_h}.')
  915. for region in region_code_list:
  916. log_.info(f"region = {region}")
  917. for key, value in rule_params.items():
  918. log_.info(f"rule = {key}, param = {value}")
  919. # 需过滤视频列表
  920. key_prefix_list = [
  921. config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H,
  922. config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_DAY_24H,
  923. config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_24H
  924. ]
  925. for i, key_prefix in enumerate(key_prefix_list):
  926. # 拼接key
  927. key_name = f"{key_prefix}{region}.{key}.{now_date}.{now_h}"
  928. log_.info(f"key_name: {key_name}")
  929. # 获取视频
  930. data = redis_helper.get_all_data_from_zset(key_name=key_name)
  931. if data is None:
  932. log_.info("data is None")
  933. log_.info("filter end!")
  934. continue
  935. # 过滤
  936. video_ids = [int(video_id) for video_id in data]
  937. filtered_result = filter_video_status(video_ids=video_ids)
  938. # 求差集,获取需要过滤掉的视频,并从redis中移除
  939. filter_videos = set(video_ids) - set(filtered_result)
  940. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  941. len(filtered_result),
  942. len(filter_videos)))
  943. log_.info({'key_name': key_name, 'filter_videos': filter_videos})
  944. if len(filter_videos) == 0:
  945. log_.info("filter end!")
  946. continue
  947. redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
  948. if i == 0:
  949. # 将小时级的数据需要过滤的视频加入到线上过滤应用列表中
  950. redis_helper.add_data_with_set(key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{region}.{key}",
  951. values=filter_videos, expire_time=2 * 3600)
  952. # filter_data(filter_videos, region)
  953. log_.info(f"region = {region} videos filter end!")
  954. log_.info("region_24h videos filter end!")
  955. def filter_data(videos, region):
  956. now_dt = datetime.now().strftime('%Y%m%d%H%M')
  957. filepath = './data/filter_data'
  958. if not os.path.exists(filepath):
  959. os.makedirs(filepath)
  960. res = []
  961. mysql_helper = MysqlHelper(mysql_info=config_.FILTER_MYSQL_INFO)
  962. video_status_sql = "SELECT t1.id AS 'video_id', " \
  963. "t1.transcode_status AS 'transcoding_status', " \
  964. "t2.audit_status AS 'audit_status', " \
  965. "t2.video_status AS 'open_status', " \
  966. "t2.recommend_status AS 'applet_rec_status', " \
  967. "t2.app_recommend_status AS 'app_rec_status', " \
  968. "t3.charge AS 'payment_status', " \
  969. "case when t4.max_validate_count is null then 0 else t4.max_validate_count end AS 'encryption_status' " \
  970. "FROM longvideo.wx_video t1 " \
  971. "LEFT JOIN longvideo.wx_video_status t2 ON t1.id= t2.video_id " \
  972. "LEFT JOIN longvideo.wx_video_detail t3 ON t1.id= t3.video_id " \
  973. "LEFT JOIN longvideo.wx_video_pwd t4 ON t1.id= t4.video_id"
  974. for video_id in list(videos):
  975. sql = "SELECT video_id, audit_status, applet_rec_status, open_status, " \
  976. "payment_status, encryption_status, transcoding_status " \
  977. "FROM ({}) " \
  978. "WHERE video_id = {};".format(video_status_sql, video_id)
  979. data = mysql_helper.get_data(sql=sql)
  980. res += data
  981. columns = ['video_id', 'audit_status', 'applet_rec_status', 'open_status',
  982. 'payment_status', 'encryption_status', 'transcoding_status']
  983. filter_df = pd.DataFrame(data=res, columns=columns)
  984. filename = f"filter_data_region_{region}_{now_dt}.csv"
  985. file = os.path.join(filepath, filename)
  986. filter_df.to_csv(file, index=False)
  987. def filter_whole_movies():
  988. """过滤完整电影数据"""
  989. log_.info("whole movies filter start ...")
  990. redis_helper = RedisHelper()
  991. # 获取当前日期
  992. now_date = date.today().strftime('%Y%m%d')
  993. now_h = datetime.now().hour
  994. log_.info(f'now_date = {now_date}, now_h = {now_h}.')
  995. # 拼接key
  996. key_name = f'{config_.RECALL_KEY_NAME_PREFIX_WHOLE_MOVIES}{now_date}.{now_h}'
  997. # 获取视频
  998. data = redis_helper.get_all_data_from_zset(key_name=key_name)
  999. if data is None:
  1000. log_.info("data is None")
  1001. log_.info("whole movies filter end!")
  1002. return
  1003. # 过滤
  1004. video_ids = [int(video_id) for video_id in data]
  1005. filtered_result = filter_video_status(video_ids=video_ids)
  1006. # 求差集,获取需要过滤掉的视频,并从redis中移除
  1007. filter_videos = set(video_ids) - set(filtered_result)
  1008. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  1009. len(filtered_result),
  1010. len(filter_videos)))
  1011. log_.info({'key_name': key_name, 'filter_videos': filter_videos})
  1012. if len(filter_videos) == 0:
  1013. log_.info("whole movies filter end!")
  1014. return
  1015. redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
  1016. log_.info("whole movies filter end!")
  1017. def filter_day_30day():
  1018. """过滤小程序天级更新30天数据"""
  1019. log_.info("day_by_30day pool filter start ...")
  1020. # 获取当前日期
  1021. now_date = date.today().strftime('%Y%m%d')
  1022. rule_params = config_.RULE_PARAMS_30DAY_APP_TYPE
  1023. params_list = rule_params.get('params_list')
  1024. redis_helper = RedisHelper()
  1025. log_.info(f'now_date = {now_date}.')
  1026. for param in params_list:
  1027. data_key = param.get('data')
  1028. rule_key = param.get('rule')
  1029. log_.info(f"param = {param} videos filter start... ")
  1030. # 需过滤视频列表
  1031. key_prefix = config_.RECALL_KEY_NAME_PREFIX_30DAY
  1032. key_name = f"{key_prefix}{data_key}:{rule_key}:{now_date}"
  1033. log_.info(f"key_name: {key_name}")
  1034. # 获取视频
  1035. data = redis_helper.get_all_data_from_zset(key_name=key_name)
  1036. if data is None:
  1037. log_.info("data is None")
  1038. log_.info("filter end!")
  1039. continue
  1040. # 过滤
  1041. video_ids = [int(video_id) for video_id in data]
  1042. filtered_result = filter_video_status(video_ids=video_ids)
  1043. # 求差集,获取需要过滤掉的视频,并从redis中移除
  1044. filter_videos = set(video_ids) - set(filtered_result)
  1045. log_.info("video_ids size = {}, filtered size = {}, filter sizer = {}".format(len(video_ids),
  1046. len(filtered_result),
  1047. len(filter_videos)))
  1048. if len(filter_videos) == 0:
  1049. log_.info("filter end!")
  1050. continue
  1051. redis_helper.remove_value_from_zset(key_name=key_name, value=list(filter_videos))
  1052. log_.info("day_by_30day pool filter end!")
  1053. def main():
  1054. try:
  1055. # ROV召回池视频过滤
  1056. # filter_rov_pool()
  1057. # appType = 6,ROV召回池视频过滤
  1058. # filter_rov_pool(app_type=config_.APP_TYPE['SHORT_VIDEO'])
  1059. # appType = 13,票圈视频APP视频过滤
  1060. # filter_rov_pool(app_type=config_.APP_TYPE['APP'])
  1061. # appType = 18, ROV召回池视频过滤
  1062. # filter_rov_pool(app_type=config_.APP_TYPE['LAO_HAO_KAN_VIDEO'])
  1063. # appType = 19, ROV召回池视频过滤
  1064. # filter_rov_pool(app_type=config_.APP_TYPE['ZUI_JING_QI'])
  1065. # 流量池视频过滤
  1066. filter_flow_pool()
  1067. # filter_flow_pool_new()
  1068. filter_flow_pool_new_with_level()
  1069. filter_flow_pool_new_with_level_score()
  1070. # 兜底视频过滤
  1071. # filter_bottom()
  1072. # 修改过ROV的视频过滤
  1073. # filter_rov_updated()
  1074. # filter_rov_updated_app()
  1075. # 运营强插相关推荐视频过滤
  1076. # filter_relevant_videos()
  1077. # 按位置排序视频过滤
  1078. # filter_position_videos()
  1079. # 过滤票圈视频APP小时级数据
  1080. # filter_app_pool()
  1081. # 过滤小程序小时级数据
  1082. # filter_rov_h()
  1083. # 过滤小程序天级数据
  1084. # filter_rov_day()
  1085. # 过滤老视频数据
  1086. # filter_old_videos()
  1087. # 过滤地域分组小时级视频
  1088. filter_region_videos(rule_params=config_.RULE_PARAMS_REGION_APP_TYPE)
  1089. # filter_region_videos(rule_params=config_.RULE_PARAMS_REGION_APP_TYPE_48H)
  1090. # 过滤地域分组天级视频
  1091. # filter_region_videos_by_day()
  1092. # 过滤小时级更新24h视频
  1093. # filter_rov_h_24h()
  1094. # 过滤地域分组24h规则视频
  1095. # filter_region_videos_24h()
  1096. # 过滤完整电影数据
  1097. # filter_whole_movies()
  1098. # 过滤小程序天级更新30天数据
  1099. # filter_day_30day()
  1100. except Exception as e:
  1101. log_.error(traceback.format_exc())
  1102. send_msg_to_feishu(
  1103. webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
  1104. key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
  1105. msg_text='{} - 过滤失败 \n {}'.format(config_.ENV_TEXT, traceback.format_exc())
  1106. )
  1107. return
  1108. if __name__ == '__main__':
  1109. main()