rov_train.py

import pandas as pd
import datetime
import pickle
import os
import process_feature
from odps import ODPS
from datetime import datetime as dt
from config import set_config

config_ = set_config()

def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
                       pool_maxsize=1000, pool_connections=1000):
    """
    Fetch data from ODPS.
    :param date: date, type-string, '%Y%m%d'
    :param project: type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout setting
    :param read_timeout: read timeout setting
    :param pool_maxsize: max size of the connection pool
    :param pool_connections: number of pooled connections
    :return: records
    """
    # NOTE: credentials are hardcoded here and should be moved into secure configuration.
    odps = ODPS(
        access_id='LTAI4FtW5ZzxMvdw35aNkmcp',
        secret_access_key='0VKnydcaHK3ITjylbgUsLubX6rwiwc',
        project=project,
        endpoint='http://service.cn.maxcompute.aliyun.com/api',
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    # Read the single daily partition for the requested date
    records = odps.read_table(name=table, partition='dt=%s' % date)
    return records
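
# Usage sketch for get_data_from_odps (kept as a comment so nothing runs on
# import; the project/table names are the test values from __main__ below):
#
#   records = get_data_from_odps('20211007', 'usercdm', 'rov_feature_add_v1')
#   for record in records:
#       row = {name: record[name] for name in process_feature.features if name != 'dt'}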

def get_rov_feature_table(date, project, table):
    """
    Fetch the feature values from the DataWorks table.
    :param date: date, type-string, '%Y%m%d'
    :param project: type-string
    :param table: table name, type-string
    :return: feature_array, type-DataFrame
    """
    records = get_data_from_odps(date=date, project=project, table=table)
    feature_value_list = []
    for record in records:
        feature_value = {}
        for feature_name in process_feature.features:
            if feature_name == 'dt':
                # The partition field is filled from the request date
                feature_value[feature_name] = date
            else:
                feature_value[feature_name] = record[feature_name]
        feature_value_list.append(feature_value)
    feature_array = pd.DataFrame(feature_value_list)
    print(date, project, table, 'feature table finish')
    return feature_array
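
# process_feature.features (defined in process_feature.py) is assumed to be an
# ordered list of column names that includes the 'dt' partition field, e.g.
#
#   features = ['dt', 'futre7dayreturn', ...]
#
# so feature_array ends up with exactly one column per entry in that list.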

def get_data_with_date(date, delta_days, project, table):
    """
    Fetch feature data for a date range.
    :param date: base date the delta is measured from, type-string, '%Y%m%d'
    :param delta_days: range in days, type-int (>0: days before date, <0: days after date)
    :param project: type-string
    :param table: DataWorks table name, type-string
    :return: data, type-DataFrame
    """
    base_date = dt.strptime(date, '%Y%m%d')
    data_list = []
    # Step in either direction so that negative delta_days (days after the
    # base date) behaves as documented above
    step = 1 if delta_days > 0 else -1
    for days in range(0, delta_days, step):
        delta = datetime.timedelta(days=days)
        delta_date = base_date - delta
        # Fetch the feature data for this single day
        delta_data = get_rov_feature_table(date=delta_date.strftime('%Y%m%d'), project=project, table=table)
        data_list.append(delta_data)
    data = pd.concat(data_list)
    # Rebuild a contiguous index, dropping the per-day indices
    data.reset_index(drop=True, inplace=True)
    return data
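
# Usage sketch (mirrors the commented-out test in __main__): fetch the two days
# ending at 20211007, i.e. the partitions dt=20211007 and dt=20211006:
#
#   df = get_data_with_date(date='20211007', delta_days=2,
#                           project='usercdm', table='rov_feature_add_v1')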

def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
    """
    Write data to a pickle file.
    :param data: the data to write
    :param filename: name of the output file
    :param filepath: output directory, defaults to config_.DATA_DIR_PATH
    :return: None
    """
    if not os.path.exists(filepath):
        os.makedirs(filepath)
    file = os.path.join(filepath, filename)
    with open(file, 'wb') as wf:
        pickle.dump(data, wf)

def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
    """
    Read data from a pickle file.
    :param filename: file name
    :param filepath: directory the file lives in, defaults to config_.DATA_DIR_PATH
    :return: data, or None if the file does not exist
    """
    file = os.path.join(filepath, filename)
    if not os.path.exists(file):
        return None
    with open(file, 'rb') as rf:
        data = pickle.load(rf)
    return data
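
# Round-trip sketch for the two pickle helpers, assuming config_.DATA_DIR_PATH
# is writable:
#
#   df = pd.DataFrame({'a': [1, 2]})
#   write_to_pickle(df, 'example.pickle')
#   assert read_from_pickle('example.pickle').equals(df)
#
# read_from_pickle returns None for a missing file, so callers should check the
# result before using it (as the __main__ block below does).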

def get_train_predict_data():
    """
    Fetch the training and prediction data.
    :return: None
    """
    now_date = datetime.datetime.today()
    # ###### Training data: starting 7 days back, fetch the preceding 30 days and write them to a pickle file
    train_dt = now_date - datetime.timedelta(days=config_.TRAIN_DIFF)
    train_date = train_dt.strftime('%Y%m%d')
    train_data = get_data_with_date(
        date=train_date,
        delta_days=config_.TRAIN_DELTA_DAYS,
        project=config_.TRAIN_PROJECT,
        table=config_.TRAIN_TABLE
    )
    write_to_pickle(data=train_data, filename=config_.TRAIN_DATA_FILENAME)
    # ###### Prediction data: starting 1 day back, fetch the preceding 1 day and write it to a pickle file
    predict_dt = now_date - datetime.timedelta(days=config_.PREDICT_DIFF)
    predict_date = predict_dt.strftime('%Y%m%d')
    predict_data = get_data_with_date(
        date=predict_date,
        delta_days=config_.PREDICT_DELTA_DAYS,
        project=config_.PREDICT_PROJECT,
        table=config_.PREDICT_TABLE
    )
    write_to_pickle(data=predict_data, filename=config_.PREDICT_DATA_FILENAME)
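
# The date windows above are driven entirely by config: judging from the
# comments, TRAIN_DIFF/TRAIN_DELTA_DAYS are presumably 7/30 (a 30-day window
# ending 7 days ago) and PREDICT_DIFF/PREDICT_DELTA_DAYS are 1/1 (yesterday
# only). A scheduled job would simply call:
#
#   get_train_predict_data()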

def process_data(data):
    """
    Data cleaning and preprocessing.
    :param data: type-DataFrame
    :return:
    """
    pass
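
# process_data is left unimplemented above. A minimal sketch of what it might
# do (an assumption, not the author's method): drop rows with a missing label
# and zero-fill the remaining NaNs, e.g.
#
#   data = data.dropna(subset=['futre7dayreturn']).fillna(0)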

if __name__ == '__main__':
    dt_test = '20211007'
    project_test = 'usercdm'
    table_test = 'rov_feature_add_v1'
    # res = get_rov_feature_table(dt_test, project_test, table_test)
    # res = get_data_with_date(date=dt_test, delta_days=2, project=project_test, table=table_test)
    # print(res.shape)
    # write_to_pickle(res, 'test.pickle')
    data = read_from_pickle('test.pickle')
    if data is not None:
        print(data.shape, type(data))
        print(list(data))
        # Rows with a negative 7-day return ('futre7dayreturn' is the column
        # name as it exists in the table)
        print(data[data['futre7dayreturn'] < 0])
    else:
        print(data)