|
@@ -0,0 +1,99 @@
|
|
|
+import datetime
|
|
|
+import json
|
|
|
+import traceback
|
|
|
+from threading import Timer
|
|
|
+from utils import RedisHelper, data_check, get_feature_data, send_msg_to_feishu
|
|
|
+from config import set_config
|
|
|
+from log import Log
|
|
|
+config_, _ = set_config()
|
|
|
+log_ = Log()
|
|
|
+redis_helper = RedisHelper()
|
|
|
+
|
|
|
+features = [
|
|
|
+ 'dt',
|
|
|
+ 'dau',
|
|
|
+ 'ad_own_view', # 自营广告曝光次数
|
|
|
+ 'ad_vx_view', # 微信广告曝光次数
|
|
|
+]
|
|
|
+
|
|
|
+
|
|
|
+def update_ad_arpu(project, table, dt):
|
|
|
+ """更新上一周期的arpu值到redis中"""
|
|
|
+ user_group_initial_df = get_feature_data(project=project, table=table, features=features, dt=dt)
|
|
|
+ user_group_initial_df['dau'].fillna(0, inplace=True)
|
|
|
+ user_group_initial_df['ad_own_view'].fillna(0, inplace=True)
|
|
|
+ user_group_initial_df['ad_vx_view'].fillna(0, inplace=True)
|
|
|
+ user_group_initial_df['dau'] = user_group_initial_df['dau'].astype(int)
|
|
|
+ user_group_initial_df['ad_own_view'] = user_group_initial_df['ad_own_view'].astype(int)
|
|
|
+ user_group_initial_df['ad_vx_view'] = user_group_initial_df['ad_vx_view'].astype(int)
|
|
|
+ dau = user_group_initial_df['dau'][0]
|
|
|
+ if dau == 0:
|
|
|
+ log_.info(f"数据异常,dau = {dau}")
|
|
|
+ return
|
|
|
+ ad_own_view = user_group_initial_df['ad_own_view'][0]
|
|
|
+ ad_vx_view = user_group_initial_df['ad_vx_view'][0]
|
|
|
+ ecpm = redis_helper.get_data_from_redis(key_name=config_.KEY_NAME_AD_ECPM)
|
|
|
+ if ecpm is None:
|
|
|
+ return
|
|
|
+ ecpm = json.loads(ecpm)
|
|
|
+ own_ecpm = ecpm.get('own', 0)
|
|
|
+ vx_ecpm = ecpm.get('vx', 0)
|
|
|
+ if own_ecpm == 0 and vx_ecpm == 0:
|
|
|
+ return
|
|
|
+ # 计算上一周期广告收入
|
|
|
+ income = ad_own_view * float(own_ecpm) + ad_vx_view * float(vx_ecpm)
|
|
|
+ # 计算上一周期arpu
|
|
|
+ arpu = income / dau
|
|
|
+ arpu = round(arpu, 4)
|
|
|
+ # 写入redis
|
|
|
+ if arpu >= 0:
|
|
|
+ redis_helper.set_data_to_redis(key_name=config_.KEY_NAME_AD_ARPU, value=arpu)
|
|
|
+ redis_helper.persist_key(key_name=config_.KEY_NAME_AD_ARPU)
|
|
|
+ data = {
|
|
|
+ 'dau': dau,
|
|
|
+ 'ad_own_view': ad_own_view,
|
|
|
+ 'ad_vx_view': ad_vx_view,
|
|
|
+ 'ecpm': ecpm,
|
|
|
+ 'own_ecpm': own_ecpm,
|
|
|
+ 'vx_ecpm': vx_ecpm,
|
|
|
+ 'income': income,
|
|
|
+ 'arpu': arpu
|
|
|
+ }
|
|
|
+ log_.info(f"data = {data}")
|
|
|
+ log_.info(f"update arpu finished!")
|
|
|
+
|
|
|
+
|
|
|
+def timer_check():
|
|
|
+ try:
|
|
|
+ project = 'loghubods'
|
|
|
+ table = 'dau_ad_view_hour'
|
|
|
+ now_date = datetime.datetime.today()
|
|
|
+ now_min = datetime.datetime.now().minute
|
|
|
+ dt = datetime.datetime.strftime(now_date - datetime.timedelta(hours=1), '%Y%m%d%H')
|
|
|
+ log_.info(f"now_date: {dt}")
|
|
|
+ # 查看当前更新的数据是否已准备好
|
|
|
+ data_count = data_check(project=project, table=table, dt=dt)
|
|
|
+ if data_count > 0:
|
|
|
+ # 数据准备好,进行更新
|
|
|
+ update_ad_arpu(project=project, table=table, dt=dt)
|
|
|
+ log_.info(f"ad arpu update end!")
|
|
|
+ elif now_min > 30:
|
|
|
+ log_.info('数据未准备好!')
|
|
|
+ return
|
|
|
+ else:
|
|
|
+ # 数据没准备好,1分钟后重新检查
|
|
|
+ Timer(60, timer_check).start()
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ log_.error(f"新策略 -- arpu值更新失败, exception: {e}, traceback: {traceback.format_exc()}")
|
|
|
+ send_msg_to_feishu(
|
|
|
+ webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
|
|
|
+ key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
|
|
|
+ msg_text=f"rov-offline{config_.ENV_TEXT} - 新策略 -- arpu值更新失败\n"
|
|
|
+ f"exception: {e}\n"
|
|
|
+ f"traceback: {traceback.format_exc()}"
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ timer_check()
|