123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- import datetime
- import json
- import time
- import traceback
- from threading import Timer
- from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
- from config import set_config
- from log import Log
# Module-level handles shared by the functions below: project configuration,
# logger and redis client.
config_, _ = set_config()
log_ = Log()
redis_helper = RedisHelper()

# Columns requested from the feature table.
features = [
    "dt",
    "dau",
    "ad_own_view",  # number of impressions of self-operated ads
    "ad_vx_view",  # number of impressions of WeChat ads
]
def update_ad_arpu(project, table, dt):
    """Compute the previous period's ad ARPU and write it to redis.

    Reads `dau`, `ad_own_view` and `ad_vx_view` for partition `dt` via
    `get_feature_data`, reads the eCPM values (JSON with the keys `own` and
    `weixin`) from redis, and derives::

        income = ad_own_view * own_ecpm / 1000 + ad_vx_view * vx_ecpm / 1000
        arpu   = round(income / dau, 4)

    The update is skipped (with a log line) when no rows are returned, when
    `dau` is 0, when the eCPM key is missing from redis, or when both eCPM
    values are 0.

    :param project: project name forwarded to `get_feature_data`.
    :param table: table name forwarded to `get_feature_data`.
    :param dt: partition value forwarded to `get_feature_data`.
    :return: None
    """
    # NOTE(review): the result is presumably a pandas DataFrame (it is used
    # with column indexing, fillna and astype) - confirm against utils.
    user_group_initial_df = get_feature_data(project=project, table=table, features=features, dt=dt)
    if len(user_group_initial_df) == 0:
        # Nothing to compute from; indexing the first row below would raise.
        log_.info(f"no feature rows returned for dt = {dt}, skip arpu update")
        return

    # Assign the cleaned column back instead of calling
    # `df[col].fillna(0, inplace=True)`: the in-place form operates on a
    # temporary selection and is not guaranteed to modify the frame itself.
    for column in ('dau', 'ad_own_view', 'ad_vx_view'):
        user_group_initial_df[column] = user_group_initial_df[column].fillna(0).astype(int)

    # Positional access: take the first row regardless of the index labels.
    dau = user_group_initial_df['dau'].iloc[0]
    if dau == 0:
        log_.info(f"数据异常,dau = {dau}")
        return
    ad_own_view = user_group_initial_df['ad_own_view'].iloc[0]
    ad_vx_view = user_group_initial_df['ad_vx_view'].iloc[0]

    ecpm = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_AD_ECPM)
    if ecpm is None:
        log_.info("ecpm not found in redis, skip arpu update")
        return
    ecpm = json.loads(ecpm)
    own_ecpm = ecpm.get('own', 0)
    vx_ecpm = ecpm.get('weixin', 0)
    if own_ecpm == 0 and vx_ecpm == 0:
        log_.info(f"own_ecpm and vx_ecpm are both 0, skip arpu update, ecpm = {ecpm}")
        return

    # Ad income of the previous period; each view count is scaled by its
    # eCPM divided by 1000.
    income = ad_own_view * float(own_ecpm) / 1000 + ad_vx_view * float(vx_ecpm) / 1000
    # ARPU of the previous period, kept to 4 decimal places.
    arpu = round(income / dau, 4)

    # Write to redis and drop any expiry on the key.
    if arpu >= 0:
        redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_AD_ARPU, value=arpu)
        redis_helper.persist_key(key_name=config_.KEY_NAME_AD_ARPU)

    data = {
        'dau': dau,
        'ad_own_view': ad_own_view,
        'ad_vx_view': ad_vx_view,
        'ecpm': ecpm,
        'own_ecpm': own_ecpm,
        'vx_ecpm': vx_ecpm,
        'income': income,
        'arpu': arpu,
    }
    log_.info(f"data = {data}")
    log_.info("update arpu finished!")
def timer_check(dt):
    """Run the ARPU update for `dt` once its source data is available.

    Asks `data_check` how many rows exist for `dt` in the 5-minute table:

    * more than 0 rows: run `update_ad_arpu`;
    * no rows and the current minute modulo 5 is greater than 1: log that
      the data is not ready and stop;
    * otherwise: schedule another `timer_check` call in 60 seconds.

    Any exception is logged and reported through `send_msg_to_feishu`.

    :param dt: partition value passed to `data_check` and `update_ad_arpu`.
    :return: None
    """
    project = 'loghubods'
    table = 'dau_ad_view_per_5min'
    try:
        minute_now = datetime.datetime.now().minute
        log_.info(f"now_min: {minute_now}")

        # Check whether the data for this run is already available.
        ready_rows = data_check(project=project, table=table, dt=dt)
        if ready_rows > 0:
            # Data is ready: perform the update.
            update_ad_arpu(project=project, table=table, dt=dt)
            log_.info("ad arpu update end!")
            return

        if minute_now % 5 > 1:
            log_.info('数据未准备好!')
            return

        # Data is not ready yet: check again in one minute.
        Timer(60, timer_check, (dt,)).start()
    except Exception as e:
        trace_text = traceback.format_exc()
        log_.error(f"新策略 -- arpu值更新失败, exception: {e}, traceback: {trace_text}")
        robot = config_.FEISHU_ROBOT['server_robot']
        send_msg_to_feishu(
            webhook=robot.get('webhook'),
            key_word=robot.get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - 新策略 -- arpu值更新失败\n"
                     f"exception: {e}\n"
                     f"traceback: {trace_text}"
        )
if __name__ == '__main__':
    # Build the partition value for five minutes ago: minute precision
    # (YYYYmmddHHMM) followed by a literal "00".
    now_date = datetime.datetime.today()
    five_minutes = datetime.timedelta(minutes=5)
    dt = (now_date - five_minutes).strftime('%Y%m%d%H%M') + "00"
    log_.info(f"now_date: {dt}")
    timer_check(dt=dt)
|