123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- import datetime
- import traceback
- import gevent
- from db_helper import RedisHelper
- from my_utils import send_msg_to_feishu
- from my_config import set_config
- from log import Log
# Load the project configuration; `env` is returned alongside it but is not
# used anywhere in the visible part of this file.
config_, env = set_config()
# Project logger shared by every function below.
log_ = Log()
# Earlier parameter wiring, kept commented out for reference:
# initial_param = {'data': 'data1', 'rule': 'rule4'}
# new_param = config_.LHK_RULE_PARAMS
# Redis access wrapper shared by every function below.
redis_helper = RedisHelper()
def get_religion_videos(now_date, religion_name):
    """Fetch the religion video zset for ``now_date``, falling back one day.

    The Redis key is ``<key_name_prefix><YYYYMMDD>``, where the prefix comes
    from ``config_.RELIGION_VIDEOS[religion_name]``. If today's key does not
    exist, yesterday's key is read instead (without checking that it exists).

    :param now_date: datetime used to build the dated key name.
    :param religion_name: key into ``config_.RELIGION_VIDEOS``.
    :return: whatever ``get_all_data_from_zset`` returns for the chosen key
        (requested descending and with scores), or ``[]`` when it returns None.
    """
    prefix = config_.RELIGION_VIDEOS[religion_name].get('key_name_prefix')
    key_name = f"{prefix}{now_date.strftime('%Y%m%d')}"
    if not redis_helper.key_exists(key_name):
        # Today's list has not been produced yet; use the previous day's key.
        previous_day = now_date - datetime.timedelta(days=1)
        key_name = f"{prefix}{previous_day.strftime('%Y%m%d')}"
    videos = redis_helper.get_all_data_from_zset(key_name=key_name, desc=True, with_scores=True)
    return [] if videos is None else videos
def merge_process(initial_key_name, new_key_name, now_videos, religion_video_id_list, rank_count):
    """Interleave religion videos into one source zset and store the result.

    Steps:
      1. Read the source zset ``initial_key_name``; if it is missing or empty,
         return the inputs unchanged.
      2. Drop source ids that are already in ``now_videos``.
      3. Drop religion ids that also appear in the remaining source ids.
      4. If religion ids remain, walk the source ids in order and insert the
         next religion id after every source id whose index satisfies
         ``index % rank_count == 1``. Otherwise the source ids are used as is.
      5. Assign linearly decreasing scores (100 down to just above 50) by
         position and write them to ``new_key_name`` with a two-day expiry.

    :param initial_key_name: Redis key of the source zset.
    :param new_key_name: Redis key the merged zset is written to.
    :param now_videos: list of ids already emitted by earlier stages. It is
        extended in place, but only in the interleaving branch (step 4).
    :param religion_video_id_list: religion ids still waiting to be inserted.
        This list is never mutated; a new list is returned.
    :param rank_count: modulus that controls the insertion positions.
    :return: ``(now_videos, remaining_religion_video_id_list)``.
    """
    initial_data = redis_helper.get_all_data_from_zset(key_name=initial_key_name, with_scores=True)
    if not initial_data:
        return now_videos, religion_video_id_list

    # Snapshot as a set: membership tests are O(1) instead of scanning a list.
    already_emitted = set(now_videos)
    initial_video_ids = [
        video_id
        for video_id in (int(raw_id) for raw_id, _ in initial_data)
        if video_id not in already_emitted
    ]
    source_id_set = set(initial_video_ids)
    religion_video_id_list = [
        video_id for video_id in religion_video_id_list if video_id not in source_id_set
    ]

    if not religion_video_id_list:
        new_video_ids = initial_video_ids
    else:
        new_video_ids = []
        cursor = 0  # index of the next religion id to insert
        for i, video_id in enumerate(initial_video_ids):
            new_video_ids.append(video_id)
            now_videos.append(video_id)
            if i % rank_count == 1 and cursor < len(religion_video_id_list):
                religion_id = religion_video_id_list[cursor]
                cursor += 1
                new_video_ids.append(religion_id)
                now_videos.append(religion_id)
        # Keep only the religion ids that were not consumed above.
        religion_video_id_list = religion_video_id_list[cursor:]

    if not new_video_ids:
        # Every source id was already emitted by an earlier stage. There is
        # nothing to score or write, and the step computation below would
        # divide by zero.
        return now_videos, religion_video_id_list

    # Assign scores by position so the zset preserves the merged order.
    step = 100 / (len(new_video_ids) * 2)
    new_result = {int(video_id): 100 - i * step for i, video_id in enumerate(new_video_ids)}
    # Write the merged ranking to the new key.
    redis_helper.add_data_with_zset(key_name=new_key_name, data=new_result, expire_time=2 * 24 * 3600)
    return now_videos, religion_video_id_list
def merge_with_region(now_date, now_h, region, religion_video_id_list,
                      initial_param, lhk_data_key, lhk_rule_key, rank_count):
    """Merge religion videos into the four recall lists of one region.

    For each recall key prefix, in order, the source zset
    ``<prefix><region>:<initial data>:<initial rule>:<YYYYMMDD>:<hour>`` is
    merged by ``merge_process`` into
    ``<prefix><region>:<lhk data>:<lhk rule>:<YYYYMMDD>:<hour>``.
    The list of already-emitted ids and the remaining religion ids are
    carried from one stage to the next.

    :param now_date: datetime providing the date part of the key names.
    :param now_h: hour value appended to the key names.
    :param region: region (or city) code embedded in the key names.
    :param religion_video_id_list: religion ids available for insertion.
    :param initial_param: mapping with ``'data'`` and ``'rule'`` entries that
        identify the source lists.
    :param lhk_data_key: data key of the destination lists.
    :param lhk_rule_key: rule key of the destination lists.
    :param rank_count: forwarded to ``merge_process``.
    :return: None.
    """
    source_data_key = initial_param.get('data')
    source_rule_key = initial_param.get('rule')
    day = now_date.strftime('%Y%m%d')
    source_suffix = f"{region}:{source_data_key}:{source_rule_key}:{day}:{now_h}"
    target_suffix = f"{region}:{lhk_data_key}:{lhk_rule_key}:{day}:{now_h}"

    emitted_videos = []
    remaining_religion_ids = religion_video_id_list

    def run_stage(key_prefix):
        # One merge stage; updates the state shared across stages.
        nonlocal emitted_videos, remaining_religion_ids
        emitted_videos, remaining_religion_ids = merge_process(
            initial_key_name=f"{key_prefix}{source_suffix}",
            new_key_name=f"{key_prefix}{target_suffix}",
            now_videos=emitted_videos,
            religion_video_id_list=remaining_religion_ids,
            rank_count=rank_count,
        )

    # Region hourly data.
    run_stage(config_.RECALL_KEY_NAME_PREFIX_REGION_BY_H)
    # Region 24h data.
    run_stage(config_.RECALL_KEY_NAME_PREFIX_DUP1_REGION_24H_H)
    # 24h filtered data.
    run_stage(config_.RECALL_KEY_NAME_PREFIX_DUP2_REGION_24H_H)
    # Data remaining after the 24h filter.
    run_stage(config_.RECALL_KEY_NAME_PREFIX_DUP3_REGION_24H_H)

    log_.info(f"region = {region} update end!")
def merge_videos(now_date, now_h, param, rule_params):
    """Insert religion videos into the default recall lists of every region.

    Reads ``religion_name``, ``initial_param`` and ``rank_count`` from
    ``rule_params[param['rule']]``, loads the religion video ids, then runs
    ``merge_with_region`` in one greenlet per region code from
    ``config_.REGION_CODE``. It then does the same for each city list in
    ``config_.REGION_CITY_MAPPING``, one batch per list.

    A greenlet that raises does not stop the other regions. Its exception is
    written to the project log instead of being dropped, and is not re-raised.

    :param now_date: datetime forwarded to the key builders.
    :param now_h: hour value forwarded to the key builders.
    :param param: mapping with ``'data'`` and ``'rule'`` entries naming the
        destination lists.
    :param rule_params: mapping from rule key to that rule's settings.
    :return: None.
    """
    lhk_data_key = param.get('data')
    lhk_rule_key = param.get('rule')
    rule_conf = rule_params.get(lhk_rule_key)
    religion_name = rule_conf.get('religion_name')
    initial_param = rule_conf.get('initial_param')
    rank_count = rule_conf.get('rank_count')

    # Load the religion video list.
    religion_videos = get_religion_videos(now_date=now_date, religion_name=religion_name)
    religion_video_id_list = [int(video_id) for video_id, _ in religion_videos]

    def run_batch(codes):
        # Spawn one greenlet per code, wait for all of them, log failures.
        codes = list(codes)
        tasks = [
            gevent.spawn(
                merge_with_region,
                now_date, now_h, code, religion_video_id_list,
                initial_param, lhk_data_key, lhk_rule_key, rank_count
            )
            for code in codes
        ]
        gevent.joinall(tasks)
        for code, task in zip(codes, tasks):
            if not task.successful():
                log_.error(f"region = {code} update failed, exception: {task.exception!r}")

    # Merge the lists of every region.
    run_batch(config_.REGION_CODE.values())
    # Merge the lists of the special cities, one batch per city list.
    for city_list in config_.REGION_CITY_MAPPING.values():
        run_batch(city_list)
def main():
    """Run one update pass over every entry of ``config_.LHK_RULE_PARAMS``.

    The date and the hour used in the Redis key names come from a single
    clock read, so they cannot disagree when the job starts right on an hour
    or day boundary. Any exception is logged and reported to the Feishu
    server robot. The success notification is intentionally disabled, so
    only failures alert.
    """
    try:
        log_.info("laohaokan recommend data update start...")
        now_date = datetime.datetime.today()
        # Derive the hour from the same timestamp instead of reading the
        # clock a second time.
        now_h = now_date.hour
        log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d%H')}")
        lhk_rule_params = config_.LHK_RULE_PARAMS
        rule_params = lhk_rule_params.get('rule_params', {})
        params_list = lhk_rule_params.get('params_list', [])
        for param in params_list:
            log_.info(f"param = {param} update start...")
            merge_videos(now_date, now_h, param, rule_params)
            log_.info(f"param = {param} update end!")
        log_.info("laohaokan recommend data update end!")
    except Exception as e:
        # Format the traceback once and reuse it for the log and the alert.
        error_trace = traceback.format_exc()
        log_.error(f"老好看推荐视频数据更新失败, exception: {e}, traceback: {error_trace}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - 老好看推荐视频数据更新失败\n"
                     f"exception: {e}\n"
                     f"traceback: {error_trace}"
        )
# Run the update only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|