""" 仅使用标题信息 """ import os import sys from sklearn.preprocessing import LabelEncoder sys.path.append(os.getcwd()) import pandas as pd import lightgbm as lgb from scipy.stats import randint as sp_randint from scipy.stats import uniform as sp_uniform from sklearn.model_selection import RandomizedSearchCV, train_test_split from sklearn.metrics import roc_auc_score, accuracy_score class LightGBM(object): """ LightGBM model for classification """ def __init__(self, flag, dt): self.label_encoder = LabelEncoder() self.my_c = [ "channel", "type", "tag1", "tag2", "tag3" ] self.str_columns = [ "channel", "type", "tag1", "tag2", "tag3" ] self.split_c = 0.75 self.yc = 0.8 self.model = "models/lightgbm_0409_all_tags.bin" self.flag = flag self.dt = dt def read_data(self, path, yc=None): """ Read data from local :return: """ df = pd.read_json(path) df = df.dropna(subset=['label']) # 把 label 为空的删掉 df = df.dropna(subset=['tag1', 'tag2'], how="all") # 把 tag 为空的数据也删掉 labels = df['label'] features = df.drop(['label', 'tag3', 'tag4'], axis=1) for key in self.str_columns: features[key] = self.label_encoder.fit_transform(features[key]) return features, labels, df def best_params(self): """ find best params for lightgbm """ path = "data/train_data/all_train_20240408.json" X, y, ori_df = self.read_data(path) print(len(list(y))) X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) lgb_ = lgb.LGBMClassifier(objective='binary') # 设置搜索的参数范围 param_dist = { 'num_leaves': sp_randint(20, 40), 'learning_rate': sp_uniform(0.001, 0.1), 'feature_fraction': sp_uniform(0.5, 0.9), 'bagging_fraction': sp_uniform(0.5, 0.9), 'bagging_freq': sp_randint(1, 10), 'min_child_samples': sp_randint(5, 100), } # 定义 RandomizedSearchCV rsearch = RandomizedSearchCV( estimator=lgb_, param_distributions=param_dist, n_iter=100, cv=3, scoring='roc_auc', random_state=42, verbose=2 ) # 开始搜索 rsearch.fit(X_train, y_train) # 打印最佳参数和对应的AUC得分 print("Best parameters found: ", rsearch.best_params_) print("Best AUC found: ", rsearch.best_score_) # 使用最佳参数在测试集上的表现 best_model = rsearch.best_estimator_ y_pred = best_model.predict_proba(X_test)[:, 1] auc = roc_auc_score(y_test, y_pred) print("AUC on test set: ", auc) def train_model(self): """ Load dataset :return: """ path = "data/train_data/all_train_20240408.json" x, y, ori_df = self.read_data(path) train_size = int(len(x) * self.split_c) X_train, X_test = x[:train_size], x[train_size:] Y_train, Y_test = y[:train_size], y[train_size:] train_data = lgb.Dataset( X_train, label=Y_train, categorical_feature=["tag1", "tag2"], ) test_data = lgb.Dataset(X_test, label=Y_test, reference=train_data) params = { 'bagging_fraction': 0.9323330736797192, 'bagging_freq': 1, 'feature_fraction': 0.8390650729441467, 'learning_rate': 0.07595782999760721, 'min_child_samples': 93, 'num_leaves': 36, 'num_threads': 16 } # 训练模型 num_round = 100 print("开始训练......") bst = lgb.train(params, train_data, num_round, valid_sets=[test_data]) bst.save_model(self.model) print("模型训练完成✅") def evaluate_model(self): """ 评估模型性能 :return: """ fw = open("result/summary_{}.txt".format(dt), "a+", encoding="utf-8") path = 'data/predict_data/all_predict_{}.json'.format(dt) x, y, ori_df = self.read_data(path, yc=6) true_label_df = pd.DataFrame(list(y), columns=['ture_label']) bst = lgb.Booster(model_file=self.model) y_pred = bst.predict(x, num_iteration=bst.best_iteration) pred_score_df = pd.DataFrame(list(y_pred), columns=['pred_score']) temp = sorted(list(y_pred)) yuzhi = temp[int(len(temp) * 0.9) - 1] y_pred_binary = 
        y_pred_binary = [0 if score <= yuzhi else 1 for score in y_pred]
        pred_label_df = pd.DataFrame(list(y_pred_binary), columns=['pred_label'])
        score_list = []
        for index, score in enumerate(list(y_pred)):
            real_label = y[index]
            pred_label = y_pred_binary[index]
            fw.write("{}\t{}\t{}\n".format(real_label, pred_label, score))
            score_list.append(score)
        print("Total predicted samples: {}".format(len(score_list)))
        data_series = pd.Series(score_list)
        print("Score distribution:")
        print(data_series.describe())

        # Evaluate the model
        accuracy = accuracy_score(y, y_pred_binary)
        print(f"Accuracy: {accuracy}")
        fw.close()

        # Concatenate horizontally
        df_concatenated = pd.concat([ori_df, true_label_df, pred_score_df, pred_label_df], axis=1)
        # Uncomment to decode the label-encoded columns back to their original
        # string values via the mapping built in read_data:
        # for key in self.str_columns:
        #     if key in df_concatenated.columns:
        #         df_concatenated[key] = [self.label_mapping[key][i] for i in df_concatenated[key]]
        df_concatenated.to_excel("data/predict_data/spider_predict_result_{}.xlsx".format(self.dt), index=False)

    def feature_importance(self):
        """
        Print the importance of each feature.
        """
        lgb_model = lgb.Booster(model_file=self.model)
        importance = lgb_model.feature_importance(importance_type='split')
        feature_name = lgb_model.feature_name()
        feature_importance = sorted(zip(feature_name, importance), key=lambda x: x[1], reverse=True)
        # Print feature importance, highest first
        for name, imp in feature_importance:
            print(name, imp)


# "cat summary_20240326.txt | awk -F "\t" '{print $1" "$3}'| /root/AUC/AUC/AUC"
"""
ossutil64 cp /root/luojunhui/alg/data/predict_data/spider_predict_result_20240330.xlsx oss://art-pubbucket/0temp/
"""

if __name__ == "__main__":
    i = int(input("Enter 1 to train, 2 to predict, 3 to search hyper-parameters:\n"))
    if i == 1:
        f = "train"
        dt = "whole"
        L = LightGBM(flag=f, dt=dt)
        L.train_model()
    elif i == 2:
        f = "predict"
        dt = int(input("Enter a date, 20240316-21:\n"))
        L = LightGBM(flag=f, dt=dt)
        L.evaluate_model()
        L.feature_importance()
    elif i == 3:
        L = LightGBM("train", "whole")
        L.best_params()
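
# Hedged sketch (an assumption, not part of the original workflow): a Python
# equivalent of the shell AUC pipeline noted above, reading the tab-separated
# "real_label\tpred_label\tscore" lines that evaluate_model writes out.
def summary_auc(path):
    """Compute AUC from a summary_*.txt file produced by evaluate_model."""
    summary = pd.read_csv(path, sep="\t", names=["true_label", "pred_label", "score"])
    return roc_auc_score(summary["true_label"], summary["score"])
# Example: print(summary_auc("result/summary_20240316.txt"))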