compose_score.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. import sys
  2. import traceback
  3. import pandas as pd
  4. from db_helper import RedisHelper
  5. from utils import send_msg_to_feishu
  6. from config import set_config
  7. from log import Log
  8. config_, _ = set_config()
  9. log_ = Log()
  10. redis_helper = RedisHelper()
  11. def cal_compose_score(score_hour_path, score_24h_path, merge_score_path):
  12. """分值合并"""
  13. score_hour_df = pd.read_csv(score_hour_path)
  14. score_24h_df = pd.read_csv(score_24h_path)
  15. # print(score_hour_df)
  16. # print(score_24h_df)
  17. score_hour_df['videoid'] = score_hour_df['videoid'].astype(int)
  18. score_24h_df['videoid'] = score_24h_df['videoid'].astype(int)
  19. score_merge_df = pd.merge(score_hour_df, score_24h_df, on='videoid', how='outer')
  20. score_merge_df.fillna(0, inplace=True)
  21. # print(score_merge_df)
  22. print(f"score_hour_df shape: {score_hour_df.shape}")
  23. print(f"score_24h_df shape: {score_24h_df.shape}")
  24. print(f"score_merge_df shape: {score_merge_df.shape}")
  25. score_merge_df['score1'] = score_merge_df['24h_score1'] + score_merge_df['hour_score1']
  26. score_merge_df['score2'] = score_merge_df['24h_score1'] + score_merge_df['hour_score2']
  27. # score_merge_df['score3'] = score_merge_df['24h_score1'] + score_merge_df['hour_score3']
  28. score_merge_df['score4'] = score_merge_df['24h_score1'] + score_merge_df['hour_score4']
  29. score_merge_df['score5'] = score_merge_df['24h_score1'] + score_merge_df['hour_score5']
  30. # print(score_merge_df)
  31. print(f"score_merge_df shape: {score_merge_df.shape}")
  32. score_merge_df.to_csv(merge_score_path, index=False)
  33. score_df = score_merge_df[['videoid', 'score1', 'score2', 'score4', 'score5']]
  34. print(f"score_df shape: {score_merge_df.shape}")
  35. return score_df
  36. def score_to_redis(score_df):
  37. redis_data = dict()
  38. rank_score_key_prefix = 'rank:'
  39. score_name_list = score_df.columns.to_list()[1:]
  40. for ind, row in score_df.iterrows():
  41. if ind % 1000 == 0:
  42. if len(redis_data) > 0:
  43. print(ind, len(redis_data))
  44. redis_helper.update_batch_set_key(data=redis_data, expire_time=24*60*60)
  45. redis_data = {}
  46. video_id = int(row['videoid'])
  47. for score_name in score_name_list:
  48. score = row[score_name]
  49. rank_score_key = f"{rank_score_key_prefix}{score_name}:{video_id}"
  50. redis_data[rank_score_key] = score
  51. # print(rank_score_key, score)
  52. # redis_helper.set_data_to_redis(key_name=rank_score_key, value=score, expire_time=24*60*60)
  53. if len(redis_data) > 0:
  54. print(len(redis_data))
  55. redis_helper.update_batch_set_key(data=redis_data, expire_time=24 * 60 * 60)
  56. if __name__ == '__main__':
  57. try:
  58. now_date = sys.argv[1]
  59. print("now date:", now_date)
  60. score_hour_path = f"./data/hour_score_{now_date}.csv"
  61. score_24h_path = f"./data/24h_score_{now_date}.csv"
  62. merge_score_path = f"./data/merge_score_{now_date}.csv"
  63. score_df = cal_compose_score(
  64. score_hour_path=score_hour_path, score_24h_path=score_24h_path, merge_score_path=merge_score_path
  65. )
  66. score_to_redis(score_df=score_df)
  67. print("rank score update finished!")
  68. except Exception as e:
  69. log_.error(f"rank 分值合并更新失败, exception: {e}, traceback: {traceback.format_exc()}")
  70. send_msg_to_feishu(
  71. webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
  72. key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
  73. msg_text=f"rov-offline{config_.ENV_TEXT} - rank 分值合并更新失败\n"
  74. f"exception: {e}\n"
  75. f"traceback: {traceback.format_exc()}"
  76. )