# rov_train.py
  1. import os
  2. import random
  3. import time
  4. import traceback
  5. import lightgbm as lgb
  6. import pandas as pd
  7. from sklearn.model_selection import train_test_split
  8. from sklearn.metrics import mean_absolute_error, r2_score, mean_absolute_percentage_error
  9. from config import set_config
  10. from utils import read_from_pickle, write_to_pickle, data_normalization, \
  11. request_post, filter_video_status, send_msg_to_feishu
  12. from log import Log
  13. from db_helper import RedisHelper, MysqlHelper
# Module-level singletons shared by every function below:
# environment-specific configuration and the project logger.
config_ = set_config()
log_ = Log()
  16. def process_data(filename):
  17. """
  18. 数据清洗、预处理
  19. :param filename: type-DataFrame
  20. :return: x, y, video_ids, features
  21. """
  22. # 获取数据
  23. data = read_from_pickle(filename)
  24. # 获取y,并将 y <= 0 的值更新为1
  25. data['futre7dayreturn'].loc[data['futre7dayreturn'] <= 0] = 1
  26. y = data['futre7dayreturn']
  27. # 获取视频id列
  28. video_ids = data['videoid']
  29. # 获取x
  30. drop_columns = ['videoid', 'dt', 'futre7dayreturn', 'videotags', 'words_without_tags']
  31. x = data.drop(columns=drop_columns)
  32. # 计算后一天的回流比前一天的回流差值
  33. x['stage_four_return_added'] = x['stage_four_retrn'] - x['stage_three_retrn']
  34. x['stage_three_return_added'] = x['stage_three_retrn'] - x['stage_two_retrn']
  35. x['stage_two_return_added'] = x['stage_two_retrn'] - x['stage_one_retrn']
  36. # 计算后一天回流比前一天回流的增长率
  37. x['stage_four_return_ratio'] = x['stage_four_return_added'] / x['stage_four_retrn']
  38. x['stage_three_return_ratio'] = x['stage_three_return_added'] / x['stage_three_retrn']
  39. x['stage_two_return_ratio'] = x['stage_two_return_added'] / x['stage_two_retrn']
  40. # 缺失值填充为0
  41. x.fillna(0)
  42. # 获取当前所使用的特征列表
  43. features = list(x)
  44. return x, y, video_ids, features
  45. def train(x, y, features):
  46. """
  47. 训练模型
  48. :param x: X
  49. :param y: Y
  50. :param features: 特征列表
  51. :return: None
  52. """
  53. # 训练集、测试集分割
  54. x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.33)
  55. log_.info('x_train shape: {}, y_train shape: {}'.format(x_train.shape, y_train.shape))
  56. log_.info('x_test shape: {}, y_test shape: {}'.format(x_test.shape, y_test.shape))
  57. # 训练参数设置
  58. params = {
  59. "objective": "regression",
  60. "reg_sqrt": True,
  61. "metric": "mape",
  62. "max_depth": -1,
  63. "num_leaves": 50,
  64. "learning_rate": 0.1,
  65. "bagging_fraction": 0.7,
  66. "feature_fraction": 0.7,
  67. "bagging_freq": 8,
  68. "bagging_seed": 2018,
  69. "lambda_l1": 0.11,
  70. "boosting": "dart",
  71. "nthread": 4,
  72. "verbosity": -1
  73. }
  74. # 初始化数据集
  75. train_set = lgb.Dataset(data=x_train, label=y_train)
  76. test_set = lgb.Dataset(data=x_test, label=y_test)
  77. # 模型训练
  78. evals_result = {}
  79. model = lgb.train(params=params, train_set=train_set, num_boost_round=5000,
  80. valid_sets=[test_set], early_stopping_rounds=100,
  81. verbose_eval=100, evals_result=evals_result)
  82. # 将模型特征重要度存入csv
  83. feature_importance_data = {'feature': features, 'feature_importance': model.feature_importance()}
  84. feature_importance_filename = 'model_feature_importance.csv'
  85. pack_result_to_csv(filename=feature_importance_filename, sort_columns=['feature_importance'],
  86. ascending=False, **feature_importance_data)
  87. # 测试集预测
  88. pre_y_test = model.predict(data=x_test, num_iteration=model.best_iteration)
  89. y_test = y_test.values
  90. err_mape = mean_absolute_percentage_error(y_test, pre_y_test)
  91. r2 = r2_score(y_test, pre_y_test)
  92. # 将测试集结果存入csv
  93. test_data = {'pre_y_test': pre_y_test, 'y_test': y_test}
  94. test_result_filename = 'test_result.csv'
  95. pack_result_to_csv(filename=test_result_filename, sort_columns=['pre_y_test'], ascending=False, **test_data)
  96. log_.info('err_mape={}, r2={}'.format(err_mape, r2))
  97. # 保存模型
  98. write_to_pickle(data=model, filename=config_.MODEL_FILENAME)
  99. def pack_result_to_csv(filename, sort_columns=None, filepath=config_.DATA_DIR_PATH, ascending=True, **data):
  100. """
  101. 打包数据并存入csv
  102. :param filename: csv文件名
  103. :param sort_columns: 指定排序列名列名,type-list, 默认为None
  104. :param filepath: csv文件存放路径,默认为config_.DATA_DIR_PATH
  105. :param ascending: 是否按指定列的数组升序排列,默认为True,即升序排列
  106. :param data: 数据
  107. :return: None
  108. """
  109. if not os.path.exists(filepath):
  110. os.makedirs(filepath)
  111. file = os.path.join(filepath, filename)
  112. df = pd.DataFrame(data=data)
  113. if sort_columns:
  114. df = df.sort_values(by=sort_columns, ascending=ascending)
  115. df.to_csv(file, index=False)
  116. def predict():
  117. """预测"""
  118. # 读取预测数据并进行清洗
  119. x, y, video_ids, _ = process_data(config_.PREDICT_DATA_FILENAME)
  120. log_.info('predict data shape: x={}'.format(x.shape))
  121. # 获取训练好的模型
  122. model = read_from_pickle(filename=config_.MODEL_FILENAME)
  123. # 预测
  124. y_ = model.predict(x)
  125. log_.info('predict finished!')
  126. # 将结果进行归一化到[0, 100]
  127. normal_y_ = data_normalization(list(y_))
  128. log_.info('normalization finished!')
  129. # 打包预测结果存入csv
  130. predict_data = {'normal_y_': normal_y_, 'y_': y_, 'y': y, 'video_ids': video_ids}
  131. predict_result_filename = 'predict.csv'
  132. pack_result_to_csv(filename=predict_result_filename, sort_columns=['normal_y_'], ascending=False, **predict_data)
  133. # 上传redis
  134. redis_data = {}
  135. json_data = []
  136. for i in range(len(video_ids)):
  137. redis_data[video_ids[i]] = normal_y_[i]
  138. json_data.append({'videoId': video_ids[i], 'rovScore': normal_y_[i]})
  139. key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
  140. redis_helper = RedisHelper()
  141. redis_helper.add_data_with_zset(key_name=key_name, data=redis_data)
  142. log_.info('data to redis finished!')
  143. # 通知后端更新数据
  144. # result = request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': json_data})
  145. # if result['code'] == 0:
  146. # log_.info('notify backend success!')
  147. # else:
  148. # log_.error('notify backend fail!')
  149. def predict_test():
  150. """测试环境数据生成"""
  151. # 获取测试环境中最近发布的40000条视频
  152. mysql_info = {
  153. 'host': 'rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com',
  154. 'port': 3306,
  155. 'user': 'wx2016_longvideo',
  156. 'password': 'wx2016_longvideoP@assword1234',
  157. 'db': 'longvideo'
  158. }
  159. sql = "SELECT id FROM wx_video ORDER BY id DESC LIMIT 40000;"
  160. mysql_helper = MysqlHelper(mysql_info=mysql_info)
  161. data = mysql_helper.get_data(sql=sql)
  162. video_ids = [video[0] for video in data]
  163. # 视频状态过滤
  164. filtered_videos = filter_video_status(video_ids)
  165. log_.info('filtered_videos nums={}'.format(len(filtered_videos)))
  166. # 随机生成 0-100 数作为分数
  167. redis_data = {}
  168. json_data = []
  169. for video_id in filtered_videos:
  170. score = random.uniform(0, 100)
  171. redis_data[video_id] = score
  172. json_data.append({'videoId': video_id, 'rovScore': score})
  173. # 上传Redis
  174. redis_helper = RedisHelper()
  175. key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
  176. redis_helper.add_data_with_zset(key_name=key_name, data=redis_data)
  177. log_.info('test data to redis finished!')
  178. # 通知后端更新数据
  179. result = request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': json_data})
  180. if result['code'] == 0:
  181. log_.info('notify backend success!')
  182. else:
  183. log_.error('notify backend fail!')
  184. if __name__ == '__main__':
  185. try:
  186. log_.info('rov model train start...')
  187. train_start = time.time()
  188. train_filename = config_.TRAIN_DATA_FILENAME
  189. X, Y, videos, fea = process_data(filename=train_filename)
  190. log_.info('X_shape = {}, Y_sahpe = {}'.format(X.shape, Y.shape))
  191. train(X, Y, features=fea)
  192. train_end = time.time()
  193. log_.info('rov model train end, execute time = {}ms'.format((train_end - train_start)*1000))
  194. log_.info('rov model predict start...')
  195. predict_start = time.time()
  196. predict()
  197. predict_end = time.time()
  198. log_.info('rov model predict end, execute time = {}ms'.format((predict_end - predict_start)*1000))
  199. except Exception as e:
  200. log_.error('ROV召回池更新失败, exception: {}, traceback: {}'.format(e, traceback.format_exc()))
  201. send_msg_to_feishu('rov-offline生产环境 - ROV召回池更新失败, exception: {}'.format(e))