1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- import datetime
- import json
- import traceback
- from my_utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu_new
- from my_config import set_config
- from log import Log
# Project configuration object; set_config() returns a pair and the second
# element is discarded here via `_`.
config_, _ = set_config()
# Module-level logger shared by all functions in this script.
log_ = Log()
def top_n_videos_update(dt):
    """Build the top-N video lists for every app type found in the topN table.

    Args:
        dt: Partition value forwarded to ``data_check`` and
            ``get_feature_data`` (``main`` passes a ``%Y%m%d%H`` string).

    Returns:
        A dict mapping each ``apptype`` value to ``{'top<n>': [videoid, ...]}``
        with one entry per ``n`` in the configured ``n_list``. An empty dict
        is returned when ``data_check`` does not report a positive row count.
    """
    # Source table location and the list of N cut-offs come from configuration.
    top_n_conf = config_.no_ad_videos['topN']
    project = top_n_conf.get('project')
    table = top_n_conf.get('table')
    n_list = top_n_conf.get('n_list')
    feature_names = ['apptype', 'videoid', 'rank']

    result = {}
    row_count = data_check(project=project, table=table, dt=dt)
    if not (row_count > 0):
        # Nothing available for this partition.
        return result

    df = get_feature_data(project=project, table=table, features=feature_names, dt=dt)
    # Both columns are cast to int so the rank comparison and the resulting
    # id lists are numeric.
    for column in ('videoid', 'rank'):
        df[column] = df[column].astype(int)

    for app_type in set(df['apptype'].tolist()):
        rows = df[df['apptype'] == app_type]
        # For each cut-off n keep the videos whose rank is within the first n.
        result[app_type] = {
            f'top{n}': rows[rows['rank'] <= n]['videoid'].tolist()
            for n in n_list
        }
    return result
def update_to_redis(no_ad_videos_mapping_list):
    """Merge the per-app-type mappings and synchronise them to Redis.

    Args:
        no_ad_videos_mapping_list: Iterable of dicts shaped like
            ``{app_type: {name: value, ...}}``. Entries for the same app type
            are merged; when the same inner key appears more than once, the
            mapping that comes later in the list wins.

    Side effects:
        * Writes one JSON value per app type under
          ``config_.KEY_NAME_PREFIX_NO_AD_VIDEOS + app_type`` with
          ``expire_time=24 * 3600`` (presumably seconds, i.e. 24 hours —
          confirm against ``RedisHelper.set_data_to_redis``).
        * Deletes the key of every app type listed in ``config_.APP_TYPE``
          that received no data in this run.
    """
    # Merge into fresh dicts so the caller's mappings are never mutated
    # (storing `item` by reference would let a later merge write into it).
    merged = {}
    for mapping in no_ad_videos_mapping_list:
        for app_type, item in mapping.items():
            merged.setdefault(app_type, {}).update(item)

    # Write the merged data to Redis.
    redis_helper = RedisHelper()
    updated_app_types = set()
    for app_type, data in merged.items():
        key_name = f"{config_.KEY_NAME_PREFIX_NO_AD_VIDEOS}{app_type}"
        redis_helper.set_data_to_redis(key_name=key_name, value=json.dumps(data), expire_time=24 * 3600)
        log_.info(f"to redis: app_type={app_type}, data={data}, key_name={key_name}")
        # Normalised to int so it can be compared with config_.APP_TYPE values
        # (assumed to be ints — TODO confirm).
        updated_app_types.add(int(app_type))

    # Clear the stored data of app types that had no qualifying data this run.
    for app_type in config_.APP_TYPE.values():
        if app_type not in updated_app_types:
            key_name = f"{config_.KEY_NAME_PREFIX_NO_AD_VIDEOS}{app_type}"
            redis_helper.del_keys(key_name=key_name)
            log_.info(f"del from redis: app_type={app_type}, key_name={key_name}")
def main():
    """Refresh the current hour's top-N no-ad video data and push it to Redis.

    Any exception raised along the way is logged and reported through the
    Feishu robot configured under ``ad_video_update_robot``; it is not
    re-raised.
    """
    try:
        dt = datetime.datetime.today().strftime('%Y%m%d%H')
        log_.info(f"now_date: {dt}")
        # Top-N videos by same-hour shares / same-hour return traffic.
        top_n_videos = top_n_videos_update(dt=dt)
        log_.info(f"topN: {top_n_videos}")
        # Push the collected mappings to Redis.
        update_to_redis(no_ad_videos_mapping_list=[top_n_videos])
        log_.info("to redis finished!")
    except Exception as e:
        # Capture the traceback once and reuse it for the log and the alert.
        tb_text = traceback.format_exc()
        log_.error(f"特定视频不出广告数据更新失败, exception: {e}, traceback: {tb_text}")
        robot_conf = config_.FEISHU_ROBOT['ad_video_update_robot']
        alert_time = datetime.datetime.now().strftime('%Y%m%d %H:%M:%S')
        send_msg_to_feishu_new(
            webhook=robot_conf.get('webhook'),
            key_word=robot_conf.get('key_word'),
            title='特定视频不出广告数据更新失败',
            msg_list=[
                f"env: rov-offline {config_.ENV_TEXT}",
                f"now time: {alert_time}",
                f"exception: {e}",
                f"traceback: {tb_text}",
            ],
        )


# Run the update only when this file is executed as a script.
if __name__ == '__main__':
    main()
|