import sys
import traceback

import pandas as pd

from db_helper import RedisHelper
from my_utils import send_msg_to_feishu
from my_config import set_config
from log import Log

config_, _ = set_config()
log_ = Log()
redis_helper = RedisHelper()


def cal_compose_score(score_hour_path, score_24h_path, merge_score_path):
    """Merge the hourly and 24h score tables and derive the composite scores.

    Both CSV files are read, their ``videoid`` columns are cast to int, and
    the two tables are outer-joined on ``videoid``; any value missing after
    the join is replaced with 0. The composite columns ``score1``, ``score4``,
    ``score6``, ``score7`` and ``score8`` are then added, the full merged
    table is written to ``merge_score_path`` (without the index), and a frame
    holding only ``videoid`` plus the composite columns is returned.

    The merged table must contain the columns ``24h_score1``, ``24h_score2``,
    ``hour_score1``, ``hour_score4`` and ``hour_score6``; a missing column
    raises ``KeyError``.

    :param score_hour_path: path of the hourly score CSV
    :param score_24h_path: path of the 24h score CSV
    :param merge_score_path: path the merged CSV is written to
    :return: DataFrame with columns
        ``['videoid', 'score1', 'score4', 'score6', 'score7', 'score8']``
    """
    score_hour_df = pd.read_csv(score_hour_path)
    score_24h_df = pd.read_csv(score_24h_path)
    score_hour_df['videoid'] = score_hour_df['videoid'].astype(int)
    score_24h_df['videoid'] = score_24h_df['videoid'].astype(int)

    # Outer join keeps videos that appear in only one of the two tables;
    # their missing scores count as 0.
    score_merge_df = pd.merge(score_hour_df, score_24h_df, on='videoid', how='outer')
    score_merge_df.fillna(0, inplace=True)
    log_.info(f"score_hour_df shape: {score_hour_df.shape}")
    log_.info(f"score_24h_df shape: {score_24h_df.shape}")
    log_.info(f"score_merge_df shape: {score_merge_df.shape}")

    # NOTE: the score2 / score3 / score5 variants
    # (24h_score1 + hour_score2 / hour_score3 / hour_score5) are disabled.
    score_merge_df['score1'] = score_merge_df['24h_score1'] + score_merge_df['hour_score1']
    score_merge_df['score4'] = score_merge_df['24h_score1'] + score_merge_df['hour_score4']
    score_merge_df['score6'] = score_merge_df['24h_score1'] * 0.2 + score_merge_df['hour_score4'] * 0.8
    score_merge_df['score7'] = score_merge_df['24h_score2'] + score_merge_df['hour_score4']
    score_merge_df['score8'] = score_merge_df['24h_score1'] + score_merge_df['hour_score6']
    # Logged a second time because the column count has grown.
    log_.info(f"score_merge_df shape: {score_merge_df.shape}")
    score_merge_df.to_csv(merge_score_path, index=False)

    score_df = score_merge_df[['videoid', 'score1', 'score4', 'score6', 'score7', 'score8']]
    # Report the shape of the frame actually returned (previously this line
    # logged score_merge_df.shape under the "score_df" label).
    log_.info(f"score_df shape: {score_df.shape}")
    return score_df


def score_to_redis(score_df):
    """Write every score in ``score_df`` to redis in batches.

    The first column of ``score_df`` is skipped when collecting score names
    (the row's ``videoid`` value is read by name); every remaining column is
    treated as a score. Each value is stored under the key
    ``rank:{score_name}:{video_id}``. Keys are accumulated in a dict and
    handed to ``redis_helper.update_batch_set_key`` every 1000 rows, with a
    final call for the remainder.

    :param score_df: DataFrame with a ``videoid`` column followed by score columns
    """
    rank_score_key_prefix = 'rank:'
    expire_time = 24 * 60 * 60  # presumably seconds (one day) -- confirm against RedisHelper
    batch_rows = 1000
    score_name_list = score_df.columns.to_list()[1:]

    redis_data = {}
    # Count rows positionally instead of trusting the index label, so the
    # batching also works when score_df does not carry a default 0..n-1 index.
    for position, (_index, row) in enumerate(score_df.iterrows()):
        if position % batch_rows == 0 and redis_data:
            print(position, len(redis_data))
            redis_helper.update_batch_set_key(data=redis_data, expire_time=expire_time)
            redis_data = {}
        # iterrows() yields each row as a single-dtype Series, so the id may
        # come back as a float; cast it to int for the key.
        video_id = int(row['videoid'])
        for score_name in score_name_list:
            rank_score_key = f"{rank_score_key_prefix}{score_name}:{video_id}"
            redis_data[rank_score_key] = row[score_name]

    # Flush whatever is left after the last full batch.
    if redis_data:
        print(len(redis_data))
        redis_helper.update_batch_set_key(data=redis_data, expire_time=expire_time)


if __name__ == '__main__':
    try:
        # The date string passed as the first CLI argument selects the input files.
        now_date = sys.argv[1]
        log_.info(f"now date: {now_date}")
        score_hour_path = f"./data/hour_score_{now_date}.csv"
        score_24h_path = f"./data/24h_score_{now_date}.csv"
        merge_score_path = f"./data/merge_score_{now_date}.csv"
        score_df = cal_compose_score(
            score_hour_path=score_hour_path,
            score_24h_path=score_24h_path,
            merge_score_path=merge_score_path
        )
        score_to_redis(score_df=score_df)
        log_.info("rank score update finished!")
    except Exception as e:
        # Top-level boundary: log the failure and alert through Feishu.
        # The traceback is formatted once and reused in both messages.
        traceback_text = traceback.format_exc()
        log_.error(f"rank 分值合并更新失败, exception: {e}, traceback: {traceback_text}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - rank 分值合并更新失败\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback_text}"
        )