import datetime
import os
import pickle
from datetime import datetime as dt

import pandas as pd
from odps import ODPS

import process_feature

# NOTE(security): these credentials were hard-coded in the original source and
# are kept only as backward-compatible fallbacks. Prefer supplying them through
# the ODPS_ACCESS_ID / ODPS_SECRET_ACCESS_KEY environment variables, and rotate
# the values below since they have been committed in plain text.
_DEFAULT_ACCESS_ID = 'LTAI4FtW5ZzxMvdw35aNkmcp'
_DEFAULT_SECRET_ACCESS_KEY = '0VKnydcaHK3ITjylbgUsLubX6rwiwc'


def _build_odps_client() -> ODPS:
    """Create the ODPS client used to read feature tables."""
    return ODPS(
        # An unset or empty environment variable falls back to the legacy value.
        access_id=os.environ.get('ODPS_ACCESS_ID') or _DEFAULT_ACCESS_ID,
        secret_access_key=(
            os.environ.get('ODPS_SECRET_ACCESS_KEY') or _DEFAULT_SECRET_ACCESS_KEY
        ),
        project='usercdm',
        endpoint='http://service.cn.maxcompute.aliyun.com/api',
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000,
    )


def get_rov_feature_table(date: str, table: str) -> pd.DataFrame:
    """
    Read the feature values of one daily partition from a DataWorks table.

    :param date: partition date, string formatted as '%Y%m%d'
    :param table: table name
    :return: DataFrame with one row per record and one column per name in
        ``process_feature.features``; the 'dt' column, if listed, is filled
        with ``date`` instead of being read from the record
    """
    odps = _build_odps_client()
    # Materialise the feature names once instead of once per record.
    feature_names = list(process_feature.features)
    feature_value_list = [
        {
            name: (date if name == 'dt' else record[name])
            for name in feature_names
        }
        for record in odps.read_table(name=table, partition='dt=%s' % date)
    ]
    feature_array = pd.DataFrame(feature_value_list)
    print(date, table, 'feature table finish')
    return feature_array


def get_data_with_date(date: str, delta_days: int, table: str) -> pd.DataFrame:
    """
    Collect feature data for a range of consecutive days starting at ``date``.

    :param date: base date of the range, string formatted as '%Y%m%d'
    :param delta_days: number of days to fetch, including ``date`` itself;
        > 0 walks backwards in time (date, date-1, ...),
        < 0 walks forwards in time (date, date+1, ...)
    :param table: DataWorks table name
    :return: DataFrame concatenating every day's features, with a fresh
        0..n-1 index
    :raises ValueError: if ``date`` does not match '%Y%m%d' or ``delta_days``
        is 0 (there would be nothing to concatenate)
    """
    base_date = dt.strptime(date, '%Y%m%d')
    if delta_days == 0:
        raise ValueError('delta_days must be non-zero: no dates to fetch')

    # Positive ranges look into the past, negative ranges into the future.
    step = -1 if delta_days > 0 else 1
    data_list = []
    for offset in range(abs(delta_days)):
        delta_date = base_date + datetime.timedelta(days=step * offset)
        delta_data = get_rov_feature_table(
            date=delta_date.strftime('%Y%m%d'), table=table
        )
        data_list.append(delta_data)

    # ignore_index rebuilds a clean RangeIndex across the per-day frames.
    return pd.concat(data_list, ignore_index=True)


if __name__ == '__main__':
    dt_test = '20211006'
    table_test = 'rov_feature_add_v1'
    res = get_data_with_date(date=dt_test, delta_days=3, table=table_test)
    print(res.shape)