recommend_region_data_dup.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. import multiprocessing
  2. import os
  3. import sys
  4. import time
  5. import traceback
  6. import gevent
  7. import datetime
  8. import pandas as pd
  9. import math
  10. from functools import reduce
  11. from odps import ODPS
  12. from threading import Timer, Thread
  13. from my_utils import MysqlHelper, RedisHelper, get_data_from_odps, filter_video_status, filter_shield_video, \
  14. check_table_partition_exits, filter_video_status_app, send_msg_to_feishu, filter_political_videos
  15. from my_config import set_config
  16. from log import Log
  17. from check_video_limit_distribute import update_limit_video_score
# Project configuration object returned by set_config(); the second returned value is not used in this module.
config_, _ = set_config()
# Logger shared by every function in this module.
log_ = Log()
# Region table taken from configuration; h_timer_check() collects its values into the region code list.
region_code = config_.REGION_CODE
  21. def dup_data(h_video_ids, initial_key_name, dup_key_name, region, political_filter, shield_config, dup_remove):
  22. redis_helper = RedisHelper()
  23. if redis_helper.key_exists(key_name=initial_key_name):
  24. initial_data = redis_helper.get_all_data_from_zset(key_name=initial_key_name, with_scores=True)
  25. # 屏蔽视频过滤
  26. initial_video_ids = [int(video_id) for video_id, _ in initial_data]
  27. shield_key_name_list = shield_config.get(region, None)
  28. if shield_key_name_list is not None:
  29. initial_video_ids = filter_shield_video(video_ids=initial_video_ids,
  30. shield_key_name_list=shield_key_name_list)
  31. # 涉政视频过滤
  32. if political_filter is True:
  33. initial_video_ids = filter_political_videos(video_ids=initial_video_ids)
  34. dup_data = {}
  35. # 视频去重逻辑
  36. if dup_remove is True:
  37. for video_id, score in initial_data:
  38. if int(video_id) not in h_video_ids and int(video_id) in initial_video_ids:
  39. dup_data[int(video_id)] = score
  40. h_video_ids.append(int(video_id))
  41. else:
  42. for video_id, score in initial_data:
  43. if int(video_id) in initial_video_ids:
  44. dup_data[int(video_id)] = score
  45. if len(dup_data) > 0:
  46. redis_helper.add_data_with_zset(key_name=dup_key_name, data=dup_data, expire_time=4 * 3600)
  47. # 限流视频score调整
  48. update_limit_video_score(initial_videos=dup_data, key_name=dup_key_name)
  49. return h_video_ids
  50. def dup_to_redis(now_date, now_h, rule_key, h_rule_key, region_24h_rule_key, by_24h_rule_key, by_48h_rule_key,
  51. region, data_key, rule_rank_h_flag, political_filter, shield_config, dup_remove):
  52. """将地域分组小时级数据与其他召回视频池去重,存入对应的redis"""
  53. log_.info(f"region = {region} dup start ...")
  54. # ##### 获取地域小时级数据
  55. region_h_key_name = \
  56. f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{data_key}:{rule_key}:" \
  57. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  58. redis_helper = RedisHelper()
  59. if redis_helper.key_exists(key_name=region_h_key_name):
  60. region_h_data = redis_helper.get_all_data_from_zset(key_name=region_h_key_name, with_scores=True)
  61. h_video_ids = [int(video_id) for video_id, _ in region_h_data]
  62. else:
  63. h_video_ids = []
  64. # ##### 去重更新不区分地域小时级列表,并另存为redis中
  65. if h_rule_key is not None:
  66. h_key_name = \
  67. f"{config_.RECALL_KEY_NAME_PREFIX_BY_H_H}{data_key}:{h_rule_key}:" \
  68. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  69. h_dup_key_name = \
  70. f"{config_.RECALL_KEY_NAME_PREFIX_DUP_H_H}{region}:{data_key}:{rule_key}:" \
  71. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  72. h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_key_name,
  73. dup_key_name=h_dup_key_name, region=region, political_filter=political_filter,
  74. shield_config=shield_config, dup_remove=dup_remove)
  75. # ##### 去重更新地域分组小时级24h列表,并另存为redis中
  76. region_24h_key_name = \
  77. f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_24H}{region}:{data_key}:{region_24h_rule_key}:" \
  78. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  79. region_24h_dup_key_name = \
  80. f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}:{data_key}:{rule_key}:" \
  81. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  82. h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=region_24h_key_name,
  83. dup_key_name=region_24h_dup_key_name, region=region, political_filter=political_filter,
  84. shield_config=shield_config, dup_remove=dup_remove)
  85. if rule_rank_h_flag == '48h':
  86. # ##### 去重小程序相对48h更新结果,并另存为redis中
  87. h_48h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H}{data_key}:{by_48h_rule_key}:" \
  88. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  89. h_48h_dup_key_name = \
  90. f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_48H_H}{region}:{data_key}:{rule_key}:" \
  91. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  92. h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_48h_key_name,
  93. dup_key_name=h_48h_dup_key_name, region=region, political_filter=political_filter,
  94. shield_config=shield_config, dup_remove=dup_remove)
  95. # ##### 去重小程序相对48h 筛选后剩余数据 更新结果,并另存为redis中
  96. if by_48h_rule_key == 'rule1':
  97. other_h_48h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_48H_OTHER}{data_key}:" \
  98. f"{by_48h_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  99. other_h_48h_dup_key_name = \
  100. f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_48H_H}{region}:{data_key}:{rule_key}:" \
  101. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  102. h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=other_h_48h_key_name,
  103. dup_key_name=other_h_48h_dup_key_name, region=region,
  104. political_filter=political_filter, shield_config=shield_config,
  105. dup_remove=dup_remove)
  106. else:
  107. # ##### 去重小程序相对24h更新结果,并另存为redis中
  108. h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H}{data_key}:{by_24h_rule_key}:" \
  109. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  110. h_24h_dup_key_name = \
  111. f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}:{data_key}:{rule_key}:" \
  112. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  113. h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=h_24h_key_name,
  114. dup_key_name=h_24h_dup_key_name, region=region, political_filter=political_filter,
  115. shield_config=shield_config, dup_remove=dup_remove)
  116. # ##### 去重小程序相对24h 筛选后剩余数据 更新结果,并另存为redis中
  117. # if by_24h_rule_key in ['rule3', 'rule4']:
  118. other_h_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_BY_24H_OTHER}{data_key}:" \
  119. f"{by_24h_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  120. other_h_24h_dup_key_name = \
  121. f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}:{data_key}:{rule_key}:" \
  122. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  123. h_video_ids = dup_data(h_video_ids=h_video_ids, initial_key_name=other_h_24h_key_name,
  124. dup_key_name=other_h_24h_dup_key_name, region=region, political_filter=political_filter,
  125. shield_config=shield_config, dup_remove=dup_remove)
  126. log_.info(f"region = {region} dup end!")
  127. def copy_data_for_city(region, city_code, data_key, rule_key, now_date, now_h, shield_config):
  128. """copy 对应数据到城市对应redis,并做相应屏蔽视频过滤"""
  129. log_.info(f"city_code = {city_code} start ...")
  130. redis_helper = RedisHelper()
  131. key_prefix_list = [
  132. config_.RECALL_KEY_NAME_PREFIX_DUP_H_H, # 不区分地域小时级
  133. config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H, # 地域相对24h
  134. config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H, # 不区分地域相对24h
  135. config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H, # 不区分地域相对24h筛选后
  136. ]
  137. for key_prefix in key_prefix_list:
  138. region_key = f"{key_prefix}{region}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  139. city_key = f"{key_prefix}{city_code}:{data_key}:{rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  140. if not redis_helper.key_exists(key_name=region_key):
  141. continue
  142. region_data = redis_helper.get_all_data_from_zset(key_name=region_key, with_scores=True)
  143. if not region_data:
  144. continue
  145. # 屏蔽视频过滤
  146. region_video_ids = [int(video_id) for video_id, _ in region_data]
  147. shield_key_name_list = shield_config.get(city_code, None)
  148. # shield_key_name_list = config_.SHIELD_CONFIG.get(city_code, None)
  149. if shield_key_name_list is not None:
  150. filtered_video_ids = filter_shield_video(video_ids=region_video_ids,
  151. shield_key_name_list=shield_key_name_list)
  152. else:
  153. filtered_video_ids = region_video_ids
  154. city_data = {}
  155. for video_id, score in region_data:
  156. if int(video_id) in filtered_video_ids:
  157. city_data[int(video_id)] = score
  158. if len(city_data) > 0:
  159. redis_helper.add_data_with_zset(key_name=city_key, data=city_data, expire_time=2 * 24 * 3600)
  160. log_.info(f"city_code = {city_code} end!")
  161. def dup_with_param(param, data_params_item, rule_params_item, region_code_list, now_date, now_h, rule_rank_h_flag):
  162. log_.info(f"param = {param} start...")
  163. data_key = param.get('data')
  164. data_param = data_params_item.get(data_key)
  165. log_.info(f"data_key = {data_key}, data_param = {data_param}")
  166. rule_key = param.get('rule')
  167. rule_param = rule_params_item.get(rule_key)
  168. log_.info(f"rule_key = {rule_key}, rule_param = {rule_param}")
  169. h_rule_key = rule_param.get('h_rule_key', None)
  170. region_24h_rule_key = rule_param.get('region_24h_rule_key', 'rule1')
  171. by_24h_rule_key = rule_param.get('24h_rule_key', None)
  172. by_48h_rule_key = rule_param.get('48h_rule_key', None)
  173. dup_remove = rule_param.get('dup_remove', True)
  174. # 屏蔽视频过滤
  175. shield_config = rule_param.get('shield_config', config_.SHIELD_CONFIG)
  176. # 涉政视频过滤
  177. political_filter = rule_param.get('political_filter', None)
  178. task_list = [
  179. gevent.spawn(dup_to_redis,
  180. now_date, now_h, rule_key, h_rule_key, region_24h_rule_key, by_24h_rule_key,
  181. by_48h_rule_key, region, data_key, rule_rank_h_flag, political_filter, shield_config, dup_remove)
  182. for region in region_code_list
  183. ]
  184. gevent.joinall(task_list)
  185. # 特殊城市视频数据准备
  186. for region, city_list in config_.REGION_CITY_MAPPING.items():
  187. t = [
  188. gevent.spawn(
  189. copy_data_for_city,
  190. region, city_code, data_key, rule_key, now_date, now_h, shield_config
  191. )
  192. for city_code in city_list
  193. ]
  194. gevent.joinall(t)
  195. log_.info(f"param = {param} end!")
  196. def dup_task(now_date, now_h, region_code_list, rule_rank_h_flag, rule_params):
  197. # 获取特征数据
  198. data_params_item = rule_params.get('data_params')
  199. rule_params_item = rule_params.get('rule_params')
  200. params_list = rule_params.get('params_list')
  201. pool = multiprocessing.Pool(processes=len(params_list))
  202. for param in params_list:
  203. pool.apply_async(
  204. func=dup_with_param,
  205. args=(param, data_params_item, rule_params_item, region_code_list, now_date, now_h, rule_rank_h_flag)
  206. )
  207. pool.close()
  208. pool.join()
  209. def h_timer_check():
  210. try:
  211. rule_rank_h_flag = sys.argv[1]
  212. if rule_rank_h_flag == '48h':
  213. rule_params = config_.RULE_PARAMS_REGION_APP_TYPE_48H
  214. else:
  215. rule_params = config_.RULE_PARAMS_REGION_APP_TYPE
  216. now_date = datetime.datetime.today()
  217. log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}, rule_rank_h_flag: {rule_rank_h_flag}")
  218. now_h = datetime.datetime.now().hour
  219. region_code_list = [code for region, code in region_code.items()]
  220. # 获取数据更新状态
  221. redis_helper = RedisHelper()
  222. rule_24h_status = redis_helper.get_data_from_redis(
  223. key_name=f"{config_.RULE_24H_DATA_STATUS}:{datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
  224. region_24h_status = redis_helper.get_data_from_redis(
  225. key_name=f"{config_.REGION_24H_DATA_STATUS}:{datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
  226. rule_h_status = redis_helper.get_data_from_redis(
  227. key_name=f"{config_.RULE_H_DATA_STATUS}:{datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
  228. region_h_status = redis_helper.get_data_from_redis(
  229. key_name=f"{config_.REGION_H_DATA_STATUS}:{datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
  230. log_.info(f"rule_24h_status: {rule_24h_status}, region_24h_status: {region_24h_status}, "
  231. f"rule_h_status: {rule_h_status}, region_h_status: {region_h_status}")
  232. if rule_24h_status == '1' and region_24h_status == '1' and rule_h_status == '1' and region_h_status == '1':
  233. dup_task(now_date, now_h, region_code_list, rule_rank_h_flag, rule_params)
  234. log_.info(f"recommend region data dup end!")
  235. else:
  236. # 数据未更新好,1分钟后重新检查
  237. Timer(60, h_timer_check).start()
  238. except Exception as e:
  239. log_.error(f"地域相关推荐数据去重更新失败, exception: {e}, traceback: {traceback.format_exc()}")
  240. send_msg_to_feishu(
  241. webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
  242. key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
  243. msg_text=f"rov-offline{config_.ENV_TEXT} - 地域相关推荐数据去重更新失败\n"
  244. f"exception: {e}\n"
  245. f"traceback: {traceback.format_exc()}"
  246. )
# Script entry point: log the start marker, then run the first readiness check
# (h_timer_check re-schedules itself while the upstream data is not ready).
if __name__ == '__main__':
    log_.info(f"recommend region data dup start...")
    h_timer_check()