import datetime
import json
import time
import traceback
from threading import Timer
from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
from config import set_config
from log import Log
config_, _ = set_config()
log_ = Log()
redis_helper = RedisHelper()
features = [
'dt',
'dau',
'ad_own_view', # 自营广告曝光次数
'ad_vx_view', # 微信广告曝光次数
]
def update_ad_arpu(project, table, dt):
"""更新上一周期的arpu值到redis中"""
user_group_initial_df = get_feature_data(project=project, table=table, features=features, dt=dt)
user_group_initial_df['dau'].fillna(0, inplace=True)
user_group_initial_df['ad_own_view'].fillna(0, inplace=True)
user_group_initial_df['ad_vx_view'].fillna(0, inplace=True)
user_group_initial_df['dau'] = user_group_initial_df['dau'].astype(int)
user_group_initial_df['ad_own_view'] = user_group_initial_df['ad_own_view'].astype(int)
user_group_initial_df['ad_vx_view'] = user_group_initial_df['ad_vx_view'].astype(int)
dau = user_group_initial_df['dau'][0]
if dau == 0:
log_.info(f"数据异常,dau = {dau}")
return
ad_own_view = user_group_initial_df['ad_own_view'][0]
ad_vx_view = user_group_initial_df['ad_vx_view'][0]
ecpm = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_AD_ECPM)
if ecpm is None:
return
ecpm = json.loads(ecpm)
own_ecpm = ecpm.get('own', 0)
vx_ecpm = ecpm.get('weixin', 0)
if own_ecpm == 0 and vx_ecpm == 0:
return
# 计算上一周期广告收入
income = ad_own_view * float(own_ecpm) / 1000 + ad_vx_view * float(vx_ecpm) / 1000
# 计算上一周期arpu
arpu = income / dau
arpu = round(arpu, 4)
# 写入redis
if arpu >= 0:
redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_AD_ARPU, value=arpu)
redis_helper.persist_key(key_name=config_.KEY_NAME_AD_ARPU)
data = {
'dau': dau,
'ad_own_view': ad_own_view,
'ad_vx_view': ad_vx_view,
'ecpm': ecpm,
'own_ecpm': own_ecpm,
'vx_ecpm': vx_ecpm,
'income': income,
'arpu': arpu
}
log_.info(f"data = {data}")
log_.info(f"update arpu finished!")
def timer_check(dt):
try:
project = 'loghubods'
# table = 'dau_ad_view_hour'
table = 'dau_ad_view_per_5min'
now_min = datetime.datetime.now().minute
log_.info(f"now_min: {now_min}")
# 查看当前更新的数据是否已准备好
data_count = data_check(project=project, table=table, dt=dt)
if data_count > 0:
# 数据准备好,进行更新
update_ad_arpu(project=project, table=table, dt=dt)
log_.info(f"ad arpu update end!")
elif now_min % 5 > 1:
log_.info('数据未准备好!')
return
else:
# 数据没准备好,1分钟后重新检查
Timer(60, timer_check, (dt,)).start()
except Exception as e:
log_.error(f"新策略 -- arpu值更新失败, exception: {e}, traceback: {traceback.format_exc()}")
send_msg_to_feishu(
webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
msg_text=f"rov-offline{config_.ENV_TEXT} - 新策略 -- arpu值更新失败\n"
f"exception: {e}\n"
f"traceback: {traceback.format_exc()}"
)
if __name__ == '__main__':
now_date = datetime.datetime.today()
# dt_min = datetime.datetime.now().minute
# log_.info(f"dt_min: {dt_min}")
dt = datetime.datetime.strftime(now_date - datetime.timedelta(minutes=5), '%Y%m%d%H%M')
dt = f"{dt}00"
log_.info(f"now_date: {dt}")
timer_check(dt=dt)