import pandas as pd
import datetime
import process_feature
from datetime import datetime as dt
from config import set_config
from utils import get_data_from_odps, write_to_pickle
from log import Log
config_, _ = set_config()
log_ = Log()
def get_rov_feature_table(date, project, table):
"""
从DataWorks表中获取对应的特征值
:param date: 日期 type-string '%Y%m%d'
:param project: type-string
:param table: 表名 type-string
:return: feature_array type-DataFrame
"""
records = get_data_from_odps(date=date, project=project, table=table)
feature_value_list = []
for record in records:
feature_value = {}
for feature_name in process_feature.features:
if feature_name == 'dt':
feature_value[feature_name] = date
else:
feature_value[feature_name] = record[feature_name]
feature_value_list.append(feature_value)
feature_array = pd.DataFrame(feature_value_list)
log_.info('feature table finished... date={}, shape={}'.format(date, feature_array.shape))
return feature_array
def get_data_with_date(date, delta_days, project, table):
"""
获取某一时间范围的特征数据
:param date: 标准日期,delta基准,type-string,'%Y%m%d'
:param delta_days: 日期范围(天),type-int,「 >0: date前,<0: date后 」
:param project: type-string
:param table: DataWorks表名,type-string
:return: data,type-DataFrame
"""
base_date = dt.strptime(date, '%Y%m%d')
data_list = []
for days in range(0, delta_days):
delta = datetime.timedelta(days=days)
delta_date = base_date - delta
# 获取特征数据
delta_data = get_rov_feature_table(date=delta_date.strftime('%Y%m%d'), project=project, table=table)
data_list.append(delta_data)
data = pd.concat(data_list)
# 重新进行索引
data.reset_index(inplace=True)
# 删除index列
data = data.drop(columns=['index'])
return data
def get_train_predict_data():
"""
获取训练和预测数据
:return: None
"""
now_date = datetime.datetime.today()
log_.info('now date: {}'.format(now_date))
# ###### 训练数据 - 从7天前获取前30天的数据,写入pickle文件
log_.info('===== train data')
train_dt = now_date - datetime.timedelta(days=config_.TRAIN_DIFF)
train_date = dt.strftime(train_dt, '%Y%m%d')
train_data = get_data_with_date(
date=train_date,
delta_days=config_.TRAIN_DELTA_DAYS,
project=config_.TRAIN_PROJECT,
table=config_.TRAIN_TABLE
)
write_to_pickle(data=train_data, filename=config_.TRAIN_DATA_FILENAME)
log_.info('train data finished, shape={}'.format(train_data.shape))
# ###### 预测数据 - 从1天前获取前1天的数据,写入pickle文件
log_.info('===== predict data')
predict_dt = now_date - datetime.timedelta(days=config_.PREDICT_DIFF)
predict_date = dt.strftime(predict_dt, '%Y%m%d')
predict_data = get_data_with_date(
date=predict_date,
delta_days=config_.PREDICT_DELTA_DAYS,
project=config_.PREDICT_PROJECT,
table=config_.PREDICT_TABLE
)
write_to_pickle(data=predict_data, filename=config_.PREDICT_DATA_FILENAME)
log_.info('predict data finished, shape={}'.format(predict_data.shape))
# ###### app_type: [18, 19]预测数据
# for app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
# log_.info(f"app_type = {app_type}")
# project = config_.PREDICT_PROJECT_18_19[str(app_type)]
# table = config_.PREDICT_TABLE_18_19[str(app_type)]
# predict_data = get_data_with_date(
# date=predict_date,
# delta_days=config_.PREDICT_DELTA_DAYS,
# project=project,
# table=table
# )
# write_to_pickle(data=predict_data, filename=config_.PREDICT_DATA_FILENAME_18_19[str(app_type)])
# log_.info(f'predict data finished, app_type = {app_type}, shape={predict_data.shape}')
if __name__ == '__main__':
get_train_predict_data()