ad_arpu_update.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. import datetime
  2. import json
  3. import traceback
  4. from threading import Timer
  5. from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
  6. from config import set_config
  7. from log import Log
  8. config_, _ = set_config()
  9. log_ = Log()
  10. redis_helper = RedisHelper()
  11. features = [
  12. 'dt',
  13. 'dau',
  14. 'ad_own_view', # 自营广告曝光次数
  15. 'ad_vx_view', # 微信广告曝光次数
  16. ]
  17. def update_ad_arpu(project, table, dt):
  18. """更新上一周期的arpu值到redis中"""
  19. user_group_initial_df = get_feature_data(project=project, table=table, features=features, dt=dt)
  20. user_group_initial_df['dau'].fillna(0, inplace=True)
  21. user_group_initial_df['ad_own_view'].fillna(0, inplace=True)
  22. user_group_initial_df['ad_vx_view'].fillna(0, inplace=True)
  23. user_group_initial_df['dau'] = user_group_initial_df['dau'].astype(int)
  24. user_group_initial_df['ad_own_view'] = user_group_initial_df['ad_own_view'].astype(int)
  25. user_group_initial_df['ad_vx_view'] = user_group_initial_df['ad_vx_view'].astype(int)
  26. dau = user_group_initial_df['dau'][0]
  27. if dau == 0:
  28. log_.info(f"数据异常,dau = {dau}")
  29. return
  30. ad_own_view = user_group_initial_df['ad_own_view'][0]
  31. ad_vx_view = user_group_initial_df['ad_vx_view'][0]
  32. ecpm = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_AD_ECPM)
  33. if ecpm is None:
  34. return
  35. ecpm = json.loads(ecpm)
  36. own_ecpm = ecpm.get('own', 0)
  37. vx_ecpm = ecpm.get('weixin', 0)
  38. if own_ecpm == 0 and vx_ecpm == 0:
  39. return
  40. # 计算上一周期广告收入
  41. income = ad_own_view * float(own_ecpm) / 1000 + ad_vx_view * float(vx_ecpm) / 1000
  42. # 计算上一周期arpu
  43. arpu = income / dau
  44. arpu = round(arpu, 4)
  45. # 写入redis
  46. if arpu >= 0:
  47. redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_AD_ARPU, value=arpu)
  48. redis_helper.persist_key(key_name=config_.KEY_NAME_AD_ARPU)
  49. data = {
  50. 'dau': dau,
  51. 'ad_own_view': ad_own_view,
  52. 'ad_vx_view': ad_vx_view,
  53. 'ecpm': ecpm,
  54. 'own_ecpm': own_ecpm,
  55. 'vx_ecpm': vx_ecpm,
  56. 'income': income,
  57. 'arpu': arpu
  58. }
  59. log_.info(f"data = {data}")
  60. log_.info(f"update arpu finished!")
  61. def timer_check():
  62. try:
  63. project = 'loghubods'
  64. table = 'dau_ad_view_hour'
  65. now_date = datetime.datetime.today()
  66. now_min = datetime.datetime.now().minute
  67. dt = datetime.datetime.strftime(now_date - datetime.timedelta(hours=1), '%Y%m%d%H')
  68. log_.info(f"now_date: {dt}")
  69. # 查看当前更新的数据是否已准备好
  70. data_count = data_check(project=project, table=table, dt=dt)
  71. if data_count > 0:
  72. # 数据准备好,进行更新
  73. update_ad_arpu(project=project, table=table, dt=dt)
  74. log_.info(f"ad arpu update end!")
  75. elif now_min > 30:
  76. log_.info('数据未准备好!')
  77. return
  78. else:
  79. # 数据没准备好,1分钟后重新检查
  80. Timer(60, timer_check).start()
  81. except Exception as e:
  82. log_.error(f"新策略 -- arpu值更新失败, exception: {e}, traceback: {traceback.format_exc()}")
  83. send_msg_to_feishu(
  84. webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
  85. key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
  86. msg_text=f"rov-offline{config_.ENV_TEXT} - 新策略 -- arpu值更新失败\n"
  87. f"exception: {e}\n"
  88. f"traceback: {traceback.format_exc()}"
  89. )
  90. if __name__ == '__main__':
  91. timer_check()