import datetime
import json
import traceback

from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu_new
from config import set_config
from log import Log

config_, _ = set_config()
log_ = Log()


def top_n_videos_update(dt):
    """Build the top-N video id lists for every app type.

    Reads the project/table/n_list settings from
    ``config_.no_ad_videos['topN']``, checks that partition ``dt`` has data
    and, if so, loads the ``apptype``, ``videoid`` and ``rank`` columns.

    Args:
        dt: Partition identifier passed through to ``data_check`` and
            ``get_feature_data`` (``main`` passes a ``%Y%m%d%H`` string).

    Returns:
        A dict ``{app_type: {"top<n>": [videoid, ...], ...}}`` with one
        ``top<n>`` entry per ``n`` in the configured ``n_list``; each list
        holds the video ids whose rank is ``<= n``. Empty when
        ``data_check`` does not report a positive count.
    """
    top_n_conf = config_.no_ad_videos['topN']
    project = top_n_conf.get('project')
    table = top_n_conf.get('table')
    n_list = top_n_conf.get('n_list')
    features = ['apptype', 'videoid', 'rank']

    top_n_videos = {}
    data_count = data_check(project=project, table=table, dt=dt)
    if not data_count > 0:
        # Make the "nothing to update" case visible instead of silently
        # returning an empty mapping.
        log_.info(f"topN data not ready: table={table}, dt={dt}, data_count={data_count}")
        return top_n_videos

    videos_df = get_feature_data(project=project, table=table, features=features, dt=dt)
    # Normalise both columns to integers so the rank comparison is numeric
    # and the ids serialise as JSON numbers.
    videos_df['videoid'] = videos_df['videoid'].astype(int)
    videos_df['rank'] = videos_df['rank'].astype(int)

    for app_type in set(videos_df['apptype'].tolist()):
        app_type_df = videos_df[videos_df['apptype'] == app_type]
        top_n_videos[app_type] = {
            f'top{n}': app_type_df[app_type_df['rank'] <= n]['videoid'].tolist()
            for n in n_list
        }
    return top_n_videos


def update_to_redis(no_ad_videos_mapping_list):
    """Merge the per-app-type mappings and write each one to redis.

    For every app type the inner dicts of all mappings are merged (later
    mappings overwrite earlier ones on key collisions), JSON-encoded, and
    stored under ``config_.KEY_NAME_PREFIX_NO_AD_VIDEOS + app_type`` with
    ``expire_time=24*3600`` (presumably seconds, i.e. one day; confirm
    against ``RedisHelper``).

    Args:
        no_ad_videos_mapping_list: List of dicts shaped like
            ``{app_type: {key: value, ...}}``. The input dicts are not
            modified.
    """
    merged = {}
    for mapping in no_ad_videos_mapping_list:
        for app_type, item in mapping.items():
            # Merge into a fresh dict per app type: storing ``item`` itself
            # would alias the caller's dict and mutate it on later merges.
            merged.setdefault(app_type, {}).update(item)

    # Write the merged data to redis, one key per app type.
    redis_helper = RedisHelper()
    for app_type, data in merged.items():
        key_name = f"{config_.KEY_NAME_PREFIX_NO_AD_VIDEOS}{app_type}"
        redis_helper.set_data_to_redis(
            key_name=key_name,
            value=json.dumps(data),
            expire_time=int(24 * 3600),
        )
        log_.info(f"to redis: app_type={app_type}, data={data}, key_name={key_name}")


def main():
    """Refresh the no-ad top-N video lists in redis for the current hour.

    Any exception is logged and reported to the Feishu robot configured
    under ``config_.FEISHU_ROBOT['ad_video_update_robot']``; it is not
    re-raised.
    """
    try:
        now_date = datetime.datetime.today()
        dt = now_date.strftime('%Y%m%d%H')
        log_.info(f"now_date: {dt}")

        # Top-N videos for the current hourly partition.
        top_n_videos = top_n_videos_update(dt=dt)
        log_.info(f"topN: {top_n_videos}")

        # Push the data to redis.
        no_ad_videos_mapping_list = [top_n_videos]
        update_to_redis(no_ad_videos_mapping_list=no_ad_videos_mapping_list)
        log_.info("to redis finished!")
    except Exception as e:
        # Format the traceback once and reuse it for the log and the alert.
        tb_text = traceback.format_exc()
        log_.error(f"特定视频不出广告数据更新失败, exception: {e}, traceback: {tb_text}")
        now_text = datetime.datetime.now().strftime('%Y%m%d %H:%M:%S')
        msg_list = [
            f"env: rov-offline {config_.ENV_TEXT}",
            f"now time: {now_text}",
            f"exception: {e}",
            f"traceback: {tb_text}",
        ]
        robot_conf = config_.FEISHU_ROBOT['ad_video_update_robot']
        send_msg_to_feishu_new(
            webhook=robot_conf.get('webhook'),
            key_word=robot_conf.get('key_word'),
            title='特定视频不出广告数据更新失败',
            msg_list=msg_list
        )


if __name__ == '__main__':
    main()