rov_train.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. import os
  2. import random
  3. import time
  4. import lightgbm as lgb
  5. import pandas as pd
  6. from sklearn.model_selection import train_test_split
  7. from sklearn.metrics import mean_absolute_error, r2_score, mean_absolute_percentage_error
  8. from config import set_config
  9. from utils import read_from_pickle, write_to_pickle, data_normalization, \
  10. request_post, filter_video_status, update_video_w_h_rate
  11. from log import Log
  12. from db_helper import RedisHelper, MysqlHelper
# Module-level setup: environment-specific config and shared logger,
# used by every function below.
config_, env = set_config()
log_ = Log()
  15. def process_data(filename):
  16. """
  17. 数据清洗、预处理
  18. :param filename: type-DataFrame
  19. :return: x, y, video_ids, features
  20. """
  21. # 获取数据
  22. data = read_from_pickle(filename)
  23. # 获取y,并将 y <= 0 的值更新为1
  24. data['futre7dayreturn'].loc[data['futre7dayreturn'] <= 0] = 1
  25. y = data['futre7dayreturn']
  26. # 获取视频id列
  27. video_ids = data['videoid']
  28. # 获取x
  29. drop_columns = ['videoid', 'dt', 'futre7dayreturn', 'videotags', 'words_without_tags']
  30. x = data.drop(columns=drop_columns)
  31. # 计算后一天的回流比前一天的回流差值
  32. x['stage_four_return_added'] = x['stage_four_retrn'] - x['stage_three_retrn']
  33. x['stage_three_return_added'] = x['stage_three_retrn'] - x['stage_two_retrn']
  34. x['stage_two_return_added'] = x['stage_two_retrn'] - x['stage_one_retrn']
  35. # 计算后一天回流比前一天回流的增长率
  36. x['stage_four_return_ratio'] = x['stage_four_return_added'] / x['stage_four_retrn']
  37. x['stage_three_return_ratio'] = x['stage_three_return_added'] / x['stage_three_retrn']
  38. x['stage_two_return_ratio'] = x['stage_two_return_added'] / x['stage_two_retrn']
  39. # 缺失值填充为0
  40. x.fillna(0)
  41. # 获取当前所使用的特征列表
  42. features = list(x)
  43. return x, y, video_ids, features
  44. def process_predict_data(filename):
  45. """
  46. 预测数据清洗、预处理
  47. :param filename: type-DataFrame
  48. :return: x, y, video_ids, features
  49. """
  50. # 获取数据
  51. data = read_from_pickle(filename)
  52. print(len(data))
  53. # 获取视频id列
  54. video_ids = data['videoid']
  55. # 视频状态过滤
  56. video_id_list = [int(video_id) for video_id in video_ids]
  57. filtered_videos = [str(item) for item in filter_video_status(video_ids=video_id_list)]
  58. data = data.loc[data['videoid'].isin(filtered_videos)]
  59. print(len(data))
  60. video_id_final = data['videoid']
  61. # 获取x
  62. drop_columns = ['videoid', 'dt', 'futre7dayreturn', 'videotags', 'words_without_tags']
  63. x = data.drop(columns=drop_columns)
  64. # 计算后一天的回流比前一天的回流差值
  65. x['stage_four_return_added'] = x['stage_four_retrn'] - x['stage_three_retrn']
  66. x['stage_three_return_added'] = x['stage_three_retrn'] - x['stage_two_retrn']
  67. x['stage_two_return_added'] = x['stage_two_retrn'] - x['stage_one_retrn']
  68. # 计算后一天回流比前一天回流的增长率
  69. x['stage_four_return_ratio'] = x['stage_four_return_added'] / x['stage_four_retrn']
  70. x['stage_three_return_ratio'] = x['stage_three_return_added'] / x['stage_three_retrn']
  71. x['stage_two_return_ratio'] = x['stage_two_return_added'] / x['stage_two_retrn']
  72. # 缺失值填充为0
  73. x.fillna(0)
  74. return x, video_id_final
def train(x, y, features):
    """
    Train the LightGBM ROV regression model and persist it.

    Writes two CSV reports (feature importance, test-set predictions) via
    pack_result_to_csv and pickles the trained model to
    config_.MODEL_FILENAME.

    :param x: feature DataFrame
    :param y: target Series
    :param features: list of feature names (used for the importance report)
    :return: None
    """
    # Train/test split. No random_state is set, so the split (and hence
    # the reported metrics) differs between runs.
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.33)
    log_.info('x_train shape: {}, y_train shape: {}'.format(x_train.shape, y_train.shape))
    log_.info('x_test shape: {}, y_test shape: {}'.format(x_test.shape, y_test.shape))
    # Training parameters: DART boosting, MAPE metric, sqrt-transformed
    # regression target (reg_sqrt) to damp large return values.
    params = {
        "objective": "regression",
        "reg_sqrt": True,
        "metric": "mape",
        "max_depth": -1,
        "num_leaves": 50,
        "learning_rate": 0.1,
        "bagging_fraction": 0.7,
        "feature_fraction": 0.7,
        "bagging_freq": 8,
        "bagging_seed": 2018,
        "lambda_l1": 0.11,
        "boosting": "dart",
        "nthread": 4,
        "verbosity": -1
    }
    # Wrap the splits in LightGBM datasets
    train_set = lgb.Dataset(data=x_train, label=y_train)
    test_set = lgb.Dataset(data=x_test, label=y_test)
    # Train with early stopping on the held-out set.
    # NOTE(review): early_stopping_rounds/verbose_eval/evals_result keyword
    # arguments were removed in lightgbm 4.x (replaced by callbacks) —
    # this code assumes a 3.x-era lightgbm; confirm the pinned version.
    evals_result = {}
    model = lgb.train(params=params, train_set=train_set, num_boost_round=5000,
                      valid_sets=[test_set], early_stopping_rounds=100,
                      verbose_eval=100, evals_result=evals_result)
    # Persist feature importance to CSV, most important first
    feature_importance_data = {'feature': features, 'feature_importance': model.feature_importance()}
    feature_importance_filename = 'model_feature_importance.csv'
    pack_result_to_csv(filename=feature_importance_filename, sort_columns=['feature_importance'],
                       ascending=False, **feature_importance_data)
    # Evaluate on the test split using the best (early-stopped) iteration
    pre_y_test = model.predict(data=x_test, num_iteration=model.best_iteration)
    y_test = y_test.values
    err_mape = mean_absolute_percentage_error(y_test, pre_y_test)
    r2 = r2_score(y_test, pre_y_test)
    # Persist test-set predictions to CSV for inspection
    test_data = {'pre_y_test': pre_y_test, 'y_test': y_test}
    test_result_filename = 'test_result.csv'
    pack_result_to_csv(filename=test_result_filename, sort_columns=['pre_y_test'], ascending=False, **test_data)
    log_.info('err_mape={}, r2={}'.format(err_mape, r2))
    # Save the trained model
    write_to_pickle(data=model, filename=config_.MODEL_FILENAME)
  129. def pack_result_to_csv(filename, sort_columns=None, filepath=config_.DATA_DIR_PATH, ascending=True, **data):
  130. """
  131. 打包数据并存入csv
  132. :param filename: csv文件名
  133. :param sort_columns: 指定排序列名列名,type-list, 默认为None
  134. :param filepath: csv文件存放路径,默认为config_.DATA_DIR_PATH
  135. :param ascending: 是否按指定列的数组升序排列,默认为True,即升序排列
  136. :param data: 数据, type-dict
  137. :return: None
  138. """
  139. if not os.path.exists(filepath):
  140. os.makedirs(filepath)
  141. file = os.path.join(filepath, filename)
  142. df = pd.DataFrame(data=data)
  143. if sort_columns:
  144. df = df.sort_values(by=sort_columns, ascending=ascending)
  145. df.to_csv(file, index=False)
  146. def pack_list_result_to_csv(filename, data, columns=None, sort_columns=None, filepath=config_.DATA_DIR_PATH, ascending=True):
  147. """
  148. 打包数据并存入csv, 数据为字典列表
  149. :param filename: csv文件名
  150. :param data: 数据,type-list [{}, {},...]
  151. :param columns: 列名顺序
  152. :param sort_columns: 指定排序列名列名,type-list, 默认为None
  153. :param filepath: csv文件存放路径,默认为config_.DATA_DIR_PATH
  154. :param ascending: 是否按指定列的数组升序排列,默认为True,即升序排列
  155. :return: None
  156. """
  157. if not os.path.exists(filepath):
  158. os.makedirs(filepath)
  159. file = os.path.join(filepath, filename)
  160. df = pd.DataFrame(data=data)
  161. if sort_columns:
  162. df = df.sort_values(by=sort_columns, ascending=ascending)
  163. df.to_csv(file, index=False, columns=columns)
def predict():
    """
    Run prediction: load the trained model, score videos, rank them, and
    export rovScore results to CSV (Redis/backend upload is disabled).
    """
    # Load and clean the prediction data
    x, video_ids = process_predict_data(config_.PREDICT_DATA_FILENAME)
    log_.info('predict data shape: x={}'.format(x.shape))
    # Load the trained model
    model = read_from_pickle(filename=config_.MODEL_FILENAME)
    # Predict raw scores
    y_ = model.predict(x)
    log_.info('predict finished!')
    # Normalize predictions into [0, 100]
    normal_y_ = data_normalization(list(y_))
    log_.info('normalization finished!')
    # Pair each video with its normalized and raw score, then sort by
    # normalized score, descending
    predict_data = []
    for i, video_id in enumerate(video_ids):
        data = {'video_id': video_id, 'normal_y_': normal_y_[i], 'y_': y_[i]}
        predict_data.append(data)
    predict_data_sorted = sorted(predict_data, key=lambda temp: temp['normal_y_'], reverse=True)
    # Assign rovScore by rank: start at 100 and decrease by a fixed step
    # (config_.ROV_SCORE_D) per position
    predict_result = []
    redis_data = {}
    json_data = []
    video_id_list = []
    for j, item in enumerate(predict_data_sorted):
        video_id = int(item['video_id'])
        rov_score = 100 - j * config_.ROV_SCORE_D
        item['rov_score'] = rov_score
        predict_result.append(item)
        redis_data[video_id] = rov_score
        json_data.append({'videoId': video_id, 'rovScore': rov_score})
        video_id_list.append(video_id)
    # Export the ranked predictions to CSV
    predict_result_filename = 'predict.csv'
    pack_list_result_to_csv(filename=predict_result_filename,
                            data=predict_result,
                            columns=['video_id', 'rov_score', 'normal_y_', 'y_'],
                            sort_columns=['rov_score'],
                            ascending=False)
    # The Redis upload + backend notification below is disabled (kept as a
    # string literal, preserved verbatim)
    """
    # 上传redis
    key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
    redis_helper = RedisHelper()
    redis_helper.add_data_with_zset(key_name=key_name, data=redis_data)
    log_.info('data to redis finished!')
    # 清空修改ROV的视频数据
    redis_helper.del_keys(key_name=config_.UPDATE_ROV_KEY_NAME)
    # 通知后端更新数据
    log_.info('json_data count = {}'.format(len(json_data)))
    result = request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': json_data})
    if result['code'] == 0:
        log_.info('notify backend success!')
    else:
        log_.error('notify backend fail!')
    """
    # ##### offline / disabled
    # # Update video width/height ratio data
    # if video_id_list:
    #     update_video_w_h_rate(video_ids=video_id_list,
    #                           key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'])
    #     log_.info('update video w_h_rate to redis finished!')
def predict_test():
    """
    Generate test-environment data: assign random ROV scores to recently
    published videos, upload to Redis, and notify the backend.
    """
    # Latest 40000 videos in the test environment
    sql = "SELECT id FROM wx_video ORDER BY id DESC LIMIT 40000;"
    mysql_helper = MysqlHelper()
    data = mysql_helper.get_data(sql=sql)
    video_ids = [video[0] for video in data]
    # Filter out videos whose status disallows recall
    filtered_videos = filter_video_status(video_ids)
    log_.info('filtered_videos count = {}'.format(len(filtered_videos)))
    # Assign a random score in [0, 100] to each remaining video
    redis_data = {}
    json_data = []
    for video_id in filtered_videos:
        score = random.uniform(0, 100)
        redis_data[video_id] = score
        json_data.append({'videoId': video_id, 'rovScore': score})
    log_.info('json_data count = {}'.format(len(json_data)))
    # Upload scores to Redis under a date-stamped recall key
    redis_helper = RedisHelper()
    key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
    redis_helper.add_data_with_zset(key_name=key_name, data=redis_data)
    log_.info('test data to redis finished!')
    # Clear the modified-ROV video data
    redis_helper.del_keys(key_name=config_.UPDATE_ROV_KEY_NAME)
    # Notify the backend to refresh its scores
    result = request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': json_data})
    if result['code'] == 0:
        log_.info('notify backend success!')
    else:
        log_.error('notify backend fail!')
    # ##### offline / disabled
    # # Update video width/height ratio data
    # if filtered_videos:
    #     update_video_w_h_rate(video_ids=filtered_videos,
    #                           key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'])
    #     log_.info('update video w_h_rate to redis finished!')
if __name__ == '__main__':
    # Script entry point: currently runs prediction only. The training
    # section is disabled (kept as a string literal, preserved verbatim).
    """
    log_.info('rov model train start...')
    train_start = time.time()
    train_filename = config_.TRAIN_DATA_FILENAME
    X, Y, videos, fea = process_data(filename=train_filename)
    log_.info('X_shape = {}, Y_sahpe = {}'.format(X.shape, Y.shape))
    train(X, Y, features=fea)
    train_end = time.time()
    log_.info('rov model train end, execute time = {}ms'.format((train_end - train_start)*1000))
    """
    log_.info('rov model predict start...')
    predict_start = time.time()
    predict()
    # Disabled env-based dispatch (predict() is called unconditionally above)
    # if env in ['dev', 'test']:
    #     predict_test()
    # elif env in ['pre', 'pro']:
    #     predict()
    # else:
    #     log_.error('env error')
    predict_end = time.time()
    log_.info('rov model predict end, execute time = {}ms'.format((predict_end - predict_start)*1000))