liqian committed 3 years ago · commit 4251c04e41

5 changed files with 319 additions and 152 deletions:

  1. get_data.py (+97, -0)
  2. log.py (+58, -0)
  3. process_feature.py (+2, -2)
  4. rov_train.py (+98, -150)
  5. utils.py (+64, -0)

+ 97 - 0
get_data.py

@@ -0,0 +1,97 @@
+import pandas as pd
+import datetime
+import process_feature
+
+from datetime import datetime as dt
+from config import set_config
+from utils import get_data_from_odps, write_to_pickle
+from log import Log
+
+config_ = set_config()
+log_ = Log()
+
+
+def get_rov_feature_table(date, project, table):
+    """
+    Fetch the corresponding feature values from the DataWorks table.
+    :param date: date, type-string, '%Y%m%d'
+    :param project: type-string
+    :param table: table name, type-string
+    :return: feature_array, type-DataFrame
+    """
+    records = get_data_from_odps(date=date, project=project, table=table)
+    feature_value_list = []
+    for record in records:
+        feature_value = {}
+        for feature_name in process_feature.features:
+            if feature_name == 'dt':
+                feature_value[feature_name] = date
+            else:
+                feature_value[feature_name] = record[feature_name]
+        feature_value_list.append(feature_value)
+    feature_array = pd.DataFrame(feature_value_list)
+    log_.info('feature table finished... date={}, shape={}'.format(date, feature_array.shape))
+    return feature_array
+
+
+def get_data_with_date(date, delta_days, project, table):
+    """
+    Fetch feature data for a date range.
+    :param date: base date for the delta, type-string, '%Y%m%d'
+    :param delta_days: range in days, type-int ( >0: days before date, <0: days after date )
+    :param project: type-string
+    :param table: DataWorks table name, type-string
+    :return: data, type-DataFrame
+    """
+    base_date = dt.strptime(date, '%Y%m%d')
+    data_list = []
+    # step backwards when delta_days > 0, forwards when delta_days < 0 (as documented)
+    for days in range(0, delta_days, 1 if delta_days > 0 else -1):
+        delta = datetime.timedelta(days=days)
+        delta_date = base_date - delta
+        # fetch the feature data for this date
+        delta_data = get_rov_feature_table(date=delta_date.strftime('%Y%m%d'), project=project, table=table)
+        data_list.append(delta_data)
+    data = pd.concat(data_list)
+    # rebuild the index and drop the old one
+    data.reset_index(drop=True, inplace=True)
+    return data
+
+
+def get_train_predict_data():
+    """
+    Fetch the training and prediction data.
+    :return: None
+    """
+    now_date = datetime.datetime.today()
+    log_.info('now date: {}'.format(now_date))
+    # ###### training data - starting 7 days back, fetch the previous 30 days and write to a pickle file
+    log_.info('===== train data')
+    train_dt = now_date - datetime.timedelta(days=config_.TRAIN_DIFF)
+    train_date = dt.strftime(train_dt, '%Y%m%d')
+    train_data = get_data_with_date(
+        date=train_date,
+        delta_days=config_.TRAIN_DELTA_DAYS,
+        project=config_.TRAIN_PROJECT,
+        table=config_.TRAIN_TABLE
+    )
+    write_to_pickle(data=train_data, filename=config_.TRAIN_DATA_FILENAME)
+    log_.info('train data finished, shape={}'.format(train_data.shape))
+
+    # ###### prediction data - starting 1 day back, fetch that single day's data and write to a pickle file
+    log_.info('===== predict data')
+    predict_dt = now_date - datetime.timedelta(days=config_.PREDICT_DIFF)
+    predict_date = dt.strftime(predict_dt, '%Y%m%d')
+    predict_data = get_data_with_date(
+        date=predict_date,
+        delta_days=config_.PREDICT_DELTA_DAYS,
+        project=config_.PREDICT_PROJECT,
+        table=config_.PREDICT_TABLE
+    )
+    write_to_pickle(data=predict_data, filename=config_.PREDICT_DATA_FILENAME)
+    log_.info('predict data finished, shape={}'.format(predict_data.shape))
+
+
+if __name__ == '__main__':
+    get_train_predict_data()
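
For reference, a minimal sketch of the backwards date walk that get_data_with_date performs (the base date is illustrative):

    import datetime
    from datetime import datetime as dt

    base_date = dt.strptime('20211007', '%Y%m%d')
    # delta_days=3 yields 20211007, 20211006, 20211005 -- one ODPS partition per date
    for days in range(0, 3):
        delta_date = base_date - datetime.timedelta(days=days)
        print(delta_date.strftime('%Y%m%d'))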

+ 58 - 0
log.py

@@ -0,0 +1,58 @@
+import os
+import logging
+import time
+
+
+class Log(object):
+    # def __init__(self, pag_source, log_path=os.path.join(os.path.dirname(os.path.realpath(__file__)), "logs")):
+    def __init__(self, log_path=os.path.join(os.path.dirname(os.path.realpath(__file__)), "logs")):
+        if not os.path.exists(log_path):
+            os.makedirs(log_path)
+
+        # log file naming
+        # self.logname = os.path.join(log_path, '{}_{}.log'.format(pag_source, time.strftime('%Y%m%d')))
+        self.logname = os.path.join(log_path, '{}.log'.format(time.strftime('%Y%m%d')))
+        self.logger = logging.getLogger()
+        self.logger.setLevel(logging.DEBUG)
+        # log output format
+        self.formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
+        # self.formatter = logging.Formatter('%(asctime)s [%(filename)s] %(levelname)s: %(message)s')
+
+    def __console(self, level, message):
+        # create a FileHandler to write to a local file
+        fh = logging.FileHandler(self.logname, 'a', encoding='utf-8')
+        fh.setLevel(logging.DEBUG)
+        fh.setFormatter(self.formatter)
+        self.logger.addHandler(fh)
+
+        # create a StreamHandler to write to the console
+        ch = logging.StreamHandler()
+        ch.setLevel(logging.DEBUG)
+        ch.setFormatter(self.formatter)
+        self.logger.addHandler(ch)
+
+        if level == 'info':
+            self.logger.info(message)
+        elif level == 'debug':
+            self.logger.debug(message)
+        elif level == 'warning':
+            self.logger.warning(message)
+        elif level == 'error':
+            self.logger.error(message)
+        # remove the handlers to avoid duplicate log output
+        self.logger.removeHandler(ch)
+        self.logger.removeHandler(fh)
+        # close the opened file
+        fh.close()
+
+    def debug(self, message):
+        self.__console('debug', message)
+
+    def info(self, message):
+        self.__console('info', message)
+
+    def warning(self, message):
+        self.__console('warning', message)
+
+    def error(self, message):
+        self.__console('error', message)
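
For reference, a minimal usage sketch of the Log class above (the message text is illustrative):

    from log import Log

    log_ = Log()  # appends to logs/<YYYYMMDD>.log and echoes to the console
    log_.info('train data finished')
    log_.error('read table failed')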

+ 2 - 2
process_feature.py

@@ -23,7 +23,7 @@ features_name_list = [
     'day30sharecount',
     'day60sharecount',
 
-    'day1returncount',
+    'day1returncount',  # first-level returns
     'day3returncount',
     'day7returncount',
     'day14returncount',
@@ -71,7 +71,7 @@ features_name_list = [
     'day5returncount_3_stage',
     'day5returncount_4_stage',
 
-    'stage_one_retrn',
+    'stage_one_retrn',  # first-level returns on the home page
     'stage_two_retrn',
     'stage_three_retrn',
     'stage_four_retrn']

+ 98 - 150
rov_train.py

@@ -1,175 +1,123 @@
-import pandas as pd
-import datetime
-import pickle
-import os
-import process_feature
-
-from odps import ODPS
-from datetime import datetime as dt
+import lightgbm as lgb
+
+from sklearn.model_selection import train_test_split
+from sklearn.metrics import mean_absolute_error, r2_score
 from config import set_config
+from utils import read_from_pickle
+from log import Log
 
 config_ = set_config()
+log_ = Log()
 
 
-def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
-                       pool_maxsize=1000, pool_connections=1000):
-    """
-    Fetch data from ODPS.
-    :param date: date, type-string, '%Y%m%d'
-    :param project: type-string
-    :param table: table name, type-string
-    :param connect_timeout: connection timeout setting
-    :param read_timeout: read timeout setting
-    :param pool_maxsize:
-    :param pool_connections:
-    :return: records
-    """
-    odps = ODPS(
-        access_id='LTAI4FtW5ZzxMvdw35aNkmcp',
-        secret_access_key='0VKnydcaHK3ITjylbgUsLubX6rwiwc',
-        project=project,
-        endpoint='http://service.cn.maxcompute.aliyun.com/api',
-        connect_timeout=connect_timeout,
-        read_timeout=read_timeout,
-        pool_maxsize=pool_maxsize,
-        pool_connections=pool_connections
-    )
-    records = odps.read_table(name=table, partition='dt=%s' % date)
-    return records
-
-
-def get_rov_feature_table(date, project, table):
-    """
-    Fetch the corresponding feature values from the DataWorks table.
-    :param date: date, type-string, '%Y%m%d'
-    :param project: type-string
-    :param table: table name, type-string
-    :return: feature_array, type-DataFrame
-    """
-    records = get_data_from_odps(date=date, project=project, table=table)
-    feature_value_list = []
-    for record in records:
-        feature_value = {}
-        for feature_name in process_feature.features:
-            if feature_name == 'dt':
-                feature_value[feature_name] = date
-            else:
-                feature_value[feature_name] = record[feature_name]
-        feature_value_list.append(feature_value)
-    feature_array = pd.DataFrame(feature_value_list)
-    print(date, project, table, 'feature table finish')
-    return feature_array
-
-
-def get_data_with_date(date, delta_days, project, table):
+def process_data(filename):
     """
-    Fetch feature data for a date range.
-    :param date: base date for the delta, type-string, '%Y%m%d'
-    :param delta_days: range in days, type-int ( >0: days before date, <0: days after date )
-    :param project: type-string
-    :param table: DataWorks table name, type-string
-    :return: data, type-DataFrame
-    """
-    base_date = dt.strptime(date, '%Y%m%d')
-    data_list = []
-    for days in range(0, delta_days):
-        delta = datetime.timedelta(days=days)
-        delta_date = base_date - delta
-        # fetch the feature data for this date
-        delta_data = get_rov_feature_table(date=delta_date.strftime('%Y%m%d'), project=project, table=table)
-        data_list.append(delta_data)
-    data = pd.concat(data_list)
-    # rebuild the index
-    data.reset_index(inplace=True)
-    # drop the leftover index column
-    data = data.drop(columns=['index'])
-    return data
-
-
-def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
-    """
-    Write data to a pickle file.
-    :param data: the data
-    :param filename: name of the file to write
-    :param filepath: directory to store the file, defaults to config_.DATA_DIR_PATH
-    :return: None
+    Data cleaning and preprocessing.
+    :param filename: pickle file name to load, type-string
+    :return: x, y, video_ids, features
     """
-    if not os.path.exists(filepath):
-        os.makedirs(filepath)
-    file = os.path.join(filepath, filename)
-    with open(file, 'wb') as wf:
-        pickle.dump(data, wf)
+    # load the data
+    data = read_from_pickle(filename)
+    # extract y, replacing values <= 0 with 1
+    data.loc[data['futre7dayreturn'] <= 0, 'futre7dayreturn'] = 1
+    y = data['futre7dayreturn']
 
+    # video id column
+    video_ids = data['videoid']
 
-def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
-    """
-    Read data from a pickle file.
-    :param filename: file name
-    :param filepath: directory of the file, defaults to config_.DATA_DIR_PATH
-    :return: data
-    """
-    file = os.path.join(filepath, filename)
-    if not os.path.exists(file):
-        return None
-    with open(file, 'rb') as rf:
-        data = pickle.load(rf)
-    return data
+    # build x
+    drop_columns = ['videoid', 'dt', 'futre7dayreturn', 'videotags', 'words_without_tags']
+    x = data.drop(columns=drop_columns)
+    # difference between the later stage's returns and the previous stage's
+    x['stage_four_return_added'] = x['stage_four_retrn'] - x['stage_three_retrn']
+    x['stage_three_return_added'] = x['stage_three_retrn'] - x['stage_two_retrn']
+    x['stage_two_return_added'] = x['stage_two_retrn'] - x['stage_one_retrn']
+    # growth rate of the later stage's returns over the previous stage's
+    x['stage_four_return_ratio'] = x['stage_four_return_added'] / x['stage_four_retrn']
+    x['stage_three_return_ratio'] = x['stage_three_return_added'] / x['stage_three_retrn']
+    x['stage_two_return_ratio'] = x['stage_two_return_added'] / x['stage_two_retrn']
 
+    # fill missing values with 0
+    x = x.fillna(0)
 
-def get_train_predict_data():
-    """
-    Fetch the training and prediction data.
-    :return: None
-    """
-    now_date = datetime.datetime.today()
-    # ###### training data - starting 7 days back, fetch the previous 30 days and write to a pickle file
-    train_dt = now_date - datetime.timedelta(days=config_.TRAIN_DIFF)
-    train_date = dt.strftime(train_dt, '%Y%m%d')
-    train_data = get_data_with_date(
-        date=train_date,
-        delta_days=config_.TRAIN_DELTA_DAYS,
-        project=config_.TRAIN_PROJECT,
-        table=config_.TRAIN_TABLE
-    )
-    write_to_pickle(data=train_data, filename=config_.TRAIN_DATA_FILENAME)
-
-    # ###### prediction data - starting 1 day back, fetch that single day's data and write to a pickle file
-    predict_dt = now_date - datetime.timedelta(days=config_.PREDICT_DIFF)
-    predict_date = dt.strftime(predict_dt, '%Y%m%d')
-    predict_data = get_data_with_date(
-        date=predict_date,
-        delta_days=config_.PREDICT_DELTA_DAYS,
-        project=config_.PREDICT_PROJECT,
-        table=config_.PREDICT_TABLE
-    )
-    write_to_pickle(data=predict_data, filename=config_.PREDICT_DATA_FILENAME)
-
-
-def process_data(data):
+    # list of the features currently in use
+    features = list(x)
+
+    return x, y, video_ids, features
+
+
+def train(x, y):
     """
-    Data cleaning and preprocessing.
-    :param data: type-DataFrame
+    Train the model.
+    :param x:
+    :param y:
     :return:
     """
-    pass
+    # split into training and test sets
+    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.33)
+    log_.info('x_train shape: {}, y_train shape: {}'.format(x_train.shape, y_train.shape))
+    log_.info('x_test shape: {}, y_test shape: {}'.format(x_test.shape, y_test.shape))
+
+    # training parameter settings
+    params = {
+        "objective": "regression",
+        "reg_sqrt": True,
+        "metric": "mape",
+        "max_depth": -1,
+        "num_leaves": 50,
+        "learning_rate": 0.1,
+        "bagging_fraction": 0.7,
+        "feature_fraction": 0.7,
+        "bagging_freq": 8,
+        "bagging_seed": 2018,
+        "lambda_l1": 0.11,
+        "boosting": "dart",
+        "nthread": 4,
+        "verbosity": -1
+    }
+    # build the LightGBM datasets
+    train_set = lgb.Dataset(data=x_train, label=y_train)
+    test_set = lgb.Dataset(data=x_test, label=y_test)
+    # train the model
+    evals_result = {}
+
+    model = lgb.train(params=params, train_set=train_set, num_boost_round=5000,
+                      valid_sets=[test_set], early_stopping_rounds=100,
+                      verbose_eval=100, evals_result=evals_result)
+
+    # predict on the test set
+    pre_test_y = model.predict(data=x_test, num_iteration=model.best_iteration)
+    y_test = y_test.values
+
+    err_mae = mean_absolute_error(y_test, pre_test_y)
+    r2 = r2_score(y_test, pre_test_y)
+
+    log_.info('mae={}, r2={}'.format(err_mae, r2))
 
 
 if __name__ == '__main__':
-    dt_test = '20211007'
-    project_test = 'usercdm'
-    table_test = 'rov_feature_add_v1'
+    # dt_test = '20211007'
+    # project_test = 'usercdm'
+    # table_test = 'rov_feature_add_v1'
     # res = get_rov_feature_table(dt_test, table_test)
     # res = get_data_with_date(date=dt_test, delta_days=2, project=project_test, table=table_test)
     # print(res.shape)
     # write_to_pickle(res, 'test.pickle')
 
-    data = read_from_pickle('test.pickle')
-    if data is not None:
-        print(data.shape, type(data))
-        print(list(data))
-        print(data[data['futre7dayreturn']<0])
-    else:
-        print(data)
+    # data = read_from_pickle('test.pickle')
+    # if data is not None:
+    #     print(data.shape, type(data))
+    #     print(list(data))
+    #     print(data[data['futre7dayreturn']<0])
+    # else:
+    #     print(data)
+
+    train_filename = config_.TRAIN_DATA_FILENAME
+    x, y, videos, fea = process_data(filename=train_filename)
+    print(x.shape, y.shape)
+    print(len(fea), fea)
+    train(x, y)
+
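
For reference, a toy illustration of the stage-return features built in process_data (the numbers are made up):

    import pandas as pd

    x = pd.DataFrame({'stage_one_retrn': [10], 'stage_two_retrn': [15],
                      'stage_three_retrn': [18], 'stage_four_retrn': [20]})
    # difference between the later stage's returns and the previous stage's
    x['stage_four_return_added'] = x['stage_four_retrn'] - x['stage_three_retrn']  # 2
    # growth rate, normalised by the later stage's returns
    x['stage_four_return_ratio'] = x['stage_four_return_added'] / x['stage_four_retrn']  # 0.1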
 
 
 

+ 64 - 0
utils.py

@@ -0,0 +1,64 @@
+import pickle
+import os
+
+from odps import ODPS
+from config import set_config
+
+config_ = set_config()
+
+
+def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
+                       pool_maxsize=1000, pool_connections=1000):
+    """
+    Fetch data from ODPS.
+    :param date: date, type-string, '%Y%m%d'
+    :param project: type-string
+    :param table: table name, type-string
+    :param connect_timeout: connection timeout setting
+    :param read_timeout: read timeout setting
+    :param pool_maxsize:
+    :param pool_connections:
+    :return: records
+    """
+    odps = ODPS(
+        access_id='LTAI4FtW5ZzxMvdw35aNkmcp',
+        secret_access_key='0VKnydcaHK3ITjylbgUsLubX6rwiwc',
+        project=project,
+        endpoint='http://service.cn.maxcompute.aliyun.com/api',
+        connect_timeout=connect_timeout,
+        read_timeout=read_timeout,
+        pool_maxsize=pool_maxsize,
+        pool_connections=pool_connections
+    )
+    records = odps.read_table(name=table, partition='dt=%s' % date)
+    return records
+
+
+def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
+    """
+    Write data to a pickle file.
+    :param data: the data
+    :param filename: name of the file to write
+    :param filepath: directory to store the file, defaults to config_.DATA_DIR_PATH
+    :return: None
+    """
+    if not os.path.exists(filepath):
+        os.makedirs(filepath)
+    file = os.path.join(filepath, filename)
+    with open(file, 'wb') as wf:
+        pickle.dump(data, wf)
+
+
+def read_from_pickle(filename, filepath=config_.DATA_DIR_PATH):
+    """
+    Read data from a pickle file.
+    :param filename: file name
+    :param filepath: directory of the file, defaults to config_.DATA_DIR_PATH
+    :return: data
+    """
+    file = os.path.join(filepath, filename)
+    if not os.path.exists(file):
+        return None
+    with open(file, 'rb') as rf:
+        data = pickle.load(rf)
+    return data
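
For reference, a round-trip sketch of the pickle helpers above (the filename is illustrative):

    from utils import write_to_pickle, read_from_pickle

    write_to_pickle(data={'demo': 1}, filename='demo.pickle')  # stored under config_.DATA_DIR_PATH
    restored = read_from_pickle('demo.pickle')  # returns None if the file does not exist
    print(restored)  # {'demo': 1}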