import datetime
from datetime import datetime as dt

import pandas as pd

import process_feature
from my_config import set_config
from my_utils import get_data_from_odps, write_to_pickle
from log import Log

config_, _ = set_config()
log_ = Log()


def get_rov_feature_table(date, project, table):
    """
    Fetch the feature values for one day from a DataWorks table.

    Every record returned by ``get_data_from_odps`` is projected onto the
    feature names listed in ``process_feature.features``.

    :param date: partition date, type-string '%Y%m%d'
    :param project: project name, type-string
    :param table: table name, type-string
    :return: feature_array type-DataFrame, one row per record
    """
    records = get_data_from_odps(date=date, project=project, table=table)
    feature_names = process_feature.features
    # The 'dt' feature is filled with the requested date rather than being
    # read from the record; all other features are read from the record.
    feature_value_list = [
        {
            feature_name: (date if feature_name == 'dt' else record[feature_name])
            for feature_name in feature_names
        }
        for record in records
    ]
    feature_array = pd.DataFrame(feature_value_list)
    log_.info('feature table finished... date={}, shape={}'.format(date, feature_array.shape))
    return feature_array


def get_data_with_date(date, delta_days, project, table):
    """
    Fetch feature data for a range of consecutive days.

    The range always starts at ``date`` itself and spans ``abs(delta_days)``
    days in total.

    :param date: base date the range is counted from, type-string '%Y%m%d'
    :param delta_days: size of the range in days, type-int;
        > 0: ``date`` and the days before it, < 0: ``date`` and the days after it
    :param project: project name, type-string
    :param table: DataWorks table name, type-string
    :return: data, type-DataFrame, daily frames concatenated with a fresh 0..n-1 index
    :raises ValueError: if ``delta_days`` is 0 (there would be nothing to fetch)
    """
    if delta_days == 0:
        # pd.concat() would raise an opaque "No objects to concatenate"
        # ValueError here; fail early with a message that names the cause.
        raise ValueError('delta_days must be non-zero, got {}'.format(delta_days))

    base_date = dt.strptime(date, '%Y%m%d')
    # A positive delta_days walks backwards in time, a negative one forwards.
    direction = -1 if delta_days > 0 else 1
    data_list = []
    for offset in range(abs(delta_days)):
        delta_date = base_date + datetime.timedelta(days=direction * offset)
        # Fetch the feature data of that single day.
        delta_data = get_rov_feature_table(
            date=delta_date.strftime('%Y%m%d'),
            project=project,
            table=table,
        )
        data_list.append(delta_data)

    # ignore_index=True rebuilds the row index directly, so no temporary
    # 'index' column has to be created and dropped afterwards.
    return pd.concat(data_list, ignore_index=True)


def get_train_predict_data():
    """
    Fetch the training and prediction data sets and dump each to a pickle file.

    Date offsets, range sizes, projects, tables and output file names are all
    read from ``config_``.

    :return: None
    """
    now_date = datetime.datetime.today()
    log_.info('now date: {}'.format(now_date))

    # ###### Training data: starting config_.TRAIN_DIFF days ago, fetch
    # config_.TRAIN_DELTA_DAYS days of data and write them to a pickle file
    # (the original note mentions 7 days ago / 30 days; the actual values
    # come from the config).
    log_.info('===== train data')
    train_dt = now_date - datetime.timedelta(days=config_.TRAIN_DIFF)
    train_date = train_dt.strftime('%Y%m%d')
    train_data = get_data_with_date(
        date=train_date,
        delta_days=config_.TRAIN_DELTA_DAYS,
        project=config_.TRAIN_PROJECT,
        table=config_.TRAIN_TABLE
    )
    write_to_pickle(data=train_data, filename=config_.TRAIN_DATA_FILENAME)
    log_.info('train data finished, shape={}'.format(train_data.shape))

    # ###### Prediction data: starting config_.PREDICT_DIFF days ago, fetch
    # config_.PREDICT_DELTA_DAYS days of data and write them to a pickle file
    # (the original note mentions 1 day ago / 1 day; the actual values come
    # from the config).
    log_.info('===== predict data')
    predict_dt = now_date - datetime.timedelta(days=config_.PREDICT_DIFF)
    predict_date = predict_dt.strftime('%Y%m%d')
    predict_data = get_data_with_date(
        date=predict_date,
        delta_days=config_.PREDICT_DELTA_DAYS,
        project=config_.PREDICT_PROJECT,
        table=config_.PREDICT_TABLE
    )
    write_to_pickle(data=predict_data, filename=config_.PREDICT_DATA_FILENAME)
    log_.info('predict data finished, shape={}'.format(predict_data.shape))

    # NOTE: prediction data for app_type [18, 19] is currently disabled.
    # for app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
    #     log_.info(f"app_type = {app_type}")
    #     project = config_.PREDICT_PROJECT_18_19[str(app_type)]
    #     table = config_.PREDICT_TABLE_18_19[str(app_type)]
    #     predict_data = get_data_with_date(
    #         date=predict_date,
    #         delta_days=config_.PREDICT_DELTA_DAYS,
    #         project=project,
    #         table=table
    #     )
    #     write_to_pickle(data=predict_data, filename=config_.PREDICT_DATA_FILENAME_18_19[str(app_type)])
    #     log_.info(f'predict data finished, app_type = {app_type}, shape={predict_data.shape}')


if __name__ == '__main__':
    get_train_predict_data()