liqian 1 year ago
parent commit 529993853b

+ 10 - 6
ad_feature_data_sample.py

@@ -32,10 +32,14 @@ if __name__ == '__main__':
     data_dir = './data/train_data'
     sample_data_dir = './data/sample_train_data'
     now_date = datetime.datetime.today()
-    # dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
-    # neg_under_sample(data_dir=data_dir, sample_data_dir=sample_data_dir, dt=dt)
+    print(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
+    dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
+    print(f"update data dt: {dt}")
+    neg_under_sample(data_dir=data_dir, sample_data_dir=sample_data_dir, dt=dt)
+    print(f"{time.time() - st_time}s")
 
-    for days in range(3, 19):
-        cur_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=days), '%Y%m%d')
-        print(f"cur_dt = {cur_dt}")
-        neg_under_sample(data_dir=data_dir, sample_data_dir=sample_data_dir, dt=cur_dt)
+
+    # for days in range(3, 19):
+    #     cur_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=days), '%Y%m%d')
+    #     print(f"cur_dt = {cur_dt}")
+    #     neg_under_sample(data_dir=data_dir, sample_data_dir=sample_data_dir, dt=cur_dt)

+ 9 - 7
ad_feature_process.py

@@ -165,12 +165,14 @@ if __name__ == '__main__':
     table = 'admodel_data_train'
     # dt = '20230725'
     now_date = datetime.datetime.today()
-    # dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
-    # df = daily_data_process(project=project, table=table, features=features, dt=dt, app_type=0)
-    # print(time.time() - st_time)
+    print(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
+    dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=15), '%Y%m%d')
+    print(f"update data dt: {dt}")
+    df = daily_data_process(project=project, table=table, features=features, dt=dt, app_type=0)
+    print(f"{time.time() - st_time}s")
 
-    for days in range(10, 19):
-        cur_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=days), '%Y%m%d')
-        print(f"cur_dt = {cur_dt}")
-        df = daily_data_process(project=project, table=table, features=features, dt=cur_dt, app_type=0)
+    # for days in range(15, 23):
+    #     cur_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=days), '%Y%m%d')
+    #     print(f"cur_dt = {cur_dt}")
+    #     df = daily_data_process(project=project, table=table, features=features, dt=cur_dt, app_type=0)
 

+ 4 - 2
ad_generate_train_test.py

@@ -6,12 +6,13 @@ import os
 if __name__ == '__main__':
     now_date = datetime.datetime.today()
     dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+    print(f"now_date: {dt}")
     train_test_data_dir = './data/train_test_data'
     if not os.path.exists(train_test_data_dir):
         os.makedirs(train_test_data_dir)
     # training set
     data_df_list = []
-    for days in range(4, 19):
+    for days in range(16, 23):  # 16 to 22 days ago
         cur_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=days), '%Y%m%d')
         print(f"cur_dt = {cur_dt}")
         cur_dt_df = pd.read_csv(f"./data/sample_train_data/{cur_dt}.csv")
@@ -20,7 +21,8 @@ if __name__ == '__main__':
     print(f"all data num: {all_df.shape[0]}")
     all_df.to_csv(f'{train_test_data_dir}/train_{dt}.csv', index=False)
     # test set
-    test_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=3), '%Y%m%d')
+    test_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=15), '%Y%m%d')
+    print(f"test data dt: {test_dt}")
     test_df = pd.read_csv(f"./data/sample_train_data/{test_dt}.csv")
     print(f"test data num: {test_df.shape[0]}")
     test_df.to_csv(f'{train_test_data_dir}/test_{dt}.csv', index=False)
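For reference, a minimal sketch of the date windows the updated ranges produce (assuming the script runs today): the training set now covers the 7 days from 22 to 16 days ago, and the test set is the single day 15 days ago, matching the dt used in ad_feature_process.py.

    import datetime

    now_date = datetime.datetime.today()
    # training window: range(16, 23) -> now - 22 days ... now - 16 days
    train_dts = [datetime.datetime.strftime(now_date - datetime.timedelta(days=d), '%Y%m%d')
                 for d in range(16, 23)]
    # test day: now - 15 days
    test_dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=15), '%Y%m%d')
    print(train_dts, test_dt)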

+ 27 - 1
ad_model_run.sh

@@ -43,8 +43,34 @@ fi
 
 # 5. Offline prediction data processing
 python ad_predict_video_data_process.py
+if [ $? -ne 0 ];
+then
+  msg="[ERROR] ad_predict_video_data_process.py"
+  echo "$msg"
+  exit 1
+fi
+
 python ad_predict_user_data_process.py
+if [ $? -ne 0 ];
+then
+  msg="[ERROR] ad_predict_user_data_process.py"
+  echo "$msg"
+  exit 1
+fi
+
 python ad_xgboost_predict_data_generate.py
+if [ $? -ne 0 ];
+then
+  msg="[ERROR] ad_xgboost_predict_data_generate.py"
+  echo "$msg"
+  exit 1
+fi
 
 # 6. Offline prediction
-python ad_xgboost_predict.py
+python ad_xgboost_predict.py
+if [ $? -ne 0 ];
+then
+  msg="[ERROR] ad_xgboost_predict.py"
+  echo "$msg"
+  exit 1
+fi

+ 19 - 4
ad_predict_user_data_process.py

@@ -3,6 +3,8 @@ import time
 import datetime
 import pandas as pd
 from odps import ODPS
+from utils import data_check
+from threading import Timer
 
 # ODPS service configuration
 odps_config = {
@@ -107,12 +109,25 @@ def user_data_process(project, table, dt, app_type):
     user_df.to_csv(f"{predict_data_dir}/user_feature.csv", index=False)
 
 
-if __name__ == '__main__':
-    st_time = time.time()
+def timer_check():
     project = 'loghubods'
     table = 'admodel_testset_mid'
     # dt = '20230725'
     now_date = datetime.datetime.today()
     dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
-    user_data_process(project=project, table=table, dt=dt, app_type=0)
-    print(time.time() - st_time)
+    # check whether the data for this update is ready
+    data_count = data_check(project=project, table=table, dt=dt)
+    if data_count > 0:
+        print(f"ad predict user data count = {data_count}")
+        # data is ready, run the update
+        user_data_process(project=project, table=table, dt=dt, app_type=0)
+        print(f"ad predict user data update end!")
+    else:
+        # data not ready, check again in 1 minute
+        Timer(60, timer_check).start()
+
+
+if __name__ == '__main__':
+    st_time = time.time()
+    timer_check()
+    print(f"{time.time() - st_time}s")

+ 22 - 6
ad_predict_video_data_process.py

@@ -3,6 +3,8 @@ import time
 import datetime
 import pandas as pd
 from odps import ODPS
+from utils import data_check
+from threading import Timer
 
 # ODPS service configuration
 odps_config = {
@@ -40,7 +42,7 @@ def get_feature_data(project, table, dt, app_type):
         endpoint=odps_config['ENDPOINT'],
     )
     feature_data = []
-    sql = f"select * from {project}.{table} where dt={dt} and apptype={app_type}"
+    sql = f"select * from {project}.{table} where dt={dt} and apptype={app_type} limit 1000"
     with odps.execute_sql(sql).open_reader() as reader:
         for record in reader:
             # print(record)
@@ -52,7 +54,7 @@ def get_feature_data(project, table, dt, app_type):
         return feature_df
 
 
-def user_data_process(project, table, dt, app_type):
+def video_data_process(project, table, dt, app_type):
     """每日特征处理"""
     print('step 1: get video feature data')
     feature_initial_df = get_feature_data(project=project, table=table, dt=dt, app_type=app_type)
@@ -122,12 +124,26 @@ def user_data_process(project, table, dt, app_type):
     video_df.to_csv(f"{predict_data_dir}/video_feature.csv", index=False)
 
 
-if __name__ == '__main__':
-    st_time = time.time()
+def timer_check():
     project = 'loghubods'
     table = 'admodel_testset_video'
     # dt = '20230725'
     now_date = datetime.datetime.today()
     dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d')
-    user_data_process(project=project, table=table, dt=dt, app_type=0)
-    print(time.time() - st_time)
+    # check whether the data for this update is ready
+    data_count = data_check(project=project, table=table, dt=dt)
+    if data_count > 0:
+        print(f"ad predict video data count = {data_count}")
+        # data is ready, run the update
+        video_data_process(project=project, table=table, dt=dt, app_type=0)
+        print(f"ad predict video data update end!")
+    else:
+        # data not ready, check again in 1 minute
+        Timer(60, timer_check).start()
+
+
+if __name__ == '__main__':
+    st_time = time.time()
+    timer_check()
+    print(f"{time.time() - st_time}s")
+

+ 9 - 4
ad_xgboost_predict.py

@@ -1,3 +1,4 @@
+import time
 import pandas as pd
 import xgboost as xgb
 from xgboost.sklearn import XGBClassifier
@@ -36,8 +37,9 @@ def predict(app_type):
     print(res_df.head())
 
     # 5. to csv
-    res_df.to_csv('./data/predict_data/predict_res.csv', index=False)
-    print("to csv finished!")
+    # res_df.to_csv('./data/predict_data/predict_res.csv', index=False)
+    # print("to csv finished!")
+    f = open('./data/predict_data/predict_res.txt', "w")
 
     xgb_config = config_.AD_MODEL_ABTEST_CONFIG['xgb']
     # 6. to redis
@@ -47,7 +49,8 @@ def predict(app_type):
         video_id = row['videoid']
         pre_res = row['res_predict']
         key = f"{xgb_config['predict_key_prefix']}{app_type}:{mid}:{video_id}"
-        redis_helper.set_data_to_redis(key_name=key, value=pre_res, expire_time=48*3600)
+        # redis_helper.set_data_to_redis(key_name=key, value=pre_res, expire_time=48*3600)
+        f.write(f"{key},{pre_res}\n")
     print("to redis finished!")
 
     # 7. 计算阈值
@@ -63,10 +66,12 @@ def predict(app_type):
     for ab_code, param in record.items():
         threshold = predict_mean * param
+        # write to redis
-        threshold_key = f"{xgb_config['threshold']}{abtest_id}{ab_code}"
+        threshold_key = f"{xgb_config['threshold']}{abtest_id}:{ab_code}"
         redis_helper.set_data_to_redis(key_name=threshold_key, value=threshold, expire_time=48 * 3600)
     print("update threshold finished!")
 
 
 if __name__ == '__main__':
+    st_time = time.time()
     predict(config_.APP_TYPE['VLOG'])
+    print(f"{time.time() - st_time}s")

+ 4 - 0
ad_xgboost_predict_data_generate.py

@@ -1,4 +1,5 @@
 import os
+import time
 import pandas as pd
 
 predict_data_dir = './data/predict_data'
@@ -26,6 +27,7 @@ def read_csv_data(filepath):
 
 
 if __name__ == '__main__':
+    st_time = time.time()
     # 1. 获取用户特征数据
     user_filepath = f"{predict_data_dir}/{user_filename}"
     user_df = read_csv_data(filepath=user_filepath)
@@ -66,3 +68,5 @@ if __name__ == '__main__':
         if not os.path.exists(predict_data_dir):
             os.makedirs(predict_data_dir)
         res_df.to_csv(f"{predict_data_dir}/predict_data_{ad_status}.csv", index=False)
+
+    print(f"{time.time() - st_time}s")

+ 58 - 50
ad_xgboost_train.py

@@ -1,59 +1,67 @@
 import pandas as pd
 import datetime
+import time
 from sklearn.model_selection import train_test_split
 from xgboost.sklearn import XGBClassifier
 from sklearn import metrics
 
 
-now_date = datetime.datetime.today()
-dt = datetime.datetime.strftime(now_date, '%Y%m%d')
-# 1. 读取数据
-# data = pd.read_csv(f'./data/train_test_data/train_test_{dt}.csv')
-# print(data.shape)
-train_data = pd.read_csv(f'./data/train_test_data/train_{dt}.csv')
-print(train_data.shape)
-test_data = pd.read_csv(f'./data/train_test_data/test_{dt}.csv')
-print(test_data.shape)
-# 2. 划分x和y
-# data_columns = data.columns.values.tolist()
-# x = data[data_columns[:-1]]
-# y = data[data_columns[-1]]
-# print(f"x_shape: {x.shape}, y_shape: {y.shape}")
-data_columns = train_data.columns.values.tolist()
-x_train = train_data[data_columns[:-1]]
-y_train = train_data[data_columns[-1]]
-x_test = test_data[data_columns[:-1]]
-y_test = test_data[data_columns[-1]]
-# 3. 训练集和测试集分割
-# x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=1)
-print(f"x_train_shape: {x_train.shape}")
-print(f"x_test_shape: {x_test.shape}")
-# 4. 模型训练
-xgb_model = XGBClassifier(
-    objective='binary:logistic',
-    learning_rate=0.3,
-    max_depth=5,
-    eval_metric=['mae', 'auc']
-)
-xgb_model.fit(x_train, y_train, eval_set=[(x_train, y_train), (x_test, y_test)])
-# 5. 模型保存
-xgb_model.save_model('./data/ad_xgb.model')
-# 6. 测试集预测
-y_test_pre = xgb_model.predict(x_test)
+def xgboost_train():
+    now_date = datetime.datetime.today()
+    dt = datetime.datetime.strftime(now_date, '%Y%m%d')
+    # 1. 读取数据
+    # data = pd.read_csv(f'./data/train_test_data/train_test_{dt}.csv')
+    # print(data.shape)
+    train_data = pd.read_csv(f'./data/train_test_data/train_{dt}.csv')
+    print(train_data.shape)
+    test_data = pd.read_csv(f'./data/train_test_data/test_{dt}.csv')
+    print(test_data.shape)
+    # 2. 划分x和y
+    # data_columns = data.columns.values.tolist()
+    # x = data[data_columns[:-1]]
+    # y = data[data_columns[-1]]
+    # print(f"x_shape: {x.shape}, y_shape: {y.shape}")
+    data_columns = train_data.columns.values.tolist()
+    x_train = train_data[data_columns[:-1]]
+    y_train = train_data[data_columns[-1]]
+    x_test = test_data[data_columns[:-1]]
+    y_test = test_data[data_columns[-1]]
+    # 3. 训练集和测试集分割
+    # x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=1)
+    print(f"x_train_shape: {x_train.shape}")
+    print(f"x_test_shape: {x_test.shape}")
+    # 4. 模型训练
+    xgb_model = XGBClassifier(
+        objective='binary:logistic',
+        learning_rate=0.3,
+        max_depth=5,
+        eval_metric=['mae', 'auc']
+    )
+    xgb_model.fit(x_train, y_train, eval_set=[(x_train, y_train), (x_test, y_test)])
+    # 5. 模型保存
+    xgb_model.save_model('./data/ad_xgb.model')
+    # 6. 测试集预测
+    y_test_pre = xgb_model.predict(x_test)
 
-# test_df = x_test.copy()
-# test_df['y'] = y_test
-# test_df['y_pre'] = y_test_pre
-# test_df.to_csv('./data/test_pre.csv', index=False)
+    # test_df = x_test.copy()
+    # test_df['y'] = y_test
+    # test_df['y_pre'] = y_test_pre
+    # test_df.to_csv('./data/test_pre.csv', index=False)
 
-# 7. 模型效果验证
-test_accuracy = metrics.accuracy_score(y_test, y_test_pre)
-print("Test Accuracy: %.2f%%" % (test_accuracy * 100.0))
-test_auc = metrics.roc_auc_score(y_test, y_test_pre)
-print("auc: %.2f%%" % (test_auc * 100.0))
-test_recall = metrics.recall_score(y_test, y_test_pre)
-print("recall:%.2f%%"%(test_recall*100.0))
-test_f1 = metrics.f1_score(y_test, y_test_pre)
-print("f1:%.2f%%"%(test_f1*100.0))
-test_precision = metrics.precision_score(y_test, y_test_pre)
-print("precision:%.2f%%"%(test_precision*100.0))
+    # 7. 模型效果验证
+    test_accuracy = metrics.accuracy_score(y_test, y_test_pre)
+    print("Test Accuracy: %.2f%%" % (test_accuracy * 100.0))
+    test_auc = metrics.roc_auc_score(y_test, y_test_pre)
+    print("auc: %.2f%%" % (test_auc * 100.0))
+    test_recall = metrics.recall_score(y_test, y_test_pre)
+    print("recall:%.2f%%"%(test_recall*100.0))
+    test_f1 = metrics.f1_score(y_test, y_test_pre)
+    print("f1:%.2f%%"%(test_f1*100.0))
+    test_precision = metrics.precision_score(y_test, y_test_pre)
+    print("precision:%.2f%%"%(test_precision*100.0))
+
+
+if __name__ == '__main__':
+    st_time = time.time()
+    xgboost_train()
+    print(f"{time.time() - st_time}s")

+ 81 - 1
config.py

@@ -1980,7 +1980,87 @@ class BaseConfig(object):
             # redis key prefix for threshold results, full format: ad:xgb:threshold:{abtestId}:{abtestGroup}
             'threshold': 'ad:xgb:threshold:',
             # experiment (abtest) ID mapping
-            'abtest_id_mapping': {APP_TYPE['VLOG']: '173'}
+            'abtest_id_mapping': {APP_TYPE['VLOG']: '173'},
+            # auto threshold adjustment config
+            'auto_update_config': {
+                # Piaoquan vlog
+                APP_TYPE['VLOG']: {
+                    'abtest_id': '173',
+                    'abtest_group_list': ['ab3'],
+                    'not_update': 0.01,  # UV fluctuation within which no threshold adjustment is needed
+                    'gradient': 0.05,  # adjustment gradient
+                    'max_update_step': 5,  # maximum number of adjustment steps
+                    # adjustment step size
+                    'threshold_update': {
+                        'ab0': 1 / 48,
+                        'ab1': 1 / 48,
+                        'ab2': 1 / 48,
+                        'ab3': 1 / 48,
+                        'ab4': 1 / 48,
+                        'ab5': 1 / 48,
+                        'ab6': 1 / 48,
+                        'ab7': 1 / 48,
+                        'ab8': 1 / 48,
+                        'ab9': 1 / 48,
+                    },
+                    # target UV control parameters by hour
+                    'target_uv_param': {
+                        'ab0': {
+                            'update_hours': list(range(7)), 'update_param': 0,
+                            'special_update_config': {'special_hours': [0, 1, 7, 8], 'special_gradient': 0.01,
+                                                      'special_max_update_step': 23}
+                        },  # 0:00-7:00, UV held at 0%
+                        'ab1': {
+                            'update_hours': list(range(7)), 'update_param': 0,
+                            'special_update_config': {'special_hours': [0, 1, 7, 8], 'special_gradient': 0.01,
+                                                      'special_max_update_step': 23}
+                        },  # 0:00-7:00, UV held at 0%
+                        'ab2': {
+                            'update_hours': list(range(7)), 'update_param': 0,
+                            'special_update_config': {'special_hours': [0, 1, 7, 8], 'special_gradient': 0.01,
+                                                      'special_max_update_step': 23}
+                        },  # 0:00-7:00, UV held at 0%
+                        'ab3': {
+                            'update_hours': list(range(7)), 'update_param': 0,
+                            'special_update_config': {'special_hours': [0, 1, 7, 8], 'special_gradient': 0.01,
+                                                      'special_max_update_step': 23}
+                        },  # 0:00-7:00, UV held at 0%
+                        'ab4': {
+                            'update_hours': list(range(7)), 'update_param': 0,
+                            'special_update_config': {'special_hours': [0, 1, 7, 8], 'special_gradient': 0.01,
+                                                      'special_max_update_step': 23}
+                        },  # 0:00-7:00, UV held at 0%
+                        'ab5': {
+                            'update_hours': list(range(7)), 'update_param': 0,
+                            'special_update_config': {'special_hours': [0, 1, 7, 8], 'special_gradient': 0.01,
+                                                      'special_max_update_step': 23}
+                        },  # 0:00-7:00, UV held at 0%
+                        'ab6': {
+                            'update_hours': list(range(7)), 'update_param': 0,
+                            'special_update_config': {'special_hours': [0, 1, 7, 8], 'special_gradient': 0.01,
+                                                      'special_max_update_step': 23}
+                        },  # 0:00-7:00, UV held at 0% of the configured target UV
+                        'ab7': {
+                            'update_hours': list(range(7)), 'update_param': 0,
+                            'special_update_config': {'special_hours': [0, 1, 7, 8], 'special_gradient': 0.01,
+                                                      'special_max_update_step': 23}
+                        },  # 0:00-7:00, UV held at 0%
+                        'ab8': {
+                            'update_hours': list(range(7)), 'update_param': 0,
+                            'special_update_config': {'special_hours': [0, 1, 7, 8], 'special_gradient': 0.01,
+                                                      'special_max_update_step': 23}
+                        },  # 0:00-7:00, UV held at 0%
+                        'ab9': {
+                            'update_hours': list(range(7)), 'update_param': 0,
+                            'special_update_config': {'special_hours': [0, 1, 7, 8], 'special_gradient': 0.01,
+                                                      'special_max_update_step': 23}
+                        },  # 0:00-7:00, UV held at 0%
+                    },
+                },
+            }
         }
     }
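A minimal sketch of how a consumer might read this auto_update_config for one abtest group and hour (the adjustment logic itself is not part of this commit, so this helper is hypothetical):

    def get_auto_update_params(auto_update_config, app_type, ab_code, hour):
        """Hypothetical helper: pick the adjustment parameters for one abtest group at a given hour."""
        cfg = auto_update_config[app_type]
        uv_param = cfg['target_uv_param'][ab_code]
        special = uv_param['special_update_config']
        # special hours use their own gradient and step cap
        if hour in special['special_hours']:
            gradient, max_step = special['special_gradient'], special['special_max_update_step']
        else:
            gradient, max_step = cfg['gradient'], cfg['max_update_step']
        return {
            'abtest_id': cfg['abtest_id'],
            'not_update': cfg['not_update'],                 # UV band requiring no adjustment
            'gradient': gradient,                            # per-step adjustment gradient
            'max_update_step': max_step,                     # cap on adjustment steps
            'step_size': cfg['threshold_update'][ab_code],   # 1/48 step size
            # target UV parameter only applies within the configured hours
            'target_uv_param': uv_param['update_param'] if hour in uv_param['update_hours'] else None,
        }

It would be called with something like config_.AD_MODEL_ABTEST_CONFIG['xgb']['auto_update_config'] as its first argument, the same way ad_xgboost_predict.py reads the 'xgb' block.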