recommend_region_data_dup.py 14 KB


  1. import multiprocessing
  2. import os
  3. import sys
  4. import time
  5. import traceback
  6. import gevent
  7. import datetime
  8. import pandas as pd
  9. import math
  10. from functools import reduce
  11. from odps import ODPS
  12. from threading import Timer, Thread
  13. from utils import MysqlHelper, RedisHelper, get_data_from_odps, filter_video_status, filter_shield_video, \
  14. check_table_partition_exits, filter_video_status_app, send_msg_to_feishu, filter_political_videos
  15. from config import set_config
  16. from log import Log
  17. from check_video_limit_distribute import update_limit_video_score
# Project configuration object; the second value returned by set_config() is not used in this module.
config_, _ = set_config()
# Logger shared by every function in this module.
log_ = Log()
# Region mapping taken from the configuration; h_timer_check() uses its values as the region codes.
region_code = config_.REGION_CODE
  21. def dup_data(h_video_ids, initial_key_name, dup_key_name, region, political_filter, shield_config, dup_remove):
  22. redis_helper = RedisHelper()
  23. if redis_helper.key_exists(key_name=initial_key_name):
  24. initial_data = redis_helper.get_all_data_from_zset(key_name=initial_key_name, with_scores=True)
  25. # 屏蔽视频过滤
  26. initial_video_ids = [int(video_id) for video_id, _ in initial_data]
  27. shield_key_name_list = shield_config.get(region, None)
  28. if shield_key_name_list is not None:
  29. initial_video_ids = filter_shield_video(video_ids=initial_video_ids,
  30. shield_key_name_list=shield_key_name_list)
  31. # 涉政视频过滤
  32. if political_filter is True:
  33. initial_video_ids = filter_political_videos(video_ids=initial_video_ids)
  34. dup_data = {}
  35. # 视频去重逻辑
  36. if dup_remove is True:
  37. for video_id, score in initial_data:
  38. if int(video_id) not in h_video_ids and int(video_id) in initial_video_ids:
  39. dup_data[int(video_id)] = score
  40. h_video_ids.append(int(video_id))
  41. else:
  42. for video_id, score in initial_data:
  43. if int(video_id) in initial_video_ids:
  44. dup_data[int(video_id)] = score
  45. if len(dup_data) > 0:
  46. redis_helper.add_data_with_zset(key_name=dup_key_name, data=dup_data, expire_time=2 * 24 * 3600)
  47. # 限流视频score调整
  48. update_limit_video_score(initial_videos=dup_data, key_name=dup_key_name)
  49. return h_video_ids
  50. def dup_to_redis(now_date, now_h, rule_key, h_rule_key, region_24h_rule_key, by_24h_rule_key, by_48h_rule_key,
  51. region, data_key, rule_rank_h_flag, political_filter, shield_config, dup_remove):
  52. """将地域分组小时级数据与其他召回视频池去重,存入对应的redis"""
  53. # ##### 获取地域小时级数据
  54. region_h_key_name = \
  55. f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{data_key}:{rule_key}:" \
  56. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  57. redis_helper = RedisHelper()
  58. if redis_helper.key_exists(key_name=region_h_key_name):
  59. region_h_data = redis_helper.get_all_data_from_zset(key_name=region_h_key_name, with_scores=True)
  60. h_video_ids = [int(video_id) for video_id, _ in region_h_data]
  61. else:
  62. h_video_ids = []
  63. # ##### 去重更新不区分地域小时级列表,并另存为redis中
  64. if h_rule_key is not None:
  65. h_key_name = \
  66. f"{config_.RECALL_KEY_NAME_PREFIX_BY_H_H}{data_key}:{h_rule_key}:" \
  67. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  68. h_dup_key_name = \
  69. f"{config_.RECALL_KEY_NAME_PREFIX_DUP_H_H}{region}:{data_key}:{rule_key}:" \
  70. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  71. h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_key_name,
  72. dup_key_name=h_dup_key_name, region=region, political_filter=political_filter,
  73. shield_config=shield_config, dup_remove=dup_remove)
  74. # ##### 去重更新地域分组小时级24h列表,并另存为redis中
  75. region_24h_key_name = \
  76. f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}:{data_key}:{region_24h_rule_key}:" \
  77. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  78. region_24h_dup_key_name = \
  79. f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}:{data_key}:{rule_key}:" \
  80. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  81. h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=region_24h_key_name,
  82. dup_key_name=region_24h_dup_key_name, region=region, political_filter=political_filter,
  83. shield_config=shield_config, dup_remove=dup_remove)
  84. if rule_rank_h_flag == '48h':
  85. # ##### 去重小程序相对48h更新结果,并另存为redis中
  86. h_48h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H}{data_key}:{by_48h_rule_key}:" \
  87. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  88. h_48h_dup_key_name = \
  89. f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H}{region}:{data_key}:{rule_key}:" \
  90. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  91. h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_48h_key_name,
  92. dup_key_name=h_48h_dup_key_name, region=region, political_filter=political_filter,
  93. shield_config=shield_config, dup_remove=dup_remove)
  94. # ##### 去重小程序相对48h 筛选后剩余数据 更新结果,并另存为redis中
  95. if by_48h_rule_key == 'rule1':
  96. other_h_48h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H_OTHER}{data_key}:" \
  97. f"{by_48h_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  98. other_h_48h_dup_key_name = \
  99. f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H}{region}:{data_key}:{rule_key}:" \
  100. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  101. h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=other_h_48h_key_name,
  102. dup_key_name=other_h_48h_dup_key_name, region=region,
  103. political_filter=political_filter, shield_config=shield_config,
  104. dup_remove=dup_remove)
  105. else:
  106. # ##### 去重小程序相对24h更新结果,并另存为redis中
  107. h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H}{data_key}:{by_24h_rule_key}:" \
  108. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  109. h_24h_dup_key_name = \
  110. f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}:{data_key}:{rule_key}:" \
  111. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  112. h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_24h_key_name,
  113. dup_key_name=h_24h_dup_key_name, region=region, political_filter=political_filter,
  114. shield_config=shield_config, dup_remove=dup_remove)
  115. # ##### 去重小程序相对24h 筛选后剩余数据 更新结果,并另存为redis中
  116. # if by_24h_rule_key in ['rule3', 'rule4']:
  117. other_h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H_OTHER}{data_key}:" \
  118. f"{by_24h_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  119. other_h_24h_dup_key_name = \
  120. f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}:{data_key}:{rule_key}:" \
  121. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  122. h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=other_h_24h_key_name,
  123. dup_key_name=other_h_24h_dup_key_name, region=region, political_filter=political_filter,
  124. shield_config=shield_config, dup_remove=dup_remove)
  125. def copy_data_for_city(region, city_code, data_key, rule_key, now_date, now_h, shield_config):
  126. """copy 对应数据到城市对应redis,并做相应屏蔽视频过滤"""
  127. log_.info(f"city_code = {city_code} start ...")
  128. redis_helper = RedisHelper()
  129. key_prefix_list = [
  130. config_.RECALL_KEY_NAME_PREFIX_DUP_H_H, # 不区分地域小时级
  131. config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H, # 地域相对24h
  132. config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H, # 不区分地域相对24h
  133. config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H, # 不区分地域相对24h筛选后
  134. ]
  135. for key_prefix in key_prefix_list:
  136. region_key = f"{key_prefix}{region}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  137. city_key = f"{key_prefix}{city_code}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  138. if not redis_helper.key_exists(key_name=region_key):
  139. continue
  140. region_data = redis_helper.get_all_data_from_zset(key_name=region_key, with_scores=True)
  141. if not region_data:
  142. continue
  143. # 屏蔽视频过滤
  144. region_video_ids = [int(video_id) for video_id, _ in region_data]
  145. shield_key_name_list = shield_config.get(city_code, None)
  146. # shield_key_name_list = config_.SHIELD_CONFIG.get(city_code, None)
  147. if shield_key_name_list is not None:
  148. filtered_video_ids = filter_shield_video(video_ids=region_video_ids,
  149. shield_key_name_list=shield_key_name_list)
  150. else:
  151. filtered_video_ids = region_video_ids
  152. city_data = {}
  153. for video_id, score in region_data:
  154. if int(video_id) in filtered_video_ids:
  155. city_data[int(video_id)] = score
  156. if len(city_data) > 0:
  157. redis_helper.add_data_with_zset(key_name=city_key, data=city_data, expire_time=2 * 24 * 3600)
  158. log_.info(f"city_code = {city_code} end!")
  159. def dup_with_param(param, data_params_item, rule_params_item, region_code_list, now_date, now_h, rule_rank_h_flag):
  160. log_.info(f"param = {param} start...")
  161. data_key = param.get('data')
  162. data_param = data_params_item.get(data_key)
  163. log_.info(f"data_key = {data_key}, data_param = {data_param}")
  164. rule_key = param.get('rule')
  165. rule_param = rule_params_item.get(rule_key)
  166. log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
  167. h_rule_key = param.get('h_rule_key', None)
  168. region_24h_rule_key = param.get('region_24h_rule_key', 'rule1')
  169. by_24h_rule_key = param.get('24h_rule_key', None)
  170. by_48h_rule_key = param.get('48h_rule_key', None)
  171. dup_remove = param.get('dup_remove', True)
  172. # 屏蔽视频过滤
  173. shield_config = param.get('shield_config', config_.SHIELD_CONFIG)
  174. # 涉政视频过滤
  175. political_filter = param.get('political_filter', None)
  176. task_list = [
  177. gevent.spawn(dup_to_redis,
  178. now_date, now_h, rule_key, h_rule_key, region_24h_rule_key, by_24h_rule_key,
  179. by_48h_rule_key, region, data_key, rule_rank_h_flag, political_filter, shield_config, dup_remove)
  180. for region in region_code_list
  181. ]
  182. gevent.joinall(task_list)
  183. # 特殊城市视频数据准备
  184. for region, city_list in config_.REGION_CITY_MAPPING.items():
  185. t = [
  186. gevent.spawn(
  187. copy_data_for_city,
  188. region, city_code, data_key, rule_key, now_date, now_h, shield_config
  189. )
  190. for city_code in city_list
  191. ]
  192. gevent.joinall(t)
  193. log_.info(f"param = {param} end!")
  194. def dup_task(now_date, now_h, region_code_list, rule_rank_h_flag, rule_params):
  195. # 获取特征数据
  196. data_params_item = rule_params.get('data_params')
  197. rule_params_item = rule_params.get('rule_params')
  198. params_list = rule_params.get('params_list')
  199. pool = multiprocessing.Pool(processes=len(params_list))
  200. for param in params_list:
  201. pool.apply_async(
  202. func=dup_with_param,
  203. args=(param, data_params_item, rule_params_item, region_code_list, now_date, now_h, rule_rank_h_flag)
  204. )
  205. pool.close()
  206. pool.join()
  207. def h_timer_check():
  208. try:
  209. rule_rank_h_flag = sys.argv[1]
  210. if rule_rank_h_flag == '48h':
  211. rule_params = config_.RULE_PARAMS_REGION_APP_TYPE_48H
  212. else:
  213. rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
  214. now_date = datetime.datetime.today()
  215. log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}, rule_rank_h_flag: {rule_rank_h_flag}")
  216. now_h = datetime.datetime.now().hour
  217. region_code_list = [code for region, code in region_code.items()]
  218. # 获取数据更新状态
  219. redis_helper = RedisHelper()
  220. rule_24h_status = redis_helper.get_data_from_redis(
  221. key_name=f"{config_.RULE_24H_DATA_STATUS}:{datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
  222. region_24h_status = redis_helper.get_data_from_redis(
  223. key_name=f"{config_.REGION_24H_DATA_STATUS}:{datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
  224. rule_h_status = redis_helper.get_data_from_redis(
  225. key_name=f"{config_.RULE_H_DATA_STATUS}:{datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
  226. region_h_status = redis_helper.get_data_from_redis(
  227. key_name=f"{config_.REGION_H_DATA_STATUS}:{datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
  228. log_.info(f"rule_24h_status: {rule_24h_status}, region_24h_status: {region_24h_status}, "
  229. f"rule_h_status: {rule_h_status}, region_h_status: {region_h_status}")
  230. if rule_24h_status == '1' and region_24h_status == '1' and rule_h_status == '1' and region_h_status == '1':
  231. dup_task(now_date, now_h, region_code_list, rule_rank_h_flag, rule_params)
  232. else:
  233. # 数据未更新好,1分钟后重新检查
  234. Timer(60, h_timer_check).start()
  235. except Exception as e:
  236. log_.error(f"地域相关推荐数据去重更新失败, exception: {e}, traceback: {traceback.format_exc()}")
  237. send_msg_to_feishu(
  238. webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
  239. key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
  240. msg_text=f"rov-offline{config_.ENV_TEXT} - 地域相关推荐数据去重更新失败\n"
  241. f"exception: {e}\n"
  242. f"traceback: {traceback.format_exc()}"
  243. )
  244. if __name__ == '__main__':
  245. log_.info(f"recommend region data dup start...")
  246. h_timer_check()
  247. log_.info(f"recommend region data dup end!")