123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- import pandas as pd
- import datetime
- import pickle
- import os
- import process_feature
- from odps import ODPS
- from datetime import datetime as dt
- from config import set_config
- config_ = set_config()
def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
                       pool_maxsize=1000, pool_connections=1000):
    """
    Fetch records from an ODPS (MaxCompute) table partition.

    :param date: partition date, type-string '%Y%m%d'
    :param project: ODPS project name, type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout setting
    :param read_timeout: read timeout setting
    :param pool_maxsize: max size of the HTTP connection pool
    :param pool_connections: number of pooled connections
    :return: records iterable for partition dt=<date>
    """
    # SECURITY: credentials were previously hard-coded in source. They are now
    # taken from the environment; the old literals remain only as a fallback so
    # existing deployments keep working. These keys are exposed in VCS history —
    # rotate them and then remove the fallbacks.
    odps = ODPS(
        access_id=os.environ.get('ODPS_ACCESS_ID', 'LTAI4FtW5ZzxMvdw35aNkmcp'),
        secret_access_key=os.environ.get('ODPS_SECRET_ACCESS_KEY',
                                         '0VKnydcaHK3ITjylbgUsLubX6rwiwc'),
        project=project,
        endpoint='http://service.cn.maxcompute.aliyun.com/api',
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    # Read the single daily partition dt=<date>.
    records = odps.read_table(name=table, partition='dt=%s' % date)
    return records
def get_rov_feature_table(date, project, table):
    """
    Load one day's feature values from a DataWorks table.

    :param date: date, type-string '%Y%m%d'
    :param project: type-string
    :param table: table name, type-string
    :return: feature_array type-DataFrame
    """
    rows = get_data_from_odps(date=date, project=project, table=table)
    collected = []
    for row in rows:
        # The 'dt' column is filled from the requested date rather than the record.
        collected.append({
            name: (date if name == 'dt' else row[name])
            for name in process_feature.features
        })
    feature_array = pd.DataFrame(collected)
    print(date, project, table, 'feature table finish')
    return feature_array
def get_data_with_date(date, delta_days, project, table):
    """
    Collect feature data over a range of days anchored at `date`.

    :param date: anchor date for the delta, type-string '%Y%m%d'
    :param delta_days: range in days, type-int ( >0: before date, <0: after date )
    :param delta_days: NOTE(review): the loop only runs for delta_days > 0, so the
        "<0: after date" case documented upstream is never exercised — confirm intent.
    :param project: type-string
    :param table: DataWorks table name, type-string
    :return: data, type-DataFrame (empty DataFrame when delta_days <= 0)
    """
    base_date = dt.strptime(date, '%Y%m%d')
    data_list = []
    for days in range(0, delta_days):
        delta_date = base_date - datetime.timedelta(days=days)
        # Fetch one day of feature data.
        data_list.append(
            get_rov_feature_table(date=delta_date.strftime('%Y%m%d'),
                                  project=project, table=table)
        )
    if not data_list:
        # pd.concat raises ValueError on an empty list; return an empty frame instead.
        return pd.DataFrame()
    # Concatenate all days and rebuild a clean 0..n-1 index in one step
    # (replaces reset_index(inplace=True) followed by dropping the 'index' column).
    return pd.concat(data_list).reset_index(drop=True)
def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
    """
    Serialize `data` to a pickle file.

    :param data: object to persist
    :param filename: output file name
    :param filepath: target directory, defaults to config_.DATA_DIR_PATH
    :return: None
    """
    # exist_ok avoids the check-then-create race when several processes
    # create the directory concurrently.
    os.makedirs(filepath, exist_ok=True)
    file = os.path.join(filepath, filename)
    with open(file, 'wb') as wf:
        pickle.dump(data, wf)
def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
    """
    Deserialize an object from a pickle file.

    :param filename: file name
    :param filepath: directory holding the file, defaults to config_.DATA_DIR_PATH
    :return: the unpickled data, or None when the file does not exist
    """
    target = os.path.join(filepath, filename)
    if not os.path.exists(target):
        return None
    with open(target, 'rb') as rf:
        return pickle.load(rf)
def get_train_predict_data():
    """
    Build and persist the training and prediction datasets.

    Training data: a TRAIN_DELTA_DAYS-day window anchored TRAIN_DIFF days ago.
    Prediction data: a PREDICT_DELTA_DAYS-day window anchored PREDICT_DIFF days ago.
    Both are written to pickle files under the configured data directory.

    :return: None
    """
    today = datetime.datetime.today()

    # ###### Training data - fetch the window anchored TRAIN_DIFF days back, dump to pickle.
    train_date = dt.strftime(today - datetime.timedelta(days=config_.TRAIN_DIFF), '%Y%m%d')
    write_to_pickle(
        data=get_data_with_date(
            date=train_date,
            delta_days=config_.TRAIN_DELTA_DAYS,
            project=config_.TRAIN_PROJECT,
            table=config_.TRAIN_TABLE
        ),
        filename=config_.TRAIN_DATA_FILENAME
    )

    # ###### Prediction data - fetch the window anchored PREDICT_DIFF days back, dump to pickle.
    predict_date = dt.strftime(today - datetime.timedelta(days=config_.PREDICT_DIFF), '%Y%m%d')
    write_to_pickle(
        data=get_data_with_date(
            date=predict_date,
            delta_days=config_.PREDICT_DELTA_DAYS,
            project=config_.PREDICT_PROJECT,
            table=config_.PREDICT_TABLE
        ),
        filename=config_.PREDICT_DATA_FILENAME
    )
def process_data(data):
    """
    Clean and preprocess feature data.

    :param data: type-DataFrame
    :return: None (not yet implemented)
    """
    # TODO: implement cleaning / preprocessing.
    pass
if __name__ == '__main__':
    dt_test = '20211007'
    project_test = 'usercdm'
    table_test = 'rov_feature_add_v1'
    # res = get_rov_feature_table(dt_test, table_test)
    # res = get_data_with_date(date=dt_test, delta_days=2, project=project_test, table=table_test)
    # print(res.shape)
    # write_to_pickle(res, 'test.pickle')
    data = read_from_pickle('test.pickle')
    if data is None:
        print(data)
    else:
        print(data.shape, type(data))
        print(list(data))
        print(data[data['futre7dayreturn']<0])
|