import os
import random
import time

import lightgbm as lgb
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score, mean_absolute_percentage_error

from config import set_config
from utils import read_from_pickle, write_to_pickle, data_normalization, \
    request_post, filter_video_status, update_video_w_h_rate, filter_video_status_app, filter_shield_video
from log import Log
from db_helper import RedisHelper, MysqlHelper

config_, env = set_config()
log_ = Log()

# Columns that are not model features; dropped from both training and prediction data.
_DROP_COLUMNS = ['videoid', 'dt', 'futre7dayreturn', 'videotags', 'words_without_tags']
# Expiry (seconds) for the recall zsets written to Redis: 2 days.
_RECALL_EXPIRE_SECONDS = 2 * 24 * 3600
# Column order of the prediction csv files.
_PREDICT_CSV_COLUMNS = ['video_id', 'rov_score', 'normal_y_', 'y_']


def _build_features(data):
    """
    Derive the model feature matrix from raw data. Shared by training and prediction so
    both always see exactly the same feature columns in the same order.
    :param data: raw DataFrame containing the stage_*_retrn columns and the _DROP_COLUMNS
    :return: feature DataFrame with NaN filled with 0
    """
    x = data.drop(columns=_DROP_COLUMNS)
    # Difference between each stage's returns and the previous stage's returns
    x['stage_four_return_added'] = x['stage_four_retrn'] - x['stage_three_retrn']
    x['stage_three_return_added'] = x['stage_three_retrn'] - x['stage_two_retrn']
    x['stage_two_return_added'] = x['stage_two_retrn'] - x['stage_one_retrn']
    # Ratio of that difference to the LATER stage's returns.
    # NOTE(review): a growth rate is usually difference / EARLIER stage; kept unchanged so the
    # feature definition does not shift - confirm the intent.
    # NOTE(review): a zero denominator with a non-zero difference yields +/-inf, which the
    # fillna(0) below does not replace (0/0 gives NaN, which it does).
    x['stage_four_return_ratio'] = x['stage_four_return_added'] / x['stage_four_retrn']
    x['stage_three_return_ratio'] = x['stage_three_return_added'] / x['stage_three_retrn']
    x['stage_two_return_ratio'] = x['stage_two_return_added'] / x['stage_two_retrn']
    # Fill missing values with 0
    x.fillna(0, inplace=True)
    return x


def process_data(filename):
    """
    Clean and preprocess the training data.
    :param filename: pickle file name, read with read_from_pickle
    :return: x (features), y (label 'futre7dayreturn'), video_ids, features (feature column names)
    """
    data = read_from_pickle(filename)
    # Clamp labels <= 0 to 1. A single .loc call is required: the chained form
    # data[col].loc[mask] = 1 may assign into a temporary copy and is a silent no-op
    # under pandas Copy-on-Write.
    data.loc[data['futre7dayreturn'] <= 0, 'futre7dayreturn'] = 1
    y = data['futre7dayreturn']
    video_ids = data['videoid']
    x = _build_features(data)
    features = list(x)
    return x, y, video_ids, features


def process_predict_data(filename):
    """
    Clean and preprocess the prediction data. Rows whose video does not pass
    filter_video_status are dropped.
    :param filename: pickle file name, read with read_from_pickle
    :return: x (features), video_ids ('videoid' values of the kept rows, aligned with x)
    """
    data = read_from_pickle(filename)
    # filter_video_status is called with int ids; its result is compared back as strings.
    video_id_list = [int(video_id) for video_id in data['videoid']]
    filtered_videos = [str(item) for item in filter_video_status(video_ids=video_id_list)]
    # astype(str) makes the comparison work whether 'videoid' is stored as int or str
    data = data.loc[data['videoid'].astype(str).isin(filtered_videos)]
    video_id_final = data['videoid']
    x = _build_features(data)
    return x, video_id_final


def train(x, y, features):
    """
    Train the LightGBM model, write feature importance and test-set results to csv,
    log test metrics, and pickle the model to config_.MODEL_FILENAME.
    :param x: features
    :param y: labels
    :param features: feature name list (same order as the columns of x)
    :return: None
    """
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.33)
    log_.info('x_train shape: {}, y_train shape: {}'.format(x_train.shape, y_train.shape))
    log_.info('x_test shape: {}, y_test shape: {}'.format(x_test.shape, y_test.shape))

    params = {
        "objective": "regression",
        "reg_sqrt": True,
        "metric": "mape",
        "max_depth": -1,
        "num_leaves": 50,
        "learning_rate": 0.1,
        "bagging_fraction": 0.7,
        "feature_fraction": 0.7,
        "bagging_freq": 8,
        "bagging_seed": 2018,
        "lambda_l1": 0.11,
        "boosting": "dart",
        "nthread": 4,
        "verbosity": -1
    }
    train_set = lgb.Dataset(data=x_train, label=y_train)
    test_set = lgb.Dataset(data=x_test, label=y_test)

    # NOTE: LightGBM disables early stopping in dart mode (it logs a warning), so training
    # presumably runs all num_boost_round iterations - confirm in the training logs.
    evals_result = {}
    if hasattr(lgb, 'log_evaluation'):
        # LightGBM >= 3.3: early stopping / logging / metric recording are callbacks.
        # The keyword arguments used below in the fallback were removed in LightGBM 4.0.
        model = lgb.train(params=params, train_set=train_set, num_boost_round=5000,
                          valid_sets=[test_set],
                          callbacks=[lgb.early_stopping(stopping_rounds=100),
                                     lgb.log_evaluation(period=100),
                                     lgb.record_evaluation(evals_result)])
    else:
        # Older LightGBM releases only support the keyword-argument form
        model = lgb.train(params=params, train_set=train_set, num_boost_round=5000,
                          valid_sets=[test_set], early_stopping_rounds=100,
                          verbose_eval=100, evals_result=evals_result)

    # Feature importance to csv
    feature_importance_data = {'feature': features, 'feature_importance': model.feature_importance()}
    pack_result_to_csv(filename='model_feature_importance.csv',
                       sort_columns=['feature_importance'], ascending=False, **feature_importance_data)

    # Evaluate on the test split
    pre_y_test = model.predict(data=x_test, num_iteration=model.best_iteration)
    y_test = y_test.values
    err_mape = mean_absolute_percentage_error(y_test, pre_y_test)
    r2 = r2_score(y_test, pre_y_test)

    # Test-set predictions to csv
    test_data = {'pre_y_test': pre_y_test, 'y_test': y_test}
    pack_result_to_csv(filename='test_result.csv', sort_columns=['pre_y_test'], ascending=False, **test_data)

    log_.info('err_mape={}, r2={}'.format(err_mape, r2))
    write_to_pickle(data=model, filename=config_.MODEL_FILENAME)


def _write_csv(df, filename, filepath, sort_columns, ascending, columns=None):
    """Optionally sort df, then write it to filepath/filename without the index column."""
    # exist_ok avoids the check-then-create race of os.path.exists + os.makedirs
    os.makedirs(filepath, exist_ok=True)
    if sort_columns:
        df = df.sort_values(by=sort_columns, ascending=ascending)
    df.to_csv(os.path.join(filepath, filename), index=False, columns=columns)


def pack_result_to_csv(filename, sort_columns=None, filepath=config_.DATA_DIR_PATH, ascending=True, **data):
    """
    Pack column data into a DataFrame and write it to csv.
    :param filename: csv file name
    :param sort_columns: column names to sort by, type-list, default None (no sorting)
    :param filepath: directory for the csv, default config_.DATA_DIR_PATH (created if missing)
    :param ascending: sort ascending when True (default)
    :param data: column name -> column values
    :return: None
    """
    _write_csv(pd.DataFrame(data=data), filename, filepath, sort_columns, ascending)


def pack_list_result_to_csv(filename, data, columns=None, sort_columns=None, filepath=config_.DATA_DIR_PATH,
                            ascending=True):
    """
    Pack a list of row dicts into a DataFrame and write it to csv.
    :param filename: csv file name
    :param data: rows, type-list [{}, {}, ...]
    :param columns: columns to write and their order, default None (all columns)
    :param sort_columns: column names to sort by, type-list, default None (no sorting)
    :param filepath: directory for the csv, default config_.DATA_DIR_PATH (created if missing)
    :param ascending: sort ascending when True (default)
    :return: None
    """
    _write_csv(pd.DataFrame(data=data), filename, filepath, sort_columns, ascending, columns=columns)


def _rank_predictions(model, x, video_ids):
    """
    Predict with model, normalise the predictions via data_normalization, sort descending,
    and assign rov scores decreasing from 100 by config_.ROV_SCORE_D per rank.
    :return: predict_result (rows for the csv), redis_data {int video_id: rov_score},
             json_data [{'videoId', 'rovScore'}], video_id_list (int ids in rank order)
    """
    y_ = model.predict(x)
    log_.info('predict finished!')
    normal_y_ = data_normalization(list(y_))
    log_.info('normalization finished!')

    predict_data = [{'video_id': video_id, 'normal_y_': normal_score, 'y_': raw_score}
                    for video_id, normal_score, raw_score in zip(video_ids, normal_y_, y_)]
    predict_data_sorted = sorted(predict_data, key=lambda temp: temp['normal_y_'], reverse=True)

    predict_result = []
    redis_data = {}
    json_data = []
    video_id_list = []
    for rank, item in enumerate(predict_data_sorted):
        video_id = int(item['video_id'])
        rov_score = 100 - rank * config_.ROV_SCORE_D
        item['rov_score'] = rov_score
        predict_result.append(item)
        redis_data[video_id] = rov_score
        json_data.append({'videoId': video_id, 'rovScore': rov_score})
        video_id_list.append(video_id)
    return predict_result, redis_data, json_data, video_id_list


def _filter_videos(video_ids, status_filter, prefix):
    """Apply status_filter and then the shield-video filter, logging both counts under prefix."""
    status_filtered_videos = status_filter(video_ids=video_ids)
    log_.info('{}_status_filtered_videos count = {}'.format(prefix, len(status_filtered_videos)))
    filtered_videos = filter_shield_video(video_ids=status_filtered_videos,
                                          shield_key_name_list=config_.SHIELD_CONFIG.get('-1'))
    log_.info('{}_filtered_videos count = {}'.format(prefix, len(filtered_videos)))
    return filtered_videos


def _notify_backend(json_data):
    """Post the video scores to the backend update endpoint and log the outcome."""
    result = request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL,
                          request_data={'videos': json_data})
    # NOTE(review): presumably request_post returns the decoded JSON dict, or a falsy value
    # on failure - a falsy result is logged as a failure instead of raising.
    if result and result.get('code') == 0:
        log_.info('notify backend success!')
    else:
        log_.error('notify backend fail!')


def predict():
    """
    Production prediction: score videos, write predict.csv, publish the scores to the
    applet and app recall zsets in Redis, and notify the backend.
    """
    x, video_ids = process_predict_data(config_.PREDICT_DATA_FILENAME)
    log_.info('predict data shape: x={}'.format(x.shape))
    model = read_from_pickle(filename=config_.MODEL_FILENAME)
    predict_result, redis_data, json_data, video_id_list = _rank_predictions(model, x, video_ids)

    pack_list_result_to_csv(filename='predict.csv',
                            data=predict_result,
                            columns=_PREDICT_CSV_COLUMNS,
                            sort_columns=['rov_score'],
                            ascending=False)

    redis_helper = RedisHelper()

    # Applet: filter, upload scores, clear manually-modified ROV data, notify backend
    applet_filtered_videos = _filter_videos(video_id_list, filter_video_status, 'applet')
    applet_redis_data = {video_id: redis_data.get(video_id) for video_id in applet_filtered_videos}
    key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
    redis_helper.add_data_with_zset(key_name=key_name, data=applet_redis_data, expire_time=_RECALL_EXPIRE_SECONDS)
    log_.info('applet data to redis finished!')
    redis_helper.del_keys(key_name=config_.UPDATE_ROV_KEY_NAME)

    log_.info('json_data count = {}'.format(len(json_data)))
    _notify_backend(json_data)
    # NOTE: the video width/height-ratio update (update_video_w_h_rate) is offline.

    # App: filter, upload scores, clear manually-modified ROV data
    app_filtered_videos = _filter_videos(video_id_list, filter_video_status_app, 'app')
    app_redis_data = {video_id: redis_data.get(video_id) for video_id in app_filtered_videos}
    app_key_name = config_.RECALL_KEY_NAME_PREFIX_APP + time.strftime('%Y%m%d')
    redis_helper.add_data_with_zset(key_name=app_key_name, data=app_redis_data, expire_time=_RECALL_EXPIRE_SECONDS)
    log_.info('app data to redis finished!')
    redis_helper.del_keys(key_name=config_.UPDATE_ROV_KEY_NAME_APP)


def predict_test():
    """
    Test-environment data generation: take the 40000 most recent videos, give them random
    scores in [0, 100], publish to the applet and app recall zsets, and notify the backend.
    """
    sql = "SELECT id FROM wx_video ORDER BY id DESC LIMIT 40000;"
    mysql_helper = MysqlHelper(mysql_info=config_.MYSQL_INFO)
    data = mysql_helper.get_data(sql=sql)
    video_ids = [video[0] for video in data]

    applet_filtered_videos = _filter_videos(video_ids, filter_video_status, 'applet')

    # Random score in [0, 100] per video
    redis_data = {}
    json_data = []
    for video_id in applet_filtered_videos:
        score = random.uniform(0, 100)
        redis_data[video_id] = score
        json_data.append({'videoId': video_id, 'rovScore': score})
    log_.info('json_data count = {}'.format(len(json_data)))

    redis_helper = RedisHelper()
    key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
    redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=_RECALL_EXPIRE_SECONDS)
    log_.info('test data to redis finished!')
    redis_helper.del_keys(key_name=config_.UPDATE_ROV_KEY_NAME)

    _notify_backend(json_data)
    # NOTE: the video width/height-ratio update (update_video_w_h_rate) is offline.

    # App: reuse the applet-filtered videos and their scores
    app_filtered_videos = _filter_videos(applet_filtered_videos, filter_video_status_app, 'app')
    app_redis_data = {video_id: redis_data.get(video_id) for video_id in app_filtered_videos}
    app_key_name = config_.RECALL_KEY_NAME_PREFIX_APP + time.strftime('%Y%m%d')
    redis_helper.add_data_with_zset(key_name=app_key_name, data=app_redis_data, expire_time=_RECALL_EXPIRE_SECONDS)
    log_.info('app test data to redis finished!')
    redis_helper.del_keys(key_name=config_.UPDATE_ROV_KEY_NAME_APP)
    # NOTE: random test data generation for appType 18/19 is disabled.


def predict_18_19():
    """Prediction for app_type 18 and 19: score, write predict_{app_type}.csv, upload to Redis."""
    # The same model is used for every app type; load it once
    model = read_from_pickle(filename=config_.MODEL_FILENAME)
    for app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
        log_.info(f"app_type = {app_type}")
        predict_data_filename = config_.PREDICT_DATA_FILENAME_18_19[str(app_type)]
        x, video_ids = process_predict_data(predict_data_filename)
        log_.info('predict data shape: x = {}'.format(x.shape))
        predict_result, redis_data, _, _ = _rank_predictions(model, x, video_ids)

        pack_list_result_to_csv(filename=f'predict_{app_type}.csv',
                                data=predict_result,
                                columns=_PREDICT_CSV_COLUMNS,
                                sort_columns=['rov_score'],
                                ascending=False)

        key_name = f"{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}.{time.strftime('%Y%m%d')}"
        redis_helper = RedisHelper()
        redis_helper.add_data_with_zset(key_name=key_name, data=redis_data)
        log_.info('data to redis finished!')


def main():
    """Train the model, then run the prediction flow that matches the current env."""
    log_.info('rov model train start...')
    train_start = time.time()
    X, Y, videos, fea = process_data(filename=config_.TRAIN_DATA_FILENAME)
    log_.info('X_shape = {}, Y_sahpe = {}'.format(X.shape, Y.shape))
    train(X, Y, features=fea)
    train_end = time.time()
    log_.info('rov model train end, execute time = {}ms'.format((train_end - train_start)*1000))

    log_.info('rov model predict start...')
    predict_start = time.time()
    if env in ['dev', 'test']:
        predict_test()
    elif env in ['pre', 'pro']:
        predict()
        # predict_18_19() is currently disabled
    else:
        log_.error('env error')
    predict_end = time.time()
    log_.info('rov model predict end, execute time = {}ms'.format((predict_end - predict_start)*1000))


if __name__ == '__main__':
    main()