import os
import random
import time

import lightgbm as lgb
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score, mean_absolute_percentage_error

from my_config import set_config
from my_utils import read_from_pickle, write_to_pickle, data_normalization, \
    request_post, filter_video_status, update_video_w_h_rate, filter_video_status_app, filter_shield_video
from log import Log
from db_helper import RedisHelper, MysqlHelper

config_, env = set_config()
log_ = Log()


def process_data(filename):
    """
    Data cleaning and preprocessing.
    :param filename: filename of the pickled raw DataFrame
    :return: x, y, video_ids, features
    """
    # Load the data
    data = read_from_pickle(filename)
    # Get y; bump values where y <= 0 up to 1 (assign via .loc on the frame to avoid chained assignment)
    data.loc[data['futre7dayreturn'] <= 0, 'futre7dayreturn'] = 1
    y = data['futre7dayreturn']
    # Get the video id column
    video_ids = data['videoid']
    # Get x
    drop_columns = ['videoid', 'dt', 'futre7dayreturn', 'videotags', 'words_without_tags']
    x = data.drop(columns=drop_columns)
    # Day-over-day return deltas (later stage minus earlier stage)
    x['stage_four_return_added'] = x['stage_four_retrn'] - x['stage_three_retrn']
    x['stage_three_return_added'] = x['stage_three_retrn'] - x['stage_two_retrn']
    x['stage_two_return_added'] = x['stage_two_retrn'] - x['stage_one_retrn']
    # Day-over-day return growth ratios
    x['stage_four_return_ratio'] = x['stage_four_return_added'] / x['stage_four_retrn']
    x['stage_three_return_ratio'] = x['stage_three_return_added'] / x['stage_three_retrn']
    x['stage_two_return_ratio'] = x['stage_two_return_added'] / x['stage_two_retrn']
    # Fill missing values with 0
    x.fillna(0, inplace=True)
    # Get the list of features currently in use
    features = list(x)
    return x, y, video_ids, features
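

# A minimal usage sketch (assumes config_.TRAIN_DATA_FILENAME points to a pickled
# DataFrame containing the columns referenced above; mirrors the __main__ block below):
#   x, y, video_ids, features = process_data(config_.TRAIN_DATA_FILENAME)
#   train(x, y, features=features)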


def process_predict_data(filename):
    """
    Prediction data cleaning and preprocessing.
    :param filename: filename of the pickled raw DataFrame
    :return: x, video_id_final
    """
    # Load the data
    data = read_from_pickle(filename)
    # Get the video id column
    video_ids = data['videoid']
    # Filter by video status
    video_id_list = [int(video_id) for video_id in video_ids]
    filtered_videos = [str(item) for item in filter_video_status(video_ids=video_id_list)]
    data = data.loc[data['videoid'].isin(filtered_videos)]
    video_id_final = data['videoid']
    # Get x
    drop_columns = ['videoid', 'dt', 'futre7dayreturn', 'videotags', 'words_without_tags']
    x = data.drop(columns=drop_columns)
    # Day-over-day return deltas (later stage minus earlier stage)
    x['stage_four_return_added'] = x['stage_four_retrn'] - x['stage_three_retrn']
    x['stage_three_return_added'] = x['stage_three_retrn'] - x['stage_two_retrn']
    x['stage_two_return_added'] = x['stage_two_retrn'] - x['stage_one_retrn']
    # Day-over-day return growth ratios
    x['stage_four_return_ratio'] = x['stage_four_return_added'] / x['stage_four_retrn']
    x['stage_three_return_ratio'] = x['stage_three_return_added'] / x['stage_three_retrn']
    x['stage_two_return_ratio'] = x['stage_two_return_added'] / x['stage_two_retrn']
    # Fill missing values with 0
    x.fillna(0, inplace=True)
    return x, video_id_final


def train(x, y, features):
    """
    Train the model.
    :param x: X
    :param y: Y
    :param features: feature list
    :return: None
    """
    # Split into training and test sets
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.33)
    log_.info('x_train shape: {}, y_train shape: {}'.format(x_train.shape, y_train.shape))
    log_.info('x_test shape: {}, y_test shape: {}'.format(x_test.shape, y_test.shape))
    # Training parameters
    params = {
        "objective": "regression",
        "reg_sqrt": True,
        "metric": "mape",
        "max_depth": -1,
        "num_leaves": 50,
        "learning_rate": 0.1,
        "bagging_fraction": 0.7,
        "feature_fraction": 0.7,
        "bagging_freq": 8,
        "bagging_seed": 2018,
        "lambda_l1": 0.11,
        "boosting": "dart",
        "nthread": 4,
        "verbosity": -1
    }
    # Build the LightGBM datasets
    train_set = lgb.Dataset(data=x_train, label=y_train)
    test_set = lgb.Dataset(data=x_test, label=y_test)
    # Train the model
    evals_result = {}
    model = lgb.train(params=params, train_set=train_set, num_boost_round=5000,
                      valid_sets=[test_set], early_stopping_rounds=100,
                      verbose_eval=100, evals_result=evals_result)
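    # Note: the early_stopping_rounds / verbose_eval / evals_result keywords above
    # match LightGBM < 4.0; they were removed from lgb.train in 4.0. An equivalent
    # call on LightGBM >= 4.0 (a sketch, assuming that version is installed) would
    # use callbacks instead:
    # model = lgb.train(params=params, train_set=train_set, num_boost_round=5000,
    #                   valid_sets=[test_set],
    #                   callbacks=[lgb.early_stopping(stopping_rounds=100),
    #                              lgb.log_evaluation(period=100),
    #                              lgb.record_evaluation(evals_result)])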
    # Save feature importances to csv
    feature_importance_data = {'feature': features, 'feature_importance': model.feature_importance()}
    feature_importance_filename = 'model_feature_importance.csv'
    pack_result_to_csv(filename=feature_importance_filename, sort_columns=['feature_importance'],
                       ascending=False, **feature_importance_data)
    # Predict on the test set
    pre_y_test = model.predict(data=x_test, num_iteration=model.best_iteration)
    y_test = y_test.values
    err_mape = mean_absolute_percentage_error(y_test, pre_y_test)
    r2 = r2_score(y_test, pre_y_test)
    # Save test-set results to csv
    test_data = {'pre_y_test': pre_y_test, 'y_test': y_test}
    test_result_filename = 'test_result.csv'
    pack_result_to_csv(filename=test_result_filename, sort_columns=['pre_y_test'], ascending=False, **test_data)
    log_.info('err_mape={}, r2={}'.format(err_mape, r2))
    # Save the model
    write_to_pickle(data=model, filename=config_.MODEL_FILENAME)


def pack_result_to_csv(filename, sort_columns=None, filepath=config_.DATA_DIR_PATH, ascending=True, **data):
    """
    Pack data into a DataFrame and write it to csv.
    :param filename: csv filename
    :param sort_columns: columns to sort by, type-list, defaults to None
    :param filepath: directory for the csv file, defaults to config_.DATA_DIR_PATH
    :param ascending: whether to sort the given columns in ascending order, defaults to True
    :param data: data, type-dict (one keyword per csv column)
    :return: None
    """
    if not os.path.exists(filepath):
        os.makedirs(filepath)
    file = os.path.join(filepath, filename)
    df = pd.DataFrame(data=data)
    if sort_columns:
        df = df.sort_values(by=sort_columns, ascending=ascending)
    df.to_csv(file, index=False)
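

# Example usage (mirrors the feature-importance call in train(); each keyword
# argument becomes one csv column):
#   pack_result_to_csv(filename='model_feature_importance.csv',
#                      sort_columns=['feature_importance'], ascending=False,
#                      feature=features, feature_importance=model.feature_importance())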


def pack_list_result_to_csv(filename, data, columns=None, sort_columns=None, filepath=config_.DATA_DIR_PATH, ascending=True):
    """
    Pack data into a DataFrame and write it to csv; data is a list of dicts.
    :param filename: csv filename
    :param data: data, type-list [{}, {}, ...]
    :param columns: column order
    :param sort_columns: columns to sort by, type-list, defaults to None
    :param filepath: directory for the csv file, defaults to config_.DATA_DIR_PATH
    :param ascending: whether to sort the given columns in ascending order, defaults to True
    :return: None
    """
    if not os.path.exists(filepath):
        os.makedirs(filepath)
    file = os.path.join(filepath, filename)
    df = pd.DataFrame(data=data)
    if sort_columns:
        df = df.sort_values(by=sort_columns, ascending=ascending)
    df.to_csv(file, index=False, columns=columns)
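

# Example usage (mirrors the call in predict(); here each dict in data becomes one
# csv row; the values shown are placeholders):
#   pack_list_result_to_csv(filename='predict.csv',
#                           data=[{'video_id': 123, 'rov_score': 100.0}],
#                           columns=['video_id', 'rov_score'],
#                           sort_columns=['rov_score'], ascending=False)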


def predict():
    """Run prediction."""
    # Load and clean the prediction data
    x, video_ids = process_predict_data(config_.PREDICT_DATA_FILENAME)
    log_.info('predict data shape: x={}'.format(x.shape))
    # Load the trained model
    model = read_from_pickle(filename=config_.MODEL_FILENAME)
    # Predict
    y_ = model.predict(x)
    log_.info('predict finished!')
    # Normalize the results to [0, 100]
    normal_y_ = data_normalization(list(y_))
    log_.info('normalization finished!')
    # Sort by normal_y_ in descending order
    predict_data = []
    for i, video_id in enumerate(video_ids):
        data = {'video_id': video_id, 'normal_y_': normal_y_[i], 'y_': y_[i]}
        predict_data.append(data)
    predict_data_sorted = sorted(predict_data, key=lambda temp: temp['normal_y_'], reverse=True)
    # Walk down the ranking, decreasing from 100 by a fixed step per rank, and use
    # that value as the rovScore
    predict_result = []
    redis_data = {}
    json_data = []
    video_id_list = []
    for j, item in enumerate(predict_data_sorted):
        video_id = int(item['video_id'])
        rov_score = 100 - j * config_.ROV_SCORE_D
        item['rov_score'] = rov_score
        predict_result.append(item)
        redis_data[video_id] = rov_score
        json_data.append({'videoId': video_id, 'rovScore': rov_score})
        video_id_list.append(video_id)
    # Pack the prediction results into csv
    predict_result_filename = 'predict.csv'
    pack_list_result_to_csv(filename=predict_result_filename,
                            data=predict_result,
                            columns=['video_id', 'rov_score', 'normal_y_', 'y_'],
                            sort_columns=['rov_score'],
                            ascending=False)
    # Video status filtering
    applet_status_filtered_videos = filter_video_status(video_ids=video_id_list)
    log_.info('applet_status_filtered_videos count = {}'.format(len(applet_status_filtered_videos)))
    # Shielded-video filtering
    applet_filtered_videos = filter_shield_video(video_ids=applet_status_filtered_videos,
                                                 shield_key_name_list=config_.SHIELD_CONFIG.get('-1'))
    log_.info('applet_filtered_videos count = {}'.format(len(applet_filtered_videos)))
    # Look up each video's score
    applet_redis_data = {}
    for video_id in applet_filtered_videos:
        applet_redis_data[video_id] = redis_data.get(video_id)
    # Upload to Redis
    key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
    redis_helper = RedisHelper()
    redis_helper.add_data_with_zset(key_name=key_name, data=applet_redis_data, expire_time=2 * 24 * 3600)
    log_.info('applet data to redis finished!')
    # Clear the data of videos whose ROV was manually modified
    redis_helper.del_keys(key_name=config_.UPDATE_ROV_KEY_NAME)
    # Notify the backend to update its data
    log_.info('json_data count = {}'.format(len(json_data)))
    result = request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': json_data})
    if result['code'] == 0:
        log_.info('notify backend success!')
    else:
        log_.error('notify backend fail!')

    # ##### Retired
    # # Update videos' width/height-ratio data
    # if video_id_list:
    #     update_video_w_h_rate(video_ids=video_id_list,
    #                           key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'])
    #     log_.info('update video w_h_rate to redis finished!')

    # ####### App data update
    # Video status filtering
    app_status_filtered_videos = filter_video_status_app(video_ids=video_id_list)
    log_.info('app_status_filtered_videos count = {}'.format(len(app_status_filtered_videos)))
    # Shielded-video filtering
    app_filtered_videos = filter_shield_video(video_ids=app_status_filtered_videos,
                                              shield_key_name_list=config_.SHIELD_CONFIG.get('-1'))
    log_.info('app_filtered_videos count = {}'.format(len(app_filtered_videos)))
    # Look up each video's score
    app_redis_data = {}
    for video_id in app_filtered_videos:
        app_redis_data[video_id] = redis_data.get(video_id)
    # Upload to Redis
    redis_helper = RedisHelper()
    app_key_name = config_.RECALL_KEY_NAME_PREFIX_APP + time.strftime('%Y%m%d')
    redis_helper.add_data_with_zset(key_name=app_key_name, data=app_redis_data, expire_time=2 * 24 * 3600)
    log_.info('app data to redis finished!')
    # Clear the data of videos whose ROV was manually modified
    redis_helper.del_keys(key_name=config_.UPDATE_ROV_KEY_NAME_APP)


def predict_test():
    """Generate data for the test environment."""
    # Get the 40000 most recently published videos in the test environment
    sql = "SELECT id FROM wx_video ORDER BY id DESC LIMIT 40000;"
    mysql_helper = MysqlHelper(mysql_info=config_.MYSQL_INFO)
    data = mysql_helper.get_data(sql=sql)
    video_ids = [video[0] for video in data]
    # Video status filtering
    applet_status_filtered_videos = filter_video_status(video_ids=video_ids)
    log_.info('applet_status_filtered_videos count = {}'.format(len(applet_status_filtered_videos)))
    # Shielded-video filtering
    applet_filtered_videos = filter_shield_video(video_ids=applet_status_filtered_videos,
                                                 shield_key_name_list=config_.SHIELD_CONFIG.get('-1'))
    log_.info('applet_filtered_videos count = {}'.format(len(applet_filtered_videos)))
    # Generate random scores in [0, 100]
    redis_data = {}
    json_data = []
    for video_id in applet_filtered_videos:
        score = random.uniform(0, 100)
        redis_data[video_id] = score
        json_data.append({'videoId': video_id, 'rovScore': score})
    log_.info('json_data count = {}'.format(len(json_data)))
    # Upload to Redis
    redis_helper = RedisHelper()
    key_name = config_.RECALL_KEY_NAME_PREFIX + time.strftime('%Y%m%d')
    redis_helper.add_data_with_zset(key_name=key_name, data=redis_data, expire_time=2 * 24 * 3600)
    log_.info('test data to redis finished!')
    # Clear the data of videos whose ROV was manually modified
    redis_helper.del_keys(key_name=config_.UPDATE_ROV_KEY_NAME)
    # Notify the backend to update its data
    result = request_post(request_url=config_.NOTIFY_BACKEND_UPDATE_ROV_SCORE_URL, request_data={'videos': json_data})
    if result['code'] == 0:
        log_.info('notify backend success!')
    else:
        log_.error('notify backend fail!')

    # ##### Retired
    # # Update videos' width/height-ratio data
    # if filtered_videos:
    #     update_video_w_h_rate(video_ids=filtered_videos,
    #                           key_name=config_.W_H_RATE_UP_1_VIDEO_LIST_KEY_NAME['rov_recall'])
    #     log_.info('update video w_h_rate to redis finished!')

    # ####### App data update
    # Video status filtering
    app_status_filtered_videos = filter_video_status_app(video_ids=applet_filtered_videos)
    log_.info('app_status_filtered_videos count = {}'.format(len(app_status_filtered_videos)))
    # Shielded-video filtering
    app_filtered_videos = filter_shield_video(video_ids=app_status_filtered_videos,
                                              shield_key_name_list=config_.SHIELD_CONFIG.get('-1'))
    log_.info('app_filtered_videos count = {}'.format(len(app_filtered_videos)))
    # Look up each video's score
    app_redis_data = {}
    for video_id in app_filtered_videos:
        app_redis_data[video_id] = redis_data.get(video_id)
    # Upload to Redis
    redis_helper = RedisHelper()
    app_key_name = config_.RECALL_KEY_NAME_PREFIX_APP + time.strftime('%Y%m%d')
    redis_helper.add_data_with_zset(key_name=app_key_name, data=app_redis_data, expire_time=2 * 24 * 3600)
    log_.info('app test data to redis finished!')
    # Clear the data of videos whose ROV was manually modified
    redis_helper.del_keys(key_name=config_.UPDATE_ROV_KEY_NAME_APP)

    # ####### appType: [18, 19] data update
    # for app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
    #     log_.info(f"app_type = {app_type}")
    #     videos_temp = random.sample(filtered_videos, 300)
    #     redis_data_temp = {}
    #     csv_data_temp = []
    #     for video_id in videos_temp:
    #         score = random.uniform(0, 100)
    #         redis_data_temp[video_id] = score
    #         csv_data_temp.append({'video_id': video_id, 'rov_score': score})
    #     # Pack the prediction results into csv
    #     predict_result_filename = f'predict_{app_type}.csv'
    #     pack_list_result_to_csv(filename=predict_result_filename,
    #                             data=csv_data_temp,
    #                             columns=['video_id', 'rov_score'],
    #                             sort_columns=['rov_score'],
    #                             ascending=False)
    #
    #     # Upload to Redis
    #     key_name = f"{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}.{time.strftime('%Y%m%d')}"
    #     redis_helper = RedisHelper()
    #     redis_helper.add_data_with_zset(key_name=key_name, data=redis_data_temp)
    #     log_.info('data to redis finished!')


def predict_18_19():
    """Predict for app_type [18, 19]."""
    for app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
        log_.info(f"app_type = {app_type}")
        # Load and clean the prediction data
        predict_data_filename = config_.PREDICT_DATA_FILENAME_18_19[str(app_type)]
        x, video_ids = process_predict_data(predict_data_filename)
        log_.info('predict data shape: x = {}'.format(x.shape))
        # Load the trained model
        model = read_from_pickle(filename=config_.MODEL_FILENAME)
        # Predict
        y_ = model.predict(x)
        log_.info('predict finished!')
        # Normalize the results to [0, 100]
        normal_y_ = data_normalization(list(y_))
        log_.info('normalization finished!')
        # Sort by normal_y_ in descending order
        predict_data = []
        for i, video_id in enumerate(video_ids):
            data = {'video_id': video_id, 'normal_y_': normal_y_[i], 'y_': y_[i]}
            predict_data.append(data)
        predict_data_sorted = sorted(predict_data, key=lambda temp: temp['normal_y_'], reverse=True)
        # Walk down the ranking, decreasing from 100 by a fixed step per rank, and
        # use that value as the rovScore
        predict_result = []
        redis_data = {}
        json_data = []
        video_id_list = []
        for j, item in enumerate(predict_data_sorted):
            video_id = int(item['video_id'])
            rov_score = 100 - j * config_.ROV_SCORE_D
            item['rov_score'] = rov_score
            predict_result.append(item)
            redis_data[video_id] = rov_score
            json_data.append({'videoId': video_id, 'rovScore': rov_score})
            video_id_list.append(video_id)
        # Pack the prediction results into csv
        predict_result_filename = f'predict_{app_type}.csv'
        pack_list_result_to_csv(filename=predict_result_filename,
                                data=predict_result,
                                columns=['video_id', 'rov_score', 'normal_y_', 'y_'],
                                sort_columns=['rov_score'],
                                ascending=False)
        # Upload to Redis
        key_name = f"{config_.RECALL_KEY_NAME_PREFIX_APP_TYPE}{app_type}.{time.strftime('%Y%m%d')}"
        redis_helper = RedisHelper()
        redis_helper.add_data_with_zset(key_name=key_name, data=redis_data)
        log_.info('data to redis finished!')


if __name__ == '__main__':
    log_.info('rov model train start...')
    train_start = time.time()
    train_filename = config_.TRAIN_DATA_FILENAME
    X, Y, videos, fea = process_data(filename=train_filename)
    log_.info('X_shape = {}, Y_shape = {}'.format(X.shape, Y.shape))
    train(X, Y, features=fea)
    train_end = time.time()
    log_.info('rov model train end, execute time = {}ms'.format((train_end - train_start) * 1000))

    log_.info('rov model predict start...')
    predict_start = time.time()
    if env in ['dev', 'test']:
        predict_test()
    elif env in ['pre', 'pro']:
        predict()
        # predict_18_19()
    else:
        log_.error('env error')
    predict_end = time.time()
    log_.info('rov model predict end, execute time = {}ms'.format((predict_end - predict_start) * 1000))