# check_video_limit_distribute.py

  1. import gevent
  2. import datetime
  3. import numpy as np
  4. from my_config import set_config
  5. from log import Log
  6. from my_utils import RedisHelper
# Shared module-level objects used by every function below:
#   config_      - first value returned by set_config() (the second value is discarded)
#   log_         - Log instance used for progress logging
#   redis_helper - RedisHelper instance used for every redis read/write in this file
config_, _ = set_config()
log_ = Log()
redis_helper = RedisHelper()
  10. def update_limit_video_score(initial_videos, key_name):
  11. """
  12. 调整限流视频的分数: 将视频移至所在列表的中位数之后,多个视频时,按照原本的顺序进行排列
  13. :param initial_videos: 视频列表及score type-dict, {videoId: score, ...}
  14. :param key_name: 视频列表对应的key
  15. :return:
  16. """
  17. if not initial_videos:
  18. return
  19. # 获取当前限流视频
  20. data = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_PREFIX_LIMIT_VIDEOS)
  21. if data is None:
  22. return
  23. # 获取限流视频对应的score
  24. limit_video_initial_score = []
  25. for video in eval(data):
  26. video_id = int(video[0])
  27. initial_score = initial_videos.get(video_id, None)
  28. if initial_score is not None:
  29. limit_video_initial_score.append((video_id, initial_score))
  30. # log_.info(f"limit_video_initial_score = {limit_video_initial_score}")
  31. if len(limit_video_initial_score) == 0:
  32. return
  33. # 获取原始列表的分数的中位数
  34. initial_video_score_list = sorted([val for key, val in initial_videos.items()], reverse=False)
  35. media_score = np.median(initial_video_score_list)
  36. # 取中位数后一位
  37. if len(initial_video_score_list) % 2 == 0:
  38. temp_index = len(initial_video_score_list)//2
  39. else:
  40. temp_index = len(initial_video_score_list) // 2 + 1
  41. if len(initial_video_score_list) > 1:
  42. temp_score = initial_video_score_list[temp_index]
  43. else:
  44. temp_score = 0
  45. # 对限流视频score进行调整
  46. limit_video_final_score = {}
  47. limit_video_initial_score.sort(key=lambda x: x[1], reverse=True)
  48. limit_video_id_list = []
  49. for video_id, initial_score in limit_video_initial_score:
  50. if initial_score > media_score:
  51. limit_video_id_list.append(video_id)
  52. if len(limit_video_id_list) > 0:
  53. limit_score_step = (temp_score - media_score) / (len(limit_video_id_list) + 1)
  54. for i, video_id in enumerate(limit_video_id_list):
  55. final_score = media_score - limit_score_step * (i + 1)
  56. limit_video_final_score[int(video_id)] = final_score
  57. # log_.info(f"media_score = {media_score}, temp_score = {temp_score}, "
  58. # f"limit_video_final_score = {limit_video_final_score}")
  59. # 更新限流视频的score
  60. if len(limit_video_final_score) == 0:
  61. return
  62. redis_helper.add_data_with_zset(key_name=key_name, data=limit_video_final_score, expire_time=2 * 24 * 3600)
  63. return limit_video_final_score
  64. def check_videos_distribute():
  65. """
  66. 检查当前限流视频分发数
  67. :return: stop_distribute_video_id_list
  68. """
  69. # 获取当前限流视频及最大分发数
  70. data = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_PREFIX_LIMIT_VIDEOS)
  71. if data is None:
  72. return []
  73. # 判断是否已超分发
  74. stop_distribute_video_id_list = []
  75. for video_id, max_distribute_count in eval(data):
  76. distributed_count = redis_helper.get_data_from_redis(
  77. key_name=f"{config_.KEY_NAME_PREFIX_LIMIT_VIDEO_DISTRIBUTE_COUNT}{video_id}"
  78. )
  79. if distributed_count is None:
  80. continue
  81. if int(distributed_count) >= int(max_distribute_count):
  82. stop_distribute_video_id_list.append(int(video_id))
  83. return stop_distribute_video_id_list
  84. def process_with_region(data_key, rule_key, region, stop_distribute_video_id_list, now_date, now_h):
  85. log_.info(f"data_key = {data_key}, rule_key = {rule_key}, region = {region}")
  86. # 将已超分发视频加入到地域小时级线上过滤应用列表中
  87. # redis_helper.add_data_with_set(
  88. # key_name=f"{config_.REGION_H_VIDEO_FILER}{region}.{app_type}.{data_key}.{rule_key}",
  89. # values=stop_distribute_video_id_list,
  90. # expire_time=2 * 3600
  91. # )
  92. # 将已超分发视频加入到地域分组24h的数据线上过滤应用列表中
  93. # redis_helper.add_data_with_set(
  94. # key_name=f"{config_.REGION_H_VIDEO_FILER_24H}{region}.{app_type}.{data_key}.{rule_key}",
  95. # values=stop_distribute_video_id_list,
  96. # expire_time=2 * 3600
  97. # )
  98. # 将已超分发视频加入到不区分相对24h线上过滤应用列表中
  99. # redis_helper.add_data_with_set(
  100. # key_name=f"{config_.H_VIDEO_FILER_24H}{region}.{app_type}.{data_key}.{rule_key}",
  101. # values=stop_distribute_video_id_list,
  102. # expire_time=2 * 3600
  103. # )
  104. key_prefix_list = [
  105. config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H, # 地域分组小时级列表
  106. config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H, # 地域分组相对24h列表
  107. config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H, # 不区分地域相对24h列表
  108. config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H, # 不区分地域相对24h列表2
  109. # config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H, # 大列表
  110. ]
  111. # if rule_key == 'rule4':
  112. # key_prefix_list = [
  113. # config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H, # 地域分组小时级列表
  114. # config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H, # 地域分组相对24h列表
  115. # config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H, # 不区分地域相对24h列表
  116. # config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H, # 不区分地域相对24h列表2
  117. # config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H, # 大列表
  118. # ]
  119. # elif rule_key == 'rule5':
  120. # key_prefix_list = [
  121. # config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H, # 地域分组小时级列表
  122. # config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H, # 地域分组相对24h列表
  123. # config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H, # 不区分地域相对48h列表
  124. # config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H, # 不区分地域相对48h列表2
  125. # config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H, # 大列表
  126. # ]
  127. # else:
  128. # key_prefix_list = [
  129. # config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H, # 地域分组小时级列表
  130. # config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H, # 地域分组相对24h列表
  131. # config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H, # 不区分地域相对24h列表
  132. # config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H, # 大列表
  133. # ]
  134. for key_prefix in key_prefix_list:
  135. key_name = f"{key_prefix}{region}:{data_key}:{rule_key}:" \
  136. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  137. if not redis_helper.key_exists(key_name=key_name):
  138. if now_h == 0:
  139. redis_date = now_date - datetime.timedelta(days=1)
  140. redis_h = 23
  141. else:
  142. redis_date = now_date
  143. redis_h = now_h - 1
  144. key_name = f"{key_prefix}{region}:{data_key}:{rule_key}:" \
  145. f"{datetime.datetime.strftime(redis_date, '%Y%m%d')}:{redis_h}"
  146. redis_helper.remove_value_from_zset(key_name=key_name, value=stop_distribute_video_id_list)
  147. """
  148. # 将已超分发视频 移除 不区分相对24h列表2
  149. if rule_key == 'rule4':
  150. key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}" \
  151. f"{region}.{app_type}.{data_key}.{rule_key}." \
  152. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
  153. if not redis_helper.key_exists(key_name=key_name):
  154. if now_h == 0:
  155. redis_date = now_date - datetime.timedelta(days=1)
  156. redis_h = 23
  157. else:
  158. redis_date = now_date
  159. redis_h = now_h - 1
  160. key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}" \
  161. f"{region}.{app_type}.{data_key}.{rule_key}." \
  162. f"{datetime.datetime.strftime(redis_date, '%Y%m%d')}.{redis_h}"
  163. redis_helper.remove_value_from_zset(key_name=key_name, value=stop_distribute_video_id_list)
  164. # 将已超分发视频 移除 大列表
  165. key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H}" \
  166. f"{region}.{app_type}.{data_key}.{rule_key}." \
  167. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}.{now_h}"
  168. if not redis_helper.key_exists(key_name=key_name):
  169. if now_h == 0:
  170. redis_date = now_date - datetime.timedelta(days=1)
  171. redis_h = 23
  172. else:
  173. redis_date = now_date
  174. redis_h = now_h - 1
  175. key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP_REGION_H}" \
  176. f"{region}.{app_type}.{data_key}.{rule_key}." \
  177. f"{datetime.datetime.strftime(redis_date, '%Y%m%d')}.{redis_h}"
  178. redis_helper.remove_value_from_zset(key_name=key_name, value=stop_distribute_video_id_list)
  179. """
  180. log_.info(f"data_key = {data_key}, rule_key = {rule_key}, region = {region} "
  181. f"videos check end!")
  182. def check_region_videos(rule_params):
  183. """检查限流视频分发数"""
  184. # 获取当前日期
  185. now_date = datetime.datetime.today()
  186. # 获取当前所在小时
  187. now_h = datetime.datetime.now().hour
  188. log_.info(f'now_date = {now_date}, now_h = {now_h}.')
  189. # 获取已超分发视频
  190. stop_distribute_video_id_list = check_videos_distribute()
  191. log_.info(f"stop_distribute_video_id_list = {stop_distribute_video_id_list}, "
  192. f"count = {len(stop_distribute_video_id_list)}")
  193. if len(stop_distribute_video_id_list) == 0:
  194. return
  195. # 对已超分发的视频进行移除
  196. region_code_list = [code for region, code in config_.REGION_CODE.items()] + \
  197. [code for city, code in config_.CITY_CODE.items()]
  198. for param in rule_params.get('params_list'):
  199. data_key = param.get('data')
  200. rule_key = param.get('rule')
  201. log_.info(f"data_key = {data_key}, rule_key = {rule_key}")
  202. task_list = [
  203. gevent.spawn(process_with_region,
  204. data_key, rule_key, region, stop_distribute_video_id_list, now_date, now_h)
  205. for region in region_code_list
  206. ]
  207. gevent.joinall(task_list)
  208. # 将已超分发视频 移除 天级更新30天列表
  209. log_.info("day_by_30day check start...")
  210. day30_key_prefix = config_.RECALL_KEY_NAME_PREFIX_30DAY
  211. for param in config_.RULE_PARAMS_30DAY_APP_TYPE.get('params_list'):
  212. data_key = param.get('data')
  213. rule_key = param.get('rule')
  214. log_.info(f"data_key = {data_key}, rule_key = {rule_key}")
  215. key_name = f"{day30_key_prefix}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}"
  216. if not redis_helper.key_exists(key_name=key_name):
  217. redis_date = now_date - datetime.timedelta(days=1)
  218. key_name = f"{day30_key_prefix}:{data_key}:{rule_key}:{datetime.datetime.strftime(redis_date, '%Y%m%d')}"
  219. redis_helper.remove_value_from_zset(key_name=key_name, value=stop_distribute_video_id_list)
  220. log_.info("day_by_30day check end!")
  221. # 将已超分发视频 移除 原始大列表
  222. # key_name = f"{config_.RECALL_KEY_NAME_PREFIX}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
  223. # if not redis_helper.key_exists(key_name=key_name):
  224. # redis_date = now_date - datetime.timedelta(days=1)
  225. # key_name = f"{config_.RECALL_KEY_NAME_PREFIX}{datetime.datetime.strftime(redis_date, '%Y%m%d')}"
  226. # redis_helper.remove_value_from_zset(key_name=key_name, value=stop_distribute_video_id_list)
  227. if __name__ == '__main__':
  228. log_.info("start...")
  229. check_region_videos(rule_params=config_.RULE_PARAMS_REGION_APP_TYPE)
  230. # check_region_videos(rule_params=config_.RULE_PARAMS_REGION_APP_TYPE_48H)
  231. log_.info("end!")