ad_arpu_update.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import datetime
  2. import json
  3. import time
  4. import traceback
  5. from threading import Timer
  6. from my_utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
  7. from my_config import set_config
  8. from log import Log
  9. config_, _ = set_config()
  10. log_ = Log()
  11. redis_helper = RedisHelper()
  12. features = [
  13. 'dt',
  14. 'dau',
  15. 'ad_own_view', # 自营广告曝光次数
  16. 'ad_vx_view', # 微信广告曝光次数
  17. ]
  18. def update_ad_arpu(project, table, dt):
  19. """更新上一周期的arpu值到redis中"""
  20. user_group_initial_df = get_feature_data(project=project, table=table, features=features, dt=dt)
  21. user_group_initial_df['dau'].fillna(0, inplace=True)
  22. user_group_initial_df['ad_own_view'].fillna(0, inplace=True)
  23. user_group_initial_df['ad_vx_view'].fillna(0, inplace=True)
  24. user_group_initial_df['dau'] = user_group_initial_df['dau'].astype(int)
  25. user_group_initial_df['ad_own_view'] = user_group_initial_df['ad_own_view'].astype(int)
  26. user_group_initial_df['ad_vx_view'] = user_group_initial_df['ad_vx_view'].astype(int)
  27. dau = user_group_initial_df['dau'][0]
  28. if dau == 0:
  29. log_.info(f"数据异常,dau = {dau}")
  30. return
  31. ad_own_view = user_group_initial_df['ad_own_view'][0]
  32. ad_vx_view = user_group_initial_df['ad_vx_view'][0]
  33. ecpm = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_AD_ECPM)
  34. if ecpm is None:
  35. return
  36. ecpm = json.loads(ecpm)
  37. own_ecpm = ecpm.get('own', 0)
  38. vx_ecpm = ecpm.get('weixin', 0)
  39. if own_ecpm == 0 and vx_ecpm == 0:
  40. return
  41. # 计算上一周期广告收入
  42. income = ad_own_view * float(own_ecpm) / 1000 + ad_vx_view * float(vx_ecpm) / 1000
  43. # 计算上一周期arpu
  44. arpu = income / dau
  45. arpu = round(arpu, 4)
  46. # 写入redis
  47. if arpu >= 0:
  48. redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_AD_ARPU, value=arpu)
  49. redis_helper.persist_key(key_name=config_.KEY_NAME_AD_ARPU)
  50. data = {
  51. 'dau': dau,
  52. 'ad_own_view': ad_own_view,
  53. 'ad_vx_view': ad_vx_view,
  54. 'ecpm': ecpm,
  55. 'own_ecpm': own_ecpm,
  56. 'vx_ecpm': vx_ecpm,
  57. 'income': income,
  58. 'arpu': arpu
  59. }
  60. log_.info(f"data = {data}")
  61. log_.info(f"update arpu finished!")
  62. def timer_check(dt):
  63. try:
  64. project = 'loghubods'
  65. # table = 'dau_ad_view_hour'
  66. table = 'dau_ad_view_per_5min'
  67. now_min = datetime.datetime.now().minute
  68. log_.info(f"now_min: {now_min}")
  69. # 查看当前更新的数据是否已准备好
  70. data_count = data_check(project=project, table=table, dt=dt)
  71. if data_count > 0:
  72. # 数据准备好,进行更新
  73. update_ad_arpu(project=project, table=table, dt=dt)
  74. log_.info(f"ad arpu update end!")
  75. elif now_min % 5 > 1:
  76. log_.info('数据未准备好!')
  77. return
  78. else:
  79. # 数据没准备好,1分钟后重新检查
  80. Timer(60, timer_check, (dt,)).start()
  81. except Exception as e:
  82. log_.error(f"新策略 -- arpu值更新失败, exception: {e}, traceback: {traceback.format_exc()}")
  83. send_msg_to_feishu(
  84. webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
  85. key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
  86. msg_text=f"rov-offline{config_.ENV_TEXT} - 新策略 -- arpu值更新失败\n"
  87. f"exception: {e}\n"
  88. f"traceback: {traceback.format_exc()}"
  89. )
  90. if __name__ == '__main__':
  91. now_date = datetime.datetime.today()
  92. # dt_min = datetime.datetime.now().minute
  93. # log_.info(f"dt_min: {dt_min}")
  94. dt = datetime.datetime.strftime(now_date - datetime.timedelta(minutes=5), '%Y%m%d%H%M')
  95. dt = f"{dt}00"
  96. log_.info(f"now_date: {dt}")
  97. timer_check(dt=dt)