# rov_train.py
  1. import os
  2. import time
  3. import lightgbm as lgb
  4. import pandas as pd
  5. from sklearn.model_selection import train_test_split
  6. from sklearn.metrics import mean_absolute_error, r2_score, mean_absolute_percentage_error
  7. from config import set_config
  8. from utils import read_from_pickle, write_to_pickle, data_normalization, request_post
  9. from log import Log
  10. from db_helper import RedisHelper
# Project-wide configuration object (paths, redis keys, service URLs).
config_ = set_config()
# Shared logger instance used throughout this module.
log_ = Log()
  13. def process_data(filename):
  14. """
  15. 数据清洗、预处理
  16. :param filename: type-DataFrame
  17. :return: x, y, video_ids, features
  18. """
  19. # 获取数据
  20. data = read_from_pickle(filename)
  21. # 获取y,并将 y <= 0 的值更新为1
  22. data['futre7dayreturn'].loc[data['futre7dayreturn'] <= 0] = 1
  23. y = data['futre7dayreturn']
  24. # 获取视频id列
  25. video_ids = data['videoid']
  26. # 获取x
  27. drop_columns = ['videoid', 'dt', 'futre7dayreturn', 'videotags', 'words_without_tags']
  28. x = data.drop(columns=drop_columns)
  29. # 计算后一天的回流比前一天的回流差值
  30. x['stage_four_return_added'] = x['stage_four_retrn'] - x['stage_three_retrn']
  31. x['stage_three_return_added'] = x['stage_three_retrn'] - x['stage_two_retrn']
  32. x['stage_two_return_added'] = x['stage_two_retrn'] - x['stage_one_retrn']
  33. # 计算后一天回流比前一天回流的增长率
  34. x['stage_four_return_ratio'] = x['stage_four_return_added'] / x['stage_four_retrn']
  35. x['stage_three_return_ratio'] = x['stage_three_return_added'] / x['stage_three_retrn']
  36. x['stage_two_return_ratio'] = x['stage_two_return_added'] / x['stage_two_retrn']
  37. # 缺失值填充为0
  38. x.fillna(0)
  39. # 获取当前所使用的特征列表
  40. features = list(x)
  41. return x, y, video_ids, features
  42. def train(x, y, features):
  43. """
  44. 训练模型
  45. :param x: X
  46. :param y: Y
  47. :param features: 特征列表
  48. :return: None
  49. """
  50. # 训练集、测试集分割
  51. x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.33)
  52. log_.info('x_train shape: {}, y_train shape: {}'.format(x_train.shape, y_train.shape))
  53. log_.info('x_test shape: {}, y_test shape: {}'.format(x_test.shape, y_test.shape))
  54. # 训练参数设置
  55. params = {
  56. "objective": "regression",
  57. "reg_sqrt": True,
  58. "metric": "mape",
  59. "max_depth": -1,
  60. "num_leaves": 50,
  61. "learning_rate": 0.1,
  62. "bagging_fraction": 0.7,
  63. "feature_fraction": 0.7,
  64. "bagging_freq": 8,
  65. "bagging_seed": 2018,
  66. "lambda_l1": 0.11,
  67. "boosting": "dart",
  68. "nthread": 4,
  69. "verbosity": -1
  70. }
  71. # 初始化数据集
  72. train_set = lgb.Dataset(data=x_train, label=y_train)
  73. test_set = lgb.Dataset(data=x_test, label=y_test)
  74. # 模型训练
  75. evals_result = {}
  76. model = lgb.train(params=params, train_set=train_set, num_boost_round=5000,
  77. valid_sets=[test_set], early_stopping_rounds=100,
  78. verbose_eval=100, evals_result=evals_result)
  79. # 将模型特征重要度存入csv
  80. feature_importance_data = {'feature': features, 'feature_importance': model.feature_importance()}
  81. feature_importance_filename = 'model_feature_importance.csv'
  82. pack_result_to_csv(filename=feature_importance_filename, sort_columns=['feature_importance'],
  83. ascending=False, **feature_importance_data)
  84. # 测试集预测
  85. pre_y_test = model.predict(data=x_test, num_iteration=model.best_iteration)
  86. y_test = y_test.values
  87. err_mape = mean_absolute_percentage_error(y_test, pre_y_test)
  88. r2 = r2_score(y_test, pre_y_test)
  89. # 将测试集结果存入csv
  90. test_data = {'pre_y_test': pre_y_test, 'y_test': y_test}
  91. test_result_filename = 'test_result.csv'
  92. pack_result_to_csv(filename=test_result_filename, sort_columns=['pre_y_test'], ascending=False, **test_data)
  93. log_.info('err_mape={}, r2={}'.format(err_mape, r2))
  94. # 保存模型
  95. write_to_pickle(data=model, filename=config_.MODEL_FILENAME)
  96. def pack_result_to_csv(filename, sort_columns=None, filepath=config_.DATA_DIR_PATH, ascending=True, **data):
  97. """
  98. 打包数据并存入csv
  99. :param filename: csv文件名
  100. :param sort_columns: 指定排序列名列名,type-list, 默认为None
  101. :param filepath: csv文件存放路径,默认为config_.DATA_DIR_PATH
  102. :param ascending: 是否按指定列的数组升序排列,默认为True,即升序排列
  103. :param data: 数据
  104. :return: None
  105. """
  106. if not os.path.exists(filepath):
  107. os.makedirs(filepath)
  108. file = os.path.join(filepath, filename)
  109. df = pd.DataFrame(data=data)
  110. if sort_columns:
  111. df = df.sort_values(by=sort_columns, ascending=ascending)
  112. df.to_csv(file, index=False)
  113. def predict():
  114. """预测"""
  115. # 读取预测数据并进行清洗
  116. x, y, video_ids, _ = process_data(config_.PREDICT_DATA_FILENAME)
  117. log_.info('predict data shape: x={}'.format(x.shape))
  118. # 获取训练好的模型
  119. model = read_from_pickle(filename=config_.MODEL_FILENAME)
  120. # 预测
  121. y_ = model.predict(x)
  122. log_.info('predict finished!')
  123. # 将结果进行归一化到[0, 100]
  124. normal_y_ = data_normalization(list(y_))
  125. log_.info('normalization finished!')
  126. # 打包预测结果存入csv
  127. predict_data = {'normal_y_': normal_y_, 'y_': y_, 'y': y, 'video_ids': video_ids}
  128. predict_result_filename = 'predict.csv'
  129. pack_result_to_csv(filename=predict_result_filename, sort_columns=['normal_y_'], ascending=False, **predict_data)
  130. # 上传redis
  131. redis_data = {}
  132. json_data = []
  133. for i in range(len(video_ids)):
  134. redis_data[video_ids[i]] = normal_y_[i]
  135. json_data.append({'videoId': video_ids[i], 'rovScore': normal_y_[i]})
  136. key_name = config_.RECALL_KEY_NAME_PREFIX + '{}.{}'.format(config_.APP_TYPE['VLOG'], time.strftime('%Y%m%d'))
  137. redis_helper = RedisHelper()
  138. redis_helper.add_data_with_zset(key_name=key_name, data=redis_data)
  139. log_.info('data to redis finished!')
  140. # 通知后端更新数据
  141. result = request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': json_data})
  142. if result['code'] == 0:
  143. log_.info('notify backend success!')
  144. else:
  145. log_.error('notify backend fail!')
  146. if __name__ == '__main__':
  147. log_.info('rov model train start...')
  148. train_start = time.time()
  149. train_filename = config_.TRAIN_DATA_FILENAME
  150. X, Y, videos, fea = process_data(filename=train_filename)
  151. log_.info('X_shape = {}, Y_sahpe = {}'.format(X.shape, Y.shape))
  152. train(X, Y, features=fea)
  153. train_end = time.time()
  154. log_.info('rov model train end, execute time = {}ms'.format((train_end - train_start)*1000))
  155. log_.info('rov model predict start...')
  156. predict_start = time.time()
  157. predict()
  158. predict_end = time.time()
  159. log_.info('rov model predict end, execute time = {}ms'.format((predict_end - predict_start)*1000))