liqian committed 2 years ago
commit 7c05bae5fe

+ 13 - 0
ad_threshold_update_task.sh

@@ -0,0 +1,13 @@
+source /etc/profile
+echo $ROV_OFFLINE_ENV
+if [[ $ROV_OFFLINE_ENV == 'test' ]]; then
+    cd /data2/rov-offline &&
+    /root/anaconda3/bin/python /data2/rov-offline/ad_user_data_update.py &&
+    /root/anaconda3/bin/python /data2/rov-offline/ad_video_data_update.py &&
+    /root/anaconda3/bin/python /data2/rov-offline/ad_user_video_predict.py
+elif [[ $ROV_OFFLINE_ENV == 'pro' ]]; then
+    cd /data/rov-offline &&
+    /root/anaconda3/bin/python /data/rov-offline/ad_user_data_update.py &&
+    /root/anaconda3/bin/python /data/rov-offline/ad_video_data_update.py &&
+    /root/anaconda3/bin/python /data/rov-offline/ad_user_video_predict.py
+fi

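For reference, the `&&` chaining above means each later step only runs if the previous one exited with status 0. A minimal Python sketch of the same stop-on-failure behavior, using the paths from the 'pro' branch of the script (sketch only, not part of the commit):

# mirrors ad_threshold_update_task.sh for the 'pro' environment
import subprocess

steps = [
    "/data/rov-offline/ad_user_data_update.py",
    "/data/rov-offline/ad_video_data_update.py",
    "/data/rov-offline/ad_user_video_predict.py",
]
for script in steps:
    # check=True raises on a non-zero exit code, so the chain stops at
    # the first failing step, matching the shell script's `&&` chaining
    subprocess.run(["/root/anaconda3/bin/python", script],
                   check=True, cwd="/data/rov-offline")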
+ 35 - 7
ad_user_video_predict.py

@@ -1,11 +1,14 @@
 import datetime
+
+import numpy as np
 import pandas as pd
 from odps import ODPS
-from utils import data_check, get_feature_data, send_msg_to_feishu
+from utils import data_check, get_feature_data, send_msg_to_feishu, RedisHelper
 from config import set_config
 from log import Log
 config_, _ = set_config()
 log_ = Log()
+redis_helper = RedisHelper()
 
 
 def predict_user_group_share_rate(dt, app_type):
@@ -59,15 +62,40 @@ def predict_video_share_rate(dt, app_type):
 
 
 def predict_ad_group_video():
-    app_type = config_.APP_TYPE['VLOG']
-    now_date = datetime.datetime.today()
+    now_date = datetime.datetime.today() - datetime.timedelta(days=1)
     dt = datetime.datetime.strftime(now_date, '%Y%m%d')
-    user_group_df = predict_user_group_share_rate(dt=dt, app_type=app_type)
-    video_df = predict_video_share_rate(dt=dt, app_type=app_type)
-    print(f"user_group_df count = {len(user_group_df)}, \nvideo_df count = {len(video_df)}")
+    log_.info(f"dt = {dt}")
+    # get the user group predictions
+    group_key_name = f"{config_.KEY_NAME_PREFIX_AD_GROUP}{dt}"
+    group_data = redis_helper.get_all_data_from_zset(key_name=group_key_name, with_scores=True)
+    if group_data is None:
+        log_.info(f"group data is None!")
+    group_df = pd.DataFrame(data=group_data, columns=['group', 'group_ad_share_rate'])
+    group_df = group_df[group_df['group'] != 'mean_group']
+    log_.info(f"group_df count = {len(group_df)}")
+    # get the video predictions
+    video_key_name = f"{config_.KEY_NAME_PREFIX_AD_VIDEO}{dt}"
+    video_data = redis_helper.get_all_data_from_zset(key_name=video_key_name, with_scores=True)
+    if video_data is None:
+        log_.info(f"video data is None!")
+    video_df = pd.DataFrame(data=video_data, columns=['videoid', 'video_ad_share_rate'])
+    video_df = video_df[video_df['videoid'] != -1]
+    log_.info(f"video_df count = {len(video_df)}")
     predict_df = video_df
-    for index, item in user_group_df.iterrows():
+    threshold_data = {}
+    all_group_data = []
+    for index, item in group_df.iterrows():
         predict_df[item['group']] = predict_df['video_ad_share_rate'] * item['group_ad_share_rate']
+        # use the mean for this group as its threshold
+        threshold_data[item['group']] = predict_df[item['group']].mean()
+        all_group_data.extend(predict_df[item['group']].tolist())
+    threshold_data['mean_group'] = np.mean(all_group_data)
+    log_.info(f"threshold_data = {threshold_data}")
+    # write the thresholds to redis
+    for key, val in threshold_data.items():
+        key_name = f"{config_.KEY_NAME_PREFIX_AD_THRESHOLD}{key}"
+        redis_helper.set_data_to_redis(key_name=key_name, value=val, expire_time=2 * 24 * 3600)
+
     predict_df.to_csv('./data/ad_user_video_predict.csv')
     return predict_df
 

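To make the threshold step above concrete, here is a minimal sketch on toy data of what the loop over group_df computes: each group column is the video's predicted ad share rate scaled by that group's rate, the per-group threshold is the column mean, and 'mean_group' is the mean over all groups combined. Column names follow the diff; the numbers are made up:

# toy data only; mirrors the threshold loop in ad_user_video_predict.py above
import numpy as np
import pandas as pd

video_df = pd.DataFrame({'videoid': [1, 2], 'video_ad_share_rate': [0.2, 0.4]})
group_df = pd.DataFrame({'group': ['g1', 'g2'], 'group_ad_share_rate': [0.5, 1.5]})

predict_df = video_df
threshold_data = {}
all_group_data = []
for _, item in group_df.iterrows():
    predict_df[item['group']] = predict_df['video_ad_share_rate'] * item['group_ad_share_rate']
    threshold_data[item['group']] = predict_df[item['group']].mean()   # per-group threshold
    all_group_data.extend(predict_df[item['group']].tolist())
threshold_data['mean_group'] = np.mean(all_group_data)                 # overall fallback
# threshold_data == {'g1': 0.15, 'g2': 0.45, 'mean_group': 0.3}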
+ 4 - 1
ad_users_data_update.py

@@ -22,6 +22,8 @@ def predict_user_group_share_rate(project, table, dt, app_type):
     user_group_df = get_feature_data(project=project, table=table, features=features, dt=dt)
     user_group_df['apptype'] = user_group_df['apptype'].astype(int)
     user_group_df = user_group_df[user_group_df['apptype'] == app_type]
+    user_group_df['sharerate_all'].fillna(0, inplace=True)
+    user_group_df['sharerate_ad'].fillna(0, inplace=True)
     user_group_df['sharerate_all'] = user_group_df['sharerate_all'].astype(float)
     user_group_df['sharerate_ad'] = user_group_df['sharerate_ad'].astype(float)
     # get the 30-day share rate across all user groups when ads are shown
@@ -30,6 +32,7 @@ def predict_user_group_share_rate(project, table, dt, app_type):
     # compute each user group's share rate when ads are shown
     user_group_df['group_ad_share_rate'] = \
         user_group_df['sharerate_ad'] * float(ad_all_group_share_rate) / user_group_df['sharerate_all']
+    user_group_df['group_ad_share_rate'].fillna(0, inplace=True)
     # write the results to redis
     key_name = f"{config_.KEY_NAME_PREFIX_AD_GROUP}{dt}"
     redis_data = {}
@@ -48,7 +51,7 @@ def timer_check():
         app_type = config_.APP_TYPE['VLOG']
         project = config_.ad_model_data['users_share_rate'].get('project')
         table = config_.ad_model_data['users_share_rate'].get('table')
-        now_date = datetime.datetime.today()
+        now_date = datetime.datetime.today() - datetime.timedelta(days=1)
         dt = datetime.datetime.strftime(now_date, '%Y%m%d')
         log_.info(f"now_date: {dt}")
         now_min = datetime.datetime.now().minute

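The added fillna calls above guard the ratio against missing values: after filling sharerate_all and sharerate_ad with 0, a 0/0 division yields NaN, which the trailing fillna(0) maps back to 0. A small sketch with toy numbers (ad_all_group_share_rate treated as a scalar, as in the diff):

# toy numbers; mirrors the fillna + ratio logic in ad_users_data_update.py above
import pandas as pd

ad_all_group_share_rate = 0.3  # scalar, as in the diff
df = pd.DataFrame({'sharerate_all': [0.5, None], 'sharerate_ad': [0.2, None]})
df['sharerate_all'].fillna(0, inplace=True)
df['sharerate_ad'].fillna(0, inplace=True)
df['group_ad_share_rate'] = df['sharerate_ad'] * ad_all_group_share_rate / df['sharerate_all']
df['group_ad_share_rate'].fillna(0, inplace=True)  # 0/0 above yields NaN, mapped back to 0
# resulting rates: 0.2 * 0.3 / 0.5 = 0.12 for the first row, 0 for the second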
+ 5 - 2
ad_video_data_upate.py

@@ -22,7 +22,8 @@ def predict_video_share_rate(project, table, dt, app_type):
     video_df = get_feature_data(project=project, table=table, features=features, dt=dt)
     video_df['apptype'] = video_df['apptype'].astype(int)
     video_df = video_df[video_df['apptype'] == app_type]
-    video_df['videoid'] = video_df['videoid'].astype(int)
+    video_df['sharerate_all'].fillna(0, inplace=True)
+    video_df['sharerate_ad'].fillna(0, inplace=True)
     video_df['sharerate_all'] = video_df['sharerate_all'].astype(float)
     video_df['sharerate_ad'] = video_df['sharerate_ad'].astype(float)
     # get the 30-day share rate across all videos when ads are shown
@@ -31,6 +32,8 @@ def predict_video_share_rate(project, table, dt, app_type):
     # compute each video's share rate when ads are shown
     video_df['video_ad_share_rate'] = \
         video_df['sharerate_ad'] * float(ad_all_videos_share_rate) / video_df['sharerate_all']
+    video_df['video_ad_share_rate'].fillna(0, inplace=True)
+    video_df = video_df[video_df['video_ad_share_rate'] != 0]
     # write the results to redis
     key_name = f"{config_.KEY_NAME_PREFIX_AD_VIDEO}{dt}"
     redis_data = {}
@@ -49,7 +52,7 @@ def timer_check():
         app_type = config_.APP_TYPE['VLOG']
         project = config_.ad_model_data['videos_share_rate'].get('project')
         table = config_.ad_model_data['videos_share_rate'].get('table')
-        now_date = datetime.datetime.today()
+        now_date = datetime.datetime.today() - datetime.timedelta(days=1)
         dt = datetime.datetime.strftime(now_date, '%Y%m%d')
         log_.info(f"now_date: {dt}")
         now_min = datetime.datetime.now().minute

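Both timer_check changes shift now_date back one day, so these writers and the reader in ad_user_video_predict.py agree on the same dt-suffixed keys. A small sketch of how the key ends up being formed (prefix taken from config.py; the concrete date is just an example):

# sketch: the dt-suffixed zset key after the one-day shift
import datetime

KEY_NAME_PREFIX_AD_VIDEO = 'ad:video:predict:share:rate:'   # from config.py
now_date = datetime.datetime.today() - datetime.timedelta(days=1)
dt = datetime.datetime.strftime(now_date, '%Y%m%d')
video_key_name = f"{KEY_NAME_PREFIX_AD_VIDEO}{dt}"
# e.g. 'ad:video:predict:share:rate:20220719' when run on 2022-07-20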
+ 4 - 2
config.py

@@ -602,6 +602,8 @@ class BaseConfig(object):
     KEY_NAME_PREFIX_AD_VIDEO = 'ad:video:predict:share:rate:'
     # redis key prefix for user grouping results, full format: mid:group:{mid}
     KEY_NAME_PREFIX_MID_GROUP = 'mid:group:'
+    # redis key prefix for ad recommendation thresholds, full format: ad:threshold:{group}
+    KEY_NAME_PREFIX_AD_THRESHOLD = 'ad:threshold:'
 
 
 class DevelopmentConfig(BaseConfig):
@@ -926,8 +928,8 @@ class ProductionConfig(BaseConfig):
 
 def set_config():
     # read the environment variable ROV_OFFLINE_ENV
-    # env = os.environ.get('ROV_OFFLINE_ENV')
-    env = 'dev'
+    env = os.environ.get('ROV_OFFLINE_ENV')
+    # env = 'dev'
     if env is None:
         # log_.error('ENV ERROR: is None!')
         return

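With the environment lookup restored, set_config resolves the environment from ROV_OFFLINE_ENV, the same variable ad_threshold_update_task.sh checks, so callers only need the variable exported before importing. A minimal usage sketch, assuming the 'test'/'pro' values from the task script:

# usage sketch; assumes ROV_OFFLINE_ENV is exported as 'test' or 'pro',
# normally done via `source /etc/profile` as in ad_threshold_update_task.sh
import os
os.environ.setdefault('ROV_OFFLINE_ENV', 'test')

from config import set_config
config_, _ = set_config()
print(config_.KEY_NAME_PREFIX_AD_THRESHOLD)  # 'ad:threshold:'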
+ 4 - 3
user_group_update.py

@@ -26,11 +26,12 @@ def update_user_group_to_redis(project, table, dt, app_type):
     feature_df = get_feature_data(project=project, table=table, features=features, dt=dt)
     feature_df['apptype'] = feature_df['apptype'].astype(int)
     feature_df = feature_df[feature_df['apptype'] == app_type]
-    print(feature_df)
-    for group in features[1:]:
+    group_list = features[1:]
+    for group in group_list:
         log_.info(f"group = {group} update redis start ...")
         mid_list = feature_df[group].tolist()
-        print(mid_list)
+        mid_list = [mid for mid in mid_list if mid is not None]
+        log_.info(f"mid count = {len(mid_list)}")
         for mid in mid_list:
             key_name = f"{config_.KEY_NAME_PREFIX_MID_GROUP}{mid}"
             redis_helper.set_data_to_redis(key_name=key_name, value=group, expire_time=25 * 3600)
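Together with the mid:group:{mid} keys written above and the ad:threshold:{group} keys from ad_user_video_predict.py, a serving-side lookup can go user → group → threshold. A hedged sketch of that read path using plain redis-py; the connection settings and the fallback rule are assumptions for illustration, not part of this commit:

# illustration only: read path over the keys this commit writes
# (connection settings are placeholders; the repo's RedisHelper is not shown here)
import redis

r = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)

def ad_threshold_for_user(mid):
    group = r.get(f"mid:group:{mid}")            # written by user_group_update.py
    if group is None:
        group = 'mean_group'                     # assumed fallback to the overall mean
    threshold = r.get(f"ad:threshold:{group}")   # written by ad_user_video_predict.py
    return group, float(threshold) if threshold is not None else None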