# laohaokan_recommend_update.py (7.5 KB)
  1. import datetime
  2. import gevent
  3. from db_helper import RedisHelper
  4. from utils import send_msg_to_feishu
  5. from config import set_config
  6. from log import Log
# Project configuration object and environment identifier, both returned by set_config().
config_, env = set_config()
# Module-wide logger instance.
log_ = Log()
# 'data' / 'rule' key segments of the existing recall keys that are read as merge input.
initial_param = {'data': 'data1', 'rule': 'rule4'}
# 'data' / 'rule' key segments (taken from config) of the keys the merged lists are written to.
new_param = config_.LHK_RULE_PARAMS
# Redis helper shared by the functions in this module.
redis_helper = RedisHelper()
  12. def get_religion_videos(now_date):
  13. """获取宗教视频列表"""
  14. religion_key_name = f"{config_.KEY_NAME_PREFIX_RELIGION_VIDEOS}{datetime.datetime.strftime(now_date, '%Y%m%d')}"
  15. if not redis_helper.key_exists(religion_key_name):
  16. redis_dt = datetime.datetime.strftime((now_date - datetime.timedelta(days=1)), '%Y%m%d')
  17. religion_key_name = f"{config_.KEY_NAME_PREFIX_RELIGION_VIDEOS}{redis_dt}"
  18. religion_videos = redis_helper.get_all_data_from_zset(key_name=religion_key_name, desc=True, with_scores=True)
  19. if religion_videos is None:
  20. return []
  21. return religion_videos
  22. def merge_process(initial_key_name, new_key_name, now_videos, religion_video_id_list):
  23. initial_data = redis_helper.get_all_data_from_zset(key_name=initial_key_name, with_scores=True)
  24. if initial_data is None or len(initial_data) == 0:
  25. return now_videos, religion_video_id_list
  26. initial_video_ids = [int(video_id) for video_id, _ in initial_data]
  27. initial_video_ids = [video_id for video_id in initial_video_ids if video_id not in now_videos]
  28. religion_video_id_list = [video_id for video_id in religion_video_id_list if video_id not in initial_video_ids]
  29. if len(religion_video_id_list) == 0:
  30. new_video_ids = initial_video_ids
  31. else:
  32. new_video_ids = []
  33. for i, video_id in enumerate(initial_video_ids):
  34. new_video_ids.append(video_id)
  35. now_videos.append(video_id)
  36. if i % 2 == 1 and len(religion_video_id_list) > 0:
  37. new_video_ids.append(religion_video_id_list[0])
  38. now_videos.append(religion_video_id_list[0])
  39. religion_video_id_list = religion_video_id_list[1:]
  40. # 按照排序给定分数
  41. new_result = {}
  42. step = 100 / (len(new_video_ids) * 2)
  43. for i, video_id in enumerate(new_video_ids):
  44. score = 100 - i * step
  45. new_result[int(video_id)] = score
  46. # 写入新的key中
  47. redis_helper.add_data_with_zset(key_name=new_key_name, data=new_result, expire_time=2 * 24 * 3600)
  48. return now_videos, religion_video_id_list
  49. def merge_with_region(now_date, now_h, region, religion_video_id_list):
  50. initial_data_key = initial_param.get('data')
  51. initial_rule_key = initial_param.get('rule')
  52. new_data_key = new_param.get('data')
  53. new_rule_key = new_param.get('rule')
  54. now_videos = []
  55. # 地域小时级数据合并
  56. region_h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{initial_data_key}:{initial_rule_key}:" \
  57. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  58. new_region_h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H}{region}:{new_data_key}:{new_rule_key}:" \
  59. f"{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  60. now_videos, religion_video_id_list = merge_process(initial_key_name=region_h_key_name,
  61. new_key_name=new_region_h_key_name,
  62. now_videos=now_videos,
  63. religion_video_id_list=religion_video_id_list)
  64. # 地域24h数据合并
  65. region_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}:{initial_data_key}:" \
  66. f"{initial_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  67. new_region_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H}{region}:{new_data_key}:" \
  68. f"{new_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  69. now_videos, religion_video_id_list = merge_process(initial_key_name=region_24h_key_name,
  70. new_key_name=new_region_24h_key_name,
  71. now_videos=now_videos,
  72. religion_video_id_list=religion_video_id_list)
  73. # 24h筛选数据合并
  74. dup2_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}:{initial_data_key}:" \
  75. f"{initial_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  76. new_dup2_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H}{region}:{new_data_key}:" \
  77. f"{new_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  78. now_videos, religion_video_id_list = merge_process(initial_key_name=dup2_24h_key_name,
  79. new_key_name=new_dup2_24h_key_name,
  80. now_videos=now_videos,
  81. religion_video_id_list=religion_video_id_list)
  82. # 24h筛选后剩余数据合并
  83. dup3_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}:{initial_data_key}:" \
  84. f"{initial_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  85. new_dup3_24h_key_name = f"{config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H}{region}:{new_data_key}:" \
  86. f"{new_rule_key}:{datetime.datetime.strftime(now_date, '%Y%m%d')}:{now_h}"
  87. now_videos, religion_video_id_list = merge_process(initial_key_name=dup3_24h_key_name,
  88. new_key_name=new_dup3_24h_key_name,
  89. now_videos=now_videos,
  90. religion_video_id_list=religion_video_id_list)
  91. log_.info(f"region = {region} update end!")
  92. def merge_videos(now_date, now_h):
  93. """将宗教视频插入到默认视频列表中"""
  94. # 获取宗教视频列表
  95. religion_videos = get_religion_videos(now_date=now_date)
  96. religion_video_id_list = [int(video_id) for video_id, _ in religion_videos]
  97. # 列表合并
  98. region_code_list = [code for region, code in config_.REGION_CODE.items()]
  99. task_list = [
  100. gevent.spawn(merge_with_region, now_date, now_h, region, religion_video_id_list)
  101. for region in region_code_list
  102. ]
  103. gevent.joinall(task_list)
  104. # 特殊城市视频数据准备
  105. for region, city_list in config_.REGION_CITY_MAPPING.items():
  106. t = [
  107. gevent.spawn(
  108. merge_with_region,
  109. now_date, now_h, city_code, religion_video_id_list
  110. )
  111. for city_code in city_list
  112. ]
  113. gevent.joinall(t)
  114. if __name__ == '__main__':
  115. log_.info(f"laohaokan recommend data update start...")
  116. now_date = datetime.datetime.today()
  117. now_h = datetime.datetime.now().hour
  118. log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
  119. merge_videos(now_date, now_h)
  120. log_.info(f"laohaokan recommend data update end!")
  121. send_msg_to_feishu(
  122. webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
  123. key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
  124. msg_text=f"rov-offline{config_.ENV_TEXT} - 老好看推荐视频数据更新完成\n"
  125. f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}\n"
  126. f"now_h: {now_h}\n"
  127. f"finished time: {datetime.datetime.strftime(datetime.datetime.now(), '%Y%m%d %H:%M:%S')}"
  128. )