""" 针对爬虫类型数据单独训练模型 """ import os import sys import json import optuna from sklearn.linear_model import LogisticRegression sys.path.append(os.getcwd()) import numpy as np import pandas as pd import lightgbm as lgb from sklearn.preprocessing import LabelEncoder from sklearn.metrics import accuracy_score class LightGBM(object): """ LightGBM model for classification """ def __init__(self, flag, dt): self.label_encoder = LabelEncoder() self.my_c = [ "channel", "view_count_user_30days", "share_count_user_30days", "return_count_user_30days", "rov_user", "str_user", "out_user_id", "mode", "out_play_cnt", "out_like_cnt", "out_share_cnt", "tag1", "tag2", "tag3" ] self.str_columns = ["channel", "mode", "out_user_id", "tag1", "tag2", "tag3"] self.float_columns = [ "view_count_user_30days", "share_count_user_30days", "return_count_user_30days", "rov_user", "str_user", "out_play_cnt", "out_like_cnt", "out_share_cnt", ] self.split_c = 0.7 self.yc = 0.8 self.model = "lightgbm_0326_spider.bin" self.flag = flag self.dt = dt def bays_params(self, trial): """ Bayesian parameters for :return: best parameters """ # 定义搜索空间 param = { 'objective': 'binary', 'metric': 'binary_logloss', 'verbosity': -1, 'boosting_type': 'gbdt', 'num_leaves': trial.suggest_int('num_leaves', 20, 40), 'learning_rate': trial.suggest_loguniform('learning_rate', 1e-8, 1.0), 'feature_fraction': trial.suggest_uniform('feature_fraction', 0.4, 1.0), 'bagging_fraction': trial.suggest_uniform('bagging_fraction', 0.4, 1.0), 'bagging_freq': trial.suggest_int('bagging_freq', 1, 7), 'min_child_samples': trial.suggest_int('min_child_samples', 5, 100), "num_threads": 16, # 线程数量 } X_train, X_test = self.generate_x_data() Y_train, Y_test = self.generate_y_data() train_data = lgb.Dataset( X_train, label=Y_train, categorical_feature=["channel", "mode", "out_user_id", "tag1", "tag2", "tag3"], ) test_data = lgb.Dataset(X_test, label=Y_test, reference=train_data) gbm = lgb.train(param, train_data, num_boost_round=100, valid_sets=[test_data]) preds = gbm.predict(X_test) pred_labels = np.rint(preds) accuracy = accuracy_score(Y_test, pred_labels) return accuracy def generate_x_data(self): """ Generate data for feature engineering :return: """ with open("data/produce_data/x_data_total_return_{}_{}_spider.json".format(self.flag, self.dt)) as f1: x_list = json.loads(f1.read()) index_t = int(len(x_list) * self.split_c) X_train = pd.DataFrame(x_list[:index_t], columns=self.my_c) for key in self.str_columns: X_train[key] = self.label_encoder.fit_transform(X_train[key]) for key in self.float_columns: X_train[key] = pd.to_numeric(X_train[key], errors="coerce") X_test = pd.DataFrame(x_list[index_t:], columns=self.my_c) for key in self.str_columns: X_test[key] = self.label_encoder.fit_transform(X_test[key]) for key in self.float_columns: X_test[key] = pd.to_numeric(X_test[key], errors="coerce") return X_train, X_test def generate_y_data(self): """ Generate data for label :return: """ with open("data/produce_data/y_data_total_return_{}_{}_spider.json".format(self.flag, self.dt)) as f2: y_list = json.loads(f2.read()) index_t = int(len(y_list) * self.split_c) temp = sorted(y_list) yuzhi = temp[int(len(temp) * self.yc) - 1] print("阈值是: {}".format(yuzhi)) y__list = [0 if i <= yuzhi else 1 for i in y_list] y_train = np.array(y__list[:index_t]) y_test = np.array(y__list[index_t:]) return y_train, y_test def train_model(self): """ Load dataset :return: """ X_train, X_test = self.generate_x_data() Y_train, Y_test = self.generate_y_data() train_data = lgb.Dataset( X_train, label=Y_train, categorical_feature=["uid", "type", "channel", "mode", "out_user_id", "tag1", "tag2", "tag3"], ) test_data = lgb.Dataset(X_test, label=Y_test, reference=train_data) params = { "objective": "binary", # 指定二分类任务 "metric": "binary_logloss", # 评估指标为二分类的log损失 "num_leaves": 36, # 叶子节点数 "learning_rate": 0.08479152931388902, # 学习率 "bagging_fraction": 0.6588121592044218, # 建树的样本采样比例 "feature_fraction": 0.4572757903437793, # 建树的特征选择比例 "bagging_freq": 2, # k 意味着每 k 次迭代执行bagging "num_threads": 16, # 线程数量 "mini_child_samples": 71 } # 训练模型 num_round = 100 print("开始训练......") bst = lgb.train(params, train_data, num_round, valid_sets=[test_data]) bst.save_model(self.model) print("模型训练完成✅") def evaluate_model(self): """ 评估模型性能 :return: """ fw = open("summary_tag_03{}.txt".format(self.dt), "a+", encoding="utf-8") # 测试数据 with open("data/produce_data/x_data_total_return_predict_{}.json".format(self.dt)) as f1: x_list = json.loads(f1.read()) # 测试 label with open("data/produce_data/y_data_total_return_predict_{}.json".format(self.dt)) as f2: Y_test = json.loads(f2.read()) Y_test = [0 if i <= 19 else 1 for i in Y_test] X_test = pd.DataFrame(x_list, columns=self.my_c) for key in self.str_columns: X_test[key] = self.label_encoder.fit_transform(X_test[key]) for key in self.float_columns: X_test[key] = pd.to_numeric(X_test[key], errors="coerce") bst = lgb.Booster(model_file=self.model) y_pred = bst.predict(X_test, num_iteration=bst.best_iteration) temp = sorted(list(y_pred)) yuzhi = temp[int(len(temp) * 0.7) - 1] y_pred_binary = [0 if i <= yuzhi else 1 for i in list(y_pred)] # 转换为二进制输出 score_list = [] for index, item in enumerate(list(y_pred)): real_label = Y_test[index] score = item prid_label = y_pred_binary[index] print(real_label, "\t", prid_label, "\t", score) fw.write("{}\t{}\t{}\n".format(real_label, prid_label, score)) score_list.append(score) print("预测样本总量: {}".format(len(score_list))) data_series = pd.Series(score_list) print("统计 score 信息") print(data_series.describe()) # 评估模型 accuracy = accuracy_score(Y_test, y_pred_binary) print(f"Accuracy: {accuracy}") fw.close() def feature_importance(self): """ Get the importance of each feature :return: """ lgb_model = lgb.Booster(model_file=self.model) importance = lgb_model.feature_importance(importance_type='split') feature_name = lgb_model.feature_name() feature_importance = sorted(zip(feature_name, importance), key=lambda x: x[1], reverse=True) # 打印特征重要性 for name, imp in feature_importance: print(name, imp) if __name__ == "__main__": # i = int(input("输入 1 训练, 输入 2 预测:\n")) # if i == 1: # f = "train" # dt = "whole" # L = LightGBM(flag=f, dt=dt) # L.train_model() # elif i == 2: # f = "predict" # dt = int(input("输入日期, 16-21:\n")) # L = LightGBM(flag=f, dt=dt) # L.evaluate_model() L = LightGBM("train", "whole") study = optuna.create_study(direction='maximize') study.optimize(L.bays_params, n_trials=100) print('Number of finished trials:', len(study.trials)) print('Best trial:', study.best_trial.params) # L.train_model() # L.evaluate_model() # L.feature_importance()