123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- import datetime
- import os
- import traceback
- from odps import ODPS
- from datetime import datetime as dt
- from threading import Timer
- from config import set_config
- from log import Log
- from utils import send_msg_to_feishu
# Load the project configuration object and the environment name. `env` is
# compared against 'pro' further down to decide whether the prediction file
# is uploaded to OSS.
config_, env = set_config()
# Module-wide logger; used below for error reporting.
log_ = Log()
def rov_train_recall_pool_update():
    """Run the model update/prediction script and publish the prediction file.

    Executes ``rov_train_recall_pool_update.sh`` from ``config_.PROJECT_PATH``.
    When ``env`` is ``'pro'``, ``data/predict.csv`` is then copied with
    ``ossutil`` into a ``dt=YYYYMMDD`` folder (today's date) under
    ``config_.OSS_FOLDER_DATA`` in the ``config_.BUCKET_NAME`` bucket.

    A non-zero status from either shell command is reported through
    ``log_.error`` instead of being silently discarded. The function does not
    raise on command failure and always returns ``None``.
    """
    # Training and prediction data are both ready: update the model and predict.
    project_path = config_.PROJECT_PATH
    train_cmd = 'cd {} && sh {}/rov_train_recall_pool_update.sh'.format(project_path, project_path)
    train_status = os.system(train_cmd)
    if train_status != 0:
        log_.error('command failed, status: {}, cmd: {}'.format(train_status, train_cmd))
        # NOTE(review): the upload below still runs after a failed script run,
        # exactly as before -- confirm whether it should be skipped instead.

    # NOTE: uploading the log file to OSS is currently disabled.

    # Production only: upload the predicted ROV data file predict.csv to OSS.
    if env == 'pro':
        data_cmd = "ossutil cp -r -f {} oss://{}/{}dt={}/{}".format(
            "{}/data/predict.csv".format(project_path),
            config_.BUCKET_NAME,
            config_.OSS_FOLDER_DATA,
            datetime.datetime.today().strftime('%Y%m%d'),
            'predict.csv',
        )
        upload_status = os.system(data_cmd)
        if upload_status != 0:
            log_.error('command failed, status: {}, cmd: {}'.format(upload_status, data_cmd))
def data_check(project, table, date):
    """Return the row count of partition ``dt = date`` in ``project.table``.

    Args:
        project: ODPS project name; used both for the connection and in the
            table reference of the query.
        table: Table name inside ``project``.
        date: Partition value interpolated into the ``dt = ...`` filter.

    Returns:
        ``reader.count`` of the query result, or ``0`` when the query fails
        for any reason. Failures are logged via ``log_.error`` so that a
        persistent error can be told apart from data that is not ready yet.
    """
    # NOTE(review): credentials are hard-coded in source; they should be moved
    # to configuration or a secret store and rotated.
    odps = ODPS(
        access_id='LTAI4FtW5ZzxMvdw35aNkmcp',
        secret_access_key='0VKnydcaHK3ITjylbgUsLubX6rwiwc',
        project=project,
        endpoint='http://service.cn.maxcompute.aliyun.com/api',
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    sql = "select * from {}.{} where dt = {}".format(project, table, date)
    try:
        with odps.execute_sql(sql=sql).open_reader() as reader:
            feature_count = reader.count
    except Exception as e:
        # Best effort: a failed query counts as "no data", but the cause is
        # recorded rather than dropped.
        log_.error('data_check failed, sql: {}, exception: {}, traceback: {}'.format(
            sql, e, traceback.format_exc()))
        feature_count = 0
    return feature_count
def _timer_check_in_thread():
    """Run ``timer_check`` on a Timer thread and report any failure.

    Exceptions raised on a ``threading.Timer`` thread never reach the
    ``try/except`` around the first ``timer_check()`` call in ``__main__``,
    so they are logged and sent to Feishu here with the same messages.
    """
    try:
        timer_check()
    except Exception as e:
        log_.error('ROV召回池更新失败, exception: {}, traceback: {}'.format(e, traceback.format_exc()))
        send_msg_to_feishu('rov-offline{} - ROV召回池更新失败, exception: {}'.format(config_.ENV_TEXT, e))


def timer_check():
    """Check whether both input partitions have data, then update or retry.

    The training partition is today minus ``config_.TRAIN_DIFF`` days and the
    prediction partition is today minus ``config_.PREDICT_DIFF`` days, both
    formatted as ``YYYYMMDD``. If either ``data_check`` returns 0, another
    check is scheduled 60 seconds later on a ``Timer`` thread and this call
    returns immediately. Otherwise ``rov_train_recall_pool_update`` runs.
    """
    # Current date
    now_date = datetime.datetime.today()
    # Most recent partition date of the training data
    train_date = (now_date - datetime.timedelta(days=config_.TRAIN_DIFF)).strftime('%Y%m%d')
    # Most recent partition date of the prediction data
    predict_date = (now_date - datetime.timedelta(days=config_.PREDICT_DIFF)).strftime('%Y%m%d')

    # Check whether the training features are ready
    train_feature_count = data_check(config_.TRAIN_PROJECT, config_.TRAIN_TABLE, train_date)
    # Check whether the prediction features are ready
    predict_feature_count = data_check(config_.PREDICT_PROJECT, config_.PREDICT_TABLE, predict_date)

    if train_feature_count == 0 or predict_feature_count == 0:
        # Data not ready: check again in one minute. The retry goes through
        # the reporting wrapper because it runs off the main thread.
        Timer(60, _timer_check_in_thread).start()
        return

    # Data ready: update the model and predict
    rov_train_recall_pool_update()
if __name__ == '__main__':
    # Script entry point: start the readiness check; on any failure write the
    # error with its traceback to the log, then send an alert to Feishu.
    try:
        timer_check()
    except Exception as err:
        stack = traceback.format_exc()
        log_.error('ROV召回池更新失败, exception: {}, traceback: {}'.format(err, stack))
        alert = 'rov-offline{} - ROV召回池更新失败, exception: {}'.format(config_.ENV_TEXT, err)
        send_msg_to_feishu(alert)
|