import os
import sys
import json
from sklearn.linear_model import LogisticRegression

sys.path.append(os.getcwd())

import numpy as np
import pandas as pd
import lightgbm as lgb
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score


class LightGBM(object):
    """
    LightGBM binary classifier trained on JSON feature/label dumps.

    Features are read from ``whole_data/x_data_total_return.json`` and labels
    from ``whole_data/y_data_total_return.json``. Both are split into a
    train part and a test part at the same position (``split_c``), so rows
    and labels stay aligned.
    """

    def __init__(self):
        # Encoder used for the categorical (string) columns; it is refitted
        # for every column, so after encoding it only holds the last one.
        self.label_encoder = LabelEncoder()
        # Column names, in the order the values appear in each feature row.
        self.my_c = [
            "uid",
            "type",
            "channel",
            "fans",
            "view_count_user_30days",
            "share_count_user_30days",
            "return_count_user_30days",
            "rov_user",
            "str_user",
            "out_user_id",
            "mode",
            "out_play_cnt",
            "out_like_cnt",
            "out_share_cnt",
            "out_collection_cnt",
        ]
        # Columns that are label-encoded and passed to LightGBM as categorical.
        self.str_columns = ["uid", "type", "channel", "mode", "out_user_id"]
        # Columns that are coerced to numbers (unparseable values become NaN).
        self.float_columns = [
            "fans",
            "view_count_user_30days",
            "share_count_user_30days",
            "return_count_user_30days",
            "rov_user",
            "str_user",
            "out_play_cnt",
            "out_like_cnt",
            "out_share_cnt",
            "out_collection_cnt",
        ]
        # Fraction of the samples used for training; the rest is the test set.
        self.split_c = 0.95
        # Position (as a fraction of the sorted labels) of the value used as
        # the 0/1 threshold: labels above that value become class 1.
        self.yc = 0.8
        # Path the trained model is saved to and loaded from.
        self.model = "lightgbm_train.bin"

    def _split_index(self, total):
        """Return the row index separating the train part from the test part."""
        return int(total * self.split_c)

    def generate_x_data(self):
        """
        Load the feature rows and split them into train and test frames.

        Categorical columns are label-encoded on the complete data set
        *before* splitting, so that a given category maps to the same integer
        code in both frames.

        :return: tuple ``(X_train, X_test)`` of pandas DataFrames
        """
        with open("whole_data/x_data_total_return.json", encoding="utf-8") as f1:
            x_list = json.load(f1)

        features = pd.DataFrame(x_list, columns=self.my_c)
        for key in self.str_columns:
            features[key] = self.label_encoder.fit_transform(features[key])
        for key in self.float_columns:
            features[key] = pd.to_numeric(features[key], errors="coerce")

        index_t = self._split_index(len(features))
        X_train = features.iloc[:index_t].reset_index(drop=True)
        # Re-index from zero so the test frame looks like a standalone frame.
        X_test = features.iloc[index_t:].reset_index(drop=True)
        return X_train, X_test

    def generate_y_data(self):
        """
        Load the raw labels, binarise them and split them like the features.

        The threshold is the value found at position ``len * yc - 1`` of the
        sorted labels; labels less than or equal to it become 0, the others 1.

        :return: tuple ``(y_train, y_test)`` of numpy integer arrays
        """
        with open("whole_data/y_data_total_return.json", encoding="utf-8") as f2:
            y_list = json.load(f2)

        # Must use the same split ratio as generate_x_data, otherwise the
        # feature rows and the labels no longer line up.
        index_t = self._split_index(len(y_list))

        temp = sorted(y_list)
        # Clamp at 0: a computed position of -1 would wrap around to the
        # largest label and turn every sample into class 0.
        threshold_pos = max(int(len(temp) * self.yc) - 1, 0)
        yuzhi = temp[threshold_pos]

        labels = (np.asarray(y_list) > yuzhi).astype(int)
        y_train = labels[:index_t]
        y_test = labels[index_t:]
        return y_train, y_test

    def train_model(self):
        """
        Train the LightGBM model and save it to ``self.model``.

        :return: None
        :raises ValueError: if the feature and label files do not contain the
            same number of samples
        """
        X_train, X_test = self.generate_x_data()
        Y_train, Y_test = self.generate_y_data()
        if len(X_train) != len(Y_train) or len(X_test) != len(Y_test):
            raise ValueError(
                "feature/label size mismatch: "
                f"train {len(X_train)} vs {len(Y_train)}, "
                f"test {len(X_test)} vs {len(Y_test)}"
            )

        train_data = lgb.Dataset(
            X_train,
            label=Y_train,
            categorical_feature=list(self.str_columns),
        )
        test_data = lgb.Dataset(X_test, label=Y_test, reference=train_data)
        params = {
            'objective': 'binary',  # binary classification task
            'metric': 'binary_logloss',  # evaluate with binary log loss
            'num_leaves': 31,  # number of leaves per tree
            'learning_rate': 0.05,  # learning rate
            'bagging_fraction': 0.9,  # fraction of samples used per tree
            'feature_fraction': 0.8,  # fraction of features used per tree
            'bagging_freq': 5,  # k means bagging is performed every k iterations
            'num_threads': 4,  # number of threads
        }

        # Train the model
        num_round = 100
        print("开始训练......")
        bst = lgb.train(params, train_data, num_round, valid_sets=[test_data])
        # Booster.save_model has no ``binary`` argument; passing one raises
        # TypeError, so only the target file name is given.
        bst.save_model(self.model)

    def evaluate_model(self):
        """
        Evaluate the saved model on the held-out test split.

        Predicted probabilities above 0.7 are counted as class 1.

        :return: accuracy of the thresholded predictions on the test split
        """
        _, X_test = self.generate_x_data()
        _, Y_test = self.generate_y_data()

        bst = lgb.Booster(model_file=self.model)
        y_pred = bst.predict(X_test, num_iteration=bst.best_iteration)
        # Convert probabilities to binary predictions
        y_pred_binary = np.where(y_pred > 0.7, 1, 0)
        # Score the model
        accuracy = accuracy_score(Y_test, y_pred_binary)
        print(f'Accuracy: {accuracy}')
        return accuracy


if __name__ == '__main__':
    L = LightGBM()
    L.train_model()