# get_data.py (4.1 KB)
  1. import pandas as pd
  2. import datetime
  3. import process_feature
  4. from datetime import datetime as dt
  5. from my_config import set_config
  6. from my_utils import get_data_from_odps, write_to_pickle
  7. from log import Log
  8. config_, _ = set_config()
  9. log_ = Log()
  10. def get_rov_feature_table(date, project, table):
  11. """
  12. 从DataWorks表中获取对应的特征值
  13. :param date: 日期 type-string '%Y%m%d'
  14. :param project: type-string
  15. :param table: 表名 type-string
  16. :return: feature_array type-DataFrame
  17. """
  18. records = get_data_from_odps(date=date, project=project, table=table)
  19. feature_value_list = []
  20. for record in records:
  21. feature_value = {}
  22. for feature_name in process_feature.features:
  23. if feature_name == 'dt':
  24. feature_value[feature_name] = date
  25. else:
  26. feature_value[feature_name] = record[feature_name]
  27. feature_value_list.append(feature_value)
  28. feature_array = pd.DataFrame(feature_value_list)
  29. log_.info('feature table finished... date={}, shape={}'.format(date, feature_array.shape))
  30. return feature_array
  31. def get_data_with_date(date, delta_days, project, table):
  32. """
  33. 获取某一时间范围的特征数据
  34. :param date: 标准日期,delta基准,type-string,'%Y%m%d'
  35. :param delta_days: 日期范围(天),type-int,「 >0: date前,<0: date后 」
  36. :param project: type-string
  37. :param table: DataWorks表名,type-string
  38. :return: data,type-DataFrame
  39. """
  40. base_date = dt.strptime(date, '%Y%m%d')
  41. data_list = []
  42. for days in range(0, delta_days):
  43. delta = datetime.timedelta(days=days)
  44. delta_date = base_date - delta
  45. # 获取特征数据
  46. delta_data = get_rov_feature_table(date=delta_date.strftime('%Y%m%d'), project=project, table=table)
  47. data_list.append(delta_data)
  48. data = pd.concat(data_list)
  49. # 重新进行索引
  50. data.reset_index(inplace=True)
  51. # 删除index列
  52. data = data.drop(columns=['index'])
  53. return data
  54. def get_train_predict_data():
  55. """
  56. 获取训练和预测数据
  57. :return: None
  58. """
  59. now_date = datetime.datetime.today()
  60. log_.info('now date: {}'.format(now_date))
  61. # ###### 训练数据 - 从7天前获取前30天的数据,写入pickle文件
  62. log_.info('===== train data')
  63. train_dt = now_date - datetime.timedelta(days=config_.TRAIN_DIFF)
  64. train_date = dt.strftime(train_dt, '%Y%m%d')
  65. train_data = get_data_with_date(
  66. date=train_date,
  67. delta_days=config_.TRAIN_DELTA_DAYS,
  68. project=config_.TRAIN_PROJECT,
  69. table=config_.TRAIN_TABLE
  70. )
  71. write_to_pickle(data=train_data, filename=config_.TRAIN_DATA_FILENAME)
  72. log_.info('train data finished, shape={}'.format(train_data.shape))
  73. # ###### 预测数据 - 从1天前获取前1天的数据,写入pickle文件
  74. log_.info('===== predict data')
  75. predict_dt = now_date - datetime.timedelta(days=config_.PREDICT_DIFF)
  76. predict_date = dt.strftime(predict_dt, '%Y%m%d')
  77. predict_data = get_data_with_date(
  78. date=predict_date,
  79. delta_days=config_.PREDICT_DELTA_DAYS,
  80. project=config_.PREDICT_PROJECT,
  81. table=config_.PREDICT_TABLE
  82. )
  83. write_to_pickle(data=predict_data, filename=config_.PREDICT_DATA_FILENAME)
  84. log_.info('predict data finished, shape={}'.format(predict_data.shape))
  85. # ###### app_type: [18, 19]预测数据
  86. # for app_type in [config_.APP_TYPE['LAO_HAO_KAN_VIDEO'], config_.APP_TYPE['ZUI_JING_QI']]:
  87. # log_.info(f"app_type = {app_type}")
  88. # project = config_.PREDICT_PROJECT_18_19[str(app_type)]
  89. # table = config_.PREDICT_TABLE_18_19[str(app_type)]
  90. # predict_data = get_data_with_date(
  91. # date=predict_date,
  92. # delta_days=config_.PREDICT_DELTA_DAYS,
  93. # project=project,
  94. # table=table
  95. # )
  96. # write_to_pickle(data=predict_data, filename=config_.PREDICT_DATA_FILENAME_18_19[str(app_type)])
  97. # log_.info(f'predict data finished, app_type = {app_type}, shape={predict_data.shape}')
  98. if __name__ == '__main__':
  99. get_train_predict_data()