import os
import sys
import json

import numpy as np
import pandas as pd
import lightgbm as lgb
import optuna
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score

sys.path.append(os.getcwd())


class LightGBM(object):
    """
    LightGBM model for binary classification.
    """

    def __init__(self, flag, dt):
        self.label_encoder = LabelEncoder()
        # Feature columns, in the order they appear in the JSON feature files.
        self.my_c = [
            "uid",
            "type",
            "channel",
            "fans",
            "view_count_user_30days",
            "share_count_user_30days",
            "return_count_user_30days",
            "rov_user",
            "str_user",
            "out_user_id",
            "mode",
            "out_play_cnt",
            "out_like_cnt",
            "out_share_cnt",
            "out_collection_cnt",
            "tag1",
            "tag2",
            "tag3",
        ]
        # Categorical (string) features, label-encoded before training.
        self.str_columns = ["uid", "type", "channel", "mode", "out_user_id", "tag1", "tag2", "tag3"]
        # Numeric features, coerced to float before training.
        self.float_columns = [
            "fans",
            "view_count_user_30days",
            "share_count_user_30days",
            "return_count_user_30days",
            "rov_user",
            "str_user",
            "out_play_cnt",
            "out_like_cnt",
            "out_share_cnt",
            "out_collection_cnt",
        ]
        self.split_c = 0.999  # fraction of rows used for training; the rest is the validation split
        self.yc = 0.8  # label-threshold quantile: the top ~20% of targets become the positive class
        self.model = "lightgbm_0326.bin"  # path of the saved model file
        self.flag = flag
        self.dt = dt

    def bays_params(self, trial):
        """
        Optuna objective: sample hyperparameters, train a model, and
        score it on the held-out split.
        :return: validation accuracy
        """
        # Define the search space.
        param = {
            'objective': 'binary',
            'metric': 'binary_logloss',
            'verbosity': -1,
            'boosting_type': 'gbdt',
            'num_leaves': trial.suggest_int('num_leaves', 20, 40),
            'learning_rate': trial.suggest_float('learning_rate', 1e-8, 1.0, log=True),
            'feature_fraction': trial.suggest_float('feature_fraction', 0.4, 1.0),
            'bagging_fraction': trial.suggest_float('bagging_fraction', 0.4, 1.0),
            'bagging_freq': trial.suggest_int('bagging_freq', 1, 7),
            'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
            'num_threads': 16,  # number of threads
        }
        X_train, X_test = self.generate_x_data()
        Y_train, Y_test = self.generate_y_data()
        train_data = lgb.Dataset(
            X_train,
            label=Y_train,
            categorical_feature=self.str_columns,
        )
        test_data = lgb.Dataset(X_test, label=Y_test, reference=train_data)
        gbm = lgb.train(param, train_data, num_boost_round=100, valid_sets=[test_data])
        preds = gbm.predict(X_test)
        pred_labels = np.rint(preds)
        accuracy = accuracy_score(Y_test, pred_labels)
        return accuracy

    def generate_x_data(self):
        """
        Load the feature matrix from JSON and split it into train/test frames.
        :return: (X_train, X_test)
        """
        with open("data/produce_data/x_data_total_return_{}_{}.json".format(self.flag, self.dt)) as f1:
            x_list = json.loads(f1.read())
        index_t = int(len(x_list) * self.split_c)
        X_train = pd.DataFrame(x_list[:index_t], columns=self.my_c)
        for key in self.str_columns:
            X_train[key] = self.label_encoder.fit_transform(X_train[key])
        for key in self.float_columns:
            X_train[key] = pd.to_numeric(X_train[key], errors="coerce")
        X_test = pd.DataFrame(x_list[index_t:], columns=self.my_c)
        for key in self.str_columns:
            X_test[key] = self.label_encoder.fit_transform(X_test[key])
        for key in self.float_columns:
            X_test[key] = pd.to_numeric(X_test[key], errors="coerce")
        return X_train, X_test
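
        # Note: the LabelEncoder above is re-fit separately on the train and test
        # frames, so the same category can map to different integer codes in each.
        # One alternative (a sketch, not in the original code) is to fit one encoder
        # per column on the full data before splitting:
        #     full = pd.DataFrame(x_list, columns=self.my_c)
        #     encoders = {k: LabelEncoder().fit(full[k]) for k in self.str_columns}
        #     X_train[k] = encoders[k].transform(X_train[k])  # likewise for X_test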

    def generate_y_data(self):
        """
        Load the raw targets, binarize them at the `self.yc` quantile,
        and split them into train/test arrays.
        :return: (y_train, y_test)
        """
        with open("data/produce_data/y_data_total_return_{}_{}.json".format(self.flag, self.dt)) as f2:
            y_list = json.loads(f2.read())
        index_t = int(len(y_list) * self.split_c)
        temp = sorted(y_list)
        # `yuzhi` is the threshold: with yc = 0.8, targets above the 80th percentile become class 1.
        yuzhi = temp[int(len(temp) * self.yc) - 1]
        print("Threshold: {}".format(yuzhi))
        y_binary = [0 if i <= yuzhi else 1 for i in y_list]
        y_train = np.array(y_binary[:index_t])
        y_test = np.array(y_binary[index_t:])
        return y_train, y_test

    def train_model(self):
        """
        Train the LightGBM model on the training split and save it to disk.
        :return:
        """
        X_train, X_test = self.generate_x_data()
        Y_train, Y_test = self.generate_y_data()
        train_data = lgb.Dataset(
            X_train,
            label=Y_train,
            categorical_feature=self.str_columns,
        )
        test_data = lgb.Dataset(X_test, label=Y_test, reference=train_data)
        params = {
            "objective": "binary",  # binary classification task
            "metric": "binary_logloss",  # evaluation metric: binary log loss
            "num_leaves": 36,  # number of leaves per tree
            "learning_rate": 0.08479152931388902,  # learning rate
            "bagging_fraction": 0.6588121592044218,  # row-subsampling ratio per tree
            "feature_fraction": 0.4572757903437793,  # feature-subsampling ratio per tree
            "bagging_freq": 2,  # perform bagging every k iterations
            "num_threads": 16,  # number of threads
            "min_child_samples": 71,  # minimum samples per leaf
        }
        # Train the model.
        num_round = 100
        print("Training started......")
        bst = lgb.train(params, train_data, num_round, valid_sets=[test_data])
        bst.save_model(self.model)
        print("Model training finished ✅")

    def evaluate_model(self):
        """
        Evaluate model performance on the held-out prediction set.
        :return:
        """
        fw = open("summary_tag_03{}.txt".format(self.dt), "a+", encoding="utf-8")
        # Test features.
        with open("data/produce_data/x_data_total_return_predict_{}.json".format(self.dt)) as f1:
            x_list = json.loads(f1.read())
        # Test labels.
        with open("data/produce_data/y_data_total_return_predict_{}.json".format(self.dt)) as f2:
            Y_test = json.loads(f2.read())
            Y_test = [0 if i <= 27 else 1 for i in Y_test]  # fixed label threshold for the prediction set
        X_test = pd.DataFrame(x_list, columns=self.my_c)
        for key in self.str_columns:
            X_test[key] = self.label_encoder.fit_transform(X_test[key])
        for key in self.float_columns:
            X_test[key] = pd.to_numeric(X_test[key], errors="coerce")
        bst = lgb.Booster(model_file=self.model)
        y_pred = bst.predict(X_test, num_iteration=bst.best_iteration)
        # Convert scores to binary labels: the top 30% of predicted scores become class 1.
        temp = sorted(list(y_pred))
        yuzhi = temp[int(len(temp) * 0.7) - 1]
        y_pred_binary = [0 if i <= yuzhi else 1 for i in list(y_pred)]
        score_list = []
        for index, item in enumerate(list(y_pred)):
            real_label = Y_test[index]
            score = item
            pred_label = y_pred_binary[index]
            print(real_label, "\t", pred_label, "\t", score)
            fw.write("{}\t{}\t{}\n".format(real_label, pred_label, score))
            score_list.append(score)
        print("Total prediction samples: {}".format(len(score_list)))
        data_series = pd.Series(score_list)
        print("Score statistics:")
        print(data_series.describe())
        # Evaluate the model.
        accuracy = accuracy_score(Y_test, y_pred_binary)
        print(f"Accuracy: {accuracy}")
        fw.close()

    def feature_importance(self):
        """
        Print the importance of each feature, sorted in descending order.
        :return:
        """
        lgb_model = lgb.Booster(model_file=self.model)
        importance = lgb_model.feature_importance(importance_type='split')
        feature_name = lgb_model.feature_name()
        feature_importance = sorted(zip(feature_name, importance), key=lambda x: x[1], reverse=True)
        # Print feature importances.
        for name, imp in feature_importance:
            print(name, imp)
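
        # Optional (a sketch, assuming matplotlib is installed; not part of the
        # original pipeline): lightgbm ships a plotting helper for the same numbers:
        #     import matplotlib.pyplot as plt
        #     lgb.plot_importance(lgb_model, max_num_features=len(feature_name))
        #     plt.show()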


if __name__ == "__main__":
    i = int(input("Enter 1 to train, 2 to predict:\n"))
    if i == 1:
        f = "train"
        dt = "whole"
        L = LightGBM(flag=f, dt=dt)
        L.train_model()
    elif i == 2:
        f = "predict"
        dt = int(input("Enter a date, 16-21:\n"))
        L = LightGBM(flag=f, dt=dt)
        L.evaluate_model()
    # study = optuna.create_study(direction='maximize')
    # study.optimize(L.bays_params, n_trials=100)
    # print('Number of finished trials:', len(study.trials))
    # print('Best trial:', study.best_trial.params)
    # L.train_model()
    # L.evaluate_model()
    # L.feature_importance()