import datetime
import json
import time
import traceback
from threading import Timer

from my_utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
from my_config import set_config
from log import Log

config_, _ = set_config()
log_ = Log()
redis_helper = RedisHelper()

features = [
    'dt',
    'dau',
    'ad_own_view',  # self-operated ad impression count
    'ad_vx_view',  # WeChat ad impression count
]


def update_ad_arpu(project, table, dt):
    """Compute the previous period's ad ARPU and store it in redis.

    Reads the ``features`` columns for partition ``dt`` via
    ``get_feature_data``, reads the eCPM mapping stored as JSON under
    ``config_.KEY_NAME_AD_ECPM``, and computes::

        income = ad_own_view * own_ecpm / 1000 + ad_vx_view * vx_ecpm / 1000
        arpu   = round(income / dau, 4)

    The result is written to ``config_.KEY_NAME_AD_ARPU`` when it is
    non-negative. The function returns early, writing nothing, when
    ``dau`` is 0, when no eCPM value is stored, or when both eCPM values
    are 0.

    Args:
        project: Project name forwarded to ``get_feature_data``.
        table: Table name forwarded to ``get_feature_data``.
        dt: Partition value forwarded to ``get_feature_data``.

    Returns:
        None.
    """
    numeric_cols = ['dau', 'ad_own_view', 'ad_vx_view']

    user_group_initial_df = get_feature_data(project=project, table=table, features=features, dt=dt)

    # Assign the cleaned columns back in a single step. The chained form
    # ``df[col].fillna(0, inplace=True)`` is deprecated and has no effect
    # on the frame when pandas Copy-on-Write is enabled.
    user_group_initial_df[numeric_cols] = (
        user_group_initial_df[numeric_cols].fillna(0).astype(int)
    )

    # ``.iloc[0]`` picks the first row by position regardless of the index
    # labels. ``int(...)`` turns the numpy scalar into a built-in int so
    # later logging / serialisation does not depend on numpy's scalar repr.
    dau = int(user_group_initial_df['dau'].iloc[0])
    if dau == 0:
        log_.info(f"数据异常,dau = {dau}")
        return
    ad_own_view = int(user_group_initial_df['ad_own_view'].iloc[0])
    ad_vx_view = int(user_group_initial_df['ad_vx_view'].iloc[0])

    ecpm = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_AD_ECPM)
    if ecpm is None:
        return
    ecpm = json.loads(ecpm)
    own_ecpm = ecpm.get('own', 0)
    vx_ecpm = ecpm.get('weixin', 0)
    if own_ecpm == 0 and vx_ecpm == 0:
        return

    # Ad income of the previous period (eCPM is applied per 1000 views).
    income = ad_own_view * float(own_ecpm) / 1000 + ad_vx_view * float(vx_ecpm) / 1000
    # ARPU of the previous period.
    arpu = round(income / dau, 4)

    # Write to redis.
    if arpu >= 0:
        redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_AD_ARPU, value=arpu)
        # NOTE(review): presumably clears the key's expiry - confirm against
        # RedisHelper.persist_key.
        redis_helper.persist_key(key_name=config_.KEY_NAME_AD_ARPU)

    data = {
        'dau': dau,
        'ad_own_view': ad_own_view,
        'ad_vx_view': ad_vx_view,
        'ecpm': ecpm,
        'own_ecpm': own_ecpm,
        'vx_ecpm': vx_ecpm,
        'income': income,
        'arpu': arpu
    }
    log_.info(f"data = {data}")
    log_.info("update arpu finished!")


def timer_check(dt):
    """Run the ARPU update for ``dt`` once its source data is available.

    If ``data_check`` reports rows for the partition, the update runs
    immediately. Otherwise, when the current minute modulo 5 is greater
    than 1 the attempt is abandoned; if not, the check is re-scheduled to
    run again in 60 seconds.

    Any exception is logged and reported to the Feishu robot configured
    under ``config_.FEISHU_ROBOT['server_robot']``; it is not re-raised.

    Args:
        dt: Partition value to check and update.
    """
    try:
        project = 'loghubods'
        # Alternative table kept from the original source: 'dau_ad_view_hour'
        table = 'dau_ad_view_per_5min'
        now_min = datetime.datetime.now().minute
        log_.info(f"now_min: {now_min}")
        # Check whether the data for this update is ready.
        data_count = data_check(project=project, table=table, dt=dt)
        if data_count > 0:
            # Data is ready: run the update.
            update_ad_arpu(project=project, table=table, dt=dt)
            log_.info("ad arpu update end!")
        elif now_min % 5 > 1:
            # Past the retry window within the 5-minute slot: give up.
            log_.info('数据未准备好!')
            return
        else:
            # Data is not ready yet: check again in one minute.
            Timer(60, timer_check, (dt,)).start()
    except Exception as e:
        # Format the traceback once and reuse it for the log and the alert.
        tb_text = traceback.format_exc()
        log_.error(f"新策略 -- arpu值更新失败, exception: {e}, traceback: {tb_text}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - 新策略 -- arpu值更新失败\n"
                     f"exception: {e}\n"
                     f"traceback: {tb_text}"
        )


if __name__ == '__main__':
    now_date = datetime.datetime.today()
    # Partition key: the time five minutes ago as YYYYmmddHHMM, with the
    # seconds fixed to "00".
    dt = (now_date - datetime.timedelta(minutes=5)).strftime('%Y%m%d%H%M')
    dt = f"{dt}00"
    log_.info(f"now_date: {dt}")
    timer_check(dt=dt)