laohaokan_recommend_update.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. import datetime
  2. import traceback
  3. import gevent
  4. from db_helper import RedisHelper
  5. from my_utils import send_msg_to_feishu
  6. from my_config import set_config
  7. from log import Log
# Module-level objects: config_, log_ and redis_helper are used by the
# functions defined below.
config_, env = set_config()
log_ = Log()
# initial_param = {'data': 'data1', 'rule': 'rule4'}
# new_param = config_.LHK_RULE_PARAMS
redis_helper = RedisHelper()
  13. def get_religion_videos(now_date, religion_name):
  14. """获取宗教视频列表"""
  15. key_name_prefix = config_.RELIGION_VIDEOS[religion_name].get('key_name_prefix')
  16. religion_key_name = f"{key_name_prefix}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
  17. if not redis_helper.key_exists(religion_key_name):
  18. redis_dt = datetime.datetime.strftime((now_date - datetime.timedelta(days=1)), '%Y%m%d')
  19. religion_key_name = f"{key_name_prefix}{redis_dt}"
  20. religion_videos = redis_helper.get_all_data_from_zset(key_name=religion_key_name, desc=True, with_scores=True)
  21. if religion_videos is None:
  22. return []
  23. return religion_videos
  24. def merge_process(initial_key_name, new_key_name, now_videos, religion_video_id_list, rank_count):
  25. initial_data = redis_helper.get_all_data_from_zset(key_name=initial_key_name, with_scores=True)
  26. if initial_data is None or len(initial_data) == 0:
  27. return now_videos, religion_video_id_list
  28. initial_video_ids = [int(video_id) for video_id, _ in initial_data]
  29. initial_video_ids = [video_id for video_id in initial_video_ids if video_id not in now_videos]
  30. religion_video_id_list = [video_id for video_id in religion_video_id_list if video_id not in initial_video_ids]
  31. if len(religion_video_id_list) == 0:
  32. new_video_ids = initial_video_ids
  33. else:
  34. new_video_ids = []
  35. for i, video_id in enumerate(initial_video_ids):
  36. new_video_ids.append(video_id)
  37. now_videos.append(video_id)
  38. if i % rank_count == 1 and len(religion_video_id_list) > 0:
  39. new_video_ids.append(religion_video_id_list[0])
  40. now_videos.append(religion_video_id_list[0])
  41. religion_video_id_list = religion_video_id_list[1:]
  42. # 按照排序给定分数
  43. new_result = {}
  44. step = 100 / (len(new_video_ids) * 2)
  45. for i, video_id in enumerate(new_video_ids):
  46. score = 100 - i * step
  47. new_result[int(video_id)] = score
  48. # 写入新的key中
  49. redis_helper.add_data_with_zset(key_name=new_key_name, data=new_result, expire_time=2 * 24 * 3600)
  50. return now_videos, religion_video_id_list
  51. def merge_with_region(now_date, now_h, region, religion_video_id_list,
  52. initial_param, lhk_data_key, lhk_rule_key, rank_count):
  53. initial_data_key = initial_param.get('data')
  54. initial_rule_key = initial_param.get('rule')
  55. now_videos = []
  56. # 地域小时级数据合并
  57. region_h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{initial_data_key}:{initial_rule_key}:" \
  58. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  59. new_region_h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{lhk_data_key}:{lhk_rule_key}:" \
  60. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  61. now_videos, religion_video_id_list = merge_process(initial_key_name=region_h_key_name,
  62. new_key_name=new_region_h_key_name,
  63. now_videos=now_videos,
  64. religion_video_id_list=religion_video_id_list,
  65. rank_count=rank_count)
  66. # 地域24h数据合并
  67. region_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}:{initial_data_key}:" \
  68. f"{initial_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  69. new_region_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}:{lhk_data_key}:" \
  70. f"{lhk_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  71. now_videos, religion_video_id_list = merge_process(initial_key_name=region_24h_key_name,
  72. new_key_name=new_region_24h_key_name,
  73. now_videos=now_videos,
  74. religion_video_id_list=religion_video_id_list,
  75. rank_count=rank_count)
  76. # 24h筛选数据合并
  77. dup2_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}:{initial_data_key}:" \
  78. f"{initial_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  79. new_dup2_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}:{lhk_data_key}:" \
  80. f"{lhk_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  81. now_videos, religion_video_id_list = merge_process(initial_key_name=dup2_24h_key_name,
  82. new_key_name=new_dup2_24h_key_name,
  83. now_videos=now_videos,
  84. religion_video_id_list=religion_video_id_list,
  85. rank_count=rank_count)
  86. # 24h筛选后剩余数据合并
  87. dup3_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}:{initial_data_key}:" \
  88. f"{initial_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  89. new_dup3_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}:{lhk_data_key}:" \
  90. f"{lhk_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  91. now_videos, religion_video_id_list = merge_process(initial_key_name=dup3_24h_key_name,
  92. new_key_name=new_dup3_24h_key_name,
  93. now_videos=now_videos,
  94. religion_video_id_list=religion_video_id_list,
  95. rank_count=rank_count)
  96. log_.info(f"region = {region} update end!")
  97. def merge_videos(now_date, now_h, param, rule_params):
  98. """将宗教视频插入到默认视频列表中"""
  99. # 获取宗教视频列表
  100. lhk_data_key = param.get('data')
  101. lhk_rule_key = param.get('rule')
  102. religion_name = rule_params.get(lhk_rule_key).get('religion_name')
  103. initial_param = rule_params.get(lhk_rule_key).get('initial_param')
  104. rank_count = rule_params.get(lhk_rule_key).get('rank_count')
  105. religion_videos = get_religion_videos(now_date=now_date, religion_name=religion_name)
  106. religion_video_id_list = [int(video_id) for video_id, _ in religion_videos]
  107. # 列表合并
  108. region_code_list = [code for region, code in config_.REGION_CODE.items()]
  109. task_list = [
  110. gevent.spawn(
  111. merge_with_region,
  112. now_date, now_h, region, religion_video_id_list, initial_param, lhk_data_key, lhk_rule_key, rank_count
  113. )
  114. for region in region_code_list
  115. ]
  116. gevent.joinall(task_list)
  117. # 特殊城市视频数据准备
  118. for region, city_list in config_.REGION_CITY_MAPPING.items():
  119. t = [
  120. gevent.spawn(
  121. merge_with_region,
  122. now_date, now_h, city_code, religion_video_id_list, initial_param, lhk_data_key, lhk_rule_key, rank_count
  123. )
  124. for city_code in city_list
  125. ]
  126. gevent.joinall(t)
  127. def main():
  128. try:
  129. log_.info(f"laohaokan recommend data update start...")
  130. now_date = datetime.datetime.today()
  131. now_h = datetime.datetime.now().hour
  132. log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
  133. lhk_rule_params = config_.LHK_RULE_PARAMS
  134. rule_params = lhk_rule_params.get('rule_params', {})
  135. params_list = lhk_rule_params.get('params_list', [])
  136. for param in params_list:
  137. log_.info(f"param = {param} update start...")
  138. merge_videos(now_date, now_h, param, rule_params)
  139. log_.info(f"param = {param} update end!")
  140. log_.info(f"laohaokan recommend data update end!")
  141. # send_msg_to_feishu(
  142. # webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
  143. # key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
  144. # msg_text=f"rov-offline{config_.ENV_TEXT} - 老好看推荐视频数据更新完成\n"
  145. # f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}\n"
  146. # f"now_h: {now_h}\n"
  147. # f"finished time: {datetime.datetime.strftime(datetime.datetime.now(), '%Y%m%d %H:%M:%S')}"
  148. # )
  149. except Exception as e:
  150. log_.error(f"老好看推荐视频数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
  151. send_msg_to_feishu(
  152. webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
  153. key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
  154. msg_text=f"rov-offline{config_.ENV_TEXT} - 老好看推荐视频数据更新失败\n"
  155. f"exception: {e}\n"
  156. f"traceback: {traceback.format_exc()}"
  157. )
# Run the update only when this file is executed as a script.
if __name__ == '__main__':
    main()