liqian hai 3 anos
pai
achega
9fb32cea6f
Modificáronse 3 ficheiros con 148 adicións e 16 borrados
  1. 2 1
      .gitignore
  2. 27 0
      config.py
  3. 119 15
      rov_train.py

+ 2 - 1
.gitignore

@@ -59,4 +59,5 @@ docs/_build/
 target/
 
 .idea/
-.DS_Store
+.DS_Store
+/data

+ 27 - 0
config.py

@@ -0,0 +1,27 @@
class BaseConfig(object):
    """Static configuration for the ROV training / prediction pipeline."""

    # Directory where data files are stored.
    DATA_DIR_PATH = './data'

    # Training data: days between the data cut-off date and today.
    TRAIN_DIFF = 7
    # Training data: number of days of data required.
    TRAIN_DELTA_DAYS = 30
    # Training data: ODPS project and table name.
    TRAIN_PROJECT = 'usercdm'
    TRAIN_TABLE = 'rov_feature_add_v1'
    # Training data: pickle file name.
    TRAIN_DATA_FILENAME = 'train_data.pickle'

    # Prediction data: days between the data cut-off date and today.
    PREDICT_DIFF = 1
    # Prediction data: number of days of data required.
    PREDICT_DELTA_DAYS = 1
    # Prediction data: ODPS project and table name.
    PREDICT_PROJECT = 'usercdm'
    PREDICT_TABLE = 'rov_predict_table_add_v1'
    # Prediction data: pickle file name.
    PREDICT_DATA_FILENAME = 'predict_data.pickle'
+
+
def set_config():
    """Return the active configuration object (currently always BaseConfig)."""
    return BaseConfig()

+ 119 - 15
rov_train.py

@@ -1,31 +1,54 @@
 import pandas as pd
 import datetime
 import pickle
+import os
 import process_feature
 
 from odps import ODPS
 from datetime import datetime as dt
+from config import set_config
 
+config_ = set_config()
 
-def get_rov_feature_table(date, table):
+
def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
                       pool_maxsize=1000, pool_connections=1000):
    """
    Fetch records from an ODPS (MaxCompute) table partition.

    :param date: partition date, type-string '%Y%m%d'
    :param project: ODPS project name, type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout setting
    :param read_timeout: read timeout setting
    :param pool_maxsize: connection-pool max size
    :param pool_connections: number of pooled connections
    :return: records iterator over partition dt=<date>
    """
    # NOTE(security): credentials were hard-coded here and are now exposed in
    # version control — rotate them. They are kept only as a backward-compatible
    # fallback; prefer ODPS_ACCESS_ID / ODPS_SECRET_ACCESS_KEY environment vars.
    odps = ODPS(
        access_id=os.environ.get('ODPS_ACCESS_ID', 'LTAI4FtW5ZzxMvdw35aNkmcp'),
        secret_access_key=os.environ.get('ODPS_SECRET_ACCESS_KEY',
                                         '0VKnydcaHK3ITjylbgUsLubX6rwiwc'),
        project=project,
        endpoint='http://service.cn.maxcompute.aliyun.com/api',
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    records = odps.read_table(name=table, partition='dt=%s' % date)
    return records
+
+
+def get_rov_feature_table(date, project, table):
+    """
+    从DataWorks表中获取对应的特征值
+    :param date: 日期 type-string '%Y%m%d'
+    :param project: type-string
+    :param table: 表名 type-string
+    :return: feature_array type-DataFrame
+    """
+    records = get_data_from_odps(date=date, project=project, table=table)
     feature_value_list = []
-    for record in odps.read_table(name=table, partition='dt=%s' % date):
+    for record in records:
         feature_value = {}
         for feature_name in process_feature.features:
             if feature_name == 'dt':
@@ -34,15 +57,16 @@ def get_rov_feature_table(date, table):
                 feature_value[feature_name] = record[feature_name]
         feature_value_list.append(feature_value)
     feature_array = pd.DataFrame(feature_value_list)
-    print(date, table, 'feature table finish')
+    print(date, project, table, 'feature table finish')
     return feature_array
 
 
-def get_data_with_date(date, delta_days, table):
+def get_data_with_date(date, delta_days, project, table):
     """
     获取某一时间范围的特征数据
     :param date: 标准日期,delta基准,type-string,'%Y%m%d'
     :param delta_days: 日期范围(天),type-int,「 >0: date前,<0: date后 」
+    :param project: type-string
     :param table: DataWorks表名,type-string
     :return: data,type-DataFrame
     """
@@ -52,7 +76,7 @@ def get_data_with_date(date, delta_days, table):
         delta = datetime.timedelta(days=days)
         delta_date = base_date - delta
         # 获取特征数据
-        delta_data = get_rov_feature_table(date=delta_date.strftime('%Y%m%d'), table=table)
+        delta_data = get_rov_feature_table(date=delta_date.strftime('%Y%m%d'), project=project, table=table)
         data_list.append(delta_data)
     data = pd.concat(data_list)
     # 重新进行索引
@@ -62,10 +86,90 @@ def get_data_with_date(date, delta_days, table):
     return data
 
 
def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
    """
    Serialize data to a pickle file, creating the target directory if needed.

    :param data: object to serialize
    :param filename: output file name
    :param filepath: target directory, defaults to config_.DATA_DIR_PATH
    :return: None
    """
    # exist_ok avoids the check-then-create race of the original
    # `if not os.path.exists(...)` guard.
    os.makedirs(filepath, exist_ok=True)
    file = os.path.join(filepath, filename)
    with open(file, 'wb') as wf:
        pickle.dump(data, wf)
+
+
def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
    """
    Load data from a pickle file.

    :param filename: file name
    :param filepath: directory containing the file, defaults to config_.DATA_DIR_PATH
    :return: the unpickled object, or None when the file does not exist
    """
    full_path = os.path.join(filepath, filename)
    if not os.path.exists(full_path):
        return None
    with open(full_path, 'rb') as rf:
        return pickle.load(rf)
+
+
def get_train_predict_data():
    """
    Build the training and prediction datasets and persist each to a pickle file.

    Training data: config_.TRAIN_DELTA_DAYS days of features ending
    config_.TRAIN_DIFF days before today.
    Prediction data: config_.PREDICT_DELTA_DAYS days of features ending
    config_.PREDICT_DIFF days before today.
    :return: None
    """
    today = datetime.datetime.today()

    # ----- training data -----
    train_end = today - datetime.timedelta(days=config_.TRAIN_DIFF)
    train_data = get_data_with_date(
        date=dt.strftime(train_end, '%Y%m%d'),
        delta_days=config_.TRAIN_DELTA_DAYS,
        project=config_.TRAIN_PROJECT,
        table=config_.TRAIN_TABLE
    )
    write_to_pickle(data=train_data, filename=config_.TRAIN_DATA_FILENAME)

    # ----- prediction data -----
    predict_end = today - datetime.timedelta(days=config_.PREDICT_DIFF)
    predict_data = get_data_with_date(
        date=dt.strftime(predict_end, '%Y%m%d'),
        delta_days=config_.PREDICT_DELTA_DAYS,
        project=config_.PREDICT_PROJECT,
        table=config_.PREDICT_TABLE
    )
    write_to_pickle(data=predict_data, filename=config_.PREDICT_DATA_FILENAME)
+
+
def process_data(data):
    """
    Clean and preprocess the feature data.

    :param data: type-DataFrame
    :return: None (not implemented yet)
    """
    # TODO: implement cleaning / preprocessing.
    pass
+
+
 if __name__ == '__main__':
-    dt_test = '20211006'
+    dt_test = '20211007'
+    project_test = 'usercdm'
     table_test = 'rov_feature_add_v1'
     # res = get_rov_feature_table(dt_test, table_test)
-    res = get_data_with_date(date=dt_test, delta_days=3, table=table_test)
-    print(res.shape)
+    # res = get_data_with_date(date=dt_test, delta_days=2, project=project_test, table=table_test)
+    # print(res.shape)
+    # write_to_pickle(res, 'test.pickle')
+
+    data = read_from_pickle('test.pickle')
+    if data is not None:
+        print(data.shape, type(data))
+        print(list(data))
+        print(data[data['futre7dayreturn']<0])
+    else:
+        print(data)
+
+