import os
import random
import time

import lightgbm as lgb
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_absolute_percentage_error

from config import set_config
from utils import read_from_pickle, write_to_pickle, data_normalization, request_post, filter_video_status
from log import Log
from db_helper import RedisHelper, MysqlHelper

config_ = set_config()
log_ = Log()


def process_data(filename):
    """
    Clean and preprocess the data.
    :param filename: pickle file holding a DataFrame
    :return: x, y, video_ids, features
    """
    # Load the data
    data = read_from_pickle(filename)
    # Extract y, updating values <= 0 to 1
    data.loc[data['futre7dayreturn'] <= 0, 'futre7dayreturn'] = 1
    y = data['futre7dayreturn']
    # Keep the video id column
    video_ids = data['videoid']
    # Build x by dropping the non-feature columns
    drop_columns = ['videoid', 'dt', 'futre7dayreturn', 'videotags', 'words_without_tags']
    x = data.drop(columns=drop_columns)
    # Difference between each stage's return count and the previous stage's
    x['stage_four_return_added'] = x['stage_four_retrn'] - x['stage_three_retrn']
    x['stage_three_return_added'] = x['stage_three_retrn'] - x['stage_two_retrn']
    x['stage_two_return_added'] = x['stage_two_retrn'] - x['stage_one_retrn']
    # Growth rate of each stage's return count over the previous stage's
    x['stage_four_return_ratio'] = x['stage_four_return_added'] / x['stage_four_retrn']
    x['stage_three_return_ratio'] = x['stage_three_return_added'] / x['stage_three_retrn']
    x['stage_two_return_ratio'] = x['stage_two_return_added'] / x['stage_two_retrn']
    # Replace inf from zero denominators, then fill missing values with 0
    x = x.replace([np.inf, -np.inf], np.nan).fillna(0)
    # Feature list currently in use
    features = list(x.columns)
    return x, y, video_ids, features


def train(x, y, features):
    """
    Train the model.
    :param x: X
    :param y: Y
    :param features: feature list
    :return: None
    """
    # Train/test split
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.33)
    log_.info('x_train shape: {}, y_train shape: {}'.format(x_train.shape, y_train.shape))
    log_.info('x_test shape: {}, y_test shape: {}'.format(x_test.shape, y_test.shape))

    # Training parameters
    params = {
        "objective": "regression",
        "reg_sqrt": True,
        "metric": "mape",
        "max_depth": -1,
        "num_leaves": 50,
        "learning_rate": 0.1,
        "bagging_fraction": 0.7,
        "feature_fraction": 0.7,
        "bagging_freq": 8,
        "bagging_seed": 2018,
        "lambda_l1": 0.11,
        "boosting": "dart",
        "nthread": 4,
        "verbosity": -1
    }

    # Build the datasets
    train_set = lgb.Dataset(data=x_train, label=y_train)
    test_set = lgb.Dataset(data=x_test, label=y_test)

    # Train the model
    evals_result = {}
    model = lgb.train(params=params, train_set=train_set, num_boost_round=5000,
                      valid_sets=[test_set], early_stopping_rounds=100,
                      verbose_eval=100, evals_result=evals_result)

    # Write the model's feature importance to csv
    feature_importance_data = {'feature': features, 'feature_importance': model.feature_importance()}
    feature_importance_filename = 'model_feature_importance.csv'
    pack_result_to_csv(filename=feature_importance_filename, sort_columns=['feature_importance'],
                       ascending=False, **feature_importance_data)

    # Predict on the test set
    pre_y_test = model.predict(data=x_test, num_iteration=model.best_iteration)
    y_test = y_test.values
    err_mape = mean_absolute_percentage_error(y_test, pre_y_test)
    r2 = r2_score(y_test, pre_y_test)

    # Write the test-set results to csv
    test_data = {'pre_y_test': pre_y_test, 'y_test': y_test}
    test_result_filename = 'test_result.csv'
    pack_result_to_csv(filename=test_result_filename, sort_columns=['pre_y_test'],
                       ascending=False, **test_data)
    log_.info('err_mape={}, r2={}'.format(err_mape, r2))

    # Save the model
    write_to_pickle(data=model, filename=config_.MODEL_FILENAME)
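
# Version note: the lgb.train() call above uses the pre-4.0 LightGBM API.
# Under LightGBM >= 4.0, where early_stopping_rounds, verbose_eval and
# evals_result were removed from lgb.train() in favor of callbacks, a rough
# equivalent would be:
#
#     model = lgb.train(params=params, train_set=train_set, num_boost_round=5000,
#                       valid_sets=[test_set],
#                       callbacks=[lgb.early_stopping(stopping_rounds=100),
#                                  lgb.log_evaluation(period=100),
#                                  lgb.record_evaluation(evals_result)])
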
def pack_result_to_csv(filename, sort_columns=None, filepath=config_.DATA_DIR_PATH, ascending=True, **data):
    """
    Pack data and write it to csv.
    :param filename: csv file name
    :param sort_columns: column names to sort by, type-list, default None
    :param filepath: directory for the csv file, default config_.DATA_DIR_PATH
    :param ascending: whether to sort the specified columns in ascending order, default True
    :param data: data
    :return: None
    """
    if not os.path.exists(filepath):
        os.makedirs(filepath)
    file = os.path.join(filepath, filename)
    df = pd.DataFrame(data=data)
    if sort_columns:
        df = df.sort_values(by=sort_columns, ascending=ascending)
    df.to_csv(file, index=False)


def predict():
    """Predict."""
    # Load and clean the prediction data
    x, y, video_ids, _ = process_data(config_.PREDICT_DATA_FILENAME)
    log_.info('predict data shape: x={}'.format(x.shape))
    # Load the trained model
    model = read_from_pickle(filename=config_.MODEL_FILENAME)
    # Predict
    y_ = model.predict(x)
    log_.info('predict finished!')
    # Normalize the results to [0, 100]
    normal_y_ = data_normalization(list(y_))
    log_.info('normalization finished!')
    # Pack the prediction results into csv
    predict_data = {'normal_y_': normal_y_, 'y_': y_, 'y': y, 'video_ids': video_ids}
    predict_result_filename = 'predict.csv'
    pack_result_to_csv(filename=predict_result_filename, sort_columns=['normal_y_'],
                       ascending=False, **predict_data)
    # Upload to redis
    redis_data = {}
    json_data = []
    for i in range(len(video_ids)):
        redis_data[video_ids[i]] = normal_y_[i]
        json_data.append({'videoId': video_ids[i], 'rovScore': normal_y_[i]})
    key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
    redis_helper = RedisHelper()
    redis_helper.add_data_with_zset(key_name=key_name, data=redis_data)
    log_.info('data to redis finished!')
    # Notify the backend to refresh its data
    result = request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL,
                          request_data={'videos': json_data})
    if result['code'] == 0:
        log_.info('notify backend success!')
    else:
        log_.error('notify backend fail!')


def predict_test():
    """Generate data for the test environment."""
    # Fetch the 40000 most recently published videos in the test environment
    mysql_info = {
        'host': 'rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com',
        'port': 3306,
        'user': 'wx2016_longvideo',
        'password': 'wx2016_longvideoP@assword1234',
        'db': 'longvideo'
    }
    sql = "SELECT id FROM wx_video ORDER BY id DESC LIMIT 40000;"
    mysql_helper = MysqlHelper(mysql_info=mysql_info)
    data = mysql_helper.get_data(sql=sql)
    video_ids = [video[0] for video in data]
    # Filter by video status
    filtered_videos = filter_video_status(video_ids)
    log_.info('filtered_videos nums={}'.format(len(filtered_videos)))
    # Assign a random score in [0, 100] to each video
    redis_data = {}
    json_data = []
    for video_id in filtered_videos:
        score = random.uniform(0, 100)
        redis_data[video_id] = score
        json_data.append({'videoId': video_id, 'rovScore': score})
    # Upload to Redis
    redis_helper = RedisHelper()
    key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
    redis_helper.add_data_with_zset(key_name=key_name, data=redis_data)
    log_.info('test data to redis finished!')
    # Notify the backend to refresh its data
    result = request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL,
                          request_data={'videos': json_data})
    if result['code'] == 0:
        log_.info('notify backend success!')
    else:
        log_.error('notify backend fail!')


if __name__ == '__main__':
    log_.info('rov model train start...')
    train_start = time.time()
    train_filename = config_.TRAIN_DATA_FILENAME
    X, Y, videos, fea = process_data(filename=train_filename)
    log_.info('X_shape = {}, Y_shape = {}'.format(X.shape, Y.shape))
    train(X, Y, features=fea)
    train_end = time.time()
    log_.info('rov model train end, execute time = {}ms'.format((train_end - train_start) * 1000))

    log_.info('rov model predict start...')
    predict_start = time.time()
    predict()
    predict_end = time.time()
    log_.info('rov model predict end, execute time = {}ms'.format((predict_end - predict_start) * 1000))
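
# data_normalization() comes from utils, which is not shown here. Based on its
# use in predict() it maps raw predictions onto [0, 100]; a minimal min-max
# sketch, assuming that is all it does, would look like:
#
#     def data_normalization(scores):
#         lo, hi = min(scores), max(scores)
#         if hi == lo:
#             return [100.0 for _ in scores]  # degenerate case: all scores equal
#         return [100.0 * (s - lo) / (hi - lo) for s in scores]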