import pandas as pd
import datetime
import pickle
import os
import process_feature

from odps import ODPS
from datetime import datetime as dt
from config import set_config

config_ = set_config()

# NOTE(review): these credentials were committed in plain text. They are kept
# only as a backward-compatible fallback; supply real credentials through the
# ODPS_ACCESS_ID / ODPS_SECRET_ACCESS_KEY environment variables (or the
# access_id / secret_access_key arguments) and rotate this key pair.
_FALLBACK_ACCESS_ID = 'LTAI4FtW5ZzxMvdw35aNkmcp'
_FALLBACK_SECRET_ACCESS_KEY = '0VKnydcaHK3ITjylbgUsLubX6rwiwc'
_ODPS_ENDPOINT = 'http://service.cn.maxcompute.aliyun.com/api'


def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
                       pool_maxsize=1000, pool_connections=1000,
                       access_id=None, secret_access_key=None):
    """
    Read one daily partition of an ODPS (MaxCompute) table.

    :param date: partition date, type-string '%Y%m%d'; the partition read is 'dt=<date>'
    :param project: ODPS project name, type-string
    :param table: table name, type-string
    :param connect_timeout: connect timeout, passed through to ODPS
    :param read_timeout: read timeout, passed through to ODPS
    :param pool_maxsize: passed through to ODPS
    :param pool_connections: passed through to ODPS
    :param access_id: optional access id; if None, uses $ODPS_ACCESS_ID, else the legacy fallback
    :param secret_access_key: optional secret; if None, uses $ODPS_SECRET_ACCESS_KEY, else the legacy fallback
    :return: records, whatever ODPS.read_table returns (iterated record by record by callers)
    """
    if access_id is None:
        access_id = os.environ.get('ODPS_ACCESS_ID', _FALLBACK_ACCESS_ID)
    if secret_access_key is None:
        secret_access_key = os.environ.get('ODPS_SECRET_ACCESS_KEY', _FALLBACK_SECRET_ACCESS_KEY)
    odps = ODPS(
        access_id=access_id,
        secret_access_key=secret_access_key,
        project=project,
        endpoint=_ODPS_ENDPOINT,
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    records = odps.read_table(name=table, partition='dt=%s' % date)
    return records


def get_rov_feature_table(date, project, table):
    """
    Load the feature values of one daily partition of a DataWorks table.

    Only the columns listed in process_feature.features are kept; the 'dt'
    feature is filled with `date` instead of being read from the record.

    :param date: date, type-string '%Y%m%d'
    :param project: type-string
    :param table: table name, type-string
    :return: feature_array, type-DataFrame (one row per record)
    """
    records = get_data_from_odps(date=date, project=project, table=table)
    feature_names = process_feature.features  # hoisted: invariant across records
    feature_value_list = []
    for record in records:
        feature_value = {}
        for feature_name in feature_names:
            if feature_name == 'dt':
                feature_value[feature_name] = date
            else:
                feature_value[feature_name] = record[feature_name]
        feature_value_list.append(feature_value)
    feature_array = pd.DataFrame(feature_value_list)
    print(date, project, table, 'feature table finish')
    return feature_array


def get_data_with_date(date, delta_days, project, table):
    """
    Load feature data for a run of consecutive daily partitions.

    :param date: anchor date, type-string '%Y%m%d'; always the first day fetched
    :param delta_days: number of days to fetch, type-int.
        > 0: `date` and the (delta_days - 1) days before it;
        < 0: `date` and the (|delta_days| - 1) days after it.
    :param project: type-string
    :param table: DataWorks table name, type-string
    :return: data, type-DataFrame, with a fresh 0..n-1 index
    :raises ValueError: if delta_days == 0 (nothing to fetch)
    """
    if delta_days == 0:
        raise ValueError('delta_days must be non-zero, got 0')
    base_date = dt.strptime(date, '%Y%m%d')
    # range(0, delta_days) is empty for negative values, which made the
    # documented "after date" mode crash in pd.concat; step toward delta_days.
    step = 1 if delta_days > 0 else -1
    data_list = []
    for days in range(0, delta_days, step):
        delta_date = base_date - datetime.timedelta(days=days)
        delta_data = get_rov_feature_table(date=delta_date.strftime('%Y%m%d'), project=project, table=table)
        data_list.append(delta_data)
    # ignore_index re-numbers rows without touching columns; the previous
    # reset_index + drop('index') would delete a real feature named 'index'.
    return pd.concat(data_list, ignore_index=True)


def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
    """
    Pickle `data` into `filepath`/`filename`, creating the directory if needed.

    :param data: object to serialize
    :param filename: target file name
    :param filepath: target directory, defaults to config_.DATA_DIR_PATH
    :return: None
    """
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(filepath, exist_ok=True)
    file = os.path.join(filepath, filename)
    with open(file, 'wb') as wf:
        pickle.dump(data, wf)


def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
    """
    Load a pickled object from `filepath`/`filename`.

    :param filename: file name
    :param filepath: directory, defaults to config_.DATA_DIR_PATH
    :return: the unpickled object, or None if the file does not exist
    """
    file = os.path.join(filepath, filename)
    if not os.path.exists(file):
        return None
    # NOTE(review): pickle.load executes arbitrary code from the file; only use
    # on files this pipeline wrote itself, never on externally supplied data.
    with open(file, 'rb') as rf:
        data = pickle.load(rf)
    return data


def _fetch_and_dump(now_date, diff_days, delta_days, project, table, filename):
    """Fetch `delta_days` of partitions anchored `diff_days` before `now_date` and pickle them to `filename`."""
    anchor_date = dt.strftime(now_date - datetime.timedelta(days=diff_days), '%Y%m%d')
    data = get_data_with_date(date=anchor_date, delta_days=delta_days, project=project, table=table)
    write_to_pickle(data=data, filename=filename)


def get_train_predict_data():
    """
    Fetch the training and prediction datasets and write each to a pickle file.

    Day offsets, ranges, projects, tables and file names all come from config_
    (TRAIN_* and PREDICT_* settings).

    :return: None
    """
    # Take "now" once so train and predict are anchored to the same moment.
    now_date = datetime.datetime.today()
    # Training data: TRAIN_DELTA_DAYS partitions anchored TRAIN_DIFF days ago.
    _fetch_and_dump(
        now_date=now_date,
        diff_days=config_.TRAIN_DIFF,
        delta_days=config_.TRAIN_DELTA_DAYS,
        project=config_.TRAIN_PROJECT,
        table=config_.TRAIN_TABLE,
        filename=config_.TRAIN_DATA_FILENAME,
    )
    # Prediction data: PREDICT_DELTA_DAYS partitions anchored PREDICT_DIFF days ago.
    _fetch_and_dump(
        now_date=now_date,
        diff_days=config_.PREDICT_DIFF,
        delta_days=config_.PREDICT_DELTA_DAYS,
        project=config_.PREDICT_PROJECT,
        table=config_.PREDICT_TABLE,
        filename=config_.PREDICT_DATA_FILENAME,
    )


def process_data(data):
    """
    Data cleaning / preprocessing. Not implemented yet.

    :param data: type-DataFrame
    :return: None (placeholder)
    """
    pass


if __name__ == '__main__':
    # Ad-hoc manual check: load a previously dumped pickle and inspect it.
    dt_test = '20211007'
    project_test = 'usercdm'
    table_test = 'rov_feature_add_v1'
    # res = get_rov_feature_table(dt_test, project_test, table_test)
    # res = get_data_with_date(date=dt_test, delta_days=2, project=project_test, table=table_test)
    # print(res.shape)
    # write_to_pickle(res, 'test.pickle')
    data = read_from_pickle('test.pickle')
    if data is not None:
        print(data.shape, type(data))
        print(list(data))
        # NOTE(review): 'futre7dayreturn' may be a misspelling of
        # 'future7dayreturn'; left as-is because it must match the real column.
        print(data[data['futre7dayreturn'] < 0])
    else:
        print(data)